标签搜索

golang 协程

BeforeIeave
2022-07-27 / 0 评论 / 178 阅读 / 正在检测是否收录...

1 线程和进程的关系

1.一个进程可以内包含多个线程,线程可被理解为是不占用空间但是占用CPU的。
2.在同一个进程中线程共享内存空间的。(线程所占的空间是被编译器分配给这个进程的)。
3.线程调度需要由系统进行,CPU需要在不同的线程中进行切换。
所以,线程的缺点在于:a.本身占用的资源大,b. 线程的操作开销大,c.线程的切换开销大

1.1go线程的抽象

type m struct {
    g0      *g     //go运行的第一个协程
    morebuf gobuf
    divmod  uint32

    // Fields not known to debuggers.
    procid        uint64
    gsignal       *g 
    goSigStack    gsignalStack
    sigmask       sigset
    tls           [tlsSlots]uintptr
    mstartfn      func()
    curg          *g        //现在进行到的协程
    caughtsig     guintptr
    p             puintptr
    nextp         puintptr
    oldp          puintptr
    id            int64
    mallocing     int32
    throwing      int32
    preemptoff    string 
    locks         int32
    dying         int32
    profilehz     int32
    spinning      bool
    blocked       bool
    newSigstack   bool
    printlock     int8
    incgo         bool
    freeWait      uint32
    fastrand      uint64
    needextram    bool
    traceback     uint8
    ncgocall      uint64
    ncgo          int32
    cgoCallersUse uint32
    cgoCallers    *cgoCallers
    park          note
    alllink       *m
    schedlink     muintptr
    lockedg       guintptr
    createstack   [32]uintptr
    lockedExt     uint32
    lockedInt     uint32
    nextwaitm     muintptr
    waitunlockf   func(*g, unsafe.Pointer) bool
    waitlock      unsafe.Pointer
    waittraceev   byte
    waittraceskip int
    startingtrace bool
    syscalltick   uint32
    freelink      *m 

    // these are here because they are too large to be on the stack
    // of low-level NOSPLIT functions.
    libcall   libcall
    libcallpc uintptr
    libcallsp uintptr
    libcallg  guintptr
    syscall   libcall

    vdsoSP uintptr
    vdsoPC uintptr

    // preemptGen counts the number of completed preemption
    // signals. This is used to detect when a preemption is
    // requested, but fails. Accessed atomically.
    preemptGen uint32

    // Whether this is a pending preemption signal on this M.
    // Accessed atomically.
    signalPending uint32

    dlogPerM

    mOS            //及记录每种操作系统的线程信息

    // Up to 10 locks held by this m, maintained by the lock ranking code.
    locksHeldLen int
    locksHeld    [10]heldLockInfo
}

线程的抽象.png

2 协程并发的原理

协程是带有执行状态的程序,需要运行的时候,将此协程放入一个线程中传入CPU内计算,再用计算结果更新这个程序,最后将其恢复为协程。(CPU不再切换,而是线程的内容在变)再按照顺序执行下一个协程。
特点:协程复用线程(节省的CPU线程调度的开销)
优点:资源利用的效率高,调度的速度快,拥有超高的并发。

3 协程的本质(源码)

