ThreadPoolExecutor搞不懂?看这篇就够了
前言 本文部分内容摘抄自throwable作者的博文 ,有些部分作者已经总结的很好,没必要再重复发明轮子,在这里感谢作者。本文的亮点在于:
对ThreadPoolExecutor
的依赖关系,作了细致的分解以宏观层面了解Doug Lea
的设计理念。
总结了ThreadPoolExecutor
的核心流程,归纳了重要的结论。
ThreadPoolExecutor
源码逐行解析。
例举了场景,加深理解线程池工作原理。
由于ThreadPoolExecutor
的代码逻辑和设计理念比较复杂,建议大家学习的时候,多思考,顺着代码的思路,多动动笔画一画才能加深理解。
提出问题 这些问题也是我当时的疑惑点,按本文的思路学习ThreadPoolExecutor
,这些问题迎刃而解。
1、创建一个线程池需要哪些参数?
2、线程池中的状态都有哪些?是如何实现的?
3、线程池为什么需要工作队列?有哪几种工作队列?
4、AQS在线程池中扮演什么角色?它的作用是什么?
5、线程池在shutdown
的时候,如何保证任务不丢失?如何保证任务正常执行完毕?
、线程池在shutdown
之后还能提交任务进来嘛?线程池在shutdownNow
之后还能提交任务进来吗?
9、ThreadPoolExecutor
是如何控制工作线程能够重复利用的。
10、ThreadPoolExecutor
的执行结果是如何到Futrue
中的?
前置知识复习 线程状态 如下图所示,Java中线程可分为NEW,RUNABLE,RUNING,BLOCKED,WAITING,TIMED_WAITING,TERMINATED 共七个状态,一个状态是如何过渡到另一个状态图中标识的很清楚。
初始状态(NEW) 实现Runnable接口和继承Thread可以得到一个线程类,new一个实例出来,线程就进入了初始状态。
就绪状态(RUNNABLE) 就绪状态只是说你资格运行,调度程序没有挑选到你,你就永远是就绪状态。
调用线程的start() 方法,此线程进入就绪状态。
当前线程sleep()方法结束,其他线程join()结束,等待用户输入完毕,某个线程拿到对象锁,这些线程也将进入就绪状态。
当前线程时间片用完了,调用当前线程的yield()方法,当前线程进入就绪状态。
锁池里的线程拿到对象锁后,进入就绪状态。
运行中状态(RUNNING) 线程调度程序从可运行池中选择一个线程作为当前线程时线程所处的状态。这也是线程进入运行状态的唯一一种方式。
阻塞状态(BLOCKED) 阻塞状态是线程阻塞在进入synchronized关键字(当然也包括ReentrantLock)修饰的方法或代码块(获取锁)时的状态。
等待(WAITING) 处于这种状态的线程不会被分配CPU执行时间,它们要等待被显式地唤醒,否则会处于无限期等待的状态。
超时等待(TIMED_WAITING) 处于这种状态的线程不会被分配CPU执行时间,不过无须无限期等待被其他线程显示地唤醒,在达到一定时间后它们会自动唤醒。
终止状态(TERMINATED) 当线程的run()方法完成时,或者主线程的main()方法完成时,我们就认为它终止了。这个线程对象也许是活的,但是,它已经不是一个单独执行的线程。线程一旦终止了,就不能复生。在一个终止的线程上调用start()方法,会抛出java.lang.IllegalThreadStateException异常。
线程池的作用 1、减少了创建和销毁线程的次数,可以重复利用工作线程
2、规范化线程的使用,避免创建过多的线程导致耗尽系统资源
3、丰富的线程管理手段和结果控制
ThreadPoolExecutor的原理 由ThreadPool原理图所示,ThreadPoolExecutor
的核心组件包括:
ThreadFactory:线程工厂,用于创建工作线程,默认的线程工厂是Executors.defaultThreadFactory()
workQueue:阻塞工作队列,常用的有ArrayBlockingQueue
,LinkedBlockingQueue
工作线程池:内部包含了cw(core worker)
和aw(additional worker)
RejectedExecutionHandler:拒绝执行处理器
类的设计 ThreadPoolExecutor
的类图。
Executor Executor
该接口提供了一种将任务提交与每个任务将如何运行的机制,包括线程执行,调度等。Doug Lea
将线程的执行抽象为任务(task)
,任务
由各种实现Executor
的执行器执行,具体执行的内容在执行器中回调。
1 2 3 4 5 6 7 8 9 10 11 12 13 public interface Executor { void execute (Runnable command) ; }
ExecutorService ExecutorService
实现了Executor
接口,它提供了一系列方法管理终止(manage termination)
和用于追踪一个或多个异步任务最终返回Future
的方法。ExecutorService
可以关闭(shutdown)
,关闭之后会拒绝新的任务。定义了两种关闭ExecutorService
的方法。
shutdown()
启动一个有序关闭,在该关闭中先执行已提交的任务,单不接受任何新的任务。如果执行器已经关闭,再次调用该方法也不会产生影响。该方法不负责等待已提交任务完成执行,awaitTermination
来完成此项职责,在执行器发起shutdown request
之后,它将阻塞直到所有任务已完成,或者触发了timeout
,或者线程被中断(interrupted)
,无论这三种情况中的哪一种先触发,都会解除阻塞。
shutdownNow()
尝试停止所有正在执行的任务,忽略正在等待的任务,并返回正在等待执行的任务的列表。该方法不负责等待已提交任务完成执行,awaitTermination
来完成此项职责。除了尽最大努力尝试停止处理正在执行的任务之外,没有任何保证。比如,终止线程的方法是通过Thread.interupt()
方法实现,但是如果任务中没有处理过中断,则shutdownNow()
也无计可施,只能等待任务自然执行完毕。
终止(termination)
时,执行器中没有任务在执行,没有任务在等待执行,更没有任务可以提交进执行器。终止后应关闭执行器,释放资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public interface ExecutorService extends Executor { void shutdown () ; List<Runnable> shutdownNow () ; boolean isShutdown () ; boolean isTerminated () ; boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException ; <T> Future<T> submit (Callable<T> task) ; <T> Future<T> submit (Runnable task, T result) ; Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException ; <T> T invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException ;}
Futrue Futrue 代表了异步执行的结果。提供了检测执行完成的方法,等待完成的方法,重试完成方法等控制结果的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public interface Future <V > { boolean cancel (boolean mayInterruptIfRunning) ; boolean isCancelled () ; boolean isDone () ; V get () throws InterruptedException, ExecutionException ; V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException ;}
一般情况下常用的实现是FutureTask
,它是一个可取消的异步执行结果集,它实现了Futrue接口中定义的功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class FutureTask <V > implements RunnableFuture <V > { private volatile int state; private static final int NEW = 0 ; private static final int COMPLETING = 1 ; private static final int NORMAL = 2 ; private static final int EXCEPTIONAL = 3 ; private static final int CANCELLED = 4 ; private static final int INTERRUPTING = 5 ; private static final int INTERRUPTED = 6 ; private Callable<V> callable; private Object outcome; private volatile Thread runner; private volatile WaitNode waiters; }
AbstractExecutorService AbstractExecutorService
提供了ExecutorService
的默认实现,实现了submit
,invokeAny
,invokeAll
,通过FutureTask
类对Runnable执行过程进行包装。
ThreadPoolExecutor ThreadPoolExecutor
是线程池实现类,它依赖了上面提到的所有类,也是本篇博文重点讲述的类。
总结 通过对ThreadPoolExecutor
依赖的类的了解,更能从宏观上理解作者的设计思想。Executor
将并发执行的线程抽象成任务,交给任务执行器来执行 ,ExecutorService
定义了任务执行器的关闭和提交任务。
线程池状态 如下代码所示,线程池的状态分为了
RUNNING:接受新的任务和处理队列中的任务
SHUTDOWN:拒绝新的任务,但是处理队列中的任务
STOP:拒绝新的任务,不处理队列中的任务,并且中断在执行的任务
TIDYING:所有任务已经终止(terminated),workerCount=0,线程
TERMINATED:terminated已完成
线程池的状态按如下方式进行转换
RUNNING->SHUTDOWN 调用shutdown()
方法
(RUNNING or SHUTDOWN) -> STOP,调用shutdownNow()
SHUTDOWN -> TIDYING,当队列和线程池都为空
STOP->TIDYING,线程池为空
TIDYING -> TERMINATED,当terminated()
hook method 执行完毕,所有线程都在awaitTermination()
中等待线程池状态到达TERMINATED。
1 2 3 4 5 6 7 8 9 10 private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
核心流程 任务提交核心流程 根据下图所示的任务提交核心流程,主要是ThreadPoolExecutor#execute()
方法,核心流程总结为以下结论:
核心线程数小于corePoolSize
,则需要创建新的工作线程来执行任务
核心线程数大于等于corePoolSize
,需要将线程放入到阻塞任务队列中等待执行
队列满时需要创建非核心线程来执行任务,所有工作线程(核心线程+非核心线程)数量要小于等于maximumPoolSize
如果工作线程数量已经达到maximumPoolSize
,则拒绝任务,执行拒绝策略
TIP:这里需要注意对核心线程 和非核心线程 的理解,它们不是工作线程的状态,核心线程提出的目的,是为了尽量维持工作线程的数量在corePoolSize
。
创建工作线程核心流程 根据下图对java.util.concurrent.ThreadPoolExecutor#addWorker
的分析,可以得出如下结论:
只有工作线程被添加到线程池中并且工作线程start,才算添加成功,否则Worker
对象只是一个临时对象(被GC掉)
状态为STOP
,TIDYING
,TERMINATED
这三种时,是不会增加工作线程的
状态为SHUTDOWN
时是一个重要的边界条件
任务执行核心流程 如下图是任务执行的流程,可以总结如下:
提交任务时如果工作线程 数量小于核心线程数量,则firstTask != null
,一路顺利执行然后阻塞在队列的poll上。
提交任务时如果工作线程 数量大于等于核心线程数量,则firstTask == null
,需要从任务队列中poll一个任务执行,执行完毕之后继续阻塞在队列的poll上。
提交任务时如果工作线程 数量大于等于核心线程数量并且任务队列已满,需要创建一个非核心线程 来执行任务,则firstTask != null
,执行完毕之后继续阻塞在队列的poll上,不过注意这个poll是允许超时的,最多等待时间为keepAliveTime
。
工作线程在跳出循环之后,线程池会移除该线程对象,并且试图终止线程池(因为需要考量shutdown的情况)
ThreadPoolExecutor
提供了任务执行前和执行后的钩子方法,分别为beforeExecute
和afterExecute
。
工作线程通过实现AQS
来保证线程安全(每次执行任务的时候都会lock
和unlock
)
线程池关闭核心流程 如下图是shutdown
方法的流程图,它是一种“温和“的关闭方式,执行完shutdown
之后,会把线程状态置为SHUTDOWN
,线程池并不会立即关闭,而是先中断能中断的工作线程,有些工作线程正在执行任务中,那么就先不中断,等待它们执行完毕之后中断工作线程。
如下图是shutdownNow
的流程图,与shutdown
不同的地方在于,它关闭线程池的方式比较“粗暴”,直接把线程池状态置为STOP
。不管工作线程是否在执行,全部一一中断,也不等待队列中未执行的任务,把它们返回给客户线程由客户线程自行处置。
ThreadPoolExecutor源码分析 源码分析主要从重要成员变量和核心方法两个层面来讲解。
关键属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 public class ThreadPoolExecutor extends AbstractExecutorService { private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0 )); private final BlockingQueue<Runnable> workQueue; private final HashSet<Worker> workers = new HashSet<>(); private final ReentrantLock mainLock = new ReentrantLock(); private final Condition termination = mainLock.newCondition(); private int largestPoolSize; private long completedTaskCount; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; private volatile boolean allowCoreThreadTimeOut; private volatile int maximumPoolSize; private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; volatile long completedTasks; } public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException(); this .acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; } }
状态控制 在ThreadPoolExecutor中是通过位运算来处理状态的 。位运算的基础是理解原码 ,反码 ,补码 ,用int ctl = 1来举例
原码: 0000 0000 0000 0000 0000 0000 0000 0001
,十进制是1
反码:1111 1111 1111 1111 1111 1111 1111 1110
,十进制是-2
补码:1111 1111 1111 1111 1111 1111 1111 1111
, 十进制是-1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0 )); private static final int COUNT_BITS = Integer.SIZE - 3 ; private static final int CAPACITY = (1 << COUNT_BITS) - 1 ; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private static int runStateOf (int c) { return c & ~CAPACITY; } private static int workerCountOf (int c) { return c & CAPACITY; } private static int ctlOf (int rs, int wc) { return rs | wc; }
工作线程为0的前提下,小结下线程池的运行状态:
状态名称
位图
十进制值
描述
RUNNING
111-00000000000000000000000000000
-536870912
运行中状态,可以接收新的任务和执行任务队列中的任务
SHUTDOWN
000-00000000000000000000000000000
0
shutdown状态,不再接收新的任务,但是会执行任务队列中的任务
STOP
001-00000000000000000000000000000
536870912
停止状态,不再接收新的任务,也不会执行任务队列中的任务,中断所有执行中的任务
TIDYING
010-00000000000000000000000000000
1073741824
整理中状态,所有任务已经终结,工作线程数为0,过渡到此状态的工作线程会调用钩子方法terminated()
TERMINATED
011-00000000000000000000000000000
1610612736
终结状态,钩子方法terminated()
执行完毕
这里有一个比较特殊的技巧,由于运行状态值存放在高3位,所以可以直接通过十进制值(甚至可以忽略低29位,直接用ctl
进行比较,或者使用ctl
和线程池状态常量进行比较 )来比较和判断线程池的状态:
工作线程数为0的前提下:RUNNING(-536870912) < SHUTDOWN(0) < STOP(536870912) < TIDYING(1073741824) < TERMINATED(1610612736)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private static boolean runStateLessThan (int c, int s) { return c < s; } private static boolean runStateAtLeast (int c, int s) { return c >= s; } private static boolean isRunning (int c) { return c < SHUTDOWN; }
最后是线程池状态的跃迁图:
核心方法总体概括 execute方法源码分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
这里简单的分析下流程:
如果工作线程数量小于核心线程数量workerCountOf(c) < corePoolSize
,则直接创建核心线程执行任务
如果工作线程数量大于等于核心线程数量,判断线程池状态,并把任务放入到阻塞队列中等待调度。这里会进行二次检查线程池的状态,如果当前工作线程数量为0,则创建一个非核心线程并传入的任务对象为null。
如果添加任务队列失败,则说明阻塞队列已满,尝试创建非核心线程
如果创建非核心线程失败,则证明线程池已经饱和,调用拒绝策略执行任务
这里是一个疑惑点 :为什么需要二次检查线程池的运行状态,当前工作线程数量为0,尝试创建一个非核心线程并且传入的任务对象为null?这个可以看API注释:
如果一个任务成功加入任务队列,我们依然需要二次检查是否需要添加一个工作线程(因为所有存活的工作线程有可能在最后一次检查之后已经终结)或者执行当前方法的时候线程池是否已经shutdown了。所以我们需要二次检查线程池的状态,必须时把任务从任务队列中移除或者在没有可用的工作线程的前提下新建一个工作线程。
addWorker方法源码分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
通过addWorker
方法,得出一个很重要的结论
线程池状态 > SHUTDOWN时(即为STOP,TIDYING,TERMINATED这三种),线程池会拒绝所有的新提交任务。
线程池状态为SHUTDOWN的时候,并不会拒绝全部的任务,只会拒绝firstTask不为空,或者firstTask为空并且workQueue为空。其中firstTask是一个关键点,在工作线程数量小于coreSize或者队列满并且线程数量没有达到maxSize时,firstTask不为空。
所以基于以上分析,当调用threadPool.shutdown()之后,核心线程是提交不进来的,队列满之后的临时线程也提交不进来,只有队列不为空的情况下才能提交进来,直到队列中的任务被全部处理完毕,所有线程就都提交不进来了。
工作线程内部类Worker源码分析 可以看到Worker
实现了AQS
,实现AQS的目的也是为了保证工作线程内部的线程安全和shutdown
时判定工作线程是否在工作中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run () { runWorker(this ); } protected boolean isHeldExclusively () { return getState() != 0 ; } protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } protected boolean tryRelease (int unused) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } public void lock () { acquire(1 ); } public boolean tryLock () { return tryAcquire(1 ); } public void unlock () { release(1 ); } public boolean isLocked () { return isHeldExclusively(); } void interruptIfStarted () { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
runWorker源码分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } } private void processWorkerExit (Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1 ; if (workerCountOf(c) >= min) return ; } addWorker(null , false ); } }
getTask方法源码分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
这个方法中,有两处十分庞大的if
逻辑,对于第一处if
可能导致工作线程数减去1直接返回null
的场景有:
线程池状态为SHUTDOWN
,一般是调用了shutdown()
方法,并且任务队列为空。
线程池状态为STOP
。
对于第二处if
,逻辑有点复杂,先拆解一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 复制 boolean b1 = wc > maximumPoolSize;boolean b2 = timed && timedOut;boolean b3 = wc > 1 ;boolean b4 = workQueue.isEmpty();boolean r = (b1 || b2) && (b3 || b4);if (r) { if (compareAndDecrementWorkerCount(c)){ return null ; }else { continue ; } }
这段逻辑大多数情况下是针对非核心线程。在execute()
方法中,当线程池总数已经超过了corePoolSize
并且还小于maximumPoolSize
时,当任务队列已经满了的时候,会通过addWorker(task,false)
添加非核心线程。而这里的逻辑恰好类似于addWorker(task,false)
的反向操作,用于减少非核心线程,使得工作线程总数趋向于corePoolSize
。如果对于非核心线程,上一轮循环获取任务对象为null
,这一轮循环很容易满足timed && timedOut
为true,这个时候getTask()
返回null会导致Worker#runWorker()
方法跳出死循环,之后执行processWorkerExit()
方法处理后续工作,而该非核心线程对应的Worker
则变成“游离对象”,等待被JVM回收。当allowCoreThreadTimeOut
设置为true的时候,这里分析的非核心线程的生命周期终结逻辑同时会适用于核心线程。那么可以总结出keepAliveTime
的意义:
当允许核心线程超时,也就是allowCoreThreadTimeOut
设置为true的时候,此时keepAliveTime
表示空闲的工作线程的存活周期。
默认情况下不允许核心线程超时,此时keepAliveTime
表示空闲的非核心线程的存活周期。
在一些特定的场景下,配置合理的keepAliveTime
能够更好地利用线程池的工作线程资源。
processWorkerExit方法源码分析 processWorkerExit是在工作线程跳出工作队列死循环时,更新线程池数量,线程池容器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private void processWorkerExit (Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1 ; if (workerCountOf(c) >= min) return ; } addWorker(null , false ); } }
tryTerminate方法源码分析 每个工作线程在工作结束之后都会调用tryTerminate
方法,总结为:
满足两种情况就会把线程池状态置为TERMINATED 1、状态为SHUTDOWN切队列数量为0,工作线程数量为0,对应shutdown关闭的情况 2、状态为STOP且工作线程数量为0,对应shutdownNow关闭的情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 final void tryTerminate () { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return ; if (workerCountOf(c) != 0 ) { interruptIdleWorkers(ONLY_ONE); return ; } final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0 ))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0 )); termination.signalAll(); } return ; } } finally { mainLock.unlock(); } } } private void interruptIdleWorkers (boolean onlyOne) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break ; } } finally { mainLock.unlock(); } }
关闭线程池方法源码分析 首先和关闭线程池相关的方法有三个shutdown()
,shutdownNow()
,awaitTermination(long timeout, TimeUnit unit)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); } public List<Runnable> shutdownNow () { List<Runnable> tasks; final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } public boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true ; if (nanos <= 0 ) return false ; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
总结下这三个终止线程池的方法,shutdown()和shutdownNow()是中断所有空闲工作线程 ,如果线程在执行Runnable#run()
,那么这个工作线程是不会被中断的,而是等待下一轮执行getTask()
方法的时候,通过线程池状态判断正常终结该工作线程。
awaitTermination
很容易被忽略,因为对线程池状态不了解的人,是不会第一时间想到这个方法的作用的。它的好处是调用改方法的线程会阻塞直到线程状态更新为TERMINATED
才返回,某些需要感知线程池终止的场景需要调用该方法来终止线程池。
reject方法源码分析 拒绝方法的实现很简单了,就是一个回调函数,具体的拒绝策略需要应用者来实现。
1 2 3 final void reject (Runnable command) { handler.rejectedExecution(command, this ); }
场景举例 通过对ThreadPoolExecutor
的核心流程和源码的研究,相信大家对它的设计思想和实现原理有了一定的掌握,接下来需要结合实际场景分析下,本文整理了几个景点的场景,分析一下具体的流程,进一步加深理解。
场景1:线程池刚刚创建,第一次submit(Task),然后执行Futrue.get()
等待执行结果。
此刻线程池状态是RUNNING
,工作线程数量为零,其数量肯定小于corePoolSize
,提交的任务会交给核心线程,且firstTask不为空。工作线程创建好之后,会立即start()
,线程开始执行runWorker
函数,runWorker
中会判断线程的状态不是STOP
,这时会执行任务对象的run()
(一般情况下是Callable接口的run,Callable接口的run又调用了真正的用户执行逻辑)。在工作线程执行完毕之后,Callable
的结果赋值给FutrueTask.outcome
且FutrueTask.status == COMPLETING
,Futrue.get()
解除阻塞,结果返回。
场景2:线程池工作线程数量等于corePoolSize
,队列未满,这时submit(Task),然后执行Futrue.get()
等待执行结果。
此刻线程池状态是RUNNING
,工作线程数量等于corePoolSize
,提交的任务会加入到阻塞队列中。工作线程们“嗷嗷待哺”(阻塞在队列上),这时会有一个工作线程争抢到食物(Task),剩下的流程同场景1。
场景3:线程池工作线程等于corePoolSize
,工作队列已满,这时submit(Task),然后执行Futrue.get()
等待执行结果。
此刻线程池状态是RUNNING
,由于队列已满,需要加入非核心线程来执行任务,非核心线程数量=maximumPoolSize - corePoolSize ,加入新的工作线程且firstTask不为空。剩下的执行流程同场景1。
场景4:执行shutdown()
操作。
在执行shutdown()
之后,首先进行安全性检查,并将线程池状态设置为SHUTDOWN
,中断未执行的线程(通过tryLock可知,在执行的线程是被锁定的),执行中的线程不中断 。线程一旦中断,阻塞在队列take()
方法或者poll()
方法就会抛出InteruptException
,getTask()
方法内部的自旋重启,因为此刻线程池状态是SHUTDOWN
,如果工作队列为空,就直接返回null;如果工作队列不为空,则继续take()
出任务交给工作线程执行,知道队列为空,返回null,逐步使工作线程进入Terminated
状态。在执行shutdown
之后,所有的新提交任务,都会被拒绝,已提交的任务会等待执行完毕。最终线程池状态会变为TERMINATED
,如果想获取到线程池终结的事件,需要调用awaitTermination`方法。
场景4:执行shutdownNow()
操作。
在执行shutdownNow()
之后,首先进行安全性检查,并将线程池状态设置为STOP
,中断所有工作线程(不论工作线程是否在工作中),并移除工作队列中全部的任务,返回给用户自行处置 。由于工作线程全部中断,阻塞在队列take()
方法或者poll()
方法就会抛出InteruptException
,getTask()
方法内部的自旋重启,因为此刻线程池状态是STOP
,就直接返回null,跳出死循环,等待工作线程Terminated
,最后返回所有的工作队列中未执行的任务。在执行shutdownNow
之后,所有的新提交任务,都会被拒绝,未执行的任务会返回给用户自行处理,在执行中的任务虽然工作线程已经被中断,除非任务中处理了中断状态,否则不影响任务执行。
总结 ThreadPoolExecutor
作为Java并发编程中最常使用的工作类,研究其实现原理不仅能够熟练运用线程池写出高效的并发代码,避免采坑之外,还能够发掘作者再创作过程中的“点睛之笔”。
ThreadPoolExecutor
中,大量运用了位计算来提高性能,线程池的状态和工作线程的数量,都交给一个32位的ctl
变量控制,通过把线程
抽象为Worker
和全局锁mainLock
,来保证工作线程执行任务的线程安全,巧妙的运用阻塞队列,来避免线程数量暴增,并通过中断
来控制工作线程阻塞与否,线程的终止过程,即可“温和”也可“粗暴”,整个线程池设计精妙,应用者使用如丝般顺滑。
参考文档 JUC线程池ThreadPoolExecutor源码分析