1 The relationship between threads and processes
1. A process can contain multiple threads. A thread can be thought of as something that consumes CPU time without owning an address space of its own.
2. Threads within the same process share its memory space (the stack a thread uses is carved out of the address space allocated to that process).
3. Threads are scheduled by the operating system, and the CPU has to switch between them.
The drawbacks of threads are therefore: a. a thread itself ties up significant resources (e.g. a fixed-size stack); b. thread operations (creation, destruction) are expensive; c. thread context switches are expensive.
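To make point 2 concrete, here is a minimal Go sketch (Go multiplexes goroutines onto OS threads, but the shared-address-space property is the same; the names counter and worker are illustrative only): two concurrently executing functions mutate a single variable that lives in the process's memory, so nothing is copied between threads, only synchronized.
package main

import (
	"fmt"
	"sync"
)

var (
	counter int        // lives in the process's shared address space
	mu      sync.Mutex // both flows of execution see the same variable
)

func worker(wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < 1000; i++ {
		mu.Lock()
		counter++ // same memory, no inter-thread copying
		mu.Unlock()
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	go worker(&wg)
	go worker(&wg)
	wg.Wait()
	fmt.Println(counter) // always 2000
}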
1.1 Go's abstraction of the thread
type m struct {
g0 *g // goroutine with the scheduling stack; the first goroutine run on this thread
morebuf gobuf
divmod uint32
// Fields not known to debuggers.
procid uint64
gsignal *g
goSigStack gsignalStack
sigmask sigset
tls [tlsSlots]uintptr
mstartfn func()
curg *g // the goroutine currently running on this thread
caughtsig guintptr
p puintptr
nextp puintptr
oldp puintptr
id int64
mallocing int32
throwing int32
preemptoff string
locks int32
dying int32
profilehz int32
spinning bool
blocked bool
newSigstack bool
printlock int8
incgo bool
freeWait uint32
fastrand uint64
needextram bool
traceback uint8
ncgocall uint64
ncgo int32
cgoCallersUse uint32
cgoCallers *cgoCallers
park note
alllink *m
schedlink muintptr
lockedg guintptr
createstack [32]uintptr
lockedExt uint32
lockedInt uint32
nextwaitm muintptr
waitunlockf func(*g, unsafe.Pointer) bool
waitlock unsafe.Pointer
waittraceev byte
waittraceskip int
startingtrace bool
syscalltick uint32
freelink *m
// these are here because they are too large to be on the stack
// of low-level NOSPLIT functions.
libcall libcall
libcallpc uintptr
libcallsp uintptr
libcallg guintptr
syscall libcall
vdsoSP uintptr
vdsoPC uintptr
// preemptGen counts the number of completed preemption
// signals. This is used to detect when a preemption is
// requested, but fails. Accessed atomically.
preemptGen uint32
// Whether this is a pending preemption signal on this M.
// Accessed atomically.
signalPending uint32
dlogPerM
mOS // OS-specific thread state, one definition per platform
// Up to 10 locks held by this m, maintained by the lock ranking code.
locksHeldLen int
locksHeld [10]heldLockInfo
}
2 How goroutine concurrency works
A goroutine (coroutine) is a program together with its execution state. When one needs to run, it is placed on a thread and handed to the CPU; the results of the computation are written back into its state, and it is set aside again as a plain goroutine. (The CPU no longer switches; instead, what the thread is running changes.) The next goroutine is then executed in turn.
Key property: goroutines reuse threads (saving the cost of CPU-level thread scheduling).
Advantages: efficient use of resources, fast scheduling, and very high concurrency.
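A rough way to observe this reuse from user code (a sketch; the threadcreate profile counts threads ever created, which is an upper bound on live threads):
package main

import (
	"fmt"
	"runtime"
	"runtime/pprof"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			time.Sleep(100 * time.Millisecond)
		}()
	}
	// Tens of thousands of goroutines, but only a handful of OS threads.
	fmt.Println("goroutines:", runtime.NumGoroutine())
	fmt.Println("threads created:", pprof.Lookup("threadcreate").Count())
	wg.Wait()
}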
3 The essence of a goroutine (source code)
type g struct {
stack stack // stack holds the lower and upper bounds of the goroutine stack
stackguard0 uintptr
stackguard1 uintptr
_panic *_panic
_defer *_defer
m *m
sched gobuf // holds the sp (stack pointer), recording how far this goroutine's stack has been used,
// and the pc (program counter), recording which instruction to resume at
syscallsp uintptr
syscallpc uintptr
stktopsp uintptr
param unsafe.Pointer
atomicstatus uint32 // status of this goroutine
stackLock uint32
goid int64 // goroutine ID
schedlink guintptr
waitsince int64
waitreason waitReason
preempt bool
preemptStop bool
preemptShrink bool
asyncSafePoint bool
paniconfault bool
gcscandone bool
throwsplit bool
activeStackChans bool
parkingOnChan uint8
raceignore int8
sysblocktraced bool
tracking bool
trackingSeq uint8
runnableStamp int64
runnableTime int64
sysexitticks int64
traceseq uint64
tracelastp puintptr
lockedm muintptr
sig uint32
writebuf []byte
sigcode0 uintptr
sigcode1 uintptr
sigpc uintptr
gopc uintptr
ancestors *[]ancestorInfo
startpc uintptr
racectx uintptr
waiting *sudog
cgoCtxt []uintptr
labels unsafe.Pointer
timer *timer
selectDone uint32
gcAssistBytes int64
}
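Fields such as atomicstatus, waitreason and goid are not exported, but their effect is visible from user code: runtime.Stack prints every goroutine with its state in brackets. A minimal sketch:
package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	go func() { time.Sleep(time.Hour) }() // will appear as [sleep]
	ch := make(chan int)
	go func() { <-ch }() // will appear as [chan receive]
	time.Sleep(100 * time.Millisecond) // give both goroutines time to park
	buf := make([]byte, 1<<16)
	n := runtime.Stack(buf, true) // true: dump every goroutine with its state
	fmt.Println(string(buf[:n]))
}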
4 How goroutines run on threads
4.1 How a thread runs
4.1.1 Run schedule()
func schedule() {
_g_ := getg()
if _g_.m.locks != 0 {
throw("schedule: holding locks")
}
if _g_.m.lockedg != 0 {
stoplockedm()
execute(_g_.m.lockedg.ptr(), false) // Never returns.
}
// We should not schedule away from a g that is executing a cgo call,
// since the cgo call is using the m's g0 stack.
if _g_.m.incgo {
throw("schedule: in cgo")
}
top:
pp := _g_.m.p.ptr()
pp.preempt = false
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
if pp.runSafePointFn != 0 {
runSafePointFn()
}
// Sanity check: if we are spinning, the run queue should be empty.
// Check this before calling checkTimers, as that might call
// goready to put a ready goroutine on the local run queue.
if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
throw("schedule: spinning with local work")
}
checkTimers(pp, 0)
var gp *g
var inheritTime bool
// Normal goroutines will check for need to wakeP in ready,
// but GCworkers and tracereaders will not, so the check must
// be done here instead.
tryWakeP := false
//pick up a runnable goroutine
if trace.enabled || trace.shutdown {
gp = traceReader()
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
traceGoUnpark(gp, 0)
tryWakeP = true
}
}
if gp == nil && gcBlackenEnabled != 0 {
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
if gp != nil {
tryWakeP = true
}
}
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
// We can see gp != nil here even if the M is spinning,
// if checkTimers added a local goroutine via goready.
}
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
// This thread is going to run a goroutine and is not spinning anymore,
// so if it was marked as spinning we need to reset it now and potentially
// start a new spinning M.
if _g_.m.spinning {
resetspinning()
}
if sched.disable.user && !schedEnabled(gp) {
// Scheduling of this goroutine is disabled. Put it on
// the list of pending runnable goroutines for when we
// re-enable user scheduling and look again.
lock(&sched.lock)
if schedEnabled(gp) {
// Something re-enabled scheduling while we
// were acquiring the lock.
unlock(&sched.lock)
} else {
sched.disable.runnable.pushBack(gp)
sched.disable.n++
unlock(&sched.lock)
goto top
}
}
// If about to schedule a not-normal goroutine (a GCworker or tracereader),
// wake a P if there is one.
if tryWakeP {
wakep()
}
if gp.lockedm != 0 {
// Hands off own p to the locked m,
// then blocks waiting for a new p.
startlockedm(gp)
goto top
}
execute(gp, inheritTime)
}
4.1.2 Jump to execute()
func execute(gp *g, inheritTime bool) { //gp is the goroutine about to run
_g_ := getg()
// Assign gp.m before entering _Grunning so running Gs have an
// M.
_g_.m.curg = gp
gp.m = _g_.m
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
_g_.m.p.ptr().schedtick++
}
// Check whether the profiler needs to be turned on or off.
hz := sched.profilehz
if _g_.m.profilehz != hz {
setThreadCPUProfiler(hz)
}
if trace.enabled {
// GoSysExit has to happen when we have a P, but before GoStart.
// So we emit it here.
if gp.syscallsp != 0 && gp.sysblocktraced {
traceGoSysExit(gp.sysexitticks)
}
traceGoStart()
}
gogo(&gp.sched)
}
4.1.3 Execute gogo()
The gogo() function is implemented in assembly.
// func gogo(buf *gobuf)
// restore state from Gobuf; longjmp
TEXT runtime·gogo(SB), NOSPLIT, $0-8
MOVQ buf+0(FP), BX // gobuf
MOVQ gobuf_g(BX), DX
MOVQ 0(DX), CX // make sure g != nil
JMP gogo<>(SB)
TEXT gogo<>(SB), NOSPLIT, $0
get_tls(CX)
MOVQ DX, g(CX)
MOVQ DX, R14 // set the g register
MOVQ gobuf_sp(BX), SP // restore SP // the goroutine stack already holds a goexit() frame, inserted when the goroutine was created
MOVQ gobuf_ret(BX), AX
MOVQ gobuf_ctxt(BX), DX
MOVQ gobuf_bp(BX), BP
MOVQ $0, gobuf_sp(BX) // clear to help garbage collector
MOVQ $0, gobuf_ret(BX)
MOVQ $0, gobuf_ctxt(BX)
MOVQ $0, gobuf_bp(BX)
//the next two lines jump to the saved program counter, i.e. where this goroutine left off
MOVQ gobuf_pc(BX), BX
JMP BX
4.1.4 After the business logic finishes, control falls back into the inserted goexit() frame
goexit() is also implemented in assembly.
// The top-most function running on a goroutine
// returns to goexit+PCQuantum.
TEXT runtime·goexit(SB),NOSPLIT|TOPFRAME,$0-0
BYTE $0x90 // NOP
CALL runtime·goexit1(SB) // does not return
// traceback from goexit1 must hit code range of goexit
BYTE $0x90 // NOP
//goexit calls runtime·goexit1(SB), whose Go source follows:
// Finishes execution of the current goroutine.
func goexit1() {
if raceenabled {
racegoend()
}
if trace.enabled {
traceGoEnd()
}
mcall(goexit0) //mcall switches stacks, back onto the g0 stack
}
//goexit0 receives the finished goroutine as its argument
func goexit0(gp *g) {
_g_ := getg()
_p_ := _g_.m.p.ptr()
casgstatus(gp, _Grunning, _Gdead)
gcController.addScannableStack(_p_, -int64(gp.stack.hi-gp.stack.lo))
if isSystemGoroutine(gp, false) {
atomic.Xadd(&sched.ngsys, -1)
}
gp.m = nil
locked := gp.lockedm != 0
gp.lockedm = 0
_g_.m.lockedg = 0
gp.preemptStop = false
gp.paniconfault = false
gp._defer = nil // should be true already but just in case.
gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
gp.writebuf = nil
gp.waitreason = 0
gp.param = nil
gp.labels = nil
gp.timer = nil
if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
// Flush assist credit to the global pool. This gives
// better information to pacing if the application is
// rapidly creating an exiting goroutines.
assistWorkPerByte := gcController.assistWorkPerByte.Load()
scanCredit := int64(assistWorkPerByte * float64(gp.gcAssistBytes))
atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
gp.gcAssistBytes = 0
}
dropg()
if GOARCH == "wasm" { // no threads yet on wasm
gfput(_p_, gp)
schedule() // never returns
}
if _g_.m.lockedInt != 0 {
print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
throw("internal lockOSThread error")
}
gfput(_p_, gp)
if locked {
// The goroutine may have locked this thread because
// it put it in an unusual kernel state. Kill it
// rather than returning it to the thread pool.
// Return to mstart, which will release the P and exit
// the thread.
if GOOS != "plan9" { // See golang.org/issue/22227.
gogo(&_g_.m.g0.sched)
} else {
// Clear lockedExt on plan9 since we may end up re-using
// this thread.
_g_.m.lockedExt = 0
}
}
schedule() //call schedule() again to pick the next goroutine
}
4.2 The G-M-P scheduling model
When many threads grab goroutines from a single global queue at once, lock contention becomes a problem. The usual fix is a local run queue (fetch a batch of goroutines from the global queue at a time and execute them in order). The P exists to help an M obtain goroutines, reducing the chance of contention.
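The number of Ps is what runtime.GOMAXPROCS controls; a quick sketch for reading and setting it:
package main

import (
	"fmt"
	"runtime"
)

func main() {
	// GOMAXPROCS(0) only queries the current number of Ps,
	// which defaults to the number of CPU cores.
	fmt.Println("Ps:", runtime.GOMAXPROCS(0))
	// Setting it to 1 leaves a single local run queue: every M
	// must share that one P to run user goroutines.
	prev := runtime.GOMAXPROCS(1)
	fmt.Println("previous:", prev)
}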
4.2.1 Source code of the local run queue
type p struct { //the "processor", owner of a local run queue
id int32
status uint32 // one of pidle/prunning/...
link puintptr
schedtick uint32 // incremented on every scheduler call
syscalltick uint32 // incremented on every system call
sysmontick sysmontick // last tick observed by sysmon
m muintptr // back-link to associated m (nil if idle); stored as a raw pointer
mcache *mcache
pcache pageCache
raceprocctx uintptr
deferpool []*_defer // pool of available defer structs (see panic.go)
deferpoolbuf [32]*_defer
// Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
goidcache uint64
goidcacheend uint64
// Queue of runnable goroutines. Accessed without lock. ---- a ring buffer, no locking needed
runqhead uint32 //index of the queue head
runqtail uint32 //index of the queue tail
runq [256]guintptr
// runnext, if non-nil, is a runnable G that was ready'd by
// the current G and should be run next instead of what's in
// runq if there's time remaining in the running G's time
// slice. It will inherit the time left in the current time
// slice. If a set of goroutines is locked in a
// communicate-and-wait pattern, this schedules that set as a
// unit and eliminates the (potentially large) scheduling
// latency that otherwise arises from adding the ready'd
// goroutines to the end of the run queue.
//
// Note that while other P's may atomically CAS this to zero,
// only the owner P can CAS it to a valid G.
runnext guintptr //points at the next goroutine to run
// Available G's (status == Gdead)
gFree struct {
gList
n int32
}
sudogcache []*sudog
sudogbuf [128]*sudog
// Cache of mspan objects from the heap.
mspancache struct {
// We need an explicit length here because this field is used
// in allocation codepaths where write barriers are not allowed,
// and eliminating the write barrier/keeping it eliminated from
// slice updates is tricky, moreso than just managing the length
// ourselves.
len int
buf [128]*mspan
}
tracebuf traceBufPtr
// traceSweep indicates the sweep events should be traced.
// This is used to defer the sweep start event until a span
// has actually been swept.
traceSweep bool
// traceSwept and traceReclaimed track the number of bytes
// swept and reclaimed by sweeping in the current sweep loop.
traceSwept, traceReclaimed uintptr
palloc persistentAlloc // per-P to avoid mutex
_ uint32 // Alignment for atomic fields below
// The when field of the first entry on the timer heap.
// This is updated using atomic functions.
// This is 0 if the timer heap is empty.
timer0When uint64
// The earliest known nextwhen field of a timer with
// timerModifiedEarlier status. Because the timer may have been
// modified again, there need not be any timer with this value.
// This is updated using atomic functions.
// This is 0 if there are no timerModifiedEarlier timers.
timerModifiedEarliest uint64
// Per-P GC state
gcAssistTime int64 // Nanoseconds in assistAlloc
gcFractionalMarkTime int64 // Nanoseconds in fractional mark worker (atomic)
// gcMarkWorkerMode is the mode for the next mark worker to run in.
// That is, this is used to communicate with the worker goroutine
// selected for immediate execution by
// gcController.findRunnableGCWorker. When scheduling other goroutines,
// this field must be set to gcMarkWorkerNotWorker.
gcMarkWorkerMode gcMarkWorkerMode
// gcMarkWorkerStartTime is the nanotime() at which the most recent
// mark worker started.
gcMarkWorkerStartTime int64
// gcw is this P's GC work buffer cache. The work buffer is
// filled by write barriers, drained by mutator assists, and
// disposed on certain GC state transitions.
gcw gcWork
// wbBuf is this P's GC write barrier buffer.
//
// TODO: Consider caching this in the running G.
wbBuf wbBuf
runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point
// statsSeq is a counter indicating whether this P is currently
// writing any stats. Its value is even when not, odd when it is.
statsSeq uint32
// Lock for timers. We normally access the timers while running
// on this P, but the scheduler can also do it from a different P.
timersLock mutex
// Actions to take at some time. This is used to implement the
// standard library's time package.
// Must hold timersLock to access.
timers []*timer
// Number of timers in P's heap.
// Modified using atomic instructions.
numTimers uint32
// Number of timerDeleted timers in P's heap.
// Modified using atomic instructions.
deletedTimers uint32
// Race context used while executing timer functions.
timerRaceCtx uintptr
// scannableStackSizeDelta accumulates the amount of stack space held by
// live goroutines (i.e. those eligible for stack scanning).
// Flushed to gcController.scannableStackSize once scannableStackSizeSlack
// or -scannableStackSizeSlack is reached.
scannableStackSizeDelta int64
// preempt is set to indicate that this P should be enter the
// scheduler ASAP (regardless of what G is running on it).
preempt bool
// Padding is no longer needed. False sharing is now not a worry because p is large enough
// that its size class is an integer multiple of the cache line size (for any of our architectures).
}
4.2.2 Diagram of the GPM (GMP) model
M is the thread, G is the goroutine, and P is the local run queue (the "processor").
4.2.3 When GPM comes into play
In the schedule() function there is a check on gp:
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
// We can see gp != nil here even if the M is spinning,
// if checkTimers added a local goroutine via goready.
}
It calls runqget(), passing a pointer to the local queue (the P) as the argument.
Reading the logic in the source:
// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
func runqget(_p_ *p) (gp *g, inheritTime bool) {
// If there's a runnext, it's the next G to run.
next := _p_.runnext //runnext points exactly at the next runnable goroutine
// If the runnext is non-0 and the CAS fails, it could only have been stolen by another P,
// because other Ps can race to set runnext to 0, but only the current P can set it to non-0.
// Hence, there's no need to retry this CAS if it fails.
if next != 0 && _p_.runnext.cas(next, 0) {
return next.ptr(), true //take the next goroutine's pointer and return it
}
for {
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := _p_.runqtail
if t == h {
return nil, false
}
gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
return gp, false
}
}
}
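The structure behind this is a fixed-size ring indexed by ever-increasing head/tail counters. Below is a simplified re-implementation of the same consume path, assuming a single owner P pushing at the tail and multiple consumers CASing the head; the names (runQueue, task) are mine, not the runtime's:
package main

import (
	"fmt"
	"sync/atomic"
)

type task struct{ id int }

type runQueue struct {
	head uint32 // CAS'd by any consumer (owner or stealers)
	tail uint32 // written only by the owner P
	buf  [256]*task
}

// put is called only by the owner, so tail needs no CAS.
func (q *runQueue) put(t *task) bool {
	h := atomic.LoadUint32(&q.head)
	if q.tail-h >= uint32(len(q.buf)) {
		return false // full; the runtime would spill half to the global queue
	}
	q.buf[q.tail%uint32(len(q.buf))] = t
	atomic.StoreUint32(&q.tail, q.tail+1) // a store-release in the runtime
	return true
}

// get mirrors runqget: load the head, read the slot, then try to
// advance the head with a CAS; a failed CAS means another P stole it.
func (q *runQueue) get() *task {
	for {
		h := atomic.LoadUint32(&q.head)
		t := atomic.LoadUint32(&q.tail)
		if t == h {
			return nil // empty
		}
		gp := q.buf[h%uint32(len(q.buf))]
		if atomic.CompareAndSwapUint32(&q.head, h, h+1) {
			return gp
		}
	}
}

func main() {
	q := &runQueue{}
	q.put(&task{id: 1})
	q.put(&task{id: 2})
	fmt.Println(q.get().id, q.get().id, q.get() == nil) // 1 2 true
}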
If no runnable goroutine is found after reading the local queue, gp is still nil, and this block runs:
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
which calls the findrunnable() function:
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
//... first the local queue is checked
//when the local queue is empty:
// global runq - fetch goroutines from the global queue
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0) //take a batch of goroutines from the global queue
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// Poll network.
// This netpoll is only an optimization before we resort to stealing.
// We can safely skip it if there are no waiters or a thread is blocked
// in netpoll already. If there is any kind of logical race with that
// blocked thread (e.g. it has already returned from netpoll, but does
// not set lastpoll yet), this thread will do blocking netpoll below
// anyway.
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(0); !list.empty() { // non-blocking
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}
// Spinning Ms: steal work from other Ps.
//
// Limit the number of spinning Ms to half the number of busy Ps.
// This is necessary to prevent excessive CPU consumption when
// GOMAXPROCS>>1 but the program parallelism is low.
procs := uint32(gomaxprocs)
if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
//nothing runnable could be fetched anywhere
gp, inheritTime, tnow, w, newWork := stealWork(now) //work stealing
now = tnow
if gp != nil {
// Successfully stole.
return gp, inheritTime
}
if newWork {
// There may be new timer or GC work; restart to
// discover.
goto top
}
if w != 0 && (pollUntil == 0 || w < pollUntil) {
// Earlier timer to wait for.
pollUntil = w
}
}
//...
}
4.2.4 Summary
Through per-P logical queues, golang reduces lock contention (the fight over the global queue's lock).
4.3 Goroutine concurrency
4.3.1 The goroutine starvation problem
If the currently running goroutine takes a very long time, the other goroutines in the local run queue stay waiting indefinitely; this is the goroutine starvation problem.
Solution:
Save the execution site (the goroutine's context), run other work instead, and return to the thread's schedule() function, i.e. back to the beginning.
Analysis of the corresponding block in schedule():
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
//every 61st scheduling tick, take a goroutine from the global queue to ensure fairness
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1) //take just one
unlock(&sched.lock)
}
}
How a switch is triggered
Method 1: voluntary parking (runtime.gopark())
That is, the running code itself ends up calling gopark(); this happens inside various functions the programmer uses.
// Puts the current goroutine into a waiting state and calls unlockf on the
// system stack.
//
// If unlockf returns false, the goroutine is resumed.
//
// unlockf must not access this G's stack, as it may be moved between
// the call to gopark and the call to unlockf.
//
// Note that because unlockf is called after putting the G into a waiting
// state, the G may have already been readied by the time unlockf is called
// unless there is external synchronization preventing the G from being
// readied. If unlockf returns false, it must guarantee that the G cannot be
// externally readied.
//
// Reason explains why the goroutine has been parked. It is displayed in stack
// traces and heap dumps. Reasons should be unique and descriptive. Do not
// re-use reasons, add new ones.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
//record parking state on the goroutine and its thread
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
mcall(park_m) //an mcall() means we are about to switch stacks!
//park_m updates the status bookkeeping and then calls schedule()
}
Note: gopark cannot be called from outside the runtime package, but time.Sleep() (whose core is gopark()) can be used instead.
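A small sketch of the effect: while one goroutine is parked inside time.Sleep (gopark under the hood), the thread it occupied is free to run other goroutines; runtime.Gosched(), used here, is the explicit "yield back to schedule()" primitive:
package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	runtime.GOMAXPROCS(1) // a single P, so both goroutines share one thread
	go func() {
		time.Sleep(50 * time.Millisecond) // parks via gopark under the hood
		fmt.Println("sleeper woke up")
	}()
	// While the sleeper is parked, this goroutine keeps the thread busy.
	deadline := time.Now().Add(100 * time.Millisecond)
	for time.Now().Before(deadline) {
		runtime.Gosched() // also ends up in schedule()
	}
	fmt.Println("main done")
}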
Method 2: on completion of a system call
That is, after OS-level system calls (for example, polling the network and similar operations).
The call chain:
exitsyscall()->Gosched()->mcall()->goschedguarded_m()->goschedImpl(gp)->schedule()
4.3.2 Preemptive scheduling
If a goroutine runs for a very long time, never parks voluntarily, and never makes a system call, preemptive scheduling is required. Observing that goroutines call the morestack() function very often (whenever code in one function calls another function, a call to morestack() runs first), that is where the hook is placed.
About morestack()
Its original purpose is to check whether the goroutine's stack still has enough space; the compiler forcibly inserts a morestack() call wherever one function calls another.
Marking for preemption
When the system monitor (sysmon) detects that a goroutine has been running for more than 10 ms, it sets that g's stackguard0 field to 0xfffffade (the stack-preemption marker constant).
The preemption itself
When morestack() runs, it checks whether the goroutine has been marked for preemption; if so, control goes straight back to schedule().
//morestack() is an assembly routine; it mainly handles the out-of-stack-space logic and finally calls runtime·newstack(SB)
//besides the logic that builds the new stack, newstack contains this check:
func newstack(){
//...
preempt := stackguard0 == stackPreempt //check whether stackguard0 carries the stack-preemption mark
//...
if preempt {
if gp == thisg.m.g0 {
throw("runtime: preempt g0")
}
if thisg.m.p == 0 && thisg.m.locks == 0 {
throw("runtime: g is running but p is not")
}
if gp.preemptShrink {
// We're at a synchronous safe point now, so
// do the pending stack shrink.
gp.preemptShrink = false
shrinkstack(gp)
}
if gp.preemptStop {
preemptPark(gp) // never returns
}
// Act like goroutine called runtime.Gosched.
gopreempt_m(gp) // never returns //perform the preemption -- ends up back in schedule()
}
}
Addendum: signal-based preemptive scheduling
Use case: a goroutine that contains no function calls at all, so morestack() is never invoked. For example:
package main

import (
	"runtime"
	"time"
)

func do() {
	i := 0
	for {
		i++
	}
}

func main() {
	runtime.GOMAXPROCS(1)
	go do()
	time.Sleep(time.Second)
	// On Go 1.14+ the busy loop is preempted by a signal and this line prints;
	// before Go 1.14 (or with GODEBUG=asyncpreemptoff=1) it is never reached.
	println("main resumed")
}
Operating systems offer many signal-based ways to communicate with threads, and a thread can register handler functions for the corresponding signals (SIGPIPE, SIGURG, SIGHUP). This scheduler registers a handler for the SIGURG ("urgent") signal. Then, while GC is running (a good moment to preempt, since much other work is paused anyway), the runtime sends that signal to the target thread, triggering a scheduling pass.
// doSigPreempt handles a preemption signal on gp.
func doSigPreempt(gp *g, ctxt *sigctxt) {
// Check if this G wants to be preempted and is safe to
// preempt.
if wantAsyncPreempt(gp) {
if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
// Adjust the PC and inject a call to asyncPreempt.
ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc)
}
}
// Acknowledge the preemption.
atomic.Xadd(&gp.m.preemptGen, 1)
atomic.Store(&gp.m.signalPending, 0)
if GOOS == "darwin" || GOOS == "ios" {
atomic.Xadd(&pendingPreemptSignals, -1)
}
}
//the asyncPreempt() assembly routine calls a further signal-preemption function, which issues an mcall and finally ends up back in schedule()
5 Coping with too many goroutines being started
1 Buffered channel
Use a buffered channel to limit the number of running goroutines:
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	ch := make(chan struct{}, 3000) // capacity 3000 = at most 3000 goroutines in flight
	for i := 0; i < math.MaxInt32; i++ {
		ch <- struct{}{} // blocks once all 3000 slots are taken
		go func(i int, ch chan struct{}) {
			fmt.Println(i)
			time.Sleep(time.Second)
			<-ch // free a slot when done
		}(i, ch)
	}
}
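An equivalent limiter can be built on the golang.org/x/sync/semaphore package, assuming that external dependency is acceptable (a sketch):
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	sem := semaphore.NewWeighted(3000) // at most 3000 goroutines at once
	for i := 0; i < 100000; i++ {
		// Acquire blocks until a slot is free (the ctx allows cancellation).
		if err := sem.Acquire(context.Background(), 1); err != nil {
			break
		}
		go func(i int) {
			defer sem.Release(1)
			fmt.Println(i)
			time.Sleep(time.Second)
		}(i)
	}
}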
2 Goroutine pools (e.g. tunny) -- not recommended
The idea is to replace the goroutines guarded by the global lock in the GMP model with a pool of your own;
however, Go's thread machinery is already a form of pooling. Pooling again on top of it adds complexity, and it runs against Go's intent that goroutines be created when needed and destroyed when done. See the sketch below.
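For completeness, here is the channel-based idiom that most hand-rolled pools reduce to: a fixed set of long-lived worker goroutines draining one task channel (a sketch under my own names, not the tunny API):
package main

import (
	"fmt"
	"sync"
)

func main() {
	tasks := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < 8; w++ { // a fixed number of long-lived workers
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for t := range tasks { // each worker drains the shared channel
				fmt.Printf("worker %d got task %d\n", id, t)
			}
		}(w)
	}
	for i := 0; i < 100; i++ {
		tasks <- i
	}
	close(tasks) // lets the workers' range loops end
	wg.Wait()
}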