基于 Spring Batch 和 XXL-Job 的批处理任务实现
springbatch springinteger批处理作业
1 添加依赖
org.springframework.batch spring - batch - integration 你的 Spring Batch 版本 org.springframework.batch spring - batch - core 你的 Spring Batch 版本 org.springframework.integration spring - integration - core 你的 Spring Integration 版本
spring - batch - integration 依赖:它能帮你将 Spring Batch(批处理框架 )与 Spring Integration(企业集成模式框架 )结合,实现批处理作业和外部系统(如消息队列 )交互。
2 核心概念
spring - batch - integration 主要用于:
(1)触发批处理作业:通过 Spring Integration 的消息(如从消息队列获取消息 )
(2)触发 Spring Batch 作业执行,实现事件驱动的批处理。
作业结果交互:把 Spring Batch 作业执行结果(成功、失败、输出数据等 )通过 Spring Integration 发送到其他系统(如发消息通知监控平台 )。
关键组件和流程:
(1)JobLaunchingGateway:作为网关,接收消息(比如消息里包含作业参数 ),触发 Spring Batch 作业启动。
(2)JobExecutionEvent:作业执行过程中产生的事件(开始、结束、失败等 ),可通过 Spring Integration 通道传递,用于监听和后续处理。
(3)消息通道(Channel ):Spring Integration 里传递消息的通道,连接不同组件,像把触发作业的消息传到 JobLaunchingGateway,把作业事件消息传到监听器。
3 代码示例
3.1 包含以下几个核心部分
- job定义:ptbTBJob() 定义了一个完整的批处理任务
- Step 步骤:step01() 定义了任务的具体处理步骤,包括读取、处理、写入
- 数据读取:multiResourceItemReader() 读取数据(通常是文件)
- 数据写入:zjtbWriter() 处理并写入数据
- 任务监听:PtbTBJobListener 监控任务执行前后的状态
- 调度触发:通过 XXL-Job 定时调用这个批处理任务
@Configuration @Slf4j @EnableBatchProcessing public class PtbTBBatchConfig { @Bean public Job ptbTBJob() { // 通过jobBuilderFactory构建一个Job,get方法参数为Job的name return jobBuilderFactory.get(\"ptbTBJob\") .incrementer(new RunIdIncrementer()) .start(step01()) .listener(ptbTBJobListener()) .build(); } @Bean public Step step01() { return stepBuilderFactory.get(\"step01\") .chunk(5000) .reader(multiResourceItemReader(\"\"))// .processor(itemProcessor1()) 这里注释掉了 processor,可能在 writer 中直接处理 .writer(zjtbWriter()) .build(); }// 创建监听@Bean public PtbTBJobListener ptbTBJobListener() { return new PtbTBJobListener(); } }
调度触发:通过 XXL-Job 定时调用这个批处理任务:
- 监听类
@Slf4jpublic class PtbTBJobListener implements JobExecutionListener { @Override public void beforeJob(JobExecution jobExecution) { String date = jobExecution.getJobParameters().getString(\"date\"); // 判断文件是否存在 // SFTP文件下载流程开始 } @Override public void afterJob(JobExecution jobExecution) { String date = jobExecution.getJobParameters().getString(\"date\"); } }
@Slf4j@Componentpublic class XxlJobFileReportHandler { // 引入声明的job的实例@Autowired @Qualifier(\"ptbTBJob\") private Job ptbTBJob; /** * zjTbFileReportHandlerTiming 定时 T-1 拉取**日结文件,并入表 */ @XxlJob(\"zjTbFileReportHandlerTiming\") public void zjTbFileReportHandlerTiming() { StopWatch stopWatch = new StopWatch(); stopWatch.start(\"Finished executing \" + ZFTB_LOG_FLAG_TIMING + \"zjTbFileReportHandlerTiming \" + LocalDateTime.now().format(DATE_FORMATTER_YYYY_MM_DD) + \" took \"); List rulProjectConfigListFailed = new ArrayList(); try { log.info(ZFTB_LOG_FLAG_TIMING + \"zjTbFileReportHandlerTiming===>日期:{}\", LocalDateTime.now().format(DATE_FORMATTER_YYYY_MM_DD)); String date = LocalDate.now().minusDays(1).format(DATE_FORMATTER_YYYYMMDD); //拉取日结文件 fileReportJob(\"zjTbFileReportHandlerTiming\",date,ptbTBJob); log.info(ZFTB_LOG_FLAG_TIMING + \"zjTbFileReportHandlerTiming结束执行===>日期:{}, 异常项目信息集合为:{}\", LocalDateTime.now().format(DATE_FORMATTER_YYYY_MM_DD), rulProjectConfigListFailed.stream().map(RulProjectConfig::toLogString).collect(Collectors.toList())); } catch (Exception e) { log.error(ZFTB_LOG_FLAG_TIMING + \"zjTbFileReportHandlerTiming执行异常===>日期:{}\", LocalDateTime.now().format(DATE_FORMATTER_YYYY_MM_DD), e); XxlJobHelper.log(ZFTB_LOG_FLAG_TIMING + \"zjTbFileReportHandlerTiming执行异常===>日期:{}, 异常为:{}\", LocalDateTime.now().format(DATE_FORMATTER_YYYY_MM_DD), e.getMessage()); XxlJobHelper.handleFail(ZFTB_LOG_FLAG_TIMING + \"zjTbFileReportHandlerTiming执行异常===>异常为: \" + e.getMessage()); } finally { stopWatch.stop(); log.info(\"{}{}ms\", stopWatch.getLastTaskName(), stopWatch.getLastTaskTimeMillis()); } }/** * 日结文件处理 * * @param handler handler名称 * @param date 日期 */ private void fileReportJob(String handler,String date,Job job) throws Exception { //拉取日结文件 JobParameters jobParameters = new JobParametersBuilder(jobExplorer)// .getNextJobParameters(job)//生产需要注掉 因为自动生成的不可靠// .addString(\"fileName\", name) .addString(\"date\", date)// 日期能和业务时间关联起来 方便查询 .toJobParameters(); //启动job JobExecution jobExecution = launcher.run(job, jobParameters); log.info(\"执行状态\"+jobExecution.getExitStatus()); log.info(handler+\"执行完成===>日期:{}, 执行状态{}, 参数日期{}\", LocalDateTime.now().format(DATE_FORMATTER_YYYY_MM_DD), jobExecution.getExitStatus(), date); } }
-
参数传递:将日期参数date传入 Job
-
生产环境中,通常需要手动构建JobParameters,确保参数可控且与业务关联,例如:
.addString(“fileName”, name)
.addString(“date”, date)
.toJobParameters(); -
自动生成实例id代码
// 2. 调用getNextJobParameters,传入目标作业,自动生成唯一参数 JobParameters jobParameters = parametersBuilder.getNextJobParameters(myBatchJob); // 3. 启动作业(使用自动生成的参数) jobLauncher.run(myBatchJob, jobParameters);
3.2 任务启动:
- 先执行PtbTBJobListener的beforeJob()方法
- 执行step01步骤:
- 读取数据:multiResourceItemReader读取文件
- 处理数据:这里注释掉了 processor,可能在 writer 中直接处理
- 写入数据:zjtbWriter批量写入,每批 5000 条
- 执行PtbTBJobListener的afterJob()方法
3.3 执行step01步骤 读取数据
@Slf4j@EnableBatchProcessingpublic class PtbTBBatchConfig { @Bean @StepScope public ExtendedMultiResourceItemReader multiResourceItemReader(@Value(\"#{jobParameters[\'date\']}\") String date) { log.info(\"date->\" + date); ExtendedMultiResourceItemReader resourceItemReader = new ExtendedMultiResourceItemReader(); //测试ExtendedMultiResourceItemReader resourceItemReader = new ExtendedMultiResourceItemReader(); return resourceItemReader;/** * 写操作 里面也进行了处理**/@Bean @StepScope public ItemWriter zjtbWriter() { return new ItemWriter() { @Override public void write(List items) throws Exception { log.info(\"重复数量:\" + firstErrorCount); //重复数量+入库数量总数 Long size = firstErrorCount + Long.parseLong(String.valueOf(i)); log.info(\"成功入库数量:\" + i); log.info(\"总数量:\" + size); } }}}}
4 思考Spring Batch 与 XXL-Job 结合使用,而不是直接在 XXL-Job 的逻辑层编写所有代码
4.1 区别
XXL-Job:专注于任务调度和触发
Spring Batch:专注于批处理逻辑的实现
- 直接编写的问题:
如果直接在 XXL-Job 中编写批处理逻辑,会导致调度框架和业务逻辑耦合代码会变得难以维护和扩展 - 实际应用场景:
当批处理逻辑变得复杂时(如需要重试、跳过错误、分区处理等)
当需要监控和追踪批处理任务的执行状态时
强大的批处理功能支持
4.1.1 Spring Batch 提供的关键功能:
- 重试机制:自动处理临时错误
@Beanpublic Step step() { return stepBuilderFactory.get(\"step\") .chunk(1000) .faultTolerant() .retryLimit(3) .retry(Exception.class) .reader(reader()) .writer(writer()) .build();}``
- 跳过策略:跳过特定错误继续处理
.faultTolerant().skipLimit(10).skip(Exception.class)
- 分区处理:并行处理大量数据
@Beanpublic Step partitionStep() { return stepBuilderFactory.get(\"partitionStep\") .partitioner(\"workerStep\", partitioner()) .step(workerStep()) .gridSize(10) .build();}
- 流程控制:复杂的作业流程定义
@Beanpublic Job job() { return jobBuilderFactory.get(\"job\") .start(step1()) .on(\"COMPLETED\").to(step2()) .from(step2()).on(\"*\").to(step3()) .end() .build();}
- 完善的监控和错误处理
Spring Batch 提供的监控功能:
JobRepository:记录作业执行历史
@Autowiredprivate JobExplorer jobExplorer;public void listJobExecutions() { List executions = jobExplorer.findJobExecutions(jobInstance); // 分析执行历史}
- 执行上下文:在步骤间共享数据
ExecutionContext stepExecutionContext = stepExecution.getExecutionContext();
stepExecutionContext.put(“key”, value);
- 错误处理:灵活的错误处理策略
java
@Bean
public Step errorHandlingStep() {
return stepBuilderFactory.get(“errorHandlingStep”)
.chunk(1000)
.faultTolerant()
.skipPolicy(new CustomSkipPolicy())
.listener(new StepExecutionListener() {
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
// 自定义错误处理逻辑
return null;
}
})
.reader(reader())
.writer(writer())
.build();
}