跳至主要內容

成员属性

mozzie大约 11 分钟JavaJava

成员属性

成员变量

  • shutdown 后是否继续执行周期任务:
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
  • shutdown 后是否继续执行延迟任务:
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
  • 取消方法是否将该任务从队列中移除:
// 默认 false,不移除,等到线程拿到任务之后抛弃
private volatile boolean removeOnCancel = false;
  • 任务的序列号,可以用来比较优先级:
private static final AtomicLong sequencer = new AtomicLong();

延迟任务

ScheduledFutureTask 继承 FutureTask,实现 RunnableScheduledFuture 接口,具有延迟执行的特点,覆盖 FutureTask 的 run 方法来实现对延时执行、周期执行的支持。对于延时任务调用 FutureTask#run,而对于周期性任务则调用 FutureTask#runAndReset 并且在成功之后根据 fixed-delay/fixed-rate 模式来设置下次执行时间并重新将任务塞到工作队列

在调度线程池中无论是 runnable 还是 callable,无论是否需要延迟和定时,所有的任务都会被封装成 ScheduledFutureTask

成员变量:

  • 任务序列号:
private final long sequenceNumber;
  • 执行时间:
private long time;			// 任务可以被执行的时间,交付时间,以纳秒表示
private final long period;	// 0 表示非周期任务,正数表示 fixed-rate 模式的周期,负数表示 fixed-delay 模式

fixed-rate:两次开始启动的间隔,fixed-delay:一次执行结束到下一次开始启动

  • 实际的任务对象:
RunnableScheduledFuture\<V\> outerTask = this;
  • 任务在队列数组中的索引下标:
// DelayedWorkQueue 底层使用的数据结构是最小堆,记录当前任务在堆中的索引,-1 代表删除
int heapIndex;

成员方法:

  • 构造方法:
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    // 任务的触发时间
    this.time = ns;
    // 任务的周期,多长时间执行一次
    this.period = period;
    // 任务的序号
    this.sequenceNumber = sequencer.getAndIncrement();
}
  • compareTo():ScheduledFutureTask 根据执行时间 time 正序排列,如果执行时间相同,在按照序列号 sequenceNumber 正序排列,任务需要放入 DelayedWorkQueue,延迟队列中使用该方法按照从小到大进行排序
public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        // 类型强转
        ScheduledFutureTask\<?\> x = (ScheduledFutureTask\<?\>)other;
        // 比较者 - 被比较者的执行时间
        long diff = time - x.time;
        // 比较者先执行
        if (diff < 0)
            return -1;
        // 被比较者先执行
        else if (diff > 0)
            return 1;
        // 比较者的序列号小
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    // 不是 ScheduledFutureTask 类型时,根据延迟时间排序
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
  • run():执行任务,非周期任务直接完成直接结束,周期任务执行完后会设置下一次的执行时间,重新放入线程池的阻塞队列,如果线程池中的线程数量少于核心线程,就会添加 Worker 开启新线程
public void run() {
    // 是否周期性,就是判断 period 是否为 0
    boolean periodic = isPeriodic();
    // 根据是否是周期任务检查当前状态能否执行任务,不能执行就取消任务
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 非周期任务,直接调用 FutureTask#run 执行
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 周期任务的执行,返回 true 表示执行成功
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 设置周期任务的下一次执行时间
        setNextRunTime();
        // 任务的下一次执行安排,如果当前线程池状态可以执行周期任务,加入队列,并开启新线程
        reExecutePeriodic(outerTask);
    }
}

周期任务正常完成后任务的状态不会变化,依旧是 NEW,不会设置 outcome 属性。但是如果本次任务执行出现异常,会进入 setException 方法将任务状态置为异常,把异常保存在 outcome 中,方法返回 false,后续的该任务将不会再周期的执行

protected boolean runAndReset() {
    // 任务不是新建的状态了,或者被别的线程执行了,直接返回 false
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable\<V\> c = callable;
        if (c != null && s == NEW) {
            try {
                // 执行方法,没有返回值
                c.call();
                ran = true;
            } catch (Throwable ex) {
                // 出现异常,把任务设置为异常状态,唤醒所有的 get 阻塞线程
                setException(ex);
            }
        }
    } finally {
		// 执行完成把执行线程引用置为 null
        runner = null;
        s = state;
        // 如果线程被中断进行中断处理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    // 如果正常执行,返回 true,并且任务状态没有被取消
    return ran && s == NEW;
}
// 任务下一次的触发时间
private void setNextRunTime() {
    long p = period;
    if (p > 0)
        // fixed-rate 模式,【时间设置为上一次执行任务的时间 + p】,两次任务执行的时间差
        time += p;
    else
        // fixed-delay 模式,下一次执行时间是【当前这次任务结束的时间(就是现在) + delay 值】
        time = triggerTime(-p);
}
  • reExecutePeriodic():准备任务的下一次执行,重新放入阻塞任务队列
