AQS的基本数据结构

Posted by RussXia on June 6, 2018

AQS的基本数据结构

AQS的背景

AbstractQueuedSynchronizer(简称AQS)是整个JUC的基础,大部分的同步器都是基于AQS的。 Doug Lea在设计之初,再设计之初,希望建立一个小框架,AQS类,来为构造这些同步器提供一种通用的机制。

我们平常所熟悉的,Semaphore、CountDownLatch、ReentrantLock、ReentrantReadWriteLock都是基于AQS构建的。

AQS的功能介绍

Synchronizers possess two kinds of methods : at least one
acquire operation that blocks the calling thread unless/until the
synchronization state allows it to proceed, and at least one
release operation that changes synchronization state in a way that
may allow one or more blocked threads to unblock.

同步器一般包含两种方法,acquire、release(名称可能不同)。acquire负责阻塞调用线程,直到同步状态允许其继续执行。 release负责改变同步状态,使一个(独占)或多个(共享)被阻塞的线程继续执行。

/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

AQS提供了两种锁机制:独占锁和共享锁。

AQS的数据结构

AQS的基本数据结构

      +------+  prev +-----+       +-----+
 head | Node | <---- | Node| <---- | Node|  tail
      +------+       +-----+       +-----+

AQS的tail指向由Node组成的双向链表的尾,head指向链表的头部。state表示同步状态,如果为0,表示当前锁可用,如果不为0,表示不可用。(CountDownLatch中设置的count数就是state的初始值。)

/**
  * Head of the wait queue, lazily initialized.  Except for
  * initialization, it is modified only via method setHead.  Note:
  * If head exists, its waitStatus is guaranteed not to be
  * CANCELLED.
  */
private transient volatile Node head;

/**
  * Tail of the wait queue, lazily initialized.  Modified only via
  * method enq to add new wait node.
  */
private transient volatile Node tail;

/**
  * The synchronization state.
  */
private volatile int state;

AQS提供了独占锁模式下的获取锁和释放锁,共享模式下的获取锁和释放锁。

   /**
     * 独占锁模式获取锁
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * 独占锁模式释放锁
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /**
     * 共享锁模式获取锁
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    /**
     * 共享锁模式释放锁
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

Node数据结构

Node是组成双向链表的基本单元,每一个结点都对应着一个线程。其中的waitStatus字段,标识当前结点的状态。下面是Node的源码。

static final class Node {
    /** 共享模式 */
    static final Node SHARED = new Node();
    /** 独占模式 */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    /**
     * Status field, taking on only the values:
     *   SIGNAL:     -1,成功拿到锁,但是当前节点的后继节点必须被唤醒(unpark)
     *   CANCELLED:  1,节点因为超时或中断等原因被取消了。一旦处于该状态,其状态就
     *               不能被改变,特别是不能被阻塞。
     *   CONDITION:  -2,当前结点在条件队列中。在结点状态设置为0之前,当前结点
     *               都不会作为同步队列中的结点被使用。
     *   PROPAGATE:  -3,共享模式下,当前结点的状态需被无条件的传播下去
     *   0:          以上状态都不是
     *
     * 普通的同步模式,初始化状态为0。需被原子更新(CAS)。
     */
    volatile int waitStatus;

    /** 前继结点 */
    volatile Node prev;

    /** 后继节点 */
    volatile Node next;

    /** 在队列上的当前结点对应的线程 */
    volatile Thread thread;

    /**
     * condition队列中下一个等待结点(独占模式)
     * 或者为static final Node SHARED(共享模式)
     */
    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

AQS中acquire和release实现的一般形式

acquire操作的主循环次数依赖于具体实现类中tryAcquire的实现方式。另一方面,在没有“取消”操作的情况下,每一个组件的acquire和release都是一个O(1)的操作,忽略park中发生的所有操作系统线程调度。

支持“取消”操作主要是要在acquire循环里的park返回时检查中断或超时。由超时或中断而被取消等待的线程会设置其节点状态,然后unpark其后继节点。在有“取消”的情况下,判断其前驱节点和后继节点以及重置状态可能需要O(n)的遍历(n是队列的长度)。由于“取消”操作,该线程再也不会被阻塞,节点的链接和状态字段可以被快速重建。

acquire操作

if(!tryAcquire(arg)) {
    node = create and enqueue new node;
    pred = node's effective predecessor;
    while (pred is not head node || !tryAcquire(arg)) {
        if (pred's signal bit is set)
            pard()
        else
            compareAndSet pred's signal bit to true;
        pred = node's effective predecessor;
    }
    head = node;
}

release操作

if(tryRelease(arg) && head node's signal bit is set) {
    compareAndSet head's bit to false;
    unpark head's successor, if one exist
}