作为一个Data Engineer,往往大家都是从ETL开始做起。那么Airflow就是ETL之中不可缺少的一个工具。
本文会对相关的基础概念进行介绍,并且搭配一些例子。
1. 简介
先讲什么是ETL。ETL是Extract, transform, load三个词的组合,先看Wikipedia的定义:
In computing, extract, transform, load (ETL) is the general procedure of copying data from one or more sources into a destination system which represents the data differently from the source(s) or in a different context than the source(s). The ETL process became a popular concept in the 1970s and is often used in data warehousing.[1]
说白了,就是将信息从多个源传递到多个目的地,同时加入相应的处理。
Airflow就是这样的一款工具,其可以以非常灵活的方式来支持数据的ETL过程,同时孩支持插件完成比如HDFS监控(Hadoop 分布式文件系统),邮件通知等等功能。
其提供了大量的python SDK, 可以使用户在其规范下面,使用python来定义各个ETL节点之间的执行工作,节点关系;同时指定执行计划,失败策略等等,提交到Airflow之后,平台可以根据计划来自动执行。同时还提供了一个Web UI来查看数据流程的执行和支持一部分的简单操作。
2. 概念
AIrflow之中的几个主要概念:
Operators
: Airflow定义的一系列算子/操作符,更直接的理解就是python class。不同的operator类实现了不同的功能,比如:BashOperator
: 可以执行用户的一个Bash命令PythonOperator
:可以执行用户指定的一个python函数EmailOperator
:可以进行邮件发送Sensor
:感知器/触发器,可以定义触发条件和动作,在条件满足的时候执行某个动作。例如DatabaseSensor
,FileSensor
等等。
Tasks
:其就是Operators
的具体事例,在某个Operator
上面指定了具体内容。很像OO概念之中的对象。Task Instances
:一个Task的一次运行会产生一个实例DAGS
: 有向无环图,包括的是一系列的tasks
和tasks
之间的链接关系。
那么简单梳理,其实使用Airflow的过程,就是定义以上概念的过程:
- 根据实际的需要,使用不同
Operator
- 传入具体的参数,定义一系列的
Tasks
- 定义
Tasks
之间的关系,形成一个DAG - 调度DAG运行,此时每一个task会生成一个task instance
- 使用命令行或者Web UI进行管理和查看
3. 安装
实际上官网部分的get started主要就是教你如何安装,此处不再赘述,只是复制官网内容:
# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow
# install from pypi using pip
pip install apache-airflow
# initialize the database
airflow initdb
# start the web server, default port is 8080
airflow webserver -p 8080
# start the scheduler
airflow scheduler
# visit localhost:8080 in the browser and enable the example dag in the home page
4. 配置
可以看到,在上面的命令之中,我们有一行:
export AIRFLOW_HOME=~/airflow
其中就有我们的数据库文件和配置文件,配置文件是airflow.cfg
,下面就用SMTP举例:
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = HOST
smtp_starttls = False
smtp_ssl = False
smtp_user = USER_NAME
smtp_password = PASSWORD
smtp_port = 25
smtp_mail_from = FROM_EMAIL
其他的相关配置也是在其中如法炮制,就不赘述了。
5. 示例
来源:https://airflow.apache.org/docs/stable/tutorial.html
此处的代码,是有三个功能:
BashOperator
打印日期BashOperator
睡眠5秒BashOperator
执行模板之中的bash命令
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
dag = DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
dag=dag,
)
dag.doc_md = __doc__
t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
templated_command = """
"""
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag,
)
t1 >> [t2, t3]
6. 一些常见问题
6.1 时区问题
Airflow之中默认的是UTC时区,想要在+8区使用,需要减去8。比如希望在每天20:00开始执行,那么实际应该填写的时间是12:00.
6.2 定时问题
有一些有趣的参数:
-
start_date
: 流程开始调度的时间,可以早或者晚于当前时间 -
end_date
:流程结束调度的时间,从字面意义来看,这一定要晚于当前时间 -
catch_up
:假设在start_date
之中,指定的开始是按早于当前时间,且catch_up
设置为true,那么airflow会将过去“遗漏”的调度再执行一遍,且中间不会再进行时间等待:如果今天的时间是2018-04-12 08:00, 流程的定时策略是每天上午10:00执行,那么schedule_interval=’00 02 * * *’ (减8小时) 如果start_date是 2018-04-01,且catch_up为true。那么在提交到平台后,Airflow会开始从2018-04-01的日期开始调度执行,执行11次到2018-04-11。 Airflow此时等待到10:00,执行2018-04-12当天的流程