// ScheduledThreadPoolExecutor#reExecutePeriodic
void reExecutePeriodic(RunnableScheduledFuture\<?\> task) {
    if (canRunInCurrentRunState(true)) {
        // 【放入任务队列】
        super.getQueue().add(task);
        // 如果提交完任务之后,线程池状态变为了 shutdown 状态,需要再次检查是否可以执行,
        // 如果不能执行且任务还在队列中未被取走,则取消任务
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            // 当前线程池状态可以执行周期任务,加入队列,并【根据线程数量是否大于核心线程数确定是否开启新线程】
            ensurePrestart();
    }
}
  • cancel():取消任务
public boolean cancel(boolean mayInterruptIfRunning) {
    // 调用父类 FutureTask#cancel 来取消任务
    boolean cancelled = super.cancel(mayInterruptIfRunning);
    // removeOnCancel 用于控制任务取消后是否应该从阻塞队列中移除
    if (cancelled && removeOnCancel && heapIndex >= 0)
        // 从等待队列中删除该任务,并调用 tryTerminate() 判断是否需要停止线程池
        remove(this);
    return cancelled;
}

延迟队列

DelayedWorkQueue 是支持延时获取元素的阻塞队列,内部采用优先队列 PriorityQueue(小根堆、满二叉树)存储元素

其他阻塞队列存储节点的数据结构大都是链表,延迟队列是数组,所以延迟队列出队头元素后需要让其他元素(尾)替换到头节点,防止空指针异常

成员变量:

  • 容量:
private static final int INITIAL_CAPACITY = 16;			// 初始容量
private int size = 0;									// 节点数量
private RunnableScheduledFuture\<?\>[] queue = 
    new RunnableScheduledFuture\<?\>[INITIAL_CAPACITY];	// 存放节点
  • 锁:
private final ReentrantLock lock = new ReentrantLock();	// 控制并发
private final Condition available = lock.newCondition();// 条件队列
  • 阻塞等待头节点的线程:线程池内的某个线程去 take() 获取任务时,如果延迟队列顶层节点不为 null(队列内有任务),但是节点任务还不到触发时间,线程就去检查队列的 leader字段是否被占用

  • 如果未被占用,则当前线程占用该字段,然后当前线程到 available 条件队列指定超时时间 堆顶任务.time - now() 挂起

  • 如果被占用,当前线程直接到 available 条件队列不指定超时时间的挂起

// leader 在 available 条件队列内是首元素,它超时之后会醒过来,然后再次将堆顶元素获取走,获取走之后,take()结束之前,会调用是 available.signal() 唤醒下一个条件队列内的等待者,然后释放 lock,下一个等待者被唤醒后去到 AQS 队列,做 acquireQueue(node) 逻辑
private Thread leader = null;

成员方法

  • offer():插入节点
