ScheduledThreadPoolExecutor

ScheduledExecutorService

简单看下其继承关系的类图如下:

Executor

Executor定义了一个线程池的最核心的操作execute,帮助我们将任务提交和任务具体执行、调度进行解耦。

1
void execute(Runnable command);

通过Executor而不是现实的创建线程(new Thread(RunnableTask()).start()):
1
2
3
4
Executor executor = anExecutor();
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...

ExecutorService

ExecutorService提供方法来管理线程池的关闭、提供Future来追踪一个或多个异步任务的执行进度。ExecutorService可以关闭拒绝新来的任务,其提供了两种关闭方式:

  • shutdown() :关闭前允许之前提交的任务先执行完
  • shutdownNow():阻止等待的任务并尝试停止正在执行的任务
    submit()方法通过execute()实现并返回一个Future,可取消任务的执行或等待任务执行完成。
    invokeAnyinvokeAll方法用于批量执行任务,等待至少一个或者全部任务执行完成。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
    throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
    long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

ScheduledExecutorService

scheduledExecutorService在ExecutorService的基础上加入了定时任务调度的特性,可以调度任务在给定延时执行或在固定周期内执行。
schedule()可创建不同延迟的任务,并返回一个可取消或校验执行的ScheduledFuture<?>对象。
scheduledAtFixedRate()scheduledWithFixedDelay()方法创建周期性任务直达任务取消。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
/**
* initialDelay初始延迟过后 周期性的执行任务
* 如果任务执行时间过长超过执行周期,后来的任务将延迟执行
* 不会并发执行
**/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
/**
* 同样在initialDelay延迟后,开始周期性执行任务
* 按照前一个任务结束到后一个任务开始的 固定时间间隔执行
**/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor是一个可以设置固定延迟或周期性执行任务的ThreadPoolExecutor。相对于Timer来说,当需要多个工作线程,或需要更加灵活 或需要ThreadPoolExecutor相关功能,ScheduledThreadPoolExecutor会更适合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
//ScheduledThreadPoolExecutor
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}

public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}

private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
//任务先加入DelayedWorkQueue中 通过最小堆排序 确定其执行的时机
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
//ThreadPoolExecutor
/**
* 保证线程池即使初始化corePoolSize为0时 也能有线程可供执行任务
*/
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//任务封装成worker对象 并执行
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {//任务添加成功则执行任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

//ThreadPoolExecutor$Worker
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//关键点: 判断Worker初始化任务是否为null 如果第一次初始化任务完成 则通过getTask方法尝试从BlockingQueue中拉取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//Worker包裹的实际任务(FutureTask/ScheduledFutureTask)执行
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
//从BlockingQueue中拉取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//ScheduledThreadPoolExecutor$ScheduledFutureTask
/**
* 重写了FutureTask的run方法 runAndReset实现周期性任务执行
*/
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
}

ScheduledThreadPoolExecutor相对于ThreadPoolExecutor,通过将Runnable任务封装为ScheduledFutureTask,内部使用的BlockingQueue为自定义的DelayedWorkQueue来完成固定延迟或周期性调度任务,ScheduledFutureTask其类图如下:

DelayedWorkQueue其源码类似于PriorityQueue,核心使用了最小堆来维护数据,加入删除数据时使用siftupsiftDown操作来保证最小堆的正确性。
ScheduledThreadPoolExecutor介于此,只需检查最小堆顶部的任务是否达到执行条件即可 故而使用ScheduledThreadPoolExecutor来执行的定时任务 并不是那么准确

坚持原创技术分享,您的支持将鼓励我继续创作!