使用 SCDF shell 横向扩展流数据管道

要了解 Spring Cloud Data Flow 中的基本扩展概念,请参阅扩展指南。

概述

我们部署了一个流数据管道,模拟了一个资源紧张的场景(例如,CPU 使用率飙升),并使用 SCDF shell 手动增加消费者应用程序实例的数量来处理增加的负载,如下图所示

SCDF Manual Scaling

这种手动方法允许运维人员和开发人员在观察到给定应用程序中的高负载时进行干预并进行横向扩展。例如,零售商店可能希望设计一个可以预先扩展的系统,以满足高负载和需求旺盛的日子,例如黑色星期五。

当需要更复杂的方法来以自动化的方式控制横向扩展和缩减操作时,借助 SCDF 使用 Prometheus 的监控支持,您可以配置和设置自动扩展规则。请参阅使用 SCDF 和 Prometheus 自动扩展流数据管道案例,了解如何执行此操作。

先决条件

此方法使用 Kubernetes 平台。请按照 KubectlHelm 安装说明设置 Spring Cloud Data Flow 和 Kafka broker。然后运行以下命令

helm install --name my-release stable/spring-cloud-data-flow --set kafka.enabled=true,rabbitmq.enabled=false,kafka.persistence.size=10Gi

然后注册最新的 kafka-docker 应用启动器。

然后启动 SCDF Shell 并将其连接到您的 Data Flow 服务器

server-unknown:>dataflow config server http://<SCDF IP>

扩展方案

本节展示了扩展应用程序的方案。

创建数据管道

要创建数据管道,请运行以下命令

stream create --name scaletest --definition "time --fixed-delay=995 --time-unit=MILLISECONDS | transform --expression=\"payload + '-' + T(java.lang.Math).exp(700)\" | log"

time 源以固定的时间间隔(995 毫秒 = ~1 条消息/秒)生成当前时间戳消息,transform 处理器执行数学运算以模拟高 CPU 处理,log 接收器打印转换后的消息负载。

部署具有数据分区的的数据管道

要部署具有数据分区的的数据管道,请运行以下命令

stream deploy --name scaletest --properties "app.time.producer.partitionKeyExpression=payload,app.transform.spring.cloud.stream.kafka.binder.autoAddPartitions=true,app.transform.spring.cloud.stream.kafka.binder.minPartitionCount=4"

producer.partitionKeyExpression=payload 属性配置用于分区的 time 输出绑定。分区键表达式使用消息负载(例如,当前时间戳的 toString() 值)来计算如何将数据分区到下游输出通道。spring.cloud.stream.kafka.binder.autoAddPartitions 部署属性指示 Kafka 绑定程序在需要时创建新分区。如果主题尚未过度分区,则需要此属性。spring.cloud.stream.kafka.binder.minPartitionCount 属性设置 Kafka 绑定程序在主题上配置的最小分区数,transform 处理器在该主题上订阅新数据。

Once the scaletest stream is deployed you should see the following: SCDF manual scaling non-scaled scaletest deployed SCDF manual scaling scaletest deployed

每个应用程序的单个应用程序实例。

使用 SCDF 内置的 Grafana 仪表板查看流应用程序的吞吐量和其他应用程序指标

SCDF manual scaling scaletest deployed

timetransformlog 应用程序保持相同的消息吞吐量(约 1 条消息/秒)。transform 处理当前负载。

增加数据管道负载

现在,我们通过提高时间源的消息生成速率来增加负载。通过将时间源的 time-unit 属性从 MILLISECONDS 更改为 MICROSECONDS,输入速率从每秒一条消息增加到每秒数千条消息。请注意,流滚动更新 功能允许您仅对时间应用程序执行滚动更新,而无需停止整个流。

stream update --name scaletest --properties "app.time.trigger.time-unit=MICROSECONDS"

现在,time 源以 ~5000 条消息/秒 的速率发出消息。但是,transform 处理器的上限约为 1000 条消息/秒。反过来,它会将整个流的吞吐量限制在一定水平。这表明 transform 已成为瓶颈,如下图所示。

SCDF manual scaling scaletest deployed

横向扩展

使用 SCDF Shell 将 transform 实例扩展到四个。

stream scale app instances --name scaletest --applicationName transform --count 4

由于执行了上一条命令,因此部署了三个额外的转换实例,如下图所示。

SCDF manual scaling scaletest deployed after scale out SCDF manual scaling scaletest deployed after scale out

借助 transform 处理器的其他实例,整个数据管道可以赶上 time 源的生成速率。

SCDF manual scaling scaletest metrics after scale out

降低数据管道负载并缩减

现在,我们将源的数据生成速率降低回原始速率(1 条消息/秒)。

stream update --name scaletest --properties "app.time.trigger.time-unit=MILLISECONDS"

额外的 transform 处理器实例不再改变整体吞吐率。可以缩减(即删除)这些额外的实例,如下所示。

SCDF manual scaling scaletest metrics redundant apps

这样做可以让我们将转换应用程序实例的数量减少到原始容量。要减少实例数量,请运行以下命令。

stream scale app instances --name scaletest --applicationName transform --count 1

下图显示了结果: SCDF 手动缩放比例测试缩减指标