Doramagic 项目包 · 项目说明书

pathway 项目

生成时间:2026-05-16 20:44:44 UTC

Pathway简介

Pathway是一个Python ETL框架,专为流处理(stream processing)、实时分析(real-time analytics)、LLM管道(LLM pipelines)和检索增强生成(RAG)场景设计。资料来源:[README.md:1]()

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 Python API层

继续阅读本节完整说明和来源证据。

章节 Rust执行引擎

继续阅读本节完整说明和来源证据。

章节 Timely Dataflow分布式计算

继续阅读本节完整说明和来源证据。

项目概述

Pathway是一个Python ETL框架,专为流处理(stream processing)、实时分析(real-time analytics)、LLM管道(LLM pipelines)和检索增强生成(RAG)场景设计。资料来源:README.md:1

Pathway的核心优势在于其易用的Python API,允许用户无缝集成最喜欢的Python机器学习库。同时,Pathway代码具有极高的通用性和鲁棒性,可以同时用于开发和生产环境,有效处理批处理(batch)和流式数据(streaming data)。资料来源:README.md:3

Pathway的一个显著特点是代码一致性:同一套代码可用于本地开发、CI/CD测试、运行批处理任务、处理流回放(stream replays)以及处理数据流。资料来源:README.md:4

核心技术架构

Pathway的技术架构采用分层设计,融合了多种分布式计算范式:

graph TD
    A[Python API层] --> B[PyO3绑定层]
    B --> C[Rust执行引擎]
    C --> D[Timely Dataflow分布式计算]
    C --> E[Differential Dataflow增量计算]
    D --> F[增量数据流处理]
    E --> F

Python API层

Pathway提供简洁的Python接口,通过PyO3实现Python与Rust的双向绑定。Python API层支持多种数据类型和连接器配置。资料来源:src/python_api.rs:1-80

核心Python绑定类包括:

类名功能说明
PythonSubjectPython主题定义,支持start、read、seek等回调
ValueField值字段定义,包含名称、类型、来源和默认值
Table数据表核心抽象
ConnectorProperties连接器通用属性
CsvParserSettingsCSV解析器配置
AwsS3SettingsAWS S3连接配置

Rust执行引擎

Pathway的执行引擎由Rust实现,基于Differential Dataflow和Timely Dataflow构建。Rust引擎负责高性能的数据处理和计算。资料来源:README.md:5

引擎通过pymodule导出Python可调用的类和函数,实现Python语法的易用性与Rust的高性能完美结合。资料来源:src/python_api.rs:100-150

Timely Dataflow分布式计算

Pathway使用Timely Dataflow作为底层分布式计算框架。Timely Dataflow提供了数据流图的处理能力,支持并行和分布式计算。资料来源:external/timely-dataflow/timely/src/progress/subgraph.rs:1-30

Timely Dataflow的核心组件包括:

graph LR
    A[数据输入] --> B[Scope子图]
    B --> C[PerOperatorState]
    C --> D[Antichain进度跟踪]
    D --> E[ChangeBatch消息]
    E --> F[数据输出]

关键数据结构:

  • Activations:共享激活状态管理
  • MutableAntichain:可变反链,用于跟踪进度边界
  • ChangeBatch:变化批次,用于记录数据变化

Differential Dataflow增量计算

Differential Dataflow是Pathway实现增量计算的核心技术。与传统批处理不同,Differential Dataflow能够智能处理数据的变化,仅重新计算受影响的部分。资料来源:external/differential-dataflow/src/operators/arrange/agent.rs:1-30

#### 轨迹追踪器(Trace)

Pathway使用轨迹(Trace)来记录历史数据状态,支持时间旅行查询和增量更新。资料来源:external/differential-dataflow/src/trace/wrappers/freeze.rs:1-30

graph TD
    A[数据变化] --> B[Trace记录]
    B --> C[时间戳管理]
    C --> D[Antichain since边界]
    D --> E[增量计算引擎]
    E --> F[输出更新]

#### 有序批处理

Pathway实现了高效的有序批处理结构OrdKeyBatchOrdValBatch,用于存储键值对的时间序列数据。资料来源:external/differential-dataflow/src/trace/implementations/ord.rs:1-50

组件说明
OrdKeyCursor有序键游标,用于遍历键数据
OrdValCursor有序键值游标,用于遍历键值对
BatchContainer批量容器接口
MergeBuilder合并构建器

主要功能特性

连接器支持

Pathway提供丰富的连接器支持,覆盖多种数据源和数据格式:

连接器类型支持的数据源
文件系统CSV、Parquet等
云存储AWS S3、Azure Blob Storage
数据库PostgreSQL(包括复制支持)
搜索引擎Elasticsearch
流处理MQTT、Kafka、Debezium
其他Schema Registry、Iceberg

资料来源:src/python_api.rs:80-120

PostgreSQL连接器

Pathway的PostgreSQL连接器支持完整的二进制协议,实现高效的数据读写:

  • 支持多种数据类型映射(BOOL、INT、Float、TEXT、JSONB等)
  • 二进制数组格式写入
  • 复制槽(Replication Slot)支持逻辑复制
  • Debezium CDC集成

资料来源:src/connectors/postgres.rs:1-60

数据格式与解析

Pathway提供灵活的数据格式配置,包括:

CsvParserSettings(
    name=field_name,
    type_=field_type,
    source=FieldSource.Payload,
    default=None
)

支持的数据类型包括:INT、FLOAT、STRING、BOOL、TIMESTAMP、JSON等。

持久化与状态管理

Pathway支持灵活的数据持久化配置:

配置项说明
PersistenceConfig持久化基础配置
BackfillingThreshold回填阈值配置
PySnapshotAccess快照访问模式
PySnapshotEvent快照事件类型

应用场景

ETL管道

Pathway特别适合构建ETL(Extract-Transform-Load)管道,支持从多种数据源提取数据、进行转换处理、加载到目标系统。

graph LR
    A[数据源] -->|Extract| B[Pathway管道]
    B -->|Transform| C[增量处理]
    C -->|Load| D[目标系统]

实时分析

基于Differential Dataflow的增量计算能力,Pathway能够高效处理实时数据分析任务,仅处理数据流中的增量变化部分。

LLM管道与RAG

Pathway为构建LLM(Large Language Model)管道和RAG(Retrieval-Augmented Generation)应用提供专用支持,能够实时索引和检索外部数据源。

开发与部署

开发环境

Pathway支持本地开发环境,开发者可以使用熟悉的Python工具链进行开发。资料来源:examples/templates/el-pipeline/README.md:1-20

许可证

Pathway采用BSL(Business Source License)许可证发布,允许非商业使用和评估。商业使用需要获取正式许可证。资料来源:README.md:45

许可证配置

部署时需要设置环境变量:

export PATHWAY_LICENSE_KEY=your_pathway_key

技术优势总结

优势描述
高性能Rust引擎提供接近原生的执行效率
增量计算Differential Dataflow避免重复计算
统一APIPython接口简化开发,Rust保证性能
多数据源丰富的内置连接器支持
可扩展性基于Timely Dataflow支持分布式部署
开发友好支持本地开发和生产环境一致体验

资料来源:[src/python_api.rs:80-120]()

安装指南

Pathway 是一个基于 Python 的 ETL 框架,由 Rust 引擎驱动,支持流处理和实时分析。本指南将帮助你在不同环境下正确安装和配置 Pathway。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 硬件和操作系统要求

继续阅读本节完整说明和来源证据。

章节 Python 版本要求

继续阅读本节完整说明和来源证据。

章节 使用 pip 安装

继续阅读本节完整说明和来源证据。

系统要求

硬件和操作系统要求

要求类型最低配置推荐配置
处理器2 核4 核或以上
内存4 GB8 GB 或以上
磁盘空间500 MB1 GB 或以上
操作系统Linux/macOS/WindowsLinux/macOS

Python 版本要求

Pathway 需要 Python 3.10 或更高版本。建议使用 Python 3.11 以获得最佳性能。

Python 版本支持状态
3.10✅ 支持
3.11✅ 推荐
3.12✅ 支持

安装方式

使用 pip 安装

通过 pip 包管理器安装 Pathway 是最简单的方式:

pip install pathway

从源码安装

对于需要最新功能或进行开发调试的用户,可以从源码编译安装:

# 克隆仓库
git clone https://github.com/pathwaycom/pathway.git
cd pathway

# 安装 Rust 依赖和 Python 包
pip install -e .

依赖环境

Rust 工具链

Pathway 的核心引擎使用 Rust 编写,需要 Rust 工具链支持。

#### 安装 Rust

# Linux/macOS
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

# Windows - 使用 rustup 安装程序
winget install Rustlang.Rustup

#### 验证 Rust 安装

rustc --version
cargo --version

项目使用 rust-toolchain.toml 文件指定 Rust 版本,确保编译器版本一致性。

Python 依赖

Pathway 的 Python 依赖在 pyproject.toml 中定义,主要包括:

依赖包版本要求用途
pyo3最新稳定版Python 绑定
timely-dataflow外部依赖数据流处理
differential-dataflow外部依赖增量计算

环境配置

环境变量

Pathway 运行可能需要的可选环境变量:

# 设置 Pathway 许可证密钥(商业功能需要)
export PATHWAY_LICENSE_KEY=your_pathway_key

# 可选:设置 Python 路径
export PYTHONPATH=/path/to/pathway:$PYTHONPATH

验证安装

安装完成后,可以通过以下方式验证:

import pathway as pw

# 打印版本信息
print(pw.__version__)

# 创建一个简单的数据流测试
input_session = pw.demo.demo_reduce_operator(show_progress=False)
print("Pathway 安装成功!")

平台特定说明

Linux

Linux 环境需要确保以下系统包已安装:

# Debian/Ubuntu
sudo apt-get update
sudo apt-get install -y python3-dev python3-pip cargo

macOS

macOS 用户建议使用 Homebrew 安装依赖:

brew install python rust
pip3 install pathway

Windows

Windows 用户需要安装 Visual C++ Build Tools 以编译 Rust 依赖:

  1. 下载并安装 Visual Studio Build Tools
  2. 确保安装 "C++ 生成工具" 工作负载
  3. 通过 PowerShell 或命令提示符安装 Pathway

常见问题

编译错误

如果遇到编译错误,确保 Rust 工具链是最新版本:

rustup update
cargo clean
pip install --force-reinstall pathway

导入错误

如果遇到 ModuleNotFoundError,检查 Python 路径和安装状态:

pip show pathway
python -c "import pathway; print(pathway.__file__)"

下一步

安装完成后,你可以开始使用 Pathway:

  1. 阅读快速开始指南了解基本用法
  2. 查看用户指南深入学习
  3. 参考 API 文档了解完整功能

