Spring Batch入门篇

时间:2022-04-26
本文章向大家介绍Spring Batch入门篇,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Spring Batch,一个很多人还觉得陌生的框架,它是Spring Cloud Task的基础,主要用来实现批量任务的处理。该框架在国内的使用非常少,所以一直以来对于该框架在中文资料也一直都非常欠缺。

因此,在这里向大家推荐一位愿意将与我们分享Spring Batch技术细节的开源爱好者,也是我们spring4all.com社区的Spring Batch专题版主:杨小强童鞋!

下面我们就跟着他的系列文章一步步的了解Spring Batch的技术细节。

简介

SpringBatch 是一个大数据量的并行处理框架。通常用于数据的离线迁移,和数据处理,⽀持事务、并发、流程、监控、纵向和横向扩展,提供统⼀的接⼝管理和任务管理;SpringBatch是SpringSource和埃森哲为了统一业界并行处理标准为广大开发者提供方便开发的一套框架。

官方地址:github.com/spring-projects/spring-batch

  • SpringBatch 本身提供了重试,异常处理,跳过,重启、任务处理统计,资源管理等特性,这些特性开发者看重他的主要原因;
  • SpringBatch 是一个轻量级的批处理框架;
  • SpringBatch 结构分层,业务与处理策略、结构分离;
  • 任务的运行的实例状态,执行数据,参数都会落地到数据库;

快速入门

pom.xml 添加

<dependency>    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId></dependency>

创建BatchConfig(可以是其他类名)

@Configuration
@EnableBatchProcessingpublic class BatchConfig {    // tag::readerwriterprocessor[]
   @Bean        public FlatFileItemReader<Person> flatFileItemReader() {
     FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
     reader.setResource(new ClassPathResource("sample-data.csv"));
     FixedLengthTokenizer fixedLengthTokenizer = new FixedLengthTokenizer();
     reader.setLineMapper(new DefaultLineMapper<Person>() {{
         setLineTokenizer(new DelimitedLineTokenizer() {{
             setNames(new String[]{"firstName", "lastName"});
         }});
         setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
             setTargetType(Person.class);
         }});
     }});        return reader;
  }

  @Bean       public JdbcPagingItemReader<Person> jdbcPagingItemReader(DataSource dataSource) {
     JdbcPagingItemReader<Person> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource);
        reader.setFetchSize(100);

        reader.setQueryProvider(new MySqlPagingQueryProvider() {{
            setSelectClause("SELECT person_id,first_name,last_name");
            setFromClause("from people");
            setWhereClause("last_name=:lastName");
            setSortKeys(new HashMap<String, Order>() {{
                put("person_id", Order.ASCENDING);
            }});
        }});
        reader.setParameterValues(new HashMap<String, Object>() {{
            put("lastName", "DOE");
        }});
        reader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));        return reader;
    }

  @Bean       public JdbcBatchItemWriter<Person> jdbcBatchItemWriter(DataSource dataSource) {
    JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>();
    writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
    writer.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)");
    writer.setDataSource(dataSource);        return writer;
  }    
    // end::readerwriterprocessor[]

    // tag::jobstep[]
  @Bean      public Job importUserJob(JobBuilderFactory jobBuilderFactory, JobCompletionNotificationListener listener, Step step) {               return jobBuilderFactory.get("importUserJob")
          .incrementer(new RunIdIncrementer())
          .listener(listener)
          .start(step)
          .build();
  }

   @Bean       public Step step1(StepBuilderFactory stepBuilderFactory,                 ‍PersonItemProcessor processor,                 ItemWriter jdbcBatchItemWriter,                 ItemReader flatFileItemReader) { ‍              return stepBuilderFactory.get("step1")
                  .<Person, Person>chunk(10)
                .reader(flatFileItemReader)
                .processor(processor)
                .writer(jdbcBatchItemWriter)
                .build();
    }    // end::jobstep[]}

Spring Batch的分层架构

  • Insfrastructure 策略管理:包括任务的失败重试,异常处理,事务,skip,以及数据的输入输出(文本文件,DB,Message)
  • Core: springBatch 的核心,包括JobLauch,job,step等等
  • Application: 业务处理,创建任务,决定任务的执行方式(定时任务,手动触发等)

Spring Batch执行流程

敬请持续关注该系列文章