深入理解Golang channel的应用

channel是用于 goroutine 之间的同步、通信的数据结构。它为程序员提供了更高一层次的抽象,封装了更多的功能,这样并发编程变得更加容易和安全。本文通过示例为大家详细介绍了channel的应用,需要的可以参考一下

前言

channel是用于 goroutine 之间的同步、通信的数据结构

channel 的底层是通过 mutex 来控制并发的,但它为程序员提供了更高一层次的抽象,封装了更多的功能,这样并发编程变得更加容易和安全,得以让程序员把注意力留到业务上去,提升开发效率

channel的用途包括但不限于以下几点:

  • 协程间通信,同步
  • 定时任务:和timer结合
  • 解耦生产方和消费方,实现阻塞队列
  • 控制并发数

本文将介绍channel的底层原理,包括数据结构,channel的创建,发送,接收,关闭的实现逻辑

整体结构

Go channel的数据结构如下所示:

type hchan struct { qcount   uint           // total data in the queue dataqsiz uint           // size of the circular queue buf      unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed   uint32 elemtype *_type // element type sendx    uint   // send index recvx    uint   // receive index recvq    waitq  // list of recv waiters sendq    waitq  // list of send waiters lock mutex } 

qcount:已经存储了多少个元素

dataqsie:最多存储多少个元素,即缓冲区容量

buf:指向缓冲区的位置,实际上是一个数组

elemsize:每个元素占多大空间

closed:channel能够关闭,这里记录其关闭状态

elemtype:保存数据的类型信息,用于go运行时使用

sendx,recvx:

  • 记录下一个要发送到的位置,下一次从哪里还是接收
  • 这里用数组模拟队列,这两个变量即表示队列的队头,队尾
  • 因此channel的缓冲也被称为环形缓冲区

recvq,sendq:

当发送个接收不能立即完成时,需要让协程在channel上等待,所以有两个等待队列,分别针对接收和发送

lock:channel支持协程间并发访问,因此需要一把锁来保护

创建

创建channel会被编译器编译为调用makechan函数

// 无缓冲通道 ch1 := make(chan int) // 有缓冲通道 ch2 := make(chan int, 10) 

会根据创建的是带缓存,还是无缓冲,决定第二个参数size的值

可以看出,创建出来的是hchan指针,这样就能在函数间直接传递 channel,而不用传递 channel 的指针

func makechan(t *chantype, size int) *hchan { elem := t.elem // mem:缓冲区大小 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size <0 { panic(plainerror( "makechan: size out of range" )) } var c *hchan switch>

发送

执行以下代码时:

ch <- 3 

编译器会转化为对chansend的调用

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // 如果channel是空 if c == nil { // 非阻塞,直接返回 if !block { return  false } // 否则阻塞当前协程 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw( "unreachable" ) } // 非阻塞,没有关闭,且容量满了,无法发送,直接返回 if !block && c.closed == 0 && full(c) { return  false } // 加锁 lock(&c.lock) // 如果已经关闭,无法发送,直接panic if c.closed != 0 { unlock(&c.lock) panic(plainError( "send on closed channel" )) } // 从接收队列弹出一个协程的包装结构sudog if sg := c.recvq.dequeue(); sg != nil { // 如果能弹出,即有等到接收的协程,说明: // 该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待 // 将要发送的数据拷贝到该协程的接收指针上 send(c, sg, ep, func() { unlock(&c.lock) }, 3) return  true } // 缓冲区还有空间 if c.qcount  0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) // 被唤醒后发现channel关闭了,panic if closed { if c.closed == 0 { throw( "chansend: spurious wakeup" ) } panic(plainError( "send on closed channel" )) } return  true } 

整体流程为:

如果当前操作为非阻塞,channel没有关闭,且容量满了,无法发送,直接返回

从接收队列弹出一个协程的包装结构sudog,如果能弹出,即有等到接收的协程,说明:

  • 该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待
  • 将要发送的数据拷贝到该协程的接收指针上,返回
  • 这里直接从发送者拷贝到接收者的内存,而不是先把数据拷贝到缓冲区,再从缓冲区拷贝到接收者,节约了一次内存拷贝

否则看看缓冲区还有空间,如果有,将数据拷贝到缓冲区上,也返回

接下来就是既没有接收者等待,缓冲区也为空的情况,就需要将当前协程包装成sudog,阻塞到channel上

将协程阻塞到channel的等待队列时,将其包装成了sudog结构:

type sudog struct { // 协程 g *g // 前一个,后一个指针 next *sudog prev *sudog // 等到发送的数据在哪,等待从哪个位置接收数据 elem unsafe.Pointer acquiretime int64 releasetime int64 ticket      uint32 isSelect bool success bool parent   *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot // 在哪个channel上等待 c        *hchan // channel } 

其目的是:

  • g本身没有存储前一个,后一个指针,需要用sudog结构包装才能加入队列
  • elem字段存储等到发送的数据在哪,等待从哪个位置接收数据,用于从数据能从协程到协程的直接拷贝

来看看一些子函数:

1.判断channel是否是满的

func full(c *hchan) bool { // 无缓冲 if c.dataqsiz == 0 { // 并且没有其他协程在等待 return c.recvq.first == nil } // 有缓冲,但容量装满了 return c.qcount == c.dataqsiz } 

2.send方法:

/** c:要操作的channel sg:弹出的接收者协程 ep:要发送的数据在的位置 */ func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 如果接收者指针不为空,直接把数据从ep拷贝到sg.elem if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 唤醒该接收者协程 goready(gp, skip+1) } 

接收

从channel中接收数据有几种写法:

  • 带不带ok
  • 接不接收返回值

根据带不带ok,决定用下面哪个方法

func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) } func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return } 