来源:https://github.com/pathwaycom/pathway / 项目说明书

基础示例

Pathway 是一个 Python ETL 框架,用于流处理、实时分析、LLM 管道和 RAG(检索增强生成)应用开发。本文档介绍 Pathway 的基础概念、核心组件和入门示例。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 数据流架构

继续阅读本节完整说明和来源证据。

章节 PythonSubject 核心类

继续阅读本节完整说明和来源证据。

章节 ValueField 模式字段

继续阅读本节完整说明和来源证据。

概述

Pathway 由 Python 前端和 Rust 后端引擎组成,通过 PyO3 实现 Python 与 Rust 的互操作。Python API 提供了简洁易用的接口,而 Rust 引擎基于 Differential Dataflow 进行增量计算,确保高性能和一致性。

资料来源:README.md:1-15

核心概念

数据流架构

graph TD
    A[数据源 Connectors] --> B[PythonSubject 输入]
    B --> C[Rust 引擎处理]
    C --> D[增量计算 Differential Dataflow]
    D --> E[PyExportedTable 输出]
    E --> F[快照 Snapshot]

Pathway 的核心数据流遵循:数据源 → PythonSubject → Rust 引擎 → 增量计算 → ExportedTable 的模式。

资料来源:src/python_api.rs:35-45

PythonSubject 核心类

PythonSubject 是 Python 端的数据输入接口,用于定义数据源的读取行为:

参数类型说明
startPy<PyAny>起始处理函数
readPy<PyAny>读取数据函数
seekPy<PyAny>定位函数
on_persisted_runPy<PyAny>持久化运行回调
endPy<PyAny>结束处理函数
is_internalbool是否为内部数据源
deletions_enabledbool是否启用删除

资料来源:src/python_api.rs:25-33

ValueField 模式字段

ValueField 定义表的 schema 字段结构:

属性类型说明
nameString字段名称
type_Type数据类型
sourceFieldSource字段来源(默认 Payload)
defaultOption<Value>默认值
metadataOption<String>元数据

资料来源:src/python_api.rs:52-61

PyExportedTable 导出表

PyExportedTable 提供数据输出和快照功能:

方法返回类型说明
frontier()TotalFrontier<Timestamp>获取当前前沿
snapshot_at(frontier)Vec<(Key, Vec<Value>)>在指定前沿获取快照
failed()bool检查处理是否失败

资料来源:src/python_api.rs:106-115

Done 标记值

Done 是时间线末端的标记值,用于表示计算完成状态。它是一个冻结的 Python 对象,具有特殊的比较和哈希行为:

  • Done 比较时返回 Equal
  • 与整数比较时返回 Greater
  • 支持自定义哈希实现

资料来源:src/python_api.rs:176-196

时间戳与前沿

Timestamp 时间戳

Pathway 使用 Timestamp 进行时间管理,实现 OriginalOrRetractionNextRetractionTime trait:

graph LR
    A[原始数据] --> B[偶数时间戳]
    A --> C[ retraction 数据]
    C --> D[奇数时间戳]
    D --> E[下一 retraction 时间]
  • is_original(): 偶数时间戳表示原始数据
  • next_retraction_time(): 计算下一个 retraction 时间(当前时间 + 1)

资料来源:src/engine/timestamp.rs:45-55

TotalFrontier 前沿状态

TotalFrontier<T> 是表示计算前沿的枚举类型:

变体说明
At(T)在指定时间点
Done计算已完成

该类型实现了 FromPyObjectIntoPyObject,支持 Python 和 Rust 间的无缝转换。

资料来源:src/python_api.rs:140-175

基础使用模式

典型项目结构

project/
├── app.py              # 主程序
├── data/               # 数据目录
│   ├── docs/           # 文档输入
│   └── output/         # 结果输出
├── .env                # 环境变量
└── requirements.txt    # 依赖

资料来源:examples/projects/question-answering-rag/README.md:1-30

环境配置

安装 Pathway:

pip install pathway[xpack-llm]

设置环境变量:

export PATHWAY_LICENSE_KEY=your_pathway_key

或创建 .env 文件:

OPENAI_API_KEY=your_openai_api_key_here

资料来源:examples/projects/question-answering-rag/README.md:25-30

运行应用

直接运行 Python 脚本:

python app.py

使用 Pathway CLI:

pathway spawn python main.py

多线程运行:

pathway spawn --threads 3 python main.py

资料来源:README.md:120-135

HTTP 服务集成

Pathway 提供 HTTP 服务器用于接收查询和返回结果:

webserver = pw.io.http.PathwayWebserver(host="0.0.0.0", port=8011)

提交查询:

curl --data '{ "messages": "What is the value of X?"}' http://localhost:8011

资料来源:examples/projects/question-answering-rag/README.md:15-20

注册的 Python API 类

Pathway 通过 add_class 注册以下核心类到 pathway.engine 模块:

类别类名
连接器设置AwsS3Settings, AzureBlobStorageSettings, MqttSettings, PsqlReplicationSettings
解析器CsvParserSettings, PySchemaRegistrySettings
存储格式DataStorage, DataFormat, PersistenceConfig
索引IcebergCatalogSettings, ElasticSearchParams
内部类PythonSubject, ValueField, PyExportedTable, Done, Trace
配置ConnectorProperties, ColumnProperties, TableProperties, TelemetryConfig
快照PySnapshotAccess, PySnapshotEvent, PyPersistenceMode
向量索引PyExternalIndexFactory, PyExternalIndexData, PyExternalIndexQuery

资料来源:src/python_api.rs:220-250

文档与支持

完整文档访问:pathway.com/developers/

如有问题,可通过以下方式获取帮助:

资料来源:examples/templates/el-pipeline/README.md:45-50

下一步

  • 学习 Pathway 的连接器配置
  • 了解实时数据处理的高级特性
  • 探索 RAG 和 LLM 集成的最佳实践
  • 查看 examples/projects/ 目录下的完整示例项目

资料来源:[README.md:1-15]()

引擎架构

Pathway 的引擎架构是一个基于 Rust 的高性能计算引擎,构建于 Differential Dataflow 和 Timely Dataflow 之上。这一架构使得 Pathway 能够以 Python API 的形式提供简洁易用的接口,同时在底层实现增量计算和流式处理。引擎的核心设计理念是将 Python 层的灵活表达与 Rust 引擎的高性能执行相结合,通过 P...

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 计算引擎层次结构

继续阅读本节完整说明和来源证据。

章节 核心类结构

继续阅读本节完整说明和来源证据。

章节 Differential Dataflow 集成

继续阅读本节完整说明和来源证据。

概述

Pathway 的引擎架构是一个基于 Rust 的高性能计算引擎,构建于 Differential Dataflow 和 Timely Dataflow 之上。这一架构使得 Pathway 能够以 Python API 的形式提供简洁易用的接口,同时在底层实现增量计算和流式处理。引擎的核心设计理念是将 Python 层的灵活表达与 Rust 引擎的高性能执行相结合,通过 PyO3 实现 Python 与 Rust 之间的无缝桥接。

Pathway 引擎支持多种数据类型和处理模式,包括静态数据(batch)和动态数据(streaming),能够在同一个代码库中处理历史数据和实时数据流。引擎内置的增量计算机制确保只有发生变化的数据才会被重新处理,从而大幅提升计算效率。

核心架构组件

计算引擎层次结构

Pathway 引擎采用三层架构设计,从上到下依次为:

层次组件职责
Python API 层pathway.engine 模块提供 Python 接口,暴露核心类和函数
PyO3 绑定层src/python_api.rs实现 Python 对象到 Rust 对象的转换
Rust 核心层src/engine/实现 Differential Dataflow 计算引擎

资料来源:src/python_api.rs:1-50

核心类结构

Pathway 引擎通过 PyO3 将以下核心 Rust 结构暴露给 Python:

# Python API 层暴露的核心类
Table           # 数据表,表示一组带时间戳的记录
Column          # 列,表示数据表中的一列
Universe        # 宇宙,表示一组唯一标识符
Context         # 上下文,管理数据流计算的执行环境
Scope           # 作用域,定义计算的数据流范围
DataRow         # 数据行,表示表中的一行数据
Computer        # 计算器,定义如何计算派生列

资料来源:src/python_api.rs:85-110

数据流计算模型

Differential Dataflow 集成

Pathway 引擎的核心计算模型基于 Differential Dataflow,这是一种用于增量数据流计算的编程模型。Differential Dataflow 能够高效处理数据的插入、更新和删除操作,通过追踪数据变化而非重新计算整个结果来实现高性能。

graph TD
    A[输入数据流] --> B[Input Session]
    B --> C[Differential Collection]
    C --> D[Arrangement]
    D --> E[Operators<br/>join/reduce/count]
    E --> F[输出 Collection]
    F --> G[Incremental 结果]
    
    H[变化追踪] --> C

资料来源:external/differential-dataflow/src/operators/reduce.rs:1-50

时间戳与版本管理

Pathway 使用复合时间戳来追踪数据变化,每个数据记录都带有时间戳和差异值(multiplicity)。时间戳必须实现 Lattice trait,支持部分有序比较,这使得系统能够正确处理并发和乱序数据。

// 时间戳必须是 Lattice,用于支持增量计算的合并
impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G, T1>
where
    G: Scope,
    G::Timestamp: Lattice+Ord+Debug,
    // ...

资料来源:external/differential-dataflow/src/operators/join.rs:1-30

数据流图结构

Pathway 引擎内部维护一个数据流图(Dataflow Graph),用于协调各个算子之间的数据传递:

graph LR
    subgraph DataflowGraphInner
        A[BeforeIterate] --> B[Scope]
        B --> C[Child Scope]
        C --> D[Operators]
        D --> E[AfterIterate]
    end
    
    F[持久化数据] --> A
    A --> G[表创建]
    G --> D

资料来源:src/engine/dataflow.rs:1-60

核心算子实现

Join 算子

Join 是 Pathway 中最常用的算子之一,用于合并两个数据集。引擎使用 join_core_internal_unsafe 方法实现高效的连接操作:

