SFTP 到 JDBC 文件提取
本案例提供了构建 Data Flow 管道的分步说明,该管道用于从 SFTP 源提取文件并将内容保存到 JDBC 数据存储。该管道旨在在 SFTP 源检测到新文件时启动任务。在这种情况下,任务是一个 Spring Batch 作业,它处理文件,将每行的内容转换为大写,并将其插入到表中。
该 文件提取 批处理作业从 CSV 文本文件中读取,该文件中的行格式为 first_name,last_name
,并使用 JdbcBatchItemWriter
] 将每个条目写入数据库表,该写入器对每行执行 INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)
。
您可以从浏览器或命令行下载包含源代码和示例数据的项目。
wget https://github.com/spring-cloud/spring-cloud-dataflow-samples/blob/main/dataflow-website/recipes/file-ingest/file-to-jdbc/file-to-jdbc.zip?raw=true -O file-to-jdbc.zip
如果您选择不自己构建任务应用程序,可执行 jar 包将发布到Spring Maven 仓库和springcloud/ingest Docker 仓库。
该管道使用以下预打包的 Spring Cloud Stream 应用程序构建:
- sftp-dataflow-source 是一个 SFTP 源,配置为在一个或多个轮询的 SFTP 目录中检测到新文件时发出任务启动请求。
- dataflow-task-launcher-sink 是一个接收器,充当 Data Flow 服务器的 REST 客户端,用于启动 Data Flow 任务。
此管道可在所有受支持的 Data Flow 平台上运行。SFTP 源在发送任务启动请求之前,会将每个文件从 SFTP 服务器下载到本地目录。该请求将 localFilePath
设置为任务的命令行参数。在云平台上运行时,我们需要挂载一个 SFTP 源容器和任务容器都可以访问的共享目录。在本例中,我们设置了一个 NFS 挂载目录。配置 NFS 的环境和容器是特定于平台的,此处针对 Cloud Foundry v2.3+ 和 minikube 进行了描述。
先决条件
本节介绍启动批处理应用程序之前需要执行的设置和配置步骤。
Data Flow 安装
确保您已将 Spring Cloud Data Flow 安装到您选择的平台
**注意**:对于 Kubernetes,示例任务应用程序配置为使用 MariaDB。Data Flow 服务器也必须配置为使用 MariaDB。
使用 Data Flow
本示例假设您知道如何使用 Spring Cloud Data Flow 通过 Spring Cloud Data Flow 仪表板或 Spring Cloud Data Flow shell 注册和部署应用程序。如果您需要有关使用 Data Flow 的更多说明,请参阅使用 Spring Cloud Data Flow 进行流处理和使用 Spring Cloud Data Flow 注册和启动批处理应用程序。
SFTP 服务器
此示例需要访问 SFTP 服务器。对于在 本地
机器和 minikube
上运行,我们使用主机作为 SFTP 服务器。对于 Cloud Foundry
和一般的 Kubernetes
,需要一个外部 SFTP 服务器。在 SFTP 服务器上,创建一个 /remote-files
目录。这是我们放置文件以触发管道的地方。
NFS 配置
在本地运行时不需要 NFS。
Cloud Foundry NFS 配置
此功能由 Pivotal Cloud Foundry 通过 NFS 卷服务 提供
要运行此示例,我们需要
- 一个启用了 NFS 卷服务 的 Cloud Foundry 实例 (v2.3+)
- 一个可从 Cloud Foundry 实例访问的 NFS 服务器
- 一个配置正确的
nfs
服务实例
**注意:** 为简单起见,本示例假设 nfs
服务使用以下通用配置创建,并为所有绑定应用程序使用通用挂载点 (/var/scdf
)。您也可以在将 NFS 服务绑定到应用程序时通过使用 部署属性 来设置这些参数
cf create-service nfs Existing nfs -c '{"share":<nfs-host>/staging","uid":<uid>,"gid":<gid>, "mount":"/var/scdf"}'
Kubernetes NFS 配置
Kubernetes 提供了许多用于配置和共享持久卷的选项。对于此示例,我们使用 minikube
并使用主机作为 NFS 服务器。以下说明适用于 OS/X
,并且对于 Linux 主机也应该类似
确保 minikube 已启动。这些说明中的命令提供了对 minikube VM 的 NFS 访问权限。minikube IP 每次启动时都可能会更改,因此您应该在每次启动后执行这些步骤。
公开一个名为 /staging
的共享目录。
sudo mkdir /staging
sudo chmod 777 /staging
sudo echo "/staging -alldirs -mapall="$(id -u)":"$(id -g)" $(minikube ip)" >> /etc/exports
sudo nfsd restart
验证 NFS 挂载
showmount -e 127.0.0.1
Exports list on 127.0.0.1:
/staging 192.168.99.105
配置持久卷和持久卷申领资源。复制以下内容并将其保存到名为 nfs-config.yml
的文件中
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: nfs-volume
spec:
capacity:
storage: 4Gi
accessModes:
- ReadWriteMany
persistentVolumeReclaimPolicy: Retain
storageClassName: standard
nfs:
# The address 192.168.99.1 is the Minikube gateway to the host for VirtualBox. This way
# not the container IP will be visible by the NFS server on the host machine,
# but the IP address of the `minikube ip` command. You will need to
# grant access to the `minikube ip` IP address.
server: 192.168.99.1
path: '/staging'
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: nfs-volume-claim
namespace: default
spec:
storageClassName: standard
accessModes:
- ReadWriteMany
resources:
requests:
storage: 4Gi
创建资源
kubectl apply -f nfs-config.yml
部署
本节介绍如何部署到以下环境
- 本地
- Cloud Foundry
- Kubernetes
本地
对于本地部署,本示例使用 Kafka 作为消息代理。为远程和本地文件创建目录
mkdir -p /tmp/remote-files /tmp/local-files
注册应用程序
如果您下载并构建了示例项目,则可以使用 file://
URL 注册它。例如 file://<项目路径>/target/ingest-1.0.0-SNAPSHOT.jar
否则,您可以使用已发布的 Maven jar
app register --name fileIngest --type task --uri maven://io.spring.cloud.dataflow.ingest:ingest:1.0.0.BUILD-SNAPSHOT
注册预打包的 sftp
源和 task-launcher
接收器应用程序
app register --name sftp --type source --uri maven://org.springframework.cloud.stream.app:sftp-dataflow-source-kafka:3.2.1
app register --name task-launcher --type sink --uri maven://org.springframework.cloud.stream.app:task-launcher-dataflow-sink-kafka:1.0.1.RELEASE
创建任务
要创建任务,请运行以下命令
task create fileIngestTask --definition fileIngest
创建并部署流
要创建并部署流,请运行以下命令
**注意**:替换 <user>
和 <pass>
。 username
和 password
是本地(或远程)用户的凭据。如果您不使用本地 SFTP 服务器,请通过设置 host
参数(以及可选的 port
参数)来指定主机。如果未定义,则 host
默认为 127.0.0.1
,port
默认为 22
。
stream create --name inboundSftp --definition "sftp --username=<user> --password=<pass> --allow-unknown-keys=true --task.launch.request.taskName=fileIngestTask --remote-dir=/tmp/remote-files/ --local-dir=/tmp/local-files/ | task-launcher" --deploy
dataflow-task-launcher-sink 使用由具有指数退避的动态触发器控制的 PollableMessageSource
。默认情况下,接收器每秒轮询其输入目标一次。如果没有任务启动请求,则轮询周期将继续加倍,最长可达 30 秒。如果存在任务启动请求,则触发器将重置为一秒。您可以通过在流定义中设置 task-launcher
接收器属性 trigger.period
和 trigger.max-period
来继续触发器参数。
验证流部署
我们可以使用 stream list
命令查看要部署的流的状态,如下例所示
dataflow:>stream list
╔═══════════╤════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╤════════════════════════════╗
║Stream Name│ Stream Definition │ Status ║
╠═══════════╪════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╪════════════════════════════╣
║inboundSftp│sftp --password='******' --remote-dir=/tmp/remote-files/ --local-dir=/tmp/local-files/ --task.launch.request.taskName=fileIngestTask│The stream has been ║
║ │--allow-unknown-keys=true --username=<user> | task-launcher │successfully deployed ║
╚═══════════╧════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╧════════════════════════════╝
检查应用程序日志
如果流部署失败,或者您想查看日志以了解任何原因,您可以使用 runtime apps
命令获取为 inboundSftp
流创建的应用程序的日志位置,如下所示
dataflow:>runtime apps
╔═══════════════════════════╤═══════════╤════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║ App Id / Instance Id │Unit Status│ No. of Instances / Attributes ║
╠═══════════════════════════╪═══════════╪════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╣
║inboundSftp.sftp │ deployed │ 1 ║
║ │ │ guid = 23057 ║
║ │ │ pid = 71927 ║
║ │ │ port = 23057 ║
╟───────────────────────────┼───────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║inboundSftp.sftp-0 │ deployed │ stderr = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540821009913/inboundSftp.sftp/stderr_0.log ║
║ │ │ stdout = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540821009913/inboundSftp.sftp/stdout_0.log ║
║ │ │ url = http://192.168.64.1:23057 ║
║ │ │working.dir = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540821009913/inboundSftp.sftp ║
╟───────────────────────────┼───────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║inboundSftp.task-launcher │ deployed │ 1 ║
╟───────────────────────────┼───────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║ │ │ guid = 60081 ║
║ │ │ pid = 71926 ║
║ │ │ port = 60081 ║
║inboundSftp.task-launcher-0│ deployed │ stderr = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540820991695/inboundSftp.task-launcher/stderr_0.log║
║ │ │ stdout = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540820991695/inboundSftp.task-launcher/stdout_0.log║
║ │ │ url = http://192.168.64.1:60081 ║
║ │ │working.dir = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540820991695/inboundSftp.task-launcher ║
╚═══════════════════════════╧═══════════╧════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝
将文件复制到远程目录
通常,数据会上传到 SFTP 服务器。我们通过将文件复制到 --remote-dir
指定的目录来模拟此操作。您可以在 示例项目 的 data/
目录中找到示例数据。
将 data/name-list.csv
复制到 SFTP 源正在监控的 /tmp/remote-files
目录中。检测到此文件后,sftp
源会将其下载到 --local-dir
指定的 /tmp/local-files
目录中,并发出任务启动请求。任务启动请求包括要启动的任务的名称以及作为命令行参数给出的本地文件路径。Spring Batch 将每个命令行参数绑定到相应的 JobParameter
。FileIngestTask
作业处理由名为 localFilePath
的 JobParameter
给出的文件。由于最近没有任何请求,因此任务会在发布请求后 30 秒内启动(请参阅上面关于配置启动触发器的早期提示)。
cp data/name-list.csv /tmp/remote-files
批处理作业启动时,您会在 SCDF 控制台日志中看到类似以下内容
2018-10-26 16:47:24.879 INFO 86034 --- [nio-9393-exec-7] o.s.c.d.spi.local.LocalTaskLauncher : Command to be executed: /Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/bin/java -jar <path-to>/batch/file-ingest/target/ingest-1.0.0.jar localFilePath=/tmp/local-files/name-list.csv --spring.cloud.task.executionid=1
2018-10-26 16:47:25.100 INFO 86034 --- [nio-9393-exec-7] o.s.c.d.spi.local.LocalTaskLauncher : launching task fileIngestTask-8852d94d-9dd8-4760-b0e4-90f75ee028de
Logs will be in /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/fileIngestTask3100511340216074735/1540586844871/fileIngestTask-8852d94d-9dd8-4760-b0e4-90f75ee028de
检查作业执行
数据接收完毕且批处理作业运行后,会记录为一次作业执行。我们可以查看作业执行情况,例如,在 Spring Cloud Data Flow shell 中发出以下命令
dataflow:>job execution list
╔═══╤═══════╤═════════╤════════════════════════════╤═════════════════════╤══════════════════╗
║ID │Task ID│Job Name │ Start Time │Step Execution Count │Definition Status ║
╠═══╪═══════╪═════════╪════════════════════════════╪═════════════════════╪══════════════════╣
║1 │1 │ingestJob│Tue May 01 23:34:05 EDT 2018│1 │Created ║
╚═══╧═══════╧═════════╧════════════════════════════╧═════════════════════╧══════════════════╝
我们还可以列出有关该特定作业执行的更多详细信息
dataflow:>job execution display --id 1
╔═══════════════════════════════════════╤══════════════════════════════╗
║ Key │ Value ║
╠═══════════════════════════════════════╪══════════════════════════════╣
║Job Execution Id │1 ║
║Task Execution Id │1 ║
║Task Instance Id │1 ║
║Job Name │ingestJob ║
║Create Time │Fri Oct 26 16:57:51 EDT 2018 ║
║Start Time │Fri Oct 26 16:57:51 EDT 2018 ║
║End Time │Fri Oct 26 16:57:53 EDT 2018 ║
║Running │false ║
║Stopping │false ║
║Step Execution Count │1 ║
║Execution Status │COMPLETED ║
║Exit Status │COMPLETED ║
║Exit Message │ ║
║Definition Status │Created ║
║Job Parameters │ ║
║-spring.cloud.task.executionid(STRING) │1 ║
║run.id(LONG) │1 ║
║localFilePath(STRING) │/tmp/local-files/name-list.csv║
╚═══════════════════════════════════════╧══════════════════════════════╝
验证数据
当批处理作业运行时,它会处理本地目录 (/tmp/local-files
) 中的文件,将每个项目的名称转换为大写,并将其插入数据库。
您可以使用任何支持 H2 数据库的数据库工具来检查数据。在本例中,我们使用 DBeaver 数据库工具。我们可以检查表以确保我们的数据已正确处理。
在 DBeaver 中,使用 JDBC URL jdbc:h2:tcp://:19092/mem:dataflow
和用户 sa
(无密码)创建与数据库的连接。连接后,展开 PUBLIC
模式,展开 表
,然后双击 PEOPLE
表。加载表数据后,单击“数据”选项卡以查看数据。
Cloud Foundry
本节介绍如何在 Cloud Foundry 上设置 Spring Batch 和 Spring Cloud Data Flow,然后创建示例批处理过程。
先决条件
在 Cloud Foundry 上运行此示例需要满足以下条件
- 按照“Cloud Foundry NFS 配置”部分中的说明配置 NFS 服务器并创建
nfs
服务以访问它。 - 具有
/remote-files
目录的外部 SFTP 服务器。 - 一个
mysql
服务实例 - 一个
rabbit
服务实例 - PivotalMySQLWeb 或其他数据库工具来查看数据
注册应用程序
要注册应用程序,请运行以下命令
app register --name fileIngest --type task --uri maven://io.spring.cloud.dataflow.ingest:ingest:1.0.0.BUILD-SNAPSHOT
然后注册预打包的 sftp
源应用程序和 task-launcher
sink 应用程序
app register --name sftp --type source --uri maven://org.springframework.cloud.stream.app:sftp-dataflow-source-kafka:3.2.1
app register --name task-launcher --type sink --uri maven://org.springframework.cloud.stream.app:task-launcher-dataflow-sink-kafka:1.0.1.RELEASE
创建任务
要创建任务,请运行以下命令
task create fileIngestTask --definition fileIngest
创建流
sftp
源配置为发布任务启动请求,该请求将启动 fileIngestTask
任务。启动请求使用部署属性 task.launch.request.deployment-properties=deployer.*.cloudfoundry.services=nfs
将 nfs
服务绑定到任务容器。
**注意**:请在以下流定义中替换 <user>
、<pass>
、<host>
和 <data-flow-server-uri>
。
stream create --name inboundSftp --definition "sftp --username=<user> --password=<pass> --host=<host> --allow-unknown-keys=true --remote-dir=/remote-files/ --local-dir=/var/scdf/shared-files/ --task.launch.request.taskName=fileIngestTask --task.launch.request.deployment-properties=deployer.*.cloudfoundry.services=nfs | task-launcher --spring.cloud.dataflow.client.server-uri=<data-flow-server-uri>"
dataflow-task-launcher-sink 使用由具有指数退避的动态触发器控制的 PollableMessageSource
。默认情况下,sink 每 1 秒轮询一次其输入目标。如果没有任务启动请求,则轮询周期继续加倍,最长为 30 秒。如果存在任务启动请求,则触发器重置为 1 秒。您可以通过在流定义中设置 task-launcher
sink 属性 trigger.period
和 trigger.max-period
来配置触发器参数。
部署流
部署流时,我们还必须通过运行以下命令来配置 sftp
pod
stream deploy inboundSftp --properties "deployer.sftp.cloudfoundry.services=nfs"
验证流部署
我们可以使用 stream list
命令查看要部署的流的状态,如下例所示
dataflow:>stream list
╔═══════════╤═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╤═══════════════════╗
║Stream Name│ Stream Definition │ Status ║
╠═══════════╪═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╪═══════════════════╣
║inboundSftp│sftp --task.launch.request.deployment-properties='deployer.*.cloudfoundry.services=nfs' --sftp.factory.password='******' --sftp.local-dir=/var/scdf/shared-files/ │The stream has been║
║ │--sftp.factory.allow-unknown-keys=true --sftp.factory.username='******' --sftp.remote-dir=/remote-files/ --sftp.factory.host=<host> --task.launch.request.taskName=fileIngestTask | │successfully ║
║ │task-launcher --spring.cloud.dataflow.client.server-uri=<data-flow-server-uri │deployed ║
╚═══════════╧═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╧═══════════════════╝
检查应用程序日志
使用 Cloud Foundry CLI 列出应用程序。source
和 sink
应用程序应处于已启动状态。
cf apps
Getting apps in org myorg / space myspace as someuser...
OK
name requested state instances memory disk urls
...
Ky7Uk6q-inboundSftp-sftp-v1 started 1/1 2G 1G Ky7Uk6q-inboundSftp-sftp-v1.apps.hayward.cf-app.com
Ky7Uk6q-inboundSftp-task-launcher-v1 started 1/1 2G 1G Ky7Uk6q-inboundSftp-task-launcher-v1.apps.hayward.cf-app.com
...
sftp
源的日志文件有助于调试 SFTP 连接失败等问题并验证 SFTP 下载。要查看日志,请运行以下命令
cf logs Ky7Uk6q-inboundSftp-sftp-v1 --recent
task-launcher
应用程序的日志也有助于调试数据流连接问题并验证任务启动请求
cf logs Ky7Uk6q-inboundSftp-task-launcher-v1 --recent
将文件复制到远程目录
您可以在 示例项目 的 data/
目录中找到示例数据。
连接到 SFTP 服务器并将 data/name-list.csv
上传到 remote-files
目录。
检测到此文件后,sftp
源会将其下载到 --local-dir
指定的 /var/scdf/shared-files
目录。我们使用为 nfs
服务配置的 /var/scdf
共享挂载路径。下载文件后,源会发出任务启动请求。任务启动请求包括要启动的任务的名称以及作为命令行参数给出的本地文件路径。Spring Batch 将每个命令行参数绑定到相应的 JobParameter
。FileIngestTask
作业处理由名为 localFilePath
的 JobParameter
给出的文件。由于最近没有任何请求,因此任务会在发布请求后 30 秒内启动(请参阅前面有关配置启动触发器的提示)。
检查作业执行
要检查作业检查,请运行以下命令(显示其输出)
dataflow:>job execution list
╔═══╤═══════╤═════════╤════════════════════════════╤═════════════════════╤══════════════════╗
║ID │Task ID│Job Name │ Start Time │Step Execution Count │Definition Status ║
╠═══╪═══════╪═════════╪════════════════════════════╪═════════════════════╪══════════════════╣
║1 │1 │ingestJob│Tue Jun 11 15:56:27 EDT 2019│1 │Created ║
╚═══╧═══════╧═════════╧════════════════════════════╧═════════════════════╧══════════════════╝
我们还可以列出有关该特定作业执行的更多详细信息
dataflow:>job execution display --id 1
╔═══════════════════════════════════════╤════════════════════════════════════╗
║ Key │ Value ║
╠═══════════════════════════════════════╪════════════════════════════════════╣
║Job Execution Id │6 ║
║Task Execution Id │6 ║
║Task Instance Id │6 ║
║Job Name │ingestJob ║
║Create Time │Thu Jun 13 17:06:28 EDT 2019 ║
║Start Time │Thu Jun 13 17:06:29 EDT 2019 ║
║End Time │Thu Jun 13 17:06:57 EDT 2019 ║
║Running │false ║
║Stopping │false ║
║Step Execution Count │1 ║
║Execution Status │COMPLETED ║
║Exit Status │COMPLETED ║
║Exit Message │ ║
║Definition Status │Created ║
║Job Parameters │ ║
║-spring.cloud.task.executionid(STRING) │1 ║
║run.id(LONG) │1 ║
║localFilePath(STRING) │/var/scdf/shared-files/name-list.csv║
╚═══════════════════════════════════════╧════════════════════════════════════╝
验证数据
当批处理作业运行时,它会处理本地目录 (/var/scdf/shared-files
) 中的文件,将每个项目的名称转换为大写,然后将其插入数据库。
使用 PivotalMySQLWeb 检查数据。
Kubernetes
本节介绍如何在 Kubernetes 上设置 Spring Batch 和 Spring Cloud Data Flow,然后创建示例批处理过程。
先决条件
本示例假设 Data Flow 安装在使用 Kafka 和 MySQL 的 Minikube 上。我们建议使用 Helm 图表。要开始,请运行以下命令
helm install --name my-release --set kafka.enabled=true,rabbitmq.enabled=false,server.service.type=NodePort stable/spring-cloud-data-flow
要在 Kubernetes 上运行此示例,需要配置 NFS 服务器并创建相应的 persistent volume
和 persistent volume claim
资源,如 Kubernetes NFS 配置 部分所述。我们还需要一个具有 /remote-files
目录的外部 SFTP 服务器。
注册应用程序
如果您下载了 示例项目,则可以构建 Docker 映像并将其发布到 Minikube 注册表
eval $(minikube docker-env)
./mvnw clean package docker:build -Pkubernetes
否则,您可以跳过此步骤以从 Dockerhub 拉取映像
app register --name fileIngest --type task --uri docker://springcloud/ingest
注册预打包的 sftp
源和 task-launcher
接收器应用程序
app register --name sftp --type source --uri docker://springcloudstream/sftp-dataflow-source-kafka:3.2.1 --metadata-uri maven://org.springframework.cloud.stream.app:sftp-dataflow-source-kafka:jar:metadata:3.2.1
app register --name task-launcher --type sink --uri docker://springcloudstream/task-launcher-dataflow-sink-kafka:1.0.1.RELEASE --metadata-uri maven://org.springframework.cloud.stream.app:task-launcher-dataflow-sink-kafka:jar:metadata:1.0.1.RELEASE
创建任务
要创建任务,请运行以下命令
task create fileIngestTask --definition fileIngest
创建流
sftp
源配置为发布任务启动请求以启动 fileIngestTask
任务。启动请求使用 deployer.*.kubernetes.volumes=[{'name':'staging','persistentVolumeClaim':{'claimName':'nfs-volume-claim'}}]
和 deployer.*.kubernetes.volumeMounts=[{'mountPath':'/staging/shared-files','name':'staging'}]
部署属性将 NFS 共享挂载到任务 pod。
注意:在以下流定义中替换 <user>
、<pass>
和 <data-flow-server-uri>
。这里的 <host>
值是 VirtualBox 的默认 Minikube 网关。
要获取 <data-flow-server-uri>
,请找到服务的名称并使用 minikube service
命令
kubectl get svc
...
my-release-data-flow-server NodePort 10.97.74.123 <none> 80:30826/TCP
minikube service my-release-data-flow-server --url
http://192.168.99.105:30826
stream create inboundSftp --definition "sftp --host=192.168.99.1 --username=<user> --password=<pass> --allow-unknown-keys=true --remote-dir=/remote-files --local-dir=/staging/shared-files --task.launch.request.taskName=fileIngestTask --task.launch.request.deployment-properties=deployer.*.kubernetes.volumes=[{'name':'staging','persistentVolumeClaim':{'claimName':'nfs-volume-claim'}}],deployer.*.kubernetes.volumeMounts=[{'mountPath':'/staging/shared-files','name':'staging'}] | task-launcher --spring.cloud.dataflow.client.server-uri=<dataflow-uri>"
dataflow-task-launcher-sink 使用由具有指数退避的动态触发器控制的 PollableMessageSource
。默认情况下,接收器每秒轮询一次其输入目标。如果没有任务启动请求,则轮询周期继续加倍,最长为 30 秒。如果存在任务启动请求,则触发器重置为一秒。您可以通过在流定义中设置 task-launcher
接收器属性 trigger.period
和 trigger.max-period
来配置触发器参数。
部署流
部署流时,我们还必须为 sftp
源配置卷挂载。
stream deploy inboundSftp --properties "deployer.sftp.kubernetes.volumes=[{'name':'staging','persistentVolumeClaim':{'claimName':'nfs-volume-claim'}}],deployer.sftp.kubernetes.volumeMounts=[{'mountPath':'/staging/shared-files','name':'staging'}]"
验证流部署
我们可以使用 stream list
命令查看要部署的流的状态,如下例所示
dataflow:>stream list
╔═══════════╤═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╤════════════╗
║Stream Name│ Stream Definition │ Status ║
╠═══════════╪═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╪════════════╣
║inboundSftp│sftp │The stream ║
║ │--task.launch.request.deployment-properties="deployer.*.kubernetes.volumes=[{'name':'staging','persistentVolumeClaim':{'claimName':'nfs-volume-claim'}}],deployer.*.kubernetes.volumeMounts=[{'mountPath':'/staging/shared-files','name':'staging'}]"│has been ║
║ │--sftp.factory.password='******' --sftp.local-dir=/staging/shared-files --sftp.factory.allow-unknown-keys=true --sftp.factory.username='******' --sftp.remote-dir=/remote-files --sftp.factory.host=192.168.99.1 │successfully║
║ │--task.launch.request.taskName=fileIngestTask | task-launcher --spring.cloud.dataflow.client.server-uri=http://192.168.99.105:30826 │deployed ║
╚═══════════╧═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╧════════════╝
检查应用程序日志
使用 kubectl
列出应用程序。source
和 sink
应用程序应处于已启动状态。
kubectl get pods
NAME READY STATUS RESTARTS AGE
...
inboundsftp-sftp-v12-6d55d469bd-t8znd 1/1 Running 0 6m24s
inboundsftp-task-launcher-v12-555d4785c5-zjr6b 1/1 Running 0 6m24s
...
sftp
源的日志文件有助于调试 SFTP 连接失败等问题并验证 SFTP 下载。
kubectl logs inboundsftp-sftp-v12-6d55d469bd-t8znd
task-launcher
应用程序的日志有助于调试数据流连接问题并验证任务启动请求。
kubectl logs inboundsftp-task-launcher-v12-555d4785c5-zjr6b
将文件复制到远程目录
您可以在 示例项目 的 data/
目录中找到示例数据。
连接到 SFTP 服务器并将 data/name-list.csv
上传到 remote-files
目录。
当检测到此文件时,sftp
源会将其下载到 --local-dir
指定的 /var/scdf/shared-files
目录。我们使用为 nfs
服务配置的 /var/scdf
共享挂载路径。文件下载完成后,源会发出任务启动请求。任务启动请求包含要启动的任务的名称,以及作为命令行参数提供的本地文件路径。Spring Batch 将每个命令行参数绑定到相应的 JobParameter
。FileIngestTask
作业处理由名为 localFilePath
的 JobParameter
提供的文件。由于最近没有任何请求,因此任务会在请求发布后的 30 秒内启动(请参阅前面有关配置启动触发器的提示)。
检查作业执行
要检查作业执行情况,请运行以下命令(显示其输出)
dataflow:>job execution list
╔═══╤═══════╤═════════╤════════════════════════════╤═════════════════════╤══════════════════╗
║ID │Task ID│Job Name │ Start Time │Step Execution Count │Definition Status ║
╠═══╪═══════╪═════════╪════════════════════════════╪═════════════════════╪══════════════════╣
║1 │1 │ingestJob│Thu Jun 13 08:39:59 EDT 2019│1 │Created ║
╚═══╧═══════╧═════════╧════════════════════════════╧═════════════════════╧══════════════════╝
我们还可以列出有关该特定作业执行的更多详细信息
dataflow:>job execution display --id 1
╔═══════════════════════════════════════════╤═══════════════════════════════════╗
║ Key │ Value ║
╠═══════════════════════════════════════════╪═══════════════════════════════════╣
║Job Execution Id │1 ║
║Task Execution Id │424 ║
║Task Instance Id │1 ║
║Job Name │ingestJob ║
║Create Time │Thu Jun 13 08:39:59 EDT 2019 ║
║Start Time │Thu Jun 13 08:39:59 EDT 2019 ║
║End Time │Thu Jun 13 08:40:07 EDT 2019 ║
║Running │false ║
║Stopping │false ║
║Step Execution Count │1 ║
║Execution Status │COMPLETED ║
║Exit Status │COMPLETED ║
║Exit Message │ ║
║Definition Status │Created ║
║Job Parameters │ ║
║-spring.cloud.task.executionid(STRING) │424 ║
║run.id(LONG) │1 ║
║-spring.datasource.username(STRING) │****** ║
║-spring.cloud.task.name(STRING) │fileIngestTask ║
║-spring.datasource.password(STRING) │****** ║
║-spring.datasource.driverClassName(STRING) │org.mariadb.jdbc.Driver ║
║localFilePath(STRING) │/staging/shared-files/name-list.csv║
║-spring.datasource.url(STRING) │****** ║
╚═══════════════════════════════════════════╧═══════════════════════════════════╝
验证数据
当批处理作业运行时,它会处理本地目录(/staging/shared-files
)中的文件,将每个项目的名称转换为大写,并将其插入数据库。
在 mysql
容器中打开一个 shell 以查询 people
表
kubectl get pods
...
my-release-mysql-56f988dd6c-qlm8q 1/1 Running
...
kubectl exec -it my-release-mysql-56f988dd6c-qlm8q -- /bin/bash
# mysql -u root -p$MYSQL_ROOT_PASSWORD
mysql> select * from dataflow.people;
+-----------+------------+-----------+
| person_id | first_name | last_name |
+-----------+------------+-----------+
| 1 | AARON | AABERG |
| 2 | AARON | AABY |
| 3 | ABBEY | AADLAND |
| 4 | ABBIE | AAGAARD |
| 5 | ABBY | AAKRE |
| 6 | ABDUL | AALAND |
| 7 | ABE | AALBERS |
| 8 | ABEL | AALDERINK |
| 9 | ABIGAIL | AALUND |
| 10 | ABRAHAM | AAMODT |
| ... |
+-----------+------------+-----------+
限制并发任务执行
此方法使用包含 5000 多个项目的单个文件进行处理。如果我们将 100 个文件复制到远程目录中会发生什么情况?sftp
源会立即处理它们,生成 100 个任务启动请求。Dataflow 服务器异步启动任务,因此这可能会使运行时平台的资源不堪重负。例如,在本地计算机上运行 Data Flow 服务器时,每个启动的任务都会创建一个新的 JVM。在 Cloud Foundry 中,每个任务都会创建一个新的容器实例,而在 Kubernetes 中,则会创建一个 Pod。
幸运的是,Spring Cloud Data Flow 提供了配置设置来限制并发运行的任务数量。
我们可以使用此示例来了解其工作原理。
降低最大并发任务执行数
示例项目在 data/spilt
目录中包含 20 个文件。为了观察限制的实际效果,我们可以将最大并发任务数设置为 3。
要在本地服务器上运行任务,请重新启动服务器并添加以下命令行参数:spring.cloud.dataflow.task.platform.local.accounts[default].maximum-concurrent-tasks=3
。
如果您的进程在 Cloud Foundry 上运行,请运行以下命令
cf set-env <dataflow-server> SPRING_CLOUD_DATAFLOW_TASK_PLATFORM_CLOUDFOUNDRY_ACCOUNTS[DEFAULT]_DEPLOYMENT_MAXIMUMCONCURRENTTASKS 3
如果您的进程在 Kubernetes 上运行,请通过运行以下命令编辑 Data Flow 服务器的 configmap
kubectl edit configmap my-release-data-flow-server
添加 maximum-concurrent-tasks
属性,如下所示
apiVersion: v1
data:
application.yaml: |-
spring:
cloud:
dataflow:
task:
platform:
kubernetes:
accounts:
default:
maximum-concurrent-tasks: 3
limits:
memory: 1024Mi
cpu: 500m
编辑 configmap
后,删除 Data Flow 服务器 pod 以强制其重新启动。然后等待其重新启动。
验证是否强制执行最大并发任务执行数
任务启动器接收器轮询输入目标。轮询周期根据任务启动请求的存在以及通过 Data Flow 服务器的 tasks/executions/current
REST 端点报告的当前正在运行的任务数量进行调整。如果任务平台的并发任务数达到其限制,则接收器会查询此端点并暂停轮询输入以获取新请求。这会在创建任务启动请求和执行请求之间引入 1-30 秒的延迟,从而牺牲了一些性能以换取弹性。任务启动请求永远不会发送到死信队列,因为服务器繁忙或不可用。当没有任务启动请求时,指数退避还可以防止应用程序过度查询服务器。
监控任务执行
跟踪 task-launcher
容器日志。
您还可以监控 Data Flow 服务器以了解当前的任务执行情况
watch curl <dataflow-server-url>/tasks/executions/current
Every 2.0s: curl http://192.168.99.105:30826/tasks/executions/current
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0100 92 0 92 0 0 1202 0 --:--:-- --:--:-- --:--:
-- 1210
[{"name":"default","type":"Kubernetes","maximumTaskExecutions":3,"runningExecutionCount":0}]
使用多个文件运行示例
部署示例流后,将 data/spilt
中的 20 个文件上传到 /remote-files
文件。在 task-launcher
日志中,您应该会看到指数退避正在工作
2019-06-14 15:00:48.247 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling period reset to 1000 ms.
2019-06-14 15:00:49.265 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Launching Task fileIngestTask on platform default
2019-06-14 15:00:50.433 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Launching Task fileIngestTask on platform default
2019-06-14 15:00:51.686 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Launching Task fileIngestTask on platform default
2019-06-14 15:00:52.929 WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:00:52.929 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling paused- increasing polling period to 2 seconds.
2019-06-14 15:00:55.008 WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:00:55.008 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling paused- increasing polling period to 4 seconds.
2019-06-14 15:00:59.039 WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:00:59.040 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling paused- increasing polling period to 8 seconds.
2019-06-14 15:01:07.104 WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:07.104 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling paused- increasing polling period to 16 seconds.
2019-06-14 15:01:23.127 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling resumed
2019-06-14 15:01:23.128 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Launching Task fileIngestTask on platform default
2019-06-14 15:01:23.232 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling period reset to 1000 ms.
2019-06-14 15:01:24.277 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Launching Task fileIngestTask on platform default
2019-06-14 15:01:25.483 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Launching Task fileIngestTask on platform default
2019-06-14 15:01:26.743 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Launching Task fileIngestTask on platform default
2019-06-14 15:01:28.035 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Launching Task fileIngestTask on platform default
2019-06-14 15:01:29.324 WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:29.325 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling paused- increasing polling period to 2 seconds.
2019-06-14 15:01:31.435 WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:31.436 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling paused- increasing polling period to 4 seconds.
2019-06-14 15:01:35.531 WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:35.532 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling paused- increasing polling period to 8 seconds.
2019-06-14 15:01:43.615 WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:43.615 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling paused- increasing polling period to 16 seconds.
避免重复处理
sftp
源不会处理它已经见过的文件。它使用 元数据存储 通过在运行时从消息中提取内容来跟踪文件。默认情况下,它使用内存元数据存储,但它可以插入到持久存储,这对于生产部署非常有用。因此,如果我们重新部署流或重新启动 sftp
源,则此状态将丢失并重新处理文件。
借助 Spring 的魔力,我们可以自动配置一个可用的持久元数据存储来防止重复处理。
在本例中,我们将 自动配置 JDBC 元数据存储,因为我们已经在使用 JDBC 数据库。
配置并构建 SFTP 源
为此,我们向 sftp-dataflow
源添加了一些 JDBC 依赖项。
克隆 [sftp]https://github.com/spring-cloud-stream-app-starters/sftp 流应用启动器。将目录更改为 SFTP 目录。在以下命令中,将 <binder>
替换为 kafka
或 rabbit
(根据您的配置选择),然后运行该命令
./mvnw clean install -DskipTests -PgenerateApps
cd apps/sftp-dataflow-source-<binder>
将以下依赖项添加到 pom.xml
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
如果您在 Kubernetes 上运行,请使用 mariadb 驱动程序而不是 H2
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>2.3.0</version>
</dependency>
如果您在使用内存中 H2 数据库的本地服务器上运行,请在 src/main/resources/application.properties
中设置 JDBC URL 以使用 Data Flow 服务器的数据库
spring.datasource.url=jdbc:h2:tcp://:19092/mem:dataflow
如果您在 Kubernetes 上运行,请将数据源设置为使用 mysql
服务的内部 IP,如下例所示
spring.datasource.url=jdbc:mysql://10.98.214.235:3306/dataflow
如果您在 Cloud Foundry 或 Kubernetes 中运行,请将以下属性添加到 src/main/resources/application.properties
spring.integration.jdbc.initialize-schema=always
构建 sftp
源并将其注册到 Data Flow。
运行示例应用程序
按照说明在您首选的平台上运行示例,直到 复制文件...
步骤。
如果您已经完成了主要练习,请将数据恢复到初始状态并重新部署流
- 清理本地和远程数据目录。
- 在数据库中运行
DROP TABLE PEOPLE;
SQL 命令。 - 取消部署流并再次部署它以运行更新后的
sftp
源。
如果您在 Cloud Foundry 中运行,请设置部署属性以将 sftp
绑定到 mysql
服务,如下例所示
dataflow>stream deploy inboundSftp --properties "deployer.sftp.cloudfoundry.services=nfs,mysql"
将文件复制到远程目录
我们使用一个小文件来完成此操作。示例项目中的 data/split
目录包含 data/name-list.csv
的内容,这些内容被拆分到 20 个文件中。上传 names_aa.csv
检查数据库
使用前面介绍的数据库工具查看 INT_METADATA_STORE
表的内容。下图显示了结果
请注意,这里有一个键值对,其中键标识文件名(sftpSource/
前缀为 sftp
源应用程序提供了一个命名空间)。值是一个时间戳,指示何时接收到消息。元数据存储跟踪已处理的文件。这可以防止在每个轮询周期中从远程目录中拉取相同的文件。只有新文件或已更新的文件才会被处理。
由于 PEOPLE
表上没有唯一性约束,因此我们的批处理作业多次处理一个文件会导致表行重复。由于我们配置了一个持久化的元数据存储,因此可以防止在容器重启时出现重复处理。您可以通过取消部署和重新部署流或重新启动 sftp
源来验证这一点。
如果我们查看 PEOPLE
表,它应该类似于这样
现在我们可以将相同的文件上传到 SFTP 服务器。如果您已登录到该服务器,则可以更新时间戳,如下所示
touch /remote-files/names_aa.csv
现在该文件被重新处理,并且 PEOPLE
表包含重复数据。如果您 ORDER BY FIRST_NAME
,您将看到如下内容
如果我们将另一个文件放到远程目录中,该文件将被处理,并且我们会在元数据存储中看到另一个条目。