返回顶部
首页 > 资讯 > 后端开发 > Python >Spring Batch 如何自定义ItemReader
  • 418
分享到

Spring Batch 如何自定义ItemReader

2024-04-02 19:04:59 418人浏览 八月长安

Python 官方文档:入门教程 => 点击学习

摘要

目录spring Batch 自定义ItemReader创建自定义ItemReader配置ItemReader Bean小结一下Spring Batch 之 ItemReaderJd

Spring Batch 自定义ItemReader

Spring Batch支持各种数据输入源,如文件、数据库等。然而有时也会遇到一些默认不支持的数据源,这时我们则需要实现自己的数据源————自定义ItemReader。本文通过示例说明如何自定义ItemReader。

创建自定义ItemReader

创建自定义ItemReader需要下面两个步骤:

  • 创建一个实现ItemReader接口的类,并提供返回对象类型 T 作为类型参数。
  • 按照下面规则实现ItemReader接口的T read()方法

read()方法如果存在下一个对象则返回,否则返回null。

下面我们自定义ItemReader,其返回在线测试课程的学生信息StuDto类型,为了减少复杂性,该数据存储在内存中。StuDto类是一个简单数据传输对象,代码如下:


@Data
public class StuDTO {
    private String emailAddress;
    private String name;
    private String purchasedPackage;
}

下面参照一下步骤创建ItemReader:

  • 创建InMemoryStudentReader 类
  • 实现ItemReader接口,并设置返回对象类型为StuDto
  • 类中增加List studentData 字段,其包括参加课程的学生信息
  • 类中增加nextStudentIndex 字段,表示下一个StuDto对象的索引
  • 增加私有initialize()方法,初始化学生信息并设置索引值为0
  • 创建构造函数并调用initialize方法
  • 实现read()方法,包括下面规则:如果存在下一个学生,则返回StuDto对象并把索引加一。否则返回null。

InMemoryStudentReader 代码如下:


public class InMemoryStudentReader implements ItemReader<StuDto> { 
    private int nextStudentIndex;
    private List<StuDto> studentData; 
    InMemoryStudentReader() {
        initialize();
    }
 
    private void initialize() {
        StuDto tony = new StuDto();
        tony.setEmailAddress("tony.tester@gmail.com");
        tony.setName("Tony Tester");
        tony.setPurchasedPackage("master");
 
        StuDto nick = new StuDto();
        nick.setEmailAddress("nick.newbie@gmail.com");
        nick.setName("Nick Newbie");
        nick.setPurchasedPackage("starter");
 
        StuDto ian = new StuDto();
        ian.setEmailAddress("ian.intermediate@gmail.com");
        ian.setName("Ian Intermediate");
        ian.setPurchasedPackage("intermediate");
 
        studentData = Collections.unmodifiableList(Arrays.asList(tony, nick, ian));
        nextStudentIndex = 0;
    }
 
    @Override
    public StuDto read() throws Exception {
        StuDto nextStudent = null;
 
        if (nextStudentIndex < studentData.size()) {
            nextStudent = studentData.get(nextStudentIndex);
            nextStudentIndex++;
        } 
        return nextStudent;
    }
}

创建好自定义ItemReader后,需要配置其作为bean让Spring Batch Job使用。下面请看如何配置。

配置ItemReader Bean

配置类代码如下:


@Configuration
public class InMemoryStudentJobConfig { 
    @Bean
    ItemReader<StuDto> inMemoryStudentReader() {
        return new InMemoryStudentReader();
    }
}

需要增加@Configuration表明类为配置类, 增加方法返回ItemReader类型,并增加@Bean注解,实现方法内容————返回InMemoryStudentReader对象。

小结一下

本文通过示例说明如何自定义ItemReader,主要包括三个方面:

  • 自定义ItemReader需实现ItemReader接口
  • 实现ItemReader接口,需要指定返回类型作为类型参数(T)
  • 实现接口方法read,如果存在下一个对象则返回,反之返回null

Spring Batch 之 ItemReader

重点介绍 ItemReader,如何从不同数据源读取数据;以及异常处理及重启机制。

JdbcPagingItemReader

数据库中读取数据