// Join 操作的核心实现
fn join_core<Tr2,I,L>(&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
where
    Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
    // ... 配置连接参数
{
    self.arrange_by_key().join_core_internal_unsafe(stream2, result)
}

资料来源:external/differential-dataflow/src/operators/join.rs:20-35

Reduce 算子

Reduce 算子用于对数据进行聚合操作,支持自定义归约器实现:

impl<S: MaybeTotalScope, R: ReducerImpl> DataflowReducer<S> for R
where
    Collection<S, (Key, Option<<R as ReducerImpl>::State>)>:
        Into<PersistableCollection<S>> + From<PersistableCollection<S>>,
{
    fn reduce(
        self: Rc<Self>,
        values: &Collection<S, (Key, Key, Vec<Value>)>,
        error_logger: Rc<dyn LogError>,
        _trace: Trace,
        graph: &mut DataflowGraphInner<S>,
    ) -> Result<Values<S>> {
        // 实现聚合逻辑
    }
}

资料来源:src/engine/dataflow.rs:80-110

Count 算子

Count 是 Differential Dataflow 内置的聚合操作,用于计算每个键的出现次数:

// Count 扩展 trait 实现
pub trait Count<G: Scope, K: Data, R: Semigroup> where G::Timestamp: Lattice+Ord {
    fn count(&self) -> Collection<G, (K, R), isize> {
        self.count_core()
    }
    
    fn count_core<R2: Abelian + From<i8>>(&self) -> Collection<G, (K, R), R2> {
        self.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>(
            "Count", 
            |_k, s, t| t.push((s[0].1.clone(), R2::from(1i8)))
        ).as_collection(|k, c| (k.clone(), c.clone()))
    }
}

资料来源:external/differential-dataflow/src/operators/reduce.rs:30-80

Arrangement 数据结构

Arrangement 概念

Arrangement 是 Differential Dataflow 中的核心数据结构,用于组织和索引数据以支持高效的后续操作。Arrangement 本质上是一个按 key 排序的 trace 结构:

graph TD
    A[输入数据] --> B[ArrangeByKey]
    B --> C[Trace Agent]
    C --> D[Trace Spine]
    D --> E[多个 Batch]
    
    F[查询] --> G[Cursor 遍历]
    G --> D

资料来源:external/differential-dataflow/src/operators/arrange/agent.rs:1-40

Trace Writer 实现

Trace Writer 负责向 arrangement 中写入数据批次:

pub fn new(
    upper: Vec<Tr::Time>,
    trace: Weak<RefCell<TraceBox<Tr>>>,
    queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>
) -> Self {
    let mut temp = Antichain::new();
    temp.extend(upper.into_iter());
    Self { upper: temp, trace, queues }
}

pub fn exert(&mut self, fuel: &mut isize) {
    if let Some(trace) = self.trace.upgrade() {
        trace.borrow_mut().trace.exert(fuel);
    }
}

资料来源:external/differential-dataflow/src/operators/arrange/writer.rs:1-40

Trace Freeze 包装器

Trace Freeze 是一个特殊的 trace 包装器,用于在特定条件下冻结时间:

impl<Tr, F> TraceReader for TraceFreeze<Tr, F>
where
    Tr: TraceReader,
    Tr::Time: Lattice+Clone+'static,
    F: Fn(&Tr::Time)->Option<Tr::Time>+'static,
{
    type Key = Tr::Key;
    type Val = Tr::Val;
    type Time = Tr::Time;
    type R = Tr::R;
    
    type Batch = BatchFreeze<Tr::Batch, F>;
    type Cursor = CursorFreeze<Tr::Cursor, F>;
}

资料来源:external/differential-dataflow/src/trace/wrappers/freeze.rs:1-50

增量计算机制

变化批次处理

Pathway 使用 ChangeBatch 来追踪数据的变化,每个变化包含时间戳和差异值:

pub fn into_inner(mut self) -> Vec<(T, i64)> {
    self.compact();
    self.updates
}

pub fn iter(&mut self) -> ::std::slice::Iter<(T, i64)> {
    self.compact();
    self.updates.iter()
}

资料来源:external/timely-dataflow/timely/src/progress/change_batch.rs:60-90

持久化与状态管理

引擎支持对计算状态进行持久化,通过 PersistenceConfig 配置持久化行为:

配置项类型说明
persistence_modePyPersistenceMode持久化模式(禁用/异步/同步)
snapshot_accessPySnapshotAccess快照访问策略
snapshot_eventPySnapshotEvent快照触发事件

资料来源:src/python_api.rs:1-30

Python 绑定实现

PyO3 类导出

Pathway 通过 PyO3 将 Rust 结构导出为 Python 类:

#[pymethods]
impl ValueField {
    #[new]
    #[pyo3(signature = (name, type_, source = FieldSource::Payload))]
    pub fn new(name: String, type_: Type, source: FieldSource) -> Self {
        Self {
            name,
            type_,
            source,
        }
    }
}

资料来源:src/python_api.rs:140-160

PythonSubject 绑定

PythonSubject 允许 Python 代码定义自定义的数据源:

#[pyclass(module = "pathway.engine")]
pub struct PythonSubject {
    pub start: Py<PyAny>,
    pub read: Py<PyAny>,
    pub seek: Py<PyAny>,
    pub on_persisted_run: Py<PyAny>,
    pub end: Py<PyAny>,
    pub is_internal: bool,
    pub deletions_enabled: bool,
}

资料来源:src/python_api.rs:95-130

数据模型

表结构

组件说明
TableProperties表级别的属性配置
ColumnProperties列级别的属性配置
ConnectorProperties连接器配置
Trace追踪配置
Done完成标记

资料来源:src/python_api.rs:25-40

值字段定义

#[pyclass(module = "pathway.engine")]
#[derive(Clone)]
pub struct ValueField {
    #[pyo3(get)]
    pub name: String,
    #[pyo3(get)]
    pub type_: Type,
    #[pyo3(get)]
    pub source: FieldSource,
    #[pyo3(get)]
    pub default: Option<Value>,
    #[pyo3(get)]
    pub metadata: Option<String>,
}

资料来源:src/python_api.rs:145-165

关键配置选项

连接器模式

模式说明
STATIC静态数据源,读取一次
STREAMING流式数据源,持续监听
REPLAY回放模式,重演历史数据

监控级别

级别说明
OFF禁用监控
BASIC基础指标
DETAILED详细指标

资料来源:src/python_api.rs:85-100

工作流程图

graph TD
    A[Python 代码定义数据流] --> B[PyO3 调用 Rust 引擎]
    B --> C[创建 DataflowGraph]
    C --> D[初始化 Scope 和 Universe]
    D --> E[注册数据源和算子]
    E --> F[启动数据流执行]
    
    F --> G{数据输入模式}
    G -->|批量| H[Batch Input]
    G -->|流式| I[Stream Input]
    
    H --> J[计算 operators]
    I --> J
    
    J --> K[生成 Arrangement]
    K --> L[输出结果]
    
    J --> M{状态变化}
    M -->|有变化| N[触发增量计算]
    M -->|无变化| O[跳过重算]
    N --> K

总结

Pathway 的引擎架构通过深度整合 Differential Dataflow 和 Timely Dataflow,实现了一个高性能、可扩展的增量计算引擎。核心设计特点包括:

  1. 双层语言架构:Python API 提供易用性,Rust 引擎保证性能
  2. 增量计算模型:通过时间戳和差异值追踪变化,避免重复计算
  3. 灵活的 Arrangement:支持高效的 join、reduce 等操作
  4. 完整的持久化支持:内置状态管理和快照机制
  5. 丰富的连接器:支持多种数据源和输出目标

这种架构使 Pathway 能够同时满足开发和生产环境的需求,支持从本地开发到大规模分布式部署的多种场景。

资料来源:[src/python_api.rs:1-50]()

Python与Rust集成

Pathway是一个基于Python的ETL框架,但其核心计算引擎由Rust实现。这种架构通过PyO3库实现Python与Rust的无缝集成,使得开发者可以使用简洁的Python API编写数据处理逻辑,同时享受Rust带来的高性能和内存安全性。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 核心数据结构绑定

继续阅读本节完整说明和来源证据。

章节 PyO3类型转换

继续阅读本节完整说明和来源证据。

章节 异常类型体系

继续阅读本节完整说明和来源证据。

技术架构概述

Pathway采用双层架构设计:上层是用户编写的Python代码,下层是Rust实现的增量计算引擎。Differential Dataflow算法驱动核心数据流处理,而Timely Dataflow提供分布式计算能力。

graph TD
    A[Python用户代码] --> B[PyO3绑定层]
    B --> C[Rust引擎核心]
    C --> D[Differential Dataflow]
    C --> E[Timely Dataflow]
    D --> F[增量计算]
    E --> G[分布式数据流]
    F --> H[结果返回Python]
    G --> H

PyO3绑定机制

Pathway使用PyO3库实现Python与Rust之间的双向通信。PyO3提供了从Rust向Python暴露类型和函数的能力,同时也支持从Python代码调用Rust函数。

核心数据结构绑定

#### PythonSubject结构体

PythonSubject是Pathway中用于连接外部数据源的关键结构体,它将Python端的回调函数包装为Rust可调用的对象:

#[pyclass(module = "pathway.engine")]
#[derive(Clone)]
pub struct PythonSubject {
    pub start: Py<PyAny>,
    pub read: Py<PyAny>,
    pub seek: Py<PyAny>,
    pub on_persisted_run: Py<PyAny>,
    pub end: Py<PyAny>,
    pub is_internal: bool,
    pub deletions_enabled: bool,
}

资料来源:src/python_api.rs:1-15

参数类型说明
startPy<PyAny>起始位置回调函数
readPy<PyAny>读取数据回调函数
seekPy<PyAny>定位回调函数
on_persisted_runPy<PyAny>持久化运行回调
endPy<PyAny>结束位置回调
is_internalbool是否为内部数据源
deletions_enabledbool是否启用删除操作

#### ValueField结构体

ValueField用于定义表字段的元数据,包括字段名、类型、来源和默认值:

#[pyclass(module = "pathway.engine", frozen)]
#[derive(Clone)]
pub struct ValueField {
    #[pyo3(get)]
    pub name: String,
    #[pyo3(get)]
    pub type_: Type,
    #[pyo3(get)]
    pub source: FieldSource,
    #[pyo3(get)]
    pub default: Option<Value>,
    #[pyo3(get)]
    pub metadata: Option<String>,
}

资料来源:src/python_api.rs:36-48

PyO3类型转换

Pathway实现了Python与Rust之间的自动类型转换,通过IntoPyObjectFromPyObjecttrait实现。

#### Timestamp类型转换

impl<'py> IntoPyObject<'py> for Timestamp {
    type Target = PyAny;
    type Output = Bound<'py, Self::Target>;
    type Error = PyErr;
    fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
        self.0.into_bound_py_any(py)
    }
}

资料来源:src/engine/timestamp.rs:40-50

#### TotalFrontier枚举转换

TotalFrontier表示计算的前沿边界,包含AtDone两种状态:

impl<'py, T> FromPyObject<'py> for TotalFrontier<T>
where
    T: FromPyObject<'py>,
{
    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
        if ob.is_instance_of::<Done>() {
            Ok(TotalFrontier::Done)
        } else {
            Ok(TotalFrontier::At(ob.extract()?))
        }
    }
}

资料来源:src/python_api.rs:180-192

