编程语言
首页 > 编程语言> > Python Apache Beam管道状态API调用

Python Apache Beam管道状态API调用

作者:互联网

当前,我们有一个Python Apache Beam管道正在运行并且能够在本地运行.现在,我们正在使管道在Google Cloud Dataflow上运行,并实现了完全自动化,但是在Dataflow / Apache Beam的管道监视中发现了局限性.

当前,Cloud Dataflow通过其UI界面或命令行中的gcloud有两种监视管道状态的方法.这两种解决方案都不适用于完全自动化的解决方案,在该解决方案中,我们可以考虑无损文件处理.

查看Apache Beam的github,他们有一个文件internal/apiclient.py,该文件显示有一个用于获取作业状态的函数get_job.

我们发现使用get_job的一个实例是runners/dataflow_runner.py.

最终目标是使用此API获取作业或我们自动触发运行的多个作业的状态,以确保最终通过管道成功处理了所有作业.

在运行管道(p.run())之后,谁能向我们解释如何使用此API?我们不知道响应者= Runner.dataflow_client.get_job(job_id)的来源.

如果有人可以在设置/运行我们的管道的过程中更深入地了解我们如何访问此API调用,那就太好了!

解决方法:

我最终只是摆弄代码,发现了如何获得工作细节.我们的下一步是查看是否有办法获取所有作业的列表.

# start the pipeline process
pipeline                 = p.run()
# get the job_id for the current pipeline and store it somewhere
job_id                   = pipeline.job_id()
# setup a job_version variable (either batch or streaming)
job_version              = dataflow_runner.DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION
# setup "runner" which is just a dictionary, I call it local
local                    = {}
# create a dataflow_client
local['dataflow_client'] = apiclient.DataflowApplicationClient(pipeline_options, job_version)
# get the job details from the dataflow_client
print local['dataflow_client'].get_job(job_id)

标签:pipeline,apache-beam,python,google-cloud-dataflow
来源: https://codeday.me/bug/20191112/2024100.html