什么是组合任务?

组合任务是一个有向图,其中图的每个节点都是一个任务应用程序。Spring Cloud Data Flow 允许您通过其基于浏览器的 UI、shell 或 RESTful API 创建组合任务。在本节中,我们将向您展示如何创建和管理组合任务。

组合任务 101

在介绍如何创建和管理组合任务之前,我们需要讨论一个场景,我们希望在该场景中启动一系列任务定义。
为了便于讨论,假设我们要启动标识为 task-atask-btask-c 的任务定义。例如,我们启动 task-a,如果 task-a 成功完成,我们希望启动 task-b。一旦 task-b 成功完成,我们希望启动 task-c。在这种情况下,图形将如下所示

Composed Task Graph img.png

可以使用 Spring Cloud Data Flow 的任务定义 DSL 来表示前面的图表,如下所示

task-a && task-b && task-c

前面 DSL 中的 && 表示 && 左侧的任务定义必须成功完成,然后才能启动流程中的下一个任务定义。

创建前面的组合任务定义后,您可以像启动常规任务定义一样启动它。在后台,Spring Cloud Data Flow 会启动 Composed Task Runner 应用程序来管理组合任务图的执行。它通过解析 Spring Cloud Data Flow 任务定义,然后向 Spring Cloud Data Flow 服务器发出 RESTful API 调用来启动任务定义来实现此目的。每个任务完成后,它都会启动下一个任务定义。在以下部分中,我们将向您展示如何创建自己的组合任务图,以探索创建组合任务流的各种方法。

配置 Spring Cloud Data Flow 启动组合任务

如前所述,Composed-Task-Runner 是一个应用程序,用于管理组合任务图中任务的执行。因此,在创建组合任务之前,我们需要配置 Spring Cloud Data Flow 以正确启动 Composed Task Runner。

配置数据流以启动组合任务运行器

启动组合任务时,Spring Cloud Data Flow 会将属性传递给 Composed-Task-Runner,以便它可以正确执行有向图。为此,您必须配置 Spring Cloud Data Flow 的 dataflow.server.uri 属性,以便 Composed Task Runner 可以向正确的 SCDF 服务器发出 RESTful API 调用

  • dataflow.server.uri:Spring Cloud Data Flow 服务器的 URI,Composed Task Runner 使用它来执行其 RESTful API 调用。它默认为 https://localhost:9393
  • spring.cloud.dataflow.task.composedtaskrunner.uri:建立 Spring Cloud Data Flow 获取 Composed Task Runner 工件的位置。默认情况下,Spring Cloud Data Flow 会从 Maven Central 检索 localCloud Foundry 平台的工件。对于 Kubernetes,它从 DockerHub 获取工件。
  • spring.cloud.dataflow.task.composedtaskrunner.imagePullSecret:如果在 Kubernetes 中运行并且 Composed Task Runner 映像位于需要身份验证的存储库中,则可以配置一个包含在提取映像时使用的凭据的密钥。属性值是必须先创建的已配置密钥的名称。请按照从私有注册表中提取映像指南创建密钥。
  • `maximumConcurrentTasks` - Spring Cloud Data Flow 允许用户限制每个已配置平台上并发运行任务的最大数量,以防止 IaaS/硬件资源饱和。所有受支持平台的默认限制设置为 `20`。如果平台实例上并发运行的任务数大于或等于限制,则下一个任务启动请求将失败,并通过 RESTful API、Shell 和 UI 返回错误消息。您可以通过将相应的部署程序属性设置为最大并发任务数来配置平台实例的此限制。

    spring.cloud.dataflow.task.platform.<platform-type>.accounts[<account-name>].deployment.maximumConcurrentTasks`

    `` 是已配置平台帐户的名称(如果没有显式配置帐户,则为 `default`)。`` 指的是当前支持的部署程序之一:`local`、`cloudfoundry` 或 `kubernetes`。

更改此属性将需要重新启动 Spring Cloud Data Flow。

注册示例应用程序

在使用以下组合任务示例之前,您必须先注册示例中使用的示例应用程序。因此,在本指南中,您需要使用以下名称多次注册时间戳应用程序:`task-a`、`task-b`、`task-c`、`task-d`、`task-e` 和 `task-f`。

Spring Cloud Data Flow 支持 Maven、HTTP、文件和 Docker 资源进行本地部署。对于本地部署,我们使用 Maven 资源。Maven 工件的 URI 通常采用以下格式:`maven://<groupId>:<artifactId>:<version>`。示例应用程序的 Maven URI 如下所示:

maven://io.spring:timestamp-task:2.0.2

`maven:` 协议指定了一个 Maven 工件,该工件通过使用为 Data Flow 服务器配置的远程和本地 Maven 存储库进行解析。要注册应用程序,请单击页面左侧的 **应用程序** 选项卡。选择 **添加应用程序** 和 **注册一个或多个应用程序**。对于 `task-a`,请填写表单,如下图所示,然后单击 **注册应用程序**。

Register the  transition sample

对 `task-a`、`task-b`、`task-c`、`task-d`、`task-e` 和 `task-f` 重复此注册过程,同时使用相同的 URI:`maven://io.spring:timestamp-task:2.0.2`。

Spring Cloud Data Flow 支持 Maven、HTTP 和 Docker 资源进行本地部署。对于 Cloud Foundry,我们使用 HTTP(实际上是 HTTPS)资源。HTTPS 资源的 URI 采用以下格式:`https://<web-path>/<artifactName>-<version>.jar`。然后,Spring Cloud Data Flow 从 HTTPS URI 中提取工件。

