什么是线程池

说白了就是一个线程集合 workerSet 和一个阻塞队列 workQueue。当用户向线程池提交一个任务(也就是线程)时,线程池会先将任务放入 workQueue 中。workerSet 中的线程会不断的从 workQueue 中获取线程然后执行。当 workQueue 中没有任务的时候,worker 就会阻塞,直到队列中有任务了就取出来继续执行。
Java全栈知识体系

创建线程池

1. 通过 Executor 框架的工具类 Executors 来创建。


有很多内置的线程池,包括:

  • FixedThreadPool:固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • SingleThreadExecutor: 只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
  • CachedThreadPool: 可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
  • ScheduledThreadPool:给定的延迟后运行任务或者定期执行任务的线程池。

但并不建议这样创建线程池,原因:

  • FixedThreadPool SingleThreadExecutor:使用的是无界的 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
  • CachedThreadPool:使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM。
  • ScheduledThreadPool SingleThreadScheduledExecutor:使用的无界的延迟阻塞队列 DelayedWorkQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。

2. 使用ThreadPoolExecutor

构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize :线程池中的 核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于 corePoolSize, 即使有其他空闲线程能够执行新来的任务, 也会继续创建线程;如果当前线程数为 corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的 prestartAllCoreThreads()方法,线程池会提前创建并启动所有 核心线程。
  • maximumPoolSize :线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于 maximumPoolSize;当阻塞队列是无界队列, 则 maximumPoolSize 则不起作用, 因为无法提交至 核心线程池的线程会一直持续地放入 workQueue.
  • keepAliveTime :线程空闲时的存活时间,即当线程没有任务执行时,该线程继续存活的时间;默认情况下,该参数只在线程数大于 corePoolSize 时才有用, 超过这个时间的空闲线程将被终止;
  • unit :keepAliveTime 的单位
  • workQueue :用来保存等待被执行的任务的阻塞队列. 在 JDK 中提供了如下阻塞队列:
    • ArrayBlockingQueue: 基于数组结构的有界阻塞队列,按 FIFO 排序任务;
    • LinkedBlockingQueue: 基于链表结构的阻塞队列,按 FIFO 排序任务,吞吐量通常要高于 ArrayBlockingQueue;
    • SynchronousQueue: 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQueue;
    • PriorityBlockingQueue: 具有优先级的无界阻塞队列;
  • threadFactory :创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。默认为 DefaultThreadFactory
  • handler :线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了 4 种策略:
    • AbortPolicy: 直接抛出异常,默认策略;
    • CallerRunsPolicy: 用调用者所在的线程来执行任务;
    • DiscardOldestPolicy: 丢弃阻塞队列中靠最前的任务,并执行当前任务;
    • DiscardPolicy: 直接丢弃任务

原理

重要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//这个属性是用来存放 当前运行的worker数量以及线程池状态的
//int是32位的,这里把int的高3位拿来充当线程池状态的标志位,后29位拿来充当当前运行worker的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//存放任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
//worker的集合,用set来存放
private final HashSet<Worker> workers = new HashSet<Worker>();
//历史达到的worker数最大值
private int largestPoolSize;
//当队列满了并且worker的数量达到maxSize的时候,执行具体的拒绝策略
private volatile RejectedExecutionHandler handler;
//超出coreSize的worker的生存时间
private volatile long keepAliveTime;
//常驻worker的数量
private volatile int corePoolSize;
//最大worker的数量,一般当workQueue满了才会用到这个参数
private volatile int maximumPoolSize;

内部状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

其中 AtomicInteger 变量 ctl 的功能非常强大: 利用低 29 位表示线程池中线程数,通过高 3 位表示线程池的运行状态:

  • RUNNING: -1 << COUNT_BITS,即高 3 位为 111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
  • SHUTDOWN: 0 << COUNT_BITS,即高 3 位为 000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
  • STOP : 1 << COUNT_BITS,即高 3 位为 001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
  • TIDYING : 2 << COUNT_BITS,即高 3 位为 010, 所有的任务都已经终止;
  • TERMINATED: 3 << COUNT_BITS,即高 3 位为 011, terminated()方法已经执行完成

任务执行

execute –> addWorker –>runworker (getTask)

