Go WaitGroup及Cond底层实现原理
作者:阿甘与阿Q 时间:2024-02-18 23:05:03
WaitGroup
概念
Go
标准库提供了WaitGroup
原语, 可以用它来等待一批 Goroutine 结束
底层数据结构
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
其中 noCopy
是 golang 源码中检测禁止拷贝的技术。如果程序中有 WaitGroup 的赋值行为,使用 go vet
检查程序时,就会发现有报错。但需要注意的是,noCopy 不会影响程序正常的编译和运行。
state1
主要是存储着状态和信号量,状态维护了 2 个计数器,一个是请求计数器counter ,另外一个是等待计数器waiter(已调用 WaitGroup.Wait
的 goroutine 的个数)
当数组的首地址是处于一个8
字节对齐的位置上时,那么就将这个数组的前8
个字节作为64
位值使用表示状态,后4
个字节作为32
位值表示信号量(semaphore
);同理如果首地址没有处于8
字节对齐的位置上时,那么就将前4
个字节作为semaphore
,后8
个字节作为64
位数值。
使用方法
在WaitGroup里主要有3个方法:
WaitGroup.Add()
:可以添加或减少请求的goroutine数量,Add(n)
将会导致 counter += n
WaitGroup.Done()
:相当于Add(-1),Done()
将导致 counter -=1
,请求计数器counter为0 时通过信号量调用runtime_Semrelease
唤醒waiter线程
WaitGroup.Wait()
:会将 waiter++
,同时通过信号量调用 runtime_Semacquire(semap)
阻塞当前 goroutine
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
println("hello")
}()
}
wg.Wait()
}
Cond
概念
Go
标准库提供了Cond
原语,可以让 Goroutine 在满足特定条件时被阻塞和唤醒
底层数据结构
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
type notifyList struct {
wait uint32
notify uint32
lock uintptr // key field of the mutex
head unsafe.Pointer
tail unsafe.Pointer
}
主要有4
个字段:
nocopy
: golang 源码中检测禁止拷贝的技术。如果程序中有 WaitGroup 的赋值行为,使用 go vet
检查程序时,就会发现有报错,但需要注意的是,noCopy 不会影响程序正常的编译和运行
checker
:用于禁止运行期间发生拷贝,双重检查(Double check
)
L
:可以传入一个读写锁或互斥锁,当修改条件或者调用Wait
方法时需要加锁
notify
:通知链表,调用Wait()
方法的Goroutine
会放到这个链表中,从这里获取需被唤醒的Goroutine列表
使用方法
在Cond里主要有3个方法:
sync.NewCond(l Locker)
: 新建一个 sync.Cond 变量,注意该函数需要一个 Locker 作为必填参数,这是因为在cond.Wait()
中底层会涉及到 Locker 的锁操作Cond.Wait()
: 阻塞等待被唤醒,调用Wait函数前需要先加锁;并且由于Wait函数被唤醒时存在虚假唤醒等情况,导致唤醒后发现,条件依旧不成立,因此需要使用 for 语句来循环地进行等待,直到条件成立为止Cond.Signal()
: 只唤醒一个最先 Wait 的 goroutine,可以不用加锁Cond.Broadcast()
: 唤醒所有Wait的goroutine,可以不用加锁
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
var status int64
func main() {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go listen(c)
}
go broadcast(c)
time.Sleep(1 * time.Second)
}
func broadcast(c *sync.Cond) {
// 原子操作
atomic.StoreInt64(&status, 1)
c.Broadcast()
}
func listen(c *sync.Cond) {
c.L.Lock()
for atomic.LoadInt64(&status) != 1 {
c.Wait()
// Wait 内部会先调用 c.L.Unlock(),来先释放锁,如果调用方不先加锁的话,会报错
}
fmt.Println("listen")
c.L.Unlock()
}
来源:https://juejin.cn/post/7134226651952447524


猜你喜欢
一个查看MSSQLServer数据库空间使用情况的存储过程 SpaceUsed
Python绘制全球疫情变化地图的实例代码

用javascript控制iframe滚动的代码
对Python获取屏幕截图的4种方法详解
困惹的A标签

Python实现简单图像缩放与旋转

vue2.0中set添加属性后视图不能更新的解决办法

Python 队列Queue和PriorityQueue解析

js截取字符串的两种方法及区别详解
如何检测用户第一次访问我的网站并显示友好信息?
Python pandas RFM模型应用实例详解

python中pandas输出完整、对齐的表格的方法

Python深度学习albumentations数据增强库

如何在 IE 中使用 HTML5 元素
graphql---go http请求使用详解
MySQL查询出现1055错误的原因及解决方法

自动备份Oracle数据库
PHP实现用户认证及管理完全源码
Python多线程爬虫实战_爬取糗事百科段子的实例
分析解决Python中sqlalchemy数据库连接池QueuePool异常
