手摸手Go 简单聊聊sync.RWMutex
共 5972字,需浏览 12分钟
·
2021-03-09 19:59
那一天我二十一岁,在我一生的黄金时代,我有好多奢侈。我想爱,想吃,还想在一瞬间变成天上半明半暗的云,后来我才知道,生活就是个缓慢受锤的过程,人一天天老下去,奢望也一天天消逝,最后变得像挨了锤的牛一样。可是我过二十一岁生日时没有预见到这一点。我觉得自己会永远生猛下去,什么也锤不了我。---王小波
各位早上好~今天来聊聊Go提供的读写互斥锁sync.RWMutex,
它可以加任意数量的读锁或者一个写锁。读写锁占用规则:
- 读锁占用的情况下,会组织写锁的获取,但是不会阻止其他
goroutine
获取读锁 - 写锁占用的情况下,则不允许任何(读锁/写锁)请求,将整个锁独占
其零值表示未上锁状态。
基本使用
使用sync.RWMutex
可以很容易实现一个协程安全的字典结构。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func init() {
rand.Seed(time.Now().Unix())
}
type Key interface{}
type Value interface{}
type Dictionary struct {
m sync.RWMutex
data map[Key]Value
}
func (d *Dictionary) Add(k Key, v Value) {
d.m.Lock()
defer d.m.Unlock()
if d.data == nil {
d.data = make(map[Key]Value)
}
d.data[k] = v
}
func (d *Dictionary) Get(k Key) Value {
d.m.RLock()
defer d.m.RUnlock()
return d.data[k]
}
func main() {
d := &Dictionary{}
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(2)
weight := rand.Intn(100)
go func(w int) {
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
d.Add("leo", fmt.Sprintf("leo超帅的 +%d", w))
wg.Done()
}(weight)
go func() {
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
fmt.Println(d.Get("leo"))
wg.Done()
}()
}
wg.Wait()
}
sync.RWMutex源码分析
数据结构
type RWMutex struct {
w Mutex // 写操作需要先尝试持有
writerSem uint32 // 等待读操作完成的写等待的信号量
readerSem uint32 // 等待写操作完成的读等待的信号量
readerCount int32 // 阻塞的读操作数量
readerWait int32 // 写操作 来之前 读操作数量
}
// 最大读操作数量
const rwmutexMaxReaders = 1 << 30
上面几个属性,第一次看到readerWait
有点儿懵 这个跟readerCount
有啥关系呢?上图吧 其实也不复杂 具体可以配合下面代码分析一起可能会更好理解。
假设一个场景,不同操作时不同属性的值变化如下表:
操作 | writerSem | readerSem | readerCount | readerWait | rw.w |
---|---|---|---|---|---|
4次Rlock()且均未释放 | 未阻塞写操作 | 未阻塞读操作 | 4 | 0 | 0 |
假设执行一次Unlock() | 未阻塞写操作 | 未阻塞读操作 | 4-1=3 | 0 | 0 |
尝试执行Lock() | 阻塞1个写操作 | 未阻塞读操作 | 3-(1<<30) | 3 | 0 |
Lock()等待readerWait个读操作执行完毕 | |||||
执行2次RUnlock()同时执行2次Rlock() | 阻塞1个写操作 | 阻塞2个读操作 | 3-(1<<30)-2+2 | 3-2 | 0 |
第一次Lock()未获得锁 再次执行Lock() 将被阻塞在rw.w上 | 阻塞1个写操作 | 阻塞2个读操作 | 3-(1<<30)-2+2 | 3-2 | 1 |
第4次RUnlock执行完毕时 会唤醒阻塞的第一个Lock | 未阻塞写 | 阻塞2个读操作 | 3-(1<<30)-2+2-1+(1<<30)=2 | 0 | 1 |
为什么他们的值会是这样?我们接着看源码,然后回过头再对照表格 自然就明了了。
操作方法
RLock
用于读操作抢占锁,它不应该被递归调用。
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
执行RLock
,若rw.readerCount
加1小于0则说明存在写操作持有锁,则将当前的读操作阻塞到rw.readerSem
上。
RUnlock
RUnlock
一次只能解除一个Rlock
操作,并不会影响其他的读操作。如果没有执行RLock
,执行RUnlock
会panic throw("sync: RUnlock of unlocked RWMutex")
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
}
RUnlock
首先判断r=rw.readerCount
-1
若
r>=0
表示释放读锁成功若
r<0
表示存在写操作持有锁,进入slow-path
func (rw *RWMutex) rUnlockSlow(r int32) {
//不存在RLock操作 不能执行RUnlock
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {//读操作执行完毕 唤醒等待的写操作
//最后一个读操作解锁 唤醒写操作
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
因为初始状态下sync.RWMutex
是未上锁状态,rw.readerCount
初始为0,或者在无读操作加锁的情况下,写操作加锁rw.readerCount
会被置为const rwmutexMaxReaders = 1 << 30
,因此
r+1 == 0 || r+1 == -rwmutexMaxReaders
表明当前无读操作持有锁,而直接执行RUnlock
会panic。
尝试进行rw.readerWait-1
操作,然后判断若rw.readerWait==0
则表明写操作抢占锁之前的读操作都已经处理完毕,此时可以唤醒被阻塞在rw.writerSem
上的写操作了。
Lock
针对写操作时尝试获取锁,如果当前锁被读操作或写操作持有,则阻塞等待直到锁可用。
func (rw *RWMutex) Lock() {
// 首先解决于其他写操作的竞争问题
rw.w.Lock()
// 告诉读操作,这里存在阻塞的写操作
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 如果仍存存在读操作持有锁 则阻塞等待
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
大致步骤:
Lock
首先调用了rw.w.Lock()
来解决多个写操作并发请求的竞争问题:如果存在多个写操作,只有一个写操作会获取到rw.w
锁接着尝试剩余的操作,其他的写操作会被阻塞在rw.w
上。- 调用
atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders)
,这里结合RLock()
中的atomic.AddInt32(&rw.readerCount, 1) < 0
则将读操作阻塞在rw.readerSem
上,以此来让读操作感知到当前是否存在阻塞的写操作。
Unlock
跟sync.Mutex
一样,一个锁定的sync.RWMutex
跟特定的goroutine
没有任何关联。一个goroutine
可能RLock
(Lock
)一个sync.RWMutex
,然后另一个goroutine
可以RUnlock
(Unlock
)掉这个锁状态。
// 这里也是不允许针对一个未执行Lock的rw执行Unlock操作的
func (rw *RWMutex) Unlock() {
// 通知读操作,这里没有激活的写操作了
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
}
基本逻辑:
- 通过
atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
恢复rw.readerCount
,即通知读操作这里没有激活的写操作,意味着这个时候读操作可以有机会竞争锁了,即使仍存在阻塞在rw.w
上的写操作,这里应该是防止读操作会因为写操作过多被饿死。 - 判断是否是没
Lock
的情况下执行了Unlock
- 依次唤醒阻塞在
rw.readerSem
上的读操作 rw.w.Unlock
意味着阻塞在rw.w
上的其他写操作可以接着抢占锁了。
关于递归读锁定问题
细心的童鞋可能会发现sync.RWMutex
是禁止递归读锁定的,官方是这么说的
If a goroutine holds a RWMutex for reading and another goroutine might call Lock, no goroutine should expect to be able to acquire a read lock until the initial read lock is released. In particular, this prohibits recursive read locking. This is to ensure that the lock eventually becomes available; a blocked Lock call excludes new readers from acquiring the lock.
大概意思是说,如果我们持有一个sync.RWMutex
的读锁时,可能会有另一个写操作尝试获取锁,因为前面的读锁未释放则这个写操作只能阻塞等待。不幸的是,这个读操作干完活并不释放读锁,而是继续递归调用读操作获取锁,但是这个时候获取读锁的时候发现前面有阻塞的写锁请求,则这个读操作请求只能阻塞等待前面的写操作完事儿。最早的读操作又等待当前的读操作完事儿去释放锁,完美的一个贪吃蛇构成的一个死锁的场景就出现啦。
举个栗子吧:
我们在斐波那契数列递归函数里,递归获取读锁,然后中途我们来个写锁请求,看看啥结果
package main
import (
"sync"
"time"
)
var m sync.RWMutex
func fibonacci(num int) int {
if num < 2 {
return 1
}
m.RLock()
defer m.RUnlock()
time.Sleep(time.Millisecond * 100)
return fibonacci(num-1) + fibonacci(num-2)
}
func main() {
done := make(chan int)
go func() {
m.Lock()
time.Sleep(time.Millisecond * 200)
m.Unlock()
done <- 1
}()
fibonacci(5)
<-done
}
输出结果是那么熟悉的味道。
fatal error: all goroutines are asleep - deadlock!
总结
sync.RWMutex
提供了比sync.Mutex
更加细粒度的锁控制,将读锁和写锁做了分离,本来逻辑会比较复杂,但是它是基于sync.Mutex
所以整体逻辑就变得比较简单,可能readerCount
和readerWait
咋一看有点儿懵,不过仔细看看不难,通过readerCount
的值来达到读写锁通信的目的 设计还是很巧妙的,受益匪浅。
如果阅读过程中发现本文存疑或错误的地方,可以关注公众号留言。如果觉得还可以 帮忙点个在看😁