Doramagic 项目包 · 项目说明书

airflow 项目

生成时间:2026-05-13 22:31:12 UTC

Airflow简介与核心概念

Apache Airflow是一个以编程方式创作、调度和监控工作流的平台。它允许用户使用Python代码定义、调度和执行复杂的批处理工作流。Airflow采用分布式架构,支持大规模工作流编排,是数据工程和MLOps领域广泛使用的开源工作流管理平台。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 DAG定义

继续阅读本节完整说明和来源证据。

章节 DAG属性

继续阅读本节完整说明和来源证据。

章节 任务类型

继续阅读本节完整说明和来源证据。

概述

Apache Airflow是一个以编程方式创作、调度和监控工作流的平台。它允许用户使用Python代码定义、调度和执行复杂的批处理工作流。Airflow采用分布式架构,支持大规模工作流编排,是数据工程和MLOps领域广泛使用的开源工作流管理平台。

资料来源:README.md

核心架构

Airflow采用主从架构,主要包含以下组件:

组件功能说明
Scheduler调度器负责调度DAG运行,将任务分发给执行器
Executor执行器实际执行任务,支持多种类型
Web ServerWeb服务器提供UI界面用于监控和管理
Worker工作进程执行具体任务的进程
Metadata Database元数据库存储DAG、任务、执行状态等元数据

资料来源:airflow-core/src/airflow/cli/cli_config.py:1-100

graph TD
    A[用户编写的DAG定义] --> B[Scheduler]
    B --> C[Executor]
    C --> D[Worker]
    D --> E[Metadata Database]
    B --> E
    F[Web Server] --> E
    G[用户通过UI/API] --> F

DAG有向无环图

DAG(Directed Acyclic Graph)是Airflow的核心概念,代表工作流的结构定义。

DAG定义

from airflow import DAG
from datetime import datetime

with DAG(
    dag_id='my_first_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False
) as dag:
    task1 = BashOperator(task_id='task_1', bash_command='echo Hello')
    task2 = BashOperator(task_id='task_2', bash_command='echo World')
    task1 >> task2

资料来源:airflow-core/src/airflow/models/dag.py

DAG属性

属性类型描述
dag_idstrDAG的唯一标识符
start_datedatetimeDAG开始日期
schedule_intervalstr/timedelta调度间隔
catchupbool是否补跑历史数据
max_active_runsint最大同时运行数
is_pausedboolDAG是否暂停

任务与任务实例

任务类型

Airflow支持多种任务类型:

任务类型说明
BashOperator执行Bash命令
PythonOperator执行Python函数
Sensor等待特定条件满足
TaskGroup任务组容器

资料来源:airflow-core/src/airflow/models/taskinstance.py

任务实例状态

stateDiagram-v2
    [*] --> queued: 任务创建
    queued --> scheduled: Scheduler调度
    scheduled --> running: Worker接收
    running --> success: 执行成功
    running --> failed: 执行失败
    running --> upstream_failed: 上游任务失败
    success --> [*]
    failed --> [*]
    upstream_failed --> [*]

资料来源:airflow-core/src/airflow/_shared/state/__init__.py

执行器类型

Airflow支持多种执行器,用于在不同环境中执行任务:

执行器说明适用场景
LocalExecutor本地并行执行开发/测试环境
SequentialExecutor顺序执行单任务场景
CeleryExecutor分布式执行生产环境大规模任务
KubernetesExecutorK8s Pod执行云原生环境

资料来源:airflow-core/src/airflow/cli/cli_config.py

CLI命令行接口

Airflow提供丰富的CLI命令用于管理DAG和任务:

命令功能
airflow dags list列出所有DAG
airflow dags list-runs列出DAG运行记录
airflow tasks list列出DAG中的任务
airflow tasks test测试单个任务
airflow connections list列出连接
airflow variables list列出变量

资料来源:airflow-core/src/airflow/cli/cli_config.py

常用命令示例

# 列出所有DAG
airflow dags list

# 触发DAG手动运行
airflow dags trigger my_dag_id

# 暂停/恢复DAG
airflow dags pause my_dag_id
airflow dags unpause my_dag_id

# 查看任务实例状态
airflow tasks list my_dag_id

连接与变量

连接管理

Airflow提供连接(Connection)功能用于存储外部系统访问凭证:

from airflow.models import Connection
import json

conn = Connection(
    conn_id='my_postgres_conn',
    conn_type='postgres',
    host='localhost',
    schema='airflow_db',
    login='user',
    password='password',
    port=5432
)

资料来源:airflow-core/src/airflow/utils/db.py

支持的连接类型

连接类型说明
postgresPostgreSQL数据库
mysqlMySQL数据库
google_cloud_platformGCP云平台
httpHTTP连接
sshSSH连接
s3AWS S3存储

配置管理

airflow.cfg主要配置项

配置节配置项说明
coredags_folderDAG文件存放路径
coreexecutor执行器类型
corecatchup_by_default默认是否补跑
schedulernum_runs调度器循环次数
webserverweb_server_hostWeb服务器地址
databasesql_alchemy_conn数据库连接字符串

资料来源:airflow-core/src/airflow/cli/cli_config.py

工作流执行流程

graph LR
    A1[DAG定义加载] --> A2[解析DAG结构]
    A2 --> A3[Scheduler创建DAG Run]
    A3 --> A4[任务排队]
    A4 --> A5[Executor分发任务]
    A5 --> A6[Worker执行任务]
    A6 --> A7[更新任务状态]
    A7 --> A8[记录到元数据库]
    A8 --> A9[Web UI展示状态]

最佳实践

DAG编写规范

  1. 任务唯一性:每个任务的task_id在DAG内必须唯一
  2. 正确设置start_date:避免使用动态时间作为start_date
  3. 合理的retry配置:为关键任务配置重试策略
  4. 使用描述性命名:使用清晰的DAG和任务命名

调度配置建议

配置项建议值说明
catchupFalse生产环境建议关闭补跑
max_active_runs1-5根据任务复杂度调整
schedule_interval@daily/@hourly根据业务需求选择
concurrency16-32根据Worker能力设置

总结

Apache Airflow作为强大的工作流编排平台,提供了完整的DAG定义、调度执行、监控告警等功能。理解其核心概念(DAG、Task、Executor、Connection)是有效使用Airflow的基础。通过合理的配置和最佳实践,可以构建稳定可靠的数据管道和工作流系统。

资料来源:[README.md](https://github.com/apache/airflow/blob/main/README.md)

快速开始与安装指南

Apache Airflow 是一个开源的工作流编排平台,用于编程创建、调度和监控批处理工作流。本指南将帮助您快速上手安装和配置 Airflow。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 基础安装

继续阅读本节完整说明和来源证据。

章节 安装额外依赖

继续阅读本节完整说明和来源证据。

章节 供应商包安装

继续阅读本节完整说明和来源证据。

系统要求

在安装 Apache Airflow 之前,请确保您的系统满足以下基本要求:

要求项最低版本说明
Python3.9+Airflow 核心功能需要
pip最新版本Python 包管理器
操作系统Linux, macOS, Windows (WSL2)生产环境推荐使用 Linux
数据库SQLite 3.15+, PostgreSQL 14+, MySQL 8.0+元数据库存储
内存4GB RAM最小需求,生产环境更高

资料来源:INSTALLING.md:1-50

安装方式概览

Airflow 支持多种安装方式,您可以根据实际需求选择合适的方案:

graph TD
    A[安装 Airflow] --> B{使用场景}
    B -->|快速测试| C[pip 直接安装]
    B -->|容器化部署| D[Docker 镜像]
    B -->|深度开发| E[从源码编译]
    C --> F[使用约束文件安装]
    D --> G[官方镜像或自定义]
    E --> H[Breeze 开发环境]

资料来源:generated/PYPI_README.md:1-30

使用 pip 安装

基础安装

使用 pip 安装 Airflow 的标准方式是通过 pip install 命令配合约束文件:

pip install 'apache-airflow==3.2.0' \
 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.2.0/constraints-3.10.txt"

约束文件确保所有依赖项的版本兼容,避免因依赖冲突导致的安装失败。约束文件根据 Python 版本不同而有所区别。

资料来源:INSTALLING.md:50-80

安装额外依赖

Airflow 提供了丰富的额外依赖包(extras),可按需安装:

# 安装包含 PostgreSQL 和 Google Cloud 支持的版本
pip install 'apache-airflow[postgres,google]==3.2.0' \
 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.2.0/constraints-3.10.txt"

常用额外依赖包包括:

额外包名说明
postgresPostgreSQL 数据库支持
mysqlMySQL 数据库支持
googleGoogle Cloud Platform 集成
amazonAWS 集成
kubernetesKubernetes Executor 支持
sentrySentry 错误追踪集成

资料来源:generated/PYPI_README.md:40-60

供应商包安装

除了核心 Airflow 包外,您还可以单独安装各种供应商(Provider)包:

# 从 PyPI 安装供应商包
pip install apache-airflow-providers-google

# 从源码安装供应商包
pip install /path/to/apache-airflow-providers-google*.whl

安装后需要验证文件完整性:

shasum -a 512 package-name-version.tar.gz | diff - package-name-version.tar.gz.sha512

资料来源:devel-common/src/sphinx_exts/includes/installing-providers-from-sources.rst:1-50

Docker 安装

官方镜像

Airflow 提供了官方 Docker 镜像,可以快速启动完整的工作环境:

# 拉取最新稳定版镜像
docker pull apache/airflow:latest

# 使用 Celery Executor 启动
docker run -d -p 8080:8080 \
  -e AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://user:pass@host/db \
  apache/airflow:latest

资料来源:Dockerfile:1-30

Docker Compose 方式

生产环境推荐使用 Docker Compose 进行部署,Airflow 项目提供了标准化的 docker-compose.yaml:

version: '3'
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
  
  airflow-webserver:
    image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:latest}
    command: webserver
    ports:
      - "8080:8080"
    depends_on:
      - postgres

构建自定义镜像

使用 Breeze 工具构建生产镜像:

# 使用 breeze 构建生产镜像
breeze prod-image build \
  --installation-method sources \
  --python 3.10 \
  --debian-version bullseye

资料来源:dev/breeze/src/airflow_breeze/commands/production_image_commands.py:1-50

从源码安装

克隆仓库

git clone https://github.com/apache/airflow.git
cd airflow

使用 Breeze 开发环境

Breeze 是 Airflow 官方提供的开发环境管理工具,提供完整的开发、测试和构建体验:

# 进入 breeze shell
./breeze shell

# 运行测试
./breeze testing python --test-type unit-tests --package-filter airflow-core

Breeze 使用 Docker 容器化方式,确保开发环境与生产环境的一致性。

资料来源:airflow-core/src/airflow/_vendor/README.md:1-20

编译分发包

构建 Airflow 分发包:

# 使用 Breeze 构建 sdist 或 wheel 包
breeze release-management build-rc \
  --airflow-constraints-mode="constraints" \
  --distribution-format both

编译后的包位于 dist/ 目录下。

初始化数据库

安装完成后,需要初始化 Airflow 的元数据库:

# 初始化数据库
airflow db init

# 创建管理员用户
airflow users create \
  --username admin \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email [email protected]

启动 Airflow

Web 服务器

airflow webserver --port 8080

调度器

在另一个终端启动调度器:

airflow scheduler

独立模式

对于快速测试,可以使用独立模式:

airflow standalone

该命令会启动一个包含 Web 服务器和调度器的单进程环境。

验证安装

运行示例 DAG

Airflow 自带多个示例 DAG,可用于验证安装:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG('example_dag', start_date=datetime(2024, 1, 1)) as dag:
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date'
    )

资料来源:airflow-core/src/airflow/example_dags/tutorial.py:1-40

检查状态

# 查看 Airflow 版本
airflow version

# 查看已加载的连接
airflow connections list

# 查看已安装提供程序
airflow providers list

快速安装流程图

graph TD
    A[开始安装] --> B{选择安装方式}
    B -->|pip| C[安装 Python 依赖]
    B -->|Docker| D[安装 Docker]
    B -->|源码| E[克隆代码仓库]
    C --> F[初始化数据库]
    D --> G[拉取镜像]
    E --> H[安装依赖]
    F --> I[启动服务]
    G --> I
    H --> I
    I --> J[创建管理员账户]
    J --> K[访问 Web UI]
    K --> L[运行示例 DAG]

常见问题排查

依赖冲突

如果遇到依赖冲突问题,确保使用正确的约束文件:

pip install apache-airflow==<VERSION> \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-<VERSION>/constraints-<PYTHON_VERSION>.txt"

数据库连接问题

检查数据库连接字符串配置:

[database]
sql_alchemy_conn = postgresql+psycopg2://user:password@localhost/airflow

端口占用

如果 8080 端口被占用,可更换端口:

airflow webserver --port 8888

后续步骤

安装成功后,建议进行以下操作:

  1. 配置连接:设置数据库、API 密钥等连接信息
  2. 配置 Executor:根据需求选择 Executor 类型(Local、Celery、Kubernetes)
  3. 配置告警:设置邮件或其他告警渠道
  4. 学习 DAG 编写:参考官方教程编写第一个生产 DAG
  5. 了解最佳实践:学习 Airflow 的安全和性能最佳实践

相关文档链接

  • 官方安装文档:https://airflow.apache.org/docs/apache-airflow/stable/installation/index.html
  • Docker 镜像:https://github.com/apache/airflow/blob/main/Dockerfile
  • Breeze 开发工具:https://github.com/apache/airflow/blob/main/dev/breeze/README.md

