在并发编程中,频繁的创建和销毁线程非常影响处理的效率,而且会使得线程抢占系统资源从而导致阻塞。这时候就需要线程池的帮助,线程池的创建离不开阻塞队列。

阻塞队列

什么是阻塞队列

当阻塞队列是空的,从队列中获取元素的操作将会被阻塞;

当阻塞队列是满的,往队列里添加元素的操作将会被阻塞;

试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素;

同样,试图从满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他线程从列中移除一个或者多个元素或者完全清空队列使队列重新变得空闲起来并后续新增。

在多线程领域里:所谓阻塞,在某些情况下会挂起线程,一旦条件满足,被挂起的线程又会自动被唤醒。

阻塞队列的关系如下图所示,其中BlockingQueue是继承与Queue。而下面七个阻塞队列,都是实现了BlockingQueue接口。

BlockingQueue实现结构 BlockingQueue实现结构

上面七个具体的阻塞队列的UML图如下所示。

阻塞队列的UML图 阻塞队列的UML图

表1 常用阻塞队列

阻塞队列名称 底层实现 出入队 名词解释
ArrayBlockingQueue 数组 先进先出 ReentrantLockCondition 一个由数组结构组成的有界阻塞队列
LinkedBlockingQueue 链表 先进先出 ReentrantLockCondition 一个由链表结构组成的有界阻塞队列
PriorityBlockingQueue 数组实现的二叉树 优先级 ReentrantLockCondition 一个支持优先级排序的无界阻塞队列
DelayQueue 优先级队列+延迟 出队时间 ReentrantLockCondition 一个使用优先级队列实现的无界阻塞队列
SynchronousQueue 不存数据,阻塞线程链表 线程阻塞匹配 无锁,CAS实现 一个不存储元素的阻塞队列
LinkedTransferQueue 链表 先进先出 无锁,CAS实现 一个由链表结构组成的无界阻塞队列
LinkedBlockingDeque 双向链表 先进先出,先进后出,优先出入 ReentrantLockCondition 一个由链表结构组成的双向阻塞队列

这些常用的阻塞队列都是对BlockingQueue接口的实现,也都是线程安全的。

有界无界区别

生产者消费者模型如图所示。

生产者消费者模式图 生产者消费者模式图

有限队列就是长度有限,满了以后生产者会阻塞。

无界队列就是里面能放无数的东西而不会因为队列长度限制被阻塞,当然空间限制来源于系统资源的限制,如果处理不及时,导致队列越来越大越来越大,超出一定的限制致使内存超限,操作系统或者JVM帮你解决烦恼,直接把你 OOM kill 省事了。

其实无界也会阻塞,因为阻塞不仅仅体现在生产者放入元素时会阻塞,消费者拿取元素时,如果没有元素,同样也会阻塞。

ArrayBlockingQueue

数组实现的有界阻塞队列。

此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。初始化时可以设置参数。

LinkedBlockingQueue

链表实现的有界阻塞队列。

此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

ArrayBlockingQueueLinkedBlockingQueue区别

  1. 队列中锁的实现不同

    ArrayBlockingQueue实现的队列中的锁是没有分离的,即生产和消费用的是同一个锁;

    LinkedBlockingQueue实现的队列中的锁是分离的,即生产用的是putLock,消费是takeLock

  2. 在生产或消费时操作不同

    ArrayBlockingQueue实现的队列中在生产和消费的时候,是直接将枚举对象插入或移除的;

    LinkedBlockingQueue实现的队列中在生产和消费的时候,需要把枚举对象转换为Node进行插入或移除,会影响性能

  3. 队列大小初始化方式不同

    ArrayBlockingQueue实现的队列中必须指定队列的大小;

    LinkedBlockingQueue实现的队列中可以不指定队列的大小,但是默认是Integer.MAX_VALUE

PriorityBlockingQueue

支持优先级的无界阻塞队列。

默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。Handler实现也用的是优先级阻塞队列

DelayQueue

