# https://github.com/vitalops/datatune 项目说明书

生成时间：2026-05-15 09:50:50 UTC

## 目录

- [Introduction to Datatune](#page-introduction)
- [Getting Started](#page-getting-started)
- [System Architecture](#page-architecture)
- [Map Operations](#page-map-operations)
- [Filter Operations](#page-filter-operations)
- [Reduce Operations](#page-reduce-operations)
- [Dask Backend](#page-dask-backend)
- [Ibis Backend](#page-ibis-backend)
- [LLM Integration](#page-llm-integration)
- [Agent System](#page-agent-system)

<a id='page-introduction'></a>

## Introduction to Datatune

### 相关页面

相关主题：[Getting Started](#page-getting-started), [System Architecture](#page-architecture)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [README.md](https://github.com/vitalops/datatune/blob/main/README.md)
- [datatune/llm/llm.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/llm.py)
- [datatune/llm/model_rate_limits.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/model_rate_limits.py)
- [datatune/core/dask/map_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)
- [datatune/core/dask/filter_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/filter_dask.py)
- [datatune/agent/agent.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/agent.py)
- [datatune/agent/__init__.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/__init__.py)
- [datatune/core/ibis/map_ibis.py](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/map_ibis.py)
- [datatune/core/ibis/filter_ibis.py](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/filter_ibis.py)
- [datatune/core/deduplication.py](https://github.com/vitalops/datatune/blob/main/datatune/core/deduplication.py)
</details>

# Introduction to Datatune

Datatune is an AI-powered data transformation library that enables natural language-driven data processing operations on large-scale datasets. The library leverages Large Language Models (LLMs) to interpret human-readable instructions and automatically generate the appropriate data transformations, making complex data manipulation accessible to users without deep technical expertise.

Datatune supports multiple backend computational frameworks including Dask for distributed computing and Ibis for SQL-like operations across various databases. The system provides both explicit transformation primitives (Map, Filter) and an intelligent Agent that can automatically determine and chain multiple transformation steps from a single natural language prompt.

## High-Level Architecture

```mermaid
graph TD
    User[User Request] --> Agent[Datatune Agent]
    User --> Primitives[Explicit Primitives]
    
    Agent --> Planner[Planning Module]
    Planner --> Steps[Transformation Steps]
    Steps --> DaskOps[Dask Operations]
    Steps --> PrimOps[Primitive Operations]
    Steps --> CodeGen[Code Generation]
    
    Primitives --> MapOp[Map Operation]
    Primitives --> FilterOp[Filter Operation]
    
    MapOp --> LLM[LLM Interface]
    FilterOp --> LLM
    
    LLM --> OpenAI[OpenAI Provider]
    LLM --> Ollama[Ollama Provider]
    LLM --> VLLM[VLLM Provider]
    LLM --> Azure[Azure Provider]
    
    DaskOps --> Dask[Dask DataFrame]
    PrimOps --> Dask
    
    MapOp --> Ibis[Ibis Backend]
    FilterOp --> Ibis
    
    Ibis --> DuckDB[DuckDB]
    Ibis --> PostgreSQL[PostgreSQL]
    Ibis --> BigQuery[BigQuery]
```

The architecture follows a layered approach where the LLM abstraction layer provides a unified interface for multiple AI providers, while the transformation layer handles both explicit user-defined operations and AI-generated execution plans.

## Core Components

### LLM Interface Layer

The LLM module (`datatune/llm/llm.py`) provides a unified interface for interacting with various language model providers. The base `LLM` class handles rate limiting, token counting, and request batching, while provider-specific subclasses implement the actual API calls.

#### Supported LLM Providers

| Provider | Class | Default Model | Description |
|----------|-------|---------------|-------------|
| OpenAI | `OpenAI` | gpt-3.5-turbo | Standard OpenAI API integration |
| Ollama | `Ollama` | gemma3:4b | Local LLM server support |
| VLLM | `VLLM` | User-specified | High-performance vLLM inference |
| Azure | `Azure` | User-specified | Azure OpenAI Service |

The base `LLM` class automatically extracts rate limit configurations from `model_rate_limits.py` based on the model name. If a model is not found in the predefined limits, it defaults to GPT-3.5-turbo limits with a warning message.

```python
from datatune.llm.llm import OpenAI, Ollama, Azure

# OpenAI
llm = OpenAI(model_name="gpt-3.5-turbo")

# Ollama (local)
llm = Ollama()

# Azure
llm = Azure(model_name="gpt-3.5-turbo", api_key=api_key)
```

资料来源：[README.md:40-56]()

### Rate Limiting Configuration

The `model_rate_limits.py` module defines tokens-per-minute (TPM) and requests-per-minute (RPM) limits for supported models:

| Model Family | TPM | RPM |
|--------------|-----|-----|
| GPT-3.5-Turbo | 200,000 | 500 |
| GPT-4 | 10,000 - 30,000 | 500 |
| GPT-4.1 | 30,000 - 200,000 | 500 |
| GPT-4.5-Preview | 125,000 | 1000 |

资料来源：[datatune/llm/model_rate_limits.py:1-120]()

## Transformation Primitives

Datatune provides two fundamental transformation primitives that work with natural language prompts:

### Map Operation

The `Map` primitive creates new columns by applying LLM-driven transformations to existing data. It takes input from specified columns and generates output fields based on the natural language prompt.

```python
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)
```

The Map operation constructs prompts with the following structure:

```
Your task is to MAP/CREATE new fields based on the following input record(s):
[input data]

Your response MUST be the entire input record as a valid Python dictionary in the format
'index=<row_index>|{key1: value1, key2: value2, ...}' with added keys of expected new fields.
```

资料来源：[datatune/core/dask/map_dask.py:1-50]()
资料来源：[datatune/core/ibis/map_ibis.py:1-60]()

### Filter Operation

The `Filter` primitive removes rows that do not meet specified conditions interpreted by the LLM:

```python
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)
```

The Filter operation adds a `__filter__` key to each record, with `True` indicating the row should be kept and `False` indicating removal.

```
FILTERING CRITERIA:
[prompt]

DECISION: Your response MUST be the entire input record as a Python dictionary in the format:
index=<row_index>|{key1: value1, key2: value2, ...} with added key called '__filter__' 
with value either True to KEEP the record or False to REMOVE it.
```

资料来源：[datatune/core/dask/filter_dask.py:1-45]()
资料来源：[datatune/core/ibis/filter_ibis.py:1-50]()

## Intelligent Agent

The `Agent` class provides a higher-level interface that automatically determines which operations to perform based on natural language requests. The agent can chain multiple Map, Filter, and Python code operations into a coherent execution plan.

```python
import datatune as dt
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo")
agent = dt.Agent(llm)

df = agent.do("Add ProfitMargin column and keep only African organizations", df)
result = dt.finalize(df)
```

### Agent System Prompt

The Agent uses a structured system prompt that defines available operations and expected response formats:

```python
system_prompt: str = """You are Datatune Agent, a powerful assistant designed to help users with data processing tasks.
You are capable of generating python code to perform various operations on data. Apart from python builtins, you have the following libraries avaiable in your run time:
- pandas
- numpy
- dask

In addition to these, you also have access to the datatune libarary, which provides functionality for processing data using LLMs.
```

### Plan Execution Flow

```mermaid
graph TD
    Request[Natural Language Request] --> Parse[Parse Request]
    Parse --> Plan[Generate Execution Plan]
    Plan --> Step1[Step 1: Dask/Primitive]
    Step1 --> Step2[Step 2: Dask/Primitive]
    Step2 --> StepN[Step N: ...]
    StepN --> Execute[Execute Full Plan]
    Execute --> Result[Finalized DataFrame]
    
    Plan --> StepTypes{Step Type}
    StepTypes -->|"dask"| DaskTemplate[Dask Template]
    StepTypes -->|"primitive"| PrimTemplate[Primitive Template]
```

The agent generates plans as JSON arrays with the following structure:

```json
[
  {
    "type": "dask|primitive",
    "operation": "operation_name",
    "params": {},
    "subprompt": "LLM prompt for primitive operations",
    "input_fields": ["column1"],
    "output_fields": ["new_column"]
  }
]
```

资料来源：[datatune/agent/__init__.py:1-45]()
资料来源：[datatune/agent/agent.py:1-100]()

## Data Source Backends

Datatune supports multiple computational backends for processing data at scale:

### Dask Backend

The Dask backend provides distributed computing capabilities for pandas-like DataFrames:

```python
import dask.dataframe as dd
df = dd.read_csv("data.csv")
```

Dask operations include:
- `add_column`: Create new columns from expressions
- `apply_function`: Apply element-wise functions
- `rename_columns`: Rename using mappings
- `astype_column`: Change data types

### Ibis Backend

The Ibis backend enables SQL-like operations across various database backends:

```python
import ibis
con = ibis.duckdb.connect("data.duckdb")
table = con.table("my_table")
```

Supported Ibis backends include DuckDB, PostgreSQL, and BigQuery.

资料来源：[README.md:60-75]()

## Deduplication System

Datatune includes a sophisticated deduplication system that uses embedding-based similarity detection:

```mermaid
graph LR
    Input[Input Data] --> Embed[Embedding Generation]
    Embed --> Index[FAISS Index Build]
    Index --> Cluster[Similarity Clustering]
    Cluster --> Dedup[Deduplicated Output]
```

Key parameters:
| Parameter | Default | Description |
|-----------|---------|-------------|
| `embedding_model` | text-embedding-3-small | Model for generating embeddings |
| `sim_threshold` | 0.90 | Similarity threshold for clustering |
| `top_k` | 50 | Top-K neighbors for clustering |
| `hnsw_m` | 32 | HNSW index construction parameter |
| `ef_search` | 64 | HNSW search parameter |

The system uses FAISS HNSW (Hierarchical Navigable Small World) indexes for efficient similarity search at scale.

资料来源：[datatune/core/deduplication.py:1-100]()

## Workflow Summary

```mermaid
graph LR
    A[Load Data] --> B[Choose Interface]
    B --> C[Agent Mode]
    B --> D[Primitives Mode]
    C --> E[Natural Language]
    D --> F[Explicit Operations]
    E --> G[Auto Plan]
    F --> H[Manual Plan]
    G --> I[Execute]
    H --> I
    I --> J[LLM Processing]
    J --> K[Finalize]
    K --> L[Output]
```

1. **Load Data**: Import data into Dask or Ibis DataFrames
2. **Choose Interface**: Use Agent for automatic planning or Primitives for explicit control
3. **Define Operations**: Write natural language prompts or configure operations
4. **Execute**: Process data with LLM integration
5. **Finalize**: Convert lazy operations to computed results

资料来源：[README.md:1-50]()

## Installation and Quick Start

```bash
pip install datatune
```

The complete workflow involves initializing an LLM, loading data, applying transformations, and finalizing results:

```python
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

# Apply transformations
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)

# Save results
result = dt.finalize(filtered)
result.compute().to_csv("electronics_products.csv")
```

资料来源：[README.md:20-55]()

---

<a id='page-getting-started'></a>

## Getting Started

### 相关页面

相关主题：[Introduction to Datatune](#page-introduction)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [README.md](https://github.com/vitalops/datatune/blob/main/README.md)
- [datatune/llm/llm.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/llm.py)
- [datatune/agent/agent.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/agent.py)
- [datatune/agent/__init__.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/__init__.py)
- [datatune/core/dask/map_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)
- [datatune/core/dask/filter_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/filter_dask.py)
- [datatune/core/ibis/map_ibis.py](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/map_ibis.py)
- [datatune/core/ibis/filter_ibis.py](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/filter_ibis.py)
- [datatune/llm/model_rate_limits.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/model_rate_limits.py)
</details>

# Getting Started

Datatune is a data transformation library that leverages Large Language Models (LLMs) to perform complex data operations using natural language prompts. It enables users to transform, filter, map, and process data without writing extensive custom code.

## Installation

Install datatune using pip:

```bash
pip install datatune
```

资料来源：[README.md:1]()

## Core Concepts

Datatune provides two primary interfaces for data transformation:

| Interface | Description | Use Case |
|-----------|-------------|----------|
| **Primitives** | Direct operations (Map, Filter) | Single, focused transformations |
| **Agent** | AI-powered automatic planning | Complex multi-step workflows |

资料来源：[README.md:20-30]()

## Quick Start

### Basic Setup

```python
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")
```

资料来源：[README.md:26-32]()

### Map Operation

Use `dt.map()` to extract or derive new columns from existing data:

```python
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)
```

资料来源：[README.md:34-41]()

### Filter Operation

Use `dt.filter()` to keep only rows matching criteria:

```python
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)
```

资料来源：[README.md:44-49]()

### Finalize and Save

```python
result = dt.finalize(filtered)
result.compute().to_csv("electronics_products.csv")
```

资料来源：[README.md:51-53]()

## Supported LLMs

Datatune supports multiple LLM providers through a unified interface.

### OpenAI

```python
from datatune.llm.llm import OpenAI
llm = OpenAI(model_name="gpt-3.5-turbo")
```

资料来源：[README.md:58-61]()

### Ollama (Local Models)

```python
from datatune.llm.llm import Ollama
llm = Ollama()
```

Default configuration:
- Model: `gemma3:4b`
- API Base: `http://localhost:11434`

资料来源：[README.md:63-65]()

### Azure OpenAI

```python
from datatune.llm.llm import Azure
llm = Azure(model_name="gpt-3.5-turbo", api_key=api_key)
```

资料来源：[README.md:67-70]()

### Additional Providers

| Provider | Class | Notes |
|----------|-------|-------|
| Mistral | `Mistral` | Requires `mistral/` prefix in model name |
| Huggingface | `Huggingface` | Requires `huggingface/` prefix |
| VLLM | `VLLM` | Auto-detects max token limit |

资料来源：[datatune/llm/llm.py:1-150]()

## Agent Mode

The Agent provides an intelligent, automated approach to data transformation. It automatically determines which operations to use and chains multiple transformations from a single natural language prompt.

### Basic Agent Usage

```python
import datatune as dt
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo")
agent = dt.Agent(llm)

df = agent.do("Add ProfitMargin column and keep only African organizations", df)
result = dt.finalize(df)
```

资料来源：[README.md:74-82]()

### Agent Capabilities

The agent automatically:

- Determines which operations to use (map, filter, etc.)
- Chains multiple transformations
- Handles complex multi-step tasks from a single prompt
- Generates and executes Python code along with row-level primitives (Map, Filter, etc) if required

资料来源：[README.md:84-88]()

### Agent System Prompt

The agent has access to:

- Python builtins
- pandas
- numpy
- dask
- datatune library primitives (Map, Filter)

资料来源：[datatune/agent/__init__.py:1-35]()

## Data Sources

Datatune supports multiple data processing backends.

### Dask DataFrames

```python
import dask.dataframe as dd
df = dd.read_csv("data.csv")
```

### Ibis Backend

Ibis provides connectivity to multiple databases:

```python
import ibis
con = ibis.duckdb.connect("data.duckdb")
table = con.table("my_table")
```

资料来源：[README.md:93-102]()

## Architecture Overview

```mermaid
graph TD
    A[User Prompt] --> B[Datatune API]
    B --> C{Operation Type}
    C -->|Map| D[map_dask.py / map_ibis.py]
    C -->|Filter| E[filter_dask.py / filter_ibis.py]
    C -->|Agent| F[Agent Planning]
    D --> G[LLM Batch Processing]
    E --> G
    F -->|Dask Ops| H[Dask Templates]
    F -->|Primitives| I[Map / Filter Primitives]
    G --> J[Output DataFrame]
    H --> J
    I --> J
```

## Rate Limits Configuration

When using LLMs, datatune respects model-specific rate limits:

| Parameter | Description |
|-----------|-------------|
| `tpm` | Tokens per minute |
| `rpm` | Requests per minute |

资料来源：[datatune/llm/model_rate_limits.py:1-50]()

Default rate limits for unknown models fall back to `gpt-3.5-turbo` settings with a warning logged.

资料来源：[datatune/llm/llm.py:20-35]()

## Primitive Operations

### Map Operation

Creates new columns by applying LLM-based transformations to input columns.

**Parameters:**

| Parameter | Type | Description |
|-----------|------|-------------|
| `prompt` | str | Natural language description of the transformation |
| `input_fields` | list | Input column names |
| `output_fields` | list | Names for new columns |

### Filter Operation

Removes rows that don't meet the specified criteria.

**Parameters:**

| Parameter | Type | Description |
|-----------|------|-------------|
| `prompt` | str | Natural language filtering criteria |
| `input_fields` | list | Columns to evaluate |

## Next Steps

- **[Documentation](https://docs.datatune.ai/)** - Complete guides and API reference
- **[Examples](https://github.com/vitalops/datatune/tree/main/examples)** - Real-world use cases
- **[Discord](https://discord.gg/3RKA5AryQX)** - Community support
- **[Issues](https://github.com/vitalops/datatune/issues)** - Report bugs and request features

资料来源：[README.md:105-113]()

---

<a id='page-architecture'></a>

## System Architecture

### 相关页面

相关主题：[Dask Backend](#page-dask-backend), [Ibis Backend](#page-ibis-backend), [LLM Integration](#page-llm-integration)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [datatune/llm/llm.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/llm.py)
- [datatune/agent/agent.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/agent.py)
- [datatune/core/dask/map_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)
- [datatune/core/dask/filter_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/filter_dask.py)
- [datatune/core/ibis/map_ibis.py](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/map_ibis.py)
- [datatune/core/ibis/filter_ibis.py](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/filter_ibis.py)
- [datatune/core/deduplication.py](https://github.com/vitalops/datatune/blob/main/datatune/core/deduplication.py)
- [datatune/llm/model_rate_limits.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/model_rate_limits.py)
- [datatune/agent/__init__.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/__init__.py)
</details>

# System Architecture

## Overview

Datatune is a data transformation framework that leverages Large Language Models (LLMs) to perform semantic operations on distributed data. The system provides a declarative interface for common data manipulation tasks like mapping, filtering, and deduplication while abstracting the complexity of distributed computing and LLM interaction.

The architecture follows a modular design with three primary layers:

| Layer | Purpose | Key Components |
|-------|---------|----------------|
| **LLM Layer** | Model abstraction and rate limiting | `LLM`, `OpenAI`, `Ollama`, `VLLM`, `Azure` |
| **Core Operations Layer** | Data transformation primitives | `Map`, `Filter`, `Reduce`, `Deduplication` |
| **Agent Layer** | High-level orchestration and planning | `Agent`, Plan execution engine |

资料来源：[datatune/llm/llm.py:1-100]()

## High-Level Architecture Diagram

```mermaid
graph TB
    subgraph "Client Layer"
        User[User Code]
    end
    
    subgraph "LLM Layer"
        LLM[Base LLM Class]
        OpenAI[OpenAI Provider]
        Ollama[Ollama Provider]
        VLLM[VLLM Provider]
        Azure[Azure Provider]
        RateLimits[model_rate_limits]
    end
    
    subgraph "Core Operations Layer"
        Map[Map Operation]
        Filter[Filter Operation]
        Reduce[Reduce Operation]
        Dedupe[Deduplication]
    end
    
    subgraph "Data Backend Layer"
        Dask[Dask DataFrame]
        Ibis[Ibis (DuckDB, PostgreSQL, BigQuery)]
    end
    
    subgraph "Agent Layer"
        Agent[Agent Orchestrator]
        Planner[Plan Generator]
        Executor[Plan Executor]
    end
    
    User --> Agent
    User --> Map
    User --> Filter
    
    Agent --> Planner
    Agent --> Executor
    Agent --> LLM
    
    Map --> LLM
    Filter --> LLM
    
    Map --> Dask
    Map --> Ibis
    Filter --> Dask
    Filter --> Ibis
    
    LLM --> RateLimits
```

## LLM Layer

The LLM layer provides a unified interface for interacting with various LLM providers while handling rate limiting and token counting.

### Class Hierarchy

```mermaid
graph TD
    LLM[LLM Base Class] --> Ollama[Ollama]
    LLM --> OpenAI[OpenAI]
    LLM --> VLLM[VLLM]
    LLM --> Azure[Azure]
```

### Base LLM Class

The `LLM` class serves as the foundation for all LLM providers. It handles model naming, rate limit configuration, and request batching.

| Parameter | Type | Description |
|-----------|------|-------------|
| `model_name` | `str` | Model identifier in format `provider/model` (e.g., `openai/gpt-3.5-turbo`) |
| `tpm` | `int` | Tokens per minute limit |
| `rpm` | `int` | Requests per minute limit |
| `api_key` | `str` | API authentication key |
| `api_base` | `str` | Base URL for API endpoint |

资料来源：[datatune/llm/llm.py:70-95]()

### Provider Implementations

**OpenAI Provider**

```python
class OpenAI(LLM):
    def __init__(
        self, model_name: str = "gpt-3.5-turbo", api_key: Optional[str] = None, **kwargs
    ):
        kwargs.update({"api_key": api_key})
        super().__init__(model_name=f"openai/{model_name}", **kwargs)
```

资料来源：[datatune/llm/llm.py:96-103]()

**Ollama Provider (Local)**

```python
class Ollama(LLM):
    def __init__(
        self, model_name="gemma3:4b", api_base="http://localhost:11434", **kwargs
    ) -> None:
        super().__init__(
            model_name=f"ollama_chat/{model_name}", api_base=api_base, **kwargs
        )
```

资料来源：[datatune/llm/llm.py:81-87]()

**VLLM Provider**

The VLLM provider automatically retrieves the `max_model_len` from the API and configures token limits accordingly.

```python
class VLLM(LLM):
    def __init__(self, model_name: str, api_base: str = "http://localhost:8000/v1", **kwargs):
        import httpx
        resp = httpx.get(f"{api_base}/models")
        max_model_len = resp.json()["data"][0]["max_model_len"]
        kwargs.update({"api_base": api_base, "api_key": "dummy", "max_tokens": max_model_len})
        super().__init__(model_name=f"openai/{model_name}", **kwargs)
```

资料来源：[datatune/llm/llm.py:105-118]()

### Rate Limiting

The system maintains predefined rate limits for various models in `model_rate_limits.py`:

| Model Family | TPM | RPM |
|--------------|-----|-----|
| `gpt-3.5-turbo` | 200,000 | 500 |
| `gpt-4` | 10,000 | 500 |
| `gpt-4-turbo` | 30,000 | 500 |
| `gpt-4.1-mini/nano` | 200,000 | 500 |

资料来源：[datatune/llm/model_rate_limits.py:1-50]()

## Core Operations Layer

The core operations layer provides primitive transformations that can be applied to distributed data.

### Map Operation

The `Map` operation creates new columns by applying LLM-based transformations to existing data.

```mermaid
graph LR
    A[Input DataFrame] --> B[Serialize Rows]
    B --> C[Batch to LLM]
    C --> D[Parse Responses]
    D --> E[Extract New Columns]
    E --> F[Output DataFrame]
```

**Input Format**: Rows are serialized to Python dictionaries with the following structure:

```
index=<row_index>|{key1: value1, key2: value2, ...}
```

**Output Format**: LLM returns dictionaries with original keys plus new field keys:

```
index=<row_index>|{key1: value1, key2: value2, new_field: new_value}
```

资料来源：[datatune/core/dask/map_dask.py:1-50]()

**Key Features**:

- Handles duplicate rows via canonical mapping
- Supports batched API calls for efficiency
- Falls back to canonical row values for duplicates

### Filter Operation

The `Filter` operation removes rows that do not meet specified criteria using LLM-based evaluation.

```mermaid
graph LR
    A[Input DataFrame] --> B[Serialize Rows]
    B --> C[LLM Decision]
    C --> D{Keep Row?}
    D -->|Yes| E[Include in Output]
    D -->|No| F[Exclude Row]
```

**Decision Format**: Each row receives a `__filter__` key:

```
index=<row_index>|{..., __filter__: True/False}
```

资料来源：[datatune/core/dask/filter_dask.py:1-50]()

### Deduplication

The deduplication system uses embeddings and FAISS for efficient similarity search:

| Component | Description |
|-----------|-------------|
| `embedding_model` | Model for generating text embeddings |
| `sim_threshold` | Minimum similarity score (default: 0.90) |
| `top_k` | Number of neighbors to search |
| `hnsw_m` | HNSW index parameter for m |
| `ef_search` | HNSW search parameter |

**Workflow**:

1. Embed column values in batches of 256
2. Build FAISS HNSW index per partition
3. Search for similar records above threshold
4. Cluster duplicates with canonical ID

资料来源：[datatune/core/deduplication.py:1-80]()

## Agent Layer

The Agent layer provides high-level orchestration, automatically determining which operations to use and generating execution plans.

```mermaid
graph TD
    A[User Goal] --> B[Plan Generator]
    B --> C[JSON Plan]
    C --> D[Plan Executor]
    D --> E[Step 1: Operation]
    D --> F[Step 2: Operation]
    D --> G[Step N: Operation]
    E --> H[Finalize]
    F --> H
    G --> H
    H --> I[Computed Result]
```

### Plan Structure

Plans are generated as JSON arrays of steps:

```json
[
  {
    "type": "primitive",
    "operation": "map",
    "params": {
      "subprompt": "Extract category from industry",
      "input_fields": ["Industry"],
      "output_fields": ["Category"]
    }
  },
  {
    "type": "dask",
    "operation": "add_column",
    "params": {
      "new_column": "Year",
      "expression": "df['Date'].dt.year"
    }
  }
]
```

资料来源：[datatune/agent/agent.py:80-120]()

### Step Types

| Type | Description | Operations |
|------|-------------|------------|
| `primitive` | LLM-based transformations | `map`, `filter` |
| `dask` | Native Dask operations | `add_column`, `group_by`, `apply_function`, `rename_columns`, `astype_column` |

### Template System

Templates define how operations are translated to executable code:

```python
TEMPLATE = {
    "primitive": {
        "Map": "df = dt.Map(...)(self.llm, df)",
        "Filter": "df = dt.Filter(...)(self.llm, df)"
    },
    "dask": {
        "add_column": "df = df.assign({new_column}=lambda x: {expression})",
        "group_by_agg": "df = df.groupby({group_columns}).agg({aggregations})"
    }
}
```

资料来源：[datatune/agent/agent.py:40-70]()

## Data Backend Support

### Dask Integration

Datatune primarily uses Dask for distributed computing. Operations are executed partition-by-partition with lazy evaluation:

```python
# Example: Map operation on Dask DataFrame
df = dd.read_csv("data.csv")
mapped = dt.map(
    prompt="Extract sentiment from text",
    output_fields=["sentiment"],
    input_fields=["text"]
)(llm, df)

# Finalize and compute
result = dt.finalize(mapped).compute()
```

### Ibis Integration

Ibis provides support for multiple backends (DuckDB, PostgreSQL, BigQuery):

```python
# DuckDB example
import ibis
con = ibis.duckdb.connect("data.duckdb")
table = con.table("my_table")

# Apply map operation
mapped_table = dt.map(
    prompt="Extract category",
    output_fields=["category"],
    input_fields=["description"]
)(llm, table)
```

资料来源：[datatune/core/ibis/map_ibis.py:1-50]()

## Request Batching

The LLM layer implements intelligent batching to optimize API usage:

```mermaid
graph TD
    A[Input Rows] --> B[Calculate Token Budget]
    B --> C{Within Limits?}
    C -->|Yes| D[Batch All]
    C -->|No| E[Split into Batches]
    E --> F[Execute Batch 1]
    E --> G[Execute Batch 2]
    F --> H[Merge Results]
    G --> H
    D --> H
```

**Token Calculation**:

- Prefix/suffix tokens are calculated via `token_counter`
- `nrows_per_api_call` determines optimal batch sizes
- Rate limits (TPM/RPM) are enforced per model

资料来源：[datatune/llm/llm.py:30-70]()

## Error Handling

The system implements robust error handling at multiple levels:

| Layer | Error Handling |
|-------|----------------|
| **Agent** | Catches execution errors, returns step number and error message |
| **Core Operations** | Gracefully handles malformed LLM responses with fallback to empty dict |
| **LLM** | Retries with exponential backoff on transient failures |

```python
def parse_filter_output(output: Union[str, Exception], err: bool = True) -> Optional[bool]:
    try:
        d = ast.literal_eval(str(output).strip())
        if d:
            last_key = list(d.keys())[-1]
            flag = d[last_key]
            return flag
    except Exception:
        return None
```

资料来源：[datatune/core/dask/filter_dask.py:50-65]()

## Configuration Options

### LLM Configuration

| Option | Default | Description |
|--------|---------|-------------|
| `model_name` | `gpt-3.5-turbo` | Model identifier |
| `tpm` | Model-specific | Tokens per minute limit |
| `rpm` | Model-specific | Requests per minute limit |
| `api_key` | `None` | API authentication |
| `api_base` | Provider-specific | API endpoint URL |

### Operation Configuration

| Option | Description |
|--------|-------------|
| `prompt` | Natural language instruction |
| `input_fields` | Columns to use as input |
| `output_fields` | New columns to create (Map only) |
| `optimized` | Enable batching optimization |

## Summary

Datatune's system architecture provides:

1. **Provider Abstraction**: Unified interface across OpenAI, Ollama, VLLM, and Azure
2. **Rate Limiting**: Automatic enforcement of TPM/RPM limits per model
3. **Distributed Execution**: Partition-based processing for large datasets
4. **Multi-Backend Support**: Native support for Dask and Ibis backends
5. **Intelligent Planning**: Agent-based orchestration for complex multi-step transformations
6. **Semantic Operations**: LLM-powered Map and Filter for text understanding
7. **Deduplication**: Embedding-based similarity search with FAISS

---

<a id='page-map-operations'></a>

## Map Operations

### 相关页面

相关主题：[Filter Operations](#page-filter-operations), [Reduce Operations](#page-reduce-operations), [Dask Backend](#page-dask-backend)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [datatune/core/map.py](https://github.com/vitalops/datatune/blob/main/datatune/core/map.py)
- [datatune/core/dask/map_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)
- [datatune/core/ibis/map_ibis.py](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/map_ibis.py)
- [datatune/llm/llm.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/llm.py)
- [datatune/__init__.py](https://github.com/vitalops/datatune/blob/main/datatune/__init__.py)
- [datatune/agent/agent.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/agent.py)
</details>

# Map Operations

Map Operations in Datatune enable LLM-powered column transformations on distributed dataframes. Users describe desired transformations in natural language, and the system generates new columns by leveraging large language models to process each row semantically.

## Overview

The Map operation is a core primitive in Datatune that bridges natural language instructions with data transformations. It accepts a user-defined prompt describing what new columns to create, identifies relevant input columns, and produces one or more output columns populated by LLM inference. 资料来源：[datatune/__init__.py:3]()

```python
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)
```

资料来源：[README.md:18-29]()

## Architecture

The Map system is designed with a pluggable backend architecture, supporting both Dask and Ibis execution contexts. Each backend implements the same logical interface while adapting to its specific dataframe API.

### Component Flow

```mermaid
graph TD
    A[User Prompt + Input/Output Fields] --> B[map Function Entry Point]
    B --> C{Backend Type}
    C -->|Dask| D[map_dask.py]
    C -->|Ibis| E[map_ibis.py]
    D --> F[LLM Batch Processor]
    E --> F
    F --> G[Parse LLM Output]
    G --> H[Merge Results to DataFrame]
    H --> I[Output DataFrame with New Columns]
```

### Backend Implementations

| Backend | File | Purpose |
|---------|------|---------|
| Dask | `datatune/core/dask/map_dask.py` | Distributed DataFrame operations using Dask |
| Ibis | `datatune/core/ibis/map_ibis.py` | SQL-like backend supporting DuckDB, PostgreSQL, BigQuery |

## API Reference

### Function Signature

```python
def map(
    prompt: str,
    output_fields: List[str],
    input_fields: Optional[List[str]] = None,
    **kwargs
) -> Callable
```

### Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `prompt` | `str` | Yes | Natural language description of the transformation to apply |
| `output_fields` | `List[str]` | Yes | Names of columns to create in the output |
| `input_fields` | `List[str]` | Optional | Input columns to pass to the LLM; if omitted, all columns are used |

资料来源：[datatune/core/dask/map_dask.py:1-50]()

### Return Value

Returns a callable that accepts `(llm, dataframe)` and returns a transformed dataframe with the new columns appended.

## Dask Backend Implementation

### Data Flow

```mermaid
graph LR
    A[Partitioned DataFrame] --> B[Serialize Input Columns]
    B --> C[Batch Rows to LLM]
    C --> D[LLM Inference]
    D --> E[Parse Dictionary Output]
    E --> F[Assign to Output Columns]
    F --> G[Merge Canonical Results to Duplicates]
    G --> H[Return Partitioned DataFrame]
```

### Prompt Construction

The Dask implementation constructs prompts using a prefix-suffix pattern to ensure consistent LLM responses.

**Prefix Structure:**
```
Use the following context to create new columns from existing columns.
```

资料来源：[datatune/core/dask/map_dask.py:10-15]()

**Suffix Structure:**
```python
suffix = (
    f"{os.linesep}{os.linesep}"
    "Your response MUST be the entire input record as a valid Python dictionary in the format"
    "'index=<row_index>|{key1: value1, key2: value2, ...}'  with added keys of expected new fields if any."
     
    "ALWAYS START YOUR RESPONSE WITH 'index=<row_index>|' WHERE <row_index> IS THE INDEX OF THE ROW." \
    "IF A VALUE FOR A COLUMN DOES NOT EXIST SET IT TO null" \
    "'index=<row_index>|{key1: None, key2: value2, ...}'"
)
```

资料来源：[datatune/core/dask/map_dask.py:22-30]()

### Duplicate Handling

The Dask Map implementation includes sophisticated duplicate handling for semantic deduplication scenarios. When clusters of duplicate rows exist, only the canonical row is sent to the LLM, and results are propagated to duplicates.

```python
dup_to_canon = {
    dup: c["canonical_id"]
    for c in clusters
    for dup in c["duplicate_ids"]
}

canonical_idx = input_series.index.difference(dup_to_canon.keys())
canonical_input = input_series.loc[canonical_idx]

llm_out = llm(canonical_input, prefix, prompt, suffix, optimized=True)

df.loc[canonical_idx, llm_output_column] = llm_out

for dup, canon in dup_to_canon.items():
    df.loc[dup, llm_output_column] = df.loc[canon, llm_output_column]
```

资料来源：[datatune/core/dask/map_dask.py:35-50]()

### Output Parsing

The LLM output is parsed using `parse_llm_output`, which converts the string response to a Python dictionary.

```python
def parse_llm_output(llm_output: Union[str, Exception]) -> Union[Dict, Exception]:
    """
    Parses the LLM output string into a Python dictionary.
    """
```

资料来源：[datatune/core/dask/map_dask.py:75-85]()

## Ibis Backend Implementation

### Data Flow

```mermaid
graph LR
    A[Ibis Table] --> B[Add Row ID Column]
    B --> C[Execute to Local Data]
    C --> D[Convert to List]
    D --> E[Batch to LLM]
    E --> F[Parse Results]
    F --> G[Create MemTable]
    G --> H[Join Back to Original Table]
```

### Row Indexing

Ibis operations require explicit row indexing since SQL tables don't maintain native pandas-like indices.

```python
indexed_table = table.mutate(_ROW_ID_=ibis.row_number().cast("int64"))

local_data = indexed_table.select("_ROW_ID_", input_col).execute()
    
input_list = local_data[input_col].tolist()
```

资料来源：[datatune/core/ibis/map_ibis.py:25-32]()

### Result Processing

Raw LLM outputs are parsed and converted to JSON strings for storage:

```python
processed_results = []
for res in raw_results:
    try:
        py_dict = ast.literal_eval(str(res).strip())
        processed_results.append(json.dumps(py_dict))
    except Exception:
        processed_results.append("{}")
```

资料来源：[datatune/core/ibis/map_ibis.py:45-52]()

## LLM Integration

### Supported LLM Providers

| Provider | Class | Default Model |
|----------|-------|---------------|
| OpenAI | `OpenAI` | `gpt-3.5-turbo` |
| Ollama | `Ollama` | `gemma3:4b` |
| Azure | `Azure` | Configurable |
| vLLM | `VLLM` | Configurable |

资料来源：[datatune/llm/llm.py:1-60]()

### Batching and Rate Limiting

The LLM layer implements token and request rate limiting based on model-specific constraints defined in `model_rate_limits.py`.

```python
model_rate_limits = {
    "gpt-3.5-turbo": {
        "tpm": 200_000,
        "rpm": 500,
    },
    "gpt-4": {
        "tpm": 10_000,
        "rpm": 500,
    },
    "gpt-4o": {
        "tpm": 30_000,
        "rpm": 500,
    },
}
```

资料来源：[datatune/llm/model_rate_limits.py:1-20]()

### Batch Processing

The LLM batches multiple rows into single API calls for efficiency:

```python
for i, prompt in enumerate(input_rows):
    q  # Batch construction continues...
```

资料来源：[datatune/llm/llm.py:30-45]()

## Agent Integration

The Datatune Agent can automatically generate Map operations as part of multi-step data transformation pipelines.

### Agent System Prompt

```python
class Agent(ABC):
    system_prompt: str = """You are Datatune Agent, a powerful assistant designed to help users with data processing tasks.
    You are capable of generating python code to perform various operations on data. Apart from python builtins, you have the following libraries avaiable in your run time:
    - pandas
    - numpy
    - dask

    In addition to these, you also have access to the datatune libarary, which provides functionality for processing data using LLMs.
    Map Example:
    ```python
    import datatune as dt
    import dask.dataframe as dd
    df = dd.read_csv("path/to/data.csv")
    map = dt.Map(prompt="Your prompt here")
    llm = dt.LLM(model_name="gpt-3.5-turbo")
    mapped_df = map(llm, df)
    mapped_df = dt.finalize(mapped_df)
    ```
    """
```

资料来源：[datatune/agent/__init__.py:1-30]()

### Plan Generation

When the Agent generates a plan requiring Map operations, it creates steps with the following structure:

```python
{
    "type": "primitive",
    "operation": "map",
    "params": {
        "subprompt": "Extract category and sub-category from industry",
        "input_fields": ["Industry"],
        "output_fields": ["Category","Sub-Category"]
    },
}
```

资料来源：[datatune/agent/agent.py:50-65]()

## Usage Examples

### Basic Column Extraction

```python
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-4")
df = dd.read_csv("customers.csv")

# Extract sentiment and category from review text
mapped = dt.map(
    prompt="Analyze the customer feedback and extract sentiment (positive/negative/neutral) and main topic",
    output_fields=["Sentiment", "Topic"],
    input_fields=["CustomerFeedback"]
)(llm, df)

result = dt.finalize(mapped)
result.compute().to_csv("enriched_customers.csv")
```

### Multi-Column Generation

```python
# Generate multiple derived columns in a single Map operation
mapped = dt.map(
    prompt="Parse the product description to identify: 1) the material, 2) primary color, 3) target demographic",
    output_fields=["Material", "Color", "Demographic"],
    input_fields=["ProductDescription", "ProductName"]
)(llm, df)
```

### With Agent

```python
import datatune as dt
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-4")
agent = dt.Agent(llm)

df = dd.read_csv("data.csv")
# Agent automatically determines when to use Map operations
result = agent.do("Add ProfitMargin column and categorize by revenue tier", df)
```

资料来源：[README.md:35-45]()

## Best Practices

### Input Field Selection

| Practice | Rationale |
|----------|-----------|
| Provide specific input fields | Reduces token usage and improves accuracy |
| Include context-rich columns | Helps LLM understand the domain |
| Avoid overly large inputs | Prevents token limit issues |

### Prompt Engineering

- Be explicit about the expected output format in output field names
- Include examples in prompts for complex transformations
- Break complex extractions into multiple Map operations when needed

### Performance Considerations

| Factor | Impact | Recommendation |
|--------|--------|----------------|
| Batch size | Throughput vs. latency | Use default batching for most cases |
| Model selection | Quality vs. speed | Use `gpt-3.5-turbo` for bulk, `gpt-4` for complex tasks |
| Partition count | Parallelism | Match to cluster size for Dask operations |

## See Also

- [Filter Operations](Filter.md) - Row-level filtering using natural language
- [Agent](Agent.md) - Automatic operation selection and chaining
- [Deduplication](Deduplication.md) - Semantic deduplication with Map integration
- [Supported LLMs](LLMs.md) - Complete list of supported providers

---

<a id='page-filter-operations'></a>

## Filter Operations

### 相关页面

相关主题：[Map Operations](#page-map-operations), [Reduce Operations](#page-reduce-operations), [Ibis Backend](#page-ibis-backend)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [datatune/core/filter.py](https://github.com/vitalops/datatune/blob/main/datatune/core/filter.py)
- [datatune/core/dask/filter_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/filter_dask.py)
- [datatune/core/ibis/filter_ibis.py](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/filter_ibis.py)
- [datatune/__init__.py](https://github.com/vitalops/datatune/blob/main/datatune/__init__.py)
- [datatune/llm/llm.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/llm.py)
</details>

# Filter Operations

Filter Operations in Datatune enable intelligent, LLM-powered row filtering based on natural language criteria. Instead of writing explicit boolean conditions, users describe what rows to keep in plain English, and the LLM determines which records match the specified criteria.

## Overview

The Filter module provides a declarative interface for semantic row-level filtering of data. It supports multiple backends (Dask DataFrames and Ibis tables) and uses LLMs to make filtering decisions based on natural language prompts.

Key capabilities:
- Natural language filter criteria specification
- Multi-backend support (Dask, Ibis)
- Semantic deduplication integration
- Parallelized LLM inference
- Structured output parsing

## Architecture

The Filter system follows a dispatcher pattern that routes to backend-specific implementations based on the input data type.

```mermaid
graph TD
    A[User calls filter prompt, input_fields] --> B[datatune.filter function]
    B --> C{Data Type Detection}
    C -->|dask.dataframe.DataFrame| D[_filter_dask]
    C -->|ibis.Table| E[_filter_ibis]
    D --> F[LLM Batch Processing]
    E --> F
    F --> G[Parse Filter Output]
    G --> H[Apply Boolean Mask]
    H --> I[Filtered DataFrame/Table]
```

### Component Overview

| Component | File | Responsibility |
|-----------|------|----------------|
| `filter()` | `datatune/core/filter.py` | Main entry point, type dispatch |
| `_filter_dask()` | `datatune/core/dask/filter_dask.py` | Dask DataFrame filtering |
| `_filter_ibis()` | `datatune/core/ibis/filter_ibis.py` | Ibis table filtering |
| LLM Integration | `datatune/llm/llm.py` | Batch inference and token management |

## API Reference

### `dt.filter()`

Main filter function exported from `datatune`.

```python
def filter(*, prompt, input_fields=None, clusters=None):
    def apply(llm, data):
        # Implementation dispatch
        ...
    return apply
```

#### Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `prompt` | `str` | Yes | Natural language description of filter criteria |
| `input_fields` | `List[str]` | No | Columns to use for filtering decisions |
| `clusters` | `List[Dict]` | No | Semantic deduplication clusters for propagating filter decisions |

#### Returns

Returns a callable that accepts `(llm, data)` and returns a filtered DataFrame or Table.

## Dask Implementation

The Dask backend processes rows in parallel across partitions, leveraging the Dask execution graph for distributed computation.

### Source Files

- Main implementation: `datatune/core/dask/filter_dask.py`
- Dispatcher: `datatune/core/filter.py`

### Workflow

```mermaid
graph LR
    A[Input DataFrame] --> B[Serialize Input Columns]
    B --> C[Handle Clusters]
    C --> D[Extract Canonical Indices]
    D --> E[LLM Batch Inference]
    E --> F[Parse Boolean Output]
    F --> G[Propagate to Duplicates]
    G --> H[Apply Boolean Mask]
```

### Key Functions

#### `_filter_dask()`

Core filtering function for Dask DataFrames:

```python
def _filter_dask(
    prompt: str,
    input_fields: Optional[List[str]] = None,
    clusters: Optional[List[Dict]] = None,
):
    # Validates inputs, constructs prompts, invokes LLM
```

#### `parse_filter_output()`

Parses LLM output string into boolean values:

```python
def parse_filter_output(
    output: Union[str, Exception], 
    err: bool = True
) -> Optional[bool]:
```

## Ibis Implementation

The Ibis backend executes filtering operations at the database level, enabling pushdown to various SQL backends (DuckDB, PostgreSQL, BigQuery, etc.).

### Source Files

- Implementation: `datatune/core/ibis/filter_ibis.py`

### Workflow

```mermaid
graph TD
    A[Input Table] --> B[Add Row ID Column]
    B --> C[Select Input Columns]
    C --> D[Execute to Local]
    D --> E[LLM Batch Inference]
    E --> F[Parse Filter Results]
    F --> G[Build Boolean Filter]
    G --> H[Filter Original Table]
    H --> I[Remove Row ID Column]
```

### Key Functions

#### `_filter_ibis()`

Core filtering function for Ibis tables:

```python
def _filter_ibis(
    table: ibis.Table,
    prompt: str,
    input_fields: Optional[List[str]] = None,
):
    indexed_table = table.mutate(_ROW_ID_=ibis.row_number().cast("int64"))
```

Uses `ibis.row_number()` to assign sequential row IDs that match LLM response indexing. 资料来源：[datatune/core/ibis/filter_ibis.py:28]()

## LLM Prompt Format

The Filter module constructs specialized prompts to guide LLM responses for filtering decisions.

### Prompt Structure

```
FILTERING CRITERIA:
<prompt>

[input record dictionary]

DECISION: Your response MUST be the entire input record as Python dictionary in the format:
index=<row_index>|{key1: value1, ...}<endofrow> with added key called '__filter__' 
with value either True to KEEP the record or False to REMOVE it.
```

### Output Format

The LLM must return records with an added `__filter__` key:

```python
index=0|{'Name': 'Acme Corp', 'Country': 'USA', '__filter__': True}<endofrow>
index=1|{'Name': 'Bad Inc', 'Country': 'UK', '__filter__': False}<endofrow>
```

#### Response Requirements

| Requirement | Description |
|-------------|-------------|
| Index prefix | Each response starts with `index=<row_index>\|` |
| Complete record | Include all original keys plus `__filter__` |
| Boolean value | `__filter__` must be `True` (keep) or `False` (remove) |
| Missing values | Set to `None` if column value does not exist |

资料来源：[datatune/core/dask/filter_dask.py:14-19]()

## Semantic Deduplication Integration

When `clusters` parameter is provided, filter decisions propagate from canonical records to their duplicates.

### Cluster Structure

```python
clusters = [
    {
        "canonical_id": 5,
        "duplicate_ids": [12, 45, 78]
    },
    ...
]
```

### Propagation Logic

```mermaid
graph TD
    A[Clusters Input] --> B[Build dup_to_canon Map]
    B --> C[Extract Canonical Indices]
    C --> D[Filter Only Canonical Rows]
    D --> E[LLM Inference on Canonical]
    E --> F[Propagate Decisions to Duplicates]
    F --> G[Merge Results]
```

The mapping builds:
```python
dup_to_canon = {
    dup: c["canonical_id"]
    for c in clusters
    for dup in c["duplicate_ids"]
}
```

资料来源：[datatune/core/dask/filter_dask.py:41-46]()

## Usage Examples

### Basic Dask Filtering

```python
import datatune as dt
import dask.dataframe as dd
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

# Keep only electronics products
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name", "Description"]
)(llm, df)

result = dt.finalize(filtered)
```

### Ibis Table Filtering

```python
import datatune as dt
import ibis

con = ibis.duckdb.connect("data.duckdb")
table = con.table("customers")

# Filter using natural language
filtered = dt.filter(
    prompt="Keep only customers from African countries",
    input_fields=["Country", "Region"]
)(llm, table)
```

### With Semantic Deduplication

```python
# First deduplicate to find clusters
deduplicator = dt.SemanticDeduplicator(llm=llm)
clusters = deduplicator.find_duplicates(df, columns=["Name", "Address"])

# Filter with cluster awareness
filtered = dt.filter(
    prompt="Keep only verified organizations",
    input_fields=["Status", "VerificationDate"],
    clusters=clusters
)(llm, df)
```

## Error Handling

The `parse_filter_output()` function handles various error cases:

| Error Type | Handling | Return Value |
|------------|----------|--------------|
| Parse failure | Returns `None` if `err=False` | `None` |
| Exception | Returns exception if `err=True` | Original exception |
| Empty output | Returns `None` | `None` |
| Malformed dict | Falls back to `None` | `None` |

资料来源：[datatune/core/dask/filter_dask.py:64-79]()

## Rate Limiting

Filter operations respect LLM rate limits defined in `datatune/llm/model_rate_limits.py`:

| Model | TPM (tokens/min) | RPM (requests/min) |
|-------|------------------|---------------------|
| gpt-3.5-turbo | 200,000 | 500 |
| gpt-4 | 10,000 | 500 |
| gpt-4-turbo | 30,000 | 500 |

The batching system automatically respects these limits during batch LLM inference.

## Performance Considerations

### Batching Strategy

Rows are batched for LLM inference to optimize throughput:

```python
# From datatune/llm/llm.py
for i, prompt in enumerate(input_rows):
    # Batch prompts based on token limits
    q  # Accumulate until batch size reached
```

### Partition Parallelism (Dask)

Dask DataFrames process partitions in parallel:

```mermaid
graph LR
    A[Partition 0] --> E[LLM Batch]
    B[Partition 1] --> F[LLM Batch]
    C[Partition 2] --> G[LLM Batch]
    E --> H[Results Merged]
    F --> H
    G --> H
```

### Backend Selection

| Backend | Best For | Limitations |
|---------|----------|-------------|
| Dask | Large datasets, local processing | Memory-bound |
| Ibis | Database pushdown, SQL backends | Requires database connection |

## See Also

- [Map Operations](./Map.md) - Creating new columns via LLM
- [Semantic Deduplication](./SemanticDeduplication.md) - Finding duplicate records
- [Agent](./Agent.md) - Automatic operation planning
- [LLM Configuration](../llm/llm.md) - LLM setup and backends

---

<a id='page-reduce-operations'></a>

## Reduce Operations

### 相关页面

相关主题：[Map Operations](#page-map-operations), [Filter Operations](#page-filter-operations)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [datatune/core/reduce.py](https://github.com/vitalops/datatune/blob/main/datatune/core/reduce.py)
- [datatune/__init__.py](https://github.com/vitalops/datatune/blob/main/datatune/__init__.py)
- [datatune/core/dask/filter_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/filter_dask.py)
- [datatune/core/dask/map_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)
- [datatune/core/deduplication.py](https://github.com/vitalops/datatune/blob/main/datatune/core/deduplication.py)
</details>

# Reduce Operations

## Overview

Reduce Operations in datatune provide a mechanism for aggregating and condensing data using configurable action handlers. The reduce system follows a plugin-based architecture where specific aggregation or reduction behaviors are registered as actions and invoked through a unified interface.

The core purpose of reduce operations is to transform a DataFrame into aggregated results by applying registered reduction actions. This enables users to perform complex aggregation tasks that go beyond standard group-by operations, leveraging LLM-powered semantic understanding when needed.

资料来源：[datatune/core/reduce.py:1-22]()

## Architecture

### Action Registry Pattern

The reduce system implements a decorator-based registry pattern for action registration:

```mermaid
graph TD
    A[User calls reduce df action] --> B[reduce function invoked]
    B --> C[get_action looks up registry]
    C --> D{Action found?}
    D -->|Yes| E[Instantiate action class]
    D -->|No| F[Raise ValueError]
    E --> G[Execute reduction on DataFrame]
    G --> H[Return aggregated result]
```

The `_ACTIONS` dictionary serves as a central registry mapping action names to their implementation classes. The `register_action` decorator allows new actions to be dynamically registered without modifying core logic.

资料来源：[datatune/core/reduce.py:1-22]()

### Class Diagram

```mermaid
classDiagram
    class _ACTIONS {
        Dict[str, Type]
    }
    
    class register_action {
        +decorator(name: str)
    }
    
    class get_action {
        +function(name: str) Type
    }
    
    class reduce {
        +function(df, action, **kwargs)
    }
    
    register_action ..> _ACTIONS : registers
    get_action ..> _ACTIONS : queries
    reduce ..> get_action : uses
```

## API Reference

### Main Function

```python
def reduce(df, *, action: str, **kwargs):
    cls = get_action(action)
    reducer = cls(**kwargs)   
    return reducer(df)
```

#### Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `df` | `dask.dataframe.DataFrame` | Yes | The input Dask DataFrame to reduce |
| `action` | `str` | Yes | The name of the registered reduction action to execute |
| `**kwargs` | Various | No | Additional keyword arguments passed to the action constructor |

#### Returns

| Return Type | Description |
|-------------|-------------|
| Any | Returns the result of the specific reduction action (varies by registered action) |

#### Exceptions

| Exception | Condition |
|-----------|------------|
| `ValueError` | When the specified action name is not registered |

资料来源：[datatune/core/reduce.py:19-22]()

### Register Action Decorator

```python
def register_action(name):
    def decorator(cls):
        _ACTIONS[name] = cls
        return cls
    return decorator
```

#### Usage Pattern

```python
@register_action("my_reducer")
class MyReducer:
    def __init__(self, **kwargs):
        # Initialize reducer configuration
        pass
    
    def __call__(self, df):
        # Perform reduction
        return result
```

### Get Action Function

```python
def get_action(name):
    try:
        return _ACTIONS[name]
    except KeyError:
        raise ValueError(f"Unknown action: {name}")
```

## Workflow Execution

### Execution Flow

```mermaid
graph TD
    A[Start] --> B[Import reduce module]
    B --> C[Call reduce df action]
    C --> D[get_action retrieves class]
    D --> E[Instantiate with kwargs]
    E --> F[Call reducer on DataFrame]
    F --> G[Return result]
    G --> H[End]
```

### Integration with Datatune Core

The reduce operation integrates with other datatune components through the public API:

```python
import datatune as dt

# reduce is exported from datatune namespace
result = dt.reduce(df, action="specific_action", param=value)
```

资料来源：[datatune/__init__.py:1-7]()

## Registered Actions

Based on the repository structure, reduce operations support extensibility through custom registered actions. The system is designed to work with Dask DataFrames and can integrate with:

- **SemanticDeduplicator**: For deduplication using embedding-based similarity matching
- **Custom aggregations**: User-defined reduction logic via the decorator pattern

### SemanticDeduplicator Integration

While primarily a deduplication tool, the `SemanticDeduplicator` class in `datatune/core/deduplication.py` demonstrates how reduce-style operations work:

```python
class SemanticDeduplicator:
    def __init__(
        self,
        llm,
        embedding_model: str = "text-embedding-3-small",
        sim_threshold: float = 0.90,
        top_k: int = 50,
        hnsw_m: int = 32,
        ef_search: int = 64,
        return_df: bool = False,
    ):
        # Configuration initialization
```

The deduplicator processes data through embedding generation, FAISS index building, and similarity search phases, demonstrating how reduce operations handle complex multi-stage transformations.

资料来源：[datatune/core/deduplication.py:1-150]()

## Extension Points

### Creating Custom Reduce Actions

To create a custom reduction action:

1. Define a class with appropriate `__init__` and `__call__` methods
2. Decorate with `@register_action("action_name")`
3. The class should accept `**kwargs` for flexible configuration
4. Implement `__call__(self, df)` to receive the DataFrame and return the result

```python
from datatune.core.reduce import register_action

@register_action("custom_aggregate")
class CustomAggregateReducer:
    def __init__(self, group_by_column, agg_column, operation="sum", **kwargs):
        self.group_by_column = group_by_column
        self.agg_column = agg_column
        self.operation = operation
    
    def __call__(self, df):
        # Custom reduction logic
        return result_df
```

### Best Practices

| Practice | Rationale |
|----------|-----------|
| Use descriptive action names | Ensures clear identification in `get_action` calls |
| Accept `**kwargs` in `__init__` | Maintains compatibility with the reduce function signature |
| Return consistent types | Enables predictable downstream processing |
| Handle empty DataFrames | Prevents errors during edge case processing |

## Related Components

| Component | File | Relationship |
|-----------|------|--------------|
| Filter | `datatune/core/dask/filter_dask.py` | Similar pattern for row-level operations |
| Map | `datatune/core/dask/map_dask.py` | Column transformation counterpart |
| Finalize | `datatune/core/dask/op.py` | Completes lazy Dask computations |
| Agent | `datatune/agent/agent.py` | Orchestrates multi-step transformations |

资料来源：[datatune/core/dask/filter_dask.py:1-50]()
资料来源：[datatune/core/dask/map_dask.py:1-50]()

## Summary

Reduce Operations provide a flexible, extensible mechanism for aggregating and transforming data in datatune. The action registry pattern enables:

- **Dynamic action registration** via the `@register_action` decorator
- **Centralized action lookup** through the `get_action` function
- **Unified API** through the `reduce` function interface
- **Custom extensibility** for domain-specific reduction logic

The architecture follows proven design patterns that separate action registration from execution, allowing the core system to remain stable while supporting diverse reduction behaviors through plugin-style extensions.

---

<a id='page-dask-backend'></a>

## Dask Backend

### 相关页面

相关主题：[Ibis Backend](#page-ibis-backend), [System Architecture](#page-architecture)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [datatune/core/dask/filter_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/filter_dask.py)
- [datatune/core/dask/map_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)
- [datatune/core/dask/op.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/op.py)
- [datatune/core/dask/constants.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/constants.py)
- [datatune/core/map.py](https://github.com/vitalops/datatune/blob/main/datatune/core/map.py)
- [datatune/core/filter.py](https://github.com/vitalops/datatune/blob/main/datatune/core/filter.py)
</details>

# Dask Backend

The Dask Backend is the primary data processing engine in datatune, enabling large-scale, distributed data transformations using Large Language Models (LLMs). It extends the popular Dask library with semantic understanding capabilities, allowing users to perform complex data operations through natural language prompts while maintaining Dask's efficient parallel and out-of-core computation model.

## Architecture Overview

The Dask Backend follows a modular architecture with three main layers:

1. **API Layer** (`datatune/core/map.py`, `datatune/core/filter.py`) - Entry points that dispatch to appropriate backends
2. **Operation Layer** (`datatune/core/dask/`) - Core transformations for Dask DataFrames
3. **LLM Integration Layer** - Handles communication with language models for semantic operations

```mermaid
graph TD
    A[User Data] --> B[Dask DataFrame]
    B --> C{Operation Type}
    C -->|Map| D[map_dask.py]
    C -->|Filter| E[filter_dask.py]
    D --> F[LLM API]
    E --> F
    F --> G[Parse & Validate]
    G --> H[New Columns Added]
    H --> I[Lazy Dask DataFrame]
    I --> J[finalize]
    J --> K[Computed Result]
    
    L[Clusters] -.->|Optional| D
    L -.->|Optional| E
```

## Core Operations

### Map Operation

The map operation uses an LLM to generate new columns from existing data through natural language understanding. It is designed for semantic extraction, classification, and content transformation tasks.

**Source**: `datatune/core/dask/map_dask.py`

#### How Map Works

```mermaid
graph LR
    A[Input DataFrame] --> B[Serialize Rows]
    B --> C[Cluster Deduplication]
    C --> D[Batch for LLM]
    D --> E[LLM Processing]
    E --> F[Parse Output]
    F --> G[Expand to Columns]
    G --> H[Copy to Duplicates]
    H --> I[Output DataFrame]
```

#### Key Components

| Component | Description |
|-----------|-------------|
| `serialized_input_column` | Temporary column storing row data as Python dictionaries |
| `llm_output_column` | Temporary column storing raw LLM responses |
| `clusters` | Optional list of duplicate clusters for deduplication |

#### Processing Flow

1. Each row is serialized to a dictionary format
2. If clusters are provided, duplicate handling is applied - only canonical rows are sent to LLM
3. LLM receives batched prompts with mapping instructions
4. Responses are parsed and new columns are extracted
5. Results are propagated back to duplicate rows

**Key code snippet from map_dask.py**:
```python
df.loc[canonical_idx, llm_output_column] = llm_out

for dup, canon in dup_to_canon.items():
    df.loc[dup, llm_output_column] = df.loc[canon, llm_output_column]
```

This ensures duplicate handling by copying canonical results to all duplicate rows.

资料来源：[datatune/core/dask/map_dask.py:34-48]()

### Filter Operation

The filter operation uses an LLM to determine which rows to keep or remove based on natural language criteria.

**Source**: `datatune/core/dask/filter_dask.py`

#### Filter Output Format

The LLM returns decisions in a specific format with an added `__filter__` key:

```python
index=<row_index>|{key1: value1, ..., '__filter__': True/False}<endofrow>
```

| Decision | Behavior |
|----------|----------|
| `True` | Keep the row |
| `False` | Remove the row |

#### Processing Pipeline

```mermaid
graph TD
    A[Input DataFrame] --> B[Serialize to Dictionaries]
    B --> C[Apply Cluster Deduplication]
    C --> D[Send to LLM with Filter Prompt]
    D --> E[Parse __filter__ Value]
    E --> F[Filter DataFrame]
    F --> G[Output DataFrame]
```

**Key code snippet for filter decision parsing**:
```python
suffix = (
    f"{os.linesep}{os.linesep}"
    "DECISION:Your response MUST be the entire input record as Python dictionary..."
    "ALWAYS STICK TO THE FORMAT index=<row_index>|{...} with added key called '__filter__'..."
)
```

资料来源：[datatune/core/dask/filter_dask.py:24-35]()

### Finalize Operation

The `finalize` operation computes the lazy Dask DataFrame and converts it to a pandas DataFrame.

**Source**: `datatune/core/dask/op.py`

```python
def finalize(df: dd.DataFrame) -> pd.DataFrame:
    """Compute the lazy Dask DataFrame and return a pandas DataFrame."""
    return df.compute()
```

资料来源：[datatune/core/dask/op.py]()

## Semantic Deduplication

Both map and filter operations support semantic deduplication to reduce LLM API calls and costs.

**Source**: `datatune/core/deduplication.py`

### How Deduplication Works

```mermaid
graph TD
    A[Input Series] --> B[Find Duplicate Clusters]
    B --> C{dup_to_canon mapping}
    C --> D[Extract Canonical Index]
    D --> E[Send Only Canonical to LLM]
    E --> F[Copy Results to Duplicates]
    
    style C fill:#f9f,color:#000
```

### Cluster Format

```python
clusters = [
    {"canonical_id": 5, "duplicate_ids": [12, 23, 45]},
    {"canonical_id": 10, "duplicate_ids": [15, 20]}
]
```

This approach sends only unique/canonical rows to the LLM, then propagates results to all duplicates.

资料来源：[datatune/core/dask/map_dask.py:34-40]()

## API Reference

### Map Function

```python
from datatune.core.map import map

result = map(
    prompt="Extract categories from the description",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"],
    clusters=None  # Optional deduplication clusters
)(llm, dask_dataframe)
```

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `prompt` | str | Yes | Natural language instructions for column generation |
| `output_fields` | List[str] | Yes | Names of new columns to create |
| `input_fields` | List[str] | No | Columns to use as input (defaults to all) |
| `clusters` | List[Dict] | No | Duplicate clusters for deduplication |

### Filter Function

```python
from datatune.core.filter import filter

result = filter(
    prompt="Keep only electronics products",
    input_fields=["Name", "Description"],
    clusters=None  # Optional deduplication clusters
)(llm, dask_dataframe)
```

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `prompt` | str | Yes | Natural language criteria for filtering |
| `input_fields` | List[str] | No | Columns to evaluate against |
| `clusters` | List[Dict] | No | Duplicate clusters for deduplication |

### Finalize Function

```python
from datatune.core.dask.op import finalize

pandas_df = finalize(lazy_dask_dataframe)
```

资料来源：[datatune/core/dask/op.py]()

## LLM Output Format

The Dask Backend requires LLM responses in a specific parseable format for reliable operation.

### Map Output Format

```
index=<row_index>|{key1: value1, key2: value2, ..., 'new_field1': val, 'new_field2': val}<endofrow>
```

### Filter Output Format

```
index=<row_index>|{key1: value1, key2: value2, ..., '__filter__': True/False}<endofrow>
```

### Parsing Rules

| Rule | Description |
|------|-------------|
| String values | Must be enclosed in double quotes |
| Missing values | Set to `None` or `null` |
| Index | Must match row index exactly |
| Format | Must be valid Python literal (dictionary) |

资料来源：[datatune/core/dask/map_dask.py:20-30]()

## Usage Examples

### Example 1: Category Extraction

```python
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

# Extract categories using natural language
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

# Save results
result = dt.finalize(mapped)
result.compute().to_csv("categorized_products.csv")
```

资料来源：[README.md]()

### Example 2: Text-Based Filtering

```python
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("companies.csv")

# Filter based on semantic criteria
filtered = dt.filter(
    prompt="Keep only organizations in Africa",
    input_fields=["Country", "Description"]
)(llm, df)

result = dt.finalize(filtered)
```

### Example 3: With Deduplication

```python
# Assuming clusters from semantic deduplication step
clusters = [
    {"canonical_id": 5, "duplicate_ids": [12, 23, 45]},
    {"canonical_id": 10, "duplicate_ids": [15, 20]}
]

mapped = dt.map(
    prompt="Classify the industry sector",
    output_fields=["Industry", "Sector"],
    input_fields=["CompanyName", "Description"],
    clusters=clusters
)(llm, df)
```

## Internal Data Flow

### Serialization Process

The backend converts DataFrame rows to dictionary format for LLM processing:

```mermaid
graph LR
    A[DataFrame Row] --> B[Convert to Dict]
    B --> C[Replace NaN with None]
    C --> D["String: {col1: val1, col2: val2}"]
    D --> E[LLM Prompt]
```

**Implementation from map_dask.py**:
```python
df = df.astype(object).where(df.notna(), None)
df[serialized_input_column] = [
    str(row.to_dict()) for _, row in df.iterrows()
]
```

资料来源：[datatune/core/dask/map_dask.py:50-56]()

## Error Handling

The backend includes robust error handling for LLM parsing failures:

```python
def parse_llm_output(llm_output: Union[str, Exception]) -> Union[Dict, Exception]:
    """
    Parses the LLM output string into a Python dictionary.
    """
    # Returns dict on success, Exception on failure
```

Failed parsing attempts return empty dictionaries `{}`, ensuring the pipeline continues rather than crashing.

## Backend Dispatch Mechanism

The system automatically dispatches to the Dask backend when a Dask DataFrame is detected:

```mermaid
graph TD
    A[Input Data] --> B{Is Dask DataFrame?}
    B -->|Yes| C[Dask Backend]
    B -->|No| D{Is Ibis Table?}
    D -->|Yes| E[Ibis Backend]
    D -->|No| F[TypeError]
    
    C --> G[_map_dask / _filter_dask]
    E --> H[_map_ibis / _filter_ibis]
```

**Dispatch logic from core/map.py**:
```python
def _is_dask_df(obj):
    try:
        import dask.dataframe as dd
        return isinstance(obj, dd.DataFrame)
    except ImportError:
        return False
```

资料来源：[datatune/core/map.py:1-17]()

## Configuration Considerations

### Memory Efficiency

The Dask backend processes data partition by partition, making it suitable for datasets larger than memory. Consider:

| Setting | Recommendation |
|---------|----------------|
| Dask partitions | Set based on available memory |
| Batch size | Adjust based on LLM rate limits |
| Clusters | Use for high-duplicate datasets |

### LLM Rate Limits

The backend implements batching to respect LLM API rate limits while maximizing throughput.

## Summary

The Dask Backend provides a powerful, scalable mechanism for performing LLM-powered data transformations on Dask DataFrames. Key capabilities include:

- **Map**: Generate new columns through semantic understanding
- **Filter**: Remove rows based on natural language criteria  
- **Deduplication**: Reduce costs by processing only canonical rows
- **Lazy Evaluation**: Maintain Dask's efficient computation model until finalization

All operations follow a consistent pattern: serialize data, send to LLM with clear instructions, parse responses, and update the DataFrame.

---

<a id='page-ibis-backend'></a>

## Ibis Backend

### 相关页面

相关主题：[Dask Backend](#page-dask-backend), [System Architecture](#page-architecture)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [datatune/core/ibis/filter_ibis.py](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/filter_ibis.py)
- [datatune/core/ibis/map_ibis.py](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/map_ibis.py)
</details>

# Ibis Backend

## Overview

The Ibis Backend in datatune provides integration with the Ibis project, enabling LLM-powered data transformations on various database backends including DuckDB, PostgreSQL, BigQuery, and more. This backend extends datatune's core functionality (map, filter) to work with Ibis tables, leveraging the composability and SQL-generation capabilities of Ibis while incorporating LLM-based semantic transformations.

The Ibis Backend serves as a critical abstraction layer that:

- Accepts Ibis table objects as input
- Serializes table data for LLM consumption
- Executes batch inference operations via LLM calls
- Parses and applies LLM outputs back to the table structure
- Returns transformed Ibis tables for continued processing

资料来源：[datatune/core/ibis/filter_ibis.py:1-20](datatune/core/ibis/filter_ibis.py)
资料来源：[datatune/core/ibis/map_ibis.py:1-25](datatune/core/ibis/map_ibis.py)

## Architecture

### System Components

The Ibis Backend consists of several interconnected components that work together to provide seamless LLM-powered transformations:

```mermaid
graph TD
    A[Ibis Table Input] --> B[add_serialized_col]
    B --> C[llm_batch_inference]
    C --> D[LLM API Call]
    D --> E[Parse LLM Output]
    E --> F[apply_llm_updates / Filter Logic]
    F --> G[Transformed Ibis Table]
    
    H[_map_ibis Class] --> I[Map Operations]
    J[_filter_ibis Class] --> K[Filter Operations]
```

### Component Responsibilities

| Component | File | Purpose |
|-----------|------|---------|
| `add_serialized_col` | filter_ibis.py, map_ibis.py | Serializes Ibis table columns into JSON strings for LLM input |
| `llm_batch_inference` | filter_ibis.py, map_ibis.py | Manages batch inference with LLM, handles retries and error cases |
| `_map_ibis` | map_ibis.py | Handles mapping operations (creating new columns via LLM) |
| `_filter_ibis` | filter_ibis.py | Handles filtering operations (row selection via LLM) |

资料来源：[datatune/core/ibis/filter_ibis.py:11-32](datatune/core/ibis/filter_ibis.py)
资料来源：[datatune/core/ibis/map_ibis.py:12-38](datatune/core/ibis/map_ibis.py)

## Core Functions

### add_serialized_col

The `add_serialized_col` function transforms Ibis table columns into a serialized JSON format suitable for LLM processing. This function is fundamental to both map and filter operations.

**Function Signature:**

```python
def add_serialized_col(table, target_col: str, input_fields: Optional[List[str]] = []):
```

**Parameters:**

| Parameter | Type | Description |
|-----------|------|-------------|
| `table` | `ibis.Table` | The input Ibis table |
| `target_col` | `str` | Name of the column to store serialized data |
| `input_fields` | `Optional[List[str]]` | List of column names to include in serialization. If empty, includes all columns except target |

**Behavior:**

1. Filters columns to include only relevant fields (excluding target column and non-selected fields)
2. Constructs a JSON string expression by concatenating column names and values
3. Returns a new Ibis table with the serialized column added via `mutate()`

资料来源：[datatune/core/ibis/map_ibis.py:40-53](datatune/core/ibis/map_ibis.py)
资料来源：[datatune/core/ibis/filter_ibis.py:37-51](datatune/core/ibis/filter_ibis.py)

### llm_batch_inference

The `llm_batch_inference` function manages the core LLM interaction for transforming data. It handles the complete workflow from prompt construction to result collection.

**Function Signature:**

```python
def llm_batch_inference(
    table,
    llm: Callable, 
    input_col: str, 
    output_col: str,   
    prompt: str,
    expected_new_fields: list[str] = []
):
```

**Parameters:**

| Parameter | Type | Description |
|-----------|------|-------------|
| `table` | `ibis.Table` | The input Ibis table |
| `llm` | `Callable` | LLM callable (e.g., OpenAI, Ollama instance) |
| `input_col` | `str` | Column containing serialized input data |
| `output_col` | `str` | Column name for storing LLM output |
| `prompt` | `str` | Transformation prompt for the LLM |
| `expected_new_fields` | `list[str]` | Optional list of expected output field names |

**Workflow:**

```mermaid
sequenceDiagram
    participant Table as Ibis Table
    participant Func as llm_batch_inference
    participant LLM as LLM API
    participant Parse as Output Parser
    
    Table->>Func: Execute table.select() to local DataFrame
    Func->>Table: Get input_list from input_col
    Func->>LLM: Call llm(input_list, prefix, prompt, suffix)
    LLM-->>Func: Raw LLM responses
    Func->>Parse: Process each response
    Parse-->>Func: Validated JSON strings
    Func->>Table: Create mapping DataFrame
    Table-->>Func: Updated table with output_col
```

资料来源：[datatune/core/ibis/map_ibis.py:55-82](datatune/core/ibis/map_ibis.py)
资料来源：[datatune/core/ibis/filter_ibis.py:54-78](datatune/core/ibis/filter_ibis.py)

## Map Operations

### _map_ibis Class

The `_map_ibis` class implements the mapping transformation for Ibis tables, enabling the creation of new columns based on LLM-driven transformations.

**Class Definition:**

```python
class _map_ibis:
    def __init__(
        self,
        prompt: str,
        input_fields: Optional[List[str]] = None,
        output_fields: Optional[List[str]] = None,
    ):
        self.prompt = prompt
        self.input_fields = input_fields or []
        self.output_fields = output_fields or []
        self.llm_output_column = "MAP_LLM_OUTPUT__DATATUNE__"
        self.serialized_input_column = "MAP_SERIALIZED_INPUT__DATATUNE__"
```

**Key Attributes:**

| Attribute | Type | Default | Description |
|-----------|------|---------|-------------|
| `prompt` | `str` | Required | Natural language prompt describing the transformation |
| `input_fields` | `Optional[List[str]]` | `[]` | Columns to use as input for transformation |
| `output_fields` | `Optional[List[str]]` | `[]` | Names of new columns to create |
| `llm_output_column` | `str` | System-generated | Internal column for LLM response storage |
| `serialized_input_column` | `str` | System-generated | Internal column for serialized input |

**Execution Flow:**

1. Validates that all `input_fields` exist in the table schema
2. Creates serialized input column via `add_serialized_col`
3. Executes batch inference via `llm_batch_inference`
4. Applies LLM updates to create output columns
5. Selects final columns (removing internal columns)

资料来源：[datatune/core/ibis/map_ibis.py:122-160](datatune/core/ibis/map_ibis.py)

### Prompt Format for Map

The LLM receives structured prompts for mapping operations:

```
Map and transform the input according to the mapping criteria below.
Replace or Create new fields or values as per the prompt.
Expected new fields: [output_fields]

MAPPING CRITERIA:
{prompt}

Your response MUST be the entire input record as a valid Python dictionary
in the format 'index=<row_index>|{key1: value1, key2: value2, ...}' with added keys
of expected new fields if any.
```

资料来源：[datatune/core/ibis/map_ibis.py:56-75](datatune/core/ibis/map_ibis.py)

## Filter Operations

### _filter_ibis Class

The `_filter_ibis` class implements filtering transformations for Ibis tables, enabling row-level selection based on LLM-driven criteria.

**Class Definition:**

```python
class _filter_ibis:
    def __init__(
        self,
        prompt: str,
        input_fields: Optional[List[str]] = None,
    ):
        self.prompt = prompt
        self.input_fields = input_fields or []
        self.llm_output_column = "FILTER_LLM_OUTPUT__DATATUNE__"
        self.serialized_input_column = "FILTER_SERIALIZED_INPUT__DATATUNE__"
```

**Key Attributes:**

| Attribute | Type | Default | Description |
|-----------|------|---------|-------------|
| `prompt` | `str` | Required | Natural language criteria for filtering |
| `input_fields` | `Optional[List[str]]` | `[]` | Columns to consider for filtering decision |
| `llm_output_column` | `str` | System-generated | Internal column for LLM response storage |
| `serialized_input_column` | `str` | System-generated | Internal column for serialized input |

**Execution Flow:**

```mermaid
graph TD
    A[Input Ibis Table] --> B[Add Serialized Column]
    B --> C[Execute to Local DataFrame]
    C --> D[LLM Batch Inference]
    D --> E[Parse Filter Decisions]
    E --> F[Extract __filter__ Flag]
    F --> G[Apply Boolean Filter]
    G --> H[Final Ibis Table]
```

资料来源：[datatune/core/ibis/filter_ibis.py:80-120](datatune/core/ibis/filter_ibis.py)

### Prompt Format for Filter

The LLM receives structured prompts for filtering operations:

```
You are filtering a dataset. Your task is to determine whether each data record
should be KEPT or REMOVED based on the filtering criteria below.
Return the entire input data record with an added key called '__filter__' with
value either True to KEEP the record or False to REMOVE it.

FILTERING CRITERIA:
{prompt}

Your response MUST be the entire input record as a Python dictionary in the format:
index=<row_index>|{key1: value1, key2: value2, ...}<endofrow> with added key called
'__filter__' with value either True to KEEP the record or False to REMOVE it.
```

资料来源：[datatune/core/ibis/filter_ibis.py:56-68](datatune/core/ibis/filter_ibis.py)

## Output Parsing

### parse_filter_output

The `parse_filter_output` function extracts boolean filter decisions from LLM responses.

**Function Signature:**

```python
def parse_filter_output(
    output: Union[str, Exception], err: bool = True
) -> Optional[bool]:
```

**Parsing Logic:**

1. Attempts to evaluate the LLM output as a Python literal using `ast.literal_eval`
2. Extracts the last key-value pair from the dictionary
3. Returns the boolean value associated with the `__filter__` key
4. Returns `None` if parsing fails

资料来源：[datatune/core/ibis/filter_ibis.py:104-130](datatune/core/ibis/filter_ibis.py)

### Output Format Requirements

The Ibis Backend enforces strict output format requirements for LLM responses:

| Requirement | Format | Example |
|-------------|--------|---------|
| Index prefix | `index=<row_index>\|` | `index=0\|` |
| Data format | Python dictionary literal | `{'col1': 'value1', 'col2': 'value2'}` |
| String quoting | Double quotes only | `"string"` not `'string'` |
| Null values | `None` | `{'col': None}` |
| Row terminator | `<endofrow>` | `index=0\|{...}<endofrow>` |

资料来源：[datatune/core/ibis/map_ibis.py:70-75](datatune/core/ibis/map_ibis.py)
资料来源：[datatune/core/ibis/filter_ibis.py:64-67](datatune/core/ibis/filter_ibis.py)

## Usage Examples

### Basic Map Operation

```python
import datatune as dt
from datatune.llm.llm import OpenAI
import ibis

# Connect to DuckDB
con = ibis.duckdb.connect("data.duckdb")
table = con.table("products")

# Initialize LLM
llm = OpenAI(model_name="gpt-3.5-turbo")

# Create mapping operation
mapped = dt.map(
    prompt="Extract product category and subcategory from description",
    input_fields=["description", "name"],
    output_fields=["Category", "Subcategory"]
)(llm, table)
```

### Basic Filter Operation

```python
import datatune as dt
from datatune.llm.llm import Ollama

# Connect to PostgreSQL via Ibis
con = ibis.postgres.connect(...)
table = con.table("orders")

# Initialize local LLM
llm = Ollama(model_name="gemma3:4b")

# Create filter operation
filtered = dt.filter(
    prompt="Keep only orders from enterprise customers with value over $10,000",
    input_fields=["customer_type", "order_value"]
)(llm, table)
```

## Error Handling

### Schema Validation

The Ibis Backend performs input schema validation before processing:

```python
def __call__(self, llm: Callable, table: Table) -> Table:
    if self.input_fields:
        missing = [f for f in self.input_fields if f not in table.columns]
        if missing:
            error_msg = (
                f"[datatune] Schema mismatch: The following input_fields were not found: {missing}. "
                f"Available columns: {list(table.columns)}"
            )
            raise ValueError(error_msg)
```

**Validation Rules:**

| Check | Condition | Action on Failure |
|-------|-----------|-------------------|
| Input fields exist | `input_fields` subset of `table.columns` | Raise `ValueError` |
| LLM call success | No exception from `llm()` | Return error string |
| Output parsing | Valid Python literal | Return empty dict `{}` |

资料来源：[datatune/core/ibis/map_ibis.py:130-140](datatune/core/ibis/map_ibis.py)

## Internal Column Naming

The Ibis Backend uses prefixed column names to avoid conflicts with user data:

| Operation | Serialized Column | Output Column |
|-----------|-------------------|---------------|
| Map | `MAP_SERIALIZED_INPUT__DATATUNE__` | `MAP_LLM_OUTPUT__DATATUNE__` |
| Filter | `FILTER_SERIALIZED_INPUT__DATATUNE__` | `FILTER_LLM_OUTPUT__DATATUNE__` |

These columns are automatically removed from the final output table, ensuring clean results.

## Integration with Core API

The Ibis Backend is invoked through the unified datatune API via the `filter.py` module:

```python
def filter(*, prompt, input_fields=None, clusters=None):
    def apply(llm, data):
        if _is_ibis_table(data):
            from .ibis.filter_ibis import _filter_ibis
            return _filter_ibis(
                prompt=prompt,
                input_fields=input_fields,
            )(llm, data)
        # ... other backends
```

资料来源：[datatune/core/filter.py:15-28](datatune/core/filter.py)

## Supported Database Backends

The Ibis Backend supports any database backend compatible with Ibis:

| Backend | Connection Method | Example |
|---------|-------------------|---------|
| DuckDB | `ibis.duckdb.connect()` | Local analytical queries |
| PostgreSQL | `ibis.postgres.connect()` | Enterprise data |
| BigQuery | `ibis.bigquery.connect()` | Cloud data warehousing |
| SQLite | `ibis.sqlite.connect()` | Lightweight embedded DB |
| MySQL | `ibis.mysql.connect()` | Web applications |

## Performance Considerations

### Batch Processing

The Ibis Backend processes data in batches determined by:

1. **Local execution boundary**: Data is materialized to a local DataFrame via `.execute()`
2. **LLM rate limits**: Respects TPM (tokens per minute) and RPM (requests per minute) limits
3. **Memory constraints**: Batch sizes are controlled by the underlying LLM integration

### Optimization Strategies

| Strategy | Description | Applicable Operation |
|----------|-------------|---------------------|
| Select specific fields | Use `input_fields` to minimize data transfer | Map, Filter |
| Pushdown predicates | Filter data in the database before LLM processing | Filter |
| Column pruning | Remove unused columns early in the pipeline | Map |

## See Also

- [Dask Backend](../dask/backend.md) - Alternative backend for distributed computing
- [LLM Integrations](../../llm/llm.md) - Supported LLM providers
- [Core API](../core/api.md) - Unified transformation API

---

<a id='page-llm-integration'></a>

## LLM Integration

### 相关页面

相关主题：[Agent System](#page-agent-system), [Map Operations](#page-map-operations), [Filter Operations](#page-filter-operations)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [datatune/llm/llm.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/llm.py)
- [datatune/llm/model_rate_limits.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/model_rate_limits.py)
- [datatune/llm/__init__.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/__init__.py)
- [datatune/core/dask/filter_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/filter_dask.py)
- [datatune/core/dask/map_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)
</details>

# LLM Integration

The LLM Integration module in datatune provides a unified abstraction layer for interfacing with various Large Language Model providers. It handles rate limiting, token management, request batching, and provider-specific configurations, enabling consistent data transformation operations across different LLM backends.

## Architecture Overview

The LLM module uses a class hierarchy where a base `LLM` class provides common functionality, while provider-specific subclasses handle individual vendor implementations.

```mermaid
graph TD
    A[LLM Base Class] --> B[OpenAI]
    A --> C[Ollama]
    A --> D[VLLM]
    A --> E[Azure]
    A --> F[Mistral]
    A --> G[Huggingface]
    A --> H[Gemini]
    
    I[litellm] --> A
    J[Rate Limits] --> A
    K[Token Counter] --> A
```

## Core Components

### LLM Base Class

The `LLM` class serves as the foundation for all provider implementations. It manages rate limiting, token counting, and prompt batching.

#### Initialization Parameters

| Parameter | Type | Description | Default |
|-----------|------|-------------|---------|
| `model_name` | `str` | Full model identifier (provider/model format) | Required |
| `rpm` | `int` | Requests per minute limit | Provider default |
| `tpm` | `int` | Tokens per minute limit | Provider default |
| `max_tokens` | `int` | Maximum tokens in response | Provider default |
| `temperature` | `float` | Sampling temperature | `0.0` |

#### Key Methods

| Method | Description |
|--------|-------------|
| `_completion(prompt)` | Execute a single completion request |
| `_create_batched_prompts(input_rows, user_batch_prefix, prompt_per_row)` | Create optimized batched prompts |
| `get_max_tokens()` | Retrieve max tokens for current model |

资料来源：[datatune/llm/llm.py:40-90]()

### Provider Implementations

#### OpenAI

```python
from datatune.llm.llm import OpenAI

llm = OpenAI(
    model_name="gpt-3.5-turbo",
    api_key="your-api-key"
)
```

Configures models in the `openai/model-name` format for LiteLLM compatibility.

资料来源：[datatune/llm/llm.py:95-102]()

#### Ollama (Local)

```python
from datatune.llm.llm import Ollama

llm = Ollama(
    model_name="gemma3:4b",
    api_base="http://localhost:11434"
)
```

Enables local model inference with Ollama. The `api_base` defaults to `http://localhost:11434`.

资料来源：[datatune/llm/llm.py:74-82]()

#### VLLM

```python
from datatune.llm.llm import VLLM

llm = VLLM(
    model_name="your-model",
    api_base="http://localhost:8000/v1"
)
```

Queries the `/models` endpoint to automatically determine `max_model_len` from the server.

资料来源：[datatune/llm/llm.py:104-122]()

#### Azure

```python
from datatune.llm.llm import Azure

llm = Azure(
    model_name="gpt-3.5-turbo",
    api_key="your-key",
    api_base="your-endpoint",
    api_version="2024-01-01"
)
```

Supports Azure OpenAI Service deployments with additional configuration options.

#### Mistral

```python
from datatune.llm.llm import Mistral

llm = Mistral(
    model_name="mistral-tiny",
    api_key="your-api-key"
)
```

Wraps Mistral AI models with automatic `mistral/` prefix handling.

#### Huggingface

```python
from datatune.llm.llm import Huggingface

llm = Huggingface(
    model_name="meta-llama/Llama-2-70b",
    api_key="your-api-key"
)
```

Provides access to Hugging Face Inference API models.

## Rate Limiting

The module enforces two types of rate limits to prevent API throttling:

| Limit Type | Description | Applied At |
|------------|-------------|------------|
| **TPM** | Tokens per minute | Per-request token counting |
| **RPM** | Requests per minute | Per-API-call counting |

### Model Rate Limits Configuration

Predefined limits for major models are stored in `datatune/llm/model_rate_limits.py`:

| Model | TPM | RPM |
|-------|-----|-----|
| `gpt-3.5-turbo` | 200,000 | 500 |
| `gpt-4` | 10,000 | 500 |
| `gpt-4-turbo` | 30,000 | 500 |
| `gpt-4o` | 30,000 | 500 |
| `gpt-4.1-mini` | 200,000 | 500 |
| `gpt-4.1-nano` | 200,000 | 500 |
| `gpt-4.5-preview` | 125,000 | 1,000 |

资料来源：[datatune/llm/model_rate_limits.py:1-90]()

### Fallback Behavior

When an unknown model is used, the system falls back to `gpt-3.5-turbo` limits and logs a warning:

```python
if self._base_model_name in model_rate_limits:
    model_limits = model_rate_limits[self._base_model_name]
else:
    model_limits = model_rate_limits[DEFAULT_MODEL]
    logger.warning(
        f"REQUESTS-PER-MINUTE limits for model '{model_name}' not found. "
        f"Defaulting to '{DEFAULT_MODEL}' limits"
    )
```

资料来源：[datatune/llm/llm.py:45-57]()

## Prompt Formatting

The LLM module enforces strict output format requirements for parsing reliability.

### Output Format Specification

All responses must follow this pattern:

```
index=<row_index>|{python_literal}<endofrow>
```

Supported Python literals:
- Lists: `index=0|['item1', 'item2', 'item3']<endofrow>`
- Dicts: `index=1|{'key1': 'value1', 'key2': 2}<endofrow>`
- Strings: `index=2|"This is a string answer"<endofrow>`

### Prompt Components

| Component | Purpose |
|-----------|---------|
| `prefix` | Instructions for row processing |
| `user_batch_prefix` | Custom user instructions |
| `suffix` | Output format requirements |

资料来源：[datatune/llm/llm.py:1-20]()

## Batch Processing

The batching system optimizes LLM usage by grouping multiple rows into single API requests while respecting token limits.

```mermaid
graph LR
    A[Input Rows] --> B[Token Counting]
    B --> C{Within Limits?}
    C -->|Yes| D[Batch Request]
    C -->|No| E[Split Batch]
    D --> F[LLM API]
    E --> D
    F --> G[Parse Output]
    G --> H[Row Results]
```

### Batching Logic

1. Calculate prefix/suffix token overhead using `token_counter`
2. Iterate through input rows, accumulating tokens
3. Split batches when approaching `max_tokens` limit
4. Track `nrows_per_api_call` for output reconstruction

资料来源：[datatune/llm/llm.py:30-70]()

## Integration with Core Modules

### Filter Operations

The filter module uses the LLM to evaluate row-level boolean conditions:

```python
def filter_rows(llm, df, input_column, prompt, output_column):
    prefix = "Row data:"
    suffix = "Your response MUST be... with added key '__filter__'"
    
    llm_out = llm(
        input_series,
        prefix,
        prompt,
        suffix,
        optimized=True
    )
```

资料来源：[datatune/core/dask/filter_dask.py:15-40]()

### Map Operations

The map module leverages the LLM for column transformations:

```python
def map_rows(llm, df, input_columns, prompt, output_columns):
    suffix = "Your response MUST be... with added keys of expected new fields"
    
    llm_out = llm(
        canonical_input,
        prefix,
        prompt,
        suffix,
        optimized=True
    )
```

资料来源：[datatune/core/dask/map_dask.py:20-45]()

## Usage Example

```python
import datatune as dt
from datatune.llm.llm import OpenAI

# Initialize LLM
llm = OpenAI(model_name="gpt-3.5-turbo")

# Use with datatune operations
df = dd.read_csv("products.csv")

mapped = dt.map(
    prompt="Extract categories from description",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description"]
)(llm, df)

filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Category"]
)(llm, mapped)
```

## Module Exports

The `datatune.llm` package exports all provider classes:

```python
from datatune.llm import *

# Available: LLM, OpenAI, Ollama, VLLM, Azure, Mistral, Huggingface
```

资料来源：[datatune/llm/__init__.py:1]()

## Dependencies

| Library | Purpose |
|---------|---------|
| `litellm` | Unified LLM API interface |
| `token_counter` | Token estimation |
| `get_max_tokens` | Model context limits |

资料来源：[datatune/llm/llm.py:10-11]()

---

<a id='page-agent-system'></a>

## Agent System

### 相关页面

相关主题：[LLM Integration](#page-llm-integration), [Map Operations](#page-map-operations), [Filter Operations](#page-filter-operations)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [datatune/agent/agent.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/agent.py)
- [datatune/agent/__init__.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/__init__.py)
- [datatune/llm/llm.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/llm.py)
- [datatune/core/dask/filter_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/filter_dask.py)
- [datatune/core/dask/map_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)
</details>

# Agent System

The Agent System is the core intelligent orchestration layer in datatune that automatically determines, plans, and executes data transformation workflows based on natural language prompts. It abstracts the complexity of selecting between Dask operations and LLM-powered primitives (Map, Filter), allowing users to describe desired transformations in plain English while the system handles the implementation details.

## Architecture Overview

The Agent System follows a planner-executor pattern where the LLM acts as the planner, generating a JSON-based execution plan, and the Agent class handles the runtime execution of that plan against Dask DataFrames.

```mermaid
graph TD
    A[User Natural Language Goal] --> B[Agent.do method]
    B --> C[LLM Planner]
    C --> D[JSON Execution Plan]
    D --> E[Agent._execute_plan]
    E --> F{Step Type}
    F -->|dask| G[Dask Operations]
    F -->|primitive| H[LLM Primitives<br/>Map/Filter]
    G --> I[Runtime Execution]
    H --> I
    I --> J[Result DataFrame]
```

资料来源：[datatune/agent/agent.py:1-50]()

## Core Components

### Agent Base Class

The `Agent` class serves as the abstract base for all agent implementations and provides the foundational system prompt that defines the agent's capabilities.

资料来源：[datatune/agent/__init__.py:1-50]()

| Component | Description |
|-----------|-------------|
| `system_prompt` | Defines available libraries (pandas, numpy, dask, datatune) and example usage patterns |
| `ABC` | Abstract base class pattern ensuring implementers provide core methods |
| `abstractmethod` | Decorator marking `do` method as required implementation |

```python
from abc import ABC, abstractmethod

class Agent(ABC):
    system_prompt: str = """You are Datatune Agent, a powerful assistant designed to help users with data processing tasks.
    You are capable of generating python code to perform various operations on data. Apart from python builtins, you have the following libraries avaiable in your run time:
    - pandas
    - numpy
    - dask

    In addition to these, you also have access to the datatune libarary, which provides functionality for processing data using LLMs.
    ..."""
```

### Agent Implementation

The concrete `Agent` class in `datatune/agent/agent.py` implements the planner-executor logic.

资料来源：[datatune/agent/agent.py:1-100]()

| Attribute | Type | Purpose |
|-----------|------|---------|
| `llm` | `LLM` | Language model used for planning and primitive operations |
| `history` | `List[Dict]` | Execution history for debugging and state tracking |
| `verbose` | `bool` | Enable debug-level logging when True |
| `TEMPLATE` | `dict` | Operation templates for code generation |

```python
def __init__(self, llm: LLM, verbose: bool = False):
    self.llm = llm
    self.history: List[Dict[str, Any]] = []
    self.verbose = verbose
    if self.verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)
```

## Plan Structure

The Agent uses a JSON-based plan format where each step represents a single transformation operation. Plans are generated by the LLM based on user goals and the current DataFrame schema.

资料来源：[datatune/agent/agent.py:100-150]()

### Plan Step Schema

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `type` | `string` | Yes | Either `"dask"` or `"primitive"` |
| `operation` | `string` | Yes | Operation name within the step type |
| `params` | `dict` | No | Parameters for Dask templates |
| `subprompt` | `string` | No | LLM prompt for primitive operations |
| `input_fields` | `list` | No | Input column names for the operation |
| `output_fields` | `list` | No | Output column names (Map only) |

### Step Types

**Dask Operations** execute server-side Python code using Dask's computational graph:

| Operation | Description |
|-----------|-------------|
| `add_column` | Create a new column from an expression |
| `apply_function` | Apply an element-wise function to a column |
| `rename_columns` | Rename columns using a mapping |
| `astype_column` | Change a column's data type |
| `group_by` | Group data and aggregate |

资料来源：[datatune/agent/agent.py:50-100]()

**Primitive Operations** use the LLM for row-level transformations:

| Operation | Description | Use Case |
|-----------|-------------|----------|
| `Map` | Create new columns via LLM transformation | Semantic extraction, classification, interpretation |
| `Filter` | Remove rows based on LLM criteria | Complex conditional filtering |

资料来源：[datatune/core/dask/map_dask.py:1-50](), [datatune/core/dask/filter_dask.py:1-50]()

## Plan Execution Flow

```mermaid
graph TD
    A[_execute_plan called] --> B[Set DataFrame in runtime]
    B --> C[Initialize code_lines list]
    C --> D{For each step in plan}
    D -->|Has steps| E[Format step template]
    E --> F[Log start message]
    F --> G[Execute map_partitions with log_primitive]
    G --> H[Increment step_num]
    H --> I[Execute operation template]
    I --> J[Log end message]
    J --> D
    D -->|No more steps| K[Call dt.finalize]
    K --> L[Compute DataFrame]
    L --> M[Return None, n_steps on success]
    
    N[Exception during execution] --> O[Return error, failed_step]
```

资料来源：[datatune/agent/agent.py:150-200]()

### Execution Methods

#### `_execute_plan(plan: List[Dict])`

Executes a complete multi-step plan sequentially.

```python
def _execute_plan(self, plan: List[Dict]):
    """Execute a sequence of plan steps (Dask or primitive)."""
    
    self._set_df(self.df)
    self.runtime.update({"step_num": 0, "plan": plan})
    code_lines = []
    n_steps = len(plan)
    
    for i, step in enumerate(plan, start=1):
        # ... step formatting and execution ...
    
    # Final execution
    full_code = (
        "\n".join(code_lines)
        + "\n\n"
        + "df = dt.finalize(df)\n"
        "df = df.compute()"
    )
    self.runtime.execute(full_code)
    return None, n_steps
```

#### `_execute_step(step: Dict)`

Executes a single step with detailed error handling.

```python
def _execute_step(self, step: Dict):
    try:
        if step["type"] == "dask":
            template = self.TEMPLATE["dask"][step["operation"]].format(**step["params"])
            self.runtime.execute(template + "\n_ = df.head()")
        elif step["type"] == "primitive":
            template = self.TEMPLATE["primitive"][step["operation"]].format(**step["params"])
            self.runtime.execute(template)
        else:
            raise ValueError(f"Unknown step type: {step['type']}")
    except Exception as e:
        error_msg = f"{type(e).__name__}: {str(e)}\n{traceback.format_exc()}"
        return error_msg
```

资料来源：[datatune/agent/agent.py:200-250]()

## LLM Integration

The Agent integrates with the LLM system through `datatune/llm/llm.py` to generate execution plans and power primitive operations.

资料来源：[datatune/llm/llm.py:1-50]()

### Supported LLM Providers

| Provider | Class | Default Model |
|----------|-------|---------------|
| OpenAI | `OpenAI` | `gpt-3.5-turbo` |
| Ollama | `Ollama` | `gemma3:4b` |
| Azure | `Azure` | Configurable |
| vLLM | `VLLM` | Configurable |

```python
class OpenAI(LLM):
    def __init__(
        self, model_name: str = "gpt-3.5-turbo", api_key: Optional[str] = None, **kwargs
    ):
        kwargs.update({"api_key": api_key})
        super().__init__(model_name=f"openai/{model_name}", **kwargs)

class Ollama(LLM):
    def __init__(
        self, model_name="gemma3:4b", api_base="http://localhost:11434", **kwargs
    ) -> None:
        super().__init__(
            model_name=f"ollama_chat/{model_name}", api_base=api_base, **kwargs
        )
```

### Rate Limiting

The system includes model rate limits configuration to prevent API throttling:

| Model | TPM | RPM |
|-------|-----|-----|
| `gpt-3.5-turbo` | 200,000 | 500 |
| `gpt-4-turbo` | 30,000 | 500 |
| `gpt-4o` | 30,000 | 500 |

资料来源：[datatune/llm/model_rate_limits.py:1-50]()

## Prompt Generation

The Agent constructs prompts dynamically based on the current state and goal.

资料来源：[datatune/agent/agent.py:250-300]()

### Prompt Components

| Method | Purpose |
|--------|---------|
| `get_persona_prompt(goal)` | Generates the system instructions for plan generation |
| `get_schema_prompt(df)` | Includes current DataFrame schema |
| `get_error_prompt(error_msg, failed_step)` | Context for retry after failures |
| `get_full_prompt(df, goal)` | Combines all components |

```python
def get_schema_prompt(self, df: dd.DataFrame) -> str:
    schema_prompt: str = f"""The current schema of the dataframe df is as follows:
    {df.dtypes.to_string()}
    """
    return schema_prompt

def get_error_prompt(self, error_msg: str, failed_step: Dict) -> str:
    error_prompt: str = f"""
    The previous code execution failed with the following error:
    Error: {error_msg}
    
    Failed step:
    {failed_step}
    
    Provide the json plan with the corrected step. Make sure to:
    1. Address the specific error mentioned above.
    2. DO NOT include any explanations or comments.
    """
    return error_prompt
```

## Primitive Operations

### Map Operation

The Map primitive uses the LLM to create new columns by applying transformations to each row.

资料来源：[datatune/core/dask/map_dask.py:1-80]()

```python
# Example: Extract categories from description
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)
```

**Output Format Requirements:**
- Each response must be: `index=<index>|{key1: value1, key2: value2, ...}<endofrow>`
- Strings must use double quotes
- Missing values should be `null`

### Filter Operation

The Filter primitive uses the LLM to evaluate row-level conditions.

资料来源：[datatune/core/dask/filter_dask.py:1-80]()

```python
# Example: Keep only electronics products
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)
```

**Decision Format:**
- Each response must include `__filter__: True` (keep) or `__filter__: False` (remove)

## Error Handling

The Agent implements comprehensive error handling at multiple levels:

| Level | Handler | Action |
|-------|---------|--------|
| Step formatting | `ValueError` | Log and return error with step number |
| Template lookup | `KeyError` | Raise with available operation names |
| Runtime execution | `Exception` | Capture traceback, return formatted error |

```python
except Exception as e:
    step_num = self.runtime["step_num"]
    logger.error(f"❌ Step {step_num}/{n_steps} failed with error: {e}")
    return str(e), step_num - 1
```

资料来源：[datatune/logger.py:1-50]()

## Usage Examples

### Basic Agent Usage

```python
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
agent = dt.Agent(llm)

df = dd.read_csv("products.csv")
result = agent.do("Add ProfitMargin column and keep only African organizations", df)
result.compute().to_csv("african_products.csv")
```

### Verbose Mode for Debugging

```python
agent = dt.Agent(llm, verbose=True)
result = agent.do("Complex transformation task", df)
```

With `verbose=True`, the Agent sets logger to `DEBUG` level, outputting detailed execution information including step progress and intermediate states.

### Chaining Operations

The Agent automatically chains multiple operations when a goal contains multiple tasks:

```
TASK: 1. create column a based on column x
      2. create column b based on column y
```

Combined into a single Map primitive step rather than separate steps.

---

---

## Doramagic 踩坑日志

项目：vitalops/datatune

摘要：发现 7 个潜在踩坑项，其中 0 个为 high/blocking；最高优先级：能力坑 - 能力判断依赖假设。

## 1. 能力坑 · 能力判断依赖假设

- 严重度：medium
- 证据强度：source_linked
- 发现：README/documentation is current enough for a first validation pass.
- 对用户的影响：假设不成立时，用户拿不到承诺的能力。
- 建议检查：将假设转成下游验证清单。
- 防护动作：假设必须转成验证项；没有验证结果前不能写成事实。
- 证据：capability.assumptions | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | README/documentation is current enough for a first validation pass.

## 2. 维护坑 · 维护活跃度未知

- 严重度：medium
- 证据强度：source_linked
- 发现：未记录 last_activity_observed。
- 对用户的影响：新项目、停更项目和活跃项目会被混在一起，推荐信任度下降。
- 建议检查：补 GitHub 最近 commit、release、issue/PR 响应信号。
- 防护动作：维护活跃度未知时，推荐强度不能标为高信任。
- 证据：evidence.maintainer_signals | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | last_activity_observed missing

## 3. 安全/权限坑 · 下游验证发现风险项

- 严重度：medium
- 证据强度：source_linked
- 发现：no_demo
- 对用户的影响：下游已经要求复核，不能在页面中弱化。
- 建议检查：进入安全/权限治理复核队列。
- 防护动作：下游风险存在时必须保持 review/recommendation 降级。
- 证据：downstream_validation.risk_items | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | no_demo; severity=medium

## 4. 安全/权限坑 · 存在安全注意事项

- 严重度：medium
- 证据强度：source_linked
- 发现：No sandbox install has been executed yet; downstream must verify before user use.
- 对用户的影响：用户安装前需要知道权限边界和敏感操作。
- 建议检查：转成明确权限清单和安全审查提示。
- 防护动作：安全注意事项必须面向用户前置展示。
- 证据：risks.safety_notes | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | No sandbox install has been executed yet; downstream must verify before user use.

## 5. 安全/权限坑 · 存在评分风险

- 严重度：medium
- 证据强度：source_linked
- 发现：no_demo
- 对用户的影响：风险会影响是否适合普通用户安装。
- 建议检查：把风险写入边界卡，并确认是否需要人工复核。
- 防护动作：评分风险必须进入边界卡，不能只作为内部分数。
- 证据：risks.scoring_risks | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | no_demo; severity=medium

## 6. 维护坑 · issue/PR 响应质量未知

- 严重度：low
- 证据强度：source_linked
- 发现：issue_or_pr_quality=unknown。
- 对用户的影响：用户无法判断遇到问题后是否有人维护。
- 建议检查：抽样最近 issue/PR，判断是否长期无人处理。
- 防护动作：issue/PR 响应未知时，必须提示维护风险。
- 证据：evidence.maintainer_signals | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | issue_or_pr_quality=unknown

## 7. 维护坑 · 发布节奏不明确

- 严重度：low
- 证据强度：source_linked
- 发现：release_recency=unknown。
- 对用户的影响：安装命令和文档可能落后于代码，用户踩坑概率升高。
- 建议检查：确认最近 release/tag 和 README 安装命令是否一致。
- 防护动作：发布节奏未知或过期时，安装说明必须标注可能漂移。
- 证据：evidence.maintainer_signals | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | release_recency=unknown

<!-- canonical_name: vitalops/datatune; human_manual_source: deepwiki_human_wiki -->