type g struct {
    stack       stack           //stack包含栈的上下限
    stackguard0 uintptr 
    stackguard1 uintptr 

    _panic    *_panic 
    _defer    *_defer 
    m         *m 
    sched     gobuf                //包含sp(原始)指针,指向的是此协程的栈用到了哪个位置
                                //pc指针,程序计数器,表示运行到了哪一行代码
    syscallsp uintptr 
    syscallpc uintptr 
    stktopsp  uintptr 
    
    param        unsafe.Pointer
    atomicstatus uint32            //协程的状态
    stackLock    uint32 
    goid         int64            //协程的id号
    schedlink    guintptr
    waitsince    int64 
    waitreason   waitReason

    preempt       bool
    preemptStop   bool
    preemptShrink bool

    
    asyncSafePoint bool

    paniconfault bool 
    gcscandone   bool
    throwsplit   bool
    
    activeStackChans bool
    
    parkingOnChan uint8

    raceignore     int8
    sysblocktraced bool 
    tracking       bool 
    trackingSeq    uint8 
    runnableStamp  int64
    runnableTime   int64
    sysexitticks   int64 
    traceseq       uint64
    tracelastp     puintptr
    lockedm        muintptr
    sig            uint32
    writebuf       []byte
    sigcode0       uintptr
    sigcode1       uintptr
    sigpc          uintptr
    gopc           uintptr
    ancestors      *[]ancestorInfo
    startpc        uintptr
    racectx        uintptr
    waiting        *sudog
    cgoCtxt        []uintptr
    labels         unsafe.Pointer 
    timer          *timer
    selectDone     uint32
    gcAssistBytes int64
}

协程的结构.png

4 协程在线程上的运行

4.1 线程的运行

单线程的循环.png

4.1.1 运行schedule()

func schedule() {
    _g_ := getg()

    if _g_.m.locks != 0 {
        throw("schedule: holding locks")
    }

    if _g_.m.lockedg != 0 {
        stoplockedm()
        execute(_g_.m.lockedg.ptr(), false) // Never returns.
    }

    // We should not schedule away from a g that is executing a cgo call,
    // since the cgo call is using the m's g0 stack.
    if _g_.m.incgo {
        throw("schedule: in cgo")
    }

top:
    pp := _g_.m.p.ptr()
    pp.preempt = false

    if sched.gcwaiting != 0 {
        gcstopm()
        goto top
    }
    if pp.runSafePointFn != 0 {
        runSafePointFn()
    }

    // Sanity check: if we are spinning, the run queue should be empty.
    // Check this before calling checkTimers, as that might call
    // goready to put a ready goroutine on the local run queue.
    if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
        throw("schedule: spinning with local work")
    }

    checkTimers(pp, 0)

    var gp *g
    var inheritTime bool

    // Normal goroutines will check for need to wakeP in ready,
    // but GCworkers and tracereaders will not, so the check must
    // be done here instead.
    tryWakeP := false
    //拿到可执行的协程
    if trace.enabled || trace.shutdown {
        gp = traceReader()
        if gp != nil {
            casgstatus(gp, _Gwaiting, _Grunnable)
            traceGoUnpark(gp, 0)
            tryWakeP = true
        }
    }
    if gp == nil && gcBlackenEnabled != 0 {
        gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
        if gp != nil {
            tryWakeP = true
        }
    }
    if gp == nil {
        // Check the global runnable queue once in a while to ensure fairness.
        // Otherwise two goroutines can completely occupy the local runqueue
        // by constantly respawning each other.
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }
    if gp == nil {
        gp, inheritTime = runqget(_g_.m.p.ptr())
        // We can see gp != nil here even if the M is spinning,
        // if checkTimers added a local goroutine via goready.
    }
    if gp == nil {
        gp, inheritTime = findrunnable() // blocks until work is available
    }

    // This thread is going to run a goroutine and is not spinning anymore,
    // so if it was marked as spinning we need to reset it now and potentially
    // start a new spinning M.
    if _g_.m.spinning {
        resetspinning()
    }

    if sched.disable.user && !schedEnabled(gp) {
        // Scheduling of this goroutine is disabled. Put it on
        // the list of pending runnable goroutines for when we
        // re-enable user scheduling and look again.
        lock(&sched.lock)
        if schedEnabled(gp) {
            // Something re-enabled scheduling while we
            // were acquiring the lock.
            unlock(&sched.lock)
        } else {
            sched.disable.runnable.pushBack(gp)
            sched.disable.n++
            unlock(&sched.lock)
            goto top
        }
    }

    // If about to schedule a not-normal goroutine (a GCworker or tracereader),
    // wake a P if there is one.
    if tryWakeP {
        wakep()
    }
    if gp.lockedm != 0 {
        // Hands off own p to the locked m,
        // then blocks waiting for a new p.
        startlockedm(gp)
        goto top
    }

    execute(gp, inheritTime)
}

