首先看一下继承结构
通过上面的图,可以知道,其实除了公共功能,右边所有的类都是对AQS的一个实现,所以了解AQS是我们了解这些方法的基础
结合源码进行工作流程分析
首先为了不让我们在看源码的过程中晕头转向,首先得把握住AQS的核心点:
占锁->失败存储线程->阻塞存储线程->唤醒
首先进入到ReentrantLock中的lock方法
public void lock() { //这里是通过sync进行实现的 sync.lock(); }
继续进入lock中,会发现有公平锁和非公平锁的实现,这里选择非公平锁进行查看
这里有lock方法的主要实现
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } //CAS内部 protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
分析:这里的方法主要是通过CAS判断state是否为0,如果为0,那么就将state更新为1,然后设置这个线程为占锁的线程,如果不为0,那么即占锁失败,这时候就会执行**acquire(1);**这个方法。
补充:CAS里面主要是通过unsafe类进行一个CAS的操作,因为unsafe是一个可以直接操作内存数据的方法,绕过了JMM内存可见性。
进入到acquire(1);方法中
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
分析:这个方法主要做了三个事情
- 再次尝试获取锁:tryAcquire(arg)
- 加入到阻塞队列:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
- 自己中断:selfInterrupt()
- 下面我们来逐个分析这三个方法到底是怎么实现的
首先进入tryAcquire(arg)方法中
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } //非公平锁对tryAcquire的实现方法 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
分析:通过tryAcquire(arg)方法可以看到,首先获取state的值,如果state的值为0,说明之前的线程已经释放锁,那么就通过CAS尝试占锁,成功则返回。这里进入第二个判断条件,如果state不等于0,但是当前线程和占锁的线程是同一个线程,那么就证明这是一个可重入的操作(ReentrantLock是一个可重入锁),那么就对state的值+1,完成可重入操作,并且返回。最后如果都不符合,那么就返回false,表示再次获取锁失败。
进入到acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法中
首先来看一下addWaiter(Node.EXCLUSIVE)这个方法是干嘛的
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
分析:这个方法主要就是将当前线程封装成一个节点,然后判断尾节点是否为空,如果不为空,那么就将当前节点通过CAS加入到队尾,然后返回。如果为空,就证明这个队列还是一个空队列,就会进行一个空队列的入队操作enq(node),然后返回。
进入到enq方法中看看
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
分析:这个方法主要通过无限循环进行入列操作,如果队列为空,那么就通过CAS设置为头结点,并且尾节点也指向头节点;如果不为空就通过CAS把节点加到队尾。
了解了addWaiter这个方法回到acquireQueued这个方法中
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
分析:通过无限循环,检查当前节点的前面一个节点是否为头结点,如果是就尝试获取锁,获取成功之后就将当前节点设置为头结点,并且返回false表示当前节点无需阻塞shouldParkAfterFailedAcquire方法就是在阻塞当前线程之前,会检查当前队列中阻塞线程状态是否正常,如果正常那么就通过parkAndCheckInterrupt进行阻塞,并且返回true
进入到shouldParkAfterFailedAcquire(p, node)方法中
/** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
分析:这个方法主要就是判断当前节点的前面节点的状态,如果前面节点的状态是SIGNAL那么就说明前面节点是正常的,可以添加到当前节点的后面;如果前面节点的状态大于0,意思就是前面的节点已经放弃等待了,那么继续往前找节点,直到找到为不大于0状态的节点,然后将这个节点的状态设置成-1;如果都不满足,就返回false。
进入到unlock方法中
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
分析:进入到unlock方法中,会执行一个release的方法,这个方法首先执行的是tryRelease尝试解锁,如果释放锁成功,那么就通过unparkSuccessor方法进行唤醒阻塞队列中的线程。
进入到TryRelease方法中
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
分析:首先获取state值减去release初始值,判断当前线程时候与获取到锁的线程一致,不一致抛出异常,一致就判断c的值是否为0,如果为0表示可以释放锁,那么就将占锁线程置为null,设置state的值,返回。
进入到unparkSuccessor(h)方法中
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
分析:首先判断头节点的状态,如果小于0那么就通过CAS将头结点的状态设为0。获取头节点的下一个节点,如果为空或者状态大于0,那么从队列的末尾往前遍历,找到一个节点的状态<= 0的,然后记录这个节点,如果不为空,就通过unpark进行唤醒。
总结
大致的流程就是:首先一个线程进入,如果此时共享资源没有被占用,那么就通过CAS将state设置为1,也就是将共享资源变为占用状态,然后执行对应的操作,最后释放锁。如果此时有线程已经占用了,那么这个线程就会进入阻塞队列,不过进入阻塞队列之前还会再次尝试获取,如果也获取失败了,那么检查阻塞队列中线程的状态,找到线程的状态为SIGNAL,然后添加到这个节点的后面。现在节点进入到了阻塞队列中,如果占锁线程执行完毕调用unlock方法,那么会通过tryrelease方法先进行释放锁操作,释放完成之后,会调用unparkSuccessor里面的LockSupport.unpark方法,进行下一个线程的唤醒。这时候进入到acquireQueued这个方法中的parkAndCheckInterrupt这个方法,检测到当前线程不是阻塞状态,那么就会通过自旋一直尝试获取锁。