SFTP 到 JDBC 文件提取

本案例提供了构建 Data Flow 管道的分步说明,该管道用于从 SFTP 源提取文件并将内容保存到 JDBC 数据存储。该管道旨在在 SFTP 源检测到新文件时启动任务。在这种情况下,任务是一个 Spring Batch 作业,它处理文件,将每行的内容转换为大写,并将其插入到表中。

文件提取 批处理作业从 CSV 文本文件中读取,该文件中的行格式为 first_name,last_name,并使用 JdbcBatchItemWriter] 将每个条目写入数据库表,该写入器对每行执行 INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)

您可以从浏览器或命令行下载包含源代码和示例数据的项目。

wget https://github.com/spring-cloud/spring-cloud-dataflow-samples/blob/main/dataflow-website/recipes/file-ingest/file-to-jdbc/file-to-jdbc.zip?raw=true -O file-to-jdbc.zip

如果您选择不自己构建任务应用程序,可执行 jar 包将发布到Spring Maven 仓库springcloud/ingest Docker 仓库。

该管道使用以下预打包的 Spring Cloud Stream 应用程序构建:

  • sftp-dataflow-source 是一个 SFTP 源,配置为在一个或多个轮询的 SFTP 目录中检测到新文件时发出任务启动请求。
  • dataflow-task-launcher-sink 是一个接收器,充当 Data Flow 服务器的 REST 客户端,用于启动 Data Flow 任务。

此管道可在所有受支持的 Data Flow 平台上运行。SFTP 源在发送任务启动请求之前,会将每个文件从 SFTP 服务器下载到本地目录。该请求将 localFilePath 设置为任务的命令行参数。在云平台上运行时,我们需要挂载一个 SFTP 源容器和任务容器都可以访问的共享目录。在本例中,我们设置了一个 NFS 挂载目录。配置 NFS 的环境和容器是特定于平台的,此处针对 Cloud Foundry v2.3+ 和 minikube 进行了描述。

先决条件

本节介绍启动批处理应用程序之前需要执行的设置和配置步骤。

Data Flow 安装

确保您已将 Spring Cloud Data Flow 安装到您选择的平台

**注意**:对于 Kubernetes,示例任务应用程序配置为使用 MariaDB。Data Flow 服务器也必须配置为使用 MariaDB。

使用 Data Flow

本示例假设您知道如何使用 Spring Cloud Data Flow 通过 Spring Cloud Data Flow 仪表板或 Spring Cloud Data Flow shell 注册和部署应用程序。如果您需要有关使用 Data Flow 的更多说明,请参阅使用 Spring Cloud Data Flow 进行流处理使用 Spring Cloud Data Flow 注册和启动批处理应用程序

SFTP 服务器

此示例需要访问 SFTP 服务器。对于在 本地 机器和 minikube 上运行,我们使用主机作为 SFTP 服务器。对于 Cloud Foundry 和一般的 Kubernetes,需要一个外部 SFTP 服务器。在 SFTP 服务器上,创建一个 /remote-files 目录。这是我们放置文件以触发管道的地方。

NFS 配置

在本地运行时不需要 NFS。

Cloud Foundry NFS 配置

此功能由 Pivotal Cloud Foundry 通过 NFS 卷服务 提供

要运行此示例,我们需要

  • 一个启用了 NFS 卷服务 Cloud Foundry 实例 (v2.3+)
  • 一个可从 Cloud Foundry 实例访问的 NFS 服务器
  • 一个配置正确的 nfs 服务实例

**注意:** 为简单起见,本示例假设 nfs 服务使用以下通用配置创建,并为所有绑定应用程序使用通用挂载点 (/var/scdf)。您也可以在将 NFS 服务绑定到应用程序时通过使用 部署属性 来设置这些参数

cf create-service nfs Existing nfs -c '{"share":<nfs-host>/staging","uid":<uid>,"gid":<gid>, "mount":"/var/scdf"}'

Kubernetes NFS 配置

Kubernetes 提供了许多用于配置和共享持久卷的选项。对于此示例,我们使用 minikube 并使用主机作为 NFS 服务器。以下说明适用于 OS/X,并且对于 Linux 主机也应该类似

确保 minikube 已启动。这些说明中的命令提供了对 minikube VM 的 NFS 访问权限。minikube IP 每次启动时都可能会更改,因此您应该在每次启动后执行这些步骤。

公开一个名为 /staging 的共享目录。

sudo mkdir /staging
sudo chmod 777 /staging
sudo echo "/staging -alldirs -mapall="$(id -u)":"$(id -g)" $(minikube ip)" >> /etc/exports
sudo nfsd restart

验证 NFS 挂载

showmount -e 127.0.0.1
Exports list on 127.0.0.1:
/staging 192.168.99.105

配置持久卷和持久卷申领资源。复制以下内容并将其保存到名为 nfs-config.yml 的文件中

---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: nfs-volume
spec:
  capacity:
    storage: 4Gi
  accessModes:
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: standard
  nfs:
    # The address 192.168.99.1 is the Minikube gateway to the host for VirtualBox. This way
    # not the container IP will be visible by the NFS server on the host machine,
    # but the IP address of the `minikube ip` command. You will need to
    # grant access to the `minikube ip` IP address.
    server: 192.168.99.1
    path: '/staging'

