SingleFlight模式的Go并发编程学习

作者:kevwan 时间:2024-04-29 13:05:39 

最近接触到微服务框架go-zero,翻看了整个框架代码,发现结构清晰、代码简洁,所以决定阅读源码学习下,本次阅读的源码位于core/syncx/singleflight.go

go-zeroSingleFlight的作用是:将并发请求合并成一个请求,以减少对下层服务的压力。

应用场景

  • 查询缓存时,合并请求,提升服务性能。 假设有一个 IP 查询的服务,每次用户请求先在缓存中查询一个 IP 的归属地,如果缓存中有结果则直接返回,不存在则进行 IP 解析操作。

SingleFlight模式的Go并发编程学习

如上图所示,n 个用户请求查询同一个 IP(8.8.8.8)就会对应 n 个 Redis 的查询,在高并发场景下,如果能将 n 个 Redis 查询合并成一个 Redis 查询,那么性能肯定会提升很多,而 SingleFlight就是用来实现请求合并的,效果如下:

SingleFlight模式的Go并发编程学习

  • 防止缓存击穿。

缓存击穿问题是指:在高并发的场景中,大量的请求同时查询一个 key ,如果这个 key 正好过期失效了,就会导致大量的请求都打到数据库,导致数据库的连接增多,负载上升。

SingleFlight模式的Go并发编程学习

通过SingleFlight可以将对同一个Key的并发请求进行合并,只让其中一个请求到数据库进行查询,其他请求共享同一个结果,可以很大程度提升并发能力。

应用方式

直接上代码:

func main() {
 round := 10
 var wg sync.WaitGroup
 barrier := syncx.NewSingleFlight()
 wg.Add(round)
 for i := 0; i < round; i++ {
   go func() {
     defer wg.Done()
     // 启用10个协程模拟获取缓存操作
     val, err := barrier.Do("get_rand_int", func() (interface{}, error) {
       time.Sleep(time.Second)
       return rand.Int(), nil
     })
     if err != nil {
       fmt.Println(err)
     } else {
       fmt.Println(val)
     }
   }()
 }
 wg.Wait()
}

以上代码,模拟 10 个协程请求 Redis 获取一个 key 的内容,代码很简单,就是执行Do()方法。其中,接收两个参数,第一个参数是获取资源的标识,可以是 redis 中缓存的 key,第二个参数就是一个匿名函数,封装好要做的业务逻辑。最终获得的结果如下:

5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410
5577006791947779410

从上看出,10个协程都获得了同一个结果,也就是只有一个协程真正执行了rand.Int()获取了随机数,其他的协程都共享了这个结果。

源码解析

先看代码结构:

type (
 // 定义接口,有2个方法 Do 和 DoEx,其实逻辑是一样的,DoEx 多了一个标识,主要看Do的逻辑就够了
 SingleFlight interface {
   Do(key string, fn func() (interface{}, error)) (interface{}, error)
   DoEx(key string, fn func() (interface{}, error)) (interface{}, bool, error)
 }
 // 定义 call 的结构
 call struct {
   wg  sync.WaitGroup // 用于实现通过1个 call,其他 call 阻塞
   val interface{}    // 表示 call 操作的返回结果
   err error          // 表示 call 操作发生的错误
 }
 // 总控结构,实现 SingleFlight 接口
 flightGroup struct {
   calls map[string]*call // 不同的 call 对应不同的 key
   lock  sync.Mutex       // 利用锁控制请求
 }
)

然后看最核心的Do方法做了什么事情:

func (g *flightGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
 c, done := g.createCall(key)
 if done {
   return c.val, c.err
 }
 g.makeCall(c, key, fn)
 return c.val, c.err
}

代码很简洁,利用g.createCall(key)对 key 发起 call 请求(其实就是做一件事情),如果此时已经有其他协程已经在发起 call 请求就阻塞住(done 为 true 的情况),等待拿到结果后直接返回。如果 done 是 false,说明当前协程是第一个发起 call 的协程,那么就执行g.makeCall(c, key, fn)真正地发起 call 请求(此后的其他协程就阻塞在了g.createCall(key))。 

SingleFlight模式的Go并发编程学习

从上图可知,其实关键就两步:

  • 判断是第一个请求的协程(利用map)

  • 阻塞住其他所有协程(利用 sync.WaitGroup)

