golanfg Channel
标签搜索

golanfg Channel

BeforeIeave
2022-07-27 / 0 评论 / 172 阅读 / 正在检测是否收录...

1 内存于通信

不要通过共享内存的方式通信,而是应该通过通信的方式共享内存
例如不要使用传输变量地址的方式检测变量的值,而可以用管道直接判断因为管道有内容才会允许被取值。

1.1 优势

1.可以避免协程竞争和数据冲突的问题。
2.管道示更高级的抽象,可以降低开发难度,增加程序的可读性。
3.模块之间更易解耦,增强可扩展性和可维护性。

2 channel

type hchan struct {
    //缓存区的构成-5行
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32                //状态值 0-开启  1-关闭
    elemtype *_type // element type
    
    //指向链表的当前执行的位置
    sendx    uint   // send index
    recvx    uint   // receive index
    //发送和接收的链表
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters
    
    //保护此结构体的所有字段--任何操作结构体的函数都要加锁-目的是保护结构体本身
    lock      mutex

2.1 channel缓存区的构建

channel1.png
channel2.png

2.2 底层原理

2.2.1 语法糖

<-关键字其实是一个语法糖,编译的时候会变为runtime.chansend1()。

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

->也是语法糖,编译阶段i<- c 转化为runtime.chanrecv1(); i, ok <- c转化为runtime.chanrecv2()。最终还是调用chanrecv()方法。

// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

2.2.2 channel发送的情形

直接发送

在数据发送之前,已经有协程G在等待接收了。则先唤醒协程,再将数据直接拷贝

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {            
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }        //是否为空的判断
    if debugChan {
        print("chansend: chan=", c, "\n")
    }
    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
    }
    if !block && c.closed == 0 && full(c) {
        return false
    }
    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }
    //上锁
    lock(&c.lock)
    //判断channel是否被关闭
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    //从等待队列中取协程
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    //若等待队列无法取出协程则存入缓存
    //缓存
    //...

chansend() 调用了 send()

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if raceenabled {
        if c.dataqsiz == 0 {
            racesync(c, sg)
        } else {
            racenotify(c, c.recvx, nil)
            racenotify(c, c.recvx, sg)
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        }
    }
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)        //发送数据
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    //与gopark()相反,作用是唤醒协程
    goready(gp, skip+1)
}

send() 调用 sendDirect()

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    //sudog结构体的elem指向的是传入参数的地址
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)

    memmove(dst, src, t.size)        //内存移动-直接拷贝
}

放入缓存

没有协程G接收数据,但是有数据且有缓存空间。
chansend()的剩余代码:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    //...
    //之上为直接发送
    //缓存区已缓存的数量 < 总量
if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            racenotify(c, c.sendx, nil)
        }
        //参数移动到缓存单元
        typedmemmove(c.elemtype, qp, ep)
        //维护数据
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    //缓存区已满
    //...
}

休眠等待

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    
    //...
    //缓存已满 
    if !block {
        unlock(&c.lock)
        return false
    }
    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()                //拿到自己的协程结构体
    mysg := acquireSudog()        //包装自己
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    //将自己包装成sudog结构体--协程
    //将自己放入等待队列
    c.sendq.enqueue(mysg)
    atomic.Store8(&gp.parkingOnChan, 1)
    //进入休眠
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

    KeepAlive(ep)
    //被唤醒之后只需维护数据-以下的代码,不用接收数据---已经被取走了
    // someone woke us up.
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    if closed {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
return true
}

2.2.3 channel的数据接收

有等待的协程,从协程接收数据

