在本指南中,我们将开发三个使用 Spring Cloud Stream 的 Spring Boot 应用程序,并将它们部署到 Cloud Foundry、Kubernetes 和本地机器。在另一个指南中,我们将使用 Data Flow 部署这些应用程序。通过手动部署应用程序,您可以更好地了解 Data Flow 为您自动执行的步骤。

以下部分描述了如何从头开始构建这些应用程序。

如果您愿意,可以下载包含这些应用程序源代码的 zip 文件,解压缩,构建它,然后继续部署部分。

您可以从浏览器下载包含所有三个应用程序的项目。您也可以使用命令行,如下例所示

wget 'https://github.com/spring-cloud/spring-cloud-dataflow-samples/blob/main/dataflow-website/stream-developer-guides/streams/standalone-stream-sample/dist/usage-cost-stream-sample.zip?raw=true' -O usage-cost-stream-sample.zip

构建下载的示例

可以使用相同的代码库将流应用程序配置为使用 Kafka 代理或 RabbitMQ 运行。唯一的区别在于可执行 jar 文件。为了使它们与 Kafka 代理一起工作,它们需要 Kafka binder 依赖项(默认情况下启用)。对于 RabbitMQ,它们需要 Rabbit binder。

要为 Kafka 构建示例流应用程序,请从根项目目录

$./mvnw clean package -Pkafka

要为 RabbitMQ 构建示例流应用程序,请从根项目目录

$./mvnw clean package -Prabbit

开发

我们创建了三个使用配置的绑定器进行通信的 Spring Cloud Stream 应用程序。

该场景是一家手机公司为其客户创建账单。用户拨打的每个电话都有一个 duration 和通话期间使用的 data 量。作为生成账单过程的一部分,需要将原始通话数据转换为通话时长费用和使用数据量费用。

使用包含通话 duration 和通话期间使用的数据量 dataUsageDetail 类对通话进行建模。使用包含通话费用(costCall)和数据费用(costData)的 UsageCostDetail 类对账单进行建模。每个类都包含一个 ID(userId)来标识打电话的人。

三个流应用程序如下

  • Source 应用程序(名为 UsageDetailSender)生成用户的通话 duration 和每个 userId 使用的数据量 data,并发送一条包含 UsageDetail 对象作为 JSON 的消息。
  • Processor 应用程序(名为 UsageCostProcessor)使用 UsageDetail 并计算每个 userId 的通话费用和数据费用。它发送 UsageCostDetail 对象作为 JSON。
  • Sink 应用程序(名为 UsageCostLogger)使用 UsageCostDetail 对象并记录通话和数据的费用。

来源

在此步骤中,我们创建 UsageDetailSender 源。

您可以 直接下载 Spring Initializr 生成的项目

或访问 Spring Initializr 网站 并按照以下说明操作

  1. 创建一个新的 Maven 项目,其组名为 io.spring.dataflow.sample,工件名为 usage-detail-sender-kafka,包为 io.spring.dataflow.sample.usagedetailsender
  2. 在“依赖项”文本框中,键入 Kafka 以选择 Kafka binder 依赖项。
  3. 在“依赖项”文本框中,键入 Cloud Stream 以选择 Spring Cloud Stream 依赖项。
  4. 在“依赖项”文本框中,键入 Actuator 以选择 Spring Boot actuator 依赖项。
  5. 点击“生成项目”按钮。

现在,您应该 解压缩 usage-detail-sender-kafka.zip 文件并将项目导入您喜欢的 IDE。

当使用 Kafka 作为消息代理时,您可以选择许多配置选项来扩展或覆盖以实现所需的运行时行为。 Kafka binder 文档 列出了 Kafka binder 配置属性。

您可以 直接下载 Spring Initializr 生成的项目

或访问 Spring Initializr 网站 并按照以下说明操作

  1. 创建一个新的 Maven 项目,其组名为 io.spring.dataflow.sample,项目名称为 usage-detail-sender-rabbit,包为 io.spring.dataflow.sample.usagedetailsender
  2. 在“依赖项”文本框中,键入 RabbitMQ 以选择 RabbitMQ binder 依赖项。
  3. 在“依赖项”文本框中,键入 Cloud Stream 以选择 Spring Cloud Stream 依赖项。
  4. 在“依赖项”文本框中,键入 Actuator 以选择 Spring Boot actuator 依赖项。
  5. 点击“生成项目”按钮。

现在,您应该 解压缩 usage-detail-sender-rabbit.zip 文件并将项目导入您喜欢的 IDE。

持久化队列

