Go调度器学习之goroutine调度详解

作者:IguoChan 时间:2024-04-30 10:06:10 

0. 简介

上篇博客介绍了goroutine的创建、执行和退出的过程,并且提及了在协程切换时涉及的调度循环,本篇博客我们就来探究一下其他情形引起的协程调度。

1. 协程调度发生的时机

在以下情形中,goroutine可能会发生调度:

情形说明
go func(){}使用go关键字创建一个新的goroutine,调度器会考虑调度
GC由于GC也需要在系统线程M上执行,且其中需要所有的goroutine都停止运行,所以也会发生调度
系统调用发生系统的调用时,会阻塞M,所以它会被调度走,同时新的goroutine也会被调度上来
同步内存访问mutex、channel等操作会使得goroutine阻塞,因此会被调度走,等条件满足后,还会被调度上来继续运行

2. 创建协程时的调度

其中,使用go关键字创建协程时的调度分析,上篇博客做了初步的分析,特别是有关调度循环的分析,但是我们没有具体分析,当创建协程时,系统是怎么发生调度的。

func newproc(fn *funcval) {
  gp := getg()
  pc := getcallerpc()
  systemstack(func() {
     newg := newproc1(fn, gp, pc)

_p_ := getg().m.p.ptr()
     runqput(_p_, newg, true)

if mainStarted {
        wakep()
     }
  })
}

我们还记得,go关键字在创建协程时,Go的编译器会将其转换为runtime.newproc函数,上篇我们详细分析了main goroutine的创建过程,在runtime.main函数中,全局变量mainStarted会被置为true,之后普通协程的创建,则会调用runtime.wakep函数尝试唤醒空闲的P。

func wakep() {
  if atomic.Load(&sched.npidle) == 0 {
     return
  }
  // be conservative about spinning threads
  if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) {
     return
  }
  startm(nil, true)
}

wakep函数首先确认是否有其他线程正在处于spinning状态,即M是否在找工作,如果没有的话,则调用startm函数创建一个新的、或者唤醒一个处于睡眠状态的工作线程出来工作。

func startm(_p_ *p, spinning bool) {
  // Disable preemption.
  //
  // Every owned P must have an owner that will eventually stop it in the
  // event of a GC stop request. startm takes transient ownership of a P
  // (either from argument or pidleget below) and transfers ownership to
  // a started M, which will be responsible for performing the stop.
  //
  // Preemption must be disabled during this transient ownership,
  // otherwise the P this is running on may enter GC stop while still
  // holding the transient P, leaving that P in limbo and deadlocking the
  // STW.
  //
  // Callers passing a non-nil P must already be in non-preemptible
  // context, otherwise such preemption could occur on function entry to
  // startm. Callers passing a nil P may be preemptible, so we must
  // disable preemption before acquiring a P from pidleget below.
  mp := acquirem()  // 保证在此期间不会发生栈扩展
  lock(&sched.lock)
  if _p_ == nil {   // 没有指定p,那么需要从空闲队列中取一个p
     _p_ = pidleget()
     if _p_ == nil {// 如果没有空闲的p,直接返回
        unlock(&sched.lock)
        if spinning {
           // The caller incremented nmspinning, but there are no idle Ps,
           // so it's okay to just undo the increment and give up.
           if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
              throw("startm: negative nmspinning")
           }
        }
        releasem(mp)
        return
     }
  }
  nmp := mget()  // 如果有空闲的p,那么取出一个空闲的m
  if nmp == nil {// 如果没有空闲的m,那么调用newm创建一个,然后返回
     // No M is available, we must drop sched.lock and call newm.
     // However, we already own a P to assign to the M.
     //
     // Once sched.lock is released, another G (e.g., in a syscall),
     // could find no idle P while checkdead finds a runnable G but
     // no running M's because this new M hasn't started yet, thus
     // throwing in an apparent deadlock.
     //
     // Avoid this situation by pre-allocating the ID for the new M,
     // thus marking it as 'running' before we drop sched.lock. This
     // new M will eventually run the scheduler to execute any
     // queued G's.
     id := mReserveID()
     unlock(&sched.lock)

var fn func()
     if spinning {
        // The caller incremented nmspinning, so set m.spinning in the new M.
        fn = mspinning
     }
     newm(fn, _p_, id)
     // Ownership transfer of _p_ committed by start in newm.
     // Preemption is now safe.
     releasem(mp)
     return
  }
  unlock(&sched.lock)
  if nmp.spinning {
     throw("startm: m is spinning")
  }
  if nmp.nextp != 0 {
     throw("startm: m has p")
  }
  if spinning && !runqempty(_p_) {
     throw("startm: p has runnable gs")
  }
  // The caller incremented nmspinning, so set m.spinning in the new M.
  nmp.spinning = spinning
  nmp.nextp.set(_p_)
  notewakeup(&nmp.park) // 如果有空闲的m,则唤醒这个m
  // Ownership transfer of _p_ committed by wakeup. Preemption is now
  // safe.
  releasem(mp)
}