示例应用程序的 HTTPS URI 如下所示:

https://repo.spring.io/libs-snapshot/io/spring/timestamp-task/2.0.2/timestamp-task-2.0.2.jar

要注册应用程序,请单击页面左侧的 **应用程序** 选项卡。选择 **添加应用程序** 和 **注册一个或多个应用程序**。填写表单,如下图所示,然后单击 **注册应用程序**。

注册转换示例 task-atask-btask-ctask-dtask-etask-f 重复此注册过程,并使用相同的 URI。

https://repo.spring.io/libs-snapshot/io/spring/timestamp-task/2.0.2/timestamp-task-2.0.2.jar

Spring Cloud Data Flow 支持用于 Kubernetes 部署的 Docker 资源。Docker 镜像的 URI 格式为 docker:<docker-image-path>/<imageName>:<version>,并使用为 Data Flow 任务平台配置的 Docker 注册表和镜像拉取策略进行解析。

示例应用程序的 Docker URI 如下所示

docker:springcloudtask/timestamp-task:2.0.2

要注册应用程序,请单击页面左侧的 **应用程序** 选项卡。选择 **添加应用程序** 和 **注册一个或多个应用程序**。填写表单,如下图所示,然后单击 **注册应用程序**。

Register the transition sample

task-atask-btask-ctask-dtask-etask-f 重复此注册过程,并使用相同的 URI:docker:springcloudtask/timestamp-task:2.0.2

如果对于给定的 Spring Cloud Data Flow 部署无法访问 Maven Central 或 DockerHub,则可以使用 spring.cloud.dataflow.task.composed.task.runner.uri 属性指定不同的 URI 来检索组合任务运行器。

转换示例项目

为了探索通过组合任务图可用的某些流,我们需要一个应用程序,该应用程序允许我们在启动时配置其退出状态。此 transition-sample 使我们能够通过组合任务图探索各种流。

从 Github 获取转换示例项目

我们需要从 Github 拉取项目

  1. 打开终端会话。
  2. 选择您要克隆项目的目录作为工作目录。
  3. 在工作目录中,运行以下 git 命令

    git clone https://github.com/spring-cloud/spring-cloud-dataflow-samples.git
  4. 进入 spring-cloud-dataflow-samples/transition-sample 目录

    cd spring-cloud-dataflow-samples/transition-sample

构建过渡示例项目

要构建应用程序,请使用以下命令

./mvnw clean install

要构建 Docker 镜像,请使用以下命令

./mvnw dockerfile:build

注册过渡示例

对于本地部署,Spring Cloud Data Flow 支持 Maven、HTTP、文件和 Docker 资源。在本例中,我们使用 Maven 资源。Maven 工件的 URI 通常采用以下格式 maven://<groupId>:<artifactId>:<version>。示例应用程序的 Maven URI 如下所示

maven://io.spring:transition-sample:1.0.0.BUILD-SNAPSHOT

maven: 协议指定了一个 Maven 工件,该工件通过使用为 Data Flow 服务器配置的远程和本地 Maven 存储库来解析。要注册应用程序,请单击页面左侧的“**应用程序**”选项卡。选择“**添加应用程序**”和“**注册一个或多个应用程序**”。填写表单,如下图所示,然后单击“**注册应用程序**”。

Register the  transition sample

对于本地部署,Spring Cloud Data Flow 支持 Maven、HTTP 和 Docker 资源。在本例中,我们使用 HTTP(实际上是 HTTPS)资源。HTTPS 资源的 URI 采用以下格式 https://<web-path>/<artifactName>-<version>.jar。然后,Spring Cloud Data Flow 从 HTTPS URI 中提取工件。

示例应用程序的 HTTPS URI 如下所示

http://<path to your jar>:transition-sample:1.0.0.BUILD-SNAPSHOT

要注册应用程序,请单击页面左侧的 **应用程序** 选项卡。选择 **添加应用程序** 和 **注册一个或多个应用程序**。填写表单,如下图所示,然后单击 **注册应用程序**。

Register the transition sample

对于 Kubernetes 部署,Spring Cloud Data Flow 支持 Docker 资源。Docker 镜像的 URI 采用以下格式 docker:<docker-image-path>/<imageName>:<version>,并使用为 Data Flow 任务平台配置的 Docker 注册表和镜像拉取策略进行解析。

示例应用程序的 Docker URI 如下所示

docker:springcloud/transition-sample:latest

要注册应用程序,请单击页面左侧的 **应用程序** 选项卡。选择 **添加应用程序** 和 **注册一个或多个应用程序**。填写表单,如下图所示,然后单击 **注册应用程序**。

Register the transition sample

构建组合任务

本节探讨 Spring Cloud Data Flow 支持的三种基础结构

  • 条件执行
  • 过渡执行
  • 拆分执行

条件执行

条件执行使用双与符号 && 表示。这使得序列中的每个任务仅在前一个任务成功完成后才会启动。

创建条件执行组合任务定义

要使用 Spring Cloud Data Flow UI 创建条件执行,请单击仪表板左侧的**任务**选项卡,然后按页面顶部的**创建任务**按钮。现在复制以下表达式并将其粘贴到页面顶部文本框中

task-a && task-b

您可以在仪表板中看到出现的图形,如下图所示: 条件执行流程

