高并发编程之线程池ThreadPoolExecutor详解

如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?

在Java中可以通过线程池来达到这样的效果。今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,然后再讲述它的实现原理,接着给出了它的使用示例,最后讨论了一下如何合理配置线程池的大小。

说了这么多首先我们需要明确我们为什么需要使用线程池?

(1)降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗 (2)提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。 (3)提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

因此为了合理的使用线程池我们需要深入的了解一下其中的原理,首先我们来看一下Java中线程池的继承体系,如下图所示:

1、最顶层的Executor是一个接口,它里面只有一个方法:execute(),用于向线程池提交任务。

2、Executor下有一个重要子接口ExecutorService,其中定义了线程池的具体行为

  • 1,execute(Runnable command):履行Ruannable类型的任务,
  • 2,submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象
  • 3,shutdown():有序的完成已提交的任务,但是不再接收新任务,
  • 4,shutdownNow():停止所有正在执行的任务,暂停正在等待的任务的处理,并返回正在等待执行的任务的列表。
  • 5,isTerminated():测试是否所有任务都执行完毕了。
  • 6,isShutdown():测试是否该线程池是否被关闭。

3、ThreadPoolExecutor是Java中线程池的核心实现类,用来执行被提交的任务。接下来我们就以ThreadPoolExecutor为中心来探究一下Java中线程池的原理。

一、ThreadPoolExecutor构造方法

public ThreadPoolExecutor(int corePoolSize,                  
                          int maximumPoolSize,               
                          long keepAliveTime,                
                          TimeUnit unit,                     
                          BlockingQueue<Runnable> workQueue, 
                          ThreadFactory threadFactory,       
                          RejectedExecutionHandler handler) {
    //省略.....
}    

ThreadPoolExecutor有很4个构造方法,这里我用参数最全的一个来说明一下各个参数的含义:

  • 1、corePoolSize:线程池中核心线程个数
  • 2、maximunPoolSize:线程池最大允许的线程个数(这里有一个救急线程的概念,就是非核心线程,它的数量是maximunPoolSize-corePoolSize)
  • 3、keepAliveTime:这个参数是给非核心线程设定的,他表示非核心线程可以空闲时间,超过这个时间就会被回收
  • 4、unit:空闲时间的单位
  • 5、workQueue:任务队列,它是一个阻塞队列,用于存放等待执行的任务。Java中提供的阻塞队列有以下几个:

(1)ArrayBlockingQueue:基于数组的有界队列,遵循先进先出

(2)LinkedBlockingQueue:基于链表的有界队列,遵循先进先出,吞吐量高于ArrayBlockingQueue

(3)SynchronousQueue:一个不存储元素的阻塞队列,每一个插入操作必须要等到另一个线程调用移除操作才可以执行,否则会一直阻塞,吞吐量比LinkedBlockingQueue高

(4)PriorityBlockingQueue:一个具有优先级的阻塞队列

  • 6、threadFactory:线程工厂,用于创建线程以及可以给线程起一个有意义的名字
  • 7、RejectedExecutionHandler:当任务队列和线程池都处于满负荷运行时,新提交的任务应该如何处理的策略,称为饱和策略。Java中提供的策略有以下4种:

(1)ThreadPoolExecutor.AbortPolicy:直接抛出异常,这是默认的策略

(2)ThreadPoolExecutor.CallerRunsPolicy:让调用者来执行该任务

(3)ThreadPoolExecutor.DisCardPolicy:不做任何处理,直接把任务丢掉,也不抛出任何异常

(4)ThreadPoolExecutor.DisCardOldestPolicy:丢弃任务队列头部的任务,然后尝试执行当前任务

当然,我们也可以根据我们的业务场景通过实现RejectExeptionPolicy接口实现自定义策略。

二、线程池的重要属性

1、线程池状态以及有效线程数量属性—ctl

ctl是一个控制线程池状态以及线程池中有效线程数量的int类型字段,在这一个字段中包含了两部分信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了int类型来保存,高3位保存runState,低29位保存workerCount。这么做是为了保证修改线程池状态以及线程池中有效线程数量的操作是一个原子操作的前提前可以借助无锁机制提高效率,如果用两个变量分别保存的话就要通过加锁来保证原子性了。

COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。

2、ctl有关的方法:
 // Packing and unpacking ctl                                   
 //获取线程池的运行状态
 private static int runStateOf(int c)     { return c & ~CAPACITY}  
 //获取活动线程数
 private static int workerCountOf(int c)  { return c & CAPACITY;}
 //获取运行状态和活动线程数的值
 private static int ctlOf(int rs, int wc) { return rs | wc; }   
3、线程池的5个状态

// runState is stored in the high-order bits               高3位
private static final int RUNNING    = -1 << COUNT_BITS;   //111
private static final int SHUTDOWN   =  0 << COUNT_BITS;   //000 
private static final int STOP       =  1 << COUNT_BITS;   //001
private static final int TIDYING    =  2 << COUNT_BITS;   //010
private static final int TERMINATED =  3 << COUNT_BITS;   //011
  • RUNNING:线程池处于运行状态,可以正常处理并接受新任务,线程池的初始状态就是RUNNING;
  • SHUTDOWN:当调用shutdown()方法后会由RUNNING转变为SHUTDOWN状态,此时线程池可以正常处理任务但是无法再接收新任务;
  • STOP:代用shutdownNow()方法后,线程会转变为STOP状态,此时线程不再处理已提交的任务并且无法在接受新任务,并且还会中断处理中的任务;
  • TIDYING:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
  • TERMINATED:线程池彻底终止,就变成TERMINATED状态。线程池处在TIDYING状态时,执行完terminated()方法之后,就会由 TIDYING -> TERMINATED。
4、线程池构建有关的属性
private final BlockingQueue<Runnable> workQueue;    //任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock();   //线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁
private final HashSet<Worker> workers = new HashSet<Worker>();  //用来存放工作集
private volatile long  keepAliveTime;    //线程存货时间   
private volatile boolean allowCoreThreadTimeOut;   //是否允许为核心线程设置存活时间
private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数
private volatile int   poolSize;       //线程池中当前的线程数
private volatile RejectedExecutionHandler handler; //任务拒绝策略
private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程
private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数,跟线程池的容量没有任何关系
private long completedTaskCount;   //用来记录已经执行完毕的任务个数

三、线程池的基本实现原理

ThreadPoolExecutor中有两个方法可以用于向线程池提交任务,分别是execute()submit()两个方法。execute()方法用于提交不需要返回值的任务submit()用于提交需要返回值的任务,返回值被封装在Future接口中,可以通过提供的get()方法获取返回值。下面通过源码我们来探究一下。

1、execute()方法
public void execute(Runnable command) {
    //提交的任务为空将会抛出NPE
    if (command == null)                                            
        throw new NullPointerException();                           
    //ctl中记录着当前线程池运行状态和有效线程数量,是一个原子变量 
    int c = ctl.get();  
    /**workConutOf(c)将取出当前活跃的线程数量,把它和核心线程数比较,
    * 如果小于核心线程数就创建一个新的线程执行新提交的任务
    */
    if (workerCountOf(c) < corePoolSize) { 
        /*
         * addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;
         * 如果为true,根据corePoolSize来判断;
         * 如果为false,则根据maximumPoolSize来判断
         */
        if (addWorker(command, true))                               
            return;  
        //创建新的线程失败,重新获取ctl
        c = ctl.get();                                              
    }        
    
    //如果当前线程池是运行状态并且任务添加到队列成功
    if (isRunning(c) && workQueue.offer(command)) {                 
        int recheck = ctl.get();      
        // 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,
        // 这时需要移除该command
        // 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
        if (! isRunning(recheck) && remove(command))                
            reject(command);
         /*
         * 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法
         * 这里传入的参数表示:
         * 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
         * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
         * 如果判断workerCount大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。
         */
        else if (workerCountOf(recheck) == 0)                       
            addWorker(null, false);                                 
    }
    
/*
 * 如果执行到这里,有两种情况:
 * 1. 线程池已经不是RUNNING状态;
 * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
 * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize; 如果失败则调用饱和策略处理该任务。
 */
    else if (!addWorker(command, false))                            
        reject(command);                                            
}                                                                   

总结一下execute()方法运行的主要流程如下图所示:

