传指针:共享内存;
传值:共享数据;
无状态、无边界的数据(数据不处于goroutine的生命周期内)
无缓存的 channel 是同步的,即;有缓冲的 channel 是非同步的;
写入/读取 nil channel(即未初始化的channel):
- 阻塞:goroutine 被永久挂起;
- 非阻塞:返回 false;
经典原理
如果 recv 的 goroutine 在等待,则有数据传入时直接 copy 给等待的goroutine,而不是加锁后写入 channel,提升效率;
用途:
csp
:以通信的方式共享内存;
goroutine 间传递 数据信息 或者 控制信号;
用于同步;
返回只读channel: func channel() <-chan struct{}
返回只写channel: func channel() chan<- struct{}
返回读写channel: func channel() chan struct{}
线程安全:
内部实现锁机制,多goroutine并发访问是安全的;
顺序一致:
通过channel发送的数据的顺序,是按照发送到channel上的时间顺序排列的,满足先入先出;
源码分析
构造函数
通过源码可直观看到 3 种类型的 channel:
- men 为 0,无缓冲区 or struct{} 类型,则分配内存大小 mem 为 0;
- 指针类型;
- 非指针类型(struct)
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
...
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.PtrBytes == 0:
// 非指针类型
// 分配 && 调整 channel 的 buf 指向 mem 的位置
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 分别申请 chan 和 buf 空间,两者不需要连续(因为存的也是指针)
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
}
return c
}
写流程
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
...
// 快速路径:检查未获取锁的非阻塞操作是否失败。
// 两次读取此处,如果 channel 被关闭了,
// 即:非阻塞 + 未关闭 + 缓冲区已满 (均为原子操作)
if !block && c.closed == 0 && full(c) {
return false
}
...
}
分支1:存在阻塞读 goroutine
lock(&c.lock)
// 为快路径兜底
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
// 直接将消息传递给 消费者,不需要进入缓冲区再取 造成额外的开销;
// 基于 memmove 方法,直接将元素拷贝交给 sudog 对应的 goroutine;
// send 内部会解锁,即函数签名应该是 sendL 语义;
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// sg:消费者;unlockf:发送完成后解锁 chan 的 func;
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
// 如果 sg 的 elem 字段(即存储接收值的位置)不为空
// 即当前 case 命中分支,直接将数据复制过去
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
...
// 将该消费者 goroutine 标记为已准备好运行;(skip 用于跟踪堆栈,暂不了解)
goready(gp, skip+1)
}
注:
gp
:指向 goroutine 的指针;
sg
:sudog,用于维护等待的 goroutine 相关元信息,管理 chan 上阻塞的 send 和 recv 操作;
gp.param
用于 goroutine 恢复执行时,向其传递数据,如本处将 sudog 指针传回执行的 goroutine;
c.dataqsiz == 0
:无缓冲的 channel
分支2:不存在阻塞的读 goroutine,且环形缓冲区未满
lock(&c.lock)
...
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
// 将当前元素添加到 环形缓冲区 sendx 对应位置
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
分支3:不存在阻塞的读 goroutine,且环形缓冲区已满
lock(&c.lock)
...
// 获取当前的 g
gp := getg()
// 构建并封装当前 goroutine 的 sudog 对象;
mysg := acquireSudog()
...
// 建立 sudog、goroutine、channel 间指向关系
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 将 sudog 添加到当前 goroutine 的阻塞写 goroutine 队列中
c.sendq.enqueue(mysg)
...
// 挂起当前 goroutine
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
...
// 唤醒后的恢复
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
// true: 通道被关闭,但是状态错误设置
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
注:阻塞后恢复,是在对应的 recv方法(可见分支1) 或者 对应 send方法(消费阻塞恢复)中传递数据,而传递的方法都是直接复制内存,即:sendDirect 和 recvDirect 方法,看上去底层实现是在双方的栈上直接复制,外加一些处理内存处理(暂不展开,记个 xtodo);
读流程
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
lock(&c.lock)
// chan 已关闭
// 缓冲区无数据则直接返回,有则说明此时 无 阻塞写goroutine
if c.closed != 0 {
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
...
}
...
}
核心注意点:
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
// 只有当 runtime 出现 bug了,才会执行 throw
// 本处意为理论上不可达的行
throw("unreachable")
}
// 如果 chan 缓冲区无数据、且不阻塞,则直接返回
if !block && empty(c) {
// 注意本处顺序,先判断 无数据 再判断 chan 是否被关闭
// 同样是因为 chan 不会从 close 状态回溯
// 所以本处 chan 未关闭,代表空数据时一定也没关闭,直接返回
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
注意:如果是 12 行和 16 行的判断顺序交换,会出现丢失数据的 bug,即:
G1 写入一个消息后,立刻关闭 chan,G2 此时判断 closed 为 1,且消息尚未完成写入(G1 写入的数据尚未对 G2 可见),但是 G2 判断 empty 为 true,则直接返回,导致写入延迟的数据丢失;
所以源码的控制流是:先检查是否无数据,后检查 chan 是否关闭,未关闭则依赖于 closed 不会回溯的特性,直接返回,已关闭则需要再次判断是否无数据,只有没有数据时才能提前返回;
分支1:有阻塞的写goroutine
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
分支2:无阻塞的写goroutine,且缓冲区有数据
if c.qcount > 0 {
// 直接从环形缓冲区拿,即 recvx 对应的元素
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
// 防阻塞
if !block {
unlock(&c.lock)
return false, false
}
分支3:无阻塞写goroutine,且缓冲区无数据
gp := getg()
mysg := acquireSudog()
...
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 把 sudog 添加到当前 channel 的阻塞读goroutine队列中
c.recvq.enqueue(mysg)
...
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
两种接收格式
a := <- ch
b ok := <- ch
看源码解释,貌似是通过编译阶段,根据不同格式将 对应代码汇编为不同的方法,即源码中 v1 和 v2 两个方法:
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
阻塞与非阻塞模式
非阻塞写入 chan ,几乎唯一的场景:即 select + default
;
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}
同样通过编译改变实际执行的方法;
源码的解释也很通俗易懂,select 被编译为类似如下的控制流:
if selected, ok = selectnbrecv(&v, c); selected {
... foo
} else {
... bar
}
通过传递参数指针,获取读取的消息;
关闭
关闭channel后,接收者可以从中接受剩余数据,然后解除阻塞(即不会丢失数据),如果在此之后再尝试读取已关闭的channel,会返回零值和false;
如果对 已经关闭的 channel 发送数据,会引发panic;(包括close时的阻塞中的写操作)
必须引发panic的原因:当关闭一个channel时,它意味着你已经声明这个通道不再需要传递更多的值了,这是一种信号,告诉接收方不再等待更多的数据,可以停止阻塞并执行后续的操作,但关闭通道并不意味着其中的值会立即被销毁或清空,而是保留在通道内等待被接收;
如果尝试向一个已经关闭的通道写入数据,Go 语言的运行时系统会检测到这个非法操作,并在运行时抛出 panic;这是因为向已关闭的通道写入数据是一种违反通道的使用规则的行为,防止接收方出现差错;这个设计决定是为了保证程序的安全性和稳定性;
例如在多个 Goroutine 中共享同一个通道,而某个 Goroutine 在关闭通道后仍然尝试向通道发送数据,或者误以为通道未关闭而尝试发送数据等,这些错误可能会导致程序在运行时崩溃,并且可能很难追踪和调试; ```go func closechan(c *hchan) { if c == nil { // 关闭未初始化过的 channel 会触发 panic panic(plainError(“close of nil channel”)) }
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
// 重复关也会 panic
panic(plainError("close of nil channel"))
}
c.close = 1
var glist gList
// 将 阻塞读 goroutine 队列中的 g 统一添加到 glist
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}
// 将 阻塞写 goroutine 队列中的 g 统一添加到 glist;(会 panic)
for {
sg := c.sendq.dequeue()
if sg = nil {
break
}
sg.elem = nil
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = fase
glist.push(gp)
}
unlock(&c.lock)
// 唤醒 glist 当中所有 goroutine
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
} } ```
阻塞时期
- 向 一个没有缓冲区 的 channel 发送数据时,如果没有对应的 goroutine 正在接收 channel 上的数据,那么发送方将会持续阻塞,直到数据被成功接收;
- 从 一个没有缓冲区 的 channel 接收数据时,如果没有对应的 goroutine 正在向 channel 发送数据,那么接收方将会持续阻塞,直到channel上有数据可供接收;
- 向 一个已满的有缓冲区 的 channel 发送数据时,发送方将会被阻塞,直到接收方从channel中取走数据,从而为缓冲区腾出空间;
- 从 一个空的有缓冲区的 channel 接收数据时,接收方将会被阻塞,直到有数据发送到 channel;
数据结构
type hchan struct {
qcount uint // chan中的元素数量
dataqsiz uint // chan中的元素容量
buf unsafe.Pointer // 环形数组,存储缓存数据
elemsize uint16 // 元素类型大小
closed uint32 // chan是否被关闭
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}
type waitq struct {
first *sudog // 阻塞队列首部
last *sudog // 阻塞队列尾部
}
// 装饰goroutine
type sudog struct {
g *g
next *sudog // 双向队列指针
prev *sudog // 指针
elem unsafe.Pointer // data element (may point to stack)
acquiretime int64
releasetime int64
ticket uint32
isSelect bool // 判断channel是否处于select下,防止直接阻塞了
success bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}