Doramagic 项目包 · 项目说明书

datatune 项目

生成时间:2026-05-14 07:26:20 UTC

Datatune简介

Datatune是一个基于大语言模型(LLM)的数据转换框架,它允许用户使用自然语言描述来对结构化数据进行转换、映射和过滤操作。该框架支持Dask和Ibis两种后端计算引擎,能够处理大规模数据集,并通过多种LLM提供商(如OpenAI、Ollama、Azure等)实现智能化的数据处理。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 2.1 组件层次

继续阅读本节完整说明和来源证据。

章节 2.2 核心模块说明

继续阅读本节完整说明和来源证据。

章节 3.1 支持的LLM提供商

继续阅读本节完整说明和来源证据。

概述

Datatune是一个基于大语言模型(LLM)的数据转换框架,它允许用户使用自然语言描述来对结构化数据进行转换、映射和过滤操作。该框架支持Dask和Ibis两种后端计算引擎,能够处理大规模数据集,并通过多种LLM提供商(如OpenAI、Ollama、Azure等)实现智能化的数据处理。

资料来源:README.md:1-20

核心架构

Datatune的架构主要由以下几个核心组件构成:

2.1 组件层次

graph TD
    A[用户接口层] --> B[Agent智能代理]
    A --> C[Primitive原语操作]
    B --> C
    C --> D[Map映射操作]
    C --> E[Filter过滤操作]
    D --> F[数据后端]
    E --> F
    F --> G[Dask后端]
    F --> H[Ibis后端]
    G --> I[DuckDB/PostgreSQL/BigQuery]
    H --> I
    F --> J[LLM调用层]
    J --> K[OpenAI]
    J --> L[Ollama]
    J --> M[Azure]
    J --> N[Mistral]
    J --> O[Huggingface]
    J --> P[VLLM]

2.2 核心模块说明

模块路径功能说明
datatune.llm.llmLLM基类和多种LLM提供商实现
datatune.agent.agentAgent智能代理,支持自动规划数据转换流程
datatune.core.dask.map_daskDask后端的Map映射操作实现
datatune.core.dask.filter_daskDask后端的Filter过滤操作实现
datatune.core.ibis.map_ibisIbis后端的Map映射操作实现
datatune.core.ibis.filter_ibisIbis后端的Filter过滤操作实现
datatune.core.deduplication数据去重功能,支持基于嵌入向量的相似度检测

资料来源:datatune/llm/llm.py:1-150datatune/agent/agent.py:1-100

LLM支持

3.1 支持的LLM提供商

Datatune通过litellm库统一封装了多种LLM提供商,用户可以根据需求选择合适的模型:

资料来源:datatune/llm/llm.py:45-120

提供商类名默认模型特点
OpenAIOpenAIgpt-3.5-turbo云端API,需提供api_key
OllamaOllamagemma3:4b本地部署,支持gemma、llama等模型
AzureAzuregpt-3.5-turboAzure OpenAI服务
MistralMistralmistral-tinyMistral AI模型
HuggingfaceHuggingface-Hugging Face推理端点
VLLMVLLM用户指定高性能本地推理服务

3.2 模型速率限制

框架内置了主流模型的速率限制配置,包括每分钟请求数(RPM)和每分钟令牌数(TPM):

资料来源:datatune/llm/model_rate_limits.py:1-100

模型RPMTPM
gpt-3.5-turbo500200,000
gpt-450010,000
gpt-4-turbo50030,000
gpt-4o50030,000
gpt-4.1-mini500200,000
gpt-4.1-nano500200,000

3.3 LLM基类实现

class LLM:
    def __init__(self, model_name: str, **kwargs) -> None:
        self.model_name = model_name  # 格式为 "provider/model"
        self._base_model_name = model_name.split("/", 1)[1]

所有LLM类都继承自基类LLM,通过litellm库实现统一的调用接口,并支持自定义速率限制参数。

资料来源:datatune/llm/llm.py:45-70

数据处理原语

4.1 Map映射操作

Map操作用于从现有数据创建新的列,通过LLM理解并应用转换逻辑:

graph LR
    A[输入DataFrame] --> B[序列化输入列]
    B --> C[LLM批量处理]
    C --> D[解析LLM输出]
    D --> E[反序列化到新列]
    E --> F[输出DataFrame]

Dask实现流程:

  1. 序列化输入列数据
  2. 检测重复记录并建立映射关系
  3. 仅对主记录调用LLM
  4. 复制结果到重复记录
  5. 将LLM输出反序列化为新列

资料来源:datatune/core/dask/map_dask.py:1-80datatune/core/ibis/map_ibis.py:1-60

4.2 Filter过滤操作

Filter操作用于根据条件移除不满足要求的行:

graph LR
    A[输入DataFrame] --> B[LLM决策判断]
    B --> C[解析过滤决策]
    C --> D[应用过滤条件]
    D --> E[输出过滤后DataFrame]

输出格式要求:

LLM返回的过滤决策必须遵循特定格式:

index=<row_index>|{key1: value1, ..., '__filter__': True/False}<endofrow>

资料来源:datatune/core/dask/filter_dask.py:1-50datatune/core/ibis/filter_ibis.py:1-50

4.3 去重功能

Datatune提供了基于嵌入向量的智能去重功能:

graph TD
    A[原始数据] --> B[嵌入向量生成]
    B --> C[FAISS索引构建]
    C --> D[HNSW相似度搜索]
    D --> E[聚类分组]
    E --> F[主记录标识]

核心参数:

参数说明默认值
embedding_model嵌入模型名称text-embedding-3-small
sim_threshold相似度阈值0.90
top_kTop-K近邻数50
hnsw_mHNSW图的M参数32
ef_search搜索时的ef参数64

资料来源:datatune/core/deduplication.py:1-150

Agent智能代理

5.1 代理概述

Agent是Datatune的高级抽象,它能够自动分析用户的自然语言请求,规划并执行一系列数据转换步骤:

graph TD
    A[用户自然语言请求] --> B[LLM解析意图]
    B --> C[生成执行计划]
    C --> D[计划验证]
    D --> E{计划有效?}
    E -->|是| F[逐步执行]
    E -->|否| G[返回错误]
    F --> H[执行日志记录]
    H --> I[完成并返回结果]

资料来源:datatune/agent/agent.py:1-150datatune/agent/__init__.py:1-40

5.2 计划格式

Agent生成的计划为JSON数组,每个步骤包含以下字段:

字段类型说明
typestring操作类型:dask 或 primitive
operationstring操作名称
paramsdict操作参数字典
subpromptstring原语操作的LLM提示词
input_fieldslist输入列列表
output_fieldslist输出列列表(仅Map)

资料来源:datatune/agent/agent.py:100-150

5.3 支持的Dask操作

Agent可使用的Dask操作包括:

操作类型操作名说明
列操作add_column从表达式创建新列
列操作apply_function对列应用函数
列操作rename_columns重命名列
列操作astype_column更改列数据类型

资料来源:datatune/agent/agent.py:150-200

数据源支持

6.1 Dask后端

Dask后端支持大规模分布式数据处理:

import dask.dataframe as dd
df = dd.read_csv("data.csv")

特点:

  • 支持分区分布式计算
  • 延迟执行提高性能
  • 与pandas API兼容

6.2 Ibis后端

Ibis后端支持多种SQL数据库:

数据库连接方式
DuckDBibis.duckdb.connect()
PostgreSQLibis.postgres.connect()
BigQueryibis.bigquery.connect()

资料来源:README.md:30-60

快速开始

7.1 安装

pip install datatune

7.2 基本使用示例

import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

# 使用Map创建新列
mapped = dt.map(
    prompt="从产品描述和名称中提取类别",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

# 使用Filter过滤数据
filtered = dt.filter(
    prompt="仅保留电子产品",
    input_fields=["Name"]
)(llm, mapped)

# 保存结果
result = dt.finalize(filtered)
result.compute().to_csv("electronics_products.csv")

7.3 Agent使用示例

import datatune as dt
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo")
agent = dt.Agent(llm)

df = agent.do("添加ProfitMargin列并仅保留非洲组织", df)
result = dt.finalize(df)

Agent会自动:

  • 确定使用哪些操作(map、filter等)
  • 链接多个转换步骤
  • 处理复杂的多步骤任务
  • 生成并执行Python代码

资料来源:README.md:1-100

系统提示词设计

Agent的系统提示词定义了可用的工具和上下文:

资料来源:datatune/agent/__init__.py:1-50

可用的Python库:

  • pandas
  • numpy
  • dask

可用的数据处理原语:

  • Map:通过LLM提示词从现有数据创建新列
  • Filter:通过LLM提示词根据条件移除行

错误处理机制

9.1 执行错误捕获

Agent实现了完善的错误处理机制:

graph TD
    A[执行步骤] --> B{是否成功?}
    B -->|是| C[记录日志]
    B -->|否| D[捕获异常]
    D --> E[格式化错误信息]
    E --> F[返回错误和步骤号]

资料来源:datatune/agent/agent.py:80-120

9.2 速率限制警告

当使用的模型未配置速率限制时,系统会发出警告:

if "rpm" not in kwargs:
    logger.warning(f"REQUESTS-PER-MINUTE limits for model '{model_name}' not found.")
if "tpm" not in kwargs:
    logger.warning(f"TOKENS-PER-MINUTE limits for model '{model_name}' not found.")

资料来源:datatune/llm/llm.py:65-80

总结

Datatune是一个功能强大的LLM驱动数据转换框架,它:

  1. 简化数据处理:通过自然语言描述替代复杂的代码编写
  2. 支持多种后端:Dask用于大规模分布式计算,Ibis支持多种SQL数据库
  3. 集成多LLM:OpenAI、Ollama、Azure、Mistral等主流提供商
  4. 智能去重:基于嵌入向量的高效相似度检测
  5. Agent自动化:自动规划并执行复杂的数据转换流程

该框架适用于需要对结构化数据进行转换、清洗和增强的各种场景,特别是当业务逻辑难以用传统编程表达时,Datatune能够显著提升开发效率。

资料来源:[README.md:1-20]()

快速开始

datatune 是一个使用 LLM(大语言模型)进行数据转换和处理的 Python 库。它允许用户通过自然语言描述来执行复杂的数据操作任务,如数据映射、过滤和去重。快速开始指南旨在帮助用户在最短时间内上手 datatune,掌握其核心功能并完成基础的数据处理流程。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 安装方式

继续阅读本节完整说明和来源证据。

章节 LLM 配置

继续阅读本节完整说明和来源证据。

章节 Dask 数据源

继续阅读本节完整说明和来源证据。

概述

datatune 是一个使用 LLM(大语言模型)进行数据转换和处理的 Python 库。它允许用户通过自然语言描述来执行复杂的数据操作任务,如数据映射、过滤和去重。快速开始指南旨在帮助用户在最短时间内上手 datatune,掌握其核心功能并完成基础的数据处理流程。

该库支持多种 LLM 后端,包括 OpenAI、Ollama、Azure、Mistral、Huggingface 和 VLLM,同时兼容 Dask 和 Ibis(支持 DuckDB、PostgreSQL、BigQuery 等)数据源。资料来源:README.md

环境准备

安装方式

datatune 可通过 pip 直接安装:

pip install datatune

资料来源:README.md

LLM 配置

datatune 支持多种 LLM 提供商,需根据使用的模型进行相应配置:

LLM 类型初始化方式默认模型必需参数
OpenAIOpenAI(model_name="gpt-3.5-turbo")gpt-3.5-turboapi_key(可选)
OllamaOllama()gemma3:4bapi_base(默认 localhost:11434)
AzureAzure(model_name="...")-api_key, api_base, api_version
MistralMistral(model_name="mistral/mistral-tiny")mistral/mistral-tinyapi_key
HuggingfaceHuggingface(model_name="...")-api_key
VLLMVLLM(model_name="...")-api_base

资料来源:datatune/llm/llm.py

核心工作流程

datatune 的数据处理遵循标准的三步流程:加载数据、执行转换、最终化结果。以下是完整的工作流程图:

graph TD
    A[读取数据源] --> B[map 或 filter 转换]
    B --> C{datatune.finalize]
    C --> D[执行 compute]
    D --> E[保存结果]
    
    F[LLM 实例] --> B

数据加载

Dask 数据源

使用 Dask 读取 CSV 文件:

import dask.dataframe as dd

df = dd.read_csv("products.csv")

Ibis 数据源

使用 Ibis 连接 DuckDB 或其他数据库:

import ibis

con = ibis.duckdb.connect("data.duckdb")
table = con.table("my_table")

资料来源:README.md

map 操作

dt.map() 用于从现有列生成新的列,通过自然语言描述转换逻辑。

import datatune as dt

mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

参数说明

参数类型说明
promptstr自然语言描述转换规则
output_fieldsList[str]输出新列名列表
input_fieldsList[str]输入列名列表

资料来源:README.md

实现原理

map 操作内部通过 LLM 调用处理数据,将每行数据序列化为字典格式,附加到提示词中发送给 LLM,LLM 返回包含新字段的字典。实现文件位于 datatune/core/dask/map_dask.pydatatune/core/ibis/map_ibis.py

map_dask.py 中的提示词格式如下:

suffix = (
    f"{os.linesep}{os.linesep}"
    "Your response MUST be the entire input record as a valid Python dictionary in the format"
    "'index=<row_index>|{key1: value1, key2: value2, ...}'  with added keys of expected new fields if any."
)

资料来源:datatune/core/dask/map_dask.py

filter 操作

dt.filter() 用于根据条件过滤数据行。

filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)