4.1.2跳转到execute()

func execute(gp *g, inheritTime bool) {            //gp是即将执行的方法
    _g_ := getg()

    // Assign gp.m before entering _Grunning so running Gs have an
    // M.
    _g_.m.curg = gp
    gp.m = _g_.m
    casgstatus(gp, _Grunnable, _Grunning)
    gp.waitsince = 0
    gp.preempt = false
    gp.stackguard0 = gp.stack.lo + _StackGuard
    if !inheritTime {
        _g_.m.p.ptr().schedtick++
    }

    // Check whether the profiler needs to be turned on or off.
    hz := sched.profilehz
    if _g_.m.profilehz != hz {
        setThreadCPUProfiler(hz)
    }

    if trace.enabled {
        // GoSysExit has to happen when we have a P, but before GoStart.
        // So we emit it here.
        if gp.syscallsp != 0 && gp.sysblocktraced {
            traceGoSysExit(gp.sysexitticks)
        }
        traceGoStart()
    }

    gogo(&gp.sched)
}

4.1.3执行gogo()

gogo()方法是汇编实现的。

// func gogo(buf *gobuf)
// restore state from Gobuf; longjmp
TEXT runtime·gogo(SB), NOSPLIT, $0-8
    MOVQ    buf+0(FP), BX        // gobuf
    MOVQ    gobuf_g(BX), DX
    MOVQ    0(DX), CX        // make sure g != nil
    JMP    gogo<>(SB)

TEXT gogo<>(SB), NOSPLIT, $0
    get_tls(CX)
    MOVQ    DX, g(CX)
    MOVQ    DX, R14        // set the g register
    MOVQ    gobuf_sp(BX), SP    // restore SP //在协程栈里面插入了一个栈帧-goexit()方法
    MOVQ    gobuf_ret(BX), AX
    MOVQ    gobuf_ctxt(BX), DX
    MOVQ    gobuf_bp(BX), BP
    MOVQ    $0, gobuf_sp(BX)    // clear to help garbage collector
    MOVQ    $0, gobuf_ret(BX)
    MOVQ    $0, gobuf_ctxt(BX)
    MOVQ    $0, gobuf_bp(BX)
//下面两行:跳转线程正在执行的程序计数器
    MOVQ    gobuf_pc(BX), BX
    JMP    BX

4.1.4 处理完业务逻辑后退回到插入的栈帧goexit()

goexit()也是由汇编语言实现的。

// The top-most function running on a goroutine
// returns to goexit+PCQuantum.
TEXT runtime·goexit(SB),NOSPLIT|TOPFRAME,$0-0
    BYTE    $0x90    // NOP
    CALL    runtime·goexit1(SB)    // does not return
    // traceback from goexit1 must hit code range of goexit
    BYTE    $0x90    // NOP

//
//调用了runtime·goexit1(SB)方法
//...
// Finishes execution of the current goroutine.
func goexit1() {
    if raceenabled {
        racegoend()
    }
    if trace.enabled {
        traceGoEnd()
    }
    mcall(goexit0)            //会切换栈,回到g0栈空间
}

