远程分区介绍

批处理开发者指南向您展示了如何创建一个典型的单线程 Spring Batch 应用程序。虽然这对大多数批处理应用程序来说是一个很好的解决方案,但在某些情况下,批处理作业中的特定步骤可能需要相当长的时间才能完成所需的工作。Spring Batch 提供了一个解决方案,允许 批处理作业步骤 执行进行分区,以便每个分区处理一部分工作。简而言之,分区允许多个大型批处理应用程序实例并发运行。这样做的目的是减少处理长时间运行的批处理作业所需的总时间。可以成功分区处理的过程是那些可以拆分输入文件或可以对主数据库表进行分区以允许应用程序针对不同数据集运行的过程。

假设我们有一个三步作业

Batch Partitioning

作业在左侧作为一系列步骤实例运行。在本例中,我们有一个步骤(步骤 1),它是分区的管理器管理器步骤负责为每个工作器分配工作并启动它们。在本例中,工作器是启用了特定配置文件的 Spring Batch 应用程序的另一个实例。我们图表中的工作器实际上是部署到平台的 Spring Batch 应用程序的副本。JobRepository 中的 Spring Batch 元数据确保每个工作器对于每个作业执行只执行一次。

使用分区构建我们自己的批处理应用程序

在我们的示例应用程序中,我们创建一个批处理作业,该作业包含一个已分区的步骤,并且每个分区都打印其分区号。

您可以在Spring Cloud Task 示例中查看已完成的项目。

初始化器

要创建我们的批处理应用程序,我们需要访问 Spring Initializr 网站并创建一个项目,如下所示

  1. 访问Spring Initializr 网站
  2. 选择 spring boot 的最新2.7.x版本。
  3. 创建一个新的 Maven 项目,组名为io.spring.cloud,工件名为partition
  4. 在“**依赖项**”文本框中,键入task以选择 Cloud Task 依赖项。
  5. 在“**依赖项**”文本框中,键入jdbc,然后选择 JDBC API 依赖项。
  6. 在“**依赖项**”文本框中,键入h2,然后选择 H2 依赖项。

    1. 我们使用 H2 进行单元测试。
  7. 在“**依赖项**”文本框中,键入mariadb,然后选择 mariadb 依赖项(或您喜欢的数据库)。

    1. 我们使用 mariadb 作为运行时数据库。
  8. 在“**依赖项**”文本框中,键入batch,然后选择 Batch。
  9. 单击“**生成项目**”按钮。
  10. 下载partition.zip文件,将其解压缩,然后将项目导入您喜欢的 IDE。

设置 MariaDB

按照以下说明为此示例运行 MariaDB Docker 镜像

  1. 运行以下命令拉取 MariaDB Docker 镜像

    docker pull mariadb:10.4.22
  2. 运行以下命令启动 MariaDB

    docker run --name mariadb -d -p 3306:3306 -e MARIADB_ROOT_PASSWORD=password -e MARIADB_DATABASE=task mariadb:10.4.22

构建应用程序