参数说明

参数类型说明
promptstr自然语言描述过滤条件
input_fieldsList[str]输入列名列表

资料来源:README.md

实现原理

filter 操作使用特殊格式让 LLM 判断每行是否应保留。返回格式包含 __filter__ 键,值为 True 表示保留,False 表示移除。

suffix = (
    f"{os.linesep}{os.linesep}"
    "DECISION:Your response MUST be the entire input record as a Python dictionary..."
    "with added key called '__filter__' with value either True to KEEP the record or False to REMOVE it."
)

资料来源:datatune/core/dask/filter_dask.py

最终化与保存结果

使用 dt.finalize() 完成所有计算并将结果转换为标准 DataFrame:

result = dt.finalize(filtered)
result.compute().to_csv("electronics_products.csv")

finalize() 函数负责触发 Dask 计算图的实际执行,将延迟操作转换为最终结果。资料来源:README.md

Agent 模式

对于复杂的、多步骤的数据处理任务,可使用 dt.Agent 让 AI 自动规划转换步骤:

import datatune as dt
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo")
agent = dt.Agent(llm)

df = dd.read_csv("data.csv")

# 描述复杂需求,Agent 自动规划执行步骤
df = agent.do("Add ProfitMargin column and keep only African organizations", df)
result = dt.finalize(df)

Agent 自动完成的工作

功能说明
操作类型判断自动决定使用 map、filter 还是其他操作
步骤链式执行自动串联多个转换步骤
复杂任务处理支持从单一提示词执行多步操作
代码生成生成 Python 代码和行级原语操作

资料来源:README.md

Agent 系统提示词

Agent 基于以下系统提示词工作:

You are Datatune Agent, a powerful assistant designed to help users with data processing tasks.
You are capable of generating python code to perform various operations on data. Apart from python builtins, you have the following libraries avaiable in your runtime:
- pandas
- numpy
- dask

资料来源:datatune/agent/__init__.py

模型速率限制

datatune 内置了常见 OpenAI 模型的速率限制配置,确保 API 调用不会超出限制:

模型TPM(每分钟令牌数)RPM(每分钟请求数)
gpt-3.5-turbo200,000500
gpt-410,000500
gpt-4-turbo30,000500
gpt-4.130,000500
gpt-4.1-mini200,000500
gpt-4.1-nano200,000500
gpt-4o30,000500

资料来源:datatune/llm/model_rate_limits.py

完整示例

以下是一个完整的快速开始示例,整合所有核心功能:

import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

# 1. 初始化 LLM
llm = OpenAI(model_name="gpt-3.5-turbo")

# 2. 加载数据
df = dd.read_csv("products.csv")

# 3. 使用 map 提取类别
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

# 4. 使用 filter 筛选电子产品
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)

# 5. 最终化并保存
result = dt.finalize(filtered)
result.compute().to_csv("electronics_products.csv")

后续步骤

  • 查看完整文档:https://docs.datatune.ai/
  • 参考示例代码:https://github.com/vitalops/datatune/tree/main/examples
  • 加入社区 Discord:https://discord.gg/3RKA5AryQX

