任务 & 任务计划 Java DSL

除了使用 shell 创建和启动任务之外,您还可以使用 spring-cloud-dataflow-rest-client 模块提供的基于 Java 的 DSL。用于 TaskTaskSchedule 的 Java DSL 提供了围绕 DataFlowTemplate 类的便捷包装器,支持以编程方式创建、启动和调度任务。

要开始使用,您需要将以下依赖项添加到您的项目中

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-dataflow-rest-client</artifactId>
	<version>2.11.3</version>
</dependency>

Java DSL 的核心类是 TaskBuilderTaskTaskScheduleTaskScheduleBuilderDataFlowTemplate。任务 DSL 还使用了一些 DataFlowTemplate 类,例如 TaskExecutionResourceTaskExecutionStatusJobExecutionResourceJobInstanceResource

入口点是 TaskTaskSchedule 上的 builder 方法,它接受一个 DataFlowTemplate 实例。

获取 DataFlowOperations 实例

TaskTaskSchedule 的 DSL 都需要一个有效的 DataFlowOperations 实例。Spring Cloud Data Flow 提供了 DataFlowTemplate 作为 DataFlowOperations 接口的实现。

要创建 DataFlowTemplate 的实例,您需要提供 Data Flow 服务器的 URI 位置。Spring Boot 也提供 DataFlowTemplate 的自动配置。您可以使用 DataFlowClientProperties 中的属性来配置与 Data Flow 服务器的连接。通常,您应该从 spring.cloud.dataflow.client.server-uri 属性开始

URI dataFlowUri = URI.create("https://127.0.0.1:9393");
DataFlowOperations dataflowOperations = new DataFlowTemplate(dataFlowUri);

任务 DSL 用法

您可以借助 TaskBuilder 类创建新的 Task 实例,该类从 Task.builder(dataFlowOperations) 方法返回。

请考虑以下示例,它创建了一个新的组合任务

dataflowOperations.appRegistryOperations().importFromResource(
                     "https://dataflow.springframework.org.cn/task-maven-latest", true);

Task task = Task.builder(dataflowOperations)
              .name("myComposedTask")
              .definition("a: timestamp && b:timestamp")
              .description("My Composed Task")
              .build();

build 方法返回一个 Task 定义实例,该实例表示已创建但尚未启动的组合任务。任务定义中使用的 timestamp 指的是在 DataFlow 中注册的任务应用程序名称。

要创建和启动您的任务,您需要确保首先在 Data Flow 服务器中注册了相应的应用程序,如批处理开发者指南 中所示。

尝试启动包含未知应用程序的任务会引发异常。您可以使用 DataFlowOperations 注册您的应用程序,如下所示

dataflowOperations.appRegistryOperations().importFromResource(
            "https://dataflow.springframework.org.cn/task-maven-latest", true);

您可以使用 TaskBuilder 按名称检索现有的 Task 实例,而不是创建新的 Task

Optional<Task> task = Task.builder(dataflowOperations).findByName("myComposedTask");

您还可以列出所有现有任务

List<Task> tasks = Task.builder(dataflowOperations).allTasks();

使用 Task 实例,您可以使用可用的方法来 启动销毁 任务。以下示例启动了该任务

long executionId = task.launch();

executionId 是已启动任务的唯一任务执行标识符。launch 方法被重载以接受启动属性的 java.util.Map<String, String> 和命令行参数的 java.util.List<String>

任务是异步运行的。如果您的用例要求您等待任务完成或其他任务状态,您可以使用 Java 并发实用程序或 Awaitility 库,如下所示

org.awaitility.Awaitility.await().until(
  () -> task.executionStatus(executionId) == TaskExecutionStatus.COMPLETE);

Task 实例提供了 executionStatusdestroystop 方法来控制和查询任务。

Collection<TaskExecutionResource> executions() 方法列出由任务启动的所有 TaskExecutionResource 实例。您可以使用 executionId 检索特定执行的 TaskExecutionResourceOptional<TaskExecutionResource> execution(long executionId))。

类似地,Collection<JobExecutionResource> jobExecutionResources()Collection<JobInstanceResource> jobInstanceResources() 将允许您在任务使用 Spring Batch 作业时对其进行内省。

任务计划 DSL 用法

考虑以下示例,该示例创建一个新任务并对其进行调度

Task task = Task.builder(dataflowOperations)
              .name("myTask")
              .definition("timestamp")
              .description("simple task")
              .build();

TaskSchedule schedule = TaskSchedule.builder(dataflowOperations)
              .scheduleName("mySchedule")
              .task(task)
              .build();

TaskSchedule.builder(dataFlowOperations) 方法返回 TaskScheduleBuilder 类。

build 方法返回一个 TaskSchedule 实例,名为 mySchedule,它配置了一个调度实例。此时,尚未创建调度。

您可以使用 schedule() 方法创建调度

schedule.schedule("56 20 ? * *", Collections.emptyMap());

您还可以使用 unschedule() 方法删除它

schedule.unschedule();

您可以使用 TaskScheduleBuilder 检索一个或所有现有的调度程序

Optional<TaskSchedule> retrievedSchedule =
          taskScheduleBuilder.findByScheduleName(schedule.getScheduleName());

List<TaskSchedule> allSchedulesPerTask = taskScheduleBuilder.list(task);

设置部署属性

本节介绍如何为 TaskTaskScheduler 设置部署属性。

Task 使用 DeploymentPropertiesBuilder

launch(Map<String, String> properties, List<String> arguments) 方法允许您自定义任务的启动方式。我们通过使用构建器样式并为某些属性创建静态方法,使创建具有属性的映射变得更容易,因此您无需记住属性的名称

Map<String, String> taskLaunchProperties = new DeploymentPropertiesBuilder()
		.memory("myTask", 512)
		.put("app.timestamp.timestamp.format", "YYYY")
		.build();

        Optional<Task> task = Task.builder(dataflowOperations).findByName("myTask");
        if(task.isPresent()) {
        long executionId = task.get().launch(taskLaunchProperties, Collections.EMPTY_LIST);
        }

TaskSchedule 设置属性

同样,您可以为 TaskSchedule 实例设置部署属性

Map<String,String> props = new HashMap<>();
props.put("app.timestamp.timestamp.format", "YYYY");
taskSchedule.schedule("*/1 * * * *", props);