execute()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
   // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    private static int workerCountOf(int c) {
        return c & CAPACITY;
    }
    //任务队列
    private final BlockingQueue<Runnable> workQueue;

    // 执行命令,其中命令(下面称任务)对象是Runnable的实例
    public void execute(Runnable command) {
        // 判断命令(任务)对象非空
        if (command == null)
            throw new NullPointerException();
        // 获取ctl的值
        int c = ctl.get();
        // 判断如果当前工作线程数小于核心线程数,则创建新的核心线程并且执行传入的任务
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                // 如果创建新的核心线程成功则直接返回
                return;
            // 这里说明创建核心线程失败,需要更新ctl的临时变量c
            c = ctl.get();
        }
        // 走到这里说明创建新的核心线程失败,也就是当前工作线程数大于等于corePoolSize
        // 判断线程池是否处于运行中状态,同时尝试用非阻塞方法向任务队列放入任务(放入任务失败返回false)
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 这里是向任务队列投放任务成功,对线程池的运行中状态做二次检查
            // 如果线程池二次检查状态是非运行中状态,则从任务队列移除当前的任务调用拒绝策略处理之(也就是移除前面成功入队的任务实例)
            if (! isRunning(recheck) && remove(command))
                // 调用拒绝策略处理任务 - 返回
                reject(command);
            // 走到下面的else if分支,说明有以下的前提:
            // 0、待执行的任务已经成功加入任务队列
            // 1、线程池可能是RUNNING状态
            // 2、传入的任务可能从任务队列中移除失败(移除失败的唯一可能就是任务已经被执行了)
            // 如果当前工作线程数量为0,则创建一个非核心线程并且传入的任务对象为null - 返回
            // 也就是创建的非核心线程不会马上运行,而是等待获取任务队列的任务去执行 
            // 如果前工作线程数量不为0,原来应该是最后的else分支,但是可以什么也不做,因为任务已经成功入队列,总会有合适的时机分配其他空闲线程去执行它
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 走到这里说明有以下的前提:
        // 0、线程池中的工作线程总数已经大于等于corePoolSize(简单来说就是核心线程已经全部懒创建完毕)
        // 1、线程池可能不是RUNNING状态
        // 2、线程池可能是RUNNING状态同时任务队列已经满了
        // 如果向任务队列投放任务失败,则会尝试创建非核心线程传入任务执行
        // 创建非核心线程失败,此时需要拒绝执行任务
        else if (!addWorker(command, false))
            // 调用拒绝策略处理任务 - 返回
            reject(command);
}

这里简单分析一下整个流程(对整个逻辑进行了简化,方便理解):

  1. 如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。
  2. 如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。
  3. 如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。
  4. 如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,拒绝策略会调用RejectedExecutionHandler.rejectedExecution()方法。
    JavaGuide