错误处理机制

Pathway通过PyO3暴露多种异常类型供Python端捕获和处理。

异常类型体系

异常类型说明使用场景
EngineError基础引擎错误通用错误处理
EngineErrorWithTrace带调用栈的错误调试和问题定位
OtherWorkerError其他工作进程错误分布式环境错误处理
Error通用错误类型自定义错误
Pending操作未完成状态异步操作状态查询

资料来源:src/python_api.rs:200-230

表导出与快照机制

PyExportedTable导出表

PyExportedTable允许Python代码访问Rust引擎计算的表数据:

#[pyclass(module = "pathway.engine", frozen, name = "ExportedTable")]
pub struct PyExportedTable {
    inner: Arc<dyn ExportedTable>,
}

#[pymethods]
impl PyExportedTable {
    fn frontier(&self) -> TotalFrontier<Timestamp> {
        self.inner.frontier()
    }

    fn snapshot_at(&self, frontier: TotalFrontier<Timestamp>) -> Vec<(Key, Vec<Value>)> {
        self.inner.snapshot_at(frontier)
    }

    fn failed(&self) -> bool {
        self.inner.failed()
    }
}

资料来源:src/python_api.rs:115-135

Done标记类型

Done是一个特殊的冻结类型,用于表示计算已完成的状态:

mod done {
    use once_cell::sync::Lazy;
    use pyo3::prelude::*;

    #[pyclass(module = "pathway.engine", frozen)]
    pub struct Done(InnerDone);

    pub static DONE: Lazy<Py<Done>> = Lazy::new(|| {
        Python::with_gil(|py| Py::new(py, Done(InnerDone)).expect("creating DONE should not fail"))
    });
}

资料来源:src/python_api.rs:165-177

计算上下文

Context作用域管理

Context结构体提供了当前计算行的上下文信息:

#[pyclass(module = "pathway.engine", frozen)]
pub struct Context(SendWrapper<ScopedContext>);

#[pymethods]
impl Context {
    #[getter]
    fn this_row(&self) -> PyResult<Key> {
        self.0
            .with(|context| context.this_row())
            .ok_or_else(|| PyValueError::new_err("context out of scope"))
    }
}

资料来源:src/python_api.rs:235-250

Computer计算器

Computer用于执行用户定义的Python函数:

#[pyclass]
pub struct Computer {
    pub fun: Py<PyAny>,
    pub dtype: Py<PyAny>,
    pub is_output: bool,
    pub is_method: bool,
    pub universe: Py<Universe>,
    pub data: Value,
    pub data_column: Option<Py<Column>>,
}

#[pymethods]
impl Computer {
    #[staticmethod]
    #[pyo3(signature = (fun, dtype, is_output, is_method, universe, data = Value::None, data_column = None))]
    pub fn from_raising_fun(
        py: Python,
        fun: Py<PyAny>,
        dtype: Py<PyAny>,
        is_output: bool,
        is_method: bool,
        universe: Py<Universe>,
        data: Value,
        data_column: Option<Py<Column>>,
    ) -> PyResult<Py<Self>> { ... }
}

资料来源:src/python_api.rs:260-290

Differential Dataflow集成

Arrangement导入机制

Pathway通过Differential Dataflow的arrangement机制在数据流之间共享计算结果:

