AQS中的独占模式

Posted by RussXia on June 8, 2018

三、AQS中的独占模式

本文以ReentrantLock的非公平锁为例。上半部分主要讲述了获取锁的过程,下半部分主要讲述释放锁的过程。

ReentrantLock中的加锁操作

ReentrantLock是一个可重入锁!compareAndSetState(0, 1)可以理解为获取锁的操作,如果返回true,则获取锁成功。

下面以ReentrantLock.NonfairSync为例,分析ReentrantLock非公平锁的加锁解锁过程。

final void lock() {
    //直接尝试获取锁,如果获取锁成功,设置锁的占用线程为当前线程。
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    //如果获取锁失败,调用acquire(args)方法,参数为1
    else
        acquire(1);
}
/**
  * acquire由AbstractQueuedSynchronizer提供默认实现
  * 1.重新尝试获取锁,若成功,标记锁被占用的线程为当前线程,返回
  * 2.失败,将自己添加到队列尾部。
  */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

其中final boolean compareAndSetState(int expect, int update)方法,调用的是Unsafe的compareAndSwapInt,保证原子性的。

acquire方法可以分为三步:

  1. 尝试获取锁,如果成功,直接返回–>tryAcquire(arg)
  2. 获取锁失败,将自己添加到队列尾部,addWaiter(Node.EXCLUSIVE)
  3. 如果当前结点的前置节点就是head节点,尝试获取锁,获取失败,执行下一步,成功return
  4. 挂起当前结点

非公平锁的tryAcquire实现

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
/**
  * 1.重新尝试获取锁,如果成功,设置state,标记锁被占用的线程
  * 2.如果锁被当前线程占用,增加重入次数(重入锁,state = currentState + acquires)
  * 3.return false
  * 4.tryAcquire返回false,继续执行acquireQueued方法。
  */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

acquireQueued方法的实现

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)):

  1. addWaiter(Node.EXCLUSIVE), arg):创建一个独占模式的入队节点,入队
  2. 如果当前结点的前置节点就是head节点,尝试获取锁。
  3. acquireQueued(final Node node, int arg):判断是否需要挂起,不需要则重新尝试获取锁,需要的话,挂起等待被唤醒。
 private Node addWaiter(Node mode) {
     //基于当前线程创建新结点
    Node node = new Node(Thread.currentThread(), mode);
    // 直接在队尾添加元素(快速入队)
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        //并发情况下,compareAndSetTail可能会失败,则快速入队失败
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //enqueue,入队操作
    enq(node);
    return node;
}
//死循环,支持多线程并发,实际入队操作使用了CAS,提升效率
//只有入队成功,才会退出当前死循环。
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        //如果队列没有初始化,初始化队列,重新入队。
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //新创建的结点指向队列的尾部。并发下如果CAS操作失败,不断重试
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
//Node是刚刚入队成功的结点
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //如果当前结点的的前置节点就是head,尝试获取锁
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                //获取锁成功,设置当前结点为头结点
                setHead(node);
                //前置节点的next设置为null(此时,前置节点p就没有引用,可以被gc掉了)
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //获取锁失败,判断是否需要挂起,如果需要挂起,则挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //中断标记位设置为true
                interrupted = true;
        }
    } finally {
        //分析获取锁是否失败,如果失败了,取消获取锁。
        if (failed)
            cancelAcquire(node);
    }
}

前面我们分析过Node中waitStatus的几个状态码的含义:

  • SIGNAL: -1,当前结点的后置结点被挂起。所以当前结点释放锁或取消时,当前节点的后继节点必须被唤醒(unpark)
  • CANCELLED: 1,节点因为超时或中断等原因被取消了。一旦处于该状态,其状态就不能被改变,特别是不能被挂起。
  • CONDITION: -2,当前结点在条件队列中。在结点状态设置为0之前,当前结点都不会作为同步队列中的结点被使用。
  • PROPAGATE: -3,共享模式下,当前结点的状态需被无条件的传播下去
  • 0: 以上状态都不是