---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: nfs-volume-claim
  namespace: default
spec:
  storageClassName: standard
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 4Gi

创建资源

kubectl apply -f nfs-config.yml

部署

本节介绍如何部署到以下环境

  • 本地
  • Cloud Foundry
  • Kubernetes

本地

对于本地部署,本示例使用 Kafka 作为消息代理。为远程和本地文件创建目录

mkdir -p /tmp/remote-files /tmp/local-files

注册应用程序

如果您下载并构建了示例项目,则可以使用 file:// URL 注册它。例如 file://<项目路径>/target/ingest-1.0.0-SNAPSHOT.jar 否则,您可以使用已发布的 Maven jar

app register --name fileIngest --type task --uri maven://io.spring.cloud.dataflow.ingest:ingest:1.0.0.BUILD-SNAPSHOT

注册预打包的 sftp 源和 task-launcher 接收器应用程序

app register --name sftp --type source  --uri maven://org.springframework.cloud.stream.app:sftp-dataflow-source-kafka:3.2.1
app register --name task-launcher --type sink --uri maven://org.springframework.cloud.stream.app:task-launcher-dataflow-sink-kafka:1.0.1.RELEASE

创建任务

要创建任务,请运行以下命令

task create fileIngestTask --definition fileIngest

创建并部署流

要创建并部署流,请运行以下命令

**注意**:替换 <user><pass>usernamepassword 是本地(或远程)用户的凭据。如果您不使用本地 SFTP 服务器,请通过设置 host 参数(以及可选的 port 参数)来指定主机。如果未定义,则 host 默认为 127.0.0.1port 默认为 22

stream create --name inboundSftp --definition "sftp --username=<user> --password=<pass> --allow-unknown-keys=true --task.launch.request.taskName=fileIngestTask --remote-dir=/tmp/remote-files/ --local-dir=/tmp/local-files/ | task-launcher" --deploy

dataflow-task-launcher-sink 使用由具有指数退避的动态触发器控制的 PollableMessageSource。默认情况下,接收器每秒轮询其输入目标一次。如果没有任务启动请求,则轮询周期将继续加倍,最长可达 30 秒。如果存在任务启动请求,则触发器将重置为一秒。您可以通过在流定义中设置 task-launcher 接收器属性 trigger.periodtrigger.max-period 来继续触发器参数。

验证流部署

我们可以使用 stream list 命令查看要部署的流的状态,如下例所示

dataflow:>stream list
╔═══════════╤════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╤════════════════════════════╗
║Stream Name│                                                         Stream Definition                                                          │           Status           ║
╠═══════════╪════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╪════════════════════════════╣
║inboundSftp│sftp --password='******' --remote-dir=/tmp/remote-files/ --local-dir=/tmp/local-files/ --task.launch.request.taskName=fileIngestTask│The stream has been         ║
║           │--allow-unknown-keys=true --username=<user> | task-launcher                                                                         │successfully deployed       ║
╚═══════════╧════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╧════════════════════════════╝

检查应用程序日志

如果流部署失败,或者您想查看日志以了解任何原因,您可以使用 runtime apps 命令获取为 inboundSftp 流创建的应用程序的日志位置,如下所示

dataflow:>runtime apps
╔═══════════════════════════╤═══════════╤════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║   App Id / Instance Id    │Unit Status│                                                                     No. of Instances / Attributes                                                                      ║
╠═══════════════════════════╪═══════════╪════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╣
║inboundSftp.sftp           │ deployed  │                                                                                   1                                                                                    ║
║                           │           │       guid = 23057                                                                                                                                                     ║
║                           │           │        pid = 71927                                                                                                                                                     ║
║                           │           │       port = 23057                                                                                                                                                     ║
╟───────────────────────────┼───────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║inboundSftp.sftp-0         │ deployed  │     stderr = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540821009913/inboundSftp.sftp/stderr_0.log         ║
║                           │           │     stdout = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540821009913/inboundSftp.sftp/stdout_0.log         ║
║                           │           │        url = http://192.168.64.1:23057                                                                                                                                 ║
║                           │           │working.dir = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540821009913/inboundSftp.sftp                      ║
╟───────────────────────────┼───────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║inboundSftp.task-launcher  │ deployed  │                                                                                   1                                                                                    ║
╟───────────────────────────┼───────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║                           │           │       guid = 60081                                                                                                                                                     ║
║                           │           │        pid = 71926                                                                                                                                                     ║
║                           │           │       port = 60081                                                                                                                                                     ║
║inboundSftp.task-launcher-0│ deployed  │     stderr = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540820991695/inboundSftp.task-launcher/stderr_0.log║
║                           │           │     stdout = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540820991695/inboundSftp.task-launcher/stdout_0.log║
║                           │           │        url = http://192.168.64.1:60081                                                                                                                                 ║
║                           │           │working.dir = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540820991695/inboundSftp.task-launcher             ║
╚═══════════════════════════╧═══════════╧════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝

将文件复制到远程目录

通常,数据会上传到 SFTP 服务器。我们通过将文件复制到 --remote-dir 指定的目录来模拟此操作。您可以在 示例项目data/ 目录中找到示例数据。

