连接到外部 Kafka 集群
Pivotal Cloud Foundry 在其市场中没有将 Apache Kafka 作为托管服务。但是,开发人员通常会开发和部署与 Cloud Foundry 中的外部 Kafka 集群交互的应用程序。本案例专门介绍了 Spring Cloud Stream、Spring Cloud Data Flow 和 Spring Cloud Data Flow for PCF 磁贴的开发者预期。
我们将回顾所需的 Spring Cloud Stream 属性,以及如何将它们转换为 Cloud Foundry 中以下部署选项的应用程序。
- 应用程序作为独立应用程序实例运行。
- 应用程序作为流数据管道的一部分,通过开源 SCDF 进行部署。
- 应用程序作为流数据管道的一部分,通过 SCDF for PCF 磁贴进行部署。
先决条件
我们首先准备外部 Kafka 集群凭据。
通常,一系列 Kafka 代理统称为 Kafka 集群。每个代理都可以通过其外部 IP 地址或定义良好的 DNS 路由(如果已提供)单独访问。
在本演练中,我们坚持使用更简单的三代理集群设置,其 DNS 地址分别为 foo0.broker.foo
、foo1.broker.foo
和 foo2.broker.foo
。每个代理的默认端口为 9092
。
如果集群是安全的,则根据代理使用的安全选项,当应用程序尝试连接到外部集群时,需要提供不同的属性。同样,为简单起见,我们使用 Kafka 的 JAAS 设置 PlainLoginModule
,用户名为 test
,密码为 bestest
。
用户提供的服务与 Spring Boot 属性的比较
Cloud Foundry 开发人员遇到的下一个问题是,是将 Kafka 连接设置为 Cloud Foundry 自定义用户提供的服务 (CUPS),还是简单地将连接凭据作为 Spring Boot 属性传递。
Cloud Foundry 不支持 Spring Cloud Connector 或 CF-JavaEnv 对 Kafka 的支持,因此,通过将 Kafka CUPS 与应用程序进行服务绑定,您无法自动解析 VCAP_SERVICES
并将连接凭据传递给运行时的应用程序。即使使用 CUPS,您也有责任解析 VCAP_SERVICES
JSON 并将其作为 Boot 属性传递,因此 Kafka 没有自动化。对于好奇的人,您可以在 Spring Cloud Data Flow 的 参考指南 中看到 CUPS 的实际应用。
在本演练中,我们坚持使用 Spring Boot 属性。
独立流式应用程序
应用程序的典型 Cloud Foundry 部署包含一个 manifest.yml
文件。我们使用 source-sample
源应用程序来突出显示连接到外部 Kafka 集群所需的配置
---
applications:
- name: source-sample
host: source-sample
memory: 1G
disk_quota: 1G
instances: 1
path: source-sample.jar
env:
... # other application properties
... # other application properties
SPRING_APPLICATION_JSON: |-
{
"spring.cloud.stream.kafka.binder": {
"brokers": "foo0.broker.foo,foo1.broker.foo,foo2.broker.foo",
"jaas.options": {
"username": "test",
"password":"bestest"
},
"jaas.loginModule":"org.apache.kafka.common.security.plain.PlainLoginModule"
},
"spring.cloud.stream.bindings.output.destination":"fooTopic"
}
使用这些设置,当 source-sample
源部署到 Cloud Foundry 时,它应该能够连接到外部集群。
您可以通过访问 source-sample
执行器端点的 /configprops
来验证连接凭据。同样,您也可以在应用程序日志中看到打印的连接凭据。
Kafka 连接凭据通过 Spring Cloud Stream Kafka binder 属性提供,在本例中,所有属性都带有 spring.spring.cloud.stream.kafka.binder.*
前缀。
或者,除了通过 SPRING_APPLICATION_JSON
提供属性之外,这些属性也可以作为普通环境变量提供。
SCDF 中的流数据管道(开源)
在 SCDF 中部署流数据管道至少需要两个应用程序。我们在这里使用开箱即用的 time
应用程序作为源,log
应用程序作为接收器。
全局 Kafka 连接配置
在我们跳转到演示演练之前,我们将回顾如何在 SCDF 中集中配置全局属性。有了这种灵活性,通过 SCDF 部署的每个流应用程序也会自动继承所有全局定义的属性,这对于 Kafka 连接凭据等情况非常方便。以下清单显示了全局属性
---
applications:
- name: scdf-server
host: scdf-server
memory: 2G
disk_quota: 2G
timeout: 180
instances: 1
path: spring-cloud-dataflow-server-2.11.3.jar
env:
SPRING_PROFILES_ACTIVE: cloud
JBP_CONFIG_SPRING_AUTO_RECONFIGURATION: '{enabled: false}'
SPRING_CLOUD_SKIPPER_CLIENT_SERVER_URI: http://your-skipper-server-uri/api
SPRING_APPLICATION_JSON: |-
{
"spring.cloud": {
"dataflow.task.platform.cloudfoundry": {
"accounts": {
"foo": {
"connection": {
"url": <api-url>,
"org": <org>,
"space": <space>,
"domain": <app-domain>,
"username": <email>,
"password": <password>,
"skipSslValidation": true
},
"deployment": {
"services": <comma delimited list of service>"
}
}
}
},
"stream": {
"kafka.binder": {
"brokers": "foo0.broker.foo,foo1.broker.foo,foo2.broker.foo",
"jaas": {
"options": {
"username": "test",
"password":"bestest"
},
"loginModule":"org.apache.kafka.common.security.plain.PlainLoginModule"
}
},
"bindings.output.destination":"fooTopic"
}
}
}
services:
- mysql
使用前面的 manifest.yml
文件,SCDF 现在应该会自动将 Kafka 连接凭据传播到所有流应用程序部署。现在您可以创建流
dataflow:>stream create fooz --definition "time | log"
Created new stream 'fooz'
dataflow:>stream deploy --name fooz
Deployment request has been sent for stream 'fooz'
当 time
和 log
应用程序在 Cloud Foundry 中成功部署并启动后,它们应该会自动连接到配置的外部 Kafka 集群。
您可以通过访问 time
或 log
执行器端点的 /configprops
来验证连接凭据。同样,您也可以在应用程序日志中看到打印的连接凭据。
显式流级别 Kafka 连接配置
或者,如果您只想使用外部 Kafka 连接凭据部署特定流,则可以在使用显式覆盖部署流时这样做
dataflow:>stream create fooz --definition "time | log"
Created new stream 'fooz'
dataflow:>stream deploy --name fooz --properties "app.*.spring.cloud.stream.kafka.binder.brokers=foo0.broker.foo,foo1.broker.foo,foo2.broker.foo,app.*.spring.spring.cloud.stream.kafka.binder.jaas.options.username=test,app.*.spring.spring.cloud.stream.kafka.binder.jaas.options.password=besttest,app.*.spring.spring.cloud.stream.kafka.binder.jaas.loginModule=org.apache.kafka.common.security.plain.PlainLoginModule"
Deployment request has been sent for stream 'fooz'
当 time
和 log
应用程序在 Cloud Foundry 中成功部署并启动后,它们应该会自动连接到外部 Kafka 集群。
您可以通过访问 time
或 log
执行器端点的 /configprops
来验证连接凭据。同样,您也可以在应用程序日志中看到打印的连接凭据。
用于 PCF 磁贴的 SCDF 中的流数据管道
当您从作为 Pivotal Cloud Foundry 中的托管服务运行的 Spring Cloud Data Flow 部署流时,显式流配置部分中讨论的选项应该仍然有效。
或者,您可以在为 PCF Tile 创建 SCDF 的服务实例时,以 CUPS 属性的形式提供 Kafka 连接凭据。
cf create-service p-dataflow standard data-flow -c '{"messaging-data-service": { "user-provided": {"brokers":"foo0.broker.foo,foo1.broker.foo,foo2.broker.foo","username":"test","password":"bestest"}}}'
这样,在部署流时,您可以将 CUPS 属性作为 VCAP_SERVICES
中的值提供。
dataflow:>stream create fooz --definition "time | log"
Created new stream 'fooz'
dataflow:>stream deploy --name fooz --properties "app.*.spring.cloud.stream.kafka.binder.brokers=${vcap.services.messaging-<GENERATED_GUID>.credentials.brokers},app.*.spring.spring.cloud.stream.kafka.binder.jaas.options.username=${vcap.services.messaging-<GENERATED_GUID>.credentials.username},app.*.spring.spring.cloud.stream.kafka.binder.jaas.options.password=${vcap.services.messaging-<GENERATED_GUID>.credentials.password},app.*.spring.spring.cloud.stream.kafka.binder.jaas.loginModule=org.apache.kafka.common.security.plain.PlainLoginModule"
Deployment request has been sent for stream 'fooz'
将 <GENERATED_GUID>
替换为生成的 messaging 服务实例名称的 GUID,您可以在 cf services
命令中找到该名称(例如:messaging-b3e76c87-c5ae-47e4-a83c-5fabf2fc4f11
)。
作为另一种选择,您还可以通过 SCDF 服务应用程序实例提供全局配置属性。SCDF 服务实例准备就绪后,您可以执行以下操作。
- 在应用程序管理器上的 SCDF 服务器应用程序实例 UI 上,转至“设置”。
- 找到“用户提供的环境变量”,然后找到 SPRINGCLOUDDATAFLOWTILECONFIGURATION。
- 提供以下内容作为值:{"spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.brokers": <foo0.broker.foo>, "spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.jaas.loginModule": "org.apache.kafka.common.security.plain.PlainLoginModule", "spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.jaas.options.username": "test", "spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.jaas.options.password": "password", "spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.configuration.security.protocol": "SASL_PLAINTEXT", "spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.configuration.sasl.mechanism": "PLAIN" }
- 应用这些更改后,单击“更新”按钮,并确保在 CF 上重新启动数据流服务器应用程序。
以上配置(步骤 #3 中)中的值仅供说明之用。请相应地更新它们。这些配置适用于使用 SASL PLAINTEXT 安全性保护的 Kafka 集群。