支持延时获取元素的无界阻塞队列。

队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

DelayQueue非常有用,用于缓存系统的设计。可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。

SynchronousQueue

不存储元素的阻塞队列。

每一个put操作必须等待一个take操作,否则不能继续添加元素。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于LinkedBlockingQueueArrayBlockingQueue

LinkedTransferQueue

链表结构组成的无界阻塞队列

多了tryTransfer和transfer方法。

(1)transfer方法

如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。

(2)tryTransfer方法

tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。

LinkedBlockingDeque

链表结构组成的双向阻塞队列。

所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。

表2 双向阻塞队列方法表

名称 解释
addFirst 指定的元素插入此双端队列的开头,失败抛异常
offerFirst 插入此双端队列表示的队列的开头,成功返回true,失败不抛异常
peekFirst 获取但不移除此双端队列的第一个元素,队列为空则返回 null
pollFirst 获取并移除此双端队列的第一个元素,队列为空则返回 null
removeFirst 获取并移除此双端队列第一个元素,失败抛异常
takeFirst 获取并移除此双端队列的第一个元素,必要时将一直等待可用元素
addLast 指定的元素插入此双端队列的末尾,失败抛异常
offerLast 插入此双端队列表示的队列的末尾,成功返回true,失败不抛异常
peekLast 获取但不移除此双端队列的最后一个元素,队列为空则返回 null
pollLast 获取并移除此双端队列的最后一个元素,队列为空则返回 null
removeLast 获取并移除此双端队列最后一个元素,失败抛异常
takeLast 获取并移除此双端队列的最后一个元素,必要时将一直等待可用元素

在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀。另外,双向阻塞队列可以运用在“工作窃取”模式中。

总结

尽量使用有界队列,有界队列可以增加系统稳定性,无限受到系统的资源的限制。无界的话,线程池中永远只有核心线程

线程池

线程池简介

线程的创建和销毁都需要映射到操作系统,因此其代价是比较高昂的。出于避免频繁创建、销毁线程以及方便线程管理的需要,线程池应运而生。其中这个线程池跟之前Handler中的Message的设计思想类似,都属于享元设计模式。包括被RxJava代替的AsyncTask异步处理框架中,也有线程池的应用,用到了executeOnExecutor 的一个实例。 juc虽然为开发人员提供了Executors工具类以及内置的多种线程池,但那些线程池的使用非常局限,无法满足日益复杂的业务场景。阿里官方的编程规约中也推荐开发人员不要直接使用juc自带的线程池,而是根据自身业务场景通过ThreadPoolExecutor进行创建线程池。因此深入理解线程池是非常关键,便于对线程池的二次开发。

线程池原理

这不刚过完了双十一,我们对双十一进行复盘。作为一场购物狂欢节,剁手党会专门等到这几天疯狂购买一些商品。假设这次对应的商家是欧莱雅,其有自己的工厂,固定的一批工人,称为正式工人,工厂接收的订单由这些工人去完成。当前三分钟的时候,订单数量由正式员工处理。当前五分钟的时候订单增加,正式工人已经忙不过来了,将欧莱雅的生产原料暂时堆积在仓库中,等到忙完这阵子再处理(但实际上双十一任务只多不少没时间处理,所以需要调度员实时调度)。当前一小时的时候订单还在增加,仓库堆积满了后,工厂只能临时扩招一批工人来应对生产高峰,当然双十一结束后这批临时工要被清退。当一整天之后,订单数量还在增加,工厂能够招的临时工也以招满后,即工厂已经到达了工厂最大容量数,后面的订单只能通过策略来缓解,一般会让用户过几天下单或者其他策略不再接受新的订单。

这里的工厂为线程池,正式员工为线程池中的核心线程,工厂最大容量数为线程池中的最大线程数量,仓库是线程池中的阻塞队列,策略是线程池中的拒绝策略。

根据上面的双十一的例子抽象出来对应到具体的线程池,那么线程池原理如下图所示

线程池原理图 线程池原理图