//使用goexit0方法传参
func goexit0(gp *g) {
    _g_ := getg()
    _p_ := _g_.m.p.ptr()

    casgstatus(gp, _Grunning, _Gdead)
    gcController.addScannableStack(_p_, -int64(gp.stack.hi-gp.stack.lo))
    if isSystemGoroutine(gp, false) {
        atomic.Xadd(&sched.ngsys, -1)
    }
    gp.m = nil
    locked := gp.lockedm != 0
    gp.lockedm = 0
    _g_.m.lockedg = 0
    gp.preemptStop = false
    gp.paniconfault = false
    gp._defer = nil // should be true already but just in case.
    gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
    gp.writebuf = nil
    gp.waitreason = 0
    gp.param = nil
    gp.labels = nil
    gp.timer = nil

    if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
        // Flush assist credit to the global pool. This gives
        // better information to pacing if the application is
        // rapidly creating an exiting goroutines.
        assistWorkPerByte := gcController.assistWorkPerByte.Load()
        scanCredit := int64(assistWorkPerByte * float64(gp.gcAssistBytes))
        atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
        gp.gcAssistBytes = 0
    }

    dropg()

    if GOARCH == "wasm" { // no threads yet on wasm
        gfput(_p_, gp)
        schedule() // never returns
    }

    if _g_.m.lockedInt != 0 {
        print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
        throw("internal lockOSThread error")
    }
    gfput(_p_, gp)
    if locked {
        // The goroutine may have locked this thread because
        // it put it in an unusual kernel state. Kill it
        // rather than returning it to the thread pool.

        // Return to mstart, which will release the P and exit
        // the thread.
        if GOOS != "plan9" { // See golang.org/issue/22227.
            gogo(&_g_.m.g0.sched)
        } else {
            // Clear lockedExt on plan9 since we may end up re-using
            // this thread.
            _g_.m.lockedExt = 0
        }
    }
    schedule()                //再次调用schedule方法
}

4.2 G-M-P调度模型

由多个线程同时抓取协程,容易出现锁冲突。通常使用本地队列(获取全局的协程,每次抓取一堆,顺序执行)解决。P即帮助M获取协程,从而减少并发冲突的几率。

4.2.1本地队列实现的源码

type p struct {                //本地队列
    id          int32
    status      uint32 // one of pidle/prunning/...
    link        puintptr
    schedtick   uint32     // incremented on every scheduler call
    syscalltick uint32     // incremented on every system call
    sysmontick  sysmontick // last tick observed by sysmon
    m           muintptr   // back-link to associated m (nil if idle)原始指针
    mcache      *mcache
    pcache      pageCache
    raceprocctx uintptr

    deferpool    []*_defer // pool of available defer structs (see panic.go)
    deferpoolbuf [32]*_defer

    // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
    goidcache    uint64
    goidcacheend uint64

    // Queue of runnable goroutines. Accessed without lock.----队列,不需要加锁
    runqhead uint32            //头部的序号
    runqtail uint32            //尾部的序号
    runq     [256]guintptr
    // runnext, if non-nil, is a runnable G that was ready'd by
    // the current G and should be run next instead of what's in
    // runq if there's time remaining in the running G's time
    // slice. It will inherit the time left in the current time
    // slice. If a set of goroutines is locked in a
    // communicate-and-wait pattern, this schedules that set as a
    // unit and eliminates the (potentially large) scheduling
    // latency that otherwise arises from adding the ready'd
    // goroutines to the end of the run queue.
    //
    // Note that while other P's may atomically CAS this to zero,
    // only the owner P can CAS it to a valid G.
    runnext guintptr                //指向下一个可用的协程

    // Available G's (status == Gdead)
    gFree struct {
        gList
        n int32
    }

    sudogcache []*sudog
    sudogbuf   [128]*sudog

    // Cache of mspan objects from the heap.
    mspancache struct {
        // We need an explicit length here because this field is used
        // in allocation codepaths where write barriers are not allowed,
        // and eliminating the write barrier/keeping it eliminated from
        // slice updates is tricky, moreso than just managing the length
        // ourselves.
        len int
        buf [128]*mspan
    }

    tracebuf traceBufPtr

    // traceSweep indicates the sweep events should be traced.
    // This is used to defer the sweep start event until a span
    // has actually been swept.
    traceSweep bool
    // traceSwept and traceReclaimed track the number of bytes
    // swept and reclaimed by sweeping in the current sweep loop.
    traceSwept, traceReclaimed uintptr

    palloc persistentAlloc // per-P to avoid mutex

    _ uint32 // Alignment for atomic fields below

    // The when field of the first entry on the timer heap.
    // This is updated using atomic functions.
    // This is 0 if the timer heap is empty.
    timer0When uint64

    // The earliest known nextwhen field of a timer with
    // timerModifiedEarlier status. Because the timer may have been
    // modified again, there need not be any timer with this value.
    // This is updated using atomic functions.
    // This is 0 if there are no timerModifiedEarlier timers.
    timerModifiedEarliest uint64

    // Per-P GC state
    gcAssistTime         int64 // Nanoseconds in assistAlloc
    gcFractionalMarkTime int64 // Nanoseconds in fractional mark worker (atomic)

    // gcMarkWorkerMode is the mode for the next mark worker to run in.
    // That is, this is used to communicate with the worker goroutine
    // selected for immediate execution by
    // gcController.findRunnableGCWorker. When scheduling other goroutines,
    // this field must be set to gcMarkWorkerNotWorker.
    gcMarkWorkerMode gcMarkWorkerMode
    // gcMarkWorkerStartTime is the nanotime() at which the most recent
    // mark worker started.
    gcMarkWorkerStartTime int64

    // gcw is this P's GC work buffer cache. The work buffer is
    // filled by write barriers, drained by mutator assists, and
    // disposed on certain GC state transitions.
    gcw gcWork

    // wbBuf is this P's GC write barrier buffer.
    //
    // TODO: Consider caching this in the running G.
    wbBuf wbBuf

    runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point

    // statsSeq is a counter indicating whether this P is currently
    // writing any stats. Its value is even when not, odd when it is.
    statsSeq uint32

    // Lock for timers. We normally access the timers while running
    // on this P, but the scheduler can also do it from a different P.
    timersLock mutex

    // Actions to take at some time. This is used to implement the
    // standard library's time package.
    // Must hold timersLock to access.
    timers []*timer

    // Number of timers in P's heap.
    // Modified using atomic instructions.
    numTimers uint32

    // Number of timerDeleted timers in P's heap.
    // Modified using atomic instructions.
    deletedTimers uint32

    // Race context used while executing timer functions.
    timerRaceCtx uintptr

    // scannableStackSizeDelta accumulates the amount of stack space held by
    // live goroutines (i.e. those eligible for stack scanning).
    // Flushed to gcController.scannableStackSize once scannableStackSizeSlack
    // or -scannableStackSizeSlack is reached.
    scannableStackSizeDelta int64

    // preempt is set to indicate that this P should be enter the
    // scheduler ASAP (regardless of what G is running on it).
    preempt bool

    // Padding is no longer needed. False sharing is now not a worry because p is large enough
    // that its size class is an integer multiple of the cache line size (for any of our architectures).
}

