Python 流处理器
本节中的示例代码展示了如何在 Data Flow 流中将 Python 脚本作为处理器运行。
在本指南中,我们将 Python 脚本打包为 Docker 镜像并将其部署到 Kubernetes。我们使用 Apache Kafka 作为消息中间件。我们将 Docker 镜像在 Data Flow 中注册为类型为 Processor
的应用程序。
本指南演示了一个文本处理流数据管道。它通过 HTTP 接收文本消息,将文本处理委托给注册为数据流处理器的 Python 脚本,并将结果打印到日志中。如果 reversestring
属性设置为 true
,则 Python 脚本会反转输入文本。否则,结果消息将保持不变。
下图显示了文本反转处理管道
开发
您可以在示例 GitHub 存储库 中找到源代码,并从 polyglot-python-processor.zip 下载为压缩存档。
处理器使用 kafka-python 库创建消费者和生产者连接。
主执行循环位于 python_processor.py 中。对于在入站 Kafka 主题上收到的每条消息,脚本要么将输出按原样发送到 Kafka 主题,要么如果 --reversestring=true
作为流定义的一部分传递给处理器,则反转字符串,然后将其发送到输出。以下清单显示了 python_processor.py
#!/usr/bin/env python
import os
import sys
from kafka import KafkaConsumer, KafkaProducer
from util.http_status_server import HttpHealthServer
from util.task_args import get_kafka_binder_brokers, get_input_channel, get_output_channel, get_reverse_string
consumer = KafkaConsumer(get_input_channel(), bootstrap_servers=[get_kafka_binder_brokers()])
producer = KafkaProducer(bootstrap_servers=[get_kafka_binder_brokers()])
HttpHealthServer.run_thread()
while True:
for message in consumer:
output_message = message.value
reverse_string = get_reverse_string()
if reverse_string is not None and reverse_string.lower() == "true":
output_message = "".join(reversed(message.value))
producer.send(get_output_channel(), output_message)
帮助器方法定义在一个名为 task_args.py
的实用程序文件中。它们有助于提取常见的环境和命令行值。
HTTPServer
实现作为线程运行,该线程使用始终返回 HTTP 200 的默认实现响应 Spring Boot 路径运行状况检查端点(/actuator/health
和 /actuator/info
)。Dockerfile
创建镜像。
为了使 python_processor.py
充当数据流 处理器
,需要将其捆绑在 Docker 镜像中并上传到 DockerHub
。以下 Dockerfile 显示了如何将 Python 脚本捆绑到 Docker 镜像中
FROM springcloud/openjdk:latest
RUN apt-get update && apt-get install --no-install-recommends -y \
python-pip \
&& rm -rf /var/lib/apt/lists/*
RUN pip install kafka-python
COPY python_processor.py /processor/
COPY util/*.py /processor/util/
ENTRYPOINT ["python", "/processor/python_processor.py", "$@", "--"]
Dockerfile 安装所需的依赖项,添加 python_processor.py
脚本和实用程序(在 util
文件夹下),并设置命令入口点。
构建
我们现在可以构建 Docker 镜像并将其推送到 DockerHub 注册表。 为此,请执行以下操作
-
检出示例项目并导航到
polyglot-python-processor
文件夹git clone https://github.com/spring-cloud/spring-cloud-dataflow-samples cd ./spring-cloud-dataflow-samples/dataflow-website/recipes/polyglot/polyglot-python-processor/
-
在
polyglot-python-processor/
中,构建polyglot-python-processor
Docker 镜像并将其推送到 DockerHubdocker build -t springcloud/polyglot-python-processor:0.1 . docker push springcloud/polyglot-python-processor:0.1
将
springcloud
替换为您的 Docker Hub 前缀。
在 Docker Hub 中发布后,您可以在数据流中注册镜像并进行部署。
部署
要部署处理器,请执行以下操作
- 按照安装说明在 Kubernetes 上设置数据流。
-
通过运行以下命令从 Minikube 检索数据流 URL
minikube service --url scdf-server
-
通过运行以下命令配置您的数据流 shell
dataflow config server --uri <Your Data Flow URL>
-
导入 SCDF 应用启动器
app import --uri https://dataflow.springframework.org.cn/kafka-docker-latest
-
将
polyglot-python-processor
注册为类型为processor
的python-processor
。app register --type processor --name python-processor --uri docker://springcloud/polyglot-python-processor:0.1
docker://springcloud/polyglot-python-processor:0.1
从DockerHub 存储库解析。 -
通过运行以下命令创建数据流
text-reversal
流stream create --name text-reversal --definition "http --server.port=32123 | python-processor --reversestring=true | log"
http
源侦听端口32123
上的传入 HTTP 消息,并将它们转发到python-processor
。 处理器配置为反转输入消息(如果reversestring=true
)并将它们向下游发送到log
接收器。 -
通过使用
kubernetes.createNodePort
属性将 HTTP 端口公开给本地主机来部署流,方法是运行以下命令stream deploy text-reversal --properties "deployer.http.kubernetes.createNodePort=32123"
-
从 minikube 检索
http-source
URL 以发布测试数据,方法是运行以下命令minikube service --url text-reversal-http-v1 http://192.168.99.104:32123
-
通过运行以下命令针对
http-source
应用程序发布示例消息http post --target http://192.168.99.104:32123 --data "hello world"
如果发布成功,您应该会看到如下确认消息
> POST (text/plain) http://192.168.99.104:32123 hello world > 202 ACCEPTED
-
通过运行以下命令检查已发布消息的日志
kubectl logs -f <log pod name>
您应该会看到类似于以下内容的输出
INFO 1 --- [container-0-C-1] log-sink : dlrow olleh
您应该会看到以相反顺序发布的消息(在本例中为
dlrow olleh
)。