Go by Example - Worker Pools, WaitGroups, Rate Limiting, Atomic Counters, Mutexes and Stateful Goroutines

Go by Example - Worker Pools, WaitGroups, Rate Limiting, Atomic Counters, Mutexes and Stateful Goroutines

介绍go中的Worker Pools, WaitGroups, Rate Limiting, Atomic Counters, Mutexes 和 Stateful Goroutines

Worker Pools

在本示例中,我们将了解如何使用 goroutines 和通道实现 Worker 池。

worker-pools.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
"fmt"
"time"
)

// 这是 Worker,我们将同时运行多个实例
// 这些 Worker 将在作业通道上接收作业,并在结果通道上发送相应的结果
// 我们将为每个作业休眠一秒钟,以模拟昂贵的任务
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}

func main() {
// 为了使用我们的 worker 池,我们需要向他们发送工作并收集他们的结果
// 我们为此制作了2个 channel
const numJobs = 5
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)

// 这将启动 3 个 worker,最初由于还没有 job 而受阻
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}

// 在这里,我们发送 5 个任务,然后关闭该通道,以表示这是我们所有的工作
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)

// 最后,我们收集所有工作结果。这也确保了工作程序已经完成
// 等待多个 goroutines 的另一种方法是使用 WaitGroup
for a := 1; a <= numJobs; a++ {
<-results
}
}

我们正在运行的程序显示了不同工作者正在执行的 5 项工作。由于有 3 个工作线程同时工作,因此尽管总工作时间约为 5 秒,但程序仅耗时约 2 秒。

log
1
2
3
4
5
6
7
8
9
10
11
12
$ time go run worker-pools.go
worker 1 started job 1
worker 2 started job 2
worker 3 started job 3
worker 1 finished job 1
worker 1 started job 4
worker 2 finished job 2
worker 2 started job 5
worker 3 finished job 3
worker 1 finished job 4
worker 2 finished job 5
real 0m2.358s

WaitGroups

要等待多个goroutine完成,我们可以使用一个等待组。

waitgroups.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
"fmt"
"sync"
"time"
)

// 这是我们将在每个 goroutine 中运行的函数
func worker(id int) {
fmt.Printf("Worker %d starting\n", id)

// 使用time.Sleep来模拟一个花费巨大的任务
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}

func main() {
// 该 WaitGroup 用于等待在此启动的所有 goroutines 结束
// 注意:如果要明确地将 WaitGroup 传递给函数,则应使用指针
var wg sync.WaitGroup

// 启动多个goroutine,并为每个Goroutine增加WaitGroup计数器
for i := 1; i <= 5; i++ {
wg.Add(1)

// 将 Worker 调用包裹在一个闭包中,确保告诉 WaitGroup 该 Worker 已经完成
// 这样,worker 本身就不必知道其执行过程中涉及的并发原语
go func() {
defer wg.Done()
worker(i)
}()
}

// 阻塞,直到 WaitGroup 计数器回到 0;所有工作者通知他们已经完成
wg.Wait()

}

需要注意的是,这种方法无法直接从 Worker 传播错误。对于更高级的用例,请考虑使用 errgroup 软件包

log
1
2
3
4
5
6
7
8
9
10
11
$ go run waitgroups.go
Worker 5 starting
Worker 3 starting
Worker 4 starting
Worker 1 starting
Worker 2 starting
Worker 4 done
Worker 1 done
Worker 2 done
Worker 5 done
Worker 3 done

对于每次调用,工作线程的启动和结束顺序可能不同。

Rate Limiting

速率限制是控制资源利用率和保持服务质量的重要机制。Go 可通过 goroutines、channel 和 tickers 出色地支持速率限制。

rate-limiting.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package main

import (
"fmt"
"time"
)