addWorker 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// 全局锁,并发操作必备
private final ReentrantLock mainLock = new ReentrantLock();
// 跟踪线程池的最大大小,只有在持有全局锁mainLock的前提下才能访问此集合
private int largestPoolSize;
// 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合
private final HashSet<Worker> workers = new HashSet<>();
//获取线程池状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//判断线程池的状态是否为 Running
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}


    // 添加工作线程,如果返回false说明没有新创建工作线程,如果返回true说明创建和启动工作线程成功
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:  
    // 注意这是一个死循环 - 最外层循环
    for (int c = ctl.get();;) {
        // 这个是十分复杂的条件,这里先拆分多个与(&&)条件:
        // 1. 线程池状态至少为SHUTDOWN状态,也就是rs >= SHUTDOWN(0)
        // 2. 线程池状态至少为STOP状态,也就是rs >= STOP(1),或者传入的任务实例firstTask不为null,或者任务队列为空
        // 其实这个判断的边界是线程池状态为shutdown状态下,不会再接受新的任务,在此前提下如果状态已经到了STOP、或者传入任务不为空、或者任务队列为空(已经没有积压任务)都不需要添加新的线程
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;
        // 注意这也是一个死循环 - 二层循环
        for (;;) {
            // 这里每一轮循环都会重新获取工作线程数wc
            // 1. 如果传入的core为true,表示将要创建核心线程,通过wc和corePoolSize判断,如果wc >= corePoolSize,则返回false表示创建核心线程失败
            // 1. 如果传入的core为false,表示将要创非建核心线程,通过wc和maximumPoolSize判断,如果wc >= maximumPoolSize,则返回false表示创建非核心线程失败
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            // 成功通过CAS更新工作线程数wc,则break到最外层的循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 走到这里说明了通过CAS更新工作线程数wc失败,这个时候需要重新判断线程池的状态是否由RUNNING已经变为SHUTDOWN
            c = ctl.get();  // Re-read ctl
            // 如果线程池状态已经由RUNNING已经变为SHUTDOWN,则重新跳出到外层循环继续执行
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // 如果线程池状态依然是RUNNING,CAS更新工作线程数wc失败说明有可能是并发更新导致的失败,则在内层循环重试即可 
            // else CAS failed due to workerCount change; retry inner loop 
        }
    }
    // 标记工作线程是否启动成功
    boolean workerStarted = false;
    // 标记工作线程是否创建成功
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 传入任务实例firstTask创建Worker实例,Worker构造里面会通过线程工厂创建新的Thread对象,所以下面可以直接操作Thread t = w.thread
        // 这一步Worker实例已经创建,但是没有加入工作线程集合或者启动它持有的线程Thread实例
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 这里需要全局加锁,因为会改变一些指标值和非线程安全的集合
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                // 这里主要在加锁的前提下判断ThreadFactory创建的线程是否存活或者判断获取锁成功之后线程池状态是否已经更变为SHUTDOWN
                // 1. 如果线程池状态依然为RUNNING,则只需要判断线程实例是否存活,需要添加到工作线程集合和启动新的Worker
                // 2. 如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker
                // 对于2,换言之,如果线程池处于SHUTDOWN状态下,同时传入的任务实例firstTask不为null,则不会添加到工作线程集合和启动新的Worker
                // 这一步其实有可能创建了新的Worker实例但是并不启动(临时对象,没有任何强引用),这种Worker有可能成功下一轮GC被收集的垃圾对象
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 把创建的工作线程实例添加到工作线程集合
                    workers.add(w);
                    int s = workers.size();
                    // 尝试更新历史峰值工作线程数,也就是线程池峰值容量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 这里更新工作线程是否启动成功标识为true,后面才会调用Thread#start()方法启动真实的线程实例
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
            if (workerAdded) {
                t.start();
                // 标记线程启动成功
                workerStarted = true;
            }
        }
    } finally {
        // 线程启动失败,需要从工作线程集合移除对应的Worker
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

// 添加Worker失败
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 从工作线程集合移除之
        if (w != null)
            workers.remove(w);
        // wc数量减1    
        decrementWorkerCount();
        // 基于状态判断尝试终结线程池
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

分成几步来看

1. 检查线程池状态和任务队列
1
2
3
4
5
6
7
8
9
10
retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN && ! (rs == SHUTDOWN &&
                             firstTask == null &&
                             ! workQueue.isEmpty()))
        return false;

这段代码首先检查线程池的状态 rs(通过 runStateOf(c))。如果线程池处于 SHUTDOWN 状态并且 firstTask 非空或者任务队列为空,则不能添加新的工作线程。

  • 当 firstTask 为空时,意图通常是增加一个线程来处理已有的任务,而不是立即执行一个新的任务。
  • 如果线程池处于 SHUTDOWN 状态且任务队列为空,意味着没有任务需要处理,因此没有必要添加新的线程。