从图中可以看出,当提交一个新任务当线程池以后,exectue()的处理流程大致如下:

  • 1、首先判断核心线程池中的线程是否都有处于工作状态,如果没有就创建新的线程执行任务,否则进入下一步
  • 2、判断任务队列是否已经满了,如果没有满就班新任务放入阻塞队列中等待被执行;否则进入下一步
  • 3、判断线程池的线程是否都处于工作状态,如果没有就创建新的线程执行任务,否则把任务交给饱和策略处理。
2、addWorker()方法

addWorker主要的功能就是创建一个新线程并执行提交的任务。firstTask参数表示该线程创建后执行的第一个任务;core参数如果为true表示限制线程池中的线程数量应该小于corePoolSize,为false表示限制线程池中的线程数量应该小于maximumPoolSize

private boolean addWorker(Runnable firstTask, boolean core) {              
    retry:                                                                 
    for (;;) {                                                             
        int c = ctl.get(); 
        //获取运行状态
        int rs = runStateOf(c);                                            
         
        /**
        * 如果rs >= SHUTDOWN,则表示此时不再接收新任务;
        * 如果这个条件满足后,下面三个条件任意一个满足就会返回false
        * 1. 当前线程池处于SHUTDOWN状态
        * 2. 提交的任务为空
        * 3. 任务队列不为空
        */
        // Check if queue empty only if necessary.                         
        if (rs >= SHUTDOWN &&                                              
            ! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))           
            return false;                                                  
                                                                  
        for (;;) {
            //获取活动线程个数
            int wc = workerCountOf(c); 
            /**这里是实现比较容量大小是否超过限制的关键步骤
            *core参数如果为true表示限制线程池中的线程数量应该小于corePoolSize,
            *为false表示限制线程池中的线程数量应该小于maximumPoolSize
            */
            if (wc >= CAPACITY ||                                          
                wc >= (core ? corePoolSize : maximumPoolSize))             
                return false; 
            // 尝试CAS增加workerCount,如果成功,则跳出第一个for循环
            if (compareAndIncrementWorkerCount(c))                         
                break retry;  
            // 如果CAS增加workerCount失败,则重新获取ctl的值
            c = ctl.get();  // Re-read ctl 
            //如果线程池的运行状态发生改变,返回第一个for循环重试
            if (runStateOf(c) != rs)                                       
                continue retry;                                            
            // else CAS failed due to workerCount change; retry inner loop 
        }                                                                  
    }                                                                      
                                                                           
    boolean workerStarted = false;                                         
    boolean workerAdded = false;                                           
    Worker w = null;                                                       
    try {  
        //根据Runnable对象创建一个Worker
        w = new Worker(firstTask);
        //获得Worker对应的线程
        final Thread t = w.thread;                                         
        if (t != null) {                                                   
            final ReentrantLock mainLock = this.mainLock;                  
            mainLock.lock();                                               
            try {                                                          
                // Recheck while holding lock.                             
                // Back out on ThreadFactory failure or if                 
                // shut down before lock acquired.                         
                int rs = runStateOf(ctl.get());                            
                
                // rs < SHUTDOWN表示是RUNNING状态
                if (rs < SHUTDOWN ||                                       
                    (rs == SHUTDOWN && firstTask == null)) {
                    //检查线程是否还活着(活着是否启动)
                    if (t.isAlive()) // precheck that t is startable       
                        throw new IllegalThreadStateException();
                    //将任务添加到阻塞队列中,他是一个HashSet
                    workers.add(w);                                        
                    int s = workers.size();
                    //largestPoolSize记录了线程池曾经出现过的最大的线程数量
                    if (s > largestPoolSize)                               
                        largestPoolSize = s;
                    //任务添加成功标记
                    workerAdded = true;                                    
                }                                                          
            } finally {                                                    
                mainLock.unlock();                                         
            }
            //最后如果添加任务成功就启动它
            if (workerAdded) {                                             
                t.start();   
                //任务启动成功标记
                workerStarted = true;                                      
            }                                                              
        }                                                                  
    } finally {                                                            
        if (! workerStarted)
            //任务启动失败后把任务从HashSet中移除
            addWorkerFailed(w);                                            
    }                                                                      
    return workerStarted;                                                  
}                                                                          
3、内部类Worker

