流 Java DSL
除了使用 shell 来创建和部署流之外,您还可以使用 spring-cloud-dataflow-rest-client
模块提供的基于 Java 的 DSL。Java DSL 是 DataFlowTemplate
类的一个便捷包装器,支持以编程方式创建和部署流。
要开始使用,您需要将以下依赖项添加到您的项目中
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dataflow-rest-client</artifactId>
<version>2.11.3</version>
</dependency>
用法
Java DSL 的核心类是 StreamBuilder
、StreamDefinition
、Stream
、StreamApplication
和 DataFlowTemplate
。入口点是 Stream
上的一个 builder
方法,它接受一个 DataFlowTemplate
的实例。要创建 DataFlowTemplate
的实例,您需要提供 Data Flow Server 的 URI
位置。
Spring Boot 还为 StreamBuilder
和 DataFlowTemplate
提供了自动配置。您可以使用 DataFlowClientProperties
中的属性来配置与 Data Flow 服务器的连接。通常,您应该从 spring.cloud.dataflow.client.server-uri
属性开始。
请考虑以下使用 definition
风格的示例
URI dataFlowUri = URI.create("http://localhost:9393");
DataFlowOperations dataFlowOperations = new DataFlowTemplate(dataFlowUri);
dataFlowOperations.appRegistryOperations().importFromResource(
"https://dataflow.springframework.org.cn/rabbitmq-maven-latest", true);
StreamDefinition streamDefinition = Stream.builder(dataFlowOperations)
.name("ticktock")
.definition("time | log")
.create();
create
方法返回一个 StreamDefinition
的实例,该实例表示已创建但尚未部署的流。这被称为“定义”风格,因为它接受一个用于流定义的字符串(与 shell 中相同)。如果应用程序尚未在 Data Flow 服务器中注册,则可以使用 DataFlowOperations
类注册它们。使用 StreamDefinition
实例,您可以使用 deploy
或 destroy
方法来部署或销毁流。以下示例部署了该流
Stream stream = streamDefinition.deploy();
Stream
实例提供了 getStatus
、destroy
和 undeploy
方法来控制和查询流。如果要立即部署流,则无需创建类型为 StreamDefinition
的单独局部变量。相反,您可以将调用链接在一起,如下所示
Stream stream = Stream.builder(dataFlowOperations)
.name("ticktock")
.definition("time | log")
.create()
.deploy();
deploy
方法被重载以接受 java.util.Map
类型的部署属性。
StreamApplication
类用于“流畅”的 Java DSL 风格,将在下一节中讨论。StreamBuilder
类从 Stream.builder(dataFlowOperations)
方法返回。在较大的应用程序中,通常会创建一个 StreamBuilder
的实例作为 Spring @Bean
,并在整个应用程序中共享它。
Java DSL 风格
Java DSL 提供了两种创建流的风格
definition
风格保留了在 shell 中使用文本 DSL 的管道和过滤器的风格。通过在设置流名称后使用definition
方法来选择此风格,例如Stream.builder(dataFlowOperations).name("ticktock").definition(<definition goes here>)
。fluent
风格允许您通过传入StreamApplication
的实例,将源、处理器和接收器链接在一起。此风格通过在设置流名称后使用source
方法来选择,例如,Stream.builder(dataFlowOperations).name("ticktock").source(<流应用程序实例放在此处>)
。然后,您可以将processor()
和sink()
方法链接在一起,以创建流定义。
为了演示这两种风格,我们包含了一个使用这两种方法的简单流。
以下示例演示了定义方法
public void definitionStyle() throws Exception{
Map<String, String> deploymentProperties = createDeploymentProperties();
Stream woodchuck = Stream.builder(dataFlowOperations)
.name("woodchuck")
.definition("http --server.port=9900 | splitter --expression=payload.split(' ') | log")
.create()
.deploy(deploymentProperties);
waitAndDestroy(woodchuck)
}
以下示例演示了 fluent 方法
private void fluentStyle(DataFlowOperations dataFlowOperations) throws InterruptedException {
logger.info("Deploying stream.");
Stream woodchuck = builder
.name("woodchuck")
.source(source)
.processor(processor)
.sink(sink)
.create()
.deploy();
waitAndDestroy(woodchuck);
}
waitAndDestroy
方法使用 getStatus
方法轮询流的状态
private void waitAndDestroy(Stream stream) throws InterruptedException {
while(!stream.getStatus().equals("deployed")){
System.out.println("Wating for deployment of stream.");
Thread.sleep(5000);
}
System.out.println("Letting the stream run for 2 minutes.");
// Let the stream run for 2 minutes
Thread.sleep(120000);
System.out.println("Destroying stream");
stream.destroy();
}
使用定义风格时,部署属性以 java.util.Map
的形式指定,与 shell 中的方式相同。以下清单显示了 createDeploymentProperties
方法
private Map<String, String> createDeploymentProperties() {
DeploymentPropertiesBuilder propertiesBuilder = new DeploymentPropertiesBuilder();
propertiesBuilder.memory("log", 512);
propertiesBuilder.count("log",2);
propertiesBuilder.put("app.splitter.producer.partitionKeyExpression", "payload");
return propertiesBuilder.build();
}
在这种情况下,除了为日志应用程序设置部署器属性 count
之外,应用程序属性也会在部署时被覆盖。使用 fluent 风格时,部署属性通过使用 addDeploymentProperty
方法添加(例如,new StreamApplication("log").addDeploymentProperty("count", 2)
),并且您不需要在属性前面加上 deployer.<app_name>
。
要创建和部署流,您需要确保首先在数据流服务器中注册了相应的应用程序。尝试创建或部署包含未知应用程序的流会引发异常。您可以使用 DataFlowTemplate
注册您的应用程序,如下所示
dataFlowOperations.appRegistryOperations().importFromResource(
"https://dataflow.springframework.org.cn/rabbitmq-maven-latest", true);
流应用程序也可以是应用程序中的 bean,这些 bean 被注入到其他类中以创建流。构建 Spring 应用程序的方法有很多,但一种方法是使用 @Configuration
类定义 StreamBuilder
和 StreamApplications
@Configuration
public class StreamConfiguration {
@Bean
public StreamBuilder builder() {
return Stream.builder(new DataFlowTemplate(URI.create("http://localhost:9393")));
}
@Bean
public StreamApplication httpSource(){
return new StreamApplication("http");
}
@Bean
public StreamApplication logSink(){
return new StreamApplication("log");
}
}
然后,在另一个类中,您可以使用 @Autowire
注释这些类并部署流
@Component
public class MyStreamApps {
@Autowired
private StreamBuilder streamBuilder;
@Autowired
private StreamApplication httpSource;
@Autowired
private StreamApplication logSink;
public void deploySimpleStream() {
Stream simpleStream = streamBuilder.name("simpleStream")
.source(httpSource)
.sink(logSink)
.create()
.deploy();
}
}
这种风格允许您跨多个流共享 StreamApplications
。
使用 DeploymentPropertiesBuilder
无论您选择哪种风格,deploy(Map<String, String> deploymentProperties)
方法都允许您自定义流的部署方式。我们通过使用构建器风格简化了创建具有属性的映射,并为某些属性创建了静态方法,因此您无需记住这些属性的名称。您可以重写前面的 createDeploymentProperties
示例,如下所示
private Map<String, String> createDeploymentProperties() {
return new DeploymentPropertiesBuilder()
.count("log", 2)
.memory("log", 512)
.put("app.splitter.producer.partitionKeyExpression", "payload")
.build();
}
此实用程序类旨在帮助创建 Map
,并添加了一些方法来帮助定义预定义的属性。
Skipper 部署属性
除了 Spring Cloud Data Flow 之外,您还需要传递某些 Skipper 特定的部署属性,例如,选择目标平台。SkipperDeploymentPropertiesBuilder
提供了 DeploymentPropertiesBuilder
中的所有属性,并添加了 Skipper 所需的属性。以下示例创建了一个 SkipperDeploymentPropertiesBuilder
private Map<String, String> createDeploymentProperties() {
return new SkipperDeploymentPropertiesBuilder()
.count("log", 2)
.memory("log", 512)
.put("app.splitter.producer.partitionKeyExpression", "payload")
.platformName("pcf")
.build();
}