坑爹!Quartz 重复调度问题,你遇到过么?
点击关注公众号,回复“2T”获取2TB学习资源!
互联网架构师后台回复 2T 有特别礼包
作者:Lavender
来源:https://segmentfault.com/a/1190000015492260
1. 引子
2. 准备
3)Quartz自带的表里面,本文主要涉及以下3张表:
triggers表。triggers表里记录了,某个trigger的PREV_FIRE_TIME(上次触发时间),NEXT_FIRE_TIME(下一次触发时间),TRIGGER_STATE(当前状态)。虽未尽述,但是本文用到的只有这些。
locks表。Quartz支持分布式,也就是会存在多个线程同时抢占相同资源的情况,而Quartz正是依赖这张表,处理这种状况,至于如何做到,参见3.1。
fired_triggers表,记录正在触发的triggers信息。
4)TRIGGER_STATE,也就是trigger的状态,主要有以下几类:
trigger的初始状态是WAITING,处于WAITING状态的trigger等待被触发。调度线程会不停地扫triggers表,根据NEXT_FIRE_TIME提前拉取即将触发的trigger,如果这个trigger被该调度线程拉取到,它的状态就会变为ACQUIRED。
3. 开始排查
3.1分布式状态下的数据访问
JobStoreSupport
类的executeInNonManagedTXLock()
方法:/**
*Execute the given callback having acquired the given lock.
*Depending on the JobStore,the surrounding transaction maybe
*assumed to be already present(managed).
*
*@param lockName The name of the lock to acquire,for example
*"TRIGGER_ACCESS".If null, then no lock is acquired ,but the
*lockCallback is still executed in a transaction.
*/
这意味着,我们使用这个方法,不仅可以保证事务,还可以选择保证,callback方法的线程安全。
executeInNonManagedTXLock(…)
中的obtainLock(conn,lockName)
方法,即抢锁的过程。这个方法是在Semaphore
接口中定义的,Semaphore
接口通过锁住线程或者资源,来保护资源不被其他线程修改,由于我们的调度信息是存在数据库的,所以现在查看DBSemaphore.java
中obtainLock
方法的具体实现:expandedSQL
和expandedInsertSQL
这两个变量:obtainLock
方法通过locks表的一个行锁(lockName确定)来保证callback方法的事务和线程安全。拿到锁后,obtainLock
方法将lockName
写入threadlocal
。当然在releaseLock
的时候,会将lockName
从threadlocal
中删除。executeInNonManagedTXLock()
方法,保证了在分布式的情况,同一时刻,只有一个线程可以执行这个方法。3.2 quartz的调度过程
QuartzSchedulerThread
是调度线程的具体实现,图3-4 是这个线程run()
方法的主要内容,图中只提到了正常的情况下,也就是流程中没有出现异常的情况下的处理过程。由图可以看出,调度流程主要分为以下三步:1)拉取待触发trigger:
调度线程会一次性拉取距离现在,一定时间窗口内的,一定数量内的,即将触发的trigger信息。那么,时间窗口和数量信息如何确定呢,我们先来看一下,以下几个参数:
idleWaitTime
:默认30s,可通过配置属性org.quartz.scheduler.idleWaitTime
设置。availThreadCount
:获取可用(空闲)的工作线程数量,总会大于1,因为该方法会一直阻塞,直到有工作线程空闲下来。maxBatchSize
:一次拉取trigger的最大数量,默认是1,可通过org.quartz.scheduler.batchTriggerAcquisitionMaxCount
改写batchTimeWindow
:时间窗口调节参数,默认是0,可通过org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow
改写misfireThreshold
:超过这个时间还未触发的trigger,被认为发生了misfire,默认60s,可通过org.quartz.jobStore.misfireThreshold
设置。
now + idleWaitTime +batchTimeWindow
),大于(now - misfireThreshold
)的,min(availThreadCount,maxBatchSize)
个triggers,默认情况下,会拉取未来30s,过去60s之间还未fire的1个trigger。随后将这些triggers的状态由WAITING改为ACQUIRED,并插入fired_triggers表。2)触发trigger:
@DisallowConcurrentExecution
),则将状态变为BLOCKED,否则就将状态改为WAITING。3)包装trigger,丢给工作线程池:
遍历triggers,如果其中某个trigger在第二步出错,即返回值里面有exception或者为null,就会做一些triggers表,fired_triggers表的内容修正,跳过这个trigger,继续检查下一个。否则,则根据trigger信息实例化JobRunShell
(实现了Thread接口),同时依据JOB_CLASS_NAME
实例化Job
,随后我们将JobRunShell
实例丢入工作线。
另外,关注公众号互联网架构师,在后台回复:面试,可以获取我整理的 Java 多线程系列教程,非常齐全。
JobRunShell
的run()
方法,Quartz会在执行job.execute()
的前后通知之前绑定的监听器,如果job.execute()
执行的过程中有异常抛出,则执行结果jobExEx
会保存异常信息,反之如果没有异常抛出,则jobExEx
为null。然后根据jobExEx
的不同,得到不同的执行指令instCode
。JobRunShell
将trigger信息,job信息和执行指令传给triggeredJobComplete()
方法来完成最后的数据表更新操作。例如如果job执行过程有异常抛出,就将这个trigger状态变为ERROR,如果是BLOCKED状态,就将其变为WAITING等等,最后从fired_triggers表中删除这个已经执行完成的trigger。注意,这些是在工作线程池异步完成。3.3 排查问题
在前文,我们可以看到,Quartz的调度过程中有3次(可选的)上锁行为,为什么称为可选?因为这三个步骤虽然在executeInNonManagedTXLock
方法的保护下,但executeInNonManagedTXLock
方法可以通过设置传入参数lockName为空,取消上锁。在翻阅代码时,我们看到第一步拉取待触发的trigger时:
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)throws JobPersistenceException {
String lockName;
//判断是否需要上锁
if (isAcquireTriggersWithinLock() || maxCount > 1) {
lockName = LOCK_TRIGGER_ACCESS;
} else {
lockName = null;
}
return executeInNonManagedTXLock(lockName,
new TransactionCallback<List<OperableTrigger>>(){
public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
}
}, new TransactionValidator<List<OperableTrigger>>() {
//省略
});
}
在加锁之前对lockName做了一次判断,而非像其他加锁方法一样,默认传入的就是LOCK_TRIGGER_ACCESS:
public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException {
//默认上锁
return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
new TransactionCallback<List<TriggerFiredResult>>() {
//省略
},new TransactionValidator<List<TriggerFiredResult>>() {
//省略
});
}
通过调试发现isAcquireTriggersWithinLock()
的值是false
,因而导致传入的lockName是null。我在代码中加入日志,可以更清楚的看到这个过程。
protected TriggerFiredBundle triggerFired(Connection conn, OperableTrigger trigger)
throws JobPersistenceException {
JobDetail job;
Calendar cal = null;
// Make sure trigger wasn't deleted, paused, or completed...
try { // if trigger was deleted, state will be STATE_DELETED
String state = getDelegate().selectTriggerState(conn,trigger.getKey());
if (!state.equals(STATE_ACQUIRED)) {
return null;
}
} catch (SQLException e) {
throw new JobPersistenceException("Couldn't select trigger state: "
+ e.getMessage(), e);
}
调度线程如果发现当前trigger的状态不是ACQUIRED,也就是说,这个trigger被其他线程fire了,就会返回null。在3.2,我们提到,在调度流程的第三步,如果发现某个trigger第二步的返回值是null,就会跳过第三步,取消fire。在通常的情况下,乐观锁能保证不发生重复调度,但是难免发生ABA问题,我们看一下这是发生重复调度时的日志:
3.4 解决办法
如何去解决这个问题呢?在配置文件加上org.quartz.jobStore.acquireTriggersWithinLock=true
,这样,在调度流程的第一步,也就是拉取待即将触发的triggers时,是上锁的状态,即不会同时存在多个线程拉取到相同的trigger的情况,也就避免的重复调度的危险。
3.5 心得
此次排查过程并非一帆风顺,走过一些坑,也有一些非技术相关的体会:
3)日志很重要。虽然我们可以调试,但是没有日志,我们是无法发现并证明,程序发生了ABA问题。
4)最重要的是,不要害怕问题,即使是Quartz这样大型的框架,解决问题也不一定需要把2.4MB的源码通通读懂。只要有时间,问题都能解决,只是好的技巧能缩短这个时间,而我们需要在一次次实战中磨练技巧。
-End-
正文结束
1.心态崩了!税前2万4,到手1万4,年终奖扣税方式1月1日起施行~