0%

Airflow 用户指南

Airflow用户指南,基于我的wiki提炼出来的内容,简单描述了使用Airflow过程中有什么注意事项及需要关注的点

依赖关系

根据committer成员的讨论,更加推荐使用位操作符(bitwise operators)来解决依赖

  • 最原始的方式task.set_upstream(task1); task.set_downstream(task2)

  • 位操作(bitwise operators)方式,airflow1.8之后task >> task1; task << task1; task >> task1 << task2

    • DAG的位操作符号可以提供更多的功能: t1 >> [t2, t3] >> t4已经能被支持了
  • 使用chain方法实现多个task的依次依赖(Airflow 1.10.3已经取消chain方法)

    • chain的一般使用

      1
      2
      from airflow.utils.helpers import chain
      chain(task, task1, task2)
    • 通过列表解析直接生成task列表然后chain起来

      1
      2
      3
      from airflow.utils.helpers import chain
      ds_true = [DummyOperator(task_id='true_' + str(i), dag=dag) for i in [1, 2]]
      chain(cond_true, *ds_true)
  • 一对多的链接关系

    • t1 >> [t2, t3](推荐)
    • group = [task1, task2, task3]; task.set_downstream(group);
  • 多对一的链接关系

    • [t1, t2] >> t3(推荐)
    • group = [task1, task2, task3]; task.set_upstream(group)
  • 多对多的笛卡尔积

    • airflow.utils.helper.cross_downstream([t1, t2, t3], [t4, t5, t6])(推荐)

    • 使用自定义的方法

      1
      2
      3
      4
      5
      6
      import itertools
      import airflow.utils.helper.chain
      group_a=[task1, task2, task3]
      group_b=[task4, task5]
      for pair in itertools.product(group_a, group_b):
      chain(*pair)
  • 如果有根据一定条件选择下游执行哪个task操作的逻辑,可以使用BranchPythonOperator算子,使用是可以通过TriggerRule.ONE_SUCCESS设置实现.例如例子A是BranchPythonOperator,一个分支运行B,另一个分支运行C,同时B->C,这时可以在C中设置TriggerRule.ONE_SUCCESS.以前我总认为一个该这样实现,会多个两个算子

时间相关

  • 使用Airflow内置的日期宏
    • airflow内置了部分时间相关的参数,如 '{{ ds }}'
      代表运行的时间, '{{ yesterday_ds }}'
      运行时间昨天的日期,更多时间相关的参数见这里
  • 自定义时间参数
    • 简单的时间操作: 通过replace完成,获取运行日期同时改变成特殊的时间 some_command.sh {{ execution_date.replace(day=1) }}
    • 通过macros对时间进行更多操作: macros.ds_add将内置时间进行计算 '{{ macros.ds_add(ds, 1) }}'

资源限制

作为一个工作流工具,除了完成各种复杂的上下游关系外,我认为解决资源的限制也是很重要的点.资源限制包括限制DAG的并发,限制多个DAG的运行关系.限制同一个/同一类Task的并发

DAG的限制

  • 限制dag并行实例数量

    • airflow.cfg[core]设置dag_concurrency限制并行数量

    • 在DAG文件中限制

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      from airflow import DAG

      default_args = {
      'owner': 'airflow',
      # here to set value
      'concurrency': 10
      }

      dag = DAG(
      'tutorial',
      default_args=default_args,
      description='A simple tutorial DAG',
      schedule_interval=timedelta(days=1),
      )

Task的限制

  • 限制task的并行数量: operetor中的参数task_concurrency可以设置task的并行数量
  • 限制多个不同类型的task并行数量: operetor中的参数pool限制一类task的并行数量,与task_concurrency参数的区别是task_concurrency设置的是同一个task的并行数task_id要相同,pool设置的是一类task的并行数task_id可以不同,只要保证pool参数的名称相同就可以.设置后并行的task不会超过pool对象的slots

将Airflow中的对象通过DAG或者脚本的方式进行保存

connection variables pool

目前Airflow创建connection variables pool能通过如下方式创建:

  • Airflow的cli命令创建,对应命令分别为: airflow connections --add airflow variables -s airflow pools -s
  • Airflow的web UI页面进行设置,分别为: Admin -> connections Admin -> variables Admin -> pools

这里提供一个将connection variables pool固定到DAG的方法,查看这里.主要是之前使用docker-airflow每次重启时都会清空postgre数据,这样能保证connection variables pool能被git进行版本管理,下面以connections的创建为例子,进行说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# conf.py
var = {
'connections': [
{
'conn_id': 'ssh_my_own_1',
'conn_type': 'ssh',
'host': '127.0.0.2',
'port': 22,
'login': 'root',
'password': 'pwd',
},
{
'conn_id': 'ssh_my_own_2',
'conn_type': 'ssh',
'host': '127.0.0.3',
'port': 22,
'login': 'root',
'password': 'pwd',
},
...
]
}
# init_conn_var.py
from airflow import DAG, Connection
from airflow.setting import Session
from airflow.operators.python_operator import PythonOperator