资料来源:[README.md](https://github.com/vitalops/datatune/blob/main/README.md)

系统架构

Datatune 是一个基于大语言模型(LLM)的数据处理框架,其核心设计目标是通过自然语言描述实现对数据的转换、过滤和去重操作。系统采用模块化架构,将 LLM 交互、数据处理后端和智能代理三个层面解耦,支持 Dask 和 Ibis 两种数据处理后端,兼容 DuckDB、PostgreSQL、BigQuery 等多种数据源。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 各层职责

继续阅读本节完整说明和来源证据。

章节 抽象基类设计

继续阅读本节完整说明和来源证据。

章节 支持的 LLM 提供者

继续阅读本节完整说明和来源证据。

概述

Datatune 是一个基于大语言模型(LLM)的数据处理框架,其核心设计目标是通过自然语言描述实现对数据的转换、过滤和去重操作。系统采用模块化架构,将 LLM 交互、数据处理后端和智能代理三个层面解耦,支持 Dask 和 Ibis 两种数据处理后端,兼容 DuckDB、PostgreSQL、BigQuery 等多种数据源。

资料来源:datatune/__init__.py:1-7

架构分层

Datatune 采用三层架构设计,从下至上依次为:

graph TD
    subgraph "表现层"
        Agent
    end
    
    subgraph "核心操作层"
        Map
        Filter
        Reduce
    end
    
    subgraph "后端抽象层"
        Dask后端
        Ibis后端
    end
    
    subgraph "LLM交互层"
        OpenAI
        Ollama
        VLLM
        Azure
    end
    
    Agent --> Map
    Agent --> Filter
    Map --> Dask后端
    Map --> Ibis后端
    Filter --> Dask后端
    Filter --> Ibis后端
    OpenAI --> Agent
    Ollama --> Agent
    VLLM --> Agent
    Azure --> Agent

各层职责

层级组件职责
表现层Agent理解用户意图,规划执行步骤,生成并执行代码
核心操作层Map/Filter/Reduce提供声明式数据转换接口
后端抽象层Dask/Ibis实现分布式数据处理逻辑
LLM交互层各LLM实现类封装与不同语言模型的通信协议

资料来源:datatune/agent/agent.py:1-15

LLM 交互层

抽象基类设计

LLM 是所有 LLM 提供者的抽象基类,通过 litellm 库实现统一的模型调用接口:

class LLM:
    def __init__(self, model_name: str, **kwargs) -> None:
        self.model_name = model_name  # 格式: "provider/model"
        self._base_model_name = model_name.split("/", 1)[1]

资料来源:datatune/llm/llm.py:17-24

支持的 LLM 提供者

提供者类名默认模型特点
OpenAIOpenAIgpt-3.5-turbo云端 API
OllamaOllamagemma3:4b本地部署
VLLMVLLM需指定高性能推理
AzureAzure需指定企业级部署

资料来源:datatune/llm/llm.py:65-108

模型速率限制

系统内置了主流模型的速率限制配置,包含每分钟请求数(RPM)和每分钟令牌数(TPM):

model_rate_limits = {
    "gpt-3.5-turbo": {"tpm": 200_000, "rpm": 500},
    "gpt-4": {"tpm": 10_000, "rpm": 500},
    "gpt-4o": {"tpm": 30_000, "rpm": 500},
}

资料来源:datatune/llm/model_rate_limits.py:1-20

核心操作层

Map 操作

Map 操作通过 LLM 将现有列转换为新列,实现数据转换功能:

graph LR
    A[输入DataFrame] --> B[序列化输入列]
    B --> C[LLM批量推理]
    C --> D[解析输出字典]
    D --> E[写入新列]
    E --> F[输出DataFrame]

关键流程包括:

  1. 将输入列序列化为字符串格式
  2. 调用 LLM 批量处理,根据 prompt 生成映射结果
  3. 解析 LLM 返回的 Python 字典格式输出
  4. 将结果写入新生成的列

资料来源:datatune/core/dask/map_dask.py:1-50

Filter 操作

Filter 操作根据 LLM 生成的决策条件过滤数据行:

graph LR
    A[输入DataFrame] --> B[序列化输入列]
    B --> C[LLM决策判断]
    C --> D[解析__filter__字段]
    D --> E[布尔索引过滤]
    E --> F[输出DataFrame]

LLM 响应格式要求包含 __filter__ 字段,值为 True 表示保留,False 表示移除:

index=0|{key1: value1, key2: value2, '__filter__': True}<endofrow>

资料来源:datatune/core/dask/filter_dask.py:1-60

语义去重

SemanticDeduplicator 使用嵌入向量和 FAISS 索引实现语义级别的去重:

graph TD
    A[原始数据] --> B[嵌入生成]
    B --> C[FAISS HNSW索引]
    C --> D[流式聚类]
    D --> E[重复组识别]
    E --> F[规范ID映射]

去重器支持以下参数:

参数说明默认值
embedding_model嵌入模型名称text-embedding-3-small
sim_threshold相似度阈值0.90
top_k最近邻数量50
hnsw_mHNSW 索引参数32
ef_search搜索精度参数64

资料来源:datatune/core/deduplication.py:1-80

Agent 代理层

Agent 执行流程

Agent 是系统的智能编排层,能够自动分析用户需求并生成执行计划:

graph TD
    A[用户自然语言描述] --> B[LLM生成执行计划]
    B --> C{计划步骤循环}
    C -->|步骤类型为dask| D[执行Dask操作模板]
    C -->|步骤类型为primitive| E[执行Primitive操作]
    D --> F[记录步骤日志]
    E --> F
    F --> G{还有更多步骤?}
    G -->|是| C
    G -->|否| H[计算并返回结果]

执行计划结构

Agent 生成的执行计划是一个 JSON 数组,每个步骤包含以下字段:

字段类型说明
typestring"dask" 或 "primitive"
operationstring操作名称
paramsdictDask 模板参数
subpromptstringPrimitive 操作的 LLM 提示词
input_fieldslist输入列名
output_fieldslist输出列名(仅 Map)

资料来源:datatune/agent/agent.py:80-120

操作模板

Agent 支持两种类型的操作模板:

#### Dask 操作模板

用于直接生成 Dask DataFrame 操作代码:

TEMPLATE = {
    "dask": {
        "add_column": "df = df.assign({new_column}=...)",
        "group_by": "df = df.groupby(...).agg(...)",
        "rename_columns": "df = df.rename(columns={mapping})",
    }
}

#### Primitive 操作模板

用于通过 LLM 生成逐行转换逻辑:

TEMPLATE = {
    "primitive": {
        "Map": "df = dt.Map(subprompt=...)",
        "Filter": "df = dt.Filter(subprompt=...)",
    }
}

资料来源:datatune/agent/agent.py:25-50

数据流处理

Dask 分区处理

系统使用 map_partitions 实现分布式数据处理,保证每个分区独立执行 LLM 操作:

code_lines += [
    f"df = df.map_partitions(log_primitive, {start_msg!r}, meta=df._meta)",
    "step_num += 1",
    template,
    f"df = df.map_partitions(log_primitive, {end_msg!r}, meta=df._meta)"
]

资料来源:datatune/agent/agent.py:55-75

重复记录处理

在 Map 和 Filter 操作中,系统采用规范 ID 映射策略处理重复记录:

dup_to_canon = {
    dup: c["canonical_id"]
    for c in clusters
    for dup in c["duplicate_ids"]
}

canonical_idx = input_series.index.difference(dup_to_canon.keys())
canonical_input = input_series.loc[canonical_idx]
llm_out = llm(canonical_input, ...)

# 将结果回填到重复记录
for dup, canon in dup_to_canon.items():
    df.loc[dup, llm_output_column] = df.loc[canon, llm_output_column]

资料来源:datatune/core/dask/map_dask.py:30-55

公共 API

系统通过 datatune/__init__.py 导出以下核心组件:

from datatune.agent.agent import Agent
from datatune.core.filter import filter
from datatune.core.map import map
from datatune.core.dask.op import finalize
from datatune.core.deduplication import SemanticDeduplicator
from datatune.core.reduce import reduce

__all__ = ["map", "filter", "finalize", "Agent", "reduce"]

资料来源:datatune/__init__.py:1-10

错误处理机制

步骤级错误追踪

Agent 在执行计划时维护步骤计数器,失败时返回错误信息和失败步骤编号:

def _execute_plan(self, plan: List[Dict]):
    self.runtime.update({"step_num": 0, "plan": plan})
    for i, step in enumerate(plan, start=1):
        try:
            # 执行步骤...
            self.runtime["step_num"] = i
        except Exception as e:
            return str(e), i - 1

资料来源:datatune/agent/agent.py:45-70

输出解析容错

LLM 输出解析使用 ast.literal_eval 进行安全评估,解析失败时返回空字典:

def safe_parse(row):
    try:
        return ast.literal_eval(row)
    except Exception:
        return {}

资料来源:datatune/core/deduplication.py:60-65

总结

Datatune 的系统架构体现了以下设计原则:

  1. 层次化解耦:LLM 交互、数据处理和用户接口三层分离,便于维护和扩展
  2. 多后端支持:通过 Dask 和 Ibis 抽象,支持从单机到分布式多种部署场景
  3. 智能编排:Agent 层自动规划执行路径,降低用户使用门槛
  4. 容错设计:完善的错误处理和日志记录机制,便于问题诊断
  5. 语义去重:基于嵌入向量的语义匹配,实现精准的数据去重

资料来源:[datatune/__init__.py:1-7]()

数据管理与数据流

Datatune 是一个基于 LLM 的数据处理库,通过自然语言提示词实现数据的映射(Map)、过滤(Filter)和去重(Deduplication)操作。系统支持多种后端数据源,包括 Dask DataFrame 和 Ibis(支持 DuckDB、PostgreSQL、BigQuery 等),并通过统一的 API 接口实现数据流的透明处理。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 支持的数据源

继续阅读本节完整说明和来源证据。

章节 数据类型检测

继续阅读本节完整说明和来源证据。

章节 数据处理流程

继续阅读本节完整说明和来源证据。

概述

Datatune 是一个基于 LLM 的数据处理库,通过自然语言提示词实现数据的映射(Map)、过滤(Filter)和去重(Deduplication)操作。系统支持多种后端数据源,包括 Dask DataFrame 和 Ibis(支持 DuckDB、PostgreSQL、BigQuery 等),并通过统一的 API 接口实现数据流的透明处理。

核心设计理念是将复杂的数据转换逻辑封装为可组合的操作符,用户只需通过自然语言描述期望的转换结果,系统即可自动生成并执行相应的处理流程。

架构概览

graph TD
    subgraph "数据输入层"
        CSV[CSV 文件]
        SQLDB[(SQL 数据库)]
        DuckDB[(DuckDB)]
    end
    
    subgraph "抽象接口层"
        DD[Dask DataFrame]
        IB[Ibis Table]
    end
    
    subgraph "核心操作层"
        MAP[Map 操作]
        FILTER[Filter 操作]
        DEDUP[去重操作]
        REDUCE[Reduce 操作]
    end
    
    subgraph "LLM 服务层"
        OPENAI[OpenAI]
        OLLAMA[Ollama]
        AZURE[Azure]
        VLLM[VLLM]
    end
    
    CSV --> DD
    SQLDB --> IB
    DuckDB --> IB
    
    DD --> MAP
    DD --> FILTER
    DD --> DEDUP
    IB --> MAP
    IB --> FILTER
    
    MAP --> LLM
    FILTER --> LLM
    
    LLM --> OPENAI
    LLM --> OLLAMA
    LLM --> AZURE
    LLM --> VLLM

数据源支持

Datatune 通过抽象数据类型检测机制自动选择合适的后端实现。

支持的数据源

数据源类型后端实现导入方式
CSV/ParquetDaskdask.dataframe as dd
DuckDBIbisibis.duckdb.connect()
PostgreSQLIbisibis.postgres.connect()
BigQueryIbisibis.bigquery.connect()

数据类型检测

数据类型检测通过 __init__.py 中的内部函数实现:

def _is_ibis_table(obj):
    import ibis
    return isinstance(obj, ibis.Table)

def _is_dask_df(obj):
    import dask.dataframe as dd
    return isinstance(obj, dd.DataFrame)

资料来源:datatune/core/map.py:1-16

Map 操作数据流

Map 操作是 Datatune 的核心功能之一,用于根据自然语言提示词生成新的数据列。

数据处理流程

graph LR
    A[输入 DataFrame] --> B[序列化输入列]
    B --> C[去重预处理]
    C --> D[批量调用 LLM]
    D --> E[解析 LLM 输出]
    E --> F[合并去重结果]
    F --> G[输出包含新列的 DataFrame]

Dask 后端 Map 实现

Dask 后端的 Map 操作通过 _map_dask 函数实现,采用延迟计算和分区处理策略:

def _map_dask(prompt, output_fields, input_fields=None, clusters=None):
    # 1. 序列化输入列为 JSON 字符串
    serialized_input_column = "serialized_input"
    df[serialized_input_column] = df[input_fields].apply(
        lambda x: json.dumps(x.to_dict()), axis=1, meta=(serialized_input_column, str)
    )
    
    # 2. 构建 LLM 调用前缀和后缀
    prefix = "..."
    suffix = "Your response MUST be the entire input record..."
    
    # 3. 处理重复数据的预处理
    dup_to_canon = {
        dup: c["canonical_id"]
        for c in clusters
        for dup in c["duplicate_ids"]
    }
    canonical_idx = input_series.index.difference(dup_to_canon.keys())
    canonical_input = input_series.loc[canonical_idx]
    
    # 4. 批量调用 LLM
    llm_out = llm(canonical_input, prefix, prompt, suffix, optimized=True)
    
    # 5. 将 LLM 结果写回 DataFrame
    df.loc[canonical_idx, llm_output_column] = llm_out
    
    # 6. 复制去重结果到重复行
    for dup, canon in dup_to_canon.items():
        df.loc[dup, llm_output_column] = df.loc[canon, llm_output_column]

资料来源:datatune/core/dask/map_dask.py:1-45

Ibis 后端 Map 实现

Ibis 后端采用不同的实现方式,使用行号索引和内存表进行数据处理:

def _map_ibis(table, llm, input_col, output_col, prompt, expected_new_fields):
    # 添加行号索引
    indexed_table = table.mutate(_ROW_ID_=ibis.row_number().cast("int64"))
    
    # 将数据拉到本地执行
    local_data = indexed_table.select("_ROW_ID_", input_col).execute()
    input_list = local_data[input_col].tolist()
    
    # 调用 LLM
    raw_results = llm(input_list, prefix, mapping_prompt, suffix, optimized=True)
    
    # 解析结果并处理异常
    processed_results = []
    for res in raw_results:
        try:
            py_dict = ast.literal_eval(str(res).strip())
            processed_results.append(json.dumps(py_dict))
        except Exception:
            processed_results.append("{}")
    
    # 创建内存表并关联
    mapping_df = pd.DataFrame({
        "_ROW_ID_": local_data["_ROW_ID_"].values, 
        output_col: processed_results
    })
    mapping_table = ibis.memtable(mapping_df)

资料来源:datatune/core/ibis/map_ibis.py:1-50

LLM 输出格式规范

Map 操作要求 LLM 返回特定格式的输出:

index=<row_index>|{key1: value1, key2: value2, ...}
  • 必须以 index=<row_index>| 开头
  • 主体必须是有效的 Python 字典格式
  • 新增字段必须使用双引号
  • 缺失值使用 None 表示

资料来源:datatune/core/dask/map_dask.py:15-25

Filter 操作数据流

Filter 操作根据自然语言条件过滤数据行。

过滤处理流程

graph LR
    A[输入 DataFrame] --> B[序列化数据行]
    B --> C[构建过滤提示词]
    C --> D[批量调用 LLM]
    D --> E[解析过滤决策]
    E --> F[应用过滤掩码]
    F --> G[输出过滤后的 DataFrame]

Dask 后端 Filter 实现

def _filter_dask(df, llm, input_fields, prompt, clusters=None):
    # 序列化输入数据
    serialized_input_column = "serialized_input"
    df[serialized_input_column] = df[input_fields].apply(
        lambda x: json.dumps(x.to_dict()), axis=1, meta=(serialized_input_column, str)
    )
    
    # 构建过滤提示词
    suffix = (
        f"{os.linesep}{os.linesep}"
        "DECISION:Your response MUST be the entire input record..."
        "with added key called '__filter__' with value either True to KEEP "
        "the record or False to REMOVE it."
    )
    
    # 去重处理
    dup_to_canon = {
        dup: c["canonical_id"]
        for c in clusters
        for dup in c["duplicate_ids"]
    }
    canonical_idx = input_series.index.difference(dup_to_canon.keys())
    canonical_input = input_series.loc[canonical_idx]
    
    # 调用 LLM 获取过滤决策
    llm_out = llm(canonical_input, prefix, prompt, suffix, optimized=True)
    
    # 合并去重结果
    for dup, canon in dup_to_canon.items():
        df.loc[dup, llm_output_column] = df.loc[canon, llm_output_column]

资料来源:datatune/core/dask/filter_dask.py:1-40

LLM 过滤决策格式

Filter 操作要求 LLM 返回带有 __filter__ 键的字典:

index=<row_index>|{key1: value1, key2: value2, ..., '__filter__': True}
  • __filter__: True 表示保留该行
  • __filter__: False 表示移除该行

资料来源:datatune/core/dask/filter_dask.py:18-24

去重机制

Datatune 实现了基于语义相似度的去重功能,用于优化 LLM 调用并保持数据一致性。

去重处理架构

graph TD
    A[输入分区数据] --> B[Embedding 向量化]
    B --> C[构建 FAISS HNSW 索引]
    C --> D[相似度搜索]
    D --> E[聚类重复数据]
    E --> F[合并重复行结果]

去重处理流程

def _embed_and_write_partition(self, part, partition_id, output_dir):
    # 解析行为字典
    dicts = [safe_parse(row) for row in pdf]
    
    # 构建文本表示
    texts = [
        ", ".join(f"{k}: {v}" for k, v in d.items())
        for d in dicts
    ]
    
    # 批量生成 Embedding
    for i in range(0, len(texts), 256):
        batch = texts[i:i+256]
        resp = embedding(model=self.embedding_model, input=batch)
        embeddings.extend([item["embedding"] for item in resp["data"]])
    
    # 归一化并存储
    X = np.asarray(embeddings, dtype="float32")
    faiss.normalize_L2(X)
    np.save(f"{output_dir}/embeddings_part_{partition_id}.npy", X)

资料来源:datatune/core/deduplication.py:1-60

去重配置参数

参数默认值说明
embedding_modeltext-embedding-3-small用于向量化的 Embedding 模型
sim_threshold0.90相似度阈值,超过此值视为重复
top_k50搜索时返回的最近邻数量
hnsw_m32HNSW 索引参数 M
ef_search64HNSW 搜索参数

资料来源:datatune/core/deduplication.py:1-25

Reduce 操作

Reduce 操作通过注册-执行模式实现数据的聚合转换。

架构设计

graph LR
    A[定义 Action] --> B[注册到全局表]
    B --> C[调用 reduce 函数]
    C --> D[查找并实例化 Action]
    D --> E[执行数据处理]

Action 注册机制

_ACTIONS = {}

def register_action(name):
    def decorator(cls):
        _ACTIONS[name] = cls
        return cls
    return decorator

def get_action(name):
    try:
        return _ACTIONS[name]
    except KeyError:
        raise ValueError(f"Unknown action: {name}")

def reduce(df, *, action: str, **kwargs):
    cls = get_action(action)
    reducer = cls(**kwargs)   
    return reducer(df)

资料来源:datatune/core/reduce.py:1-25

Agent 编排层

Agent 是 Datatune 的智能编排组件,能够根据用户自然语言描述自动规划并执行数据处理流程。

Agent 工作流

graph TD
    A[用户描述任务] --> B[规划器分析意图]
    B --> C{判断操作类型}
    C -->|需要语义理解| D[使用 Primitive 操作]
    C -->|纯数据转换| E[使用 Dask 操作]
    D --> F[执行 Map/Filter]
    E --> G[执行 add_column/group_by 等]
    F --> H[合并结果]
    G --> H
    H --> I[返回处理后的 DataFrame]

计划执行流程

def _execute_plan(self, plan: List[Dict]):
    self._set_df(self.df)
    runtime.update({"step_num": 0, "plan": plan})
    
    for i, step in enumerate(plan, start=1):
        step_type = step["type"]
        operation = step["operation"]
        params = step["params"]
        
        if step_type == "dask":
            template = self.TEMPLATE["dask"][step["operation"]].format(**params)
            self.runtime.execute(template + "\n_ = df.head()")
        elif step_type == "primitive":
            template = self.TEMPLATE["primitive"][step["operation"]].format(**params)
            self.runtime.execute(template)

资料来源:datatune/agent/agent.py:1-80

支持的操作类型

#### Primitive 操作

操作类型说明参数
Map从文本生成新列subprompt, input_fields, output_fields
Filter根据条件过滤行subprompt, input_fields

#### Dask 操作

操作类型说明参数
add_column从表达式添加列new_column, expression
group_by_agg分组聚合group_columns, aggregations
rename_columns重命名列mapping
astype_column改变列类型column, dtype

资料来源:datatune/agent/agent.py:50-100

LLM 接口层

LLM 模块提供了统一的接口,支持多种大语言模型服务。

支持的 LLM 提供商

提供商类名默认模型配置参数
OpenAIOpenAIgpt-3.5-turboapi_key
OllamaOllamagemma3:4bapi_base
AzureAzure自定义api_key, api_base, api_version
VLLMVLLM自定义api_base, max_tokens

批处理机制

LLM 模块实现了智能批处理,根据输入长度动态分组请求以优化 API 调用:

# token 计数估算
prefix_suffix_tokens = token_counter(model_name, messages=message(""))
total_ntokens = prefix_suffix_tokens

for i, prompt in enumerate(input_rows):
    # 动态批处理逻辑
    # 当累计 token 接近限制时执行当前批次

资料来源:datatune/llm/llm.py:1-60

API 参考

核心函数

#### map 函数

def map(*, prompt, output_fields, input_fields=None, clusters=None):
    def apply(llm, data):
        if _is_dask_df(data):
            from .dask.map_dask import _map_dask
            return _map_dask(prompt=prompt, output_fields=output_fields, 
                           input_fields=input_fields, clusters=clusters)(llm, data)
        elif _is_ibis_table(data):
            from .ibis.map_ibis import _map_ibis
            return _map_ibis(prompt=prompt, output_fields=output_fields,
                            input_fields=input_fields)(llm, data)
    return apply

资料来源:datatune/core/map.py:20-38

#### filter 函数

def filter(*, prompt, input_fields=None, clusters=None):
    def apply(llm, data):
        if _is_dask_df(data):
            from .dask.filter_dask import _filter_dask
            return _filter_dask(prompt=prompt, input_fields=input_fields,
                               clusters=clusters)(llm, data)
        elif _is_ibis_table(data):
            from .ibis.filter_ibis import _filter_ibis
            return _filter_ibis(prompt=prompt, input_fields=input_fields)(llm, data)
    return apply

资料来源:datatune/core/filter.py:20-36

模块导出

from datatune import map, filter, finalize, Agent, reduce

资料来源:datatune/__init__.py:1-8

完整使用示例

import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

# 初始化 LLM
llm = OpenAI(model_name="gpt-3.5-turbo")

# 读取数据
df = dd.read_csv("products.csv")

# Map: 提取分类信息
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

# Filter: 保留电子产品
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)

