AQS

来自智得网
跳转至: 导航、​ 搜索

简介

AQS的基本流程

在jdk1.5之前,java提供的并发机制只有 synchronized,但是synchronized提供的功能较为单一,而且效率较为低下。

jdk1.5的并发包(JUC)提供了多种锁以及同步机制,例如CountDownLatch、FutureTask、ReentrantLock、RenntrantReadWriteLock、Semaphore等,其实现的核心类就是AbstractQueuedSynchronizer,一般简称为AQS,该类为不同场景提供了锁实现以及同步机制的基本框架,为同步状态的原子性管理,线程的阻塞以及解除阻塞,等待同步的排队管理提供了通用的机制。

AQS本质是一个同步器,提供获取锁以及释放锁的能力,多线程场景下加锁解锁的获取步骤如下:

需要加锁的线程发起加锁请求,如果此时锁未被占用,则把锁标志为被该线程占用

如果加锁的时候发现锁已经被其他线程占用,则当前线程需要等待,等待的方式分为两种,一种是通过高频自旋检查锁是否释放,一旦锁释放则重新发起加锁请求,另一种是进入阻塞队列等待被唤醒之后再进行抢锁操作。

占用锁的线程完成业务操作之后,释放锁的时候可以将在阻塞队列中的线程唤醒,以便这些线程可以重新发起加锁的请求。

原理

AQS原理

要满足加锁和解锁的功能需要有下列几个组件:

  • 操作锁定状态的原子能力
  • 存储等待线程的阻塞队列
  • 阻塞和唤醒线程的能力

同步状态

AQS内部标识同步状态的字段为state。所有通过AQS实现功能的同步类都是通过修改state的状态来操作同步状态。

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

对于ReentrantLock等独占锁模式,一个锁中只有一个state状态,当该状态为0时,代表锁的状态空闲,当state大于0时,代表有线程获取到了锁。在获取此类独占锁的过程中,线程通过CAS的方式将state的值从0修改为1,其中修改成功的线程代表获取锁成功。

读写锁中的读锁,CountDownLatch等共享锁模式,锁的state代表占用共享锁的线程数量。共享锁同样通过CAS方法修改state的值,CountDownLatch、Semaphore等对共享锁的线程数量存在限制,所以同样需要通过CAS的结果判定是否加锁成功。

当锁释放的时候,也是首先通过CAS修改state的值将锁变更为可以抢占的状态。

阻塞队列

当线程获取锁失败的时候,需要将这些线程放入队列中进行等待。

AQS采用CHL列表实现等待加锁的排队队列,CHL是以Craig、Landin、Hagersten三个人的名字首字母命名的,CHL是一个公平的双向链表,基于list,线程在一个局部变量上自旋,不断轮询前一个节点状态,如果发现前一个节点释放锁结束,则当前节点成为头节点。

AQS将每个获取锁的请求封装位CHL列表的节点,如果只有一个线程加锁而且加锁加成不会初始化CHL同步队列,第二个线程来竞争加锁失败时,会帮头节点初始化一个节点,并将自己的节点挂再头节点后面。CHL的头节点一定表示当前获取锁的线程节点。入队列的时候CHL通过cas自旋的方式保证并发场景下的正确性,将新入节点置为尾节点。

出队列的时候通过自旋判断是否满足释放条件,如果可以的话,将队列的头节点置为下一个节点。

CHL列表的节点包括以下几个属性。

/**
 * 节点的等待状态一个节点可能位于以下几种状态
 * CANCELLED = 1 节点操作因为超时或者对应的线程被interrupt
 * 节点不应该留在此状态一旦达到此状态将从CHL队列中踢出
 * SIGNAL = -1 节点的继任节点是或者将要成为BLOCKED状态
 *例如通过LockSupport.park()操作),因此一个节点一旦被释放解锁或者取消就需要
 * 唤醒LockSupport.unpack()它的继任节点
 * CONDITION = -2表明节点对应的线程因为不满足一个条件Condition而被阻塞
 * 0 正常状态新生的非CONDITION节点都是此状态
 * 非负值标识节点不需要被通知唤醒)。
 * /
volatile int waitStatus;

/**
 * 此节点的前一个节点节点的waitStatus依赖于前一个节点的状态
 * /
volatile Node prev;

/**
 * 此节点的后一个节点后一个节点是否被唤醒uppark()依赖于当前节点是否被释放
 * /
volatile Node next;

/**
 * 节点绑定的线程
 * /
volatile Thread thread;

/**
 * 下一个等待条件Condition的节点由于Condition是独占模式因此这里有一个简单的队列来描述Condition上的线程节点
 * /
Node nextWaiter;

唤醒能力

在锁释放的时候,如果head节点即占有锁线程节点的waitStatus为Signal时候,说明后续节点处于阻塞状态,需要唤醒后续的节点。

在node节点加入队列的时候,会通过自旋锁将前置节点置为signal。

如果head节点的下一个节点状态为取消或者其他异常情况,CLH会从队尾选择一个节点进行唤醒。

唤醒节点之后,不代表被唤醒的线程获得了锁,只是可以进入加锁流程。

实现

公平锁

公平锁的概念是锁的获取可以按照请求的次序顺序获得。默认的加锁方式之所以不公平,是线程在加锁的时候会首先通过CAS抢锁,但此时可能已经有其它线程在队列中排队等待加锁。

非公平锁的效率更高,但是可能会出现饿死的情况,可能会有线程一直不能获取到锁。

所以公平锁加锁的时候只要当前没有线程排队的时候或者当前线程本身就是锁的持有者的时候,才会参与加锁动作,否则直接进入队列等待。

 protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (isFirst(current) &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

如果加锁失败则需要将线程加入等待队列:

private Node addWaiter(Node mode) {
     Node node = new Node(Thread.currentThread(), mode);
     // Try the fast path of enq; backup to full enq on failure
     Node pred = tail;
     if (pred != null) {
         node.prev = pred;
         if (compareAndSetTail(pred, node)) {
             pred.next = node;
             return node;
         }
     }
     enq(node);
     return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            Node h = new Node(); // Dummy header
            h.next = node;
            node.prev = h;
            if (compareAndSetHead(h)) {
                tail = node;
                return h;
            }
        }
        else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

final boolean acquireQueued(final Node node, int arg) {
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } catch (RuntimeException ex) {
        cancelAcquire(node);
        throw ex;
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int s = pred.waitStatus;
    if (s < 0) return true;
    if (s > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
    return false;
}

共享锁

共享锁是指多个线程可以同时获取一个锁,例如读写锁的读锁以及CountDownLatch都属于共享锁。

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}