startm函数首先判断是否有空闲的P,如果没有则直接返回;如果有,则判断是否有空闲的M,如果没有,则新建一个;如果有空闲的M,则唤醒这个M。说白了,wakep函数就是为了更大程度的利用P,利用CPU资源。

说到这里,我们就需要重温一下上篇博客讲到的,调度中获取goroutine的规则是:

  • 每调度61次就需要从全局队列中获取goroutine

  • 其次优先从本P所在队列中获取goroutine

  • 如果还没有获取到,则从其他P的运行队列中窃取goroutine

其中,从其他P队列中窃取goroutine,调用的是findrunnable函数,这个函数很长,为了简化说明,我们删除一些不是很重要的代码:

func findrunnable() (gp *g, inheritTime bool) {
  _g_ := getg()

top:
  _p_ := _g_.m.p.ptr()
  ...

// local runq
  // 再从本地队列找找
  if gp, inheritTime := runqget(_p_); gp != nil {
     return gp, inheritTime
  }

// global runq
  // 再看看全局队列
  if sched.runqsize != 0 {
     lock(&sched.lock)
     gp := globrunqget(_p_, 0)
     unlock(&sched.lock)
     if gp != nil {
        return gp, false
     }
  }

...

// Spinning Ms: steal work from other Ps.
  //
  // Limit the number of spinning Ms to half the number of busy Ps.
  // This is necessary to prevent excessive CPU consumption when
  // GOMAXPROCS>>1 but the program parallelism is low.
  procs := uint32(gomaxprocs)
  if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
     if !_g_.m.spinning {
        _g_.m.spinning = true
        atomic.Xadd(&sched.nmspinning, 1)
     }

gp, inheritTime, tnow, w, newWork := stealWork(now) // 调用stealWork盗取goroutine
     now = tnow
     if gp != nil {
        // Successfully stole.
        return gp, inheritTime
     }
     if newWork {
        // There may be new timer or GC work; restart to
        // discover.
        goto top
     }
     if w != 0 && (pollUntil == 0 || w < pollUntil) {
        // Earlier timer to wait for.
        pollUntil = w
     }
  }

...

// return P and block
  // 上面的窃取没有成功,那么解除m和p的绑定,摒弃娥江p放到空闲队列,然后去休眠
  lock(&sched.lock)
  if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
     unlock(&sched.lock)
     goto top
  }
  if sched.runqsize != 0 {
     gp := globrunqget(_p_, 0)
     unlock(&sched.lock)
     return gp, false
  }
  if releasep() != _p_ {
     throw("findrunnable: wrong p")
  }
  pidleput(_p_)
  unlock(&sched.lock)

...
     _g_.m.spinning = false // m即将睡眠,状态不再是spinning
     if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
        throw("findrunnable: negative nmspinning")
     }

...
  stopm() // 休眠
  goto top
}

从上面的代码可以看出,工作线程会反复尝试寻找运行的goroutine,实在找不到的情况下才会进入到睡眠。需要注意的是,工作线程M从其他P的本地队列中盗取goroutine时的状态称之为自旋(spinning)状态,而前面讲到wakep调用startm函数,也是优先从自旋状态的M中选取,实在没有才去唤醒休眠的M,再没有就创建新的M。

窃取算法stealWork我们就不分析了,有兴趣的同学可以看看。下面具体分析下stopm是怎么实现线程睡眠的。

