数据分区
分区是状态处理中的一个关键概念,无论是出于性能还是一致性原因,都要确保所有相关数据一起处理。例如,在时间窗口平均值计算示例中,重要的是来自任何给定传感器的所有测量值都由同一个应用程序实例处理。或者,您可能希望缓存与传入事件相关的某些数据,以便可以在不进行远程过程调用来检索相关数据的情况下对其进行充实。
分区支持允许根据内容将负载路由到流数据管道中的下游应用程序实例。当您希望下游应用程序实例处理来自上游应用程序特定分区的数据时,这尤其有用。例如,如果数据管道中的处理器应用程序正在根据负载中的唯一标识符(例如 customerId
)执行操作,则可以根据该唯一标识对流进行分区。
流分区属性
您可以在流部署期间传递以下分区属性,以声明方式配置分区策略,从而将每条消息路由到特定的消费者实例。
以下列表显示了部署分区流的各种方法
app.[应用/标签名称].producer.partitionKeyExtractorClass
:PartitionKeyExtractorStrategy
的类名(默认值:null)。app.[应用/标签名称].producer.partitionKeyExpression
:根据消息评估以确定分区键的 SpEL(Spring 表达式语言)表达式。仅当partitionKeyExtractorClass
为 null 时才适用。如果两者均为 null,则不进行应用程序分区(默认值:null)。app.[应用/标签名称].producer.partitionSelectorClass
:PartitionSelectorStrategy
的类名(默认值:null)。app.[应用/标签名称].producer.partitionSelectorExpression
:根据分区键评估以确定消息路由到的分区索引的 SpEL 表达式。最终分区索引是返回值(整数)模[nextModule].count
。如果类和表达式均为 null,则将基础绑定器的默认PartitionSelectorStrategy
应用于键(默认值:null)。
总之,如果应用程序的部署实例计数 >1 并且前一个应用程序具有 partitionKeyExtractorClass
或 partitionKeyExpression
(partitionKeyExtractorClass
优先),则对该应用程序进行分区。提取分区键后,将通过调用 partitionSelectorClass
(如果存在)或 partitionSelectorExpression % partitionCount
来确定分区应用程序实例。partitionCount
是应用程序计数(在 RabbitMQ 的情况下)或主题的基础分区计数(在 Kafka 的情况下)。
如果 partitionSelectorClass
和 partitionSelectorExpression
均不存在,则结果为 key.hashCode() % partitionCount
。
部署具有分区下游应用程序的流
您可以使用安装指南来设置 Spring Cloud Data Flow 和 Spring Cloud Skipper 服务器。
在本例中,我们使用开箱即用的 http
、splitter
和 log
应用程序。
创建流
本节介绍如何创建和部署分区流。
考虑以下流
- 一个
http
源应用程序在端口 9001 上监听传入的句子。 - 一个
splitter
处理器应用程序将句子拆分为单词,并根据其哈希值对单词进行分区(使用payload
作为partitionKeyExpression
)。 - 一个
log
接收器应用程序被扩展为运行三个应用程序实例,并且每个实例都应该从上游接收唯一的哈希值。
要创建此流
-
在 Spring Cloud Data Flow Dashboard UI 中,从左侧导航栏中选择
Streams
。这样做将显示主 Streams 视图,如下图所示 -
选择
CREATE STREAM(S)
以显示用于创建流定义的图形编辑器,如下图所示您可以在左侧面板中看到
Source
、Processor
和Sink
应用程序(先前已注册)。 - 将每个应用程序拖放到画布上。
-
使用手柄将它们连接在一起。
请注意顶部文本面板中对应的 Data Flow DSL 定义。您也可以输入 Stream DSL 文本,如下所示
words=http --server.port=9001 | splitter --expression=payload.split(' ') | log
- 点击
CREATE STREAM(S)
。
部署流
点击流行左侧的省略号图标以部署流。这样做会将您带到“部署流”页面,您可以在其中输入其他部署属性。
对于此流,我们需要指定以下内容
- 上游应用程序的分区标准
- 下游应用程序计数
在我们的例子中,我们需要设置以下属性
app.splitter.producer.partitionKeyExpression=payload
deployer.log.count=3
在 Dashboard 的流部署页面中,您可以输入
producer.partitionKeyExpression
:为splitter
应用程序将其设置为payload
。count
:为log
应用程序将其设置为3
。
然后点击 **DEPLOY STREAM**,如下图所示
您可以从 Runtime
页面检查流的状态。
当所有应用程序都在运行时,流就成功部署了,如下图所示
所有应用程序运行后,我们就可以开始向 http
源发送数据了。
您可以使用以下 curl
命令发送一些数据
curl -X POST https://#:9001 -H "Content-Type: text/plain" -d "How much wood would a woodchuck chuck if a woodchuck could chuck wood"
要访问 log
应用程序实例的日志文件,请点击 Runtime
并点击 log
应用程序名称(words.log-v1
)以查看每个 log
应用程序实例的 stdout 日志文件位置。
您可以跟踪每个 log
应用程序实例的 stdout 文件。
从日志中,您可以看到来自 splitter
应用程序的输出数据已被分区并由 log
应用程序实例接收。
以下清单显示了 log
实例 1 的日志输出
2019-05-10 20:59:58.574 INFO 13673 --- [itter.words-0-1] log-sink : much
2019-05-10 20:59:58.587 INFO 13673 --- [itter.words-0-1] log-sink : wood
2019-05-10 20:59:58.600 INFO 13673 --- [itter.words-0-1] log-sink : would
2019-05-10 20:59:58.604 INFO 13673 --- [itter.words-0-1] log-sink : if
2019-05-10 20:59:58.609 INFO 13673 --- [itter.words-0-1] log-sink : wood
以下清单显示了 log
实例 2 的日志输出
2019-05-10 20:59:58.579 INFO 13674 --- [itter.words-1-1] log-sink : a
2019-05-10 20:59:58.589 INFO 13674 --- [itter.words-1-1] log-sink : chuck
2019-05-10 20:59:58.595 INFO 13674 --- [itter.words-1-1] log-sink : a
2019-05-10 20:59:58.598 INFO 13674 --- [itter.words-1-1] log-sink : could
2019-05-10 20:59:58.602 INFO 13674 --- [itter.words-1-1] log-sink : chuck
以下清单显示了 log
实例 3 的日志输出
2019-05-10 20:59:58.573 INFO 13675 --- [itter.words-2-1] log-sink : How
2019-05-10 20:59:58.582 INFO 13675 --- [itter.words-2-1] log-sink : woodchuck
2019-05-10 20:59:58.586 INFO 13675 --- [itter.words-2-1] log-sink : woodchuck