根据接不接收返回值,决定elem是不是nil

最终都会调用chanrecv方法:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // 如果channel为nil,根据参数中是否阻塞来决定是否阻塞 if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw( "unreachable" ) } // 非阻塞,并且channel为空 if !block && empty(c) { // 如果还没关闭,直接返回 if atomic.Load(&c.closed) == 0 { return } // 否则已经关闭, // 如果为空,返回该类型的零值 if empty(c) { if ep != nil { typedmemclr(c.elemtype, ep) } return  true, false } } lock(&c.lock) // 同样,如果channel已经关闭,且缓冲区没有元素,返回该类型零值 if c.closed != 0 && c.qcount == 0 { unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return  true, false } // 如果有发送者正在阻塞,说明: // 1.无缓冲 // 2.有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待 if sg := c.sendq.dequeue(); sg != nil { // 将数据从缓冲区拷贝到ep,再将sg的数据拷贝到缓冲区,该函数详细流程可看下文 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return  true, true } // 如果缓存区有数据, if c.qcount > 0 { // qp为缓冲区中下一次接收的位置 qp := chanbuf(c, c.recvx) // 将数据从qp拷贝到ep if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return  true, true } // 接下来就是既没有发送者在等待,也缓冲区也没数据 if !block { unlock(&c.lock) return  false, false } // 将当前协程包装成sudog,阻塞到channel中 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // 记录接收地址 mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive,        traceEvGoBlockRecv, 2) // 从这里唤醒 if mysg != gp.waiting { throw( "G waiting list is corrupted" ) } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return  true, success } 

接收流程如为:

如果channel为nil,根据参数中是否阻塞来决定是否阻塞

如果channel已经关闭,且缓冲区没有元素,返回该类型零值

如果有发送者正在阻塞,说明:

  • 要么是无缓冲
  • 有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待
  • 将数据从缓冲区拷贝到ep,再将发送者的数据拷贝到缓冲区,并唤该发送者

如果缓存区有数据, 则从缓冲区将数据复制到ep,返回

接下来就是既没有发送者在等待,也缓冲区也没数据的情况:

将当前协程包装成sudog,阻塞到channel中

来看其中的子函数recv():

/** c:操作的channel sg:阻塞的发送协程 ep:接收者接收数据的地址 */ func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 如果是无缓冲channel,直接将数据从发送者sg拷贝到ep if c.dataqsiz == 0 { if ep != nil { recvDirect(c.elemtype, sg, ep) } // 接下来是有缓冲,且缓冲区满的情况 } else { // qp为channel缓冲区中,接收者下一次接收的地址 qp := chanbuf(c, c.recvx) // 将数据从qp拷贝到ep if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 将发送者的数据从sg.elem拷贝到qp typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } // 由于一接收已发送,缓冲区还是满的,因此 c.sendx = c.recvx c.sendx = c.recvx } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 唤醒发送者 goready(gp, skip+1) } 

关闭

func closechan(c *hchan) { // 不能关闭空channel if c == nil { panic(plainError( "close of nil channel" )) } lock(&c.lock) // 不能重复关闭 if c.closed != 0 { unlock(&c.lock) panic(plainError( "close of closed channel" )) } // 修改关闭状态 c.closed = 1 var glist gList // 释放所有的接收者协程,并为它们赋予零值 for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false glist.push(gp) } // 释放所有的发送者协程 for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false glist.push(gp) } unlock(&c.lock) // 执行唤醒操作 for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } } 

关闭的流程比较简单,可以看出:

不能关闭空channel,不能重复关闭channel

先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒:

接收者:会收到该类型的零值

这里返回零值没有问题,因为之所以这些接收者会阻塞,就是因为缓冲区没有数据,因此channel关闭后该接收者收到零值也符合逻辑

发送者:会被唤醒,然后panic

因此不能在有多个sender的时候贸然关闭channel

以上就是深入理解Golang channel的应用的详细内容,更多关于Golang channel的资料请关注0133技术站其它相关文章!

以上就是深入理解Golang channel的应用的详细内容,更多请关注0133技术站其它相关文章!

赞(0) 打赏
未经允许不得转载:0133技术站首页 » 其他教程