func stopm() {
  _g_ := getg()

if _g_.m.locks != 0 {
     throw("stopm holding locks")
  }
  if _g_.m.p != 0 {
     throw("stopm holding p")
  }
  if _g_.m.spinning {
     throw("stopm spinning")
  }

lock(&sched.lock)
  mput(_g_.m)         // 把m放到sched.midle空闲队列
  unlock(&sched.lock)
  mPark()
  acquirep(_g_.m.nextp.ptr()) // 绑定这个m和其下一个p,这里没有看懂为啥这么操作
  _g_.m.nextp = 0
}

func mPark() {
  gp := getg()
  notesleep(&gp.m.park) // 进入睡眠状态
  noteclear(&gp.m.park)
}

可以看出,stopm主要是将m对象放到调度器的空闲线程队列,然后通过notesleep进入睡眠状态。notego runtime实现的一次性睡眠和唤醒机制,通过notesleep进入睡眠状态,然后另一个线程可以通过notewakeup唤醒这个线程。

小结

上面巴拉巴拉讲了那么多,看的人有点头晕,我们接下来讲一个很小的例子梳理一下以上的逻辑(主线程的创建和执行在上一篇博客中详细叙述过,这里不再赘述),主线程创建了一个goroutine,这时候会触发wakep,接下来可能会唤醒空闲的工作线程(如果是第一个非main goroutine,就没有空闲的工作线程),或者创建一个新的工作线程,或者什么都不做。

如果是创建一个新的工作线程,那么其开启执行的点也是mstart函数(注意区分mstartstartm),然后在schedule函数中会尝试去获取goroutine,如果全局和本地的goroutine队列都没有,则会去其他的P上窃取goroutine,如果窃取不成功,则会休眠。

如果是去唤醒工作协程,唤醒后会在休眠的地方开始,重新进行窃取。

窃取到工作协程后,就会去执行,然后就会因为各种原因重新开始调度循环。

Go调度器学习之goroutine调度详解

3. 主动挂起

Go中,有很多种情形会导致goroutine阻塞,即其主动挂起,然后被调度走,等满足其运行条件时,还会被调度上来继续运行。比如channel的读写,我们以通道的阻塞读为例,来介绍goroutine的主动挂起的调度方式。

3.1 协程挂起

和前面介绍的Map一样,channel的读也有以下两种读取方式:

v := <- ch
v, ok := <- ch

分别对应以下chanrecv1chanrecv2函数:

//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
  chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
  _, received = chanrecv(c, elem, true)
  return
}

无论是哪个函数,最终调用的都是chanrecv函数:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...

c.recvq.enqueue(mysg) // 将这个goroutine放到channel的recv的queue中

atomic.Store8(&gp.parkingOnChan, 1)
  // 挂起这个goroutine
  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

...
}

chanrecv会先判断channel是否有数据可读,如果有则直接读取并返回,如果没有则将这个goroutine放到channelrecvqueue中,然后调用gopark函数将当前goroutine挂起并阻塞。

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
  if reason != waitReasonSleep {
     checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
  }
  mp := acquirem()
  gp := mp.curg
  status := readgstatus(gp)
  if status != _Grunning && status != _Gscanrunning {
     throw("gopark: bad g status")
  }
  mp.waitlock = lock
  mp.waitunlockf = unlockf
  gp.waitreason = reason
  mp.waittraceev = traceEv
  mp.waittraceskip = traceskip
  releasem(mp)
  // can't do anything that might move the G between Ms here.
  mcall(park_m)
}

gopark函数则使用mcall函数(前面分析过,主要作用是保存当前goroutine现场,然后切换到g0栈去调用作为参数传入的函数)取执行park_m函数:

// park continuation on g0.
func park_m(gp *g) {
  _g_ := getg()

if trace.enabled {
     traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
  }

casgstatus(gp, _Grunning, _Gwaiting)
  dropg()

if fn := _g_.m.waitunlockf; fn != nil {
     ok := fn(gp, _g_.m.waitlock)
     _g_.m.waitunlockf = nil
     _g_.m.waitlock = nil
     if !ok {
        if trace.enabled {
           traceGoUnpark(gp, 2)
        }
        casgstatus(gp, _Gwaiting, _Grunnable)
        execute(gp, true) // Schedule it back, never returns.
     }
  }
  schedule()
}

