标签搜索

golang 锁

BeforeIeave
2022-07-27 / 0 评论 / 236 阅读 / 正在检测是否收录...

1 atomic 原子操作

是一种硬件层面(锁了内存地址)的加锁机制,保证操作一个表变量的时候,其他的协程或者线程无法访问,但是只能用于简单变量的简单操作

2 sema锁

2.1 源码阅读

也被称作信号量/信号锁,其核心是一个uint32的值,指代同时可并发的数量。而每一个sema锁都对应一个SemaRoot结构体。其拥有一个平衡二叉树用于排队

// A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem).
// Each of those sudog may in turn point (through s.waitlink) to a list
// of other sudogs waiting on the same address.
// The operations on the inner lists of sudogs with the same address
// are all O(1). The scanning of the top-level semaRoot list is O(log n),
// where n is the number of distinct addresses with goroutines blocked
// on them that hash to the given semaRoot.
// See golang.org/issue/17953 for a program that worked badly
// before we introduced the second level of list, and test/locklinear.go
// for a test that exercises this.
type semaRoot struct {
    lock  mutex 
    treap *sudog // root of balanced tree of unique waiters.
    nwait uint32 // Number of waiters. Read w/o the lock.
}
type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.

    g *g

    next *sudog
    prev *sudog
    elem unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool

    // success indicates whether communication over channel c
    // succeeded. It is true if the goroutine was awoken because a
    // value was delivered over channel c, and false if awoken
    // because c was closed.
    success bool

    parent   *sudog // semaRoot binary tree
    waitlink *sudog // g.waiting list or semaRoot
    waittail *sudog // semaRoot
    c        *hchan // channel
}

大致可以简化为:

sema.png

2.2 逻辑源码解读

sema锁获取到的uint32的值>0时:每次获取锁,次值减一,释放锁,此值加一
uint32的值==0时:此时sema锁退化成一个休眠队列。而获取锁时,协程会调用gopark()休眠,并且进入堆数组等待释放锁时,从堆数组中取出一个协程,并将其唤醒

3 常用锁

3.1 互斥锁 sync.Mutex

3.1.1 结构体源码解析

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
    state int32
    sema  uint32
}

Mutex.png

3.1.2 正常加锁运行逻辑

首先直接尝试CAS(compareAndSwap)直接加锁若失败则多次自旋尝试再失败则进入sema队列休眠

首先某一协程执行代码并加锁

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {    //state的状态置为1
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()            //开始自旋
}

加锁失败则自旋尝试

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        // Don't spin in starvation mode, ownership is handed off to waiters
        // so we won't be able to acquire the mutex anyway.
        //判断 锁是否还在,是否处于饥饿状态,是否可以继续自旋
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // Active spinning makes sense.
            // Try to set mutexWoken flag to inform Unlock
            // to not wake other blocked goroutines.
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()        //不是刚刚被唤醒则执行此函数--汇编让其执行空语句
            iter++
            old = m.state
            continue
        }
        new := old
        // Don't try to acquire starving mutex, new arriving goroutines must queue.
        if old&mutexStarving == 0 {        //如果锁已经解开并且没有处于饥饿状态
            new |= mutexLocked            //将状态更新---加锁
        }
        if old&(mutexLocked|mutexStarving) != 0 {    //锁是否仍在
            new += 1 << mutexWaiterShift            //状态中的等待数量+1
        }
        // The current goroutine switches mutex to starvation mode.
        // But if the mutex is currently unlocked, don't do the switch.
        // Unlock expects that starving mutex has waiters, which will not
        // be true in this case.
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {        //实现更新锁的状态
            if old&(mutexLocked|mutexStarving) == 0 {            //再次检验是否成功上锁
                break // locked the mutex with CAS                //自旋加锁成功
            }
            // If we were already waiting before, queue at the front of the queue.
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            //获取sema锁--值为0则协程休眠
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)        
            //若协程休眠,则以下代码无法执行
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

3.1.4正常解锁运行逻辑

首先尝试CAS直接解锁(将锁的state置为0),若发现有协程在sema中休眠,则唤醒(runtime_Semrelease())一个协程

