成员属性
成员属性
成员变量
- shutdown 后是否继续执行周期任务:
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
- shutdown 后是否继续执行延迟任务:
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
- 取消方法是否将该任务从队列中移除:
// 默认 false,不移除,等到线程拿到任务之后抛弃
private volatile boolean removeOnCancel = false;
- 任务的序列号,可以用来比较优先级:
private static final AtomicLong sequencer = new AtomicLong();
延迟任务
ScheduledFutureTask 继承 FutureTask,实现 RunnableScheduledFuture 接口,具有延迟执行的特点,覆盖 FutureTask 的 run 方法来实现对延时执行、周期执行的支持。对于延时任务调用 FutureTask#run,而对于周期性任务则调用 FutureTask#runAndReset 并且在成功之后根据 fixed-delay/fixed-rate 模式来设置下次执行时间并重新将任务塞到工作队列
在调度线程池中无论是 runnable 还是 callable,无论是否需要延迟和定时,所有的任务都会被封装成 ScheduledFutureTask
成员变量:
- 任务序列号:
private final long sequenceNumber;
- 执行时间:
private long time; // 任务可以被执行的时间,交付时间,以纳秒表示
private final long period; // 0 表示非周期任务,正数表示 fixed-rate 模式的周期,负数表示 fixed-delay 模式
fixed-rate:两次开始启动的间隔,fixed-delay:一次执行结束到下一次开始启动
- 实际的任务对象:
RunnableScheduledFuture\<V\> outerTask = this;
- 任务在队列数组中的索引下标:
// DelayedWorkQueue 底层使用的数据结构是最小堆,记录当前任务在堆中的索引,-1 代表删除
int heapIndex;
成员方法:
- 构造方法:
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
// 任务的触发时间
this.time = ns;
// 任务的周期,多长时间执行一次
this.period = period;
// 任务的序号
this.sequenceNumber = sequencer.getAndIncrement();
}
- compareTo():ScheduledFutureTask 根据执行时间 time 正序排列,如果执行时间相同,在按照序列号 sequenceNumber 正序排列,任务需要放入 DelayedWorkQueue,延迟队列中使用该方法按照从小到大进行排序
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
// 类型强转
ScheduledFutureTask\<?\> x = (ScheduledFutureTask\<?\>)other;
// 比较者 - 被比较者的执行时间
long diff = time - x.time;
// 比较者先执行
if (diff < 0)
return -1;
// 被比较者先执行
else if (diff > 0)
return 1;
// 比较者的序列号小
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
// 不是 ScheduledFutureTask 类型时,根据延迟时间排序
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
- run():执行任务,非周期任务直接完成直接结束,周期任务执行完后会设置下一次的执行时间,重新放入线程池的阻塞队列,如果线程池中的线程数量少于核心线程,就会添加 Worker 开启新线程
public void run() {
// 是否周期性,就是判断 period 是否为 0
boolean periodic = isPeriodic();
// 根据是否是周期任务检查当前状态能否执行任务,不能执行就取消任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 非周期任务,直接调用 FutureTask#run 执行
else if (!periodic)
ScheduledFutureTask.super.run();
// 周期任务的执行,返回 true 表示执行成功
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置周期任务的下一次执行时间
setNextRunTime();
// 任务的下一次执行安排,如果当前线程池状态可以执行周期任务,加入队列,并开启新线程
reExecutePeriodic(outerTask);
}
}
周期任务正常完成后任务的状态不会变化,依旧是 NEW,不会设置 outcome 属性。但是如果本次任务执行出现异常,会进入 setException 方法将任务状态置为异常,把异常保存在 outcome 中,方法返回 false,后续的该任务将不会再周期的执行
protected boolean runAndReset() {
// 任务不是新建的状态了,或者被别的线程执行了,直接返回 false
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable\<V\> c = callable;
if (c != null && s == NEW) {
try {
// 执行方法,没有返回值
c.call();
ran = true;
} catch (Throwable ex) {
// 出现异常,把任务设置为异常状态,唤醒所有的 get 阻塞线程
setException(ex);
}
}
} finally {
// 执行完成把执行线程引用置为 null
runner = null;
s = state;
// 如果线程被中断进行中断处理
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
// 如果正常执行,返回 true,并且任务状态没有被取消
return ran && s == NEW;
}
// 任务下一次的触发时间
private void setNextRunTime() {
long p = period;
if (p > 0)
// fixed-rate 模式,【时间设置为上一次执行任务的时间 + p】,两次任务执行的时间差
time += p;
else
// fixed-delay 模式,下一次执行时间是【当前这次任务结束的时间(就是现在) + delay 值】
time = triggerTime(-p);
}
- reExecutePeriodic():准备任务的下一次执行,重新放入阻塞任务队列
// ScheduledThreadPoolExecutor#reExecutePeriodic
void reExecutePeriodic(RunnableScheduledFuture\<?\> task) {
if (canRunInCurrentRunState(true)) {
// 【放入任务队列】
super.getQueue().add(task);
// 如果提交完任务之后,线程池状态变为了 shutdown 状态,需要再次检查是否可以执行,
// 如果不能执行且任务还在队列中未被取走,则取消任务
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 当前线程池状态可以执行周期任务,加入队列,并【根据线程数量是否大于核心线程数确定是否开启新线程】
ensurePrestart();
}
}
- cancel():取消任务
public boolean cancel(boolean mayInterruptIfRunning) {
// 调用父类 FutureTask#cancel 来取消任务
boolean cancelled = super.cancel(mayInterruptIfRunning);
// removeOnCancel 用于控制任务取消后是否应该从阻塞队列中移除
if (cancelled && removeOnCancel && heapIndex >= 0)
// 从等待队列中删除该任务,并调用 tryTerminate() 判断是否需要停止线程池
remove(this);
return cancelled;
}
延迟队列
DelayedWorkQueue 是支持延时获取元素的阻塞队列,内部采用优先队列 PriorityQueue(小根堆、满二叉树)存储元素
其他阻塞队列存储节点的数据结构大都是链表,延迟队列是数组,所以延迟队列出队头元素后需要让其他元素(尾)替换到头节点,防止空指针异常
成员变量:
- 容量:
private static final int INITIAL_CAPACITY = 16; // 初始容量
private int size = 0; // 节点数量
private RunnableScheduledFuture\<?\>[] queue =
new RunnableScheduledFuture\<?\>[INITIAL_CAPACITY]; // 存放节点
- 锁:
private final ReentrantLock lock = new ReentrantLock(); // 控制并发
private final Condition available = lock.newCondition();// 条件队列
阻塞等待头节点的线程:线程池内的某个线程去 take() 获取任务时,如果延迟队列顶层节点不为 null(队列内有任务),但是节点任务还不到触发时间,线程就去检查队列的 leader字段是否被占用
如果未被占用,则当前线程占用该字段,然后当前线程到 available 条件队列指定超时时间
堆顶任务.time - now()挂起如果被占用,当前线程直接到 available 条件队列不指定超时时间的挂起
// leader 在 available 条件队列内是首元素,它超时之后会醒过来,然后再次将堆顶元素获取走,获取走之后,take()结束之前,会调用是 available.signal() 唤醒下一个条件队列内的等待者,然后释放 lock,下一个等待者被唤醒后去到 AQS 队列,做 acquireQueue(node) 逻辑
private Thread leader = null;
成员方法
- offer():插入节点
public boolean offer(Runnable x) {
// 判空
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture\<?\> e = (RunnableScheduledFuture\<?\>)x;
// 队列锁,增加删除数据时都要加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 队列数量大于存放节点的数组长度,需要扩容
if (i >= queue.length)
// 扩容为原来长度的 1.5 倍
grow();
size = i + 1;
// 当前是第一个要插入的节点
if (i == 0) {
queue[0] = e;
// 修改 ScheduledFutureTask 的 heapIndex 属性,表示该对象在队列里的下标
setIndex(e, 0);
} else {
// 向上调整元素的位置,并更新 heapIndex
siftUp(i, e);
}
// 情况1:当前任务是第一个加入到 queue 内的任务,所以在当前任务加入到 queue 之前,take() 线程会直接
// 到 available 队列不设置超时的挂起,并不会去占用 leader 字段,这时需会唤醒一个线程 让它去消费
// 情况2:当前任务【优先级最高】,原堆顶任务可能还未到触发时间,leader 线程设置超时的在 available 挂起
// 原先的 leader 等待的是原先的头节点,所以 leader 已经无效,需要将 leader 线程唤醒,
// 唤醒之后它会检查堆顶,如果堆顶任务可以被消费,则直接获取走,否则继续成为 leader 等待新堆顶任务
if (queue[0] == e) {
// 将 leader 设置为 null
leader = null;
// 直接随便唤醒等待头结点的阻塞线程
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
// 插入新节点后对堆进行调整,进行节点上移,保持其特性【节点的值小于子节点的值】,小顶堆
private void siftUp(int k, RunnableScheduledFuture\<?\> key) {
while (k > 0) {
// 父节点,就是堆排序
int parent = (k - 1) >>> 1;
RunnableScheduledFuture\<?\> e = queue[parent];
// key 和父节点比,如果大于父节点可以直接返回,否则就继续上浮
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
- poll():非阻塞获取头结点,获取执行时间最近并且可以执行的
// 非阻塞获取
public RunnableScheduledFuture\<?\> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取队头节点,因为是小顶堆
RunnableScheduledFuture\<?\> first = queue[0];
// 头结点为空或者的延迟时间没到返回 null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
// 头结点达到延迟时间,【尾节点成为替代节点下移调整堆结构】,返回头结点
return finishPoll(first);
} finally {
lock.unlock();
}
}
private RunnableScheduledFuture\<?\> finishPoll(RunnableScheduledFuture\<?\> f) {
// 获取尾索引
int s = --size;
// 获取尾节点
RunnableScheduledFuture\<?\> x = queue[s];
// 将堆结构最后一个节点占用的 slot 设置为 null,因为该节点要尝试升级成堆顶,会根据特性下调
queue[s] = null;
// s == 0 说明 当前堆结构只有堆顶一个节点,此时不需要做任何的事情
if (s != 0)
// 从索引处 0 开始向下调整
siftDown(0, x);
// 出队的元素索引设置为 -1
setIndex(f, -1);
return f;
}
- take():阻塞获取头节点,读取当前堆中最小的也就是触发时间最近的任务
public RunnableScheduledFuture\<?\> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 保证线程安全
lock.lockInterruptibly();
try {
for (;;) {
// 头节点
RunnableScheduledFuture\<?\> first = queue[0];
if (first == null)
// 等待队列不空,直至有任务通过 offer 入队并唤醒
available.await();
else {
// 获取头节点的延迟时间是否到时
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 到达触发时间,获取头节点并调整堆,重新选择延迟时间最小的节点放入头部
return finishPoll(first);
// 逻辑到这说明头节点的延迟时间还没到
first = null;
// 说明有 leader 线程在等待获取头节点,当前线程直接去阻塞等待
if (leader != null)
available.await();
else {
// 没有 leader 线程,【当前线程作为leader线程,并设置头结点的延迟时间作为阻塞时间】
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 在条件队列 available 使用带超时的挂起(堆顶任务.time - now() 纳秒值..)
available.awaitNanos(delay);
// 到达阻塞时间时,当前线程会从这里醒来来
} finally {
// t堆顶更新,leader 置为 null,offer 方法释放锁后,
// 有其它线程通过 take/poll 拿到锁,读到 leader == null,然后将自身更新为leader。
if (leader == thisThread)
// leader 置为 null 用以接下来判断是否需要唤醒后继线程
leader = null;
}
}
}
}
} finally {
// 没有 leader 线程,头结点不为 null,唤醒阻塞获取头节点的线程,
// 【如果没有这一步,就会出现有了需要执行的任务,但是没有线程去执行】
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
- remove():删除节点,堆移除一个元素的时间复杂度是 O(log n),延迟任务维护了 heapIndex,直接访问的时间复杂度是 O(1),从而可以更快的移除元素,任务在队列中被取消后会进入该逻辑
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 查找对象在队列数组中的下标
int i = indexOf(x);
// 节点不存在,返回 false
if (i < 0)
return false;
// 修改元素的 heapIndex,-1 代表删除
setIndex(queue[i], -1);
// 尾索引是长度-1
int s = --size;
// 尾节点作为替代节点
RunnableScheduledFuture\<?\> replacement = queue[s];
queue[s] = null;
// s == i 说明头节点就是尾节点,队列空了
if (s != i) {
// 向下调整
siftDown(i, replacement);
// 说明没发生调整
if (queue[i] == replacement)
// 上移和下移不可能同时发生,替代节点大于子节点时下移,否则上移
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}