默认情况下,Spring Cloud Stream 消费者应用程序会创建一个 匿名 自动删除队列。如果生产者应用程序在消费者应用程序之前启动,这可能会导致消息未被存储和转发。即使交换机是持久化的,我们也需要一个 持久化 队列绑定到交换机,以便存储消息以供以后消费。因此,为了保证消息传递,您需要一个 持久化 队列。

要预先创建持久化队列并将其绑定到交换机,生产者应用程序应设置以下属性

spring.cloud.stream.bindings.<channelName>.producer.requiredGroups

requiredGroups 属性接受生产者必须确保消息传递到的组的逗号分隔列表。设置此属性后,将使用 <exchange>.<requiredGroup> 格式创建一个持久化队列。

在使用 RabbitMQ 作为消息代理时,您可以选择许多配置选项来扩展或覆盖以实现所需的运行时行为。RabbitMQ Binder 文档列出了 RabbitMQ binder 配置属性。

业务逻辑

现在我们可以创建此应用程序所需的代码。为此

  1. io.spring.dataflow.sample.usagedetailsender 包中创建一个 UsageDetail 类,其内容类似于 UsageDetail.java 中的内容。UsageDetail 类包含 userIddataduration 属性。
  2. io.spring.dataflow.sample.usagedetailsender 包中创建 UsageDetailSender 类。它应该类似于以下列表
package io.spring.dataflow.sample.usagedetailsender;

import java.util.Random;
import java.util.function.Supplier;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class UsageDetailSender {

	private String[] users = {"user1", "user2", "user3", "user4", "user5"};

	@Bean
	public Supplier<UsageDetail> sendEvents() {
		return () -> {
			UsageDetail usageDetail = new UsageDetail();
			usageDetail.setUserId(this.users[new Random().nextInt(5)]);
			usageDetail.setDuration(new Random().nextInt(300));
			usageDetail.setData(new Random().nextInt(700));
			return usageDetail;
		};
	}
}

sendEvents Supplier 提供了一个填充了随机值的 UsageDetail 对象。Spring Cloud Stream 自动绑定此函数以将其数据发送到配置的输出目标。Spring Cloud Stream 还为任何 Supplier 配置了一个默认轮询器,该轮询器默认情况下将每秒调用一次该函数。

配置

配置 source 应用程序时,我们需要设置 output 绑定目标(RabbitMQ 交换器或 Kafka 主题的名称),生产者将在其中发布数据。

为了方便起见,我们将函数输出绑定名称 sendEvents-out-0(表示与 sendEvents 函数的第一个输出参数对应的输出)别名为逻辑名称 output。或者,我们可以直接绑定输出绑定名称:spring.cloud.stream.bindings.sendEvents-out-0.destination=usage-detail。有关更详细的说明,请参阅函数绑定名称

src/main/resources/application.properties 中,添加以下属性

spring.cloud.stream.function.bindings.sendEvents-out-0=output
spring.cloud.stream.bindings.output.destination=usage-detail
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0

构建

现在我们可以构建使用情况详细信息发送器应用程序了。

usage-detail-sender 根目录中,使用以下命令使用 maven 构建项目

./mvnw clean package

测试

Spring Cloud Stream 提供了一个测试 jar 来测试 Spring Cloud Stream 应用程序。

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream</artifactId>
	<type>test-jar</type>
	<classifier>test-binder</classifier>
	<scope>test</scope>
</dependency>

TestChannelBinderConfiguration 提供了一个内存绑定器实现,用于跟踪和测试应用程序的出站和入站消息,而不是消息代理绑定器实现。测试配置包括 InputDestinationOutputDestination bean,用于发送和接收消息。要对 UsageDetailSender 应用程序进行单元测试,请在 UsageDetailSenderApplicationTests 类中添加以下代码。

package io.spring.dataflow.sample.usagedetailsender;

import org.junit.jupiter.api.Test;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;

import static org.assertj.core.api.Assertions.assertThat;

public class UsageDetailSenderApplicationTests {

	@Test
	public void contextLoads() {
	}

	@Test
	public void testUsageDetailSender() {
		try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
				TestChannelBinderConfiguration
						.getCompleteConfiguration(UsageDetailSenderApplication.class))
				.web(WebApplicationType.NONE)
				.run()) {

			OutputDestination target = context.getBean(OutputDestination.class);
			Message<byte[]> sourceMessage = target.receive(10000, "usage-detail");

			MessageConverter converter = context.getBean(CompositeMessageConverter.class);
			UsageDetail usageDetail = (UsageDetail) converter
					.fromMessage(sourceMessage, UsageDetail.class);

			assertThat(usageDetail.getUserId()).isBetween("user1", "user5");
			assertThat(usageDetail.getData()).isBetween(0L, 700L);
			assertThat(usageDetail.getDuration()).isBetween(0L, 300L);
		}
	}
}
  • contextLoads 测试用例验证应用程序是否成功启动。
  • testUsageDetailSender 测试用例使用 OutputDestination 接收和验证 UsageDetailSender 发送的消息。