P结构体.png

4.2.2 GPM(GMP)模型关系图

M指线程,G指协程,P即本地队列
GMP模型.png

4.2.3 GPM的调用时机

schedule()函数中,有一个对gp的判断

if gp == nil {
        gp, inheritTime = runqget(_g_.m.p.ptr())
        // We can see gp != nil here even if the M is spinning,
        // if checkTimers added a local goroutine via goready.
    }

其执行了runqget()函数,并将本地队列的指针作为参数传入。

逻辑源码阅读

// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
func runqget(_p_ *p) (gp *g, inheritTime bool) {
    // If there's a runnext, it's the next G to run.
    next := _p_.runnext                //runnext刚好指向下一个可执行的协程
    // If the runnext is non-0 and the CAS fails, it could only have been stolen by another P,
    // because other Ps can race to set runnext to 0, but only the current P can set it to non-0.
    // Hence, there's no need to retry this CAS if it falls.
    if next != 0 && _p_.runnext.cas(next, 0) {
        return next.ptr(), true        //取出下一个协程的地址并返回
    }

    for {
        h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
        t := _p_.runqtail
        if t == h {
            return nil, false
        }
        gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
        if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
            return gp, false
        }
    }
}

在读取本地队列后,并没有发现可调用的协程时,gp仍然时nil,此时会执行

if gp == nil {
        gp, inheritTime = findrunnable() // blocks until work is available
    }

