编程模型

Spring Cloud Stream 提供了使用不同编程模型构建流应用程序的灵活性。

  • 函数式
  • Kafka Streams (当使用 Kafak Streams binder 时)

基于注解的编程模型 (@EnableBinding / @StreamListener) 在 Spring Cloud Stream 3.2.x 中已被弃用。

在接下来的章节中,我们将回顾如何使用不同的编程模型构建一个简单的业务逻辑示例。

为了用一个具体的例子来突出编程的使用,考虑一个我们从 HTTP 端点接收数据的场景。一旦数据可用,我们希望通过添加前缀和后缀来转换有效负载。最后,我们想要验证转换后的数据。

开箱即用的源和接收器

为了演示前面提到的用例,我们首先来看看两个开箱即用的应用程序

自定义处理器

对于源和接收器步骤之间的数据转换,我们重点介绍一个自定义处理器应用程序,并以此为基础演示不同的编程模型。

代码

public class FunctionStreamSampleProcessor {

	@Bean
	public Function<String, String> messenger() {
		return data -> "Hello: " + data + "!";
	}
}
public class KafkaStreamsSampleProcessor {

	@Bean
	public Consumer<KStream<String, String>> messenger() {
		return data -> data.map((k, v) -> new KeyValue<>(null, "Hello: " + v + "!"));
	}
}

处理器中的业务逻辑通过添加“Hello: ”前缀和“!”后缀来转换接收到的有效负载。

相同的“业务逻辑”可以用不同的编程模型来实现,并且每个变体都实现了一个简单的 messenger 函数,该函数可以独立测试并在隔离状态下进行演进。

要点:开发人员可以选择可用的编程模型风格。

配置: (application.properties)

spring.cloud.stream.bindings.input.destination=incomingDataTopic
spring.cloud.stream.bindings.output.destination=outgoingDataTopic
spring.cloud.stream.bindings.input.destination=incomingDataTopic
spring.cloud.stream.bindings.output.destination=outgoingDataTopic

spring.cloud.stream.kafka.streams.binder.applicationId=kstreams-sample

在 Kafka Streams 配置中,您可能注意到了额外的属性 spring.cloud.stream.kafka.streams.binder.applicationId,框架在内部需要使用该属性来唯一标识 Kafka Streams 应用程序。

有关 函数式Kafka Streams 编程模型的更多信息,请参阅 Spring Cloud Stream 参考文档。

在处理器应用程序中组合函数式 Bean

函数组合支持适用于开箱即用的 Spring Cloud Stream Processor 应用程序,因为在现有处理器的应用程序逻辑之前或之后是否需要应用该函数存在歧义。这很难确定。

但是,您可以创建自己的处理器应用程序,这些应用程序使用标准 java.util.Function API 进行函数组合,如下所示

@Configuration
public static class FunctionProcessorConfiguration {

  @Bean
  public Function<String, String> upperAndConcat() {
  return upper().andThen(concat());
  }

  @Bean
  public Function<String, String> upper() {
     return value -> value.toUpperCase();
  }

  @Bean
  public Function<String, String> concat() {
     return value -> "Hello "+ value;
  }
}

当您使用自定义 processor 应用程序部署流时,您需要通过定义以下属性来部署 processor 应用程序:spring.cloud.stream.function.definition 来组合函数式 Bean。

在本例中,它将设置为

spring.cloud.stream.function.definition=upper|concat