不要急,不要急,马上好^_^...

AQS源码分析


首先看一下继承结构

image-20211117100716785

通过上面的图,可以知道,其实除了公共功能,右边所有的类都是对AQS的一个实现,所以了解AQS是我们了解这些方法的基础

结合源码进行工作流程分析

首先为了不让我们在看源码的过程中晕头转向,首先得把握住AQS的核心点:

占锁->失败存储线程->阻塞存储线程->唤醒

  • 首先进入到ReentrantLock中的lock方法

    public void lock() {
        //这里是通过sync进行实现的
        sync.lock();
    }
  • 继续进入lock中,会发现有公平锁非公平锁的实现,这里选择非公平锁进行查看
    image-20211117103306311

  • 这里有lock方法的主要实现

    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    
    //CAS内部
    protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
    • 分析:这里的方法主要是通过CAS判断state是否为0,如果为0,那么就将state更新为1,然后设置这个线程为占锁的线程,如果不为0,那么即占锁失败,这时候就会执行**acquire(1);**这个方法。

    • 补充:CAS里面主要是通过unsafe类进行一个CAS的操作,因为unsafe是一个可以直接操作内存数据的方法,绕过了JMM内存可见性。

  • 进入到acquire(1);方法中

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    分析:这个方法主要做了三个事情

    • 再次尝试获取锁:tryAcquire(arg)
    • 加入到阻塞队列:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
    • 自己中断:selfInterrupt()
    • 下面我们来逐个分析这三个方法到底是怎么实现的
  • 首先进入tryAcquire(arg)方法中

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    //非公平锁对tryAcquire的实现方法
    final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }

    分析:通过tryAcquire(arg)方法可以看到,首先获取state的值,如果state的值为0,说明之前的线程已经释放锁,那么就通过CAS尝试占锁,成功则返回。这里进入第二个判断条件,如果state不等于0,但是当前线程和占锁的线程是同一个线程,那么就证明这是一个可重入的操作(ReentrantLock是一个可重入锁),那么就对state的值+1,完成可重入操作,并且返回。最后如果都不符合,那么就返回false,表示再次获取锁失败。

  • 进入到acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法中

  • 首先来看一下addWaiter(Node.EXCLUSIVE)这个方法是干嘛的

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    分析:这个方法主要就是将当前线程封装成一个节点,然后判断尾节点是否为空,如果不为空,那么就将当前节点通过CAS加入到队尾,然后返回。如果为空,就证明这个队列还是一个空队列,就会进行一个空队列的入队操作enq(node),然后返回。

    • 进入到enq方法中看看

      private Node enq(final Node node) {
          for (;;) {
              Node t = tail;
              if (t == null) { // Must initialize
                  if (compareAndSetHead(new Node()))
                      tail = head;
              } else {
                  node.prev = t;
                  if (compareAndSetTail(t, node)) {
                      t.next = node;
                      return t;
                  }
              }
          }
      }

      分析:这个方法主要通过无限循环进行入列操作,如果队列为空,那么就通过CAS设置为头结点,并且尾节点也指向头节点;如果不为空就通过CAS把节点加到队尾。

  • 了解了addWaiter这个方法回到acquireQueued这个方法中

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    分析:通过无限循环,检查当前节点的前面一个节点是否为头结点,如果是就尝试获取锁,获取成功之后就将当前节点设置为头结点,并且返回false表示当前节点无需阻塞shouldParkAfterFailedAcquire方法就是在阻塞当前线程之前,会检查当前队列中阻塞线程状态是否正常,如果正常那么就通过parkAndCheckInterrupt进行阻塞,并且返回true

    • 进入到shouldParkAfterFailedAcquire(p, node)方法中

      /** waitStatus value to indicate thread has cancelled */
              static final int CANCELLED =  1;
      /** waitStatus value to indicate successor's thread needs unparking */
              static final int SIGNAL    = -1;
      /** waitStatus value to indicate thread is waiting on condition */
              static final int CONDITION = -2;
      /**
      * waitStatus value to indicate the next acquireShared should
      * unconditionally propagate
      */
              static final int PROPAGATE = -3;
      
      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
          int ws = pred.waitStatus;
          if (ws == Node.SIGNAL)
              /*
               * This node has already set status asking a release
               * to signal it, so it can safely park.
               */
              return true;
          if (ws > 0) {
              /*
               * Predecessor was cancelled. Skip over predecessors and
               * indicate retry.
               */
              do {
                  node.prev = pred = pred.prev;
              } while (pred.waitStatus > 0);
              pred.next = node;
          } else {
              /*
               * waitStatus must be 0 or PROPAGATE.  Indicate that we
               * need a signal, but don't park yet.  Caller will need to
               * retry to make sure it cannot acquire before parking.
               */
              compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
          }
          return false;
      }

      分析:这个方法主要就是判断当前节点的前面节点的状态,如果前面节点的状态是SIGNAL那么就说明前面节点是正常的,可以添加到当前节点的后面;如果前面节点的状态大于0,意思就是前面的节点已经放弃等待了,那么继续往前找节点,直到找到为不大于0状态的节点,然后将这个节点的状态设置成-1;如果都不满足,就返回false。

  • 进入到unlock方法中

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    分析:进入到unlock方法中,会执行一个release的方法,这个方法首先执行的是tryRelease尝试解锁,如果释放锁成功,那么就通过unparkSuccessor方法进行唤醒阻塞队列中的线程。

  • 进入到TryRelease方法中

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    分析:首先获取state值减去release初始值,判断当前线程时候与获取到锁的线程一致,不一致抛出异常,一致就判断c的值是否为0,如果为0表示可以释放锁,那么就将占锁线程置为null,设置state的值,返回。

  • 进入到unparkSuccessor(h)方法中

    private void unparkSuccessor(Node node) {
       
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

    分析:首先判断头节点的状态,如果小于0那么就通过CAS将头结点的状态设为0。获取头节点的下一个节点,如果为空或者状态大于0,那么从队列的末尾往前遍历,找到一个节点的状态<= 0的,然后记录这个节点,如果不为空,就通过unpark进行唤醒。

总结

大致的流程就是:首先一个线程进入,如果此时共享资源没有被占用,那么就通过CAS将state设置为1,也就是将共享资源变为占用状态,然后执行对应的操作,最后释放锁。如果此时有线程已经占用了,那么这个线程就会进入阻塞队列,不过进入阻塞队列之前还会再次尝试获取,如果也获取失败了,那么检查阻塞队列中线程的状态,找到线程的状态为SIGNAL,然后添加到这个节点的后面。现在节点进入到了阻塞队列中,如果占锁线程执行完毕调用unlock方法,那么会通过tryrelease方法先进行释放锁操作,释放完成之后,会调用unparkSuccessor里面的LockSupport.unpark方法,进行下一个线程的唤醒。这时候进入到acquireQueued这个方法中的parkAndCheckInterrupt这个方法,检测到当前线程不是阻塞状态,那么就会通过自旋一直尝试获取锁。


文章作者: Mr Hou
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Mr Hou !
希望得到你的建议哦
  目录