任务 & 任务计划 Java DSL
除了使用 shell 创建和启动任务之外,您还可以使用 spring-cloud-dataflow-rest-client
模块提供的基于 Java 的 DSL。用于 Task
和 TaskSchedule
的 Java DSL 提供了围绕 DataFlowTemplate
类的便捷包装器,支持以编程方式创建、启动和调度任务。
要开始使用,您需要将以下依赖项添加到您的项目中
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dataflow-rest-client</artifactId>
<version>2.11.3</version>
</dependency>
Java DSL 的核心类是 TaskBuilder
、Task
、TaskSchedule
、TaskScheduleBuilder
和 DataFlowTemplate
。任务 DSL 还使用了一些 DataFlowTemplate 类,例如 TaskExecutionResource
、TaskExecutionStatus
、JobExecutionResource
和 JobInstanceResource
。
入口点是 Task
或 TaskSchedule
上的 builder
方法,它接受一个 DataFlowTemplate
实例。
获取 DataFlowOperations 实例
Task
和 TaskSchedule
的 DSL 都需要一个有效的 DataFlowOperations
实例。Spring Cloud Data Flow 提供了 DataFlowTemplate
作为 DataFlowOperations
接口的实现。
要创建 DataFlowTemplate
的实例,您需要提供 Data Flow 服务器的 URI
位置。Spring Boot 也提供 DataFlowTemplate
的自动配置。您可以使用 DataFlowClientProperties
中的属性来配置与 Data Flow 服务器的连接。通常,您应该从 spring.cloud.dataflow.client.server-uri
属性开始
URI dataFlowUri = URI.create("https://127.0.0.1:9393");
DataFlowOperations dataflowOperations = new DataFlowTemplate(dataFlowUri);
任务 DSL 用法
您可以借助 TaskBuilder
类创建新的 Task
实例,该类从 Task.builder(dataFlowOperations)
方法返回。
请考虑以下示例,它创建了一个新的组合任务
dataflowOperations.appRegistryOperations().importFromResource(
"https://dataflow.springframework.org.cn/task-maven-latest", true);
Task task = Task.builder(dataflowOperations)
.name("myComposedTask")
.definition("a: timestamp && b:timestamp")
.description("My Composed Task")
.build();
build
方法返回一个 Task
定义实例,该实例表示已创建但尚未启动的组合任务。任务定义中使用的 timestamp
指的是在 DataFlow 中注册的任务应用程序名称。
要创建和启动您的任务,您需要确保首先在 Data Flow 服务器中注册了相应的应用程序,如批处理开发者指南 中所示。
尝试启动包含未知应用程序的任务会引发异常。您可以使用 DataFlowOperations
注册您的应用程序,如下所示
dataflowOperations.appRegistryOperations().importFromResource(
"https://dataflow.springframework.org.cn/task-maven-latest", true);
您可以使用 TaskBuilder
按名称检索现有的 Task 实例,而不是创建新的 Task
Optional<Task> task = Task.builder(dataflowOperations).findByName("myComposedTask");
您还可以列出所有现有任务
List<Task> tasks = Task.builder(dataflowOperations).allTasks();
使用 Task
实例,您可以使用可用的方法来 启动
或 销毁
任务。以下示例启动了该任务
long executionId = task.launch();
executionId
是已启动任务的唯一任务执行标识符。launch
方法被重载以接受启动属性的 java.util.Map<String, String>
和命令行参数的 java.util.List<String>
。
任务是异步运行的。如果您的用例要求您等待任务完成或其他任务状态,您可以使用 Java 并发实用程序或 Awaitility
库,如下所示
org.awaitility.Awaitility.await().until(
() -> task.executionStatus(executionId) == TaskExecutionStatus.COMPLETE);
Task
实例提供了 executionStatus
、destroy
和 stop
方法来控制和查询任务。
Collection<TaskExecutionResource> executions()
方法列出由任务启动的所有 TaskExecutionResource
实例。您可以使用 executionId
检索特定执行的 TaskExecutionResource
(Optional<TaskExecutionResource> execution(long executionId)
)。
类似地,Collection<JobExecutionResource> jobExecutionResources()
和 Collection<JobInstanceResource> jobInstanceResources()
将允许您在任务使用 Spring Batch 作业时对其进行内省。
任务计划 DSL 用法
考虑以下示例,该示例创建一个新任务并对其进行调度
Task task = Task.builder(dataflowOperations)
.name("myTask")
.definition("timestamp")
.description("simple task")
.build();
TaskSchedule schedule = TaskSchedule.builder(dataflowOperations)
.scheduleName("mySchedule")
.task(task)
.build();
TaskSchedule.builder(dataFlowOperations)
方法返回 TaskScheduleBuilder
类。
build
方法返回一个 TaskSchedule
实例,名为 mySchedule
,它配置了一个调度实例。此时,尚未创建调度。
您可以使用 schedule()
方法创建调度
schedule.schedule("56 20 ? * *", Collections.emptyMap());
您还可以使用 unschedule()
方法删除它
schedule.unschedule();
您可以使用 TaskScheduleBuilder
检索一个或所有现有的调度程序
Optional<TaskSchedule> retrievedSchedule =
taskScheduleBuilder.findByScheduleName(schedule.getScheduleName());
List<TaskSchedule> allSchedulesPerTask = taskScheduleBuilder.list(task);
设置部署属性
本节介绍如何为 Task
和 TaskScheduler
设置部署属性。
对 Task
使用 DeploymentPropertiesBuilder
launch(Map<String, String> properties, List<String> arguments)
方法允许您自定义任务的启动方式。我们通过使用构建器样式并为某些属性创建静态方法,使创建具有属性的映射变得更容易,因此您无需记住属性的名称
Map<String, String> taskLaunchProperties = new DeploymentPropertiesBuilder()
.memory("myTask", 512)
.put("app.timestamp.timestamp.format", "YYYY")
.build();
Optional<Task> task = Task.builder(dataflowOperations).findByName("myTask");
if(task.isPresent()) {
long executionId = task.get().launch(taskLaunchProperties, Collections.EMPTY_LIST);
}
为 TaskSchedule
设置属性
同样,您可以为 TaskSchedule
实例设置部署属性
Map<String,String> props = new HashMap<>();
props.put("app.timestamp.timestamp.format", "YYYY");
taskSchedule.schedule("*/1 * * * *", props);