# 获取最终结果
result = dt.finalize(filtered)
result.compute().to_csv("electronics_products.csv")

资料来源:[datatune/core/map.py:1-16](https://github.com/vitalops/datatune/blob/main/datatune/core/map.py)

Map操作

Map操作是datatune库的核心功能之一,它通过大型语言模型(LLM)实现对数据列的语义转换和增强。用户只需提供自然语言描述的转换规则,Map操作即可自动解析数据、调用LLM生成新字段,并返回增强后的数据集。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 模块层次

继续阅读本节完整说明和来源证据。

章节 执行流程

继续阅读本节完整说明和来源证据。

章节 Dask后端 mapdask.py

继续阅读本节完整说明和来源证据。

概述

Map操作是datatune库的核心功能之一,它通过大型语言模型(LLM)实现对数据列的语义转换和增强。用户只需提供自然语言描述的转换规则,Map操作即可自动解析数据、调用LLM生成新字段,并返回增强后的数据集。

该操作支持Dask和Ibis两种后端,能够处理分布式数据源(如CSV、数据库表等),同时内置去重优化机制,避免对语义相同的记录重复调用LLM API,从而显著降低使用成本。

架构设计

模块层次

datatune.core.map (顶层接口)
    ├── datatune.core.dask.map_dask (Dask后端实现)
    └── datatune.core.ibis.map_ibis (Ibis后端实现)

执行流程

graph TD
    A[开始Map操作] --> B[序列化输入列]
    B --> C{启用去重?}
    C -->|是| D[构建重复记录映射]
    C -->|否| E[跳过去重]
    D --> F[提取唯一记录索引]
    E --> G[调用LLM API]
    F --> G
    G --> H[解析LLM输出]
    H --> I[填充结果到DataFrame]
    I --> J[返回增强后的数据]
    D --> K[复制结果到重复记录]
    K --> J

核心实现

Dask后端 map_dask.py

#### 去重优化机制

Dask后端的Map操作实现了智能去重功能,通过SemanticDeduplicator预先识别语义相似的记录。当存在重复记录群组时,系统仅对每个群组的canonical(规范)记录调用LLM,然后将结果复制到群组内的其他重复记录。

dup_to_canon = {
    dup: c["canonical_id"]
    for c in clusters
    for dup in c["duplicate_ids"]
}

canonical_idx = input_series.index.difference(dup_to_canon.keys())
canonical_input = input_series.loc[canonical_idx]

llm_out = llm(
    canonical_input,
    prefix,
    prompt,
    suffix,
    optimized=True
)

df.loc[canonical_idx, llm_output_column] = llm_out

for dup, canon in dup_to_canon.items():
    df.loc[dup, llm_output_column] = df.loc[canon, llm_output_column]

资料来源:datatune/core/dask/map_dask.py:25-46

#### LLM调用与输出解析

Map操作使用特定的输出格式约束,确保LLM返回可解析的Python字典:

suffix = (
    f"{os.linesep}{os.linesep}"
    "Your response MUST be the entire input record as a valid Python dictionary in the format"
    "'index=<row_index>|{key1: value1, key2: value2, ...}'  with added keys of expected new fields if any."
     
    "ALWAYS START YOUR RESPONSE WITH 'index=<row_index>|' WHERE <row_index> IS THE INDEX OF THE ROW." \
    "IF A VALUE FOR A COLUMN DOES NOT EXIST SET IT TO null" \
    "'index=<row_index>|{key1: None, key2: value2, ...}'"
)

输出解析通过parse_llm_output函数处理,将LLM返回的字符串转换为Python字典对象。

资料来源:datatune/core/dask/map_dask.py:8-17

Ibis后端 map_ibis.py

#### 行号索引机制

Ibis后端为每个输入记录添加行号索引,以便后续结果关联:

indexed_table = table.mutate(_ROW_ID_=ibis.row_number().cast("int64"))

local_data = indexed_table.select("_ROW_ID_", input_col).execute()

input_list = local_data[input_col].tolist()

raw_results = llm(input_list, prefix, mapping_prompt, suffix, optimized=True)

资料来源:datatune/core/ibis/map_ibis.py:35-45

#### 结果转换与关联

解析后的结果被转换为内存表并与原表JOIN:

processed_results = []
for res in raw_results:
    try:
        py_dict = ast.literal_eval(str(res).strip())
        processed_results.append(json.dumps(py_dict))
    except Exception:
        processed_results.append("{}")

mapping_df = pd.DataFrame({
    "_ROW_ID_": local_data["_ROW_ID_"].values, 
    output_col: processed_results
})
mapping_table = ibis.memtable(mapping_df)

资料来源:datatune/core/ibis/map_ibis.py:47-60

Agent集成

Plan定义格式

在Agent的Plan中,Map操作作为primitive步骤声明:

{
    "type": "primitive",
    "operation": "map",
    "params": {
        "subprompt": "Extract category and sub-category from industry",
        "input_fields": ["Industry"],
        "output_fields": ["Category", "Sub-Category"]
    }
}

资料来源:datatune/agent/agent.py:78-85

步骤类型决策规则

Agent根据以下规则选择Map操作:

条件选择操作类型
需要自然语言理解、语义提取、分类primitive (Map/Filter)
涉及多列语义推理primitive (Map)
简单的数值/列操作dask
复杂多步骤转换优先使用dask,后续步骤依赖新列时可使用primitive

使用示例

基础用法

import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

# 提取类别信息
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

# 完成转换并保存
result = dt.finalize(mapped)
result.compute().to_csv("mapped_products.csv")

与Agent结合使用

import datatune as dt
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo")
agent = dt.Agent(llm)

df = agent.do("Add ProfitMargin column and keep only African organizations", df)
result = dt.finalize(df)

API参数说明

dt.map 函数参数

参数类型必填说明
promptstr自然语言描述的转换规则
input_fieldsList[str]输入列名列表
output_fieldsList[str]新生成的输出列名列表
optimizedbool是否启用去重优化,默认为True

LLM类配置

datatune支持多种LLM后端,各后端的初始化参数如下:

LLM类主要参数默认值
OpenAImodel_name, api_keygpt-3.5-turbo
Ollamamodel_name, api_basegemma3:4b, http://localhost:11434
Azuremodel_name, api_key, api_base, api_version-
VLLMmodel_name, api_base- , http://localhost:8000/v1

资料来源:datatune/llm/llm.py:48-95

速率限制

datatune内置了常用模型的速率限制配置,支持tpm(每分钟Token数)和rpm(每分钟请求数)两个维度的控制。

模型TPMRPM
gpt-3.5-turbo200,000500
gpt-410,000500
gpt-4-turbo30,000500
gpt-4.1-mini200,000500

资料来源:datatune/llm/model_rate_limits.py:1-30

技术特性

语义去重优化

当数据中存在语义重复的记录时,Map操作通过SemanticDeduplicator识别并合并:

graph LR
    A[原始数据] --> B[语义去重]
    B --> C[Canonical记录]
    B --> D[重复记录映射]
    C --> E[单次LLM调用]
    D --> F[结果复制]
    E --> G[最终结果]
    F --> G

此机制可节省高达50%-90%的API调用成本,同时保持结果准确性。

分布式处理

Map操作基于Dask或Ibis实现,支持大规模数据集的分布式处理:

  • Dask后端:利用Dask DataFrame的分区机制,每个分区独立处理
  • Ibis后端:支持DuckDB、PostgreSQL、BigQuery等多种数据库后端

最佳实践

  1. 批量输入优化:尽量将相关记录放在一起处理,提高去重效果
  2. 明确输出字段:在output_fields中清晰指定新列名,便于后续操作
  3. 选择合适模型:简单提取任务可使用gpt-3.5-turbo,复杂推理建议使用gpt-4
  4. 结果验证:首次使用新prompt时建议抽样验证输出质量

相关模块

资料来源:[datatune/core/dask/map_dask.py:25-46](https://github.com/vitalops/datatune/blob/main/datatune/core/dask/map_dask.py)

Filter操作

Filter操作是datatune库中用于基于自然语言描述对数据进行行级过滤的核心功能。该操作允许用户使用LLM(大型语言模型)根据语义理解来判断哪些数据行应该被保留或移除,无需编写复杂的条件表达式。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 模块结构

继续阅读本节完整说明和来源证据。

章节 后端支持

继续阅读本节完整说明和来源证据。

章节 函数签名

继续阅读本节完整说明和来源证据。

概述

Filter操作是datatune库中用于基于自然语言描述对数据进行行级过滤的核心功能。该操作允许用户使用LLM(大型语言模型)根据语义理解来判断哪些数据行应该被保留或移除,无需编写复杂的条件表达式。

Filter操作的本质是将用户的自然语言过滤条件翻译为布尔判断,通过LLM的语义理解能力实现智能化的数据筛选。系统会在每条数据记录中添加一个__filter__字段,值为True表示保留该记录,False表示移除该记录。

资料来源:datatune/core/filter.py:1-30

架构设计

模块结构

Filter操作采用后端分发的设计模式,根据输入数据的类型自动选择相应的处理后端。顶层模块filter.py负责类型检测和后端路由,具体实现分布在Dask和Ibis两个后端模块中。

graph TD
    A[用户调用 dt.filter] --> B{数据类型检测}
    B -->|Dask DataFrame| C[_filter_dask]
    B -->|Ibis Table| D[_filter_ibis]
    B -->|不支持的类型| E[TypeError]
    
    C --> F[LLM批量推理]
    D --> F
    
    F --> G[解析LLM输出]
    G --> H[生成过滤序列]
    H --> I[执行过滤操作]

资料来源:datatune/core/filter.py:16-29

后端支持

后端类型处理模块支持的数据源
Daskdatatune/core/dask/filter_dask.pyDask DataFrame
Ibisdatatune/core/ibis/filter_ibis.pyIbis Table(DuckDB、PostgreSQL、BigQuery等)

资料来源:datatune/core/filter.py:18-27

API接口

函数签名

def filter(*, prompt, input_fields=None, clusters=None):
    def apply(llm, data):
        # 后端分发逻辑
        pass
    return apply

参数说明

参数类型必填说明
promptstr自然语言描述的过滤条件
input_fieldsList[str]要参与过滤判断的输入列(当前版本中未实际使用)
clustersList[Dict]用于去重场景的聚类信息,避免重复调用LLM

资料来源:datatune/core/filter.py:12-28

使用示例

import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

# 使用自然语言过滤数据
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, df)

result = dt.finalize(filtered)

资料来源:README.md:1-45

工作流程

整体流程

Filter操作的处理流程分为以下几个阶段:

graph LR
    A[序列化输入数据] --> B[构建LLM提示词]
    B --> C[批量调用LLM]
    C --> D[解析LLM输出]
    D --> E[提取__filter__字段]
    E --> F[生成布尔序列]
    F --> G[执行数据过滤]

提示词构建

系统自动构建包含前缀、用户提示和后缀的完整提示词。

前缀部分包含输入数据格式规范:

Each answer must be formatted exactly as 'index=<index>|{answer}<endofrow>'.
{answer} must be any requested python literal (e.g. list, dict, string, integer)
eg: for a dict answer: index=1|{'key1': 'value1', 'key2': 2}<endofrow>
Ensure that each response is a valid python literal.
Ensure that strings are enclosed in double quotes.
STRINGS MUST BE ENCLOSED IN DOUBLE QUOTES. DO NOT OUTPUT BARE TEXT

用户提示部分包含过滤条件:

FILTERING CRITERIA:
{prompt}

后缀部分指定输出格式要求:

DECISION: Your response MUST be the entire input record as Python dictionary in the format: index=<row_index>|{key1: value1, key2: value2, ...}<endofrow> with added key called '__filter__' with value either True to KEEP the record or False to REMOVE it.
No explanations or additional text.
ALWAYS STICK TO THE FORMAT index=<row_index>|{key1: value1, key2: value2, ...}<endofrow> with added key called '__filter__' with value either True to KEEP the record or False to REMOVE it.
IF A VALUE FOR A COLUMN DOES NOT EXIST SET IT TO None

资料来源:datatune/core/dask/filter_dask.py:1-30

LLM响应格式

LLM返回的数据必须包含原始记录信息,并添加__filter__布尔字段:

# 示例输入
{"Name": "Laptop", "Price": 999, "Category": "Electronics"}

# LLM期望输出格式
'index=0|{"Name": "Laptop", "Price": 999, "Category": "Electronics", "__filter__": true}<endofrow>'

Dask后端实现

去重机制

Dask后端支持基于聚类的去重优化。当提供clusters参数时,系统会建立一个从重复ID到规范ID的映射:

dup_to_canon = {
    dup: c["canonical_id"]
    for c in clusters
    for dup in c["duplicate_ids"]
}

canonical_idx = input_series.index.difference(dup_to_canon.keys())
canonical_input = input_series.loc[canonical_idx]

对于规范数据行调用LLM进行判断,然后将结果复制到所有重复行:

for dup, canon in dup_to_canon.items():
    df.loc[dup, llm_output_column] = df.loc[canon, llm_output_column]

资料来源:datatune/core/dask/filter_dask.py:25-50

过滤序列生成

使用parse_filter_output函数解析LLM输出:

def parse_filter_output(
    output: Union[str, Exception], err: bool = True
) -> Optional[bool]:
    # 解析LLM返回的字典,提取__filter__字段值

资料来源:datatune/core/dask/filter_dask.py:60-75

Ibis后端实现

行号索引

Ibis后端通过row_number()函数为表添加行号索引:

indexed_table = table.mutate(_ROW_ID_=ibis.row_number().cast("int64"))
local_data = indexed_table.select("_ROW_ID_", input_col).execute()

资料来源:datatune/core/ibis/filter_ibis.py:20-25

数据处理

Ibis实现同样通过AST解析提取过滤决策,并将结果存储在内存表中进行关联:

mapping_df = pd.DataFrame({
    "_ROW_ID_": local_data["_ROW_ID_"].values, 
    output_col: processed_results
})
mapping_table = ibis.memtable(mapping_df)

资料来源:datatune/core/ibis/filter_ibis.py:35-45

与Agent的集成

Filter操作可以与datatune的Agent功能结合使用,让AI自动判断何时需要使用Filter操作:

agent = dt.Agent(llm)
df = agent.do("Keep only African organizations", df)

在Agent的规划模板中,Filter操作被定义为primitive类型:

"filter": textwrap.dedent(
    """\
    filtered = dt.filter(
        prompt="{subprompt}",
        input_fields={input_fields}
    )(llm, df)
    df = filtered
    """
)

资料来源:datatune/agent/agent.py:100-110

错误处理

错误类型处理方式
不支持的数据类型抛出TypeError,提示当前支持Dask和Ibis
LLM输出解析失败捕获异常,返回空字典或默认值
API调用异常通过异常处理机制传递给上层

资料来源:datatune/core/filter.py:27-28

最佳实践

输入数据准备

确保输入数据包含足够的上下文信息供LLM进行判断。对于涉及多列条件的过滤,应在prompt中明确说明需要参考的列。

批量处理

Filter操作支持批量处理,通过设置合理的批次大小来平衡处理速度和API调用频率。

去重优化

在处理可能包含重复记录的数据时,使用clusters参数可以显著减少LLM API调用次数,提升处理效率。

资料来源:[datatune/core/filter.py:1-30]()

Reduce操作

Reduce操作是datatune库中用于对数据执行聚合、归约和转换操作的核心模块之一。该模块采用插件化架构设计,允许通过注册机制扩展不同的归约操作类型。reduce函数作为统一入口,接收数据帧和操作类型参数,动态调用对应的归约处理器完成数据处理任务。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 插件注册机制

继续阅读本节完整说明和来源证据。

章节 全局动作注册表

继续阅读本节完整说明和来源证据。

章节 注册装饰器

继续阅读本节完整说明和来源证据。

概述

Reduce操作是datatune库中用于对数据执行聚合、归约和转换操作的核心模块之一。该模块采用插件化架构设计,允许通过注册机制扩展不同的归约操作类型。reduce函数作为统一入口,接收数据帧和操作类型参数,动态调用对应的归约处理器完成数据处理任务。

在datatune的生态系统定位中,Reduce操作与Map、Filter操作共同构成了基于大语言模型的数据处理管道。三者的职责分工如下:

操作类型职责输入输出
Map字段映射与转换单行数据添加新字段的行数据
Filter数据过滤单行数据布尔决策(保留/移除)
Reduce聚合与归约多行数据聚合结果

核心架构

插件注册机制

Reduce模块采用工厂模式和装饰器模式实现插件化架构。核心组件包括全局动作注册表、动作注册装饰器和动作获取函数三个部分。

graph TD
    A[reduce函数] --> B[get_action获取动作类]
    B --> C{查找_ACTIONS注册表}
    C -->|找到| D[实例化动作类]
    C -->|未找到| E[抛出ValueError异常]
    D --> F[调用reducer执行归约]
    F --> G[返回处理结果]
    
    H[register_action装饰器] --> I[将动作类注册到_ACTIONS]
    J[自定义动作类] --> H
    K[内置动作类] --> H

全局动作注册表

_ACTIONS = {}

_ACTIONS是一个模块级字典,用作动作类的注册表。键为动作名称字符串,值为对应的动作类。资料来源:datatune/core/reduce.py:1

注册装饰器

def register_action(name):
    def decorator(cls):
        _ACTIONS[name] = cls
        return cls
    return decorator

register_action装饰器接收动作名称作为参数,返回一个接受类作为参数的装饰器函数。装饰器执行时将类注册到_ACTIONS字典中,并返回原类以保持装饰器链的正常工作。资料来源:datatune/core/reduce.py:6-12

动作获取函数

def get_action(name):
    try:
        return _ACTIONS[name]
    except KeyError:
        raise ValueError(f"Unknown action: {name}")

get_action函数从注册表中查找指定名称的动作类。若未找到对应动作,抛出ValueError异常并提示未知动作名称。资料来源:datatune/core/reduce.py:14-20

API参考

reduce函数

def reduce(df, *, action: str, **kwargs):
    cls = get_action(action)
    reducer = cls(**kwargs)
    return reducer(df)

函数签名参数说明:

参数类型必填说明
dfdask.dataframe.DataFrame输入数据帧
actionstr动作类型名称,用于从注册表获取对应动作类
**kwargs任意传递给动作类构造器的关键字参数

返回值: 返回动作类实例调用后的结果,具体类型取决于具体动作的实现。

资料来源:datatune/core/reduce.py:23-27

使用方式

基本调用模式

import datatune as dt
import dask.dataframe as dd

# 读取数据
df = dd.read_csv("data.csv")

# 调用reduce操作
result = dt.reduce(df, action="your_action_name", param1=value1, param2=value2)

在管道中使用

Reduce操作可以与其他datatune操作链式调用,构建完整的数据处理管道:

import datatune as dt
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo")

# 构建处理管道
df = dd.read_csv("raw_data.csv")
mapped = dt.map(prompt="提取产品类别", input_fields=["description"], output_fields=["category"])(llm, df)
filtered = dt.filter(prompt="只保留电子产品", input_fields=["category"])(llm, mapped)
result = dt.reduce(df, action="deduplicate", embedding_model="text-embedding-3-small")
final_result = dt.finalize(result)

内置归约动作

SemanticDeduplicator

语义去重动作,用于识别和合并语义相似的记录。该动作使用嵌入向量计算记录间的语义相似度,适用于需要处理近似重复数据的场景。

嵌入到磁盘功能:

def embed_column_to_disk(self, df, column, output_dir):
    os.makedirs(output_dir, exist_ok=True)

    tasks = []

    for pid in range(df.npartitions):
        part = df[column].get_partition(pid)
        task = dask.delayed(self._embed_and_write_partition)(
            part,
            pid,
            output_dir,
        )
        tasks.append(task)

    dask.compute(*tasks)

资料来源:datatune/core/deduplication.py:1-100

分区嵌入处理逻辑:

def _embed_and_write_partition(self, part, partition_id, output_dir):
    pdf = part

    if pdf.empty:
        print("empty partition")
        return 0

    emb_path = f"{output_dir}/embeddings_part_{partition_id}.npy"
    idx_path = f"{output_dir}/index_part_{partition_id}.npy"

    if os.path.exists(emb_path) and os.path.exists(idx_path):
        return 0

    row_index = pdf.index.to_numpy()

    def safe_parse(row):
        try:
            return ast.literal_eval(row)
        except Exception:
            return {}

    dicts = [safe_parse(row) for row in pdf]

    texts = [
        ", ".join(f"{k}: {v}" for k, v in d.items())
        for d in dicts
    ]

    embeddings = []
    for i in range(0, len(texts), 256):
        batch = texts[i:i+256]
        resp = embedding(model=self.embedding_model, input=batch)
        embeddings.extend([item["embedding"] for item in resp["data"]])

    X = np.asarray(embeddings, dtype="float32")
    faiss.normalize_L2(X)

    np.save(f"{output_dir}/embeddings_part_{partition_id}.npy", X)
    np.save(f"{output_dir}/index_part_{partition_id}.npy", row_index)

资料来源:datatune/core/deduplication.py:70-130

FAISS索引构建:

def build_faiss_index(self, embedding_dir, dim, hnsw_m, ef_search):
    index = faiss.IndexHNSWFlat(
        dim,
        hnsw_m,
        faiss.METRIC_INNER_PRODUCT,
    )
    index.hnsw.efConstruction = 200

资料来源:datatune/core/deduplication.py:140-160

构造器参数

参数类型默认值说明
llmLLM用于执行去重的语言模型实例
embedding_modelstr"text-embedding-3-small"嵌入模型名称
sim_thresholdfloat0.90相似度阈值,超过此值的记录被视为重复
top_kint50每个记录查找的最大相似记录数
hnsw_mint32HNSW索引的空间参数M
ef_searchint64HNSW搜索时的搜索参数ef
return_dfboolFalse是否返回DataFrame格式结果

资料来源:datatune/core/deduplication.py:160-180

自定义归约动作

用户可以通过@register_action装饰器注册自定义归约动作,扩展reduce模块的功能。

创建自定义动作示例

from datatune.core.reduce import register_action

@register_action("group_stats")
class GroupStatsReducer:
    def __init__(self, group_by_column, stat_columns):
        self.group_by_column = group_by_column
        self.stat_columns = stat_columns
    
    def __call__(self, df):
        return df.groupby(self.group_by_column)[self.stat_columns].agg(['mean', 'sum', 'count'])

调用自定义动作

import datatune as dt

result = dt.reduce(df, action="group_stats", group_by_column="category", stat_columns=["price", "quantity"])

执行流程

sequenceDiagram
    participant User as 用户
    participant Reduce as reduce函数
    participant Registry as 动作注册表
    participant Action as 动作类
    participant Result as 处理结果

    User->>Reduce: reduce(df, action="xxx", **kwargs)
    Reduce->>Registry: get_action("xxx")
    Registry-->>Reduce: 动作类引用
    Reduce->>Action: 实例化 cls(**kwargs)
    Reduce->>Action: reducer(df)
    Action->>Result: 执行归约逻辑
    Result-->>User: 返回处理结果

与Map和Filter的协作

在datatune的整体架构中,Reduce操作通常与其他两种操作配合使用。Map操作用于生成新的列字段,Filter操作用于数据筛选,Reduce操作用于聚合和去重。

graph LR
    A[原始数据] --> B[Map操作]
    B --> C[新增列数据]
    C --> D[Filter操作]
    D --> E[筛选后数据]
    E --> F[Reduce操作]
    F --> G[聚合/去重结果]
    G --> H[finalize]
    H --> I[计算结果]

工作流程示例

import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

# Step 1: 使用Map提取类别信息
mapped = dt.map(
    prompt="从产品描述中提取主要类别",
    output_fields=["Category"],
    input_fields=["Description", "Name"]
)(llm, df)

# Step 2: 使用Filter筛选特定类别
filtered = dt.filter(
    prompt="只保留电子产品",
    input_fields=["Category"]
)(llm, mapped)

# Step 3: 使用Reduce进行语义去重
deduplicated = dt.reduce(
    df=dt.finalize(filtered),
    action="deduplicate",
    embedding_model="text-embedding-3-small"
)

# Step 4: 最终计算输出
result = deduplicated.compute()

资料来源:datatune/__init__.py:1-10

注意事项

  1. 动作注册时机:自定义动作需要在调用reduce函数之前完成注册,确保动作类已在_ACTIONS注册表中可用。
  1. 参数传递reduce函数使用关键字参数**kwargs将配置传递给动作类构造器,调用者需确保参数名与动作类构造器参数匹配。
  1. 异常处理:当指定的动作名称不存在时,get_action函数会抛出ValueError异常。应用程序应做好异常捕获处理。
  1. Dask集成:Reduce操作基于Dask DataFrame实现,支持大规模数据的分布式处理。输入数据应为dask.dataframe.DataFrame类型。资料来源:datatune/core/reduce.py:1
  1. 分区处理:在处理大规模数据时,系统会按分区逐个处理,用户应合理配置Dask集群或本地并行参数以优化性能。

资料来源:[datatune/core/reduce.py:23-27]()

Agent系统

Agent系统是Datatune的核心智能组件,它允许用户使用自然语言描述数据处理目标,系统自动解析用户意图,生成并执行数据转换计划。该系统结合了大型语言模型(LLM)的推理能力与Dask分布式计算框架,能够处理复杂的多步骤数据转换任务,无需用户手动编写代码。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 系统组件

继续阅读本节完整说明和来源证据。

章节 数据流架构

继续阅读本节完整说明和来源证据。

章节 Agent类

继续阅读本节完整说明和来源证据。

概述

Agent系统是Datatune的核心智能组件,它允许用户使用自然语言描述数据处理目标,系统自动解析用户意图,生成并执行数据转换计划。该系统结合了大型语言模型(LLM)的推理能力与Dask分布式计算框架,能够处理复杂的多步骤数据转换任务,无需用户手动编写代码。

Agent的核心设计理念是:用户描述期望的结果,系统自动决定使用何种操作以及如何组合这些操作。这使得非技术用户也能完成复杂的数据处理任务,同时保持足够的灵活性供高级用户使用。

资料来源:datatune/agent/__init__.py:1-30

架构设计

系统组件

Agent系统由以下核心组件构成:

组件文件位置职责
Agentdatatune/agent/agent.py主要执行引擎,负责计划生成与执行
Agent基类datatune/agent/__init__.py定义系统提示词和抽象接口
LLMdatatune/llm/llm.py提供多后端LLM调用能力
Map原语datatune/core/dask/map_dask.py基于LLM的列映射转换
Filter原语datatune/core/dask/filter_dask.py基于LLM的行过滤

数据流架构

graph TD
    A[用户自然语言目标] --> B[Agent.do方法]
    B --> C[LLM生成计划JSON]
    C --> D[计划验证]
    D --> E{计划类型}
    E -->|dask| F[Dask操作模板]
    E -->|primitive| G[Map/Filter原语]
    F --> H[代码执行引擎]
    G --> H
    H --> I[错误处理]
    I -->|成功| J[最终结果]
    I -->|失败| K[错误恢复]
    K --> C

Agent接收用户的自然语言指令后,首先调用LLM生成结构化的执行计划。计划由一系列步骤组成,每个步骤可以是Dask原生操作或LLM驱动的原语操作。

资料来源:datatune/agent/agent.py:1-50

核心类设计

Agent类

Agent是系统的主要执行类,封装了所有数据处理逻辑。

class Agent:
    def __init__(self, llm: LLM, verbose: bool = False):
        self.llm = llm
        self.history: List[Dict[str, Any]] = []
        self.verbose = verbose

初始化参数:

参数类型必填说明
llmLLMLLM实例,用于生成计划和执行原语
verbosebool设为True时启用DEBUG级别日志,默认False

主要方法:

方法返回值说明
do(goal: str, df: dd.DataFrame)`None \str`执行自然语言目标
_generate_plan(goal)List[Dict]生成执行计划
_execute_plan(plan)`Tuple[None, int] \Tuple[str, int]`执行完整计划
_execute_step(step)`None \str`执行单个步骤
log_primitive(df, message)dd.DataFrame记录原语执行状态

资料来源:datatune/agent/agent.py:50-80

Agent基类

class Agent(ABC):
    system_prompt: str = """You are Datatune Agent, a powerful assistant designed to help users with data processing tasks.
    You are capable of generating python code to perform various operations on data. Apart from python builtins, you have the following libraries avaiable in your run time:
    - pandas
    - numpy
    - dask

    In addition to these, you also have access to the datatune libarary, which provides functionality for processing data using LLMs.
    ..."""

基类定义了Agent的系统提示词,明确了可用的运行时环境:Python内置库、pandas、numpy、dask以及datatune库。

资料来源:datatune/agent/__init__.py:1-30

执行计划模型

计划结构

执行计划是一个JSON数组,每个元素代表一个独立的执行步骤:

[
  {
    "type": "dask" | "primitive",
    "operation": "操作名称",
    "params": { ... },
    "subprompt": "LLM提示词",
    "input_fields": ["列名"],
    "output_fields": ["新列名"]
  }
]

步骤类型

Agent支持两种类型的操作步骤:

类型说明示例操作
dask直接的Dask DataFrame操作add_column, group_by, rename_columns
primitiveLLM驱动的原语操作Map, Filter

Dask操作模板

系统预定义了丰富的Dask操作模板:

操作名称功能描述关键参数
add_column添加新列(基于表达式)column, expr
apply_function对列应用函数column, func
rename_columns重命名列mapping
astype_column更改列数据类型column, dtype

资料来源:datatune/agent/agent.py:80-150

原语系统

Map原语

Map原语使用LLM根据自然语言提示词从现有列生成新列。它能够处理语义提取、分类、解释等复杂转换任务。

工作流程:

  1. 序列化输入列数据为字典格式
  2. 将数据分批发送给LLM
  3. LLM返回包含新字段的字典
  4. 将结果合并回DataFrame
map = dt.Map(prompt="Extract categories from the description")
mapped_df = map(llm, df)

资料来源:datatune/core/dask/map_dask.py:1-60

Filter原语

Filter原语使用LLM根据自然语言条件过滤DataFrame行。

输出格式规范:

index=<row_index>|{...字典内容..., '__filter__': True|False}<endofrow>
  • True:保留该行
  • False:移除该行

资料来源:datatune/core/dask/filter_dask.py:1-50

LLM后端支持

系统通过统一的LLM接口支持多种语言模型提供商:

graph LR
    A[LLM基类] --> B[OpenAI]
    A --> C[Ollama]
    A --> D[VLLM]
    A --> E[Azure]

支持的模型

提供商模型标识默认模型
OpenAIopenai/<model>gpt-3.5-turbo
Ollamaollama_chat/<model>gemma3:4b
VLLMopenai/<model>-
Azureazure/<model>-

速率限制

系统内置了主流模型的速率限制配置:

模型TPM (每分钟Token数)RPM (每分钟请求数)
gpt-3.5-turbo200,000500
gpt-410,000500
gpt-4-turbo30,000500
gpt-4o30,000500

资料来源:datatune/llm/model_rate_limits.py:1-80

使用示例

基础用法

import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

# 初始化LLM和Agent
llm = OpenAI(model_name="gpt-3.5-turbo")
agent = dt.Agent(llm)

# 加载数据
df = dd.read_csv("products.csv")

# 使用自然语言描述任务
df = agent.do("Add ProfitMargin column and keep only African organizations", df)
result = dt.finalize(df)

详细示例

# 创建Agent实例(启用详细日志)
agent = dt.Agent(llm, verbose=True)

# 执行复杂的多步骤任务
goal = """
1. Create a 'PriceCategory' column: 'high' if price > 100, 'low' otherwise
2. Keep only rows where PriceCategory is 'high'
3. Add 'DiscountedPrice' column: Price * 0.9
"""

df = agent.do(goal, df)
result = dt.finalize(df)

单独使用Map和Filter

# 仅使用Map原语
mapped = dt.map(
    prompt="Extract categories from the description and name",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

# 仅使用Filter原语
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)

执行流程详解

计划生成

sequenceDiagram
    participant User as 用户
    participant Agent as Agent
    participant LLM as LLM
    participant Schema as 数据Schema

    User->>Agent: do(goal, df)
    Agent->>Schema: 获取DataFrame结构
    Agent->>LLM: 发送persona_prompt + schema_prompt
    LLM->>Agent: 返回JSON计划
    Agent->>Agent: 验证计划格式

Agent生成计划时,会向LLM传递:

  1. Persona提示词:定义LLM的角色、能力边界和输出格式
  2. Schema提示词:DataFrame的列结构和数据类型
  3. 用户目标:自然语言描述的期望结果

计划执行

执行过程采用流式日志记录:

# 每个步骤的执行流程
df = df.map_partitions(log_primitive, "🔄 开始步骤 X/Y", meta=df._meta)
step_num += 1
# 执行具体操作
df = df.map_partitions(log_primitive, "📍 完成步骤 X/Y", meta=df._meta)

最终执行完成后,调用dt.finalize()compute()将Dask DataFrame转换为实际的Pandas DataFrame。

资料来源:datatune/agent/agent.py:100-140

错误处理与恢复

错误检测机制

系统在每个步骤执行后都会检测异常:

try:
    # 执行步骤
    ...
except Exception as e:
    error_msg = f"{type(e).__name__}: {str(e)}\n{traceback.format_exc()}"
    return error_msg

错误恢复流程

当步骤执行失败时,系统会:

  1. 记录错误信息和失败的步骤详情
  2. 生成包含错误上下文的修复提示词
  3. 请求LLM生成修正后的计划
def get_error_prompt(self, error_msg: str, failed_step: Dict) -> str:
    error_prompt = f"""
    The previous code execution failed with the following error:
    Error: {error_msg}
    Failed step: {failed_step}
    Provide the json plan with the corrected step.
    """

配置与调优

初始化配置

# 完整配置示例
agent = dt.Agent(
    llm=llm,
    verbose=True  # 启用DEBUG日志
)

# 自定义LLM配置
llm = OpenAI(
    model_name="gpt-4-turbo",
    api_key="your-api-key",
    rpm=1000,  # 自定义RPM限制
    tpm=50000  # 自定义TPM限制
)

日志级别控制

import logging

# 启用详细日志
agent = dt.Agent(llm, verbose=True)
logger.setLevel(logging.DEBUG)

# 禁用详细日志
agent = dt.Agent(llm, verbose=False)
logger.setLevel(logging.INFO)

最佳实践

  1. 明确指定列名:在目标描述中明确提及要操作的列名,有助于Agent生成准确的操作计划
  1. 分解复杂任务:对于非常复杂的任务,可以考虑分步执行以便于调试
  1. 选择合适的模型:简单的数据转换可使用gpt-3.5-turbo,复杂的语义任务建议使用gpt-4-turbogpt-4o
  1. 监控执行过程:通过设置verbose=True监控Agent的决策过程,便于理解系统行为
  1. 处理重复数据:系统内置了基于语义相似度的去重功能,可用于处理LLM输出中的重复结果

资料来源:[datatune/agent/__init__.py:1-30]()

LLM集成

datatune 的 LLM 集成模块提供了统一的接口来连接各种大语言模型(Large Language Model)提供者。该模块封装了不同 LLM 服务商的 API 调用逻辑,使上层应用能够以一致的方式使用不同的模型,而无需关心底层实现的差异。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 类继承结构

继续阅读本节完整说明和来源证据。

章节 核心类说明

继续阅读本节完整说明和来源证据。

章节 1. OpenAI

继续阅读本节完整说明和来源证据。

概述

datatune 的 LLM 集成模块提供了统一的接口来连接各种大语言模型(Large Language Model)提供者。该模块封装了不同 LLM 服务商的 API 调用逻辑,使上层应用能够以一致的方式使用不同的模型,而无需关心底层实现的差异。

主要功能包括:

  • 统一抽象:通过基类 LLM 提供一致的接口设计
  • 多提供商支持:支持 OpenAI、Ollama、VLLM、Azure、Mistral、Huggingface 等主流 LLM 服务
  • 速率限制管理:内置对各模型速率限制的支持,防止 API 调用超出限制
  • Token 计数与批处理:提供 token 计数和批量请求功能,优化 API 使用效率

架构设计

类继承结构

graph TD
    LLMBase[LLM 基类] --> Ollama
    LLMBase --> OpenAI
    LLMBase --> VLLM
    LLMBase --> Azure
    LLMBase --> Gemini
    LLMBase --> Mistral
    LLMBase --> Huggingface
    
    LLMBase --> 完成方法[_completion]
    LLMBase --> 批处理方法[_create_batched_prompts]
    LLMBase --> Token限制[MAX_RPM, MAX_TPM]

核心类说明

#### LLM 基类

LLM 类是所有 LLM 提供者的基类,定义通用接口和默认行为:

class LLM:
    def __init__(self, model_name: str, **kwargs) -> None:
        self.model_name = model_name  # 格式: "provider/model"
        self._base_model_name = model_name.split("/", 1)[1]  # 如 "gpt-3.5-turbo"

资料来源datatune/llm/llm.py:27-32

#### 速率限制

每个模型实例具有两个关键属性:

属性说明来源
MAX_RPM每分钟最大请求数 (Requests Per Minute)model_rate_limits
MAX_TPM每分钟最大 Token 数 (Tokens Per Minute)model_rate_limits
max_tokens单次请求最大 Token 数get_max_tokens()

资料来源datatune/llm/llm.py:62-64

支持的 LLM 提供者

1. OpenAI

OpenAI 模型使用 openai/ 前缀标识:

from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo", api_key="your-key")
llm = OpenAI(model_name="gpt-4o", api_key="your-key")

资料来源datatune/llm/llm.py:64-73

2. Ollama (本地部署)

适用于本地运行的 LLM,通过 HTTP API 连接:

from datatune.llm.llm import Ollama

llm = Ollama()  # 默认 gemma3:4b,localhost:11434
llm = Ollama(model_name="llama3:8b", api_base="http://localhost:11434")

资料来源datatune/llm/llm.py:53-61

3. VLLM

VLLM 服务器连接,支持 OpenAI 兼容 API:

from datatune.llm.llm import VLLM

llm = VLLM(
    model_name="meta-llama/Llama-3-8B-Instruct",
    api_base="http://localhost:8000/v1"
)

资料来源datatune/llm/llm.py:75-91

4. Azure OpenAI

Azure 托管的 OpenAI 模型:

from datatune.llm.llm import Azure

llm = Azure(
    model_name="gpt-3.5-turbo",
    api_key="your-key",
    api_base="https://xxx.openai.azure.com",
    api_version="2024-02-01"
)

资料来源datatune/llm/llm.py:93-106

5. Gemini

Google Gemini 模型集成:

from datatune.llm.llm import Gemini

llm = Gemini(model_name="gemini-1.5-pro", api_key="your-key")

6. Mistral

Mistral AI 模型:

from datatune.llm.llm import Mistral

llm = Mistral(model_name="mistral-tiny")
llm = Mistral(model_name="mistral-large-latest")

资料来源datatune/llm/llm.py:130-148

7. Huggingface

Hugging Face 推理端点:

from datatune.llm.llm import Huggingface

llm = Huggingface(model_name="meta-llama/Llama-3-8B-Instruct")

资料来源datatune/llm/llm.py:150-172

速率限制配置

预定义模型限制

model_rate_limits.py 文件包含各模型的默认速率限制:

模型系列TPM (Token/分钟)RPM (请求/分钟)
GPT-3.5-Turbo200,000500
GPT-410,000 - 30,000500
GPT-4.130,000 - 200,000500
GPT-4o30,000500
GPT-4.5-Preview125,0001,000

资料来源datatune/llm/model_rate_limits.py:1-85

自定义速率限制

可以在初始化时覆盖默认限制:

from datatune.llm.llm import OpenAI

# 自定义 RPM 和 TPM
llm = OpenAI(
    model_name="gpt-3.5-turbo",
    rpm=1000,
    tpm=300000
)

资料来源datatune/llm/llm.py:42-51

使用方式

基本调用

所有 LLM 实例可作为可调用对象使用:

llm = OpenAI(model_name="gpt-3.5-turbo", api_key="sk-xxx")

# 单次调用
result = llm("Hello, how are you?")

批处理调用

对于大量数据处理,系统支持自动批处理:

prompts = ["Process row 1", "Process row 2", "Process row 3"]
results = llm(prompts, prefix="Instruction: ", suffix="End.")

资料来源datatune/llm/llm.py:85-105

在 datatune 中的应用

datatune 的核心功能使用 LLM 集成进行数据转换:

import datatune as dt
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo")

# 映射操作
mapped = dt.map(
    prompt="Extract categories from description",
    output_fields=["Category"],
    input_fields=["Description"]
)(llm, df)

# 过滤操作
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)