在前面的示例中,请注意我们使用了 task-atask-b 标签。这是必要的,因为我们在同一个图中使用了两个时间戳应用程序。

现在按下页面底部的 创建任务 按钮。将出现一个对话框,要求您**确认任务创建**。为此,请在 名称 字段中输入 conditional-execution 作为组合任务名称,然后单击**创建任务**按钮,如下图所示: 条件执行创建

现在将显示“任务定义”页面,您可以看到创建了三个任务定义,如下图所示: 条件执行任务定义列表

  1. conditional-execution 任务定义是管理有向图执行的 Composed-Task-Runner 应用程序。
  2. conditional-execution-task-a 是表示您之前输入的 DSL 中定义的 task-a 应用程序的任务定义。
  3. conditional-execution-task-b 是表示您之前输入的 DSL 中定义的 task-b 应用程序的任务定义。

启动条件执行组合任务定义

To launch the composed task, click the dropdown icon to the left of the task definition named conditional-execution and select the Launch option, as the following image shows: Conditional Execution Task Definition Launch Now the task launch page appears. Since we are using app defaults, we merely need to press the LAUNCH TASK button, as the following image shows: Conditional Execution Task Definition Launch When the composed task called conditional-execution is launched, it launches the task called conditional-execution-task-a, and, if it completes successfully, the task called conditional-execution-task-b is launched. If conditional-execution-task-a fails, conditional-execution-task-b does not launch.

检查条件执行组合任务定义的状态

现在我们已经执行了 conditional-execution 任务定义,我们可以检查任务执行状态。为此,请单击“任务”页面顶部的“执行”选项卡。在这里,我们可以看到 conditional-execution (Composed-Task-Runner) 成功启动了每个子应用程序(conditional-execution-task-aconditional-execution-task-b),如下图所示: 条件执行流程

过渡执行

通过转换,您可以指定希望流程遵循的树分支。任务转换由以下符号表示 ->。为了演示,我们创建一个基本的转换图。

创建基本转换任务定义

要使用 Spring Cloud Data Flow UI 创建基本转换,请单击仪表板左侧的 **任务** 选项卡,然后按页面顶部的 **创建任务** 按钮。现在,复制以下表达式并将其粘贴到位于页面顶部的文本框中

transition-sample 'FAILED' -> task-a 'COMPLETED' -> task-b

它应该如下所示: Transition Execution Flow

您可以使用 Spring Cloud Data Flow UI 的拖放功能来绘制图形,而不是使用 DSL。

现在图形已经呈现出来,我们可以深入了解细节。第一个要启动的应用程序是 transition-sample。由于 transition-sample 是一个 Spring Cloud Task 应用程序,因此 Spring Cloud Task 会在执行结束时将退出消息记录到数据库中。此消息将具有以下值之一

  • COMPLETED:任务已成功完成。
  • FAILED:任务在其执行过程中失败。
  • 自定义退出消息:Spring Cloud Task 应用程序可以返回自定义退出消息,如 Spring Cloud Task 文档 中所述。

transition-sample 应用程序执行完成后,组合任务运行器会检查 transition-sample 的退出消息,然后评估它应该采用哪条路径。在我们的例子中,它有两条路径(由 -> 运算符表示)。

  • FAILED:如果 transition-sample 返回 FAILED,则执行标记为 task-atimestamp 应用程序。
  • COMPLETED:如果 transition-sample 返回 COMPLETED,则执行标记为 task-btimestamp 应用程序。

现在按下页面底部的 **创建任务** 按钮。此时会出现一个对话框,要求您 **确认任务创建**。为此,请在 **名称** 字段中输入 basictransition 作为组合任务名称,然后点击 **创建任务** 按钮,如下图所示: Transition Execution Flow

启动组合任务定义

现在我们可以多次启动组合任务,以便我们可以通过树形结构来测试它的路径。

首先,我们可以看看如果我们将退出消息设置为 FAILED 会发生什么。为此,请选择要执行的 basictransition 组合任务运行器,如下图所示: Transition Execution Flow Launch 现在,从任务启动页面,使用以下内容填充页面

First set the interval between checks for the composed task runner to 1000 milliseconds. This is done by clicking the EDIT button on the CTR properties under the Global column as shown below: Transition Execution Set Interval Time Prop Now enter 1000 in the interval-time-between-checks field Transition Execution Set Interval Time Prop Set Click the UPDATE button.

现在让我们将转换应用程序设置为返回退出消息 FAILED。这可以通过单击 transition-sample 列下 应用程序属性 上的 编辑 按钮来完成。更新对话框出现后,在 exit-message 行中输入 FAILED,如下所示: Transition Execution app Prop Set

点击 更新 按钮。

现在点击 启动任务 按钮。现在它已经执行,我们可以验证是否确实遵循了 FAILED 路径。我们可以通过单击任务页面左侧的 **任务执行** 选项卡来完成此操作: Transition Execution Flow Launch-List

这样做向我们展示了控制组合任务执行 basic-transition 的 组合任务运行器 已启动,并且 transition-sample 已启动。从那里,执行了 FAILED 分支,如 basictransition-task-a 已启动 所示。

现在重新启动 组合任务运行器 并将 exit-message 设置为 COMPLETED 以执行另一个分支。为此,请选择要执行的 basictransition,如下图所示: Transition Execution Flow Launch

您可以通过点击 transition-sample 列下 应用程序属性 上的 编辑 按钮来完成此操作。更新对话框出现后,在 exit-message 行中输入 COMPLETED,如下所示: Transition Execution app Prop Set

