远程分区介绍
批处理开发者指南向您展示了如何创建一个典型的单线程 Spring Batch 应用程序。虽然这对大多数批处理应用程序来说是一个很好的解决方案,但在某些情况下,批处理作业中的特定步骤可能需要相当长的时间才能完成所需的工作。Spring Batch 提供了一个解决方案,允许 批处理作业 对 步骤 执行进行分区,以便每个分区处理一部分工作。简而言之,分区允许多个大型批处理应用程序实例并发运行。这样做的目的是减少处理长时间运行的批处理作业所需的总时间。可以成功分区处理的过程是那些可以拆分输入文件或可以对主数据库表进行分区以允许应用程序针对不同数据集运行的过程。
假设我们有一个三步作业
作业在左侧作为一系列步骤实例运行。在本例中,我们有一个步骤(步骤 1
),它是分区的管理器
。管理器步骤
负责为每个工作器
分配工作并启动它们。在本例中,工作器
是启用了特定配置文件的 Spring Batch 应用程序的另一个实例。我们图表中的工作器实际上是部署到平台的 Spring Batch 应用程序的副本。JobRepository
中的 Spring Batch 元数据确保每个工作器对于每个作业执行只执行一次。
使用分区构建我们自己的批处理应用程序
在我们的示例应用程序中,我们创建一个批处理作业,该作业包含一个已分区的步骤,并且每个分区都打印其分区号。
您可以在Spring Cloud Task 示例中查看已完成的项目。
初始化器
要创建我们的批处理应用程序,我们需要访问 Spring Initializr 网站并创建一个项目,如下所示
- 访问Spring Initializr 网站。
- 选择 spring boot 的最新
2.7.x
版本。 - 创建一个新的 Maven 项目,组名为
io.spring.cloud
,工件名为partition
。 - 在“**依赖项**”文本框中,键入
task
以选择 Cloud Task 依赖项。 - 在“**依赖项**”文本框中,键入
jdbc
,然后选择 JDBC API 依赖项。 -
在“**依赖项**”文本框中,键入
h2
,然后选择 H2 依赖项。- 我们使用 H2 进行单元测试。
-
在“**依赖项**”文本框中,键入
mariadb
,然后选择 mariadb 依赖项(或您喜欢的数据库)。- 我们使用 mariadb 作为运行时数据库。
- 在“**依赖项**”文本框中,键入
batch
,然后选择 Batch。 - 单击“**生成项目**”按钮。
- 下载
partition.zip
文件,将其解压缩,然后将项目导入您喜欢的 IDE。
设置 MariaDB
按照以下说明为此示例运行 MariaDB Docker 镜像
-
运行以下命令拉取 MariaDB Docker 镜像
docker pull mariadb:10.4.22
-
运行以下命令启动 MariaDB
docker run --name mariadb -d -p 3306:3306 -e MARIADB_ROOT_PASSWORD=password -e MARIADB_DATABASE=task mariadb:10.4.22
构建应用程序
构建应用程序:
- 在你喜欢的 IDE 中,创建
io.spring.cloud.partition.configuration
包。 - 使用你喜欢的 IDE 将以下依赖项添加到 pom.xml 中。
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-deployer-local</artifactId>
<version>2.9.3</version>
</dependency>
-
创建一个 Java 配置,指定 Partition
Job
所需的 bean。在本例中,创建一个JobConfiguration
类(位于io.spring.cloud.partition.configuration
中),其内容如下所示。@Configuration public class JobConfiguration { private static final int GRID_SIZE = 4; @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Autowired public DataSource dataSource; @Autowired public JobRepository jobRepository; @Autowired private ConfigurableApplicationContext context; @Autowired private DelegatingResourceLoader resourceLoader; @Autowired private Environment environment; @Bean public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer) throws Exception { Resource resource = this.resourceLoader .getResource("maven://io.spring.cloud:partition:0.0.1-SNAPSHOT"); DeployerPartitionHandler partitionHandler = new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep"); List<String> commandLineArgs = new ArrayList<>(3); commandLineArgs.add("--spring.profiles.active=worker"); commandLineArgs.add("--spring.cloud.task.initialize.enable=false"); commandLineArgs.add("--spring.batch.initializer.enabled=false"); partitionHandler .setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs)); partitionHandler .setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment)); partitionHandler.setMaxWorkers(1); partitionHandler.setApplicationName("PartitionedBatchJobTask"); return partitionHandler; } @Bean @Profile("!worker") public Job partitionedJob(PartitionHandler partitionHandler) throws Exception { Random random = new Random(); return this.jobBuilderFactory.get("partitionedJob" + random.nextInt()) .start(step1(partitionHandler)) .build(); } @Bean public Step step1(PartitionHandler partitionHandler) throws Exception { return this.stepBuilderFactory.get("step1") .partitioner(workerStep().getName(), partitioner()) .step(workerStep()) .partitionHandler(partitionHandler) .build(); } @Bean public Partitioner partitioner() { return new Partitioner() { @Override public Map<String, ExecutionContext> partition(int gridSize) { Map<String, ExecutionContext> partitions = new HashMap<>(gridSize); for (int i = 0; i < GRID_SIZE; i++) { ExecutionContext context1 = new ExecutionContext(); context1.put("partitionNumber", i); partitions.put("partition" + i, context1); } return partitions; } }; } @Bean @Profile("worker") public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) { return new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository); } @Bean public Step workerStep() { return this.stepBuilderFactory.get("workerStep") .tasklet(workerTasklet(null)) .build(); } @Bean @StepScope public Tasklet workerTasklet( final @Value("#{stepExecutionContext['partitionNumber']}") Integer partitionNumber) { return new Tasklet() { @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { System.out.println("This tasklet ran partition: " + partitionNumber); return RepeatStatus.FINISHED; } }; } }
-
PartitionHandler
是知道如何对Step
进行分区的组件。它向远程步骤发送StepExecution
请求。 -
Job
管理批处理过程。 - 管理器使用此
Step
来启动工作步骤 -
Partitioner
生成执行上下文作为新步骤执行的输入参数。 -
DeployerStepExecutionHandler
使用 Spring Cloud Deployer 在云平台上启动工作步骤执行。 - 工作器使用此
Step
来执行Tasklet
。 -
Tasklet
执行分区工作集的业务逻辑 - 在本例中,打印分区号。
-
- 现在,我们可以将
@EnableTask
和@EnableBatchProcessing
注解添加到PartitionApplication
类中,如下所示
@SpringBootApplication
@EnableTask
@EnableBatchProcessing
public class PartitionApplication {
public static void main(String[] args) {
SpringApplication.run(PartitionApplication.class, args);
}
}
-
@EnableTask
注解设置一个TaskRepository
,用于存储有关任务执行的信息,例如任务的开始和结束时间及其退出代码。 -
@EnableBatchProcessing
注解启用 Spring Batch 功能,并提供用于设置批处理作业的基本配置。
部署
本节介绍如何部署批处理应用程序。
本地
本节介绍如何将批处理应用程序部署到本地计算机。
- 现在我们可以进行下一步,构建项目。在命令行中,将目录切换到项目所在的位置,并使用 Maven 构建项目:
./mvnw clean install -DskipTests
。 -
现在,我们可以使用启动批处理应用程序所需的配置来执行应用程序。
要配置批处理应用程序的执行,请将以下属性添加到您的环境中
export spring_batch_initializeSchema=always
java -jar target/partition-0.0.1-SNAPSHOT.jar --spring.application.json='{"spring":{"datasource":{"url":"jdbc:mariadb://localhost:3306/task","username":"root","password":"password","driverClassName":"org.mariadb.jdbc.Driver"}}}'
-
spring.batch.initializeSchema
:使用 Spring Batch 所需的表初始化数据库。在本例中,我们声明我们始终
希望这样做。如果表已存在,则不会覆盖它们。
清理
要停止并移除在 Docker 实例中运行的 MariaDB 容器,请运行以下命令
docker stop mariadb
docker rm mariadb