Airflow 基础 | Airflow 基础系列-03:Operators 介绍

HBase技术社区

共 2330字,需浏览 5分钟

 ·

2021-12-27 17:06

本文翻译 https://www.astronomer.io/guides/what-is-an-operator

Operators

Operators是DAG的主要构成部分,是DAG中封装执行逻辑的基本单元。

如果在DAG中创建一个operator的实例,并提供其所需参数,就构成了一个task。一个DAG通常由很多task以及task之间的依赖关系构成。当Airflow根据execution_date执行task时,就会产生一个task instance。

BashOperator

t1 = BashOperator(
task_id='bash_hello_world',
dag=dag,
bash_command='echo "Hello World"'
)

t1

该BashOperator执行了一个bash命令echo "Hello World"


PythonOperator

def hello(**kwargs):
print('Hello from {kw}'.format(kw=kwargs['my_keyword']))

t2 = PythonOperator(
task_id='python_hello',
dag=dag,
python_callable=hello,
op_kwargs={'my_keyword': 'Airflow'}
)

PythonOperator调用代码中定义的hello函数, 并通过PythonOperatorop_kwargs参数传入函数所需参数。该task运行后将会在控制台输出"Hello from Airflow" 。


PostgresOperator

t3 = PostgresOperator(
task_id='PythonOperator',
sql='CREATE TABLE my_table (my_column varchar(10));',
postgres_conn_id='my_postgres_connection',
autocommit=False
)

该PostgresOperator向postgres数据库执行一条sql语句,数据库连接信息存在名为my_postgres_connection的airflow connection中。如果看PostgresOperator的代码,会发现它实际上通过PostgresHook实现与数据库交互。


SSHOperator

t4 = SSHOperator(
task_id='SSHOperator',
ssh_conn_id='my_ssh_connection',
command='echo "Hello from SSH Operator"'
)

与BashOperator类似,SSHOperator也是运行bash命令,只不过它是通过SSH到远端主机执行bash命令。

用于对远程服务器进行身份验证的私钥存储在Airflow Connection中,名为my_ssh_conenction,它可以在任意DAG中引用,所以在使用SSHOperator时只需要关心bash命令是什么。

SSHOperator是通过SSHHook实现建立SSH连接和运行命令。


S3ToRedshift Operator

t5 = S3ToRedshiftOperator(
task_id='S3ToRedshift',
schema='public',
table='my_table',
s3_bucket='my_s3_bucket',
s3_key='{{ ds_nodash }}/my_file.csv',
redshift_conn_id='my_redshift_connection',
aws_conn_id='my_aws_connection'
)

S3ToRedshiftOperator通过Redshift的COPY命令实现从S3加载数据到Redshift,它属于Transfer Operators,这一类Operator用于从一个系统传输数据到另外一个系统。注意S3ToRedshiftOperator的参数中有两个Airflow connection,一个是Redshift的连接信息,一个是S3的连接信息。

这里也用到了另外一个概念 - templates。在参数s3_key处,有个jinja模板符号{{ }} ,其中传入的ds_nodash是Airflow引擎传入模板的预定义变量,其值为DAG Run的逻辑执行日期,nodash 表示日期格式为没有破折号的格式。所以s3_key传入的值依赖于具体运行的时间,如下是个样例值20190711/my_file.csv

templates适合于需要传入动态变化的参数的场景,也能使DAG代码保证幂等(比如上述代码中产生的s3文件都根据各自的日期命名,从而保证了幂等性)。




浏览 172
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报