/// Same as `import`, but allows to name the source.
pub fn import_named<G>(&mut self, scope: &G, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
    G: Scope<Timestamp=Tr::Time>,
    Tr::Time: Timestamp,
{
    // Drop ShutdownButton and return only the arrangement.
    self.import_core(scope, name).0
}

资料来源:external/differential-dataflow/src/operators/arrange/agent.rs:35-45

Trace Freeze包装器

TraceFreeze包装器允许延迟计算trace直到被使用时才执行:

pub struct TraceFreeze<Tr, F>
where
    Tr: TraceReader,
    Tr::Time: Lattice+Clone+'static,
    F: Fn(&Tr::Time)->Option<Tr::Time>,
{
    trace: Tr,
    func: Rc<F>,
}

资料来源:external/differential-dataflow/src/trace/wrappers/freeze.rs:15-21

时间戳与顺序语义

Timestamp特征实现

Pathway实现了Differential Dataflow所需的时间戳特征:

impl OriginalOrRetraction for Timestamp {
    fn is_original(&self) -> bool {
        self.0.is_multiple_of(2)
    }
}

impl NextRetractionTime for Timestamp {
    fn next_retraction_time(&self) -> Self {
        Self(self.0 + 1)
    }
}

资料来源:src/engine/timestamp.rs:60-70

时间戳顺序关系

Timestamp支持时间戳间的偏序关系计算:

pub fn followed_by(&self, other: &Self) -> Option<Self> {
    self.0.followed_by(&other.0).map(Self)
}

注册的Python类

Pathway通过add_class方法将以下Rust结构体暴露给Python:

类名功能
AwsS3SettingsAWS S3连接配置
AzureBlobStorageSettingsAzure Blob存储配置
ElasticSearchParamsElasticsearch参数
CsvParserSettingsCSV解析设置
DataStorage数据存储配置
DataFormat数据格式定义
PersistenceConfig持久化配置
PythonSubjectPython数据源主题
MqttSettingsMQTT连接设置
IcebergCatalogSettingsIceberg目录配置
ConnectorProperties连接器属性
ColumnProperties列属性
TableProperties表属性
PyExportedTable导出表
PyExternalIndexFactory外部索引工厂
PyUSearchMetricKindUSearch度量类型
PyBruteForceKnnMetricKind暴力KNN度量类型

资料来源:src/python_api.rs:300-330

数据流处理流程

sequenceDiagram
    participant Python用户代码
    participant PyO3绑定层
    participant Rust引擎
    participant DifferentialDataflow
    participant TimelyDataflow
    
    Python用户代码->>PyO3绑定层: 调用Python API
    PyO3绑定层->>Rust引擎: 传递计算请求
    Rust引擎->>DifferentialDataflow: 创建数据流操作符
    DifferentialDataflow->>TimelyDataflow: 分布式执行
    TimelyDataflow-->>DifferentialDataflow: 返回计算结果
    DifferentialDataflow-->>Rust引擎: 增量更新结果
    Rust引擎-->>PyO3绑定层: 转换结果类型
    PyO3绑定层-->>Python用户代码: 返回Python对象

多工作进程支持

Pathway支持通过Timely Dataflow的多工作进程实现并行计算。可以通过命令行参数指定工作进程数量:

cargo run --example wordcount -- -w2

这将启动两个工作进程并行处理数据,每个进程独立消费输入数据并产生部分结果。

资料来源:external/timely-dataflow/mdbook/src/chapter_2/chapter_2_5.md:1-25

总结

Pathway的Python与Rust集成架构充分发挥了两种语言的优势:

  • Python端:提供简洁的声明式API,开发者使用Python编写数据处理逻辑
  • Rust端:实现高性能的增量计算引擎,基于Differential Dataflow提供高效的流式处理
  • PyO3:作为桥梁,实现Python与Rust之间的无缝类型转换和函数调用
  • Timely Dataflow:提供分布式计算能力,支持多工作进程并行处理

资料来源:[src/python_api.rs:1-15]()

连接器概述

Pathway 是一个 Python ETL 框架,专为流处理、实时分析、LLM 管道和 RAG(检索增强生成)而设计 资料来源:README.md。连接器(Connector)是 Pathway 数据管道中的核心组件,负责数据的输入与输出操作。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 2.1 核心组件关系

继续阅读本节完整说明和来源证据。

章节 2.2 连接器模式

继续阅读本节完整说明和来源证据。

章节 3.1 PythonSubject 结构

继续阅读本节完整说明和来源证据。

1. 概述

Pathway 是一个 Python ETL 框架,专为流处理、实时分析、LLM 管道和 RAG(检索增强生成)而设计 资料来源:README.md。连接器(Connector)是 Pathway 数据管道中的核心组件,负责数据的输入与输出操作。

Pathway 连接器系统具有以下特点:

  • 多源数据支持:支持文件系统、数据库、云存储、消息队列等多种数据源
  • 实时与批处理:同一套 API 支持静态批处理和流式实时处理
  • Rust 引擎驱动:底层由 Rust 编写的可扩展引擎驱动,基于 Differential Dataflow 实现增量计算
  • Python 友好:通过 Python API 提供简洁易用的接口

2. 连接器架构

2.1 核心组件关系

graph TD
    subgraph "Python API 层"
        IO["pw.io 模块"]
        DS["DataSource"]
        DK["DataSink"]
    end
    
    subgraph "Rust 引擎层"
        CP["ConnectorProperties"]
        CM["ConnectorMode"]
        Engine["Pathway Engine"]
    end
    
    subgraph "数据源类型"
        S3["AWS S3"]
        Azure["Azure Blob"]
        CSV["CSV Parser"]
        MQTT["MQTT"]
        Kafka["Kafka/Confluent"]
    end
    
    IO --> DS
    IO --> DK
    DS --> CP
    DK --> CP
    CP --> CM
    CM --> Engine
    S3 --> CP
    Azure --> CP
    CSV --> CP
    MQTT --> CP

2.2 连接器模式

Pathway 支持两种连接器模式,用于区分数据处理方式 资料来源:src/python_api.rs:320-329

模式枚举值说明
静态模式ConnectorMode.STATIC批处理模式,一次性加载所有数据
流式模式ConnectorMode.STREAMING实时流处理,持续监听数据变化
#[pymethods]
impl PyConnectorMode {
    #[classattr]
    pub const STATIC: ConnectorMode = ConnectorMode::Static;
    #[classattr]
    pub const STREAMING: ConnectorMode = ConnectorMode::Streaming;
}

3. 数据源(DataSource)

数据源是连接器系统中负责从外部系统读取数据的组件。在 Pathway 中,数据源通过 Python Subject 接口与 Rust 引擎交互。

3.1 PythonSubject 结构

PythonSubject 是 Python 层与 Rust 引擎之间的桥梁,封装了数据读取所需的所有回调函数 资料来源:src/python_api.rs:15-39

#[pyclass(module = "pathway.engine")]
#[derive(Clone)]
pub struct PythonSubject {
    pub start: Py<PyAny>,              // 初始化回调
    pub read: Py<PyAny>,               // 读取数据回调
    pub seek: Py<PyAny>,               // 寻址回调(可选)
    pub on_persisted_run: Py<PyAny>,    // 持久化运行回调
    pub end: Py<PyAny>,                // 结束回调
    pub is_internal: bool,             // 是否为内部数据源
    pub deletions_enabled: bool,       // 是否启用删除操作
}

3.2 数据读取流程

sequenceDiagram
    participant App as 应用代码
    participant DS as DataSource
    participant Subject as PythonSubject
    participant Engine as Pathway Engine
    
    App->>DS: 创建数据源实例
    DS->>Subject: 调用 start() 初始化
    Engine->>Subject: 调用 read() 读取数据
    Subject-->>Engine: 返回数据批次
    loop 流式处理
        Engine->>Subject: 调用 read() 轮询新数据
        Subject-->>Engine: 返回更新数据
    end
    Engine->>Subject: 调用 end() 清理资源

4. 数据输出(DataSink)

数据 Sink 负责将处理后的数据输出到外部系统。Pathway 提供了多种输出连接器,包括文件系统、数据库和消息队列等。

4.1 输出配置参数

参数类型说明
hoststring服务器主机地址
portint服务器端口
formatDataFormat输出数据格式
storageDataStorage存储配置
create_tablebool是否创建输出表

5. 支持的连接器类型

5.1 云存储连接器

Pathway 原生支持主流云存储服务:

服务设置类说明
AWS S3AwsS3Settings亚马逊 S3 对象存储
Azure BlobAzureBlobStorageSettings微软 Azure Blob 存储

5.2 消息队列连接器

消息系统设置类说明
MQTTMqttSettingsIoT 场景常用的轻量级消息协议
KafkaSchemaRegistrySettingsConfluent Schema Registry 集成

5.3 数据库连接器

数据库设置类说明
PostgreSQLPsqlReplicationSettings支持 CDC(变更数据捕获)复制

5.4 其他专用连接器

  • ElasticSearchElasticSearchParams / ElasticSearchAuth — 全文搜索引擎集成
  • IcebergIcebergCatalogSettings — Apache Iceberg 表格式支持
  • Schema RegistryPySchemaRegistrySettings — Avro/Protobuf Schema 管理

5.5 解析器设置

m.add_class::<CsvParserSettings>()?;
m.add_class::<ValueField>()?;
m.add_class::<DataStorage>()?;
m.add_class::<DataFormat>()?;

6. 连接器配置属性

6.1 ConnectorProperties

连接器属性类定义了数据源和输出端的通用配置选项 资料来源:src/python_api.rs:450-465

属性说明
columns列定义列表
primary_key主键列
date_format日期格式字符串
value_fields值字段列表

6.2 ColumnProperties

列属性用于定义单个列的元数据:

pw.ColumnProperties(
    name="column_name",
    type_=pw.Type,
    source=pw.FieldSource.Payload,
    default=None,
    metadata=None
)

7. 使用示例

7.1 RAG 应用中的连接器使用

在问答型 RAG 应用中,Pathway 连接器用于实时索引文档并提供检索服务 资料来源:examples/projects/question-answering-rag/README.md

import pathway as pw

# 创建 HTTP 服务器用于接收查询
webserver = pw.io.http.PathwayWebserver(host="0.0.0.0", port=8011)

# 数据源:文件系统连接器监听 ./data/ 目录
# 文档被分块、向量化后存储
# 输出:通过 REST API 提供 /v1/retrieve 检索接口

7.2 多智能体 RAG 中的连接器

在多智能体 RAG 架构中,连接器同时承担文档索引和智能体间通信的任务 资料来源:examples/projects/ag2-multiagent-rag/README.md

graph LR
    A["Documents (./data/)"] -->|实时索引| B["Pathway VectorStoreServer"]
    B -->|HTTP /v1/retrieve| C["AG2 Researcher Agent"]
    C -->|search_documents| B
    B -->|检索结果| D["AG2 Analyst Agent"]
    D -->|综合回答| E["用户"]

8. 连接器的高级特性

8.1 持久化配置

Pathway 支持连接器级别的持久化配置,用于故障恢复和状态管理:

m.add_class::<PersistenceConfig>()?;
m.add_class::<PyPersistenceMode>()?;
m.add_class::<PySnapshotAccess>()?;
m.add_class::<PySnapshotEvent>()?;

8.2 会话类型

Pathway 引擎支持不同类型的会话处理 资料来源:src/python_api.rs:334-345

会话类型枚举值使用场景
原生会话SessionType.NATIVE标准批处理和流处理
UPSERT 会话SessionType.UPSERT增量更新场景

8.3 监控与追踪

Pathway 提供了完善的监控机制,包括:

  • TelemetryConfig:遥测数据收集配置
  • BackfillingThreshold:回填数据阈值控制
  • EngineTrace:引擎执行追踪

9. 最佳实践

  1. 选择合适的连接器模式:静态数据使用 STATIC 模式,实时数据使用 STREAMING 模式
  2. 配置主键:为数据源设置主键以支持增量更新和去重
  3. 设置超时和重试:生产环境应配置适当的重试策略
  4. 监控连接器状态:使用 Pathway Dashboard 监控消息数量和延迟
  5. 处理删除操作:根据业务需求通过 deletions_enabled 参数控制是否处理数据删除

10. 相关文档

来源:https://github.com/pathwaycom/pathway / 项目说明书

数据库连接器

Pathway 的数据库连接器模块提供了与各类数据库系统交互的能力,支持从外部数据源读取数据并将处理结果写入目标数据库。作为 Pathway ETL 框架的核心组件之一,数据库连接器实现了实时的增量数据同步机制,能够在流处理和批处理场景下保持数据的一致性和时效性。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 连接器类型体系

继续阅读本节完整说明和来源证据。

章节 连接器模式

继续阅读本节完整说明和来源证据。

章节 关系型数据库

继续阅读本节完整说明和来源证据。

概述

Pathway 的数据库连接器模块提供了与各类数据库系统交互的能力,支持从外部数据源读取数据并将处理结果写入目标数据库。作为 Pathway ETL 框架的核心组件之一,数据库连接器实现了实时的增量数据同步机制,能够在流处理和批处理场景下保持数据的一致性和时效性。

Pathway 的数据库连接器基于 Rust 引擎构建,结合 Differential Dataflow 计算模型,实现了高效的增量计算能力。这意味着当源数据库中的数据发生变化时,连接器能够自动检测变更并仅处理发生变化的记录,而不是重新处理整个数据集。

架构设计

连接器类型体系

Pathway 在 src/python_api.rs 中定义了一套统一的连接器配置接口,通过 PyO3 绑定暴露给 Python 用户。连接器配置通过 ConnectorPropertiesColumnPropertiesTableProperties 等类进行封装,支持对数据源的精细化配置。

m.add_class::<ConnectorProperties>()?;
m.add_class::<ColumnProperties>()?;
m.add_class::<TableProperties>()?;

资料来源:src/python_api.rs:行

连接器模式

Pathway 支持两种连接器运行模式,通过 ConnectorMode 枚举定义:

模式说明
STATIC静态模式,适用于批处理场景,数据源仅读取一次
STREAMING流式模式,适用于实时数据处理,支持持续监听数据变更
#[pymethods]
impl PyConnectorMode {
    #[classattr]
    pub const STATIC: ConnectorMode = ConnectorMode::Static;
    #[classattr]
    pub const STREAMING: ConnectorMode = ConnectorMode::Streaming;
}

资料来源:src/python_api.rs:行

支持的数据库类型

关系型数据库

Pathway 原生支持多种主流关系型数据库的连接:

  • PostgreSQL:通过 PsqlReplicationSettings 提供复制槽支持,用于捕获数据库变更
  • MySQL:支持标准的读取连接配置
  • Microsoft SQL Server:通过 MSSQL 连接器支持企业级应用

NoSQL 数据库

  • MongoDB:支持文档型数据库的实时读取和写入
  • Elasticsearch:提供全文检索场景下的数据索引能力

云存储与服务

Pathway 还集成了多种云服务的连接配置:

  • AWS S3AwsS3Settings 支持从 S3 存储桶读取数据文件
  • Azure Blob StorageAzureBlobStorageSettings 支持 Azure 云存储
  • Iceberg CatalogIcebergCatalogSettings 支持数据湖架构
m.add_class::<AwsS3Settings>()?;
m.add_class::<AzureBlobStorageSettings>()?;
m.add_class::<ElasticSearchParams>()?;
m.add_class::<IcebergCatalogSettings>()?;
m.add_class::<PsqlReplicationSettings>()?;

资料来源:src/python_api.rs:行

数据处理流程

graph TD
    A[外部数据库] --> B[连接器读取层]
    B --> C[变更捕获]
    C --> D[Rust 引擎处理]
    D --> E[增量计算]
    E --> F[下游输出]
    
    G[Schema 定义] --> D
    H[连接配置] --> B

读取流程

  1. 连接初始化:连接器根据配置参数建立与目标数据库的连接
  2. Schema 映射:通过 ValueField 定义输入数据的字段映射和类型转换
  3. 变更捕获:在流式模式下持续监听数据变更事件
  4. 数据传输:变更数据通过 Pathway 引擎进行增量处理

写入流程

  1. 输出表定义:通过 is_output=True 标记输出计算节点
  2. 目标配置:指定目标数据库的连接信息和写入策略
  3. 批量写入:引擎自动将处理结果批量写入目标系统

Schema 定义

Pathway 使用 ValueField 结构体定义数据的 Schema 映射:

#[pyclass(module = "pathway.engine")]
#[derive(Clone)]
pub struct ValueField {
    #[pyo3(get)]
    pub name: String,
    #[pyo3(get)]
    pub type_: Type,
    #[pyo3(get)]
    pub source: FieldSource,
    #[pyo3(get)]
    pub default: Option<Value>,
    #[pyo3(get)]
    pub metadata: Option<String>,
}

资料来源:src/python_api.rs:行

字段属性说明:

属性类型说明
nameString字段名称
type_Type数据类型
sourceFieldSource字段来源(Payload/Materialized)
defaultOption<Value>默认值
metadataOption<String>元数据信息

时间戳与前沿管理

Pathway 通过 TotalFrontier<T> 管理数据处理的时间戳前沿,实现精确的增量计算:

#[pymethods]
impl PyExportedTable {
    fn frontier(&self) -> TotalFrontier<Timestamp> {
        self.inner.frontier()
    }

    fn snapshot_at(&self, frontier: TotalFrontier<Timestamp>) -> Vec<(Key, Vec<Value>)> {
        self.inner.snapshot_at(frontier)
    }
}

资料来源:src/python_api.rs:行

TotalFrontier 有两种状态:

  • At(t):表示处理到达时间戳 t,继续处理后续数据
  • Done:表示数据处理完成,不再有新数据到达
impl<'py, T> FromPyObject<'py> for TotalFrontier<T> {
    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
        if ob.is_instance_of::<Done>() {
            Ok(TotalFrontier::Done)
        } else {
            Ok(TotalFrontier::At(ob.extract()?))
        }
    }
}

资料来源:src/python_api.rs:行

使用示例

基本配置流程

import pathway as pw