资料来源README.md

核心方法

`_completion`

执行单次补全请求:

def _completion(self, prompt: str) -> Union[str, Exception]:
    messages = [{"role": "user", "content": prompt}]
    from litellm import completion
    
    response = completion(model=self.model_name, messages=messages, **self.kwargs)
    
    if isinstance(response, Exception):
        return response
    return response["choices"][0]["message"]["content"]

资料来源datatune/llm/llm.py:69-77

`get_max_tokens`

获取模型支持的最大 token 数:

def get_max_tokens(self) -> int:
    return get_max_tokens(self._base_model_name)

`_create_batched_prompts`

创建批处理提示词,用于优化多行数据处理:

def _create_batched_prompts(
    self,
    input_rows: List[str],
    user_batch_prefix: str,
    prompt_per_row: str,
    batch_suffix: str = None
) -> List[str]:
    # 计算 token 限制并分批
    # 返回批处理后的提示词列表

资料来源datatune/llm/llm.py:80-107

依赖说明

LLM 模块依赖 litellm 库实现底层 API 调用:

from litellm import get_max_tokens, token_counter, batch_completion

litellm 提供了对 100+ LLM 模型的统一接口,支持:

  • OpenAI 系列
  • Azure OpenAI
  • Anthropic
  • Google Vertex AI
  • AWS Bedrock
  • 本地部署模型 (Ollama, VLLM, TGI)
  • Hugging Face 推理端点