2. 控制线程数量
1
2
3
4
5
6
7
8
9
10
11
for (;;) {
    int wc = workerCountOf(c);
    if (wc >= CAPACITY ||
        wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
    if (compareAndIncrementWorkerCount(c))
        break retry;
    c = ctl.get();  // Re-read ctl
    if (runStateOf(c) != rs)
        continue retry;
}
  1. 无限循环: for (;;) { … } 创建了一个没有明确退出条件的循环,通常意味着循环会一直执行,直到通过循环体内的逻辑(如 break 语句)主动退出。
  2. 获取当前工作者线程数: int wc = workerCountOf(c); 从线程池的控制状态变量 ctl 中提取当前的工作者线程数(workerCount)。
  3. 检查线程池容量限制:如果当前工作者线程数 wc 已经达到最大容量 CAPACITY,或者
    当前线程数 wc 已经满足核心线程数要求(如果 core 为 true,则与 corePoolSize 比较;如果 core 为 false,则与 maximumPoolSize 比较), 则直接返回 false,表示无法成功添加新的工作者线程。
  4. 尝试增加工作者线程数:
    使用 compareAndIncrementWorkerCount(c)尝试原子性地增加工作者线程计数。如果成功增加,则跳出循环(break retry;),继续执行 addWorker 方法的后续步骤,比如实际创建并启动新线程。
  5. 重新读取 ctl 值: 如果增加计数失败(因为其他线程同时修改了 ctl),则通过 c = ctl.get();重新读取 ctl 的值,准备下一轮尝试。
  6. 检查运行状态是否改变:
    如果在重新读取 ctl 之后发现线程池的运行状态(runStateOf(c))发生了变化,说明线程池的状态不再与循环开始时一致,这时使用 continue retry;跳回循环开头重新评估条件。这是因为状态变化可能影响到是否应该继续尝试添加工作者线程的决策。
  7. 循环内部注释: 最后的注释// else CAS failed due to workerCount change; retry inner loop 解释了如果 compareAndIncrementWorkerCount 失败(CAS 操作失败),是因为工作者线程数在尝试增加时被其他线程改变了,所以需要通过 continue retry;指令回到循环开始,重新尝试整个过程。
3. 创建和启动工作线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive())
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
  1. 初始化变量:
    boolean workerStarted = false; 和 boolean workerAdded = false; 分别记录线程是否成功启动和是否成功添加到工作者集合。
    Worker w = null; 初始化一个 Worker 对象引用,Worker 类内部封装了线程和任务。
  2. 创建 Worker 对象:
    w = new Worker(firstTask); 创建一个 Worker 实例,传入第一个任务 firstTask。Worker 类通常包含一个 Runnable 任务和一个 Thread 对象,其中 Runnable 就是任务,Thread 则是执行该任务的实际线程。
  3. 检查线程有效性:
    final Thread t = w.thread; 获取 Worker 内部的线程引用。
    接下来通过 if (t != null)确保线程实例有效,防止 Worker 构造失败导致的空指针异常。
  4. 获取锁并检查线程池状态:
    mainLock.lock(); 获取线程池的主锁,确保以下操作的原子性和线程安全。
    再次检查线程池运行状态,确保在持有锁的情况下,线程池没有被关闭或正在关闭过程中,并且没有任务需要执行(当 rs == SHUTDOWN 且 firstTask == null 时)。
    如果线程已经启动(t.isAlive()),抛出异常,因为尝试启动一个已经启动的线程是非法的。
  5. 添加工作者线程:
    如果所有条件检查通过,将 Worker 实例添加到工作者集合 workers 中,并更新最大线程数 largestPoolSize。
    设置 workerAdded = true; 表示工作者线程已经被成功添加到集合中。
  6. 释放锁:
    finally 块确保无论是否成功添加工作者,都能释放 mainLock,避免死锁。
  7. 启动线程:
    如果 workerAdded 为 true,即工作者线程成功添加到集合中,调用 t.start();启动线程,并设置 workerStarted = true;

总结流程

  1. 检查线程池状态:确定线程池是否处于运行或可接受新任务的状态,尤其是在 SHUTDOWN 状态时,只接受无任务或队列不为空的情况
  2. 检查和更新线程计数:确保当前线程数未超过线程池容量限制,核心或最大线程数限制。通过原子操作更新工作线程计数,确保线程安全。
  3. 创建新 Worker 对象:用初始任务 firstTask 创建一个新的 Worker,包装了实际的线程。
  4. 添加新 Worker 到线程池:在确保线程池状态允许后,将新 Worker 对象添加到工作线程集合 workers 中,并更新最大池大小记录。
  5. 启动新线程:启动新创建的工作线程,使其开始从任务队列中获取和执行任务。
  6. 错误处理:如果新线程未能成功启动,进行清理操作,确保线程池状态的一致性和正确性。
