Golang Mutex互斥锁深入理解
作者:酒红 发布时间:2024-04-25 13:18:08
引言
Golang的并发编程令人着迷,使用轻量的协程、基于CSP的channel、简单的go func()就可以开始并发编程,在并发编程中,往往离不开锁的概念。
本文介绍了常用的同步原语 sync.Mutex
,同时从源码剖析它的结构与实现原理,最后简单介绍了mutex在日常使用中可能遇到的问题,希望大家读有所获。
Mutex结构
Mutex运行时数据结构位于sync/mutex.go
包
type Mutex struct {
state int32
sema uint32
}
其中state
表示当前互斥锁的状态,sema
表示 控制锁状态的信号量.
互斥锁的状态定义在常量中:
const (
mutexLocked = 1 << iota // 1 ,处于锁定状态; 2^0
mutexWoken // 2 ;从正常模式被从唤醒; 2^1
mutexStarving // 4 ;处于饥饿状态; 2^2
mutexWaiterShift = iota // 3 ;获得互斥锁上等待的Goroutine个数需要左移的位数: 1 << mutexWaiterShift
starvationThresholdNs = 1e6 // 锁进入饥饿状态的等待时间
)
0即其他状态。
sema
是一个组合,低三位分别表示锁的三种状态,高29位表示正在等待互斥锁释放的gorountine个数,和Java表示线程池状态那部分有点类似
一个mutex对象仅占用8个字节,让人不禁感叹其设计的巧妙
饥饿模式和正常模式
正常模式
在正常模式下,等待的协程会按照先进先出的顺序得到锁 在正常模式下,刚被唤醒的goroutine与新创建的goroutine竞争时,大概率无法获得锁。
饥饿模式
为了避免正常模式下,goroutine被“饿死”的情况,go在1.19版本引入了饥饿模式,保证了Mutex的公平性
在饥饿模式中,互斥锁会直接交给等待队列最前面的goroutine。新的goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。
状态的切换
在正常模式下,一旦Goroutine超过1ms没有获取到锁,它就会将当前互斥锁切换饥饿模式
如果一个goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。
加锁和解锁
加锁
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// 原注释: Slow path (outlined so that the fast path can be inlined)
// 将
m.lockSlow()
}
可以看到,当前互斥锁的状态为0时,尝试将当前锁状态设置为更新锁定状态,且这些操作是原子的。
若当前状态不为0,则进入lockSlow
方法
先定义了几个参数
var waitStartTime int64
starving := false //
awoke := false
iter := 0
old := m.state
随后进入一个很大的for循环,让我们来逐步分析
自旋
for {
// 1 && 2
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 3.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
old&(mutexLocked|mutexStarving) == mutexLocked
当且仅当当前锁状态为mutexLocked时,表达式为true
runtime_canSpin(iter)
是否满足自旋条件
运行在拥有多个CPU的机器上;
当前Goroutine为了获取该锁进入自旋的次数小于四次;
当前机器上至少存在一个正在运行的处理器 P,并且处理的运行队列为空;
如果当前状态下自旋是合理的,将awoke
置为true,同时设置锁状态为mutexWoken
,进入自旋逻辑
runtime_doSpin()
会执行30次PAUSE
指令,并且仅占用CPU资源 代码位于:runtime\asm_amd64.s +567
//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
计算锁的新状态
停止了自旋后,
new := old
// 1.
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 2.
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 3 && 4.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 5.
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
old&mutexStarving == 0
表明原来不是饥饿模式。如果是饥饿模式的话,其他goroutine不会执行接下来的代码,直接进入等待队列队尾如果原来是
mutexLocked
或者mutexStarving
模式,waiterCounts数加一如果被标记为饥饿状态,且锁状态为
mutexLocked
的话,设置锁的新状态为饥饿状态。被标记为饥饿状态的前提是
被唤醒过且抢锁失败
计算新状态
更新锁状态
// 1.
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 2.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 3.
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 4.
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 5.
if old&mutexStarving != 0 {
/
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
尝试将锁状态设置为new 。这里设置成功不代表上锁成功,有可能new不为
mutexLocked
或者是waiterCount数量的改变waitStartTime
不为0 说明当前goroutine已经等待过了,将当前goroutine放到等待队列的队头走到这里,会调用
runtime_SemacquireMutex
方法使当前协程阻塞,runtime_SemacquireMutex
方法中会不断尝试获得锁,并会陷入休眠 等待信号量释放。当前协程可以获得信号量,从
runtime_SemacquireMutex
方法中返回。此时协程会去更新starving
标志位:如果当前starving
标志位为true或者等待时间超过starvationThresholdNs
,将starving
置为true
之后会按照饥饿模式与正常模式,走不同的逻辑
- 在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环;
- 在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出;
解锁
func (m *Mutex) Unlock() {
// 1.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 2.
m.unlockSlow(new)
}
}
将锁状态的值增加 -mutexLocked 。如果新状态不等于0,进入
unlockSlow
方法
func (m *Mutex) unlockSlow(new int32) {
// 1.
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// 2.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 2.1.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 2.2.
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 3.
runtime_Semrelease(&m.sema, true, 1)
}
}
1.new+mutexLocked
代表将锁置为1,如果两个状态& 不为0,则说明重复解锁.如果重复解锁则抛出panic
2. 如果等待者数量等于0,或者锁的状态已经变为mutexWoken、mutexStarving、mutexStarving,则直接返回
将waiterCount数量-1,尝试选择一个goroutine唤醒
尝试更新锁状态,如果更新锁状态成功,则唤醒队尾的一个gorountine
3. 如果不满足 2
的判断条件,则进入饥饿模式,同时交出锁的使用权
可能遇到的问题
锁拷贝
mu1 := &sync.Mutex{}
mu1.Lock()
mu2 := mu1
mu2.Unlock()
此时mu2
能够正常解锁,那么我们再试试解锁mu1
呢
mu1 := &sync.Mutex{}
mu1.Lock()
mu2 := mu1
mu2.Unlock()
mu1.Unlock()
可以看到发生了error
panic导致没有unlock
当lock()之后,可能由于代码问题导致程序发生了panic,那么mutex无法被及时unlock(),由于其他协程还在等待锁,此时可能触发死锁
func TestWithLock() {
nums := 100
wg := &sync.WaitGroup{}
safeSlice := SafeSlice{
s: []int{},
lock: new(sync.RWMutex),
}
i := 0
for idx := 0; idx < nums; idx++ { // 并行nums个协程做append
wg.Add(1)
go func() {
defer func() {
if r := recover(); r != nil {
log.Println("recover")
}
wg.Done()
}()
safeSlice.lock.Lock()
safeSlice.s = append(safeSlice.s, i)
if i == 98{
panic("123")
}
i++
safeSlice.lock.Unlock()
}()
}
wg.Wait()
log.Println(len(safeSlice.s))
}
修改:
func TestWithLock() {
nums := 100
wg := &sync.WaitGroup{}
safeSlice := SafeSlice{
s: []int{},
lock: new(sync.RWMutex),
}
i := 0
for idx := 0; idx < nums; idx++ { // 并行nums个协程做append
wg.Add(1)
go func() {
defer func() {
if r := recover(); r != nil {
}
safeSlice.lock.Unlock()
wg.Done()
}()
safeSlice.lock.Lock()
safeSlice.s = append(safeSlice.s, i)
if i == 98{
panic("123")
}
i++
}()
}
wg.Wait()
log.Println(len(safeSlice.s))
}
来源:https://juejin.cn/post/7095334586019741704
猜你喜欢
- 有多少次你在考虑怎样设置数据库时感到为难?其实,如果你在Linux上使用MySQL,就不会有这种情况了。在Linux上使用Webmin图形界
- 1. 引言深拷贝和浅拷贝是Python中重要的概念,本文重点介绍在NumPy中深拷贝和浅拷贝相关操作的定义和背后的原理。闲话少说,我们直接开
- 在Python中,任何类型的对象都可以做真值测试,并且保证返回True或者False。以下几种值(不论类型)在真值测试中返回False:1.
- 想必很多初次接触python都会见到这样一个语句,if __name__ == "__main__":那么这个语句到底是
- 我们将学习如何通过一种称为修复的方法去除旧照片中的小噪音,笔画等。基本思路很简单:用相邻像素替换那些坏标记,使其看起来像邻域。cv2.inp
- 1.如何用函数先定义后调用,定义阶段只检测语法,不执行代码调用阶段,开始执行代码函数都有返回值定义时无参,调用时也是无参定义时有参,调用时也
- 本文实例为大家分享了Android九宫格图片展示的具体代码,供大家参考,具体内容如下我是从 官网 上面下载的社区版MySQL(版本
- 笔者在学习pandas,在学习过程中总结了一下创建dataframe的方法,通过查阅资料总结遗下几种方法,如果你有其他的方法欢迎留言补充。练
- 在vue项目中, mock数据可以使用 node 的 express模块搭建服务1. 在根目录下创建 test 目录, 用来存放模拟的 js
- 误区 #13.在SQL Server 2000兼容模式下不能使用DMV错误 对
- 如何使用Pytorch实现two-head(多输出)模型1. two-head模型定义先放一张我要实现的模型结构图:如上图,就是一个two-
- FCKeditor至今已经到了2.3.1版本了,对于国内的WEB开发者来说,也基本上都已经“闻风知多少”了,很多人将其融放到自己的项目中,更
- 巧用get和set,能够直接操作对象属性实现读写,可以极大的提高编程效率,给出一个典型示例: var test = { _Name : nu
- UCHOME的代码还是很不错的,学习一下! <?php /** * 定义一些常量 */ @define('IN_UCHOME&
- 关于阻塞主线程join的错误用法Thread.join() 作用为阻塞主线程,即在子线程未返回的时候,主线程等待其返回然后再继续执行.joi
- 视图是一种常用的数据库对象,它将查询的结果以虚拟表的形式存储在数据中。因为视图有非常多的优点:1,可以简化操作,2,可以建立前台和后台的缓冲
- 安装jieba库教程jieba库是一款优秀的 Python 第三方中文分词库,jieba 支持三种分词模式:精确模式、全模式和搜索引擎模式,
- 本文实例讲述了Javascript与PHP验证用户输入URL地址是否正确的方法,分享给大家供大家参考。具体方法如下:1.javascript
- 索引并不是时时都会生效的,比如以下几种情况,将导致索引失效:如果条件中有or,即使其中有条件带索引也不会使用(这也是为什么尽量少用or的原因
- 方案5 使用xml参数 对sql server xml类型参数不熟悉的童鞋需要先了解下XQuery概念,这里简单提下XQuery 是用来从