线程池原理说明

  1. 如果当前运行的线程少于核心线程数,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
  2. 如果运行的线程等于或多于核心线程数,则将任务加入阻塞队列。
  3. 如果无法将任务加入阻塞队列(队列已满),则创建新的线程来处理任务。
  4. 如果创建新线程将使当前运行的线程超出最大线程数,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

线程池的调用关系图

线程池的调用关系图 线程池的调用关系图

线程池对应的UML图

线程池对应的UML图 线程池对应的UML图

存在原因

  1. 降低资源消耗。通过重复利用线程池中已创建的代码,来降低创建和销毁线程时的开销,从而降低资源消耗,提高利用效率。
  2. 提高响应速度。当任务到达时,任务不需要来等待线程创建就能立即执行,因为线程池中有已创建好的线程可以直接供使用。假设一个服务器完成一项任务所需时间为三段时间,T1为 创建线程时间,T2 为线程中执行任务的时间,T3为 销毁线程时间。 如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。线程池技术正是关注如何缩短或调整T1和T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。
  3. 提高线程的可管理性。线程池时稀缺资源,如果无限制的创建,当线程数到达一定数量时,不仅会消耗系统资源,还会降低系统的稳定性,所以使用线程池,就可以进行统一分配,调优和监控。
  4. 提供定时执行、定期执行、单线程、并发数控制等功能。

七大参数

这七大参数主要是集中在构造方法中,juc中有具体解释,这里笔者将其翻译润化了下。

1public ThreadPoolExecutor(int corePoolSize,
2                          int maximumPoolSize,
3                          long keepAliveTime,
4                          TimeUnit unit,
5                          BlockingQueue<Runnable> workQueue,
6                          ThreadFactory threadFactory,	
7                          RejectedExecutionHandler handler) 

表3 构造函数的七大参数

构造参数 名词解释
corePoolSize 核心线程数,空闲状态也要保留在池中的线程数,除非调用allowCoreThreadTimeOut
maximumPoolSize 最大线程数,允许池子中最大的线程数量
keepAliveTime 线程空闲时的存活时间,当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于corePoolSize时才有用
unit keepAliveTime的时间单位
workQueue **工作队列,**必须是BlockingQueue阻塞队列。当线程池中的线程数超过它的核心线程数时,线程会进入阻塞队列进行阻塞等待
threadFactory 创建线程的工厂,工厂可以给每个新建的线程设置一个具有识别度的线程名,还可以更加自由的对线程做更多的设置,比如设置所有的线程为守护线程。
handler 线程池的拒绝策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务

上述的工作队列,创建线程的工厂,拒绝策略不可设置为空,否则当前这个构造返回一个空对象。

三大默认线程池方法

juc自带了的三大线程池,其实并不常用。这些线程池可由 Executors 这个工具类(或叫线程池工厂)来创建。

FixedThreadPool

1//Executors.java#newFixedThreadPool
2public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
3    return new ThreadPoolExecutor(nThreads, nThreads,
4                                  0L, TimeUnit.MILLISECONDS,
5                                  new LinkedBlockingQueue<Runnable>(),
6                                  threadFactory);
7}

固定线程数线程池的创建方式如下:其中核心线程数与最大线程数固定且相等,采用以链表为底层结构的无界阻塞队列LinkedBlockingQueue设置为默认大小为Integer的最大值。

特点

  • 核心线程数与最大线程数相等,因此不会创建空闲线程keepAliveTime 设置与否无关紧要。
  • 采用无界队列,任务会被无限添加,直至内存溢出(OOM)。
  • 由于无界队列不可能被占满,任务在执行前不可能被拒绝(前提是线程池一直处于运行状态)。

应用场景

  • 适用于线程数固定的场景
  • 适用负载比较重的服务器

SingleThreadExecutor

1//Executors.java#newSingleThreadExecutor
2public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
3    return new FinalizableDelegatedExecutorService
4        (new ThreadPoolExecutor(1, 1,
5                                0L, TimeUnit.MILLISECONDS,
6                                new LinkedBlockingQueue<Runnable>(),
7                                threadFactory));
8}

