Airphin调度系统迁移工具

联合创作 · 2023-09-29 14:28

Airphin 是白鲸开源研发的调度系统迁移工具迁移工具, 目的是将 Apache Airflow DAGs 文件转换成 Apache DolphinScheduler Python SDK 定义文件,从而实现用户将调度系统(Workflow orchestration)从 Airflow 迁移到 DolphinScheduler 的目的。

它是一个基于多规则的 AST 转换器,使用 LibCST 来解析和转换 Airflow 的 DAG 代码,其全部规则使用 Yaml 文件定义,并提供了一定的自定义规则扩展能力。

例子

我们通过一个简单的例子,来说明如何使用 Airphin 的。我们截取了 airflow tutorial.py 中的部分代码作为 Airphin 转化的例子,来说明 Airphin 如何逐步完成转化成 dolphinscheduler python sdk。

图 1:airflow tutorial.py 中的部分代码

图 2:Airphin 如何逐步完成转化成 dolphinscheduler python sdk

假设将 airflow tutorial.py 部分内容保存至文件 tutorial_part.py,想要将其转化成 dolphinscheduler python sdk 定义,只需要一行命令就能完成。结果如图 2 所示,因为命令增加了 --inplace 参数,所以 Airphin 会直接将原文件覆盖,如果不需要覆盖原问题,可以不使用 --inplace 参数,Airphin 会新增一个 tutorial_part-airphin.py 文件来保存转化后的内容。

airphin migrate --inplace tutorial_part.py

通过观察,我们发现这次转化分别触发了多条转化规则,包括

  • 将 airflow.DAG 转换成 pydolphinscheduler.core.process_definition.ProcessDefinition,这个规则在第三行(import 语句)以及第六行 DAG context

  • 将 airflow.operators.bash.BashOperator 转换成 pydolphinscheduler.tasks.shell.Shell,这个规则在任务 t1,t2 中都被使用

  • 除了对应的类转化之外,我们需要将类的属性进行转化,如将 airflow.DAG.schedule_interval 转换成了 ProcessDefinition.schedule,同时修改了部分值的内容,如将 timedelta (days=1) 转成 '0 0 0 * * ? *'

最后,我们只需要安装 pydolphinscheduler ,并且将转化后的文件通过 python 运行,就能完成工作流的迁移了,详见 pydolphinscheduler 使用 (https://dolphinscheduler.apache.org/python/main/start.html#installing-pydolphinscheduler)。

# 安装 apache-dolphinschedulerpython -m pip install apache-dolphinscheduler# 将工作流提交到 dolphinschedulerpython tutorial_part.py

在运行 python tutorial_part.py 时,需要保证 dolphinscheduler API 和 python gateway 服务已经启动,并且开放了对应的端口,详见启动 python gateway service (https://dolphinscheduler.apache.org/python/main/start.html#start-python-gateway-service)。

至此,我们通过一个简单的例子,说明了 Airphin 是如何完成迁移的。

浏览 11
点赞
评论
收藏
分享

手机扫一扫分享

编辑 分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

编辑 分享
举报