park_m首先把当前goroutine的状态设置为_Gwaiting(因为它正在等待其它goroutinechannel里面写数据),然后调用dropg函数解除gm之间的关系,最后通过调用schedule函数进入调度循环。

至此,一个goroutine就被主动挂起了。

3.2 协程唤醒

我们继续以上例子,当另一个goroutine对这个channel发送数据的时候

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...

if sg := c.recvq.dequeue(); sg != nil {
     // Found a waiting receiver. We pass the value we want to send
     // directly to the receiver, bypassing the channel buffer (if any).
     send(c, sg, ep, func() { unlock(&c.lock) }, 3)
     return true
  }

...
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  ...
  goready(gp, skip+1)
}

channel的发送流程和读取类似,当检查到接收队列中有等待着时,会调用send函数然后调用goready唤醒协程:

func goready(gp *g, traceskip int) {
  systemstack(func() {
     ready(gp, traceskip, true)
  })
}

func ready(gp *g, traceskip int, next bool) {
  if trace.enabled {
     traceGoUnpark(gp, traceskip)
  }

status := readgstatus(gp)

// Mark runnable.
  _g_ := getg()
  mp := acquirem() // disable preemption because it can be holding p in a local var
  if status&^_Gscan != _Gwaiting {
     dumpgstatus(gp)
     throw("bad g->status in ready")
  }

// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
  casgstatus(gp, _Gwaiting, _Grunnable)
  runqput(_g_.m.p.ptr(), gp, next)
  wakep()
  releasem(mp)
}

这里发现,ready函数和创建协程时一样,会触发wakep来检查是否需要唤醒空闲P来执行。而在此之前,这个被唤醒的goroutine会放到P的本地队列的下一个执行goroutine,以提升时效性。

到这里,一个被挂起的协程也就被唤醒了。

4. 小结

本文我们分析了创建协程时发生的调度,也介绍了以channel读写为例子的主动挂起似的调度。而系统调用和GC触发的调度比较复杂,我们放在后面介绍。

来源:https://juejin.cn/post/7215967453929799740

标签:Go,goroutine,调度
0
投稿

猜你喜欢

  • 关于vue中element-ui form或table lable换行的问题

    2023-07-02 17:07:31
  • 对Python3中列表乘以某一个数的示例详解

    2023-05-05 03:10:40
  • 使用Python实现NBA球员数据查询小程序功能

    2021-06-27 17:15:48
  • python中GIL的原理及用法总结

    2023-03-11 07:43:13
  • 用Python画一个LinkinPark的logo代码实例

    2022-09-27 05:52:19
  • Jupyter Notebook远程登录及密码设置操作

    2022-08-29 08:43:04
  • python文件目录操作之os模块

    2023-01-10 14:22:59
  • 用Python做一个哔站小姐姐词云跳舞视频

    2022-09-17 12:32:30
  • 对pandas的dataframe绘图并保存的实现方法

    2021-12-21 14:54:50
  • pandas创建DataFrame的方式小结

    2021-10-25 13:26:23
  • keras分类模型中的输入数据与标签的维度实例

    2022-01-30 02:12:43
  • python绘制带有色块的折线图

    2022-08-11 07:01:58
  • 解决idea git切换多个分支后maven不生效的问题

    2023-06-28 17:48:13
  • matplotlib.pyplot.matshow 矩阵可视化实例

    2022-01-04 02:57:21
  • Python读取及保存mat文件的注意事项说明

    2022-01-10 13:25:54
  • element中form组件prop嵌套属性的问题解决

    2023-07-02 16:49:18
  • Python3 获取一大段文本之间两个关键字之间的内容方法

    2022-01-17 15:03:31
  • tensorflow 用矩阵运算替换for循环 用tf.tile而不写for的方法

    2021-08-11 11:43:55
  • 解决echarts中饼图标签重叠的问题

    2021-10-22 03:33:33
  • mssql server 数据库附加不上解决办法分享

    2011-09-30 11:55:20
  • asp之家 网络编程 m.aspxhome.com