Go Channel Source Code Analysis

Date: 2022-05-06
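0. Introduction
This article walks through the internal implementation of Golang channels: the channel data structure and the code behind each channel operation. The code is from go1.9rc1. Some unrelated code is omitted, for example parts of the race-detector support (the code guarded by raceenabled).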

1. hchan struct
The underlying data structure of a channel is the hchan struct.

type hchan struct {
    qcount   uint           // number of elements currently in the queue
    dataqsiz uint           // size of the circular queue (the channel's capacity)
    buf      unsafe.Pointer // circular array that stores the elements
    elemsize uint16         // size of the element type
    closed   uint32         // non-zero once the channel is closed
    elemtype *_type         // element type
    sendx    uint           // send index into buf
    recvx    uint           // receive index into buf
    recvq    waitq          // goroutines blocked on this channel by a receive (<-ch)
    sendq    waitq          // goroutines blocked on this channel by a send (ch<-)

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

type waitq struct {
    first *sudog
    last  *sudog
}

type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.

    g          *g
    selectdone *uint32 // CAS to 1 to win select race (may point to stack)
    next       *sudog
    prev       *sudog
    elem       unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32
    parent      *sudog // semaRoot binary tree
    waitlink    *sudog // g.waiting list or semaRoot
    waittail    *sudog // semaRoot
    c           *hchan // channel
}


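The comments above explain each field. A channel is essentially a queue plus a lock, and that lock is the runtime's lightweight internal mutex rather than sync.Mutex. recvq is the list of goroutines blocked reading from the channel, and sendq is the list of goroutines blocked writing to it. Both are waitq values, i.e. doubly linked lists of sudog; a sudog is a wrapper around a g that represents one waiting goroutine (together with the data element and the list links).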
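To make the shape of recvq/sendq concrete, here is a minimal user-space sketch of a waitq as a FIFO doubly linked list, modeled on waitq.enqueue and waitq.dequeue in runtime/chan.go. The sudog type here is a toy stand-in that carries only an id and the list links (an assumption for illustration); the real dequeue also skips sudogs whose select has already been won by another case (selectdone), which this sketch leaves out.

```go
package main

import "fmt"

// sudog is a toy stand-in for the runtime's sudog: an id for the waiting
// goroutine plus the doubly linked list pointers. No real g is involved.
type sudog struct {
	id         int
	next, prev *sudog
}

// waitq mirrors the shape of runtime.waitq: a FIFO doubly linked list.
type waitq struct {
	first *sudog
	last  *sudog
}

// enqueue appends sgp at the tail of the queue.
func (q *waitq) enqueue(sgp *sudog) {
	sgp.next = nil
	x := q.last
	if x == nil {
		// Empty queue: sgp becomes both head and tail.
		sgp.prev = nil
		q.first = sgp
		q.last = sgp
		return
	}
	sgp.prev = x
	x.next = sgp
	q.last = sgp
}

// dequeue removes and returns the head of the queue, or nil if it is empty.
func (q *waitq) dequeue() *sudog {
	sgp := q.first
	if sgp == nil {
		return nil
	}
	y := sgp.next
	if y == nil {
		// sgp was the only element.
		q.first = nil
		q.last = nil
	} else {
		y.prev = nil
		q.first = y
		sgp.next = nil
	}
	return sgp
}

func main() {
	var q waitq
	for i := 1; i <= 3; i++ {
		q.enqueue(&sudog{id: i})
	}
	// Waiters come out in the order they were enqueued.
	for sg := q.dequeue(); sg != nil; sg = q.dequeue() {
		fmt.Println("woke waiter", sg.id)
	}
}
```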

2. make
Creating a channel with make is handled by the following code.

func makechan(t *chantype, size int64) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }
    if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) {
        panic(plainError("makechan: size out of range"))
    }

    var c *hchan
    if elem.kind&kindNoPointers != 0 || size == 0 {
        // Allocate memory in one call.
        // Hchan does not contain pointers interesting for GC in this case:
        // buf points into the same allocation, elemtype is persistent.
        // SudoG's are referenced from their owning thread so they can't be collected.
        // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        if size > 0 && elem.size != 0 {
            c.buf = add(unsafe.Pointer(c), hchanSize)
        } else {
            // race detector uses this location for synchronization
            // Also prevents us from pointing beyond the allocation (see issue 9401).
            c.buf = unsafe.Pointer(c)
        }
    } else {
        c = new(hchan)
        c.buf = newarray(elem, int(size))
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
    }
    return c
}


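The first two ifs are sanity checks: a limit on the size of the element type and an alignment constraint. The third if is also straightforward: it rejects a size that is negative or too large. The expression int64(uintptr(size)) != size checks that size fits in a uintptr (on 32-bit platforms the conversion from int64 can truncate), not whether it is negative. The last condition deserves a closer look: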

uintptr(size) > (_MaxMem-hchanSize)/elem.size

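As explained in my earlier article on Golang memory management, _MaxMem is the maximum size of the arena, the region the heap is allocated from. The condition therefore ensures that the hchan header plus a buffer of size elements can never exceed the largest possible heap allocation; it is written as a division so that the check itself cannot overflow. The memory is then obtained from mallocgc, so the channel lives on the heap.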
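The range check can be reproduced in ordinary Go. In this sketch hchanSize and maxMem are placeholder constants chosen for illustration (the real values depend on the platform and the Go version); the point is the shape of the test, in particular that it divides (maxMem-hchanSize) by the element size instead of multiplying size by it.

```go
package main

import "fmt"

// Placeholder values, NOT the runtime's real constants.
const (
	hchanSize = 96
	maxMem    = uintptr(1<<31 - 1)
)

// sizeOK follows the shape of makechan's range check: size must be
// non-negative, must survive a round trip through uintptr, and
// hchanSize + size*elemSize must not exceed maxMem.
func sizeOK(size int64, elemSize uintptr) bool {
	if size < 0 || int64(uintptr(size)) != size {
		return false
	}
	// Dividing instead of multiplying keeps the check itself overflow-free.
	if elemSize > 0 && uintptr(size) > (maxMem-hchanSize)/elemSize {
		return false
	}
	return true
}

func main() {
	fmt.Println(sizeOK(10, 8), sizeOK(-1, 8), sizeOK(1<<40, 8))
}
```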

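Next comes the allocation code. If the element type contains no pointers, or size is 0, the hchan and its buffer are allocated together in one contiguous block. When size is 0 (or the element size is 0), no space is reserved for buf at all; buf simply points to the hchan itself.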

if elem.kind&kindNoPointers != 0 || size == 0 {
    // Allocate memory in one call.
    // Hchan does not contain pointers interesting for GC in this case:
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
    if size > 0 && elem.size != 0 {
        c.buf = add(unsafe.Pointer(c), hchanSize)
    } else {
        // race detector uses this location for synchronization
        // Also prevents us from pointing beyond the allocation (see issue 9401).
        c.buf = unsafe.Pointer(c)
    }
}


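In the remaining case, where the element type contains pointers and size > 0, the hchan and its buffer are allocated separately. The rest of the function just fills in the remaining fields.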
} else {
    c = new(hchan)
    c.buf = newarray(elem, int(size)) // newarray also allocates through mallocgc
}


In summary, make(chan) allocates on the heap and returns a pointer to an hchan.
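The allocation decision can be summarized as a small pure function. allocPlan below is a hypothetical helper that only reports which branch of go1.9's makechan would be taken and how many buffer bytes it needs; it allocates nothing. The element sizes in main assume a 64-bit platform.

```go
package main

import "fmt"

// allocPlan mirrors the branch structure of makechan (go1.9) without
// allocating: it returns the strategy and the number of buffer bytes.
func allocPlan(elemHasPointers bool, elemSize uintptr, size int) (strategy string, bufBytes uintptr) {
	if !elemHasPointers || size == 0 {
		// One mallocgc call covering the hchan header and the buffer.
		if size > 0 && elemSize != 0 {
			return "single allocation, buf follows hchan", uintptr(size) * elemSize
		}
		// No buffer storage: buf points back at the hchan itself.
		return "single allocation, buf points at hchan", 0
	}
	// Pointer-carrying elements: hchan and buf are allocated separately,
	// so buf is allocated with the element type's GC information.
	return "separate allocations (new(hchan) + newarray)", uintptr(size) * elemSize
}

func main() {
	cases := []struct {
		desc  string
		ptrs  bool
		esize uintptr
		size  int
	}{
		{"make(chan int, 4)", false, 8, 4},
		{"make(chan int)", false, 8, 0},
		{"make(chan struct{}, 4)", false, 0, 4},
		{"make(chan *int, 4)", true, 8, 4},
	}
	for _, c := range cases {
		s, n := allocPlan(c.ptrs, c.esize, c.size)
		fmt.Printf("%-24s %s, %d buffer bytes\n", c.desc, s, n)
	}
}
```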

3. send
A send, i.e. ch <- x, is handled by the following functions.

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc(unsafe.Pointer(&c)))
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }
    ...

    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    if !block {
        unlock(&c.lock)
        return false
    }

    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

    // someone woke us up.
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    return true
}


3.1 nil channel
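Consider the nil channel case first, i.e. sending to a channel that was never created with make. My previous article, "Understanding Go Channels in Depth", left a question open: in its example, sending to a nil channel ends with fatal error: all goroutines are asleep - deadlock!. Why?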

if c == nil {
    gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
    throw("unreachable")
}

//runtime/trace.go
traceEvGoStop            = 16 // goroutine stops (like in select{}) [timestamp, stack]

//runtime/proc.go
// Puts the current goroutine into a waiting state and calls unlockf.
// If unlockf returns false, the goroutine is resumed.
// unlockf must not access this G's stack, as it may be moved between
// the call to gopark and the call to unlockf.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string,


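gopark puts the current goroutine to sleep and then calls unlockf (when it is non-nil) to release whatever the caller was holding; if unlockf returns false, the goroutine is resumed right away. A parked goroutine runs again only when other code calls goready on it. Here both unlockf and lock are nil, and since the channel is nil the goroutine is not recorded on any wait queue, so nothing can ever wake it: a goroutine that sends on a nil channel sleeps forever. Receiving from a nil channel is handled the same way. Let's look at the example from the previous article again.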
package main

func main() {
    var x chan int
    go func() {
        x <- 1
    }()
    <-x
}


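Here the main goroutine receives from a nil channel and parks, and the goroutine started with go func() sends to a nil channel and also parks. At that point no goroutine can ever make progress. The runtime detects this in checkdead() (runtime/proc.go), which the scheduler calls as threads (Ms) go idle, for example from mput; when it finds that no M is running Go code, it reports the error.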
func checkdead() {
    ...
    throw("all goroutines are asleep - deadlock!") // the error message is reported here
}

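Both nil-channel checks also have a non-blocking branch (if !block) that returns immediately instead of calling gopark; this is the path used by a select with a default case (see section 6). A quick way to observe it from user code, with hypothetical helper names, is:

```go
package main

import "fmt"

// trySendNil attempts a non-blocking send on a nil channel.
// The default branch runs because the send can never proceed.
func trySendNil() bool {
	var x chan int
	select {
	case x <- 1:
		return true
	default:
		return false
	}
}

// tryRecvNil attempts a non-blocking receive from a nil channel.
func tryRecvNil() bool {
	var x chan int
	select {
	case <-x:
		return true
	default:
		return false
	}
}

func main() {
	// A plain `x <- 1` on a nil channel would park forever instead.
	fmt.Println(trySendNil(), tryRecvNil()) // false false
}
```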

3.2 closed channel
Sending to a closed channel panics immediately.

lock(&c.lock)

if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("send on closed channel"))
}

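Because the closed check happens before the buffer is examined, the panic occurs even when the buffer still has room. A small sketch (sendOnClosed is a made-up helper) that recovers the panic to inspect its message:

```go
package main

import "fmt"

// sendOnClosed sends on a closed buffered channel that still has free
// space and returns the recovered panic message.
func sendOnClosed() (msg string) {
	ch := make(chan int, 1)
	close(ch)
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch <- 1 // c.closed != 0, so chansend panics before touching buf
	return "no panic"
}

func main() {
	fmt.Println(sendOnClosed()) // send on closed channel
}
```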

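3.3 Handling the send
There are three cases when sending data:

A goroutine is blocked on the channel waiting to receive (in which case hchan.buf is empty): the value is handed directly to that goroutine.

hchan.buf still has free space: the value is placed in the buffer.

hchan.buf is full: the current goroutine blocks.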

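The first case looks like this: a waiting goroutine is taken off the channel's recvq and send is called. send copies the value to the receiver, and goready wakes that goroutine up.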

lock(&c.lock)

if sg := c.recvq.dequeue(); sg != nil {
    // Found a waiting receiver. We pass the value we want to send
    // directly to the receiver, bypassing the channel buffer (if any).
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
}

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    ...
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}


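The second case is simpler. Comparing qcount with dataqsiz tells whether hchan.buf has free space. Besides copying the value into slot sendx, the code advances sendx (wrapping back to 0 at the end of the buffer) and increments qcount.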
lock(&c.lock)

if c.qcount < c.dataqsiz {
    // Space is available in the channel buffer. Enqueue the element to send.
    qp := chanbuf(c, c.sendx)
    if raceenabled {
        raceacquire(qp)
        racerelease(qp)
    }
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
        c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
}

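The buffered path is a plain ring-buffer enqueue. The toy ring type below is an assumption for illustration: it copies the hchan index fields but stores ints in a slice and has no lock or wait queues.

```go
package main

import "fmt"

// ring is a toy model of the hchan buffer fields (ints only, no locking).
type ring struct {
	buf      []int
	qcount   uint
	dataqsiz uint
	sendx    uint
}

// trySend mirrors the "space available" branch of chansend: write at
// sendx, advance sendx with wraparound and increment qcount.
func (r *ring) trySend(v int) bool {
	if r.qcount >= r.dataqsiz {
		return false // chansend would block here (or return false if !block)
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == r.dataqsiz {
		r.sendx = 0
	}
	r.qcount++
	return true
}

func main() {
	r := &ring{buf: make([]int, 3), dataqsiz: 3}
	for v := 1; v <= 4; v++ {
		ok := r.trySend(v)
		fmt.Println(v, ok, r.sendx, r.qcount)
	}
}
```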

The third case is shown below.
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
    mysg.releasetime = -1
}

mysg.elem = ep // initialize the sudog
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) // put the current goroutine on the send wait queue
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3) // park and release c.lock

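The effect of the blocking path can be observed from user code. In this sketch (blockedSender is a made-up helper) the 1-slot buffer is full, so the goroutine's send has to wait until main receives. Whether or not the goroutine has actually parked on sendq by the time main receives, the values come out in send order.

```go
package main

import "fmt"

// blockedSender fills a 1-slot buffer, then starts a goroutine whose send
// cannot complete until a receiver frees a slot.
func blockedSender() []int {
	ch := make(chan int, 1)
	ch <- 1 // buffer is now full
	done := make(chan struct{})
	go func() {
		ch <- 2 // parks on sendq if the buffer is still full
		close(done)
	}()
	first := <-ch // takes 1; a parked sender's 2 moves into the buffer
	<-done        // the sender's operation has completed
	second := <-ch
	return []int{first, second}
}

func main() {
	fmt.Println(blockedSender()) // [1 2]
}
```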

4. recv
Receiving from a channel (<-c) is very similar to sending.

4.1 nil channel
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }
    ...
}


4.2 closed channel
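When receiving from a closed channel that still holds data, execution continues with the normal receive path described below. Once no data is left, the receiver gets the zero value of the element type, and with the comma-ok form (v, ok := <-ch) the second value is false.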

lock(&c.lock)

if c.closed != 0 && c.qcount == 0 {
    if raceenabled {
        raceacquire(unsafe.Pointer(c))
    }
    unlock(&c.lock)
    if ep != nil {
        typedmemclr(c.elemtype, ep)
    }
    return true, false
}

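A short demonstration of receiving from a closed channel that still has buffered data, and then from the same channel once it is drained (drainClosed is a made-up helper):

```go
package main

import "fmt"

// drainClosed receives three times from a closed channel holding two values.
func drainClosed() (vals []int, oks []bool) {
	ch := make(chan int, 2)
	ch <- 7
	ch <- 8
	close(ch)
	for i := 0; i < 3; i++ {
		// Buffered values still arrive with ok == true; once qcount is 0
		// the receive returns the zero value with ok == false.
		v, ok := <-ch
		vals = append(vals, v)
		oks = append(oks, ok)
	}
	return vals, oks
}

func main() {
	fmt.Println(drainClosed()) // [7 8 0] [true true false]
}
```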

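4.3 Handling the receive
Again there are three cases. First, a sending goroutine is blocked on the channel, which means the channel is unbuffered or buf is full: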
lock(&c.lock)

if sg := c.sendq.dequeue(); sg != nil {
    // Found a waiting sender. If buffer is size 0, receive value
    // directly from sender. Otherwise, receive from head of queue
    // and add sender's value to the tail of the queue (both map to
    // the same buffer slot because the queue is full).
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true, true
}

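For a buffered channel, recv (not shown in this article) takes the element at recvx for the receiver, copies the parked sender's value into that same slot, advances recvx, and sets sendx equal to it because the queue is still full; for an unbuffered channel it copies directly from the sender. The sketch below models only the buffered case, on a toy ring of ints that is an assumption for illustration rather than the runtime type.

```go
package main

import "fmt"

// ring is a toy copy of the hchan buffer fields (ints only, no locking).
type ring struct {
	buf          []int
	recvx, sendx uint
	dataqsiz     uint
}

// recvFromFullWithSender models the buffered branch of runtime.recv.
func (r *ring) recvFromFullWithSender(senderVal int) int {
	v := r.buf[r.recvx]        // oldest element goes to the receiver
	r.buf[r.recvx] = senderVal // sender's value becomes the newest element
	r.recvx++
	if r.recvx == r.dataqsiz {
		r.recvx = 0
	}
	r.sendx = r.recvx // queue is still full, so tail == head
	return v
}

func main() {
	r := &ring{buf: []int{1, 2, 3}, dataqsiz: 3} // full: recvx == sendx == 0
	v := r.recvFromFullWithSender(4)
	fmt.Println(v, r.buf, r.recvx, r.sendx)
}
```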

Second, buf holds data:
if c.qcount > 0 {
    // Receive directly from queue
    qp := chanbuf(c, c.recvx)
    if raceenabled {
        raceacquire(qp)
        racerelease(qp)
    }
    if ep != nil {
        typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
}

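The same ring-buffer bookkeeping on the receive side, again as a toy model over an int slice (an assumption for illustration). Zeroing the slot after the copy corresponds to typedmemclr, which keeps the buffer from holding on to a stale value.

```go
package main

import "fmt"

// ring is a toy model of the hchan buffer fields (ints only, no locking).
type ring struct {
	buf      []int
	qcount   uint
	dataqsiz uint
	recvx    uint
}

// tryRecv mirrors the "qcount > 0" branch of chanrecv.
func (r *ring) tryRecv() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.buf[r.recvx] = 0 // clear the slot, like typedmemclr
	r.recvx++
	if r.recvx == r.dataqsiz {
		r.recvx = 0
	}
	r.qcount--
	return v, true
}

func main() {
	// A full 2-slot buffer whose oldest element sits at index 1.
	r := &ring{buf: []int{20, 10}, qcount: 2, dataqsiz: 2, recvx: 1}
	for {
		v, ok := r.tryRecv()
		if !ok {
			break
		}
		fmt.Println(v, r.recvx, r.qcount)
	}
}
```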

Third, buf is empty and no sender is waiting, so the receiver blocks:
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
    mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)


5. close
Closing a channel, i.e. close(ch), is handled by the following code (some redundant code removed).

func closechan(c *hchan) {
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    c.closed = 1

    var glist *g

    // release all readers
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        gp.schedlink.set(glist)
        glist = gp
    }

    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        gp.schedlink.set(glist)
        glist = gp
    }
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    for glist != nil {
        gp := glist
        glist = glist.schedlink.ptr()
        gp.schedlink = 0
        goready(gp, 3)
    }
}


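Besides setting c.closed to 1, closing a channel must also:

wake every goroutine blocked in the recvq queue

wake every goroutine blocked in the sendq queue

It does this by walking recvq and sendq, moving every waiting goroutine onto the glist list, and then, after releasing the channel lock, readying each goroutine on glist. Because gp.param is set to nil, a woken receiver returns the zero value with ok == false, while a woken sender sees gp.param == nil on a closed channel and panics with "send on closed channel".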

6. select channel
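Go's select statement is implemented in runtime/select.go, and this article does not cover that implementation in general. What we look at here are the special cases where select contains a single channel operation plus a default branch, which the compiler turns into non-blocking channel calls.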

select {
case c <- v:
    ... foo
default:
    ... bar
}


is compiled into
if selectnbsend(c, v) {
    ... foo
} else {
    ... bar
}


The corresponding selectnbsend function:
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
    return chansend(c, elem, false, getcallerpc(unsafe.Pointer(&c)))
}

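In user code this lowering is invisible: a select with a default case simply behaves as a try-send, which calls chansend with block == false. trySend is a hypothetical helper name.

```go
package main

import "fmt"

// trySend sends v on c only if it can do so without blocking.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

func main() {
	c := make(chan int, 1)
	fmt.Println(trySend(c, 1)) // true: the buffer had room
	fmt.Println(trySend(c, 2)) // false: buffer full and no receiver waiting
}
```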

select {
case v = <-c:
    ... foo
default:
    ... bar
}


is compiled into
if selectnbrecv(&v, c) {
    ... foo
} else {
    ... bar
}


The corresponding selectnbrecv function:
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
    selected, _ = chanrecv(c, elem, false)
    return
}


select {
case v, ok = <-c:
    ... foo
default:
    ... bar
}


is compiled into
if c != nil && selectnbrecv2(&v, &ok, c) {
    ... foo
} else {
    ... bar
}


The corresponding selectnbrecv2 function:
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
    // TODO(khr): just return 2 values from this function, now that it is in Go.
    selected, *received = chanrecv(c, elem, false)
    return
}

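The difference between selected and received matters for closed channels: a closed channel is always ready, so the case is selected but ok is false. A sketch with a hypothetical tryRecv2 helper:

```go
package main

import "fmt"

// tryRecv2 mirrors `select { case v, ok = <-c: ... default: ... }`.
// selected reports whether the case ran; ok reports whether the value
// came from an actual send (false once the channel is closed and drained).
func tryRecv2(c chan int) (v int, ok, selected bool) {
	select {
	case v, ok = <-c:
		return v, ok, true
	default:
		return 0, false, false
	}
}

func main() {
	c := make(chan int, 1)
	fmt.Println(tryRecv2(c)) // 0 false false: nothing to receive
	c <- 5
	fmt.Println(tryRecv2(c)) // 5 true true
	close(c)
	fmt.Println(tryRecv2(c)) // 0 false true: a closed channel is always ready
}
```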

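7. Summary
The Golang channel implementation lives in runtime/chan.go. The code itself is not very complex, but it relies on many other runtime details, such as gopark, which makes it somewhat hard to read.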

8. References
Go Source Code 1.9rc1