data/name-list.csv 复制到 SFTP 源正在监控的 /tmp/remote-files 目录中。检测到此文件后,sftp 源会将其下载到 --local-dir 指定的 /tmp/local-files 目录中,并发出任务启动请求。任务启动请求包括要启动的任务的名称以及作为命令行参数给出的本地文件路径。Spring Batch 将每个命令行参数绑定到相应的 JobParameterFileIngestTask 作业处理由名为 localFilePathJobParameter 给出的文件。由于最近没有任何请求,因此任务会在发布请求后 30 秒内启动(请参阅上面关于配置启动触发器的早期提示)。

cp data/name-list.csv /tmp/remote-files

批处理作业启动时,您会在 SCDF 控制台日志中看到类似以下内容

2018-10-26 16:47:24.879  INFO 86034 --- [nio-9393-exec-7] o.s.c.d.spi.local.LocalTaskLauncher      : Command to be executed: /Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/bin/java -jar <path-to>/batch/file-ingest/target/ingest-1.0.0.jar localFilePath=/tmp/local-files/name-list.csv --spring.cloud.task.executionid=1
2018-10-26 16:47:25.100  INFO 86034 --- [nio-9393-exec-7] o.s.c.d.spi.local.LocalTaskLauncher      : launching task fileIngestTask-8852d94d-9dd8-4760-b0e4-90f75ee028de
   Logs will be in /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/fileIngestTask3100511340216074735/1540586844871/fileIngestTask-8852d94d-9dd8-4760-b0e4-90f75ee028de

检查作业执行

数据接收完毕且批处理作业运行后,会记录为一次作业执行。我们可以查看作业执行情况,例如,在 Spring Cloud Data Flow shell 中发出以下命令

dataflow:>job execution list
╔═══╤═══════╤═════════╤════════════════════════════╤═════════════════════╤══════════════════╗
║ID │Task ID│Job Name │         Start Time         │Step Execution Count │Definition Status ║
╠═══╪═══════╪═════════╪════════════════════════════╪═════════════════════╪══════════════════╣
║1  │1      │ingestJob│Tue May 01 23:34:05 EDT 2018│1                    │Created           ║
╚═══╧═══════╧═════════╧════════════════════════════╧═════════════════════╧══════════════════╝

我们还可以列出有关该特定作业执行的更多详细信息

dataflow:>job execution display --id 1
╔═══════════════════════════════════════╤══════════════════════════════╗
║                  Key                  │            Value             ║
╠═══════════════════════════════════════╪══════════════════════════════╣
║Job Execution Id                       │1                             ║
║Task Execution Id                      │1                             ║
║Task Instance Id                       │1                             ║
║Job Name                               │ingestJob                     ║
║Create Time                            │Fri Oct 26 16:57:51 EDT 2018  ║
║Start Time                             │Fri Oct 26 16:57:51 EDT 2018  ║
║End Time                               │Fri Oct 26 16:57:53 EDT 2018  ║
║Running                                │false                         ║
║Stopping                               │false                         ║
║Step Execution Count                   │1                             ║
║Execution Status                       │COMPLETED                     ║
║Exit Status                            │COMPLETED                     ║
║Exit Message                           │                              ║
║Definition Status                      │Created                       ║
║Job Parameters                         │                              ║
║-spring.cloud.task.executionid(STRING) │1                             ║
║run.id(LONG)                           │1                             ║
║localFilePath(STRING)                  │/tmp/local-files/name-list.csv║
╚═══════════════════════════════════════╧══════════════════════════════╝

验证数据

当批处理作业运行时,它会处理本地目录 (/tmp/local-files) 中的文件,将每个项目的名称转换为大写,并将其插入数据库。

您可以使用任何支持 H2 数据库的数据库工具来检查数据。在本例中,我们使用 DBeaver 数据库工具。我们可以检查表以确保我们的数据已正确处理。

在 DBeaver 中,使用 JDBC URL jdbc:h2:tcp://:19092/mem:dataflow 和用户 sa(无密码)创建与数据库的连接。连接后,展开 PUBLIC 模式,展开 ,然后双击 PEOPLE 表。加载表数据后,单击“数据”选项卡以查看数据。

Cloud Foundry

本节介绍如何在 Cloud Foundry 上设置 Spring Batch 和 Spring Cloud Data Flow,然后创建示例批处理过程。

先决条件

在 Cloud Foundry 上运行此示例需要满足以下条件

  • 按照“Cloud Foundry NFS 配置”部分中的说明配置 NFS 服务器并创建 nfs 服务以访问它。
  • 具有 /remote-files 目录的外部 SFTP 服务器。
  • 一个 mysql 服务实例
  • 一个 rabbit 服务实例
  • PivotalMySQLWeb 或其他数据库工具来查看数据

注册应用程序

要注册应用程序,请运行以下命令

app register --name fileIngest --type task --uri maven://io.spring.cloud.dataflow.ingest:ingest:1.0.0.BUILD-SNAPSHOT

然后注册预打包的 sftp 源应用程序和 task-launcher sink 应用程序

app register --name sftp --type source  --uri maven://org.springframework.cloud.stream.app:sftp-dataflow-source-kafka:3.2.1
app register --name task-launcher --type sink --uri maven://org.springframework.cloud.stream.app:task-launcher-dataflow-sink-kafka:1.0.1.RELEASE