资料来源:[INSTALLING.md:1-50](https://github.com/apache/airflow/blob/main/INSTALLING.md)

系统架构详解

Apache Airflow 是一个开源的工作流编排平台,采用分布式架构设计,支持大规模任务调度与执行。Airflow 3.0 在架构上进行了重大升级,引入了更加模块化的组件设计,将 DAG 处理、任务调度和执行进行了清晰的分离。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 2.1 DAG 处理系统

继续阅读本节完整说明和来源证据。

章节 2.2 调度器系统

继续阅读本节完整说明和来源证据。

章节 2.3 任务执行系统

继续阅读本节完整说明和来源证据。

1. 概述

Apache Airflow 是一个开源的工作流编排平台,采用分布式架构设计,支持大规模任务调度与执行。Airflow 3.0 在架构上进行了重大升级,引入了更加模块化的组件设计,将 DAG 处理、任务调度和执行进行了清晰的分离。

核心架构由以下几个主要组件构成:

  • Web服务器:提供用户界面
  • 调度器(Scheduler):负责 DAG 调度和任务分发
  • DAG处理器(DagFileProcessorManager):负责解析和验证 DAG 文件
  • 执行器(Executor):负责实际任务执行
  • 元数据库:存储 DAG 定义、任务状态和执行历史

资料来源:airflow-core/docs/img/diagram_basic_airflow_architecture.py:1-50

2. 核心组件架构

2.1 DAG 处理系统

DAG 处理系统是 Airflow 的核心子系统之一,负责从文件系统读取、解析和验证 DAG 定义。

graph TD
    A[DAG 文件目录] --> B[DagFileProcessorManager]
    B --> C[文件扫描器]
    B --> D[文件处理器池]
    C -->|检测文件变更| D
    D --> E[Processor Subprocess]
    E --> F[DAG 对象]
    F --> G[Import Timeout Manager]
    G --> H[数据库更新]
    
    style B fill:#e1f5fe
    style D fill:#fff3e0
    style E fill:#e8f5e9

关键组件

组件文件位置职责
DagFileProcessorManagerdag_processing/manager.py管理 DAG 文件处理的生命周期
DagFileProcessordag_processing/processor.py执行单个 DAG 文件的解析
ImportTimeoutManagerdag_processing/manager.py监控 DAG 导入超时

资料来源:airflow-core/src/airflow/dag_processing/manager.py:1-100

#### 2.1.1 DAG 文件扫描

DagFileProcessorManager 维护一个文件扫描循环,定期检查配置的 DAG 目录中的文件变更:

# 扫描间隔配置
processor_poll_interval -> scheduler_idle_sleep_time

默认配置变更说明:

旧配置项新配置项说明
scheduler.processor_poll_intervalscheduler.scheduler_idle_sleep_time调度器空闲休眠时间
scheduler.create_cron_data_intervalsscheduler.create_cron_data_intervals默认值由 True 改为 False
scheduler.create_delta_data_intervalsscheduler.create_delta_data_intervals默认值由 True 改为 False

资料来源:airflow-core/src/airflow/cli/commands/config_command.py:30-80

2.2 调度器系统

调度器是 Airflow 的大脑,负责决定何时运行哪些任务。

graph TD
    A[SchedulerJobRunner] --> B[关键区域锁]
    B --> C{获取锁}
    C -->|成功| D[处理过期 DAG]
    C -->|失败| E[等待]
    D --> F[调度待处理任务]
    F --> G[发送任务到执行器]
    G --> H[更新任务状态]
    H --> I[释放锁]
    I --> B
    
    style A fill:#fff8e1
    style B fill:#ffecb3
    style G fill:#c8e6c9

#### 2.2.1 调度器关键流程

  1. 获取关键区域锁:确保多调度器环境下的安全性
  2. 处理过期 DAG:清理陈旧的 DAG 数据
  3. 调度任务:根据依赖关系和执行时间分发任务
  4. 孤儿任务处理:检测并处理孤立任务

调度器指标

指标名称类型说明
scheduler.critical_section_busycounter关键区域锁竞争次数
scheduler.orphaned_tasks.adoptedcounter被采纳的孤儿任务数
scheduler.orphaned_tasks.clearedcounter被清理的孤儿任务数
scheduler.tasks.killed_externallycounter外部杀死任务数

资料来源:airflow-core/src/airflow/_shared/observability/metrics/metrics_template.yaml:50-80

2.3 任务执行系统

任务执行通过 Task SDK 实现,采用远程进程通信模式。

graph LR
    A[任务实例] --> B[任务状态存储]
    A --> C[任务参数]
    A --> D[执行上下文]
    
    B -.->|get| E[...]
    B -.->|set| F[...]
    B -.->|delete| G[...]
    
    style A fill:#e3f2fd
    style B fill:#bbdefb

#### 2.3.1 任务状态管理

任务状态通过 TaskState 类进行管理,提供键值对存储:

class TaskState:
    def get(self, key: str) -> str | None
    def set(self, key: str, value: str) -> None
    def delete(self, key: str) -> None
    def clear(self, all_map_indices: bool = False) -> None
方法功能说明
get(key)获取值返回存储的值,键不存在返回 None
set(key, value)设置值写入或覆盖指定键的值
delete(key)删除值删除单个键,键不存在无操作
clear()清空删除所有 map indices 的状态

资料来源:task-sdk/src/airflow/sdk/execution_time/context.py:80-120

3. CLI 命令系统

Airflow 提供丰富的命令行接口,用于管理和操作各个组件。

3.1 命令分类

命令组功能
airflow dagsDAG 管理(backfill, list-runs, pause, unpause, test)
airflow tasks任务操作(run, failed-deps, render, test)
airflow connections连接管理
airflow providers提供者信息
airflow config配置查看
airflow db-manager数据库管理器

资料来源:airflow-core/src/airflow/cli/cli_config.py:50-150

3.2 任务命令详解

命令功能关键参数
airflow tasks run运行单个任务--dag-id, --task-id, --logical-date-or-run-id
airflow tasks render渲染任务模板--dag-id, --task-id, --logical-date-or-run-id
airflow tasks test测试任务(不记录状态)--dag-id, --task-id, --logical-date-or-run-id
airflow tasks failed-deps检查未满足依赖--dag-id, --task-id, --logical-date-or-run-id

测试命令特点

  • 不检查依赖关系
  • 不记录状态到数据库
  • 适用于快速验证任务逻辑

资料来源:airflow-core/src/airflow/cli/cli_config.py:150-200

4. 配置系统

4.1 关键配置变更(Airflow 2.x → 3.0)

配置项变更类型旧值新值
scheduler.catchup_by_default默认值变更TrueFalse
scheduler.create_cron_data_intervals默认值变更TrueFalse
scheduler.create_delta_data_intervals默认值变更TrueFalse
scheduler.processor_poll_interval重命名-scheduler_idle_sleep_time
scheduler.deactivate_stale_dags_interval重命名-parsing_cleanup_interval
scheduler.statsd_on重命名-metrics.statsd_on
scheduler.max_threads重命名-dag_processor.parsing_processes

资料来源:airflow-core/src/airflow/cli/commands/config_command.py:20-70

4.2 DAG Processing 配置

dag_processing:
  manager_stalls: Number of stalled DagFileProcessorManager
  processor_timeouts: DAG 处理超时次数
  dag_file_refresh_error: DAG 文件加载失败次数

5. 执行架构图

graph TD
    subgraph Web层
        UI[Web UI] 
    end
    
    subgraph 调度层
        Scheduler[Scheduler Job Runner]
        DagProcessor[DagFileProcessorManager]
        ImportTimeout[Import Timeout Manager]
    end
    
    subgraph 执行层
        Executor[Executor]
        Worker[Worker Process]
        TaskRunner[Task Runner]
    end
    
    subgraph 存储层
        DB[(Metadata DB)]
        DAGFS[DAG File System]
    end
    
    UI <-->|API| Scheduler
    Scheduler <-->|调度决策| Executor
    DagProcessor <-->|解析DAG| DAGFS
    DagProcessor <-->|写入| DB
    Executor <-->|分发任务| Worker
    Worker <-->|执行| TaskRunner
    TaskRunner <-->|状态更新| DB
    
    style Scheduler fill:#ff9800,color:#fff
    style DagProcessor fill:#4caf50,color:#fff
    style Executor fill:#2196f3,color:#fff

6. 监控指标

Airflow 提供全面的监控指标体系:

6.1 DAG 处理指标

指标描述
dag_processing.manager_stallsDagFileProcessorManager 停滞次数
dag_file_refresh_errorDAG 文件刷新错误数
dag_file_processor_timeoutsDAG 文件处理器超时(已废弃)

6.2 调度器指标

指标描述
scheduler.critical_section_busy调度器关键区域忙次数
scheduler.orphaned_tasks.adopted被采纳的孤儿任务
scheduler.orphaned_tasks.cleared被清理的孤儿任务
scheduler.tasks.killed_externally外部杀死任务数

6.3 任务执行指标

指标描述
ti.start任务启动次数
ti.finish任务完成次数

资料来源:airflow-core/src/airflow/_shared/observability/metrics/metrics_template.yaml:30-100

7. 总结

Apache Airflow 采用分布式架构设计,核心组件包括:

  1. DagFileProcessorManager:负责 DAG 文件的解析和验证
  2. SchedulerJobRunner:负责任务的调度和分发
  3. Executor:负责实际任务执行
  4. Task SDK:提供任务运行时的上下文管理

各组件通过消息队列和数据库进行通信,确保系统的高可用性和可扩展性。

资料来源:[airflow-core/docs/img/diagram_basic_airflow_architecture.py:1-50]()

核心组件详解

Apache Airflow 是一个开源的工作流编排平台,其核心组件构成了整个任务调度和执行的基础架构。这些组件包括DAG(有向无环图)、DAGRun(DAG运行实例)、Connection(连接)、Variable(变量)和Pool(资源池)。理解这些核心组件对于深入掌握Airflow的工作原理至关重要。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 什么是 DAG

继续阅读本节完整说明和来源证据。

章节 DAG 关键属性

继续阅读本节完整说明和来源证据。

章节 DAG 的生命周期

继续阅读本节完整说明和来源证据。

概述

Apache Airflow 是一个开源的工作流编排平台,其核心组件构成了整个任务调度和执行的基础架构。这些组件包括DAG(有向无环图)、DAGRun(DAG运行实例)、Connection(连接)、Variable(变量)和Pool(资源池)。理解这些核心组件对于深入掌握Airflow的工作原理至关重要。

DAG 模型

什么是 DAG

DAG(Directed Acyclic Graph,有向无环图)是Airflow中工作流定义的核心抽象。它代表了一组需要执行的任务以及任务之间的依赖关系。

DAG 关键属性

属性类型说明
dag_idstrDAG的唯一标识符
descriptionstrDAG的描述信息
schedule_interval-调度间隔配置
start_datedatetimeDAG开始执行的日期
end_datedatetimeDAG结束执行的日期(可选)
catchupbool是否补跑历史数据
max_active_runsint最大并发运行数
concurrencyint最大并发任务数
is_pausedboolDAG是否暂停

DAG 的生命周期

graph LR
    A[创建DAG] --> B[DAG解析]
    B --> C[调度器调度]
    C --> D[创建DAGRun]
    D --> E[执行任务]
    E --> F[DAGRun完成]
    F --> G{是否继续调度}
    G -->|是| C
    G -->|否| H[DAG结束]

DAGRun 模型

概述

DAGRun是DAG的一次具体执行实例。每次触发或调度DAG时,都会创建一个DAGRun实例来跟踪该次执行的状态。

DAGRun 状态

状态说明
QUEUED排队等待执行
RUNNING正在执行
SUCCESS成功完成
FAILED执行失败

DAGRun 类型

类型说明
SCHEDULED由调度器正常调度触发
MANUAL手动触发的运行
BACKFILL回填作业执行

核心属性

DAGRun包含以下关键属性用于追踪执行信息:

  • run_id: 每次运行的唯一标识
  • state: 当前运行状态
  • execution_date: 执行日期
  • start_date: 实际开始时间
  • end_date: 结束时间
  • triggering_user_name: 触发运行的用户名

Connection 连接管理

连接类型

Airflow支持多种连接类型,用于与外部系统进行交互:

连接类型说明默认配置
postgresPostgreSQL数据库需要host、schema、login、password
mysqlMySQL数据库需要host、schema、login、password
google_cloud_platformGCP服务使用默认schema
httpHTTP端点需要host地址
ftpFTP服务器需要host、login、password
sshSSH连接需要host、login、key_file
redisRedis缓存需要host、port
fs文件系统需要path配置
hive_cliHive CLI需要host、port、schema
hiveserver2HiveServer2需要host、schema、port
gremlinGremlin图数据库需要host、port
facebook_socialFacebook社交需要app_id、app_secret等
icebergIceberg表格式需要host配置

连接创建示例

Connection(
    conn_id="gcp_default",
    conn_type="google_cloud_platform",
    schema="default"
)

Variable 变量管理

变量概述

Variable用于存储和检索Airflow中的键值对配置信息,可在DAG和任务之间共享。

变量操作

操作说明命令
list列出所有变量airflow variables list
get获取变量值airflow variables get <key>
set设置变量airflow variables set <key> <value>
delete删除变量airflow variables delete <key>
import批量导入airflow variables import <file>
export批量导出airflow variables export <file>

JSON序列化支持

Variable支持JSON序列化,可存储复杂数据结构:

airflow variables set my_json '{"key": "value", "list": [1, 2, 3]}'

Pool 资源池管理

池的作用

Pool用于限制同时执行的任务数量,控制资源使用并管理并发度。

内置默认池

池名称默认槽位数说明
default_pool根据配置用于未指定池的任务

槽位管理

graph TB
    A[任务请求] --> B{槽位可用?}
    B -->|是| C[分配槽位]
    B -->|否| D[排队等待]
    C --> E[执行任务]
    E --> F[释放槽位]
    D --> G{槽位释放?}
    G -->|是| C
    G -->|否| D

CLI 命令行接口

可用命令组

Airflow提供了丰富的CLI命令用于管理核心组件:

命令组说明主要子命令
dagDAG管理list, details, list-runs, pause, unpause, backfill, test
dagrun运行管理list, trigger, clear
connections连接管理list, add, delete, edit
variables变量管理list, get, set, delete, import, export
pools池管理list, set
config配置查看list, get, show
providers提供者信息list, details
assets资产管理list, details, materialize

常用命令示例

# 列出所有DAG
airflow dags list

# 触发DAG运行
airflow dags trigger <dag_id>

# 暂停DAG
airflow dags pause <dag_id>

# 列出所有连接
airflow connections list

# 导出变量
airflow variables export variables.json

配置变更说明

Airflow 3.0 配置变化

从Airflow 2.x迁移到3.0时,以下配置项发生了变化:

旧配置项新配置项变化类型
scheduler.catchup_by_defaultscheduler.catchup默认值改为False
scheduler.create_cron_data_intervals-默认值改为False
scheduler.create_delta_data_intervals-默认值改为False
scheduler.processor_poll_intervalscheduler.scheduler_idle_sleep_time重命名
scheduler.deactivate_stale_dags_intervalscheduler.parsing_cleanup_interval重命名
scheduler.statsd_onmetrics.statsd_on重命名
scheduler.max_threadsdag_processor.parsing_processes重命名

catchup 行为变化

在 Airflow 3.0 中,catchup 的默认值为 False。这意味着未明确设置 catchup 参数的 DAG 默认不会进行历史数据补跑。如果DAG依赖补跑行为,需要在 airflow.cfgscheduler 部分将该配置设置为 True。资料来源:airflow-core/src/airflow/cli/commands/config_command.py

数据模型关系

erDiagram
    DAG ||--o{ DAGRun : creates
    DAGRun ||--o{ TaskInstance : contains
    TaskInstance ||--|| Pool : uses
    Connection ||--o{ TaskInstance : referenced_by
    Variable ||--o{ TaskInstance : accessed_by
    
    DAG {
        string dag_id
        string description
        string schedule_interval
        datetime start_date
        boolean is_paused
    }
    
    DAGRun {
        string run_id
        string state
        datetime execution_date
        datetime start_date
        datetime end_date
    }
    
    TaskInstance {
        string task_id
        string state
        datetime start_date
        datetime end_date
    }
    
    Pool {
        string pool
        int slots
        string description
    }
    
    Connection {
        string conn_id
        string conn_type
        string host
        string schema
    }
    
    Variable {
        string key
        string val
        string description
    }

安全最佳实践

连接凭证管理

  1. 使用Fernet加密: Airflow使用Fernet加密存储敏感连接凭证
  2. 轮换加密密钥: 定期执行 airflow rotate-fernet-key 更新加密密钥
airflow rotate-fernet-key

敏感信息处理

类型存储方式建议
密码加密存储使用变量或连接extra字段
API密钥加密存储使用Secret Backend
连接凭证加密存储使用连接管理界面

总结

Apache Airflow的核心组件构成了一个完整的工作流编排系统。DAG作为工作流定义的核心,依赖调度器生成DAGRun实例,而DAGRun则管理具体任务的执行。Connection、Variable和Pool等组件提供了与外部系统交互、配置管理和资源控制的能力。深入理解这些组件及其相互关系,对于熟练使用Airflow进行工作流开发和运维至关重要。

来源:https://github.com/apache/airflow / 项目说明书

执行器类型与选择

执行器(Executor)是 Apache Airflow 架构中的核心组件,负责实际执行任务实例。每个执行器实现了特定的任务调度和执行策略,从单机的本地执行到分布式的集群环境,Airflow 提供了多种执行器类型以满足不同场景的需求。执行器的选择直接影响工作流的性能、可扩展性和可靠性。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 核心组件关系

继续阅读本节完整说明和来源证据。

章节 执行器类型总览

继续阅读本节完整说明和来源证据。

章节 执行器加载器职责

继续阅读本节完整说明和来源证据。

概述

执行器(Executor)是 Apache Airflow 架构中的核心组件,负责实际执行任务实例。每个执行器实现了特定的任务调度和执行策略,从单机的本地执行到分布式的集群环境,Airflow 提供了多种执行器类型以满足不同场景的需求。执行器的选择直接影响工作流的性能、可扩展性和可靠性。

执行器系统遵循统一的接口规范,所有执行器都继承自基础执行器类,定义了任务提交、状态查询、心跳维护等标准方法。资料来源:airflow-core/src/airflow/executors/executor_loader.py:1-50

执行器架构

核心组件关系

graph TD
    DAG[DAG 有向无环图] --> Scheduler[调度器]
    Scheduler --> ExecutorLoader[执行器加载器]
    ExecutorLoader --> Executor[执行器实例]
    Executor --> TaskInstance[任务实例]
    Executor --> Worker[Worker 节点]
    
    Executor --> LocalExecutor[LocalExecutor]
    Executor --> CeleryExecutor[CeleryExecutor]
    Executor --> KubernetesExecutor[KubernetesExecutor]
    Executor --> SequentialExecutor[SequentialExecutor]
    
    LocalExecutor --> LocalWorker[本地进程]
    CeleryExecutor --> CeleryWorker[Celery Worker]
    KubernetesExecutor --> K8sPod[K8s Pod]

执行器类型总览

执行器类型说明适用场景并发能力
SequentialExecutor顺序执行器开发调试、单节点1
LocalExecutor本地执行器单机生产、小规模任务多进程
CeleryExecutorCelery 执行器分布式、跨机器集群规模
KubernetesExecutorK8s 执行器云原生、自动扩缩容Pod 数量
LocalKubernetesExecutor本地 K8s 执行器测试环境可配置
CeleryKubernetesExecutor混合执行器灵活调度集群规模

资料来源:airflow-core/src/airflow/executors/executor_loader.py:50-80

执行器加载机制

执行器加载器职责

ExecutorLoader 是 Airflow 中负责动态加载和解析执行器配置的中央组件。它支持多种执行器配置格式,包括简单的单执行器名称和复杂的团队别名配置。

执行器加载器的主要职责包括:

  1. 解析 airflow.cfg 中的执行器配置
  2. 验证执行器名称的有效性
  3. 实例化对应的执行器类
  4. 处理执行器别名和团队配置
# 执行器名称解析核心逻辑
if module_or_name in CORE_EXECUTOR_NAMES:
    executor_names_per_team.append(
        ExecutorName(
            alias=alias, 
            module_path=cls.executors[module_or_name], 
            team_name=team_name
        )
    )

资料来源:airflow-core/src/airflow/executors/executor_loader.py:80-120

执行器配置格式

执行器配置支持以下几种格式:

graph LR
    A[配置文件] --> B[执行器名称]
    B --> C[单执行器]
    B --> D[别名配置]
    B --> E[团队配置]
    
    C --> C1[LocalExecutor]
    C --> C2[CeleryExecutor]
    
    D --> D1[MyAlias:LocalExecutor]
    D1 --> D2[alias=MyAlias<br/>module=LocalExecutor]
    
    E --> E1[team:executor]
    E1 --> E2[team_name=team<br/>executor=executor]

#### 简单配置

executor = LocalExecutor

#### 别名配置

executor = MyLocalExecutor:LocalExecutor

格式为 别名:执行器类型,其中别名用于日志和监控标识。

#### 团队配置

[executors]
TeamA = TeamA: CeleryExecutor
TeamB = TeamB: KubernetesExecutor

资料来源:airflow-core/src/airflow/executors/executor_loader.py:100-150

核心执行器类型详解

SequentialExecutor

顺序执行器是最基础的执行器,以单线程顺序方式执行任务。此执行器主要用于以下场景:

  • 开发环境:在没有多进程支持的环境中运行
  • 调试场景:需要逐个追踪任务执行流程
  • 最小化部署:资源受限的单机环境
# SequentialExecutor 核心特性
- 单进程顺序执行
- 无并发能力
- 任务队列单一
- 无需外部依赖

资料来源:airflow-core/src/airflow/executors/executor_loader.py:150-180

LocalExecutor

本地执行器在单机环境下提供多进程并发执行能力。它使用 Python 的 multiprocessing 模块创建工作进程池,每个工作进程可以并行执行多个任务。

graph TD
    LocalExecutor --> Worker1[Worker Process 1]
    LocalExecutor --> Worker2[Worker Process 2]
    LocalExecutor --> WorkerN[Worker Process N]
    
    Worker1 --> Task1[Task Instance]
    Worker1 --> Task2[Task Instance]
    Worker2 --> Task3[Task Instance]
    WorkerN --> Task4[Task Instance]
    
    Worker1 -.-> Queue[任务队列]
    Worker2 -.-> Queue
    WorkerN -.-> Queue

配置参数

参数说明默认值
parallelism并行任务数上限CPU 核心数
local_worker_kwargs工作进程额外参数{}

资料来源:airflow-core/src/airflow/executors/executor_loader.py:180-220

CeleryExecutor

CeleryExecutor 是分布式执行器,使用 Celery 作为消息队列来处理跨多台机器的任务分发和执行。Celery 是 Python 生态中成熟的任务队列系统,支持 Redis、RabbitMQ 等多种消息代理。

graph TD
    Scheduler[Airflow Scheduler] -->|任务提交| CeleryBroker[Celery Broker<br/>Redis/RabbitMQ]
    CeleryBroker --> Worker1[Celery Worker 1]
    CeleryBroker --> Worker2[Celery Worker 2]
    CeleryBroker --> WorkerN[Celery Worker N]
    
    Worker1 -->|执行结果| ResultBackend[Result Backend]
    Worker2 -->|执行结果| ResultBackend
    WorkerN -->|执行结果| ResultBackend
    
    ResultBackend -->|状态同步| Scheduler

适用场景

  • 需要水平扩展的任务处理
  • 跨多台机器的分布式部署
  • 需要任务优先级和重试策略
  • 长时间运行的后台任务

依赖组件

  • Celery 消息代理(Redis 或 RabbitMQ)
  • Celery Worker 节点
  • 结果后端(数据库或缓存)

资料来源:providers/celery/src/airflow/providers/celery/executors/celery_executor.py:1-100

KubernetesExecutor

KubernetesExecutor 是原生运行在 Kubernetes 集群上的执行器,每个任务都在独立的 Pod 中执行。这种设计提供了极佳的隔离性和资源弹性。

graph TD
    Scheduler[Scheduler] --> K8sAPI[Kubernetes API]
    K8sAPI --> Pod1[Task Pod 1]
    K8sAPI --> Pod2[Task Pod 2]
    K8sAPI --> PodN[Task Pod N]
    
    Pod1 -->|完成| K8sAPI
    Pod2 -->|完成| K8sAPI
    PodN -->|完成| K8sAPI
    
    K8sAPI --> ConfigMap[ConfigMap<br/>任务定义]
    K8sAPI --> Secret[Secret<br/>连接凭证]

核心优势

特性说明
任务隔离每个任务独立 Pod,避免资源冲突
自动扩缩容根据任务队列动态创建/销毁 Pod
资源控制支持 CPU、内存 Limits 和 Requests 配置
亲和性支持节点亲和性、污点容忍等调度策略
生命周期任务完成即销毁,资源高效利用

配置选项

参数说明
namespaceKubernetes 命名空间
parallelism最大并发 Pod 数
worker_pods_creation_batch_size批量创建大小
kube_client_request_argsK8s 客户端参数

资料来源:providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:1-150

执行器配置管理

配置文件位置

执行器配置通过 airflow.cfg 文件管理,主要配置项位于 [core] 部分:

[core]
executor = LocalExecutor

Breeze 开发环境配置

在 Airflow Breeze 开发环境中,可以使用统一的命令行选项配置执行器:

# 查看执行器列表
breeze shell --executor

# 使用指定执行器启动
breeze shell --executor KubernetesExecutor

支持的执行器选项

选项说明可用值
--executor执行器类型LocalExecutor, CeleryExecutor, KubernetesExecutor 等

资料来源:dev/breeze/src/airflow_breeze/commands/common_options.py:100-150

执行器别名与团队配置

Airflow 支持通过 ExecutorLoader 实现执行器别名和团队级别的配置:

class ExecutorName(NamedTuple):
    alias: str | None
    module_path: str
    team_name: str | None

配置示例:

[executors]
# 格式: 别名 = 模块路径
my_alias = airflow.executors.local_executor.LocalExecutor
team_default = airflow.providers.celery.executors.celery_executor.CeleryExecutor

配置验证逻辑

# 检查是否为内置执行器名称
if module_or_name in CORE_EXECUTOR_NAMES:
    executor_names_per_team.append(
        ExecutorName(
            alias=alias, 
            module_path=cls.executors[module_or_name], 
            team_name=team_name
        )
    )
# 检查是否为模块路径
elif "." not in module_or_name:
    raise AirflowConfigException(
        "Incorrectly formatted executor configuration..."
    )

资料来源:airflow-core/src/airflow/executors/executor_loader.py:200-260

执行器选择指南

决策流程

graph TD
    Start[开始选择执行器] --> Q1{部署规模?}
    
    Q1 -->|单机/开发| Q2{需要并发?}
    Q1 -->|分布式| Q3{K8s 环境?}
    Q1 -->|大规模| Q4{需要灵活调度?}
    
    Q2 -->|是| LocalExecutor
    Q2 -->|否| SequentialExecutor
    
    Q3 -->|是| KubernetesExecutor
    Q3 -->|否| CeleryExecutor
    
    Q4 -->|是| CeleryKubernetesExecutor
    Q4 -->|否| CeleryExecutor

场景推荐

场景推荐执行器原因
本地开发调试SequentialExecutor简单、无依赖
单机生产环境LocalExecutor多进程并发、资源可控
小型集群CeleryExecutor配置简单、成熟稳定
云原生/K8sKubernetesExecutor弹性扩缩容、任务隔离
混合云环境CeleryKubernetesExecutor灵活调度、兼容性好

配置示例

#### 本地执行器配置

[core]
executor = LocalExecutor

[local_executor]
parallelism = 8

#### Celery 执行器配置

[core]
executor = CeleryExecutor

[celery]
broker_url = redis://redis:6379/0
result_backend = redis://redis:6379/1
worker_concurrency = 16

#### Kubernetes 执行器配置

[core]
executor = KubernetesExecutor

[kubernetes]
namespace = airflow
parallelism = 32

资料来源:airflow-core/src/airflow/executors/executor_loader.py:260-320

命令行接口

查看执行器信息

通过 Airflow CLI 可以查看已配置的执行器信息:

# 查看 Airflow 信息(包含执行器)
airflow info

# 查看提供商执行器列表
airflow providers executors

可用的 providers 子命令

命令说明
providers list列出所有提供商
providers executors列出可用的执行器
providers details提供商详细信息

资料来源:airflow-core/src/airflow/cli/cli_config.py:1-100

CLI 配置定义

执行器相关的 CLI 命令在 cli_config.py 中定义:

# 提供商执行器列表命令
ActionCommand(
    name="executors",
    help="Get information about executors provided",
    func=lazy_load_command("airflow.cli.commands.provider_command.executors_list"),
    args=(ARG_OUTPUT, ARG_VERBOSE),
)

命令行参数

参数短标识说明
--output-o输出格式(table、json、yaml)
--verbose-v详细输出模式

资料来源:airflow-core/src/airflow/cli/cli_config.py:100-200

默认连接配置

执行器运行需要配置相应的连接凭证。Airflow 在初始化数据库时提供默认连接配置:

开发环境默认连接

Connection(
    conn_id="fs_default",
    conn_type="fs",
    extra='{"path": "/"}',
)

Connection(
    conn_id="google_cloud_default",
    conn_type="google_cloud_platform",
    schema="default",
)

Connection(
    conn_id="http_default",
    conn_type="http",
    host="https://www.httpbin.org/",
)

生产环境注意事项

在生产环境中部署时,需要:

  1. 配置真实的数据库连接(避免 SQLite)
  2. 根据执行器类型配置相应的消息代理连接
  3. 设置正确的认证凭证和 SSL 选项

资料来源:airflow-core/src/airflow/utils/db.py:1-100

最佳实践

执行器选择原则

  1. 从简单开始:开发环境使用 LocalExecutor,生产环境根据需求升级
  2. 资源评估:估算并发任务数、任务运行时长、资源消耗
  3. 运维能力:评估团队对消息队列或容器编排的熟悉程度
  4. 成本考虑:云环境优先考虑 KubernetesExecutor 以获得弹性
  5. 扩展性预留:选择具有一定扩展空间的执行器类型

性能优化建议

优化项LocalExecutorCeleryExecutorKubernetesExecutor
并发数parallelism 参数worker_concurrencyparallelism 参数
心跳间隔降低网络延迟合理设置超时配置 liveness 探针
资源限制进程级限制Worker 级限制Pod 级限制
监控进程监控Celery EventsK8s Events

常见问题排查

graph TD
    Issue[问题排查] --> Check1{执行器启动失败?}
    Issue --> Check2{任务堆积?}
    Issue --> Check3{连接超时?}
    
    Check1 --> Solution1[检查配置<br/>验证依赖]
    Check2 --> Solution2[增加并发数<br/>优化任务拆分]
    Check3 --> Solution3[检查网络<br/>验证凭证]

资料来源:airflow-core/src/airflow/executors/executor_loader.py:320-400

总结

Apache Airflow 的执行器系统提供了灵活的任务执行框架,从单机的顺序执行到分布式的 Kubernetes 集群执行,能够满足不同规模和场景的需求。执行器的选择应综合考虑部署环境、团队能力、运维成本和扩展性要求。

对于新项目,建议先使用 LocalExecutor 进行开发和测试,待系统稳定后再根据实际负载和扩展需求迁移到 CeleryExecutor 或 KubernetesExecutor。在选择执行器时,务必参考官方文档中的具体版本要求,确保所有依赖组件正确配置。

资料来源:airflow-core/src/airflow/executors/executor_loader.py:400-450

资料来源:[airflow-core/src/airflow/executors/executor_loader.py:50-80]()

数据流转与交换机制

Apache Airflow 中的数据流转与交换机制是工作流编排的核心能力,允许任务之间传递数据、状态和事件。该机制主要由三大支柱组成:XCom(跨任务通信)、Asset(资产) 和 Callback(回调)。这些机制共同构成了 Airflow 工作流中数据流动的完整生态,使得分布式任务执行环境下的数据交换变得可靠且可追踪。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 XCom 核心概念

继续阅读本节完整说明和来源证据。

章节 XCom 数据模型

继续阅读本节完整说明和来源证据。

章节 XCom API 与操作方法

继续阅读本节完整说明和来源证据。

概述

Apache Airflow 中的数据流转与交换机制是工作流编排的核心能力,允许任务之间传递数据、状态和事件。该机制主要由三大支柱组成:XCom(跨任务通信)Asset(资产)Callback(回调)。这些机制共同构成了 Airflow 工作流中数据流动的完整生态,使得分布式任务执行环境下的数据交换变得可靠且可追踪。

XCom 机制允许任务在执行完成后向元数据库写入值,后续任务可以检索这些值进行后续处理。Asset 机制提供了一种基于数据源状态变化触发 DAG 执行的优雅方式。Callback 机制则支持任务生命周期中的关键节点执行自定义逻辑,如任务失败时发送告警。

XCom 机制详解

XCom 核心概念

XCom(Cross-Communication)是 Airflow 中实现任务间数据交换的主要机制。它通过 airflow.models.xcom.BaseXCom 类实现,数据默认存储在 Airflow 元数据库中,支持任务执行器在分布式环境下共享数据。

graph TD
    A[任务 A 执行] -->|push| B[XCom 写入]
    B --> C[(元数据库)]
    C -->|pull| D[任务 B 读取]
    D --> E[任务 B 继续执行]
    
    F[参数传递] -.->|包含| B
    G[返回值自动推送] -.->|包含| B

XCom 支持推送和拉取两种操作模式。任务可以通过 xcom_push() 方法显式推送数据,也可以通过返回 Python 对象让 Airflow 自动推送。拉取操作使用 xcom_pull() 方法,可以按任务 ID 或 DAG run ID 筛选特定数据。

XCom 数据模型

XCom 的数据模型定义在 BaseXCom 类中,包含以下核心字段:

字段名类型说明
keyStringXCom 值的键名,用于标识数据
valueJSON序列化的数据值,支持基础类型和嵌套结构
timestampDateTimeXCom 创建时间戳
task_idString推送该 XCom 的任务 ID
dag_idString所属 DAG 的唯一标识符
run_idStringDAG Run 的执行 ID
map_indexIntegerMap 任务中的索引值(-1 表示非 Map 任务)

资料来源:airflow-core/src/airflow/models/xcom.py:1-100

XCom API 与操作方法

#### 推送数据

# 方式一:显式推送
task_instance.xcom_push(key="result", value={"status": "success", "data": [1, 2, 3]})

# 方式二:通过 return 语句自动推送
def extract_data(**context):
    return {"records": 100, "source": "api"}

#### 拉取数据

# 按任务 ID 拉取最新值
result = task_instance.xcom_pull(task_ids=["transform_data"])[0]

# 按 task_ids 和 key 精确拉取
value = task_instance.xcom_pull(task_ids=["extract"], key="record_count")

#### 使用 XComArg 简化操作

XComArg 提供了一种声明式的数据获取方式,特别适合与 Jinja 模板结合使用:

from airflow.models.baseoperator import xcom_arg

extract_task >> transform_task

XCom 的序列化与反序列化

XCom 值在存储前会被序列化为 JSON 格式,检索时自动反序列化。支持的类型包括:

Python 类型序列化格式限制说明
strJSON String无限制
int, floatJSON Number无限制
boolJSON Boolean无限制
list, tupleJSON Array元素类型需可序列化
dictJSON Object键值需可序列化
datetimeISO 8601 字符串需正确处理时区
bytesBase64 编码大小受 max_xcom_size 限制

资料来源:airflow-core/src/airflow/models/xcom.py:150-200

XCom 配置与性能优化

XCom 的行为可通过以下配置参数调整:

参数默认值说明
max_xcom_size48KB单个 XCom 值的最大大小
xcom_backendBaseXCom自定义 XCom 后端类
enable_xcom_picklingTrue是否允许 pickle 序列化(非 JSON)

在高并发场景下,建议:

  1. 避免传输大型数据集,将大数据存储在外部存储(如 S3、HDFS),XCom 只传递引用
  2. 使用自定义 XCom 后端将数据存储到 GCS、Redis 等外部系统
  3. 及时清理过期 XCom 数据以减少元数据库负担

Asset 资产机制

Asset 概述

Asset 是 Airflow 2.10+ 引入的新一代数据依赖管理机制,它代表 DAG 外部的数据源或数据目的地。通过 Asset,Airflow 可以感知数据源的变化并自动触发 DAG 执行,实现数据驱动的工作流编排。

graph LR
    A[数据源变化] -->|触发| B[Asset 事件]
    B --> C{DAG 调度}
    C -->|是| D[创建 DAG Run]
    C -->|否| E[等待下次检查]
    
    F[Asset 定义] -->|关联| G[DAG Schedule]

Asset 数据模型

Asset 模型定义在 airflow.models.asset 模块中:

class Asset:
    name: str
    uri: str
    group: str | None
    extra: dict | None
    created_at: datetime
    updated_at: datetime

资料来源:airflow-core/src/airflow/models/asset.py:1-50

Asset 与 DAG Schedule 绑定

DAG 可以通过 schedule 参数与一个或多个 Asset 关联:

from airflow.models.asset import Asset

# 单个 Asset
daily_sales = Asset(uri="s3://data-bucket/daily-sales/", name="daily_sales")

with DAG(
    dag_id="process_sales",
    schedule=[daily_sales],  # 当 Asset 有新事件时触发
    ...
):
    process_sales_task()

Asset Event 事件机制

每当外部数据源发生变化时,可以向 Airflow 报告 Asset Event:

from airflow.sdk import AssetEvent

AssetEvent(
    asset=daily_sales,
    extra={"partition": "2024-01-15"},
    source_task_instance_id="report_ingestion",
)

Asset 命令行操作

Airflow CLI 提供了完整的 Asset 管理命令:

命令功能
airflow assets list列出所有注册的 Asset
airflow assets details查看 Asset 详情
airflow assets materialize手动触发 Asset 物化
airflow assets events查看 Asset 事件历史

资料来源:airflow-core/src/airflow/cli/cli_config.py:1-100

Callback 回调机制

Callback 机制概述

Callback 允许在 DAG 和任务生命周期中的关键节点执行自定义逻辑。这些节点包括任务失败、成功、重试以及 DAG 运行时错误等场景。Callback 通过 airflow.callbacks.callback_requests 模块中定义的数据结构传递给执行器。

graph TD
    A[任务状态变更] -->|生成| B[CallbackRequest]
    B --> C[(Callback 队列)]
    C -->|处理| D[Callback Executor]
    D --> E[执行用户逻辑]
    
    F[on_failure_callback] -.-> E
    G[on_success_callback] -.-> E
    H[on_retry_callback] -.-> E

Callback Request 数据结构

Callback 请求通过以下类定义:

类名用途触发时机
TaskCallbackRequest任务级回调任务成功/失败/重试
DagCallbackRequestDAG 级回调DAG 运行失败/成功
SlaCallbackRequestSLA 超时回调任务超出 SLA 阈值
DagRunCallbackRequestDAG Run 回调DAG Run 状态变更

资料来源:airflow-core/src/airflow/callbacks/callback_requests.py:1-80

TaskCallbackRequest 详解

class TaskCallbackRequest(CallbackRequest):
    simple_task_instance: SimpleTaskInstance
    msg: str | None
    processor_subprocess: bool
字段类型说明
simple_task_instanceSimpleTaskInstance任务实例的简化表示
msgstr回调消息(如错误信息)
processor_subprocessbool是否在子进程中处理

Callback 注册方式

#### 方式一:任务级别

def task_failure_alert(context):
    dag_id = context['dag_run'].dag_id
    task_id = context['task'].task_id
    send_alert(dag_id, task_id)

with DAG(...) as dag:
    task = PythonOperator(
        task_id="example_task",
        python_callable=my_func,
        on_failure_callback=task_failure_alert,
    )

#### 方式二:DAG 级别

def dag_failure_alert(context):
    dag_id = context['dag_run'].dag_id
    send_dag_failure_notification(dag_id)

with DAG(
    dag_id="my_dag",
    on_failure_callback=dag_failure_alert,
):
    # tasks...

#### 方式三:使用 Airflow API 动态注册

from airflow.models import TaskCallbackRequest
from airflow.callbacks.callback_requests import _prepare_callback

# 在执行器中动态注册
callback_request = _prepare_callback(
    ti=task_instance,
    callback_type="failure",
    msg="Task failed with exception",
)

数据流转最佳实践

避免数据膨胀

XCom 数据存储在元数据库中,过大的数据量会影响数据库性能。建议遵循以下原则:

  1. 小数据原则:XCom 值应保持在 10KB 以下
  2. 引用传递:大型数据使用 URI/路径引用
  3. 定期清理:配置 XCom 过期策略

数据验证与类型安全

在任务间传递数据时,应实施严格的验证策略:

from pydantic import BaseModel, validator

class DataRecord(BaseModel):
    id: int
    value: float
    tags: list[str]
    
    @validator('value')
    def value_must_be_positive(cls, v):
        if v < 0:
            raise ValueError('value must be positive')
        return v

def process_data(**context):
    raw_data = context['ti'].xcom_pull(task_ids='extract', key='data')
    validated = DataRecord(**raw_data)
    return validated.dict()

幂等性设计

任务应设计为幂等的,即相同输入多次执行产生相同结果。这在使用 XCom 进行数据传递时尤为重要,因为任务可能因重试而多次执行。

Asset 与 XCom 的选择

场景推荐机制原因
触发 DAG 执行Asset自动调度,感知数据源变化
任务间传递结果XCom轻量,适合小数据量
共享大型数据集外部存储 + XCom 引用避免元数据库压力
事件驱动工作流Asset + TriggerDagRunOperator结合使用

总结

Apache Airflow 的数据流转与交换机制通过 XCom、Asset 和 Callback 三大组件,为分布式工作流环境提供了完整的数据协调能力。XCom 专注于任务间的数据传递,Asset 实现了数据驱动的 DAG 触发,而 Callback 则在关键生命周期节点提供了扩展点。理解并合理运用这些机制,能够构建出高效、可靠且可维护的数据流水线。

资料来源:airflow-core/docs/core-concepts/xcoms.rst:1-50 资料来源:airflow-core/docs/authoring-and-scheduling/assets.rst:1-50

资料来源:[airflow-core/src/airflow/models/xcom.py:1-100]()

FastAPI核心API

Apache Airflow的FastAPI核心API是项目现代化Web服务架构的重要组成部分,旨在替代传统的Flask API。这一API层采用FastAPI框架构建,提供了高性能、异步支持、自动文档生成等特性。FastAPI核心API主要分为两个独立的应用程序:Core API(核心API)和Execution API(执行API),分别承担不同的职责。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 Core API应用

继续阅读本节完整说明和来源证据。

章节 Execution API应用

继续阅读本节完整说明和来源证据。

章节 DAG Run数据模型

继续阅读本节完整说明和来源证据。

概述

Apache Airflow的FastAPI核心API是项目现代化Web服务架构的重要组成部分,旨在替代传统的Flask API。这一API层采用FastAPI框架构建,提供了高性能、异步支持、自动文档生成等特性。FastAPI核心API主要分为两个独立的应用程序:Core API(核心API)和Execution API(执行API),分别承担不同的职责。

FastAPI核心API位于 airflow-core/src/airflow/api_fastapi/ 目录下,采用模块化设计,将路由、数据模型、认证管理器等组件分离到独立的子模块中。这种架构设计使得代码组织清晰,便于维护和扩展。

架构概览

graph TB
    subgraph "FastAPI应用层"
        CA[Core API<br/>core_api/app.py]
        EA[Execution API<br/>execution_api/app.py]
    end
    
    subgraph "认证层"
        SAM[SimpleAuthManager<br/>simple_auth_manager.py]
        AM[Auth Managers]
    end
    
    subgraph "路由层"
        UI_DAGS[UI DAG Routes<br/>routes/ui/dags.py]
        API_ROUTES[API Routes]
    end
    
    subgraph "数据模型层"
        DM[DAG Run Datamodels<br/>datamodels/dag_run.py]
        DT[其他Datamodels]
    end
    
    CA --> SAM
    EA --> SAM
    CA --> UI_DAGS
    CA --> API_ROUTES
    CA --> DM
    EA --> DM

核心组件

Core API应用

Core API是Airflow的主要REST API服务,负责提供DAG管理、任务调度、监控等核心功能。该应用通过 core_api/app.py 文件初始化和配置,包含以下关键特性:

  • 异步请求处理支持
  • OpenAPI自动文档生成
  • Pydantic数据验证
  • 依赖注入系统
  • 中间件支持

Core API的路由结构按功能模块划分,包括UI路由、作业路由、配置路由等多个子模块。这种模块化设计使得API扩展和维护更加便捷。

Execution API应用

Execution API是专门用于任务执行的轻量级API服务,通过 execution_api/app.py 实现。该API主要负责:

  • 任务实例的执行状态管理
  • XCom消息传递
  • 任务心跳检测
  • 执行结果回调

Execution API采用独立部署模式,与Core API解耦,以提高系统的可靠性和响应速度。

数据模型

DAG Run数据模型

DAG Run数据模型定义了DAG运行实例的结构,是整个调度系统的核心数据结构之一。在 datamodels/dag_run.py 中定义的主要字段包括:

字段名类型说明资料来源
dag_idstrDAG唯一标识符dag_run.py
run_idstr运行实例唯一标识dag_run.py
stateDagRunState运行状态枚举dag_run.py
execution_datedatetime执行时间戳dag_run.py
start_datedatetime开始时间dag_run.py
end_datedatetime结束时间dag_run.py
confdictDAG配置参数dag_run.py

数据验证

所有通过API接收的数据都会经过Pydantic模型验证,确保数据类型和格式的正确性。这种自动验证机制减少了手动编写数据校验代码的需求,提高了开发效率。

路由结构

UI DAG路由

UI DAG路由是面向Web界面的API端点集合,通过 routes/ui/dags.py 文件实现。这些路由主要用于:

  • DAG列表查询
  • DAG运行历史记录
  • DAG状态监控
  • 任务依赖关系可视化

UI路由通常需要用户认证,并返回适合前端渲染的数据格式。

路由组织方式

graph LR
    subgraph "路由注册"
        RC[Route Configuration<br/>cli_config.py]
    end
    
    subgraph "CLI命令"
        CMD1[airflow dags]
        CMD2[airflow tasks]
        CMD3[airflow connections]
    end
    
    RC --> CMD1
    RC --> CMD2
    RC --> CMD3
    
    CMD1 --> DAG_ROUTES[DAG Routes]
    CMD2 --> TASK_ROUTES[Task Routes]
    CMD3 --> CONN_ROUTES[Connection Routes]

cli_config.py 中定义了Airflow CLI命令与API路由的映射关系,支持的命令包括:

命令组子命令功能说明
dagsbackfill, list-runs, pause, unpause, testDAG生命周期管理
taskslist, run, state, clear任务操作管理
connectionslist, add, delete, edit连接配置管理
providerslist, getProvider信息查询

认证与授权

SimpleAuthManager

SimpleAuthManager是FastAPI认证系统的核心组件,位于 auth/managers/simple/simple_auth_manager.py。它提供了简化的认证机制,适用于单机部署或开发环境。

主要功能包括:

  • 用户身份验证
  • 会话管理
  • 权限检查
  • API密钥验证

认证管理器架构

classDiagram
    class BaseAuthManager {
        <<abstract>>
        +authenticate()
        +get_user()
        +is_authorized()
    }
    
    class SimpleAuthManager {
        +authenticate()
        +get_user()
        +is_authorized()
        +get_user_role()
    }
    
    BaseAuthManager <|-- SimpleAuthManager

认证流程

认证请求的标准处理流程如下:

  1. 客户端发送带认证信息的请求
  2. 中间件拦截请求并提取认证凭证
  3. SimpleAuthManager验证凭证有效性
  4. 根据用户角色确定访问权限
  5. 返回认证结果或拒绝访问

CLI集成

命令注册机制

cli_config.py 中使用懒加载模式注册CLI命令,这种方式可以提高应用启动速度:

ActionCommand(
    name="info",
    help="Show information about current Airflow and environment",
    func=lazy_load_command("airflow.cli.commands.info_command.show_info"),
    args=(ARG_ANONYMIZE, ARG_FILE_IO, ARG_VERBOSE, ARG_OUTPUT),
)

懒加载命令

懒加载机制确保只有在实际调用命令时才加载对应的模块,这有助于:

  • 减少内存占用
  • 加快应用启动时间
  • 避免不必要的模块依赖

支持的懒加载命令类型包括:

  • info_command - 环境信息展示
  • plugins_command - 插件信息导出
  • standalone_command - 独立运行模式
  • config_command - 配置管理
  • version_command - 版本信息
  • cheat_sheet_command - 命令速查表

配置管理

配置命令

FastAPI核心API提供了丰富的配置管理功能,包括:

命令功能说明
config get-value获取配置值打印指定配置项的值
config list列出配置显示所有配置选项
config lint配置检查验证配置迁移正确性
config update更新配置修改配置值

配置参数

配置API支持多种参数用于过滤和格式化输出:

  • --section - 指定配置章节
  • --option - 指定配置选项
  • --include-descriptions - 包含描述信息
  • --include-examples - 包含示例值
  • --include-sources - 显示配置来源
  • --include-env-vars - 显示环境变量
  • --hide-sensitive - 隐藏敏感信息

API响应格式

统一响应结构

FastAPI核心API采用统一的响应格式,便于客户端处理:

{
  "data": {},
  "meta": {
    "status": "success",
    "timestamp": "2024-01-01T00:00:00Z"
  }
}

错误响应

错误响应遵循统一的格式,包含错误码、消息和详细信息:

{
  "error": {
    "code": "DAG_NOT_FOUND",
    "message": "DAG with id 'example_dag' not found",
    "details": {}
  }
}

技术特性

异步支持

FastAPI核心API全面支持异步操作,包括:

  • 异步数据库查询
  • 异步HTTP请求
  • 异步文件操作
  • 并发任务处理

自动文档

基于OpenAPI标准,API自动生成交互式文档:

  • Swagger UI 可通过 /docs 访问
  • ReDoc 可通过 /redoc 访问
  • OpenAPI JSON 可通过 /openapi.json 获取

数据验证

使用Pydantic v2进行严格的数据验证和转换:

  • 类型检查
  • 默认值处理
  • 自定义验证器
  • 嵌套模型支持

部署架构

多实例部署

在生产环境中,FastAPI核心API支持多实例部署:

graph TB
    LB[Load Balancer]
    API1[Core API Instance 1]
    API2[Core API Instance 2]
    API3[Core API Instance 3]
    DB[(Database)]
    Redis[(Redis)]
    
    LB --> API1
    LB --> API2
    LB --> API3
    API1 --> DB
    API2 --> DB
    API3 --> DB
    API1 --> Redis
    API2 --> Redis
    API3 --> Redis

与传统Flask API对比

特性FastAPIFlask
性能异步原生支持同步为主
文档自动生成OpenAPI需手动编写
验证Pydantic自动验证手动验证
类型安全完整类型注解运行时检查
并发原生异步支持需扩展支持

安全考虑

敏感信息保护

API在处理敏感信息时采取以下保护措施:

  • 敏感配置加密存储
  • 日志输出脱敏处理
  • 环境变量优先读取
  • 连接凭证安全传输

权限控制

基于角色的访问控制(RBAC)确保API操作的安全性:

  • 管理员权限 - 完全访问
  • 编辑权限 - 修改配置
  • 只读权限 - 查询数据
  • 无权限 - 拒绝访问

扩展开发

自定义路由

开发者可以通过以下步骤添加自定义路由:

  1. 在相应模块下创建路由文件
  2. 定义路由函数和参数
  3. 注册路由到应用
  4. 添加CLI命令映射

自定义认证

扩展认证机制需要实现以下接口:

  • authenticate() - 用户认证方法
  • get_user() - 获取用户信息
  • is_authorized() - 权限检查方法

最佳实践

API设计原则

  1. RESTful规范 - 遵循REST设计原则
  2. 版本控制 - 保持API向后兼容
  3. 错误处理 - 返回有意义的错误信息
  4. 性能优化 - 使用异步操作提高吞吐量
  5. 文档维护 - 及时更新API文档

性能优化建议

  • 使用连接池管理数据库连接
  • 合理使用缓存减少数据库查询
  • 实现请求限流防止滥用
  • 监控API响应时间及时发现问题

相关文档

来源:https://github.com/apache/airflow / 项目说明书

React前端架构

Apache Airflow 的 React 前端是一个现代化的单页应用(SPA),采用 React 18、TypeScript 和 Chakra UI 构建,为用户提供了直观、高效的 DAG 管理和监控界面。该前端架构遵循组件化设计原则,通过 TanStack Table 实现数据表格功能,使用 react-router-dom 进行路由管理,并集成了 react-i18...

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 核心依赖

继续阅读本节完整说明和来源证据。

章节 构建工具

继续阅读本节完整说明和来源证据。

章节 页面目录结构

继续阅读本节完整说明和来源证据。

概述

Apache Airflow 的 React 前端是一个现代化的单页应用(SPA),采用 React 18、TypeScript 和 Chakra UI 构建,为用户提供了直观、高效的 DAG 管理和监控界面。该前端架构遵循组件化设计原则,通过 TanStack Table 实现数据表格功能,使用 react-router-dom 进行路由管理,并集成了 react-i18next 实现国际化支持。资料来源:dev/react-plugin-tools/react_plugin_template/README.md

技术栈概览

核心依赖

技术用途版本说明
React 18UI 框架核心库
TypeScript类型系统类型安全
Chakra UIUI 组件库样式和组件
TanStack Table数据表格排序、分页
react-router-dom路由管理SPA 导航
react-i18next国际化多语言支持

构建工具

前端项目使用 Vite 作为构建工具,提供快速的开发服务器和优化的生产构建。Vite 配置将 React 和相关生态库标记为外部依赖,以减少插件包体积。资料来源:dev/react-plugin-tools/react_plugin_template/README.md

页面组件架构

页面目录结构

Airflow 前端的页面组件位于 airflow-core/src/airflow/ui/src/pages/ 目录下,采用模块化组织方式:

pages/
├── DagRuns.tsx          # DAG运行列表页
├── DagsList/            # DAG列表模块
│   └── DagsList.tsx
├── Providers.tsx        # 提供商信息页
├── Run/                 # 运行详情模块
│   └── Header.tsx
├── XCom/                # XCom数据模块
│   └── XCom.tsx
└── HITLTaskInstances/   # 人工干预任务实例模块
    └── HITLTaskInstances.tsx

页面组件通用模式

每个页面组件通常遵循以下结构模式:

  1. 导入依赖:引入必要的 React hooks、组件和工具函数
  2. 类型定义:定义页面专用的 TypeScript 接口
  3. 列定义工厂函数:创建 createColumns 函数用于生成 TanStack Table 列配置
  4. 主组件导出:使用自定义 hooks 获取数据并渲染页面
const createColumns = (translate: TFunction): Array<ColumnDef<ProviderResponse>> => [
  {
    accessorKey: "package_name",
    cell: ({ row: { original } }) => (
      <Link ...>
        {original.package_name}
      </Link>
    ),
    enableSorting: false,
    header: translate("providers.columns.packageName"),
  },
  // ... 更多列定义
];

资料来源:airflow-core/src/airflow/ui/src/pages/Providers.tsx

核心组件体系

UI 基础组件

基础 UI 组件位于 airflow-core/src/airflow/ui/src/components/ui/ 目录下,提供可复用的界面元素。

#### Tag 组件

Tag 组件是对 Chakra UI Tag 的封装,提供了统一的标签样式:

const Tag = React.forwardRef<HTMLSpanElement, TagProps>(({ 
  closable = Boolean(onClose), 
  endElement, 
  startElement, 
  ...rest 
}) => {
  return (
    <ChakraTag.Root ref={ref} {...rest}>
      {Boolean(startElement) ? <ChakraTag.StartElement>{startElement}</ChakraTag.StartElement> : undefined}
      <ChakraTag.Label>{children}</ChakraTag.Label>
      {Boolean(endElement) ? <ChakraTag.EndElement>{endElement}</ChakraTag.EndElement> : undefined}
      {Boolean(closable) ? (
        <ChakraTag.EndElement>
          <ChakraTag.CloseTrigger onClick={onClose} />
        </ChakraTag.EndElement>
      ) : undefined}
    </ChakraTag.Root>
  );
});
属性类型说明
closableboolean是否显示关闭按钮
endElementReactNode标签尾部元素
startElementReactNode标签头部元素
onClose() => void关闭回调函数

资料来源:airflow-core/src/airflow/ui/src/components/ui/Tag.tsx

数据展示组件

#### DAG 触发运行组件 (TriggeredRuns)

TriggeredRuns 组件用于展示由 DAG 触发的运行信息,支持单个和多个运行场景:

export const TriggeredRuns = ({ dagRuns }: Props) => {
  if (dagRuns === undefined || dagRuns.length === 0) {
    return undefined;
  }

  return dagRuns.length === 1 ? (
    <Flex gap={1}>
      <Text>{`${translate("triggered")} ${translate("dagRun_one")}: `}</Text>
      <StateBadge state={dagRuns[0]?.state as DagRunState} />
      <Link asChild color="fg.info">
        <RouterLink to={`/dags/${dagRuns[0]?.dag_id}/runs/${dagRuns[0]?.run_id}`}>
          {dagRuns[0]?.dag_id}
        </RouterLink>
      </Link>
    </Flex>
  ) : (
    <Popover.Root autoFocus={false} lazyMount unmountOnExit>
      <Popover.Trigger asChild>
        <Button size="sm" variant="outline">
          {`${dagRuns.length} ${translate("triggered")} ...`}
        </Button>
      </Popover.Trigger>
      <Popover.Content ...>
        {/* 多个运行时显示下拉列表 */}
      </Popover.Content>
    </Popover.Root>
  );
};
场景渲染方式
单个运行内联展示 DAG ID 和状态徽章
多个运行下拉菜单,点击展开列表

资料来源:airflow-core/src/airflow/ui/src/components/Assets/TriggeredRuns.tsx

任务实例工具提示组件

TaskInstanceTooltip 组件提供任务实例的详细信息悬浮提示:

<TaskInstanceTooltip
  state={taskInstance.state}
  taskId={taskInstance.task_id}
  runId={runId}
  queuedWhen={taskInstance.queued_when}
  scheduledWhen={taskInstance.scheduled_when}
  startDate={taskInstance.start_date}
  endDate={taskInstance.end_date}
  tryNumber={taskInstance.try_number}
/>

显示的信息包括:任务 ID、状态、运行 ID、调度时间、队列时间、开始时间、结束时间以及重试次数。资料来源:airflow-core/src/airflow/ui/src/components/TaskInstanceTooltip.tsx

对话框组件

#### 清除任务实例确认对话框

ClearTaskInstanceConfirmationDialog 是用于确认清除任务实例操作的模态对话框:

<Dialog.Root open={isOpen} onOpenChange={onClose}>
  <Dialog.Content>
    <Dialog.Header>
      <Dialog.Title>
        <Icon color="tomato" pr="2" size="lg">
          <GoAlertFill />
        </Icon>
        {translate("dags:runAndTaskActions.confirmationDialog.title")}
      </Dialog.Title>
      <Dialog.Description>
        {translate("dags:runAndTaskActions.confirmationDialog.description", {
          state: taskCurrentState,
          time: getRelativeTime(firstInstance.start_date),
          user: firstInstance?.unixname ?? "unknown user",
        })}
      </Dialog.Description>
    </Dialog.Header>
    <Dialog.Footer>
      <Button colorPalette="blue" onClick={onClose}>
        {translate("common:modal.confirm")}
      </Button>
    </Dialog.Footer>
  </Dialog.Content>
</Dialog.Root>

对话框使用 Radix UI 的 Dialog 组件,提供标题、描述和底部操作区域,支持国际化翻译。资料来源:airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceConfirmationDialog.tsx

#### 批量清除任务实例对话框

ClearGroupTaskInstanceDialog 用于批量清除一组任务实例,支持以下参数:

参数类型说明
futureboolean是否包含未来运行
pastboolean是否包含过去运行
upstreamboolean是否包含上游任务
onlyFailedboolean仅清除失败任务
runOnLatestVersionboolean使用最新 DAG 版本运行
notestring \null清除操作备注
groupTaskIdsstring[]任务 ID 列表

资料来源:airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx

模态框组件

#### DAG 导入错误模态框

DAGImportErrorsModal 展示 DAG 导入过程中的错误信息:

  • 使用 Accordion 组件折叠/展开每个错误详情
  • 使用 Pagination 组件分页显示错误列表
  • 显示错误时间戳和完整的堆栈跟踪信息
<Accordion.Root ...>
  {importErrors.map((importError) => (
    <Accordion.Item key={importError.timestamp} value={importError.timestamp}>
      <Accordion.ItemTrigger>
        <Text>{importError.filename}</Text>
      </Accordion.ItemTrigger>
      <Accordion.ItemContent>
        <Text color="fg.error" fontSize="sm" whiteSpace="pre-wrap">
          <code>{importError.stack_trace}</code>
        </Text>
      </Accordion.ItemContent>
    </Accordion.Item>
  ))}
</Accordion.Root>

资料来源:airflow-core/src/airflow/ui/src/components/Dashboard/Stats/DAGImportErrorsModal.tsx

表格列配置系统

TanStack Table 集成

Airflow 前端使用 TanStack Table(React Table)作为数据表格解决方案,通过列定义工厂函数生成可配置的表格结构。

DAG 运行列表列配置

DagRuns.tsx 中的列配置示例:

访问器键组件功能
run_afterTime显示运行时间
stateStateBadge任务状态徽章
run_typeRunTypeIcon + Text运行类型图标和文字
triggering_user_nameText触发用户名称
start_dateTime开始时间
end_dateTime结束时间
durationrenderDuration持续时间渲染
{
  accessorKey: "state",
  cell: ({ row: { original: { state } } }) => (
    <StateBadge state={state}>
      {translate(`common:states.${state}`)}
    </StateBadge>
  ),
  header: () => translate("state"),
},

资料来源:airflow-core/src/airflow/ui/src/pages/DagRuns.tsx

DAG 列表特殊列

DagsList.tsx 包含一些独特的列定义:

{
  accessorKey: "pending_actions",
  cell: ({ row: { original: dag } }) => (
    <NeedsReviewBadge dagId={dag.dag_id} pendingActions={dag.pending_actions} />
  ),
  enableSorting: false,
  header: "",
},
{
  accessorKey: "trigger",
  cell: ({ row: { original } }) => (
    <TriggerDAGButton
      allowedRunTypes={original.allowed_run_types}
      dagDisplayName={original.dag_display_name}
      dagId={original.dag_id}
      isPaused={original.is_paused}
    />
  ),
  enableSorting: false,
  header: "",
},

这些列用于显示待处理操作和 DAG 触发按钮,没有排序功能。资料来源:airflow-core/src/airflow/ui/src/pages/DagsList/DagsList.tsx

国际化实现

翻译函数使用

前端使用 react-i18nextuseTranslation hook 获取翻译函数:

const { t: translate } = useTranslation("common");

// 在列配置中使用
header: () => translate("dagRun.runAfter")

// 动态键翻译
{translate(`common:states.${state}`)}

命名空间管理

翻译键使用冒号分隔命名空间,例如 common:states.runningdagRun.runAfter 等,允许多个翻译文件独立管理。资料来源:airflow-core/src/airflow/ui/src/pages/DagRuns.tsx

路由与导航

路由模式

Airflow 前端使用 react-router-dom 进行客户端路由,主要导航模式包括:

// 链接到 DAG 详情
<RouterLink to={`/dags/${dagId}/runs/${runId}`}>

// 链接到任务实例
<RouterLink to={getTaskInstanceLink({
  dagId: original.dag_id,
  dagRunId: original.run_id,
  mapIndex: original.map_index,
  taskId: original.task_id,
})}>

嵌套路由结构

页面通过嵌套路由组织,支持从 DAG 列表 → DAG 详情 → 任务实例的层级导航。

React 插件系统

插件模板架构

Airflow 提供 React 插件模板,支持第三方开发者构建可集成到主应用的组件库:

import { PluginComponent } from 'your-plugin-name';

<PluginComponent />

插件开发指南

步骤说明
组件开发使用 React + TypeScript 编写功能组件
主题适配自动继承 Airflow 主应用主题
构建配置使用 Vite 构建为库文件
外部依赖React 生态库标记为外部以避免冲突

资料来源:dev/react-plugin-tools/react_plugin_template/README.md

数据流架构

graph TD
    A[用户交互] --> B[React 组件]
    B --> C[API 请求]
    C --> D[Airflow REST API]
    D --> E[后端数据库]
    E --> D
    D --> C
    C --> F[状态更新]
    F --> B
    B --> G[UI 渲染]
    
    H[TanStack Table] --> I[列定义配置]
    I --> B
    B --> J[数据展示]
    
    K[useTranslation] --> L[翻译键]
    L --> M[翻译文件]
    M --> K

组件生命周期管理

状态管理

组件使用 React hooks 管理状态:

  • useState:本地组件状态
  • useEffect:副作用处理
  • useTranslation:国际化
  • useParams:URL 参数获取
  • useTableURLState:表格 URL 状态同步

条件渲染模式

// 使用短路求值
{Boolean(dagRuns.length) && <Component />}

// 使用三元运算符
{condition ? <TrueComponent /> : <FalseComponent />}

// 使用 && 运算符
{hasCondition && <Component />}

样式系统

Chakra UI 集成

前端使用 Chakra UI 作为主要样式解决方案,提供:

  • 预设的设计令牌(颜色、间距、字体)
  • 可访问性内置支持
  • 主题定制能力
  • 组件组合模式

颜色语义

语义颜色用途示例
fg.info信息文本链接
fg.error错误文本错误消息
bg.emphasized强调背景弹窗背景

最佳实践

组件开发规范

  1. 类型安全:所有组件使用 TypeScript,明确输入输出类型
  2. 可访问性:使用语义化 HTML 和 ARIA 属性
  3. 国际化:所有用户可见文本使用翻译函数
  4. 错误处理:组件包含错误边界和降级处理
  5. 性能优化:使用 lazyMountunmountOnExit 延迟加载

文件组织原则

  • 页面组件:位于 pages/ 目录,包含业务逻辑
  • 通用组件:位于 components/ 目录,可复用
  • UI 组件:位于 components/ui/ 目录,基础元素
  • 布局组件:位于 layouts/ 目录,页面结构

总结

Apache Airflow 的 React 前端采用了现代化的组件化架构,通过 TanStack Table 实现灵活的数据表格功能,借助 Chakra UI 提供一致的用户界面体验,使用 react-i18next 支持多语言环境。插件系统的设计允许第三方开发者扩展功能,同时保持与主应用的主题一致性。整个前端架构体现了可维护性、可扩展性和用户体验的平衡。

资料来源:[airflow-core/src/airflow/ui/src/pages/Providers.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/pages/Providers.tsx)

容器化与Kubernetes部署

Apache Airflow 作为分布式工作流编排平台,支持通过容器化和 Kubernetes 实现灵活的部署方案。本文详细介绍 Airflow 的 Docker 镜像构建机制、Kubernetes 部署架构以及生产环境最佳实践。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 1.1 Docker 构建配置

继续阅读本节完整说明和来源证据。

章节 1.2 入口点与初始化

继续阅读本节完整说明和来源证据。

章节 2.1 支持的 Kubernetes 版本

继续阅读本节完整说明和来源证据。

1. Docker 容器化概述

Apache Airflow 提供官方的 apache/airflow Docker 镜像,该镜像是构建生产级部署环境的基础。镜像采用多阶段构建技术,基于 Python slim 镜像优化体积。

1.1 Docker 构建配置

Airflow 的 Dockerfile 使用 Docker BuildKit 语法,支持通过 --mount=type=cache 挂载缓存目录以加速构建过程:

# syntax=docker/dockerfile:1.4
FROM python:{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}-slim-{ALLOWED_DEBIAN_VERSIONS[0]}
RUN apt-get update && apt-get install -y --no-install-recommends libatomic1 git curl
RUN pip install uv=={UV_VERSION}
RUN --mount=type=cache,id=cache-airflow-build-dockerfile-installation,target=/root/.cache/ \
  uv pip install --system ignore pip=={AIRFLOW_PIP_VERSION} hatch=={HATCH_VERSION} \
  pyyaml=={PYYAML_VERSION} gitpython=={GITPYTHON_VERSION} rich==

资料来源:dev/breeze/src/airflow_breeze/commands/release_management_commands.py:52-58

1.2 入口点与初始化

Docker 镜像的入口点脚本负责初始化 SQLite 数据库(当未配置外部数据库时):

如果未设置 AIRFLOW__DATABASE__SQL_ALCHEMY_CONN 变量,则在 ${AIRFLOW_HOME}/airflow.db 创建 SQLite 数据库。

资料来源:docker-stack-docs/README.md:15-16

2. Kubernetes 部署架构

Airflow 在 Kubernetes 上的部署通过 Helm Chart 实现,支持高度可配置的分布式架构。

2.1 支持的 Kubernetes 版本

Apache Airflow 根据三大云服务商的 Kubernetes 支持周期维护兼容版本列表:

云服务商生命周期页面
Amazon EKSendoflife.date/amazon-eks
Azure Kubernetes Serviceendoflife.date/azure-kubernetes-service
Google Kubernetes Engineendoflife.date/google-kubernetes-engine

当前允许的 Kubernetes 版本列表:

版本状态
v1.30.13✅ 支持
v1.31.12✅ 支持
v1.32.8✅ 支持
v1.33.4✅ 支持
v1.34.0✅ 支持

资料来源:dev/breeze/src/airflow_breeze/global_constants.py:ALLOWED_KUBERNETES_VERSIONS

2.2 Kubernetes 组件架构

Airflow 在 Kubernetes 环境中的核心组件包括:

graph TD
    subgraph "Kubernetes 集群"
        subgraph "airflow Namespace"
            Scheduler["Scheduler Pod<br/>调度器"]
            Webserver["Webserver Pod<br/>Web服务器"]
            Triggerer["Triggerer Pod<br/>触发器"]
            DagProcessor["Dag Processor Pod<br/>DAG处理器"]
        end
        
        subgraph "Pod 配置"
            Executor["Executor<br/>执行器"]
            Worker["Worker Pod<br/>工作节点"]
            K8SExecutor["K8S Executor<br/>Kubernetes执行器"]
        end
    end
    
    ExternalDB["外部数据库<br/>PostgreSQL/MySQL"]
    Redis["消息队列<br/>Redis"]
    
    Scheduler --> Executor
    Executor --> K8SExecutor
    K8SExecutor --> Worker
    Scheduler --> ExternalDB
    Webserver --> ExternalDB
    Triggerer --> Redis

2.3 多命名空间模式

Airflow 支持多命名空间部署模式,允许在不同命名空间中运行测试环境和生产环境:

option_multi_namespace_mode = click.option(
    "--multi-namespace-mode",
    help="使用多命名空间模式",
    is_flag=True,
    envvar="MULTI_NAMESPACE_MODE",
)

资料来源:dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py:multi_namespace_mode

3. Kubernetes 部署命令

3.1 部署工作流程

graph LR
    A["breeze k8s deploy-airflow"] --> B["创建 KinD 集群"]
    B --> C["创建 Kubernetes 命名空间"]
    C --> D["部署 Helm Chart"]
    D --> E["配置 Airflow 组件"]
    E --> F["验证部署状态"]

3.2 核心部署选项

参数说明默认值
--pythonPython 版本3.10
--kubernetes-versionKubernetes 集群版本v1.31.12
--executor执行器类型LocalExecutor
--deploy/--no-deploy是否通过 Skaffold 部署False
--upgrade升级而非安装 Helm ChartFalse
--multi-namespace-mode启用多命名空间模式False
--rebuild-base-image重建基础镜像False

资料来源:dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py:option_executor

3.3 部署流程代码

部署 Airflow 到 Kubernetes 集群的核心逻辑:

def _deploy_airflow(
    python: str,
    kubernetes_version: str,
    output: Output | None,
    executor: str,
    upgrade: bool,
    use_standard_naming: bool,
    wait_time_in_seconds: int,
    extra_options: str,
    multi_namespace_mode: bool,
):
    cluster_name = get_kubectl_cluster_name(
        python=python, kubernetes_version=kubernetes_version
    )
    # 创建命名空间
    get_console(output=output).print(f"[info]创建集群 {cluster_name} 的命名空间")
    run_command_with_k8s_env(
        ["kubectl", "create", "namespace", HELM_AIRFLOW_NAMESPACE],
        python=python,
        kubernetes_version=kubernetes_version,
        output=output,
        check=False,
    )

资料来源:dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py:_deploy_airflow

4. Kubernetes 执行器配置

4.1 Pod 覆盖机制

Kubernetes 执行器允许通过 pod_override 动态配置 Pod 规格:

start_task_executor_config = {
    "pod_override": k8s.V1Pod(
        metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"})
    )
}

@task(executor_config=start_task_executor_config)
def start_task():
    print_stuff()

资料来源:airflow-core/src/airflow/example_dags/example_kubernetes_executor.py:20-27

4.2 Volume 挂载配置

支持在 Task 执行时挂载持久化卷:

executor_config_volume_mount = {
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    volume_mounts=[
                        k8s.V1VolumeMount(
                            mount_path="/foo/",
                            name="example-kubernetes-test-volume"
                        )
                    ],
                )
            ],
            volumes=[
                k8s.V1Volume(
                    name="example-kubernetes-test-volume",
                    host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
                )
            ],
        )
    ),
}

资料来源:airflow-core/src/airflow/example_dags/example_kubernetes_executor.py:31-56

4.3 执行器配置参数表

配置项类型说明
pod_overrideV1PodPod 覆盖规格
annotationsDict[str, str]Pod 注解
labelsDict[str, str]Pod 标签
node_selectorDict[str, str]节点选择器
tolerationsList[V1Toleration]容忍度配置
affinityV1Affinity亲和性配置
resourcesV1ResourceRequirements资源限制

5. Docker Compose 开发环境

对于本地开发,Airflow 提供 Docker Compose 配置:

services:
  airflow:
    image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:latest}
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: sqlite:////opt/airflow/airflow.db
      AIRFLOW__CORE__FERNET_KEY: ''
      AIRFLOW__CORE__DAGS__FOLDER: ${AIRFLOW_PROJ_DIR:-.}/dags
      AIRFLOW__CORE__LOAD_EXAMPLES: 'true'