调用了findrunnable()方法

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
    //...获取本地的队列
    //本地无队列的时候
    // global runq        -在全局获取协程
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)    //在全局的队列协程中拿一批协程
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }

    // Poll network.
    // This netpoll is only an optimization before we resort to stealing.
    // We can safely skip it if there are no waiters or a thread is blocked
    // in netpoll already. If there is any kind of logical race with that
    // blocked thread (e.g. it has already returned from netpoll, but does
    // not set lastpoll yet), this thread will do blocking netpoll below
    // anyway.
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        if list := netpoll(0); !list.empty() { // non-blocking
            gp := list.pop()
            injectglist(&list)
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false
        }
    }

    // Spinning Ms: steal work from other Ps.
    //
    // Limit the number of spinning Ms to half the number of busy Ps.
    // This is necessary to prevent excessive CPU consumption when
    // GOMAXPROCS>>1 but the program parallelism is low.
    procs := uint32(gomaxprocs)
    if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
        if !_g_.m.spinning {
            _g_.m.spinning = true
            atomic.Xadd(&sched.nmspinning, 1)
        }

        //无协程可获取
        gp, inheritTime, tnow, w, newWork := stealWork(now)        //任务窃取
        now = tnow
        if gp != nil {
            // Successfully stole.
            return gp, inheritTime
        }
        if newWork {
            // There may be new timer or GC work; restart to
            // discover.
            goto top
        }
        if w != 0 && (pollUntil == 0 || w < pollUntil) {
            // Earlier timer to wait for.
            pollUntil = w
        }
    }

    //...
}

4.2.4 总结

golang通过逻辑队列的形式减少了锁的并发(抢夺全局队列中的锁)

4.3 协程的并发

4.3.1 协程的饥饿问题

正在执行的协程的运行时间特别长,那么本地消息队列中的其他协程会一直处于等待状态,即协程饥饿问题。

解决方式:

保存现场然后再执行别的业务。然后回到线程执行的schedule()函数,即回到开头。
线程循环触发切换1.png

当然,全局队列饥饿问题的解决同样是切换执行的协程

schedule()对应模块源码分析

if gp == nil {
        // Check the global runnable queue once in a while to ensure fairness.
        // Otherwise two goroutines can completely occupy the local runqueue
        // by constantly respawning each other.
        //执行队列循环到达61次时,在全局队列中获取一个协程
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)        //只拿一个
            unlock(&sched.lock)
        }
    }

如何触发切换

方法:1.主动挂起runtime.gopark()
主动挂起.png

即业务主动调用gopark()函数,在程序员使用一些函数的时候会出现

// Puts the current goroutine into a waiting state and calls unlockf on the
// system stack.
//
// If unlockf returns false, the goroutine is resumed.
//
// unlockf must not access this G's stack, as it may be moved between
// the call to gopark and the call to unlockf.
//
// Note that because unlockf is called after putting the G into a waiting
// state, the G may have already been readied by the time unlockf is called
// unless there is external synchronization preventing the G from being
// readied. If unlockf returns false, it must guarantee that the G cannot be
// externally readied.
//
// Reason explains why the goroutine has been parked. It is displayed in stack
// traces and heap dumps. Reasons should be unique and descriptive. Do not
// re-use reasons, add new ones.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    if reason != waitReasonSleep {
        checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
    }
    //维护协程和线程的状态
    mp := acquirem()
    gp := mp.curg
    status := readgstatus(gp)
    if status != _Grunning && status != _Gscanrunning {
        throw("gopark: bad g status")
    }
    mp.waitlock = lock
    mp.waitunlockf = unlockf
    gp.waitreason = reason
    mp.waittraceev = traceEv
    mp.waittraceskip = traceskip
    releasem(mp)
    // can't do anything that might move the G between Ms here.
    mcall(park_m)            //mcall()调用的方法表示即将切换栈!
    //park_m即状态维护并且调用了schedule()函数
}

注意:gopark无法在包外使用,但是可以用time.Sleep()函数(底层也是gopark())。
2.系统完成调用时
系统调用完成时.png