4. runWorker方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
final void runWorker(Worker w) {
    // 获取当前线程,实际上和Worker持有的线程实例是相同的
    Thread wt = Thread.currentThread();
    // 获取Worker中持有的初始化时传入的任务对象,这里注意存放在临时变量task中
    Runnable task = w.firstTask;
    // 设置Worker中持有的初始化时传入的任务对象为null
    w.firstTask = null;
    // 由于Worker初始化时AQS中state设置为-1,这里要先做一次解锁把state更新为0,允许线程中断
    w.unlock(); // allow interrupts
    // 记录线程是否因为用户异常终结,默认是true
    boolean completedAbruptly = true;
    try {
        // 初始化任务对象不为null,或者从任务队列获取任务不为空(从任务队列获取到的任务会更新到临时变量task中)
        // getTask()由于使用了阻塞队列,这个while循环如果命中后半段会处于阻塞或者超时阻塞状态,getTask()返回为null会导致线程跳出死循环使线程终结
        while (task != null || (task = getTask()) != null) {
            // Worker加锁,本质是AQS获取资源并且尝试CAS更新state由0更变为1
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 如果线程池正在停止(也就是由RUNNING或者SHUTDOWN状态向STOP状态变更),那么要确保当前工作线程是中断状态
            // 否则,要保证当前线程不是中断状态
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 钩子方法,任务执行前
                beforeExecute(wt, task);
                try {
                    task.run();
                    // 钩子方法,任务执行后 - 正常情况
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    // 钩子方法,任务执行后 - 异常情况
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                // 清空task临时变量,这个很重要,否则while会死循环执行同一个task
                task = null;
                // 累加Worker完成的任务数
                w.completedTasks++;
                // Worker解锁,本质是AQS释放资源,设置state为0
                w.unlock();
            }
        }
        // 走到这里说明某一次getTask()返回为null,线程正常退出
        completedAbruptly = false;
    } finally {
        // 处理线程退出,completedAbruptly为true说明由于用户异常导致线程非正常退出
        processWorkerExit(w, completedAbruptly);
    }
}