单线程线程池的创建方式如下:其中核心线程数与最大线程数都为1,采用以链表为底层结构的无界阻塞队列。

特点

  • FixedThreadPool 类似,只是线程数为1而已。

应用场景

  • 适用单线程的场景。
  • 适用于对提交任务的处理有顺序性要求的场景。

CachedThreadPool

1//Executors.java#newCachedThreadPool
2public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
3    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
4                                  60L, TimeUnit.SECONDS,
5                                  new SynchronousQueue<Runnable>(),
6                                  threadFactory);
7}

缓冲线程池的创建方式如下:其中核心线程数为0,最大线程数为Integer.MAX_VALUE(可以理解为无穷大)。采用同步阻塞队列。

特点

  • 核心线程数为0,则初始就创建空闲线程,并且空闲线程的只能等待任务60s,60s内没有提交任务,空闲线程将被销毁。
  • 最大线程数为无穷大,这样会造成巨量线程同时运行,CPU负载过高,导致应用崩溃。
  • 采用同步阻塞队列,即队列不存储任务。提交一个消费一个。由于最大线程数为无穷大,因此,只要提交任务就一定会被消费(应用未崩溃前)。

应用场景

  • 适用于耗时短、异步的小程序。
  • 适用于负载较轻的服务器。

三种排队策略

juc中排队有三种通用策略:

  • 直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务,此策略允许无界线程具有增长的可能性。
  • 无界队列。(不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
  • 有界队列。当使用有限的 maximumPoolSizes时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

五种状态

juc提供线程池的五种状态state,RUNNING(运行状态)、SHUTDOWN(关闭态)、STOP(停止态)、TIDYING(已终止态)、TERMINATED(彻底终止态)

RUNNING: 能够接收新任务,以及对已添加的任务进行处理

SHUTDOWN: 不接收新任务,但能处理已添加的任务。

STOP:不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。

TIDYING: 当所有的任务已终止,记录的”任务数量”为0,将转化到TERMINATED状态

TERMINATED: 线程池任务完成

JUC提供线程池的五种状态图 JUC提供线程池的五种状态图

四种拒绝策略

表4 四种拒绝策略名词解释

拒绝策略 名词解释
AbortPolicy 直接抛出异常,默认策略
CallerRunsPolicy 用调用者所在的线程来执行任务
DiscardOldestPolicy 丢弃阻塞队列中靠最前的任务,并执行当前任务
DiscardPolicy 直接丢弃任务

当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

JUC四种拒绝策略 JUC四种拒绝策略

合理配置线程池

这里的最大线程数一般设置值参考。

最大线程数数量根据任务特性设置。

CPU密集型:从内存中取数计算,不超过CPU核心数,最大数机器核心数+1,防止页缺失

IO密集型:网络通讯,磁盘读取,一般机器的CPU核心数*2

混合型:上述两种结合

可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

最后

这里笔者主要是通过阻塞队列和线程池两块内容,深刻的了解者整体过程。希望通过了解整体过程和源码分别剥离开来,然没有更加深入的研究对应的线程池源码,需要了解源码的可参考Java线程池原理详解

参考文献

[1] 云深i不知处. 深入Java线程池:从设计思想到源码解读, 2021.

[2] Free的午后,备战Java面试[多线程并发] – 线程池详细讲解,2021.

[3] slow is fast.,大厂之路一由浅入深、并行基础、源码分析一 “J.U.C.L”之线程池(最全,最深,最喜欢!!!!),2021.

[4] Xu Weiteng,JDK ThreadPoolExecutor核心原理与实践,2021

[5] 疯狂哈丘,Java线程池实现原理详解,2018.

[6] 王大军,JUC之阻塞队列介绍,2020.

[7] IT乐知,一文总结常见阻塞队列,2020.

[8] 疯狂哈丘,Java线程池实现原理详解,2018.