Python 流处理器

本节中的示例代码展示了如何在 Data Flow 流中将 Python 脚本作为处理器运行。

在本指南中,我们将 Python 脚本打包为 Docker 镜像并将其部署到 Kubernetes。我们使用 Apache Kafka 作为消息中间件。我们将 Docker 镜像在 Data Flow 中注册为类型为 Processor 的应用程序。

本指南演示了一个文本处理流数据管道。它通过 HTTP 接收文本消息,将文本处理委托给注册为数据流处理器的 Python 脚本,并将结果打印到日志中。如果 reversestring 属性设置为 true,则 Python 脚本会反转输入文本。否则,结果消息将保持不变。

下图显示了文本反转处理管道

SCDF Python Tasks

开发

您可以在示例 GitHub 存储库 中找到源代码,并从 polyglot-python-processor.zip 下载为压缩存档。

处理器使用 kafka-python 库创建消费者和生产者连接。

主执行循环位于 python_processor.py 中。对于在入站 Kafka 主题上收到的每条消息,脚本要么将输出按原样发送到 Kafka 主题,要么如果 --reversestring=true 作为流定义的一部分传递给处理器,则反转字符串,然后将其发送到输出。以下清单显示了 python_processor.py

#!/usr/bin/env python

import os
import sys

from kafka import KafkaConsumer, KafkaProducer
from util.http_status_server import HttpHealthServer
from util.task_args import get_kafka_binder_brokers, get_input_channel, get_output_channel, get_reverse_string

consumer = KafkaConsumer(get_input_channel(), bootstrap_servers=[get_kafka_binder_brokers()])
producer = KafkaProducer(bootstrap_servers=[get_kafka_binder_brokers()])

HttpHealthServer.run_thread()

while True:
    for message in consumer:
        output_message = message.value
        reverse_string = get_reverse_string()

        if reverse_string is not None and reverse_string.lower() == "true":
            output_message = "".join(reversed(message.value))

        producer.send(get_output_channel(), output_message)

帮助器方法定义在一个名为 task_args.py 的实用程序文件中。它们有助于提取常见的环境和命令行值。

HTTPServer 实现作为线程运行,该线程使用始终返回 HTTP 200 的默认实现响应 Spring Boot 路径运行状况检查端点(/actuator/health/actuator/info)。Dockerfile 创建镜像。

为了使 python_processor.py 充当数据流 处理器,需要将其捆绑在 Docker 镜像中并上传到 DockerHub。以下 Dockerfile 显示了如何将 Python 脚本捆绑到 Docker 镜像中

FROM springcloud/openjdk:latest

RUN apt-get update && apt-get install --no-install-recommends -y \
    python-pip \
 && rm -rf /var/lib/apt/lists/*

RUN pip install kafka-python

COPY python_processor.py /processor/
COPY util/*.py /processor/util/

ENTRYPOINT ["python", "/processor/python_processor.py", "$@", "--"]

Dockerfile 安装所需的依赖项,添加 python_processor.py 脚本和实用程序(在 util 文件夹下),并设置命令入口点。

构建

我们现在可以构建 Docker 镜像并将其推送到 DockerHub 注册表。 为此,请执行以下操作

  1. 检出示例项目并导航到 polyglot-python-processor 文件夹

    git clone https://github.com/spring-cloud/spring-cloud-dataflow-samples
    cd ./spring-cloud-dataflow-samples/dataflow-website/recipes/polyglot/polyglot-python-processor/
  2. polyglot-python-processor/ 中,构建 polyglot-python-processor Docker 镜像并将其推送到 DockerHub

    docker build -t springcloud/polyglot-python-processor:0.1 .
    docker push springcloud/polyglot-python-processor:0.1

    springcloud 替换为您的 Docker Hub 前缀。

在 Docker Hub 中发布后,您可以在数据流中注册镜像并进行部署。

部署

要部署处理器,请执行以下操作

  1. 按照安装说明在 Kubernetes 上设置数据流。
  2. 通过运行以下命令从 Minikube 检索数据流 URL

    minikube service --url scdf-server
  3. 通过运行以下命令配置您的数据流 shell

    dataflow config server --uri <Your Data Flow URL>
  4. 导入 SCDF 应用启动器

    app import --uri https://dataflow.springframework.org.cn/kafka-docker-latest
  5. polyglot-python-processor 注册为类型为 processorpython-processor

    app register --type processor --name python-processor --uri docker://springcloud/polyglot-python-processor:0.1

    docker://springcloud/polyglot-python-processor:0.1DockerHub 存储库解析。

  6. 通过运行以下命令创建数据流 text-reversal

    stream create --name text-reversal --definition "http --server.port=32123 | python-processor --reversestring=true  | log"

    http 源侦听端口 32123 上的传入 HTTP 消息,并将它们转发到 python-processor。 处理器配置为反转输入消息(如果 reversestring=true)并将它们向下游发送到 log 接收器。

  7. 通过使用 kubernetes.createNodePort 属性将 HTTP 端口公开给本地主机来部署流,方法是运行以下命令

    stream deploy text-reversal --properties "deployer.http.kubernetes.createNodePort=32123"
  8. 从 minikube 检索 http-source URL 以发布测试数据,方法是运行以下命令

    minikube service --url text-reversal-http-v1
    http://192.168.99.104:32123
  9. 通过运行以下命令针对 http-source 应用程序发布示例消息

    http post --target http://192.168.99.104:32123 --data "hello world"

    如果发布成功,您应该会看到如下确认消息

    > POST (text/plain) http://192.168.99.104:32123 hello world
    > 202 ACCEPTED
  10. 通过运行以下命令检查已发布消息的日志

    kubectl logs -f <log pod name>

    您应该会看到类似于以下内容的输出

    INFO 1 --- [container-0-C-1] log-sink                                 : dlrow olleh

    您应该会看到以相反顺序发布的消息(在本例中为 dlrow olleh)。