数据接收前已经有协程等待发送,且没有缓存时:直接拷贝数据然后唤醒send的协程。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    //block指是否阻塞
    if debugChan {
        print("chanrecv: chan=", c, "\n")
    }
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    // Fast path: check for failed non-blocking operation without acquiring the lock.
    if !block && empty(c) {
        // After observing that the channel is not ready for receiving, we observe whether the
        // channel is closed.
        //
        // Reordering of these checks could lead to incorrect behavior when racing with a close.
        // For example, if the channel was open and not empty, was closed, and then drained,
        // reordered reads could incorrectly indicate "open and empty". To prevent reordering,
        // we use atomic loads for both checks, and rely on emptying and closing to happen in
        // separate critical sections under the same lock.  This assumption fails when closing
        // an unbuffered channel with a blocked send, but that is an error condition anyway.
        if atomic.Load(&c.closed) == 0 {
            // Because a channel cannot be reopened, the later observation of the channel
            // being not closed implies that it was also not closed at the moment of the
            // first observation. We behave as if we observed the channel at that moment
            // and report that the receive cannot proceed.
            return
        }
        // The channel is irreversibly closed. Re-check whether the channel has any pending data
        // to receive, which could have arrived between the empty and closed checks above.
        // Sequential consistency is also required here, when racing with such a send.
        if empty(c) {
            // The channel is irreversibly closed and empty.
            if raceenabled {
                raceacquire(c.raceaddr())
            }
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    }
    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    lock(&c.lock)
    //是否已经被关闭
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }
    //再发送队列中拿到协程
    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

chanrecv() 调用 recv() 函数

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    //判断缓存区是否为空
    if c.dataqsiz == 0 {
        if raceenabled {
            racesync(c, sg)            
        }
        if ep != nil {
            recvDirect(c.elemtype, sg, ep)    //缓存区为空则直接接收--调用内存移动函数
        }
    } else {
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            racenotify(c, c.recvx, nil)
            racenotify(c, c.recvx, sg)
        }
        // copy data from queue to receiver
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // copy data from sender to queue
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    //维护协程状态
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)        //唤醒发送协程--ta不用再发送了
}

有等待的协程,从缓存接收

channel有缓存,自己将缓存中的数据拿走再将发送队列等待的协程的数据放入缓存中最后再唤醒发送的协程
recv() 之后的部分:

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 {
        if raceenabled {
            racesync(c, sg)            
        }
        if ep != nil {
            recvDirect(c.elemtype, sg, ep)    //缓存区为空则直接接收--调用内存移动函数
        }
    } else {        //直接取值的情形,else-有缓存
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            racenotify(c, c.recvx, nil)
            racenotify(c, c.recvx, sg)
        }
        // copy data from queue to receiver
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)    //直接取出缓存的数据
        }
        // copy data from sender to queue
        typedmemmove(c.elemtype, qp, sg.elem)    //将等待发送数据协程的数据移至缓存空间
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    //维护协程状态
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)        //唤醒发送协程--ta不用再发送了
}

没有等待携程,接收缓存

没有协程再发送队列等待,且channel有缓存。’

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    //..
    //队列中没有协程,缓存中有
    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            racenotify(c, c.recvx, nil)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)    //将数据移动到接收地址
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

没有等待协程,阻塞接收

没有任何数据存在,自己进入接收队列等待。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    //..
    //前面没有任何可取的数据
    if !block {
        unlock(&c.lock)
        return false, false
    }
    // no sender available: block on this channel.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)            //包装完成后将自己放入接收队列中
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    atomic.Store8(&gp.parkingOnChan, 1)
    //休眠
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

    // someone woke us up--唤醒后维护一下数据,结束即可--数据已经被取走了。
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, success
}

2.2.4 非阻塞的channel的使用-select

1.编译时判断存在接收、发送、默认路径,
2.首先查看是否有可以立即执行的case,
3.没有则直接进入default语句,
4.没有default语句则将自己注册再所有的case的channel中,进入休眠

2.2.5 timer计时器

func main(){
    t := time.NewTimer(time.Second)    //生成定时器,t在一秒后向其管道中塞一个值
    <- t.C         //从t中的通道取出数据
    //要等待1s才会有值!!
    //...
}
0

评论

博主关闭了所有页面的评论