@Configuration
public class DBJdbcDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
 
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
 
    @Autowired
    @Qualifier("dbJdbcDemoWriter")
    private ItemWriter<? super Customer> dbJdbcDemoWriter;
 
    @Autowired
    private DataSource dataSource;
 
    @Bean
    public Job DBJdbcDemoJob(){
        return jobBuilderFactory.get("DBJdbcDemoJob")
                .start(dbJdbcDemoStep())
                .build();
     }
 
    @Bean
    public Step dbJdbcDemoStep() {
        return stepBuilderFactory.get("dbJdbcDemoStep")
                .<Customer,Customer>chunk(100)
                .reader(dbJdbcDemoReader())
                .writer(dbJdbcDemoWriter)
                .build();
    }
 
    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> dbJdbcDemoReader() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
 
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(100); //批量读取
        reader.setRowMapper((rs,rowNum)->{
            return Customer.builder().id(rs.getLong("id"))
                    .firstName(rs.getString("firstName"))
                    .lastName(rs.getString("lastName"))
                    .birthdate(rs.getString("birthdate"))
                    .build();
 
        });
 
        MysqlPagingQueryProvider queryProvider = new mysqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from Customer");
        Map<String, Order> sorTKEys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys); 
        reader.setQueryProvider(queryProvider); 
        return reader; 
    }
}

Job 和 ItermWriter不是本文介绍重点,此处举例,下面例子相同


@Component("dbJdbcDemoWriter")
public class DbJdbcDemoWriter implements ItemWriter<Customer> {
    @Override
    public void write(List<? extends Customer> items) throws Exception {
        for (Customer customer:items)
            System.out.println(customer); 
    }
}

FlatFileItemReader

从CVS文件中读取数据


 
@Configuration
public class FlatFileDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory; 
    @Autowired
    private StepBuilderFactory stepBuilderFactory; 
    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter; 
    @Bean
    public Job flatFileDemoJob(){
        return jobBuilderFactory.get("flatFileDemoJob")
                .start(flatFileDemoStep())
                .build(); 
    }
 
    @Bean
    public Step flatFileDemoStep() {
        return stepBuilderFactory.get("flatFileDemoStep")
                .<Customer,Customer>chunk(100)
                .reader(flatFileDemoReader())
                .writer(flatFileDemoWriter)
                .build();
    }
 
    @Bean
    @StepScope
    public FlatFileItemReader<Customer> flatFileDemoReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("customer.csv"));
        reader.setLinesToSkip(1);
 
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id","firstName","lastName","birthdate"});
 
        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper((fieldSet -> {
            return Customer.builder().id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build();
        }));
        lineMapper.afterPropertiesSet(); 
        reader.setLineMapper(lineMapper); 
        return reader; 
    }
}

StaxEventItemReader

从XML文件中读取数据


@Configuration
public class XmlFileDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory; 
    @Autowired
    private StepBuilderFactory stepBuilderFactory; 
    @Autowired
    @Qualifier("xmlFileDemoWriter")
    private ItemWriter<? super Customer> xmlFileDemoWriter; 
    @Bean
    public Job xmlFileDemoJob(){
        return jobBuilderFactory.get("xmlFileDemoJob")
                .start(xmlFileDemoStep())
                .build(); 
    } 
    @Bean
    public Step xmlFileDemoStep() {
        return stepBuilderFactory.get("xmlFileDemoStep")
                .<Customer,Customer>chunk(10)
                .reader(xmlFileDemoReader())
                .writer(xmlFileDemoWriter)
                .build();
    } 
    @Bean
    @StepScope
    public StaxEventItemReader<Customer> xmlFileDemoReader() {
        StaxEventItemReader<Customer> reader = new StaxEventItemReader<>(); 
        reader.setResource(new ClassPathResource("customer.xml"));
        reader.setFragmentRootElementName("customer");  
        XStreamMarshaller unMarshaller = new XStreamMarshaller();
        Map<String,Class> map = new HashMap<>();
        map.put("customer",Customer.class);
        unMarshaller.setAliases(map);
        reader.setUnmarshaller(unMarshaller);  
        return reader; 
    }
}

MultiResourceItemReader

从多个文件读取数据


@Configuration
public class MultipleFileDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
 
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
 
    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;
 
    @Value("classpath*:/file*.csv")
    private Resource[] inputFiles;
 
    @Bean
    public Job multipleFileDemoJob(){
        return jobBuilderFactory.get("multipleFileDemoJob")
                .start(multipleFileDemoStep())
                .build(); 
    }
 
    @Bean
    public Step multipleFileDemoStep() {
        return stepBuilderFactory.get("multipleFileDemoStep")
                .<Customer,Customer>chunk(50)
                .reader(multipleResourceItemReader())
                .writer(flatFileDemoWriter)
                .build();
    }
 
    private MultiResourceItemReader<Customer> multipleResourceItemReader() { 
        MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>(); 
        reader.setDelegate(flatFileReader());
        reader.setResources(inputFiles); 
        return reader;
    }
 
    @Bean
    public FlatFileItemReader<Customer> flatFileReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("customer.csv"));
       // reader.setLinesToSkip(1);
 
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id","firstName","lastName","birthdate"});
 
        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper((fieldSet -> {
            return Customer.builder().id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build();
        }));
        lineMapper.afterPropertiesSet(); 
        reader.setLineMapper(lineMapper); 
        return reader; 
    }
}

