面试官:从源码角度讲讲ReentrantLock及队列同步器(AQS)

共 34825字,需浏览 70分钟

 ·

2021-08-03 18:49

你知道的越多,不知道的就越多,业余的像一棵小草!

你来,我们一起精进!你不来,我和你的竞争对手一起精进!

编辑:业余草

blog.csdn.net/fuzhongmin05

推荐:https://www.xttblog.com/?p=5178

JDK 中独占锁(排他锁)的实现除了使用关键字 synchronized 外,还可以使用ReentrantLock。虽然在性能上 ReentrantLock 和 synchronized 没有什么大区别,但 ReentrantLock 相比 synchronized 而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景。

ReentrantLock 常常对比着 synchronized 来分析,我们先对比着来看然后再一点一点分析。

  1. synchronized 是独占锁,加锁和解锁的过程自动进行,易于操作,但不够灵活。ReentrantLock 也是独占锁,加锁和解锁的过程需要手动进行,不易操作,但非常灵活。
  2. synchronized 可重入,因为加锁和解锁自动进行,不必担心最后是否释放锁;ReentrantLock 也可重入,但加锁和解锁需要手动进行,且次数需一样,否则其他线程无法获得锁。
  3. synchronized 不可响应中断,一个线程获取不到锁就一直等着;ReentrantLock 可以相应中断。

ReentrantLock 好像比 synchronized 关键字没好太多,我们再去看看 synchronized 所没有的。一个最主要的就是 ReentrantLock 还可以实现公平锁机制。什么叫公平锁呢?也就是在锁上等待时间最长的线程将获得锁的使用权。通俗的理解就是谁排队时间最长谁先执行获取锁

在讲解 ReentrantLock 之前,必须先要了解一个叫 AQS(队列同步器)的东西,这个东西也是并发包的,AQS 就是AbstractQueuedSynchronizer,它是 Java 并发包中的一个核心,ReentrantLock 以及ReentrantReadWriteLock都是基于 AQS 实现的。想要搞明白 ReentrantLock 是如何实现的,就必须先学习AbstractQueuedSynchronizer。本文结合 JDK1.8 的源码先剖析下队列同步器,然后再详解 ReentrantLock 中如何应用队列同步器实现独占锁的。

队列同步器(AQS)

队列同步器 AbstractQueuedSynchronizer,是用来构建锁或者其他同步组件的基础框架,它使用了一个 volatile 修饰的 int 成员变量表示同步状态,通过内置的 FIFO 队列来完成资源获取线程的排队工作。

同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要同步器提供的 3 个方法getState()setState(int newState)compareAndSetState(int expect, int update)来进行操作,因为它们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放方法来供自定义同步组件使用,同步器即可以支持独占式获取同步状态,也可以支持共享式地获取同步状态,这样方便实现不同类型的同步组件(ReentrantLockReentrantReadWriteLockCountDownLatch等)。

AQS 是实现锁(也可以是任何同步组件)的关键:在锁中聚合同步器,利用同步器实现锁的语义。两者的关系:锁是面向使用者的,他定义了使用者与锁交互的接口(比如允许两个线程并行访问),隐藏了实现细节;同步器是面向锁的实现者,它简化了锁的实现方式,屏蔽了同步管理状态、线程的排队、等待与唤醒等底层操作。

队列同步器的接口及模板方法

同步器基于模板设计模式实现的,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义的同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法会调用使用者重写的方法。

重写同步器指定方法时需要使用同步器提供的如下三个方法来访问或修改同步状态:

  • getState():获取当前同步状态
  • setState(int new State):设置当前同步状态
  • compareAndState(int expect,int update):使用 CAS 设置当前状态,该方法能够保证状态设置的原子性。

同步器可重写的方法:

实现自定义同步组件时,将会调用AbstractQueuedSynchronizer自身已经写好的模板方法,这些模板方法与描述:

同步器可重写的方法

注:模板方法基本分为三类:独占式同步状态获取与释放、共享式同步状态获取与释放和查询同步队列中等待线程情况。

AbstractQueuedSynchronizer要求子类必须覆写的方法如下,之所以要求子类重写这些方法,是为了让使用者可以在其中加入自己的判断逻辑。

同步器提供的模版方法