# 定义输入连接器
class InputSchema(pw.Schema):
    id: int
    value: str

# 配置 PostgreSQL 输入
input_connector = pw.io.postgres.read(
    host="localhost",
    port=5432,
    database="mydb",
    user="user",
    password="password",
    table="source_table",
    schema=InputSchema,
    mode=pw ConnectorMode.STREAMING
)

# 定义处理逻辑
result = input_connector.groupby(pw.this.id).reduce(
    id=pw.this.id,
    count=pw.reducers.count()
)

# 配置输出连接器
pw.io.postgres.write(
    result,
    host="localhost",
    port=5432,
    database="mydb",
    user="user",
    password="password",
    table="output_table"
)

# 运行管道
pw.run()

与向量存储集成

Pathway 的数据库连接器可以与向量索引功能结合使用,实现 RAG(检索增强生成)场景:

graph LR
    A[PostgreSQL/MySQL] --> B[Pathway 引擎]
    B --> C[向量嵌入]
    C --> D[向量索引]
    D --> E[LLM 查询]

监控与故障处理

Pathway 的 PyExportedTable 提供了内置的监控接口:

#[pymethods]
impl PyExportedTable {
    fn failed(&self) -> bool {
        self.inner.failed()
    }
}

资料来源:src/python_api.rs:行

通过检查 failed() 状态,用户可以及时发现数据处理过程中的异常情况,并采取相应的恢复措施。

性能优化建议

  1. 批量处理:利用 Pathway 的增量计算特性,避免全量数据重处理
  2. 索引优化:确保源数据库的相关字段已建立索引
  3. 连接池:合理配置连接池大小以平衡资源使用和吞吐量
  4. Schema 精简:仅提取必要的字段,减少数据传输量

资料来源:[src/python_api.rs:行](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)

流数据连接器

Pathway的流数据连接器(Stream Connectors)是Framework与外部数据源之间进行实时数据交换的核心组件。这些连接器使得Pathway能够从Kafka、NATS、MQTT、RabbitMQ、Redpanda等消息队列系统实时读取数据,并将处理结果写入各种数据sink。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 连接器模式体系

继续阅读本节完整说明和来源证据。

章节 PythonSubject核心接口

继续阅读本节完整说明和来源证据。

章节 字段定义与数据类型

继续阅读本节完整说明和来源证据。

概述

Pathway的流数据连接器(Stream Connectors)是Framework与外部数据源之间进行实时数据交换的核心组件。这些连接器使得Pathway能够从Kafka、NATS、MQTT、RabbitMQ、Redpanda等消息队列系统实时读取数据,并将处理结果写入各种数据sink。

Pathway的连接器系统构建在Differential Dataflow增量计算引擎之上,支持两种主要的运行模式:静态模式(Static)和流式模式(Streaming)。静态模式适用于批处理场景,而流式模式则专为实时数据处理设计,能够处理持续到来的数据更新。连接器系统还支持增量计算的语义,允许系统只处理自上次处理以来的变更,而不是重新处理整个数据集。

核心架构

连接器模式体系

Pathway定义了一套完整的连接器模式枚举,用于区分不同的数据处理语义:

class ConnectorMode(Enum):
    STATIC = auto()    # 静态批处理模式
    STREAMING = auto() # 实时流处理模式

每种模式对应不同的数据处理策略。静态模式下,连接器假设数据源是有限的,处理完成后系统进入完成状态。流式模式下,连接器持续监听数据源,系统永远不会自然结束,除非外部终止。

PythonSubject核心接口

所有自定义数据源连接器都继承自PythonSubject基类,该类定义了数据源的核心接口:

class PythonSubject:
    def __init__(
        self,
        start: Callable[[], None],           # 启动数据源
        read: Callable[[], Iterator[Row]],   # 读取数据行
        seek: Callable[[Any], None],         # 定位到指定位置
        on_persisted_run: Callable[[], None], # 持久化运行回调
        end: Callable[[], bool],             # 检查是否结束
        is_internal: bool,                   # 是否内部数据源
        deletions_enabled: bool              # 是否启用删除
    )

资料来源:src/python_api.rs:21-31

PythonSubject的设计体现了Pathway连接器的核心哲学:数据源被抽象为一个可以随时启动、读取、定位和检查完成状态的可迭代对象。这种设计使得连接器能够与Differential Dataflow的计算模型无缝集成。

字段定义与数据类型

ValueField类定义了表中每个字段的元信息,包括名称、类型、来源和默认值:

class ValueField:
    name: str           # 字段名称
    type_: Type         # 数据类型
    source: FieldSource # 字段来源 (默认: Payload)
    default: Optional[Value]  # 默认值
    metadata: Optional[str]   # 元数据

FieldSource枚举标识了数据的来源类型,最常用的是FieldSource.Payload,表示数据来自消息的有效载荷。在复杂的流处理场景中,字段来源还可以标识时间戳、主键等特殊字段。

工作原理

数据流处理管道

Pathway的流数据连接器遵循一个清晰的数据处理管道:

graph TD
    A[外部数据源] --> B[Connector启动]
    B --> C[数据读取循环]
    C --> D{数据有效?}
    D -->|是| E[解析消息]
    D -->|否| F[跳过/日志]
    E --> G[转换为Pathway Row]
    G --> H[输入到Differential Dataflow]
    H --> I[增量计算]
    I --> J[结果输出到Sink]
    J --> C
    F --> C

增量计算与时间戳

Pathway的连接器系统深度集成了Differential Dataflow的增量计算能力。每个数据更新都带有时间戳,系统只处理自上次检查点以来的变更。Timestamp类型实现了OriginalOrRetraction特性,用于区分原始数据和撤回数据:

class Timestamp:
    def is_original(self) -> bool:
        # 偶数时间戳表示原始数据
        return self.0.is_multiple_of(2)
    
    def next_retraction_time(self) -> Self:
        # 奇数时间戳表示撤回
        return Self(self.0 + 1)

资料来源:src/engine/timestamp.rs:45-53

这种设计允许连接器高效处理数据的插入、更新和删除操作,而无需重新处理整个数据集。

持久化与状态恢复

连接器支持持久化运行模式,通过on_persisted_run回调函数实现故障恢复:

  1. 检查点保存:系统在适当的时间点保存当前的处理状态
  2. 故障恢复:重新启动时,从最后一个检查点恢复
  3. 数据重放:根据保存的seek位置,重新从数据源读取未处理的数据

这种机制确保了在分布式环境中,即使发生节点故障,数据也不会丢失,处理可以透明地继续。

预置连接器

Pathway提供了多个开箱即用的流数据连接器,覆盖了主流的消息队列系统:

连接器协议典型用途文档链接
KafkaTCP二进制协议大规模流处理kafka/__init__.py
RedpandaKafka兼容API高性能流处理redpanda/__init__.py
NATSNATS协议轻量级消息传递nats/__init__.py
MQTTMQTT协议IoT设备数据采集mqtt/__init__.py
RabbitMQAMQP协议企业消息队列rabbitmq/__init__.py

连接器配置参数

每种连接器都支持一组通用的配置参数:

参数类型说明默认值
modeConnectorMode运行模式STREAMING
deletions_enabledbool是否处理删除事件false
autocommit_durationtimedelta自动提交间隔None
timeouttimedelta连接超时时间30秒
schemaValueField[]输出表模式自动推断

使用场景

实时文档索引

Pathway的连接器常用于构建实时RAG(检索增强生成)系统。以下架构展示了典型的用法:

graph LR
    A[文档数据源] --> B[Pathway连接器]
    B --> C[数据预处理]
    C --> D[文本分块]
    D --> E[嵌入模型]
    E --> F[向量索引]
    F --> G[HTTP API]
    G --> H[LLM查询]

资料来源:examples/projects/ag2-multiagent-rag/README.md:12-20

在ag2-multiagent-rag项目中,Pathway连接器负责监听文件系统中的文档变化,实时索引新文档并更新向量存储。外部AI代理通过REST API查询相关文档,系统保证始终返回最新的索引结果。

Web数据抓取管道

Pathway连接器也广泛用于构建数据采集管道:

graph TD
    A[新闻网站] --> B[Web爬虫连接器]
    B --> C[内容解析]
    C --> D[元数据提取]
    D --> E[数据清洗]
    E --> F[JSON Lines输出]
    F --> G[下游处理]

资料来源:examples/projects/web-scraping/README.md:1-50

自定义的NewsScraperSubject继承自Pathway的ConnectorSubject,实现了文章URL作为主键的数据表结构,支持增量更新和去重。

企业数据集成

对于企业级应用,Pathway的RabbitMQ和Kafka连接器能够与现有的消息基础设施无缝集成:

import pathway as pw

class MyConnectorSubject(pw.io.ConnectorSubject):
    def __init__(self):
        super().__init__()
        self._client = create_rabbitmq_client()
    
    def start(self):
        self._client.connect()
    
    def read(self):
        while message := self._client.consume():
            yield self._parse_message(message)
    
    def end(self):
        return self._client.is_idle_timeout()

与Differential Dataflow的集成

Pathway的连接器系统是Differential Dataflow在Python生态系统中的桥接层。Rust核心负责:

  1. 数据流的组织:使用Trace数据结构维护历史状态
  2. 增量计算:只处理实际发生的变更,而非全量数据
  3. 多 worker 支持:通过pathway spawn --threads N启用多线程处理

Python API提供了高级抽象,降低了使用门槛,同时保持了Rust引擎的高性能特性。这种设计使得开发者能够用Python编写流处理逻辑,而底层执行效率与原生Rust程序相当。

最佳实践

模式定义

在创建连接器时,应显式定义输出表的schema,包括所有字段的名称、类型和默认值:

schema = pw.Schema(
    fields={
        "id": pw.FieldType.STRING,
        "content": pw.FieldType.STRING,
        "timestamp": pw.FieldType.DATETIME,
    },
    primary_key=["id"]
)

错误处理

连接器应实现健壮的错误处理机制:

def read(self):
    try:
        yield from self._fetch_data()
    except TransientError:
        # 临时错误,等待后重试
        time.sleep(1)
        yield from self.read()
    except PermanentError:
        # 永久错误,标记连接器失败
        self._mark_failed()

性能优化

  1. 批量处理:积累多条消息后再提交,减少I/O次数
  2. 背压控制:当处理速度跟不上数据到达速度时,使用BackfillingThreshold配置
  3. 连接池复用:在多worker场景下复用连接,避免资源耗尽

总结

Pathway的流数据连接器系统提供了一套完整的数据集成解决方案。通过统一的抽象接口,开发者可以轻松地将各种外部数据源接入Pathway的处理管道。连接器与Differential Dataflow的深度集成确保了即使在处理大规模实时数据时,也能保持高效的增量计算能力。

