# https://github.com/vitalops/datatune 项目说明书

生成时间：2026-05-14 07:26:20 UTC

## 目录

- [Datatune简介](#intro)
- [快速开始](#quickstart)
- [系统架构](#architecture)
- [数据管理与数据流](#datamanagement)
- [Map操作](#map-operation)
- [Filter操作](#filter-operation)
- [Reduce操作](#reduce-operation)
- [Agent系统](#agent-system)
- [LLM集成](#llm-integration)
- [数据源支持](#datasource)

<a id='intro'></a>

## Datatune简介

### 相关页面

相关主题：[系统架构](#architecture), [快速开始](#quickstart)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [README.md](https://github.com/vitalops/datatune/blob/main/README.md)
- [datatune/llm/llm.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/llm.py)
- [datatune/agent/agent.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/agent.py)
- [datatune/agent/__init__.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/__init__.py)
- [datatune/core/dask/map_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)
- [datatune/core/dask/filter_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/filter_dask.py)
- [datatune/core/ibis/map_ibis.py](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/map_ibis.py)
- [datatune/core/ibis/filter_ibis.py](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/filter_ibis.py)
- [datatune/core/deduplication.py](https://github.com/vitalops/datatune/blob/main/datatune/core/deduplication.py)
- [datatune/llm/model_rate_limits.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/model_rate_limits.py)
</details>

# Datatune简介

## 概述

Datatune是一个基于大语言模型（LLM）的数据转换框架，它允许用户使用自然语言描述来对结构化数据进行转换、映射和过滤操作。该框架支持Dask和Ibis两种后端计算引擎，能够处理大规模数据集，并通过多种LLM提供商（如OpenAI、Ollama、Azure等）实现智能化的数据处理。

资料来源：[README.md:1-20]()

## 核心架构

Datatune的架构主要由以下几个核心组件构成：

### 2.1 组件层次

```mermaid
graph TD
    A[用户接口层] --> B[Agent智能代理]
    A --> C[Primitive原语操作]
    B --> C
    C --> D[Map映射操作]
    C --> E[Filter过滤操作]
    D --> F[数据后端]
    E --> F
    F --> G[Dask后端]
    F --> H[Ibis后端]
    G --> I[DuckDB/PostgreSQL/BigQuery]
    H --> I
    F --> J[LLM调用层]
    J --> K[OpenAI]
    J --> L[Ollama]
    J --> M[Azure]
    J --> N[Mistral]
    J --> O[Huggingface]
    J --> P[VLLM]
```

### 2.2 核心模块说明

| 模块路径 | 功能说明 |
|---------|---------|
| `datatune.llm.llm` | LLM基类和多种LLM提供商实现 |
| `datatune.agent.agent` | Agent智能代理，支持自动规划数据转换流程 |
| `datatune.core.dask.map_dask` | Dask后端的Map映射操作实现 |
| `datatune.core.dask.filter_dask` | Dask后端的Filter过滤操作实现 |
| `datatune.core.ibis.map_ibis` | Ibis后端的Map映射操作实现 |
| `datatune.core.ibis.filter_ibis` | Ibis后端的Filter过滤操作实现 |
| `datatune.core.deduplication` | 数据去重功能，支持基于嵌入向量的相似度检测 |

资料来源：[datatune/llm/llm.py:1-150]()[datatune/agent/agent.py:1-100]()

## LLM支持

### 3.1 支持的LLM提供商

Datatune通过litellm库统一封装了多种LLM提供商，用户可以根据需求选择合适的模型：

资料来源：[datatune/llm/llm.py:45-120]()

| 提供商 | 类名 | 默认模型 | 特点 |
|-------|------|---------|------|
| OpenAI | `OpenAI` | gpt-3.5-turbo | 云端API，需提供api_key |
| Ollama | `Ollama` | gemma3:4b | 本地部署，支持gemma、llama等模型 |
| Azure | `Azure` | gpt-3.5-turbo | Azure OpenAI服务 |
| Mistral | `Mistral` | mistral-tiny | Mistral AI模型 |
| Huggingface | `Huggingface` | - | Hugging Face推理端点 |
| VLLM | `VLLM` | 用户指定 | 高性能本地推理服务 |

### 3.2 模型速率限制

框架内置了主流模型的速率限制配置，包括每分钟请求数（RPM）和每分钟令牌数（TPM）：

资料来源：[datatune/llm/model_rate_limits.py:1-100]()

| 模型 | RPM | TPM |
|------|-----|-----|
| gpt-3.5-turbo | 500 | 200,000 |
| gpt-4 | 500 | 10,000 |
| gpt-4-turbo | 500 | 30,000 |
| gpt-4o | 500 | 30,000 |
| gpt-4.1-mini | 500 | 200,000 |
| gpt-4.1-nano | 500 | 200,000 |

### 3.3 LLM基类实现

```python
class LLM:
    def __init__(self, model_name: str, **kwargs) -> None:
        self.model_name = model_name  # 格式为 "provider/model"
        self._base_model_name = model_name.split("/", 1)[1]
```

所有LLM类都继承自基类`LLM`，通过litellm库实现统一的调用接口，并支持自定义速率限制参数。

资料来源：[datatune/llm/llm.py:45-70]()

## 数据处理原语

### 4.1 Map映射操作

Map操作用于从现有数据创建新的列，通过LLM理解并应用转换逻辑：

```mermaid
graph LR
    A[输入DataFrame] --> B[序列化输入列]
    B --> C[LLM批量处理]
    C --> D[解析LLM输出]
    D --> E[反序列化到新列]
    E --> F[输出DataFrame]
```

**Dask实现流程：**

1. 序列化输入列数据
2. 检测重复记录并建立映射关系
3. 仅对主记录调用LLM
4. 复制结果到重复记录
5. 将LLM输出反序列化为新列

资料来源：[datatune/core/dask/map_dask.py:1-80]()[datatune/core/ibis/map_ibis.py:1-60]()

### 4.2 Filter过滤操作

Filter操作用于根据条件移除不满足要求的行：

```mermaid
graph LR
    A[输入DataFrame] --> B[LLM决策判断]
    B --> C[解析过滤决策]
    C --> D[应用过滤条件]
    D --> E[输出过滤后DataFrame]
```

**输出格式要求：**

LLM返回的过滤决策必须遵循特定格式：

```
index=<row_index>|{key1: value1, ..., '__filter__': True/False}<endofrow>
```

资料来源：[datatune/core/dask/filter_dask.py:1-50]()[datatune/core/ibis/filter_ibis.py:1-50]()

### 4.3 去重功能

Datatune提供了基于嵌入向量的智能去重功能：

```mermaid
graph TD
    A[原始数据] --> B[嵌入向量生成]
    B --> C[FAISS索引构建]
    C --> D[HNSW相似度搜索]
    D --> E[聚类分组]
    E --> F[主记录标识]
```

**核心参数：**

| 参数 | 说明 | 默认值 |
|------|------|-------|
| embedding_model | 嵌入模型名称 | text-embedding-3-small |
| sim_threshold | 相似度阈值 | 0.90 |
| top_k | Top-K近邻数 | 50 |
| hnsw_m | HNSW图的M参数 | 32 |
| ef_search | 搜索时的ef参数 | 64 |

资料来源：[datatune/core/deduplication.py:1-150]()

## Agent智能代理

### 5.1 代理概述

Agent是Datatune的高级抽象，它能够自动分析用户的自然语言请求，规划并执行一系列数据转换步骤：

```mermaid
graph TD
    A[用户自然语言请求] --> B[LLM解析意图]
    B --> C[生成执行计划]
    C --> D[计划验证]
    D --> E{计划有效?}
    E -->|是| F[逐步执行]
    E -->|否| G[返回错误]
    F --> H[执行日志记录]
    H --> I[完成并返回结果]
```

资料来源：[datatune/agent/agent.py:1-150]()[datatune/agent/__init__.py:1-40]()

### 5.2 计划格式

Agent生成的计划为JSON数组，每个步骤包含以下字段：

| 字段 | 类型 | 说明 |
|------|------|------|
| type | string | 操作类型：dask 或 primitive |
| operation | string | 操作名称 |
| params | dict | 操作参数字典 |
| subprompt | string | 原语操作的LLM提示词 |
| input_fields | list | 输入列列表 |
| output_fields | list | 输出列列表（仅Map） |

资料来源：[datatune/agent/agent.py:100-150]()

### 5.3 支持的Dask操作

Agent可使用的Dask操作包括：

| 操作类型 | 操作名 | 说明 |
|---------|--------|------|
| 列操作 | add_column | 从表达式创建新列 |
| 列操作 | apply_function | 对列应用函数 |
| 列操作 | rename_columns | 重命名列 |
| 列操作 | astype_column | 更改列数据类型 |

资料来源：[datatune/agent/agent.py:150-200]()

## 数据源支持

### 6.1 Dask后端

Dask后端支持大规模分布式数据处理：

```python
import dask.dataframe as dd
df = dd.read_csv("data.csv")
```

特点：
- 支持分区分布式计算
- 延迟执行提高性能
- 与pandas API兼容

### 6.2 Ibis后端

Ibis后端支持多种SQL数据库：

| 数据库 | 连接方式 |
|-------|---------|
| DuckDB | `ibis.duckdb.connect()` |
| PostgreSQL | `ibis.postgres.connect()` |
| BigQuery | `ibis.bigquery.connect()` |

资料来源：[README.md:30-60]()

## 快速开始

### 7.1 安装

```bash
pip install datatune
```

### 7.2 基本使用示例

```python
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

# 使用Map创建新列
mapped = dt.map(
    prompt="从产品描述和名称中提取类别",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

# 使用Filter过滤数据
filtered = dt.filter(
    prompt="仅保留电子产品",
    input_fields=["Name"]
)(llm, mapped)

# 保存结果
result = dt.finalize(filtered)
result.compute().to_csv("electronics_products.csv")
```

### 7.3 Agent使用示例

```python
import datatune as dt
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo")
agent = dt.Agent(llm)

df = agent.do("添加ProfitMargin列并仅保留非洲组织", df)
result = dt.finalize(df)
```

Agent会自动：
- 确定使用哪些操作（map、filter等）
- 链接多个转换步骤
- 处理复杂的多步骤任务
- 生成并执行Python代码

资料来源：[README.md:1-100]()

## 系统提示词设计

Agent的系统提示词定义了可用的工具和上下文：

资料来源：[datatune/agent/__init__.py:1-50]()

**可用的Python库：**
- pandas
- numpy
- dask

**可用的数据处理原语：**
- Map：通过LLM提示词从现有数据创建新列
- Filter：通过LLM提示词根据条件移除行

## 错误处理机制

### 9.1 执行错误捕获

Agent实现了完善的错误处理机制：

```mermaid
graph TD
    A[执行步骤] --> B{是否成功?}
    B -->|是| C[记录日志]
    B -->|否| D[捕获异常]
    D --> E[格式化错误信息]
    E --> F[返回错误和步骤号]
```

资料来源：[datatune/agent/agent.py:80-120]()

### 9.2 速率限制警告

当使用的模型未配置速率限制时，系统会发出警告：

```python
if "rpm" not in kwargs:
    logger.warning(f"REQUESTS-PER-MINUTE limits for model '{model_name}' not found.")
if "tpm" not in kwargs:
    logger.warning(f"TOKENS-PER-MINUTE limits for model '{model_name}' not found.")
```

资料来源：[datatune/llm/llm.py:65-80]()

## 总结

Datatune是一个功能强大的LLM驱动数据转换框架，它：

1. **简化数据处理**：通过自然语言描述替代复杂的代码编写
2. **支持多种后端**：Dask用于大规模分布式计算，Ibis支持多种SQL数据库
3. **集成多LLM**：OpenAI、Ollama、Azure、Mistral等主流提供商
4. **智能去重**：基于嵌入向量的高效相似度检测
5. **Agent自动化**：自动规划并执行复杂的数据转换流程

该框架适用于需要对结构化数据进行转换、清洗和增强的各种场景，特别是当业务逻辑难以用传统编程表达时，Datatune能够显著提升开发效率。

---

<a id='quickstart'></a>

## 快速开始

### 相关页面

相关主题：[Map操作](#map-operation), [Filter操作](#filter-operation), [Agent系统](#agent-system)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [README.md](https://github.com/vitalops/datatune/blob/main/README.md)
- [datatune/llm/llm.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/llm.py)
- [datatune/agent/agent.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/agent.py)
- [datatune/agent/__init__.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/__init__.py)
- [datatune/core/dask/filter_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/filter_dask.py)
- [datatune/core/dask/map_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)
- [datatune/llm/model_rate_limits.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/model_rate_limits.py)
</details>

# 快速开始

## 概述

datatune 是一个使用 LLM（大语言模型）进行数据转换和处理的 Python 库。它允许用户通过自然语言描述来执行复杂的数据操作任务，如数据映射、过滤和去重。快速开始指南旨在帮助用户在最短时间内上手 datatune，掌握其核心功能并完成基础的数据处理流程。

该库支持多种 LLM 后端，包括 OpenAI、Ollama、Azure、Mistral、Huggingface 和 VLLM，同时兼容 Dask 和 Ibis（支持 DuckDB、PostgreSQL、BigQuery 等）数据源。资料来源：[README.md](https://github.com/vitalops/datatune/blob/main/README.md)

## 环境准备

### 安装方式

datatune 可通过 pip 直接安装：

```bash
pip install datatune
```

资料来源：[README.md](https://github.com/vitalops/datatune/blob/main/README.md)

### LLM 配置

datatune 支持多种 LLM 提供商，需根据使用的模型进行相应配置：

| LLM 类型 | 初始化方式 | 默认模型 | 必需参数 |
|---------|-----------|---------|---------|
| OpenAI | `OpenAI(model_name="gpt-3.5-turbo")` | gpt-3.5-turbo | api_key（可选） |
| Ollama | `Ollama()` | gemma3:4b | api_base（默认 localhost:11434） |
| Azure | `Azure(model_name="...")` | - | api_key, api_base, api_version |
| Mistral | `Mistral(model_name="mistral/mistral-tiny")` | mistral/mistral-tiny | api_key |
| Huggingface | `Huggingface(model_name="...")` | - | api_key |
| VLLM | `VLLM(model_name="...")` | - | api_base |

资料来源：[datatune/llm/llm.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/llm.py)

## 核心工作流程

datatune 的数据处理遵循标准的三步流程：加载数据、执行转换、最终化结果。以下是完整的工作流程图：

```mermaid
graph TD
    A[读取数据源] --> B[map 或 filter 转换]
    B --> C{datatune.finalize]
    C --> D[执行 compute]
    D --> E[保存结果]
    
    F[LLM 实例] --> B
```

## 数据加载

### Dask 数据源

使用 Dask 读取 CSV 文件：

```python
import dask.dataframe as dd

df = dd.read_csv("products.csv")
```

### Ibis 数据源

使用 Ibis 连接 DuckDB 或其他数据库：

```python
import ibis

con = ibis.duckdb.connect("data.duckdb")
table = con.table("my_table")
```

资料来源：[README.md](https://github.com/vitalops/datatune/blob/main/README.md)

## map 操作

`dt.map()` 用于从现有列生成新的列，通过自然语言描述转换逻辑。

```python
import datatune as dt

mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)
```

### 参数说明

| 参数 | 类型 | 说明 |
|-----|------|-----|
| prompt | str | 自然语言描述转换规则 |
| output_fields | List[str] | 输出新列名列表 |
| input_fields | List[str] | 输入列名列表 |

资料来源：[README.md](https://github.com/vitalops/datatune/blob/main/README.md)

### 实现原理

map 操作内部通过 LLM 调用处理数据，将每行数据序列化为字典格式，附加到提示词中发送给 LLM，LLM 返回包含新字段的字典。实现文件位于 `datatune/core/dask/map_dask.py` 和 `datatune/core/ibis/map_ibis.py`。

map_dask.py 中的提示词格式如下：

```python
suffix = (
    f"{os.linesep}{os.linesep}"
    "Your response MUST be the entire input record as a valid Python dictionary in the format"
    "'index=<row_index>|{key1: value1, key2: value2, ...}'  with added keys of expected new fields if any."
)
```

资料来源：[datatune/core/dask/map_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)

## filter 操作

`dt.filter()` 用于根据条件过滤数据行。

```python
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)
```

### 参数说明

| 参数 | 类型 | 说明 |
|-----|------|-----|
| prompt | str | 自然语言描述过滤条件 |
| input_fields | List[str] | 输入列名列表 |

资料来源：[README.md](https://github.com/vitalops/datatune/blob/main/README.md)

### 实现原理

filter 操作使用特殊格式让 LLM 判断每行是否应保留。返回格式包含 `__filter__` 键，值为 `True` 表示保留，`False` 表示移除。

```python
suffix = (
    f"{os.linesep}{os.linesep}"
    "DECISION:Your response MUST be the entire input record as a Python dictionary..."
    "with added key called '__filter__' with value either True to KEEP the record or False to REMOVE it."
)
```

资料来源：[datatune/core/dask/filter_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/filter_dask.py)

## 最终化与保存结果

使用 `dt.finalize()` 完成所有计算并将结果转换为标准 DataFrame：

```python
result = dt.finalize(filtered)
result.compute().to_csv("electronics_products.csv")
```

`finalize()` 函数负责触发 Dask 计算图的实际执行，将延迟操作转换为最终结果。资料来源：[README.md](https://github.com/vitalops/datatune/blob/main/README.md)

## Agent 模式

对于复杂的、多步骤的数据处理任务，可使用 `dt.Agent` 让 AI 自动规划转换步骤：

```python
import datatune as dt
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo")
agent = dt.Agent(llm)

df = dd.read_csv("data.csv")

# 描述复杂需求，Agent 自动规划执行步骤
df = agent.do("Add ProfitMargin column and keep only African organizations", df)
result = dt.finalize(df)
```

### Agent 自动完成的工作

| 功能 | 说明 |
|-----|------|
| 操作类型判断 | 自动决定使用 map、filter 还是其他操作 |
| 步骤链式执行 | 自动串联多个转换步骤 |
| 复杂任务处理 | 支持从单一提示词执行多步操作 |
| 代码生成 | 生成 Python 代码和行级原语操作 |

资料来源：[README.md](https://github.com/vitalops/datatune/blob/main/README.md)

### Agent 系统提示词

Agent 基于以下系统提示词工作：

```
You are Datatune Agent, a powerful assistant designed to help users with data processing tasks.
You are capable of generating python code to perform various operations on data. Apart from python builtins, you have the following libraries avaiable in your runtime:
- pandas
- numpy
- dask
```

资料来源：[datatune/agent/__init__.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/__init__.py)

## 模型速率限制

datatune 内置了常见 OpenAI 模型的速率限制配置，确保 API 调用不会超出限制：

| 模型 | TPM（每分钟令牌数） | RPM（每分钟请求数） |
|-----|---------------------|---------------------|
| gpt-3.5-turbo | 200,000 | 500 |
| gpt-4 | 10,000 | 500 |
| gpt-4-turbo | 30,000 | 500 |
| gpt-4.1 | 30,000 | 500 |
| gpt-4.1-mini | 200,000 | 500 |
| gpt-4.1-nano | 200,000 | 500 |
| gpt-4o | 30,000 | 500 |

资料来源：[datatune/llm/model_rate_limits.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/model_rate_limits.py)

## 完整示例

以下是一个完整的快速开始示例，整合所有核心功能：

```python
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

# 1. 初始化 LLM
llm = OpenAI(model_name="gpt-3.5-turbo")

# 2. 加载数据
df = dd.read_csv("products.csv")

# 3. 使用 map 提取类别
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

# 4. 使用 filter 筛选电子产品
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)

# 5. 最终化并保存
result = dt.finalize(filtered)
result.compute().to_csv("electronics_products.csv")
```

## 后续步骤

- 查看完整文档：https://docs.datatune.ai/
- 参考示例代码：https://github.com/vitalops/datatune/tree/main/examples
- 加入社区 Discord：https://discord.gg/3RKA5AryQX

---

<a id='architecture'></a>

## 系统架构

### 相关页面

相关主题：[Agent系统](#agent-system), [LLM集成](#llm-integration), [数据源支持](#datasource)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [datatune/llm/llm.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/llm.py)
- [datatune/agent/agent.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/agent.py)
- [datatune/core/dask/map_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)
- [datatune/core/dask/filter_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/filter_dask.py)
- [datatune/core/deduplication.py](https://github.com/vitalops/datatune/blob/main/datatune/core/deduplication.py)
- [datatune/__init__.py](https://github.com/vitalops/datatune/blob/main/datatune/__init__.py)
</details>

# 系统架构

## 概述

Datatune 是一个基于大语言模型（LLM）的数据处理框架，其核心设计目标是通过自然语言描述实现对数据的转换、过滤和去重操作。系统采用模块化架构，将 LLM 交互、数据处理后端和智能代理三个层面解耦，支持 Dask 和 Ibis 两种数据处理后端，兼容 DuckDB、PostgreSQL、BigQuery 等多种数据源。

资料来源：[datatune/__init__.py:1-7]()

## 架构分层

Datatune 采用三层架构设计，从下至上依次为：

```mermaid
graph TD
    subgraph "表现层"
        Agent
    end
    
    subgraph "核心操作层"
        Map
        Filter
        Reduce
    end
    
    subgraph "后端抽象层"
        Dask后端
        Ibis后端
    end
    
    subgraph "LLM交互层"
        OpenAI
        Ollama
        VLLM
        Azure
    end
    
    Agent --> Map
    Agent --> Filter
    Map --> Dask后端
    Map --> Ibis后端
    Filter --> Dask后端
    Filter --> Ibis后端
    OpenAI --> Agent
    Ollama --> Agent
    VLLM --> Agent
    Azure --> Agent
```

### 各层职责

| 层级 | 组件 | 职责 |
|------|------|------|
| 表现层 | Agent | 理解用户意图，规划执行步骤，生成并执行代码 |
| 核心操作层 | Map/Filter/Reduce | 提供声明式数据转换接口 |
| 后端抽象层 | Dask/Ibis | 实现分布式数据处理逻辑 |
| LLM交互层 | 各LLM实现类 | 封装与不同语言模型的通信协议 |

资料来源：[datatune/agent/agent.py:1-15]()

## LLM 交互层

### 抽象基类设计

`LLM` 是所有 LLM 提供者的抽象基类，通过 `litellm` 库实现统一的模型调用接口：

```python
class LLM:
    def __init__(self, model_name: str, **kwargs) -> None:
        self.model_name = model_name  # 格式: "provider/model"
        self._base_model_name = model_name.split("/", 1)[1]
```

资料来源：[datatune/llm/llm.py:17-24]()

### 支持的 LLM 提供者

| 提供者 | 类名 | 默认模型 | 特点 |
|--------|------|----------|------|
| OpenAI | `OpenAI` | gpt-3.5-turbo | 云端 API |
| Ollama | `Ollama` | gemma3:4b | 本地部署 |
| VLLM | `VLLM` | 需指定 | 高性能推理 |
| Azure | `Azure` | 需指定 | 企业级部署 |

资料来源：[datatune/llm/llm.py:65-108]()

### 模型速率限制

系统内置了主流模型的速率限制配置，包含每分钟请求数（RPM）和每分钟令牌数（TPM）：

```python
model_rate_limits = {
    "gpt-3.5-turbo": {"tpm": 200_000, "rpm": 500},
    "gpt-4": {"tpm": 10_000, "rpm": 500},
    "gpt-4o": {"tpm": 30_000, "rpm": 500},
}
```

资料来源：[datatune/llm/model_rate_limits.py:1-20]()

## 核心操作层

### Map 操作

Map 操作通过 LLM 将现有列转换为新列，实现数据转换功能：

```mermaid
graph LR
    A[输入DataFrame] --> B[序列化输入列]
    B --> C[LLM批量推理]
    C --> D[解析输出字典]
    D --> E[写入新列]
    E --> F[输出DataFrame]
```

关键流程包括：
1. 将输入列序列化为字符串格式
2. 调用 LLM 批量处理，根据 prompt 生成映射结果
3. 解析 LLM 返回的 Python 字典格式输出
4. 将结果写入新生成的列

资料来源：[datatune/core/dask/map_dask.py:1-50]()

### Filter 操作

Filter 操作根据 LLM 生成的决策条件过滤数据行：

```mermaid
graph LR
    A[输入DataFrame] --> B[序列化输入列]
    B --> C[LLM决策判断]
    C --> D[解析__filter__字段]
    D --> E[布尔索引过滤]
    E --> F[输出DataFrame]
```

LLM 响应格式要求包含 `__filter__` 字段，值为 `True` 表示保留，`False` 表示移除：

```
index=0|{key1: value1, key2: value2, '__filter__': True}<endofrow>
```

资料来源：[datatune/core/dask/filter_dask.py:1-60]()

### 语义去重

`SemanticDeduplicator` 使用嵌入向量和 FAISS 索引实现语义级别的去重：

```mermaid
graph TD
    A[原始数据] --> B[嵌入生成]
    B --> C[FAISS HNSW索引]
    C --> D[流式聚类]
    D --> E[重复组识别]
    E --> F[规范ID映射]
```

去重器支持以下参数：

| 参数 | 说明 | 默认值 |
|------|------|--------|
| embedding_model | 嵌入模型名称 | text-embedding-3-small |
| sim_threshold | 相似度阈值 | 0.90 |
| top_k | 最近邻数量 | 50 |
| hnsw_m | HNSW 索引参数 | 32 |
| ef_search | 搜索精度参数 | 64 |

资料来源：[datatune/core/deduplication.py:1-80]()

## Agent 代理层

### Agent 执行流程

Agent 是系统的智能编排层，能够自动分析用户需求并生成执行计划：

```mermaid
graph TD
    A[用户自然语言描述] --> B[LLM生成执行计划]
    B --> C{计划步骤循环}
    C -->|步骤类型为dask| D[执行Dask操作模板]
    C -->|步骤类型为primitive| E[执行Primitive操作]
    D --> F[记录步骤日志]
    E --> F
    F --> G{还有更多步骤?}
    G -->|是| C
    G -->|否| H[计算并返回结果]
```

### 执行计划结构

Agent 生成的执行计划是一个 JSON 数组，每个步骤包含以下字段：

| 字段 | 类型 | 说明 |
|------|------|------|
| type | string | "dask" 或 "primitive" |
| operation | string | 操作名称 |
| params | dict | Dask 模板参数 |
| subprompt | string | Primitive 操作的 LLM 提示词 |
| input_fields | list | 输入列名 |
| output_fields | list | 输出列名（仅 Map） |

资料来源：[datatune/agent/agent.py:80-120]()

### 操作模板

Agent 支持两种类型的操作模板：

#### Dask 操作模板

用于直接生成 Dask DataFrame 操作代码：

```python
TEMPLATE = {
    "dask": {
        "add_column": "df = df.assign({new_column}=...)",
        "group_by": "df = df.groupby(...).agg(...)",
        "rename_columns": "df = df.rename(columns={mapping})",
    }
}
```

#### Primitive 操作模板

用于通过 LLM 生成逐行转换逻辑：

```python
TEMPLATE = {
    "primitive": {
        "Map": "df = dt.Map(subprompt=...)",
        "Filter": "df = dt.Filter(subprompt=...)",
    }
}
```

资料来源：[datatune/agent/agent.py:25-50]()

## 数据流处理

### Dask 分区处理

系统使用 `map_partitions` 实现分布式数据处理，保证每个分区独立执行 LLM 操作：

```python
code_lines += [
    f"df = df.map_partitions(log_primitive, {start_msg!r}, meta=df._meta)",
    "step_num += 1",
    template,
    f"df = df.map_partitions(log_primitive, {end_msg!r}, meta=df._meta)"
]
```

资料来源：[datatune/agent/agent.py:55-75]()

### 重复记录处理

在 Map 和 Filter 操作中，系统采用规范 ID 映射策略处理重复记录：

```python
dup_to_canon = {
    dup: c["canonical_id"]
    for c in clusters
    for dup in c["duplicate_ids"]
}

canonical_idx = input_series.index.difference(dup_to_canon.keys())
canonical_input = input_series.loc[canonical_idx]
llm_out = llm(canonical_input, ...)

# 将结果回填到重复记录
for dup, canon in dup_to_canon.items():
    df.loc[dup, llm_output_column] = df.loc[canon, llm_output_column]
```

资料来源：[datatune/core/dask/map_dask.py:30-55]()

## 公共 API

系统通过 `datatune/__init__.py` 导出以下核心组件：

```python
from datatune.agent.agent import Agent
from datatune.core.filter import filter
from datatune.core.map import map
from datatune.core.dask.op import finalize
from datatune.core.deduplication import SemanticDeduplicator
from datatune.core.reduce import reduce

__all__ = ["map", "filter", "finalize", "Agent", "reduce"]
```

资料来源：[datatune/__init__.py:1-10]()

## 错误处理机制

### 步骤级错误追踪

Agent 在执行计划时维护步骤计数器，失败时返回错误信息和失败步骤编号：

```python
def _execute_plan(self, plan: List[Dict]):
    self.runtime.update({"step_num": 0, "plan": plan})
    for i, step in enumerate(plan, start=1):
        try:
            # 执行步骤...
            self.runtime["step_num"] = i
        except Exception as e:
            return str(e), i - 1
```

资料来源：[datatune/agent/agent.py:45-70]()

### 输出解析容错

LLM 输出解析使用 `ast.literal_eval` 进行安全评估，解析失败时返回空字典：

```python
def safe_parse(row):
    try:
        return ast.literal_eval(row)
    except Exception:
        return {}
```

资料来源：[datatune/core/deduplication.py:60-65]()

## 总结

Datatune 的系统架构体现了以下设计原则：

1. **层次化解耦**：LLM 交互、数据处理和用户接口三层分离，便于维护和扩展
2. **多后端支持**：通过 Dask 和 Ibis 抽象，支持从单机到分布式多种部署场景
3. **智能编排**：Agent 层自动规划执行路径，降低用户使用门槛
4. **容错设计**：完善的错误处理和日志记录机制，便于问题诊断
5. **语义去重**：基于嵌入向量的语义匹配，实现精准的数据去重

---

<a id='datamanagement'></a>

## 数据管理与数据流

### 相关页面

相关主题：[数据源支持](#datasource), [Map操作](#map-operation), [Filter操作](#filter-operation)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [datatune/core/dask/map_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)
- [datatune/core/dask/filter_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/filter_dask.py)
- [datatune/core/ibis/map_ibis.py](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/map_ibis.py)
- [datatune/core/ibis/filter_ibis.py](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/filter_ibis.py)
- [datatune/core/deduplication.py](https://github.com/vitalops/datatune/blob/main/datatune/core/deduplication.py)
- [datatune/agent/agent.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/agent.py)
- [datatune/core/reduce.py](https://github.com/vitalops/datatune/blob/main/datatune/core/reduce.py)
- [datatune/llm/llm.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/llm.py)
</details>

# 数据管理与数据流

## 概述

Datatune 是一个基于 LLM 的数据处理库，通过自然语言提示词实现数据的映射（Map）、过滤（Filter）和去重（Deduplication）操作。系统支持多种后端数据源，包括 Dask DataFrame 和 Ibis（支持 DuckDB、PostgreSQL、BigQuery 等），并通过统一的 API 接口实现数据流的透明处理。

核心设计理念是将复杂的数据转换逻辑封装为可组合的操作符，用户只需通过自然语言描述期望的转换结果，系统即可自动生成并执行相应的处理流程。

## 架构概览

```mermaid
graph TD
    subgraph "数据输入层"
        CSV[CSV 文件]
        SQLDB[(SQL 数据库)]
        DuckDB[(DuckDB)]
    end
    
    subgraph "抽象接口层"
        DD[Dask DataFrame]
        IB[Ibis Table]
    end
    
    subgraph "核心操作层"
        MAP[Map 操作]
        FILTER[Filter 操作]
        DEDUP[去重操作]
        REDUCE[Reduce 操作]
    end
    
    subgraph "LLM 服务层"
        OPENAI[OpenAI]
        OLLAMA[Ollama]
        AZURE[Azure]
        VLLM[VLLM]
    end
    
    CSV --> DD
    SQLDB --> IB
    DuckDB --> IB
    
    DD --> MAP
    DD --> FILTER
    DD --> DEDUP
    IB --> MAP
    IB --> FILTER
    
    MAP --> LLM
    FILTER --> LLM
    
    LLM --> OPENAI
    LLM --> OLLAMA
    LLM --> AZURE
    LLM --> VLLM
```

## 数据源支持

Datatune 通过抽象数据类型检测机制自动选择合适的后端实现。

### 支持的数据源

| 数据源类型 | 后端实现 | 导入方式 |
|-----------|---------|---------|
| CSV/Parquet | Dask | `dask.dataframe as dd` |
| DuckDB | Ibis | `ibis.duckdb.connect()` |
| PostgreSQL | Ibis | `ibis.postgres.connect()` |
| BigQuery | Ibis | `ibis.bigquery.connect()` |

### 数据类型检测

数据类型检测通过 `__init__.py` 中的内部函数实现：

```python
def _is_ibis_table(obj):
    import ibis
    return isinstance(obj, ibis.Table)

def _is_dask_df(obj):
    import dask.dataframe as dd
    return isinstance(obj, dd.DataFrame)
```

资料来源：[datatune/core/map.py:1-16](https://github.com/vitalops/datatune/blob/main/datatune/core/map.py)

## Map 操作数据流

Map 操作是 Datatune 的核心功能之一，用于根据自然语言提示词生成新的数据列。

### 数据处理流程

```mermaid
graph LR
    A[输入 DataFrame] --> B[序列化输入列]
    B --> C[去重预处理]
    C --> D[批量调用 LLM]
    D --> E[解析 LLM 输出]
    E --> F[合并去重结果]
    F --> G[输出包含新列的 DataFrame]
```

### Dask 后端 Map 实现

Dask 后端的 Map 操作通过 `_map_dask` 函数实现，采用延迟计算和分区处理策略：

```python
def _map_dask(prompt, output_fields, input_fields=None, clusters=None):
    # 1. 序列化输入列为 JSON 字符串
    serialized_input_column = "serialized_input"
    df[serialized_input_column] = df[input_fields].apply(
        lambda x: json.dumps(x.to_dict()), axis=1, meta=(serialized_input_column, str)
    )
    
    # 2. 构建 LLM 调用前缀和后缀
    prefix = "..."
    suffix = "Your response MUST be the entire input record..."
    
    # 3. 处理重复数据的预处理
    dup_to_canon = {
        dup: c["canonical_id"]
        for c in clusters
        for dup in c["duplicate_ids"]
    }
    canonical_idx = input_series.index.difference(dup_to_canon.keys())
    canonical_input = input_series.loc[canonical_idx]
    
    # 4. 批量调用 LLM
    llm_out = llm(canonical_input, prefix, prompt, suffix, optimized=True)
    
    # 5. 将 LLM 结果写回 DataFrame
    df.loc[canonical_idx, llm_output_column] = llm_out
    
    # 6. 复制去重结果到重复行
    for dup, canon in dup_to_canon.items():
        df.loc[dup, llm_output_column] = df.loc[canon, llm_output_column]
```

资料来源：[datatune/core/dask/map_dask.py:1-45](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)

### Ibis 后端 Map 实现

Ibis 后端采用不同的实现方式，使用行号索引和内存表进行数据处理：

```python
def _map_ibis(table, llm, input_col, output_col, prompt, expected_new_fields):
    # 添加行号索引
    indexed_table = table.mutate(_ROW_ID_=ibis.row_number().cast("int64"))
    
    # 将数据拉到本地执行
    local_data = indexed_table.select("_ROW_ID_", input_col).execute()
    input_list = local_data[input_col].tolist()
    
    # 调用 LLM
    raw_results = llm(input_list, prefix, mapping_prompt, suffix, optimized=True)
    
    # 解析结果并处理异常
    processed_results = []
    for res in raw_results:
        try:
            py_dict = ast.literal_eval(str(res).strip())
            processed_results.append(json.dumps(py_dict))
        except Exception:
            processed_results.append("{}")
    
    # 创建内存表并关联
    mapping_df = pd.DataFrame({
        "_ROW_ID_": local_data["_ROW_ID_"].values, 
        output_col: processed_results
    })
    mapping_table = ibis.memtable(mapping_df)
```

资料来源：[datatune/core/ibis/map_ibis.py:1-50](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/map_ibis.py)

### LLM 输出格式规范

Map 操作要求 LLM 返回特定格式的输出：

```
index=<row_index>|{key1: value1, key2: value2, ...}
```

- 必须以 `index=<row_index>|` 开头
- 主体必须是有效的 Python 字典格式
- 新增字段必须使用双引号
- 缺失值使用 `None` 表示

资料来源：[datatune/core/dask/map_dask.py:15-25](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)

## Filter 操作数据流

Filter 操作根据自然语言条件过滤数据行。

### 过滤处理流程

```mermaid
graph LR
    A[输入 DataFrame] --> B[序列化数据行]
    B --> C[构建过滤提示词]
    C --> D[批量调用 LLM]
    D --> E[解析过滤决策]
    E --> F[应用过滤掩码]
    F --> G[输出过滤后的 DataFrame]
```

### Dask 后端 Filter 实现

```python
def _filter_dask(df, llm, input_fields, prompt, clusters=None):
    # 序列化输入数据
    serialized_input_column = "serialized_input"
    df[serialized_input_column] = df[input_fields].apply(
        lambda x: json.dumps(x.to_dict()), axis=1, meta=(serialized_input_column, str)
    )
    
    # 构建过滤提示词
    suffix = (
        f"{os.linesep}{os.linesep}"
        "DECISION:Your response MUST be the entire input record..."
        "with added key called '__filter__' with value either True to KEEP "
        "the record or False to REMOVE it."
    )
    
    # 去重处理
    dup_to_canon = {
        dup: c["canonical_id"]
        for c in clusters
        for dup in c["duplicate_ids"]
    }
    canonical_idx = input_series.index.difference(dup_to_canon.keys())
    canonical_input = input_series.loc[canonical_idx]
    
    # 调用 LLM 获取过滤决策
    llm_out = llm(canonical_input, prefix, prompt, suffix, optimized=True)
    
    # 合并去重结果
    for dup, canon in dup_to_canon.items():
        df.loc[dup, llm_output_column] = df.loc[canon, llm_output_column]
```

资料来源：[datatune/core/dask/filter_dask.py:1-40](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/filter_dask.py)

### LLM 过滤决策格式

Filter 操作要求 LLM 返回带有 `__filter__` 键的字典：

```
index=<row_index>|{key1: value1, key2: value2, ..., '__filter__': True}
```

- `__filter__: True` 表示保留该行
- `__filter__: False` 表示移除该行

资料来源：[datatune/core/dask/filter_dask.py:18-24](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/filter_dask.py)

## 去重机制

Datatune 实现了基于语义相似度的去重功能，用于优化 LLM 调用并保持数据一致性。

### 去重处理架构

```mermaid
graph TD
    A[输入分区数据] --> B[Embedding 向量化]
    B --> C[构建 FAISS HNSW 索引]
    C --> D[相似度搜索]
    D --> E[聚类重复数据]
    E --> F[合并重复行结果]
```

### 去重处理流程

```python
def _embed_and_write_partition(self, part, partition_id, output_dir):
    # 解析行为字典
    dicts = [safe_parse(row) for row in pdf]
    
    # 构建文本表示
    texts = [
        ", ".join(f"{k}: {v}" for k, v in d.items())
        for d in dicts
    ]
    
    # 批量生成 Embedding
    for i in range(0, len(texts), 256):
        batch = texts[i:i+256]
        resp = embedding(model=self.embedding_model, input=batch)
        embeddings.extend([item["embedding"] for item in resp["data"]])
    
    # 归一化并存储
    X = np.asarray(embeddings, dtype="float32")
    faiss.normalize_L2(X)
    np.save(f"{output_dir}/embeddings_part_{partition_id}.npy", X)
```

资料来源：[datatune/core/deduplication.py:1-60](https://github.com/vitalops/datatune/blob/main/datatune/core/deduplication.py)

### 去重配置参数

| 参数 | 默认值 | 说明 |
|-----|-------|------|
| `embedding_model` | `text-embedding-3-small` | 用于向量化的 Embedding 模型 |
| `sim_threshold` | `0.90` | 相似度阈值，超过此值视为重复 |
| `top_k` | `50` | 搜索时返回的最近邻数量 |
| `hnsw_m` | `32` | HNSW 索引参数 M |
| `ef_search` | `64` | HNSW 搜索参数 |

资料来源：[datatune/core/deduplication.py:1-25](https://github.com/vitalops/datatune/blob/main/datatune/core/deduplication.py)

## Reduce 操作

Reduce 操作通过注册-执行模式实现数据的聚合转换。

### 架构设计

```mermaid
graph LR
    A[定义 Action] --> B[注册到全局表]
    B --> C[调用 reduce 函数]
    C --> D[查找并实例化 Action]
    D --> E[执行数据处理]
```

### Action 注册机制

```python
_ACTIONS = {}

def register_action(name):
    def decorator(cls):
        _ACTIONS[name] = cls
        return cls
    return decorator

def get_action(name):
    try:
        return _ACTIONS[name]
    except KeyError:
        raise ValueError(f"Unknown action: {name}")

def reduce(df, *, action: str, **kwargs):
    cls = get_action(action)
    reducer = cls(**kwargs)   
    return reducer(df)
```

资料来源：[datatune/core/reduce.py:1-25](https://github.com/vitalops/datatune/blob/main/datatune/core/reduce.py)

## Agent 编排层

Agent 是 Datatune 的智能编排组件，能够根据用户自然语言描述自动规划并执行数据处理流程。

### Agent 工作流

```mermaid
graph TD
    A[用户描述任务] --> B[规划器分析意图]
    B --> C{判断操作类型}
    C -->|需要语义理解| D[使用 Primitive 操作]
    C -->|纯数据转换| E[使用 Dask 操作]
    D --> F[执行 Map/Filter]
    E --> G[执行 add_column/group_by 等]
    F --> H[合并结果]
    G --> H
    H --> I[返回处理后的 DataFrame]
```

### 计划执行流程

```python
def _execute_plan(self, plan: List[Dict]):
    self._set_df(self.df)
    runtime.update({"step_num": 0, "plan": plan})
    
    for i, step in enumerate(plan, start=1):
        step_type = step["type"]
        operation = step["operation"]
        params = step["params"]
        
        if step_type == "dask":
            template = self.TEMPLATE["dask"][step["operation"]].format(**params)
            self.runtime.execute(template + "\n_ = df.head()")
        elif step_type == "primitive":
            template = self.TEMPLATE["primitive"][step["operation"]].format(**params)
            self.runtime.execute(template)
```

资料来源：[datatune/agent/agent.py:1-80](https://github.com/vitalops/datatune/blob/main/datatune/agent/agent.py)

### 支持的操作类型

#### Primitive 操作

| 操作类型 | 说明 | 参数 |
|---------|------|------|
| `Map` | 从文本生成新列 | `subprompt`, `input_fields`, `output_fields` |
| `Filter` | 根据条件过滤行 | `subprompt`, `input_fields` |

#### Dask 操作

| 操作类型 | 说明 | 参数 |
|---------|------|------|
| `add_column` | 从表达式添加列 | `new_column`, `expression` |
| `group_by_agg` | 分组聚合 | `group_columns`, `aggregations` |
| `rename_columns` | 重命名列 | `mapping` |
| `astype_column` | 改变列类型 | `column`, `dtype` |

资料来源：[datatune/agent/agent.py:50-100](https://github.com/vitalops/datatune/blob/main/datatune/agent/agent.py)

## LLM 接口层

LLM 模块提供了统一的接口，支持多种大语言模型服务。

### 支持的 LLM 提供商

| 提供商 | 类名 | 默认模型 | 配置参数 |
|-------|------|---------|---------|
| OpenAI | `OpenAI` | `gpt-3.5-turbo` | `api_key` |
| Ollama | `Ollama` | `gemma3:4b` | `api_base` |
| Azure | `Azure` | 自定义 | `api_key`, `api_base`, `api_version` |
| VLLM | `VLLM` | 自定义 | `api_base`, `max_tokens` |

### 批处理机制

LLM 模块实现了智能批处理，根据输入长度动态分组请求以优化 API 调用：

```python
# token 计数估算
prefix_suffix_tokens = token_counter(model_name, messages=message(""))
total_ntokens = prefix_suffix_tokens

for i, prompt in enumerate(input_rows):
    # 动态批处理逻辑
    # 当累计 token 接近限制时执行当前批次
```

资料来源：[datatune/llm/llm.py:1-60](https://github.com/vitalops/datatune/blob/main/datatune/llm/llm.py)

## API 参考

### 核心函数

#### map 函数

```python
def map(*, prompt, output_fields, input_fields=None, clusters=None):
    def apply(llm, data):
        if _is_dask_df(data):
            from .dask.map_dask import _map_dask
            return _map_dask(prompt=prompt, output_fields=output_fields, 
                           input_fields=input_fields, clusters=clusters)(llm, data)
        elif _is_ibis_table(data):
            from .ibis.map_ibis import _map_ibis
            return _map_ibis(prompt=prompt, output_fields=output_fields,
                            input_fields=input_fields)(llm, data)
    return apply
```

资料来源：[datatune/core/map.py:20-38](https://github.com/vitalops/datatune/blob/main/datatune/core/map.py)

#### filter 函数

```python
def filter(*, prompt, input_fields=None, clusters=None):
    def apply(llm, data):
        if _is_dask_df(data):
            from .dask.filter_dask import _filter_dask
            return _filter_dask(prompt=prompt, input_fields=input_fields,
                               clusters=clusters)(llm, data)
        elif _is_ibis_table(data):
            from .ibis.filter_ibis import _filter_ibis
            return _filter_ibis(prompt=prompt, input_fields=input_fields)(llm, data)
    return apply
```

资料来源：[datatune/core/filter.py:20-36](https://github.com/vitalops/datatune/blob/main/datatune/core/filter.py)

### 模块导出

```python
from datatune import map, filter, finalize, Agent, reduce
```

资料来源：[datatune/__init__.py:1-8](https://github.com/vitalops/datatune/blob/main/datatune/__init__.py)

## 完整使用示例

```python
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

# 初始化 LLM
llm = OpenAI(model_name="gpt-3.5-turbo")

# 读取数据
df = dd.read_csv("products.csv")

# Map: 提取分类信息
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

# Filter: 保留电子产品
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)

# 获取最终结果
result = dt.finalize(filtered)
result.compute().to_csv("electronics_products.csv")

---

<a id='map-operation'></a>

## Map操作

### 相关页面

相关主题：[Filter操作](#filter-operation), [Reduce操作](#reduce-operation)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [datatune/core/map.py](https://github.com/vitalops/datatune/blob/main/datatune/core/map.py)
- [datatune/core/dask/map_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)
- [datatune/core/ibis/map_ibis.py](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/map_ibis.py)
- [datatune/llm/llm.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/llm.py)
- [datatune/agent/agent.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/agent.py)
</details>

# Map操作

## 概述

Map操作是datatune库的核心功能之一，它通过大型语言模型（LLM）实现对数据列的语义转换和增强。用户只需提供自然语言描述的转换规则，Map操作即可自动解析数据、调用LLM生成新字段，并返回增强后的数据集。

该操作支持Dask和Ibis两种后端，能够处理分布式数据源（如CSV、数据库表等），同时内置去重优化机制，避免对语义相同的记录重复调用LLM API，从而显著降低使用成本。

## 架构设计

### 模块层次

```
datatune.core.map (顶层接口)
    ├── datatune.core.dask.map_dask (Dask后端实现)
    └── datatune.core.ibis.map_ibis (Ibis后端实现)
```

### 执行流程

```mermaid
graph TD
    A[开始Map操作] --> B[序列化输入列]
    B --> C{启用去重?}
    C -->|是| D[构建重复记录映射]
    C -->|否| E[跳过去重]
    D --> F[提取唯一记录索引]
    E --> G[调用LLM API]
    F --> G
    G --> H[解析LLM输出]
    H --> I[填充结果到DataFrame]
    I --> J[返回增强后的数据]
    D --> K[复制结果到重复记录]
    K --> J
```

## 核心实现

### Dask后端 map_dask.py

#### 去重优化机制

Dask后端的Map操作实现了智能去重功能，通过`SemanticDeduplicator`预先识别语义相似的记录。当存在重复记录群组时，系统仅对每个群组的**canonical（规范）记录**调用LLM，然后将结果复制到群组内的其他重复记录。

```python
dup_to_canon = {
    dup: c["canonical_id"]
    for c in clusters
    for dup in c["duplicate_ids"]
}

canonical_idx = input_series.index.difference(dup_to_canon.keys())
canonical_input = input_series.loc[canonical_idx]

llm_out = llm(
    canonical_input,
    prefix,
    prompt,
    suffix,
    optimized=True
)

df.loc[canonical_idx, llm_output_column] = llm_out

for dup, canon in dup_to_canon.items():
    df.loc[dup, llm_output_column] = df.loc[canon, llm_output_column]
```

资料来源：[datatune/core/dask/map_dask.py:25-46](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)

#### LLM调用与输出解析

Map操作使用特定的输出格式约束，确保LLM返回可解析的Python字典：

```python
suffix = (
    f"{os.linesep}{os.linesep}"
    "Your response MUST be the entire input record as a valid Python dictionary in the format"
    "'index=<row_index>|{key1: value1, key2: value2, ...}'  with added keys of expected new fields if any."
     
    "ALWAYS START YOUR RESPONSE WITH 'index=<row_index>|' WHERE <row_index> IS THE INDEX OF THE ROW." \
    "IF A VALUE FOR A COLUMN DOES NOT EXIST SET IT TO null" \
    "'index=<row_index>|{key1: None, key2: value2, ...}'"
)
```

输出解析通过`parse_llm_output`函数处理，将LLM返回的字符串转换为Python字典对象。

资料来源：[datatune/core/dask/map_dask.py:8-17](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)

### Ibis后端 map_ibis.py

#### 行号索引机制

Ibis后端为每个输入记录添加行号索引，以便后续结果关联：

```python
indexed_table = table.mutate(_ROW_ID_=ibis.row_number().cast("int64"))

local_data = indexed_table.select("_ROW_ID_", input_col).execute()

input_list = local_data[input_col].tolist()

raw_results = llm(input_list, prefix, mapping_prompt, suffix, optimized=True)
```

资料来源：[datatune/core/ibis/map_ibis.py:35-45](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/map_ibis.py)

#### 结果转换与关联

解析后的结果被转换为内存表并与原表JOIN：

```python
processed_results = []
for res in raw_results:
    try:
        py_dict = ast.literal_eval(str(res).strip())
        processed_results.append(json.dumps(py_dict))
    except Exception:
        processed_results.append("{}")

mapping_df = pd.DataFrame({
    "_ROW_ID_": local_data["_ROW_ID_"].values, 
    output_col: processed_results
})
mapping_table = ibis.memtable(mapping_df)
```

资料来源：[datatune/core/ibis/map_ibis.py:47-60](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/map_ibis.py)

## Agent集成

### Plan定义格式

在Agent的Plan中，Map操作作为primitive步骤声明：

```json
{
    "type": "primitive",
    "operation": "map",
    "params": {
        "subprompt": "Extract category and sub-category from industry",
        "input_fields": ["Industry"],
        "output_fields": ["Category", "Sub-Category"]
    }
}
```

资料来源：[datatune/agent/agent.py:78-85](https://github.com/vitalops/datatune/blob/main/datatune/agent/agent.py)

### 步骤类型决策规则

Agent根据以下规则选择Map操作：

| 条件 | 选择操作类型 |
|------|-------------|
| 需要自然语言理解、语义提取、分类 | primitive (Map/Filter) |
| 涉及多列语义推理 | primitive (Map) |
| 简单的数值/列操作 | dask |
| 复杂多步骤转换 | 优先使用dask，后续步骤依赖新列时可使用primitive |

## 使用示例

### 基础用法

```python
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

# 提取类别信息
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

# 完成转换并保存
result = dt.finalize(mapped)
result.compute().to_csv("mapped_products.csv")
```

### 与Agent结合使用

```python
import datatune as dt
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo")
agent = dt.Agent(llm)

df = agent.do("Add ProfitMargin column and keep only African organizations", df)
result = dt.finalize(df)
```

## API参数说明

### dt.map 函数参数

| 参数 | 类型 | 必填 | 说明 |
|------|------|------|------|
| prompt | str | 是 | 自然语言描述的转换规则 |
| input_fields | List[str] | 是 | 输入列名列表 |
| output_fields | List[str] | 否 | 新生成的输出列名列表 |
| optimized | bool | 否 | 是否启用去重优化，默认为True |

### LLM类配置

datatune支持多种LLM后端，各后端的初始化参数如下：

| LLM类 | 主要参数 | 默认值 |
|-------|---------|--------|
| OpenAI | model_name, api_key | gpt-3.5-turbo |
| Ollama | model_name, api_base | gemma3:4b, http://localhost:11434 |
| Azure | model_name, api_key, api_base, api_version | - |
| VLLM | model_name, api_base | - , http://localhost:8000/v1 |

资料来源：[datatune/llm/llm.py:48-95](https://github.com/vitalops/datatune/blob/main/datatune/llm/llm.py)

## 速率限制

datatune内置了常用模型的速率限制配置，支持tpm（每分钟Token数）和rpm（每分钟请求数）两个维度的控制。

| 模型 | TPM | RPM |
|------|-----|-----|
| gpt-3.5-turbo | 200,000 | 500 |
| gpt-4 | 10,000 | 500 |
| gpt-4-turbo | 30,000 | 500 |
| gpt-4.1-mini | 200,000 | 500 |

资料来源：[datatune/llm/model_rate_limits.py:1-30](https://github.com/vitalops/datatune/blob/main/datatune/llm/model_rate_limits.py)

## 技术特性

### 语义去重优化

当数据中存在语义重复的记录时，Map操作通过`SemanticDeduplicator`识别并合并：

```mermaid
graph LR
    A[原始数据] --> B[语义去重]
    B --> C[Canonical记录]
    B --> D[重复记录映射]
    C --> E[单次LLM调用]
    D --> F[结果复制]
    E --> G[最终结果]
    F --> G
```

此机制可节省高达50%-90%的API调用成本，同时保持结果准确性。

### 分布式处理

Map操作基于Dask或Ibis实现，支持大规模数据集的分布式处理：

- **Dask后端**：利用Dask DataFrame的分区机制，每个分区独立处理
- **Ibis后端**：支持DuckDB、PostgreSQL、BigQuery等多种数据库后端

## 最佳实践

1. **批量输入优化**：尽量将相关记录放在一起处理，提高去重效果
2. **明确输出字段**：在`output_fields`中清晰指定新列名，便于后续操作
3. **选择合适模型**：简单提取任务可使用gpt-3.5-turbo，复杂推理建议使用gpt-4
4. **结果验证**：首次使用新prompt时建议抽样验证输出质量

## 相关模块

- **Filter操作** (`datatune/core/filter.py`)：基于LLM的过滤功能
- **Agent** (`datatune/agent/agent.py`)：自动规划并执行Map/Filter/dask操作
- **SemanticDeduplicator** (`datatune/core/deduplication.py`)：语义去重功能
- **finalize** (`datatune/core/dask/op.py`)：完成Lazy DataFrame计算

---

<a id='filter-operation'></a>

## Filter操作

### 相关页面

相关主题：[Map操作](#map-operation), [Reduce操作](#reduce-operation)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [datatune/core/filter.py](https://github.com/vitalops/datatune/blob/main/datatune/core/filter.py)
- [datatune/core/dask/filter_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/filter_dask.py)
- [datatune/core/ibis/filter_ibis.py](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/filter_ibis.py)
</details>

# Filter操作

## 概述

Filter操作是datatune库中用于基于自然语言描述对数据进行行级过滤的核心功能。该操作允许用户使用LLM（大型语言模型）根据语义理解来判断哪些数据行应该被保留或移除，无需编写复杂的条件表达式。

Filter操作的本质是将用户的自然语言过滤条件翻译为布尔判断，通过LLM的语义理解能力实现智能化的数据筛选。系统会在每条数据记录中添加一个`__filter__`字段，值为`True`表示保留该记录，`False`表示移除该记录。

资料来源：[datatune/core/filter.py:1-30]()

## 架构设计

### 模块结构

Filter操作采用后端分发的设计模式，根据输入数据的类型自动选择相应的处理后端。顶层模块`filter.py`负责类型检测和后端路由，具体实现分布在Dask和Ibis两个后端模块中。

```mermaid
graph TD
    A[用户调用 dt.filter] --> B{数据类型检测}
    B -->|Dask DataFrame| C[_filter_dask]
    B -->|Ibis Table| D[_filter_ibis]
    B -->|不支持的类型| E[TypeError]
    
    C --> F[LLM批量推理]
    D --> F
    
    F --> G[解析LLM输出]
    G --> H[生成过滤序列]
    H --> I[执行过滤操作]
```

资料来源：[datatune/core/filter.py:16-29]()

### 后端支持

| 后端类型 | 处理模块 | 支持的数据源 |
|---------|---------|-------------|
| Dask | `datatune/core/dask/filter_dask.py` | Dask DataFrame |
| Ibis | `datatune/core/ibis/filter_ibis.py` | Ibis Table（DuckDB、PostgreSQL、BigQuery等） |

资料来源：[datatune/core/filter.py:18-27]()

## API接口

### 函数签名

```python
def filter(*, prompt, input_fields=None, clusters=None):
    def apply(llm, data):
        # 后端分发逻辑
        pass
    return apply
```

### 参数说明

| 参数 | 类型 | 必填 | 说明 |
|-----|------|------|------|
| `prompt` | str | 是 | 自然语言描述的过滤条件 |
| `input_fields` | List[str] | 否 | 要参与过滤判断的输入列（当前版本中未实际使用） |
| `clusters` | List[Dict] | 否 | 用于去重场景的聚类信息，避免重复调用LLM |

资料来源：[datatune/core/filter.py:12-28]()

### 使用示例

```python
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

# 使用自然语言过滤数据
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, df)

result = dt.finalize(filtered)
```

资料来源：[README.md:1-45]()

## 工作流程

### 整体流程

Filter操作的处理流程分为以下几个阶段：

```mermaid
graph LR
    A[序列化输入数据] --> B[构建LLM提示词]
    B --> C[批量调用LLM]
    C --> D[解析LLM输出]
    D --> E[提取__filter__字段]
    E --> F[生成布尔序列]
    F --> G[执行数据过滤]
```

### 提示词构建

系统自动构建包含前缀、用户提示和后缀的完整提示词。

**前缀部分**包含输入数据格式规范：
```
Each answer must be formatted exactly as 'index=<index>|{answer}<endofrow>'.
{answer} must be any requested python literal (e.g. list, dict, string, integer)
eg: for a dict answer: index=1|{'key1': 'value1', 'key2': 2}<endofrow>
Ensure that each response is a valid python literal.
Ensure that strings are enclosed in double quotes.
STRINGS MUST BE ENCLOSED IN DOUBLE QUOTES. DO NOT OUTPUT BARE TEXT
```

**用户提示部分**包含过滤条件：
```
FILTERING CRITERIA:
{prompt}
```

**后缀部分**指定输出格式要求：
```
DECISION: Your response MUST be the entire input record as Python dictionary in the format: index=<row_index>|{key1: value1, key2: value2, ...}<endofrow> with added key called '__filter__' with value either True to KEEP the record or False to REMOVE it.
No explanations or additional text.
ALWAYS STICK TO THE FORMAT index=<row_index>|{key1: value1, key2: value2, ...}<endofrow> with added key called '__filter__' with value either True to KEEP the record or False to REMOVE it.
IF A VALUE FOR A COLUMN DOES NOT EXIST SET IT TO None
```

资料来源：[datatune/core/dask/filter_dask.py:1-30]()

### LLM响应格式

LLM返回的数据必须包含原始记录信息，并添加`__filter__`布尔字段：

```python
# 示例输入
{"Name": "Laptop", "Price": 999, "Category": "Electronics"}

# LLM期望输出格式
'index=0|{"Name": "Laptop", "Price": 999, "Category": "Electronics", "__filter__": true}<endofrow>'
```

## Dask后端实现

### 去重机制

Dask后端支持基于聚类的去重优化。当提供`clusters`参数时，系统会建立一个从重复ID到规范ID的映射：

```python
dup_to_canon = {
    dup: c["canonical_id"]
    for c in clusters
    for dup in c["duplicate_ids"]
}

canonical_idx = input_series.index.difference(dup_to_canon.keys())
canonical_input = input_series.loc[canonical_idx]
```

对于规范数据行调用LLM进行判断，然后将结果复制到所有重复行：

```python
for dup, canon in dup_to_canon.items():
    df.loc[dup, llm_output_column] = df.loc[canon, llm_output_column]
```

资料来源：[datatune/core/dask/filter_dask.py:25-50]()

### 过滤序列生成

使用`parse_filter_output`函数解析LLM输出：

```python
def parse_filter_output(
    output: Union[str, Exception], err: bool = True
) -> Optional[bool]:
    # 解析LLM返回的字典，提取__filter__字段值
```

资料来源：[datatune/core/dask/filter_dask.py:60-75]()

## Ibis后端实现

### 行号索引

Ibis后端通过`row_number()`函数为表添加行号索引：

```python
indexed_table = table.mutate(_ROW_ID_=ibis.row_number().cast("int64"))
local_data = indexed_table.select("_ROW_ID_", input_col).execute()
```

资料来源：[datatune/core/ibis/filter_ibis.py:20-25]()

### 数据处理

Ibis实现同样通过AST解析提取过滤决策，并将结果存储在内存表中进行关联：

```python
mapping_df = pd.DataFrame({
    "_ROW_ID_": local_data["_ROW_ID_"].values, 
    output_col: processed_results
})
mapping_table = ibis.memtable(mapping_df)
```

资料来源：[datatune/core/ibis/filter_ibis.py:35-45]()

## 与Agent的集成

Filter操作可以与datatune的Agent功能结合使用，让AI自动判断何时需要使用Filter操作：

```python
agent = dt.Agent(llm)
df = agent.do("Keep only African organizations", df)
```

在Agent的规划模板中，Filter操作被定义为primitive类型：

```python
"filter": textwrap.dedent(
    """\
    filtered = dt.filter(
        prompt="{subprompt}",
        input_fields={input_fields}
    )(llm, df)
    df = filtered
    """
)
```

资料来源：[datatune/agent/agent.py:100-110]()

## 错误处理

| 错误类型 | 处理方式 |
|---------|---------|
| 不支持的数据类型 | 抛出`TypeError`，提示当前支持Dask和Ibis |
| LLM输出解析失败 | 捕获异常，返回空字典或默认值 |
| API调用异常 | 通过异常处理机制传递给上层 |

资料来源：[datatune/core/filter.py:27-28]()

## 最佳实践

### 输入数据准备

确保输入数据包含足够的上下文信息供LLM进行判断。对于涉及多列条件的过滤，应在prompt中明确说明需要参考的列。

### 批量处理

Filter操作支持批量处理，通过设置合理的批次大小来平衡处理速度和API调用频率。

### 去重优化

在处理可能包含重复记录的数据时，使用`clusters`参数可以显著减少LLM API调用次数，提升处理效率。

---

<a id='reduce-operation'></a>

## Reduce操作

### 相关页面

相关主题：[Map操作](#map-operation), [Filter操作](#filter-operation)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [datatune/core/reduce.py](https://github.com/vitalops/datatune/blob/main/datatune/core/reduce.py)
- [datatune/__init__.py](https://github.com/vitalops/datatune/blob/main/datatune/__init__.py)
- [datatune/core/dask/map_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)
- [datatune/core/dask/filter_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/filter_dask.py)
- [datatune/core/deduplication.py](https://github.com/vitalops/datatune/blob/main/datatune/core/deduplication.py)
</details>

# Reduce操作

## 概述

Reduce操作是datatune库中用于对数据执行聚合、归约和转换操作的核心模块之一。该模块采用插件化架构设计，允许通过注册机制扩展不同的归约操作类型。`reduce`函数作为统一入口，接收数据帧和操作类型参数，动态调用对应的归约处理器完成数据处理任务。

在datatune的生态系统定位中，Reduce操作与Map、Filter操作共同构成了基于大语言模型的数据处理管道。三者的职责分工如下：

| 操作类型 | 职责 | 输入 | 输出 |
|---------|------|------|------|
| Map | 字段映射与转换 | 单行数据 | 添加新字段的行数据 |
| Filter | 数据过滤 | 单行数据 | 布尔决策（保留/移除） |
| Reduce | 聚合与归约 | 多行数据 | 聚合结果 |

## 核心架构

### 插件注册机制

Reduce模块采用工厂模式和装饰器模式实现插件化架构。核心组件包括全局动作注册表、动作注册装饰器和动作获取函数三个部分。

```mermaid
graph TD
    A[reduce函数] --> B[get_action获取动作类]
    B --> C{查找_ACTIONS注册表}
    C -->|找到| D[实例化动作类]
    C -->|未找到| E[抛出ValueError异常]
    D --> F[调用reducer执行归约]
    F --> G[返回处理结果]
    
    H[register_action装饰器] --> I[将动作类注册到_ACTIONS]
    J[自定义动作类] --> H
    K[内置动作类] --> H
```

### 全局动作注册表

```python
_ACTIONS = {}
```

`_ACTIONS`是一个模块级字典，用作动作类的注册表。键为动作名称字符串，值为对应的动作类。资料来源：[datatune/core/reduce.py:1]()

### 注册装饰器

```python
def register_action(name):
    def decorator(cls):
        _ACTIONS[name] = cls
        return cls
    return decorator
```

`register_action`装饰器接收动作名称作为参数，返回一个接受类作为参数的装饰器函数。装饰器执行时将类注册到`_ACTIONS`字典中，并返回原类以保持装饰器链的正常工作。资料来源：[datatune/core/reduce.py:6-12]()

### 动作获取函数

```python
def get_action(name):
    try:
        return _ACTIONS[name]
    except KeyError:
        raise ValueError(f"Unknown action: {name}")
```

`get_action`函数从注册表中查找指定名称的动作类。若未找到对应动作，抛出`ValueError`异常并提示未知动作名称。资料来源：[datatune/core/reduce.py:14-20]()

## API参考

### reduce函数

```python
def reduce(df, *, action: str, **kwargs):
    cls = get_action(action)
    reducer = cls(**kwargs)
    return reducer(df)
```

**函数签名参数说明：**

| 参数 | 类型 | 必填 | 说明 |
|------|------|------|------|
| `df` | `dask.dataframe.DataFrame` | 是 | 输入数据帧 |
| `action` | `str` | 是 | 动作类型名称，用于从注册表获取对应动作类 |
| `**kwargs` | 任意 | 否 | 传递给动作类构造器的关键字参数 |

**返回值：** 返回动作类实例调用后的结果，具体类型取决于具体动作的实现。

资料来源：[datatune/core/reduce.py:23-27]()

## 使用方式

### 基本调用模式

```python
import datatune as dt
import dask.dataframe as dd

# 读取数据
df = dd.read_csv("data.csv")

# 调用reduce操作
result = dt.reduce(df, action="your_action_name", param1=value1, param2=value2)
```

### 在管道中使用

Reduce操作可以与其他datatune操作链式调用，构建完整的数据处理管道：

```python
import datatune as dt
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo")

# 构建处理管道
df = dd.read_csv("raw_data.csv")
mapped = dt.map(prompt="提取产品类别", input_fields=["description"], output_fields=["category"])(llm, df)
filtered = dt.filter(prompt="只保留电子产品", input_fields=["category"])(llm, mapped)
result = dt.reduce(df, action="deduplicate", embedding_model="text-embedding-3-small")
final_result = dt.finalize(result)
```

## 内置归约动作

### SemanticDeduplicator

语义去重动作，用于识别和合并语义相似的记录。该动作使用嵌入向量计算记录间的语义相似度，适用于需要处理近似重复数据的场景。

**嵌入到磁盘功能：**

```python
def embed_column_to_disk(self, df, column, output_dir):
    os.makedirs(output_dir, exist_ok=True)

    tasks = []

    for pid in range(df.npartitions):
        part = df[column].get_partition(pid)
        task = dask.delayed(self._embed_and_write_partition)(
            part,
            pid,
            output_dir,
        )
        tasks.append(task)

    dask.compute(*tasks)
```

资料来源：[datatune/core/deduplication.py:1-100]()

**分区嵌入处理逻辑：**

```python
def _embed_and_write_partition(self, part, partition_id, output_dir):
    pdf = part

    if pdf.empty:
        print("empty partition")
        return 0

    emb_path = f"{output_dir}/embeddings_part_{partition_id}.npy"
    idx_path = f"{output_dir}/index_part_{partition_id}.npy"

    if os.path.exists(emb_path) and os.path.exists(idx_path):
        return 0

    row_index = pdf.index.to_numpy()

    def safe_parse(row):
        try:
            return ast.literal_eval(row)
        except Exception:
            return {}

    dicts = [safe_parse(row) for row in pdf]

    texts = [
        ", ".join(f"{k}: {v}" for k, v in d.items())
        for d in dicts
    ]

    embeddings = []
    for i in range(0, len(texts), 256):
        batch = texts[i:i+256]
        resp = embedding(model=self.embedding_model, input=batch)
        embeddings.extend([item["embedding"] for item in resp["data"]])

    X = np.asarray(embeddings, dtype="float32")
    faiss.normalize_L2(X)

    np.save(f"{output_dir}/embeddings_part_{partition_id}.npy", X)
    np.save(f"{output_dir}/index_part_{partition_id}.npy", row_index)
```

资料来源：[datatune/core/deduplication.py:70-130]()

**FAISS索引构建：**

```python
def build_faiss_index(self, embedding_dir, dim, hnsw_m, ef_search):
    index = faiss.IndexHNSWFlat(
        dim,
        hnsw_m,
        faiss.METRIC_INNER_PRODUCT,
    )
    index.hnsw.efConstruction = 200
```

资料来源：[datatune/core/deduplication.py:140-160]()

### 构造器参数

| 参数 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| `llm` | `LLM` | 无 | 用于执行去重的语言模型实例 |
| `embedding_model` | `str` | `"text-embedding-3-small"` | 嵌入模型名称 |
| `sim_threshold` | `float` | `0.90` | 相似度阈值，超过此值的记录被视为重复 |
| `top_k` | `int` | `50` | 每个记录查找的最大相似记录数 |
| `hnsw_m` | `int` | `32` | HNSW索引的空间参数M |
| `ef_search` | `int` | `64` | HNSW搜索时的搜索参数ef |
| `return_df` | `bool` | `False` | 是否返回DataFrame格式结果 |

资料来源：[datatune/core/deduplication.py:160-180]()

## 自定义归约动作

用户可以通过`@register_action`装饰器注册自定义归约动作，扩展reduce模块的功能。

### 创建自定义动作示例

```python
from datatune.core.reduce import register_action

@register_action("group_stats")
class GroupStatsReducer:
    def __init__(self, group_by_column, stat_columns):
        self.group_by_column = group_by_column
        self.stat_columns = stat_columns
    
    def __call__(self, df):
        return df.groupby(self.group_by_column)[self.stat_columns].agg(['mean', 'sum', 'count'])
```

### 调用自定义动作

```python
import datatune as dt

result = dt.reduce(df, action="group_stats", group_by_column="category", stat_columns=["price", "quantity"])
```

## 执行流程

```mermaid
sequenceDiagram
    participant User as 用户
    participant Reduce as reduce函数
    participant Registry as 动作注册表
    participant Action as 动作类
    participant Result as 处理结果

    User->>Reduce: reduce(df, action="xxx", **kwargs)
    Reduce->>Registry: get_action("xxx")
    Registry-->>Reduce: 动作类引用
    Reduce->>Action: 实例化 cls(**kwargs)
    Reduce->>Action: reducer(df)
    Action->>Result: 执行归约逻辑
    Result-->>User: 返回处理结果
```

## 与Map和Filter的协作

在datatune的整体架构中，Reduce操作通常与其他两种操作配合使用。Map操作用于生成新的列字段，Filter操作用于数据筛选，Reduce操作用于聚合和去重。

```mermaid
graph LR
    A[原始数据] --> B[Map操作]
    B --> C[新增列数据]
    C --> D[Filter操作]
    D --> E[筛选后数据]
    E --> F[Reduce操作]
    F --> G[聚合/去重结果]
    G --> H[finalize]
    H --> I[计算结果]
```

### 工作流程示例

```python
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

# Step 1: 使用Map提取类别信息
mapped = dt.map(
    prompt="从产品描述中提取主要类别",
    output_fields=["Category"],
    input_fields=["Description", "Name"]
)(llm, df)

# Step 2: 使用Filter筛选特定类别
filtered = dt.filter(
    prompt="只保留电子产品",
    input_fields=["Category"]
)(llm, mapped)

# Step 3: 使用Reduce进行语义去重
deduplicated = dt.reduce(
    df=dt.finalize(filtered),
    action="deduplicate",
    embedding_model="text-embedding-3-small"
)

# Step 4: 最终计算输出
result = deduplicated.compute()
```

资料来源：[datatune/__init__.py:1-10]()

## 注意事项

1. **动作注册时机**：自定义动作需要在调用`reduce`函数之前完成注册，确保动作类已在`_ACTIONS`注册表中可用。

2. **参数传递**：`reduce`函数使用关键字参数`**kwargs`将配置传递给动作类构造器，调用者需确保参数名与动作类构造器参数匹配。

3. **异常处理**：当指定的动作名称不存在时，`get_action`函数会抛出`ValueError`异常。应用程序应做好异常捕获处理。

4. **Dask集成**：Reduce操作基于Dask DataFrame实现，支持大规模数据的分布式处理。输入数据应为`dask.dataframe.DataFrame`类型。资料来源：[datatune/core/reduce.py:1]()

5. **分区处理**：在处理大规模数据时，系统会按分区逐个处理，用户应合理配置Dask集群或本地并行参数以优化性能。

---

<a id='agent-system'></a>

## Agent系统

### 相关页面

相关主题：[LLM集成](#llm-integration), [Map操作](#map-operation), [Filter操作](#filter-operation)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [datatune/agent/agent.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/agent.py)
- [datatune/agent/__init__.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/__init__.py)
- [datatune/llm/llm.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/llm.py)
- [datatune/__init__.py](https://github.com/vitalops/datatune/blob/main/datatune/__init__.py)
- [datatune/core/dask/filter_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/filter_dask.py)
- [datatune/core/dask/map_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)
</details>

# Agent系统

## 概述

Agent系统是Datatune的核心智能组件，它允许用户使用自然语言描述数据处理目标，系统自动解析用户意图，生成并执行数据转换计划。该系统结合了大型语言模型（LLM）的推理能力与Dask分布式计算框架，能够处理复杂的多步骤数据转换任务，无需用户手动编写代码。

Agent的核心设计理念是：**用户描述期望的结果，系统自动决定使用何种操作以及如何组合这些操作**。这使得非技术用户也能完成复杂的数据处理任务，同时保持足够的灵活性供高级用户使用。

资料来源：[datatune/agent/__init__.py:1-30]()

## 架构设计

### 系统组件

Agent系统由以下核心组件构成：

| 组件 | 文件位置 | 职责 |
|------|---------|------|
| Agent | `datatune/agent/agent.py` | 主要执行引擎，负责计划生成与执行 |
| Agent基类 | `datatune/agent/__init__.py` | 定义系统提示词和抽象接口 |
| LLM | `datatune/llm/llm.py` | 提供多后端LLM调用能力 |
| Map原语 | `datatune/core/dask/map_dask.py` | 基于LLM的列映射转换 |
| Filter原语 | `datatune/core/dask/filter_dask.py` | 基于LLM的行过滤 |

### 数据流架构

```mermaid
graph TD
    A[用户自然语言目标] --> B[Agent.do方法]
    B --> C[LLM生成计划JSON]
    C --> D[计划验证]
    D --> E{计划类型}
    E -->|dask| F[Dask操作模板]
    E -->|primitive| G[Map/Filter原语]
    F --> H[代码执行引擎]
    G --> H
    H --> I[错误处理]
    I -->|成功| J[最终结果]
    I -->|失败| K[错误恢复]
    K --> C
```

Agent接收用户的自然语言指令后，首先调用LLM生成结构化的执行计划。计划由一系列步骤组成，每个步骤可以是Dask原生操作或LLM驱动的原语操作。

资料来源：[datatune/agent/agent.py:1-50]()

## 核心类设计

### Agent类

`Agent`是系统的主要执行类，封装了所有数据处理逻辑。

```python
class Agent:
    def __init__(self, llm: LLM, verbose: bool = False):
        self.llm = llm
        self.history: List[Dict[str, Any]] = []
        self.verbose = verbose
```

**初始化参数：**

| 参数 | 类型 | 必填 | 说明 |
|------|------|------|------|
| `llm` | `LLM` | 是 | LLM实例，用于生成计划和执行原语 |
| `verbose` | `bool` | 否 | 设为True时启用DEBUG级别日志，默认False |

**主要方法：**

| 方法 | 返回值 | 说明 |
|------|--------|------|
| `do(goal: str, df: dd.DataFrame)` | `None \| str` | 执行自然语言目标 |
| `_generate_plan(goal)` | `List[Dict]` | 生成执行计划 |
| `_execute_plan(plan)` | `Tuple[None, int] \| Tuple[str, int]` | 执行完整计划 |
| `_execute_step(step)` | `None \| str` | 执行单个步骤 |
| `log_primitive(df, message)` | `dd.DataFrame` | 记录原语执行状态 |

资料来源：[datatune/agent/agent.py:50-80]()

### Agent基类

```python
class Agent(ABC):
    system_prompt: str = """You are Datatune Agent, a powerful assistant designed to help users with data processing tasks.
    You are capable of generating python code to perform various operations on data. Apart from python builtins, you have the following libraries avaiable in your run time:
    - pandas
    - numpy
    - dask

    In addition to these, you also have access to the datatune libarary, which provides functionality for processing data using LLMs.
    ..."""
```

基类定义了Agent的系统提示词，明确了可用的运行时环境：Python内置库、pandas、numpy、dask以及datatune库。

资料来源：[datatune/agent/__init__.py:1-30]()

## 执行计划模型

### 计划结构

执行计划是一个JSON数组，每个元素代表一个独立的执行步骤：

```json
[
  {
    "type": "dask" | "primitive",
    "operation": "操作名称",
    "params": { ... },
    "subprompt": "LLM提示词",
    "input_fields": ["列名"],
    "output_fields": ["新列名"]
  }
]
```

### 步骤类型

Agent支持两种类型的操作步骤：

| 类型 | 说明 | 示例操作 |
|------|------|----------|
| `dask` | 直接的Dask DataFrame操作 | `add_column`, `group_by`, `rename_columns` |
| `primitive` | LLM驱动的原语操作 | `Map`, `Filter` |

### Dask操作模板

系统预定义了丰富的Dask操作模板：

| 操作名称 | 功能描述 | 关键参数 |
|----------|----------|----------|
| `add_column` | 添加新列（基于表达式） | `column`, `expr` |
| `apply_function` | 对列应用函数 | `column`, `func` |
| `rename_columns` | 重命名列 | `mapping` |
| `astype_column` | 更改列数据类型 | `column`, `dtype` |

资料来源：[datatune/agent/agent.py:80-150]()

## 原语系统

### Map原语

`Map`原语使用LLM根据自然语言提示词从现有列生成新列。它能够处理语义提取、分类、解释等复杂转换任务。

**工作流程：**

1. 序列化输入列数据为字典格式
2. 将数据分批发送给LLM
3. LLM返回包含新字段的字典
4. 将结果合并回DataFrame

```python
map = dt.Map(prompt="Extract categories from the description")
mapped_df = map(llm, df)
```

资料来源：[datatune/core/dask/map_dask.py:1-60]()

### Filter原语

`Filter`原语使用LLM根据自然语言条件过滤DataFrame行。

**输出格式规范：**

```
index=<row_index>|{...字典内容..., '__filter__': True|False}<endofrow>
```

- `True`：保留该行
- `False`：移除该行

资料来源：[datatune/core/dask/filter_dask.py:1-50]()

## LLM后端支持

系统通过统一的`LLM`接口支持多种语言模型提供商：

```mermaid
graph LR
    A[LLM基类] --> B[OpenAI]
    A --> C[Ollama]
    A --> D[VLLM]
    A --> E[Azure]
```

### 支持的模型

| 提供商 | 模型标识 | 默认模型 |
|--------|----------|----------|
| OpenAI | `openai/<model>` | `gpt-3.5-turbo` |
| Ollama | `ollama_chat/<model>` | `gemma3:4b` |
| VLLM | `openai/<model>` | - |
| Azure | `azure/<model>` | - |

### 速率限制

系统内置了主流模型的速率限制配置：

| 模型 | TPM (每分钟Token数) | RPM (每分钟请求数) |
|------|---------------------|-------------------|
| `gpt-3.5-turbo` | 200,000 | 500 |
| `gpt-4` | 10,000 | 500 |
| `gpt-4-turbo` | 30,000 | 500 |
| `gpt-4o` | 30,000 | 500 |

资料来源：[datatune/llm/model_rate_limits.py:1-80]()

## 使用示例

### 基础用法

```python
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

# 初始化LLM和Agent
llm = OpenAI(model_name="gpt-3.5-turbo")
agent = dt.Agent(llm)

# 加载数据
df = dd.read_csv("products.csv")

# 使用自然语言描述任务
df = agent.do("Add ProfitMargin column and keep only African organizations", df)
result = dt.finalize(df)
```

### 详细示例

```python
# 创建Agent实例（启用详细日志）
agent = dt.Agent(llm, verbose=True)

# 执行复杂的多步骤任务
goal = """
1. Create a 'PriceCategory' column: 'high' if price > 100, 'low' otherwise
2. Keep only rows where PriceCategory is 'high'
3. Add 'DiscountedPrice' column: Price * 0.9
"""

df = agent.do(goal, df)
result = dt.finalize(df)
```

### 单独使用Map和Filter

```python
# 仅使用Map原语
mapped = dt.map(
    prompt="Extract categories from the description and name",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

# 仅使用Filter原语
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)
```

## 执行流程详解

### 计划生成

```mermaid
sequenceDiagram
    participant User as 用户
    participant Agent as Agent
    participant LLM as LLM
    participant Schema as 数据Schema

    User->>Agent: do(goal, df)
    Agent->>Schema: 获取DataFrame结构
    Agent->>LLM: 发送persona_prompt + schema_prompt
    LLM->>Agent: 返回JSON计划
    Agent->>Agent: 验证计划格式
```

Agent生成计划时，会向LLM传递：
1. **Persona提示词**：定义LLM的角色、能力边界和输出格式
2. **Schema提示词**：DataFrame的列结构和数据类型
3. **用户目标**：自然语言描述的期望结果

### 计划执行

执行过程采用流式日志记录：

```python
# 每个步骤的执行流程
df = df.map_partitions(log_primitive, "🔄 开始步骤 X/Y", meta=df._meta)
step_num += 1
# 执行具体操作
df = df.map_partitions(log_primitive, "📍 完成步骤 X/Y", meta=df._meta)
```

最终执行完成后，调用`dt.finalize()`和`compute()`将Dask DataFrame转换为实际的Pandas DataFrame。

资料来源：[datatune/agent/agent.py:100-140]()

## 错误处理与恢复

### 错误检测机制

系统在每个步骤执行后都会检测异常：

```python
try:
    # 执行步骤
    ...
except Exception as e:
    error_msg = f"{type(e).__name__}: {str(e)}\n{traceback.format_exc()}"
    return error_msg
```

### 错误恢复流程

当步骤执行失败时，系统会：
1. 记录错误信息和失败的步骤详情
2. 生成包含错误上下文的修复提示词
3. 请求LLM生成修正后的计划

```python
def get_error_prompt(self, error_msg: str, failed_step: Dict) -> str:
    error_prompt = f"""
    The previous code execution failed with the following error:
    Error: {error_msg}
    Failed step: {failed_step}
    Provide the json plan with the corrected step.
    """
```

## 配置与调优

### 初始化配置

```python
# 完整配置示例
agent = dt.Agent(
    llm=llm,
    verbose=True  # 启用DEBUG日志
)

# 自定义LLM配置
llm = OpenAI(
    model_name="gpt-4-turbo",
    api_key="your-api-key",
    rpm=1000,  # 自定义RPM限制
    tpm=50000  # 自定义TPM限制
)
```

### 日志级别控制

```python
import logging

# 启用详细日志
agent = dt.Agent(llm, verbose=True)
logger.setLevel(logging.DEBUG)

# 禁用详细日志
agent = dt.Agent(llm, verbose=False)
logger.setLevel(logging.INFO)
```

## 最佳实践

1. **明确指定列名**：在目标描述中明确提及要操作的列名，有助于Agent生成准确的操作计划

2. **分解复杂任务**：对于非常复杂的任务，可以考虑分步执行以便于调试

3. **选择合适的模型**：简单的数据转换可使用`gpt-3.5-turbo`，复杂的语义任务建议使用`gpt-4-turbo`或`gpt-4o`

4. **监控执行过程**：通过设置`verbose=True`监控Agent的决策过程，便于理解系统行为

5. **处理重复数据**：系统内置了基于语义相似度的去重功能，可用于处理LLM输出中的重复结果

---

<a id='llm-integration'></a>

## LLM集成

### 相关页面

相关主题：[Agent系统](#agent-system)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [datatune/llm/llm.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/llm.py)
- [datatune/llm/model_rate_limits.py](https://github.com/vitalops/datatuple/blob/main/datatune/llm/model_rate_limits.py)
</details>

# LLM集成

## 概述

datatune 的 LLM 集成模块提供了统一的接口来连接各种大语言模型（Large Language Model）提供者。该模块封装了不同 LLM 服务商的 API 调用逻辑，使上层应用能够以一致的方式使用不同的模型，而无需关心底层实现的差异。

主要功能包括：

- **统一抽象**：通过基类 `LLM` 提供一致的接口设计
- **多提供商支持**：支持 OpenAI、Ollama、VLLM、Azure、Mistral、Huggingface 等主流 LLM 服务
- **速率限制管理**：内置对各模型速率限制的支持，防止 API 调用超出限制
- **Token 计数与批处理**：提供 token 计数和批量请求功能，优化 API 使用效率

## 架构设计

### 类继承结构

```mermaid
graph TD
    LLMBase[LLM 基类] --> Ollama
    LLMBase --> OpenAI
    LLMBase --> VLLM
    LLMBase --> Azure
    LLMBase --> Gemini
    LLMBase --> Mistral
    LLMBase --> Huggingface
    
    LLMBase --> 完成方法[_completion]
    LLMBase --> 批处理方法[_create_batched_prompts]
    LLMBase --> Token限制[MAX_RPM, MAX_TPM]
```

### 核心类说明

#### LLM 基类

`LLM` 类是所有 LLM 提供者的基类，定义通用接口和默认行为：

```python
class LLM:
    def __init__(self, model_name: str, **kwargs) -> None:
        self.model_name = model_name  # 格式: "provider/model"
        self._base_model_name = model_name.split("/", 1)[1]  # 如 "gpt-3.5-turbo"
```

**资料来源**：[datatune/llm/llm.py:27-32]()

#### 速率限制

每个模型实例具有两个关键属性：

| 属性 | 说明 | 来源 |
|------|------|------|
| `MAX_RPM` | 每分钟最大请求数 (Requests Per Minute) | model_rate_limits |
| `MAX_TPM` | 每分钟最大 Token 数 (Tokens Per Minute) | model_rate_limits |
| `max_tokens` | 单次请求最大 Token 数 | `get_max_tokens()` |

**资料来源**：[datatune/llm/llm.py:62-64]()

## 支持的 LLM 提供者

### 1. OpenAI

OpenAI 模型使用 `openai/` 前缀标识：

```python
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo", api_key="your-key")
llm = OpenAI(model_name="gpt-4o", api_key="your-key")
```

**资料来源**：[datatune/llm/llm.py:64-73]()

### 2. Ollama (本地部署)

适用于本地运行的 LLM，通过 HTTP API 连接：

```python
from datatune.llm.llm import Ollama

llm = Ollama()  # 默认 gemma3:4b，localhost:11434
llm = Ollama(model_name="llama3:8b", api_base="http://localhost:11434")
```

**资料来源**：[datatune/llm/llm.py:53-61]()

### 3. VLLM

VLLM 服务器连接，支持 OpenAI 兼容 API：

```python
from datatune.llm.llm import VLLM

llm = VLLM(
    model_name="meta-llama/Llama-3-8B-Instruct",
    api_base="http://localhost:8000/v1"
)
```

**资料来源**：[datatune/llm/llm.py:75-91]()

### 4. Azure OpenAI

Azure 托管的 OpenAI 模型：

```python
from datatune.llm.llm import Azure

llm = Azure(
    model_name="gpt-3.5-turbo",
    api_key="your-key",
    api_base="https://xxx.openai.azure.com",
    api_version="2024-02-01"
)
```

**资料来源**：[datatune/llm/llm.py:93-106]()

### 5. Gemini

Google Gemini 模型集成：

```python
from datatune.llm.llm import Gemini

llm = Gemini(model_name="gemini-1.5-pro", api_key="your-key")
```

### 6. Mistral

Mistral AI 模型：

```python
from datatune.llm.llm import Mistral

llm = Mistral(model_name="mistral-tiny")
llm = Mistral(model_name="mistral-large-latest")
```

**资料来源**：[datatune/llm/llm.py:130-148]()

### 7. Huggingface

Hugging Face 推理端点：

```python
from datatune.llm.llm import Huggingface

llm = Huggingface(model_name="meta-llama/Llama-3-8B-Instruct")
```

**资料来源**：[datatune/llm/llm.py:150-172]()

## 速率限制配置

### 预定义模型限制

`model_rate_limits.py` 文件包含各模型的默认速率限制：

| 模型系列 | TPM (Token/分钟) | RPM (请求/分钟) |
|----------|------------------|-----------------|
| GPT-3.5-Turbo | 200,000 | 500 |
| GPT-4 | 10,000 - 30,000 | 500 |
| GPT-4.1 | 30,000 - 200,000 | 500 |
| GPT-4o | 30,000 | 500 |
| GPT-4.5-Preview | 125,000 | 1,000 |

**资料来源**：[datatune/llm/model_rate_limits.py:1-85]()

### 自定义速率限制

可以在初始化时覆盖默认限制：

```python
from datatune.llm.llm import OpenAI

# 自定义 RPM 和 TPM
llm = OpenAI(
    model_name="gpt-3.5-turbo",
    rpm=1000,
    tpm=300000
)
```

**资料来源**：[datatune/llm/llm.py:42-51]()

## 使用方式

### 基本调用

所有 LLM 实例可作为可调用对象使用：

```python
llm = OpenAI(model_name="gpt-3.5-turbo", api_key="sk-xxx")

# 单次调用
result = llm("Hello, how are you?")
```

### 批处理调用

对于大量数据处理，系统支持自动批处理：

```python
prompts = ["Process row 1", "Process row 2", "Process row 3"]
results = llm(prompts, prefix="Instruction: ", suffix="End.")
```

**资料来源**：[datatune/llm/llm.py:85-105]()

### 在 datatune 中的应用

datatune 的核心功能使用 LLM 集成进行数据转换：

```python
import datatune as dt
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo")

# 映射操作
mapped = dt.map(
    prompt="Extract categories from description",
    output_fields=["Category"],
    input_fields=["Description"]
)(llm, df)

# 过滤操作
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)
```

**资料来源**：[README.md](https://github.com/vitalops/datatune/blob/main/README.md)

## 核心方法

### `_completion`

执行单次补全请求：

```python
def _completion(self, prompt: str) -> Union[str, Exception]:
    messages = [{"role": "user", "content": prompt}]
    from litellm import completion
    
    response = completion(model=self.model_name, messages=messages, **self.kwargs)
    
    if isinstance(response, Exception):
        return response
    return response["choices"][0]["message"]["content"]
```

**资料来源**：[datatune/llm/llm.py:69-77]()

### `get_max_tokens`

获取模型支持的最大 token 数：

```python
def get_max_tokens(self) -> int:
    return get_max_tokens(self._base_model_name)
```

### `_create_batched_prompts`

创建批处理提示词，用于优化多行数据处理：

```python
def _create_batched_prompts(
    self,
    input_rows: List[str],
    user_batch_prefix: str,
    prompt_per_row: str,
    batch_suffix: str = None
) -> List[str]:
    # 计算 token 限制并分批
    # 返回批处理后的提示词列表
```

**资料来源**：[datatune/llm/llm.py:80-107]()

## 依赖说明

LLM 模块依赖 `litellm` 库实现底层 API 调用：

```python
from litellm import get_max_tokens, token_counter, batch_completion
```

litellm 提供了对 100+ LLM 模型的统一接口，支持：
- OpenAI 系列
- Azure OpenAI
- Anthropic
- Google Vertex AI
- AWS Bedrock
- 本地部署模型 (Ollama, VLLM, TGI)
- Hugging Face 推理端点

**资料来源**：[datatune/llm/llm.py:10-12]()

## 最佳实践

### 1. 选择合适的模型

| 场景 | 推荐模型 | 理由 |
|------|----------|------|
| 快速原型 | gpt-3.5-turbo | 成本低、速度快 |
| 高质量输出 | gpt-4o | 性能与成本平衡 |
| 最大精度 | gpt-4.1 | 最新最强模型 |
| 本地部署 | Ollama/VLLM | 隐私保护、无 API 成本 |

### 2. 配置合理的速率限制

根据实际 API 配额设置：

```python
# 根据 API tier 设置
llm = OpenAI(
    model_name="gpt-4o",
    rpm=500,   # 根据配额调整
    tpm=30000  # 根据配额调整
)
```

### 3. 使用批处理优化

对于大量数据处理，使用内置批处理功能避免单次调用限制：

```python
# 自动分批处理
results = llm(
    input_rows,
    prefix="任务: ",
    suffix="请以指定格式输出",
    optimized=True  # 启用优化模式
)
```

## 导出接口

模块入口文件提供统一导出：

```python
from datatune.llm import *
# 导出: LLM, Ollama, OpenAI, VLLM, Azure, Gemini, Mistral, Huggingface
```

**资料来源**：[datatune/llm/__init__.py]()

## 总结

datatune 的 LLM 集成模块通过抽象化的设计，为用户提供了便捷的模型切换能力和统一的调用接口。开发者可以轻松地在不同 LLM 提供者之间切换，同时享受内置的速率限制保护和批处理优化功能。

---

<a id='datasource'></a>

## 数据源支持

### 相关页面

相关主题：[系统架构](#architecture), [Map操作](#map-operation), [Filter操作](#filter-operation)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [datatune/llm/llm.py](https://github.com/vitalops/datatune/blob/main/datatune/llm/llm.py)
- [datatune/agent/agent.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/agent.py)
- [datatune/core/dask/filter_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/filter_dask.py)
- [datatune/core/dask/map_dask.py](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)
- [datatune/core/ibis/filter_ibis.py](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/filter_ibis.py)
- [datatune/core/ibis/map_ibis.py](https://github.com/vitalops/datatune/blob/main/datatune/core/ibis/map_ibis.py)
- [datatune/agent/__init__.py](https://github.com/vitalops/datatune/blob/main/datatune/agent/__init__.py)
</details>

# 数据源支持

## 概述

Datatune 是一个数据处理框架，通过集成大语言模型（LLM）来实现智能化的数据转换和清洗。该框架的核心设计理念是支持多种数据源，使得用户能够在不同的计算后端上执行 `map`、`filter` 等数据转换操作。

数据源支持架构采用插件式设计，通过抽象层将具体的数据处理逻辑与底层数据源解耦。当前框架主要支持两大类数据源：**Dask**（用于大规模并行计算）和 **Ibis**（支持多种 SQL 后端）。

资料来源：[datatune/agent/__init__.py:1-5]()

## 支持的数据源类型

Datatune 支持的数据源可分为以下几类：

| 数据源类型 | 底层引擎 | 特点 | 适用场景 |
|-----------|---------|------|---------|
| Dask DataFrame | Dask | 延迟计算、分布式执行、内存优化 | 大规模单机或集群数据处理 |
| DuckDB | Ibis + DuckDB | 嵌入式 OLAP、零配置 | 快速分析、交互式查询 |
| PostgreSQL | Ibis + PostgreSQL | 关系型数据库、企业级支持 | 生产环境、事务处理 |
| BigQuery | Ibis + BigQuery | 云原生、超大规模 | 云端大数据分析 |

资料来源：[README.md:1-50]()

## 架构设计

### 核心抽象层

Datatune 的数据源支持通过统一的抽象接口实现，主要包含以下核心组件：

```python
# 抽象基类定义
class Agent(ABC):
    system_prompt: str = """You are Datatune Agent..."""
```

代理（Agent）类作为核心抽象，负责协调 LLM 与数据源的交互。不同的数据源实现继承该基类并实现具体的数据操作逻辑。

资料来源：[datatune/agent/__init__.py:1-10]()

### 数据流架构

```mermaid
graph TD
    A[用户代码] --> B[Datatune API]
    B --> C{数据源类型}
    C -->|Dask| D[core/dask/]
    C -->|Ibis| E[core/ibis/]
    D --> F[map_dask.py / filter_dask.py]
    E --> G[map_ibis.py / filter_ibis.py]
    F --> H[LLM 调用]
    G --> H
    H --> I[结果聚合]
    I --> J[compute / execute]
```

## Dask 数据源

### 概述

Dask 是 Datatune 默认支持的数据源，提供大规模并行计算能力。通过 Dask DataFrame，用户可以处理超出内存限制的数据集。

### 主要操作

Dask 数据源支持两类核心操作：

#### Map 操作

用于从现有列生成新列，常用于语义提取、分类、自然语言推理等场景。

```python
import datatune as dt
import dask.dataframe as dd

df = dd.read_csv("products.csv")

mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)
```

资料来源：[README.md:1-30]()

#### Filter 操作

用于根据条件过滤行，基于 LLM 理解保留符合条件的记录。

```python
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)
```

资料来源：[README.md:1-35]()

### Dask 实现细节

#### 输出格式规范

Map 和 Filter 操作要求 LLM 返回特定格式的输出：

```
index=<row_index>|{key1: value1, key2: value2, ...}<endofrow>
```

- 字符串必须用双引号包裹
- 缺失值设置为 `None` 或 `null`
- 每行输出必须以 `<endofrow>` 结尾

资料来源：[datatune/core/dask/map_dask.py:1-30]()

#### 重复数据处理

Dask 实现包含智能去重机制，通过 `dup_to_canon` 映射处理重复数据：

```python
dup_to_canon = {
    dup: c["canonical_id"]
    for c in clusters
    for dup in c["duplicate_ids"]
}
```

只有规范索引的数据会被发送到 LLM，重复项的结果会自动复制。

资料来源：[datatune/core/dask/map_dask.py:1-45]()

## Ibis 数据源

### 概述

Ibis 是一个 Python 抽象层，支持多种 SQL 后端。通过 Ibis，Datatune 可以连接到 DuckDB、PostgreSQL、BigQuery 等数据库执行向量化查询。

### 连接方式

#### DuckDB

```python
import ibis
con = ibis.duckdb.connect("data.duckdb")
table = con.table("my_table")
```

资料来源：[README.md:1-60]()

#### PostgreSQL

```python
import ibis
con = ibis.postgres.connect(...)
table = con.table("my_table")
```

#### BigQuery

```python
import ibis
con = ibis.bigquery.connect(...)
table = con.table("my_table")
```

### Ibis 实现特点

#### 行号索引

Ibis 实现使用 `ibis.row_number()` 生成行索引：

```python
indexed_table = table.mutate(_ROW_ID_=ibis.row_number().cast("int64"))
local_data = indexed_table.select("_ROW_ID_", input_col).execute()
```

资料来源：[datatune/core/ibis/filter_ibis.py:1-40]()

#### 输出解析

Ibis 后端将 LLM 输出解析为布尔值用于过滤：

```python
for res in results:
    try:
        d = ast.literal_eval(str(res).strip())
        if d:
            last_key = list(d.keys())[-1]
            flag = d[last_key]
            out_bools.append(flag)
    except Exception:
        out_bools.append(False)
```

资料来源：[datatune/core/ibis/filter_ibis.py:1-60]()

## Agent 自动化

### 功能概述

Datatune Agent 可以自动识别数据转换需求，智能选择合适的操作类型并执行。

```python
import datatune as dt
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo")
agent = dt.Agent(llm)

df = agent.do("Add ProfitMargin column and keep only African organizations", df)
```

资料来源：[README.md:1-45]()

### 计划生成规则

Agent 生成执行计划时遵循以下规则：

1. **Dask 操作选择**：如果任务涉及添加列、分组、移位、按行或按列应用函数，使用 `type: "dask"`
2. **Primitive 操作选择**：如果需要理解自然语言、提取或解释文本内容、进行语义推理，使用 `type: "primitive"`
3. **简化优先**：能用 Dask 完成的步骤优先使用 Dask，只有 Dask 无法实现时才使用 primitives

资料来源：[datatune/agent/agent.py:1-100]()

### 支持的操作类型

| 操作类型 | 说明 | 适用场景 |
|---------|------|---------|
| add_column | 从表达式创建新列 | 数值计算、日期提取 |
| apply_function | 对单列应用函数 | 逐元素转换 |
| rename_columns | 重命名列 | 列名规范化 |
| astype_column | 更改列数据类型 | 类型转换 |
| group_by_agg | 分组聚合 | 统计计算 |
| conditional_column | 条件列 | 基于条件的新列 |

资料来源：[datatune/agent/agent.py:1-150]()

## LLM 集成

### 支持的模型提供商

Datatune 通过 litellm 库支持多种 LLM 提供商：

| 提供商 | 类名 | 默认模型 | 说明 |
|-------|------|---------|------|
| OpenAI | `OpenAI` | gpt-3.5-turbo | OpenAI GPT 系列 |
| Ollama | `Ollama` | gemma3:4b | 本地模型支持 |
| vLLM | `VLLM` | 自定义 | 高性能推理 |
| Azure | `Azure` | 自定义 | Azure OpenAI 服务 |

资料来源：[datatune/llm/llm.py:1-100]()

### 模型初始化示例

```python
# OpenAI
from datatune.llm.llm import OpenAI
llm = OpenAI(model_name="gpt-3.5-turbo")

# Ollama (本地)
from datatune.llm.llm import Ollama
llm = Ollama()

# Azure
from datatune.llm.llm import Azure
llm = Azure(model_name="gpt-3.5-turbo", api_key=api_key)
```

资料来源：[README.md:1-70]()

### 速率限制配置

框架内置了主要模型的速率限制配置：

```python
model_rate_limits = {
    "gpt-3.5-turbo": {
        "tpm": 200_000,  # tokens per minute
        "rpm": 500,      # requests per minute
    },
    "gpt-4": {
        "tpm": 10_000,
        "rpm": 500,
    },
    # ...
}
```

资料来源：[datatune/llm/model_rate_limits.py:1-50]()

## 使用工作流

### 完整数据处理流程

```mermaid
graph LR
    A[加载数据] --> B[定义 LLM]
    B --> C[应用 Map]
    C --> D[应用 Filter]
    D --> E[执行 Finalize]
    E --> F[保存结果]
```

### 代码示例

```python
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

# 初始化
llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

# 映射操作
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

# 过滤操作
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)

# 最终计算并保存
result = dt.finalize(filtered)
result.compute().to_csv("electronics_products.csv")
```

资料来源：[README.md:1-40]()

## 性能优化

### 去重处理

对于包含重复行的数据集，框架采用智能去重策略：

1. 使用 embedding 模型识别语义相似的记录
2. 将相似记录聚类，只对规范记录调用 LLM
3. 将结果自动复制到同簇的其他记录

```python
class Deduplicator:
    def __init__(
        self,
        embedding_model: str = "text-embedding-3-small",
        sim_threshold: float = 0.90,
        top_k: int = 50,
    ):
        # ...
```

资料来源：[datatune/core/deduplication.py:1-30]()

### 分区处理

Dask 数据源支持分区并行处理：

```python
for pid in range(df.npartitions):
    part = df[column].get_partition(pid)
    task = dask.delayed(self._embed_and_write_partition)(part, pid, output_dir)
```

资料来源：[datatune/core/deduplication.py:1-80]()

## 注意事项

1. **数据量控制**：LLM 调用按批次进行，需注意 token 限制
2. **输出格式**：严格遵循 `index=<index>|{...}<endofrow>` 格式
3. **字符串转义**：输出必须使用双引号，避免裸文本
4. **错误处理**：单个记录解析失败不影响整体流程
5. **资源限制**：生产环境建议配置合理的 `tpm` 和 `rpm` 参数

---

---

## Doramagic 踩坑日志

项目：vitalops/datatune

摘要：发现 7 个潜在踩坑项，其中 0 个为 high/blocking；最高优先级：能力坑 - 能力判断依赖假设。

## 1. 能力坑 · 能力判断依赖假设

- 严重度：medium
- 证据强度：source_linked
- 发现：README/documentation is current enough for a first validation pass.
- 对用户的影响：假设不成立时，用户拿不到承诺的能力。
- 建议检查：将假设转成下游验证清单。
- 防护动作：假设必须转成验证项；没有验证结果前不能写成事实。
- 证据：capability.assumptions | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | README/documentation is current enough for a first validation pass.

## 2. 维护坑 · 维护活跃度未知

- 严重度：medium
- 证据强度：source_linked
- 发现：未记录 last_activity_observed。
- 对用户的影响：新项目、停更项目和活跃项目会被混在一起，推荐信任度下降。
- 建议检查：补 GitHub 最近 commit、release、issue/PR 响应信号。
- 防护动作：维护活跃度未知时，推荐强度不能标为高信任。
- 证据：evidence.maintainer_signals | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | last_activity_observed missing

## 3. 安全/权限坑 · 下游验证发现风险项

- 严重度：medium
- 证据强度：source_linked
- 发现：no_demo
- 对用户的影响：下游已经要求复核，不能在页面中弱化。
- 建议检查：进入安全/权限治理复核队列。
- 防护动作：下游风险存在时必须保持 review/recommendation 降级。
- 证据：downstream_validation.risk_items | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | no_demo; severity=medium

## 4. 安全/权限坑 · 存在安全注意事项

- 严重度：medium
- 证据强度：source_linked
- 发现：No sandbox install has been executed yet; downstream must verify before user use.
- 对用户的影响：用户安装前需要知道权限边界和敏感操作。
- 建议检查：转成明确权限清单和安全审查提示。
- 防护动作：安全注意事项必须面向用户前置展示。
- 证据：risks.safety_notes | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | No sandbox install has been executed yet; downstream must verify before user use.

## 5. 安全/权限坑 · 存在评分风险

- 严重度：medium
- 证据强度：source_linked
- 发现：no_demo
- 对用户的影响：风险会影响是否适合普通用户安装。
- 建议检查：把风险写入边界卡，并确认是否需要人工复核。
- 防护动作：评分风险必须进入边界卡，不能只作为内部分数。
- 证据：risks.scoring_risks | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | no_demo; severity=medium

## 6. 维护坑 · issue/PR 响应质量未知

- 严重度：low
- 证据强度：source_linked
- 发现：issue_or_pr_quality=unknown。
- 对用户的影响：用户无法判断遇到问题后是否有人维护。
- 建议检查：抽样最近 issue/PR，判断是否长期无人处理。
- 防护动作：issue/PR 响应未知时，必须提示维护风险。
- 证据：evidence.maintainer_signals | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | issue_or_pr_quality=unknown

## 7. 维护坑 · 发布节奏不明确

- 严重度：low
- 证据强度：source_linked
- 发现：release_recency=unknown。
- 对用户的影响：安装命令和文档可能落后于代码，用户踩坑概率升高。
- 建议检查：确认最近 release/tag 和 README 安装命令是否一致。
- 防护动作：发布节奏未知或过期时，安装说明必须标注可能漂移。
- 证据：evidence.maintainer_signals | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | release_recency=unknown

<!-- canonical_name: vitalops/datatune; human_manual_source: deepwiki_human_wiki -->