def crt_airflow_conn(conf):
conn = Connection()
conn.conn_id = conf.get('conn_id')
conn.conn_type = conf.get('conn_type')
conn.host = conf.get('host')
conn.port = conf.get('port')
conn.login = conf.get('login')
conn.password = conf.get('password')
conn.schema = conf.get('schema')
conn.extra = conf.get('extra')

session = Session()
try:
exists_conn = session.query(Connection.conn_id == conn.conn_id).one()
except exc.NoResultFound:
logging.info('connection not exists, will create it.')
else:
logging.info('connection exists, will delete it before create.')
session.delete(exists_conn)
finally:
session.add(conn)
session.commit()
session.close()

dag = DAG(
dag_id='create_conn',
schedule_interval='@once',
)

for connection in conf.get('connection'):
crt_conn = PythonOperator(
task_id='create_conn_{}'.format(connection.get('conn_id')),
pyhton_callable=crt_airflow_conn,
op_kwargs={'conf': connection},
provide_context=False,
dag=dag,
)

用户创建

  • 通过Airflow UI页面进行创建Airflow -> Admin -> User,目前创建补支持密码

  • 使用cli命令行进行用户创建airflow create_user -r <ROLE> -u <USERNAME> -e <EMAIL> -p <PASSWORD>

  • 通过自定义脚本实现,将如下脚本放到AIRFLOW_HOME中,当需要创建用户的时候可以运行脚本进行交互式的创建

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    import getpass

    import airflow
    from airflow import models, settings
    from airflow.contrib.auth.backends.password_auth import PasswordUser

    IS_CORRECT = "Y"
    HINT_THIS_SCRIPT = "\nhint!!\n==> YOU RUN THIS SCRIPT TO CREATE AIRFLOW USER NOW\n"
    HINT_USER = "Please enter username you want to create: "
    HINT_EMAIL_WITH_USER = "Please enter email for user `{username}`: "
    HINT_PASSWORD_WITH_USER = "Please enter password for user `{username}`: "
    HINT_CONFIRM_USER_PASSWORD = "\nhint!! > you want to add user `{username}` with email `{email}`\n" \
    "enter 'Y/y' to confirm the information\nor enter other key to reinput information\n>> "

    user = PasswordUser(models.User())

    while True:
    print(HINT_THIS_SCRIPT)
    user.username = input(HINT_USER)
    user.email = input(HINT_EMAIL_WITH_USER.format(username=user.username))
    user.password = getpass.getpass(HINT_PASSWORD_WITH_USER.format(username=user.username))
    correct = input(HINT_CONFIRM_USER_PASSWORD.format(username=user.username, email=user.email))

    if correct.strip().upper() == IS_CORRECT:
    break

    session = settings.Session()
    session.add(user)
    session.commit()
    session.close()

DAG开发流程

调试

  • airflow创建及调试的顺序
    • 上传/更新DAG文件
    • 检测语法有没有错误python <文件名>
    • 使用airflow test运行单个taskairflow test DAG_ID TASK_ID execute_date

Airflow operator

这里记录几个Airflow里面常用但是比较难理解的operator

  • BranchPythonOperator: 通过不同的情况运行对应的下游task.通过python_callable参数的返回值确定下游要运行的task,返回值的名称就是要运行task的task_id

docker-airflow

update at 2019-04-05: 已经在我的仓库中创建了zhongjiajie/docker-airflow定期将puckel/docker-airflow中优秀的PR合并到master,上面还有一套我自己使用的环境branch-custom

较常用的镜像是puckel/docker-airflow,这个镜像维护人的热度不高,且airflow官方的docker进行进行,后期可能会不使用这个版本.下面说明他可能存在的问题

  • 将数据库从postgresql切换到mysql:按照airflow官网的方式直接增加AIRFLOW__CORE__SQL_ALCHEMY_CONN变量没有效果,因为这个repo的scripts/entrypoint.sh有一句AIRFLOW__CORE__SQL_ALCHEMY_CONN="postgresql+psycopg2://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST:$POSTGRES_PORT/$POSTGRES_DB"指定了数据库的类型和链接信息,即使你在docker-compose指定了AIRFLOW__CORE__SQL_ALCHEMY_CONN也会被scripts/entrypoint.sh覆盖掉,目前比较可取的方法灵感来自是这个issue的答案,将AIRFLOW__CORE__SQL_ALCHEMY_CONN="postgresql+psycopg2://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST:$POSTGRES_PORT/$POSTGRES_DB"改成: "${AIRFLOW__CORE__SQL_ALCHEMY_CONN:="postgresql+psycopg2://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST:$POSTGRES_PORT/$POSTGRES_DB"}"

FAQ

  • 自定义operator后发现不能运行但是代码没有问题,有可能是自定义operator中的参数和Airflow内部变量的参数同名,如同ariflow date error中的原因,就是因为子自定义的Operator中定义了一个start_date变量,并把变量声明成template_fields导致的错误
  • airflow schedule_interval设置了@once之后dag一直hung,是因为airflow.cfg中的catchup_by_default设为了True,或者DAG默认参数设为了True,解决上面的问题,只要设置会正确值重启就可.has usage of @once for scheduler interval changed in v1.9

Ref