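1 atomic: atomic operations
An atomic operation is a hardware-level locking mechanism (the CPU locks the memory address for the duration of the instruction). It guarantees that while one goroutine or thread is operating on a variable, no other goroutine or thread can access it. It only supports simple operations on simple variables, such as adding to or swapping an integer.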
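As a minimal sketch of the idea, the following program increments a shared counter from many goroutines using sync/atomic. The function name countAtomically is made up for this illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countAtomically starts n goroutines that each add 1 to a shared counter.
// atomic.AddInt64 makes every increment indivisible, so no update is lost.
func countAtomically(n int) int64 {
	var counter int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&counter, 1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&counter)
}

func main() {
	fmt.Println(countAtomically(1000))
}
```

Replacing atomic.AddInt64 with a plain counter++ would make the result unpredictable, because the read-modify-write would no longer be a single step.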
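2 sema lock
2.1 Reading the source
The sema lock is also called a semaphore (or signal lock). Its core is a uint32 value that represents how many holders may proceed concurrently. Each sema address is associated with a semaRoot struct, which holds a balanced binary tree (a treap) used to queue waiters.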
// A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem).
// Each of those sudog may in turn point (through s.waitlink) to a list
// of other sudogs waiting on the same address.
// The operations on the inner lists of sudogs with the same address
// are all O(1). The scanning of the top-level semaRoot list is O(log n),
// where n is the number of distinct addresses with goroutines blocked
// on them that hash to the given semaRoot.
// See golang.org/issue/17953 for a program that worked badly
// before we introduced the second level of list, and test/locklinear.go
// for a test that exercises this.
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait uint32 // Number of waiters. Read w/o the lock.
}
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
// success indicates whether communication over channel c
// succeeded. It is true if the goroutine was awoken because a
// value was delivered over channel c, and false if awoken
// because c was closed.
success bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
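Roughly, this simplifies to: a semaRoot is the root of a treap of sudog nodes, and each sudog wraps one waiting goroutine (its g field).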
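2.2 How it works
When the sema's uint32 value is greater than 0: each acquire decrements the value by one, and each release increments it by one.
When the uint32 value equals 0: the sema degenerates into a sleep queue. On acquire, the goroutine calls gopark() to sleep and is placed in the semaRoot's wait queue (the treap). On release, one goroutine is taken from that queue and woken up.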
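The counter behaviour described above can be modelled in user space. This is only a sketch of the semantics, not the runtime's implementation: the Sema type and the maxConcurrent helper are hypothetical, and sync.Cond stands in for gopark() and the treap.

```go
package main

import (
	"fmt"
	"sync"
)

// Sema is a hypothetical counting semaphore used only for illustration.
type Sema struct {
	mu    sync.Mutex
	cond  *sync.Cond
	value uint32 // how many more holders may proceed
}

func NewSema(n uint32) *Sema {
	s := &Sema{value: n}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// Acquire decrements the value, sleeping while it is 0.
func (s *Sema) Acquire() {
	s.mu.Lock()
	for s.value == 0 {
		s.cond.Wait() // stands in for gopark() plus queueing
	}
	s.value--
	s.mu.Unlock()
}

// Release increments the value and wakes one sleeper, if any.
func (s *Sema) Release() {
	s.mu.Lock()
	s.value++
	s.mu.Unlock()
	s.cond.Signal()
}

// maxConcurrent pushes workers goroutines through a semaphore of size
// limit and reports the highest number that held it at the same time.
func maxConcurrent(limit uint32, workers int) int {
	s := NewSema(limit)
	var mu sync.Mutex
	var wg sync.WaitGroup
	active, peak := 0, 0
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Acquire()
			mu.Lock()
			active++
			if active > peak {
				peak = active
			}
			mu.Unlock()

			mu.Lock()
			active--
			mu.Unlock()
			s.Release()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println(maxConcurrent(2, 20) <= 2)
}
```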
3 Common locks
3.1 Mutex: sync.Mutex
3.1.1 Struct definition
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32
sema uint32
}
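The state field packs several pieces of information into one int32. The sketch below decodes it using the bit constants as they are defined in sync/mutex.go for the Go version quoted here (an assumption, since the layout may differ in other releases). The mutexState type and decodeState function are invented for illustration.

```go
package main

import "fmt"

// Bit layout assumed to match sync/mutex.go in the quoted Go version.
const (
	mutexLocked      = 1 << iota // bit 0: the mutex is locked
	mutexWoken                   // bit 1: a waiter has been woken
	mutexStarving                // bit 2: starvation mode
	mutexWaiterShift = iota      // the remaining bits count waiters
)

// mutexState is a hypothetical decoded view of Mutex.state.
type mutexState struct {
	Locked   bool
	Woken    bool
	Starving bool
	Waiters  int32
}

// decodeState splits a raw state value into its flags and waiter count.
func decodeState(state int32) mutexState {
	return mutexState{
		Locked:   state&mutexLocked != 0,
		Woken:    state&mutexWoken != 0,
		Starving: state&mutexStarving != 0,
		Waiters:  state >> mutexWaiterShift,
	}
}

func main() {
	// Locked, starving, two waiters: 2<<3 | 4 | 1 = 21.
	fmt.Printf("%+v\n", decodeState(2<<mutexWaiterShift|mutexStarving|mutexLocked))
}
```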
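3.1.2 Normal lock path
Lock first tries a single CAS (compare-and-swap) to take the lock. If that fails, it spins and retries several times; if it still fails, the goroutine goes to sleep in the sema queue.
First, a goroutine runs the code and takes the lock: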
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { // set state to 1 (locked)
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow() // spin first, then queue if necessary
}
If locking fails, spin and retry:
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
// Check that the mutex is still locked, is not in starvation mode, and that spinning is still allowed.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin() // busy-wait briefly; implemented in assembly as a run of no-op (pause) instructions
iter++
old = m.state
continue
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
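if old&mutexStarving == 0 { // not in starvation mode
new |= mutexLocked // try to set the locked bit
}
if old&(mutexLocked|mutexStarving) != 0 { // still locked or starving, so this goroutine must wait
new += 1 << mutexWaiterShift // increment the waiter count stored in state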
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) { // publish the new state
if old&(mutexLocked|mutexStarving) == 0 { // old state was unlocked and not starving, so this CAS took the lock
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// Acquire the sema; if its value is 0, the goroutine sleeps here.
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// The code below runs only after the goroutine has been woken up.
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
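3.1.4 Normal unlock path
Unlock first clears the locked bit with a single atomic add (if nothing else is recorded in state, it becomes 0). If it finds goroutines sleeping in the sema queue, it wakes one of them with runtime_Semrelease().
Unlock directly: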
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked) // clear the locked bit (1 becomes 0 when uncontended)
if new != 0 { // new == 0 would mean every field of state is 0 and no goroutine is waiting
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new) // there are waiters or other state bits set
}
}
When state is not 0 (waiters or other state bits are present):
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
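// Check whether there is a sleeping goroutine that still needs to be woken.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return // no waiters, or another goroutine is already handling it: nothing to wake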
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1) // wake one waiting goroutine
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
runtime_Semrelease(&m.sema, true, 1)
}
}
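The woken goroutine then continues with the rest of Lock (the tail of lockSlow):
// The goroutine has finished sleeping: it has been woken up.
// Decide whether the mutex needs to enter starvation mode.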
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0 // back to the top of the for loop to check again
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
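3.1.5 Lock starvation
Under high concurrency, a goroutine may repeatedly fail to acquire the lock, which causes starvation. The rule is: once a goroutine has waited for the lock for more than 1ms (starvationThresholdNs), the mutexStarving bit in state is set to 1 and the mutex enters starvation mode.
In starvation mode there is no more spinning. Newly arriving goroutines go straight to sleep in the sema queue, and the woken goroutine is handed the lock directly. The mutex returns to normal mode when the woken goroutine is the last waiter in the queue, or when it waited for less than 1ms.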
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// In starvation mode this block is skipped entirely.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}
// If the mutex is locked or in starvation mode, join the wait queue.
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) { // publish the new state
if old&(mutexLocked|mutexStarving) == 0 {
break
}
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
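// After sleeping and being woken, first decide whether starvation mode is needed.
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
// If so, the starving variable declared at the top of the function becomes true.
old = m.state
// The mutex is in starvation mode after wake-up.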
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta) // apply the state change directly
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
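3.1.6 Recommendations
1. When a lock is needed, hold it for as short a time as possible and lock only the part that requires it.
2. Use a defer statement to make sure the lock is released.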
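Both recommendations can be seen in one small sketch. The wordStore type and the addConcurrently helper are hypothetical: the string clean-up happens outside the critical section, and defer guarantees the unlock.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// wordStore is a hypothetical type used only for this illustration.
type wordStore struct {
	mu    sync.Mutex
	words []string
}

// Add normalizes the word before locking and holds the mutex only
// around the append to the shared slice.
func (s *wordStore) Add(w string) {
	w = strings.ToLower(strings.TrimSpace(w)) // w is local, no lock needed
	s.mu.Lock()
	defer s.mu.Unlock() // runs even if the code below panics
	s.words = append(s.words, w)
}

// Len returns the number of stored words.
func (s *wordStore) Len() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.words)
}

// addConcurrently adds n words from n goroutines and returns the final count.
func addConcurrently(n int) int {
	s := &wordStore{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.Add(fmt.Sprintf("  Word%d ", i))
		}(i)
	}
	wg.Wait()
	return s.Len()
}

func main() {
	fmt.Println(addConcurrently(100))
}
```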
3.2 Read-write lock: sync.RWMutex
3.2.1 Implementation of the read-write lock
type RWMutex struct {
w Mutex // held if there are pending writers; the mutex that writers compete for
writerSem uint32 // semaphore for writers to wait for completing readers (sema queue)
readerSem uint32 // semaphore for readers to wait for completing writers (sema queue)
readerCount int32 // number of pending readers; positive: number of readers, negative: a writer is pending and readers must wait
readerWait int32 // number of departing readers; how many read locks must be released before the write lock takes effect
}
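3.2.2 Acquiring the write lock
Logic
1. Lock the mutex w first; if it is already held, block and wait.
2. Subtract the constant rwmutexMaxReaders from readerCount (making it negative), which blocks new read-lock acquisitions.
3. Compute how many reader goroutines must release before the writer can proceed.
4. If there are readers to wait for, enter the writerSem queue until readerWait reaches 0.
Source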
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// First, resolve competition with other writers.
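rw.w.Lock() // lock the mutex first to win the right to take the write lock
// Announce to readers there is a pending writer.
// readerCount becomes negative, while r holds the number of readers that were active.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
// If there were active readers, enter the wait queue.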
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
// If there were no active readers, continue immediately.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
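3.2.3 Releasing the write lock
Logic
1. Make readerCount non-negative again (add the constant back), which allows read locks to be acquired.
2. Wake the reader goroutines in the wait queue.
Source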
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// Announce to readers there is no active writer.
// readerCount becomes non-negative again.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
// r is the number of blocked readers; wake every one of them.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock() // unlock the mutex
if race.Enabled {
race.Enable()
}
}
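3.2.4 Acquiring the read lock
Logic
1. Increment readerCount by 1.
2. If readerCount is positive, the read lock is acquired.
3. If readerCount is negative, enter the readerSem queue and wait.
Source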
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
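3.2.5 Releasing the read lock
Logic
1. Decrement readerCount by 1.
2. If readerCount is now greater than or equal to 0, no writer is pending and the unlock is complete.
3. If readerCount is now less than 0, a writer is queued: decrement readerWait by 1, and if this goroutine is the last reader, wake the writer.
Source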
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
// When readerWait drops to 0, wake the writer.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
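From the caller's side, the four operations above are used as follows. This is a usage sketch only; the settings type and the readersSeeing helper are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// settings is a hypothetical read-mostly map guarded by an RWMutex.
type settings struct {
	mu   sync.RWMutex
	data map[string]string
}

// Get takes the read lock, so many readers can run at the same time.
func (s *settings) Get(key string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

// Set takes the write lock, which excludes readers and other writers.
func (s *settings) Set(key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
}

// readersSeeing writes one value, then starts n concurrent readers and
// counts how many of them observe that value.
func readersSeeing(n int) int {
	s := &settings{data: make(map[string]string)}
	s.Set("mode", "fast")

	var wg sync.WaitGroup
	var mu sync.Mutex
	seen := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if v, ok := s.Get("mode"); ok && v == "fast" {
				mu.Lock()
				seen++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return seen
}

func main() {
	fmt.Println(readersSeeing(10))
}
```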
3.3 WaitGroup: sync.WaitGroup
type WaitGroup struct {
noCopy noCopy // marker that lets go vet report copies of the struct
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
state1 uint64
// compilers only guarantee that 64-bit fields are 32-bit aligned
state2 uint32
}
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
// state1 is 64-bit aligned: nothing to do.
return &wg.state1, &wg.state2
} else {
// state1 is 32-bit aligned but not 64-bit aligned: this means that
// (&state1)+4 is 64-bit aligned.
state := (*[3]uint32)(unsafe.Pointer(&wg.state1))
return (*uint64)(unsafe.Pointer(&state[1])), &state[0]
}
}
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
if delta < 0 {
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {
race.Read(unsafe.Pointer(semap))
}
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
race.Disable()
}
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// Counter is 0, no need to wait.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// Increment waiters count.
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(semap))
}
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}
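A short usage sketch of Add, Done and Wait follows. The sumSquares function is made up for illustration; each goroutine writes only its own slice element, so no extra lock is needed.

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares computes 1*1 + 2*2 + ... + n*n, one goroutine per term.
func sumSquares(n int) int {
	results := make([]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1) // raise the counter before starting the goroutine
		go func(i int) {
			defer wg.Done() // Done is Add(-1)
			results[i] = (i + 1) * (i + 1)
		}(i)
	}
	wg.Wait() // sleeps on the sema until the counter reaches 0

	total := 0
	for _, r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(sumSquares(4))
}
```

Calling Add before the go statement, rather than inside the goroutine, avoids the "Add called concurrently with Wait" misuse that the source panics on.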
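3.4 One-time initialization: sync.Once
Purpose: make the given function run exactly once.
Approach: goroutines compete for the mutex; the losers sleep in the sema queue, while the winner runs the function, updates the done flag, and then releases the mutex.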
type Once struct {
done uint32 // becomes 1 after f has run
m Mutex
}
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
// Outlined slow-path to allow inlining of the fast-path.
o.doSlow(f)
}
// Already executed: return immediately.
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 { // check again under the lock that f really has not run
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
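The effect can be checked with a small sketch in which many goroutines race to run the same initializer. The function name initRuns is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// initRuns has n goroutines call once.Do with the same function and
// returns how many times that function actually ran.
func initRuns(n int) int {
	var once sync.Once
	var wg sync.WaitGroup
	runs := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			once.Do(func() {
				runs++ // safe: Do lets only one caller execute this
			})
		}()
	}
	wg.Wait()
	return runs
}

func main() {
	fmt.Println(initRuns(50))
}
```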
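3.5 Lock-related pitfalls
3.5.1 Do not copy locks
Never copy a lock, otherwise deadlocks become likely: the state of the copied lock is hard to reason about.
Likewise, do not copy a struct that contains a lock.
Running go vet main.go detects lock copies.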
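One way to avoid copies is to pass structs that contain a lock by pointer. In this sketch the counter type and the bump function are hypothetical; declaring bump with a value parameter instead would copy the mutex, which is the pattern go vet's copylocks check reports.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical struct that embeds a lock next to its data.
type counter struct {
	mu sync.Mutex
	n  int
}

// Inc uses a pointer receiver so that every call locks the same mutex.
func (c *counter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
}

// bump takes *counter so caller and callee share one mutex and one n.
// A signature of bump(c counter, times int) would copy both.
func bump(c *counter, times int) {
	for i := 0; i < times; i++ {
		c.Inc()
	}
}

func main() {
	c := &counter{}
	bump(c, 3)
	fmt.Println(c.n)
}
```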
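3.5.2 Race detection
Much of the code above contains statements of the form if race.Enabled { ... }, whose purpose is to uncover hidden data races.
In a terminal, run go build -race main.go and then ./main.exe. If a "WARNING: DATA RACE" report appears, a data race has occurred.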
3.5.3 Deadlock detection with go-deadlock
Search GitHub for the third-party go-deadlock library; it replaces the sync mutex types and is used the same way.