Doramagic Project Pack · Human Manual

datatune

Datatune is a data transformation framework that leverages Large Language Models (LLMs) to perform semantic operations on distributed data. The system provides a declarative interface for ...

Introduction to Datatune

Related topics: Getting Started, System Architecture

Datatune is an AI-powered data transformation library that enables natural language-driven data processing operations on large-scale datasets. The library leverages Large Language Models (LLMs) to interpret human-readable instructions and automatically generate the appropriate data transformations, making complex data manipulation accessible to users without deep technical expertise.

Datatune supports multiple backend computational frameworks including Dask for distributed computing and Ibis for SQL-like operations across various databases. The system provides both explicit transformation primitives (Map, Filter) and an intelligent Agent that can automatically determine and chain multiple transformation steps from a single natural language prompt.

High-Level Architecture

graph TD
    User[User Request] --> Agent[Datatune Agent]
    User --> Primitives[Explicit Primitives]
    
    Agent --> Planner[Planning Module]
    Planner --> Steps[Transformation Steps]
    Steps --> DaskOps[Dask Operations]
    Steps --> PrimOps[Primitive Operations]
    Steps --> CodeGen[Code Generation]
    
    Primitives --> MapOp[Map Operation]
    Primitives --> FilterOp[Filter Operation]
    
    MapOp --> LLM[LLM Interface]
    FilterOp --> LLM
    
    LLM --> OpenAI[OpenAI Provider]
    LLM --> Ollama[Ollama Provider]
    LLM --> VLLM[VLLM Provider]
    LLM --> Azure[Azure Provider]
    
    DaskOps --> Dask[Dask DataFrame]
    PrimOps --> Dask
    
    MapOp --> Ibis[Ibis Backend]
    FilterOp --> Ibis
    
    Ibis --> DuckDB[DuckDB]
    Ibis --> PostgreSQL[PostgreSQL]
    Ibis --> BigQuery[BigQuery]

The architecture follows a layered approach where the LLM abstraction layer provides a unified interface for multiple AI providers, while the transformation layer handles both explicit user-defined operations and AI-generated execution plans.

Core Components

LLM Interface Layer

The LLM module (datatune/llm/llm.py) provides a unified interface for interacting with various language model providers. The base LLM class handles rate limiting, token counting, and request batching, while provider-specific subclasses implement the actual API calls.

#### Supported LLM Providers

| Provider | Class | Default Model | Description |
| --- | --- | --- | --- |
| OpenAI | OpenAI | gpt-3.5-turbo | Standard OpenAI API integration |
| Ollama | Ollama | gemma3:4b | Local LLM server support |
| VLLM | VLLM | User-specified | High-performance vLLM inference |
| Azure | Azure | User-specified | Azure OpenAI Service |

The base LLM class automatically extracts rate limit configurations from model_rate_limits.py based on the model name. If a model is not found in the predefined limits, it defaults to GPT-3.5-turbo limits with a warning message.

from datatune.llm.llm import OpenAI, Ollama, Azure

# OpenAI
llm = OpenAI(model_name="gpt-3.5-turbo")

# Ollama (local)
llm = Ollama()

# Azure
llm = Azure(model_name="gpt-3.5-turbo", api_key=api_key)

Sources: README.md:40-56

Rate Limiting Configuration

The model_rate_limits.py module defines tokens-per-minute (TPM) and requests-per-minute (RPM) limits for supported models:

| Model Family | TPM | RPM |
| --- | --- | --- |
| GPT-3.5-Turbo | 200,000 | 500 |
| GPT-4 | 10,000 - 30,000 | 500 |
| GPT-4.1 | 30,000 - 200,000 | 500 |
| GPT-4.5-Preview | 125,000 | 1000 |

Sources: datatune/llm/model_rate_limits.py:1-120
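
The fallback for unknown models described earlier can be sketched as follows. This is a minimal illustration rather than the library's code: `resolve_rate_limits`, `RATE_LIMITS`, and `DEFAULT_MODEL` are hypothetical names, and the handling of a provider prefix is an assumption. Only the two limit entries are taken from values quoted in this manual.

```python
import warnings

# Hypothetical table; the two entries mirror limits quoted in this manual.
RATE_LIMITS = {
    "gpt-3.5-turbo": {"tpm": 200_000, "rpm": 500},
    "gpt-4": {"tpm": 10_000, "rpm": 500},
}
DEFAULT_MODEL = "gpt-3.5-turbo"


def resolve_rate_limits(model_name: str) -> dict:
    """Return TPM/RPM limits, falling back to the default model's limits."""
    # Assumption: drop a provider prefix such as "openai/" if one is present.
    bare_name = model_name.split("/", 1)[-1]
    if bare_name in RATE_LIMITS:
        return RATE_LIMITS[bare_name]
    warnings.warn(f"No rate limits for {bare_name!r}; using {DEFAULT_MODEL} limits.")
    return RATE_LIMITS[DEFAULT_MODEL]
```

Calling `resolve_rate_limits("openai/gpt-4")` returns the GPT-4 entry, while an unlisted name emits a warning and returns the gpt-3.5-turbo entry.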

Transformation Primitives

Datatune provides two fundamental transformation primitives that work with natural language prompts:

Map Operation

The Map primitive creates new columns by applying LLM-driven transformations to existing data. It takes input from specified columns and generates output fields based on the natural language prompt.

import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

The Map operation constructs prompts with the following structure:

Your task is to MAP/CREATE new fields based on the following input record(s):
[input data]

Your response MUST be the entire input record as a valid Python dictionary in the format
'index=<row_index>|{key1: value1, key2: value2, ...}' with added keys of expected new fields.

Sources: datatune/core/dask/map_dask.py:1-50, datatune/core/ibis/map_ibis.py:1-60
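
To make the `index=<row_index>|{...}` response format concrete, here is a small sketch of how one such line could be split back into a row index and a record. `parse_indexed_record` is a hypothetical helper written for this manual, not a Datatune function, and the sample record is invented.

```python
import ast
from typing import Any, Dict, Tuple


def parse_indexed_record(line: str) -> Tuple[int, Dict[str, Any]]:
    """Split 'index=<row_index>|{...}' into (row_index, record dict)."""
    # Split on the first "|" only, so "|" inside values is left untouched.
    header, _, payload = line.strip().partition("|")
    if not header.startswith("index=") or not payload:
        raise ValueError(f"Unexpected response format: {line!r}")
    row_index = int(header[len("index="):])
    # literal_eval only evaluates Python literals, so it is safer than eval().
    record = ast.literal_eval(payload)
    if not isinstance(record, dict):
        raise ValueError("Payload is not a dictionary")
    return row_index, record


row_index, record = parse_indexed_record(
    "index=3|{'Name': 'USB-C Hub', 'Category': 'Electronics'}"
)
```

Carrying the row index in the response lets results be matched back to their source rows even when several rows are sent in one request.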

Filter Operation

The Filter primitive removes rows that do not meet specified conditions interpreted by the LLM:

filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)

The Filter operation adds a __filter__ key to each record, with True indicating the row should be kept and False indicating removal.

FILTERING CRITERIA:
[prompt]

DECISION: Your response MUST be the entire input record as a Python dictionary in the format:
index=<row_index>|{key1: value1, key2: value2, ...} with added key called '__filter__' 
with value either True to KEEP the record or False to REMOVE it.

Sources: datatune/core/dask/filter_dask.py:1-45, datatune/core/ibis/filter_ibis.py:1-50
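
The effect of the `__filter__` key can be illustrated on plain Python dictionaries. This is a sketch under stated assumptions, not the Dask or Ibis implementation: `apply_filter_flags` is a hypothetical helper, and treating a missing flag as "remove" is a choice made here for illustration.

```python
from typing import Any, Dict, List

FILTER_KEY = "__filter__"


def apply_filter_flags(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep records flagged True and drop the helper key from the output."""
    kept = []
    for record in records:
        # Assumption of this sketch: a missing or non-True flag means "remove".
        if record.get(FILTER_KEY) is True:
            kept.append({k: v for k, v in record.items() if k != FILTER_KEY})
    return kept


rows = [
    {"Name": "Laptop", "__filter__": True},
    {"Name": "Desk lamp", "__filter__": False},
    {"Name": "Headphones"},  # malformed response: no flag
]
kept_rows = apply_filter_flags(rows)
```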

Intelligent Agent

The Agent class provides a higher-level interface that automatically determines which operations to perform based on natural language requests. The agent can chain multiple Map, Filter, and Python code operations into a coherent execution plan.

import datatune as dt
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo")
agent = dt.Agent(llm)

df = agent.do("Add ProfitMargin column and keep only African organizations", df)
result = dt.finalize(df)

Agent System Prompt

The Agent uses a structured system prompt that defines available operations and expected response formats:

system_prompt: str = """You are Datatune Agent, a powerful assistant designed to help users with data processing tasks.
You are capable of generating python code to perform various operations on data. Apart from python builtins, you have the following libraries avaiable in your run time:
- pandas
- numpy
- dask

In addition to these, you also have access to the datatune libarary, which provides functionality for processing data using LLMs.

Plan Execution Flow

graph TD
    Request[Natural Language Request] --> Parse[Parse Request]
    Parse --> Plan[Generate Execution Plan]
    Plan --> Step1[Step 1: Dask/Primitive]
    Step1 --> Step2[Step 2: Dask/Primitive]
    Step2 --> StepN[Step N: ...]
    StepN --> Execute[Execute Full Plan]
    Execute --> Result[Finalized DataFrame]
    
    Plan --> StepTypes{Step Type}
    StepTypes -->|"dask"| DaskTemplate[Dask Template]
    StepTypes -->|"primitive"| PrimTemplate[Primitive Template]

The agent generates plans as JSON arrays with the following structure:

[
  {
    "type": "dask|primitive",
    "operation": "operation_name",
    "params": {},
    "subprompt": "LLM prompt for primitive operations",
    "input_fields": ["column1"],
    "output_fields": ["new_column"]
  }
]

Sources: datatune/agent/__init__.py:1-45, datatune/agent/agent.py:1-100
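
A plan in this shape can be checked before execution. The sketch below is an illustration only: `load_plan` and `ALLOWED_TYPES` are hypothetical names, the validation rules are assumptions, and the column names in the sample plan are invented.

```python
import json
from typing import Any, Dict, List

ALLOWED_TYPES = {"dask", "primitive"}


def load_plan(plan_json: str) -> List[Dict[str, Any]]:
    """Parse a JSON plan and check each step has a known type and an operation."""
    steps = json.loads(plan_json)
    if not isinstance(steps, list):
        raise ValueError("A plan must be a JSON array of steps")
    for position, step in enumerate(steps, start=1):
        if step.get("type") not in ALLOWED_TYPES:
            raise ValueError(f"Step {position}: unknown type {step.get('type')!r}")
        if not step.get("operation"):
            raise ValueError(f"Step {position}: missing operation")
    return steps


# Sample plan with invented column names (Country, Profit, Revenue).
plan = load_plan("""
[
  {"type": "primitive", "operation": "filter",
   "subprompt": "Keep only African organizations",
   "input_fields": ["Country"]},
  {"type": "dask", "operation": "add_column",
   "params": {"new_column": "ProfitMargin", "expression": "x['Profit'] / x['Revenue']"}}
]
""")
```

Rejecting unknown step types early gives a clearer error than failing partway through execution.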

Data Source Backends

Datatune supports multiple computational backends for processing data at scale:

Dask Backend

The Dask backend provides distributed computing capabilities for pandas-like DataFrames:

import dask.dataframe as dd
df = dd.read_csv("data.csv")

Dask operations include:

  • add_column: Create new columns from expressions
  • apply_function: Apply element-wise functions
  • rename_columns: Rename using mappings
  • astype_column: Change data types

Ibis Backend

The Ibis backend enables SQL-like operations across various database backends:

import ibis
con = ibis.duckdb.connect("data.duckdb")
table = con.table("my_table")

Supported Ibis backends include DuckDB, PostgreSQL, and BigQuery.

Sources: README.md:60-75

Deduplication System

Datatune includes a sophisticated deduplication system that uses embedding-based similarity detection:

graph LR
    Input[Input Data] --> Embed[Embedding Generation]
    Embed --> Index[FAISS Index Build]
    Index --> Cluster[Similarity Clustering]
    Cluster --> Dedup[Deduplicated Output]

Key parameters:

| Parameter | Default | Description |
| --- | --- | --- |
| embedding_model | text-embedding-3-small | Model for generating embeddings |
| sim_threshold | 0.90 | Similarity threshold for clustering |
| top_k | 50 | Top-K neighbors for clustering |
| hnsw_m | 32 | HNSW index construction parameter |
| ef_search | 64 | HNSW search parameter |

The system uses FAISS HNSW (Hierarchical Navigable Small World) indexes for efficient similarity search at scale.

Sources: datatune/core/deduplication.py:1-100
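
The role of `sim_threshold` can be seen in a toy example. Note the substitutions: this sketch uses brute-force cosine similarity in pure Python instead of a FAISS HNSW index, and a simple greedy assignment that may differ from Datatune's clustering. The function names and vectors are invented for illustration.

```python
import math
from typing import Dict, List, Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def cluster_duplicates(
    embeddings: List[Sequence[float]], sim_threshold: float = 0.90
) -> List[Dict]:
    """Greedy clustering: each row joins the first canonical row it is similar to."""
    clusters: List[Dict] = []
    for row_id, vector in enumerate(embeddings):
        for cluster in clusters:
            canonical = embeddings[cluster["canonical_id"]]
            if cosine_similarity(vector, canonical) >= sim_threshold:
                cluster["duplicate_ids"].append(row_id)
                break
        else:
            # No existing canonical row is similar enough: start a new cluster.
            clusters.append({"canonical_id": row_id, "duplicate_ids": []})
    return clusters


vectors = [(1.0, 0.0), (0.99, 0.05), (0.0, 1.0)]
clusters = cluster_duplicates(vectors)
```

Here the first two vectors point in nearly the same direction and end up in one cluster, while the third is orthogonal and becomes its own canonical row. An approximate index such as HNSW serves the same purpose without comparing every pair.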

Workflow Summary

graph LR
    A[Load Data] --> B[Choose Interface]
    B --> C[Agent Mode]
    B --> D[Primitives Mode]
    C --> E[Natural Language]
    D --> F[Explicit Operations]
    E --> G[Auto Plan]
    F --> H[Manual Plan]
    G --> I[Execute]
    H --> I
    I --> J[LLM Processing]
    J --> K[Finalize]
    K --> L[Output]

  1. Load Data: Import data into Dask or Ibis DataFrames
  2. Choose Interface: Use Agent for automatic planning or Primitives for explicit control
  3. Define Operations: Write natural language prompts or configure operations
  4. Execute: Process data with LLM integration
  5. Finalize: Convert lazy operations to computed results

Sources: README.md:1-50

Installation and Quick Start

pip install datatune

The complete workflow involves initializing an LLM, loading data, applying transformations, and finalizing results:

import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

# Apply transformations
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)

# Save results
result = dt.finalize(filtered)
result.compute().to_csv("electronics_products.csv")

Sources: README.md:20-55


Getting Started

Related topics: Introduction to Datatune

Datatune is a data transformation library that leverages Large Language Models (LLMs) to perform complex data operations using natural language prompts. It enables users to transform, filter, map, and process data without writing extensive custom code.

Installation

Install datatune using pip:

pip install datatune

Sources: README.md:1

Core Concepts

Datatune provides two primary interfaces for data transformation:

| Interface | Description | Use Case |
| --- | --- | --- |
| Primitives | Direct operations (Map, Filter) | Single, focused transformations |
| Agent | AI-powered automatic planning | Complex multi-step workflows |

Sources: README.md:20-30

Quick Start

Basic Setup

import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

Sources: README.md:26-32

Map Operation

Use dt.map() to extract or derive new columns from existing data:

mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

Sources: README.md:34-41

Filter Operation

Use dt.filter() to keep only rows matching criteria:

filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)

Sources: README.md:44-49

Finalize and Save

result = dt.finalize(filtered)
result.compute().to_csv("electronics_products.csv")

Sources: README.md:51-53

Supported LLMs

Datatune supports multiple LLM providers through a unified interface.

OpenAI

from datatune.llm.llm import OpenAI
llm = OpenAI(model_name="gpt-3.5-turbo")

Sources: README.md:58-61

Ollama (Local Models)

from datatune.llm.llm import Ollama
llm = Ollama()

Default configuration:

  • Model: gemma3:4b
  • API Base: http://localhost:11434

Sources: README.md:63-65

Azure OpenAI

from datatune.llm.llm import Azure
llm = Azure(model_name="gpt-3.5-turbo", api_key=api_key)

Sources: README.md:67-70

Additional Providers

| Provider | Class | Notes |
| --- | --- | --- |
| Mistral | Mistral | Requires mistral/ prefix in model name |
| Huggingface | Huggingface | Requires huggingface/ prefix |
| VLLM | VLLM | Auto-detects max token limit |

Sources: datatune/llm/llm.py:1-150

Agent Mode

The Agent provides an intelligent, automated approach to data transformation. It automatically determines which operations to use and chains multiple transformations from a single natural language prompt.

Basic Agent Usage

import datatune as dt
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo")
agent = dt.Agent(llm)

df = agent.do("Add ProfitMargin column and keep only African organizations", df)
result = dt.finalize(df)

Sources: README.md:74-82

Agent Capabilities

The agent automatically:

  • Determines which operations to use (map, filter, etc.)
  • Chains multiple transformations
  • Handles complex multi-step tasks from a single prompt
  • Generates and executes Python code, along with row-level primitives (Map, Filter, etc.), if required

Sources: README.md:84-88

Agent System Prompt

The agent has access to:

  • Python builtins
  • pandas
  • numpy
  • dask
  • datatune library primitives (Map, Filter)

Sources: datatune/agent/__init__.py:1-35

Data Sources

Datatune supports multiple data processing backends.

Dask DataFrames

import dask.dataframe as dd
df = dd.read_csv("data.csv")

Ibis Backend

Ibis provides connectivity to multiple databases:

import ibis
con = ibis.duckdb.connect("data.duckdb")
table = con.table("my_table")

Sources: README.md:93-102

Architecture Overview

graph TD
    A[User Prompt] --> B[Datatune API]
    B --> C{Operation Type}
    C -->|Map| D[map_dask.py / map_ibis.py]
    C -->|Filter| E[filter_dask.py / filter_ibis.py]
    C -->|Agent| F[Agent Planning]
    D --> G[LLM Batch Processing]
    E --> G
    F -->|Dask Ops| H[Dask Templates]
    F -->|Primitives| I[Map / Filter Primitives]
    G --> J[Output DataFrame]
    H --> J
    I --> J

Rate Limits Configuration

When using LLMs, datatune respects model-specific rate limits:

| Parameter | Description |
| --- | --- |
| tpm | Tokens per minute |
| rpm | Requests per minute |

Sources: datatune/llm/model_rate_limits.py:1-50

Default rate limits for unknown models fall back to gpt-3.5-turbo settings with a warning logged.

Sources: datatune/llm/llm.py:20-35

Primitive Operations

Map Operation

Creates new columns by applying LLM-based transformations to input columns.

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| prompt | str | Natural language description of the transformation |
| input_fields | list | Input column names |
| output_fields | list | Names for new columns |

Filter Operation

Removes rows that don't meet the specified criteria.

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| prompt | str | Natural language filtering criteria |
| input_fields | list | Columns to evaluate |

Next Steps

Sources: README.md:105-113


System Architecture

Related topics: Dask Backend, Ibis Backend, LLM Integration

Overview

Datatune is a data transformation framework that leverages Large Language Models (LLMs) to perform semantic operations on distributed data. The system provides a declarative interface for common data manipulation tasks like mapping, filtering, and deduplication while abstracting the complexity of distributed computing and LLM interaction.

The architecture follows a modular design with three primary layers:

| Layer | Purpose | Key Components |
| --- | --- | --- |
| LLM Layer | Model abstraction and rate limiting | LLM, OpenAI, Ollama, VLLM, Azure |
| Core Operations Layer | Data transformation primitives | Map, Filter, Reduce, Deduplication |
| Agent Layer | High-level orchestration and planning | Agent, Plan execution engine |

Sources: datatune/llm/llm.py:1-100

High-Level Architecture Diagram

graph TB
    subgraph "Client Layer"
        User[User Code]
    end
    
    subgraph "LLM Layer"
        LLM[Base LLM Class]
        OpenAI[OpenAI Provider]
        Ollama[Ollama Provider]
        VLLM[VLLM Provider]
        Azure[Azure Provider]
        RateLimits[model_rate_limits]
    end
    
    subgraph "Core Operations Layer"
        Map[Map Operation]
        Filter[Filter Operation]
        Reduce[Reduce Operation]
        Dedupe[Deduplication]
    end
    
    subgraph "Data Backend Layer"
        Dask[Dask DataFrame]
        Ibis["Ibis (DuckDB, PostgreSQL, BigQuery)"]
    end
    
    subgraph "Agent Layer"
        Agent[Agent Orchestrator]
        Planner[Plan Generator]
        Executor[Plan Executor]
    end
    
    User --> Agent
    User --> Map
    User --> Filter
    
    Agent --> Planner
    Agent --> Executor
    Agent --> LLM
    
    Map --> LLM
    Filter --> LLM
    
    Map --> Dask
    Map --> Ibis
    Filter --> Dask
    Filter --> Ibis
    
    LLM --> RateLimits

LLM Layer

The LLM layer provides a unified interface for interacting with various LLM providers while handling rate limiting and token counting.

Class Hierarchy

graph TD
    LLM[LLM Base Class] --> Ollama[Ollama]
    LLM --> OpenAI[OpenAI]
    LLM --> VLLM[VLLM]
    LLM --> Azure[Azure]

Base LLM Class

The LLM class serves as the foundation for all LLM providers. It handles model naming, rate limit configuration, and request batching.

| Parameter | Type | Description |
| --- | --- | --- |
| model_name | str | Model identifier in format provider/model (e.g., openai/gpt-3.5-turbo) |
| tpm | int | Tokens per minute limit |
| rpm | int | Requests per minute limit |
| api_key | str | API authentication key |
| api_base | str | Base URL for API endpoint |

Sources: datatune/llm/llm.py:70-95

Provider Implementations

OpenAI Provider

class OpenAI(LLM):
    def __init__(
        self, model_name: str = "gpt-3.5-turbo", api_key: Optional[str] = None, **kwargs
    ):
        kwargs.update({"api_key": api_key})
        super().__init__(model_name=f"openai/{model_name}", **kwargs)

Sources: datatune/llm/llm.py:96-103

Ollama Provider (Local)

class Ollama(LLM):
    def __init__(
        self, model_name="gemma3:4b", api_base="http://localhost:11434", **kwargs
    ) -> None:
        super().__init__(
            model_name=f"ollama_chat/{model_name}", api_base=api_base, **kwargs
        )

Sources: datatune/llm/llm.py:81-87

VLLM Provider

The VLLM provider automatically retrieves the max_model_len from the API and configures token limits accordingly.

class VLLM(LLM):
    def __init__(self, model_name: str, api_base: str = "http://localhost:8000/v1", **kwargs):
        import httpx
        resp = httpx.get(f"{api_base}/models")
        max_model_len = resp.json()["data"][0]["max_model_len"]
        kwargs.update({"api_base": api_base, "api_key": "dummy", "max_tokens": max_model_len})
        super().__init__(model_name=f"openai/{model_name}", **kwargs)

Sources: datatune/llm/llm.py:105-118

Rate Limiting

The system maintains predefined rate limits for various models in model_rate_limits.py:

| Model Family | TPM | RPM |
| --- | --- | --- |
| gpt-3.5-turbo | 200,000 | 500 |
| gpt-4 | 10,000 | 500 |
| gpt-4-turbo | 30,000 | 500 |
| gpt-4.1-mini/nano | 200,000 | 500 |

Sources: datatune/llm/model_rate_limits.py:1-50

Core Operations Layer

The core operations layer provides primitive transformations that can be applied to distributed data.

Map Operation

The Map operation creates new columns by applying LLM-based transformations to existing data.

graph LR
    A[Input DataFrame] --> B[Serialize Rows]
    B --> C[Batch to LLM]
    C --> D[Parse Responses]
    D --> E[Extract New Columns]
    E --> F[Output DataFrame]

Input Format: Rows are serialized to Python dictionaries with the following structure:

index=<row_index>|{key1: value1, key2: value2, ...}

Output Format: LLM returns dictionaries with original keys plus new field keys:

index=<row_index>|{key1: value1, key2: value2, new_field: new_value}

Sources: datatune/core/dask/map_dask.py:1-50

Key Features:

  • Handles duplicate rows via canonical mapping
  • Supports batched API calls for efficiency
  • Falls back to canonical row values for duplicates

Filter Operation

The Filter operation removes rows that do not meet specified criteria using LLM-based evaluation.

graph LR
    A[Input DataFrame] --> B[Serialize Rows]
    B --> C[LLM Decision]
    C --> D{Keep Row?}
    D -->|Yes| E[Include in Output]
    D -->|No| F[Exclude Row]

Decision Format: Each row receives a __filter__ key:

index=<row_index>|{..., __filter__: True/False}

Sources: datatune/core/dask/filter_dask.py:1-50

Deduplication

The deduplication system uses embeddings and FAISS for efficient similarity search:

| Component | Description |
| --- | --- |
| embedding_model | Model for generating text embeddings |
| sim_threshold | Minimum similarity score (default: 0.90) |
| top_k | Number of neighbors to search |
| hnsw_m | HNSW index parameter for m |
| ef_search | HNSW search parameter |

Workflow:

  1. Embed column values in batches of 256
  2. Build FAISS HNSW index per partition
  3. Search for similar records above threshold
  4. Cluster duplicates with canonical ID

Sources: datatune/core/deduplication.py:1-80
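
The first workflow step, embedding values in fixed-size batches, amounts to slicing the column into chunks. The helper below is a hypothetical sketch: only the batch size of 256 comes from the text above, and the embedding call itself is omitted.

```python
from typing import Iterator, List, Sequence


def iter_batches(values: Sequence[str], batch_size: int = 256) -> Iterator[List[str]]:
    """Yield consecutive slices of at most batch_size values."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(values), batch_size):
        yield list(values[start:start + batch_size])


# 600 invented values split into batches of at most 256.
batch_sizes = [len(batch) for batch in iter_batches([f"row {i}" for i in range(600)])]
```

With 600 values the batches hold 256, 256, and 88 items, so the final batch is simply shorter rather than padded.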

Agent Layer

The Agent layer provides high-level orchestration, automatically determining which operations to use and generating execution plans.

graph TD
    A[User Goal] --> B[Plan Generator]
    B --> C[JSON Plan]
    C --> D[Plan Executor]
    D --> E[Step 1: Operation]
    D --> F[Step 2: Operation]
    D --> G[Step N: Operation]
    E --> H[Finalize]
    F --> H
    G --> H
    H --> I[Computed Result]

Plan Structure

Plans are generated as JSON arrays of steps:

[
  {
    "type": "primitive",
    "operation": "map",
    "params": {
      "subprompt": "Extract category from industry",
      "input_fields": ["Industry"],
      "output_fields": ["Category"]
    }
  },
  {
    "type": "dask",
    "operation": "add_column",
    "params": {
      "new_column": "Year",
      "expression": "df['Date'].dt.year"
    }
  }
]

Sources: datatune/agent/agent.py:80-120

Step Types

| Type | Description | Operations |
| --- | --- | --- |
| primitive | LLM-based transformations | map, filter |
| dask | Native Dask operations | add_column, group_by, apply_function, rename_columns, astype_column |

Template System

Templates define how operations are translated to executable code:

TEMPLATE = {
    "primitive": {
        "Map": "df = dt.Map(...)(self.llm, df)",
        "Filter": "df = dt.Filter(...)(self.llm, df)"
    },
    "dask": {
        "add_column": "df = df.assign({new_column}=lambda x: {expression})",
        "group_by_agg": "df = df.groupby({group_columns}).agg({aggregations})"
    }
}

Sources: datatune/agent/agent.py:40-70
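
As a rough illustration of how such a template could become a line of code, the sketch below fills the `add_column` template with `str.format`. The `render_step` helper is hypothetical, and whether the agent uses `str.format` internally is an assumption. The parameter values are adapted from the plan example above.

```python
# Template string copied from the "dask" group shown in this section.
ADD_COLUMN_TEMPLATE = "df = df.assign({new_column}=lambda x: {expression})"


def render_step(template: str, params: dict) -> str:
    """Fill a code template with step parameters using str.format."""
    return template.format(**params)


code = render_step(
    ADD_COLUMN_TEMPLATE,
    {"new_column": "Year", "expression": "x['Date'].dt.year"},
)
```

The rendered string is `df = df.assign(Year=lambda x: x['Date'].dt.year)`. Because the expression text is produced by a model, generated code of this kind should be run only in an environment where that is acceptable.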

Data Backend Support

Dask Integration

Datatune primarily uses Dask for distributed computing. Operations are executed partition-by-partition with lazy evaluation:

# Example: Map operation on Dask DataFrame
df = dd.read_csv("data.csv")
mapped = dt.map(
    prompt="Extract sentiment from text",
    output_fields=["sentiment"],
    input_fields=["text"]
)(llm, df)

# Finalize and compute
result = dt.finalize(mapped).compute()

Ibis Integration

Ibis provides support for multiple backends (DuckDB, PostgreSQL, BigQuery):

# DuckDB example
import ibis
con = ibis.duckdb.connect("data.duckdb")
table = con.table("my_table")

# Apply map operation
mapped_table = dt.map(
    prompt="Extract category",
    output_fields=["category"],
    input_fields=["description"]
)(llm, table)

Sources: datatune/core/ibis/map_ibis.py:1-50

Request Batching

The LLM layer implements intelligent batching to optimize API usage:

graph TD
    A[Input Rows] --> B[Calculate Token Budget]
    B --> C{Within Limits?}
    C -->|Yes| D[Batch All]
    C -->|No| E[Split into Batches]
    E --> F[Execute Batch 1]
    E --> G[Execute Batch 2]
    F --> H[Merge Results]
    G --> H
    D --> H

Token Calculation:

  • Prefix/suffix tokens are calculated via token_counter
  • nrows_per_api_call determines optimal batch sizes
  • Rate limits (TPM/RPM) are enforced per model

Sources: datatune/llm/llm.py:30-70
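
The batching decision in the diagram can be sketched as a greedy packing loop. This is not Datatune's implementation: `batch_by_token_budget` is a hypothetical helper, and `approx_tokens` counts whitespace-separated words as a crude stand-in for a real tokenizer.

```python
from typing import Callable, List


def batch_by_token_budget(
    rows: List[str], max_tokens: int, count_tokens: Callable[[str], int]
) -> List[List[str]]:
    """Greedily pack rows into batches whose token total stays within max_tokens."""
    batches: List[List[str]] = []
    current: List[str] = []
    used = 0
    for row in rows:
        cost = count_tokens(row)
        # Start a new batch when adding this row would exceed the budget.
        if current and used + cost > max_tokens:
            batches.append(current)
            current, used = [], 0
        current.append(row)  # an oversized row still gets a batch of its own
        used += cost
    if current:
        batches.append(current)
    return batches


def approx_tokens(text: str) -> int:
    return len(text.split())  # crude stand-in for a real tokenizer


batches = batch_by_token_budget(["a b c", "d e", "f g h i", "j"], 5, approx_tokens)
```

With a budget of 5 tokens, the four rows are packed into two batches. A real budget would also reserve room for the prompt prefix and suffix.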

Error Handling

The system implements robust error handling at multiple levels:

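| Layer | Error Handling |
| --- | --- |
| Agent | Catches execution errors, returns step number and error message |
| Core Operations | Gracefully handles malformed LLM responses with fallback to empty dict |
| LLM | Retries with exponential backoff on transient failures |

import ast
from typing import Optional, Union


def parse_filter_output(output: Union[str, Exception], err: bool = True) -> Optional[bool]:
    try:
        # Parse the LLM response as a Python literal (expected: a dict).
        d = ast.literal_eval(str(output).strip())
        if d:
            # The flag is read from the last key of the parsed dict.
            last_key = list(d.keys())[-1]
            flag = d[last_key]
            return flag
    except Exception:
        # Malformed responses are reported as None.
        return None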

Sources: datatune/core/dask/filter_dask.py:50-65
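
The retry behaviour listed for the LLM layer follows a common pattern, sketched generically below. This is not Datatune's retry code: `retry_with_backoff`, its defaults, and the `flaky` function are invented for illustration, and the `sleep` argument is injectable so the example runs without waiting.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    call: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry call(), doubling the delay after each failure; re-raise the last error."""
    for attempt in range(max_attempts):
        try:
            return call()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            sleep(base_delay * (2 ** attempt))
    raise RuntimeError("max_attempts must be at least 1")


attempts = {"count": 0}
delays = []


def flaky() -> str:
    # Fails twice, then succeeds, to imitate a transient error.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = retry_with_backoff(flaky, sleep=delays.append)
```

In this run the call succeeds on the third attempt after waits of 0.5 and 1.0 seconds. Production code would usually retry only on errors known to be transient and add random jitter to the delays.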

Configuration Options

LLM Configuration

| Option | Default | Description |
| --- | --- | --- |
| model_name | gpt-3.5-turbo | Model identifier |
| tpm | Model-specific | Tokens per minute limit |
| rpm | Model-specific | Requests per minute limit |
| api_key | None | API authentication |
| api_base | Provider-specific | API endpoint URL |

Operation Configuration

| Option | Description |
| --- | --- |
| prompt | Natural language instruction |
| input_fields | Columns to use as input |
| output_fields | New columns to create (Map only) |
| optimized | Enable batching optimization |

Summary

Datatune's system architecture provides:

  1. Provider Abstraction: Unified interface across OpenAI, Ollama, VLLM, and Azure
  2. Rate Limiting: Automatic enforcement of TPM/RPM limits per model
  3. Distributed Execution: Partition-based processing for large datasets
  4. Multi-Backend Support: Native support for Dask and Ibis backends
  5. Intelligent Planning: Agent-based orchestration for complex multi-step transformations
  6. Semantic Operations: LLM-powered Map and Filter for text understanding
  7. Deduplication: Embedding-based similarity search with FAISS

Sources: datatune/llm/llm.py:1-100

Map Operations

Related topics: Filter Operations, Reduce Operations, Dask Backend

Map Operations in Datatune enable LLM-powered column transformations on distributed dataframes. Users describe desired transformations in natural language, and the system generates new columns by leveraging large language models to process each row semantically.

Overview

The Map operation is a core primitive in Datatune that bridges natural language instructions with data transformations. It accepts a user-defined prompt describing what new columns to create, identifies relevant input columns, and produces one or more output columns populated by LLM inference. Sources: datatune/__init__.py:3

import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

Sources: README.md:18-29

Architecture

The Map system is designed with a pluggable backend architecture, supporting both Dask and Ibis execution contexts. Each backend implements the same logical interface while adapting to its specific dataframe API.

Component Flow

graph TD
    A[User Prompt + Input/Output Fields] --> B[map Function Entry Point]
    B --> C{Backend Type}
    C -->|Dask| D[map_dask.py]
    C -->|Ibis| E[map_ibis.py]
    D --> F[LLM Batch Processor]
    E --> F
    F --> G[Parse LLM Output]
    G --> H[Merge Results to DataFrame]
    H --> I[Output DataFrame with New Columns]

Backend Implementations

| Backend | File | Purpose |
| --- | --- | --- |
| Dask | datatune/core/dask/map_dask.py | Distributed DataFrame operations using Dask |
| Ibis | datatune/core/ibis/map_ibis.py | SQL-like backend supporting DuckDB, PostgreSQL, BigQuery |

API Reference

Function Signature

def map(
    prompt: str,
    output_fields: List[str],
    input_fields: Optional[List[str]] = None,
    **kwargs
) -> Callable

Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| prompt | str | Yes | Natural language description of the transformation to apply |
| output_fields | List[str] | Yes | Names of columns to create in the output |
| input_fields | List[str] | No | Input columns to pass to the LLM; if omitted, all columns are used |

Sources: datatune/core/dask/map_dask.py:1-50

Return Value

Returns a callable that accepts (llm, dataframe) and returns a transformed dataframe with the new columns appended.
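
The two-step call pattern, configure first and apply to `(llm, dataframe)` later, is an ordinary closure. The sketch below imitates it with lists of dictionaries in place of a dataframe. `make_map` and `fake_llm` are hypothetical stand-ins, not Datatune functions, and no model is called.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, object]


def make_map(
    prompt: str, output_fields: List[str], input_fields: Optional[List[str]] = None
) -> Callable:
    """Capture the configuration now; run it later against (llm, rows)."""

    def apply(llm: Callable[[str, Row], Row], rows: List[Row]) -> List[Row]:
        result = []
        for row in rows:
            # Send only the requested input columns (all columns if omitted).
            visible = {k: row[k] for k in input_fields} if input_fields else dict(row)
            generated = llm(prompt, visible)
            new_values = {field: generated.get(field) for field in output_fields}
            result.append({**row, **new_values})
        return result

    return apply


def fake_llm(prompt: str, row: Row) -> Row:
    # Stand-in for a model call: derives a value from the visible columns.
    return {"NameLength": len(str(row["Name"]))}


mapper = make_map("Count characters in the name", ["NameLength"], ["Name"])
mapped_rows = mapper(fake_llm, [{"Name": "Lamp", "Price": 20}])
```

Separating configuration from execution lets the same configured operation be reused with different models or datasets.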

Dask Backend Implementation

Data Flow

graph LR
    A[Partitioned DataFrame] --> B[Serialize Input Columns]
    B --> C[Batch Rows to LLM]
    C --> D[LLM Inference]
    D --> E[Parse Dictionary Output]
    E --> F[Assign to Output Columns]
    F --> G[Merge Canonical Results to Duplicates]
    G --> H[Return Partitioned DataFrame]

Prompt Construction

The Dask implementation constructs prompts using a prefix-suffix pattern to ensure consistent LLM responses.

Prefix Structure:

Use the following context to create new columns from existing columns.

Sources: datatune/core/dask/map_dask.py:10-15

Suffix Structure:

import os

# Adjacent string literals are concatenated as written; no separator is added between them.
suffix = (
    f"{os.linesep}{os.linesep}"
    "Your response MUST be the entire input record as a valid Python dictionary in the format"
    "'index=<row_index>|{key1: value1, key2: value2, ...}'  with added keys of expected new fields if any."
    "ALWAYS START YOUR RESPONSE WITH 'index=<row_index>|' WHERE <row_index> IS THE INDEX OF THE ROW."
    "IF A VALUE FOR A COLUMN DOES NOT EXIST SET IT TO null"
    "'index=<row_index>|{key1: None, key2: value2, ...}'"
)

Sources: datatune/core/dask/map_dask.py:22-30

Duplicate Handling

The Dask Map implementation includes sophisticated duplicate handling for semantic deduplication scenarios. When clusters of duplicate rows exist, only the canonical row is sent to the LLM, and results are propagated to duplicates.

dup_to_canon = {
    dup: c["canonical_id"]
    for c in clusters
    for dup in c["duplicate_ids"]
}

canonical_idx = input_series.index.difference(dup_to_canon.keys())
canonical_input = input_series.loc[canonical_idx]

llm_out = llm(canonical_input, prefix, prompt, suffix, optimized=True)

df.loc[canonical_idx, llm_output_column] = llm_out

for dup, canon in dup_to_canon.items():
    df.loc[dup, llm_output_column] = df.loc[canon, llm_output_column]

Sources: datatune/core/dask/map_dask.py:35-50
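
The propagation logic above can be tried without Dask or an LLM. The following is a minimal sketch using plain dictionaries; `fake_llm` and the sample rows are hypothetical stand-ins and not part of datatune.

```python
# Sample data: row index -> serialized input (hypothetical values).
rows = {5: "Acme Corp", 7: "Globex", 12: "ACME Corp.", 45: "Acme Corporation"}
clusters = [{"canonical_id": 5, "duplicate_ids": [12, 45]}]

# Map every duplicate row to the canonical row of its cluster.
dup_to_canon = {
    dup: c["canonical_id"]
    for c in clusters
    for dup in c["duplicate_ids"]
}

# Rows that are not duplicates are the only ones that need an LLM call.
canonical_ids = [idx for idx in rows if idx not in dup_to_canon]


def fake_llm(texts):
    """Hypothetical stand-in for the real LLM call: one output per input."""
    return [text.upper() for text in texts]


outputs = dict(zip(canonical_ids, fake_llm([rows[i] for i in canonical_ids])))

# Copy each canonical result to its duplicates.
for dup, canon in dup_to_canon.items():
    outputs[dup] = outputs[canon]

print(len(canonical_ids), "LLM inputs for", len(rows), "rows")
```

In this sample, two of the four rows are duplicates, so the stand-in model is called on only two inputs while all four rows still receive an output.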

Output Parsing

The LLM output is parsed using parse_llm_output, which converts the string response to a Python dictionary.

def parse_llm_output(llm_output: Union[str, Exception]) -> Union[Dict, Exception]:
    """
    Parses the LLM output string into a Python dictionary.
    """

Sources: datatune/core/dask/map_dask.py:75-85
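
The body of parse_llm_output is not shown here, so the following is only a sketch of how a response in the 'index=<row_index>|{...}' format could be parsed. The helper name `parse_indexed_record` is hypothetical, and the use of `ast.literal_eval` is an assumption borrowed from the Ibis result-processing code in this manual.

```python
import ast
from typing import Dict, Tuple, Union


def parse_indexed_record(line: str) -> Union[Tuple[int, Dict], Exception]:
    """Hypothetical helper: split 'index=<n>|{...}' into (n, dict).

    Errors are returned rather than raised, mirroring the
    Union[Dict, Exception] return annotation documented above.
    """
    try:
        head, _, body = line.strip().partition("|")
        if not head.startswith("index=") or not body:
            raise ValueError(f"missing 'index=<row_index>|' prefix: {line!r}")
        row_index = int(head[len("index="):])
        record = ast.literal_eval(body)  # safe evaluation of a Python literal
        if not isinstance(record, dict):
            raise ValueError("payload is not a dictionary")
        return row_index, record
    except Exception as exc:
        return exc


parsed = parse_indexed_record("index=3|{'Name': 'Acme', 'Category': None}")
failed = parse_indexed_record("no index prefix here")
```

Note that `ast.literal_eval` accepts `None` but not the JSON-style `null`, so a response that follows the "SET IT TO null" wording literally would come back as an exception under this sketch.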

Ibis Backend Implementation

Data Flow

graph LR
    A[Ibis Table] --> B[Add Row ID Column]
    B --> C[Execute to Local Data]
    C --> D[Convert to List]
    D --> E[Batch to LLM]
    E --> F[Parse Results]
    F --> G[Create MemTable]
    G --> H[Join Back to Original Table]

Row Indexing

Ibis operations require explicit row indexing since SQL tables don't maintain native pandas-like indices.

indexed_table = table.mutate(_ROW_ID_=ibis.row_number().cast("int64"))

local_data = indexed_table.select("_ROW_ID_", input_col).execute()
    
input_list = local_data[input_col].tolist()

Sources: datatune/core/ibis/map_ibis.py:25-32

Result Processing

Raw LLM outputs are parsed and converted to JSON strings for storage:

processed_results = []
for res in raw_results:
    try:
        py_dict = ast.literal_eval(str(res).strip())
        processed_results.append(json.dumps(py_dict))
    except Exception:
        processed_results.append("{}")

Sources: datatune/core/ibis/map_ibis.py:45-52

LLM Integration

Supported LLM Providers

| Provider | Class | Default Model |
| --- | --- | --- |
| OpenAI | OpenAI | gpt-3.5-turbo |
| Ollama | Ollama | gemma3:4b |
| Azure | Azure | Configurable |
| vLLM | VLLM | Configurable |

Sources: datatune/llm/llm.py:1-60

Batching and Rate Limiting

The LLM layer implements token and request rate limiting based on model-specific constraints defined in model_rate_limits.py.

model_rate_limits = {
    "gpt-3.5-turbo": {
        "tpm": 200_000,
        "rpm": 500,
    },
    "gpt-4": {
        "tpm": 10_000,
        "rpm": 500,
    },
    "gpt-4o": {
        "tpm": 30_000,
        "rpm": 500,
    },
}

Sources: datatune/llm/model_rate_limits.py:1-20
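
To see how the two limits interact, the sketch below computes the request rate that satisfies both the token and request caps for a given request size. The function `max_requests_per_minute` is hypothetical; how datatune enforces the limits internally is not shown in this section.

```python
model_rate_limits = {
    "gpt-3.5-turbo": {"tpm": 200_000, "rpm": 500},
    "gpt-4": {"tpm": 10_000, "rpm": 500},
    "gpt-4o": {"tpm": 30_000, "rpm": 500},
}


def max_requests_per_minute(model: str, tokens_per_request: int) -> int:
    """Hypothetical helper: requests per minute allowed by both tpm and rpm."""
    if tokens_per_request <= 0:
        raise ValueError("tokens_per_request must be positive")
    limits = model_rate_limits[model]
    # The token budget caps how many requests of this size fit in a minute;
    # the request cap applies regardless of request size.
    return min(limits["rpm"], limits["tpm"] // tokens_per_request)


for name in model_rate_limits:
    print(name, max_requests_per_minute(name, tokens_per_request=1_000))
```

With 1,000-token requests, the token limit is the binding constraint for all three models, which is why larger batches per request do not raise throughput beyond the tpm budget.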

Batch Processing

The LLM batches multiple rows into single API calls for efficiency:

for i, prompt in enumerate(input_rows):
    ...  # Batch construction continues

Sources: datatune/llm/llm.py:30-45
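
Because the excerpt above is truncated, the following sketch shows one common way to build such batches: greedily packing prompts until a token budget would be exceeded. The function `batch_by_budget` and the whitespace-based token estimate are assumptions for illustration, not datatune's actual batching code.

```python
from typing import List


def batch_by_budget(prompts: List[str], max_tokens: int) -> List[List[str]]:
    """Hypothetical helper: greedily pack prompts into batches under a budget."""
    batches: List[List[str]] = []
    current: List[str] = []
    used = 0
    for prompt in prompts:
        cost = len(prompt.split())  # crude estimate: one token per word
        # Start a new batch when adding this prompt would exceed the budget.
        if current and used + cost > max_tokens:
            batches.append(current)
            current, used = [], 0
        current.append(prompt)
        used += cost
    if current:
        batches.append(current)
    return batches


batches = batch_by_budget(["a b", "c d", "e f g"], max_tokens=4)
```

A prompt that alone exceeds the budget still gets its own batch in this sketch; a real implementation would need to decide whether to truncate, split, or reject it.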

Agent Integration

The Datatune Agent can automatically generate Map operations as part of multi-step data transformation pipelines.

Agent System Prompt

class Agent(ABC):
    system_prompt: str = """You are Datatune Agent, a powerful assistant designed to help users with data processing tasks.
    You are capable of generating python code to perform various operations on data. Apart from python builtins, you have the following libraries avaiable in your run time:
    - pandas
    - numpy
    - dask

    In addition to these, you also have access to the datatune libarary, which provides functionality for processing data using LLMs.
    Map Example:
    ```python
    import datatune as dt
    import dask.dataframe as dd
    df = dd.read_csv("path/to/data.csv")
    map = dt.Map(prompt="Your prompt here")
    llm = dt.LLM(model_name="gpt-3.5-turbo")
    mapped_df = map(llm, df)
    mapped_df = dt.finalize(mapped_df)
    ```
    """

Sources: datatune/agent/__init__.py:1-30

Plan Generation

When the Agent generates a plan requiring Map operations, it creates steps with the following structure:

{
    "type": "primitive",
    "operation": "map",
    "params": {
        "subprompt": "Extract category and sub-category from industry",
        "input_fields": ["Industry"],
        "output_fields": ["Category","Sub-Category"]
    },
}

Sources: datatune/agent/agent.py:50-65
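
As an illustration of how such a step could be consumed, the sketch below validates a plan step and converts it into keyword arguments for a Map call. The helper `step_to_map_kwargs` is hypothetical, and the assumption that `subprompt` becomes the Map `prompt` is not confirmed by the source.

```python
from typing import Any, Dict

REQUIRED_PARAMS = ("subprompt", "input_fields", "output_fields")


def step_to_map_kwargs(step: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: turn a primitive map step into Map keyword arguments."""
    if step.get("type") != "primitive" or step.get("operation") != "map":
        raise ValueError("not a primitive map step")
    params = step.get("params", {})
    missing = [key for key in REQUIRED_PARAMS if key not in params]
    if missing:
        raise ValueError(f"missing params: {missing}")
    return {
        "prompt": params["subprompt"],  # assumption: subprompt maps to prompt
        "input_fields": list(params["input_fields"]),
        "output_fields": list(params["output_fields"]),
    }


step = {
    "type": "primitive",
    "operation": "map",
    "params": {
        "subprompt": "Extract category and sub-category from industry",
        "input_fields": ["Industry"],
        "output_fields": ["Category", "Sub-Category"],
    },
}
kwargs = step_to_map_kwargs(step)
```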

Usage Examples

Basic Column Extraction

import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-4")
df = dd.read_csv("customers.csv")

# Extract sentiment and category from review text
mapped = dt.map(
    prompt="Analyze the customer feedback and extract sentiment (positive/negative/neutral) and main topic",
    output_fields=["Sentiment", "Topic"],
    input_fields=["CustomerFeedback"]
)(llm, df)

result = dt.finalize(mapped)
result.compute().to_csv("enriched_customers.csv")

Multi-Column Generation

# Generate multiple derived columns in a single Map operation
mapped = dt.map(
    prompt="Parse the product description to identify: 1) the material, 2) primary color, 3) target demographic",
    output_fields=["Material", "Color", "Demographic"],
    input_fields=["ProductDescription", "ProductName"]
)(llm, df)

With Agent

import datatune as dt
import dask.dataframe as dd
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-4")
agent = dt.Agent(llm)

df = dd.read_csv("data.csv")
# Agent automatically determines when to use Map operations
result = agent.do("Add ProfitMargin column and categorize by revenue tier", df)

Sources: README.md:35-45

Best Practices

Input Field Selection

| Practice | Rationale |
| --- | --- |
| Provide specific input fields | Reduces token usage and improves accuracy |
| Include context-rich columns | Helps LLM understand the domain |
| Avoid overly large inputs | Prevents token limit issues |

Prompt Engineering

  • Be explicit about the expected output format in output field names
  • Include examples in prompts for complex transformations
  • Break complex extractions into multiple Map operations when needed

Performance Considerations

| Factor | Impact | Recommendation |
| --- | --- | --- |
| Batch size | Throughput vs. latency | Use default batching for most cases |
| Model selection | Quality vs. speed | Use gpt-3.5-turbo for bulk, gpt-4 for complex tasks |
| Partition count | Parallelism | Match to cluster size for Dask operations |

Sources: README.md:18-29

Filter Operations

Related topics: Map Operations, Reduce Operations, Ibis Backend

Filter Operations in Datatune enable intelligent, LLM-powered row filtering based on natural language criteria. Instead of writing explicit boolean conditions, users describe what rows to keep in plain English, and the LLM determines which records match the specified criteria.

Overview

The Filter module provides a declarative interface for semantic row-level filtering of data. It supports multiple backends (Dask DataFrames and Ibis tables) and uses LLMs to make filtering decisions based on natural language prompts.

Key capabilities:

  • Natural language filter criteria specification
  • Multi-backend support (Dask, Ibis)
  • Semantic deduplication integration
  • Parallelized LLM inference
  • Structured output parsing

Architecture

The Filter system follows a dispatcher pattern that routes to backend-specific implementations based on the input data type.

graph TD
    A[User calls filter prompt, input_fields] --> B[datatune.filter function]
    B --> C{Data Type Detection}
    C -->|dask.dataframe.DataFrame| D[_filter_dask]
    C -->|ibis.Table| E[_filter_ibis]
    D --> F[LLM Batch Processing]
    E --> F
    F --> G[Parse Filter Output]
    G --> H[Apply Boolean Mask]
    H --> I[Filtered DataFrame/Table]

Component Overview

| Component | File | Responsibility |
| --- | --- | --- |
| filter() | datatune/core/filter.py | Main entry point, type dispatch |
| _filter_dask() | datatune/core/dask/filter_dask.py | Dask DataFrame filtering |
| _filter_ibis() | datatune/core/ibis/filter_ibis.py | Ibis table filtering |
| LLM Integration | datatune/llm/llm.py | Batch inference and token management |

API Reference

`dt.filter()`

Main filter function exported from datatune.

def filter(*, prompt, input_fields=None, clusters=None):
    def apply(llm, data):
        # Implementation dispatch
        ...
    return apply

#### Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| prompt | str | Yes | Natural language description of filter criteria |
| input_fields | List[str] | No | Columns to use for filtering decisions |
| clusters | List[Dict] | No | Semantic deduplication clusters for propagating filter decisions |

#### Returns

Returns a callable that accepts (llm, data) and returns a filtered DataFrame or Table.
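
The two-stage call shape, configuration first and data second, can be illustrated with a small closure. This is a sketch only: `filter_sketch` is a hypothetical name, it operates on a plain list of dictionaries rather than a Dask DataFrame or Ibis table, and the `llm` argument is modeled as a simple callable that returns a boolean.

```python
from typing import Callable, Dict, List, Optional

Record = Dict[str, object]


def filter_sketch(*, prompt: str, input_fields: Optional[List[str]] = None) -> Callable:
    """Hypothetical factory: capture the configuration, return an apply function."""

    def apply(llm: Callable[[str, Record], bool], data: List[Record]) -> List[Record]:
        if not isinstance(data, list):
            raise TypeError(f"unsupported data type: {type(data).__name__}")
        kept = []
        for record in data:
            # Show the model only the requested columns, if any were given.
            visible = record if input_fields is None else {k: record[k] for k in input_fields}
            if llm(prompt, visible):
                kept.append(record)  # keep the full original record
        return kept

    return apply


keep_large = filter_sketch(prompt="Keep rows where n is above 1", input_fields=["n"])
kept_rows = keep_large(lambda prompt, rec: rec["n"] > 1, [{"n": 1, "x": "a"}, {"n": 2, "x": "b"}])
```

Separating configuration from application lets the same configured filter be reused across several datasets or models.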

Dask Implementation

The Dask backend processes rows in parallel across partitions, leveraging the Dask execution graph for distributed computation.

Workflow

graph LR
    A[Input DataFrame] --> B[Serialize Input Columns]
    B --> C[Handle Clusters]
    C --> D[Extract Canonical Indices]
    D --> E[LLM Batch Inference]
    E --> F[Parse Boolean Output]
    F --> G[Propagate to Duplicates]
    G --> H[Apply Boolean Mask]

Key Functions

#### _filter_dask()

Core filtering function for Dask DataFrames:

def _filter_dask(
    prompt: str,
    input_fields: Optional[List[str]] = None,
    clusters: Optional[List[Dict]] = None,
):
    # Validates inputs, constructs prompts, invokes LLM
    ...

#### parse_filter_output()

Parses LLM output string into boolean values:

def parse_filter_output(
    output: Union[str, Exception], 
    err: bool = True
) -> Optional[bool]:
    ...

Ibis Implementation

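The Ibis backend accepts tables from the SQL backends that Ibis supports (DuckDB, PostgreSQL, BigQuery, etc.). As the workflow below shows, the selected input columns are executed to local data for LLM inference, and the resulting keep or remove decisions are then applied as a filter on the original table.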

Workflow

graph TD
    A[Input Table] --> B[Add Row ID Column]
    B --> C[Select Input Columns]
    C --> D[Execute to Local]
    D --> E[LLM Batch Inference]
    E --> F[Parse Filter Results]
    F --> G[Build Boolean Filter]
    G --> H[Filter Original Table]
    H --> I[Remove Row ID Column]

Key Functions

#### _filter_ibis()

Core filtering function for Ibis tables:

def _filter_ibis(
    table: ibis.Table,
    prompt: str,
    input_fields: Optional[List[str]] = None,
):
    indexed_table = table.mutate(_ROW_ID_=ibis.row_number().cast("int64"))

_filter_ibis uses ibis.row_number() to assign sequential row IDs that match the LLM response indexing.

Sources: datatune/core/ibis/filter_ibis.py:28

LLM Prompt Format

The Filter module constructs specialized prompts to guide LLM responses for filtering decisions.

Prompt Structure

FILTERING CRITERIA:
<prompt>

[input record dictionary]

DECISION: Your response MUST be the entire input record as Python dictionary in the format:
index=<row_index>|{key1: value1, ...}<endofrow> with added key called '__filter__' 
with value either True to KEEP the record or False to REMOVE it.

Output Format

The LLM must return records with an added __filter__ key:

index=0|{'Name': 'Acme Corp', 'Country': 'USA', '__filter__': True}<endofrow>
index=1|{'Name': 'Bad Inc', 'Country': 'UK', '__filter__': False}<endofrow>

#### Response Requirements

| Requirement | Description |
| --- | --- |
| Index prefix | Each response starts with `index=<row_index>\|` |
| Complete record | Include all original keys plus __filter__ |
| Boolean value | __filter__ must be True (keep) or False (remove) |
| Missing values | Set to None if column value does not exist |

Sources: datatune/core/dask/filter_dask.py:14-19
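
The response format above lends itself to a simple parser. The sketch below splits a multi-row response on the `<endofrow>` marker and extracts one boolean decision per row index. The function `parse_filter_response` is hypothetical and is not datatune's parse_filter_output; malformed rows are silently skipped here, which is a design choice of this sketch.

```python
import ast
from typing import Dict

END_OF_ROW = "<endofrow>"


def parse_filter_response(response: str) -> Dict[int, bool]:
    """Hypothetical helper: map row index -> keep/remove decision."""
    decisions: Dict[int, bool] = {}
    for chunk in response.split(END_OF_ROW):
        chunk = chunk.strip()
        if not chunk:
            continue
        head, sep, body = chunk.partition("|")
        if not (sep and head.startswith("index=")):
            continue  # no 'index=<row_index>|' prefix
        try:
            row_index = int(head[len("index="):])
            record = ast.literal_eval(body)
        except (ValueError, SyntaxError, TypeError):
            continue  # index or dictionary could not be parsed
        flag = record.get("__filter__") if isinstance(record, dict) else None
        if isinstance(flag, bool):
            decisions[row_index] = flag
    return decisions


response = (
    "index=0|{'Name': 'Acme Corp', 'Country': 'USA', '__filter__': True}<endofrow>\n"
    "index=1|{'Name': 'Bad Inc', 'Country': 'UK', '__filter__': False}<endofrow>\n"
    "index=2|not a dictionary<endofrow>"
)
decisions = parse_filter_response(response)
```

Rows absent from the returned mapping have no usable decision, so the caller must choose whether to keep, drop, or retry them.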

Semantic Deduplication Integration

When clusters parameter is provided, filter decisions propagate from canonical records to their duplicates.

Cluster Structure

clusters = [
    {
        "canonical_id": 5,
        "duplicate_ids": [12, 45, 78]
    },
    ...
]

Propagation Logic

graph TD
    A[Clusters Input] --> B[Build dup_to_canon Map]
    B --> C[Extract Canonical Indices]
    C --> D[Filter Only Canonical Rows]
    D --> E[LLM Inference on Canonical]
    E --> F[Propagate Decisions to Duplicates]
    F --> G[Merge Results]

The mapping builds:

dup_to_canon = {
    dup: c["canonical_id"]
    for c in clusters
    for dup in c["duplicate_ids"]
}

Sources: datatune/core/dask/filter_dask.py:41-46

Usage Examples

Basic Dask Filtering

import datatune as dt
import dask.dataframe as dd
from datatune.llm.llm import OpenAI

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

# Keep only electronics products
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name", "Description"]
)(llm, df)

result = dt.finalize(filtered)

Ibis Table Filtering

import datatune as dt
import ibis

con = ibis.duckdb.connect("data.duckdb")
table = con.table("customers")

# Filter using natural language
filtered = dt.filter(
    prompt="Keep only customers from African countries",
    input_fields=["Country", "Region"]
)(llm, table)

With Semantic Deduplication

# First deduplicate to find clusters
deduplicator = dt.SemanticDeduplicator(llm=llm)
clusters = deduplicator.find_duplicates(df, columns=["Name", "Address"])

# Filter with cluster awareness
filtered = dt.filter(
    prompt="Keep only verified organizations",
    input_fields=["Status", "VerificationDate"],
    clusters=clusters
)(llm, df)

Error Handling

The parse_filter_output() function handles various error cases:

| Error Type | Handling | Return Value |
| --- | --- | --- |
| Parse failure | Returns None if err=False | None |
| Exception | Returns exception if err=True | Original exception |
| Empty output | Returns None | None |
| Malformed dict | Falls back to None | None |

Sources: datatune/core/dask/filter_dask.py:64-79

Rate Limiting

Filter operations respect LLM rate limits defined in datatune/llm/model_rate_limits.py:

| Model | TPM (tokens/min) | RPM (requests/min) |
| --- | --- | --- |
| gpt-3.5-turbo | 200,000 | 500 |
| gpt-4 | 10,000 | 500 |
| gpt-4-turbo | 30,000 | 500 |

The batching system automatically respects these limits during batch LLM inference.

Performance Considerations

Batching Strategy

Rows are batched for LLM inference to optimize throughput:

# From datatune/llm/llm.py
for i, prompt in enumerate(input_rows):
    # Batch prompts based on token limits
    ...  # Accumulate until batch size reached

Partition Parallelism (Dask)

Dask DataFrames process partitions in parallel:

graph LR
    A[Partition 0] --> E[LLM Batch]
    B[Partition 1] --> F[LLM Batch]
    C[Partition 2] --> G[LLM Batch]
    E --> H[Results Merged]
    F --> H
    G --> H

Backend Selection

| Backend | Best For | Limitations |
| --- | --- | --- |
| Dask | Large datasets, local processing | Memory-bound |
| Ibis | Database pushdown, SQL backends | Requires database connection |

Sources: datatune/core/dask/filter_dask.py:14-19

Reduce Operations

Related topics: Map Operations, Filter Operations

Overview

Reduce Operations in datatune provide a mechanism for aggregating and condensing data using configurable action handlers. The reduce system follows a plugin-based architecture where specific aggregation or reduction behaviors are registered as actions and invoked through a unified interface.

The core purpose of reduce operations is to transform a DataFrame into aggregated results by applying registered reduction actions. This enables users to perform complex aggregation tasks that go beyond standard group-by operations, leveraging LLM-powered semantic understanding when needed.

Sources: datatune/core/reduce.py:1-22

Architecture

Action Registry Pattern

The reduce system implements a decorator-based registry pattern for action registration:

graph TD
    A[User calls reduce df action] --> B[reduce function invoked]
    B --> C[get_action looks up registry]
    C --> D{Action found?}
    D -->|Yes| E[Instantiate action class]
    D -->|No| F[Raise ValueError]
    E --> G[Execute reduction on DataFrame]
    G --> H[Return aggregated result]

The _ACTIONS dictionary serves as a central registry mapping action names to their implementation classes. The register_action decorator allows new actions to be dynamically registered without modifying core logic.

Sources: datatune/core/reduce.py:1-22

Class Diagram

classDiagram
    class _ACTIONS {
        Dict[str, Type]
    }
    
    class register_action {
        +decorator(name: str)
    }
    
    class get_action {
        +function(name: str) Type
    }
    
    class reduce {
        +function(df, action, **kwargs)
    }
    
    register_action ..> _ACTIONS : registers
    get_action ..> _ACTIONS : queries
    reduce ..> get_action : uses

API Reference

Main Function

def reduce(df, *, action: str, **kwargs):
    cls = get_action(action)
    reducer = cls(**kwargs)   
    return reducer(df)

#### Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| df | dask.dataframe.DataFrame | Yes | The input Dask DataFrame to reduce |
| action | str | Yes | The name of the registered reduction action to execute |
| **kwargs | Various | No | Additional keyword arguments passed to the action constructor |

#### Returns

| Return Type | Description |
| --- | --- |
| Any | Returns the result of the specific reduction action (varies by registered action) |

#### Exceptions

| Exception | Condition |
| --- | --- |
| ValueError | When the specified action name is not registered |

Sources: datatune/core/reduce.py:19-22

Register Action Decorator

def register_action(name):
    def decorator(cls):
        _ACTIONS[name] = cls
        return cls
    return decorator

#### Usage Pattern

@register_action("my_reducer")
class MyReducer:
    def __init__(self, **kwargs):
        # Initialize reducer configuration
        pass
    
    def __call__(self, df):
        # Perform reduction
        return result

Get Action Function

def get_action(name):
    try:
        return _ACTIONS[name]
    except KeyError:
        raise ValueError(f"Unknown action: {name}")

Workflow Execution

Execution Flow

graph TD
    A[Start] --> B[Import reduce module]
    B --> C[Call reduce df action]
    C --> D[get_action retrieves class]
    D --> E[Instantiate with kwargs]
    E --> F[Call reducer on DataFrame]
    F --> G[Return result]
    G --> H[End]

Integration with Datatune Core

The reduce operation integrates with other datatune components through the public API:

import datatune as dt

# reduce is exported from datatune namespace
result = dt.reduce(df, action="specific_action", param=value)

Sources: datatune/__init__.py:1-7
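
The registry, lookup, and reduce functions documented above fit together as in the following self-contained sketch. The three core functions are reproduced from this section; the `count_where` action and the list-of-dictionaries input are hypothetical and stand in for a real action operating on a Dask DataFrame.

```python
from typing import Dict, Type

_ACTIONS: Dict[str, Type] = {}


def register_action(name):
    def decorator(cls):
        _ACTIONS[name] = cls
        return cls
    return decorator


def get_action(name):
    try:
        return _ACTIONS[name]
    except KeyError:
        raise ValueError(f"Unknown action: {name}")


def reduce(df, *, action: str, **kwargs):
    cls = get_action(action)
    reducer = cls(**kwargs)
    return reducer(df)


@register_action("count_where")
class CountWhere:
    """Hypothetical action: count rows whose column equals a given value."""

    def __init__(self, column, value, **kwargs):
        self.column = column
        self.value = value

    def __call__(self, rows):
        return sum(1 for row in rows if row.get(self.column) == self.value)


rows = [{"tier": "gold"}, {"tier": "silver"}, {"tier": "gold"}]
gold_count = reduce(rows, action="count_where", column="tier", value="gold")
```

Because registration happens at class-definition time, an action only becomes available once the module that defines it has been imported.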

Registered Actions

Based on the repository structure, reduce operations support extensibility through custom registered actions. The system is designed to work with Dask DataFrames and can integrate with:

  • SemanticDeduplicator: For deduplication using embedding-based similarity matching
  • Custom aggregations: User-defined reduction logic via the decorator pattern

SemanticDeduplicator Integration

While primarily a deduplication tool, the SemanticDeduplicator class in datatune/core/deduplication.py demonstrates how reduce-style operations work:

class SemanticDeduplicator:
    def __init__(
        self,
        llm,
        embedding_model: str = "text-embedding-3-small",
        sim_threshold: float = 0.90,
        top_k: int = 50,
        hnsw_m: int = 32,
        ef_search: int = 64,
        return_df: bool = False,
    ):
        # Configuration initialization
        ...

The deduplicator runs in three phases: embedding generation, FAISS index building, and similarity search. It is an example of a multi-stage operation that condenses a dataset, here into clusters of canonical and duplicate rows.

Sources: datatune/core/deduplication.py:1-150

Extension Points

Creating Custom Reduce Actions

To create a custom reduction action:

  1. Define a class with appropriate __init__ and __call__ methods
  2. Decorate with @register_action("action_name")
  3. The class should accept **kwargs for flexible configuration
  4. Implement __call__(self, df) to receive the DataFrame and return the result

from datatune.core.reduce import register_action

@register_action("custom_aggregate")
class CustomAggregateReducer:
    def __init__(self, group_by_column, agg_column, operation="sum", **kwargs):
        self.group_by_column = group_by_column
        self.agg_column = agg_column
        self.operation = operation

    def __call__(self, df):
        # Group by the configured column and apply the chosen aggregation
        grouped = df.groupby(self.group_by_column)[self.agg_column]
        return grouped.agg(self.operation)

Best Practices

| Practice | Rationale |
| --- | --- |
| Use descriptive action names | Ensures clear identification in get_action calls |
| Accept **kwargs in __init__ | Maintains compatibility with the reduce function signature |
| Return consistent types | Enables predictable downstream processing |
| Handle empty DataFrames | Prevents errors during edge case processing |

| Component | File | Relationship |
| --- | --- | --- |
| Filter | datatune/core/dask/filter_dask.py | Similar pattern for row-level operations |
| Map | datatune/core/dask/map_dask.py | Column transformation counterpart |
| Finalize | datatune/core/dask/op.py | Completes lazy Dask computations |
| Agent | datatune/agent/agent.py | Orchestrates multi-step transformations |

Sources: datatune/core/dask/filter_dask.py:1-50 Sources: datatune/core/dask/map_dask.py:1-50

Summary

Reduce Operations provide a flexible, extensible mechanism for aggregating and transforming data in datatune. The action registry pattern enables:

  • Dynamic action registration via the @register_action decorator
  • Centralized action lookup through the get_action function
  • Unified API through the reduce function interface
  • Custom extensibility for domain-specific reduction logic

The architecture follows proven design patterns that separate action registration from execution, allowing the core system to remain stable while supporting diverse reduction behaviors through plugin-style extensions.

Sources: datatune/core/reduce.py:1-22

Dask Backend

Related topics: Ibis Backend, System Architecture

The Dask Backend is the primary data processing engine in datatune, enabling large-scale, distributed data transformations using Large Language Models (LLMs). It builds on Dask to add LLM-driven semantic operations, so users can express complex data operations as natural language prompts while keeping Dask's parallel and out-of-core computation model.

Architecture Overview

The Dask Backend follows a modular architecture with three main layers:

  1. API Layer (datatune/core/map.py, datatune/core/filter.py) - Entry points that dispatch to appropriate backends
  2. Operation Layer (datatune/core/dask/) - Core transformations for Dask DataFrames
  3. LLM Integration Layer - Handles communication with language models for semantic operations

graph TD
    A[User Data] --> B[Dask DataFrame]
    B --> C{Operation Type}
    C -->|Map| D[map_dask.py]
    C -->|Filter| E[filter_dask.py]
    D --> F[LLM API]
    E --> F
    F --> G[Parse & Validate]
    G --> H[New Columns Added]
    H --> I[Lazy Dask DataFrame]
    I --> J[finalize]
    J --> K[Computed Result]
    
    L[Clusters] -.->|Optional| D
    L -.->|Optional| E

Core Operations

Map Operation

The map operation uses an LLM to generate new columns from existing data through natural language understanding. It is designed for semantic extraction, classification, and content transformation tasks.

Source: datatune/core/dask/map_dask.py

#### How Map Works

graph LR
    A[Input DataFrame] --> B[Serialize Rows]
    B --> C[Cluster Deduplication]
    C --> D[Batch for LLM]
    D --> E[LLM Processing]
    E --> F[Parse Output]
    F --> G[Expand to Columns]
    G --> H[Copy to Duplicates]
    H --> I[Output DataFrame]

#### Key Components

| Component | Description |
| --- | --- |
| serialized_input_column | Temporary column storing row data as Python dictionaries |
| llm_output_column | Temporary column storing raw LLM responses |
| clusters | Optional list of duplicate clusters for deduplication |

#### Processing Flow

  1. Each row is serialized to a dictionary format
  2. If clusters are provided, duplicate handling is applied - only canonical rows are sent to LLM
  3. LLM receives batched prompts with mapping instructions
  4. Responses are parsed and new columns are extracted
  5. Results are propagated back to duplicate rows

Key code snippet from map_dask.py:

df.loc[canonical_idx, llm_output_column] = llm_out

for dup, canon in dup_to_canon.items():
    df.loc[dup, llm_output_column] = df.loc[canon, llm_output_column]

This ensures duplicate handling by copying canonical results to all duplicate rows.

Sources: datatune/core/dask/map_dask.py:34-48

Filter Operation

The filter operation uses an LLM to determine which rows to keep or remove based on natural language criteria.

Source: datatune/core/dask/filter_dask.py

#### Filter Output Format

The LLM returns decisions in a specific format with an added __filter__ key:

index=<row_index>|{key1: value1, ..., '__filter__': True/False}<endofrow>

| Decision | Behavior |
| --- | --- |
| True | Keep the row |
| False | Remove the row |

#### Processing Pipeline

graph TD
    A[Input DataFrame] --> B[Serialize to Dictionaries]
    B --> C[Apply Cluster Deduplication]
    C --> D[Send to LLM with Filter Prompt]
    D --> E[Parse __filter__ Value]
    E --> F[Filter DataFrame]
    F --> G[Output DataFrame]

Key code snippet for filter decision parsing:

suffix = (
    f"{os.linesep}{os.linesep}"
    "DECISION:Your response MUST be the entire input record as Python dictionary..."
    "ALWAYS STICK TO THE FORMAT index=<row_index>|{...} with added key called '__filter__'..."
)

Sources: datatune/core/dask/filter_dask.py:24-35

Finalize Operation

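The finalize operation completes a pipeline of lazy map and filter steps. The snippet here shows it computing the lazy Dask DataFrame and returning a pandas DataFrame; the README-derived usage examples in this manual, however, call .compute() on the value finalize returns, which suggests a Dask DataFrame, so confirm the return type against datatune/core/dask/op.py.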

Source: datatune/core/dask/op.py

def finalize(df: dd.DataFrame) -> pd.DataFrame:
    """Compute the lazy Dask DataFrame and return a pandas DataFrame."""
    return df.compute()

Sources: datatune/core/dask/op.py

Semantic Deduplication

Both map and filter operations support semantic deduplication to reduce LLM API calls and costs.

Source: datatune/core/deduplication.py

How Deduplication Works

graph TD
    A[Input Series] --> B[Find Duplicate Clusters]
    B --> C{dup_to_canon mapping}
    C --> D[Extract Canonical Index]
    D --> E[Send Only Canonical to LLM]
    E --> F[Copy Results to Duplicates]
    
    style C fill:#f9f,color:#000

Cluster Format

clusters = [
    {"canonical_id": 5, "duplicate_ids": [12, 23, 45]},
    {"canonical_id": 10, "duplicate_ids": [15, 20]}
]

This approach sends only unique/canonical rows to the LLM, then propagates results to all duplicates.

Sources: datatune/core/dask/map_dask.py:34-40

API Reference

Map Function

from datatune.core.map import map

result = map(
    prompt="Extract categories from the description",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"],
    clusters=None  # Optional deduplication clusters
)(llm, dask_dataframe)

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| prompt | str | Yes | Natural language instructions for column generation |
| output_fields | List[str] | Yes | Names of new columns to create |
| input_fields | List[str] | No | Columns to use as input (defaults to all) |
| clusters | List[Dict] | No | Duplicate clusters for deduplication |

Filter Function

from datatune.core.filter import filter

result = filter(
    prompt="Keep only electronics products",
    input_fields=["Name", "Description"],
    clusters=None  # Optional deduplication clusters
)(llm, dask_dataframe)

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| prompt | str | Yes | Natural language criteria for filtering |
| input_fields | List[str] | No | Columns to evaluate against |
| clusters | List[Dict] | No | Duplicate clusters for deduplication |

Finalize Function

from datatune.core.dask.op import finalize

pandas_df = finalize(lazy_dask_dataframe)

Sources: datatune/core/dask/op.py

LLM Output Format

The Dask Backend requires LLM responses in a specific parseable format for reliable operation.

Map Output Format

index=<row_index>|{key1: value1, key2: value2, ..., 'new_field1': val, 'new_field2': val}<endofrow>

Filter Output Format

index=<row_index>|{key1: value1, key2: value2, ..., '__filter__': True/False}<endofrow>

Parsing Rules

| Rule | Description |
| --- | --- |
| String values | Must be enclosed in double quotes |
| Missing values | Set to None or null |
| Index | Must match row index exactly |
| Format | Must be valid Python literal (dictionary) |

Sources: datatune/core/dask/map_dask.py:20-30

Usage Examples

Example 1: Category Extraction

import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("products.csv")

# Extract categories using natural language
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

# Save results
result = dt.finalize(mapped)
result.compute().to_csv("categorized_products.csv")

Sources: README.md

Example 2: Text-Based Filtering

import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
df = dd.read_csv("companies.csv")

# Filter based on semantic criteria
filtered = dt.filter(
    prompt="Keep only organizations in Africa",
    input_fields=["Country", "Description"]
)(llm, df)

result = dt.finalize(filtered)

Example 3: With Deduplication

# Assuming clusters from semantic deduplication step
clusters = [
    {"canonical_id": 5, "duplicate_ids": [12, 23, 45]},
    {"canonical_id": 10, "duplicate_ids": [15, 20]}
]

mapped = dt.map(
    prompt="Classify the industry sector",
    output_fields=["Industry", "Sector"],
    input_fields=["CompanyName", "Description"],
    clusters=clusters
)(llm, df)

Internal Data Flow

Serialization Process

The backend converts DataFrame rows to dictionary format for LLM processing:

graph LR
    A[DataFrame Row] --> B[Convert to Dict]
    B --> C[Replace NaN with None]
    C --> D["String: {col1: val1, col2: val2}"]
    D --> E[LLM Prompt]

Implementation from map_dask.py:

df = df.astype(object).where(df.notna(), None)
df[serialized_input_column] = [
    str(row.to_dict()) for _, row in df.iterrows()
]

Sources: datatune/core/dask/map_dask.py:50-56
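
The same serialization idea can be shown on a single row without pandas. In this sketch, `serialize_row` is a hypothetical helper that replaces float NaN values with None and renders the row as a Python dictionary string, which can then be read back with `ast.literal_eval`.

```python
import ast
import math
from typing import Any, Dict


def serialize_row(row: Dict[str, Any]) -> str:
    """Hypothetical helper: replace NaN with None, then render as a dict string."""
    cleaned = {
        key: None if isinstance(value, float) and math.isnan(value) else value
        for key, value in row.items()
    }
    return str(cleaned)


serialized = serialize_row({"Name": "Acme Corp", "Revenue": float("nan")})
restored = ast.literal_eval(serialized)  # round-trips because None is a valid literal
```

Replacing NaN matters because the string `nan` is not a valid Python literal, so a row serialized with NaN left in place could not be parsed back by `ast.literal_eval`.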

Error Handling

The backend includes robust error handling for LLM parsing failures:

def parse_llm_output(llm_output: Union[str, Exception]) -> Union[Dict, Exception]:
    """
    Parses the LLM output string into a Python dictionary.
    """
    # Returns dict on success, Exception on failure

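A failed parse does not raise: per the return annotation above, parse_llm_output hands back the Exception object in place of a dictionary, so one malformed response does not abort the whole pipeline. The Ibis result-processing code takes a different route and substitutes an empty dictionary string, "{}".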

Backend Dispatch Mechanism

The system automatically dispatches to the Dask backend when a Dask DataFrame is detected:

graph TD
    A[Input Data] --> B{Is Dask DataFrame?}
    B -->|Yes| C[Dask Backend]
    B -->|No| D{Is Ibis Table?}
    D -->|Yes| E[Ibis Backend]
    D -->|No| F[TypeError]
    
    C --> G[_map_dask / _filter_dask]
    E --> H[_map_ibis / _filter_ibis]

Dispatch logic from core/map.py:

def _is_dask_df(obj):
    try:
        import dask.dataframe as dd
        return isinstance(obj, dd.DataFrame)
    except ImportError:
        return False

Sources: datatune/core/map.py:1-17

Configuration Considerations

Memory Efficiency

The Dask backend processes data partition by partition, making it suitable for datasets larger than memory. Consider:

| Setting | Recommendation |
| --- | --- |
| Dask partitions | Set based on available memory |
| Batch size | Adjust based on LLM rate limits |
| Clusters | Use for high-duplicate datasets |

LLM Rate Limits

The backend implements batching to respect LLM API rate limits while maximizing throughput.

Summary

The Dask Backend provides a powerful, scalable mechanism for performing LLM-powered data transformations on Dask DataFrames. Key capabilities include:

  • Map: Generate new columns through semantic understanding
  • Filter: Remove rows based on natural language criteria
  • Deduplication: Reduce costs by processing only canonical rows
  • Lazy Evaluation: Maintain Dask's efficient computation model until finalization

All operations follow a consistent pattern: serialize data, send to LLM with clear instructions, parse responses, and update the DataFrame.

Sources: datatune/core/dask/map_dask.py:34-48
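
The cost saving from deduplication can be sketched with exact-match grouping. This is an assumption made for illustration: datatune's clustering may group rows differently, and `map_with_dedup` and `fake_llm` are hypothetical names.

```python
from typing import Callable, Dict, List

def map_with_dedup(rows: List[str], llm: Callable[[List[str]], List[str]]) -> List[str]:
    """Send each distinct row to the LLM once, then fan results back out."""
    canonical: Dict[str, int] = {}
    for row in rows:
        canonical.setdefault(row, len(canonical))  # first occurrence gets the next slot
    unique_rows = list(canonical)                  # dicts preserve insertion order
    unique_outputs = llm(unique_rows)
    return [unique_outputs[canonical[row]] for row in rows]

calls = []

def fake_llm(batch: List[str]) -> List[str]:
    # Stand-in for a real model: records batch size and upper-cases each row.
    calls.append(len(batch))
    return [text.upper() for text in batch]

result = map_with_dedup(["a", "b", "a", "a"], fake_llm)
```

Here four input rows cost only two LLM evaluations, which is why the approach pays off on datasets with many repeated rows.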

Ibis Backend

Related topics: Dask Backend, System Architecture

Overview

The Ibis Backend in datatune provides integration with the Ibis project, enabling LLM-powered data transformations on various database backends including DuckDB, PostgreSQL, BigQuery, and more. This backend extends datatune's core functionality (map, filter) to work with Ibis tables, leveraging the composability and SQL-generation capabilities of Ibis while incorporating LLM-based semantic transformations.

The Ibis Backend serves as a critical abstraction layer that:

  • Accepts Ibis table objects as input
  • Serializes table data for LLM consumption
  • Executes batch inference operations via LLM calls
  • Parses and applies LLM outputs back to the table structure
  • Returns transformed Ibis tables for continued processing

Sources: datatune/core/ibis/filter_ibis.py:1-20, datatune/core/ibis/map_ibis.py:1-25

Architecture

System Components

The Ibis Backend consists of several interconnected components that work together to provide seamless LLM-powered transformations:

graph TD
    A[Ibis Table Input] --> B[add_serialized_col]
    B --> C[llm_batch_inference]
    C --> D[LLM API Call]
    D --> E[Parse LLM Output]
    E --> F[apply_llm_updates / Filter Logic]
    F --> G[Transformed Ibis Table]
    
    H[_map_ibis Class] --> I[Map Operations]
    J[_filter_ibis Class] --> K[Filter Operations]

Component Responsibilities

| Component | File | Purpose |
|---|---|---|
| add_serialized_col | filter_ibis.py, map_ibis.py | Serializes Ibis table columns into JSON strings for LLM input |
| llm_batch_inference | filter_ibis.py, map_ibis.py | Manages batch inference with LLM, handles retries and error cases |
| _map_ibis | map_ibis.py | Handles mapping operations (creating new columns via LLM) |
| _filter_ibis | filter_ibis.py | Handles filtering operations (row selection via LLM) |

Sources: datatune/core/ibis/filter_ibis.py:11-32, datatune/core/ibis/map_ibis.py:12-38

Core Functions

add_serialized_col

The add_serialized_col function transforms Ibis table columns into a serialized JSON format suitable for LLM processing. This function is fundamental to both map and filter operations.

Function Signature:

def add_serialized_col(table, target_col: str, input_fields: Optional[List[str]] = []):

Parameters:

| Parameter | Type | Description |
|---|---|---|
| table | ibis.Table | The input Ibis table |
| target_col | str | Name of the column to store serialized data |
| input_fields | Optional[List[str]] | List of column names to include in serialization. If empty, includes all columns except target |

Behavior:

  1. Filters columns to include only relevant fields (excluding target column and non-selected fields)
  2. Constructs a JSON string expression by concatenating column names and values
  3. Returns a new Ibis table with the serialized column added via mutate()

Sources: datatune/core/ibis/map_ibis.py:40-53, datatune/core/ibis/filter_ibis.py:37-51
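
The field-selection rule in step 1 and the JSON output of step 2 can be mimicked on plain Python data. This is a sketch only: the real function builds an Ibis string expression, and `select_fields` and `serialize_record` are hypothetical helpers.

```python
import json
from typing import Dict, List, Optional

def select_fields(
    columns: List[str], target_col: str, input_fields: Optional[List[str]] = None
) -> List[str]:
    """Keep requested columns; an empty request means all columns except the target."""
    wanted = input_fields or []
    return [c for c in columns if c != target_col and (not wanted or c in wanted)]

def serialize_record(record: Dict[str, object], fields: List[str]) -> str:
    """Render only the selected fields as a JSON string."""
    return json.dumps({f: record[f] for f in fields})

columns = ["name", "description", "price", "SERIALIZED"]
fields = select_fields(columns, "SERIALIZED", ["name", "price"])
text = serialize_record({"name": "Cable", "description": "USB-C", "price": 9.5}, fields)
all_fields = select_fields(columns, "SERIALIZED")
```

Passing a short `input_fields` list keeps prompts small, since unselected columns never reach the serialized string.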

llm_batch_inference

The llm_batch_inference function manages the core LLM interaction for transforming data. It handles the complete workflow from prompt construction to result collection.

Function Signature:

def llm_batch_inference(
    table,
    llm: Callable, 
    input_col: str, 
    output_col: str,   
    prompt: str,
    expected_new_fields: list[str] = []
):

Parameters:

| Parameter | Type | Description |
|---|---|---|
| table | ibis.Table | The input Ibis table |
| llm | Callable | LLM callable (e.g., OpenAI, Ollama instance) |
| input_col | str | Column containing serialized input data |
| output_col | str | Column name for storing LLM output |
| prompt | str | Transformation prompt for the LLM |
| expected_new_fields | list[str] | Optional list of expected output field names |

Workflow:

sequenceDiagram
    participant Table as Ibis Table
    participant Func as llm_batch_inference
    participant LLM as LLM API
    participant Parse as Output Parser
    
    Table->>Func: Execute table.select() to local DataFrame
    Func->>Table: Get input_list from input_col
    Func->>LLM: Call llm(input_list, prefix, prompt, suffix)
    LLM-->>Func: Raw LLM responses
    Func->>Parse: Process each response
    Parse-->>Func: Validated JSON strings
    Func->>Table: Create mapping DataFrame
    Table-->>Func: Updated table with output_col

Sources: datatune/core/ibis/map_ibis.py:55-82, datatune/core/ibis/filter_ibis.py:54-78
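
The workflow in the sequence diagram can be sketched with a list of dicts standing in for the materialized table. The call shape `llm(input_list, prefix, prompt, suffix)` follows the diagram; `batch_inference_sketch`, `echo_llm`, and the prefix and suffix strings are assumptions.

```python
from typing import Callable, Dict, List

def batch_inference_sketch(
    rows: List[Dict[str, str]],
    llm: Callable[..., List[str]],
    input_col: str,
    output_col: str,
    prompt: str,
) -> List[Dict[str, str]]:
    """Collect inputs, call the LLM once, and attach each response to its row."""
    input_list = [row[input_col] for row in rows]
    prefix = "Row data:"                                      # placeholder wording
    suffix = "Answer with index=<row_index>|{...}<endofrow>"  # placeholder wording
    responses = llm(input_list, prefix, prompt, suffix)
    return [{**row, output_col: resp} for row, resp in zip(rows, responses)]

def echo_llm(input_list, prefix, prompt, suffix):
    # Stand-in model: echoes each input in the required output envelope.
    return [f"index={i}|{text}<endofrow>" for i, text in enumerate(input_list)]

rows = [{"IN": '{"a": 1}'}, {"IN": '{"b": 2}'}]
out = batch_inference_sketch(rows, echo_llm, "IN", "OUT", "Echo the row")
```

Because results are matched to rows by position, the stand-in model must return exactly one response per input, in order.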

Map Operations

_map_ibis Class

The _map_ibis class implements the mapping transformation for Ibis tables, enabling the creation of new columns based on LLM-driven transformations.

Class Definition:

class _map_ibis:
    def __init__(
        self,
        prompt: str,
        input_fields: Optional[List[str]] = None,
        output_fields: Optional[List[str]] = None,
    ):
        self.prompt = prompt
        self.input_fields = input_fields or []
        self.output_fields = output_fields or []
        self.llm_output_column = "MAP_LLM_OUTPUT__DATATUNE__"
        self.serialized_input_column = "MAP_SERIALIZED_INPUT__DATATUNE__"

Key Attributes:

| Attribute | Type | Default | Description |
|---|---|---|---|
| prompt | str | Required | Natural language prompt describing the transformation |
| input_fields | Optional[List[str]] | [] | Columns to use as input for transformation |
| output_fields | Optional[List[str]] | [] | Names of new columns to create |
| llm_output_column | str | System-generated | Internal column for LLM response storage |
| serialized_input_column | str | System-generated | Internal column for serialized input |

Execution Flow:

  1. Validates that all input_fields exist in the table schema
  2. Creates serialized input column via add_serialized_col
  3. Executes batch inference via llm_batch_inference
  4. Applies LLM updates to create output columns
  5. Selects final columns (removing internal columns)

Sources: datatune/core/ibis/map_ibis.py:122-160

Prompt Format for Map

The LLM receives structured prompts for mapping operations:

Map and transform the input according to the mapping criteria below.
Replace or Create new fields or values as per the prompt.
Expected new fields: [output_fields]

MAPPING CRITERIA:
{prompt}

Your response MUST be the entire input record as a valid Python dictionary
in the format 'index=<row_index>|{key1: value1, key2: value2, ...}' with added keys
of expected new fields if any.

Sources: datatune/core/ibis/map_ibis.py:56-75

Filter Operations

_filter_ibis Class

The _filter_ibis class implements filtering transformations for Ibis tables, enabling row-level selection based on LLM-driven criteria.

Class Definition:

class _filter_ibis:
    def __init__(
        self,
        prompt: str,
        input_fields: Optional[List[str]] = None,
    ):
        self.prompt = prompt
        self.input_fields = input_fields or []
        self.llm_output_column = "FILTER_LLM_OUTPUT__DATATUNE__"
        self.serialized_input_column = "FILTER_SERIALIZED_INPUT__DATATUNE__"

Key Attributes:

| Attribute | Type | Default | Description |
|---|---|---|---|
| prompt | str | Required | Natural language criteria for filtering |
| input_fields | Optional[List[str]] | [] | Columns to consider for filtering decision |
| llm_output_column | str | System-generated | Internal column for LLM response storage |
| serialized_input_column | str | System-generated | Internal column for serialized input |

Execution Flow:

graph TD
    A[Input Ibis Table] --> B[Add Serialized Column]
    B --> C[Execute to Local DataFrame]
    C --> D[LLM Batch Inference]
    D --> E[Parse Filter Decisions]
    E --> F[Extract __filter__ Flag]
    F --> G[Apply Boolean Filter]
    G --> H[Final Ibis Table]

Sources: datatune/core/ibis/filter_ibis.py:80-120

Prompt Format for Filter

The LLM receives structured prompts for filtering operations:

You are filtering a dataset. Your task is to determine whether each data record
should be KEPT or REMOVED based on the filtering criteria below.
Return the entire input data record with an added key called '__filter__' with
value either True to KEEP the record or False to REMOVE it.

FILTERING CRITERIA:
{prompt}

Your response MUST be the entire input record as a Python dictionary in the format:
index=<row_index>|{key1: value1, key2: value2, ...}<endofrow> with added key called
'__filter__' with value either True to KEEP the record or False to REMOVE it.

Sources: datatune/core/ibis/filter_ibis.py:56-68

Output Parsing

parse_filter_output

The parse_filter_output function extracts boolean filter decisions from LLM responses.

Function Signature:

def parse_filter_output(
    output: Union[str, Exception], err: bool = True
) -> Optional[bool]:

Parsing Logic:

  1. Attempts to evaluate the LLM output as a Python literal using ast.literal_eval
  2. Extracts the last key-value pair from the dictionary
  3. Returns the boolean value associated with the __filter__ key
  4. Returns None if parsing fails

Sources: datatune/core/ibis/filter_ibis.py:104-130
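
The parsing steps above can be sketched as follows. This is an assumed reimplementation for illustration: it looks up the `__filter__` key directly instead of taking the last key-value pair, and `parse_filter_sketch` is a hypothetical name.

```python
import ast
from typing import Optional, Union

def parse_filter_sketch(output: Union[str, Exception]) -> Optional[bool]:
    """Return the __filter__ decision, or None when the output is unusable."""
    if isinstance(output, Exception):
        return None
    try:
        # Drop the "index=<n>|" prefix and the row terminator, then parse.
        body = output.split("|", 1)[-1].replace("<endofrow>", "").strip()
        record = ast.literal_eval(body)
        value = record["__filter__"]
    except (ValueError, SyntaxError, KeyError, TypeError):
        return None
    return value if isinstance(value, bool) else None

keep = parse_filter_sketch('index=0|{"name": "TV", "__filter__": True}<endofrow>')
drop = parse_filter_sketch('index=1|{"name": "Sofa", "__filter__": False}<endofrow>')
bad = parse_filter_sketch("garbage")
```

Returning None rather than False for unparseable output lets the caller decide whether an undecided row should be kept or dropped.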

Output Format Requirements

The Ibis Backend enforces strict output format requirements for LLM responses:

| Requirement | Format | Example |
|---|---|---|
| Index prefix | `index=<row_index>\|` | `index=0\|` |
| Data format | Python dictionary literal | `{"col1": "value1", "col2": "value2"}` |
| String quoting | Double quotes only | `"string"`, not `'string'` |
| Null values | `None` | `{"col": None}` |
| Row terminator | `<endofrow>` | `index=0\|{...}<endofrow>` |

Sources: datatune/core/ibis/map_ibis.py:70-75, datatune/core/ibis/filter_ibis.py:64-67

Usage Examples

Basic Map Operation

import datatune as dt
from datatune.llm.llm import OpenAI
import ibis

# Connect to DuckDB
con = ibis.duckdb.connect("data.duckdb")
table = con.table("products")

# Initialize LLM
llm = OpenAI(model_name="gpt-3.5-turbo")

# Create mapping operation
mapped = dt.map(
    prompt="Extract product category and subcategory from description",
    input_fields=["description", "name"],
    output_fields=["Category", "Subcategory"]
)(llm, table)

Basic Filter Operation

import datatune as dt
from datatune.llm.llm import Ollama
import ibis

# Connect to PostgreSQL via Ibis
con = ibis.postgres.connect(...)
table = con.table("orders")

# Initialize local LLM
llm = Ollama(model_name="gemma3:4b")

# Create filter operation
filtered = dt.filter(
    prompt="Keep only orders from enterprise customers with value over $10,000",
    input_fields=["customer_type", "order_value"]
)(llm, table)

Error Handling

Schema Validation

The Ibis Backend performs input schema validation before processing:

def __call__(self, llm: Callable, table: Table) -> Table:
    if self.input_fields:
        missing = [f for f in self.input_fields if f not in table.columns]
        if missing:
            error_msg = (
                f"[datatune] Schema mismatch: The following input_fields were not found: {missing}. "
                f"Available columns: {list(table.columns)}"
            )
            raise ValueError(error_msg)

Validation Rules:

| Check | Condition | Action on Failure |
|---|---|---|
| Input fields exist | input_fields subset of table.columns | Raise ValueError |
| LLM call success | No exception from llm() | Return error string |
| Output parsing | Valid Python literal | Return empty dict {} |

Sources: datatune/core/ibis/map_ibis.py:130-140

Internal Column Naming

The Ibis Backend uses prefixed column names to avoid conflicts with user data:

| Operation | Serialized Column | Output Column |
|---|---|---|
| Map | MAP_SERIALIZED_INPUT__DATATUNE__ | MAP_LLM_OUTPUT__DATATUNE__ |
| Filter | FILTER_SERIALIZED_INPUT__DATATUNE__ | FILTER_LLM_OUTPUT__DATATUNE__ |

These columns are automatically removed from the final output table, ensuring clean results.

Integration with Core API

The Ibis Backend is invoked through the unified datatune API via the filter.py module:

def filter(*, prompt, input_fields=None, clusters=None):
    def apply(llm, data):
        if _is_ibis_table(data):
            from .ibis.filter_ibis import _filter_ibis
            return _filter_ibis(
                prompt=prompt,
                input_fields=input_fields,
            )(llm, data)
        # ... other backends

Sources: datatune/core/filter.py:15-28

Supported Database Backends

The Ibis Backend supports any database backend compatible with Ibis:

| Backend | Connection Method | Typical Use |
|---|---|---|
| DuckDB | ibis.duckdb.connect() | Local analytical queries |
| PostgreSQL | ibis.postgres.connect() | Enterprise data |
| BigQuery | ibis.bigquery.connect() | Cloud data warehousing |
| SQLite | ibis.sqlite.connect() | Lightweight embedded DB |
| MySQL | ibis.mysql.connect() | Web applications |

Performance Considerations

Batch Processing

The Ibis Backend processes data in batches determined by:

  1. Local execution boundary: Data is materialized to a local DataFrame via .execute()
  2. LLM rate limits: Respects TPM (tokens per minute) and RPM (requests per minute) limits
  3. Memory constraints: Batch sizes are controlled by the underlying LLM integration

Optimization Strategies

| Strategy | Description | Applicable Operation |
|---|---|---|
| Select specific fields | Use input_fields to minimize data transfer | Map, Filter |
| Pushdown predicates | Filter data in the database before LLM processing | Filter |
| Column pruning | Remove unused columns early in the pipeline | Map |
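
The effect of pushing a predicate ahead of the LLM can be shown with plain Python lists. This is a sketch with a stand-in model; with a real Ibis table the cheap predicate would typically be a `table.filter(...)` call made before `dt.filter`. The names `cheap_prefilter` and `fake_llm_filter` are hypothetical.

```python
orders = [
    {"customer_type": "enterprise", "order_value": 25000},
    {"customer_type": "consumer", "order_value": 40},
    {"customer_type": "enterprise", "order_value": 900},
]

def cheap_prefilter(rows, min_value):
    """Numeric check that a database could evaluate without any LLM call."""
    return [r for r in rows if r["order_value"] > min_value]

sent_to_llm = []

def fake_llm_filter(rows):
    # Stand-in for the semantic check; records which rows it was asked about.
    sent_to_llm.extend(rows)
    return [r for r in rows if r["customer_type"] == "enterprise"]

kept = fake_llm_filter(cheap_prefilter(orders, 10_000))
```

Only one of the three rows reaches the model here, so the exact numeric condition is decided deterministically and the token cost drops accordingly.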

Sources: datatune/core/ibis/filter_ibis.py:1-20

LLM Integration

Related topics: Agent System, Map Operations, Filter Operations

The LLM Integration module in datatune provides a unified abstraction layer for interfacing with various Large Language Model providers. It handles rate limiting, token management, request batching, and provider-specific configurations, enabling consistent data transformation operations across different LLM backends.

Architecture Overview

The LLM module uses a class hierarchy where a base LLM class provides common functionality, while provider-specific subclasses handle individual vendor implementations.

graph TD
    A[LLM Base Class] --> B[OpenAI]
    A --> C[Ollama]
    A --> D[VLLM]
    A --> E[Azure]
    A --> F[Mistral]
    A --> G[Huggingface]
    A --> H[Gemini]
    
    I[litellm] --> A
    J[Rate Limits] --> A
    K[Token Counter] --> A

Core Components

LLM Base Class

The LLM class serves as the foundation for all provider implementations. It manages rate limiting, token counting, and prompt batching.

#### Initialization Parameters

| Parameter | Type | Description | Default |
|---|---|---|---|
| model_name | str | Full model identifier (provider/model format) | Required |
| rpm | int | Requests per minute limit | Provider default |
| tpm | int | Tokens per minute limit | Provider default |
| max_tokens | int | Maximum tokens in response | Provider default |
| temperature | float | Sampling temperature | 0.0 |

#### Key Methods

| Method | Description |
|---|---|
| _completion(prompt) | Execute a single completion request |
| _create_batched_prompts(input_rows, user_batch_prefix, prompt_per_row) | Create optimized batched prompts |
| get_max_tokens() | Retrieve max tokens for current model |

Sources: datatune/llm/llm.py:40-90

Provider Implementations

#### OpenAI

from datatune.llm.llm import OpenAI

llm = OpenAI(
    model_name="gpt-3.5-turbo",
    api_key="your-api-key"
)

Configures models in the openai/model-name format for LiteLLM compatibility.

Sources: datatune/llm/llm.py:95-102

#### Ollama (Local)

from datatune.llm.llm import Ollama

llm = Ollama(
    model_name="gemma3:4b",
    api_base="http://localhost:11434"
)

Enables local model inference with Ollama. The api_base defaults to http://localhost:11434.

Sources: datatune/llm/llm.py:74-82

#### VLLM

from datatune.llm.llm import VLLM

llm = VLLM(
    model_name="your-model",
    api_base="http://localhost:8000/v1"
)

Queries the /models endpoint to automatically determine max_model_len from the server.

Sources: datatune/llm/llm.py:104-122

#### Azure

from datatune.llm.llm import Azure

llm = Azure(
    model_name="gpt-3.5-turbo",
    api_key="your-key",
    api_base="your-endpoint",
    api_version="2024-01-01"
)

Supports Azure OpenAI Service deployments with additional configuration options.

#### Mistral

from datatune.llm.llm import Mistral

llm = Mistral(
    model_name="mistral-tiny",
    api_key="your-api-key"
)

Wraps Mistral AI models with automatic mistral/ prefix handling.

#### Huggingface

from datatune.llm.llm import Huggingface

llm = Huggingface(
    model_name="meta-llama/Llama-2-70b",
    api_key="your-api-key"
)

Provides access to Hugging Face Inference API models.

Rate Limiting

The module enforces two types of rate limits to prevent API throttling:

| Limit Type | Description | Applied At |
|---|---|---|
| TPM | Tokens per minute | Per-request token counting |
| RPM | Requests per minute | Per-API-call counting |

Model Rate Limits Configuration

Predefined limits for major models are stored in datatune/llm/model_rate_limits.py:

| Model | TPM | RPM |
|---|---|---|
| gpt-3.5-turbo | 200,000 | 500 |
| gpt-4 | 10,000 | 500 |
| gpt-4-turbo | 30,000 | 500 |
| gpt-4o | 30,000 | 500 |
| gpt-4.1-mini | 200,000 | 500 |
| gpt-4.1-nano | 200,000 | 500 |
| gpt-4.5-preview | 125,000 | 1,000 |

Sources: datatune/llm/model_rate_limits.py:1-90
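
How a combined TPM and RPM budget might be enforced can be sketched with a sliding 60-second window. This is a generic technique, not datatune's implementation; `MinuteBudget` is a hypothetical class, and the limits used are the gpt-4 row from the table above.

```python
import time
from collections import deque
from typing import Callable, Deque, Tuple

class MinuteBudget:
    """Sliding-window limiter that tracks requests and tokens over 60 seconds."""

    def __init__(self, rpm: int, tpm: int, clock: Callable[[], float] = time.monotonic):
        self.rpm, self.tpm, self.clock = rpm, tpm, clock
        self.events: Deque[Tuple[float, int]] = deque()  # (timestamp, tokens)

    def _prune(self) -> None:
        cutoff = self.clock() - 60.0
        while self.events and self.events[0][0] <= cutoff:
            self.events.popleft()

    def try_acquire(self, tokens: int) -> bool:
        """Record the request and return True only if both limits allow it."""
        self._prune()
        used_tokens = sum(t for _, t in self.events)
        if len(self.events) + 1 > self.rpm or used_tokens + tokens > self.tpm:
            return False
        self.events.append((self.clock(), tokens))
        return True

now = [0.0]  # fake clock so the example is deterministic
budget = MinuteBudget(rpm=500, tpm=10_000, clock=lambda: now[0])
first = budget.try_acquire(6_000)
second = budget.try_acquire(5_000)   # would exceed 10,000 tokens in the window
now[0] = 61.0                        # the first request has aged out
third = budget.try_acquire(5_000)
```

A caller that receives False would normally sleep and retry; injecting the clock keeps that logic testable without real waiting.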

Fallback Behavior

When an unknown model is used, the system falls back to gpt-3.5-turbo limits and logs a warning:

if self._base_model_name in model_rate_limits:
    model_limits = model_rate_limits[self._base_model_name]
else:
    model_limits = model_rate_limits[DEFAULT_MODEL]
    logger.warning(
        f"REQUESTS-PER-MINUTE limits for model '{model_name}' not found. "
        f"Defaulting to '{DEFAULT_MODEL}' limits"
    )

Sources: datatune/llm/llm.py:45-57

Prompt Formatting

The LLM module enforces strict output format requirements for parsing reliability.

Output Format Specification

All responses must follow this pattern:

index=<row_index>|{python_literal}<endofrow>

Supported Python literals:

  • Lists: index=0|['item1', 'item2', 'item3']<endofrow>
  • Dicts: index=1|{'key1': 'value1', 'key2': 2}<endofrow>
  • Strings: index=2|"This is a string answer"<endofrow>
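
A response in this format can be split back into rows with a regular expression and `ast.literal_eval`. This is a minimal sketch of one possible parser, not datatune's own; `ROW_PATTERN` and `split_rows` are hypothetical names.

```python
import ast
import re
from typing import Any, Dict

# Capture the row index and everything up to the first terminator.
ROW_PATTERN = re.compile(r"index=(\d+)\|(.*?)<endofrow>", re.DOTALL)

def split_rows(raw: str) -> Dict[int, Any]:
    """Map each row index to its parsed literal, skipping malformed rows."""
    parsed: Dict[int, Any] = {}
    for index, literal in ROW_PATTERN.findall(raw):
        try:
            parsed[int(index)] = ast.literal_eval(literal.strip())
        except (ValueError, SyntaxError):
            continue  # leave unparseable rows out instead of failing the batch
    return parsed

raw = (
    "index=0|['item1', 'item2', 'item3']<endofrow>\n"
    "index=1|{'key1': 'value1', 'key2': 2}<endofrow>\n"
    'index=2|"This is a string answer"<endofrow>\n'
    "index=3|{broken<endofrow>"
)
rows = split_rows(raw)
```

Keying results by the echoed index, rather than by position, means a skipped or reordered row does not shift the answers of its neighbors.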

Prompt Components

| Component | Purpose |
|---|---|
| prefix | Instructions for row processing |
| user_batch_prefix | Custom user instructions |
| suffix | Output format requirements |

Sources: datatune/llm/llm.py:1-20

Batch Processing

The batching system optimizes LLM usage by grouping multiple rows into single API requests while respecting token limits.

graph LR
    A[Input Rows] --> B[Token Counting]
    B --> C{Within Limits?}
    C -->|Yes| D[Batch Request]
    C -->|No| E[Split Batch]
    D --> F[LLM API]
    E --> D
    F --> G[Parse Output]
    G --> H[Row Results]

Batching Logic

  1. Calculate prefix/suffix token overhead using token_counter
  2. Iterate through input rows, accumulating tokens
  3. Split batches when approaching max_tokens limit
  4. Track nrows_per_api_call for output reconstruction

Sources: datatune/llm/llm.py:30-70
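
The batching logic above can be sketched as a greedy packer. This is an illustration under stated assumptions: `count_tokens` is a crude whitespace counter standing in for litellm's `token_counter`, and `create_batches` is a hypothetical function, not `_create_batched_prompts`.

```python
from typing import Callable, List, Tuple

def count_tokens(text: str) -> int:
    """Crude stand-in for a real tokenizer: one token per whitespace-separated word."""
    return len(text.split())

def create_batches(
    rows: List[str],
    prefix: str,
    suffix: str,
    max_tokens: int,
    counter: Callable[[str], int] = count_tokens,
) -> Tuple[List[str], List[int]]:
    overhead = counter(prefix) + counter(suffix)   # step 1: fixed cost per request
    prompts: List[str] = []
    nrows_per_api_call: List[int] = []
    current: List[str] = []
    used = overhead
    for i, row in enumerate(rows):                 # step 2: accumulate tokens
        line = f"index={i}|{row}"
        cost = counter(line)
        if current and used + cost > max_tokens:   # step 3: split the batch
            prompts.append("\n".join([prefix, *current, suffix]))
            nrows_per_api_call.append(len(current))
            current, used = [], overhead
        current.append(line)
        used += cost
    if current:
        prompts.append("\n".join([prefix, *current, suffix]))
        nrows_per_api_call.append(len(current))    # step 4: remember rows per call
    return prompts, nrows_per_api_call

prompts, nrows = create_batches(
    ["a b c", "d e f", "g h i", "j k l"], "Rows:", "Reply per row", max_tokens=10
)
```

In this sketch a single row that exceeds the budget on its own still gets its own request; a production version would need to truncate or reject such rows.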

Integration with Core Modules

Filter Operations

The filter module uses the LLM to evaluate row-level boolean conditions:

def filter_rows(llm, df, input_column, prompt, output_column):
    prefix = "Row data:"
    suffix = "Your response MUST be... with added key '__filter__'"
    
    llm_out = llm(
        input_series,
        prefix,
        prompt,
        suffix,
        optimized=True
    )

Sources: datatune/core/dask/filter_dask.py:15-40

Map Operations

The map module leverages the LLM for column transformations:

def map_rows(llm, df, input_columns, prompt, output_columns):
    suffix = "Your response MUST be... with added keys of expected new fields"
    
    llm_out = llm(
        canonical_input,
        prefix,
        prompt,
        suffix,
        optimized=True
    )

Sources: datatune/core/dask/map_dask.py:20-45

Usage Example

import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

# Initialize LLM
llm = OpenAI(model_name="gpt-3.5-turbo")

# Use with datatune operations
df = dd.read_csv("products.csv")

mapped = dt.map(
    prompt="Extract categories from description",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description"]
)(llm, df)

filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Category"]
)(llm, mapped)

Module Exports

The datatune.llm package exports all provider classes:

from datatune.llm import *

# Available: LLM, OpenAI, Ollama, VLLM, Azure, Mistral, Huggingface

Sources: datatune/llm/__init__.py:1

Dependencies

| Dependency | Purpose |
|---|---|
| litellm | Unified LLM API interface |
| token_counter (imported from litellm) | Token estimation |
| get_max_tokens (imported from litellm) | Model context limits |

Sources: datatune/llm/llm.py:10-11

Sources: datatune/llm/llm.py:40-90

Agent System

Related topics: LLM Integration, Map Operations, Filter Operations

The Agent System is the core intelligent orchestration layer in datatune that automatically determines, plans, and executes data transformation workflows based on natural language prompts. It abstracts the complexity of selecting between Dask operations and LLM-powered primitives (Map, Filter), allowing users to describe desired transformations in plain English while the system handles the implementation details.

Architecture Overview

The Agent System follows a planner-executor pattern where the LLM acts as the planner, generating a JSON-based execution plan, and the Agent class handles the runtime execution of that plan against Dask DataFrames.

graph TD
    A[User Natural Language Goal] --> B[Agent.do method]
    B --> C[LLM Planner]
    C --> D[JSON Execution Plan]
    D --> E[Agent._execute_plan]
    E --> F{Step Type}
    F -->|dask| G[Dask Operations]
    F -->|primitive| H[LLM Primitives<br/>Map/Filter]
    G --> I[Runtime Execution]
    H --> I
    I --> J[Result DataFrame]

Sources: datatune/agent/agent.py:1-50

Core Components

Agent Base Class

The Agent class serves as the abstract base for all agent implementations and provides the foundational system prompt that defines the agent's capabilities.

Sources: datatune/agent/__init__.py:1-50

| Component | Description |
|---|---|
| system_prompt | Defines available libraries (pandas, numpy, dask, datatune) and example usage patterns |
| ABC | Abstract base class pattern ensuring implementers provide core methods |
| abstractmethod | Decorator marking do method as required implementation |

from abc import ABC, abstractmethod

class Agent(ABC):
    system_prompt: str = """You are Datatune Agent, a powerful assistant designed to help users with data processing tasks.
    You are capable of generating python code to perform various operations on data. Apart from python builtins, you have the following libraries avaiable in your run time:
    - pandas
    - numpy
    - dask

    In addition to these, you also have access to the datatune libarary, which provides functionality for processing data using LLMs.
    ..."""

Agent Implementation

The concrete Agent class in datatune/agent/agent.py implements the planner-executor logic.

Sources: datatune/agent/agent.py:1-100

| Attribute | Type | Purpose |
|---|---|---|
| llm | LLM | Language model used for planning and primitive operations |
| history | List[Dict] | Execution history for debugging and state tracking |
| verbose | bool | Enable debug-level logging when True |
| TEMPLATE | dict | Operation templates for code generation |

def __init__(self, llm: LLM, verbose: bool = False):
    self.llm = llm
    self.history: List[Dict[str, Any]] = []
    self.verbose = verbose
    if self.verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)

Plan Structure

The Agent uses a JSON-based plan format where each step represents a single transformation operation. Plans are generated by the LLM based on user goals and the current DataFrame schema.

Sources: datatune/agent/agent.py:100-150

Plan Step Schema

| Field | Type | Required | Description |
|---|---|---|---|
| type | string | Yes | Either "dask" or "primitive" |
| operation | string | Yes | Operation name within the step type |
| params | dict | No | Parameters for Dask templates |
| subprompt | string | No | LLM prompt for primitive operations |
| input_fields | list | No | Input column names for the operation |
| output_fields | list | No | Output column names (Map only) |
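
A plan that follows this schema, together with a small validity check, might look like the sketch below. The concrete `params` keys, the operation spellings, and the `validate_plan` helper are assumptions for illustration; only the field names come from the table.

```python
import json
from typing import Any, Dict, List

REQUIRED = {"type", "operation"}
STEP_TYPES = {"dask", "primitive"}

def validate_plan(plan: List[Dict[str, Any]]) -> List[str]:
    """Return a list of human-readable problems; an empty list means valid."""
    problems: List[str] = []
    for i, step in enumerate(plan, start=1):
        missing = REQUIRED - set(step)
        if missing:
            problems.append(f"step {i}: missing {sorted(missing)}")
        elif step["type"] not in STEP_TYPES:
            problems.append(f"step {i}: unknown type {step['type']!r}")
    return problems

# Hypothetical two-step plan: one Dask step, then one LLM primitive.
plan = json.loads("""
[
  {"type": "dask", "operation": "add_column",
   "params": {"new_column": "ProfitMargin", "expression": "df['Profit'] / df['Revenue']"}},
  {"type": "primitive", "operation": "Filter",
   "subprompt": "Keep only African organizations", "input_fields": ["Country"]}
]
""")
problems = validate_plan(plan)
```

Checking the plan before execution gives the planner a precise message to correct, which is cheaper than discovering the mistake midway through a run.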

Step Types

Dask Operations run Python code generated from the Agent's operation templates, and the resulting work becomes part of Dask's task graph:

| Operation | Description |
|---|---|
| add_column | Create a new column from an expression |
| apply_function | Apply an element-wise function to a column |
| rename_columns | Rename columns using a mapping |
| astype_column | Change a column's data type |
| group_by | Group data and aggregate |

Sources: datatune/agent/agent.py:50-100

Primitive Operations use the LLM for row-level transformations:

| Operation | Description | Use Case |
|---|---|---|
| Map | Create new columns via LLM transformation | Semantic extraction, classification, interpretation |
| Filter | Remove rows based on LLM criteria | Complex conditional filtering |

Sources: datatune/core/dask/map_dask.py:1-50, datatune/core/dask/filter_dask.py:1-50

Plan Execution Flow

graph TD
    A[_execute_plan called] --> B[Set DataFrame in runtime]
    B --> C[Initialize code_lines list]
    C --> D{For each step in plan}
    D -->|Has steps| E[Format step template]
    E --> F[Log start message]
    F --> G[Execute map_partitions with log_primitive]
    G --> H[Increment step_num]
    H --> I[Execute operation template]
    I --> J[Log end message]
    J --> D
    D -->|No more steps| K[Call dt.finalize]
    K --> L[Compute DataFrame]
    L --> M[Return None, n_steps on success]
    
    N[Exception during execution] --> O[Return error, failed_step]

Sources: datatune/agent/agent.py:150-200

Execution Methods

#### _execute_plan(plan: List[Dict])

Executes a complete multi-step plan sequentially.

def _execute_plan(self, plan: List[Dict]):
    """Execute a sequence of plan steps (Dask or primitive)."""
    
    self._set_df(self.df)
    self.runtime.update({"step_num": 0, "plan": plan})
    code_lines = []
    n_steps = len(plan)
    
    for i, step in enumerate(plan, start=1):
        # ... step formatting and execution ...
    
    # Final execution
    full_code = (
        "\n".join(code_lines)
        + "\n\n"
        + "df = dt.finalize(df)\n"
        "df = df.compute()"
    )
    self.runtime.execute(full_code)
    return None, n_steps

#### _execute_step(step: Dict)

Executes a single step with detailed error handling.

def _execute_step(self, step: Dict):
    try:
        if step["type"] == "dask":
            template = self.TEMPLATE["dask"][step["operation"]].format(**step["params"])
            self.runtime.execute(template + "\n_ = df.head()")
        elif step["type"] == "primitive":
            template = self.TEMPLATE["primitive"][step["operation"]].format(**step["params"])
            self.runtime.execute(template)
        else:
            raise ValueError(f"Unknown step type: {step['type']}")
    except Exception as e:
        error_msg = f"{type(e).__name__}: {str(e)}\n{traceback.format_exc()}"
        return error_msg

Sources: datatune/agent/agent.py:200-250

LLM Integration

The Agent integrates with the LLM system through datatune/llm/llm.py to generate execution plans and power primitive operations.

Sources: datatune/llm/llm.py:1-50

Supported LLM Providers

| Provider | Class | Default Model |
|---|---|---|
| OpenAI | OpenAI | gpt-3.5-turbo |
| Ollama | Ollama | gemma3:4b |
| Azure | Azure | Configurable |
| vLLM | VLLM | Configurable |

class OpenAI(LLM):
    def __init__(
        self, model_name: str = "gpt-3.5-turbo", api_key: Optional[str] = None, **kwargs
    ):
        kwargs.update({"api_key": api_key})
        super().__init__(model_name=f"openai/{model_name}", **kwargs)

class Ollama(LLM):
    def __init__(
        self, model_name="gemma3:4b", api_base="http://localhost:11434", **kwargs
    ) -> None:
        super().__init__(
            model_name=f"ollama_chat/{model_name}", api_base=api_base, **kwargs
        )

Rate Limiting

The system includes model rate limits configuration to prevent API throttling:

| Model | TPM | RPM |
|---|---|---|
| gpt-3.5-turbo | 200,000 | 500 |
| gpt-4-turbo | 30,000 | 500 |
| gpt-4o | 30,000 | 500 |

Sources: datatune/llm/model_rate_limits.py:1-50

Prompt Generation

The Agent constructs prompts dynamically based on the current state and goal.

Sources: datatune/agent/agent.py:250-300

Prompt Components

| Method | Purpose |
|---|---|
| get_persona_prompt(goal) | Generates the system instructions for plan generation |
| get_schema_prompt(df) | Includes current DataFrame schema |
| get_error_prompt(error_msg, failed_step) | Context for retry after failures |
| get_full_prompt(df, goal) | Combines all components |

def get_schema_prompt(self, df: dd.DataFrame) -> str:
    schema_prompt: str = f"""The current schema of the dataframe df is as follows:
    {df.dtypes.to_string()}
    """
    return schema_prompt

def get_error_prompt(self, error_msg: str, failed_step: Dict) -> str:
    error_prompt: str = f"""
    The previous code execution failed with the following error:
    Error: {error_msg}
    
    Failed step:
    {failed_step}
    
    Provide the json plan with the corrected step. Make sure to:
    1. Address the specific error mentioned above.
    2. DO NOT include any explanations or comments.
    """
    return error_prompt
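
How the error prompt feeds a retry can be sketched as a plan-execute-replan loop. This is an assumed control flow, not the body of `Agent.do`; `run_with_retries`, `fake_planner`, and `fake_execute` are hypothetical, and the `(error, step_index)` return shape follows the `_execute_plan` description earlier in this section.

```python
from typing import Callable, Dict, List, Optional, Tuple

Plan = List[Dict]

def run_with_retries(
    goal: str,
    planner: Callable[[str], Plan],
    execute: Callable[[Plan], Tuple[Optional[str], int]],
    max_attempts: int = 3,
) -> Tuple[bool, int]:
    """Replan with error context until execution succeeds or attempts run out."""
    prompt = goal
    for attempt in range(1, max_attempts + 1):
        plan = planner(prompt)
        error, step_idx = execute(plan)
        if error is None:
            return True, attempt
        failed_step = plan[step_idx]
        prompt = (
            f"{goal}\nThe previous code execution failed with the following error:\n"
            f"Error: {error}\nFailed step:\n{failed_step}"
        )
    return False, max_attempts

def fake_planner(prompt: str) -> Plan:
    # Stand-in LLM: misspells the operation until it sees error feedback.
    op = "rename_columns" if "failed" in prompt else "renam_columns"
    return [{"type": "dask", "operation": op}]

def fake_execute(plan: Plan) -> Tuple[Optional[str], int]:
    for i, step in enumerate(plan):
        if step["operation"] != "rename_columns":
            return f"KeyError: {step['operation']}", i
    return None, len(plan)

ok, attempts = run_with_retries("Rename columns", fake_planner, fake_execute)
```

Capping the number of attempts bounds the LLM cost when a goal cannot be satisfied against the current schema.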

Primitive Operations

Map Operation

The Map primitive uses the LLM to create new columns by applying transformations to each row.

Sources: datatune/core/dask/map_dask.py:1-80

# Example: Extract categories from description
mapped = dt.map(
    prompt="Extract categories from the description and name of product.",
    output_fields=["Category", "Subcategory"],
    input_fields=["Description", "Name"]
)(llm, df)

Output Format Requirements:

  • Each response must be: index=<index>|{key1: value1, key2: value2, ...}<endofrow>
  • Strings must use double quotes
  • Missing values should be null

Filter Operation

The Filter primitive uses the LLM to evaluate row-level conditions.

Sources: datatune/core/dask/filter_dask.py:1-80

# Example: Keep only electronics products
filtered = dt.filter(
    prompt="Keep only electronics products",
    input_fields=["Name"]
)(llm, mapped)

Decision Format:

  • Each response must include __filter__: True (keep) or __filter__: False (remove)

Error Handling

The Agent implements comprehensive error handling at multiple levels:

| Level | Handler | Action |
|---|---|---|
| Step formatting | ValueError | Log and return error with step number |
| Template lookup | KeyError | Raise with available operation names |
| Runtime execution | Exception | Capture traceback, return formatted error |

except Exception as e:
    step_num = self.runtime["step_num"]
    logger.error(f"❌ Step {step_num}/{n_steps} failed with error: {e}")
    return str(e), step_num - 1

Sources: datatune/logger.py:1-50

Usage Examples

Basic Agent Usage

import datatune as dt
from datatune.llm.llm import OpenAI
import dask.dataframe as dd

llm = OpenAI(model_name="gpt-3.5-turbo")
agent = dt.Agent(llm)

df = dd.read_csv("products.csv")
result = agent.do("Add ProfitMargin column and keep only African organizations", df)
result.compute().to_csv("african_products.csv")

Verbose Mode for Debugging

agent = dt.Agent(llm, verbose=True)
result = agent.do("Complex transformation task", df)

With verbose=True, the Agent sets its logger to the DEBUG level, so the output includes detailed execution information such as step progress and intermediate states.
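
The effect of the flag can be approximated with Python's standard `logging` module. The sketch below is an assumption about how a verbose switch is typically wired, not a copy of Datatune's logger setup: the function `configure_logging`, the logger name `"datatune"`, and the INFO default for non-verbose runs are all illustrative.

```python
import logging


def configure_logging(verbose: bool, name: str = "datatune") -> logging.Logger:
    """Set the named logger to DEBUG when verbose, otherwise INFO (assumed default)."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG if verbose else logging.INFO)
    if not logger.handlers:
        # Attach one stream handler so messages are visible on the console.
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
        logger.addHandler(handler)
    return logger


log = configure_logging(verbose=True)
log.debug("Step 1/3 started")  # emitted only because the level is DEBUG
```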

Chaining Operations

The Agent automatically chains multiple operations when a goal contains multiple tasks:

TASK: 1. create column a based on column x
      2. create column b based on column y

The Agent combines these two tasks into a single Map primitive step rather than running them as separate steps.

Sources: datatune/agent/agent.py:1-50
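
One way to picture this merging is a pass over the planned tasks that folds consecutive Map tasks into one step. The sketch below is purely illustrative: the task dictionaries, their keys, and the function `merge_map_tasks` are hypothetical and do not reflect the Agent's internal plan format.

```python
def merge_map_tasks(tasks):
    """Fold runs of consecutive "map" tasks into a single step each."""
    steps = []
    for task in tasks:
        previous = steps[-1] if steps else None
        if task["type"] == "map" and previous is not None and previous["type"] == "map":
            # Extend the existing Map step instead of creating a new one.
            previous["prompts"].append(task["prompt"])
            previous["output_fields"].extend(task["output_fields"])
        else:
            steps.append({
                "type": task["type"],
                "prompts": [task["prompt"]],
                "output_fields": list(task.get("output_fields", [])),
            })
    return steps


tasks = [
    {"type": "map", "prompt": "create column a based on column x", "output_fields": ["a"]},
    {"type": "map", "prompt": "create column b based on column y", "output_fields": ["b"]},
    {"type": "filter", "prompt": "keep rows where a is not empty"},
]
steps = merge_map_tasks(tasks)
```

Merging in this way would mean one LLM pass over the rows produces both new columns, which is the likely motivation for combining the tasks.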

Doramagic Pitfall Log

Source-linked risks stay visible on the manual page so the preview does not read like a recommendation.

Doramagic extracted 7 source-linked risk signals. Review them before installing or handing real data to the project.

1. Capability assumption: README/documentation is current enough for a first validation pass.

  • Severity: medium
  • Finding: README/documentation is current enough for a first validation pass.
  • User impact: The project should not be treated as fully validated until this signal is reviewed.
  • Recommended check: Open the linked source, confirm whether it still applies to the current version, and keep the first run isolated.
  • Evidence: capability.assumptions | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | README/documentation is current enough for a first validation pass.

2. Maintenance risk: Maintainer activity is unknown

  • Severity: medium
  • Finding: Maintenance risk is backed by a source signal: Maintainer activity is unknown. Treat it as a review item until the current version is checked.
  • User impact: Users cannot judge support quality until recent activity, releases, and issue response are checked.
  • Recommended check: Open the linked source, confirm whether it still applies to the current version, and keep the first run isolated.
  • Evidence: evidence.maintainer_signals | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | last_activity_observed missing

3. Security or permission risk: no_demo

  • Severity: medium
  • Finding: no_demo
  • User impact: The project may affect permissions, credentials, data exposure, or host boundaries.
  • Recommended check: Open the linked source, confirm whether it still applies to the current version, and keep the first run isolated.
  • Evidence: downstream_validation.risk_items | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | no_demo; severity=medium

4. Security or permission risk: No sandbox install has been executed yet; downstream must verify before user use.

  • Severity: medium
  • Finding: No sandbox install has been executed yet; downstream must verify before user use.
  • User impact: The project may affect permissions, credentials, data exposure, or host boundaries.
  • Recommended check: Open the linked source, confirm whether it still applies to the current version, and keep the first run isolated.
  • Evidence: risks.safety_notes | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | No sandbox install has been executed yet; downstream must verify before user use.

5. Security or permission risk: no_demo

  • Severity: medium
  • Finding: no_demo
  • User impact: The project may affect permissions, credentials, data exposure, or host boundaries.
  • Recommended check: Open the linked source, confirm whether it still applies to the current version, and keep the first run isolated.
  • Evidence: risks.scoring_risks | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | no_demo; severity=medium

6. Maintenance risk: issue_or_pr_quality=unknown

  • Severity: low
  • Finding: issue_or_pr_quality=unknown.
  • User impact: Users cannot judge support quality until recent activity, releases, and issue response are checked.
  • Recommended check: Open the linked source, confirm whether it still applies to the current version, and keep the first run isolated.
  • Evidence: evidence.maintainer_signals | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | issue_or_pr_quality=unknown

7. Maintenance risk: release_recency=unknown

  • Severity: low
  • Finding: release_recency=unknown.
  • User impact: Users cannot judge support quality until recent activity, releases, and issue response are checked.
  • Recommended check: Open the linked source, confirm whether it still applies to the current version, and keep the first run isolated.
  • Evidence: evidence.maintainer_signals | art_fcb598a7a7ed4b2482ca92474f06efe2 | https://github.com/vitalops/datatune#readme | release_recency=unknown

Source: Doramagic discovery, validation, and Project Pack records

Community Discussion Evidence

These external discussion links are review inputs, not standalone proof that the project is production-ready.

Sources: 6 (count of project-level external discussion links exposed on this manual page).

Use: Review before install. Open the linked issues or discussions before treating the pack as ready for your environment.

Doramagic exposes project-level community discussion separately from official documentation. Review these links before using datatune with real data or production workflows.

Source: Project Pack community evidence and pitfall evidence