目录引言Mutex结构饥饿模式和正常模式正常模式饥饿模式状态的切换加锁和解锁加锁自旋计算锁的新状态更新锁状态解锁可能遇到的问题锁拷贝panic导致没有unlock引言 golang的
golang的并发编程令人着迷,使用轻量的协程、基于CSP的channel、简单的Go func()就可以开始并发编程,在并发编程中,往往离不开锁的概念。
本文介绍了常用的同步原语 sync.Mutex
,同时从源码剖析它的结构与实现原理,最后简单介绍了mutex在日常使用中可能遇到的问题,希望大家读有所获。
Mutex运行时数据结构位于sync/mutex.go
包
type Mutex struct {
state int32
sema uint32
}
其中state
表示当前互斥锁的状态,sema
表示 控制锁状态的信号量.
互斥锁的状态定义在常量中:
const (
mutexLocked = 1 << iota // 1 ,处于锁定状态; 2^0
mutexWoken // 2 ;从正常模式被从唤醒; 2^1
mutexStarving // 4 ;处于饥饿状态; 2^2
mutexWaiterShift = iota // 3 ;获得互斥锁上等待的Goroutine个数需要左移的位数: 1 << mutexWaiterShift
starvationThresholdNs = 1e6 // 锁进入饥饿状态的等待时间
)
0即其他状态。
sema
是一个组合,低三位分别表示锁的三种状态,高29位表示正在等待互斥锁释放的gorountine个数,和Java表示线程池状态那部分有点类似
一个mutex对象仅占用8个字节,让人不禁感叹其设计的巧妙
在正常模式下,等待的协程会按照先进先出的顺序得到锁 在正常模式下,刚被唤醒的goroutine与新创建的goroutine竞争时,大概率无法获得锁。
为了避免正常模式下,goroutine被“饿死”的情况,go在1.19版本引入了饥饿模式,保证了Mutex的公平性
在饥饿模式中,互斥锁会直接交给等待队列最前面的goroutine。新的goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。
在正常模式下,一旦Goroutine超过1ms没有获取到锁,它就会将当前互斥锁切换饥饿模式
如果一个goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapint32(&m.state, 0, mutexLocked) {
return
}
// 原注释: Slow path (outlined so that the fast path can be inlined)
// 将
m.lockSlow()
}
可以看到,当前互斥锁的状态为0时,尝试将当前锁状态设置为更新锁定状态,且这些操作是原子的。
若当前状态不为0,则进入lockSlow
方法
先定义了几个参数
var waitStartTime int64
starving := false //
awoke := false
iter := 0
old := m.state
随后进入一个很大的for循环,让我们来逐步分析
for {
// 1 && 2
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 3.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
old&(mutexLocked|mutexStarving) == mutexLocked
当且仅当当前锁状态为mutexLocked时,表达式为true
runtime_canSpin(iter)
是否满足自旋条件
如果当前状态下自旋是合理的,将awoke
置为true,同时设置锁状态为mutexWoken
,进入自旋逻辑
runtime_doSpin()
会执行30次PAUSE
指令,并且仅占用CPU资源 代码位于:runtime\asm_amd64.s +567
//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
停止了自旋后,
new := old
// 1.
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 2.
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 3 && 4.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 5.
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
old&mutexStarving == 0
表明原来不是饥饿模式。如果是饥饿模式的话,其他goroutine不会执行接下来的代码,直接进入等待队列队尾mutexLocked
或者 mutexStarving
模式,waiterCounts数加一mutexLocked
的话,设置锁的新状态为饥饿状态。被唤醒过且抢锁失败
// 1.
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 2.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 3.
runtime_SeMacquireMutex(&m.sema, queueLifo, 1)
// 4.
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 5.
if old&mutexStarving != 0 {
/
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
mutexLocked
或者是waiterCount数量的改变waitStartTime
不为0 说明当前goroutine已经等待过了,将当前goroutine放到等待队列的队头runtime_SemacquireMutex
方法使当前协程阻塞,runtime_SemacquireMutex
方法中会不断尝试获得锁,并会陷入休眠 等待信号量释放。runtime_SemacquireMutex
方法中返回。此时协程会去更新starving
标志位:如果当前starving
标志位为true或者等待时间超过starvationThresholdNs
,将starving
置为true之后会按照饥饿模式与正常模式,走不同的逻辑
func (m *Mutex) Unlock() {
// 1.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 2.
m.unlockSlow(new)
}
}
unlockSlow
方法func (m *Mutex) unlockSlow(new int32) {
// 1.
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// 2.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 2.1.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 2.2.
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 3.
runtime_Semrelease(&m.sema, true, 1)
}
}
1.new+mutexLocked
代表将锁置为1,如果两个状态& 不为0,则说明重复解锁.如果重复解锁则抛出panic
2. 如果等待者数量等于0,或者锁的状态已经变为mutexWoken、mutexStarving、mutexStarving,则直接返回
3. 如果不满足 2
的判断条件,则进入饥饿模式,同时交出锁的使用权
mu1 := &sync.Mutex{}
mu1.Lock()
mu2 := mu1
mu2.Unlock()
此时mu2
能够正常解锁,那么我们再试试解锁mu1
呢
mu1 := &sync.Mutex{}
mu1.Lock()
mu2 := mu1
mu2.Unlock()
mu1.Unlock()
可以看到发生了error
当lock()之后,可能由于代码问题导致程序发生了panic,那么mutex无法被及时unlock(),由于其他协程还在等待锁,此时可能触发死锁
func TestWithLock() {
nums := 100
wg := &sync.WaitGroup{}
safeSlice := SafeSlice{
s: []int{},
lock: new(sync.RWMutex),
}
i := 0
for idx := 0; idx < nums; idx++ { // 并行nums个协程做append
wg.Add(1)
go func() {
defer func() {
if r := recover(); r != nil {
log.Println("recover")
}
wg.Done()
}()
safeSlice.lock.Lock()
safeSlice.s = append(safeSlice.s, i)
if i == 98{
panic("123")
}
i++
safeSlice.lock.Unlock()
}()
}
wg.Wait()
log.Println(len(safeSlice.s))
}
修改:
func TestWithLock() {
nums := 100
wg := &sync.WaitGroup{}
safeSlice := SafeSlice{
s: []int{},
lock: new(sync.RWMutex),
}
i := 0
for idx := 0; idx < nums; idx++ { // 并行nums个协程做append
wg.Add(1)
go func() {
defer func() {
if r := recover(); r != nil {
}
safeSlice.lock.Unlock()
wg.Done()
}()
safeSlice.lock.Lock()
safeSlice.s = append(safeSlice.s, i)
if i == 98{
panic("123")
}
i++
}()
}
wg.Wait()
log.Println(len(safeSlice.s))
}
以上就是Golang Mutex互斥锁深入理解的详细内容,更多关于Golang Mutex互斥锁的资料请关注编程网其它相关文章!
--结束END--
本文标题: GolangMutex互斥锁深入理解
本文链接: https://lsjlt.com/news/121001.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-04-05
2024-04-05
2024-04-05
2024-04-04
2024-04-05
2024-04-05
2024-04-05
2024-04-05
2024-04-04
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0