Java进阶之并发编程(三)AQS详解
一、什么是JUC
JUC指的是Java并发编程工具包(Java Concurrency Utilities),它提供了一系列的工具类和接口,用于简化并发编程的开发。虽然Java语言中已经提供了synchronized关键字来支持多线程并发编程,但是JUC提供了更加灵活和高效的实现方式,以满足更加复杂的并发编程需求。
具体来说,JUC提供了以下几个重要的类和接口:
- Lock和Condition:Lock和Condition是替代synchronized关键字的工具,它们提供了更加灵活和可控的线程同步机制,使得在高并发情况下能够更加高效地协调线程之间的交互。
- Semaphore:Semaphore是一种计数信号量,它可以用来控制同时访问某个资源的线程数量。
- CountDownLatch:CountDownLatch是一种倒计数器,它可以让一个线程等待多个其他线程执行完毕后再继续执行。
- CyclicBarrier:CyclicBarrier也是一种倒计数器,它可以让多个线程相互等待,直到所有线程都到达某个屏障点后再一起继续执行。
- Executor和ExecutorService:Executor和ExecutorService是线程池的实现,它们可以有效地管理和控制线程的数量,从而避免创建过多的线程导致系统资源浪费和性能下降。
综上所述,虽然synchronized关键字可以实现基本的线程同步和互斥,但在高并发情况下,JUC提供的工具类和接口能够更加灵活和高效地协调线程之间的交互,从而提高程序的性能和可靠性。
在正式介绍JUC之前,先思考一个问题,那就是如果让我们自己实现一个类似sychronized的锁机制,我们要如何去设计呢?
锁设计猜想
一定会设计到锁的抢占 , 需要有一个标记来实现互斥。 全局变量(0,1)
抢占到了锁,怎么处理(不需要处理.)
没抢占到锁,怎么处理
- 需要等待(让处于排队中的线程,如果没有抢占到锁,则直接先阻塞->释放CPU资源)。
- 如何让线程等待?
- wait/notify(线程通信的机制,无法指定唤醒某个线程)
- LockSupport.park/unpark(阻塞一个指定的线程,唤醒一个指定的线程) Condition
- 如何让线程等待?
- 需要排队(允许有N个线程被阻塞,此时线程处于活跃状态)。
- 通过一个数据结构,把这N个排队的线程存储起来。
- 需要等待(让处于排队中的线程,如果没有抢占到锁,则直接先阻塞->释放CPU资源)。
抢占到锁的释放过程,如何处理
- LockSupport.unpark() -> 唤醒处于队列中的指定线程.
锁抢占的公平性(是否允许插队)
公平
非公平
在JUC中,AQS(AbstractQueuedSynchronizer)即是实现上述过程的一个框架类
二、AQS
AQS(AbstractQueuedSynchronizer)是一个用于构建锁、同步器等并发组件的框架
它是Java并发包(java.util.concurrent)的核心组件之一。
首先介绍一下AQS中用到的一些类与变量
1.内部类—Node
Node作为等待队列节点类,在线程竞争锁失败后,AQS会将线程封装成一个Node节点,放入等待队列当中,自旋等待,自旋竞争再次失败后就会进入等待状态,等待被唤醒再次竞争锁
Node类包含以下主要字段:
- prev:指向前一个节点的指针。
- next:指向后一个节点的指针。
- thread:持有该节点的线程对象。
- waitStatus:用于表示线程的状态,包括取消、阻塞、等待等。
- nextWaiter:用于在等待队列中链接不同条件的线程。
1 | +------+ prev +-----+ +-----+ |
Node类源码
1 | static final class Node { |
2.变量
1 | /** |
同步队列是AQS中一个重要的概念,用于实现线程的阻塞和唤醒,以及线程的竞争获取同步状态。
AQS中包含了三种队列概念:等待队列、条件队列和同步队列。
- 等待队列(Wait Queue):等待队列是AQS中用于存放被阻塞的线程的数据结构。当一个线程需要获取同步状态,但是当前同步状态已经被其他线程占用时,该线程会被封装成一个Node节点并加入到等待队列中。等待队列是一个FIFO队列,可以保证等待时间最长的线程先被唤醒。
- 条件队列(Condition Queue):条件队列是基于等待队列实现的,用于支持条件变量的功能。当一个线程需要等待一个条件变量时,它会被封装成一个Node节点并加入到条件队列中,而不是等待队列中。当满足条件时,条件队列中的线程会被转移至等待队列中等待获取同步状态。
- 同步队列(Sync Queue):同步队列是AQS中存放已经获取到同步状态的线程(即通过acquire获取成功)的数据结构。当一个线程获取到同步状态后,它会从等待队列中转移到同步队列中,并且会释放之前占有的同步状态。同步队列的管理是通过head和tail指针实现的,head指向同步队列的第一个节点,tail指向同步队列的最后一个节点。
此外,还需要注意
exclusiveOwnerThread这个变量,它来自于AQS的抽象父类AbstractOwnableSynchronizer,该类就两个方法,分别是设置和获取该变量exclusiveOwnerThread
3.方法
AQS中的几个重要方法如下:
- acquire(int arg):尝试获取独占锁,如果获取失败,则将当前线程加入同步队列并进行自旋或者阻塞,直到获取成功或者被中断。
- acquireShared(int arg):尝试获取共享锁,如果获取失败,则将当前线程加入同步队列并进行自旋或者阻塞,直到获取成功或者被中断。
- release(int arg):释放独占锁。
- releaseShared(int arg):释放共享锁。
- tryAcquire(int arg):尝试获取独占锁,如果获取成功,则返回true,否则返回false。
- tryAcquireShared(int arg):尝试获取共享锁,如果获取成功,则返回一个大于等于0的值,表示获取共享锁的线程数,否则返回负数。
- tryRelease(int arg):尝试释放独占锁,如果成功则返回true,否则返回false。
- tryReleaseShared(int arg):尝试释放共享锁,如果成功则返回true,否则返回false。
- acquireInterruptibly(int arg):尝试获取独占锁,如果获取失败,则将当前线程加入同步队列并进行自旋或者阻塞,直到获取成功或者被中断。
这些方法是AQS中的核心方法,用于实现同步和互斥。
acquire()、acquireShared()、release()、releaseShared() 是AQS中最常用的同步方法
tryAcquire()、tryAcquireShared()、tryRelease()、tryReleaseShared() 则是尝试获取/释放同步状态,这些方法一般是被重载后使用。
acquireInterruptibly() 方法也是尝试获取独占锁,不过它会响应中断。
1)compareAndSetState
AQS中通过state变量来标识同步状态,为了保证state参数修改的可见性、原子性,在AQS当中使用CAS机制来进行state的修改
1 | protected final boolean compareAndSetState(int expect, int update) { |
2)acquire
尝试获取独占锁,如果获取失败,则将当前线程加入同步队列并进行自旋或者阻塞,直到获取成功或者被中断。
1 | public final void acquire(int arg) { |
其中,tryAcquire 是 AQS 中一个抽象方法,需要用户自定义实现。在 AQS 中,同步状态的获取和释放都是通过 tryAcquire 和 tryRelease 方法实现的,因此用户可以根据自己的需求来定义同步状态的获取和释放逻辑。
addWaiter 方法是 AQS 中的一个辅助方法,用于将一个新的 Node 节点加入到等待队列中,并返回这个新节点。
1 | private Node addWaiter(Node mode) { |
在这里,AQS中的enq方法用于将线程加入到等待队列中,实现方式是通过CAS(compare-and-swap)操作将节点插入到队尾,保证线程的插入是原子性的。
1 | private Node enq(final Node node) { |
acquireQueued传入addWaiter 方法返回的节点,一丝是在线程被包装成节点入队之后,还会尝试调用自旋来获取锁,步骤如下:
- 尝试获取锁,如果成功则直接返回。
- 如果获取锁失败,则线程会进入自旋状态,不断地检查前驱节点的状态是否为 SIGNAL。
- 如果前驱节点状态为 SIGNAL,说明当前线程可以尝试获取同步状态了,于是调用tryAcquire方法再次尝试获取锁。
- 如果获取锁成功,则当前线程会从等待队列中移除,并返回。
- 如果tryAcquire方法返回false,则当前线程会继续自旋等待前驱节点唤醒自己。
acquireQueued方法中的自旋是在等待前驱节点释放锁的过程中进行的,如果等待时间过长,一般是由于前驱节点无法释放锁,这时会进入阻塞状态。在进入阻塞状态前,会将自己的节点状态设置为WAITING,并通过LockSupport.park()方法挂起线程,等待前驱节点的唤醒。
1 | // node表示当前来抢占锁的线程 |
3)acquireShared
尝试获取共享锁,如果获取失败,则将当前线程加入同步队列并进行自旋或者阻塞,直到获取成功或者被中断。
1 | public final void acquireShared(int arg) { |
4)release
释放独占锁
1 | public final boolean release(int arg) { |
其中,tryRelease交给子类实现
4.小结
AQS是一个用于构建同步器的框架,它为子类提供了许多方法,使得子类仅需要重写部分方法就可以方便的实现同步功能,下面是AQS为子类提供的一些公共功能
- 获取/释放锁:AQS提供了acquire()和release()两个方法,这两个方法是获取锁和释放锁的基础。
- 等待队列的操作:AQS提供了许多操作等待队列的方法,例如enq()、deq()、transferForSignal()等等。这些方法让子类能够方便地实现等待队列的管理。
- 条件队列的操作:AQS还提供了一些操作条件队列的方法,例如addConditionWaiter()、transferAfterCancelledWait()等等。这些方法可以让子类方便地实现条件队列的管理。
- 重入锁的实现:AQS还提供了一些方法,可以方便地实现重入锁。例如tryAcquire()、tryRelease()等等。
- 共享锁的实现:AQS提供了一些方法,可以方便地实现共享锁。例如tryAcquireShared()、tryReleaseShared()等等。
- 线程的中断处理:AQS提供了interruptMode()和clearInterruptsForReentry()两个方法,可以方便地处理线程中断的情况。
总的来说,AQS并没有定义具体的加锁和释放锁的逻辑,而是通过子类来实现这些逻辑,同时提供了一些钩子方法,使得子类可以在特定的时间点进行扩展和定制。这种设计能够提高复用性和灵活性,使得开发者可以快速地构建出各种同步器,以满足不同的需求。
其中,子类至少需要重写以下方法:
- tryAcquire(int):尝试以独占模式获取同步状态,如果获取成功,返回true,否则返回false。
- tryRelease(int):尝试以独占模式释放同步状态,如果释放成功,返回true,否则返回false。
- tryAcquireShared(int):尝试以共享模式获取同步状态,如果获取成功,返回大于等于0的值,否则返回小于0的值。
- tryReleaseShared(int):尝试以共享模式释放同步状态,如果释放成功,返回true,否则返回false。
三、ReentrantLock类
1.基础分析
ReentrantLock并不是直接继承了AQS抽象类,而是定义了一个内部类来继承AQS,并在此基础上分别实现了公平锁与非公平锁两种同步类型,宏观上看代码如下:
1 | public class ReentrantLock implements Lock, java.io.Serializable { |
在不指定锁类型时,声明的就是非公平锁,如果传入了boolean值,则判断后生成公平或者非公平锁
1 | public ReentrantLock() { |
2.非公平锁-加锁
首先看加锁过程:
1 | final void lock() { |
在上面已经分析过acquire方法,这里不再赘述,简单来说就是一个封装线程为节点入队,并在入队后自旋尝试继续获取锁的过程
1 | public final void acquire(int arg) { |
由ReentrantLock实现的非公平锁加锁过程
1 | protected final boolean tryAcquire(int acquires) { |
3.公平锁-加锁
1 | // 尝试抢占一把锁 |
4.解锁
ReentrantLock类内
1 | public void unlock() { |
AQS中
1 | public final boolean release(int arg) { |
ReentrantLock中
1 | protected final boolean tryRelease(int releases) { |
AQS中
1 | private void unparkSuccessor(Node node) { |
下面是waitStatus的取值与代表意义
- SIGNAL(-1):表示当前节点的后继节点需要被唤醒。
- CANCELLED(1):表示当前节点已经取消等待。
- CONDITION(2):表示当前节点在条件队列中等待。
- PROPAGATE(-3):用于共享模式下,表示后继节点需要向前传播唤醒信号。
四、补充
1.AQS为什么要采用双向链表的结构呢?
1)新入队列的线程,需要保证它的前置节点状态是正常的,不然可能存在异常节点导致后续无法正常唤醒,因此需要能够从后往前查询,不然就要从头往后遍历
2)在队列中的线程,是允许被中断的,被中断之后标记为cancel状态,继续存在与队列当中,如果单向链表,就必须要从头往后遍历
3)新入队的节点,由于公平锁的存在,要判断前驱节点是否是头节点,如果是头节点才会继续自旋尝试获取锁,不然入队后的自旋就毫无意义,因此要能查找前置节点
总结:主要是要考虑入队和唤醒









