RabbitMQ 作为源和接收器
从 RabbitMQ 读取数据并写入数据是一个非常常见的用例。特别是,当源和接收器应用程序使用 Spring Cloud Stream 的 RabbitMQ 绑定器实现时,配置可能会令人困惑。本案例的目标是逐步解开复杂性。
在我们开始之前,我们先描述一下用例需求。
作为用户,我希望
- 从运行在外部 RabbitMQ 集群中的队列中消费
String
类型的负载。 - 对于每个负载,我希望通过将接收到的
String
转换为大写来对其进行转换。 - 最后,我希望将转换后的负载发布到另一个队列,该队列也运行在外部 RabbitMQ 集群中。
为了使它更有趣,我们还在源、处理器和接收器应用程序中使用了 Spring Cloud Stream 的 RabbitMQ 绑定器实现。
配置
此用例需要两个级别的 RabbitMQ 配置。
- 配置 RabbitMQ 源和接收器应用程序以连接到外部 RabbitMQ 集群。
- 在源、处理器和接收器应用程序中配置 RabbitMQ 绑定器属性。我们使用本地运行的 RabbitMQ(地址为
127.0.0.1
,也称为localhost
)作为绑定器。
先决条件
-
下载
rabbit-source
、transform-processor
和rabbit-sink
应用程序。wget https://repo.spring.io/release/org/springframework/cloud/stream/app/rabbit-source-rabbit/2.1.0.RELEASE/rabbit-source-rabbit-2.1.0.RELEASE.jar
wget https://repo.spring.io/release/org/springframework/cloud/stream/app/transform-processor-rabbit/2.1.0.RELEASE/transform-processor-rabbit-2.1.0.RELEASE.jar
wget https://repo.spring.io/release/org/springframework/cloud/stream/app/rabbit-sink-rabbit/2.1.0.RELEASE/rabbit-sink-rabbit-2.1.0.RELEASE.jar
- 在
127.0.0.1
本地启动 RabbitMQ。 - 设置外部 RabbitMQ 集群并准备集群连接凭据。
部署
完成上一步的所有先决条件后,我们现在可以启动这三个应用程序。
源
要启动源应用程序,请运行以下命令
java -jar rabbit-source-rabbit-2.1.0.RELEASE.jar --server.port=9001 --rabbit.queues=sabbyfooz --spring.rabbitmq.addresses=amqp://<USER>:<PASSWORD>@<HOST>:<PORT> --spring.rabbitmq.username=<USER> --spring.rabbitmq.password=<PASSWORD> --spring.cloud.stream.binders.rabbitBinder.type=rabbit --spring.cloud.stream.binders.rabbitBinder.environment.spring.rabbitmq.addresses=amqp://guest:[email protected]:5672 --spring.cloud.stream.bindings.output.destination=rabzysrc
外部 RabbitMQ 集群凭据通过 --spring.rabbitmq.*
属性提供。绑定器配置通过 --spring.cloud.stream.binders.rabbitBinder.environment.spring.rabbitmq.*
属性提供。前缀 spring.cloud.stream.binders
指的是绑定器配置属性,而名称 rabbitBinder
是为此绑定器配置选择的配置名称。您必须将 <USER>
、<PASSWORD>
、<HOST>
和 <PORT>
替换为外部集群凭据。这就是将两个不同的 RabbitMQ 凭据传递到同一个应用程序的方式;一个用于实际数据,另一个用于绑定器配置。
sabbyfooz
是我们将从中轮询新数据的队列。rabzysrc
是轮询数据将发布到的目标。
处理器
要启动处理器应用程序,请运行以下命令
java -jar transform-processor-rabbit-2.1.0.RELEASE.jar --server.port=9002 --spring.cloud.stream.binders.rabbitBinder.type=rabbit --spring.cloud.stream.binders.rabbitBinder.environment.spring.rabbitmq.addresses=amqp://guest:[email protected]:5672 --spring.cloud.stream.bindings.input.destination=rabzysrc --spring.cloud.stream.bindings.output.destination=rabzysink --transformer.expression='''payload.toUpperCase()'''
rabzysrc
是我们将从源应用程序接收新数据的目标。rabzysink
是转换后的数据将发布到的目标。
接收器
要启动接收器应用程序,请运行以下命令
java -jar rabbit-sink-rabbit-2.1.0.RELEASE.jar --server.port=9003 --rabbit.exchange=sabbyexchange --rabbit.routing-key=foo --spring.rabbitmq.addresses=amqp://<USER>:<PASSWORD>@<HOST>:<PORT> --spring.rabbitmq.username=<USER> --spring.rabbitmq.password=<PASSWORD> --spring.cloud.stream.binders.rabbitBinder.type=rabbit --spring.cloud.stream.binders.rabbitBinder.environment.spring.rabbitmq.addresses=amqp://guest:[email protected]:5672 --spring.cloud.stream.bindings.input.destination=rabzysink
外部 RabbitMQ 集群凭据通过 --spring.rabbitmq.*
属性提供。绑定器配置通过 --spring.cloud.stream.binders.rabbitBinder.environment.spring.rabbitmq.*
属性提供。前缀 spring.cloud.stream.binders
指的是绑定器配置属性,而名称 rabbitBinder
是为此绑定器配置选择的配置名称。您必须将 <USER>
、<PASSWORD>
、<HOST>
和 <PORT>
替换为外部集群凭据。这就是将两个不同的 RabbitMQ 凭据传递到同一个应用程序的方式;一个用于实际数据,另一个用于绑定器配置。
rabzysink
是将接收转换后数据的目标。- 带有
foo
路由键的sabbyexchange
是数据最终到达的位置。
测试
本节介绍如何测试您的流。
发布测试数据
要发布测试数据以便您进行验证
- 转到外部 RabbitMQ 集群的管理控制台。
- 从队列列表中导航到
sabbyfooz
队列。 - 单击
发布消息
以发布测试消息(hello, rabbit!
)。
验证结果
要验证您发布的数据
- 转到外部 RabbitMQ 集群的管理控制台。
- 在此示例中,具有
foo
路由键的sabbyexchange
绑定到sabbybaaz
队列。然后,您可以从队列列表中导航到该队列。 - 单击
获取消息
以接收传入消息。 - 确认有效负载已从小写转换为大写(即
HELLO, RABBIT!
)。
您已完成!演示到此结束。