总结

  1. Worker先执行一次解锁操作,用于解除不可中断状态。
  2. 通过while循环调用getTask()方法从任务队列中获取任务(当然,首轮循环也有可能是外部传入的firstTask任务实例)。
  3. 如果线程池更变为STOP状态,则需要确保工作线程是中断状态并且进行中断处理,否则要保证工作线程必须不是中断状态。
  4. 执行任务实例Runnale#run()方法,任务实例执行之前和之后(包括正常执行完毕和异常执行情况)分别会调用钩子方法beforeExecute()和afterExecute()。
  5. while循环跳出意味着runWorker()方法结束和工作线程生命周期结束(Worker#run()生命周期完结),会调用processWorkerExit()处理工作线程退出的后续工作。
5. getTask()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
private Runnable getTask() {
    // 记录上一次从队列中拉取的时候是否超时
    boolean timedOut = false; // Did the last poll() time out?
    // 注意这是死循环
    for (;;) {
        int c = ctl.get();

        // Check if queue empty only if necessary.
        // 第一个if:如果线程池状态至少为SHUTDOWN,也就是rs >= SHUTDOWN(0),则需要判断两种情况(或逻辑):
        // 1. 线程池状态至少为STOP(1),也就是线程池正在停止,一般是调用了shutdownNow()方法
        // 2. 任务队列为空
        // 如果在线程池至少为SHUTDOWN状态并且满足上面两个条件之一,则工作线程数wc减去1,然后直接返回null
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        // 跑到这里说明线程池还处于RUNNING状态,重新获取一次工作线程数
        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // timed临时变量勇于线程超时控制,决定是否需要通过poll()此带超时的非阻塞方法进行任务队列的任务拉取
        // 1.allowCoreThreadTimeOut默认值为false,如果设置为true,则允许核心线程也能通过poll()方法从任务队列中拉取任务
        // 2.工作线程数大于核心线程数的时候,说明线程池中创建了额外的非核心线程,这些非核心线程一定是通过poll()方法从任务队列中拉取任务
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // 第二个if:
        // 1.wc > maximumPoolSize说明当前的工作线程总数大于maximumPoolSize,说明了通过setMaximumPoolSize()方法减少了线程池容量
        // 或者 2.timed && timedOut说明了线程命中了超时控制并且上一轮循环通过poll()方法从任务队列中拉取任务为null
        // 并且 3. 工作线程总数大于1或者任务队列为空,则通过CAS把线程数减去1,同时返回null,
        // CAS把线程数减去1失败会进入下一轮循环做重试
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 如果timed为true,通过poll()方法做超时拉取,keepAliveTime时间内没有等待到有效的任务,则返回null
            // 如果timed为false,通过take()做阻塞拉取,会阻塞到有下一个有效的任务时候再返回(一般不会是null)
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // 这里很重要,只有非null时候才返回,null的情况下会进入下一轮循环
            if (r != null)
                return r;
            // 跑到这里说明上一次从任务队列中获取到的任务为null,一般是workQueue.poll()方法超时返回null
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

对于这个方法,如果返回了null,会导致runWoker直接退出while循环,执行processWorkerExit方法,一个线程就此销毁。

  • 第一处返回null
    1
    2
    3
    4
    5
    if (runStateAtLeast(c, SHUTDOWN)
        && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
        decrementWorkerCount();
        return null;
    }
    如果线程池状态至少为SHUTDOWN,也就是rs >= SHUTDOWN(0),则需要判断两种情况:
    1. 线程池状态至少为STOP(1),也就是线程池正在停止,一般是调用了shutdownNow()方法,因为这个状态下,线程池不会接受新的任务,并且也不会执行阻塞队列里面的任务。
    2. 任务队列为空,因为进入了 || 的第二处,他只能是SHUTDOWN状态,他不会接收新的任务,但是会继续处理阻塞队列里面的任务
      综上,满足条件后会减少工作线程数量并返回null
  • 第二处返回null
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// timed临时变量勇于线程超时控制,决定是否需要通过poll()此带超时的非阻塞方法进行任务队列的任务拉取
// 1.allowCoreThreadTimeOut默认值为false,如果设置为true,则允许核心线程也能通过poll()方法从任务队列中拉取任务
// 2.工作线程数大于核心线程数的时候,说明线程池中创建了额外的非核心线程,这些非核心线程一定是通过poll()方法从任务队列中拉取任务
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 第二个if:
// 1.wc > maximumPoolSize说明当前的工作线程总数大于maximumPoolSize,说明了通过setMaximumPoolSize()方法减少了线程池容量
// 或者 2.timed && timedOut说明了线程命中了超时控制并且上一轮循环通过poll()方法从任务队列中拉取任务为null
// 并且 3. 工作线程总数大于1或者任务队列为空,则通过CAS把线程数减去1,同时返回null,
// CAS把线程数减去1失败会进入下一轮循环做重试
if ((wc > maximumPoolSize || (timed && timedOut))
    && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c))
        return null;
    continue;
}

首先看timed这个变量,它的值取决于allowCoreThreadTimeOut和 当前线程数是否大于核心线程数,其中allowCoreThreadTimeOut默认为false,所以,一般情况下,核心线程,timedfalse,非核心线程为true。
关于其中的if判断:

  • wc > maximumPoolSize说明当前的工作线程总数大于maximumPoolSize,说明了通过setMaximumPoolSize()方法减少了线程池容量
  • 或者timed && timedOut说明了线程命中了超时控制并且上一轮循环通过poll()方法从任务队列中拉取任务为null
  • 并且工作线程总数大于1或者任务队列为空,则通过CAS把线程数减去1,同时返回null,

这段逻辑大多数情况下是针对非核心线程。在execute()方法中,当线程池总数已经超过了corePoolSize并且还小于maximumPoolSize时,当任务队列已经满了的时候,会通过addWorker(task,false)添加非核心线程。而这里的逻辑恰好类似于addWorker(task,false)的反向操作,用于减少非核心线程,使得工作线程总数趋向于corePoolSize。如果对于非核心线程,上一轮循环获取任务对象为null,这一轮循环很容易满足timed && timedOut为true,这个时候getTask()返回null会导致Worker#runWorker()方法跳出死循环,之后执行processWorkerExit()方法处理后续工作,而该非核心线程对应的Worker则变成“游离对象”,等待被JVM回收。当allowCoreThreadTimeOut设置为true的时候,这里分析的非核心线程的生命周期终结逻辑同时会适用于核心线程。那么可以总结出keepAliveTime的意义:

  • 当允许核心线程超时,也就是allowCoreThreadTimeOut设置为true的时候,此时keepAliveTime表示空闲的工作线程的存活周期。
  • 默认情况下不允许核心线程超时,此时keepAliveTime表示空闲的非核心线程的存活周期

关于timedOut这个变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
try {
           // 如果timed为true,通过poll()方法做超时拉取,keepAliveTime时间内没有等待到有效的任务,则返回null
           // 如果timed为false,通过take()做阻塞拉取,会阻塞到有下一个有效的任务时候再返回(一般不会是null)
           Runnable r = timed ?
               workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
               workQueue.take();
           // 这里很重要,只有非null时候才返回,null的情况下会进入下一轮循环
           if (r != null)
               return r;
           // 跑到这里说明上一次从任务队列中获取到的任务为null,一般是workQueue.poll()方法超时返回null
           timedOut = true;
       } catch (InterruptedException retry) {
           timedOut = false;
       }

可以看到,如果timed为false(核心线程),他调用的是workQueue.take(),这个方法会一直阻塞,直达取到任务,这就是为什么核心线程可以一直存在,而非核心线程则调用的是workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),这个方法也会阻塞,只不过是阻塞keepAliveTime这个时间,如果还是没有取到任务,则返回null,那么timedOut就会被设为true,那么对于非核心线程,就很容易再下次循环到第二处返回null的地方而返回null进而回收线程