创建任务

要创建任务,请运行以下命令

task create fileIngestTask --definition fileIngest

创建流

sftp 源配置为发布任务启动请求,该请求将启动 fileIngestTask 任务。启动请求使用部署属性 task.launch.request.deployment-properties=deployer.*.cloudfoundry.services=nfsnfs 服务绑定到任务容器。

**注意**:请在以下流定义中替换 <user><pass><host><data-flow-server-uri>

stream create --name inboundSftp --definition "sftp --username=<user> --password=<pass> --host=<host>  --allow-unknown-keys=true --remote-dir=/remote-files/ --local-dir=/var/scdf/shared-files/ --task.launch.request.taskName=fileIngestTask --task.launch.request.deployment-properties=deployer.*.cloudfoundry.services=nfs | task-launcher --spring.cloud.dataflow.client.server-uri=<data-flow-server-uri>"

dataflow-task-launcher-sink 使用由具有指数退避的动态触发器控制的 PollableMessageSource。默认情况下,sink 每 1 秒轮询一次其输入目标。如果没有任务启动请求,则轮询周期继续加倍,最长为 30 秒。如果存在任务启动请求,则触发器重置为 1 秒。您可以通过在流定义中设置 task-launcher sink 属性 trigger.periodtrigger.max-period 来配置触发器参数。

部署流

部署流时,我们还必须通过运行以下命令来配置 sftp pod

stream deploy inboundSftp --properties "deployer.sftp.cloudfoundry.services=nfs"

验证流部署

我们可以使用 stream list 命令查看要部署的流的状态,如下例所示

dataflow:>stream list
╔═══════════╤═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╤═══════════════════╗
║Stream Name│                                                                                     Stream Definition                                                                                     │      Status       ║
╠═══════════╪═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╪═══════════════════╣
║inboundSftp│sftp --task.launch.request.deployment-properties='deployer.*.cloudfoundry.services=nfs' --sftp.factory.password='******' --sftp.local-dir=/var/scdf/shared-files/                          │The stream has been║
║           │--sftp.factory.allow-unknown-keys=true --sftp.factory.username='******' --sftp.remote-dir=/remote-files/ --sftp.factory.host=<host> --task.launch.request.taskName=fileIngestTask |        │successfully       ║
║           │task-launcher --spring.cloud.dataflow.client.server-uri=<data-flow-server-uri                                                                                                              │deployed           ║
╚═══════════╧═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╧═══════════════════╝

检查应用程序日志

使用 Cloud Foundry CLI 列出应用程序。sourcesink 应用程序应处于已启动状态。

cf apps
Getting apps in org myorg / space myspace as someuser...
OK

name                                   requested state   instances   memory   disk   urls
...
Ky7Uk6q-inboundSftp-sftp-v1            started           1/1         2G       1G     Ky7Uk6q-inboundSftp-sftp-v1.apps.hayward.cf-app.com
Ky7Uk6q-inboundSftp-task-launcher-v1   started           1/1         2G       1G     Ky7Uk6q-inboundSftp-task-launcher-v1.apps.hayward.cf-app.com
...

sftp 源的日志文件有助于调试 SFTP 连接失败等问题并验证 SFTP 下载。要查看日志,请运行以下命令

cf logs Ky7Uk6q-inboundSftp-sftp-v1 --recent

task-launcher 应用程序的日志也有助于调试数据流连接问题并验证任务启动请求

cf logs Ky7Uk6q-inboundSftp-task-launcher-v1 --recent

将文件复制到远程目录

您可以在 示例项目data/ 目录中找到示例数据。

连接到 SFTP 服务器并将 data/name-list.csv 上传到 remote-files 目录。

检测到此文件后,sftp 源会将其下载到 --local-dir 指定的 /var/scdf/shared-files 目录。我们使用为 nfs 服务配置的 /var/scdf 共享挂载路径。下载文件后,源会发出任务启动请求。任务启动请求包括要启动的任务的名称以及作为命令行参数给出的本地文件路径。Spring Batch 将每个命令行参数绑定到相应的 JobParameterFileIngestTask 作业处理由名为 localFilePathJobParameter 给出的文件。由于最近没有任何请求,因此任务会在发布请求后 30 秒内启动(请参阅前面有关配置启动触发器的提示)。

检查作业执行

要检查作业检查,请运行以下命令(显示其输出)

dataflow:>job execution list
╔═══╤═══════╤═════════╤════════════════════════════╤═════════════════════╤══════════════════╗
║ID │Task ID│Job Name │         Start Time         │Step Execution Count │Definition Status ║
╠═══╪═══════╪═════════╪════════════════════════════╪═════════════════════╪══════════════════╣
║1  │1      │ingestJob│Tue Jun 11 15:56:27 EDT 2019│1                    │Created           ║
╚═══╧═══════╧═════════╧════════════════════════════╧═════════════════════╧══════════════════╝

我们还可以列出有关该特定作业执行的更多详细信息