资料来源:airflow-core/docs/howto/docker-compose/docker-compose.yaml

5.1 Docker Compose 配置参数

环境变量说明默认值
AIRFLOW__CORE__EXECUTOR执行器类型LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN数据库连接字符串sqlite
AIRFLOW__CORE__FERNET_KEY加密密钥-
AIRFLOW__CORE__DAGS__FOLDERDAG 文件目录./dags
AIRFLOW__CORE__LOAD_EXAMPLES加载示例 DAGtrue
AIRFLOW__CORE__AIRFLOW_HOMEAirflow 主目录/opt/airflow

6. 生产环境最佳实践

6.1 部署检查清单

检查项说明
外部数据库配置使用 PostgreSQL 或 MySQL 而非 SQLite
执行器选择生产环境推荐 CeleryExecutor 或 KubernetesExecutor
高可用配置部署多个 Scheduler 和 Webserver 实例
资源限制为各组件设置 CPU 和内存限制
健康检查配置 readiness 和 liveness 探针
持久化存储使用 PVC 持久化 DAG 和日志

6.2 安全配置

生产环境部署时应考虑以下安全措施:

  1. 配置 Fernet 密钥:启用敏感数据加密
  2. 使用 Secrets:通过 Kubernetes Secrets 管理凭据
  3. 网络策略:配置 Pod 间网络隔离
  4. RBAC:启用基于角色的访问控制
  5. 审计日志:启用 Airflow 审计日志功能

