之前写了一段代码,没有限制并发量,直接根据传入数组的长度,去开并发进行后面的连接数据库进行请求。本来 run 着没啥事,一般也就几个长度的数据,谁知道那天使用方抽风,直接传入了一个长度为 480 的数组,
导致直接把后端打挂了。
所以紧急修复了一下,使用了Bufferd Channel
+ sync.WaitGroup
实现并发控制, 同时使用了 errgroup
去捕获错误信息,关于 errgroup
,可以参考之前记录的这篇文章
简易代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| package main
import ( "errors" "fmt" "time"
"golang.org/x/sync/errgroup" )
func main() { testArray := make([]int, 0) for i := 1; i <= 100; i++ { testArray = append(testArray, i) }
var ( eg errgroup.Group limiterCh = make(chan bool, 10) resultCh = make(chan int, 10) ) start := time.Now().Unix() go func() { for v := range resultCh { fmt.Println("result:", v) } }()
for _, v := range testArray { value := v limiterCh <- true eg.Go(func() error {
defer func() { resultCh <- value <-limiterCh }()
time.Sleep(time.Second)
if value == 4 { return errors.New("value = 4") }
if value == 17 { return errors.New("value = 17") }
return nil }) }
if err := eg.Wait(); err != nil { fmt.Println("err", err) } close(limiterCh) close(resultCh) fmt.Println("run time is:",time.Now().Unix() - start,",expected exec time is:", len(testArray)/10) }
|
这段代码中,sync.WaitGroup
主要用来管理 goroutinue
,而 limiterCh
这个带有缓冲的通道则是用来控制并发数。
由于通道的阻塞特性,当并发数达到规定的阈值(创建通道时指定的缓冲容量)之后, limiterCh<-true
就会阻塞,其后面的eg.Go()
就不会被执行到,达到了暂停产生 goroutinue
的效果,实现了对 goroutinue
并发数的控制。
当其中一个println()
完成的时候,从 limiterCh
中释放了一个空位,此时,被 limiterCh
所阻塞的 for
循环得以继续执行,从而继续产生新的 goroutinue
。
从输出结果的时间上,可以明显感受到这个过程。