# https://github.com/apache/airflow 项目说明书

生成时间：2026-05-13 22:31:12 UTC

## 目录

- [Airflow简介与核心概念](#intro)
- [快速开始与安装指南](#quickstart)
- [系统架构详解](#architecture)
- [核心组件详解](#components)
- [执行器类型与选择](#executors)
- [数据流转与交换机制](#data-flow)
- [FastAPI核心API](#fastapi)
- [React前端架构](#frontend)
- [容器化与Kubernetes部署](#deployment)
- [Provider生态系统](#providers)

<a id='intro'></a>

## Airflow简介与核心概念

### 相关页面

相关主题：[系统架构详解](#architecture), [快速开始与安装指南](#quickstart)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [README.md](https://github.com/apache/airflow/blob/main/README.md)
- [airflow-core/src/airflow/models/dag.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/dag.py)
- [airflow-core/src/airflow/models/taskinstance.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/taskinstance.py)
- [airflow-core/src/airflow/_shared/state/__init__.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/_shared/state/__init__.py)
- [airflow-core/src/airflow/cli/cli_config.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/cli/cli_config.py)
</details>

# Airflow简介与核心概念

## 概述

Apache Airflow是一个以编程方式创作、调度和监控工作流的平台。它允许用户使用Python代码定义、调度和执行复杂的批处理工作流。Airflow采用分布式架构，支持大规模工作流编排，是数据工程和MLOps领域广泛使用的开源工作流管理平台。

资料来源：[README.md](https://github.com/apache/airflow/blob/main/README.md)

## 核心架构

Airflow采用主从架构，主要包含以下组件：

| 组件 | 功能 | 说明 |
|------|------|------|
| Scheduler | 调度器 | 负责调度DAG运行，将任务分发给执行器 |
| Executor | 执行器 | 实际执行任务，支持多种类型 |
| Web Server | Web服务器 | 提供UI界面用于监控和管理 |
| Worker | 工作进程 | 执行具体任务的进程 |
| Metadata Database | 元数据库 | 存储DAG、任务、执行状态等元数据 |

资料来源：[airflow-core/src/airflow/cli/cli_config.py:1-100](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/cli/cli_config.py)

```mermaid
graph TD
    A[用户编写的DAG定义] --> B[Scheduler]
    B --> C[Executor]
    C --> D[Worker]
    D --> E[Metadata Database]
    B --> E
    F[Web Server] --> E
    G[用户通过UI/API] --> F
```

## DAG有向无环图

DAG（Directed Acyclic Graph）是Airflow的核心概念，代表工作流的结构定义。

### DAG定义

```python
from airflow import DAG
from datetime import datetime

with DAG(
    dag_id='my_first_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False
) as dag:
    task1 = BashOperator(task_id='task_1', bash_command='echo Hello')
    task2 = BashOperator(task_id='task_2', bash_command='echo World')
    task1 >> task2
```

资料来源：[airflow-core/src/airflow/models/dag.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/dag.py)

### DAG属性

| 属性 | 类型 | 描述 |
|------|------|------|
| dag_id | str | DAG的唯一标识符 |
| start_date | datetime | DAG开始日期 |
| schedule_interval | str/timedelta | 调度间隔 |
| catchup | bool | 是否补跑历史数据 |
| max_active_runs | int | 最大同时运行数 |
| is_paused | bool | DAG是否暂停 |

## 任务与任务实例

### 任务类型

Airflow支持多种任务类型：

| 任务类型 | 说明 |
|----------|------|
| BashOperator | 执行Bash命令 |
| PythonOperator | 执行Python函数 |
| Sensor | 等待特定条件满足 |
| TaskGroup | 任务组容器 |

资料来源：[airflow-core/src/airflow/models/taskinstance.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/taskinstance.py)

### 任务实例状态

```mermaid
stateDiagram-v2
    [*] --> queued: 任务创建
    queued --> scheduled: Scheduler调度
    scheduled --> running: Worker接收
    running --> success: 执行成功
    running --> failed: 执行失败
    running --> upstream_failed: 上游任务失败
    success --> [*]
    failed --> [*]
    upstream_failed --> [*]
```

资料来源：[airflow-core/src/airflow/_shared/state/__init__.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/_shared/state/__init__.py)

## 执行器类型

Airflow支持多种执行器，用于在不同环境中执行任务：

| 执行器 | 说明 | 适用场景 |
|--------|------|----------|
| LocalExecutor | 本地并行执行 | 开发/测试环境 |
| SequentialExecutor | 顺序执行 | 单任务场景 |
| CeleryExecutor | 分布式执行 | 生产环境大规模任务 |
| KubernetesExecutor | K8s Pod执行 | 云原生环境 |

资料来源：[airflow-core/src/airflow/cli/cli_config.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/cli/cli_config.py)

## CLI命令行接口

Airflow提供丰富的CLI命令用于管理DAG和任务：

| 命令 | 功能 |
|------|------|
| airflow dags list | 列出所有DAG |
| airflow dags list-runs | 列出DAG运行记录 |
| airflow tasks list | 列出DAG中的任务 |
| airflow tasks test | 测试单个任务 |
| airflow connections list | 列出连接 |
| airflow variables list | 列出变量 |

资料来源：[airflow-core/src/airflow/cli/cli_config.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/cli/cli_config.py)

### 常用命令示例

```bash
# 列出所有DAG
airflow dags list

# 触发DAG手动运行
airflow dags trigger my_dag_id

# 暂停/恢复DAG
airflow dags pause my_dag_id
airflow dags unpause my_dag_id

# 查看任务实例状态
airflow tasks list my_dag_id
```

## 连接与变量

### 连接管理

Airflow提供连接（Connection）功能用于存储外部系统访问凭证：

```python
from airflow.models import Connection
import json

conn = Connection(
    conn_id='my_postgres_conn',
    conn_type='postgres',
    host='localhost',
    schema='airflow_db',
    login='user',
    password='password',
    port=5432
)
```

资料来源：[airflow-core/src/airflow/utils/db.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/utils/db.py)

### 支持的连接类型

| 连接类型 | 说明 |
|----------|------|
| postgres | PostgreSQL数据库 |
| mysql | MySQL数据库 |
| google_cloud_platform | GCP云平台 |
| http | HTTP连接 |
| ssh | SSH连接 |
| s3 | AWS S3存储 |

## 配置管理

### airflow.cfg主要配置项

| 配置节 | 配置项 | 说明 |
|--------|--------|------|
| core | dags_folder | DAG文件存放路径 |
| core | executor | 执行器类型 |
| core | catchup_by_default | 默认是否补跑 |
| scheduler | num_runs | 调度器循环次数 |
| webserver | web_server_host | Web服务器地址 |
| database | sql_alchemy_conn | 数据库连接字符串 |

资料来源：[airflow-core/src/airflow/cli/cli_config.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/cli/cli_config.py)

## 工作流执行流程

```mermaid
graph LR
    A1[DAG定义加载] --> A2[解析DAG结构]
    A2 --> A3[Scheduler创建DAG Run]
    A3 --> A4[任务排队]
    A4 --> A5[Executor分发任务]
    A5 --> A6[Worker执行任务]
    A6 --> A7[更新任务状态]
    A7 --> A8[记录到元数据库]
    A8 --> A9[Web UI展示状态]
```

## 最佳实践

### DAG编写规范

1. **任务唯一性**：每个任务的task_id在DAG内必须唯一
2. **正确设置start_date**：避免使用动态时间作为start_date
3. **合理的retry配置**：为关键任务配置重试策略
4. **使用描述性命名**：使用清晰的DAG和任务命名

### 调度配置建议

| 配置项 | 建议值 | 说明 |
|--------|--------|------|
| catchup | False | 生产环境建议关闭补跑 |
| max_active_runs | 1-5 | 根据任务复杂度调整 |
| schedule_interval | @daily/@hourly | 根据业务需求选择 |
| concurrency | 16-32 | 根据Worker能力设置 |

## 总结

Apache Airflow作为强大的工作流编排平台，提供了完整的DAG定义、调度执行、监控告警等功能。理解其核心概念（DAG、Task、Executor、Connection）是有效使用Airflow的基础。通过合理的配置和最佳实践，可以构建稳定可靠的数据管道和工作流系统。

---

<a id='quickstart'></a>

## 快速开始与安装指南

### 相关页面

相关主题：[Airflow简介与核心概念](#intro), [容器化与Kubernetes部署](#deployment)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [INSTALLING.md](https://github.com/apache/airflow/blob/main/INSTALLING.md)
- [Dockerfile](https://github.com/apache/airflow/blob/main/Dockerfile)
- [Dockerfile.ci](https://github.com/apache/airflow/blob/main/Dockerfile.ci)
- [airflow-core/src/airflow/example_dags/tutorial.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/example_dags/tutorial.py)
- [generated/PYPI_README.md](https://github.com/apache/airflow/blob/main/generated/PYPI_README.md)
- [airflow-core/src/airflow/_vendor/README.md](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/_vendor/README.md)
- [devel-common/src/sphinx_exts/includes/installing-providers-from-sources.rst](https://github.com/apache/airflow/blob/main/devel-common/src/sphinx_exts/includes/installing-providers-from-sources.rst)
- [dev/breeze/src/airflow_breeze/commands/production_image_commands.py](https://github.com/apache/airflow/blob/main/dev/breeze/src/airflow_breeze/commands/production_image_commands.py)
</details>

# 快速开始与安装指南

Apache Airflow 是一个开源的工作流编排平台，用于编程创建、调度和监控批处理工作流。本指南将帮助您快速上手安装和配置 Airflow。

## 系统要求

在安装 Apache Airflow 之前，请确保您的系统满足以下基本要求：

| 要求项 | 最低版本 | 说明 |
|--------|----------|------|
| Python | 3.9+ | Airflow 核心功能需要 |
| pip | 最新版本 | Python 包管理器 |
| 操作系统 | Linux, macOS, Windows (WSL2) | 生产环境推荐使用 Linux |
| 数据库 | SQLite 3.15+, PostgreSQL 14+, MySQL 8.0+ | 元数据库存储 |
| 内存 | 4GB RAM | 最小需求，生产环境更高 |

资料来源：[INSTALLING.md:1-50](https://github.com/apache/airflow/blob/main/INSTALLING.md)

## 安装方式概览

Airflow 支持多种安装方式，您可以根据实际需求选择合适的方案：

```mermaid
graph TD
    A[安装 Airflow] --> B{使用场景}
    B -->|快速测试| C[pip 直接安装]
    B -->|容器化部署| D[Docker 镜像]
    B -->|深度开发| E[从源码编译]
    C --> F[使用约束文件安装]
    D --> G[官方镜像或自定义]
    E --> H[Breeze 开发环境]
```

资料来源：[generated/PYPI_README.md:1-30](https://github.com/apache/airflow/blob/main/generated/PYPI_README.md)

## 使用 pip 安装

### 基础安装

使用 pip 安装 Airflow 的标准方式是通过 pip install 命令配合约束文件：

```bash
pip install 'apache-airflow==3.2.0' \
 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.2.0/constraints-3.10.txt"
```

约束文件确保所有依赖项的版本兼容，避免因依赖冲突导致的安装失败。约束文件根据 Python 版本不同而有所区别。

资料来源：[INSTALLING.md:50-80](https://github.com/apache/airflow/blob/main/INSTALLING.md)

### 安装额外依赖

Airflow 提供了丰富的额外依赖包（extras），可按需安装：

```bash
# 安装包含 PostgreSQL 和 Google Cloud 支持的版本
pip install 'apache-airflow[postgres,google]==3.2.0' \
 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.2.0/constraints-3.10.txt"
```

常用额外依赖包包括：

| 额外包名 | 说明 |
|----------|------|
| postgres | PostgreSQL 数据库支持 |
| mysql | MySQL 数据库支持 |
| google | Google Cloud Platform 集成 |
| amazon | AWS 集成 |
| kubernetes | Kubernetes Executor 支持 |
| sentry | Sentry 错误追踪集成 |

资料来源：[generated/PYPI_README.md:40-60](https://github.com/apache/airflow/blob/main/generated/PYPI_README.md)

### 供应商包安装

除了核心 Airflow 包外，您还可以单独安装各种供应商（Provider）包：

```bash
# 从 PyPI 安装供应商包
pip install apache-airflow-providers-google

# 从源码安装供应商包
pip install /path/to/apache-airflow-providers-google*.whl
```

安装后需要验证文件完整性：

```bash
shasum -a 512 package-name-version.tar.gz | diff - package-name-version.tar.gz.sha512
```

资料来源：[devel-common/src/sphinx_exts/includes/installing-providers-from-sources.rst:1-50](https://github.com/apache/airflow/blob/main/devel-common/src/sphinx_exts/includes/installing-providers-from-sources.rst)

## Docker 安装

### 官方镜像

Airflow 提供了官方 Docker 镜像，可以快速启动完整的工作环境：

```bash
# 拉取最新稳定版镜像
docker pull apache/airflow:latest

# 使用 Celery Executor 启动
docker run -d -p 8080:8080 \
  -e AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://user:pass@host/db \
  apache/airflow:latest
```

资料来源：[Dockerfile:1-30](https://github.com/apache/airflow/blob/main/Dockerfile)

### Docker Compose 方式

生产环境推荐使用 Docker Compose 进行部署，Airflow 项目提供了标准化的 docker-compose.yaml：

```yaml
version: '3'
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
  
  airflow-webserver:
    image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:latest}
    command: webserver
    ports:
      - "8080:8080"
    depends_on:
      - postgres
```

### 构建自定义镜像

使用 Breeze 工具构建生产镜像：

```bash
# 使用 breeze 构建生产镜像
breeze prod-image build \
  --installation-method sources \
  --python 3.10 \
  --debian-version bullseye
```

资料来源：[dev/breeze/src/airflow_breeze/commands/production_image_commands.py:1-50](https://github.com/apache/airflow/blob/main/dev/breeze/src/airflow_breeze/commands/production_image_commands.py)

## 从源码安装

### 克隆仓库

```bash
git clone https://github.com/apache/airflow.git
cd airflow
```

### 使用 Breeze 开发环境

Breeze 是 Airflow 官方提供的开发环境管理工具，提供完整的开发、测试和构建体验：

```bash
# 进入 breeze shell
./breeze shell

# 运行测试
./breeze testing python --test-type unit-tests --package-filter airflow-core
```

Breeze 使用 Docker 容器化方式，确保开发环境与生产环境的一致性。

资料来源：[airflow-core/src/airflow/_vendor/README.md:1-20](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/_vendor/README.md)

### 编译分发包

构建 Airflow 分发包：

```bash
# 使用 Breeze 构建 sdist 或 wheel 包
breeze release-management build-rc \
  --airflow-constraints-mode="constraints" \
  --distribution-format both
```

编译后的包位于 `dist/` 目录下。

## 初始化数据库

安装完成后，需要初始化 Airflow 的元数据库：

```bash
# 初始化数据库
airflow db init

# 创建管理员用户
airflow users create \
  --username admin \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email admin@example.com
```

## 启动 Airflow

### Web 服务器

```bash
airflow webserver --port 8080
```

### 调度器

在另一个终端启动调度器：

```bash
airflow scheduler
```

### 独立模式

对于快速测试，可以使用独立模式：

```bash
airflow standalone
```

该命令会启动一个包含 Web 服务器和调度器的单进程环境。

## 验证安装

### 运行示例 DAG

Airflow 自带多个示例 DAG，可用于验证安装：

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG('example_dag', start_date=datetime(2024, 1, 1)) as dag:
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date'
    )
```

资料来源：[airflow-core/src/airflow/example_dags/tutorial.py:1-40](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/example_dags/tutorial.py)

### 检查状态

```bash
# 查看 Airflow 版本
airflow version

# 查看已加载的连接
airflow connections list

# 查看已安装提供程序
airflow providers list
```

## 快速安装流程图

```mermaid
graph TD
    A[开始安装] --> B{选择安装方式}
    B -->|pip| C[安装 Python 依赖]
    B -->|Docker| D[安装 Docker]
    B -->|源码| E[克隆代码仓库]
    C --> F[初始化数据库]
    D --> G[拉取镜像]
    E --> H[安装依赖]
    F --> I[启动服务]
    G --> I
    H --> I
    I --> J[创建管理员账户]
    J --> K[访问 Web UI]
    K --> L[运行示例 DAG]
```

## 常见问题排查

### 依赖冲突

如果遇到依赖冲突问题，确保使用正确的约束文件：

```bash
pip install apache-airflow==<VERSION> \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-<VERSION>/constraints-<PYTHON_VERSION>.txt"
```

### 数据库连接问题

检查数据库连接字符串配置：

```ini
[database]
sql_alchemy_conn = postgresql+psycopg2://user:password@localhost/airflow
```

### 端口占用

如果 8080 端口被占用，可更换端口：

```bash
airflow webserver --port 8888
```

## 后续步骤

安装成功后，建议进行以下操作：

1. **配置连接**：设置数据库、API 密钥等连接信息
2. **配置 Executor**：根据需求选择 Executor 类型（Local、Celery、Kubernetes）
3. **配置告警**：设置邮件或其他告警渠道
4. **学习 DAG 编写**：参考官方教程编写第一个生产 DAG
5. **了解最佳实践**：学习 Airflow 的安全和性能最佳实践

## 相关文档链接

- 官方安装文档：https://airflow.apache.org/docs/apache-airflow/stable/installation/index.html
- Docker 镜像：https://github.com/apache/airflow/blob/main/Dockerfile
- Breeze 开发工具：https://github.com/apache/airflow/blob/main/dev/breeze/README.md

---

<a id='architecture'></a>

## 系统架构详解

### 相关页面

相关主题：[核心组件详解](#components), [执行器类型与选择](#executors)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [airflow-core/src/airflow/jobs/scheduler_job_runner.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py)
- [airflow-core/src/airflow/dag_processing/manager.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/dag_processing/manager.py)
- [airflow-core/src/airflow/dag_processing/processor.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/dag_processing/processor.py)
- [airflow-core/docs/img/diagram_basic_airflow_architecture.py](https://github.com/apache/airflow/blob/main/airflow-core/docs/img/diagram_basic_airflow_architecture.py)
- [airflow-core/src/airflow/cli/cli_config.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/cli/cli_config.py)
- [airflow-core/src/airflow/cli/commands/config_command.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/cli/commands/config_command.py)
- [airflow-core/src/airflow/_shared/observability/metrics/metrics_template.yaml](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/_shared/observability/metrics/metrics_template.yaml)
</details>

# 系统架构详解

## 1. 概述

Apache Airflow 是一个开源的工作流编排平台，采用分布式架构设计，支持大规模任务调度与执行。Airflow 3.0 在架构上进行了重大升级，引入了更加模块化的组件设计，将 DAG 处理、任务调度和执行进行了清晰的分离。

核心架构由以下几个主要组件构成：

- **Web服务器**：提供用户界面
- **调度器（Scheduler）**：负责 DAG 调度和任务分发
- **DAG处理器（DagFileProcessorManager）**：负责解析和验证 DAG 文件
- **执行器（Executor）**：负责实际任务执行
- **元数据库**：存储 DAG 定义、任务状态和执行历史

资料来源：[airflow-core/docs/img/diagram_basic_airflow_architecture.py:1-50]()

## 2. 核心组件架构

### 2.1 DAG 处理系统

DAG 处理系统是 Airflow 的核心子系统之一，负责从文件系统读取、解析和验证 DAG 定义。

```mermaid
graph TD
    A[DAG 文件目录] --> B[DagFileProcessorManager]
    B --> C[文件扫描器]
    B --> D[文件处理器池]
    C -->|检测文件变更| D
    D --> E[Processor Subprocess]
    E --> F[DAG 对象]
    F --> G[Import Timeout Manager]
    G --> H[数据库更新]
    
    style B fill:#e1f5fe
    style D fill:#fff3e0
    style E fill:#e8f5e9
```

**关键组件**：

| 组件 | 文件位置 | 职责 |
|------|---------|------|
| DagFileProcessorManager | `dag_processing/manager.py` | 管理 DAG 文件处理的生命周期 |
| DagFileProcessor | `dag_processing/processor.py` | 执行单个 DAG 文件的解析 |
| ImportTimeoutManager | `dag_processing/manager.py` | 监控 DAG 导入超时 |

资料来源：[airflow-core/src/airflow/dag_processing/manager.py:1-100]()

#### 2.1.1 DAG 文件扫描

DagFileProcessorManager 维护一个文件扫描循环，定期检查配置的 DAG 目录中的文件变更：

```python
# 扫描间隔配置
processor_poll_interval -> scheduler_idle_sleep_time
```

默认配置变更说明：

| 旧配置项 | 新配置项 | 说明 |
|---------|---------|------|
| `scheduler.processor_poll_interval` | `scheduler.scheduler_idle_sleep_time` | 调度器空闲休眠时间 |
| `scheduler.create_cron_data_intervals` | `scheduler.create_cron_data_intervals` | 默认值由 True 改为 False |
| `scheduler.create_delta_data_intervals` | `scheduler.create_delta_data_intervals` | 默认值由 True 改为 False |

资料来源：[airflow-core/src/airflow/cli/commands/config_command.py:30-80]()

### 2.2 调度器系统

调度器是 Airflow 的大脑，负责决定何时运行哪些任务。

```mermaid
graph TD
    A[SchedulerJobRunner] --> B[关键区域锁]
    B --> C{获取锁}
    C -->|成功| D[处理过期 DAG]
    C -->|失败| E[等待]
    D --> F[调度待处理任务]
    F --> G[发送任务到执行器]
    G --> H[更新任务状态]
    H --> I[释放锁]
    I --> B
    
    style A fill:#fff8e1
    style B fill:#ffecb3
    style G fill:#c8e6c9
```

#### 2.2.1 调度器关键流程

1. **获取关键区域锁**：确保多调度器环境下的安全性
2. **处理过期 DAG**：清理陈旧的 DAG 数据
3. **调度任务**：根据依赖关系和执行时间分发任务
4. **孤儿任务处理**：检测并处理孤立任务

**调度器指标**：

| 指标名称 | 类型 | 说明 |
|---------|------|------|
| `scheduler.critical_section_busy` | counter | 关键区域锁竞争次数 |
| `scheduler.orphaned_tasks.adopted` | counter | 被采纳的孤儿任务数 |
| `scheduler.orphaned_tasks.cleared` | counter | 被清理的孤儿任务数 |
| `scheduler.tasks.killed_externally` | counter | 外部杀死任务数 |

资料来源：[airflow-core/src/airflow/_shared/observability/metrics/metrics_template.yaml:50-80]()

### 2.3 任务执行系统

任务执行通过 Task SDK 实现，采用远程进程通信模式。

```mermaid
graph LR
    A[任务实例] --> B[任务状态存储]
    A --> C[任务参数]
    A --> D[执行上下文]
    
    B -.->|get| E[...]
    B -.->|set| F[...]
    B -.->|delete| G[...]
    
    style A fill:#e3f2fd
    style B fill:#bbdefb
```

#### 2.3.1 任务状态管理

任务状态通过 TaskState 类进行管理，提供键值对存储：

```python
class TaskState:
    def get(self, key: str) -> str | None
    def set(self, key: str, value: str) -> None
    def delete(self, key: str) -> None
    def clear(self, all_map_indices: bool = False) -> None
```

| 方法 | 功能 | 说明 |
|-----|------|------|
| `get(key)` | 获取值 | 返回存储的值，键不存在返回 None |
| `set(key, value)` | 设置值 | 写入或覆盖指定键的值 |
| `delete(key)` | 删除值 | 删除单个键，键不存在无操作 |
| `clear()` | 清空 | 删除所有 map indices 的状态 |

资料来源：[task-sdk/src/airflow/sdk/execution_time/context.py:80-120]()

## 3. CLI 命令系统

Airflow 提供丰富的命令行接口，用于管理和操作各个组件。

### 3.1 命令分类

| 命令组 | 功能 |
|-------|------|
| `airflow dags` | DAG 管理（backfill, list-runs, pause, unpause, test） |
| `airflow tasks` | 任务操作（run, failed-deps, render, test） |
| `airflow connections` | 连接管理 |
| `airflow providers` | 提供者信息 |
| `airflow config` | 配置查看 |
| `airflow db-manager` | 数据库管理器 |

资料来源：[airflow-core/src/airflow/cli/cli_config.py:50-150]()

### 3.2 任务命令详解

| 命令 | 功能 | 关键参数 |
|------|-----|---------|
| `airflow tasks run` | 运行单个任务 | `--dag-id`, `--task-id`, `--logical-date-or-run-id` |
| `airflow tasks render` | 渲染任务模板 | `--dag-id`, `--task-id`, `--logical-date-or-run-id` |
| `airflow tasks test` | 测试任务（不记录状态） | `--dag-id`, `--task-id`, `--logical-date-or-run-id` |
| `airflow tasks failed-deps` | 检查未满足依赖 | `--dag-id`, `--task-id`, `--logical-date-or-run-id` |

**测试命令特点**：
- 不检查依赖关系
- 不记录状态到数据库
- 适用于快速验证任务逻辑

资料来源：[airflow-core/src/airflow/cli/cli_config.py:150-200]()

## 4. 配置系统

### 4.1 关键配置变更（Airflow 2.x → 3.0）

| 配置项 | 变更类型 | 旧值 | 新值 |
|-------|---------|-----|------|
| `scheduler.catchup_by_default` | 默认值变更 | True | **False** |
| `scheduler.create_cron_data_intervals` | 默认值变更 | True | False |
| `scheduler.create_delta_data_intervals` | 默认值变更 | True | False |
| `scheduler.processor_poll_interval` | 重命名 | - | `scheduler_idle_sleep_time` |
| `scheduler.deactivate_stale_dags_interval` | 重命名 | - | `parsing_cleanup_interval` |
| `scheduler.statsd_on` | 重命名 | - | `metrics.statsd_on` |
| `scheduler.max_threads` | 重命名 | - | `dag_processor.parsing_processes` |

资料来源：[airflow-core/src/airflow/cli/commands/config_command.py:20-70]()

### 4.2 DAG Processing 配置

```yaml
dag_processing:
  manager_stalls: Number of stalled DagFileProcessorManager
  processor_timeouts: DAG 处理超时次数
  dag_file_refresh_error: DAG 文件加载失败次数
```

## 5. 执行架构图

```mermaid
graph TD
    subgraph Web层
        UI[Web UI] 
    end
    
    subgraph 调度层
        Scheduler[Scheduler Job Runner]
        DagProcessor[DagFileProcessorManager]
        ImportTimeout[Import Timeout Manager]
    end
    
    subgraph 执行层
        Executor[Executor]
        Worker[Worker Process]
        TaskRunner[Task Runner]
    end
    
    subgraph 存储层
        DB[(Metadata DB)]
        DAGFS[DAG File System]
    end
    
    UI <-->|API| Scheduler
    Scheduler <-->|调度决策| Executor
    DagProcessor <-->|解析DAG| DAGFS
    DagProcessor <-->|写入| DB
    Executor <-->|分发任务| Worker
    Worker <-->|执行| TaskRunner
    TaskRunner <-->|状态更新| DB
    
    style Scheduler fill:#ff9800,color:#fff
    style DagProcessor fill:#4caf50,color:#fff
    style Executor fill:#2196f3,color:#fff
```

## 6. 监控指标

Airflow 提供全面的监控指标体系：

### 6.1 DAG 处理指标

| 指标 | 描述 |
|------|------|
| `dag_processing.manager_stalls` | DagFileProcessorManager 停滞次数 |
| `dag_file_refresh_error` | DAG 文件刷新错误数 |
| `dag_file_processor_timeouts` | DAG 文件处理器超时（已废弃） |

### 6.2 调度器指标

| 指标 | 描述 |
|------|------|
| `scheduler.critical_section_busy` | 调度器关键区域忙次数 |
| `scheduler.orphaned_tasks.adopted` | 被采纳的孤儿任务 |
| `scheduler.orphaned_tasks.cleared` | 被清理的孤儿任务 |
| `scheduler.tasks.killed_externally` | 外部杀死任务数 |

### 6.3 任务执行指标

| 指标 | 描述 |
|------|------|
| `ti.start` | 任务启动次数 |
| `ti.finish` | 任务完成次数 |

资料来源：[airflow-core/src/airflow/_shared/observability/metrics/metrics_template.yaml:30-100]()

## 7. 总结

Apache Airflow 采用分布式架构设计，核心组件包括：

1. **DagFileProcessorManager**：负责 DAG 文件的解析和验证
2. **SchedulerJobRunner**：负责任务的调度和分发
3. **Executor**：负责实际任务执行
4. **Task SDK**：提供任务运行时的上下文管理

各组件通过消息队列和数据库进行通信，确保系统的高可用性和可扩展性。

---

<a id='components'></a>

## 核心组件详解

### 相关页面

相关主题：[系统架构详解](#architecture), [数据流转与交换机制](#data-flow)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [airflow-core/src/airflow/models/dag.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/dag.py)
- [airflow-core/src/airflow/models/dagrun.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/dagrun.py)
- [airflow-core/src/airflow/models/connection.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/connection.py)
- [airflow-core/src/airflow/models/variable.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/variable.py)
- [airflow-core/src/airflow/models/pool.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/pool.py)
- [airflow-core/src/airflow/utils/db.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/utils/db.py)
- [airflow-core/src/airflow/cli/commands/config_command.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/cli/commands/config_command.py)

</details>

# 核心组件详解

## 概述

Apache Airflow 是一个开源的工作流编排平台，其核心组件构成了整个任务调度和执行的基础架构。这些组件包括DAG（有向无环图）、DAGRun（DAG运行实例）、Connection（连接）、Variable（变量）和Pool（资源池）。理解这些核心组件对于深入掌握Airflow的工作原理至关重要。

## DAG 模型

### 什么是 DAG

DAG（Directed Acyclic Graph，有向无环图）是Airflow中工作流定义的核心抽象。它代表了一组需要执行的任务以及任务之间的依赖关系。

### DAG 关键属性

| 属性 | 类型 | 说明 |
|------|------|------|
| dag_id | str | DAG的唯一标识符 |
| description | str | DAG的描述信息 |
| schedule_interval | - | 调度间隔配置 |
| start_date | datetime | DAG开始执行的日期 |
| end_date | datetime | DAG结束执行的日期（可选） |
| catchup | bool | 是否补跑历史数据 |
| max_active_runs | int | 最大并发运行数 |
| concurrency | int | 最大并发任务数 |
| is_paused | bool | DAG是否暂停 |

### DAG 的生命周期

```mermaid
graph LR
    A[创建DAG] --> B[DAG解析]
    B --> C[调度器调度]
    C --> D[创建DAGRun]
    D --> E[执行任务]
    E --> F[DAGRun完成]
    F --> G{是否继续调度}
    G -->|是| C
    G -->|否| H[DAG结束]
```

## DAGRun 模型

### 概述

DAGRun是DAG的一次具体执行实例。每次触发或调度DAG时，都会创建一个DAGRun实例来跟踪该次执行的状态。

### DAGRun 状态

| 状态 | 说明 |
|------|------|
| QUEUED | 排队等待执行 |
| RUNNING | 正在执行 |
| SUCCESS | 成功完成 |
| FAILED | 执行失败 |

### DAGRun 类型

| 类型 | 说明 |
|------|------|
| SCHEDULED | 由调度器正常调度触发 |
| MANUAL | 手动触发的运行 |
| BACKFILL | 回填作业执行 |

### 核心属性

DAGRun包含以下关键属性用于追踪执行信息：

- **run_id**: 每次运行的唯一标识
- **state**: 当前运行状态
- **execution_date**: 执行日期
- **start_date**: 实际开始时间
- **end_date**: 结束时间
- **triggering_user_name**: 触发运行的用户名

## Connection 连接管理

### 连接类型

Airflow支持多种连接类型，用于与外部系统进行交互：

| 连接类型 | 说明 | 默认配置 |
|----------|------|----------|
| postgres | PostgreSQL数据库 | 需要host、schema、login、password |
| mysql | MySQL数据库 | 需要host、schema、login、password |
| google_cloud_platform | GCP服务 | 使用默认schema |
| http | HTTP端点 | 需要host地址 |
| ftp | FTP服务器 | 需要host、login、password |
| ssh | SSH连接 | 需要host、login、key_file |
| redis | Redis缓存 | 需要host、port |
| fs | 文件系统 | 需要path配置 |
| hive_cli | Hive CLI | 需要host、port、schema |
| hiveserver2 | HiveServer2 | 需要host、schema、port |
| gremlin | Gremlin图数据库 | 需要host、port |
| facebook_social | Facebook社交 | 需要app_id、app_secret等 |
| iceberg | Iceberg表格式 | 需要host配置 |

### 连接创建示例

```python
Connection(
    conn_id="gcp_default",
    conn_type="google_cloud_platform",
    schema="default"
)
```

## Variable 变量管理

### 变量概述

Variable用于存储和检索Airflow中的键值对配置信息，可在DAG和任务之间共享。

### 变量操作

| 操作 | 说明 | 命令 |
|------|------|------|
| list | 列出所有变量 | `airflow variables list` |
| get | 获取变量值 | `airflow variables get <key>` |
| set | 设置变量 | `airflow variables set <key> <value>` |
| delete | 删除变量 | `airflow variables delete <key>` |
| import | 批量导入 | `airflow variables import <file>` |
| export | 批量导出 | `airflow variables export <file>` |

### JSON序列化支持

Variable支持JSON序列化，可存储复杂数据结构：

```bash
airflow variables set my_json '{"key": "value", "list": [1, 2, 3]}'
```

## Pool 资源池管理

### 池的作用

Pool用于限制同时执行的任务数量，控制资源使用并管理并发度。

### 内置默认池

| 池名称 | 默认槽位数 | 说明 |
|--------|------------|------|
| default_pool | 根据配置 | 用于未指定池的任务 |

### 槽位管理

```mermaid
graph TB
    A[任务请求] --> B{槽位可用?}
    B -->|是| C[分配槽位]
    B -->|否| D[排队等待]
    C --> E[执行任务]
    E --> F[释放槽位]
    D --> G{槽位释放?}
    G -->|是| C
    G -->|否| D
```

## CLI 命令行接口

### 可用命令组

Airflow提供了丰富的CLI命令用于管理核心组件：

| 命令组 | 说明 | 主要子命令 |
|--------|------|------------|
| dag | DAG管理 | list, details, list-runs, pause, unpause, backfill, test |
| dagrun | 运行管理 | list, trigger, clear |
| connections | 连接管理 | list, add, delete, edit |
| variables | 变量管理 | list, get, set, delete, import, export |
| pools | 池管理 | list, set |
| config | 配置查看 | list, get, show |
| providers | 提供者信息 | list, details |
| assets | 资产管理 | list, details, materialize |

### 常用命令示例

```bash
# 列出所有DAG
airflow dags list

# 触发DAG运行
airflow dags trigger <dag_id>

# 暂停DAG
airflow dags pause <dag_id>

# 列出所有连接
airflow connections list

# 导出变量
airflow variables export variables.json
```

## 配置变更说明

### Airflow 3.0 配置变化

从Airflow 2.x迁移到3.0时，以下配置项发生了变化：

| 旧配置项 | 新配置项 | 变化类型 |
|----------|----------|----------|
| scheduler.catchup_by_default | scheduler.catchup | 默认值改为False |
| scheduler.create_cron_data_intervals | - | 默认值改为False |
| scheduler.create_delta_data_intervals | - | 默认值改为False |
| scheduler.processor_poll_interval | scheduler.scheduler_idle_sleep_time | 重命名 |
| scheduler.deactivate_stale_dags_interval | scheduler.parsing_cleanup_interval | 重命名 |
| scheduler.statsd_on | metrics.statsd_on | 重命名 |
| scheduler.max_threads | dag_processor.parsing_processes | 重命名 |

### catchup 行为变化

> 在 Airflow 3.0 中，`catchup` 的默认值为 `False`。这意味着未明确设置 `catchup` 参数的 DAG 默认不会进行历史数据补跑。如果DAG依赖补跑行为，需要在 `airflow.cfg` 的 `scheduler` 部分将该配置设置为 `True`。资料来源：[airflow-core/src/airflow/cli/commands/config_command.py]()

## 数据模型关系

```mermaid
erDiagram
    DAG ||--o{ DAGRun : creates
    DAGRun ||--o{ TaskInstance : contains
    TaskInstance ||--|| Pool : uses
    Connection ||--o{ TaskInstance : referenced_by
    Variable ||--o{ TaskInstance : accessed_by
    
    DAG {
        string dag_id
        string description
        string schedule_interval
        datetime start_date
        boolean is_paused
    }
    
    DAGRun {
        string run_id
        string state
        datetime execution_date
        datetime start_date
        datetime end_date
    }
    
    TaskInstance {
        string task_id
        string state
        datetime start_date
        datetime end_date
    }
    
    Pool {
        string pool
        int slots
        string description
    }
    
    Connection {
        string conn_id
        string conn_type
        string host
        string schema
    }
    
    Variable {
        string key
        string val
        string description
    }
```

## 安全最佳实践

### 连接凭证管理

1. **使用Fernet加密**: Airflow使用Fernet加密存储敏感连接凭证
2. **轮换加密密钥**: 定期执行 `airflow rotate-fernet-key` 更新加密密钥

```bash
airflow rotate-fernet-key
```

### 敏感信息处理

| 类型 | 存储方式 | 建议 |
|------|----------|------|
| 密码 | 加密存储 | 使用变量或连接extra字段 |
| API密钥 | 加密存储 | 使用Secret Backend |
| 连接凭证 | 加密存储 | 使用连接管理界面 |

## 总结

Apache Airflow的核心组件构成了一个完整的工作流编排系统。DAG作为工作流定义的核心，依赖调度器生成DAGRun实例，而DAGRun则管理具体任务的执行。Connection、Variable和Pool等组件提供了与外部系统交互、配置管理和资源控制的能力。深入理解这些组件及其相互关系，对于熟练使用Airflow进行工作流开发和运维至关重要。

---

<a id='executors'></a>

## 执行器类型与选择

### 相关页面

相关主题：[系统架构详解](#architecture), [容器化与Kubernetes部署](#deployment)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [airflow-core/src/airflow/executors/executor_loader.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/executors/executor_loader.py)
- [airflow-core/src/airflow/cli/cli_config.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/cli/cli_config.py)
- [providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py](https://github.com/apache/airflow/blob/main/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py)
- [dev/breeze/src/airflow_breeze/commands/common_options.py](https://github.com/apache/airflow/blob/main/dev/breeze/src/airflow_breeze/commands/common_options.py)
- [dev/breeze/src/airflow_breeze/commands/developer_commands.py](https://github.com/apache/airflow/blob/main/dev/breeze/src/airflow_breeze/commands/developer_commands.py)
- [airflow-core/src/airflow/utils/db.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/utils/db.py)
</details>

# 执行器类型与选择

## 概述

执行器（Executor）是 Apache Airflow 架构中的核心组件，负责实际执行任务实例。每个执行器实现了特定的任务调度和执行策略，从单机的本地执行到分布式的集群环境，Airflow 提供了多种执行器类型以满足不同场景的需求。执行器的选择直接影响工作流的性能、可扩展性和可靠性。

执行器系统遵循统一的接口规范，所有执行器都继承自基础执行器类，定义了任务提交、状态查询、心跳维护等标准方法。资料来源：[airflow-core/src/airflow/executors/executor_loader.py:1-50]()

## 执行器架构

### 核心组件关系

```mermaid
graph TD
    DAG[DAG 有向无环图] --> Scheduler[调度器]
    Scheduler --> ExecutorLoader[执行器加载器]
    ExecutorLoader --> Executor[执行器实例]
    Executor --> TaskInstance[任务实例]
    Executor --> Worker[Worker 节点]
    
    Executor --> LocalExecutor[LocalExecutor]
    Executor --> CeleryExecutor[CeleryExecutor]
    Executor --> KubernetesExecutor[KubernetesExecutor]
    Executor --> SequentialExecutor[SequentialExecutor]
    
    LocalExecutor --> LocalWorker[本地进程]
    CeleryExecutor --> CeleryWorker[Celery Worker]
    KubernetesExecutor --> K8sPod[K8s Pod]
```

### 执行器类型总览

| 执行器类型 | 说明 | 适用场景 | 并发能力 |
|-----------|------|----------|---------|
| SequentialExecutor | 顺序执行器 | 开发调试、单节点 | 1 |
| LocalExecutor | 本地执行器 | 单机生产、小规模任务 | 多进程 |
| CeleryExecutor | Celery 执行器 | 分布式、跨机器 | 集群规模 |
| KubernetesExecutor | K8s 执行器 | 云原生、自动扩缩容 | Pod 数量 |
| LocalKubernetesExecutor | 本地 K8s 执行器 | 测试环境 | 可配置 |
| CeleryKubernetesExecutor | 混合执行器 | 灵活调度 | 集群规模 |

资料来源：[airflow-core/src/airflow/executors/executor_loader.py:50-80]()

## 执行器加载机制

### 执行器加载器职责

`ExecutorLoader` 是 Airflow 中负责动态加载和解析执行器配置的中央组件。它支持多种执行器配置格式，包括简单的单执行器名称和复杂的团队别名配置。

执行器加载器的主要职责包括：

1. 解析 `airflow.cfg` 中的执行器配置
2. 验证执行器名称的有效性
3. 实例化对应的执行器类
4. 处理执行器别名和团队配置

```python
# 执行器名称解析核心逻辑
if module_or_name in CORE_EXECUTOR_NAMES:
    executor_names_per_team.append(
        ExecutorName(
            alias=alias, 
            module_path=cls.executors[module_or_name], 
            team_name=team_name
        )
    )
```

资料来源：[airflow-core/src/airflow/executors/executor_loader.py:80-120]()

### 执行器配置格式

执行器配置支持以下几种格式：

```mermaid
graph LR
    A[配置文件] --> B[执行器名称]
    B --> C[单执行器]
    B --> D[别名配置]
    B --> E[团队配置]
    
    C --> C1[LocalExecutor]
    C --> C2[CeleryExecutor]
    
    D --> D1[MyAlias:LocalExecutor]
    D1 --> D2[alias=MyAlias<br/>module=LocalExecutor]
    
    E --> E1[team:executor]
    E1 --> E2[team_name=team<br/>executor=executor]
```

#### 简单配置

```ini
executor = LocalExecutor
```

#### 别名配置

```ini
executor = MyLocalExecutor:LocalExecutor
```

格式为 `别名:执行器类型`，其中别名用于日志和监控标识。

#### 团队配置

```ini
[executors]
TeamA = TeamA: CeleryExecutor
TeamB = TeamB: KubernetesExecutor
```

资料来源：[airflow-core/src/airflow/executors/executor_loader.py:100-150]()

## 核心执行器类型详解

### SequentialExecutor

顺序执行器是最基础的执行器，以单线程顺序方式执行任务。此执行器主要用于以下场景：

- **开发环境**：在没有多进程支持的环境中运行
- **调试场景**：需要逐个追踪任务执行流程
- **最小化部署**：资源受限的单机环境

```python
# SequentialExecutor 核心特性
- 单进程顺序执行
- 无并发能力
- 任务队列单一
- 无需外部依赖
```

资料来源：[airflow-core/src/airflow/executors/executor_loader.py:150-180]()

### LocalExecutor

本地执行器在单机环境下提供多进程并发执行能力。它使用 Python 的 `multiprocessing` 模块创建工作进程池，每个工作进程可以并行执行多个任务。

```mermaid
graph TD
    LocalExecutor --> Worker1[Worker Process 1]
    LocalExecutor --> Worker2[Worker Process 2]
    LocalExecutor --> WorkerN[Worker Process N]
    
    Worker1 --> Task1[Task Instance]
    Worker1 --> Task2[Task Instance]
    Worker2 --> Task3[Task Instance]
    WorkerN --> Task4[Task Instance]
    
    Worker1 -.-> Queue[任务队列]
    Worker2 -.-> Queue
    WorkerN -.-> Queue
```

**配置参数**：

| 参数 | 说明 | 默认值 |
|------|------|--------|
| parallelism | 并行任务数上限 | CPU 核心数 |
| local_worker_kwargs | 工作进程额外参数 | {} |

资料来源：[airflow-core/src/airflow/executors/executor_loader.py:180-220]()

### CeleryExecutor

CeleryExecutor 是分布式执行器，使用 Celery 作为消息队列来处理跨多台机器的任务分发和执行。Celery 是 Python 生态中成熟的任务队列系统，支持 Redis、RabbitMQ 等多种消息代理。

```mermaid
graph TD
    Scheduler[Airflow Scheduler] -->|任务提交| CeleryBroker[Celery Broker<br/>Redis/RabbitMQ]
    CeleryBroker --> Worker1[Celery Worker 1]
    CeleryBroker --> Worker2[Celery Worker 2]
    CeleryBroker --> WorkerN[Celery Worker N]
    
    Worker1 -->|执行结果| ResultBackend[Result Backend]
    Worker2 -->|执行结果| ResultBackend
    WorkerN -->|执行结果| ResultBackend
    
    ResultBackend -->|状态同步| Scheduler
```

**适用场景**：

- 需要水平扩展的任务处理
- 跨多台机器的分布式部署
- 需要任务优先级和重试策略
- 长时间运行的后台任务

**依赖组件**：

- Celery 消息代理（Redis 或 RabbitMQ）
- Celery Worker 节点
- 结果后端（数据库或缓存）

资料来源：[providers/celery/src/airflow/providers/celery/executors/celery_executor.py:1-100]()

### KubernetesExecutor

KubernetesExecutor 是原生运行在 Kubernetes 集群上的执行器，每个任务都在独立的 Pod 中执行。这种设计提供了极佳的隔离性和资源弹性。

```mermaid
graph TD
    Scheduler[Scheduler] --> K8sAPI[Kubernetes API]
    K8sAPI --> Pod1[Task Pod 1]
    K8sAPI --> Pod2[Task Pod 2]
    K8sAPI --> PodN[Task Pod N]
    
    Pod1 -->|完成| K8sAPI
    Pod2 -->|完成| K8sAPI
    PodN -->|完成| K8sAPI
    
    K8sAPI --> ConfigMap[ConfigMap<br/>任务定义]
    K8sAPI --> Secret[Secret<br/>连接凭证]
```

**核心优势**：

| 特性 | 说明 |
|------|------|
| 任务隔离 | 每个任务独立 Pod，避免资源冲突 |
| 自动扩缩容 | 根据任务队列动态创建/销毁 Pod |
| 资源控制 | 支持 CPU、内存 Limits 和 Requests 配置 |
| 亲和性 | 支持节点亲和性、污点容忍等调度策略 |
| 生命周期 | 任务完成即销毁，资源高效利用 |

**配置选项**：

| 参数 | 说明 |
|------|------|
| namespace | Kubernetes 命名空间 |
| parallelism | 最大并发 Pod 数 |
| worker_pods_creation_batch_size | 批量创建大小 |
| kube_client_request_args | K8s 客户端参数 |

资料来源：[providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:1-150]()

## 执行器配置管理

### 配置文件位置

执行器配置通过 `airflow.cfg` 文件管理，主要配置项位于 `[core]` 部分：

```ini
[core]
executor = LocalExecutor
```

### Breeze 开发环境配置

在 Airflow Breeze 开发环境中，可以使用统一的命令行选项配置执行器：

```bash
# 查看执行器列表
breeze shell --executor

# 使用指定执行器启动
breeze shell --executor KubernetesExecutor
```

**支持的执行器选项**：

| 选项 | 说明 | 可用值 |
|------|------|--------|
| --executor | 执行器类型 | LocalExecutor, CeleryExecutor, KubernetesExecutor 等 |

资料来源：[dev/breeze/src/airflow_breeze/commands/common_options.py:100-150]()

### 执行器别名与团队配置

Airflow 支持通过 `ExecutorLoader` 实现执行器别名和团队级别的配置：

```python
class ExecutorName(NamedTuple):
    alias: str | None
    module_path: str
    team_name: str | None
```

配置示例：

```ini
[executors]
# 格式: 别名 = 模块路径
my_alias = airflow.executors.local_executor.LocalExecutor
team_default = airflow.providers.celery.executors.celery_executor.CeleryExecutor
```

**配置验证逻辑**：

```python
# 检查是否为内置执行器名称
if module_or_name in CORE_EXECUTOR_NAMES:
    executor_names_per_team.append(
        ExecutorName(
            alias=alias, 
            module_path=cls.executors[module_or_name], 
            team_name=team_name
        )
    )
# 检查是否为模块路径
elif "." not in module_or_name:
    raise AirflowConfigException(
        "Incorrectly formatted executor configuration..."
    )
```

资料来源：[airflow-core/src/airflow/executors/executor_loader.py:200-260]()

## 执行器选择指南

### 决策流程

```mermaid
graph TD
    Start[开始选择执行器] --> Q1{部署规模?}
    
    Q1 -->|单机/开发| Q2{需要并发?}
    Q1 -->|分布式| Q3{K8s 环境?}
    Q1 -->|大规模| Q4{需要灵活调度?}
    
    Q2 -->|是| LocalExecutor
    Q2 -->|否| SequentialExecutor
    
    Q3 -->|是| KubernetesExecutor
    Q3 -->|否| CeleryExecutor
    
    Q4 -->|是| CeleryKubernetesExecutor
    Q4 -->|否| CeleryExecutor
```

### 场景推荐

| 场景 | 推荐执行器 | 原因 |
|------|-----------|------|
| 本地开发调试 | SequentialExecutor | 简单、无依赖 |
| 单机生产环境 | LocalExecutor | 多进程并发、资源可控 |
| 小型集群 | CeleryExecutor | 配置简单、成熟稳定 |
| 云原生/K8s | KubernetesExecutor | 弹性扩缩容、任务隔离 |
| 混合云环境 | CeleryKubernetesExecutor | 灵活调度、兼容性好 |

### 配置示例

#### 本地执行器配置

```ini
[core]
executor = LocalExecutor

[local_executor]
parallelism = 8
```

#### Celery 执行器配置

```ini
[core]
executor = CeleryExecutor

[celery]
broker_url = redis://redis:6379/0
result_backend = redis://redis:6379/1
worker_concurrency = 16
```

#### Kubernetes 执行器配置

```ini
[core]
executor = KubernetesExecutor

[kubernetes]
namespace = airflow
parallelism = 32
```

资料来源：[airflow-core/src/airflow/executors/executor_loader.py:260-320]()

## 命令行接口

### 查看执行器信息

通过 Airflow CLI 可以查看已配置的执行器信息：

```bash
# 查看 Airflow 信息（包含执行器）
airflow info

# 查看提供商执行器列表
airflow providers executors
```

**可用的 providers 子命令**：

| 命令 | 说明 |
|------|------|
| `providers list` | 列出所有提供商 |
| `providers executors` | 列出可用的执行器 |
| `providers details` | 提供商详细信息 |

资料来源：[airflow-core/src/airflow/cli/cli_config.py:1-100]()

### CLI 配置定义

执行器相关的 CLI 命令在 `cli_config.py` 中定义：

```python
# 提供商执行器列表命令
ActionCommand(
    name="executors",
    help="Get information about executors provided",
    func=lazy_load_command("airflow.cli.commands.provider_command.executors_list"),
    args=(ARG_OUTPUT, ARG_VERBOSE),
)
```

**命令行参数**：

| 参数 | 短标识 | 说明 |
|------|--------|------|
| --output | -o | 输出格式（table、json、yaml） |
| --verbose | -v | 详细输出模式 |

资料来源：[airflow-core/src/airflow/cli/cli_config.py:100-200]()

## 默认连接配置

执行器运行需要配置相应的连接凭证。Airflow 在初始化数据库时提供默认连接配置：

### 开发环境默认连接

```python
Connection(
    conn_id="fs_default",
    conn_type="fs",
    extra='{"path": "/"}',
)

Connection(
    conn_id="google_cloud_default",
    conn_type="google_cloud_platform",
    schema="default",
)

Connection(
    conn_id="http_default",
    conn_type="http",
    host="https://www.httpbin.org/",
)
```

### 生产环境注意事项

在生产环境中部署时，需要：

1. 配置真实的数据库连接（避免 SQLite）
2. 根据执行器类型配置相应的消息代理连接
3. 设置正确的认证凭证和 SSL 选项

资料来源：[airflow-core/src/airflow/utils/db.py:1-100]()

## 最佳实践

### 执行器选择原则

1. **从简单开始**：开发环境使用 LocalExecutor，生产环境根据需求升级
2. **资源评估**：估算并发任务数、任务运行时长、资源消耗
3. **运维能力**：评估团队对消息队列或容器编排的熟悉程度
4. **成本考虑**：云环境优先考虑 KubernetesExecutor 以获得弹性
5. **扩展性预留**：选择具有一定扩展空间的执行器类型

### 性能优化建议

| 优化项 | LocalExecutor | CeleryExecutor | KubernetesExecutor |
|--------|--------------|----------------|-------------------|
| 并发数 | `parallelism` 参数 | `worker_concurrency` | `parallelism` 参数 |
| 心跳间隔 | 降低网络延迟 | 合理设置超时 | 配置 liveness 探针 |
| 资源限制 | 进程级限制 | Worker 级限制 | Pod 级限制 |
| 监控 | 进程监控 | Celery Events | K8s Events |

### 常见问题排查

```mermaid
graph TD
    Issue[问题排查] --> Check1{执行器启动失败?}
    Issue --> Check2{任务堆积?}
    Issue --> Check3{连接超时?}
    
    Check1 --> Solution1[检查配置<br/>验证依赖]
    Check2 --> Solution2[增加并发数<br/>优化任务拆分]
    Check3 --> Solution3[检查网络<br/>验证凭证]
```

资料来源：[airflow-core/src/airflow/executors/executor_loader.py:320-400]()

## 总结

Apache Airflow 的执行器系统提供了灵活的任务执行框架，从单机的顺序执行到分布式的 Kubernetes 集群执行，能够满足不同规模和场景的需求。执行器的选择应综合考虑部署环境、团队能力、运维成本和扩展性要求。

对于新项目，建议先使用 LocalExecutor 进行开发和测试，待系统稳定后再根据实际负载和扩展需求迁移到 CeleryExecutor 或 KubernetesExecutor。在选择执行器时，务必参考官方文档中的具体版本要求，确保所有依赖组件正确配置。

资料来源：[airflow-core/src/airflow/executors/executor_loader.py:400-450]()

---

<a id='data-flow'></a>

## 数据流转与交换机制

### 相关页面

相关主题：[核心组件详解](#components), [FastAPI核心API](#fastapi)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [airflow-core/src/airflow/models/xcom.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/xcom.py)
- [airflow-core/src/airflow/models/asset.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/asset.py)
- [airflow-core/src/airflow/callbacks/callback_requests.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/callbacks/callback_requests.py)
- [airflow-core/docs/core-concepts/xcoms.rst](https://github.com/apache/airflow/blob/main/airflow-core/docs/core-concepts/xcoms.rst)
- [airflow-core/docs/authoring-and-scheduling/assets.rst](https://github.com/apache/airflow/blob/main/airflow-core/docs/authoring-and-scheduling/assets.rst)
</details>

# 数据流转与交换机制

## 概述

Apache Airflow 中的数据流转与交换机制是工作流编排的核心能力，允许任务之间传递数据、状态和事件。该机制主要由三大支柱组成：**XCom（跨任务通信）**、**Asset（资产）** 和 **Callback（回调）**。这些机制共同构成了 Airflow 工作流中数据流动的完整生态，使得分布式任务执行环境下的数据交换变得可靠且可追踪。

XCom 机制允许任务在执行完成后向元数据库写入值，后续任务可以检索这些值进行后续处理。Asset 机制提供了一种基于数据源状态变化触发 DAG 执行的优雅方式。Callback 机制则支持任务生命周期中的关键节点执行自定义逻辑，如任务失败时发送告警。

## XCom 机制详解

### XCom 核心概念

XCom（Cross-Communication）是 Airflow 中实现任务间数据交换的主要机制。它通过 `airflow.models.xcom.BaseXCom` 类实现，数据默认存储在 Airflow 元数据库中，支持任务执行器在分布式环境下共享数据。

```mermaid
graph TD
    A[任务 A 执行] -->|push| B[XCom 写入]
    B --> C[(元数据库)]
    C -->|pull| D[任务 B 读取]
    D --> E[任务 B 继续执行]
    
    F[参数传递] -.->|包含| B
    G[返回值自动推送] -.->|包含| B
```

XCom 支持推送和拉取两种操作模式。任务可以通过 `xcom_push()` 方法显式推送数据，也可以通过返回 Python 对象让 Airflow 自动推送。拉取操作使用 `xcom_pull()` 方法，可以按任务 ID 或 DAG run ID 筛选特定数据。

### XCom 数据模型

XCom 的数据模型定义在 `BaseXCom` 类中，包含以下核心字段：

| 字段名 | 类型 | 说明 |
|--------|------|------|
| key | String | XCom 值的键名，用于标识数据 |
| value | JSON | 序列化的数据值，支持基础类型和嵌套结构 |
| timestamp | DateTime | XCom 创建时间戳 |
| task_id | String | 推送该 XCom 的任务 ID |
| dag_id | String | 所属 DAG 的唯一标识符 |
| run_id | String | DAG Run 的执行 ID |
| map_index | Integer | Map 任务中的索引值（-1 表示非 Map 任务）|

资料来源：[airflow-core/src/airflow/models/xcom.py:1-100]()

### XCom API 与操作方法

#### 推送数据

```python
# 方式一：显式推送
task_instance.xcom_push(key="result", value={"status": "success", "data": [1, 2, 3]})

# 方式二：通过 return 语句自动推送
def extract_data(**context):
    return {"records": 100, "source": "api"}
```

#### 拉取数据

```python
# 按任务 ID 拉取最新值
result = task_instance.xcom_pull(task_ids=["transform_data"])[0]

# 按 task_ids 和 key 精确拉取
value = task_instance.xcom_pull(task_ids=["extract"], key="record_count")
```

#### 使用 XComArg 简化操作

`XComArg` 提供了一种声明式的数据获取方式，特别适合与 Jinja 模板结合使用：

```python
from airflow.models.baseoperator import xcom_arg

extract_task >> transform_task
```

### XCom 的序列化与反序列化

XCom 值在存储前会被序列化为 JSON 格式，检索时自动反序列化。支持的类型包括：

| Python 类型 | 序列化格式 | 限制说明 |
|-------------|-----------|----------|
| str | JSON String | 无限制 |
| int, float | JSON Number | 无限制 |
| bool | JSON Boolean | 无限制 |
| list, tuple | JSON Array | 元素类型需可序列化 |
| dict | JSON Object | 键值需可序列化 |
| datetime | ISO 8601 字符串 | 需正确处理时区 |
| bytes | Base64 编码 | 大小受 max_xcom_size 限制 |

资料来源：[airflow-core/src/airflow/models/xcom.py:150-200]()

### XCom 配置与性能优化

XCom 的行为可通过以下配置参数调整：

| 参数 | 默认值 | 说明 |
|------|--------|------|
| `max_xcom_size` | 48KB | 单个 XCom 值的最大大小 |
| `xcom_backend` | BaseXCom | 自定义 XCom 后端类 |
| `enable_xcom_pickling` | True | 是否允许 pickle 序列化（非 JSON） |

在高并发场景下，建议：
1. 避免传输大型数据集，将大数据存储在外部存储（如 S3、HDFS），XCom 只传递引用
2. 使用自定义 XCom 后端将数据存储到 GCS、Redis 等外部系统
3. 及时清理过期 XCom 数据以减少元数据库负担

## Asset 资产机制

### Asset 概述

Asset 是 Airflow 2.10+ 引入的新一代数据依赖管理机制，它代表 DAG 外部的数据源或数据目的地。通过 Asset，Airflow 可以感知数据源的变化并自动触发 DAG 执行，实现数据驱动的工作流编排。

```mermaid
graph LR
    A[数据源变化] -->|触发| B[Asset 事件]
    B --> C{DAG 调度}
    C -->|是| D[创建 DAG Run]
    C -->|否| E[等待下次检查]
    
    F[Asset 定义] -->|关联| G[DAG Schedule]
```

### Asset 数据模型

Asset 模型定义在 `airflow.models.asset` 模块中：

```python
class Asset:
    name: str
    uri: str
    group: str | None
    extra: dict | None
    created_at: datetime
    updated_at: datetime
```

资料来源：[airflow-core/src/airflow/models/asset.py:1-50]()

### Asset 与 DAG Schedule 绑定

DAG 可以通过 `schedule` 参数与一个或多个 Asset 关联：

```python
from airflow.models.asset import Asset

# 单个 Asset
daily_sales = Asset(uri="s3://data-bucket/daily-sales/", name="daily_sales")

with DAG(
    dag_id="process_sales",
    schedule=[daily_sales],  # 当 Asset 有新事件时触发
    ...
):
    process_sales_task()
```

### Asset Event 事件机制

每当外部数据源发生变化时，可以向 Airflow 报告 Asset Event：

```python
from airflow.sdk import AssetEvent

AssetEvent(
    asset=daily_sales,
    extra={"partition": "2024-01-15"},
    source_task_instance_id="report_ingestion",
)
```

### Asset 命令行操作

Airflow CLI 提供了完整的 Asset 管理命令：

| 命令 | 功能 |
|------|------|
| `airflow assets list` | 列出所有注册的 Asset |
| `airflow assets details` | 查看 Asset 详情 |
| `airflow assets materialize` | 手动触发 Asset 物化 |
| `airflow assets events` | 查看 Asset 事件历史 |

资料来源：[airflow-core/src/airflow/cli/cli_config.py:1-100]()

## Callback 回调机制

### Callback 机制概述

Callback 允许在 DAG 和任务生命周期中的关键节点执行自定义逻辑。这些节点包括任务失败、成功、重试以及 DAG 运行时错误等场景。Callback 通过 `airflow.callbacks.callback_requests` 模块中定义的数据结构传递给执行器。

```mermaid
graph TD
    A[任务状态变更] -->|生成| B[CallbackRequest]
    B --> C[(Callback 队列)]
    C -->|处理| D[Callback Executor]
    D --> E[执行用户逻辑]
    
    F[on_failure_callback] -.-> E
    G[on_success_callback] -.-> E
    H[on_retry_callback] -.-> E
```

### Callback Request 数据结构

Callback 请求通过以下类定义：

| 类名 | 用途 | 触发时机 |
|------|------|----------|
| `TaskCallbackRequest` | 任务级回调 | 任务成功/失败/重试 |
| `DagCallbackRequest` | DAG 级回调 | DAG 运行失败/成功 |
| `SlaCallbackRequest` | SLA 超时回调 | 任务超出 SLA 阈值 |
| `DagRunCallbackRequest` | DAG Run 回调 | DAG Run 状态变更 |

资料来源：[airflow-core/src/airflow/callbacks/callback_requests.py:1-80]()

### TaskCallbackRequest 详解

```python
class TaskCallbackRequest(CallbackRequest):
    simple_task_instance: SimpleTaskInstance
    msg: str | None
    processor_subprocess: bool
```

| 字段 | 类型 | 说明 |
|------|------|------|
| simple_task_instance | SimpleTaskInstance | 任务实例的简化表示 |
| msg | str | 回调消息（如错误信息）|
| processor_subprocess | bool | 是否在子进程中处理 |

### Callback 注册方式

#### 方式一：任务级别

```python
def task_failure_alert(context):
    dag_id = context['dag_run'].dag_id
    task_id = context['task'].task_id
    send_alert(dag_id, task_id)

with DAG(...) as dag:
    task = PythonOperator(
        task_id="example_task",
        python_callable=my_func,
        on_failure_callback=task_failure_alert,
    )
```

#### 方式二：DAG 级别

```python
def dag_failure_alert(context):
    dag_id = context['dag_run'].dag_id
    send_dag_failure_notification(dag_id)

with DAG(
    dag_id="my_dag",
    on_failure_callback=dag_failure_alert,
):
    # tasks...
```

#### 方式三：使用 Airflow API 动态注册

```python
from airflow.models import TaskCallbackRequest
from airflow.callbacks.callback_requests import _prepare_callback

# 在执行器中动态注册
callback_request = _prepare_callback(
    ti=task_instance,
    callback_type="failure",
    msg="Task failed with exception",
)
```

## 数据流转最佳实践

### 避免数据膨胀

XCom 数据存储在元数据库中，过大的数据量会影响数据库性能。建议遵循以下原则：

1. **小数据原则**：XCom 值应保持在 10KB 以下
2. **引用传递**：大型数据使用 URI/路径引用
3. **定期清理**：配置 XCom 过期策略

### 数据验证与类型安全

在任务间传递数据时，应实施严格的验证策略：

```python
from pydantic import BaseModel, validator

class DataRecord(BaseModel):
    id: int
    value: float
    tags: list[str]
    
    @validator('value')
    def value_must_be_positive(cls, v):
        if v < 0:
            raise ValueError('value must be positive')
        return v

def process_data(**context):
    raw_data = context['ti'].xcom_pull(task_ids='extract', key='data')
    validated = DataRecord(**raw_data)
    return validated.dict()
```

### 幂等性设计

任务应设计为幂等的，即相同输入多次执行产生相同结果。这在使用 XCom 进行数据传递时尤为重要，因为任务可能因重试而多次执行。

### Asset 与 XCom 的选择

| 场景 | 推荐机制 | 原因 |
|------|----------|------|
| 触发 DAG 执行 | Asset | 自动调度，感知数据源变化 |
| 任务间传递结果 | XCom | 轻量，适合小数据量 |
| 共享大型数据集 | 外部存储 + XCom 引用 | 避免元数据库压力 |
| 事件驱动工作流 | Asset + TriggerDagRunOperator | 结合使用 |

## 总结

Apache Airflow 的数据流转与交换机制通过 XCom、Asset 和 Callback 三大组件，为分布式工作流环境提供了完整的数据协调能力。XCom 专注于任务间的数据传递，Asset 实现了数据驱动的 DAG 触发，而 Callback 则在关键生命周期节点提供了扩展点。理解并合理运用这些机制，能够构建出高效、可靠且可维护的数据流水线。

资料来源：[airflow-core/docs/core-concepts/xcoms.rst:1-50]()
资料来源：[airflow-core/docs/authoring-and-scheduling/assets.rst:1-50]()

---

<a id='fastapi'></a>

## FastAPI核心API

### 相关页面

相关主题：[React前端架构](#frontend)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [airflow-core/src/airflow/api_fastapi/core_api/app.py](https://github.com/apache/apache-airflow/blob/main/airflow-core/src/airflow/api_fastapi/core_api/app.py)
- [airflow-core/src/airflow/api_fastapi/execution_api/app.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/api_fastapi/execution_api/app.py)
- [airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py)
- [airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py](https://github.com/apache/apache-airflow/blob/main/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py)
- [airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py)
- [airflow-core/src/airflow/cli/cli_config.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/cli/cli_config.py)
</details>

# FastAPI核心API

## 概述

Apache Airflow的FastAPI核心API是项目现代化Web服务架构的重要组成部分，旨在替代传统的Flask API。这一API层采用FastAPI框架构建，提供了高性能、异步支持、自动文档生成等特性。FastAPI核心API主要分为两个独立的应用程序：**Core API**（核心API）和**Execution API**（执行API），分别承担不同的职责。

FastAPI核心API位于 `airflow-core/src/airflow/api_fastapi/` 目录下，采用模块化设计，将路由、数据模型、认证管理器等组件分离到独立的子模块中。这种架构设计使得代码组织清晰，便于维护和扩展。

## 架构概览

```mermaid
graph TB
    subgraph "FastAPI应用层"
        CA[Core API<br/>core_api/app.py]
        EA[Execution API<br/>execution_api/app.py]
    end
    
    subgraph "认证层"
        SAM[SimpleAuthManager<br/>simple_auth_manager.py]
        AM[Auth Managers]
    end
    
    subgraph "路由层"
        UI_DAGS[UI DAG Routes<br/>routes/ui/dags.py]
        API_ROUTES[API Routes]
    end
    
    subgraph "数据模型层"
        DM[DAG Run Datamodels<br/>datamodels/dag_run.py]
        DT[其他Datamodels]
    end
    
    CA --> SAM
    EA --> SAM
    CA --> UI_DAGS
    CA --> API_ROUTES
    CA --> DM
    EA --> DM
```

## 核心组件

### Core API应用

Core API是Airflow的主要REST API服务，负责提供DAG管理、任务调度、监控等核心功能。该应用通过 `core_api/app.py` 文件初始化和配置，包含以下关键特性：

- 异步请求处理支持
- OpenAPI自动文档生成
- Pydantic数据验证
- 依赖注入系统
- 中间件支持

Core API的路由结构按功能模块划分，包括UI路由、作业路由、配置路由等多个子模块。这种模块化设计使得API扩展和维护更加便捷。

### Execution API应用

Execution API是专门用于任务执行的轻量级API服务，通过 `execution_api/app.py` 实现。该API主要负责：

- 任务实例的执行状态管理
- XCom消息传递
- 任务心跳检测
- 执行结果回调

Execution API采用独立部署模式，与Core API解耦，以提高系统的可靠性和响应速度。

## 数据模型

### DAG Run数据模型

DAG Run数据模型定义了DAG运行实例的结构，是整个调度系统的核心数据结构之一。在 `datamodels/dag_run.py` 中定义的主要字段包括：

| 字段名 | 类型 | 说明 | 资料来源 |
|--------|------|------|----------|
| dag_id | str | DAG唯一标识符 | [dag_run.py](airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py) |
| run_id | str | 运行实例唯一标识 | [dag_run.py](airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py) |
| state | DagRunState | 运行状态枚举 | [dag_run.py](airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py) |
| execution_date | datetime | 执行时间戳 | [dag_run.py](airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py) |
| start_date | datetime | 开始时间 | [dag_run.py](airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py) |
| end_date | datetime | 结束时间 | [dag_run.py](airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py) |
| conf | dict | DAG配置参数 | [dag_run.py](airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py) |

### 数据验证

所有通过API接收的数据都会经过Pydantic模型验证，确保数据类型和格式的正确性。这种自动验证机制减少了手动编写数据校验代码的需求，提高了开发效率。

## 路由结构

### UI DAG路由

UI DAG路由是面向Web界面的API端点集合，通过 `routes/ui/dags.py` 文件实现。这些路由主要用于：

- DAG列表查询
- DAG运行历史记录
- DAG状态监控
- 任务依赖关系可视化

UI路由通常需要用户认证，并返回适合前端渲染的数据格式。

### 路由组织方式

```mermaid
graph LR
    subgraph "路由注册"
        RC[Route Configuration<br/>cli_config.py]
    end
    
    subgraph "CLI命令"
        CMD1[airflow dags]
        CMD2[airflow tasks]
        CMD3[airflow connections]
    end
    
    RC --> CMD1
    RC --> CMD2
    RC --> CMD3
    
    CMD1 --> DAG_ROUTES[DAG Routes]
    CMD2 --> TASK_ROUTES[Task Routes]
    CMD3 --> CONN_ROUTES[Connection Routes]
```

在 `cli_config.py` 中定义了Airflow CLI命令与API路由的映射关系，支持的命令包括：

| 命令组 | 子命令 | 功能说明 |
|--------|--------|----------|
| dags | backfill, list-runs, pause, unpause, test | DAG生命周期管理 |
| tasks | list, run, state, clear | 任务操作管理 |
| connections | list, add, delete, edit | 连接配置管理 |
| providers | list, get | Provider信息查询 |

## 认证与授权

### SimpleAuthManager

SimpleAuthManager是FastAPI认证系统的核心组件，位于 `auth/managers/simple/simple_auth_manager.py`。它提供了简化的认证机制，适用于单机部署或开发环境。

主要功能包括：

- 用户身份验证
- 会话管理
- 权限检查
- API密钥验证

### 认证管理器架构

```mermaid
classDiagram
    class BaseAuthManager {
        <<abstract>>
        +authenticate()
        +get_user()
        +is_authorized()
    }
    
    class SimpleAuthManager {
        +authenticate()
        +get_user()
        +is_authorized()
        +get_user_role()
    }
    
    BaseAuthManager <|-- SimpleAuthManager
```

### 认证流程

认证请求的标准处理流程如下：

1. 客户端发送带认证信息的请求
2. 中间件拦截请求并提取认证凭证
3. SimpleAuthManager验证凭证有效性
4. 根据用户角色确定访问权限
5. 返回认证结果或拒绝访问

## CLI集成

### 命令注册机制

在 `cli_config.py` 中使用懒加载模式注册CLI命令，这种方式可以提高应用启动速度：

```python
ActionCommand(
    name="info",
    help="Show information about current Airflow and environment",
    func=lazy_load_command("airflow.cli.commands.info_command.show_info"),
    args=(ARG_ANONYMIZE, ARG_FILE_IO, ARG_VERBOSE, ARG_OUTPUT),
)
```

### 懒加载命令

懒加载机制确保只有在实际调用命令时才加载对应的模块，这有助于：

- 减少内存占用
- 加快应用启动时间
- 避免不必要的模块依赖

支持的懒加载命令类型包括：

- `info_command` - 环境信息展示
- `plugins_command` - 插件信息导出
- `standalone_command` - 独立运行模式
- `config_command` - 配置管理
- `version_command` - 版本信息
- `cheat_sheet_command` - 命令速查表

## 配置管理

### 配置命令

FastAPI核心API提供了丰富的配置管理功能，包括：

| 命令 | 功能 | 说明 |
|------|------|------|
| config get-value | 获取配置值 | 打印指定配置项的值 |
| config list | 列出配置 | 显示所有配置选项 |
| config lint | 配置检查 | 验证配置迁移正确性 |
| config update | 更新配置 | 修改配置值 |

### 配置参数

配置API支持多种参数用于过滤和格式化输出：

- `--section` - 指定配置章节
- `--option` - 指定配置选项
- `--include-descriptions` - 包含描述信息
- `--include-examples` - 包含示例值
- `--include-sources` - 显示配置来源
- `--include-env-vars` - 显示环境变量
- `--hide-sensitive` - 隐藏敏感信息

## API响应格式

### 统一响应结构

FastAPI核心API采用统一的响应格式，便于客户端处理：

```json
{
  "data": {},
  "meta": {
    "status": "success",
    "timestamp": "2024-01-01T00:00:00Z"
  }
}
```

### 错误响应

错误响应遵循统一的格式，包含错误码、消息和详细信息：

```json
{
  "error": {
    "code": "DAG_NOT_FOUND",
    "message": "DAG with id 'example_dag' not found",
    "details": {}
  }
}
```

## 技术特性

### 异步支持

FastAPI核心API全面支持异步操作，包括：

- 异步数据库查询
- 异步HTTP请求
- 异步文件操作
- 并发任务处理

### 自动文档

基于OpenAPI标准，API自动生成交互式文档：

- Swagger UI 可通过 `/docs` 访问
- ReDoc 可通过 `/redoc` 访问
- OpenAPI JSON 可通过 `/openapi.json` 获取

### 数据验证

使用Pydantic v2进行严格的数据验证和转换：

- 类型检查
- 默认值处理
- 自定义验证器
- 嵌套模型支持

## 部署架构

### 多实例部署

在生产环境中，FastAPI核心API支持多实例部署：

```mermaid
graph TB
    LB[Load Balancer]
    API1[Core API Instance 1]
    API2[Core API Instance 2]
    API3[Core API Instance 3]
    DB[(Database)]
    Redis[(Redis)]
    
    LB --> API1
    LB --> API2
    LB --> API3
    API1 --> DB
    API2 --> DB
    API3 --> DB
    API1 --> Redis
    API2 --> Redis
    API3 --> Redis
```

### 与传统Flask API对比

| 特性 | FastAPI | Flask |
|------|---------|-------|
| 性能 | 异步原生支持 | 同步为主 |
| 文档 | 自动生成OpenAPI | 需手动编写 |
| 验证 | Pydantic自动验证 | 手动验证 |
| 类型安全 | 完整类型注解 | 运行时检查 |
| 并发 | 原生异步支持 | 需扩展支持 |

## 安全考虑

### 敏感信息保护

API在处理敏感信息时采取以下保护措施：

- 敏感配置加密存储
- 日志输出脱敏处理
- 环境变量优先读取
- 连接凭证安全传输

### 权限控制

基于角色的访问控制（RBAC）确保API操作的安全性：

- 管理员权限 - 完全访问
- 编辑权限 - 修改配置
- 只读权限 - 查询数据
- 无权限 - 拒绝访问

## 扩展开发

### 自定义路由

开发者可以通过以下步骤添加自定义路由：

1. 在相应模块下创建路由文件
2. 定义路由函数和参数
3. 注册路由到应用
4. 添加CLI命令映射

### 自定义认证

扩展认证机制需要实现以下接口：

- `authenticate()` - 用户认证方法
- `get_user()` - 获取用户信息
- `is_authorized()` - 权限检查方法

## 最佳实践

### API设计原则

1. **RESTful规范** - 遵循REST设计原则
2. **版本控制** - 保持API向后兼容
3. **错误处理** - 返回有意义的错误信息
4. **性能优化** - 使用异步操作提高吞吐量
5. **文档维护** - 及时更新API文档

### 性能优化建议

- 使用连接池管理数据库连接
- 合理使用缓存减少数据库查询
- 实现请求限流防止滥用
- 监控API响应时间及时发现问题

## 相关文档

- [Core API完整路由参考](airflow-core/src/airflow/api_fastapi/core_api/routes)
- [Execution API参考](airflow-core/src/airflow/api_fastapi/execution_api)
- [认证管理器文档](airflow-core/src/airflow/api_fastapi/auth/managers)
- [数据模型定义](airflow-core/src/airflow/api_fastapi/core_api/datamodels)

---

<a id='frontend'></a>

## React前端架构

### 相关页面

相关主题：[FastAPI核心API](#fastapi)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [airflow-core/src/airflow/ui/src/pages/DagRuns.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx)
- [airflow-core/src/airflow/ui/src/pages/Providers.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/pages/Providers.tsx)
- [airflow-core/src/airflow/ui/src/pages/DagsList/DagsList.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/pages/DagsList/DagsList.tsx)
- [airflow-core/src/airflow/ui/src/pages/Run/Header.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/pages/Run/Header.tsx)
- [airflow-core/src/airflow/ui/src/pages/XCom/XCom.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/pages/XCom/XCom.tsx)
- [airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.tsx)
- [airflow-core/src/airflow/ui/src/components/Assets/TriggeredRuns.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/components/Assets/TriggeredRuns.tsx)
- [airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceConfirmationDialog.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceConfirmationDialog.tsx)
- [airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx)
- [airflow-core/src/airflow/ui/src/components/Dashboard/Stats/DAGImportErrorsModal.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/components/Dashboard/Stats/DAGImportErrorsModal.tsx)
- [airflow-core/src/airflow/ui/src/components/TaskInstanceTooltip.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/components/TaskInstanceTooltip.tsx)
- [airflow-core/src/airflow/ui/src/components/ui/Tag.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/components/ui/Tag.tsx)
- [dev/react-plugin-tools/react_plugin_template/README.md](https://github.com/apache/airflow/blob/main/dev/react-plugin-tools/react_plugin_template/README.md)
</details>

# React前端架构

## 概述

Apache Airflow 的 React 前端是一个现代化的单页应用（SPA），采用 React 18、TypeScript 和 Chakra UI 构建，为用户提供了直观、高效的 DAG 管理和监控界面。该前端架构遵循组件化设计原则，通过 TanStack Table 实现数据表格功能，使用 react-router-dom 进行路由管理，并集成了 react-i18next 实现国际化支持。资料来源：[dev/react-plugin-tools/react_plugin_template/README.md](https://github.com/apache/airflow/blob/main/dev/react-plugin-tools/react_plugin_template/README.md)

## 技术栈概览

### 核心依赖

| 技术 | 用途 | 版本说明 |
|------|------|----------|
| React 18 | UI 框架 | 核心库 |
| TypeScript | 类型系统 | 类型安全 |
| Chakra UI | UI 组件库 | 样式和组件 |
| TanStack Table | 数据表格 | 排序、分页 |
| react-router-dom | 路由管理 | SPA 导航 |
| react-i18next | 国际化 | 多语言支持 |

### 构建工具

前端项目使用 **Vite** 作为构建工具，提供快速的开发服务器和优化的生产构建。Vite 配置将 React 和相关生态库标记为外部依赖，以减少插件包体积。资料来源：[dev/react-plugin-tools/react_plugin_template/README.md](https://github.com/apache/airflow/blob/main/dev/react-plugin-tools/react_plugin_template/README.md)

## 页面组件架构

### 页面目录结构

Airflow 前端的页面组件位于 `airflow-core/src/airflow/ui/src/pages/` 目录下，采用模块化组织方式：

```
pages/
├── DagRuns.tsx          # DAG运行列表页
├── DagsList/            # DAG列表模块
│   └── DagsList.tsx
├── Providers.tsx        # 提供商信息页
├── Run/                 # 运行详情模块
│   └── Header.tsx
├── XCom/                # XCom数据模块
│   └── XCom.tsx
└── HITLTaskInstances/   # 人工干预任务实例模块
    └── HITLTaskInstances.tsx
```

### 页面组件通用模式

每个页面组件通常遵循以下结构模式：

1. **导入依赖**：引入必要的 React hooks、组件和工具函数
2. **类型定义**：定义页面专用的 TypeScript 接口
3. **列定义工厂函数**：创建 `createColumns` 函数用于生成 TanStack Table 列配置
4. **主组件导出**：使用自定义 hooks 获取数据并渲染页面

```typescript
const createColumns = (translate: TFunction): Array<ColumnDef<ProviderResponse>> => [
  {
    accessorKey: "package_name",
    cell: ({ row: { original } }) => (
      <Link ...>
        {original.package_name}
      </Link>
    ),
    enableSorting: false,
    header: translate("providers.columns.packageName"),
  },
  // ... 更多列定义
];
```

资料来源：[airflow-core/src/airflow/ui/src/pages/Providers.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/pages/Providers.tsx)

## 核心组件体系

### UI 基础组件

基础 UI 组件位于 `airflow-core/src/airflow/ui/src/components/ui/` 目录下，提供可复用的界面元素。

#### Tag 组件

`Tag` 组件是对 Chakra UI Tag 的封装，提供了统一的标签样式：

```typescript
const Tag = React.forwardRef<HTMLSpanElement, TagProps>(({ 
  closable = Boolean(onClose), 
  endElement, 
  startElement, 
  ...rest 
}) => {
  return (
    <ChakraTag.Root ref={ref} {...rest}>
      {Boolean(startElement) ? <ChakraTag.StartElement>{startElement}</ChakraTag.StartElement> : undefined}
      <ChakraTag.Label>{children}</ChakraTag.Label>
      {Boolean(endElement) ? <ChakraTag.EndElement>{endElement}</ChakraTag.EndElement> : undefined}
      {Boolean(closable) ? (
        <ChakraTag.EndElement>
          <ChakraTag.CloseTrigger onClick={onClose} />
        </ChakraTag.EndElement>
      ) : undefined}
    </ChakraTag.Root>
  );
});
```

| 属性 | 类型 | 说明 |
|------|------|------|
| closable | boolean | 是否显示关闭按钮 |
| endElement | ReactNode | 标签尾部元素 |
| startElement | ReactNode | 标签头部元素 |
| onClose | () => void | 关闭回调函数 |

资料来源：[airflow-core/src/airflow/ui/src/components/ui/Tag.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/components/ui/Tag.tsx)

### 数据展示组件

#### DAG 触发运行组件 (TriggeredRuns)

`TriggeredRuns` 组件用于展示由 DAG 触发的运行信息，支持单个和多个运行场景：

```typescript
export const TriggeredRuns = ({ dagRuns }: Props) => {
  if (dagRuns === undefined || dagRuns.length === 0) {
    return undefined;
  }

  return dagRuns.length === 1 ? (
    <Flex gap={1}>
      <Text>{`${translate("triggered")} ${translate("dagRun_one")}: `}</Text>
      <StateBadge state={dagRuns[0]?.state as DagRunState} />
      <Link asChild color="fg.info">
        <RouterLink to={`/dags/${dagRuns[0]?.dag_id}/runs/${dagRuns[0]?.run_id}`}>
          {dagRuns[0]?.dag_id}
        </RouterLink>
      </Link>
    </Flex>
  ) : (
    <Popover.Root autoFocus={false} lazyMount unmountOnExit>
      <Popover.Trigger asChild>
        <Button size="sm" variant="outline">
          {`${dagRuns.length} ${translate("triggered")} ...`}
        </Button>
      </Popover.Trigger>
      <Popover.Content ...>
        {/* 多个运行时显示下拉列表 */}
      </Popover.Content>
    </Popover.Root>
  );
};
```

| 场景 | 渲染方式 |
|------|----------|
| 单个运行 | 内联展示 DAG ID 和状态徽章 |
| 多个运行 | 下拉菜单，点击展开列表 |

资料来源：[airflow-core/src/airflow/ui/src/components/Assets/TriggeredRuns.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/components/Assets/TriggeredRuns.tsx)

### 任务实例工具提示组件

`TaskInstanceTooltip` 组件提供任务实例的详细信息悬浮提示：

```typescript
<TaskInstanceTooltip
  state={taskInstance.state}
  taskId={taskInstance.task_id}
  runId={runId}
  queuedWhen={taskInstance.queued_when}
  scheduledWhen={taskInstance.scheduled_when}
  startDate={taskInstance.start_date}
  endDate={taskInstance.end_date}
  tryNumber={taskInstance.try_number}
/>
```

显示的信息包括：任务 ID、状态、运行 ID、调度时间、队列时间、开始时间、结束时间以及重试次数。资料来源：[airflow-core/src/airflow/ui/src/components/TaskInstanceTooltip.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/components/TaskInstanceTooltip.tsx)

### 对话框组件

#### 清除任务实例确认对话框

`ClearTaskInstanceConfirmationDialog` 是用于确认清除任务实例操作的模态对话框：

```typescript
<Dialog.Root open={isOpen} onOpenChange={onClose}>
  <Dialog.Content>
    <Dialog.Header>
      <Dialog.Title>
        <Icon color="tomato" pr="2" size="lg">
          <GoAlertFill />
        </Icon>
        {translate("dags:runAndTaskActions.confirmationDialog.title")}
      </Dialog.Title>
      <Dialog.Description>
        {translate("dags:runAndTaskActions.confirmationDialog.description", {
          state: taskCurrentState,
          time: getRelativeTime(firstInstance.start_date),
          user: firstInstance?.unixname ?? "unknown user",
        })}
      </Dialog.Description>
    </Dialog.Header>
    <Dialog.Footer>
      <Button colorPalette="blue" onClick={onClose}>
        {translate("common:modal.confirm")}
      </Button>
    </Dialog.Footer>
  </Dialog.Content>
</Dialog.Root>
```

对话框使用 Radix UI 的 Dialog 组件，提供标题、描述和底部操作区域，支持国际化翻译。资料来源：[airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceConfirmationDialog.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceConfirmationDialog.tsx)

#### 批量清除任务实例对话框

`ClearGroupTaskInstanceDialog` 用于批量清除一组任务实例，支持以下参数：

| 参数 | 类型 | 说明 |
|------|------|------|
| future | boolean | 是否包含未来运行 |
| past | boolean | 是否包含过去运行 |
| upstream | boolean | 是否包含上游任务 |
| onlyFailed | boolean | 仅清除失败任务 |
| runOnLatestVersion | boolean | 使用最新 DAG 版本运行 |
| note | string \| null | 清除操作备注 |
| groupTaskIds | string[] | 任务 ID 列表 |

资料来源：[airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx)

### 模态框组件

#### DAG 导入错误模态框

`DAGImportErrorsModal` 展示 DAG 导入过程中的错误信息：

- 使用 Accordion 组件折叠/展开每个错误详情
- 使用 Pagination 组件分页显示错误列表
- 显示错误时间戳和完整的堆栈跟踪信息

```typescript
<Accordion.Root ...>
  {importErrors.map((importError) => (
    <Accordion.Item key={importError.timestamp} value={importError.timestamp}>
      <Accordion.ItemTrigger>
        <Text>{importError.filename}</Text>
      </Accordion.ItemTrigger>
      <Accordion.ItemContent>
        <Text color="fg.error" fontSize="sm" whiteSpace="pre-wrap">
          <code>{importError.stack_trace}</code>
        </Text>
      </Accordion.ItemContent>
    </Accordion.Item>
  ))}
</Accordion.Root>
```

资料来源：[airflow-core/src/airflow/ui/src/components/Dashboard/Stats/DAGImportErrorsModal.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/components/Dashboard/Stats/DAGImportErrorsModal.tsx)

## 表格列配置系统

### TanStack Table 集成

Airflow 前端使用 TanStack Table（React Table）作为数据表格解决方案，通过列定义工厂函数生成可配置的表格结构。

### DAG 运行列表列配置

`DagRuns.tsx` 中的列配置示例：

| 访问器键 | 组件 | 功能 |
|----------|------|------|
| run_after | Time | 显示运行时间 |
| state | StateBadge | 任务状态徽章 |
| run_type | RunTypeIcon + Text | 运行类型图标和文字 |
| triggering_user_name | Text | 触发用户名称 |
| start_date | Time | 开始时间 |
| end_date | Time | 结束时间 |
| duration | renderDuration | 持续时间渲染 |

```typescript
{
  accessorKey: "state",
  cell: ({ row: { original: { state } } }) => (
    <StateBadge state={state}>
      {translate(`common:states.${state}`)}
    </StateBadge>
  ),
  header: () => translate("state"),
},
```

资料来源：[airflow-core/src/airflow/ui/src/pages/DagRuns.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx)

### DAG 列表特殊列

`DagsList.tsx` 包含一些独特的列定义：

```typescript
{
  accessorKey: "pending_actions",
  cell: ({ row: { original: dag } }) => (
    <NeedsReviewBadge dagId={dag.dag_id} pendingActions={dag.pending_actions} />
  ),
  enableSorting: false,
  header: "",
},
{
  accessorKey: "trigger",
  cell: ({ row: { original } }) => (
    <TriggerDAGButton
      allowedRunTypes={original.allowed_run_types}
      dagDisplayName={original.dag_display_name}
      dagId={original.dag_id}
      isPaused={original.is_paused}
    />
  ),
  enableSorting: false,
  header: "",
},
```

这些列用于显示待处理操作和 DAG 触发按钮，没有排序功能。资料来源：[airflow-core/src/airflow/ui/src/pages/DagsList/DagsList.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/pages/DagsList/DagsList.tsx)

## 国际化实现

### 翻译函数使用

前端使用 `react-i18next` 的 `useTranslation` hook 获取翻译函数：

```typescript
const { t: translate } = useTranslation("common");

// 在列配置中使用
header: () => translate("dagRun.runAfter")

// 动态键翻译
{translate(`common:states.${state}`)}
```

### 命名空间管理

翻译键使用冒号分隔命名空间，例如 `common:states.running`、`dagRun.runAfter` 等，允许多个翻译文件独立管理。资料来源：[airflow-core/src/airflow/ui/src/pages/DagRuns.tsx](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx)

## 路由与导航

### 路由模式

Airflow 前端使用 `react-router-dom` 进行客户端路由，主要导航模式包括：

```typescript
// 链接到 DAG 详情
<RouterLink to={`/dags/${dagId}/runs/${runId}`}>

// 链接到任务实例
<RouterLink to={getTaskInstanceLink({
  dagId: original.dag_id,
  dagRunId: original.run_id,
  mapIndex: original.map_index,
  taskId: original.task_id,
})}>
```

### 嵌套路由结构

页面通过嵌套路由组织，支持从 DAG 列表 → DAG 详情 → 任务实例的层级导航。

## React 插件系统

### 插件模板架构

Airflow 提供 React 插件模板，支持第三方开发者构建可集成到主应用的组件库：

```typescript
import { PluginComponent } from 'your-plugin-name';

<PluginComponent />
```

### 插件开发指南

| 步骤 | 说明 |
|------|------|
| 组件开发 | 使用 React + TypeScript 编写功能组件 |
| 主题适配 | 自动继承 Airflow 主应用主题 |
| 构建配置 | 使用 Vite 构建为库文件 |
| 外部依赖 | React 生态库标记为外部以避免冲突 |

资料来源：[dev/react-plugin-tools/react_plugin_template/README.md](https://github.com/apache/airflow/blob/main/dev/react-plugin-tools/react_plugin_template/README.md)

## 数据流架构

```mermaid
graph TD
    A[用户交互] --> B[React 组件]
    B --> C[API 请求]
    C --> D[Airflow REST API]
    D --> E[后端数据库]
    E --> D
    D --> C
    C --> F[状态更新]
    F --> B
    B --> G[UI 渲染]
    
    H[TanStack Table] --> I[列定义配置]
    I --> B
    B --> J[数据展示]
    
    K[useTranslation] --> L[翻译键]
    L --> M[翻译文件]
    M --> K
```

## 组件生命周期管理

### 状态管理

组件使用 React hooks 管理状态：

- `useState`：本地组件状态
- `useEffect`：副作用处理
- `useTranslation`：国际化
- `useParams`：URL 参数获取
- `useTableURLState`：表格 URL 状态同步

### 条件渲染模式

```typescript
// 使用短路求值
{Boolean(dagRuns.length) && <Component />}

// 使用三元运算符
{condition ? <TrueComponent /> : <FalseComponent />}

// 使用 && 运算符
{hasCondition && <Component />}
```

## 样式系统

### Chakra UI 集成

前端使用 Chakra UI 作为主要样式解决方案，提供：

- 预设的设计令牌（颜色、间距、字体）
- 可访问性内置支持
- 主题定制能力
- 组件组合模式

### 颜色语义

| 语义颜色 | 用途 | 示例 |
|----------|------|------|
| fg.info | 信息文本 | 链接 |
| fg.error | 错误文本 | 错误消息 |
| bg.emphasized | 强调背景 | 弹窗背景 |

## 最佳实践

### 组件开发规范

1. **类型安全**：所有组件使用 TypeScript，明确输入输出类型
2. **可访问性**：使用语义化 HTML 和 ARIA 属性
3. **国际化**：所有用户可见文本使用翻译函数
4. **错误处理**：组件包含错误边界和降级处理
5. **性能优化**：使用 `lazyMount` 和 `unmountOnExit` 延迟加载

### 文件组织原则

- **页面组件**：位于 `pages/` 目录，包含业务逻辑
- **通用组件**：位于 `components/` 目录，可复用
- **UI 组件**：位于 `components/ui/` 目录，基础元素
- **布局组件**：位于 `layouts/` 目录，页面结构

## 总结

Apache Airflow 的 React 前端采用了现代化的组件化架构，通过 TanStack Table 实现灵活的数据表格功能，借助 Chakra UI 提供一致的用户界面体验，使用 react-i18next 支持多语言环境。插件系统的设计允许第三方开发者扩展功能，同时保持与主应用的主题一致性。整个前端架构体现了可维护性、可扩展性和用户体验的平衡。

---

<a id='deployment'></a>

## 容器化与Kubernetes部署

### 相关页面

相关主题：[执行器类型与选择](#executors), [快速开始与安装指南](#quickstart)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py](https://github.com/apache/airflow/blob/main/dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py)
- [dev/breeze/src/airflow_breeze/global_constants.py](https://github.com/apache/airflow/blob/main/dev/breeze/src/airflow_breeze/global_constants.py)
- [airflow-core/src/airflow/example_dags/example_kubernetes_executor.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/example_dags/example_kubernetes_executor.py)
- [docker-stack-docs/README.md](https://github.com/apache/airflow/blob/main/docker-stack-docs/README.md)
- [chart/docs/production-guide.rst](https://github.com/apache/airflow/blob/main/chart/docs/production-guide.rst)
</details>

# 容器化与Kubernetes部署

Apache Airflow 作为分布式工作流编排平台，支持通过容器化和 Kubernetes 实现灵活的部署方案。本文详细介绍 Airflow 的 Docker 镜像构建机制、Kubernetes 部署架构以及生产环境最佳实践。

## 1. Docker 容器化概述

Apache Airflow 提供官方的 `apache/airflow` Docker 镜像，该镜像是构建生产级部署环境的基础。镜像采用多阶段构建技术，基于 Python slim 镜像优化体积。

### 1.1 Docker 构建配置

Airflow 的 Dockerfile 使用 Docker BuildKit 语法，支持通过 `--mount=type=cache` 挂载缓存目录以加速构建过程：

```dockerfile
# syntax=docker/dockerfile:1.4
FROM python:{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}-slim-{ALLOWED_DEBIAN_VERSIONS[0]}
RUN apt-get update && apt-get install -y --no-install-recommends libatomic1 git curl
RUN pip install uv=={UV_VERSION}
RUN --mount=type=cache,id=cache-airflow-build-dockerfile-installation,target=/root/.cache/ \
  uv pip install --system ignore pip=={AIRFLOW_PIP_VERSION} hatch=={HATCH_VERSION} \
  pyyaml=={PYYAML_VERSION} gitpython=={GITPYTHON_VERSION} rich==
```

资料来源：[dev/breeze/src/airflow_breeze/commands/release_management_commands.py:52-58]()

### 1.2 入口点与初始化

Docker 镜像的入口点脚本负责初始化 SQLite 数据库（当未配置外部数据库时）：

> 如果未设置 `AIRFLOW__DATABASE__SQL_ALCHEMY_CONN` 变量，则在 `${AIRFLOW_HOME}/airflow.db` 创建 SQLite 数据库。

资料来源：[docker-stack-docs/README.md:15-16]()

## 2. Kubernetes 部署架构

Airflow 在 Kubernetes 上的部署通过 Helm Chart 实现，支持高度可配置的分布式架构。

### 2.1 支持的 Kubernetes 版本

Apache Airflow 根据三大云服务商的 Kubernetes 支持周期维护兼容版本列表：

| 云服务商 | 生命周期页面 |
|---------|-------------|
| Amazon EKS | [endoflife.date/amazon-eks](https://endoflife.date/amazon-eks) |
| Azure Kubernetes Service | [endoflife.date/azure-kubernetes-service](https://endoflife.date/azure-kubernetes-service) |
| Google Kubernetes Engine | [endoflife.date/google-kubernetes-engine](https://endoflife.date/google-kubernetes-engine) |

当前允许的 Kubernetes 版本列表：

| 版本 | 状态 |
|-----|------|
| v1.30.13 | ✅ 支持 |
| v1.31.12 | ✅ 支持 |
| v1.32.8 | ✅ 支持 |
| v1.33.4 | ✅ 支持 |
| v1.34.0 | ✅ 支持 |

资料来源：[dev/breeze/src/airflow_breeze/global_constants.py:ALLOWED_KUBERNETES_VERSIONS]()

### 2.2 Kubernetes 组件架构

Airflow 在 Kubernetes 环境中的核心组件包括：

```mermaid
graph TD
    subgraph "Kubernetes 集群"
        subgraph "airflow Namespace"
            Scheduler["Scheduler Pod<br/>调度器"]
            Webserver["Webserver Pod<br/>Web服务器"]
            Triggerer["Triggerer Pod<br/>触发器"]
            DagProcessor["Dag Processor Pod<br/>DAG处理器"]
        end
        
        subgraph "Pod 配置"
            Executor["Executor<br/>执行器"]
            Worker["Worker Pod<br/>工作节点"]
            K8SExecutor["K8S Executor<br/>Kubernetes执行器"]
        end
    end
    
    ExternalDB["外部数据库<br/>PostgreSQL/MySQL"]
    Redis["消息队列<br/>Redis"]
    
    Scheduler --> Executor
    Executor --> K8SExecutor
    K8SExecutor --> Worker
    Scheduler --> ExternalDB
    Webserver --> ExternalDB
    Triggerer --> Redis
```

### 2.3 多命名空间模式

Airflow 支持多命名空间部署模式，允许在不同命名空间中运行测试环境和生产环境：

```python
option_multi_namespace_mode = click.option(
    "--multi-namespace-mode",
    help="使用多命名空间模式",
    is_flag=True,
    envvar="MULTI_NAMESPACE_MODE",
)
```

资料来源：[dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py:multi_namespace_mode]()

## 3. Kubernetes 部署命令

### 3.1 部署工作流程

```mermaid
graph LR
    A["breeze k8s deploy-airflow"] --> B["创建 KinD 集群"]
    B --> C["创建 Kubernetes 命名空间"]
    C --> D["部署 Helm Chart"]
    D --> E["配置 Airflow 组件"]
    E --> F["验证部署状态"]
```

### 3.2 核心部署选项

| 参数 | 说明 | 默认值 |
|-----|------|--------|
| `--python` | Python 版本 | 3.10 |
| `--kubernetes-version` | Kubernetes 集群版本 | v1.31.12 |
| `--executor` | 执行器类型 | LocalExecutor |
| `--deploy/--no-deploy` | 是否通过 Skaffold 部署 | False |
| `--upgrade` | 升级而非安装 Helm Chart | False |
| `--multi-namespace-mode` | 启用多命名空间模式 | False |
| `--rebuild-base-image` | 重建基础镜像 | False |

资料来源：[dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py:option_executor]()

### 3.3 部署流程代码

部署 Airflow 到 Kubernetes 集群的核心逻辑：

```python
def _deploy_airflow(
    python: str,
    kubernetes_version: str,
    output: Output | None,
    executor: str,
    upgrade: bool,
    use_standard_naming: bool,
    wait_time_in_seconds: int,
    extra_options: str,
    multi_namespace_mode: bool,
):
    cluster_name = get_kubectl_cluster_name(
        python=python, kubernetes_version=kubernetes_version
    )
    # 创建命名空间
    get_console(output=output).print(f"[info]创建集群 {cluster_name} 的命名空间")
    run_command_with_k8s_env(
        ["kubectl", "create", "namespace", HELM_AIRFLOW_NAMESPACE],
        python=python,
        kubernetes_version=kubernetes_version,
        output=output,
        check=False,
    )
```

资料来源：[dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py:_deploy_airflow]()

## 4. Kubernetes 执行器配置

### 4.1 Pod 覆盖机制

Kubernetes 执行器允许通过 `pod_override` 动态配置 Pod 规格：

```python
start_task_executor_config = {
    "pod_override": k8s.V1Pod(
        metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"})
    )
}

@task(executor_config=start_task_executor_config)
def start_task():
    print_stuff()
```

资料来源：[airflow-core/src/airflow/example_dags/example_kubernetes_executor.py:20-27]()

### 4.2 Volume 挂载配置

支持在 Task 执行时挂载持久化卷：

```python
executor_config_volume_mount = {
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    volume_mounts=[
                        k8s.V1VolumeMount(
                            mount_path="/foo/",
                            name="example-kubernetes-test-volume"
                        )
                    ],
                )
            ],
            volumes=[
                k8s.V1Volume(
                    name="example-kubernetes-test-volume",
                    host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
                )
            ],
        )
    ),
}
```

资料来源：[airflow-core/src/airflow/example_dags/example_kubernetes_executor.py:31-56]()

### 4.3 执行器配置参数表

| 配置项 | 类型 | 说明 |
|-------|------|------|
| `pod_override` | V1Pod | Pod 覆盖规格 |
| `annotations` | Dict[str, str] | Pod 注解 |
| `labels` | Dict[str, str] | Pod 标签 |
| `node_selector` | Dict[str, str] | 节点选择器 |
| `tolerations` | List[V1Toleration] | 容忍度配置 |
| `affinity` | V1Affinity | 亲和性配置 |
| `resources` | V1ResourceRequirements | 资源限制 |

## 5. Docker Compose 开发环境

对于本地开发，Airflow 提供 Docker Compose 配置：

```yaml
services:
  airflow:
    image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:latest}
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: sqlite:////opt/airflow/airflow.db
      AIRFLOW__CORE__FERNET_KEY: ''
      AIRFLOW__CORE__DAGS__FOLDER: ${AIRFLOW_PROJ_DIR:-.}/dags
      AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
```

资料来源：[airflow-core/docs/howto/docker-compose/docker-compose.yaml]()

### 5.1 Docker Compose 配置参数

| 环境变量 | 说明 | 默认值 |
|---------|------|--------|
| `AIRFLOW__CORE__EXECUTOR` | 执行器类型 | LocalExecutor |
| `AIRFLOW__DATABASE__SQL_ALCHEMY_CONN` | 数据库连接字符串 | sqlite |
| `AIRFLOW__CORE__FERNET_KEY` | 加密密钥 | - |
| `AIRFLOW__CORE__DAGS__FOLDER` | DAG 文件目录 | ./dags |
| `AIRFLOW__CORE__LOAD_EXAMPLES` | 加载示例 DAG | true |
| `AIRFLOW__CORE__AIRFLOW_HOME` | Airflow 主目录 | /opt/airflow |

## 6. 生产环境最佳实践

### 6.1 部署检查清单

| 检查项 | 说明 |
|-------|------|
| 外部数据库配置 | 使用 PostgreSQL 或 MySQL 而非 SQLite |
| 执行器选择 | 生产环境推荐 CeleryExecutor 或 KubernetesExecutor |
| 高可用配置 | 部署多个 Scheduler 和 Webserver 实例 |
| 资源限制 | 为各组件设置 CPU 和内存限制 |
| 健康检查 | 配置 readiness 和 liveness 探针 |
| 持久化存储 | 使用 PVC 持久化 DAG 和日志 |

### 6.2 安全配置

生产环境部署时应考虑以下安全措施：

1. **配置 Fernet 密钥**：启用敏感数据加密
2. **使用 Secrets**：通过 Kubernetes Secrets 管理凭据
3. **网络策略**：配置 Pod 间网络隔离
4. **RBAC**：启用基于角色的访问控制
5. **审计日志**：启用 Airflow 审计日志功能

### 6.3 监控与告警

建议集成的监控组件：

- **Metrics**: Prometheus + Grafana
- **Logging**: ELK Stack 或 Loki
- **Tracing**: Jaeger
- **Alerting**: Alertmanager

## 7. 快速部署命令

### 7.1 使用 Breeze 部署

```bash
# 部署到 Kind 集群
breeze k8s deploy-airflow --python 3.10 --kubernetes-version v1.31.12

# 升级现有部署
breeze k8s deploy-airflow --upgrade

# 启用多命名空间模式
breeze k8s deploy-airflow --multi-namespace-mode

# 进入集群 Shell
breeze k8s shell

# 运行测试
breeze k8s tests
```

### 7.2 Skaffold 开发循环

对于开发场景，支持热重载功能：

```bash
breeze k8s dev --skaffold-args "--auto-sync"
```

该命令会：
- 同步 DAG 文件到运行中的 Pod
- 热重载 airflow-core 源码（Scheduler/Triggerer/DAG Processor/API Server）
- 自动刷新 UI（暂不支持）

资料来源：[dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py:kubernetes_group]()

## 8. 总结

Apache Airflow 的容器化和 Kubernetes 部署方案提供了企业级的灵活性。通过官方 Docker 镜像、Helm Chart 以及 Breeze 工具链，开发者和运维人员可以：

- 在本地快速搭建开发环境
- 在 Kubernetes 上实现弹性扩展
- 通过 Kubernetes 执行器获得细粒度的资源控制
- 利用多命名空间支持隔离测试和生产环境

建议在生产部署前仔细评估业务需求，选择合适的执行器类型，并遵循本文档的最佳实践确保系统稳定性和安全性。

---

<a id='providers'></a>

## Provider生态系统

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [airflow-core/src/airflow/cli/cli_config.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/cli/cli_config.py)
- [providers/amazon/src/airflow/providers/amazon/aws/hooks/ec2.py](https://github.com/apache/airflow/blob/main/providers/amazon/src/airflow/providers/amazon/aws/hooks/ec2.py)
- [providers/amazon/src/airflow/providers/amazon/aws/hooks/cloud_formation.py](https://github.com/apache/airflow/blob/main/providers/amazon/src/airflow/providers/amazon/aws/hooks/cloud_formation.py)
- [providers/google/src/airflow/providers/google/event_scheduling/events/pubsub.py](https://github.com/apache/airflow/blob/main/providers/google/src/airflow/providers/google/event_scheduling/events/pubsub.py)
- [dev/breeze/src/airflow_breeze/prepare_providers/provider_documentation.py](https://github.com/apache/airflow/blob/main/dev/breeze/src/airflow_breeze/prepare_providers/provider_documentation.py)
- [dev/breeze/src/airflow_breeze/utils/packages.py](https://github.com/apache/airflow/blob/main/dev/breeze/src/airflow_breeze/utils/packages.py)
- [providers/amazon/src/airflow/providers/amazon/get_provider_info.py](https://github.com/apache/airflow/blob/main/providers/amazon/src/airflow/providers/amazon/get_provider_info.py)
- [airflow-core/src/airflow/utils/db.py](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/utils/db.py)
</details>

# Provider生态系统

## 概述

Apache Airflow的Provider生态系统是Airflow的核心扩展机制，允许用户通过安装额外的Provider包来集成各种外部服务和平台。每个Provider提供特定的Hook、Operator、Trigger、Sensor和连接类型，使用户能够在DAG中与外部系统进行交互。

Provider作为独立的Python包发布到PyPI，可以通过`pip install apache-airflow-providers-<provider_name>`进行安装。这种模块化设计使得Airflow核心保持精简，同时用户可以根据实际需求选择性地添加所需的集成功能。

## 架构设计

### Provider核心组件

每个Provider包通常包含以下核心组件：

| 组件类型 | 描述 | 用途 |
|---------|------|------|
| Hook | 数据连接抽象类 | 提供与外部系统交互的基础接口 |
| Operator | 任务执行单元 | 定义具体的业务操作逻辑 |
| Sensor | 外部依赖检查器 | 轮询外部条件直到满足为止 |
| Trigger | 异步事件触发器 | 支持triggerer组件异步执行 |
| Connection | 连接配置 | 定义与外部系统的连接参数 |

### 层级架构

```mermaid
graph TD
    A[Airflow Core] --> B[Providers Manager]
    B --> C[Provider Package]
    C --> D[Hooks]
    C --> E[Operators]
    C --> F[Sensors]
    C --> G[Triggers]
    D --> H[External Services]
    E --> H
    F --> H
    G --> H
    
    subgraph "Provider Package Structure"
    I[get_provider_info.py]
    J[__init__.py]
    K[hooks/*.py]
    L[operators/*.py]
    M[sensors/*.py]
    end
```

## Hook实现机制

Hook是Provider与外部系统交互的基础桥梁。不同Provider的Hook都遵循统一的基类设计规范。

### AWS Hook实现示例

在Amazon Provider中，Hook通过boto3与AWS服务进行交互：

```python
class Ec2Hook(AwsBaseHook):
    API_TYPES = frozenset({"resource_type", "client_type"})

    def __init__(self, api_type="resource_type", *args, **kwargs) -> None:
        if api_type not in self.API_TYPES:
            raise AirflowException("api_type can only be one of %s", self.API_TYPES)
        kwargs[api_type] = "ec2"
        self._api_type = api_type
        super().__init__(*args, **kwargs)
```

资料来源：[providers/amazon/src/airflow/providers/amazon/aws/hooks/ec2.py:24-36]()

### 连接类型注册

Provider通过`get_provider_info.py`文件声明其提供的连接类型：

```python
{
    "integration-name": "AWS EC2",
    "external-doc-url": "https://aws.amazon.com/ec2/",
    "logo": "/docs/integration-logos/AWS-EC2_light-bg@4x.png",
    "tags": ["aws"],
}
```

资料来源：[providers/amazon/src/airflow/providers/amazon/get_provider_info.py:45-52]()

## Provider生命周期管理

### 文档生成流程

Provider的文档通过Breeze工具自动生成，包括README、conf.py等文件：

```python
def _generate_docs_conf(context: dict[str, Any], provider_details: ProviderPackageDetails):
    docs_conf_content = render_template(
        template_name="conf",
        context=context,
        extension=".py",
        keep_trailing_newline=True,
    )
    docs_conf_path = provider_details.root_provider_path / "docs" / "conf.py"
    docs_conf_path.write_text(docs_conf_content)
```

资料来源：[dev/breeze/src/airflow_breeze/prepare_providers/provider_documentation.py:89-97]()

### 版本与依赖管理

Provider的最小Airflow版本依赖通过解析dependencies确定：

```python
def get_min_airflow_version(provider_id: str) -> str:
    provider_details = get_provider_details(provider_id=provider_id)
    min_airflow_version = MIN_AIRFLOW_VERSION
    for dependency in provider_details.dependencies:
        if dependency.startswith("apache-airflow>="):
            current_min_airflow_version = dependency.split(">=")[1]
            if "," in current_min_airflow_version:
                current_min_airflow_version = current_min_airflow_version.split(",")[0]
            if PackagingVersion(current_min_airflow_version) > PackagingVersion(MIN_AIRFLOW_VERSION):
                min_airflow_version = current_min_airflow_version
    return min_airflow_version
```

资料来源：[dev/breeze/src/airflow_breeze/utils/packages.py:78-91]()

## CLI命令集成

Provider功能通过Airflow CLI暴露，允许用户在命令行执行各种管理操作：

```python
GroupCommand(
    name="providers",
    help="Display providers",
    subcommands=PROVIDERS_COMMANDS,
),
```

资料来源：[airflow-core/src/airflow/cli/cli_config.py:89-92]()

### Provider相关命令

| 命令 | 功能 |
|------|------|
| `airflow providers list` | 列出所有已安装的Provider |
| `airflow providers links` | 显示Provider提供的链接 |
| `airflow providers secrets` | 获取secrets后端信息 |
| `airflow providers executors` | 获取Executor信息 |
| `airflow providers auth-managers` | 获取认证管理器信息 |

## 默认连接配置

Airflow在初始化数据库时会创建一系列默认连接，这些连接覆盖了常用的外部系统：

| 连接ID | 连接类型 | 默认配置 |
|--------|----------|----------|
| facebook_social | facebook_social | Facebook社交登录凭证 |
| fs_default | fs | 本地文件系统 |
| ftp_default | ftp | localhost:21 |
| google_cloud_default | google_cloud_platform | GCP默认项目 |
| http_default | http | httpbin.org |
| iceberg_default | iceberg | Iceberg REST服务 |

资料来源：[airflow-core/src/airflow/utils/db.py:180-220]()

## 事件驱动架构

### MessageQueueTrigger集成

Provider支持通过`MessageQueueTrigger`实现事件驱动的任务执行：

```python
class GooglePubSubEvent(Protocol):
    scheme = "google+pubsub"

    def trigger_class(self) -> type[BaseEventTrigger]:
        return PubsubPullTrigger
```

资料来源：[providers/google/src/airflow/providers/google/event_scheduling/events/pubsub.py:52-57]()

### 异步Trigger架构

```mermaid
sequenceDiagram
    DAG Task ->> Trigger: 创建Trigger实例
    Trigger ->> MessageQueue: 订阅消息队列
    MessageQueue -->> Trigger: 事件到达
    Trigger ->> Triggerer: 触发条件满足
    Triggerer ->> DAG Task: 任务执行完成
```

## Provider发现机制

### 自动注册流程

当Airflow启动时，Providers Manager自动扫描并加载所有已安装的Provider：

```mermaid
graph LR
    A[启动Airflow] --> B[扫描Provider包]
    B --> C[加载Hook]
    B --> D[加载Operator]
    B --> E[加载Sensor]
    B --> F[注册Connection类型]
    C --> G[注册到UI]
    D --> G
    E --> G
    F --> G
```

## 主流Provider一览

### Amazon Provider (apache-airflow-providers-amazon)

提供与AWS服务的深度集成，包括：

- **计算服务**：EC2、Lambda、ECS
- **数据服务**：Glue、S3、Redshift、Athena
- **编排服务**：Step Functions、CloudFormation
- **机器学习**：SageMaker、Bedrock

资料来源：[providers/amazon/src/airflow/providers/amazon/aws/hooks/cloud_formation.py:18-25]()

### Google Provider (apache-airflow-providers-google)

集成Google Cloud Platform服务，包括BigQuery、GCS、Cloud Run等。

### Apache Kafka Provider

提供Kafka消息队列的Hook和Operator支持，用于流数据处理场景。

## 总结

Provider生态系统是Apache Airflow实现高度可扩展性的核心机制。通过标准化的组件接口、统一的生命周期管理和灵活的CLI集成，Provider使得Airflow能够优雅地连接各种外部系统。用户可以根据工作需求选择性地安装Provider，实现与云服务商、数据库、消息队列等数十种外部平台的深度集成。

---

---

## Doramagic 踩坑日志

项目：apache/airflow

摘要：发现 19 个潜在踩坑项，其中 1 个为 high/blocking；最高优先级：安装坑 - 来源证据：`ExternalTaskSensor` can succeed early for task groups with NULL task states。

## 1. 安装坑 · 来源证据：`ExternalTaskSensor` can succeed early for task groups with NULL task states

- 严重度：high
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：`ExternalTaskSensor` can succeed early for task groups with NULL task states
- 对用户的影响：可能增加新用户试用和生产接入成本。
- 建议检查：来源问题仍为 open，Pack Agent 需要复核是否仍影响当前版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_3c746f7ce44f43f1a5a81840f4ee741a | https://github.com/apache/airflow/issues/66877 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 2. 能力坑 · 能力判断依赖假设

- 严重度：medium
- 证据强度：source_linked
- 发现：README/documentation is current enough for a first validation pass.
- 对用户的影响：假设不成立时，用户拿不到承诺的能力。
- 建议检查：将假设转成下游验证清单。
- 防护动作：假设必须转成验证项；没有验证结果前不能写成事实。
- 证据：capability.assumptions | github_repo:33884891 | https://github.com/apache/airflow | README/documentation is current enough for a first validation pass.

## 3. 维护坑 · 维护活跃度未知

- 严重度：medium
- 证据强度：source_linked
- 发现：未记录 last_activity_observed。
- 对用户的影响：新项目、停更项目和活跃项目会被混在一起，推荐信任度下降。
- 建议检查：补 GitHub 最近 commit、release、issue/PR 响应信号。
- 防护动作：维护活跃度未知时，推荐强度不能标为高信任。
- 证据：evidence.maintainer_signals | github_repo:33884891 | https://github.com/apache/airflow | last_activity_observed missing

## 4. 安全/权限坑 · 下游验证发现风险项

- 严重度：medium
- 证据强度：source_linked
- 发现：no_demo
- 对用户的影响：下游已经要求复核，不能在页面中弱化。
- 建议检查：进入安全/权限治理复核队列。
- 防护动作：下游风险存在时必须保持 review/recommendation 降级。
- 证据：downstream_validation.risk_items | github_repo:33884891 | https://github.com/apache/airflow | no_demo; severity=medium

## 5. 安全/权限坑 · 存在安全注意事项

- 严重度：medium
- 证据强度：source_linked
- 发现：No sandbox install has been executed yet; downstream must verify before user use.
- 对用户的影响：用户安装前需要知道权限边界和敏感操作。
- 建议检查：转成明确权限清单和安全审查提示。
- 防护动作：安全注意事项必须面向用户前置展示。
- 证据：risks.safety_notes | github_repo:33884891 | https://github.com/apache/airflow | No sandbox install has been executed yet; downstream must verify before user use.

## 6. 安全/权限坑 · 存在评分风险

- 严重度：medium
- 证据强度：source_linked
- 发现：no_demo
- 对用户的影响：风险会影响是否适合普通用户安装。
- 建议检查：把风险写入边界卡，并确认是否需要人工复核。
- 防护动作：评分风险必须进入边界卡，不能只作为内部分数。
- 证据：risks.scoring_risks | github_repo:33884891 | https://github.com/apache/airflow | no_demo; severity=medium

## 7. 安全/权限坑 · 来源证据：Apache Airflow 3.1.6

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题：Apache Airflow 3.1.6
- 对用户的影响：可能阻塞安装或首次运行。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_c94c5b79bb91454c9e0ad22b4a36dc11 | https://github.com/apache/airflow/releases/tag/3.1.6 | 来源讨论提到 docker 相关条件，需在安装/试用前复核。

## 8. 安全/权限坑 · 来源证据：Apache Airflow 3.1.7

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题：Apache Airflow 3.1.7
- 对用户的影响：可能阻塞安装或首次运行。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_80a5614167b44ecca96422caa56afca4 | https://github.com/apache/airflow/releases/tag/3.1.7 | 来源讨论提到 docker 相关条件，需在安装/试用前复核。

## 9. 安全/权限坑 · 来源证据：Apache Airflow 3.1.8

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题：Apache Airflow 3.1.8
- 对用户的影响：可能阻塞安装或首次运行。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_3437fcfc89ff40a0ba17b7e5a5d8aa2c | https://github.com/apache/airflow/releases/tag/3.1.8 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 10. 安全/权限坑 · 来源证据：Apache Airflow 3.2.0

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题：Apache Airflow 3.2.0
- 对用户的影响：可能阻塞安装或首次运行。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_d18a498a98ed48f0bb3f813aaa554aea | https://github.com/apache/airflow/releases/tag/3.2.0 | 来源讨论提到 docker 相关条件，需在安装/试用前复核。

## 11. 安全/权限坑 · 来源证据：Apache Airflow 3.2.1

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题：Apache Airflow 3.2.1
- 对用户的影响：可能影响升级、迁移或版本选择。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_41889eab2c0240bbb0bd010262d41346 | https://github.com/apache/airflow/releases/tag/3.2.1 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 12. 安全/权限坑 · 来源证据：Apache Airflow Ctl (airflowctl) 0.1.2

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题：Apache Airflow Ctl (airflowctl) 0.1.2
- 对用户的影响：可能阻塞安装或首次运行。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_df5b495084624a899a4674d4b0193ec7 | https://github.com/apache/airflow/releases/tag/airflow-ctl/0.1.2 | 来源类型 github_release 暴露的待验证使用条件。

## 13. 安全/权限坑 · 来源证据：Apache Airflow Helm Chart 1.19.0

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题：Apache Airflow Helm Chart 1.19.0
- 对用户的影响：可能影响升级、迁移或版本选择。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_0f5ecc599d634c1daa9ee6c9f2434849 | https://github.com/apache/airflow/releases/tag/helm-chart/1.19.0 | 来源类型 github_release 暴露的待验证使用条件。

## 14. 安全/权限坑 · 来源证据：Apache Airflow Helm Chart 1.20.0

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题：Apache Airflow Helm Chart 1.20.0
- 对用户的影响：可能影响升级、迁移或版本选择。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_c13fe02283094cffa9e186716cc8dcf5 | https://github.com/apache/airflow/releases/tag/helm-chart/1.20.0 | 来源讨论提到 node 相关条件，需在安装/试用前复核。

## 15. 安全/权限坑 · 来源证据：Apache Airflow Helm Chart 1.21.0

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题：Apache Airflow Helm Chart 1.21.0
- 对用户的影响：可能影响升级、迁移或版本选择。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_d29213d0ae2441e8907f407d8dc9b861 | https://github.com/apache/airflow/releases/tag/helm-chart/1.21.0 | 来源讨论提到 docker 相关条件，需在安装/试用前复核。

## 16. 安全/权限坑 · 来源证据：airflow-ctl/0.1.3

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题：airflow-ctl/0.1.3
- 对用户的影响：可能影响授权、密钥配置或安全边界。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_9096523c7ba541c6ac1b6b926d6f6bc0 | https://github.com/apache/airflow/releases/tag/airflow-ctl/0.1.3 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 17. 安全/权限坑 · 来源证据：edge3: upgrade from 1.x silently leaves schema stale — `BaseDBManager.upgradedb` stamps `alembic_version_edge3` to head…

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题：edge3: upgrade from 1.x silently leaves schema stale — `BaseDBManager.upgradedb` stamps `alembic_version_edge3` to head without running migrations
- 对用户的影响：可能影响升级、迁移或版本选择。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_2bc80550929a49ddbce05c557bb225e0 | https://github.com/apache/airflow/issues/66524 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 18. 维护坑 · issue/PR 响应质量未知

- 严重度：low
- 证据强度：source_linked
- 发现：issue_or_pr_quality=unknown。
- 对用户的影响：用户无法判断遇到问题后是否有人维护。
- 建议检查：抽样最近 issue/PR，判断是否长期无人处理。
- 防护动作：issue/PR 响应未知时，必须提示维护风险。
- 证据：evidence.maintainer_signals | github_repo:33884891 | https://github.com/apache/airflow | issue_or_pr_quality=unknown

## 19. 维护坑 · 发布节奏不明确

- 严重度：low
- 证据强度：source_linked
- 发现：release_recency=unknown。
- 对用户的影响：安装命令和文档可能落后于代码，用户踩坑概率升高。
- 建议检查：确认最近 release/tag 和 README 安装命令是否一致。
- 防护动作：发布节奏未知或过期时，安装说明必须标注可能漂移。
- 证据：evidence.maintainer_signals | github_repo:33884891 | https://github.com/apache/airflow | release_recency=unknown

<!-- canonical_name: apache/airflow; human_manual_source: deepwiki_human_wiki -->
