Doramagic 项目包 · 项目说明书
pathway 项目
生成时间:2026-05-16 20:44:44 UTC
Pathway简介
Pathway是一个Python ETL框架,专为流处理(stream processing)、实时分析(real-time analytics)、LLM管道(LLM pipelines)和检索增强生成(RAG)场景设计。资料来源:[README.md:1]()
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
项目概述
Pathway是一个Python ETL框架,专为流处理(stream processing)、实时分析(real-time analytics)、LLM管道(LLM pipelines)和检索增强生成(RAG)场景设计。资料来源:README.md:1
Pathway的核心优势在于其易用的Python API,允许用户无缝集成最喜欢的Python机器学习库。同时,Pathway代码具有极高的通用性和鲁棒性,可以同时用于开发和生产环境,有效处理批处理(batch)和流式数据(streaming data)。资料来源:README.md:3
Pathway的一个显著特点是代码一致性:同一套代码可用于本地开发、CI/CD测试、运行批处理任务、处理流回放(stream replays)以及处理数据流。资料来源:README.md:4
核心技术架构
Pathway的技术架构采用分层设计,融合了多种分布式计算范式:
graph TD
A[Python API层] --> B[PyO3绑定层]
B --> C[Rust执行引擎]
C --> D[Timely Dataflow分布式计算]
C --> E[Differential Dataflow增量计算]
D --> F[增量数据流处理]
E --> FPython API层
Pathway提供简洁的Python接口,通过PyO3实现Python与Rust的双向绑定。Python API层支持多种数据类型和连接器配置。资料来源:src/python_api.rs:1-80
核心Python绑定类包括:
| 类名 | 功能说明 |
|---|---|
PythonSubject | Python主题定义,支持start、read、seek等回调 |
ValueField | 值字段定义,包含名称、类型、来源和默认值 |
Table | 数据表核心抽象 |
ConnectorProperties | 连接器通用属性 |
CsvParserSettings | CSV解析器配置 |
AwsS3Settings | AWS S3连接配置 |
Rust执行引擎
Pathway的执行引擎由Rust实现,基于Differential Dataflow和Timely Dataflow构建。Rust引擎负责高性能的数据处理和计算。资料来源:README.md:5
引擎通过pymodule导出Python可调用的类和函数,实现Python语法的易用性与Rust的高性能完美结合。资料来源:src/python_api.rs:100-150
Timely Dataflow分布式计算
Pathway使用Timely Dataflow作为底层分布式计算框架。Timely Dataflow提供了数据流图的处理能力,支持并行和分布式计算。资料来源:external/timely-dataflow/timely/src/progress/subgraph.rs:1-30
Timely Dataflow的核心组件包括:
graph LR
A[数据输入] --> B[Scope子图]
B --> C[PerOperatorState]
C --> D[Antichain进度跟踪]
D --> E[ChangeBatch消息]
E --> F[数据输出]关键数据结构:
- Activations:共享激活状态管理
- MutableAntichain:可变反链,用于跟踪进度边界
- ChangeBatch:变化批次,用于记录数据变化
Differential Dataflow增量计算
Differential Dataflow是Pathway实现增量计算的核心技术。与传统批处理不同,Differential Dataflow能够智能处理数据的变化,仅重新计算受影响的部分。资料来源:external/differential-dataflow/src/operators/arrange/agent.rs:1-30
#### 轨迹追踪器(Trace)
Pathway使用轨迹(Trace)来记录历史数据状态,支持时间旅行查询和增量更新。资料来源:external/differential-dataflow/src/trace/wrappers/freeze.rs:1-30
graph TD
A[数据变化] --> B[Trace记录]
B --> C[时间戳管理]
C --> D[Antichain since边界]
D --> E[增量计算引擎]
E --> F[输出更新]#### 有序批处理
Pathway实现了高效的有序批处理结构OrdKeyBatch和OrdValBatch,用于存储键值对的时间序列数据。资料来源:external/differential-dataflow/src/trace/implementations/ord.rs:1-50
| 组件 | 说明 |
|---|---|
OrdKeyCursor | 有序键游标,用于遍历键数据 |
OrdValCursor | 有序键值游标,用于遍历键值对 |
BatchContainer | 批量容器接口 |
MergeBuilder | 合并构建器 |
主要功能特性
连接器支持
Pathway提供丰富的连接器支持,覆盖多种数据源和数据格式:
| 连接器类型 | 支持的数据源 |
|---|---|
| 文件系统 | CSV、Parquet等 |
| 云存储 | AWS S3、Azure Blob Storage |
| 数据库 | PostgreSQL(包括复制支持) |
| 搜索引擎 | Elasticsearch |
| 流处理 | MQTT、Kafka、Debezium |
| 其他 | Schema Registry、Iceberg |
PostgreSQL连接器
Pathway的PostgreSQL连接器支持完整的二进制协议,实现高效的数据读写:
- 支持多种数据类型映射(BOOL、INT、Float、TEXT、JSONB等)
- 二进制数组格式写入
- 复制槽(Replication Slot)支持逻辑复制
- Debezium CDC集成
资料来源:src/connectors/postgres.rs:1-60
数据格式与解析
Pathway提供灵活的数据格式配置,包括:
CsvParserSettings(
name=field_name,
type_=field_type,
source=FieldSource.Payload,
default=None
)
支持的数据类型包括:INT、FLOAT、STRING、BOOL、TIMESTAMP、JSON等。
持久化与状态管理
Pathway支持灵活的数据持久化配置:
| 配置项 | 说明 |
|---|---|
PersistenceConfig | 持久化基础配置 |
BackfillingThreshold | 回填阈值配置 |
PySnapshotAccess | 快照访问模式 |
PySnapshotEvent | 快照事件类型 |
应用场景
ETL管道
Pathway特别适合构建ETL(Extract-Transform-Load)管道,支持从多种数据源提取数据、进行转换处理、加载到目标系统。
graph LR
A[数据源] -->|Extract| B[Pathway管道]
B -->|Transform| C[增量处理]
C -->|Load| D[目标系统]实时分析
基于Differential Dataflow的增量计算能力,Pathway能够高效处理实时数据分析任务,仅处理数据流中的增量变化部分。
LLM管道与RAG
Pathway为构建LLM(Large Language Model)管道和RAG(Retrieval-Augmented Generation)应用提供专用支持,能够实时索引和检索外部数据源。
开发与部署
开发环境
Pathway支持本地开发环境,开发者可以使用熟悉的Python工具链进行开发。资料来源:examples/templates/el-pipeline/README.md:1-20
许可证
Pathway采用BSL(Business Source License)许可证发布,允许非商业使用和评估。商业使用需要获取正式许可证。资料来源:README.md:45
许可证配置
部署时需要设置环境变量:
export PATHWAY_LICENSE_KEY=your_pathway_key
技术优势总结
| 优势 | 描述 |
|---|---|
| 高性能 | Rust引擎提供接近原生的执行效率 |
| 增量计算 | Differential Dataflow避免重复计算 |
| 统一API | Python接口简化开发,Rust保证性能 |
| 多数据源 | 丰富的内置连接器支持 |
| 可扩展性 | 基于Timely Dataflow支持分布式部署 |
| 开发友好 | 支持本地开发和生产环境一致体验 |
资料来源:[src/python_api.rs:80-120]()
安装指南
Pathway 是一个基于 Python 的 ETL 框架,由 Rust 引擎驱动,支持流处理和实时分析。本指南将帮助你在不同环境下正确安装和配置 Pathway。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
系统要求
硬件和操作系统要求
| 要求类型 | 最低配置 | 推荐配置 |
|---|---|---|
| 处理器 | 2 核 | 4 核或以上 |
| 内存 | 4 GB | 8 GB 或以上 |
| 磁盘空间 | 500 MB | 1 GB 或以上 |
| 操作系统 | Linux/macOS/Windows | Linux/macOS |
Python 版本要求
Pathway 需要 Python 3.10 或更高版本。建议使用 Python 3.11 以获得最佳性能。
| Python 版本 | 支持状态 |
|---|---|
| 3.10 | ✅ 支持 |
| 3.11 | ✅ 推荐 |
| 3.12 | ✅ 支持 |
安装方式
使用 pip 安装
通过 pip 包管理器安装 Pathway 是最简单的方式:
pip install pathway
从源码安装
对于需要最新功能或进行开发调试的用户,可以从源码编译安装:
# 克隆仓库
git clone https://github.com/pathwaycom/pathway.git
cd pathway
# 安装 Rust 依赖和 Python 包
pip install -e .
依赖环境
Rust 工具链
Pathway 的核心引擎使用 Rust 编写,需要 Rust 工具链支持。
#### 安装 Rust
# Linux/macOS
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# Windows - 使用 rustup 安装程序
winget install Rustlang.Rustup
#### 验证 Rust 安装
rustc --version
cargo --version
项目使用 rust-toolchain.toml 文件指定 Rust 版本,确保编译器版本一致性。
Python 依赖
Pathway 的 Python 依赖在 pyproject.toml 中定义,主要包括:
| 依赖包 | 版本要求 | 用途 |
|---|---|---|
| pyo3 | 最新稳定版 | Python 绑定 |
| timely-dataflow | 外部依赖 | 数据流处理 |
| differential-dataflow | 外部依赖 | 增量计算 |
环境配置
环境变量
Pathway 运行可能需要的可选环境变量:
# 设置 Pathway 许可证密钥(商业功能需要)
export PATHWAY_LICENSE_KEY=your_pathway_key
# 可选:设置 Python 路径
export PYTHONPATH=/path/to/pathway:$PYTHONPATH
验证安装
安装完成后,可以通过以下方式验证:
import pathway as pw
# 打印版本信息
print(pw.__version__)
# 创建一个简单的数据流测试
input_session = pw.demo.demo_reduce_operator(show_progress=False)
print("Pathway 安装成功!")
平台特定说明
Linux
Linux 环境需要确保以下系统包已安装:
# Debian/Ubuntu
sudo apt-get update
sudo apt-get install -y python3-dev python3-pip cargo
macOS
macOS 用户建议使用 Homebrew 安装依赖:
brew install python rust
pip3 install pathway
Windows
Windows 用户需要安装 Visual C++ Build Tools 以编译 Rust 依赖:
- 下载并安装 Visual Studio Build Tools
- 确保安装 "C++ 生成工具" 工作负载
- 通过 PowerShell 或命令提示符安装 Pathway
常见问题
编译错误
如果遇到编译错误,确保 Rust 工具链是最新版本:
rustup update
cargo clean
pip install --force-reinstall pathway
导入错误
如果遇到 ModuleNotFoundError,检查 Python 路径和安装状态:
pip show pathway
python -c "import pathway; print(pathway.__file__)"
下一步
安装完成后,你可以开始使用 Pathway:
来源:https://github.com/pathwaycom/pathway / 项目说明书
基础示例
Pathway 是一个 Python ETL 框架,用于流处理、实时分析、LLM 管道和 RAG(检索增强生成)应用开发。本文档介绍 Pathway 的基础概念、核心组件和入门示例。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
概述
Pathway 由 Python 前端和 Rust 后端引擎组成,通过 PyO3 实现 Python 与 Rust 的互操作。Python API 提供了简洁易用的接口,而 Rust 引擎基于 Differential Dataflow 进行增量计算,确保高性能和一致性。
资料来源:README.md:1-15
核心概念
数据流架构
graph TD
A[数据源 Connectors] --> B[PythonSubject 输入]
B --> C[Rust 引擎处理]
C --> D[增量计算 Differential Dataflow]
D --> E[PyExportedTable 输出]
E --> F[快照 Snapshot]Pathway 的核心数据流遵循:数据源 → PythonSubject → Rust 引擎 → 增量计算 → ExportedTable 的模式。
PythonSubject 核心类
PythonSubject 是 Python 端的数据输入接口,用于定义数据源的读取行为:
| 参数 | 类型 | 说明 |
|---|---|---|
start | Py<PyAny> | 起始处理函数 |
read | Py<PyAny> | 读取数据函数 |
seek | Py<PyAny> | 定位函数 |
on_persisted_run | Py<PyAny> | 持久化运行回调 |
end | Py<PyAny> | 结束处理函数 |
is_internal | bool | 是否为内部数据源 |
deletions_enabled | bool | 是否启用删除 |
ValueField 模式字段
ValueField 定义表的 schema 字段结构:
| 属性 | 类型 | 说明 |
|---|---|---|
name | String | 字段名称 |
type_ | Type | 数据类型 |
source | FieldSource | 字段来源(默认 Payload) |
default | Option<Value> | 默认值 |
metadata | Option<String> | 元数据 |
PyExportedTable 导出表
PyExportedTable 提供数据输出和快照功能:
| 方法 | 返回类型 | 说明 |
|---|---|---|
frontier() | TotalFrontier<Timestamp> | 获取当前前沿 |
snapshot_at(frontier) | Vec<(Key, Vec<Value>)> | 在指定前沿获取快照 |
failed() | bool | 检查处理是否失败 |
资料来源:src/python_api.rs:106-115
Done 标记值
Done 是时间线末端的标记值,用于表示计算完成状态。它是一个冻结的 Python 对象,具有特殊的比较和哈希行为:
- 与
Done比较时返回Equal - 与整数比较时返回
Greater - 支持自定义哈希实现
资料来源:src/python_api.rs:176-196
时间戳与前沿
Timestamp 时间戳
Pathway 使用 Timestamp 进行时间管理,实现 OriginalOrRetraction 和 NextRetractionTime trait:
graph LR
A[原始数据] --> B[偶数时间戳]
A --> C[ retraction 数据]
C --> D[奇数时间戳]
D --> E[下一 retraction 时间]is_original(): 偶数时间戳表示原始数据next_retraction_time(): 计算下一个 retraction 时间(当前时间 + 1)
资料来源:src/engine/timestamp.rs:45-55
TotalFrontier 前沿状态
TotalFrontier<T> 是表示计算前沿的枚举类型:
| 变体 | 说明 |
|---|---|
At(T) | 在指定时间点 |
Done | 计算已完成 |
该类型实现了 FromPyObject 和 IntoPyObject,支持 Python 和 Rust 间的无缝转换。
资料来源:src/python_api.rs:140-175
基础使用模式
典型项目结构
project/
├── app.py # 主程序
├── data/ # 数据目录
│ ├── docs/ # 文档输入
│ └── output/ # 结果输出
├── .env # 环境变量
└── requirements.txt # 依赖
资料来源:examples/projects/question-answering-rag/README.md:1-30
环境配置
安装 Pathway:
pip install pathway[xpack-llm]
设置环境变量:
export PATHWAY_LICENSE_KEY=your_pathway_key
或创建 .env 文件:
OPENAI_API_KEY=your_openai_api_key_here
资料来源:examples/projects/question-answering-rag/README.md:25-30
运行应用
直接运行 Python 脚本:
python app.py
使用 Pathway CLI:
pathway spawn python main.py
多线程运行:
pathway spawn --threads 3 python main.py
资料来源:README.md:120-135
HTTP 服务集成
Pathway 提供 HTTP 服务器用于接收查询和返回结果:
webserver = pw.io.http.PathwayWebserver(host="0.0.0.0", port=8011)
提交查询:
curl --data '{ "messages": "What is the value of X?"}' http://localhost:8011
资料来源:examples/projects/question-answering-rag/README.md:15-20
注册的 Python API 类
Pathway 通过 add_class 注册以下核心类到 pathway.engine 模块:
| 类别 | 类名 |
|---|---|
| 连接器设置 | AwsS3Settings, AzureBlobStorageSettings, MqttSettings, PsqlReplicationSettings |
| 解析器 | CsvParserSettings, PySchemaRegistrySettings |
| 存储格式 | DataStorage, DataFormat, PersistenceConfig |
| 索引 | IcebergCatalogSettings, ElasticSearchParams |
| 内部类 | PythonSubject, ValueField, PyExportedTable, Done, Trace |
| 配置 | ConnectorProperties, ColumnProperties, TableProperties, TelemetryConfig |
| 快照 | PySnapshotAccess, PySnapshotEvent, PyPersistenceMode |
| 向量索引 | PyExternalIndexFactory, PyExternalIndexData, PyExternalIndexQuery |
资料来源:src/python_api.rs:220-250
文档与支持
完整文档访问:pathway.com/developers/
如有问题,可通过以下方式获取帮助:
资料来源:examples/templates/el-pipeline/README.md:45-50
下一步
- 学习 Pathway 的连接器配置
- 了解实时数据处理的高级特性
- 探索 RAG 和 LLM 集成的最佳实践
- 查看
examples/projects/目录下的完整示例项目
资料来源:[README.md:1-15]()
引擎架构
Pathway 的引擎架构是一个基于 Rust 的高性能计算引擎,构建于 Differential Dataflow 和 Timely Dataflow 之上。这一架构使得 Pathway 能够以 Python API 的形式提供简洁易用的接口,同时在底层实现增量计算和流式处理。引擎的核心设计理念是将 Python 层的灵活表达与 Rust 引擎的高性能执行相结合,通过 P...
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
概述
Pathway 的引擎架构是一个基于 Rust 的高性能计算引擎,构建于 Differential Dataflow 和 Timely Dataflow 之上。这一架构使得 Pathway 能够以 Python API 的形式提供简洁易用的接口,同时在底层实现增量计算和流式处理。引擎的核心设计理念是将 Python 层的灵活表达与 Rust 引擎的高性能执行相结合,通过 PyO3 实现 Python 与 Rust 之间的无缝桥接。
Pathway 引擎支持多种数据类型和处理模式,包括静态数据(batch)和动态数据(streaming),能够在同一个代码库中处理历史数据和实时数据流。引擎内置的增量计算机制确保只有发生变化的数据才会被重新处理,从而大幅提升计算效率。
核心架构组件
计算引擎层次结构
Pathway 引擎采用三层架构设计,从上到下依次为:
| 层次 | 组件 | 职责 |
|---|---|---|
| Python API 层 | pathway.engine 模块 | 提供 Python 接口,暴露核心类和函数 |
| PyO3 绑定层 | src/python_api.rs | 实现 Python 对象到 Rust 对象的转换 |
| Rust 核心层 | src/engine/ | 实现 Differential Dataflow 计算引擎 |
核心类结构
Pathway 引擎通过 PyO3 将以下核心 Rust 结构暴露给 Python:
# Python API 层暴露的核心类
Table # 数据表,表示一组带时间戳的记录
Column # 列,表示数据表中的一列
Universe # 宇宙,表示一组唯一标识符
Context # 上下文,管理数据流计算的执行环境
Scope # 作用域,定义计算的数据流范围
DataRow # 数据行,表示表中的一行数据
Computer # 计算器,定义如何计算派生列
数据流计算模型
Differential Dataflow 集成
Pathway 引擎的核心计算模型基于 Differential Dataflow,这是一种用于增量数据流计算的编程模型。Differential Dataflow 能够高效处理数据的插入、更新和删除操作,通过追踪数据变化而非重新计算整个结果来实现高性能。
graph TD
A[输入数据流] --> B[Input Session]
B --> C[Differential Collection]
C --> D[Arrangement]
D --> E[Operators<br/>join/reduce/count]
E --> F[输出 Collection]
F --> G[Incremental 结果]
H[变化追踪] --> C资料来源:external/differential-dataflow/src/operators/reduce.rs:1-50
时间戳与版本管理
Pathway 使用复合时间戳来追踪数据变化,每个数据记录都带有时间戳和差异值(multiplicity)。时间戳必须实现 Lattice trait,支持部分有序比较,这使得系统能够正确处理并发和乱序数据。
// 时间戳必须是 Lattice,用于支持增量计算的合并
impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G, T1>
where
G: Scope,
G::Timestamp: Lattice+Ord+Debug,
// ...
资料来源:external/differential-dataflow/src/operators/join.rs:1-30
数据流图结构
Pathway 引擎内部维护一个数据流图(Dataflow Graph),用于协调各个算子之间的数据传递:
graph LR
subgraph DataflowGraphInner
A[BeforeIterate] --> B[Scope]
B --> C[Child Scope]
C --> D[Operators]
D --> E[AfterIterate]
end
F[持久化数据] --> A
A --> G[表创建]
G --> D资料来源:src/engine/dataflow.rs:1-60
核心算子实现
Join 算子
Join 是 Pathway 中最常用的算子之一,用于合并两个数据集。引擎使用 join_core_internal_unsafe 方法实现高效的连接操作:
// Join 操作的核心实现
fn join_core<Tr2,I,L>(&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
where
Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
// ... 配置连接参数
{
self.arrange_by_key().join_core_internal_unsafe(stream2, result)
}
资料来源:external/differential-dataflow/src/operators/join.rs:20-35
Reduce 算子
Reduce 算子用于对数据进行聚合操作,支持自定义归约器实现:
impl<S: MaybeTotalScope, R: ReducerImpl> DataflowReducer<S> for R
where
Collection<S, (Key, Option<<R as ReducerImpl>::State>)>:
Into<PersistableCollection<S>> + From<PersistableCollection<S>>,
{
fn reduce(
self: Rc<Self>,
values: &Collection<S, (Key, Key, Vec<Value>)>,
error_logger: Rc<dyn LogError>,
_trace: Trace,
graph: &mut DataflowGraphInner<S>,
) -> Result<Values<S>> {
// 实现聚合逻辑
}
}
资料来源:src/engine/dataflow.rs:80-110
Count 算子
Count 是 Differential Dataflow 内置的聚合操作,用于计算每个键的出现次数:
// Count 扩展 trait 实现
pub trait Count<G: Scope, K: Data, R: Semigroup> where G::Timestamp: Lattice+Ord {
fn count(&self) -> Collection<G, (K, R), isize> {
self.count_core()
}
fn count_core<R2: Abelian + From<i8>>(&self) -> Collection<G, (K, R), R2> {
self.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>(
"Count",
|_k, s, t| t.push((s[0].1.clone(), R2::from(1i8)))
).as_collection(|k, c| (k.clone(), c.clone()))
}
}
资料来源:external/differential-dataflow/src/operators/reduce.rs:30-80
Arrangement 数据结构
Arrangement 概念
Arrangement 是 Differential Dataflow 中的核心数据结构,用于组织和索引数据以支持高效的后续操作。Arrangement 本质上是一个按 key 排序的 trace 结构:
graph TD
A[输入数据] --> B[ArrangeByKey]
B --> C[Trace Agent]
C --> D[Trace Spine]
D --> E[多个 Batch]
F[查询] --> G[Cursor 遍历]
G --> D资料来源:external/differential-dataflow/src/operators/arrange/agent.rs:1-40
Trace Writer 实现
Trace Writer 负责向 arrangement 中写入数据批次:
pub fn new(
upper: Vec<Tr::Time>,
trace: Weak<RefCell<TraceBox<Tr>>>,
queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>
) -> Self {
let mut temp = Antichain::new();
temp.extend(upper.into_iter());
Self { upper: temp, trace, queues }
}
pub fn exert(&mut self, fuel: &mut isize) {
if let Some(trace) = self.trace.upgrade() {
trace.borrow_mut().trace.exert(fuel);
}
}
资料来源:external/differential-dataflow/src/operators/arrange/writer.rs:1-40
Trace Freeze 包装器
Trace Freeze 是一个特殊的 trace 包装器,用于在特定条件下冻结时间:
impl<Tr, F> TraceReader for TraceFreeze<Tr, F>
where
Tr: TraceReader,
Tr::Time: Lattice+Clone+'static,
F: Fn(&Tr::Time)->Option<Tr::Time>+'static,
{
type Key = Tr::Key;
type Val = Tr::Val;
type Time = Tr::Time;
type R = Tr::R;
type Batch = BatchFreeze<Tr::Batch, F>;
type Cursor = CursorFreeze<Tr::Cursor, F>;
}
资料来源:external/differential-dataflow/src/trace/wrappers/freeze.rs:1-50
增量计算机制
变化批次处理
Pathway 使用 ChangeBatch 来追踪数据的变化,每个变化包含时间戳和差异值:
pub fn into_inner(mut self) -> Vec<(T, i64)> {
self.compact();
self.updates
}
pub fn iter(&mut self) -> ::std::slice::Iter<(T, i64)> {
self.compact();
self.updates.iter()
}
资料来源:external/timely-dataflow/timely/src/progress/change_batch.rs:60-90
持久化与状态管理
引擎支持对计算状态进行持久化,通过 PersistenceConfig 配置持久化行为:
| 配置项 | 类型 | 说明 |
|---|---|---|
persistence_mode | PyPersistenceMode | 持久化模式(禁用/异步/同步) |
snapshot_access | PySnapshotAccess | 快照访问策略 |
snapshot_event | PySnapshotEvent | 快照触发事件 |
Python 绑定实现
PyO3 类导出
Pathway 通过 PyO3 将 Rust 结构导出为 Python 类:
#[pymethods]
impl ValueField {
#[new]
#[pyo3(signature = (name, type_, source = FieldSource::Payload))]
pub fn new(name: String, type_: Type, source: FieldSource) -> Self {
Self {
name,
type_,
source,
}
}
}
资料来源:src/python_api.rs:140-160
PythonSubject 绑定
PythonSubject 允许 Python 代码定义自定义的数据源:
#[pyclass(module = "pathway.engine")]
pub struct PythonSubject {
pub start: Py<PyAny>,
pub read: Py<PyAny>,
pub seek: Py<PyAny>,
pub on_persisted_run: Py<PyAny>,
pub end: Py<PyAny>,
pub is_internal: bool,
pub deletions_enabled: bool,
}
数据模型
表结构
| 组件 | 说明 |
|---|---|
TableProperties | 表级别的属性配置 |
ColumnProperties | 列级别的属性配置 |
ConnectorProperties | 连接器配置 |
Trace | 追踪配置 |
Done | 完成标记 |
值字段定义
#[pyclass(module = "pathway.engine")]
#[derive(Clone)]
pub struct ValueField {
#[pyo3(get)]
pub name: String,
#[pyo3(get)]
pub type_: Type,
#[pyo3(get)]
pub source: FieldSource,
#[pyo3(get)]
pub default: Option<Value>,
#[pyo3(get)]
pub metadata: Option<String>,
}
资料来源:src/python_api.rs:145-165
关键配置选项
连接器模式
| 模式 | 说明 |
|---|---|
STATIC | 静态数据源,读取一次 |
STREAMING | 流式数据源,持续监听 |
REPLAY | 回放模式,重演历史数据 |
监控级别
| 级别 | 说明 |
|---|---|
OFF | 禁用监控 |
BASIC | 基础指标 |
DETAILED | 详细指标 |
工作流程图
graph TD
A[Python 代码定义数据流] --> B[PyO3 调用 Rust 引擎]
B --> C[创建 DataflowGraph]
C --> D[初始化 Scope 和 Universe]
D --> E[注册数据源和算子]
E --> F[启动数据流执行]
F --> G{数据输入模式}
G -->|批量| H[Batch Input]
G -->|流式| I[Stream Input]
H --> J[计算 operators]
I --> J
J --> K[生成 Arrangement]
K --> L[输出结果]
J --> M{状态变化}
M -->|有变化| N[触发增量计算]
M -->|无变化| O[跳过重算]
N --> K总结
Pathway 的引擎架构通过深度整合 Differential Dataflow 和 Timely Dataflow,实现了一个高性能、可扩展的增量计算引擎。核心设计特点包括:
- 双层语言架构:Python API 提供易用性,Rust 引擎保证性能
- 增量计算模型:通过时间戳和差异值追踪变化,避免重复计算
- 灵活的 Arrangement:支持高效的 join、reduce 等操作
- 完整的持久化支持:内置状态管理和快照机制
- 丰富的连接器:支持多种数据源和输出目标
这种架构使 Pathway 能够同时满足开发和生产环境的需求,支持从本地开发到大规模分布式部署的多种场景。
资料来源:[src/python_api.rs:1-50]()
Python与Rust集成
Pathway是一个基于Python的ETL框架,但其核心计算引擎由Rust实现。这种架构通过PyO3库实现Python与Rust的无缝集成,使得开发者可以使用简洁的Python API编写数据处理逻辑,同时享受Rust带来的高性能和内存安全性。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
技术架构概述
Pathway采用双层架构设计:上层是用户编写的Python代码,下层是Rust实现的增量计算引擎。Differential Dataflow算法驱动核心数据流处理,而Timely Dataflow提供分布式计算能力。
graph TD
A[Python用户代码] --> B[PyO3绑定层]
B --> C[Rust引擎核心]
C --> D[Differential Dataflow]
C --> E[Timely Dataflow]
D --> F[增量计算]
E --> G[分布式数据流]
F --> H[结果返回Python]
G --> HPyO3绑定机制
Pathway使用PyO3库实现Python与Rust之间的双向通信。PyO3提供了从Rust向Python暴露类型和函数的能力,同时也支持从Python代码调用Rust函数。
核心数据结构绑定
#### PythonSubject结构体
PythonSubject是Pathway中用于连接外部数据源的关键结构体,它将Python端的回调函数包装为Rust可调用的对象:
#[pyclass(module = "pathway.engine")]
#[derive(Clone)]
pub struct PythonSubject {
pub start: Py<PyAny>,
pub read: Py<PyAny>,
pub seek: Py<PyAny>,
pub on_persisted_run: Py<PyAny>,
pub end: Py<PyAny>,
pub is_internal: bool,
pub deletions_enabled: bool,
}
| 参数 | 类型 | 说明 |
|---|---|---|
start | Py<PyAny> | 起始位置回调函数 |
read | Py<PyAny> | 读取数据回调函数 |
seek | Py<PyAny> | 定位回调函数 |
on_persisted_run | Py<PyAny> | 持久化运行回调 |
end | Py<PyAny> | 结束位置回调 |
is_internal | bool | 是否为内部数据源 |
deletions_enabled | bool | 是否启用删除操作 |
#### ValueField结构体
ValueField用于定义表字段的元数据,包括字段名、类型、来源和默认值:
#[pyclass(module = "pathway.engine", frozen)]
#[derive(Clone)]
pub struct ValueField {
#[pyo3(get)]
pub name: String,
#[pyo3(get)]
pub type_: Type,
#[pyo3(get)]
pub source: FieldSource,
#[pyo3(get)]
pub default: Option<Value>,
#[pyo3(get)]
pub metadata: Option<String>,
}
PyO3类型转换
Pathway实现了Python与Rust之间的自动类型转换,通过IntoPyObject和FromPyObjecttrait实现。
#### Timestamp类型转换
impl<'py> IntoPyObject<'py> for Timestamp {
type Target = PyAny;
type Output = Bound<'py, Self::Target>;
type Error = PyErr;
fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
self.0.into_bound_py_any(py)
}
}
资料来源:src/engine/timestamp.rs:40-50
#### TotalFrontier枚举转换
TotalFrontier表示计算的前沿边界,包含At和Done两种状态:
impl<'py, T> FromPyObject<'py> for TotalFrontier<T>
where
T: FromPyObject<'py>,
{
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
if ob.is_instance_of::<Done>() {
Ok(TotalFrontier::Done)
} else {
Ok(TotalFrontier::At(ob.extract()?))
}
}
}
资料来源:src/python_api.rs:180-192
错误处理机制
Pathway通过PyO3暴露多种异常类型供Python端捕获和处理。
异常类型体系
| 异常类型 | 说明 | 使用场景 |
|---|---|---|
EngineError | 基础引擎错误 | 通用错误处理 |
EngineErrorWithTrace | 带调用栈的错误 | 调试和问题定位 |
OtherWorkerError | 其他工作进程错误 | 分布式环境错误处理 |
Error | 通用错误类型 | 自定义错误 |
Pending | 操作未完成状态 | 异步操作状态查询 |
资料来源:src/python_api.rs:200-230
表导出与快照机制
PyExportedTable导出表
PyExportedTable允许Python代码访问Rust引擎计算的表数据:
#[pyclass(module = "pathway.engine", frozen, name = "ExportedTable")]
pub struct PyExportedTable {
inner: Arc<dyn ExportedTable>,
}
#[pymethods]
impl PyExportedTable {
fn frontier(&self) -> TotalFrontier<Timestamp> {
self.inner.frontier()
}
fn snapshot_at(&self, frontier: TotalFrontier<Timestamp>) -> Vec<(Key, Vec<Value>)> {
self.inner.snapshot_at(frontier)
}
fn failed(&self) -> bool {
self.inner.failed()
}
}
资料来源:src/python_api.rs:115-135
Done标记类型
Done是一个特殊的冻结类型,用于表示计算已完成的状态:
mod done {
use once_cell::sync::Lazy;
use pyo3::prelude::*;
#[pyclass(module = "pathway.engine", frozen)]
pub struct Done(InnerDone);
pub static DONE: Lazy<Py<Done>> = Lazy::new(|| {
Python::with_gil(|py| Py::new(py, Done(InnerDone)).expect("creating DONE should not fail"))
});
}
资料来源:src/python_api.rs:165-177
计算上下文
Context作用域管理
Context结构体提供了当前计算行的上下文信息:
#[pyclass(module = "pathway.engine", frozen)]
pub struct Context(SendWrapper<ScopedContext>);
#[pymethods]
impl Context {
#[getter]
fn this_row(&self) -> PyResult<Key> {
self.0
.with(|context| context.this_row())
.ok_or_else(|| PyValueError::new_err("context out of scope"))
}
}
资料来源:src/python_api.rs:235-250
Computer计算器
Computer用于执行用户定义的Python函数:
#[pyclass]
pub struct Computer {
pub fun: Py<PyAny>,
pub dtype: Py<PyAny>,
pub is_output: bool,
pub is_method: bool,
pub universe: Py<Universe>,
pub data: Value,
pub data_column: Option<Py<Column>>,
}
#[pymethods]
impl Computer {
#[staticmethod]
#[pyo3(signature = (fun, dtype, is_output, is_method, universe, data = Value::None, data_column = None))]
pub fn from_raising_fun(
py: Python,
fun: Py<PyAny>,
dtype: Py<PyAny>,
is_output: bool,
is_method: bool,
universe: Py<Universe>,
data: Value,
data_column: Option<Py<Column>>,
) -> PyResult<Py<Self>> { ... }
}
资料来源:src/python_api.rs:260-290
Differential Dataflow集成
Arrangement导入机制
Pathway通过Differential Dataflow的arrangement机制在数据流之间共享计算结果:
/// Same as `import`, but allows to name the source.
pub fn import_named<G>(&mut self, scope: &G, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
G: Scope<Timestamp=Tr::Time>,
Tr::Time: Timestamp,
{
// Drop ShutdownButton and return only the arrangement.
self.import_core(scope, name).0
}
资料来源:external/differential-dataflow/src/operators/arrange/agent.rs:35-45
Trace Freeze包装器
TraceFreeze包装器允许延迟计算trace直到被使用时才执行:
pub struct TraceFreeze<Tr, F>
where
Tr: TraceReader,
Tr::Time: Lattice+Clone+'static,
F: Fn(&Tr::Time)->Option<Tr::Time>,
{
trace: Tr,
func: Rc<F>,
}
资料来源:external/differential-dataflow/src/trace/wrappers/freeze.rs:15-21
时间戳与顺序语义
Timestamp特征实现
Pathway实现了Differential Dataflow所需的时间戳特征:
impl OriginalOrRetraction for Timestamp {
fn is_original(&self) -> bool {
self.0.is_multiple_of(2)
}
}
impl NextRetractionTime for Timestamp {
fn next_retraction_time(&self) -> Self {
Self(self.0 + 1)
}
}
资料来源:src/engine/timestamp.rs:60-70
时间戳顺序关系
Timestamp支持时间戳间的偏序关系计算:
pub fn followed_by(&self, other: &Self) -> Option<Self> {
self.0.followed_by(&other.0).map(Self)
}
注册的Python类
Pathway通过add_class方法将以下Rust结构体暴露给Python:
| 类名 | 功能 |
|---|---|
AwsS3Settings | AWS S3连接配置 |
AzureBlobStorageSettings | Azure Blob存储配置 |
ElasticSearchParams | Elasticsearch参数 |
CsvParserSettings | CSV解析设置 |
DataStorage | 数据存储配置 |
DataFormat | 数据格式定义 |
PersistenceConfig | 持久化配置 |
PythonSubject | Python数据源主题 |
MqttSettings | MQTT连接设置 |
IcebergCatalogSettings | Iceberg目录配置 |
ConnectorProperties | 连接器属性 |
ColumnProperties | 列属性 |
TableProperties | 表属性 |
PyExportedTable | 导出表 |
PyExternalIndexFactory | 外部索引工厂 |
PyUSearchMetricKind | USearch度量类型 |
PyBruteForceKnnMetricKind | 暴力KNN度量类型 |
资料来源:src/python_api.rs:300-330
数据流处理流程
sequenceDiagram
participant Python用户代码
participant PyO3绑定层
participant Rust引擎
participant DifferentialDataflow
participant TimelyDataflow
Python用户代码->>PyO3绑定层: 调用Python API
PyO3绑定层->>Rust引擎: 传递计算请求
Rust引擎->>DifferentialDataflow: 创建数据流操作符
DifferentialDataflow->>TimelyDataflow: 分布式执行
TimelyDataflow-->>DifferentialDataflow: 返回计算结果
DifferentialDataflow-->>Rust引擎: 增量更新结果
Rust引擎-->>PyO3绑定层: 转换结果类型
PyO3绑定层-->>Python用户代码: 返回Python对象多工作进程支持
Pathway支持通过Timely Dataflow的多工作进程实现并行计算。可以通过命令行参数指定工作进程数量:
cargo run --example wordcount -- -w2
这将启动两个工作进程并行处理数据,每个进程独立消费输入数据并产生部分结果。
资料来源:external/timely-dataflow/mdbook/src/chapter_2/chapter_2_5.md:1-25
总结
Pathway的Python与Rust集成架构充分发挥了两种语言的优势:
- Python端:提供简洁的声明式API,开发者使用Python编写数据处理逻辑
- Rust端:实现高性能的增量计算引擎,基于Differential Dataflow提供高效的流式处理
- PyO3:作为桥梁,实现Python与Rust之间的无缝类型转换和函数调用
- Timely Dataflow:提供分布式计算能力,支持多工作进程并行处理
资料来源:[src/python_api.rs:1-15]()
连接器概述
Pathway 是一个 Python ETL 框架,专为流处理、实时分析、LLM 管道和 RAG(检索增强生成)而设计 资料来源:README.md。连接器(Connector)是 Pathway 数据管道中的核心组件,负责数据的输入与输出操作。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
1. 概述
Pathway 是一个 Python ETL 框架,专为流处理、实时分析、LLM 管道和 RAG(检索增强生成)而设计 资料来源:README.md。连接器(Connector)是 Pathway 数据管道中的核心组件,负责数据的输入与输出操作。
Pathway 连接器系统具有以下特点:
- 多源数据支持:支持文件系统、数据库、云存储、消息队列等多种数据源
- 实时与批处理:同一套 API 支持静态批处理和流式实时处理
- Rust 引擎驱动:底层由 Rust 编写的可扩展引擎驱动,基于 Differential Dataflow 实现增量计算
- Python 友好:通过 Python API 提供简洁易用的接口
2. 连接器架构
2.1 核心组件关系
graph TD
subgraph "Python API 层"
IO["pw.io 模块"]
DS["DataSource"]
DK["DataSink"]
end
subgraph "Rust 引擎层"
CP["ConnectorProperties"]
CM["ConnectorMode"]
Engine["Pathway Engine"]
end
subgraph "数据源类型"
S3["AWS S3"]
Azure["Azure Blob"]
CSV["CSV Parser"]
MQTT["MQTT"]
Kafka["Kafka/Confluent"]
end
IO --> DS
IO --> DK
DS --> CP
DK --> CP
CP --> CM
CM --> Engine
S3 --> CP
Azure --> CP
CSV --> CP
MQTT --> CP2.2 连接器模式
Pathway 支持两种连接器模式,用于区分数据处理方式 资料来源:src/python_api.rs:320-329:
| 模式 | 枚举值 | 说明 |
|---|---|---|
| 静态模式 | ConnectorMode.STATIC | 批处理模式,一次性加载所有数据 |
| 流式模式 | ConnectorMode.STREAMING | 实时流处理,持续监听数据变化 |
#[pymethods]
impl PyConnectorMode {
#[classattr]
pub const STATIC: ConnectorMode = ConnectorMode::Static;
#[classattr]
pub const STREAMING: ConnectorMode = ConnectorMode::Streaming;
}
3. 数据源(DataSource)
数据源是连接器系统中负责从外部系统读取数据的组件。在 Pathway 中,数据源通过 Python Subject 接口与 Rust 引擎交互。
3.1 PythonSubject 结构
PythonSubject 是 Python 层与 Rust 引擎之间的桥梁,封装了数据读取所需的所有回调函数 资料来源:src/python_api.rs:15-39:
#[pyclass(module = "pathway.engine")]
#[derive(Clone)]
pub struct PythonSubject {
pub start: Py<PyAny>, // 初始化回调
pub read: Py<PyAny>, // 读取数据回调
pub seek: Py<PyAny>, // 寻址回调(可选)
pub on_persisted_run: Py<PyAny>, // 持久化运行回调
pub end: Py<PyAny>, // 结束回调
pub is_internal: bool, // 是否为内部数据源
pub deletions_enabled: bool, // 是否启用删除操作
}
3.2 数据读取流程
sequenceDiagram
participant App as 应用代码
participant DS as DataSource
participant Subject as PythonSubject
participant Engine as Pathway Engine
App->>DS: 创建数据源实例
DS->>Subject: 调用 start() 初始化
Engine->>Subject: 调用 read() 读取数据
Subject-->>Engine: 返回数据批次
loop 流式处理
Engine->>Subject: 调用 read() 轮询新数据
Subject-->>Engine: 返回更新数据
end
Engine->>Subject: 调用 end() 清理资源4. 数据输出(DataSink)
数据 Sink 负责将处理后的数据输出到外部系统。Pathway 提供了多种输出连接器,包括文件系统、数据库和消息队列等。
4.1 输出配置参数
| 参数 | 类型 | 说明 |
|---|---|---|
host | string | 服务器主机地址 |
port | int | 服务器端口 |
format | DataFormat | 输出数据格式 |
storage | DataStorage | 存储配置 |
create_table | bool | 是否创建输出表 |
5. 支持的连接器类型
5.1 云存储连接器
Pathway 原生支持主流云存储服务:
| 服务 | 设置类 | 说明 |
|---|---|---|
| AWS S3 | AwsS3Settings | 亚马逊 S3 对象存储 |
| Azure Blob | AzureBlobStorageSettings | 微软 Azure Blob 存储 |
5.2 消息队列连接器
| 消息系统 | 设置类 | 说明 |
|---|---|---|
| MQTT | MqttSettings | IoT 场景常用的轻量级消息协议 |
| Kafka | SchemaRegistrySettings | Confluent Schema Registry 集成 |
5.3 数据库连接器
| 数据库 | 设置类 | 说明 |
|---|---|---|
| PostgreSQL | PsqlReplicationSettings | 支持 CDC(变更数据捕获)复制 |
5.4 其他专用连接器
- ElasticSearch:
ElasticSearchParams/ElasticSearchAuth— 全文搜索引擎集成 - Iceberg:
IcebergCatalogSettings— Apache Iceberg 表格式支持 - Schema Registry:
PySchemaRegistrySettings— Avro/Protobuf Schema 管理
5.5 解析器设置
m.add_class::<CsvParserSettings>()?;
m.add_class::<ValueField>()?;
m.add_class::<DataStorage>()?;
m.add_class::<DataFormat>()?;
6. 连接器配置属性
6.1 ConnectorProperties
连接器属性类定义了数据源和输出端的通用配置选项 资料来源:src/python_api.rs:450-465:
| 属性 | 说明 |
|---|---|
columns | 列定义列表 |
primary_key | 主键列 |
date_format | 日期格式字符串 |
value_fields | 值字段列表 |
6.2 ColumnProperties
列属性用于定义单个列的元数据:
pw.ColumnProperties(
name="column_name",
type_=pw.Type,
source=pw.FieldSource.Payload,
default=None,
metadata=None
)
7. 使用示例
7.1 RAG 应用中的连接器使用
在问答型 RAG 应用中,Pathway 连接器用于实时索引文档并提供检索服务 资料来源:examples/projects/question-answering-rag/README.md:
import pathway as pw
# 创建 HTTP 服务器用于接收查询
webserver = pw.io.http.PathwayWebserver(host="0.0.0.0", port=8011)
# 数据源:文件系统连接器监听 ./data/ 目录
# 文档被分块、向量化后存储
# 输出:通过 REST API 提供 /v1/retrieve 检索接口
7.2 多智能体 RAG 中的连接器
在多智能体 RAG 架构中,连接器同时承担文档索引和智能体间通信的任务 资料来源:examples/projects/ag2-multiagent-rag/README.md:
graph LR
A["Documents (./data/)"] -->|实时索引| B["Pathway VectorStoreServer"]
B -->|HTTP /v1/retrieve| C["AG2 Researcher Agent"]
C -->|search_documents| B
B -->|检索结果| D["AG2 Analyst Agent"]
D -->|综合回答| E["用户"]8. 连接器的高级特性
8.1 持久化配置
Pathway 支持连接器级别的持久化配置,用于故障恢复和状态管理:
m.add_class::<PersistenceConfig>()?;
m.add_class::<PyPersistenceMode>()?;
m.add_class::<PySnapshotAccess>()?;
m.add_class::<PySnapshotEvent>()?;
8.2 会话类型
Pathway 引擎支持不同类型的会话处理 资料来源:src/python_api.rs:334-345:
| 会话类型 | 枚举值 | 使用场景 |
|---|---|---|
| 原生会话 | SessionType.NATIVE | 标准批处理和流处理 |
| UPSERT 会话 | SessionType.UPSERT | 增量更新场景 |
8.3 监控与追踪
Pathway 提供了完善的监控机制,包括:
- TelemetryConfig:遥测数据收集配置
- BackfillingThreshold:回填数据阈值控制
- EngineTrace:引擎执行追踪
9. 最佳实践
- 选择合适的连接器模式:静态数据使用
STATIC模式,实时数据使用STREAMING模式 - 配置主键:为数据源设置主键以支持增量更新和去重
- 设置超时和重试:生产环境应配置适当的重试策略
- 监控连接器状态:使用 Pathway Dashboard 监控消息数量和延迟
- 处理删除操作:根据业务需求通过
deletions_enabled参数控制是否处理数据删除
10. 相关文档
来源:https://github.com/pathwaycom/pathway / 项目说明书
数据库连接器
Pathway 的数据库连接器模块提供了与各类数据库系统交互的能力,支持从外部数据源读取数据并将处理结果写入目标数据库。作为 Pathway ETL 框架的核心组件之一,数据库连接器实现了实时的增量数据同步机制,能够在流处理和批处理场景下保持数据的一致性和时效性。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
概述
Pathway 的数据库连接器模块提供了与各类数据库系统交互的能力,支持从外部数据源读取数据并将处理结果写入目标数据库。作为 Pathway ETL 框架的核心组件之一,数据库连接器实现了实时的增量数据同步机制,能够在流处理和批处理场景下保持数据的一致性和时效性。
Pathway 的数据库连接器基于 Rust 引擎构建,结合 Differential Dataflow 计算模型,实现了高效的增量计算能力。这意味着当源数据库中的数据发生变化时,连接器能够自动检测变更并仅处理发生变化的记录,而不是重新处理整个数据集。
架构设计
连接器类型体系
Pathway 在 src/python_api.rs 中定义了一套统一的连接器配置接口,通过 PyO3 绑定暴露给 Python 用户。连接器配置通过 ConnectorProperties、ColumnProperties 和 TableProperties 等类进行封装,支持对数据源的精细化配置。
m.add_class::<ConnectorProperties>()?;
m.add_class::<ColumnProperties>()?;
m.add_class::<TableProperties>()?;
资料来源:src/python_api.rs:行
连接器模式
Pathway 支持两种连接器运行模式,通过 ConnectorMode 枚举定义:
| 模式 | 说明 |
|---|---|
STATIC | 静态模式,适用于批处理场景,数据源仅读取一次 |
STREAMING | 流式模式,适用于实时数据处理,支持持续监听数据变更 |
#[pymethods]
impl PyConnectorMode {
#[classattr]
pub const STATIC: ConnectorMode = ConnectorMode::Static;
#[classattr]
pub const STREAMING: ConnectorMode = ConnectorMode::Streaming;
}
资料来源:src/python_api.rs:行
支持的数据库类型
关系型数据库
Pathway 原生支持多种主流关系型数据库的连接:
- PostgreSQL:通过
PsqlReplicationSettings提供复制槽支持,用于捕获数据库变更 - MySQL:支持标准的读取连接配置
- Microsoft SQL Server:通过 MSSQL 连接器支持企业级应用
NoSQL 数据库
- MongoDB:支持文档型数据库的实时读取和写入
- Elasticsearch:提供全文检索场景下的数据索引能力
云存储与服务
Pathway 还集成了多种云服务的连接配置:
- AWS S3:
AwsS3Settings支持从 S3 存储桶读取数据文件 - Azure Blob Storage:
AzureBlobStorageSettings支持 Azure 云存储 - Iceberg Catalog:
IcebergCatalogSettings支持数据湖架构
m.add_class::<AwsS3Settings>()?;
m.add_class::<AzureBlobStorageSettings>()?;
m.add_class::<ElasticSearchParams>()?;
m.add_class::<IcebergCatalogSettings>()?;
m.add_class::<PsqlReplicationSettings>()?;
资料来源:src/python_api.rs:行
数据处理流程
graph TD
A[外部数据库] --> B[连接器读取层]
B --> C[变更捕获]
C --> D[Rust 引擎处理]
D --> E[增量计算]
E --> F[下游输出]
G[Schema 定义] --> D
H[连接配置] --> B读取流程
- 连接初始化:连接器根据配置参数建立与目标数据库的连接
- Schema 映射:通过
ValueField定义输入数据的字段映射和类型转换 - 变更捕获:在流式模式下持续监听数据变更事件
- 数据传输:变更数据通过 Pathway 引擎进行增量处理
写入流程
- 输出表定义:通过
is_output=True标记输出计算节点 - 目标配置:指定目标数据库的连接信息和写入策略
- 批量写入:引擎自动将处理结果批量写入目标系统
Schema 定义
Pathway 使用 ValueField 结构体定义数据的 Schema 映射:
#[pyclass(module = "pathway.engine")]
#[derive(Clone)]
pub struct ValueField {
#[pyo3(get)]
pub name: String,
#[pyo3(get)]
pub type_: Type,
#[pyo3(get)]
pub source: FieldSource,
#[pyo3(get)]
pub default: Option<Value>,
#[pyo3(get)]
pub metadata: Option<String>,
}
资料来源:src/python_api.rs:行
字段属性说明:
| 属性 | 类型 | 说明 |
|---|---|---|
| name | String | 字段名称 |
| type_ | Type | 数据类型 |
| source | FieldSource | 字段来源(Payload/Materialized) |
| default | Option<Value> | 默认值 |
| metadata | Option<String> | 元数据信息 |
时间戳与前沿管理
Pathway 通过 TotalFrontier<T> 管理数据处理的时间戳前沿,实现精确的增量计算:
#[pymethods]
impl PyExportedTable {
fn frontier(&self) -> TotalFrontier<Timestamp> {
self.inner.frontier()
}
fn snapshot_at(&self, frontier: TotalFrontier<Timestamp>) -> Vec<(Key, Vec<Value>)> {
self.inner.snapshot_at(frontier)
}
}
资料来源:src/python_api.rs:行
TotalFrontier 有两种状态:
At(t):表示处理到达时间戳t,继续处理后续数据Done:表示数据处理完成,不再有新数据到达
impl<'py, T> FromPyObject<'py> for TotalFrontier<T> {
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
if ob.is_instance_of::<Done>() {
Ok(TotalFrontier::Done)
} else {
Ok(TotalFrontier::At(ob.extract()?))
}
}
}
资料来源:src/python_api.rs:行
使用示例
基本配置流程
import pathway as pw
# 定义输入连接器
class InputSchema(pw.Schema):
id: int
value: str
# 配置 PostgreSQL 输入
input_connector = pw.io.postgres.read(
host="localhost",
port=5432,
database="mydb",
user="user",
password="password",
table="source_table",
schema=InputSchema,
mode=pw ConnectorMode.STREAMING
)
# 定义处理逻辑
result = input_connector.groupby(pw.this.id).reduce(
id=pw.this.id,
count=pw.reducers.count()
)
# 配置输出连接器
pw.io.postgres.write(
result,
host="localhost",
port=5432,
database="mydb",
user="user",
password="password",
table="output_table"
)
# 运行管道
pw.run()
与向量存储集成
Pathway 的数据库连接器可以与向量索引功能结合使用,实现 RAG(检索增强生成)场景:
graph LR
A[PostgreSQL/MySQL] --> B[Pathway 引擎]
B --> C[向量嵌入]
C --> D[向量索引]
D --> E[LLM 查询]监控与故障处理
Pathway 的 PyExportedTable 提供了内置的监控接口:
#[pymethods]
impl PyExportedTable {
fn failed(&self) -> bool {
self.inner.failed()
}
}
资料来源:src/python_api.rs:行
通过检查 failed() 状态,用户可以及时发现数据处理过程中的异常情况,并采取相应的恢复措施。
性能优化建议
- 批量处理:利用 Pathway 的增量计算特性,避免全量数据重处理
- 索引优化:确保源数据库的相关字段已建立索引
- 连接池:合理配置连接池大小以平衡资源使用和吞吐量
- Schema 精简:仅提取必要的字段,减少数据传输量
资料来源:[src/python_api.rs:行](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)
流数据连接器
Pathway的流数据连接器(Stream Connectors)是Framework与外部数据源之间进行实时数据交换的核心组件。这些连接器使得Pathway能够从Kafka、NATS、MQTT、RabbitMQ、Redpanda等消息队列系统实时读取数据,并将处理结果写入各种数据sink。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
概述
Pathway的流数据连接器(Stream Connectors)是Framework与外部数据源之间进行实时数据交换的核心组件。这些连接器使得Pathway能够从Kafka、NATS、MQTT、RabbitMQ、Redpanda等消息队列系统实时读取数据,并将处理结果写入各种数据sink。
Pathway的连接器系统构建在Differential Dataflow增量计算引擎之上,支持两种主要的运行模式:静态模式(Static)和流式模式(Streaming)。静态模式适用于批处理场景,而流式模式则专为实时数据处理设计,能够处理持续到来的数据更新。连接器系统还支持增量计算的语义,允许系统只处理自上次处理以来的变更,而不是重新处理整个数据集。
核心架构
连接器模式体系
Pathway定义了一套完整的连接器模式枚举,用于区分不同的数据处理语义:
class ConnectorMode(Enum):
STATIC = auto() # 静态批处理模式
STREAMING = auto() # 实时流处理模式
每种模式对应不同的数据处理策略。静态模式下,连接器假设数据源是有限的,处理完成后系统进入完成状态。流式模式下,连接器持续监听数据源,系统永远不会自然结束,除非外部终止。
PythonSubject核心接口
所有自定义数据源连接器都继承自PythonSubject基类,该类定义了数据源的核心接口:
class PythonSubject:
def __init__(
self,
start: Callable[[], None], # 启动数据源
read: Callable[[], Iterator[Row]], # 读取数据行
seek: Callable[[Any], None], # 定位到指定位置
on_persisted_run: Callable[[], None], # 持久化运行回调
end: Callable[[], bool], # 检查是否结束
is_internal: bool, # 是否内部数据源
deletions_enabled: bool # 是否启用删除
)
PythonSubject的设计体现了Pathway连接器的核心哲学:数据源被抽象为一个可以随时启动、读取、定位和检查完成状态的可迭代对象。这种设计使得连接器能够与Differential Dataflow的计算模型无缝集成。
字段定义与数据类型
ValueField类定义了表中每个字段的元信息,包括名称、类型、来源和默认值:
class ValueField:
name: str # 字段名称
type_: Type # 数据类型
source: FieldSource # 字段来源 (默认: Payload)
default: Optional[Value] # 默认值
metadata: Optional[str] # 元数据
FieldSource枚举标识了数据的来源类型,最常用的是FieldSource.Payload,表示数据来自消息的有效载荷。在复杂的流处理场景中,字段来源还可以标识时间戳、主键等特殊字段。
工作原理
数据流处理管道
Pathway的流数据连接器遵循一个清晰的数据处理管道:
graph TD
A[外部数据源] --> B[Connector启动]
B --> C[数据读取循环]
C --> D{数据有效?}
D -->|是| E[解析消息]
D -->|否| F[跳过/日志]
E --> G[转换为Pathway Row]
G --> H[输入到Differential Dataflow]
H --> I[增量计算]
I --> J[结果输出到Sink]
J --> C
F --> C增量计算与时间戳
Pathway的连接器系统深度集成了Differential Dataflow的增量计算能力。每个数据更新都带有时间戳,系统只处理自上次检查点以来的变更。Timestamp类型实现了OriginalOrRetraction特性,用于区分原始数据和撤回数据:
class Timestamp:
def is_original(self) -> bool:
# 偶数时间戳表示原始数据
return self.0.is_multiple_of(2)
def next_retraction_time(self) -> Self:
# 奇数时间戳表示撤回
return Self(self.0 + 1)
资料来源:src/engine/timestamp.rs:45-53
这种设计允许连接器高效处理数据的插入、更新和删除操作,而无需重新处理整个数据集。
持久化与状态恢复
连接器支持持久化运行模式,通过on_persisted_run回调函数实现故障恢复:
- 检查点保存:系统在适当的时间点保存当前的处理状态
- 故障恢复:重新启动时,从最后一个检查点恢复
- 数据重放:根据保存的seek位置,重新从数据源读取未处理的数据
这种机制确保了在分布式环境中,即使发生节点故障,数据也不会丢失,处理可以透明地继续。
预置连接器
Pathway提供了多个开箱即用的流数据连接器,覆盖了主流的消息队列系统:
| 连接器 | 协议 | 典型用途 | 文档链接 |
|---|---|---|---|
| Kafka | TCP二进制协议 | 大规模流处理 | kafka/__init__.py |
| Redpanda | Kafka兼容API | 高性能流处理 | redpanda/__init__.py |
| NATS | NATS协议 | 轻量级消息传递 | nats/__init__.py |
| MQTT | MQTT协议 | IoT设备数据采集 | mqtt/__init__.py |
| RabbitMQ | AMQP协议 | 企业消息队列 | rabbitmq/__init__.py |
连接器配置参数
每种连接器都支持一组通用的配置参数:
| 参数 | 类型 | 说明 | 默认值 |
|---|---|---|---|
mode | ConnectorMode | 运行模式 | STREAMING |
deletions_enabled | bool | 是否处理删除事件 | false |
autocommit_duration | timedelta | 自动提交间隔 | None |
timeout | timedelta | 连接超时时间 | 30秒 |
schema | ValueField[] | 输出表模式 | 自动推断 |
使用场景
实时文档索引
Pathway的连接器常用于构建实时RAG(检索增强生成)系统。以下架构展示了典型的用法:
graph LR
A[文档数据源] --> B[Pathway连接器]
B --> C[数据预处理]
C --> D[文本分块]
D --> E[嵌入模型]
E --> F[向量索引]
F --> G[HTTP API]
G --> H[LLM查询]资料来源:examples/projects/ag2-multiagent-rag/README.md:12-20
在ag2-multiagent-rag项目中,Pathway连接器负责监听文件系统中的文档变化,实时索引新文档并更新向量存储。外部AI代理通过REST API查询相关文档,系统保证始终返回最新的索引结果。
Web数据抓取管道
Pathway连接器也广泛用于构建数据采集管道:
graph TD
A[新闻网站] --> B[Web爬虫连接器]
B --> C[内容解析]
C --> D[元数据提取]
D --> E[数据清洗]
E --> F[JSON Lines输出]
F --> G[下游处理]资料来源:examples/projects/web-scraping/README.md:1-50
自定义的NewsScraperSubject继承自Pathway的ConnectorSubject,实现了文章URL作为主键的数据表结构,支持增量更新和去重。
企业数据集成
对于企业级应用,Pathway的RabbitMQ和Kafka连接器能够与现有的消息基础设施无缝集成:
import pathway as pw
class MyConnectorSubject(pw.io.ConnectorSubject):
def __init__(self):
super().__init__()
self._client = create_rabbitmq_client()
def start(self):
self._client.connect()
def read(self):
while message := self._client.consume():
yield self._parse_message(message)
def end(self):
return self._client.is_idle_timeout()
与Differential Dataflow的集成
Pathway的连接器系统是Differential Dataflow在Python生态系统中的桥接层。Rust核心负责:
- 数据流的组织:使用Trace数据结构维护历史状态
- 增量计算:只处理实际发生的变更,而非全量数据
- 多 worker 支持:通过
pathway spawn --threads N启用多线程处理
Python API提供了高级抽象,降低了使用门槛,同时保持了Rust引擎的高性能特性。这种设计使得开发者能够用Python编写流处理逻辑,而底层执行效率与原生Rust程序相当。
最佳实践
模式定义
在创建连接器时,应显式定义输出表的schema,包括所有字段的名称、类型和默认值:
schema = pw.Schema(
fields={
"id": pw.FieldType.STRING,
"content": pw.FieldType.STRING,
"timestamp": pw.FieldType.DATETIME,
},
primary_key=["id"]
)
错误处理
连接器应实现健壮的错误处理机制:
def read(self):
try:
yield from self._fetch_data()
except TransientError:
# 临时错误,等待后重试
time.sleep(1)
yield from self.read()
except PermanentError:
# 永久错误,标记连接器失败
self._mark_failed()
性能优化
- 批量处理:积累多条消息后再提交,减少I/O次数
- 背压控制:当处理速度跟不上数据到达速度时,使用
BackfillingThreshold配置 - 连接池复用:在多worker场景下复用连接,避免资源耗尽
总结
Pathway的流数据连接器系统提供了一套完整的数据集成解决方案。通过统一的抽象接口,开发者可以轻松地将各种外部数据源接入Pathway的处理管道。连接器与Differential Dataflow的深度集成确保了即使在处理大规模实时数据时,也能保持高效的增量计算能力。
预置的Kafka、Redpanda、NATS、MQTT和RabbitMQ连接器覆盖了大多数流数据场景,而PythonSubject基类则允许开发者实现完全自定义的数据源连接器,满足各种特殊的集成需求。
资料来源:[src/python_api.rs:21-31]()
表操作
Pathway 是一个基于 Python 的 ETL 框架,其核心数据模型围绕表(Table)展开。表操作是 Pathway 数据处理流水线的基本构建单元,提供了对结构化数据进行转换、过滤、聚合和连接的能力。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
继续阅读本节完整说明和来源证据。
概述
Pathway 是一个基于 Python 的 ETL 框架,其核心数据模型围绕表(Table)展开。表操作是 Pathway 数据处理流水线的基本构建单元,提供了对结构化数据进行转换、过滤、聚合和连接的能力。
Pathway 的表操作具有以下特性:
- 统一批流处理:同一套 API 可处理静态数据集和实时流数据
- 增量计算:基于 Rust 的 Differential Dataflow 引擎实现高效增量更新
- 声明式语法:通过 Python 表达式定义数据转换逻辑
资料来源:README.md:1-20
核心数据结构
Table 类
Table 是 Pathway 中的核心数据结构,代表一个具有特定模式的分布式表格。
classDiagram
class Table {
+scope: Py[Scope]
+handle: TableHandle
+_is_deleted: bool
+_app_id: str
+__getitem__(key) TableSlice
+select(*args) Table
+update_rows(*args) Table
+update_cells(*args) Table
+filter(predicate) Table
+groupby(*args) Table
+join(*args) Table
+join_inner(*args) Table
+concat(*tables) Table
}
class LegacyTable {
+universe: Universe
+columns: Vec~Column~
+to_engine() (UniverseHandle, Vec~ColumnHandle~)
+from_handles(scope, handles) LegacyTable
}
Table --> Scope : 关联
LegacyTable --> Universe : 关联
LegacyTable --> Column : 包含列(Column)与字段(ValueField)
每张表由多个列组成,每个列具有名称、类型和来源信息。
# ValueField 定义 - 资料来源:src/python_api.rs:40-55
@dataclass
class ValueField:
name: str # 列名
type_: Type # 数据类型
source: FieldSource # 字段来源(Payload/Materialized)
default: Optional[Value] # 默认值
metadata: Optional[str] # 元数据
类型系统
| 类型 | 说明 | 对应 Rust 类型 |
|---|---|---|
INT | 64位整数 | i64 |
FLOAT | 双精度浮点 | f64 |
STR | 字符串 | String |
BOOL | 布尔值 | bool |
ANY | 任意类型 | - |
表的创建与初始化
从输入连接器创建
Pathway 提供多种连接器用于从不同数据源创建表:
# 从文件系统创建表
table = pw.io.fs.read(
"./data",
schema=MySchema,
format="csv"
)
# 从 HTTP 流读取
table = pw.io.http.read(
"http://api.example.com/stream",
schema=MySchema,
json_field_paths={"data": "$.records[*]"}
)
表的内部状态
graph TD
A[输入数据] --> B[Python Subject]
B --> C[Raw Updates]
C --> D[Rust Engine]
D --> E[Universe Handle]
D --> F[Column Handles]
E --> G[Table]
F --> G
G --> H[快照 Snapshot]表的内部状态由 Universe 和 Column 集合组成:
# LegacyTable 内部结构 - 资料来源:src/python_api.rs:180-210
class LegacyTable:
def __init__(self, universe, columns):
self.universe = universe # 行的唯一标识集合
self.columns = columns # 列的列表
def to_engine(self):
"""转换为引擎句柄"""
universe = self.universe.get()
column_handles = [c.get().handle for c in self.columns]
return (universe.handle, column_handles)
基础表操作
选择操作(select)
select 用于从表中提取指定列,可进行计算和重命名:
# 基本选择
result = table.select(
id=table.user_id,
name=table.name,
full_name=table.first_name + " " + table.last_name # 表达式计算
)
过滤操作(filter)
filter 根据条件筛选行:
# 过滤条件
active_users = users.filter(users.status == "active")
premium_orders = orders.filter(orders.amount > 100)
行更新(update_rows)
向表中添加或更新行:
# 添加新行
updated_table = table.update_rows(
pw.table.builder.make_table(
id=[1, 2],
name=["Alice", "Bob"],
value=[10, 20]
)
)
单元格更新(update_cells)
更新特定列的值:
# 更新特定列
result = table.update_cells(
status=pw.this.status.upper(), # 将 status 列转为大写
_id=table.id # 保留原 id 列
)
资料来源:python/pathway/internals/table.py
聚合与分组
分组聚合(groupby)
groupby 按照指定键分组并执行聚合操作:
graph LR
A[输入表] --> B[GroupBy Key]
B --> C[每组聚合]
C --> D[聚合函数]
D --> E[输出表]
subgraph 聚合函数
F[count]
G[sum]
H[min/max]
I[collect]
end
D --> F
D --> G
D --> H
D --> I# 按类别聚合
category_stats = orders.groupby(orders.category).reduce(
orders.category,
total_amount=pw.reducers.sum(orders.amount),
order_count=pw.reducers.count(),
avg_price=pw.reducers.mean(orders.price)
)
可用聚合函数
| 函数 | 说明 | 示例 |
|---|---|---|
count() | 计数 | count() |
sum(expr) | 求和 | sum(table.amount) |
mean(expr) | 平均值 | mean(table.score) |
min(expr) | 最小值 | min(table.latency) |
max(expr) | 最大值 | max(table.price) |
collect(expr) | 收集为列表 | collect(table.items) |
sorted_tuple(expr) | 排序元组 | sorted_tuple(table.events) |
表连接操作
连接的种类
| 连接类型 | 说明 | 空值处理 |
|---|---|---|
join / join_inner | 内连接 | 丢弃无匹配的行 |
join_left | 左外连接 | 保留左表所有行 |
join_right | 右外连接 | 保留右表所有行 |
join_outer | 全外连接 | 保留所有行 |
基本连接语法
# 内连接
result = table1.join(
table2,
table1.key == table2.key
)
# 指定输出列
result = table1.join(
table2,
table1.id == table2.user_id,
left=pw.left.id,
right=pw.right.user_id,
prefix="left_"
)
复杂连接场景
# 多条件连接
orders_enriched = orders.join(
customers,
(orders.customer_id == customers.id) &
(orders.region == customers.region)
)
时间与版本控制
时间戳机制
Pathway 使用特殊的时间戳系统来追踪数据的版本和变更:
# Timestamp 实现 - 资料来源:src/engine/timestamp.rs:1-30
impl Timestamp {
fn followed_by(&self, other: &Self) -> Option<Self> {
self.0.followed_by(&other.0).map(Self)
}
}
数据变更追踪
时间线: ─────────────────────────────────────────────►
t0 t1 t2 t3 t4
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
数据: [v0] [v1] [v2] [撤回] [v4]
(修改) (修改) (删除)
Pathway 使用 Differential Dataflow 实现增量更新,仅处理发生变化的数据。
表的快照与导出
快照操作(snapshot)
获取表在特定时间点的状态:
# 获取当前快照
snapshot_data = table.snapshot()
# 在特定前沿时间获取快照 - 资料来源:src/python_api.rs:220-240
frontier = TotalFrontier::At(timestamp)
snapshot = exported_table.snapshot_at(frontier)
外部索引集成
# 创建向量索引用于 RAG
table = pw.Table.from_columns(
id=range(1000),
content=texts,
vector=embeddings
)
# 构建外部索引
table = table.groupby(table.id).reduce(
table.id,
content=pw.reducers.torch(
"embedding",
vector=table.vector,
mode="mean"
)
)
最佳实践
性能优化
- 合理设计模式:避免过度规范化
- 使用适当的数据类型:优先使用原生类型而非字符串
- 批量操作:优先使用
update_rows而非逐行更新 - 索引列:在频繁连接的列上建立索引
常见模式
# 模式1:流式处理 + 批处理混合
stream_table = pw.io.kafka.read(...)
batch_table = pw.io.fs.read("./backup", ...)
combined = stream_table.concat(batch_table)
# 模式2:滑动窗口聚合
windowed = events.groupby(
events.session_id
).reduce(
events.session_id,
time_range=pw.reducers.count(),
events_list=pw.reducers.collect(
pw.this.event_data
)[-100:] # 最近100个事件
)
与其他模块的关系
graph TD
Table[表操作] --> Column[列操作]
Table --> Expression[表达式]
Table --> Connector[连接器]
Column --> Expression
Expression --> Compute[计算引擎]
Connector --> Input[输入数据]
Compute --> Output[输出数据]
Compute --> Persistence[持久化]- 列操作(
column.py):定义列级别的表达式和转换 - 表达式(
expression.py):处理数据计算和条件逻辑 - 连接器(
pw.io.*):负责数据的输入输出
错误处理
失败状态检测
# 检查表是否处于失败状态
if table.failed():
logger.error("数据处理失败,请检查输入数据")
追踪与调试
# 获取错误追踪信息
class Trace:
file_name: str
line_number: int
line: str
function: str
小结
Pathway 的表操作模块构成了整个框架的核心,提供了:
- 统一的数据模型:通过
Table类实现批流统一的表格操作 - 声明式的转换语法:使用 Python 表达式定义复杂的数据处理逻辑
- 高效的增量计算:基于 Differential Dataflow 的增量更新机制
- 丰富的连接和聚合能力:支持多种连接类型和聚合函数
熟练掌握表操作是构建高效 Pathway 数据处理流水线的基础。
资料来源:[README.md:1-20]()
连接与聚合
Pathway 是一个基于 Python 的 ETL 框架,其核心由 Rust 引擎驱动,底层采用 Differential Dataflow 实现增量计算。连接(Join)与聚合(Aggregation) 是 Pathway 数据处理管线中的两个核心操作,它们使得实时流处理和批处理能够以统一的方式进行。
继续阅读本节完整说明和来源证据。
连接与聚合
Pathway 是一个基于 Python 的 ETL 框架,其核心由 Rust 引擎驱动,底层采用 Differential Dataflow 实现增量计算。连接(Join)与聚合(Aggregation) 是 Pathway 数据处理管线中的两个核心操作,它们使得实时流处理和批处理能够以统一的方式进行。
来源:https://github.com/pathwaycom/pathway / 项目说明书
失败模式与踩坑日记
保留 Doramagic 在发现、验证和编译中沉淀的项目专属风险,不把社区讨论只当作装饰信息。
可能增加新用户试用和生产接入成本。
可能影响授权、密钥配置或安全边界。
可能增加新用户试用和生产接入成本。
可能阻塞安装或首次运行。
Pitfall Log / 踩坑日志
项目:pathwaycom/pathway
摘要:发现 22 个潜在踩坑项,其中 2 个为 high/blocking;最高优先级:安装坑 - 来源证据:[Bug]: TypeError: Cannot instantiate typing.Any。
1. 安装坑 · 来源证据:[Bug]: TypeError: Cannot instantiate typing.Any
- 严重度:high
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:[Bug]: TypeError: Cannot instantiate typing.Any
- 对用户的影响:可能增加新用户试用和生产接入成本。
- 建议检查:来源问题仍为 open,Pack Agent 需要复核是否仍影响当前版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_d63b2462b18449c4a2c02bb5e02a96c8 | https://github.com/pathwaycom/pathway/issues/227 | 来源讨论提到 python 相关条件,需在安装/试用前复核。
2. 安全/权限坑 · 来源证据:Entra Authentication Support (credential handler and/or password callback)
- 严重度:high
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Entra Authentication Support (credential handler and/or password callback)
- 对用户的影响:可能影响授权、密钥配置或安全边界。
- 建议检查:来源问题仍为 open,Pack Agent 需要复核是否仍影响当前版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_e7c65f4d8bce4b1a9c99accbf0457633 | https://github.com/pathwaycom/pathway/issues/230 | 来源讨论提到 python 相关条件,需在安装/试用前复核。
3. 安装坑 · 来源证据:Automated schema exploration in input connectors
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:Automated schema exploration in input connectors
- 对用户的影响:可能增加新用户试用和生产接入成本。
- 建议检查:来源问题仍为 open,Pack Agent 需要复核是否仍影响当前版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_74b91feb391f4b0298216d2a48fe5abe | https://github.com/pathwaycom/pathway/issues/224 | 来源类型 github_issue 暴露的待验证使用条件。
4. 安装坑 · 来源证据:Cannot Process Windowed Sessions from Kafka - Crash with "key missing in output table" on streaming retractions
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:Cannot Process Windowed Sessions from Kafka - Crash with "key missing in output table" on streaming retractions
- 对用户的影响:可能阻塞安装或首次运行。
- 建议检查:来源问题仍为 open,Pack Agent 需要复核是否仍影响当前版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_ab5d2b2076034999bfaed612a3d95d60 | https://github.com/pathwaycom/pathway/issues/232 | 来源讨论提到 python 相关条件,需在安装/试用前复核。
5. 安装坑 · 来源证据:Improve watermarks in POSIX-like objects tracker
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:Improve watermarks in POSIX-like objects tracker
- 对用户的影响:可能阻塞安装或首次运行。
- 建议检查:来源问题仍为 open,Pack Agent 需要复核是否仍影响当前版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_38f29b1fba3b41cc99b1364d15ef7213 | https://github.com/pathwaycom/pathway/issues/225 | 来源讨论提到 node 相关条件,需在安装/试用前复核。
6. 安装坑 · 来源证据:Persistence in `iterate` operator
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:Persistence in
iterateoperator - 对用户的影响:可能增加新用户试用和生产接入成本。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_e642d639ebbc4382b242028ef89ec236 | https://github.com/pathwaycom/pathway/issues/214 | 来源类型 github_issue 暴露的待验证使用条件。
7. 安装坑 · 来源证据:Support LEANN for RAG pipelines
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:Support LEANN for RAG pipelines
- 对用户的影响:可能增加新用户试用和生产接入成本。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_5d99dd1cfc794b299633b6c5dc9dd33b | https://github.com/pathwaycom/pathway/issues/173 | 来源讨论提到 python 相关条件,需在安装/试用前复核。
8. 安装坑 · 来源证据:Support MongoDB Atlas in pw.io.mongodb
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:Support MongoDB Atlas in pw.io.mongodb
- 对用户的影响:可能增加新用户试用和生产接入成本。
- 建议检查:来源问题仍为 open,Pack Agent 需要复核是否仍影响当前版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_fa1d6e2e977a45d99c414724681f0f32 | https://github.com/pathwaycom/pathway/issues/221 | 来源类型 github_issue 暴露的待验证使用条件。
9. 安装坑 · 来源证据:feat: Add Microsoft SQL Server (MSSQL) connector
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:feat: Add Microsoft SQL Server (MSSQL) connector
- 对用户的影响:可能影响升级、迁移或版本选择。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_370bcc864a314734b602f01d3d444ef9 | https://github.com/pathwaycom/pathway/issues/204 | 来源讨论提到 python 相关条件,需在安装/试用前复核。
10. 安装坑 · 来源证据:v0.27.0
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:v0.27.0
- 对用户的影响:可能影响升级、迁移或版本选择。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_8d6905d8719c488cbcbb821e794add26 | https://github.com/pathwaycom/pathway/releases/tag/v0.27.0 | 来源讨论提到 python 相关条件,需在安装/试用前复核。
11. 安装坑 · 来源证据:v0.29.0
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:v0.29.0
- 对用户的影响:可能影响升级、迁移或版本选择。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_b25babd3e3cc49108e56daaaaae8c5bf | https://github.com/pathwaycom/pathway/releases/tag/v0.29.0 | 来源讨论提到 python 相关条件,需在安装/试用前复核。
12. 安装坑 · 来源证据:v0.30.0
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:v0.30.0
- 对用户的影响:可能影响升级、迁移或版本选择。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_0291ee068e4e4d38aa86d0fd433b960a | https://github.com/pathwaycom/pathway/releases/tag/v0.30.0 | 来源讨论提到 python 相关条件,需在安装/试用前复核。
13. 配置坑 · 来源证据:v0.28.0
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个配置相关的待验证问题:v0.28.0
- 对用户的影响:可能影响升级、迁移或版本选择。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_708184d9c8ac445d923c82d38af7bd19 | https://github.com/pathwaycom/pathway/releases/tag/v0.28.0 | 来源类型 github_release 暴露的待验证使用条件。
14. 配置坑 · 来源证据:v0.30.1
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个配置相关的待验证问题:v0.30.1
- 对用户的影响:可能增加新用户试用和生产接入成本。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_e7a19bd03a49499e987781160e4b76f0 | https://github.com/pathwaycom/pathway/releases/tag/v0.30.1 | 来源类型 github_release 暴露的待验证使用条件。
15. 能力坑 · 能力判断依赖假设
- 严重度:medium
- 证据强度:source_linked
- 发现:README/documentation is current enough for a first validation pass.
- 对用户的影响:假设不成立时,用户拿不到承诺的能力。
- 建议检查:将假设转成下游验证清单。
- 防护动作:假设必须转成验证项;没有验证结果前不能写成事实。
- 证据:capability.assumptions | github_repo:571186647 | https://github.com/pathwaycom/pathway | README/documentation is current enough for a first validation pass.
16. 运行坑 · 来源证据:`pw.io.mssql.read` needs LSN persistence
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个运行相关的待验证问题:
pw.io.mssql.readneeds LSN persistence - 对用户的影响:可能增加新用户试用和生产接入成本。
- 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_59d927a667a843ccb7d0a4cd147a1dc0 | https://github.com/pathwaycom/pathway/issues/218 | 来源类型 github_issue 暴露的待验证使用条件。
17. 维护坑 · 维护活跃度未知
- 严重度:medium
- 证据强度:source_linked
- 发现:未记录 last_activity_observed。
- 对用户的影响:新项目、停更项目和活跃项目会被混在一起,推荐信任度下降。
- 建议检查:补 GitHub 最近 commit、release、issue/PR 响应信号。
- 防护动作:维护活跃度未知时,推荐强度不能标为高信任。
- 证据:evidence.maintainer_signals | github_repo:571186647 | https://github.com/pathwaycom/pathway | last_activity_observed missing
18. 安全/权限坑 · 下游验证发现风险项
- 严重度:medium
- 证据强度:source_linked
- 发现:no_demo
- 对用户的影响:下游已经要求复核,不能在页面中弱化。
- 建议检查:进入安全/权限治理复核队列。
- 防护动作:下游风险存在时必须保持 review/recommendation 降级。
- 证据:downstream_validation.risk_items | github_repo:571186647 | https://github.com/pathwaycom/pathway | no_demo; severity=medium
19. 安全/权限坑 · 存在评分风险
- 严重度:medium
- 证据强度:source_linked
- 发现:no_demo
- 对用户的影响:风险会影响是否适合普通用户安装。
- 建议检查:把风险写入边界卡,并确认是否需要人工复核。
- 防护动作:评分风险必须进入边界卡,不能只作为内部分数。
- 证据:risks.scoring_risks | github_repo:571186647 | https://github.com/pathwaycom/pathway | no_demo; severity=medium
20. 安全/权限坑 · 来源证据:Support encryption in Kinesis connectors
- 严重度:medium
- 证据强度:source_linked
- 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Support encryption in Kinesis connectors
- 对用户的影响:可能影响授权、密钥配置或安全边界。
- 建议检查:来源问题仍为 open,Pack Agent 需要复核是否仍影响当前版本。
- 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
- 证据:community_evidence:github | cevd_f29097c3eced4a209e7c80971aeea40c | https://github.com/pathwaycom/pathway/issues/223 | 来源类型 github_issue 暴露的待验证使用条件。
21. 维护坑 · issue/PR 响应质量未知
- 严重度:low
- 证据强度:source_linked
- 发现:issue_or_pr_quality=unknown。
- 对用户的影响:用户无法判断遇到问题后是否有人维护。
- 建议检查:抽样最近 issue/PR,判断是否长期无人处理。
- 防护动作:issue/PR 响应未知时,必须提示维护风险。
- 证据:evidence.maintainer_signals | github_repo:571186647 | https://github.com/pathwaycom/pathway | issue_or_pr_quality=unknown
22. 维护坑 · 发布节奏不明确
- 严重度:low
- 证据强度:source_linked
- 发现:release_recency=unknown。
- 对用户的影响:安装命令和文档可能落后于代码,用户踩坑概率升高。
- 建议检查:确认最近 release/tag 和 README 安装命令是否一致。
- 防护动作:发布节奏未知或过期时,安装说明必须标注可能漂移。
- 证据:evidence.maintainer_signals | github_repo:571186647 | https://github.com/pathwaycom/pathway | release_recency=unknown
来源:Doramagic 发现、验证与编译记录