Doramagic Project Pack · Human Manual
datatune
Datatune is a data transformation framework that leverages Large Language Models (LLMs) to perform semantic operations on distributed data. The system provides a declarative interface for ...
Introduction to Datatune
Related topics: Getting Started, System Architecture
Datatune is an AI-powered data transformation library that enables natural language-driven data processing operations on large-scale datasets. The library leverages Large Language Models (LLMs) to interpret human-readable instructions and automatically generate the appropriate data transformations, making complex data manipulation accessible to users without deep technical expertise.
Datatune supports multiple backend computational frameworks including Dask for distributed computing and Ibis for SQL-like operations across various databases. The system provides both explicit transformation primitives (Map, Filter) and an intelligent Agent that can automatically determine and chain multiple transformation steps from a single natural language prompt.
High-Level Architecture
graph TD
User[User Request] --> Agent[Datatune Agent]
User --> Primitives[Explicit Primitives]
Agent --> Planner[Planning Module]
Planner --> Steps[Transformation Steps]
Steps --> DaskOps[Dask Operations]
Steps --> PrimOps[Primitive Operations]
Steps --> CodeGen[Code Generation]
Primitives --> MapOp[Map Operation]
Primitives --> FilterOp[Filter Operation]
MapOp --> LLM[LLM Interface]
FilterOp --> LLM
LLM --> OpenAI[OpenAI Provider]
LLM --> Ollama[Ollama Provider]
LLM --> VLLM[VLLM Provider]
LLM --> Azure[Azure Provider]
DaskOps --> Dask[Dask DataFrame]
PrimOps --> Dask
MapOp --> Ibis[Ibis Backend]
FilterOp --> Ibis
Ibis --> DuckDB[DuckDB]
Ibis --> PostgreSQL[PostgreSQL]
Ibis --> BigQuery[BigQuery]
The architecture follows a layered approach where the LLM abstraction layer provides a unified interface for multiple AI providers, while the transformation layer handles both explicit user-defined operations and AI-generated execution plans.
Core Components
LLM Interface Layer
The LLM module (datatune/llm/llm.py) provides a unified interface for interacting with various language model providers. The base LLM class handles rate limiting, token counting, and request batching, while provider-specific subclasses implement the actual API calls.
#### Supported LLM Providers
| Provider | Class | Default Model | Description |
|---|---|---|---|
| OpenAI | OpenAI | gpt-3.5-turbo | Standard OpenAI API integration |
| Ollama | Ollama | gemma3:4b | Local LLM server support |
| VLLM | VLLM | User-specified | High-performance vLLM inference |
| Azure | Azure | User-specified | Azure OpenAI Service |
The base LLM class automatically extracts rate limit configurations from model_rate_limits.py based on the model name. If a model is not found in the predefined limits, it defaults to GPT-3.5-turbo limits with a warning message.
from datatune.llm.llm import OpenAI, Ollama, Azure
# OpenAI
llm = OpenAI(model_name="gpt-3.5-turbo")
# Ollama (local)
llm = Ollama()
# Azure
llm = Azure(model_name="gpt-3.5-turbo", api_key=api_key)
Sources: README.md:40-56
Rate Limiting Configuration
The model_rate_limits.py module defines tokens-per-minute (TPM) and requests-per-minute (RPM) limits for supported models:
| Model Family | TPM | RPM |
|---|---|---|
| GPT-3.5-Turbo | 200,000 | 500 |
| GPT-4 | 10,000 - 30,000 | 500 |
| GPT-4.1 | 30,000 - 200,000 | 500 |
| GPT-4.5-Preview | 125,000 | 1000 |
Sources: datatune/llm/model_rate_limits.py:1-120
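The lookup-with-fallback behavior described for the base LLM class can be sketched as follows. This is a minimal illustration, not datatune's implementation: the names `MODEL_RATE_LIMITS` and `resolve_rate_limits` are hypothetical, the prefix stripping is an assumption, and only two entries from the table above are included.

```python
import warnings

# Illustrative subset of limits, copied from the table in this manual.
MODEL_RATE_LIMITS = {
    "gpt-3.5-turbo": {"tpm": 200_000, "rpm": 500},
    "gpt-4": {"tpm": 10_000, "rpm": 500},
}

DEFAULT_MODEL = "gpt-3.5-turbo"


def resolve_rate_limits(model_name: str) -> dict:
    """Return the TPM/RPM limits for a model, falling back to the default."""
    # Assumption: a provider prefix such as "openai/" is stripped before lookup.
    bare_name = model_name.split("/", 1)[-1]
    if bare_name in MODEL_RATE_LIMITS:
        return MODEL_RATE_LIMITS[bare_name]
    warnings.warn(
        f"No rate limits known for {bare_name!r}; using {DEFAULT_MODEL} limits."
    )
    return MODEL_RATE_LIMITS[DEFAULT_MODEL]


known = resolve_rate_limits("openai/gpt-4")
fallback = resolve_rate_limits("my-custom-model")  # emits a warning
```

Returning the default limits rather than raising keeps unknown models usable, at the cost of limits that may not match the provider's real quota.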
Transformation Primitives
Datatune provides two fundamental transformation primitives that work with natural language prompts:
Map Operation
The Map primitive creates new columns by applying LLM-driven transformations to existing data. It takes input from specified columns and generates output fields based on the natural language prompt.
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd
llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)
The Map operation constructs prompts with the following structure:
Your task is to MAP/CREATE new fields based on the following input record(s):
[input data]
Your response MUST be the entire input record as a valid Python dictionary in the format
'index=<row_index>|{key1: value1, key2: value2, ...}' with added keys of expected new fields.
Sources: datatune/core/dask/map_dask.py:1-50
Sources: datatune/core/ibis/map_ibis.py:1-60
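To make the record format concrete, the sketch below renders rows in the `index=<row_index>|{...}` shape that the prompt asks the model to echo back. It is an assumption-laden illustration: `serialize_record` and `build_map_input` are hypothetical helpers, and the real serialization in map_dask.py may differ in detail.

```python
def serialize_record(row_index: int, record: dict) -> str:
    """Render one row as 'index=<row_index>|{...}' using Python dict syntax."""
    return f"index={row_index}|{record!r}"


def build_map_input(records: dict) -> str:
    """Join several serialized rows, one per line, for a single batched call."""
    return "\n".join(serialize_record(i, rec) for i, rec in records.items())


# Hypothetical product rows keyed by their row index.
rows = {
    0: {"Name": "USB-C cable", "Description": "1m charging cable"},
    1: {"Name": "Desk lamp", "Description": "LED lamp with dimmer"},
}
payload = build_map_input(rows)
```

Carrying the row index in every record lets the caller match each response line back to its source row even when several rows share one request.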
Filter Operation
The Filter primitive removes rows that do not meet specified conditions interpreted by the LLM:
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)
The Filter operation adds a __filter__ key to each record, with True indicating the row should be kept and False indicating removal.
FILTERING CRITERIA:
[prompt]
DECISION: Your response MUST be the entire input record as a Python dictionary in the format:
index=<row_index>|{key1: value1, key2: value2, ...} with added key called '__filter__'
with value either True to KEEP the record or False to REMOVE it.
Sources: datatune/core/dask/filter_dask.py:1-45
Sources: datatune/core/ibis/filter_ibis.py:1-50
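Once responses have been parsed into dictionaries, applying the `__filter__` flag amounts to a simple selection. The sketch below assumes already-parsed records and a hypothetical helper name, `apply_filter_decisions`. Dropping rows whose flag is missing is a choice made for this example, not documented datatune behavior.

```python
FILTER_KEY = "__filter__"


def apply_filter_decisions(records: list) -> list:
    """Keep records flagged True and strip the bookkeeping key from the output."""
    kept = []
    for record in records:
        # Only an explicit True keeps the row; False or a missing flag drops it.
        if record.get(FILTER_KEY) is True:
            kept.append({k: v for k, v in record.items() if k != FILTER_KEY})
    return kept


# Hypothetical parsed responses for three rows.
parsed = [
    {"Name": "Laptop", "__filter__": True},
    {"Name": "Banana", "__filter__": False},
    {"Name": "Unknown item"},
]
electronics = apply_filter_decisions(parsed)
```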
Intelligent Agent
The Agent class provides a higher-level interface that automatically determines which operations to perform based on natural language requests. The agent can chain multiple Map, Filter, and Python code operations into a coherent execution plan.
import datatune as dt
from datatune.llm.llm import OpenAI
llm = OpenAI(model_name="gpt-3.5-turbo")
agent = dt.Agent(llm)
df = agent.do("Add ProfitMargin column and keep only African organizations", df)
result = dt.finalize(df)
Agent System Prompt
The Agent uses a structured system prompt that defines available operations and expected response formats:
system_prompt: str = """You are Datatune Agent, a powerful assistant designed to help users with data processing tasks.
You are capable of generating python code to perform various operations on data. Apart from python builtins, you have the following libraries avaiable in your run time:
- pandas
- numpy
- dask
In addition to these, you also have access to the datatune libarary, which provides functionality for processing data using LLMs.
Plan Execution Flow
graph TD
Request[Natural Language Request] --> Parse[Parse Request]
Parse --> Plan[Generate Execution Plan]
Plan --> Step1[Step 1: Dask/Primitive]
Step1 --> Step2[Step 2: Dask/Primitive]
Step2 --> StepN[Step N: ...]
StepN --> Execute[Execute Full Plan]
Execute --> Result[Finalized DataFrame]
Plan --> StepTypes{Step Type}
StepTypes -->|"dask"| DaskTemplate[Dask Template]
StepTypes -->|"primitive"| PrimTemplate[Primitive Template]
The agent generates plans as JSON arrays with the following structure:
[
  {
    "type": "dask|primitive",
    "operation": "operation_name",
    "params": {},
    "subprompt": "LLM prompt for primitive operations",
    "input_fields": ["column1"],
    "output_fields": ["new_column"]
  }
]
Sources: datatune/agent/__init__.py:1-45
Sources: datatune/agent/agent.py:1-100
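A plan in this shape can be checked before execution. The sketch below is illustrative only: `load_plan` is a hypothetical helper, the example plan and its column names are invented for the demonstration, and the agent's actual validation (if any) is not shown in the source.

```python
import json

VALID_STEP_TYPES = {"dask", "primitive"}


def load_plan(plan_json: str) -> list:
    """Parse a plan and check the fields every step is expected to carry."""
    steps = json.loads(plan_json)
    if not isinstance(steps, list):
        raise ValueError("A plan must be a JSON array of steps.")
    for number, step in enumerate(steps, start=1):
        if not isinstance(step, dict):
            raise ValueError(f"Step {number} is not a JSON object.")
        if step.get("type") not in VALID_STEP_TYPES:
            raise ValueError(f"Step {number} has unknown type {step.get('type')!r}.")
        if not step.get("operation"):
            raise ValueError(f"Step {number} is missing 'operation'.")
    return steps


# Hypothetical two-step plan for the ProfitMargin request shown earlier.
example_plan = """
[
  {"type": "primitive", "operation": "map", "params": {},
   "subprompt": "Compute profit margin",
   "input_fields": ["Revenue", "Cost"], "output_fields": ["ProfitMargin"]},
  {"type": "primitive", "operation": "filter", "params": {},
   "subprompt": "Keep only African organizations",
   "input_fields": ["Country"], "output_fields": []}
]
"""
steps = load_plan(example_plan)
summary = [f"{n}. [{s['type']}] {s['operation']}" for n, s in enumerate(steps, 1)]
```

Reporting the step number in each error mirrors the way the Agent layer is described as returning the failing step alongside the error message.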
Data Source Backends
Datatune supports multiple computational backends for processing data at scale:
Dask Backend
The Dask backend provides distributed computing capabilities for pandas-like DataFrames:
import dask.dataframe as dd
df = dd.read_csv("data.csv")
Dask operations include:
- add_column: Create new columns from expressions
- apply_function: Apply element-wise functions
- rename_columns: Rename using mappings
- astype_column: Change data types
Ibis Backend
The Ibis backend enables SQL-like operations across various database backends:
import ibis
con = ibis.duckdb.connect("data.duckdb")
table = con.table("my_table")
Supported Ibis backends include DuckDB, PostgreSQL, and BigQuery.
Sources: README.md:60-75
Deduplication System
Datatune includes a sophisticated deduplication system that uses embedding-based similarity detection:
graph LR
Input[Input Data] --> Embed[Embedding Generation]
Embed --> Index[FAISS Index Build]
Index --> Cluster[Similarity Clustering]
Cluster --> Dedup[Deduplicated Output]
Key parameters:
| Parameter | Default | Description |
|---|---|---|
| embedding_model | text-embedding-3-small | Model for generating embeddings |
| sim_threshold | 0.90 | Similarity threshold for clustering |
| top_k | 50 | Top-K neighbors for clustering |
| hnsw_m | 32 | HNSW index construction parameter |
| ef_search | 64 | HNSW search parameter |
The system uses FAISS HNSW (Hierarchical Navigable Small World) indexes for efficient similarity search at scale.
Sources: datatune/core/deduplication.py:1-100
Workflow Summary
graph LR
A[Load Data] --> B[Choose Interface]
B --> C[Agent Mode]
B --> D[Primitives Mode]
C --> E[Natural Language]
D --> F[Explicit Operations]
E --> G[Auto Plan]
F --> H[Manual Plan]
G --> I[Execute]
H --> I
I --> J[LLM Processing]
J --> K[Finalize]
K --> L[Output]
- Load Data: Import data into Dask or Ibis DataFrames
- Choose Interface: Use Agent for automatic planning or Primitives for explicit control
- Define Operations: Write natural language prompts or configure operations
- Execute: Process data with LLM integration
- Finalize: Convert lazy operations to computed results
Sources: README.md:1-50
Installation and Quick Start
pip install datatune
The complete workflow involves initializing an LLM, loading data, applying transformations, and finalizing results:
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd
llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")
# Apply transformations
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)
# Save results
result = dt.finalize(filtered)
result.compute().to_csv("electronics_products.csv")
Sources: README.md:20-55
Getting Started
Related topics: Introduction to Datatune
Datatune is a data transformation library that leverages Large Language Models (LLMs) to perform complex data operations using natural language prompts. It enables users to transform, filter, map, and process data without writing extensive custom code.
Installation
Install datatune using pip:
pip install datatune
Sources: README.md:1
Core Concepts
Datatune provides two primary interfaces for data transformation:
| Interface | Description | Use Case |
|---|---|---|
| Primitives | Direct operations (Map, Filter) | Single, focused transformations |
| Agent | AI-powered automatic planning | Complex multi-step workflows |
Sources: README.md:20-30
Quick Start
Basic Setup
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd
llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")
Sources: README.md:26-32
Map Operation
Use dt.map() to extract or derive new columns from existing data:
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)
Sources: README.md:34-41
Filter Operation
Use dt.filter() to keep only rows matching criteria:
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)
Sources: README.md:44-49
Finalize and Save
result = dt.finalize(filtered)
result.compute().to_csv("electronics_products.csv")
Sources: README.md:51-53
Supported LLMs
Datatune supports multiple LLM providers through a unified interface.
OpenAI
from datatune.llm.llm import OpenAI
llm = OpenAI(model_name="gpt-3.5-turbo")
Sources: README.md:58-61
Ollama (Local Models)
from datatune.llm.llm import Ollama
llm = Ollama()
Default configuration:
- Model: gemma3:4b
- API Base: http://localhost:11434
Sources: README.md:63-65
Azure OpenAI
from datatune.llm.llm import Azure
llm = Azure(model_name="gpt-3.5-turbo", api_key=api_key)
Sources: README.md:67-70
Additional Providers
| Provider | Class | Notes |
|---|---|---|
| Mistral | Mistral | Requires mistral/ prefix in model name |
| Huggingface | Huggingface | Requires huggingface/ prefix |
| VLLM | VLLM | Auto-detects max token limit |
Sources: datatune/llm/llm.py:1-150
Agent Mode
The Agent provides an intelligent, automated approach to data transformation. It automatically determines which operations to use and chains multiple transformations from a single natural language prompt.
Basic Agent Usage
import datatune as dt
from datatune.llm.llm import OpenAI
llm = OpenAI(model_name="gpt-3.5-turbo")
agent = dt.Agent(llm)
df = agent.do("Add ProfitMargin column and keep only African organizations", df)
result = dt.finalize(df)
Sources: README.md:74-82
Agent Capabilities
The agent automatically:
- Determines which operations to use (map, filter, etc.)
- Chains multiple transformations
- Handles complex multi-step tasks from a single prompt
- Generates and executes Python code along with row-level primitives (Map, Filter, etc) if required
Sources: README.md:84-88
Agent System Prompt
The agent has access to:
- Python builtins
- pandas
- numpy
- dask
- datatune library primitives (Map, Filter)
Sources: datatune/agent/__init__.py:1-35
Data Sources
Datatune supports multiple data processing backends.
Dask DataFrames
import dask.dataframe as dd
df = dd.read_csv("data.csv")
Ibis Backend
Ibis provides connectivity to multiple databases:
import ibis
con = ibis.duckdb.connect("data.duckdb")
table = con.table("my_table")
Sources: README.md:93-102
Architecture Overview
graph TD
A[User Prompt] --> B[Datatune API]
B --> C{Operation Type}
C -->|Map| D[map_dask.py / map_ibis.py]
C -->|Filter| E[filter_dask.py / filter_ibis.py]
C -->|Agent| F[Agent Planning]
D --> G[LLM Batch Processing]
E --> G
F -->|Dask Ops| H[Dask Templates]
F -->|Primitives| I[Map / Filter Primitives]
G --> J[Output DataFrame]
H --> J
I --> J
Rate Limits Configuration
When using LLMs, datatune respects model-specific rate limits:
| Parameter | Description |
|---|---|
| tpm | Tokens per minute |
| rpm | Requests per minute |
Sources: datatune/llm/model_rate_limits.py:1-50
Default rate limits for unknown models fall back to gpt-3.5-turbo settings with a warning logged.
Sources: datatune/llm/llm.py:20-35
Primitive Operations
Map Operation
Creates new columns by applying LLM-based transformations to input columns.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| prompt | str | Natural language description of the transformation |
| input_fields | list | Input column names |
| output_fields | list | Names for new columns |
Filter Operation
Removes rows that don't meet the specified criteria.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| prompt | str | Natural language filtering criteria |
| input_fields | list | Columns to evaluate |
Next Steps
- Documentation - Complete guides and API reference
- Examples - Real-world use cases
- Discord - Community support
- Issues - Report bugs and request features
Sources: README.md:105-113
System Architecture
Related topics: Dask Backend, Ibis Backend, LLM Integration
Overview
Datatune is a data transformation framework that leverages Large Language Models (LLMs) to perform semantic operations on distributed data. The system provides a declarative interface for common data manipulation tasks like mapping, filtering, and deduplication while abstracting the complexity of distributed computing and LLM interaction.
The architecture follows a modular design with three primary layers:
| Layer | Purpose | Key Components |
|---|---|---|
| LLM Layer | Model abstraction and rate limiting | LLM, OpenAI, Ollama, VLLM, Azure |
| Core Operations Layer | Data transformation primitives | Map, Filter, Reduce, Deduplication |
| Agent Layer | High-level orchestration and planning | Agent, Plan execution engine |
Sources: datatune/llm/llm.py:1-100
High-Level Architecture Diagram
graph TB
subgraph "Client Layer"
User[User Code]
end
subgraph "LLM Layer"
LLM[Base LLM Class]
OpenAI[OpenAI Provider]
Ollama[Ollama Provider]
VLLM[VLLM Provider]
Azure[Azure Provider]
RateLimits[model_rate_limits]
end
subgraph "Core Operations Layer"
Map[Map Operation]
Filter[Filter Operation]
Reduce[Reduce Operation]
Dedupe[Deduplication]
end
subgraph "Data Backend Layer"
Dask[Dask DataFrame]
Ibis["Ibis (DuckDB, PostgreSQL, BigQuery)"]
end
subgraph "Agent Layer"
Agent[Agent Orchestrator]
Planner[Plan Generator]
Executor[Plan Executor]
end
User --> Agent
User --> Map
User --> Filter
Agent --> Planner
Agent --> Executor
Agent --> LLM
Map --> LLM
Filter --> LLM
Map --> Dask
Map --> Ibis
Filter --> Dask
Filter --> Ibis
LLM --> RateLimits
LLM Layer
The LLM layer provides a unified interface for interacting with various LLM providers while handling rate limiting and token counting.
Class Hierarchy
graph TD
LLM[LLM Base Class] --> Ollama[Ollama]
LLM --> OpenAI[OpenAI]
LLM --> VLLM[VLLM]
LLM --> Azure[Azure]
Base LLM Class
The LLM class serves as the foundation for all LLM providers. It handles model naming, rate limit configuration, and request batching.
| Parameter | Type | Description |
|---|---|---|
| model_name | str | Model identifier in format provider/model (e.g., openai/gpt-3.5-turbo) |
| tpm | int | Tokens per minute limit |
| rpm | int | Requests per minute limit |
| api_key | str | API authentication key |
| api_base | str | Base URL for API endpoint |
Sources: datatune/llm/llm.py:70-95
Provider Implementations
OpenAI Provider
class OpenAI(LLM):
    def __init__(
        self, model_name: str = "gpt-3.5-turbo", api_key: Optional[str] = None, **kwargs
    ):
        kwargs.update({"api_key": api_key})
        super().__init__(model_name=f"openai/{model_name}", **kwargs)
Sources: datatune/llm/llm.py:96-103
Ollama Provider (Local)
class Ollama(LLM):
    def __init__(
        self, model_name="gemma3:4b", api_base="http://localhost:11434", **kwargs
    ) -> None:
        super().__init__(
            model_name=f"ollama_chat/{model_name}", api_base=api_base, **kwargs
        )
Sources: datatune/llm/llm.py:81-87
VLLM Provider
The VLLM provider automatically retrieves the max_model_len from the API and configures token limits accordingly.
class VLLM(LLM):
    def __init__(self, model_name: str, api_base: str = "http://localhost:8000/v1", **kwargs):
        import httpx
        resp = httpx.get(f"{api_base}/models")
        max_model_len = resp.json()["data"][0]["max_model_len"]
        kwargs.update({"api_base": api_base, "api_key": "dummy", "max_tokens": max_model_len})
        super().__init__(model_name=f"openai/{model_name}", **kwargs)
Sources: datatune/llm/llm.py:105-118
Rate Limiting
The system maintains predefined rate limits for various models in model_rate_limits.py:
| Model Family | TPM | RPM |
|---|---|---|
| gpt-3.5-turbo | 200,000 | 500 |
| gpt-4 | 10,000 | 500 |
| gpt-4-turbo | 30,000 | 500 |
| gpt-4.1-mini/nano | 200,000 | 500 |
Sources: datatune/llm/model_rate_limits.py:1-50
Core Operations Layer
The core operations layer provides primitive transformations that can be applied to distributed data.
Map Operation
The Map operation creates new columns by applying LLM-based transformations to existing data.
graph LR
A[Input DataFrame] --> B[Serialize Rows]
B --> C[Batch to LLM]
C --> D[Parse Responses]
D --> E[Extract New Columns]
E --> F[Output DataFrame]
Input Format: Rows are serialized to Python dictionaries with the following structure:
index=<row_index>|{key1: value1, key2: value2, ...}
Output Format: LLM returns dictionaries with original keys plus new field keys:
index=<row_index>|{key1: value1, key2: value2, new_field: new_value}
Sources: datatune/core/dask/map_dask.py:1-50
Key Features:
- Handles duplicate rows via canonical mapping: only the canonical row of each duplicate cluster is sent to the LLM, and its result is copied to the duplicates
- Supports batched API calls for efficiency
Filter Operation
The Filter operation removes rows that do not meet specified criteria using LLM-based evaluation.
graph LR
A[Input DataFrame] --> B[Serialize Rows]
B --> C[LLM Decision]
C --> D{Keep Row?}
D -->|Yes| E[Include in Output]
D -->|No| F[Exclude Row]
Decision Format: Each row receives a __filter__ key:
index=<row_index>|{..., __filter__: True/False}
Sources: datatune/core/dask/filter_dask.py:1-50
Deduplication
The deduplication system uses embeddings and FAISS for efficient similarity search:
| Component | Description |
|---|---|
| embedding_model | Model for generating text embeddings |
| sim_threshold | Minimum similarity score (default: 0.90) |
| top_k | Number of neighbors to search |
| hnsw_m | HNSW index parameter for m |
| ef_search | HNSW search parameter |
Workflow:
- Embed column values in batches of 256
- Build FAISS HNSW index per partition
- Search for similar records above threshold
- Cluster duplicates with canonical ID
Sources: datatune/core/deduplication.py:1-80
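The threshold-and-cluster steps of the workflow can be illustrated without FAISS. The sketch below deliberately swaps the HNSW index for a brute-force cosine comparison and uses a greedy assignment, so it shows the idea rather than datatune's algorithm. The function names and the toy two-dimensional vectors are hypothetical, and real embeddings would come from the configured embedding model.

```python
import math


def cosine_similarity(a: list, b: list) -> float:
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def cluster_duplicates(embeddings: dict, sim_threshold: float = 0.90) -> list:
    """Greedy clustering: each row joins the first canonical row it resembles."""
    clusters = []
    for row_id, vector in embeddings.items():
        for cluster in clusters:
            canonical_vector = embeddings[cluster["canonical_id"]]
            if cosine_similarity(vector, canonical_vector) >= sim_threshold:
                cluster["duplicate_ids"].append(row_id)
                break
        else:
            # No existing canonical row is similar enough: start a new cluster.
            clusters.append({"canonical_id": row_id, "duplicate_ids": []})
    return clusters


# Toy embeddings: rows 0 and 1 point in nearly the same direction.
toy_embeddings = {0: [1.0, 0.0], 1: [0.99, 0.05], 2: [0.0, 1.0]}
clusters = cluster_duplicates(toy_embeddings)
```

The brute-force comparison is quadratic in the number of rows, which is the cost an approximate nearest-neighbor index such as HNSW is meant to avoid on large partitions.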
Agent Layer
The Agent layer provides high-level orchestration, automatically determining which operations to use and generating execution plans.
graph TD
A[User Goal] --> B[Plan Generator]
B --> C[JSON Plan]
C --> D[Plan Executor]
D --> E[Step 1: Operation]
D --> F[Step 2: Operation]
D --> G[Step N: Operation]
E --> H[Finalize]
F --> H
G --> H
H --> I[Computed Result]
Plan Structure
Plans are generated as JSON arrays of steps:
[
  {
    "type": "primitive",
    "operation": "map",
    "params": {
      "subprompt": "Extract category from industry",
      "input_fields": ["Industry"],
      "output_fields": ["Category"]
    }
  },
  {
    "type": "dask",
    "operation": "add_column",
    "params": {
      "new_column": "Year",
      "expression": "df['Date'].dt.year"
    }
  }
]
Sources: datatune/agent/agent.py:80-120
Step Types
| Type | Description | Operations |
|---|---|---|
| primitive | LLM-based transformations | map, filter |
| dask | Native Dask operations | add_column, group_by, apply_function, rename_columns, astype_column |
Template System
Templates define how operations are translated to executable code:
TEMPLATE = {
    "primitive": {
        "Map": "df = dt.Map(...)(self.llm, df)",
        "Filter": "df = dt.Filter(...)(self.llm, df)"
    },
    "dask": {
        "add_column": "df = df.assign({new_column}=lambda x: {expression})",
        "group_by_agg": "df = df.groupby({group_columns}).agg({aggregations})"
    }
}
Sources: datatune/agent/agent.py:40-70
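One plausible way such templates turn a plan step into code is plain string formatting over the step's params. The sketch below assumes exactly that. `render_dask_step` is a hypothetical helper, and the source does not confirm that the agent renders templates with `str.format`.

```python
# The two dask templates shown above, reproduced for a self-contained example.
DASK_TEMPLATES = {
    "add_column": "df = df.assign({new_column}=lambda x: {expression})",
    "group_by_agg": "df = df.groupby({group_columns}).agg({aggregations})",
}


def render_dask_step(step: dict) -> str:
    """Fill the template for a dask step with the step's params."""
    template = DASK_TEMPLATES[step["operation"]]
    return template.format(**step["params"])


# The add_column step from the plan example above.
step = {
    "type": "dask",
    "operation": "add_column",
    "params": {"new_column": "Year", "expression": "df['Date'].dt.year"},
}
rendered = render_dask_step(step)
```

Because the rendered string is later executed as code, a renderer like this would need to treat plan params as untrusted input in any real deployment.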
Data Backend Support
Dask Integration
Datatune primarily uses Dask for distributed computing. Operations are executed partition-by-partition with lazy evaluation:
# Example: Map operation on Dask DataFrame
df = dd.read_csv("data.csv")
mapped = dt.map(
    prompt="Extract sentiment from text",
    output_fields=["sentiment"],
    input_fields=["text"]
)(llm, df)
# Finalize and compute
result = dt.finalize(mapped).compute()
Ibis Integration
Ibis provides support for multiple backends (DuckDB, PostgreSQL, BigQuery):
# DuckDB example
import ibis
con = ibis.duckdb.connect("data.duckdb")
table = con.table("my_table")
# Apply map operation
mapped_table = dt.map(
    prompt="Extract category",
    output_fields=["category"],
    input_fields=["description"]
)(llm, table)
Sources: datatune/core/ibis/map_ibis.py:1-50
Request Batching
The LLM layer implements intelligent batching to optimize API usage:
graph TD
A[Input Rows] --> B[Calculate Token Budget]
B --> C{Within Limits?}
C -->|Yes| D[Batch All]
C -->|No| E[Split into Batches]
E --> F[Execute Batch 1]
E --> G[Execute Batch 2]
F --> H[Merge Results]
G --> H
D --> H
Token Calculation:
- Prefix/suffix tokens are calculated via token_counter
- nrows_per_api_call determines optimal batch sizes
- Rate limits (TPM/RPM) are enforced per model
Sources: datatune/llm/llm.py:30-70
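The split-into-batches branch of the diagram can be sketched as a greedy packer that starts a new batch whenever the next row would exceed the token budget. Everything here is an assumption made for illustration: `count_tokens` is a crude whitespace counter standing in for a real tokenizer, and `batch_rows` is a hypothetical name, not datatune's batching routine.

```python
def count_tokens(text: str) -> int:
    """Crude stand-in for a tokenizer: one token per whitespace-separated word."""
    return len(text.split())


def batch_rows(rows: list, max_tokens_per_call: int, overhead_tokens: int = 0) -> list:
    """Greedily pack rows into batches that stay within the token budget."""
    batches, current, used = [], [], overhead_tokens
    for row in rows:
        cost = count_tokens(row)
        # Close the current batch if adding this row would exceed the budget.
        if current and used + cost > max_tokens_per_call:
            batches.append(current)
            current, used = [], overhead_tokens
        current.append(row)
        used += cost
    if current:
        batches.append(current)
    return batches


batches = batch_rows(["a b c", "d e", "f g h i", "j"], max_tokens_per_call=5)
```

`overhead_tokens` models the prefix and suffix that accompany every call. A single row larger than the budget still gets a batch of its own in this sketch, so a production version would need to decide how to handle that case.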
Error Handling
The system implements robust error handling at multiple levels:
| Layer | Error Handling |
|---|---|
| Agent | Catches execution errors, returns step number and error message |
| Core Operations | Gracefully handles malformed LLM responses with fallback to empty dict |
| LLM | Retries with exponential backoff on transient failures |
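import ast
from typing import Optional, Union

def parse_filter_output(output: Union[str, Exception], err: bool = True) -> Optional[bool]:
    try:
        # Parse the response text as a Python literal (expected to be a dict).
        d = ast.literal_eval(str(output).strip())
        if d:
            # The decision flag is read from the last key of the record.
            last_key = list(d.keys())[-1]
            flag = d[last_key]
            return flag
    except Exception:
        # Malformed responses yield None instead of raising.
        return None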
Sources: datatune/core/dask/filter_dask.py:50-65
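The table above mentions retries with exponential backoff at the LLM layer. The source does not show how that is implemented, so the following is only a generic sketch of the technique. `call_with_backoff` and `flaky_call` are hypothetical, and the sleep function is injectable so the example runs without actually waiting.

```python
import time


def call_with_backoff(func, max_attempts: int = 4, base_delay: float = 1.0, sleep=time.sleep):
    """Call func(), doubling the wait after each failure; re-raise on the last attempt."""
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            # Delays grow as base_delay * 1, 2, 4, ...
            sleep(base_delay * 2 ** attempt)


# Demonstration with a call that fails twice before succeeding.
attempts = {"count": 0}
recorded_delays = []


def flaky_call() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


outcome = call_with_backoff(flaky_call, sleep=recorded_delays.append)
```

A production version would usually retry only on errors known to be transient (timeouts, rate-limit responses) and add jitter to the delay.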
Configuration Options
LLM Configuration
| Option | Default | Description |
|---|---|---|
| model_name | gpt-3.5-turbo | Model identifier |
| tpm | Model-specific | Tokens per minute limit |
| rpm | Model-specific | Requests per minute limit |
| api_key | None | API authentication |
| api_base | Provider-specific | API endpoint URL |
Operation Configuration
| Option | Description |
|---|---|
| prompt | Natural language instruction |
| input_fields | Columns to use as input |
| output_fields | New columns to create (Map only) |
| optimized | Enable batching optimization |
Summary
Datatune's system architecture provides:
- Provider Abstraction: Unified interface across OpenAI, Ollama, VLLM, and Azure
- Rate Limiting: Automatic enforcement of TPM/RPM limits per model
- Distributed Execution: Partition-based processing for large datasets
- Multi-Backend Support: Native support for Dask and Ibis backends
- Intelligent Planning: Agent-based orchestration for complex multi-step transformations
- Semantic Operations: LLM-powered Map and Filter for text understanding
- Deduplication: Embedding-based similarity search with FAISS
Sources: datatune/llm/llm.py:1-100
Map Operations
Related topics: Filter Operations, Reduce Operations, Dask Backend
Map Operations in Datatune enable LLM-powered column transformations on distributed dataframes. Users describe desired transformations in natural language, and the system generates new columns by leveraging large language models to process each row semantically.
Overview
The Map operation is a core primitive in Datatune that bridges natural language instructions with data transformations. It accepts a user-defined prompt describing what new columns to create, identifies relevant input columns, and produces one or more output columns populated by LLM inference. Sources: datatune/__init__.py:3
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd
llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)
Sources: README.md:18-29
Architecture
The Map system is designed with a pluggable backend architecture, supporting both Dask and Ibis execution contexts. Each backend implements the same logical interface while adapting to its specific dataframe API.
Component Flow
graph TD
A[User Prompt + Input/Output Fields] --> B[map Function Entry Point]
B --> C{Backend Type}
C -->|Dask| D[map_dask.py]
C -->|Ibis| E[map_ibis.py]
D --> F[LLM Batch Processor]
E --> F
F --> G[Parse LLM Output]
G --> H[Merge Results to DataFrame]
H --> I[Output DataFrame with New Columns]
Backend Implementations
| Backend | File | Purpose |
|---|---|---|
| Dask | datatune/core/dask/map_dask.py | Distributed DataFrame operations using Dask |
| Ibis | datatune/core/ibis/map_ibis.py | SQL-like backend supporting DuckDB, PostgreSQL, BigQuery |
API Reference
Function Signature
def map(
    prompt: str,
    output_fields: List[str],
    input_fields: Optional[List[str]] = None,
    **kwargs
) -> Callable
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| prompt | str | Yes | Natural language description of the transformation to apply |
| output_fields | List[str] | Yes | Names of columns to create in the output |
| input_fields | List[str] | Optional | Input columns to pass to the LLM; if omitted, all columns are used |
Sources: datatune/core/dask/map_dask.py:1-50
Return Value
Returns a callable that accepts (llm, dataframe) and returns a transformed dataframe with the new columns appended.
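The configure-then-apply pattern behind this return value can be sketched with a closure. This is a simplified model under stated assumptions: rows are plain dictionaries rather than a Dask or Ibis dataframe, `make_map` and `fake_llm` are hypothetical names, and the fake model involves no network access.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, object]


def make_map(
    prompt: str,
    output_fields: List[str],
    input_fields: Optional[List[str]] = None,
) -> Callable[[Callable, List[Row]], List[Row]]:
    """Capture the configuration now; run it later against (llm, rows)."""

    def apply(llm: Callable[[str, Row], Row], rows: List[Row]) -> List[Row]:
        transformed = []
        for row in rows:
            # Only the requested input columns are shown to the model.
            fields = input_fields if input_fields is not None else list(row)
            visible = {name: row[name] for name in fields}
            answer = llm(prompt, visible)
            # Keep the original row and append the requested output columns.
            new_columns = {name: answer.get(name) for name in output_fields}
            transformed.append({**row, **new_columns})
        return transformed

    return apply


def fake_llm(prompt: str, record: Row) -> Row:
    """Hypothetical stand-in for a model call."""
    is_cable = "cable" in str(record["Name"]).lower()
    return {"Category": "Electronics" if is_cable else "Other"}


rows = [{"Name": "USB-C Cable", "Price": 9.5}, {"Name": "Banana", "Price": 0.3}]
categorize = make_map("Assign a category", ["Category"], input_fields=["Name"])
mapped_rows = categorize(fake_llm, rows)
```

Separating configuration from application means the same configured operation can be reused with a different model or dataset.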
Dask Backend Implementation
Data Flow
graph LR
A[Partitioned DataFrame] --> B[Serialize Input Columns]
B --> C[Batch Rows to LLM]
C --> D[LLM Inference]
D --> E[Parse Dictionary Output]
E --> F[Assign to Output Columns]
F --> G[Merge Canonical Results to Duplicates]
G --> H[Return Partitioned DataFrame]
Prompt Construction
The Dask implementation constructs prompts using a prefix-suffix pattern to ensure consistent LLM responses.
Prefix Structure:
Use the following context to create new columns from existing columns.
Sources: datatune/core/dask/map_dask.py:10-15
Suffix Structure:
suffix = (
    f"{os.linesep}{os.linesep}"
    "Your response MUST be the entire input record as a valid Python dictionary in the format"
    "'index=<row_index>|{key1: value1, key2: value2, ...}' with added keys of expected new fields if any."
    "ALWAYS START YOUR RESPONSE WITH 'index=<row_index>|' WHERE <row_index> IS THE INDEX OF THE ROW." \
    "IF A VALUE FOR A COLUMN DOES NOT EXIST SET IT TO null" \
    "'index=<row_index>|{key1: None, key2: value2, ...}'"
)
Sources: datatune/core/dask/map_dask.py:22-30
Duplicate Handling
The Dask Map implementation includes sophisticated duplicate handling for semantic deduplication scenarios. When clusters of duplicate rows exist, only the canonical row is sent to the LLM, and results are propagated to duplicates.
dup_to_canon = {
    dup: c["canonical_id"]
    for c in clusters
    for dup in c["duplicate_ids"]
}
canonical_idx = input_series.index.difference(dup_to_canon.keys())
canonical_input = input_series.loc[canonical_idx]
llm_out = llm(canonical_input, prefix, prompt, suffix, optimized=True)
df.loc[canonical_idx, llm_output_column] = llm_out
for dup, canon in dup_to_canon.items():
    df.loc[dup, llm_output_column] = df.loc[canon, llm_output_column]
Sources: datatune/core/dask/map_dask.py:35-50
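The same propagation logic can be traced with plain dictionaries, which makes it easy to run without Dask or an LLM. `propagate_to_duplicates` is a hypothetical helper written for this illustration, and the cluster and output values are invented.

```python
def propagate_to_duplicates(clusters: list, canonical_outputs: dict) -> dict:
    """Copy each canonical row's LLM output to the rows marked as its duplicates."""
    outputs = dict(canonical_outputs)  # leave the caller's mapping untouched
    for cluster in clusters:
        canonical_result = canonical_outputs[cluster["canonical_id"]]
        for duplicate_id in cluster["duplicate_ids"]:
            outputs[duplicate_id] = canonical_result
    return outputs


# Rows 1 and 2 duplicate row 0; row 3 has no duplicates.
clusters = [{"canonical_id": 0, "duplicate_ids": [1, 2]}]
canonical_outputs = {0: "{'Category': 'Electronics'}", 3: "{'Category': 'Food'}"}
all_outputs = propagate_to_duplicates(clusters, canonical_outputs)
```

In this example the model is consulted for two rows instead of four, which is where the savings come from when duplicates are common.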
Output Parsing
The LLM output is parsed using parse_llm_output, which converts the string response to a Python dictionary.
def parse_llm_output(llm_output: Union[str, Exception]) -> Union[Dict, Exception]:
    """
    Parses the LLM output string into a Python dictionary.
    """
Sources: datatune/core/dask/map_dask.py:75-85
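Since only the docstring of `parse_llm_output` is shown, the sketch below illustrates one way a response in the `index=<row_index>|{...}` format could be parsed. It is not the library's implementation: `parse_indexed_record` is a hypothetical name, and returning the exception instead of raising it is an assumption modeled on the signature above.

```python
import ast
from typing import Dict, Tuple, Union


def parse_indexed_record(llm_output: str) -> Union[Tuple[int, Dict], Exception]:
    """Split 'index=<row_index>|{...}' into (row_index, record), or return the error."""
    try:
        head, body = llm_output.strip().split("|", 1)
        if not head.startswith("index="):
            raise ValueError(f"Missing 'index=' prefix in {head!r}")
        row_index = int(head[len("index="):])
        # literal_eval accepts Python literals only, so None parses but null does not.
        record = ast.literal_eval(body)
        if not isinstance(record, dict):
            raise ValueError("Response body is not a dictionary.")
        return row_index, record
    except (ValueError, SyntaxError) as exc:
        return exc


good = parse_indexed_record("index=7|{'Name': 'Laptop', 'Category': 'Electronics'}")
with_none = parse_indexed_record("index=8|{'Name': 'Cable', 'Category': None}")
with_null = parse_indexed_record("index=9|{'Name': 'Cable', 'Category': null}")
```

The last call shows why the distinction matters: a model that follows the "SET IT TO null" wording literally produces text that a Python-literal parser rejects, whereas `None` parses cleanly.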
Ibis Backend Implementation
Data Flow
graph LR
A[Ibis Table] --> B[Add Row ID Column]
B --> C[Execute to Local Data]
C --> D[Convert to List]
D --> E[Batch to LLM]
E --> F[Parse Results]
F --> G[Create MemTable]
G --> H[Join Back to Original Table]
Row Indexing
Ibis operations require explicit row indexing since SQL tables don't maintain native pandas-like indices.
indexed_table = table.mutate(_ROW_ID_=ibis.row_number().cast("int64"))
local_data = indexed_table.select("_ROW_ID_", input_col).execute()
input_list = local_data[input_col].tolist()
Sources: datatune/core/ibis/map_ibis.py:25-32
Result Processing
Raw LLM outputs are parsed and converted to JSON strings for storage:
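import ast
import json
processed_results = []
for res in raw_results:
    try:
        # The LLM returns a Python-literal dictionary, so parse it safely
        py_dict = ast.literal_eval(str(res).strip())
        processed_results.append(json.dumps(py_dict))
    except Exception:
        # Fall back to an empty JSON object when parsing fails
        processed_results.append("{}")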
Sources: datatune/core/ibis/map_ibis.py:45-52
LLM Integration
Supported LLM Providers
| Provider | Class | Default Model |
|---|---|---|
| OpenAI | OpenAI | gpt-3.5-turbo |
| Ollama | Ollama | gemma3:4b |
| Azure | Azure | Configurable |
| vLLM | VLLM | Configurable |
Sources: datatune/llm/llm.py:1-60
Batching and Rate Limiting
The LLM layer implements token and request rate limiting based on model-specific constraints defined in model_rate_limits.py.
model_rate_limits = {
"gpt-3.5-turbo": {
"tpm": 200_000,
"rpm": 500,
},
"gpt-4": {
"tpm": 10_000,
"rpm": 500,
},
"gpt-4o": {
"tpm": 30_000,
"rpm": 500,
},
}
Sources: datatune/llm/model_rate_limits.py:1-20
Batch Processing
The LLM batches multiple rows into single API calls for efficiency:
for i, prompt in enumerate(input_rows):
    ...  # Batch construction continues...
Sources: datatune/llm/llm.py:30-45
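Because the excerpt above is truncated, the batching logic itself is not visible here. As a rough illustration of the general idea, the sketch below packs prompts greedily under a token budget. The function names, the four-characters-per-token estimate, and the budget value are all assumptions and are not taken from datatune.

```python
from typing import List


def estimate_tokens(text: str) -> int:
    # Crude assumption: roughly 4 characters per token
    return max(1, len(text) // 4)


def batch_by_token_budget(prompts: List[str], max_tokens: int) -> List[List[str]]:
    """Greedily group prompts so each batch stays within max_tokens."""
    batches, current, used = [], [], 0
    for prompt in prompts:
        cost = estimate_tokens(prompt)
        # Start a new batch when adding this prompt would exceed the budget
        if current and used + cost > max_tokens:
            batches.append(current)
            current, used = [], 0
        current.append(prompt)
        used += cost
    if current:
        batches.append(current)
    return batches


batches = batch_by_token_budget(["a" * 40, "b" * 40, "c" * 40], max_tokens=20)
```

A prompt that is larger than the budget on its own still gets a batch to itself in this sketch; a real implementation would need to decide whether to reject or truncate it.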
Agent Integration
The Datatune Agent can automatically generate Map operations as part of multi-step data transformation pipelines.
Agent System Prompt
class Agent(ABC):
system_prompt: str = """You are Datatune Agent, a powerful assistant designed to help users with data processing tasks.
You are capable of generating python code to perform various operations on data. Apart from python builtins, you have the following libraries avaiable in your run time:
- pandas
- numpy
- dask
In addition to these, you also have access to the datatune libarary, which provides functionality for processing data using LLMs.
Map Example:
```python
import datatune as dt
import dask.dataframe as dd
df = dd.read_csv("path/to/data.csv")
map = dt.Map(prompt="Your prompt here")
llm = dt.LLM(model_name="gpt-3.5-turbo")
mapped_df = map(llm, df)
mapped_df = dt.finalize(mapped_df)
```
"""
Sources: datatune/agent/__init__.py:1-30
Plan Generation
When the Agent generates a plan requiring Map operations, it creates steps with the following structure:
{
"type": "primitive",
"operation": "map",
"params": {
"subprompt": "Extract category and sub-category from industry",
"input_fields": ["Industry"],
"output_fields": ["Category","Sub-Category"]
},
}
Sources: datatune/agent/agent.py:50-65
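How the Agent turns such a step into a Map call is not shown in this page. One plausible translation, sketched below, maps the step's params onto the keyword arguments documented for the Map primitive. The helper `map_kwargs_from_step` and the assumption that subprompt becomes prompt are hypothetical.

```python
step = {
    "type": "primitive",
    "operation": "map",
    "params": {
        "subprompt": "Extract category and sub-category from industry",
        "input_fields": ["Industry"],
        "output_fields": ["Category", "Sub-Category"],
    },
}


def map_kwargs_from_step(step: dict) -> dict:
    """Hypothetical helper: validate a plan step and build Map keyword arguments."""
    if step.get("type") != "primitive" or step.get("operation") != "map":
        raise ValueError(f"not a map step: {step.get('type')}/{step.get('operation')}")
    params = step["params"]
    return {
        "prompt": params["subprompt"],  # assumed mapping: subprompt -> prompt
        "input_fields": params["input_fields"],
        "output_fields": params["output_fields"],
    }


kwargs = map_kwargs_from_step(step)
```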
Usage Examples
Basic Column Extraction
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd
llm = OpenAI(model_name="gpt-4")
df = dd.read_csv("customers.csv")
# Extract sentiment and category from review text
mapped = dt.map(
prompt="Analyze the customer feedback and extract sentiment (positive/negative/neutral) and main topic",
output_fields=["Sentiment", "Topic"],
input_fields=["CustomerFeedback"]
)(llm, df)
result = dt.finalize(mapped)
result.compute().to_csv("enriched_customers.csv")
Multi-Column Generation
# Generate multiple derived columns in a single Map operation
mapped = dt.map(
prompt="Parse the product description to identify: 1) the material, 2) primary color, 3) target demographic",
output_fields=["Material", "Color", "Demographic"],
input_fields=["ProductDescription", "ProductName"]
)(llm, df)
With Agent
import datatune as dt
import dask.dataframe as dd
from datatune.llm.llm import OpenAI
llm = OpenAI(model_name="gpt-4")
agent = dt.Agent(llm)
df = dd.read_csv("data.csv")
# Agent automatically determines when to use Map operations
result = agent.do("Add ProfitMargin column and categorize by revenue tier", df)
Sources: README.md:35-45
Best Practices
Input Field Selection
| Practice | Rationale |
|---|---|
| Provide specific input fields | Reduces token usage and improves accuracy |
| Include context-rich columns | Helps LLM understand the domain |
| Avoid overly large inputs | Prevents token limit issues |
Prompt Engineering
- Be explicit about the expected output format in output field names
- Include examples in prompts for complex transformations
- Break complex extractions into multiple Map operations when needed
Performance Considerations
| Factor | Impact | Recommendation |
|---|---|---|
| Batch size | Throughput vs. latency | Use default batching for most cases |
| Model selection | Quality vs. speed | Use gpt-3.5-turbo for bulk, gpt-4 for complex tasks |
| Partition count | Parallelism | Match to cluster size for Dask operations |
See Also
- Filter Operations - Row-level filtering using natural language
- Agent - Automatic operation selection and chaining
- Deduplication - Semantic deduplication with Map integration
- Supported LLMs - Complete list of supported providers
Sources: README.md:18-29
Filter Operations
Related topics: Map Operations, Reduce Operations, Ibis Backend
Filter Operations in Datatune enable intelligent, LLM-powered row filtering based on natural language criteria. Instead of writing explicit boolean conditions, users describe what rows to keep in plain English, and the LLM determines which records match the specified criteria.
Overview
The Filter module provides a declarative interface for semantic row-level filtering of data. It supports multiple backends (Dask DataFrames and Ibis tables) and uses LLMs to make filtering decisions based on natural language prompts.
Key capabilities:
- Natural language filter criteria specification
- Multi-backend support (Dask, Ibis)
- Semantic deduplication integration
- Parallelized LLM inference
- Structured output parsing
Architecture
The Filter system follows a dispatcher pattern that routes to backend-specific implementations based on the input data type.
graph TD
A[User calls filter prompt, input_fields] --> B[datatune.filter function]
B --> C{Data Type Detection}
C -->|dask.dataframe.DataFrame| D[_filter_dask]
C -->|ibis.Table| E[_filter_ibis]
D --> F[LLM Batch Processing]
E --> F
F --> G[Parse Filter Output]
G --> H[Apply Boolean Mask]
H --> I[Filtered DataFrame/Table]
Component Overview
| Component | File | Responsibility |
|---|---|---|
| filter() | datatune/core/filter.py | Main entry point, type dispatch |
| _filter_dask() | datatune/core/dask/filter_dask.py | Dask DataFrame filtering |
| _filter_ibis() | datatune/core/ibis/filter_ibis.py | Ibis table filtering |
| LLM Integration | datatune/llm/llm.py | Batch inference and token management |
API Reference
`dt.filter()`
Main filter function exported from datatune.
def filter(*, prompt, input_fields=None, clusters=None):
    def apply(llm, data):
        # Implementation dispatch
        ...
    return apply
#### Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| prompt | str | Yes | Natural language description of filter criteria |
| input_fields | List[str] | No | Columns to use for filtering decisions |
| clusters | List[Dict] | No | Semantic deduplication clusters for propagating filter decisions |
#### Returns
Returns a callable that accepts (llm, data) and returns a filtered DataFrame or Table.
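The two-step call shape, configure first and apply later, is an ordinary closure. The sketch below reproduces that shape with stand-ins: `make_filter` is a hypothetical name, rows are plain dictionaries, and `decide` is a simple function playing the role of the LLM.

```python
def make_filter(*, prompt, input_fields=None):
    """Hypothetical stand-in for the keyword-only factory pattern."""

    def apply(decide, rows):
        # `decide` plays the role of the LLM: it returns True to keep a row
        fields = input_fields or (list(rows[0]) if rows else [])
        return [row for row in rows if decide(prompt, {f: row[f] for f in fields})]

    return apply


rows = [{"Name": "Laptop", "Price": 900}, {"Name": "Apple", "Price": 1}]
keep_electronics = make_filter(prompt="Keep only electronics", input_fields=["Name"])
kept = keep_electronics(lambda prompt, record: record["Name"] == "Laptop", rows)
```

Separating configuration from application lets the same configured filter be reused across several datasets or models.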
Dask Implementation
The Dask backend processes rows in parallel across partitions, leveraging the Dask execution graph for distributed computation.
Source Files
- Main implementation: datatune/core/dask/filter_dask.py
- Dispatcher: datatune/core/filter.py
Workflow
graph LR
A[Input DataFrame] --> B[Serialize Input Columns]
B --> C[Handle Clusters]
C --> D[Extract Canonical Indices]
D --> E[LLM Batch Inference]
E --> F[Parse Boolean Output]
F --> G[Propagate to Duplicates]
G --> H[Apply Boolean Mask]
Key Functions
#### _filter_dask()
Core filtering function for Dask DataFrames:
def _filter_dask(
    prompt: str,
    input_fields: Optional[List[str]] = None,
    clusters: Optional[List[Dict]] = None,
):
    # Validates inputs, constructs prompts, invokes LLM
    ...
#### parse_filter_output()
Parses LLM output string into boolean values:
def parse_filter_output(
    output: Union[str, Exception],
    err: bool = True
) -> Optional[bool]:
    ...
Ibis Implementation
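The Ibis backend assigns a row ID to each row, executes the selected input columns to local memory for LLM inference, and then applies the resulting decisions as a filter on the original table expression. The input table can come from any SQL backend that Ibis supports (DuckDB, PostgreSQL, BigQuery, etc.).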
Source Files
- Implementation: datatune/core/ibis/filter_ibis.py
Workflow
graph TD
A[Input Table] --> B[Add Row ID Column]
B --> C[Select Input Columns]
C --> D[Execute to Local]
D --> E[LLM Batch Inference]
E --> F[Parse Filter Results]
F --> G[Build Boolean Filter]
G --> H[Filter Original Table]
H --> I[Remove Row ID Column]
Key Functions
#### _filter_ibis()
Core filtering function for Ibis tables:
def _filter_ibis(
    table: ibis.Table,
    prompt: str,
    input_fields: Optional[List[str]] = None,
):
    indexed_table = table.mutate(_ROW_ID_=ibis.row_number().cast("int64"))
Uses ibis.row_number() to assign sequential row IDs that match LLM response indexing.
Sources: datatune/core/ibis/filter_ibis.py:28
LLM Prompt Format
The Filter module constructs specialized prompts to guide LLM responses for filtering decisions.
Prompt Structure
FILTERING CRITERIA:
<prompt>
[input record dictionary]
DECISION: Your response MUST be the entire input record as Python dictionary in the format:
index=<row_index>|{key1: value1, ...}<endofrow> with added key called '__filter__'
with value either True to KEEP the record or False to REMOVE it.
Output Format
The LLM must return records with an added __filter__ key:
index=0|{'Name': 'Acme Corp', 'Country': 'USA', '__filter__': True}<endofrow>
index=1|{'Name': 'Bad Inc', 'Country': 'UK', '__filter__': False}<endofrow>
#### Response Requirements
| Requirement | Description |
|---|---|
| Index prefix | Each response starts with `index=<row_index>\|` |
| Complete record | Include all original keys plus __filter__ |
| Boolean value | __filter__ must be True (keep) or False (remove) |
| Missing values | Set to None if column value does not exist |
Sources: datatune/core/dask/filter_dask.py:14-19
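To make the output format concrete, the sketch below splits a batched response on the <endofrow> marker and reads the __filter__ flag from each record. It is an illustrative parser written for this page, not the library's parse_filter_output; the name `parse_filter_batch` and the choice to skip malformed rows are assumptions.

```python
import ast


def parse_filter_batch(response: str) -> dict:
    """Hypothetical parser: map row index -> keep/remove decision."""
    decisions = {}
    for chunk in response.split("<endofrow>"):
        chunk = chunk.strip()
        if not chunk:
            continue
        prefix, sep, payload = chunk.partition("|")
        if not sep or not prefix.startswith("index="):
            continue  # not in the expected 'index=<n>|{...}' shape
        try:
            row_index = int(prefix[len("index="):])
            record = ast.literal_eval(payload)
        except (ValueError, SyntaxError, TypeError):
            continue  # malformed row: skipped in this sketch
        if isinstance(record, dict) and isinstance(record.get("__filter__"), bool):
            decisions[row_index] = record["__filter__"]
    return decisions


response = (
    "index=0|{'Name': 'Acme Corp', 'Country': 'USA', '__filter__': True}<endofrow>\n"
    "index=1|{'Name': 'Bad Inc', 'Country': 'UK', '__filter__': False}<endofrow>\n"
    "index=2|not a dict<endofrow>"
)
decisions = parse_filter_batch(response)
```

Rows missing from the returned mapping are the ones a caller would need to retry or treat as errors.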
Semantic Deduplication Integration
When clusters parameter is provided, filter decisions propagate from canonical records to their duplicates.
Cluster Structure
clusters = [
{
"canonical_id": 5,
"duplicate_ids": [12, 45, 78]
},
...
]
Propagation Logic
graph TD
A[Clusters Input] --> B[Build dup_to_canon Map]
B --> C[Extract Canonical Indices]
C --> D[Filter Only Canonical Rows]
D --> E[LLM Inference on Canonical]
E --> F[Propagate Decisions to Duplicates]
F --> G[Merge Results]
The mapping builds:
dup_to_canon = {
dup: c["canonical_id"]
for c in clusters
for dup in c["duplicate_ids"]
}
Sources: datatune/core/dask/filter_dask.py:41-46
Usage Examples
Basic Dask Filtering
import datatune as dt
import dask.dataframe as dd
from datatune.llm.llm import OpenAI
llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")
# Keep only electronics products
filtered = dt.filter(
prompt="Keep only electronics products",
input_fields=["Name", "Description"]
)(llm, df)
result = dt.finalize(filtered)
Ibis Table Filtering
import datatune as dt
import ibis
from datatune.llm.llm import OpenAI
llm = OpenAI(model_name="gpt-3.5-turbo")
con = ibis.duckdb.connect("data.duckdb")
table = con.table("customers")
# Filter using natural language
filtered = dt.filter(
    prompt="Keep only customers from African countries",
    input_fields=["Country", "Region"]
)(llm, table)
With Semantic Deduplication
# First deduplicate to find clusters
deduplicator = dt.SemanticDeduplicator(llm=llm)
clusters = deduplicator.find_duplicates(df, columns=["Name", "Address"])
# Filter with cluster awareness
filtered = dt.filter(
prompt="Keep only verified organizations",
input_fields=["Status", "VerificationDate"],
clusters=clusters
)(llm, df)
Error Handling
The parse_filter_output() function handles various error cases:
| Error Type | Handling | Return Value |
|---|---|---|
| Parse failure | Returns None if err=False | None |
| Exception | Returns exception if err=True | Original exception |
| Empty output | Returns None | None |
| Malformed dict | Falls back to None | None |
Sources: datatune/core/dask/filter_dask.py:64-79
Rate Limiting
Filter operations respect LLM rate limits defined in datatune/llm/model_rate_limits.py:
| Model | TPM (tokens/min) | RPM (requests/min) |
|---|---|---|
| gpt-3.5-turbo | 200,000 | 500 |
| gpt-4 | 10,000 | 500 |
| gpt-4-turbo | 30,000 | 500 |
The batching system automatically respects these limits during batch LLM inference.
Performance Considerations
Batching Strategy
Rows are batched for LLM inference to optimize throughput:
# From datatune/llm/llm.py
for i, prompt in enumerate(input_rows):
    # Batch prompts based on token limits
    ...  # Accumulate until batch size reached
Partition Parallelism (Dask)
Dask DataFrames process partitions in parallel:
graph LR
A[Partition 0] --> E[LLM Batch]
B[Partition 1] --> F[LLM Batch]
C[Partition 2] --> G[LLM Batch]
E --> H[Results Merged]
F --> H
G --> H
Backend Selection
| Backend | Best For | Limitations |
|---|---|---|
| Dask | Large datasets, local processing | Memory-bound |
| Ibis | Database pushdown, SQL backends | Requires database connection |
See Also
- Map Operations - Creating new columns via LLM
- Semantic Deduplication - Finding duplicate records
- Agent - Automatic operation planning
- LLM Configuration - LLM setup and backends
Reduce Operations
Related topics: Map Operations, Filter Operations
Overview
Reduce Operations in datatune provide a mechanism for aggregating and condensing data using configurable action handlers. The reduce system follows a plugin-based architecture where specific aggregation or reduction behaviors are registered as actions and invoked through a unified interface.
The core purpose of reduce operations is to transform a DataFrame into aggregated results by applying registered reduction actions. This enables users to perform complex aggregation tasks that go beyond standard group-by operations, leveraging LLM-powered semantic understanding when needed.
Sources: datatune/core/reduce.py:1-22
Architecture
Action Registry Pattern
The reduce system implements a decorator-based registry pattern for action registration:
graph TD
A[User calls reduce df action] --> B[reduce function invoked]
B --> C[get_action looks up registry]
C --> D{Action found?}
D -->|Yes| E[Instantiate action class]
D -->|No| F[Raise ValueError]
E --> G[Execute reduction on DataFrame]
G --> H[Return aggregated result]
The _ACTIONS dictionary serves as a central registry mapping action names to their implementation classes. The register_action decorator allows new actions to be dynamically registered without modifying core logic.
Sources: datatune/core/reduce.py:1-22
Class Diagram
classDiagram
class _ACTIONS {
Dict[str, Type]
}
class register_action {
+decorator(name: str)
}
class get_action {
+function(name: str) Type
}
class reduce {
+function(df, action, **kwargs)
}
register_action ..> _ACTIONS : registers
get_action ..> _ACTIONS : queries
reduce ..> get_action : uses
API Reference
Main Function
def reduce(df, *, action: str, **kwargs):
    cls = get_action(action)
    reducer = cls(**kwargs)
    return reducer(df)
#### Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| df | dask.dataframe.DataFrame | Yes | The input Dask DataFrame to reduce |
| action | str | Yes | The name of the registered reduction action to execute |
| **kwargs | Various | No | Additional keyword arguments passed to the action constructor |
#### Returns
| Return Type | Description |
|---|---|
| Any | Returns the result of the specific reduction action (varies by registered action) |
#### Exceptions
| Exception | Condition |
|---|---|
| ValueError | When the specified action name is not registered |
Sources: datatune/core/reduce.py:19-22
Register Action Decorator
def register_action(name):
    def decorator(cls):
        _ACTIONS[name] = cls
        return cls
    return decorator
#### Usage Pattern
@register_action("my_reducer")
class MyReducer:
    def __init__(self, **kwargs):
        # Initialize reducer configuration
        self.config = kwargs
    def __call__(self, df):
        # Perform the reduction here; this placeholder returns the input unchanged
        return df
Get Action Function
def get_action(name):
    try:
        return _ACTIONS[name]
    except KeyError:
        raise ValueError(f"Unknown action: {name}")
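The three pieces above fit together as follows. This is a self-contained re-creation of the registry pattern for experimentation, assuming a list of dictionaries in place of a Dask DataFrame; the "sum_column" action and its `column` parameter are hypothetical and not part of datatune.

```python
_ACTIONS = {}


def register_action(name):
    def decorator(cls):
        _ACTIONS[name] = cls
        return cls
    return decorator


def get_action(name):
    try:
        return _ACTIONS[name]
    except KeyError:
        raise ValueError(f"Unknown action: {name}")


def reduce(df, *, action, **kwargs):
    cls = get_action(action)
    reducer = cls(**kwargs)
    return reducer(df)


@register_action("sum_column")  # hypothetical action name
class SumColumn:
    def __init__(self, column, **kwargs):
        self.column = column

    def __call__(self, rows):
        # `rows` is a list of dicts standing in for a DataFrame
        return sum(row[self.column] for row in rows)


rows = [{"Revenue": 10}, {"Revenue": 20}, {"Revenue": 30}]
total = reduce(rows, action="sum_column", column="Revenue")
```

Because lookup goes through the registry, adding a new action never requires touching `reduce` itself.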
Workflow Execution
Execution Flow
graph TD
A[Start] --> B[Import reduce module]
B --> C[Call reduce df action]
C --> D[get_action retrieves class]
D --> E[Instantiate with kwargs]
E --> F[Call reducer on DataFrame]
F --> G[Return result]
G --> H[End]
Integration with Datatune Core
The reduce operation integrates with other datatune components through the public API:
import datatune as dt
# reduce is exported from datatune namespace
result = dt.reduce(df, action="specific_action", param=value)
Sources: datatune/__init__.py:1-7
Registered Actions
Based on the repository structure, reduce operations support extensibility through custom registered actions. The system is designed to work with Dask DataFrames and can integrate with:
- SemanticDeduplicator: For deduplication using embedding-based similarity matching
- Custom aggregations: User-defined reduction logic via the decorator pattern
SemanticDeduplicator Integration
While primarily a deduplication tool, the SemanticDeduplicator class in datatune/core/deduplication.py demonstrates how reduce-style operations work:
class SemanticDeduplicator:
    def __init__(
        self,
        llm,
        embedding_model: str = "text-embedding-3-small",
        sim_threshold: float = 0.90,
        top_k: int = 50,
        hnsw_m: int = 32,
        ef_search: int = 64,
        return_df: bool = False,
    ):
        # Configuration initialization
        ...
The deduplicator processes data through embedding generation, FAISS index building, and similarity search phases, demonstrating how reduce operations handle complex multi-stage transformations.
Sources: datatune/core/deduplication.py:1-150
Extension Points
Creating Custom Reduce Actions
To create a custom reduction action:
- Define a class with appropriate __init__ and __call__ methods
- Decorate with @register_action("action_name")
- The class should accept **kwargs for flexible configuration
- Implement __call__(self, df) to receive the DataFrame and return the result
from datatune.core.reduce import register_action
@register_action("custom_aggregate")
class CustomAggregateReducer:
    def __init__(self, group_by_column, agg_column, operation="sum", **kwargs):
        self.group_by_column = group_by_column
        self.agg_column = agg_column
        self.operation = operation
    def __call__(self, df):
        # Illustrative reduction logic: group rows and aggregate one column
        grouped = df.groupby(self.group_by_column)[self.agg_column]
        return grouped.agg(self.operation)
Best Practices
| Practice | Rationale |
|---|---|
| Use descriptive action names | Ensures clear identification in get_action calls |
| Accept **kwargs in __init__ | Maintains compatibility with the reduce function signature |
| Return consistent types | Enables predictable downstream processing |
| Handle empty DataFrames | Prevents errors during edge case processing |
Related Components
| Component | File | Relationship |
|---|---|---|
| Filter | datatune/core/dask/filter_dask.py | Similar pattern for row-level operations |
| Map | datatune/core/dask/map_dask.py | Column transformation counterpart |
| Finalize | datatune/core/dask/op.py | Completes lazy Dask computations |
| Agent | datatune/agent/agent.py | Orchestrates multi-step transformations |
Sources: datatune/core/dask/filter_dask.py:1-50
Sources: datatune/core/dask/map_dask.py:1-50
Summary
Reduce Operations provide a flexible, extensible mechanism for aggregating and transforming data in datatune. The action registry pattern enables:
- Dynamic action registration via the @register_action decorator
- Centralized action lookup through the get_action function
- Unified API through the reduce function interface
- Custom extensibility for domain-specific reduction logic
The architecture follows proven design patterns that separate action registration from execution, allowing the core system to remain stable while supporting diverse reduction behaviors through plugin-style extensions.
Sources: datatune/core/reduce.py:1-22
Dask Backend
Related topics: Ibis Backend, System Architecture
The Dask Backend is the primary data processing engine in datatune, enabling large-scale, distributed data transformations using Large Language Models (LLMs). It extends the popular Dask library with semantic understanding capabilities, allowing users to perform complex data operations through natural language prompts while maintaining Dask's efficient parallel and out-of-core computation model.
Architecture Overview
The Dask Backend follows a modular architecture with three main layers:
- API Layer (datatune/core/map.py, datatune/core/filter.py) - Entry points that dispatch to appropriate backends
- Operation Layer (datatune/core/dask/) - Core transformations for Dask DataFrames
- LLM Integration Layer - Handles communication with language models for semantic operations
graph TD
A[User Data] --> B[Dask DataFrame]
B --> C{Operation Type}
C -->|Map| D[map_dask.py]
C -->|Filter| E[filter_dask.py]
D --> F[LLM API]
E --> F
F --> G[Parse & Validate]
G --> H[New Columns Added]
H --> I[Lazy Dask DataFrame]
I --> J[finalize]
J --> K[Computed Result]
L[Clusters] -.->|Optional| D
L -.->|Optional| E
Core Operations
Map Operation
The map operation uses an LLM to generate new columns from existing data through natural language understanding. It is designed for semantic extraction, classification, and content transformation tasks.
Source: datatune/core/dask/map_dask.py
#### How Map Works
graph LR
A[Input DataFrame] --> B[Serialize Rows]
B --> C[Cluster Deduplication]
C --> D[Batch for LLM]
D --> E[LLM Processing]
E --> F[Parse Output]
F --> G[Expand to Columns]
G --> H[Copy to Duplicates]
H --> I[Output DataFrame]
#### Key Components
| Component | Description |
|---|---|
| serialized_input_column | Temporary column storing row data as Python dictionaries |
| llm_output_column | Temporary column storing raw LLM responses |
| clusters | Optional list of duplicate clusters for deduplication |
#### Processing Flow
- Each row is serialized to a dictionary format
- If clusters are provided, duplicate handling is applied - only canonical rows are sent to LLM
- LLM receives batched prompts with mapping instructions
- Responses are parsed and new columns are extracted
- Results are propagated back to duplicate rows
Key code snippet from map_dask.py:
df.loc[canonical_idx, llm_output_column] = llm_out
for dup, canon in dup_to_canon.items():
    df.loc[dup, llm_output_column] = df.loc[canon, llm_output_column]
This ensures duplicate handling by copying canonical results to all duplicate rows.
Sources: datatune/core/dask/map_dask.py:34-48
Filter Operation
The filter operation uses an LLM to determine which rows to keep or remove based on natural language criteria.
Source: datatune/core/dask/filter_dask.py
#### Filter Output Format
The LLM returns decisions in a specific format with an added __filter__ key:
index=<row_index>|{key1: value1, ..., '__filter__': True/False}<endofrow>
| Decision | Behavior |
|---|---|
| True | Keep the row |
| False | Remove the row |
#### Processing Pipeline
graph TD
A[Input DataFrame] --> B[Serialize to Dictionaries]
B --> C[Apply Cluster Deduplication]
C --> D[Send to LLM with Filter Prompt]
D --> E[Parse __filter__ Value]
E --> F[Filter DataFrame]
F --> G[Output DataFrame]
Key code snippet for the filter prompt suffix:
suffix = (
f"{os.linesep}{os.linesep}"
"DECISION:Your response MUST be the entire input record as Python dictionary..."
"ALWAYS STICK TO THE FORMAT index=<row_index>|{...} with added key called '__filter__'..."
)
Sources: datatune/core/dask/filter_dask.py:24-35
Finalize Operation
The finalize operation computes the lazy Dask DataFrame and converts it to a pandas DataFrame.
Source: datatune/core/dask/op.py
def finalize(df: dd.DataFrame) -> pd.DataFrame:
    """Compute the lazy Dask DataFrame and return a pandas DataFrame."""
    return df.compute()
Sources: datatune/core/dask/op.py
Semantic Deduplication
Both map and filter operations support semantic deduplication to reduce LLM API calls and costs.
Source: datatune/core/deduplication.py
How Deduplication Works
graph TD
A[Input Series] --> B[Find Duplicate Clusters]
B --> C{dup_to_canon mapping}
C --> D[Extract Canonical Index]
D --> E[Send Only Canonical to LLM]
E --> F[Copy Results to Duplicates]
style C fill:#f9f,color:#000
Cluster Format
clusters = [
{"canonical_id": 5, "duplicate_ids": [12, 23, 45]},
{"canonical_id": 10, "duplicate_ids": [15, 20]}
]
This approach sends only unique/canonical rows to the LLM, then propagates results to all duplicates.
Sources: datatune/core/dask/map_dask.py:34-40
API Reference
Map Function
from datatune.core.map import map
result = map(
prompt="Extract categories from the description",
output_fields=["Category", "Subcategory"],
input_fields=["Description", "Name"],
clusters=None # Optional deduplication clusters
)(llm, dask_dataframe)
| Parameter | Type | Required | Description |
|---|---|---|---|
| prompt | str | Yes | Natural language instructions for column generation |
| output_fields | List[str] | Yes | Names of new columns to create |
| input_fields | List[str] | No | Columns to use as input (defaults to all) |
| clusters | List[Dict] | No | Duplicate clusters for deduplication |
Filter Function
from datatune.core.filter import filter
result = filter(
prompt="Keep only electronics products",
input_fields=["Name", "Description"],
clusters=None # Optional deduplication clusters
)(llm, dask_dataframe)
| Parameter | Type | Required | Description |
|---|---|---|---|
| prompt | str | Yes | Natural language criteria for filtering |
| input_fields | List[str] | No | Columns to evaluate against |
| clusters | List[Dict] | No | Duplicate clusters for deduplication |
Finalize Function
from datatune.core.dask.op import finalize
pandas_df = finalize(lazy_dask_dataframe)
Sources: datatune/core/dask/op.py
LLM Output Format
The Dask Backend requires LLM responses in a specific parseable format for reliable operation.
Map Output Format
index=<row_index>|{key1: value1, key2: value2, ..., 'new_field1': val, 'new_field2': val}<endofrow>
Filter Output Format
index=<row_index>|{key1: value1, key2: value2, ..., '__filter__': True/False}<endofrow>
Parsing Rules
| Rule | Description |
|---|---|
| String values | Must be enclosed in double quotes |
| Missing values | Set to None or null |
| Index | Must match row index exactly |
| Format | Must be valid Python literal (dictionary) |
Sources: datatune/core/dask/map_dask.py:20-30
Usage Examples
Example 1: Category Extraction
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd
llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")
# Extract categories using natural language
mapped = dt.map(
prompt="Extract categories from the description and name of product.",
output_fields=["Category", "Subcategory"],
input_fields=["Description", "Name"]
)(llm, df)
# Save results
result = dt.finalize(mapped)
result.compute().to_csv("categorized_products.csv")
Sources: README.md
Example 2: Text-Based Filtering
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd
llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("companies.csv")
# Filter based on semantic criteria
filtered = dt.filter(
prompt="Keep only organizations in Africa",
input_fields=["Country", "Description"]
)(llm, df)
result = dt.finalize(filtered)
Example 3: With Deduplication
# Assuming clusters from semantic deduplication step
clusters = [
{"canonical_id": 5, "duplicate_ids": [12, 23, 45]},
{"canonical_id": 10, "duplicate_ids": [15, 20]}
]
mapped = dt.map(
prompt="Classify the industry sector",
output_fields=["Industry", "Sector"],
input_fields=["CompanyName", "Description"],
clusters=clusters
)(llm, df)
Internal Data Flow
Serialization Process
The backend converts DataFrame rows to dictionary format for LLM processing:
graph LR
A[DataFrame Row] --> B[Convert to Dict]
B --> C[Replace NaN with None]
C --> D["String: {col1: val1, col2: val2}"]
D --> E[LLM Prompt]
Implementation from map_dask.py:
df = df.astype(object).where(df.notna(), None)
df[serialized_input_column] = [
    str(row.to_dict()) for _, row in df.iterrows()
]
Sources: datatune/core/dask/map_dask.py:50-56
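The reason for replacing NaN before serializing becomes clear when the string has to be parsed back. The sketch below uses a plain dictionary per row instead of a DataFrame, and `serialize_row` is a hypothetical helper written for this illustration.

```python
import ast
import math


def serialize_row(row: dict) -> str:
    """Hypothetical helper: replace NaN with None, then stringify the dict."""
    cleaned = {
        key: (None if isinstance(value, float) and math.isnan(value) else value)
        for key, value in row.items()
    }
    return str(cleaned)


row = {"Name": "Acme", "Revenue": float("nan")}
serialized = serialize_row(row)

# The cleaned string round-trips through literal_eval
round_trip = ast.literal_eval(serialized)

# The raw string does not: 'nan' is not a Python literal
try:
    ast.literal_eval(str(row))
    raw_parses = True
except ValueError:
    raw_parses = False
```

Since the documented output format asks the LLM to echo the input record back, keeping the serialized input a valid Python literal helps keep the response parseable too.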
Error Handling
The backend includes robust error handling for LLM parsing failures:
def parse_llm_output(llm_output: Union[str, Exception]) -> Union[Dict, Exception]:
    """
    Parses the LLM output string into a Python dictionary.
    """
    # Returns dict on success, Exception on failure
A failed parse returns the exception object instead of raising it, so the pipeline continues rather than crashing.
Backend Dispatch Mechanism
The system automatically dispatches to the Dask backend when a Dask DataFrame is detected:
graph TD
A[Input Data] --> B{Is Dask DataFrame?}
B -->|Yes| C[Dask Backend]
B -->|No| D{Is Ibis Table?}
D -->|Yes| E[Ibis Backend]
D -->|No| F[TypeError]
C --> G[_map_dask / _filter_dask]
E --> H[_map_ibis / _filter_ibis]
Dispatch logic from core/map.py:
def _is_dask_df(obj):
    try:
        import dask.dataframe as dd
        return isinstance(obj, dd.DataFrame)
    except ImportError:
        return False
Sources: datatune/core/map.py:1-17
Configuration Considerations
Memory Efficiency
The Dask backend processes data partition by partition, making it suitable for datasets larger than memory. Consider:
| Setting | Recommendation |
|---|---|
| Dask partitions | Set based on available memory |
| Batch size | Adjust based on LLM rate limits |
| Clusters | Use for high-duplicate datasets |
LLM Rate Limits
The backend implements batching to respect LLM API rate limits while maximizing throughput.
Summary
The Dask Backend provides a powerful, scalable mechanism for performing LLM-powered data transformations on Dask DataFrames. Key capabilities include:
- Map: Generate new columns through semantic understanding
- Filter: Remove rows based on natural language criteria
- Deduplication: Reduce costs by processing only canonical rows
- Lazy Evaluation: Maintain Dask's efficient computation model until finalization
All operations follow a consistent pattern: serialize data, send to LLM with clear instructions, parse responses, and update the DataFrame.
Sources: datatune/core/dask/map_dask.py:34-48
Ibis Backend
Related topics: Dask Backend, System Architecture
Overview
The Ibis Backend in datatune provides integration with the Ibis project, enabling LLM-powered data transformations on various database backends including DuckDB, PostgreSQL, BigQuery, and more. This backend extends datatune's core functionality (map, filter) to work with Ibis tables, leveraging the composability and SQL-generation capabilities of Ibis while incorporating LLM-based semantic transformations.
The Ibis Backend serves as a critical abstraction layer that:
- Accepts Ibis table objects as input
- Serializes table data for LLM consumption
- Executes batch inference operations via LLM calls
- Parses and applies LLM outputs back to the table structure
- Returns transformed Ibis tables for continued processing
Sources: datatune/core/ibis/filter_ibis.py:1-20, datatune/core/ibis/map_ibis.py:1-25
Architecture
System Components
The Ibis Backend consists of several interconnected components that work together to provide seamless LLM-powered transformations:
graph TD
A[Ibis Table Input] --> B[add_serialized_col]
B --> C[llm_batch_inference]
C --> D[LLM API Call]
D --> E[Parse LLM Output]
E --> F[apply_llm_updates / Filter Logic]
F --> G[Transformed Ibis Table]
H[_map_ibis Class] --> I[Map Operations]
J[_filter_ibis Class] --> K[Filter Operations]
Component Responsibilities
| Component | File | Purpose |
|---|---|---|
| add_serialized_col | filter_ibis.py, map_ibis.py | Serializes Ibis table columns into JSON strings for LLM input |
| llm_batch_inference | filter_ibis.py, map_ibis.py | Manages batch inference with LLM, handles retries and error cases |
| _map_ibis | map_ibis.py | Handles mapping operations (creating new columns via LLM) |
| _filter_ibis | filter_ibis.py | Handles filtering operations (row selection via LLM) |
Sources: datatune/core/ibis/filter_ibis.py:11-32, datatune/core/ibis/map_ibis.py:12-38
Core Functions
add_serialized_col
The add_serialized_col function transforms Ibis table columns into a serialized JSON format suitable for LLM processing. This function is fundamental to both map and filter operations.
Function Signature:
def add_serialized_col(table, target_col: str, input_fields: Optional[List[str]] = []):
Parameters:
| Parameter | Type | Description |
|---|---|---|
| table | ibis.Table | The input Ibis table |
| target_col | str | Name of the column to store serialized data |
| input_fields | Optional[List[str]] | List of column names to include in serialization. If empty, includes all columns except target |
Behavior:
- Filters columns to include only relevant fields (excluding target column and non-selected fields)
- Constructs a JSON string expression by concatenating column names and values
- Returns a new Ibis table with the serialized column added via mutate()
Sources: datatune/core/ibis/map_ibis.py:40-53, datatune/core/ibis/filter_ibis.py:37-51
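The column-selection and serialization behavior described above can be pictured with a plain-Python analogue. This is only a sketch under stated assumptions: the real function builds an Ibis string expression and attaches it with mutate(), whereas the hypothetical `serialize_row` helper below works on an ordinary dict and uses `json.dumps`.

```python
import json
from typing import Dict, List, Optional


def serialize_row(row: Dict[str, object], target_col: str,
                  input_fields: Optional[List[str]] = None) -> Dict[str, object]:
    """Return a copy of `row` with a JSON string of the selected fields in `target_col`."""
    # An empty or missing input_fields means "all columns except the target column".
    selected = [c for c in row
                if c != target_col and (not input_fields or c in input_fields)]
    out = dict(row)
    out[target_col] = json.dumps({c: row[c] for c in selected})
    return out


row = {"name": "Widget", "description": "A small widget", "price": 9.5}
serialized = serialize_row(row, "SERIALIZED", input_fields=["name", "description"])
print(serialized["SERIALIZED"])
```

Restricting `input_fields` keeps the serialized payload small, which is the same reason the backend lets callers name the columns the LLM should see.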
llm_batch_inference
The llm_batch_inference function manages the core LLM interaction for transforming data. It handles the complete workflow from prompt construction to result collection.
Function Signature:
def llm_batch_inference(
    table,
    llm: Callable,
    input_col: str,
    output_col: str,
    prompt: str,
    expected_new_fields: list[str] = []
):
Parameters:
| Parameter | Type | Description |
|---|---|---|
| table | ibis.Table | The input Ibis table |
| llm | Callable | LLM callable (e.g., OpenAI, Ollama instance) |
| input_col | str | Column containing serialized input data |
| output_col | str | Column name for storing LLM output |
| prompt | str | Transformation prompt for the LLM |
| expected_new_fields | list[str] | Optional list of expected output field names |
Workflow:
sequenceDiagram
participant Table as Ibis Table
participant Func as llm_batch_inference
participant LLM as LLM API
participant Parse as Output Parser
Table->>Func: Execute table.select() to local DataFrame
Func->>Table: Get input_list from input_col
Func->>LLM: Call llm(input_list, prefix, prompt, suffix)
LLM-->>Func: Raw LLM responses
Func->>Parse: Process each response
Parse-->>Func: Validated JSON strings
Func->>Table: Create mapping DataFrame
Table-->>Func: Updated table with output_col
Sources: datatune/core/ibis/map_ibis.py:55-82, datatune/core/ibis/filter_ibis.py:54-78
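The workflow in the diagram can be sketched with plain Python lists standing in for the materialized table. Everything here is illustrative: `batch_infer`, `validate`, and `fake_llm` are hypothetical names, the prefix and suffix strings are placeholders, and the stub LLM is assumed to return one string per input row.

```python
import ast
from typing import Callable, Dict, List


def validate(response: str) -> str:
    """Keep a response only if it parses as a Python dict literal; otherwise return '{}'."""
    try:
        parsed = ast.literal_eval(response)
    except (ValueError, SyntaxError):
        return "{}"
    return response if isinstance(parsed, dict) else "{}"


def batch_infer(rows: List[Dict], llm: Callable, input_col: str,
                output_col: str, prompt: str) -> List[Dict]:
    """Run one batched LLM call over `rows` and attach a per-row output column."""
    input_list = [row[input_col] for row in rows]
    # Placeholder prefix/suffix; the real prompts are longer.
    responses = llm(input_list, "Row data:", prompt, "Reply with a Python dict.")
    return [{**row, output_col: validate(resp)} for row, resp in zip(rows, responses)]


def fake_llm(inputs, prefix, prompt, suffix):
    # Stand-in for a real provider: one well-formed answer, one malformed.
    return ['{"name": "Widget", "Category": "Tools"}', "not a dict"]


rows = [{"IN": '{"name": "Widget"}'}, {"IN": '{"name": "Gadget"}'}]
out = batch_infer(rows, fake_llm, "IN", "OUT", "Add a Category field")
print(out[1]["OUT"])
```

Falling back to an empty dict for unparseable output keeps row alignment intact, so one bad response does not shift the results of the rows after it.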
Map Operations
_map_ibis Class
The _map_ibis class implements the mapping transformation for Ibis tables, enabling the creation of new columns based on LLM-driven transformations.
Class Definition:
class _map_ibis:
    def __init__(
        self,
        prompt: str,
        input_fields: Optional[List[str]] = None,
        output_fields: Optional[List[str]] = None,
    ):
        self.prompt = prompt
        self.input_fields = input_fields or []
        self.output_fields = output_fields or []
        self.llm_output_column = "MAP_LLM_OUTPUT__DATATUNE__"
        self.serialized_input_column = "MAP_SERIALIZED_INPUT__DATATUNE__"
Key Attributes:
| Attribute | Type | Default | Description |
|---|---|---|---|
| prompt | str | Required | Natural language prompt describing the transformation |
| input_fields | Optional[List[str]] | [] | Columns to use as input for transformation |
| output_fields | Optional[List[str]] | [] | Names of new columns to create |
| llm_output_column | str | System-generated | Internal column for LLM response storage |
| serialized_input_column | str | System-generated | Internal column for serialized input |
Execution Flow:
- Validates that all input_fields exist in the table schema
- Creates serialized input column via add_serialized_col
- Executes batch inference via llm_batch_inference
- Applies LLM updates to create output columns
- Selects final columns (removing internal columns)
Sources: datatune/core/ibis/map_ibis.py:122-160
Prompt Format for Map
The LLM receives structured prompts for mapping operations:
Map and transform the input according to the mapping criteria below.
Replace or Create new fields or values as per the prompt.
Expected new fields: [output_fields]
MAPPING CRITERIA:
{prompt}
Your response MUST be the entire input record as a valid Python dictionary
in the format 'index=<row_index>|{key1: value1, key2: value2, ...}' with added keys
of expected new fields if any.
Sources: datatune/core/ibis/map_ibis.py:56-75
Filter Operations
_filter_ibis Class
The _filter_ibis class implements filtering transformations for Ibis tables, enabling row-level selection based on LLM-driven criteria.
Class Definition:
class _filter_ibis:
    def __init__(
        self,
        prompt: str,
        input_fields: Optional[List[str]] = None,
    ):
        self.prompt = prompt
        self.input_fields = input_fields or []
        self.llm_output_column = "FILTER_LLM_OUTPUT__DATATUNE__"
        self.serialized_input_column = "FILTER_SERIALIZED_INPUT__DATATUNE__"
Key Attributes:
| Attribute | Type | Default | Description |
|---|---|---|---|
| prompt | str | Required | Natural language criteria for filtering |
| input_fields | Optional[List[str]] | [] | Columns to consider for filtering decision |
| llm_output_column | str | System-generated | Internal column for LLM response storage |
| serialized_input_column | str | System-generated | Internal column for serialized input |
Execution Flow:
graph TD
A[Input Ibis Table] --> B[Add Serialized Column]
B --> C[Execute to Local DataFrame]
C --> D[LLM Batch Inference]
D --> E[Parse Filter Decisions]
E --> F[Extract __filter__ Flag]
F --> G[Apply Boolean Filter]
G --> H[Final Ibis Table]
Sources: datatune/core/ibis/filter_ibis.py:80-120
Prompt Format for Filter
The LLM receives structured prompts for filtering operations:
You are filtering a dataset. Your task is to determine whether each data record
should be KEPT or REMOVED based on the filtering criteria below.
Return the entire input data record with an added key called '__filter__' with
value either True to KEEP the record or False to REMOVE it.
FILTERING CRITERIA:
{prompt}
Your response MUST be the entire input record as a Python dictionary in the format:
index=<row_index>|{key1: value1, key2: value2, ...}<endofrow> with added key called
'__filter__' with value either True to KEEP the record or False to REMOVE it.
Sources: datatune/core/ibis/filter_ibis.py:56-68
Output Parsing
parse_filter_output
The parse_filter_output function extracts boolean filter decisions from LLM responses.
Function Signature:
def parse_filter_output(
    output: Union[str, Exception], err: bool = True
) -> Optional[bool]:
Parsing Logic:
- Attempts to evaluate the LLM output as a Python literal using ast.literal_eval
- Extracts the last key-value pair from the dictionary
- Returns the boolean value associated with the __filter__ key
- Returns None if parsing fails
Sources: datatune/core/ibis/filter_ibis.py:104-130
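The parsing logic listed above can be approximated as follows. This is a minimal sketch, not the library's implementation: `parse_filter_flag` is a hypothetical name, it omits the `err` parameter, and it assumes the `index=<row_index>|` prefix and `<endofrow>` terminator have already been stripped from the text.

```python
import ast
from typing import Optional, Union


def parse_filter_flag(output: Union[str, Exception]) -> Optional[bool]:
    """Return the `__filter__` decision from a dict literal, or None if it cannot be read."""
    if isinstance(output, Exception):
        return None
    try:
        record = ast.literal_eval(output.strip())
    except (ValueError, SyntaxError):
        return None
    if not isinstance(record, dict) or not record:
        return None
    # The prompt asks for `__filter__` as an added key, so it is expected last.
    key, value = list(record.items())[-1]
    if key == "__filter__" and isinstance(value, bool):
        return value
    return None


print(parse_filter_flag('{"customer_type": "enterprise", "__filter__": True}'))
print(parse_filter_flag("I think this row should be kept"))
```

Using `ast.literal_eval` rather than `eval` means the LLM's text is only ever interpreted as data, never executed as code.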
Output Format Requirements
The Ibis Backend enforces strict output format requirements for LLM responses:
| Requirement | Format | Example |
|---|---|---|
| Index prefix | `index=<row_index>\|` | `index=0\|` |
| Data format | Python dictionary literal | {'col1': 'value1', 'col2': 'value2'} |
| String quoting | Double quotes only | "string" not 'string' |
| Null values | None | {'col': None} |
| Row terminator | <endofrow> | `index=0\|{...}<endofrow>` |
Sources: datatune/core/ibis/map_ibis.py:70-75, datatune/core/ibis/filter_ibis.py:64-67
Usage Examples
Basic Map Operation
import datatune as dt
from datatune.llm.llm import OpenAI
import ibis
# Connect to DuckDB
con = ibis.duckdb.connect("data.duckdb")
table = con.table("products")
# Initialize LLM
llm = OpenAI(model_name="gpt-3.5-turbo")
# Create mapping operation
mapped = dt.map(
    prompt="Extract product category and subcategory from description",
    input_fields=["description", "name"],
    output_fields=["Category", "Subcategory"]
)(llm, table)
Basic Filter Operation
import datatune as dt
from datatune.llm.llm import Ollama
import ibis
# Connect to PostgreSQL via Ibis (connection arguments elided)
con = ibis.postgres.connect(...)
table = con.table("orders")
# Initialize local LLM
llm = Ollama(model_name="gemma3:4b")
# Create filter operation
filtered = dt.filter(
    prompt="Keep only orders from enterprise customers with value over $10,000",
    input_fields=["customer_type", "order_value"]
)(llm, table)
Error Handling
Schema Validation
The Ibis Backend performs input schema validation before processing:
def __call__(self, llm: Callable, table: Table) -> Table:
    if self.input_fields:
        missing = [f for f in self.input_fields if f not in table.columns]
        if missing:
            error_msg = (
                f"[datatune] Schema mismatch: The following input_fields were not found: {missing}. "
                f"Available columns: {list(table.columns)}"
            )
            raise ValueError(error_msg)
Validation Rules:
| Check | Condition | Action on Failure |
|---|---|---|
| Input fields exist | input_fields subset of table.columns | Raise ValueError |
| LLM call success | No exception from llm() | Return error string |
| Output parsing | Valid Python literal | Return empty dict {} |
Sources: datatune/core/ibis/map_ibis.py:130-140
Internal Column Naming
The Ibis Backend uses prefixed column names to avoid conflicts with user data:
| Operation | Serialized Column | Output Column |
|---|---|---|
| Map | MAP_SERIALIZED_INPUT__DATATUNE__ | MAP_LLM_OUTPUT__DATATUNE__ |
| Filter | FILTER_SERIALIZED_INPUT__DATATUNE__ | FILTER_LLM_OUTPUT__DATATUNE__ |
These columns are automatically removed from the final output table, ensuring clean results.
Integration with Core API
The Ibis Backend is invoked through the unified datatune API via the filter.py module:
def filter(*, prompt, input_fields=None, clusters=None):
    def apply(llm, data):
        if _is_ibis_table(data):
            from .ibis.filter_ibis import _filter_ibis
            return _filter_ibis(
                prompt=prompt,
                input_fields=input_fields,
            )(llm, data)
        # ... other backends
Sources: datatune/core/filter.py:15-28
Supported Database Backends
The Ibis Backend supports any database backend compatible with Ibis:
| Backend | Connection Method | Example |
|---|---|---|
| DuckDB | ibis.duckdb.connect() | Local analytical queries |
| PostgreSQL | ibis.postgres.connect() | Enterprise data |
| BigQuery | ibis.bigquery.connect() | Cloud data warehousing |
| SQLite | ibis.sqlite.connect() | Lightweight embedded DB |
| MySQL | ibis.mysql.connect() | Web applications |
Performance Considerations
Batch Processing
The Ibis Backend processes data in batches determined by:
- Local execution boundary: Data is materialized to a local DataFrame via .execute()
- LLM rate limits: Respects TPM (tokens per minute) and RPM (requests per minute) limits
- Memory constraints: Batch sizes are controlled by the underlying LLM integration
Optimization Strategies
| Strategy | Description | Applicable Operation |
|---|---|---|
| Select specific fields | Use input_fields to minimize data transfer | Map, Filter |
| Pushdown predicates | Filter data in the database before LLM processing | Filter |
| Column pruning | Remove unused columns early in the pipeline | Map |
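The pushdown strategy in the table amounts to running cheap, exact predicates before the semantic check. The sketch below illustrates the effect with plain lists so that it runs without a database; in an Ibis pipeline the pre-filter would be an ordinary table expression evaluated by the backend. `fake_llm_filter` and the sample rows are hypothetical.

```python
from typing import Dict, List

llm_rows_seen = 0


def fake_llm_filter(rows: List[Dict]) -> List[Dict]:
    """Stand-in for an LLM-backed filter; counts how many rows it is asked to judge."""
    global llm_rows_seen
    llm_rows_seen += len(rows)
    return [r for r in rows if r["customer_type"] == "enterprise"]


orders = [
    {"customer_type": "enterprise", "order_value": 25_000},
    {"customer_type": "enterprise", "order_value": 500},
    {"customer_type": "consumer", "order_value": 40_000},
    {"customer_type": "consumer", "order_value": 20},
]

# Cheap, exact predicate first ...
large_orders = [r for r in orders if r["order_value"] > 10_000]
# ... then the semantic check only on what is left.
kept = fake_llm_filter(large_orders)
print(len(kept), llm_rows_seen)
```

Here only two of the four rows reach the stubbed LLM, which is the saving the strategy aims for: numeric thresholds are left to the database and the LLM handles only what needs semantic judgment.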
See Also
- Dask Backend - Alternative backend for distributed computing
- LLM Integrations - Supported LLM providers
- Core API - Unified transformation API
LLM Integration
Related topics: Agent System, Map Operations, Filter Operations
The LLM Integration module in datatune provides a unified abstraction layer for interfacing with various Large Language Model providers. It handles rate limiting, token management, request batching, and provider-specific configurations, enabling consistent data transformation operations across different LLM backends.
Architecture Overview
The LLM module uses a class hierarchy where a base LLM class provides common functionality, while provider-specific subclasses handle individual vendor implementations.
graph TD
A[LLM Base Class] --> B[OpenAI]
A --> C[Ollama]
A --> D[VLLM]
A --> E[Azure]
A --> F[Mistral]
A --> G[Huggingface]
A --> H[Gemini]
I[litellm] --> A
J[Rate Limits] --> A
K[Token Counter] --> A
Core Components
LLM Base Class
The LLM class serves as the foundation for all provider implementations. It manages rate limiting, token counting, and prompt batching.
#### Initialization Parameters
| Parameter | Type | Description | Default |
|---|---|---|---|
| model_name | str | Full model identifier (provider/model format) | Required |
| rpm | int | Requests per minute limit | Provider default |
| tpm | int | Tokens per minute limit | Provider default |
| max_tokens | int | Maximum tokens in response | Provider default |
| temperature | float | Sampling temperature | 0.0 |
#### Key Methods
| Method | Description |
|---|---|
| _completion(prompt) | Execute a single completion request |
| _create_batched_prompts(input_rows, user_batch_prefix, prompt_per_row) | Create optimized batched prompts |
| get_max_tokens() | Retrieve max tokens for current model |
Sources: datatune/llm/llm.py:40-90
Provider Implementations
#### OpenAI
from datatune.llm.llm import OpenAI
llm = OpenAI(
    model_name="gpt-3.5-turbo",
    api_key="your-api-key"
)
Configures models in the openai/model-name format for LiteLLM compatibility.
Sources: datatune/llm/llm.py:95-102
#### Ollama (Local)
from datatune.llm.llm import Ollama
llm = Ollama(
    model_name="gemma3:4b",
    api_base="http://localhost:11434"
)
Enables local model inference with Ollama. The api_base defaults to http://localhost:11434.
Sources: datatune/llm/llm.py:74-82
#### VLLM
from datatune.llm.llm import VLLM
llm = VLLM(
    model_name="your-model",
    api_base="http://localhost:8000/v1"
)
Queries the /models endpoint to automatically determine max_model_len from the server.
Sources: datatune/llm/llm.py:104-122
#### Azure
from datatune.llm.llm import Azure
llm = Azure(
    model_name="gpt-3.5-turbo",
    api_key="your-key",
    api_base="your-endpoint",
    api_version="2024-01-01"
)
Supports Azure OpenAI Service deployments with additional configuration options.
#### Mistral
from datatune.llm.llm import Mistral
llm = Mistral(
    model_name="mistral-tiny",
    api_key="your-api-key"
)
Wraps Mistral AI models with automatic mistral/ prefix handling.
#### Huggingface
from datatune.llm.llm import Huggingface
llm = Huggingface(
    model_name="meta-llama/Llama-2-70b",
    api_key="your-api-key"
)
Provides access to Hugging Face Inference API models.
Rate Limiting
The module enforces two types of rate limits to prevent API throttling:
| Limit Type | Description | Applied At |
|---|---|---|
| TPM | Tokens per minute | Per-request token counting |
| RPM | Requests per minute | Per-API-call counting |
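One common way to enforce both limits together is a sliding 60-second window over recent requests. The sketch below shows that technique in isolation. It is an assumption about how such limits can be enforced, not a description of datatune's internals, and `MinuteWindowLimiter` is a hypothetical class. Timestamps are passed in explicitly so the behavior is easy to follow.

```python
from collections import deque
from typing import Deque, Tuple


class MinuteWindowLimiter:
    """Tracks requests and tokens in a sliding 60-second window."""

    def __init__(self, rpm: int, tpm: int) -> None:
        self.rpm = rpm
        self.tpm = tpm
        self._events: Deque[Tuple[float, int]] = deque()  # (timestamp, tokens)

    def _evict(self, now: float) -> None:
        # Drop events that are 60 seconds old or older.
        while self._events and now - self._events[0][0] >= 60.0:
            self._events.popleft()

    def wait_time(self, now: float, tokens: int) -> float:
        """Seconds to wait before a request of `tokens` tokens may be sent."""
        self._evict(now)
        if not self._events:
            return 0.0  # nothing in the window, so waiting would not help
        used_tokens = sum(t for _, t in self._events)
        if len(self._events) < self.rpm and used_tokens + tokens <= self.tpm:
            return 0.0
        # Wait until the oldest event leaves the window, then check again.
        return 60.0 - (now - self._events[0][0])

    def record(self, now: float, tokens: int) -> None:
        self._events.append((now, tokens))


limiter = MinuteWindowLimiter(rpm=2, tpm=1000)
limiter.record(0.0, 400)
limiter.record(10.0, 400)
print(limiter.wait_time(20.0, 100))  # RPM limit reached: wait for the first event to expire
print(limiter.wait_time(61.0, 100))  # first event has expired
```

A caller would sleep for the returned duration and ask again, recording each request once it is actually sent.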
Model Rate Limits Configuration
Predefined limits for major models are stored in datatune/llm/model_rate_limits.py:
| Model | TPM | RPM |
|---|---|---|
| gpt-3.5-turbo | 200,000 | 500 |
| gpt-4 | 10,000 | 500 |
| gpt-4-turbo | 30,000 | 500 |
| gpt-4o | 30,000 | 500 |
| gpt-4.1-mini | 200,000 | 500 |
| gpt-4.1-nano | 200,000 | 500 |
| gpt-4.5-preview | 125,000 | 1,000 |
Sources: datatune/llm/model_rate_limits.py:1-90
Fallback Behavior
When an unknown model is used, the system falls back to gpt-3.5-turbo limits and logs a warning:
if self._base_model_name in model_rate_limits:
    model_limits = model_rate_limits[self._base_model_name]
else:
    model_limits = model_rate_limits[DEFAULT_MODEL]
    logger.warning(
        f"REQUESTS-PER-MINUTE limits for model '{model_name}' not found. "
        f"Defaulting to '{DEFAULT_MODEL}' limits"
    )
Sources: datatune/llm/llm.py:45-57
Prompt Formatting
The LLM module enforces strict output format requirements for parsing reliability.
Output Format Specification
All responses must follow this pattern:
index=<row_index>|{python_literal}<endofrow>
Supported Python literals:
- Lists: index=0|['item1', 'item2', 'item3']<endofrow>
- Dicts: index=1|{'key1': 'value1', 'key2': 2}<endofrow>
- Strings: index=2|"This is a string answer"<endofrow>
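A response in this format can be split back into rows with a regular expression and `ast.literal_eval`. The sketch below is one way to do it under stated assumptions: `parse_rows` and `ROW_PATTERN` are hypothetical names, and rows that fail to parse are simply skipped, which may differ from how the library reports errors.

```python
import ast
import re
from typing import Any, Dict

# Captures the row index and the literal text up to the next <endofrow> marker.
ROW_PATTERN = re.compile(r"index=(\d+)\|(.*?)<endofrow>", re.DOTALL)


def parse_rows(raw: str) -> Dict[int, Any]:
    """Map each row index to its parsed Python literal; skip rows that fail to parse."""
    parsed: Dict[int, Any] = {}
    for index, literal in ROW_PATTERN.findall(raw):
        try:
            parsed[int(index)] = ast.literal_eval(literal.strip())
        except (ValueError, SyntaxError):
            continue
    return parsed


raw = (
    "index=0|['item1', 'item2', 'item3']<endofrow>\n"
    "index=1|{'key1': 'value1', 'key2': 2}<endofrow>\n"
    'index=2|"This is a string answer"<endofrow>\n'
    "index=3|{broken<endofrow>"
)
result = parse_rows(raw)
print(sorted(result))
```

Keying results by the explicit index, rather than by position in the response, lets the caller detect rows the model dropped or reordered.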
Prompt Components
| Component | Purpose |
|---|---|
| prefix | Instructions for row processing |
| user_batch_prefix | Custom user instructions |
| suffix | Output format requirements |
Sources: datatune/llm/llm.py:1-20
Batch Processing
The batching system optimizes LLM usage by grouping multiple rows into single API requests while respecting token limits.
graph LR
A[Input Rows] --> B[Token Counting]
B --> C{Within Limits?}
C -->|Yes| D[Batch Request]
C -->|No| E[Split Batch]
D --> F[LLM API]
E --> D
F --> G[Parse Output]
G --> H[Row Results]
Batching Logic
- Calculate prefix/suffix token overhead using token_counter
- Iterate through input rows, accumulating tokens
- Split batches when approaching max_tokens limit
- Track nrows_per_api_call for output reconstruction
Sources: datatune/llm/llm.py:30-70
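The batching steps above can be sketched as a greedy packing loop. This is an illustration rather than the library's code: `make_batches` is a hypothetical helper, and `count_words` is a crude stand-in for a real tokenizer.

```python
from typing import Callable, List


def make_batches(rows: List[str], overhead_tokens: int, max_tokens: int,
                 count_tokens: Callable[[str], int]) -> List[List[str]]:
    """Greedily pack rows into batches whose token total stays within max_tokens."""
    batches: List[List[str]] = []
    current: List[str] = []
    used = overhead_tokens  # prefix/suffix cost is paid once per batch
    for row in rows:
        cost = count_tokens(row)
        # Start a new batch if adding this row would exceed the budget.
        if current and used + cost > max_tokens:
            batches.append(current)
            current, used = [], overhead_tokens
        current.append(row)
        used += cost
    if current:
        batches.append(current)
    return batches


def count_words(text: str) -> int:
    # One "token" per whitespace-separated word; real tokenizers differ.
    return len(text.split())


rows = ["a b c", "d e", "f g h i", "j"]
batches = make_batches(rows, overhead_tokens=2, max_tokens=7, count_tokens=count_words)
nrows_per_api_call = [len(b) for b in batches]
print(nrows_per_api_call)
```

Recording the number of rows per call is what allows the flat list of responses to be matched back to the original rows after all batches return.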
Integration with Core Modules
Filter Operations
The filter module uses the LLM to evaluate row-level boolean conditions:
def filter_rows(llm, df, input_column, prompt, output_column):
    prefix = "Row data:"
    suffix = "Your response MUST be... with added key '__filter__'"
    llm_out = llm(
        input_series,
        prefix,
        prompt,
        suffix,
        optimized=True
    )
Sources: datatune/core/dask/filter_dask.py:15-40
Map Operations
The map module leverages the LLM for column transformations:
def map_rows(llm, df, input_columns, prompt, output_columns):
    suffix = "Your response MUST be... with added keys of expected new fields"
    llm_out = llm(
        canonical_input,
        prefix,
        prompt,
        suffix,
        optimized=True
    )
Sources: datatune/core/dask/map_dask.py:20-45
Usage Example
import dask.dataframe as dd
import datatune as dt
from datatune.llm.llm import OpenAI
# Initialize LLM
llm = OpenAI(model_name="gpt-3.5-turbo")
# Use with datatune operations
df = dd.read_csv("products.csv")
mapped = dt.map(
    prompt="Extract categories from description",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description"]
)(llm, df)
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Category"]
)(llm, mapped)
Module Exports
The datatune.llm package exports all provider classes:
from datatune.llm import *
# Available: LLM, OpenAI, Ollama, VLLM, Azure, Mistral, Huggingface
Sources: datatune/llm/__init__.py:1
Dependencies
| Library / function | Purpose |
|---|---|
| litellm | Unified LLM API interface |
| token_counter | Token estimation (litellm helper) |
| get_max_tokens | Model context limits (litellm helper) |
Sources: datatune/llm/llm.py:10-11
Sources: datatune/llm/llm.py:40-90
Agent System
Related topics: LLM Integration, Map Operations, Filter Operations
The Agent System is the core intelligent orchestration layer in datatune that automatically determines, plans, and executes data transformation workflows based on natural language prompts. It abstracts the complexity of selecting between Dask operations and LLM-powered primitives (Map, Filter), allowing users to describe desired transformations in plain English while the system handles the implementation details.
Architecture Overview
The Agent System follows a planner-executor pattern where the LLM acts as the planner, generating a JSON-based execution plan, and the Agent class handles the runtime execution of that plan against Dask DataFrames.
graph TD
A[User Natural Language Goal] --> B[Agent.do method]
B --> C[LLM Planner]
C --> D[JSON Execution Plan]
D --> E[Agent._execute_plan]
E --> F{Step Type}
F -->|dask| G[Dask Operations]
F -->|primitive| H[LLM Primitives<br/>Map/Filter]
G --> I[Runtime Execution]
H --> I
I --> J[Result DataFrame]
Sources: datatune/agent/agent.py:1-50
Core Components
Agent Base Class
The Agent class serves as the abstract base for all agent implementations and provides the foundational system prompt that defines the agent's capabilities.
Sources: datatune/agent/__init__.py:1-50
| Component | Description |
|---|---|
| system_prompt | Defines available libraries (pandas, numpy, dask, datatune) and example usage patterns |
| ABC | Abstract base class pattern ensuring implementers provide core methods |
| abstractmethod | Decorator marking do method as required implementation |
from abc import ABC, abstractmethod
class Agent(ABC):
    system_prompt: str = """You are Datatune Agent, a powerful assistant designed to help users with data processing tasks.
You are capable of generating python code to perform various operations on data. Apart from python builtins, you have the following libraries avaiable in your run time:
- pandas
- numpy
- dask
In addition to these, you also have access to the datatune libarary, which provides functionality for processing data using LLMs.
..."""
Agent Implementation
The concrete Agent class in datatune/agent/agent.py implements the planner-executor logic.
Sources: datatune/agent/agent.py:1-100
| Attribute | Type | Purpose |
|---|---|---|
| llm | LLM | Language model used for planning and primitive operations |
| history | List[Dict] | Execution history for debugging and state tracking |
| verbose | bool | Enable debug-level logging when True |
| TEMPLATE | dict | Operation templates for code generation |
def __init__(self, llm: LLM, verbose: bool = False):
    self.llm = llm
    self.history: List[Dict[str, Any]] = []
    self.verbose = verbose
    if self.verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)
Plan Structure
The Agent uses a JSON-based plan format where each step represents a single transformation operation. Plans are generated by the LLM based on user goals and the current DataFrame schema.
Sources: datatune/agent/agent.py:100-150
Plan Step Schema
| Field | Type | Required | Description |
|---|---|---|---|
| type | string | Yes | Either "dask" or "primitive" |
| operation | string | Yes | Operation name within the step type |
| params | dict | No | Parameters for Dask templates |
| subprompt | string | No | LLM prompt for primitive operations |
| input_fields | list | No | Input column names for the operation |
| output_fields | list | No | Output column names (Map only) |
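A plan that follows this schema might look like the JSON below, together with a small structural check. Both are illustrative assumptions: the `params` keys, the column names `Profit`, `Revenue`, and `Country`, the `validate_plan` helper, and the rule that primitive steps must carry a subprompt are not taken from the source.

```python
import json
from typing import Dict, List

ALLOWED_TYPES = {"dask", "primitive"}


def validate_plan(plan: List[Dict]) -> List[str]:
    """Return a list of problems; an empty list means the plan looks well-formed."""
    problems = []
    for i, step in enumerate(plan, start=1):
        if step.get("type") not in ALLOWED_TYPES:
            problems.append(f"step {i}: type must be 'dask' or 'primitive'")
        if not step.get("operation"):
            problems.append(f"step {i}: missing operation")
        # Assumed rule: an LLM primitive is only useful with a prompt.
        if step.get("type") == "primitive" and not step.get("subprompt"):
            problems.append(f"step {i}: primitive steps need a subprompt")
    return problems


plan = json.loads("""
[
  {"type": "dask", "operation": "add_column",
   "params": {"new_column": "ProfitMargin", "expression": "df['Profit'] / df['Revenue']"}},
  {"type": "primitive", "operation": "Filter",
   "subprompt": "Keep only African organizations", "input_fields": ["Country"]}
]
""")
print(validate_plan(plan))
```

Checking the structure before execution gives the planner a precise message to correct, instead of a runtime traceback from a malformed step.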
Step Types
Dask Operations execute templated Python code in the Agent's runtime, using Dask's task graph:
| Operation | Description |
|---|---|
| add_column | Create a new column from an expression |
| apply_function | Apply an element-wise function to a column |
| rename_columns | Rename columns using a mapping |
| astype_column | Change a column's data type |
| group_by | Group data and aggregate |
Sources: datatune/agent/agent.py:50-100
Primitive Operations use the LLM for row-level transformations:
| Operation | Description | Use Case |
|---|---|---|
| Map | Create new columns via LLM transformation | Semantic extraction, classification, interpretation |
| Filter | Remove rows based on LLM criteria | Complex conditional filtering |
Sources: datatune/core/dask/map_dask.py:1-50, datatune/core/dask/filter_dask.py:1-50
Plan Execution Flow
graph TD
A[_execute_plan called] --> B[Set DataFrame in runtime]
B --> C[Initialize code_lines list]
C --> D{For each step in plan}
D -->|Has steps| E[Format step template]
E --> F[Log start message]
F --> G[Execute map_partitions with log_primitive]
G --> H[Increment step_num]
H --> I[Execute operation template]
I --> J[Log end message]
J --> D
D -->|No more steps| K[Call dt.finalize]
K --> L[Compute DataFrame]
L --> M[Return None, n_steps on success]
N[Exception during execution] --> O[Return error, failed_step]
Sources: datatune/agent/agent.py:150-200
Execution Methods
#### _execute_plan(plan: List[Dict])
Executes a complete multi-step plan sequentially.
def _execute_plan(self, plan: List[Dict]):
    """Execute a sequence of plan steps (Dask or primitive)."""
    self._set_df(self.df)
    self.runtime.update({"step_num": 0, "plan": plan})
    code_lines = []
    n_steps = len(plan)
    for i, step in enumerate(plan, start=1):
        # ... step formatting and execution ...
    # Final execution
    full_code = (
        "\n".join(code_lines)
        + "\n\n"
        + "df = dt.finalize(df)\n"
        "df = df.compute()"
    )
    self.runtime.execute(full_code)
    return None, n_steps
#### _execute_step(step: Dict)
Executes a single step with detailed error handling.
def _execute_step(self, step: Dict):
    try:
        if step["type"] == "dask":
            template = self.TEMPLATE["dask"][step["operation"]].format(**step["params"])
            self.runtime.execute(template + "\n_ = df.head()")
        elif step["type"] == "primitive":
            template = self.TEMPLATE["primitive"][step["operation"]].format(**step["params"])
            self.runtime.execute(template)
        else:
            raise ValueError(f"Unknown step type: {step['type']}")
    except Exception as e:
        error_msg = f"{type(e).__name__}: {str(e)}\n{traceback.format_exc()}"
        return error_msg
Sources: datatune/agent/agent.py:200-250
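The template lookup and `.format(**params)` call in `_execute_step` can be tried in isolation. The templates below are hypothetical examples written for this sketch; the real TEMPLATE dictionary may use different keys, parameter names, and generated code.

```python
from typing import Dict

# Hypothetical templates for illustration only.
TEMPLATE: Dict[str, Dict[str, str]] = {
    "dask": {
        "rename_columns": "df = df.rename(columns={mapping})",
        "astype_column": "df['{column}'] = df['{column}'].astype('{dtype}')",
    }
}


def render_step(step: Dict) -> str:
    """Look up the template for a step and fill in its params."""
    try:
        template = TEMPLATE[step["type"]][step["operation"]]
    except KeyError as exc:
        raise ValueError(f"Unknown step type or operation: {exc}") from exc
    return template.format(**step["params"])


code = render_step({
    "type": "dask",
    "operation": "astype_column",
    "params": {"column": "price", "dtype": "float64"},
})
print(code)
```

Because the rendered string is later executed as code, the set of templates effectively defines what the planner is allowed to do, which is narrower than letting the LLM emit arbitrary Python.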
LLM Integration
The Agent integrates with the LLM system through datatune/llm/llm.py to generate execution plans and power primitive operations.
Sources: datatune/llm/llm.py:1-50
Supported LLM Providers
| Provider | Class | Default Model |
|---|---|---|
| OpenAI | OpenAI | gpt-3.5-turbo |
| Ollama | Ollama | gemma3:4b |
| Azure | Azure | Configurable |
| vLLM | VLLM | Configurable |
class OpenAI(LLM):
    def __init__(
        self, model_name: str = "gpt-3.5-turbo", api_key: Optional[str] = None, **kwargs
    ):
        kwargs.update({"api_key": api_key})
        super().__init__(model_name=f"openai/{model_name}", **kwargs)
class Ollama(LLM):
    def __init__(
        self, model_name="gemma3:4b", api_base="http://localhost:11434", **kwargs
    ) -> None:
        super().__init__(
            model_name=f"ollama_chat/{model_name}", api_base=api_base, **kwargs
        )
Rate Limiting
The system includes model rate limits configuration to prevent API throttling:
| Model | TPM | RPM |
|---|---|---|
| gpt-3.5-turbo | 200,000 | 500 |
| gpt-4-turbo | 30,000 | 500 |
| gpt-4o | 30,000 | 500 |
Sources: datatune/llm/model_rate_limits.py:1-50
Prompt Generation
The Agent constructs prompts dynamically based on the current state and goal.
Sources: datatune/agent/agent.py:250-300
Prompt Components
| Method | Purpose |
|---|---|
| get_persona_prompt(goal) | Generates the system instructions for plan generation |
| get_schema_prompt(df) | Includes current DataFrame schema |
| get_error_prompt(error_msg, failed_step) | Context for retry after failures |
| get_full_prompt(df, goal) | Combines all components |
def get_schema_prompt(self, df: dd.DataFrame) -> str:
    schema_prompt: str = f"""The current schema of the dataframe df is as follows:
{df.dtypes.to_string()}
"""
    return schema_prompt
def get_error_prompt(self, error_msg: str, failed_step: Dict) -> str:
    error_prompt: str = f"""
The previous code execution failed with the following error:
Error: {error_msg}
Failed step:
{failed_step}
Provide the json plan with the corrected step. Make sure to:
1. Address the specific error mentioned above.
2. DO NOT include any explanations or comments.
"""
    return error_prompt
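The error prompt suggests a plan, run, and re-prompt cycle. A minimal sketch of such a loop is shown below, with stub functions in place of the LLM planner and the runtime. The loop structure, the `max_attempts` limit, and all names here are assumptions made for illustration; the only detail borrowed from the source is the `(error, step)` return shape of `_execute_plan`.

```python
from typing import Callable, Dict, List, Optional, Tuple

Plan = List[Dict]


def plan_with_retries(make_plan: Callable[[str], Plan],
                      run_plan: Callable[[Plan], Tuple[Optional[str], int]],
                      base_prompt: str, max_attempts: int = 3) -> Plan:
    """Ask for a plan, run it, and re-prompt with the error until it succeeds."""
    prompt = base_prompt
    for _ in range(max_attempts):
        plan = make_plan(prompt)
        error, step_index = run_plan(plan)
        if error is None:
            return plan
        failed_step = plan[step_index]
        # Append error context so the planner can correct the failing step.
        prompt = (f"{base_prompt}\nThe previous code execution failed with the "
                  f"following error:\nError: {error}\nFailed step:\n{failed_step}")
    raise RuntimeError(f"No working plan after {max_attempts} attempts")


def fake_planner(prompt: str) -> Plan:
    # Returns a corrected plan once the prompt carries error context.
    column = "price" if "failed" in prompt else "prise"
    return [{"type": "dask", "operation": "astype_column", "params": {"column": column}}]


def fake_runner(plan: Plan) -> Tuple[Optional[str], int]:
    if plan[0]["params"]["column"] != "price":
        return "KeyError: 'prise'", 0
    return None, len(plan)


final_plan = plan_with_retries(fake_planner, fake_runner, "Convert price to float")
print(final_plan[0]["params"]["column"])
```

Bounding the number of attempts matters in practice, since each retry costs another planning call to the LLM.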
Primitive Operations
Map Operation
The Map primitive uses the LLM to create new columns by applying transformations to each row.
Sources: datatune/core/dask/map_dask.py:1-80
# Example: Extract categories from description
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)
Output Format Requirements:
- Each response must be: index=<index>|{key1: value1, key2: value2, ...}<endofrow>
- Strings must use double quotes
- Missing values should be null
Filter Operation
The Filter primitive uses the LLM to evaluate row-level conditions.
Sources: datatune/core/dask/filter_dask.py:1-80
# Example: Keep only electronics products
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)
Decision Format:
- Each response must include __filter__: True (keep) or __filter__: False (remove)
Error Handling
The Agent implements comprehensive error handling at multiple levels:
| Level | Handler | Action |
|---|---|---|
| Step formatting | ValueError | Log and return error with step number |
| Template lookup | KeyError | Raise with available operation names |
| Runtime execution | Exception | Capture traceback, return formatted error |
except Exception as e:
    step_num = self.runtime["step_num"]
    logger.error(f"❌ Step {step_num}/{n_steps} failed with error: {e}")
    return str(e), step_num - 1
Sources: datatune/logger.py:1-50
Usage Examples
Basic Agent Usage
import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd
llm = OpenAI(model_name="gpt-3.5-turbo")
agent = dt.Agent(llm)
df = dd.read_csv("products.csv")
result = agent.do("Add ProfitMargin column and keep only African organizations", df)
result.compute().to_csv("african_products.csv")
Verbose Mode for Debugging
agent = dt.Agent(llm, verbose=True)
result = agent.do("Complex transformation task", df)
With verbose=True, the Agent sets logger to DEBUG level, outputting detailed execution information including step progress and intermediate states.
Chaining Operations
The Agent automatically chains multiple operations when a goal contains multiple tasks:
TASK: 1. create column a based on column x
2. create column b based on column y
Both tasks are combined into a single Map primitive step rather than planned as separate steps.
Sources: datatune/agent/agent.py:1-50
Doramagic Pitfall Log
Source-linked risks stay visible on the manual page so the preview does not read like a recommendation.
Doramagic extracted 7 source-linked risk signals. Review them before installing or handing real data to the project.
1. Capability assumption: README/documentation is current enough for a first validation pass.
- Severity: medium
- Finding: README/documentation is current enough for a first validation pass.
- User impact: The project should not be treated as fully validated until this signal is reviewed.
- Recommended check: Open the linked source, confirm whether it still applies to the current version, and keep the first run isolated.
- Evidence: capability.assumptions | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | README/documentation is current enough for a first validation pass.
2. Maintenance risk: Maintainer activity is unknown
- Severity: medium
- Finding: Maintenance risk is backed by a source signal: Maintainer activity is unknown. Treat it as a review item until the current version is checked.
- User impact: Users cannot judge support quality until recent activity, releases, and issue response are checked.
- Recommended check: Open the linked source, confirm whether it still applies to the current version, and keep the first run isolated.
- Evidence: evidence.maintainer_signals | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | last_activity_observed missing
3. Security or permission risk: no_demo
- Severity: medium
- Finding: no_demo
- User impact: The project may affect permissions, credentials, data exposure, or host boundaries.
- Recommended check: Open the linked source, confirm whether it still applies to the current version, and keep the first run isolated.
- Evidence: downstream_validation.risk_items | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | no_demo; severity=medium
4. Security or permission risk: No sandbox install has been executed yet; downstream must verify before user use.
- Severity: medium
- Finding: No sandbox install has been executed yet; downstream must verify before user use.
- User impact: The project may affect permissions, credentials, data exposure, or host boundaries.
- Recommended check: Open the linked source, confirm whether it still applies to the current version, and keep the first run isolated.
- Evidence: risks.safety_notes | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | No sandbox install has been executed yet; downstream must verify before user use.
5. Security or permission risk: no_demo
- Severity: medium
- Finding: no_demo
- User impact: The project may affect permissions, credentials, data exposure, or host boundaries.
- Recommended check: Open the linked source, confirm whether it still applies to the current version, and keep the first run isolated.
- Evidence: risks.scoring_risks | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | no_demo; severity=medium
6. Maintenance risk: issue_or_pr_quality=unknown
- Severity: low
- Finding: issue_or_pr_quality=unknown.
- User impact: Users cannot judge support quality until recent activity, releases, and issue response are checked.
- Recommended check: Open the linked source, confirm whether it still applies to the current version, and keep the first run isolated.
- Evidence: evidence.maintainer_signals | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | issue_or_pr_quality=unknown
7. Maintenance risk: release_recency=unknown
- Severity: low
- Finding: release_recency=unknown.
- User impact: Users cannot judge support quality until recent activity, releases, and issue response are checked.
- Recommended check: Open the linked source, confirm whether it still applies to the current version, and keep the first run isolated.
- Evidence: evidence.maintainer_signals | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | release_recency=unknown
Source: Doramagic discovery, validation, and Project Pack records
Community Discussion Evidence
These external discussion links are review inputs, not standalone proof that the project is production-ready.
Open the linked issues or discussions before treating the pack as ready for your environment.
Doramagic exposes project-level community discussion separately from official documentation. Review these links before using datatune with real data or production workflows.
- Dask's parallel computation of each partition will break true_batch_comp - github / github_issue
- Requirements Document: Planning Agent for Datatune - github / github_issue
- Requirements Doc: Unstructured data handling - github / github_issue
- Semantic Deduplication - github / github_issue
- We built a tool to connect LLMs and Agents to the entire user data and . - reddit / searxng_indexed
- README/documentation is current enough for a first validation pass. - GitHub / issue
Source: Project Pack community evidence and pitfall evidence