来看下g.createCall(key)如何实现的:

func (g *flightGroup) createCall(key string) (c *call, done bool) {
 g.lock.Lock()
 if c, ok := g.calls[key]; ok {
   g.lock.Unlock()
   c.wg.Wait()
   return c, true
 }
 c = new(call)
 c.wg.Add(1)
 g.calls[key] = c
 g.lock.Unlock()
 return c, false
}

先看第一步:判断是第一个请求的协程(利用map)

g.lock.Lock()
if c, ok := g.calls[key]; ok {
 g.lock.Unlock()
 c.wg.Wait()
 return c, true
}

此处判断 map 中的 key 是否存在,如果已经存在,说明已经有其他协程在请求了,当前这个协程只需要等待,等待是利用了sync.WaitGroupWait()方法实现的,此处还是很巧妙的。要注意的是,map 在 Go 中是非并发安全的,所以需要加锁。

再看第二步:阻塞住其他所有协程(利用 sync.WaitGroup)

c = new(call)
c.wg.Add(1)
g.calls[key] = c

因为是第一个发起 call 的协程,所以需要 new 这个 call,然后将wg.Add(1),这样就对应了上面的wg.Wait(),阻塞剩下的协程。随后将 new 的 call 放入 map 中,注意此时只是完成了初始化,并没有真正去执行call请求,真正的处理逻辑在 g.makeCall(c, key, fn)中。

func (g *flightGroup) makeCall(c *call, key string, fn func() (interface{}, error)) {
 defer func() {
   g.lock.Lock()
   delete(g.calls, key)
   g.lock.Unlock()
   c.wg.Done()
 }()
 c.val, c.err = fn()
}

这个方法中做的事情很简单,就是执行了传递的匿名函数fn()(也就是真正call请求要做的事情)。最后处理收尾的事情(通过defer),也是分成两步:

  • 删除 map 中的 key,使得下次发起请求可以获取新的值。

  • 调用wg.Done(),让之前阻塞的协程全部获得结果并返回。

至此,SingleFlight 的核心代码就解析完毕了,虽然代码不长,但是这个思想还是很棒的,可以在实际工作中借鉴。

来源:https://my.oschina.net/kevwan/blog/5518730

标签:SingleFlight,Go,并发编程
0
投稿

猜你喜欢

  • Python的列表推导式你了解吗

    2022-08-13 05:30:41
  • 详解golang中bufio包的实现原理

    2024-04-28 09:13:34
  • golang通过node_exporter监控GPU及cpu频率、温度的代码

    2024-02-04 14:53:22
  • python飞机大战pygame游戏框架搭建操作详解

    2022-09-24 05:49:51
  • python利用7z批量解压rar的实现

    2021-05-02 18:58:31
  • Python实现鸡群算法的示例代码

    2022-03-23 19:02:54
  • SQL事务用法begin tran,commit tran和rollback tran的用法

    2024-01-20 01:44:57
  • js判断传入时间和当前时间大小实例(超简单)

    2024-05-02 17:26:40
  • 详解Python模块化--模块(Modules)和包(Packages)

    2023-03-30 01:14:27
  • 手把手教你在Python里使用ChatGPT

    2023-11-10 02:55:58
  • Python模仿POST提交HTTP数据及使用Cookie值的方法

    2022-05-04 04:37:35
  • JS实现普通轮播图特效

    2024-04-17 10:19:52
  • 使用python画出逻辑斯蒂映射(logistic map)中的分叉图案例

    2023-05-19 13:05:18
  • 使用Pycharm+PyQt5弹出子窗口的程序代码

    2022-03-09 20:15:17
  • python通过apply使用元祖和列表调用函数实例

    2021-02-18 03:18:32
  • 数据库的选择原则是什么?

    2010-07-14 21:05:00
  • Python 模拟死锁的常见实例详解

    2022-08-02 04:57:18
  • Golang import 导入包语法及一些特殊用法详解

    2024-02-02 08:28:30
  • SELECT… FOR UPDATE 排他锁的实现

    2024-01-19 12:30:16
  • Yii开启片段缓存的方法

    2023-11-21 05:00:40
  • asp之家 网络编程 m.aspxhome.com