使用 Prometheus、Alertmanager 和 SCDF 扩展 API 进行自动扩展

要了解 Spring Cloud Data Flow 中的基本扩展概念,请参阅扩展指南。

概述

该解决方案使用Prometheus 警报规则根据应用程序吞吐量指标定义扩容和缩容警报。警报由Prometheus AlertManager和自定义webhook管理,后者又会触发 SCDF 中的扩展 API调用。

对于流数据管道(时间 | 转换 | 日志),我们将展示如何在 时间转换 应用程序之间测量吞吐率,以便将其用作警报的决定因素。我们将讨论在定义的阈值超过设置的规则以及触发自动扩展调用时如何触发警报。以下伪代码说明了此类警报规则的逻辑

rateDifference = rate(time) - rate(transform)                         
if rateDifference > 500 for 1 minute do fire HighThroughputDifference 
if rateDifference == 0 for 3 minutes do fire ZeroThroughputDifference 
  • 实时计算 时间转换 应用程序之间吞吐量(即速率)差异的查询表达式。
  • 当速率差异在 1 分钟 内超过 500 条消息/秒 时触发的 高吞吐量差异 警报规则。
  • 如果速率差异在至少 3 分钟 内保持 0 条消息/秒,则触发的 零吞吐量差异 警报规则。

下图显示了高级架构。

SCDF autoscaling architecture

数据流指标架构 是在 Micrometer 库的帮助下设计的。Prometheus 可以是收集各种应用程序指标以进行性能分析的监控后端之一,它也允许进行警报配置。

Prometheus 中的警报分为

  • 警报规则:在 Prometheus 服务内部定义并由其触发。
  • Alertmanager:一个独立的服务,用于接收和管理触发的警报,并将其发送到预先注册的 Webhook。

警报规则 基于 Prometheus 表达式语言 (PQL)。它们用于定义警报并将扩展警报发送到 Alertmanager。例如,扩展警报规则定义如下所示

alert: HighThroughputDifference
expr:  avg(irate(spring_integration_send_seconds_count{application_name="time"}[1m])) by(stream_name) -
       avg(irate(spring_integration_send_seconds_count{application_name="transform"}[1m])) by(stream_name) > 500
for: 30s

请参阅 alert.rule.yml(以及 kubectl 安装中的 此处)以查看正在使用的警报规则定义。

spring_integration_send_seconds_count 指标来自 spring integration micrometer 支持,用于计算消息速率。

Alertmanager 是一个独立的服务,用于管理警报,包括静默、抑制、聚合以及将通知发送到预先配置的 Webhook。

AlertWebHookApplicationscdf-alert-webhook Spring Boot 应用程序的一部分)是一个自定义 Spring Boot 应用程序,在 config.yml 中注册为 Alertmanager Webhook 接收器AlertWebHookApplication 接收来自 Prometheus 的警报通知(JSON 格式)。借助 SCDF 的 Scale API,它可以触发扩展请求,以自动扩展 SCDF 中警报流数据管道所引用的应用程序。

警报通知还包含警报 PQL 表达式中使用的指标标签。对于我们的示例,这意味着 stream_name 标签与通知警报一起传递,让 AlertWebHookApplication 确定必须扩展的数据管道的名称。

数据流规模 REST API 提供了一种平台无关的机制来扩展数据管道应用程序。

AlertWebHookApplication 使用 spring.cloud.dataflow.client.server-uri 属性来配置规模 API 端点。有关完整的部署配置,请参阅 alertwebhook-deployment.yaml

以下视频动画显示了数据流自动缩放流程

先决条件

此方案使用 Kubernetes 平台。

该方案已在具有五个节点的 GKE 集群上成功测试。

由于多个应用程序实例导致 CPU 需求较高,因此在 minikube 上运行该方案可能会很困难或无法实现。

按照 Kubectl 安装说明设置 Spring Cloud Data Flow 和 Kafka 代理。

然后安装 AlertmanagerAlertWebHook 并重新配置 Prometheus 服务

kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertwebhook/alertwebhook-svc.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertwebhook/alertwebhook-deployment.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertmanager/prometheus-alertmanager-service.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertmanager/prometheus-alertmanager-deployment.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertmanager/prometheus-alertmanager-configmap.yaml
wget https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/prometheus/prometheus-configmap.yaml
kubectl patch cm my-release-prometheus-server --patch "$(cat ./prometheus-configmap.yaml)"
kubectl delete pods -l app=prometheus

my-release-prometheus-server 替换为当前的 Prometheus CM。使用 kubectl get cm 列出您的配置。