6.3 监控与告警

建议集成的监控组件:

  • Metrics: Prometheus + Grafana
  • Logging: ELK Stack 或 Loki
  • Tracing: Jaeger
  • Alerting: Alertmanager

7. 快速部署命令

7.1 使用 Breeze 部署

# 部署到 Kind 集群
breeze k8s deploy-airflow --python 3.10 --kubernetes-version v1.31.12

# 升级现有部署
breeze k8s deploy-airflow --upgrade

# 启用多命名空间模式
breeze k8s deploy-airflow --multi-namespace-mode

# 进入集群 Shell
breeze k8s shell

# 运行测试
breeze k8s tests

7.2 Skaffold 开发循环

对于开发场景,支持热重载功能:

breeze k8s dev --skaffold-args "--auto-sync"

该命令会:

  • 同步 DAG 文件到运行中的 Pod
  • 热重载 airflow-core 源码(Scheduler/Triggerer/DAG Processor/API Server)
  • 自动刷新 UI(暂不支持)

资料来源:dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py:kubernetes_group

8. 总结

Apache Airflow 的容器化和 Kubernetes 部署方案提供了企业级的灵活性。通过官方 Docker 镜像、Helm Chart 以及 Breeze 工具链,开发者和运维人员可以:

  • 在本地快速搭建开发环境
  • 在 Kubernetes 上实现弹性扩展
  • 通过 Kubernetes 执行器获得细粒度的资源控制
  • 利用多命名空间支持隔离测试和生产环境

建议在生产部署前仔细评估业务需求,选择合适的执行器类型,并遵循本文档的最佳实践确保系统稳定性和安全性。

资料来源:[dev/breeze/src/airflow_breeze/commands/release_management_commands.py:52-58]()

Provider生态系统

Apache Airflow的Provider生态系统是Airflow的核心扩展机制,允许用户通过安装额外的Provider包来集成各种外部服务和平台。每个Provider提供特定的Hook、Operator、Trigger、Sensor和连接类型,使用户能够在DAG中与外部系统进行交互。

章节 Provider核心组件

继续阅读本节完整说明和来源证据。

章节 层级架构

继续阅读本节完整说明和来源证据。

章节 AWS Hook实现示例

继续阅读本节完整说明和来源证据。

章节 连接类型注册

继续阅读本节完整说明和来源证据。

概述

Apache Airflow的Provider生态系统是Airflow的核心扩展机制,允许用户通过安装额外的Provider包来集成各种外部服务和平台。每个Provider提供特定的Hook、Operator、Trigger、Sensor和连接类型,使用户能够在DAG中与外部系统进行交互。

Provider作为独立的Python包发布到PyPI,可以通过pip install apache-airflow-providers-<provider_name>进行安装。这种模块化设计使得Airflow核心保持精简,同时用户可以根据实际需求选择性地添加所需的集成功能。

