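1. Introduction to Spring Batch
Spring Batch is a lightweight, comprehensive batch-processing framework designed for building the robust batch applications that the daily operation of enterprise systems depends on. It builds on the Spring Framework characteristics people have come to expect (productivity, a POJO-based development approach, and general ease of use) while making it easy for developers to reach for more advanced enterprise services when they need them.
Spring Batch is not a scheduling framework. Many good enterprise schedulers exist in both the commercial and open-source spaces (for example Quartz, Tivoli, and Control-M). Spring Batch is intended to work together with a scheduler, not to replace one.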
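2. Business scenarios
Batch requirements come up frequently in business development. Spring Batch supports the following business scenarios:
Committing batch work periodically.
Concurrent batch processing: processing a job in parallel.
Staged, enterprise message-driven processing.
Massively parallel batch processing.
Manual or scheduled restart after failure.
Sequential processing of dependent steps (with extensions to workflow-driven batches).
Partial processing: skipping records (for example, on rollback).
Whole-batch transactions, for cases with a small batch size or with existing stored procedures or scripts.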
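3. Fundamentals
3.1 Overall architecture
Name | Role |
---|---|
JobRepository | Provides the persistence mechanism for all of the stereotypes (Job, JobInstance, Step) |
JobLauncher | A simple interface for launching a Job with a given set of JobParameters |
Job | An entity that encapsulates an entire batch process |
Step | A domain object that encapsulates an independent, sequential phase of a batch job |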
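To make the roles in the table more concrete, here is a minimal plain-Java sketch (not Spring Batch code) of one behaviour the JobLauncher and JobRepository provide together: a job launched again with the same name and the same non-empty parameters is treated as the same job instance, and an instance that has already completed is not run a second time. In the real framework that second launch fails with JobInstanceAlreadyCompleteException; the sketch simply returns false. The class JobInstanceSketch, its launch method, and the "date" parameter are hypothetical names introduced for illustration only.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class JobInstanceSketch {

    // Toy stand-in for the repository (hypothetical, in memory only):
    // remembers which job-name-plus-parameters keys have completed.
    private final Set<String> completed = new HashSet<>();

    // Runs the job unless the same instance already completed.
    // Returns true if the job body was executed.
    public boolean launch(String jobName, Map<String, String> parameters, Runnable job) {
        // A TreeMap sorts the keys, so the instance key does not depend on insertion order.
        String instanceKey = jobName + new TreeMap<>(parameters);
        if (completed.contains(instanceKey)) {
            return false;
        }
        job.run();
        completed.add(instanceKey);
        return true;
    }

    public static void main(String[] args) {
        JobInstanceSketch launcher = new JobInstanceSketch();
        Map<String, String> monday = Collections.singletonMap("date", "2024-01-08");
        Map<String, String> tuesday = Collections.singletonMap("date", "2024-01-09");
        Runnable body = () -> System.out.println("running firstJob");

        System.out.println(launcher.launch("firstJob", monday, body));  // new instance, runs
        System.out.println(launcher.launch("firstJob", monday, body));  // same instance, not run again
        System.out.println(launcher.launch("firstJob", tuesday, body)); // different parameters, runs
    }
}
```

This is why launches that should repeat usually carry a parameter that changes per run, such as a date or a timestamp.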
3.2 Core interfaces
ItemReader: an abstraction that represents the retrieval of input for a Step, one item at a time.
ItemProcessor: an abstraction that represents the business processing of an item.
ItemWriter: an abstraction that represents the output of a Step, one batch or chunk of items at a time.
In broad terms the flow is input → processing → output. A Job defines multiple Steps and the flow between them, and a Step typically consists of an ItemReader, an ItemProcessor, and an ItemWriter.
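The input → processing → output flow can be sketched in plain Java as a loop that reads one item at a time, processes it, and passes the results to the writer one chunk at a time. This is only an illustration of the idea under simplifying assumptions (no transactions, no restart state, no error handling); ChunkLoopSketch and runStep are hypothetical names, and the reader, processor, and writer are modelled with java.util types instead of the Spring Batch interfaces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class ChunkLoopSketch {

    // Reads items one by one, processes each, and hands them to the writer
    // in chunks of at most chunkSize. Returns the number of chunks written.
    public static <I, O> int runStep(Iterator<I> reader,
                                     Function<I, O> processor,
                                     Consumer<List<O>> writer,
                                     int chunkSize) {
        int chunks = 0;
        List<O> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(processor.apply(reader.next()));
            if (chunk.size() == chunkSize) {
                writer.accept(new ArrayList<>(chunk)); // write a full chunk
                chunk.clear();
                chunks++;
            }
        }
        if (!chunk.isEmpty()) {
            writer.accept(new ArrayList<>(chunk));     // flush the final, partial chunk
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<List<String>> written = new ArrayList<>();
        int chunks = ChunkLoopSketch.<Integer, String>runStep(
                Arrays.asList(1, 2, 3, 4, 5).iterator(),
                n -> "item-" + n,
                written::add,
                2);
        // prints: 3 chunks: [[item-1, item-2], [item-3, item-4], [item-5]]
        System.out.println(chunks + " chunks: " + written);
    }
}
```

With five items and a chunk size of 2, the writer is called three times, the last time with a single item.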
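4. Basic hands-on
4.0 Adding Spring Batch
Declare the Spring Boot parent in the pom file:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.5.RELEASE</version>
    </parent>

Add spring-batch and its related dependencies to the pom file:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
    </dependencies>
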
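Create the metadata tables that Spring Batch depends on in MySQL by running the schema script shipped with spring-batch-core.
Path of the SQL script inside the jar: .....\maven\repository\org\springframework\batch\spring-batch-core\4.2.1.RELEASE\spring-batch-core-4.2.1.RELEASE.jar!\org\springframework\batch\core\schema-mysql.sql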
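Annotate the startup class with @EnableBatchProcessing:

    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    @EnableBatchProcessing // makes JobBuilderFactory, StepBuilderFactory, JobLauncher and JobRepository available
    public class SpringBatchStartApplication {
        public static void main(String[] args) {
            SpringApplication.run(SpringBatchStartApplication.class, args);
        }
    }
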
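FirstJobDemo, a job with a single tasklet step:

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;

    @Component
    public class FirstJobDemo {
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        @Bean
        public Job firstJob() {
            return jobBuilderFactory.get("firstJob")
                    .start(step())
                    .build();
        }

        private Step step() {
            return stepBuilderFactory.get("step")
                    .tasklet((contribution, chunkContext) -> {
                        System.out.println("Executing step....");
                        // FINISHED tells the framework not to call the tasklet again
                        return RepeatStatus.FINISHED;
                    }).build();
        }
    }
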
4.1 Flow control
A. Multi-step job

    // Methods of a @Component class with jobBuilderFactory and stepBuilderFactory
    // injected, as in FirstJobDemo.
    @Bean
    public Job multiStepJob() {
        return jobBuilderFactory.get("multiStepJob2")
                .start(step1())
                // move on to the next step only when the previous one COMPLETED
                .on(ExitStatus.COMPLETED.getExitCode()).to(step2())
                .from(step2())
                .on(ExitStatus.COMPLETED.getExitCode()).to(step3())
                .from(step3()).end()
                .build();
    }

    private Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("Executing step one...");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("Executing step two...");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("Executing step three...");
                    return RepeatStatus.FINISHED;
                }).build();
    }

B. Parallel execution
Two Flows are created: flow1 (containing step1 and step2) and flow2 (containing step3). The split method of the job's flow builder is then given an asynchronous task executor, so flow1 and flow2 run asynchronously (that is, in parallel).

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.job.builder.FlowBuilder;
    import org.springframework.batch.core.job.flow.Flow;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;
    import org.springframework.stereotype.Component;

    @Component
    public class SplitJobDemo {
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        @Bean
        public Job splitJob() {
            return jobBuilderFactory.get("splitJob")
                    .start(flow1())
                    // run flow2 alongside flow1 on the given executor
                    .split(new SimpleAsyncTaskExecutor()).add(flow2())
                    .end()
                    .build();
        }

        private Step step1() {
            return stepBuilderFactory.get("step1")
                    .tasklet((stepContribution, chunkContext) -> {
                        System.out.println("Executing step one...");
                        return RepeatStatus.FINISHED;
                    }).build();
        }

        private Step step2() {
            return stepBuilderFactory.get("step2")
                    .tasklet((stepContribution, chunkContext) -> {
                        System.out.println("Executing step two...");
                        return RepeatStatus.FINISHED;
                    }).build();
        }

        private Step step3() {
            return stepBuilderFactory.get("step3")
                    .tasklet((stepContribution, chunkContext) -> {
                        System.out.println("Executing step three...");
                        return RepeatStatus.FINISHED;
                    }).build();
        }

        // flow1: step1 followed by step2
        private Flow flow1() {
            return new FlowBuilder<Flow>("flow1")
                    .start(step1())
                    .next(step2())
                    .build();
        }

        // flow2: step3 only
        private Flow flow2() {
            return new FlowBuilder<Flow>("flow2")
                    .start(step3())
                    .build();
        }
    }

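What the split amounts to can be pictured with plain java.util.concurrent code: steps inside one flow stay sequential, the two flows run on separate threads, and the job continues only after both have finished. The sketch below is an analogy under that assumption, not how Spring Batch implements split; SplitSketch and runSplit are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SplitSketch {

    // Runs "flow1" (step1 then step2) and "flow2" (step3) in parallel and
    // returns the order in which the steps were recorded.
    public static List<String> runSplit() {
        List<String> log = new CopyOnWriteArrayList<>(); // safe for concurrent appends
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Void> flow1 = CompletableFuture.runAsync(() -> {
                log.add("step1");
                log.add("step2"); // always after step1: same flow, same thread
            }, executor);
            CompletableFuture<Void> flow2 =
                    CompletableFuture.runAsync(() -> log.add("step3"), executor);
            // Wait for both flows before the job is allowed to continue.
            CompletableFuture.allOf(flow1, flow2).join();
        } finally {
            executor.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        // step3 may appear anywhere relative to step1 and step2
        System.out.println(runSplit());
    }
}
```

The only ordering that holds across runs is step1 before step2; the position of step3 varies, which is also what the console output of splitJob tends to show.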
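C. Job decisions
A decider lets a job follow different flows under different conditions. For example, if today falls on a weekend the job runs step1 and step2; on a working day it runs step1 and step3, followed by step4.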
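The weekend or working-day rule itself is small enough to isolate as a pure function that takes the date as an argument, which makes it testable without waiting for a Saturday. This is a sketch of that design choice rather than part of the demo; DayTypeSketch and statusFor are hypothetical names, and the returned strings match the statuses "weekend" and "workingDay" used in this section.

```java
import java.time.DayOfWeek;
import java.time.LocalDate;

public class DayTypeSketch {

    // Maps a date to the status string a decider would return for it.
    public static String statusFor(LocalDate date) {
        DayOfWeek day = date.getDayOfWeek();
        boolean weekend = day == DayOfWeek.SATURDAY || day == DayOfWeek.SUNDAY;
        return weekend ? "weekend" : "workingDay";
    }

    public static void main(String[] args) {
        System.out.println(statusFor(LocalDate.of(2024, 1, 6))); // a Saturday: weekend
        System.out.println(statusFor(LocalDate.of(2024, 1, 8))); // a Monday: workingDay
        System.out.println(statusFor(LocalDate.now()));          // depends on today
    }
}
```

A decider could then call statusFor(LocalDate.now()) and wrap the result in a FlowExecutionStatus.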
The decider, and a job that branches on its result:

    import java.time.DayOfWeek;
    import java.time.LocalDate;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.job.flow.FlowExecutionStatus;
    import org.springframework.batch.core.job.flow.JobExecutionDecider;
    import org.springframework.stereotype.Component;

    @Component
    public class MyDecider implements JobExecutionDecider {
        @Override
        public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
            LocalDate now = LocalDate.now();
            DayOfWeek dayOfWeek = now.getDayOfWeek();
            if (dayOfWeek == DayOfWeek.SATURDAY || dayOfWeek == DayOfWeek.SUNDAY) {
                return new FlowExecutionStatus("weekend");
            } else {
                return new FlowExecutionStatus("workingDay");
            }
        }
    }

    // Methods of a @Component class with jobBuilderFactory, stepBuilderFactory
    // and an @Autowired MyDecider myDecider field.
    @Bean
    public Job deciderJob() {
        return jobBuilderFactory.get("deciderJob")
                .start(step1())
                .next(myDecider)
                .from(myDecider).on("weekend").to(step2())
                .from(myDecider).on("workingDay").to(step3())
                .from(step3()).on("*").to(step4()) // whatever step3's exit status, continue with step4
                .end()
                .build();
    }

    private Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("Executing step one...");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("Executing step two...");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("Executing step three...");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step4() {
        return stepBuilderFactory.get("step4")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("Executing step four...");
                    return RepeatStatus.FINISHED;
                }).build();
    }

D. Job nesting
Besides being built from Steps or Flows, a Job can also be assembled from other Jobs: each child Job is converted into a special Step, and those Steps are assigned to a parent Job. This is job nesting.

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.batch.core.step.builder.JobStepBuilder;
    import org.springframework.batch.core.step.builder.StepBuilder;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.PlatformTransactionManager;

    @Component
    public class NestedJobDemo {
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
        @Autowired
        private JobLauncher jobLauncher;
        @Autowired
        private JobRepository jobRepository;
        @Autowired
        private PlatformTransactionManager platformTransactionManager;

        // Parent job
        @Bean
        public Job parentJob() {
            return jobBuilderFactory.get("parentJob")
                    .start(childJobOneStep())
                    .next(childJobTwoStep())
                    .build();
        }

        // Wrap child job one in a special step
        private Step childJobOneStep() {
            return new JobStepBuilder(new StepBuilder("childJobOneStep"))
                    .job(childJobOne())
                    .launcher(jobLauncher)
                    .repository(jobRepository)
                    .transactionManager(platformTransactionManager)
                    .build();
        }

        // Wrap child job two in a special step
        private Step childJobTwoStep() {
            return new JobStepBuilder(new StepBuilder("childJobTwoStep"))
                    .job(childJobTwo())
                    .launcher(jobLauncher)
                    .repository(jobRepository)
                    .transactionManager(platformTransactionManager)
                    .build();
        }

        // Child job one
        private Job childJobOne() {
            return jobBuilderFactory.get("childJobOne")
                    .start(
                        stepBuilderFactory.get("childJobOneStep")
                                .tasklet((stepContribution, chunkContext) -> {
                                    System.out.println("Child job one executing its step...");
                                    return RepeatStatus.FINISHED;
                                }).build()
                    ).build();
        }

        // Child job two
        private Job childJobTwo() {
            return jobBuilderFactory.get("childJobTwo")
                    .start(
                        stepBuilderFactory.get("childJobTwoStep")
                                .tasklet((stepContribution, chunkContext) -> {
                                    System.out.println("Child job two executing its step...");
                                    return RepeatStatus.FINISHED;
                                }).build()
                    ).build();
        }
    }

4.2 Reading data
Define the model TestData; the same model is used in the sections that follow.

    import lombok.Data;

    @Data // Lombok generates getters, setters and toString
    public class TestData {
        private int id;
        private String field1;
        private String field2;
        private String field3;
    }

Data can be read from text files, databases, XML, JSON, and other sources; consult the reference documentation for the readers not shown here.
Text file reading demo:

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.mapping.DefaultLineMapper;
    import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.stereotype.Component;

    @Component
    public class FileItemReaderDemo {
        // Factory for building jobs
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
        // Factory for building steps
        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        @Bean
        public Job fileItemReaderJob() {
            return jobBuilderFactory.get("fileItemReaderJob2")
                    .start(step())
                    .build();
        }

        private Step step() {
            return stepBuilderFactory.get("step")
                    .<TestData, TestData>chunk(2)
                    .reader(fileItemReader())
                    .writer(list -> list.forEach(System.out::println))
                    .build();
        }

        private ItemReader<TestData> fileItemReader() {
            FlatFileItemReader<TestData> reader = new FlatFileItemReader<>();
            reader.setResource(new ClassPathResource("reader/file")); // location of the input file
            reader.setLinesToSkip(1); // skip the first (header) line

            // One of the three AbstractLineTokenizer implementations; it splits each line
            // on a fixed delimiter. The default constructor uses a comma; another
            // delimiter can be passed to the constructor.
            DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
            // Field names, comparable to a table header
            tokenizer.setNames("id", "field1", "field2", "field3");

            // Converts each line into a TestData object
            DefaultLineMapper<TestData> mapper = new DefaultLineMapper<>();
            mapper.setLineTokenizer(tokenizer);
            // How the tokenized text is mapped onto the POJO
            mapper.setFieldSetMapper(fieldSet -> {
                TestData data = new TestData();
                data.setId(fieldSet.readInt("id"));
                data.setField1(fieldSet.readString("field1"));
                data.setField2(fieldSet.readString("field2"));
                data.setField3(fieldSet.readString("field3"));
                return data;
            });
            reader.setLineMapper(mapper);
            return reader;
        }
    }

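To show what the tokenizer and field-set mapper do to a single line, here is a plain-Java sketch that splits a comma-separated line and copies the four fields into an object. It is a deliberately simplified stand-in: it ignores quoting, which DelimitedLineTokenizer handles, and the sample lines are made up for illustration. LineMappingSketch, mapLine, and the nested TestData class (a Lombok-free copy of the model) are hypothetical names.

```java
public class LineMappingSketch {

    // Plain-Java stand-in for the article's TestData model (no Lombok here).
    public static class TestData {
        public int id;
        public String field1;
        public String field2;
        public String field3;

        @Override
        public String toString() {
            return "TestData(id=" + id + ", field1=" + field1
                    + ", field2=" + field2 + ", field3=" + field3 + ")";
        }
    }

    // Splits one line on commas and maps the tokens onto a TestData object.
    public static TestData mapLine(String line) {
        // A limit of -1 keeps trailing empty fields, e.g. "3,a,b," still yields 4 tokens.
        String[] tokens = line.split(",", -1);
        if (tokens.length != 4) {
            throw new IllegalArgumentException(
                    "expected 4 fields but found " + tokens.length + ": " + line);
        }
        TestData data = new TestData();
        data.id = Integer.parseInt(tokens[0].trim());
        data.field1 = tokens[1];
        data.field2 = tokens[2];
        data.field3 = tokens[3];
        return data;
    }

    public static void main(String[] args) {
        // Made-up sample content; the first line is the header that gets skipped.
        String[] lines = {"id,field1,field2,field3", "1,a,b,c", "2,x,,z"};
        for (int i = 1; i < lines.length; i++) {
            System.out.println(mapLine(lines[i]));
        }
    }
}
```

An input file for the demo above would have the same shape: a header line followed by one comma-separated record per line.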
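4.3 Writing data
Output covers the same range of targets: text files, databases, XML, JSON, and so on. The demo below writes each item to a text file as one JSON string per line.

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import javax.annotation.Resource;
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.item.file.FlatFileItemWriter;
    import org.springframework.batch.item.file.transform.LineAggregator;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.core.io.FileSystemResource;
    import org.springframework.stereotype.Component;

    @Component
    public class FileItemWriterDemo {
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
        // A ListItemReader bean named "writerSimpleReader", defined elsewhere
        @Resource(name = "writerSimpleReader")
        private ListItemReader<TestData> writerSimpleReader;

        @Bean
        public Job fileItemWriterJob() throws Exception {
            return jobBuilderFactory.get("fileItemWriterJob")
                    .start(step())
                    .build();
        }

        private Step step() throws Exception {
            return stepBuilderFactory.get("step")
                    .<TestData, TestData>chunk(2)
                    .reader(writerSimpleReader)
                    .writer(fileItemWriter())
                    .build();
        }

        private FlatFileItemWriter<TestData> fileItemWriter() throws Exception {
            FlatFileItemWriter<TestData> writer = new FlatFileItemWriter<>();
            FileSystemResource file = new FileSystemResource("D:/code/spring-batch-demo/src/main/resources/writer/writer-file");
            Path path = Paths.get(file.getPath());
            if (!Files.exists(path)) {
                Files.createFile(path);
            }
            // Output file location
            writer.setResource(file);

            // Convert each TestData object into a JSON string; one mapper is reused for all items
            ObjectMapper mapper = new ObjectMapper();
            LineAggregator<TestData> aggregator = item -> {
                try {
                    return mapper.writeValueAsString(item);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
                return ""; // an empty line is written if serialization fails
            };
            writer.setLineAggregator(aggregator);
            writer.afterPropertiesSet();
            return writer;
        }
    }
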
4.5 Processing data
The demo below validates every item before it is written, using BeanValidatingItemProcessor; the commented-out method shows the equivalent with a hand-written validator.

    import javax.annotation.Resource;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.batch.item.validator.BeanValidatingItemProcessor;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;

    @Component
    public class ValidatingItemProcessorDemo {
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
        // A ListItemReader bean named "processorSimpleReader", defined elsewhere
        @Resource(name = "processorSimpleReader")
        private ListItemReader<TestData> processorSimpleReader;

        @Bean
        public Job validatingItemProcessorJob() throws Exception {
            return jobBuilderFactory.get("validatingItemProcessorJob3")
                    .start(step())
                    .build();
        }

        private Step step() throws Exception {
            return stepBuilderFactory.get("step")
                    .<TestData, TestData>chunk(2)
                    .reader(processorSimpleReader)
                    .processor(beanValidatingItemProcessor())
                    .writer(list -> list.forEach(System.out::println))
                    .build();
        }

    //    private ValidatingItemProcessor<TestData> validatingItemProcessor() {
    //        ValidatingItemProcessor<TestData> processor = new ValidatingItemProcessor<>();
    //        processor.setValidator(value -> {
    //            // validate every item
    //            if ("".equals(value.getField3())) {
    //                // throw if field3 is an empty string
    //                throw new ValidationException("field3 has an invalid value");
    //            }
    //        });
    //        return processor;
    //    }

        private BeanValidatingItemProcessor<TestData> beanValidatingItemProcessor() throws Exception {
            BeanValidatingItemProcessor<TestData> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
            // Enable filtering: items that violate the rules are dropped instead of failing the step
    //        beanValidatingItemProcessor.setFilter(true);
            beanValidatingItemProcessor.afterPropertiesSet();
            return beanValidatingItemProcessor;
        }
    }

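The filter flag changes what happens to an invalid item: with filtering off, validation failure raises an exception and the step fails; with filtering on, the processor returns null for that item, which tells the step to drop it and carry on. The plain-Java sketch below imitates those two outcomes. ValidationSketch and process are hypothetical names, the "non-empty string" rule is an assumption chosen for the example, and IllegalStateException stands in for the framework's ValidationException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class ValidationSketch {

    // Returns the items that pass validation. An invalid item is either
    // dropped (filter == true) or aborts processing (filter == false).
    public static List<String> process(List<String> items, Predicate<String> valid, boolean filter) {
        List<String> accepted = new ArrayList<>();
        for (String item : items) {
            if (valid.test(item)) {
                accepted.add(item);
            } else if (!filter) {
                throw new IllegalStateException("invalid item: '" + item + "'");
            }
            // filter == true and the item is invalid: skip it silently
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "", "c");
        Predicate<String> notEmpty = s -> !s.isEmpty();

        System.out.println(process(items, notEmpty, true)); // prints [a, c]
        try {
            process(items, notEmpty, false);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage()); // prints failed: invalid item: ''
        }
    }
}
```

Filtering suits data where a few bad records are expected and can be ignored; failing fast suits data where any bad record means the input as a whole cannot be trusted.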
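4.6 Job scheduling
Spring Batch can be combined with Quartz or XXL-JOB to run jobs on a schedule. The controller below shows the launching side: it starts a job on request and passes a parameter to it.

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    @RequestMapping("job")
    public class JobController {
        // Assumes exactly one Job bean in the context; add @Qualifier if there are several
        @Autowired
        private Job job;
        @Autowired
        private JobLauncher jobLauncher;

        @GetMapping("launcher/{message}")
        public String launcher(@PathVariable String message) throws Exception {
            JobParameters parameters = new JobParametersBuilder()
                    .addString("message", message)
                    .toJobParameters();
            // Pass the parameters to the job
            jobLauncher.run(job, parameters);
            return "success";
        }
    }

Editor: Huang Fei (黃飛)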