jdk 源码系列之 TheadPoolExecutor

Posted by Sinsy on March 14, 2021 About 25 k words and Need 70 min

jdk 源码系列之ThreadPoolExecutor

前言

最近在做消息系统重构,许多代码耦合在业务层,不利后续需求的更迭。经过讨论,将日志、消息、邮件抽离出来,放在切面层里面进行异步处理。

在使用的过程中,产生了不少疑问。

  • 为何 worker 是线程安全的?
  • 私有参数的作用是什么?比如(workQueuemaximumPoolSize 等)

以下源码都是基于 JDK 11 的阅读

了解和使用

首先了解下,线程池存在的意义。

1
2
3
4
5
6
7
Thread pools address two different problems: they usually
provide improved performance when executing large numbers of
asynchronous tasks, due to reduced per-task invocation overhead,
and they provide a means of bounding and managing the resources,
including threads, consumed when executing a collection of tasks.
Each ThreadPoolExecutor also maintains some basic statistics, 
such as the number of completed tasks. 

线程池解决了两个问题。

  1. 减少了每个任务的资源开销,换句话说,池的存在就是为了复用资源(例如:CPU数量)。
  2. 方便管理每个任务的基本情况。(这个应该说的是线程名字、线程数量、任务数量等,// 推测)

直接 new 线程池的 ThreadPoolExecutor。需要以下参数。

参数 是否必须 类型 含义
corePoolSize int 核心线程数
maximumPoolSize int 最大线程数
keepAliveTime long 存活时间
unit TimeUnit 存活时间的时间单位
workQueue BlockingQueue <Runnable> 工作队列
threadFactory ThreadFactory 线程工厂
handler RejectedExecutionHandler 拒绝策略


corePoolSize

空闲状态的线程池中保留的线程数量。如果设置了 allowCoreThreadTimeOut 这个属性,也会收到 keepAliveTime 的影响。

maximumPoolSize

允许创建的最大线程数。

keepAliveTime

核心线程数以外的线程最长等待任务的时间。

unit

keepAliveTime 参数的时间的时间单位。

workQueue

工作队列,用于保存任务、提交任务的队列。

threadFactory

用于创建新线程的工厂。默认创建 DefaultThreadFactory 类。

handler

达到最大的线程数、或者是放不进工作队列时,所触发阻止的策略。默认拒绝策略 AbortPolicy

实例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public class Test {

    public static void main(String[] args) {


        int corePoolSize = 16;

        int maximumPoolSize = 24;

        long keepAliveTime = 10L;

        TimeUnit timeUnit = TimeUnit.SECONDS;

        BlockingQueue<Runnable> work = new LinkedBlockingQueue<>(16);

        String prefix = "test";
        boolean daemon = true;
        ThreadFactory factory = new MyThreadFactory(prefix, daemon);

        RejectedExecutionHandler handler =
                new ThreadPoolExecutor.CallerRunsPolicy();

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                timeUnit,
                work,
                factory,
                handler
        );

        for (int i = 0; i < 10; i++) {
            MyTask myTask = new MyTask();
            threadPoolExecutor.execute(myTask);
        }

        threadPoolExecutor.shutdown();


    }
}


public class MyTask implements Runnable{

    @Override
    public void run() {
        System.err.println(System.currentTimeMillis());
    }
}

public class MyThreadFactory implements ThreadFactory {

    private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    private final boolean daemonThread;

    public MyThreadFactory(String namePrefix, boolean daemonThread) {

        if (namePrefix == null || "".equals(namePrefix)) {
            this.namePrefix = namePrefix + "-thread-" + POOL_NUMBER;
        } else {
            this.namePrefix = "";
        }

        this.daemonThread = daemonThread;

        SecurityManager s = System.getSecurityManager();
        group = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
    }

    @Override
    public Thread newThread(Runnable r) {

        String name = namePrefix + threadNumber.getAndIncrement();

        Thread t = new Thread(group, r, name, 0);
        t.setDaemon(daemonThread);
        return t;
    }

}

类似 ThreadFactory 的实例,其实可以看下 ThreadPoolExecutor 类里面的实例,参考下,了解大概的写法。

小结

对于 corePoolSize、maximumPoolSize 两个参数而言,我们说任务分 CPU 密集型、IO 密集型两种。CPU 密集型可能只有少量长 CPU 执行,IO 密集型则通常具有大量短 CPU 执行。