AQS 中提供了很多关于锁的实现方法

  • getState():获取锁的标志 state 值
  • setState():设置锁的标志 state 值
  • tryAcquire(int):独占方式获取锁。尝试获取资源,成功则返回 true,失败则返回 false
  • tryRelease(int):独占方式释放锁。尝试释放资源,成功则返回 true,失败则返回 false

队列同步器数据结构

同步器依赖于内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构成一个节点(Node)并将其加入同步队列,同时阻塞当前线程,当同步状态释放时,会将首节点中的线程唤醒,使其再次尝试获取同步状态。

AbstractQueuedSynchronizer类中,Node 类是静态内部类,其源码如下:

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */

    static final int PROPAGATE = -3;
    //等待状态
    volatile int waitStatus;
    //指向前一个结点的指针
    volatile Node prev;
    //指向后一个结点的指针
    volatile Node next;
    //当前结点代表的线程
    volatile Thread thread;
    //等待队列中的后继节点,如果当前节点是共享的,那么这个字段将是SHARED常量,即节点类型(独占和共享)和等待队列中的后继节点共用同一个字段
    Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     */

    final boolean isShared() {
        return nextWaiter == SHARED;
    }


    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

同时,在AbstractQueuedSynchronizer类中,又单独定义了队列头结点、尾结点、同步状态变量。

