ScheduledThreadPool中的Leader-Follow模式你知道不?
共 1665字,需浏览 4分钟
·
2020-08-14 12:04
点击上方蓝色“程序猿DD”,选择“设为星标”
回复“资源”获取独家整理的学习资料!
ScheduledThreadPoolExecutor 是java中一个非常常用的定时调度的工具,其提供了两种定时调度常用模式:
1.固定调度周期的任务执行。
2.固定延迟间隔的任务执行,延迟间隔表示的是前一次执行完成到后一次执行开始的时间差。
1.scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
2.scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
ScheduledThreadPoolExecutor 是基于ThreadPoolExecutor实现的,其继承了类ThreadPoolExecutor。从构造器可以看出,为满足定时调度的需求,其基于ThreadPoolExecutor定制了延迟工作队列DelayedWorkQueue。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS /*10L*/, MILLISECONDS,
new DelayedWorkQueue());
}
DelayedWorkQueue是一个基于最小堆的队列,其在元素进出队列等BlockingQueue接口方法中使用可重入锁,以保证线程安全。为了在O(1)的时间取消特定任务的调度,ScheduleFutrueTask对象中还额外增加了heapIndex字段,以记录其在最小堆中的位置。
Leader-Follower模式
定时调度线程池与一般线程池的一个重要不同:提交的任务是延迟而非立即执行的,因此worker线程调用队列的take以取出执行任务必定是要阻塞的。考虑到等待延迟执行任务的线程无需都使用timed waiting等待队首任务,一个task的执行线程只有一个,全部唤醒只会造成大量无效的线程唤醒和阻塞操作。
ScheduledThreadPool采用了Leader-Follower模式,等待第一个线程的任务也称为leader,其调用available.awaitNanos待当前队列头部任务到达调度时间时唤醒。其余线程作为follower只需调用await方法无限阻塞等待,直至被leader唤醒,并重新完成抢锁->尝试执行队列首元素->抢leader->等待的循环。
public RunnableScheduledFuture> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
若新增元素位于堆顶,此时需安排等待该元素定期调度或立即执行的leader线程。为此offer方法在新增元素应率先调度时,清空leader,并signal唤醒某个等待线程,继续take方法中的循环:抢锁->尝试执行队首元素->抢leader->等待。
public boolean offer(Runnable x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
/*
insert into queue
*/
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
往期推荐
? ? ? ?