func main() {
// 首先,我们来看看基本的速率限制
// 假设我们想限制对传入请求的处理。我们将通过一个同名通道来处理这些请求
requests := make(chan int, 5)
for i := 1; i <= 5; i++ {
requests <- i
}
close(requests)

// 该限制器通道将每 200 毫秒接收一个值。这是我们速率限制方案中的调节器
limiter := time.Tick(200 * time.Millisecond)

// 通过在提供每个请求之前阻断来自限制器通道的接收,我们将自己的请求限制在每 200 毫秒一个
for req := range requests {
<-limiter
fmt.Println("request", req, time.Now())
}

// 我们可能希望在速率限制方案中允许短时间的请求,同时保持总体速率限制
// 我们可以通过缓冲限制器通道来实现这一目的。这个 burstyLimiter 通道将允许最多 3 个事件的突发
burstyLimiter := make(chan time.Time, 3)

// 填满通道,表示允许爆裂
for i := 0; i < 3; i++ {
burstyLimiter <- time.Now()
}

// 每隔 200 毫秒,我们会尝试为 burstyLimiter 添加一个新值,直到其上限为 3
go func() {
for t := range time.Tick(200 * time.Millisecond) {
burstyLimiter <- t
}
}()

// 现在再模拟 5 个传入请求。其中前 3 个将受益于 burstyLimiter 的突发能力
burstyRequests := make(chan int, 5)
for i := 1; i <= 5; i++ {
burstyRequests <- i
}
close(burstyRequests)
for req := range burstyRequests {
<-burstyLimiter
fmt.Println("request", req, time.Now())
}
}

在运行我们的程序时,我们看到第一批请求每隔约 200 毫秒处理一次。

log
1
2
3
4
5
6
7
8
9
10
11
$ go run rate-limiting.go
request 1 2012-10-19 00:38:18.687438 +0000 UTC
request 2 2012-10-19 00:38:18.887471 +0000 UTC
request 3 2012-10-19 00:38:19.087238 +0000 UTC
request 4 2012-10-19 00:38:19.287338 +0000 UTC
request 5 2012-10-19 00:38:19.487331 +0000 UTC
request 1 2012-10-19 00:38:20.487578 +0000 UTC
request 2 2012-10-19 00:38:20.487645 +0000 UTC
request 3 2012-10-19 00:38:20.487676 +0000 UTC
request 4 2012-10-19 00:38:20.687483 +0000 UTC
request 5 2012-10-19 00:38:20.887542 +0000 UTC

对于第二批请求,由于可突发速率限制,我们会立即处理前 3 个请求,然后在每个请求延迟约 200 毫秒后处理其余 2 个请求。

Atomic Counters

Go 中管理状态的主要机制是通过通道进行通信。例如,我们在 Worker 池中就看到了这一点。不过还有其他一些管理状态的方法。在这里,我们将看看如何使用 sync/atomic 包来处理多个 goroutines 访问的原子计数器。

atomic-counters.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"fmt"
"sync"
"sync/atomic"
)

func main() {
// 我们将使用原子整数类型来表示我们的(总是正数的)计数器
var ops atomic.Uint64

// WaitGroup 可以帮助我们等待所有 goroutines 完成工作
var wg sync.WaitGroup

// 我们将启动 50 个 goroutines,每个 goroutines 恰好将计数器递增 1000 次
for i := 0; i < 50; i++ {
wg.Add(1)

go func() {
for c := 0; c < 1000; c++ {

// 我们使用 Add 对计数器进行原子递增
ops.Add(1)
}

wg.Done()
}()
}

// 等待所有的goroutine完成
wg.Wait()

// 在这里,没有 goroutines 向 "ops "写入数据,但使用 Load 可以安全地原子读取数据,即使其他 goroutines 正在(原子)更新它
fmt.Println("ops:", ops.Load())
}

我们希望得到精确的 50,000 次操作。如果我们使用一个非原子整数,并用 ops++ 递增,我们很可能会得到一个不同的数字,而且在运行过程中会不断变化,因为 goroutines 会相互干扰。此外,在使用-race 标志运行时,我们还会出现数据竞赛失败。

log
1
2
$ go run atomic-counters.go
ops: 50000

接下来,我们将了解另一种管理状态的工具--互斥。

Mutexes

在上一个示例中,我们看到了如何使用原子操作管理简单的计数器状态。对于更复杂的状态,我们可以使用互斥来安全地跨多个程序访问数据。

mutexes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (
"fmt"
"sync"
)

// Container 包含一个计数器映射;由于我们希望通过多个程序并发更新计数器映射,因此我们添加了一个 Mutex 来同步访问
// 需要注意的是,Mutex 不能被复制,因此如果要传递该结构,应使用指针
type Container struct {
mu sync.Mutex
counters map[string]int
}

// 在访问计数器前锁定互斥;在函数结束时使用 defer 语句解锁互斥
func (c *Container) inc(name string) {

c.mu.Lock()
defer c.mu.Unlock()
c.counters[name]++
}