//指向队列头结点
private transient volatile Node head;
//指向队列尾结点
private transient volatile Node tail;
//同步状态变量
private volatile int state;
protected final int getState() {
    return state;
}
protected final void setState(int newState) {
    state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

同步队列的结构如图:

同步队列的结构

为了接下来能够更好的理解加锁和解锁过程的源码,对该同步队列的特性进行简单的说明:

  1. 同步队列是个先进先出(FIFO)队列,获取锁失败的线程将构造结点并加入队列的尾部,加入队列的过程必须保证线程安全,为什么必须保证线程安全?因为要面对同时有多条线程没有获取到同步状态要加入同步队列尾部的情况;
  2. 队列首结点是获取同步状态成功的线程节点;
  3. 前驱结点线程释放锁后将尝试唤醒后继结点中处于阻塞状态的线程

同步器将节点加入同步队列的过程:

同步器将节点加入同步队列的过程

同步队列遵循 FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为新的首节点。

同步队列遵循 FIFO

注:设置首节点是通过由已经获取到了同步状态的线程来完成的,由于只有一个线程能够获取到同步状态,因此设置头节点的方法并不需要 CAS 来保障,它只需要让head 指针指向原首节点的后继节点并断开原首节点的 next 引用即可。

队列同步器提供的独占式同步状态获取方法

通过AbstractQueuedSynchronizer类提供的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是说由于线程获取同步状态失败后进入同步队列中,后继对线程进行中断操作时,线程不会从同步队列移除。acquire 方法:

public final void acquire(int arg) {
    //tryAcquie()方法具体要交给子类去实现,AbstractQueuedSynchronizer类中不实现
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

上述代码中完成了同步状态的获取、节点构造、加入同步队列以及同步队列中自旋等待的相关工作。

首先调用自定义同步器(AbstractQueuedSynchronizer的子类)实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到就阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。

节点的构造以及加入同步队列依靠于 addWaiter 和 enq 方法:

private Node addWaiter(Node mode) {
 首先创建一个新节点,并将当前线程实例封装在内部,mode这里为null
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    ///入队的逻辑
    enq(node);
    return node;
}

/**
 * 队列不空时向尾部添加结点的逻辑在enq(node)方法中也有,之所以会有这部分“重复代码”是对某些特殊情况进行提前处理,牺牲一定的代码可读性换取性能提升。
 */

private Node enq(final Node node) {
    for (;;) {
     //t指向当前队列的最后一个节点,队列为空则为null
        Node t = tail;
        //队列为空
        if (t == null) { 
        //此时链表没有节点,需要初始化让head跟tail都指向一个哨兵节点
        //构造新结点,CAS方式设置为队列首元素,当head==null时更新成功
            if (compareAndSetHead(new Node())) 
                tail = head;//尾指针指向首结点
        } else {  //队列不为空
            node.prev = t;
            if (compareAndSetTail(t, node)) { //CAS将尾指针指向当前结点,当t(原来的尾指针)==tail(当前真实的尾指针)时执行成功
                t.next = node;    //原尾结点的next指针指向当前结点
                return t;
            }
        }
    }
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //死循环,正常情况下线程只有获得锁才能跳出循环
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
             //将当前结点设置为队列头结点
                setHead(node);
                p.next = null// help GC
                failed = false;
                //正常情况下死循环唯一的出口
                return interrupted;
            }
            //判断是否要阻塞当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

看完enq(final Node node)方法后,发现事实上 AQS 队列的头节点其实是个哨兵节点。

enq(final Node node)中,同步器通过死循环的方式来确保节点的添加,在死循环中只有通过 CAS 将当前节点设置为尾节点之后,当前线程才能从该方法返回,否则的话当前线程不断地尝试设置。enq(final Node node)方法将并发添加节点的请求通过 CAS 变得“串行化”了。循环加 CAS 操作是实现乐观锁的标准方式,CAS 是为了实现原子操作而出现的,所谓的原子操作指操作执行期间,不会受其他线程的干扰。Java 实现的 CAS 是调用 unsafe 类提供的方法,底层是调用 C++ 方法,直接操作内存,在 CPU 层面加锁,直接对内存进行操作。

来看shouldParkAfterFailedAcquire(Node pred, Node node),从方法名上我们可以大概猜出这是判断是否要阻塞当前线程的,源码如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) //状态为SIGNAL

        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */

        return true;
    if (ws > 0) { //状态为CANCELLED,
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */

        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else { //状态为初始化状态(ReentrentLock语境下)
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */

        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

可以看到针对前驱结点 pred 的状态会进行不同的处理:

  1. pred 状态为 SIGNAL,则返回 true,表示要阻塞当前线程;
  2. pred 状态为 CANCELLED,则一直往队列头部回溯直到找到一个状态不为CANCELLED 的结点,将当前节点 node 挂在这个结点的后面;
  3. pred 的状态为初始化状态,此时通过 CAS 操作将 pred 的状态改为 SIGNAL

其实这个方法的含义很简单,就是确保当前结点的前驱结点的状态为 SIGNAL,SIGNAL 意味着线程释放锁后会唤醒后面阻塞的线程。毕竟,只有确保能够被唤醒,当前线程才能放心的阻塞。

要注意只有在前驱结点已经是 SIGNAL 状态后才会执行后面的方法立即阻塞,对应上面的第一种情况。其他两种情况则因为返回 false 而重新执行一遍

acquireQueued()方法的源码表明节点在进入队列后,就进入了一个自旋状态,每个节点(或者说每个线程),都在自省观察,当条件满足,获取到同步状态,就可以从这个自旋过程中退出,否则依旧留在自旋过程中,

这个过程如下图所示。

自旋状态

acquireQueued(final Node node, int arg)方法中,线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点时才能够尝试获取同步状态,原因如下:

  1. 头节点是成功获取到同步状态的节点,而头节点线程获取到同步状态后,将会唤醒其后继节点,后继节点的线程在被唤醒后需要检查自己的前驱节点是否是头节点;
  2. 维护同步队列的 FIFO 原则

可以看到节点与及节点之间在循环检查的过程中基本上不相互通信,而是简单地判断自己的前驱是否为头节点,这样就使得节点的释放符合 FIFO,并且对于方便对过早通知进行处理。

独占式同步状态获取流程如下图,也就是acquire(int arg)方法的执行流程:

独占式同步状态获取流程

当同步状态获取成功,当前线程从acquire(int arg)方法返回,这也就代表着当前线程获得了锁。

队列同步器提供的独占式同步状态释放方法

释放同步状态,通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释放了同步状态后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)。解锁的源码相对简单,代码如下:

public final boolean release(int arg) {
    //tryRelease()方法具体要交给子类去实现,AbstractQueuedSynchronizer类中不实现
    if (tryRelease(arg)) {
        Node h = head;
        //当前队列不为空且头结点状态不为初始化状态(0)
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); //唤醒同步队列中被阻塞的线程
        return true;
    }
    return false;
}

若当前线程已经完全释放锁,即锁可被其他线程使用,则还应该唤醒后续等待线程。不过在此之前需要进行两个条件的判断:

  • h != null是为了防止队列为空,即没有任何线程处于等待队列中,那么也就不需要进行唤醒的操作;
  • h.waitStatus != 0是为了防止队列中虽有线程,但该线程还未阻塞,由前面的分析知,线程在阻塞自己前必须设置其前驱结点的状态为 SIGNAL,否则它不会阻塞自己;

该方法执行时,会唤醒头节点的后继节点线程,unparkSuccerssor(Node node)方法使用 LcokSupport 来唤醒处于等待(阻塞)状态的线程。

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        //从尾部向头部遍历
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

一般情况下只要唤醒后继结点的线程就行了,但是后继结点可能已经取消等待,所以从队列尾部往前回溯,找到离头结点最近的正常结点,并唤醒其线程。

独占式同步状态释放流程如下图,也就是release(int arg)方法的执行流程:

独占式同步状态释放流程

独占式同步状态获取和释放:

  • 在获取同步状态时,同步器会维持一个同步队列,获取失败的线程都会被加入到同步队列中,并在同步队列中自旋(判断自己前驱节点为头节点)。
  • 移出队列(停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。

共享式同步状态获取与释放

共享式获取与独占式获取的区别:同一时刻能否有多个线程同时获取到同步状态。

以文件的读写为例:

  1. 如果有一个程序在读文件,那么这一时刻的写操作均被阻塞,而读操作能够同时进行。
  2. 如果有一个程序在写文件,那么这一时刻不管是其他的写操作还是读操作,均被阻塞。
  3. 写操作要求对资源的独占式访问,而读操作可以是共享式访问。

调用同步器的acquireShared()模板方法,可以实现共享式获取同步状态。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    //当前节点加入同步队列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //取当前节点的前驱节点
            final Node p = node.predecessor();
            //前驱节点是头结点,就继续尝试获取同步状态
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null// help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            //用LockSupport的park方法把当前线程阻塞住
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  • 当前线程首先调用tryAcquireShared()这个被子类重写的方法,共享式的获取同步状态。如果返回值大于等于 0,表示获取成功并返回。
  • 如果返回值小于0表示获取失败,调用 doAcquireShared() 方法,让线程进入自旋状态。
  • 自旋过程中,如果当前节点的前驱节点是头结点,且调用tryAcquireShared()方法返回值大于等于 0,则退出自旋。否则,继续进行自旋。
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

首先去尝试释放资源tryReleaseShared(arg),如果释放成功了,就代表有资源空闲出来,那么就用 doReleaseShared() 去唤醒后续结点。

ReentrantLock 对队列同步器的应用

syncronized 关键字隐式的支持重进入,比如 syncronized 修饰一个递归方法,在方法执行时,执行线程在获取了锁之后仍能连续多次地获取该锁。

ReentrantLock 虽然没能像 synchronized 关键字一样支持隐式的重进入,但是在调用 lock() 方法时,已经获取了锁的线程,能够再次调用 lock() 方法而不被阻塞。要实现可重入的特性,就要解决以下两个问题:

  • 线程再次获取锁,锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取
  • 锁的最终释放,线程重复 n 次获取了锁,随后在第 n 次释放该锁后,其他线程能够获取到锁。锁的最终释放要求锁对于获取进行自增,对于释放进行自减,当计数等于0时表示锁已经成功释放。

ReentrantLock 通过组合自定义队列同步器来实现锁的可重入式获取与释放。

ReentrantLock 的类图如下,可以看出 ReentrantLock 实现 Lock 接口,Sync 与 ReentrantLock 是组合关系,且 FairSync(公平锁)、NonfairySync(非公平锁)是 Sync 的子类。

FairSync、NonfairySync、Sync

公平锁是 FairSync,非公平锁是 NonfairSync。而不管是 FairSync 还是 NonfariSync,都间接继承自 AbstractQueuedSynchronizer 这个抽象类,如下图所示

NonfariSync 的类图

ReentrantLock 非公平模式获取及释放锁

ReentrantLock 非公平模式下的获取锁的代码如下:

//实现Lock接口的lock方法,调用本方法当前线程获取锁,拿锁成功后就返回
public void lock() {
 //非公平模式下,sync指向的对象类型是NonfairSync
    sync.lock();
}
//实现Lock接口的tryLock方法,尝试非阻塞的获取锁,调用本方法后立刻返回,如果能获取到锁则返回true,否则返回false
public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

静态内部类 NonfairSync 的源码如下:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */

    final void lock() {
     //首先CAS尝试下获取锁,先假设每次lock都是非重入
        if (compareAndSetState(01))
            setExclusiveOwnerThread(Thread.currentThread());
        else
         //这里调用的是父类AbstractQueuedSynchronizer的acquire()方法,
         //而acquire()方法中又要调用交由子类去实现的tryAcquiretryAcquire()方法
         //所以会调到下面NonfairSync类的tryAcquire()方法
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

nonfairTryAcquire()方法的源码如下:

//本方法写在Sync类中,而不是FairSync类中
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
     //如果当前锁闲置,就CAS尝试下获取锁
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0// overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

nonfairTryAcquire()方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为已经获取了锁的线程来决定获取操作是否成功,如果是则将同步状态值增加并返回 true,表示获取同步状态成功。

ReentrantLock 类的 unlock 方法:

public void unlock() {
 //这里调用的是父类AbstractQueuedSynchronizer的release()方法,
 //而release()方法中又要调用交由子类去实现的tryRelease()方法
 //所以会调到Sync类的tryRelease()方法
    sync.release(1);
}

由于公平锁与非公平锁的差异主要体现在获取锁上,因此 tryAcquire() 方法由NonfairSync 类与 FairSync 类分别去实现,而无论是公平锁还是非公平锁,锁的释放过程都是一样的,因此 tryRelease() 方法由 Sync 类来实现。源码如下:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

如果该锁被获取了 N 次,那么前 (N-1) 次tryRelease(int releases)方法必须返回 false,而只有同步状态完全释放了,才能返回 true。

ReentrantLock 公平模式获取及释放锁

公平锁模式下,对锁的获取有严格的条件限制。在同步队列有线程等待的情况下,所有线程在获取锁前必须先加入同步队列,队列中的线程按加入队列的先后次序获得锁。

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        //这里调用的是父类AbstractQueuedSynchronizer的acquire()方法
        //父类的acquire()方法中会调用tryAcquire()方法,该方法由子类去实现
        //即最终会调用到FairSync实现的tryAcquire()方法
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
         //在真正CAS获取锁之前加了hasQueuedPredecessors()方法
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            //溢出了
            if (nextc < 0
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

hasQueuedPredecessors()方法在AbstractQueuedSynchronizer类中(模板方法模式的典型应用,所有公共的方法全部都由AbstractQueuedSynchronizer写好,只有个性化逻辑才下沉给子类去实现),源码如下:

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

从方法名我们就可知道这是判断队列中是否有优先级更高的等待线程,队列中哪个线程优先级最高?由于头结点是当前获取锁的线程,队列中的第二个结点代表的线程优先级最高。那么只要判断队列中第二个结点是否存在以及这个结点是否代表当前线程就行了。这里分了两种情况进行探讨:

  • 第二个结点已经完全插入,但是这个结点是否就是当前线程所在结点还未知,所以通过s.thread != Thread.currentThread()进行判断,如果为 true,说明第二个结点代表其他线程。

  • 第二个结点并未完全插入,我们知道结点入队一共分三步:

    1. 待插入结点的 pre 指针指向原尾结点;
    2. CAS 更新尾指针;
    3. 原尾结点的 next 指针指向新插入结点。所以(s = h.next) == null就是用来判断 2 刚执行成功但还未执行 3 这种情况的。这种情况第二个结点必然属于其他线程。

当前有优先级更高的线程在队列中等待时,那么当前线程将不会执行 CAS 操作去获取锁,保证了线程获取锁的顺序与加入同步队列的顺序一致,很好的保证了公平性,但也增加了获取锁的成本。

基于 FIFO 的同步队列是怎样实现非公平抢占锁的

由 FIFO 队列的特性知,先加入同步队列等待的线程会比后加入的线程更靠近队列的头部,那么它将比后者更早的被唤醒,它也就能更早的得到锁。从这个意义上,对于在同步队列中等待的线程而言,它们获得锁的顺序和加入同步队列的顺序一致,这显然是一种公平模式。然而,线程并非只有在加入队列后才有机会获得锁,哪怕同步队列中已有线程在等待,非公平锁的不公平之处就在于此。回看下非公平锁的加锁流程,线程在进入同步队列等待之前有两次抢占锁的机会:

  • 第一次是非重入式的获取锁,只有在当前锁未被任何线程占有(包括自身)时才能成功;
  • 第二次是在进入同步队列前,包含所有情况的获取锁的方式

只有这两次获取锁都失败后,线程才会构造结点并加入同步队列等待。而线程释放锁时是先释放锁(修改 state 值),然后才唤醒后继结点的线程的。试想下这种情况,线程 A 已经释放锁,但还没来得及唤醒后继线程 C,而这时另一个线程 B 刚好尝试获取锁,此时锁恰好不被任何线程持有,它将成功获取锁而不用加入队列等待。线程 C 此后才被唤醒尝试获取锁,而此时锁已经被线程B抢占,故而其获取失败并继续在队列中等待。

如果以线程第一次尝试获取锁到最后成功获取锁的次序来看,非公平锁确实很不公平。因为在队列中等待很久的线程相比还未进入队列等待的线程并没有优先权,甚至竞争也处于劣势:在队列中的等待的线程要等待前驱结点线程的唤醒,在获取锁之前还要检查自己的前驱结点是否为头结点。在锁竞争激烈的情况下,在队列中等待的线程可能迟迟竞争不到锁。这也就非公平在高并发情况下会出现的饥饿问题。

为什么非公平锁性能好

非公平锁对锁的竞争是抢占式的(对于已经处于等待队列中线程除外),线程在进入等待队列之前可以进行两次尝试,这大大增加了获取锁的机会。这种好处体现在两个方面:

  • 线程不必加入等待队列就可以获得锁,不仅免去了构造结点并加入队列的繁琐操作,同时也节省了线程阻塞唤醒的开销,线程阻塞和唤醒涉及到线程上下文的切换和操作系统的系统调用,是非常耗时的。在高并发情况下,如果线程持有锁的时间非常短,短到线程入队阻塞的过程超过线程持有并释放锁的时间开销,那么这种抢占式特性对并发性能的提升会更加明显;
  • 减少 CAS 竞争。如果线程必须要加入阻塞队列才能获取锁,那入队时 CAS 竞争将变得异常激烈,CAS 操作虽然不会导致失败线程挂起,但不断失败自旋导致的对 CPU 的浪费也不能忽视。

AQS 在其他同步工具上的应用

在 ReentrantLock 的自定义同步器实现中,同步状态表示锁被一个线程重复获取的次数。除了 ReentrantLock,AQS 也被大量应用在其他同步工具上。

ReentrantReadWriteLockReentrantReadWriteLock类使用 AQS 同步状态中的低 16 位来保存写锁持有的次数,高16位用来保存读锁的持有次数。由于 WriteLock 也是一种独占锁,因此其构建方式同 ReentrantLock。ReadLock 则通过使用 acquireShared 方法来支持同时允许多个读线程。

AQS 同步状态

Semaphore:Semaphore类(信号量)使用 AQS 同步状态来保存信号量的当前计数。它里面定义的 acquireShared 方法会减少计数,或当计数为非正值时阻塞线程;tryRelease 方法会增加计数,在计数为正值时还要解除线程的阻塞。

CountDownLatch:CountDownLatch 类使用 AQS 同步状态来表示计数。当该计数为 0 时,所有的 acquire 操作(对应到CountDownLatch中就是 await 方法)才能通过。

FutureTask:FutureTask 类使用 AQS 同步状态来表示某个异步计算任务的运行状态(初始化、运行中、被取消和完成)。设置(FutureTask 的 set 方法)或取消(FutureTask 的 cancel 方法)一个 FutureTask 时会调用 AQS 的 release 操作,等待计算结果的线程的阻塞解除是通过 AQS 的 acquire 操作实现的。

SynchronousQueues:SynchronousQueues 类使用了内部的等待节点,这些节点可以用于协调生产者和消费者。同时,它使用 AQS 同步状态来控制当某个消费者消费当前一项时,允许一个生产者继续生产,反之亦然。

浏览 33
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报