详解Golang中select的使用与源码分析
作者:胡大海 时间:2024-05-09 14:52:08
背景
golang
中主推 channel
通信。单个 channel
的通信可以通过一个goroutine
往 channel
发数据,另外一个从channel
取数据进行。这是阻塞的,因为要想顺利执行完这个步骤,需要 channel
准备好
才行,准备好
的条件如下:
1.发送
缓存有空间(如果是有缓存的
channel
)有等待接收的
goroutine
2.接收
缓存有数据(如果是有缓存的
channel
)有等待发送的
goroutine
对channel
实际使用中还有如下两个需求,这个时候就需要select
了。
同时监听多个
channel
在没有
channel
准备好的时候,也可以往下执行。
select 流程
1.空select
。作用是阻塞当前goroutine
。不要用for{}
来阻塞goroutine
,因为会占用cpu。而select{}
不会,因为当前goroutine
不会再被调度。
if len(cases) == 0 {
block()
}
2.配置好poll的顺序。由于是同时监听多个channel
的发送或者接收,所以需要按照一定的顺序查看哪个channel
准备好了。如果每次采用select
中的顺序查看channel
是否准备好了,那么只要在前面的channel
准备好的足够快,那么会造成后面的channel
即使准备好了,也永远不会被执行。打乱顺序的逻辑如下,采用了洗牌算法\color{red}{洗牌算法}洗牌算法,注意此过程中会过滤掉channel为nil的case。\color{red}{注意此过程中会过滤掉 channel 为 nil 的 case。}注意此过程中会过滤掉channel为nil的case。
// generate permuted order
norder := 0
for i := range scases {
cas := &scases[i]
// Omit cases without channels from the poll and lock orders.
if cas.c == nil {
cas.elem = nil // allow GC
continue
}
j := fastrandn(uint32(norder + 1))
pollorder[norder] = pollorder[j]
pollorder[j] = uint16(i)
norder++
}
3.配置好lock的顺序。由于可能会修改channel
中的数据,所以在打算往channel
中发送数据或者从channel
接收数据的时候,需要锁住 channel
。而一个channel
可能被多个select
监听,如果两个select
对两个channel
A和B,分别按照顺序A, B和B,A上锁,是可能会造成死锁的,导致两个select
都执行不下去。
所以select
中锁住channel
的顺序至关重要,解决方案是按照channel
的地址的顺序锁住channel
。因为在两个select
中channel
有交集的时候,都是按照交集中channel
的地址顺序锁channel
。
实际排序代码如下,采用堆排序算法\color{red}{堆排序算法}堆排序算法按照channel
的地址从小到大对channel
进行排序。
// sort the cases by Hchan address to get the locking order.
// simple heap sort, to guarantee n log n time and constant stack footprint.
for i := range lockorder {
j := i
// Start with the pollorder to permute cases on the same channel.
c := scases[pollorder[i]].c
for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
k := (j - 1) / 2
lockorder[j] = lockorder[k]
j = k
}
lockorder[j] = pollorder[i]
}
for i := len(lockorder) - 1; i >= 0; i-- {
o := lockorder[i]
c := scases[o].c
lockorder[i] = lockorder[0]
j := 0
for {
k := j*2 + 1
if k >= i {
break
}
if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
k++
}
if c.sortkey() < scases[lockorder[k]].c.sortkey() {
lockorder[j] = lockorder[k]
j = k
continue
}
break
}
lockorder[j] = o
}
4.锁住select
中的所有channel
。要查看channel
中的数据了。
// lock all the channels involved in the select
sellock(scases, lockorder)
5.第一轮查看是否已有准备好
的channel。如果有直接发送数据到channel
或者从channel
接收数据。注意select
的channel
切片中,前面部分是从channel
接收数据的case,后半部分是往channel
发送数据的case。
按照pollorder
顺序查看是否有channel
准备好了。
for _, casei := range pollorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
if casi >= nsends {
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
if c.qcount > 0 {
goto bufrecv
}
if c.closed != 0 {
goto rclose
}
} else {
if raceenabled {
racereadpc(c.raceaddr(), casePC(casi), chansendpc)
}
if c.closed != 0 {
goto sclose
}
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
if c.qcount < c.dataqsiz {
goto bufsend
}
}
}
6.直接执行default
分支
if !block {
selunlock(scases, lockorder)
casi = -1
goto retc
}
7.第二轮遍历channel
。创建sudog
把当前goroutine
放到每个channel
的等待列表中去,等待channel
准备好时被唤醒。
// pass 2 - enqueue on all chans
gp = getg()
if gp.waiting != nil {
throw("gp.waiting != nil")
}
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
sg := acquireSudog()
sg.g = gp
sg.isSelect = true
// No stack splits between assigning elem and enqueuing
// sg on gp.waiting where copystack can find it.
sg.elem = cas.elem
sg.releasetime = 0
if t0 != 0 {
sg.releasetime = -1
}
sg.c = c
// Construct waiting list in lock order.
*nextp = sg
nextp = &sg.waitlink
if casi < nsends {
c.sendq.enqueue(sg)
} else {
c.recvq.enqueue(sg)
}
}
8.等待被唤醒。其中gopark
的时候会释放对所有channel
占用的锁。
// wait for someone to wake us up
gp.param = nil
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
gp.activeStackChans = false
9.被唤醒
锁住所有
channel
清理当前
goroutine
的等待sudog
找到是被哪个
channel
唤醒的,并清理每个channel
上当前的goroutine
对应的sudog
sellock(scases, lockorder)
gp.selectDone = 0
sg = (*sudog)(gp.param)
gp.param = nil
// pass 3 - dequeue from unsuccessful chans
// otherwise they stack up on quiet channels
// record the successful case, if any.
// We singly-linked up the SudoGs in lock order.
casi = -1
cas = nil
caseSuccess = false
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nil
for _, casei := range lockorder {
k = &scases[casei]
if sg == sglist {
// sg has already been dequeued by the G that woke us up.
casi = int(casei)
cas = k
caseSuccess = sglist.success
if sglist.releasetime > 0 {
caseReleaseTime = sglist.releasetime
}
} else {
c = k.c
if int(casei) < nsends {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}
来源:https://juejin.cn/post/7182844543551668284