异常处理及重启机制

对于chunk-oriented step,Spring Batch提供了管理状态的工具。如何在一个步骤中管理状态是通过ItemStream接口为开发人员提供访问权限保持状态的组件。这里提到的这个组件是ExecutionContext实际上它是键值对的映射。map存储特定步骤的状态。该ExecutionContext使重启步骤成为可能,因为状态在JobRepository中持久存在。

执行期间出现错误时,最后一个状态将更新为JobRepository。下次作业运行时,最后一个状态将用于填充ExecutionContext然后

可以继续从上次离开的地方开始运行。

检查ItemStream接口:

将在步骤开始时调用open()并执行ExecutionContext;

用DB填充值; update()将在每个步骤或事务结束时调用,更新ExecutionContext;

完成所有数据块后调用close();

下面我们构造个例子

准备个cvs文件,在第33条数据,添加一条错误名字信息 ;当读取到这条数据时,抛出异常终止程序。

ItemReader测试代码


 
@Component("restartDemoReader")
public class RestartDemoReader implements ItemStreamReader<Customer> {  
    private Long curLine = 0L;
    private boolean restart = false; 
    private FlatFileItemReader<Customer> reader = new FlatFileItemReader<>(); 
    private ExecutionContext executionContext;
    RestartDemoReader
    public () {
        
        reader.setResource(new ClassPathResource("restartDemo.csv")); 
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});
 
        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper((fieldSet -> {
            return Customer.builder().id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build();
        }));
        lineMapper.afterPropertiesSet(); 
        reader.setLineMapper(lineMapper);
    }
 
    @Override
    public Customer read() throws Exception, UnexpectedInputException, ParseException,
            NonTransientResourceException { 
        Customer customer = null; 
        this.curLine++;
        //如果是重启,则从上一步读取的行数继续往下执行
        if (restart) {
            reader.setLinesToSkip(this.curLine.intValue()-1);
            restart = false;
            System.out.println("Start reading from line: " + this.curLine);
        }
 
        reader.open(this.executionContext); 
        customer = reader.read();
        //当匹配到wrongName时,显示抛出异常,终止程序
        if (customer != null) {
            if (customer.getFirstName().equals("wrongName"))
                throw new RuntimeException("Something wrong. Customer id: " + customer.getId());
        } else {
            curLine--;
        }
        return customer;
    }
 
    
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        this.executionContext = executionContext;
        if (executionContext.containsKey("curLine")) {
            this.curLine = executionContext.getLong("curLine");
            this.restart = true;
        } else {
            this.curLine = 0L;
            executionContext.put("curLine", this.curLine.intValue());
        } 
    }
 
    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        System.out.println("update curLine: " + this.curLine);
        executionContext.put("curLine", this.curLine); 
    }
 
    @Override
    public void close() throws ItemStreamException { 
    }
}

Job配置

以10条记录为一个批次,进行读取


@Configuration
public class RestartDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
 
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
 
    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;
 
    @Autowired
    @Qualifier("restartDemoReader")
    private ItemReader<Customer> restartDemoReader;
 
    @Bean
    public Job restartDemoJob(){
        return jobBuilderFactory.get("restartDemoJob")
                .start(restartDemoStep())
                .build(); 
    }
 
    @Bean
    public Step restartDemoStep() {
        return stepBuilderFactory.get("restartDemoStep")
                .<Customer,Customer>chunk(10)
                .reader(restartDemoReader)
                .writer(flatFileDemoWriter)
                .build();
    }
}

当我们第一次执行时,程序在33行抛出异常异常,curline值是30;

这时,我们可以查询数据库 batch_step_excution表,发现curline值已经以 键值对形式,持久化进数据库(上文以10条数据为一个批次;故33条数据异常时,curline值为30)

接下来,我们更新wrongName,再次执行程序;

程序会执行open方法,判断数据库step中map是否存在curline,如果存在,则是重跑,即读取curline,从该批次开始往下继续执行;

以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。

--结束END--

本文标题: Spring Batch 如何自定义ItemReader

