golang并发底层实现竟然都是它!!!
《手摸手系列》把go sync包中的并发组件已经写完了,本文作为完结篇,最后再来探讨下go运行时锁的实现。记得在《手摸手Go 并发编程的基建Semaphore》那篇中我们聊过sync.Mutex
最终是依赖sema.go
中实现的sleep
和wakeup
原语来实现的。如果细心的小伙伴会发现:
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait uint32 // Number of waiters. Read w/o the lock.
}
semaRoot
里也定义了一个mutex
。正所谓“医者不能自医”,故而Go针对运行时的并发实现了一个简版的互斥锁mutex
。具体怎么 玩的呢?先来回顾一下:
基本使用
我们以sema.go
中的semacquire1
方法为例,mutex
的使用如下:
// Easy case.
if cansemacquire(addr) {
return
}
... ...
lockWithRank(&root.lock, lockRankRoot)
... ...
if cansemacquire(addr) {
....
unlock(&root.lock)
break
}
... ...
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
// goparkunlock里的钩子里藏着unlock方法调用
func parkunlock_c(gp *g, lock unsafe.Pointer) bool {
unlock((*mutex)(lock))
return true
}
上述逻辑正如之前所说:
先通过 CAS
快速判断是否获取信号量,成功直接返回;若 CAS
失败,则尝试获取排他锁,因为sync.Mutex
是基于sema.go
实现,故而这里使用了运行时的mutex
来进入临界区获得运行时锁则再次尝试 CAS
成功则解除运行时排他锁返回;否则调用gopark
挂起当前协程等待唤醒
那么Go的运行时排他锁是如何实现的呢?先来看看其数据结构
数据结构
mutex
结构体比较简单,只有一个用于运行时静态锁排名的lockRankStruct
和一个uintpter
的key(根据不同平台实现方案用处不同)。在无竞争的情况下,它跟自旋锁一样快,类似的也是在用户空间利用atomic.Cas
来尝试抢占锁,失败才会掉进系统调用,进行内核中休眠。
// 互斥锁。
// 零值mutex是未上锁状态(不需要初始化锁)
// 初始化有利于静态锁定级 但是不是必须的
type mutex struct {
// 如果禁用了锁定级则为空结构体否则包括锁定等级
lockRankStruct
// 基于futex的实现将其看作uint32的key,
// 而基于sema的实现将其看作M* waitm
// 过去曾经是一个union,但是unions破坏了精确的GC
key uintptr
}
lockRankStruct提供了一种运行时的静态锁排名的机制。静态锁排名会建立文件化的锁之间的总排序顺序,如果违反总的排序,则会报错。只要锁排序是按照文档设计的顺序,锁排序死锁就不会发生。如果要做Go运行时使用这个机制,你需要设置
GOEXPERIMENT=staticlockranking
。默认未开启,此时
lockRanStruct
是一个空结构体。lockWithRank()
等效于lock()
mutex
跟常规的锁一样提供了Lock
和Unlock
方法,不过底层根据平台不同实现也不同,总结起来大致分了三类:
dragonfly freebsd linux
平台下,根据具体平台实现下面两个方法
// 从原子上讲,如果 *addr == val { sleep },可能发生虚假唤醒,但是这是被允许的
// 休眠时间不超过ns,ns<0表示永远休眠
futexsleep(addr *uint32, val uint32, ns int64)
//如果有任何进程在addr上休眠,则最多唤醒cnt
futexwakeup(addr *uint32, cnt uint32)
aix darwin nacl netbsd openbsd plan9 solaris windows
平台下,根据具体平台实现下面三个方法
// 如果尚未创建mp信号量,则为其创建一个
func semacreate(mp *m)
// 如果ns<0,则获取m的信号量并返回0
// 如果ns>=0,则在ns纳秒内尝试获取m的信号量
// 如果获取了信号量返回0 如果中断或超时返回-1
func semasleep(ns int64) int32
//唤醒已经或即将在其信号量上休眠的mp
func semawakeup(mp *m)
js,wasm
平台下的实现,因为js/wasm还不支持线程,所以不存在抢占。
因为绝大多数生产环境都是Linux平台下,所以今天主要来看看Linux的具体实现,其主要利用系统内核提供的futex
来实现,好了 上代码:
Lock
// +build !goexperiment.staticlockranking
即staticlockranking
未开启的情况下,// +build dragonfly freebsd linux
的实现如下
//lock_futex.go
func lock(l *mutex) {
lockWithRank(l, getLockRank(l))
}
//go:nosplit
func lockWithRank(l *mutex, rank lockRank) {
lock2(l)
}
func lock2(l *mutex) {
gp := getg()
if gp.m.locks < 0 {
throw("runtime·lock: lock count")
}
//g绑定的m的lock属性自增加一
gp.m.locks++
// 投机抢占锁 运气好抢到直接返回 不需要进行内核调用
v := atomic.Xchg(key32(&l.key), mutex_locked)
if v == mutex_unlocked {
return
}
// wait为MUTEX_LOCKED还是MUTEX_SLEEPING取决于此互斥对象mutex是否存在线程在休眠
// 如果我们曾经将l->key从MUTEX_SLEEPING更改为其他值,则在返回之前必须小心将其更改回MUTEX_SLEEPING,
// 以确保休眠线程得到其唤醒调用
wait := v
// 单处理器上 无自旋
// 多处理器上,自旋进行ACTIVE_SPIN尝试
spin := 0
if ncpu > 1 {
spin = active_spin
}
for {
// 尝试锁定 自旋
for i := 0; i < spin; i++ {
for l.key == mutex_unlocked {
if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
return
}
}
procyield(active_spin_cnt)
}
// 尝试锁定 重新调度
for i := 0; i < passive_spin; i++ {
for l.key == mutex_unlocked {
if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
return
}
}
osyield()
}
// 休眠
v = atomic.Xchg(key32(&l.key), mutex_sleeping)
if v == mutex_unlocked {
return
}
wait = mutex_sleeping
futexsleep(key32(&l.key), mutex_sleeping, -1)
}
}
procyield
只是执行30次PAUSE
active_spin_cnt = 30
procyield(active_spin_cnt)
// asm_amd64.s
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX // 将30放入AX寄存器
again:
PAUSE
SUBL $1, AX // 每次减一
JNZ again
RET
linux系统下osyield()
为系统调用的封装
osyield()
//sys_linux_amd64.s
TEXT runtime·osyield(SB),NOSPLIT,$0
MOVL $SYS_sched_yield, AX
SYSCALL
RET
可以看到Lock
的逻辑也并不复杂:
首先给当前g绑定的m的lock属性自增加一,禁止P被抢占 atomic.Xchg
尝试修改mutex.key
为mutex_locked
,获取原始值为mutex_unlocked
表明获取锁,直接返回;否则失败继续执行进入for死循环 首先如果时多核机器,尝试进行自旋+ atomic.Cas
获取锁,尝试自旋4次每次失败执行30次PAUSE
, 成功则返回调用 osyield()
尝试一次重新调度尝试休眠,如果此时锁资源被释放 则获取锁直接返回 调用 futexsleep()
进入休眠,休眠失败在从步骤1继续尝试 直到获取锁 或 休眠成功为止
可以看出Lock
还是尽量先通过自旋、CAS、osyield(),实在没办法才进入futexsleep()
,为什么呢?下面说
接下来我们来一起看看关键的futexsleep()
如何实现的:
//src/runtime/os_linux.go
// Atomically,
// if(*addr == val) sleep
// Might be woken up spuriously; that's allowed.
// Don't sleep longer than ns; ns < 0 means forever.
//go:nosplit
func futexsleep(addr *uint32, val uint32, ns int64) {
// Some Linux kernels have a bug where futex of
// FUTEX_WAIT returns an internal error code
// as an errno. Libpthread ignores the return value
// here, and so can we: as it says a few lines up,
// spurious wakeups are allowed.
if ns < 0 {
futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0)
return
}
var ts timespec
ts.setNsec(ns)
futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0)
}
//go:noescape
func futex(addr unsafe.Pointer, op int32, val uint32, ts, addr2 unsafe.Pointer, val3 uint32) int32
可以看到,futex_sleep()
最终调用了没有函数体的futex()
方法,其汇编实现如下:
// src/runtime/sys_linux_amd64.s
// int64 futex(int32 *uaddr, int32 op, int32 val,
// struct timespec *timeout, int32 *uaddr2, int32 val2);
TEXT runtime·futex(SB),NOSPLIT,$0
MOVQ addr+0(FP), DI
MOVL op+8(FP), SI
MOVL val+12(FP), DX
MOVQ ts+16(FP), R10
MOVQ addr2+24(FP), R8
MOVL val3+32(FP), R9
MOVL $SYS_futex, AX
SYSCALL
MOVL AX, ret+40(FP)
RET
Linux环境下,系统调用是原生支持的(darwin上需要通过cgo来完成),所以runtime.futex
直接通过汇编来实现了,通过将$SYS_futex
,也就是202号系统调用编号送入了AX
,然后通过SYSCALL
来完成对futex
的系统调用。
linux
下futex
是如何实现的呢?
从Linux内核看futex
啥是futex?
futex(Fast Userspace muTEX
快速用户空间互斥体)是低级同步原语,为高级API提支持。它的目标是在无竞争的情况下不进入内核或不分配内核资源。它第一次出现在Linux内核开发的2.5.7版本,其语义在2.5.40版本固定下来,然后在2.6.x系列稳定版内核中出现。
futex出现之前,linux的同步机制主要分为两类:
用户态的同步机制 利用原子指令实现自旋锁,但不适用于临界区执行时间过长的场景 内核同步机制 如semaphore等,使用自旋+等待的形式,lock和unlock都是系统调用,即使没有冲突也要通过系统调用进入内核之后才能识别。
理想状态下,在无冲突的情况下在用户空间通过自旋锁解决,如果仍未拿到锁资源需要等待时再通过系统调用完成sleep和wake的语义。那么自旋失败的情况下,将当前线程挂起,然后是不是由持有锁的线程释放锁的时候再将其唤醒呢?带着这个疑问我们一起粗略看看linux内核如何实现futex的吧
数据结构
Linux内核实现主要在kernel/futex.c
和include/linux/futex.h
,翻了下3.10版本内核源码,futex
大体是所以它的结构跟Java
中的HashMap有些类似。futex_hash_bucket
为hash桶,futex_q
为存储的元素实体结构。如下图
/**
* struct futex_q - futex等待队列实体,每个等待任务分配一个
* @list: 等待在futex上带优先级链表
* @task: 等待在futex上的任务
* @lock_ptr: 哈希桶自旋锁指针
* @key: the key the futex is hashed on
* @pi_state: optional priority inheritance state
* @rt_waiter: rt_waiter storage for use with requeue_pi
* @requeue_pi_key: the requeue_pi target futex key
* @bitset: bitset for the optional bitmasked wakeup
*
* We use this hashed waitqueue, instead of a normal wait_queue_t, so
* we can wake only the relevant ones (hashed queues may be shared).
*
* A futex_q has a woken state, just like tasks have TASK_RUNNING.
* It is considered woken when plist_node_empty(&q->list) || q->lock_ptr == 0.
* The order of wakeup is always to make the first condition true, then
* the second.
*
* PI futexes are typically woken before they are removed from the hash list via
* the rt_mutex code. See unqueue_me_pi().
*/
struct futex_q {
struct plist_node list;// 等待在futex上按照优先级排序的任务列表
struct task_struct *task; //等待在futex上的任务
spinlock_t *lock_ptr; //哈希桶自旋锁指针
union futex_key key; //futex进行hash的key
struct futex_pi_state *pi_state; //可选的 优先级继承状态 控制优先级继承
struct rt_mutex_waiter *rt_waiter;//
union futex_key *requeue_pi_key;
u32 bitset; //掩码匹配
};
/*
* Hash buckets are shared by all the futex_keys that hash to the same
* location. Each key may have multiple futex_q structures, one for each task
* waiting on a futex.
*/
struct futex_hash_bucket {
spinlock_t lock;
struct plist_head chain;
};
static struct futex_hash_bucket futex_queues[1<<FUTEX_HASHBITS];
外部调用入口
从mutex
源码分析,我们得出Go最终会通过系统调用内核提供的futex
。futex
原型为:
#include <linux/futex.h>
#include <stdint.h>
#include <sys/time.h>
long futex(uint32_t *uaddr, int futex_op, uint32_t val,
const struct timespec *timeout, /* or: uint32_t val2 */
uint32_t *uaddr2, uint32_t val3);
futex
方法有6个入参:*uaddr
表示futex字的指针;futex
表示针对futex的具体操作类型;val
的含义依赖于flags
参数;*timeout
表示超时操作的超时时间;*uaddr2
跟操作有关,必填时指向操作使用的第二个futex字;val3
取决于具体操作。futex_op
取值有很多,这里我们关心的主要是FUTEX_WAIT
和FUTEX_WAKE
。
从kernel/futex.c
中可以发现系统调用最终会调用do_futex
,而do_futex
最终通过op
位运算生成cmd
来确定具体的调用的futex的形式。
//kernel/futex.c
SYSCALL_DEFINE6(futex, u32 __user *, uaddr, int, op, u32, val,
struct timespec __user *, utime, u32 __user *, uaddr2,
u32, val3)
{
struct timespec ts;
ktime_t t, *tp = NULL;
u32 val2 = 0;
int cmd = op & FUTEX_CMD_MASK;
if (utime && (cmd == FUTEX_WAIT || cmd == FUTEX_LOCK_PI ||
cmd == FUTEX_WAIT_BITSET ||
cmd == FUTEX_WAIT_REQUEUE_PI)) {
if (copy_from_user(&ts, utime, sizeof(ts)) != 0)
return -EFAULT;
if (!timespec_valid(&ts))
return -EINVAL;
t = timespec_to_ktime(ts);
if (cmd == FUTEX_WAIT)
t = ktime_add_safe(ktime_get(), t);
tp = &t;
}
/*
* requeue parameter in 'utime' if cmd == FUTEX_*_REQUEUE_*.
* number of waiters to wake in 'utime' if cmd == FUTEX_WAKE_OP.
*/
if (cmd == FUTEX_REQUEUE || cmd == FUTEX_CMP_REQUEUE ||
cmd == FUTEX_CMP_REQUEUE_PI || cmd == FUTEX_WAKE_OP)
val2 = (u32) (unsigned long) utime;
return do_futex(uaddr, op, val, tp, uaddr2, val2, val3);
}
long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,
u32 __user *uaddr2, u32 val2, u32 val3)
{
int cmd = op & FUTEX_CMD_MASK;
unsigned int flags = 0;
if (!(op & FUTEX_PRIVATE_FLAG))
flags |= FLAGS_SHARED;
if (op & FUTEX_CLOCK_REALTIME) {
flags |= FLAGS_CLOCKRT;
if (cmd != FUTEX_WAIT_BITSET && cmd != FUTEX_WAIT_REQUEUE_PI)
return -ENOSYS;
}
switch (cmd) {
case FUTEX_LOCK_PI:
case FUTEX_UNLOCK_PI:
case FUTEX_TRYLOCK_PI:
case FUTEX_WAIT_REQUEUE_PI:
case FUTEX_CMP_REQUEUE_PI:
if (!futex_cmpxchg_enabled)
return -ENOSYS;
}
switch (cmd) {
case FUTEX_WAIT:
val3 = FUTEX_BITSET_MATCH_ANY;
case FUTEX_WAIT_BITSET:
// 若*uaddr==val则阻塞当前进程/线程 放入阻塞队列
return futex_wait(uaddr, flags, val, timeout, val3);
case FUTEX_WAKE:
val3 = FUTEX_BITSET_MATCH_ANY;
case FUTEX_WAKE_BITSET:
// 最多唤醒val个在uaddr等待的线程/进程
return futex_wake(uaddr, flags, val, val3);
case FUTEX_REQUEUE:
return futex_requeue(uaddr, flags, uaddr2, val, val2, NULL, 0);
case FUTEX_CMP_REQUEUE:
return futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 0);
case FUTEX_WAKE_OP:
return futex_wake_op(uaddr, flags, uaddr2, val, val2, val3);
case FUTEX_LOCK_PI:
return futex_lock_pi(uaddr, flags, val, timeout, 0);
case FUTEX_UNLOCK_PI:
return futex_unlock_pi(uaddr, flags);
case FUTEX_TRYLOCK_PI:
return futex_lock_pi(uaddr, flags, 0, timeout, 1);
case FUTEX_WAIT_REQUEUE_PI:
val3 = FUTEX_BITSET_MATCH_ANY;
return futex_wait_requeue_pi(uaddr, flags, val, timeout, val3,
uaddr2);
case FUTEX_CMP_REQUEUE_PI:
return futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 1);
}
return -ENOSYS;
}
故而,根据参数分析,go运行时调用的futexsleep()
和futexwakeup()
方法
// 原子上,如果 *addr == val { sleep } ;虚假唤醒是被允许的;不会阻塞超过ns,ns<0表示永远休眠
futexsleep(addr *uint32, val uint32, ns int64)
//如果任何线程阻塞在addr上,则唤醒至少cnt个阻塞的任务
futexwakeup(addr *uint32, cnt uint32)
实则对应了futex()
中futex_op
为FUTEX_WAIT
和FUTEX_WAKE
的调用,接下来,我们主要来分析下这两种操作的大概流程。
futex_wait
futex()
方法中的futex_op
为FUTEX_WAIT
时会调用``futex_wait()方法。主要逻辑为判断
*uaddr指向的值和
val值若相等,则将当前线程/进程阻塞在
uaddr所指向的
futex`的值上,放入等待队列。
static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
ktime_t *abs_time, u32 bitset)
{
struct hrtimer_sleeper timeout, *to = NULL;
struct restart_block *restart;
struct futex_hash_bucket *hb;
struct futex_q q = futex_q_init;
int ret;
...
// 设置定时任务,超时abs_time后线程仍未被唤醒,则由定时任务唤醒它
if (abs_time) {
to = &timeout;
hrtimer_init_on_stack(&to->timer, (flags & FLAGS_CLOCKRT) ?
CLOCK_REALTIME : CLOCK_MONOTONIC,
HRTIMER_MODE_ABS);
hrtimer_init_sleeper(to, current);
hrtimer_set_expires_range_ns(&to->timer, *abs_time,
current->timer_slack_ns);
}
retry:
// 准备等待在uaddr上 如果成功则持有hash_bucket的锁
// 比较uaddr和期望值val是否相等 并初始化q.key
ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
if (ret)//val被改变直接返回
goto out;
// 插入futex等待队列,等待超时或通过信号来唤醒
futex_wait_queue_me(hb, &q, to);
/* If we were woken (and unqueued), we succeeded, whatever. */
ret = 0;
/* unqueue_me() drops q.key ref */
// 若unqueue_me成功,则说明是超时触发(因为futex_wake唤醒时,会将该进程移除等待队列,所以这里会失败)
if (!unqueue_me(&q))
goto out;
ret = -ETIMEDOUT;
if (to && !to->task)
goto out;
/*
* We expect signal_pending(current), but we might be the
* victim of a spurious wakeup as well.
*/
if (!signal_pending(current))
goto retry;
ret = -ERESTARTSYS;
if (!abs_time)
goto out;
restart = ¤t_thread_info()->restart_block;
restart->fn = futex_wait_restart;
restart->futex.uaddr = uaddr;
restart->futex.val = val;
restart->futex.time = abs_time->tv64;
restart->futex.bitset = bitset;
restart->futex.flags = flags | FLAGS_HAS_TIMEOUT;
ret = -ERESTART_RESTARTBLOCK;
out:
if (to) {
// 取消定时任务
hrtimer_cancel(&to->timer);
destroy_hrtimer_on_stack(&to->timer);
}
return ret;
}
futex_wait()
首先设置定时任务,若超时abs_time后线程仍未被唤醒,则由定时任务唤醒它;然后进入goto retry循环:
调用
futex_wait_setup(uaddr, val, flags, &q, &hb)
根据uaddr
hash找到futex_hash_bucket
,并初始化futex_q.key
。调用
futex_wait_queue_me(hb, &q, to)
插入futex等待队列,等待超时或被信号唤醒
/**
* futex_wait_setup() - 准备等待在一个futex变量上
* @uaddr: futex用户空间地址
* @val: 期望的值
* @flags: futex标识 (FLAGS_SHARED, etc.)
* @q: 关联的futex_q
* @hb: hash_bucket的指针 返回给调用者
*
* Setup the futex_q and locate the hash_bucket. Get the futex value and
* compare it with the expected value. Handle atomic faults internally.
* Return with the hb lock held and a q.key reference on success, and unlocked
* with no q.key reference on failure.
*
* Return:
* 0 - uaddr contains val and hb has been locked;
* <1 - -EFAULT or -EWOULDBLOCK (uaddr does not contain val) and hb is unlocked
*/
static int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags,
struct futex_q *q, struct futex_hash_bucket **hb)
{
u32 uval;
int ret;
/*
* Access the page AFTER the hash-bucket is locked.
* Order is important:
*
* Userspace waiter: val = var; if (cond(val)) futex_wait(&var, val);
* Userspace waker: if (cond(var)) { var = new; futex_wake(&var); }
*
* The basic logical guarantee of a futex is that it blocks ONLY
* if cond(var) is known to be true at the time of blocking, for
* any cond. If we locked the hash-bucket after testing *uaddr, that
* would open a race condition where we could block indefinitely with
* cond(var) false, which would violate the guarantee.
*
* On the other hand, we insert q and release the hash-bucket only
* after testing *uaddr. This guarantees that futex_wait() will NOT
* absorb a wakeup if *uaddr does not match the desired values
* while the syscall executes.
*/
retry:
//初始化futex_q 填充q->key
ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &q->key, VERIFY_READ);
if (unlikely(ret != 0))
return ret;
retry_private:
//1.对q->key进行hash 然后通过&futex_queues[hash & ((1 << FUTEX_HASHBITS)-1)]找到futex_hash_bucket
//2. spin_lock(&hb->lock)获得自旋锁
*hb = queue_lock(q);
//原子的将uaddr的值拷贝到uval中
ret = get_futex_value_locked(&uval, uaddr);
if (ret) { //拷贝操作失败 重试
...
goto retry;
}
// 如果uval的值(即uaddr)不等于期望值val 则表明其他线程在修改
// 直接返回无需等待
if (uval != val) {
queue_unlock(q, *hb);
ret = -EWOULDBLOCK;
}
out:
if (ret)
put_futex_key(&q->key);
return ret;
}
futex_wait_setup()
方法主要是为阻塞在uaddr上做准备,主要步骤:
初始化 futex_q
,并初始化futex_q.key
的引用,对 futex_q.key
进行hash通过&futex_queues[hash & ((1 << FUTEX_HASHBITS)-1)]
找到futex_hash_bucket
调用spin_lock(&hb->lock)尝试获得自旋锁 失败则进行重试回到步骤1 判断 *uaddr
的值跟val
是否相等,不相等说明其他线程在修改则释放持有的hb.lock自旋锁,返回-EWOULDBLOCK
(\#define EWOULDBLOCK 246 /* Operation would block (Linux returns EAGAIN) */
即linux中的EAGAIN)
/**
* futex_wait_queue_me() - queue_me() and wait for wakeup, timeout, or signal
* @hb: the futex hash bucket, must be locked by the caller
* @q: the futex_q to queue up on
* @timeout: the prepared hrtimer_sleeper, or null for no timeout
*/
static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
struct hrtimer_sleeper *timeout)
{
// task状态保证在另一个task唤醒它之前被设置,set_current_state利用set_mb()实现
// 设置task状态为TASK_INTERRUPTIBLE CPU只会调度状态为TASK_RUNNING的任务
set_current_state(TASK_INTERRUPTIBLE);
//将q插入到等待队列中然后释放自旋锁
queue_me(q, hb);
//启动定时器
if (timeout) {
hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
if (!hrtimer_active(&timeout->timer))
timeout->task = NULL;
}
/*
* If we have been removed from the hash list, then another task
* has tried to wake us, and we can skip the call to schedule().
*/
if (likely(!plist_node_empty(&q->list))) {
/*
* If the timer has already expired, current will already be
* flagged for rescheduling. Only call schedule if there
* is no timeout, or if it has yet to expire.
*/
// 未设置过期时间 或过期时间还未到期才进行调度
if (!timeout || timeout->task)
//系统重新进行调度,此时CPU会去执行其他任务,当前任务会被阻塞
schedule();
}
// 走到这里说明当前任务被CPU选中执行
__set_current_state(TASK_RUNNING);
}
futex_wait_queue_me
主要做了几件事:
将设置task状态为 TASK_INTERRUPTIBLE
(CPU只会调度状态为TASK_RUNNING的任务)调用 queueme()
将futex_q
插入到等待队列启动定时任务 若未设置过期时间或过期时间未到期 重新调度进程 获的执行资格 设置task状态为 TASK_RUNNING
总结一下futex_wait
的工作机制:futex_wait_setup()
方法会根据futex_q.key
hash找到对应futex_hash_bucket
,并通过spin_lock(futex_hash_bucket.lock)
获取自旋锁;futex_wait_queue_me()
方法先是将task设置为TASK_INTERRUPTIBLE
然后调用queue_me()
将futex_q
入队,然后才spin_unlock(&hb->lock);
释放持有的自旋锁。也就是说检查*uaddr
值和进程/线程挂起放在了一个临界区中,保证了条件和等待之间的原子性。
futex_wake
futex_wake()
唤醒阻塞在*uaddr
上的任务。
/*
* Wake up waiters matching bitset queued on this futex (uaddr).
*/
static int
futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{
struct futex_hash_bucket *hb;
struct futex_q *this, *next;
struct plist_head *head;
union futex_key key = FUTEX_KEY_INIT;
int ret;
if (!bitset)
return -EINVAL;
//根据uaddr的值填充&key的内容
ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key, VERIFY_READ);
if (unlikely(ret != 0))
goto out;
//根据&key获取对应uaddr所在的futex_hash_bucket
hb = hash_futex(&key);
//对该hb加自旋锁
spin_lock(&hb->lock);
head = &hb->chain;
//遍历该hb的链表 注意链表中存储的节点时plist_node类型,而这里的this却是futex_q类型
//这种类型转换是通过c中的container_of机制实现的
plist_for_each_entry_safe(this, next, head, list) {
if (match_futex (&this->key, &key)) {
if (this->pi_state || this->rt_waiter) {
ret = -EINVAL;
break;
}
/* Check if one of the bits is set in both bitsets */
if (!(this->bitset & bitset))
continue;
//唤醒对应进程
wake_futex(this);
if (++ret >= nr_wake)
break;
}
}
//释放自旋锁
spin_unlock(&hb->lock);
put_futex_key(&key);
out:
return ret;
}
关于wake_futex
,调用wake_futex
必须持有futex_hash_bucket
的自旋锁,之后不能访问入参的futex_q
static void wake_futex(struct futex_q *q)
{
struct task_struct *p = q->task;
if (WARN(q->pi_state || q->rt_waiter, "refusing to wake PI futex\n"))
return;
// 在唤醒任务之前将q->lock_ptr设置为NULL
// 如果另一个CPU发生了非futex方式的唤醒,则任务可能退出。p解引用则是一个不存在的task结构体
// 为避免这种case,需要持有p引用来进行唤醒
get_task_struct(p);
//将q出队列
__unqueue_futex(q);
/*
* 只要写入q-> lock_ptr = NULL,等待的任务就可以释放futex_q,而无需获取任何锁。
* 这里需要一个内存屏障,以防止lock_ptr的后续存储超越plist_del。
*/
smp_wmb();
q->lock_ptr = NULL;
// 将TASK_INTERRUPTIBLE | TASK_UNINTERRUPTIBLE状态的task唤醒
wake_up_state(p, TASK_NORMAL);
// task释放futex_q
put_task_struct(p);
}
futex_wake()
流程如下:
根据 *uaddr
填充futex_q->key
,调用hash_futex(&key);
查找对应的futex_hash_bucket
调用 spin_lock(&hb->lock);
尝试获取hb的自旋锁遍历 futex_hash_bucket
挂载的链表,找到uaddr对应的节点调用 wake_futex
唤起等待的进程,其实就是将task设置为TASK_RUNNING
并放入调度队列中等待CPU调度同时释放futex_q
释放自旋锁
此外futex
同步机制即可用于线程之间同步,也可用于进程之间同步。
用于线程比较简单,因为线程共享虚拟内存空间,futex变量由唯一的虚拟地址表示,线程即可用虚拟地址访问futex变量 用于进程稍微复杂一些,因为进程间是分配的独立的虚拟内存地址空间。需要通过 mmap
让进程可以共享一段地址空间来使用futex变量。故而每个进程访问futex的虚拟地址不一样,但是操作系统知道所有这些虚拟地址映射到同一个表示futex变量的物理地址。
Unlock
与Lock
对应操作,帮助唤醒阻塞的协程。
func unlock(l *mutex) {
unlockWithRank(l)
}
//go:nosplit
func unlockWithRank(l *mutex) {
unlock2(l)
}
func unlock2(l *mutex) {
v := atomic.Xchg(key32(&l.key), mutex_unlocked)
if v == mutex_unlocked {
throw("unlock of unlocked lock")
}
if v == mutex_sleeping {
futexwakeup(key32(&l.key), 1)
}
gp := getg()
gp.m.locks--
if gp.m.locks < 0 {
throw("runtime·unlock: lock count")
}
if gp.m.locks == 0 && gp.preempt { // restore the preemption request in case we've cleared it in newstack
gp.stackguard0 = stackPreempt
}
}
其逻辑相比Lock
要更简单一些:
通过 atomic.Xchg
更改mutex.key
值 若为mutex_unlocked
说明当前没有锁占用直接panic;若为 mutex_sleeping
则调用futexwakeup()
唤醒阻塞的任务将当前g绑定的m的locks属性减一 解除禁止抢占
关于futexwakeup()
最终也是调用了futex()
方法进而通过系统调用linux的futex实现。
//src/runtime/os_linux.go
// If any procs are sleeping on addr, wake up at most cnt.
//go:nosplit
func futexwakeup(addr *uint32, cnt uint32) {
ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE_PRIVATE, cnt, nil, nil, 0)
if ret >= 0 {
return
}
// I don't know that futex wakeup can return
// EAGAIN or EINTR, but if it does, it would be
// safe to loop and call futex again.
systemstack(func() {
print("futexwakeup addr=", addr, " returned ", ret, "\n")
})
*(*int32)(unsafe.Pointer(uintptr(0x1006))) = 0x1006
}
具体futexwakeup
的语义内核实现可以参考,futex
的原理分析。
总结
理想状态下,在无冲突的情况下,在用户空间通过自旋锁(yield)即可解决资源竞争同步问题,这也是我们所期盼的毕竟用户态的操作开销相对较小;但是一旦冲突加剧或持有锁的操作过长,自旋锁的方式会让CPU空转浪费宝贵的计算资源;进而需要陷入系统调用将当前线程/进程进行休眠,等锁资源释放再唤醒它们。关于futex
,也有这么个说法
刚刚不是用户态的自旋锁已经解决不了问题了吗,你可能会想我们可以这么玩:
func lock() {
for !tryLock(state) {
wait() //系统调用 进入休眠
}
}
func tryLock(state) bool {
i:=0
for !compareAndSwap(state, 0, 1) {
i++
if i>3 {
return false
}
}
return true
}
func unlock() {
compareAndSet(state,1,0)
wakeup() //系统调用唤醒
}
没错思路很好,但是tryLock
和wait
中间存在一个时间窗口问题。比如,我们在lock()
之后调用wait()
之前调用了unlock()
,这个时候当前线程调用wait()
后就没人管它了。怎么办呢?为了解决这个问题,随即引入了futex
,多数语言的玩法都大同小异。
如有异议 请轻拍 欢迎交流。
如果阅读过程中发现本文存疑或错误的地方,可以关注公众号留言。点赞在看人灿烂😁