预置的Kafka、Redpanda、NATS、MQTT和RabbitMQ连接器覆盖了大多数流数据场景,而PythonSubject基类则允许开发者实现完全自定义的数据源连接器,满足各种特殊的集成需求。

资料来源:[src/python_api.rs:21-31]()

表操作

Pathway 是一个基于 Python 的 ETL 框架,其核心数据模型围绕表(Table)展开。表操作是 Pathway 数据处理流水线的基本构建单元,提供了对结构化数据进行转换、过滤、聚合和连接的能力。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 Table 类

继续阅读本节完整说明和来源证据。

章节 列(Column)与字段(ValueField)

继续阅读本节完整说明和来源证据。

章节 类型系统

继续阅读本节完整说明和来源证据。

概述

Pathway 是一个基于 Python 的 ETL 框架,其核心数据模型围绕表(Table)展开。表操作是 Pathway 数据处理流水线的基本构建单元,提供了对结构化数据进行转换、过滤、聚合和连接的能力。

Pathway 的表操作具有以下特性:

  • 统一批流处理:同一套 API 可处理静态数据集和实时流数据
  • 增量计算:基于 Rust 的 Differential Dataflow 引擎实现高效增量更新
  • 声明式语法:通过 Python 表达式定义数据转换逻辑

资料来源:README.md:1-20

核心数据结构

Table 类

Table 是 Pathway 中的核心数据结构,代表一个具有特定模式的分布式表格。

classDiagram
    class Table {
        +scope: Py[Scope]
        +handle: TableHandle
        +_is_deleted: bool
        +_app_id: str
        +__getitem__(key) TableSlice
        +select(*args) Table
        +update_rows(*args) Table
        +update_cells(*args) Table
        +filter(predicate) Table
        +groupby(*args) Table
        +join(*args) Table
        +join_inner(*args) Table
        +concat(*tables) Table
    }
    
    class LegacyTable {
        +universe: Universe
        +columns: Vec~Column~
        +to_engine() (UniverseHandle, Vec~ColumnHandle~)
        +from_handles(scope, handles) LegacyTable
    }
    
    Table --> Scope : 关联
    LegacyTable --> Universe : 关联
    LegacyTable --> Column : 包含

列(Column)与字段(ValueField)

每张表由多个列组成,每个列具有名称、类型和来源信息。

# ValueField 定义 - 资料来源:src/python_api.rs:40-55
@dataclass
class ValueField:
    name: str                    # 列名
    type_: Type                  # 数据类型
    source: FieldSource          # 字段来源(Payload/Materialized)
    default: Optional[Value]     # 默认值
    metadata: Optional[str]      # 元数据

类型系统

类型说明对应 Rust 类型
INT64位整数i64
FLOAT双精度浮点f64
STR字符串String
BOOL布尔值bool
ANY任意类型-

资料来源:src/python_api.rs:40-55

表的创建与初始化

从输入连接器创建

Pathway 提供多种连接器用于从不同数据源创建表:

# 从文件系统创建表
table = pw.io.fs.read(
    "./data",
    schema=MySchema,
    format="csv"
)

# 从 HTTP 流读取
table = pw.io.http.read(
    "http://api.example.com/stream",
    schema=MySchema,
    json_field_paths={"data": "$.records[*]"}
)

表的内部状态

graph TD
    A[输入数据] --> B[Python Subject]
    B --> C[Raw Updates]
    C --> D[Rust Engine]
    D --> E[Universe Handle]
    D --> F[Column Handles]
    E --> G[Table]
    F --> G
    G --> H[快照 Snapshot]

表的内部状态由 UniverseColumn 集合组成:

# LegacyTable 内部结构 - 资料来源:src/python_api.rs:180-210
class LegacyTable:
    def __init__(self, universe, columns):
        self.universe = universe    # 行的唯一标识集合
        self.columns = columns      # 列的列表
    
    def to_engine(self):
        """转换为引擎句柄"""
        universe = self.universe.get()
        column_handles = [c.get().handle for c in self.columns]
        return (universe.handle, column_handles)

基础表操作

选择操作(select)

select 用于从表中提取指定列,可进行计算和重命名:

# 基本选择
result = table.select(
    id=table.user_id,
    name=table.name,
    full_name=table.first_name + " " + table.last_name  # 表达式计算
)

过滤操作(filter)

filter 根据条件筛选行:

# 过滤条件
active_users = users.filter(users.status == "active")
premium_orders = orders.filter(orders.amount > 100)

行更新(update_rows)

向表中添加或更新行:

# 添加新行
updated_table = table.update_rows(
    pw.table.builder.make_table(
        id=[1, 2],
        name=["Alice", "Bob"],
        value=[10, 20]
    )
)

单元格更新(update_cells)

更新特定列的值:

# 更新特定列
result = table.update_cells(
    status=pw.this.status.upper(),  # 将 status 列转为大写
    _id=table.id                     # 保留原 id 列
)

资料来源:python/pathway/internals/table.py

聚合与分组

分组聚合(groupby)

groupby 按照指定键分组并执行聚合操作:

graph LR
    A[输入表] --> B[GroupBy Key]
    B --> C[每组聚合]
    C --> D[聚合函数]
    D --> E[输出表]
    
    subgraph 聚合函数
        F[count]
        G[sum]
        H[min/max]
        I[collect]
    end
    D --> F
    D --> G
    D --> H
    D --> I
# 按类别聚合
category_stats = orders.groupby(orders.category).reduce(
    orders.category,
    total_amount=pw.reducers.sum(orders.amount),
    order_count=pw.reducers.count(),
    avg_price=pw.reducers.mean(orders.price)
)

可用聚合函数

函数说明示例
count()计数count()
sum(expr)求和sum(table.amount)
mean(expr)平均值mean(table.score)
min(expr)最小值min(table.latency)
max(expr)最大值max(table.price)
collect(expr)收集为列表collect(table.items)
sorted_tuple(expr)排序元组sorted_tuple(table.events)

表连接操作

连接的种类

连接类型说明空值处理
join / join_inner内连接丢弃无匹配的行
join_left左外连接保留左表所有行
join_right右外连接保留右表所有行
join_outer全外连接保留所有行

基本连接语法

# 内连接
result = table1.join(
    table2,
    table1.key == table2.key
)

# 指定输出列
result = table1.join(
    table2,
    table1.id == table2.user_id,
    left=pw.left.id,
    right=pw.right.user_id,
    prefix="left_"
)

复杂连接场景

# 多条件连接
orders_enriched = orders.join(
    customers,
    (orders.customer_id == customers.id) & 
    (orders.region == customers.region)
)

时间与版本控制

时间戳机制

Pathway 使用特殊的时间戳系统来追踪数据的版本和变更:

# Timestamp 实现 - 资料来源:src/engine/timestamp.rs:1-30
impl Timestamp {
    fn followed_by(&self, other: &Self) -> Option<Self> {
        self.0.followed_by(&other.0).map(Self)
    }
}

数据变更追踪

时间线:  ─────────────────────────────────────────────►
         t0      t1      t2      t3      t4
         │       │       │       │       │
         ▼       ▼       ▼       ▼       ▼
数据:   [v0]   [v1]   [v2]   [撤回]   [v4]
              (修改)  (修改)  (删除)

Pathway 使用 Differential Dataflow 实现增量更新,仅处理发生变化的数据。

表的快照与导出

快照操作(snapshot)

获取表在特定时间点的状态:

# 获取当前快照
snapshot_data = table.snapshot()

# 在特定前沿时间获取快照 - 资料来源:src/python_api.rs:220-240
frontier = TotalFrontier::At(timestamp)
snapshot = exported_table.snapshot_at(frontier)

外部索引集成

# 创建向量索引用于 RAG
table = pw.Table.from_columns(
    id=range(1000),
    content=texts,
    vector=embeddings
)

# 构建外部索引
table = table.groupby(table.id).reduce(
    table.id,
    content=pw.reducers.torch(
        "embedding",
        vector=table.vector,
        mode="mean"
    )
)

最佳实践

性能优化

  1. 合理设计模式:避免过度规范化
  2. 使用适当的数据类型:优先使用原生类型而非字符串
  3. 批量操作:优先使用 update_rows 而非逐行更新
  4. 索引列:在频繁连接的列上建立索引

常见模式

# 模式1:流式处理 + 批处理混合
stream_table = pw.io.kafka.read(...)
batch_table = pw.io.fs.read("./backup", ...)

combined = stream_table.concat(batch_table)
# 模式2:滑动窗口聚合
windowed = events.groupby(
    events.session_id
).reduce(
    events.session_id,
    time_range=pw.reducers.count(),
    events_list=pw.reducers.collect(
        pw.this.event_data
    )[-100:]  # 最近100个事件
)

与其他模块的关系

graph TD
    Table[表操作] --> Column[列操作]
    Table --> Expression[表达式]
    Table --> Connector[连接器]
    Column --> Expression
    Expression --> Compute[计算引擎]
    Connector --> Input[输入数据]
    Compute --> Output[输出数据]
    Compute --> Persistence[持久化]
  • 列操作column.py):定义列级别的表达式和转换
  • 表达式expression.py):处理数据计算和条件逻辑
  • 连接器pw.io.*):负责数据的输入输出

错误处理

失败状态检测

# 检查表是否处于失败状态
if table.failed():
    logger.error("数据处理失败,请检查输入数据")

追踪与调试

# 获取错误追踪信息
class Trace:
    file_name: str
    line_number: int
    line: str
    function: str

小结

Pathway 的表操作模块构成了整个框架的核心,提供了:

  • 统一的数据模型:通过 Table 类实现批流统一的表格操作
  • 声明式的转换语法:使用 Python 表达式定义复杂的数据处理逻辑
  • 高效的增量计算:基于 Differential Dataflow 的增量更新机制
  • 丰富的连接和聚合能力:支持多种连接类型和聚合函数

熟练掌握表操作是构建高效 Pathway 数据处理流水线的基础。

资料来源:[README.md:1-20]()

连接与聚合

Pathway 是一个基于 Python 的 ETL 框架,其核心由 Rust 引擎驱动,底层采用 Differential Dataflow 实现增量计算。连接(Join)与聚合(Aggregation) 是 Pathway 数据处理管线中的两个核心操作,它们使得实时流处理和批处理能够以统一的方式进行。

章节 相关页面

继续阅读本节完整说明和来源证据。

连接与聚合

Pathway 是一个基于 Python 的 ETL 框架,其核心由 Rust 引擎驱动,底层采用 Differential Dataflow 实现增量计算。连接(Join)与聚合(Aggregation) 是 Pathway 数据处理管线中的两个核心操作,它们使得实时流处理和批处理能够以统一的方式进行。

