# https://github.com/milvus-io/milvus 项目说明书

生成时间：2026-05-31 03:45:33 UTC

## 目录

- [Milvus 概述](#overview)
- [系统架构](#architecture)
- [集合与 Schema 设计](#schema-design)
- [数据插入与查询流程](#data-flow)
- [向量索引类型](#index-types)
- [协调节点详解](#coordination-nodes)
- [执行节点详解](#execution-nodes)
- [Go SDK (client/v2) 使用指南](#client-sdk)
- [部署指南](#deployment)
- [监控与运维](#monitoring)

<a id='overview'></a>

## Milvus 概述

### 相关页面

相关主题：[系统架构](#architecture), [部署指南](#deployment)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go)
- [pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go)
- [pkg/mlog/README.md](https://github.com/milvus-io/milvus/blob/main/pkg/mlog/README.md)
- [client/README.md](https://github.com/milvus-io/milvus/blob/main/client/README.md)
- [tools/README.md](https://github.com/milvus-io/milvus/blob/main/tools/README.md)
- [internal/streamingcoord/server/service/assignment.go](https://github.com/milvus-io/milvus/blob/main/internal/streamingcoord/server/service/assignment.go)
</details>

# Milvus 概述

## 简介

Milvus 是一款云原生的向量数据库，专为大规模向量相似度搜索场景设计。它支持存储、索引和管理由深度学习网络产生的海量向量_embedding_，能够快速处理十亿级向量的相似度检索请求。Milvus 具备高可用、高性能、可扩展的特点，广泛应用于图像检索、视频分析、自然语言处理、推荐系统等领域。

**核心能力**

| 能力类别 | 具体描述 |
|---------|---------|
| 向量存储 | 支持 dense vector、binary vector、sparse vector、int8 vector 等多种向量类型 |
| 相似度检索 | 提供 IP（内积）、L2（欧氏距离）、HAMMING、JACCARD 等多种距离度量 |
| 索引类型 | 支持 FLAT、IVF_FLAT、IVF_SQ8、IVF_PQ、HNSW、DISKANN、GPU_CAGRA 等索引 |
| 标量字段 | 支持 Int8/16/32/64、Float/Double、Bool、String、Array、JSON 等数据类型 |
| 可扩展性 | 支持单机部署、分布式集群部署、Kubernetes 部署 |

## 系统架构

Milvus 采用分层架构设计，将系统划分为接入层、协调服务层、执行层和存储层。

```mermaid
graph TD
    subgraph 接入层["接入层"]
        SDK["SDK Clients<br/>Python/Go/Java/Node.js"]
        HTTP["HTTP API<br/>gRPC"]
    end
    
    subgraph 协调服务层["协调服务层"]
        Proxy["Proxy"]
        RootCoord["Root Coord"]
        IndexCoord["Index Coord"]
        QueryCoord["Query Coord"]
        DataCoord["Data Coord"]
    end
    
    subgraph 执行层["执行层"]
        QueryNode["Query Node"]
        DataNode["Data Node"]
        IndexNode["Index Node"]
    end
    
    subgraph 存储层["存储层"]
        MinIO["对象存储<br/>MinIO/S3"]
        RocksMQ["消息队列<br/>RocksMQ/Pulsar/Kafka"]
        MetaStore["元数据存储<br/>etcd"]
    end
    
    SDK --> HTTP
    HTTP --> Proxy
    Proxy --> RootCoord
    Proxy --> QueryCoord
    Proxy --> DataCoord
    RootCoord --> IndexCoord
    QueryCoord --> QueryNode
    DataCoord --> DataNode
    IndexCoord --> IndexNode
    QueryNode --> MinIO
    DataNode --> RocksMQ
    IndexNode --> MinIO
    QueryNode --> MetaStore
    DataNode --> MetaStore
```

### 协调服务层组件

| 组件 | 职责 | 资料来源 |
|-----|------|---------|
| Proxy | 接收客户端请求，负责请求验证、路由和结果聚合 | - |
| Root Coord | 管理 collection/partition 的元数据，处理 DDL 操作 | - |
| Query Coord | 管理 Query Node，协调分布式查询执行 | - |
| Data Coord | 管理 Data Node，协调数据写入和 compaction | - |
| Index Coord | 管理 Index Node，协调索引构建任务 | - |

### 执行层组件

| 组件 | 职责 |
|-----|------|
| Query Node | 执行向量搜索，支持增量更新和历史数据查询 |
| Data Node | 执行数据写入、删除和 compaction 操作 |
| Index Node | 执行索引构建，支持多种索引算法 |

## 消息队列实现

Milvus 支持多种消息队列作为数据通道，包括 RocksMQ、Pulsar 和 Kafka。RocksMQ 是 Milvus 默认的消息队列实现，基于 RocksDB 构建。

### RocksMQ 架构

RocksMQ 使用 RocksDB 作为底层存储，通过分页机制管理消息，实现高效的消息持久化和顺序读取。

```mermaid
graph LR
    subgraph RocksMQ["RocksMQ 架构"]
        subgraph 存储层["RocksDB"]
            TopicData["Topic Data<br/>消息存储"]
            MetaInfo["Meta Info<br/>元数据"]
            AckInfo["Ack Info<br/>消费确认"]
        end
        
        subgraph 内存结构["内存结构"]
            ConsumerMap["Consumer Map<br/>消费者映射"]
            PageInfo["Page Info<br/>分页信息"]
        end
    end
    
    Produce["Produce"] --> TopicData
    TopicData --> Consume["Consume"]
    ConsumerMap -.-> PageInfo
    PageInfo -.-> TopicData
```

### 主题管理

RocksMQ 中的每个主题（Topic）对应一个独立的消息流，主题名称不能包含 `/` 字符。创建主题时会初始化相关的元数据键值对：

```go
// 资料来源：pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go
func (rmq *rocksmq) CreateTopic(topicName string) error {
    // 检查主题名是否包含 "/"
    if strings.Contains(topicName, "/") {
        return retry.Unrecoverable(fmt.Errorf("topic name = %s contains \"/\"", topicName))
    }
    // 主题ID键是主题的唯一标识符
    topicIDKey := TopicIDTitle + topicName
    // ...
}
```

### 消费者组管理

RocksMQ 支持消费者组机制，允许多个消费者共享同一个主题的消息流。消费者组通过 `consumerList` 结构管理：

```go
// 资料来源：pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go
type consumerList struct {
    consumers map[string]*Consumer // GroupName -> *Consumer
    mu        sync.RWMutex
}
```

消费者组的存在性检查通过 `ExistConsumerGroup` 方法实现：

```go
// 资料来源：pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go
func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer, error) {
    key := constructCurrentID(topicName, groupName)
    _, ok := rmq.consumersID.Load(key)
    if ok {
        if val, ok := rmq.consumers.Load(topicName); ok {
            c := val.(*consumerList).Get(groupName)
            return c != nil, c, nil
        }
    }
    return false, nil, nil
}
```

### 消息保留机制

RocksMQ 实现了基于时间和大小的消息保留策略，通过 `RetentionInfo` 模块管理过期消息的清理：

| 配置参数 | 说明 | 默认值 |
|---------|------|-------|
| `rocksmq.retentionSizeInMB` | 保留消息总大小（MB），-1 表示不限制 | -1 |
| `rocksmq.retentionTimeInMinutes` | 保留时间（分钟），-1 表示不限制 | -1 |

消息清理流程：

```mermaid
graph TD
    Start["启动保留检查"] --> CheckSize{"检查保留大小"}
    CheckSize -->|"已超过"| CleanBySize["按大小清理<br/>从最旧页面开始"]
    CheckSize -->|"未超过"| CheckTime{"检查保留时间"}
    
    CheckTime -->|"有页面过期"| CleanByTime["按时间清理<br/>删除已确认且过期的页面"]
    CheckTime -->|"无过期页面"| End["结束"]
    
    CleanBySize --> DeleteRange["DeleteMessages<br/>删除消息范围"]
    CleanByTime --> DeleteRange
    
    DeleteRange --> WriteBatch["WriteBatch<br/>批量删除 RocksDB"]
```

```go
// 资料来源：pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go
func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error {
    // 按 ID 范围删除消息 [startID, endID)
    startKey := path.Join(topic, strconv.FormatInt(startID, 10))
    endKey := path.Join(topic, strconv.FormatInt(endID+1, 10))
    writeBatch := gorocksdb.NewWriteBatch()
    defer writeBatch.Destroy()
    writeBatch.DeleteRange([]byte(startKey), []byte(endKey))
    // ...
}
```

## 日志系统

Milvus 使用 `mlog` 作为统一的日志库，基于 Uber 的 `zap` 日志框架构建。`mlog` 设计为**上下文感知的日志库**，专为 Milvus 分布式系统优化。

### 设计目标

| 目标 | 说明 |
|-----|------|
| 强制上下文传递 | 所有日志操作必须传递 context，确保请求可追溯 |
| 零开销抽象 | 使用类型别名避免封装开销，性能接近直接使用 zap |
| 自动字段累积 | 上下文字段自动在调用链中累积，子上下文继承父字段 |
| 跨服务传播 | 支持通过 gRPC metadata 传播关键字段用于分布式追踪 |
| 延迟编码 | 使用 `WithLazy` 延迟字段编码，日志级别禁用时避免编码开销 |

### 核心架构

```
┌──────────────────────────────────────────────────────────────────┐
│                           mlog Package                           │
├──────────────────────────────────────────────────────────────────┤
│  ┌──────────────┐  ┌──────────────┐  ┌────────────────────────┐  │
│  │  logger.go   │  │  context.go  │  │      field.go          │  │
│  │              │  │              │  │                        │  │
│  │ - Log()      │  │ - WithFields │  │ - Field constructors   │  │
│  │ - Debug()    │  │ - GetProp..  │  │ - PropagatedString     │  │
│  │ - Info()     │  │ - logContext │  │ - PropagatedInt64      │  │
│  └──────────────┘  └──────────────┘  └────────────────────────┘  │
└──────────────────────────────────────────────────────────────────┘
```

资料来源：[pkg/mlog/README.md](https://github.com/milvus-io/milvus/blob/main/pkg/mlog/README.md)

## 流式协调服务

Milvus 的流式协调服务（StreamingCoord）负责管理流式消息通道（PChannel）的分配和负载均衡。

### 通道元数据管理

`PChannelMeta` 是 PChannel 的元数据结构，包含通道名称、任期（Term）、访问模式和分配历史：

```go
// 资料来源：internal/streamingcoord/server/balancer/channel/pchannel.go
type PChannelMeta struct {
    inner *streamingpb.PChannelMeta
}

// 获取通道分配历史
func (c *PChannelMeta) GetHistories() []types.PChannelInfoAssigned {
    history := make([]types.PChannelInfoAssigned, 0, len(c.inner.Histories))
    for _, h := range c.inner.Histories {
        history = append(history, types.PChannelInfoAssigned{
            Channel: types.PChannelInfo{
                Name:       c.inner.GetChannel().GetName(),
                Term:       h.Term,
                AccessMode: types.AccessMode(h.AccessMode),
            },
            Node: types.NewStreamingNodeInfoFromProto(h.Node),
        })
    }
    return history
}

// 检查通道是否已分配
func (c *PChannelMeta) IsAssigned() bool {
    return c.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED
}
```

### 分配服务

分配服务（AssignmentService）负责处理通道分配请求和配置变更：

```go
// 资料来源：internal/streamingcoord/server/service/assignment.go
type AssignmentService interface {
    streamingpb.StreamingCoordAssignmentServiceServer
}

func NewAssignmentService() streamingpb.StreamingCoordAssignmentServiceServer {
    assignmentService := &assignmentServiceImpl{
        listenerTotal: metrics.StreamingCoordAssignmentListenerTotal.WithLabelValues(paramtable.GetStringNodeID()),
    }
    registry.RegisterAlterReplicateConfigV2AckCallback(assignmentService.alterReplicateConfiguration)
    return assignmentService
}
```

## 客户端 SDK

Milvus 提供多语言 SDK 支持，包括 Python、Go、Java、Node.js 等主流编程语言。

### Go SDK (client/v2)

Go SDK 是 Milvus 官方推荐的客户端库，采用现代化的 API 设计：

| 组件 | 说明 |
|-----|------|
| `milvusclient` | 主客户端包，提供连接管理和请求发送 |
| 支持版本 | Go 1.24.12 或更高版本 |

```go
// 资料来源：client/README.md
import "github.com/milvus-io/milvus/client/v2/milvusclient"

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cli, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
    Address: milvusAddr,
})
```

### SDK 版本对应关系

| Milvus 版本 | Python SDK | Node.js SDK | Java SDK | Go SDK |
|:-----------:|:----------:|:-----------:|:--------:|:------:|
| 2.6.17 | 2.6.14 | 2.6.14 | 2.6.20 | 2.6.4 |
| 2.6.16 | 2.6.13 | 2.6.14 | 2.6.19 | 2.6.4 |
| 2.6.15 | 2.6.12 | 2.6.13 | 2.6.18 | 2.6.3 |
| 2.6.14 | 2.6.11 | 2.6.13 | 2.6.17 | 2.6.1 |

## 开发工具

Milvus 提供 `mgit.py` 智能 Git 工作流工具，用于规范提交和 PR 流程。

### 功能特性

| 特性 | 说明 |
|-----|------|
| AI 提交信息 | 自动生成符合 Milvus 规范的提交信息 |
| DCO 自动签名 | 确保符合 Developer Certificate of Origin |
| 自动分支创建 | 防止直接提交到 master，自动创建特性分支 |
| 完整 PR 工作流 | 支持 fork → branch → commit → issue → PR → cherry-pick |
| 代码格式化 | 提交前自动运行本地格式化工具 |

### 提交信息规范

```
{type}: {description}

可选主体：详细说明
```

**type 类型**：

| type | 用途 |
|-----|------|
| `feat` | 新功能 |
| `fix` | 错误修复 |
| `enhance` | 增强优化 |
| `refactor` | 代码重构 |
| `test` | 测试相关 |
| `docs` | 文档更新 |
| `chore` | 构建/工具变更 |

**分支命名规范**：

```
{type}/{description}-{timestamp}
```

示例：`fix/memory-leak-1234`、`feat/add-gemini-api-5678`

## 社区热点问题

以下问题反映了用户最关心的功能需求：

| Issue | 标题 | 关注度 |
|-------|------|-------|
| #2771 | Add support for ScaNN index | 21 comments |
| #1924 | Support string id | 17 comments |
| #4430 | Support "string" type field | 15 comments |
| #20405 | Modify the collection schema once collection is created | 15 comments |
| #9685 | Backup and restore | 2 comments |

## 部署模式

Milvus 支持多种部署模式以适应不同的使用场景：

| 部署模式 | 说明 | 适用场景 |
|---------|------|---------|
| Standalone | 单机部署，所有组件运行在单一进程 | 开发测试 |
| Cluster | 分布式集群，支持水平扩展 | 生产环境 |
| Kubernetes | 基于 K8s 的云原生部署 | 云环境 |

### 依赖组件

| 组件 | 用途 | 可选方案 |
|-----|------|---------|
| 对象存储 | 向量数据持久化 | MinIO、S3、Azure Blob |
| 消息队列 | 数据通道、事件通知 | RocksMQ、Pulsar、Kafka |
| 元数据存储 | 集合、分区元数据 | etcd、MySQL、TiKV |

## 版本历史

Milvus 采用语义化版本控制，当前主要维护以下版本线：

- **v2.6.x**：最新稳定版本，持续添加新功能和优化
- **v2.5.x**：长期支持版本，重点修复安全漏洞和关键 bug

最新版本为 v2.6.17（2026年5月22日发布），引入了性能优化和多项新功能。

## 总结

Milvus 作为一款高性能的向量数据库，通过分层架构设计实现了良好的可扩展性和可用性。其核心组件包括协调服务层的各个 Coord 组件、执行层的 Node 组件，以及支持多种存储后端的灵活性。RocksMQ 消息队列提供了可靠的数据通道，mlog 日志系统确保了分布式环境下的可观测性，多语言 SDK 使开发者能够便捷地集成和使用 Milvus。

如需深入了解特定模块，请参考以下文档：

- [RocksMQ 详细实现](pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go)
- [消息保留机制](pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go)
- [日志系统设计](pkg/mlog/README.md)
- [Go SDK 使用指南](client/README.md)
- [开发工具使用](tools/README.md)

---

<a id='architecture'></a>

## 系统架构

### 相关页面

相关主题：[Milvus 概述](#overview), [协调节点详解](#coordination-nodes), [执行节点详解](#execution-nodes)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go)
- [pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go)
- [internal/streamingcoord/server/balancer/channel/pchannel.go](https://github.com/milvus-io/milvus/blob/main/internal/streamingcoord/server/balancer/channel/pchannel.go)
- [pkg/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go)
- [tools/README.md](https://github.com/milvus-io/milvus/blob/main/tools/README.md)
</details>

# 系统架构

## 概述

Milvus 是一款云原生的大规模向量数据库，专为高效存储、索引和检索高维向量而设计。系统架构采用微服务设计，将核心功能模块解耦为多个独立服务组件，通过消息队列实现组件间通信，实现高可用和水平扩展能力。

Milvus 的系统架构覆盖了从数据摄入、索引构建、向量检索到数据持久化的完整数据生命周期。系统支持多种部署模式（单机和集群），以及多种消息队列后端（RocksMQ、Pulsar、Kafka）。

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:1-50]()

## 整体架构

Milvus 集群模式采用分层架构设计，主要包含以下核心组件：

| 组件名称 | 功能描述 | 关键职责 |
|---------|---------|---------|
| Proxy | 代理层 | 接收客户端请求，路由转发，结果聚合 |
| RootCoord | 根协调器 | 元数据管理，时间戳分配，DDL 操作 |
| DataCoord | 数据协调器 | 存储协调，Segment 管理，索引管理 |
| QueryCoord | 查询协调器 | 查询节点管理，负载均衡，shard leader |
| DataNode | 数据节点 | 数据写入，压缩，索引构建 |
| QueryNode | 查询节点 | 向量搜索，标量过滤 |
| IndexNode | 索引节点 | 索引构建任务执行 |

```mermaid
graph TB
    subgraph 客户端层
        Client[SDK 客户端]
    end
    
    subgraph 协调层
        Proxy[Proxy]
        RootCoord[RootCoord]
        DataCoord[DataCoord]
        QueryCoord[QueryCoord]
    end
    
    subgraph 执行层
        DataNode[DataNode]
        QueryNode[QueryNode]
        IndexNode[IndexNode]
    end
    
    subgraph 存储层
        RocksDB[(RocksDB 元数据)]
        ObjectStorage[对象存储]
        MQ[消息队列]
    end
    
    Client --> Proxy
    Proxy --> RootCoord
    Proxy --> DataCoord
    Proxy --> QueryCoord
    Proxy --> MQ
    
    RootCoord --> RocksDB
    DataCoord --> RocksDB
    DataCoord --> MQ
    QueryCoord --> MQ
    
    DataNode --> RocksDB
    DataNode --> ObjectStorage
    DataNode --> MQ
    
    QueryNode --> ObjectStorage
    QueryNode --> MQ
    
    IndexNode --> ObjectStorage
    IndexNode --> MQ
```

## 消息队列子系统

### 架构概述

消息队列是 Milvus 架构中的核心通信枢纽，负责在各个微服务组件之间传递数据和控制消息。系统支持多种消息队列后端实现：

| 消息队列类型 | 实现路径 | 适用场景 |
|------------|---------|---------|
| RocksMQ | 内置，基于 RocksDB | 单机部署或轻量级场景 |
| Pulsar | Apache Pulsar | 大规模分布式部署 |
| Kafka | Apache Kafka | 高吞吐生产环境 |

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:40-60]()

### RocksMQ 实现

RocksMQ 是 Milvus 内置的消息队列实现，基于 RocksDB 存储引擎构建。其核心设计采用 Topic-Partition-Consumer 模式：

```mermaid
graph LR
    subgraph RocksMQ 架构
        Topic[Topic 主题]
        Partition[Partition 分区]
        ConsumerGroup[Consumer Group 消费者组]
        RocksDB[(RocksDB)]
    end
    
    Topic --> Partition
    Partition --> ConsumerGroup
    Partition --> RocksDB
```

#### 核心数据结构

RocksMQ 实现中的关键数据结构包括：

- **Topic 管理**：每个 Topic 对应 RocksDB 中的一组 Key-Value 数据
- **Page 管理**：消息按页存储，每页包含多条消息记录
- **Consumer Group**：消费者组维护各消费者的消费位置
- **Ack 机制**：已确认消息标记，用于垃圾回收

```go
// Topic 标识符构造
func constructKey(metaName, topic string) string {
    return metaName + topic
}

// 分页 ID 解析
func parsePageID(key string) (int64, error) {
    stringSlice := strings.Split(key, "/")
    if len(stringSlice) != 3 {
        return 0, fmt.Errorf("invalid page id %s ", key)
    }
    return strconv.ParseInt(stringSlice[2], 10, 64)
}
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:10-30]()

#### 消费者列表管理

消费者组采用线程安全的 Map 结构管理：

```go
type consumerList struct {
    consumers map[string]*Consumer // GroupName -> *Consumer
    mu        sync.RWMutex
}

func (l *consumerList) Add(consumer *Consumer) {
    l.mu.Lock()
    defer l.mu.Unlock()
    if _, ok := l.consumers[consumer.GroupName]; ok {
        return
    }
    l.consumers[consumer.GroupName] = consumer
}
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:40-60]()

### 消息保留机制

消息保留（Retention）机制是 RocksMQ 的核心功能之一，负责自动清理已确认的过期消息以释放存储空间。

#### 保留策略

系统支持两种保留策略：

| 策略类型 | 配置参数 | 说明 |
|---------|---------|------|
| 按大小保留 | `RocksmqCfg.RetentionSizeInMB` | 当已确认消息总大小超过阈值时触发清理 |
| 按时间保留 | `RocksmqCfg.RetentionTimeInMinutes` | 当消息确认时间超过指定分钟数时触发清理 |

```go
func checkRetention() bool {
    params := paramtable.Get()
    return params.RocksmqCfg.RetentionSizeInMB.GetAsInt64() != -1 || 
           params.RocksmqCfg.RetentionTimeInMinutes.GetAsInt64() != -1
}
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:30-40]()

#### 页面清理流程

消息清理基于分页（Page）维度进行，每次清理遍历已确认的分页列表：

```mermaid
graph TD
    A[开始清理检查] --> B{检查保留策略}
    B -->|按大小| C[计算已确认消息总大小]
    B -->|按时间| D[检查消息确认时间戳]
    C --> E{超过阈值?}
    D --> F{已过期?}
    E -->|是| G[定位待清理分页]
    F -->|是| G
    E -->|否| H[结束]
    F -->|否| H
    G --> I[执行 RocksDB 范围删除]
    I --> J[更新元数据]
    J --> H
```

```go
// 按消息时间过期检查
func msgTimeExpiredCheck(ackedTs int64) bool {
    // 检查消息确认时间是否超过保留时间
}

// 按消息大小过期检查  
func msgSizeExpiredCheck(curDeleteSize, totalAckedSize int64) bool {
    // 检查已删除大小是否超过总已确认大小的阈值
}
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:50-150]()

#### 消息删除实现

清理操作通过 RocksDB 的范围删除（Range Delete）实现，保证高效的批量删除：

```go
func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error {
    startKey := path.Join(topic, strconv.FormatInt(startID, 10))
    endKey := path.Join(topic, strconv.FormatInt(endID+1, 10))
    writeBatch := gorocksdb.NewWriteBatch()
    defer writeBatch.Destroy()
    writeBatch.DeleteRange([]byte(startKey), []byte(endKey))
    opts := gorocksdb.NewDefaultWriteOptions()
    defer opts.Destroy()
    return db.Write(opts, writeBatch)
}
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:150-200]()

## 存储层架构

### 分层存储设计

Milvus 采用分层存储架构，将不同类型的数据存储在最适合的存储介质中：

| 存储层 | 存储介质 | 存储内容 | 特点 |
|-------|---------|---------|------|
| 元数据层 | RocksDB/Etcd | Collection Schema、Segment 元信息、索引元数据 | 高可靠、低延迟 |
| 原始数据层 | 对象存储(MinIO/S3) | 原始向量数据、Delete Log | 高吞吐、低成本 |
| 索引层 | 对象存储 | IVF、HNSW、DiskANN 等索引结构 | 高效检索 |
| 缓存层 | 内存/SSD | 热数据缓存、索引缓存 | 加速访问 |

### Segment 管理

Segment 是 Milvus 数据存储的基本单元，分为两类：

| Segment 类型 | 说明 | 存储位置 |
|------------|------|---------|
| Growing Segment | 正在写入的 Segment | DataNode 内存 + 磁盘 |
| Sealed Segment | 已完成写入的 Segment | 对象存储 + 内存索引缓存 |

## 协调层组件

### RootCoord（根协调器）

RootCoord 负责管理所有元数据和系统级操作：

- **时间戳服务**：分配全局唯一递增时间戳
- **DDL 操作**：Create/Drop Collection、Create/Drop Partition 等
- **时间轴管理**：维护集合版本和操作历史

### DataCoord（数据协调器）

DataCoord 负责数据写入的协调工作：

- **Segment 分配**：为写入请求分配 Segment
- **垃圾回收**：协调 Segment 压缩和清理
- **健康检查**：监控 DataNode 状态

### QueryCoord（查询协调器）

QueryCoord 负责查询执行的协调：

- **查询调度**：将查询请求路由到合适的 QueryNode
- **负载均衡**：动态调整 QueryNode 负载
- **副本管理**：维护查询副本和高可用

## 流式协调子系统

### PChannel 管理

PChannel（Physical Channel）是 Milvus 流式系统的核心抽象，负责管理消息通道的物理资源：

```go
// PChannel 元数据
type PChannelMeta struct {
    inner *streamingpb.PChannelMeta
}

// 通道状态
func (c *PChannelMeta) State() streamingpb.PChannelMetaState {
    return c.inner.State
}

// 检查通道分配状态
func (c *PChannelMeta) IsAssigned() bool {
    return c.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED
}

// 检查通道分配中或已分配状态
func (c *PChannelMeta) IsAssignedOrAssigning() bool {
    return c.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED || 
           c.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNING
}
```

资料来源：[internal/streamingcoord/server/balancer/channel/pchannel.go:1-50]()

### 通道状态机

```mermaid
graph LR
    A[空闲] --> B[分配中]
    B --> C[已分配]
    C --> D[回收中]
    D --> A
    
    B -.->|失败| A
    C -.->|节点故障| D
```

通道状态包括：
- **空闲（IDLE）**：可供分配的通道
- **分配中（ASSIGNING）**：正在分配给流式节点
- **已分配（ASSIGNED）**：已分配给流式节点
- **回收中（RECOVERING）**：正在从故障中恢复

### 历史记录追踪

系统维护通道的完整分配历史：

```go
func (c *PChannelMeta) GetHistories() []types.PChannelInfoAssigned {
    history := make([]types.PChannelInfoAssigned, 0, len(c.inner.Histories))
    for _, h := range c.inner.Histories {
        history = append(history, types.PChannelInfoAssigned{
            Channel: types.PChannelInfo{
                Name:       c.inner.GetChannel().GetName(),
                Term:       h.Term,
                AccessMode: types.AccessMode(h.AccessMode),
            },
            Node: types.NewStreamingNodeInfoFromProto(h.Node),
        })
    }
    return history
}
```

## 数据流架构

### 写入数据流

```mermaid
sequenceDiagram
    participant Client as SDK 客户端
    participant Proxy as Proxy
    participant MQ as 消息队列
    participant DataNode as DataNode
    participant DataCoord as DataCoord
    participant Storage as 对象存储
    
    Client->>Proxy: Insert/Upsert 请求
    Proxy->>RootCoord: 获取时间戳
    RootCoord-->>Proxy: 时间戳
    Proxy->>DataCoord: 获取可写入 Segment
    DataCoord-->>Proxy: Segment 信息
    Proxy->>MQ: 发送消息
    Proxy-->>Client: 返回成功
    DataNode->>MQ: 消费消息
    DataNode->>DataNode: 写入数据
    DataNode->>Storage: 持久化
    DataNode->>DataCoord: 汇报状态
```

### 查询数据流

```mermaid
sequenceDiagram
    participant Client as SDK 客户端
    participant Proxy as Proxy
    participant QueryCoord as QueryCoord
    participant QueryNode as QueryNode
    participant Storage as 对象存储
    
    Client->>Proxy: Search/Query 请求
    Proxy->>QueryCoord: 获取 Shard Leaders
    QueryCoord-->>Proxy: Shard Leader 地址
    Proxy->>QueryNode: 并行发送查询
    QueryNode->>Storage: 加载索引
    QueryNode->>QueryNode: 执行向量检索
    QueryNode-->>Proxy: 返回 Top-K 结果
    Proxy->>Proxy: 结果聚合与排序
    Proxy-->>Client: 返回最终结果
```

## 部署模式

### 单机部署（Standalone）

| 组件 | 部署方式 | 说明 |
|-----|---------|------|
| allinone | 单进程 | 所有组件运行在单一进程中 |
| RocksMQ | 内置 | 使用内置 RocksMQ 作为消息队列 |
| RocksDB | 内置 | 元数据存储 |

### 集群部署（Cluster）

| 组件 | 副本数建议 | 高可用支持 |
|-----|----------|-----------|
| Proxy | 2+ | 负载均衡 |
| RootCoord | 2+ | 主备切换 |
| DataCoord | 2+ | 主备切换 |
| QueryCoord | 2+ | 主备切换 |
| DataNode | 按需 | 水平扩展 |
| QueryNode | 按需 | 水平扩展 |

## 开发工具

Milvus 提供了 `mgit.py` 开发工具，用于规范化的 Git 工作流程：

```bash
# 格式化代码
make fmt

# 静态检查
make static-check

# 提交类型
fix:      错误修复
feat:     新功能
enhance:  代码优化
refactor: 代码重构
test:     测试相关
docs:     文档更新
chore:    构建工具变更
```

资料来源：[tools/README.md:1-80]()

## 关键配置参数

### RocksMQ 配置

| 参数 | 默认值 | 说明 |
|-----|-------|------|
| `rocksmq.pageSize` | 10MB | 单页消息大小 |
| `rocksmq.retentionSizeInMB` | -1 | 按大小保留，-1 表示禁用 |
| `rocksmq.retentionTimeInMinutes` | -1 | 按时间保留，-1 表示禁用 |
| `rocksmq.tickerTimeInSeconds` | 1 | 保留检查间隔 |

### 系统级配置

| 参数 | 说明 |
|-----|------|
| `etcd.endpoints` | Etcd 集群地址 |
| `minio.address` | MinIO 对象存储地址 |
| `pulsar.address` | Pulsar 集群地址（可选） |
| `kafka.brokers` | Kafka Broker 列表（可选） |

## 社区热点与已知限制

根据社区反馈，以下功能需求较高：

| Issue | 描述 | 状态 |
|-------|------|------|
| #4430 | 支持 String 类型字段 | 长期需求 |
| #1924 | 支持 String ID | 长期需求 |
| #9685 | 备份与恢复功能 | 功能请求 |
| #20405 | 修改已有数据的 Collection Schema | 功能请求 |
| #2771 | ScaNN 索引支持 | 索引增强 |

## 总结

Milvus 的系统架构采用分层微服务设计，通过消息队列实现组件间解耦。核心设计特点包括：

1. **可扩展性**：各组件支持水平扩展，适应不同规模数据
2. **高可用**：关键组件支持多副本部署和故障自动转移
3. **灵活性**：支持多种消息队列和存储后端适配
4. **性能优化**：分层存储和多种索引类型支持高效向量检索

系统的核心协调机制依赖于 RootCoord、DataCoord、QueryCoord 三个协调组件，配合 Proxy 实现统一的入口管理。数据存储采用分层架构，元数据、原始数据和索引分离存储，兼顾可靠性和检索效率。

---

<a id='schema-design'></a>

## 集合与 Schema 设计

### 相关页面

相关主题：[数据插入与查询流程](#data-flow), [向量索引类型](#index-types)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go)
- [pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go)
- [internal/streamingcoord/server/balancer/channel/pchannel.go](https://github.com/milvus-io/milvus/blob/main/internal/streamingcoord/server/balancer/channel/pchannel.go)
- [internal/metastore/model/collection.go](https://github.com/milvus-io/milvus/blob/main/internal/metastore/model/collection.go)
- [internal/metastore/model/field.go](https://github.com/milvus-io/milvus/blob/main/internal/metastore/model/field.go)
- [internal/metastore/model/partition.go](https://github.com/milvus-io/milvus/blob/main/internal/metastore/model/partition.go)
</details>

# 集合与 Schema 设计

## 概述

Milvus 是一个云原生的向量数据库，集合（Collection）是其核心数据组织单元。每个集合由 **Schema**（模式）定义其数据结构，包括字段定义、分区策略和元数据配置。Schema 设计直接影响向量检索性能、存储效率和查询灵活性。

## 核心概念

### 数据模型层次结构

```mermaid
graph TD
    A[Milvus 数据库] --> B[集合 Collection]
    B --> C[Schema 定义]
    C --> D[字段 Fields]
    C --> E[分区 Partitions]
    C --> F[元数据 Metadata]
    D --> D1[主键字段 Primary Key]
    D --> D2[向量字段 Vector Fields]
    D --> D3[标量字段 Scalar Fields]
    D --> D4[Array 字段]
    E --> E1[分区名称]
    E --> E2[分区状态]
```

### 字段类型体系

| 字段类型 | 说明 | 可索引 | 可搜索 |
|---------|------|--------|--------|
| Primary Key | 唯一标识符，支持 Int64 | ✓ | ✓ |
| Float Vector | 浮点向量 (128-32768 维) | ✓ | ✓ |
| Binary Vector | 二进制向量 | ✓ | ✓ |
| Sparse Vector | 稀疏向量 | ✓ | ✓ |
| Int8 Vector | Int8 向量 | ✓ | ✓ |
| Bool | 布尔值 | ✓ | ✓ |
| Int8/16/32/64 | 整数类型 | ✓ | ✓ |
| Float/Double | 浮点类型 | ✓ | ✓ |
| VarChar | 可变长度字符串 | ✓ | ✓ |
| JSON | JSON 格式数据 | ✓ | ✗ |
| Array | 数组类型 | ✓ | ✓ |

资料来源：[internal/metastore/model/field.go]()

## Schema 定义架构

### Collection 模型结构

```go
type Collection struct {
    ID               int64
    Name             string
    Schema           *schemapb.CollectionSchema
    UID              int64
    PartitionIDs     []int64
    PartitionNames   []string
    CreatedUTC       uint64
    CreatedTimestamp uint64
    ConsistencyLevel commonpb.ConsistencyLevel
    Properties       map[string]string
    Status           .pb.Status
}
```

资料来源：[internal/metastore/model/collection.go]()

### Field 模型结构

```go
type Field struct {
    ID               int64
    Name             string
    Description      string
    DataType         schemapb.DataType
    TypeParams       []*commonpb.KeyValuePair
    IndexParams      []*commonpb.KeyValuePair
    State            .pb.State
    IsPrimaryKey     bool
    AutoID           bool
    Nullable         bool  // 支持 nullable 向量列
}
```

资料来源：[internal/metastore/model/field.go]()

## 向量字段设计

### 向量类型支持

Milvus 支持多种向量类型，最新版本（v2.6.4+）在 Go SDK 中完善了 struct-array 向量子字段支持：

| 向量类型 | 维度范围 | 索引类型 | 应用场景 |
|---------|---------|---------|---------|
| FloatVector | 1-32768 | IVF, HNSW, DiskANN, ScaNN | 文本/图像嵌入 |
| BinaryVector | 1-65536 | BIN_IVF_FLAT | 图像特征二进制码 |
| SparseVector | 任意稀疏度 | SPARSE_WAND | 稀疏文本检索 |
| Int8Vector | 1-32768 | IVF_FLAT | 高效量化向量 |

资料来源：[client/v2.6.4 Release Notes](https://github.com/milvus-io/milvus/releases/tag/client/v2.6.4)

### Nullable 向量列

从 v2.6.5 版本开始，Go SDK 支持可空向量列：

```mermaid
graph LR
    A[向量字段] -->|Dense| B[Float/Binary/Int8/Sparse]
    A -->|Nullable| C[允许 NULL 值]
    A -->|EmbeddingList| D[支持列表形式]
```

添加可空向量字段到现有集合时，系统会验证字段必须为 nullable：
资料来源：[client/v2.6.5 Release Notes](https://github.com/milvus-io/milvus/releases/tag/client/v2.6.5)

## 分区设计

### Partition 模型结构

```go
type Partition struct {
    ID           int64
    CollectionID int64
    Name         string
    Description  string
    CreatedTimestamp uint64
    CreatedUTCTimestamp   uint64
    State        pb.PartitionState
}
```

资料来源：[internal/metastore/model/partition.go]()

### 分区策略

```mermaid
graph TD
    A[数据写入] --> B{分区策略选择}
    B -->|时间序列| C[按时间分区]
    B -->|业务分类| D[按业务分区]
    B -->|数据量| E[大集合分区]
    C --> F[分区键字段]
    D --> F
    E --> G[负载均衡]
```

## 集合生命周期管理

### 创建集合流程

```mermaid
sequenceDiagram
    participant Client
    participant RootCoord
    participant MetaStore
    participant Proxy
    
    Client->>Proxy: CreateCollection
    Proxy->>RootCoord: 创建集合任务
    RootCoord->>RootCoord: 验证 Schema
    RootCoord->>MetaStore: 保存集合元数据
    MetaStore-->>RootCoord: 确认保存
    RootCoord->>Proxy: 返回成功
    Proxy-->>Client: 创建完成
```

### 集合操作 API

| 操作 | Go SDK | Python SDK | 说明 |
|------|--------|------------|------|
| CreateCollection | `CreateCollection()` | `create_collection()` | 创建集合 |
| DropCollection | `DropCollection()` | `drop_collection()` | 删除集合 |
| TruncateCollection | `TruncateCollection()` | - | 清空数据（不删除集合）|
| AddCollectionField | `AddCollectionField()` | - | 向现有集合添加字段 |
| HasCollection | `HasCollection()` | `has_collection()` | 检查集合是否存在 |

资料来源：[client/v2.6.3 Release Notes - TruncateCollection API](https://github.com/milvus-io/milvus/releases/tag/client/v2.6.3)

## Schema 设计最佳实践

### 字段设计原则

1. **合理选择主键类型**
   - Int64 主键性能最优
   - 社区请求支持 String ID（Issue #1924）- 当前版本仅支持 Int64

2. **向量维度优化**
   - 根据实际 Embedding 模型选择维度
   - 避免过大维度导致存储和检索开销增加

3. **分区策略**
   - 大数据量集合建议分区
   - 按时间或业务属性分区便于数据管理

### Schema 约束

| 约束项 | 限制值 |
|--------|--------|
| 单集合最大字段数 | 128 |
| 单字段最大维度 | 32768 |
| 主键字段 | 必须唯一，每个集合仅一个 |
| 向量字段 | 每个集合至少一个 |
| 字段名称最大长度 | 256 字符 |

## 社区热点问题

### 热门 Issue 关注

| Issue | 标题 | 状态 |
|-------|------|------|
| #4430 | Support "string" type field | 社区功能请求 |
| #20405 | Modify the collection schema once collection is created | Schema 修改需求 |
| #1924 | Support string id | 主键类型扩展 |

### Schema 修改限制

当前 Milvus 对已创建集合的 Schema 有以下限制：

- 已存在的非空集合无法直接修改 Schema
- 仅支持添加 nullable 字段
- 向量字段添加到现有集合时必须标记为 nullable

资料来源：[client/v2.6.5 Release Notes](https://github.com/milvus-io/milvus/releases/tag/client/v2.6.5)

## 存储架构与 Topic 管理

### RocksMQ Topic 结构

集合数据在底层 RocksMQ 中的存储组织：

```mermaid
graph TD
    A[Topic] --> B[Page Messages]
    A --> C[Page Size Index]
    A --> D[Page Timestamp]
    A --> E[Acked Timestamp]
    
    B -->|按 ID 范围| F[Message Data]
    C -->|PageMsgSizeTitle| G[大小索引]
    D -->|PageTsTitle| H[时间索引]
    E -->|AckedTsTitle| I[确认状态]
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go]()

### 消息保留策略

| 策略类型 | 配置参数 | 说明 |
|---------|---------|------|
| 按大小保留 | `RetentionSizeInMB` | 超过阈值删除旧消息 |
| 按时间保留 | `RetentionTimeInMinutes` | 超过时间删除旧消息 |
| 页清理 | 自动触发 | 基于确认状态清理已消费页面 |

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go]()

## 索引与查询

### 索引类型支持

| 索引类型 | 向量类型 | 特点 |
|---------|---------|------|
| IVF_FLAT | Float/Binary/Int8 | 精确聚类，平衡精度与速度 |
| HNSW | Float/Binary | 图索引，高速高recall |
| DiskANN | Float | 磁盘索引，超大规模数据 |
| SPARSE_WAND | Sparse | 稀疏向量专用 |
| ScaNN | Float | Google 高性能算法（Issue #2771）|

## 总结

Milvus 的集合与 Schema 设计采用分层架构：
- **逻辑层**：Collection、Schema、Field 抽象
- **元数据层**：通过 Metastore 持久化模型定义
- **存储层**：RocksMQ 消息队列与 RocksDB 存储
- **索引层**：多种向量索引算法支持

Schema 设计时需综合考虑业务需求、数据规模、查询模式等因素，合理选择字段类型、向量维度和分区策略，以获得最优的检索性能和存储效率。

---

<a id='data-flow'></a>

## 数据插入与查询流程

### 相关页面

相关主题：[集合与 Schema 设计](#schema-design), [协调节点详解](#coordination-nodes)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go)
- [pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go)
- [pkg/mq/mqimpl/rocksmq/server/rocksmq.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq.go)
- [pkg/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go)
- [pkg/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go)
- [internal/streamingcoord/server/balancer/channel/pchannel.go](https://github.com/milvus-io/milvus/blob/main/internal/streamingcoord/server/balancer/channel/pchannel.go)
</details>

# 数据插入与查询流程

## 概述

Milvus 的数据插入与查询流程是向量数据库核心操作的基础架构，涵盖了从客户端数据摄入到查询执行的完整链路。该流程涉及多个关键组件的协作，包括消息队列（MQ）、存储层、索引服务以及查询节点。

Milvus 支持多种消息队列后端，其中 RocksMQ 是默认的消息队列实现。RocksMQ 基于 RocksDB 构建，提供持久化的消息发布与订阅功能，确保数据的可靠性和一致性。消息队列在整个数据流程中扮演着缓冲层和异步处理的关键角色，使得数据写入与查询可以解耦运行。资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq.go:1-30]()

## 架构设计

### 系统组件层次

Milvus 的数据流架构可以分为四个主要层次：接入层、协调层、执行层和存储层。接入层负责处理客户端请求，执行协议转换；协调层管理元数据和集群状态；执行层处理实际的数据操作；存储层负责数据的持久化。

```mermaid
graph TD
    A[客户端 SDK] --> B[Proxy 接入层]
    B --> C[消息队列 MQ]
    C --> D[DataNode 数据节点]
    D --> E[RocksDB 存储]
    D --> F[QueryNode 查询节点]
    F --> G[索引缓存]
    C --> H[Consumer 消费者]
    H --> F
```

### RocksMQ 角色定位

RocksMQ 是 Milvus 架构中的消息传输中枢，负责在数据写入阶段接收来自 Proxy 的插入请求，并将这些消息分发给下游的 DataNode 进行处理。RocksMQ 采用基于主题（Topic）的消息模型，每个 Collection 对应一个独立的消息 Topic，确保数据隔离。资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:50-80]()

## 数据插入流程

### 客户端请求阶段

客户端通过 SDK 构建插入请求，包含向量数据、元数据字段以及时间戳信息（GuaranteeTimestamp）。SDK 负责将数据序列化为 protobuf 格式，并通过 gRPC 发送到 Milvus 集群的 Proxy 节点。Proxy 作为请求的入口，负责请求验证、负载均衡和结果聚合。资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq.go:15-25]()

### 消息队列处理阶段

当数据到达 Proxy 后，插入请求被转换为 ProducerMessage 格式并发布到 RocksMQ。RocksMQ 的 Produce 方法负责将消息持久化到 RocksDB 存储引擎中。消息按照 Page 进行组织，每个 Page 包含多个消息记录，当 Page 达到配置的大小上限时会创建新的 Page。资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:150-200]()

```mermaid
graph LR
    A[Insert Request] --> B[Proxy Validate]
    B --> C[Construct ProducerMessage]
    C --> D[RocksMQ Produce]
    D --> E[RocksDB Persist]
    E --> F[Return Message IDs]
```

### 消息分页机制

RocksMQ 使用基于分页的消息存储策略，消息按照固定大小组织成 Page。每个 Page 有一个唯一的 PageID（pageEndID），记录该 Page 中最后一条消息的位置。Page 的元数据信息包括：

| 元数据键 | 用途 |
|---------|------|
| `topic_id/{topicName}` | 主题唯一标识 |
| `message_size/{topicName}` | 当前 Page 的消息总大小 |
| `page_message_size/{topicName}/{pageID}` | 各 Page 的结束 ID |
| `page_ts/{topicName}/{pageID}` | Page 最后更新的时间戳 |
| `acked_ts/{topicName}/{pageID}` | Page 最新的确认时间戳 |

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:200-260]()

### DataNode 消费处理

DataNode 通过订阅相应的 Topic 来消费消息。DataNode 启动时会创建 Consumer 实例，并注册到 RocksMQ 的消费者列表中。消费者使用消费者组（ConsumerGroup）机制实现负载均衡，多个 DataNode 可以协同消费同一个 Topic 的消息。资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:80-120]()

## 消费者管理机制

### 消费者列表数据结构

RocksMQ 使用 `consumerList` 结构管理消费者，每个 Topic 维护一个消费者映射表，键为消费者组名称（GroupName），值为 Consumer 实例指针。该结构使用读写锁（sync.RWMutex）保证并发安全。资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:35-70]()

```go
type consumerList struct {
    consumers map[string]*Consumer // GroupName -> *Consumer
    mu        sync.RWMutex
}
```

### 消费者操作

消费者管理提供以下核心操作：

- **Add**：将新消费者添加到列表，使用组名作为唯一标识
- **Remove**：根据组名移除消费者，返回被移除的 Consumer 指针
- **Get**：根据组名获取消费者实例，支持并发读取
- **Notify**：通知指定消费者组有新消息到达

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:35-70]()

### 消费者组检查

ExistConsumerGroup 方法用于检查消费者组是否存在并返回对应的 Consumer 实例。该方法首先检查消费者 ID 映射表中是否存在该组名，然后查询消费者列表获取详细信息。如果消费者组存在但未注册消费者，则返回 false。资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:280-310]()

## 数据查询流程

### 查询请求入口

查询请求首先到达 Proxy 节点，Proxy 负责解析查询条件并确定目标查询节点。查询类型包括向量相似度搜索（ANN Search）和标量字段过滤查询。Proxy 根据一致性哈希或负载均衡策略将请求路由到合适的 QueryNode。资料来源：[internal/streamingcoord/server/balancer/channel/pchannel.go:1-30]()

### 索引与搜索

QueryNode 负责执行实际的向量搜索操作。Milvus 支持多种索引类型，包括 IVF、HNSW、DiskANN 等。查询时，QueryNode 首先在内存中加载已构建的索引，然后执行近似最近邻搜索算法找到与查询向量最相似的 Top-K 结果。资料来源：[internal/streamingcoord/server/balancer/channel/pchannel.go:20-50]()

### 流式协调通道

PChannel（Physical Channel）是 Milvus 流式系统的核心概念，每个 PChannel 对应一个物理消息通道。PChannelMeta 管理通道的元数据信息，包括通道状态（Assigned、Assigning、Unassigned）、历史分配记录以及当前分配的节点信息。查询节点通过订阅 PChannel 来接收实时的数据更新。资料来源：[internal/streamingcoord/server/balancer/channel/pchannel.go:50-90]()

```mermaid
graph TD
    A[Query Request] --> B[Proxy Route]
    B --> C[QueryNode Pool]
    C --> D[Load Index]
    D --> E[ANN Search]
    E --> F[Post-filter]
    F --> G[Return Results]
```

## 数据保留机制

### 保留策略

RocksMQ 实现了两层数据保留机制：基于消息大小和基于时间的保留策略。通过配置参数可以控制保留行为：

| 配置参数 | 说明 | 默认值 |
|---------|------|--------|
| `RetentionSizeInMB` | 消息总大小阈值，超过后清理旧消息 | -1（禁用） |
| `RetentionTimeInMinutes` | 消息保留时间阈值 | -1（禁用） |
| `PageSize` | 单个 Page 的最大消息大小 | 可配置 |

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:1-60]()

### 保留检查函数

checkRetention 函数判断是否启用了保留机制，当任一保留参数不为 -1 时返回 true，表示系统需要执行定期的清理任务。保留任务由 retentionInfo 组件在后台定期执行，遍历所有已确认的 Page 并删除超过阈值的旧数据。资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:25-35]()

### 过期页面清理流程

保留机制的核心是遍历所有已确认的 Page，检查其确认时间戳（acked_ts）是否超过配置的保留时间。对于大小超限的情况，系统会累加已清理的 Page 大小，当总大小超过 RetentionSizeInMB 时停止清理。资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:100-180]()

```mermaid
graph TD
    A[Start Retention Check] --> B[Get All Pages]
    B --> C{Page Acked?}
    C -->|Yes| D{Check Time Expire}
    C -->|No| G[Break Loop]
    D -->|Yes| E[Mark for Deletion]
    D -->|No| G
    E --> F[Accumulate Size]
    F --> C
    G --> H[Delete Marked Pages]
```

### 消息删除实现

DeleteMessages 函数通过 RocksDB 的范围删除操作移除指定 ID 范围内的消息。删除操作使用 WriteBatch 批量处理，确保原子性。起始键为 `topic/startID`，结束键为 `topic/(endID+1)`，实现左闭右开的区间删除。资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:60-90]()

## 主题生命周期管理

### 主题创建

CreateTopic 方法负责初始化新的 Topic。创建前会检查 Topic 名称是否合法（不能包含"/"字符），然后在 RocksDB 中写入初始元数据。如果 Topic 已存在则直接返回，避免重复创建。资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:180-220]()

### 主题销毁

DestroyTopic 方法执行完整的 Topic 清理工作，包括：删除消费者列表、清理消息存储、删除 Page 元数据、清理确认时间戳信息以及释放相关资源。销毁操作使用互斥锁确保并发安全。资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:250-290]()

