本地部署

如果您已配置并构建示例流应用程序并将其配置为使用其中一个受支持的消息代理运行,则可以在本地环境中将它们作为独立应用程序运行。

如果您尚未这样做,则可以在本地环境中下载并运行Kafka,或使用docker-composeKafkaZookeeper作为容器运行。

安装并运行 Kafka

下载并解压缩该存档。然后从 Kafka 安装目录运行以下命令以启动ZooKeeperKafka服务器

./bin/zookeeper-server-start.sh config/zookeeper.properties &
./bin/kafka-server-start.sh config/server.properties &

使用 Docker Compose 运行

或者,您可以下载适用于 Docker 的Confluent 平台试用版

curl --silent --output docker-compose.yml \
  https://raw.githubusercontent.com/confluentinc/cp-all-in-one/6.1.0-post/cp-all-in-one/docker-compose.yml

要启动平台

docker-compose up -d

除了 Kafka 代理之外,这还会安装其他 Confluent 组件,包括控制中心。在http://localhost:9021访问控制中心,然后选择主题以显示发布到主题的消息。

运行源

通过使用预定义的UsageDetailSender配置属性,您可以运行应用程序,如下所示

java -jar target/usage-detail-sender-kafka-0.0.1-SNAPSHOT.jar &

如果您安装了 Kafka 二进制文件,则可以使用 Kafka 控制台消费者查看发送到usage-detail Kafka 主题的消息,如下所示

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic usage-detail

如果您安装了 Confluent 平台,请按照上述说明访问控制中心。选择usage-detail主题和消息选项卡。您应该会看到如下内容

Standalone Usage Detail Sender Kafka

要列出主题,请运行以下命令

./bin/kafka-topics.sh --zookeeper localhost:2181 --list

运行处理器

通过使用 UsageCostProcessor预定义配置属性,您可以运行应用程序,如下所示

java -jar target/usage-cost-processor-kafka-0.0.1-SNAPSHOT.jar  &

借助 UsageDetailSender 源应用程序中 usage-detail Kafka 主题中的 UsageDetail 数据,您可以看到 usage-cost Kafka 主题中的 UsageCostDetail,如下所示

 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic usage-cost

或者,使用控制中心

Standalone Usage Cost Processer Kafka

运行接收器

通过使用 UsageCostLogger预定义配置属性,您可以运行应用程序,如下所示

java -jar target/usage-cost-logger-kafka-0.0.1-SNAPSHOT.jar &

现在您可以看到此应用程序记录了使用成本详细信息。

...
2021-03-05 10:44:05.193  INFO 55690 --- [container-0-C-1] i.s.d.s.u.UsageCostLogger     : {"userId": "user2", "callCost": "0.7000000000000001", "dataCost": "21.8" }
2021-03-05 10:44:05.193  INFO 55690 --- [container-0-C-1] i.s.d.s.u.UsageCostLogger     : {"userId": "user2", "callCost": "0.7000000000000001", "dataCost": "21.8" }
2021-03-05 10:44:06.195  INFO 55690 --- [container-0-C-1] i.s.d.s.u.UsageCostLogger     : {"userId": "user4", "callCost": "0.5", "dataCost": "24.55" }
2021-03-05 10:44:06.199  INFO 55690 --- [container-0-C-1] i.s.d.s.u.UsageCostLogger     : {"userId": "user4", "callCost": "0.5", "dataCost": "24.55" }
...

清理

如果您使用 docker-compose 启动了 Confluent 平台,要清理

docker-compose stop

使用 RabbitMQ 进行本地部署

如果您已配置并构建示例流应用程序以使用 RabbitMQ 运行,则可以在 本地环境中将这些应用程序作为独立应用程序运行。

安装并运行 RabbitMQ

要安装和运行 RabbitMQ docker 镜像,请运行以下命令

docker run -d --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3.7.14-management

安装完成后,您可以在本地机器上通过以下地址登录到 RabbitMQ 管理控制台:http://localhost:15672。 您可以使用默认帐户用户名和密码:guestguest

运行 UsageDetailSender 源代码

通过使用预定义的UsageDetailSender配置属性,您可以运行应用程序,如下所示

java -jar target/usage-detail-sender-rabbit-0.0.1-SNAPSHOT.jar &

当此应用程序运行时,您可以看到 usage-detail RabbitMQ 交换器已创建,并且名为 usage-detail.usage-cost-consumer 的队列已绑定到此交换器,如下所示

Standalone Usage Detail Sender RabbitMQ

此外,如果您单击 队列 并检查队列 usage-detail.usage-cost-consumer,您可以看到正在被消费并存储在此持久队列中的消息,如下所示

Standalone Usage Detail Sender RabbitMQ Message Guarantee

为这个 源代码 应用程序配置消费者应用程序时,您可以设置 绑定属性以连接到相应的持久队列。

如果您没有设置 requiredGroups 属性,您会发现没有用于消费来自 usage-detail 交换器的消息的 队列,因此,如果在启动此应用程序之前消费者未启动,则消息将会丢失。

运行处理器

通过使用 UsageCostProcessor预定义配置属性,您可以运行应用程序,如下所示

java -jar target/usage-cost-processor-rabbit-0.0.1-SNAPSHOT.jar &

在 RabbitMQ 控制台中,您可以看到

  • UsageCostProcessor 应用程序根据 spring.cloud.stream.bindings.input.group=usage-cost-consumer 属性从 usage-detail.usage-cost-consumer 持久队列中消费消息。
  • UsageCostProcessor 应用程序生成 UsageCostDetail 并将其发送到交换器 usage-cost,这基于 spring.cloud.stream.bindings.output.destination=usage-cost 属性。
  • 创建了 usage-cost.logger 持久队列。 它根据 spring.cloud.stream.bindings.output.producer.requiredGroups=logger 属性消费来自 usage-cost 交换器的消息。

当此应用程序运行时,您可以看到 usage-cost RabbitMQ 交换器已创建,并且名为 usage-cost.logger 的持久队列已绑定到此交换器,如下所示

Standalone Usage Cost Processor RabbitMQ Required Groups

此外,如果您单击 队列 并检查 usage-cost.logger 队列,您可以看到正在被消费并存储在此持久队列中的消息,如下所示

Standalone Usage Cost Processor RabbitMQ Message Guarantee

运行接收器

通过使用 UsageCostLogger预定义配置属性,您可以运行应用程序,如下所示

java -jar target/usage-cost-logger-rabbit-0.0.1-SNAPSHOT.jar &

现在您可以看到,此应用程序通过 usage-cost.logger 持久队列记录它从 usage-cost RabbitMQ 交换接收到的使用成本明细,如下所示:

2019-05-08 08:16:46.442  INFO 10769 --- [o6VmGALOP_onw-1] i.s.d.s.u.UsageCostLogger     : {"userId": "user2", "callCost": "28.3", "dataCost": "29.8" }
2019-05-08 08:16:47.446  INFO 10769 --- [o6VmGALOP_onw-1] i.s.d.s.u.UsageCostLogger     : {"userId": "user2", "callCost": "12.0", "dataCost": "23.75" }
2019-05-08 08:16:48.451  INFO 10769 --- [o6VmGALOP_onw-1] i.s.d.s.u.UsageCostLogger     : {"userId": "user4", "callCost": "16.0", "dataCost": "30.05" }
2019-05-08 08:16:49.454  INFO 10769 --- [o6VmGALOP_onw-1] i.s.d.s.u.UsageCostLogger     : {"userId": "user1", "callCost": "17.7", "dataCost": "18.0" }