其他分享
首页 > 其他分享> > 你不可不知的任务调度神器-AirFlow

你不可不知的任务调度神器-AirFlow

作者:互联网

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

大数据真好玩

点击右侧关注,大数据真好玩!

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

Airflow 是一个编排、调度和监控workflow的平台,由Airbnb开源,现在在Apache Software Foundation 孵化。AirFlow 将workflow编排为tasks组成的DAGs,调度器在一组workers上按照指定的依赖关系执行tasks。同时,Airflow 提供了丰富的命令行工具和简单易用的用户界面以便用户查看和操作,并且Airflow提供了监控和报警系统。

Airflow 使用 DAG (有向无环图) 来定义工作流,配置作业依赖关系非常方便,从管理方便和使用简单角度来讲,AirFlow远超过其他的任务调度工具。

Airflow 的天然优势

Airflow 是免费的,我们可以将一些常做的巡检任务,定时脚本(如 crontab ),ETL处理,监控等任务放在 AirFlow 上集中管理,甚至都不用再写监控脚本,作业出错会自动发送日志到指定人员邮箱,低成本高效率地解决生产问题。但是由于中文文档太少,大多不够全全,因此想快速上手并不十分容易。首先要具备一定的 Python 知识,反复阅读官方文档,理解调度原理。本系列分享由浅入深,逐步细化,尝试为你揭开 AirFlow 的面纱。

AirFlow 的架构和组成

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

AirFlow的架构图如上图所示,包含了以下核心的组件:

其中主要的部件介绍如下:

Scheduler

调度器。调度器是整个airlfow的核心枢纽,负责发现用户定义的dag文件,并根据定时器将有向无环图转为若干个具体的dagrun,并监控任务状态。

Dag 有向无环图。有向无环图用于定义任务的任务依赖关系。任务的定义由算子operator进行,其中,BaseOperator是所有算子的父类。

Dagrun 有向无环图任务实例。在调度器的作用下,每个有向无环图都会转成任务实例。不同的任务实例之间用dagid/ 执行时间(execution date)进行区分。

Taskinstance dagrun下面的一个任务实例。具体来说,对于每个dagrun实例,算子(operator)都将转成对应的Taskinstance。由于任务可能失败,根据定义调度器决定是否重试。不同的任务实例由 dagid/执行时间(execution date)/算子/执行时间/重试次数进行区分。

Executor 任务执行器。每个任务都需要由任务执行器完成。BaseExecutor是所有任务执行器的父类。

LocalTaskJob 负责监控任务与行,其中包含了一个重要属性taskrunner。

TaskRunner 开启子进程,执行任务。

AirFlow安装和初体验

安装 AirFlow 需要 Pyhton环境,关于环境的安装大家可以自行查询,不在展开。这里我们直接使用python的pip工具进行 AirFlow 的安装:

# airflow 需要 home 目录,默认是~/airflow,
# 但是如果你需要,放在其它位置也是可以的
# (可选)
export AIRFLOW_HOME = ~/airflow

# 使用 pip 从 pypi 安装
pip install apache-airflow

# 初始化数据库
airflow initdb

# 启动 web 服务器,默认端口是 8080
airflow webserver -p 8080

# 启动定时器
airflow scheduler

# 在浏览器中浏览 localhost:8080,并在 home 页开启 example dag

AirFlow默认使用sqlite作为数据库,直接执行数据库初始化命令后,会在环境变量路径下新建一个数据库文件airflow.db。当然了你也可以指定 Mysql 作为 AirFlow的数据库,只需要修改airflow.conf 即可:

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = LocalExecutor
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
sql_alchemy_conn = mysql://root:xxxxxx@localhost:3306/airflow

安装完毕,启动 AirFlow我们进入 UI页面可以看到:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk= 当然我们还可以切换到树视图模式:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

此外,还支持图标视图、甘特图等模式,是不是非常高大上?

Hello AirFlow!

到此我们本地已经安装了一个单机版本的 AirFlow,然后我们可以根据官网可以做一个Demo来体验一下 AirFlow的强大。首先在此之前,我们要介绍一些概念和原理:

我们在编写AirFlow任务时,AirFlow到底做了什么?

那么我们就需要新增一个自己的Dag文件,我们直接使用官网的例子,这是一个典型的ETL任务:

"""
### ETL DAG Tutorial Documentation
This ETL DAG is compatible with Airflow 1.10.x (specifically tested with 1.10.12) and is referenced
as part of the documentation that goes along with the Airflow Functional DAG tutorial located
[here](https://airflow.apache.org/tutorial_decorated_flows.html)
"""
# [START tutorial]
# [START import_module]
import json

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
}
# [END default_args]

