Doramagic 项目包 · 项目说明书
airflow 项目
生成时间:2026-05-13 22:31:12 UTC
Airflow简介与核心概念
Apache Airflow是一个以编程方式创作、调度和监控工作流的平台。它允许用户使用Python代码定义、调度和执行复杂的批处理工作流。Airflow采用分布式架构,支持大规模工作流编排,是数据工程和MLOps领域广泛使用的开源工作流管理平台。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
概述
Apache Airflow是一个以编程方式创作、调度和监控工作流的平台。它允许用户使用Python代码定义、调度和执行复杂的批处理工作流。Airflow采用分布式架构,支持大规模工作流编排,是数据工程和MLOps领域广泛使用的开源工作流管理平台。
资料来源:README.md
核心架构
Airflow采用主从架构,主要包含以下组件:
| 组件 | 功能 | 说明 |
|---|---|---|
| Scheduler | 调度器 | 负责调度DAG运行,将任务分发给执行器 |
| Executor | 执行器 | 实际执行任务,支持多种类型 |
| Web Server | Web服务器 | 提供UI界面用于监控和管理 |
| Worker | 工作进程 | 执行具体任务的进程 |
| Metadata Database | 元数据库 | 存储DAG、任务、执行状态等元数据 |
资料来源:airflow-core/src/airflow/cli/cli_config.py:1-100
graph TD
A[用户编写的DAG定义] --> B[Scheduler]
B --> C[Executor]
C --> D[Worker]
D --> E[Metadata Database]
B --> E
F[Web Server] --> E
G[用户通过UI/API] --> FDAG有向无环图
DAG(Directed Acyclic Graph)是Airflow的核心概念,代表工作流的结构定义。
DAG定义
from airflow import DAG
from datetime import datetime
with DAG(
dag_id='my_first_dag',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False
) as dag:
task1 = BashOperator(task_id='task_1', bash_command='echo Hello')
task2 = BashOperator(task_id='task_2', bash_command='echo World')
task1 >> task2
资料来源:airflow-core/src/airflow/models/dag.py
DAG属性
| 属性 | 类型 | 描述 |
|---|---|---|
| dag_id | str | DAG的唯一标识符 |
| start_date | datetime | DAG开始日期 |
| schedule_interval | str/timedelta | 调度间隔 |
| catchup | bool | 是否补跑历史数据 |
| max_active_runs | int | 最大同时运行数 |
| is_paused | bool | DAG是否暂停 |
任务与任务实例
任务类型
Airflow支持多种任务类型:
| 任务类型 | 说明 |
|---|---|
| BashOperator | 执行Bash命令 |
| PythonOperator | 执行Python函数 |
| Sensor | 等待特定条件满足 |
| TaskGroup | 任务组容器 |
资料来源:airflow-core/src/airflow/models/taskinstance.py
任务实例状态
stateDiagram-v2
[*] --> queued: 任务创建
queued --> scheduled: Scheduler调度
scheduled --> running: Worker接收
running --> success: 执行成功
running --> failed: 执行失败
running --> upstream_failed: 上游任务失败
success --> [*]
failed --> [*]
upstream_failed --> [*]资料来源:airflow-core/src/airflow/_shared/state/__init__.py
执行器类型
Airflow支持多种执行器,用于在不同环境中执行任务:
| 执行器 | 说明 | 适用场景 |
|---|---|---|
| LocalExecutor | 本地并行执行 | 开发/测试环境 |
| SequentialExecutor | 顺序执行 | 单任务场景 |
| CeleryExecutor | 分布式执行 | 生产环境大规模任务 |
| KubernetesExecutor | K8s Pod执行 | 云原生环境 |
资料来源:airflow-core/src/airflow/cli/cli_config.py
CLI命令行接口
Airflow提供丰富的CLI命令用于管理DAG和任务:
| 命令 | 功能 |
|---|---|
| airflow dags list | 列出所有DAG |
| airflow dags list-runs | 列出DAG运行记录 |
| airflow tasks list | 列出DAG中的任务 |
| airflow tasks test | 测试单个任务 |
| airflow connections list | 列出连接 |
| airflow variables list | 列出变量 |
资料来源:airflow-core/src/airflow/cli/cli_config.py
常用命令示例
# 列出所有DAG
airflow dags list
# 触发DAG手动运行
airflow dags trigger my_dag_id
# 暂停/恢复DAG
airflow dags pause my_dag_id
airflow dags unpause my_dag_id
# 查看任务实例状态
airflow tasks list my_dag_id
连接与变量
连接管理
Airflow提供连接(Connection)功能用于存储外部系统访问凭证:
from airflow.models import Connection
import json
conn = Connection(
conn_id='my_postgres_conn',
conn_type='postgres',
host='localhost',
schema='airflow_db',
login='user',
password='password',
port=5432
)
资料来源:airflow-core/src/airflow/utils/db.py
支持的连接类型
| 连接类型 | 说明 |
|---|---|
| postgres | PostgreSQL数据库 |
| mysql | MySQL数据库 |
| google_cloud_platform | GCP云平台 |
| http | HTTP连接 |
| ssh | SSH连接 |
| s3 | AWS S3存储 |
配置管理
airflow.cfg主要配置项
| 配置节 | 配置项 | 说明 |
|---|---|---|
| core | dags_folder | DAG文件存放路径 |
| core | executor | 执行器类型 |
| core | catchup_by_default | 默认是否补跑 |
| scheduler | num_runs | 调度器循环次数 |
| webserver | web_server_host | Web服务器地址 |
| database | sql_alchemy_conn | 数据库连接字符串 |
资料来源:airflow-core/src/airflow/cli/cli_config.py
工作流执行流程
graph LR
A1[DAG定义加载] --> A2[解析DAG结构]
A2 --> A3[Scheduler创建DAG Run]
A3 --> A4[任务排队]
A4 --> A5[Executor分发任务]
A5 --> A6[Worker执行任务]
A6 --> A7[更新任务状态]
A7 --> A8[记录到元数据库]
A8 --> A9[Web UI展示状态]最佳实践
DAG编写规范
- 任务唯一性:每个任务的task_id在DAG内必须唯一
- 正确设置start_date:避免使用动态时间作为start_date
- 合理的retry配置:为关键任务配置重试策略
- 使用描述性命名:使用清晰的DAG和任务命名
调度配置建议
| 配置项 | 建议值 | 说明 |
|---|---|---|
| catchup | False | 生产环境建议关闭补跑 |
| max_active_runs | 1-5 | 根据任务复杂度调整 |
| schedule_interval | @daily/@hourly | 根据业务需求选择 |
| concurrency | 16-32 | 根据Worker能力设置 |
总结
Apache Airflow作为强大的工作流编排平台,提供了完整的DAG定义、调度执行、监控告警等功能。理解其核心概念(DAG、Task、Executor、Connection)是有效使用Airflow的基础。通过合理的配置和最佳实践,可以构建稳定可靠的数据管道和工作流系统。
资料来源:[README.md](https://github.com/apache/airflow/blob/main/README.md)
快速开始与安装指南
Apache Airflow 是一个开源的工作流编排平台,用于编程创建、调度和监控批处理工作流。本指南将帮助您快速上手安装和配置 Airflow。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
系统要求
在安装 Apache Airflow 之前,请确保您的系统满足以下基本要求:
| 要求项 | 最低版本 | 说明 |
|---|---|---|
| Python | 3.9+ | Airflow 核心功能需要 |
| pip | 最新版本 | Python 包管理器 |
| 操作系统 | Linux, macOS, Windows (WSL2) | 生产环境推荐使用 Linux |
| 数据库 | SQLite 3.15+, PostgreSQL 14+, MySQL 8.0+ | 元数据库存储 |
| 内存 | 4GB RAM | 最小需求,生产环境更高 |
资料来源:INSTALLING.md:1-50
安装方式概览
Airflow 支持多种安装方式,您可以根据实际需求选择合适的方案:
graph TD
A[安装 Airflow] --> B{使用场景}
B -->|快速测试| C[pip 直接安装]
B -->|容器化部署| D[Docker 镜像]
B -->|深度开发| E[从源码编译]
C --> F[使用约束文件安装]
D --> G[官方镜像或自定义]
E --> H[Breeze 开发环境]资料来源:generated/PYPI_README.md:1-30
使用 pip 安装
基础安装
使用 pip 安装 Airflow 的标准方式是通过 pip install 命令配合约束文件:
pip install 'apache-airflow==3.2.0' \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.2.0/constraints-3.10.txt"
约束文件确保所有依赖项的版本兼容,避免因依赖冲突导致的安装失败。约束文件根据 Python 版本不同而有所区别。
资料来源:INSTALLING.md:50-80
安装额外依赖
Airflow 提供了丰富的额外依赖包(extras),可按需安装:
# 安装包含 PostgreSQL 和 Google Cloud 支持的版本
pip install 'apache-airflow[postgres,google]==3.2.0' \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.2.0/constraints-3.10.txt"
常用额外依赖包包括:
| 额外包名 | 说明 |
|---|---|
| postgres | PostgreSQL 数据库支持 |
| mysql | MySQL 数据库支持 |
| Google Cloud Platform 集成 | |
| amazon | AWS 集成 |
| kubernetes | Kubernetes Executor 支持 |
| sentry | Sentry 错误追踪集成 |
资料来源:generated/PYPI_README.md:40-60
供应商包安装
除了核心 Airflow 包外,您还可以单独安装各种供应商(Provider)包:
# 从 PyPI 安装供应商包
pip install apache-airflow-providers-google
# 从源码安装供应商包
pip install /path/to/apache-airflow-providers-google*.whl
安装后需要验证文件完整性:
shasum -a 512 package-name-version.tar.gz | diff - package-name-version.tar.gz.sha512
资料来源:devel-common/src/sphinx_exts/includes/installing-providers-from-sources.rst:1-50
Docker 安装
官方镜像
Airflow 提供了官方 Docker 镜像,可以快速启动完整的工作环境:
# 拉取最新稳定版镜像
docker pull apache/airflow:latest
# 使用 Celery Executor 启动
docker run -d -p 8080:8080 \
-e AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://user:pass@host/db \
apache/airflow:latest
资料来源:Dockerfile:1-30
Docker Compose 方式
生产环境推荐使用 Docker Compose 进行部署,Airflow 项目提供了标准化的 docker-compose.yaml:
version: '3'
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
airflow-webserver:
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:latest}
command: webserver
ports:
- "8080:8080"
depends_on:
- postgres
构建自定义镜像
使用 Breeze 工具构建生产镜像:
# 使用 breeze 构建生产镜像
breeze prod-image build \
--installation-method sources \
--python 3.10 \
--debian-version bullseye
资料来源:dev/breeze/src/airflow_breeze/commands/production_image_commands.py:1-50
从源码安装
克隆仓库
git clone https://github.com/apache/airflow.git
cd airflow
使用 Breeze 开发环境
Breeze 是 Airflow 官方提供的开发环境管理工具,提供完整的开发、测试和构建体验:
# 进入 breeze shell
./breeze shell
# 运行测试
./breeze testing python --test-type unit-tests --package-filter airflow-core
Breeze 使用 Docker 容器化方式,确保开发环境与生产环境的一致性。
资料来源:airflow-core/src/airflow/_vendor/README.md:1-20
编译分发包
构建 Airflow 分发包:
# 使用 Breeze 构建 sdist 或 wheel 包
breeze release-management build-rc \
--airflow-constraints-mode="constraints" \
--distribution-format both
编译后的包位于 dist/ 目录下。
初始化数据库
安装完成后,需要初始化 Airflow 的元数据库:
# 初始化数据库
airflow db init
# 创建管理员用户
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email [email protected]
启动 Airflow
Web 服务器
airflow webserver --port 8080
调度器
在另一个终端启动调度器:
airflow scheduler
独立模式
对于快速测试,可以使用独立模式:
airflow standalone
该命令会启动一个包含 Web 服务器和调度器的单进程环境。
验证安装
运行示例 DAG
Airflow 自带多个示例 DAG,可用于验证安装:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG('example_dag', start_date=datetime(2024, 1, 1)) as dag:
t1 = BashOperator(
task_id='print_date',
bash_command='date'
)
资料来源:airflow-core/src/airflow/example_dags/tutorial.py:1-40
检查状态
# 查看 Airflow 版本
airflow version
# 查看已加载的连接
airflow connections list
# 查看已安装提供程序
airflow providers list
快速安装流程图
graph TD
A[开始安装] --> B{选择安装方式}
B -->|pip| C[安装 Python 依赖]
B -->|Docker| D[安装 Docker]
B -->|源码| E[克隆代码仓库]
C --> F[初始化数据库]
D --> G[拉取镜像]
E --> H[安装依赖]
F --> I[启动服务]
G --> I
H --> I
I --> J[创建管理员账户]
J --> K[访问 Web UI]
K --> L[运行示例 DAG]常见问题排查
依赖冲突
如果遇到依赖冲突问题,确保使用正确的约束文件:
pip install apache-airflow==<VERSION> \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-<VERSION>/constraints-<PYTHON_VERSION>.txt"
数据库连接问题
检查数据库连接字符串配置:
[database]
sql_alchemy_conn = postgresql+psycopg2://user:password@localhost/airflow
端口占用
如果 8080 端口被占用,可更换端口:
airflow webserver --port 8888
后续步骤
安装成功后,建议进行以下操作:
- 配置连接:设置数据库、API 密钥等连接信息
- 配置 Executor:根据需求选择 Executor 类型(Local、Celery、Kubernetes)
- 配置告警:设置邮件或其他告警渠道
- 学习 DAG 编写:参考官方教程编写第一个生产 DAG
- 了解最佳实践:学习 Airflow 的安全和性能最佳实践
相关文档链接
- 官方安装文档:https://airflow.apache.org/docs/apache-airflow/stable/installation/index.html
- Docker 镜像:https://github.com/apache/airflow/blob/main/Dockerfile
- Breeze 开发工具:https://github.com/apache/airflow/blob/main/dev/breeze/README.md
资料来源:[INSTALLING.md:1-50](https://github.com/apache/airflow/blob/main/INSTALLING.md)
系统架构详解
Apache Airflow 是一个开源的工作流编排平台,采用分布式架构设计,支持大规模任务调度与执行。Airflow 3.0 在架构上进行了重大升级,引入了更加模块化的组件设计,将 DAG 处理、任务调度和执行进行了清晰的分离。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
1. 概述
Apache Airflow 是一个开源的工作流编排平台,采用分布式架构设计,支持大规模任务调度与执行。Airflow 3.0 在架构上进行了重大升级,引入了更加模块化的组件设计,将 DAG 处理、任务调度和执行进行了清晰的分离。
核心架构由以下几个主要组件构成:
- Web服务器:提供用户界面
- 调度器(Scheduler):负责 DAG 调度和任务分发
- DAG处理器(DagFileProcessorManager):负责解析和验证 DAG 文件
- 执行器(Executor):负责实际任务执行
- 元数据库:存储 DAG 定义、任务状态和执行历史
资料来源:airflow-core/docs/img/diagram_basic_airflow_architecture.py:1-50
2. 核心组件架构
2.1 DAG 处理系统
DAG 处理系统是 Airflow 的核心子系统之一,负责从文件系统读取、解析和验证 DAG 定义。
graph TD
A[DAG 文件目录] --> B[DagFileProcessorManager]
B --> C[文件扫描器]
B --> D[文件处理器池]
C -->|检测文件变更| D
D --> E[Processor Subprocess]
E --> F[DAG 对象]
F --> G[Import Timeout Manager]
G --> H[数据库更新]
style B fill:#e1f5fe
style D fill:#fff3e0
style E fill:#e8f5e9关键组件:
| 组件 | 文件位置 | 职责 |
|---|---|---|
| DagFileProcessorManager | dag_processing/manager.py | 管理 DAG 文件处理的生命周期 |
| DagFileProcessor | dag_processing/processor.py | 执行单个 DAG 文件的解析 |
| ImportTimeoutManager | dag_processing/manager.py | 监控 DAG 导入超时 |
资料来源:airflow-core/src/airflow/dag_processing/manager.py:1-100
#### 2.1.1 DAG 文件扫描
DagFileProcessorManager 维护一个文件扫描循环,定期检查配置的 DAG 目录中的文件变更:
# 扫描间隔配置
processor_poll_interval -> scheduler_idle_sleep_time
默认配置变更说明:
| 旧配置项 | 新配置项 | 说明 |
|---|---|---|
scheduler.processor_poll_interval | scheduler.scheduler_idle_sleep_time | 调度器空闲休眠时间 |
scheduler.create_cron_data_intervals | scheduler.create_cron_data_intervals | 默认值由 True 改为 False |
scheduler.create_delta_data_intervals | scheduler.create_delta_data_intervals | 默认值由 True 改为 False |
资料来源:airflow-core/src/airflow/cli/commands/config_command.py:30-80
2.2 调度器系统
调度器是 Airflow 的大脑,负责决定何时运行哪些任务。
graph TD
A[SchedulerJobRunner] --> B[关键区域锁]
B --> C{获取锁}
C -->|成功| D[处理过期 DAG]
C -->|失败| E[等待]
D --> F[调度待处理任务]
F --> G[发送任务到执行器]
G --> H[更新任务状态]
H --> I[释放锁]
I --> B
style A fill:#fff8e1
style B fill:#ffecb3
style G fill:#c8e6c9#### 2.2.1 调度器关键流程
- 获取关键区域锁:确保多调度器环境下的安全性
- 处理过期 DAG:清理陈旧的 DAG 数据
- 调度任务:根据依赖关系和执行时间分发任务
- 孤儿任务处理:检测并处理孤立任务
调度器指标:
| 指标名称 | 类型 | 说明 |
|---|---|---|
scheduler.critical_section_busy | counter | 关键区域锁竞争次数 |
scheduler.orphaned_tasks.adopted | counter | 被采纳的孤儿任务数 |
scheduler.orphaned_tasks.cleared | counter | 被清理的孤儿任务数 |
scheduler.tasks.killed_externally | counter | 外部杀死任务数 |
资料来源:airflow-core/src/airflow/_shared/observability/metrics/metrics_template.yaml:50-80
2.3 任务执行系统
任务执行通过 Task SDK 实现,采用远程进程通信模式。
graph LR
A[任务实例] --> B[任务状态存储]
A --> C[任务参数]
A --> D[执行上下文]
B -.->|get| E[...]
B -.->|set| F[...]
B -.->|delete| G[...]
style A fill:#e3f2fd
style B fill:#bbdefb#### 2.3.1 任务状态管理
任务状态通过 TaskState 类进行管理,提供键值对存储:
class TaskState:
def get(self, key: str) -> str | None
def set(self, key: str, value: str) -> None
def delete(self, key: str) -> None
def clear(self, all_map_indices: bool = False) -> None
| 方法 | 功能 | 说明 |
|---|---|---|
get(key) | 获取值 | 返回存储的值,键不存在返回 None |
set(key, value) | 设置值 | 写入或覆盖指定键的值 |
delete(key) | 删除值 | 删除单个键,键不存在无操作 |
clear() | 清空 | 删除所有 map indices 的状态 |
资料来源:task-sdk/src/airflow/sdk/execution_time/context.py:80-120
3. CLI 命令系统
Airflow 提供丰富的命令行接口,用于管理和操作各个组件。
3.1 命令分类
| 命令组 | 功能 |
|---|---|
airflow dags | DAG 管理(backfill, list-runs, pause, unpause, test) |
airflow tasks | 任务操作(run, failed-deps, render, test) |
airflow connections | 连接管理 |
airflow providers | 提供者信息 |
airflow config | 配置查看 |
airflow db-manager | 数据库管理器 |
资料来源:airflow-core/src/airflow/cli/cli_config.py:50-150
3.2 任务命令详解
| 命令 | 功能 | 关键参数 |
|---|---|---|
airflow tasks run | 运行单个任务 | --dag-id, --task-id, --logical-date-or-run-id |
airflow tasks render | 渲染任务模板 | --dag-id, --task-id, --logical-date-or-run-id |
airflow tasks test | 测试任务(不记录状态) | --dag-id, --task-id, --logical-date-or-run-id |
airflow tasks failed-deps | 检查未满足依赖 | --dag-id, --task-id, --logical-date-or-run-id |
测试命令特点:
- 不检查依赖关系
- 不记录状态到数据库
- 适用于快速验证任务逻辑
资料来源:airflow-core/src/airflow/cli/cli_config.py:150-200
4. 配置系统
4.1 关键配置变更(Airflow 2.x → 3.0)
| 配置项 | 变更类型 | 旧值 | 新值 |
|---|---|---|---|
scheduler.catchup_by_default | 默认值变更 | True | False |
scheduler.create_cron_data_intervals | 默认值变更 | True | False |
scheduler.create_delta_data_intervals | 默认值变更 | True | False |
scheduler.processor_poll_interval | 重命名 | - | scheduler_idle_sleep_time |
scheduler.deactivate_stale_dags_interval | 重命名 | - | parsing_cleanup_interval |
scheduler.statsd_on | 重命名 | - | metrics.statsd_on |
scheduler.max_threads | 重命名 | - | dag_processor.parsing_processes |
资料来源:airflow-core/src/airflow/cli/commands/config_command.py:20-70
4.2 DAG Processing 配置
dag_processing:
manager_stalls: Number of stalled DagFileProcessorManager
processor_timeouts: DAG 处理超时次数
dag_file_refresh_error: DAG 文件加载失败次数
5. 执行架构图
graph TD
subgraph Web层
UI[Web UI]
end
subgraph 调度层
Scheduler[Scheduler Job Runner]
DagProcessor[DagFileProcessorManager]
ImportTimeout[Import Timeout Manager]
end
subgraph 执行层
Executor[Executor]
Worker[Worker Process]
TaskRunner[Task Runner]
end
subgraph 存储层
DB[(Metadata DB)]
DAGFS[DAG File System]
end
UI <-->|API| Scheduler
Scheduler <-->|调度决策| Executor
DagProcessor <-->|解析DAG| DAGFS
DagProcessor <-->|写入| DB
Executor <-->|分发任务| Worker
Worker <-->|执行| TaskRunner
TaskRunner <-->|状态更新| DB
style Scheduler fill:#ff9800,color:#fff
style DagProcessor fill:#4caf50,color:#fff
style Executor fill:#2196f3,color:#fff6. 监控指标
Airflow 提供全面的监控指标体系:
6.1 DAG 处理指标
| 指标 | 描述 |
|---|---|
dag_processing.manager_stalls | DagFileProcessorManager 停滞次数 |
dag_file_refresh_error | DAG 文件刷新错误数 |
dag_file_processor_timeouts | DAG 文件处理器超时(已废弃) |
6.2 调度器指标
| 指标 | 描述 |
|---|---|
scheduler.critical_section_busy | 调度器关键区域忙次数 |
scheduler.orphaned_tasks.adopted | 被采纳的孤儿任务 |
scheduler.orphaned_tasks.cleared | 被清理的孤儿任务 |
scheduler.tasks.killed_externally | 外部杀死任务数 |
6.3 任务执行指标
| 指标 | 描述 |
|---|---|
ti.start | 任务启动次数 |
ti.finish | 任务完成次数 |
资料来源:airflow-core/src/airflow/_shared/observability/metrics/metrics_template.yaml:30-100
7. 总结
Apache Airflow 采用分布式架构设计,核心组件包括:
- DagFileProcessorManager:负责 DAG 文件的解析和验证
- SchedulerJobRunner:负责任务的调度和分发
- Executor:负责实际任务执行
- Task SDK:提供任务运行时的上下文管理
各组件通过消息队列和数据库进行通信,确保系统的高可用性和可扩展性。
资料来源:[airflow-core/docs/img/diagram_basic_airflow_architecture.py:1-50]()
核心组件详解
Apache Airflow 是一个开源的工作流编排平台,其核心组件构成了整个任务调度和执行的基础架构。这些组件包括DAG(有向无环图)、DAGRun(DAG运行实例)、Connection(连接)、Variable(变量)和Pool(资源池)。理解这些核心组件对于深入掌握Airflow的工作原理至关重要。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
概述
Apache Airflow 是一个开源的工作流编排平台,其核心组件构成了整个任务调度和执行的基础架构。这些组件包括DAG(有向无环图)、DAGRun(DAG运行实例)、Connection(连接)、Variable(变量)和Pool(资源池)。理解这些核心组件对于深入掌握Airflow的工作原理至关重要。
DAG 模型
什么是 DAG
DAG(Directed Acyclic Graph,有向无环图)是Airflow中工作流定义的核心抽象。它代表了一组需要执行的任务以及任务之间的依赖关系。
DAG 关键属性
| 属性 | 类型 | 说明 |
|---|---|---|
| dag_id | str | DAG的唯一标识符 |
| description | str | DAG的描述信息 |
| schedule_interval | - | 调度间隔配置 |
| start_date | datetime | DAG开始执行的日期 |
| end_date | datetime | DAG结束执行的日期(可选) |
| catchup | bool | 是否补跑历史数据 |
| max_active_runs | int | 最大并发运行数 |
| concurrency | int | 最大并发任务数 |
| is_paused | bool | DAG是否暂停 |
DAG 的生命周期
graph LR
A[创建DAG] --> B[DAG解析]
B --> C[调度器调度]
C --> D[创建DAGRun]
D --> E[执行任务]
E --> F[DAGRun完成]
F --> G{是否继续调度}
G -->|是| C
G -->|否| H[DAG结束]DAGRun 模型
概述
DAGRun是DAG的一次具体执行实例。每次触发或调度DAG时,都会创建一个DAGRun实例来跟踪该次执行的状态。
DAGRun 状态
| 状态 | 说明 |
|---|---|
| QUEUED | 排队等待执行 |
| RUNNING | 正在执行 |
| SUCCESS | 成功完成 |
| FAILED | 执行失败 |
DAGRun 类型
| 类型 | 说明 |
|---|---|
| SCHEDULED | 由调度器正常调度触发 |
| MANUAL | 手动触发的运行 |
| BACKFILL | 回填作业执行 |
核心属性
DAGRun包含以下关键属性用于追踪执行信息:
- run_id: 每次运行的唯一标识
- state: 当前运行状态
- execution_date: 执行日期
- start_date: 实际开始时间
- end_date: 结束时间
- triggering_user_name: 触发运行的用户名
Connection 连接管理
连接类型
Airflow支持多种连接类型,用于与外部系统进行交互:
| 连接类型 | 说明 | 默认配置 |
|---|---|---|
| postgres | PostgreSQL数据库 | 需要host、schema、login、password |
| mysql | MySQL数据库 | 需要host、schema、login、password |
| google_cloud_platform | GCP服务 | 使用默认schema |
| http | HTTP端点 | 需要host地址 |
| ftp | FTP服务器 | 需要host、login、password |
| ssh | SSH连接 | 需要host、login、key_file |
| redis | Redis缓存 | 需要host、port |
| fs | 文件系统 | 需要path配置 |
| hive_cli | Hive CLI | 需要host、port、schema |
| hiveserver2 | HiveServer2 | 需要host、schema、port |
| gremlin | Gremlin图数据库 | 需要host、port |
| facebook_social | Facebook社交 | 需要app_id、app_secret等 |
| iceberg | Iceberg表格式 | 需要host配置 |
连接创建示例
Connection(
conn_id="gcp_default",
conn_type="google_cloud_platform",
schema="default"
)
Variable 变量管理
变量概述
Variable用于存储和检索Airflow中的键值对配置信息,可在DAG和任务之间共享。
变量操作
| 操作 | 说明 | 命令 |
|---|---|---|
| list | 列出所有变量 | airflow variables list |
| get | 获取变量值 | airflow variables get <key> |
| set | 设置变量 | airflow variables set <key> <value> |
| delete | 删除变量 | airflow variables delete <key> |
| import | 批量导入 | airflow variables import <file> |
| export | 批量导出 | airflow variables export <file> |
JSON序列化支持
Variable支持JSON序列化,可存储复杂数据结构:
airflow variables set my_json '{"key": "value", "list": [1, 2, 3]}'
Pool 资源池管理
池的作用
Pool用于限制同时执行的任务数量,控制资源使用并管理并发度。
内置默认池
| 池名称 | 默认槽位数 | 说明 |
|---|---|---|
| default_pool | 根据配置 | 用于未指定池的任务 |
槽位管理
graph TB
A[任务请求] --> B{槽位可用?}
B -->|是| C[分配槽位]
B -->|否| D[排队等待]
C --> E[执行任务]
E --> F[释放槽位]
D --> G{槽位释放?}
G -->|是| C
G -->|否| DCLI 命令行接口
可用命令组
Airflow提供了丰富的CLI命令用于管理核心组件:
| 命令组 | 说明 | 主要子命令 |
|---|---|---|
| dag | DAG管理 | list, details, list-runs, pause, unpause, backfill, test |
| dagrun | 运行管理 | list, trigger, clear |
| connections | 连接管理 | list, add, delete, edit |
| variables | 变量管理 | list, get, set, delete, import, export |
| pools | 池管理 | list, set |
| config | 配置查看 | list, get, show |
| providers | 提供者信息 | list, details |
| assets | 资产管理 | list, details, materialize |
常用命令示例
# 列出所有DAG
airflow dags list
# 触发DAG运行
airflow dags trigger <dag_id>
# 暂停DAG
airflow dags pause <dag_id>
# 列出所有连接
airflow connections list
# 导出变量
airflow variables export variables.json
配置变更说明
Airflow 3.0 配置变化
从Airflow 2.x迁移到3.0时,以下配置项发生了变化:
| 旧配置项 | 新配置项 | 变化类型 |
|---|---|---|
| scheduler.catchup_by_default | scheduler.catchup | 默认值改为False |
| scheduler.create_cron_data_intervals | - | 默认值改为False |
| scheduler.create_delta_data_intervals | - | 默认值改为False |
| scheduler.processor_poll_interval | scheduler.scheduler_idle_sleep_time | 重命名 |
| scheduler.deactivate_stale_dags_interval | scheduler.parsing_cleanup_interval | 重命名 |
| scheduler.statsd_on | metrics.statsd_on | 重命名 |
| scheduler.max_threads | dag_processor.parsing_processes | 重命名 |
catchup 行为变化
在 Airflow 3.0 中,catchup的默认值为False。这意味着未明确设置catchup参数的 DAG 默认不会进行历史数据补跑。如果DAG依赖补跑行为,需要在airflow.cfg的scheduler部分将该配置设置为True。资料来源:airflow-core/src/airflow/cli/commands/config_command.py
数据模型关系
erDiagram
DAG ||--o{ DAGRun : creates
DAGRun ||--o{ TaskInstance : contains
TaskInstance ||--|| Pool : uses
Connection ||--o{ TaskInstance : referenced_by
Variable ||--o{ TaskInstance : accessed_by
DAG {
string dag_id
string description
string schedule_interval
datetime start_date
boolean is_paused
}
DAGRun {
string run_id
string state
datetime execution_date
datetime start_date
datetime end_date
}
TaskInstance {
string task_id
string state
datetime start_date
datetime end_date
}
Pool {
string pool
int slots
string description
}
Connection {
string conn_id
string conn_type
string host
string schema
}
Variable {
string key
string val
string description
}安全最佳实践
连接凭证管理
- 使用Fernet加密: Airflow使用Fernet加密存储敏感连接凭证
- 轮换加密密钥: 定期执行
airflow rotate-fernet-key更新加密密钥
airflow rotate-fernet-key
敏感信息处理
| 类型 | 存储方式 | 建议 |
|---|---|---|
| 密码 | 加密存储 | 使用变量或连接extra字段 |
| API密钥 | 加密存储 | 使用Secret Backend |
| 连接凭证 | 加密存储 | 使用连接管理界面 |
总结
Apache Airflow的核心组件构成了一个完整的工作流编排系统。DAG作为工作流定义的核心,依赖调度器生成DAGRun实例,而DAGRun则管理具体任务的执行。Connection、Variable和Pool等组件提供了与外部系统交互、配置管理和资源控制的能力。深入理解这些组件及其相互关系,对于熟练使用Airflow进行工作流开发和运维至关重要。
来源:https://github.com/apache/airflow / 项目说明书
执行器类型与选择
执行器(Executor)是 Apache Airflow 架构中的核心组件,负责实际执行任务实例。每个执行器实现了特定的任务调度和执行策略,从单机的本地执行到分布式的集群环境,Airflow 提供了多种执行器类型以满足不同场景的需求。执行器的选择直接影响工作流的性能、可扩展性和可靠性。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
概述
执行器(Executor)是 Apache Airflow 架构中的核心组件,负责实际执行任务实例。每个执行器实现了特定的任务调度和执行策略,从单机的本地执行到分布式的集群环境,Airflow 提供了多种执行器类型以满足不同场景的需求。执行器的选择直接影响工作流的性能、可扩展性和可靠性。
执行器系统遵循统一的接口规范,所有执行器都继承自基础执行器类,定义了任务提交、状态查询、心跳维护等标准方法。资料来源:airflow-core/src/airflow/executors/executor_loader.py:1-50
执行器架构
核心组件关系
graph TD
DAG[DAG 有向无环图] --> Scheduler[调度器]
Scheduler --> ExecutorLoader[执行器加载器]
ExecutorLoader --> Executor[执行器实例]
Executor --> TaskInstance[任务实例]
Executor --> Worker[Worker 节点]
Executor --> LocalExecutor[LocalExecutor]
Executor --> CeleryExecutor[CeleryExecutor]
Executor --> KubernetesExecutor[KubernetesExecutor]
Executor --> SequentialExecutor[SequentialExecutor]
LocalExecutor --> LocalWorker[本地进程]
CeleryExecutor --> CeleryWorker[Celery Worker]
KubernetesExecutor --> K8sPod[K8s Pod]执行器类型总览
| 执行器类型 | 说明 | 适用场景 | 并发能力 |
|---|---|---|---|
| SequentialExecutor | 顺序执行器 | 开发调试、单节点 | 1 |
| LocalExecutor | 本地执行器 | 单机生产、小规模任务 | 多进程 |
| CeleryExecutor | Celery 执行器 | 分布式、跨机器 | 集群规模 |
| KubernetesExecutor | K8s 执行器 | 云原生、自动扩缩容 | Pod 数量 |
| LocalKubernetesExecutor | 本地 K8s 执行器 | 测试环境 | 可配置 |
| CeleryKubernetesExecutor | 混合执行器 | 灵活调度 | 集群规模 |
资料来源:airflow-core/src/airflow/executors/executor_loader.py:50-80
执行器加载机制
执行器加载器职责
ExecutorLoader 是 Airflow 中负责动态加载和解析执行器配置的中央组件。它支持多种执行器配置格式,包括简单的单执行器名称和复杂的团队别名配置。
执行器加载器的主要职责包括:
- 解析
airflow.cfg中的执行器配置 - 验证执行器名称的有效性
- 实例化对应的执行器类
- 处理执行器别名和团队配置
# 执行器名称解析核心逻辑
if module_or_name in CORE_EXECUTOR_NAMES:
executor_names_per_team.append(
ExecutorName(
alias=alias,
module_path=cls.executors[module_or_name],
team_name=team_name
)
)
资料来源:airflow-core/src/airflow/executors/executor_loader.py:80-120
执行器配置格式
执行器配置支持以下几种格式:
graph LR
A[配置文件] --> B[执行器名称]
B --> C[单执行器]
B --> D[别名配置]
B --> E[团队配置]
C --> C1[LocalExecutor]
C --> C2[CeleryExecutor]
D --> D1[MyAlias:LocalExecutor]
D1 --> D2[alias=MyAlias<br/>module=LocalExecutor]
E --> E1[team:executor]
E1 --> E2[team_name=team<br/>executor=executor]#### 简单配置
executor = LocalExecutor
#### 别名配置
executor = MyLocalExecutor:LocalExecutor
格式为 别名:执行器类型,其中别名用于日志和监控标识。
#### 团队配置
[executors]
TeamA = TeamA: CeleryExecutor
TeamB = TeamB: KubernetesExecutor
资料来源:airflow-core/src/airflow/executors/executor_loader.py:100-150
核心执行器类型详解
SequentialExecutor
顺序执行器是最基础的执行器,以单线程顺序方式执行任务。此执行器主要用于以下场景:
- 开发环境:在没有多进程支持的环境中运行
- 调试场景:需要逐个追踪任务执行流程
- 最小化部署:资源受限的单机环境
# SequentialExecutor 核心特性
- 单进程顺序执行
- 无并发能力
- 任务队列单一
- 无需外部依赖
资料来源:airflow-core/src/airflow/executors/executor_loader.py:150-180
LocalExecutor
本地执行器在单机环境下提供多进程并发执行能力。它使用 Python 的 multiprocessing 模块创建工作进程池,每个工作进程可以并行执行多个任务。
graph TD
LocalExecutor --> Worker1[Worker Process 1]
LocalExecutor --> Worker2[Worker Process 2]
LocalExecutor --> WorkerN[Worker Process N]
Worker1 --> Task1[Task Instance]
Worker1 --> Task2[Task Instance]
Worker2 --> Task3[Task Instance]
WorkerN --> Task4[Task Instance]
Worker1 -.-> Queue[任务队列]
Worker2 -.-> Queue
WorkerN -.-> Queue配置参数:
| 参数 | 说明 | 默认值 |
|---|---|---|
| parallelism | 并行任务数上限 | CPU 核心数 |
| local_worker_kwargs | 工作进程额外参数 | {} |
资料来源:airflow-core/src/airflow/executors/executor_loader.py:180-220
CeleryExecutor
CeleryExecutor 是分布式执行器,使用 Celery 作为消息队列来处理跨多台机器的任务分发和执行。Celery 是 Python 生态中成熟的任务队列系统,支持 Redis、RabbitMQ 等多种消息代理。
graph TD
Scheduler[Airflow Scheduler] -->|任务提交| CeleryBroker[Celery Broker<br/>Redis/RabbitMQ]
CeleryBroker --> Worker1[Celery Worker 1]
CeleryBroker --> Worker2[Celery Worker 2]
CeleryBroker --> WorkerN[Celery Worker N]
Worker1 -->|执行结果| ResultBackend[Result Backend]
Worker2 -->|执行结果| ResultBackend
WorkerN -->|执行结果| ResultBackend
ResultBackend -->|状态同步| Scheduler适用场景:
- 需要水平扩展的任务处理
- 跨多台机器的分布式部署
- 需要任务优先级和重试策略
- 长时间运行的后台任务
依赖组件:
- Celery 消息代理(Redis 或 RabbitMQ)
- Celery Worker 节点
- 结果后端(数据库或缓存)
资料来源:providers/celery/src/airflow/providers/celery/executors/celery_executor.py:1-100
KubernetesExecutor
KubernetesExecutor 是原生运行在 Kubernetes 集群上的执行器,每个任务都在独立的 Pod 中执行。这种设计提供了极佳的隔离性和资源弹性。
graph TD
Scheduler[Scheduler] --> K8sAPI[Kubernetes API]
K8sAPI --> Pod1[Task Pod 1]
K8sAPI --> Pod2[Task Pod 2]
K8sAPI --> PodN[Task Pod N]
Pod1 -->|完成| K8sAPI
Pod2 -->|完成| K8sAPI
PodN -->|完成| K8sAPI
K8sAPI --> ConfigMap[ConfigMap<br/>任务定义]
K8sAPI --> Secret[Secret<br/>连接凭证]核心优势:
| 特性 | 说明 |
|---|---|
| 任务隔离 | 每个任务独立 Pod,避免资源冲突 |
| 自动扩缩容 | 根据任务队列动态创建/销毁 Pod |
| 资源控制 | 支持 CPU、内存 Limits 和 Requests 配置 |
| 亲和性 | 支持节点亲和性、污点容忍等调度策略 |
| 生命周期 | 任务完成即销毁,资源高效利用 |
配置选项:
| 参数 | 说明 |
|---|---|
| namespace | Kubernetes 命名空间 |
| parallelism | 最大并发 Pod 数 |
| worker_pods_creation_batch_size | 批量创建大小 |
| kube_client_request_args | K8s 客户端参数 |
执行器配置管理
配置文件位置
执行器配置通过 airflow.cfg 文件管理,主要配置项位于 [core] 部分:
[core]
executor = LocalExecutor
Breeze 开发环境配置
在 Airflow Breeze 开发环境中,可以使用统一的命令行选项配置执行器:
# 查看执行器列表
breeze shell --executor
# 使用指定执行器启动
breeze shell --executor KubernetesExecutor
支持的执行器选项:
| 选项 | 说明 | 可用值 |
|---|---|---|
| --executor | 执行器类型 | LocalExecutor, CeleryExecutor, KubernetesExecutor 等 |
资料来源:dev/breeze/src/airflow_breeze/commands/common_options.py:100-150
执行器别名与团队配置
Airflow 支持通过 ExecutorLoader 实现执行器别名和团队级别的配置:
class ExecutorName(NamedTuple):
alias: str | None
module_path: str
team_name: str | None
配置示例:
[executors]
# 格式: 别名 = 模块路径
my_alias = airflow.executors.local_executor.LocalExecutor
team_default = airflow.providers.celery.executors.celery_executor.CeleryExecutor
配置验证逻辑:
# 检查是否为内置执行器名称
if module_or_name in CORE_EXECUTOR_NAMES:
executor_names_per_team.append(
ExecutorName(
alias=alias,
module_path=cls.executors[module_or_name],
team_name=team_name
)
)
# 检查是否为模块路径
elif "." not in module_or_name:
raise AirflowConfigException(
"Incorrectly formatted executor configuration..."
)
资料来源:airflow-core/src/airflow/executors/executor_loader.py:200-260
执行器选择指南
决策流程
graph TD
Start[开始选择执行器] --> Q1{部署规模?}
Q1 -->|单机/开发| Q2{需要并发?}
Q1 -->|分布式| Q3{K8s 环境?}
Q1 -->|大规模| Q4{需要灵活调度?}
Q2 -->|是| LocalExecutor
Q2 -->|否| SequentialExecutor
Q3 -->|是| KubernetesExecutor
Q3 -->|否| CeleryExecutor
Q4 -->|是| CeleryKubernetesExecutor
Q4 -->|否| CeleryExecutor场景推荐
| 场景 | 推荐执行器 | 原因 |
|---|---|---|
| 本地开发调试 | SequentialExecutor | 简单、无依赖 |
| 单机生产环境 | LocalExecutor | 多进程并发、资源可控 |
| 小型集群 | CeleryExecutor | 配置简单、成熟稳定 |
| 云原生/K8s | KubernetesExecutor | 弹性扩缩容、任务隔离 |
| 混合云环境 | CeleryKubernetesExecutor | 灵活调度、兼容性好 |
配置示例
#### 本地执行器配置
[core]
executor = LocalExecutor
[local_executor]
parallelism = 8
#### Celery 执行器配置
[core]
executor = CeleryExecutor
[celery]
broker_url = redis://redis:6379/0
result_backend = redis://redis:6379/1
worker_concurrency = 16
#### Kubernetes 执行器配置
[core]
executor = KubernetesExecutor
[kubernetes]
namespace = airflow
parallelism = 32
资料来源:airflow-core/src/airflow/executors/executor_loader.py:260-320
命令行接口
查看执行器信息
通过 Airflow CLI 可以查看已配置的执行器信息:
# 查看 Airflow 信息(包含执行器)
airflow info
# 查看提供商执行器列表
airflow providers executors
可用的 providers 子命令:
| 命令 | 说明 |
|---|---|
providers list | 列出所有提供商 |
providers executors | 列出可用的执行器 |
providers details | 提供商详细信息 |
资料来源:airflow-core/src/airflow/cli/cli_config.py:1-100
CLI 配置定义
执行器相关的 CLI 命令在 cli_config.py 中定义:
# 提供商执行器列表命令
ActionCommand(
name="executors",
help="Get information about executors provided",
func=lazy_load_command("airflow.cli.commands.provider_command.executors_list"),
args=(ARG_OUTPUT, ARG_VERBOSE),
)
命令行参数:
| 参数 | 短标识 | 说明 |
|---|---|---|
| --output | -o | 输出格式(table、json、yaml) |
| --verbose | -v | 详细输出模式 |
资料来源:airflow-core/src/airflow/cli/cli_config.py:100-200
默认连接配置
执行器运行需要配置相应的连接凭证。Airflow 在初始化数据库时提供默认连接配置:
开发环境默认连接
Connection(
conn_id="fs_default",
conn_type="fs",
extra='{"path": "/"}',
)
Connection(
conn_id="google_cloud_default",
conn_type="google_cloud_platform",
schema="default",
)
Connection(
conn_id="http_default",
conn_type="http",
host="https://www.httpbin.org/",
)
生产环境注意事项
在生产环境中部署时,需要:
- 配置真实的数据库连接(避免 SQLite)
- 根据执行器类型配置相应的消息代理连接
- 设置正确的认证凭证和 SSL 选项
资料来源:airflow-core/src/airflow/utils/db.py:1-100
最佳实践
执行器选择原则
- 从简单开始:开发环境使用 LocalExecutor,生产环境根据需求升级
- 资源评估:估算并发任务数、任务运行时长、资源消耗
- 运维能力:评估团队对消息队列或容器编排的熟悉程度
- 成本考虑:云环境优先考虑 KubernetesExecutor 以获得弹性
- 扩展性预留:选择具有一定扩展空间的执行器类型
性能优化建议
| 优化项 | LocalExecutor | CeleryExecutor | KubernetesExecutor |
|---|---|---|---|
| 并发数 | parallelism 参数 | worker_concurrency | parallelism 参数 |
| 心跳间隔 | 降低网络延迟 | 合理设置超时 | 配置 liveness 探针 |
| 资源限制 | 进程级限制 | Worker 级限制 | Pod 级限制 |
| 监控 | 进程监控 | Celery Events | K8s Events |
常见问题排查
graph TD
Issue[问题排查] --> Check1{执行器启动失败?}
Issue --> Check2{任务堆积?}
Issue --> Check3{连接超时?}
Check1 --> Solution1[检查配置<br/>验证依赖]
Check2 --> Solution2[增加并发数<br/>优化任务拆分]
Check3 --> Solution3[检查网络<br/>验证凭证]资料来源:airflow-core/src/airflow/executors/executor_loader.py:320-400
总结
Apache Airflow 的执行器系统提供了灵活的任务执行框架,从单机的顺序执行到分布式的 Kubernetes 集群执行,能够满足不同规模和场景的需求。执行器的选择应综合考虑部署环境、团队能力、运维成本和扩展性要求。
对于新项目,建议先使用 LocalExecutor 进行开发和测试,待系统稳定后再根据实际负载和扩展需求迁移到 CeleryExecutor 或 KubernetesExecutor。在选择执行器时,务必参考官方文档中的具体版本要求,确保所有依赖组件正确配置。
资料来源:airflow-core/src/airflow/executors/executor_loader.py:400-450
资料来源:[airflow-core/src/airflow/executors/executor_loader.py:50-80]()
数据流转与交换机制
Apache Airflow 中的数据流转与交换机制是工作流编排的核心能力,允许任务之间传递数据、状态和事件。该机制主要由三大支柱组成:XCom(跨任务通信)、Asset(资产) 和 Callback(回调)。这些机制共同构成了 Airflow 工作流中数据流动的完整生态,使得分布式任务执行环境下的数据交换变得可靠且可追踪。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
概述
Apache Airflow 中的数据流转与交换机制是工作流编排的核心能力,允许任务之间传递数据、状态和事件。该机制主要由三大支柱组成:XCom(跨任务通信)、Asset(资产) 和 Callback(回调)。这些机制共同构成了 Airflow 工作流中数据流动的完整生态,使得分布式任务执行环境下的数据交换变得可靠且可追踪。
XCom 机制允许任务在执行完成后向元数据库写入值,后续任务可以检索这些值进行后续处理。Asset 机制提供了一种基于数据源状态变化触发 DAG 执行的优雅方式。Callback 机制则支持任务生命周期中的关键节点执行自定义逻辑,如任务失败时发送告警。
XCom 机制详解
XCom 核心概念
XCom(Cross-Communication)是 Airflow 中实现任务间数据交换的主要机制。它通过 airflow.models.xcom.BaseXCom 类实现,数据默认存储在 Airflow 元数据库中,支持任务执行器在分布式环境下共享数据。
graph TD
A[任务 A 执行] -->|push| B[XCom 写入]
B --> C[(元数据库)]
C -->|pull| D[任务 B 读取]
D --> E[任务 B 继续执行]
F[参数传递] -.->|包含| B
G[返回值自动推送] -.->|包含| BXCom 支持推送和拉取两种操作模式。任务可以通过 xcom_push() 方法显式推送数据,也可以通过返回 Python 对象让 Airflow 自动推送。拉取操作使用 xcom_pull() 方法,可以按任务 ID 或 DAG run ID 筛选特定数据。
XCom 数据模型
XCom 的数据模型定义在 BaseXCom 类中,包含以下核心字段:
| 字段名 | 类型 | 说明 |
|---|---|---|
| key | String | XCom 值的键名,用于标识数据 |
| value | JSON | 序列化的数据值,支持基础类型和嵌套结构 |
| timestamp | DateTime | XCom 创建时间戳 |
| task_id | String | 推送该 XCom 的任务 ID |
| dag_id | String | 所属 DAG 的唯一标识符 |
| run_id | String | DAG Run 的执行 ID |
| map_index | Integer | Map 任务中的索引值(-1 表示非 Map 任务) |
资料来源:airflow-core/src/airflow/models/xcom.py:1-100
XCom API 与操作方法
#### 推送数据
# 方式一:显式推送
task_instance.xcom_push(key="result", value={"status": "success", "data": [1, 2, 3]})
# 方式二:通过 return 语句自动推送
def extract_data(**context):
return {"records": 100, "source": "api"}
#### 拉取数据
# 按任务 ID 拉取最新值
result = task_instance.xcom_pull(task_ids=["transform_data"])[0]
# 按 task_ids 和 key 精确拉取
value = task_instance.xcom_pull(task_ids=["extract"], key="record_count")
#### 使用 XComArg 简化操作
XComArg 提供了一种声明式的数据获取方式,特别适合与 Jinja 模板结合使用:
from airflow.models.baseoperator import xcom_arg
extract_task >> transform_task
XCom 的序列化与反序列化
XCom 值在存储前会被序列化为 JSON 格式,检索时自动反序列化。支持的类型包括:
| Python 类型 | 序列化格式 | 限制说明 |
|---|---|---|
| str | JSON String | 无限制 |
| int, float | JSON Number | 无限制 |
| bool | JSON Boolean | 无限制 |
| list, tuple | JSON Array | 元素类型需可序列化 |
| dict | JSON Object | 键值需可序列化 |
| datetime | ISO 8601 字符串 | 需正确处理时区 |
| bytes | Base64 编码 | 大小受 max_xcom_size 限制 |
资料来源:airflow-core/src/airflow/models/xcom.py:150-200
XCom 配置与性能优化
XCom 的行为可通过以下配置参数调整:
| 参数 | 默认值 | 说明 |
|---|---|---|
max_xcom_size | 48KB | 单个 XCom 值的最大大小 |
xcom_backend | BaseXCom | 自定义 XCom 后端类 |
enable_xcom_pickling | True | 是否允许 pickle 序列化(非 JSON) |
在高并发场景下,建议:
- 避免传输大型数据集,将大数据存储在外部存储(如 S3、HDFS),XCom 只传递引用
- 使用自定义 XCom 后端将数据存储到 GCS、Redis 等外部系统
- 及时清理过期 XCom 数据以减少元数据库负担
Asset 资产机制
Asset 概述
Asset 是 Airflow 2.10+ 引入的新一代数据依赖管理机制,它代表 DAG 外部的数据源或数据目的地。通过 Asset,Airflow 可以感知数据源的变化并自动触发 DAG 执行,实现数据驱动的工作流编排。
graph LR
A[数据源变化] -->|触发| B[Asset 事件]
B --> C{DAG 调度}
C -->|是| D[创建 DAG Run]
C -->|否| E[等待下次检查]
F[Asset 定义] -->|关联| G[DAG Schedule]Asset 数据模型
Asset 模型定义在 airflow.models.asset 模块中:
class Asset:
name: str
uri: str
group: str | None
extra: dict | None
created_at: datetime
updated_at: datetime
资料来源:airflow-core/src/airflow/models/asset.py:1-50
Asset 与 DAG Schedule 绑定
DAG 可以通过 schedule 参数与一个或多个 Asset 关联:
from airflow.models.asset import Asset
# 单个 Asset
daily_sales = Asset(uri="s3://data-bucket/daily-sales/", name="daily_sales")
with DAG(
dag_id="process_sales",
schedule=[daily_sales], # 当 Asset 有新事件时触发
...
):
process_sales_task()
Asset Event 事件机制
每当外部数据源发生变化时,可以向 Airflow 报告 Asset Event:
from airflow.sdk import AssetEvent
AssetEvent(
asset=daily_sales,
extra={"partition": "2024-01-15"},
source_task_instance_id="report_ingestion",
)
Asset 命令行操作
Airflow CLI 提供了完整的 Asset 管理命令:
| 命令 | 功能 |
|---|---|
airflow assets list | 列出所有注册的 Asset |
airflow assets details | 查看 Asset 详情 |
airflow assets materialize | 手动触发 Asset 物化 |
airflow assets events | 查看 Asset 事件历史 |
资料来源:airflow-core/src/airflow/cli/cli_config.py:1-100
Callback 回调机制
Callback 机制概述
Callback 允许在 DAG 和任务生命周期中的关键节点执行自定义逻辑。这些节点包括任务失败、成功、重试以及 DAG 运行时错误等场景。Callback 通过 airflow.callbacks.callback_requests 模块中定义的数据结构传递给执行器。
graph TD
A[任务状态变更] -->|生成| B[CallbackRequest]
B --> C[(Callback 队列)]
C -->|处理| D[Callback Executor]
D --> E[执行用户逻辑]
F[on_failure_callback] -.-> E
G[on_success_callback] -.-> E
H[on_retry_callback] -.-> ECallback Request 数据结构
Callback 请求通过以下类定义:
| 类名 | 用途 | 触发时机 |
|---|---|---|
TaskCallbackRequest | 任务级回调 | 任务成功/失败/重试 |
DagCallbackRequest | DAG 级回调 | DAG 运行失败/成功 |
SlaCallbackRequest | SLA 超时回调 | 任务超出 SLA 阈值 |
DagRunCallbackRequest | DAG Run 回调 | DAG Run 状态变更 |
资料来源:airflow-core/src/airflow/callbacks/callback_requests.py:1-80
TaskCallbackRequest 详解
class TaskCallbackRequest(CallbackRequest):
simple_task_instance: SimpleTaskInstance
msg: str | None
processor_subprocess: bool
| 字段 | 类型 | 说明 |
|---|---|---|
| simple_task_instance | SimpleTaskInstance | 任务实例的简化表示 |
| msg | str | 回调消息(如错误信息) |
| processor_subprocess | bool | 是否在子进程中处理 |
Callback 注册方式
#### 方式一:任务级别
def task_failure_alert(context):
dag_id = context['dag_run'].dag_id
task_id = context['task'].task_id
send_alert(dag_id, task_id)
with DAG(...) as dag:
task = PythonOperator(
task_id="example_task",
python_callable=my_func,
on_failure_callback=task_failure_alert,
)
#### 方式二:DAG 级别
def dag_failure_alert(context):
dag_id = context['dag_run'].dag_id
send_dag_failure_notification(dag_id)
with DAG(
dag_id="my_dag",
on_failure_callback=dag_failure_alert,
):
# tasks...
#### 方式三:使用 Airflow API 动态注册
from airflow.models import TaskCallbackRequest
from airflow.callbacks.callback_requests import _prepare_callback
# 在执行器中动态注册
callback_request = _prepare_callback(
ti=task_instance,
callback_type="failure",
msg="Task failed with exception",
)
数据流转最佳实践
避免数据膨胀
XCom 数据存储在元数据库中,过大的数据量会影响数据库性能。建议遵循以下原则:
- 小数据原则:XCom 值应保持在 10KB 以下
- 引用传递:大型数据使用 URI/路径引用
- 定期清理:配置 XCom 过期策略
数据验证与类型安全
在任务间传递数据时,应实施严格的验证策略:
from pydantic import BaseModel, validator
class DataRecord(BaseModel):
id: int
value: float
tags: list[str]
@validator('value')
def value_must_be_positive(cls, v):
if v < 0:
raise ValueError('value must be positive')
return v
def process_data(**context):
raw_data = context['ti'].xcom_pull(task_ids='extract', key='data')
validated = DataRecord(**raw_data)
return validated.dict()
幂等性设计
任务应设计为幂等的,即相同输入多次执行产生相同结果。这在使用 XCom 进行数据传递时尤为重要,因为任务可能因重试而多次执行。
Asset 与 XCom 的选择
| 场景 | 推荐机制 | 原因 |
|---|---|---|
| 触发 DAG 执行 | Asset | 自动调度,感知数据源变化 |
| 任务间传递结果 | XCom | 轻量,适合小数据量 |
| 共享大型数据集 | 外部存储 + XCom 引用 | 避免元数据库压力 |
| 事件驱动工作流 | Asset + TriggerDagRunOperator | 结合使用 |
总结
Apache Airflow 的数据流转与交换机制通过 XCom、Asset 和 Callback 三大组件,为分布式工作流环境提供了完整的数据协调能力。XCom 专注于任务间的数据传递,Asset 实现了数据驱动的 DAG 触发,而 Callback 则在关键生命周期节点提供了扩展点。理解并合理运用这些机制,能够构建出高效、可靠且可维护的数据流水线。
资料来源:airflow-core/docs/core-concepts/xcoms.rst:1-50 资料来源:airflow-core/docs/authoring-and-scheduling/assets.rst:1-50
资料来源:[airflow-core/src/airflow/models/xcom.py:1-100]()
FastAPI核心API
Apache Airflow的FastAPI核心API是项目现代化Web服务架构的重要组成部分,旨在替代传统的Flask API。这一API层采用FastAPI框架构建,提供了高性能、异步支持、自动文档生成等特性。FastAPI核心API主要分为两个独立的应用程序:Core API(核心API)和Execution API(执行API),分别承担不同的职责。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
概述
Apache Airflow的FastAPI核心API是项目现代化Web服务架构的重要组成部分,旨在替代传统的Flask API。这一API层采用FastAPI框架构建,提供了高性能、异步支持、自动文档生成等特性。FastAPI核心API主要分为两个独立的应用程序:Core API(核心API)和Execution API(执行API),分别承担不同的职责。
FastAPI核心API位于 airflow-core/src/airflow/api_fastapi/ 目录下,采用模块化设计,将路由、数据模型、认证管理器等组件分离到独立的子模块中。这种架构设计使得代码组织清晰,便于维护和扩展。
架构概览
graph TB
subgraph "FastAPI应用层"
CA[Core API<br/>core_api/app.py]
EA[Execution API<br/>execution_api/app.py]
end
subgraph "认证层"
SAM[SimpleAuthManager<br/>simple_auth_manager.py]
AM[Auth Managers]
end
subgraph "路由层"
UI_DAGS[UI DAG Routes<br/>routes/ui/dags.py]
API_ROUTES[API Routes]
end
subgraph "数据模型层"
DM[DAG Run Datamodels<br/>datamodels/dag_run.py]
DT[其他Datamodels]
end
CA --> SAM
EA --> SAM
CA --> UI_DAGS
CA --> API_ROUTES
CA --> DM
EA --> DM核心组件
Core API应用
Core API是Airflow的主要REST API服务,负责提供DAG管理、任务调度、监控等核心功能。该应用通过 core_api/app.py 文件初始化和配置,包含以下关键特性:
- 异步请求处理支持
- OpenAPI自动文档生成
- Pydantic数据验证
- 依赖注入系统
- 中间件支持
Core API的路由结构按功能模块划分,包括UI路由、作业路由、配置路由等多个子模块。这种模块化设计使得API扩展和维护更加便捷。
Execution API应用
Execution API是专门用于任务执行的轻量级API服务,通过 execution_api/app.py 实现。该API主要负责:
- 任务实例的执行状态管理
- XCom消息传递
- 任务心跳检测
- 执行结果回调
Execution API采用独立部署模式,与Core API解耦,以提高系统的可靠性和响应速度。
数据模型
DAG Run数据模型
DAG Run数据模型定义了DAG运行实例的结构,是整个调度系统的核心数据结构之一。在 datamodels/dag_run.py 中定义的主要字段包括:
| 字段名 | 类型 | 说明 | 资料来源 |
|---|---|---|---|
| dag_id | str | DAG唯一标识符 | dag_run.py |
| run_id | str | 运行实例唯一标识 | dag_run.py |
| state | DagRunState | 运行状态枚举 | dag_run.py |
| execution_date | datetime | 执行时间戳 | dag_run.py |
| start_date | datetime | 开始时间 | dag_run.py |
| end_date | datetime | 结束时间 | dag_run.py |
| conf | dict | DAG配置参数 | dag_run.py |
数据验证
所有通过API接收的数据都会经过Pydantic模型验证,确保数据类型和格式的正确性。这种自动验证机制减少了手动编写数据校验代码的需求,提高了开发效率。
路由结构
UI DAG路由
UI DAG路由是面向Web界面的API端点集合,通过 routes/ui/dags.py 文件实现。这些路由主要用于:
- DAG列表查询
- DAG运行历史记录
- DAG状态监控
- 任务依赖关系可视化
UI路由通常需要用户认证,并返回适合前端渲染的数据格式。
路由组织方式
graph LR
subgraph "路由注册"
RC[Route Configuration<br/>cli_config.py]
end
subgraph "CLI命令"
CMD1[airflow dags]
CMD2[airflow tasks]
CMD3[airflow connections]
end
RC --> CMD1
RC --> CMD2
RC --> CMD3
CMD1 --> DAG_ROUTES[DAG Routes]
CMD2 --> TASK_ROUTES[Task Routes]
CMD3 --> CONN_ROUTES[Connection Routes]在 cli_config.py 中定义了Airflow CLI命令与API路由的映射关系,支持的命令包括:
| 命令组 | 子命令 | 功能说明 |
|---|---|---|
| dags | backfill, list-runs, pause, unpause, test | DAG生命周期管理 |
| tasks | list, run, state, clear | 任务操作管理 |
| connections | list, add, delete, edit | 连接配置管理 |
| providers | list, get | Provider信息查询 |
认证与授权
SimpleAuthManager
SimpleAuthManager是FastAPI认证系统的核心组件,位于 auth/managers/simple/simple_auth_manager.py。它提供了简化的认证机制,适用于单机部署或开发环境。
主要功能包括:
- 用户身份验证
- 会话管理
- 权限检查
- API密钥验证
认证管理器架构
classDiagram
class BaseAuthManager {
<<abstract>>
+authenticate()
+get_user()
+is_authorized()
}
class SimpleAuthManager {
+authenticate()
+get_user()
+is_authorized()
+get_user_role()
}
BaseAuthManager <|-- SimpleAuthManager认证流程
认证请求的标准处理流程如下:
- 客户端发送带认证信息的请求
- 中间件拦截请求并提取认证凭证
- SimpleAuthManager验证凭证有效性
- 根据用户角色确定访问权限
- 返回认证结果或拒绝访问
CLI集成
命令注册机制
在 cli_config.py 中使用懒加载模式注册CLI命令,这种方式可以提高应用启动速度:
ActionCommand(
name="info",
help="Show information about current Airflow and environment",
func=lazy_load_command("airflow.cli.commands.info_command.show_info"),
args=(ARG_ANONYMIZE, ARG_FILE_IO, ARG_VERBOSE, ARG_OUTPUT),
)
懒加载命令
懒加载机制确保只有在实际调用命令时才加载对应的模块,这有助于:
- 减少内存占用
- 加快应用启动时间
- 避免不必要的模块依赖
支持的懒加载命令类型包括:
info_command- 环境信息展示plugins_command- 插件信息导出standalone_command- 独立运行模式config_command- 配置管理version_command- 版本信息cheat_sheet_command- 命令速查表
配置管理
配置命令
FastAPI核心API提供了丰富的配置管理功能,包括:
| 命令 | 功能 | 说明 |
|---|---|---|
| config get-value | 获取配置值 | 打印指定配置项的值 |
| config list | 列出配置 | 显示所有配置选项 |
| config lint | 配置检查 | 验证配置迁移正确性 |
| config update | 更新配置 | 修改配置值 |
配置参数
配置API支持多种参数用于过滤和格式化输出:
--section- 指定配置章节--option- 指定配置选项--include-descriptions- 包含描述信息--include-examples- 包含示例值--include-sources- 显示配置来源--include-env-vars- 显示环境变量--hide-sensitive- 隐藏敏感信息
API响应格式
统一响应结构
FastAPI核心API采用统一的响应格式,便于客户端处理:
{
"data": {},
"meta": {
"status": "success",
"timestamp": "2024-01-01T00:00:00Z"
}
}
错误响应
错误响应遵循统一的格式,包含错误码、消息和详细信息:
{
"error": {
"code": "DAG_NOT_FOUND",
"message": "DAG with id 'example_dag' not found",
"details": {}
}
}
技术特性
异步支持
FastAPI核心API全面支持异步操作,包括:
- 异步数据库查询
- 异步HTTP请求
- 异步文件操作
- 并发任务处理
自动文档
基于OpenAPI标准,API自动生成交互式文档:
- Swagger UI 可通过
/docs访问 - ReDoc 可通过
/redoc访问 - OpenAPI JSON 可通过
/openapi.json获取
数据验证
使用Pydantic v2进行严格的数据验证和转换:
- 类型检查
- 默认值处理
- 自定义验证器
- 嵌套模型支持
部署架构
多实例部署
在生产环境中,FastAPI核心API支持多实例部署:
graph TB
LB[Load Balancer]
API1[Core API Instance 1]
API2[Core API Instance 2]
API3[Core API Instance 3]
DB[(Database)]
Redis[(Redis)]
LB --> API1
LB --> API2
LB --> API3
API1 --> DB
API2 --> DB
API3 --> DB
API1 --> Redis
API2 --> Redis
API3 --> Redis与传统Flask API对比
| 特性 | FastAPI | Flask |
|---|---|---|
| 性能 | 异步原生支持 | 同步为主 |
| 文档 | 自动生成OpenAPI | 需手动编写 |
| 验证 | Pydantic自动验证 | 手动验证 |
| 类型安全 | 完整类型注解 | 运行时检查 |
| 并发 | 原生异步支持 | 需扩展支持 |
安全考虑
敏感信息保护
API在处理敏感信息时采取以下保护措施:
- 敏感配置加密存储
- 日志输出脱敏处理
- 环境变量优先读取
- 连接凭证安全传输
权限控制
基于角色的访问控制(RBAC)确保API操作的安全性:
- 管理员权限 - 完全访问
- 编辑权限 - 修改配置
- 只读权限 - 查询数据
- 无权限 - 拒绝访问
扩展开发
自定义路由
开发者可以通过以下步骤添加自定义路由:
- 在相应模块下创建路由文件
- 定义路由函数和参数
- 注册路由到应用
- 添加CLI命令映射
自定义认证
扩展认证机制需要实现以下接口:
authenticate()- 用户认证方法get_user()- 获取用户信息is_authorized()- 权限检查方法
最佳实践
API设计原则
- RESTful规范 - 遵循REST设计原则
- 版本控制 - 保持API向后兼容
- 错误处理 - 返回有意义的错误信息
- 性能优化 - 使用异步操作提高吞吐量
- 文档维护 - 及时更新API文档
性能优化建议
- 使用连接池管理数据库连接
- 合理使用缓存减少数据库查询
- 实现请求限流防止滥用
- 监控API响应时间及时发现问题
相关文档
来源:https://github.com/apache/airflow / 项目说明书
React前端架构
Apache Airflow 的 React 前端是一个现代化的单页应用(SPA),采用 React 18、TypeScript 和 Chakra UI 构建,为用户提供了直观、高效的 DAG 管理和监控界面。该前端架构遵循组件化设计原则,通过 TanStack Table 实现数据表格功能,使用 react-router-dom 进行路由管理,并集成了 react-i18...
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
概述
Apache Airflow 的 React 前端是一个现代化的单页应用(SPA),采用 React 18、TypeScript 和 Chakra UI 构建,为用户提供了直观、高效的 DAG 管理和监控界面。该前端架构遵循组件化设计原则,通过 TanStack Table 实现数据表格功能,使用 react-router-dom 进行路由管理,并集成了 react-i18next 实现国际化支持。资料来源:dev/react-plugin-tools/react_plugin_template/README.md
技术栈概览
核心依赖
| 技术 | 用途 | 版本说明 |
|---|---|---|
| React 18 | UI 框架 | 核心库 |
| TypeScript | 类型系统 | 类型安全 |
| Chakra UI | UI 组件库 | 样式和组件 |
| TanStack Table | 数据表格 | 排序、分页 |
| react-router-dom | 路由管理 | SPA 导航 |
| react-i18next | 国际化 | 多语言支持 |
构建工具
前端项目使用 Vite 作为构建工具,提供快速的开发服务器和优化的生产构建。Vite 配置将 React 和相关生态库标记为外部依赖,以减少插件包体积。资料来源:dev/react-plugin-tools/react_plugin_template/README.md
页面组件架构
页面目录结构
Airflow 前端的页面组件位于 airflow-core/src/airflow/ui/src/pages/ 目录下,采用模块化组织方式:
pages/
├── DagRuns.tsx # DAG运行列表页
├── DagsList/ # DAG列表模块
│ └── DagsList.tsx
├── Providers.tsx # 提供商信息页
├── Run/ # 运行详情模块
│ └── Header.tsx
├── XCom/ # XCom数据模块
│ └── XCom.tsx
└── HITLTaskInstances/ # 人工干预任务实例模块
└── HITLTaskInstances.tsx
页面组件通用模式
每个页面组件通常遵循以下结构模式:
- 导入依赖:引入必要的 React hooks、组件和工具函数
- 类型定义:定义页面专用的 TypeScript 接口
- 列定义工厂函数:创建
createColumns函数用于生成 TanStack Table 列配置 - 主组件导出:使用自定义 hooks 获取数据并渲染页面
const createColumns = (translate: TFunction): Array<ColumnDef<ProviderResponse>> => [
{
accessorKey: "package_name",
cell: ({ row: { original } }) => (
<Link ...>
{original.package_name}
</Link>
),
enableSorting: false,
header: translate("providers.columns.packageName"),
},
// ... 更多列定义
];
资料来源:airflow-core/src/airflow/ui/src/pages/Providers.tsx
核心组件体系
UI 基础组件
基础 UI 组件位于 airflow-core/src/airflow/ui/src/components/ui/ 目录下,提供可复用的界面元素。
#### Tag 组件
Tag 组件是对 Chakra UI Tag 的封装,提供了统一的标签样式:
const Tag = React.forwardRef<HTMLSpanElement, TagProps>(({
closable = Boolean(onClose),
endElement,
startElement,
...rest
}) => {
return (
<ChakraTag.Root ref={ref} {...rest}>
{Boolean(startElement) ? <ChakraTag.StartElement>{startElement}</ChakraTag.StartElement> : undefined}
<ChakraTag.Label>{children}</ChakraTag.Label>
{Boolean(endElement) ? <ChakraTag.EndElement>{endElement}</ChakraTag.EndElement> : undefined}
{Boolean(closable) ? (
<ChakraTag.EndElement>
<ChakraTag.CloseTrigger onClick={onClose} />
</ChakraTag.EndElement>
) : undefined}
</ChakraTag.Root>
);
});
| 属性 | 类型 | 说明 |
|---|---|---|
| closable | boolean | 是否显示关闭按钮 |
| endElement | ReactNode | 标签尾部元素 |
| startElement | ReactNode | 标签头部元素 |
| onClose | () => void | 关闭回调函数 |
资料来源:airflow-core/src/airflow/ui/src/components/ui/Tag.tsx
数据展示组件
#### DAG 触发运行组件 (TriggeredRuns)
TriggeredRuns 组件用于展示由 DAG 触发的运行信息,支持单个和多个运行场景:
export const TriggeredRuns = ({ dagRuns }: Props) => {
if (dagRuns === undefined || dagRuns.length === 0) {
return undefined;
}
return dagRuns.length === 1 ? (
<Flex gap={1}>
<Text>{`${translate("triggered")} ${translate("dagRun_one")}: `}</Text>
<StateBadge state={dagRuns[0]?.state as DagRunState} />
<Link asChild color="fg.info">
<RouterLink to={`/dags/${dagRuns[0]?.dag_id}/runs/${dagRuns[0]?.run_id}`}>
{dagRuns[0]?.dag_id}
</RouterLink>
</Link>
</Flex>
) : (
<Popover.Root autoFocus={false} lazyMount unmountOnExit>
<Popover.Trigger asChild>
<Button size="sm" variant="outline">
{`${dagRuns.length} ${translate("triggered")} ...`}
</Button>
</Popover.Trigger>
<Popover.Content ...>
{/* 多个运行时显示下拉列表 */}
</Popover.Content>
</Popover.Root>
);
};
| 场景 | 渲染方式 |
|---|---|
| 单个运行 | 内联展示 DAG ID 和状态徽章 |
| 多个运行 | 下拉菜单,点击展开列表 |
资料来源:airflow-core/src/airflow/ui/src/components/Assets/TriggeredRuns.tsx
任务实例工具提示组件
TaskInstanceTooltip 组件提供任务实例的详细信息悬浮提示:
<TaskInstanceTooltip
state={taskInstance.state}
taskId={taskInstance.task_id}
runId={runId}
queuedWhen={taskInstance.queued_when}
scheduledWhen={taskInstance.scheduled_when}
startDate={taskInstance.start_date}
endDate={taskInstance.end_date}
tryNumber={taskInstance.try_number}
/>
显示的信息包括:任务 ID、状态、运行 ID、调度时间、队列时间、开始时间、结束时间以及重试次数。资料来源:airflow-core/src/airflow/ui/src/components/TaskInstanceTooltip.tsx
对话框组件
#### 清除任务实例确认对话框
ClearTaskInstanceConfirmationDialog 是用于确认清除任务实例操作的模态对话框:
<Dialog.Root open={isOpen} onOpenChange={onClose}>
<Dialog.Content>
<Dialog.Header>
<Dialog.Title>
<Icon color="tomato" pr="2" size="lg">
<GoAlertFill />
</Icon>
{translate("dags:runAndTaskActions.confirmationDialog.title")}
</Dialog.Title>
<Dialog.Description>
{translate("dags:runAndTaskActions.confirmationDialog.description", {
state: taskCurrentState,
time: getRelativeTime(firstInstance.start_date),
user: firstInstance?.unixname ?? "unknown user",
})}
</Dialog.Description>
</Dialog.Header>
<Dialog.Footer>
<Button colorPalette="blue" onClick={onClose}>
{translate("common:modal.confirm")}
</Button>
</Dialog.Footer>
</Dialog.Content>
</Dialog.Root>
对话框使用 Radix UI 的 Dialog 组件,提供标题、描述和底部操作区域,支持国际化翻译。资料来源:airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceConfirmationDialog.tsx
#### 批量清除任务实例对话框
ClearGroupTaskInstanceDialog 用于批量清除一组任务实例,支持以下参数:
| 参数 | 类型 | 说明 | |
|---|---|---|---|
| future | boolean | 是否包含未来运行 | |
| past | boolean | 是否包含过去运行 | |
| upstream | boolean | 是否包含上游任务 | |
| onlyFailed | boolean | 仅清除失败任务 | |
| runOnLatestVersion | boolean | 使用最新 DAG 版本运行 | |
| note | string \ | null | 清除操作备注 |
| groupTaskIds | string[] | 任务 ID 列表 |
资料来源:airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
模态框组件
#### DAG 导入错误模态框
DAGImportErrorsModal 展示 DAG 导入过程中的错误信息:
- 使用 Accordion 组件折叠/展开每个错误详情
- 使用 Pagination 组件分页显示错误列表
- 显示错误时间戳和完整的堆栈跟踪信息
<Accordion.Root ...>
{importErrors.map((importError) => (
<Accordion.Item key={importError.timestamp} value={importError.timestamp}>
<Accordion.ItemTrigger>
<Text>{importError.filename}</Text>
</Accordion.ItemTrigger>
<Accordion.ItemContent>
<Text color="fg.error" fontSize="sm" whiteSpace="pre-wrap">
<code>{importError.stack_trace}</code>
</Text>
</Accordion.ItemContent>
</Accordion.Item>
))}
</Accordion.Root>
资料来源:airflow-core/src/airflow/ui/src/components/Dashboard/Stats/DAGImportErrorsModal.tsx
表格列配置系统
TanStack Table 集成
Airflow 前端使用 TanStack Table(React Table)作为数据表格解决方案,通过列定义工厂函数生成可配置的表格结构。
DAG 运行列表列配置
DagRuns.tsx 中的列配置示例:
| 访问器键 | 组件 | 功能 |
|---|---|---|
| run_after | Time | 显示运行时间 |
| state | StateBadge | 任务状态徽章 |
| run_type | RunTypeIcon + Text | 运行类型图标和文字 |
| triggering_user_name | Text | 触发用户名称 |
| start_date | Time | 开始时间 |
| end_date | Time | 结束时间 |
| duration | renderDuration | 持续时间渲染 |
{
accessorKey: "state",
cell: ({ row: { original: { state } } }) => (
<StateBadge state={state}>
{translate(`common:states.${state}`)}
</StateBadge>
),
header: () => translate("state"),
},
资料来源:airflow-core/src/airflow/ui/src/pages/DagRuns.tsx
DAG 列表特殊列
DagsList.tsx 包含一些独特的列定义:
{
accessorKey: "pending_actions",
cell: ({ row: { original: dag } }) => (
<NeedsReviewBadge dagId={dag.dag_id} pendingActions={dag.pending_actions} />
),
enableSorting: false,
header: "",
},
{
accessorKey: "trigger",
cell: ({ row: { original } }) => (
<TriggerDAGButton
allowedRunTypes={original.allowed_run_types}
dagDisplayName={original.dag_display_name}
dagId={original.dag_id}
isPaused={original.is_paused}
/>
),
enableSorting: false,
header: "",
},
这些列用于显示待处理操作和 DAG 触发按钮,没有排序功能。资料来源:airflow-core/src/airflow/ui/src/pages/DagsList/DagsList.tsx
国际化实现
翻译函数使用
前端使用 react-i18next 的 useTranslation hook 获取翻译函数:
const { t: translate } = useTranslation("common");
// 在列配置中使用
header: () => translate("dagRun.runAfter")
// 动态键翻译
{translate(`common:states.${state}`)}
命名空间管理
翻译键使用冒号分隔命名空间,例如 common:states.running、dagRun.runAfter 等,允许多个翻译文件独立管理。资料来源:airflow-core/src/airflow/ui/src/pages/DagRuns.tsx
路由与导航
路由模式
Airflow 前端使用 react-router-dom 进行客户端路由,主要导航模式包括:
// 链接到 DAG 详情
<RouterLink to={`/dags/${dagId}/runs/${runId}`}>
// 链接到任务实例
<RouterLink to={getTaskInstanceLink({
dagId: original.dag_id,
dagRunId: original.run_id,
mapIndex: original.map_index,
taskId: original.task_id,
})}>
嵌套路由结构
页面通过嵌套路由组织,支持从 DAG 列表 → DAG 详情 → 任务实例的层级导航。
React 插件系统
插件模板架构
Airflow 提供 React 插件模板,支持第三方开发者构建可集成到主应用的组件库:
import { PluginComponent } from 'your-plugin-name';
<PluginComponent />
插件开发指南
| 步骤 | 说明 |
|---|---|
| 组件开发 | 使用 React + TypeScript 编写功能组件 |
| 主题适配 | 自动继承 Airflow 主应用主题 |
| 构建配置 | 使用 Vite 构建为库文件 |
| 外部依赖 | React 生态库标记为外部以避免冲突 |
资料来源:dev/react-plugin-tools/react_plugin_template/README.md
数据流架构
graph TD
A[用户交互] --> B[React 组件]
B --> C[API 请求]
C --> D[Airflow REST API]
D --> E[后端数据库]
E --> D
D --> C
C --> F[状态更新]
F --> B
B --> G[UI 渲染]
H[TanStack Table] --> I[列定义配置]
I --> B
B --> J[数据展示]
K[useTranslation] --> L[翻译键]
L --> M[翻译文件]
M --> K组件生命周期管理
状态管理
组件使用 React hooks 管理状态:
useState:本地组件状态useEffect:副作用处理useTranslation:国际化useParams:URL 参数获取useTableURLState:表格 URL 状态同步
条件渲染模式
// 使用短路求值
{Boolean(dagRuns.length) && <Component />}
// 使用三元运算符
{condition ? <TrueComponent /> : <FalseComponent />}
// 使用 && 运算符
{hasCondition && <Component />}
样式系统
Chakra UI 集成
前端使用 Chakra UI 作为主要样式解决方案,提供:
- 预设的设计令牌(颜色、间距、字体)
- 可访问性内置支持
- 主题定制能力
- 组件组合模式
颜色语义
| 语义颜色 | 用途 | 示例 |
|---|---|---|
| fg.info | 信息文本 | 链接 |
| fg.error | 错误文本 | 错误消息 |
| bg.emphasized | 强调背景 | 弹窗背景 |
最佳实践
组件开发规范
- 类型安全:所有组件使用 TypeScript,明确输入输出类型
- 可访问性:使用语义化 HTML 和 ARIA 属性
- 国际化:所有用户可见文本使用翻译函数
- 错误处理:组件包含错误边界和降级处理
- 性能优化:使用
lazyMount和unmountOnExit延迟加载
文件组织原则
- 页面组件:位于
pages/目录,包含业务逻辑 - 通用组件:位于
components/目录,可复用 - UI 组件:位于
components/ui/目录,基础元素 - 布局组件:位于
layouts/目录,页面结构
总结
Apache Airflow 的 React 前端采用了现代化的组件化架构,通过 TanStack Table 实现灵活的数据表格功能,借助 Chakra UI 提供一致的用户界面体验,使用 react-i18next 支持多语言环境。插件系统的设计允许第三方开发者扩展功能,同时保持与主应用的主题一致性。整个前端架构体现了可维护性、可扩展性和用户体验的平衡。
资料来源:[airflow-core/src/airflow/ui/src/pages/Providers.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/pages/Providers.tsx)
容器化与Kubernetes部署
Apache Airflow 作为分布式工作流编排平台,支持通过容器化和 Kubernetes 实现灵活的部署方案。本文详细介绍 Airflow 的 Docker 镜像构建机制、Kubernetes 部署架构以及生产环境最佳实践。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
1. Docker 容器化概述
Apache Airflow 提供官方的 apache/airflow Docker 镜像,该镜像是构建生产级部署环境的基础。镜像采用多阶段构建技术,基于 Python slim 镜像优化体积。
1.1 Docker 构建配置
Airflow 的 Dockerfile 使用 Docker BuildKit 语法,支持通过 --mount=type=cache 挂载缓存目录以加速构建过程:
# syntax=docker/dockerfile:1.4
FROM python:{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}-slim-{ALLOWED_DEBIAN_VERSIONS[0]}
RUN apt-get update && apt-get install -y --no-install-recommends libatomic1 git curl
RUN pip install uv=={UV_VERSION}
RUN --mount=type=cache,id=cache-airflow-build-dockerfile-installation,target=/root/.cache/ \
uv pip install --system ignore pip=={AIRFLOW_PIP_VERSION} hatch=={HATCH_VERSION} \
pyyaml=={PYYAML_VERSION} gitpython=={GITPYTHON_VERSION} rich==
资料来源:dev/breeze/src/airflow_breeze/commands/release_management_commands.py:52-58
1.2 入口点与初始化
Docker 镜像的入口点脚本负责初始化 SQLite 数据库(当未配置外部数据库时):
如果未设置AIRFLOW__DATABASE__SQL_ALCHEMY_CONN变量,则在${AIRFLOW_HOME}/airflow.db创建 SQLite 数据库。
资料来源:docker-stack-docs/README.md:15-16
2. Kubernetes 部署架构
Airflow 在 Kubernetes 上的部署通过 Helm Chart 实现,支持高度可配置的分布式架构。
2.1 支持的 Kubernetes 版本
Apache Airflow 根据三大云服务商的 Kubernetes 支持周期维护兼容版本列表:
| 云服务商 | 生命周期页面 |
|---|---|
| Amazon EKS | endoflife.date/amazon-eks |
| Azure Kubernetes Service | endoflife.date/azure-kubernetes-service |
| Google Kubernetes Engine | endoflife.date/google-kubernetes-engine |
当前允许的 Kubernetes 版本列表:
| 版本 | 状态 |
|---|---|
| v1.30.13 | ✅ 支持 |
| v1.31.12 | ✅ 支持 |
| v1.32.8 | ✅ 支持 |
| v1.33.4 | ✅ 支持 |
| v1.34.0 | ✅ 支持 |
资料来源:dev/breeze/src/airflow_breeze/global_constants.py:ALLOWED_KUBERNETES_VERSIONS
2.2 Kubernetes 组件架构
Airflow 在 Kubernetes 环境中的核心组件包括:
graph TD
subgraph "Kubernetes 集群"
subgraph "airflow Namespace"
Scheduler["Scheduler Pod<br/>调度器"]
Webserver["Webserver Pod<br/>Web服务器"]
Triggerer["Triggerer Pod<br/>触发器"]
DagProcessor["Dag Processor Pod<br/>DAG处理器"]
end
subgraph "Pod 配置"
Executor["Executor<br/>执行器"]
Worker["Worker Pod<br/>工作节点"]
K8SExecutor["K8S Executor<br/>Kubernetes执行器"]
end
end
ExternalDB["外部数据库<br/>PostgreSQL/MySQL"]
Redis["消息队列<br/>Redis"]
Scheduler --> Executor
Executor --> K8SExecutor
K8SExecutor --> Worker
Scheduler --> ExternalDB
Webserver --> ExternalDB
Triggerer --> Redis2.3 多命名空间模式
Airflow 支持多命名空间部署模式,允许在不同命名空间中运行测试环境和生产环境:
option_multi_namespace_mode = click.option(
"--multi-namespace-mode",
help="使用多命名空间模式",
is_flag=True,
envvar="MULTI_NAMESPACE_MODE",
)
资料来源:dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py:multi_namespace_mode
3. Kubernetes 部署命令
3.1 部署工作流程
graph LR
A["breeze k8s deploy-airflow"] --> B["创建 KinD 集群"]
B --> C["创建 Kubernetes 命名空间"]
C --> D["部署 Helm Chart"]
D --> E["配置 Airflow 组件"]
E --> F["验证部署状态"]3.2 核心部署选项
| 参数 | 说明 | 默认值 |
|---|---|---|
--python | Python 版本 | 3.10 |
--kubernetes-version | Kubernetes 集群版本 | v1.31.12 |
--executor | 执行器类型 | LocalExecutor |
--deploy/--no-deploy | 是否通过 Skaffold 部署 | False |
--upgrade | 升级而非安装 Helm Chart | False |
--multi-namespace-mode | 启用多命名空间模式 | False |
--rebuild-base-image | 重建基础镜像 | False |
资料来源:dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py:option_executor
3.3 部署流程代码
部署 Airflow 到 Kubernetes 集群的核心逻辑:
def _deploy_airflow(
python: str,
kubernetes_version: str,
output: Output | None,
executor: str,
upgrade: bool,
use_standard_naming: bool,
wait_time_in_seconds: int,
extra_options: str,
multi_namespace_mode: bool,
):
cluster_name = get_kubectl_cluster_name(
python=python, kubernetes_version=kubernetes_version
)
# 创建命名空间
get_console(output=output).print(f"[info]创建集群 {cluster_name} 的命名空间")
run_command_with_k8s_env(
["kubectl", "create", "namespace", HELM_AIRFLOW_NAMESPACE],
python=python,
kubernetes_version=kubernetes_version,
output=output,
check=False,
)
资料来源:dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py:_deploy_airflow
4. Kubernetes 执行器配置
4.1 Pod 覆盖机制
Kubernetes 执行器允许通过 pod_override 动态配置 Pod 规格:
start_task_executor_config = {
"pod_override": k8s.V1Pod(
metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"})
)
}
@task(executor_config=start_task_executor_config)
def start_task():
print_stuff()
资料来源:airflow-core/src/airflow/example_dags/example_kubernetes_executor.py:20-27
4.2 Volume 挂载配置
支持在 Task 执行时挂载持久化卷:
executor_config_volume_mount = {
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
volume_mounts=[
k8s.V1VolumeMount(
mount_path="/foo/",
name="example-kubernetes-test-volume"
)
],
)
],
volumes=[
k8s.V1Volume(
name="example-kubernetes-test-volume",
host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
)
],
)
),
}
资料来源:airflow-core/src/airflow/example_dags/example_kubernetes_executor.py:31-56
4.3 执行器配置参数表
| 配置项 | 类型 | 说明 |
|---|---|---|
pod_override | V1Pod | Pod 覆盖规格 |
annotations | Dict[str, str] | Pod 注解 |
labels | Dict[str, str] | Pod 标签 |
node_selector | Dict[str, str] | 节点选择器 |
tolerations | List[V1Toleration] | 容忍度配置 |
affinity | V1Affinity | 亲和性配置 |
resources | V1ResourceRequirements | 资源限制 |
5. Docker Compose 开发环境
对于本地开发,Airflow 提供 Docker Compose 配置:
services:
airflow:
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:latest}
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: sqlite:////opt/airflow/airflow.db
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS__FOLDER: ${AIRFLOW_PROJ_DIR:-.}/dags
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
资料来源:airflow-core/docs/howto/docker-compose/docker-compose.yaml
5.1 Docker Compose 配置参数
| 环境变量 | 说明 | 默认值 |
|---|---|---|
AIRFLOW__CORE__EXECUTOR | 执行器类型 | LocalExecutor |
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN | 数据库连接字符串 | sqlite |
AIRFLOW__CORE__FERNET_KEY | 加密密钥 | - |
AIRFLOW__CORE__DAGS__FOLDER | DAG 文件目录 | ./dags |
AIRFLOW__CORE__LOAD_EXAMPLES | 加载示例 DAG | true |
AIRFLOW__CORE__AIRFLOW_HOME | Airflow 主目录 | /opt/airflow |
6. 生产环境最佳实践
6.1 部署检查清单
| 检查项 | 说明 |
|---|---|
| 外部数据库配置 | 使用 PostgreSQL 或 MySQL 而非 SQLite |
| 执行器选择 | 生产环境推荐 CeleryExecutor 或 KubernetesExecutor |
| 高可用配置 | 部署多个 Scheduler 和 Webserver 实例 |
| 资源限制 | 为各组件设置 CPU 和内存限制 |
| 健康检查 | 配置 readiness 和 liveness 探针 |
| 持久化存储 | 使用 PVC 持久化 DAG 和日志 |
6.2 安全配置
生产环境部署时应考虑以下安全措施:
- 配置 Fernet 密钥:启用敏感数据加密
- 使用 Secrets:通过 Kubernetes Secrets 管理凭据
- 网络策略:配置 Pod 间网络隔离
- RBAC:启用基于角色的访问控制
- 审计日志:启用 Airflow 审计日志功能
6.3 监控与告警
建议集成的监控组件:
- Metrics: Prometheus + Grafana
- Logging: ELK Stack 或 Loki
- Tracing: Jaeger
- Alerting: Alertmanager
7. 快速部署命令
7.1 使用 Breeze 部署
# 部署到 Kind 集群
breeze k8s deploy-airflow --python 3.10 --kubernetes-version v1.31.12
# 升级现有部署
breeze k8s deploy-airflow --upgrade
# 启用多命名空间模式
breeze k8s deploy-airflow --multi-namespace-mode
# 进入集群 Shell
breeze k8s shell
# 运行测试
breeze k8s tests
7.2 Skaffold 开发循环
对于开发场景,支持热重载功能:
breeze k8s dev --skaffold-args "--auto-sync"
该命令会:
- 同步 DAG 文件到运行中的 Pod
- 热重载 airflow-core 源码(Scheduler/Triggerer/DAG Processor/API Server)
- 自动刷新 UI(暂不支持)
资料来源:dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py:kubernetes_group
8. 总结
Apache Airflow 的容器化和 Kubernetes 部署方案提供了企业级的灵活性。通过官方 Docker 镜像、Helm Chart 以及 Breeze 工具链,开发者和运维人员可以:
- 在本地快速搭建开发环境
- 在 Kubernetes 上实现弹性扩展
- 通过 Kubernetes 执行器获得细粒度的资源控制
- 利用多命名空间支持隔离测试和生产环境
建议在生产部署前仔细评估业务需求,选择合适的执行器类型,并遵循本文档的最佳实践确保系统稳定性和安全性。
资料来源:[dev/breeze/src/airflow_breeze/commands/release_management_commands.py:52-58]()
Provider生态系统
Apache Airflow的Provider生态系统是Airflow的核心扩展机制,允许用户通过安装额外的Provider包来集成各种外部服务和平台。每个Provider提供特定的Hook、Operator、Trigger、Sensor和连接类型,使用户能够在DAG中与外部系统进行交互。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
概述
Apache Airflow的Provider生态系统是Airflow的核心扩展机制,允许用户通过安装额外的Provider包来集成各种外部服务和平台。每个Provider提供特定的Hook、Operator、Trigger、Sensor和连接类型,使用户能够在DAG中与外部系统进行交互。
Provider作为独立的Python包发布到PyPI,可以通过pip install apache-airflow-providers-<provider_name>进行安装。这种模块化设计使得Airflow核心保持精简,同时用户可以根据实际需求选择性地添加所需的集成功能。
架构设计
Provider核心组件
每个Provider包通常包含以下核心组件:
| 组件类型 | 描述 | 用途 |
|---|---|---|
| Hook | 数据连接抽象类 | 提供与外部系统交互的基础接口 |
| Operator | 任务执行单元 | 定义具体的业务操作逻辑 |
| Sensor | 外部依赖检查器 | 轮询外部条件直到满足为止 |
| Trigger | 异步事件触发器 | 支持triggerer组件异步执行 |
| Connection | 连接配置 | 定义与外部系统的连接参数 |
层级架构
graph TD
A[Airflow Core] --> B[Providers Manager]
B --> C[Provider Package]
C --> D[Hooks]
C --> E[Operators]
C --> F[Sensors]
C --> G[Triggers]
D --> H[External Services]
E --> H
F --> H
G --> H
subgraph "Provider Package Structure"
I[get_provider_info.py]
J[__init__.py]
K[hooks/*.py]
L[operators/*.py]
M[sensors/*.py]
endHook实现机制
Hook是Provider与外部系统交互的基础桥梁。不同Provider的Hook都遵循统一的基类设计规范。
AWS Hook实现示例
在Amazon Provider中,Hook通过boto3与AWS服务进行交互:
class Ec2Hook(AwsBaseHook):
API_TYPES = frozenset({"resource_type", "client_type"})
def __init__(self, api_type="resource_type", *args, **kwargs) -> None:
if api_type not in self.API_TYPES:
raise AirflowException("api_type can only be one of %s", self.API_TYPES)
kwargs[api_type] = "ec2"
self._api_type = api_type
super().__init__(*args, **kwargs)
资料来源:providers/amazon/src/airflow/providers/amazon/aws/hooks/ec2.py:24-36
连接类型注册
Provider通过get_provider_info.py文件声明其提供的连接类型:
{
"integration-name": "AWS EC2",
"external-doc-url": "https://aws.amazon.com/ec2/",
"logo": "/docs/integration-logos/[email protected]",
"tags": ["aws"],
}
资料来源:providers/amazon/src/airflow/providers/amazon/get_provider_info.py:45-52
Provider生命周期管理
文档生成流程
Provider的文档通过Breeze工具自动生成,包括README、conf.py等文件:
def _generate_docs_conf(context: dict[str, Any], provider_details: ProviderPackageDetails):
docs_conf_content = render_template(
template_name="conf",
context=context,
extension=".py",
keep_trailing_newline=True,
)
docs_conf_path = provider_details.root_provider_path / "docs" / "conf.py"
docs_conf_path.write_text(docs_conf_content)
资料来源:dev/breeze/src/airflow_breeze/prepare_providers/provider_documentation.py:89-97
版本与依赖管理
Provider的最小Airflow版本依赖通过解析dependencies确定:
def get_min_airflow_version(provider_id: str) -> str:
provider_details = get_provider_details(provider_id=provider_id)
min_airflow_version = MIN_AIRFLOW_VERSION
for dependency in provider_details.dependencies:
if dependency.startswith("apache-airflow>="):
current_min_airflow_version = dependency.split(">=")[1]
if "," in current_min_airflow_version:
current_min_airflow_version = current_min_airflow_version.split(",")[0]
if PackagingVersion(current_min_airflow_version) > PackagingVersion(MIN_AIRFLOW_VERSION):
min_airflow_version = current_min_airflow_version
return min_airflow_version
资料来源:dev/breeze/src/airflow_breeze/utils/packages.py:78-91
CLI命令集成
Provider功能通过Airflow CLI暴露,允许用户在命令行执行各种管理操作:
GroupCommand(
name="providers",
help="Display providers",
subcommands=PROVIDERS_COMMANDS,
),
资料来源:airflow-core/src/airflow/cli/cli_config.py:89-92
Provider相关命令
| 命令 | 功能 |
|---|---|
airflow providers list | 列出所有已安装的Provider |
airflow providers links | 显示Provider提供的链接 |
airflow providers secrets | 获取secrets后端信息 |
airflow providers executors | 获取Executor信息 |
airflow providers auth-managers | 获取认证管理器信息 |
默认连接配置
Airflow在初始化数据库时会创建一系列默认连接,这些连接覆盖了常用的外部系统:
| 连接ID | 连接类型 | 默认配置 |
|---|---|---|
| facebook_social | facebook_social | Facebook社交登录凭证 |
| fs_default | fs | 本地文件系统 |
| ftp_default | ftp | localhost:21 |
| google_cloud_default | google_cloud_platform | GCP默认项目 |
| http_default | http | httpbin.org |
| iceberg_default | iceberg | Iceberg REST服务 |
资料来源:airflow-core/src/airflow/utils/db.py:180-220
事件驱动架构
MessageQueueTrigger集成
Provider支持通过MessageQueueTrigger实现事件驱动的任务执行:
class GooglePubSubEvent(Protocol):
scheme = "google+pubsub"
def trigger_class(self) -> type[BaseEventTrigger]:
return PubsubPullTrigger
资料来源:providers/google/src/airflow/providers/google/event_scheduling/events/pubsub.py:52-57
异步Trigger架构
sequenceDiagram
DAG Task ->> Trigger: 创建Trigger实例
Trigger ->> MessageQueue: 订阅消息队列
MessageQueue -->> Trigger: 事件到达
Trigger ->> Triggerer: 触发条件满足
Triggerer ->> DAG Task: 任务执行完成Provider发现机制
自动注册流程
当Airflow启动时,Providers Manager自动扫描并加载所有已安装的Provider:
graph LR
A[启动Airflow] --> B[扫描Provider包]
B --> C[加载Hook]
B --> D[加载Operator]
B --> E[加载Sensor]
B --> F[注册Connection类型]
C --> G[注册到UI]
D --> G
E --> G
F --> G主流Provider一览
Amazon Provider (apache-airflow-providers-amazon)
提供与AWS服务的深度集成,包括:
- 计算服务:EC2、Lambda、ECS
- 数据服务:Glue、S3、Redshift、Athena
- 编排服务:Step Functions、CloudFormation
- 机器学习:SageMaker、Bedrock
资料来源:providers/amazon/src/airflow/providers/amazon/aws/hooks/cloud_formation.py:18-25
Google Provider (apache-airflow-providers-google)
集成Google Cloud Platform服务,包括BigQuery、GCS、Cloud Run等。
Apache Kafka Provider
提供Kafka消息队列的Hook和Operator支持,用于流数据处理场景。
总结
Provider生态系统是Apache Airflow实现高度可扩展性的核心机制。通过标准化的组件接口、统一的生命周期管理和灵活的CLI集成,Provider使得Airflow能够优雅地连接各种外部系统。用户可以根据工作需求选择性地安装Provider,实现与云服务商、数据库、消息队列等数十种外部平台的深度集成。
资料来源:[providers/amazon/src/airflow/providers/amazon/aws/hooks/ec2.py:24-36]()
失败模式与踩坑日记
保留 Doramagic 在发现、验证和编译中沉淀的项目专属风险,不把社区讨论只当作装饰信息。
可能增加新用户试用和生产接入成本。
假设不成立时,用户拿不到承诺的能力。
新项目、停更项目和活跃项目会被混在一起,推荐信任度下降。
下游已经要求复核,不能在页面中弱化。
Pitfall Log / 踩坑日志
项目:apache/airflow
摘要:发现 19 个潜在踩坑项,其中 1 个为 high/blocking;最高优先级:安装坑 - 来源证据:ExternalTaskSensor can succeed early for task groups with NULL task states。
1. 安装坑 · 来源证据:`ExternalTaskSensor` can succeed early for task groups with NULL task states
- 严重度:high
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:
ExternalTaskSensorcan succeed early for task groups with NULL task states - 对用户的影响:可能增加新用户试用和生产接入成本。
- 建议检查:来源问题仍为 open,Pack Agent 需要复核是否仍影响当前版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_3c746f7ce44f43f1a5a81840f4ee741a | https://github.com/apache/airflow/issues/66877 | 来源讨论提到 python 相关条件,需在安装/试用前复核。
2. 能力坑 · 能力判断依赖假设
- 严重度:medium
- 证据强度:source_linked
- 发现:README/documentation is current enough for a first validation pass.
- 对用户的影响:假设不成立时,用户拿不到承诺的能力。
- 建议检查:将假设转成下游验证清单。
- 防护动作:假设必须转成验证项;没有验证结果前不能写成事实。
- 证据:capability.assumptions | github_repo:33884891 | https://github.com/apache/airflow | README/documentation is current enough for a first validation pass.
3. 维护坑 · 维护活跃度未知
- 严重度:medium
- 证据强度:source_linked
- 发现:未记录 last_activity_observed。
- 对用户的影响:新项目、停更项目和活跃项目会被混在一起,推荐信任度下降。
- 建议检查:补 GitHub 最近 commit、release、issue/PR 响应信号。
- 防护动作:维护活跃度未知时,推荐强度不能标为高信任。
- 证据:evidence.maintainer_signals | github_repo:33884891 | https://github.com/apache/airflow | last_activity_observed missing
4. 安全/权限坑 · 下游验证发现风险项
- 严重度:medium
- 证据强度:source_linked
- 发现:no_demo
- 对用户的影响:下游已经要求复核,不能在页面中弱化。
- 建议检查:进入安全/权限治理复核队列。
- 防护动作:下游风险存在时必须保持 review/recommendation 降级。
- 证据:downstream_validation.risk_items | github_repo:33884891 | https://github.com/apache/airflow | no_demo; severity=medium
5. 安全/权限坑 · 存在安全注意事项
- 严重度:medium
- 证据强度:source_linked
- 发现:No sandbox install has been executed yet; downstream must verify before user use.
- 对用户的影响:用户安装前需要知道权限边界和敏感操作。
- 建议检查:转成明确权限清单和安全审查提示。
- 防护动作:安全注意事项必须面向用户前置展示。
- 证据:risks.safety_notes | github_repo:33884891 | https://github.com/apache/airflow | No sandbox install has been executed yet; downstream must verify before user use.
6. 安全/权限坑 · 存在评分风险
- 严重度:medium
- 证据强度:source_linked
- 发现:no_demo
- 对用户的影响:风险会影响是否适合普通用户安装。
- 建议检查:把风险写入边界卡,并确认是否需要人工复核。
- 防护动作:评分风险必须进入边界卡,不能只作为内部分数。
- 证据:risks.scoring_risks | github_repo:33884891 | https://github.com/apache/airflow | no_demo; severity=medium
7. 安全/权限坑 · 来源证据:Apache Airflow 3.1.6
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Apache Airflow 3.1.6
- 对用户的影响:可能阻塞安装或首次运行。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_c94c5b79bb91454c9e0ad22b4a36dc11 | https://github.com/apache/airflow/releases/tag/3.1.6 | 来源讨论提到 docker 相关条件,需在安装/试用前复核。
8. 安全/权限坑 · 来源证据:Apache Airflow 3.1.7
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Apache Airflow 3.1.7
- 对用户的影响:可能阻塞安装或首次运行。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_80a5614167b44ecca96422caa56afca4 | https://github.com/apache/airflow/releases/tag/3.1.7 | 来源讨论提到 docker 相关条件,需在安装/试用前复核。
9. 安全/权限坑 · 来源证据:Apache Airflow 3.1.8
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Apache Airflow 3.1.8
- 对用户的影响:可能阻塞安装或首次运行。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_3437fcfc89ff40a0ba17b7e5a5d8aa2c | https://github.com/apache/airflow/releases/tag/3.1.8 | 来源讨论提到 python 相关条件,需在安装/试用前复核。
10. 安全/权限坑 · 来源证据:Apache Airflow 3.2.0
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Apache Airflow 3.2.0
- 对用户的影响:可能阻塞安装或首次运行。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_d18a498a98ed48f0bb3f813aaa554aea | https://github.com/apache/airflow/releases/tag/3.2.0 | 来源讨论提到 docker 相关条件,需在安装/试用前复核。
11. 安全/权限坑 · 来源证据:Apache Airflow 3.2.1
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Apache Airflow 3.2.1
- 对用户的影响:可能影响升级、迁移或版本选择。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_41889eab2c0240bbb0bd010262d41346 | https://github.com/apache/airflow/releases/tag/3.2.1 | 来源讨论提到 python 相关条件,需在安装/试用前复核。
12. 安全/权限坑 · 来源证据:Apache Airflow Ctl (airflowctl) 0.1.2
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Apache Airflow Ctl (airflowctl) 0.1.2
- 对用户的影响:可能阻塞安装或首次运行。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_df5b495084624a899a4674d4b0193ec7 | https://github.com/apache/airflow/releases/tag/airflow-ctl/0.1.2 | 来源类型 github_release 暴露的待验证使用条件。
13. 安全/权限坑 · 来源证据:Apache Airflow Helm Chart 1.19.0
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Apache Airflow Helm Chart 1.19.0
- 对用户的影响:可能影响升级、迁移或版本选择。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_0f5ecc599d634c1daa9ee6c9f2434849 | https://github.com/apache/airflow/releases/tag/helm-chart/1.19.0 | 来源类型 github_release 暴露的待验证使用条件。
14. 安全/权限坑 · 来源证据:Apache Airflow Helm Chart 1.20.0
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Apache Airflow Helm Chart 1.20.0
- 对用户的影响:可能影响升级、迁移或版本选择。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_c13fe02283094cffa9e186716cc8dcf5 | https://github.com/apache/airflow/releases/tag/helm-chart/1.20.0 | 来源讨论提到 node 相关条件,需在安装/试用前复核。
15. 安全/权限坑 · 来源证据:Apache Airflow Helm Chart 1.21.0
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Apache Airflow Helm Chart 1.21.0
- 对用户的影响:可能影响升级、迁移或版本选择。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_d29213d0ae2441e8907f407d8dc9b861 | https://github.com/apache/airflow/releases/tag/helm-chart/1.21.0 | 来源讨论提到 docker 相关条件,需在安装/试用前复核。
16. 安全/权限坑 · 来源证据:airflow-ctl/0.1.3
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:airflow-ctl/0.1.3
- 对用户的影响:可能影响授权、密钥配置或安全边界。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_9096523c7ba541c6ac1b6b926d6f6bc0 | https://github.com/apache/airflow/releases/tag/airflow-ctl/0.1.3 | 来源讨论提到 python 相关条件,需在安装/试用前复核。
17. 安全/权限坑 · 来源证据:edge3: upgrade from 1.x silently leaves schema stale — `BaseDBManager.upgradedb` stamps `alembic_version_edge3` to head…
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:edge3: upgrade from 1.x silently leaves schema stale —
BaseDBManager.upgradedbstampsalembic_version_edge3to head without running migrations - 对用户的影响:可能影响升级、迁移或版本选择。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_2bc80550929a49ddbce05c557bb225e0 | https://github.com/apache/airflow/issues/66524 | 来源讨论提到 python 相关条件,需在安装/试用前复核。
18. 维护坑 · issue/PR 响应质量未知
- 严重度:low
- 证据强度:source_linked
- 发现:issue_or_pr_quality=unknown。
- 对用户的影响:用户无法判断遇到问题后是否有人维护。
- 建议检查:抽样最近 issue/PR,判断是否长期无人处理。
- 防护动作:issue/PR 响应未知时,必须提示维护风险。
- 证据:evidence.maintainer_signals | github_repo:33884891 | https://github.com/apache/airflow | issue_or_pr_quality=unknown
19. 维护坑 · 发布节奏不明确
- 严重度:low
- 证据强度:source_linked
- 发现:release_recency=unknown。
- 对用户的影响:安装命令和文档可能落后于代码,用户踩坑概率升高。
- 建议检查:确认最近 release/tag 和 README 安装命令是否一致。
- 防护动作:发布节奏未知或过期时,安装说明必须标注可能漂移。
- 证据:evidence.maintainer_signals | github_repo:33884891 | https://github.com/apache/airflow | release_recency=unknown
来源:Doramagic 发现、验证与编译记录