直接解锁

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }

    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)        //状态由1变为0,解锁
    if new != 0 {        若是0则state中所有的字段都是0,也没有等待的协程
        // Outlined slow path to allow inlining the fast path.
        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
        m.unlockSlow(new)    //有等待的或者其他状态
    }
}

state字段不是0时--有其他状态

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            // If there are no waiters or a goroutine has already
            // been woken or grabbed the lock, no need to wake anyone.
            // In starvation mode ownership is directly handed off from unlocking
            // goroutine to the next waiter. We are not part of this chain,
            // since we did not observe mutexStarving when we unlocked the mutex above.
            // So get off the way.
            //判断是否有多余的休眠协程
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return    //不是应为其他协程则直接退出--因为可以直接解锁
            }
            // Grab the right to wake someone.
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)        //有等待的协程则释放一个
                return
            }
            old = m.state
        }
    } else {
        // Starving mode: handoff mutex ownership to the next waiter, and yield
        // our time slice so that the next waiter can start to run immediately.
        // Note: mutexLocked is not set, the waiter will set it after wakeup.
        // But mutex is still considered locked if mutexStarving is set,
        // so new coming goroutines won't acquire it.
        runtime_Semrelease(&m.sema, true, 1)
    }
}

被释放的协程继续执行Lock的相关代码

//协程休眠...已经结束--被唤醒
//是否需要进入饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
//回到for的开头再次判断
}

3.1.5 锁饥饿问题

再高并发的地区,某一协程一直无法获取到锁,触发饥饿问题。约定为:当前协程等待锁的时间超过1msstate的相关字段(starving)置为1,进入饥饿模式
饥饿模式中不再自旋新来的协程直接进入sema中休眠,并且被唤醒的协程直接获取锁直到没有协程在队列中等待的时候才回到正常模式

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        //饥饿则直接跳过此段
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        new := old
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        //如果被锁且处于饥饿模式则直接移入等待队列
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {        //更新状态
            if old&(mutexLocked|mutexStarving) == 0 {
                break
            }
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            //首先休眠被唤醒后直接判断是否要进入饥饿状态
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            //是则将函数开始声明的变量starving置为true
            old = m.state
            //唤醒后处于饥饿状态
            if old&mutexStarving != 0 {
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)        //直接改变状态
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

3.1.6 建议

1.需要加锁的时候,尽量减少锁的时间,并且只在必要的部分加锁
2.建议使用defer语句确保锁的释放

3.2 读写锁 sunc.RWMutex

满足多个协程只读(并发读取,单独修改)。

3.2.1 读写锁的实现

type RWMutex struct {
    w           Mutex  // held if there are pending writers//写协程持有的互斥锁
    writerSem   uint32 // semaphore for writers to wait for completing readers//sema队列
    readerSem   uint32 // semaphore for readers to wait for completing writers//sema队列
    readerCount int32  // number of pending readers//正:读取协程的数量, 负:无法读取,写锁
    readerWait  int32  // number of departing readers//写锁生效前需要几个读锁释放
}

3.2.2加写锁

逻辑

1.先加mutex互斥锁,若已经被加锁则阻塞等待;
2.将readerCount减去常量_rwmutexMaxReaders_(变为负值),阻塞读锁获取;
3.计算需要等待多少读协程释放;
4.如果需要等待读协程释放,则进入writerSem队列,直到readerWait=0。

源码

func (rw *RWMutex) Lock() {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    // First, resolve competition with other writers.
    rw.w.Lock()//首先加锁互斥锁--有加写锁的资格
    // Announce to readers there is a pending writer.
    //readerCount变为负值,但是r表示等待的数量
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // Wait for active readers.
    //之前有协程则进入等待队列
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
    //原本等待的数量为0,则直接执行
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
        race.Acquire(unsafe.Pointer(&rw.writerSem))
    }
}

3.2.3解写锁

逻辑

1.将readerCount变为正值(加常量),允许读锁获取;
2.释放等待队列中的读协程。

源码

func (rw *RWMutex) Unlock() {
    if race.Enabled {
        _ = rw.w.state
        race.Release(unsafe.Pointer(&rw.readerSem))
        race.Disable()
    }
    // Announce to readers there is no active writer.
    //变为正值
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        race.Enable()
        throw("sync: Unlock of unlocked RWMutex")
    }
    // Unblock blocked readers, if any.
    //如果r(等待协程)大于0则释放一个读协程
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // Allow other writers to proceed.
    rw.w.Unlock()            //将互斥锁解锁
    if race.Enabled {
        race.Enable()
    }
}