内存测试绑定器的行为与任何消息代理绑定器实现完全相同。值得注意的是,在 Spring Cloud Stream 应用程序中,消息有效负载始终是一个字节数组,默认情况下编码为 JSON。使用应用程序在其输入通道上接收字节,并根据内容类型自动委托给适当的 MessageConverter,以转换字节以匹配使用函数的参数类型,在本例中为 UsageDetail。对于测试,我们需要显式执行此步骤。或者,我们可以直接调用 JSON 解析器,而不是使用 MessageConverter

处理器

在此步骤中,我们将创建 UsageCostProcessor 处理器。

您可以直接下载 Spring Initializr 生成的项目

或访问 Spring Initializr 网站 并按照以下说明操作

  1. 创建一个新的 Maven 项目,其组名为 io.spring.dataflow.sample,工件名为 usage-cost-processor-kafka,包名为 io.spring.dataflow.sample.usagecostprocessor
  2. 在“依赖项”文本框中,键入 Kafka 以选择 Kafka binder 依赖项。
  3. 在“依赖项”文本框中,键入 Cloud Stream 以选择 Spring Cloud Stream 依赖项。
  4. 在“依赖项”文本框中,键入 Actuator 以选择 Spring Boot actuator 依赖项。
  5. 点击“生成项目”按钮。

现在,您应该解压缩 usage-cost-processor-kafka.zip 文件,并将项目导入您喜欢的 IDE。

您可以直接下载 Spring Initializr 生成的项目

或访问 Spring Initializr 网站 并按照以下说明操作

  1. 创建一个新的 Maven 项目,其组名为 io.spring.dataflow.sample,工件名为 usage-cost-processor-rabbit,包名为 io.spring.dataflow.sample.usagecostprocessor
  2. 在“依赖项”文本框中,键入 RabbitMQ 以选择 RabbitMQ binder 依赖项。
  3. 在“依赖项”文本框中,键入 Cloud Stream 以选择 Spring Cloud Stream 依赖项。
  4. 在“依赖项”文本框中,键入 Actuator 以选择 Spring Boot actuator 依赖项。
  5. 点击“生成项目”按钮。

现在,您应该解压缩 usage-cost-processor-rabbit.zip 文件,并将项目导入您喜欢的 IDE。

业务逻辑

现在我们可以创建此应用程序所需的代码。为此

  1. io.spring.dataflow.sample.usagecostprocessor 中创建 UsageDetail 类。其内容类似于 UsageDetail.java 的内容。UsageDetail 类包含 userIddataduration 属性。
  2. 在 `io.spring.dataflow.sample.usagecostprocessor` 包中创建 `UsageCostDetail` 类。其内容类似于 UsageCostDetail.java 的内容。`UsageCostDetail` 类包含 `userId`、`callCost` 和 `dataCost` 属性。
  3. 在 `io.spring.dataflow.sample.usagecostprocessor` 包中创建 `UsageCostProcessor` 类,它接收 `UsageDetail` 消息,计算通话和数据成本,并发送 `UsageCostDetail` 消息。以下清单显示了源代码
package io.spring.dataflow.sample.usagecostprocessor;

import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class UsageCostProcessor {

	private double ratePerSecond = 0.1;

	private double ratePerMB = 0.05;

	@Bean
	public Function<UsageDetail, UsageCostDetail> processUsageCost() {
		return usageDetail -> {
			UsageCostDetail usageCostDetail = new UsageCostDetail();
			usageCostDetail.setUserId(usageDetail.getUserId());
			usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond);
			usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB);
			return usageCostDetail;
		};
	}
}

在前面的应用程序中,我们声明了一个 `Function` bean,它接受一个 `UsageDetail` 并返回一个 `UsageCostDetail`。Spring Cloud Stream 将发现此函数并将其输入和输出绑定到为消息中间件配置的目标。如上一节所述,Spring Cloud Stream 使用适当的 `MessageConverter` 来执行必要的类型转换。

配置

配置 `processor` 应用程序时,我们需要设置以下属性

  • 此应用程序订阅的 `input` 绑定目标(Kafka 主题或 RabbitMQ 交换器)。
  • 生产者将发布数据的 `output` 绑定目标。

