Go channel实现原理分析
作者:天亮^说晚安- 时间:2024-05-05 09:30:29
channel
单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。
虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。
Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。
如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。
  channel(管道)底层是一个环形队列(先进先出),send(插入)和recv(取走)从同一个位置沿同一个方向顺序执行。sendx表示最后一次插入元素的位置,recvx表示最后一次取走元素的位置。
var ch chan int //管道的声明
ch = make(chan int, 8) //管道的初始化,环形队列里可容纳8个int
ch <- 1 //往管道里写入(send)数据
ch <- 2
ch <- 3
ch <- 4
ch <- 5
v := <-ch //从管道里取走(recv)数据
fmt.Println(v)
v = <-ch
fmt.Println(v)
channel类型
channel是一种类型,一种引用类型。声明通道类型的格式如下:
var 变量 chan 元素类型
举几个例子:
var ch1 chan int // 声明一个传递整型的通道
var ch2 chan bool // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递int切片的通道
创建channel
通道是引用类型,通道类型的空值是nil。
var ch chan int
fmt.Println(ch) // <nil>
声明的通道后需要使用make函数初始化之后才能使用。
创建channel的格式如下:
make(chan 元素类型, [缓冲大小])
channel的缓冲大小是可选的。
举几个例子:
ch4 := make(chan int)
ch5 := make(chan bool)
ch6 := make(chan []int)
channel操作
通道有发送(send)、接收(receive)和关闭(close)三种操作。
发送和接收都使用<-符号。
现在我们先使用以下语句定义一个通道:
ch := make(chan int)
发送
将一个值发送到通道中。
ch <- 10 // 把10发送到ch中
接收
从一个通道中接收值。
x := <- ch // 从ch中接收值并赋值给变量x
<-ch // 从ch中接收值,忽略结果
关闭
我们通过调用内置的close函数来关闭通道。
close(ch)
关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。
关闭后的通道有以下特点:
对一个关闭的通道再发送值就会导致panic。
对一个关闭的通道进行接收会一直获取值直到通道为空。
对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
关闭一个已经关闭的通道会导致panic。
无缓冲的通道
无缓冲的通道又称为阻塞的通道。我们来看一下下面的代码:
func main() {
ch := make(chan int)
ch <- 10
fmt.Println("发送成功")
}
上面这段代码能够通过编译,但是执行的时候会出现以下错误:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
.../src/github.com/pprof/studygo/day06/channel02/main.go:8 +0x54
为什么会出现deadlock错误呢?
因为我们使用ch := make(chan int)创建的是无缓冲的通道,无缓冲的通道只有在有人接收值的时候才能发送值。就像你住的小区没有快递柜和代收点,快递员给你打电话必须要把这个物品送到你的手中,简单来说就是无缓冲的通道必须有接收才能发送。
上面的代码会阻塞在ch <- 10这一行代码形成死锁,那如何解决这个问题呢?
一种方法是启用一个goroutine去接收值,例如:
func recv(c chan int) {
ret := <-c
fmt.Println("接收成功", ret)
}
func main() {
ch := make(chan int)
go recv(ch) // 启用goroutine从通道接收值
ch <- 10
fmt.Println("发送成功")
}
无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。
使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。
有缓冲的通道
解决上面问题的方法还有一种就是使用有缓冲区的通道。
我们可以在使用make函数初始化通道的时候为其指定通道的容量,例如:
func main() {
ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
ch <- 10
fmt.Println("发送成功")
}
只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。就像你小区的快递柜只有那么个多格子,格子满了就装不下了,就阻塞了,等到别人取走一个快递员就能往里面放一个。
我们可以使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量,虽然我们很少会这么做。
close()
可以通过内置的close()函数关闭channel(如果你的管道不往里存值或者取值的时候一定记得关闭管道)
package main
import "fmt"
func main() {
c := make(chan int)
go func() {
for i := 0; i < 5; i++ {
c <- i
}
close(c)
}()
for {
if data, ok := <-c; ok {
fmt.Println(data)
} else {
break
}
}
fmt.Println("main结束")
}
如何优雅的从通道循环取值
当通过通道发送有限的数据时,我们可以通过close函数关闭通道来告知从该通道接收值的goroutine停止等待。当通道被关闭时,往该通道发送值会引发panic,从该通道里接收的值一直都是类型零值。那如何判断一个通道是否被关闭了呢?
我们来看下面这个例子:
// channel 练习
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
// 开启goroutine将0~100的数发送到ch1中
go func() {
for i := 0; i < 100; i++ {
ch1 <- i
}
close(ch1)
}()
// 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中
go func() {
for {
i, ok := <-ch1 // 通道关闭后再取值ok=false
if !ok {
break
}
ch2 <- i * i
}
close(ch2)
}()
// 在主goroutine中从ch2中接收值打印
for i := range ch2 { // 通道关闭后会退出for range循环
fmt.Println(i)
}
}
从上面的例子中我们看到有两种方式在接收值的时候判断通道是否被关闭,我们通常使用的是for range的方式。
单向通道
有的时候我们会将通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如限制通道在函数中只能发送或只能接收。
Go语言中提供了单向通道来处理这种情况。例如,我们把上面的例子改造如下:
func counter(out chan<- int) {
for i := 0; i < 100; i++ {
out <- i
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for i := range in {
out <- i * i
}
close(out)
}
func printer(in <-chan int) {
for i := range in {
fmt.Println(i)
}
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go counter(ch1)
go squarer(ch2, ch1)
printer(ch2)
}
其中,
1.chan<- int是一个只能发送的通道,可以发送但是不能接收;
2.<-chan int是一个只能接收的通道,可以接收但是不能发送。
在函数传参及任何赋值操作中将双向通道转换为单向通道是可以的,但反过来是不可以的。
read_only := make (<-chan int) //定义只读的channel
write_only := make (chan<- int) //定义只写的channel
  定义只读和只写的channel意义不大,一般用于在参数传递中。
//只能向channel里写数据
func send(c chan<- int) {
c <- 1
}
//只能取channel中的数据
func recv(c <-chan int) {
_ = <-c
}
//返回一个只读channel
func (c *Context) Done() <-chan struct{} {
return nil
}
通道遍历
  可以通过for range的方式遍历管道,遍历前必须先关闭管道,禁止再写入元素。
close(ch) //遍历前必须先关闭管道,禁止再写入元素
//遍历管道里剩下的元素
for ele := range ch {
fmt.Println(ele)
}
  slice、map和channel是go语言里的3种引用类型,都可以通过make函数来进行初始化(申请内存分配)。因为它们都包含一个指向底层数据结构的指针,所以称之为“引用”类型。引用类型未初始化时都是nil,可以对它们执行len()函数,返回0。
异步通道
异步管道
asynChann := make(chan int, 8)
  channel底层维护一个环形队列(先进先出),make初始化时指定队列的长度。队列满时,写阻塞;队列空时,读阻塞。sendx指向下一次写入的位置, recvx指向下一次读取的位置。 recvq维护因读管道而被阻塞的协程,sendq维护因写管道而被阻塞的协程。
 同步管道可以认为队列容量为0,当读协程和写协程同时就绪时它们才会彼此帮对方解除阻塞。
syncChann := make(chan int)
  channel仅作为协程间同步的工具,不需要传递具体的数据,管道类型可以用struct{}。空结构体变量的内存占用为0,因此struct{}类型的管道比bool类型的管道还要省内存。
sc := make(chan struct{})
sc <- struct{}{}
关于channel的死锁与阻塞
Channel满了,就阻塞写;Channel空了,就阻塞读。
阻塞之后会交出cpu,去执行其他协程,希望其他协程能帮自己解除阻塞。
如果阻塞发生在main协程里,并且没有其他子协程可以执行,那就可以确定“希望永远等不来”,自已把自己杀掉,报一个fatal error:deadlock出来。
如果阻塞发生在子协程里,就不会发生死锁,因为至少main协程是一个值得等待的“希望”,会一直等(阻塞)下去。
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan struct{}, 1)
ch <- struct{}{} //有1个缓冲可以用,无需阻塞,可以立即执行
go func() { //子协程1
time.Sleep(5 * time.Second) //sleep一个很长的时间
<-ch //如果把本行代码注释掉,main协程5秒钟后会报fatal error
fmt.Println("sub routine 1 over")
}()
ch <- struct{}{} //由于子协程1已经启动,寄希望于子协程1帮自己解除阻塞,所以会一直等子协程1执行结束。如果子协程1执行结束后没帮自己解除阻塞,则希望完全破灭,报出deadlock
fmt.Println("send to channel in main routine")
go func() { //子协程2
time.Sleep(2 * time.Second)
ch <- struct{}{} //channel已满,子协程2会一直阻塞在这一行
fmt.Println("sub routine 2 over")
}()
time.Sleep(3 * time.Second)
fmt.Println("main routine exit")
}
send to channel in main routine
sub routine 1 over
main routine exit
关闭channel
只有当管道关闭时,才能通过range遍历管道里的数据,否则会发生fatal error。
管道关闭后读操作会立即返回,如果缓冲已空会返回“0值”。
ele, ok := <-ch ok==true代表ele是管道里的真实数据。
向已关闭的管道里send数据会发生panic。
不能重复关闭管道,不能关闭值为nil的管道,否则都会panic。
package main
import (
"fmt"
"time"
)
var cloch = make(chan int, 1)
var cloch2 = make(chan int, 1)
func traverseChannel() {
for ele := range cloch {
fmt.Printf("receive %d\n", ele)
}
fmt.Println()
}
func traverseChannel2() {
for {
if ele, ok := <-cloch2; ok { //ok==true代表管道还没有close
fmt.Printf("receive %d\n", ele)
} else { //管道关闭后,读操作会立即返回“0值”
fmt.Printf("channel have been closed, receive %d\n", ele)
break
}
}
}
func main() {
cloch <- 1
close(cloch)
traverseChannel() //如果不close就直接通过range遍历管道,会发生fatal error: all goroutines are asleep - deadlock!
fmt.Println("==================")
go traverseChannel2()
cloch2 <- 1
close(cloch2)
time.Sleep(10 * time.Millisecond)
}
  channel在并发编程中有多种玩法,经常用channel来实现协程间的同步。
package main
import (
"fmt"
"time"
)
func upstream(ch chan struct{}) {
time.Sleep(15 * time.Millisecond)
fmt.Println("一个上游协程执行结束")
ch <- struct{}{}
}
func downstream(ch chan struct{}) {
<-ch
fmt.Println("下游协程开始执行")
}
func main() {
upstreamNum := 4 //上游协程的数量
downstreamNum := 5 //下游协程的数量
upstreamCh := make(chan struct{}, upstreamNum)
downstreamCh := make(chan struct{}, downstreamNum)
//启动上游协程和下游协程,实际下游协程会先阻塞
for i := 0; i < upstreamNum; i++ {
go upstream(upstreamCh)
}
for i := 0; i < downstreamNum; i++ {
go downstream(downstreamCh)
}
//同步点
for i := 0; i < upstreamNum; i++ {
<-upstreamCh
}
//通过管道让下游协程开始执行
for i := 0; i < downstreamNum; i++ {
downstreamCh <- struct{}{}
}
time.Sleep(10 * time.Millisecond) //等下游协程执行结束
}
通道总结
channel常见的异常总结,如下图:
注意:关闭已经关闭的channel也会引发panic。
来源:https://blog.csdn.net/m0_52896752/article/details/129794582