首页
友情链接
统计
推荐
GitHub
Search
1
蚁群算法学习
406 阅读
2
golang 锁
333 阅读
3
Go 栈和堆
327 阅读
4
GO语言基础plus
311 阅读
5
golang Map
255 阅读
GO
源码阅读
基础
Python
登录
Search
标签搜索
慕课笔记
BeforeIeave
累计撰写
9
篇文章
累计收到
2
条评论
首页
栏目
GO
源码阅读
基础
Python
页面
友情链接
统计
推荐
GitHub
搜索到
6
篇与
的结果
2022-08-05
Go 栈和堆
1 协程栈1.1 协程栈的作用协程栈表现出了协程的执行路径,在方法内部声明使用的变量也会储存在栈内存,还包括函数的传参以及返回值。1.2 协程栈的位置Go协程栈位于Go的堆内存上。而Go的堆内存位于操作系统的虚拟内存上。1.3 栈空间不足的解决1.3.1 内存逃逸--解决变量太大当栈内存不足的时候,会引发内存逃逸,而如果某栈帧回收后,发现有需要继续使用的变量,或者是发现过大的变量的时候,也会引发内存逃逸。指针逃逸触发条件:函数返回了对象的指针。空接口逃逸触发条件:如果函数的参数类型是 interface{} 那么函数的实参很可能会逃逸。_例如fmt.Println()_因为空接口类型函数通常会使用反射。大变量逃逸触发原因:过大的变量会导致占空间不足(Go栈的初始空间只有2-4k)1.3.2 栈扩容--解决栈帧过多Go的栈空间也是从堆上申请的,而函数被调用前会使用morestack()函数(由汇编实现)判断是否需要进行栈扩容。方式:1.13版本之前使用分段栈,后使用连续栈。分段栈发现协程的栈空间不足的时候,在栈中接着开辟一个新空间,虽然节省空间,但会导致栈指针会不停在两块区域之间跳转。连续栈直接将原本区域的数据复制到新的栈空间中(内存变为原来的两倍),这样新的栈空间内存是连续的。(当空间使用率不足1/4的时候,内存空间会缩小为原来的1/2)。2 堆内存结构其实,go的栈内存用的也是堆内存。2.1 操作系统的虚拟内存指的是操作系统给应用提供的虚拟内存空间。操作系统不允许进程直接读写物理内存。虚拟内存和物理内存的关系是由操作系统管理的。2.2 heapArenaGo每次申请的堆的内存单元(heapArena)为64MB,而一个系统最多拥有2^20个内存单元。至此,所有的内存单元组成了mheap(Go的堆内存)。type heapArena struct { //描述了一个64M的内存单元 bitmap [heapArenaBitmapBytes]byte spans [pagesPerArena]*mspan pageInUse [pagesPerArena / 8]uint8 pageMarks [pagesPerArena / 8]uint8 pageSpecials [pagesPerArena / 8]uint8 checkmarks *checkmarksMap zeroedBase uintptr } type mheap struct { //描述了整个堆内存 lock mutex pages pageAlloc sweepgen uint32 allspans []*mspan // all spans out there pagesInUse atomic.Uint64 pagesSwept atomic.Uint64 pagesSweptBasis atomic.Uint64 sweepHeapLiveBasis uint64 sweepPagesPerByte float64 scavengeGoal uint64 reclaimIndex atomic.Uint64 reclaimCredit atomic.Uintptr arenas [1 << arenaL1Bits]*[1 << arenaL2Bits]*heapArena //记录所有的heapArena heapArenaAlloc linearAlloc arenaHints *arenaHint arena linearAlloc allArenas []arenaIdx sweepArenas []arenaIdx markArenas []arenaIdx curArena struct { base, end uintptr } _ uint32 // ensure 64-bit alignment of central central [numSpanClasses]struct { //68<<1 mcentral mcentral //索引 pad [cpu.CacheLinePadSize - unsafe.Sizeof(mcentral{})%cpu.CacheLinePadSize]byte } spanalloc fixalloc // allocator for span* cachealloc fixalloc // allocator for mcache* specialfinalizeralloc fixalloc // allocator for specialfinalizer* specialprofilealloc fixalloc // allocator for specialprofile* specialReachableAlloc fixalloc // allocator for specialReachable speciallock mutex // lock for special record allocators. arenaHintAlloc fixalloc // allocator for arenaHints unused *specialfinalizer // never set, just here to force the specialfinalizer type into DWARF } 2.2.1 heapArena的使用线性分配直接一个接着一个向后增加,线性排布。若前面有对象被回收也不管,直到64M排布完全,再次遍历,寻找可使用的空缺。链表分配使用内存的时候,将空缺的内存块记录,并使用类似链表的方式储存记录空缺区域地址,有新的对象需要储存的时候,先遍历链表,将空缺补满。正解:分级分配分级分配只是一种思想。将内存区域切分为不同的小的内存块,每个内存块按照级别分配相应的内存(由小到大),将所有的对象放在能储存它的最小的块中。2.2.2 mspan内存管理单元mspan表示N个内存大小相同的格子。Go中一共有68(0-67)种内存管理单元。而每一个内存单元都是现场按需切分的,不是一授权就全部切好。//go:notinheap type mspan struct { next *mspan // next span in list, or nil if none prev *mspan // previous span in list, or nil if none list *mSpanList // For debugging. TODO: Remove. startAddr uintptr // address of first byte of span aka s.base() npages uintptr // number of pages in span manualFreeList gclinkptr // list of free objects in mSpanManual spans freeindex uintptr nelems uintptr // number of object in the span. allocCache uint64 allocBits *gcBits gcmarkBits *gcBits sweepgen uint32 divMul uint32 // for divide by elemsize allocCount uint16 // number of allocated objects spanclass spanClass // size class and noscan (uint8) state mSpanStateBox // mSpanInUse etc; accessed atomically (get/set methods) needzero uint8 // needs to be zeroed before allocation elemsize uintptr // computed from sizeclass or from npages limit uintptr // end of data in span speciallock mutex // guards specials list specials *special // linked list of special records sorted by offset. } 2.2.3 mcentral中心索引因为mspan的分布毫无规律,所以设置中心索引,引导内存分配。 其中一共有136个mcentral结构体,分为68个需要GC扫描的组和68个不需要GC扫描的组。其实本质就是空置内存空间的链表头。type mcentral struct { spanclass spanClass //同级别的span partial [2]spanSet // list of spans with a free object full [2]spanSet // list of spans with no free objects } 2.2.4 mcache 线程缓存因为mcentral是可读可写的,所以在高并发场景中,存在严重的高并发的锁冲突的问题:所以 为每一个线程创建一个线程缓存mcache。每一个线程P拥有一个mcache,而每一个mcache拥有136个mspan(68需要GC,68不需要,且每种一个)。type mcache struct { nextSample uintptr // trigger heap sample after allocating this many bytes scanAlloc uintptr // bytes of scannable heap allocated tiny uintptr tinyoffset uintptr tinyAllocs uintptr alloc [numSpanClasses]*mspan // spans to allocate from, indexed by spanClass stackcache [_NumStackOrders]stackfreelist flushGen uint32 }2.3 结构构成图3 内存的分配3.1 对象分级一共被分为三级:1 Tiny 微对象(0,16B) 无指针2 Small 小对象[16B, 32KB] 3 Large 大对象(32KB,正无穷)其中微,小对象分配至普通的mspan(指1-67号mspan)。而大对象需要量身定制,分配到0级mspan中。3.2 微对象分配即从mcache中拿到本地(二级)mspan,将多个微对象合并成一个16Byte存入。func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer { //一些错误判断 delayedZeroing := false if size <= maxSmallSize { if noscan && size < maxTinySize { //微对象逻辑 off := c.tinyoffset if size&7 == 0 { off = alignUp(off, 8) } else if goarch.PtrSize == 4 && size == 12 { off = alignUp(off, 8) } else if size&3 == 0 { off = alignUp(off, 4) } else if size&1 == 0 { off = alignUp(off, 2) } if off+size <= maxTinySize && c.tiny != 0 { // The object fits into existing tiny block. x = unsafe.Pointer(c.tiny + off) c.tinyoffset = off + size c.tinyAllocs++ mp.mallocing = 0 releasem(mp) return x } span = c.alloc[tinySpanClass] //获取二级span即16个字节 v := nextFreeFast(span) if v == 0 { v, span, shouldhelpgc = c.nextFree(tinySpanClass) //找到地址v } x = unsafe.Pointer(v) (*[2]uint64)(x)[0] = 0 (*[2]uint64)(x)[1] = 0 if !raceenabled && (size < c.tinyoffset || c.tiny == 0) { // Note: disabled when race detector is on, see comment near end of this function. c.tiny = uintptr(x) c.tinyoffset = size } size = maxTinySize } else { //...小对象 } } else { //大对象 } return x }3.3 小对象的分配func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer { //一些错误判断 delayedZeroing := false if size <= maxSmallSize { if noscan && size < maxTinySize { //微小对象处理逻辑 } else { var sizeclass uint8 //计算需要span的级别--查表 if size <= smallSizeMax-8 { sizeclass = size_to_class8[divRoundUp(size, smallSizeDiv)] } else { sizeclass = size_to_class128[divRoundUp(size-smallSizeMax, largeSizeDiv)] } size = uintptr(class_to_size[sizeclass]) spc := makeSpanClass(sizeclass, noscan) span = c.alloc[spc] //c即前面获取的本地span缓存--获取正确级别的span v := nextFreeFast(span) if v == 0 { v, span, shouldhelpgc = c.nextFree(spc) //mcache替换 } x = unsafe.Pointer(v) if needzero && span.needzero != 0 { memclrNoHeapPointers(unsafe.Pointer(v), size) } } } else { //大对象处理逻辑 } return x }3.4 大对象的分配func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer { //一些错误判断 if size <= maxSmallSize { //微,小对象的处理逻辑 } else { shouldhelpgc = true span = c.allocLarge(size, noscan) span.freeindex = 1 span.allocCount = 1 size = span.elemsize x = unsafe.Pointer(span.base()) if needzero && span.needzero != 0 { if noscan { delayedZeroing = true } else { memclrNoHeapPointers(x, size) } } } var scanSize uintptr if !noscan { heapBitsSetType(uintptr(x), size, dataSize, typ) if dataSize > typ.size { if typ.ptrdata != 0 { scanSize = dataSize - typ.size + typ.ptrdata } } else { scanSize = typ.ptrdata } c.scanAlloc += scanSize } publicationBarrier() if gcphase != _GCoff { gcmarknewobject(span, uintptr(x), size, scanSize) } if raceenabled { racemalloc(x, size) } if msanenabled { msanmalloc(x, size) } if asanenabled { rzBeg := unsafe.Add(x, userSize) asanpoison(rzBeg, size-userSize) asanunpoison(x, userSize) } if rate := MemProfileRate; rate > 0 { // Note cache c only valid while m acquired; see #47302 if rate != 1 && size < c.nextSample { c.nextSample -= size } else { profilealloc(mp, x, size) } } mp.mallocing = 0 releasem(mp) if delayedZeroing { if !noscan { throw("delayed zeroing on data that may contain pointers") } memclrNoHeapPointersChunked(size, x) // This is a possible preemption point: see #47302 } if debug.malloc { if debug.allocfreetrace != 0 { tracealloc(x, size, typ) } if inittrace.active && inittrace.id == getg().goid { inittrace.bytes += uint64(size) } } if assistG != nil { assistG.gcAssistBytes -= int64(size - dataSize) } if shouldhelpgc { if t := (gcTrigger{kind: gcTriggerHeap}); t.test() { gcStart(t) } } if raceenabled && noscan && dataSize < maxTinySize { x = add(x, size-dataSize) } return x }调用函数allocLarge()直接在heapArena中开辟新的mspan。func (c *mcache) allocLarge(size uintptr, noscan bool) *mspan { if size+_PageSize < size { throw("out of memory") } npages := size >> _PageShift if size&_PageMask != 0 { npages++ } deductSweepCredit(npages*_PageSize, npages) spc := makeSpanClass(0, noscan) //创建0级的span s := mheap_.alloc(npages, spc) if s == nil { throw("out of memory") } stats := memstats.heapStats.acquire() atomic.Xadduintptr(&stats.largeAlloc, npages*pageSize) atomic.Xadduintptr(&stats.largeAllocCount, 1) memstats.heapStats.release() // Update heapLive. gcController.update(int64(s.npages*pageSize), 0) mheap_.central[spc].mcentral.fullSwept(mheap_.sweepgen).push(s) s.limit = s.base() + size heapBitsForAddr(s.base()).initSpan(s) return s }3.5 mcache的替换在mcache中,每一个级别的mspan链表只有一个,当某一个mspan满了的时候,就会去mcentral中获取一串级别相同的mspan链表。3.6 mcentral扩容从操作系统中申请新的内存,增加heapArena。
2022年08月05日
327 阅读
0 评论
0 点赞
2022-08-02
Go TCP
1 Socket 一种TCP的调用很多系统都提供Socket作为TCP网络链接的抽象。可以认为一个socket就是一个已经经过三次握手的链接。1.1 Socket的通信过程server端会持有一个仅用于监听和接收的socket,若监听到了相关的需求,则新建一个socket处理相关需求。1.2 IO模型由于服务端需要同时读写多个socket,所以创建了三个IO模型处理。1.2.1 阻塞IO建立三个线程,线程不断读取socket的数据,如果没有数据,则自旋再次读取(陷入内核状态),会卡住,直到接收到数据时,切换为业务态,继续执行。这也导致线程的状态切换的开销特别大,显得“低级”。1.2.2 非阻塞IO即业务逻辑会反复遍历所有的socket,有数据则处理,否则直接返回调用下一个socket。因此,线程不会陷入内核态。1.2.3 多路复用型IO操作系统帮助我们监听socket,再注册到事件池中。是一种非阻塞IO的升级版,将socket的可读事件注册在 event poll 中,每次业务函数遍历,事件池直接返回发生的事件(列表),业务则直接处理此socket。2 Go 中对事件池的抽象GO中有Network Poller 实现了对多路复用器的抽象(屏蔽了不同操作系统的epoll的区别)2.1 linux-netpollinit()新建多路复用器func netpollinit() { epfd = epollcreate1(_EPOLL_CLOEXEC) //epfd是linux的文件描述符 if epfd < 0 { epfd = epollcreate(1024) //调用汇编函数创建事件池 if epfd < 0 { println("runtime: epollcreate failed with", -epfd) throw("runtime: netpollinit failed") } closeonexec(epfd) } r, w, errno := nonblockingPipe() //创建linux的管道用于关闭epoll if errno != 0 { println("runtime: pipe failed with", -errno) throw("runtime: pipe failed") } ev := epollevent{ events: _EPOLLIN, } *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev) //注册管道事件,有事件则关闭epoll if errno != 0 { println("runtime: epollctl failed with", -errno) throw("runtime: epollctl failed") } netpollBreakRd = uintptr(r) netpollBreakWr = uintptr(w) }2.2 linux-netpollopen()插入事件//传入一个socket的fd和pollDesc的指针 func netpollopen(fd uintptr, pd *pollDesc) int32 { var ev epollevent ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET //插入四个事件 *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd //pollDesc详细描述一个socket return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev) //传入control中 }2.3 linux-netpoll()查询发生的事件// netpoll checks for ready network connections. // Returns list of goroutines that become runnable. // delay < 0: blocks indefinitely // delay == 0: does not block, just polls // delay > 0: block for up to that many nanoseconds func netpoll(delay int64) gList { //等待的时间 if epfd == -1 { return gList{} } var waitms int32 if delay < 0 { waitms = -1 } else if delay == 0 { waitms = 0 } else if delay < 1e6 { waitms = 1 } else if delay < 1e15 { waitms = int32(delay / 1e6) } else { // An arbitrary cap on how long to wait for a timer. // 1e9 ms == ~11.5 days. waitms = 1e9 } var events [128]epollevent //事件储存 retry: n := epollwait(epfd, &events[0], int32(len(events)), waitms) if n < 0 { //没有事件发生 if n != -_EINTR { println("runtime: epollwait on fd", epfd, "failed with", -n) throw("runtime: netpoll failed") } // If a timed sleep was interrupted, just return to // recalculate how long we should sleep now. if waitms > 0 { return gList{} } goto retry } var toRun gList //协程列表 for i := int32(0); i < n; i++ { //遍历每一个事件 ev := &events[i] if ev.events == 0 { continue } if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd { if ev.events != _EPOLLIN { println("runtime: netpoll: break fd ready for", ev.events) throw("runtime: netpoll: break fd ready for something unexpected") } if delay != 0 { // netpollBreak could be picked up by a // nonblocking poll. Only read the byte // if blocking. var tmp [16]byte read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp))) atomic.Store(&netpollWakeSig, 0) } continue } var mode int32 //如果是_EPOLLIN 增加可写的标志位 if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'r' } //如果是_EPOLLOUT 增加可读的标志位 if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'w' } if mode != 0 { pd := *(**pollDesc)(unsafe.Pointer(&ev.data)) pd.setEventErr(ev.events == _EPOLLERR) netpollready(&toRun, pd, mode) //将可运行的协程放入列表内 } } return toRun //协程的列表 } 首先调用epollwait()查询有那些事情发生,再根据socket相关的pollDesc信息,返回可运行的协程列表(即将被唤醒)。3 Network Poller 本质3.1 pollcache和pollDesc结构体3.1.1 pollcache 结构体type pollCache struct { lock mutex //互斥锁 first *pollDesc //链表头 // PollDesc objects must be type-stable, // because we can get ready notification from epoll/kqueue // after the descriptor is closed/reused. // Stale notifications are detected using seq variable, // seq is incremented when deadlines are changed or descriptor is reused. } 3.1.2 pollDesc结构体--描述Sockettype pollDesc struct { link *pollDesc // in pollcache, protected by pollcache.lock-链表 fd uintptr // constant for pollDesc usage lifetime atomicInfo atomic.Uint32 // atomic pollInfo rg atomic.Uintptr // pdReady, pdWait, G waiting for read or nil wg atomic.Uintptr // pdReady, pdWait, G waiting for write or nil lock mutex // protects the following fields closing bool user uint32 // user settable cookie rseq uintptr // protects from stale read timers rt timer // read deadline timer (set if rt.f != nil) rd int64 // read deadline (a nanotime in the future, -1 when expired) wseq uintptr // protects from stale write timers wt timer // write deadline timer wd int64 // write deadline (a nanotime in the future, -1 when expired) self *pollDesc // storage for indirect interface. See (*pollDesc).makeArg. } 3.2 Network Poller的初始化//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit func poll_runtime_pollServerInit() { netpollGenericInit() } //.... func netpollGenericInit() { if atomic.Load(&netpollInited) == 0 { lockInit(&netpollInitLock, lockRankNetpollInit) lock(&netpollInitLock) if netpollInited == 0 { netpollinit() //开辟连接池 atomic.Store(&netpollInited, 1) } unlock(&netpollInitLock) } }使用原子操作加载变量netpollInited,保证只初始化一个poller!3.3 Network新增监听Socket//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) { pd := pollcache.alloc() //给链表分配一个新的结构体pollDesc-新增节点 lock(&pd.lock) wg := pd.wg.Load() if wg != 0 && wg != pdReady { throw("runtime: blocked write on free polldesc") } rg := pd.rg.Load() if rg != 0 && rg != pdReady { throw("runtime: blocked read on free polldesc") } //新增节点的状态初始化 pd.fd = fd pd.closing = false pd.setEventErr(false) pd.rseq++ pd.rg.Store(0) pd.rd = 0 pd.wseq++ pd.wg.Store(0) pd.wd = 0 pd.self = pd pd.publishInfo() unlock(&pd.lock) errno := netpollopen(fd, pd) //插入事件池 if errno != 0 { pollcache.free(pd) return nil, int(errno) } return pd, 0 }3.4 Network Poller收发数据3.4.1 Socket已经可以读写gc(g0协程)会循环调用netpoll()方法。---因为g0协程一直在跑。func netpoll(delay int64) gList { //... //遍历每一个事件 var mode int32 //如果是_EPOLLIN 增加可写的标志位 if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'r' } //如果是_EPOLLOUT 增加可读的标志位 if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'w' } if mode != 0 { pd := *(**pollDesc)(unsafe.Pointer(&ev.data)) pd.setEventErr(ev.events == _EPOLLERR) netpollready(&toRun, pd, mode) //将可运行的协程放入列表内 } //遍历结束 return toRun } 重点在于netpollready()函数调用的netpollunblock()函数:func netpollready(toRun *gList, pd *pollDesc, mode int32) { var rg, wg *g if mode == 'r' || mode == 'r'+'w' { rg = netpollunblock(pd, 'r', true) } if mode == 'w' || mode == 'r'+'w' { wg = netpollunblock(pd, 'w', true) } if rg != nil { toRun.push(rg) } if wg != nil { toRun.push(wg) } } //ioready = true func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g { gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } for { old := gpp.Load() if old == pdReady { return nil } if old == 0 && !ioready { // Only set pdReady for ioready. runtime_pollWait // will check for timeout/cancel before waiting. return nil } var new uintptr if ioready { new = pdReady //更改状态 } if gpp.CompareAndSwap(old, new) { if old == pdWait { old = 0 } return (*g)(unsafe.Pointer(old)) } } } 发现Socket可读写时,会将其状态改为pdReady(它的值为1)。而后,协程会调用poll_runtime_pollWait()函数,判断Socket是否允许读写:func poll_runtime_pollWait(pd *pollDesc, mode int) int { errcode := netpollcheckerr(pd, int32(mode)) if errcode != pollNoError { return errcode } // As for now only Solaris, illumos, and AIX use level-triggered IO. if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" { netpollarm(pd, mode) } for !netpollblock(pd, int32(mode), false) { errcode = netpollcheckerr(pd, int32(mode)) if errcode != pollNoError { return errcode } } return pollNoError //没有错误,继续执行 }调用了 netpollblock()函数,判断是否可以继续执行func netpollblock(pd *pollDesc, mode int32, waitio bool) bool { gpp := &pd.rg if mode == 'w' { gpp = &pd.wg //取出Socket的状态 } // set the gpp semaphore to pdWait for { // Consume notification if already ready. if gpp.CompareAndSwap(pdReady, 0) { //pdReady的值为1,此语句成立 return true //表示直接可写 } if gpp.CompareAndSwap(0, pdWait) { break } // Double check that this isn't corrupt; otherwise we'd loop // forever. if v := gpp.Load(); v != pdReady && v != 0 { throw("runtime: double wait") } } if waitio || netpollcheckerr(pd, mode) == pollNoError { gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5) } // be careful to not lose concurrent pdReady notification old := gpp.Swap(0) if old > pdWait { throw("runtime: corrupted polldesc") } return old == pdReady } 3.4.2 Socket暂时无法读写g0协程还是会一直调用netpoll()方法,判断是否有Socket需要处理。协程会一直调用poll_runtime_pollWait()函数,其调用的netpollblock()函数会判断wg的值是否为pdwait,是的话,直接调用gopark()函数休眠---其中的参数之一是一个函数,会将此协程的地址写道rg或者wg上。如果相关的Socket可读写,netpoll()中的netpollready()函数就会取出rg或者wg储存的地址,放入toRun协程列表中,最后会将该协程唤醒。func netpollready(toRun *gList, pd *pollDesc, mode int32) { var rg, wg *g if mode == 'r' || mode == 'r'+'w' { rg = netpollunblock(pd, 'r', true) } if mode == 'w' || mode == 'r'+'w' { wg = netpollunblock(pd, 'w', true) } //取出成功,传输到toRun列表中。 if rg != nil { toRun.push(rg) } if wg != nil { toRun.push(wg) } }5 Go对Socket的抽象-net5.1 如何监听Socket的新连接-net.Listenfunc Listen(network, address string) (Listener, error) { //Listener是一个接口 var lc ListenConfig return lc.Listen(context.Background(), network, address) }5.1.1 调用函数lc.Listen():func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) { //... case *TCPAddr: l, err = sl.listenTCP(ctx, la) //TCP监听 case *UnixAddr: l, err = sl.listenUnix(ctx, la) default: return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}} } //... }调用函数listenTCP():func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) { fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control) if err != nil { return nil, err } return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil }调用函数internetSocket():func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) { if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd") && mode == "dial" && raddr.isWildcard() { raddr = raddr.toLocal(net) } family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode) return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn) }调用函数socket():func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) { s, err := sysSocket(family, sotype, proto) //创建了一个socket if err != nil { return nil, err } if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil { poll.CloseFunc(s) return nil, err } if fd, err = newFD(s, family, sotype, net); err != nil { poll.CloseFunc(s) return nil, err } //... }调用函数newFD():func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) { ret := &netFD{ pfd: poll.FD{ Sysfd: sysfd, IsStream: sotype == syscall.SOCK_STREAM, ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW, }, family: family, sotype: sotype, net: net, } return ret, nil }netFD的结构-net中对socket的详细解释:// Network file descriptor. type netFD struct { pfd poll.FD //--记录了pollDsc // immutable until Close family int sotype int isConnected bool // handshake completed or use of association with peer net string laddr Addr raddr Addr } 5.1.2 socket()创建完成netFD后,调用listenStream()func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) { //创建完成netFD-- if laddr != nil && raddr == nil { switch sotype { case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET: if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil { fd.Close() return nil, err } return fd, nil case syscall.SOCK_DGRAM: if err := fd.listenDatagram(laddr, ctrlFn); err != nil { fd.Close() return nil, err } return fd, nil } } if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil { fd.Close() return nil, err } return fd, nil }listenStream()中调用了Bind方法-绑定接口func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error { var err error if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil { return err } var lsa syscall.Sockaddr if lsa, err = laddr.sockaddr(fd.family); err != nil { return err } if ctrlFn != nil { c, err := newRawConn(fd) if err != nil { return err } if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil { return err } } if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil { return os.NewSyscallError("bind", err) } if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil { return os.NewSyscallError("listen", err) } if err = fd.init(); err != nil { //最后调用了poll_runtime_pollOpen return err } lsa, _ = syscall.Getsockname(fd.pfd.Sysfd) fd.setAddr(fd.addrFunc()(lsa), nil) return nil } 5.1.3 listenTCP()最后返回TCPlistener// TCPListener is a TCP network listener. Clients should typically // use variables of type Listener instead of assuming TCP. type TCPListener struct { fd *netFD lc ListenConfig }5.1.4 总结1.net.Listen()新建了一个Socket,并Bind相应的端口。2.新建一个FD(Socket的详细描述),返回TCPListener对象(本质是一个LISTEN状态的Socket)。3.将TCPListener的FD信息加入监听。5.2 Read()和Write()方法在net包中的实现,比较简单,看看就行。6结构简图
2022年08月02日
215 阅读
0 评论
0 点赞
2022-07-27
golang 锁
1 atomic 原子操作是一种硬件层面(锁了内存地址)的加锁机制,保证操作一个表变量的时候,其他的协程或者线程无法访问,但是只能用于简单变量的简单操作。2 sema锁2.1 源码阅读也被称作信号量/信号锁,其核心是一个uint32的值,指代同时可并发的数量。而每一个sema锁都对应一个SemaRoot结构体。其拥有一个平衡二叉树用于排队// A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem). // Each of those sudog may in turn point (through s.waitlink) to a list // of other sudogs waiting on the same address. // The operations on the inner lists of sudogs with the same address // are all O(1). The scanning of the top-level semaRoot list is O(log n), // where n is the number of distinct addresses with goroutines blocked // on them that hash to the given semaRoot. // See golang.org/issue/17953 for a program that worked badly // before we introduced the second level of list, and test/locklinear.go // for a test that exercises this. type semaRoot struct { lock mutex treap *sudog // root of balanced tree of unique waiters. nwait uint32 // Number of waiters. Read w/o the lock. }type sudog struct { // The following fields are protected by the hchan.lock of the // channel this sudog is blocking on. shrinkstack depends on // this for sudogs involved in channel ops. g *g next *sudog prev *sudog elem unsafe.Pointer // data element (may point to stack) // The following fields are never accessed concurrently. // For channels, waitlink is only accessed by g. // For semaphores, all fields (including the ones above) // are only accessed when holding a semaRoot lock. acquiretime int64 releasetime int64 ticket uint32 // isSelect indicates g is participating in a select, so // g.selectDone must be CAS'd to win the wake-up race. isSelect bool // success indicates whether communication over channel c // succeeded. It is true if the goroutine was awoken because a // value was delivered over channel c, and false if awoken // because c was closed. success bool parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot c *hchan // channel }大致可以简化为:2.2 逻辑源码解读当 sema锁获取到的uint32的值>0时:每次获取锁,次值减一,释放锁,此值加一。此uint32的值==0时:此时sema锁退化成一个休眠队列。而获取锁时,协程会调用gopark()休眠,并且进入堆数组等待。释放锁时,从堆数组中取出一个协程,并将其唤醒。3 常用锁3.1 互斥锁 sync.Mutex3.1.1 结构体源码解析// A Mutex is a mutual exclusion lock. // The zero value for a Mutex is an unlocked mutex. // // A Mutex must not be copied after first use. type Mutex struct { state int32 sema uint32 }3.1.2 正常加锁运行逻辑首先直接尝试CAS(compareAndSwap)直接加锁,若失败则多次自旋尝试,再失败则进入sema队列休眠。首先某一协程执行代码并加锁// Lock locks m. // If the lock is already in use, the calling goroutine // blocks until the mutex is available. func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { //state的状态置为1 if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } // Slow path (outlined so that the fast path can be inlined) m.lockSlow() //开始自旋 }加锁失败则自旋尝试func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { // Don't spin in starvation mode, ownership is handed off to waiters // so we won't be able to acquire the mutex anyway. //判断 锁是否还在,是否处于饥饿状态,是否可以继续自旋 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // Active spinning makes sense. // Try to set mutexWoken flag to inform Unlock // to not wake other blocked goroutines. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() //不是刚刚被唤醒则执行此函数--汇编让其执行空语句 iter++ old = m.state continue } new := old // Don't try to acquire starving mutex, new arriving goroutines must queue. if old&mutexStarving == 0 { //如果锁已经解开并且没有处于饥饿状态 new |= mutexLocked //将状态更新---加锁 } if old&(mutexLocked|mutexStarving) != 0 { //锁是否仍在 new += 1 << mutexWaiterShift //状态中的等待数量+1 } // The current goroutine switches mutex to starvation mode. // But if the mutex is currently unlocked, don't do the switch. // Unlock expects that starving mutex has waiters, which will not // be true in this case. if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case. if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) { //实现更新锁的状态 if old&(mutexLocked|mutexStarving) == 0 { //再次检验是否成功上锁 break // locked the mutex with CAS //自旋加锁成功 } // If we were already waiting before, queue at the front of the queue. queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } //获取sema锁--值为0则协程休眠 runtime_SemacquireMutex(&m.sema, queueLifo, 1) //若协程休眠,则以下代码无法执行 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { // If this goroutine was woken and mutex is in starvation mode, // ownership was handed off to us but mutex is in somewhat // inconsistent state: mutexLocked is not set and we are still // accounted as waiter. Fix that. if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { // Exit starvation mode. // Critical to do it here and consider wait time. // Starvation mode is so inefficient, that two goroutines // can go lock-step infinitely once they switch mutex // to starvation mode. delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }3.1.4正常解锁运行逻辑首先尝试CAS直接解锁(将锁的state置为0),若发现有协程在sema中休眠,则唤醒(runtime_Semrelease())一个协程。直接解锁// Unlock unlocks m. // It is a run-time error if m is not locked on entry to Unlock. // // A locked Mutex is not associated with a particular goroutine. // It is allowed for one goroutine to lock a Mutex and then // arrange for another goroutine to unlock it. func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) //状态由1变为0,解锁 if new != 0 { 若是0则state中所有的字段都是0,也没有等待的协程 // Outlined slow path to allow inlining the fast path. // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. m.unlockSlow(new) //有等待的或者其他状态 } } state字段不是0时--有其他状态func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { // If there are no waiters or a goroutine has already // been woken or grabbed the lock, no need to wake anyone. // In starvation mode ownership is directly handed off from unlocking // goroutine to the next waiter. We are not part of this chain, // since we did not observe mutexStarving when we unlocked the mutex above. // So get off the way. //判断是否有多余的休眠协程 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return //不是应为其他协程则直接退出--因为可以直接解锁 } // Grab the right to wake someone. new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) //有等待的协程则释放一个 return } old = m.state } } else { // Starving mode: handoff mutex ownership to the next waiter, and yield // our time slice so that the next waiter can start to run immediately. // Note: mutexLocked is not set, the waiter will set it after wakeup. // But mutex is still considered locked if mutexStarving is set, // so new coming goroutines won't acquire it. runtime_Semrelease(&m.sema, true, 1) } } 被释放的协程继续执行Lock的相关代码//协程休眠...已经结束--被唤醒 //是否需要进入饥饿模式 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { // If this goroutine was woken and mutex is in starvation mode, // ownership was handed off to us but mutex is in somewhat // inconsistent state: mutexLocked is not set and we are still // accounted as waiter. Fix that. if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { // Exit starvation mode. // Critical to do it here and consider wait time. // Starvation mode is so inefficient, that two goroutines // can go lock-step infinitely once they switch mutex // to starvation mode. delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } //回到for的开头再次判断 }3.1.5 锁饥饿问题再高并发的地区,某一协程一直无法获取到锁,触发饥饿问题。约定为:当前协程等待锁的时间超过1ms,state的相关字段(starving)置为1,进入饥饿模式。再饥饿模式中,不再自旋,新来的协程直接进入sema中休眠,并且被唤醒的协程直接获取锁,直到没有协程在队列中等待的时候才回到正常模式。func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { //饥饿则直接跳过此段 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } new := old if old&mutexStarving == 0 { new |= mutexLocked } //如果被锁且处于饥饿模式则直接移入等待队列 if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) { //更新状态 if old&(mutexLocked|mutexStarving) == 0 { break } queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo, 1) //首先休眠被唤醒后直接判断是否要进入饥饿状态 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs //是则将函数开始声明的变量starving置为true old = m.state //唤醒后处于饥饿状态 if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) //直接改变状态 break } awoke = true iter = 0 } else { old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } } 3.1.6 建议1.需要加锁的时候,尽量减少锁的时间,并且只在必要的部分加锁!2.建议使用defer语句确保锁的释放!3.2 读写锁 sunc.RWMutex满足多个协程只读(并发读取,单独修改)。3.2.1 读写锁的实现type RWMutex struct { w Mutex // held if there are pending writers//写协程持有的互斥锁 writerSem uint32 // semaphore for writers to wait for completing readers//sema队列 readerSem uint32 // semaphore for readers to wait for completing writers//sema队列 readerCount int32 // number of pending readers//正:读取协程的数量, 负:无法读取,写锁 readerWait int32 // number of departing readers//写锁生效前需要几个读锁释放 } 3.2.2加写锁逻辑1.先加mutex互斥锁,若已经被加锁则阻塞等待;2.将readerCount减去常量_rwmutexMaxReaders_(变为负值),阻塞读锁获取;3.计算需要等待多少读协程释放;4.如果需要等待读协程释放,则进入writerSem队列,直到readerWait=0。源码func (rw *RWMutex) Lock() { if race.Enabled { _ = rw.w.state race.Disable() } // First, resolve competition with other writers. rw.w.Lock()//首先加锁互斥锁--有加写锁的资格 // Announce to readers there is a pending writer. //readerCount变为负值,但是r表示等待的数量 r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. //之前有协程则进入等待队列 if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false, 0) } //原本等待的数量为0,则直接执行 if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) race.Acquire(unsafe.Pointer(&rw.writerSem)) } }3.2.3解写锁逻辑1.将readerCount变为正值(加常量),允许读锁获取;2.释放等待队列中的读协程。源码func (rw *RWMutex) Unlock() { if race.Enabled { _ = rw.w.state race.Release(unsafe.Pointer(&rw.readerSem)) race.Disable() } // Announce to readers there is no active writer. //变为正值 r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) if r >= rwmutexMaxReaders { race.Enable() throw("sync: Unlock of unlocked RWMutex") } // Unblock blocked readers, if any. //如果r(等待协程)大于0则释放一个读协程 for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } // Allow other writers to proceed. rw.w.Unlock() //将互斥锁解锁 if race.Enabled { race.Enable() } } 3.2.4 加读锁逻辑1.将readerCount+1;2.如果readerCount是正数,则加锁成功;3.如果readerCount是负数,则进入readerSem队列等待。源码func (rw *RWMutex) RLock() { if race.Enabled { _ = rw.w.state race.Disable() } if atomic.AddInt32(&rw.readerCount, 1) < 0 { // A writer is pending, wait for it. runtime_SemacquireMutex(&rw.readerSem, false, 0) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) } }3.2.5 解读锁逻辑1.readerCount-1;2.此时readerCount>0时,readerWait-1,代表解锁成功。3.若readerCount<0时,说明有写锁排队。如果自己时最后一个读协程则唤醒写协程源码func (rw *RWMutex) RUnlock() { if race.Enabled { _ = rw.w.state race.ReleaseMerge(unsafe.Pointer(&rw.writerSem)) race.Disable() } if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // Outlined slow-path to allow the fast-path to be inlined rw.rUnlockSlow(r) } if race.Enabled { race.Enable() } } func (rw *RWMutex) rUnlockSlow(r int32) { if r+1 == 0 || r+1 == -rwmutexMaxReaders { race.Enable() throw("sync: RUnlock of unlocked RWMutex") } // A writer is pending. //减为0时,唤醒写协程 if atomic.AddInt32(&rw.readerWait, -1) == 0 { // The last reader unblocks the writer. runtime_Semrelease(&rw.writerSem, false, 1) } }3.3 等待组 sync.WaitGrouptype WaitGroup struct { noCopy noCopy //指示编译器,禁止拷贝 // 64-bit value: high 32 bits are counter, low 32 bits are waiter count. state1 uint64 // compilers only guarantee that 64-bit fields are 32-bit aligned state2 uint32 }func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { // state1 is 64-bit aligned: nothing to do. return &wg.state1, &wg.state2 } else { // state1 is 32-bit aligned but not 64-bit aligned: this means that // (&state1)+4 is 64-bit aligned. state := (*[3]uint32)(unsafe.Pointer(&wg.state1)) return (*uint64)(unsafe.Pointer(&state[1])), &state[0] } } func (wg *WaitGroup) Add(delta int) { statep, semap := wg.state() if race.Enabled { _ = *statep // trigger nil deref early if delta < 0 { // Synchronize decrements with Wait. race.ReleaseMerge(unsafe.Pointer(wg)) } race.Disable() defer race.Enable() } state := atomic.AddUint64(statep, uint64(delta)<<32) v := int32(state >> 32) w := uint32(state) if race.Enabled && delta > 0 && v == int32(delta) { race.Read(unsafe.Pointer(semap)) } if v < 0 { panic("sync: negative WaitGroup counter") } if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } if v > 0 || w == 0 { return } if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // Reset waiters count to 0. *statep = 0 for ; w != 0; w-- { runtime_Semrelease(semap, false, 0) } } // Done decrements the WaitGroup counter by one. func (wg *WaitGroup) Done() { wg.Add(-1) } // Wait blocks until the WaitGroup counter is zero. func (wg *WaitGroup) Wait() { statep, semap := wg.state() if race.Enabled { _ = *statep // trigger nil deref early race.Disable() } for { state := atomic.LoadUint64(statep) v := int32(state >> 32) w := uint32(state) if v == 0 { // Counter is 0, no need to wait. if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } // Increment waiters count. if atomic.CompareAndSwapUint64(statep, state, state+1) { if race.Enabled && w == 0 { // Wait must be synchronized with the first Add. // Need to model this is as a write to race with the read in Add. // As a consequence, can do the write only for the first waiter, // otherwise concurrent Waits will race with each other. race.Write(unsafe.Pointer(semap)) } runtime_Semacquire(semap) if *statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } } }3.4 锁的初始化 sync.Once目的:让相关函数的代码只运行一次。思路:争抢mutex互斥锁,失败则进入sema休眠,成功的更改计数值,再次释放。type Once struct { done uint32 //执行后会变成1 m Mutex } func (o *Once) Do(f func()) { if atomic.LoadUint32(&o.done) == 0 { // Outlined slow-path to allow inlining of the fast-path. o.doSlow(f) } //执行过直接退出了 } func (o *Once) doSlow(f func()) { o.m.Lock() defer o.m.Unlock() if o.done == 0 { //二次判断是不是真的没有运行过 defer atomic.StoreUint32(&o.done, 1) f() } }3.5 锁异常的问题3.5.1 不要拷贝锁永远不要拷贝锁,否则容易造成死锁情况!---拷贝后的锁状态很难判断当然,带锁的结构体也不要拷贝。使用 go vet main.go 可以检测锁拷贝。3.5.2 RACE竞争检测很多代码都有语句** if race.Enabled(){...}**,目的是发现隐含的数据竞争问题。终端输入 **go build -race main.go**,**./main.exe**。若出现warning 则表示出现数据竞争。3.5.1 go-deadlock 检测github上搜索go-deadlock第三方库,替代mutex包(用法一样)。
2022年07月27日
333 阅读
0 评论
1 点赞
2022-07-27
golang 协程
1 线程和进程的关系1.一个进程可以内包含多个线程,线程可被理解为是不占用空间但是占用CPU的。2.在同一个进程中,线程是共享内存空间的。(线程所占的空间是被编译器分配给这个进程的)。3.线程的调度需要由系统进行,CPU需要在不同的线程中进行切换。所以,线程的缺点在于:a.本身占用的资源大,b. 线程的操作开销大,c.线程的切换开销大1.1go线程的抽象type m struct { g0 *g //go运行的第一个协程 morebuf gobuf divmod uint32 // Fields not known to debuggers. procid uint64 gsignal *g goSigStack gsignalStack sigmask sigset tls [tlsSlots]uintptr mstartfn func() curg *g //现在进行到的协程 caughtsig guintptr p puintptr nextp puintptr oldp puintptr id int64 mallocing int32 throwing int32 preemptoff string locks int32 dying int32 profilehz int32 spinning bool blocked bool newSigstack bool printlock int8 incgo bool freeWait uint32 fastrand uint64 needextram bool traceback uint8 ncgocall uint64 ncgo int32 cgoCallersUse uint32 cgoCallers *cgoCallers park note alllink *m schedlink muintptr lockedg guintptr createstack [32]uintptr lockedExt uint32 lockedInt uint32 nextwaitm muintptr waitunlockf func(*g, unsafe.Pointer) bool waitlock unsafe.Pointer waittraceev byte waittraceskip int startingtrace bool syscalltick uint32 freelink *m // these are here because they are too large to be on the stack // of low-level NOSPLIT functions. libcall libcall libcallpc uintptr libcallsp uintptr libcallg guintptr syscall libcall vdsoSP uintptr vdsoPC uintptr // preemptGen counts the number of completed preemption // signals. This is used to detect when a preemption is // requested, but fails. Accessed atomically. preemptGen uint32 // Whether this is a pending preemption signal on this M. // Accessed atomically. signalPending uint32 dlogPerM mOS //及记录每种操作系统的线程信息 // Up to 10 locks held by this m, maintained by the lock ranking code. locksHeldLen int locksHeld [10]heldLockInfo }2 协程并发的原理协程是带有执行状态的程序,需要运行的时候,将此协程放入一个线程中传入CPU内计算,再用计算结果更新这个程序,最后将其恢复为协程。(CPU不再切换,而是线程的内容在变)再按照顺序执行下一个协程。特点:协程复用线程(节省的CPU线程调度的开销)优点:资源利用的效率高,调度的速度快,拥有超高的并发。3 协程的本质(源码)type g struct { stack stack //stack包含栈的上下限 stackguard0 uintptr stackguard1 uintptr _panic *_panic _defer *_defer m *m sched gobuf //包含sp(原始)指针,指向的是此协程的栈用到了哪个位置 //pc指针,程序计数器,表示运行到了哪一行代码 syscallsp uintptr syscallpc uintptr stktopsp uintptr param unsafe.Pointer atomicstatus uint32 //协程的状态 stackLock uint32 goid int64 //协程的id号 schedlink guintptr waitsince int64 waitreason waitReason preempt bool preemptStop bool preemptShrink bool asyncSafePoint bool paniconfault bool gcscandone bool throwsplit bool activeStackChans bool parkingOnChan uint8 raceignore int8 sysblocktraced bool tracking bool trackingSeq uint8 runnableStamp int64 runnableTime int64 sysexitticks int64 traceseq uint64 tracelastp puintptr lockedm muintptr sig uint32 writebuf []byte sigcode0 uintptr sigcode1 uintptr sigpc uintptr gopc uintptr ancestors *[]ancestorInfo startpc uintptr racectx uintptr waiting *sudog cgoCtxt []uintptr labels unsafe.Pointer timer *timer selectDone uint32 gcAssistBytes int64 }4 协程在线程上的运行4.1 线程的运行4.1.1 运行schedule()func schedule() { _g_ := getg() if _g_.m.locks != 0 { throw("schedule: holding locks") } if _g_.m.lockedg != 0 { stoplockedm() execute(_g_.m.lockedg.ptr(), false) // Never returns. } // We should not schedule away from a g that is executing a cgo call, // since the cgo call is using the m's g0 stack. if _g_.m.incgo { throw("schedule: in cgo") } top: pp := _g_.m.p.ptr() pp.preempt = false if sched.gcwaiting != 0 { gcstopm() goto top } if pp.runSafePointFn != 0 { runSafePointFn() } // Sanity check: if we are spinning, the run queue should be empty. // Check this before calling checkTimers, as that might call // goready to put a ready goroutine on the local run queue. if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) { throw("schedule: spinning with local work") } checkTimers(pp, 0) var gp *g var inheritTime bool // Normal goroutines will check for need to wakeP in ready, // but GCworkers and tracereaders will not, so the check must // be done here instead. tryWakeP := false //拿到可执行的协程 if trace.enabled || trace.shutdown { gp = traceReader() if gp != nil { casgstatus(gp, _Gwaiting, _Grunnable) traceGoUnpark(gp, 0) tryWakeP = true } } if gp == nil && gcBlackenEnabled != 0 { gp = gcController.findRunnableGCWorker(_g_.m.p.ptr()) if gp != nil { tryWakeP = true } } if gp == nil { // Check the global runnable queue once in a while to ensure fairness. // Otherwise two goroutines can completely occupy the local runqueue // by constantly respawning each other. if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) gp = globrunqget(_g_.m.p.ptr(), 1) unlock(&sched.lock) } } if gp == nil { gp, inheritTime = runqget(_g_.m.p.ptr()) // We can see gp != nil here even if the M is spinning, // if checkTimers added a local goroutine via goready. } if gp == nil { gp, inheritTime = findrunnable() // blocks until work is available } // This thread is going to run a goroutine and is not spinning anymore, // so if it was marked as spinning we need to reset it now and potentially // start a new spinning M. if _g_.m.spinning { resetspinning() } if sched.disable.user && !schedEnabled(gp) { // Scheduling of this goroutine is disabled. Put it on // the list of pending runnable goroutines for when we // re-enable user scheduling and look again. lock(&sched.lock) if schedEnabled(gp) { // Something re-enabled scheduling while we // were acquiring the lock. unlock(&sched.lock) } else { sched.disable.runnable.pushBack(gp) sched.disable.n++ unlock(&sched.lock) goto top } } // If about to schedule a not-normal goroutine (a GCworker or tracereader), // wake a P if there is one. if tryWakeP { wakep() } if gp.lockedm != 0 { // Hands off own p to the locked m, // then blocks waiting for a new p. startlockedm(gp) goto top } execute(gp, inheritTime) }4.1.2跳转到execute()func execute(gp *g, inheritTime bool) { //gp是即将执行的方法 _g_ := getg() // Assign gp.m before entering _Grunning so running Gs have an // M. _g_.m.curg = gp gp.m = _g_.m casgstatus(gp, _Grunnable, _Grunning) gp.waitsince = 0 gp.preempt = false gp.stackguard0 = gp.stack.lo + _StackGuard if !inheritTime { _g_.m.p.ptr().schedtick++ } // Check whether the profiler needs to be turned on or off. hz := sched.profilehz if _g_.m.profilehz != hz { setThreadCPUProfiler(hz) } if trace.enabled { // GoSysExit has to happen when we have a P, but before GoStart. // So we emit it here. if gp.syscallsp != 0 && gp.sysblocktraced { traceGoSysExit(gp.sysexitticks) } traceGoStart() } gogo(&gp.sched) } 4.1.3执行gogo()gogo()方法是汇编实现的。// func gogo(buf *gobuf) // restore state from Gobuf; longjmp TEXT runtime·gogo(SB), NOSPLIT, $0-8 MOVQ buf+0(FP), BX // gobuf MOVQ gobuf_g(BX), DX MOVQ 0(DX), CX // make sure g != nil JMP gogo<>(SB) TEXT gogo<>(SB), NOSPLIT, $0 get_tls(CX) MOVQ DX, g(CX) MOVQ DX, R14 // set the g register MOVQ gobuf_sp(BX), SP // restore SP //在协程栈里面插入了一个栈帧-goexit()方法 MOVQ gobuf_ret(BX), AX MOVQ gobuf_ctxt(BX), DX MOVQ gobuf_bp(BX), BP MOVQ $0, gobuf_sp(BX) // clear to help garbage collector MOVQ $0, gobuf_ret(BX) MOVQ $0, gobuf_ctxt(BX) MOVQ $0, gobuf_bp(BX) //下面两行:跳转线程正在执行的程序计数器 MOVQ gobuf_pc(BX), BX JMP BX4.1.4 处理完业务逻辑后退回到插入的栈帧goexit()goexit()也是由汇编语言实现的。// The top-most function running on a goroutine // returns to goexit+PCQuantum. TEXT runtime·goexit(SB),NOSPLIT|TOPFRAME,$0-0 BYTE $0x90 // NOP CALL runtime·goexit1(SB) // does not return // traceback from goexit1 must hit code range of goexit BYTE $0x90 // NOP // //调用了runtime·goexit1(SB)方法 //... // Finishes execution of the current goroutine. func goexit1() { if raceenabled { racegoend() } if trace.enabled { traceGoEnd() } mcall(goexit0) //会切换栈,回到g0栈空间 } //使用goexit0方法传参 func goexit0(gp *g) { _g_ := getg() _p_ := _g_.m.p.ptr() casgstatus(gp, _Grunning, _Gdead) gcController.addScannableStack(_p_, -int64(gp.stack.hi-gp.stack.lo)) if isSystemGoroutine(gp, false) { atomic.Xadd(&sched.ngsys, -1) } gp.m = nil locked := gp.lockedm != 0 gp.lockedm = 0 _g_.m.lockedg = 0 gp.preemptStop = false gp.paniconfault = false gp._defer = nil // should be true already but just in case. gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data. gp.writebuf = nil gp.waitreason = 0 gp.param = nil gp.labels = nil gp.timer = nil if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 { // Flush assist credit to the global pool. This gives // better information to pacing if the application is // rapidly creating an exiting goroutines. assistWorkPerByte := gcController.assistWorkPerByte.Load() scanCredit := int64(assistWorkPerByte * float64(gp.gcAssistBytes)) atomic.Xaddint64(&gcController.bgScanCredit, scanCredit) gp.gcAssistBytes = 0 } dropg() if GOARCH == "wasm" { // no threads yet on wasm gfput(_p_, gp) schedule() // never returns } if _g_.m.lockedInt != 0 { print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n") throw("internal lockOSThread error") } gfput(_p_, gp) if locked { // The goroutine may have locked this thread because // it put it in an unusual kernel state. Kill it // rather than returning it to the thread pool. // Return to mstart, which will release the P and exit // the thread. if GOOS != "plan9" { // See golang.org/issue/22227. gogo(&_g_.m.g0.sched) } else { // Clear lockedExt on plan9 since we may end up re-using // this thread. _g_.m.lockedExt = 0 } } schedule() //再次调用schedule方法 } 4.2 G-M-P调度模型由多个线程同时抓取协程,容易出现锁冲突。通常使用本地队列(获取全局的协程,每次抓取一堆,顺序执行)解决。P即帮助M获取协程,从而减少并发冲突的几率。4.2.1本地队列实现的源码type p struct { //本地队列 id int32 status uint32 // one of pidle/prunning/... link puintptr schedtick uint32 // incremented on every scheduler call syscalltick uint32 // incremented on every system call sysmontick sysmontick // last tick observed by sysmon m muintptr // back-link to associated m (nil if idle)原始指针 mcache *mcache pcache pageCache raceprocctx uintptr deferpool []*_defer // pool of available defer structs (see panic.go) deferpoolbuf [32]*_defer // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen. goidcache uint64 goidcacheend uint64 // Queue of runnable goroutines. Accessed without lock.----队列,不需要加锁 runqhead uint32 //头部的序号 runqtail uint32 //尾部的序号 runq [256]guintptr // runnext, if non-nil, is a runnable G that was ready'd by // the current G and should be run next instead of what's in // runq if there's time remaining in the running G's time // slice. It will inherit the time left in the current time // slice. If a set of goroutines is locked in a // communicate-and-wait pattern, this schedules that set as a // unit and eliminates the (potentially large) scheduling // latency that otherwise arises from adding the ready'd // goroutines to the end of the run queue. // // Note that while other P's may atomically CAS this to zero, // only the owner P can CAS it to a valid G. runnext guintptr //指向下一个可用的协程 // Available G's (status == Gdead) gFree struct { gList n int32 } sudogcache []*sudog sudogbuf [128]*sudog // Cache of mspan objects from the heap. mspancache struct { // We need an explicit length here because this field is used // in allocation codepaths where write barriers are not allowed, // and eliminating the write barrier/keeping it eliminated from // slice updates is tricky, moreso than just managing the length // ourselves. len int buf [128]*mspan } tracebuf traceBufPtr // traceSweep indicates the sweep events should be traced. // This is used to defer the sweep start event until a span // has actually been swept. traceSweep bool // traceSwept and traceReclaimed track the number of bytes // swept and reclaimed by sweeping in the current sweep loop. traceSwept, traceReclaimed uintptr palloc persistentAlloc // per-P to avoid mutex _ uint32 // Alignment for atomic fields below // The when field of the first entry on the timer heap. // This is updated using atomic functions. // This is 0 if the timer heap is empty. timer0When uint64 // The earliest known nextwhen field of a timer with // timerModifiedEarlier status. Because the timer may have been // modified again, there need not be any timer with this value. // This is updated using atomic functions. // This is 0 if there are no timerModifiedEarlier timers. timerModifiedEarliest uint64 // Per-P GC state gcAssistTime int64 // Nanoseconds in assistAlloc gcFractionalMarkTime int64 // Nanoseconds in fractional mark worker (atomic) // gcMarkWorkerMode is the mode for the next mark worker to run in. // That is, this is used to communicate with the worker goroutine // selected for immediate execution by // gcController.findRunnableGCWorker. When scheduling other goroutines, // this field must be set to gcMarkWorkerNotWorker. gcMarkWorkerMode gcMarkWorkerMode // gcMarkWorkerStartTime is the nanotime() at which the most recent // mark worker started. gcMarkWorkerStartTime int64 // gcw is this P's GC work buffer cache. The work buffer is // filled by write barriers, drained by mutator assists, and // disposed on certain GC state transitions. gcw gcWork // wbBuf is this P's GC write barrier buffer. // // TODO: Consider caching this in the running G. wbBuf wbBuf runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point // statsSeq is a counter indicating whether this P is currently // writing any stats. Its value is even when not, odd when it is. statsSeq uint32 // Lock for timers. We normally access the timers while running // on this P, but the scheduler can also do it from a different P. timersLock mutex // Actions to take at some time. This is used to implement the // standard library's time package. // Must hold timersLock to access. timers []*timer // Number of timers in P's heap. // Modified using atomic instructions. numTimers uint32 // Number of timerDeleted timers in P's heap. // Modified using atomic instructions. deletedTimers uint32 // Race context used while executing timer functions. timerRaceCtx uintptr // scannableStackSizeDelta accumulates the amount of stack space held by // live goroutines (i.e. those eligible for stack scanning). // Flushed to gcController.scannableStackSize once scannableStackSizeSlack // or -scannableStackSizeSlack is reached. scannableStackSizeDelta int64 // preempt is set to indicate that this P should be enter the // scheduler ASAP (regardless of what G is running on it). preempt bool // Padding is no longer needed. False sharing is now not a worry because p is large enough // that its size class is an integer multiple of the cache line size (for any of our architectures). }4.2.2 GPM(GMP)模型关系图M指线程,G指协程,P即本地队列4.2.3 GPM的调用时机在schedule()函数中,有一个对gp的判断if gp == nil { gp, inheritTime = runqget(_g_.m.p.ptr()) // We can see gp != nil here even if the M is spinning, // if checkTimers added a local goroutine via goready. }其执行了runqget()函数,并将本地队列的指针作为参数传入。逻辑源码阅读// Get g from local runnable queue. // If inheritTime is true, gp should inherit the remaining time in the // current time slice. Otherwise, it should start a new time slice. // Executed only by the owner P. func runqget(_p_ *p) (gp *g, inheritTime bool) { // If there's a runnext, it's the next G to run. next := _p_.runnext //runnext刚好指向下一个可执行的协程 // If the runnext is non-0 and the CAS fails, it could only have been stolen by another P, // because other Ps can race to set runnext to 0, but only the current P can set it to non-0. // Hence, there's no need to retry this CAS if it falls. if next != 0 && _p_.runnext.cas(next, 0) { return next.ptr(), true //取出下一个协程的地址并返回 } for { h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers t := _p_.runqtail if t == h { return nil, false } gp := _p_.runq[h%uint32(len(_p_.runq))].ptr() if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume return gp, false } } } 在读取本地队列后,并没有发现可调用的协程时,gp仍然时nil,此时会执行if gp == nil { gp, inheritTime = findrunnable() // blocks until work is available }调用了findrunnable()方法// Finds a runnable goroutine to execute. // Tries to steal from other P's, get g from local or global queue, poll network. func findrunnable() (gp *g, inheritTime bool) { //...获取本地的队列 //本地无队列的时候 // global runq -在全局获取协程 if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(_p_, 0) //在全局的队列协程中拿一批协程 unlock(&sched.lock) if gp != nil { return gp, false } } // Poll network. // This netpoll is only an optimization before we resort to stealing. // We can safely skip it if there are no waiters or a thread is blocked // in netpoll already. If there is any kind of logical race with that // blocked thread (e.g. it has already returned from netpoll, but does // not set lastpoll yet), this thread will do blocking netpoll below // anyway. if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { if list := netpoll(0); !list.empty() { // non-blocking gp := list.pop() injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } } // Spinning Ms: steal work from other Ps. // // Limit the number of spinning Ms to half the number of busy Ps. // This is necessary to prevent excessive CPU consumption when // GOMAXPROCS>>1 but the program parallelism is low. procs := uint32(gomaxprocs) if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) { if !_g_.m.spinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } //无协程可获取 gp, inheritTime, tnow, w, newWork := stealWork(now) //任务窃取 now = tnow if gp != nil { // Successfully stole. return gp, inheritTime } if newWork { // There may be new timer or GC work; restart to // discover. goto top } if w != 0 && (pollUntil == 0 || w < pollUntil) { // Earlier timer to wait for. pollUntil = w } } //... } 4.2.4 总结golang通过逻辑队列的形式减少了锁的并发(抢夺全局队列中的锁)4.3 协程的并发4.3.1 协程的饥饿问题若正在执行的协程的运行时间特别长,那么本地消息队列中的其他协程会一直处于等待状态,即协程饥饿问题。解决方式:保存现场然后再执行别的业务。然后回到线程执行的schedule()函数,即回到开头。当然,全局队列饥饿问题的解决同样是切换执行的协程。schedule()对应模块源码分析if gp == nil { // Check the global runnable queue once in a while to ensure fairness. // Otherwise two goroutines can completely occupy the local runqueue // by constantly respawning each other. //执行队列循环到达61次时,在全局队列中获取一个协程 if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) gp = globrunqget(_g_.m.p.ptr(), 1) //只拿一个 unlock(&sched.lock) } }如何触发切换方法:1.主动挂起(runtime.gopark())即业务主动调用gopark()函数,在程序员使用一些函数的时候会出现。// Puts the current goroutine into a waiting state and calls unlockf on the // system stack. // // If unlockf returns false, the goroutine is resumed. // // unlockf must not access this G's stack, as it may be moved between // the call to gopark and the call to unlockf. // // Note that because unlockf is called after putting the G into a waiting // state, the G may have already been readied by the time unlockf is called // unless there is external synchronization preventing the G from being // readied. If unlockf returns false, it must guarantee that the G cannot be // externally readied. // // Reason explains why the goroutine has been parked. It is displayed in stack // traces and heap dumps. Reasons should be unique and descriptive. Do not // re-use reasons, add new ones. func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) { if reason != waitReasonSleep { checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy } //维护协程和线程的状态 mp := acquirem() gp := mp.curg status := readgstatus(gp) if status != _Grunning && status != _Gscanrunning { throw("gopark: bad g status") } mp.waitlock = lock mp.waitunlockf = unlockf gp.waitreason = reason mp.waittraceev = traceEv mp.waittraceskip = traceskip releasem(mp) // can't do anything that might move the G between Ms here. mcall(park_m) //mcall()调用的方法表示即将切换栈! //park_m即状态维护并且调用了schedule()函数 }注意:gopark无法在包外使用,但是可以用time.Sleep()函数(底层也是gopark())。 2.系统完成调用时即系统指令调用(系统底层),如检测网络等行为......调用的关系 exitsyscall()->Gosched()->mcall()->goschedguarded_m()->goschedImpl(gp)->schedule()4.3.2 抢占式调度如果协程耗时很长,并且永远不主动挂起也不做系统调用时,需要抢占式调用调节。发现morestack()函数经常被协程调用(在某一个方法内需要调用另一个方法的时候,系统会先调用morestack()方法),可在此设置。关于morestack()函数本意是检测协程栈是否有足够的空间。而在方法内调用调用方法时,编译器会强制插入morestack()。标记抢占系统监控器检测到协程运行时间超过10ms时将此g中的stackguard0字段设置为0xfffffade(栈抢占的标记常量)。抢占操作执行morestack()函数的时候判断协程是否已经被抢占,如果是,则直接回到schedule()函数。//morestack()是汇编方法,其中主要是处理栈空间不足的逻辑,最后调用runtime·newstack(SB) //此函数中除了生成新栈的逻辑外有一句检测语句 func newstack(){ //... preempt := stackguard0 == stackPreempt //检测stackguard0字段是否被标记为栈抢占 //... if preempt { if gp == thisg.m.g0 { throw("runtime: preempt g0") } if thisg.m.p == 0 && thisg.m.locks == 0 { throw("runtime: g is running but p is not") } if gp.preemptShrink { // We're at a synchronous safe point now, so // do the pending stack shrink. gp.preemptShrink = false shrinkstack(gp) } if gp.preemptStop { preemptPark(gp) // never returns } // Act like goroutine called runtime.Gosched. gopreempt_m(gp) // never return //实行抢占--最后再次回到schedule() } } 补充:基于信号的抢占式调度使用场景:协程内没有方法的调用,即不调用morestack()函数。例如:package main func do(){ i := 0 for{ i++ } } func main(){ go do() }因为操作系统中有许多基于信号的底层通信方式,而线程可用注册对应信号(SIGPIPE, SIGURG,SIGHUP)的处理函数,此调度则直接注册SIGURG(紧急)信号的处理函数。然后再GC工作的时候(此时很多工作都暂停,适合抢占)向目标线程 发送信号,触发调度。// doSigPreempt handles a preemption signal on gp. func doSigPreempt(gp *g, ctxt *sigctxt) { // Check if this G wants to be preempted and is safe to // preempt. if wantAsyncPreempt(gp) { if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok { // Adjust the PC and inject a call to asyncPreempt. ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc) } } // Acknowledge the preemption. atomic.Xadd(&gp.m.preemptGen, 1) atomic.Store(&gp.m.signalPending, 0) if GOOS == "darwin" || GOOS == "ios" { atomic.Xadd(&pendingPreemptSignals, -1) } } //asyncPreempt()汇编方法调用另一个信号抢占的方法,调用mcall,最后回到schedule()方法5 解决协程启动数量过多的问题除了优化业务逻辑和优化硬件设备之外还有两个方法如下:1 使用channel缓冲使用channel限制协程的运行数量func main(){ ch := make(chan struct{}, 3000) for i:= 0; i < math.MaxInt32; i++ { ch <- struct{}{} go func(i int, ch chan struct{}){ fmt.Println(i) time.Sleep(time.Second) <- ch }(i, ch) } }2 协程池(tunny)--不推荐将GMP模型中的全局锁覆盖的协程转变为自己的协程池;但是:Golang的线程相关已经是一种池化了,二次池化会增加复杂度,而GO语言希望的是协程即用即毁,这样有悖于初衷。
2022年07月27日
218 阅读
0 评论
0 点赞
2022-07-27
golang Map
1 HashMap的基本方法1.1 开放寻址法(插入值为例)首先,hashmap的底层是一个数组,每一个数组元素存储一个KV键值对。将KV录入hash函数生成一串数据,再将此数据对数组的长度进行取模运算,得到应当存入的数组的位置(下标),倘若目标地址已经被占用了(hash碰撞),则向后依次遍历直到找到空闲的位置再存值。读取时方法相似。1.2 拉链法(插入值为例)更改底层数组的存储方式,将底层的数组(hash槽)更改为储存KV键值对链表(hash桶)的指针(有些时候可以不用指针)。需要插入的时候,将链表下延即可。读取时,一样找到相关的位置,遍历下方的链表寻值。2 GO语言HashMap源码阅读在runtime包的map.go下。2.1 HashMap底层数据结构体//map的头部指针 type hmap struct{ count int //此表存的键值对的数量. flags uint8 B uint8 //桶个数的关于2的对数 有8个桶->B的值为3 noverflow uint16 hash0 uint32 //生成hash值的种子.用于hash算法 buckets unsafe.Pointer //使用拉链发储存KV键值对.值是2^B.指向bmap数组 oldbuckets unsafe.Pointer nevacuate uintptr extra *mapextra } //桶的数据结构 type bmap struct{ //tophash 存的是key对应的hash值的头一个字节(8位) tophash [bucketCnt]uint8 //bucketCnt 通过位运算得出值为8 //其后跟有bucketCnt个key和相应数量的value //这里不写相应的key和value的存储代码是为了适应任意值类型的KV. //不用空接口的原因:会多一层结构体,更加麻烦 //再之后还跟有overflow的指针(是否溢出8个键值对,是则指向新的桶) }注意:bmap中的tophash、keys、elems三个字段是分别组成三个数组的,只是具有下标对应的关系。2.2 Map的初始化2.2.1 使用make进行初始化观察编译过程使用终端命令go build -gcflags -S ./文件夹名/函数名.go发现最后调用的是tuntime.makmap的方法。makemap的源码:(位置:Go SDK 1.18/src/runtime/map.go)func makemap(t *maptype, hint int, h *hmap) *hmap { mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size) if overflow || mem > maxAlloc { hint = 0 } if h == nil { h = new(hmap) //首先创建了一个hmap } h.hash0 = fastrand() B := uint8(0) for overLoadFactor(hint, B) { //根据传入的提示算出桶的数量 B++ } h.B = B if h.B != 0 { var nextOverflow *bmap h.buckets, nextOverflow = makeBucketArray(t, h.B, nil) //创建桶数组 //同时创建出一些溢出桶备用 if nextOverflow != nil { h.extra = new(mapextra) //溢出桶的底层是mapextra h.extra.nextOverflow = nextOverflow //nextOverflow指向所有的溢出桶 } } return h }2.2.2 使用字面量进行初始化hash := map[string]int{ "1": 2, "3": 4, "5": 6, }先使用make创建空间,当元素小于25个的时候,底层直接赋值,若大于25个元素则底层将转化为循环赋值。(分别生成k数组和v数组,再使用for循环)2.3 Map的查询2.3.1 寻值的过程首先将需要查询的key值和hash0(哈希种子)录入hasher函数生成hasher二进制字符串,根据B的值取最后B位数字,表示的十进制数则为对应的桶号。此时tophash的值是此字符串的前八位。再根据tophash的值,在数组中遍历,再次验证key的正确性,倘若不是(hash碰撞),则再次向下搜寻相同tophash的值的位置,倘若还是没有,则判断overflow是否指向了溢出桶,以此往复。2.3.2 插入(更改)值的过程和寻找值的过程类似。2.4 Map的扩容机制当溢出桶的数量太多了,会导致性能严重下降,此时需要通过扩容优化。2.4.1 扩容节点的判断func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer { //... if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) { hashGrow(t, h) goto again // Growing the table invalidates everything, so try again } //... }即:如果map不再扩容当中,或者判断装载因子(最多平均一个hash槽6.5个key)超出范围,或者有太多的溢出桶,则实行扩容:hashGrow(t, h)2.4.2 扩容类型等量扩容可以通俗地理解为数据不多,但是溢出桶过多(hash碰撞,或者原本的数据大量被删除)翻倍扩容可以通俗地理解为数据过多。2.4.3 扩容操作1.前置操作1.创建一组新的桶2.将头部指针oldbuckets指向原有的桶数组3.buckets指针指向新的桶数组4.将map标记为扩容状态func hashGrow(t *maptype, h *hmap) { //计算B的新的溢出值 bigger := uint8(1) if !overLoadFactor(h.count+1, h.B) { bigger = 0 h.flags |= sameSizeGrow } //指针的替换更新 oldbuckets := h.buckets //创建新的桶数组 newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil) //状态的更新 flags := h.flags &^ (iterator | oldIterator) if h.flags&iterator != 0 { flags |= oldIterator } // commit the grow (atomic wrt gc) h.B += bigger //这里B的值变了,原本桶的数据会被分配到新的几个桶中 //(原本的B增加后会变成新的数字,则将原本的桶内数据分配到新的几个桶中) h.flags = flags h.oldbuckets = oldbuckets h.buckets = newbuckets h.nevacuate = 0 h.noverflow = 0 //更新extra 新桶的溢出桶的信息更新 if h.extra != nil && h.extra.overflow != nil { // Promote current overflow buckets to the old generation. if h.extra.oldoverflow != nil { throw("oldoverflow is not nil") } h.extra.oldoverflow = h.extra.overflow h.extra.overflow = nil } if nextOverflow != nil { if h.extra == nil { h.extra = new(mapextra) } h.extra.nextOverflow = nextOverflow } // the actual copying of the hash table data is done incrementally // by growWork() and evacuate(). }2.将所有数据从旧桶驱逐到新桶中Go采用渐进式驱逐:需要操作某一个旧桶的时候将数据迁移到新桶中注意:读取的时候不进行驱逐只判断需要读取新桶还是旧桶。func evacuate(t *maptype, h *hmap, oldbucket uintptr) { b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))) newbit := h.noldbuckets() if !evacuated(b) { // TODO: reuse overflow buckets instead of using new ones, if there // is no iterator using the old buckets. (If !oldIterator.) // xy contains the x and y (low and high) evacuation destinations. var xy [2]evacDst x := &xy[0] x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize))) x.k = add(unsafe.Pointer(x.b), dataOffset) x.e = add(x.k, bucketCnt*uintptr(t.keysize)) if !h.sameSizeGrow() { // Only calculate y pointers if we're growing bigger. // Otherwise GC can see bad pointers. y := &xy[1] y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize))) y.k = add(unsafe.Pointer(y.b), dataOffset) y.e = add(y.k, bucketCnt*uintptr(t.keysize)) } for ; b != nil; b = b.overflow(t) { k := add(unsafe.Pointer(b), dataOffset) e := add(k, bucketCnt*uintptr(t.keysize)) for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) { top := b.tophash[i] if isEmpty(top) { b.tophash[i] = evacuatedEmpty continue } if top < minTopHash { throw("bad map state") } k2 := k if t.indirectkey() { k2 = *((*unsafe.Pointer)(k2)) } var useY uint8 if !h.sameSizeGrow() { // Compute hash to make our evacuation decision (whether we need // to send this key/elem to bucket x or bucket y). hash := t.hasher(k2, uintptr(h.hash0)) if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) { // If key != key (NaNs), then the hash could be (and probably // will be) entirely different from the old hash. Moreover, // it isn't reproducible. Reproducibility is required in the // presence of iterators, as our evacuation decision must // match whatever decision the iterator made. // Fortunately, we have the freedom to send these keys either // way. Also, tophash is meaningless for these kinds of keys. // We let the low bit of tophash drive the evacuation decision. // We recompute a new random tophash for the next level so // these keys will get evenly distributed across all buckets // after multiple grows. useY = top & 1 top = tophash(hash) } else { if hash&newbit != 0 { useY = 1 } } } if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY { throw("bad evacuatedN") } b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY dst := &xy[useY] // evacuation destination if dst.i == bucketCnt { dst.b = h.newoverflow(t, dst.b) dst.i = 0 dst.k = add(unsafe.Pointer(dst.b), dataOffset) dst.e = add(dst.k, bucketCnt*uintptr(t.keysize)) } dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check if t.indirectkey() { *(*unsafe.Pointer)(dst.k) = k2 // copy pointer } else { typedmemmove(t.key, dst.k, k) // copy elem } if t.indirectelem() { *(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e) } else { typedmemmove(t.elem, dst.e, e) } dst.i++ // These updates might push these pointers past the end of the // key or elem arrays. That's ok, as we have the overflow pointer // at the end of the bucket to protect against pointing past the // end of the bucket. dst.k = add(dst.k, uintptr(t.keysize)) dst.e = add(dst.e, uintptr(t.elemsize)) } } // Unlink the overflow buckets & clear key/elem to help GC. if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 { b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)) // Preserve b.tophash because the evacuation // state is maintained there. ptr := add(b, dataOffset) n := uintptr(t.bucketsize) - dataOffset memclrHasPointers(ptr, n) } } if oldbucket == h.nevacuate { advanceEvacuationMark(h, t, newbit) } }3.回收oldbuckets在数据被驱逐完成后,将回收oldbuckets的内存。2.4.4 注意扩容可能并不是增加桶的数量而是整理桶内的数据。3 Map的并发问题在使用map的时候,经常处于高并发的环境中,若使用mutex包增加锁,则效率大打折扣,所以推荐使用sync.Map。3.1 sync.Map源码阅读type Map struct { mu Mutex // read contains the portion of the map's contents that are safe for // concurrent access (with or without mu held). // // The read field itself is always safe to load, but must only be stored with // mu held. // // Entries stored in read may be updated concurrently without mu, but updating // a previously-expunged entry requires that the entry be copied to the dirty // map and unexpunged with mu held. read atomic.Value // readOnly // dirty contains the portion of the map's contents that require mu to be // held. To ensure that the dirty map can be promoted to the read map quickly, // it also includes all of the non-expunged entries in the read map. // // Expunged entries are not stored in the dirty map. An expunged entry in the // clean map must be unexpunged and added to the dirty map before a new value // can be stored to it. // // If the dirty map is nil, the next write to the map will initialize it by // making a shallow copy of the clean map, omitting stale entries. dirty map[any]*entry // misses counts the number of loads since the read map was last updated that // needed to lock mu to determine whether the key was present. // // Once enough misses have occurred to cover the cost of copying the dirty // map, the dirty map will be promoted to the read map (in the unamended // state) and the next store to the map will make a new dirty copy. misses int } // readOnly is an immutable struct stored atomically in the Map.read field. type readOnly struct { m map[any]*entry amended bool // true if the dirty map contains some key not in m. } ... type entry struct { // p points to the interface{} value stored for the entry. // // If p == nil, the entry has been deleted, and either m.dirty == nil or // m.dirty[key] is e. // // If p == expunged, the entry has been deleted, m.dirty != nil, and the entry // is missing from m.dirty. // // Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty // != nil, in m.dirty[key]. // // An entry can be deleted by atomic replacement with nil: when m.dirty is // next created, it will atomically replace nil with expunged and leave // m.dirty[key] unset. // // An entry's associated value can be updated by atomic replacement, provided // p != expunged. If p == expunged, an entry's associated value can be updated // only after first setting m.dirty[key] = e so that lookups using the dirty // map find the entry. p unsafe.Pointer // *interface{} } 发现其包含四个字段(使用了两个map),简记为:type Map struct{ mu //原子锁 read //key(interface{})和value(unsafe.Pointer)都可以是任意值的map dirty //和read差不多也是万能指向的map misses //未命中 }3.2 运行逻辑解释首先 read结构体用于高并发的读写,dirty结构体用于储存追加的字段。读写默认走read结构体。如果amended=true则取dirty结构体中的信息,此时misses+=1。追加先进入read结构体,没有找到则返回到mu锁上dirty,再追加,然后read结构体中amended改为true,表示read结构体内的信息不完整。当misses的值等于dirty的长度时,将dirty的内容给read,下方dirty变为nil,ammended和miss初始为false和0,若后续仍要追加信息,则再次重建dirty结构体。3.3 更难的删除3.3.1 正常删除在read结构体中可以找到,则直接找到相关的key的值,再将指向value的Pointer置为空。3.3.2 追加后删除指dirty内的信息还没有给予read的时候。(上下信息不一致的时间段)首先上锁后再dirty中找到相关的key,然后将指向value的Pointer置为空,在提升(diety变为nil)的时候,将这个key对应的Pointer变为expunged(删除)。则在重建dirty的时候不再重建Pointer为expunged的键值对,同时再次删除或者查询的时候若Pointer的值为此标识,认为其不存在。3.4 总结sync.Map使用了两个map,实质上是分离了扩容问题将不会引发扩容的操作(查改)归于read(map)将可能引发扩容的操作(插入)归于dirty(map)
2022年07月27日
255 阅读
1 评论
0 点赞
1
2