# [START instantiate_dag]
with DAG(
    'tutorial_etl_dag',
    default_args=default_args,
    description='ETL DAG tutorial',
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['example'],
) as dag:
    # [END instantiate_dag]
    # [START documentation]
    dag.doc_md = __doc__
    # [END documentation]

    # [START extract_function]
    def extract(**kwargs):
        ti = kwargs['ti']
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        ti.xcom_push('order_data', data_string)

    # [END extract_function]

    # [START transform_function]
    def transform(**kwargs):
        ti = kwargs['ti']
        extract_data_string = ti.xcom_pull(task_ids='extract', key='order_data')
        order_data = json.loads(extract_data_string)

        total_order_value = 0
        for value in order_data.values():
            total_order_value += value

        total_value = {"total_order_value": total_order_value}
        total_value_json_string = json.dumps(total_value)
        ti.xcom_push('total_order_value', total_value_json_string)

    # [END transform_function]

    # [START load_function]
    def load(**kwargs):
        ti = kwargs['ti']
        total_value_string = ti.xcom_pull(task_ids='transform', key='total_order_value')
        total_order_value = json.loads(total_value_string)

        print(total_order_value)

    # [END load_function]

    # [START main_flow]
    extract_task = PythonOperator(
        task_id='extract',
        python_callable=extract,
    )
    extract_task.doc_md = """\
#### Extract task
A simple Extract task to get data ready for the rest of the data pipeline.
In this case, getting data is simulated by reading from a hardcoded JSON string.
This data is then put into xcom, so that it can be processed by the next task.
"""

    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform,
    )
    transform_task.doc_md = """\
#### Transform task
A simple Transform task which takes in the collection of order data from xcom
and computes the total order value.
This computed value is then put into xcom, so that it can be processed by the next task.
"""

    load_task = PythonOperator(
        task_id='load',
        python_callable=load,
    )
    load_task.doc_md = """\
#### Load task
A simple Load task which takes in the result of the Transform task, by reading it
from xcom and instead of saving it to end user review, just prints it out.
"""

    extract_task >> transform_task >> load_task

# [END main_flow]

# [END tutorial]

tutorial.py这个文件需要放置在airflow.cfg设置的 DAGs 文件夹中。DAGs 的默认位置是~/airflow/dags。然后执行以下命令:

python ~/airflow/dags/tutorial.py

如果这个脚本没有报错,那就证明您的代码和您的 Airflow 环境没有特别大的问题。我们可以用一些简单的脚本查看这个新增的任务:

# 打印出所有正在活跃状态的 DAGs
airflow list_dags

# 打印出 'tutorial' DAG 中所有的任务
airflow list_tasks tutorial

# 打印出 'tutorial' DAG 的任务层次结构
airflow list_tasks tutorial --tree

然后我们就可以在上面我们提到的UI界面中看到运行中的任务了!

AirFlow本身还有一些常用的命令:

backfill            Run subps of a DAG for a specified date range
list_tasks          List the tasks within a DAG
clear               Clear a set of task instance, as if they never ran
pause               Pause a DAG
unpause             Resume a paused DAG
trigger_dag         Trigger a DAG run
pool                CRUD operations on pools
variables           CRUD operations on variables
kerberos            Start a kerberos ticket renewer
render              Render a task instance's template(s)
run                 Run a single task instance
initdb              Initialize the metadata database
list_dags           List all the DAGs
dag_state           Get the status of a dag run
task_failed_deps    Returns the unmet dependencies for a task instance
                    from the perspective of the scheduler. In other words,
                    why a task instance doesn't get scheduled and then
                    queued by the scheduler, and then run by an executor).
task_state          Get the status of a task instance
serve_logs          Serve logs generate by worker
test                Test a task instance. This will run a task without
                    checking for dependencies or recording it's state in
                    the database.
webserver           Start a Airflow webserver instance
resetdb             Burn down and rebuild the metadata database
upgradedb           Upgrade the metadata database to latest version
scheduler           Start a scheduler instance
worker              Start a Celery worker node
flower              Start a Celery Flower
version             Show the version
connections         List/Add/Delete connections

总体来看,AirFlow的上手难度和特性支持都还不错,同时还有比较不错的扩展性。如果用户熟悉Python能进行一些定制化开发,简直不要太爽!

而且,Airflow 已经在 Adobe、Airbnb、Google、Lyft 等商业公司内部得到广泛应用;国内,阿里巴巴也有使用(Maat),业界有大规模实践经验。

快来试一试吧!

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

Hadoop YARN:调度性能优化实践

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

大数据可视化从未如此简单 - Apache Zepplien全面介绍

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

JVM性能调优实践—G1垃圾收集器全视角解析

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

版权声明:

本文为大数据技术与架构原创和整理,转载请联系授权。未经作者允许转载追究侵权责任。

微信公众号|import_bigdata

编辑 《大数据技术与架构》

欢迎点赞+收藏+转发朋友圈素质三连

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

文章不错?点个【在看】吧! ????

标签:AirFlow,value,task,神器,任务,airflow,任务调度,DAG
来源: https://blog.51cto.com/u_9928699/2898692