6. processWorkerExi()方法

processWorkerExit()方法是为将要终结的Worker做一次清理和数据记录工作(因为processWorkerExit()方法也包裹在runWorker()方法finally代码块中,其实工作线程在执行完processWorkerExit()方法才算真正的终结)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 因为抛出用户异常导致线程终结,直接使工作线程数减1即可
    // 如果没有任何异常抛出的情况下是通过getTask()返回null引导线程正常跳出runWorker()方法的while死循环从而正常终结,这种情况下,在getTask()中已经把线程数减1
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 全局的已完成任务记录数加上此将要终结的Worker中的已完成任务数
        completedTaskCount += w.completedTasks;
        // 工作线程集合中移除此将要终结的Worker
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
     
    // 见下一小节分析,用于根据当前线程池的状态判断是否需要进行线程池terminate处理
    tryTerminate();

    int c = ctl.get();
    // 如果线程池的状态小于STOP,也就是处于RUNNING或者SHUTDOWN状态的前提下:
    // 1.如果线程不是由于抛出用户异常终结,如果允许核心线程超时,则保持线程池中至少存在一个工作线程
    // 2.如果线程由于抛出用户异常终结,或者当前工作线程数,那么直接添加一个新的非核心线程
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // 如果允许核心线程超时,最小值为0,否则为corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 如果最小值为0,同时任务队列不空,则更新最小值为1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 工作线程数大于等于最小值,直接返回不新增非核心线程
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

代码的后面部分区域,会判断线程池的状态,如果线程池是RUNNING或者SHUTDOWN状态的前提下,如果当前的工作线程由于抛出用户异常被终结,那么会新创建一个非核心线程。如果当前的工作线程并不是抛出用户异常被终结(正常情况下的终结),那么会这样处理:

  • allowCoreThreadTimeOut为true,也就是允许核心线程超时的前提下,如果任务队列空,则会通过创建一个非核心线程保持线程池中至少有一个工作线程。
  • allowCoreThreadTimeOut为false,如果工作线程总数大于corePoolSize则直接返回,否则创建一个非核心线程,也就是会趋向于保持线程池中的工作线程数量趋向于corePoolSize
    processWorkerExit()执行完毕之后,意味着该工作线程的生命周期已经完结。

使用submit()时,异常被封装在Future里面,线程继续复用

扩展

优雅关闭线程池

这里我们将线程池注册为spring的Bean,并重写其shutdown方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public void shutdown() {
    if (isShutdown()) {
        return;
    }

    super.shutdown();
    if (this.awaitTerminationMillis <= 0) {
        return;
    }

    log.info("Before shutting down ExecutorService {}", threadPoolId);
    try {
        boolean isTerminated = this.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS);
        if (!isTerminated) {
            log.warn("Timed out while waiting for executor {} to terminate.", threadPoolId);
        } else {
            log.info("ExecutorService {} has been shutdown.", threadPoolId);
        }
    } catch (InterruptedException ex) {
        log.warn("Interrupted while waiting for executor {} to terminate.", threadPoolId);
        Thread.currentThread().interrupt();
    }
}

调用shutdown后,线程池会终止所有空闲的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