dataflow:>job execution display --id 1
╔═══════════════════════════════════════╤════════════════════════════════════╗
║                  Key                  │               Value                ║
╠═══════════════════════════════════════╪════════════════════════════════════╣
║Job Execution Id                       │6                                   ║
║Task Execution Id                      │6                                   ║
║Task Instance Id                       │6                                   ║
║Job Name                               │ingestJob                           ║
║Create Time                            │Thu Jun 13 17:06:28 EDT 2019        ║
║Start Time                             │Thu Jun 13 17:06:29 EDT 2019        ║
║End Time                               │Thu Jun 13 17:06:57 EDT 2019        ║
║Running                                │false                               ║
║Stopping                               │false                               ║
║Step Execution Count                   │1                                   ║
║Execution Status                       │COMPLETED                           ║
║Exit Status                            │COMPLETED                           ║
║Exit Message                           │                                    ║
║Definition Status                      │Created                             ║
║Job Parameters                         │                                    ║
║-spring.cloud.task.executionid(STRING) │1                                   ║
║run.id(LONG)                           │1                                   ║
║localFilePath(STRING)                  │/var/scdf/shared-files/name-list.csv║
╚═══════════════════════════════════════╧════════════════════════════════════╝

验证数据

当批处理作业运行时,它会处理本地目录 (/var/scdf/shared-files) 中的文件,将每个项目的名称转换为大写,然后将其插入数据库。

使用 PivotalMySQLWeb 检查数据。

Kubernetes

本节介绍如何在 Kubernetes 上设置 Spring Batch 和 Spring Cloud Data Flow,然后创建示例批处理过程。

先决条件

本示例假设 Data Flow 安装在使用 Kafka 和 MySQL 的 Minikube 上。我们建议使用 Helm 图表。要开始,请运行以下命令

helm install --name my-release --set kafka.enabled=true,rabbitmq.enabled=false,server.service.type=NodePort stable/spring-cloud-data-flow

要在 Kubernetes 上运行此示例,需要配置 NFS 服务器并创建相应的 persistent volumepersistent volume claim 资源,如 Kubernetes NFS 配置 部分所述。我们还需要一个具有 /remote-files 目录的外部 SFTP 服务器。

注册应用程序

如果您下载了 示例项目,则可以构建 Docker 映像并将其发布到 Minikube 注册表

eval $(minikube docker-env)
./mvnw clean package docker:build -Pkubernetes

否则,您可以跳过此步骤以从 Dockerhub 拉取映像

app register --name fileIngest --type task --uri docker://springcloud/ingest

注册预打包的 sftp 源和 task-launcher 接收器应用程序

app register --name sftp --type source  --uri docker://springcloudstream/sftp-dataflow-source-kafka:3.2.1 --metadata-uri maven://org.springframework.cloud.stream.app:sftp-dataflow-source-kafka:jar:metadata:3.2.1
app register --name task-launcher --type sink --uri docker://springcloudstream/task-launcher-dataflow-sink-kafka:1.0.1.RELEASE --metadata-uri maven://org.springframework.cloud.stream.app:task-launcher-dataflow-sink-kafka:jar:metadata:1.0.1.RELEASE

创建任务

要创建任务,请运行以下命令

task create fileIngestTask --definition fileIngest

创建流

sftp 源配置为发布任务启动请求以启动 fileIngestTask 任务。启动请求使用 deployer.*.kubernetes.volumes=[{'name':'staging','persistentVolumeClaim':{'claimName':'nfs-volume-claim'}}]deployer.*.kubernetes.volumeMounts=[{'mountPath':'/staging/shared-files','name':'staging'}] 部署属性将 NFS 共享挂载到任务 pod。

注意:在以下流定义中替换 <user><pass><data-flow-server-uri>。这里的 <host> 值是 VirtualBox 的默认 Minikube 网关。

要获取 <data-flow-server-uri>,请找到服务的名称并使用 minikube service 命令

kubectl get svc
...
my-release-data-flow-server     NodePort       10.97.74.123     <none>        80:30826/TCP

minikube service my-release-data-flow-server --url
http://192.168.99.105:30826
stream create inboundSftp --definition "sftp --host=192.168.99.1 --username=<user> --password=<pass> --allow-unknown-keys=true --remote-dir=/remote-files --local-dir=/staging/shared-files --task.launch.request.taskName=fileIngestTask --task.launch.request.deployment-properties=deployer.*.kubernetes.volumes=[{'name':'staging','persistentVolumeClaim':{'claimName':'nfs-volume-claim'}}],deployer.*.kubernetes.volumeMounts=[{'mountPath':'/staging/shared-files','name':'staging'}] | task-launcher --spring.cloud.dataflow.client.server-uri=<dataflow-uri>"

dataflow-task-launcher-sink 使用由具有指数退避的动态触发器控制的 PollableMessageSource。默认情况下,接收器每秒轮询一次其输入目标。如果没有任务启动请求,则轮询周期继续加倍,最长为 30 秒。如果存在任务启动请求,则触发器重置为一秒。您可以通过在流定义中设置 task-launcher 接收器属性 trigger.periodtrigger.max-period 来配置触发器参数。

部署流

部署流时,我们还必须为 sftp 源配置卷挂载。

stream deploy inboundSftp --properties "deployer.sftp.kubernetes.volumes=[{'name':'staging','persistentVolumeClaim':{'claimName':'nfs-volume-claim'}}],deployer.sftp.kubernetes.volumeMounts=[{'mountPath':'/staging/shared-files','name':'staging'}]"

