SpringBoot 定时任务动态管理通用解决方案

Java技术迷

共 17898字,需浏览 36分钟

 ·

2022-01-05 00:44

点击关注公众号,Java干货及时送达

来源:blog.csdn.net/qq_34886352/article/details/106494637


    一、功能说明

    SpringBoot的定时任务的加强工具,实现对SpringBoot原生的定时任务进行动态管理,完全兼容原生@Scheduled注解,无需对原本的定时任务进行修改

    推荐下自己做的 Spring Boot 的实战项目:

    https://github.com/YunaiV/ruoyi-vue-pro

    二、快速使用

    具体的功能已经封装成SpringBoot-starter即插即用

    <dependency>
        <groupId>com.github.guoyixinggroupId>
        <artifactId>spring-boot-starter-super-scheduledartifactId>
        <version>0.3.1version>
    dependency>

    使用方法和源码:

    码云:https://gitee.com/qiaodaimadewangcai/super-scheduled

    github:https://github.com/guoyixing/super-scheduled

    推荐下自己做的 Spring Cloud 的实战项目:

    https://github.com/YunaiV/onemall

    三、实现原理

    1、动态管理实现

    (1) 配置管理介绍

    @Component("superScheduledConfig")
    public class SuperScheduledConfig {
        /**
         * 执行定时任务的线程池
         */

        private ThreadPoolTaskScheduler taskScheduler;

        /**
         * 定时任务名称与定时任务回调钩子  的关联关系容器
         */

        private Map nameToScheduledFuture = new ConcurrentHashMap<>();

        /**
         * 定时任务名称与定时任务需要执行的逻辑  的关联关系容器
         */

        private Map nameToRunnable = new ConcurrentHashMap<>();

        /**
         * 定时任务名称与定时任务的源信息  的关联关系容器
         */

        private Map nameToScheduledSource = new ConcurrentHashMap<>();
     /* 普通的get/sets省略 */
    }

    (2) 使用后处理器拦截SpringBoot原本的定时任务

    • 实现ApplicationContextAware接口拿到SpringBoot的上下文
    • 实现BeanPostProcessor接口,将这个类标记为后处理器,后处理器会在每个bean实例化之后执行
    • 使用@DependsOn注解强制依赖SuperScheduledConfig类,让SpringBoot实例化SuperScheduledPostProcessor类之前先实例化SuperScheduledConfig类
    • 主要实现逻辑在postProcessAfterInitialization()方法中
    @DependsOn({"superScheduledConfig"})
    @Component
    @Order
    public class SuperScheduledPostProcessor implements BeanPostProcessorApplicationContextAware {
        protected final Log logger = LogFactory.getLog(getClass());

        private ApplicationContext applicationContext;

        /**
         * 实例化bean之前的操作
         * @param bean bean实例
         * @param beanName bean的Name
         */

        @Override
        public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
            return bean;
        }

        /**
         * 实例化bean之后的操作
         * @param bean bean实例
         * @param beanName bean的Name
         */

        @Override
        public Object postProcessAfterInitialization(Object bean,
                                                     String beanName)
     throws BeansException 
    {
            //1.获取配置管理器
            SuperScheduledConfig superScheduledConfig = applicationContext.getBean(SuperScheduledConfig.class);

            //2.获取当前实例化完成的bean的所有方法
            Method[] methods = bean.getClass().getDeclaredMethods();
            //循环处理对每个方法逐一处理
            if (methods.length > 0) {
                for (Method method : methods) {
                 //3.尝试在该方法上获取@Scheduled注解(SpringBoot的定时任务注解)
                    Scheduled annotation = method.getAnnotation(Scheduled.class);
                    //如果无法获取到@Scheduled注解,就跳过这个方法
                    if (annotation == null) {
                        continue;
                    }
                    //4.创建定时任务的源属性
                    //创建定时任务的源属性(用来记录定时任务的配置,初始化的时候记录的是注解上原本的属性)
                    ScheduledSource scheduledSource = new ScheduledSource(annotation, method, bean);
                    //对注解上获取到源属性中的属性进行检测
                    if (!scheduledSource.check()) {
                        throw new SuperScheduledException("在" + beanName + "Bean中" + method.getName() + "方法的注解参数错误");
                    }
                    //生成定时任务的名称(id),使用beanName+“.”+方法名
                    String name = beanName + "." + method.getName();
                    //将以key-value的形式,将源数据存入配置管理器中,key:定时任务的名称 value:源数据
                    superScheduledConfig.addScheduledSource(name, scheduledSource);
                    try {
                     //5.将原本SpringBoot的定时任务取消掉
                        clearOriginalScheduled(annotation);
                    } catch (Exception e) {
                        throw new SuperScheduledException("在关闭原始方法" + beanName + method.getName() + "时出现错误");
                    }
                }
            }
            //最后bean保持原有返回
            return bean;
        }

        /**
         * 修改注解原先的属性
         * @param annotation 注解实例对象
         * @throws Exception
         */

        private void clearOriginalScheduled(Scheduled annotation) throws Exception {
            changeAnnotationValue(annotation, "cron", Scheduled.CRON_DISABLED);
            changeAnnotationValue(annotation, "fixedDelay", -1L);
            changeAnnotationValue(annotation, "fixedDelayString""");
            changeAnnotationValue(annotation, "fixedRate", -1L);
            changeAnnotationValue(annotation, "fixedRateString""");
            changeAnnotationValue(annotation, "initialDelay", -1L);
            changeAnnotationValue(annotation, "initialDelayString""");
        }


        /**
         * 获取SpringBoot的上下文
         * @param applicationContext SpringBoot的上下文
         */

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    }

    (3) 使用ApplicationRunner初始化自定义的定时任务运行器

    • 实现ApplicationContextAware接口拿到SpringBoot的上下文
    • 使用@DependsOn注解强制依赖threadPoolTaskScheduler类
    • 实现ApplicationRunner接口,在所有bean初始化结束之后,运行自定义逻辑
    • 主要实现逻辑在run()方法中
    @DependsOn("threadPoolTaskScheduler")
    @Component
    public class SuperScheduledApplicationRunner implements ApplicationRunnerApplicationContextAware {
        protected final Log logger = LogFactory.getLog(getClass());
        private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        private ApplicationContext applicationContext;
     
     /**
         * 定时任务配置管理器
         */

        @Autowired
        private SuperScheduledConfig superScheduledConfig;
        /**
         * 定时任务执行线程
         */

        @Autowired
        private ThreadPoolTaskScheduler threadPoolTaskScheduler;

        @Override
        public void run(ApplicationArguments args) {
         //1.定时任务配置管理器中缓存  定时任务执行线程
            superScheduledConfig.setTaskScheduler(threadPoolTaskScheduler);
            //2.获取所有定时任务源数据
            Map nameToScheduledSource = superScheduledConfig.getNameToScheduledSource();
            //逐一处理定时任务
            for (String name : nameToScheduledSource.keySet()) {
                //3.获取定时任务源数据
                ScheduledSource scheduledSource = nameToScheduledSource.get(name);
                //4.获取所有增强类
                String[] baseStrengthenBeanNames = applicationContext.getBeanNamesForType(BaseStrengthen.class);
                //5.创建执行控制器
                SuperScheduledRunnable runnable = new SuperScheduledRunnable();
                //配置执行控制器
                runnable.setMethod(scheduledSource.getMethod());
                runnable.setBean(scheduledSource.getBean());
                //6.逐一处理增强类(增强器实现原理后面具体分析)
                List points = new ArrayList<>(baseStrengthenBeanNames.length);
                for (String baseStrengthenBeanName : baseStrengthenBeanNames) {
                 //7.将增强器代理成point
                    Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
                    //创建代理
                    Point proxy = ProxyUtils.getInstance(Point.classnew RunnableBaseInterceptor(baseStrengthenBeanrunnable));
                    proxy.setSuperScheduledName(name);
                    //8.所有的points连成起来
                    points.add(proxy);
                }
       //将point形成调用链
                runnable.setChain(new Chain(points));
                //将执行逻辑封装并缓存到定时任务配置管理器中
                superScheduledConfig.addRunnable(name, runnable::invoke);
                try {
                 //8.启动定时任务
                    ScheduledFuture schedule = ScheduledFutureFactory.create(threadPoolTaskScheduler
                            , scheduledSource, runnable::invoke);
                    //将线程回调钩子存到任务配置管理器中
                    superScheduledConfig.addScheduledFuture(name, schedule);
                    logger.info(df.format(LocalDateTime.now()) + "任务" + name + "已经启动...");

                } catch (Exception e) {
                    throw new SuperScheduledException("任务" + name + "启动失败,错误信息:" + e.getLocalizedMessage());
                }
            }
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    }

    (4) 进行动态管理

    @Component
    public class SuperScheduledManager {
        protected final Log logger = LogFactory.getLog(getClass());
        private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        @Autowired
        private SuperScheduledConfig superScheduledConfig;

        /**
         * 修改Scheduled的执行周期
         *
         * @param name scheduled的名称
         * @param cron cron表达式
         */

        public void setScheduledCron(String name, String cron) {
            //终止原先的任务
            cancelScheduled(name);
            //创建新的任务
            ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
            scheduledSource.clear();
            scheduledSource.setCron(cron);
            addScheduled(name, scheduledSource);
        }

        /**
         * 修改Scheduled的fixedDelay
         *
         * @param name       scheduled的名称
         * @param fixedDelay 上一次执行完毕时间点之后多长时间再执行
         */

        public void setScheduledFixedDelay(String name, Long fixedDelay) {
            //终止原先的任务
            cancelScheduled(name);
            //创建新的任务
            ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
            scheduledSource.clear();
            scheduledSource.setFixedDelay(fixedDelay);
            addScheduled(name, scheduledSource);
        }

        /**
         * 修改Scheduled的fixedRate
         *
         * @param name      scheduled的名称
         * @param fixedRate 上一次开始执行之后多长时间再执行
         */

        public void setScheduledFixedRate(String name, Long fixedRate) {
            //终止原先的任务
            cancelScheduled(name);
            //创建新的任务
            ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
            scheduledSource.clear();
            scheduledSource.setFixedRate(fixedRate);
            addScheduled(name, scheduledSource);
        }

        /**
         * 查询所有启动的Scheduled
         */

        public List getRunScheduledName() {
            Set names = superScheduledConfig.getNameToScheduledFuture().keySet();
            return new ArrayList<>(names);
        }

        /**
         * 查询所有的Scheduled
         */

        public List getAllSuperScheduledName() {
            Set names = superScheduledConfig.getNameToRunnable().keySet();
            return new ArrayList<>(names);
        }

        /**
         * 终止Scheduled
         *
         * @param name scheduled的名称
         */

        public void cancelScheduled(String name) {
            ScheduledFuture scheduledFuture = superScheduledConfig.getScheduledFuture(name);
            scheduledFuture.cancel(true);
            superScheduledConfig.removeScheduledFuture(name);
            logger.info(df.format(LocalDateTime.now()) + "任务" + name + "已经终止...");
        }

        /**
         * 启动Scheduled
         *
         * @param name            scheduled的名称
         * @param scheduledSource 定时任务的源信息
         */

        public void addScheduled(String name, ScheduledSource scheduledSource) {
            if (getRunScheduledName().contains(name)) {
                throw new SuperScheduledException("定时任务" + name + "已经被启动过了");
            }
            if (!scheduledSource.check()) {
                throw new SuperScheduledException("定时任务" + name + "源数据内容错误");
            }

            scheduledSource.refreshType();

            Runnable runnable = superScheduledConfig.getRunnable(name);
            ThreadPoolTaskScheduler taskScheduler = superScheduledConfig.getTaskScheduler();


            ScheduledFuture schedule = ScheduledFutureFactory.create(taskScheduler, scheduledSource, runnable);
            logger.info(df.format(LocalDateTime.now()) + "任务" + name + "已经启动...");

            superScheduledConfig.addScheduledSource(name, scheduledSource);
            superScheduledConfig.addScheduledFuture(name, schedule);
        }

        /**
         * 以cron类型启动Scheduled
         *
         * @param name scheduled的名称
         * @param cron cron表达式
         */

        public void addCronScheduled(String name, String cron) {
            ScheduledSource scheduledSource = new ScheduledSource();
            scheduledSource.setCron(cron);

            addScheduled(name, scheduledSource);
        }

        /**
         * 以fixedDelay类型启动Scheduled
         *
         * @param name         scheduled的名称
         * @param fixedDelay   上一次执行完毕时间点之后多长时间再执行
         * @param initialDelay 第一次执行的延迟时间
         */

        public void addFixedDelayScheduled(String name, Long fixedDelay, Long... initialDelay) {
            ScheduledSource scheduledSource = new ScheduledSource();
            scheduledSource.setFixedDelay(fixedDelay);
            if (initialDelay != null && initialDelay.length == 1) {
                scheduledSource.setInitialDelay(initialDelay[0]);
            } else if (initialDelay != null && initialDelay.length > 1) {
                throw new SuperScheduledException("第一次执行的延迟时间只能传入一个参数");
            }

            addScheduled(name, scheduledSource);
        }

        /**
         * 以fixedRate类型启动Scheduled
         *
         * @param name         scheduled的名称
         * @param fixedRate    上一次开始执行之后多长时间再执行
         * @param initialDelay 第一次执行的延迟时间
         */

        public void addFixedRateScheduled(String name, Long fixedRate, Long... initialDelay) {
            ScheduledSource scheduledSource = new ScheduledSource();
            scheduledSource.setFixedRate(fixedRate);
            if (initialDelay != null && initialDelay.length == 1) {
                scheduledSource.setInitialDelay(initialDelay[0]);
            } else if (initialDelay != null && initialDelay.length > 1) {
                throw new SuperScheduledException("第一次执行的延迟时间只能传入一个参数");
            }

            addScheduled(name, scheduledSource);
        }

        /**
         * 手动执行一次任务
         *
         * @param name scheduled的名称
         */

        public void runScheduled(String name) {
            Runnable runnable = superScheduledConfig.getRunnable(name);
            runnable.run();
        }
    }

    2、增强接口实现

    增强器实现的整体思路与SpringAop的思路一致,实现没有Aop复杂

    (1) 增强接口

    @Order(Ordered.HIGHEST_PRECEDENCE)
    public interface BaseStrengthen {
        /**
         * 前置强化方法
         *
         * @param bean   bean实例(或者是被代理的bean)
         * @param method 执行的方法对象
         * @param args   方法参数
         */

        void before(Object bean, Method method, Object[] args);

        /**
         * 后置强化方法
         * 出现异常不会执行
         * 如果未出现异常,在afterFinally方法之后执行
         *
         * @param bean   bean实例(或者是被代理的bean)
         * @param method 执行的方法对象
         * @param args   方法参数
         */

        void after(Object bean, Method method, Object[] args);

        /**
         * 异常强化方法
         *
         * @param bean   bean实例(或者是被代理的bean)
         * @param method 执行的方法对象
         * @param args   方法参数
         */

        void exception(Object bean, Method method, Object[] args);

        /**
         * Finally强化方法,出现异常也会执行
         *
         * @param bean   bean实例(或者是被代理的bean)
         * @param method 执行的方法对象
         * @param args   方法参数
         */

        void afterFinally(Object bean, Method method, Object[] args);
    }

    (2) 代理抽象类

    public abstract class Point {
        /**
         * 定时任务名
         */

        private String superScheduledName;

        /**
         * 抽象的执行方法,使用代理实现
         * @param runnable 定时任务执行器
         */

        public abstract Object invoke(SuperScheduledRunnable runnable);
        
        /* 普通的get/sets省略 */
    }

    (3) 调用链类

    public class Chain {
        private List list;
        private int index = -1;
        /**
         * 索引自增1
         */

        public int incIndex() {
            return ++index;
        }

        /**
         * 索引还原
         */

        public void resetIndex() {
            this.index = -1;
        }
    }

    (4) cglib动态代理实现

    使用cglib代理增强器,将增强器全部代理成调用链节点Point

    public class RunnableBaseInterceptor implements MethodInterceptor {
        /**
         * 定时任务执行器
         */

        private SuperScheduledRunnable runnable;
        /**
         * 定时任务增强类
         */

        private BaseStrengthen strengthen;

        @Override
        public Object intercept(Object obj, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
            Object result;
            //如果执行的是invoke()方法
            if ("invoke".equals(method.getName())) {
             //前置强化方法
                strengthen.before(obj, method, args);
                try {
                 //调用执行器中的invoke()方法
                    result = runnable.invoke();
                } catch (Exception e) {
                 //异常强化方法
                    strengthen.exception(obj, method, args);
                    throw new SuperScheduledException(strengthen.getClass() + "中强化执行时发生错误", e);
                } finally {
                 //Finally强化方法,出现异常也会执行
                    strengthen.afterFinally(obj, method, args);
                }
                //后置强化方法
                strengthen.after(obj, method, args);

            } else {
             //直接执行方法
                result = methodProxy.invokeSuper(obj, args);
            }
            return result;
        }

        public RunnableBaseInterceptor(Object object, SuperScheduledRunnable runnable) {
            this.runnable = runnable;
            if (BaseStrengthen.class.isAssignableFrom(object.getClass())) {
                this.strengthen = (BaseStrengthen) object;
            } else {
                throw new SuperScheduledException(object.getClass() + "对象不是BaseStrengthen类型");
            }
        }

        public RunnableBaseInterceptor() {

        }
    }

    (5) 定时任务执行器实现

    public class SuperScheduledRunnable {
        /**
         * 原始的方法
         */

        private Method method;
        /**
         * 方法所在的bean
         */

        private Object bean;
        /**
         * 增强器的调用链
         */

        private Chain chain;


        public Object invoke() {
            Object result;
            //索引自增1
            if (chain.incIndex() == chain.getList().size()) {
                //调用链中的增强方法已经全部执行结束
                try {
                    //调用链索引初始化
                    chain.resetIndex();
                    //增强器全部执行完毕,执行原本的方法
                    result = method.invoke(bean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new SuperScheduledException(e.getLocalizedMessage());
                }
            } else {
                //获取被代理后的方法增强器
                Point point = chain.getList().get(chain.getIndex());
                //执行增强器代理
                //增强器代理中,会回调方法执行器,形成调用链,逐一运行调用链中的增强器
                result = point.invoke(this);
            }
            return result;
        }
        
        /* 普通的get/sets省略 */
    }

    (6) 增强器代理逻辑

    com.gyx.superscheduled.core.SuperScheduledApplicationRunner类中的代码片段

    //创建执行控制器
    SuperScheduledRunnable runnable = new SuperScheduledRunnable();
    runnable.setMethod(scheduledSource.getMethod());
    runnable.setBean(scheduledSource.getBean());
    //用来存放 增强器的代理对象
    List points = new ArrayList<>(baseStrengthenBeanNames.length);
    //循环所有的增强器的beanName
    for (String baseStrengthenBeanName : baseStrengthenBeanNames) {
     //获取增强器的bean对象
        Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
        //将增强器代理成Point节点
        Point proxy = ProxyUtils.getInstance(Point.classnew RunnableBaseInterceptor(baseStrengthenBeanrunnable));
        proxy.setSuperScheduledName(name);
        //增强器的代理对象缓存到list中
        points.add(proxy);
    }
    //将增强器代理实例的集合生成调用链
    //执行控制器中设置调用链
    runnable.setChain(new Chain(points));

    1、致歉!抖音Semi Design承认参考阿里Ant Design

    2、对比7种分布式事务方案,还是偏爱阿里开源的Seata,真香!

    3、Redis存储结构体信息,选hash还是string?

    4、扫盲 docker 常用命令

    5、最全分布式Session解决方案

    6、21 款 yyds 的 IDEA插件

    7、真香!用 IDEA 神器看源码,效率真高!

    点分享

    点收藏

    点点赞

    点在看

    浏览 27
    点赞
    评论
    收藏
    分享

    手机扫一扫分享

    分享
    举报
    评论
    图片
    表情
    推荐
    点赞
    评论
    收藏
    分享

    手机扫一扫分享

    分享
    举报