连接到外部 Kafka 集群

Pivotal Cloud Foundry 在其市场中没有将 Apache Kafka 作为托管服务。但是,开发人员通常会开发和部署与 Cloud Foundry 中的外部 Kafka 集群交互的应用程序。本案例专门介绍了 Spring Cloud Stream、Spring Cloud Data Flow 和 Spring Cloud Data Flow for PCF 磁贴的开发者预期。

我们将回顾所需的 Spring Cloud Stream 属性,以及如何将它们转换为 Cloud Foundry 中以下部署选项的应用程序。

  • 应用程序作为独立应用程序实例运行。
  • 应用程序作为流数据管道的一部分,通过开源 SCDF 进行部署。
  • 应用程序作为流数据管道的一部分,通过 SCDF for PCF 磁贴进行部署。

先决条件

我们首先准备外部 Kafka 集群凭据。

通常,一系列 Kafka 代理统称为 Kafka 集群。每个代理都可以通过其外部 IP 地址或定义良好的 DNS 路由(如果已提供)单独访问。

在本演练中,我们坚持使用更简单的三代理集群设置,其 DNS 地址分别为 foo0.broker.foofoo1.broker.foofoo2.broker.foo。每个代理的默认端口为 9092

如果集群是安全的,则根据代理使用的安全选项,当应用程序尝试连接到外部集群时,需要提供不同的属性。同样,为简单起见,我们使用 Kafka 的 JAAS 设置 PlainLoginModule,用户名为 test,密码为 bestest

用户提供的服务与 Spring Boot 属性的比较

Cloud Foundry 开发人员遇到的下一个问题是,是将 Kafka 连接设置为 Cloud Foundry 自定义用户提供的服务 (CUPS),还是简单地将连接凭据作为 Spring Boot 属性传递。

Cloud Foundry 不支持 Spring Cloud ConnectorCF-JavaEnv 对 Kafka 的支持,因此,通过将 Kafka CUPS 与应用程序进行服务绑定,您无法自动解析 VCAP_SERVICES 并将连接凭据传递给运行时的应用程序。即使使用 CUPS,您也有责任解析 VCAP_SERVICES JSON 并将其作为 Boot 属性传递,因此 Kafka 没有自动化。对于好奇的人,您可以在 Spring Cloud Data Flow 的 参考指南 中看到 CUPS 的实际应用。

在本演练中,我们坚持使用 Spring Boot 属性。

独立流式应用程序

应用程序的典型 Cloud Foundry 部署包含一个 manifest.yml 文件。我们使用 source-sample 源应用程序来突出显示连接到外部 Kafka 集群所需的配置

---
applications:
- name: source-sample
  host: source-sample
  memory: 1G
  disk_quota: 1G
  instances: 1
  path: source-sample.jar
env:
    ... # other application properties
    ... # other application properties
    SPRING_APPLICATION_JSON: |-
        {
            "spring.cloud.stream.kafka.binder": {
                "brokers": "foo0.broker.foo,foo1.broker.foo,foo2.broker.foo",
                "jaas.options": {
                    "username": "test",
                    "password":"bestest"
                },
                "jaas.loginModule":"org.apache.kafka.common.security.plain.PlainLoginModule"
            },
            "spring.cloud.stream.bindings.output.destination":"fooTopic"
        }

使用这些设置,当 source-sample 源部署到 Cloud Foundry 时,它应该能够连接到外部集群。

您可以通过访问 source-sample 执行器端点的 /configprops 来验证连接凭据。同样,您也可以在应用程序日志中看到打印的连接凭据。

Kafka 连接凭据通过 Spring Cloud Stream Kafka binder 属性提供,在本例中,所有属性都带有 spring.spring.cloud.stream.kafka.binder.* 前缀。

或者,除了通过 SPRING_APPLICATION_JSON 提供属性之外,这些属性也可以作为普通环境变量提供。

SCDF 中的流数据管道(开源)

在 SCDF 中部署流数据管道至少需要两个应用程序。我们在这里使用开箱即用的 time 应用程序作为源,log 应用程序作为接收器。

全局 Kafka 连接配置

在我们跳转到演示演练之前,我们将回顾如何在 SCDF 中集中配置全局属性。有了这种灵活性,通过 SCDF 部署的每个流应用程序也会自动继承所有全局定义的属性,这对于 Kafka 连接凭据等情况非常方便。以下清单显示了全局属性

---
applications:
  - name: scdf-server
    host: scdf-server
    memory: 2G
    disk_quota: 2G
    timeout: 180
    instances: 1
    path: spring-cloud-dataflow-server-2.11.3.jar