验证流部署

我们可以使用 stream list 命令查看要部署的流的状态,如下例所示

dataflow:>stream list
╔═══════════╤═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╤════════════╗
║Stream Name│                                                                                                                  Stream Definition                                                                                                                  │   Status   ║
╠═══════════╪═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╪════════════╣
║inboundSftp│sftp                                                                                                                                                                                                                                                 │The stream  ║
║           │--task.launch.request.deployment-properties="deployer.*.kubernetes.volumes=[{'name':'staging','persistentVolumeClaim':{'claimName':'nfs-volume-claim'}}],deployer.*.kubernetes.volumeMounts=[{'mountPath':'/staging/shared-files','name':'staging'}]"│has been    ║
║           │--sftp.factory.password='******' --sftp.local-dir=/staging/shared-files --sftp.factory.allow-unknown-keys=true --sftp.factory.username='******' --sftp.remote-dir=/remote-files --sftp.factory.host=192.168.99.1                                     │successfully║
║           │--task.launch.request.taskName=fileIngestTask | task-launcher --spring.cloud.dataflow.client.server-uri=http://192.168.99.105:30826                                                                                                                  │deployed    ║
╚═══════════╧═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╧════════════╝

检查应用程序日志

使用 kubectl 列出应用程序。sourcesink 应用程序应处于已启动状态。

kubectl get pods
NAME                                             READY   STATUS    RESTARTS   AGE
...
inboundsftp-sftp-v12-6d55d469bd-t8znd            1/1     Running   0          6m24s
inboundsftp-task-launcher-v12-555d4785c5-zjr6b   1/1     Running   0          6m24s
...

sftp 源的日志文件有助于调试 SFTP 连接失败等问题并验证 SFTP 下载。

kubectl logs inboundsftp-sftp-v12-6d55d469bd-t8znd

task-launcher 应用程序的日志有助于调试数据流连接问题并验证任务启动请求。

kubectl logs inboundsftp-task-launcher-v12-555d4785c5-zjr6b

将文件复制到远程目录

您可以在 示例项目data/ 目录中找到示例数据。

连接到 SFTP 服务器并将 data/name-list.csv 上传到 remote-files 目录。

当检测到此文件时,sftp 源会将其下载到 --local-dir 指定的 /var/scdf/shared-files 目录。我们使用为 nfs 服务配置的 /var/scdf 共享挂载路径。文件下载完成后,源会发出任务启动请求。任务启动请求包含要启动的任务的名称,以及作为命令行参数提供的本地文件路径。Spring Batch 将每个命令行参数绑定到相应的 JobParameterFileIngestTask 作业处理由名为 localFilePathJobParameter 提供的文件。由于最近没有任何请求,因此任务会在请求发布后的 30 秒内启动(请参阅前面有关配置启动触发器的提示)。

检查作业执行

要检查作业执行情况,请运行以下命令(显示其输出)

dataflow:>job execution list
╔═══╤═══════╤═════════╤════════════════════════════╤═════════════════════╤══════════════════╗
║ID │Task ID│Job Name │         Start Time         │Step Execution Count │Definition Status ║
╠═══╪═══════╪═════════╪════════════════════════════╪═════════════════════╪══════════════════╣
║1  │1      │ingestJob│Thu Jun 13 08:39:59 EDT 2019│1                    │Created           ║
╚═══╧═══════╧═════════╧════════════════════════════╧═════════════════════╧══════════════════╝

我们还可以列出有关该特定作业执行的更多详细信息

dataflow:>job execution display --id 1
╔═══════════════════════════════════════════╤═══════════════════════════════════╗
║                    Key                    │               Value               ║
╠═══════════════════════════════════════════╪═══════════════════════════════════╣
║Job Execution Id                           │1                                  ║
║Task Execution Id                          │424                                ║
║Task Instance Id                           │1                                  ║
║Job Name                                   │ingestJob                          ║
║Create Time                                │Thu Jun 13 08:39:59 EDT 2019       ║
║Start Time                                 │Thu Jun 13 08:39:59 EDT 2019       ║
║End Time                                   │Thu Jun 13 08:40:07 EDT 2019       ║
║Running                                    │false                              ║
║Stopping                                   │false                              ║
║Step Execution Count                       │1                                  ║
║Execution Status                           │COMPLETED                          ║
║Exit Status                                │COMPLETED                          ║
║Exit Message                               │                                   ║
║Definition Status                          │Created                            ║
║Job Parameters                             │                                   ║
║-spring.cloud.task.executionid(STRING)     │424                                ║
║run.id(LONG)                               │1                                  ║
║-spring.datasource.username(STRING)        │******                             ║
║-spring.cloud.task.name(STRING)            │fileIngestTask                     ║
║-spring.datasource.password(STRING)        │******                             ║
║-spring.datasource.driverClassName(STRING) │org.mariadb.jdbc.Driver            ║
║localFilePath(STRING)                      │/staging/shared-files/name-list.csv║
║-spring.datasource.url(STRING)             │******                             ║
╚═══════════════════════════════════════════╧═══════════════════════════════════╝

