Go实现并发扇入,批量扇出功能

2022/3/5 实践总结Channel

# 背景

有上万台边缘机器,每台都会有多个agent客户端,并且每个agent都会同时向中心系统上报数据,由于上报数据频繁,并发量也大,每个agent都频繁和中心建立连接,导致中心压力非常大,所以需要对此进行优化,对每台机器上的agent上报数据做聚合,批量进行上报,减少边缘和中心的上报频率,从而减轻中心压力。

# 方案

  • 边缘增加一个batch中间件,所有agent上报的数据由原来直接向中心上报变为向batch中间件上报
  • batch中间件会聚合请求,再分批向中心上报
  • 上报的方式也由原来的同步也改为异步,agent只把数据提交到batch就结束

核心就是实现一个数据并发扇入,分批扇出的功能:

image.png

为了能优化分批的效果,batch中间件会有一个缓存队列,agent提交的数据会先放入到队列中,然后消费端从队列中取出数据,分批上报到中心系统,由于agent和batch中间件属于本地通讯,提交数据又是先放到队列,所以队列中的数据就容易堆积,达到分批的前提条件,有堆积数据,我就可以开始分批处理,完整逻辑如下:

  1. 队列中没数据时等待
  2. 有数据时,不断从队列中取出数据,当数量达到分批数量时,调用中心接口上报。
  3. 数量没达到分批数量,但是队列中又没有数据时,也是上报。
  4. 上报结束后,又跳到2步骤,当都没数据时,就会跳到1步骤继续监听

注:因为即使数据没达到批次数量,当队列没数据时还是会上报,所以当上报数据少,且都是间隔上报,也没有并发,这时就容易出现一个批次就只有一个的情况,分批的就没什么效果

接下来我们开始实现核心分批功能:

/**
 * @Title 分批处理器
 * @Description 将in中的数据取出分批放入out。适合in有大量数据并发或快速写入,需要分批处理的场景
 * @Author hyman
 * @Date 2022-03-05
 **/
package batch


func New(in <-chan []byte, size int) <-chan [][]byte {
	if size < 2 {
		panic("nonsense! batch size less then 2")
	}
	out := make(chan [][]byte)
	go func() {
		defer close(out)
		loopBatch(in, out, size)
	}()
	return out
}

