Java并发编程之线程池源码学习
[TOC]
概述
线程需要的时候要进行创建,不需要的时候需要进行销毁,但是线程的创建和销毁都是一个开销比较大的操作。
虽然我们程序员创建一个线程很容易,直接使用 new Thread() 创建就可以了,但是操作系统做的工作会多很多,它需要发出 系统调用
,陷入内核,调用内核 API 创建线程,为线程分配资源等,这一些操作有很大的开销。
所以,在高并发大流量的情况下,频繁的创建和销毁线程会大大拖慢响应速度,那么有什么能够提高响应速度的方式吗?方式有很多,尽量避免线程的创建和销毁是一种提升性能的方式,也就是把线程 复用
起来,因为性能是我们日常最关注的因素。
我们先来通过认识一下 Executor 框架、然后通过描述线程池的基本概念入手、逐步认识线程池的核心类,然后慢慢进入线程池的原理中,带你一步一步理解线程池。
在 Java 中可以通过线程池来达到这样的效果。今天我们就来详细讲解一下 Java 的线程池
。
Executor 框架
为什么要先说一下 Executor 呢?因为我认为 Executor 是线程池的一个驱动,我们平常创建并执行线程用的一般都是 new Thread().start() 这个方法,这个方法更多强调 创建一个线程并开始运行。而我们后面讲到创建线程池更多体现在驱动执行上。
Executor 的总体框架如下,我们下面会对 Executor 框架中的每个类进行介绍。
我们首先来认识一下 Executor
Executor 接口
Executor 是 java.util.concurrent
的顶级接口,这个接口只有一个方法,那就是 execute
方法。
1 | public interface Executor { |
我们平常创建并启动线程会使用 new Thread().start()
,而 Executor 中的 execute 方法替代了显示创建线程的方式。Executor 的设计初衷就是将任务提交和任务执行细节进行解藕。使用 Executor 框架,你可以使用如下的方式创建线程:
1 | Executor executor = Executors.xxx // xxx 其实就是 Executor 的实现类,我们后面会说 |
execute方法接收一个 Runnable
实例,它用来执行一个任务,而任务就是一个实现了 Runnable 接口的类,但是 execute 方法不能接收实现了 Callable
接口的类,也就是说,execute 方法不能接收具有返回值的任务。
execute 方法创建的线程是异步执行的,也就是说,你不用等待每个任务执行完毕后再执行下一个任务。
比如下面就是一个简单的使用 Executor 创建并执行线程的示例:
1 | public class RunnableTask implements Runnable{ |
Executor 就相当于是族长,大佬只发号令,族长让你异步执行你就得异步执行,族长说不用汇报
任务你就不用回报,但是这个族长管的事情有点少,所以除了 Executor 之外,我们还需要认识其他管家,比如说管你这个线程啥时候终止,啥时候暂停,判断你这个线程当前的状态等,ExecutorService
就是一位大管家。
ExecutorService接口
1 | public interface ExecutorService extends Executor { |
ThreadPoolExecutor
源码解析
我们先来聊聊线程池状态,线程池状态是一个非常有趣的设计点,ThreadPoolExecutor 使用 ctl
来存储线程池状态,这些状态也叫做线程池的生命周期
。想想也是,线程池作为一个存储管理线程的资源池,它自己也要有这些状态,以及状态之间的变更才能更好的满足我们的需求。ctl 其实就是一个 AtomicInteger
类型的变量,保证原子性
。
ctl 除了存储线程池状态之外,它还存储 workerCount
这个概念,workerCount 指示的是有效线程数,workerCount 表示的是已经被允许启动但不允许停止的工作线程数量。workerCount 的值与实际活动线程的数量不同。
ctl 高低位来判断是线程池状态还是工作线程数量,线程池状态位于高位。
这里有个设计点,为什么使用 AtomicInteger 而不是存储上线更大的 AtomicLong 之类的呢?
Lea 并非没有考虑过这个问题,为了表示 int 值,目前 workerCount 的大小是(2 ^ 29)-1(约 5 亿个线程),而不是(2 ^ 31)-1(20亿个)可表示的线程。如果将来有问题,可以将该变量更改为 AtomicLong。但是在需要之前,使用 int 可以使此代码更快,更简单,int 存储占用存储空间更小。
runState 具有如下几种状态:
1 | private static final int RUNNING = -1 << COUNT_BITS; |
我们先上状态轮转图,然后根据状态轮转图做详细的解释。
任务添加策略
当一个任务被添加进线程池时:
- 线程数量未达到 corePoolSize,则新建一个线程(核心线程)执行任务
- 线程数量达到了 corePools,则将任务移入队列等待
- 队列已满,新建线程(非核心线程)执行任务
- 队列已满,总线程数又达到了 maximumPoolSize,就会由基于饱和策略进行处理(RejectedExecutionHandler)或者抛出异常
说白了就是先利用核心线程,核心线程用完,新来的就加入等待队列,一旦队列满了,那么只能开始非核心线程来执行了。
上面的策略,会在阅读代码的时候体现出来,并且在代码中也能窥探出真正复用空闲线程的实现原理。
execute执行任务
接下来我们就从线程池执行任务的入口分析。
一个线程池可以接受任务类型有Runnable和Callable,分别对应了execute和submit方法。目前我们只分析execute的执行过程。
1 | /** |
总结一下,execute的执行逻辑就是:
如果 当前活动线程数 < 指定的核心线程数,则使用 addWorker 创建并启动一个线程来执行新提交的任务(此时新建的线程相当于核心线程);如果创建成功,那么 execute 方法会直接返回。如果没创建成功,可能是由于线程池已经 shutdown,可能是由于并发情况下 workerCountOf(c) < corePoolSize ,别的线程先创建了 worker 线程,导致 workerCoun t>= corePoolSize。
如果 当前活动线程数 >= 指定的核心线程数,且缓存队列未满,则将任务添加到缓存队列中;
如果线程池还在 Running 状态,会将 task 加入阻塞队列,加入成功后会进行 double-check
双重校验,继续下面的步骤,如果加入失败,可能是由于队列线程已满,此时会判断是否能够加入线程池中,如果线程池也满了的话,就会直接执行拒绝策略,如果线程池能加入,execute 方法结束。
如果 当前活动线程数 >= 指定的核心线程数,且缓存队列已满,则创建并启动一个线程来执行新提交的任务(此时新建的线程相当于非核心线程);
从代码中我们也可以看出,即便当前活动的线程有空闲的,只要这个活动的线程数量小于设定的核心线程数,那么依旧会启动一个新线程来执行任务。也就是说不会去复用任何线程。在execute方法里面我们没有看到线程复用的影子,那么我们继续来看看addWorker方法。
根据源码画出的执行流程图:
addWorker添加任务
从上面的执行流程可以看出,添加一个 worker 涉及的工作也非常多,这也是一个比价难啃的点,我们一起来分析下,这是 worker 的源码:
1 | /** |
方法虽然有点长,但是我们只考虑两个关键的地方,先是创建一个worker对象,创建成功后,对线程池状态判断成功后,就去执行该worker对象的thread的启动。
这个方法的执行流程图如下:
[TOC]
也就是说在这个方法里面启动了一个关联到worker的线程,但是这个线程是如何执行我们传进来的runnable任务的呢?接下来看看这个Worker对象到底做了什么。
worker 对象
Worker 位于 ThreadPoolExecutor
内部,它继承了 AQS 类并且实现了 Runnable 接口。Worker 类主要维护了线程运行过程中的中断控制状态。它提供了锁的获取和释放操作。在 worker 的实现中,我们使用了非重入的互斥锁而不是使用重复锁,因为 Lea 觉得我们不应该在调用诸如 setCorePoolSize 之类的控制方法时能够重新获取锁。
最重要的构造方法:
1 | Worker(Runnable firstTask) { |
构造一个 worker 对象需要做三步操作:
初始 AQS 状态为 -1,此时不允许中断 interrupt(),只有在 worker 线程启动了,执行了 runWorker() 方法后,将 state 置为0,才能进行中断。
将 firstTask 赋值给为当前类的全局变量
通过
ThreadFactory
创建一个新的线程。
任务运行
我们前面的流程主要分析了线程池的 execute 方法的执行过程,这个执行过程相当于是任务提交过程,而我们下面要说的是从队列中获取任务并运行的这个工作流程。
一般情况下,我们会从初始任务开始运行,所以我们不需要获取第一个任务。否则,只要线程池还处于 Running 状态,我们会调用 getTask
方法获取任务。getTask 方法可能会返回 null,此时可能是由于线程池状态改变或者是配置参数更改而导致的退出。还有一种情况可能是由于 异常
而引发的,这个我们后面会细说。
下面来看一下 runWorker
方法的源码: