创建和部署 Python 应用

本案例展示了如何将 Python 脚本部署为 Data Flow 应用。与其他应用类型(sourceprocessorsink)不同,Data Flow 在部署 app 应用类型时不会设置连接生产者和消费者的部署属性。开发者有责任在部署时“连接”多个应用,以便它们使用部署属性进行通信。

本案例创建了一个数据处理管道,该管道将 input 流的时间戳分派到下游通道 evenodd。从技术上讲,本案例实现了 动态路由器 集成模式。该管道从 timeDest 输入通道接收 timestamps 消息。根据时间戳值,它将消息路由到专用下游通道 evenDestoddDest 之一。

下图显示了数据处理管道架构

SCDF Python Tasks

作为时间戳源,该应用使用预构建的 时间源 应用,但将其注册为 Data Flow App 类型。它不断地向名为 timeDest 的下游 Kafka 主题发送时间戳。

由 Python 脚本实现并打包为 Docker 镜像的 Router 应用从 timeDest Kafka 主题消费传入的时间戳,并根据时间戳值将消息向下游路由到 evenDest Kafka 主题或 oddDest Kafka 主题。

Even LoggerOdd Logger 组件是预构建的 Log Sink 应用程序,但注册为 Data Flow App 类型。记录器使用 evenDestoddDest 主题并在控制台上打印传入消息。

Apache Kafka 用作消息中间件。

开发

您可以在示例 GitHub 存储库 中找到源代码,并将其下载为压缩存档:polyglot-python-app.zip

pythonrouterapp.py 实现了时间戳路由器应用程序逻辑

from kafka import KafkaConsumer, KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

from util.actuator import Actuator
from util.arguments import get_kafka_brokers, get_env_info, get_channel_topic

class Router:

    def __init__(self, info, kafka_brokers, input_topic, even_topic, odd_topic):

        self.kafka_brokers = kafka_brokers
        self.input_topic = input_topic
        self.even_topic = even_topic
        self.odd_topic = odd_topic

        # Serve the liveliness and readiness probes via http server in a separate thread.
        Actuator.start(port=8080, info=info)

        # Ensure the output topics exist.
        self.__create_topics_if_missing([self.input_topic, self.even_topic, self.odd_topic])

        self.consumer = KafkaConsumer(self.input_topic, bootstrap_servers=self.kafka_brokers)
        self.producer = KafkaProducer(bootstrap_servers=self.kafka_brokers)

    def __create_topics_if_missing(self, topic_names):
        admin_client = KafkaAdminClient(bootstrap_servers=self.kafka_brokers, client_id='test')
        for topic in topic_names:
            try:
                new_topic = NewTopic(name=topic, num_partitions=1, replication_factor=1)
                admin_client.create_topics(new_topics=[new_topic], validate_only=False)
            except TopicAlreadyExistsError:
                print ('Topic: {} already exists!')

    def process_timestamps(self):
        while True:
            for message in self.consumer:
                if message.value is not None:
                    if self.is_even_timestamp(message.value):
                        self.producer.send(self.even_topic, b'Even timestamp: ' + message.value)
                    else:
                        self.producer.send(self.odd_topic, b'Odd timestamp:' + message.value)

    @staticmethod
    def is_even_timestamp(value):
        return int(value[-1:]) % 2 == 0


Router(
    get_env_info(),
    get_kafka_brokers(),
    get_channel_topic('input'),
    get_channel_topic('even'),
    get_channel_topic('odd')
).process_timestamps()

如果在 Python 脚本中使用 print 命令,则必须使用 sys.stdout.flush() 刷新输出缓冲区,以防止其被填满并导致 Kafka 的消费者-生产者流程中断。

  • 使用 kafka-python 库来消费和生成 Kafka 消息。process_timestamps 方法不断从输入通道消费时间戳,并将偶数或奇数值路由到输出通道。
  • Actuator 类位于 actuator.py 实用程序中,用于公开有关正在运行的应用程序的操作信息,例如运行状况、活动状态、信息等。它在单独的线程中运行嵌入式 HTTP 服务器,并公开 /actuator/health/actuator/info 入口点来处理 Kubernetes 活动状态和就绪状态探测请求。
  • arguments.py 实用程序有助于从命令行参数和环境变量中检索所需的输入参数。该实用程序假定默认(即 exec)入口点样式。请注意,Data Flow 将 Kafka 代理连接属性作为环境变量传递。

为了使 python_router_app.py 充当 Data Flow app,需要将其捆绑在 Docker 映像中并上传到 DockerHub。以下 Dockerfile 显示了如何将 Python 脚本捆绑到 Docker 映像中

