Golang中channel的原理解读(推荐)

作者:可问春风丶 时间:2024-02-08 15:41:38 

数据结构

channel的数据结构在$GOROOT/src/runtime/chan.go文件下:


type hchan struct {

qcount   uint           // 当前队列中剩余元素个数

dataqsiz uint           // 环形队列长度,即可以存放的元素个数

buf      unsafe.Pointer // 环形队列指针

elemsize uint16         // 每个元素的大小

closed   uint32         // 标记是否关闭

elemtype *_type         // 元素类型

sendx    uint           // 队列下标,指向元素写入时存放到队列中的位置

recvx    uint           // 队列下标,指向元素从队列中读出的位置

recvq    waitq          // 等待读消息的groutine队列

sendq    waitq          // 等待写消息的groutine队列

lock     mutex          // 互斥锁

}

chan内部实现了一个环形队列作为缓冲区,队列的长度在创建chan时指定:

Golang中channel的原理解读(推荐)

等待队列(recvq/sendq)使用双向链表 runtime.waitq 表示,链表中所有的元素都是 runtime.sudog结构:


type waitq struct {
  first *sudog
  last  *sudog
}

type sudog struct {
  g            *g
  next         *sudog
  prev         *sudog
  elem         unsafe.Pointer // data element (may point to stack)

acquiretime  int64
  releasetime  int64
  ticket       uint32
  isSelect     bool

parent       *sudog // semaRoot binary tree
  waitlink     *sudog // g.waiting list or semaRoot
  waittail     *sudog // semaRoot
  c            *hchan // channel
}

创建channel

通常使用make(channel string, 0)的方式创建无缓存的channel,使用make(channel string, 10)创建有缓存的channel。

源码:


func makechan(t *chantype, size int) *hchan {
  elem := t.elem

// compiler checks this but be safe.
  if elem.size >= 1<<16 {
     throw("makechan: invalid channel element type")
  }
  if hchanSize%maxAlign != 0 || elem.align > maxAlign {
     throw("makechan: bad alignment")
  }

mem, overflow := math.MulUintptr(elem.size, uintptr(size))
  if overflow || mem > maxAlloc-hchanSize || size < 0 {
     panic(plainError("makechan: size out of range"))
  }
  var c *hchan
  switch {

case mem == 0:
  // 如果当前 Channel 中不存在缓冲区,那么就只会为 runtime.hchan 分配一段内存空间;
     c = (*hchan)(mallocgc(hchanSize, nil, true))
     c.buf = c.raceaddr()
  case elem.ptrdata == 0:
  // 如果当前 Channel 中存储的类型不是指针类型,会为当前的 Channel 和底层的数组分配一块连续的内存空间;
     c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
     c.buf = add(unsafe.Pointer(c), hchanSize)
  default:
  //单独为 runtime.hchan 和缓冲区分配内存;
     c = new(hchan)
     c.buf = mallocgc(mem, elem, true)
  }

c.elemsize = uint16(elem.size)
  c.elemtype = elem
  c.dataqsiz = uint(size)
  lockInit(&c.lock, lockRankHchan)
  // 在函数的最后会统一更新elemsize、elemtype 和 dataqsiz 几个字段;
  if debugChan {
     print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
  }
  return c
}

channel读写

  1. 当有新数据来时,首先判断recvq中是否有groutine存在,如果recvq不为空,则说明缓冲区为空,或者没有缓冲区,因为如果缓冲区有数据会被recvq里面的groutine消费。此时从recvq中拿出一个groutine并绑定数据,唤醒该groutine执行任务,这个过程跳过了将数据写入缓冲区的过程。

  2. 如果缓冲区有数据并有空余位置,将数据放入缓冲区。

  3. 如果缓冲区有数据但没有空余位置,当前groutine绑定数据并放入sendx,进入睡眠,等待被唤醒。

Golang中channel的原理解读(推荐)

源码:


func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  .....
  lock(&c.lock)

if c.closed != 0 {
     unlock(&c.lock)
     panic(plainError("send on closed channel"))
  }

