流处理入门
Spring Cloud Data Flow 提供了 70 多个预构建的流应用程序,您可以立即使用它们来实现常见的流用例。在本指南中,我们将使用其中两个应用程序构建一个简单的数据管道,该管道生成从外部 HTTP 请求发送的数据,并通过将有效负载记录到终端来使用该数据。
《安装指南》包含有关向 Data Flow 注册这些预构建应用程序的说明。
流 DSL 概述
您可以使用领域特定语言 (DSL) 通过 shell 或仪表板以及以编程方式在 Java 中创建流。仪表板还允许您将应用程序拖放到调色板上并以可视方式连接它们。仪表板是双向的,因此可视化操作会更新 DSL。同样,对 DSL 的编辑会更新流的视图。
DSL 的建模灵感来自 Unix 管道和过滤器语法。例如,定义为 http | log
的流 DSL 表示一个 http
应用程序,它将从 HTTP 帖子接收到的数据发送到消息中间件。
log
应用程序从消息中间件接收包含该数据的消息,并将其记录到终端。DSL 中的每个名称都通过应用程序注册过程与一个应用程序关联。应用程序之间通过 |
符号连接,该符号表示消息中间件,它充当应用程序之间的“管道”。
下图显示了流处理的生命周期

创建流
要创建流,请执行以下操作
- 在菜单中,单击“**流**”。
-
单击“**创建流**”按钮。
屏幕将更改为下图
- 在文本区域中,键入
http | log
。 - 单击“**创建流**”。
-
输入
http-ingest
作为流名称,如下所示 -
单击“**创建流**”按钮。
将显示“定义”页面。
部署流
现在您已经定义了一个流,您可以部署它。为此,请执行以下操作
-
单击您在上一节中创建的
http-ingest
定义旁边的播放(部署)按钮。UI 显示了您可以应用于
http-ingest
流中的应用程序的可用属性。下图所示的此示例使用默认值
如果您使用本地数据流服务器,请添加以下部署属性来设置端口以避免端口冲突
如果将 Spring Cloud Data Flow 部署到 Kubernetes,请在 http
源应用程序上将 kubernetes.createLoadBalancer
部署属性设置为 true
,以便在外部公开服务,如下所示
-
点击**部署流**按钮。
用户界面将返回到“定义”页面。
该流现在处于
正在部署
状态,并在完成部署后变为已部署
状态。您可能需要刷新浏览器才能看到更新后的状态。
验证输出
部署应用程序后,您可以验证其输出。具体操作方法取决于您运行应用程序的位置
本地
本节详细介绍了当您的应用程序在本地服务器上运行时如何验证输出。
测试数据
流部署并运行后,您可以发布一些数据。您可以使用以下 curl 命令执行此操作
curl https://127.0.0.1:20100 -H "Content-type: text/plain" -d "Happy streaming"
结果
部署流后,您可以查看其日志。为此,请执行以下操作
- 点击菜单中的**运行时**。
- 点击
http-ingest.log
。 - 复制仪表板上的
stdout
文本框中的路径 -
在另一个控制台窗口中,键入以下内容,将
/path/from/stdout/textbox/in/dashboard
替换为您在上一步中复制的值docker exec -it skipper tail -f /path/from/stdout/textbox/in/dashboard
日志接收器的输出显示在新窗口中。您应该会看到如下所示的输出。
log-sink : Happy streaming
当您在发送 http 请求后看到足够的输出时,请按 Ctrl+C 结束 tail
命令。
Cloud Foundry
本节详细介绍了如何在应用程序在 Cloud Foundry 上运行时验证输出。
测试数据
在 Cloud Foundry 中部署并运行流后,您可以发布一些数据。您可以使用以下 curl 命令执行此操作
curl http://http-ingest-314-log-v1.cfapps.io -H "Content-type: text/plain" -d "Happy streaming"
结果
现在,您可以再次列出正在运行的应用程序,并在列表中看到您的应用程序,如下所示
cf apps [1h] ✭
Getting apps in org ORG / space SPACE as [email protected]...
name requested state instances memory disk urls
http-ingest-314-log-v1 started 1/1 1G 1G http-ingest-314-log-v1.cfapps.io
http-ingest-314-http-v1 started 1/1 1G 1G http-ingest-314-http-v1.cfapps.io
skipper-server started 1/1 1G 1G skipper-server.cfapps.io
dataflow-server started 1/1 1G 1G dataflow-server.cfapps.io
现在,您可以验证日志,如下所示
cf logs http-ingest-314-log-v1
...
...
2017-11-20T15:39:43.76-0800 [APP/PROC/WEB/0] OUT 2017-11-20 23:39:43.761 INFO 12 --- [ http-ingest-314.ingest-314-1] log-sink : Happy streaming
Kubernetes
本节详细介绍了如何在应用程序在 Kubernetes 上运行时验证输出。
通过运行命令获取 HTTP 服务 URL。
如果部署到支持负载均衡器的集群,则可以通过运行以下命令来确定 HTTP 服务地址
如果您的 Kubernetes 负载均衡器仅提供主机名,请使用
export SERVICE_URL="$(kubectl get svc --namespace default http-ingest-http -o jsonpath='{.status.loadBalancer.ingress[0].name}'):8080"
如果您的 Kubernetes 负载均衡器仅提供 IP 地址,请使用
export SERVICE_URL="$(kubectl get svc --namespace default http-ingest-http -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):8080"
LoadBalancer IP 可能需要几分钟才能可用。您可以通过运行 kubectl get svc -w http-ingest-http
来观察服务器的状态
如果您使用 Minikube,则可以使用以下命令获取服务器的 URL
export SERVICE_URL=$(minikube service --url http-ingest-http --namespace='default')
您可以通过键入以下内容来查看应用程序的 HTTP URL
echo $SERVICE_URL
测试数据
在 Kubernetes 中部署并运行流后,您现在可以发布一些数据。您可以使用以下 curl 命令执行此操作
curl $SERVICE_URL -H "Content-type: text/plain" -d "Happy streaming"
结果
结果应与以下示例类似
kubectl get pods
NAME READY STATUS RESTARTS AGE
http-ingest-log-v1-0-2k4r8 1/1 Running 0 2m
http-ingest-http-v1-qhdqq 1/1 Running 0 2m
mysql-777890292-z0dsw 1/1 Running 0 49m
rabbitmq-317767540-2qzrr 1/1 Running 0 49m
scdf-server-2734071167-bjd3g 1/1 Running 0 12m
skipper-2408247821-50z31 1/1 Running 0 15m
现在,您可以验证日志,如下所示
kubectl logs -f http-ingest-log-v1-0-2k4r8
...
...
2017-10-30 22:59:04.966 INFO 1 --- [ http-ingest.http.http-ingest-1] log-sink : Happy streaming
Kail 是一个方便的工具,用于流式传输来自一个或多个命名空间的所有日志。
删除流
现在您可以删除您创建的流。为此,请执行以下操作
- 点击菜单中的流。
- 点击
http-ingest
行上的向下箭头。 - 点击销毁流。
- 当系统提示您确认时,请点击销毁流定义。
更新和回滚流
您可以在持续交付指南中找到此信息。
监控
您可以在流监控指南中找到此信息。