来源:https://github.com/pathwaycom/pathway / 项目说明书

失败模式与踩坑日记

保留 Doramagic 在发现、验证和编译中沉淀的项目专属风险,不把社区讨论只当作装饰信息。

high 来源证据:[Bug]: TypeError: Cannot instantiate typing.Any

可能增加新用户试用和生产接入成本。

high 来源证据:Entra Authentication Support (credential handler and/or password callback)

可能影响授权、密钥配置或安全边界。

medium 来源证据:Automated schema exploration in input connectors

可能增加新用户试用和生产接入成本。

medium 来源证据:Cannot Process Windowed Sessions from Kafka - Crash with "key missing in output table" on streaming retractions

可能阻塞安装或首次运行。

Pitfall Log / 踩坑日志

项目:pathwaycom/pathway

摘要:发现 22 个潜在踩坑项,其中 2 个为 high/blocking;最高优先级:安装坑 - 来源证据:[Bug]: TypeError: Cannot instantiate typing.Any。

1. 安装坑 · 来源证据:[Bug]: TypeError: Cannot instantiate typing.Any

  • 严重度:high
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:[Bug]: TypeError: Cannot instantiate typing.Any
  • 对用户的影响:可能增加新用户试用和生产接入成本。
  • 建议检查:来源问题仍为 open,Pack Agent 需要复核是否仍影响当前版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_d63b2462b18449c4a2c02bb5e02a96c8 | https://github.com/pathwaycom/pathway/issues/227 | 来源讨论提到 python 相关条件,需在安装/试用前复核。

2. 安全/权限坑 · 来源证据:Entra Authentication Support (credential handler and/or password callback)

  • 严重度:high
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Entra Authentication Support (credential handler and/or password callback)
  • 对用户的影响:可能影响授权、密钥配置或安全边界。
  • 建议检查:来源问题仍为 open,Pack Agent 需要复核是否仍影响当前版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_e7c65f4d8bce4b1a9c99accbf0457633 | https://github.com/pathwaycom/pathway/issues/230 | 来源讨论提到 python 相关条件,需在安装/试用前复核。

3. 安装坑 · 来源证据:Automated schema exploration in input connectors

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:Automated schema exploration in input connectors
  • 对用户的影响:可能增加新用户试用和生产接入成本。
  • 建议检查:来源问题仍为 open,Pack Agent 需要复核是否仍影响当前版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_74b91feb391f4b0298216d2a48fe5abe | https://github.com/pathwaycom/pathway/issues/224 | 来源类型 github_issue 暴露的待验证使用条件。

4. 安装坑 · 来源证据:Cannot Process Windowed Sessions from Kafka - Crash with "key missing in output table" on streaming retractions

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:Cannot Process Windowed Sessions from Kafka - Crash with "key missing in output table" on streaming retractions
  • 对用户的影响:可能阻塞安装或首次运行。
  • 建议检查:来源问题仍为 open,Pack Agent 需要复核是否仍影响当前版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_ab5d2b2076034999bfaed612a3d95d60 | https://github.com/pathwaycom/pathway/issues/232 | 来源讨论提到 python 相关条件,需在安装/试用前复核。

5. 安装坑 · 来源证据:Improve watermarks in POSIX-like objects tracker

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:Improve watermarks in POSIX-like objects tracker
  • 对用户的影响:可能阻塞安装或首次运行。
  • 建议检查:来源问题仍为 open,Pack Agent 需要复核是否仍影响当前版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_38f29b1fba3b41cc99b1364d15ef7213 | https://github.com/pathwaycom/pathway/issues/225 | 来源讨论提到 node 相关条件,需在安装/试用前复核。

6. 安装坑 · 来源证据:Persistence in `iterate` operator

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:Persistence in iterate operator
  • 对用户的影响:可能增加新用户试用和生产接入成本。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_e642d639ebbc4382b242028ef89ec236 | https://github.com/pathwaycom/pathway/issues/214 | 来源类型 github_issue 暴露的待验证使用条件。

7. 安装坑 · 来源证据:Support LEANN for RAG pipelines

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:Support LEANN for RAG pipelines
  • 对用户的影响:可能增加新用户试用和生产接入成本。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_5d99dd1cfc794b299633b6c5dc9dd33b | https://github.com/pathwaycom/pathway/issues/173 | 来源讨论提到 python 相关条件,需在安装/试用前复核。

8. 安装坑 · 来源证据:Support MongoDB Atlas in pw.io.mongodb

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:Support MongoDB Atlas in pw.io.mongodb
  • 对用户的影响:可能增加新用户试用和生产接入成本。
  • 建议检查:来源问题仍为 open,Pack Agent 需要复核是否仍影响当前版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_fa1d6e2e977a45d99c414724681f0f32 | https://github.com/pathwaycom/pathway/issues/221 | 来源类型 github_issue 暴露的待验证使用条件。

9. 安装坑 · 来源证据:feat: Add Microsoft SQL Server (MSSQL) connector

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:feat: Add Microsoft SQL Server (MSSQL) connector
  • 对用户的影响:可能影响升级、迁移或版本选择。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_370bcc864a314734b602f01d3d444ef9 | https://github.com/pathwaycom/pathway/issues/204 | 来源讨论提到 python 相关条件,需在安装/试用前复核。

10. 安装坑 · 来源证据:v0.27.0

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:v0.27.0
  • 对用户的影响:可能影响升级、迁移或版本选择。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_8d6905d8719c488cbcbb821e794add26 | https://github.com/pathwaycom/pathway/releases/tag/v0.27.0 | 来源讨论提到 python 相关条件,需在安装/试用前复核。

11. 安装坑 · 来源证据:v0.29.0

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:v0.29.0
  • 对用户的影响:可能影响升级、迁移或版本选择。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_b25babd3e3cc49108e56daaaaae8c5bf | https://github.com/pathwaycom/pathway/releases/tag/v0.29.0 | 来源讨论提到 python 相关条件,需在安装/试用前复核。

12. 安装坑 · 来源证据:v0.30.0

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:v0.30.0
  • 对用户的影响:可能影响升级、迁移或版本选择。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_0291ee068e4e4d38aa86d0fd433b960a | https://github.com/pathwaycom/pathway/releases/tag/v0.30.0 | 来源讨论提到 python 相关条件,需在安装/试用前复核。

13. 配置坑 · 来源证据:v0.28.0

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个配置相关的待验证问题:v0.28.0
  • 对用户的影响:可能影响升级、迁移或版本选择。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_708184d9c8ac445d923c82d38af7bd19 | https://github.com/pathwaycom/pathway/releases/tag/v0.28.0 | 来源类型 github_release 暴露的待验证使用条件。

14. 配置坑 · 来源证据:v0.30.1

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个配置相关的待验证问题:v0.30.1
  • 对用户的影响:可能增加新用户试用和生产接入成本。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_e7a19bd03a49499e987781160e4b76f0 | https://github.com/pathwaycom/pathway/releases/tag/v0.30.1 | 来源类型 github_release 暴露的待验证使用条件。

15. 能力坑 · 能力判断依赖假设

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:README/documentation is current enough for a first validation pass.
  • 对用户的影响:假设不成立时,用户拿不到承诺的能力。
  • 建议检查:将假设转成下游验证清单。
  • 防护动作:假设必须转成验证项;没有验证结果前不能写成事实。
  • 证据:capability.assumptions | github_repo:571186647 | https://github.com/pathwaycom/pathway | README/documentation is current enough for a first validation pass.

16. 运行坑 · 来源证据:`pw.io.mssql.read` needs LSN persistence

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个运行相关的待验证问题:pw.io.mssql.read needs LSN persistence
  • 对用户的影响:可能增加新用户试用和生产接入成本。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_59d927a667a843ccb7d0a4cd147a1dc0 | https://github.com/pathwaycom/pathway/issues/218 | 来源类型 github_issue 暴露的待验证使用条件。

17. 维护坑 · 维护活跃度未知

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:未记录 last_activity_observed。
  • 对用户的影响:新项目、停更项目和活跃项目会被混在一起,推荐信任度下降。
  • 建议检查:补 GitHub 最近 commit、release、issue/PR 响应信号。
  • 防护动作:维护活跃度未知时,推荐强度不能标为高信任。
  • 证据:evidence.maintainer_signals | github_repo:571186647 | https://github.com/pathwaycom/pathway | last_activity_observed missing

18. 安全/权限坑 · 下游验证发现风险项

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:no_demo
  • 对用户的影响:下游已经要求复核,不能在页面中弱化。
  • 建议检查:进入安全/权限治理复核队列。
  • 防护动作:下游风险存在时必须保持 review/recommendation 降级。
  • 证据:downstream_validation.risk_items | github_repo:571186647 | https://github.com/pathwaycom/pathway | no_demo; severity=medium

19. 安全/权限坑 · 存在评分风险

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:no_demo
  • 对用户的影响:风险会影响是否适合普通用户安装。
  • 建议检查:把风险写入边界卡,并确认是否需要人工复核。
  • 防护动作:评分风险必须进入边界卡,不能只作为内部分数。
  • 证据:risks.scoring_risks | github_repo:571186647 | https://github.com/pathwaycom/pathway | no_demo; severity=medium

20. 安全/权限坑 · 来源证据:Support encryption in Kinesis connectors

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题:Support encryption in Kinesis connectors
  • 对用户的影响:可能影响授权、密钥配置或安全边界。
  • 建议检查:来源问题仍为 open,Pack Agent 需要复核是否仍影响当前版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_f29097c3eced4a209e7c80971aeea40c | https://github.com/pathwaycom/pathway/issues/223 | 来源类型 github_issue 暴露的待验证使用条件。

21. 维护坑 · issue/PR 响应质量未知

  • 严重度:low
  • 证据强度:source_linked
  • 发现:issue_or_pr_quality=unknown。
  • 对用户的影响:用户无法判断遇到问题后是否有人维护。
  • 建议检查:抽样最近 issue/PR,判断是否长期无人处理。
  • 防护动作:issue/PR 响应未知时,必须提示维护风险。
  • 证据:evidence.maintainer_signals | github_repo:571186647 | https://github.com/pathwaycom/pathway | issue_or_pr_quality=unknown

22. 维护坑 · 发布节奏不明确

  • 严重度:low
  • 证据强度:source_linked
  • 发现:release_recency=unknown。
  • 对用户的影响:安装命令和文档可能落后于代码,用户踩坑概率升高。
  • 建议检查:确认最近 release/tag 和 README 安装命令是否一致。
  • 防护动作:发布节奏未知或过期时,安装说明必须标注可能漂移。
  • 证据:evidence.maintainer_signals | github_repo:571186647 | https://github.com/pathwaycom/pathway | release_recency=unknown

来源:Doramagic 发现、验证与编译记录