avatar

目录
goroutinue-limit

之前写了一段代码,没有限制并发量,直接根据传入数组的长度,去开并发进行后面的连接数据库进行请求。本来 run 着没啥事,一般也就几个长度的数据,谁知道那天使用方抽风,直接传入了一个长度为 480 的数组,
导致直接把后端打挂了。

所以紧急修复了一下,使用了Bufferd Channel + sync.WaitGroup 实现并发控制, 同时使用了 errgroup 去捕获错误信息,关于 errgroup,可以参考之前记录的这篇文章

简易代码如下:

go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package main

import (
"errors"
"fmt"
"time"

"golang.org/x/sync/errgroup"
)

func main() {
// 构建测试数组
testArray := make([]int, 0)
for i := 1; i <= 100; i++ {
testArray = append(testArray, i)
}

var (
eg errgroup.Group
limiterCh = make(chan bool, 10)
resultCh = make(chan int, 10) // 这里有缓冲无缓冲均可,但是建议有缓冲,且也为10个
)
start := time.Now().Unix()
go func() {
for v := range resultCh {
fmt.Println("result:", v)
}
}()

for _, v := range testArray {
value := v
limiterCh <- true
eg.Go(func() error {

defer func() {
resultCh <- value
<-limiterCh
}()

time.Sleep(time.Second)

if value == 4 {
return errors.New("value = 4")
}

if value == 17 {
return errors.New("value = 17")
}

return nil
})
}

if err := eg.Wait(); err != nil {
fmt.Println("err", err)
}
close(limiterCh)
close(resultCh)
fmt.Println("run time is:",time.Now().Unix() - start,",expected exec time is:", len(testArray)/10)
}

这段代码中,sync.WaitGroup 主要用来管理 goroutinue ,而 limiterCh 这个带有缓冲的通道则是用来控制并发数。

由于通道的阻塞特性,当并发数达到规定的阈值(创建通道时指定的缓冲容量)之后, limiterCh<-true就会阻塞,其后面的eg.Go() 就不会被执行到,达到了暂停产生 goroutinue 的效果,实现了对 goroutinue 并发数的控制。

当其中一个println()完成的时候,从 limiterCh 中释放了一个空位,此时,被 limiterCh 所阻塞的 for 循环得以继续执行,从而继续产生新的 goroutinue

从输出结果的时间上,可以明显感受到这个过程。

文章作者: Viola Tangxl
文章链接: https://violatangxl.github.io/2020/07/23/goroutinue-limit/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 椰子是只猫
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论