再次探讨go实现无限 buffer 的 channel方法
作者:机智的小小帅 发布时间:2024-02-10 23:34:54
前言
总所周知,go
里面只有两种 channel
,一种是 unbuffered channel, 其声明方式为
ch := make(chan interface{})
另一种是 buffered channel,其声明方式为
bufferSize := 5
ch := make(chan interface{},bufferSize)
对于一个 buffered channel,无论它的 buffer 有多大,它终究是有极限的。这个极限就是该 channel 最初被 make 时,所指定的 bufferSize 。
jojo,buffer channel 的大小是有极限的,我不做 channel 了。
一旦 channel
满了的话,再往里面添加元素的话,将会阻塞。
so how can we make a infinite buffer channel?
本文参考了 medinum 上面的一篇文章,有兴趣的同学可以直接阅读原文。
实现
接口的设计
首先当然是建一个 struct
,在百度翻译的帮助下,我们将这个 struct
取名为 InfiniteChannel
type InfiniteChannel struct {
}
思考一下 channel
的核心行为,实际上就两个,一个流入(Fan in),一个流出(Fan out),因此我们添加如下几个 method。
func (c *InfiniteChannel) In(val interface{}) {
// todo
}
func (c *InfiniteChannel) Out() interface{} {
// todo
}
内部实现
通过 In()
接收的数据,总得需要一个地方来存放。我们可以用一个 slice
来存放,就算用 In()
往里面添加了很多元素,也可以通过 append()
来拓展 slice
,slice
的容量可以无限拓展下去(内存足够的话),所以 channel
也是 infinite
。 InfiniteChannel
的第一个成员就这么敲定下来的。
type InfiniteChannel struct {
data []interface{}
}
用户调用 In()
和 Out()
时,可能是并发的环境,在 go
中如何进行并发编程,最容易想到的肯定是 channel
了,因此我们在内部准备两个 channel
,一个 inChan
,一个 outChan
,用 inChan
来接收数据,用 outChan
来流出数据。
type InfiniteChannel struct {
inChan chan interface{}
outChan chan interface{}
data []interface{}
}
func (c *InfiniteChannel) In(val interface{}) {
c.inChan <- val
}
func (c *InfiniteChannel) Out() interface{} {
return <-c.outChan
}
其中, inChan
和 outChan
都是 unbuffered channel。
此外,也肯定是需要一个 select
来处理来自 inChan
和 outChan
身上的事件。因此我们另起一个协程,在里面做 select
操作。
func (c *InfiniteChannel) background() {
for true {
select {
case newVal := <-c.inChan:
c.data = append(c.data, newVal)
case c.outChan <- c.pop():// pop() 将取出队列的首个元素
}
}
}
func NewInfiniteChannel() *InfiniteChannel {
c := &InfiniteChannel{
inChan: make(chan interface{}),
outChan: make(chan interface{}),
}
go c.background()// 注意这里另起了一个协程
return c
}
ps:感觉这也算是 go 并发编程的一个套路了。即
在 new struct 的时候,顺手 go 一个 select 协程,select 协程内执行一个 for 循环,不停的 select,监听一个或者多个 channel 的事件。
struct 对外提供的 method,只会操作 struct 内的 channel(在本例中就是 inChan 和 outChan),不会操作 struct 内的其他数据(在本例中,In() 和 Out() 都没有直接操作 data)。
触发 channel 的事件后,由 select 协程进行数据的更新(在本例中就是 data )。因为只有 select 协程对除 channel 外的数据成员进行读写操作,且 go 保证了对于 channel 的并发读写是安全的,所以代码是并发安全的。
如果 struct 是 exported ,用户或许会越过 new ,直接手动 make 一个 struct,可以考虑将 struct 设置为 unexported,把它的首字母小写即可。
pop()
的实现也非常简单。
// 取出队列的首个元素,如果队列为空,将会返回一个 nil
func (c *InfiniteChannel) pop() interface{} {
if len(c.data) == 0 {
return nil
}
val := c.data[0]
c.data = c.data[1:]
return val
}
测试一下
用一个协程每秒钟生产一条数据,另一个协程每半秒消费一条数据,并打印。
func main() {
c := NewInfiniteChannel()
go func() {
for i := 0; i < 20; i++ {
c.In(i)
time.Sleep(time.Second)
}
}()
for i := 0; i < 50; i++ {
val := c.Out()
fmt.Print(val)
time.Sleep(time.Millisecond * 500)
}
}
// out
<nil>0<nil>1<nil>23<nil>4<nil><nil>5<nil>67<nil><nil>89<nil><nil>1011<nil>12<nil>13<nil>14<nil>15<nil>16<nil>17<nil><nil>1819<nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil>
Process finished with the exit code 0
可以看到,将 InfiniteChannel
内没有数据可供消费时,调用 Out()
将会返回一个 nil
,不过这也在我们的意料之中,原因是 pop()
在队列为空时,将会返回 nil。
目前 InfiniteChannel
的行为与标准的 channel
的行为是有出入的,go
中的 channel
,在没有数据却仍要取数据时会被阻塞,如何实现这个效果?
优化
我认为此处是是整篇文章最有技巧的地方,我第一次看到时忍不住拍案叫绝。
首先把原来的 background()
摘出来
func (c *InfiniteChannel) background() {
for true {
select {
case newVal := <-c.inChan:
c.data = append(c.data, newVal)
case c.outChan <- c.pop():
}
}
}
对 outChan
进行一个简单封装
func (c *InfiniteChannel) background() {
for true {
select {
case newVal := <-c.inChan:
c.data = append(c.data, newVal)
case c.outChanWrapper() <- c.pop():
}
}
}
func (c *InfiniteChannel) outChanWrapper() chan interface{} {
return c.outChan
}
目前为止,一切照旧。
点睛之笔来了:
func (c *InfiniteChannel) outChanWrapper() chan interface{} {
if len(c.data) == 0 {
return nil
}
return c.outChan
}
在 c.data
为空的时候,返回一个 nil
在 background()
中,当执行到 case c.outChan <- c.pop():
时,实际上将会变成:
case nil <- nil:
在 go
中,是无法往一个 nil
的 channel
中发送元素的。例如
func main() {
var c chan interface{}
select {
case c <- 1:
}
}
// fatal error: all goroutines are asleep - deadlock!
func main() {
var c chan interface{}
select {
case c <- 1:
default:
fmt.Println("hello world")
}
}
// hello world
因此,对于
select {
case newVal := <-c.inChan:
c.data = append(c.data, newVal)
case c.outChanWrapper() <- c.pop():
}
将会一直阻塞在 select
那里,直到 inChan
来了数据。
再测试一下
012345678910111213141516171819fatal error: all goroutines are asleep - deadlock!
最后,程序 panic
了,因为死锁了。
补充
实际上 channel
除了 In()
和 Out()
外,还有一个行为,即 close()
,如果 channel close 后,依旧从其中取元素的话,将会取出该类型的默认值。
func main() {
c := make(chan interface{})
close(c)
for true {
v := <-c
fmt.Println(v)
time.Sleep(time.Second)
}
}
// output
// <nil>
// <nil>
// <nil>
// <nil>
func main() {
c := make(chan interface{})
close(c)
for true {
v, isOpen := <-c
fmt.Println(v, isOpen)
time.Sleep(time.Second)
}
}
// output
// <nil> false
// <nil> false
// <nil> false
// <nil> false
我们也需要实现相同的效果。
func (c *InfiniteChannel) Close() {
close(c.inChan)
}
func (c *InfiniteChannel) background() {
for true {
select {
case newVal, isOpen := <-c.inChan:
if isOpen {
c.data = append(c.data, newVal)
} else {
c.isOpen = false
}
case c.outChanWrapper() <- c.pop():
}
}
}
func NewInfiniteChannel() *InfiniteChannel {
c := &InfiniteChannel{
inChan: make(chan interface{}),
outChan: make(chan interface{}),
isOpen: true,
}
go c.background()
return c
}
func (c *InfiniteChannel) outChanWrapper() chan interface{} {
// 这里添加了对 c.isOpen 的判断
if c.isOpen && len(c.data) == 0 {
return nil
}
return c.outChan
}
再测试一下
func main() {
c := NewInfiniteChannel()
go func() {
for i := 0; i < 20; i++ {
c.In(i)
time.Sleep(time.Second)
}
c.Close()// 这里调用了 Close
}()
for i := 0; i < 50; i++ {
val := c.Out()
fmt.Print(val)
time.Sleep(time.Millisecond * 500)
}
}
// output
012345678910111213141516171819<nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil>
Process finished with the exit code 0
符合预期
遗憾
目前看上去已经很完美了,但是和标准的 channel
相比,仍然有差距。因为标准的 channel
是有这种用法的
v,isOpen := <- ch
可以通过 isOpen
变量来获取 channel
的开闭情况。
因此 InfiniteChannel
也应该提供一个类似的 method
func (c *InfiniteChannel) OutAndIsOpen() (interface{}, bool) {
// todo
}
可惜的是,要想得知 InfiniteChannel
是否是 Open
的,就必定要访问 InfiniteChannel
内的 isOpen
成员。
type InfiniteChannel struct {
inChan chan interface{}
outChan chan interface{}
data []interface{}
isOpen bool
}
而 isOpen
并非 channel
类型,根据之前的套路,这种非 channel
类型的成员只应该被 select
协程访问。一旦有多个协程访问,就会出现并发问题,除非加锁。
我不能接受!所以干脆不提供这个 method 了,嘿嘿。
完整代码
func main() {
c := NewInfiniteChannel()
go func() {
for i := 0; i < 20; i++ {
c.In(i)
time.Sleep(time.Second)
}
c.Close()
}()
for i := 0; i < 50; i++ {
val := c.Out()
fmt.Print(val)
time.Sleep(time.Millisecond * 500)
}
}
type InfiniteChannel struct {
inChan chan interface{}
outChan chan interface{}
data []interface{}
isOpen bool
}
func (c *InfiniteChannel) In(val interface{}) {
c.inChan <- val
}
func (c *InfiniteChannel) Out() interface{} {
return <-c.outChan
}
func (c *InfiniteChannel) Close() {
close(c.inChan)
}
func (c *InfiniteChannel) background() {
for true {
select {
case newVal, isOpen := <-c.inChan:
if isOpen {
c.data = append(c.data, newVal)
} else {
c.isOpen = false
}
case c.outChanWrapper() <- c.pop():
}
}
}
func NewInfiniteChannel() *InfiniteChannel {
c := &InfiniteChannel{
inChan: make(chan interface{}),
outChan: make(chan interface{}),
isOpen: true,
}
go c.background()
return c
}
// 取出队列的首个元素,如果队列为空,将会返回一个 nil
func (c *InfiniteChannel) pop() interface{} {
if len(c.data) == 0 {
return nil
}
val := c.data[0]
c.data = c.data[1:]
return val
}
func (c *InfiniteChannel) outChanWrapper() chan interface{} {
if c.isOpen && len(c.data) == 0 {
return nil
}
return c.outChan
}
参考
https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd
来源:https://www.cnblogs.com/XiaoXiaoShuai-/p/14878525.html
猜你喜欢
- mysql存储过程delimiter $DROP FUNCTION IF EXISTS `fun_convert`$CREATE DEFIN
- 为什么要使用Enum.(Why?)在普通类别中,枚举和我们在对象中定义的类变量一样的,每一个类变量就是一个枚举项,访问方式如下:class
- 前言学习Python的过程中,比较喜欢通过实际的小项目进行巩固学习,决定写一个弹跳小球的程序。这个实战例程是在公众号上看到的,他的编写过程比
- pandas中有时需要按行依次对.csv文件读取内容,那么如何进行呢?我们来完整操作一遍,假设我们已经有了一个.csv文件。# 1.导入包i
- MySQL行转列操作 所谓的行转列操作,就是将一个表的行信息转化为列信息,说着可能比较笼统,这里先举个例
- 点工具栏中〔显示估计的查询计划〕,结果提示Documents and Settings\XXX\Local Settings\Temp\1\
- mtx文件是按照稀疏矩阵格式存储的矩阵数据,可以按照以下步骤读取:1、安装scanpy包pip install scanpy2、文件读取im
- 在支持FSO的情况下,可以显示本站内的所有ASP页面的代码适用于代码演示时在效果页面上直接显示该页面的代码而不用再对代码制作专门的页面使用方
- 自从python2.2提供了yield关键字之后,python的生成器的很大一部分用途就是可以用来构建协同程序,能够将函数挂起返回中间值并能
- 形参可以设置参数默认值,设置遵循从右至左原则例如:fun(x=0,y=1),fun(x,y=1),但不可以是fun(x=1,y)形参设置可以
- 用了一段时间的 typescript 之后,深感中大型项目中 typescript 的必要性,它能够提前在编译期避免许多 bug,如很恶心的
- plt.subplot()plt.subplot(nrows, ncols, index, **kwargs)第一个参数:*args (官网
- 1.函数array() 功能:创建一个数组变量 格式:array(list) 参数:list为数组变量中的每个数值列,中间用逗号间隔 例子:
- 转自: http://www.qqread.com/mysql/z442108305.html对于程序开发人员而言,目前使用最流行的两种后台
- 最近我在用梯度下降算法绘制神经网络的数据时,遇到了一些算法性能的问题。梯度下降算法的代码如下(伪代码):def gradient_desce
- 本文针对MySQL数据库基本操作进行学习研究,需要了解的朋友不要错过这篇文章。以下均是在Windows 64位操作系统下的命令行使用。学习之
- @Author:Runsen1876年,亚历山大·格雷厄姆·贝尔(Alexander Graham Bell)发明了一种电报机,可以通过电线
- 判断python中的一个字符串是否为空,可以使用如下方法1、使用字符串长度判断len(s) ==0 则字符串为空#!/user/local/
- 一、旧版本(1.0以下)的卷积函数:tf.nn.conv2d在tf1.0中,对卷积层重新进行了封装,比原来版本的卷积层有了很大的简化。con
- 前言为了数据安全,数据库需要定期备份,这个大家都懂,然而数据库备份的时候,最怕写操作,因为这个最容易导致数据的不一致,松哥举一个简单的例子大