Worker中的重要属性以及构造方法

private final class Worker                                                       
    extends AbstractQueuedSynchronizer                                           
    implements Runnable{                                                                                                                                                   
    /** Thread this worker is running in.  Null if factory fails. */             
    final Thread thread;                                                         
    /** Initial task to run.  Possibly null. */                                  
    Runnable firstTask;                                                          
    /** Per-thread task counter */                                               
    volatile long completedTasks;                                                
                                                                                 
    /**                                                                          
     * Creates with given first task and thread from ThreadFactory.              
     * @param firstTask the first task (null if none)                            
     */                                                                          
    Worker(Runnable firstTask) {                                                 
        setState(-1); // inhibit interrupts until runWorker                      
        this.firstTask = firstTask;                                              
        this.thread = getThreadFactory().newThread(this);                        
    }
}

线程池中的每一个线程被封装成了一个Worker对象,ThreadPoolExecutor维护的其实就是一组Worker对象。

Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用来保存传入的任务;thread是在调用构造方法时保存通过ThreadFactory创建的线程,是用来处理任务的线程。

在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。 Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:

  • (1)lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
  • (2)如果正在执行任务,则不应该中断线程;
  • (3)如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
  • (4)线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
  • (5)之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。

所以,Worker继承自AQS的作用是判断线程是否空闲以及是否可以被中断

此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢?是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时就不应该被中断,可以看一下Worker中的tryAcquire方法:

// The value 0 represents the unlocked state
// The value 1 represents the locked state. 
protected boolean tryAcquire(int unused) {
    //尝试上锁
    if (compareAndSetState(0, 1)) {                      
        setExclusiveOwnerThread(Thread.currentThread()); 
        return true;                                     
    }                                                    
    return false;                                        
}                                                        

tryAcquire方法是根据state是否是0来判断的,所以将state设置为-1是为了禁止在执行任务前对线程进行中断

正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为0。

4、runWorker()方法

在Worker类中的run()方法调用了runWorker()方法来执行任务,runWorker()方法的代码如下:

final void runWorker(Worker w) {                                
    Thread wt = Thread.currentThread(); 
    //尝试获取任务
    Runnable task = w.firstTask;                                
    w.firstTask = null; 
    //解锁,允许中断了
    w.unlock(); // allow interrupts 
     // 是否因为异常退出循环
    boolean completedAbruptly = true;                           
    try { 
        // 如果task为null,则通过getTask()获取任务去处理
        while (task != null || (task = getTask()) != null) {    
            w.lock();                                           
             //如果线程池正在停止,那么要保证当前线程是中断状态;
             //如果不是的话,则要保证当前线程不是中断状态;    
            if ((runStateAtLeast(ctl.get(), STOP) ||            
                 (Thread.interrupted() &&                       
                  runStateAtLeast(ctl.get(), STOP))) &&         
                !wt.isInterrupted())                            
                wt.interrupt(); 
            
            try {                                               
                beforeExecute(wt, task);                        
                Throwable thrown = null;                        
                try {
                    //执行任务,注意这里是直接调用了run()方法
                    task.run();                                 
                } catch (RuntimeException x) {                  
                    thrown = x; throw x;                        
                } catch (Error x) {                             
                    thrown = x; throw x;                        
                } catch (Throwable x) {                         
                    thrown = x; throw new Error(x);             
                } finally {                                     
                    afterExecute(task, thrown);                 
                }                                               
            } finally { 
                //将task置为null方便下次使用getTask()获取任务
                task = null;                                    
                w.completedTasks++;                             
                w.unlock();                                     
            }                                                   
        }                                                       
        completedAbruptly = false;                              
    } finally {  
        //对线程做一些善后的工作
        processWorkerExit(w, completedAbruptly);                
    }                                                           
}

总结runWorker()方法的执行过程如下:

  • while循环不断地通过getTask()方法获取任务
  • getTask()方法从阻塞队列中取任务(下面会分析到)
  • 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
  • 调用task.run()执行任务;
  • 当通过getTask()获取任务为null时跳出循环,执行processWorkerExit()方法,对线程做一些善后处理;
  • runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。
5、getTask()方法

getTask()方法是用来从阻塞队列中获取任务的方法,源码如下:

