SpringBatch的详细学习笔记
文章目录
-
- Gittee地址
- SpringBatch的学习笔记
- 第一章 SpringBatch入门
-
- 第一节 SpringBatch概述
- 第二节 搭建SpringBatch项目
- 第三节 SpringBatch入门程序
- 第四节 替换为MySQL数据库
- 第五节 核心API
- 第二章 作业流
-
- 第一节 Job的创建和使用
- 第二节 Flow的创建和使用
- 第三节 split实现并发执行
- 第四节 决策器的使用
- 第五节 Job的嵌套
- 第六节 监听器的使用
- 第七节 Job参数
- 第三章 数据输入
-
- 第一节 ItemReader概述
- 第二节 从数据库中读取数据
- 第三节 从普通文件中读取数据
- 第四节 从XML文件中读取数据
- 第五节 从多个文件中读取数据
- 第六节 ItemReader异常处理及重启
- 第四章 数据输出
-
- 第一节 ItemWriter概述
- 第二节 数据输出到数据库
- 第三节 数据输出到普通文件
- 第四节 数据输出到xml文件
- 第五节 数据输出到多个文件
- 第六节 ItemProcessor的使用
Gittee地址
SpringBatch的学习笔记
第一章 SpringBatch入门
第一节 SpringBatch概述
Spring Batch是个轻量级的、 完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。Spring Batch是Spring的一个子项目,使用Java语言并基于Spring框架为基础开发,使得已经使用Spring框架的开发者或者企业更容易访问和利用企业服务。
Spring Batch提供了大量可重用的组件,包括了日志追踪、事务、任务作业统计、任务重启、跳过、重复资源管理。对于大数据量和高性能的批处理任
务,Spring Batch同样提供了高级功能和特性来支持比如分区功能、远程功能。总之,通过Spring Batch能够支持简单的、复杂的和大数据量的批处理作业。
Spring Batch是一个批处理应用框架,不是调度框架,但需要和调度框架(比如美团的:xxl-job)合作来构建完成的批处理任务。它只关注批处理任务相关的问题,如事务、并发、监控、执行等,并不提供相应的调度功能。如果需要使用调用框架,在商业软件和开源软件中已经有很多优秀的企业级调度框架(如Quartz. Tivoli、 Control-M、 Cron等)可以使用。
框架主要有以下功能:
Transaction management (事务管理)
Chunk based processing (基于块的处理)
Declarative 1/0 (声明式的输入输出)
Start/Stop/Restart (启动/停止/再启动)
Retry/Skip (重试/跳过)
框架一共有4个主要角色:
JobLauncher是任务启动器,通过它来启动任务,可以看做是程序的入口。
Job代表着一个具体的任务。
Step代表着一个具体的步骤,一个Job可以包含多个Step (想象把大象放进冰箱这个任务需要多少个步骤你就明白了) .
JobRepository是存储数据的地方,可以看做是一个数据库的接口,在任务执行的时候需要通过它来记录任务状态等等信息。
第二节 搭建SpringBatch项目
网站,idea。。
第三节 SpringBatch入门程序
@Configuration@EnableBatchProcessingpublic class JobConfiguration { /** * @Description: 注入创建任务对象的对象 * @Param: * @Return: */ @Autowired private JobBuilderFactory jobBuilderFactory; /** * @Description: //任务的执行由Step决定,注入创建Step对象的对象 * @Param: * @Return: */ @Autowired private StepBuilderFactory stepBuilderFactory; @Bean /** * @Description: 创建任务对象 * @Param: [] * @Return: org.springframework.batch.core.Job */ public Job helloWorldJob() { //job名字//开始step return jobBuilderFactory.get("helloWorldJob") .start(step1()) .build(); } @Bean public Step step1() { /** * @Description: step1 step的名字,tasklet执行任务 可以用chunk * @Param: [] * @Return: org.springframework.batch.core.Step */ return stepBuilderFactory.get("step1") .tasklet(new Tasklet() { @Override /*** @Description: RepeatStatus 状态值,一步步来,一个step结束开始下一个* @Param: [stepContribution, chunkContext]* @Return: org.springframework.batch.repeat.RepeatStatus*/ public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("He11o World!"); return RepeatStatus.FINISHED; } }).build(); }}
第四节 替换为MySQL数据库
<dependency><groupId>mysq1</groupId><artifactId>mysql-connector-java</artifactId> </dependency> <dependency><groupId>ore.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency>
application.properties:
spring.datasource.url=jdbc:mysql://localhost:3306/batchspring.datasource.driver-class-name=com.mysql.jdbc.Driverspring.datasource.username=rootspring.datasource.password=123456spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sqlspring.batch.initialize-schema=alwaysspring.datasource.type=com.alibaba.druid.pool.DruidDataSource
第五节 核心API
, sheode tate = 207085
JooPrsates
JolEauutin
**JobInstance:**该领域概念和ob的关系’jJava中实何和类的关系样,Job定义了个工.作流程。JobInstance就是该 工作流程的一个具体实例。一个Job可以有多 个JobInstance
JobParameters:是组可以贯穿整Job的运行时配置参数。不同的配置将产生不同的JobInstance. 如果你是使用JobParameters运行间Job. 那么这次运行会重用 上
JobExecution: 该领域概念 表示JobInstance的运行。 JobInstance运行时可能会成功或者失败。每次jobInstance的运行都会产生 1个JobExecution. 运行时间,开始结束,状态,成功与否。。
**StepExecution:**类似于 JobExecution.该领域对象表示Step的运行。Step足Job的部分。 因此个StepExecution公关联到 个Jobexecution. 另外。浅对象还会存储很
**ExecutionContext:**上下文,从前面的JobExecution. StepExecution的属性介绍中已经提到I该领域概念。说穿了。该领域概念就是个容器。该容器Batch框架控制。框架对该容器持久化。
第二章 作业流
第一节 Job的创建和使用
Job:作业。批处理中的核心概念,是Batch操作的基础单元。
每个作业Job有1个或者多个作业步Step:
@Configuration@EnableBatchProcessingpublic class JobDemo { /** * @Description: 注入创建任务对象的对象 * @Param: * @Return: */ @Autowired private JobBuilderFactory jobBuilderFactory; /** * @Description: //任务的执行由Step决定,注入创建Step对象的对象 * @Param: * @Return: */ @Autowired private StepBuilderFactory stepBuilderFactory; /** * @Description: 让step1,step2,step3依次执行 * @Param: [] * @Return: org.springframework.batch.core.Job */ @Bean public Job jobDemoJob(){ return jobBuilderFactory.get("jobDemoJob") .start(step1()) .on("COMPLETED").to(step1()) .from(step2()).on("COMPLETED").to(step3()) .from(step3()).end() .build(); // .from(step2()).on("COMPLETED").fail() // .from(step2()).on("COMPLETED").stopAndRe // .start(step1()) // .next(step2()) // .next(step3()) // .build(); } public Step step3() { return stepBuilderFactory.get("step3") .tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("step3"); return RepeatStatus.FINISHED; } }).build(); } public Step step2() { return stepBuilderFactory.get("step2") .tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("step2"); return RepeatStatus.FINISHED; } }).build(); } public Step step1() { return stepBuilderFactory.get("step1") .tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("step1"); return RepeatStatus.FINISHED; } }).build(); }}
第二节 Flow的创建和使用
1.Flow是多个Step的集合
2.可以被多个Job复用
3.使用FlowBuilder来创建
@Configuration@EnableBatchProcessingpublic class FlowDemo { /** * @Description: 注入创建任务对象的对象 * @Param: * @Return: */ @Autowired private JobBuilderFactory jobBuilderFactory; /** * @Description: //任务的执行由Step决定,注入创建Step对象的对象 * @Param: * @Return: */ @Autowired private StepBuilderFactory stepBuilderFactory; /** * @Description: 创建Flow对象,指明F1ow对象包含哪些step * @Param: [] * @Return: org.springframework.batch.core.job.flow.Flow */ @Bean public Flow flowDemoFlow(){ return new FlowBuilder<Flow>("flowDemoFlow") .start(flowDemoStep1()) .next(flowDemoStep2()) .build(); } /** * @Description: fowDemoJob * @Param: [] * @Return: org.springframework.batch.core.Job */ @Bean public Job fowDemoJob(){ return jobBuilderFactory.get("fowDemoJob0") .start(flowDemoFlow()) .next(flowDemoStep3()) .end() .build(); } ... ...
第三节 split实现并发执行
实现任务中的多个step或多个flow并发执行
1:创建若干个step
2:创建两个flow
3:创建-个任务包含以上两个flow,井让这两个flow并发执行
@Configuration@EnableBatchProcessingpublic class SplitDemo { /** * @Description: 注入创建任务对象的对象 * @Param: * @Return: */ @Autowired private JobBuilderFactory jobBuilderFactory; /** * @Description: //任务的执行由Step决定,注入创建Step对象的对象 * @Param: * @Return: */ @Autowired private StepBuilderFactory stepBuilderFactory; /** * @Description: 创建Flow对象,指明F1ow对象包含哪些step * @Param: [] * @Return: org.springframework.batch.core.job.flow.Flow */ @Bean public Flow splitDemoFlow1(){ return new FlowBuilder<Flow>("splitDemoFlow1") .start(splitDemoStep1()) .build(); } @Bean public Flow splitDemoFlow2(){ return new FlowBuilder<Flow>("splitDemoFlow2") .start(splitDemoStep2()) .next(splitDemoStep3()) .build(); } /** * @Description: fowDemoJob * @Param: [] * @Return: org.springframework.batch.core.Job */ @Bean public Job splitDemoJob(){ return jobBuilderFactory.get("splitDemoJob") .start(splitDemoFlow1()) .split(new SimpleAsyncTaskExecutor()) .add(splitDemoFlow2()) .end() .build(); }
第四节 决策器的使用
接口: JobExecutionDecider
public class MyDecider implements JobExecutionDecider { private int count; /** * @Description: decide 决策器,先执行odd奇数 * @Param: [jobExecution, stepExecution] * @Return: org.springframework.batch.core.job.flow.FlowExecutionStatus */ @Override public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) { count++; if(count%2==0){ return new FlowExecutionStatus("even偶数"); }else { return new FlowExecutionStatus("odd奇数"); } }}
@Configuration@EnableBatchProcessingpublic class DeciderDemo { /** * @Description: 注入创建任务对象的对象 * @Param: * @Return: */ @Autowired private JobBuilderFactory jobBuilderFactory; /** * @Description: //任务的执行由Step决定,注入创建Step对象的对象 * @Param: * @Return: */ @Autowired private StepBuilderFactory stepBuilderFactory; /** * @Description: myDecider 决策器 * @Param: [] * @Return: org.springframework.batch.core.job.flow.JobExecutionDecider */ @Bean public JobExecutionDecider myDecider(){ return new MyDecider(); } @Bean public Job deciderDemoJob(){ return jobBuilderFactory.get("deciderDemoJob") .start(deciderDemoStep1()) .next(myDecider()) .from(myDecider()).on("even偶数").to(deciderDemoStep2()) .from(myDecider()) .on("odd奇数").to(deciderDemoStep3()) .from(deciderDemoStep3()) .on("*").to(myDecider())//无论返回什么,回到决策器 ↑next(myDecider()) .end() .build(); } @Bean public Step deciderDemoStep1(){ return stepBuilderFactory.get("deciderDemoStep1") .tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("deciderDemoStep1!"); return RepeatStatus.FINISHED; } }).build(); }
第五节 Job的嵌套
一个Job可以嵌套在另一个Job中,被嵌赛的Job称为子Job,外部Job称为父Jb。子job不能单独执行, 賽要由父Job来启动
案例:创建两个Job,作为子Job,再创建一个Job作为父Job
@Configurationpublic class NestedDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired private JobLauncher launcher; @Autowired private Job childJobOne; @Autowired private Job childJobTwo; @Bean public Job parentJob(JobRepository jobRepository, PlatformTransactionManager transactionManager){ return jobBuilderFactory.get("parentJobs") .start(childJob1(jobRepository,transactionManager)) .next(childJob2(jobRepository,transactionManager)) .build(); } /** * @Description: childJob2返回Job类型的Step,特殊Step * @Param: [] * @Return: org.springframework.batch.core.Step */ private Step childJob2(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new JobStepBuilder(new StepBuilder("childJob2")) .job(childJobTwo) .launcher(launcher) .repository(jobRepository) .transactionManager(transactionManager) .build(); } private Step childJob1(JobRepository jobRepository,PlatformTransactionManager transactionManager) { return new JobStepBuilder(new StepBuilder("childJob1")) .job(childJobOne) .launcher(launcher) .repository(jobRepository) .transactionManager(transactionManager) .build(); }}
@Configuration@EnableBatchProcessingpublic class ChildJob2 { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Step childJob2Step1(){ return stepBuilderFactory.get("childJob2Step1") .tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("childJob2Step1!"); return RepeatStatus.FINISHED; } }).build(); } @Bean public Step childJob2Step2(){ return stepBuilderFactory.get("childJob2Step2") .tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("childJob2Step2!"); return RepeatStatus.FINISHED; } }).build(); } @Bean public Job childJobTwo(){ return jobBuilderFactory.get("childJobTwo!!") .start(childJob2Step1()) .next(childJob2Step2()) .build(); }}
spring.batch.job.names=parentJobs //选择执行的Job,名字是自己设置的Job名称
第六节 监听器的使用
用来监听批处理作业的执行情况
创建监听可以通过实现接口或使用注解
//不同的监听,以及触发时机JobExecutionListener(before,after)StepExecutionListener(before,after)ChunkListener(before,after,error)ItemReadListener,ItemProcessListener,ItemWriteListener(before,after,error)public class MyJobListener implements JobExecutionListener{
示例代码:
public class MyChunkListener { @BeforeChunk public void beforeChunk(ChunkContext chunkContext){ System.out.println(chunkContext.getStepContext().getStepName()+"before..."); } @AfterChunk public void afterChunk(ChunkContext chunkContext){ System.out.println(chunkContext.getStepContext().getStepName()+"after..."); }}
public class MyJobListener implements JobExecutionListener { @Override public void beforeJob(JobExecution jobExecution) { System.out.println(jobExecution.getJobInstance().getJobName()+"before..."); } @Override public void afterJob(JobExecution jobExecution) { System.out.println(jobExecution.getJobInstance().getJobName()+"after..."); }}
@Configuration@EnableBatchProcessingpublic class ListenerDemo { /** * @Description: 注入创建任务对象的对象 * @Param: * @Return: */ @Autowired private JobBuilderFactory jobBuilderFactory; /** * @Description: //任务的执行由Step决定,注入创建Step对象的对象 * @Param: * @Return: */ @Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Job listenerJob() { return jobBuilderFactory.get("listenerJob") .start(step1()) .listener(new MyJobListener()) .build(); } /** * @Description: step1 Chunk使用方式 * @Param: [] * @Return: org.springframework.batch.core.Step */ @Bean public Step step1() { return stepBuilderFactory.get("listenerStep1") //数据的读取,规定读取和输出的数据类型每读完2个数据进行一次输出处理 .<String,String>chunk(2) //容错 .faultTolerant() .listener(new MyChunkListener()) //数据的读取 .reader(read()) //数据的写入/输出 .writer(write()) .build(); } /** * @Description: write * @Param: [] * @Return: org.springframework.batch.item.ItemWriter */ @Bean public ItemWriter<String> write() { return new ItemWriter<String>() { @Override public void write(List<? extends String> list) throws Exception { for(String item:list){ System.out.println(item); } } }; } /** * @Description: read * @Param: [] * @Return: org.springframework.batch.item.ItemReader */ @Bean public ItemReader<String> read() { return new ListItemReader<>(Arrays.asList("java","spring","mybatis")); }}
第七节 Job参数
在Job运行时可以以key=value形式传递参数
/** * @ClassName ParametersDemo 实现Step监听器 * @Description ParametersDemo * @Author ZX * @Date 2020/5/30 */@Configuration@EnableBatchProcessingpublic class ParametersDemo implements StepExecutionListener { /** * @Description: 注入创建任务对象的对象 * @Param: * @Return: */ @Autowired private JobBuilderFactory jobBuilderFactory; /** * @Description: //任务的执行由Step决定,注入创建Step对象的对象 * @Param: * @Return: */ @Autowired private StepBuilderFactory stepBuilderFactory; private Map<String,JobParameter> parameters; @Bean public Job parameterJob() { return jobBuilderFactory.get("parameterJob") .start(parameterStep()) .listener(new MyJobListener()) .build(); } /** * @Description: parameterStep * Job执行的是Step,Job使用的数据肯定是在step中使用,只需给Step传递数据。 * 使用监听,使用Step级别的监听来传递数据 * @Param: [] * @Return: org.springframework.batch.core.Step */ @Bean public Step parameterStep() { return stepBuilderFactory.get("parameterStep") .listener(this) .tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { //输出接收到的参数的值 System.out.println(parameters.get("info")); return RepeatStatus.FINISHED; } }).build(); } @Override public void beforeStep(StepExecution stepExecution) { parameters = stepExecution.getJobParameters().getParameters(); } @Override public ExitStatus afterStep(StepExecution stepExecution) { return null; }}
输入参数:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uhILC0Ic-1648712938927)(F:\TyporaPhotos\image-20200603134912191.png)]
第三章 数据输入
第一节 ItemReader概述
@Configuration@EnableBatchProcessingpublic class ItemReaderDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Job itemReaderDemoJob() { return jobBuilderFactory.get("itemReaderDemoJob") .start(itemReaderDemoStep()) .listener(new MyJobListener()) .build(); } @Bean public Step itemReaderDemoStep() { return stepBuilderFactory.get("itemReaderDemoStep") .chunk(2) .reader(itemReaderDemoRead()) .writer(list->{ for (Object item:list){ System.out.println(item+"..."); } }).build(); } /** * @Description: itemReaderDemoRead 自定义itemReader * @Param: [] * @Return: MyReader */ @Bean public MyReader itemReaderDemoRead() { List<String> data = Arrays.asList("鼠","牛","虎","兔"); return new MyReader(data); }}
/** * @ClassName MyReader * 自定义ItemReader * @Description MyReader * @Author ZX * @Date 2020/5/30 */public class MyReader implements ItemReader<String> { private Iterator<String> iterator; //构造函数 public MyReader(List<String> list){ this.iterator=list.iterator(); } @Override public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { //默认一个一个读数据 if(iterator.hasNext()){ return this.iterator.next(); }else { return null; } }}
第二节 从数据库中读取数据
用JdbcPagingltemReader类实现
@Configuration@EnableBatchProcessingpublic class ItemReaderDbDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired private DataSource dataSource; @Autowired @Qualifier("dbJdbcWriter") private ItemWriter<? super User> dbJdbcWriter; @Bean public Job ItemReaderDbDemoJob(){ return jobBuilderFactory.get("itemReaderDbDemoJob") .start(itemReaderDbStep()) .listener(new MyJobListener()) .build(); } @Bean public Step itemReaderDbStep() { return stepBuilderFactory.get("itemReaderDemoStep") .<User, User>chunk(2) .reader(dbJdbcReader()) .writer(dbJdbcWriter) .build(); } @Bean @StepScope public JdbcPagingItemReader<User> dbJdbcReader() { JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<User>(); reader.setDataSource(dataSource); reader.setFetchSize(2); //把读取到的记录转换成User对象 reader.setRowMapper(new RowMapper<User>() { /** * @Description: 结果集的映射 * @Param: [resultSet, i] * @Return: com.example.springbootjdbc.pojo.Users */ @Override public User mapRow(ResultSet resultSet, int rowNum) throws SQLException { User users = new User(); users.setId(resultSet.getInt("id")); users.setAge(resultSet.getInt("age")); users.setPhone(resultSet.getInt("phone")); users.setUsername(resultSet.getString("username")); users.setEmail(resultSet.getString("email")); return users; } }); //指定sq1语句 MySqlPagingQueryProvider provider =new MySqlPagingQueryProvider () ; provider.setSelectClause ("id,username,age,phone,email") ; provider.setFromClause ("from users") ; //指定根据哪个字段进行排序 Map<String, Order> sort = new HashMap<>(1) ; sort.put("id", Order.ASCENDING); provider.setSortKeys(sort); reader.setQueryProvider(provider); return reader ; }}
/** * @ClassName DbJdbcWriter 自建ItemWriter * @Description DbJdbcWriter * @Author ZX * @Date 2020/5/30 */@Component("dbJdbcWriter")public class DbJdbcWriter implements ItemWriter<User>{ @Override public void write(List<? extends User> list) throws Exception { for (User user:list){ System.out.println(user); } }}
第三节 从普通文件中读取数据
用FlatFileItemReader类
customer.txt内容如下:
id,firstName,lastName,birthday1,Stone,Barrett,1964-10-19 14:11:032,Raymond,Pace,1977-12-11 21 :44:303,Armando,Logan,1986-12-25 11:54:284,Latifah,Barnett,1959-07-24 06:00:165,Cassandra,Moses,1956-09-14 06:49:286,Audra,Hopkins,1984-08-30 04:18:107,Upton,Morrow,1973-82-04 05:26:058,Melodie,Velasquez,1953-04-26 11:16:269,Sybill,Nolan,1951-06-24 14:56:5110,Glenna,Little,1953-08-27 13:15:0811,Ingrid,Jackson,1957-09-05 21:36:4712,Duncan,Castaneda,1979-01 21 18:31:2713,Xaviera,Gillespie,1965-07-18 15:05:2214,Rhoda,Lancaster,1990-09-11 15:52:5415,Fatima,Combs,1979-06-01 06: 58: 54
示例代码
@Configuration@EnableBatchProcessingpublic class FileItemReaderDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("fileFileItemWriter") private ItemWriter<? super Customer> fileFileItemWriter; @Bean public Job FileItemReaderDemo() { return jobBuilderFactory.get("FileItemReaderDemo") .start(FileItemReaderDemoStep()) .listener(new MyJobListener()) .build(); } @Bean public Step FileItemReaderDemoStep() { return stepBuilderFactory.get("FileItemReaderDemoStep") .<Customer,Customer>chunk(5) .reader(fileItemReaderDemoReader()) .writer(fileFileItemWriter) .build(); } /** * @Description: fileItemReaderDemoReader 文件读取 * @Param: [] * @Return: org.springframework.batch.item.file.FlatFileItemReader */ @Bean public FlatFileItemReader<Customer> fileItemReaderDemoReader() { FlatFileItemReader<Customer> reader = new FlatFileItemReader<Customer>(); reader.setResource(new ClassPathResource("customer.txt")); //跳过第一行 reader.setLinesToSkip(1); //数据解析 DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); tokenizer.setNames(new String[]{"id","fistName","lastName","birthday"}); //把解析出的一个数据映射为Customer对象 DefaultLineMapper<Customer> mapper = new DefaultLineMapper<>(); mapper.setLineTokenizer(tokenizer); mapper.setFieldSetMapper(new FieldSetMapper<Customer>() { /** * @Description: mapFieldSet 映射 * @Param: [fieldSet] * @Return: com.example.springbatchdemo.pojo.Customer */ @Override public Customer mapFieldSet(FieldSet fieldSet) throws BindException { Customer customer = new Customer(); customer.setId(fieldSet.readLong("id")); customer.setFirstName(fieldSet.readString("fistName")); customer.setLastName(fieldSet.readString("lastName")); customer.setBirthday(fieldSet.readString("birthday")); return customer; } }); mapper.afterPropertiesSet(); reader.setLineMapper(mapper); return reader; }}
自建ItemWriter:
@Component("fileFileItemWriter")public class FileFileItemWriter implements ItemWriter<Customer> { @Override public void write(List<? extends Customer> list) throws Exception { for (Customer customer:list){ System.out.println(customer); } }}
第四节 从XML文件中读取数据
使用StaxEventtemReader类
xml文件:
<customers> <customer> <id>1</id> <firstName>Mufutau</firstName> <lastName>Maddox</lastName> <birthday>2017-06-05 19:43:51PM</birthday> </customer> <customer> <id>2</id> <firstName>Brenden</firstName> <lastName>Cobb</lastName> <birthday>2017-01-06 13:18:17PM</birthday> </customer> <customer> <id>3</id> <firstName>Kerry</firstName> <lastName>Joseph</lastName> <birthday>2016-09-15 18:32:33PM</birthday> </customer> <customer> <id>4</id> <firstName>asdasd</firstName> <lastName>Joseph</lastName> <birthday>2016-09-15 18:32:33PM</birthday> </customer> <customer> <id>5</id> <firstName>JOJO5</firstName> <lastName>Jobana</lastName> <birthday>2016-09-15 18:32:33PM</birthday> </customer> <customer> <id>6</id> <firstName>XuLun</firstName> <lastName>JoTaiLang</lastName> <birthday>2046-09-15 18:32:33PM</birthday> </customer> <customer> <id>7</id> <firstName>JaiLuo</firstName> <lastName>JieBeiLing</lastName> <birthday>2077-09-15 18:32:33PM</birthday> </customer></customers>
配置pom文件
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-oxm</artifactId></dependency><dependency> <groupId>com.thoughtworks.xstream</groupId> <artifactId>xstream</artifactId> <version>1.4.7</version></dependency>
示例代码:
@Configuration@EnableBatchProcessingpublic class XmlItemReaderDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("xmlFileWriter") private ItemWriter<? super Customer> xmlFileWriter; @Bean public Job xmlItemReaderDemoJob() { return jobBuilderFactory.get("xmlItemReaderDemoJob") .start(xmlItemReaderDemoStep()) .listener(new MyJobListener()) .build(); } @Bean public Step xmlItemReaderDemoStep() { return stepBuilderFactory.get("xmlItemReaderDemoStep") .<Customer,Customer>chunk(5) .reader(xmlFileReader()) .writer(xmlFileWriter) .build(); } /** * @Description: xmlFileReader 文件读取 * @Param: [] * @Return: org.springframework.batch.item.file.FlatFileItemReader */ @Bean @StepScope public StaxEventItemReader<Customer> xmlFileReader() { StaxEventItemReader<Customer> reader = new StaxEventItemReader<Customer>(); //指定文件位置 reader.setResource(new ClassPathResource("customer.xml")); //跳过第一行 reader.setFragmentRootElementName("customer"); //把xml转成对象 XStreamMarshaller unmarshaller = new XStreamMarshaller(); //告诉unmarshaller把xml转成什么类型 Map<String,Class> map = new HashMap<>(); map.put("customer",Customer.class); unmarshaller.setAliases(map); reader.setUnmarshaller(unmarshaller); return reader; }}
自建Itemwriter
@Component("xmlFileWriter")public class XmlItemWriter implements ItemWriter<Customer> { @Override public void write(List<? extends Customer> list) throws Exception { for (Customer customer:list){ System.out.println(customer); } }}
第五节 从多个文件中读取数据
使用MultiResourceItemReader类
file1文件如下,file2,file3省略
1,Stone, Barrett, 1964-10-19 14:11:032,Raymond, Pace,1977-12-11 21:44:303,Armando, Logan,1986-12-25 11:54:284,Latifah, Barnett,1959-07-24 06:00:165,Cassandra, Moses,1956-09-14 06:49:286,Audra, Hopkins,1984-08-30 04:18:107,Upton, Morrow,1973-02-04 05:26:058,Melodie, Velasquez,1953-04-26 11:16:269,sybill, Nolan,1951-06-24 14:56:5110,Glenna, Little, 1953-08-27 13:15:0811,Ingrid, Jackson,1957-09-05 21:36:47.12,Duncan, Castaneda,1979-01-21 18:31:2713,Xaviera, Gillespie,1965-07-18 15:05:2214,Rhoda, Lancaster,1990-09-11 15:52:5415,Fatima, Combs,1979-06-01 06:58:5416,Merri1l, Hopkins ,1990-07-02 17:36:3517,Felicia, Vinson,1959-12-19 20:23:1218,Hanae , Harvey, 1984-12-27 10:36:4919,Ramona, Acosta,1962-06-23 20:03:4020,Katelyn, Hammond ,1988-11-12 19:05:13
示例代码:
@Configuration@EnableBatchProcessingpublic class MultiFileItemReaderDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("multiFileWriter") private ItemWriter<? super Customer> multiFileWriter; @Value("classpath:/file*.txt") private Resource[] fileResources; @Bean public Job multiFileItemReaderDemoJob() { return jobBuilderFactory.get("multiFileItemReaderDemoJob") .start(multiFileItemReaderDemoStep()) .listener(new MyJobListener()) .build(); } @Bean public Step multiFileItemReaderDemoStep() { return stepBuilderFactory.get("multiFileItemReaderDemoStep") .<Customer,Customer>chunk(5) .reader(multiFileReader()) .writer(multiFileWriter) .build(); } /** * @Description: multiFileReader 虽说是多文件读取,但其实是逐个读取单个文件 * @Param: [] * @Return: org.springframework.batch.item.file.MultiResourceItemReader */ @Bean @StepScope public MultiResourceItemReader<Customer> multiFileReader() { MultiResourceItemReader <Customer> reader = new MultiResourceItemReader<>(); reader.setDelegate(fileItemReaderDemoReader()); reader.setResources(fileResources); return reader; } /** * @Description: fileItemReaderDemoReader 单个文件读取 * @Param: [] * @Return: org.springframework.batch.item.file.FlatFileItemReader */ @Bean public FlatFileItemReader<Customer> fileItemReaderDemoReader() { FlatFileItemReader<Customer> reader = new FlatFileItemReader<Customer>(); reader.setResource(new ClassPathResource("customer.txt")); //跳过第一行 //reader.setLinesToSkip(1); //数据解析 DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); tokenizer.setNames(new String[]{"id","fistName","lastName","birthday"}); //把解析出的一个数据映射为Customer对象 DefaultLineMapper<Customer> mapper = new DefaultLineMapper<>(); mapper.setLineTokenizer(tokenizer); mapper.setFieldSetMapper(new FieldSetMapper<Customer>() { /** * @Description: mapFieldSet 映射 * @Param: [fieldSet] * @Return: com.example.springbatchdemo.pojo.Customer */ @Override public Customer mapFieldSet(FieldSet fieldSet) throws BindException { Customer customer = new Customer(); customer.setId(fieldSet.readLong("id")); customer.setFirstName(fieldSet.readString("fistName")); customer.setLastName(fieldSet.readString("lastName")); customer.setBirthday(fieldSet.readString("birthday")); return customer; } }); mapper.afterPropertiesSet(); reader.setLineMapper(mapper); return reader; }}
自定义ItemWriter
@Component("multiFileWriter")public class MultiFileWriter implements ItemWriter<Customer> { @Override public void write(List<? extends Customer> list) throws Exception { for (Customer customer:list){ System.out.println(customer); } }}
第六节 ItemReader异常处理及重启
第四章 数据输出
第一节 ItemWriter概述
ItemReader是一个数据一个数据的读, 而ItemWriter是一批一批的输出
@Configuration@EnableBatchProcessingpublic class ItemWriterDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("myWriter") private ItemWriter<? super String> myWriter; @Bean public Job ItemWriterDemoJob() { return jobBuilderFactory.get("ItemWriterDemoJob") .start(ItemWriterDemoStep()) .listener(new MyJobListener()) .build(); } @Bean public Step ItemWriterDemoStep() { return stepBuilderFactory.get("ItemWriterDemoStep") .<String, String>chunk(2) .reader(myReader()) .writer(myWriter).build(); } @Bean public ItemReader<String> myReader() { List<String> items = new ArrayList<>(); for (int i = 1; i <= 50; i++) { items.add("java" + i); } return new ListItemReader<String>(items); }}
/** * @ClassName MyWriter * @Description MyWriter * @Author ZX * @Date 2020/6/1 */@Component("myWriter")public class MyWriter implements ItemWriter<String> { @Override public void write(List<? extends String> list) throws Exception { //输出一批的数量,chunk的值 System.out.println(list.size()); for (String str:list){ System.out.println(str); } }}
第二节 数据输出到数据库
实现的各种类:
Neo4jltemWriter
MongoltemWriter
RepositoryltemWriter
HibernateltemWriter
JdbcBatchltemWriter
JpaltemWriter
GemfireltemWriter
Mysql这里使用JdbcBatchltemWriter
示例代码
@Configuration@EnableBatchProcessingpublic class ItemWriterDbDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("itemWriterDb") private ItemWriter<? super Customer> itemWriterDb; @Autowired @Qualifier("fileItemReaderDemoReader") private ItemReader<? extends Customer> flatFileReader; @Bean public Job ItemWriterDbDemoJob() { return jobBuilderFactory.get("ItemWriterDbDemoJob") .start(ItemWriterDbDemoStep()) .listener(new MyJobListener()) .build(); } @Bean public Step ItemWriterDbDemoStep() { return stepBuilderFactory.get("ItemWriterDbDemoStep") .<Customer,Customer>chunk(5) .reader(flatFileReader) .writer(itemWriterDb) .build(); }}
从文件中读取:
@Configurationpublic class FlatFileReaderConfig { /** * @Description: FlatItemReaderDemoReader 文件读取 * @Param: [] * @Return: org.springframework.batch.item.file.FlatFileItemReader */ @Bean public FlatFileItemReader<Customer> fileItemReaderDemoReader() { FlatFileItemReader<Customer> reader = new FlatFileItemReader<Customer>(); reader.setResource(new ClassPathResource("customer.txt")); //跳过第一行 reader.setLinesToSkip(1); //数据解析 DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); tokenizer.setNames(new String[]{"id","fistName","lastName","birthday"}); //把解析出的一个数据映射为Customer对象 DefaultLineMapper<Customer> mapper = new DefaultLineMapper<>(); mapper.setLineTokenizer(tokenizer); mapper.setFieldSetMapper(new FieldSetMapper<Customer>() { /** * @Description: mapFieldSet 映射 * @Param: [fieldSet] * @Return: com.example.springbatchdemo.pojo.Customer */ @Override public Customer mapFieldSet(FieldSet fieldSet) throws BindException { Customer customer = new Customer(); customer.setId(fieldSet.readLong("id")); customer.setFirstName(fieldSet.readString("fistName")); customer.setLastName(fieldSet.readString("lastName")); customer.setBirthday(fieldSet.readString("birthday")); return customer; } }); mapper.afterPropertiesSet(); reader.setLineMapper(mapper); return reader; }}
写入数据库:
@Configurationpublic class ItemWriterDbConfig { @Autowired private DataSource dataSource; @Bean public JdbcBatchItemWriter<Customer> itemWriterDb(){ JdbcBatchItemWriter writer = new JdbcBatchItemWriter<Customer>(); writer.setDataSource(dataSource); writer.setSql("insert into customer(id,firstName,lastName,birthday) values" + "(:id,:firstName,:lastName,:birthday)"); //将Customer中对应属性的值与Sql语句中的四个值进行替换 writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Customer>()); return writer; }}
第三节 数据输出到普通文件
FlatFileltemWriter类
示例代码:
@Configuration@EnableBatchProcessingpublic class FileItemWriterDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("dbJdbcReader") private ItemReader<? extends Customer> dbJdbcReader; @Autowired @Qualifier("fileItemWriter") private ItemWriter<? super Customer> fileItemWriter; /** * @Description: fileItemWriterDemoJob * @Param: [] * @Return: org.springframework.batch.core.Job */ @Bean public Job fileItemWriterDemoJob() { return jobBuilderFactory.get("fileItemWriterDemoJob") .start(fileItemWriterDemoStep()) .listener(new MyJobListener()) .build(); } /** * @Description: fileItemWriterDemoStep * @Param: [] * @Return: org.springframework.batch.core.Step */ @Bean public Step fileItemWriterDemoStep() { return stepBuilderFactory.get("fileItemWriterDemoStep") .<Customer,Customer>chunk(5) .reader(dbJdbcReader) .writer(fileItemWriter) .build(); }}
写入文件:
@Configurationpublic class FIleItemWriter { @Bean /** * @Description: fileItemWriter 向文件输出数据,覆盖原先数据 * @Param: [] * @Return: org.springframework.batch.item.file.FlatFileItemWriter */ public FlatFileItemWriter<Customer> fileItemWriter(){ //把Customer对象转成字符串输出到文件 FlatFileItemWriter<Customer> writer = new FlatFileItemWriter<Customer>(); String path="F:\\test/customer.txt"; writer.setResource(new FileSystemResource(path)); //把Customer对象转成字符串 writer.setLineAggregator(new LineAggregator<Customer>() { ObjectMapper mapper = new ObjectMapper(); @Override public String aggregate(Customer customer) { String str = null; try { str=mapper.writeValueAsString(customer); } catch (JsonProcessingException e) { e.printStackTrace(); } return str; } }); try { writer.afterPropertiesSet(); } catch (Exception e) { e.printStackTrace(); } return writer; }}
从数据库中读取:
@Configurationpublic class DbJdbcReaderConfig { @Autowired private DataSource dataSource; /** * @Description: dbJdbcReader 从数据库中读取数据 * @Param: [] * @Return: org.springframework.batch.item.database.JdbcPagingItemReader */ @Bean public JdbcPagingItemReader<Customer> dbJdbcReader() { JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<Customer>(); reader.setDataSource(dataSource); //设置读取缓存,每次取5个 reader.setFetchSize(5); //把读取到的记录转换成User对象 reader.setRowMapper(new RowMapper<Customer>() { /** * @Description: 结果集的映射 * @Param: [resultSet, rowNum] * @Return: com.example.springbootjdbc.pojo.Customer */ @Override public Customer mapRow(ResultSet resultSet, int rowNum) throws SQLException { Customer customer = new Customer(); customer.setId(resultSet.getLong(1)); customer.setFirstName(resultSet.getString(2)); customer.setLastName(resultSet.getString(3)); customer.setBirthday(resultSet.getString(4)); return customer; } }); //指定sq1语句 MySqlPagingQueryProvider provider =new MySqlPagingQueryProvider () ; provider.setSelectClause ("id,firstName,lastName,birthday") ; provider.setFromClause ("from customer") ; //指定根据哪个字段进行排序 Map<String, Order> sort = new HashMap<>(1) ; sort.put("id", Order.ASCENDING); provider.setSortKeys(sort); reader.setQueryProvider(provider); return reader ; }}
第四节 数据输出到xml文件
StaxEvenltemWriter
代码:
@Configuration@EnableBatchProcessingpublic class XmlItemWriterDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("dbJdbcReader") private ItemReader<? extends Customer> dbJdbcReader; @Autowired @Qualifier("xmlItemWriter") private ItemWriter<? super Customer> xmlItemWriter; /** * @Description: xmlItemWriterDemoJob * @Param: [] * @Return: org.springframework.batch.core.Job */ @Bean public Job xmlItemWriterDemoJob() { return jobBuilderFactory.get("xmlItemWriterDemoJob") .start(xmlItemWriterDemoStep()) .listener(new MyJobListener()) .build(); } /** * @Description: xmlItemWriterDemoStep * @Param: [] * @Return: org.springframework.batch.core.Step */ @Bean public Step xmlItemWriterDemoStep() { return stepBuilderFactory.get("xmlItemWriterDemoStep") .<Customer,Customer>chunk(5) .reader(dbJdbcReader) .writer(xmlItemWriter) .build(); } }
写入xml文件:
@Configurationpublic class XmlItemWriterConfig { @Bean public StaxEventItemWriter<Customer> xmlItemWriter(){ StaxEventItemWriter writer = new StaxEventItemWriter<Customer>(); XStreamMarshaller marshaller = new XStreamMarshaller(); //告诉marshaller把数据转成什么类型 Map<String,Class> map = new HashMap<>(); map.put("customer",Customer.class); marshaller.setAliases(map); writer.setRootTagName("customers"); writer.setMarshaller(marshaller); String path = "F:\\test/cus.xml"; writer.setResource(new FileSystemResource(path)); try { writer.afterPropertiesSet(); } catch (Exception e) { e.printStackTrace(); } return writer; }}
第五节 数据输出到多个文件
CompositeltemWriter
CassifireCompositeltemWriter 根据分类写入不同文件
代码:
@Configuration@EnableBatchProcessingpublic class MultiFileItemWriterDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("dbJdbcReader") private ItemReader<? extends Customer> dbJdbcReader; @Autowired @Qualifier("multiFileItemWriter") private ItemWriter<? super Customer> multiFileItemWriter; /** * @Description: multiFileItemWriterDemoJob * @Param: [] * @Return: org.springframework.batch.core.Job */ @Bean public Job multiFileItemWriterDemoJob() { return jobBuilderFactory.get("multiFileItemWriterDemoJob") .start(multiFileItemWriterDemoStep()) .listener(new MyJobListener()) .build(); } /** * @Description: multiFileItemWriterDemoStep * @Param: [] * @Return: org.springframework.batch.core.Step */ @Bean public Step multiFileItemWriterDemoStep() { return stepBuilderFactory.get("multiFileItemWriterDemoStep") .<Customer,Customer>chunk(5) .reader(dbJdbcReader) .writer(multiFileItemWriter) .build(); }}
只是给多个文件传递数据,不分类:
@Configurationpublic class MultiFIleWriterConfig { /** * @Description: fileWriter 输出数据到txt文件 * @Param: [] * @Return: org.springframework.batch.item.file.FlatFileItemWriter */ @Bean public FlatFileItemWriter<Customer> fileWriter(){ //把Customer对象转成字符串输出到文件 FlatFileItemWriter<Customer> writer = new FlatFileItemWriter<Customer>(); String path="F:\\test/customer.txt"; writer.setResource(new FileSystemResource(path)); //把Customer对象转成字符串 writer.setLineAggregator(new LineAggregator<Customer>() { ObjectMapper mapper = new ObjectMapper(); @Override public String aggregate(Customer customer) { String str = null; try { str=mapper.writeValueAsString(customer); } catch (JsonProcessingException e) { e.printStackTrace(); } return str; } }); try { writer.afterPropertiesSet(); } catch (Exception e) { e.printStackTrace(); } return writer; } /** * @Description: xmlWriter 输出数据到xml文件 * @Param: [] * @Return: org.springframework.batch.item.xml.StaxEventItemWriter */ @Bean public StaxEventItemWriter<Customer> xmlWriter(){ StaxEventItemWriter writer = new StaxEventItemWriter<Customer>(); XStreamMarshaller marshaller = new XStreamMarshaller(); //告诉marshaller把数据转成什么类型 Map<String,Class> map = new HashMap<>(); map.put("customer",Customer.class); marshaller.setAliases(map); writer.setRootTagName("customers"); writer.setMarshaller(marshaller); String path = "F:\\test/cus.xml"; writer.setResource(new FileSystemResource(path)); try { writer.afterPropertiesSet(); } catch (Exception e) { e.printStackTrace(); } return writer; } /** * @Description: multiFileItemWriter 调用输出到单个文件操作来实现输出数据到多个文件 * @Param: [] * @Return: org.springframework.batch.item.support.CompositeItemWriter */ @Bean public CompositeItemWriter<Customer> multiFileItemWriter(){ CompositeItemWriter<Customer> writer = new CompositeItemWriter<Customer>(); //输出到两个不同的文件中 writer.setDelegates(Arrays.asList(fileWriter(),xmlWriter())); try { writer.afterPropertiesSet(); } catch (Exception e) { e.printStackTrace(); } return writer; }}
示例代码:CassifireCompositeltemWriter 根据分类写入不同文件
@Configuration@EnableBatchProcessingpublic class MultiFileItemWriterDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("dbJdbcReader") private ItemReader<? extends Customer> dbJdbcReader; @Autowired @Qualifier("multiFileItemWriter") private ItemWriter<? super Customer> multiFileItemWriter; //分类传数据给文件的ClassifierCompositeItemWriter没有实现ItemStream @Autowired @Qualifier("jsonFileWriter") private ItemStreamWriter<? extends Customer> jsonFileWriter; @Autowired @Qualifier("xmlWriter") private ItemStreamWriter<? extends Customer> xmlWriter; /** * @Description: multiFileItemWriterDemoJob * @Param: [] * @Return: org.springframework.batch.core.Job */ @Bean public Job multiFileItemWriterDemoJob2() { return jobBuilderFactory.get("multiFileItemWriterDemoJob2") .start(multiFileItemWriterDemoStep()) .listener(new MyJobListener()) .build(); } /** * @Description: multiFileItemWriterDemoStep * @Param: [] * @Return: org.springframework.batch.core.Step */ @Bean public Step multiFileItemWriterDemoStep() { return stepBuilderFactory.get("multiFileItemWriterDemoStep") .<Customer,Customer>chunk(5) .reader(dbJdbcReader) .writer(multiFileItemWriter) //ClassifierCompositeItemWriter没有实现ItemStream .stream(jsonFileWriter) .stream(xmlWriter) .build(); }}
分类输出:
@Configurationpublic class MultiFIleWriterConfig { /** * @Description: fileWriter 输出数据到txt文件 * @Param: [] * @Return: org.springframework.batch.item.file.FlatFileItemWriter */ @Bean public FlatFileItemWriter<Customer> jsonFileWriter(){ //把Customer对象转成字符串输出到文件 FlatFileItemWriter<Customer> writer = new FlatFileItemWriter<Customer>(); String path="F:\\test/customer.txt"; writer.setResource(new FileSystemResource(path)); //把Customer对象转成字符串 writer.setLineAggregator(new LineAggregator<Customer>() { ObjectMapper mapper = new ObjectMapper(); @Override public String aggregate(Customer customer) { String str = null; try { str=mapper.writeValueAsString(customer); } catch (JsonProcessingException e) { e.printStackTrace(); } return str; } }); try { writer.afterPropertiesSet(); } catch (Exception e) { e.printStackTrace(); } return writer; } /** * @Description: xmlWriter 输出数据到xml文件 * @Param: [] * @Return: org.springframework.batch.item.xml.StaxEventItemWriter */ @Bean public StaxEventItemWriter<Customer> xmlWriter(){ StaxEventItemWriter writer = new StaxEventItemWriter<Customer>(); XStreamMarshaller marshaller = new XStreamMarshaller(); //告诉marshaller把数据转成什么类型 Map<String,Class> map = new HashMap<>(); map.put("customer",Customer.class); marshaller.setAliases(map); writer.setRootTagName("customers"); writer.setMarshaller(marshaller); String path = "F:\\test/cus.xml"; writer.setResource(new FileSystemResource(path)); try { writer.afterPropertiesSet(); } catch (Exception e) { e.printStackTrace(); } return writer; } /** * @Description: multiFileItemWriter 调用输出到单个文件操作来实现输出数据到多个文件 * @Param: [] * @Return: org.springframework.batch.item.support.CompositeItemWriter */// @Bean// public CompositeItemWriter multiFileItemWriter(){// CompositeItemWriter writer = new CompositeItemWriter();// //输出到两个不同的文件中// writer.setDelegates(Arrays.asList(jsonFileWriter(),xmlWriter()));// try {// writer.afterPropertiesSet();// } catch (Exception e) {// e.printStackTrace();// }// return writer;// } /** * @Description: multiFileItemWriter 按照某种条件对数据进行分类存储不同文件 * 注意:ClassifierCompositeItemWriter没有实现ItemStream * @Param: [] * @Return: org.springframework.batch.item.support.ClassifierCompositeItemWriter */ @Bean public ClassifierCompositeItemWriter<Customer> multiFileItemWriter(){ ClassifierCompositeItemWriter<Customer> writer = new ClassifierCompositeItemWriter<Customer>(); writer.setClassifier(new Classifier<Customer, ItemWriter<? super Customer>>() { @Override /** * @Description: classify 分类,比如按照年龄分成两个文件 * @Param: [customer] * @Return: org.springframework.batch.item.ItemWriter */ public ItemWriter<? super Customer> classify(Customer customer) { //按照Customer的id进行分类 ItemWriter<Customer> writer1 = customer.getId()%2==0?jsonFileWriter():xmlWriter(); return writer1; } }); return writer; }}
第六节 ItemProcessor的使用
ItemProcessor用于处理业务逻辑,验证,过滤等功能
CompositeltemProcessor类,处理多种ItemProcessor处理方式
案例:从数据库中读取数据,然对数据进行处理,最后输出到普通文件
代码:
@Configuration@EnableBatchProcessingpublic class ItemProcessorDemo { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("dbJdbcReader") private ItemReader<? extends Customer> dbJdbcReader; @Autowired @Qualifier("fileItemWriter") private ItemWriter<? super Customer> fileItemWriter; @Autowired private ItemProcessor<Customer,Customer> firstNameUpperProcessor; @Autowired private ItemProcessor<Customer, Customer> idFilterProcessor; /** * @Description: itemProcessorDemoJob * @Param: [] * @Return: org.springframework.batch.core.Job */ @Bean public Job itemProcessorDemoJob() { return jobBuilderFactory.get("itemProcessorDemoJob") .start(itemProcessorDemoStep()) .listener(new MyJobListener()) .build(); } /** * @Description: itemProcessorDemoStep * @Param: [] * @Return: org.springframework.batch.core.Step */ @Bean public Step itemProcessorDemoStep() { return stepBuilderFactory.get("itemProcessorDemoStep") .<Customer, Customer>chunk(5) .reader(dbJdbcReader) //.processor(firstNameUpperProcessor) 只一种处理方式 .processor(process()) .writer(fileItemWriter) .build(); } /** * @Description: process 同时用多种方式处理方式 * @Param: [] * @Return: org.springframework.batch.item.support.CompositeItemProcessor */ @Bean public CompositeItemProcessor<Customer,Customer> process(){ CompositeItemProcessor<Customer,Customer> processor = new CompositeItemProcessor<Customer,Customer>(); List<ItemProcessor<Customer,Customer>> delegates = new ArrayList<>(); delegates.add(firstNameUpperProcessor); delegates.add(idFilterProcessor); processor.setDelegates(delegates); return processor; }}
ItemProcessor使用
@Componentpublic class FirstNameUpperProcessor implements ItemProcessor<Customer, Customer> { @Override public Customer process(Customer customer) throws Exception { Customer customer1 = new Customer(); customer1.setId(customer.getId()); customer1.setLastName(customer.getLastName()); customer1.setFirstName(customer.getFirstName().toUpperCase()); customer1.setBirthday(customer.getBirthday()); return customer1; }}
@Componentpublic class IdFilterProcessor implements ItemProcessor<Customer, Customer> { @Override public Customer process(Customer customer) throws Exception { if(customer.getId()%2==0) return customer; else return null; }}