使用 Prometheus、Alertmanager 和 SCDF 扩展 API 进行自动扩展
要了解 Spring Cloud Data Flow 中的基本扩展概念,请参阅扩展指南。
概述
该解决方案使用Prometheus 警报规则根据应用程序吞吐量指标定义扩容和缩容警报。警报由Prometheus AlertManager和自定义webhook管理,后者又会触发 SCDF 中的扩展 API调用。
对于流数据管道(时间 | 转换 | 日志
),我们将展示如何在 时间
和 转换
应用程序之间测量吞吐率,以便将其用作警报的决定因素。我们将讨论在定义的阈值超过设置的规则以及触发自动扩展调用时如何触发警报。以下伪代码说明了此类警报规则的逻辑
rateDifference = rate(time) - rate(transform)
if rateDifference > 500 for 1 minute do fire HighThroughputDifference
if rateDifference == 0 for 3 minutes do fire ZeroThroughputDifference
- 实时计算
时间
和转换
应用程序之间吞吐量(即速率)差异的查询表达式。 - 当速率差异在
1 分钟
内超过500 条消息/秒
时触发的高吞吐量差异
警报规则。 - 如果速率差异在至少
3 分钟
内保持0 条消息/秒
,则触发的零吞吐量差异
警报规则。
下图显示了高级架构。
数据流指标架构 是在 Micrometer 库的帮助下设计的。Prometheus 可以是收集各种应用程序指标以进行性能分析的监控后端之一,它也允许进行警报配置。
Prometheus 中的警报分为
警报规则
:在 Prometheus 服务内部定义并由其触发。Alertmanager
:一个独立的服务,用于接收和管理触发的警报,并将其发送到预先注册的 Webhook。
警报规则 基于 Prometheus 表达式语言 (PQL)。它们用于定义警报并将扩展警报发送到 Alertmanager
。例如,扩展警报规则定义如下所示
alert: HighThroughputDifference
expr: avg(irate(spring_integration_send_seconds_count{application_name="time"}[1m])) by(stream_name) -
avg(irate(spring_integration_send_seconds_count{application_name="transform"}[1m])) by(stream_name) > 500
for: 30s
请参阅 alert.rule.yml(以及 kubectl 安装中的 此处)以查看正在使用的警报规则定义。
spring_integration_send_seconds_count
指标来自 spring integration
micrometer 支持,用于计算消息速率。
Alertmanager 是一个独立的服务,用于管理警报,包括静默、抑制、聚合以及将通知发送到预先配置的 Webhook。
AlertWebHookApplication(scdf-alert-webhook Spring Boot 应用程序的一部分)是一个自定义 Spring Boot 应用程序,在 config.yml 中注册为 Alertmanager Webhook 接收器。AlertWebHookApplication
接收来自 Prometheus 的警报通知(JSON 格式)。借助 SCDF 的 Scale API,它可以触发扩展请求,以自动扩展 SCDF 中警报流数据管道所引用的应用程序。
警报通知还包含警报 PQL 表达式中使用的指标标签。对于我们的示例,这意味着 stream_name
标签与通知警报一起传递,让 AlertWebHookApplication
确定必须扩展的数据管道的名称。
数据流规模 REST API 提供了一种平台无关的机制来扩展数据管道应用程序。
AlertWebHookApplication
使用 spring.cloud.dataflow.client.server-uri
属性来配置规模 API 端点。有关完整的部署配置,请参阅 alertwebhook-deployment.yaml。
以下视频动画显示了数据流自动缩放流程
先决条件
此方案使用 Kubernetes 平台。
该方案已在具有五个节点的 GKE 集群上成功测试。
由于多个应用程序实例导致 CPU 需求较高,因此在 minikube 上运行该方案可能会很困难或无法实现。
按照 Kubectl 安装说明设置 Spring Cloud Data Flow 和 Kafka 代理。
然后安装 Alertmanager
和 AlertWebHook
并重新配置 Prometheus
服务
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertwebhook/alertwebhook-svc.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertwebhook/alertwebhook-deployment.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertmanager/prometheus-alertmanager-service.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertmanager/prometheus-alertmanager-deployment.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertmanager/prometheus-alertmanager-configmap.yaml
wget https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/prometheus/prometheus-configmap.yaml
kubectl patch cm my-release-prometheus-server --patch "$(cat ./prometheus-configmap.yaml)"
kubectl delete pods -l app=prometheus
将 my-release-prometheus-server
替换为当前的 Prometheus CM。使用 kubectl get cm
列出您的配置。
按照 Helm 安装说明设置 Spring Cloud Data Flow 和 Kafka 代理。您可以使用 features.monitoring.enabled=true
以及至少 10Gi
的存储空间
helm install --name my-release stable/spring-cloud-data-flow --set features.monitoring.enabled=true,kafka.enabled=true,rabbitmq.enabled=false,kafka.persistence.size=10Gi
然后安装 Alertmanager
和 AlertWebHook
并重新配置 Prometheus
服务
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/helm/alertwebhook/alertwebhook-svc.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/helm/alertwebhook/alertwebhook-deployment.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertmanager/prometheus-alertmanager-service.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertmanager/prometheus-alertmanager-deployment.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertmanager/prometheus-alertmanager-configmap.yaml
wget https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/helm/prometheus/prometheus-configmap.yaml
kubectl patch cm my-release-prometheus-server --patch "$(cat ./prometheus-configmap.yaml)"
注册最新的 kafka-docker 应用程序启动器
app import --uri https://dataflow.springframework.org.cn/kafka-docker-latest
启动 SCDF Shell 并将其连接到您的数据流服务器
server-unknown:>dataflow config server http://<SCDF IP>
自动缩放方案
此方案展示了如何设置自动缩放。
创建数据管道
stream create --name scaletest --definition "time --fixed-delay=995 --time-unit=MILLISECONDS | transform --expression=\"payload + '-' + T(java.lang.Math).exp(700)\" | log"
time
源以固定的时间间隔(995 毫秒 = ~1 条消息/秒)生成当前时间戳消息,transform
处理器执行数学运算以模拟高 CPU 处理,而 log
sink 打印转换后的消息负载。
部署具有数据分区的 Data Pipeline
stream deploy --name scaletest --properties "app.time.producer.partitionKeyExpression=payload,app.transform.spring.cloud.stream.kafka.binder.autoAddPartitions=true,app.transform.spring.cloud.stream.kafka.binder.minPartitionCount=4"
属性 producer.partitionKeyExpression=payload
配置时间源的输出绑定以进行分区。分区键表达式使用消息负载(即当前时间戳的 toString()
值)来计算如何将数据分区到下游输出通道。部署属性 spring.cloud.stream.kafka.binder.autoAddPartitions
指示 Kafka binder 在需要时创建新分区。如果主题尚未过度分区,则需要此属性。属性 spring.cloud.stream.kafka.binder.minPartitionCount
设置 Kafka binder 在主题上配置的最小分区数,transform-processor 将在该主题上订阅新数据。
使用 SCDF 的内置 Grafana 仪表板查看流应用程序的吞吐量和其他应用程序指标
time
、transform
和 log
应用程序保持相同的消息吞吐量(~1 条消息/秒)。transform
处理当前负载。
增加数据管道负载
现在,我们通过提高时间源的消息生成速率来增加负载。通过将 time 的 time-unit
属性从 MILLISECONDS
更改为 MICROSECONDS
,输入速率从每秒一条消息增加到每秒数千条消息。请注意,流滚动更新功能允许您对 time 应用程序执行滚动更新,而无需停止整个流。
stream update --name scaletest --properties "app.time.trigger.time-unit=MICROSECONDS"
time
应用使用新的时间单位属性重新部署:
现在,time
源以 ~5000 条消息/秒
的速率发送消息。但是,transform
处理器的上限约为 1000 条消息/秒
,从而将整个流的吞吐量限制在一定水平。这表明 transform
已成为瓶颈。
HighThroughputDifference
Prometheus 警报规则检测到速率差异并触发横向扩展警报。
这将导致添加三个额外的转换实例。
借助 log
接收器的其他实例,整个数据管道赶上了 time
源的生产速率。
降低数据管道负载
假设我们将时间源的数据生产速率降低回原始速率(即 1 条消息/秒
)。
stream update --name scaletest --properties "app.time.trigger.time-unit=MILLISECONDS"
额外的 transform
处理器实例不再改变整体吞吐率。最终,速率差变为零,并触发 ZeroThroughputDifference
警报。反过来,此警报会触发缩减操作,并且可以缩减额外的实例。
使用单个 transform
实例,整个数据管道的吞吐量将恢复正常,为 ~1 条消息/秒
。