资料来源datatune/llm/llm.py:10-12

最佳实践

1. 选择合适的模型

场景推荐模型理由
快速原型gpt-3.5-turbo成本低、速度快
高质量输出gpt-4o性能与成本平衡
最大精度gpt-4.1最新最强模型
本地部署Ollama/VLLM隐私保护、无 API 成本

2. 配置合理的速率限制

根据实际 API 配额设置:

# 根据 API tier 设置
llm = OpenAI(
    model_name="gpt-4o",
    rpm=500,   # 根据配额调整
    tpm=30000  # 根据配额调整
)

3. 使用批处理优化

对于大量数据处理,使用内置批处理功能避免单次调用限制:

# 自动分批处理
results = llm(
    input_rows,
    prefix="任务: ",
    suffix="请以指定格式输出",
    optimized=True  # 启用优化模式
)

导出接口

模块入口文件提供统一导出:

from datatune.llm import *
# 导出: LLM, Ollama, OpenAI, VLLM, Azure, Gemini, Mistral, Huggingface

资料来源datatune/llm/__init__.py

总结

datatune 的 LLM 集成模块通过抽象化的设计,为用户提供了便捷的模型切换能力和统一的调用接口。开发者可以轻松地在不同 LLM 提供者之间切换,同时享受内置的速率限制保护和批处理优化功能。

