在本指南中,我们将开发三个使用 Spring Cloud Stream 的 Spring Boot 应用程序,并将它们部署到 Cloud Foundry、Kubernetes 和本地机器。在另一个指南中,我们将使用 Data Flow 部署这些应用程序。通过手动部署应用程序,您可以更好地了解 Data Flow 为您自动执行的步骤。
以下部分描述了如何从头开始构建这些应用程序。
如果您愿意,可以下载包含这些应用程序源代码的 zip 文件,解压缩,构建它,然后继续部署部分。
您可以从浏览器下载包含所有三个应用程序的项目。您也可以使用命令行,如下例所示
wget 'https://github.com/spring-cloud/spring-cloud-dataflow-samples/blob/main/dataflow-website/stream-developer-guides/streams/standalone-stream-sample/dist/usage-cost-stream-sample.zip?raw=true' -O usage-cost-stream-sample.zip构建下载的示例
可以使用相同的代码库将流应用程序配置为使用 Kafka 代理或 RabbitMQ 运行。唯一的区别在于可执行 jar 文件。为了使它们与 Kafka 代理一起工作,它们需要 Kafka binder 依赖项(默认情况下启用)。对于 RabbitMQ,它们需要 Rabbit binder。
要为 Kafka 构建示例流应用程序,请从根项目目录
$./mvnw clean package -Pkafka要为 RabbitMQ 构建示例流应用程序,请从根项目目录
$./mvnw clean package -Prabbit开发
我们创建了三个使用配置的绑定器进行通信的 Spring Cloud Stream 应用程序。
该场景是一家手机公司为其客户创建账单。用户拨打的每个电话都有一个 duration 和通话期间使用的 data 量。作为生成账单过程的一部分,需要将原始通话数据转换为通话时长费用和使用数据量费用。
使用包含通话 duration 和通话期间使用的数据量 data 的 UsageDetail 类对通话进行建模。使用包含通话费用(costCall)和数据费用(costData)的 UsageCostDetail 类对账单进行建模。每个类都包含一个 ID(userId)来标识打电话的人。
三个流应用程序如下
Source应用程序(名为UsageDetailSender)生成用户的通话duration和每个userId使用的数据量data,并发送一条包含UsageDetail对象作为 JSON 的消息。Processor应用程序(名为UsageCostProcessor)使用UsageDetail并计算每个userId的通话费用和数据费用。它发送UsageCostDetail对象作为 JSON。Sink应用程序(名为UsageCostLogger)使用UsageCostDetail对象并记录通话和数据的费用。
来源
在此步骤中,我们创建 UsageDetailSender 源。
您可以 直接下载 Spring Initializr 生成的项目
或访问 Spring Initializr 网站 并按照以下说明操作
- 创建一个新的 Maven 项目,其组名为
io.spring.dataflow.sample,工件名为usage-detail-sender-kafka,包为io.spring.dataflow.sample.usagedetailsender。 - 在“依赖项”文本框中,键入
Kafka以选择 Kafka binder 依赖项。 - 在“依赖项”文本框中,键入
Cloud Stream以选择 Spring Cloud Stream 依赖项。 - 在“依赖项”文本框中,键入
Actuator以选择 Spring Boot actuator 依赖项。 - 点击“生成项目”按钮。
现在,您应该 解压缩 usage-detail-sender-kafka.zip 文件并将项目导入您喜欢的 IDE。
当使用 Kafka 作为消息代理时,您可以选择许多配置选项来扩展或覆盖以实现所需的运行时行为。 Kafka binder 文档 列出了 Kafka binder 配置属性。
您可以 直接下载 Spring Initializr 生成的项目
或访问 Spring Initializr 网站 并按照以下说明操作
- 创建一个新的 Maven 项目,其组名为
io.spring.dataflow.sample,项目名称为usage-detail-sender-rabbit,包为io.spring.dataflow.sample.usagedetailsender。 - 在“依赖项”文本框中,键入
RabbitMQ以选择 RabbitMQ binder 依赖项。 - 在“依赖项”文本框中,键入
Cloud Stream以选择 Spring Cloud Stream 依赖项。 - 在“依赖项”文本框中,键入
Actuator以选择 Spring Boot actuator 依赖项。 - 点击“生成项目”按钮。
现在,您应该 解压缩 usage-detail-sender-rabbit.zip 文件并将项目导入您喜欢的 IDE。
持久化队列
默认情况下,Spring Cloud Stream 消费者应用程序会创建一个 匿名 自动删除队列。如果生产者应用程序在消费者应用程序之前启动,这可能会导致消息未被存储和转发。即使交换机是持久化的,我们也需要一个 持久化 队列绑定到交换机,以便存储消息以供以后消费。因此,为了保证消息传递,您需要一个 持久化 队列。
要预先创建持久化队列并将其绑定到交换机,生产者应用程序应设置以下属性
spring.cloud.stream.bindings.<channelName>.producer.requiredGroupsrequiredGroups 属性接受生产者必须确保消息传递到的组的逗号分隔列表。设置此属性后,将使用 <exchange>.<requiredGroup> 格式创建一个持久化队列。
在使用 RabbitMQ 作为消息代理时,您可以选择许多配置选项来扩展或覆盖以实现所需的运行时行为。RabbitMQ Binder 文档列出了 RabbitMQ binder 配置属性。
业务逻辑
现在我们可以创建此应用程序所需的代码。为此
- 在
io.spring.dataflow.sample.usagedetailsender包中创建一个UsageDetail类,其内容类似于 UsageDetail.java 中的内容。UsageDetail类包含userId、data和duration属性。 - 在
io.spring.dataflow.sample.usagedetailsender包中创建UsageDetailSender类。它应该类似于以下列表
package io.spring.dataflow.sample.usagedetailsender;
import java.util.Random;
import java.util.function.Supplier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class UsageDetailSender {
private String[] users = {"user1", "user2", "user3", "user4", "user5"};
@Bean
public Supplier<UsageDetail> sendEvents() {
return () -> {
UsageDetail usageDetail = new UsageDetail();
usageDetail.setUserId(this.users[new Random().nextInt(5)]);
usageDetail.setDuration(new Random().nextInt(300));
usageDetail.setData(new Random().nextInt(700));
return usageDetail;
};
}
}sendEvents Supplier 提供了一个填充了随机值的 UsageDetail 对象。Spring Cloud Stream 自动绑定此函数以将其数据发送到配置的输出目标。Spring Cloud Stream 还为任何 Supplier 配置了一个默认轮询器,该轮询器默认情况下将每秒调用一次该函数。
配置
配置 source 应用程序时,我们需要设置 output 绑定目标(RabbitMQ 交换器或 Kafka 主题的名称),生产者将在其中发布数据。
为了方便起见,我们将函数输出绑定名称 sendEvents-out-0(表示与 sendEvents 函数的第一个输出参数对应的输出)别名为逻辑名称 output。或者,我们可以直接绑定输出绑定名称:spring.cloud.stream.bindings.sendEvents-out-0.destination=usage-detail。有关更详细的说明,请参阅函数绑定名称。
在 src/main/resources/application.properties 中,添加以下属性
spring.cloud.stream.function.bindings.sendEvents-out-0=output
spring.cloud.stream.bindings.output.destination=usage-detail
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0构建
现在我们可以构建使用情况详细信息发送器应用程序了。
在 usage-detail-sender 根目录中,使用以下命令使用 maven 构建项目
./mvnw clean package测试
Spring Cloud Stream 提供了一个测试 jar 来测试 Spring Cloud Stream 应用程序。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<classifier>test-binder</classifier>
<scope>test</scope>
</dependency>TestChannelBinderConfiguration 提供了一个内存绑定器实现,用于跟踪和测试应用程序的出站和入站消息,而不是消息代理绑定器实现。测试配置包括 InputDestination 和 OutputDestination bean,用于发送和接收消息。要对 UsageDetailSender 应用程序进行单元测试,请在 UsageDetailSenderApplicationTests 类中添加以下代码。
package io.spring.dataflow.sample.usagedetailsender;
import org.junit.jupiter.api.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import static org.assertj.core.api.Assertions.assertThat;
public class UsageDetailSenderApplicationTests {
@Test
public void contextLoads() {
}
@Test
public void testUsageDetailSender() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(UsageDetailSenderApplication.class))
.web(WebApplicationType.NONE)
.run()) {
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> sourceMessage = target.receive(10000, "usage-detail");
MessageConverter converter = context.getBean(CompositeMessageConverter.class);
UsageDetail usageDetail = (UsageDetail) converter
.fromMessage(sourceMessage, UsageDetail.class);
assertThat(usageDetail.getUserId()).isBetween("user1", "user5");
assertThat(usageDetail.getData()).isBetween(0L, 700L);
assertThat(usageDetail.getDuration()).isBetween(0L, 300L);
}
}
}contextLoads测试用例验证应用程序是否成功启动。testUsageDetailSender测试用例使用OutputDestination接收和验证UsageDetailSender发送的消息。
内存测试绑定器的行为与任何消息代理绑定器实现完全相同。值得注意的是,在 Spring Cloud Stream 应用程序中,消息有效负载始终是一个字节数组,默认情况下编码为 JSON。使用应用程序在其输入通道上接收字节,并根据内容类型自动委托给适当的 MessageConverter,以转换字节以匹配使用函数的参数类型,在本例中为 UsageDetail。对于测试,我们需要显式执行此步骤。或者,我们可以直接调用 JSON 解析器,而不是使用 MessageConverter。
处理器
在此步骤中,我们将创建 UsageCostProcessor 处理器。
您可以直接下载 Spring Initializr 生成的项目
或访问 Spring Initializr 网站 并按照以下说明操作
- 创建一个新的 Maven 项目,其组名为
io.spring.dataflow.sample,工件名为usage-cost-processor-kafka,包名为io.spring.dataflow.sample.usagecostprocessor。 - 在“依赖项”文本框中,键入
Kafka以选择 Kafka binder 依赖项。 - 在“依赖项”文本框中,键入
Cloud Stream以选择 Spring Cloud Stream 依赖项。 - 在“依赖项”文本框中,键入
Actuator以选择 Spring Boot actuator 依赖项。 - 点击“生成项目”按钮。
现在,您应该解压缩 usage-cost-processor-kafka.zip 文件,并将项目导入您喜欢的 IDE。
您可以直接下载 Spring Initializr 生成的项目
或访问 Spring Initializr 网站 并按照以下说明操作
- 创建一个新的 Maven 项目,其组名为
io.spring.dataflow.sample,工件名为usage-cost-processor-rabbit,包名为io.spring.dataflow.sample.usagecostprocessor。 - 在“依赖项”文本框中,键入
RabbitMQ以选择 RabbitMQ binder 依赖项。 - 在“依赖项”文本框中,键入
Cloud Stream以选择 Spring Cloud Stream 依赖项。 - 在“依赖项”文本框中,键入
Actuator以选择 Spring Boot actuator 依赖项。 - 点击“生成项目”按钮。
现在,您应该解压缩 usage-cost-processor-rabbit.zip 文件,并将项目导入您喜欢的 IDE。
业务逻辑
现在我们可以创建此应用程序所需的代码。为此
- 在
io.spring.dataflow.sample.usagecostprocessor中创建UsageDetail类。其内容类似于 UsageDetail.java 的内容。UsageDetail类包含userId、data和duration属性。 - 在 `io.spring.dataflow.sample.usagecostprocessor` 包中创建 `UsageCostDetail` 类。其内容类似于 UsageCostDetail.java 的内容。`UsageCostDetail` 类包含 `userId`、`callCost` 和 `dataCost` 属性。
- 在 `io.spring.dataflow.sample.usagecostprocessor` 包中创建 `UsageCostProcessor` 类,它接收 `UsageDetail` 消息,计算通话和数据成本,并发送 `UsageCostDetail` 消息。以下清单显示了源代码
package io.spring.dataflow.sample.usagecostprocessor;
import java.util.function.Function;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class UsageCostProcessor {
private double ratePerSecond = 0.1;
private double ratePerMB = 0.05;
@Bean
public Function<UsageDetail, UsageCostDetail> processUsageCost() {
return usageDetail -> {
UsageCostDetail usageCostDetail = new UsageCostDetail();
usageCostDetail.setUserId(usageDetail.getUserId());
usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond);
usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB);
return usageCostDetail;
};
}
}在前面的应用程序中,我们声明了一个 `Function` bean,它接受一个 `UsageDetail` 并返回一个 `UsageCostDetail`。Spring Cloud Stream 将发现此函数并将其输入和输出绑定到为消息中间件配置的目标。如上一节所述,Spring Cloud Stream 使用适当的 `MessageConverter` 来执行必要的类型转换。
配置
配置 `processor` 应用程序时,我们需要设置以下属性
- 此应用程序订阅的 `input` 绑定目标(Kafka 主题或 RabbitMQ 交换器)。
- 生产者将发布数据的 `output` 绑定目标。
对于生产应用程序,最好设置 `spring.cloud.stream.bindings.input.group` 以指定此消费者应用程序所属的消费者组。这确保了每个都使用自己的组 ID 标识的其他消费者应用程序将接收每条消息。每个消费者组可以扩展到多个实例以分配工作负载。Spring Cloud Stream 抽象了 Kafka 原生的此功能,以便将其扩展到 RabbitMQ 和其他绑定器实现。
在 src/main/resources/application.properties 中,添加以下属性
spring.cloud.stream.function.bindings.processUsageCost-in-0=input
spring.cloud.stream.function.bindings.processUsageCost-out-0=output
spring.cloud.stream.bindings.input.destination=usage-detail
spring.cloud.stream.bindings.output.destination=usage-cost
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0为了方便起见,我们将函数绑定名称 `processUsageCost-in-0` 和 `processUsageCost-out-0` 分别别名为 `input` 和 `output`。
- `spring.cloud.stream.bindings.input.destination` 属性将 `UsageCostProcessor` 对象的 `input` 绑定到 `usage-detail` 目标。
- `spring.cloud.stream.bindings.output.destination` 属性将 `UsageCostProcessor` 对象的输出绑定到 `usage-cost` 目标。
输入目标必须与 Source 应用程序的输出目标相同。同样,输出目标必须与 Sink 的输入目标相同(如下所示)。
构建
现在我们可以构建使用成本处理器应用程序。 在 usage-cost-processor riit 目录中,使用以下命令使用 maven 构建项目
./mvnw clean package测试
如上所述,Spring Cloud Stream 提供了一个测试 jar 来测试 Spring Cloud Stream 应用程序
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<classifier>test-binder</classifier>
<scope>test</scope>
</dependency>TestChannelBinderConfiguration 提供了一个内存中的绑定器实现,用于跟踪和测试应用程序的出站和入站消息。 测试配置包括 InputDestination 和 OutputDestination bean 来发送和接收消息。 要对 UsageCostProcessor 应用程序进行单元测试,请在 UsageCostProcessorApplicationTests 类中添加以下代码
package io.spring.dataflow.sample.usagecostprocessor;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import static org.assertj.core.api.Assertions.assertThat;
public class UsageCostProcessorApplicationTests {
@Test
public void contextLoads() {
}
@Test
public void testUsageCostProcessor() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
UsageCostProcessorApplication.class)).web(WebApplicationType.NONE)
.run()) {
InputDestination source = context.getBean(InputDestination.class);
UsageDetail usageDetail = new UsageDetail();
usageDetail.setUserId("user1");
usageDetail.setDuration(30L);
usageDetail.setData(100L);
MessageConverter converter = context.getBean(CompositeMessageConverter.class);
Map<String, Object> headers = new HashMap<>();
headers.put("contentType", "application/json");
MessageHeaders messageHeaders = new MessageHeaders(headers);
Message<?> message = converter.toMessage(usageDetail, messageHeaders);
source.send(message);
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> sourceMessage = target.receive(10000, "usage-cost");
UsageCostDetail usageCostDetail = (UsageCostDetail) converter
.fromMessage(sourceMessage, UsageCostDetail.class);
assertThat(usageCostDetail.getCallCost()).isEqualTo(3.0);
assertThat(usageCostDetail.getDataCost()).isEqualTo(5.0);
}
}
}contextLoads测试用例验证应用程序是否成功启动。testUsageCostProcessor测试用例使用InputDestination发送消息,并使用OutputDestination接收和验证消息。
接收器
在此步骤中,我们将创建 UsageCostLogger 接收器。
您可以 直接下载 Spring Initializr 生成的项目 并单击“生成项目”。
或访问 Spring Initializr 网站 并按照以下说明操作
- 创建一个新的 Maven 项目,其组名称为
io.spring.dataflow.sample,工件名称为usage-cost-logger-kafka,以及包io.spring.dataflow.sample.usagecostlogger。 - 在“依赖项”文本框中,键入
Kafka以选择 Kafka 绑定器依赖项。 - 在“依赖项”文本框中,键入
Cloud Stream以选择 Spring Cloud Stream 依赖项。 - 在“依赖项”文本框中,键入
Actuator以选择 Spring Boot actuator 依赖项。 - 单击“生成项目”。
现在,您应该 解压缩 usage-cost-logger-kafka.zip 文件并将项目导入您喜欢的 IDE。
您可以 直接下载 Spring Initializr 生成的项目 并单击“生成项目”。
或访问 Spring Initializr 网站 并按照以下说明操作
- 创建一个新的 Maven 项目,其组名为
io.spring.dataflow.sample,构件名为usage-cost-logger-rabbit,包名为io.spring.dataflow.sample.usagecostlogger。 - 在“**依赖项**”文本框中,键入
RabbitMQ以选择 RabbitMQ binder 依赖项。 - 在“依赖项”文本框中,键入
Cloud Stream以选择 Spring Cloud Stream 依赖项。 - 在“依赖项”文本框中,键入
Actuator以选择 Spring Boot actuator 依赖项。 - 单击“生成项目”。
现在,您应该 解压缩 usage-cost-logger-rabbit.zip 文件并将项目导入您喜欢的 IDE。
业务逻辑
创建业务逻辑
- 在
io.spring.dataflow.sample.usagecostlogger包中创建一个UsageCostDetail类。其内容应类似于 UsageCostDetail.java 的内容。UsageCostDetail类包含userId、callCost和dataCost属性。 - 在
io.spring.dataflow.sample.usagecostlogger包中创建UsageCostLogger类,该类接收UsageCostDetail消息并将其记录。以下清单显示了源代码
package io.spring.dataflow.sample.usagecostlogger;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class UsageCostLogger {
private static final Logger logger = LoggerFactory.getLogger(UsageCostLoggerApplication.class);
@Bean
public Consumer<UsageCostDetail> process() {
return usageCostDetail -> {
logger.info(usageCostDetail.toString());
};
}
}在前面的应用程序中,我们声明了一个接受 UsageCostDetail 的 Consumer bean。Spring Cloud Stream 将发现此函数并将其输入绑定到为消息中间件配置的输入目标。如上一节所述,Spring Cloud Stream 使用适当的 MessageConverter 在调用此 Consumer 之前执行必要的类型转换。
配置
配置 sink 应用程序时,我们需要设置
- 此应用程序订阅的 `input` 绑定目标(Kafka 主题或 RabbitMQ 交换器)。
- (可选)
group用于指定此消费者应用程序所属的消费者组。
为了方便起见,我们将函数绑定名称 process-in-0 别名为 input。
在 src/main/resources/application.properties 中,添加以下属性
spring.cloud.stream.function.bindings.process-in-0=input
spring.cloud.stream.bindings.input.destination=usage-cost
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0构建
现在我们可以构建使用成本记录器应用程序了。在 usage-cost-logger 的根目录下,使用以下命令使用 Maven 构建项目
./mvnw clean package测试
要在 UsageCostLoggerApplicationTests 类中对 UsageCostLogger 进行单元测试,请添加以下代码
package io.spring.dataflow.sample.usagecostlogger;
import java.util.HashMap;
import java.util.Map;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
@ExtendWith(OutputCaptureExtension.class)
public class UsageCostLoggerApplicationTests {
@Test
public void contextLoads() {
}
@Test
public void testUsageCostLogger(CapturedOutput output) {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(UsageCostLoggerApplication.class))
.web(WebApplicationType.NONE)
.run()) {
InputDestination source = context.getBean(InputDestination.class);
UsageCostDetail usageCostDetail = new UsageCostDetail();
usageCostDetail.setUserId("user1");
usageCostDetail.setCallCost(3.0);
usageCostDetail.setDataCost(5.0);
MessageConverter converter = context.getBean(CompositeMessageConverter.class);
Map<String, Object> headers = new HashMap<>();
headers.put("contentType", "application/json");
MessageHeaders messageHeaders = new MessageHeaders(headers);
Message<?> message = converter.toMessage(usageCostDetail, messageHeaders);
source.send(message);
Awaitility.await().until(output::getOut, value -> value.contains("{\"userId\": \"user1\", \"callCost\": \"3.0\", \"dataCost\": \"5.0\" }"));
}
}
}在 pom.xml 中添加 awaitility 依赖项
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>contextLoads测试用例验证应用程序是否成功启动。testUsageCostLogger测试用例通过使用 Spring Boot 测试框架中的OutputCaptureExtension来验证是否调用了UsageCostLogger的process方法。
部署
下一步是使用为这些应用程序配置的消息代理,将这些应用程序部署到其中一个受支持的平台。