对于生产应用程序,最好设置 `spring.cloud.stream.bindings.input.group` 以指定此消费者应用程序所属的消费者组。这确保了每个都使用自己的组 ID 标识的其他消费者应用程序将接收每条消息。每个消费者组可以扩展到多个实例以分配工作负载。Spring Cloud Stream 抽象了 Kafka 原生的此功能,以便将其扩展到 RabbitMQ 和其他绑定器实现。

src/main/resources/application.properties 中,添加以下属性

spring.cloud.stream.function.bindings.processUsageCost-in-0=input
spring.cloud.stream.function.bindings.processUsageCost-out-0=output
spring.cloud.stream.bindings.input.destination=usage-detail
spring.cloud.stream.bindings.output.destination=usage-cost
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0

为了方便起见,我们将函数绑定名称 `processUsageCost-in-0` 和 `processUsageCost-out-0` 分别别名为 `input` 和 `output`。

  • `spring.cloud.stream.bindings.input.destination` 属性将 `UsageCostProcessor` 对象的 `input` 绑定到 `usage-detail` 目标。
  • `spring.cloud.stream.bindings.output.destination` 属性将 `UsageCostProcessor` 对象的输出绑定到 `usage-cost` 目标。

输入目标必须与 Source 应用程序的输出目标相同。同样,输出目标必须与 Sink 的输入目标相同(如下所示)。

构建

现在我们可以构建使用成本处理器应用程序。 在 usage-cost-processor riit 目录中,使用以下命令使用 maven 构建项目

./mvnw clean package

测试

如上所述,Spring Cloud Stream 提供了一个测试 jar 来测试 Spring Cloud Stream 应用程序

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream</artifactId>
	<type>test-jar</type>
	<classifier>test-binder</classifier>
	<scope>test</scope>
</dependency>

TestChannelBinderConfiguration 提供了一个内存中的绑定器实现,用于跟踪和测试应用程序的出站和入站消息。 测试配置包括 InputDestinationOutputDestination bean 来发送和接收消息。 要对 UsageCostProcessor 应用程序进行单元测试,请在 UsageCostProcessorApplicationTests 类中添加以下代码

package io.spring.dataflow.sample.usagecostprocessor;

import java.util.HashMap;
import java.util.Map;

import org.junit.jupiter.api.Test;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;

import static org.assertj.core.api.Assertions.assertThat;

public class UsageCostProcessorApplicationTests {

	@Test
	public void contextLoads() {
	}

	@Test
	public void testUsageCostProcessor() {
		try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
				TestChannelBinderConfiguration.getCompleteConfiguration(
						UsageCostProcessorApplication.class)).web(WebApplicationType.NONE)
				.run()) {

			InputDestination source = context.getBean(InputDestination.class);

			UsageDetail usageDetail = new UsageDetail();
			usageDetail.setUserId("user1");
			usageDetail.setDuration(30L);
			usageDetail.setData(100L);

			MessageConverter converter = context.getBean(CompositeMessageConverter.class);
			Map<String, Object> headers = new HashMap<>();
			headers.put("contentType", "application/json");
			MessageHeaders messageHeaders = new MessageHeaders(headers);
			Message<?> message = converter.toMessage(usageDetail, messageHeaders);

			source.send(message);

			OutputDestination target = context.getBean(OutputDestination.class);
			Message<byte[]> sourceMessage = target.receive(10000, "usage-cost");

			UsageCostDetail usageCostDetail = (UsageCostDetail) converter
					.fromMessage(sourceMessage, UsageCostDetail.class);

			assertThat(usageCostDetail.getCallCost()).isEqualTo(3.0);
			assertThat(usageCostDetail.getDataCost()).isEqualTo(5.0);
		}
	}
}
  • contextLoads 测试用例验证应用程序是否成功启动。
  • testUsageCostProcessor 测试用例使用 InputDestination 发送消息,并使用 OutputDestination 接收和验证消息。

接收器

在此步骤中,我们将创建 UsageCostLogger 接收器。

您可以 直接下载 Spring Initializr 生成的项目 并单击“生成项目”。

或访问 Spring Initializr 网站 并按照以下说明操作

  1. 创建一个新的 Maven 项目,其组名称为 io.spring.dataflow.sample,工件名称为 usage-cost-logger-kafka,以及包 io.spring.dataflow.sample.usagecostlogger
  2. 在“依赖项”文本框中,键入 Kafka 以选择 Kafka 绑定器依赖项。
  3. 在“依赖项”文本框中,键入 Cloud Stream 以选择 Spring Cloud Stream 依赖项。
  4. 在“依赖项”文本框中,键入 Actuator 以选择 Spring Boot actuator 依赖项。
  5. 单击“生成项目”。