private Runnable getTask() { 
    //timeOut变量的值表示上次从阻塞队列中取任务时是否超时
    boolean timedOut = false; // Did the last poll() time out?            
                                                                          
    for (;;) {                                                            
        int c = ctl.get();                                                
        int rs = runStateOf(c);                                           
                                                                          
        /*
         * 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
         * 1. rs >= STOP,线程池是否正在stop;
         * 2. 阻塞队列是否为空。
         * 如果以上条件满足,则将workerCount减1并返回null。
         * 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {      
            decrementWorkerCount();                                       
            return null;                                                  
        }                                                                 
                                                                          
        int wc = workerCountOf(c);                                        
                                                                          
        // timed变量用于判断是否需要进行超时控制                               
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;      
    /*
     * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
     * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
     * 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
     * 如果减1失败,则返回重试。
     * 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
     * 当线程池空闲之后非核心线程就是在这里被清除的
     */                                   
        if ((wc > maximumPoolSize || (timed && timedOut))                 
            && (wc > 1 || workQueue.isEmpty())) {                         
            if (compareAndDecrementWorkerCount(c))                        
                return null;                                              
            continue;                                                     
        }                                                                 
                                                                          
        try { 
            //poll方法是有时间限制的,超过指定时间就会返回,take方法则会一直阻塞等待任务
            Runnable r = timed ?                                          
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :     
                workQueue.take();                                         
            if (r != null)
                //返回任务
                return r; 
            // 如果 r == null,说明已经超时,timedOut设置为true
            timedOut = true;                                              
        } catch (InterruptedException retry) { 
            // 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
            timedOut = false;                                             
        }                                                                 
    }                                                                     
}                                                                         
6、 processWorkerExit()方法

在runWorker()方法中当通过getTask()获取任务是返回null时,一个线程的生命就到达终点了,此时他会执行processWorkerExit()方法把自己从线程池维护的HashSet(workers)中移除了。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1;
    // 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。  
    if (completedAbruptly) // If abrupt, then workerCount wasn't adju
        decrementWorkerCount();                                      
                                                                     
    final ReentrantLock mainLock = this.mainLock;                    
    mainLock.lock();                                                 
    try {   
        //汇总完成的任务数
        completedTaskCount += w.completedTasks;
        //从线程池中移除工作线程w
        workers.remove(w);                                           
    } finally {                                                      
        mainLock.unlock();                                           
    }                                                                
    // 根据线程池状态进行判断是否结束线程池         
    tryTerminate();                                                  
                                                                     
    int c = ctl.get();                                               
    if (runStateLessThan(c, STOP)) {                                 
        if (!completedAbruptly) {                                    
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;     
            if (min == 0 && ! workQueue.isEmpty())                   
                min = 1;                                             
            if (workerCountOf(c) >= min)                             
                return; // replacement not needed                    
        }                                                            
        addWorker(null, false);                                      
    } 
}

至此,processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute()方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker()通过getTask()获取任务,然后执行任务,如果getTask()返回null,进入processWorkerExit()方法,整个线程结束,如图所示:

四 、使用线程池的正确姿势

1、向线程池提交任务

ThreadPoolExecutor中有两个方法可以用于向线程池提交任务,分别是execute()submit()两个方法。

  • execute():用于提交不需要返回值的任务;

  • submit():用于提交需要返回值的任务,返回值被封装在Future接口中,可以通过提供的get()方法获取返回值。

我们在使用线程池的时候最好不要直接使用工具类Executors中提供的Executors.newXXXThreadPool()快捷方法创建线程池,因为这种方式会使用无界的任务队列,为避免OOM,我们应该使用ThreadPoolExecutor的构造方法手动指定队列的最大长度:

public class ThreadPoolTest {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = null;
        try {
            threadPool = new ThreadPoolExecutor(2, 5, 1000, 
                                                TimeUnit.MILLISECONDS, 
                                                new LinkedBlockingQueue<>(10),
                                                new DefaultTreadFactory(), 
                                 new ThreadPoolExecutor.CallerRunsPolicy());
            for (int i = 0; i < 16; i++) {
                int j = i;
                threadPool.execute(new Thread(() -> {
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "执行了任务" + j);
                }));
            }

        } finally {
            if (threadPool != null)
                threadPool.shutdown();
        }

    }

}