// 如果Channel 没有被关闭并且已经有处于读等待的 Goroutine,
  // 那么从接收队列 recvq 中取出最先陷入等待的 Goroutine 并直接向它发送数据
  if sg := c.recvq.dequeue(); sg != nil {
     send(c, sg, ep, func() { unlock(&c.lock) }, 3)
     return true
  }

// 如果recvq为空且缓冲区中还有剩余空间
  if c.qcount < c.dataqsiz {
  // 计算出下一个可以存储数据的位置,
     qp := chanbuf(c, c.sendx)
     // raceenabled: 是否启用数据竞争检测,在编译时指定,默认为false
     if raceenabled {
     // 发出数据竞争警告
        raceacquire(qp)
        racerelease(qp)
     }
     // 将发送的数据拷贝到缓冲区中,产生内存拷贝
     typedmemmove(c.elemtype, qp, ep)
     // 增加 sendx 索引
     c.sendx++
     if c.sendx == c.dataqsiz {
        c.sendx = 0
     }
     // 增加计数器
     c.qcount++
     unlock(&c.lock)
     return true
  }

if !block {
     unlock(&c.lock)
     return false
  }

// 将channel数据绑定到当前groutine并使groutine休眠
  // 获取发送数据使用的 Goroutine
  gp := getg()
  // 获取 runtime.sudog 结构并设置这一次阻塞发送的相关信息,
  // 例如发送的 Channel、是否在 select 中和待发送数据的内存地址等
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {
     mysg.releasetime = -1
  }
  // 将刚刚创建并初始化的 mysg 加入发送等待队列,并设置到当前 Goroutine的waiting上,
  // 表示 Goroutine 正在等待该sudog准备就绪
  mysg.elem = ep
  mysg.waitlink = nil
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.waiting = mysg
  gp.param = nil
  c.sendq.enqueue(mysg)
  // 休眠groutine
  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
  // 保证传入的数据不被GC
  KeepAlive(ep)

// someone woke us up.
  if mysg != gp.waiting {
     throw("G waiting list is corrupted")
  }
  gp.waiting = nil
  gp.activeStackChans = false
  if gp.param == nil {
     if c.closed == 0 {
        throw("chansend: spurious wakeup")
     }
     panic(plainError("send on closed channel"))
  }
  gp.param = nil
  if mysg.releasetime > 0 {
     blockevent(mysg.releasetime-t0, 2)
  }
  mysg.c = nil
  releaseSudog(mysg)
  return true
}

  1. 如果sendx不为空且缓冲区不为空,从缓冲区头部读出数据并在当前G执行任务,在sendx中拿出一个G,将其数据写入缓冲区尾部并唤醒该G。

  2. 如果sendx不为空且缓冲区为空,直接从sendx中拿出一个G,将G中数据取出并唤醒该G。

  3. 如果sendx为空且缓冲区不为空,则从缓冲区头部拿出一个数据。

  4. 如果sendx为空且缓冲区为空,将该G放入recvq,进入休眠,等待被唤醒。

Golang中channel的原理解读(推荐)

源码:


func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// block:这次接收是否阻塞
  if debugChan {
     print("chanrecv: chan=", c, "\n")
  }

if c == nil {
     if !block {
        return
     }
     // 从一个空 Channel 接收数据时会直接让出处理器的使用权
     gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
     throw("unreachable")
  }

// Fast path: check for failed non-blocking operation without acquiring the lock.
  if !block && empty(c) {
    // 如果channel为空并且未关闭,直接返回
     if atomic.Load(&c.closed) == 0 {
        return
     }

if empty(c) {
        // The channel is irreversibly closed and empty.
        if raceenabled {
           raceacquire(c.raceaddr())
        }
        if ep != nil {
        // 手动标记清楚对象
           typedmemclr(c.elemtype, ep)
        }
        return true, false
     }
  }

var t0 int64
  if blockprofilerate > 0 {
     t0 = cputicks()
  }

