ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造函数,实际上,其中的三个都是调用的剩下一个 ,现在分析一下其中几个参数的含义,充分理解后,可以尝试调整一下特征库服务中的那个线程池的参数。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
-
int corePoolSize:核心线程池的大小。在创建了线程池以后,默认情况下线程池中没有任何线程,而是等到有任务提交上来后才会 新建线程,除非显示的调用prestartAllCoreThreads()或者prestartCoreThread()方法,这两个方法是预先创建 corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执 行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
-
int maximumPoolSize:线程池的最大线程数,指的是这个线程池最多能够创建多少个线程;
-
long keepAliveTime:表示线程没有任务执行时,最多存活多长时间就终止。默认情况下,只有当线程池中的线程数大于 corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize:即如果线程池中的线程数目大于 corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。 除非显示调用allowCoreThreadTimeOut方法,则在线程池中的线程数目不大于corePoolSize时,keepAliveTime参数也 会起作用,直到线程池中的线程数目为0。
-
TimeUnit unit:参数keepAliveTime的时间单位,在TimeUnit类中的静态变量,共有7种单位。
-
BlockingQueue workQueue:一个阻塞队列,用来存放提交的待执行任务,这个参数的选择会对你的线程池运行效果 产生很大影响,一般来说,阻塞队列有以下几种选择:
- ArrayBlockingQueue,有界阻塞队列
- LinkedBlockingQueue(比较常用),链表结构理论上可以无界,但一般我们也指定一个大小,使其成为有界阻塞队列。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
- SynchronousQueue,内部不进行任何存储,当前线程添加一个任务入队时,当前线程和后续添加任务的线程都会被阻塞,直到有一个线程从队列中取出任务。Executors.newCachedThreadPool使用了这个队列
一个思考:你能说出上面几种阻塞队列的区别吗?
-
ThreadFactory threadFactory:线程工厂,主要用来创建线程。
-
RejectedExecutionHandler handler:拒绝策略。表示当拒绝处理任务时的策略:当任务提交过来以后,发现线程池中的所有 线程都处于工作中,找不到一个空闲的线程(此时意味着阻塞队列已满),就会将这个任务拒绝,主要策略有以下几种,都是 ThreadPoolExecutor的内部类:
- ThreadPoolExecutor.AbortPolicy:丢弃该任务并抛出RejectedExecutionException异常
- ThreadPoolExecutor.DiscardPolicy:仅丢弃任务,不抛出异常
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新执行该任务
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程直接处理该任务
这就是ThreadPoolExecutor类的构造函数的参数含义,我们再来看一下这个类继承的一些父类以及父接口的内容,理解一下它们 之间的关系。
Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面 意思可以理解,就是用来执行传进去的任务的; 然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;
然后ThreadPoolExecutor继承了类AbstractExecutorService。
在ThreadPoolExecutor类中有几个非常重要的方法:
- execute()
- submit()
- shutdown()
- shutdownNow()
execute()方法其实是顶层接口Executor声明的方法,ThreadPoolExecutor类提供了实现,这也是我们今天研究的类的核心 方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在 ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,去看submit()方法的实现,会发现它实际上 还是调用的execute()方法,只不过它利用了Future来获取任务执行结果。
shutdown()和shutdownNow()是用来关闭线程池的。
有了上面的基础后,我们再来深入分析一下线程池的具体实现原理。
一、线程池状态
在ThreadPoolExecutor中定义了一个volatile变量runState,保证线程之间可见,表示线程池的状态,可以有以下几种取值:
- RUNNING 创建线程池时,线程池处于running状态
- SHUTDOWN 如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不接受新的任务,然后等待所有任务执行完毕
- STOP 如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不接受新的任务,并且会尝试终止正在执行的任务
- TIDYING 当所有任务已经终止时,ctl记录的"任务数量"为0,线程池就会变成TIDYING状态。当线程池变为此状态时,会执行 钩子函数terminated(),terminated()函数在ThreadPoolExecutor中是空的,可以自己覆盖,定义想要的行为。
- TERMINATED 线程池彻底终止,执行完terminated()方法后,就会从TIDYING—>TERMINATED。
二、任务的执行
我们先再来看一下ThreadPoolExecutor中的一些成员变量:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //ctl是一个32位的整形数字,高三位用来存储线程池的状态,其余位的数值表示线程池中正在运行的线程数
private final BlockingQueue<Runnable> workQueue; //任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock(); //线程池的主要状态锁,对线程池状态(比如线程池大、runState等)的改变都要使用这个锁
private final HashSet<Worker> workers = new HashSet<Worker>(); //用来存放工作集
private volatile long keepAliveTime; //线程存活时间,默认只对非核心线程有效
private volatile boolean allowCoreThreadTimeOut; //是否允许为核心线程设置存活时间
private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int maximumPoolSize; //线程池最大能容忍的线程数
private volatile int poolSize; //线程池中当前的线程数
private volatile RejectedExecutionHandler handler; //任务拒绝策略
private volatile ThreadFactory threadFactory; //线程工厂,用来创建线程
private int largestPoolSize; //用来记录线程池中曾经出现过的最大线程数
private long completedTaskCount; //用来记录已经执行完毕的任务个数
下面我们看一下ThreadPoolExecutor的核心方法execute(),源码如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get(); //这个变量存储的是线程池的状态以及当前正在执行任务的线程数目
if (workerCountOf(c) < corePoolSize) { //如果正在执行任务的线程数目小于 核心线程数目,则调用addWorker()方法将新建一个核心线程,并执行任务
if (addWorker(command, true))
return;
c = ctl.get(); //如果调用失败了,则重新获取线程池状态变量
}
if (isRunning(c) && workQueue.offer(command)) { //执行到此处,说明可能发生了以下两种情况之一:1、当前运行的线程数已经大于等于核心线程数;2、线程池已经被关
//闭(addWoker调用失败的原因),所以在此处判断一下线程池的运行状态
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) //再次验证线程池状态,如果线程池已经不是运行状态了,则把刚刚添加进队列的任务移除,并拒绝
reject(command);
else if (workerCountOf(recheck) == 0) //如果线程池还是正常运行中,判断正在运行的线程数是否等于0,等于0则添加一个新的线程执行该任务
addWorker(null, false);
}
else if (!addWorker(command, false)) //线程池处于shutdown以上的状态,拒绝。
reject(command);
}
上面execute()方法中,首先判断当前运行的线程数是否小于核心线程数,小于则直接创建一个核心线程来执行该任务;否则就尝试将其添加至阻塞队列中,添加成功后,再次判断一下当前有没有正在执行任务的 线程,防止刚添加进来就没有工作线程worker了,如果添加不成功,则队列已满,会直接拒绝该任务。execute()的核心方法是addWorker()方法,这里面确保了每一个工作线程worker,在执行完任务以后 ,就直接去队列头部取任务了。代码如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判断线程池状态至少是shutdown状态,且传入的task和队列为空至少有一个不满足,则直接返回
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}