Java并发之AQS原理剖析


概述:

AbstractQueuedSynchronizer,可以称为抽象队列同步器。

AQS有独占模式和共享模式两种:

  • 独占模式:

公平锁:

非公平锁:

  • 共享模式:

 

数据结构:

  • 基本属性:
/**
 * 同步等待队列的头结点
 */
private transient volatile Node head;

/**
 * 同步等待队列的尾结点
 */
private transient volatile Node tail;

/**
 * 同步资源状态
 */
private volatile int state;
  • 内部类:
static final class Node {
    /**
     * 标记节点为共享模式
     */
    static final Node SHARED = new Node();
    /**
     * 标记节点为独占模式
     */
    static final Node EXCLUSIVE = null;

    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    /**
     *   CANCELLED:  值为1,表示当前的线程被取消
     *   SIGNAL: 值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
     *   CONDITION:  值为-2,表示当前节点在等待condition,也就是在condition队列中;
     *   PROPAGATE:  值为-3,表示当前场景下后续的acquireShared能够得以执行;
     *   0:  表示当前节点在sync队列中,等待着获取锁。
     *  表示当前节点的状态值
     */
    volatile int waitStatus;

    /**
     * 前置节点
     */
    volatile Node prev;

    /**
     * 后继节点
     */
    volatile Node next;

    /**
     * 节点同步状态的线程
     */
    volatile Thread thread;

    /**
     * 存储condition队列中的后继节点
     */
    Node nextWaiter;

    /**
     * 是否为共享模式
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * 获取前驱结点
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

主要方法解析:

  • tryAcquire/tryAcquireShared(int arg)

  独占/共享模式获取锁;由子类实现,仅仅获取锁,获取锁失败时不进行阻塞排队。

  • tryRelease/tryReleaseShared(int arg)

  独占/共享模式释放锁;由子类实现,仅仅释放锁,释放锁成功不对后继节点进行唤醒操作。

  • acquire/acquireShared(int arg)

  独占/共享模式获取锁,如果线程被中断唤醒,会返回线程中断状态,不会抛异常中止执行操作(忽略中断)。

  • acquireInterruptibly/acquireSharedInterruptibly(int arg)

  独占/共享模式获取锁,线程如果被中断唤醒,则抛出InterruptedException异常(中断即中止)。

  • tryAcquireNanos/tryAcquireSharedNanos(int arg, long nanosTimeout)

  独占/共享时间中断模式获取锁,线程如果被中断唤醒,则抛出InterruptedException异常(中断即中止);如果超出等待时间则返回加锁失败。

  • release/releaseShared(int arg)

  独占/共享模式释放锁。

  • addWaiter(Node mode)

  将给定模式节点进行入队操作。

 1     private Node addWaiter(Node mode) {
 2         // 根据指定模式,新建一个当前节点的对象
 3         Node node = new Node(Thread.currentThread(), mode);
 4         // Try the fast path of enq; backup to full enq on failure
 5         Node pred = tail;
 6         if (pred != null) {
 7             // 将当前节点的前置节点指向之前的尾结点
 8             node.prev = pred;
 9             // 将当前等待的节点设置为尾结点(原子操作)
10             if (compareAndSetTail(pred, node)) {
11                 // 之前尾结点的后继节点设置为当前等待的节点
12                 pred.next = node;
13                 return node;
14             }
15         }
16         enq(node);
17         return node;
18     }
  • enq(final Node node)

  将节点设置为尾结点。注意这里会进行自旋操作,确保节点设置成功。因为等待的线程需要被唤醒操作;如果操作失败,当前节点没有与其他节点没有引用指向关系,一直就不会被唤醒(除非程序代码中断线程)。

 1     private Node enq(final Node node) {
 2         for (;;) {
 3             Node t = tail;
 4             // 判断尾结点是否为空,尾结点初始值是为空
 5             if (t == null) { // Must initialize
 6                 // 尾结点为空,需要初始化
 7                 if (compareAndSetHead(new Node()))
 8                     tail = head;
 9             } else {
10                 // 设置当前节点设置为尾结点
11                 node.prev = t;
12                 if (compareAndSetTail(t, node)) {
13                     t.next = node;
14                     return t;
15                 }
16             }
17         }
18     }
  • acquireQueued(final Node node, int arg)

  已经在队列当中的节点,准备阻塞获取锁。在阻塞前会判断前置节点是否为头结点,如果为头结点;这时会尝试获取下锁(因为这时头结点有可能会释放锁)。

 1     final boolean acquireQueued(final Node node, int arg) {
 2         boolean failed = true;
 3         try {
 4             boolean interrupted = false;
 5             for (;;) {
 6                 // 当前节点的前置节点
 7                 final Node p = node.predecessor();
 8                 // 入队前会先判断下该节点的前置节点是否是头节点(此时头结点有可能会释放锁);然后尝试去抢锁
 9                 // 在非公平锁场景下有可能会抢锁失败,这时候会继续往下执行 阻塞线程
10                 if (p == head && tryAcquire(arg)) {
11                     //如果抢到锁,将头节点后移(也就是将该节点设置为头结点)
12                     setHead(node);
13                     p.next = null; // help GC
14                     failed = false;
15                     return interrupted;
16                 }
17                 // 如果前置节点不是头结点,或者当前节点抢锁失败;通过shouldParkAfterFailedAcquire判断是否应该阻塞
18                 // 当前置节点的状态为SIGNAL=-1,才可以安全被parkAndCheckInterrupt阻塞线程
19                 if (shouldParkAfterFailedAcquire(p, node) &&
20                     parkAndCheckInterrupt())
21                     // 该线程已被中断
22                     interrupted = true;
23             }
24         } finally {
25             if (failed)
26                 cancelAcquire(node);
27         }
28     }
  • shouldParkAfterFailedAcquire(Node pred, Node node)

  检查和更新未能获取锁节点的状态,返回是否可以被安全阻塞。

 1     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2         int ws = pred.waitStatus;   // 获取前置节点的状态
 3         if (ws == Node.SIGNAL)
 4             /*
 5              * 前置节点的状态waitStatus为SIGNAL=-1,当前线程可以安全的阻塞
 6              */
 7             return true;
 8         if (ws > 0) {
 9             /*
10              * 如果前置节点的状态waitStatus>0,即waitStatus为CANCELLED=1(无效节点),需要从同步状态队列中取消等待(移除队列)
11              */
12             do {
13                 node.prev = pred = pred.prev;
14             } while (pred.waitStatus > 0);
15             pred.next = node;
16         } else {
17             /*
18              * 将前置状态的waitStatus修改为SIGNAL=-1,然后当前节点才可以被安全的阻塞
19              */
20             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
21         }
22         return false;
23     }
  • parkAndCheckInterrupt()

  阻塞当前节点,返回当前线程的中断状态。

