欢迎光临
我们一直在努力

ScheduledThreadPoolExecutor源码解析

ScheduledThreadPoolExecutor是由Doug Lea大神出品,用于执行延时任务的线程池。

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { } 

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,并自定义了延迟阻塞队列DelayedWorkQueue用于存放延时任务,同时使用ScheduledFuturetask来包装待执行的任务。

private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { } public class FutureTask<V> implements RunnableFuture<V>{} public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V>{} // 因为继承了Delayed接口,所以包装后的任务才可以添加到延时队列中 public interface ScheduledFuture<V> extends Delayed, Future<V>{} 

ScheduledFutureTask即可接受Runnable任务,又可接受Callable任务,本质上调用的是父类FutureTask的构造方法。

延时任务构建时,会指定2个时间参数:

time 任务初次执行的时间 period 周期性执行间隔 若period不为0,则证明该任务为周期性任务 

接着看一下ScheduledFutureTask的run()方法:

public void run() { // 判断该任务是否为周期性任务 boolean periodic = isPeriodic(); // 判断当前线程池状态是否满足该任务的执行 // 若不满足,则放弃执行该任务 if (!canRunInCurrentRunState(periodic)) cancel(false); // 不是周期任务,则直接执行 else if (!periodic) ScheduledFutureTask.super.run(); // 周期性任务,调用runAndReset()方法,使得任务执行完之后,可以重置为初始状态 else if (ScheduledFutureTask.super.runAndReset()) { // 获取下一次执行时间 setNextRunTime(); // 再次执行 reExecutePeriodic(outerTask); } } void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { // 将该任务重新添加到延迟队列(此处是实现周期性执行的关键) super.getQueue().add(task); if (!canRunInCurrentRunState(true) &amp;& remove(task)) task.cancel(false); else ensurePrestart(); } } 
public Future<?> submit(Runnable task) { return schedule(task, 0, NANOSECONDS); } public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { // 参数验空 if (command == null || unit == null) throw new NullPointerException(); // 将传入的任务装饰为ScheduledFutureTask RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); // 延迟执行该任务 delayedExecute(t); return t; } private void delayedExecute(RunnableScheduledFuture<?> task) { // 若线程池已经Shutdown,直接拒绝该任务 if (isShutdown()) reject(task); else { // 将任务添加到线程池的阻塞队列 super.getQueue().add(task); // 执行前校验一下线程池状态 // 如果线程池已经Shutdown,且线程池配置了Shutdown以后不能运行,且成功将该任务从线程池的阻塞队列中删除 // task.cancel(false)放弃该任务的执行 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else // 确定执行 ensurePrestart(); } } // 该方法为ThreadPoolExecutor的方法 void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); } 

addWorker方法会启动工作线程Worker,并执行其start()方法,而Worher本身又实现了Runnable接口的run()方法:

public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // ensurePrestart调用addWorker传入的task为null // getTask()用于从延时阻塞队列中获取任务 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { // 任务真正得到执行 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } 

ThreadPoolExecutor相关方法的具体解析,可参考我之前的Java线程池系列文章。

若延时任务正常添加并执行,其方法路径如下:

ScheduledThreadPoolExecutor.sumbit–>ScheduledThreadPoolExecutor.schedule–>ScheduledThreadPoolExecutor.delayedExecute–>ThreadPoolExecutor.ensurePrestart–>ThreadPoolExecutor.addWorker–>Worker.run–>Worker.runWorker。

  • 首先将Runnable或者Callable装饰为ScheduledFutureTask添加到延时队列中;
  • 然后调用线程池的addWorker方法启动Worker线程;
  • Worker线程启动后,会执行其run()方法;
  • Worker线程的run方法会不断从延时队列中拉取任务执行,直至延时队列为空。

ScheduledThreadPoolExecutor没有直接使用DelayQueue作为其延时队列,而是自定义了内部静态类DelayedWorkQueue。

static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { private static final int INITIAL_CAPACITY = 16; private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; private final ReentrantLock lock = new ReentrantLock(); private int size = 0; ... 

跟读完代码可以发现,其和DelayQueue实现逻辑基本一样:

DelayQueue底层使用了PriorityQueue来保存元素,以保证最快过期的元素最先被获取,而DelayedWorkQueue则使用RunnableScheduledFuture<?>[]数组来保存元素,并自行实现了最小堆算法(相当于重新写了一遍PriorityQueue),其他增删元素的操作和DelayQueue完全一致。

DelayedWorkQueue之所以没直接使用PriorityQueue,我猜测主要原因是PriorityQueue的数组声明是Object[],使用RunnableScheduledFuture元素的相关方法时,会涉及到大量的类型强转,影响性能。

相当于用代码冗余来提升性能,引申一下,其实大部分情况下,降低代码冗余和提升性能是相悖的。比如ORM框架使用反射来进行表和POJO的自动映射,减少了代码冗余,但反射会带来一定程度的性能下降。所以凡事无绝对,重要的是如何平衡各因素以满足业务需求。

除了execute()和submit()方法用于提交任务外,ScheduledThreadPoolExecutor还提供了scheduleAtFixedRate方法和scheduleWithFixedDelay方法来使任务周期性执行。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } 

由源码可以看出,2个方法流程基本一致,唯一的区别就在于ScheduledFutureTask的构建。

scheduleAtFixedRate传入的周期执行时间period为正值,而scheduleWithFixedDelay传入的周期执行时间为负值-delay。

周期执行时间主要影响计算下一次的任务执行时间:

private void setNextRunTime() { long p = period; // 若为正值,则下次启动时间为上次启动时间+period if (p > 0) time += p; else // 若为负值,则下次启动时间为当前时间+(-p) // 由于传入的p为负值,-p即为正值 // 上次任务结束的时候才会执行setNextRunTime()方法,所以当前时间即为上次任务结束的时间 time = triggerTime(-p); } long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); } 

综上,scheduleAtFixedRate方法的周期时间是本次任务开始和下次任务开始之间的间隔,而scheduleWithFixedDelay方法的周期时间指的是本次任务的结束和下次任务的开始之间的间隔。

举个例子就明白了,比如某任务平均执行时间为3分钟,设定周期间隔为10分钟,首次任务的执行时间为12:00。

在scheduleAtFixedRate方法下:

第1次执行: start-12:00 end-12:03 第2次执行: start-12:10 end-12:13 第3次执行: start-12:20 end-12:23 ... 

在scheduleWithFixedDelay方法下:

第1次执行: start-12:00 end-12:03 第2次执行: start-12:13 end-12:16 第3次执行: start-12:26 end-12:29 ... 

至此,ScheduledThreadPoolExecutor源码解析完毕,感谢阅读。

  • 海报
海报图正在生成中...
赞(0) 打赏
声明:
1、本博客不从事任何主机及服务器租赁业务,不参与任何交易,也绝非中介。博客内容仅记录博主个人感兴趣的服务器测评结果及一些服务器相关的优惠活动,信息均摘自网络或来自服务商主动提供;所以对本博客提及的内容不作直接、间接、法定、约定的保证,博客内容也不具备任何参考价值及引导作用,访问者需自行甄别。
2、访问本博客请务必遵守有关互联网的相关法律、规定与规则;不能利用本博客所提及的内容从事任何违法、违规操作;否则造成的一切后果由访问者自行承担。
3、未成年人及不能独立承担法律责任的个人及群体请勿访问本博客。
4、一旦您访问本博客,即表示您已经知晓并接受了以上声明通告。
文章名称:《ScheduledThreadPoolExecutor源码解析》
文章链接:https://www.456zj.com/36575.html
本站资源仅供个人学习交流,请于下载后24小时内删除,不允许用于商业用途,否则法律问题自行承担。

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址