创建并部署 Python 任务
本案例展示了如何将自定义 Python 脚本作为 Data Flow 任务 运行,以及如何将其编排为 组合任务。
该方法需要将 Python 脚本打包到 Docker 镜像中,然后可以在 SCDF 的 本地
和 Kubernetes
实现中使用。
下图展示了该解决方案的架构和涉及的各个组件
当数据流作为任务启动 Python 脚本时,脚本将运行并以成功或失败状态完成。由于这不是标准的 Spring Cloud Task 应用程序,因此用户有责任管理生命周期并将状态更新到数据流也使用的共享数据库。提供了实用程序来帮助处理启动参数并管理数据流数据库中的任务状态。
您可以在示例 GitHub 存储库 中找到源代码,或将其下载为压缩存档:polyglot-python-task.zip。按照构建说明构建和使用项目。
开发
下一个示例中的 python_task.py 脚本显示了一个示例 Python 脚本,您可以将其注册为 Spring Cloud Task。启动时,Python 脚本会打印一条确认消息。然后它会休眠 60 秒,然后完成。如果存在 --error.message=<Text>
启动参数,则脚本会抛出异常以模拟执行失败。
from util.task_status import TaskStatus
from util.task_args import get_task_id, get_db_url, get_task_name, get_cmd_arg
try:
# Connect to SCDF's database.
status = TaskStatus(get_task_id(), get_db_url())
# Set task's status to RUNNING.
status.running()
# Do something.
print('Start task:{}, id:{}'.format(get_task_name(), get_task_id()))
print('Wait for 60 seconds ...')
sys.stdout.flush()
time.sleep(60)
if get_cmd_arg('error.message') is not None:
raise Exception(get_cmd_arg('error.message'))
# Set task's status to COMPLETED.
status.completed()
except Exception as exp:
# Set task's status to FAILED.
status.failed(1, 'Task failed: {}'.format(exp))
由于 Python 脚本不受 Spring Cloud Task
管理,因此用户有责任使用数据流数据库管理和更新进度。
为了解析输入参数并在数据流中管理其状态,自定义脚本使用以下实用程序
-
task_status.py 帮助访问和更新数据流
TASK_EXECUTION
表,以便反映任务的生命周期事件。TaskStatus
类采用task id
和sqlalchemy url
参数(从命令行参数计算得出),并提供用于将任务状态设置为running
、completed
或failed(with exitCode, errorMessage)
的 API。为了访问数据流数据库,task_status
使用以下启动参数,这些参数由数据流在每次任务启动时自动提供--spring.datasource.username=root --spring.datasource.password=yourpassword --spring.datasource.url=jdbc:mariadb://<mariadb-host>:<port>/mariadb --spring.cloud.task.executionid=26
spring.cloud.task.executionid
属性表示任务 ID,在数据流内部已知并保存在TASK_EXECUTION
表中。 - task_args.py 实用程序帮助提取默认(即 exec)入口点样式的任务参数。该实用程序还为可能使用 SCDF 配置的不同数据库构建 sqlalchemy URL(当前仅测试了 MariaDB)。查看 getdburl() 实现。
为了使 python_task.py
充当数据流任务,需要将其捆绑在 Docker 映像中并上传到 DockerHub
。以下 Dockerfile 说明了如何将 Python 脚本捆绑到 Docker 映像中
FROM python:3.7.3-slim
RUN apt-get update
RUN apt-get install build-essential -y
RUN apt-get install libmariadbclient-dev -y
RUN pip install mariadb
RUN pip install sqlalchemy
ADD python_task.py /
ADD util/* /util/
ENTRYPOINT ["python","/python_task.py"]
CMD []
它安装所需的依赖项并添加任务脚本(例如,ADD python_task.py
)和实用程序(在 util
文件夹下)。
将命令留空 ([]
) 并显式设置入口点。
构建
- 检出 示例项目 并导航到
polyglot-python-task
文件夹
git clone https://github.com/spring-cloud/spring-cloud-dataflow-samples
cd ./spring-cloud-dataflow-samples/dataflow-website/recipes/polyglot/polyglot-python-task/
- 构建 Docker 镜像并将其推送到 DockerHub
docker build -t springcloud/python-task-with-status:0.1 .
docker push springcloud/python-task-with-status:0.1
提示:将 springcloud
替换为您的 Docker Hub 前缀。
- 将 Docker 镜像注册为 Data Flow
task
应用程序
app register --type task --name python-task-with-status --uri docker://springcloud/python-task-with-status:0.1
部署
按照 安装说明 在 Kubernetes 上设置 Data Flow。
创建 Python 脚本并将其作为 Data Flow 任务启动
task create --name python-task --definition "python-task-with-status"
task launch --name python-task
使用 kubectl get all
和 kubectl logs -f po/python-task-XXXXXX
监控任务的输出。使用 Data Flow UI/task 或 shell (task list) 监控 python-task 的状态。
成功启动任务后,您应该会在 Data Flow 任务 UI 中看到以下报告
再次启动 python-task
,并使用 --error.message=MyTestError
启动参数(以模拟错误)
task launch --name python-task --arguments "--error.message=MyTestError"
第二次任务执行(#2)失败,如 Data Flow 任务 UI 所示
与组合任务一起使用
借助提供的任务状态管理,您可以在 组合任务 中使用 Docker 和 Python 任务。
以下示例定义了一个并行任务执行
task create compose2 --definition "<pp1: python-task-with-status || pp2: python-task-with-status>"
task launch --name compose2
前面的示例将并行启动两次 python-task,如下所示
相反,以下组合任务按顺序启动定义的任务
task create sequence1 --definition "t1: timestamp && python-task-with-status && t2: timestamp”
task launch --name sequence1