1     private final boolean parkAndCheckInterrupt() {
2         LockSupport.park(this); //阻塞
3         return Thread.interrupted();
4     }
  • cancelAcquire(Node node)

  取消进行的获取锁操作,在非忽略中断模式下,线程被中断唤醒抛异常时会调用该方法。

 1     //  将当前节点的状态设置为CANCELLED,无效的节点,同时移除队列       
 2     private void cancelAcquire(Node node) {
 3         if (node == null)
 4             return;
 5 
 6         node.thread = null;
 7         Node pred = node.prev;
 8         while (pred.waitStatus > 0)
 9             node.prev = pred = pred.prev;
10 
11         Node predNext = pred.next;
12         node.waitStatus = Node.CANCELLED;
13         if (node == tail && compareAndSetTail(node, pred)) {
14             compareAndSetNext(pred, predNext, null);
15         } else {
16             int ws;
17             if (pred != head &&
18                 ((ws = pred.waitStatus) == Node.SIGNAL ||
19                  (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
20                 pred.thread != null) {
21                 Node next = node.next;
22                 if (next != null && next.waitStatus <= 0)
23                     compareAndSetNext(pred, predNext, next);
24             } else {
25                 unparkSuccessor(node);
26             }
27 
28             node.next = node; // help GC
29         }
30     }    
  • hasQueuedPredecessors()

  判断当前线程是否应该排队。

  1.第一种结果——返回true:(1.1和1.2同时存在,1.2.1和1.2.2有一个存在)

    1.1  h != t为true,说明头结点和尾结点不相等,表示队列中至少有两个不同节点存在,至少有一点不为null。

    1.2  ((s = h.next) == null || s.thread != Thread.currentThread())为true

      1.2.1   (s = h.next) == null为true,表示头结点之后没有后续节点。

      1.2.2   (s = h.next) == null为false,s.thread != Thread.currentThread()为true
          表示头结点之后有后续节点,但是头节点的下一个节点不是当前线程

  2.第二种结果——返回false,无需排队。(2.1和2.2有一个存在)

    2.1  h != t为false,即h == t;表示h和t同时为null或者h和t是同一个节点,无后续节点。

    2.2  h != t为true,((s = h.next) == null || s.thread != Thread.currentThread())为false

      表示队列中至少有两个不同节点存在,同时持有锁的线程为当前线程。

1     public final boolean hasQueuedPredecessors() {
2         Node t = tail; // Read fields in reverse initialization order
3         Node h = head;
4         Node s;
5         return h != t &&
6             ((s = h.next) == null || s.thread != Thread.currentThread());
7     }

 

本站声明:网站内容来源于网络,如有侵权,请联系我们,我们将及时处理。

  • 分享:
评论
还没有评论
    发表评论 说点什么