点击 更新 按钮。

点击 启动任务 按钮。

现在它已经执行完毕,我们可以验证是否遵循了 COMPLETED 路径。您可以通过点击页面左侧的 任务执行 选项卡来完成此操作: Transition Execution Flow Launch-CompleteList

转换还有更多状态吗?

如果我为退出消息输入 FOO 会发生什么?

为此,请选择要执行的 basictransition 组合任务运行器,如下图所示: Transition Execution Flow Launch 启动页面出现后,单击 transition-sample 列下 Application properties 上的 EDIT 按钮。更新对话框出现后,在 exit-message 行中输入 FOO,如下所示

Transition Execution Flow Launch-Config-FOO 点击 更新 按钮。

点击 启动任务 按钮。

现在它已经执行完毕,我们可以验证是否实际遵循了路径 FOO。为此,请单击页面左侧的**任务执行**选项卡: Transition Execution Flow Launch-FOO-LIST

在这种情况下,我们看到组合任务最终只运行了 Composed Task Runner 和转换示例。这是因为没有以 FOO 为目标。我们如何处理这种情况——也就是说,为 COMPLETEDFAILED 和其他所有情况都设置路径?

在这种情况下,我们希望使用通配符创建另一个组合任务。

要使用 Spring Cloud Data Flow UI 创建基本转换,请单击仪表板左侧的**任务**选项卡,然后按页面顶部的**创建任务**按钮。现在复制以下表达式并将其粘贴到页面顶部文本框中

transition-sample 'FAILED' -> task-a 'COMPLETED' -> task-b '*' -> task-c

它应该如下所示: Transition Execution Foo_Flow 现在按下页面底部的**创建任务**按钮。 现在会出现一个对话框,要求您**确认任务创建**。 为此,请在名称字段中输入anothertransition作为组合任务名称,然后单击**创建任务**按钮,如下图所示: Transition Execution Foo_Flow_Create

为此,请选择要执行的anothertransition 组合任务运行器,如下图所示: Transition Execution Flow Launch-Another 从任务启动页面,使用以下内容填充页面

参数

--increment-instance-enabled=true
--interval-time-between-checks=1000

参数

app.anothertransition.transition-sample.taskapp.exitMessage=FOO

当启动页面出现时,点击 transition-sample 列下 Application properties 上的 EDIT 按钮。当更新对话框出现时,在 exit-message 行输入 FOO,如下所示

Transition Execution Flow Launch-Config-FOO 点击 更新 按钮。

点击 启动任务 按钮。

现在验证路径 FOO 是否真的被遵循。为此,请点击任务页面顶部的 **Executions** 选项卡: Transition Execution Flow Launch-FOO-success-LIST

在这种情况下,我们看到通配符捕获了所有其他退出消息。我们可以通过看到 anothertransition-task-c 已启动来验证。

拆分执行

如果我们想同时执行多个任务怎么办?组合任务 DSL 支持拆分的概念,让你可以做到这一点。任务定义 DSL 支持拆分的概念,它允许你同时启动多个任务应用程序。每个拆分都包含一个任务列表,这些任务包含在小于号 < 和大于号 > 符号之间,并由两个竖线符号 (||) 分隔。
例如,如果我们想同时启动三个任务,DSL 将如下所示

<task-a || task-b || task-c>

现在我们可以创建一个包含拆分和转换的组合任务,以展示其可能性。要使用 Spring Cloud Data Flow UI 创建拆分图示例,请点击仪表板左侧的 **Tasks** 选项卡,然后点击页面顶部的 **CREATE TASK** 按钮。现在复制下面的表达式并将其粘贴到页面顶部的文本框中

<task-a || task-b || task-c>  && transition-sample 'FAILED' -> task-d 'COMPLETED' -> task-e '*' -> task-f

它应该如下所示: Transition Execution Split_Flow 现在点击页面底部的**创建任务**按钮。 现在会弹出一个对话框,要求您**确认创建任务**。 为此,请在名称字段中输入splitgraph作为组合任务名称,然后按**创建任务**按钮,如下图所示: Transition Execution Split_Flow_Create

Select the splitgraph Composed Task Runner to be executed, as the following image shows: Transition Execution Flow SplitLaunch From the task launch page let's configure the composed task runner. This is done by clicking the EDIT button on the CTR properties under the Global column as shown below: Transition Execution Set Interval Time Prop Now:

  • interval-time-between-checks字段中输入1000
  • thread-core-pool-size字段中输入4
  • closecontext-enabled字段中输入true

它应该类似于以下内容: Transition Execution Set Interval Time Prop Set 点击更新按钮。

点击transition-sample列下应用程序属性上的编辑按钮。出现更新对话框后,在 exit-message 行中输入FOO,如下所示

Transition Execution Flow Launch-Config-FOO 点击 更新 按钮。

点击 启动任务 按钮。

验证所有任务是否已启动以及是否实际遵循了路径FOO。为此,请单击任务页面左侧的**任务执行**选项卡: Transition Execution Flow Launch-split-LIST

在本例中,我们可以看到在 CTR 启动我们的过渡应用程序之前,splitgraph-task-asplitgraph-task-bsplitgraph-task-c 同时被触发。我们还添加了一个新参数:--split-thread-core-pool-size=4。它基本上表明组合任务运行器可以同时运行四个应用程序。

参数和属性