3.2.4 加读锁

逻辑

1.将readerCount+1;
2.如果readerCount是正数,则加锁成功;
3.如果readerCount是负数,则进入readerSem队列等待。

源码

func (rw *RWMutex) RLock() {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // A writer is pending, wait for it.
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
    }
}

3.2.5 解读锁

逻辑

1.readerCount-1;
2.此时readerCount>0时,readerWait-1,代表解锁成功。
3.若readerCount<0时,说明有写锁排队。如果自己时最后一个读协程则唤醒写协程

源码

func (rw *RWMutex) RUnlock() {
    if race.Enabled {
        _ = rw.w.state
        race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
        race.Disable()
    }
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        // Outlined slow-path to allow the fast-path to be inlined
        rw.rUnlockSlow(r)
    }
    if race.Enabled {
        race.Enable()
    }
}

func (rw *RWMutex) rUnlockSlow(r int32) {
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
        race.Enable()
        throw("sync: RUnlock of unlocked RWMutex")
    }
    // A writer is pending.
    //减为0时,唤醒写协程
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
        // The last reader unblocks the writer.
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

3.3 等待组 sync.WaitGroup

type WaitGroup struct {
    noCopy noCopy            //指示编译器,禁止拷贝
    // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
    state1 uint64
    // compilers only guarantee that 64-bit fields are 32-bit aligned
    state2 uint32
}
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
    if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
        // state1 is 64-bit aligned: nothing to do.
        return &wg.state1, &wg.state2
    } else {
        // state1 is 32-bit aligned but not 64-bit aligned: this means that
        // (&state1)+4 is 64-bit aligned.
        state := (*[3]uint32)(unsafe.Pointer(&wg.state1))
        return (*uint64)(unsafe.Pointer(&state[1])), &state[0]
    }
}

func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state()
    if race.Enabled {
        _ = *statep // trigger nil deref early
        if delta < 0 {
            // Synchronize decrements with Wait.
            race.ReleaseMerge(unsafe.Pointer(wg))
        }
        race.Disable()
        defer race.Enable()
    }
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32)
    w := uint32(state)
    if race.Enabled && delta > 0 && v == int32(delta) {

        race.Read(unsafe.Pointer(semap))
    }
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    if v > 0 || w == 0 {
        return
    }

    if *statep != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // Reset waiters count to 0.
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()
    if race.Enabled {
        _ = *statep // trigger nil deref early
        race.Disable()
    }
    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        w := uint32(state)
        
        if v == 0 {
            // Counter is 0, no need to wait.
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
        // Increment waiters count.
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            if race.Enabled && w == 0 {
                // Wait must be synchronized with the first Add.
                // Need to model this is as a write to race with the read in Add.
                // As a consequence, can do the write only for the first waiter,
                // otherwise concurrent Waits will race with each other.
                race.Write(unsafe.Pointer(semap))
            }
            runtime_Semacquire(semap)
            if *statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
    }
}

3.4 锁的初始化 sync.Once

目的:让相关函数的代码只运行一次
思路:争抢mutex互斥锁,失败则进入sema休眠,成功的更改计数值,再次释放。

type Once struct {
    done uint32        //执行后会变成1
    m    Mutex
}

func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 0 {
        // Outlined slow-path to allow inlining of the fast-path.
        o.doSlow(f)
    }
    //执行过直接退出了
}

func (o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {            //二次判断是不是真的没有运行过
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}

3.5 锁异常的问题

3.5.1 不要拷贝锁

永远不要拷贝锁,否则容易造成死锁情况!---拷贝后的锁状态很难判断
当然,带锁的结构体也不要拷贝。
使用 go vet main.go 可以检测锁拷贝。

3.5.2 RACE竞争检测

很多代码都有语句** if race.Enabled(){...}**,目的是发现隐含的数据竞争问题
终端输入 **go build -race main.go****./main.exe**。若出现warning 则表示出现数据竞争。

3.5.1 go-deadlock 检测

github上搜索go-deadlock第三方库,替代mutex包(用法一样)。

1

评论

博主关闭了所有页面的评论