函数组合

Spring Cloud Stream 包含与 Spring Cloud Function 基于函数的编程模型的集成,该模型允许将应用程序的业务逻辑建模为 java.util.Functionjava.util.Consumerjava.util.Supplier,分别代表 ProcessorSinkSource 的角色。

关于 编程模型 的指南展示了如何使用函数式风格编写 Processor

在此基础上,我们可以通过导入现有 SourceSink 的配置并添加定义 java.util.Function 的代码来扩展现有 SourceSink 应用程序。

考虑以下流

http | transformer --expression=(\"Hello \"+payload.toString().toUpperCase()) | log

对于某些用例,简单的有效负载转换逻辑可能不需要独立的应用程序,并且将转换与源或接收器组合可能是有益的。例如,我们可能希望避免为如此少量的逻辑部署额外的应用程序,或者避免在消息中间件上发送任何敏感数据。

为此,我们可以创建一个新的源应用程序,该应用程序导入 HttpSourceConfiguration 并将 java.util.Function 注册为 Spring bean。这将在原始源的输出之后自动应用该函数。类似地,对于 Sink 实例,该函数在接收器的输入之前应用。

此外,您不仅可以将原始源或接收器与单个函数组合,还可以使用原始源或接收器声明性地组合多个函数。

在本指南中,我们将创建前面定义的流,然后创建一个新的 http-transformer 源应用程序,它将原始转换器表达式封装为两个 java.util.Function 实例。部署了一个新的流来执行相同的处理,但现在只使用两个应用程序而不是三个。

在本指南中,我们假设您已经按照安装指南中的说明导入了相应的 httptransformerlog 应用程序,并将它们注册到 Spring Cloud Data Flow。

使用三个应用程序

对于第一个流,我们使用预构建的 httptransformlog 应用程序。

首先,我们创建以下流

stream create hello --definition "http --server.port=9000 | transformer --expression=(\"Hello \"+payload.toString().toUpperCase()) | log"

然后,我们按如下方式部署流

stream deploy hello

在本指南中,我们使用本地安装,以便我们可以将一些数据发布到 localhost 上的端点,如下所示

http post --data "friend" --target "http://localhost:9000"

您可以在 log 应用程序中看到以下日志消息

[sformer.hello-1] log-sink                                 : Hello FRIEND

使用两个应用程序

在此步骤中,我们将创建一个新的源应用程序并将其注册,该应用程序将流中两个应用程序(http | transformer)的功能组合到一个应用程序中。然后,我们部署新的流并验证输出是否与上一个示例相同。

新的源应用程序名为 http-transformer,它导入 http 源应用程序的配置,并将两个 java.util.Function 实例定义为 Spring Bean。以下清单显示了此应用程序的源代码

@SpringBootApplication
@Import(org.springframework.cloud.stream.app.http.source.HttpSourceConfiguration.class)
public class HttpSourceRabbitApplication {

	@Bean
	public Function<String, String> upper() {
		return value -> value.toUpperCase();
	}

	@Bean
	public Function<String, String> concat() {
		return value -> "Hello "+ value;
	}


	public static void main(String[] args) {
		SpringApplication.run(HttpSourceRabbitApplication.class, args);
	}
}

Spring Cloud Stream 有一个名为 spring.cloud.stream.function.definition 的属性。它接受一个用管道或逗号分隔的函数列表,这些函数按顺序调用。

设置此属性后,功能 Bean 将在运行时自动链接。

函数组合按以下方式发生

  • 当 Spring Cloud Stream 应用程序的类型为 Source 时,组合函数将在源 output 之后应用。
  • 当 Spring Cloud Stream 应用程序的类型为 Sink 时,组合函数将在接收器 input 之前应用。

要先应用 upper 函数,然后再应用 concat 函数,需要按如下方式设置属性

spring.cloud.stream.function.definition=upper|concat

Spring Cloud Function 使用管道符号将位于同一个 JVM 中的两个函数组合在一起。请注意,Spring Cloud Data Flow DSL 中的管道符号用于将一个应用程序的消息中间件输出连接到另一个应用程序。

现在,您可以构建新的源,注册它,并在本地计算机上部署流。

构建

如果您想注册我们在 Maven 存储库和 Dockerhub 中提供的 http-transformermavendocker 资源 URI,则可以跳过本节。

您可以从 Github 下载此应用程序的源代码。

如果您使用 RabbitMQ binder,则可以下载 包含 RabbitMQ binder 的 http-transformer。下载并解压缩源代码后,您可以使用 Maven 构建应用程序,如下所示

cd composed-http-transformer-rabbitmq
./mvnw clean install

如果您使用 Kafka binder,则可以下载 包含 Kafka binder 的 http-transformer。下载并解压缩源代码后,您可以使用 Maven 构建应用程序,如下所示

cd composed-http-transformer-kafka
./mvnw clean install

注册本地构建的应用程序

现在,您可以使用 Data Flow Shell 注册 http-transformer 应用程序,如下所示

app register --name http-transformer --type source --uri file:///<YOUR-SOURCE-CODE>/target/composed-http-transformer-[kafka/rabbitmq]-0.0.1-SNAPSHOT.jar

对于 --uri 选项,请将工件的目录名称和路径替换为适合您系统的路径。

注册现成的应用程序

http-transformer 应用程序的 Maven 和 Docker 工件在 KafkaRabbitMQ binder 中均已提供。

以下列表描述了包含 Kafka binder 的 Maven 工件

app register --name http-transformer --type source --uri maven://io.spring.dataflow.sample:composed-http-transformer-kafka:0.0.1-SNAPSHOT

以下列表描述了包含 RabbitMQ binder 的 Maven 工件

app register --name http-transformer --type source --uri maven://io.spring.dataflow.sample:composed-http-transformer-rabbitmq:0.0.1-SNAPSHOT

以下列表描述了包含 Kafka binder 的 Docker 工件

app register --name http-transformer --type source --uri docker://springcloudstream/composed-http-transformer-kafka:0.0.1-SNAPSHOT

以下列表描述了包含 RabbitMQ binder 的 Docker 工件

app register --name http-transformer --type source --uri docker://springcloudstream/composed-http-transformer-rabbitmq:0.0.1-SNAPSHOT

部署流

我们现在可以使用包含名为 upperconcat 的功能 Bean 的 http-transform 应用程序来部署新的流。

stream create helloComposed --definition "http-transformer --server.port=9001 | log"

可选: 如果未定义属性 spring.cloud.function.definition,或者我们需要覆盖该属性以更改函数定义。函数定义表示由 Spring Cloud Function 定义的功能 DSL。

在这种情况下,它如下所示

stream deploy helloComposed --properties "app.http-transformer.spring.cloud.stream.function.definition=concat|upper"

前面的部署将 concatupper 函数 Bean 合成到 http 源应用程序中。

然后我们可以将有效负载发送到 http 应用程序,如下所示

http post --data "friend" --target "http://localhost:9001"

然后您可以在 log 应用程序中看到输出,如下所示

[helloComposed-1] log-sink                                 : Hello FRIEND

Kotlin 支持

Spring Cloud Function 支持 Kotlin。您可以在应用程序中添加基于 Kotlin 的函数 Bean。您可以将任何 Kotlin 函数 Bean 添加到 SourceSink 应用程序的可组合函数中。

为了看到这一点,我们可以创建另一个示例应用程序(http-transformer-kotlin),它定义了 Kotlin 函数 Bean。

Kotlin 函数 Bean 被配置为 processor。在这里,Kotlin 函数 Bean 是 transform 函数,如下定义

@Bean
open fun transform(): (String) -> String {
   return { "How are you ".plus(it) }
}

此外,该项目还具有 spring-cloud-function-kotlin 作为依赖项,以应用对 Kotlin 函数的功能配置支持,其定义如下

<dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-function-kotlin</artifactId>
      <version>2.0.0.RELEASE</version>
    </dependency>

构建

如果您想在 Spring Cloud Data Flow 服务器中注册 http-transformer-kotlinmavendocker 资源 URI,则可以跳过本节。

您可以从 Github 下载此应用程序的源代码。

如果您使用 RabbitMQ binder,则可以下载 http-transformer-kotlin-with-RabbitMQ-binder。下载并解压缩源代码后,您可以使用 Maven 构建应用程序,如下所示

cd composed-http-transformer-kotlin-kafka
./mvnw clean install

如果您使用 Kafka binder,则可以下载 http-transformer-kotlin-with-Kafka-binder。下载并解压缩源代码后,您可以使用 Maven 构建应用程序,如下所示

cd composed-http-transformer-kotlin-rabbitmq
./mvnw clean install

注册本地构建的应用程序

现在,您可以使用数据流 Shell 注册 http-transformer-kotlin 应用程序,如下所示:

app register --name http-transformer-kotlin --type source --uri file:///>YOUR-SOURCE-CODE>/target/composed-http-transformer-kotlin-[kafka/rabbitmq]-0.0.1-SNAPSHOT.jar

对于 --uri 选项,请将工件的目录名称和路径替换为适合您系统的的值。

注册现成应用程序

http-transformer 应用程序的 Maven 和 Docker 工件在 KafkaRabbitMQ binder 中均已提供。

以下列表描述了包含 Kafka binder 的 Maven 工件

app register --name http-transformer-kotlin --type source --uri maven://io.spring.dataflow.sample:composed-http-transformer-kotlin-kafka:0.0.1-SNAPSHOT

以下列表描述了包含 RabbitMQ binder 的 Maven 工件

app register --name http-transformer-kotlin --type source --uri maven://io.spring.dataflow.sample:composed-http-transformer-kotlin-rabbitmq:0.0.1-SNAPSHOT

以下列表描述了包含 Kafka binder 的 Docker 工件

app register --name http-transformer-kotlin --type source --uri docker://springcloudstream/composed-http-transformer-kotlin-kafka:0.0.1-SNAPSHOT

以下列表描述了包含 RabbitMQ binder 的 Docker 工件

app register --name http-transformer-kotlin --type source --uri docker://springcloudstream/composed-http-transformer-kotlin-rabbitmq:0.0.1-SNAPSHOT

部署流

要使用 http-transformer-kotlin 应用程序作为 创建流,请运行以下命令:

stream create helloComposedKotlin --definition "http-transformer-kotlin --server.port=9002 | log"

正如我们在 http-transformer 示例中所做的那样,我们可以使用 spring.cloud.stream.function.definition 属性指定任何有效的组合函数 DSL 来构建函数组合。在本例中,我们可以将使用 Java 配置注册的函数 bean 与来自 Kotlin 处理器配置的函数 bean 组合在一起,如下例所示:

stream deploy helloComposedKotlin --properties "app.http-transformer-kotlin.spring.cloud.stream.function.definition=upper|transform|concat"

在以下示例中,函数名称 (transform) 对应于 Kotlin 函数名称。

http post --data "friend" --target "http://localhost:9002"

**注意:**我们可以将 Kotlin 函数与 Java 函数一起使用,因为 Kotlin 函数在内部会转换为 java.util.Function

您可以在 log 应用程序中看到输出,如下所示:

[omposedKotlin-1] log-sink               : Hello How are you FRIEND