验证数据

当批处理作业运行时,它会处理本地目录(/staging/shared-files)中的文件,将每个项目的名称转换为大写,并将其插入数据库。

mysql 容器中打开一个 shell 以查询 people

kubectl get pods
...
my-release-mysql-56f988dd6c-qlm8q                1/1     Running
...
kubectl exec -it my-release-mysql-56f988dd6c-qlm8q -- /bin/bash
# mysql -u root -p$MYSQL_ROOT_PASSWORD
mysql> select * from dataflow.people;
+-----------+------------+-----------+
| person_id | first_name | last_name |
+-----------+------------+-----------+
|         1 | AARON      | AABERG    |
|         2 | AARON      | AABY      |
|         3 | ABBEY      | AADLAND   |
|         4 | ABBIE      | AAGAARD   |
|         5 | ABBY       | AAKRE     |
|         6 | ABDUL      | AALAND    |
|         7 | ABE        | AALBERS   |
|         8 | ABEL       | AALDERINK |
|         9 | ABIGAIL    | AALUND    |
|        10 | ABRAHAM    | AAMODT    |
|      ...                           |
+-----------+------------+-----------+

限制并发任务执行

此方法使用包含 5000 多个项目的单个文件进行处理。如果我们将 100 个文件复制到远程目录中会发生什么情况?sftp 源会立即处理它们,生成 100 个任务启动请求。Dataflow 服务器异步启动任务,因此这可能会使运行时平台的资源不堪重负。例如,在本地计算机上运行 Data Flow 服务器时,每个启动的任务都会创建一个新的 JVM。在 Cloud Foundry 中,每个任务都会创建一个新的容器实例,而在 Kubernetes 中,则会创建一个 Pod。

幸运的是,Spring Cloud Data Flow 提供了配置设置来限制并发运行的任务数量

我们可以使用此示例来了解其工作原理。

降低最大并发任务执行数

示例项目在 data/spilt 目录中包含 20 个文件。为了观察限制的实际效果,我们可以将最大并发任务数设置为 3。

要在本地服务器上运行任务,请重新启动服务器并添加以下命令行参数:spring.cloud.dataflow.task.platform.local.accounts[default].maximum-concurrent-tasks=3

如果您的进程在 Cloud Foundry 上运行,请运行以下命令

cf set-env <dataflow-server> SPRING_CLOUD_DATAFLOW_TASK_PLATFORM_CLOUDFOUNDRY_ACCOUNTS[DEFAULT]_DEPLOYMENT_MAXIMUMCONCURRENTTASKS 3

如果您的进程在 Kubernetes 上运行,请通过运行以下命令编辑 Data Flow 服务器的 configmap

kubectl edit configmap my-release-data-flow-server

添加 maximum-concurrent-tasks 属性,如下所示

apiVersion: v1
data:
  application.yaml: |-
    spring:
      cloud:
        dataflow:
          task:
            platform:
              kubernetes:
                accounts:
                  default:
                    maximum-concurrent-tasks: 3
                    limits:
                      memory: 1024Mi
                      cpu: 500m

编辑 configmap 后,删除 Data Flow 服务器 pod 以强制其重新启动。然后等待其重新启动。

验证是否强制执行最大并发任务执行数

任务启动器接收器轮询输入目标。轮询周期根据任务启动请求的存在以及通过 Data Flow 服务器的 tasks/executions/current REST 端点报告的当前正在运行的任务数量进行调整。如果任务平台的并发任务数达到其限制,则接收器会查询此端点并暂停轮询输入以获取新请求。这会在创建任务启动请求和执行请求之间引入 1-30 秒的延迟,从而牺牲了一些性能以换取弹性。任务启动请求永远不会发送到死信队列,因为服务器繁忙或不可用。当没有任务启动请求时,指数退避还可以防止应用程序过度查询服务器。

监控任务执行

跟踪 task-launcher 容器日志。

您还可以监控 Data Flow 服务器以了解当前的任务执行情况

watch curl <dataflow-server-url>/tasks/executions/current
Every 2.0s: curl http://192.168.99.105:30826/tasks/executions/current

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100    92    0    92    0     0   1202      0 --:--:-- --:--:-- --:--:
--  1210
[{"name":"default","type":"Kubernetes","maximumTaskExecutions":3,"runningExecutionCount":0}]

使用多个文件运行示例

部署示例流后,将 data/spilt 中的 20 个文件上传到 /remote-files 文件。在 task-launcher 日志中,您应该会看到指数退避正在工作