func loopBatch(in <-chan []byte, out chan<- [][]byte, size int) {
	var batch [][]byte
	for d := range in {
		batch = append(batch, d)
		more := true // 默认认为还有数据
		for more {
			select {
			default:
				more = false // in 没有数据
			case d, ok := <-in:
				if ok {
					batch = append(batch, d)
				} else {
					more = false // in关闭,且队列里没有数据
				}
			}
			l := len(batch)
			// more = false没数据,同时batch中也没数据,则break到外层for等待数据
			if !more && l == 0 {
				break
			}
			// 如果more = true队列可能还有数据,当batch里的数量小于批次数量,则继续内层for尝试从in取数据
			if more && l < size {
				continue
			}
			// 当in没数据或数量达到分批上限时,发送给out
			out <- batch
			batch = [][]byte{} // 重置
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

核心代码不到四十行,使用一个:in <-chan []byte带缓冲的chan来接收数据,使用一个out的chan作为批量输出,只需在外层再简单包裹一层业务逻辑,就可以满足分批需求。

核心的组装批次的逻辑放在了一个协程中,巧妙的使用两个for来读取in,实现没数据时阻塞和有数据时分批上报:

  • 第一个for用来监听in是否有数据,当队列无数据时会pending,in被关闭时就自动结束for,协程结束。
  • 第二个for巧妙用到了select的default功能,可以在不阻塞的情况下判断队列是否还有数据,实现组装分批数据。

测试

func TestBatchHttp(t *testing.T) {
	var in = make(chan []byte, 10000)
	// 模拟结束请求
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if b, err := ioutil.ReadAll(r.Body); err == nil {
			in <- b
		}
		w.Write([]byte("success"))
	}))
	var wg sync.WaitGroup


	n :=time.Now()
	// 模拟并发上报100次
	for i := 0; i < 100; i++ {
		wg.Add(1)
		_i:=i
		go func() {
			body, _ := json.Marshal(map[string]string{"say": "hello " + strconv.Itoa(_i)})
			resp, err := http.Post(srv.URL, "application/json", bytes.NewReader(body))
			if err != nil {
				t.Fatal(err)
			}
			defer resp.Body.Close()
			if resp.StatusCode != http.StatusOK {
				t.Fatal(resp.Status)
			}
		}()
	}
	t.Log("post cost time = ", time.Now().Sub(n))
	// 5个一批
	bats := batch.New(in, 5)
	// 分批上报的协程数
	batchWorker := 1
	for i := 0; i < batchWorker; i++ {
		go func() {
			for dd := range bats {
				// 分批调用中心接口上报数据
				for _, d := range dd {
					fmt.Print(string(d))
					wg.Done()
				}
				fmt.Println()
				time.Sleep(10* time.Millisecond) // 模拟消费耗时

			}
		}()
	}
	wg.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

打印结果:

 batch_test.go:61: post cost time =  240.333µs
{"say":"hello 92"}
{"say":"hello 9"}
{"say":"hello 10"}{"say":"hello 79"}{"say":"hello 98"}{"say":"hello 0"}{"say":"hello 80"}
{"say":"hello 99"}{"say":"hello 93"}{"say":"hello 4"}{"say":"hello 8"}{"say":"hello 5"}
{"say":"hello 11"}{"say":"hello 47"}{"say":"hello 12"}{"say":"hello 7"}{"say":"hello 81"}
{"say":"hello 6"}{"say":"hello 48"}{"say":"hello 13"}{"say":"hello 2"}{"say":"hello 49"}
{"say":"hello 14"}{"say":"hello 75"}{"say":"hello 77"}{"say":"hello 51"}{"say":"hello 50"}
{"say":"hello 74"}{"say":"hello 73"}{"say":"hello 27"}{"say":"hello 52"}{"say":"hello 76"}
{"say":"hello 72"}{"say":"hello 28"}{"say":"hello 41"}{"say":"hello 53"}{"say":"hello 94"}
{"say":"hello 54"}{"say":"hello 15"}{"say":"hello 95"}{"say":"hello 16"}{"say":"hello 56"}
{"say":"hello 61"}{"say":"hello 58"}{"say":"hello 43"}{"say":"hello 85"}{"say":"hello 84"}
{"say":"hello 89"}{"say":"hello 78"}{"say":"hello 96"}{"say":"hello 1"}{"say":"hello 82"}
{"say":"hello 26"}{"say":"hello 57"}{"say":"hello 3"}{"say":"hello 46"}{"say":"hello 83"}
{"say":"hello 40"}{"say":"hello 55"}{"say":"hello 29"}{"say":"hello 65"}{"say":"hello 33"}
{"say":"hello 60"}{"say":"hello 42"}{"say":"hello 30"}{"say":"hello 62"}{"say":"hello 34"}
{"say":"hello 64"}{"say":"hello 66"}{"say":"hello 88"}{"say":"hello 91"}{"say":"hello 35"}
{"say":"hello 38"}{"say":"hello 39"}{"say":"hello 87"}{"say":"hello 18"}{"say":"hello 59"}
{"say":"hello 68"}{"say":"hello 19"}{"say":"hello 63"}{"say":"hello 20"}{"say":"hello 21"}
{"say":"hello 86"}{"say":"hello 17"}{"say":"hello 24"}{"say":"hello 70"}{"say":"hello 69"}
{"say":"hello 90"}{"say":"hello 22"}{"say":"hello 71"}{"say":"hello 25"}{"say":"hello 44"}
{"say":"hello 45"}{"say":"hello 36"}{"say":"hello 23"}{"say":"hello 37"}{"say":"hello 32"}
{"say":"hello 31"}{"say":"hello 67"}{"say":"hello 97"}
--- PASS: TestBatchHttp (0.26s)
PASS
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

100个请求数据都有正常打印,并且大部分数量都符合预期,但是前两行只有1个,并没有到达分批数量,这是为什么?!

原因是刚开始分批速度大于提交速度,取出1个数据时队列就没有数据,所以第一批也就只有1个数据,第二批也是一样,由于批次消费就只有开一个协程,且每次消费都time.Sleep(10* time.Millisecond),消费过慢,于是数据逐渐堆积,满足批次数量,后面打印就正常。

PS:要验证这一说法,可以在”bats := batch.New(in, 5)“前面增加”time.Sleep(10* time.Millisecond)“让队列里数据有足够数据,这时打印就会符合预期。

这里的batchWorker只有1个,也就是说是向中心是串行上报,这个可以根据实际需求修改,通过开启多个协程来提高上报速度。

但是,有时候反而不满足需求,这样做会并发上报数据,增加中心压力,同时由于消费过快,反而使队列中的数量就不容易达到分批数量,分批效果就减少,只是加快队列消费而已。

# 总结

Go的chan功能非常强大,各种并发处理都能看到他的身影,也是Go推荐的方式:

Do not communicate by sharing memory; instead, share memory by communicating.

灵活借助chan并发安全,既能阻塞又能select default非阻塞的特性,可以优雅实现很多并发控制的需求