结合上面的状态码,继续分析判断是否需要挂起和线程挂起部分的源码

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //获取前置节点的waitStatus状态
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        //如果前置结点被挂起,返回true,调用parkAndCheckInterrupt()挂起线程
        return true;
    if (ws > 0) {
        //前置节点被取消了,跳过被取消的结点,知道不是被取消的结点(CANCELLED=1>0,唯一一个大于0的状态)
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //此时,剩余的状态只有0和PROPAGATE(-3)。
        //设置前置节点为Node.SIGNAL,重新进入acquireQueued的for循环
        //(其实就是重新进入shouldParkAfterFailedAcquire方法)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
//调用LockSupport.park(this)方法,挂起当前结点
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

当前结点的前置节点的waitStatus为SIGNAL时,当前结点(对应的线程)应当挂起。 因为其前置结点释放锁时,会通知其随后的结点。(参见SIGNAL状态的含义)。

acquire过程总结

ReentrantLock是一个可重入的互斥锁。互斥锁意味着只能由某一个线程占有锁;可重入意味着可被占有锁的线程重复获取锁。

在ReentrantLock的非公平锁中,在锁时可获取状态时(前一个占有锁的线程完全释放锁),所有的结点都会尝试获取锁。

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

如果获取锁失败,则进行入队操作。 如果当前结点的前置节点就是头结点,重新尝试获取锁。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //入队
    enq(node);
    return node;
}
//竞争失败,挂起当前线程,等待其被前置结点唤醒。
 if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
    interrupted = true;

ReentrantLock中的释放锁操作

释放锁的操作,相较于加锁,还是简单很多的。ReentrantLock中释放锁的主要分为以下几个步骤:

  1. 释放当前的锁(如果当前线程发生重入,一次只释放一个锁)
  2. 如果当前线程完全释放了锁,判断是否有挂起的后继节点,如果有唤醒后继节点
//unlock调用release方法
public void unlock() {
    sync.release(1);
}
//release方法做两部分的工作:1.释放锁,2.唤醒挂起的后继节点
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

释放锁的逻辑如下:

protected final boolean tryRelease(int releases) {
    //释放当前的锁,如果被重入了,这里也只释放这一次的锁
    int c = getState() - releases;
    //如果当前线程不是持有锁的线程,抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //如果释放了当前锁后,当前线程依然拥有锁(重入),设置state为c,返回false
    //如果当前线程不在拥有锁,是指锁的持有线程为null,返回true
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

如果当前线程完全释放了锁,如果其后继节点挂起了,唤醒后继节点。

private void unparkSuccessor(Node node) {
    //如果当前结点状态小于0,设置当前结点为0,
    //如果失败了的话,等待线程在获取锁的时候,会将自身结点设置为head
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    //如果后置结点被取消了或者为null,从队列的尾部向前遍历,直到找到最近的那个需要被唤醒的结点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //执行唤醒操作
    if (s != null)
        LockSupport.unpark(s.thread);
}

ReentrantLock中的公平锁

ReentrantLock的公平锁与非公平锁相比,只是在获取锁的处理上有不同。

//直接调用acquire方法
final void lock() {
    acquire(1);
}

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //hasQueuedPredecessors看名称可知,当前是否存在等待的前置节点
        //如果不存在,cas获取锁。成功后设置锁被当前线程持有,reture true
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果当前线程就是占有锁的线程,重入
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    //获取锁失败
    return false;
}

如果获取锁失败,当前线程会被加入到队列中。当持有锁的线程释放锁时,队列中前面的结点会获取锁(FIFO)。

  • 公平锁(Fair):加锁前检查是否有排队等待的线程,优先排队等待的线程,先来先得
  • 非公平锁(Nonfair):加锁时不考虑排队等待问题,直接尝试获取锁,获取不到自动到队尾等待

ReentrantLock默认的lock()方法采用的是非公平锁;非公平锁的效率大概是共公平锁的5-10倍。