lock(&c.lock)
   //如果channel为空,并且已关闭,说明对象不可达
  if c.closed != 0 && c.qcount == 0 {
     if raceenabled {
        raceacquire(c.raceaddr())
     }
     unlock(&c.lock)
     if ep != nil {
     // 手动标记清除
        typedmemclr(c.elemtype, ep)
     }
     return true, false
  }
   // 如果sendq不为空,直接消费,避免sendq --> queue --> recvx的过程
  if sg := c.sendq.dequeue(); sg != nil {
     recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
     return true, true
  }

// 当 Channel 的缓冲区中已经包含数据时,从 Channel 中接收数据会直接从缓冲区中
   // recvx 的索引位置中取出数据进行处理
  if c.qcount > 0 {
     // Receive directly from queue
     qp := chanbuf(c, c.recvx)
     if raceenabled {
        raceacquire(qp)
        racerelease(qp)
     }
     // 如果接收数据的内存地址不为空,那么会使用 runtime.typedmemmove将缓冲区中的数据拷贝到内存中
     if ep != nil {
        typedmemmove(c.elemtype, ep, qp)
     }
     // 使用 runtime.typedmemclr清除队列中的数据并完成收尾工作
     typedmemclr(c.elemtype, qp)
     c.recvx++
     // recvx位置归零
     if c.recvx == c.dataqsiz {
        c.recvx = 0
     }
     c.qcount-- // 计数减一
     unlock(&c.lock)
     return true, true
  }

if !block {
     unlock(&c.lock)
     return false, false
  }

// 当 sendq不为空 并且缓冲区中也不存在任何数据时,阻塞并休眠当前groutine
  gp := getg()
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {
     mysg.releasetime = -1
  }
  // No stack splits between assigning elem and enqueuing mysg
  // on gp.waiting where copystack can find it.
  mysg.elem = ep
  mysg.waitlink = nil
  gp.waiting = mysg
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.param = nil
  c.recvq.enqueue(mysg)
  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

// someone woke us up
  if mysg != gp.waiting {
     throw("G waiting list is corrupted")
  }
  gp.waiting = nil
  gp.activeStackChans = false
  if mysg.releasetime > 0 {
     blockevent(mysg.releasetime-t0, 2)
  }
  closed := gp.param == nil
  gp.param = nil
  mysg.c = nil
  releaseSudog(mysg)
  return true, !closed
}

来源:https://blog.csdn.net/qq_41768400/article/details/120711768

标签:Golang,channel,原理
0
投稿

猜你喜欢

  • MySQL动态修改varchar长度的方法

    2024-01-17 22:27:10
  • 通过AngularJS实现图片上传及缩略图展示示例

    2024-05-02 17:39:52
  • python3.6.3转化为win-exe文件发布的方法

    2021-02-16 05:57:40
  • php中运用http调用的GET和POST方法示例

    2023-11-23 02:39:35
  • python 列表常用方法超详细梳理总结

    2022-04-18 20:34:27
  • 基于.net开发的遵循web标准的个人站点程序包下载

    2023-07-21 12:37:57
  • 图神经网络GNN算法基本原理详解

    2023-08-08 23:53:53
  • 分享很实用的css圆角写法[百度有啊提取]

    2009-01-06 13:05:00
  • PHP常用函数和常见疑难问题解答

    2023-11-08 19:28:17
  • session的存储方式和配置方法介绍

    2022-04-28 05:48:59
  • php session_start()出错原因分析及解决方法

    2024-06-07 15:44:29
  • python time模块时间戳 与 结构化时间详解

    2021-04-09 11:06:42
  • python下读取公私钥做加解密实例详解

    2022-04-17 03:39:00
  • 一文带你了解Go语言标准库math和rand的常用函数

    2024-02-22 07:24:24
  • 配置 Pycharm 默认 Test runner 的图文教程

    2023-12-06 09:03:32
  • Python 短视频爬虫教程

    2022-02-13 00:17:33
  • Python简单实现两个任意字符串乘积的方法示例

    2023-10-29 12:04:40
  • Mysql基础知识点汇总

    2024-01-23 08:15:40
  • ASP实现下载系统防盗链方法

    2008-02-01 14:05:00
  • python读取浮点数和读取文本文件示例

    2022-10-31 16:03:38
  • asp之家 网络编程 m.aspxhome.com