构建函数
1 | /** |
参数说明
- corePoolSize(线程池的基本大小)
- 当提交一个任务到线程池时,线程池会创建一个线程来执行任务
- 即使其他空闲的基本线程能够执行新任务也会创建线程
- 等到需要执行的任务数大于线程池基本大小时就不再创建
- 调用prestartAllCoreThreads()后,线程池会提前创建并启动所有基本线程
- maximumPoolSize(线程池最大数量)
- 允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数
- 则线程池会再创建新的线程执行任务
- 无线队列会使该参数失效
- keepAliveTime(线程活动保持时间)
- 线程池的工作线程空闲后,保持存活的时间
- 如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率
- unit(线程活动保持的时间单位)
- 天(DAYS)
- 小时(HOURS)
- 分钟(MINUTES)
- 毫秒(MILLISECONDS)、
- 微秒(MICROSECONDS,千分之一毫秒)
- 纳秒(NANOSECONDS,千分之一微秒)
- workQueue(任务队列):用于保存等待执行的任务的阻塞队列
- ArrayBlockingQueue:
- 基于数组结构的有界阻塞队列,排序规则:FIFO
- LinkedBlockingQueue:
- 基于链表结构的阻塞队列,排序规则:FIFO,
- 吞吐量通常要高于ArrayBlockingQueue。
- Executors.newFixedThreadPool()使用了这个队列。
- SynchronousQueue:
- 一个不存储元素的阻塞队列(读写交换执行,否则会阻塞。)
- 吞吐量通常要高于Linked-BlockingQueue,
- Executors.newCachedThreadPool使用了这个队列。
- PriorityBlockingQueue:
- 一个具有优先级的无限阻塞队列。
- ArrayBlockingQueue:
- threadFactory(线程工厂)
- 设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
- 使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字
- handler(拒绝策略)
- 当队列和线程池都满了(饱和状态)或关闭时,那么必须采取一种策略处理提交的新任务
- 这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常
- 具体策略包括:
- AbortPolicy:直接抛出异常
- CallerRunsPolicy:只用调用者所在线程来运行任务
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务
- DiscardPolicy:不处理,丢弃掉
- 实现RejectedExecutionHandler接口自定义策略
排队的方式
- 直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
- 无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
- 有界队列。当使用有限的 maximumPoolSizes时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
任务封装
线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会循环获取工作队列里的任务来执行。我们可以从Worker类的run()方法里看到这点。
工作线程定义
1 | /** |
工作线程任务的真正执行
1 | /** |
1 | /** |
1 | /** |
任务提交

- 主线程首先要创建实现Runnable或者Callable接口的任务对象。工具类Executors可以把一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)或Executors.callable(Runnable task,Object resule))。
- 然后可以把Runnable对象直接交给ExecutorService执行(ExecutorService.execute(Runnable command));或者也可以把Runnable对象或Callable对象提交给ExecutorService执行(Executor-Service.submit(Runnable task)或ExecutorService.submit(Callable
task))。 - 如果执行ExecutorService.submit(…),ExecutorService将返回一个实现Future接口的对象(到目前为止的JDK中,返回的是FutureTask对象)。由于FutureTask实现了Runnable,程序员也可以创建FutureTask,然后直接交给ExecutorService执行。
- 最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。
说明:
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功
submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
任务执行过程
- 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
- 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
- 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务,即使此时线程池中存在空闲线程;(需要使用全局锁处理)
- 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入workQueue队列,等待线程池中任务调度执行;
- 如果这时候workQueue队列满了(无法加入队列),而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;(需要使用全局锁处理)
- 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,会出行对应的拒绝策略。
- 当一个线程完成任务时,它会从队列中取下一个任务来执行。
- 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小,当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭


ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),基本上都是加入到等待队列中处理。
任务执行分析

任务提交执行代码
1 | /** |
1 | /** |
线程池关闭
使用shutdown
或shutdownNow
方法关闭线程池。它原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt
方法来中断线程,所以无法响应中断的任务可能永远无法终止。
- shutdownNow:任务要不求执行完成
- 将线程池的状态设置成STOP,
- 尝试停止所有的正在执行或暂停任务的线程,
- 返回等待执行任务的列表
- shutdown:任务需要执行完成
- 将线程池的状态设置成SHUTDOWN状态
- 中断所有没有正在执行任务的线程。
注意事项
isShutdown
方法在调用shutdown
或shutdownNow
之后立即返回trueisTerminaed
当所有任务都关闭后,表示线程池关闭成功,并且返回true
shutdown
1 | /** |
shutdownNow
1 | /** |
线程池监控
taskCount:线程池需要执行的任务数量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23/**
* -- 返回曾经计划执行的任务总数的近似值(由于任务和线程的状态可能在计算期间发生变化)
*
* @return the number of tasks
*/
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 已经执行完成的任务数
long n = completedTaskCount;
// 加上任务中执行完成的数量
for (Worker w : workers) {
n += w.completedTasks;
if (w.isLocked())
++n;
}
// 加上阻塞队列的大小
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29/**
* Counter for completed tasks. Updated only on termination of
* worker threads. Accessed only under mainLock.
* - 完成的任务数,只有在工作线程完成之后才更新该值,需要通过mainLock下获取
*/
private long completedTaskCount;
/**
* Returns the approximate total number of tasks that have
* completed execution. Because the states of tasks and threads
* may change dynamically during computation, the returned value
* is only an approximation, but one that does not ever decrease
* across successive calls.
*
* @return the number of tasks
*/
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21/**
* Tracks largest attained pool size. Accessed only under
* mainLock.
*/
private int largestPoolSize;
/**
* Returns the largest number of threads that have ever
* simultaneously been in the pool.
*
* @return the number of threads
*/
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}getActiveCount:获取活动的线程数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19/**
* Returns the approximate number of threads that are actively
* executing tasks.
*
* @return the number of threads
*/
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers)
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
}可以通过继承线程池来自定义线程池,重写线程池的beforeExecute、afterExecute和terminated方法,也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等。这几个方法在线程池里是空方法
参考
- 《Java并发编程的艺术》
- 《Java并发编程》
- 《Java多线程编程核心技术》