Airflow 出自 Airbnb,是一款强大的工作流管理平台。在这个平台下 ,可以很快速的构建一个 Data Pipeline。然而,如果想保证 Airflow 在生产环境中较高的可用性,还是需要做一些工作提升系统的容错能力。
可以将 DAG 视做一组相对独立的的任务集合,DAG 定义了任务集合的执行方式,一个好的 Data Pipeline 需要可读、易运维,更需要保证灾难恢复的时效。一个完整的工作流执行需要 5 个小时执行,如果执行中途偶遇故障,重新执行需要多久?对这个时间的保证,应该在 DAG 设计之初成为底线,贯穿始终。我们来看下面的例子:
1、Extract.Feature 完成基础特征抽取,每天凌晨 3 点 30 执行。
2、Extract.Click 完成点击分析,Schedule 为 None。
3、EXtract.Download 将数据从 ODPS 下载到本地,转存为 CSV 文件,Schedule 为 None。
上述三个 DAG 其实是一个相关的业务组,在实现中根据业务相关性,将其拆分 ,并通过 TriggerDagRunOperator 管理依赖关系。
先看一个 DAG 的代码定义,及其 Graph View:
这个 DAG 有一定的复杂度,Airflow 调度多台机器协同计算,完成一个机器学习项目的离线推理过程,Airflow 部署在其中一个机器,或通过 SSH 的方式向其他机器发送命令,或在本地处理。经过层层优化,所有机器加起来,刚好在规定时间内完成计算任务。
图中三个失败的 sort_task 让这张图显得不够完美,但事发之时,我也没有丝毫担心,因为 sort_task 的位置决定了他的状态无障大局,使用 Airflow 提供的命令行工具可以轻松回补。
以 Scoring.Run.train.0 为例查看路径 [scoreing_run_task,sort_task,raw_to_odps_task] 上各节点的用时占比:
从时间上看,scoreing_run_task 无疑是重点,而所有的 scoreing_run_task 的实例则是 DAG 的核心路径:
在任务编排时,将 sort_task(在上图中是:Scoring.Run.Sort.train.4.6)抽离到 Airflow 所在节点,时间上与 scoreing_run_task 的后序实例(在上图是:Scoring.Run.train.4.7)保持并发,这种编排在最大程度上保证任务用时,同时避免 sort_task 产生意外殃及池鱼。
在上一小节的示例 DAG 中,有三个处于错误状态的任务,错误的原因也很巧合,与本例有关:
大概看了一眼便明白,sort_task 并发量过高(取决于前序任务的并发量)导致内存不足,另外,同时在运行的另外一个 DAG 也有一类消耗内存的任务在多并发执行。在代码层级,我们很难以一种经济的手段去控制不同任务的的并发量,这种情况就像明星无法避免在红毯上与人尴尬的撞衫……
了解之后发现 Airflow 在 BaseOperator 层实现了并发控制。 打开 UI,在 Admin >> Pools 菜单中,添加一个 key,设置一个合适的 Slots:
接下来分别在不同的 DAG 中修改任务。事实上,Airflow(非集群模式下)能做的也只是并发量的控制,至于每个函数实现到底用多少资源,还是需要自己控制。但对我而言,内存不足的问题已经完美解决。
“除非确定,否则别轻易关闭”,这些参数可以放到全局的 default_args 中,邮件通知第一时间让我获得的异常信息。如果手速快,在处理完异常之后,系统可能已经自愈了。
关于重试,需要提一下 SubDag,如果 SubDag 中的某个任务发生了错误,那么重试不是从当前错误的任务开始。他会从头再来,从本质上看,SubDag 其实也是一个 Task。
exit(0) 代表的唯一意义是 业务层的成功,在使用 BashOperator 时最容易遭遇 exit(0) 陷阱,直接的后果是所有的任务看起来成功,但事实上结果不对。
核对日志会发现,在不起眼的地方,产生了不起眼的异常,而这个异常被不正确的 exit(0) 掩盖掉了,避免的方式也很简单:
1、command1 && command2 && ... ,
2、在 bash 头部添加: set -e
我们使用 Airflow 的经验有限, 相关总结难免有疏漏和不足之处, 也希望和大家一起学习成长。
作者介绍
云脑科技高级软件研发工程师 田间
飞信社区 BI 团队负责人, 负责搭建商业智能平台, 为飞信社区数据化运营提供决策支持, 完成飞信好友推荐系统的孵化开发。
翻东西数据团队架构师, 负责电商数据的抓取及挖掘。
前融云架构师,负责公司大数据平台及DevOps系统。
在大数据及分布式系统领域有着丰富的实战经验。