env:
  SPRING_PROFILES_ACTIVE: cloud
  JBP_CONFIG_SPRING_AUTO_RECONFIGURATION: '{enabled: false}'
  SPRING_CLOUD_SKIPPER_CLIENT_SERVER_URI: http://your-skipper-server-uri/api
  SPRING_APPLICATION_JSON: |-
    {
        "spring.cloud": {
            "dataflow.task.platform.cloudfoundry": {
                "accounts": {
                    "foo": {
                        "connection": {
                            "url": <api-url>,
                            "org": <org>,
                            "space": <space>,
                            "domain": <app-domain>,
                            "username": <email>,
                            "password": <password>,
                            "skipSslValidation": true
                        },
                        "deployment": {
                            "services": <comma delimited list of service>"
                        }
                    }
                }
            },
            "stream": {
                "kafka.binder": {
                    "brokers": "foo0.broker.foo,foo1.broker.foo,foo2.broker.foo",
                    "jaas": {
                        "options": {
                            "username": "test",
                            "password":"bestest"
                        },
                        "loginModule":"org.apache.kafka.common.security.plain.PlainLoginModule"
                    }
                },
                "bindings.output.destination":"fooTopic"
            }
        }
    }
services:
  - mysql

使用前面的 manifest.yml 文件,SCDF 现在应该会自动将 Kafka 连接凭据传播到所有流应用程序部署。现在您可以创建流

dataflow:>stream create fooz --definition "time | log"
Created new stream 'fooz'

dataflow:>stream deploy --name fooz
Deployment request has been sent for stream 'fooz'

timelog 应用程序在 Cloud Foundry 中成功部署并启动后,它们应该会自动连接到配置的外部 Kafka 集群。

您可以通过访问 timelog 执行器端点的 /configprops 来验证连接凭据。同样,您也可以在应用程序日志中看到打印的连接凭据。

显式流级别 Kafka 连接配置

或者,如果您只想使用外部 Kafka 连接凭据部署特定流,则可以在使用显式覆盖部署流时这样做

dataflow:>stream create fooz --definition "time | log"
Created new stream 'fooz'

dataflow:>stream deploy --name fooz --properties "app.*.spring.cloud.stream.kafka.binder.brokers=foo0.broker.foo,foo1.broker.foo,foo2.broker.foo,app.*.spring.spring.cloud.stream.kafka.binder.jaas.options.username=test,app.*.spring.spring.cloud.stream.kafka.binder.jaas.options.password=besttest,app.*.spring.spring.cloud.stream.kafka.binder.jaas.loginModule=org.apache.kafka.common.security.plain.PlainLoginModule"
Deployment request has been sent for stream 'fooz'

timelog 应用程序在 Cloud Foundry 中成功部署并启动后,它们应该会自动连接到外部 Kafka 集群。

您可以通过访问 timelog 执行器端点的 /configprops 来验证连接凭据。同样,您也可以在应用程序日志中看到打印的连接凭据。

用于 PCF 磁贴的 SCDF 中的流数据管道

当您从作为 Pivotal Cloud Foundry 中的托管服务运行的 Spring Cloud Data Flow 部署流时,显式流配置部分中讨论的选项应该仍然有效。

或者,您可以在为 PCF Tile 创建 SCDF 的服务实例时,以 CUPS 属性的形式提供 Kafka 连接凭据。

cf create-service p-dataflow standard data-flow -c '{"messaging-data-service": { "user-provided": {"brokers":"foo0.broker.foo,foo1.broker.foo,foo2.broker.foo","username":"test","password":"bestest"}}}'

这样,在部署流时,您可以将 CUPS 属性作为 VCAP_SERVICES 中的值提供。

dataflow:>stream create fooz --definition "time | log"
Created new stream 'fooz'

dataflow:>stream deploy --name fooz --properties "app.*.spring.cloud.stream.kafka.binder.brokers=${vcap.services.messaging-<GENERATED_GUID>.credentials.brokers},app.*.spring.spring.cloud.stream.kafka.binder.jaas.options.username=${vcap.services.messaging-<GENERATED_GUID>.credentials.username},app.*.spring.spring.cloud.stream.kafka.binder.jaas.options.password=${vcap.services.messaging-<GENERATED_GUID>.credentials.password},app.*.spring.spring.cloud.stream.kafka.binder.jaas.loginModule=org.apache.kafka.common.security.plain.PlainLoginModule"
Deployment request has been sent for stream 'fooz'

<GENERATED_GUID> 替换为生成的 messaging 服务实例名称的 GUID,您可以在 cf services 命令中找到该名称(例如:messaging-b3e76c87-c5ae-47e4-a83c-5fabf2fc4f11)。

作为另一种选择,您还可以通过 SCDF 服务应用程序实例提供全局配置属性。SCDF 服务实例准备就绪后,您可以执行以下操作。

  1. 在应用程序管理器上的 SCDF 服务器应用程序实例 UI 上,转至“设置”。
  2. 找到“用户提供的环境变量”,然后找到 SPRINGCLOUDDATAFLOWTILECONFIGURATION。
  3. 提供以下内容作为值:{"spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.brokers": <foo0.broker.foo>, "spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.jaas.loginModule": "org.apache.kafka.common.security.plain.PlainLoginModule", "spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.jaas.options.username": "test", "spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.jaas.options.password": "password", "spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.configuration.security.protocol": "SASL_PLAINTEXT", "spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.configuration.sasl.mechanism": "PLAIN" }
  4. 应用这些更改后,单击“更新”按钮,并确保在 CF 上重新启动数据流服务器应用程序。

以上配置(步骤 #3 中)中的值仅供说明之用。请相应地更新它们。这些配置适用于使用 SASL PLAINTEXT 安全性保护的 Kafka 集群。