FROM python:3.7.3-slim
RUN pip install kafka-python
RUN pip install flask
ADD /util/* /util/
ADD python_router_app.py /
ENTRYPOINT ["python","/python_router_app.py"]
CMD []

Dockerfile 安装了所需的依赖项,添加了 Python 脚本(ADD python_router_app.py)和实用程序(在 util 文件夹下),并设置了命令入口。

构建

现在我们构建 Docker 镜像并将其推送到 DockerHub 注册表。

检出示例项目并导航到 polyglot-python-app 文件夹

git clone https://github.com/spring-cloud/spring-cloud-dataflow-samples
cd ./spring-cloud-dataflow-samples/dataflow-website/recipes/polyglot/polyglot-python-app/

polyglot-python-app 中,构建 polyglot-python-app Docker 镜像并将其推送到 DockerHub

docker build -t springcloud/polyglot-python-app:0.2 .
docker push springcloud/polyglot-python-app:0.2

springcloud 替换为您的 Docker Hub 前缀。

在 Docker Hub 中发布后,即可在数据流中注册并部署该镜像。

部署

按照安装说明在 Kubernetes 上设置数据流。

从 minikube 中检索数据流 URL(minikube service --url scdf-server)并配置您的数据流 shell

dataflow config server --uri http://192.168.99.100:30868

导入 SCDF timelog 应用启动器,并将 polyglot-python-app 注册为类型为 apppython-router

app register --name time --type app --uri docker:springcloudstream/time-source-kafka:3.2.1 --metadata-uri maven://org.springframework.cloud.stream.app:time-source-kafka:jar:metadata:3.2.1

app register --name log --type app --uri docker:springcloudstream/log-sink-kafka:2.1.1.RELEASE --metadata-uri maven://org.springframework.cloud.stream.app:log-sink-kafka:jar:metadata:2.1.1.RELEASE

app register --type app --name python-router --uri docker://springcloud/polyglot-python-app:0.2

docker://springcloud/polyglot-python-app:0.2DockerHub 仓库解析。

创建时间戳路由流管道

stream create --name timeStampStream --definition "time || python-router || evenLogger: log || oddLogger: log"

前面显示的流定义利用了 DSL 中的标签功能

因此,将创建以下流管道

timeStampStream un-deployed

timelogpython-router 应用注册为应用类型的应用程序,因此可以有多个输入和输出绑定(即通道)。数据流不对从一个应用程序到另一个应用程序的数据流做任何假设。在部署时,“连接”多个应用程序以使它们能够通信是开发人员的责任。

牢记这一点,我们使用polyglot-python-app-deployment.properties文件中的部署属性来部署时间戳流管道

stream deploy --name timeStampStream --propertiesFile <polyglot-python-app folder>/polyglot-python-app-deployment.properties

部署属性定义了用于连接时间、python-router 和记录器应用程序的 Kafka 主题

app.time.spring.cloud.stream.bindings.output.destination=timeDest

app.python-router.spring.cloud.stream.bindings.input.destination=timeDest
app.python-router.spring.cloud.stream.bindings.even.destination=evenDest
app.python-router.spring.cloud.stream.bindings.odd.destination=oddDest

app.evenLogger.spring.cloud.stream.bindings.input.destination=evenDest
app.oddLogger.spring.cloud.stream.bindings.input.destination=oddDest

app.python-router.xxx 前缀是数据流约定,用于将前缀后指定的属性映射到 timeStampStream 流中的 python-router 应用。

时间戳通道绑定到 timeDest Kafka 主题。路由器的偶数输出通道绑定到 evenDest 主题,奇数通道绑定到 oddDest 主题。部署后,数据流如下所示

timeStampStream deployed

  • 使用 kubectl get all 命令列出已部署的 k8s 容器的状态。使用 kubectl logs -f xxx 观察偶数和奇数管道输出。

    例如,kubectl logs -f po/timestampstream-evenlogger-xxx 应该输出

    2019-05-17 17:56:36.241  INFO 1 --- log-sink   : Even timestamp:05/17/19 17:56:36
    2019-05-17 17:56:38.301  INFO 1 --- log-sink   : Even timestamp:05/17/19 17:56:38
    2019-05-17 17:56:40.402  INFO 1 --- log-sink   : Even timestamp:05/17/19 17:56:40
    ...

    kubectl logs -f po/timestampstream-oddlogger-xxx 应该输出

    2019-05-17 17:56:37.447  INFO 1 --- log-sink   : Odd timestamp:05/17/19 17:56:37
    2019-05-17 17:56:39.358  INFO 1 --- log-sink   : Odd timestamp:05/17/19 17:56:39
    2019-05-17 17:56:41.452  INFO 1 --- log-sink   : Odd timestamp:05/17/19 17:56:41
    ...