本地部署
如果您已配置并构建示例流应用程序并将其配置为使用其中一个受支持的消息代理运行,则可以在本地
环境中将它们作为独立应用程序运行。
如果您尚未这样做,则可以在本地环境中下载并运行Kafka
,或使用docker-compose
将Kafka
和Zookeeper
作为容器运行。
安装并运行 Kafka
下载并解压缩该存档。然后从 Kafka 安装目录运行以下命令以启动ZooKeeper
和Kafka
服务器
./bin/zookeeper-server-start.sh config/zookeeper.properties &
./bin/kafka-server-start.sh config/server.properties &
使用 Docker Compose 运行
或者,您可以下载适用于 Docker 的Confluent 平台试用版
curl --silent --output docker-compose.yml \
https://raw.githubusercontent.com/confluentinc/cp-all-in-one/6.1.0-post/cp-all-in-one/docker-compose.yml
要启动平台
docker-compose up -d
除了 Kafka 代理之外,这还会安装其他 Confluent 组件,包括控制中心。在http://localhost:9021访问控制中心,然后选择主题
以显示发布到主题的消息。
运行源
通过使用预定义的UsageDetailSender
配置属性,您可以运行应用程序,如下所示
java -jar target/usage-detail-sender-kafka-0.0.1-SNAPSHOT.jar &
如果您安装了 Kafka 二进制文件,则可以使用 Kafka 控制台消费者查看发送到usage-detail
Kafka 主题的消息,如下所示
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic usage-detail
如果您安装了 Confluent 平台,请按照上述说明访问控制中心。选择usage-detail
主题和消息
选项卡。您应该会看到如下内容
要列出主题,请运行以下命令
./bin/kafka-topics.sh --zookeeper localhost:2181 --list
运行处理器
通过使用 UsageCostProcessor
的预定义配置属性,您可以运行应用程序,如下所示
java -jar target/usage-cost-processor-kafka-0.0.1-SNAPSHOT.jar &
借助 UsageDetailSender
源应用程序中 usage-detail
Kafka 主题中的 UsageDetail
数据,您可以看到 usage-cost
Kafka 主题中的 UsageCostDetail
,如下所示
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic usage-cost
或者,使用控制中心
运行接收器
通过使用 UsageCostLogger
的预定义配置属性,您可以运行应用程序,如下所示
java -jar target/usage-cost-logger-kafka-0.0.1-SNAPSHOT.jar &
现在您可以看到此应用程序记录了使用成本详细信息。
...
2021-03-05 10:44:05.193 INFO 55690 --- [container-0-C-1] i.s.d.s.u.UsageCostLogger : {"userId": "user2", "callCost": "0.7000000000000001", "dataCost": "21.8" }
2021-03-05 10:44:05.193 INFO 55690 --- [container-0-C-1] i.s.d.s.u.UsageCostLogger : {"userId": "user2", "callCost": "0.7000000000000001", "dataCost": "21.8" }
2021-03-05 10:44:06.195 INFO 55690 --- [container-0-C-1] i.s.d.s.u.UsageCostLogger : {"userId": "user4", "callCost": "0.5", "dataCost": "24.55" }
2021-03-05 10:44:06.199 INFO 55690 --- [container-0-C-1] i.s.d.s.u.UsageCostLogger : {"userId": "user4", "callCost": "0.5", "dataCost": "24.55" }
...
清理
如果您使用 docker-compose
启动了 Confluent 平台,要清理
docker-compose stop
使用 RabbitMQ 进行本地部署
如果您已配置并构建示例流应用程序以使用 RabbitMQ 运行,则可以在 本地
环境中将这些应用程序作为独立应用程序运行。
安装并运行 RabbitMQ
要安装和运行 RabbitMQ
docker 镜像,请运行以下命令
docker run -d --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3.7.14-management
安装完成后,您可以在本地机器上通过以下地址登录到 RabbitMQ 管理控制台:http://localhost:15672。 您可以使用默认帐户用户名和密码:guest
和 guest
。
运行 UsageDetailSender
源代码
通过使用预定义的UsageDetailSender
配置属性,您可以运行应用程序,如下所示
java -jar target/usage-detail-sender-rabbit-0.0.1-SNAPSHOT.jar &
当此应用程序运行时,您可以看到 usage-detail
RabbitMQ 交换器已创建,并且名为 usage-detail.usage-cost-consumer
的队列已绑定到此交换器,如下所示
此外,如果您单击 队列
并检查队列 usage-detail.usage-cost-consumer
,您可以看到正在被消费并存储在此持久队列中的消息,如下所示
为这个 源代码
应用程序配置消费者应用程序时,您可以设置 组
绑定属性以连接到相应的持久队列。
如果您没有设置 requiredGroups
属性,您会发现没有用于消费来自 usage-detail
交换器的消息的 队列
,因此,如果在启动此应用程序之前消费者未启动,则消息将会丢失。
运行处理器
通过使用 UsageCostProcessor
的预定义配置属性,您可以运行应用程序,如下所示
java -jar target/usage-cost-processor-rabbit-0.0.1-SNAPSHOT.jar &
在 RabbitMQ 控制台中,您可以看到
UsageCostProcessor
应用程序根据spring.cloud.stream.bindings.input.group=usage-cost-consumer
属性从usage-detail.usage-cost-consumer
持久队列中消费消息。UsageCostProcessor
应用程序生成UsageCostDetail
并将其发送到交换器usage-cost
,这基于spring.cloud.stream.bindings.output.destination=usage-cost
属性。- 创建了
usage-cost.logger
持久队列。 它根据spring.cloud.stream.bindings.output.producer.requiredGroups=logger
属性消费来自usage-cost
交换器的消息。
当此应用程序运行时,您可以看到 usage-cost
RabbitMQ 交换器已创建,并且名为 usage-cost.logger
的持久队列已绑定到此交换器,如下所示
此外,如果您单击 队列
并检查 usage-cost.logger
队列,您可以看到正在被消费并存储在此持久队列中的消息,如下所示
运行接收器
通过使用 UsageCostLogger
的预定义配置属性,您可以运行应用程序,如下所示
java -jar target/usage-cost-logger-rabbit-0.0.1-SNAPSHOT.jar &
现在您可以看到,此应用程序通过 usage-cost.logger
持久队列记录它从 usage-cost
RabbitMQ 交换接收到的使用成本明细,如下所示:
2019-05-08 08:16:46.442 INFO 10769 --- [o6VmGALOP_onw-1] i.s.d.s.u.UsageCostLogger : {"userId": "user2", "callCost": "28.3", "dataCost": "29.8" }
2019-05-08 08:16:47.446 INFO 10769 --- [o6VmGALOP_onw-1] i.s.d.s.u.UsageCostLogger : {"userId": "user2", "callCost": "12.0", "dataCost": "23.75" }
2019-05-08 08:16:48.451 INFO 10769 --- [o6VmGALOP_onw-1] i.s.d.s.u.UsageCostLogger : {"userId": "user4", "callCost": "16.0", "dataCost": "30.05" }
2019-05-08 08:16:49.454 INFO 10769 --- [o6VmGALOP_onw-1] i.s.d.s.u.UsageCostLogger : {"userId": "user1", "callCost": "17.7", "dataCost": "18.0" }