Doramagic Project Pack · Human Manual
milvus
Milvus Introduction
Related topics: System Architecture, Quick Start Guide
Milvus is an open-source vector database designed for production-level AI applications. It enables efficient storage, indexing, and searching of dense vector embeddings at scale, supporting billions of vectors with millisecond query latency.
What is Milvus?
Milvus is a cloud-native, highly scalable vector database purpose-built for similarity search and AI-powered applications. The system handles vector embeddings—numerical representations of data such as images, text, audio, and video—and enables fast approximate nearest neighbor (ANN) queries across massive datasets.
Core Capabilities
| Capability | Description |
|---|---|
| Vector Similarity Search | Find semantically similar vectors using distance metrics (IP, L2, Hamming, Jaccard) |
| Hybrid Search | Combine vector search with scalar filtering on metadata fields |
| Multi-tenancy | Support thousands of collections with resource isolation |
| Time Travel | Query historical data states for audit and reproducibility |
| Horizontal Scalability | Scale data planes independently from coordination planes |
Source: tools/README.md:1-8
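The distance metrics named in the table can be illustrated with a small, dependency-free sketch. The function names below are illustrative and are not part of any Milvus SDK; Milvus evaluates these metrics internally over indexed data, so this only shows what each metric measures.

```python
import math

def inner_product(a, b):
    """IP: larger values mean more similar vectors."""
    return sum(x * y for x, y in zip(a, b))

def l2_distance(a, b):
    """Euclidean (L2) distance: smaller values mean more similar vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def hamming_distance(a, b):
    """Number of positions at which two equal-length bit sequences differ."""
    return sum(x != y for x, y in zip(a, b))

def jaccard_distance(a, b):
    """1 - |intersection| / |union| of the set bits of two bit sequences."""
    inter = sum(1 for x, y in zip(a, b) if x and y)
    union = sum(1 for x, y in zip(a, b) if x or y)
    return 0.0 if union == 0 else 1.0 - inter / union
```

IP and L2 apply to float vectors, while Hamming and Jaccard apply to binary vectors, which is why the last two functions take bit sequences.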
System Architecture
Milvus follows a distributed architecture designed for horizontal scalability and high availability. The system separates concerns between data management and coordination services.
graph TD
subgraph Client Layer
Python[Python SDK]
Go[Go SDK]
Java[Java SDK]
Node[Node.js SDK]
end
subgraph Coordination Layer
RootCoord[Root Coordinator]
Proxy[Proxy Service]
IndexCoord[Index Coordinator]
QueryCoord[Query Coordinator]
DataCoord[Data Coordinator]
end
subgraph Data Layer
QueryNode[Query Nodes]
DataNode[Data Nodes]
IndexNode[Index Nodes]
end
subgraph Storage Layer
RocksMQ[RocksMQ / Pulsar / Kafka]
MetaStore[(Metadata Store)]
ObjectStorage[(Object Storage)]
end
Python & Go & Java & Node --> Proxy
Proxy --> RootCoord
Proxy --> QueryCoord
Proxy --> DataCoord
RootCoord --> MetaStore
QueryCoord --> QueryNode
DataCoord --> DataNode
IndexCoord --> IndexNode
QueryNode & DataNode --> RocksMQ
QueryNode & DataNode --> ObjectStorage
Key Components
Proxy Service The entry point for all client requests. It validates, transforms, and routes requests to appropriate coordinator services.
Root Coordinator Manages collection and partition metadata, handles DDL operations, and coordinates timestamps for distributed transactions.
Query Coordinator Manages query node resources and load balancing. Tracks which segments are loaded into memory for fast querying.
Data Coordinator Manages data node resources, handles segment compaction, and coordinates data persistence to object storage.
Index Coordinator Manages index building tasks across index nodes. Supports multiple index types including IVF, HNSW, DiskANN, and ANNOY.
Source: internal/streamingcoord/server/service/assignment.go:1-25
Message Queue Integration
Milvus supports multiple message queue backends for storing write-ahead logs and enabling reliable message delivery.
Supported MQ Types
| MQ Type | Description | Use Case |
|---|---|---|
| RocksMQ | Built-in RocksDB-based message queue | Standalone deployments |
| Pulsar | Apache Pulsar for distributed messaging | Production clusters |
| Kafka | Apache Kafka for event streaming | Integration with Kafka ecosystems |
Source: pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:1-30
RocksMQ Implementation
RocksMQ provides an embedded message queue solution using RocksDB for persistent storage. It implements a consumer group model with the following characteristics:
graph LR
Producer[Producer] --> Topic[Topic]
Topic --> Page1[Page 1]
Topic --> Page2[Page 2]
Topic --> PageN[Page N]
subgraph Consumer Groups
CG1[Consumer Group A]
CG2[Consumer Group B]
end
Page1 & Page2 & PageN --> CG1
Page1 & Page2 & PageN --> CG2
Consumer Management
- Each consumer group tracks its own consumption position
- Message acknowledgment is tracked per consumer group
- Retention policies clean up acknowledged messages based on size or time thresholds
Source: pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:1-60
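The consumer group model described above can be sketched as an append-only log with one read position per group. This is a minimal in-memory illustration, not the RocksMQ implementation: the `Topic` class and its methods are hypothetical, and reading is treated as acknowledging, which the real queue keeps as separate steps.

```python
class Topic:
    """Append-only log with one read position per consumer group."""

    def __init__(self):
        self.messages = []   # the message at index i has ID i
        self.positions = {}  # group name -> next message ID to read

    def produce(self, payload):
        self.messages.append(payload)
        return len(self.messages) - 1

    def create_group(self, group):
        self.positions.setdefault(group, 0)

    def consume(self, group, max_count=1):
        start = self.positions[group]
        batch = self.messages[start:start + max_count]
        # Advancing the position stands in for acknowledgment in this sketch.
        self.positions[group] = start + len(batch)
        return batch

    def fully_acked_upto(self):
        """IDs below this value were read by every group and could be cleaned up."""
        return min(self.positions.values(), default=0)
```

Because each group advances independently, only messages that every group has passed are candidates for retention cleanup.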
Retention Configuration
RocksMQ supports configurable retention policies:
| Parameter | Description | Default |
|---|---|---|
| RetentionSizeInMB | Minimum acknowledged size before cleanup | -1 (disabled) |
| RetentionTimeInMinutes | Minimum time before acknowledged messages are eligible | -1 (disabled) |
When both are set to -1, retention is disabled and messages are only removed when explicitly acknowledged.
Source: pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:45-50
SDK Support
Milvus provides official SDKs for multiple programming languages, all maintained under the same version cadence as the core server.
Available SDKs
| SDK | Latest Version | Status |
|---|---|---|
| Python | 2.6.14 | Production |
| Go | 2.6.5 | Production |
| Java | 2.6.20 | Production |
| Node.js | 2.6.14 | Production |
Go SDK (client/v2)
The Go SDK provides a high-performance client library for Milvus:
import (
	"context"
	"github.com/milvus-io/milvus/client/v2/milvusclient"
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cli, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
	Address: milvusAddr, // for example "localhost:19530"
})
if err != nil {
	// handle the connection error
}
Prerequisites: Go 1.24.12 or higher
Key Features:
- Struct-array support with vector sub-fields
- Nullable vector columns (dense, binary, sparse, int8)
- Array field partial updates (APPEND, REMOVE)
- Custom gRPC DialOptions with preserved default settings
Source: client/README.md:1-35
Recent SDK Features (v2.6.x)
| Version | Feature |
|---|---|
| v2.6.5 | Nullable vector columns, Array field helpers |
| v2.6.4 | Struct-array support, EmbeddingList/MAX_SIM search |
| v2.6.3 | TruncateCollection API, GetReplicateConfiguration API |
Deployment Modes
Milvus supports three deployment configurations based on scale and requirements:
Standalone Mode
Single-node deployment with all components running in one process. Uses RocksMQ by default.
┌─────────────────────────────────────┐
│ Milvus Standalone │
│ ┌─────────────────────────────┐ │
│ │ Proxy + All Coordinators │ │
│ └─────────────────────────────┘ │
│ ┌─────────────────────────────┐ │
│ │ Query + Data + Index Nodes │ │
│ └─────────────────────────────┘ │
│ ┌─────────────────────────────┐ │
│ │ RocksMQ │ │
│ └─────────────────────────────┘ │
└─────────────────────────────────────┘
Cluster Mode
Distributed deployment with separate coordinator services and worker nodes.
Distributed Mode
Multi-node cluster with separate storage backends (Pulsar/Kafka, S3/MinIO, etcd).
Source: tools/README.md:1-15
Logging Architecture
Milvus uses a context-aware logging library (mlog) built on zap for distributed system observability.
Design Principles
- Mandatory Context Passing - All logging operations require a context, ensuring request traceability
- Zero-Overhead Abstraction - Uses type aliases for performance comparable to direct zap usage
- Automatic Field Accumulation - Context fields automatically propagate through call chains
- Lazy Encoding - Deferred field encoding avoids overhead when log level is disabled
Key Log Fields
| Field Function | Type | Purpose |
|---|---|---|
| FieldCollectionID(val) | int64 | Collection identifier |
| FieldSegmentID(val) | int64 | Segment identifier |
| FieldPChannel(val) | string | Physical channel name |
| FieldMessageID(val) | ObjectMarshaler | Message identifier |
Source: pkg/mlog/README.md:1-30
Streaming Coordination
The streaming coordination service manages physical channel assignments across streaming nodes.
PChannel States
stateDiagram-v2
[*] --> UNASSIGNED
UNASSIGNED --> ASSIGNING
ASSIGNING --> ASSIGNED
ASSIGNED --> ASSIGNING : Rebalancing
ASSIGNING --> UNASSIGNED : Failure
ASSIGNED --> [*]
State Definitions:
- UNASSIGNED: Channel not assigned to any node
- ASSIGNING: Transition state during assignment
- ASSIGNED: Active on a streaming node
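The transitions in the state diagram can be encoded as a small lookup table. The sketch below is illustrative only: the `PChannelState` enum and helper functions are hypothetical Python stand-ins, and the allowed transitions are copied from the diagram rather than from the Milvus source.

```python
from enum import Enum

class PChannelState(Enum):
    UNASSIGNED = "UNASSIGNED"
    ASSIGNING = "ASSIGNING"
    ASSIGNED = "ASSIGNED"

# Allowed transitions, taken from the state diagram.
TRANSITIONS = {
    PChannelState.UNASSIGNED: {PChannelState.ASSIGNING},
    PChannelState.ASSIGNING: {PChannelState.ASSIGNED, PChannelState.UNASSIGNED},
    PChannelState.ASSIGNED: {PChannelState.ASSIGNING},
}

def can_transition(src, dst):
    return dst in TRANSITIONS[src]

def transition(src, dst):
    """Return the new state, or raise if the diagram does not allow the move."""
    if not can_transition(src, dst):
        raise ValueError(f"illegal transition {src.name} -> {dst.name}")
    return dst
```

Note that a channel never moves directly from UNASSIGNED to ASSIGNED; it always passes through ASSIGNING, which is also the state it returns to during rebalancing.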
Assignment Discovery
The AssignmentService watches and broadcasts channel state changes:
func (s *assignmentServiceImpl) AssignmentDiscover(
	server streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverServer,
) error {
	// Track the number of active discovery listeners.
	s.listenerTotal.Inc()
	defer s.listenerTotal.Dec()
	balancer, err := balance.GetWithContext(server.Context())
	if err != nil {
		return err
	}
	// ... assignment streaming
}
Source: internal/streamingcoord/server/service/assignment.go:25-45
Channel History Tracking
Each physical channel maintains an assignment history for debugging and load balancing decisions:
func (c *PChannelMeta) GetHistories() []types.PChannelInfoAssigned {
history := make([]types.PChannelInfoAssigned, 0, len(c.inner.Histories))
for _, h := range c.inner.Histories {
history = append(history, types.PChannelInfoAssigned{
Channel: types.PChannelInfo{
Name: c.inner.GetChannel().GetName(),
Term: h.Term,
AccessMode: types.AccessMode(h.AccessMode),
},
Node: types.NewStreamingNodeInfoFromProto(h.Node),
})
}
return history
}
Source: internal/streamingcoord/server/balancer/channel/pchannel.go:1-25
Supported Index Types
Milvus supports multiple indexing algorithms optimized for different use cases:
| Index Type | Description | Best For |
|---|---|---|
| FLAT | Brute-force search | Small datasets, accuracy-critical |
| IVF_FLAT | Inverted file index | Balanced speed/accuracy |
| IVF_SQ8 | Scalar quantized IVF | Memory-constrained |
| HNSW | Hierarchical NSW | Fast queries, higher memory |
| DiskANN | Disk-based ANN | Billion-scale datasets |
| ANNOY | Approximate nearest neighbors | Memory-efficient |
Recent Releases
Latest Stable Release (v2.6.17)
- Release date: May 22, 2026
- Python SDK: 2.6.14, Go SDK: 2.6.4, Java SDK: 2.6.20, Node.js SDK: 2.6.14
Security Release (v2.5.27)
- Release date: February 27, 2026
- Critical security patches (CVE fixes)
Feature Highlights by Version
| Version | Key Features |
|---|---|
| v2.6.17 | Performance improvements, bug fixes |
| v2.6.15 | Gemini embedding model support |
| v2.6.14 | Enhanced index building |
| v2.6.13 | Streaming improvements |
| v2.6.12 | Query optimization |
Community Considerations
Based on community feedback and engagement, users have expressed interest in:
- String Field Support - Extending field types beyond numerical values
- Flexible Schema Modification - Adding fields to existing non-empty collections
- Backup and Restore - Native backup mechanisms for production deployments
- String IDs - Primary key support beyond integer types
- ScaNN Index - Google's ScaNN algorithm for improved ANN performance
These feature requests reflect common production requirements that influence how developers design their AI applications around Milvus.
Next Steps
- Installation Guide - Deploy Milvus in your environment
- SDK Documentation - Get started with your preferred language
- Collection Schema - Design schemas for your vectors
- Indexing Reference - Choose the right index for your use case
Source: https://github.com/milvus-io/milvus / Human Manual
Quick Start Guide
Related topics: Milvus Introduction, Go SDK (client/v2)
Overview
The Quick Start Guide provides a streamlined path for new users to begin using Milvus, an open-source vector database optimized for AI applications. This guide covers essential operations including installation, connecting to Milvus, creating collections with schemas, inserting vectors, and performing similarity searches.
The guide is designed to help developers get from zero to their first vector search in under 15 minutes using the official SDKs.
Prerequisites
| Requirement | Minimum Version | Recommended |
|---|---|---|
| Python | 3.8+ | 3.10+ |
| Java | JDK 8+ | JDK 11+ |
| Go | 1.24.12+ (required by the Go SDK, client/v2) | Latest stable |
| Node.js | 14+ | 18+ |
| Docker | 20.10+ | Latest |
| Milvus Server | 2.3+ | 2.6.x |
Installation
Standalone Mode (Single Node)
For local development and testing, Milvus standalone can be started using Docker Compose:
# Download configuration file
wget https://github.com/milvus-io/milvus/releases/download/v2.6.17/milvus-standalone-docker-compose.yml
mv milvus-standalone-docker-compose.yml docker-compose.yml
# Start Milvus
docker-compose up -d
The standalone instance will be available at localhost:19530.
Cluster Mode (Distributed)
For production deployments requiring high availability and scalability, deploy Milvus in cluster mode using Kubernetes or docker-compose with multiple nodes.
SDK Installation
Python SDK (pymilvus):
pip install "pymilvus>=2.6.0"
Go SDK (client/v2):
go get github.com/milvus-io/milvus/client/v2@latest
Core Concepts
Collections
A collection is the primary data organization unit in Milvus, containing structured data with vector fields and scalar fields. Collections can be created with predefined schemas or dynamically with flexible fields.
Schemas
Each collection requires a schema that defines:
- Primary Key Field: Unique identifier for each entity (Int64 or VarChar)
- Vector Field: Dense or sparse vector data for similarity search
- Scalar Fields: Optional metadata fields (Int, Float, Boolean, String, etc.)
Index Types
Milvus supports multiple index types optimized for different search requirements:
| Index Type | Use Case | Memory Efficiency |
|---|---|---|
| IVF_FLAT | Balanced accuracy/speed | Medium |
| IVF_SQ8 | Reduced memory | High |
| HNSW | Fastest search | Low (higher memory use) |
| DISKANN | Billion-scale datasets | Very High |
Connection and Authentication
Basic Connection
from pymilvus import MilvusClient
# Connect to local Milvus
client = MilvusClient(uri="http://localhost:19530")
Authenticated Connection
from pymilvus import MilvusClient
# Connect with authentication
client = MilvusClient(
uri="http://localhost:19530",
token="user:password"
)
Connection with TLS
from pymilvus import MilvusClient
# One-way TLS: verify the server against its certificate
client = MilvusClient(
    uri="https://localhost:19530",
    secure=True,
    server_pem_path="path/to/server.pem",
    server_name="localhost"
)
Collection Operations
Creating a Collection
from pymilvus import MilvusClient, DataType
client = MilvusClient()
# Define schema
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True,
)
# Add primary key field
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
# Add vector field
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=128)
# Add scalar fields
schema.add_field(field_name="color", datatype=DataType.VARCHAR, max_length=100)
# Create collection with index parameters
index_params = client.prepare_index_params()
index_params.add_index(
field_name="vector",
index_type="AUTOINDEX",
metric_type="COSINE"
)
client.create_collection(
collection_name="quick_start",
schema=schema,
index_params=index_params
)
Inserting Data
import random
# Generate sample data
data = [
{"id": i, "vector": [random.random() for _ in range(128)], "color": "red"}
for i in range(1000)
]
# Insert data
result = client.insert(
collection_name="quick_start",
data=data
)
print(f"Inserted {result['insert_count']} entities")
Searching Data
# Prepare query vector
query_vector = [[random.random() for _ in range(128)]]
# Search for similar vectors
results = client.search(
collection_name="quick_start",
data=query_vector,
limit=10,
output_fields=["id", "color"]
)
# Process results: search returns one list of hits per query vector
for hits in results:
    for hit in hits:
        print(f"ID: {hit['id']}, Distance: {hit['distance']}")
Data Flow Architecture
graph TD
A[Client Application] -->|SDK API| B[Milvus Server]
B --> C[Proxy Layer]
C --> D[Query Node]
C --> E[Data Node]
D --> F[Object Storage]
E --> F
G[Index files] --> F
H[Segment files] --> F
Go SDK Quick Start
package main

import (
	"context"
	"fmt"

	"github.com/milvus-io/milvus/client/v2/milvusclient"
)

func main() {
	ctx := context.Background()

	// Connect to Milvus
	cli, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
		Address: "localhost:19530",
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close(ctx)

	// Create a collection with the quick-setup schema and a 128-dimensional vector field
	opt := milvusclient.SimpleCreateCollectionOptions("my_collection", 128)
	if err := cli.CreateCollection(ctx, opt); err != nil {
		panic(err)
	}
	fmt.Println("Collection created successfully")
}
Common Patterns
Batch Operations
For high-throughput scenarios, batch inserts in groups of 1000-5000 entities for optimal performance:
# Batch insert configuration
BATCH_SIZE = 1000
# total_count and generate_vectors() are placeholders for your own data source
for batch_start in range(0, total_count, BATCH_SIZE):
    batch_end = min(batch_start + BATCH_SIZE, total_count)
    batch_data = generate_vectors(batch_start, batch_end)
    client.insert(
        collection_name="quick_start",
        data=batch_data
    )
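When the rows come from an iterator rather than an index range, the same batching pattern can be written as a small generator. This is a minimal sketch; `chunked` is a hypothetical helper, not a pymilvus function.

```python
from itertools import islice

def chunked(iterable, batch_size):
    """Yield lists of at most batch_size items without loading everything into memory."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    it = iter(iterable)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch
```

Each yielded list can then be passed as the `data` argument of `client.insert`, one call per batch.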
Search with Filters
Apply metadata filters during similarity searches:
results = client.search(
collection_name="quick_start",
data=query_vector,  # already a list of query vectors
filter="color == 'red'",
limit=10
)
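The meaning of a filtered search can be shown with a brute-force reference: keep only the entities that pass the scalar predicate, then rank the survivors by distance. This sketch assumes L2 distance and plain Python dictionaries; `filtered_search` is a hypothetical function, and Milvus itself uses ANN indexes rather than a full scan.

```python
import math

def filtered_search(entities, query, predicate, limit):
    """Brute-force reference: filter by predicate, then rank by L2 distance."""
    candidates = [e for e in entities if predicate(e)]
    scored = [(math.dist(e["vector"], query), e["id"]) for e in candidates]
    scored.sort()  # ascending distance, ties broken by id
    return [{"id": i, "distance": d} for d, i in scored[:limit]]
```

A reference like this is mainly useful for checking recall on a small sample, since its cost grows linearly with the collection size.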
Troubleshooting
| Issue | Cause | Solution |
|---|---|---|
| Connection refused | Milvus not running | Verify docker container is running |
| Authentication failed | Invalid credentials | Check username/password configuration |
| Index not found | Search before index creation | Wait for index build to complete |
| Out of memory | Insufficient memory | Reduce batch size or increase system RAM |
Next Steps
- Schema Design: Review collection schema best practices for your use case
- Index Tuning: Experiment with different index types for optimal performance
- Partitioning: Use partitions to improve query performance on large datasets
- Backup/Restore: Implement backup strategies for production deployments
System Architecture
Related topics: Coordinator Services, Data Storage Layer, Data Ingestion and Flow
Overview
Milvus is a cloud-native, highly scalable vector database designed for vector similarity search and AI applications. The system architecture follows a distributed design pattern that separates concerns between message queuing, storage coordination, data processing, and query execution.
The architecture is built around several core subsystems:
- Message Queue Layer: RocksMQ, Pulsar, or Kafka for event streaming
- Coordination Layer: Root Coordinator and Proxy components for cluster management
- Storage Layer: Distributed storage with retention policies
- Query Layer: Query nodes for vector search execution
- Logging Infrastructure: Context-aware structured logging
Core Components
Message Queue (RocksMQ)
RocksMQ is the message queue Milvus uses by default in standalone deployments. It is built on RocksDB and provides persistent message storage with consumer group management and retention capabilities.
#### Topic Management
Topics in RocksMQ are identified by unique keys stored in RocksDB. The topic identifier follows a naming convention where topic names cannot contain forward slashes (/).
// Source: pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:1-30
topicIDKey := TopicIDTitle + topicName
Topic creation validation:
- Checks for existing topics to prevent duplicates
- Validates topic name does not contain reserved characters
- Uses mutex locks to ensure thread-safe creation
- Stores topic metadata with initialization markers
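The validation steps listed above can be sketched with an in-memory registry. This is an illustration under stated assumptions: `TopicRegistry` is hypothetical, the dictionary stands in for RocksDB, and treating a duplicate as a no-op (rather than an error) is a choice made for the sketch.

```python
import threading

class TopicRegistry:
    """In-memory stand-in for the RocksDB-backed topic table."""

    def __init__(self):
        self._lock = threading.Lock()
        self._topics = {}

    def create_topic(self, name):
        # Reject empty names and the reserved '/' character.
        if not name or "/" in name:
            raise ValueError(f"invalid topic name: {name!r}")
        with self._lock:  # the existence check and the insert must be atomic
            if name in self._topics:
                return False  # duplicate: nothing is created
            self._topics[name] = {"initialized": True}
            return True
```

Holding the lock across both the existence check and the insert is what prevents two concurrent callers from creating the same topic twice.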
#### Consumer Group Management
Consumer groups are managed through the consumerList structure, which maintains a map of consumers by group name:
// Source: pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:40-60
type consumerList struct {
consumers map[string]*Consumer
mu sync.RWMutex
}
Key operations include:
| Operation | Description |
|---|---|
| Add(consumer) | Register a new consumer in the group |
| Remove(groupName) | Unregister a consumer |
| Get(groupName) | Retrieve consumer by group name |
| Notify(groupName) | Signal consumer for message processing |
#### Page-Based Message Storage
Messages are stored in fixed-size pages to enable efficient retrieval and cleanup:
// Source: pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:80-120
func (rmq *rocksmq) updateTopicPageInfo(topicName string, msgIDs []UniqueID, msgSizes map[UniqueID]int64) error
The page management system:
- Tracks page boundaries using message IDs
- Records message timestamps for retention decisions
- Uses a mutation buffer for atomic page updates
- Supports configurable page size (default: 64MB)
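Page boundary tracking can be pictured as grouping consecutive messages until a size threshold is reached. The sketch below is an assumption about the general technique, not a transcription of `updateTopicPageInfo`: the function name and the rule that a page closes once its cumulative size reaches `page_size` are both illustrative.

```python
def split_into_pages(msg_sizes, page_size):
    """Group (msg_id, size) pairs into pages.

    A page is closed once its cumulative size reaches page_size.
    Returns (closed_pages, open_bytes), where closed_pages is a list of
    (last_msg_id, page_bytes) and open_bytes is the size of the unfinished page.
    """
    pages, current = [], 0
    for msg_id, size in msg_sizes:
        current += size
        if current >= page_size:
            pages.append((msg_id, current))
            current = 0
    return pages, current
```

Recording only the last message ID of each page is enough to delete a whole page later with a single key-range operation.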
Retention System
The retention system automatically cleans up old messages based on size and time policies:
// Source: pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:35-38
func checkRetention() bool {
params := paramtable.Get()
return params.RocksmqCfg.RetentionSizeInMB.GetAsInt64() != -1 ||
params.RocksmqCfg.RetentionTimeInMinutes.GetAsInt64() != -1
}
Retention policies are evaluated by:
- Scanning acknowledged message pages
- Checking message age against RetentionTimeInMinutes
- Checking cumulative size against RetentionSizeInMB
- Deleting expired pages using range deletes
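The evaluation steps above can be sketched as a pure function over a list of acknowledged pages. The data layout and the exact size rule are assumptions made for illustration: pages are `(page_end_id, size_bytes, acked_at_seconds)` tuples ordered oldest first, and the size policy drops the oldest pages until the remainder fits under the limit.

```python
DISABLED = -1

def retention_enabled(size_mb, time_minutes):
    """Retention runs if either limit is set, as in checkRetention."""
    return size_mb != DISABLED or time_minutes != DISABLED

def pages_to_delete(pages, now, size_mb, time_minutes):
    """Return the page_end_ids of acknowledged pages eligible for deletion."""
    if not retention_enabled(size_mb, time_minutes):
        return []
    expired = []
    remaining = list(pages)
    # Time policy: drop pages acknowledged longer ago than the limit.
    if time_minutes != DISABLED:
        cutoff = now - time_minutes * 60
        while remaining and remaining[0][2] <= cutoff:
            expired.append(remaining.pop(0)[0])
    # Size policy: drop the oldest pages until the rest fits under the limit.
    if size_mb != DISABLED:
        limit = size_mb * 1024 * 1024
        total = sum(size for _, size, _ in remaining)
        while remaining and total > limit:
            page = remaining.pop(0)
            total -= page[1]
            expired.append(page[0])
    return expired
```

Since pages are removed strictly from the oldest end, the result always describes one contiguous ID range, which suits a range delete.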
#### Page Deletion Implementation
// Source: pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:1-30
func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error {
startKey := path.Join(topic, strconv.FormatInt(startID, 10))
endKey := path.Join(topic, strconv.FormatInt(endID+1, 10))
writeBatch := gorocksdb.NewWriteBatch()
defer writeBatch.Destroy()
writeBatch.DeleteRange([]byte(startKey), []byte(endKey))
// ...
}
Streaming Coordination
The streaming coordination system manages physical channels (PChannels) that distribute write operations across the cluster.
#### PChannel Meta Management
PChannels are the physical channels used for data streaming; each one carries state tracking and an assignment history:
// Source: internal/streamingcoord/server/balancer/channel/pchannel.go:1-50
type PChannelMeta struct {
inner *streamingpb.PChannelMeta
}
Key PChannel states:
| State | Description |
|---|---|
| PCHANNEL_META_STATE_UNASSIGNED | Channel is not assigned to any node |
| PCHANNEL_META_STATE_ASSIGNING | Channel is being assigned to a node |
| PCHANNEL_META_STATE_ASSIGNED | Channel is actively assigned |
| Other states | Reserved for future use |
#### Channel Assignment History
The system maintains assignment history for debugging and load balancing:
// Source: internal/streamingcoord/server/balancer/channel/pchannel.go:60-80
func (c *PChannelMeta) GetChannelHistory() []types.PChannelInfoAssigned {
history := make([]types.PChannelInfoAssigned, 0, len(c.inner.Histories))
for _, h := range c.inner.Histories {
history = append(history, types.PChannelInfoAssigned{
Channel: types.PChannelInfo{
Name: c.inner.GetChannel().GetName(),
Term: h.Term,
AccessMode: types.AccessMode(h.AccessMode),
},
Node: types.NewStreamingNodeInfoFromProto(h.Node),
})
}
return history
}
#### State Query Methods
// Source: internal/streamingcoord/server/balancer/channel/pchannel.go:80-100
func (c *PChannelMeta) IsAssigned() bool {
return c.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED
}
func (c *PChannelMeta) IsAssignedOrAssigning() bool {
return c.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED ||
c.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNING
}
Load Balancing Policy
The vChannel fair policy distributes channels across streaming nodes while maintaining balance:
// Source: internal/streamingcoord/server/balancer/policy/vchannelfair/expected_layout.go:1-30
type expectedLayoutForVChannelFairPolicy struct {
Config policyConfig
CurrentLayout balancer.CurrentLayout
AveragePChannelPerNode float64
AverageVChannelPerNode float64
Assignments map[types.ChannelID]types.PChannelInfoAssigned
Nodes nodes
}
#### Assignment Snapshot
// Source: internal/streamingcoord/server/balancer/policy/vchannelfair/expected_layout.go:50-70
type assignmentSnapshot struct {
Assignments map[types.ChannelID]types.PChannelInfoAssigned
GlobalUnbalancedScore float64
}
func (s *assignmentSnapshot) Clone() assignmentSnapshot {
assignments := make(map[types.ChannelID]types.PChannelInfoAssigned, len(s.Assignments))
for channelID, assignment := range s.Assignments {
assignments[channelID] = assignment
}
return assignmentSnapshot{
Assignments: assignments,
GlobalUnbalancedScore: s.GlobalUnbalancedScore,
}
}
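The role of the average and the unbalanced score in a fair layout can be illustrated with a toy calculation. The scoring formula below (sum of squared deviations from the per-node average) is an assumption chosen for the sketch; the source does not show how `GlobalUnbalancedScore` is actually computed, and both function names are hypothetical.

```python
def average_per_node(assignments, nodes):
    """assignments: channel name -> node ID; nodes: iterable of node IDs."""
    nodes = list(nodes)
    return len(assignments) / len(nodes) if nodes else 0.0

def unbalanced_score(assignments, nodes):
    """Sum of squared deviations of each node's channel count from the average.

    0.0 means a perfectly even layout. Illustrative formula only.
    Assumes every assigned node ID appears in nodes.
    """
    nodes = list(nodes)
    avg = average_per_node(assignments, nodes)
    counts = {n: 0 for n in nodes}
    for node in assignments.values():
        counts[node] += 1
    return sum((c - avg) ** 2 for c in counts.values())
```

A balancer can copy the assignment map (as `Clone` does), move one channel in the copy, and keep the change only if the score decreases.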
Logging Infrastructure
Milvus uses a context-aware logging library (mlog) built on zap for structured logging:
// Source: pkg/mlog/README.md
#### Design Principles
| Principle | Implementation |
|---|---|
| Mandatory Context | All logging operations require context parameter |
| Zero Overhead | Type aliases avoid wrapper overhead |
| Field Accumulation | Context fields automatically propagate |
| Cross-Service | gRPC metadata for distributed tracing |
| Lazy Encoding | WithLazy defers field encoding |
#### Logger Performance Optimization
When logging, the system compares pre-encoded field counts between context and component loggers:
Component Logger: 2 fields (module, nodeID) - pre-encoded
ctx logger: 5 fields (traceID, spanID, ...) - pre-encoded
→ Select ctx logger as base, only need to encode component's 2 fields
→ Faster than using component logger and encoding 5 ctx fields
#### Field Management
// Add fields to context (fields accumulate)
ctx = mlog.WithFields(ctx, mlog.String("request_id", "abc123"))
ctx = mlog.WithFields(ctx, mlog.Int64("user_id", 42))
// Subsequent logs automatically include these fields
mlog.Info(ctx, "processing request")
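Field accumulation of this kind is usually implemented by deriving a new context from the old one instead of mutating it. The following is a language-neutral illustration in Python, not the mlog implementation: `with_fields` and `format_log` are hypothetical names, and a read-only mapping stands in for a Go `context.Context`.

```python
from types import MappingProxyType

def with_fields(ctx, **fields):
    """Return a new read-only context with the parent's fields plus the new ones.

    The parent context is left unchanged.
    """
    merged = dict(ctx)
    merged.update(fields)
    return MappingProxyType(merged)

def format_log(ctx, message):
    """Render the message followed by the accumulated fields in insertion order."""
    suffix = " ".join(f"{k}={v}" for k, v in ctx.items())
    return f"{message} {suffix}".rstrip()
```

Because each call returns a new context, a function deeper in the call chain can add its own fields without affecting what its caller logs.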
Architecture Flow
Message Production and Consumption
graph TD
A[Client Request] --> B[Proxy Layer]
B --> C[Topic Selection]
C --> D{RocksMQ Topic}
D --> E[Page Buffer]
E --> F[RocksDB Write]
F --> G[Consumer Groups]
G --> H[Message Retrieval]
H --> I[Acknowledgment]
I --> J[Retention Cleanup]
PChannel Assignment Flow
graph TD
A[New PChannel Request] --> B[Check Current State]
B --> C{Node Available?}
C -->|Yes| D[Assign to Node]
C -->|No| E[Queue Assignment]
D --> F[Update Assignment History]
F --> G[Notify Streaming Node]
E --> B
G --> H[Begin Streaming]
Retention Decision Flow
graph TD
A[Retention Timer] --> B[Scan Acknowledged Pages]
B --> C[Check Time Policy]
C --> D{Time Expired?}
D -->|Yes| E[Mark for Deletion]
D -->|No| F[Check Size Policy]
F --> G{Size Exceeded?}
G -->|Yes| E
G -->|No| H[Continue Monitoring]
E --> I[Delete Page Range]
I --> J[Update Metadata]
Development Tools
Milvus provides development utilities for consistent commit workflows:
# Commit with AI-generated messages
python3 tools/mgit.py
# Create PR with automatic issue linking
python3 tools/mgit.py --pr
# Complete workflow (commit + PR)
python3 tools/mgit.py --all
Commit Message Format
type: Brief description
Options:
- fix: Bug fixes
- feat: New features
- enhance: Improvements
- refactor: Code refactoring
- test: Test modifications
- docs: Documentation updates
- chore: Build/tool changes
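A commit subject in this format can be checked mechanically, for example in a local hook. The sketch below is a hypothetical helper and is not part of `tools/mgit.py`; it only encodes the `type: description` shape and the type list given above.

```python
import re

COMMIT_TYPES = ("fix", "feat", "enhance", "refactor", "test", "docs", "chore")
_PATTERN = re.compile(r"^(%s): \S.*$" % "|".join(COMMIT_TYPES))

def is_valid_commit_subject(subject):
    """True if the subject line looks like '<type>: <description>'."""
    return _PATTERN.match(subject) is not None
```

The pattern requires exactly one of the listed types, a colon, a single space, and a non-empty description.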
Coordinator Services
Related topics: System Architecture, Data Storage Layer
Coordinator Services are centralized management components in Milvus that orchestrate distributed operations across the cluster. These services handle metadata management, data coordination, query distribution, and streaming coordination, acting as the brain of the Milvus distributed system.
Architecture Overview
Milvus employs a coordinator-based architecture where specialized coordinator services manage different aspects of the system:
graph TD
subgraph "Coordinator Layer"
RC[Root Coord]
DC[Data Coord]
QC[Query Coord]
SC[Streaming Coord]
end
subgraph "Worker Nodes"
DN[Data Node]
QN[Query Node]
PN[Proxy Node]
SN[Streaming Node]
end
subgraph "Storage Layer"
ETCD[etcd]
MSG[Message Queue]
OBJ[Object Storage]
end
RC --> ETCD
DC --> ETCD
QC --> ETCD
SC --> MSG
DC --> DN
QC --> QN
SC --> SN
DN --> OBJ
QN --> OBJ
Core Coordinator Components
Root Coordinator (RootCoord)
The Root Coordinator is the central authority for metadata management and cluster-level operations.
Key Responsibilities:
- Collection and partition management
- Schema enforcement and validation
- Timestamp allocation for distributed transactions
- Database and user management
Location: internal/rootcoord/root_coord.go
Data Coordinator (DataCoord)
The Data Coordinator manages data node coordination and segment lifecycle.
Key Responsibilities:
- Segment assignment and load balancing across data nodes
- Segment compaction coordination
- Index building task distribution
- Data node health monitoring
Location: internal/datacoord/server.go
Services Interface: internal/datacoord/services.go
Query Coordinator (QueryCoord)
The Query Coordinator orchestrates query operations across query nodes.
Key Responsibilities:
- Loading and unloading collection shards on query nodes
- Query load balancing
- Search and query request routing
- Query node cluster management
Location: internal/querycoordv2/server.go
Services Interface: internal/querycoordv2/services.go
Streaming Coordinator (StreamingCoord)
The Streaming Coordinator manages streaming channels and real-time data ingestion.
Location: internal/streamingcoord/server/
PChannel Metadata Management
Physical channels (PChannels) represent the fundamental unit of streaming coordination. The PChannel metadata structure tracks channel state and assignment history.
// Source: internal/streamingcoord/server/balancer/channel/pchannel.go
type PChannelMeta struct {
inner *streamingpb.PChannelMeta
}
PChannel States:
| State | Description |
|---|---|
| PCHANNEL_META_STATE_UNASSIGNED | Channel not assigned to any node |
| PCHANNEL_META_STATE_ASSIGNING | Channel assignment in progress |
| PCHANNEL_META_STATE_ASSIGNED | Channel successfully assigned to a node |
Key Methods:
| Method | Purpose |
|---|---|
| IsAssigned() | Returns true if channel is assigned to a server |
| IsAssignedOrAssigning() | Returns true if channel is in active state |
| LastAssignTimestamp() | Returns the last assigned timestamp |
| State() | Returns the current channel state |
| CopyForWrite() | Returns mutable copy for modification |
History Tracking: PChannel metadata maintains assignment history for debugging and load balancing decisions:
func (c *PChannelMeta) GetHistories() []types.PChannelInfoAssigned {
history := make([]types.PChannelInfoAssigned, 0, len(c.inner.Histories))
for _, h := range c.inner.Histories {
history = append(history, types.PChannelInfoAssigned{
Channel: types.PChannelInfo{
Name: c.inner.GetChannel().GetName(),
Term: h.Term,
AccessMode: types.AccessMode(h.AccessMode),
},
Node: types.NewStreamingNodeInfoFromProto(h.Node),
})
}
return history
}
Broadcast Manager
The Broadcast Manager handles coordinated broadcasting of operations across the cluster, ensuring consistent state updates.
Location: internal/streamingcoord/server/broadcaster/broadcast_manager.go
Resource Key-Based Locking
Broadcast operations use resource keys to coordinate access to shared resources:
type broadcastTaskManager struct {
resourceKeyLocker ResourceKeyLocker
ackScheduler *ackCallbackScheduler
}
API Methods:
| Method | Purpose |
|---|---|
| WithResourceKeys(ctx, ...ResourceKey) | Acquires resource keys for broadcast task |
| WithSecondaryClusterResourceKey(ctx) | Acquires exclusive cluster-level key for secondary operations |
Resource Key Acquisition Flow
sequenceDiagram
participant Client
participant BM as BroadcastManager
participant Locker as ResourceKeyLocker
participant IDAlloc as IDAllocator
Client->>BM: WithResourceKeys(ctx, keys...)
BM->>BM: startLockInstant = time.Now()
BM->>BM: Append shared cluster resource keys
BM->>Locker: Lock(resourceKeys...)
Locker-->>BM: Return guards
BM->>IDAlloc: Allocate(ctx)
IDAlloc-->>BM: Return broadcastID
BM->>BM: Check cluster role
BM-->>Client: Return broadcasterWithRK
Lock Acquisition Process:
func (bm *broadcastTaskManager) WithResourceKeys(ctx context.Context, resourceKeys ...message.ResourceKey) (BroadcastAPI, error) {
startLockInstant := time.Now()
resourceKeys = bm.appendSharedClusterRK(resourceKeys...)
guards := bm.resourceKeyLocker.Lock(resourceKeys...)
id, err := resource.Resource().IDAllocator().Allocate(ctx)
if err != nil {
guards.Unlock()
return nil, errors.Wrapf(err, "allocate new id failed")
}
if err := bm.checkClusterRole(ctx); err != nil {
guards.Unlock()
return nil, err
}
bm.metrics.ObserveAcquireLockDuration(startLockInstant, guards.ResourceKeys())
return &broadcasterWithRK{
broadcaster: bm,
broadcastID: id,
guards: guards,
}, nil
}
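The function above follows an acquire-then-roll-back pattern: locks are taken first, and every later failure path releases them before returning. A minimal Python sketch of that pattern follows. The ResourceKeyLocker class, the sorted acquisition order, and the allocate_id callback are assumptions made for illustration and are not taken from the Milvus code.

```python
import threading
from contextlib import ExitStack


class ResourceKeyLocker:
    """Hypothetical per-key locker; not the Milvus implementation."""

    def __init__(self):
        self._locks = {}
        self._registry_lock = threading.Lock()

    def _lock_for(self, key):
        with self._registry_lock:
            return self._locks.setdefault(key, threading.Lock())

    def lock(self, *keys):
        # Acquire in sorted order so overlapping key sets cannot deadlock.
        guards = ExitStack()
        for key in sorted(set(keys)):
            lock = self._lock_for(key)
            lock.acquire()
            guards.callback(lock.release)
        return guards  # guards.close() releases every acquired lock


def with_resource_keys(locker, allocate_id, *keys):
    guards = locker.lock(*keys)
    try:
        broadcast_id = allocate_id()
    except Exception:
        guards.close()  # roll back the locks when a later step fails
        raise
    return broadcast_id, guards
```

The caller keeps the returned guards for the duration of the broadcast and calls close() when the task is done.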
Secondary Cluster Operations
Force promote operations require secondary cluster verification:
func (bm *broadcastTaskManager) WithSecondaryClusterResourceKey(ctx context.Context) (BroadcastAPI, error) {
id, err := resource.Resource().IDAllocator().Allocate(ctx)
if err != nil {
return nil, errors.Wrapf(err, "allocate new id failed")
}
startLockInstant := time.Now()
// Acquire an exclusive cluster resource key to block
// ...
}
Message Queue Integration
Coordinator services integrate with message queues for asynchronous communication and event-driven coordination.
RocksMQ Implementation
RocksMQ is a built-in message queue option for Milvus standalone deployments.
Location: pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go
Key Data Structures:
type consumerList struct {
consumers map[string]*Consumer // GroupName -> *Consumer
mu sync.RWMutex
}
Topic Management: Topics are identified by name with restrictions on character content:
func (rmq *rocksmq) CreateTopic(topicName string) error {
// Check if topicName contains "/"
if strings.Contains(topicName, "/") {
return retry.Unrecoverable(fmt.Errorf("topic name = %s contains \"/\"", topicName))
}
// ...
}
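Wrapping the error in retry.Unrecoverable signals that retrying cannot help, because the topic name itself is invalid. The sketch below shows that idea in Python under stated assumptions: UnrecoverableError and the retry helper are hypothetical stand-ins, not the Milvus retry package.

```python
class UnrecoverableError(Exception):
    """Hypothetical marker for errors that must not be retried."""


def validate_topic_name(topic_name: str) -> None:
    # Same rule as the excerpt above: "/" is not allowed in a topic name.
    if "/" in topic_name:
        raise UnrecoverableError(f'topic name = {topic_name} contains "/"')


def retry(fn, attempts: int = 3):
    last_error = None
    for _ in range(attempts):
        try:
            return fn()
        except UnrecoverableError:
            raise  # invalid input: stop immediately
        except Exception as exc:
            last_error = exc  # transient failure: try again
    raise last_error
```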
Message Retention
RocksMQ implements retention policies to manage storage.
Location: pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go
Retention Conditions:
| Condition | Configuration | Default |
|---|---|---|
| Size-based | RocksmqCfg.RetentionSizeInMB | -1 (disabled) |
| Time-based | RocksmqCfg.RetentionTimeInMinutes | -1 (disabled) |
Retention Check Function:
func checkRetention() bool {
params := paramtable.Get()
return params.RocksmqCfg.RetentionSizeInMB.GetAsInt64() != -1 ||
params.RocksmqCfg.RetentionTimeInMinutes.GetAsInt64() != -1
}
Retention Process:
- Iterate through acked pages
- Check timestamp expiration per page
- Calculate total deleted size
- Clean expired data when conditions are met
func (ri *retentionInfo) cleanData(topic string, pageEndID int64) error {
// ...
ll, ok := topicMu.Load(topic)
if !ok {
return fmt.Errorf("topic name = %s not exist", topic)
}
lock, ok := ll.(*sync.Mutex)
if !ok {
return fmt.Errorf("get mutex failed, topic name = %s", topic)
}
lock.Lock()
defer lock.Unlock()
err := DeleteMessages(ri.db, topic, 0, pageEndID)
// ...
}
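The retention steps listed above (walk acked pages, check expiry, track size, then delete up to a page end ID) can be sketched as follows. This is an illustrative Python model, not the RocksMQ code: the Page fields, the minute-based timestamps, and the exact stop condition are assumptions.

```python
from dataclasses import dataclass


@dataclass
class Page:
    end_id: int    # ID of the last message in the page
    acked_ts: int  # time the page was fully acked, in minutes (illustrative unit)
    size_mb: int


def last_page_to_delete(pages, now, retention_minutes, retention_size_mb):
    """Return the end_id of the newest page to delete, or None if nothing qualifies."""
    if retention_minutes == -1 and retention_size_mb == -1:
        return None  # both policies disabled, as in checkRetention
    total_mb = sum(page.size_mb for page in pages)
    page_end_id = None
    for page in pages:  # pages are assumed to be ordered oldest first
        expired = retention_minutes != -1 and now - page.acked_ts > retention_minutes
        oversized = retention_size_mb != -1 and total_mb > retention_size_mb
        if not (expired or oversized):
            break
        page_end_id = page.end_id
        total_mb -= page.size_mb
    return page_end_id
```

The returned ID plays the role of pageEndID in cleanData: everything from the start of the topic up to that ID is removed under the topic lock.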
Context-Aware Logging
Coordinator services use the mlog package for context-aware logging.
Location: pkg/mlog/README.md
Design Principles
- Mandatory Context Passing - All logging requires a context for request traceability
- Zero-Overhead Abstraction - Type aliases avoid wrapper overhead
- Automatic Field Accumulation - Context fields propagate through call chains
- Lazy Encoding - Deferred field encoding when log level is disabled
Logging API
| Method | Purpose |
|---|---|
| mlog.Info(ctx, msg, fields...) | Log info-level message |
| mlog.Debug(ctx, msg, fields...) | Log debug-level message |
| mlog.Error(ctx, msg, fields...) | Log error-level message |
| mlog.WithFields(ctx, fields...) | Add fields to context |
Field Accumulation Example
// Add fields to context (fields accumulate)
ctx = mlog.WithFields(ctx, mlog.String("request_id", "abc123"))
ctx = mlog.WithFields(ctx, mlog.Int64("user_id", 42))
// Subsequent logs automatically include these fields
mlog.Info(ctx, "processing request")
// Output: {"msg":"processing request", "request_id":"abc123", "user_id":42, ...}
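Field accumulation amounts to deriving a new context from the old one each time fields are added, so callers further up the chain are unaffected. A minimal Python sketch of that behavior, using plain dictionaries in place of Go contexts (the function names are hypothetical and only echo the mlog API above):

```python
import json


def with_fields(ctx: dict, **fields) -> dict:
    # Return a new context; the caller's context is left unchanged.
    return {**ctx, **fields}


def info(ctx: dict, msg: str) -> str:
    # Every field accumulated on the context is emitted with the message.
    return json.dumps({"msg": msg, **ctx})
```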
Coordinator Communication Patterns
graph LR
subgraph "Client Requests"
S[Search/Query]
I[Insert/Delete]
M[Management]
end
subgraph "Coordinators"
PN[Proxy Node]
RC[Root Coord]
DC[Data Coord]
QC[Query Coord]
end
subgraph "Workers"
DN[Data Nodes]
QN[Query Nodes]
end
S --> PN
I --> PN
M --> PN
PN --> RC
PN --> DC
PN --> QC
RC -.-> DC
RC -.-> QC
DC --> DN
QC --> QN
Configuration and Health Monitoring
Consumer Group Management
Coordinator services track consumer groups for each topic:
func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer, error) {
key := constructCurrentID(topicName, groupName)
_, ok := rmq.consumersID.Load(key)
if ok {
if val, ok := rmq.consumers.Load(topicName); ok {
c := val.(*consumerList).Get(groupName)
return c != nil, c, nil
}
}
return false, nil, nil
}
Topic Cleanup
Topics are cleaned up atomically with multi-key removal:
func (rmq *rocksmq) DestroyTopic(topicName string) error {
// Remove page size info
pageMsgSizeKey := constructKey(PageMsgSizeTitle, topicName)
err = rmq.kv.RemoveWithPrefix(context.TODO(), pageMsgSizeKey)
// Remove page ts info
pageMsgTsKey := constructKey(PageTsTitle, topicName)
err = rmq.kv.RemoveWithPrefix(context.TODO(), pageMsgTsKey)
// Remove acked ts info
ackedTsKey := constructKey(AckedTsTitle, topicName)
err = rmq.kv.RemoveWithPrefix(context.TODO(), ackedTsKey)
// Batch remove, atomic operation
err = rmq.kv.MultiRemove(context.TODO(), removedKeys)
}
Best Practices
Topic Naming
- Avoid special characters like / in topic names
- Use descriptive naming conventions for easier debugging
Resource Locking
- Always release locks in defer statements
- Check cluster role before critical operations
- Handle lock acquisition failures gracefully
Retention Configuration
- Enable retention to prevent unbounded storage growth
- Balance retention time/size based on workload patterns
- Monitor RocksMQ disk usage in production deployments
Source: https://github.com/milvus-io/milvus / Human Manual
Query Execution Engine
Related topics: Vector Index Types, System Architecture
Overview
The Query Execution Engine is the core component responsible for processing search and query requests in Milvus. It translates user queries into executable operations against stored data segments, leveraging vector similarity algorithms and efficient filtering mechanisms to retrieve relevant results.
The execution engine operates at the query node level, coordinating between segment management, index structures, and expression evaluation to deliver high-performance vector search capabilities.
Architecture Overview
graph TD
A[User Query Request] --> B[Query Plan Parser]
B --> C[Execution Plan Node Visitor]
C --> D[Segment Executor]
D --> E[Index-Based Search]
D --> F[Vector Comparison]
D --> G[Expression Filter]
E --> H[Results Merge & Rank]
F --> H
G --> H
H --> I[Final Results]
Key Components
Query Plan Parser (Go)
Located in internal/parser/planparserv2/plan_parser_v2.go, the plan parser translates user query requests into an executable query plan. It handles:
- Expression Parsing: Converts filter expressions into abstract syntax trees
- Vector Query Construction: Builds search request structures with query vectors
- Plan Validation: Ensures query parameters are valid before execution
// Plan parser entry point structure
type PlanParserV2 struct {
// Parses query expressions and vector search parameters
// into executable plan nodes
}
ExecPlanNodeVisitor (C++)
The ExecPlanNodeVisitor.cpp implements the visitor pattern for traversing query plan nodes:
| Method | Purpose |
|---|---|
| VisitPlanNode | Entry point for plan traversal |
| VisitQuery | Processes query-specific nodes |
| VisitSearch | Handles vector similarity search |
| VisitFilter | Evaluates filter expressions |
| VisitRetrieve | Fetches complete record data |
Source: internal/core/src/query/ExecPlanNodeVisitor.cpp
Segment Interface
The SegmentInterface.h defines the contract for segment execution:
// Core segment interface for query execution
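class SegmentInterface {
 public:
    // Virtual destructor so implementations can be deleted through the interface
    virtual ~SegmentInterface() = default;
    virtual SearchResult Search(const QueryPlan& plan) = 0;
    virtual RetrieveResult Retrieve(const QueryPlan& plan) = 0;
    virtual int64_t GetRowCount() const = 0;
};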
Source: internal/core/src/segcore/SegmentInterface.h
Expression System
Located in internal/core/src/exec/expression/Expr.h, the expression system provides:
- Vectorized Evaluation: Processes batches of data efficiently
- Type Safety: Supports Int, Float, String, and Array field types
- Optimized Predicates: Short-circuit evaluation for filter conditions
Source: internal/core/src/exec/expression/Expr.h
Execution Flow
Search Request Pipeline
sequenceDiagram
participant Client
participant Proxy
participant QueryNode
participant Segment
participant Index
Client->>Proxy: Search Request
Proxy->>QueryNode: Forward Query Plan
QueryNode->>Segment: Initialize Search
Segment->>Index: Query Index Structure
Index-->>Segment: Candidate IDs
Segment->>Segment: Apply Filter Expressions
Segment->>Index: Score Calculation
Index-->>Segment: Scored Results
Segment-->>QueryNode: Top-K Results
QueryNode-->>Proxy: Merged Results
Proxy-->>Client: Final Results
Query Node Search Implementation
The search.go file implements the core search execution logic:
func (s *SearchTask) Execute() error {
	// 1. Load segments for the collection
	segments, err := s.segmentManager.GetSegments()
	if err != nil {
		return err
	}
	// 2. Execute search on each segment, stopping at the first failure
	results := make([]*SearchResult, 0, len(segments))
	for _, seg := range segments {
		result, err := seg.Search(s.plan)
		if err != nil {
			return err
		}
		results = append(results, result)
	}
	// 3. Merge results from all segments
	return s.mergeResults(results)
}
Source: internal/querynodev2/segments/search.go
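The merge step combines per-segment hits into one global top-k list. A minimal Python sketch of that step follows, assuming each segment returns (distance, entity_id) pairs already sorted by ascending distance. The function name and the tuple layout are illustrative and not taken from search.go.

```python
import heapq


def merge_top_k(segment_results, k):
    """Merge per-segment (distance, entity_id) lists into a global top-k.

    Each input list is assumed to be sorted by ascending distance.
    """
    merged = heapq.merge(*segment_results)  # lazy k-way merge of sorted inputs
    return [hit for _, hit in zip(range(k), merged)]
```

Because the merge is lazy, only the first k hits are pulled from the inputs, regardless of how many candidates each segment produced.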
Expression Evaluation
Supported Expression Types
| Expression Type | Operator | Example |
|---|---|---|
| Comparison | ==, !=, >, <, >=, <= | age >= 25 |
| Logical | AND, OR, NOT | age >= 25 AND gender == "M" |
| In | IN | status IN ["active", "pending"] |
| Range | BETWEEN | score BETWEEN 0.5 AND 0.9 |
| String Match | LIKE | name LIKE "John%" |
Expression Evaluation Order
graph LR
A[Raw Expression] --> B[Parse to AST]
B --> C[Optimize Order]
C --> D[Execute Filters]
D --> E[Short-Circuit Evaluation]
E --> F[Bitmap Generation]
F --> G[Vector Search Filtering]
The expression evaluator generates a bitmap of matching entity IDs, which is then used to filter vector search results before returning to the user.
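The bitmap idea can be sketched in a few lines of Python. This is an illustration under stated assumptions, not the segcore evaluator: rows are plain dictionaries, predicates are Python callables, and the bitmap is a list of booleans indexed by row offset.

```python
def build_bitmap(rows, predicates):
    """Return one boolean per row; True means the row passes every predicate."""
    # all() stops at the first failing predicate (short-circuit evaluation).
    return [all(pred(row) for pred in predicates) for row in rows]


def filter_hits(hits, bitmap):
    """Keep only (row_offset, distance) hits whose row is set in the bitmap."""
    return [(offset, dist) for offset, dist in hits if bitmap[offset]]
```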
Segment Execution Strategies
Indexed Search
When an index exists for the vector field:
- Query index structure for candidate IDs
- Apply pre-filter expressions using index metadata
- Compute similarity scores on candidates only
- Return top-K results
Raw Scan
When no index exists:
- Scan all records in the segment
- Apply filter expressions during scan
- Compute similarity scores for all filtered entities
- Return top-K results
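The raw scan path is an exact nearest-neighbor search over whatever rows survive the filter. A minimal Python sketch, assuming L2 distance and a keep callback that stands in for the filter expression (both are illustrative choices, not Milvus APIs):

```python
import heapq
import math


def raw_scan(vectors, query, k, keep=lambda offset: True):
    """Exact top-k by L2 distance over the rows accepted by `keep`."""
    candidates = (
        (math.dist(vec, query), offset)
        for offset, vec in enumerate(vectors)
        if keep(offset)  # the filter is applied during the scan
    )
    return heapq.nsmallest(k, candidates)
```

The cost grows linearly with the number of rows in the segment, which is why an index matters for large segments.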
Configuration Parameters
| Parameter | Default | Description |
|---|---|---|
| queryNode.enableMaterializedView | true | Enable materialized view execution |
| queryNode.chunkCacheEnabled | true | Enable chunk-level caching |
| queryNode.nprobe | 16 | Number of probe clusters for IVF indexes |
| queryNode.growingSizeLimit | -1 | Max results from growing segments |
Performance Considerations
Optimization Strategies
- Early Filtering: Apply filter expressions before vector search to reduce candidates
- Batch Processing: Process multiple queries concurrently using goroutines
- Index Selection: Automatically select optimal index type based on data characteristics
- Cache Utilization: Leverage chunk cache for frequently accessed segments
Monitoring Metrics
| Metric | Description |
|---|---|
| query_execution_time_ms | Time taken for query execution |
| segment_search_count | Number of segments searched |
| index_query_count | Number of index queries performed |
| expression_eval_count | Number of filter evaluations |
Related Components
Data Flow Integration
graph TD
A[Data Ingestion] --> B[Segment Builder]
B --> C[Growing Segment]
C --> D[Sealed Segment]
D --> E[Query Node]
E --> F[Query Execution Engine]
F --> G[Search Results]
H[Index Builder] --> D
I[Retention Manager] --> D
Component Dependencies
- Segment Manager: Manages lifecycle of queryable segments
- Index Executor: Handles index-specific query operations
- Result Merger: Combines results from multiple segments
- Cache Manager: Manages in-memory segment data
Common Use Cases
Vector Similarity Search with Filters
Search Request:
- Collection: "products"
- Vector Field: "embedding"
- Query Vector: [0.1, 0.2, ...]
- Filter: "category == 'electronics' AND price < 1000"
- Limit: 100
- Offset: 0
Hybrid Search
The execution engine supports hybrid searches combining:
- Vector similarity (ANN search)
- Scalar filtering (expression evaluation)
- Full-text matching (BM25)
- Array field operations
Troubleshooting
Slow Query Performance
- Check if appropriate index is built on vector field
- Verify filter expressions are selective
- Monitor segment sizes and memory usage
- Review cache hit rates
Memory Issues
- Adjust the queryNode.chunkCacheSize parameter
- Consider segment size limits for growing segments
- Monitor memory usage per query node
References
- Query Plan Parser: internal/parser/planparserv2/plan_parser_v2.go
- Execution Visitor: internal/core/src/query/ExecPlanNodeVisitor.cpp
- Segment Interface: internal/core/src/segcore/SegmentInterface.h
- Search Implementation: internal/querynodev2/segments/search.go
- Expression System: internal/core/src/exec/expression/Expr.h
Vector Index Types
Related topics: Schema Design and Field Types, Query Execution Engine
Milvus is a cloud-native vector database designed for similarity search and AI applications. At its core, Milvus stores and indexes high-dimensional vector embeddings to enable fast approximate nearest neighbor (ANN) searches across billions of vectors.
Overview
Vector indexes are data structures that organize vector data to accelerate similarity queries. Without indexes, searching requires comparing a query vector against every stored vector—a linear scan operation with O(n) complexity. Vector indexes reduce this to logarithmic or sub-linear complexity, making billion-scale similarity search practical.
Milvus supports multiple vector index types, each optimized for different use cases involving trade-offs between:
- Search speed — how quickly results are returned
- Memory footprint — RAM or disk usage
- Build time — index construction duration
- Accuracy — precision of ANN results vs. exact kNN
Index Type Architecture
graph TD
A[Vector Data] --> B{Index Type Selection}
B --> C[In-Memory Indexes]
B --> D[Disk-Based Indexes]
C --> C1[HNSW]
C --> C2[IVF Family]
C --> C3[SCANN]
C --> C4[Sparse Index]
D --> D1[DiskANN]
C1 --> E[Search Query]
C2 --> E
C3 --> E
C4 --> E
D1 --> E
E --> F[Top-k Results]
Supported Index Types
1. HNSW (Hierarchical Navigable Small World)
HNSW is a graph-based indexing algorithm that constructs a multi-layer navigation structure. It offers excellent search performance with minimal tuning.
| Parameter | Type | Default | Description |
|---|---|---|---|
| M | int | 16 | Maximum connections per node |
| efConstruction | int | 200 | Search width during build |
| ef | int | 100 | Search width during query |
Use cases: General-purpose ANN search with high accuracy requirements.
Source: client/index/hnsw.go
2. IVF (Inverted File Index)
IVF partitions vectors into clusters using k-means and maintains an inverted index mapping clusters to vectors.
| Parameter | Type | Default | Description |
|---|---|---|---|
| nlist | int | 128 | Number of cluster centers |
| nprobe | int | 16 | Number of clusters to search |
Variants:
- IVF_FLAT: stores exact vectors in clusters
- IVF_SQ8: scalar quantization applied
- IVF_PQ: product quantization applied
Source: client/index/ivf.go
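The roles of nlist and nprobe are easier to see in a toy version of the algorithm. The Python sketch below is a simplification, not the Milvus or Knowhere implementation: centroids are supplied by hand rather than trained with k-means, vectors are stored unquantized (as in IVF_FLAT), and the function names are hypothetical.

```python
import math


def build_ivf(vectors, centroids):
    """Assign each vector offset to the inverted list of its nearest centroid."""
    lists = {i: [] for i in range(len(centroids))}
    for offset, vec in enumerate(vectors):
        nearest = min(range(len(centroids)), key=lambda i: math.dist(vec, centroids[i]))
        lists[nearest].append(offset)
    return lists


def ivf_search(vectors, centroids, lists, query, k, nprobe):
    # Probe only the nprobe clusters whose centroids are closest to the query.
    order = sorted(range(len(centroids)), key=lambda i: math.dist(query, centroids[i]))
    candidates = [offset for i in order[:nprobe] for offset in lists[i]]
    ranked = sorted(candidates, key=lambda o: math.dist(query, vectors[o]))
    return ranked[:k]
```

Here len(centroids) plays the role of nlist. Raising nprobe scans more clusters, which costs time but can recover neighbors that fall just across a cluster boundary.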
3. SCANN (Scalable Nearest Neighbors)
SCANN is Google's high-performance ANN algorithm that combines inverted indexing with asymmetric hashing.
| Parameter | Type | Default | Description |
|---|---|---|---|
| (name unreadable in source) | int | - | Number of subquantizers |
| reorder_topk | int | - | Top-k results to reorder |
Use cases: High-throughput scenarios requiring high accuracy.
Source: client/index/scann.go
Community note: Feature request #2771 highlights community interest in ScaNN support, noting its potential for significantly better performance compared to other algorithms.
4. DiskANN
DiskANN is a disk-based index that enables searching billion-scale datasets with limited RAM. It uses a hybrid in-memory/disk architecture.
| Parameter | Type | Default | Description |
|---|---|---|---|
| max_degree | int | 64 | Maximum out-degree in graph |
| search_list_size | int | 100 | Search candidate list size |
| pq_code_budget_gb | float | - | PQ code memory budget |
| cache_dataset_on_disk | bool | false | Cache raw vectors on disk |
Use cases: Large-scale datasets exceeding available RAM.
Source: client/index/disk_ann.go, internal/core/src/index/VectorDiskIndex.cpp
5. Sparse Index
Sparse index is designed for sparse vectors, commonly used with embedding models that produce high-dimensional sparse representations.
| Parameter | Type | Default | Description |
|---|---|---|---|
| mutable_ratio | float | 1.1 | Ratio for index mutability |
Use cases: Sparse embeddings, BM25-style retrieval.
Source: client/index/sparse.go
Vector Types and Index Compatibility
Milvus supports multiple vector types, and each index type has specific compatibility:
| Vector Type | HNSW | IVF | SCANN | DiskANN | Sparse |
|---|---|---|---|---|---|
| Float16 | ✅ | ✅ | ✅ | ✅ | ❌ |
| BFloat16 | ✅ | ✅ | ✅ | ✅ | ❌ |
| Float32 | ✅ | ✅ | ✅ | ✅ | ❌ |
| Binary | ✅ | ✅ | ❌ | ✅ | ❌ |
| Sparse Float | ❌ | ❌ | ❌ | ❌ | ✅ |
Index Build Process
graph LR
A[Collection Data] --> B[Create Index Request]
B --> C{Index Type Valid?}
C -->|No| D[Return Error]
C -->|Yes| E[Select Index Builder]
E --> F[Parameter Validation]
F --> G[Build Index Structure]
G --> H[Serialize to Storage]
H --> I[Index Ready for Search]
Index Parameter Validation
Milvus includes a parameter validation layer that checks index parameters before building:
Source: internal/util/indexparamcheck/index_type.go
The validation system ensures:
- Parameters are within acceptable ranges
- Required parameters are present
- Parameter types match expectations
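The three checks above (range, presence, type) can be sketched as a small rule table. The Python below is illustrative only: the RULES table and its numeric ranges are assumptions chosen for the example, and the real rules live in the indexparamcheck package.

```python
# Hypothetical rules for illustration: name -> (type, minimum, maximum).
RULES = {
    "HNSW": {"M": (int, 2, 2048), "efConstruction": (int, 1, 32768)},
}


def validate_index_params(index_type, params):
    """Return a list of human-readable problems; an empty list means valid."""
    rules = RULES.get(index_type)
    if rules is None:
        return [f"unsupported index type: {index_type}"]
    errors = []
    for name, (expected_type, low, high) in rules.items():
        if name not in params:
            errors.append(f"missing required parameter: {name}")
        elif not isinstance(params[name], expected_type):
            errors.append(f"{name} must be {expected_type.__name__}")
        elif not low <= params[name] <= high:
            errors.append(f"{name} must be in [{low}, {high}]")
    return errors
```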
Search Workflow with Indexes
graph TD
A[Search Request] --> B[Load Index into Memory]
B --> C[Execute ANN Search]
C --> D[Rank Results by Distance]
D --> E[Apply Metric Type]
E --> F[Return Top-k Results]
B -.->|DiskANN| G[Memory-Mapped Access]
C -.->|HNSW| H[Graph Traversal]
C -.->|IVF| I[Cluster Pruning]
Metric Types
Vector similarity is measured using one of these metrics:
| Metric Type | Description | Compatible Indexes |
|---|---|---|
| L2 (Euclidean) | Euclidean distance between vectors | All |
| IP (Inner Product) | Dot product of vectors | All |
| COSINE | Cosine similarity | All |
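For reference, the three metrics correspond to the standard formulas below. This is a plain Python sketch of the textbook definitions and says nothing about how Milvus computes or scales them internally. For L2 a smaller value means a closer match, while for IP and COSINE a larger value does.

```python
import math


def l2(a, b):
    # Euclidean distance: square root of the summed squared differences.
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def inner_product(a, b):
    return sum(x * y for x, y in zip(a, b))


def cosine(a, b):
    # Inner product of the two vectors after normalizing each to unit length.
    norm_a = math.sqrt(inner_product(a, a))
    norm_b = math.sqrt(inner_product(b, b))
    return inner_product(a, b) / (norm_a * norm_b)
```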
Schema Definition
Vectors are defined in collection schemas with the following attributes:
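from pymilvus import DataType, FieldSchema

# Field definitions; FieldSchema describes the field, not its index.
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128),
]
# Index parameters for the embedding field, supplied later when the index is created.
index_params = {"index_type": "HNSW", "params": {"M": 16, "efConstruction": 200}}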
Common Issues and Best Practices
Index Not Built
- Symptom: Slow search performance
- Solution: Ensure index is built before loading collection
Memory Constraints
- Symptom: Out of memory errors
- Solution: Consider DiskANN or reduce index parameters
Accuracy Tuning
- Symptom: Low recall rates
- Solution: Increase the HNSW ef or IVF nprobe parameter
Schema Design and Field Types
Related topics: Vector Index Types, Data Storage Layer
Overview
Schema Design and Field Types form the foundation of how Milvus organizes and stores vector data. A schema defines the structure of a collection, specifying the fields that can be used to describe entities, including both metadata attributes and vector embeddings.
In Milvus, every collection requires a schema that specifies:
- Primary Key Field: Uniquely identifies each entity in the collection
- Vector Field(s): Stores the embedding vectors used for similarity search
- Metadata Fields: Optional fields to store descriptive attributes
Source: client/entity/schema.go:1-50
Schema Architecture
Core Schema Components
The schema system consists of several interconnected components that work together to provide a robust data modeling layer:
graph TD
A[Collection Schema] --> B[Field Schemas]
B --> C[Primary Key Field]
B --> D[Vector Fields]
B --> E[Metadata Fields]
C --> F[Int64 or String]
D --> G[FloatVector, BinaryVector, SparseVector]
E --> H[String, Int, Float, Bool, Array]
Field Type Hierarchy
Milvus supports a comprehensive set of field types organized into distinct categories:
| Category | Data Types | Description |
|---|---|---|
| Integer | Int8, Int16, Int32, Int64 | Signed integer values |
| Floating Point | Float, Double | Decimal numbers |
| Boolean | Bool | True/False values |
| String | String, VarChar | Text data with optional length constraints |
| Vector | FloatVector, BinaryVector, SparseVector, Int8Vector | Embedding representations |
| Array | Array | Collection of elements of the same type |
Source: client/entity/field.go:1-100
Field Definitions
Field Schema Structure
Each field in a Milvus schema is defined by the FieldSchema structure:
type FieldSchema struct {
Name string // Field name identifier
DataType DataType // Data type enumeration
TypeParams map[string]string // Type-specific parameters
ElementType DataType // For array fields, the element data type
IsPrimaryKey bool // Marks the primary key field
IsPartitionKey bool // Enables partition by key
IsDynamic bool // Allows dynamic field population
Description string // Human-readable description
AutoID bool // Auto-generates IDs for this field
}
Source: client/entity/field.go:50-80
Field Type Parameters
Field behavior is customized through type-specific parameters stored as key-value pairs:
| Parameter | Applicable Types | Description | Example |
|---|---|---|---|
| dim | FloatVector, BinaryVector, Int8Vector | Vector dimension | "128", "768" |
| max_length | VarChar, String | Maximum string length | "256" |
| max_capacity | Array | Maximum array elements | "100" |
| element_type | Array | Element data type | "Int64", "Float" |
Source: internal/core/src/common/FieldMeta.h:1-100
Primary Key Fields
Supported Primary Key Types
The primary key field uniquely identifies each entity in a collection. Milvus supports two primary key data types:
| Data Type | Use Case | ID Generation |
|---|---|---|
| Int64 | Sequential numeric IDs | Auto-generated or user-specified |
| VarChar | String-based identifiers | User-specified |
Note: Issue #1924 in the community highlights that string ID support was a highly requested feature, now addressed through VarChar primary keys.
Source: internal/rootcoord/create_collection_task.go:100-150
Auto-ID Configuration
Primary key fields can be configured for automatic ID generation:
// Enable auto-generated IDs
FieldSchema{
Name: "id",
DataType: Int64,
IsPrimaryKey: true,
AutoID: true,
}
// User-specified IDs
FieldSchema{
Name: "id",
DataType: Int64,
IsPrimaryKey: true,
AutoID: false,
}
Source: client/entity/field.go:80-120
Vector Fields
Vector fields store embedding representations used for similarity search operations.
Dense Vector Types
#### FloatVector
The most common vector type for high-dimensional embeddings from models like BERT, CLIP, or custom neural networks.
FieldSchema{
Name: "embedding",
DataType: FloatVector,
TypeParams: map[string]string{
"dim": "768", // Embedding dimension
},
}
#### BinaryVector
Compact representation for binary embeddings, such as hash codes or binarized model outputs, stored at one bit per dimension.
FieldSchema{
Name: "binary_embedding",
DataType: BinaryVector,
TypeParams: map[string]string{
"dim": "256", // Binary vector dimension (bits)
},
}
#### Int8Vector
Quantized vector type for int8 embeddings, providing memory efficiency.
FieldSchema{
Name: "int8_embedding",
DataType: Int8Vector,
TypeParams: map[string]string{
"dim": "128",
},
}
Source: internal/core/src/common/FieldMeta.h:100-200
Sparse Vector Support
Sparse vectors represent embeddings where most dimensions are zero, common in models like BM25 or SPLADE.
FieldSchema{
Name: "sparse_embedding",
DataType: SparseVector,
}
The recent v2.6.5 release added nullable vector columns across all vector types including sparse vectors.
Source: client/v2.6.5 Release Notes
Metadata Fields
Scalar Data Types
#### Integer Types
| Type | Range | Storage |
|---|---|---|
| Int8 | -128 to 127 | 1 byte |
| Int16 | -32,768 to 32,767 | 2 bytes |
| Int32 | -2,147,483,648 to 2,147,483,647 | 4 bytes |
| Int64 | -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807 | 8 bytes |
#### Floating Point Types
| Type | Precision | Storage |
|---|---|---|
| Float | Single precision (7 digits) | 4 bytes |
| Double | Double precision (15 digits) | 8 bytes |
#### String Types
| Type | Description | Use Case |
|---|---|---|
| String | Variable-length string | General text storage |
| VarChar | Variable-length with max length constraint | Efficient storage with length limits |
FieldSchema{
Name: "category",
DataType: VarChar,
TypeParams: map[string]string{
"max_length": "256",
},
}
Source: client/entity/field.go:100-150
Array Fields
Array fields allow storing collections of elements of the same data type, enabling more complex data modeling.
Supported Array Element Types
- Int8, Int16, Int32, Int64
- Float, Double
- Bool
- VarChar
Array Field Definition
FieldSchema{
Name: "tags",
DataType: Array,
ElementType: VarChar,
TypeParams: map[string]string{
"max_capacity": "100", // Maximum number of elements
"max_length": "128", // Max length of each element
},
}
Recent releases (v2.6.3+) added Array field partial update helpers for ARRAY_APPEND and ARRAY_REMOVE operations in upsert requests.
Source: client/v2.6.3 Release Notes
Schema Validation
Collection Creation Validation
When creating a collection, the schema undergoes rigorous validation:
graph TD
A[CreateCollection Request] --> B[Schema Validation]
B --> C{Valid Schema?}
C -->|No| D[Return Error]
C -->|Yes| E[Check Field Constraints]
E --> F{Primary Key Exists?}
F -->|No| G[Return Error: Primary Key Required]
F -->|Yes| H{Vector Field Valid?}
H -->|No| I[Return Error: Invalid Vector]
H -->|Yes| J[Validate Field Types]
J --> K[Schema Accepted]
Validation Rules
| Rule | Description | Error Code |
|---|---|---|
| Primary Key Required | Collection must have exactly one primary key | ErrNoPrimaryKey |
| Vector Dimension | Vector fields must have valid dimension > 0 | ErrInvalidDimension |
| Primary Key Type | Only Int64 or VarChar allowed as primary key | ErrInvalidPrimaryKeyType |
| AutoID Compatibility | AutoID only valid for Int64 primary keys | ErrInvalidAutoID |
| Field Name Uniqueness | No duplicate field names allowed | ErrDuplicateFieldName |
Source: internal/rootcoord/create_collection_task.go:150-300
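A subset of these rules (primary key count and type, vector dimension, field name uniqueness) can be sketched as a plain function. The Python below is illustrative, not the rootcoord code: fields are dictionaries with hypothetical keys, error messages are made up for the example, and sparse vectors are assumed to carry no dim, as in the SparseVector definition earlier.

```python
def validate_schema(fields):
    """fields: dicts with 'name', 'dtype', and optional 'is_primary' and 'dim'."""
    errors = []
    names = [f["name"] for f in fields]
    if len(names) != len(set(names)):
        errors.append("duplicate field name")
    primary = [f for f in fields if f.get("is_primary")]
    if len(primary) != 1:
        errors.append("exactly one primary key is required")
    for f in primary:
        if f["dtype"] not in ("Int64", "VarChar"):
            errors.append("primary key must be Int64 or VarChar")
    for f in fields:
        dense = f["dtype"].endswith("Vector") and f["dtype"] != "SparseVector"
        if dense and f.get("dim", 0) <= 0:
            errors.append(f"vector field {f['name']} needs dim > 0")
    return errors
```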
Schema Modification
Alter Collection Operations
Milvus allows modifying collection schemas under specific conditions:
| Operation | Empty Collection | Non-Empty Collection |
|---|---|---|
| Add Field | ✅ Supported | ✅ Supported (nullable fields only) |
| Drop Field | ✅ Supported | ❌ Not Supported |
| Modify Field | ✅ Supported | ❌ Not Supported |
Note: Issue #20405 in the community requests the ability to modify schemas on non-empty collections, which is partially addressed through nullable field additions.
Source: internal/rootcoord/alter_collection_task.go:1-100
Adding Fields to Existing Collections
// Add a nullable field to existing collection
AlterCollectionRequest{
CollectionName: "my_collection",
Type: AddField,
FieldName: "new_metadata",
FieldType: VarChar,
TypeParams: map[string]string{
"max_length": "512",
},
Nullable: true, // Required for non-empty collections
}
The v2.6.5 release added validation to ensure vector fields added to existing collections are nullable before sending AddCollectionField requests.
Source: client/v2.6.5 Release Notes
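The nullable requirement follows from the fact that existing entities have no value for a field added later. A minimal Python sketch of that reasoning, with rows modeled as dictionaries and a hypothetical add_field helper (not a Milvus API):

```python
def add_field(rows, field_name, nullable):
    """Add a field to every existing row; old rows can only hold None for it."""
    if rows and not nullable:
        raise ValueError("non-empty collection: new field must be nullable")
    return [{**row, field_name: None} for row in rows]
```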
Dynamic Fields
Dynamic fields enable flexible schema designs where entities can have attributes not defined in the schema. These fields are stored as key-value pairs in a special $meta or $dynamic field.
// Collection with dynamic fields enabled
CollectionSchema{
Fields: [...],
EnableDynamicField: true,
}
// Insert with dynamic attributes
InsertRequest{
CollectionName: "products",
Fields: [...],
FieldsData: [...],
// Dynamic fields
DynamicFields: map[string]interface{}{
"brand": "ExampleCorp",
"rating": 4.5,
"tags": []string{"electronics", "sale"},
},
}
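Conceptually, inserting into a collection with dynamic fields enabled splits each row into declared fields and everything else. The Python sketch below illustrates that split under stated assumptions: the split_dynamic helper is hypothetical, and packing the undeclared attributes as a JSON string under $meta is a simplification of the storage format.

```python
import json


def split_dynamic(row, schema_fields):
    """Keep declared fields as columns and pack undeclared ones into $meta."""
    declared = {k: v for k, v in row.items() if k in schema_fields}
    dynamic = {k: v for k, v in row.items() if k not in schema_fields}
    declared["$meta"] = json.dumps(dynamic, sort_keys=True)
    return declared
```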
Best Practices
Schema Design Guidelines
- Choose Appropriate Primary Key Type
- Use Int64 for sequential or auto-generated IDs
- Use VarChar for string-based identifiers with known prefix patterns
- Vector Dimension Planning
- Match dimension to your embedding model output
- Document the embedding source for reproducibility
- Consider memory footprint: FloatVector uses 4 bytes × dimension per vector
- String Field Sizing
- Set max_length conservatively to reduce storage overhead
- For VarChar primary keys, consider prefix patterns for efficient queries
- Partition Key Strategy
- Use high-cardinality fields (e.g., user_id, timestamp) as partition keys
- Avoid low-cardinality fields that create imbalanced partitions
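The cardinality advice can be checked with a quick simulation of hash-based partition assignment. The Python sketch below is an illustration only: crc32 stands in for whatever hash Milvus applies to partition keys, and the partition count of 16 is an arbitrary choice for the example.

```python
import zlib
from collections import Counter


def partition_for(key: str, num_partitions: int) -> int:
    # crc32 is a stand-in hash chosen for determinism, not the Milvus hash.
    return zlib.crc32(key.encode()) % num_partitions


def distribution(keys, num_partitions):
    """Count how many entities land in each partition."""
    return Counter(partition_for(key, num_partitions) for key in keys)


low = distribution(["free", "paid"] * 500, 16)             # two distinct values
high = distribution([f"user_{i}" for i in range(1000)], 16)  # 1000 distinct values
print(len(low), len(high))
```

With only two distinct key values, at most two partitions ever receive data, while the high-cardinality key spreads entities across many more.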
Memory Considerations
| Field Type | Bytes per Value |
|---|---|
| Int64 | 8 |
| Float | 4 |
| Double | 8 |
| Bool | 1 |
| VarChar(n) | n + overhead |
| FloatVector(dim) | 4 × dim |
| BinaryVector(dim) | dim / 8 |
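The table translates directly into a back-of-the-envelope estimate of raw data size. The Python sketch below works through one example, a collection of one million entities with an Int64 primary key and a 768-dimensional FloatVector. It counts raw field bytes only; index structures and storage overhead are not included, and the helper names are illustrative.

```python
def float_vector_bytes(dim: int) -> int:
    return 4 * dim   # 4 bytes per float32 component


def binary_vector_bytes(dim: int) -> int:
    return dim // 8  # 1 bit per dimension


def raw_collection_bytes(num_entities: int, bytes_per_entity: int) -> int:
    return num_entities * bytes_per_entity


per_entity = 8 + float_vector_bytes(768)  # Int64 primary key + 768-dim FloatVector
total = raw_collection_bytes(1_000_000, per_entity)
print(per_entity, total)  # 3080 3080000000
```

That is roughly 3.08 GB of raw data before any index is built.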
API Reference
Python SDK
from pymilvus import CollectionSchema, FieldSchema, DataType
schema = CollectionSchema(
fields=[
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768),
FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=256),
],
description="Example collection schema"
)
Go SDK
import "github.com/milvus-io/milvus/client/v2/entity"
schema := &entity.Schema{
Fields: []*entity.Field{
{
Name: "id",
DataType: entity.FieldTypeInt64,
PrimaryKey: true,
},
{
Name: "embedding",
DataType: entity.FieldTypeFloatVector,
TypeParams: map[string]string{
"dim": "768",
},
},
},
}
Summary
Schema Design and Field Types in Milvus provide a flexible yet structured approach to organizing vector data:
- Flexible Field Types: Support for scalar, vector, and array types
- Primary Key Options: Int64 or VarChar with optional auto-generation
- Vector Diversity: Dense, sparse, binary, and quantized vectors
- Schema Evolution: Add nullable fields to existing collections
- Dynamic Fields: Handle unstructured attributes without schema changes
Understanding these concepts is essential for building efficient vector search applications with Milvus.
Source: https://github.com/milvus-io/milvus / Human Manual
Data Storage Layer
Related topics: Data Ingestion and Flow, Coordinator Services
Overview
The Data Storage Layer in Milvus is a multi-tiered system responsible for persisting vector data, metadata, and operational logs. It encompasses the message queue layer (RocksMQ), blob storage integration (MinIO/S3), binlog management, and coordination services that ensure data durability and consistency across the cluster.
Milvus employs a cloud-native storage architecture designed for high scalability and horizontal partitioning of data. The storage layer is decoupled from compute resources, allowing independent scaling of storage and query capabilities.
Key Components
| Component | Purpose | Storage Type |
|---|---|---|
| RocksMQ | Message queue and log persistence | Local RocksDB |
| ChunkManager | Blob storage abstraction | MinIO/S3 |
| DataNode | Writes data to persistent storage | ChunkManager |
| DataCoord | Segment allocation and metadata | etcd |
| Binlog | Operational change logs | ChunkManager |
Source: internal/datanode/README.md
Data Ingestion and Flow
Related topics: Data Storage Layer, Coordinator Services
Overview
Data ingestion in Milvus refers to the process of receiving, buffering, persisting, and making vector data searchable. Milvus employs a multi-tiered architecture that separates write and read paths, enabling high-throughput insertions while maintaining query performance. The system decouples data ingestion from search operations through an asynchronous flow that leverages message queues and write buffers.
The data ingestion pipeline handles the journey from user-inserted vectors to searchable segments in object storage, incorporating automatic flushing, compaction, and indexing mechanisms.
Architecture Overview
graph TD
subgraph Client["Client Layer"]
SDK[SDK Insert/Upsert]
end
subgraph Proxy["Proxy Layer"]
TaskInsert[Insert Task]
TaskUpsert[Upsert Task]
ChannelsMgr[Channels Manager]
end
subgraph MessageQueue["Message Queue"]
RocksMQ[RocksMQ]
Pulsar[Pulsar]
Kafka[Kafka]
end
subgraph DataNode["Data Node"]
WriteBuffer[Write Buffer]
Flush[Flush Service]
Compactor[Compactor]
end
subgraph Storage["Storage Layer"]
Segment[Segment Files]
Index[Index Files]
end
SDK --> TaskInsert
SDK --> TaskUpsert
TaskInsert --> ChannelsMgr
TaskUpsert --> ChannelsMgr
ChannelsMgr --> RocksMQ
ChannelsMgr --> Pulsar
ChannelsMgr --> Kafka
RocksMQ --> WriteBuffer
Pulsar --> WriteBuffer
Kafka --> WriteBuffer
WriteBuffer --> Flush
Flush --> Segment
Compactor --> Segment
Segment --> Index
Message Queue Integration
Milvus supports multiple message queue implementations for data ingestion buffering. The message queue layer abstracts the underlying implementation, allowing users to choose between RocksMQ, Pulsar, or Kafka based on their deployment requirements.
RocksMQ (Default)
RocksMQ is the default embedded message queue implementation in Milvus, suitable for standalone deployments. It stores messages in RocksDB and provides reliable message delivery with acknowledgment tracking.
Topic Management
Topics in RocksMQ represent logical channels for message streams. Each topic maintains its own message pages and consumer tracking information.
// CreateTopic initializes a new topic in RocksMQ
func (rmq *rocksmq) CreateTopic(topicName string) error {
// Check if topicName contains "/"
if strings.Contains(topicName, "/") {
return retry.Unrecoverable(fmt.Errorf("topic name = %s contains \"/\"", topicName))
}
// topicIDKey is the only identifier of a topic
topicIDKey := TopicIDTitle + topicName
val, err := rmq.kv.Load(context.TODO(), topicIDKey)
if err != nil {
return err
}
if val != "" {
log.Warn("rocksmq topic already exists ", zap.String("topic", topicName))
return nil
}
// ...
}
Source: pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:280-295
Topic Name Restrictions
Topic names must not contain the "/" character, as this separator is used internally for key construction. Attempting to create a topic with "/" results in an unrecoverable error.
Source: pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:284
Consumer Groups
Consumer groups manage subscriptions to topics, tracking consumption progress for each consumer instance.
type consumerList struct {
consumers map[string]*Consumer // GroupName -> *Consumer
mu sync.RWMutex
}
func (l *consumerList) Add(consumer *Consumer) {
l.mu.Lock()
defer l.mu.Unlock()
if _, ok := l.consumers[consumer.GroupName]; ok {
return
}
l.consumers[consumer.GroupName] = consumer
}
Source: pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:35-49
Producer Message Flow
Messages are produced to topics in batches, with each message assigned a unique ID upon successful storage.
func (rmq *rocksmq) updatePageMsgSize(topicName string, msgIDs []UniqueID, msgSizes map[UniqueID]int64) error {
// ...
mutateBuffer := make(map[string]string)
for _, id := range msgIDs {
msgSize := msgSizes[id]
if curMsgSize+msgSize > params.RocksmqCfg.PageSize.GetAsInt64() {
// Current page is full
newPageSize := curMsgSize + msgSize
pageEndID := id
// Update page message size for current page. key is page end ID
pageMsgSizeKey := fixedPageSizeKey + "/" + strconv.FormatInt(pageEndID, 10)
mutateBuffer[pageMsgSizeKey] = strconv.FormatInt(newPageSize, 10)
// ...
}
}
// ...
}
Source: pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:75-92
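The page accounting in the excerpt can be reduced to a small standalone sketch. It assumes, since the elided parts of the excerpt do not show it, that the running size resets to zero once a page is closed; the page type and splitIntoPages name are hypothetical.

```go
package main

import "fmt"

// page records where a page ended and how many bytes it holds.
type page struct {
	EndID int64 // ID of the message that closed the page
	Size  int64 // total bytes accumulated in the page
}

// splitIntoPages walks messages in order. When adding a message would
// exceed pageSize, the page is closed at that message (which is included
// in the page) and the running size restarts from zero (an assumption).
func splitIntoPages(ids []int64, sizes map[int64]int64, pageSize int64) ([]page, int64) {
	var pages []page
	var cur int64
	for _, id := range ids {
		size := sizes[id]
		if cur+size > pageSize {
			pages = append(pages, page{EndID: id, Size: cur + size})
			cur = 0
			continue
		}
		cur += size
	}
	return pages, cur
}

func main() {
	ids := []int64{1, 2, 3, 4, 5}
	sizes := map[int64]int64{1: 4, 2: 4, 3: 4, 4: 4, 5: 4}
	pages, open := splitIntoPages(ids, sizes, 10)
	fmt.Println(pages, open)
}
```

Because the closing message is counted in the page it closes, a page can be slightly larger than the configured page size.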
Write Buffer Management
Write buffers provide an in-memory staging area for incoming data before flushing to persistent storage. This component reduces I/O overhead by batching writes and allows for memory-efficient data handling.
graph LR
subgraph InMemory["Write Buffer"]
BufferA[Buffer A]
BufferB[Buffer B]
BufferN[Buffer N]
end
subgraph Flush["Flush Trigger"]
Trigger[Size/Time Trigger]
end
subgraph Persistent["Persistent Storage"]
SegA[Segment A]
SegB[Segment B]
end
Trigger -->|Flush| BufferA
Trigger -->|Flush| BufferB
BufferA --> SegA
BufferB --> SegB
Buffer Lifecycle
| State | Description |
|---|---|
| Active | Buffer accepting new data writes |
| Full | Buffer reached size threshold, awaiting flush |
| Flushing | Data being written to persistent storage |
| Flushed | Data committed, buffer can be cleared |
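The lifecycle above can be read as a small state machine. The sketch below is an illustration, not DataNode code: the type and function names are hypothetical, and the Flushed to Active transition (the buffer being cleared and reused) is an assumption drawn from the last table row.

```go
package main

import "fmt"

type bufferState int

const (
	stateActive bufferState = iota
	stateFull
	stateFlushing
	stateFlushed
)

var stateNames = [...]string{"Active", "Full", "Flushing", "Flushed"}

func (s bufferState) String() string { return stateNames[s] }

// nextState lists the single allowed successor of each state.
var nextState = map[bufferState]bufferState{
	stateActive:   stateFull,
	stateFull:     stateFlushing,
	stateFlushing: stateFlushed,
	stateFlushed:  stateActive, // assumed: a cleared buffer is reused
}

// advance moves to the requested state only if the table allows it.
func advance(from, to bufferState) (bufferState, error) {
	if nextState[from] != to {
		return from, fmt.Errorf("invalid transition %s -> %s", from, to)
	}
	return to, nil
}

func main() {
	s := stateActive
	for _, to := range []bufferState{stateFull, stateFlushing, stateFlushed} {
		var err error
		if s, err = advance(s, to); err != nil {
			panic(err)
		}
		fmt.Println(s)
	}
}
```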
PChannel (Persistent Channel) Metadata
PChannels manage the assignment and state tracking of streaming channels across the Milvus cluster.
type PChannelMeta struct {
inner *streamingpb.PChannelMeta
}
func (c *PChannelMeta) IsAssigned() bool {
return c.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED
}
func (c *PChannelMeta) IsAssignedOrAssigning() bool {
return c.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED ||
c.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNING
}
Source: internal/streamingcoord/server/balancer/channel/pchannel.go:45-55
Channel Assignment States
| State | Description |
|---|---|
| PCHANNEL_META_STATE_UNSPECIFIED | Initial state |
| PCHANNEL_META_STATE_ASSIGNING | Channel being assigned to a node |
| PCHANNEL_META_STATE_ASSIGNED | Channel successfully assigned |
| PCHANNEL_META_STATE_DROPPING | Channel being removed |
| PCHANNEL_META_STATE_DROPPED | Channel removed |
Data Flow: Insert Operation
The following sequence describes how an insert request flows through the system:
sequenceDiagram
participant Client
participant Proxy
participant ChannelMgr
participant MQ
participant DataNode
participant Storage
Client->>Proxy: Insert request
Proxy->>Proxy: Validate schema
Proxy->>ChannelMgr: Get channel for collection
ChannelMgr->>Proxy: Channel assignment
Proxy->>MQ: Produce messages
MQ-->>Proxy: Message IDs
Proxy-->>Client: Insert result with IDs
Note over MQ,DataNode: Asynchronous processing
DataNode->>MQ: Consume messages
DataNode->>DataNode: Buffer in memory
DataNode->>Storage: Flush to segment
DataNode->>DataNode: Build index
Retention and Cleanup
RocksMQ implements retention policies to manage disk space by removing acknowledged messages that have exceeded their retention period.
Retention Configuration
| Parameter | Description | Default |
|---|---|---|
| RetentionSizeInMB | Maximum retained message size | -1 (disabled) |
| RetentionTimeInMinutes | Maximum retention duration | -1 (disabled) |
Source: pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:28-30
Page-Based Cleanup
Retention operates on a page-based mechanism where messages are grouped into pages. Each page tracks its end ID, timestamp, and acknowledgment status.
func (ri *rocksmqRetention) expiredClean(topic string) error {
// ...
pageIter := rocksdbkv.NewRocksIteratorWithUpperBound(ri.kv.DB,
typeutil.AddOne(pageMsgPrefix), pageReadOpts)
defer pageIter.Close()
for ; pageIter.Valid(); pageIter.Next() {
pKey := pageIter.Key()
pageID, err := parsePageID(string(pKey.Data()))
// ...
ackedTsKey := fixedAckedTsKey + "/" + strconv.FormatInt(pageID, 10)
ackedTsVal, err := ri.kv.Load(context.TODO(), ackedTsKey)
if msgTimeExpiredCheck(ackedTs) {
pageEndID = pageID
deletedAckedSize += size
pageCleaned++
}
}
// ...
}
Source: pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:78-105
Message Deletion by Range
Messages are deleted using RocksDB's range delete functionality for efficient batch removal:
func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error {
startKey := path.Join(topic, strconv.FormatInt(startID, 10))
endKey := path.Join(topic, strconv.FormatInt(endID+1, 10))
writeBatch := gorocksdb.NewWriteBatch()
defer writeBatch.Destroy()
writeBatch.DeleteRange([]byte(startKey), []byte(endKey))
opts := gorocksdb.NewDefaultWriteOptions()
defer opts.Destroy()
return db.Write(opts, writeBatch)
}
Source: pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:45-56
Channels Manager
The Channels Manager in the Proxy layer routes insert and upsert operations to the appropriate data node channels based on collection and partition information.
func (mgr *channelsMgr) getChannels(collectionID UniqueID) ([]pChan, error) {
// Routes requests to appropriate proxy channels
// based on collection metadata
}
Source: internal/proxy/channels_mgr.go:1-50
Channel Assignment Strategy
| Strategy | Description |
|---|---|
| Sharded | Messages distributed round-robin across channels |
| Uniform | Equal distribution based on hash |
| Lookup | All messages to single channel |
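The hash-based row in the table can be pictured with a short sketch. This is not the Proxy's implementation: the FNV-1a hash, the channelFor name, and the channel names are all assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// channelFor maps a primary key to one of the given channels by hashing
// the key and taking the result modulo the channel count. The same key
// always lands on the same channel.
func channelFor(primaryKey string, channels []string) string {
	if len(channels) == 0 {
		return ""
	}
	h := fnv.New32a()
	_, _ = h.Write([]byte(primaryKey)) // hash.Hash writes never fail
	return channels[int(h.Sum32()%uint32(len(channels)))]
}

func main() {
	channels := []string{"dml_0", "dml_1", "dml_2", "dml_3"}
	for _, key := range []string{"user-1", "user-2", "user-3"} {
		fmt.Println(key, "->", channelFor(key, channels))
	}
}
```

Hashing keeps routing stateless: any proxy instance can compute the same channel for a key without coordination.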
Compaction
Compaction merges small segments into larger ones, improving query performance and reclaiming storage space.
graph TD
subgraph Before["Before Compaction"]
Seg1[Segment 1 - 1K rows]
Seg2[Segment 2 - 1K rows]
Seg3[Segment 3 - 1K rows]
Seg4[Segment 4 - 1K rows]
end
subgraph After["After Compaction"]
Merged[Segment M - 4K rows]
end
Seg1 --> Merged
Seg2 --> Merged
Seg3 --> Merged
Seg4 --> Merged
Compactor Component
The compactor evaluates segments based on:
- Segment size thresholds
- Number of deleted records
- Segment age
- Index type and state
Source: internal/datanode/compactor/compactor.go
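As a rough illustration of the size-threshold criterion only, the sketch below greedily groups consecutive small segments until a target row count would be exceeded. The planMerges name and the greedy policy are assumptions; the real compactor also weighs deletions, age, and index state, which are not modeled here.

```go
package main

import "fmt"

// planMerges groups consecutive segment row counts so that each group
// holds at most targetRows rows. A segment larger than targetRows ends
// up in a group of its own.
func planMerges(segmentRows []int, targetRows int) [][]int {
	var plans [][]int
	var current []int
	total := 0
	for _, rows := range segmentRows {
		if len(current) > 0 && total+rows > targetRows {
			plans = append(plans, current)
			current, total = nil, 0
		}
		current = append(current, rows)
		total += rows
	}
	if len(current) > 0 {
		plans = append(plans, current)
	}
	return plans
}

func main() {
	// Four 1K-row segments fit into one 4K-row merged segment.
	fmt.Println(planMerges([]int{1000, 1000, 1000, 1000}, 4000))
	fmt.Println(planMerges([]int{3000, 2000, 1000}, 4000))
}
```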
Error Handling and Recovery
Topic Destruction
When destroying a topic, the system performs a multi-step cleanup process:
func (rmq *rocksmq) DestroyTopic(topicName string) error {
// Acquire topic lock
ll, ok := topicMu.Load(topicName)
lock, ok := ll.(*sync.Mutex)
lock.Lock()
defer lock.Unlock()
// Remove consumer associations
rmq.consumers.Delete(topicName)
rmq.topicName2LatestMsgID.Delete(topicName)
// Clean topic data
fixTopicName := topicName + "/"
err := rmq.kv.RemoveWithPrefix(context.TODO(), fixTopicName)
// Clean page size and timestamp info
pageMsgSizeKey := constructKey(PageMsgSizeTitle, topicName)
rmq.kv.RemoveWithPrefix(context.TODO(), pageMsgSizeKey)
// Clean acknowledgment tracking
ackedTsKey := constructKey(AckedTsTitle, topicName)
rmq.kv.RemoveWithPrefix(context.TODO(), ackedTsKey)
// Remove topic metadata
rmq.kv.MultiRemove(context.TODO(), removedKeys)
// Clean retention info
topicMu.Delete(topicName)
rmq.retentionInfo.topicRetetionTime.GetAndRemove(topicName)
}
Source: pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:340-380
Related Community Topics
Feature Requests and Limitations
The following community issues relate to data ingestion and flow:
| Issue | Description | Status |
|---|---|---|
| #9685 | Backup and restore functionality | Feature request |
| #20405 | Modify collection schema after creation | Feature request |
| #28583 | Milvus standalone crash with RocksMQ | Bug report |
String ID Support
Support for string-type primary keys was a frequently requested feature (#4430, #1924). VarChar primary keys are now a schema option alongside Int64, so the data ingestion pipeline's ID handling has to accommodate both integer and string identifiers.
Configuration Reference
RocksMQ Configuration Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| rocksmq.compressionTypes | string | "0,0,0,0,0" | Compression types per level |
| rocksmq.retentionSizeInMB | int64 | -1 | Retention size limit (-1 disabled) |
| rocksmq.retentionTimeInMinutes | int64 | -1 | Retention time limit (-1 disabled) |
| rocksmq.pageSize | int64 | 10MB | Page size for message batching |
Message Queue Selection
| MQ Type | Use Case | Deployment |
|---|---|---|
| RocksMQ | Development, Standalone | Embedded |
| Pulsar | Production, Multi-tenant | External |
| Kafka | High-throughput, Streaming | External |
Best Practices
- Batch Insert Operations: Group multiple vectors into single insert requests to reduce network overhead and improve throughput.
- Monitor Retention Settings: In production environments, configure retention parameters to prevent unbounded disk usage growth.
- Channel Planning: For high-throughput workloads, distribute collections across multiple channels to parallelize data ingestion.
- Consumer Group Management: Properly manage consumer groups to ensure reliable message consumption and acknowledgment.
- Index Building Strategy: Plan index types and build triggers based on query patterns to balance insert performance with search latency.
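The batching advice above can be sketched with a generic helper that splits a dataset into fixed-size groups before sending. The chunk function is a hypothetical utility, not an SDK call, and the batch size is an arbitrary example value.

```go
package main

import "fmt"

// chunk splits items into consecutive batches of at most size elements.
// The batches share the backing array of items; no data is copied.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	var batches [][]T
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		batches = append(batches, items[start:end])
	}
	return batches
}

func main() {
	ids := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	for i, batch := range chunk(ids, 4) {
		// Each batch would become one insert request.
		fmt.Println("batch", i, batch)
	}
}
```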
Logging and Debugging
Milvus uses structured logging through the mlog package for data ingestion operations:
log.Info("Expired check by retention time",
zap.String("topic", topic),
zap.Int64("pageEndID", pageEndID),
zap.Int64("deletedAckedSize", deletedAckedSize),
zap.Int64("lastAck", lastAck),
zap.Int64("pageCleaned", pageCleaned),
zap.Int64("time taken", time.Since(start).Milliseconds()))
Source: pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:120-127
Key Log Patterns
| Pattern | Component | Description |
|---|---|---|
| Rocksmq create topic | RocksMQ | Topic initialization |
| Rocksmq destroy topic | RocksMQ | Topic cleanup completion |
| Expired check by retention | Retention | Page cleanup operations |
| PChannel state change | StreamingCoord | Channel assignment updates |
Go SDK (client/v2)
Related topics: Quick Start Guide, Schema Design and Field Types
The Go SDK (client/v2) is the official Go client library for Milvus, providing a high-level interface to the Milvus vector database. It lets Go applications insert and query data, run similarity searches, and manage collections.
Overview
The Go SDK is designed for Go 1.24.12 or higher and provides idiomatic Go patterns for working with Milvus. The SDK wraps the Milvus gRPC API with a clean, type-safe interface that handles connection management, request serialization, and response parsing.
Source: client/README.md
Architecture
graph TD
A[Application Code] --> B[milvusclient Package]
B --> C[Entity Layer<br/>schema, collection definitions]
B --> D[gRPC Client<br/>Connection, Keepalive]
D --> E[Milvus Server]
F[Collection Operations] --> B
G[Write Operations] --> B
H[Read Operations] --> B
I[Index Operations] --> B
J[RBAC Operations] --> B
Package Structure
| Package | Purpose |
|---|---|
| client/milvusclient | Main client interface with New(), collection, write, read, index, and RBAC operations |
| client/entity | Data models including schema definitions, field types, and collection schemas |
Source: client/milvusclient/client.go
Client Configuration
Creating a Client
import (
"context"

"github.com/milvus-io/milvus/client/v2/milvusclient"
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cli, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
Address: "YOUR_MILVUS_ENDPOINT",
})
if err != nil {
// handle error
}
defer cli.Close(ctx)
ClientConfig Parameters
| Parameter | Type | Description |
|---|---|---|
| Address | string | Milvus server address (host:port) |
| DialOptions | []grpc.DialOption | Custom gRPC dial options |
Important: Custom DialOptions preserve the SDK's default settings for keepalive, backoff, and maximum receive message size. Earlier client versions could inadvertently drop these defaults when custom options were supplied.
Source: client/v2.6.4 release notes
Collection Management
Creating a Collection
Collections are created using the CreateCollection method with a schema definition:
// Options are built with constructor functions rather than struct literals
err = cli.CreateCollection(ctx, milvusclient.NewCreateCollectionOption("my_collection", collectionSchema))
Collection Schema
Schemas define the structure of vector data stored in Milvus:
collectionSchema := &entity.Schema{
CollectionName: "my_collection",
Description: "Example collection",
AutoID: false,
Fields: []*entity.Field{
{
Name: "id",
DataType: entity.FieldTypeInt64,
PrimaryKey: true,
},
{
Name: "vector",
DataType: entity.FieldTypeFloatVector,
TypeParams: map[string]string{
entity.TypeParamDim: "128",
},
},
},
}
Schema Field Types
| Field Type | Description |
|---|---|
| FieldTypeInt64 | 64-bit integer primary key |
| FieldTypeFloatVector | Float vector for dense embeddings |
| FieldTypeBinaryVector | Binary vector |
| FieldTypeSparseVector | Sparse vector for BM25-style search |
| FieldTypeJSON | JSON document field |
| FieldTypeArray | Array field with element type |
Source: client/entity/schema.go
TruncateCollection
Removes all data from a collection without dropping and recreating it:
err = cli.TruncateCollection(ctx, milvusclient.NewTruncateCollectionOption("my_collection"))
This is useful for quickly clearing collection data while maintaining the schema and index definitions. Source: client/v2.6.3 release notes
Write Operations
Insert Data
Insert vectors with optional metadata:
// Column-based insert: every column must supply one value per entity
result, err := cli.Insert(ctx, milvusclient.NewColumnBasedInsertOption("my_collection").
WithPartition("optional_partition").
WithInt64Column("id", []int64{1, 2, 3}).
WithFloatVectorColumn("vector", 128, [][]float32{
{0.1, 0.2 /* ... */},
{0.3, 0.4 /* ... */},
// ... one vector per id
}))
Upsert Data
Upsert combines insert and update operations:
err = cli.Upsert(ctx, &milvusclient.UpsertOption{
CollectionName: "my_collection",
FieldsData: []entity.FieldData{...},
})
Array Field Partial Updates
The SDK supports partial updates for array fields with ARRAY_APPEND and ARRAY_REMOVE operations:
// Append elements to array field
err = cli.Upsert(ctx, &milvusclient.UpsertOption{
CollectionName: "my_collection",
FieldsData: ...,
// Uses ARRAY_APPEND for array fields
})
Source: client/milvusclient/write.go, client/v2.6.5 release notes
Read Operations
Search
Perform similarity search with various vector types:
results, err := cli.Search(ctx, &milvusclient.SearchOption{
CollectionName: "my_collection",
AnnsField: "vector",
SearchParams: map[string]any{
"metric_type": "L2",
"params": map[string]any{"nprobe": 10},
},
Vector: [][]float32{
{0.1, 0.2, 0.3, ...},
},
Limit: 10,
Offset: 0,
OutputFields: []string{"id"},
})
Vector Sub-field Columns
The SDK supports struct-array vector sub-field columns for advanced embedding scenarios:
// Search with embedding list
results, err := cli.Search(ctx, &milvusclient.SearchOption{
CollectionName: "my_collection",
AnnsField: "embeddings", // Array field with vector sub-fields
// ... additional search parameters
})
Source: client/milvusclient/read.go, client/v2.6.4 release notes
MAX_SIM Search
Maximum similarity search is supported for compatible index types:
results, err := cli.Search(ctx, &milvusclient.SearchOption{
CollectionName: "my_collection",
AnnsField: "sparse_vector",
SearchParams: map[string]any{
"metric_type": "MAX_SIM",
},
Vector: sparseVector,
Limit: 10,
})
Query with Filters
Query entities with boolean expressions:
results, err := cli.Query(ctx, milvusclient.NewQueryOption("my_collection").
WithFilter("id > 100").
WithOutputFields("id", "vector").
WithLimit(100))
Nullable Vector Columns
The SDK supports nullable vector columns across all vector types:
| Vector Type | Nullable Support |
|---|---|
| Dense (FloatVector) | Yes |
| Binary (BinaryVector) | Yes |
| Sparse (SparseVector) | Yes |
| Int8 (Int8Vector) | Yes |
Source: client/milvusclient/read.go, client/v2.6.5 release notes
Index Management
Create Index
Build indexes for efficient search:
err = cli.CreateIndex(ctx, &milvusclient.CreateIndexOption{
CollectionName: "my_collection",
FieldName: "vector",
IndexName: "vector_idx",
IndexParams: map[string]string{
"index_type": "IVF_FLAT",
"metric_type": "L2",
"params": "{\"nlist\":128}",
},
})
Index Types
| Index Type | Description |
|---|---|
| FLAT | Brute-force search |
| IVF_FLAT | Inverted file index |
| IVF_SQ8 | IVF with scalar quantization |
| HNSW | Hierarchical navigable small world |
| DISKANN | Disk-based ANN |
Source: client/milvusclient/index.go
Role-Based Access Control (RBAC)
User Management
// Create user
err = cli.CreateUser(ctx, &milvusclient.CreateUserOption{
Username: "user1",
Password: "password123",
})
// Delete user
err = cli.DropUser(ctx, &milvusclient.DropUserOption{
Username: "user1",
})
Role Management
// Create role
err = cli.CreateRole(ctx, &milvusclient.CreateRoleOption{
RoleName: "viewer",
})
// Grant privilege
err = cli.Grant(ctx, &milvusclient.GrantOption{
Role: "viewer",
Object: entity.ObjectTypeCollection,
ObjectName: "my_collection",
Privilege: entity.PrivilegeSearch,
})
Get Replication Configuration
Retrieve the current replication settings:
config, err := cli.GetReplicateConfiguration(ctx, &milvusclient.GetReplicateConfigurationOption{})
Source: client/milvusclient/rbac.go, client/v2.6.3 release notes
SDK Version Compatibility
| Milvus Version | Go SDK Version |
|---|---|
| 2.6.17 | 2.6.4 |
| 2.6.15 | 2.6.3 |
| 2.6.14 | 2.6.1 |
| 2.6.13 | 2.6.1 |
| 2.6.12 | 2.6.1 |
The Go SDK maintains semantic versioning aligned with the Milvus server releases. Source: Milvus release notes
Installation
go get -u github.com/milvus-io/milvus/client/v2
Source: client/README.md
Best Practices
- Connection Management: Always defer client.Close(ctx) to ensure proper cleanup
- Context Handling: Use appropriate context timeouts for long-running operations
- Error Handling: Check all SDK method returns for errors before proceeding
- Batch Operations: Use batch insert for better throughput when loading large datasets
- Index Building: Create indexes after data insertion for optimal performance
Doramagic Pitfall Log
Source-linked risks stay visible on the manual page so the preview does not read like a recommendation.
Found 8 structured pitfall item(s), including 0 high/blocking item(s). Top priority: Installation risk - Installation risk requires verification.
1. Installation risk: Installation risk requires verification
- Severity: medium
- Finding: Project evidence flags an installation risk. Review the linked source before relying on this workflow.
- User impact: May increase setup, validation, or first-run risk for the user.
- Recommended check: Reproduce the official install and quickstart path in an isolated environment.
- Evidence: community_evidence:github | cevd_f0bbd70e35ef4c65a374c802b7614b46 | https://github.com/milvus-io/milvus/issues/28583
2. Capability evidence risk: Capability evidence risk requires verification
- Severity: medium
- Finding: README/documentation is current enough for a first validation pass.
- User impact: May increase setup, validation, or first-run risk for the user.
- Recommended check: Reproduce the official install and quickstart path in an isolated environment.
- Evidence: capability.assumptions | github_repo:208728772 | https://github.com/milvus-io/milvus
3. Runtime risk: Runtime risk requires verification
- Severity: medium
- Finding: Project evidence flags a runtime risk. Review the linked source before relying on this workflow.
- User impact: May increase setup, validation, or first-run risk for the user.
- Recommended check: Reproduce the official install and quickstart path in an isolated environment.
- Evidence: packet_text.keyword_scan | github_repo:208728772 | https://github.com/milvus-io/milvus
4. Maintenance risk: Maintenance risk requires verification
- Severity: medium
- Finding: Project evidence flags a maintenance risk. Review the linked source before relying on this workflow.
- User impact: May increase setup, validation, or first-run risk for the user.
- Recommended check: Reproduce the official install and quickstart path in an isolated environment.
- Evidence: evidence.maintainer_signals | github_repo:208728772 | https://github.com/milvus-io/milvus
5. Security or permission risk: Security or permission risk requires verification
- Severity: medium
- Finding: no_demo
- User impact: May increase setup, validation, or first-run risk for the user.
- Recommended check: Reproduce the official install and quickstart path in an isolated environment.
- Evidence: downstream_validation.risk_items | github_repo:208728772 | https://github.com/milvus-io/milvus
6. Security or permission risk: Security or permission risk requires verification
- Severity: medium
- Finding: no_demo
- User impact: May increase setup, validation, or first-run risk for the user.
- Recommended check: Reproduce the official install and quickstart path in an isolated environment.
- Evidence: risks.scoring_risks | github_repo:208728772 | https://github.com/milvus-io/milvus
7. Maintenance risk: Maintenance risk requires verification
- Severity: low
- Finding: issue_or_pr_quality=unknown.
- User impact: May increase setup, validation, or first-run risk for the user.
- Recommended check: Reproduce the official install and quickstart path in an isolated environment.
- Evidence: evidence.maintainer_signals | github_repo:208728772 | https://github.com/milvus-io/milvus
8. Maintenance risk: Maintenance risk requires verification
- Severity: low
- Finding: release_recency=unknown.
- User impact: May increase setup, validation, or first-run risk for the user.
- Recommended check: Reproduce the official install and quickstart path in an isolated environment.
- Evidence: evidence.maintainer_signals | github_repo:208728772 | https://github.com/milvus-io/milvus
Source: Doramagic discovery, validation, and Project Pack records
Community Discussion Evidence
These external discussion links are review inputs, not standalone proof that the project is production-ready.
Open the linked issues or discussions before treating the pack as ready for your environment.
Doramagic exposes project-level community discussion separately from official documentation. Review these links before using milvus with real data or production workflows.
- Milvus standalone crashed - github / github_issue
- client/v2.6.5 - github / github_release
- milvus-2.6.17 - github / github_release
- milvus-2.6.16 - github / github_release
- milvus-3.0.0-beta - github / github_release
- client/v2.6.4 - github / github_release
- Capability evidence risk requires verification - GitHub / issue
Source: Project Pack community evidence and pitfall evidence