使用 Data Flow 和 RabbitMQ 进行流处理

本节介绍如何使用 Data Flow 注册流应用程序、创建流 DSL,以及将流部署到 Cloud Foundry、Kubernetes 和本地机器。

在前面的指南中,我们创建了 SourceProcessorSink 流应用程序,并将它们作为独立应用程序部署到多个平台上。在本指南中,我们将使用 Data Flow 注册这些应用程序、创建流 DSL,以及将流部署到 Cloud Foundry、Kubernetes 和本地机器。

开发

上一指南中的所有示例应用程序都可以在 https://repo.spring.io Maven 存储库中找到,作为 mavendocker 工件。

对于 UsageDetailSender 源,请使用以下之一

maven://io.spring.dataflow.sample:usage-detail-sender-rabbit:0.0.1-SNAPSHOT
docker://springcloudstream/usage-detail-sender-rabbit:0.0.1-SNAPSHOT

对于 UsageCostProcessor 处理器,请使用以下之一

maven://io.spring.dataflow.sample:usage-cost-processor-rabbit:0.0.1-SNAPSHOT
docker://springcloudstream/usage-cost-processor-rabbit:0.0.1-SNAPSHOT

对于 UsageCostLogger 接收器,请使用以下之一

maven://io.spring.dataflow.sample:usage-cost-logger-rabbit:0.0.1-SNAPSHOT
docker://springcloudstream/usage-cost-logger-rabbit:0.0.1-SNAPSHOT

数据流仪表板

假设数据流已在某个受支持的平台上安装并运行,请在浏览器中打开 <data-flow-url>/dashboard。其中,<data-flow-url> 取决于平台。有关如何确定安装的基本 URL,请参阅安装指南。如果数据流在您的本地机器上运行,请访问 http://localhost:9393/dashboard

应用程序注册

Spring Cloud Data Flow 中的应用程序注册为命名资源,以便在使用 Data Flow DSL 配置和组合流式管道时可以引用它们。注册将逻辑应用程序名称和类型与其物理资源相关联,物理资源由 URI 给出。

URI 符合模式,可以表示 Maven 工件、Docker 镜像或实际的 http(s)file URL。Data Flow 定义了一些逻辑应用程序类型,以指示其作为流式组件、任务或独立应用程序的角色。对于流式应用程序,正如您可能预期的那样,我们使用 SourceProcessorSink 类型。

数据流仪表板位于“应用程序注册”视图,我们可以在其中注册源、处理器和接收器应用程序,如下所示

Add an application

在此步骤中,我们将注册之前创建的应用程序。注册应用程序时,您需要提供其

  • 位置 URI(Maven、HTTP、Docker、文件等)
  • 应用程序版本
  • 应用程序类型(源、处理器或接收器)
  • 应用程序名称

下表显示了我们在之前的指南中创建的应用程序

应用程序名称 应用程序类型 应用程序 URI
usage-detail-sender maven://io.spring.dataflow.sample:usage-detail-sender-rabbit:0.0.1-SNAPSHOT
usage-cost-processor 处理器 maven://io.spring.dataflow.sample:usage-cost-processor-rabbit:0.0.1-SNAPSHOT
usage-cost-logger 接收器 maven://io.spring.dataflow.sample:usage-cost-logger-rabbit:0.0.1-SNAPSHOT

如果您在 Docker 环境中运行 Spring Cloud Data Flow 服务器,请确保您的应用程序工件 URI 可访问。例如,除非您已使应用程序位置可访问,否则您可能无法从 SCDF 或 Skipper Docker 容器访问 file:/。我们建议对应用程序 URI 使用 http://maven://docker://

在本例中,假设您在本地开发环境中运行 Spring Cloud Data Flow 和 Skipper 服务器。