### 主题名称验证

系统禁止在主题名称中使用"/"字符，这是为了避免与内部键名分隔符冲突。创建主题时如果检测到非法字符，会返回 Unrecoverable 错误并记录警告日志。资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:185-195]()

## 状态管理

### RocksMQ 状态机

RocksMQ 实例维护两种状态：RmqStateStopped 和 RmqStateHealthy。状态转换由系统自动管理，初始化时为 Stopped 状态，Start 方法调用后进入 Healthy 状态。资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:260-280]()

| 状态值 | 含义 | 转换条件 |
|-------|------|---------|
| RmqStateStopped | 已停止 | 初始化或 Close 调用后 |
| RmqStateHealthy | 运行正常 | Start 方法调用后 |

### 关闭流程

Close 方法首先停止保留任务（stopRetention），然后关闭 RocksDB 数据库连接。关闭前会等待所有进行中的操作完成，确保数据一致性。资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:220-250]()

## 测试验证

### 基本功能测试

rocksmq_impl_test.go 提供了完整的功能测试覆盖，包括主题创建、消息生产消费、消费者组管理以及错误场景验证。测试使用独立的 RocksDB 实例，通过 assert 断言验证预期行为。资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go:1-100]()

### 保留机制测试

rocksmq_retention_test.go 验证保留策略的正确性，包括时间过期和大小超限两种场景。测试创建大量消息，消费后等待指定时间或累积足够数据量，然后验证过期消息已被清理。资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go:1-80]()