来源:https://github.com/vitalops/datatune / 项目说明书

数据源支持

Datatune 是一个数据处理框架,通过集成大语言模型(LLM)来实现智能化的数据转换和清洗。该框架的核心设计理念是支持多种数据源,使得用户能够在不同的计算后端上执行 map、filter 等数据转换操作。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 核心抽象层

继续阅读本节完整说明和来源证据。

章节 数据流架构

继续阅读本节完整说明和来源证据。

章节 概述

继续阅读本节完整说明和来源证据。

概述

Datatune 是一个数据处理框架,通过集成大语言模型(LLM)来实现智能化的数据转换和清洗。该框架的核心设计理念是支持多种数据源,使得用户能够在不同的计算后端上执行 mapfilter 等数据转换操作。

数据源支持架构采用插件式设计,通过抽象层将具体的数据处理逻辑与底层数据源解耦。当前框架主要支持两大类数据源:Dask(用于大规模并行计算)和 Ibis(支持多种 SQL 后端)。

资料来源:datatune/agent/__init__.py:1-5

支持的数据源类型

Datatune 支持的数据源可分为以下几类:

数据源类型底层引擎特点适用场景
Dask DataFrameDask延迟计算、分布式执行、内存优化大规模单机或集群数据处理
DuckDBIbis + DuckDB嵌入式 OLAP、零配置快速分析、交互式查询
PostgreSQLIbis + PostgreSQL关系型数据库、企业级支持生产环境、事务处理
BigQueryIbis + BigQuery云原生、超大规模云端大数据分析

