ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造函数,实际上,其中的三个都是调用的剩下一个 ,现在分析一下其中几个参数的含义,充分理解后,可以尝试调整一下特征库服务中的那个线程池的参数。

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • int corePoolSize:核心线程池的大小。在创建了线程池以后,默认情况下线程池中没有任何线程,而是等到有任务提交上来后才会 新建线程,除非显示的调用prestartAllCoreThreads()或者prestartCoreThread()方法,这两个方法是预先创建 corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执 行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

  • int maximumPoolSize:线程池的最大线程数,指的是这个线程池最多能够创建多少个线程;

  • long keepAliveTime:表示线程没有任务执行时,最多存活多长时间就终止。默认情况下,只有当线程池中的线程数大于 corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize:即如果线程池中的线程数目大于 corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。 除非显示调用allowCoreThreadTimeOut方法,则在线程池中的线程数目不大于corePoolSize时,keepAliveTime参数也 会起作用,直到线程池中的线程数目为0。

  • TimeUnit unit:参数keepAliveTime的时间单位,在TimeUnit类中的静态变量,共有7种单位。

  • BlockingQueue workQueue:一个阻塞队列,用来存放提交的待执行任务,这个参数的选择会对你的线程池运行效果 产生很大影响,一般来说,阻塞队列有以下几种选择:

    • ArrayBlockingQueue,有界阻塞队列
    • LinkedBlockingQueue(比较常用),链表结构理论上可以无界,但一般我们也指定一个大小,使其成为有界阻塞队列。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    • SynchronousQueue,内部不进行任何存储,当前线程添加一个任务入队时,当前线程和后续添加任务的线程都会被阻塞,直到有一个线程从队列中取出任务。Executors.newCachedThreadPool使用了这个队列

    一个思考:你能说出上面几种阻塞队列的区别吗?

  • ThreadFactory threadFactory:线程工厂,主要用来创建线程。

  • RejectedExecutionHandler handler:拒绝策略。表示当拒绝处理任务时的策略:当任务提交过来以后,发现线程池中的所有 线程都处于工作中,找不到一个空闲的线程(此时意味着阻塞队列已满),就会将这个任务拒绝,主要策略有以下几种,都是 ThreadPoolExecutor的内部类:

    • ThreadPoolExecutor.AbortPolicy:丢弃该任务并抛出RejectedExecutionException异常
    • ThreadPoolExecutor.DiscardPolicy:仅丢弃任务,不抛出异常
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新执行该任务
    • ThreadPoolExecutor.CallerRunsPolicy:由调用线程直接处理该任务

  这就是ThreadPoolExecutor类的构造函数的参数含义,我们再来看一下这个类继承的一些父类以及父接口的内容,理解一下它们 之间的关系。

  Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面 意思可以理解,就是用来执行传进去的任务的;      然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

  抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

  然后ThreadPoolExecutor继承了类AbstractExecutorService。

  在ThreadPoolExecutor类中有几个非常重要的方法:

  • execute()
  • submit()
  • shutdown()
  • shutdownNow()

  execute()方法其实是顶层接口Executor声明的方法,ThreadPoolExecutor类提供了实现,这也是我们今天研究的类的核心 方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

  submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在 ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,去看submit()方法的实现,会发现它实际上 还是调用的execute()方法,只不过它利用了Future来获取任务执行结果。

  shutdown()和shutdownNow()是用来关闭线程池的。

  有了上面的基础后,我们再来深入分析一下线程池的具体实现原理。

一、线程池状态

  在ThreadPoolExecutor中定义了一个volatile变量runState,保证线程之间可见,表示线程池的状态,可以有以下几种取值:

  • RUNNING 创建线程池时,线程池处于running状态
  • SHUTDOWN 如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不接受新的任务,然后等待所有任务执行完毕
  • STOP 如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不接受新的任务,并且会尝试终止正在执行的任务
  • TIDYING 当所有任务已经终止时,ctl记录的"任务数量"为0,线程池就会变成TIDYING状态。当线程池变为此状态时,会执行 钩子函数terminated(),terminated()函数在ThreadPoolExecutor中是空的,可以自己覆盖,定义想要的行为。
  • TERMINATED 线程池彻底终止,执行完terminated()方法后,就会从TIDYING—>TERMINATED。

二、任务的执行

  我们先再来看一下ThreadPoolExecutor中的一些成员变量:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));   //ctl是一个32位的整形数字,高三位用来存储线程池的状态,其余位的数值表示线程池中正在运行的线程数
private final BlockingQueue<Runnable> workQueue;         //任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock();   //线程池的主要状态锁,对线程池状态(比如线程池大、runState等)的改变都要使用这个锁
private final HashSet<Worker> workers = new HashSet<Worker>();  //用来存放工作集
 
private volatile long  keepAliveTime;    //线程存活时间,默认只对非核心线程有效   
private volatile boolean allowCoreThreadTimeOut;   //是否允许为核心线程设置存活时间
private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数
 
private volatile int   poolSize;       //线程池中当前的线程数
 
private volatile RejectedExecutionHandler handler; //任务拒绝策略
 
private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程
 
private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数
 
private long completedTaskCount;   //用来记录已经执行完毕的任务个数

  下面我们看一下ThreadPoolExecutor的核心方法execute(),源码如下:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();           //这个变量存储的是线程池的状态以及当前正在执行任务的线程数目
        if (workerCountOf(c) < corePoolSize) {          //如果正在执行任务的线程数目小于 核心线程数目,则调用addWorker()方法将新建一个核心线程,并执行任务
            if (addWorker(command, true))
                return;
            c = ctl.get();           //如果调用失败了,则重新获取线程池状态变量
        }
        if (isRunning(c) && workQueue.offer(command)) {   //执行到此处,说明可能发生了以下两种情况之一:1、当前运行的线程数已经大于等于核心线程数;2、线程池已经被关
                                                      //闭(addWoker调用失败的原因),所以在此处判断一下线程池的运行状态
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))      //再次验证线程池状态,如果线程池已经不是运行状态了,则把刚刚添加进队列的任务移除,并拒绝
                reject(command);
            else if (workerCountOf(recheck) == 0)             //如果线程池还是正常运行中,判断正在运行的线程数是否等于0,等于0则添加一个新的线程执行该任务
                addWorker(null, false);
        }
        else if (!addWorker(command, false))           //线程池处于shutdown以上的状态,拒绝。
            reject(command);
    }

  上面execute()方法中,首先判断当前运行的线程数是否小于核心线程数,小于则直接创建一个核心线程来执行该任务;否则就尝试将其添加至阻塞队列中,添加成功后,再次判断一下当前有没有正在执行任务的 线程,防止刚添加进来就没有工作线程worker了,如果添加不成功,则队列已满,会直接拒绝该任务。execute()的核心方法是addWorker()方法,这里面确保了每一个工作线程worker,在执行完任务以后 ,就直接去队列头部取任务了。代码如下:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 判断线程池状态至少是shutdown状态,且传入的task和队列为空至少有一个不满足,则直接返回
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }