流 Java DSL

除了使用 shell 来创建和部署流之外,您还可以使用 spring-cloud-dataflow-rest-client 模块提供的基于 Java 的 DSL。Java DSL 是 DataFlowTemplate 类的一个便捷包装器,支持以编程方式创建和部署流。

要开始使用,您需要将以下依赖项添加到您的项目中

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-dataflow-rest-client</artifactId>
	<version>2.11.3</version>
</dependency>

用法

Java DSL 的核心类是 StreamBuilderStreamDefinitionStreamStreamApplicationDataFlowTemplate。入口点是 Stream 上的一个 builder 方法,它接受一个 DataFlowTemplate 的实例。要创建 DataFlowTemplate 的实例,您需要提供 Data Flow Server 的 URI 位置。

Spring Boot 还为 StreamBuilderDataFlowTemplate 提供了自动配置。您可以使用 DataFlowClientProperties 中的属性来配置与 Data Flow 服务器的连接。通常,您应该从 spring.cloud.dataflow.client.server-uri 属性开始。

请考虑以下使用 definition 风格的示例

URI dataFlowUri = URI.create("http://localhost:9393");
DataFlowOperations dataFlowOperations = new DataFlowTemplate(dataFlowUri);
dataFlowOperations.appRegistryOperations().importFromResource(
                     "https://dataflow.springframework.org.cn/rabbitmq-maven-latest", true);
StreamDefinition streamDefinition = Stream.builder(dataFlowOperations)
                                      .name("ticktock")
                                      .definition("time | log")
                                      .create();

create 方法返回一个 StreamDefinition 的实例,该实例表示已创建但尚未部署的流。这被称为“定义”风格,因为它接受一个用于流定义的字符串(与 shell 中相同)。如果应用程序尚未在 Data Flow 服务器中注册,则可以使用 DataFlowOperations 类注册它们。使用 StreamDefinition 实例,您可以使用 deploydestroy 方法来部署或销毁流。以下示例部署了该流

Stream stream = streamDefinition.deploy();

Stream 实例提供了 getStatusdestroyundeploy 方法来控制和查询流。如果要立即部署流,则无需创建类型为 StreamDefinition 的单独局部变量。相反,您可以将调用链接在一起,如下所示

Stream stream = Stream.builder(dataFlowOperations)
                  .name("ticktock")
                  .definition("time | log")
                  .create()
                  .deploy();

deploy 方法被重载以接受 java.util.Map 类型的部署属性。

StreamApplication 类用于“流畅”的 Java DSL 风格,将在下一节中讨论。StreamBuilder 类从 Stream.builder(dataFlowOperations) 方法返回。在较大的应用程序中,通常会创建一个 StreamBuilder 的实例作为 Spring @Bean,并在整个应用程序中共享它。

Java DSL 风格

Java DSL 提供了两种创建流的风格

  • definition 风格保留了在 shell 中使用文本 DSL 的管道和过滤器的风格。通过在设置流名称后使用 definition 方法来选择此风格,例如 Stream.builder(dataFlowOperations).name("ticktock").definition(<definition goes here>)
  • fluent 风格允许您通过传入 StreamApplication 的实例,将源、处理器和接收器链接在一起。此风格通过在设置流名称后使用 source 方法来选择,例如,Stream.builder(dataFlowOperations).name("ticktock").source(<流应用程序实例放在此处>)。然后,您可以将 processor()sink() 方法链接在一起,以创建流定义。

为了演示这两种风格,我们包含了一个使用这两种方法的简单流。

以下示例演示了定义方法

public void definitionStyle() throws Exception{

  Map<String, String> deploymentProperties = createDeploymentProperties();

  Stream woodchuck = Stream.builder(dataFlowOperations)
          .name("woodchuck")
          .definition("http --server.port=9900 | splitter --expression=payload.split(' ') | log")
          .create()
          .deploy(deploymentProperties);

  waitAndDestroy(woodchuck)
}

以下示例演示了 fluent 方法

private void fluentStyle(DataFlowOperations dataFlowOperations) throws InterruptedException {

  logger.info("Deploying stream.");

  Stream woodchuck = builder
    .name("woodchuck")
    .source(source)
    .processor(processor)
    .sink(sink)
    .create()
    .deploy();

  waitAndDestroy(woodchuck);
}

waitAndDestroy 方法使用 getStatus 方法轮询流的状态

private void waitAndDestroy(Stream stream) throws InterruptedException {

  while(!stream.getStatus().equals("deployed")){
    System.out.println("Wating for deployment of stream.");
    Thread.sleep(5000);
  }

  System.out.println("Letting the stream run for 2 minutes.");
  // Let the stream run for 2 minutes
  Thread.sleep(120000);

  System.out.println("Destroying stream");
  stream.destroy();
}

使用定义风格时,部署属性以 java.util.Map 的形式指定,与 shell 中的方式相同。以下清单显示了 createDeploymentProperties 方法

private Map<String, String> createDeploymentProperties() {
  DeploymentPropertiesBuilder propertiesBuilder = new DeploymentPropertiesBuilder();
  propertiesBuilder.memory("log", 512);
  propertiesBuilder.count("log",2);
  propertiesBuilder.put("app.splitter.producer.partitionKeyExpression", "payload");
  return propertiesBuilder.build();
}

在这种情况下,除了为日志应用程序设置部署器属性 count 之外,应用程序属性也会在部署时被覆盖。使用 fluent 风格时,部署属性通过使用 addDeploymentProperty 方法添加(例如,new StreamApplication("log").addDeploymentProperty("count", 2)),并且您不需要在属性前面加上 deployer.<app_name>

要创建和部署流,您需要确保首先在数据流服务器中注册了相应的应用程序。尝试创建或部署包含未知应用程序的流会引发异常。您可以使用 DataFlowTemplate 注册您的应用程序,如下所示

dataFlowOperations.appRegistryOperations().importFromResource(
            "https://dataflow.springframework.org.cn/rabbitmq-maven-latest", true);

流应用程序也可以是应用程序中的 bean,这些 bean 被注入到其他类中以创建流。构建 Spring 应用程序的方法有很多,但一种方法是使用 @Configuration 类定义 StreamBuilderStreamApplications

@Configuration
public class StreamConfiguration {

  @Bean
  public StreamBuilder builder() {
    return Stream.builder(new DataFlowTemplate(URI.create("http://localhost:9393")));
  }

  @Bean
  public StreamApplication httpSource(){
    return new StreamApplication("http");
  }

  @Bean
  public StreamApplication logSink(){
    return new StreamApplication("log");
  }
}

然后,在另一个类中,您可以使用 @Autowire 注释这些类并部署流

@Component
public class MyStreamApps {

  @Autowired
  private StreamBuilder streamBuilder;

  @Autowired
  private StreamApplication httpSource;

  @Autowired
  private StreamApplication logSink;

  public void deploySimpleStream() {
    Stream simpleStream = streamBuilder.name("simpleStream")
                            .source(httpSource)
                            .sink(logSink)
                            .create()
                            .deploy();
  }
}

这种风格允许您跨多个流共享 StreamApplications

使用 DeploymentPropertiesBuilder

无论您选择哪种风格,deploy(Map<String, String> deploymentProperties) 方法都允许您自定义流的部署方式。我们通过使用构建器风格简化了创建具有属性的映射,并为某些属性创建了静态方法,因此您无需记住这些属性的名称。您可以重写前面的 createDeploymentProperties 示例,如下所示

private Map<String, String> createDeploymentProperties() {
	return new DeploymentPropertiesBuilder()
		.count("log", 2)
		.memory("log", 512)
		.put("app.splitter.producer.partitionKeyExpression", "payload")
		.build();
}

此实用程序类旨在帮助创建 Map,并添加了一些方法来帮助定义预定义的属性。

Skipper 部署属性

除了 Spring Cloud Data Flow 之外,您还需要传递某些 Skipper 特定的部署属性,例如,选择目标平台。SkipperDeploymentPropertiesBuilder 提供了 DeploymentPropertiesBuilder 中的所有属性,并添加了 Skipper 所需的属性。以下示例创建了一个 SkipperDeploymentPropertiesBuilder

private Map<String, String> createDeploymentProperties() {
	return new SkipperDeploymentPropertiesBuilder()
		.count("log", 2)
		.memory("log", 512)
		.put("app.splitter.producer.partitionKeyExpression", "payload")
		.platformName("pcf")
		.build();
}