本文链接: https://lsjlt.com/news/134947.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • Spring Batch 如何自定义ItemReader
    目录Spring Batch 自定义ItemReader创建自定义ItemReader配置ItemReader Bean小结一下Spring Batch 之 ItemReaderJd...
    99+
    2024-04-02
  • Spring如何自定义注解
    这篇文章将为大家详细讲解有关Spring如何自定义注解,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。字段注解字段注解一般是用于校验字段是否满足要求,hibernate-validate依赖就提供了很多校验...
    99+
    2023-06-15
  • Spring Boot如何自定义starter
    这篇文章主要介绍Spring Boot如何自定义starter,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!一、简介SpringBoot 最强大的功能就是把我们常用的场景抽取成了一个个starter(场景启动器),我...
    99+
    2023-06-02
  • 如何在spring batch中使用Quart定时器
    如何在spring batch中使用Quart定时器?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。spring Batch是一个基于Spring的企业级批处理框架,它通过配...
    99+
    2023-05-31
    springbatch quart art
  • 如何使用Spring自定义Xml标签
    目录前言正文自定义NameSpaceHandler自定义schemaParserDecorator总结前言 在早期基于Xml配置的Spring Mvc项目中,我们往往会使用<...
    99+
    2024-04-02
  • spring拓展之如何定义自己的namespace
    目录spring拓展 定义自己的namespace1.查看源码认识spring是怎么加载xml配置的2.定义自己的namespacespring-namespace实现自定义标签类1...
    99+
    2024-04-02
  • Spring Data Jpa如何实现自定义方法
    这篇文章将为大家详细讲解有关Spring Data Jpa如何实现自定义方法,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。Spring Data Jpa 自定义方法的实现最近项目中用到...
    99+
    2023-06-22
  • spring boot如何实现自定义配置源
    这篇文章给大家分享的是有关spring boot如何实现自定义配置源的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。概述我们知道,在Spring boot中可以通过xml或者@ImportResource 来引入自...
    99+
    2023-05-30
    springboot
  • 详解Spring Boot中如何自定义SpringMVC配置
    目录前言一、SpringBoot 中 SpringMVC 配置概述二、WebMvcConfigurerAdapter 抽象类三、WebMvcConfigurer 接口四、WebMvc...
    99+
    2024-04-02
  • Spring Cloud Zuul如何实现自定义过滤器
    小编给大家分享一下Spring Cloud Zuul如何实现自定义过滤器,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!构建Zuul自定义过滤器,限制ip频繁请求自定义zuul过滤器其实很简单1. 首先pom文件得先引入zu...
    99+
    2023-06-14
  • 如何理解Spring自定义属性编辑器
    如何理解Spring自定义属性编辑器,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。Spring 自定义属性编辑器Spring DI注入的时候可以把普通属性注入进来,但是像D...
    99+
    2023-06-17
  • Spring Data MongoDB中如何实现自定义级联
    这篇文章给大家分享的是有关Spring Data MongoDB中如何实现自定义级联的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。前言Spring Data MongoDB 项目提供与MongoDB文档数据库的集...
    99+
    2023-05-30
    springdata mongodb
  • Spring开发中如何实现自定义标签
    今天就跟大家聊聊有关Spring开发中如何实现自定义标签,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。Spring框架是现在Java最流行的开源框架之一,并且Spring下的各种子项...
    99+
    2023-05-31
    spring 标签
  • 使用Spring AOP 如何实现自定义注解
    这期内容当中小编将会给大家带来有关使用Spring AOP 如何实现自定义注解,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。在Maven中加入以下以依赖:<!-- Spring AOP + Aspe...
    99+
    2023-05-31
    springaop 注解
  • spring data jpa如何使用自定义repository实现类
    目录spring data jpa使用自定义repository实现类创建MyJpaRepository实现类创建MyJpaRepositoryFactoryBean配置JPAJpa...
    99+
    2024-04-02
  • spring security 自定义Provider 如何实现多种认证
    目录security内部认证流程是这样的1、 Controller2、spring security3、调用匹配的provider内部认证逻辑4、UserDetailsService...
    99+
    2024-04-02
  • 在spring-boot项目中如何实现自定义filter
    在spring-boot项目中如何实现自定义filter?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。传统的javaEE增加Filter是在web.xml中配置...
    99+
    2023-05-31
    springboot 自定义 filter
  • Spring Boot下如何自定义Repository中的DAO方法
     环境配置介绍jdk 1.8, spring Boot 1.5.3.RELEASE, MySQL, Spring Data, JPA问题描述Spring Data提供了一套简单易用的DAO层抽象与封装,覆盖的CURD的基本功能,但...
    99+
    2023-05-31
    spring boot repository
  • 在Spring Boot项目中如何实现自定义PropertySourceLoader
    今天就跟大家聊聊有关在Spring Boot项目中如何实现自定义PropertySourceLoader,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。SpringBoot 的配置文件...
    99+
    2023-05-31
    propertysourceloader springboot ce
  • 如何使用Spring Batch批处理框架
    这篇文章主要讲解了“如何使用Spring Batch批处理框架”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“如何使用Spring Batch批处理框架”吧!1 前言Spring Batch是...
    99+
    2023-06-16
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作