资料来源:README.md:1-50

架构设计

核心抽象层

Datatune 的数据源支持通过统一的抽象接口实现,主要包含以下核心组件:

# 抽象基类定义
class Agent(ABC):
    system_prompt: str = """You are Datatune Agent..."""

代理(Agent)类作为核心抽象,负责协调 LLM 与数据源的交互。不同的数据源实现继承该基类并实现具体的数据操作逻辑。

资料来源:datatune/agent/__init__.py:1-10

数据流架构

graph TD
    A[用户代码] --> B[Datatune API]
    B --> C{数据源类型}
    C -->|Dask| D[core/dask/]
    C -->|Ibis| E[core/ibis/]
    D --> F[map_dask.py / filter_dask.py]
    E --> G[map_ibis.py / filter_ibis.py]
    F --> H[LLM 调用]
    G --> H
    H --> I[结果聚合]
    I --> J[compute / execute]

Dask 数据源

概述

Dask 是 Datatune 默认支持的数据源,提供大规模并行计算能力。通过 Dask DataFrame,用户可以处理超出内存限制的数据集。

主要操作

Dask 数据源支持两类核心操作:

#### Map 操作

用于从现有列生成新列,常用于语义提取、分类、自然语言推理等场景。

import datatune as dt
import dask.dataframe as dd

df = dd.read_csv("products.csv")

mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

资料来源:README.md:1-30

#### Filter 操作

用于根据条件过滤行,基于 LLM 理解保留符合条件的记录。

filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)

资料来源:README.md:1-35

Dask 实现细节

#### 输出格式规范

Map 和 Filter 操作要求 LLM 返回特定格式的输出:

index=<row_index>|{key1: value1, key2: value2, ...}<endofrow>
  • 字符串必须用双引号包裹
  • 缺失值设置为 Nonenull
  • 每行输出必须以 <endofrow> 结尾

资料来源:datatune/core/dask/map_dask.py:1-30

#### 重复数据处理

Dask 实现包含智能去重机制,通过 dup_to_canon 映射处理重复数据:

dup_to_canon = {
    dup: c["canonical_id"]
    for c in clusters
    for dup in c["duplicate_ids"]
}

只有规范索引的数据会被发送到 LLM,重复项的结果会自动复制。

资料来源:datatune/core/dask/map_dask.py:1-45

Ibis 数据源

概述

Ibis 是一个 Python 抽象层,支持多种 SQL 后端。通过 Ibis,Datatune 可以连接到 DuckDB、PostgreSQL、BigQuery 等数据库执行向量化查询。

连接方式

#### DuckDB

import ibis
con = ibis.duckdb.connect("data.duckdb")
table = con.table("my_table")

资料来源:README.md:1-60

#### PostgreSQL

import ibis
con = ibis.postgres.connect(...)
table = con.table("my_table")

#### BigQuery

import ibis
con = ibis.bigquery.connect(...)
table = con.table("my_table")

Ibis 实现特点

#### 行号索引

Ibis 实现使用 ibis.row_number() 生成行索引:

indexed_table = table.mutate(_ROW_ID_=ibis.row_number().cast("int64"))
local_data = indexed_table.select("_ROW_ID_", input_col).execute()

资料来源:datatune/core/ibis/filter_ibis.py:1-40

#### 输出解析

Ibis 后端将 LLM 输出解析为布尔值用于过滤:

for res in results:
    try:
        d = ast.literal_eval(str(res).strip())
        if d:
            last_key = list(d.keys())[-1]
            flag = d[last_key]
            out_bools.append(flag)
    except Exception:
        out_bools.append(False)

资料来源:datatune/core/ibis/filter_ibis.py:1-60

Agent 自动化

功能概述

Datatune Agent 可以自动识别数据转换需求,智能选择合适的操作类型并执行。

import datatune as dt
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo")
agent = dt.Agent(llm)

df = agent.do("Add ProfitMargin column and keep only African organizations", df)

资料来源:README.md:1-45

计划生成规则

Agent 生成执行计划时遵循以下规则:

  1. Dask 操作选择:如果任务涉及添加列、分组、移位、按行或按列应用函数,使用 type: "dask"
  2. Primitive 操作选择:如果需要理解自然语言、提取或解释文本内容、进行语义推理,使用 type: "primitive"
  3. 简化优先:能用 Dask 完成的步骤优先使用 Dask,只有 Dask 无法实现时才使用 primitives

资料来源:datatune/agent/agent.py:1-100

支持的操作类型

操作类型说明适用场景
add_column从表达式创建新列数值计算、日期提取
apply_function对单列应用函数逐元素转换
rename_columns重命名列列名规范化
astype_column更改列数据类型类型转换
group_by_agg分组聚合统计计算
conditional_column条件列基于条件的新列

资料来源:datatune/agent/agent.py:1-150

LLM 集成

支持的模型提供商

Datatune 通过 litellm 库支持多种 LLM 提供商:

提供商类名默认模型说明
OpenAIOpenAIgpt-3.5-turboOpenAI GPT 系列
OllamaOllamagemma3:4b本地模型支持
vLLMVLLM自定义高性能推理
AzureAzure自定义Azure OpenAI 服务

资料来源:datatune/llm/llm.py:1-100

模型初始化示例

# OpenAI
from datatune.llm.llm import OpenAI
llm = OpenAI(model_name="gpt-3.5-turbo")

# Ollama (本地)
from datatune.llm.llm import Ollama
llm = Ollama()

# Azure
from datatune.llm.llm import Azure
llm = Azure(model_name="gpt-3.5-turbo", api_key=api_key)

资料来源:README.md:1-70

速率限制配置

框架内置了主要模型的速率限制配置:

model_rate_limits = {
    "gpt-3.5-turbo": {
        "tpm": 200_000,  # tokens per minute
        "rpm": 500,      # requests per minute
    },
    "gpt-4": {
        "tpm": 10_000,
        "rpm": 500,
    },
    # ...
}

资料来源:datatune/llm/model_rate_limits.py:1-50

使用工作流

完整数据处理流程

graph LR
    A[加载数据] --> B[定义 LLM]
    B --> C[应用 Map]
    C --> D[应用 Filter]
    D --> E[执行 Finalize]
    E --> F[保存结果]

代码示例

import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

# 初始化
llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

# 映射操作
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

# 过滤操作
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)

# 最终计算并保存
result = dt.finalize(filtered)
result.compute().to_csv("electronics_products.csv")

资料来源:README.md:1-40

性能优化

去重处理

对于包含重复行的数据集,框架采用智能去重策略:

  1. 使用 embedding 模型识别语义相似的记录
  2. 将相似记录聚类,只对规范记录调用 LLM
  3. 将结果自动复制到同簇的其他记录
class Deduplicator:
    def __init__(
        self,
        embedding_model: str = "text-embedding-3-small",
        sim_threshold: float = 0.90,
        top_k: int = 50,
    ):
        # ...

资料来源:datatune/core/deduplication.py:1-30

分区处理

Dask 数据源支持分区并行处理:

for pid in range(df.npartitions):
    part = df[column].get_partition(pid)
    task = dask.delayed(self._embed_and_write_partition)(part, pid, output_dir)

资料来源:datatune/core/deduplication.py:1-80

注意事项

  1. 数据量控制:LLM 调用按批次进行,需注意 token 限制
  2. 输出格式:严格遵循 index=<index>|{...}<endofrow> 格式
  3. 字符串转义:输出必须使用双引号,避免裸文本
  4. 错误处理:单个记录解析失败不影响整体流程
  5. 资源限制:生产环境建议配置合理的 tpmrpm 参数

资料来源:[datatune/agent/__init__.py:1-5]()

失败模式与踩坑日记

保留 Doramagic 在发现、验证和编译中沉淀的项目专属风险,不把社区讨论只当作装饰信息。

medium 能力判断依赖假设

假设不成立时,用户拿不到承诺的能力。

medium 维护活跃度未知

新项目、停更项目和活跃项目会被混在一起,推荐信任度下降。

medium 下游验证发现风险项

下游已经要求复核,不能在页面中弱化。

medium 存在安全注意事项

用户安装前需要知道权限边界和敏感操作。

Pitfall Log / 踩坑日志

项目:vitalops/datatune

摘要:发现 7 个潜在踩坑项,其中 0 个为 high/blocking;最高优先级:能力坑 - 能力判断依赖假设。

1. 能力坑 · 能力判断依赖假设

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:README/documentation is current enough for a first validation pass.
  • 对用户的影响:假设不成立时,用户拿不到承诺的能力。
  • 建议检查:将假设转成下游验证清单。
  • 防护动作:假设必须转成验证项;没有验证结果前不能写成事实。
  • 证据:capability.assumptions | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | README/documentation is current enough for a first validation pass.

2. 维护坑 · 维护活跃度未知

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:未记录 last_activity_observed。
  • 对用户的影响:新项目、停更项目和活跃项目会被混在一起,推荐信任度下降。
  • 建议检查:补 GitHub 最近 commit、release、issue/PR 响应信号。
  • 防护动作:维护活跃度未知时,推荐强度不能标为高信任。
  • 证据:evidence.maintainer_signals | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | last_activity_observed missing

3. 安全/权限坑 · 下游验证发现风险项

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:no_demo
  • 对用户的影响:下游已经要求复核,不能在页面中弱化。
  • 建议检查:进入安全/权限治理复核队列。
  • 防护动作:下游风险存在时必须保持 review/recommendation 降级。
  • 证据:downstream_validation.risk_items | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | no_demo; severity=medium

4. 安全/权限坑 · 存在安全注意事项

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:No sandbox install has been executed yet; downstream must verify before user use.
  • 对用户的影响:用户安装前需要知道权限边界和敏感操作。
  • 建议检查:转成明确权限清单和安全审查提示。
  • 防护动作:安全注意事项必须面向用户前置展示。
  • 证据:risks.safety_notes | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | No sandbox install has been executed yet; downstream must verify before user use.

5. 安全/权限坑 · 存在评分风险

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:no_demo
  • 对用户的影响:风险会影响是否适合普通用户安装。
  • 建议检查:把风险写入边界卡,并确认是否需要人工复核。
  • 防护动作:评分风险必须进入边界卡,不能只作为内部分数。
  • 证据:risks.scoring_risks | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | no_demo; severity=medium

6. 维护坑 · issue/PR 响应质量未知

  • 严重度:low
  • 证据强度:source_linked
  • 发现:issue_or_pr_quality=unknown。
  • 对用户的影响:用户无法判断遇到问题后是否有人维护。
  • 建议检查:抽样最近 issue/PR,判断是否长期无人处理。
  • 防护动作:issue/PR 响应未知时,必须提示维护风险。
  • 证据:evidence.maintainer_signals | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | issue_or_pr_quality=unknown

7. 维护坑 · 发布节奏不明确

  • 严重度:low
  • 证据强度:source_linked
  • 发现:release_recency=unknown。
  • 对用户的影响:安装命令和文档可能落后于代码,用户踩坑概率升高。
  • 建议检查:确认最近 release/tag 和 README 安装命令是否一致。
  • 防护动作:发布节奏未知或过期时,安装说明必须标注可能漂移。
  • 证据:evidence.maintainer_signals | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | release_recency=unknown

来源:Doramagic 发现、验证与编译记录