但是由于业务是具有多样性,今天配置参数,可能只适用于今天,哪天流量起来,就可能触发拒绝策略,而从导致后续的业务都失效了。目前市面上统一或者很好的方案,不过,这几个参数可以设置到后台配置,每次看实际效果,动态调整 corePoolSize、maximumPoolSize、wokrQueue 等参数。

源码

首先看下 ThreadPoolExecutor 继承关系

ThreadPoolExecutor.png

间接继承了 Executor、ExecutorService 两个接口类,直接继承了 AbstractExecutorService。

顶级父类 Executor 只抽象了一个 execute 的执行方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

用于执行新线程的一个抽象方法。

ExecutorService 接口类则更多的是定义执行方法的行为。

比如 shutdown 关闭、awaitTermination 等待终止、submit 提交等,赋予了 Executor 更多的功能,不过这只是做了抽象、都未作实现。

来到 AbstractExecutorService 这个类,实现了部分功能(submit 提交),同时继续加强 Executor 类。

ThreadPoolExecutor 的构造函数

创建一个实例出来

1
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor();

构造方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    //  默认线程工厂
    private static class DefaultThreadFactory implements ThreadFactory {
        // 线程池的编号,一个池底下可以有多个线程
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        // 线程组
        private final ThreadGroup group;

        // 线程编号
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        // 线程名前缀
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
        // 创建线程
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            // 是否是守护线程,默认非守护线程
            if (t.isDaemon())
                t.setDaemon(false);
            // 优先级别,数字越大优先度越高,默认正常级别
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

    // 默认拒绝策略
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
         // 
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), handler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        // 参数不允许设置小于0 或则是 最大线程数小于核心线程数
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        // 参数不允许设置 null
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        // 赋值到全局私有变量
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

4个必传参数,corePoolSize maximumPoolSize workQueue keepAliveTime,同时 RejectedExecutionHandler 是 AbortPolicy 策略模式。

顺路提一下,线程池拒绝策略有 4 种。 ThreadPoolPolicy

