流应用程序的持续交付

流数据管道中组合的应用程序可能经常需要更改。更改可以是修复错误或设置不同应用程序属性值的应用程序的新版本。为了避免流处理出现停机时间,我们希望仅对流中已更改的应用程序进行滚动升级。此外,如果升级结果不符合预期,应该可以轻松回滚到应用程序的先前版本。

Spring Cloud Data Flow 通过 Skipper 服务器提供对事件流应用程序持续交付的支持。

Stream Rolling Upgrade and Rollbacks

为了演示这一点,我们可以使用安装 Data Flow 时已注册的一些开箱即用的流应用程序。

本地

本节介绍如何在本地环境中使用持续交付。

流创建和部署

在本节中,我们将创建和部署

  • 一个流,其源代码接收 http 事件
  • 应用转换逻辑的 transform 处理器
  • 显示转换后事件结果的 log 接收器

在以下流定义中,您可以在应用程序部署到 local 时为每个应用程序提供唯一的服务器端口

stream create http-ingest --definition "http --server.port=9000 | transform --expression=payload.toUpperCase() --server.port=9001 | log --server.port=9002" --deploy

您可以使用 stream list 命令验证流是否正在部署中

stream list

输出应类似于以下列表

╔═══════════╤══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╤═════════════════════════════╗
║Stream Name│                                                      Stream Definition                                                       │           Status            ║
╠═══════════╪══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╪═════════════════════════════╣
║http-ingest│http --server.port=9000 | transform --transformer.expression=payload.toUpperCase() --server.port=9001 | log --server.port=9002│The stream is being deployed.║
╚═══════════╧══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╧═════════════════════════════╝

部署流后,您可以再次运行该命令

stream list

输出更改以指示流已部署

╔═══════════╤══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╤═════════════════════════════════════════╗
║Stream Name│                                                      Stream Definition                                                       │                 Status                  ║
╠═══════════╪══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╪═════════════════════════════════════════╣
║http-ingest│http --server.port=9000 | transform --transformer.expression=payload.toUpperCase() --server.port=9001 | log --server.port=9002│The stream has been successfully deployed║
╚═══════════╧══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╧═════════════════════════════════════════╝

从 Spring Cloud Data Flow shell 发布一些数据

http post --target "http://localhost:9000" --data "spring"

log 应用程序的日志文件中,您应该看到以下内容

log-sink                                 :  SPRING

stream manifest http-ingest 命令显示此版本流的所有应用程序及其属性

stream manifest http-ingest

您应该看到类似于以下内容的结果

"apiVersion": "skipper.spring.io/v1"
"kind": "SpringCloudDeployerApplication"
"metadata":
  "name": "http"
"spec":
  "resource": "maven://org.springframework.cloud.stream.app:http-source-rabbit:jar"
  "resourceMetadata": "maven://org.springframework.cloud.stream.app:http-source-rabbit:jar:jar:metadata:3.2.1"
  "version": "3.2.1"  "applicationProperties":
    "spring.metrics.export.triggers.application.includes": "integration**"
    "spring.cloud.dataflow.stream.app.label": "http"
    "spring.cloud.stream.metrics.key": "http-ingest.http.${spring.cloud.application.guid}"
    "spring.cloud.stream.bindings.output.producer.requiredGroups": "http-ingest"
    "spring.cloud.stream.metrics.properties": "spring.application.name,spring.application.index,spring.cloud.application.*,spring.cloud.dataflow.*"
    "server.port": "9000"
    "spring.cloud.stream.bindings.output.destination": "http-ingest.http"
    "spring.cloud.dataflow.stream.name": "http-ingest"
    "spring.cloud.dataflow.stream.app.type": "source"
  "deploymentProperties":
    "spring.cloud.deployer.group": "http-ingest"
---
"apiVersion": "skipper.spring.io/v1"
"kind": "SpringCloudDeployerApplication"
"metadata":
  "name": "log"
"spec":
  "resource": "maven://org.springframework.cloud.stream.app:log-sink-rabbit:jar"
  "resourceMetadata": "maven://org.springframework.cloud.stream.app:log-sink-rabbit:jar:jar:metadata:2.1.1.RELEASE"
  "version": "2.1.1.RELEASE"  "applicationProperties":
    "spring.metrics.export.triggers.application.includes": "integration**"
    "spring.cloud.dataflow.stream.app.label": "log"
    "spring.cloud.stream.metrics.key": "http-ingest.log.${spring.cloud.application.guid}"
    "spring.cloud.stream.bindings.input.group": "http-ingest"
    "spring.cloud.stream.metrics.properties": "spring.application.name,spring.application.index,spring.cloud.application.*,spring.cloud.dataflow.*"
    "server.port": "9002"
    "spring.cloud.dataflow.stream.name": "http-ingest"
    "spring.cloud.dataflow.stream.app.type": "sink"
    "spring.cloud.stream.bindings.input.destination": "http-ingest.transform"
  "deploymentProperties":
    "spring.cloud.deployer.group": "http-ingest"
---
"apiVersion": "skipper.spring.io/v1"
"kind": "SpringCloudDeployerApplication"
"metadata":
  "name": "transform"