public boolean offer(Runnable x) {
    // 判空
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture\<?\> e = (RunnableScheduledFuture\<?\>)x;
    // 队列锁,增加删除数据时都要加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        // 队列数量大于存放节点的数组长度,需要扩容
        if (i >= queue.length)
            // 扩容为原来长度的 1.5 倍
            grow();
        size = i + 1;
        // 当前是第一个要插入的节点
        if (i == 0) {
            queue[0] = e;
            // 修改 ScheduledFutureTask 的 heapIndex 属性,表示该对象在队列里的下标
            setIndex(e, 0);
        } else {
            // 向上调整元素的位置,并更新 heapIndex 
            siftUp(i, e);
        }
        // 情况1:当前任务是第一个加入到 queue 内的任务,所以在当前任务加入到 queue 之前,take() 线程会直接
        //		到 available 队列不设置超时的挂起,并不会去占用 leader 字段,这时需会唤醒一个线程 让它去消费
       	// 情况2:当前任务【优先级最高】,原堆顶任务可能还未到触发时间,leader 线程设置超时的在 available 挂起
        //		原先的 leader 等待的是原先的头节点,所以 leader 已经无效,需要将 leader 线程唤醒,
        //		唤醒之后它会检查堆顶,如果堆顶任务可以被消费,则直接获取走,否则继续成为 leader 等待新堆顶任务
        if (queue[0] == e) {
            // 将 leader 设置为 null
            leader = null;
            // 直接随便唤醒等待头结点的阻塞线程
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}
// 插入新节点后对堆进行调整,进行节点上移,保持其特性【节点的值小于子节点的值】,小顶堆
private void siftUp(int k, RunnableScheduledFuture\<?\> key) {
    while (k > 0) {
        // 父节点,就是堆排序
        int parent = (k - 1) >>> 1;
        RunnableScheduledFuture\<?\> e = queue[parent];
        // key 和父节点比,如果大于父节点可以直接返回,否则就继续上浮
        if (key.compareTo(e) >= 0)
            break;
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
}
  • poll():非阻塞获取头结点,获取执行时间最近并且可以执行的
// 非阻塞获取
public RunnableScheduledFuture\<?\> poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取队头节点,因为是小顶堆
        RunnableScheduledFuture\<?\> first = queue[0];
        // 头结点为空或者的延迟时间没到返回 null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            // 头结点达到延迟时间,【尾节点成为替代节点下移调整堆结构】,返回头结点
            return finishPoll(first);
    } finally {
        lock.unlock();
    }
}
private RunnableScheduledFuture\<?\> finishPoll(RunnableScheduledFuture\<?\> f) {
    // 获取尾索引
    int s = --size;
    // 获取尾节点
    RunnableScheduledFuture\<?\> x = queue[s];
    // 将堆结构最后一个节点占用的 slot 设置为 null,因为该节点要尝试升级成堆顶,会根据特性下调
    queue[s] = null;
    // s == 0 说明 当前堆结构只有堆顶一个节点,此时不需要做任何的事情
    if (s != 0)
        // 从索引处 0 开始向下调整
        siftDown(0, x);
    // 出队的元素索引设置为 -1
    setIndex(f, -1);
    return f;
}
  • take():阻塞获取头节点,读取当前堆中最小的也就是触发时间最近的任务
public RunnableScheduledFuture\<?\> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 保证线程安全
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 头节点
            RunnableScheduledFuture\<?\> first = queue[0];
            if (first == null)
                // 等待队列不空,直至有任务通过 offer 入队并唤醒
                available.await();
            else {
                // 获取头节点的延迟时间是否到时
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    // 到达触发时间,获取头节点并调整堆,重新选择延迟时间最小的节点放入头部
                    return finishPoll(first);
                
                // 逻辑到这说明头节点的延迟时间还没到
                first = null;
                // 说明有 leader 线程在等待获取头节点,当前线程直接去阻塞等待
                if (leader != null)
                    available.await();
                else {
                    // 没有 leader 线程,【当前线程作为leader线程,并设置头结点的延迟时间作为阻塞时间】
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 在条件队列 available 使用带超时的挂起(堆顶任务.time - now() 纳秒值..)
                        available.awaitNanos(delay);
                        // 到达阻塞时间时,当前线程会从这里醒来来
                    } finally {
                        // t堆顶更新,leader 置为 null,offer 方法释放锁后,
                        // 有其它线程通过 take/poll 拿到锁,读到 leader == null,然后将自身更新为leader。
                        if (leader == thisThread)
                            // leader 置为 null 用以接下来判断是否需要唤醒后继线程
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 没有 leader 线程,头结点不为 null,唤醒阻塞获取头节点的线程,
        // 【如果没有这一步,就会出现有了需要执行的任务,但是没有线程去执行】
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}
  • remove():删除节点,堆移除一个元素的时间复杂度是 O(log n),延迟任务维护了 heapIndex,直接访问的时间复杂度是 O(1),从而可以更快的移除元素,任务在队列中被取消后会进入该逻辑
public boolean remove(Object x) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 查找对象在队列数组中的下标
        int i = indexOf(x);
        // 节点不存在,返回 false
        if (i < 0)
            return false;
		// 修改元素的 heapIndex,-1 代表删除
        setIndex(queue[i], -1);
        // 尾索引是长度-1
        int s = --size;
        // 尾节点作为替代节点
        RunnableScheduledFuture\<?\> replacement = queue[s];
        queue[s] = null;
        // s == i 说明头节点就是尾节点,队列空了
        if (s != i) {
            // 向下调整
            siftDown(i, replacement);
            // 说明没发生调整
            if (queue[i] == replacement)
                // 上移和下移不可能同时发生,替代节点大于子节点时下移,否则上移
                siftUp(i, replacement);
        }
        return true;
    } finally {
        lock.unlock();
    }
}
贡献者: mozzie