tryTerminate() 方法尝试将线程池状态从 SHUTDOWN 转换为 TERMINATED

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // 检查线程池状态,不符合终止条件则直接返回
        if (isRunning(c) || 
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
            return;
        
        // 如果工作线程数不为0,则中断一个空闲线程并返回
        if (workerCountOf(c) != 0) {
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        
        // 获取主锁,准备终止线程池
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // CAS操作将线程池状态设置为TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated(); // 调用终止钩子方法
                } finally {
                    // 将线程池状态设置为TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll(); // 唤醒所有等待线程池终止的线程
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // 如果CAS失败,则重试
    }
}

大多数情况下,tryTerminate会在前两处判断条件中直接return,所以可以在调用shutdown方法后,调用awaitTermination方法等待线程池执行完最后一批任务。

同时通过@Bean注解将线程池注册为Bean,spring会进行自动销毁方法推断。当 Spring 容器关闭时,它会自动查找 Bean 中名为 closeshutdownstop 等的方法,并在 Bean 销毁时自动调用。

动态修改线程池参数

对于核心线程数和最大线程数,线程池已经提供了api,如果想要动态修改组赛队列的容量只有手动实现。

用**LinkedBlockingQueue** 举例直接修改容量大小。

  1. 队列已满修改容量无效

    当队列已满,调用线程正阻塞在 put() 方法上等待空位:

    1
    2
    3
    while (count.get() == capacity) {
        notFull.await();  // 阻塞在这里
    }

    capacity 虽然被改大了,但线程已经卡在了 await() 上,如果没有人手动调用 signalNotFull(),它永远不会被唤醒。

  2. 容量变小后无法阻塞

    假设当前队列已存入 8 个元素,容量为 10。我们通过反射将容量缩小为 5,期望此后入队操作会被阻塞。

    但你会发现元素还是成功入队。这是因为队列当前 count.get() = 8,我们虽然把 capacity 改成了 5,但 JDK 的 put() 判断仍然是:

    1
    2
    3
    while (count.get() == capacity) {
        notFull.await();
    }

    此时 count.get() == 8capacity == 5,不相等,所以不会阻塞,直接 bypass

所以可以这样做:

  1. setCapacity() 中自动唤醒等待线程

    1
    2
    3
    4
    5
    6
    7
    8
    public void setCapacity(int capacity) {
        final int oldCapacity = this.capacity;
        this.capacity = capacity;
        final int size = count.get();
        if (capacity > size && size >= oldCapacity) {
            signalNotFull();
        }
    }

    capacity > size :新容量大于当前队列大小,确保队列现在有可用空间。如果新容量仍然小于或等于当前队列大小,那么队列实际上仍然是满的,没有空间可以接纳新元素,因此不需要通知等待的生产者线程。

    size >= oldCapacity :当前队列大小大于等于旧容量,确保队列之前是满的或接近满的。如果队列之前就有空间(size < oldCapacity),那么等待的生产者线程应该已经被通知过,不需要再次通知。

  2. put() 使用 >= 判断

    1
    2
    3
    while (count.get() >= capacity) {
        notFull.await();
    }

    使用 >= 是为了应对动态调整容量时的边界情况。如果在 put 操作期间,capacity 被动态缩小到小于当前 count(例如,队列有 1000 个元素,容量缩小到 500),count >= capacity 确保线程继续阻塞,直到队列元素数减少到新容量以下。

快速消费

重写线程池execute方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override
 public void execute(Runnable command) {
     if (command == null) {
         throw new NullPointerException();
     }
     
     int c = ctl.get();
     // 如果当前线程数小于核心线程数,创建核心线程
     if (workerCountOf(c) < corePoolSize) {
         if (addWorker(command, true)) {
             return;
         }
         c = ctl.get();
     }
     
     // 如果当前线程数小于最大线程数,优先创建非核心线程
     if (workerCountOf(c) < maximumPoolSize) {
         if (addWorker(command, false)) {
             return;
         }
         c = ctl.get();
     }
     
     // 如果线程数已达到最大值,尝试入队
     if (isRunning(c) && workQueue.offer(command)) {
         int recheck = ctl.get();
         if (!isRunning(recheck) && remove(command)) {
             reject(command);
         }
     } else if (!addWorker(command, false)) {
         reject(command);
     }
 }