创建并部署 Python 任务

本案例展示了如何将自定义 Python 脚本作为 Data Flow 任务 运行,以及如何将其编排为 组合任务

该方法需要将 Python 脚本打包到 Docker 镜像中,然后可以在 SCDF 的 本地Kubernetes 实现中使用。

下图展示了该解决方案的架构和涉及的各个组件

SCDF Python Tasks

当数据流作为任务启动 Python 脚本时,脚本将运行并以成功或失败状态完成。由于这不是标准的 Spring Cloud Task 应用程序,因此用户有责任管理生命周期并将状态更新到数据流也使用的共享数据库。提供了实用程序来帮助处理启动参数并管理数据流数据库中的任务状态。

您可以在示例 GitHub 存储库 中找到源代码,或将其下载为压缩存档:polyglot-python-task.zip。按照构建说明构建和使用项目。

开发

下一个示例中的 python_task.py 脚本显示了一个示例 Python 脚本,您可以将其注册为 Spring Cloud Task。启动时,Python 脚本会打印一条确认消息。然后它会休眠 60 秒,然后完成。如果存在 --error.message=<Text> 启动参数,则脚本会抛出异常以模拟执行失败。

from util.task_status import TaskStatus
from util.task_args import get_task_id, get_db_url, get_task_name, get_cmd_arg

try:
    # Connect to SCDF's database.
    status = TaskStatus(get_task_id(), get_db_url())

    # Set task's status to RUNNING.
    status.running()

    # Do something.
    print('Start task:{}, id:{}'.format(get_task_name(), get_task_id()))

    print('Wait for 60 seconds ...')
    sys.stdout.flush()
    time.sleep(60)

    if get_cmd_arg('error.message') is not None:
        raise Exception(get_cmd_arg('error.message'))

    # Set task's status to COMPLETED.
    status.completed()

except Exception as exp:
    # Set task's status to FAILED.
    status.failed(1, 'Task failed: {}'.format(exp))

由于 Python 脚本不受 Spring Cloud Task 管理,因此用户有责任使用数据流数据库管理和更新进度。

为了解析输入参数并在数据流中管理其状态,自定义脚本使用以下实用程序

  • task_status.py 帮助访问和更新数据流 TASK_EXECUTION 表,以便反映任务的生命周期事件。TaskStatus 类采用 task idsqlalchemy url 参数(从命令行参数计算得出),并提供用于将任务状态设置为 runningcompletedfailed(with exitCode, errorMessage) 的 API。为了访问数据流数据库,task_status 使用以下启动参数,这些参数由数据流在每次任务启动时自动提供

    --spring.datasource.username=root
    --spring.datasource.password=yourpassword
    --spring.datasource.url=jdbc:mariadb://<mariadb-host>:<port>/mariadb
    --spring.cloud.task.executionid=26

    spring.cloud.task.executionid 属性表示任务 ID,在数据流内部已知并保存在 TASK_EXECUTION 表中。

  • task_args.py 实用程序帮助提取默认(即 exec)入口点样式的任务参数。该实用程序还为可能使用 SCDF 配置的不同数据库构建 sqlalchemy URL(当前仅测试了 MariaDB)。查看 getdburl() 实现。

为了使 python_task.py 充当数据流任务,需要将其捆绑在 Docker 映像中并上传到 DockerHub。以下 Dockerfile 说明了如何将 Python 脚本捆绑到 Docker 映像中

FROM python:3.7.3-slim

RUN apt-get update
RUN apt-get install build-essential -y
RUN apt-get install libmariadbclient-dev -y
RUN pip install mariadb
RUN pip install sqlalchemy

ADD python_task.py /
ADD util/* /util/

ENTRYPOINT ["python","/python_task.py"]
CMD []

它安装所需的依赖项并添加任务脚本(例如,ADD python_task.py)和实用程序(在 util 文件夹下)。

将命令留空 ([]) 并显式设置入口点。

构建

  1. 检出 示例项目 并导航到 polyglot-python-task 文件夹
git clone https://github.com/spring-cloud/spring-cloud-dataflow-samples
cd ./spring-cloud-dataflow-samples/dataflow-website/recipes/polyglot/polyglot-python-task/
  1. 构建 Docker 镜像并将其推送到 DockerHub
docker build -t springcloud/python-task-with-status:0.1 .
docker push springcloud/python-task-with-status:0.1

提示:将 springcloud 替换为您的 Docker Hub 前缀。

  1. 将 Docker 镜像注册为 Data Flow task 应用程序
app register --type task  --name python-task-with-status --uri docker://springcloud/python-task-with-status:0.1

部署

按照 安装说明 在 Kubernetes 上设置 Data Flow。

创建 Python 脚本并将其作为 Data Flow 任务启动

task create --name python-task --definition "python-task-with-status"
task launch --name python-task

使用 kubectl get allkubectl logs -f po/python-task-XXXXXX 监控任务的输出。使用 Data Flow UI/task 或 shell (task list) 监控 python-task 的状态。

成功启动任务后,您应该会在 Data Flow 任务 UI 中看到以下报告

Successful Python Tasks

再次启动 python-task,并使用 --error.message=MyTestError 启动参数(以模拟错误)

task launch --name python-task --arguments "--error.message=MyTestError"

第二次任务执行(#2)失败,如 Data Flow 任务 UI 所示

Python Tasks Failure

与组合任务一起使用

借助提供的任务状态管理,您可以在 组合任务 中使用 Docker 和 Python 任务。

以下示例定义了一个并行任务执行

task create compose2 --definition "<pp1: python-task-with-status || pp2: python-task-with-status>"
task launch --name compose2

前面的示例将并行启动两次 python-task,如下所示

Parallel Composite Polyglot Tasks

相反,以下组合任务按顺序启动定义的任务

task create sequence1 --definition "t1: timestamp && python-task-with-status && t2: timestamp”
task launch --name sequence1

Sequence Composite Polyglot Tasks