按照 Helm 安装说明设置 Spring Cloud Data Flow 和 Kafka 代理。您可以使用 features.monitoring.enabled=true 以及至少 10Gi 的存储空间

helm install --name my-release stable/spring-cloud-data-flow --set features.monitoring.enabled=true,kafka.enabled=true,rabbitmq.enabled=false,kafka.persistence.size=10Gi

然后安装 AlertmanagerAlertWebHook 并重新配置 Prometheus 服务

kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/helm/alertwebhook/alertwebhook-svc.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/helm/alertwebhook/alertwebhook-deployment.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertmanager/prometheus-alertmanager-service.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertmanager/prometheus-alertmanager-deployment.yaml
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/alertmanager/prometheus-alertmanager-configmap.yaml
wget https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/main/dataflow-website/recipes/scaling/kubernetes/helm/prometheus/prometheus-configmap.yaml
kubectl patch cm my-release-prometheus-server --patch "$(cat ./prometheus-configmap.yaml)"

注册最新的 kafka-docker 应用程序启动器

app import --uri https://dataflow.springframework.org.cn/kafka-docker-latest

启动 SCDF Shell 并将其连接到您的数据流服务器

server-unknown:>dataflow config server http://<SCDF IP>

自动缩放方案

此方案展示了如何设置自动缩放。

创建数据管道

stream create --name scaletest --definition "time --fixed-delay=995 --time-unit=MILLISECONDS | transform --expression=\"payload + '-' + T(java.lang.Math).exp(700)\" | log"

time 源以固定的时间间隔(995 毫秒 = ~1 条消息/秒)生成当前时间戳消息,transform 处理器执行数学运算以模拟高 CPU 处理,而 log sink 打印转换后的消息负载。

部署具有数据分区的 Data Pipeline

stream deploy --name scaletest --properties "app.time.producer.partitionKeyExpression=payload,app.transform.spring.cloud.stream.kafka.binder.autoAddPartitions=true,app.transform.spring.cloud.stream.kafka.binder.minPartitionCount=4"

属性 producer.partitionKeyExpression=payload 配置时间源的输出绑定以进行分区。分区键表达式使用消息负载(即当前时间戳的 toString() 值)来计算如何将数据分区到下游输出通道。部署属性 spring.cloud.stream.kafka.binder.autoAddPartitions 指示 Kafka binder 在需要时创建新分区。如果主题尚未过度分区,则需要此属性。属性 spring.cloud.stream.kafka.binder.minPartitionCount 设置 Kafka binder 在主题上配置的最小分区数,transform-processor 将在该主题上订阅新数据。

SCDF autoscaling stream deploying

使用 SCDF 的内置 Grafana 仪表板查看流应用程序的吞吐量和其他应用程序指标

SCDF autoscaling initial metrics

timetransformlog 应用程序保持相同的消息吞吐量(~1 条消息/秒)。transform 处理当前负载。

增加数据管道负载

现在,我们通过提高时间源的消息生成速率来增加负载。通过将 time 的 time-unit 属性从 MILLISECONDS 更改为 MICROSECONDS,输入速率从每秒一条消息增加到每秒数千条消息。请注意,流滚动更新功能允许您对 time 应用程序执行滚动更新,而无需停止整个流。

stream update --name scaletest --properties "app.time.trigger.time-unit=MICROSECONDS"

time 应用使用新的时间单位属性重新部署: SCDF 自动扩展增加负载

现在,time 源以 ~5000 条消息/秒 的速率发送消息。但是,transform 处理器的上限约为 1000 条消息/秒,从而将整个流的吞吐量限制在一定水平。这表明 transform 已成为瓶颈。

SCDF autoscaling data pipeline bottleneck

HighThroughputDifference Prometheus 警报规则检测到速率差异并触发横向扩展警报。

SCDF autoscaling scaleout alert

这将导致添加三个额外的转换实例。

SCDF autoscaling adding 3 instances

借助 log 接收器的其他实例,整个数据管道赶上了 time 源的生产速率。

SCDF autoscaling stream catches up

降低数据管道负载

假设我们将时间源的数据生产速率降低回原始速率(即 1 条消息/秒)。

stream update --name scaletest --properties "app.time.trigger.time-unit=MILLISECONDS"

额外的 transform 处理器实例不再改变整体吞吐率。最终,速率差变为零,并触发 ZeroThroughputDifference 警报。反过来,此警报会触发缩减操作,并且可以缩减额外的实例。

SCDF autoscaling scale-in alert

使用单个 transform 实例,整个数据管道的吞吐量将恢复正常,为 ~1 条消息/秒