"spec":
  "resource": "maven://org.springframework.cloud.stream.app:transform-processor-rabbit:jar"
  "resourceMetadata": "maven://org.springframework.cloud.stream.app:transform-processor-rabbit:jar:jar:metadata:3.2.1"
  "version": "3.2.1"  "applicationProperties":
    "spring.metrics.export.triggers.application.includes": "integration**"
    "spring.cloud.dataflow.stream.app.label": "transform"
    "spring.cloud.stream.metrics.key": "http-ingest.transform.${spring.cloud.application.guid}"
    "spring.cloud.stream.bindings.input.group": "http-ingest"
    "transformer.expression": "payload.toUpperCase()"
    "spring.cloud.stream.metrics.properties": "spring.application.name,spring.application.index,spring.cloud.application.*,spring.cloud.dataflow.*"
    "spring.cloud.stream.bindings.output.producer.requiredGroups": "http-ingest"
    "server.port": "9001"
    "spring.cloud.dataflow.stream.name": "http-ingest"
    "spring.cloud.stream.bindings.output.destination": "http-ingest.transform"
    "spring.cloud.dataflow.stream.app.type": "processor"
    "spring.cloud.stream.bindings.input.destination": "http-ingest.http"
  "deploymentProperties":
    "spring.cloud.deployer.group": "http-ingest"

例如,您可以看到 transform 应用程序具有 "transformer.expression": "payload.toUpperCase()" 属性。stream history http-ingest 命令显示此事件流的历史记录,列出所有可用版本

stream history --name http-ingest

该命令的输出应类似于以下列表

╔═══════╤════════════════════════════╤════════╤════════════╤═══════════════╤════════════════╗
║Version│        Last updated        │ Status │Package Name│Package Version│  Description   ║
╠═══════╪════════════════════════════╪════════╪════════════╪═══════════════╪════════════════╣
║1      │Wed May 08 20:45:18 IST 2019│DEPLOYED│http-ingest │1.0.0          │Install complete║
╚═══════╧════════════════════════════╧════════╧════════════╧═══════════════╧════════════════╝

流更新

如果要更新现有已部署流以使用不同版本的 log 应用程序,则可以执行流 update 操作。

首先,您可以注册所需版本的 log 应用程序

app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-rabbit:3.2.1

然后,您可以执行流更新,如下所示

stream update --name http-ingest --properties "version.log=3.2.1"

运行 stream update 时,您可以运行

jps

然后,您可以看到 log 应用程序 log-sink-rabbit-2.1.0.RELEASE.jar 已部署,并且现有 log-sink-rabbit-2.1.1.RELEASE.jar 已销毁。

流更新完成后,您可以验证 stream manifest 以查看 log 应用程序的版本是否已更改

stream manifest http-ingest

您应该看到类似于以下列表的输出

"apiVersion": "skipper.spring.io/v1"
"kind": "SpringCloudDeployerApplication"
"metadata":
  "name": "log"
"spec":
  "resource": "maven://org.springframework.cloud.stream.app:log-sink-rabbit:jar"
  "resourceMetadata": "maven://org.springframework.cloud.stream.app:log-sink-rabbit:jar:jar:metadata:3.2.1"
  "version": "3.2.1"  "applicationProperties":
    "spring.metrics.export.triggers.application.includes": "integration**"
    "spring.cloud.dataflow.stream.app.label": "log"
    "spring.cloud.stream.metrics.key": "http-ingest.log.${spring.cloud.application.guid}"
    "spring.cloud.stream.bindings.input.group": "http-ingest"
    "spring.cloud.stream.metrics.properties": "spring.application.name,spring.application.index,spring.cloud.application.*,spring.cloud.dataflow.*"
    "server.port": "9002"
    "spring.cloud.dataflow.stream.name": "http-ingest"
    "spring.cloud.dataflow.stream.app.type": "sink"
    "spring.cloud.stream.bindings.input.destination": "http-ingest.transform"
  "deploymentProperties":
    "spring.cloud.deployer.count": "1"
    "spring.cloud.deployer.group": "http-ingest"
---
...
...

同样,您可以使用 stream history 命令查看操作历史记录

stream history --name http-ingest

输出应类似于以下列表

╔═══════╤════════════════════════════╤════════╤════════════╤═══════════════╤════════════════╗
║Version│        Last updated        │ Status │Package Name│Package Version│  Description   ║
╠═══════╪════════════════════════════╪════════╪════════════╪═══════════════╪════════════════╣
║2      │Wed May 08 21:34:45 IST 2019│DEPLOYED│http-ingest │1.0.0          │Upgrade complete║
║1      │Wed May 08 21:30:00 IST 2019│DELETED │http-ingest │1.0.0          │Delete complete ║
╚═══════╧════════════════════════════╧════════╧════════════╧═══════════════╧════════════════╝

您还可以更改应用程序的配置属性,而无需使用新版本的应用程序。假设您要更改 transform 应用程序中使用的转换逻辑,而无需重新部署整个流并单独更新 transform 应用程序。您可以使用以下命令执行此操作

stream update http-ingest --properties "app.transform.expression=payload.toUpperCase().concat('!!!')"

再次运行 stream manifest http-ingest 命令时,您可以看到 transform 应用程序现在已更改为包含 expression 属性,该属性通过在末尾附加 !!! 来转换每个有效负载。

现在测试更新

http post --target "http://localhost:9000" --data "spring"

log 应用程序的日志文件中,您可以看到以下内容

log-sink                                 : SPRING!!!

stream history http-ingest 命令的输出包括此流历史记录中的新事件。

流回滚

如果要将事件流回滚到特定版本,可以使用 stream rollback http-ingest --releaseVersion <release-version> 命令。

您可以回滚到事件流的初始版本(其中 transform 应用程序仅进行大写转换)

stream rollback http-ingest --releaseVersion 1
http post --target "http://localhost:9000" --data "spring"

log 应用程序的日志文件中,您现在可以看到

log-sink : SPRING