RabbitMQ 作为源和接收器

从 RabbitMQ 读取数据并写入数据是一个非常常见的用例。特别是,当源和接收器应用程序使用 Spring Cloud Stream 的 RabbitMQ 绑定器实现时,配置可能会令人困惑。本案例的目标是逐步解开复杂性。

在我们开始之前,我们先描述一下用例需求。

作为用户,我希望

  • 从运行在外部 RabbitMQ 集群中的队列中消费 String 类型的负载。
  • 对于每个负载,我希望通过将接收到的 String 转换为大写来对其进行转换。
  • 最后,我希望将转换后的负载发布到另一个队列,该队列也运行在外部 RabbitMQ 集群中。

为了使它更有趣,我们还在源、处理器和接收器应用程序中使用了 Spring Cloud Stream 的 RabbitMQ 绑定器实现。

配置

此用例需要两个级别的 RabbitMQ 配置。

  • 配置 RabbitMQ 源和接收器应用程序以连接到外部 RabbitMQ 集群。
  • 在源、处理器和接收器应用程序中配置 RabbitMQ 绑定器属性。我们使用本地运行的 RabbitMQ(地址为 127.0.0.1,也称为 localhost)作为绑定器。

先决条件

  1. 下载 rabbit-sourcetransform-processorrabbit-sink 应用程序。

    wget https://repo.spring.io/release/org/springframework/cloud/stream/app/rabbit-source-rabbit/2.1.0.RELEASE/rabbit-source-rabbit-2.1.0.RELEASE.jar
    wget https://repo.spring.io/release/org/springframework/cloud/stream/app/transform-processor-rabbit/2.1.0.RELEASE/transform-processor-rabbit-2.1.0.RELEASE.jar
    wget https://repo.spring.io/release/org/springframework/cloud/stream/app/rabbit-sink-rabbit/2.1.0.RELEASE/rabbit-sink-rabbit-2.1.0.RELEASE.jar
  2. 127.0.0.1 本地启动 RabbitMQ。
  3. 设置外部 RabbitMQ 集群并准备集群连接凭据。

部署

完成上一步的所有先决条件后,我们现在可以启动这三个应用程序。

要启动源应用程序,请运行以下命令

java -jar rabbit-source-rabbit-2.1.0.RELEASE.jar --server.port=9001 --rabbit.queues=sabbyfooz --spring.rabbitmq.addresses=amqp://<USER>:<PASSWORD>@<HOST>:<PORT> --spring.rabbitmq.username=<USER> --spring.rabbitmq.password=<PASSWORD> --spring.cloud.stream.binders.rabbitBinder.type=rabbit --spring.cloud.stream.binders.rabbitBinder.environment.spring.rabbitmq.addresses=amqp://guest:[email protected]:5672 --spring.cloud.stream.bindings.output.destination=rabzysrc

外部 RabbitMQ 集群凭据通过 --spring.rabbitmq.* 属性提供。绑定器配置通过 --spring.cloud.stream.binders.rabbitBinder.environment.spring.rabbitmq.* 属性提供。前缀 spring.cloud.stream.binders 指的是绑定器配置属性,而名称 rabbitBinder 是为此绑定器配置选择的配置名称。您必须将 <USER><PASSWORD><HOST><PORT> 替换为外部集群凭据。这就是将两个不同的 RabbitMQ 凭据传递到同一个应用程序的方式;一个用于实际数据,另一个用于绑定器配置。

  • sabbyfooz 是我们将从中轮询新数据的队列。
  • rabzysrc 是轮询数据将发布到的目标。

处理器

要启动处理器应用程序,请运行以下命令

java -jar transform-processor-rabbit-2.1.0.RELEASE.jar --server.port=9002 --spring.cloud.stream.binders.rabbitBinder.type=rabbit --spring.cloud.stream.binders.rabbitBinder.environment.spring.rabbitmq.addresses=amqp://guest:[email protected]:5672 --spring.cloud.stream.bindings.input.destination=rabzysrc --spring.cloud.stream.bindings.output.destination=rabzysink --transformer.expression='''payload.toUpperCase()'''
  • rabzysrc 是我们将从源应用程序接收新数据的目标。
  • rabzysink 是转换后的数据将发布到的目标。

接收器

要启动接收器应用程序,请运行以下命令

java -jar rabbit-sink-rabbit-2.1.0.RELEASE.jar --server.port=9003 --rabbit.exchange=sabbyexchange --rabbit.routing-key=foo --spring.rabbitmq.addresses=amqp://<USER>:<PASSWORD>@<HOST>:<PORT> --spring.rabbitmq.username=<USER> --spring.rabbitmq.password=<PASSWORD> --spring.cloud.stream.binders.rabbitBinder.type=rabbit --spring.cloud.stream.binders.rabbitBinder.environment.spring.rabbitmq.addresses=amqp://guest:[email protected]:5672 --spring.cloud.stream.bindings.input.destination=rabzysink

外部 RabbitMQ 集群凭据通过 --spring.rabbitmq.* 属性提供。绑定器配置通过 --spring.cloud.stream.binders.rabbitBinder.environment.spring.rabbitmq.* 属性提供。前缀 spring.cloud.stream.binders 指的是绑定器配置属性,而名称 rabbitBinder 是为此绑定器配置选择的配置名称。您必须将 <USER><PASSWORD><HOST><PORT> 替换为外部集群凭据。这就是将两个不同的 RabbitMQ 凭据传递到同一个应用程序的方式;一个用于实际数据,另一个用于绑定器配置。

  • rabzysink 是将接收转换后数据的目标。
  • 带有 foo 路由键的 sabbyexchange 是数据最终到达的位置。

测试

本节介绍如何测试您的流。

发布测试数据

要发布测试数据以便您进行验证

  1. 转到外部 RabbitMQ 集群的管理控制台。
  2. 从队列列表中导航到 sabbyfooz 队列。
  3. 单击 发布消息 以发布测试消息(hello, rabbit!)。

Publish Test Message

验证结果

要验证您发布的数据

  1. 转到外部 RabbitMQ 集群的管理控制台。
  2. 在此示例中,具有 foo 路由键的 sabbyexchange 绑定到 sabbybaaz 队列。然后,您可以从队列列表中导航到该队列。 Exchange and Queue Binding
  3. 单击 获取消息 以接收传入消息。
  4. 确认有效负载已从小写转换为大写(即 HELLO, RABBIT!)。

Publish Test Message

您已完成!演示到此结束。