使用函数式应用构建流数据管道
本案例描述了如何使用 Spring Cloud Stream 构建简单的基于函数的应用程序,并将它们嵌入到 Spring Cloud Data Flow 流数据管道中。
概述
我们创建一个 time-source
应用程序,它以配置的时间间隔生成当前日期或时间戳,并将其发送到消息中间件。我们还创建了一个接收器 log-sink
应用程序,用于消费从中间件发布的消息。
有关 Spring Cloud Stream 如何提供此支持的更多信息,请参阅Spring Cloud Stream 文档。
使用 Spring Cloud Stream 3.x 作为依赖项,您可以使用 java.util.function.Supplier
编写 Source
应用程序,如下所示
package com.example.timesource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.function.Supplier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class TimeSourceApplication {
@Bean
public Supplier<String> timeSupplier() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
return () -> {
return sdf.format(new Date());
};
}
public static void main(String[] args) {
SpringApplication.run(TimeSourceApplication.class, args);
}
}
Spring Cloud Stream 提供了 spring.cloud.stream.poller.DefaultPollerProperties
,您可以对其进行配置以触发 Supplier
函数 timeSupplier()
。例如,您可以使用 --spring.cloud.stream.poller.fixed-delay=5000
属性每五秒触发一次此 Supplier
函数。
同样,您可以使用 java.util.function.Consumer
编写 Sink
应用程序,如下所示
package com.example.logsink;
import java.util.function.Consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.Message;
@SpringBootApplication
public class LogSinkApplication {
@Bean
IntegrationFlow logConsumerFlow() {
return IntegrationFlows.from(MessageConsumer.class, (gateway) -> gateway.beanName("logConsumer"))
.handle((payload, headers) -> {
if (payload instanceof byte[]) {
return new String((byte[]) payload);
}
return payload;
})
.log(LoggingHandler.Level.INFO, "log-consumer", "payload")
.get();
}
private interface MessageConsumer extends Consumer<Message<?>> {}
public static void main(String[] args) {
SpringApplication.run(LogSinkApplication.class, args);
}
}
构建 time-source
和 log-sink
应用程序后,您可以将这些应用程序注册到 Spring Cloud Data Flow 中。
假设您使用以下应用程序创建了一个流:
ticktock=time-source | log-sink
您需要确保将这些应用程序的功能绑定映射到 Spring Cloud Data Flow 能够理解的相应 output
和 input
名称。
这意味着您需要在部署流时配置以下属性
app.time-source.spring.cloud.stream.function.bindings.timeSupplier-out-0=output
app.log-sink.spring.cloud.stream.function.bindings.logConsumer-in-0=input
timeSupplier
函数的输出和 logConsumer
函数的输入需要映射到 Spring Cloud Data Flow 能够理解的出站和入站名称:output
和 input
。
除此之外,您还需要提供一种触发 Supplier
函数的方法,在本例中为
app.time-source.spring.cloud.stream.poller.fixed-delay=5000
如果您使用 local
部署器运行此程序,您还可以将应用程序中的日志继承到 Skipper 服务器日志中,以便您可以在 Skipper 服务器日志中看到 ticktock
流消息在 log-sink
消费者端的输出
deployer.*.local.inherit-logging=true