数据分区

分区是状态处理中的一个关键概念,无论是出于性能还是一致性原因,都要确保所有相关数据一起处理。例如,在时间窗口平均值计算示例中,重要的是来自任何给定传感器的所有测量值都由同一个应用程序实例处理。或者,您可能希望缓存与传入事件相关的某些数据,以便可以在不进行远程过程调用来检索相关数据的情况下对其进行充实。

分区支持允许根据内容将负载路由到流数据管道中的下游应用程序实例。当您希望下游应用程序实例处理来自上游应用程序特定分区的数据时,这尤其有用。例如,如果数据管道中的处理器应用程序正在根据负载中的唯一标识符(例如 customerId)执行操作,则可以根据该唯一标识对流进行分区。

流分区属性

您可以在流部署期间传递以下分区属性,以声明方式配置分区策略,从而将每条消息路由到特定的消费者实例。

以下列表显示了部署分区流的各种方法

  • app.[应用/标签名称].producer.partitionKeyExtractorClassPartitionKeyExtractorStrategy 的类名(默认值:null)。
  • app.[应用/标签名称].producer.partitionKeyExpression:根据消息评估以确定分区键的 SpEL(Spring 表达式语言)表达式。仅当 partitionKeyExtractorClass 为 null 时才适用。如果两者均为 null,则不进行应用程序分区(默认值:null)。
  • app.[应用/标签名称].producer.partitionSelectorClassPartitionSelectorStrategy 的类名(默认值:null)。
  • app.[应用/标签名称].producer.partitionSelectorExpression:根据分区键评估以确定消息路由到的分区索引的 SpEL 表达式。最终分区索引是返回值(整数)模 [nextModule].count。如果类和表达式均为 null,则将基础绑定器的默认 PartitionSelectorStrategy 应用于键(默认值:null)。

总之,如果应用程序的部署实例计数 >1 并且前一个应用程序具有 partitionKeyExtractorClasspartitionKeyExpressionpartitionKeyExtractorClass 优先),则对该应用程序进行分区。提取分区键后,将通过调用 partitionSelectorClass(如果存在)或 partitionSelectorExpression % partitionCount 来确定分区应用程序实例。partitionCount 是应用程序计数(在 RabbitMQ 的情况下)或主题的基础分区计数(在 Kafka 的情况下)。

如果 partitionSelectorClasspartitionSelectorExpression 均不存在,则结果为 key.hashCode() % partitionCount

部署具有分区下游应用程序的流

您可以使用安装指南来设置 Spring Cloud Data Flow 和 Spring Cloud Skipper 服务器。

在本例中,我们使用开箱即用的 httpsplitterlog 应用程序。

创建流

本节介绍如何创建和部署分区流。

考虑以下流

  • 一个 http 源应用程序在端口 9001 上监听传入的句子。
  • 一个 splitter 处理器应用程序将句子拆分为单词,并根据其哈希值对单词进行分区(使用 payload 作为 partitionKeyExpression)。
  • 一个 log 接收器应用程序被扩展为运行三个应用程序实例,并且每个实例都应该从上游接收唯一的哈希值。

要创建此流

  1. 在 Spring Cloud Data Flow Dashboard UI 中,从左侧导航栏中选择 Streams。这样做将显示主 Streams 视图,如下图所示

    Create stream

  2. 选择 CREATE STREAM(S) 以显示用于创建流定义的图形编辑器,如下图所示

    Create partitioned stream definition

    您可以在左侧面板中看到 SourceProcessorSink 应用程序(先前已注册)。

  3. 将每个应用程序拖放到画布上。
  4. 使用手柄将它们连接在一起。

    请注意顶部文本面板中对应的 Data Flow DSL 定义。您也可以输入 Stream DSL 文本,如下所示

    words=http --server.port=9001 | splitter --expression=payload.split(' ') | log
  5. 点击 CREATE STREAM(S)

部署流

点击流行左侧的省略号图标以部署流。这样做会将您带到“部署流”页面,您可以在其中输入其他部署属性。

对于此流,我们需要指定以下内容

  • 上游应用程序的分区标准
  • 下游应用程序计数

在我们的例子中,我们需要设置以下属性

app.splitter.producer.partitionKeyExpression=payload
deployer.log.count=3

在 Dashboard 的流部署页面中,您可以输入

  • producer.partitionKeyExpression:为 splitter 应用程序将其设置为 payload
  • count:为 log 应用程序将其设置为 3

然后点击 **DEPLOY STREAM**,如下图所示

Deploy stream

您可以从 Runtime 页面检查流的状态。

当所有应用程序都在运行时,流就成功部署了,如下图所示

Stream deployed

所有应用程序运行后,我们就可以开始向 http 源发送数据了。

您可以使用以下 curl 命令发送一些数据

curl -X POST https://#:9001 -H "Content-Type: text/plain" -d "How much wood would a woodchuck chuck if a woodchuck could chuck wood"

要访问 log 应用程序实例的日志文件,请点击 Runtime 并点击 log 应用程序名称(words.log-v1)以查看每个 log 应用程序实例的 stdout 日志文件位置。

您可以跟踪每个 log 应用程序实例的 stdout 文件。

从日志中,您可以看到来自 splitter 应用程序的输出数据已被分区并由 log 应用程序实例接收。

以下清单显示了 log 实例 1 的日志输出

2019-05-10 20:59:58.574  INFO 13673 --- [itter.words-0-1] log-sink                                 : much
2019-05-10 20:59:58.587  INFO 13673 --- [itter.words-0-1] log-sink                                 : wood
2019-05-10 20:59:58.600  INFO 13673 --- [itter.words-0-1] log-sink                                 : would
2019-05-10 20:59:58.604  INFO 13673 --- [itter.words-0-1] log-sink                                 : if
2019-05-10 20:59:58.609  INFO 13673 --- [itter.words-0-1] log-sink                                 : wood

以下清单显示了 log 实例 2 的日志输出

2019-05-10 20:59:58.579  INFO 13674 --- [itter.words-1-1] log-sink                                 : a
2019-05-10 20:59:58.589  INFO 13674 --- [itter.words-1-1] log-sink                                 : chuck
2019-05-10 20:59:58.595  INFO 13674 --- [itter.words-1-1] log-sink                                 : a
2019-05-10 20:59:58.598  INFO 13674 --- [itter.words-1-1] log-sink                                 : could
2019-05-10 20:59:58.602  INFO 13674 --- [itter.words-1-1] log-sink                                 : chuck

以下清单显示了 log 实例 3 的日志输出

2019-05-10 20:59:58.573  INFO 13675 --- [itter.words-2-1] log-sink                                 : How
2019-05-10 20:59:58.582  INFO 13675 --- [itter.words-2-1] log-sink                                 : woodchuck
2019-05-10 20:59:58.586  INFO 13675 --- [itter.words-2-1] log-sink                                 : woodchuck