线程池ThreadPoolExecutor源码阅读

ThreadPoolExecutor数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//7个核心参数:
// 核心线程数
private volatile int corePoolSize;
// 最大线程数
private volatile int maximumPoolSize;
// 线程工厂
private volatile ThreadFactory threadFactory;
// 等待队列
private final BlockingQueue<Runnable> workQueue;
// 拒绝策略
private volatile RejectedExecutionHandler handler;
// 空闲线程最大存活时间 + 时间单位
private volatile long keepAliveTime; //TimeUnit unit


// 实际上最大线程量,不是Integer.MAX+VALUE而是2^29-1
private static final int CAPACITY = (1 << 29) - 1;

// ctl用来统计当前线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 线程池子,操作的时候需要加锁,mainLock
private final HashSet<Worker> workers = new HashSet<Worker>();
private final ReentrantLock mainLock = new ReentrantLock();

// 当前池子中存活线程数,addWorker成功时会统计largestPoolSize
private int largestPoolSize;

//false(default):空闲时核心线程依旧运行。true:超时后销毁
private volatile boolean allowCoreThreadTimeOut;


//线程池的状态
//RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
private static final int RUNNING = -1 << COUNT_BITS;
//SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
private static final int SHUTDOWN = 0 << COUNT_BITS;
//STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
private static final int STOP = 1 << COUNT_BITS;
//当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。
//当线程池变为TIDYING状态时,会执行钩子函数terminated()。
//terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
private static final int TIDYING = 2 << COUNT_BITS;
//线程池彻底终止,就变成TERMINATED状态。
private static final int TERMINATED = 3 << COUNT_BITS;

线程池状态切换

启动流程

从execute()开始分析,线程池的启动流程。非核心代码省略(比如权限校验等)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
public void execute(Runnable command) {
// ctl的值代表当前线程池中的线程数
int c = ctl.get();
// 如果当前线程数 < 核心线程数,直接创建新线程addWorker执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果线程池正常,进行任务入队等待
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果线程池不处于运行态,则从队列中移除然后执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果入队失败(已满)则判断是否<maximumSize,小于则创建新的线程执行任务,否则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}


//包装任务并创建线程执行任务
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 检查线程池状态、任务是否为空、任务等待队列是否为空等
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
//自旋检查wc
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}


addWoker()://mainLock.lock。
//把任务Runnable包成Worker,new Worker时会使用线程工厂创建新的线程 this.thread = getThreadFactory().newThread(this);
w = new Worker(firstTask);
//加入线程池中
workers.add(w);
// 启动线程
w.thread.start()//实际上会调用worker的runWorker方法
//线程启动失败会从池子中删除线程,并ctl--,mainLock.lock。
addWorkerFailed(w);



Worker://Worker继承了AQS,实现了Runnable接口
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}

runWorker:
// 线程启动后会循环获取任务,并执行
while (task != null || (task = getTask()) != null) {
//默认空,可重写
beforeExecute(wt, task);
//执行任务
task.run();
//默认空,可重写,一般用来监控当前线程池状态
afterExecute(task, thrown);
}
//如果取不到任务,则退出线程。
//统计线程完成的任务数,completedTaskCount += w.completedTasks;
//从线程池中移除线程,会加锁。workers.remove(w);
processWorkerExit();


getTask():
for(;;){
//判断是设置了允许核心线程超时停止
//wc > 核心线程数说明不应该处理任务
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 返回null,线程获取不到任务,会结束
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 从workQueue获取队头任务
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}