## 常见问题与社区反馈

根据社区反馈，以下是用户在使用数据插入与查询流程时经常遇到的问题：

**RocksMQ 独立部署崩溃问题**：部分用户在独立部署模式下使用默认的 RocksMQ 时遇到崩溃问题，这通常与 RocksDB 配置或资源限制相关。建议在生产环境使用 Pulsar 或 Kafka 作为消息队列后端。资料来源：[GitHub Issue #28583]()

**字符串类型支持**：社区持续关注字符串类型字段的支持情况，这是影响数据插入灵活性的重要功能需求。资料来源：[GitHub Issue #4430]()

**Schema 修改限制**：当前 Collection 创建后不允许修改 Schema，这在某些动态场景下造成了不便。用户希望能够在 Collection 非空时仍能添加新字段。资料来源：[GitHub Issue #20405]()

## 总结

Milvus 的数据插入与查询流程是一个复杂但设计精良的系统，涵盖了从客户端到存储层的完整数据链路。RocksMQ 作为默认消息队列，提供了可靠的持久化消息传递能力，配合 DataNode 和 QueryNode 实现了高效的数据摄入和查询执行。保留机制确保了系统的存储空间可控，而消费者组机制则支持了水平扩展和高可用部署。理解这一流程对于优化 Milvus 性能、排查问题以及进行二次开发都至关重要。

---

<a id='index-types'></a>

## 向量索引类型

### 相关页面

相关主题：[集合与 Schema 设计](#schema-design), [执行节点详解](#execution-nodes)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [client/index/hnsw.go](https://github.com/milvus-io/milvus/blob/main/client/index/hnsw.go)
- [client/index/ivf.go](https://github.com/milvus-io/milvus/blob/main/client/index/ivf.go)
- [client/index/scann.go](https://github.com/milvus-io/milvus/blob/main/client/index/scann.go)
- [client/index/disk_ann.go](https://github.com/milvus-io/milvus/blob/main/client/index/disk_ann.go)
- [client/index/sparse.go](https://github.com/milvus-io/milvus/blob/main/client/index/sparse.go)
- [internal/core/src/index/VectorDiskIndex.cpp](https://github.com/milvus-io/milvus/blob/main/internal/core/src/index/VectorDiskIndex.cpp)
- [internal/index/InvertedIndexTantivy.cpp](https://github.com/milvus-io/milvus/blob/main/internal/index/InvertedIndexTantivy.cpp)
</details>

# 向量索引类型

Milvus 作为一款高性能的向量数据库，索引技术是实现快速近似最近邻（ANN）搜索的核心组件。本页面详细介绍 Milvus 支持的各种向量索引类型、它们的实现原理、适用场景以及配置参数。

## 概述

向量索引是一种数据结构，旨在加速在高维向量空间中的相似性搜索。传统的精确搜索（暴力搜索）在数据量增长时性能急剧下降，而索引通过预计算和图/树/量化等结构，将搜索复杂度从 O(n) 降低到 O(log n) 或更低。

Milvus 支持多种向量索引类型，以满足不同场景对搜索精度、速度和内存占用的需求。索引类型的选择需要综合考虑数据集规模、维度、查询延迟要求、内存限制等因素。

### 向量索引分类

```mermaid
graph TD
    A[向量索引类型] --> B[内存索引]
    A --> C[磁盘索引]
    A --> D[倒排索引]
    
    B --> B1[HNSW]
    B --> B2[IVF-Flat]
    B --> B3[IVF-PQ]
    B --> B4[SCaNN]
    B --> B5[FLAT]
    
    C --> C1[DISKANN]
    
    D --> D1[SPARSE倒排索引]
```

## HNSW 索引

### 简介

HNSW（Hierarchical Navigable Small World）是一种基于图的近似最近邻搜索算法。它通过构建多层可导航小世界图来实现高效的向量搜索，在搜索时从最上层开始，逐步向下精确定位最近邻。

### 核心参数

| 参数名称 | 类型 | 默认值 | 说明 |
|---------|------|--------|------|
| `M` | int | 16 | 每层最大连接数，影响图的密度和搜索精度 |
| `efConstruction` | int | 200 | 构建时动态列表大小，影响索引质量 |
| `ef` | int | 100 | 搜索时动态列表大小，影响搜索精度和速度 |
| `threshold握手` | float | 0.5 | 候选结果阈值 |

### 工作原理

```mermaid
graph TD
    A[查询向量] --> B[从顶层开始搜索]
    B --> C[找到局部最优点]
    C --> D[下沉到下一层]
    D --> E{是否到达底层?}
    E -->|否| D
    E -->|是| F[返回最近邻结果]
    
    G[图构建过程] --> G1[均匀分布节点到多层]
    G1 --> G2[每层随机选择概率递减]
    G2 --> G3[构建小世界图连接]
```

资料来源：[client/index/hnsw.go:1-100]()

### 适用场景

- **高精度需求**：HNSW 提供优秀的搜索精度，ef 参数越大精度越高
- **动态数据**：支持实时插入和删除操作
- **中等规模数据**：通常用于百万到千万级向量
- **延迟敏感场景**：搜索延迟可控制在毫秒级

## IVF 索引

### 简介

IVF（Inverted File System）是一种基于聚类的索引方法。它先将向量空间划分为多个聚类中心，搜索时只扫描与查询向量最近的几个聚类，从而大幅减少搜索范围。

### 变体类型

#### IVF-Flat

IVF-Flat 保持聚类内向量的原始形态，不进行量化压缩。它在精度和速度之间取得平衡，适合需要高精度且内存充足的环境。

#### IVF-PQ

IVF-PQ 在 IVF 基础上增加了 Product Quantization（产品量化），将高维向量分割成多个子空间并分别量化，大幅降低内存占用但会引入量化误差。

### 核心参数

| 参数名称 | 类型 | 默认值 | 说明 |
|---------|------|--------|------|
| `nlist` | int | 1024 | 聚类中心数量 |
| `nprobe` | int | 8 | 搜索时探测的聚类数 |

资料来源：[client/index/ivf.go:1-100]()

### 性能特性

- **内存效率**：IVF-PQ 可将内存占用降低到原始数据的 1/10 甚至更低
- **搜索速度**：nprobe 越大速度越慢但精度越高
- **构建速度**：相对较快，适合大规模数据批量构建

## DISKANN 索引

### 简介

DISKANN 是一种设计用于磁盘存储的向量索引算法，特别适合超大规模数据集。它通过图索引结构和磁盘优化策略，使得在有限内存条件下仍能实现高效的向量搜索。

### 核心优势

```mermaid
graph LR
    A[超大规模数据] --> B[磁盘存储]
    B --> C[图索引结构]
    C --> D[内存缓存热点]
    D --> E[高效搜索]
```

### 技术实现

资料来源：[internal/core/src/index/VectorDiskIndex.cpp:1-100]()

DISKANN 的核心设计理念是将大部分索引结构存储在磁盘上，只将搜索过程中频繁访问的图结构保留在内存中。这种设计使得单机可以支持数十亿甚至上百亿级别的向量搜索。

### 适用场景

- **超大规模数据**：单机无法容纳全部索引的场景
- **内存受限环境**：需要在有限内存下提供可接受的搜索性能
- **冷热数据分离**：大部分数据存储在磁盘，热数据保留在内存

## SCaNN 索引

### 简介

SCaNN（Scalable Nearest Neighbors）是 Google 开发的先进向量索引算法，在多项基准测试中表现出色。它结合了量化技术和图搜索的优势，在保持高精度的同时实现极致的搜索速度。

资料来源：[client/index/scann.go:1-100]()

### 社区关注

社区对 SCaNN 索引有较高期待，GitHub Issue #2771 专门讨论了此功能的需求。社区反馈表明，用户期望通过 SCaNN 获得比现有索引类型更好的性能表现。

### 核心特性

| 特性 | 说明 |
|------|------|
| 高召回率 | 可配置的高精度搜索 |
| 低延迟 | 优化了搜索路径减少计算量 |
| 可扩展性 | 良好的水平扩展能力 |

## Sparse 稀疏向量索引

### 简介

Milvus 支持稀疏向量类型，适用于词袋模型、TF-IDF 等产生的稀疏特征表示。稀疏索引采用倒排索引结构，显著提高稀疏向量的搜索效率。

资料来源：[client/index/sparse.go:1-100]()

### 倒排索引实现

```mermaid
graph TD
    A[稀疏向量集合] --> B[构建倒排列表]
    B --> C[词汇 -> 文档列表]
    C --> D[查询处理]
    D --> E[合并相关列表]
    E --> F[计算相似度]
    F --> G[返回排序结果]
```

资料来源：[internal/index/InvertedIndexTantivy.cpp:1-100]()

### 适用场景

- **文本嵌入**：BGE、SPLADE 等稀疏文本模型
- **词袋特征**：传统 NLP 中的 TF-IDF 向量
- **高维稀疏数据**：维度利用率低于 5% 的场景

## 二值向量索引

### 支持的索引类型

Milvus 支持二值向量字段的专门索引，适用于二值化特征表示的场景。常见的索引类型包括：

| 索引类型 | 描述 | 适用场景 |
|---------|------|---------|
| BIN_FLAT | 暴力搜索 | 小规模数据、高精度需求 |
| BIN_IVF | 倒排文件索引 | 中等规模数据 |

### 特点

- **内存效率极高**：每个维度仅占 1 bit
- **快速位运算**：利用 CPU 位操作加速距离计算
- **适用场景有限**：主要用于二值化特征的相似性搜索

## FLAT 索引

### 简介

FLAT 是最基础的索引类型，采用暴力搜索方式遍历所有向量。它不构建任何预处理数据结构，搜索时直接计算查询向量与所有向量的距离。

### 适用场景

- **小规模数据**：数据量在十万级以下
- **高精度需求**：需要 100% 召回率的场景
- **调试测试**：作为基准对比其他索引的性能

### 性能特点

- **搜索延迟**：O(n × d)，与数据规模和维度成正比
- **内存占用**：仅存储原始向量，无额外开销
- **精度**：100% 召回率，无量化误差

## 索引选择指南

```mermaid
graph TD
    A[选择索引类型] --> B{数据规模?}
    B -->|百万级以下| C{内存充足?}
    C -->|是| D[推荐 HNSW 或 FLAT]
    C -->|否| E[推荐 IVF-PQ]
    B -->|百万到千万级| F{延迟要求?}
    F -->|极高| G[推荐 HNSW]
    F -->|中等| H[推荐 IVF]
    B -->|千万到亿级| I{磁盘限制?}
    I -->|是| J[推荐 DISKANN]
    I -->|否| K[推荐 HNSW 或 SCaNN]
    B -->|亿级以上| L[推荐 DISKANN]
```

### 选择决策表

| 数据规模 | 内存限制 | 延迟要求 | 推荐索引 |
|---------|---------|---------|---------|
| < 10万 | 无 | 高精度 | FLAT |
| < 100万 | 充足 | < 10ms | HNSW |
| < 100万 | 有限 | < 50ms | IVF-PQ |
| 100万-1000万 | 充足 | < 20ms | HNSW |
| 100万-1000万 | 有限 | < 100ms | IVF |
| 1000万-1亿 | 有限 | < 200ms | DISKANN |
| > 1亿 | 严重受限 | < 500ms | DISKANN |

## 索引构建与查询

### 构建索引

索引构建通常在数据导入后执行，可以使用以下方式：

1. **自动构建**：创建集合时设置 `auto_index` 参数
2. **手动构建**：调用 `create_index` 接口显式构建

### 查询参数调整

搜索时可动态调整查询参数以平衡精度和性能：

```python
# Python SDK 示例
search_params = {
    "metric_type": "L2",
    "params": {
        "ef": 64  # HNSW 搜索参数
    }
}
```

## 常见问题

### Q: 如何选择合适的索引类型？

A: 根据数据规模、内存限制、延迟要求三个维度综合评估。小规模数据优先考虑精度，大规模数据优先考虑内存效率。

### Q: HNSW 的 ef 参数如何设置？

A: ef 值越大搜索精度越高，但延迟也相应增加。建议从默认值 100 开始，根据精度需求逐步调整。

### Q: DISKANN 和内存索引如何选择？

A: 当数据量超过单机内存容量时，选择 DISKANN；在内存充足的情况下，HNSW 通常能提供更低的搜索延迟。

## 相关资源

- 官方文档：[索引类型 | Milvus](https://milvus.io/docs/index.md)
- GitHub Issue #2771：[Add support for ScaNN index](https://github.com/milvus-io/milvus/issues/2771)
- 最新发布版本中的索引相关更新

---

<a id='coordination-nodes'></a>

## 协调节点详解

### 相关页面

相关主题：[系统架构](#architecture), [执行节点详解](#execution-nodes)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go)
- [pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go)
- [pkg/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go)
- [pkg/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go)
- [tools/README.md](https://github.com/milvus-io/milvus/blob/main/tools/README.md)
</details>

# RocksMQ 消息队列实现详解

## 概述

RocksMQ 是 Milvus 中基于 RocksDB 实现的消息队列组件，作为默认的消息存储后端。在 Milvus 架构中，RocksMQ 负责处理数据写入、消息分页、消费者组管理以及消息保留策略等核心功能。

> 注：本页内容基于源码文件中的 RocksMQ 实现代码，关于协调节点的详细内容请参阅 rootcoord、datacoord、querycoordv2 等相关源码。

## 核心架构

RocksMQ 采用 RocksDB 作为底层存储，通过键值对的方式管理主题（Topic）、消息页（Page）和消费者组（Consumer Group）。

```mermaid
graph TD
    A[Producer 发送消息] --> B[RocksMQ 消息分页]
    B --> C[消息存储到 RocksDB]
    C --> D[Consumer Group 消费]
    D --> E[消息确认 ACK]
    E --> F[Retention 清理过期消息]
    
    G[RocksDB Key 结构] --> G1[TopicID: topicName]
    G --> G2[PageMsgSize: topic/pageEndID]
    G --> G3[PageTs: topic/pageEndID]
    G --> G4[AckedTs: topic/consumerGroup/pageID]
```

## 主题管理

### 创建主题

```go
// 资料来源：pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:125-145
func (rmq *rocksmq) CreateTopic(topicName string) error {
    if rmq.isClosed() {
        return errors.New(RmqNotServingErrMsg)
    }
    
    // 检查主题名是否包含 "/"
    if strings.Contains(topicName, "/") {
        log.Warn("rocksmq failed to create topic for topic name contains \"/\"")
        return retry.Unrecoverable(fmt.Errorf("topic name = %s contains \"/\"", topicName))
    }
    
    // 主题ID键是主题的唯一标识
    topicIDKey := TopicIDTitle + topicName
    val, err := rmq.kv.Load(context.TODO(), topicIDKey)
    if err != nil {
        return err
    }
    if val != "" {
        log.Warn("rocksmq topic already exists")
        return nil
    }
    
    topicMu.LoadOrStore(topicName, new(sync.Mutex))
    rmq.consumers.LoadOrStore(topicName, &consumerList{consumers: make(map[string]*Consumer)})
    // ... 初始化操作
}
```

**关键约束：**
- 主题名称不能包含字符 `/`
- 主题创建时会初始化消费者列表和互斥锁
- 已存在的主题不会重复创建

### 销毁主题

```go
// 资料来源：pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:180-230
func (rmq *rocksmq) DestroyTopic(topicName string) error {
    ll, ok := topicMu.Load(topicName)
    if !ok {
        return fmt.Errorf("topic name = %s not exist", topicName)
    }
    
    rmq.consumers.Delete(topicName)
    rmq.topicName2LatestMsgID.Delete(topicName)
    
    // 清理主题数据
    fixTopicName := topicName + "/"
    err := rmq.kv.RemoveWithPrefix(context.TODO(), fixTopicName)
    
    // 清理页大小信息
    pageMsgSizeKey := constructKey(PageMsgSizeTitle, topicName)
    err = rmq.kv.RemoveWithPrefix(context.TODO(), pageMsgSizeKey)
    
    // 清理页时间戳信息
    pageMsgTsKey := constructKey(PageTsTitle, topicName)
    err = rmq.kv.RemoveWithPrefix(context.TODO(), pageMsgTsKey)
    
    // 清理已确认消息时间戳
    ackedTsKey := constructKey(AckedTsTitle, topicName)
    err = rmq.kv.RemoveWithPrefix(context.TODO(), ackedTsKey)
    
    // 清理主题信息和消息大小
    topicMu.Delete(topicName)
    rmq.retentionInfo.topicRetetionTime.GetAndRemove(topicName)
}
```

## 消费者组管理

### 消费者列表结构

```go
// 资料来源：pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:50-85
type consumerList struct {
    consumers map[string]*Consumer  // GroupName -> *Consumer
    mu        sync.RWMutex
}

func (l *consumerList) Add(consumer *Consumer) {
    l.mu.Lock()
    defer l.mu.Unlock()
    if _, ok := l.consumers[consumer.GroupName]; ok {
        return
    }
    l.consumers[consumer.GroupName] = consumer
}

func (l *consumerList) Remove(groupName string) *Consumer {
    l.mu.Lock()
    defer l.mu.Unlock()
    delete(l.consumers, groupName)
    return nil
}

func (l *consumerList) Get(groupName string) *Consumer {
    l.mu.RLock()
    defer l.mu.RUnlock()
    if consumer, ok := l.consumers[groupName]; ok {
        return consumer
    }
    return nil
}
```

### 消费者组存在性检查

```go
// 资料来源：pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:250-265
func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer, error) {
    key := constructCurrentID(topicName, groupName)
    _, ok := rmq.consumersID.Load(key)
    if ok {
        if val, ok := rmq.consumers.Load(topicName); ok {
            c := val.(*consumerList).Get(groupName)
            return c != nil, c, nil
        }
    }
    return false, nil, nil
}
```

## 消息保留策略

### 保留策略配置

```go
// 资料来源：pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:40-45
func checkRetention() bool {
    params := paramtable.Get()
    return params.RocksmqCfg.RetentionSizeInMB.GetAsInt64() != -1 || 
           params.RocksmqCfg.RetentionTimeInMinutes.GetAsInt64() != -1
}
```

### 消息删除实现

```go
// 资料来源：pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:80-100
func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error {
    startKey := path.Join(topic, strconv.FormatInt(startID, 10))
    endKey := path.Join(topic, strconv.FormatInt(endID+1, 10))
    
    writeBatch := gorocksdb.NewWriteBatch()
    defer writeBatch.Destroy()
    writeBatch.DeleteRange([]byte(startKey), []byte(endKey))
    
    opts := gorocksdb.NewDefaultWriteOptions()
    defer opts.Destroy()
    err := db.Write(opts, writeBatch)
    return err
}
```

### 基于大小的过期清理

```go
// 资料来源：pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:150-200
// 遍历已确认页，计算总大小
func (ri *RetentionInfo) getTotalAckedSize(topic string) (int64, error) {
    fixedAckedTsKey := constructKey(AckedTsTitle, topic)
    pageMsgPrefix := constructKey(PageMsgSizeTitle, topic) + "/"
    
    pageReadOpts := gorocksdb.NewDefaultReadOptions()
    defer pageReadOpts.Destroy()
    
    pageIter := rocksdbkv.NewRocksIteratorWithUpperBound(ri.kv.DB, typeutil.AddOne(pageMsgPrefix), pageReadOpts)
    defer pageIter.Close()
    pageIter.Seek([]byte(pageMsgPrefix))
    
    var ackedSize int64
    for ; pageIter.Valid(); pageIter.Next() {
        key := pageIter.Key()
        pageID, err := parsePageID(string(key.Data()))
        // ... 检查页是否已确认
        ackedTsKey := fixedAckedTsKey + "/" + strconv.FormatInt(pageID, 10)
        ackedTsVal, err := ri.kv.Load(context.TODO(), ackedTsKey)
        if ackedTsVal == "" {
            break  // 未确认，停止计算
        }
        // 累加已确认消息大小
        ackedSize += size
    }
    return ackedSize, nil
}
```

## RocksDB 键结构设计

| 键类型 | 格式 | 说明 |
|--------|------|------|
| TopicID | `TopicIDTitle + topicName` | 主题唯一标识 |
| PageMsgSize | `PageMsgSizeTitle + topicName + "/" + pageEndID` | 页消息大小 |
| PageTs | `PageTsTitle + topicName + "/" + pageEndID` | 页时间戳 |
| AckedTs | `AckedTsTitle + topicName + "/" + consumerGroup + "/" + pageID` | 已确认消息时间戳 |
| MessageSize | `MessageSizeTitle + topicName` | 主题消息总大小 |
| CurrentID | `CurrentIDTitle + topicName + "/" + groupName` | 消费者当前位置 |

### 键解析函数

```go
// 资料来源：pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:20-30
func parsePageID(key string) (int64, error) {
    stringSlice := strings.Split(key, "/")
    if len(stringSlice) != 3 {
        return 0, fmt.Errorf("invalid page id %s", key)
    }
    return strconv.ParseInt(stringSlice[2], 10, 64)
}

func constructKey(metaName, topic string) string {
    return metaName + topic
}
```

## 分页存储机制

```go
// 资料来源：pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:300-340
// 当页大小超过配置阈值时创建新页
mutateBuffer := make(map[string]string)
for _, id := range msgIDs {
    msgSize := msgSizes[id]
    if curMsgSize+msgSize > params.RocksmqCfg.PageSize.GetAsInt64() {
        // 当前页已满，创建新页
        newPageSize := curMsgSize + msgSize
        pageEndID := id
        
        // 更新页消息大小
        pageMsgSizeKey := fixedPageSizeKey + "/" + strconv.FormatInt(pageEndID, 10)
        mutateBuffer[pageMsgSizeKey] = strconv.FormatInt(newPageSize, 10)
        
        // 更新页时间戳
        pageTsKey := fixedPageTsKey + "/" + strconv.FormatInt(pageEndID, 10)
        mutateBuffer[pageTsKey] = nowTs
        
        curMsgSize = 0
    } else {
        curMsgSize += msgSize
    }
}
```

## 测试覆盖

### 消费者组测试

```go
// 资料来源：pkg/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go:100-150
func TestRocksmq_ExistConsumerGroup(t *testing.T) {
    rmq, err := NewRocksMQ(rocksdbPath)
    assert.NoError(t, err)
    defer rmq.Close()
    
    topicName := "topic_a"
    rmq.CreateTopic(topicName)
    defer rmq.DestroyTopic(topicName)
    
    groupName := "test"
    consumer := &Consumer{
        Topic:     topicName,
        GroupName: groupName,
        MsgMutex:  make(chan struct{}),
    }
    rmq.RegisterConsumer(consumer)
    
    exist, _, _ := rmq.ExistConsumerGroup(topicName, groupName)
    assert.Equal(t, exist, true)
    
    dummyGrpName := "group_dummy"
    exist, _, _ = rmq.ExistConsumerGroup(topicName, dummyGrpName)
    assert.Equal(t, exist, false)
}
```

### 保留策略测试

```go
// 资料来源：pkg/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go:50-100
func TestRetentionInfo_PageTimeExpire(t *testing.T) {
    params := paramtable.Get()
    paramtable.Init()
    
    // 测试页时间过期
    params.Save(params.RocksmqCfg.RetentionTimeInMinutes.Key, "1")
    params.Save(params.RocksmqCfg.RetentionSizeInMB.Key, "-1")
    
    topicName := "topic_a"
    err = rmq.CreateTopic(topicName)
    
    // 生产大量消息
    msgNum := 100000
    pMsgs := make([]ProducerMessage, msgNum)
    for i := 0; i < msgNum; i++ {
        msg := "message_" + strconv.Itoa(i)
        pMsgs[i] = ProducerMessage{Payload: []byte(msg)}
    }
    ids, err := rmq.Produce(topicName, pMsgs)
    assert.Equal(t, len(pMsgs), len(ids))
    
    // 消费所有消息
    for i := 0; i < msgNum; i++ {
        cMsg, err := rmq.Consume(topicName, groupName, 1)
        cMsgs = append(cMsgs, cMsg[0])
    }
    
    // 等待页面过期
    time.Sleep(time.Duration(2) * time.Second)
    err = rmq.ForceSeek(topicName, groupName, ids[0])
    assert.NoError(t, err)
}
```

## 配置参数

| 参数 | 默认值 | 说明 |
|------|--------|------|
| `rocksmq.retentionTimeInMinutes` | `-1` (禁用) | 消息保留时间（分钟） |
| `rocksmq.retentionSizeInMB` | `-1` (禁用) | 消息保留大小（MB） |
| `rocksmq.pageSize` | `10` MB | 单页最大容量 |
| `rocksmq.tickerTimeInSeconds` | `1` 秒 | 保留检查间隔 |

## 状态流程

```mermaid
stateDiagram-v2
    [*] --> Created: CreateTopic
    Created --> Active: Produce Messages
    Active --> Consuming: Register Consumer
    Consuming --> Acknowledged: ACK Message
    Acknowledged --> Retained: Wait for Retention
    Retained --> Expired: Time/Size Check
    Expired --> [*]: DeleteMessages
    
    Created --> [*]: DestroyTopic
    Active --> [*]: DestroyTopic
```

## 线程安全机制

```go
// 资料来源：pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:35-40
var topicMu = sync.Map{}

// 主题操作使用互斥锁保护
func (rmq *rocksmq) DestroyTopic(topicName string) error {
    ll, ok := topicMu.Load(topicName)
    if !ok {
        return fmt.Errorf("topic name = %s not exist", topicName)
    }
    lock, ok := ll.(*sync.Mutex)
    if !ok {
        return fmt.Errorf("get mutex failed")
    }
    lock.Lock()
    defer lock.Unlock()
    // ... 清理操作
}
```

## 常见问题与社区反馈

### 相关社区 Issue

根据社区反馈，以下问题与 RocksMQ 相关：

- **独立部署崩溃问题** ([#28583](https://github.com/milvus-io/milvus/issues/28583))：使用 RocksMQ 作为默认消息队列时，独立部署可能出现崩溃问题，涉及版本 v2.2 及后续版本

### 生产环境建议

1. **监控保留策略**：确保 `retentionTimeInMinutes` 和 `retentionSizeInMB` 配置适当
2. **磁盘空间管理**：RocksMQ 会持续累积数据，需监控 RocksDB 磁盘使用
3. **消费者组管理**：及时清理不再使用的消费者组，避免孤儿消息堆积

## 相关文档

- [Milvus 官方文档 - RocksMQ](https://milvus.io/docs)
- [开发工具说明](tools/README.md) - 包含提交规范和分支命名规则

---

<a id='execution-nodes'></a>

## 执行节点详解

### 相关页面

相关主题：[系统架构](#architecture), [协调节点详解](#coordination-nodes), [向量索引类型](#index-types)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go)
- [pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go)
- [pkg/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go)
- [pkg/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go)
- [internal/streamingcoord/server/balancer/channel/pchannel.go](https://github.com/milvus-io/milvus/blob/main/internal/streamingcoord/server/balancer/channel/pchannel.go)
</details>

# 执行节点详解

## 概述

执行节点（Execution Node）是 Milvus 分布式向量数据库系统中负责数据写入、查询执行和消息队列处理的核心组件。Milvus 的执行节点架构采用分层设计，主要包括 **DataNode**（数据节点）和 **QueryNode**（查询节点）两大类，它们通过消息队列进行解耦通信，实现高并发、高可用的数据处理能力。

根据 Milvus 的架构设计，执行节点的主要职责包括：

- 接收并处理来自代理（Proxy）的数据写入请求
- 管理分片（Segment）的生命周期
- 执行向量相似度搜索和标量查询
- 处理消息队列中的数据消费和确认

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:1-50]()

## 核心架构

### 消息队列与执行节点的关系

Milvus 采用发布-订阅模式进行节点间通信。执行节点通过消息队列（如 RocksMQ、Pulsar、Kafka）实现数据的异步处理和系统组件的解耦。

```mermaid
graph TD
    A[Proxy 代理节点] -->|写入请求| B[消息队列]
    B -->|消费消息| C[DataNode 数据节点]
    B -->|消费消息| D[QueryNode 查询节点]
    C -->|数据写入| E[对象存储]
    D -->|索引构建| F[索引缓存]
    E -->|数据加载| D
```

### RocksMQ 执行节点实现

RocksMQ 是 Milvus 默认的消息队列实现，基于 RocksDB 数据库构建。执行节点通过 RocksMQ 实现高效的消息持久化和消费。

#### Topic 管理

Topic 是 RocksMQ 中的核心概念，每个 Collection 对应一个独立的 Topic。执行节点通过 Topic 进行消息的发布和订阅。

```go
// Topic 创建逻辑
func (rmq *rocksmq) CreateTopic(topicName string) error {
    if rmq.isClosed() {
        return errors.New(RmqNotServingErrMsg)
    }
    
    // 检查 topicName 是否包含 "/" 字符
    if strings.Contains(topicName, "/") {
        return retry.Unrecoverable(fmt.Errorf("topic name = %s contains \"/\"", topicName))
    }
    
    // topicIDKey 是 Topic 的唯一标识
    topicIDKey := TopicIDTitle + topicName
    val, err := rmq.kv.Load(context.TODO(), topicIDKey)
    if err != nil {
        return err
    }
    if val != "" {
        return nil  // Topic 已存在，直接返回
    }
    
    // 初始化互斥锁用于并发控制
    topicMu.LoadOrStore(topicName, new(sync.Mutex))
    return nil
}
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:95-130]()

#### 消费者组管理

消费者组（Consumer Group）是执行节点处理消息的基本单位。每个消费者组维护独立的消费进度，实现消息的负载均衡和故障恢复。

```go
type consumerList struct {
    consumers map[string]*Consumer  // GroupName -> Consumer 映射
    mu        sync.RWMutex
}

func (l *consumerList) Add(consumer *Consumer) {
    l.mu.Lock()
    defer l.mu.Unlock()
    
    if _, ok := l.consumers[consumer.GroupName]; ok {
        return  // 消费者已存在
    }
    l.consumers[consumer.GroupName] = consumer
}
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:40-60]()

## 数据分页机制

### 分页存储策略

RocksMQ 采用分页机制管理消息，每个分页（Page）包含多条消息。当分页大小达到配置阈值时，系统会创建新的分页继续存储。

```go
// 消息分页处理逻辑
for _, id := range msgIDs {
    msgSize := msgSizes[id]
    if curMsgSize + msgSize > params.RocksmqCfg.PageSize.GetAsInt64() {
        // 当前分页已满，创建新分页
        newPageSize := curMsgSize + msgSize
        pageEndID := id
        
        // 更新分页消息大小元数据
        pageMsgSizeKey := fixedPageSizeKey + "/" + strconv.FormatInt(pageEndID, 10)
        mutateBuffer[pageMsgSizeKey] = strconv.FormatInt(newPageSize, 10)
        
        // 更新分页时间戳元数据
        pageTsKey := fixedPageTsKey + "/" + strconv.FormatInt(pageEndID, 10)
        mutateBuffer[pageTsKey] = nowTs
        
        curMsgSize = 0
    } else {
        curMsgSize += msgSize
    }
}
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:180-210]()

### 分页元数据结构

| 元数据类型 | Key 格式 | 说明 |
|-----------|---------|------|
| PageMsgSizeTitle | `{prefix}/{pageEndID}` | 记录每个分页的消息总大小 |
| PageTsTitle | `{prefix}/{pageEndID}` | 记录每个分页的创建时间戳 |
| AckedTsTitle | `{prefix}/{pageEndID}` | 记录已确认消息的时间戳 |

## 保留策略与数据清理

### 保留策略配置

RocksMQ 支持基于大小和时间两种保留策略，用于自动清理过期数据。

```go
func checkRetention() bool {
    params := paramtable.Get()
    return params.RocksmqCfg.RetentionSizeInMB.GetAsInt64() != -1 || 
           params.RocksmqCfg.RetentionTimeInMinutes.GetAsInt64() != -1
}
```

配置参数说明：

| 参数 | 默认值 | 说明 |
|------|--------|------|
| `RetentionSizeInMB` | -1（禁用） | 保留消息总大小阈值（MB） |
| `RetentionTimeInMinutes` | -1（禁用） | 保留消息时间阈值（分钟） |
| `PageSize` | 10MB | 单个分页的最大大小 |

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:25-30]()

### 过期数据清理流程

```mermaid
graph TD
    A[启动保留检查] --> B{检查大小保留策略}
    B -->|已启用| C[计算已确认消息总大小]
    B -->|未启用| D{检查时间保留策略}
    C --> E{超过阈值?}
    E -->|是| F[标记过期分页]
    E -->|否| G[结束检查]
    D -->|已启用| H[遍历分页时间戳]
    H --> I{消息时间过期?}
    I -->|是| F
    I -->|否| G
    F --> J[删除过期分页]
    J --> K[更新元数据]
```

### 分页删除实现

```go
// DeleteMessages 按 ID 范围删除消息
func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error {
    startKey := path.Join(topic, strconv.FormatInt(startID, 10))
    endKey := path.Join(topic, strconv.FormatInt(endID+1, 10))
    
    writeBatch := gorocksdb.NewWriteBatch()
    defer writeBatch.Destroy()
    
    // 删除 [startID, endID) 范围内的所有消息
    writeBatch.DeleteRange([]byte(startKey), []byte(endKey))
    
    opts := gorocksdb.NewDefaultWriteOptions()
    defer opts.Destroy()
    
    err := db.Write(opts, writeBatch)
    return err
}
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:50-70]()

## Topic 生命周期管理

### Topic 销毁流程

当删除 Collection 时，系统需要清理对应的 Topic 及其所有关联数据。

```go
func (rmq *rocksmq) DestroyTopic(topicName string) error {
    ll, ok := topicMu.Load(topicName)
    if !ok {
        return fmt.Errorf("topic name = %s not exist", topicName)
    }
    
    lock := ll.(*sync.Mutex)
    lock.Lock()
    defer lock.Unlock()
    
    // 清理消费者组信息
    rmq.consumers.Delete(topicName)
    rmq.topicName2LatestMsgID.Delete(topicName)
    
    // 清理消息数据
    fixTopicName := topicName + "/"
    rmq.kv.RemoveWithPrefix(context.TODO(), fixTopicName)
    
    // 清理分页大小信息
    pageMsgSizeKey := constructKey(PageMsgSizeTitle, topicName)
    rmq.kv.RemoveWithPrefix(context.TODO(), pageMsgSizeKey)
    
    // 清理分页时间戳信息
    pageMsgTsKey := constructKey(PageTsTitle, topicName)
    rmq.kv.RemoveWithPrefix(context.TODO(), pageMsgTsKey)
    
    // 清理已确认时间戳信息
    ackedTsKey := constructKey(AckedTsTitle, topicName)
    rmq.kv.RemoveWithPrefix(context.TODO(), ackedTsKey)
    
    // 清理保留信息
    topicMu.Delete(topicName)
    rmq.retentionInfo.topicRetetionTime.GetAndRemove(topicName)
    
    return nil
}
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:150-190]()

## 消费者组状态管理

### 消费者组存在性检查

```go
func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer, error) {
    key := constructCurrentID(topicName, groupName)
    _, ok := rmq.consumersID.Load(key)
    
    if ok {
        if val, ok := rmq.consumers.Load(topicName); ok {
            c := val.(*consumerList).Get(groupName)
            return c != nil, c, nil
        }
    }
    return false, nil, nil
}
```

### 消息消费确认流程

```mermaid
sequenceDiagram
    participant CN as Consumer 消费者
    participant RMQ as RocksMQ
    participant KV as RocksDB KV Store
    
    CN->>RMQ: Consume(topicName, groupName, num)
    RMQ->>KV: Load current position
    KV-->>RMQ: CurrentID
    RMQ->>KV: Seek to currentID
    KV-->>RMQ: Messages
    RMQ-->>CN: ConsumerMessages
    
    CN->>RMQ: Ack(messageID)
    RMQ->>KV: UpdateAckedInfo
    RMQ->>KV: Update page metadata
```

## 执行节点配置

### 关键配置参数

| 配置项 | 默认值 | 描述 | 影响 |
|--------|--------|------|------|
| `PageSize` | 10MB | 分页存储大小 | 影响内存使用和 I/O 效率 |
| `RetentionSizeInMB` | -1 | 保留大小阈值 | -1 表示禁用 |
| `RetentionTimeInMinutes` | -1 | 保留时间阈值 | -1 表示禁用 |
| `TickerTimeInSeconds` | 1 | 清理任务执行间隔 | 影响资源回收及时性 |

### 压缩类型配置

RocksMQ 支持对不同类型的数据进行压缩：

```go
// 压缩类型配置格式: "compressionType1,compressionType2,..."
// 其中每个值代表对应数据块的压缩算法
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go:200-220]()

## 错误处理与故障恢复

### 常见错误场景

| 错误类型 | 错误码 | 处理方式 |
|---------|--------|---------|
| Topic 不存在 | `ErrMqTopicNotFound` | 检查 Topic 是否已创建 |
| 消费者组不存在 | `ErrMqConsumerGroupNotFound` | 先创建消费者组 |
| RocksMQ 已关闭 | `RmqNotServingErrMsg` | 重启服务或等待恢复 |
| 无效的分页 ID | `invalid page id` | 检查 Key 格式是否正确 |

### 错误恢复策略

```go
// Topic 创建时的不允许字符检查
if strings.Contains(topicName, "/") {
    log.Warn("rocksmq failed to create topic for topic name contains \"/\"", 
        zap.String("topic", topicName))
    return retry.Unrecoverable(
        fmt.Errorf("topic name = %s contains \"/\"", topicName))
}
```

## 性能优化建议

### 分页大小调优

- **小分页**（<5MB）：适合高频写入场景，减少内存占用
- **中等分页**（5-15MB）：平衡 I/O 效率和内存使用
- **大分页**（>15MB）：适合批量写入，减少分页数量

### 保留策略优化

1. 根据磁盘容量设置 `RetentionSizeInMB`
2. 根据业务需求设置 `RetentionTimeInMinutes`
3. 建议同时启用两个策略以确保数据安全

## 相关资源

- 社区 Issue：[#9685 Backup and restore](https://github.com/milvus-io/milvus/issues/9685) - 关于数据备份功能的需求讨论
- 部署指南：[Milvus 独立部署崩溃问题排查](https://github.com/milvus-io/milvus/issues/28583)

## 总结

执行节点是 Milvus 架构中的核心处理单元，通过消息队列（如 RocksMQ）与代理节点解耦，实现高并发的数据写入和查询处理。理解执行节点的工作原理对于优化 Milvus 性能、排查问题至关重要。主要关注点包括：

1. **Topic 管理**：确保 Topic 命名规范，避免特殊字符
2. **消费者组管理**：正确管理消费进度，支持故障恢复
3. **保留策略**：合理配置数据清理策略，控制存储成本
4. **分页机制**：优化分页大小，提升 I/O 效率

---

<a id='client-sdk'></a>

## Go SDK (client/v2) 使用指南

### 相关页面

相关主题：[Milvus 概述](#overview), [集合与 Schema 设计](#schema-design)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [client/milvusclient/client.go](https://github.com/milvus-io/milvus/blob/main/client/milvusclient/client.go)
- [client/milvusclient/collection.go](https://github.com/milvus-io/milvus/blob/main/client/milvusclient/collection.go)
- [client/milvusclient/write.go](https://github.com/milvus-io/milvus/blob/main/client/milvusclient/write.go)
- [client/milvusclient/read.go](https://github.com/milvus-io/milvus/blob/main/client/milvusclient/read.go)
- [client/milvusclient/rbac.go](https://github.com/milvus-io/milvus/blob/main/client/milvusclient/rbac.go)
- [client/milvusclient/replicate.go](https://github.com/milvus-io/milvus/blob/main/client/milvusclient/replicate.go)
</details>

# Go SDK (client/v2) 使用指南

## 概述

Go SDK (client/v2) 是 Milvus 向量数据库的官方 Go 语言客户端，提供了一套完整的 API 用于与 Milvus 服务器进行交互。该 SDK 支持向量存储、相似度搜索、标量字段管理、集合操作、权限控制以及数据复制等核心功能。

当前版本：**2.6.5**

主要特性包括：

- 支持 dense、binary、sparse、int8 类型的可空（nullable）向量列
- 结构化数组（struct-array）的向量子字段支持
- EmbeddingList/MAX_SIM 搜索功能
- Array 字段的部分更新（ARRAY_APPEND、ARRAY_REMOVE）
- 自定义 gRPC DialOptions 保留默认连接配置

## 核心组件架构

```mermaid
graph TD
    A[Client] --> B[Collection Operations]
    A --> C[Database Operations]
    A --> D[RBAC Operations]
    A --> E[Replication Operations]
    
    B --> B1[Create/Drop Collection]
    B --> B2[Schema Management]
    B --> B3[Index Operations]
    
    C --> C1[Collection Management]
    C --> C2[Data Query]
    
    D --> D1[User Management]
    D --> D2[Role Management]
    D --> D3[Privilege Management]
    
    E --> E1[Replication Config]
    E --> E2[Truncate Collection]
```

## 客户端初始化

### 基本连接

使用 `New` 函数创建客户端连接：

```go
import "github.com/milvus-io/milvus/client/v2/milvusclient"

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

milvusAddr := "YOUR_MILVUS_ENDPOINT"

cli, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
    Address: milvusAddr,
})
if err != nil {
    // 处理错误
}
defer cli.Close(ctx)
```

### 客户端配置选项

| 配置项 | 类型 | 描述 |
|--------|------|------|
| Address | string | Milvus 服务器地址（必需） |
| DialOptions | []grpc.DialOption | 自定义 gRPC 连接选项 |
| Username | string | 认证用户名 |
| Password | string | 认证密码 |

**注意**：自定义 DialOptions 不会覆盖 SDK 的默认 keepalive、backoff 和 max receive message-size 设置。

资料来源：[client/README.md](https://github.com/milvus-io/milvus/blob/main/client/README.md)

## 集合操作（Collection Operations）

### 创建集合

使用 `CreateCollection` 方法创建集合：

```go
// 定义集合 Schema
fields := []*schema.Field{
    {
        Name:       "id",
        DataType:   schemapb.DataType_Int64,
        IsPrimary:  true,
    },
    {
        Name:       "vector",
        DataType:   schemapb.DataType_FloatVector,
        ElementType: schemapb.DataType_Float,
        TypeParams: map[string]string{
            "dim": "128",
        },
    },
}

err = cli.CreateCollection(ctx, &milvusclient.CreateCollectionOption{
    Collection:  "my_collection",
    Schema:      schema.NewSchema().WithFields(fields),
})
```

### 向集合添加字段

使用 `AddCollectionField` 方法向现有集合添加字段：

```go
err = cli.AddCollectionField(ctx, "my_collection", 
    milvusclient.NewAddCollectionFieldOption("new_field", schemapb.DataType_VarChar).
        WithMaxLength(256).
        WithElementType(schemapb.DataType_VarChar))
```

**重要**：新增的向量字段必须是 nullable 的，否则请求会被 SDK 验证拒绝。

### 截断集合数据

`TruncateCollection` API 可以快速清除集合中的所有数据，而无需删除和重建集合：

```go
err = cli.TruncateCollection(ctx, NewTruncateCollectionOption("my_collection"))
```

资料来源：[client/milvusclient/collection.go](https://github.com/milvus-io/milvus/blob/main/client/milvusclient/collection.go)

## 数据写入操作

### 插入数据

```go
// 准备数据
ids := []int64{1, 2, 3, 4, 5}
vectors := [][]float32{
    {0.1, 0.2, /* ... 128 维 */},
    {0.1, 0.2, /* ... */},
    // ...
}

result, err := cli.Insert(ctx, &milvusclient.InsertOption{
    Collection: "my_collection",
    Partition:  "my_partition",  // 可选
    FieldsData: map[string]interface{}{
        "id":     ids,
        "vector": vectors,
    },
})
```

### Upsert 操作

支持 Array 字段的部分更新：

```go
// ARRAY_APPEND 操作
result, err := cli.Upsert(ctx, &milvusclient.UpsertOption{
    Collection: "my_collection",
    FieldsData: fieldsData,
}, milvusclient.WithArrayAppend("array_field", "new_element"))

// ARRAY_REMOVE 操作  
result, err := cli.Upsert(ctx, &milvusclient.UpsertOption{
    Collection: "my_collection",
    FieldsData: fieldsData,
}, milvusclient.WithArrayRemove("array_field", "element_to_remove"))
```

资料来源：[client/milvusclient/write.go](https://github.com/milvus-io/milvus/blob/main/client/milvusclient/write.go)

## 数据查询与搜索

### 向量相似度搜索

```go
// 定义搜索向量
searchVectors := [][]float32{
    {0.1, 0.2, /* ... 128 维 */},
}

// 执行搜索
results, err := cli.Search(ctx, &milvusclient.SearchOption{
    Collection:    "my_collection",
    VecFieldName:  "vector",
    QueryVector:   searchVectors,
    Limit:         10,
    Offset:        0,
    Filter:        "id > 100",  // 可选：标量过滤
    OutputFields:  []string{"id", "vector"},
})
```

### MAX_SIM 搜索

支持基于 EmbeddingList 的最大相似度搜索：

```go
results, err := cli.Search(ctx, &milvusclient.SearchOption{
    Collection:    "my_collection",
    VecFieldName:  "embedding_field",
    QueryVector:   queryEmbedding,
    SearchParams: map[string]interface{}{
        "metric_type": "IP",  // 内积
    },
})
```

### 查询标量字段

```go
results, err := cli.Query(ctx, &milvusclient.QueryOption{
    Collection: "my_collection",
    Filter:    "id >= 1 AND id <= 100",
    OutputFields: []string{"id", "scalar_field"},
    Limit:    100,
})
```

资料来源：[client/milvusclient/read.go](https://github.com/milvus-io/milvus/blob/main/client/milvusclient/read.go)

## 权限控制（RBAC）

### 用户管理

```go
// 创建用户
err = cli.CreateUser(ctx, &milvusclient.CreateUserOption{
    Username: "new_user",
    Password: "password",
})

// 更新密码
err = cli.UpdatePassword(ctx, &milvusclient.UpdatePasswordOption{
    Username:    "existing_user",
    OldPassword: "old_pass",
    NewPassword: "new_pass",
})

// 删除用户
err = cli.DropUser(ctx, &milvusclient.DropUserOption{
    Username: "user_to_delete",
})
```

### 角色管理

```go
// 创建角色
err = cli.CreateRole(ctx, &milvusclient.CreateRoleOption{
    RoleName: "admin_role",
})

// 授予角色权限
err = cli.GrantRole(ctx, &milvusclient.GrantRoleOption{
    RoleName:     "admin_role",
    ObjectType:   "Collection",
    ObjectName:   "*",
    Privilege:    "Admin",
})

// 添加用户到角色
err = cli.AddUserToRole(ctx, &milvusclient.AddUserToRoleOption{
    Username: "new_user",
    RoleName: "admin_role",
})
```

### 权限检查

```go
// 检查权限
granted, err := cli.CheckRole_granted(ctx, &milvusclient.CheckRoleOption{
    RoleName: "admin_role",
})
```

资料来源：[client/milvusclient/rbac.go](https://github.com/milvus-io/milvus/blob/main/client/milvusclient/rbac.go)

## 数据复制功能

### 获取复制配置

```go
config, err := cli.GetReplicateConfiguration(ctx, &milvusclient.GetReplicateConfigurationOption{
    CollectionName: "my_collection",
})
```

### 复制状态管理

SDK 提供了完整的复制配置接口，支持跨集群数据同步场景。

资料来源：[client/milvusclient/replicate.go](https://github.com/milvus-io/milvus/blob/main/client/milvusclient/replicate.go)

## 向量类型支持

| 向量类型 | 描述 | Nullable 支持 | 索引类型 |
|---------|------|--------------|---------|
| FloatVector | 浮点向量 | ✅ | IVF, HNSW, DISKANN |
| BinaryVector | 二进制向量 | ✅ | BIN_IVF |
| SparseVector | 稀疏向量 | ✅ | SPARSE_INVERTED_INDEX |
| Int8Vector | Int8 向量 | ✅ | IVF_FLAT |

## 常见问题与社区讨论

### 社区热点问题

根据社区反馈，以下功能需求受到较多关注：

| Issue | 描述 | 状态 |
|-------|------|------|
| #4430 | 字符串类型字段支持 | 功能请求中 |
| #20405 | 集合创建后修改 Schema | 功能请求中 |
| #1924 | 字符串 ID 支持 | 功能请求中 |
| #9685 | 备份与恢复功能 | 功能请求中 |

### 使用注意事项

1. **RocksMQ 独立部署崩溃**：如果使用 RocksMQ 作为消息队列，在独立部署模式下可能遇到崩溃问题，建议生产环境使用 Kafka 或 Pulsar。

2. **Schema 修改限制**：当前 Milvus 不支持直接修改已创建且非空集合的 Schema。

3. **版本兼容性**：确保 Go SDK 版本与 Milvus 服务器版本兼容：

| Milvus 版本 | Go SDK 版本 |
|------------|------------|
| 2.6.17 | 2.6.4 |
| 2.6.15 | 2.6.3 |
| 2.6.13 | 2.6.1 |
| 2.5.27 | 2.5.14 |

## 最佳实践

### 连接管理

```go
// 建议使用带超时的上下文
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// 确保客户端资源释放
cli, err := milvusclient.New(ctx, config)
if err != nil {
    return err
}
defer cli.Close(ctx)
```

### 批量操作

```go
// 使用批量插入提高性能
const batchSize = 1000
for i := 0; i < len(data); i += batchSize {
    end := i + batchSize
    if end > len(data) {
        end = len(data)
    }
    _, err = cli.Insert(ctx, &milvusclient.InsertOption{
        Collection: "my_collection",
        FieldsData: data[i:end],
    })
    if err != nil {
        return err
    }
}
```

### 错误处理

```go
result, err := cli.Search(ctx, options)
if err != nil {
    // 检查错误类型
    if status, ok := status.FromError(err); ok {
        switch status.Code() {
        case codes.NotFound:
            // 处理资源不存在
        case codes.Unavailable:
            // 处理服务不可用
        }
    }
    return err
}
```

## 相关资源

- [Milvus 官方文档](https://milvus.io/docs)
- [Go SDK GitHub 仓库](https://github.com/milvus-io/milvus/tree/main/client)
- [Release Notes - client/v2.6.5](https://github.com/milvus-io/milvus/releases/tag/client/v2.6.5)
- [版本兼容性说明](https://github.com/milvus-io/milvus/releases)

---

<a id='deployment'></a>

## 部署指南

### 相关页面

相关主题：[Milvus 概述](#overview), [监控与运维](#monitoring)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [cmd/milvus/milvus.go](https://github.com/milvus-io/milvus/blob/main/cmd/milvus/milvus.go)
- [cmd/components/mix_coord.go](https://github.com/milvus-io/milvus/blob/main/cmd/components/mix_coord.go)
- [pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go)
- [pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go)
- [internal/streamingcoord/server/balancer/channel/pchannel.go](https://github.com/milvus-io/milvus/blob/main/internal/streamingcoord/server/balancer/channel/pchannel.go)
</details>

# 部署指南

本页面介绍 Milvus 向量数据库的部署方式、架构组成以及配置方法。Milvus 支持单机部署（Standalone）和分布式集群部署（Cluster）两种模式，用户可根据业务规模选择合适的部署方案。

## 部署模式概述

Milvus 提供两种核心部署模式，分别适用于不同的使用场景。

### 单机部署（Standalone）

单机部署将所有 Milvus 组件运行在单一节点上，适用于开发、测试以及对高可用性要求较低的场景。单机部署使用 Docker Compose 方式启动，依赖 etcd 存储元数据、MinIO 存储向量数据，以及 RocksMQ 作为消息队列。

### 集群部署（Cluster）

集群部署采用分布式架构，将各组件分离部署以实现水平扩展和高可用性。集群模式支持 Kubernetes 环境，通过微服务化设计将协调节点（Coordinator）、查询节点（Query Node）、数据节点（Data Node）等组件独立部署。

## Milvus 组件架构

Milvus 采用分层架构设计，各组件协同工作完成向量数据的存储、索引和检索。

```mermaid
graph TB
    subgraph 协调层["协调层 Coordinator"]
        RC[Root Coord]
        QC[Query Coord]
        DC[Data Coord]
        IC[Index Coord]
        HC[Proxy/访问层]
    end
    
    subgraph 执行层["执行层 Workers"]
        QN[Query Node]
        DN[Data Node]
        IN[Index Node]
        SN[Data Node]
    end
    
    subgraph 存储层["存储层 Storage"]
        ET[etcd<br/>元数据]
        MS[MinIO/S3<br/>对象存储]
        MQ[Kafka/Pulsar<br/>消息队列]
    end
    
    HC --> RC
    HC --> QC
    HC --> DC
    QC --> QN
    DC --> DN
    IC --> IN
    RC --> ET
    QC --> ET
    DC --> ET
    RC --> MQ
    QN --> MS
    DN --> MS
    IN --> MS
```

### 主要组件说明

| 组件名称 | 功能描述 |
|---------|---------|
| Proxy | 负责接收客户端请求，进行请求验证和路由 |
| Root Coord | 管理整个 Milvus 系统的元数据和协调操作 |
| Query Coord | 管理查询节点，负责搜索和查询调度 |
| Data Coord | 管理数据节点，负责数据写入和存储 |
| Index Coord | 管理索引构建任务 |
| Query Node | 执行向量相似度搜索 |
| Data Node | 处理数据写入和持久化 |
| Index Node | 构建向量索引以加速检索 |

## 单机部署指南

### 环境要求

| 资源类型 | 最低要求 | 推荐配置 |
|---------|---------|---------|
| CPU | 2 核 | 8 核以上 |
| 内存 | 4 GB | 16 GB 以上 |
| 磁盘 | 50 GB | 100 GB SSD |
| 操作系统 | Ubuntu 18.04+ / CentOS 7+ | Ubuntu 20.04+ |

### Docker Compose 部署步骤

1. **下载 docker-compose 配置文件**

```bash
wget https://raw.githubusercontent.com/milvus-io/milvus/master/deployments/docker/standalone/docker-compose.yml
```

2. **启动服务**

```bash
docker-compose up -d
```

3. **验证服务状态**

```bash
docker-compose ps
```

4. **查看日志**

```bash
docker-compose logs -f milvus
```

### 单机部署配置参数

单机部署的配置文件位于 `configs/milvus.yaml`，主要配置项包括：

| 配置项 | 默认值 | 说明 |
|-------|-------|------|
| `etcd.endpoints` | `localhost:2379` | etcd 服务地址 |
| `minio.address` | `localhost:9000` | MinIO 服务地址 |
| `rocksmq.retentionTimeInMinutes` | `4320` | RocksMQ 消息保留时间（分钟） |
| `rocksmq.retentionSizeInMB` | `-1` | RocksMQ 消息保留大小（MB），-1 表示不限制 |
| `common.retentionDuration` | `432000` | 数据保留时长（秒） |

## 集群部署指南

### Kubernetes 部署架构

```mermaid
graph LR
    Client[客户端] --> LB[Load Balancer]
    LB --> Proxy[Proxy Pods]
    Proxy --> RC[Root Coord]
    Proxy --> QC[Query Coord]
    Proxy --> DC[Data Coord]
    Proxy --> IC[Index Coord]
    QC --> QN1[Query Node 1]
    QC --> QN2[Query Node 2]
    DC --> DN1[Data Node 1]
    DC --> DN2[Data Node 2]
    IC --> IN1[Index Node 1]
    IC --> IN2[Index Node 2]
```

### 部署前准备

| 组件 | 版本要求 | 用途 |
|-----|---------|------|
| Kubernetes | 1.18+ | 容器编排平台 |
| Helm | 3.0+ | 包管理工具 |
| etcd | 3.5+ | 元数据存储 |
| MinIO | RELEASE.2022+ | 对象存储 |

### Helm Chart 部署步骤

1. **添加 Helm 仓库**

```bash
helm repo add milvus https://milvus-io.github.io/milvus-helm
helm repo update
```

2. **配置 values.yaml**

```yaml
cluster:
  enabled: true

etcd:
  enabled: true

minio:
  enabled: true

pulsar:
  enabled: false

kafka:
  enabled: false

proxy:
  serviceType: LoadBalancer
```

3. **执行部署**

```bash
helm install my-milvus milvus/milvus -n milvus --create-namespace -f values.yaml
```

4. **验证部署状态**

```bash
kubectl get pods -n milvus
kubectl get svc -n milvus
```

## 消息队列配置

Milvus 支持多种消息队列作为日志存储后端，默认使用 RocksMQ，在分布式环境中推荐使用 Kafka 或 Pulsar。

### RocksMQ 配置

RocksMQ 是单机部署默认的消息队列实现，配置参数位于 `milvus.yaml` 中的 `rocksmq` 段落：

```yaml
rocksmq:
  # RocksMQ 消息保留时间（分钟）
  retentionTimeInMinutes: 4320
  
  # RocksMQ 消息保留大小（MB），-1 表示不限制
  retentionSizeInMB: -1
  
  # RocksMQ 页面大小配置
  pageSize: 1048576
  
  # 压缩类型配置
  compressionTypes: 0,0,0,0,0
```

### 消息保留机制

RocksMQ 的保留机制通过 `rocksmq_retention.go` 实现，支持两种清理策略：

```mermaid
graph TD
    A[启动保留检查] --> B{检查保留配置}
    B -->|时间策略| C[按时间过期清理]
    B -->|大小策略| D[按大小限制清理]
    C --> E[删除已确认消息]
    D --> E
    E --> F[更新页面信息]
```

清理流程说明：

- **时间过期检查**：遍历已确认的消息页面，检查其确认时间是否超过 `retentionTimeInMinutes` 配置
- **大小过期检查**：统计已确认消息的总大小，当超过 `retentionSizeInMB` 时触发清理
- **原子操作**：使用 RocksDB 的批量删除接口确保数据一致性

## 依赖服务配置

### etcd 元数据存储

| 配置项 | 默认值 | 说明 |
|-------|-------|------|
| `etcd.endpoints` | `localhost:2379` | etcd 集群地址 |
| `etcd.rootPath` | `by-dev` | Milvus 根路径 |
| `etcd.metaSubPath` | `meta` | 元数据子路径 |

### MinIO 对象存储

| 配置项 | 默认值 | 说明 |
|-------|-------|------|
| `minio.address` | `localhost:9000` | MinIO 服务地址 |
| `minio.accessKeyID` | `minioadmin` | 访问密钥 |
| `minio.secretAccessKey` | `minioadmin` | 访问密钥 |
| `minio.bucketName` | `milvus-bucket` | 存储桶名称 |
| `minio.useSSL` | `false` | 是否使用 SSL |

## 高可用性配置

### 多副本部署

为保证服务高可用性，可在 values.yaml 中配置各组件的副本数：

```yaml
queryNode:
  replicas: 3

dataNode:
  replicas: 3

indexNode:
  replicas: 2
```

### 健康检查配置

| 检查项 | 间隔时间 | 超时时间 | 重试次数 |
|-------|---------|---------|---------|
| 存活探针 | 10s | 5s | 3 |
| 就绪探针 | 10s | 5s | 3 |

## 安全配置

### 认证与授权

Milvus 支持基于角色的访问控制（RBAC），配置项包括：

| 配置项 | 默认值 | 说明 |
|-------|-------|------|
| `common.security.authorizationEnabled` | `false` | 是否启用授权 |
| `common.security.userName` | `minioadmin` | 默认用户名 |
| `common.security.password` | `minioadmin` | 默认密码 |

### TLS 加密传输

```yaml
proxy:
  http:
    enabled: true
    port: 9091
  grpc:
    port: 19530
    tls:
      enabled: true
      certPath: /path/to/cert.pem
      keyPath: /path/to/key.pem
```

## 性能调优

### 内存配置

| 配置项 | 说明 |
|-------|------|
| `queryNode.cache.memoryLimit` | 查询节点缓存内存上限 |
| `dataNode.memory.memoryLimit` | 数据节点内存上限 |

### 索引构建配置

| 配置项 | 默认值 | 说明 |
|-------|-------|------|
| `indexCoord.buildIndexConcurrency` | `1` | 索引构建并发数 |
| `indexCoord.minSegmentSizeToMerge` | `512` | 合并最小段大小 |

## 监控与日志

### Prometheus 监控

Milvus 集成 Prometheus 指标导出，配置项位于 `milvus.yaml`：

```yaml
metrics:
  enabled: true
  port: 9091
  path: /metrics
```

### 日志配置

| 配置项 | 默认值 | 说明 |
|-------|-------|------|
| `log.level` | `info` | 日志级别 |
| `log.format` | `text` | 日志格式 |
| `log.file.rotation.maxSize` | `300` | 单文件最大大小（MB） |
| `log.file.rotation.maxAge` | `10` | 文件保留天数 |

## 常见部署问题排查

### 服务启动失败

1. 检查依赖服务（etcd、MinIO）是否正常运行
2. 确认端口未被占用：`netstat -tlnp | grep <port>`
3. 查看容器日志：`docker-compose logs -f milvus`

### 连接超时

| 可能原因 | 解决方案 |
|---------|---------|
| 防火墙阻止 | 开放必要端口 |
| 服务未就绪 | 增加就绪探针等待时间 |
| 网络配置错误 | 检查 Docker 网络配置 |

### 存储空间不足

```bash
# 清理未使用的 Docker 资源
docker system prune -a

# 扩展 MinIO 存储卷
kubectl patch pvc data-my-minio-0 -p '{"spec":{"resources":{"requests":{"storage":"500Gi"}}}}'
```

## 相关资源

- 官方部署文档：https://milvus.io/docs/v2.0.x/install_standalone-docker.md
- Helm Chart 仓库：https://artifacthub.io/packages/helm/milvus/milvus
- 最新版本：v2.6.x（持续更新中）

---

<a id='monitoring'></a>

## 监控与运维

### 相关页面

相关主题：[部署指南](#deployment), [执行节点详解](#execution-nodes)

<details>
<summary>相关源码文件</summary>

以下源码文件用于生成本页说明：

- [pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go)
- [pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go)
- [pkg/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go)
- [pkg/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go](https://github.com/milvus-io/milvus/blob/main/pkg/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go)
- [tools/README.md](https://github.com/milvus-io/milvus/blob/main/tools/README.md)
</details>

# 监控与运维

## 概述

Milvus 的监控与运维体系涵盖消息队列状态管理、数据保留机制、消费者组监控以及系统健康检查等多个维度。作为云原生向量数据库，Milvus 提供了完善的运维工具和机制，确保系统在高并发场景下的稳定运行。

RocksMQ 作为 Milvus 默认的消息队列实现，承担着数据传输和异步处理的关键职责。其监控与运维主要围绕以下几个方面展开：

- **主题（Topic）生命周期管理**：创建、销毁、状态追踪
- **消费者组（Consumer Group）管理**：注册、注销、消费位点追踪
- **数据保留策略**：基于时间和基于大小的双重保留机制
- **消息分页管理**：分页存储与清理机制

## 核心架构

### RocksMQ 在 Milvus 中的定位

```mermaid
graph TB
    subgraph "Milvus 架构"
        A[客户端请求] --> B[Proxy 组件]
        B --> C[RocksMQ 消息队列]
        C --> D[QueryNode 消费]
        C --> E[DataNode 消费]
        C --> F[其他消费者]
    end
    
    subgraph "RocksMQ 内部"
        G[Topic 管理器] --> H[分页存储]
        G --> I[消费者列表]
        I --> J[位点追踪]
        H --> K[保留策略]
        K --> L{保留类型}
        L --> M[时间保留]
        L --> N[大小保留]
    end
```

### 关键组件关系

RocksMQ 的核心数据结构包括：

| 组件 | 类型 | 作用 |
|------|------|------|
| `topicMu` | `sync.Map` | 主题级别互斥锁映射 |
| `consumerList` | `struct` | 消费者列表封装 |
| `rocksmq` | `struct` | 主消息队列实例 |
| `retentionInfo` | `struct` | 保留策略执行器 |

## 主题管理

### 创建主题

主题创建时执行以下校验和初始化操作：

```go
func (rmq *rocksmq) CreateTopic(topicName string) error {
    // 1. 检查关闭状态
    if rmq.isClosed() {
        return errors.New(RmqNotServingErrMsg)
    }
    
    // 2. 校验主题名不包含 "/"
    if strings.Contains(topicName, "/") {
        return retry.Unrecoverable(fmt.Errorf("topic name = %s contains \"/\"", topicName))
    }
    
    // 3. 检查主题是否已存在
    topicIDKey := TopicIDTitle + topicName
    val, err := rmq.kv.Load(context.TODO(), topicIDKey)
    
    // 4. 初始化主题锁和消费者映射
    topicMu.LoadOrStore(topicName, new(sync.Mutex))
    rmq.consumers.LoadOrStore(topicName, &consumerList{consumers: make(map[string]*Consumer)})
}
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:146-175]()

### 销毁主题

销毁操作会清理所有相关数据：

```go
func (rmq *rocksmq) DestroyTopic(topicName string) error {
    // 1. 获取主题互斥锁
    ll, ok := topicMu.Load(topicName)
    lock, ok := ll.(*sync.Mutex)
    lock.Lock()
    defer lock.Unlock()
    
    // 2. 清理消费者映射
    rmq.consumers.Delete(topicName)
    rmq.topicName2LatestMsgID.Delete(topicName)
    
    // 3. 删除主题消息数据
    fixTopicName := topicName + "/"
    err := rmq.kv.RemoveWithPrefix(context.TODO(), fixTopicName)
    
    // 4. 清理分页大小和时间戳信息
    pageMsgSizeKey := constructKey(PageMsgSizeTitle, topicName)
    pageMsgTsKey := constructKey(PageTsTitle, topicName)
    rmq.kv.RemoveWithPrefix(context.TODO(), pageMsgSizeKey)
    rmq.kv.RemoveWithPrefix(context.TODO(), pageMsgTsKey)
    
    // 5. 清理确认时间戳和主题信息
    ackedTsKey := constructKey(AckedTsTitle, topicName)
    rmq.kv.RemoveWithPrefix(context.TODO(), ackedTsKey)
}
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:185-240]()

## 消费者组管理

### 消费者注册与追踪

```go
type consumerList struct {
    consumers map[string]*Consumer  // GroupName -> Consumer
    mu        sync.RWMutex
}

func (l *consumerList) Add(consumer *Consumer) {
    l.mu.Lock()
    defer l.mu.Unlock()
    if _, ok := l.consumers[consumer.GroupName]; ok {
        return  // 已存在则跳过
    }
    l.consumers[consumer.GroupName] = consumer
}

func (l *consumerList) Get(groupName string) *Consumer {
    l.mu.RLock()
    defer l.mu.RUnlock()
    if consumer, ok := l.consumers[groupName]; ok {
        return consumer
    }
    return nil
}
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:62-87]()

### 消费者组存在性检查

```go
func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer, error) {
    key := constructCurrentID(topicName, groupName)
    _, ok := rmq.consumersID.Load(key)
    if ok {
        if val, ok := rmq.consumers.Load(topicName); ok {
            c := val.(*consumerList).Get(groupName)
            return c != nil, c, nil
        }
    }
    return false, nil, nil
}
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:260-270]()

## 数据保留机制

RocksMQ 实现了双重保留策略，可通过配置独立启用或同时生效：

```go
func checkRetention() bool {
    params := paramtable.Get()
    return params.RocksmqCfg.RetentionSizeInMB.GetAsInt64() != -1 || 
           params.RocksmqCfg.RetentionTimeInMinutes.GetAsInt64() != -1
}
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:50-54]()

### 保留策略执行流程

```mermaid
graph TD
    A[开始保留检查] --> B{启用大小保留?}
    B -->|是| C[遍历已确认分页]
    B -->|否| D{启用时间保留?}
    C --> E{当前删除大小 > 总确认大小?}
    E -->|是| F[删除该分页]
    E -->|否| G[停止清理]
    F --> H[累加已删除大小]
    H --> C
    G --> I[流程结束]
    D -->|是| J[检查分页确认时间]
    J --> K{确认时间过期?}
    K -->|是| F
    K -->|否| G
    D -->|否| I
```

### 消息删除实现

```go
func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error {
    // 删除 [startID, endID) 范围内的消息
    startKey := path.Join(topic, strconv.FormatInt(startID, 10))
    endKey := path.Join(topic, strconv.FormatInt(endID+1, 10))
    
    writeBatch := gorocksdb.NewWriteBatch()
    defer writeBatch.Destroy()
    writeBatch.DeleteRange([]byte(startKey), []byte(endKey))
    
    opts := gorocksdb.NewDefaultWriteOptions()
    defer opts.Destroy()
    err := db.Write(opts, writeBatch)
    return err
}
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:45-60]()

### 分页过期检测

保留机制通过迭代已确认的分页来检测过期数据：

```go
// 遍历分页并检查确认时间
pageMsgPrefix := constructKey(PageMsgSizeTitle, topic) + "/"
pageIter := rocksdbkv.NewRocksiteratorWithUpperBound(ri.kv.DB, 
    typeutil.AddOne(pageMsgPrefix), pageReadOpts)

for ; pageIter.Valid(); pageIter.Next() {
    pageID, err := parsePageID(string(pKey.Data()))
    ackedTsKey := fixedAckedTsKey + "/" + strconv.FormatInt(pageID, 10)
    ackedTsVal, err := ri.kv.Load(context.TODO(), ackedTsKey)
    
    // 未确认的分页，停止清理
    if ackedTsVal == "" {
        break
    }
    
    if msgTimeExpiredCheck(ackedTs) {
        // 确认时间过期，执行清理
        pageEndID = pageID
        deletedAckedSize += size
        pageCleaned++
    }
}
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:120-180]()

## 分页管理机制

### 分页存储结构

| Key 格式 | 用途 |
|----------|------|
| `{PageMsgSizeTitle}{topic}/{pageEndID}` | 记录分页消息总大小 |
| `{PageTsTitle}{topic}/{pageEndID}` | 记录分页创建时间戳 |
| `{AckedTsTitle}{topic}/{pageEndID}` | 记录分页确认时间戳 |

### 分页大小更新

```go
func (rmq *rocksmq) updatePageMsgSize(topicName string, msgIDs []UniqueID, msgSizes map[UniqueID]int64) error {
    curMsgSize := 0
    nowTs := strconv.FormatInt(time.Now().Unix(), 10)
    mutateBuffer := make(map[string]string)
    
    for _, id := range msgIDs {
        msgSize := msgSizes[id]
        if curMsgSize + msgSize > params.RocksmqCfg.PageSize.GetAsInt64() {
            // 当前分页已满，创建新分页
            pageMsgSizeKey := fixedPageSizeKey + "/" + strconv.FormatInt(id, 10)
            mutateBuffer[pageMsgSizeKey] = strconv.FormatInt(curMsgSize + msgSize, 10)
            pageTsKey := fixedPageTsKey + "/" + strconv.FormatInt(id, 10)
            mutateBuffer[pageTsKey] = nowTs
            curMsgSize = 0
        } else {
            curMsgSize += msgSize
        }
    }
    return rmq.kv.MultiSave(context.TODO(), mutateBuffer)
}
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:310-340]()

## 配置参数

### RocksMQ 相关配置项

| 参数名 | 作用 | 默认值 |
|--------|------|--------|
| `rocksmq.compressionTypes` | 消息压缩类型 | `0,0,0,0,0` |
| `rocksmq.retentionSizeInMB` | 大小保留阈值（MB），-1 表示禁用 | `-1` |
| `rocksmq.retentionTimeInMinutes` | 时间保留阈值（分钟），-1 表示禁用 | `-1` |
| `rocksmq.pageSize` | 单个分页的最大消息大小 | - |
| `rocksmq.tickerTimeInSeconds` | 保留检查间隔（秒） | `1` |

### 配置校验

```go
func TestRocksmq_ParseCompressionTypeError(t *testing.T) {
    params := paramtable.Get()
    params.Save(params.RocksmqCfg.CompressionTypes.Key, "invalid,1")
    _, err := parseCompressionType(params)
    assert.Error(t, err)  // 无效值应返回错误
    
    params.Save(params.RocksmqCfg.CompressionTypes.Key, "-1,-1")
    _, err = parseCompressionType(params)
    assert.Error(t, err)  // 负数应返回错误
}
```

资料来源：[pkg/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go:350-365]()

## 运维最佳实践

### 主题命名规范

- **禁止**在主题名中使用 `/` 字符
- 主题名应具有业务语义，便于问题排查
- 生产环境建议使用统一的主题命名规范

### 消费者组管理

1. **正确注销**：消费组不再使用时，应调用 `DestroyConsumerGroup` 释放资源
2. **位点管理**：定期检查消费者位点，避免消息积压
3. **异常恢复**：使用 `ForceSeek` 定位到指定消息 ID 进行消费位点重置

```go
// 强制跳转消费位点示例
err = rmq.ForceSeek(topicName, groupName, ids[0])
```

### 保留策略调优

```mermaid
graph LR
    A[数据量增长快] --> B[启用大小保留]
    A --> C[减小 PageSize]
    D[数据访问周期固定] --> E[启用时间保留]
    D --> F[调整 RetentionTime]
    G[两者结合] --> H[双重保护机制]
```

### 常见问题排查

| 症状 | 可能原因 | 排查方法 |
|------|----------|----------|
| 消息未清理 | 保留策略未启用 | 检查 `RetentionSizeInMB` 和 `RetentionTimeInMinutes` 配置 |
| 消费者无法消费 | 消费者组未注册 | 调用 `ExistConsumerGroup` 确认 |
| 主题创建失败 | 主题名包含非法字符 | 检查名称是否包含 `/` |
| 内存占用高 | 分页未及时清理 | 检查保留机制是否正常运行 |

## 社区关注的问题

根据社区反馈，以下运维相关问题受到较多关注：

- **#28583** - Milvus standalone 崩溃问题（涉及 RocksMQ 相关错误）
- **#9685** - 备份与恢复功能请求
- 独立部署模式下 RocksMQ 的稳定性问题

## 相关文档

- [Milvus 部署文档](https://github.com/milvus-io/milvus/tree/main/deployments)
- [监控系统配置](https://github.com/milvus-io/milvus/tree/main/deployments/monitor)
- [Grafana 监控面板](https://github.com/milvus-io/milvus/blob/main/deployments/monitor/grafana/milvus-dashboard.json)

---

<!-- evidence_pipeline_checked: true -->
<!-- evidence_injected: true -->

---

## Doramagic 踩坑日志

项目：milvus-io/milvus

摘要：发现 8 个潜在踩坑项，其中 0 个为 high/blocking；最高优先级：安装坑 - 来源证据：Milvus standalone crashed。

## 1. 安装坑 · 来源证据：Milvus standalone crashed

- 严重度：medium
- 证据强度：source_linked
- 发现：GitHub 社区证据显示该项目存在一个安装相关的待验证问题：Milvus standalone crashed
- 对用户的影响：可能阻塞安装或首次运行。
- 建议检查：来源显示可能已有修复、规避或版本变化，说明书中必须标注适用版本。
- 防护动作：不得脱离来源链接放大为确定性结论；需要标注适用版本和复核状态。
- 证据：community_evidence:github | cevd_f0bbd70e35ef4c65a374c802b7614b46 | https://github.com/milvus-io/milvus/issues/28583 | 来源讨论提到 linux 相关条件，需在安装/试用前复核。

## 2. 能力坑 · 能力判断依赖假设

- 严重度：medium
- 证据强度：source_linked
- 发现：README/documentation is current enough for a first validation pass.
- 对用户的影响：假设不成立时，用户拿不到承诺的能力。
- 建议检查：将假设转成下游验证清单。
- 防护动作：假设必须转成验证项；没有验证结果前不能写成事实。
- 证据：capability.assumptions | github_repo:208728772 | https://github.com/milvus-io/milvus | README/documentation is current enough for a first validation pass.

## 3. 运行坑 · 运行可能依赖外部服务

- 严重度：medium
- 证据强度：source_linked
- 发现：项目说明出现 external service/cloud/webhook/database 等运行依赖关键词。
- 对用户的影响：本地安装成功不等于能力可用，外部服务不可用会阻断体验。
- 建议检查：确认是否有离线 demo、mock 数据或可替代服务。
- 防护动作：外部服务依赖未明确时，不把本地安装成功等同于能力可用。
- 证据：packet_text.keyword_scan | github_repo:208728772 | https://github.com/milvus-io/milvus | matched external service / cloud / webhook / database keyword

## 4. 维护坑 · 维护活跃度未知

- 严重度：medium
- 证据强度：source_linked
- 发现：未记录 last_activity_observed。
- 对用户的影响：新项目、停更项目和活跃项目会被混在一起，推荐信任度下降。
- 建议检查：补 GitHub 最近 commit、release、issue/PR 响应信号。
- 防护动作：维护活跃度未知时，推荐强度不能标为高信任。
- 证据：evidence.maintainer_signals | github_repo:208728772 | https://github.com/milvus-io/milvus | last_activity_observed missing

## 5. 安全/权限坑 · 下游验证发现风险项

- 严重度：medium
- 证据强度：source_linked
- 发现：no_demo
- 对用户的影响：下游已经要求复核，不能在页面中弱化。
- 建议检查：进入安全/权限治理复核队列。
- 防护动作：下游风险存在时必须保持 review/recommendation 降级。
- 证据：downstream_validation.risk_items | github_repo:208728772 | https://github.com/milvus-io/milvus | no_demo; severity=medium

## 6. 安全/权限坑 · 存在评分风险

- 严重度：medium
- 证据强度：source_linked
- 发现：no_demo
- 对用户的影响：风险会影响是否适合普通用户安装。
- 建议检查：把风险写入边界卡，并确认是否需要人工复核。
- 防护动作：评分风险必须进入边界卡，不能只作为内部分数。
- 证据：risks.scoring_risks | github_repo:208728772 | https://github.com/milvus-io/milvus | no_demo; severity=medium

## 7. 维护坑 · issue/PR 响应质量未知

- 严重度：low
- 证据强度：source_linked
- 发现：issue_or_pr_quality=unknown。
- 对用户的影响：用户无法判断遇到问题后是否有人维护。
- 建议检查：抽样最近 issue/PR，判断是否长期无人处理。
- 防护动作：issue/PR 响应未知时，必须提示维护风险。
- 证据：evidence.maintainer_signals | github_repo:208728772 | https://github.com/milvus-io/milvus | issue_or_pr_quality=unknown

## 8. 维护坑 · 发布节奏不明确

- 严重度：low
- 证据强度：source_linked
- 发现：release_recency=unknown。
- 对用户的影响：安装命令和文档可能落后于代码，用户踩坑概率升高。
- 建议检查：确认最近 release/tag 和 README 安装命令是否一致。
- 防护动作：发布节奏未知或过期时，安装说明必须标注可能漂移。
- 证据：evidence.maintainer_signals | github_repo:208728772 | https://github.com/milvus-io/milvus | release_recency=unknown

<!-- canonical_name: milvus-io/milvus; human_manual_source: deepwiki_human_wiki -->
