介绍go中的Worker Pools, WaitGroups, Rate Limiting, Atomic Counters, Mutexes 和 Stateful Goroutines
Worker Pools
在本示例中,我们将了解如何使用 goroutines 和通道实现 Worker 池。
worker-pools.go1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package main
import ( "fmt" "time" )
func worker(id int, jobs <-chan int, results chan<- int) { for j := range jobs { fmt.Println("worker", id, "started job", j) time.Sleep(time.Second) fmt.Println("worker", id, "finished job", j) results <- j * 2 } }
func main() { const numJobs = 5 jobs := make(chan int, numJobs) results := make(chan int, numJobs)
for w := 1; w <= 3; w++ { go worker(w, jobs, results) }
for j := 1; j <= numJobs; j++ { jobs <- j } close(jobs)
for a := 1; a <= numJobs; a++ { <-results } }
|
我们正在运行的程序显示了不同工作者正在执行的 5 项工作。由于有 3 个工作线程同时工作,因此尽管总工作时间约为 5 秒,但程序仅耗时约 2 秒。
log1 2 3 4 5 6 7 8 9 10 11 12
| $ time go run worker-pools.go worker 1 started job 1 worker 2 started job 2 worker 3 started job 3 worker 1 finished job 1 worker 1 started job 4 worker 2 finished job 2 worker 2 started job 5 worker 3 finished job 3 worker 1 finished job 4 worker 2 finished job 5 real 0m2.358s
|
WaitGroups
要等待多个goroutine完成,我们可以使用一个等待组。
waitgroups.go1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package main
import ( "fmt" "sync" "time" )
func worker(id int) { fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second) fmt.Printf("Worker %d done\n", id) }
func main() { var wg sync.WaitGroup
for i := 1; i <= 5; i++ { wg.Add(1)
go func() { defer wg.Done() worker(i) }() }
wg.Wait()
}
|
需要注意的是,这种方法无法直接从 Worker 传播错误。对于更高级的用例,请考虑使用 errgroup 软件包。
log1 2 3 4 5 6 7 8 9 10 11
| $ go run waitgroups.go Worker 5 starting Worker 3 starting Worker 4 starting Worker 1 starting Worker 2 starting Worker 4 done Worker 1 done Worker 2 done Worker 5 done Worker 3 done
|
对于每次调用,工作线程的启动和结束顺序可能不同。
Rate Limiting
速率限制是控制资源利用率和保持服务质量的重要机制。Go 可通过 goroutines、channel 和 tickers 出色地支持速率限制。
rate-limiting.go1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| package main
import ( "fmt" "time" )
func main() { requests := make(chan int, 5) for i := 1; i <= 5; i++ { requests <- i } close(requests)
limiter := time.Tick(200 * time.Millisecond)
for req := range requests { <-limiter fmt.Println("request", req, time.Now()) }
burstyLimiter := make(chan time.Time, 3)
for i := 0; i < 3; i++ { burstyLimiter <- time.Now() }
go func() { for t := range time.Tick(200 * time.Millisecond) { burstyLimiter <- t } }()
burstyRequests := make(chan int, 5) for i := 1; i <= 5; i++ { burstyRequests <- i } close(burstyRequests) for req := range burstyRequests { <-burstyLimiter fmt.Println("request", req, time.Now()) } }
|
在运行我们的程序时,我们看到第一批请求每隔约 200 毫秒处理一次。
log1 2 3 4 5 6 7 8 9 10 11
| $ go run rate-limiting.go request 1 2012-10-19 00:38:18.687438 +0000 UTC request 2 2012-10-19 00:38:18.887471 +0000 UTC request 3 2012-10-19 00:38:19.087238 +0000 UTC request 4 2012-10-19 00:38:19.287338 +0000 UTC request 5 2012-10-19 00:38:19.487331 +0000 UTC request 1 2012-10-19 00:38:20.487578 +0000 UTC request 2 2012-10-19 00:38:20.487645 +0000 UTC request 3 2012-10-19 00:38:20.487676 +0000 UTC request 4 2012-10-19 00:38:20.687483 +0000 UTC request 5 2012-10-19 00:38:20.887542 +0000 UTC
|
对于第二批请求,由于可突发速率限制,我们会立即处理前 3 个请求,然后在每个请求延迟约 200 毫秒后处理其余 2 个请求。
Atomic Counters
Go 中管理状态的主要机制是通过通道进行通信。例如,我们在 Worker 池中就看到了这一点。不过还有其他一些管理状态的方法。在这里,我们将看看如何使用 sync/atomic 包来处理多个 goroutines 访问的原子计数器。
atomic-counters.go1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| package main
import ( "fmt" "sync" "sync/atomic" )
func main() { var ops atomic.Uint64
var wg sync.WaitGroup
for i := 0; i < 50; i++ { wg.Add(1)
go func() { for c := 0; c < 1000; c++ {
ops.Add(1) }
wg.Done() }() }
wg.Wait()
fmt.Println("ops:", ops.Load()) }
|
我们希望得到精确的 50,000 次操作。如果我们使用一个非原子整数,并用 ops++ 递增,我们很可能会得到一个不同的数字,而且在运行过程中会不断变化,因为 goroutines 会相互干扰。此外,在使用-race 标志运行时,我们还会出现数据竞赛失败。
log1 2
| $ go run atomic-counters.go ops: 50000
|
接下来,我们将了解另一种管理状态的工具--互斥。
Mutexes
在上一个示例中,我们看到了如何使用原子操作管理简单的计数器状态。对于更复杂的状态,我们可以使用互斥来安全地跨多个程序访问数据。
mutexes.go1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package main
import ( "fmt" "sync" )
type Container struct { mu sync.Mutex counters map[string]int }
func (c *Container) inc(name string) {
c.mu.Lock() defer c.mu.Unlock() c.counters[name]++ }
func main() { c := Container{
counters: map[string]int{"a": 0, "b": 0}, }
var wg sync.WaitGroup
doIncrement := func(name string, n int) { for i := 0; i < n; i++ { c.inc(name) } wg.Done() }
wg.Add(3) go doIncrement("a", 10000) go doIncrement("a", 10000) go doIncrement("b", 10000)
wg.Wait() fmt.Println(c.counters) }
|
运行程序后,计数器如期更新。
log1 2
| $ go run mutexes.go map[a:20000 b:10000]
|
接下来,我们将看看如何仅使用 goroutines 和通道来实现相同的状态管理任务。
Stateful Goroutines
在前面的示例中,我们使用显式锁定和互斥来同步多个 goroutines 对共享状态的访问。另一种方法是使用 goroutines 和通道的内置同步功能来实现相同的结果。这种基于通道的方法与 Go 通过通信共享内存的想法一致,而且每段数据都正好由一个 goroutine 拥有。
stateful-goroutines.go1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| package main
import ( "fmt" "math/rand" "sync/atomic" "time" )
type readOp struct { key int resp chan int } type writeOp struct { key int val int resp chan bool }
func main() { var readOps uint64 var writeOps uint64
reads := make(chan readOp) writes := make(chan writeOp)
go func() { var state = make(map[int]int) for { select { case read := <-reads: read.resp <- state[read.key] case write := <-writes: state[write.key] = write.val write.resp <- true } } }()
for r := 0; r < 100; r++ { go func() { for { read := readOp{ key: rand.Intn(5), resp: make(chan int)} reads <- read <-read.resp atomic.AddUint64(&readOps, 1) time.Sleep(time.Millisecond) } }() }
for w := 0; w < 10; w++ { go func() { for { write := writeOp{ key: rand.Intn(5), val: rand.Intn(100), resp: make(chan bool)} writes <- write <-write.resp atomic.AddUint64(&writeOps, 1) time.Sleep(time.Millisecond) } }() }
time.Sleep(time.Second)
readOpsFinal := atomic.LoadUint64(&readOps) fmt.Println("readOps:", readOpsFinal) writeOpsFinal := atomic.LoadUint64(&writeOps) fmt.Println("writeOps:", writeOpsFinal) }
|
运行我们的程序后发现,基于 goroutine 的状态管理示例总共完成了约 80,000 次操作。
log1 2 3
| $ go run stateful-goroutines.go readOps: 71708 writeOps: 7177
|
在这种特殊情况下,基于例程的方法比基于互斥的方法更复杂。但在某些情况下,例如涉及其他通道或管理多个此类互斥体容易出错时,这种方法可能很有用。你应该使用感觉最自然的方法,尤其是在了解程序的正确性方面。
参考链接