Airflow简介

包括一些简单示例

Posted by Haiming on July 12, 2020

作为一个Data Engineer,往往大家都是从ETL开始做起。那么Airflow就是ETL之中不可缺少的一个工具。

本文会对相关的基础概念进行介绍,并且搭配一些例子。

参考

1. 简介

先讲什么是ETL。ETL是Extract, transform, load三个词的组合,先看Wikipedia的定义:

In computing, extract, transform, load (ETL) is the general procedure of copying data from one or more sources into a destination system which represents the data differently from the source(s) or in a different context than the source(s). The ETL process became a popular concept in the 1970s and is often used in data warehousing.[1]

说白了,就是将信息从多个源传递到多个目的地,同时加入相应的处理。

Airflow就是这样的一款工具,其可以以非常灵活的方式来支持数据的ETL过程,同时孩支持插件完成比如HDFS监控(Hadoop 分布式文件系统),邮件通知等等功能。

其提供了大量的python SDK, 可以使用户在其规范下面,使用python来定义各个ETL节点之间的执行工作,节点关系;同时指定执行计划,失败策略等等,提交到Airflow之后,平台可以根据计划来自动执行。同时还提供了一个Web UI来查看数据流程的执行和支持一部分的简单操作。

2. 概念

AIrflow之中的几个主要概念:

  1. Operators: Airflow定义的一系列算子/操作符,更直接的理解就是python class。不同的operator类实现了不同的功能,比如:
    1. BashOperator: 可以执行用户的一个Bash命令
    2. PythonOperator:可以执行用户指定的一个python函数
    3. EmailOperator:可以进行邮件发送
    4. Sensor:感知器/触发器,可以定义触发条件和动作,在条件满足的时候执行某个动作。例如DatabaseSensorFileSensor等等。
  2. Tasks:其就是Operators的具体事例,在某个Operator上面指定了具体内容。很像OO概念之中的对象。
  3. Task Instances:一个Task的一次运行会产生一个实例
  4. DAGS: 有向无环图,包括的是一系列的taskstasks之间的链接关系。

那么简单梳理,其实使用Airflow的过程,就是定义以上概念的过程:

  1. 根据实际的需要,使用不同Operator
  2. 传入具体的参数,定义一系列的Tasks
  3. 定义Tasks之间的关系,形成一个DAG
  4. 调度DAG运行,此时每一个task会生成一个task instance
  5. 使用命令行或者Web UI进行管理和查看

3. 安装

实际上官网部分的get started主要就是教你如何安装,此处不再赘述,只是复制官网内容:

# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow

# install from pypi using pip
pip install apache-airflow

# initialize the database
airflow initdb

# start the web server, default port is 8080
airflow webserver -p 8080

# start the scheduler
airflow scheduler

# visit localhost:8080 in the browser and enable the example dag in the home page

4. 配置

可以看到,在上面的命令之中,我们有一行:

export AIRFLOW_HOME=~/airflow

其中就有我们的数据库文件和配置文件,配置文件是airflow.cfg ,下面就用SMTP举例:

[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = HOST
smtp_starttls = False
smtp_ssl = False
smtp_user = USER_NAME
smtp_password = PASSWORD
smtp_port = 25
smtp_mail_from = FROM_EMAIL

其他的相关配置也是在其中如法炮制,就不赘述了。

5. 示例

来源:https://airflow.apache.org/docs/stable/tutorial.html

此处的代码,是有三个功能:

  1. BashOperator打印日期
  2. BashOperator睡眠5秒
  3. BashOperator执行模板之中的bash命令

from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)
dag.doc_md = __doc__

t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
templated_command = """

"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)

t1 >> [t2, t3]

6. 一些常见问题

6.1 时区问题

Airflow之中默认的是UTC时区,想要在+8区使用,需要减去8。比如希望在每天20:00开始执行,那么实际应该填写的时间是12:00.

6.2 定时问题

有一些有趣的参数:

  1. start_date: 流程开始调度的时间,可以早或者晚于当前时间

  2. end_date:流程结束调度的时间,从字面意义来看,这一定要晚于当前时间

  3. catch_up:假设在start_date之中,指定的开始是按早于当前时间,且catch_up设置为true,那么airflow会将过去“遗漏”的调度再执行一遍,且中间不会再进行时间等待:

    如果今天的时间是2018-04-12 08:00, 流程的定时策略是每天上午10:00执行,那么schedule_interval=’00 02 * * *’ (减8小时) 如果start_date是 2018-04-01,且catch_up为true。那么在提交到平台后,Airflow会开始从2018-04-01的日期开始调度执行,执行11次到2018-04-11。 Airflow此时等待到10:00,执行2018-04-12当天的流程