您可以注册 UsageDetailSender 源应用程序。为此,请执行以下操作

  1. 在“应用程序”视图中,选择“**添加应用程序**”。这将显示一个允许您注册应用程序的视图。
  2. 选择“**注册一个或多个应用程序**”,然后输入源应用程序的 名称类型URI
  3. 注册名为 usage-detail-senderUsageDetailSender 应用程序的 maven 工件,如下所示

    (uri = maven://io.spring.dataflow.sample:usage-detail-sender-rabbit:0.0.1-SNAPSHOT

    如果您使用 docker 工件,则使用以下 URI 注册它

    (uri = docker://springcloudstream/usage-detail-sender-rabbit:0.0.1-SNAPSHOT

  4. 单击“**新建应用程序**”以显示该表单的另一个实例,以输入处理器的值。
  5. 注册名为 usage-cost-processorUsageCostProcessor 处理器应用程序的 maven 工件,如下所示

    (uri = maven://io.spring.dataflow.sample:usage-cost-processor-rabbit:0.0.1-SNAPSHOT

    如果您使用 docker 工件,则使用以下 URI 注册它

    (uri = docker://springcloudstream/usage-cost-processor-rabbit:0.0.1-SNAPSHOT

  6. 单击“**新建应用程序**”以显示该表单的另一个实例,以输入接收器的值。
  7. 注册名为 usage-cost-loggerUsageCostLogger 接收器应用程序的 maven 工件,如下所示

    (uri = maven://io.spring.dataflow.sample:usage-cost-logger-rabbit:0.0.1-SNAPSHOT

    如果您使用 docker 工件,则使用以下 URI 注册它

    (uri = docker://springcloudstream/usage-cost-logger-rabbit:0.0.1-SNAPSHOT

    Register source application maven

  8. 单击“**导入应用程序**”以完成注册。这样做会返回“应用程序”视图,其中列出了您的应用程序。下图显示了一个示例

    Registered applications

创建流定义

要创建流定义,请执行以下操作

  1. 从左侧导航栏中选择“**流**”。这将显示主“流”视图,如下所示

    Create streams

  2. 选择“**创建流**”以显示用于创建流定义的图形编辑器,如下图所示

    Create usage cost logger stream

    您可以在左侧面板中看到上面注册的 处理器接收器 应用程序。

  3. 将每个应用程序拖放到画布上,然后使用手柄将它们连接在一起。请注意顶部文本面板中显示的等效数据流 DSL 定义。
  4. 点击 创建流

    您可以在创建流时键入流的名称 usage-cost-logger

部署

要部署您的流,

  1. 点击箭头图标以部署流。这样做会将您带到“部署流”页面,您可以在其中输入其他部署属性。
  2. 选择 部署,如下所示

    Stream created

  3. 部署流时,请从本地、Kubernetes 或 Cloud Foundry 中选择目标平台帐户。这基于 Spring Cloud Skipper 服务器部署程序平台帐户设置。

    Deploy stream

    当所有应用程序都在运行时,流就成功部署了。

    Stream deployed

上述过程对于所有平台基本相同。以下部分介绍了在本地、Cloud Foundry 和 Kubernetes 上部署 Data Flow 的平台特定详细信息。

本地

如果您在 本地 环境中部署流,则需要为每个应用程序的 server.port 应用程序属性设置唯一值,以便它们可以在 本地 上使用不同的端口。

本地 开发环境中部署流后,您可以使用仪表板的运行时页面或使用 SCDF shell 命令 runtime apps 查看运行时应用程序。运行时应用程序显示有关每个应用程序在本地环境中运行的位置及其日志文件位置的信息。

如果您在 Docker 上运行 SCDF,要访问流式传输应用程序的日志文件,可以运行以下命令(显示其输出)

docker exec <stream-application-docker-container-id> tail -f <stream-application-log-file>

2019-04-19 22:16:04.864  INFO 95238 --- [container-0-C-1] c.e.demo.UsageCostLoggerApplication      : {"userId": "Mark", "callCost": "0.17", "dataCost": "0.32800000000000007" }
2019-04-19 22:16:04.872  INFO 95238 --- [container-0-C-1] c.e.demo.UsageCostLoggerApplication      : {"userId": "Janne", "callCost": "0.20800000000000002", "dataCost": "0.298" }
2019-04-19 22:16:04.872  INFO 95238 --- [container-0-C-1] c.e.demo.UsageCostLoggerApplication      : {"userId": "Ilaya", "callCost": "0.175", "dataCost": "0.16150000000000003" }
2019-04-19 22:16:04.872  INFO 95238 --- [container-0-C-1] c.e.demo.UsageCostLoggerApplication      : {"userId": "Glenn", "callCost": "0.145", "dataCost": "0.269" }
2019-04-19 22:16:05.256  INFO 95238 --- [container-0-C-1] c.e.demo.UsageCostLoggerApplication      : {"userId": "Ilaya", "callCost": "0.083", "dataCost": "0.23800000000000002" }
2019-04-19 22:16:06.257  INFO 95238 --- [container-0-C-1] c.e.demo.UsageCostLoggerApplication      : {"userId": "Janne", "callCost": "0.251", "dataCost": "0.026500000000000003" }
2019-04-19 22:16:07.264  INFO 95238 --- [container-0-C-1] c.e.demo.UsageCostLoggerApplication      : {"userId": "Janne", "callCost": "0.15100000000000002", "dataCost": "0.08700000000000001" }
2019-04-19 22:16:08.263  INFO 95238 --- [container-0-C-1] c.e.demo.UsageCostLoggerApplication      : {"userId": "Sabby", "callCost": "0.10100000000000002", "dataCost": "0.33" }
2019-04

Cloud Foundry

在按照前面显示的说明注册流应用程序并将其部署到 Cloud Foundry 之前,您应确保在 Cloud Foundry 上运行了 Spring Cloud Data Flow 实例。请参阅 Cloud Foundry 安装指南 以供参考。

按照本章前面显示的步骤操作,在注册应用程序并部署流之后,您可以在 Cloud Foundry 的“组织”和“空间”中看到已成功部署的应用程序,如下所示:

Cloud Foundry Apps Manager with the deployed Stream Application

您还可以在 Spring Cloud Data Flow 仪表板中访问流应用程序的运行时信息。

除了验证流的运行时状态之外,您还应该验证 usage-cost-logger sink 生成的日志输出。在 Cloud Foundry Apps Manager 中,单击 usage-cost-logger sink 应用程序的“**日志**”选项卡。日志语句应如下所示:

Data Flow Runtime Information

Kubernetes

在 Kubernetes 中运行 Spring Cloud Data Flow 服务器后(按照安装指南中的说明操作),您可以:

  • 注册流应用程序
  • 创建、部署和管理流

使用 Spring Cloud Data Flow 服务器注册应用程序

Kubernetes 环境要求应用程序工件为 docker 镜像。

对于 UsageDetailSender 源,请使用以下内容:

docker://springcloudstream/usage-detail-sender-rabbit:0.0.1-SNAPSHOT

对于 UsageCostProcessor 处理器,请使用以下内容:

docker://springcloudstream/usage-cost-processor-rabbit:0.0.1-SNAPSHOT

对于 UsageCostLogger sink,请使用以下内容:

docker://springcloudstream/usage-cost-logger-rabbit:0.0.1-SNAPSHOT

您可以注册这些应用程序,如前面的应用程序注册步骤中所述。

流部署

注册应用程序后,您可以按照上面流部署部分中的说明部署流。

列出 Pod

要列出 Pod(包括服务器组件和流应用程序),请运行以下命令(显示其输出):

 kubectl get pods
NAME                                                         READY   STATUS    RESTARTS   AGE
scdf-release-data-flow-server-795c77b85c-tqdtx               1/1     Running   0          36m
scdf-release-data-flow-skipper-85b6568d6b-2jgcv              1/1     Running   0          36m
scdf-release-mariadb-744757b689-tsnnz                          1/1     Running   0          36m
scdf-release-rabbitmq-5fb7f7f644-878pz                       1/1     Running   0          36m
usage-cost-logger-usage-cost-logger-v1-568599d459-hk9b6      1/1     Running   0          2m41s
usage-cost-logger-usage-cost-processor-v1-79745cf97d-dwjpw   1/1     Running   0          2m42s
usage-cost-logger-usage-detail-sender-v1-6cd7d9d9b8-m2qf6    1/1     Running   0          2m41s
验证日志

为了确保前面部分中的步骤已正确执行,您应该验证日志。以下示例(及其输出)展示了如何确保您期望的值出现在日志中。

kubectl logs -f usage-cost-logger-usage-cost-logger-v1-568599d459-hk9b6
2019-05-17 17:53:44.189  INFO 1 --- [e-cost-logger-1] i.s.d.s.u.UsageCostLoggerApplication     : {"userId": "user2", "callCost": "0.7000000000000001", "dataCost": "23.950000000000003" }
2019-05-17 17:53:45.190  INFO 1 --- [e-cost-logger-1] i.s.d.s.u.UsageCostLoggerApplication     : {"userId": "user4", "callCost": "2.9000000000000004", "dataCost": "10.65" }
2019-05-17 17:53:46.190  INFO 1 --- [e-cost-logger-1] i.s.d.s.u.UsageCostLoggerApplication     : {"userId": "user3", "callCost": "5.2", "dataCost": "28.85" }
2019-05-17 17:53:47.192  INFO 1 --- [e-cost-logger-1] i.s.d.s.u.UsageCostLoggerApplication     : {"userId": "user4", "callCost": "1.7000000000000002", "dataCost": "30.35" }

与独立部署的比较

在本节中,我们使用 Spring Cloud Data Flow 和流 DSL 部署了流。

usage-detail-sender | usage-cost-processor | usage-cost-logger

当这三个应用程序作为独立应用程序部署时,您需要设置连接应用程序的绑定属性,以使它们成为一个流。

相反,Spring Cloud Data Flow 允许您将所有三个流应用程序部署为单个流,并负责将一个应用程序连接到另一个应用程序以形成数据流。