同样,我在命令行中输入的所有内容是什么?因此,在本例中,我们想展示如何同时使用命令行参数和属性。我们使用参数来为 组合任务运行器 建立属性。

  1. interval-time-between-checks=1000 表示 组合任务运行器 将在每次检查之间等待一秒钟,以确保任务完成(默认值为 10 秒)。
  2. split-thread-core-pool-size=4 表示我们希望同时运行最多四个任务。
  3. closecontext-enabled=true 表示我们希望在 组合任务运行器 时关闭 Spring 上下文。

使用 split 时,必须像上面显示的那样设置 spring.cloud.task.closecontext-enabled 属性。

配置拆分

在上一节所示的示例中,我们通过使用 spring.cloud.task.closecontext-enabledsplit-thread-core-pool-size 属性在组合任务中配置了拆分的行为。您还可以在使用拆分时使用以下属性

  • spring.cloud.task.closecontext-enabled:使用拆分时,需要将此属性设置为 true,否则上下文不会关闭(因为已分配线程来支持拆分)。
  • split-thread-core-pool-size:建立组合任务中拆分所需的初始线程数。拆分中包含的每个任务应用程序都需要一个线程才能执行。(默认为 1)
  • split-thread-max-pool-size:要分配的最大线程数。
  • split-thread-queue-capacity:如果所有线程都在使用中,则在分配新线程之前应排队的任务数。

基本拆分大小

拆分最简单的配置是设置 split-thread-core-pool-size 属性。您需要查看图形并计算具有最多任务应用程序的拆分。这是您需要的线程数。要设置线程数,请使用 split-thread-core-pool-size 属性(默认为 1)。因此,例如,像这样的定义:<AAA || BBB || CCC> && <DDD || EEE> 将需要 split-thread-core-pool-size 为 3。这是因为最大的拆分包含三个任务应用程序。计数为 2 意味着 AAA 和 BBB 将并行运行,但 CCC 将等到 AAA 或 BBB 完成。然后 DDD 和 EEE 将并行运行。

当任务应用程序失败时重启组合任务运行器

如果任务应用程序失败,Spring Cloud Data Flow 中的组合任务允许您重新启动失败的组合任务。当应用程序返回非零 exitCode 时,工作流中的任务应用程序被视为失败。

检测失败的组合任务

启动组合任务后,名为组合任务运行器的应用程序将管理组合任务的执行。由于组合任务运行器是使用 Spring Batch 构建的,因此 Spring Cloud Data Flow 使用“作业执行”页面来跟踪组合任务执行的成功或失败。

如果管理工作流的组合任务作业失败,则命令行运行器的关联退出代码将为 0。这是批处理作业的默认启动行为。但是,如果在组合作业失败时需要退出代码 1,则将组合任务运行器的 spring.cloud.task.batch.fail-on-job-failure 属性设置为 true

例子

在以下示例中,我们有一个简单的条件执行组合任务

task-a && task-b && task-c

假设我们创建了一个名为 my-composed-task 的组合任务,现在我们想使用 UI 启动它

  1. 播放 按钮启动它,如下图所示: 重启组合任务
  2. 出现启动页面时,按 启动任务 按钮。
  3. my-composed-task 完成执行后,我们可以看到 task-b 被标记为 ERROR,这意味着应用程序返回了一个非零的 exitCode。我们可以通过单击页面顶部的 **Executions** 选项卡并查看任务执行来验证这一点。请注意,my-composed-task-task-b 已被标记为退出代码 1。这意味着此任务应用程序返回了一个非零退出代码,从而停止了组合任务的执行。
    Restart_Composed_Task_Failed_Child 一旦我们解决了导致失败的问题,我们就可以重新启动 my-composed-task,组合任务运行器会识别失败的任务应用程序并重新运行它,然后从该点继续执行 DSL。
  4. 按下页面左侧的 Jobs 选项卡。
  5. 现在,按下失败的 my-composed-task 旁边的下拉按钮,并选择**重启作业**,如下图所示: 重启组合任务作业
  6. 组合任务完成后,我们会看到 my-composed-task 的一个新作业,其状态显示为 COMPLETED(已完成)。 重启组合任务完成
  7. 现在,点击页面左侧的 **Tasks(任务)** 选项卡,当出现 Task Definition(任务定义)页面时,点击顶部的 **Executions(执行)** 选项卡。请注意,您有两个 Composed Task Runs(组合任务运行)。第一个是失败的组合任务执行,其中 task-b 失败。然后,在第二次执行中,我们看到 my-composed-task Composed Task Runner(组合任务运行器) 从失败的任务应用程序(task-b)开始图形,并完成了组合任务,如下图所示: 重启组合任务执行

传递属性

Spring Cloud Data Flow 允许您将应用程序和部署属性传递给 Composed Task Runner(组合任务运行器) 以及图形中的任务应用程序。

将属性传递给图形中的任务

您可以通过两种方式为图形中的任务设置属性

  • 在任务定义中设置属性。
  • 在组合任务启动时设置属性。

在任务定义中设置属性

您可以在编写组合任务定义时设置属性。您可以通过在组合任务定义中任务应用程序名称的右侧添加 -- 令牌,后跟属性来实现。以下示例展示了如何执行此操作

task-a --myproperty=value1 --anotherproperty=value2 && task-b --mybproperty=value3

在前面的示例中,task-a 设置了两个属性,task-b 设置了一个属性。

在组合任务启动时设置属性

