Spring Batch轻量级批处理框架实战
作者:Mr.Ymx 时间:2023-01-08 00:24:23
1 实战前的理论基础
1.1 Spring Batch是什么
Spring Batch 是一个轻量级、全面的批处理框架,旨在支持开发对企业系统日常运营至关重要的强大的批处理应用程序。同时使开发人员在必要时可以轻松访问和利用更先进的企业服务。Spring Batch 不是调度框架,它旨在与调度程序一起工作,而不是取代调度程序。
1.2 Spring Batch能做什么
自动化、复杂的大量信息处理,无需用户交互即可最有效地处理。这些操作通常包括基于时间的事件(例如月末计算、通知或通信)。
定期应用在非常大的数据集上重复处理的复杂业务规则(例如,保险福利确定或费率调整)。
将从内部和外部系统接收的信息集成到记录系统中,这些信息通常需要以事务方式进行格式化、验证和处理。批处理用于每天为企业处理数十亿笔交易。
业务场景:
定期提交批处理
并发批处理:作业的并行处理
分阶段的、企业消息驱动的处理
大规模并行批处理
失败后手动或计划重启
依赖步骤的顺序处理(扩展到工作流驱动的批处理)
部分处理:跳过记录(例如,在回滚时)
整批事务,适用于小批量或现有存储过程/脚本的情况
总之Spring batch可以做的:
从数据库、文件或队列中读取大量记录。
以某种方式处理数据。
以修改后的形式写回数据。
1.3 基础架构
1.4 核心概念和抽象
核心概念:一个 Job 有一对多的Step,每个步骤都正好有一个 ItemReader
、一个ItemProcessor
和 一个ItemWriter
。需要启动作业(使用 JobLauncher
),并且需要存储有关当前运行进程的元数据(在 中 JobRepository
)。
2 各个组件介绍
2.1 Job
Job
是封装了整个批处理过程的实体。与其他 Spring 项目一样,一个Job
与 XML 配置文件或基于 Java 的配置连接在一起。这种配置可以被称为“作业配置”。
可配置项:
作业的简单名称。
Step
实例的定义和排序。作业是否可重新启动。
2.2 Step
一个Step
是一个域对象,它封装了批处理作业的一个独立的、连续的阶段。因此,每个 Job 完全由一个或多个步骤组成。一个Step
包含定义和控制实际批处理所需的所有信息。
一个StepExecution
代表一次尝试执行一个Step
。StepExecution
每次Step
运行时都会创建一个新的,类似于JobExecution
。
2.3 ExecutionContext
一个ExecutionContext
表示由框架持久化和控制的键/值对的集合,以允许开发人员有一个地方来存储范围为StepExecution
对象或JobExecution
对象的持久状态。
2.4 JobRepository
JobRepository
是上述所有 Stereotypes 的持久性机制。它提供了CRUD操作JobLauncher
,Job
以及Step
实现。当 Job
第一次启动,一个JobExecution
被从库中获得,并且,执行的过程中,StepExecution
和JobExecution
实施方式是通过将它们传递到存储库持续。
使用 Java 配置时,@EnableBatchProcessing
注解提供了一个 JobRepository
作为开箱即用自动配置的组件之一。
2.5 JobLauncher
JobLauncher
表示一个简单的接口,用于Job
使用给定的 集合 启动JobParameters
,如以下示例所示:
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
期望实现JobExecution
从 中 获得有效JobRepository
并执行Job
。
2.6 Item Reader
ItemReader
是一种抽象,表示一次检索Step
一个项目的输入。当ItemReader
用完它可以提供的项目时,它通过返回来表明这一点null
。
2.7 Item Writer
ItemWriter
是一种抽象,表示一次一个Step
、一批或一大块项目的输出。通常, anItemWriter
不知道它接下来应该接收的输入,并且只知道在其当前调用中传递的项目。
2.8 Item Processor
ItemProcessor
是表示项目的业务处理的抽象。当ItemReader
读取一个项目并ItemWriter
写入它们时,它 ItemProcessor
提供了一个访问点来转换或应用其他业务处理。如果在处理该项目时确定该项目无效,则返回 null
表示不应写出该项目。
3 Spring Batch实战
下面就利用我们所学的理论实现一个最简单的Spring Batch批处理项目
3.1 依赖和项目结构以及配置文件
依赖
<!--Spring batch-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- web依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<!-- mysql-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!-- mybatis-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.2.0</version>
</dependency>
项目结构
配置文件
server.port=9000
spring.datasource.url=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=12345
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
3.2 代码和数据表
数据表
CREATE TABLE `student` (
`id` int(100) NOT NULL AUTO_INCREMENT,
`name` varchar(45) DEFAULT NULL,
`age` int(2) DEFAULT NULL,
`address` varchar(45) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `id_UNIQUE` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=203579 DEFAULT CHARSET=utf8 ROW_FORMAT=REDUNDANT
Student实体类
/**
* @desc: Student实体类
* @author: YanMingXin
* @create: 2021/10/15-12:17
**/
@Data
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
@ToString
@TableName("student")
public class Student {
@TableId(value = "id", type = IdType.AUTO)
private Long sId;
@TableField("name")
private String sName;
@TableField("age")
private Integer sAge;
@TableField("address")
private String sAddress;
}
Mapper层
/**
* @desc: Mapper层
* @author: YanMingXin
* @create: 2021/10/15-12:17
**/
@Mapper
@Repository
public interface StudentDao extends BaseMapper<Student> {
}
模拟数据库(文件)中读取类
/**
* @desc: 模拟数据库中读取
* @author: YanMingXin
* @create: 2021/10/16-10:13
**/
public class StudentVirtualDao {
/**
* 模拟从数据库中读取
*
* @return
*/
public List<Student> getStudents() {
ArrayList<Student> students = new ArrayList<>();
students.add(new Student(1L, "zs", 23, "Beijing"));
students.add(new Student(2L, "ls", 23, "Beijing"));
students.add(new Student(3L, "ww", 23, "Beijing"));
students.add(new Student(4L, "zl", 23, "Beijing"));
students.add(new Student(5L, "mq", 23, "Beijing"));
students.add(new Student(6L, "gb", 23, "Beijing"));
students.add(new Student(7L, "lj", 23, "Beijing"));
students.add(new Student(8L, "ss", 23, "Beijing"));
students.add(new Student(9L, "zsdd", 23, "Beijing"));
students.add(new Student(10L, "zss", 23, "Beijing"));
return students;
}
}
Service层接口
/**
* @desc:
* @author: YanMingXin
* @create: 2021/10/15-12:16
**/
public interface StudentService {
List<Student> selectStudentsFromDB();
void insertStudent(Student student);
}
Service层实现类
/**
* @desc: Service层实现类
* @author: YanMingXin
* @create: 2021/10/15-12:16
**/
@Service
public class StudentServiceImpl implements StudentService {
@Autowired
private StudentDao studentDao;
@Override
public List<Student> selectStudentsFromDB() {
return studentDao.selectList(null);
}
@Override
public void insertStudent(Student student) {
studentDao.insert(student);
}
}
最核心的配置类BatchConfiguration
/**
* @desc: BatchConfiguration
* @author: YanMingXin
* @create: 2021/10/15-12:25
**/
@Configuration
@EnableBatchProcessing
@SuppressWarnings("all")
public class BatchConfiguration {
/**
* 注入JobBuilderFactory
*/
@Autowired
public JobBuilderFactory jobBuilderFactory;
/**
* 注入StepBuilderFactory
*/
@Autowired
public StepBuilderFactory stepBuilderFactory;
/**
* 注入JobRepository
*/
@Autowired
public JobRepository jobRepository;
/**
* 注入JobLauncher
*/
@Autowired
private JobLauncher jobLauncher;
/**
* 注入自定义StudentService
*/
@Autowired
private StudentService studentService;
/**
* 注入自定义job
*/
@Autowired
private Job studentJob;
/**
* 封装writer bean
*
* @return
*/
@Bean
public ItemWriter<Student> writer() {
ItemWriter<Student> writer = new ItemWriter() {
@Override
public void write(List list) throws Exception {
//debug发现是嵌套的List reader的线程List嵌套真正的List
list.forEach((stu) -> {
for (Student student : (ArrayList<Student>) stu) {
studentService.insertStudent(student);
}
});
}
};
return writer;
}
/**
* 封装reader bean
*
* @return
*/
@Bean
public ItemReader<Student> reader() {
ItemReader<Student> reader = new ItemReader() {
@Override
public Object read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
//模拟数据获取
StudentVirtualDao virtualDao = new StudentVirtualDao();
return virtualDao.getStudents();
}
};
return reader;
}
/**
* 封装processor bean
*
* @return
*/
@Bean
public ItemProcessor processor() {
ItemProcessor processor = new ItemProcessor() {
@Override
public Object process(Object o) throws Exception {
//debug发现o就是reader单次单线程读取的数据
return o;
}
};
return processor;
}
/**
* 封装自定义step
*
* @return
*/
@Bean
public Step studentStepOne() {
return stepBuilderFactory.get("studentStepOne")
.chunk(1)
.reader(reader()) //加入reader
.processor(processor()) //加入processor
.writer(writer())//加入writer
.build();
}
/**
* 封装自定义job
*
* @return
*/
@Bean
public Job studentJob() {
return jobBuilderFactory.get("studentJob")
.flow(studentStepOne())//加入step
.end()
.build();
}
/**
* 使用spring 定时任务执行
*/
@Scheduled(fixedRate = 5000)
public void printMessage() {
try {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(studentJob, jobParameters);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.3 测试
项目启动1s之后
看数据库,除了我们实体类定义的表以外多出来这么多表,这些表都是spring batch自带的记录日志和错误的表,具体的字段含义的有待研究
4 实战后的总结
Spring Batch有非常快的写入和读取速度,但是带来的影响就是非常耗费内存和数据库连接池的资源如果使用不好的话还会发生异常,因此我们要进行正确的配置,接下来我们进行简单的源码探究:
4.1 JobBuilderFactory
job的获取使用了简单工厂模式和建造者模式JobBuilderFactory获取JobBuilder在经过配置返回一个job对象的实例,该实例就是Spring Batch中最顶级的组件,包含了n和step
public class JobBuilderFactory {
private JobRepository jobRepository;
public JobBuilderFactory(JobRepository jobRepository) {
this.jobRepository = jobRepository;
}
//返回JobBuilder
public JobBuilder get(String name) {
JobBuilder builder = new JobBuilder(name).repository(jobRepository);
return builder;
}
}
jobBuilder类
public class JobBuilder extends JobBuilderHelper<JobBuilder> {
/**
* 为指定名称的作业创建一个新的构建器
*/
public JobBuilder(String name) {
super(name);
}
/**
* 创建将执行步骤或步骤序列的新作业构建器。
*/
public SimpleJobBuilder start(Step step) {
return new SimpleJobBuilder(this).start(step);
}
/**
* 创建将执行流的新作业构建器。
*/
public JobFlowBuilder start(Flow flow) {
return new FlowJobBuilder(this).start(flow);
}
/**
* 创建将执行步骤或步骤序列的新作业构建器
*/
public JobFlowBuilder flow(Step step) {
return new FlowJobBuilder(this).start(step);
}
}
4.2 StepBuilderFactory
直接看StepBuilder类
public class StepBuilder extends StepBuilderHelper<StepBuilder> {
public StepBuilder(String name) {
super(name);
}
/**
* 用自定义微线程构建步骤,不一定是项处理。
*/
public TaskletStepBuilder tasklet(Tasklet tasklet) {
return new TaskletStepBuilder(this).tasklet(tasklet);
}
/**
* 构建一个步骤,按照提供的大小以块的形式处理项。为了将这一步扩展到容错,
* 在构建器上调用SimpleStepBuilder的 faultolerant()方法。
* @param <I> 输入类型
* @param <O> 输出类型
*/
public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize) {
return new SimpleStepBuilder<I, O>(this).chunk(chunkSize);
}
public <I, O> SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) {
return new SimpleStepBuilder<I, O>(this).chunk(completionPolicy);
}
public PartitionStepBuilder partitioner(String stepName, Partitioner partitioner) {
return new PartitionStepBuilder(this).partitioner(stepName, partitioner);
}
public PartitionStepBuilder partitioner(Step step) {
return new PartitionStepBuilder(this).step(step);
}
public JobStepBuilder job(Job job) {
return new JobStepBuilder(this).job(job);
}
/**
* 创建将执行流的新步骤构建器。
*/
public FlowStepBuilder flow(Flow flow) {
return new FlowStepBuilder(this).flow(flow);
}
}
参考文档:
https://docs.spring.io/spring-batch/docs/4.3.x/reference/html/index.html
https://www.jdon.com/springbatch.html
来源:https://blog.csdn.net/Mr_YanMingXin/article/details/120798271