Go 并发控制、防击穿 - singleflight
# 背景
在高并发场景下经常会有存在并发访问同一个资源,这些并发的请求参数是一样,并且响应的结果也是一样,如果每个请求都重复去查询资源,无疑会给系统带来不必要的开销,并且系统压力也会增大。
为了保护资源,可以对相同资源的并发请求进行拦截,只允许一个请求去查询资源,然后将获取的资源分享给其他的请求,从而较少重复查询的开销,特别是解决缓存击穿时的并发问题。
# 方案
# Go singleflight
Go提供了singleflight组件:"golang.org/x/sync/singleflight"组件,可以很方便实现该功能,实现如下:
var g = singleflight.Group{}
type Info struct {
ID string
}
// handle request
func handle(id string) (Info, error) {
val, err, _ := g.Do(id, func() (interface{}, error) {
// 查询info的信息
info, err := GetInfo(id)
return info, err
})
if err != nil {
return Info{}, err
}
return val.(Info), err
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
模拟客户端调用:
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 1000000000; i++ {
wg.Add(1)
go func(j int) { // 模拟并发查询请求
defer wg.Done()
_, err := handle(strconv.Itoa(j % 1000))
fmt.Println(err)
}(i)
}
wg.Wait()
}
2
3
4
5
6
7
8
9
10
11
12
当并发调用handle请求相同id的资源时,只会有第一个到达的请求会获取到执行GetInfo(id)的权利,在请求还未处理结束期间,其他的请求会阻塞在“g.Do(...)”方法,当第一个请求获取到结果时,会唤醒其他阻塞请求,并将结果分享。
核心方法Do(key string, fn func() (interface{}, error)) 包含两个参数:
- key:请求标识。用来区分请求、标识相同资源的请求,多个请求相同资源对应的key必须相同。
- fn func() (interface{}, error):获取资源方法。该方法返回两个参数,第一个查询结果,第二是异常,获取执行权的请求除了会分享正常结果,还会分享异常,当返回error时,同样也是会被分享,最终这批并发都会响应error
# 源码分析
去除一些非必要的代码,核心的代码可以简化如下:
// 每个穿透的请求封装从一个call
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil { // Group属性都是私有,所以刚开始是nil
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok { // ok为true表示有一个请求穿透去执行,所以请求就Wait
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err, true
}
// key没有请求穿透查询时,实例化个call,放到全局map中
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
// 执行查询,结束后将对应的key从全局map移除,还有一个关键方法就是执行c.wg.Done(),通知其他request请求结束,可以从call 中拿出数据。
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
defer func() {
c.wg.Done()
g.mu.Lock()
defer g.mu.Unlock()
delete(g.m, key)
}()
c.val, c.err = fn()
normalReturn = true
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
singleflight实现非常简单:
- 穿透的请求被封装为一个call对象,用来存储返回的数据和异常,最关键的是还有一个sync.WaitGroup,是想利用其Wait()阻塞的特性
- Group中包含共享的map:map[string]*call,用来保存正被穿透执行的key和对应请求call,第一个请求会初始化一个call,并且记录到map中。
- 其他相同key的请求从map获取到call,执行里面的WaitGroup的Wait()方法,协程就会阻塞等待
- 第一个请求处理完返回结果后会执行c.wg.Done(),表示请求结束,当前协程和其他协程就可以从call中获取结果返回。
# 最佳实践
在singleflight使用时,需要为同类请求定义一个key,同类请求的key必须相同,理论上可以不同的请求类型使用同一个singleflight实例,只需要为他们定义不同的key就好,比如:
- 获取用户信息,key格式为:“user_“ + 用户ID
- 查询部门格式问:“department” + 部门ID
但是一般情况下这是两类查询功能,是处理两个维度的问题,最好是使用不同的singleflight实例,另外,查询员工逻辑和并发控制是解决两个维度问题,代码不应该耦合在一起,这里提供了两个不同场景的实现方式。
# 函数方式
当查询逻辑的实现是使用函数方法时,我们可以考虑使用闭包方式,为函数增加并发保护,例如:
查询用户的函数是:
func FindUser(db *gorm.DB, id int) (*User, error) {
var u = &User{}
return u, db.First(u, id).Error
}
2
3
4
如果直接修改FindUser增加并发控制就会违背了单一职责,比较好的方式是增加一个新的防并发的方法,实现相同的函数参数和返回值:
var FindUserSingle = findUserSingleWrap(FindUser)
// 增加防击穿保护
func findUserSingleWrap(fn func(db *gorm.DB, id int) (*User, error)) func(db *gorm.DB, id int) (*User, error) {
single := &singleflight.Group{}
return func(db *gorm.DB, id int) (*User, error) {
u, err, _ := single.Do(strconv.Itoa(id), func() (interface{}, error) {
return fn(db, id)
})
if err != nil {
return nil, err
}
return u.(*User), nil
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
findUserSingleWrap方法调用后会返回一个增加并发保护的方法,客户端使用:
var u, err = FindUserSingle.FindUser(db, 11)
不过findUserSingleWrap参数和返回值会有点长,可以优化一下,为函数定义一个类型:
type findUserFn func(db *gorm.DB, id int) (*User, error)
这样findUserSingleWrap就可以简化为:
// 增加防击穿
func findUserSingleWrap(fn findUserFn) findUserFn {
single := &singleflight.Group{}
return func(db *gorm.DB, id int) (*User, error) {
u, err, _ := single.Do(strconv.Itoa(id), func() (interface{}, error) {
return fn(db, id)
})
if err != nil {
return nil, err
}
return u.(*User), nil
}
}
2
3
4
5
6
7
8
9
10
11
12
13
# 方法方式
如果FindUser实现是类方法时:
type UserFinder struct {
db *gorm.DB
}
func (r *UserFinder) FindUser(id int) (*User, error) {
var u = &User{}
return u, r.db.First(u, id).Error
}
func NewUserFinder(db *gorm.DB) *UserFinder {
return &UserFinder{
db: db
}
}
2
3
4
5
6
7
8
9
10
11
12
就可以考虑增加一个实现相同接口的类:
type UserFinder interface {
FindUser(id int) (*User, error)
}
type FindUserSingle struct {
single *singleflight.Group
finder UserFinder
}
func (r *UserFinder) FindUser(id int) (*User, error) {
ret, err, _:=r.s.Do(key, func() (interface{}, error) {
return r.finder.FindUser(id)
})
if err != nil {
return u, err
}
return ret.(*User), nil
}
func NewFindUserSingle(finder UserFinder) *FindUserSingle {
return &FindUserSingle{
single: &singleflight.Group{},
finder: finder,
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
客户端:
userFinder := NewFindUserSingle(NewUserFinder(db))
u, err = userFinder.FindUser(11)
2