# https://github.com/pathwaycom/pathway 项目说明书

生成时间：2026-05-16 21:24:03 UTC

## 目录

- [Pathway Introduction](#page-introduction)
- [Core Concepts](#page-concepts)
- [System Architecture](#page-architecture)
- [Python API Structure](#page-python-api)
- [Data Model and Type System](#page-data-model)
- [Data Connectors](#page-connectors)
- [Custom Connectors](#page-connectors-custom)
- [Data Transformations](#page-transformations)
- [Temporal and Time-Based Processing](#page-temporal-processing)
- [LLM xPack](#page-llm-xpack)

<a id='page-introduction'></a>

## Pathway Introduction

### 相关页面

相关主题：[Core Concepts](#page-concepts), [System Architecture](#page-architecture)

<details>
<summary>Relevant Source Files</summary>

以下源码文件用于生成本页说明：

- [README.md](https://github.com/pathwaycom/pathway/blob/main/README.md)
- [src/python_api.rs](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)
- [src/engine/timestamp.rs](https://github.com/pathwaycom/pathway/blob/main/src/engine/timestamp.rs)
- [examples/projects/question-answering-rag/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/projects/question-answering-rag/README.md)
- [examples/projects/ag2-multiagent-rag/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/projects/ag2-multiagent-rag/README.md)
- [external/differential-dataflow/src/operators/arrange/agent.rs](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/operators/arrange/agent.rs)
</details>

# Pathway Introduction

Pathway is a Python ETL framework designed for stream processing, real-time analytics, LLM pipelines, and Retrieval-Augmented Generation (RAG). It provides an easy-to-use Python API that seamlessly integrates with popular Python ML libraries, enabling developers to build robust data processing pipelines that work in both development and production environments.

## Overview

Pathway combines the flexibility of Python with the performance of a scalable Rust engine built on Differential Dataflow. The framework performs incremental computation, allowing it to efficiently handle both batch and streaming data with the same code base.

### Key Characteristics

| Feature | Description |
|---------|-------------|
| **Language** | Python with Rust engine |
| **Processing Model** | Incremental computation via Differential Dataflow |
| **Data Modes** | Batch processing and streaming |
| **Deployment** | Local development, CI/CD, Docker, production |
| **License** | BSL (Business Source License) |

资料来源：[README.md](https://github.com/pathwaycom/pathway/blob/main/README.md)

## Architecture

Pathway's architecture leverages a multi-layered design that separates the Python interface from the high-performance Rust computation engine.

```mermaid
graph TD
    subgraph Python_Interface["Python Layer"]
        A[User Code] --> B[pathway API]
        B --> C[Python Subject]
        B --> D[Python Connectors]
    end
    
    subgraph Rust_Engine["Rust Engine"]
        E[Expression Engine] --> F[Differential Dataflow]
        F --> G[Timely Dataflow]
        C --> E
        D --> E
    end
    
    subgraph Output["Output Layer"]
        G --> H[Exported Tables]
        H --> I[Snapshot Data]
    end
```

### Core Components

| Component | Role |
|-----------|------|
| **PythonSubject** | Wraps Python callbacks for data input |
| **ValueField** | Defines schema fields with types and defaults |
| **PyExpression** | Encapsulates expressions with optional GIL release |
| **PyExportedTable** | Provides snapshot and frontier access to output data |
| **ConnectorMode** | Distinguishes between STATIC and STREAMING modes |

资料来源：[src/python_api.rs:25-35](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs#L25-L35)

## Data Model

### Schema Fields

Pathway uses `ValueField` to define table schemas with the following properties:

```python
@dataclass
class ValueField:
    name: str           # Field identifier
    type_: Type         # Data type
    source: FieldSource # Origin of the field (Payload, Key, etc.)
    default: Optional[Value]  # Default value if not present
    metadata: Optional[str]   # Additional metadata as JSON string
```

The `source` field determines how a field is populated:

| FieldSource | Description |
|-------------|-------------|
| `FieldSource.Payload` | Field comes from the input data |
| `FieldSource.Key` | Field is used as a key identifier |
| `FieldSource.PSEUDO` | Synthetic field generated by the system |

资料来源：[src/python_api.rs:47-55](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs#L47-L55)

### Timestamps and Ordering

Pathway implements timestamp-based ordering for incremental computation. Each timestamp has special semantics for distinguishing original data from retractions.

```rust
impl OriginalOrRetraction for Timestamp {
    fn is_original(&self) -> bool {
        self.0.is_multiple_of(2)
    }
}

impl NextRetractionTime for Timestamp {
    fn next_retraction_time(&self) -> Self {
        Self(self.0 + 1)
    }
}
```

The timestamp system ensures that even-numbered timestamps represent original data while odd-numbered timestamps represent retractions, enabling efficient differential updates.

资料来源：[src/engine/timestamp.rs:48-60](https://github.com/pathwaycom/pathway/blob/main/src/engine/timestamp.rs#L48-L60)

## Processing Modes

Pathway supports two distinct connector modes:

### Static Mode

In static mode, data is processed in batches and the pipeline completes once all input is consumed.

```python
pw.io.http.PathwayWebserver(mode=pw.io.http.SocketWriterMode.STATIC)
```

### Streaming Mode

In streaming mode, the pipeline remains active and continuously processes incoming data updates.

```python
pw.io.http.PathwayWebserver(mode=pw.io.http.SocketWriterMode.STREAMING)
```

| Mode | Use Case | Behavior |
|------|----------|----------|
| `STATIC` | Batch jobs, one-off processing | Exits after all data processed |
| `STREAMING` | Live data, real-time analytics | Continuously processes updates |

资料来源：[src/python_api.rs:280-290](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs#L280-L290)

## Expression Engine

Pathway's expression engine supports both unary and binary operations on data. The engine is implemented in Rust for performance but callable from Python.

```mermaid
graph LR
    A[PyExpression] -->|unary_op!| B[Unary Expression]
    A -->|binary_op!| C[Binary Expression]
    B --> D[Result Expression]
    C --> D
```

### Supported Operations

The `PyExpression` class wraps Rust expressions and tracks whether the GIL (Global Interpreter Lock) should be held during evaluation:

```rust
pub struct PyExpression {
    inner: Arc<Expression>,
    gil: bool,  // Whether GIL is required
}
```

Operations automatically set the `gil` flag based on whether any operand requires the Python GIL, optimizing performance when Python code is not involved.

资料来源：[src/python_api.rs:360-375](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs#L360-L375)

## Frontier and Snapshots

Pathway uses the concept of "frontiers" to track progress in incremental computation. The `TotalFrontier` enum represents the state of computation:

```rust
pub enum TotalFrontier<T> {
    At(T),      // Computation at specific timestamp
    Done,       // Computation complete
}
```

### Snapshot Access

The `PyExportedTable` provides methods to access data at specific frontiers:

| Method | Description |
|--------|-------------|
| `frontier()` | Returns the current computation frontier |
| `snapshot_at(frontier)` | Returns all key-value pairs up to the specified frontier |
| `failed()` | Indicates if the computation has failed |

```python
# Access snapshot at current frontier
current_frontier = table.frontier()
data = table.snapshot_at(current_frontier)
```

资料来源：[src/python_api.rs:200-220](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs#L200-L220)

## Differential Dataflow Integration

Pathway's Rust engine is built on Differential Dataflow, an extension of Timely Dataflow that supports incremental computation with efficient handling of updates and retractions.

```mermaid
graph TD
    subgraph Timely_Dataflow["Timely Dataflow"]
        A[Input] --> B[Operator]
        B --> C[Probe]
    end
    
    subgraph Differential_Extension["Differential Dataflow"]
        B --> D[Arrangement]
        D --> E[Trace Agent]
        E --> F[Collection]
    end
```

The `arrange` operator imports arranged data into a computation scope:

```rust
pub fn import<G>(&mut self, scope: &G) -> Arranged<G, TraceAgent<Tr>>
where
    G: Scope<Timestamp=Tr::Time>,
    Tr::Time: Timestamp,
{
    self.import_named(scope, "ArrangedSource")
}
```

资料来源：[external/differential-dataflow/src/operators/arrange/agent.rs:45-55](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/operators/arrange/agent.rs#L45-L55)

## Common Use Cases

### Real-time RAG Pipelines

Pathway excels at building Retrieval-Augmented Generation pipelines that require real-time document indexing and querying.

```
Documents --> Pathway VectorStoreServer --> REST API /v1/retrieve
                                                    |
User Query --> HTTP POST --> Pathway --> LLM --> Response
```

Example configuration:

```python
import pathway as pw

# Start the vector store server
webserver = pw.io.http.PathwayWebserver(host="0.0.0.0", port=8011)

# Query endpoint
curl --data '{ "messages": "What is the value of X?"}' http://localhost:8011
```

资料来源：[examples/projects/question-answering-rag/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/projects/question-answering-rag/README.md)

### Multi-Agent RAG Systems

Pathway supports multi-agent architectures where different agents collaborate to answer complex queries:

```mermaid
graph LR
    A[Documents] --> B[Pathway VectorStoreServer]
    B --> C[REST API]
    C --> D[UserProxy Agent]
    D --> E[Researcher Agent]
    D --> F[Analyst Agent]
    E & F --> G[Grounded Answers]
```

资料来源：[examples/projects/ag2-multiagent-rag/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/projects/ag2-multiagent-rag/README.md)

## Running Pathway Applications

### Local Execution

Run Pathway applications like standard Python scripts:

```bash
python main.py
```

### Multi-threaded Execution

Pathway natively supports multithreading:

```bash
pathway spawn --threads 3 python main.py
```

### Docker Deployment

Pathway provides official Docker images for containerized execution:

```dockerfile
FROM pathwaycom/pathway:latest

WORKDIR /app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY . .

CMD [ "python", "./your-script.py" ]
```

资料来源：[README.md](https://github.com/pathwaycom/pathway/blob/main/README.md)

## Summary

Pathway provides a powerful framework for building data processing pipelines with the following advantages:

- **Python-first API**: Write pipelines in familiar Python syntax
- **Rust performance**: Leverage Differential Dataflow for efficient incremental computation
- **Unified code**: Same code works for batch and streaming scenarios
- **Production-ready**: Built-in monitoring, logging, and Docker support
- **LLM integration**: First-class support for RAG and AI pipelines

The framework is particularly well-suited for applications requiring real-time data processing, such as live document indexing, dynamic analytics, and AI-powered search systems.

---

<a id='page-concepts'></a>

## Core Concepts

### 相关页面

相关主题：[Pathway Introduction](#page-introduction), [Data Model and Type System](#page-data-model)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [python/pathway/internals/schema.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/internals/schema.py)
- [python/pathway/internals/table.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/internals/table.py)
- [python/pathway/internals/datasource.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/internals/datasource.py)
- [docs/2.developers/4.user-guide/10.introduction/50.concepts.md](https://github.com/pathwaycom/pathway/blob/main/docs/2.developers/4.user-guide/10.introduction/50.concepts.md)
- [docs/2.developers/4.user-guide/10.introduction/70.streaming-and-static-modes.md](https://github.com/pathwaycom/pathway/blob/main/docs/2.developers/4.user-guide/10.introduction/70.streaming-and-static-modes.md)
</details>

# Core Concepts

Pathway is a Python ETL framework built on a scalable Rust engine that leverages Differential Dataflow for incremental computation. Understanding its core concepts is essential for building robust data processing pipelines that work seamlessly in both development and production environments.

## Architecture Overview

Pathway's architecture combines Python usability with Rust performance through a carefully designed abstraction layer. The Python API provides the user-facing interface, while the Rust engine handles the actual computation using Differential Dataflow's incremental processing model.

```mermaid
graph TB
    subgraph Python_API["Python API Layer"]
        Schema["Schema"]
        Table["Table"]
        Connector["Connectors"]
        UDF["User-Defined Functions"]
    end
    
    subgraph Rust_Engine["Rust Engine"]
        DiffDataflow["Differential Dataflow"]
        IncrementalComp["Incremental Computation"]
        Persistence["Persistence Layer"]
    end
    
    subgraph DataStreams["Data Streams"]
        Static["Static/Batch Data"]
        Streaming["Real-time Streams"]
    end
    
    Python_API --> Rust_Engine
    DataStreams --> Rust_Engine
```

The framework is designed so that the same code can run in multiple execution modes without modification, supporting local development, CI/CD tests, batch jobs, stream replays, and live data processing.

## Tables

Tables are the fundamental data structure in Pathway, representing collections of typed rows with a defined schema. Every table has a primary key that uniquely identifies each row.

### Table Structure

From the Rust engine perspective, a `Table` consists of:

| Component | Description | Rust Type |
|-----------|-------------|-----------|
| Scope | Execution scope for the table | `Scope` |
| Handle | Internal reference to table data | `TableHandle` |
| Columns | Typed data columns | `Vec<Column>` |
| Universe | Set of all keys in the table | `Universe` |

The `Table` class wraps the Rust engine's table representation and caches Python instances to avoid redundant object creation.

### Table Operations

Tables support a rich set of operations:

- **Selection**: Filter rows based on conditions
- **Projection**: Select and transform columns
- **Joins**: Combine tables based on keys
- **Aggregations**: Group and aggregate data
- **Updates**: Modify existing rows

All operations are lazy—they don't execute until the computation is triggered by `pw.run()`.

## Schemas

Schemas define the structure of tables, specifying column names, types, and metadata. They serve as the contract between data sources and the processing logic.

### Schema Definition

Schemas are typically defined using the `@pw.schema_class` decorator combined with `pw.column_definition`:

```python
class DocumentSchema(pw.Schema):
    id = pw.column_definition(type_=pw.Type.STRING)
    content = pw.column_definition(type_=pw.Type.STRING)
    timestamp = pw.column_definition(type_=pw.Type.DATETIME)
    embedding = pw.column_definition(type_=pw.Type.FLOAT_ARRAY)
```

### Column Types

| Type | Python Representation | Use Case |
|------|---------------------|----------|
| `STRING` | `str` | Text data, identifiers |
| `INT` | `int` | Integer values, counts |
| `FLOAT` | `float` | Decimal numbers |
| `BOOL` | `bool` | True/false values |
| `DATETIME` | `datetime` | Timestamps and dates |
| `FLOAT_ARRAY` | `List[float]` | Embeddings, vectors |
| `DICT` | `dict` | Nested structured data |

### Primary Keys

Each schema must define a primary key that uniquely identifies rows:

```python
class InputSchema(pw.Schema):
    doc_id = pw.column_definition(type_=pw.Type.STRING)
    content = pw.column_definition(type_=pw.Type.STRING)
    
    @staticmethod
    def primary_key():
        return InputSchema.doc_id
```

## Connectors and Data Sources

Connectors bridge external data sources with Pathway's internal table representation. They handle reading data from various sources and writing results to destinations.

### Input Connectors

Pathway provides input connectors for multiple data sources:

| Connector | Description | Protocol |
|-----------|-------------|----------|
| `pw.io.filesystem.read` | File system reading | Batch/Streaming |
| `pw.io.http.read` | HTTP endpoint polling | Streaming |
| `pw.io.rdkafka.read` | Kafka consumer | Streaming |
| `pw.io.amqp.read` | AMQP message queue | Streaming |
| `pw.io.gcp_storage.read` | Google Cloud Storage | Batch |
| `pw.io.s3.read` | Amazon S3 | Batch |
| `pw.io.azure_blob.read` | Azure Blob Storage | Batch |
| `pw.io.postgres.read` | PostgreSQL database | Batch/Streaming |

### Output Connectors

Output connectors write processed data to external systems:

```python
pw.io.json_lines.write(
    table=result_table,
    path="./output/results.jsonl"
)
```

### PythonSubject for Custom Sources

For custom data sources, Pathway provides the `PythonSubject` class that wraps Python callback functions:

```python
pw.io.python.connector(
    schema=MySchema,
    start_function=start_callback,
    read_function=read_callback,
    seek_function=seek_callback,
    stop_function=stop_callback
)
```

The subject handles the lifecycle of data reading with callbacks for initialization, data retrieval, and seeking to specific positions.

## Data Flow and Processing

Pathway processes data through a directed acyclic graph (DAG) of operations. Each operation transforms input tables into output tables.

```mermaid
graph LR
    A[Input Data] --> B[Source Connector]
    B --> C[Raw Table]
    C --> D[Transformations]
    D --> E[Joins & Aggregations]
    E --> F[Computed Table]
    F --> G[Output Connector]
    G --> H[Destination]
    
    style C fill:#e1f5fe
    style F fill:#e8f5e8
```

### Lazy Evaluation

All Pathway operations are lazily evaluated. The computation graph is built incrementally as operations are added, but no actual data processing occurs until `pw.run()` is called.

### Incremental Computation

The Rust engine uses Differential Dataflow to perform incremental computation. When input data changes, only the affected portions of the computation are re-executed. This is particularly valuable for streaming scenarios where data arrives continuously.

## Execution Modes

Pathway supports two primary execution modes that determine how data is processed over time.

### Static Mode (Batch Processing)

In static mode, Pathway processes finite datasets completely before producing results. The computation terminates when all input has been consumed.

```mermaid
graph LR
    A[All Input Data] --> B[Process Entire Dataset]
    B --> C[Single Output Result]
    
    style A fill:#fff3e0
    style C fill:#e8f5e8
```

Characteristics:
- Processes data in batches
- Terminates after completion
- Suitable for ETL jobs
- Optimized for throughput

### Streaming Mode

In streaming mode, Pathway processes data continuously as it arrives, updating results incrementally.

```mermaid
graph LR
    A[Data Stream] --> B[Continuous Processing]
    B --> C[Incremental Updates]
    C --> D[Live Results]
    D --> B
    
    style A fill:#e3f2fd
    style D fill:#e8f5e8
```

Characteristics:
- Processes data as it arrives
- Results update continuously
- Suitable for real-time analytics
- Maintains state over time

### Unified API

The same Pathway code works in both modes:

```python
# Works identically in both modes
result = (
    input_table
    .filter(lambda row: row.value > threshold)
    .groupby(lambda row: row.category)
    .reduce(lambda key, rows: (key, sum(r.value for r in rows)))
)
```

Switching between modes is achieved by changing the connector configuration rather than the transformation logic.

## The Rust Engine

Pathway's Rust engine provides the computational foundation, built on Timely Dataflow and Differential Dataflow.

### Key Components

| Component | Purpose | Location |
|-----------|---------|----------|
| `Scope` | Execution context | Rust Engine |
| `TableHandle` | Internal table reference | Rust Engine |
| `Column` | Column data structure | Rust Engine |
| `Universe` | Key set management | Rust Engine |
| `TraceAgent` | Arrangement tracking | Differential Dataflow |

### Python-Rust Integration

The integration uses PyO3 to create Python bindings for Rust structures:

```rust
#[pyclass(module = "pathway.engine", frozen)]
pub struct Table {
    scope: Py<Scope>,
    handle: TableHandle,
}
```

This allows Python code to work with Rust objects seamlessly while maintaining Python's ease of use.

### Persistence

The Rust engine supports checkpointing and state persistence:

```python
pw.run(
    persistence_config=pw.persistence.Config(
        mode=pw.persistence.Mode.SNAPSHOT,
        snapshot_path="./checkpoints"
    )
)
```

## User-Defined Functions

Pathway allows custom Python functions to be used in transformations, with careful handling of serialization and execution.

### Function Types

| Type | Execution | Use Case |
|------|-----------|----------|
| Pure Python | Serialized to Rust | Simple transformations |
| Stateful | Managed by Pathway | Complex aggregations |
| External | Called externally | LLM integrations |

### Execution Safety

Functions executed on streaming data must be deterministic and side-effect free to ensure correct incremental computation. Pathway validates function behavior during development and enforces constraints at runtime.

## Monitoring and Debugging

Pathway provides built-in monitoring capabilities through its dashboard.

### Available Metrics

- Message counts per connector
- Processing latency
- System resource utilization
- Error rates and logs

### Debugging Features

The `Trace` class captures execution stack traces for debugging:

```rust
#[pyclass(module = "pathway.engine", frozen)]
pub struct Trace {
    file_name: String,
    line_number: usize,
    line: String,
    function: String,
}
```

## Summary

Pathway's core concepts form a cohesive system for building data processing pipelines:

| Concept | Role |
|---------|------|
| **Tables** | Primary data structure for collections of typed rows |
| **Schemas** | Define table structure, types, and primary keys |
| **Connectors** | Bridge external data sources with Pathway tables |
| **Lazy Evaluation** | Build computation graphs without immediate execution |
| **Incremental Computation** | Efficient updates through Differential Dataflow |
| **Execution Modes** | Unified API for batch and streaming processing |
| **Rust Engine** | High-performance computation layer |

These concepts work together to provide a framework that is both developer-friendly and production-ready, capable of handling the full spectrum from simple batch ETL to complex real-time streaming analytics.

---

<a id='page-architecture'></a>

## System Architecture

### 相关页面

相关主题：[Python API Structure](#page-python-api), [Pathway Introduction](#page-introduction)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [src/python_api.rs](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)
- [src/engine/mod.rs](https://github.com/pathwaycom/pathway/blob/main/src/engine/mod.rs)
- [src/engine/timestamp.rs](https://github.com/pathwaycom/pathway/blob/main/src/engine/timestamp.rs)
- [src/connectors/postgres.rs](https://github.com/pathwaycom/pathway/blob/main/src/connectors/postgres.rs)
- [external/differential-dataflow/src/lib.rs](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/lib.rs)
- [external/differential-dataflow/src/operators/arrange/agent.rs](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/operators/arrange/agent.rs)
- [external/differential-dataflow/src/trace/wrappers/freeze.rs](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/trace/wrappers/freeze.rs)
- [external/differential-dataflow/src/trace/implementations/ord.rs](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/trace/implementations/ord.rs)
- [README.md](https://github.com/pathwaycom/pathway/blob/main/README.md)
</details>

# System Architecture

Pathway is a Python ETL framework for stream processing, real-time analytics, LLM pipelines, and RAG applications. The system provides an easy-to-use Python API while being powered by a scalable Rust engine based on Differential Dataflow for incremental computation. 资料来源：[README.md]()

## High-Level Architecture Overview

Pathway follows a layered architecture that separates the Python user interface from the high-performance Rust computation engine.

```mermaid
graph TD
    subgraph Python_Layer["Python API Layer"]
        User_Code["User Python Code"]
        Python_API["Pathway Python API<br/>pw.Table, pw.io.*, pw.connectors"]
    end

    subgraph Rust_Engine["Rust Engine Layer"]
        PyO3_Bindings["PyO3 Bindings"]
        Timely_Runtime["Timely Dataflow Runtime"]
        Diff_Computations["Differential Dataflow<br/>Incremental Computations"]
        Trace_Agent["Trace Agent & Arrangements"]
    end

    subgraph Data_Storage["Data Storage Layer"]
        In_Memory_State["In-Memory State"]
        Persistence["Persistence Layer"]
        Connectors["External Connectors<br/>SQL, S3, HTTP, etc."]
    end

    User_Code --> Python_API
    Python_API --> PyO3_Bindings
    PyO3_Bindings --> Timely_Runtime
    Timely_Runtime --> Diff_Computations
    Diff_Computations --> Trace_Agent
    Trace_Agent --> In_Memory_State
    In_Memory_State --> Persistence
    Diff_Computations --> Connectors
```

资料来源：[src/python_api.rs:1-50]()

## Core Architectural Components

### Python API Layer

The Python API layer provides the public interface for Pathway users. It exposes various I/O connectors, table operations, and transformation functions.

**Key Python Classes Exposed via PyO3:**

| Class | Purpose | Source |
|-------|---------|--------|
| `Table` | Primary data container for rows | `src/python_api.rs` |
| `Universe` | Represents a set of row keys | `src/python_api.rs` |
| `Column` | Column descriptor with type info | `src/python_api.rs` |
| `Scope` | Computation scope context | `src/python_api.rs` |
| `Context` | Runtime context for expressions | `src/python_api.rs` |
| `Pointer` | Reference to table data | `src/python_api.rs` |
| `ValueField` | Schema field definition | `src/python_api.rs` |
| `PythonSubject` | Python-based data source | `src/python_api.rs` |

资料来源：[src/python_api.rs:150-200]()

### Rust Engine Layer

The Rust engine implements the core computation logic using Timely Dataflow and Differential Dataflow.

#### Timely Dataflow

Timely Dataflow provides the low-level infrastructure for parallel and distributed dataflow computation. Pathway extends the external timely-dataflow crate to support:

- **Multi-worker execution**: Enables parallel processing across CPU cores
- **Progress tracking**: Monitors dataflow completeness
- **Dynamic reconfiguration**: Supports changing dataflow graphs

资料来源：[external/timely-dataflow/mdbook/src/chapter_2/chapter_2_5.md]()

#### Differential Dataflow

Differential Dataflow builds on Timely Dataflow to provide incremental computation capabilities. This is the heart of Pathway's ability to efficiently handle streaming data with updates and deletions.

**Key Differential Dataflow Components:**

| Component | Purpose |
|-----------|---------|
| `TraceAgent` | Manages persistent trace data for arrangements |
| `Arranged` | Wrapper combining stream with trace |
| `TraceReader` | Interface for reading from traces |
| `Batch` | Immutable collection of updates |

资料来源：[external/differential-dataflow/src/operators/arrange/agent.rs:1-30]()

#### Trace Implementation

Pathway uses `OrdValBatch` and `OrdKeyBatch` for efficient in-memory storage of keyed data with timestamps.

**OrdValBatch Structure:**
```
OrdValBatch<K, V, T, R, O, CK, CV>
├── K: Key type (Ord + Clone)
├── V: Value type (Ord + Clone)
├── T: Timestamp type (Lattice + Ord + Clone)
├── R: Weight type (Semigroup)
├── O: Offset type for arrays
├── CK: Container for keys
└── CV: Container for values
```

资料来源：[external/differential-dataflow/src/trace/implementations/ord.rs:1-50]()

### Timestamp System

Pathway's timestamp system is crucial for handling streaming data and maintaining temporal ordering.

```mermaid
classDiagram
    class Timestamp {
        +u64 inner
        +followed_by(other: Timestamp) Timestamp
        +is_original() bool
        +next_retraction_time() Timestamp
    }
    
    class TotalFrontier {
        <<enumeration>>
        At(Timestamp)
        Done
    }
    
    class OriginalOrRetraction {
        +is_original() bool
    }
    
    class NextRetractionTime {
        +next_retraction_time() Timestamp
    }
    
    Timestamp ..|> OriginalOrRetraction
    Timestamp ..|> NextRetractionTime
```

**Key Timestamp Properties:**

- **Even timestamps** (`is_multiple_of(2)`): Represent original data
- **Odd timestamps**: Represent retractions
- **Lattice properties**: Enable merging of partial time information

资料来源：[src/engine/timestamp.rs:1-50]()

## Data Flow Architecture

### Input Processing Flow

```mermaid
graph LR
    subgraph Input_Sources
        Files["Files"]
        HTTP["HTTP/Websocket"]
        SQL["SQL Databases"]
        Kafka["Kafka/MQTT"]
    end

    subgraph Processing
        Parser["Parser/Deserializer"]
        Transformer["Transformer"]
        Arranger["Arranger"]
    end

    subgraph Storage
        Trace["Trace Agent"]
        Table["Table State"]
    end

    Files --> Parser
    HTTP --> Parser
    SQL --> Parser
    Kafka --> Parser
    Parser --> Transformer
    Transformer --> Arranger
    Arranger --> Trace
    Trace --> Table
```

### Table Operations Flow

Tables in Pathway are the primary data abstraction. Each table maintains:

1. **Schema**: Column definitions with types
2. **Keys**: Unique row identifiers
3. **Values**: Column data
4. **Timestamps**: For temporal ordering
5. **Diffs**: Change weights (+1 for insert, -1 for delete)

**Table Properties:**

| Property | Type | Description |
|----------|------|-------------|
| `id` | Key | Unique row identifier |
| `values` | Map[Column, Value] | Row data |
| `time` | Timestamp | Event timestamp |
| `diff` | i64 | Change weight |

资料来源：[src/python_api.rs:200-250]()

## Connectors Architecture

Pathway provides connectors for various data sources and sinks. The connector system is designed with extensibility in mind.

### Connector Base Infrastructure

**ConnectorProperties Class Hierarchy:**

```mermaid
classDiagram
    class ConnectorProperties {
        +name: String
        +storage: DataStorage
        +format: DataFormat
    }
    
    class ColumnProperties {
        +name: String
        +type_: PathwayType
        +source: FieldSource
        +default: Option~Value~
    }
    
    class TableProperties {
        +columns: Vec~ColumnProperties~
        +primary_key: Option~Vec~String~~
    }
    
    class CsvParserSettings {
        +delimiter: char
        +quotechar: char
        +header: bool
    }
    
    ConnectorProperties --> TableProperties
    TableProperties --> ColumnProperties
```

### Database Connectors

The PostgreSQL connector demonstrates the connector implementation pattern:

**Type Mapping:**

| PostgreSQL Type | Pathway Type | OID |
|-----------------|--------------|-----|
| BOOL | BOOLEAN | 16 |
| INT2 | INT | 21 |
| INT4 | INT | 23 |
| INT8 | INT | 20 |
| FLOAT4 | FLOAT | 700 |
| FLOAT8 | FLOAT | 701 |
| TEXT/VARCHAR | STRING | 25 |
| BYTEA | BINARY | 17 |
| JSONB | JSON | 3802 |
| TIMESTAMP | DATETIME | 1114 |
| UUID | UUID | 2950 |

**Postgres Binary Array Format:**
- `i32`: ndim (dimensions)
- `i32`: has_nulls flag
- `u32`: element OID
- Per dimension: size, lower_bound
- Per element: length (-1 for NULL), bytes

资料来源：[src/connectors/postgres.rs:1-80]()

### Other Supported Connectors

| Connector | Settings Class | Description |
|-----------|---------------|-------------|
| AWS S3 | `AwsS3Settings` | S3 object storage |
| Azure Blob | `AzureBlobStorageSettings` | Azure blob storage |
| Elasticsearch | `ElasticSearchParams` | Search and indexing |
| MQTT | `MqttSettings` | IoT messaging |
| Kafka | `KafkaSettings` | Stream messaging |
| Schema Registry | `PySchemaRegistrySettings` | Avro schema management |
| Iceberg | `IcebergCatalogSettings` | Lakehouse tables |
| PostgreSQL Replication | `PsqlReplicationSettings` | CDC from Postgres |

资料来源：[src/python_api.rs:250-300]()

## Persistence and State Management

### Persistence Configuration

Pathway supports configurable persistence for fault tolerance and state recovery.

```mermaid
graph TD
    subgraph Application_Layer
        User_Query["User Query"]
    end

    subgraph Persistence_Layer
        Config["PersistenceConfig"]
        Mode["PyPersistenceMode<br/>ON_STARTUP<br/>MANUAL<br/>periodic"]
        Storage["DataStorage"]
    end

    subgraph State_Management
        Snapshot["PySnapshotAccess"]
        Event["PySnapshotEvent"]
        Frontier["TotalFrontier"]
    end

    User_Query --> Config
    Config --> Mode
    Config --> Storage
    Mode --> Snapshot
    Snapshot --> Event
    Event --> Frontier
```

**Persistence Modes:**

| Mode | Behavior |
|------|----------|
| `ON_STARTUP` | Automatically restore state on application start |
| `MANUAL` | Explicit checkpoint management via API |
| `PERIODIC` | Automatic snapshots at intervals |

**Snapshot Access Methods:**

| Method | Return Type | Description |
|--------|-------------|-------------|
| `snapshot_at(frontier)` | `Vec<(Key, Vec<Value>)>` | State at specific frontier |
| `frontier()` | `TotalFrontier` | Current progress frontier |
| `failed()` | `bool` | Whether computation failed |

资料来源：[src/python_api.rs:100-150]()

### Trace Freeze Wrapper

The `TraceFreeze` wrapper provides read-only access to historical trace data:

```rust
pub struct TraceFreeze<Tr, F>
where
    Tr: TraceReader,
    Tr::Time: Lattice+Clone+'static,
    F: Fn(&Tr::Time)->Option<Tr::Time>,
{
    trace: Tr,
    func: Rc<F>,
}
```

This enables efficient point-in-time queries without blocking ongoing updates.

资料来源：[external/differential-dataflow/src/trace/wrappers/freeze.rs:1-30]()

## Expression and Computation System

### Python Expression Evaluation

Pathway allows Python functions to be used for data transformations through the `PyExpression` system.

**Expression System Components:**

| Component | Purpose |
|-----------|---------|
| `PyExpression` | Serialized Python expression |
| `PyExpressionData` | Runtime data for evaluation |
| `PyReducer` | Aggregation functions |
| `PyUnaryOperator` | Single-input operators |
| `PyBinaryOperator` | Two-input operators |

**Computation Execution Model:**

```mermaid
graph LR
    subgraph Definition
        Expr["Expression Definition"]
        Schema["Schema Definition"]
    end

    subgraph Compilation
        Parse["Parse & Validate"]
        Optimize["Optimize"]
        Generate["Generate Operators"]
    end

    subgraph Execution
        Compute["Compute"]
        Index["Index Results"]
        Output["Output to Tables"]
    end

    Expr --> Parse
    Schema --> Parse
    Parse --> Optimize
    Optimize --> Generate
    Generate --> Compute
    Compute --> Index
    Index --> Output
```

资料来源：[src/python_api.rs:300-350]()

### Computer and Scope System

The `Computer` and `Scope` classes manage computation execution:

| Class | Responsibility |
|-------|----------------|
| `Scope` | Defines a computation boundary |
| `Context` | Provides runtime evaluation context |
| `Computer` | Executes transformation logic |
| `Table` | Stores results |

```mermaid
sequenceDiagram
    participant User as User Code
    participant Scope as Scope
    participant Context as Context
    participant Computer as Computer
    participant Table as Table
    
    User->>Scope: Create scope
    User->>Context: Define operations
    Context->>Computer: Create computer
    Computer->>Table: Write results
    Table-->>User: Return handle
```

## Monitoring and Telemetry

Pathway includes comprehensive monitoring capabilities through the `TelemetryConfig` and `PyMonitoringLevel` classes.

**Monitoring Levels:**

| Level | Description |
|-------|-------------|
| `OFF` | No telemetry |
| `ERROR` | Only errors |
| `WARN` | Warnings and errors |
| `INFO` | Informational logs |
| `DEBUG` | Detailed debugging |

**Trace and Done Classes:**

The `Trace` class provides structured logging and tracing:

```python
class Trace:
    """Structured logging and tracing"""
    def log(level, message, context)
    def span(name, fn)
```

The `Done` class signals completion of operations and is used for synchronization.

资料来源：[src/python_api.rs:350-400]()

## External Integrations

Pathway integrates with various external systems and frameworks:

| Integration | Description |
|-------------|-------------|
| LangChain | LLM pipeline integration |
| LlamaIndex | RAG framework integration |
| MinIO | S3-compatible object storage |
| PaddleOCR | OCR capabilities |
| Databento | Market data feeds |

These integrations are typically implemented as Pathway connectors or external index providers.

资料来源：[README.md]()

## Key Design Patterns

### Incremental Computation Pattern

Pathway's core innovation is incremental computation:

1. **Input Change**: A modification arrives at time T
2. **Propagate**: The change flows through the computation graph
3. **Differential Update**: Only affected results are updated
4. **Efficient Output**: Minimal recomputation

```mermaid
graph LR
    A["Input Δ"] --> B["Compute Δ"]
    B --> C["Update Trace"]
    C --> D["Emit Δ"]
    D --> A
    
    style A fill:#f96
    style D fill:#96f
```

### Arrangement Pattern

Arrangements are a fundamental concept in Differential Dataflow:

- An **arrangement** = a stream + its indexed state
- Enables efficient joins and aggregations
- Maintains multiple versions for time-travel queries

```mermaid
graph TD
    subgraph Arrangement
        Stream["Stream Input"]
        Index["Trace Index"]
        Merge["Merge Operator"]
    end
    
    Stream --> Merge
    Index --> Merge
    Merge --> Output["Arranged Data"]
```

## Summary

Pathway's architecture is designed around three key principles:

1. **Python-First API**: Users write Python, but computation happens in Rust
2. **Incremental Everywhere**: All operations support efficient updates
3. **Unified Batch/Stream**: The same code works for both paradigms

The system successfully abstracts the complexity of parallel and distributed computation behind a simple Python interface, making real-time analytics and LLM pipelines accessible to Python developers.

---

<a id='page-python-api'></a>

## Python API Structure

### 相关页面

相关主题：[System Architecture](#page-architecture), [Data Transformations](#page-transformations)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [src/python_api.rs](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)
- [src/engine/timestamp.rs](https://github.com/pathwaycom/pathway/blob/main/src/engine/timestamp.rs)
- [examples/projects/ag2-multiagent-rag/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/projects/ag2-multiagent-rag/README.md)
- [examples/projects/question-answering-rag/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/projects/question-answering-rag/README.md)
- [examples/projects/web-scraping/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/projects/web-scraping/README.md)
</details>

# Python API Structure

## Overview

Pathway is a Python ETL framework for stream processing and real-time analytics, powered by a **scalable Rust engine** based on Differential Dataflow. The Python API Structure defines the interface layer that bridges Python user code with the high-performance Rust computation engine through PyO3 bindings. This architecture enables Python developers to write data pipelines while leveraging Rust's performance characteristics for incremental computation.

The framework allows seamless integration with Python ML libraries, supports both batch and streaming data, and can be used across development, CI/CD, and production environments with identical code. 资料来源：[README.md:1-15]()

## Architecture Overview

```mermaid
graph TD
    A[Python User Code] --> B[Python API Layer]
    B --> C[PyO3 Bindings]
    C --> D[Rust Engine]
    D --> E[Differential Dataflow]
    E --> F[Incremental Computation]
    
    G[PythonSubject] --> B
    H[ValueField] --> B
    I[PyExpression] --> B
    J[PyExportedTable] --> B
```

## Core Components

### PythonSubject

The `PythonSubject` class serves as the primary interface for connecting external data sources to the Pathway engine. It wraps Python callback functions that the Rust engine invokes during data processing.

| Property | Type | Description |
|----------|------|-------------|
| `start` | `Py<PyAny>` | Callback invoked when a new session starts |
| `read` | `Py<PyAny>` | Callback for reading data from the source |
| `seek` | `Py<PyAny>` | Callback for seeking within the data stream |
| `on_persisted_run` | `Py<PyAny>` | Callback triggered on persisted runs |
| `end` | `Py<PyAny>` | Callback invoked when the source ends |
| `is_internal` | `bool` | Flag indicating internal source usage |
| `deletions_enabled` | `bool` | Flag enabling deletion tracking |

资料来源：[src/python_api.rs:1-35]()

```rust
#[pymethods]
impl PythonSubject {
    #[new]
    #[pyo3(signature = (start, read, seek, on_persisted_run, end, is_internal, deletions_enabled))]
    fn new(
        start: Py<PyAny>,
        read: Py<PyAny>,
        seek: Py<PyAny>,
        on_persisted_run: Py<PyAny>,
        end: Py<PyAny>,
        is_internal: bool,
        deletions_enabled: bool,
    ) -> Self {
        Self { ... }
    }
}
```

### ValueField

`ValueField` defines schema fields within Pathway tables, specifying the structure of data including type, source, default values, and metadata.

| Property | Type | Description |
|----------|------|-------------|
| `name` | `String` | Field identifier |
| `type_` | `Type` | Data type specification |
| `source` | `FieldSource` | Origin of the field data |
| `default` | `Option<Value>` | Default value if none provided |
| `metadata` | `Option<String>` | Additional metadata as JSON string |

资料来源：[src/python_api.rs:40-55]()

### FieldSource Enum

The `FieldSource` enum specifies where field data originates:

| Value | Description |
|-------|-------------|
| `Payload` | Data from the input payload (default) |
| `NewId` | System-generated identifier |
| `USB` | User-specified binding |

资料来源：[src/python_api.rs:200-210]()

## Engine Integration Layer

### TotalFrontier

`TotalFrontier` represents the completion state of data processing across all workers. It uses an enum with two states:

```mermaid
stateDiagram-v2
    [*] --> At
    At --> Done: All workers complete
    Done --> [*]
    
    TotalFrontier: At(timestamp) - Processing pending
    TotalFrontier: Done - All complete
```

| State | Type | Meaning |
|-------|------|---------|
| `At(i)` | `Timestamp` | Frontier positioned at timestamp `i` |
| `Done` | singleton | All processing completed |

资料来源：[src/python_api.rs:90-130]()

### PyExportedTable

`PyExportedTable` exposes table data to Python consumers through three key methods:

| Method | Return Type | Description |
|--------|-------------|-------------|
| `frontier()` | `TotalFrontier<Timestamp>` | Current frontier position |
| `snapshot_at(frontier)` | `Vec<(Key, Vec<Value>)>` | Point-in-time table snapshot |
| `failed()` | `bool` | Indicates if table processing failed |

资料来源：[src/python_api.rs:140-165]()

```rust
#[pymethods]
impl PyExportedTable {
    fn frontier(&self) -> TotalFrontier<Timestamp> {
        self.inner.frontier()
    }

    fn snapshot_at(&self, frontier: TotalFrontier<Timestamp>) -> Vec<(Key, Vec<Value>)> {
        self.inner.snapshot_at(frontier)
    }

    fn failed(&self) -> bool {
        self.inner.failed()
    }
}
```

## Mode and Type Enums

### ConnectorMode

Defines data processing behavior for connectors:

| Constant | Value | Use Case |
|----------|-------|----------|
| `STATIC` | `ConnectorMode::Static` | Batch processing, fixed dataset |
| `STREAMING` | `ConnectorMode::Streaming` | Continuous data streams |

资料来源：[src/python_api.rs:220-240]()

### SessionType

Specifies how the engine handles sessionization:

| Constant | Value | Description |
|----------|-------|-------------|
| `NATIVE` | `SessionType::Native` | Native differential dataflow processing |
| `UPSERT` | `SessionType::Upsert` | Upsert-based session handling |

资料来源：[src/python_api.rs:245-260]()

## Expression System

### PyExpression

The `PyExpression` class wraps Rust `Expression` objects and tracks whether Python GIL (Global Interpreter Lock) is required for evaluation:

| Property | Type | Purpose |
|----------|------|---------|
| `inner` | `Arc<Expression>` | Rust expression AST |
| `gil` | `bool` | Whether GIL acquisition needed |

资料来源：[src/python_api.rs:280-310]()

```rust
pub struct PyExpression {
    inner: Arc<Expression>,
    gil: bool,
}

impl PyExpression {
    fn new(inner: Arc<Expression>, gil: bool) -> Self {
        Self { inner, gil }
    }
}
```

### Binary Operators

Binary operations are implemented using macros that automatically handle GIL tracking:

```rust
macro_rules! binary_op {
    ($expression:path, $lhs:expr, $rhs:expr $(, $arg:expr)*) => {
        Self::new(
            Arc::new(Expression::from($expression(
                $lhs.inner.clone(),
                $rhs.inner.clone(),
                $($arg,)*
            ))),
            $lhs.gil || $rhs.gil,
        )
    };
}
```

This pattern ensures that if either operand requires Python GIL, the entire expression is marked as requiring GIL during evaluation.

## Timestamp System

Pathway uses a specialized timestamp system that supports incremental computation through Differential Dataflow:

```mermaid
graph LR
    A[Input] --> B[Timestamp Assignment]
    B --> C[Frontier Computation]
    C --> D[Incremental Update]
    D --> E[Result]
    
    F[Retraction] -.->|Original| D
    F[Retraction] -.->|Retraction| D
```

### OriginalOrRetraction Trait

Timestamps determine whether a change is an original value or a retraction:

```rust
impl OriginalOrRetraction for Timestamp {
    fn is_original(&self) -> bool {
        self.0.is_multiple_of(2)
    }
}
```

### NextRetractionTime Trait

Computes the next retraction timestamp:

```rust
impl NextRetractionTime for Timestamp {
    fn next_retraction_time(&self) -> Self {
        Self(self.0 + 1)
    }
}
```

资料来源：[src/engine/timestamp.rs:1-50]()

## Wakeup Handler

The `WakeupHandler` manages file descriptor-based wakeups for coordinating Rust async operations with Python's event loop:

```rust
struct WakeupHandler<'py> {
    _fd: OwnedFd,
    set_wakeup_fd: Bound<'py, PyAny>,
    old_wakeup_fd: Bound<'py, PyAny>,
}
```

This enables Pathway to integrate with Python's async ecosystem while maintaining efficient I/O handling in Rust.

## Trace and Error Handling

### EngineTrace

`EngineTrace` captures source location information for debugging and error reporting:

| Field | Type | Description |
|-------|------|-------------|
| `file_name` | `String` | Source file path |
| `line_number` | `u32` | Line number in source |
| `line` | `String` | Source line content |
| `function` | `String` | Function/method name |

资料来源：[src/python_api.rs:180-200]()

The trace system integrates with Python's exception handling, converting Rust trace data into Python traceback objects:

```rust
impl<'py> IntoPyObject<'py> for EngineTrace {
    fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
        match self {
            Self::Empty => Ok(py.None().into_bound(py)),
            Self::Frame { ... } => Trace { ... }.into_bound_py_any(py),
        }
    }
}
```

## Data Flow Example

```mermaid
graph TD
    A[Data Source] -->|PythonSubject| B[Rust Engine]
    B -->|Processing| C[Timestamp Assignment]
    C -->|Frontier Update| D[TotalFrontier]
    D -->|Snapshot| E[PyExportedTable]
    E -->|Query| F[Python Application]
    
    G[ConnectorMode] -->|Configuration| B
    H[SessionType] -->|Configuration| B
```

## Usage Patterns

### Creating a Data Pipeline

Pathway pipelines are defined in Python and executed by the Rust engine:

```python
import pathway as pw

# Define schema
class InputSchema(pw.Schema):
    id: int
    data: str

# Create connector
connector = pw.io.http.PathwayWebserver(host="0.0.0.0", port=8011)

# Run pipeline
pw.run()
```

资料来源：[examples/projects/question-answering-rag/README.md:30-45]()

### Web Scraping Pipeline

Real-time web scraping demonstrates the streaming capabilities:

```python
# Connector implementation
class NewsScraperSubject(pw.ConnectorSubject):
    def __init__(self, url, interval=60):
        # Configure callbacks
        self.url = url
        self.interval = interval
```

资料来源：[examples/projects/web-scraping/README.md:50-65]()

### Multi-Agent RAG Pipeline

The integration with agent frameworks shows advanced API usage:

```mermaid
graph LR
    A[Documents] -->|Index| B[Pathway VectorStoreServer]
    B -->|Retrieve| C[AG2 Agents]
    C -->|Query| D[User]
    
    E[REST API /v1/retrieve] --> B
```

资料来源：[examples/projects/ag2-multiagent-rag/README.md:20-40]()

## Summary

The Python API Structure in Pathway consists of several key layers:

1. **Python Interface Layer**: PyO3-wrapped classes (`PythonSubject`, `ValueField`, `PyExportedTable`, `PyExpression`) provide idiomatic Python APIs
2. **Type System**: Enums for `ConnectorMode`, `SessionType`, `FieldSource` configure engine behavior
3. **State Management**: `TotalFrontier` tracks processing progress across distributed workers
4. **Expression Evaluation**: The `PyExpression` system handles both pure Rust and Python-requiring computations
5. **Error Handling**: `EngineTrace` and `WakeupHandler` provide debugging and async coordination capabilities

This architecture enables Python developers to leverage high-performance Rust computation while maintaining the productivity of Python development.

---

<a id='page-data-model'></a>

## Data Model and Type System

### 相关页面

相关主题：[Core Concepts](#page-concepts), [Data Transformations](#page-transformations)

<details>
<summary>Relevant Source Files</summary>

以下源码文件用于生成本页说明：

- [src/python_api.rs](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)
- [src/engine/timestamp.rs](https://github.com/pathwaycom/pathway/blob/main/src/engine/timestamp.rs)
- [src/connectors/postgres.rs](https://github.com/pathwaycom/pathway/blob/main/src/connectors/postgres.rs)
- [external/differential-dataflow/src/operators/arrange/agent.rs](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/operators/arrange/agent.rs)
- [external/differential-dataflow/src/trace/layers/ordered.rs](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/trace/layers/ordered.rs)
</details>

# Data Model and Type System

Pathway's Data Model and Type System form the foundational architecture that enables the framework to handle structured data processing with type safety, schema validation, and efficient data representation. Built on a Rust-powered engine, Pathway provides a Python-friendly API while maintaining the performance benefits of differential dataflow computations.

## Architecture Overview

Pathway's type system operates at multiple levels: the Python API layer where users define schemas, and the Rust engine layer where types are resolved and processed efficiently.

```mermaid
graph TD
    A[Python API Layer<br/>dtype.py, expression.py] --> B[Type Interpreter<br/>type_interpreter.py]
    B --> C[Rust Engine<br/>python_api.rs]
    C --> D[Differential Dataflow<br/>Timely Backend]
    C --> E[Connectors<br/>postgres.rs, etc.]
    
    F[ValueField] --> C
    G[FieldSource] --> C
    H[ConnectorMode] --> C
    I[Timestamp] --> D
```

## Core Type Components

### Type Enumeration

The Pathway type system is built around a `Type` enumeration that defines all supported data types. These types map to both Python native types and Rust storage representations.

| Type Category | Supported Types | Description |
|---------------|-----------------|-------------|
| Boolean | `BOOL` | True/False values |
| Integer | `INT2`, `INT4`, `INT8` | 16, 32, and 64-bit signed integers |
| Float | `FLOAT4`, `FLOAT8` | 32 and 64-bit floating point |
| String | `TEXT`, `VARCHAR` | Variable-length text |
| Binary | `BYTEA` | Binary/blob data |
| JSON | `JSON`, `JSONB` | JSON data with/without binary optimization |
| Temporal | `TIMESTAMP`, `TIMESTAMPTZ`, `INTERVAL` | Date, time, and duration values |
| UUID | `UUID` | Universally unique identifiers |

资料来源：[src/connectors/postgres.rs:1-30]()

### ValueField Structure

`ValueField` is the core schema definition unit that encapsulates field metadata:

```python
@dataclass
class ValueField:
    name: str           # Field identifier
    type_: Type         # Data type enumeration
    source: FieldSource # Origin of the field data
    default: Optional[Value]  # Default value if null
    metadata: Optional[str]    # Additional metadata as JSON string
```

资料来源：[src/python_api.rs:30-47]()

## Field Source Classification

`FieldSource` defines where data originates within the Pathway computation model:

| Source | Value | Description |
|--------|-------|-------------|
| `Payload` | Default | Data from the original input connector |
| `Key` | - | Primary key field extracted/generated |
| `Computed` | - | Derived from transformations |
| `Pointer` | - | Reference to another table entry |

资料来源：[src/python_api.rs:20-28]()

The `FieldSource` enum enables Pathway to track data lineage and enforce immutability constraints:

```python
const PAYLOAD: FieldSource = FieldSource::Payload;
```

资料来源：[src/python_api.rs:450-451]()

## Connector Modes

Pathway supports two distinct data processing modes:

| Mode | Behavior | Use Case |
|------|----------|----------|
| `STATIC` | Process data once, complete when input exhausted | Batch processing, static datasets |
| `STREAMING` | Continuous processing, incremental updates | Real-time data streams, live monitoring |

资料来源：[src/python_api.rs:460-468]()

```python
#[pymethods]
impl PyConnectorMode {
    #[classattr]
    pub const STATIC: ConnectorMode = ConnectorMode::Static;
    #[classattr]
    pub const STREAMING: ConnectorMode = ConnectorMode::Streaming;
}
```

## Session Types

Session type configuration affects how Pathway handles data transactions:

| Type | Description |
|------|-------------|
| `NATIVE` | Standard Pathway processing |
| `UPSERT` | Specialized handling for upsert operations |

资料来源：[src/python_api.rs:470-477]()

## Timestamp System

Pathway's timestamp system implements incremental computation semantics using even-odd parity:

```python
impl OriginalOrRetraction for Timestamp {
    fn is_original(&self) -> bool {
        self.0.is_multiple_of(2)
    }
}

impl NextRetractionTime for Timestamp {
    fn next_retraction_time(&self) -> Self {
        Self(self.0 + 1)
    }
}
```

资料来源：[src/engine/timestamp.rs:25-36]()

### Timestamp Parity Convention

- **Even timestamps**: Original data values
- **Odd timestamps**: Retractions (deletions/updates of previous values)

This design enables efficient incremental updates in the differential dataflow computation model.

## Data Arrangement in Differential Dataflow

Pathway leverages differential dataflow for efficient state management. The arrangement layer provides key-value storage with ordered indexing:

```mermaid
graph LR
    A[Input Data] --> B[OrderedLayer<br/>Keys + Values]
    B --> C[Arranged Trace]
    C --> D[Query Operations]
    
    E[Offset Index] --> B
    F[Child Cursor] --> B
```

资料来源：[external/differential-dataflow/src/trace/layers/ordered.rs:1-20]()

### Cursor Navigation

The cursor-based traversal in ordered layers supports efficient seeking and stepping:

```rust
fn step(&mut self, storage: &OrderedLayer<K, L, O, C>) {
    // Pathway extension: pos_nonnegative tracks cursor repositioning
    if self.pos_nonnegative { 
        self.pos += 1; 
    } else {
        self.pos_nonnegative = true;
    }
}
```

资料来源：[external/differential-dataflow/src/trace/layers/ordered.rs:15-23]()

## Table Representation

Pathway represents data using the `LegacyTable` structure:

| Component | Type | Purpose |
|-----------|------|---------|
| `universe` | `Universe` | Unique identifier namespace |
| `columns` | `Vec<Column>` | Collection of typed columns |

资料来源：[src/python_api.rs:100-110]()

### Universe System

The universe concept provides isolation and partitioning for tables:

```python
pub fn __repr__(&self, py: Python) -> String {
    format!(
        "<LegacyTable universe={:?} columns=[{}]>",
        self.universe.borrow(py).handle,
        self.columns.iter().format_with(", ", |column, f| {
            f(&format_args!("{:?}", column.borrow(py).handle))
        })
    )
}
```

资料来源：[src/python_api.rs:112-120]()

## Computer and Computation Model

The `Computer` class wraps Python functions for execution within the Rust engine:

```python
pub struct Computer {
    fun: Py<PyAny>,           # Python callable
    dtype: Py<PyAny>,         # Return type annotation
    is_output: bool,          # Marks output tables
    is_method: bool,          # Instance vs class method
    universe: Py<Universe>,   # Computation scope
    data: Value,              # Closure-captured data
    data_column: Option<Py<Column>>,  # Optional input column
}
```

资料来源：[src/python_api.rs:145-154]()

### Computation Flow

```mermaid
graph TD
    A[Python Function Definition] --> B[Computer Wrapper]
    B --> C[Rust Engine Execution]
    C --> D[ScopedContext Injection]
    D --> E[Result Value]
    
    F[Timestamp Parity Check] -.-> C
```

## Key Generation Policies

Pathway supports flexible key generation strategies through `KeyGenerationPolicy`:

| Policy | Behavior |
|--------|----------|
| `Auto` | System-generated unique keys |
| `Manual` | User-provided key values |
| `Composite` | Keys derived from multiple fields |

资料来源：[src/python_api.rs:520-530]()

## Monitoring and Debugging

The `EngineTrace` system captures execution context for debugging:

```python
pub enum EngineTrace {
    Empty,                    # No trace information
    Frame {                   # Source location
        file_name: String,
        line: String,
        line_number: usize,
        function: String,
    }
}
```

资料来源：[src/python_api.rs:380-395]()

## Python-Rust Type Conversions

Pathway provides bidirectional type conversions between Python and Rust:

### IntoPyObject Implementation

```python
impl<'py> IntoPyObject<'py> for Timestamp {
    type Target = PyAny;
    type Output = Bound<'py, Self::Target>;
    type Error = PyErr;
    
    fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
        self.0.into_bound_py_any(py)
    }
}
```

资料来源：[src/engine/timestamp.rs:8-18]()

### FromPyObject Implementation

```python
impl<'py> FromPyObject<'py> for Timestamp {
    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
        ob.extract().map(Self)
    }
}
```

资料来源：[src/engine/timestamp.rs:20-24]()

## PostgreSQL Type Mapping

Pathway's PostgreSQL connector provides explicit OID mappings for binary protocol:

| PostgreSQL OID | Pathway Type |
|----------------|--------------|
| 16 | BOOL |
| 21 | INT2 |
| 23 | INT4 |
| 20 | INT8 |
| 700 | FLOAT4 |
| 701 | FLOAT8 |
| 25 | TEXT/VARCHAR |
| 17 | BYTEA |
| 3802 | JSONB |
| 114 | JSON |
| 1114 | TIMESTAMP |
| 1184 | TIMESTAMPTZ |
| 1186 | INTERVAL |
| 2950 | UUID |

资料来源：[src/connectors/postgres.rs:10-25]()

## Summary

Pathway's Data Model and Type System provides:

1. **Type Safety**: Strong typing from Python definitions through to Rust execution
2. **Incremental Computation**: Timestamp-based parity for efficient updates
3. **Flexible Schema**: ValueField with configurable sources and defaults
4. **Multiple Processing Modes**: Static and streaming connector support
5. **Differential Execution**: Powered by timely and differential dataflow
6. **Connector Integration**: Native type mappings for external systems like PostgreSQL

This architecture enables users to write expressive Python pipelines while benefiting from high-performance Rust-based incremental computation.

---

<a id='page-connectors'></a>

## Data Connectors

### 相关页面

相关主题：[Custom Connectors](#page-connectors-custom)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [python/pathway/io/__init__.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/io/__init__.py)
- [python/pathway/io/csv/__init__.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/io/csv/__init__.py)
- [python/pathway/io/kafka/__init__.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/io/kafka/__init__.py)
- [python/pathway/io/postgres/__init__.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/io/postgres/__init__.py)
- [python/pathway/io/s3/__init__.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/io/s3/__init__.py)
- [python/pathway/io/http/__init__.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/io/http/__init__.py)
- [src/connectors/mod.rs](https://github.com/pathwaycom/pathway/blob/main/src/connectors/mod.rs)
- [docs/2.developers/4.user-guide/20.connect/35.pathway-connectors.md](https://github.com/pathwaycom/pathway/blob/main/docs/2.developers/4.user-guide/20.connect/35.pathway-connectors.md)
</details>

# Data Connectors

## Overview

Data Connectors in Pathway are the primary interface for ingesting data from external sources and persisting computed results to external destinations. Pathway provides a unified abstraction that handles both batch and streaming data, allowing developers to write Python code that works seamlessly across different execution modes. The connector system is built on top of Pathway's Rust engine, which provides incremental computation capabilities through Differential Dataflow.

Pathway connectors abstract the complexity of various data sources—including filesystems, databases, message queues, cloud storage, and HTTP endpoints—behind a consistent Python API. This design enables developers to connect to multiple data sources using the same patterns, whether working in local development, CI/CD tests, or production streaming environments.

资料来源：[examples/templates/el-pipeline/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/templates/el-pipeline/README.md)

## Architecture

### Connector System Components

The Pathway connector system consists of three primary components that work together to provide reliable data movement:

**Subjects** are the data source abstraction in Pathway. A `PythonSubject` wraps user-defined Python functions that handle the mechanics of reading from an external source—connecting, seeking, reading bytes, and managing persistence state. The subject tracks whether the source is internal or external and manages deletion notifications when upstream data is removed.

**Connectors** orchestrate the data flow between subjects and the Pathway engine. They manage the lifecycle of data ingestion, including initialization, incremental updates, and graceful shutdown. Connectors expose configuration options for streaming versus static modes, enabling the same code to work in batch and real-time scenarios.

**Sinks** handle output operations, persisting Pathway tables to external destinations. Sinks receive computed results from the engine and write them to files, databases, or streaming systems with exactly-once semantics where supported.

```mermaid
graph TD
    A[External Data Source] --> B[PythonSubject]
    B --> C[Connector]
    C --> D[Pathway Engine<br/>Differential Dataflow]
    D --> E[Sink]
    E --> F[External Data Destination]
    
    G[Schema Definition] --> C
    H[Connector Mode<br/>Static/Streaming] --> C
```

资料来源：[src/python_api.rs](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)

### Connector Mode

Pathway supports two distinct connector modes that determine how data is processed:

| Mode | Description | Use Case |
|------|-------------|----------|
| `STATIC` | Process all available data at once, then complete | Batch jobs, one-time imports |
| `STREAMING` | Continuously monitor for changes and updates | Real-time pipelines, live data |

```python
# Static mode - processes existing data and exits
pw.io.fs.read(path="./data/", mode=pw ConnectorMode.STATIC)

# Streaming mode - continuously monitors for changes
pw.io.fs.read(path="./data/", mode=pw.ConnectorMode.STREAMING)
```

资料来源：[src/python_api.rs](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)

## Input Connectors

### CSV Reader

The CSV connector reads comma-separated values from files or directories. It supports schema inference, type casting, and automatic handling of headers. The connector can operate in both static and streaming modes, re-reading files when they are modified or new files appear in the watched directory.

```python
import pathway as pw

# Basic CSV reading
table = pw.io.csv.read(
    path="./input_data/",
    schema=pw.Schema(
        id=pw.ColumnDefinition(type=pw.Type.INT),
        name=pw.ColumnDefinition(type=pw.Type.STR),
        value=pw.ColumnDefinition(type=pw.Type.FLOAT)
    ),
    mode=pw.ConnectorMode.STATIC
)
```

Configuration parameters for CSV reading include the file path, schema definition, whether to use streaming mode, and options for handling malformed rows. The connector tracks file modifications to enable incremental processing in streaming scenarios.

资料来源：[examples/templates/el-pipeline/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/templates/el-pipeline/README.md)

### Kafka Connector

The Kafka connector integrates with Apache Kafka for high-throughput message streaming. It supports both consumer and producer patterns, enabling Pathway pipelines to consume from Kafka topics and write results back to Kafka. The connector handles offset management, partition awareness, and graceful rebalancing when consumer groups change.

```python
import pathway as pw

# Kafka consumer
events = pw.io.kafka.read(
    brokers=["localhost:9092"],
    topic="input-events",
    schema=pw.Schema(
        event_id=pw.ColumnDefinition(type=pw.Type.STR),
        timestamp=pw.ColumnDefinition(type=pw.Type.DATETIME),
        payload=pw.ColumnDefinition(type=pw.Type.STR)
    ),
    format="json",
    kafka_settings=pw.io.kafka.KafkaReadSettings(
        consumer_group="pathway-consumer",
        offset=pw.io.kafka.Offset.latest()
    )
)
```

The connector supports JSON and Avro message formats, with automatic schema registration for Avro registries. It provides exactly-once processing semantics through coordinated offsets with Pathway's checkpointing system.

资料来源：[examples/projects/option-greeks/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/projects/option-greeks/README.md)

### PostgreSQL Connector

Pathway provides bidirectional PostgreSQL integration for reading from and writing to PostgreSQL tables. The connector supports both full table scans and change data capture (CDC) when combined with logical replication, enabling real-time streaming from database changes.

```python
import pathway as pw

# PostgreSQL read with CDC
table = pw.io.postgres.read(
    host="localhost",
    port=5432,
    database="mydb",
    table_name="orders",
    user="pathway_user",
    password=pw.io.postgres.PasswordSecret("PG_PASSWORD"),
    schema=pw.Schema(
        order_id=pw.ColumnDefinition(type=pw.Type.INT, primary_key=True),
        customer_id=pw.ColumnDefinition(type=pw.Type.INT),
        total=pw.ColumnDefinition(type=pw.Type.FLOAT),
        created_at=pw.ColumnDefinition(type=pw.Type.DATETIME)
    )
)
```

For write operations, the PostgreSQL sink supports batch inserts and upserts, using primary key columns to determine insert versus update behavior. Connection pooling ensures efficient resource utilization under high throughput.

### S3 Connector

The S3 connector provides access to Amazon S3 and S3-compatible object stores (including MinIO). It supports reading objects as binary data, CSV, or JSON, and can watch S3 prefixes for new objects in streaming mode.

```python
import pathway as pw

# S3 read
data = pw.io.s3.read(
    bucket="my-bucket",
    prefix="input/data-",
    access_key=pw.io.s3.AwsAccessKeySecret("AWS_ACCESS_KEY"),
    secret_key=pw.io.s3.AwsSecretKeySecret("AWS_SECRET_KEY"),
    region="us-east-1",
    format="csv",
    mode=pw.ConnectorMode.STREAMING
)
```

The connector supports AWS IAM roles, instance profiles, and explicit credentials. It handles retries automatically for transient network failures and implements efficient listing with pagination for buckets with many objects.

资料来源：[examples/templates/el-pipeline/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/templates/el-pipeline/README.md)

### HTTP Connector

Pathway includes HTTP-based connectors for both receiving data via HTTP servers and making outbound HTTP requests. The HTTP input connector starts a web server that accepts incoming data, while the HTTP output connector can send computed results to external webhooks.

```python
import pathway as pw

# HTTP input server
input_table = pw.io.http.read(
    host="0.0.0.0",
    port=8080,
    schema=pw.Schema(
        request_id=pw.ColumnDefinition(type=pw.Type.STR),
        data=pw.ColumnDefinition(type=pw.Type.STR)
    ),
    mode=pw.ConnectorMode.STREAMING
)

# HTTP output
pw.io.http.write(
    table=result_table,
    url="https://api.example.com/webhook",
    api_key=pw.io.http.ApiKeySecret("WEBHOOK_API_KEY")
)
```

The HTTP input connector supports authentication via API keys and basic authentication. It processes incoming POST requests with JSON payloads and automatically handles concurrent requests with thread-safe processing.

资料来源：[examples/projects/question-answering-rag/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/projects/question-answering-rag/README.md)

### SharePoint Connector

Pathway provides specialized connectors for Microsoft SharePoint that enable reading files from SharePoint document libraries and sites. The connector supports authenticated access using Azure AD application credentials with certificate-based authentication.

```python
import pathway as pw

# SharePoint read
documents = pw.io.sharepoint.read(
    url="https://company.sharepoint.com/sites/project",
    root_path="/documents",
    tenant="tenant-id",
    client_id="app-client-id",
    thumbprint="cert-thumbprint",
    cert_path="/path/to/certificate.pem",
    mode=pw.ConnectorMode.STREAMING
)
```

The connector indexes all files in the specified SharePoint location, monitoring for additions and modifications. It supports various file formats and can extract content for downstream processing.

资料来源：[examples/projects/sharepoint-test/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/projects/sharepoint-test/README.md)

## Output Connectors

### CSV Writer

The CSV writer persists Pathway tables to CSV files with configurable delimiters, quoting behavior, and header options. In streaming mode, the writer can append to files or create new files based on time windows.

```python
import pathway as pw

pw.io.csv.write(
    table=result_table,
    path="./output/results.csv",
    include_header=True,
    separator=","
)
```

### Kafka Writer

The Kafka writer sends Pathway table updates to Kafka topics. Each row update is serialized as a message with configurable key extraction for partition routing.

```python
pw.io.kafka.write(
    table=result_table,
    brokers=["localhost:9092"],
    topic="output-events",
    format="json",
    key_column="event_id"
)
```

### Database Writers

Pathway supports writing to PostgreSQL and other databases with upsert semantics. Writers automatically batch updates for efficiency and handle retries for transient failures.

```python
pw.io.postgres.write(
    table=result_table,
    host="localhost",
    port=5432,
    database="output_db",
    table_name="results",
    user="writer_user",
    password=pw.io.postgres.PasswordSecret("PG_PASSWORD"),
    upsert=True
)
```

## Schema Definition

Pathway uses explicit schema definitions to map external data formats to typed columns. Schemas define column names, types, and metadata that guide parsing and validation.

```python
import pathway as pw

class InputSchema(pw.Schema):
    id: int
    name: str
    timestamp: pw.DateTimeUtc
    metadata: dict
```

| Schema Feature | Description |
|----------------|-------------|
| Column Types | INT, FLOAT, STR, BOOL, DATETIME, BYTES, JSON |
| Primary Keys | Mark columns as primary keys for upsert operations |
| Default Values | Provide fallback values for missing data |
| Nested Types | Support for arrays and dictionaries |

资料来源：[src/python_api.rs](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)

## Persistence and State Management

Connectors integrate with Pathway's persistence system to maintain state across restarts. When a connector's subject tracks offsets, positions, or timestamps, this state is preserved in the persistence backend and restored on restart.

```python
import pathway as pw

# Persistence configuration for connector state
persistence_config = pw.persistence.Config(
    backend=pw.persistence.Backend.filesystem(
        path="./persistence_storage/"
    )
)

pw.run(
    persistence_config=persistence_config
)
```

The persistence system ensures that streaming connectors resume from their last processed position, preventing data loss and duplicate processing during planned or unplanned shutdowns.

资料来源：[examples/templates/el-pipeline/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/templates/el-pipeline/README.md)

## Common Patterns

### Multi-Source Aggregation

Pathway excels at combining data from multiple heterogeneous sources into unified computations. Connectors for each source produce tables that can be joined, unioned, and transformed together.

```python
import pathway as pw

# Combine CSV and Kafka sources
csv_data = pw.io.csv.read(path="./static-data/", schema=CsvSchema)
kafka_data = pw.io.kafka.read(
    brokers=["localhost:9092"],
    topic="live-events",
    schema=KafkaSchema
)

# Join and process
result = csv_data.join(kafka_data, pw.left.id == pw.right.source_id).select(
    id=pw.left.id,
    static_info=pw.left.info,
    live_update=pw.right.value
)
```

### Live Document Indexing

A common pattern for RAG (Retrieval-Augmented Generation) applications involves continuous document indexing with real-time search capability. The HTTP connector combined with vector embedding creates a live knowledge base.

```python
import pathway as pw

# Document source with automatic reindexing
docs = pw.io.fs.read(
    path="./documents/",
    format="binary",
    mode=pw.ConnectorMode.STREAMING
)

# Web server for queries
pw.io.http.write_server(
    table=embedded_docs,
    host="0.0.0.0",
    port=8011
)
```

资料来源：[examples/projects/ag2-multiagent-rag/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/projects/ag2-multiagent-rag/README.md)

## Error Handling

Connectors provide mechanisms for handling errors during data processing. The engine tracks connector failures through the `failed()` method on exported tables, enabling pipelines to detect and respond to source or destination errors.

```python
exported = pw.io.csv.read(
    path="./data/",
    schema=InputSchema
)

# Check for connector failures
if exported.failed():
    # Handle error - log, alert, or retry
    pass
```

Connectors also support dead-letter patterns where invalid records are routed to separate tables for inspection and reprocessing, ensuring that malformed data does not halt the entire pipeline.

## Security Considerations

Connectors handle sensitive data and credentials with appropriate security measures. Secrets can be provided through environment variables, files, or secret management services. The connector API uses typed secret classes that prevent accidental logging of credentials.

| Secret Type | Usage |
|-------------|-------|
| `PasswordSecret` | Database passwords |
| `ApiKeySecret` | HTTP API keys |
| `AwsAccessKeySecret` | AWS access keys |
| `AwsSecretKeySecret` | AWS secret keys |

Pathway recommends using IAM roles, Azure managed identities, or Kubernetes secrets for production deployments rather than embedding credentials in code.

## Monitoring and Observability

Pathway's monitoring dashboard tracks connector metrics including message counts, processing latency, and error rates. Each connector reports statistics independently, enabling fine-grained observability across complex pipelines.

```python
pw.run(monitoring=True)
```

The dashboard displays per-connector metrics that help identify bottlenecks and failures in data pipelines. Metrics are also exported to Prometheus-compatible endpoints for integration with existing observability infrastructure.

资料来源：[README.md](https://github.com/pathwaycom/pathway/blob/main/README.md)

---

<a id='page-connectors-custom'></a>

## Custom Connectors

### 相关页面

相关主题：[Data Connectors](#page-connectors)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [python/pathway/internals/datasource.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/internals/datasource.py)
- [python/pathway/internals/datasink.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/internals/datasink.py)
- [docs/2.developers/4.user-guide/20.connect/99.connectors/30.custom-python-connectors.md](https://github.com/pathwaycom/pathway/blob/main/docs/2.developers/4.user-guide/20.connect/99.connectors/30.custom-python-connectors.md)
- [examples/projects/custom-python-connector-twitter/twitter_connector_example.py](https://github.com/pathwaycom/pathway/blob/main/examples/projects/custom-python-connector-twitter/twitter_connector_example.py)
</details>

# Custom Connectors

Pathway provides a powerful extensibility mechanism through Custom Connectors, enabling developers to create bespoke data source and data sink implementations tailored to specific data providers or consumers. This capability allows seamless integration with proprietary systems, legacy data sources, or third-party APIs that are not covered by Pathway's built-in connector library.

## Overview

Custom Connectors bridge the gap between Pathway's Rust-powered computation engine and Python-based data sources or sinks. They leverage the `ConnectorSubject` base class for data sources and provide a callback-based mechanism where Python functions handle the actual data reading or writing logic while the Rust engine manages state, persistence, and incremental computation.

The architecture follows a clean separation of concerns:

- **Python Layer**: Implements the actual data fetching/writing logic
- **Rust Engine**: Handles data flow, state management, watermarking, and incremental updates

## Architecture

```mermaid
graph TD
    subgraph "Python User Code"
        A[Custom Connector Class] --> B[ConnectorSubject Subclass]
        A --> C[read callback]
        A --> D[seek callback]
        A --> E[start callback]
        A --> F[end callback]
    end
    
    subgraph "Pathway Engine (Rust)"
        G[PythonSubject Wrapper] --> H[PythonReader]
        H --> I[State Manager]
        I --> J[Incremental Computation]
    end
    
    B --> G
    C --> G
    D --> G
    E --> G
    F --> G
```

## Core Components

### PythonSubject

The `PythonSubject` class serves as the bridge between Python connector implementations and the Pathway Rust engine. It wraps several Python callbacks that define the connector's behavior.

| Parameter | Type | Description |
|-----------|------|-------------|
| `start` | `Py<PyAny>` | Callback invoked when the connector begins processing |
| `read` | `Py<PyAny>` | Callback to read data entries from the source |
| `seek` | `Py<PyAny>` | Callback to seek to a specific position in the data stream |
| `on_persisted_run` | `Py<PyAny>` | Callback triggered when persisted state is restored |
| `end` | `Py<PyAny>` | Callback invoked when the connector finishes processing |
| `is_internal` | `bool` | Flag indicating if this is an internal connector |
| `deletions_enabled` | `bool` | Flag enabling deletion support |

*资料来源：[src/python_api.rs:1-50](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)*

### ConnectorMode

Pathway supports two distinct connector modes that determine data processing behavior:

| Mode | Description |
|------|-------------|
| `STATIC` | Reads data once without continuous polling. Ideal for batch processing of finite datasets. |
| `STREAMING` | Enables polling and continuous data monitoring. Supports real-time data ingestion with watermarks. |

*资料来源：[src/connectors/data_storage.rs:1-20](https://github.com/pathwaycom/pathway/blob/main/src/connectors/data_storage.rs)*

The `is_polling_enabled()` method returns `false` for Static mode and `true` for Streaming mode, controlling whether the engine periodically invokes the connector's read callback.

## Creating a Custom Data Source

### Step 1: Subclass ConnectorSubject

Create a new class that extends the base `ConnectorSubject` class. This defines your connector's schema and behavior.

```python
import pathway as pw

class MyCustomSourceSubject(pw.ConnectorSubject):
    schema: pw.Schema = pw.schema_from_types(
        id=str,
        value=float,
        timestamp=int,
    )
```

### Step 2: Implement Required Callbacks

The following callbacks must be implemented to define data retrieval behavior:

| Callback | Purpose |
|----------|---------|
| `start()` | Initialize connection and set up reader state |
| `read()` | Return new data entries since last read |
| `seek()` | Move to a specific position for replay |
| `end()` | Clean up resources when processing completes |

### Step 3: Configure the Connector

Pass your custom subject class to the connector along with mode and configuration:

```python
class MyCustomSource(pw.io.KafkaSource):
    def __init__(self, subject, topics, start_at_timestamp=0):
        super().__init__(
            subject=subject,
            topics=topics,
            start_at_timestamp=start_at_timestamp,
        )
```

## Creating a Custom Data Sink

Custom data sinks follow a similar pattern to sources but focus on outputting processed data. The datasink receives table updates and writes them to the target system.

### Sink Architecture

```mermaid
graph LR
    A[Pathway Table] --> B[DataSink Handler]
    B --> C[write callback]
    C --> D[External System]
```

### Key Properties

| Property | Description |
|----------|-------------|
| `table` | The Pathway table to subscribe to for updates |
| `id_column` | Column used for primary key identification |
| `username` | Authentication username (optional) |
| `password` | Authentication password (optional) |
| `host` | Target system hostname |
| `port` | Target system port |

## Twitter Connector Example

A complete example demonstrating custom connector implementation is available in the repository. The Twitter connector shows how to wrap an external API as a Pathway data source.

### Key Implementation Details

```python
class TwitterSubject(pw.ConnectorSubject):
    schema: pw.Schema = pw.schema_from_types(
        id=str,
        text=str,
        author=str,
        created_at=int,
    )
    
    def __init__(self, api_key, api_secret, **kwargs):
        self.api_key = api_key
        self.api_secret = api_secret
        super().__init__(**kwargs)
    
    def start(self):
        self.twitter_api = connect_to_twitter(self.api_key, self.api_secret)
        self.last_read_id = None
    
    def read(self):
        new_tweets = self.twitter_api.get_new_tweets(since_id=self.last_read_id)
        self.last_read_id = max(t.id for t in new_tweets)
        return [(t.id, t.text, t.author, t.created_at) for t in new_tweets]
```

*资料来源：[examples/projects/custom-python-connector-twitter/twitter_connector_example.py](https://github.com/pathwaycom/pathway/blob/main/examples/projects/custom-python-connector-twitter/twitter_connector_example.py)*

## ValueField Schema Definition

When defining connector schemas, `ValueField` provides fine-grained control over column properties:

| Property | Type | Description |
|----------|------|-------------|
| `name` | `String` | Column identifier |
| `type_` | `Type` | Data type (INT, FLOAT, STRING, etc.) |
| `source` | `FieldSource` | Origin of the field (Payload, Time, Partition) |
| `default` | `Option<Value>` | Default value if field is missing |
| `metadata` | `Option<String>` | Optional metadata string |

*资料来源：[src/python_api.rs:30-60](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)*

## Integration with Pathway Engine

### Reader Builder Pattern

The `PythonReaderBuilder` constructs reader instances that integrate with the engine:

```rust
pub struct PythonReaderBuilder {
    subject: Py<PythonSubject>,
    schema: HashMap<String, Type>,
}
```

*资料来源：[src/connectors/data_storage.rs:20-30](https://github.com/pathwaycom/pathway/blob/main/src/connectors/data_storage.rs)*

### State Management

The `PythonReader` maintains internal state for incremental processing:

| State Field | Purpose |
|-------------|---------|
| `total_entries_read` | Counter for statistics and debugging |
| `current_external_offset` | Position marker for resumable reads |
| `is_initialized` | Flag indicating startup completion |
| `is_finished` | Flag indicating processing completion |
| `python_thread_state` | GIL state for thread-safe Python calls |

## Best Practices

### Error Handling

Implement robust error handling in callbacks. The connector should gracefully handle transient failures and support retry mechanisms.

### Checkpointing

For long-running connectors, implement periodic checkpointing by tracking processed record identifiers or offsets in `seek()` callback state.

### Resource Management

Ensure resources (connections, file handles, API sessions) are properly initialized in `start()` and cleaned up in `end()`.

### Performance Considerations

- Batch data reads when possible to reduce callback overhead
- Use appropriate polling intervals for streaming connectors
- Monitor `total_entries_read` for capacity planning

## See Also

- [Pathway Connectors Documentation](https://pathway.com/developers/user-guide/connect/pathway-connectors)
- [Built-in Connectors Reference](https://pathway.com/developers/api-docs/pathway)
- [Persistence and State Management](https://pathway.com/developers/user-guide/persistence/)
- [Real-time Data Processing Guide](https://pathway.com/developers/user-guide/introduction/welcome)

---

<a id='page-transformations'></a>

## Data Transformations

### 相关页面

相关主题：[Temporal and Time-Based Processing](#page-temporal-processing)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [src/python_api.rs](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs) (核心引擎接口)
- [src/engine/timestamp.rs](https://github.com/pathwaycom/pathway/blob/main/src/engine/timestamp.rs) (时间戳处理)
- [README.md](https://github.com/pathwaycom/pathway/blob/main/README.md) (项目概述)
- [examples/templates/el-pipeline/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/templates/el-pipeline/README.md) (YAML配置示例)
- [examples/projects/question-answering-rag/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/projects/question-answering-rag/README.md) (RAG管道示例)
- [external/differential-dataflow/src/operators/arrange/agent.rs](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/operators/arrange/agent.rs) (Differential Dataflow操作)
</details>

# Data Transformations

Data Transformations are the core operational layer in Pathway, enabling you to manipulate, combine, filter, and aggregate data as it flows through your pipeline. Built on a scalable Rust engine powered by Differential Dataflow, Pathway performs incremental computation that efficiently handles both batch and streaming data with the same Python API.

## Architecture Overview

Pathway's transformation engine operates on **Tables**, which are the fundamental data structures in the framework. Each table consists of:

- **Columns**: Named data fields with typed values
- **Keys**: Unique identifiers for rows
- **Timestamps**: Time indicators for stream ordering
- **Change Metadata**: Track additions, modifications, and retractions

```mermaid
graph TD
    A[Input Connectors] --> B[Table Creation]
    B --> C[Transformations]
    C --> D[Aggregations]
    C --> E[Joins]
    C --> F[Filters & Mappings]
    D --> G[Output Connectors]
    E --> G
    F --> G
    G --> H[Results / Persistence]
    
    subgraph Rust Engine
        C
        D
        E
        F
    end
```

资料来源：[README.md](https://github.com/pathwaycom/pathway/blob/main/README.md)

## Table Operations

Tables are immutable once created in Pathway. All data transformations produce new tables rather than modifying existing ones, ensuring data consistency and enabling proper change tracking.

### Core Table Structure

From the Rust engine perspective, tables are represented as:

| Component | Description |
|-----------|-------------|
| `universe` | A unique identifier space for table rows |
| `columns` | Collection of typed data columns |
| `handle` | Internal reference for engine operations |

资料来源：[src/python_api.rs:1-100](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)

### Supported Data Types

Pathway supports a variety of primitive and complex types:

| Category | Types |
|----------|-------|
| Primitives | `int`, `float`, `str`, `bool` |
| Temporal | `datetime`, `date`, `time` |
| Complex | `bytes`, `JSON`, `pointer` |

### Filtering

Filtering operations select subsets of rows based on predicates:

```python
filtered_table = source_table.filter(source_table.column > threshold)
```

The filter operation:
1. Evaluates the predicate for each row
2. Produces a new table with only matching rows
3. Propagates timestamps from the source

### Column Operations

Add, rename, or compute new columns:

```python
# Add computed column
result = table.select(
    table.id,
    table.value,
    total=table.value * table.rate
)

# Rename columns
renamed = table.select(
    new_name=table.old_name,
    another=table.other
)
```

## Join Operations

Pathway provides multiple join strategies for combining tables, all powered by the underlying Differential Dataflow computation model.

```mermaid
graph LR
    A[Table A] --> C[Join Operation]
    B[Table B] --> C
    C --> D[Result Table]
    
    D --> E[Inner Join<br/>Rows with matches in both]
    D --> F[Left Join<br/>All rows from A]
    D --> G[Outer Join<br/>All rows from both]
```

### Join Types

| Join Type | Behavior |
|-----------|----------|
| Inner Join | Returns rows with matches in both tables |
| Left Join | All rows from left table, matched from right |
| Outer Join | All rows from both tables |
| Cross Join | Cartesian product with optional filtering |

### Join Syntax

```python
result = left_table.join(
    right_table,
    left_table.key == right_table.key
).select(
    left_table.star,
    right_table.data
)
```

### Incremental Join Behavior

The Rust engine handles joins incrementally:

```
src/python_api.rs: Table implementation
├── handle: TableHandle (internal engine reference)
├── scope: Py<Scope> (computation scope)
└── Methods for engine-level join operations
```

资料来源：[src/python_api.rs:200-300](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)

## Aggregation and Grouping

Pathway's aggregation system uses Differential Dataflow's efficient incremental computation model.

### GroupBy Operations

Group data by one or more columns:

```python
grouped = table.groupby(table.category)
```

### Built-in Reducers

| Reducer | Description | Example |
|---------|-------------|---------|
| `count` | Number of rows | `t.groupby(k).reduce(count=pw.reducers.count())` |
| `sum` | Sum of values | `t.groupby(k).reduce(total=pw.reducers.sum(t.val))` |
| `avg` | Average value | `t.groupby(k).reduce(avg=pw.reducers.avg(t.val))` |
| `min` | Minimum value | `t.groupby(k).reduce(min=pw.reducers.min(t.val))` |
| `max` | Maximum value | `t.groupby(k).reduce(max=pw.reducers.max(t.val))` |
| `collect` | Collect all values | `t.groupby(k).reduce(all=pw.reducers.collect(t.val))` |

### Custom Aggregations

Define custom aggregation logic:

```python
result = table.groupby(table.key).reduce(
    key=table.key,
    custom_agg=custom_reducer(table.values)
)
```

## Temporal Operations

Pathway handles temporal data with a sophisticated timestamp model that supports streaming and batch scenarios.

### Timestamp Semantics

```mermaid
graph TD
    T1[Timestamp 1] --> T2[Timestamp 2]
    T2 --> T3[Timestamp 3]
    T3 --> T4[Timestamp N]
    
    subgraph Original Values
        T1
        T3
    end
    
    subgraph Retractions
        T2
        T4
    end
```

The timestamp system distinguishes between:
- **Original values** (even timestamps)
- **Retractions** (odd timestamps)

资料来源：[src/engine/timestamp.rs:1-50](https://github.com/pathwaycom/pathway/blob/main/src/engine/timestamp.rs)

### Time-based Operations

```python
# Window by time
result = table.windowby(table.timestamp).reduce(
    time_bucket=table.timestamp.dt.floor("1h"),
    value=pw.reducers.sum(table.amount)
)
```

## SQL-style Processing

Pathway supports SQL-like query processing for complex transformations:

```python
result = pw.sql.processing(
    """
    SELECT 
        category,
        SUM(value) as total,
        AVG(value) as average
    FROM source_table
    GROUP BY category
    HAVING SUM(value) > 100
    """
)
```

### Supported SQL Features

| Feature | Support |
|---------|---------|
| SELECT | Full projection support |
| WHERE | Filter conditions |
| GROUP BY | Aggregation grouping |
| HAVING | Post-aggregation filters |
| JOIN | Table combinations |
| Subqueries | Nested queries |

## Differential Dataflow Foundation

Pathway's transformation engine is built on **Differential Dataflow**, an extension of Timely Dataflow that efficiently handles incremental updates.

```mermaid
graph LR
    subgraph Timely Dataflow
        A[Operators] --> B[Scope]
        B --> C[Channels]
    end
    
    subgraph Differential Dataflow
        D[Arrangements] --> E[Collections]
        E --> F[Traces]
        D --> F
    end
    
    G[Pathway Python API] --> H[Rust Engine]
    H --> I[Differential Dataflow]
```

### Key Properties

| Property | Benefit |
|----------|---------|
| Incremental | Only recomputes changed portions |
| Monotonic | Handles retractions naturally |
| Composable | Complex pipelines from simple operators |
| Parallel | Scales across threads automatically |

资料来源：[external/differential-dataflow/src/operators/arrange/agent.rs:1-50](https://github.com/pathwaycom/pathway/blob/main/external/differential-dataflow/src/operators/arrange/agent.rs)

## Computation Model

Pathway's computation follows an **append-only** model where:

1. **Input**: Data enters via connectors
2. **Transform**: Operations produce new tables
3. **Output**: Results flow to sinks
4. **Persistence**: Optional state storage for recovery

```mermaid
graph TD
    A[Connectors] -->|Input Data| B[Engine]
    B --> C{Transformations}
    C --> D[Table Operations]
    C --> E[Joins]
    C --> F[Aggregations]
    D --> G[New Tables]
    E --> G
    F --> G
    G --> H[Output Connectors]
    G --> I[Persistence Layer]
```

### Running the Pipeline

```python
import pathway as pw

# Define pipeline
table = pw.io.fs.read("./input", schema=Schema)
result = table.groupby(table.key).reduce(count=pw.reducers.count())

# Configure output
pw.io.json_lines.write(result, "./output")

# Execute
pw.run()
```

资料来源：[README.md](https://github.com/pathwaycom/pathway/blob/main/README.md)

## Configuration and YAML Integration

Pathway supports declarative pipeline definition via YAML:

```yaml
# Data source
source: !pw.io.csv.read
  path: ./input.csv
  schema: $schema

# Transformations
processed: !pw.Transformations.groupby
  table: $source
  by: [category]
  reducers:
    - !pw.reducers.sum
      column: amount

# Output
output: !pw.io.csv.write
  table: $processed
  path: ./output.csv
```

资料来源：[examples/templates/el-pipeline/README.md](https://github.com/pathwaycom/pathway/blob/main/examples/templates/el-pipeline/README.md)

## Error Handling

The engine tracks and propagates errors through the computation graph:

```mermaid
graph TD
    A[Operation] -->|Success| B[Next Operation]
    A -->|Error| C[ErrorLog]
    C -->|Recorded| D[Monitoring]
    D -->|Notification| E[Alert System]
```

```python
# Access error information
error_log = table.error_history()
```

资料来源：[src/python_api.rs:300-400](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)

## Performance Characteristics

### Threading

Pathway natively supports multithreading:

```bash
$ pathway spawn --threads 3 python main.py
```

### Memory Efficiency

| Strategy | Description |
|----------|-------------|
| Incremental | Only changed data is reprocessed |
| Arrangements | Pre-indexed data structures for fast joins |
| Persistence | Checkpointing to manage memory |

### Scaling

Pathway pipelines scale from:
- Local development (single thread)
- Multi-threaded processing
- Distributed deployment via containerization

## See Also

- [Input Connectors](https://pathway.com/developers/user-guide/connect/pathway-connectors) - Data ingestion methods
- [Output Connectors](https://pathway.com/developers/user-guide/connect/pathway-connectors) - Data delivery methods
- [Persistence Guide](https://pathway.com/developers/user-guide/persistence/overview) - State management
- [RAG Pipeline Example](https://github.com/pathwaycom/pathway/blob/main/examples/projects/question-answering-rag/) - End-to-end transformation example

---

<a id='page-temporal-processing'></a>

## Temporal and Time-Based Processing

### 相关页面

相关主题：[Data Transformations](#page-transformations)

<details>
<summary>Relevant Source Files</summary>

以下源码文件用于生成本页说明：

- [python/pathway/stdlib/temporal/__init__.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/stdlib/temporal/__init__.py)
- [python/pathway/stdlib/temporal/_window.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/stdlib/temporal/_window.py)
- [python/pathway/stdlib/temporal/_asof_join.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/stdlib/temporal/_asof_join.py)
- [python/pathway/stdlib/temporal/_interval_join.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/stdlib/temporal/_interval_join.py)
- [docs/2.developers/4.user-guide/40.temporal-data/10.windows-manual.md](https://github.com/pathwaycom/pathway/blob/main/docs/2.developers/4.user-guide/40.temporal-data/10.windows-manual.md)
- [docs/2.developers/4.user-guide/40.temporal-data/40.asof-join.md](https://github.com/pathwaycom/pathway/blob/main/docs/2.developers/4.user-guide/40.temporal-data/40.asof-join.md)
</details>

# Temporal and Time-Based Processing

## Overview

Pathway's temporal and time-based processing capabilities enable sophisticated stream analytics by leveraging the underlying Differential Dataflow computation model. The framework provides robust mechanisms for windowing operations, temporal joins, and time-based aggregations that work seamlessly across both batch and streaming data scenarios.

Pathway processes data with timestamps, maintaining the temporal order and allowing computations to reference historical data at specific time points. The Rust engine manages time progression through frontiers—boundary markers that indicate which timestamps have been fully processed—enabling consistent and deterministic results regardless of data arrival order.

资料来源：[src/python_api.rs](https://github.com/pathwaycom/pathway/blob/main/src/python_api.rs)  
资料来源：[src/engine/timestamp.rs](https://github.com/pathwaycom/pathway/blob/main/src/engine/timestamp.rs)

## Core Concepts

### Timestamps and Time Model

Pathway operates on timestamped data where each record carries a timestamp indicating when the event occurred or when it should be processed. The timestamp model supports both event time and processing time semantics.

```python
# Timestamps are automatically tracked by Pathway
# Each data record carries temporal information
result = table.select(
    col=table.value,
    time=table._time_attribute  # System-managed timestamp
)
```

### Frontiers and Time Progress

Frontiers in Pathway represent the boundary between processed and unprocessed data. The system tracks two critical frontiers:

| Frontier Type | Description |
|--------------|-------------|
| Input Frontier | Earliest incomplete input timestamp |
| Output Frontier | Latest timestamp for which all computations are complete |

The capability system (borrowed from Timely Dataflow) allows operators to hold references to specific timestamps, ensuring computations only execute when their input data is available.

资料来源：[external/timely-dataflow/timely/src/dataflow/operators/capability.rs](https://github.com/pathwaycom/pathway/blob/main/external/timely-dataflow/timely/src/dataflow/operators/capability.rs)

## Windowing Operations

Pathway provides comprehensive windowing capabilities through the `pathway.stdlib.temporal` module. Windows group data based on temporal criteria, enabling aggregations, joins, and transformations over bounded time intervals.

```mermaid
graph TD
    A[Input Stream] --> B[Window Assigner]
    B --> C[Tumbling Windows]
    B --> D[Sliding Windows]
    B --> E[Session Windows]
    C --> F[Window 1]
    C --> G[Window 2]
    D --> H[Overlapping Windows]
    E --> I[Session 1]
    E --> J[Session 2]
```

### Tumbling Windows

Tumbling windows are fixed-size, non-overlapping time windows that partition the data stream into discrete, consecutive segments. Each event belongs to exactly one window.

```python
from pathway.stdlib.temporal import tumbling_window

# Create tumbling windows of 1 hour
result = table.window.tumbling(
    window_length=timedelta(hours=1),
    timestamp=table.event_time
).reduce(
    count=tools.count(),
    sum_value=tools.sum(table.value)
)
```

**Parameters:**

| Parameter | Type | Description |
|-----------|------|-------------|
| `window_length` | `timedelta` | Duration of each window |
| `timestamp` | `ColumnExpression` | Column containing event timestamps |
| `session_gap` | `timedelta` | Optional gap between sessions |

### Sliding Windows

Sliding windows move continuously over the data stream with a fixed size and slide interval. Unlike tumbling windows, consecutive windows can overlap.

```python
from pathway.stdlib.temporal import sliding_window

# Create sliding windows of 5 minutes, sliding every 1 minute
result = table.window.sliding(
    window_length=timedelta(minutes=5),
    slide=timedelta(minutes=1),
    timestamp=table.event_time
).reduce(
    avg_value=tools.aggregate.avg(table.value)
)
```

### Session Windows

Session windows detect periods of activity separated by gaps of inactivity. They automatically expand as new events arrive within the gap threshold.

```python
from pathway.stdlib.temporal import session_window

# Session windows with 30-minute inactivity gap
result = table.window.session(
    gap=timedelta(minutes=30),
    timestamp=table.event_time
).reduce(
    session_duration=tools.count(),
    total_amount=tools.sum(table.amount)
)
```

资料来源：[python/pathway/stdlib/temporal/_window.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/stdlib/temporal/_window.py)  
资料来源：[docs/2.developers/4.user-guide/40.temporal-data/10.windows-manual.md](https://github.com/pathwaycom/pathway/blob/main/docs/2.developers/4.user-guide/40.temporal-data/10.windows-manual.md)

## Temporal Joins

### ASOF Join

ASOF (As-Of) join is designed for time-series data where you want to match records from two tables based on the closest timestamp without exceeding a specified threshold. This is particularly useful for correlating events that occur at approximately the same time.

```python
from pathway.stdlib.temporal import asof_join

# Match trades with the most recent quote before each trade
result = trades.join(
    quotes,
    trades.timestamp >= quotes.timestamp,
    trades.timestamp < quotes.timestamp + timedelta(seconds=10),
    selectors={
        trades.price: "trade_price",
        quotes.price: "quote_price",
    }
).select(
    trade_price=result.trade_price,
    quote_price=result.quote_price,
    time_diff=result.trades_timestamp - result.quotes_timestamp
)
```

**Configuration Parameters:**

| Parameter | Type | Description |
|-----------|------|-------------|
| `time_distance` | `timedelta` | Maximum time distance between matched records |
| `closed_window` | `bool` | Include boundary timestamps in matches |
| `behavior` | `str` | `"all"`, `"emit_per_right"`, or `"emit_per_left"` |
| `index` | `Table` | Optional reference table for index-based matching |

The ASOF join supports multiple matching strategies:

```python
# Emit one result per left record (default)
result = left.asof_join(
    right,
    left.time >= right.time,
    left.time < right.time + time_distance
)

# Emit one result per right record
result = left.asof_join(
    right,
    left.time >= right.time,
    left.time < right.time + time_distance,
    behavior=ASOF_join_behavior.EMIT_PER_RIGHT
)
```

资料来源：[python/pathway/stdlib/temporal/_asof_join.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/stdlib/temporal/_asof_join.py)  
资料来源：[docs/2.developers/4.user-guide/40.temporal-data/40.asof-join.md](https://github.com/pathwaycom/pathway/blob/main/docs/2.developers/4.user-guide/40.temporal-data/40.asof-join.md)

### Interval Join

Interval join matches records from two tables where the timestamps fall within a specified time interval relative to each other. Unlike ASOF join which finds the closest match, interval join produces all pairs where timestamps satisfy the interval condition.

```python
from pathway.stdway.stdlib.temporal import interval_join

# Match orders with shipments that arrive within 1-5 days after ordering
result = orders.join(
    shipments,
    orders.order_date <= shipments.ship_date,
    orders.order_date + timedelta(days=5) >= shipments.ship_date,
    orders.order_date + timedelta(days=1) <= shipments.ship_date,
    selectors={
        orders.order_id: "order_id",
        orders.order_date: "order_date",
        shipments.ship_date: "ship_date",
        shipments.tracking: "tracking_number"
    }
).select(
    order_id=result.order_id,
    order_date=result.order_date,
    ship_date=result.ship_date,
    delay=result.ship_date - result.order_date
)
```

**Key Differences from ASOF Join:**

| Aspect | ASOF Join | Interval Join |
|--------|----------|---------------|
| Match Cardinality | One-to-one (closest) | One-to-many |
| Timestamp Condition | Single bound (closest) | Interval range |
| Use Case | Time-series alignment | Event correlation |
| Performance | Optimized for single match | May produce more results |

资料来源：[python/pathway/stdlib/temporal/_interval_join.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/stdlib/temporal/_interval_join.py)

## Windowed Aggregations

Pathway supports aggregations over windowed data with a rich set of built-in functions:

```python
from pathway.stdlib.temporal import window
from pathway.stdlib import tools

# Comprehensive windowed aggregation
result = sensor_data.window.tumbling(
    window_length=timedelta(minutes=15)
).reduce(
    # Count aggregations
    event_count=tools.count(),
    distinct_sensors=tools.count(distinct=True, by=sensor_data.sensor_id),
    
    # Numeric aggregations
    avg_temperature=tools.avg(sensor_data.temperature),
    max_reading=tools.max(sensor_data.value),
    min_reading=tools.min(sensor_data.value),
    total_value=tools.sum(sensor_data.value),
    variance_pop=tools.var_pop(sensor_data.value),
    
    # Collection aggregations
    all_readings=tools.collect(sensor_data.value),
    std_dev=tools.stddev(sensor_data.value)
)
```

**Available Aggregation Functions:**

| Function | Description | Applicable Types |
|----------|-------------|------------------|
| `count()` | Number of records | All |
| `sum(column)` | Sum of values | Numeric |
| `avg(column)` | Arithmetic mean | Numeric |
| `min(column)` | Minimum value | Comparable |
| `max(column)` | Maximum value | Comparable |
| `var_pop(column)` | Population variance | Numeric |
| `stddev(column)` | Standard deviation | Numeric |
| `collect(column)` | Collect all values | All |
| `any(column)` | Return any value | All |

## Time Evolution and State Management

Pathway maintains state across time using differential dataflow's incremental computation model. The persistence layer tracks state changes and enables recovery from checkpoints.

```mermaid
graph LR
    A[Input Data] --> B[Timestamp Assignment]
    B --> C[Differential Update]
    C --> D[State Accumulation]
    D --> E[Frontier Advancement]
    E --> F[Output Production]
    E -->|State Checkpoint| G[Persistence Backend]
    G -->|Recovery| D
```

### Checkpoint and Recovery

The persistence system automatically snapshots state at frontier boundaries:

```python
import pathway as pw

class MyApp(pw.Application):
    # Enable persistence with automatic snapshots
    persistence_config = pw.PersistenceConfig(
        backend=pw.persistence.Backends.filesystem("./data"),
        snapshot_interval=timedelta(minutes=5)
    )
    
    # Tables automatically benefit from persistence
    @pw.operator
    def my_operator(self, table):
        return table.groupby(table.category).reduce(
            total=pw.reducers.sum(table.value)
        )
```

资料来源：[src/persistence/tracker.rs](https://github.com/pathwaycom/pathway/blob/main/src/persistence/tracker.rs)

## API Reference

### Module: pathway.stdlib.temporal

#### Window Creation

```python
pathway.stdlib.temporal.tumbling_window(
    table,
    time_column,
    window_length,
    hop=None,
    name=None
)
```

#### Join Operations

```python
pathway.stdlib.temporal.asof_join(
    left_table,
    right_table,
    left_time,
    right_time,
    time_distance,
    behavior=ASOF_join_behavior.EMIT_PER_RIGHT,
    selectors=None
)

pathway.stdlib.temporal.interval_join(
    left_table,
    right_table,
    left_time_lower,
    left_time_upper,
    right_time_lower,
    right_time_upper,
    selectors=None
)
```

### Enum: ASOF_join_behavior

| Value | Description |
|-------|-------------|
| `EMIT_PER_RIGHT` | Emit one result per right record matched |
| `EMIT_PER_LEFT` | Emit one result per left record matched |
| `EMIT_ALL` | Emit all possible matches |

## Best Practices

### Timestamp Column Selection

Choose timestamp columns carefully:

```python
# Prefer event-time columns for windowing
table = pw.Table.read(
    data,
    id_column="event_id",
    time_column="event_timestamp"  # Event time, not processing time
)
```

### Performance Considerations

1. **Window Size Tuning**: Smaller windows reduce memory footprint but increase computation frequency
2. **Session Gap Configuration**: Set gaps based on expected user behavior patterns
3. **Join Cardinality**: Monitor interval join result sizes to avoid unbounded growth
4. **Late Data Handling**: Configure watermarks appropriately for your data arrival patterns

### Handling Out-of-Order Data

```python
# Configure watermark tolerance for late events
result = table.window.tumbling(
    window_length=timedelta(hours=1),
    timestamp=table.event_time,
    watermark=timedelta(minutes=5)  # Accept events up to 5 min late
).reduce(
    count=tools.count()
)
```

## Conclusion

Pathway's temporal and time-based processing capabilities provide a powerful foundation for building real-time analytics applications. The combination of flexible windowing, specialized temporal joins, and the underlying differential dataflow engine enables processing of streaming data with the same ease as batch processing, while maintaining correctness and determinism across restarts and scaling events.

资料来源：[python/pathway/stdlib/temporal/__init__.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/stdlib/temporal/__init__.py)

---

<a id='page-llm-xpack'></a>

## LLM xPack

### 相关页面

相关主题：[Data Connectors](#page-connectors)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [python/pathway/xpacks/llm/__init__.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/xpacks/llm/__init__.py)
- [python/pathway/xpacks/llm/llms.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/xpacks/llm/llms.py)
- [python/pathway/xpacks/llm/embedders.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/xpacks/llm/embedders.py)
- [python/pathway/xpacks/llm/parsers.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/xpacks/llm/parsers.py)
- [python/pathway/xpacks/llm/splitters.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/xpacks/llm/splitters.py)
- [python/pathway/xpacks/llm/rerankers.py](https://github.com/pathwaycom/pathway/blob/main/python/pathway/xpacks/llm/rerankers.py)
- [docs/2.developers/4.user-guide/50.llm-xpack/10.overview.md](https://github.com/pathwaycom/pathway/blob/main/docs/2.developers/4.user-guide/50.llm-xpack/10.overview.md)
- [docs/2.developers/4.user-guide/50.llm-xpack/70.embedders.md](https://github.com/pathwaycom/pathway/blob/main/docs/2.developers/4.user-guide/50.llm-xpack/70.embedders.md)
</details>

# LLM xPack

The LLM xPack is a specialized extension module within Pathway that provides comprehensive integration capabilities for Large Language Models (LLMs), embedding models, and retrieval-augmented generation (RAG) workflows. This module enables developers to build production-ready LLM pipelines with real-time data processing capabilities.

## Overview

Pathway's LLM xPack bridges the gap between traditional ETL stream processing and modern AI-powered applications. It provides Python-native interfaces for interacting with various LLM providers, embedding services, and RAG components while maintaining Pathway's core strengths in handling streaming and batch data with consistency guarantees.

The module is designed to work seamlessly with Pathway's Rust-powered engine, enabling high-performance inference and data transformation operations that can scale from local development to production deployments.

## Architecture

```mermaid
graph TD
    A[Pathway Pipeline] --> B[LLM xPack]
    B --> C[LLMs Module]
    B --> D[Embedders Module]
    B --> E[Rerankers Module]
    B --> F[Splitters Module]
    B --> G[Parsers Module]
    
    C --> H[OpenAI]
    C --> I[Azure OpenAI]
    C --> J[Anthropic]
    C --> K[Google AI]
    C --> L[Custom REST]
    
    D --> M[OpenAI Embeddings]
    D --> N[HuggingFace Embeddings]
    D --> O[Ollama]
    D --> P[Azure OpenAI Embeddings]
    
    H --> Q[Responses]
    I --> Q
    J --> Q
    K --> Q
    L --> Q
```

## Module Structure

| Module | Purpose | Primary Classes/Functions |
|--------|---------|---------------------------|
| `llms.py` | LLM provider integration | `OpenAILanguageModel`, `AzureOpenAILanguageModel`, `AnthropicLanguageModel` |
| `embedders.py` | Text embedding generation | `OpenAIEmbedder`, `HuggingFaceEmbedder`, `OllamaEmbedder` |
| `rerankers.py` | Document reranking for RAG | Cross-encoder based reranking |
| `splitters.py` | Text document splitting | `TokenSplitter`, `SentenceSplitter` |
| `parsers.py` | Output parsing and extraction | Response parsing utilities |

## LLMs Module

The LLMs module provides unified interfaces for interacting with various language model providers. All implementations follow a common base interface ensuring portability across different backends.

### Supported Providers

```mermaid
graph LR
    A[Developer Code] --> B[LLM Wrapper Layer]
    B --> C[OpenAI]
    B --> D[Azure OpenAI]
    B --> E[Anthropic]
    B --> F[Google AI]
    B --> G[Custom API]
```

### Base Interface

The LLM wrappers implement a common pattern where prompts are processed and responses are returned as Pathway-compatible data structures. This enables seamless integration with Pathway's table operations.

```python
# Conceptual interface pattern
class BaseLanguageModel:
    def __call__(self, prompt: str, **kwargs) -> LLMResponse:
        ...
```

### Provider-Specific Configurations

| Provider | Authentication | API Version | Model Selection |
|----------|---------------|-------------|-----------------|
| OpenAI | API Key | Latest | Via model parameter |
| Azure OpenAI | API Key + Endpoint | 2024-02-01 | Deployment name |
| Anthropic | API Key | Latest | claude-3 family |
| Google AI | API Key | Latest | Gemini models |

## Embedders Module

Embedders generate vector representations of text for semantic search and similarity matching. The module supports multiple embedding providers with consistent interfaces.

### Supported Embedding Services

| Embedder | Provider | Dimension Support | Batch Processing |
|----------|----------|-------------------|------------------|
| `OpenAIEmbedder` | OpenAI | 1536, 3072 | Yes |
| `HuggingFaceEmbedder` | HuggingFace Inference API | Model-dependent | Yes |
| `OllamaEmbedder` | Local Ollama server | Model-dependent | Yes |
| `AzureOpenAIEmbedder` | Azure OpenAI | 1536, 3072 | Yes |

### Embedding Workflow

```mermaid
graph TD
    A[Input Text] --> B[Preprocessing]
    B --> C[Tokenization]
    C --> D[API Call / Local Inference]
    D --> E[Vector Embedding]
    E --> F[Normalized Output]
```

### Configuration Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `model` | str | Required | Embedding model identifier |
| `api_key` | str | Environment | Authentication key |
| `batch_size` | int | 32 | Documents per batch |
| `timeout` | float | 60.0 | Request timeout in seconds |
| `max_retries` | int | 3 | Retry attempts on failure |

## Rerankers Module

Rerankers improve retrieval quality by reordering initial search results using cross-encoder models. This is particularly valuable in RAG pipelines where initial vector search may return partially relevant results.

### Reranking Process

```mermaid
graph LR
    A[Query] --> B[Initial Retrieval<br/>Top-K Results]
    B --> C[Cross-Encoder Scorer]
    C --> D[Reordered Results]
    D --> E[Final Output]
    
    F[Retrieved Documents] --> C
```

The reranking module uses cross-encoder models to score query-document pairs, returning results ordered by relevance score.

## Splitters Module

Document splitters divide large texts into smaller chunks suitable for embedding and retrieval. Proper chunking is critical for RAG pipeline effectiveness.

### Splitting Strategies

| Strategy | Description | Use Case |
|----------|-------------|----------|
| `TokenSplitter` | Split by token count | Fixed context windows |
| `SentenceSplitter` | Split at sentence boundaries | Natural language coherence |
| `RecursiveSplitter` | Hierarchical splitting | Complex documents |

### Configuration

| Parameter | Type | Description |
|-----------|------|-------------|
| `chunk_size` | int | Target chunk size |
| `chunk_overlap` | int | Overlap between chunks |
| `separator` | str | Splitting delimiter |

## Parsers Module

The parsers module provides utilities for extracting structured information from LLM responses. This is essential for turning raw model outputs into usable data.

### Supported Parsing Patterns

- JSON extraction from markdown-formatted responses
- Structured output parsing with Pydantic models
- Template-based response extraction

## RAG Pipeline Integration

The LLM xPack is designed to compose into complete RAG pipelines within Pathway:

```mermaid
graph TD
    A[Data Sources] --> B[Pathway Connectors]
    B --> C[Document Splitter]
    C --> D[Embedder]
    D --> E[Vector Store / Index]
    E --> F[Query Input]
    F --> G[Similarity Search]
    G --> H[Reranker]
    H --> I[LLM Prompt Assembly]
    I --> J[LLM Inference]
    J --> K[Response Parser]
    K --> L[Structured Output]
```

## Usage Example

```python
import pathway as pw
from pathway.xpacks.llm import OpenAIEmbedder, OpenAILanguageModel
from pathway.xpacks.llm.splitters import TokenSplitter

# Configure embedder
embedder = OpenAIEmbedder(model="text-embedding-3-small")

# Configure LLM
llm = OpenAILanguageModel(model="gpt-4o")

# Document splitting
splitter = TokenSplitter(chunk_size=512, chunk_overlap=50)

# Pipeline integration
class LLMQueryResult(pw.Result):
    response: str
    context: list[str]
```

## Error Handling and Resilience

The LLM xPack implements robust error handling for production deployments:

- **Automatic retry** with exponential backoff for transient failures
- **Timeout handling** to prevent pipeline stalls
- **Rate limiting compliance** for API-based providers
- **Graceful degradation** when optional components are unavailable

## Performance Considerations

| Aspect | Recommendation |
|--------|----------------|
| Batch Processing | Enable batching for multiple documents |
| Caching | Use for repeated queries |
| Async Operations | Prefer async APIs where supported |
| Memory | Monitor embedding vector memory usage |

## Configuration Sources

LLM xPack credentials and settings can be configured through:

1. **Environment variables** (recommended for production)
2. **Pathway config files** (YAML/JSON)
3. **Runtime parameters** (for development)

Common environment variables:

```bash
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
GOOGLE_API_KEY=...
```

## See Also

- [Pathway Documentation](https://pathway.com/developers/)
- [LLM xPack User Guide](https://pathway.com/developers/user-guide/llm-xpack/)
- [RAG Pipeline Examples](https://github.com/pathwaycom/pathway/tree/main/examples)

---

---

## Doramagic 踩坑日志

项目：pathwaycom/pathway

摘要：发现 22 个潜在踩坑项，其中 2 个为 high/blocking；最高优先级：安装坑 - 来源证据：[Bug]: TypeError: Cannot instantiate typing.Any。

## 1. 安装坑 · 来源证据：[Bug]: TypeError: Cannot instantiate typing.Any

- 严重度：high
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：[Bug]: TypeError: Cannot instantiate typing.Any
- 对用户的影响：可能增加新用户试用和生产接入成本。
- 建议检查：来源问题仍为 open，Pack Agent 需要复核是否仍影响当前版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_d63b2462b18449c4a2c02bb5e02a96c8 | https://github.com/pathwaycom/pathway/issues/227 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 2. 安全/权限坑 · 来源证据：Entra Authentication Support (credential handler and/or password callback)

- 严重度：high
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题：Entra Authentication Support (credential handler and/or password callback)
- 对用户的影响：可能影响授权、密钥配置或安全边界。
- 建议检查：来源问题仍为 open，Pack Agent 需要复核是否仍影响当前版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_e7c65f4d8bce4b1a9c99accbf0457633 | https://github.com/pathwaycom/pathway/issues/230 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 3. 安装坑 · 来源证据：Automated schema exploration in input connectors

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：Automated schema exploration in input connectors
- 对用户的影响：可能增加新用户试用和生产接入成本。
- 建议检查：来源问题仍为 open，Pack Agent 需要复核是否仍影响当前版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_74b91feb391f4b0298216d2a48fe5abe | https://github.com/pathwaycom/pathway/issues/224 | 来源类型 github_issue 暴露的待验证使用条件。

## 4. 安装坑 · 来源证据：Cannot Process Windowed Sessions from Kafka - Crash with "key missing in output table" on streaming retractions

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：Cannot Process Windowed Sessions from Kafka - Crash with "key missing in output table" on streaming retractions
- 对用户的影响：可能阻塞安装或首次运行。
- 建议检查：来源问题仍为 open，Pack Agent 需要复核是否仍影响当前版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_ab5d2b2076034999bfaed612a3d95d60 | https://github.com/pathwaycom/pathway/issues/232 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 5. 安装坑 · 来源证据：Improve watermarks in POSIX-like objects tracker

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：Improve watermarks in POSIX-like objects tracker
- 对用户的影响：可能阻塞安装或首次运行。
- 建议检查：来源问题仍为 open，Pack Agent 需要复核是否仍影响当前版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_38f29b1fba3b41cc99b1364d15ef7213 | https://github.com/pathwaycom/pathway/issues/225 | 来源讨论提到 node 相关条件，需在安装/试用前复核。

## 6. 安装坑 · 来源证据：Persistence in `iterate` operator

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：Persistence in `iterate` operator
- 对用户的影响：可能增加新用户试用和生产接入成本。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_e642d639ebbc4382b242028ef89ec236 | https://github.com/pathwaycom/pathway/issues/214 | 来源类型 github_issue 暴露的待验证使用条件。

## 7. 安装坑 · 来源证据：Support LEANN for RAG pipelines

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：Support LEANN for RAG pipelines
- 对用户的影响：可能增加新用户试用和生产接入成本。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_5d99dd1cfc794b299633b6c5dc9dd33b | https://github.com/pathwaycom/pathway/issues/173 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 8. 安装坑 · 来源证据：Support MongoDB Atlas in pw.io.mongodb

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：Support MongoDB Atlas in pw.io.mongodb
- 对用户的影响：可能增加新用户试用和生产接入成本。
- 建议检查：来源问题仍为 open，Pack Agent 需要复核是否仍影响当前版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_fa1d6e2e977a45d99c414724681f0f32 | https://github.com/pathwaycom/pathway/issues/221 | 来源类型 github_issue 暴露的待验证使用条件。

## 9. 安装坑 · 来源证据：feat: Add Microsoft SQL Server (MSSQL) connector

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：feat: Add Microsoft SQL Server (MSSQL) connector
- 对用户的影响：可能影响升级、迁移或版本选择。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_370bcc864a314734b602f01d3d444ef9 | https://github.com/pathwaycom/pathway/issues/204 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 10. 安装坑 · 来源证据：v0.27.0

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：v0.27.0
- 对用户的影响：可能影响升级、迁移或版本选择。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_8d6905d8719c488cbcbb821e794add26 | https://github.com/pathwaycom/pathway/releases/tag/v0.27.0 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 11. 安装坑 · 来源证据：v0.29.0

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：v0.29.0
- 对用户的影响：可能影响升级、迁移或版本选择。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_b25babd3e3cc49108e56daaaaae8c5bf | https://github.com/pathwaycom/pathway/releases/tag/v0.29.0 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 12. 安装坑 · 来源证据：v0.30.0

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：v0.30.0
- 对用户的影响：可能影响升级、迁移或版本选择。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_0291ee068e4e4d38aa86d0fd433b960a | https://github.com/pathwaycom/pathway/releases/tag/v0.30.0 | 来源讨论提到 python 相关条件，需在安装/试用前复核。

## 13. 配置坑 · 来源证据：v0.28.0

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个配置相关的待验证问题：v0.28.0
- 对用户的影响：可能影响升级、迁移或版本选择。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_708184d9c8ac445d923c82d38af7bd19 | https://github.com/pathwaycom/pathway/releases/tag/v0.28.0 | 来源类型 github_release 暴露的待验证使用条件。

## 14. 配置坑 · 来源证据：v0.30.1

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个配置相关的待验证问题：v0.30.1
- 对用户的影响：可能增加新用户试用和生产接入成本。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_e7a19bd03a49499e987781160e4b76f0 | https://github.com/pathwaycom/pathway/releases/tag/v0.30.1 | 来源类型 github_release 暴露的待验证使用条件。

## 15. 能力坑 · 能力判断依赖假设

- 严重度：medium
- 证据强度：source_linked
- 发现：README/documentation is current enough for a first validation pass.
- 对用户的影响：假设不成立时，用户拿不到承诺的能力。
- 建议检查：将假设转成下游验证清单。
- 防护动作：假设必须转成验证项；没有验证结果前不能写成事实。
- 证据：capability.assumptions | github_repo:571186647 | https://github.com/pathwaycom/pathway | README/documentation is current enough for a first validation pass.

## 16. 运行坑 · 来源证据：`pw.io.mssql.read` needs LSN persistence

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个运行相关的待验证问题：`pw.io.mssql.read` needs LSN persistence
- 对用户的影响：可能增加新用户试用和生产接入成本。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_59d927a667a843ccb7d0a4cd147a1dc0 | https://github.com/pathwaycom/pathway/issues/218 | 来源类型 github_issue 暴露的待验证使用条件。

## 17. 维护坑 · 维护活跃度未知

- 严重度：medium
- 证据强度：source_linked
- 发现：未记录 last_activity_observed。
- 对用户的影响：新项目、停更项目和活跃项目会被混在一起，推荐信任度下降。
- 建议检查：补 GitHub 最近 commit、release、issue/PR 响应信号。
- 防护动作：维护活跃度未知时，推荐强度不能标为高信任。
- 证据：evidence.maintainer_signals | github_repo:571186647 | https://github.com/pathwaycom/pathway | last_activity_observed missing

## 18. 安全/权限坑 · 下游验证发现风险项

- 严重度：medium
- 证据强度：source_linked
- 发现：no_demo
- 对用户的影响：下游已经要求复核，不能在页面中弱化。
- 建议检查：进入安全/权限治理复核队列。
- 防护动作：下游风险存在时必须保持 review/recommendation 降级。
- 证据：downstream_validation.risk_items | github_repo:571186647 | https://github.com/pathwaycom/pathway | no_demo; severity=medium

## 19. 安全/权限坑 · 存在评分风险

- 严重度：medium
- 证据强度：source_linked
- 发现：no_demo
- 对用户的影响：风险会影响是否适合普通用户安装。
- 建议检查：把风险写入边界卡，并确认是否需要人工复核。
- 防护动作：评分风险必须进入边界卡，不能只作为内部分数。
- 证据：risks.scoring_risks | github_repo:571186647 | https://github.com/pathwaycom/pathway | no_demo; severity=medium

## 20. 安全/权限坑 · 来源证据：Support encryption in Kinesis connectors

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安全/权限相关的待验证问题：Support encryption in Kinesis connectors
- 对用户的影响：可能影响授权、密钥配置或安全边界。
- 建议检查：来源问题仍为 open，Pack Agent 需要复核是否仍影响当前版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_f29097c3eced4a209e7c80971aeea40c | https://github.com/pathwaycom/pathway/issues/223 | 来源类型 github_issue 暴露的待验证使用条件。

## 21. 维护坑 · issue/PR 响应质量未知

- 严重度：low
- 证据强度：source_linked
- 发现：issue_or_pr_quality=unknown。
- 对用户的影响：用户无法判断遇到问题后是否有人维护。
- 建议检查：抽样最近 issue/PR，判断是否长期无人处理。
- 防护动作：issue/PR 响应未知时，必须提示维护风险。
- 证据：evidence.maintainer_signals | github_repo:571186647 | https://github.com/pathwaycom/pathway | issue_or_pr_quality=unknown

## 22. 维护坑 · 发布节奏不明确

- 严重度：low
- 证据强度：source_linked
- 发现：release_recency=unknown。
- 对用户的影响：安装命令和文档可能落后于代码，用户踩坑概率升高。
- 建议检查：确认最近 release/tag 和 README 安装命令是否一致。
- 防护动作：发布节奏未知或过期时，安装说明必须标注可能漂移。
- 证据：evidence.maintainer_signals | github_repo:571186647 | https://github.com/pathwaycom/pathway | release_recency=unknown

<!-- canonical_name: pathwaycom/pathway; human_manual_source: deepwiki_human_wiki -->
