# https://github.com/pathwaycom/pathway 项目说明书

生成时间：2026-05-16 20:44:44 UTC

## 目录

- [Pathway简介](#overview-introduction)
- [安装指南](#overview-installation)
- [基础示例](#quickstart-basic-example)
- [引擎架构](#arch-engine-overview)
- [Python与Rust集成](#arch-python-rust-integration)
- [连接器概述](#connectors-overview)
- [数据库连接器](#connectors-database)
- [流数据连接器](#connectors-streaming)
- [表操作](#trans-table-operations)
- [连接与聚合](#trans-joins-aggregations)

<a id='overview-introduction'></a>

## Pathway简介

### 相关页面

相关主题：[引擎架构](#arch-engine-overview), [安装指南](#overview-installation)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [README.md](https://github.com/pathwaycom/pathway/blob/main/README.md)
- [src/python_api.rs](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)
- [external/differential-dataflow/src/operators/arrange/agent.rs](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/operators/arrange/agent.rs)
- [external/differential-dataflow/src/trace/implementations/ord.rs](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/trace/implementations/ord.rs)
- [external/timely-dataflow/timely/src/progress/subgraph.rs](https://github.com/pathwaycom/pathway/blob/main/external/timely-dataflow/timely/src/progress/subgraph.rs)
- [external/differential-dataflow/src/trace/wrappers/freeze.rs](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/trace/wrappers/freeze.rs)
- [src/connectors/postgres.rs](https://github.com/pathwaycom/pathway/blob/main/src/connectors/postgres.rs)
- [examples/templates/el-pipeline/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/templates/el-pipeline/README.md)
</details>

# Pathway简介

## 项目概述

Pathway是一个Python ETL框架，专为流处理（stream processing）、实时分析（real-time analytics）、LLM管道（LLM pipelines）和检索增强生成（RAG）场景设计。资料来源：[README.md:1]()

Pathway的核心优势在于其**易用的Python API**，允许用户无缝集成最喜欢的Python机器学习库。同时，Pathway代码具有极高的通用性和鲁棒性，可以同时用于开发和生产环境，有效处理批处理（batch）和流式数据（streaming data）。资料来源：[README.md:3]()

Pathway的一个显著特点是**代码一致性**：同一套代码可用于本地开发、CI/CD测试、运行批处理任务、处理流回放（stream replays）以及处理数据流。资料来源：[README.md:4]()

## 核心技术架构

Pathway的技术架构采用分层设计，融合了多种分布式计算范式：

```mermaid
graph TD
    A[Python API层] --> B[PyO3绑定层]
    B --> C[Rust执行引擎]
    C --> D[Timely Dataflow分布式计算]
    C --> E[Differential Dataflow增量计算]
    D --> F[增量数据流处理]
    E --> F
```

### Python API层

Pathway提供简洁的Python接口，通过PyO3实现Python与Rust的双向绑定。Python API层支持多种数据类型和连接器配置。资料来源：[src/python_api.rs:1-80]()

核心Python绑定类包括：

| 类名 | 功能说明 |
|------|----------|
| `PythonSubject` | Python主题定义，支持start、read、seek等回调 |
| `ValueField` | 值字段定义，包含名称、类型、来源和默认值 |
| `Table` | 数据表核心抽象 |
| `ConnectorProperties` | 连接器通用属性 |
| `CsvParserSettings` | CSV解析器配置 |
| `AwsS3Settings` | AWS S3连接配置 |

### Rust执行引擎

Pathway的执行引擎由Rust实现，基于Differential Dataflow和Timely Dataflow构建。Rust引擎负责高性能的数据处理和计算。资料来源：[README.md:5]()

引擎通过`pymodule`导出Python可调用的类和函数，实现Python语法的易用性与Rust的高性能完美结合。资料来源：[src/python_api.rs:100-150]()

### Timely Dataflow分布式计算

Pathway使用Timely Dataflow作为底层分布式计算框架。Timely Dataflow提供了数据流图的处理能力，支持并行和分布式计算。资料来源：[external/timely-dataflow/timely/src/progress/subgraph.rs:1-30]()

Timely Dataflow的核心组件包括：

```mermaid
graph LR
    A[数据输入] --> B[Scope子图]
    B --> C[PerOperatorState]
    C --> D[Antichain进度跟踪]
    D --> E[ChangeBatch消息]
    E --> F[数据输出]
```

关键数据结构：
- **Activations**：共享激活状态管理
- **MutableAntichain**：可变反链，用于跟踪进度边界
- **ChangeBatch**：变化批次，用于记录数据变化

### Differential Dataflow增量计算

Differential Dataflow是Pathway实现增量计算的核心技术。与传统批处理不同，Differential Dataflow能够智能处理数据的变化，仅重新计算受影响的部分。资料来源：[external/differential-dataflow/src/operators/arrange/agent.rs:1-30]()

#### 轨迹追踪器（Trace）

Pathway使用轨迹（Trace）来记录历史数据状态，支持时间旅行查询和增量更新。资料来源：[external/differential-dataflow/src/trace/wrappers/freeze.rs:1-30]()

```mermaid
graph TD
    A[数据变化] --> B[Trace记录]
    B --> C[时间戳管理]
    C --> D[Antichain since边界]
    D --> E[增量计算引擎]
    E --> F[输出更新]
```

#### 有序批处理

Pathway实现了高效的有序批处理结构`OrdKeyBatch`和`OrdValBatch`，用于存储键值对的时间序列数据。资料来源：[external/differential-dataflow/src/trace/implementations/ord.rs:1-50]()

| 组件 | 说明 |
|------|------|
| `OrdKeyCursor` | 有序键游标，用于遍历键数据 |
| `OrdValCursor` | 有序键值游标，用于遍历键值对 |
| `BatchContainer` | 批量容器接口 |
| `MergeBuilder` | 合并构建器 |

## 主要功能特性

### 连接器支持

Pathway提供丰富的连接器支持，覆盖多种数据源和数据格式：

| 连接器类型 | 支持的数据源 |
|------------|--------------|
| 文件系统 | CSV、Parquet等 |
| 云存储 | AWS S3、Azure Blob Storage |
| 数据库 | PostgreSQL（包括复制支持） |
| 搜索引擎 | Elasticsearch |
| 流处理 | MQTT、Kafka、Debezium |
| 其他 | Schema Registry、Iceberg |

资料来源：[src/python_api.rs:80-120]()

### PostgreSQL连接器

Pathway的PostgreSQL连接器支持完整的二进制协议，实现高效的数据读写：

- 支持多种数据类型映射（BOOL、INT、Float、TEXT、JSONB等）
- 二进制数组格式写入
- 复制槽（Replication Slot）支持逻辑复制
- Debezium CDC集成

资料来源：[src/connectors/postgres.rs:1-60]()

### 数据格式与解析

Pathway提供灵活的数据格式配置，包括：

```python
CsvParserSettings(
    name=field_name,
    type_=field_type,
    source=FieldSource.Payload,
    default=None
)
```

支持的数据类型包括：INT、FLOAT、STRING、BOOL、TIMESTAMP、JSON等。

### 持久化与状态管理

Pathway支持灵活的数据持久化配置：

| 配置项 | 说明 |
|--------|------|
| `PersistenceConfig` | 持久化基础配置 |
| `BackfillingThreshold` | 回填阈值配置 |
| `PySnapshotAccess` | 快照访问模式 |
| `PySnapshotEvent` | 快照事件类型 |

## 应用场景

### ETL管道

Pathway特别适合构建ETL（Extract-Transform-Load）管道，支持从多种数据源提取数据、进行转换处理、加载到目标系统。

```mermaid
graph LR
    A[数据源] -->|Extract| B[Pathway管道]
    B -->|Transform| C[增量处理]
    C -->|Load| D[目标系统]
```

### 实时分析

基于Differential Dataflow的增量计算能力，Pathway能够高效处理实时数据分析任务，仅处理数据流中的增量变化部分。

### LLM管道与RAG

Pathway为构建LLM（Large Language Model）管道和RAG（Retrieval-Augmented Generation）应用提供专用支持，能够实时索引和检索外部数据源。

## 开发与部署

### 开发环境

Pathway支持本地开发环境，开发者可以使用熟悉的Python工具链进行开发。资料来源：[examples/templates/el-pipeline/README.md:1-20]()

### 许可证

Pathway采用BSL（Business Source License）许可证发布，允许非商业使用和评估。商业使用需要获取正式许可证。资料来源：[README.md:45]()

### 许可证配置

部署时需要设置环境变量：

```bash
export PATHWAY_LICENSE_KEY=your_pathway_key
```

## 技术优势总结

| 优势 | 描述 |
|------|------|
| **高性能** | Rust引擎提供接近原生的执行效率 |
| **增量计算** | Differential Dataflow避免重复计算 |
| **统一API** | Python接口简化开发，Rust保证性能 |
| **多数据源** | 丰富的内置连接器支持 |
| **可扩展性** | 基于Timely Dataflow支持分布式部署 |
| **开发友好** | 支持本地开发和生产环境一致体验 |

---

<a id='overview-installation'></a>

## 安装指南

### 相关页面

相关主题：[基础示例](#quickstart-basic-example)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [pyproject.toml](https://github.com/pathwaycom/pathway/blob/main/pyproject.toml)
- [rust-toolchain.toml](https://github.com/pathwaycom/pathway/blob/main/rust-toolchain.toml)
- [docs/2.developers/4.user-guide/10.introduction/20.installation.md](https://github.com/pathwaycom/pathway/blob/main/docs/2.developers/4.user-guide/10.introduction/20.installation.md)
</details>

# 安装指南

Pathway 是一个基于 Python 的 ETL 框架，由 Rust 引擎驱动，支持流处理和实时分析。本指南将帮助你在不同环境下正确安装和配置 Pathway。

## 系统要求

### 硬件和操作系统要求

| 要求类型 | 最低配置 | 推荐配置 |
|---------|---------|---------|
| 处理器 | 2 核 | 4 核或以上 |
| 内存 | 4 GB | 8 GB 或以上 |
| 磁盘空间 | 500 MB | 1 GB 或以上 |
| 操作系统 | Linux/macOS/Windows | Linux/macOS |

### Python 版本要求

Pathway 需要 Python 3.10 或更高版本。建议使用 Python 3.11 以获得最佳性能。

| Python 版本 | 支持状态 |
|------------|---------|
| 3.10 | ✅ 支持 |
| 3.11 | ✅ 推荐 |
| 3.12 | ✅ 支持 |

## 安装方式

### 使用 pip 安装

通过 pip 包管理器安装 Pathway 是最简单的方式：

```bash
pip install pathway
```

### 从源码安装

对于需要最新功能或进行开发调试的用户，可以从源码编译安装：

```bash
# 克隆仓库
git clone https://github.com/pathwaycom/pathway.git
cd pathway

# 安装 Rust 依赖和 Python 包
pip install -e .
```

## 依赖环境

### Rust 工具链

Pathway 的核心引擎使用 Rust 编写，需要 Rust 工具链支持。

#### 安装 Rust

```bash
# Linux/macOS
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

# Windows - 使用 rustup 安装程序
winget install Rustlang.Rustup
```

#### 验证 Rust 安装

```bash
rustc --version
cargo --version
```

项目使用 `rust-toolchain.toml` 文件指定 Rust 版本，确保编译器版本一致性。

### Python 依赖

Pathway 的 Python 依赖在 `pyproject.toml` 中定义，主要包括：

| 依赖包 | 版本要求 | 用途 |
|-------|---------|------|
| pyo3 | 最新稳定版 | Python 绑定 |
| timely-dataflow | 外部依赖 | 数据流处理 |
| differential-dataflow | 外部依赖 | 增量计算 |

## 环境配置

### 环境变量

Pathway 运行可能需要的可选环境变量：

```bash
# 设置 Pathway 许可证密钥（商业功能需要）
export PATHWAY_LICENSE_KEY=your_pathway_key

# 可选：设置 Python 路径
export PYTHONPATH=/path/to/pathway:$PYTHONPATH
```

### 验证安装

安装完成后，可以通过以下方式验证：

```python
import pathway as pw

# 打印版本信息
print(pw.__version__)

# 创建一个简单的数据流测试
input_session = pw.demo.demo_reduce_operator(show_progress=False)
print("Pathway 安装成功！")
```

## 平台特定说明

### Linux

Linux 环境需要确保以下系统包已安装：

```bash
# Debian/Ubuntu
sudo apt-get update
sudo apt-get install -y python3-dev python3-pip cargo
```

### macOS

macOS 用户建议使用 Homebrew 安装依赖：

```bash
brew install python rust
pip3 install pathway
```

### Windows

Windows 用户需要安装 Visual C++ Build Tools 以编译 Rust 依赖：

1. 下载并安装 [Visual Studio Build Tools](https://visualstudio.microsoft.com/visual-cpp-build-tools/)
2. 确保安装 "C++ 生成工具" 工作负载
3. 通过 PowerShell 或命令提示符安装 Pathway

## 常见问题

### 编译错误

如果遇到编译错误，确保 Rust 工具链是最新版本：

```bash
rustup update
cargo clean
pip install --force-reinstall pathway
```

### 导入错误

如果遇到 `ModuleNotFoundError`，检查 Python 路径和安装状态：

```bash
pip show pathway
python -c "import pathway; print(pathway.__file__)"
```

## 下一步

安装完成后，你可以开始使用 Pathway：

1. 阅读[快速开始指南](./quick-start.md)了解基本用法
2. 查看[用户指南](../user-guide/introduction/welcome)深入学习
3. 参考 [API 文档](https://pathway.com/developers/api-docs/pathway)了解完整功能

---

<a id='quickstart-basic-example'></a>

## 基础示例

### 相关页面

相关主题：[表操作](#trans-table-operations), [连接器概述](#connectors-overview)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [src/python_api.rs](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)
- [README.md](https://github.com/pathwaycom/pathway/blob/main/README.md)
- [examples/projects/question-answering-rag/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/projects/question-answering-rag/README.md)
- [examples/projects/ag2-multiagent-rag/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/projects/ag2-multiagent-rag/README.md)
- [examples/templates/el-pipeline/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/templates/el-pipeline/README.md)
- [src/engine/timestamp.rs](https://github.com/pathwaycom/pathway/blob/main/src/engine/timestamp.rs)
</details>

# 基础示例

Pathway 是一个 Python ETL 框架，用于流处理、实时分析、LLM 管道和 RAG（检索增强生成）应用开发。本文档介绍 Pathway 的基础概念、核心组件和入门示例。

## 概述

Pathway 由 Python 前端和 Rust 后端引擎组成，通过 PyO3 实现 Python 与 Rust 的互操作。Python API 提供了简洁易用的接口，而 Rust 引擎基于 Differential Dataflow 进行增量计算，确保高性能和一致性。

资料来源：[README.md:1-15]()

## 核心概念

### 数据流架构

```mermaid
graph TD
    A[数据源 Connectors] --> B[PythonSubject 输入]
    B --> C[Rust 引擎处理]
    C --> D[增量计算 Differential Dataflow]
    D --> E[PyExportedTable 输出]
    E --> F[快照 Snapshot]
```

Pathway 的核心数据流遵循：数据源 → PythonSubject → Rust 引擎 → 增量计算 → ExportedTable 的模式。

资料来源：[src/python_api.rs:35-45]()

### PythonSubject 核心类

`PythonSubject` 是 Python 端的数据输入接口，用于定义数据源的读取行为：

| 参数 | 类型 | 说明 |
|------|------|------|
| `start` | `Py<PyAny>` | 起始处理函数 |
| `read` | `Py<PyAny>` | 读取数据函数 |
| `seek` | `Py<PyAny>` | 定位函数 |
| `on_persisted_run` | `Py<PyAny>` | 持久化运行回调 |
| `end` | `Py<PyAny>` | 结束处理函数 |
| `is_internal` | `bool` | 是否为内部数据源 |
| `deletions_enabled` | `bool` | 是否启用删除 |

资料来源：[src/python_api.rs:25-33]()

### ValueField 模式字段

`ValueField` 定义表的 schema 字段结构：

| 属性 | 类型 | 说明 |
|------|------|------|
| `name` | `String` | 字段名称 |
| `type_` | `Type` | 数据类型 |
| `source` | `FieldSource` | 字段来源（默认 Payload） |
| `default` | `Option<Value>` | 默认值 |
| `metadata` | `Option<String>` | 元数据 |

资料来源：[src/python_api.rs:52-61]()

### PyExportedTable 导出表

`PyExportedTable` 提供数据输出和快照功能：

| 方法 | 返回类型 | 说明 |
|------|----------|------|
| `frontier()` | `TotalFrontier<Timestamp>` | 获取当前前沿 |
| `snapshot_at(frontier)` | `Vec<(Key, Vec<Value>)>` | 在指定前沿获取快照 |
| `failed()` | `bool` | 检查处理是否失败 |

资料来源：[src/python_api.rs:106-115]()

### Done 标记值

`Done` 是时间线末端的标记值，用于表示计算完成状态。它是一个冻结的 Python 对象，具有特殊的比较和哈希行为：

- 与 `Done` 比较时返回 `Equal`
- 与整数比较时返回 `Greater`
- 支持自定义哈希实现

资料来源：[src/python_api.rs:176-196]()

## 时间戳与前沿

### Timestamp 时间戳

Pathway 使用 `Timestamp` 进行时间管理，实现 `OriginalOrRetraction` 和 `NextRetractionTime` trait：

```mermaid
graph LR
    A[原始数据] --> B[偶数时间戳]
    A --> C[ retraction 数据]
    C --> D[奇数时间戳]
    D --> E[下一 retraction 时间]
```

- `is_original()`: 偶数时间戳表示原始数据
- `next_retraction_time()`: 计算下一个 retraction 时间（当前时间 + 1）

资料来源：[src/engine/timestamp.rs:45-55]()

### TotalFrontier 前沿状态

`TotalFrontier<T>` 是表示计算前沿的枚举类型：

| 变体 | 说明 |
|------|------|
| `At(T)` | 在指定时间点 |
| `Done` | 计算已完成 |

该类型实现了 `FromPyObject` 和 `IntoPyObject`，支持 Python 和 Rust 间的无缝转换。

资料来源：[src/python_api.rs:140-175]()

## 基础使用模式

### 典型项目结构

```
project/
├── app.py              # 主程序
├── data/               # 数据目录
│   ├── docs/           # 文档输入
│   └── output/         # 结果输出
├── .env                # 环境变量
└── requirements.txt    # 依赖
```

资料来源：[examples/projects/question-answering-rag/README.md:1-30]()

### 环境配置

安装 Pathway：

```bash
pip install pathway[xpack-llm]
```

设置环境变量：

```bash
export PATHWAY_LICENSE_KEY=your_pathway_key
```

或创建 `.env` 文件：

```
OPENAI_API_KEY=your_openai_api_key_here
```

资料来源：[examples/projects/question-answering-rag/README.md:25-30]()

### 运行应用

直接运行 Python 脚本：

```bash
python app.py
```

使用 Pathway CLI：

```bash
pathway spawn python main.py
```

多线程运行：

```bash
pathway spawn --threads 3 python main.py
```

资料来源：[README.md:120-135]()

### HTTP 服务集成

Pathway 提供 HTTP 服务器用于接收查询和返回结果：

```python
webserver = pw.io.http.PathwayWebserver(host="0.0.0.0", port=8011)
```

提交查询：

```bash
curl --data '{ "messages": "What is the value of X?"}' http://localhost:8011
```

资料来源：[examples/projects/question-answering-rag/README.md:15-20]()

## 注册的 Python API 类

Pathway 通过 `add_class` 注册以下核心类到 `pathway.engine` 模块：

| 类别 | 类名 |
|------|------|
| 连接器设置 | `AwsS3Settings`, `AzureBlobStorageSettings`, `MqttSettings`, `PsqlReplicationSettings` |
| 解析器 | `CsvParserSettings`, `PySchemaRegistrySettings` |
| 存储格式 | `DataStorage`, `DataFormat`, `PersistenceConfig` |
| 索引 | `IcebergCatalogSettings`, `ElasticSearchParams` |
| 内部类 | `PythonSubject`, `ValueField`, `PyExportedTable`, `Done`, `Trace` |
| 配置 | `ConnectorProperties`, `ColumnProperties`, `TableProperties`, `TelemetryConfig` |
| 快照 | `PySnapshotAccess`, `PySnapshotEvent`, `PyPersistenceMode` |
| 向量索引 | `PyExternalIndexFactory`, `PyExternalIndexData`, `PyExternalIndexQuery` |

资料来源：[src/python_api.rs:220-250]()

## 文档与支持

完整文档访问：[pathway.com/developers/](https://pathway.com/developers/user-guide/introduction/welcome)

如有问题，可通过以下方式获取帮助：

- [GitHub Issues](https://github.com/pathwaycom/pathway/issues)
- [Discord 社区](https://discord.com/invite/pathway)
- 邮箱：contact@pathway.com

资料来源：[examples/templates/el-pipeline/README.md:45-50]()

## 下一步

- 学习 Pathway 的连接器配置
- 了解实时数据处理的高级特性
- 探索 RAG 和 LLM 集成的最佳实践
- 查看 `examples/projects/` 目录下的完整示例项目

---

<a id='arch-engine-overview'></a>

## 引擎架构

### 相关页面

相关主题：[Python与Rust集成](#arch-python-rust-integration)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [src/engine/dataflow.rs](https://github.com/pathwaycom/pathway/blob/main/src/engine/dataflow.rs)
- [src/python_api.rs](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)
- [external/differential-dataflow/src/operators/arrange/agent.rs](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/operators/arrange/agent.rs)
- [external/differential-dataflow/src/operators/join.rs](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/operators/join.rs)
- [external/differential-dataflow/src/operators/reduce.rs](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/operators/reduce.rs)
- [external/differential-dataflow/src/trace/wrappers/freeze.rs](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/trace/wrappers/freeze.rs)
- [external/differential-dataflow/src/operators/arrange/writer.rs](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/operators/arrange/writer.rs)
- [external/timely-dataflow/timely/src/progress/change_batch.rs](https://github.com/pathwaycom/pathway/blob/main/external/timely-dataflow/timely/src/progress/change_batch.rs)
</details>

# 引擎架构

## 概述

Pathway 的引擎架构是一个基于 **Rust 的高性能计算引擎**，构建于 Differential Dataflow 和 Timely Dataflow 之上。这一架构使得 Pathway 能够以 Python API 的形式提供简洁易用的接口，同时在底层实现增量计算和流式处理。引擎的核心设计理念是将 Python 层的灵活表达与 Rust 引擎的高性能执行相结合，通过 PyO3 实现 Python 与 Rust 之间的无缝桥接。

Pathway 引擎支持多种数据类型和处理模式，包括静态数据（batch）和动态数据（streaming），能够在同一个代码库中处理历史数据和实时数据流。引擎内置的增量计算机制确保只有发生变化的数据才会被重新处理，从而大幅提升计算效率。

## 核心架构组件

### 计算引擎层次结构

Pathway 引擎采用三层架构设计，从上到下依次为：

| 层次 | 组件 | 职责 |
|------|------|------|
| Python API 层 | `pathway.engine` 模块 | 提供 Python 接口，暴露核心类和函数 |
| PyO3 绑定层 | `src/python_api.rs` | 实现 Python 对象到 Rust 对象的转换 |
| Rust 核心层 | `src/engine/` | 实现 Differential Dataflow 计算引擎 |

资料来源：[src/python_api.rs:1-50]()

### 核心类结构

Pathway 引擎通过 PyO3 将以下核心 Rust 结构暴露给 Python：

```python
# Python API 层暴露的核心类
Table           # 数据表，表示一组带时间戳的记录
Column          # 列，表示数据表中的一列
Universe        # 宇宙，表示一组唯一标识符
Context         # 上下文，管理数据流计算的执行环境
Scope           # 作用域，定义计算的数据流范围
DataRow         # 数据行，表示表中的一行数据
Computer        # 计算器，定义如何计算派生列
```

资料来源：[src/python_api.rs:85-110]()

## 数据流计算模型

### Differential Dataflow 集成

Pathway 引擎的核心计算模型基于 **Differential Dataflow**，这是一种用于增量数据流计算的编程模型。Differential Dataflow 能够高效处理数据的插入、更新和删除操作，通过追踪数据变化而非重新计算整个结果来实现高性能。

```mermaid
graph TD
    A[输入数据流] --> B[Input Session]
    B --> C[Differential Collection]
    C --> D[Arrangement]
    D --> E[Operators<br/>join/reduce/count]
    E --> F[输出 Collection]
    F --> G[Incremental 结果]
    
    H[变化追踪] --> C
```

资料来源：[external/differential-dataflow/src/operators/reduce.rs:1-50]()

### 时间戳与版本管理

Pathway 使用复合时间戳来追踪数据变化，每个数据记录都带有时间戳和差异值（multiplicity）。时间戳必须实现 `Lattice` trait，支持部分有序比较，这使得系统能够正确处理并发和乱序数据。

```rust
// 时间戳必须是 Lattice，用于支持增量计算的合并
impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G, T1>
where
    G: Scope,
    G::Timestamp: Lattice+Ord+Debug,
    // ...
```

资料来源：[external/differential-dataflow/src/operators/join.rs:1-30]()

### 数据流图结构

Pathway 引擎内部维护一个数据流图（Dataflow Graph），用于协调各个算子之间的数据传递：

```mermaid
graph LR
    subgraph DataflowGraphInner
        A[BeforeIterate] --> B[Scope]
        B --> C[Child Scope]
        C --> D[Operators]
        D --> E[AfterIterate]
    end
    
    F[持久化数据] --> A
    A --> G[表创建]
    G --> D
```

资料来源：[src/engine/dataflow.rs:1-60]()

## 核心算子实现

### Join 算子

Join 是 Pathway 中最常用的算子之一，用于合并两个数据集。引擎使用 `join_core_internal_unsafe` 方法实现高效的连接操作：

```rust
// Join 操作的核心实现
fn join_core<Tr2,I,L>(&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
where
    Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
    // ... 配置连接参数
{
    self.arrange_by_key().join_core_internal_unsafe(stream2, result)
}
```

资料来源：[external/differential-dataflow/src/operators/join.rs:20-35]()

### Reduce 算子

Reduce 算子用于对数据进行聚合操作，支持自定义归约器实现：

```rust
impl<S: MaybeTotalScope, R: ReducerImpl> DataflowReducer<S> for R
where
    Collection<S, (Key, Option<<R as ReducerImpl>::State>)>:
        Into<PersistableCollection<S>> + From<PersistableCollection<S>>,
{
    fn reduce(
        self: Rc<Self>,
        values: &Collection<S, (Key, Key, Vec<Value>)>,
        error_logger: Rc<dyn LogError>,
        _trace: Trace,
        graph: &mut DataflowGraphInner<S>,
    ) -> Result<Values<S>> {
        // 实现聚合逻辑
    }
}
```

资料来源：[src/engine/dataflow.rs:80-110]()

### Count 算子

Count 是 Differential Dataflow 内置的聚合操作，用于计算每个键的出现次数：

```rust
// Count 扩展 trait 实现
pub trait Count<G: Scope, K: Data, R: Semigroup> where G::Timestamp: Lattice+Ord {
    fn count(&self) -> Collection<G, (K, R), isize> {
        self.count_core()
    }
    
    fn count_core<R2: Abelian + From<i8>>(&self) -> Collection<G, (K, R), R2> {
        self.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>(
            "Count", 
            |_k, s, t| t.push((s[0].1.clone(), R2::from(1i8)))
        ).as_collection(|k, c| (k.clone(), c.clone()))
    }
}
```

资料来源：[external/differential-dataflow/src/operators/reduce.rs:30-80]()

## Arrangement 数据结构

### Arrangement 概念

Arrangement 是 Differential Dataflow 中的核心数据结构，用于组织和索引数据以支持高效的后续操作。Arrangement 本质上是一个按 key 排序的 trace 结构：

```mermaid
graph TD
    A[输入数据] --> B[ArrangeByKey]
    B --> C[Trace Agent]
    C --> D[Trace Spine]
    D --> E[多个 Batch]
    
    F[查询] --> G[Cursor 遍历]
    G --> D
```

资料来源：[external/differential-dataflow/src/operators/arrange/agent.rs:1-40]()

### Trace Writer 实现

Trace Writer 负责向 arrangement 中写入数据批次：

```rust
pub fn new(
    upper: Vec<Tr::Time>,
    trace: Weak<RefCell<TraceBox<Tr>>>,
    queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>
) -> Self {
    let mut temp = Antichain::new();
    temp.extend(upper.into_iter());
    Self { upper: temp, trace, queues }
}

pub fn exert(&mut self, fuel: &mut isize) {
    if let Some(trace) = self.trace.upgrade() {
        trace.borrow_mut().trace.exert(fuel);
    }
}
```

资料来源：[external/differential-dataflow/src/operators/arrange/writer.rs:1-40]()

### Trace Freeze 包装器

Trace Freeze 是一个特殊的 trace 包装器，用于在特定条件下冻结时间：

```rust
impl<Tr, F> TraceReader for TraceFreeze<Tr, F>
where
    Tr: TraceReader,
    Tr::Time: Lattice+Clone+'static,
    F: Fn(&Tr::Time)->Option<Tr::Time>+'static,
{
    type Key = Tr::Key;
    type Val = Tr::Val;
    type Time = Tr::Time;
    type R = Tr::R;
    
    type Batch = BatchFreeze<Tr::Batch, F>;
    type Cursor = CursorFreeze<Tr::Cursor, F>;
}
```

资料来源：[external/differential-dataflow/src/trace/wrappers/freeze.rs:1-50]()

## 增量计算机制

### 变化批次处理

Pathway 使用 `ChangeBatch` 来追踪数据的变化，每个变化包含时间戳和差异值：

```rust
pub fn into_inner(mut self) -> Vec<(T, i64)> {
    self.compact();
    self.updates
}

pub fn iter(&mut self) -> ::std::slice::Iter<(T, i64)> {
    self.compact();
    self.updates.iter()
}
```

资料来源：[external/timely-dataflow/timely/src/progress/change_batch.rs:60-90]()

### 持久化与状态管理

引擎支持对计算状态进行持久化，通过 `PersistenceConfig` 配置持久化行为：

| 配置项 | 类型 | 说明 |
|--------|------|------|
| `persistence_mode` | `PyPersistenceMode` | 持久化模式（禁用/异步/同步） |
| `snapshot_access` | `PySnapshotAccess` | 快照访问策略 |
| `snapshot_event` | `PySnapshotEvent` | 快照触发事件 |

资料来源：[src/python_api.rs:1-30]()

## Python 绑定实现

### PyO3 类导出

Pathway 通过 PyO3 将 Rust 结构导出为 Python 类：

```rust
#[pymethods]
impl ValueField {
    #[new]
    #[pyo3(signature = (name, type_, source = FieldSource::Payload))]
    pub fn new(name: String, type_: Type, source: FieldSource) -> Self {
        Self {
            name,
            type_,
            source,
        }
    }
}
```

资料来源：[src/python_api.rs:140-160]()

### PythonSubject 绑定

PythonSubject 允许 Python 代码定义自定义的数据源：

```rust
#[pyclass(module = "pathway.engine")]
pub struct PythonSubject {
    pub start: Py<PyAny>,
    pub read: Py<PyAny>,
    pub seek: Py<PyAny>,
    pub on_persisted_run: Py<PyAny>,
    pub end: Py<PyAny>,
    pub is_internal: bool,
    pub deletions_enabled: bool,
}
```

资料来源：[src/python_api.rs:95-130]()

## 数据模型

### 表结构

| 组件 | 说明 |
|------|------|
| `TableProperties` | 表级别的属性配置 |
| `ColumnProperties` | 列级别的属性配置 |
| `ConnectorProperties` | 连接器配置 |
| `Trace` | 追踪配置 |
| `Done` | 完成标记 |

资料来源：[src/python_api.rs:25-40]()

### 值字段定义

```rust
#[pyclass(module = "pathway.engine")]
#[derive(Clone)]
pub struct ValueField {
    #[pyo3(get)]
    pub name: String,
    #[pyo3(get)]
    pub type_: Type,
    #[pyo3(get)]
    pub source: FieldSource,
    #[pyo3(get)]
    pub default: Option<Value>,
    #[pyo3(get)]
    pub metadata: Option<String>,
}
```

资料来源：[src/python_api.rs:145-165]()

## 关键配置选项

### 连接器模式

| 模式 | 说明 |
|------|------|
| `STATIC` | 静态数据源，读取一次 |
| `STREAMING` | 流式数据源，持续监听 |
| `REPLAY` | 回放模式，重演历史数据 |

### 监控级别

| 级别 | 说明 |
|------|------|
| `OFF` | 禁用监控 |
| `BASIC` | 基础指标 |
| `DETAILED` | 详细指标 |

资料来源：[src/python_api.rs:85-100]()

## 工作流程图

```mermaid
graph TD
    A[Python 代码定义数据流] --> B[PyO3 调用 Rust 引擎]
    B --> C[创建 DataflowGraph]
    C --> D[初始化 Scope 和 Universe]
    D --> E[注册数据源和算子]
    E --> F[启动数据流执行]
    
    F --> G{数据输入模式}
    G -->|批量| H[Batch Input]
    G -->|流式| I[Stream Input]
    
    H --> J[计算 operators]
    I --> J
    
    J --> K[生成 Arrangement]
    K --> L[输出结果]
    
    J --> M{状态变化}
    M -->|有变化| N[触发增量计算]
    M -->|无变化| O[跳过重算]
    N --> K
```

## 总结

Pathway 的引擎架构通过深度整合 Differential Dataflow 和 Timely Dataflow，实现了一个高性能、可扩展的增量计算引擎。核心设计特点包括：

1. **双层语言架构**：Python API 提供易用性，Rust 引擎保证性能
2. **增量计算模型**：通过时间戳和差异值追踪变化，避免重复计算
3. **灵活的 Arrangement**：支持高效的 join、reduce 等操作
4. **完整的持久化支持**：内置状态管理和快照机制
5. **丰富的连接器**：支持多种数据源和输出目标

这种架构使 Pathway 能够同时满足开发和生产环境的需求，支持从本地开发到大规模分布式部署的多种场景。

---

<a id='arch-python-rust-integration'></a>

## Python与Rust集成

### 相关页面

相关主题：[引擎架构](#arch-engine-overview)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [src/python_api.rs](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)
- [external/differential-dataflow/src/operators/arrange/agent.rs](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/operators/arrange/agent.rs)
- [src/engine/timestamp.rs](https://github.com/pathwaycom/pathway/blob/main/src/engine/timestamp.rs)
- [external/differential-dataflow/src/trace/wrappers/freeze.rs](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/trace/wrappers/freeze.rs)
- [external/timely-dataflow/mdbook/src/chapter_2/chapter_2_5.md](https://github.com/pathwaycom/pathway/blob/main/external/timely-dataflow/mdbook/src/chapter_2/chapter_2_5.md)
</details>

# Python与Rust集成

Pathway是一个基于Python的ETL框架，但其核心计算引擎由Rust实现。这种架构通过PyO3库实现Python与Rust的无缝集成，使得开发者可以使用简洁的Python API编写数据处理逻辑，同时享受Rust带来的高性能和内存安全性。

## 技术架构概述

Pathway采用双层架构设计：上层是用户编写的Python代码，下层是Rust实现的增量计算引擎。Differential Dataflow算法驱动核心数据流处理，而Timely Dataflow提供分布式计算能力。

```mermaid
graph TD
    A[Python用户代码] --> B[PyO3绑定层]
    B --> C[Rust引擎核心]
    C --> D[Differential Dataflow]
    C --> E[Timely Dataflow]
    D --> F[增量计算]
    E --> G[分布式数据流]
    F --> H[结果返回Python]
    G --> H
```

## PyO3绑定机制

Pathway使用PyO3库实现Python与Rust之间的双向通信。PyO3提供了从Rust向Python暴露类型和函数的能力，同时也支持从Python代码调用Rust函数。

### 核心数据结构绑定

#### PythonSubject结构体

`PythonSubject`是Pathway中用于连接外部数据源的关键结构体，它将Python端的回调函数包装为Rust可调用的对象：

```rust
#[pyclass(module = "pathway.engine")]
#[derive(Clone)]
pub struct PythonSubject {
    pub start: Py<PyAny>,
    pub read: Py<PyAny>,
    pub seek: Py<PyAny>,
    pub on_persisted_run: Py<PyAny>,
    pub end: Py<PyAny>,
    pub is_internal: bool,
    pub deletions_enabled: bool,
}
```

资料来源：[src/python_api.rs:1-15]()

| 参数 | 类型 | 说明 |
|------|------|------|
| `start` | `Py<PyAny>` | 起始位置回调函数 |
| `read` | `Py<PyAny>` | 读取数据回调函数 |
| `seek` | `Py<PyAny>` | 定位回调函数 |
| `on_persisted_run` | `Py<PyAny>` | 持久化运行回调 |
| `end` | `Py<PyAny>` | 结束位置回调 |
| `is_internal` | `bool` | 是否为内部数据源 |
| `deletions_enabled` | `bool` | 是否启用删除操作 |

#### ValueField结构体

`ValueField`用于定义表字段的元数据，包括字段名、类型、来源和默认值：

```rust
#[pyclass(module = "pathway.engine", frozen)]
#[derive(Clone)]
pub struct ValueField {
    #[pyo3(get)]
    pub name: String,
    #[pyo3(get)]
    pub type_: Type,
    #[pyo3(get)]
    pub source: FieldSource,
    #[pyo3(get)]
    pub default: Option<Value>,
    #[pyo3(get)]
    pub metadata: Option<String>,
}
```

资料来源：[src/python_api.rs:36-48]()

### PyO3类型转换

Pathway实现了Python与Rust之间的自动类型转换，通过`IntoPyObject`和`FromPyObject`trait实现。

#### Timestamp类型转换

```rust
impl<'py> IntoPyObject<'py> for Timestamp {
    type Target = PyAny;
    type Output = Bound<'py, Self::Target>;
    type Error = PyErr;
    fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
        self.0.into_bound_py_any(py)
    }
}
```

资料来源：[src/engine/timestamp.rs:40-50]()

#### TotalFrontier枚举转换

`TotalFrontier`表示计算的前沿边界，包含`At`和`Done`两种状态：

```rust
impl<'py, T> FromPyObject<'py> for TotalFrontier<T>
where
    T: FromPyObject<'py>,
{
    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
        if ob.is_instance_of::<Done>() {
            Ok(TotalFrontier::Done)
        } else {
            Ok(TotalFrontier::At(ob.extract()?))
        }
    }
}
```

资料来源：[src/python_api.rs:180-192]()

## 错误处理机制

Pathway通过PyO3暴露多种异常类型供Python端捕获和处理。

### 异常类型体系

| 异常类型 | 说明 | 使用场景 |
|----------|------|----------|
| `EngineError` | 基础引擎错误 | 通用错误处理 |
| `EngineErrorWithTrace` | 带调用栈的错误 | 调试和问题定位 |
| `OtherWorkerError` | 其他工作进程错误 | 分布式环境错误处理 |
| `Error` | 通用错误类型 | 自定义错误 |
| `Pending` | 操作未完成状态 | 异步操作状态查询 |

资料来源：[src/python_api.rs:200-230]()

## 表导出与快照机制

### PyExportedTable导出表

`PyExportedTable`允许Python代码访问Rust引擎计算的表数据：

```rust
#[pyclass(module = "pathway.engine", frozen, name = "ExportedTable")]
pub struct PyExportedTable {
    inner: Arc<dyn ExportedTable>,
}

#[pymethods]
impl PyExportedTable {
    fn frontier(&self) -> TotalFrontier<Timestamp> {
        self.inner.frontier()
    }

    fn snapshot_at(&self, frontier: TotalFrontier<Timestamp>) -> Vec<(Key, Vec<Value>)> {
        self.inner.snapshot_at(frontier)
    }

    fn failed(&self) -> bool {
        self.inner.failed()
    }
}
```

资料来源：[src/python_api.rs:115-135]()

### Done标记类型

`Done`是一个特殊的冻结类型，用于表示计算已完成的状态：

```rust
mod done {
    use once_cell::sync::Lazy;
    use pyo3::prelude::*;

    #[pyclass(module = "pathway.engine", frozen)]
    pub struct Done(InnerDone);

    pub static DONE: Lazy<Py<Done>> = Lazy::new(|| {
        Python::with_gil(|py| Py::new(py, Done(InnerDone)).expect("creating DONE should not fail"))
    });
}
```

资料来源：[src/python_api.rs:165-177]()

## 计算上下文

### Context作用域管理

`Context`结构体提供了当前计算行的上下文信息：

```rust
#[pyclass(module = "pathway.engine", frozen)]
pub struct Context(SendWrapper<ScopedContext>);

#[pymethods]
impl Context {
    #[getter]
    fn this_row(&self) -> PyResult<Key> {
        self.0
            .with(|context| context.this_row())
            .ok_or_else(|| PyValueError::new_err("context out of scope"))
    }
}
```

资料来源：[src/python_api.rs:235-250]()

### Computer计算器

`Computer`用于执行用户定义的Python函数：

```rust
#[pyclass]
pub struct Computer {
    pub fun: Py<PyAny>,
    pub dtype: Py<PyAny>,
    pub is_output: bool,
    pub is_method: bool,
    pub universe: Py<Universe>,
    pub data: Value,
    pub data_column: Option<Py<Column>>,
}

#[pymethods]
impl Computer {
    #[staticmethod]
    #[pyo3(signature = (fun, dtype, is_output, is_method, universe, data = Value::None, data_column = None))]
    pub fn from_raising_fun(
        py: Python,
        fun: Py<PyAny>,
        dtype: Py<PyAny>,
        is_output: bool,
        is_method: bool,
        universe: Py<Universe>,
        data: Value,
        data_column: Option<Py<Column>>,
    ) -> PyResult<Py<Self>> { ... }
}
```

资料来源：[src/python_api.rs:260-290]()

## Differential Dataflow集成

### Arrangement导入机制

Pathway通过Differential Dataflow的arrangement机制在数据流之间共享计算结果：

```rust
/// Same as `import`, but allows to name the source.
pub fn import_named<G>(&mut self, scope: &G, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
    G: Scope<Timestamp=Tr::Time>,
    Tr::Time: Timestamp,
{
    // Drop ShutdownButton and return only the arrangement.
    self.import_core(scope, name).0
}
```

资料来源：[external/differential-dataflow/src/operators/arrange/agent.rs:35-45]()

### Trace Freeze包装器

`TraceFreeze`包装器允许延迟计算trace直到被使用时才执行：

```rust
pub struct TraceFreeze<Tr, F>
where
    Tr: TraceReader,
    Tr::Time: Lattice+Clone+'static,
    F: Fn(&Tr::Time)->Option<Tr::Time>,
{
    trace: Tr,
    func: Rc<F>,
}
```

资料来源：[external/differential-dataflow/src/trace/wrappers/freeze.rs:15-21]()

## 时间戳与顺序语义

### Timestamp特征实现

Pathway实现了Differential Dataflow所需的时间戳特征：

```rust
impl OriginalOrRetraction for Timestamp {
    fn is_original(&self) -> bool {
        self.0.is_multiple_of(2)
    }
}

impl NextRetractionTime for Timestamp {
    fn next_retraction_time(&self) -> Self {
        Self(self.0 + 1)
    }
}
```

资料来源：[src/engine/timestamp.rs:60-70]()

### 时间戳顺序关系

Timestamp支持时间戳间的偏序关系计算：

```rust
pub fn followed_by(&self, other: &Self) -> Option<Self> {
    self.0.followed_by(&other.0).map(Self)
}
```

## 注册的Python类

Pathway通过`add_class`方法将以下Rust结构体暴露给Python：

| 类名 | 功能 |
|------|------|
| `AwsS3Settings` | AWS S3连接配置 |
| `AzureBlobStorageSettings` | Azure Blob存储配置 |
| `ElasticSearchParams` | Elasticsearch参数 |
| `CsvParserSettings` | CSV解析设置 |
| `DataStorage` | 数据存储配置 |
| `DataFormat` | 数据格式定义 |
| `PersistenceConfig` | 持久化配置 |
| `PythonSubject` | Python数据源主题 |
| `MqttSettings` | MQTT连接设置 |
| `IcebergCatalogSettings` | Iceberg目录配置 |
| `ConnectorProperties` | 连接器属性 |
| `ColumnProperties` | 列属性 |
| `TableProperties` | 表属性 |
| `PyExportedTable` | 导出表 |
| `PyExternalIndexFactory` | 外部索引工厂 |
| `PyUSearchMetricKind` | USearch度量类型 |
| `PyBruteForceKnnMetricKind` | 暴力KNN度量类型 |

资料来源：[src/python_api.rs:300-330]()

## 数据流处理流程

```mermaid
sequenceDiagram
    participant Python用户代码
    participant PyO3绑定层
    participant Rust引擎
    participant DifferentialDataflow
    participant TimelyDataflow
    
    Python用户代码->>PyO3绑定层: 调用Python API
    PyO3绑定层->>Rust引擎: 传递计算请求
    Rust引擎->>DifferentialDataflow: 创建数据流操作符
    DifferentialDataflow->>TimelyDataflow: 分布式执行
    TimelyDataflow-->>DifferentialDataflow: 返回计算结果
    DifferentialDataflow-->>Rust引擎: 增量更新结果
    Rust引擎-->>PyO3绑定层: 转换结果类型
    PyO3绑定层-->>Python用户代码: 返回Python对象
```

## 多工作进程支持

Pathway支持通过Timely Dataflow的多工作进程实现并行计算。可以通过命令行参数指定工作进程数量：

```bash
cargo run --example wordcount -- -w2
```

这将启动两个工作进程并行处理数据，每个进程独立消费输入数据并产生部分结果。

资料来源：[external/timely-dataflow/mdbook/src/chapter_2/chapter_2_5.md:1-25]()

## 总结

Pathway的Python与Rust集成架构充分发挥了两种语言的优势：

- **Python端**：提供简洁的声明式API，开发者使用Python编写数据处理逻辑
- **Rust端**：实现高性能的增量计算引擎，基于Differential Dataflow提供高效的流式处理
- **PyO3**：作为桥梁，实现Python与Rust之间的无缝类型转换和函数调用
- **Timely Dataflow**：提供分布式计算能力，支持多工作进程并行处理

---

<a id='connectors-overview'></a>

## 连接器概述

### 相关页面

相关主题：[数据库连接器](#connectors-database), [流数据连接器](#connectors-streaming)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [src/python_api.rs](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs) — Python API 绑定定义，包括连接器相关的 Rust 类型映射
- [README.md](https://github.com/pathwaycom/pathway/blob/main/README.md) — 项目总体介绍
- [examples/projects/question-answering-rag/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/projects/question-answering-rag/README.md) — RAG 示例项目
- [examples/projects/ag2-multiagent-rag/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/projects/ag2-multiagent-rag/README.md) — 多智能体 RAG 示例
- [examples/templates/el-pipeline/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/templates/el-pipeline/README.md) — 流水线模板文档

</details>

# 连接器概述

## 1. 概述

Pathway 是一个 Python ETL 框架，专为流处理、实时分析、LLM 管道和 RAG（检索增强生成）而设计 [资料来源：README.md](README.md)。连接器（Connector）是 Pathway 数据管道中的核心组件，负责数据的输入与输出操作。

Pathway 连接器系统具有以下特点：

- **多源数据支持**：支持文件系统、数据库、云存储、消息队列等多种数据源
- **实时与批处理**：同一套 API 支持静态批处理和流式实时处理
- **Rust 引擎驱动**：底层由 Rust 编写的可扩展引擎驱动，基于 Differential Dataflow 实现增量计算
- **Python 友好**：通过 Python API 提供简洁易用的接口

## 2. 连接器架构

### 2.1 核心组件关系

```mermaid
graph TD
    subgraph "Python API 层"
        IO["pw.io 模块"]
        DS["DataSource"]
        DK["DataSink"]
    end
    
    subgraph "Rust 引擎层"
        CP["ConnectorProperties"]
        CM["ConnectorMode"]
        Engine["Pathway Engine"]
    end
    
    subgraph "数据源类型"
        S3["AWS S3"]
        Azure["Azure Blob"]
        CSV["CSV Parser"]
        MQTT["MQTT"]
        Kafka["Kafka/Confluent"]
    end
    
    IO --> DS
    IO --> DK
    DS --> CP
    DK --> CP
    CP --> CM
    CM --> Engine
    S3 --> CP
    Azure --> CP
    CSV --> CP
    MQTT --> CP
```

### 2.2 连接器模式

Pathway 支持两种连接器模式，用于区分数据处理方式 [资料来源：src/python_api.rs:320-329](src/python_api.rs)：

| 模式 | 枚举值 | 说明 |
|------|--------|------|
| 静态模式 | `ConnectorMode.STATIC` | 批处理模式，一次性加载所有数据 |
| 流式模式 | `ConnectorMode.STREAMING` | 实时流处理，持续监听数据变化 |

```rust
#[pymethods]
impl PyConnectorMode {
    #[classattr]
    pub const STATIC: ConnectorMode = ConnectorMode::Static;
    #[classattr]
    pub const STREAMING: ConnectorMode = ConnectorMode::Streaming;
}
```

## 3. 数据源（DataSource）

数据源是连接器系统中负责从外部系统读取数据的组件。在 Pathway 中，数据源通过 Python Subject 接口与 Rust 引擎交互。

### 3.1 PythonSubject 结构

`PythonSubject` 是 Python 层与 Rust 引擎之间的桥梁，封装了数据读取所需的所有回调函数 [资料来源：src/python_api.rs:15-39](src/python_api.rs)：

```rust
#[pyclass(module = "pathway.engine")]
#[derive(Clone)]
pub struct PythonSubject {
    pub start: Py<PyAny>,              // 初始化回调
    pub read: Py<PyAny>,               // 读取数据回调
    pub seek: Py<PyAny>,               // 寻址回调（可选）
    pub on_persisted_run: Py<PyAny>,    // 持久化运行回调
    pub end: Py<PyAny>,                // 结束回调
    pub is_internal: bool,             // 是否为内部数据源
    pub deletions_enabled: bool,       // 是否启用删除操作
}
```

### 3.2 数据读取流程

```mermaid
sequenceDiagram
    participant App as 应用代码
    participant DS as DataSource
    participant Subject as PythonSubject
    participant Engine as Pathway Engine
    
    App->>DS: 创建数据源实例
    DS->>Subject: 调用 start() 初始化
    Engine->>Subject: 调用 read() 读取数据
    Subject-->>Engine: 返回数据批次
    loop 流式处理
        Engine->>Subject: 调用 read() 轮询新数据
        Subject-->>Engine: 返回更新数据
    end
    Engine->>Subject: 调用 end() 清理资源
```

## 4. 数据输出（DataSink）

数据 Sink 负责将处理后的数据输出到外部系统。Pathway 提供了多种输出连接器，包括文件系统、数据库和消息队列等。

### 4.1 输出配置参数

| 参数 | 类型 | 说明 |
|------|------|------|
| `host` | string | 服务器主机地址 |
| `port` | int | 服务器端口 |
| `format` | DataFormat | 输出数据格式 |
| `storage` | DataStorage | 存储配置 |
| `create_table` | bool | 是否创建输出表 |

## 5. 支持的连接器类型

### 5.1 云存储连接器

Pathway 原生支持主流云存储服务：

| 服务 | 设置类 | 说明 |
|------|--------|------|
| AWS S3 | `AwsS3Settings` | 亚马逊 S3 对象存储 |
| Azure Blob | `AzureBlobStorageSettings` | 微软 Azure Blob 存储 |

### 5.2 消息队列连接器

| 消息系统 | 设置类 | 说明 |
|----------|--------|------|
| MQTT | `MqttSettings` | IoT 场景常用的轻量级消息协议 |
| Kafka | SchemaRegistrySettings | Confluent Schema Registry 集成 |

### 5.3 数据库连接器

| 数据库 | 设置类 | 说明 |
|--------|--------|------|
| PostgreSQL | `PsqlReplicationSettings` | 支持 CDC（变更数据捕获）复制 |

### 5.4 其他专用连接器

- **ElasticSearch**：`ElasticSearchParams` / `ElasticSearchAuth` — 全文搜索引擎集成
- **Iceberg**：`IcebergCatalogSettings` — Apache Iceberg 表格式支持
- **Schema Registry**：`PySchemaRegistrySettings` — Avro/Protobuf Schema 管理

### 5.5 解析器设置

```rust
m.add_class::<CsvParserSettings>()?;
m.add_class::<ValueField>()?;
m.add_class::<DataStorage>()?;
m.add_class::<DataFormat>()?;
```

## 6. 连接器配置属性

### 6.1 ConnectorProperties

连接器属性类定义了数据源和输出端的通用配置选项 [资料来源：src/python_api.rs:450-465](src/python_api.rs)：

| 属性 | 说明 |
|------|------|
| `columns` | 列定义列表 |
| `primary_key` | 主键列 |
| `date_format` | 日期格式字符串 |
| `value_fields` | 值字段列表 |

### 6.2 ColumnProperties

列属性用于定义单个列的元数据：

```python
pw.ColumnProperties(
    name="column_name",
    type_=pw.Type,
    source=pw.FieldSource.Payload,
    default=None,
    metadata=None
)
```

## 7. 使用示例

### 7.1 RAG 应用中的连接器使用

在问答型 RAG 应用中，Pathway 连接器用于实时索引文档并提供检索服务 [资料来源：examples/projects/question-answering-rag/README.md](examples/projects/question-answering-rag/README.md)：

```python
import pathway as pw

# 创建 HTTP 服务器用于接收查询
webserver = pw.io.http.PathwayWebserver(host="0.0.0.0", port=8011)

# 数据源：文件系统连接器监听 ./data/ 目录
# 文档被分块、向量化后存储
# 输出：通过 REST API 提供 /v1/retrieve 检索接口
```

### 7.2 多智能体 RAG 中的连接器

在多智能体 RAG 架构中，连接器同时承担文档索引和智能体间通信的任务 [资料来源：examples/projects/ag2-multiagent-rag/README.md](examples/projects/ag2-multiagent-rag/README.md)：

```mermaid
graph LR
    A["Documents (./data/)"] -->|实时索引| B["Pathway VectorStoreServer"]
    B -->|HTTP /v1/retrieve| C["AG2 Researcher Agent"]
    C -->|search_documents| B
    B -->|检索结果| D["AG2 Analyst Agent"]
    D -->|综合回答| E["用户"]
```

## 8. 连接器的高级特性

### 8.1 持久化配置

Pathway 支持连接器级别的持久化配置，用于故障恢复和状态管理：

```rust
m.add_class::<PersistenceConfig>()?;
m.add_class::<PyPersistenceMode>()?;
m.add_class::<PySnapshotAccess>()?;
m.add_class::<PySnapshotEvent>()?;
```

### 8.2 会话类型

Pathway 引擎支持不同类型的会话处理 [资料来源：src/python_api.rs:334-345](src/python_api.rs)：

| 会话类型 | 枚举值 | 使用场景 |
|----------|--------|----------|
| 原生会话 | `SessionType.NATIVE` | 标准批处理和流处理 |
| UPSERT 会话 | `SessionType.UPSERT` | 增量更新场景 |

### 8.3 监控与追踪

Pathway 提供了完善的监控机制，包括：

- **TelemetryConfig**：遥测数据收集配置
- **BackfillingThreshold**：回填数据阈值控制
- **EngineTrace**：引擎执行追踪

## 9. 最佳实践

1. **选择合适的连接器模式**：静态数据使用 `STATIC` 模式，实时数据使用 `STREAMING` 模式
2. **配置主键**：为数据源设置主键以支持增量更新和去重
3. **设置超时和重试**：生产环境应配置适当的重试策略
4. **监控连接器状态**：使用 Pathway Dashboard 监控消息数量和延迟
5. **处理删除操作**：根据业务需求通过 `deletions_enabled` 参数控制是否处理数据删除

## 10. 相关文档

- [入门指南](https://pathway.com/developers/user-guide/introduction/welcome)
- [API 文档](https://pathway.com/developers/api-docs/pathway)
- [模板示例](/developers/templates?tab=ai-pipelines)
- [部署指南](#deployment)

---

<a id='connectors-database'></a>

## 数据库连接器

### 相关页面

相关主题：[连接器概述](#connectors-overview), [流数据连接器](#connectors-streaming)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [src/python_api.rs](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)
- [README.md](https://github.com/pathwaycom/pathway/blob/main/README.md)

</details>

# 数据库连接器

## 概述

Pathway 的数据库连接器模块提供了与各类数据库系统交互的能力，支持从外部数据源读取数据并将处理结果写入目标数据库。作为 Pathway ETL 框架的核心组件之一，数据库连接器实现了实时的增量数据同步机制，能够在流处理和批处理场景下保持数据的一致性和时效性。

Pathway 的数据库连接器基于 Rust 引擎构建，结合 Differential Dataflow 计算模型，实现了高效的增量计算能力。这意味着当源数据库中的数据发生变化时，连接器能够自动检测变更并仅处理发生变化的记录，而不是重新处理整个数据集。

## 架构设计

### 连接器类型体系

Pathway 在 `src/python_api.rs` 中定义了一套统一的连接器配置接口，通过 PyO3 绑定暴露给 Python 用户。连接器配置通过 `ConnectorProperties`、`ColumnProperties` 和 `TableProperties` 等类进行封装，支持对数据源的精细化配置。

```python
m.add_class::<ConnectorProperties>()?;
m.add_class::<ColumnProperties>()?;
m.add_class::<TableProperties>()?;
```

资料来源：[src/python_api.rs:行](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)

### 连接器模式

Pathway 支持两种连接器运行模式，通过 `ConnectorMode` 枚举定义：

| 模式 | 说明 |
|------|------|
| `STATIC` | 静态模式，适用于批处理场景，数据源仅读取一次 |
| `STREAMING` | 流式模式，适用于实时数据处理，支持持续监听数据变更 |

```rust
#[pymethods]
impl PyConnectorMode {
    #[classattr]
    pub const STATIC: ConnectorMode = ConnectorMode::Static;
    #[classattr]
    pub const STREAMING: ConnectorMode = ConnectorMode::Streaming;
}
```

资料来源：[src/python_api.rs:行](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)

## 支持的数据库类型

### 关系型数据库

Pathway 原生支持多种主流关系型数据库的连接：

- **PostgreSQL**：通过 `PsqlReplicationSettings` 提供复制槽支持，用于捕获数据库变更
- **MySQL**：支持标准的读取连接配置
- **Microsoft SQL Server**：通过 MSSQL 连接器支持企业级应用

### NoSQL 数据库

- **MongoDB**：支持文档型数据库的实时读取和写入
- **Elasticsearch**：提供全文检索场景下的数据索引能力

### 云存储与服务

Pathway 还集成了多种云服务的连接配置：

- **AWS S3**：`AwsS3Settings` 支持从 S3 存储桶读取数据文件
- **Azure Blob Storage**：`AzureBlobStorageSettings` 支持 Azure 云存储
- **Iceberg Catalog**：`IcebergCatalogSettings` 支持数据湖架构

```rust
m.add_class::<AwsS3Settings>()?;
m.add_class::<AzureBlobStorageSettings>()?;
m.add_class::<ElasticSearchParams>()?;
m.add_class::<IcebergCatalogSettings>()?;
m.add_class::<PsqlReplicationSettings>()?;
```

资料来源：[src/python_api.rs:行](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)

## 数据处理流程

```mermaid
graph TD
    A[外部数据库] --> B[连接器读取层]
    B --> C[变更捕获]
    C --> D[Rust 引擎处理]
    D --> E[增量计算]
    E --> F[下游输出]
    
    G[Schema 定义] --> D
    H[连接配置] --> B
```

### 读取流程

1. **连接初始化**：连接器根据配置参数建立与目标数据库的连接
2. **Schema 映射**：通过 `ValueField` 定义输入数据的字段映射和类型转换
3. **变更捕获**：在流式模式下持续监听数据变更事件
4. **数据传输**：变更数据通过 Pathway 引擎进行增量处理

### 写入流程

1. **输出表定义**：通过 `is_output=True` 标记输出计算节点
2. **目标配置**：指定目标数据库的连接信息和写入策略
3. **批量写入**：引擎自动将处理结果批量写入目标系统

## Schema 定义

Pathway 使用 `ValueField` 结构体定义数据的 Schema 映射：

```rust
#[pyclass(module = "pathway.engine")]
#[derive(Clone)]
pub struct ValueField {
    #[pyo3(get)]
    pub name: String,
    #[pyo3(get)]
    pub type_: Type,
    #[pyo3(get)]
    pub source: FieldSource,
    #[pyo3(get)]
    pub default: Option<Value>,
    #[pyo3(get)]
    pub metadata: Option<String>,
}
```

资料来源：[src/python_api.rs:行](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)

字段属性说明：

| 属性 | 类型 | 说明 |
|------|------|------|
| name | String | 字段名称 |
| type_ | Type | 数据类型 |
| source | FieldSource | 字段来源（Payload/Materialized） |
| default | Option<Value> | 默认值 |
| metadata | Option<String> | 元数据信息 |

## 时间戳与前沿管理

Pathway 通过 `TotalFrontier<T>` 管理数据处理的时间戳前沿，实现精确的增量计算：

```rust
#[pymethods]
impl PyExportedTable {
    fn frontier(&self) -> TotalFrontier<Timestamp> {
        self.inner.frontier()
    }

    fn snapshot_at(&self, frontier: TotalFrontier<Timestamp>) -> Vec<(Key, Vec<Value>)> {
        self.inner.snapshot_at(frontier)
    }
}
```

资料来源：[src/python_api.rs:行](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)

`TotalFrontier` 有两种状态：

- **`At(t)`**：表示处理到达时间戳 `t`，继续处理后续数据
- **`Done`**：表示数据处理完成，不再有新数据到达

```rust
impl<'py, T> FromPyObject<'py> for TotalFrontier<T> {
    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
        if ob.is_instance_of::<Done>() {
            Ok(TotalFrontier::Done)
        } else {
            Ok(TotalFrontier::At(ob.extract()?))
        }
    }
}
```

资料来源：[src/python_api.rs:行](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)

## 使用示例

### 基本配置流程

```python
import pathway as pw

# 定义输入连接器
class InputSchema(pw.Schema):
    id: int
    value: str

# 配置 PostgreSQL 输入
input_connector = pw.io.postgres.read(
    host="localhost",
    port=5432,
    database="mydb",
    user="user",
    password="password",
    table="source_table",
    schema=InputSchema,
    mode=pw ConnectorMode.STREAMING
)

# 定义处理逻辑
result = input_connector.groupby(pw.this.id).reduce(
    id=pw.this.id,
    count=pw.reducers.count()
)

# 配置输出连接器
pw.io.postgres.write(
    result,
    host="localhost",
    port=5432,
    database="mydb",
    user="user",
    password="password",
    table="output_table"
)

# 运行管道
pw.run()
```

### 与向量存储集成

Pathway 的数据库连接器可以与向量索引功能结合使用，实现 RAG（检索增强生成）场景：

```mermaid
graph LR
    A[PostgreSQL/MySQL] --> B[Pathway 引擎]
    B --> C[向量嵌入]
    C --> D[向量索引]
    D --> E[LLM 查询]
```

## 监控与故障处理

Pathway 的 `PyExportedTable` 提供了内置的监控接口：

```rust
#[pymethods]
impl PyExportedTable {
    fn failed(&self) -> bool {
        self.inner.failed()
    }
}
```

资料来源：[src/python_api.rs:行](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)

通过检查 `failed()` 状态，用户可以及时发现数据处理过程中的异常情况，并采取相应的恢复措施。

## 性能优化建议

1. **批量处理**：利用 Pathway 的增量计算特性，避免全量数据重处理
2. **索引优化**：确保源数据库的相关字段已建立索引
3. **连接池**：合理配置连接池大小以平衡资源使用和吞吐量
4. **Schema 精简**：仅提取必要的字段，减少数据传输量

---

<a id='connectors-streaming'></a>

## 流数据连接器

### 相关页面

相关主题：[连接器概述](#connectors-overview)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [src/python_api.rs](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs) - Python绑定和Rust连接器接口定义
- [examples/projects/ag2-multiagent-rag/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/projects/ag2-multiagent-rag/README.md) - RAG项目中的连接器使用示例
- [examples/projects/question-answering-rag/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/projects/question-answering-rag/README.md) - 问答RAG系统架构
- [examples/projects/web-scraping/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/projects/web-scraping/README.md) - Web数据抓取连接器实现
- [README.md](https://github.com/pathwaycom/pathway/blob/main/README.md) - Pathway框架总体架构
- [src/engine/timestamp.rs](https://github.com/pathwaycom/pathway/blob/main/src/engine/timestamp.rs) - 时间戳和增量计算引擎
</details>

# 流数据连接器

## 概述

Pathway的流数据连接器（Stream Connectors）是Framework与外部数据源之间进行实时数据交换的核心组件。这些连接器使得Pathway能够从Kafka、NATS、MQTT、RabbitMQ、Redpanda等消息队列系统实时读取数据，并将处理结果写入各种数据sink。

Pathway的连接器系统构建在Differential Dataflow增量计算引擎之上，支持两种主要的运行模式：静态模式（Static）和流式模式（Streaming）。静态模式适用于批处理场景，而流式模式则专为实时数据处理设计，能够处理持续到来的数据更新。连接器系统还支持增量计算的语义，允许系统只处理自上次处理以来的变更，而不是重新处理整个数据集。

## 核心架构

### 连接器模式体系

Pathway定义了一套完整的连接器模式枚举，用于区分不同的数据处理语义：

```python
class ConnectorMode(Enum):
    STATIC = auto()    # 静态批处理模式
    STREAMING = auto() # 实时流处理模式
```

每种模式对应不同的数据处理策略。静态模式下，连接器假设数据源是有限的，处理完成后系统进入完成状态。流式模式下，连接器持续监听数据源，系统永远不会自然结束，除非外部终止。

### PythonSubject核心接口

所有自定义数据源连接器都继承自`PythonSubject`基类，该类定义了数据源的核心接口：

```python
class PythonSubject:
    def __init__(
        self,
        start: Callable[[], None],           # 启动数据源
        read: Callable[[], Iterator[Row]],   # 读取数据行
        seek: Callable[[Any], None],         # 定位到指定位置
        on_persisted_run: Callable[[], None], # 持久化运行回调
        end: Callable[[], bool],             # 检查是否结束
        is_internal: bool,                   # 是否内部数据源
        deletions_enabled: bool              # 是否启用删除
    )
```

资料来源：[src/python_api.rs:21-31]()

`PythonSubject`的设计体现了Pathway连接器的核心哲学：数据源被抽象为一个可以随时启动、读取、定位和检查完成状态的可迭代对象。这种设计使得连接器能够与Differential Dataflow的计算模型无缝集成。

### 字段定义与数据类型

`ValueField`类定义了表中每个字段的元信息，包括名称、类型、来源和默认值：

```python
class ValueField:
    name: str           # 字段名称
    type_: Type         # 数据类型
    source: FieldSource # 字段来源 (默认: Payload)
    default: Optional[Value]  # 默认值
    metadata: Optional[str]   # 元数据
```

`FieldSource`枚举标识了数据的来源类型，最常用的是`FieldSource.Payload`，表示数据来自消息的有效载荷。在复杂的流处理场景中，字段来源还可以标识时间戳、主键等特殊字段。

## 工作原理

### 数据流处理管道

Pathway的流数据连接器遵循一个清晰的数据处理管道：

```mermaid
graph TD
    A[外部数据源] --> B[Connector启动]
    B --> C[数据读取循环]
    C --> D{数据有效?}
    D -->|是| E[解析消息]
    D -->|否| F[跳过/日志]
    E --> G[转换为Pathway Row]
    G --> H[输入到Differential Dataflow]
    H --> I[增量计算]
    I --> J[结果输出到Sink]
    J --> C
    F --> C
```

### 增量计算与时间戳

Pathway的连接器系统深度集成了Differential Dataflow的增量计算能力。每个数据更新都带有时间戳，系统只处理自上次检查点以来的变更。`Timestamp`类型实现了`OriginalOrRetraction`特性，用于区分原始数据和撤回数据：

```python
class Timestamp:
    def is_original(self) -> bool:
        # 偶数时间戳表示原始数据
        return self.0.is_multiple_of(2)
    
    def next_retraction_time(self) -> Self:
        # 奇数时间戳表示撤回
        return Self(self.0 + 1)
```

资料来源：[src/engine/timestamp.rs:45-53]()

这种设计允许连接器高效处理数据的插入、更新和删除操作，而无需重新处理整个数据集。

### 持久化与状态恢复

连接器支持持久化运行模式，通过`on_persisted_run`回调函数实现故障恢复：

1. **检查点保存**：系统在适当的时间点保存当前的处理状态
2. **故障恢复**：重新启动时，从最后一个检查点恢复
3. **数据重放**：根据保存的seek位置，重新从数据源读取未处理的数据

这种机制确保了在分布式环境中，即使发生节点故障，数据也不会丢失，处理可以透明地继续。

## 预置连接器

Pathway提供了多个开箱即用的流数据连接器，覆盖了主流的消息队列系统：

| 连接器 | 协议 | 典型用途 | 文档链接 |
|--------|------|----------|----------|
| Kafka | TCP二进制协议 | 大规模流处理 | [kafka/__init__.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/io/kafka/__init__.py) |
| Redpanda | Kafka兼容API | 高性能流处理 | [redpanda/__init__.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/io/redpanda/__init__.py) |
| NATS | NATS协议 | 轻量级消息传递 | [nats/__init__.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/io/nats/__init__.py) |
| MQTT | MQTT协议 | IoT设备数据采集 | [mqtt/__init__.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/io/mqtt/__init__.py) |
| RabbitMQ | AMQP协议 | 企业消息队列 | [rabbitmq/__init__.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/io/rabbitmq/__init__.py) |

### 连接器配置参数

每种连接器都支持一组通用的配置参数：

| 参数 | 类型 | 说明 | 默认值 |
|------|------|------|--------|
| `mode` | ConnectorMode | 运行模式 | STREAMING |
| `deletions_enabled` | bool | 是否处理删除事件 | false |
| `autocommit_duration` | timedelta | 自动提交间隔 | None |
| `timeout` | timedelta | 连接超时时间 | 30秒 |
| `schema` | ValueField[] | 输出表模式 | 自动推断 |

## 使用场景

### 实时文档索引

Pathway的连接器常用于构建实时RAG（检索增强生成）系统。以下架构展示了典型的用法：

```mermaid
graph LR
    A[文档数据源] --> B[Pathway连接器]
    B --> C[数据预处理]
    C --> D[文本分块]
    D --> E[嵌入模型]
    E --> F[向量索引]
    F --> G[HTTP API]
    G --> H[LLM查询]
```

资料来源：[examples/projects/ag2-multiagent-rag/README.md:12-20]()

在ag2-multiagent-rag项目中，Pathway连接器负责监听文件系统中的文档变化，实时索引新文档并更新向量存储。外部AI代理通过REST API查询相关文档，系统保证始终返回最新的索引结果。

### Web数据抓取管道

Pathway连接器也广泛用于构建数据采集管道：

```mermaid
graph TD
    A[新闻网站] --> B[Web爬虫连接器]
    B --> C[内容解析]
    C --> D[元数据提取]
    D --> E[数据清洗]
    E --> F[JSON Lines输出]
    F --> G[下游处理]
```

资料来源：[examples/projects/web-scraping/README.md:1-50]()

自定义的`NewsScraperSubject`继承自Pathway的`ConnectorSubject`，实现了文章URL作为主键的数据表结构，支持增量更新和去重。

### 企业数据集成

对于企业级应用，Pathway的RabbitMQ和Kafka连接器能够与现有的消息基础设施无缝集成：

```python
import pathway as pw

class MyConnectorSubject(pw.io.ConnectorSubject):
    def __init__(self):
        super().__init__()
        self._client = create_rabbitmq_client()
    
    def start(self):
        self._client.connect()
    
    def read(self):
        while message := self._client.consume():
            yield self._parse_message(message)
    
    def end(self):
        return self._client.is_idle_timeout()
```

## 与Differential Dataflow的集成

Pathway的连接器系统是Differential Dataflow在Python生态系统中的桥接层。Rust核心负责：

1. **数据流的组织**：使用Trace数据结构维护历史状态
2. **增量计算**：只处理实际发生的变更，而非全量数据
3. **多 worker 支持**：通过`pathway spawn --threads N`启用多线程处理

Python API提供了高级抽象，降低了使用门槛，同时保持了Rust引擎的高性能特性。这种设计使得开发者能够用Python编写流处理逻辑，而底层执行效率与原生Rust程序相当。

## 最佳实践

### 模式定义

在创建连接器时，应显式定义输出表的schema，包括所有字段的名称、类型和默认值：

```python
schema = pw.Schema(
    fields={
        "id": pw.FieldType.STRING,
        "content": pw.FieldType.STRING,
        "timestamp": pw.FieldType.DATETIME,
    },
    primary_key=["id"]
)
```

### 错误处理

连接器应实现健壮的错误处理机制：

```python
def read(self):
    try:
        yield from self._fetch_data()
    except TransientError:
        # 临时错误，等待后重试
        time.sleep(1)
        yield from self.read()
    except PermanentError:
        # 永久错误，标记连接器失败
        self._mark_failed()
```

### 性能优化

1. **批量处理**：积累多条消息后再提交，减少I/O次数
2. **背压控制**：当处理速度跟不上数据到达速度时，使用`BackfillingThreshold`配置
3. **连接池复用**：在多worker场景下复用连接，避免资源耗尽

## 总结

Pathway的流数据连接器系统提供了一套完整的数据集成解决方案。通过统一的抽象接口，开发者可以轻松地将各种外部数据源接入Pathway的处理管道。连接器与Differential Dataflow的深度集成确保了即使在处理大规模实时数据时，也能保持高效的增量计算能力。

预置的Kafka、Redpanda、NATS、MQTT和RabbitMQ连接器覆盖了大多数流数据场景，而PythonSubject基类则允许开发者实现完全自定义的数据源连接器，满足各种特殊的集成需求。

---

<a id='trans-table-operations'></a>

## 表操作

### 相关页面

相关主题：[连接与聚合](#trans-joins-aggregations), [基础示例](#quickstart-basic-example)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [python/pathway/internals/table.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/internals/table.py)
- [python/pathway/internals/column.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/internals/column.py)
- [python/pathway/internals/expression.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/internals/expression.py)
- [src/python_api.rs](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)
- [src/engine/timestamp.rs](https://github.com/pathwaycom/pathway/blob/main/src/engine/timestamp.rs)
- [README.md](https://github.com/pathwaycom/pathway/blob/main/README.md)

</details>

# 表操作

## 概述

Pathway 是一个基于 Python 的 ETL 框架，其核心数据模型围绕**表（Table）**展开。表操作是 Pathway 数据处理流水线的基本构建单元，提供了对结构化数据进行转换、过滤、聚合和连接的能力。

Pathway 的表操作具有以下特性：

- **统一批流处理**：同一套 API 可处理静态数据集和实时流数据
- **增量计算**：基于 Rust 的 Differential Dataflow 引擎实现高效增量更新
- **声明式语法**：通过 Python 表达式定义数据转换逻辑

资料来源：[README.md:1-20]()

## 核心数据结构

### Table 类

`Table` 是 Pathway 中的核心数据结构，代表一个具有特定模式的分布式表格。

```mermaid
classDiagram
    class Table {
        +scope: Py[Scope]
        +handle: TableHandle
        +_is_deleted: bool
        +_app_id: str
        +__getitem__(key) TableSlice
        +select(*args) Table
        +update_rows(*args) Table
        +update_cells(*args) Table
        +filter(predicate) Table
        +groupby(*args) Table
        +join(*args) Table
        +join_inner(*args) Table
        +concat(*tables) Table
    }
    
    class LegacyTable {
        +universe: Universe
        +columns: Vec~Column~
        +to_engine() (UniverseHandle, Vec~ColumnHandle~)
        +from_handles(scope, handles) LegacyTable
    }
    
    Table --> Scope : 关联
    LegacyTable --> Universe : 关联
    LegacyTable --> Column : 包含
```

### 列（Column）与字段（ValueField）

每张表由多个列组成，每个列具有名称、类型和来源信息。

```python
# ValueField 定义 - 资料来源：src/python_api.rs:40-55
@dataclass
class ValueField:
    name: str                    # 列名
    type_: Type                  # 数据类型
    source: FieldSource          # 字段来源（Payload/Materialized）
    default: Optional[Value]     # 默认值
    metadata: Optional[str]      # 元数据
```

### 类型系统

| 类型 | 说明 | 对应 Rust 类型 |
|------|------|---------------|
| `INT` | 64位整数 | `i64` |
| `FLOAT` | 双精度浮点 | `f64` |
| `STR` | 字符串 | `String` |
| `BOOL` | 布尔值 | `bool` |
| `ANY` | 任意类型 | - |

资料来源：[src/python_api.rs:40-55]()

## 表的创建与初始化

### 从输入连接器创建

Pathway 提供多种连接器用于从不同数据源创建表：

```python
# 从文件系统创建表
table = pw.io.fs.read(
    "./data",
    schema=MySchema,
    format="csv"
)

# 从 HTTP 流读取
table = pw.io.http.read(
    "http://api.example.com/stream",
    schema=MySchema,
    json_field_paths={"data": "$.records[*]"}
)
```

### 表的内部状态

```mermaid
graph TD
    A[输入数据] --> B[Python Subject]
    B --> C[Raw Updates]
    C --> D[Rust Engine]
    D --> E[Universe Handle]
    D --> F[Column Handles]
    E --> G[Table]
    F --> G
    G --> H[快照 Snapshot]
```

表的内部状态由 `Universe` 和 `Column` 集合组成：

```python
# LegacyTable 内部结构 - 资料来源：src/python_api.rs:180-210
class LegacyTable:
    def __init__(self, universe, columns):
        self.universe = universe    # 行的唯一标识集合
        self.columns = columns      # 列的列表
    
    def to_engine(self):
        """转换为引擎句柄"""
        universe = self.universe.get()
        column_handles = [c.get().handle for c in self.columns]
        return (universe.handle, column_handles)
```

## 基础表操作

### 选择操作（select）

`select` 用于从表中提取指定列，可进行计算和重命名：

```python
# 基本选择
result = table.select(
    id=table.user_id,
    name=table.name,
    full_name=table.first_name + " " + table.last_name  # 表达式计算
)
```

### 过滤操作（filter）

`filter` 根据条件筛选行：

```python
# 过滤条件
active_users = users.filter(users.status == "active")
premium_orders = orders.filter(orders.amount > 100)
```

### 行更新（update_rows）

向表中添加或更新行：

```python
# 添加新行
updated_table = table.update_rows(
    pw.table.builder.make_table(
        id=[1, 2],
        name=["Alice", "Bob"],
        value=[10, 20]
    )
)
```

### 单元格更新（update_cells）

更新特定列的值：

```python
# 更新特定列
result = table.update_cells(
    status=pw.this.status.upper(),  # 将 status 列转为大写
    _id=table.id                     # 保留原 id 列
)
```

资料来源：[python/pathway/internals/table.py]()

## 聚合与分组

### 分组聚合（groupby）

`groupby` 按照指定键分组并执行聚合操作：

```mermaid
graph LR
    A[输入表] --> B[GroupBy Key]
    B --> C[每组聚合]
    C --> D[聚合函数]
    D --> E[输出表]
    
    subgraph 聚合函数
        F[count]
        G[sum]
        H[min/max]
        I[collect]
    end
    D --> F
    D --> G
    D --> H
    D --> I
```

```python
# 按类别聚合
category_stats = orders.groupby(orders.category).reduce(
    orders.category,
    total_amount=pw.reducers.sum(orders.amount),
    order_count=pw.reducers.count(),
    avg_price=pw.reducers.mean(orders.price)
)
```

### 可用聚合函数

| 函数 | 说明 | 示例 |
|------|------|------|
| `count()` | 计数 | `count()` |
| `sum(expr)` | 求和 | `sum(table.amount)` |
| `mean(expr)` | 平均值 | `mean(table.score)` |
| `min(expr)` | 最小值 | `min(table.latency)` |
| `max(expr)` | 最大值 | `max(table.price)` |
| `collect(expr)` | 收集为列表 | `collect(table.items)` |
| `sorted_tuple(expr)` | 排序元组 | `sorted_tuple(table.events)` |

## 表连接操作

### 连接的种类

| 连接类型 | 说明 | 空值处理 |
|----------|------|----------|
| `join` / `join_inner` | 内连接 | 丢弃无匹配的行 |
| `join_left` | 左外连接 | 保留左表所有行 |
| `join_right` | 右外连接 | 保留右表所有行 |
| `join_outer` | 全外连接 | 保留所有行 |

### 基本连接语法

```python
# 内连接
result = table1.join(
    table2,
    table1.key == table2.key
)

# 指定输出列
result = table1.join(
    table2,
    table1.id == table2.user_id,
    left=pw.left.id,
    right=pw.right.user_id,
    prefix="left_"
)
```

### 复杂连接场景

```python
# 多条件连接
orders_enriched = orders.join(
    customers,
    (orders.customer_id == customers.id) & 
    (orders.region == customers.region)
)
```

## 时间与版本控制

### 时间戳机制

Pathway 使用特殊的时间戳系统来追踪数据的版本和变更：

```python
# Timestamp 实现 - 资料来源：src/engine/timestamp.rs:1-30
impl Timestamp {
    fn followed_by(&self, other: &Self) -> Option<Self> {
        self.0.followed_by(&other.0).map(Self)
    }
}
```

### 数据变更追踪

```
时间线:  ─────────────────────────────────────────────►
         t0      t1      t2      t3      t4
         │       │       │       │       │
         ▼       ▼       ▼       ▼       ▼
数据:   [v0]   [v1]   [v2]   [撤回]   [v4]
              (修改)  (修改)  (删除)
```

Pathway 使用 **Differential Dataflow** 实现增量更新，仅处理发生变化的数据。

## 表的快照与导出

### 快照操作（snapshot）

获取表在特定时间点的状态：

```python
# 获取当前快照
snapshot_data = table.snapshot()

# 在特定前沿时间获取快照 - 资料来源：src/python_api.rs:220-240
frontier = TotalFrontier::At(timestamp)
snapshot = exported_table.snapshot_at(frontier)
```

### 外部索引集成

```python
# 创建向量索引用于 RAG
table = pw.Table.from_columns(
    id=range(1000),
    content=texts,
    vector=embeddings
)

# 构建外部索引
table = table.groupby(table.id).reduce(
    table.id,
    content=pw.reducers.torch(
        "embedding",
        vector=table.vector,
        mode="mean"
    )
)
```

## 最佳实践

### 性能优化

1. **合理设计模式**：避免过度规范化
2. **使用适当的数据类型**：优先使用原生类型而非字符串
3. **批量操作**：优先使用 `update_rows` 而非逐行更新
4. **索引列**：在频繁连接的列上建立索引

### 常见模式

```python
# 模式1：流式处理 + 批处理混合
stream_table = pw.io.kafka.read(...)
batch_table = pw.io.fs.read("./backup", ...)

combined = stream_table.concat(batch_table)
```

```python
# 模式2：滑动窗口聚合
windowed = events.groupby(
    events.session_id
).reduce(
    events.session_id,
    time_range=pw.reducers.count(),
    events_list=pw.reducers.collect(
        pw.this.event_data
    )[-100:]  # 最近100个事件
)
```

## 与其他模块的关系

```mermaid
graph TD
    Table[表操作] --> Column[列操作]
    Table --> Expression[表达式]
    Table --> Connector[连接器]
    Column --> Expression
    Expression --> Compute[计算引擎]
    Connector --> Input[输入数据]
    Compute --> Output[输出数据]
    Compute --> Persistence[持久化]
```

- **列操作**（`column.py`）：定义列级别的表达式和转换
- **表达式**（`expression.py`）：处理数据计算和条件逻辑
- **连接器**（`pw.io.*`）：负责数据的输入输出

## 错误处理

### 失败状态检测

```python
# 检查表是否处于失败状态
if table.failed():
    logger.error("数据处理失败，请检查输入数据")
```

### 追踪与调试

```python
# 获取错误追踪信息
class Trace:
    file_name: str
    line_number: int
    line: str
    function: str
```

## 小结

Pathway 的表操作模块构成了整个框架的核心，提供了：

- **统一的数据模型**：通过 `Table` 类实现批流统一的表格操作
- **声明式的转换语法**：使用 Python 表达式定义复杂的数据处理逻辑
- **高效的增量计算**：基于 Differential Dataflow 的增量更新机制
- **丰富的连接和聚合能力**：支持多种连接类型和聚合函数

熟练掌握表操作是构建高效 Pathway 数据处理流水线的基础。

---

<a id='trans-joins-aggregations'></a>

## 连接与聚合

### 相关页面

相关主题：[表操作](#trans-table-operations)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [python/pathway/internals/joins.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/internals/joins.py)
- [python/pathway/internals/groupbys.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/internals/groupbys.py)
- [python/pathway/internals/reducers.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/internals/reducers.py)
- [python/pathway/internals/custom_reducers.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/internals/custom_reducers.py)
</details>

# 连接与聚合

Pathway 是一个基于 Python 的 ETL 框架，其核心由 Rust 引擎驱动，底层采用 Differential Dataflow 实现增量计算。**连接（Join）与聚合（Aggregation）** 是 Pathway 数据处理管线中的两个核心操作，它们使得实时流处理和批处理能够以统一的方式进行。

---

## 1. 概述

在 Pathway 中，连接与聚合承担着数据管道中的关键转换角色：

| 功能 | 作用 | 典型场景 |
|------|------|----------|
| **连接（Join）** | 将两个或多个数据源按指定键进行关联，产生新的组合记录 | 实时关联用户行为与配置文件 |
| **聚合（Aggregation）** | 按分组键对数据进行汇总计算（计数、求和、平均等） | 统计每小时的访问量 |
| **自定义聚合（Custom Reducers）** | 支持用户定义的聚合逻辑 | 复杂业务规则下的数据汇总 |

Pathway 的连接与聚合操作具有以下特性：

- **增量计算**：仅处理自上次计算以来发生变化的数据，而非全量重算
- **幂等性**：相同输入产生相同输出，支持重放（replay）
- **统一 API**：流式数据和静态数据使用相同代码
- **Rust 引擎加速**：核心计算在 Rust 层执行，保证高性能

> 资料来源：[README.md]() 中关于 "incremental computation" 和 "stream processing" 的描述

---

## 2. 架构设计

### 2.1 核心层次结构

```mermaid
graph TD
    A[Python API 层] --> B[Python Internals 层]
    B --> C[Rust 引擎层]
    C --> D[Differential Dataflow]
    D --> E[Timely Dataflow]
    
    F[joins.py] --> B
    G[groupbys.py] --> B
    H[reducers.py] --> B
    I[custom_reducers.py] --> B
    
    J[python_api.rs] --> C
    K[dataflow.rs] --> C
```

Pathway 的连接与聚合功能跨越三个主要层次：

| 层次 | 文件/模块 | 职责 |
|------|-----------|------|
| **Python API** | `pathway/internals/*.py` | 提供用户友好的 Python 接口 |
| **Python Internals** | `joins.py`, `groupbys.py`, `reducers.py` | 实现连接、分组、聚合的 Python 逻辑 |
| **Rust 引擎** | `src/python_api.rs`, `src/engine/dataflow.rs` | 底层计算引擎绑定 |

### 2.2 连接操作的数据流

```mermaid
graph LR
    A[Table A] --> D[Join 操作]
    B[Table B] --> D
    D --> E[Arrangement 索引]
    E --> F[匹配键值对]
    F --> G[结果 Collection]
    
    H[增量更新] --> E
    I[键删除] --> F
```

Pathway 使用 Differential Dataflow 的 `join_core` 方法实现连接。连接过程涉及数据的**排列（Arrangement）**，这是实现高效增量连接的关键数据结构。

> 资料来源：[external/differential-dataflow/src/operators/join.rs:28-42]() 中 `join_core` 方法定义

---

## 3. 连接（Join）操作详解

### 3.1 Join 方法签名与参数

根据 Pathway 的架构，Join 操作在 Rust 层暴露为 Python 可调用的接口：

| 参数 | 类型 | 说明 |
|------|------|------|
| `stream2` | `Arranged<G, Tr2>` | 第二个输入流（已排列） |
| `result` | `FnMut(&K, &V, &Tr2::Val) -> I` | 结果生成函数 |
| `key` | - | 连接键提取（隐含在方法中） |

```rust
// 资料来源：external/differential-dataflow/src/operators/join.rs
fn join_core<Tr2, I, L>(&self, stream2: &Arranged<G, Tr2>, result: L) -> Collection<G, I::Item, R::Output>
where
    Tr2: TraceReader<Key=K, Time=G::Timestamp> + Clone + 'static,
    R: Multiply<Tr2::R>,
    L: FnMut(&K, &V, &Tr2::Val) -> I + 'static,
```

### 3.2 不安全变体：join_core_internal_unsafe

除了标准的 `join_core`，Differential Dataflow 还提供了**不安全变体**，允许更灵活的处理：

```rust
// 资料来源：external/differential-dataflow/src/operators/join.rs
fn join_core_internal_unsafe<Tr2, I, D, ROut, L>(
    &self,
    stream2: &Arranged<G, Tr2>,
    result: L
) -> Collection<G, D, ROut>
where
    // ... 允许返回 (data, time, diff) 三元组
    L: FnMut(&K, &V, &Tr2::Val, &G::Timestamp, &R, &Tr2::R) -> I + 'static,
```

此方法允许回调函数访问时间戳和差分（diff）信息，适用于需要精细控制的场景。

### 3.3 Arrangement 机制

Arrangement 是 Pathway 高效连接的核心。每个参与连接的表都需要预先建立索引：

```mermaid
graph TD
    A[输入数据] --> B[ArrangeByKey]
    B --> C[OrderedLayer 存储结构]
    C --> D[键索引 BTreeMap]
    D --> E[值列表 Vec]
    
    F[新数据到达] --> G[增量更新 Arrangement]
    G --> C
```

Pathway 在 `ordered.rs` 中扩展了 Differential Dataflow 的有序层结构：

```rust
// 资料来源：external/differential-dataflow/src/trace/layers/ordered.rs
fn step(&mut self, storage: &OrderedLayer<K, L, O, C>) {
    // [Pathway extension]: pos_nonnegative indicates that index was moved to 
    // [Pathway extension]: negative value, here we undo it
    if self.pos_nonnegative { 
        self.pos += 1; 
    } else {
        self.pos_nonnegative = true;
    }
    // ...
}
```

> 资料来源：[external/differential-dataflow/src/operators/arrange/agent.rs]() 中 `import_named` 方法

---

## 4. 聚合（Aggregation）操作详解

### 4.1 聚合方法概述

Pathway 通过 `reduce` 操作族实现聚合功能。Differential Dataflow 提供了多种聚合变体：

| 方法 | 用途 |
|------|------|
| `reduce` | 基础聚合操作 |
| `reduce_abelian` | 阿贝尔群（Abelian）聚合，优化数学运算 |
| `threshold` | 阈值过滤后聚合 |
| `count` | 计数聚合 |

### 4.2 Count 聚合实现

计数是常用的聚合操作，Pathway 的实现如下：

```rust
// 资料来源：external/differential-dataflow/src/operators/reduce.rs
fn count(&self) -> Collection<G, (K, R), isize> {
    self.count_core()
}
```

### 4.3 Threshold 聚合

阈值聚合允许在聚合前进行条件过滤：

```rust
// 资料来源：external/differential-dataflow/src/operators/reduce.rs
fn threshold_named<R2: Abelian, F: FnMut(&K, &R1) -> R2 + 'static>(
    &self,
    name: &str,
    mut thresh: F
) -> Collection<G, K, R2> {
    self.reduce_abelian::<_, DefaultKeyTrace<_, _, _>>(
        name,
        move |k, s, t| t.push(((), thresh(k, &s[0].1)))
    )
    .as_collection(|k, _| k.clone())
}
```

---

## 5. Universe 与 Keys 机制

### 5.1 Universe 的作用

Universe 是 Pathway 引擎中管理数据键空间的核心结构：

```rust
// 资料来源：src/engine/dataflow.rs
struct Universe<S: MaybeTotalScope> {
    data: Rc<UniverseData<S>>,
}
```

Universe 提供了三种数据来源方式：

| 来源类型 | 说明 |
|----------|------|
| `FromCollection` | 从 Collection 直接创建 |
| `FromArranged` | 从已排列的数据创建 |
| `Consolidated` | 合并后的数据 |

```rust
// 资料来源：src/engine/dataflow.rs
impl<S: MaybeTotalScope> Universe<S> {
    fn from_collection(keys: Keys<S>) -> Self {
        let data = Rc::new(UniverseData::from_collection(keys));
        Self::new(data)
    }

    fn from_arranged(keys: KeysArranged<S>) -> Self {
        let data = Rc::new(UniverseData::from_arranged(keys));
        Self::new(data)
    }
}
```

### 5.2 Keys 数据结构

Keys 结构体提供了统一的接口来访问数据键：

```mermaid
graph TD
    A[Keys 枚举] --> B[FromCollection]
    A --> C[FromArranged]
    
    B --> D[Keys::collection]
    B --> E[Keys::arranged]
    B --> F[Keys::consolidated]
    
    C --> D
    C --> E
    
    E --> G[KeysArranged]
    G --> H[Arranged 索引]
```

---

## 6. Python API 层

### 6.1 Computer 结构

在 Rust-Python 绑定层，聚合逻辑通过 `Computer` 结构表示：

```rust
// 资料来源：src/python_api.rs
#[pyclass(module = "pathway.engine")]
struct Computer {
    fun: Py<PyAny>,
    dtype: Py<PyAny>,
    is_output: bool,
    is_method: bool,
    universe: Py<Universe>,
    data: Value,
    data_column: Option<Py<Column>>,
}
```

`Computer` 的创建通过 `from_raising_fun` 方法：

```rust
// 资料来源：src/python_api.rs
#[pymethods]
impl Computer {
    #[new]
    #[pyo3(signature = (
        fun,
        dtype,
        is_output,
        is_method,
        universe,
        data = Value::None,
        data_column = None,
    ))]
    pub fn from_raising_fun(
        py: Python,
        fun: Py<PyAny>,
        dtype: Py<PyAny>,
        is_output: bool,
        is_method: bool,
        universe: Py<Universe>,
        data: Value,
        data_column: Option<Py<Column>>,
    ) -> PyResult<Py<Self>> {
        // ...
    }
}
```

### 6.2 时间戳与增量计算

Pathway 使用 `Timestamp` 类型管理增量计算的时间维度：

```rust
// 资料来源：src/engine/timestamp.rs
impl OriginalOrRetraction for Timestamp {
    fn is_original(&self) -> bool {
        self.0.is_multiple_of(2)
    }
}

impl NextRetractionTime for Timestamp {
    fn next_retraction_time(&self) -> Self {
        Self(self.0 + 1)
    }
}
```

时间戳设计遵循以下规则：
- **偶数时间戳**：表示原始记录（original）
- **奇数时间戳**：表示删除/撤回记录（retraction）

---

## 7. 使用示例

### 7.1 基础连接

```python
import pathway as pw

# 定义两个输入表
orders = pw.io.csv.read("./orders.csv", schema=OrderSchema)
products = pw.io.csv.read("./products.csv", schema=ProductSchema)

# 连接两个表
result = orders.join(
    products,
    orders.product_id == products.id
).select(
    order_id=orders.id,
    product_name=products.name,
    quantity=orders.quantity
)
```

### 7.2 分组聚合

```python
# 按产品分组统计订单数量
sales_summary = orders.groupby(orders.product_id).reduce(
    orders.product_id,
    total_quantity=pw.reducers.sum(orders.quantity),
    order_count=pw.reducers.count()
)
```

### 7.3 自定义聚合

```python
import pathway as pw

@pw.reducer
def weighted_average(values, weights):
    total_weight = sum(weights)
    if total_weight == 0:
        return 0.0
    return sum(v * w for v, w in zip(values, weights)) / total_weight

# 使用自定义聚合
result = data.groupby(data.category).reduce(
    category=data.category,
    avg_rating=weighted_average(data.rating, data.importance)
)
```

---

## 8. 性能优化建议

### 8.1 Arrangement 复用

对于需要多次连接的表，预先建立 Arrangement 可以显著提升性能：

| 策略 | 适用场景 | 收益 |
|------|----------|------|
| 预排序（Pre-arrange） | 同一表参与多次连接 | 减少重复索引开销 |
| 键压缩 | 大键值数据 | 降低内存占用 |
| 分区（Partitioning） | 超大规模数据 | 并行处理加速 |

### 8.2 增量计算优势

Pathway 的增量计算模型特别适合以下场景：

```mermaid
graph LR
    A[新数据到达] --> B{检查变更}
    B --> C[仅处理受影响键]
    C --> D[更新结果]
    D --> E[增量输出]
    
    F[传统批处理] --> G[全量重算]
    G --> H[完整输出]
```

---

## 9. 相关源码文件索引

| 文件路径 | 功能描述 |
|----------|----------|
| `python/pathway/internals/joins.py` | Python 层 Join 操作实现 |
| `python/pathway/internals/groupbys.py` | Python 层分组操作实现 |
| `python/pathway/internals/reducers.py` | 内置聚合函数 |
| `python/pathway/internals/custom_reducers.py` | 自定义聚合装饰器 |
| `src/python_api.rs` | Rust-Python 绑定层 |
| `src/engine/dataflow.rs` | 引擎核心数据结构 |
| `src/engine/timestamp.rs` | 时间戳管理 |
| `external/differential-dataflow/src/operators/join.rs` | 底层 Join 操作 |
| `external/differential-dataflow/src/operators/reduce.rs` | 底层聚合操作 |
| `external/differential-dataflow/src/operators/arrange/agent.rs` | Arrangement 管理 |

---

## 10. 总结

Pathway 的连接与聚合系统是一个精心设计的分层架构：

1. **Python API 层**提供直观易用的接口
2. **Python Internals 层**处理业务逻辑与类型转换
3. **Rust 引擎层**基于 Differential Dataflow 实现高效增量计算
4. **Timely Dataflow 层**提供底层并行计算支持

这种设计使得用户能够以简洁的 Python 代码表达复杂的数据处理逻辑，同时享受 Rust 引擎带来的高性能和增量计算的效率优势。

---

---

## Doramagic 踩坑日志

项目：pathwaycom/pathway

摘要：发现 22 个潜在踩坑项，其中 2 个为 high/blocking；最高优先级：安装坑 - 来源证据：[Bug]: TypeError: Cannot instantiate typing.Any。

## 1. 安装坑 · 来源证据：[Bug]: TypeError: Cannot instantiate typing.Any

- 严重度：high
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：[Bug]: TypeError: Cannot instantiate typing.Any
- 对用户的影响：可能增加新用户试用和生产接入成本。
- 建议检查：来源问题仍为 open，Pack Agent 需要复核是否仍影响当前版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_d63b2462b18449c4a2c02bb5e02a96c8 | https://github.com/pathwaycom/pathway/issues/227 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 2. 安全/权限坑 · 来源证据：Entra Authentication Support (credential handler and/or password callback)

- 严重度：high
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题：Entra Authentication Support (credential handler and/or password callback)
- 对用户的影响：可能影响授权、密钥配置或安全边界。
- 建议检查：来源问题仍为 open，Pack Agent 需要复核是否仍影响当前版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_e7c65f4d8bce4b1a9c99accbf0457633 | https://github.com/pathwaycom/pathway/issues/230 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 3. 安装坑 · 来源证据：Automated schema exploration in input connectors

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：Automated schema exploration in input connectors
- 对用户的影响：可能增加新用户试用和生产接入成本。
- 建议检查：来源问题仍为 open，Pack Agent 需要复核是否仍影响当前版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_74b91feb391f4b0298216d2a48fe5abe | https://github.com/pathwaycom/pathway/issues/224 | 来源类型 github_issue 暴露的待验证使用条件。

## 4. 安装坑 · 来源证据：Cannot Process Windowed Sessions from Kafka - Crash with "key missing in output table" on streaming retractions

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：Cannot Process Windowed Sessions from Kafka - Crash with "key missing in output table" on streaming retractions
- 对用户的影响：可能阻塞安装或首次运行。
- 建议检查：来源问题仍为 open，Pack Agent 需要复核是否仍影响当前版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_ab5d2b2076034999bfaed612a3d95d60 | https://github.com/pathwaycom/pathway/issues/232 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 5. 安装坑 · 来源证据：Improve watermarks in POSIX-like objects tracker

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：Improve watermarks in POSIX-like objects tracker
- 对用户的影响：可能阻塞安装或首次运行。
- 建议检查：来源问题仍为 open，Pack Agent 需要复核是否仍影响当前版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_38f29b1fba3b41cc99b1364d15ef7213 | https://github.com/pathwaycom/pathway/issues/225 | 来源讨论提到 node 相关条件，需在安装/试用前复核。

## 6. 安装坑 · 来源证据：Persistence in `iterate` operator

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：Persistence in `iterate` operator
- 对用户的影响：可能增加新用户试用和生产接入成本。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_e642d639ebbc4382b242028ef89ec236 | https://github.com/pathwaycom/pathway/issues/214 | 来源类型 github_issue 暴露的待验证使用条件。

## 7. 安装坑 · 来源证据：Support LEANN for RAG pipelines

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：Support LEANN for RAG pipelines
- 对用户的影响：可能增加新用户试用和生产接入成本。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_5d99dd1cfc794b299633b6c5dc9dd33b | https://github.com/pathwaycom/pathway/issues/173 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 8. 安装坑 · 来源证据：Support MongoDB Atlas in pw.io.mongodb

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：Support MongoDB Atlas in pw.io.mongodb
- 对用户的影响：可能增加新用户试用和生产接入成本。
- 建议检查：来源问题仍为 open，Pack Agent 需要复核是否仍影响当前版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_fa1d6e2e977a45d99c414724681f0f32 | https://github.com/pathwaycom/pathway/issues/221 | 来源类型 github_issue 暴露的待验证使用条件。

## 9. 安装坑 · 来源证据：feat: Add Microsoft SQL Server (MSSQL) connector

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：feat: Add Microsoft SQL Server (MSSQL) connector
- 对用户的影响：可能影响升级、迁移或版本选择。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_370bcc864a314734b602f01d3d444ef9 | https://github.com/pathwaycom/pathway/issues/204 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 10. 安装坑 · 来源证据：v0.27.0

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：v0.27.0
- 对用户的影响：可能影响升级、迁移或版本选择。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_8d6905d8719c488cbcbb821e794add26 | https://github.com/pathwaycom/pathway/releases/tag/v0.27.0 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 11. 安装坑 · 来源证据：v0.29.0

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：v0.29.0
- 对用户的影响：可能影响升级、迁移或版本选择。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_b25babd3e3cc49108e56daaaaae8c5bf | https://github.com/pathwaycom/pathway/releases/tag/v0.29.0 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 12. 安装坑 · 来源证据：v0.30.0

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：v0.30.0
- 对用户的影响：可能影响升级、迁移或版本选择。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_0291ee068e4e4d38aa86d0fd433b960a | https://github.com/pathwaycom/pathway/releases/tag/v0.30.0 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 13. 配置坑 · 来源证据：v0.28.0

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个配置相关的待验证问题：v0.28.0
- 对用户的影响：可能影响升级、迁移或版本选择。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_708184d9c8ac445d923c82d38af7bd19 | https://github.com/pathwaycom/pathway/releases/tag/v0.28.0 | 来源类型 github_release 暴露的待验证使用条件。

## 14. 配置坑 · 来源证据：v0.30.1

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个配置相关的待验证问题：v0.30.1
- 对用户的影响：可能增加新用户试用和生产接入成本。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_e7a19bd03a49499e987781160e4b76f0 | https://github.com/pathwaycom/pathway/releases/tag/v0.30.1 | 来源类型 github_release 暴露的待验证使用条件。

## 15. 能力坑 · 能力判断依赖假设

- 严重度：medium
- 证据强度：source_linked
- 发现：README/documentation is current enough for a first validation pass.
- 对用户的影响：假设不成立时，用户拿不到承诺的能力。
- 建议检查：将假设转成下游验证清单。
- 防护动作：假设必须转成验证项；没有验证结果前不能写成事实。
- 证据：capability.assumptions | github_repo:571186647 | https://github.com/pathwaycom/pathway | README/documentation is current enough for a first validation pass.

## 16. 运行坑 · 来源证据：`pw.io.mssql.read` needs LSN persistence

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个运行相关的待验证问题：`pw.io.mssql.read` needs LSN persistence
- 对用户的影响：可能增加新用户试用和生产接入成本。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_59d927a667a843ccb7d0a4cd147a1dc0 | https://github.com/pathwaycom/pathway/issues/218 | 来源类型 github_issue 暴露的待验证使用条件。

## 17. 维护坑 · 维护活跃度未知

- 严重度：medium
- 证据强度：source_linked
- 发现：未记录 last_activity_observed。
- 对用户的影响：新项目、停更项目和活跃项目会被混在一起，推荐信任度下降。
- 建议检查：补 GitHub 最近 commit、release、issue/PR 响应信号。
- 防护动作：维护活跃度未知时，推荐强度不能标为高信任。
- 证据：evidence.maintainer_signals | github_repo:571186647 | https://github.com/pathwaycom/pathway | last_activity_observed missing

## 18. 安全/权限坑 · 下游验证发现风险项

- 严重度：medium
- 证据强度：source_linked
- 发现：no_demo
- 对用户的影响：下游已经要求复核，不能在页面中弱化。
- 建议检查：进入安全/权限治理复核队列。
- 防护动作：下游风险存在时必须保持 review/recommendation 降级。
- 证据：downstream_validation.risk_items | github_repo:571186647 | https://github.com/pathwaycom/pathway | no_demo; severity=medium

## 19. 安全/权限坑 · 存在评分风险

- 严重度：medium
- 证据强度：source_linked
- 发现：no_demo
- 对用户的影响：风险会影响是否适合普通用户安装。
- 建议检查：把风险写入边界卡，并确认是否需要人工复核。
- 防护动作：评分风险必须进入边界卡，不能只作为内部分数。
- 证据：risks.scoring_risks | github_repo:571186647 | https://github.com/pathwaycom/pathway | no_demo; severity=medium

## 20. 安全/权限坑 · 来源证据：Support encryption in Kinesis connectors

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题：Support encryption in Kinesis connectors
- 对用户的影响：可能影响授权、密钥配置或安全边界。
- 建议检查：来源问题仍为 open，Pack Agent 需要复核是否仍影响当前版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_f29097c3eced4a209e7c80971aeea40c | https://github.com/pathwaycom/pathway/issues/223 | 来源类型 github_issue 暴露的待验证使用条件。

## 21. 维护坑 · issue/PR 响应质量未知

- 严重度：low
- 证据强度：source_linked
- 发现：issue_or_pr_quality=unknown。
- 对用户的影响：用户无法判断遇到问题后是否有人维护。
- 建议检查：抽样最近 issue/PR，判断是否长期无人处理。
- 防护动作：issue/PR 响应未知时，必须提示维护风险。
- 证据：evidence.maintainer_signals | github_repo:571186647 | https://github.com/pathwaycom/pathway | issue_or_pr_quality=unknown

## 22. 维护坑 · 发布节奏不明确

- 严重度：low
- 证据强度：source_linked
- 发现：release_recency=unknown。
- 对用户的影响：安装命令和文档可能落后于代码，用户踩坑概率升高。
- 建议检查：确认最近 release/tag 和 README 安装命令是否一致。
- 防护动作：发布节奏未知或过期时，安装说明必须标注可能漂移。
- 证据：evidence.maintainer_signals | github_repo:571186647 | https://github.com/pathwaycom/pathway | release_recency=unknown

<!-- canonical_name: pathwaycom/pathway; human_manual_source: deepwiki_human_wiki -->