AbortPolicy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
    /**
     * A handler for rejected tasks that throws a
     * {@link RejectedExecutionException}.
     *
     * This is the default handler for {@link ThreadPoolExecutor} and
     * {@link ScheduledThreadPoolExecutor}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

当前任务无法进入工作队列,触发拒绝策略,直接抛出异常。

CallerRunsPolicy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 是否关闭
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

只要不关闭,就不会丢弃任务。

DiscardOldestPolicy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 不是关闭状态
            if (!e.isShutdown()) {
                // 移除队列头部
                e.getQueue().poll();
                // 执行当前任务
                e.execute(r);
            }
        }
    }

丢弃队列最前面的任务,然后执行当前被线程池拒绝的任务。

DiscardPolicy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

什么都不做,既不抛错、也不操作工作队列,静静的丢弃任务。

小结

总结下这 4个线程池拒绝策略。

策略 作用 个人思考以及建议
AbortPolicy 不能提交任务后,抛出异常,并丢失任务 比较重要的业务使用这个,可以快速定位到那个任务丢失
CallerRunsPolicy 只要不关闭,就不会丢弃任务 必须要让所有任务都执行的业务,可以选择这个
DiscardOldestPolicy 丢弃队列最前面的任务,然后执行当前被线程池拒绝的任务 使用这个,必须要抛弃之前的任务,感觉有点鸡肋,慎重使用
DiscardPolicy 静静的丢弃任务 不是重要的计算业务,可以考虑这个

执行过程

在上面的 实例 ,执行的方法是 execute 方法。接下来我们解析剖析这个函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@link RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        // 任务没null,直接抛出空指针异常
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
         // 这里应该是将工作队列任务数量设置到高位
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            // 添加进工作队列,true 则使用核心线程来绑定
            // 这里貌似是直接分配任务下去执行了,直接使用核心线程数
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        // 是运行状态,同时将任务,添加至任务队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 不是运行状态,移除任务
            if (! isRunning(recheck) && remove(command))
                // 触发线程池运行策略
                reject(command);
            // 其他任务都执行完毕
            else if (workerCountOf(recheck) == 0)
                // 增加执行线程的工作对象
                // 不是首次执行任务,使用最大线程数
                addWorker(null, false);
        }
        // 无法添加工作线程对象,触发线程池拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }


    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
     // 添加工作任务
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // 这里的 C 应该是 bit 高位的运行状态
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            // 检查是否是结束、或者是停止状态,必须是运行状态
            // 首次的任务不是 null、且队列是 null,不需要添加进工作队列
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                // 添加失败
                return false;


            for (;;) {
                // core 等于 true 的时候 
                // 比较的是 corePoolSize,是否超过工作数量
                // 反之,比较的 maximumPoolSize
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    // 超过无法添加
                    return false;
                // 增加工作数量
                if (compareAndIncrementWorkerCount(c))
                    // 增加成功后,结束整个循环
                    break retry;
                // 重新获取增加一后的工作数量
                c = ctl.get();  // Re-read ctl
                // 检查是否处于关闭状态
                if (runStateAtLeast(c, SHUTDOWN))
                    // 没有处于关闭状态、跳过循环
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 初始化第一个工作任务
            w = new Worker(firstTask);
            final Thread t = w.thread;
            // 开始执行程序
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 使用 ReentrantLock 来加锁
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    // 这里依然判断是否是运行状态
                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        // 判断线程是否正在处于活跃状态
                        if (t.isAlive()) // precheck that t is startable
                            // 是活跃状态,这说明有其他任务正在利用这个线程
                            // 所以直接抛出错误
                            throw new IllegalThreadStateException();
                        // 添加进 workers,这个任务执行者
                        workers.add(w);
                        int s = workers.size();
                        // 设置当前最大线程数
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // 执行工作添加成功
                        workerAdded = true;
                    }
                } finally {
                    // 无论有没有成功,都要释放锁,避免产生死锁
                    mainLock.unlock();
                }
                // 执行工作添加成功
                if (workerAdded) {
                    // 线程启动
                    t.start();
                    // 执行工作正式工作状态
                    workerStarted = true;
                }
            }
        } finally {
            // 可能由于抛出导致 workerStarted 还是为 false
            // 则重新回滚在试试
            if (! workerStarted)
                addWorkerFailed(w);
        }
        // 执行工作线程状态
        return workerStarted;
    }

    
    /**
     * Rolls back the worker thread creation.
     * - removes worker from workers, if present
     * - decrements worker count
     * - rechecks for termination, in case the existence of this
     *   worker was holding up termination
     */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        // 加锁
        mainLock.lock();
        try {
            if (w != null)
                // 移除执行执行工作
                workers.remove(w);
            // 减一操作
            decrementWorkerCount();
            // 尝试进行中断
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

    
    /**
     * Transitions to TERMINATED state if either (SHUTDOWN and pool
     * and queue empty) or (STOP and pool empty).  If otherwise
     * eligible to terminate but workerCount is nonzero, interrupts an
     * idle worker to ensure that shutdown signals propagate. This
     * method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown. The method is non-private to
     * allow access from ScheduledThreadPoolExecutor.
     */
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
                return;
            // 工作线程数不等于 0
            if (workerCountOf(c) != 0) { // Eligible to terminate
                // 打断工作线程
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 设置当前生命周期状态
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 终止
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 唤醒所有线程
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

线程池的原理就是,通过一个叫 workers 的 Hashset 添加所有工作线程,且不能大于最大线程数 maximumPoolSize 。这个 worker 的对象保存着线程对象,以及任务对象。

执行的时候,直接分配下去,当超过核心线程数 corePoolSize 的时候,任务则会去到工作队列 workQueue 等待空余的线程调用任务,否则直接执行。。

如果超过最大线程数 maximumPoolSize 或者线程池的生命周期不是处于 RUNNING,都会触线程池的拒绝策略。

另外添加进 workers 的这个 Hashset 都是加上了 ReentrantLock 的所有,这意味着在获取子线程的操作都是线程安全的,当前只有一个对象获取到这个线程。如果有其他任务使用到了这个线程,直接抛出。不在执行任何操作。

总结

总结以下,看了 ThreadPoolExecutor 的体会。

  1. ThreadPoolExecutor 线程安全类。当前的线程必须是没有执行过任何任务,另外 ThreadPoolExecutor 类在请求子线程的都加了锁,其他任务无法在请求到这个线程。线程的局部变量只有该当前任务才能使用到,所以线程安全。
  2. corePoolSize 是保留线程的数量,如果设置过高,很容易闲置了线程资源或者说是限制了 CPU 资源。要具体看情况设置。
  3. 线程池的拒绝策略,DiscardOldestPolicy 要慎重使用。
  4. 在使用线程池的时候,不需要考虑使用的容器是否是线程安全了、或者这个类线程安不安全。

声明

作者: Sinsy
本文链接:https://blog.sincehub.cn/2021/03/14/TheadPoolExecutor/
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文声明。 如您有任何商业合作或者授权方面的协商,请给我留言:550569627@qq.com

引用