创建和部署 Python 应用
本案例展示了如何将 Python 脚本部署为 Data Flow 应用。与其他应用类型(source
、processor
或 sink
)不同,Data Flow 在部署 app
应用类型时不会设置连接生产者和消费者的部署属性。开发者有责任在部署时“连接”多个应用,以便它们使用部署属性进行通信。
本案例创建了一个数据处理管道,该管道将 input
流的时间戳分派到下游通道 even
或 odd
。从技术上讲,本案例实现了 动态路由器 集成模式。该管道从 timeDest
输入通道接收 timestamps
消息。根据时间戳值,它将消息路由到专用下游通道 evenDest
或 oddDest
之一。
下图显示了数据处理管道架构
作为时间戳源,该应用使用预构建的 时间源 应用,但将其注册为 Data Flow App
类型。它不断地向名为 timeDest
的下游 Kafka 主题发送时间戳。
由 Python 脚本实现并打包为 Docker 镜像的 Router
应用从 timeDest
Kafka 主题消费传入的时间戳,并根据时间戳值将消息向下游路由到 evenDest
Kafka 主题或 oddDest
Kafka 主题。
Even Logger
和 Odd Logger
组件是预构建的 Log Sink 应用程序,但注册为 Data Flow App
类型。记录器使用 evenDest
或 oddDest
主题并在控制台上打印传入消息。
Apache Kafka 用作消息中间件。
开发
您可以在示例 GitHub 存储库 中找到源代码,并将其下载为压缩存档:polyglot-python-app.zip。
pythonrouterapp.py 实现了时间戳路由器应用程序逻辑
from kafka import KafkaConsumer, KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError
from util.actuator import Actuator
from util.arguments import get_kafka_brokers, get_env_info, get_channel_topic
class Router:
def __init__(self, info, kafka_brokers, input_topic, even_topic, odd_topic):
self.kafka_brokers = kafka_brokers
self.input_topic = input_topic
self.even_topic = even_topic
self.odd_topic = odd_topic
# Serve the liveliness and readiness probes via http server in a separate thread.
Actuator.start(port=8080, info=info)
# Ensure the output topics exist.
self.__create_topics_if_missing([self.input_topic, self.even_topic, self.odd_topic])
self.consumer = KafkaConsumer(self.input_topic, bootstrap_servers=self.kafka_brokers)
self.producer = KafkaProducer(bootstrap_servers=self.kafka_brokers)
def __create_topics_if_missing(self, topic_names):
admin_client = KafkaAdminClient(bootstrap_servers=self.kafka_brokers, client_id='test')
for topic in topic_names:
try:
new_topic = NewTopic(name=topic, num_partitions=1, replication_factor=1)
admin_client.create_topics(new_topics=[new_topic], validate_only=False)
except TopicAlreadyExistsError:
print ('Topic: {} already exists!')
def process_timestamps(self):
while True:
for message in self.consumer:
if message.value is not None:
if self.is_even_timestamp(message.value):
self.producer.send(self.even_topic, b'Even timestamp: ' + message.value)
else:
self.producer.send(self.odd_topic, b'Odd timestamp:' + message.value)
@staticmethod
def is_even_timestamp(value):
return int(value[-1:]) % 2 == 0
Router(
get_env_info(),
get_kafka_brokers(),
get_channel_topic('input'),
get_channel_topic('even'),
get_channel_topic('odd')
).process_timestamps()
如果在 Python 脚本中使用 print
命令,则必须使用 sys.stdout.flush()
刷新输出缓冲区,以防止其被填满并导致 Kafka 的消费者-生产者流程中断。
- 使用
kafka-python
库来消费和生成 Kafka 消息。process_timestamps
方法不断从输入通道消费时间戳,并将偶数或奇数值路由到输出通道。 Actuator
类位于 actuator.py 实用程序中,用于公开有关正在运行的应用程序的操作信息,例如运行状况、活动状态、信息等。它在单独的线程中运行嵌入式 HTTP 服务器,并公开/actuator/health
和/actuator/info
入口点来处理 Kubernetes 活动状态和就绪状态探测请求。- arguments.py 实用程序有助于从命令行参数和环境变量中检索所需的输入参数。该实用程序假定默认(即 exec)入口点样式。请注意,Data Flow 将 Kafka 代理连接属性作为环境变量传递。
为了使 python_router_app.py
充当 Data Flow app
,需要将其捆绑在 Docker 映像中并上传到 DockerHub
。以下 Dockerfile 显示了如何将 Python 脚本捆绑到 Docker 映像中
FROM python:3.7.3-slim
RUN pip install kafka-python
RUN pip install flask
ADD /util/* /util/
ADD python_router_app.py /
ENTRYPOINT ["python","/python_router_app.py"]
CMD []
Dockerfile 安装了所需的依赖项,添加了 Python 脚本(ADD python_router_app.py
)和实用程序(在 util
文件夹下),并设置了命令入口。
构建
现在我们构建 Docker 镜像并将其推送到 DockerHub 注册表。
检出示例项目并导航到 polyglot-python-app
文件夹
git clone https://github.com/spring-cloud/spring-cloud-dataflow-samples
cd ./spring-cloud-dataflow-samples/dataflow-website/recipes/polyglot/polyglot-python-app/
在 polyglot-python-app
中,构建 polyglot-python-app Docker 镜像并将其推送到 DockerHub
docker build -t springcloud/polyglot-python-app:0.2 .
docker push springcloud/polyglot-python-app:0.2
将 springcloud
替换为您的 Docker Hub 前缀。
在 Docker Hub 中发布后,即可在数据流中注册并部署该镜像。
部署
按照安装说明在 Kubernetes 上设置数据流。
从 minikube 中检索数据流 URL(minikube service --url scdf-server
)并配置您的数据流 shell
dataflow config server --uri http://192.168.99.100:30868
导入 SCDF time
和 log
应用启动器,并将 polyglot-python-app
注册为类型为 app
的 python-router
app register --name time --type app --uri docker:springcloudstream/time-source-kafka:3.2.1 --metadata-uri maven://org.springframework.cloud.stream.app:time-source-kafka:jar:metadata:3.2.1
app register --name log --type app --uri docker:springcloudstream/log-sink-kafka:2.1.1.RELEASE --metadata-uri maven://org.springframework.cloud.stream.app:log-sink-kafka:jar:metadata:2.1.1.RELEASE
app register --type app --name python-router --uri docker://springcloud/polyglot-python-app:0.2
docker://springcloud/polyglot-python-app:0.2
从DockerHub 仓库解析。
创建时间戳路由流管道
stream create --name timeStampStream --definition "time || python-router || evenLogger: log || oddLogger: log"
前面显示的流定义利用了 DSL 中的标签功能。
因此,将创建以下流管道
time
、log
和 python-router
应用注册为应用类型的应用程序,因此可以有多个输入和输出绑定(即通道)。数据流不对从一个应用程序到另一个应用程序的数据流做任何假设。在部署时,“连接”多个应用程序以使它们能够通信是开发人员的责任。
牢记这一点,我们使用polyglot-python-app-deployment.properties文件中的部署属性来部署时间戳流管道
stream deploy --name timeStampStream --propertiesFile <polyglot-python-app folder>/polyglot-python-app-deployment.properties
部署属性定义了用于连接时间、python-router 和记录器应用程序的 Kafka 主题
app.time.spring.cloud.stream.bindings.output.destination=timeDest
app.python-router.spring.cloud.stream.bindings.input.destination=timeDest
app.python-router.spring.cloud.stream.bindings.even.destination=evenDest
app.python-router.spring.cloud.stream.bindings.odd.destination=oddDest
app.evenLogger.spring.cloud.stream.bindings.input.destination=evenDest
app.oddLogger.spring.cloud.stream.bindings.input.destination=oddDest
app.python-router.xxx 前缀是数据流约定,用于将前缀后指定的属性映射到 timeStampStream 流中的 python-router 应用。
时间戳通道绑定到 timeDest
Kafka 主题。路由器的偶数输出通道绑定到 evenDest
主题,奇数通道绑定到 oddDest
主题。部署后,数据流如下所示
-
使用
kubectl get all
命令列出已部署的 k8s 容器的状态。使用kubectl logs -f xxx
观察偶数和奇数管道输出。例如,
kubectl logs -f po/timestampstream-evenlogger-xxx
应该输出2019-05-17 17:56:36.241 INFO 1 --- log-sink : Even timestamp:05/17/19 17:56:36 2019-05-17 17:56:38.301 INFO 1 --- log-sink : Even timestamp:05/17/19 17:56:38 2019-05-17 17:56:40.402 INFO 1 --- log-sink : Even timestamp:05/17/19 17:56:40 ...
kubectl logs -f po/timestampstream-oddlogger-xxx
应该输出2019-05-17 17:56:37.447 INFO 1 --- log-sink : Odd timestamp:05/17/19 17:56:37 2019-05-17 17:56:39.358 INFO 1 --- log-sink : Odd timestamp:05/17/19 17:56:39 2019-05-17 17:56:41.452 INFO 1 --- log-sink : Odd timestamp:05/17/19 17:56:41 ...