如前几节所示,可以使用任务启动页面上的 builder(构建器) 选项卡设置部署和应用程序属性。但是,如果您希望使用文本设置这些属性,则可以点击 Freetext(自由文本) 选项卡。

该属性由三个组件组成

  • 属性类型:告诉 Spring Cloud Data Flow 该属性是 deployment 类型还是 app 类型。

    • 部署属性:负责部署任务应用程序的部署程序的说明。
    • 应用程序属性:直接传递给任务应用程序的属性。
  • 任务应用程序名称:应应用该属性的应用程序的标签或名称。
  • 属性键:要设置的属性的键。

以下示例为名为 my-composed-task 的组合任务中的名为 task-a 的任务应用程序设置 myproperty 属性,用于以下 dsl

task-a && task-b

它看起来像这样: 属性图

类似地,如果我们要传递部署程序属性,则格式将保持不变,只是属性类型将为 deployer。例如,我们需要为 task-a 设置 kubernetes.limits.cpu

    deployer.task-a.kubernetes.limits.cpu=1000m

通过使用 UI 上的 Freetext 选项卡,可以通过以下方式启动组合任务并设置 appdeployer 属性

  1. 启动组合任务,如下图所示,方法是按下需要启动的组合任务定义旁边的 启动 选项: 指定要启动的组合任务
  2. 属性 文本框中设置以下属性: 启动组合任务
  3. 现在按下 **启动任务** 按钮。

启动时设置的属性优先于任务定义中设置的属性。例如,如果属性 myproperty 已在组合任务定义和启动时设置,则将使用启动时设置的值。

在组合任务启动时设置参数

如前几节所示,可以使用任务启动页面上的 builder 选项卡设置参数。但是,如果您希望使用文本设置这些参数,则可以单击 Freetext 选项卡。

该属性由三个组件组成

  • 参数类型:这将始终是 app 类型。
  • 任务应用程序名称:应应用该属性的应用程序的标签或名称。
  • 索引:参数应出现的从零开始的位置。

以下示例为名为 my-composed-task 的组合任务中的名为 task-a 的任务应用程序设置 myargumentAmyargumentB 参数,用于以下 dsl

task-a && task-b

它看起来像这样: 属性图

通过使用 UI 上的 Freetext 选项卡,可以通过以下方式启动组合任务并设置参数

  1. 启动组合任务,如下图所示,方法是按下需要启动的组合任务定义旁边的 启动 选项: 指定要启动的组合任务
  2. Arguments 文本框中按如下方式设置参数: 启动组合任务
  3. 现在按下**启动任务**按钮。

传递属性到组合任务运行器

该属性由三个组件组成

  • 属性类型:告诉 Spring Cloud Data Flow 该属性是 部署 类型还是 应用 类型。

    • 部署属性:负责部署任务应用程序的部署程序的说明。
    • 应用属性:直接传递给任务应用的属性。
  • 组合任务应用程序名称:组合任务运行器应用的名称。
  • 属性键:要设置的属性的键。

我们可以启动一个组合任务,在该任务中,我们希望将以下属性传递给 组合任务运行器

  • increment-instance-enabled:一个 应用 属性,允许单个 组合任务运行器 实例在不更改参数的情况下重新执行。
  • kubernetes.limits.cpu:一个 部署器 属性,用于设置组合任务运行器的 Kubernetes CPU 限制。启动组合任务并为 组合任务运行器 设置 应用部署器 属性,可以通过以下方式使用 UI 完成
  • 启动组合任务,如下图所示,方法是按下需要启动的组合任务定义旁边的 播放 按钮: 指定要启动的组合任务
  • properties 文本框中设置属性,如下图所示: 启动组合任务
  • 按下 **启动任务** 按钮。

使用 RESTful API 启动组合任务

在本节中,我们将提供一个如何创建和启动组合任务的示例。

在本例中,我们要创建一个名为 my-composed-task 的组合任务,并使用以下组合任务定义:

task-a && task-b

使用 curl,命令如下所示:

curl 'http://localhost:9393/tasks/definitions' --data-urlencode "name=my-composed-task" --data-urlencode "definition=task-a && task-b"

Spring Cloud Data Flow 服务器的响应如下所示:

HTTP/1.1 200
Content-Type: application/hal+json
Transfer-Encoding: chunked
Date: Fri, 17 Jan 2020 16:19:04 GMT

{"name":"my-composed-task","dslText":"task-a && task-b","description":"","composed":true,"lastTaskExecution":null,"status":"UNKNOWN","_links":{"self":{"href":"http://localhost:9393/tasks/definitions/my-composed-task"}}}

要验证是否已创建 my-composed-task 组合任务,我们可以执行 curl 列表命令:

curl 'http://localhost:9393/tasks/definitions?page=0&size=10&sort=taskName' -i -X GET

Spring Cloud Data Flow 服务器的响应如下所示:

HTTP/1.1 200
Content-Type: application/hal+json
Transfer-Encoding: chunked
Date: Fri, 17 Jan 2020 16:24:39 GMT