class DefaultTreadFactory implements ThreadFactory {

    @Override
    public Thread newThread(Runnable r) {
        Thread currentThread = new Thread(r);
        currentThread.setName("worker" + currentThread.getId());
        return currentThread;
    }
}

​ 在使用线程池的时候一定要根据任务的特性合理的配置线程池才能最大限度的发挥线程池带来的好处。对于CPU密集型任务应该配置尽可能少的线程,通常配置线程个数略多与CPU个数即可;对于IO密集型任务,每个线程在IO阻塞的时间远远大于其运行的时间,此时可以配置尽可能多的CPU;当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。

2、线程池的初始化

默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。

  在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:

  • prestartCoreThread():初始化一个核心线程;
  • prestartAllCoreThreads():初始化所有核心线程

下面是这2个方法的实现:

//初始化一个核心线程
public boolean prestartCoreThread() {                 
    return workerCountOf(ctl.get()) < corePoolSize && 
        addWorker(null, true);                        
}                                                     

//初始化全部核心线程
public int prestartAllCoreThreads() {  
     int n = 0;                         
     while (addWorker(null, true))      
         ++n;                           
     return n;                          
 }                                      
3、关闭线程池

 ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

  • shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务,,当执行这个方法后线程池就会从RUNNING==>SHUTDOWN
  • shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
4、动态调整线程池容量

ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

  • setCorePoolSize():设置核心池大小
  • setMaximumPoolSize():设置线程池最大能创建的线程数目大小

  当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。

五、Excutors工具类中的4类基于ThreadPoolExecutor线程池简单分析

1、newFixedThreadPool()
public static ExecutorService newFixedThreadPool(int nThreads) {       
    return new ThreadPoolExecutor(nThreads, nThreads,                  
                                  0L, TimeUnit.MILLISECONDS,           
                                  new LinkedBlockingQueue<Runnable>());
}                                                                      

特点:

  • 是一个固定大小的线程池,核心线程corePoolSize=maxinumnPoolSize,因此无需超时时间
  • 阻塞队列使用的是LinkedBlockingQueue,没有显示大小,可以看成一个是一个无界阻塞队列,当队列一直添加就可能会导致OOM
  • FixedThreadPool适用于我了满足资源管理的需求,而需要限制当前线程数量的应用场景,是哟适用于负载比较重的服务器。
2、newCacheedThreadPool()
public static ExecutorService newCachedThreadPool() {               
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,             
                                  60L, TimeUnit.SECONDS,            
                                  new SynchronousQueue<Runnable>());
}                                                                   

特点:

  • 核心线程数为0,最大线程数是Integer.MAX_VALUE。
  • 阻塞队列采用了SynchronousQueue,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
  • 适用于执行大量的的短期异步任务或负载较轻的服务器
3、newSingleThreadPool()
public static ExecutorService newSingleThreadExecutor() {             
    return new FinalizableDelegatedExecutorService                    
        (new ThreadPoolExecutor(1, 1,                                 
                                0L, TimeUnit.MILLISECONDS,            
                                new LinkedBlockingQueue<Runnable>()));
}

特点:

  • 线程池只有一个线程,并且也是使用LinkedBlockingQueue
  • 适用于需要保证任务串行执行的场景。
4、newScheduledThreadPool()
public static ScheduledExecutorService newScheduledThreadPool(          
        int corePoolSize, ThreadFactory threadFactory) {                
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}                                                                       

特点:

  • 最大线程数为Integer.MAX_VALUE
  • 阻塞队列是DelayedWorkQueue
  • keepAliveTime为0
  • scheduleAtFixedRate() :按某种速率周期执行
  • scheduleWithFixedDelay():在某个延迟后执行
总结

Executors为我们提供了构造线程池的便捷方法,对于服务器程序我们应该杜绝使用这些便捷方法,而是直接使用线程池ThreadPoolExecutor的构造方法,避免无界队列可能导致的OOM以及线程个数限制不当导致的线程数耗尽等问题。ExecutorCompletionService提供了等待所有任务执行结束的有效方式,如果要设置等待的超时时间,则可以通过CountDownLatch完成。

留言区

还能输入500个字符