ScheduledThreadPoolExecutor 源码浅析

ScheduledThreadPoolExecutor

该线层池其实使用的非常多了,基本遇到定时任务时,都会使用到此线程池来实现该需求,那么,这个线程池的实现是怎么样的呢?

继承关系

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor类以及实现了ScheduledExecutorService接口

初始化
1
2
3
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
}

该构造函数最终会直接调用父类的构造函数进行初始化,这里可以看到,这里线程池的最大线程数是Integer.MAX_VALUE的,以及设置了相应的其他参数,以及任务队列DelayedWorkQueue,该队列是实现定时任务的重点

如何实现按时间进行任务调度

ScheduledFutureTask

ScheduledThreadPoolExecutor中,定时任务被装饰成为ScheduledFutureTask对象,通过直接查看定时任务发布的代码可以看到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}

实际的任务command被装饰为ScheduledFutureTask,如果该任务需要延迟执行,则由函数triggerTime(initialDelay, unit)来实现,并再次被装饰为RunnableScheduledFuture(这代码实际就是返回sft对象),然后进行周期任务的发布delayedExecute(RunnableScheduledFuture)

现在来看看这个ScheduledFutuerTask对象的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {

/** Sequence number to break ties FIFO */
private final long sequenceNumber;

/** The time the task is enabled to execute in nanoTime units */
private long time;

/**
* Period in nanoseconds for repeating tasks. A positive
* value indicates fixed-rate execution. A negative value
* indicates fixed-delay execution. A value of 0 indicates a
* non-repeating task.
*/
private final long period;

/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V> outerTask = this;

/**
* Index into delay queue, to support faster cancellation.
* 因为堆实际就是一个完全二叉树,因此完全可以用数组来实现,因此这个heapIndex我在一开始的时候
* 认为它就是元素在数组的下标
*/
int heapIndex;

/**
* Creates a one-shot action with given nanoTime-based trigger time.
*/
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}

/**
* Creates a periodic action with given nano time and period.
*/
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}

/**
* Creates a one-shot action with given nanoTime-based trigger time.
*/
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}

public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}

// 优先队列需要比较元素的优先关系(即大小),因此是定时任务,肯定是最近要执行的任务优先级越大
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

/**
* Returns {@code true} if this is a periodic (not a one-shot) action.
*
* @return {@code true} if periodic
*/
public boolean isPeriodic() {
return period != 0;
}

/**
* Sets the next time to run for a periodic task.
*/
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}

public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}

/**
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
public void run() {
// 是否只执行一次
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
// 如果是 one-shot任务,则直接运行
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
// 计算下次任务的执行时间
setNextRunTime();
// 重新压回任务队列,等待下次调度
reExecutePeriodic(outerTask);
}
}
}

因此,所有的任务被ScheduledFutureTask所装饰后,就带有了任务调度的时间信息以及任务的优先级

核心——任务延迟队列DelayedWorkQueue

DelayedWorkQueue是实现任务周期或者到期执行的关键实现,其本质是一个数组实现的无界的优先队列,并且带有阻塞功能(实现了BlockingQueue接口),因此之前前面的线程池初始化时,最大线程数设置为Integer.MAX_VALUE其实就没有用了,因此无界队列,永远不会出现任务进不了队列需要创建核心线层之外的线程进行任务的执行,因此ScheduledThreadPoolExecutor其实就是一个定长的线程池。

这里就给出DelayedWorkQueue的几个函数操作吧

获取一个任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 直接获取队列的首元素(到达被调度时间的任务)
RunnableScheduledFuture<?> first = queue[0];
// 首元素为空,则队列为空,需要等待队列中有元素
if (first == null)
available.await();
else {
// 获取该任务的任务延迟时间
long delay = first.getDelay(NANOSECONDS);
// 已经到达时间了
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}

压入一个任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
线程的Leader-Follower模式

从上面的代码看到了一个有意思的地方

1
2
Thread thisThread = Thread.currentThread();
leader = thisThread;

这里出现了一个leader,直接查看关于Thread leader的官方注解

Thread designated to wait for the task at the head of the
queue. This variant of the Leader-Follower pattern
(http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
minimize unnecessary timed waiting. When a thread becomes
the leader, it waits only for the next delay to elapse, but
other threads await indefinitely. The leader thread must
signal some other thread before returning from take() or
poll(…), unless some other thread becomes leader in the
interim. Whenever the head of the queue is replaced with a
task with an earlier expiration time, the leader field is
invalidated by being reset to null, and some waiting
thread, but not necessarily the current leader, is
signalled. So waiting threads must be prepared to acquire
and lose leadership while waiting.

可知,这是一种称为Leader-Follower的线程模式,该模式的优点就是减少了线程上下文的切换以及数据的拷贝。而DelayedWorkQueue就采用了这种模式来优化,再次来分析take()源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 直接获取队列的首元素(到达被调度时间的任务)
RunnableScheduledFuture<?> first = queue[0];
// 首元素为空,则队列为空,需要等待队列中有元素
if (first == null)
available.await();
else {
// 获取该任务的任务延迟时间
long delay = first.getDelay(NANOSECONDS);
// 已经到达时间了
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
// 是否存在 leader 线程,如果存在,则直接等待,
if (leader != null)
available.await();
else {
// 不存在,则设置自己为 Leader 线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
// 自己是否成功成为了 leader 线程,如果成功,则将自己的 leader 属性置位 null
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}

因此,除了leader线程以外,以及正在处理任务的线层,其余的follower线程均处于等待状态,因此从成为leader线层开始,数据的监听到操作都由一个线程来完成,避免了频繁的线程上下文切换的开销