2019-06-14 15:00:48.247  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling period reset to 1000 ms.
2019-06-14 15:00:49.265  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Launching Task fileIngestTask on platform default
2019-06-14 15:00:50.433  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Launching Task fileIngestTask on platform default
2019-06-14 15:00:51.686  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Launching Task fileIngestTask on platform default
2019-06-14 15:00:52.929  WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:00:52.929  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling paused- increasing polling period to 2 seconds.
2019-06-14 15:00:55.008  WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:00:55.008  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling paused- increasing polling period to 4 seconds.
2019-06-14 15:00:59.039  WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:00:59.040  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling paused- increasing polling period to 8 seconds.
2019-06-14 15:01:07.104  WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:07.104  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling paused- increasing polling period to 16 seconds.
2019-06-14 15:01:23.127  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling resumed
2019-06-14 15:01:23.128  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Launching Task fileIngestTask on platform default
2019-06-14 15:01:23.232  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling period reset to 1000 ms.
2019-06-14 15:01:24.277  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Launching Task fileIngestTask on platform default
2019-06-14 15:01:25.483  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Launching Task fileIngestTask on platform default
2019-06-14 15:01:26.743  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Launching Task fileIngestTask on platform default
2019-06-14 15:01:28.035  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Launching Task fileIngestTask on platform default
2019-06-14 15:01:29.324  WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:29.325  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling paused- increasing polling period to 2 seconds.
2019-06-14 15:01:31.435  WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:31.436  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling paused- increasing polling period to 4 seconds.
2019-06-14 15:01:35.531  WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:35.532  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling paused- increasing polling period to 8 seconds.
2019-06-14 15:01:43.615  WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:43.615  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling paused- increasing polling period to 16 seconds.

避免重复处理

sftp 源不会处理它已经见过的文件。它使用 元数据存储 通过在运行时从消息中提取内容来跟踪文件。默认情况下,它使用内存元数据存储,但它可以插入到持久存储,这对于生产部署非常有用。因此,如果我们重新部署流或重新启动 sftp 源,则此状态将丢失并重新处理文件。

借助 Spring 的魔力,我们可以自动配置一个可用的持久元数据存储来防止重复处理。

在本例中,我们将 自动配置 JDBC 元数据存储,因为我们已经在使用 JDBC 数据库。

配置并构建 SFTP 源

为此,我们向 sftp-dataflow 源添加了一些 JDBC 依赖项。

克隆 [sftp]https://github.com/spring-cloud-stream-app-starters/sftp 流应用启动器。将目录更改为 SFTP 目录。在以下命令中,将 <binder> 替换为 kafkarabbit(根据您的配置选择),然后运行该命令

./mvnw clean install -DskipTests -PgenerateApps
cd apps/sftp-dataflow-source-<binder>

将以下依赖项添加到 pom.xml

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
</dependency>

如果您在 Kubernetes 上运行,请使用 mariadb 驱动程序而不是 H2

<dependency>
    <groupId>org.mariadb.jdbc</groupId>
    <artifactId>mariadb-java-client</artifactId>
    <version>2.3.0</version>
</dependency>

如果您在使用内存中 H2 数据库的本地服务器上运行,请在 src/main/resources/application.properties 中设置 JDBC URL 以使用 Data Flow 服务器的数据库

spring.datasource.url=jdbc:h2:tcp://:19092/mem:dataflow

如果您在 Kubernetes 上运行,请将数据源设置为使用 mysql 服务的内部 IP,如下例所示

spring.datasource.url=jdbc:mysql://10.98.214.235:3306/dataflow

如果您在 Cloud Foundry 或 Kubernetes 中运行,请将以下属性添加到 src/main/resources/application.properties

spring.integration.jdbc.initialize-schema=always

构建 sftp 源并将其注册到 Data Flow。

运行示例应用程序

按照说明在您首选的平台上运行示例,直到 复制文件... 步骤。

如果您已经完成了主要练习,请将数据恢复到初始状态并重新部署流

  1. 清理本地和远程数据目录。
  2. 在数据库中运行 DROP TABLE PEOPLE; SQL 命令。
  3. 取消部署流并再次部署它以运行更新后的 sftp 源。

如果您在 Cloud Foundry 中运行,请设置部署属性以将 sftp 绑定到 mysql 服务,如下例所示

dataflow>stream deploy inboundSftp --properties "deployer.sftp.cloudfoundry.services=nfs,mysql"

将文件复制到远程目录

我们使用一个小文件来完成此操作。示例项目中的 data/split 目录包含 data/name-list.csv 的内容,这些内容被拆分到 20 个文件中。上传 names_aa.csv

检查数据库

使用前面介绍的数据库工具查看 INT_METADATA_STORE 表的内容。下图显示了结果

JDBC Metadata Store

请注意,这里有一个键值对,其中键标识文件名(sftpSource/ 前缀为 sftp 源应用程序提供了一个命名空间)。值是一个时间戳,指示何时接收到消息。元数据存储跟踪已处理的文件。这可以防止在每个轮询周期中从远程目录中拉取相同的文件。只有新文件或已更新的文件才会被处理。

由于 PEOPLE 表上没有唯一性约束,因此我们的批处理作业多次处理一个文件会导致表行重复。由于我们配置了一个持久化的元数据存储,因此可以防止在容器重启时出现重复处理。您可以通过取消部署和重新部署流或重新启动 sftp 源来验证这一点。

如果我们查看 PEOPLE 表,它应该类似于这样

People table

现在我们可以将相同的文件上传到 SFTP 服务器。如果您已登录到该服务器,则可以更新时间戳,如下所示

touch /remote-files/names_aa.csv

现在该文件被重新处理,并且 PEOPLE 表包含重复数据。如果您 ORDER BY FIRST_NAME,您将看到如下内容

People table with duplicates

如果我们将另一个文件放到远程目录中,该文件将被处理,并且我们会在元数据存储中看到另一个条目。