一、功能說明
SpringBoot的定時任務的加強工具,實現對SpringBoot原生的定時任務進行動態管理,完全兼容原生@Scheduled注解,無需對原本的定時任務進行修改
二、快速使用
具體的功能已經封裝成SpringBoot-starter即插即用
com.github.guoyixing spring-boot-starter-super-scheduled 0.3.1
三、實現原理
1、動態管理實現
(1) 配置管理介紹
@Component("superScheduledConfig") publicclassSuperScheduledConfig{ /** *執行定時任務的線程池 */ privateThreadPoolTaskSchedulertaskScheduler; /** *定時任務名稱與定時任務回調鉤子的關聯關系容器 */ privateMapnameToScheduledFuture=newConcurrentHashMap<>(); /** *定時任務名稱與定時任務需要執行的邏輯的關聯關系容器 */ privateMap nameToRunnable=newConcurrentHashMap<>(); /** *定時任務名稱與定時任務的源信息的關聯關系容器 */ privateMap nameToScheduledSource=newConcurrentHashMap<>(); /*普通的get/sets省略*/ }
(2) 使用后處理器攔截SpringBoot原本的定時任務
實現ApplicationContextAware接口拿到SpringBoot的上下文
實現BeanPostProcessor接口,將這個類標記為后處理器,后處理器會在每個bean實例化之后執行
使用@DependsOn注解強制依賴SuperScheduledConfig類,讓SpringBoot實例化SuperScheduledPostProcessor類之前先實例化SuperScheduledConfig類
主要實現邏輯在postProcessAfterInitialization()方法中
@DependsOn({"superScheduledConfig"}) @Component @Order publicclassSuperScheduledPostProcessorimplementsBeanPostProcessor,ApplicationContextAware{ protectedfinalLoglogger=LogFactory.getLog(getClass()); privateApplicationContextapplicationContext; /** *實例化bean之前的操作 *@parambeanbean實例 *@parambeanNamebean的Name */ @Override publicObjectpostProcessBeforeInitialization(Objectbean,StringbeanName)throwsBeansException{ returnbean; } /** *實例化bean之后的操作 *@parambeanbean實例 *@parambeanNamebean的Name */ @Override publicObjectpostProcessAfterInitialization(Objectbean, StringbeanName)throwsBeansException{ //1.獲取配置管理器 SuperScheduledConfigsuperScheduledConfig=applicationContext.getBean(SuperScheduledConfig.class); //2.獲取當前實例化完成的bean的所有方法 Method[]methods=bean.getClass().getDeclaredMethods(); //循環處理對每個方法逐一處理 if(methods.length>0){ for(Methodmethod:methods){ //3.嘗試在該方法上獲取@Scheduled注解(SpringBoot的定時任務注解) Scheduledannotation=method.getAnnotation(Scheduled.class); //如果無法獲取到@Scheduled注解,就跳過這個方法 if(annotation==null){ continue; } //4.創建定時任務的源屬性 //創建定時任務的源屬性(用來記錄定時任務的配置,初始化的時候記錄的是注解上原本的屬性) ScheduledSourcescheduledSource=newScheduledSource(annotation,method,bean); //對注解上獲取到源屬性中的屬性進行檢測 if(!scheduledSource.check()){ thrownewSuperScheduledException("在"+beanName+"Bean中"+method.getName()+"方法的注解參數錯誤"); } //生成定時任務的名稱(id),使用beanName+“.”+方法名 Stringname=beanName+"."+method.getName(); //將以key-value的形式,將源數據存入配置管理器中,key:定時任務的名稱value:源數據 superScheduledConfig.addScheduledSource(name,scheduledSource); try{ //5.將原本SpringBoot的定時任務取消掉 clearOriginalScheduled(annotation); }catch(Exceptione){ thrownewSuperScheduledException("在關閉原始方法"+beanName+method.getName()+"時出現錯誤"); } } } //最后bean保持原有返回 returnbean; } /** *修改注解原先的屬性 *@paramannotation注解實例對象 *@throwsException */ privatevoidclearOriginalScheduled(Scheduledannotation)throwsException{ changeAnnotationValue(annotation,"cron",Scheduled.CRON_DISABLED); changeAnnotationValue(annotation,"fixedDelay",-1L); changeAnnotationValue(annotation,"fixedDelayString",""); changeAnnotationValue(annotation,"fixedRate",-1L); changeAnnotationValue(annotation,"fixedRateString",""); changeAnnotationValue(annotation,"initialDelay",-1L); changeAnnotationValue(annotation,"initialDelayString",""); } /** *獲取SpringBoot的上下文 *@paramapplicationContextSpringBoot的上下文 */ @Override publicvoidsetApplicationContext(ApplicationContextapplicationContext)throwsBeansException{ this.applicationContext=applicationContext; } }
(3) 使用ApplicationRunner初始化自定義的定時任務運行器
實現ApplicationContextAware接口拿到SpringBoot的上下文
使用@DependsOn注解強制依賴threadPoolTaskScheduler類
實現ApplicationRunner接口,在所有bean初始化結束之后,運行自定義邏輯
主要實現邏輯在run()方法中
@DependsOn("threadPoolTaskScheduler") @Component publicclassSuperScheduledApplicationRunnerimplementsApplicationRunner,ApplicationContextAware{ protectedfinalLoglogger=LogFactory.getLog(getClass()); privateDateTimeFormatterdf=DateTimeFormatter.ofPattern("yyyy-MM-ddHHss"); privateApplicationContextapplicationContext; /** *定時任務配置管理器 */ @Autowired privateSuperScheduledConfigsuperScheduledConfig; /** *定時任務執行線程 */ @Autowired privateThreadPoolTaskSchedulerthreadPoolTaskScheduler; @Override publicvoidrun(ApplicationArgumentsargs){ //1.定時任務配置管理器中緩存定時任務執行線程 superScheduledConfig.setTaskScheduler(threadPoolTaskScheduler); //2.獲取所有定時任務源數據 MapnameToScheduledSource=superScheduledConfig.getNameToScheduledSource(); //逐一處理定時任務 for(Stringname:nameToScheduledSource.keySet()){ //3.獲取定時任務源數據 ScheduledSourcescheduledSource=nameToScheduledSource.get(name); //4.獲取所有增強類 String[]baseStrengthenBeanNames=applicationContext.getBeanNamesForType(BaseStrengthen.class); //5.創建執行控制器 SuperScheduledRunnablerunnable=newSuperScheduledRunnable(); //配置執行控制器 runnable.setMethod(scheduledSource.getMethod()); runnable.setBean(scheduledSource.getBean()); //6.逐一處理增強類(增強器實現原理后面具體分析) List points=newArrayList<>(baseStrengthenBeanNames.length); for(StringbaseStrengthenBeanName:baseStrengthenBeanNames){ //7.將增強器代理成point ObjectbaseStrengthenBean=applicationContext.getBean(baseStrengthenBeanName); //創建代理 Pointproxy=ProxyUtils.getInstance(Point.class,newRunnableBaseInterceptor(baseStrengthenBean,runnable)); proxy.setSuperScheduledName(name); //8.所有的points連成起來 points.add(proxy); } //將point形成調用鏈 runnable.setChain(newChain(points)); //將執行邏輯封裝并緩存到定時任務配置管理器中 superScheduledConfig.addRunnable(name,runnable::invoke); try{ //8.啟動定時任務 ScheduledFuture>schedule=ScheduledFutureFactory.create(threadPoolTaskScheduler ,scheduledSource,runnable::invoke); //將線程回調鉤子存到任務配置管理器中 superScheduledConfig.addScheduledFuture(name,schedule); logger.info(df.format(LocalDateTime.now())+"任務"+name+"已經啟動..."); }catch(Exceptione){ thrownewSuperScheduledException("任務"+name+"啟動失敗,錯誤信息:"+e.getLocalizedMessage()); } } } @Override publicvoidsetApplicationContext(ApplicationContextapplicationContext)throwsBeansException{ this.applicationContext=applicationContext; } }
(4) 進行動態管理
@Component publicclassSuperScheduledManager{ protectedfinalLoglogger=LogFactory.getLog(getClass()); privateDateTimeFormatterdf=DateTimeFormatter.ofPattern("yyyy-MM-ddHHss"); @Autowired privateSuperScheduledConfigsuperScheduledConfig; /** *修改Scheduled的執行周期 * *@paramnamescheduled的名稱 *@paramcroncron表達式 */ publicvoidsetScheduledCron(Stringname,Stringcron){ //終止原先的任務 cancelScheduled(name); //創建新的任務 ScheduledSourcescheduledSource=superScheduledConfig.getScheduledSource(name); scheduledSource.clear(); scheduledSource.setCron(cron); addScheduled(name,scheduledSource); } /** *修改Scheduled的fixedDelay * *@paramnamescheduled的名稱 *@paramfixedDelay上一次執行完畢時間點之后多長時間再執行 */ publicvoidsetScheduledFixedDelay(Stringname,LongfixedDelay){ //終止原先的任務 cancelScheduled(name); //創建新的任務 ScheduledSourcescheduledSource=superScheduledConfig.getScheduledSource(name); scheduledSource.clear(); scheduledSource.setFixedDelay(fixedDelay); addScheduled(name,scheduledSource); } /** *修改Scheduled的fixedRate * *@paramnamescheduled的名稱 *@paramfixedRate上一次開始執行之后多長時間再執行 */ publicvoidsetScheduledFixedRate(Stringname,LongfixedRate){ //終止原先的任務 cancelScheduled(name); //創建新的任務 ScheduledSourcescheduledSource=superScheduledConfig.getScheduledSource(name); scheduledSource.clear(); scheduledSource.setFixedRate(fixedRate); addScheduled(name,scheduledSource); } /** *查詢所有啟動的Scheduled */ publicListgetRunScheduledName(){ Set names=superScheduledConfig.getNameToScheduledFuture().keySet(); returnnewArrayList<>(names); } /** *查詢所有的Scheduled */ publicList getAllSuperScheduledName(){ Set names=superScheduledConfig.getNameToRunnable().keySet(); returnnewArrayList<>(names); } /** *終止Scheduled * *@paramnamescheduled的名稱 */ publicvoidcancelScheduled(Stringname){ ScheduledFuturescheduledFuture=superScheduledConfig.getScheduledFuture(name); scheduledFuture.cancel(true); superScheduledConfig.removeScheduledFuture(name); logger.info(df.format(LocalDateTime.now())+"任務"+name+"已經終止..."); } /** *啟動Scheduled * *@paramnamescheduled的名稱 *@paramscheduledSource定時任務的源信息 */ publicvoidaddScheduled(Stringname,ScheduledSourcescheduledSource){ if(getRunScheduledName().contains(name)){ thrownewSuperScheduledException("定時任務"+name+"已經被啟動過了"); } if(!scheduledSource.check()){ thrownewSuperScheduledException("定時任務"+name+"源數據內容錯誤"); } scheduledSource.refreshType(); Runnablerunnable=superScheduledConfig.getRunnable(name); ThreadPoolTaskSchedulertaskScheduler=superScheduledConfig.getTaskScheduler(); ScheduledFuture>schedule=ScheduledFutureFactory.create(taskScheduler,scheduledSource,runnable); logger.info(df.format(LocalDateTime.now())+"任務"+name+"已經啟動..."); superScheduledConfig.addScheduledSource(name,scheduledSource); superScheduledConfig.addScheduledFuture(name,schedule); } /** *以cron類型啟動Scheduled * *@paramnamescheduled的名稱 *@paramcroncron表達式 */ publicvoidaddCronScheduled(Stringname,Stringcron){ ScheduledSourcescheduledSource=newScheduledSource(); scheduledSource.setCron(cron); addScheduled(name,scheduledSource); } /** *以fixedDelay類型啟動Scheduled * *@paramnamescheduled的名稱 *@paramfixedDelay上一次執行完畢時間點之后多長時間再執行 *@paraminitialDelay第一次執行的延遲時間 */ publicvoidaddFixedDelayScheduled(Stringname,LongfixedDelay,Long...initialDelay){ ScheduledSourcescheduledSource=newScheduledSource(); scheduledSource.setFixedDelay(fixedDelay); if(initialDelay!=null&&initialDelay.length==1){ scheduledSource.setInitialDelay(initialDelay[0]); }elseif(initialDelay!=null&&initialDelay.length>1){ thrownewSuperScheduledException("第一次執行的延遲時間只能傳入一個參數"); } addScheduled(name,scheduledSource); } /** *以fixedRate類型啟動Scheduled * *@paramnamescheduled的名稱 *@paramfixedRate上一次開始執行之后多長時間再執行 *@paraminitialDelay第一次執行的延遲時間 */ publicvoidaddFixedRateScheduled(Stringname,LongfixedRate,Long...initialDelay){ ScheduledSourcescheduledSource=newScheduledSource(); scheduledSource.setFixedRate(fixedRate); if(initialDelay!=null&&initialDelay.length==1){ scheduledSource.setInitialDelay(initialDelay[0]); }elseif(initialDelay!=null&&initialDelay.length>1){ thrownewSuperScheduledException("第一次執行的延遲時間只能傳入一個參數"); } addScheduled(name,scheduledSource); } /** *手動執行一次任務 * *@paramnamescheduled的名稱 */ publicvoidrunScheduled(Stringname){ Runnablerunnable=superScheduledConfig.getRunnable(name); runnable.run(); } }
2、增強接口實現
增強器實現的整體思路與SpringAop的思路一致,實現沒有Aop復雜
(1) 增強接口
@Order(Ordered.HIGHEST_PRECEDENCE) publicinterfaceBaseStrengthen{ /** *前置強化方法 * *@parambeanbean實例(或者是被代理的bean) *@parammethod執行的方法對象 *@paramargs方法參數 */ voidbefore(Objectbean,Methodmethod,Object[]args); /** *后置強化方法 *出現異常不會執行 *如果未出現異常,在afterFinally方法之后執行 * *@parambeanbean實例(或者是被代理的bean) *@parammethod執行的方法對象 *@paramargs方法參數 */ voidafter(Objectbean,Methodmethod,Object[]args); /** *異常強化方法 * *@parambeanbean實例(或者是被代理的bean) *@parammethod執行的方法對象 *@paramargs方法參數 */ voidexception(Objectbean,Methodmethod,Object[]args); /** *Finally強化方法,出現異常也會執行 * *@parambeanbean實例(或者是被代理的bean) *@parammethod執行的方法對象 *@paramargs方法參數 */ voidafterFinally(Objectbean,Methodmethod,Object[]args); }
(2) 代理抽象類
publicabstractclassPoint{ /** *定時任務名 */ privateStringsuperScheduledName; /** *抽象的執行方法,使用代理實現 *@paramrunnable定時任務執行器 */ publicabstractObjectinvoke(SuperScheduledRunnablerunnable); /*普通的get/sets省略*/ }
(3) 調用鏈類
publicclassChain{ privateListlist; privateintindex=-1; /** *索引自增1 */ publicintincIndex(){ return++index; } /** *索引還原 */ publicvoidresetIndex(){ this.index=-1; } }
(4) cglib動態代理實現
使用cglib代理增強器,將增強器全部代理成調用鏈節點Point
publicclassRunnableBaseInterceptorimplementsMethodInterceptor{ /** *定時任務執行器 */ privateSuperScheduledRunnablerunnable; /** *定時任務增強類 */ privateBaseStrengthenstrengthen; @Override publicObjectintercept(Objectobj,Methodmethod,Object[]args,MethodProxymethodProxy)throwsThrowable{ Objectresult; //如果執行的是invoke()方法 if("invoke".equals(method.getName())){ //前置強化方法 strengthen.before(obj,method,args); try{ //調用執行器中的invoke()方法 result=runnable.invoke(); }catch(Exceptione){ //異常強化方法 strengthen.exception(obj,method,args); thrownewSuperScheduledException(strengthen.getClass()+"中強化執行時發生錯誤",e); }finally{ //Finally強化方法,出現異常也會執行 strengthen.afterFinally(obj,method,args); } //后置強化方法 strengthen.after(obj,method,args); }else{ //直接執行方法 result=methodProxy.invokeSuper(obj,args); } returnresult; } publicRunnableBaseInterceptor(Objectobject,SuperScheduledRunnablerunnable){ this.runnable=runnable; if(BaseStrengthen.class.isAssignableFrom(object.getClass())){ this.strengthen=(BaseStrengthen)object; }else{ thrownewSuperScheduledException(object.getClass()+"對象不是BaseStrengthen類型"); } } publicRunnableBaseInterceptor(){ } }
(5) 定時任務執行器實現
publicclassSuperScheduledRunnable{ /** *原始的方法 */ privateMethodmethod; /** *方法所在的bean */ privateObjectbean; /** *增強器的調用鏈 */ privateChainchain; publicObjectinvoke(){ Objectresult; //索引自增1 if(chain.incIndex()==chain.getList().size()){ //調用鏈中的增強方法已經全部執行結束 try{ //調用鏈索引初始化 chain.resetIndex(); //增強器全部執行完畢,執行原本的方法 result=method.invoke(bean); }catch(IllegalAccessException|InvocationTargetExceptione){ thrownewSuperScheduledException(e.getLocalizedMessage()); } }else{ //獲取被代理后的方法增強器 Pointpoint=chain.getList().get(chain.getIndex()); //執行增強器代理 //增強器代理中,會回調方法執行器,形成調用鏈,逐一運行調用鏈中的增強器 result=point.invoke(this); } returnresult; } /*普通的get/sets省略*/ }
(6) 增強器代理邏輯
com.gyx.superscheduled.core.SuperScheduledApplicationRunner類中的代碼片段
//創建執行控制器 SuperScheduledRunnablerunnable=newSuperScheduledRunnable(); runnable.setMethod(scheduledSource.getMethod()); runnable.setBean(scheduledSource.getBean()); //用來存放增強器的代理對象 Listpoints=newArrayList<>(baseStrengthenBeanNames.length); //循環所有的增強器的beanName for(StringbaseStrengthenBeanName:baseStrengthenBeanNames){ //獲取增強器的bean對象 ObjectbaseStrengthenBean=applicationContext.getBean(baseStrengthenBeanName); //將增強器代理成Point節點 Pointproxy=ProxyUtils.getInstance(Point.class,newRunnableBaseInterceptor(baseStrengthenBean,runnable)); proxy.setSuperScheduledName(name); //增強器的代理對象緩存到list中 points.add(proxy); } //將增強器代理實例的集合生成調用鏈 //執行控制器中設置調用鏈 runnable.setChain(newChain(points));
審核編輯:劉清
-
處理器
+關注
關注
68文章
19535瀏覽量
231859 -
控制器
+關注
關注
113文章
16573瀏覽量
180416 -
增強器
+關注
關注
1文章
47瀏覽量
8365 -
SpringBoot
+關注
關注
0文章
175瀏覽量
217
原文標題:SpringBoot 定時任務動態管理通用解決方案
文章出處:【微信號:AndroidPush,微信公眾號:Android編程精選】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論