构建应用程序:

  1. 在你喜欢的 IDE 中,创建 io.spring.cloud.partition.configuration 包。
  2. 使用你喜欢的 IDE 将以下依赖项添加到 pom.xml 中。
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-integration</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-deployer-local</artifactId>
			<version>2.9.3</version>
		</dependency>
  1. 创建一个 Java 配置,指定 Partition Job 所需的 bean。在本例中,创建一个 JobConfiguration 类(位于 io.spring.cloud.partition.configuration 中),其内容如下所示。

    @Configuration
    public class JobConfiguration {
    
        private static final int GRID_SIZE = 4;
        @Autowired
        public JobBuilderFactory jobBuilderFactory;
        @Autowired
        public StepBuilderFactory stepBuilderFactory;
        @Autowired
        public DataSource dataSource;
        @Autowired
        public JobRepository jobRepository;
        @Autowired
        private ConfigurableApplicationContext context;
        @Autowired
        private DelegatingResourceLoader resourceLoader;
        @Autowired
        private Environment environment;
    
        @Bean 
        public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer) throws Exception {
            Resource resource = this.resourceLoader
                .getResource("maven://io.spring.cloud:partition:0.0.1-SNAPSHOT");
    
            DeployerPartitionHandler partitionHandler =
                new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");
    
            List<String> commandLineArgs = new ArrayList<>(3);
            commandLineArgs.add("--spring.profiles.active=worker");
            commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
            commandLineArgs.add("--spring.batch.initializer.enabled=false");
            partitionHandler
                .setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
            partitionHandler
                .setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment));
            partitionHandler.setMaxWorkers(1);
            partitionHandler.setApplicationName("PartitionedBatchJobTask");
    
            return partitionHandler;
        }
    
        @Bean 
        @Profile("!worker")
        public Job partitionedJob(PartitionHandler partitionHandler) throws Exception {
            Random random = new Random();
            return this.jobBuilderFactory.get("partitionedJob" + random.nextInt())
                .start(step1(partitionHandler))
                .build();
        }
    
        @Bean 
        public Step step1(PartitionHandler partitionHandler) throws Exception {
            return this.stepBuilderFactory.get("step1")
                .partitioner(workerStep().getName(), partitioner())
                .step(workerStep())
                .partitionHandler(partitionHandler)
                .build();
        }
    
        @Bean   
        public Partitioner partitioner() {
            return new Partitioner() {
                @Override
                public Map<String, ExecutionContext> partition(int gridSize) {
    
                    Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);
    
                    for (int i = 0; i < GRID_SIZE; i++) {
                        ExecutionContext context1 = new ExecutionContext();
                        context1.put("partitionNumber", i);
    
                        partitions.put("partition" + i, context1);
                    }
    
                    return partitions;
                }
            };
        }
    
        @Bean   
        @Profile("worker")
        public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
            return new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
        }
    
        @Bean 
        public Step workerStep() {
            return this.stepBuilderFactory.get("workerStep")
                .tasklet(workerTasklet(null))
                .build();
        }
    
        @Bean 
        @StepScope
        public Tasklet workerTasklet(
            final @Value("#{stepExecutionContext['partitionNumber']}") Integer partitionNumber) {
    
            return new Tasklet() {
                @Override
                public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                    System.out.println("This tasklet ran partition: " + partitionNumber);
    
                    return RepeatStatus.FINISHED;
                }
            };
        }
    }
  2. 现在,我们可以将 @EnableTask@EnableBatchProcessing 注解添加到 PartitionApplication 类中,如下所示
@SpringBootApplication
@EnableTask  
@EnableBatchProcessing 
public class PartitionApplication {

	public static void main(String[] args) {
		SpringApplication.run(PartitionApplication.class, args);
	}
}
  • @EnableTask 注解设置一个 TaskRepository,用于存储有关任务执行的信息,例如任务的开始和结束时间及其退出代码。
  • @EnableBatchProcessing 注解启用 Spring Batch 功能,并提供用于设置批处理作业的基本配置。

部署

本节介绍如何部署批处理应用程序。

本地

本节介绍如何将批处理应用程序部署到本地计算机。

  1. 现在我们可以进行下一步,构建项目。在命令行中,将目录切换到项目所在的位置,并使用 Maven 构建项目:./mvnw clean install -DskipTests
  2. 现在,我们可以使用启动批处理应用程序所需的配置来执行应用程序。

    要配置批处理应用程序的执行,请将以下属性添加到您的环境中

export spring_batch_initializeSchema=always 
java -jar target/partition-0.0.1-SNAPSHOT.jar --spring.application.json='{"spring":{"datasource":{"url":"jdbc:mariadb://localhost:3306/task","username":"root","password":"password","driverClassName":"org.mariadb.jdbc.Driver"}}}'
  • spring.batch.initializeSchema:使用 Spring Batch 所需的表初始化数据库。在本例中,我们声明我们始终希望这样做。如果表已存在,则不会覆盖它们。

清理

要停止并移除在 Docker 实例中运行的 MariaDB 容器,请运行以下命令

docker stop mariadb
docker rm mariadb