任务 & 任务计划 Java DSL
除了使用 shell 创建和启动任务之外,您还可以使用 spring-cloud-dataflow-rest-client 模块提供的基于 Java 的 DSL。用于 Task 和 TaskSchedule 的 Java DSL 提供了围绕 DataFlowTemplate 类的便捷包装器,支持以编程方式创建、启动和调度任务。
要开始使用,您需要将以下依赖项添加到您的项目中
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dataflow-rest-client</artifactId>
<version>2.11.3</version>
</dependency>Java DSL 的核心类是 TaskBuilder、Task、TaskSchedule、TaskScheduleBuilder 和 DataFlowTemplate。任务 DSL 还使用了一些 DataFlowTemplate 类,例如 TaskExecutionResource、TaskExecutionStatus、JobExecutionResource 和 JobInstanceResource。
入口点是 Task 或 TaskSchedule 上的 builder 方法,它接受一个 DataFlowTemplate 实例。
获取 DataFlowOperations 实例
Task 和 TaskSchedule 的 DSL 都需要一个有效的 DataFlowOperations 实例。Spring Cloud Data Flow 提供了 DataFlowTemplate 作为 DataFlowOperations 接口的实现。
要创建 DataFlowTemplate 的实例,您需要提供 Data Flow 服务器的 URI 位置。Spring Boot 也提供 DataFlowTemplate 的自动配置。您可以使用 DataFlowClientProperties 中的属性来配置与 Data Flow 服务器的连接。通常,您应该从 spring.cloud.dataflow.client.server-uri 属性开始
URI dataFlowUri = URI.create("https://:9393");
DataFlowOperations dataflowOperations = new DataFlowTemplate(dataFlowUri);任务 DSL 用法
您可以借助 TaskBuilder 类创建新的 Task 实例,该类从 Task.builder(dataFlowOperations) 方法返回。
请考虑以下示例,它创建了一个新的组合任务
dataflowOperations.appRegistryOperations().importFromResource(
"https://dataflow.springframework.org.cn/task-maven-latest", true);
Task task = Task.builder(dataflowOperations)
.name("myComposedTask")
.definition("a: timestamp && b:timestamp")
.description("My Composed Task")
.build();build 方法返回一个 Task 定义实例,该实例表示已创建但尚未启动的组合任务。任务定义中使用的 timestamp 指的是在 DataFlow 中注册的任务应用程序名称。
要创建和启动您的任务,您需要确保首先在 Data Flow 服务器中注册了相应的应用程序,如批处理开发者指南 中所示。
尝试启动包含未知应用程序的任务会引发异常。您可以使用 DataFlowOperations 注册您的应用程序,如下所示
dataflowOperations.appRegistryOperations().importFromResource(
"https://dataflow.springframework.org.cn/task-maven-latest", true);您可以使用 TaskBuilder 按名称检索现有的 Task 实例,而不是创建新的 Task
Optional<Task> task = Task.builder(dataflowOperations).findByName("myComposedTask");您还可以列出所有现有任务
List<Task> tasks = Task.builder(dataflowOperations).allTasks();使用 Task 实例,您可以使用可用的方法来 启动 或 销毁 任务。以下示例启动了该任务
long executionId = task.launch();executionId 是已启动任务的唯一任务执行标识符。launch 方法被重载以接受启动属性的 java.util.Map<String, String> 和命令行参数的 java.util.List<String>。
任务是异步运行的。如果您的用例要求您等待任务完成或其他任务状态,您可以使用 Java 并发实用程序或 Awaitility 库,如下所示
org.awaitility.Awaitility.await().until(
() -> task.executionStatus(executionId) == TaskExecutionStatus.COMPLETE);Task 实例提供了 executionStatus、destroy 和 stop 方法来控制和查询任务。
Collection<TaskExecutionResource> executions() 方法列出由任务启动的所有 TaskExecutionResource 实例。您可以使用 executionId 检索特定执行的 TaskExecutionResource(Optional<TaskExecutionResource> execution(long executionId))。
类似地,Collection<JobExecutionResource> jobExecutionResources() 和 Collection<JobInstanceResource> jobInstanceResources() 将允许您在任务使用 Spring Batch 作业时对其进行内省。
任务计划 DSL 用法
考虑以下示例,该示例创建一个新任务并对其进行调度
Task task = Task.builder(dataflowOperations)
.name("myTask")
.definition("timestamp")
.description("simple task")
.build();
TaskSchedule schedule = TaskSchedule.builder(dataflowOperations)
.scheduleName("mySchedule")
.task(task)
.build();TaskSchedule.builder(dataFlowOperations) 方法返回 TaskScheduleBuilder 类。
build 方法返回一个 TaskSchedule 实例,名为 mySchedule,它配置了一个调度实例。此时,尚未创建调度。
您可以使用 schedule() 方法创建调度
schedule.schedule("56 20 ? * *", Collections.emptyMap());您还可以使用 unschedule() 方法删除它
schedule.unschedule();您可以使用 TaskScheduleBuilder 检索一个或所有现有的调度程序
Optional<TaskSchedule> retrievedSchedule =
taskScheduleBuilder.findByScheduleName(schedule.getScheduleName());
List<TaskSchedule> allSchedulesPerTask = taskScheduleBuilder.list(task);设置部署属性
本节介绍如何为 Task 和 TaskScheduler 设置部署属性。
对 Task 使用 DeploymentPropertiesBuilder
launch(Map<String, String> properties, List<String> arguments) 方法允许您自定义任务的启动方式。我们通过使用构建器样式并为某些属性创建静态方法,使创建具有属性的映射变得更容易,因此您无需记住属性的名称
Map<String, String> taskLaunchProperties = new DeploymentPropertiesBuilder()
.memory("myTask", 512)
.put("app.timestamp.timestamp.format", "YYYY")
.build();
Optional<Task> task = Task.builder(dataflowOperations).findByName("myTask");
if(task.isPresent()) {
long executionId = task.get().launch(taskLaunchProperties, Collections.EMPTY_LIST);
}为 TaskSchedule 设置属性
同样,您可以为 TaskSchedule 实例设置部署属性
Map<String,String> props = new HashMap<>();
props.put("app.timestamp.timestamp.format", "YYYY");
taskSchedule.schedule("*/1 * * * *", props);