使用函数式应用构建流数据管道

本案例描述了如何使用 Spring Cloud Stream 构建简单的基于函数的应用程序,并将它们嵌入到 Spring Cloud Data Flow 流数据管道中。

概述

我们创建一个 time-source 应用程序,它以配置的时间间隔生成当前日期或时间戳,并将其发送到消息中间件。我们还创建了一个接收器 log-sink 应用程序,用于消费从中间件发布的消息。

有关 Spring Cloud Stream 如何提供此支持的更多信息,请参阅Spring Cloud Stream 文档

使用 Spring Cloud Stream 3.x 作为依赖项,您可以使用 java.util.function.Supplier 编写 Source 应用程序,如下所示

package com.example.timesource;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.function.Supplier;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class TimeSourceApplication {

	@Bean
	public Supplier<String> timeSupplier() {
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
		return () -> {
			return sdf.format(new Date());
		};
	}

	public static void main(String[] args) {
		SpringApplication.run(TimeSourceApplication.class, args);
	}

}

Spring Cloud Stream 提供了 spring.cloud.stream.poller.DefaultPollerProperties,您可以对其进行配置以触发 Supplier 函数 timeSupplier()。例如,您可以使用 --spring.cloud.stream.poller.fixed-delay=5000 属性每五秒触发一次此 Supplier 函数。

同样,您可以使用 java.util.function.Consumer 编写 Sink 应用程序,如下所示

package com.example.logsink;

import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.Message;

@SpringBootApplication
public class LogSinkApplication {

	@Bean
	IntegrationFlow logConsumerFlow() {
		return IntegrationFlows.from(MessageConsumer.class, (gateway) -> gateway.beanName("logConsumer"))
				.handle((payload, headers) -> {
					if (payload instanceof byte[]) {
						return new String((byte[]) payload);
					}
					return payload;
				})
				.log(LoggingHandler.Level.INFO, "log-consumer", "payload")
				.get();
	}

	private interface MessageConsumer extends Consumer<Message<?>> {}

	public static void main(String[] args) {
		SpringApplication.run(LogSinkApplication.class, args);
	}
}

构建 time-sourcelog-sink 应用程序后,您可以将这些应用程序注册到 Spring Cloud Data Flow 中。

假设您使用以下应用程序创建了一个流:

ticktock=time-source | log-sink

您需要确保将这些应用程序的功能绑定映射到 Spring Cloud Data Flow 能够理解的相应 outputinput 名称。

这意味着您需要在部署流时配置以下属性

app.time-source.spring.cloud.stream.function.bindings.timeSupplier-out-0=output
app.log-sink.spring.cloud.stream.function.bindings.logConsumer-in-0=input

timeSupplier 函数的输出和 logConsumer 函数的输入需要映射到 Spring Cloud Data Flow 能够理解的出站和入站名称:outputinput

除此之外,您还需要提供一种触发 Supplier 函数的方法,在本例中为

app.time-source.spring.cloud.stream.poller.fixed-delay=5000

如果您使用 local 部署器运行此程序,您还可以将应用程序中的日志继承到 Skipper 服务器日志中,以便您可以在 Skipper 服务器日志中看到 ticktock 流消息在 log-sink 消费者端的输出

deployer.*.local.inherit-logging=true