Airflow 基础 | Airflow 基础系列-01:Airflow组件

HBase技术社区

共 3438字,需浏览 7分钟

 ·

2021-12-27 17:05


Apache Airflow 微信公众号是由一群Airflow的爱好者一起维护的,我们旨在普及Airflow知识,给广大Airflow用户搭建一个交流的平台。


前期我们会发布一些Airflow的基本知识文章。本文是Apache Airflow 公众号的开篇之作,主要介绍Airflow的整体架构以及主要的组件。


Airflow 架构


下图是Airflow的架构图,

我们可以看到Airflow主要有4个组件:

  • WebServer

  • Scheduler

  • Executor

  • Database


WebServer - Airflow UI


WebServer 实际上是一个Python的Flask app。你可以在Airflow WebServer的UI界面上看到调度作业的状态。作业的信息都存储在数据库里,WebServer负责查询数据库并在页面上展示作业信息。WebServer 组件也负责读取并展示Remote的作业日志(Airflow的作业日志可以存放在S3,Google Cloud Storage,AzureBlobs,ElasticsSearch等等)。


Scheduler - 作业调度器


Scheduler 是一个多线程的Python进程。Scheduler 通过检查DAG的task依赖关系以及数据库里各个task的状态来决定接下来跑哪个task,什么时候跑以及在哪里跑。


Executor - 作业执行器


Airflow 支持以下4种类型的 Executor

  • SequentialExecutor 按照线性的方式运行task,没有并发和并行。一般用在开发环境里用。

  • LocalExecutor 支持并行和多行程,一般用在单节点的Airflow里。

  • CeleryExecutor 是在分布式环境下执行器。但是依赖第三方的message queue组件来调度task到worker节点,message queue可以用Redis,RabbitMQ。

  • KubernetesExecutor 是 Airfow 1.10新引入的一个执行器,主要用在K8s环境里。


Metadata Database - 元数据数据库


元数据数据库可以是任何支持 SQLAlchemy的数据库(比如Postgres,MySql)。Scheduler 通过修改数据库来更新task状态,WebServer会读取数据库来展示作业状态





Airflow 是如何调度的


  1. Schduler 会扫描dags文件夹,在元数据数据库里创建DAG对应的记录。根据配置,每一个DAG都会分配若干个进程。

  2. 每个进程都会扫描对应的DAG文件,根据调度配置参数创建DagRuns。然后每一个满足被调度条件的Task都会实例出来一个TaskInstance,TaskInstance会被初始化为Scheduled的状态,并在数据库里更新。

  3. Scheduler 进程查询数据库拿到所有Scheduled状态的tasks,并把他们发送到Executor(对应TaskInstance的状态更新为QUEUED)

  4. Worker会从queue里拉取task并执行。TaskInstance的状态由QUEUED转变为 RUNNING

  5. 当一个task结束了,worker会把task状态更新为对应的结束状态(FINISHED,FAILED,等等),Scheduler会在数据库里更新对应的状态。


# https://github.com/apache/incubator-airflow/blob/2d50ba43366f646e9391a981083623caa12e8967/airflow/jobs.py#L1386
def _process_dags(self, dagbag, dags, tis_out): """ Iterates over the dags and processes them. Processing includes: 1. Create appropriate DagRun(s) in the DB.
2. Create appropriate TaskInstance(s) in the DB.
3. Send emails for tasks that have missed SLAs.
:param dagbag: a collection of DAGs to process :type dagbag: models.DagBag :param dags: the DAGs from the DagBag to process :type dags: DAG :param tis_out: A queue to add generated TaskInstance objects :type tis_out: multiprocessing.Queue[TaskInstance] :return: None """ for dag in dags: dag = dagbag.get_dag(dag.dag_id) if dag.is_paused: self.log.info("Not processing DAG %s since it's paused", dag.dag_id) continue
if not dag: self.log.error("DAG ID %s was not found in the DagBag", dag.dag_id) continue
self.log.info("Processing %s", dag.dag_id)
dag_run = self.create_dag_run(dag) if dag_run: self.log.info("Created %s", dag_run) self._process_task_instances(dag, tis_out) self.manage_slas(dag)
models.DagStat.update([d.dag_id for d in dags])



Airflow 组件配置


配置Airflow 组件之间的交互是同一个airflow.cfg 文件控制的,这个文件里本身有对各种配置的说明文档,这里我们介绍一些常用的配置项:

Parallelism

以下3个参数可以用来控制task的并行度。

 parallelismdag_concurrency and max_active_runs_per_dag 

parallelism是指airflow executor最多运行的task数。dag_concurrenty 是指单个dag最多运行的task数。

max_active_runs_per_dag 是指最多的dag运行实例。

Scheduler

job_heartbeat_sec 是指task接受外部kill signal的频率(比如你在airflow web页面kill task),默认是5秒钟

scheduler_heartbeat_sec 是指scheduler 触发新task的时间隔间,默认是5秒。

还有更多配置会在后面的文章里介绍,也希望大家多多支持Airflow公众号,后续我们更新更多有关Airflow的文章。

本文翻译 https://www.astronomer.io/guides/airflow-components


浏览 159
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报