系统指令调用(系统底层),如检测网络等行为......
调用的关系
exitsyscall()->Gosched()->mcall()->goschedguarded_m()->goschedImpl(gp)->schedule()

4.3.2 抢占式调度

如果协程耗时很长,并且永远不主动挂起也不做系统调用时,需要抢占式调用调节。发现morestack()函数经常被协程调用(在某一个方法内需要调用另一个方法的时候,系统会先调用morestack()方法),可在此设置。

关于morestack()函数

本意是检测协程栈是否有足够的空间。而在方法内调用调用方法时,编译器会强制插入morestack()。

标记抢占

系统监控器检测到协程运行时间超过10ms时将此g中的stackguard0字段设置为0xfffffade(栈抢占的标记常量)。

抢占操作

执行morestack()函数的时候判断协程是否已经被抢占,如果,则直接回到schedule()函数。

//morestack()是汇编方法,其中主要是处理栈空间不足的逻辑,最后调用runtime·newstack(SB)
//此函数中除了生成新栈的逻辑外有一句检测语句
func newstack(){
//...
    preempt := stackguard0 == stackPreempt        //检测stackguard0字段是否被标记为栈抢占
//...
    if preempt {
        if gp == thisg.m.g0 {
            throw("runtime: preempt g0")
        }
        if thisg.m.p == 0 && thisg.m.locks == 0 {
            throw("runtime: g is running but p is not")
        }

        if gp.preemptShrink {
            // We're at a synchronous safe point now, so
            // do the pending stack shrink.
            gp.preemptShrink = false
            shrinkstack(gp)
        }

        if gp.preemptStop {
            preemptPark(gp) // never returns
        }

        // Act like goroutine called runtime.Gosched.
        gopreempt_m(gp) // never return            //实行抢占--最后再次回到schedule()
    }

}

基于协作的抢占式调度.png

补充:基于信号的抢占式调度

使用场景:协程内没有方法的调用,即不调用morestack()函数。例如:

package main

func do(){
    i := 0
    for{
        i++
    }
}
func main(){
    go do()
}

因为操作系统中有许多基于信号的底层通信方式,而线程可用注册对应信号(SIGPIPE, SIGURG,SIGHUP)的处理函数,此调度则直接注册SIGURG(紧急)信号的处理函数。然后再GC工作的时候(此时很多工作都暂停,适合抢占)向目标线程 发送信号,触发调度。

基于信号的抢占式调度.png

// doSigPreempt handles a preemption signal on gp.
func doSigPreempt(gp *g, ctxt *sigctxt) {
    // Check if this G wants to be preempted and is safe to
    // preempt.
    if wantAsyncPreempt(gp) {
        if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
            // Adjust the PC and inject a call to asyncPreempt.
            ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc)
        }
    }

    // Acknowledge the preemption.
    atomic.Xadd(&gp.m.preemptGen, 1)
    atomic.Store(&gp.m.signalPending, 0)

    if GOOS == "darwin" || GOOS == "ios" {
        atomic.Xadd(&pendingPreemptSignals, -1)
    }
}
//asyncPreempt()汇编方法调用另一个信号抢占的方法,调用mcall,最后回到schedule()方法

5 解决协程启动数量过多的问题

除了优化业务逻辑优化硬件设备之外还有两个方法如下:

1 使用channel缓冲

使用channel限制协程的运行数量

func main(){
    ch := make(chan struct{}, 3000)
    for i:= 0; i < math.MaxInt32; i++ {
        ch <- struct{}{}
        go func(i int, ch chan struct{}){
            fmt.Println(i)
            time.Sleep(time.Second)
            <- ch
        }(i, ch)
    }
  
}

2 协程池(tunny)--不推荐

将GMP模型中的全局锁覆盖的协程转变为自己的协程池;
但是:Golang的线程相关已经是一种池化了,二次池化会增加复杂度,而GO语言希望的是协程即用即毁,这样有悖于初衷。

0

评论

博主关闭了所有页面的评论