架构设计

Provider核心组件

每个Provider包通常包含以下核心组件:

组件类型描述用途
Hook数据连接抽象类提供与外部系统交互的基础接口
Operator任务执行单元定义具体的业务操作逻辑
Sensor外部依赖检查器轮询外部条件直到满足为止
Trigger异步事件触发器支持triggerer组件异步执行
Connection连接配置定义与外部系统的连接参数

层级架构

graph TD
    A[Airflow Core] --> B[Providers Manager]
    B --> C[Provider Package]
    C --> D[Hooks]
    C --> E[Operators]
    C --> F[Sensors]
    C --> G[Triggers]
    D --> H[External Services]
    E --> H
    F --> H
    G --> H
    
    subgraph "Provider Package Structure"
    I[get_provider_info.py]
    J[__init__.py]
    K[hooks/*.py]
    L[operators/*.py]
    M[sensors/*.py]
    end

Hook实现机制

Hook是Provider与外部系统交互的基础桥梁。不同Provider的Hook都遵循统一的基类设计规范。

AWS Hook实现示例

在Amazon Provider中,Hook通过boto3与AWS服务进行交互:

class Ec2Hook(AwsBaseHook):
    API_TYPES = frozenset({"resource_type", "client_type"})

    def __init__(self, api_type="resource_type", *args, **kwargs) -> None:
        if api_type not in self.API_TYPES:
            raise AirflowException("api_type can only be one of %s", self.API_TYPES)
        kwargs[api_type] = "ec2"
        self._api_type = api_type
        super().__init__(*args, **kwargs)

资料来源:providers/amazon/src/airflow/providers/amazon/aws/hooks/ec2.py:24-36

连接类型注册

Provider通过get_provider_info.py文件声明其提供的连接类型:

{
    "integration-name": "AWS EC2",
    "external-doc-url": "https://aws.amazon.com/ec2/",
    "logo": "/docs/integration-logos/[email protected]",
    "tags": ["aws"],
}

资料来源:providers/amazon/src/airflow/providers/amazon/get_provider_info.py:45-52

Provider生命周期管理

文档生成流程

Provider的文档通过Breeze工具自动生成,包括README、conf.py等文件:

def _generate_docs_conf(context: dict[str, Any], provider_details: ProviderPackageDetails):
    docs_conf_content = render_template(
        template_name="conf",
        context=context,
        extension=".py",
        keep_trailing_newline=True,
    )
    docs_conf_path = provider_details.root_provider_path / "docs" / "conf.py"
    docs_conf_path.write_text(docs_conf_content)

资料来源:dev/breeze/src/airflow_breeze/prepare_providers/provider_documentation.py:89-97

版本与依赖管理

Provider的最小Airflow版本依赖通过解析dependencies确定:

def get_min_airflow_version(provider_id: str) -> str:
    provider_details = get_provider_details(provider_id=provider_id)
    min_airflow_version = MIN_AIRFLOW_VERSION
    for dependency in provider_details.dependencies:
        if dependency.startswith("apache-airflow>="):
            current_min_airflow_version = dependency.split(">=")[1]
            if "," in current_min_airflow_version:
                current_min_airflow_version = current_min_airflow_version.split(",")[0]
            if PackagingVersion(current_min_airflow_version) > PackagingVersion(MIN_AIRFLOW_VERSION):
                min_airflow_version = current_min_airflow_version
    return min_airflow_version

资料来源:dev/breeze/src/airflow_breeze/utils/packages.py:78-91

CLI命令集成

Provider功能通过Airflow CLI暴露,允许用户在命令行执行各种管理操作:

GroupCommand(
    name="providers",
    help="Display providers",
    subcommands=PROVIDERS_COMMANDS,
),

资料来源:airflow-core/src/airflow/cli/cli_config.py:89-92

Provider相关命令

命令功能
airflow providers list列出所有已安装的Provider
airflow providers links显示Provider提供的链接
airflow providers secrets获取secrets后端信息
airflow providers executors获取Executor信息
airflow providers auth-managers获取认证管理器信息

默认连接配置

Airflow在初始化数据库时会创建一系列默认连接,这些连接覆盖了常用的外部系统:

连接ID连接类型默认配置
facebook_socialfacebook_socialFacebook社交登录凭证
fs_defaultfs本地文件系统
ftp_defaultftplocalhost:21
google_cloud_defaultgoogle_cloud_platformGCP默认项目
http_defaulthttphttpbin.org
iceberg_defaulticebergIceberg REST服务

资料来源:airflow-core/src/airflow/utils/db.py:180-220

事件驱动架构

MessageQueueTrigger集成

Provider支持通过MessageQueueTrigger实现事件驱动的任务执行:

class GooglePubSubEvent(Protocol):
    scheme = "google+pubsub"

    def trigger_class(self) -> type[BaseEventTrigger]:
        return PubsubPullTrigger

资料来源:providers/google/src/airflow/providers/google/event_scheduling/events/pubsub.py:52-57

异步Trigger架构

sequenceDiagram
    DAG Task ->> Trigger: 创建Trigger实例
    Trigger ->> MessageQueue: 订阅消息队列
    MessageQueue -->> Trigger: 事件到达
    Trigger ->> Triggerer: 触发条件满足
    Triggerer ->> DAG Task: 任务执行完成

Provider发现机制

自动注册流程

当Airflow启动时,Providers Manager自动扫描并加载所有已安装的Provider:

graph LR
    A[启动Airflow] --> B[扫描Provider包]
    B --> C[加载Hook]
    B --> D[加载Operator]
    B --> E[加载Sensor]
    B --> F[注册Connection类型]
    C --> G[注册到UI]
    D --> G
    E --> G
    F --> G

主流Provider一览

Amazon Provider (apache-airflow-providers-amazon)

提供与AWS服务的深度集成,包括:

  • 计算服务:EC2、Lambda、ECS
  • 数据服务:Glue、S3、Redshift、Athena
  • 编排服务:Step Functions、CloudFormation
  • 机器学习:SageMaker、Bedrock

资料来源:providers/amazon/src/airflow/providers/amazon/aws/hooks/cloud_formation.py:18-25

Google Provider (apache-airflow-providers-google)

集成Google Cloud Platform服务,包括BigQuery、GCS、Cloud Run等。

Apache Kafka Provider

提供Kafka消息队列的Hook和Operator支持,用于流数据处理场景。

总结

Provider生态系统是Apache Airflow实现高度可扩展性的核心机制。通过标准化的组件接口、统一的生命周期管理和灵活的CLI集成,Provider使得Airflow能够优雅地连接各种外部系统。用户可以根据工作需求选择性地安装Provider,实现与云服务商、数据库、消息队列等数十种外部平台的深度集成。

资料来源:[providers/amazon/src/airflow/providers/amazon/aws/hooks/ec2.py:24-36]()

失败模式与踩坑日记

保留 Doramagic 在发现、验证和编译中沉淀的项目专属风险,不把社区讨论只当作装饰信息。

high 来源证据:`ExternalTaskSensor` can succeed early for task groups with NULL task states

可能增加新用户试用和生产接入成本。

medium 能力判断依赖假设

假设不成立时,用户拿不到承诺的能力。

medium 维护活跃度未知

新项目、停更项目和活跃项目会被混在一起,推荐信任度下降。

medium 下游验证发现风险项

下游已经要求复核,不能在页面中弱化。

Pitfall Log / 踩坑日志

项目:apache/airflow

摘要:发现 19 个潜在踩坑项,其中 1 个为 high/blocking;最高优先级:安装坑 - 来源证据:ExternalTaskSensor can succeed early for task groups with NULL task states。

1. 安装坑 · 来源证据:`ExternalTaskSensor` can succeed early for task groups with NULL task states

  • 严重度:high
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:ExternalTaskSensor can succeed early for task groups with NULL task states
  • 对用户的影响:可能增加新用户试用和生产接入成本。
  • 建议检查:来源问题仍为 open,Pack Agent 需要复核是否仍影响当前版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_3c746f7ce44f43f1a5a81840f4ee741a | https://github.com/apache/airflow/issues/66877 | 来源讨论提到 python 相关条件,需在安装/试用前复核。

2. 能力坑 · 能力判断依赖假设

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:README/documentation is current enough for a first validation pass.
  • 对用户的影响:假设不成立时,用户拿不到承诺的能力。
  • 建议检查:将假设转成下游验证清单。
  • 防护动作:假设必须转成验证项;没有验证结果前不能写成事实。
  • 证据:capability.assumptions | github_repo:33884891 | https://github.com/apache/airflow | README/documentation is current enough for a first validation pass.

3. 维护坑 · 维护活跃度未知

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:未记录 last_activity_observed。
  • 对用户的影响:新项目、停更项目和活跃项目会被混在一起,推荐信任度下降。
  • 建议检查:补 GitHub 最近 commit、release、issue/PR 响应信号。
  • 防护动作:维护活跃度未知时,推荐强度不能标为高信任。
  • 证据:evidence.maintainer_signals | github_repo:33884891 | https://github.com/apache/airflow | last_activity_observed missing

4. 安全/权限坑 · 下游验证发现风险项

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:no_demo
  • 对用户的影响:下游已经要求复核,不能在页面中弱化。
  • 建议检查:进入安全/权限治理复核队列。
  • 防护动作:下游风险存在时必须保持 review/recommendation 降级。
  • 证据:downstream_validation.risk_items | github_repo:33884891 | https://github.com/apache/airflow | no_demo; severity=medium

5. 安全/权限坑 · 存在安全注意事项

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:No sandbox install has been executed yet; downstream must verify before user use.
  • 对用户的影响:用户安装前需要知道权限边界和敏感操作。
  • 建议检查:转成明确权限清单和安全审查提示。
  • 防护动作:安全注意事项必须面向用户前置展示。
  • 证据:risks.safety_notes | github_repo:33884891 | https://github.com/apache/airflow | No sandbox install has been executed yet; downstream must verify before user use.

6. 安全/权限坑 · 存在评分风险

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:no_demo
  • 对用户的影响:风险会影响是否适合普通用户安装。
  • 建议检查:把风险写入边界卡,并确认是否需要人工复核。
  • 防护动作:评分风险必须进入边界卡,不能只作为内部分数。
  • 证据:risks.scoring_risks | github_repo:33884891 | https://github.com/apache/airflow | no_demo; severity=medium

7. 安全/权限坑 · 来源证据:Apache Airflow 3.1.6

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Apache Airflow 3.1.6
  • 对用户的影响:可能阻塞安装或首次运行。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_c94c5b79bb91454c9e0ad22b4a36dc11 | https://github.com/apache/airflow/releases/tag/3.1.6 | 来源讨论提到 docker 相关条件,需在安装/试用前复核。

8. 安全/权限坑 · 来源证据:Apache Airflow 3.1.7

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Apache Airflow 3.1.7
  • 对用户的影响:可能阻塞安装或首次运行。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_80a5614167b44ecca96422caa56afca4 | https://github.com/apache/airflow/releases/tag/3.1.7 | 来源讨论提到 docker 相关条件,需在安装/试用前复核。

9. 安全/权限坑 · 来源证据:Apache Airflow 3.1.8

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Apache Airflow 3.1.8
  • 对用户的影响:可能阻塞安装或首次运行。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_3437fcfc89ff40a0ba17b7e5a5d8aa2c | https://github.com/apache/airflow/releases/tag/3.1.8 | 来源讨论提到 python 相关条件,需在安装/试用前复核。

10. 安全/权限坑 · 来源证据:Apache Airflow 3.2.0

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Apache Airflow 3.2.0
  • 对用户的影响:可能阻塞安装或首次运行。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_d18a498a98ed48f0bb3f813aaa554aea | https://github.com/apache/airflow/releases/tag/3.2.0 | 来源讨论提到 docker 相关条件,需在安装/试用前复核。

11. 安全/权限坑 · 来源证据:Apache Airflow 3.2.1

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Apache Airflow 3.2.1
  • 对用户的影响:可能影响升级、迁移或版本选择。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_41889eab2c0240bbb0bd010262d41346 | https://github.com/apache/airflow/releases/tag/3.2.1 | 来源讨论提到 python 相关条件,需在安装/试用前复核。

12. 安全/权限坑 · 来源证据:Apache Airflow Ctl (airflowctl) 0.1.2

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Apache Airflow Ctl (airflowctl) 0.1.2
  • 对用户的影响:可能阻塞安装或首次运行。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_df5b495084624a899a4674d4b0193ec7 | https://github.com/apache/airflow/releases/tag/airflow-ctl/0.1.2 | 来源类型 github_release 暴露的待验证使用条件。

13. 安全/权限坑 · 来源证据:Apache Airflow Helm Chart 1.19.0

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Apache Airflow Helm Chart 1.19.0
  • 对用户的影响:可能影响升级、迁移或版本选择。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_0f5ecc599d634c1daa9ee6c9f2434849 | https://github.com/apache/airflow/releases/tag/helm-chart/1.19.0 | 来源类型 github_release 暴露的待验证使用条件。

14. 安全/权限坑 · 来源证据:Apache Airflow Helm Chart 1.20.0

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Apache Airflow Helm Chart 1.20.0
  • 对用户的影响:可能影响升级、迁移或版本选择。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_c13fe02283094cffa9e186716cc8dcf5 | https://github.com/apache/airflow/releases/tag/helm-chart/1.20.0 | 来源讨论提到 node 相关条件,需在安装/试用前复核。

15. 安全/权限坑 · 来源证据:Apache Airflow Helm Chart 1.21.0

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Apache Airflow Helm Chart 1.21.0
  • 对用户的影响:可能影响升级、迁移或版本选择。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_d29213d0ae2441e8907f407d8dc9b861 | https://github.com/apache/airflow/releases/tag/helm-chart/1.21.0 | 来源讨论提到 docker 相关条件,需在安装/试用前复核。

16. 安全/权限坑 · 来源证据:airflow-ctl/0.1.3

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:airflow-ctl/0.1.3
  • 对用户的影响:可能影响授权、密钥配置或安全边界。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_9096523c7ba541c6ac1b6b926d6f6bc0 | https://github.com/apache/airflow/releases/tag/airflow-ctl/0.1.3 | 来源讨论提到 python 相关条件,需在安装/试用前复核。

17. 安全/权限坑 · 来源证据:edge3: upgrade from 1.x silently leaves schema stale — `BaseDBManager.upgradedb` stamps `alembic_version_edge3` to head…

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:edge3: upgrade from 1.x silently leaves schema stale — BaseDBManager.upgradedb stamps alembic_version_edge3 to head without running migrations
  • 对用户的影响:可能影响升级、迁移或版本选择。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_2bc80550929a49ddbce05c557bb225e0 | https://github.com/apache/airflow/issues/66524 | 来源讨论提到 python 相关条件,需在安装/试用前复核。

18. 维护坑 · issue/PR 响应质量未知

  • 严重度:low
  • 证据强度:source_linked
  • 发现:issue_or_pr_quality=unknown。
  • 对用户的影响:用户无法判断遇到问题后是否有人维护。
  • 建议检查:抽样最近 issue/PR,判断是否长期无人处理。
  • 防护动作:issue/PR 响应未知时,必须提示维护风险。
  • 证据:evidence.maintainer_signals | github_repo:33884891 | https://github.com/apache/airflow | issue_or_pr_quality=unknown

19. 维护坑 · 发布节奏不明确

  • 严重度:low
  • 证据强度:source_linked
  • 发现:release_recency=unknown。
  • 对用户的影响:安装命令和文档可能落后于代码,用户踩坑概率升高。
  • 建议检查:确认最近 release/tag 和 README 安装命令是否一致。
  • 防护动作:发布节奏未知或过期时,安装说明必须标注可能漂移。
  • 证据:evidence.maintainer_signals | github_repo:33884891 | https://github.com/apache/airflow | release_recency=unknown

来源:Doramagic 发现、验证与编译记录