Creating a Batch Service

This guide walks you through the process of creating a basic batch-driven solution.

What you’ll build

You’ll build a service that imports data from a CSV spreadsheet, transforms it with custom code, and stores the final results in a database.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.springframework</groupId>
    <artifactId>gs-batch-processing</artifactId>
    <version>0.1.0</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.3.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.hsqldb</groupId>
            <artifactId>hsqldb</artifactId>
        </dependency>
    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Business Data

Typically, your customer or a business analyst supplies a spreadsheet. In this case, you make it up.

src/main/resources/sample-data.csv

Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe

This spreadsheet contains a first name and a last name on each row, separated by a comma. This is a fairly common pattern that Spring handles out of the box, as you will see.

Next, you write a SQL script to create a table to store the data.

src/main/resources/schema-all.sql

DROP TABLE people IF EXISTS;

CREATE TABLE people  (
    person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
    first_name VARCHAR(20),
    last_name VARCHAR(20)
);

Spring Boot runs schema-@@platform@@.sql automatically during startup. The -all suffix is the default for all platforms.

Create a business class

Now that you have seen the format of the data inputs and outputs, you write code to represent a row of data.

src/main/java/hello/Person.java

package hello;

public class Person {

    private String lastName;
    private String firstName;

    public Person() {
    }

    public Person(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getFirstName() {
        return firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @Override
    public String toString() {
        return "firstName: " + firstName + ", lastName: " + lastName;
    }

}

You can instantiate the Person class either by passing the first and last name to the constructor, or by setting the properties afterwards.

Create an intermediate processor

A common paradigm in batch processing is to ingest data, transform it, and then pipe it out somewhere else. Here you write a simple transformer that converts the names to uppercase.

src/main/java/hello/PersonItemProcessor.java

package hello;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.item.ItemProcessor;

public class PersonItemProcessor implements ItemProcessor<Person, Person> {

    private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

    @Override
    public Person process(final Person person) throws Exception {
        final String firstName = person.getFirstName().toUpperCase();
        final String lastName = person.getLastName().toUpperCase();

        final Person transformedPerson = new Person(firstName, lastName);

        log.info("Converting (" + person + ") into (" + transformedPerson + ")");

        return transformedPerson;
    }

}

PersonItemProcessor implements Spring Batch’s ItemProcessor interface. This makes it easy to wire the code into a batch job that you define further down in this guide. According to the interface, you receive an incoming Person object and return a new Person whose names have been converted to uppercase.

There is no requirement that the input and output types be the same. In fact, after one source of data is read, sometimes the application’s data flow needs a different data type.
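To make the point about differing input and output types concrete, here is a minimal sketch in plain Java. It does not use Spring Batch: the Processor interface below is a hypothetical stand-in with the same shape as ItemProcessor, and FullNameProcessor and the reduced Person class are illustrative names that are not part of the guide.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TypeChangingProcessorSketch {

    // Hypothetical stand-in shaped like Spring Batch's ItemProcessor<I, O>,
    // declared here only so the sketch compiles without Spring on the classpath.
    public interface Processor<I, O> {
        O process(I item) throws Exception;
    }

    // Reduced copy of the guide's Person, limited to what the sketch needs.
    public static class Person {
        private final String firstName;
        private final String lastName;

        public Person(String firstName, String lastName) {
            this.firstName = firstName;
            this.lastName = lastName;
        }

        public String getFirstName() {
            return firstName;
        }

        public String getLastName() {
            return lastName;
        }
    }

    // Input type is Person, output type is String: the two do not have to match.
    public static class FullNameProcessor implements Processor<Person, String> {
        @Override
        public String process(Person person) {
            return person.getLastName().toUpperCase() + ", " + person.getFirstName();
        }
    }

    // Applies the processor to every item and collects the converted values.
    public static List<String> processAll(List<Person> people) {
        FullNameProcessor processor = new FullNameProcessor();
        List<String> labels = new ArrayList<>();
        for (Person person : people) {
            labels.add(processor.process(person));
        }
        return labels;
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(new Person("Jill", "Doe"), new Person("Joe", "Doe"));
        // Prints "DOE, Jill" and then "DOE, Joe", one per line.
        processAll(people).forEach(System.out::println);
    }
}
```

In a real job, the writer's item type would then have to match the processor's output type (String here) rather than Person.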

Put together a batch job

Now you put together the actual batch job. Spring Batch provides many utility classes that reduce the need to write custom code. Instead, you can focus on the business logic.

src/main/java/hello/BatchConfiguration.java

package hello;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    // tag::readerwriterprocessor[]
    @Bean
    public FlatFileItemReader<Person> reader() {
        return new FlatFileItemReaderBuilder<Person>()
            .name("personItemReader")
            .resource(new ClassPathResource("sample-data.csv"))
            .delimited()
            .names(new String[]{"firstName", "lastName"})
            .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                setTargetType(Person.class);
            }})
            .build();
    }

    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Person>()
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
            .dataSource(dataSource)
            .build();
    }
    // end::readerwriterprocessor[]

    // tag::jobstep[]
    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
        return jobBuilderFactory.get("importUserJob")
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(step1)
            .end()
            .build();
    }

    @Bean
    public Step step1(JdbcBatchItemWriter<Person> writer) {
        return stepBuilderFactory.get("step1")
            .<Person, Person> chunk(10)
            .reader(reader())
            .processor(processor())
            .writer(writer)
            .build();
    }
    // end::jobstep[]
}

For starters, the @EnableBatchProcessing annotation adds many critical beans that support jobs, which saves you a lot of legwork. This example uses an in-memory database (provided by @EnableBatchProcessing), meaning that when it’s done, the data is gone.

Break it down:

src/main/java/hello/BatchConfiguration.java

    @Bean
    public FlatFileItemReader<Person> reader() {
        return new FlatFileItemReaderBuilder<Person>()
            .name("personItemReader")
            .resource(new ClassPathResource("sample-data.csv"))
            .delimited()
            .names(new String[]{"firstName", "lastName"})
            .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                setTargetType(Person.class);
            }})
            .build();
    }

    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Person>()
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
            .dataSource(dataSource)
            .build();
    }

The first chunk of code defines the input, processor, and output.

- reader() creates an ItemReader. It looks for a file called sample-data.csv and parses each line item with enough information to turn it into a Person.
- processor() creates an instance of the PersonItemProcessor you defined earlier, meant to uppercase the data.
- writer(DataSource) creates an ItemWriter. This one is aimed at a JDBC destination and automatically gets a copy of the dataSource created by @EnableBatchProcessing. It includes the SQL statement needed to insert a single Person, driven by Java bean properties.
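As a rough illustration of what the reader does with each line, the following plain-Java sketch splits a comma-delimited line and assigns the tokens to the firstName and lastName properties by position. It is not the Spring Batch implementation: mapLine and mapLines are hypothetical helpers, and the sketch ignores concerns such as quoted fields that a real tokenizer has to deal with.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CsvLineMappingSketch {

    // Reduced copy of the guide's Person bean.
    public static class Person {
        private String firstName;
        private String lastName;

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }

        public void setLastName(String lastName) {
            this.lastName = lastName;
        }

        @Override
        public String toString() {
            return "firstName: " + firstName + ", lastName: " + lastName;
        }
    }

    // Splits one line on commas and maps the tokens by position,
    // mirroring the field names {"firstName", "lastName"} given to the reader.
    public static Person mapLine(String line) {
        String[] tokens = line.split(",", -1);
        if (tokens.length != 2) {
            throw new IllegalArgumentException(
                "Expected 2 fields but found " + tokens.length + ": " + line);
        }
        Person person = new Person();
        person.setFirstName(tokens[0].trim());
        person.setLastName(tokens[1].trim());
        return person;
    }

    // Maps every line in order, the way a reader hands back one item per line.
    public static List<Person> mapLines(List<String> lines) {
        List<Person> people = new ArrayList<>();
        for (String line : lines) {
            people.add(mapLine(line));
        }
        return people;
    }

    public static void main(String[] args) {
        // Prints "firstName: Jill, lastName: Doe" and then "firstName: Joe, lastName: Doe".
        mapLines(Arrays.asList("Jill,Doe", "Joe,Doe")).forEach(System.out::println);
    }
}
```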

The next chunk focuses on the actual job configuration.

src/main/java/hello/BatchConfiguration.java

    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
        return jobBuilderFactory.get("importUserJob")
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(step1)
            .end()
            .build();
    }

    @Bean
    public Step step1(JdbcBatchItemWriter<Person> writer) {
        return stepBuilderFactory.get("step1")
            .<Person, Person> chunk(10)
            .reader(reader())
            .processor(processor())
            .writer(writer)
            .build();
    }

The first method defines the job and the second one defines a single step. Jobs are built from steps, where each step can involve a reader, a processor, and a writer.

In this job definition, you need an incrementer because jobs use a database to maintain execution state. You then list each step; this job has only one. The job definition ends, and the Java API produces a fully configured job.
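The role of the incrementer can be sketched in plain Java. This is only a model under stated assumptions, not Spring Batch code: the job repository is reduced to an in-memory set of parameter maps, the "run.id" key and the start value of 1 are assumed to match what RunIdIncrementer does, and next and launch are hypothetical helpers. The idea is that a run whose parameters are identical to a completed run is rejected, so each launch needs a fresh identifying value.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class RunIdIncrementerSketch {

    // Assumed parameter key, chosen to resemble the one RunIdIncrementer uses.
    static final String RUN_ID_KEY = "run.id";

    // Hypothetical stand-in for the job repository: remembers parameter sets that already ran.
    private final Set<Map<String, Long>> completedRuns = new HashSet<>();

    // Copies the previous parameters and bumps run.id by one (starting at 1).
    public static Map<String, Long> next(Map<String, Long> previous) {
        Map<String, Long> next = new HashMap<>();
        if (previous != null) {
            next.putAll(previous);
        }
        long lastId = next.getOrDefault(RUN_ID_KEY, 0L);
        next.put(RUN_ID_KEY, lastId + 1);
        return next;
    }

    // Returns true if these parameters are new; false if an identical run is already recorded.
    public boolean launch(Map<String, Long> parameters) {
        return completedRuns.add(new HashMap<>(parameters));
    }

    public static void main(String[] args) {
        RunIdIncrementerSketch repository = new RunIdIncrementerSketch();
        Map<String, Long> first = next(null);
        System.out.println(repository.launch(first));        // true: first run with run.id=1
        System.out.println(repository.launch(first));        // false: same parameters again
        System.out.println(repository.launch(next(first)));  // true: run.id=2 is a new run
    }
}
```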

In the step definition, you define how much data to write at a time. In this case, it writes up to ten records at a time. Next, you configure the reader, processor, and writer using the beans defined earlier.

chunk() is prefixed with <Person,Person> because it’s a generic method. This represents the input and output types of each “chunk” of processing, and lines up with ItemReader<Person> and ItemWriter<Person>.
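The chunk size and the explicit type arguments can be illustrated with a small plain-Java sketch. It is a simplification rather than Spring Batch's actual step implementation: the generic run method is hypothetical, it interleaves reading and processing for brevity, and it leaves out transactions and error handling entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class ChunkProcessingSketch {

    // Reads items one at a time, processes each, and groups the results into chunks
    // of at most chunkSize. Each inner list stands for one call to the writer.
    public static <I, O> List<List<O>> run(Iterator<I> reader, Function<I, O> processor, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be at least 1");
        }
        List<List<O>> written = new ArrayList<>();
        List<O> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(processor.apply(reader.next()));
            if (chunk.size() == chunkSize) {
                written.add(chunk);          // chunk is full: "write" it
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {
            written.add(chunk);              // final, possibly smaller chunk
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Jill", "Joe", "Justin", "Jane", "John");
        // The explicit <String, String> plays the same role as <Person, Person> before chunk(10).
        List<List<String>> chunks =
            ChunkProcessingSketch.<String, String>run(names.iterator(), String::toUpperCase, 2);
        System.out.println(chunks); // [[JILL, JOE], [JUSTIN, JANE], [JOHN]]
    }
}
```

With the guide's chunk size of 10 and only five rows in sample-data.csv, the whole file would fit in a single chunk.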

src/main/java/hello/JobCompletionNotificationListener.java

package hello;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

    private final JdbcTemplate jdbcTemplate;

    @Autowired
    public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("!!! JOB FINISHED! Time to verify the results");

            jdbcTemplate.query("SELECT first_name, last_name FROM people",
                (rs, row) -> new Person(
                    rs.getString(1),
                    rs.getString(2))
            ).forEach(person -> log.info("Found <" + person + "> in the database."));
        }
    }
}

This code listens for when a job reaches BatchStatus.COMPLETED, and then uses JdbcTemplate to inspect the results.

Build an executable JAR

The job prints out a line for each person that gets transformed. After the job runs, you can also see the output from querying the database.

Converting (firstName: Jill, lastName: Doe) into (firstName: JILL, lastName: DOE)
Converting (firstName: Joe, lastName: Doe) into (firstName: JOE, lastName: DOE)
Converting (firstName: Justin, lastName: Doe) into (firstName: JUSTIN, lastName: DOE)
Converting (firstName: Jane, lastName: Doe) into (firstName: JANE, lastName: DOE)
Converting (firstName: John, lastName: Doe) into (firstName: JOHN, lastName: DOE)
Found <firstName: JILL, lastName: DOE> in the database.
Found <firstName: JOE, lastName: DOE> in the database.
Found <firstName: JUSTIN, lastName: DOE> in the database.
Found <firstName: JANE, lastName: DOE> in the database.
Found <firstName: JOHN, lastName: DOE> in the database.

Summary

Congratulations! You built a batch job that ingested data from a spreadsheet, processed it, and wrote it to a database.

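The translated portions are copyright of the YahaCode team. They are provided for study and research only; no organization or individual may use them for any commercial purpose without permission.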