现在,您应该 解压缩 usage-cost-logger-kafka.zip 文件并将项目导入您喜欢的 IDE。

您可以 直接下载 Spring Initializr 生成的项目 并单击“生成项目”。

或访问 Spring Initializr 网站 并按照以下说明操作

  1. 创建一个新的 Maven 项目,其组名为 io.spring.dataflow.sample,构件名为 usage-cost-logger-rabbit,包名为 io.spring.dataflow.sample.usagecostlogger
  2. 在“**依赖项**”文本框中,键入 RabbitMQ 以选择 RabbitMQ binder 依赖项。
  3. 在“依赖项”文本框中,键入 Cloud Stream 以选择 Spring Cloud Stream 依赖项。
  4. 在“依赖项”文本框中,键入 Actuator 以选择 Spring Boot actuator 依赖项。
  5. 单击“生成项目”。

现在,您应该 解压缩 usage-cost-logger-rabbit.zip 文件并将项目导入您喜欢的 IDE。

业务逻辑

创建业务逻辑

  1. io.spring.dataflow.sample.usagecostlogger 包中创建一个 UsageCostDetail 类。其内容应类似于 UsageCostDetail.java 的内容。UsageCostDetail 类包含 userIdcallCostdataCost 属性。
  2. io.spring.dataflow.sample.usagecostlogger 包中创建 UsageCostLogger 类,该类接收 UsageCostDetail 消息并将其记录。以下清单显示了源代码
package io.spring.dataflow.sample.usagecostlogger;

import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class UsageCostLogger {

	private static final Logger logger = LoggerFactory.getLogger(UsageCostLoggerApplication.class);

	@Bean
	public Consumer<UsageCostDetail> process() {
		return usageCostDetail -> {
			logger.info(usageCostDetail.toString());
		};
	}
}

在前面的应用程序中,我们声明了一个接受 UsageCostDetailConsumer bean。Spring Cloud Stream 将发现此函数并将其输入绑定到为消息中间件配置的输入目标。如上一节所述,Spring Cloud Stream 使用适当的 MessageConverter 在调用此 Consumer 之前执行必要的类型转换。

配置

配置 sink 应用程序时,我们需要设置

  • 此应用程序订阅的 `input` 绑定目标(Kafka 主题或 RabbitMQ 交换器)。
  • (可选)group 用于指定此消费者应用程序所属的消费者组。

为了方便起见,我们将函数绑定名称 process-in-0 别名为 input

src/main/resources/application.properties 中,添加以下属性

spring.cloud.stream.function.bindings.process-in-0=input
spring.cloud.stream.bindings.input.destination=usage-cost
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0

构建

现在我们可以构建使用成本记录器应用程序了。在 usage-cost-logger 的根目录下,使用以下命令使用 Maven 构建项目

./mvnw clean package

测试

要在 UsageCostLoggerApplicationTests 类中对 UsageCostLogger 进行单元测试,请添加以下代码

package io.spring.dataflow.sample.usagecostlogger;

import java.util.HashMap;
import java.util.Map;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;

@ExtendWith(OutputCaptureExtension.class)
public class UsageCostLoggerApplicationTests {

	@Test
	public void contextLoads() {
	}

	@Test
	public void testUsageCostLogger(CapturedOutput output) {
		try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
				TestChannelBinderConfiguration
						.getCompleteConfiguration(UsageCostLoggerApplication.class))
				.web(WebApplicationType.NONE)
				.run()) {

			InputDestination source = context.getBean(InputDestination.class);

			UsageCostDetail usageCostDetail = new UsageCostDetail();
			usageCostDetail.setUserId("user1");
			usageCostDetail.setCallCost(3.0);
			usageCostDetail.setDataCost(5.0);

			MessageConverter converter = context.getBean(CompositeMessageConverter.class);
			Map<String, Object> headers = new HashMap<>();
			headers.put("contentType", "application/json");
			MessageHeaders messageHeaders = new MessageHeaders(headers);
			Message<?> message = converter.toMessage(usageCostDetail, messageHeaders);

			source.send(message);

			Awaitility.await().until(output::getOut, value -> value.contains("{\"userId\": \"user1\", \"callCost\": \"3.0\", \"dataCost\": \"5.0\" }"));
		}
	}
}

pom.xml 中添加 awaitility 依赖项

<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>
  • contextLoads 测试用例验证应用程序是否成功启动。
  • testUsageCostLogger 测试用例通过使用 Spring Boot 测试框架中的 OutputCaptureExtension 来验证是否调用了 UsageCostLoggerprocess 方法。

部署

下一步是使用为这些应用程序配置的消息代理,将这些应用程序部署到其中一个受支持的平台