func main() {
// 请注意,互斥器的零值可以原样使用,因此这里不需要初始化
c := Container{

counters: map[string]int{"a": 0, "b": 0},
}

var wg sync.WaitGroup

// 该函数在一个循环中递增一个指定的计数器
doIncrement := func(name string, n int) {
for i := 0; i < n; i++ {
c.inc(name)
}
wg.Done()
}

// 同时运行多个goroutine;请注意,它们都访问相同的Container,其中两个访问相同的计数器
wg.Add(3)
go doIncrement("a", 10000)
go doIncrement("a", 10000)
go doIncrement("b", 10000)

// 等待goroutines结束
wg.Wait()
fmt.Println(c.counters)
}

运行程序后,计数器如期更新。

log
1
2
$ go run mutexes.go
map[a:20000 b:10000]

接下来,我们将看看如何仅使用 goroutines 和通道来实现相同的状态管理任务。

Stateful Goroutines

在前面的示例中,我们使用显式锁定和互斥来同步多个 goroutines 对共享状态的访问。另一种方法是使用 goroutines 和通道的内置同步功能来实现相同的结果。这种基于通道的方法与 Go 通过通信共享内存的想法一致,而且每段数据都正好由一个 goroutine 拥有。

stateful-goroutines.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package main

import (
"fmt"
"math/rand"
"sync/atomic"
"time"
)

// 在本例中,我们的状态将由单个 goroutine 拥有。这将保证数据不会因并发访问而损坏
// 为了读取或写入状态,其他 goroutine 将向拥有状态的 goroutine 发送信息,并接收相应的回复
// 这些 readOp 和 writeOp 结构封装了这些请求以及拥有该状态的 goroutine 响应的方式
type readOp struct {
key int
resp chan int
}
type writeOp struct {
key int
val int
resp chan bool
}

func main() {
// 如前所述,我们将计算执行的操作数
var readOps uint64
var writeOps uint64

// 其他goroutine将使用读取和写入通道来分别发出读取和写入请求
reads := make(chan readOp)
writes := make(chan writeOp)

// 这里是拥有状态的 goroutine,和上一个示例一样,它是一个映射,但现在对有状态 goroutine 来说是私有的
// 这个 goroutine 会在读取和写入通道上重复选择,并在请求到达时做出响应
// 执行响应时,首先执行请求的操作,然后在响应通道 resp 上发送一个值来表示成功(如果是读取,则发送期望值)
go func() {
var state = make(map[int]int)
for {
select {
case read := <-reads:
read.resp <- state[read.key]
case write := <-writes:
state[write.key] = write.val
write.resp <- true
}
}
}()

// 这将启动 100 个 goroutine,通过 reads 通道向拥有状态的 goroutine 发出读取指令
// 每次读取都需要构建一个 readOp,通过 reads 通道发送,然后通过提供的 resp 通道接收结果
for r := 0; r < 100; r++ {
go func() {
for {
read := readOp{
key: rand.Intn(5),
resp: make(chan int)}
reads <- read
<-read.resp
atomic.AddUint64(&readOps, 1)
time.Sleep(time.Millisecond)
}
}()
}

// 我们也使用类似的方法开始10次写入
for w := 0; w < 10; w++ {
go func() {
for {
write := writeOp{
key: rand.Intn(5),
val: rand.Intn(100),
resp: make(chan bool)}
writes <- write
<-write.resp
atomic.AddUint64(&writeOps, 1)
time.Sleep(time.Millisecond)
}
}()
}

// 让goroutines工作一秒钟
time.Sleep(time.Second)

// 最后,采集并报告操作次数
readOpsFinal := atomic.LoadUint64(&readOps)
fmt.Println("readOps:", readOpsFinal)
writeOpsFinal := atomic.LoadUint64(&writeOps)
fmt.Println("writeOps:", writeOpsFinal)
}

运行我们的程序后发现,基于 goroutine 的状态管理示例总共完成了约 80,000 次操作。

log
1
2
3
$ go run stateful-goroutines.go
readOps: 71708
writeOps: 7177

在这种特殊情况下,基于例程的方法比基于互斥的方法更复杂。但在某些情况下,例如涉及其他通道或管理多个此类互斥体容易出错时,这种方法可能很有用。你应该使用感觉最自然的方法,尤其是在了解程序的正确性方面。

参考链接

Go by Example - Worker Pools, WaitGroups, Rate Limiting, Atomic Counters, Mutexes and Stateful Goroutines

https://blog.wty.cool/2024/03/24/go_by_example/Worker_Pools-WaitGroups-Rate_Limiting-Atomic_Counters-Mutexes-Stateful_Goroutines/

作者

孤独小狼

发布于

2024-03-24

更新于

2024-03-24

许可协议

评论