{"_embedded":{"taskDefinitionResourceList":[{"name":"my-composed-task","dslText":"task-a && task-b","description":"","composed":true,"lastTaskExecution":null,"status":"UNKNOWN","_links":{"self":{"href":"http://localhost:9393/tasks/definitions/my-composed-task"}}},{"name":"my-composed-task-task-a","dslText":"task-a","description":null,"composed":false,"lastTaskExecution":null,"status":"UNKNOWN","_links":{"self":{"href":"http://localhost:9393/tasks/definitions/my-composed-task-task-a"}}},{"name":"my-composed-task-task-b","dslText":"task-b","description":null,"composed":false,"lastTaskExecution":null,"status":"UNKNOWN","_links":{"self":{"href":"http://localhost:9393/tasks/definitions/my-composed-task-task-b"}}}]},"_links":{"self":{"href":"http://localhost:9393/tasks/definitions?page=0&size=10&sort=taskName,asc"}},"page":{"size":10,"totalElements":3,"totalPages":1,"number":0}}

我们可以使用以下属性为 task-a 启动 my-composed-task

  • app.task-a.my-prop=good
  • app.task-b.my-prop=great

运行以下 curl 命令以启动任务:

curl 'http://localhost:9393/tasks/executions' -i -X POST -d 'name=my-composed-task&properties=app.task-a.my-prop=good,%20app.task-b.my-prop=great'

Spring Cloud Data Flow 服务器的响应如下所示:

HTTP/1.1 201
Content-Type: application/json
Transfer-Encoding: chunked
Date: Fri, 17 Jan 2020 16:33:06 GMT

要验证 my-composed-task 组合任务是否已执行,我们可以执行 curl 列表命令:

curl 'http://localhost:9393/tasks/executions?page=0&size=10' -i -X GET

Spring Cloud Data Flow 服务器的响应如下所示:

HTTP/1.1 200
Content-Type: application/hal+json
Transfer-Encoding: chunked
Date: Fri, 17 Jan 2020 16:35:42 GMT

{"_embedded":{"taskExecutionResourceList":[{"executionId":285,"exitCode":0,"taskName":"my-composed-task-task-b","startTime":"2020-01-17T11:33:24.000-0500","endTime":"2020-01-17T11:33:25.000-0500","exitMessage":null,"arguments":["--spring.cloud.task.parent-execution-id=283","--spring.cloud.data.flow.platformname=default","--spring.cloud.task.executionid=285"],"jobExecutionIds":[],"errorMessage":null,"externalExecutionId":"my-composed-task-task-b-217b8de4-8877-4350-8cc7-001a4347d3b5","parentExecutionId":283,"resourceUrl":"URL [file:////Users/glennrenfro/project/spring-cloud-dataflow-samples/pauseasec/target/pauseasec-1.0.0.BUILD-SNAPSHOT.jar]","appProperties":{"spring.datasource.username":"******","my-prop":"great","spring.datasource.url":"******","spring.datasource.driverClassName":"org.mariadb.jdbc.Driver","spring.cloud.task.name":"my-composed-task-task-b","spring.datasource.password":"******"},"deploymentProperties":{"app.task-b.my-prop":"great"},"taskExecutionStatus":"COMPLETE","_links":{"self":{"href":"http://localhost:9393/tasks/executions/285"}}},{"executionId":284,"exitCode":0,"taskName":"my-composed-task-task-a","startTime":"2020-01-17T11:33:15.000-0500","endTime":"2020-01-17T11:33:15.000-0500","exitMessage":null,"arguments":["--spring.cloud.task.parent-execution-id=283","--spring.cloud.data.flow.platformname=default","--spring.cloud.task.executionid=284"],"jobExecutionIds":[],"errorMessage":null,"externalExecutionId":"my-composed-task-task-a-0806d01f-b08a-4db5-a4d2-ab819e9df5df","parentExecutionId":283,"resourceUrl":"io.spring:timestamp-task:jar:2.0.2","appProperties":{"spring.datasource.username":"******","my-prop":"good","spring.datasource.url":"******","spring.datasource.driverClassName":"org.mariadb.jdbc.Driver","spring.cloud.task.name":"my-composed-task-task-a","spring.datasource.password":"******"},"deploymentProperties":{"app.task-a.my-prop":"good"},"taskExecutionStatus":"COMPLETE","_links":{"self":{"href":"http://localhost:9393/tasks/executions/284"}}},{"executionId":283,"exitCode":0,"taskName":"my-composed-task","startTime":"2020-01-17T11:33:12.000-0500","endTime":"2020-01-17T11:33:33.000-0500","exitMessage":null,"arguments":["--spring.cloud.data.flow.platformname=default","--spring.cloud.task.executionid=283","--spring.cloud.data.flow.taskappname=composed-task-runner"],"jobExecutionIds":[75],"errorMessage":null,"externalExecutionId":"my-composed-task-7a2ad551-a81c-46bf-9661-f9d5f78b27c4","parentExecutionId":null,"resourceUrl":"URL [file:////Users/glennrenfro/project/spring-cloud-task-app-starters/composed-task-runner/apps/composedtaskrunner-task/target/composedtaskrunner-task-2.1.3.BUILD-SNAPSHOT.jar]","appProperties":{"spring.datasource.username":"******","spring.datasource.url":"******","spring.datasource.driverClassName":"org.mariadb.jdbc.Driver","spring.cloud.task.name":"my-composed-task","composed-task-properties":"app.my-composed-task-task-a.app.task-a.my-prop=good, app.my-composed-task-task-b.app.task-b.my-prop=great","graph":"my-composed-task-task-a && my-composed-task-task-b","spring.datasource.password":"******"},"deploymentProperties":{"app.composed-task-runner.composed-task-properties":"app.my-composed-task-task-a.app.task-a.my-prop=good, app.my-composed-task-task-b.app.task-b.my-prop=great"},"taskExecutionStatus":"COMPLETE","_links":{"self":{"href":"http://localhost:9393/tasks/executions/283"}}}]},"_links":{"self":{"href":"http://localhost:9393/tasks/executions?page=0&size=10"}},"page":{"size":10,"totalElements":3,"totalPages":1,"number":0}}```

配置组合任务运行器

本节介绍如何配置组合任务运行器。

启用安全设置时启动组合任务

作为用户,在启用 Spring Cloud Data Flow 身份验证后,您有三种启动组合任务的选项

  • 基本身份验证:使用用户名和密码进行身份验证。
  • 用户配置的访问令牌:启动组合任务时,通过设置 dataflow-server-access-token 属性来提供您希望在启动时使用的令牌。
  • Data Flow 提供的用户令牌:如果 dataflow-server-use-user-access-token 属性设置为 true,Spring Cloud Data Flow 会使用当前登录用户的访问令牌自动填充 dataflow-server-access-token 属性。
  • 客户端凭据:启动组合任务时,通过客户端凭据获取访问令牌。

基本身份验证

此示例启动一个组合任务,用户需要在其中提供用户名和密码。 为此,请执行以下操作

  1. 启动组合任务,如下图所示,单击组合任务定义旁边的 启动 选项以启动: 设置用户访问令牌
  2. Now, from the task launch page, populate the dataflow-server-username and dataflow-server-password fields. This is done by clicking the EDIT button on the CTR properties under the Global column as shown below: Transition Execution Set Interval Time Prop Now enter the dataflow-server-username and dataflow-server-password in the appropriate fields Launch Task
  3. 点击 更新 按钮。
  4. 单击“启动任务”按钮以启动组合任务。

使用您自己的访问令牌

如果需要使用特定访问令牌启动组合任务,请使用 dataflow-server-access-token 属性传递令牌。 为此,请执行以下操作

  1. 启动组合任务,如下图所示,单击组合任务定义旁边的 启动 选项以启动: 设置用户访问令牌
  2. 现在,在任务启动页面,填写 dataflow-server-use-user-access-token 字段。可以通过点击 Global 列下 CTR properties 上的 EDIT 按钮来完成,如下所示: Transition Execution Set Interval Time Prop
  3. 现在在相应的字段中输入 dataflow-server-access-token Set User Access Token
  4. 单击“启动任务”按钮以启动组合任务。

用户访问令牌

在本例中,我们将启动一个组合任务,其中 dataflow-server-use-user-access-token 设置为 true。 为此

  1. 按下组合任务定义旁边的“启动”选项,启动组合任务,如下图所示: 设置用户访问令牌
  2. 现在,在任务启动页面中,选择“自由文本”,然后在参数字段中输入“dataflow-server-use-user-access-token”,如下所示: 启动任务
  3. 点击 更新 按钮。
  4. 单击“启动任务”按钮以启动组合任务。

客户端凭据

如果组合任务需要从 OAuth 身份验证服务获取其访问令牌,请使用以下属性

  • oauth2ClientCredentialsClientId
  • oauth2ClientCredentialsClientSecret
  • oauth2ClientCredentialsTokenUri
  • oauth2ClientCredentialsScopes
  • oauth2ClientCredentialsClientAuthenticationMethod

为此

  1. 启动组合任务,如下图所示,单击组合任务定义旁边的 启动 选项以启动: 设置用户访问令牌
  2. 现在,在任务启动页面中,填充 OAuth 客户端凭据属性。 这是通过单击 全局 列下 CTR 属性 上的 编辑 按钮完成的,如下所示: Transition Execution Set Interval Time Prop
  3. 现在在相应的字段中输入 oauth2ClientCredentialsClientIdoauth2ClientCredentialsClientSecretoauth2ClientCredentialsTokenUrioauth2ClientCredentialsScopesoauth2ClientCredentialsClientAuthenticationMethod 是可选的,默认为 CLIENT_SECRET_BASIC。 有关其他选项,请参阅 Spring Security OAuth2 的文档 Set User Access Token
  4. 单击“启动任务”按钮以启动组合任务。

注意:当使用客户端凭据(特别是 OAuth2 客户端 ID)时,以下属性将被忽略:dataflowServerUsernamedataflowServerPassworddataflowServerAccessToken

配置组合任务的 URI

启动组合任务时,Spring Cloud Data Flow 会启动 Composed Task Runner 应用程序来管理组合任务图的执行。它通过解析 Spring Cloud Data Flow 任务定义,然后向 Spring Cloud Data Flow 服务器发出 RESTful API 调用来启动任务定义。每个任务完成后,它会启动下一个任务定义。要设置 Composed Task Runner 用于发出 RESTful API 调用的 URI,您需要在 Spring Cloud Data Flow 服务器中设置 SPRING_CLOUD_DATAFLOW_SERVER_URI 属性。以下两个清单展示了如何执行此操作

  • Spring Cloud Data Flow 服务器的 Kubernetes 规范
apiVersion: apps/v1
kind: Deployment
metadata:
  ...
spec:
  ...
  template:
    ...
    spec:
      containers:
        env:
        ...
        - name: SPRING_CLOUD_DATAFLOW_SERVER_URI
          value: '<URI to your SCDF Server>'
        ...
  • Spring Cloud Data Flow 服务器的 Cloud Foundry 清单
---
applications:
  ...
  env:
    ...
    SPRING_CLOUD_DATAFLOW_SERVER_URI: <URI to your SCDF Server>
  services:
    ...