Doramagic 项目包 · 项目说明书

milvus 项目

生成时间:2026-05-31 03:45:33 UTC

Milvus 概述

Milvus 是一款云原生的向量数据库,专为大规模向量相似度搜索场景设计。它支持存储、索引和管理由深度学习网络产生的海量向量embedding,能够快速处理十亿级向量的相似度检索请求。Milvus 具备高可用、高性能、可扩展的特点,广泛应用于图像检索、视频分析、自然语言处理、推荐系统等领域。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 协调服务层组件

继续阅读本节完整说明和来源证据。

章节 执行层组件

继续阅读本节完整说明和来源证据。

章节 RocksMQ 架构

继续阅读本节完整说明和来源证据。

简介

Milvus 是一款云原生的向量数据库,专为大规模向量相似度搜索场景设计。它支持存储、索引和管理由深度学习网络产生的海量向量_embedding_,能够快速处理十亿级向量的相似度检索请求。Milvus 具备高可用、高性能、可扩展的特点,广泛应用于图像检索、视频分析、自然语言处理、推荐系统等领域。

核心能力

能力类别具体描述
向量存储支持 dense vector、binary vector、sparse vector、int8 vector 等多种向量类型
相似度检索提供 IP(内积)、L2(欧氏距离)、HAMMING、JACCARD 等多种距离度量
索引类型支持 FLAT、IVF_FLAT、IVF_SQ8、IVF_PQ、HNSW、DISKANN、GPU_CAGRA 等索引
标量字段支持 Int8/16/32/64、Float/Double、Bool、String、Array、JSON 等数据类型
可扩展性支持单机部署、分布式集群部署、Kubernetes 部署

系统架构

Milvus 采用分层架构设计,将系统划分为接入层、协调服务层、执行层和存储层。

graph TD
    subgraph 接入层["接入层"]
        SDK["SDK Clients<br/>Python/Go/Java/Node.js"]
        HTTP["HTTP API<br/>gRPC"]
    end
    
    subgraph 协调服务层["协调服务层"]
        Proxy["Proxy"]
        RootCoord["Root Coord"]
        IndexCoord["Index Coord"]
        QueryCoord["Query Coord"]
        DataCoord["Data Coord"]
    end
    
    subgraph 执行层["执行层"]
        QueryNode["Query Node"]
        DataNode["Data Node"]
        IndexNode["Index Node"]
    end
    
    subgraph 存储层["存储层"]
        MinIO["对象存储<br/>MinIO/S3"]
        RocksMQ["消息队列<br/>RocksMQ/Pulsar/Kafka"]
        MetaStore["元数据存储<br/>etcd"]
    end
    
    SDK --> HTTP
    HTTP --> Proxy
    Proxy --> RootCoord
    Proxy --> QueryCoord
    Proxy --> DataCoord
    RootCoord --> IndexCoord
    QueryCoord --> QueryNode
    DataCoord --> DataNode
    IndexCoord --> IndexNode
    QueryNode --> MinIO
    DataNode --> RocksMQ
    IndexNode --> MinIO
    QueryNode --> MetaStore
    DataNode --> MetaStore

协调服务层组件

组件职责资料来源
Proxy接收客户端请求,负责请求验证、路由和结果聚合-
Root Coord管理 collection/partition 的元数据,处理 DDL 操作-
Query Coord管理 Query Node,协调分布式查询执行-
Data Coord管理 Data Node,协调数据写入和 compaction-
Index Coord管理 Index Node,协调索引构建任务-

执行层组件

组件职责
Query Node执行向量搜索,支持增量更新和历史数据查询
Data Node执行数据写入、删除和 compaction 操作
Index Node执行索引构建,支持多种索引算法

消息队列实现

Milvus 支持多种消息队列作为数据通道,包括 RocksMQ、Pulsar 和 Kafka。RocksMQ 是 Milvus 默认的消息队列实现,基于 RocksDB 构建。

RocksMQ 架构

RocksMQ 使用 RocksDB 作为底层存储,通过分页机制管理消息,实现高效的消息持久化和顺序读取。

graph LR
    subgraph RocksMQ["RocksMQ 架构"]
        subgraph 存储层["RocksDB"]
            TopicData["Topic Data<br/>消息存储"]
            MetaInfo["Meta Info<br/>元数据"]
            AckInfo["Ack Info<br/>消费确认"]
        end
        
        subgraph 内存结构["内存结构"]
            ConsumerMap["Consumer Map<br/>消费者映射"]
            PageInfo["Page Info<br/>分页信息"]
        end
    end
    
    Produce["Produce"] --> TopicData
    TopicData --> Consume["Consume"]
    ConsumerMap -.-> PageInfo
    PageInfo -.-> TopicData

主题管理

RocksMQ 中的每个主题(Topic)对应一个独立的消息流,主题名称不能包含 / 字符。创建主题时会初始化相关的元数据键值对:

// 资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go
func (rmq *rocksmq) CreateTopic(topicName string) error {
    // 检查主题名是否包含 "/"
    if strings.Contains(topicName, "/") {
        return retry.Unrecoverable(fmt.Errorf("topic name = %s contains \"/\"", topicName))
    }
    // 主题ID键是主题的唯一标识符
    topicIDKey := TopicIDTitle + topicName
    // ...
}

消费者组管理

RocksMQ 支持消费者组机制,允许多个消费者共享同一个主题的消息流。消费者组通过 consumerList 结构管理:

// 资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go
type consumerList struct {
    consumers map[string]*Consumer // GroupName -> *Consumer
    mu        sync.RWMutex
}

消费者组的存在性检查通过 ExistConsumerGroup 方法实现:

// 资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go
func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer, error) {
    key := constructCurrentID(topicName, groupName)
    _, ok := rmq.consumersID.Load(key)
    if ok {
        if val, ok := rmq.consumers.Load(topicName); ok {
            c := val.(*consumerList).Get(groupName)
            return c != nil, c, nil
        }
    }
    return false, nil, nil
}

消息保留机制

RocksMQ 实现了基于时间和大小的消息保留策略,通过 RetentionInfo 模块管理过期消息的清理:

配置参数说明默认值
rocksmq.retentionSizeInMB保留消息总大小(MB),-1 表示不限制-1
rocksmq.retentionTimeInMinutes保留时间(分钟),-1 表示不限制-1

消息清理流程:

graph TD
    Start["启动保留检查"] --> CheckSize{"检查保留大小"}
    CheckSize -->|"已超过"| CleanBySize["按大小清理<br/>从最旧页面开始"]
    CheckSize -->|"未超过"| CheckTime{"检查保留时间"}
    
    CheckTime -->|"有页面过期"| CleanByTime["按时间清理<br/>删除已确认且过期的页面"]
    CheckTime -->|"无过期页面"| End["结束"]
    
    CleanBySize --> DeleteRange["DeleteMessages<br/>删除消息范围"]
    CleanByTime --> DeleteRange
    
    DeleteRange --> WriteBatch["WriteBatch<br/>批量删除 RocksDB"]
// 资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go
func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error {
    // 按 ID 范围删除消息 [startID, endID)
    startKey := path.Join(topic, strconv.FormatInt(startID, 10))
    endKey := path.Join(topic, strconv.FormatInt(endID+1, 10))
    writeBatch := gorocksdb.NewWriteBatch()
    defer writeBatch.Destroy()
    writeBatch.DeleteRange([]byte(startKey), []byte(endKey))
    // ...
}

日志系统

Milvus 使用 mlog 作为统一的日志库,基于 Uber 的 zap 日志框架构建。mlog 设计为上下文感知的日志库,专为 Milvus 分布式系统优化。

设计目标

目标说明
强制上下文传递所有日志操作必须传递 context,确保请求可追溯
零开销抽象使用类型别名避免封装开销,性能接近直接使用 zap
自动字段累积上下文字段自动在调用链中累积,子上下文继承父字段
跨服务传播支持通过 gRPC metadata 传播关键字段用于分布式追踪
延迟编码使用 WithLazy 延迟字段编码,日志级别禁用时避免编码开销

核心架构

┌──────────────────────────────────────────────────────────────────┐
│                           mlog Package                           │
├──────────────────────────────────────────────────────────────────┤
│  ┌──────────────┐  ┌──────────────┐  ┌────────────────────────┐  │
│  │  logger.go   │  │  context.go  │  │      field.go          │  │
│  │              │  │              │  │                        │  │
│  │ - Log()      │  │ - WithFields │  │ - Field constructors   │  │
│  │ - Debug()    │  │ - GetProp..  │  │ - PropagatedString     │  │
│  │ - Info()     │  │ - logContext │  │ - PropagatedInt64      │  │
│  └──────────────┘  └──────────────┘  └────────────────────────┘  │
└──────────────────────────────────────────────────────────────────┘

资料来源:pkg/mlog/README.md

流式协调服务

Milvus 的流式协调服务(StreamingCoord)负责管理流式消息通道(PChannel)的分配和负载均衡。

通道元数据管理

PChannelMeta 是 PChannel 的元数据结构,包含通道名称、任期(Term)、访问模式和分配历史:

// 资料来源:internal/streamingcoord/server/balancer/channel/pchannel.go
type PChannelMeta struct {
    inner *streamingpb.PChannelMeta
}

// 获取通道分配历史
func (c *PChannelMeta) GetHistories() []types.PChannelInfoAssigned {
    history := make([]types.PChannelInfoAssigned, 0, len(c.inner.Histories))
    for _, h := range c.inner.Histories {
        history = append(history, types.PChannelInfoAssigned{
            Channel: types.PChannelInfo{
                Name:       c.inner.GetChannel().GetName(),
                Term:       h.Term,
                AccessMode: types.AccessMode(h.AccessMode),
            },
            Node: types.NewStreamingNodeInfoFromProto(h.Node),
        })
    }
    return history
}

// 检查通道是否已分配
func (c *PChannelMeta) IsAssigned() bool {
    return c.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED
}

分配服务

分配服务(AssignmentService)负责处理通道分配请求和配置变更:

// 资料来源:internal/streamingcoord/server/service/assignment.go
type AssignmentService interface {
    streamingpb.StreamingCoordAssignmentServiceServer
}

func NewAssignmentService() streamingpb.StreamingCoordAssignmentServiceServer {
    assignmentService := &assignmentServiceImpl{
        listenerTotal: metrics.StreamingCoordAssignmentListenerTotal.WithLabelValues(paramtable.GetStringNodeID()),
    }
    registry.RegisterAlterReplicateConfigV2AckCallback(assignmentService.alterReplicateConfiguration)
    return assignmentService
}

客户端 SDK

Milvus 提供多语言 SDK 支持,包括 Python、Go、Java、Node.js 等主流编程语言。

Go SDK (client/v2)

Go SDK 是 Milvus 官方推荐的客户端库,采用现代化的 API 设计:

组件说明
milvusclient主客户端包,提供连接管理和请求发送
支持版本Go 1.24.12 或更高版本
// 资料来源:client/README.md
import "github.com/milvus-io/milvus/client/v2/milvusclient"

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cli, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
    Address: milvusAddr,
})

SDK 版本对应关系

Milvus 版本Python SDKNode.js SDKJava SDKGo SDK
2.6.172.6.142.6.142.6.202.6.4
2.6.162.6.132.6.142.6.192.6.4
2.6.152.6.122.6.132.6.182.6.3
2.6.142.6.112.6.132.6.172.6.1

开发工具

Milvus 提供 mgit.py 智能 Git 工作流工具,用于规范提交和 PR 流程。

功能特性

特性说明
AI 提交信息自动生成符合 Milvus 规范的提交信息
DCO 自动签名确保符合 Developer Certificate of Origin
自动分支创建防止直接提交到 master,自动创建特性分支
完整 PR 工作流支持 fork → branch → commit → issue → PR → cherry-pick
代码格式化提交前自动运行本地格式化工具

提交信息规范

{type}: {description}

可选主体:详细说明

type 类型

type用途
feat新功能
fix错误修复
enhance增强优化
refactor代码重构
test测试相关
docs文档更新
chore构建/工具变更

分支命名规范

{type}/{description}-{timestamp}

示例:fix/memory-leak-1234feat/add-gemini-api-5678

社区热点问题

以下问题反映了用户最关心的功能需求:

Issue标题关注度
#2771Add support for ScaNN index21 comments
#1924Support string id17 comments
#4430Support "string" type field15 comments
#20405Modify the collection schema once collection is created15 comments
#9685Backup and restore2 comments

部署模式

Milvus 支持多种部署模式以适应不同的使用场景:

部署模式说明适用场景
Standalone单机部署,所有组件运行在单一进程开发测试
Cluster分布式集群,支持水平扩展生产环境
Kubernetes基于 K8s 的云原生部署云环境

依赖组件

组件用途可选方案
对象存储向量数据持久化MinIO、S3、Azure Blob
消息队列数据通道、事件通知RocksMQ、Pulsar、Kafka
元数据存储集合、分区元数据etcd、MySQL、TiKV

版本历史

Milvus 采用语义化版本控制,当前主要维护以下版本线:

  • v2.6.x:最新稳定版本,持续添加新功能和优化
  • v2.5.x:长期支持版本,重点修复安全漏洞和关键 bug

最新版本为 v2.6.17(2026年5月22日发布),引入了性能优化和多项新功能。

总结

Milvus 作为一款高性能的向量数据库,通过分层架构设计实现了良好的可扩展性和可用性。其核心组件包括协调服务层的各个 Coord 组件、执行层的 Node 组件,以及支持多种存储后端的灵活性。RocksMQ 消息队列提供了可靠的数据通道,mlog 日志系统确保了分布式环境下的可观测性,多语言 SDK 使开发者能够便捷地集成和使用 Milvus。

如需深入了解特定模块,请参考以下文档:

资料来源:pkg/mlog/README.md

系统架构

Milvus 是一款云原生的大规模向量数据库,专为高效存储、索引和检索高维向量而设计。系统架构采用微服务设计,将核心功能模块解耦为多个独立服务组件,通过消息队列实现组件间通信,实现高可用和水平扩展能力。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 架构概述

继续阅读本节完整说明和来源证据。

章节 RocksMQ 实现

继续阅读本节完整说明和来源证据。

章节 消息保留机制

继续阅读本节完整说明和来源证据。

概述

Milvus 是一款云原生的大规模向量数据库,专为高效存储、索引和检索高维向量而设计。系统架构采用微服务设计,将核心功能模块解耦为多个独立服务组件,通过消息队列实现组件间通信,实现高可用和水平扩展能力。

Milvus 的系统架构覆盖了从数据摄入、索引构建、向量检索到数据持久化的完整数据生命周期。系统支持多种部署模式(单机和集群),以及多种消息队列后端(RocksMQ、Pulsar、Kafka)。

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:1-50

整体架构

Milvus 集群模式采用分层架构设计,主要包含以下核心组件:

组件名称功能描述关键职责
Proxy代理层接收客户端请求,路由转发,结果聚合
RootCoord根协调器元数据管理,时间戳分配,DDL 操作
DataCoord数据协调器存储协调,Segment 管理,索引管理
QueryCoord查询协调器查询节点管理,负载均衡,shard leader
DataNode数据节点数据写入,压缩,索引构建
QueryNode查询节点向量搜索,标量过滤
IndexNode索引节点索引构建任务执行
graph TB
    subgraph 客户端层
        Client[SDK 客户端]
    end
    
    subgraph 协调层
        Proxy[Proxy]
        RootCoord[RootCoord]
        DataCoord[DataCoord]
        QueryCoord[QueryCoord]
    end
    
    subgraph 执行层
        DataNode[DataNode]
        QueryNode[QueryNode]
        IndexNode[IndexNode]
    end
    
    subgraph 存储层
        RocksDB[(RocksDB 元数据)]
        ObjectStorage[对象存储]
        MQ[消息队列]
    end
    
    Client --> Proxy
    Proxy --> RootCoord
    Proxy --> DataCoord
    Proxy --> QueryCoord
    Proxy --> MQ
    
    RootCoord --> RocksDB
    DataCoord --> RocksDB
    DataCoord --> MQ
    QueryCoord --> MQ
    
    DataNode --> RocksDB
    DataNode --> ObjectStorage
    DataNode --> MQ
    
    QueryNode --> ObjectStorage
    QueryNode --> MQ
    
    IndexNode --> ObjectStorage
    IndexNode --> MQ

消息队列子系统

架构概述

消息队列是 Milvus 架构中的核心通信枢纽,负责在各个微服务组件之间传递数据和控制消息。系统支持多种消息队列后端实现:

消息队列类型实现路径适用场景
RocksMQ内置,基于 RocksDB单机部署或轻量级场景
PulsarApache Pulsar大规模分布式部署
KafkaApache Kafka高吞吐生产环境

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:40-60

RocksMQ 实现

RocksMQ 是 Milvus 内置的消息队列实现,基于 RocksDB 存储引擎构建。其核心设计采用 Topic-Partition-Consumer 模式:

graph LR
    subgraph RocksMQ 架构
        Topic[Topic 主题]
        Partition[Partition 分区]
        ConsumerGroup[Consumer Group 消费者组]
        RocksDB[(RocksDB)]
    end
    
    Topic --> Partition
    Partition --> ConsumerGroup
    Partition --> RocksDB

#### 核心数据结构

RocksMQ 实现中的关键数据结构包括:

  • Topic 管理:每个 Topic 对应 RocksDB 中的一组 Key-Value 数据
  • Page 管理:消息按页存储,每页包含多条消息记录
  • Consumer Group:消费者组维护各消费者的消费位置
  • Ack 机制:已确认消息标记,用于垃圾回收
// Topic 标识符构造
func constructKey(metaName, topic string) string {
    return metaName + topic
}

// 分页 ID 解析
func parsePageID(key string) (int64, error) {
    stringSlice := strings.Split(key, "/")
    if len(stringSlice) != 3 {
        return 0, fmt.Errorf("invalid page id %s ", key)
    }
    return strconv.ParseInt(stringSlice[2], 10, 64)
}

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:10-30

#### 消费者列表管理

消费者组采用线程安全的 Map 结构管理:

type consumerList struct {
    consumers map[string]*Consumer // GroupName -> *Consumer
    mu        sync.RWMutex
}

func (l *consumerList) Add(consumer *Consumer) {
    l.mu.Lock()
    defer l.mu.Unlock()
    if _, ok := l.consumers[consumer.GroupName]; ok {
        return
    }
    l.consumers[consumer.GroupName] = consumer
}

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:40-60

消息保留机制

消息保留(Retention)机制是 RocksMQ 的核心功能之一,负责自动清理已确认的过期消息以释放存储空间。

#### 保留策略

系统支持两种保留策略:

策略类型配置参数说明
按大小保留RocksmqCfg.RetentionSizeInMB当已确认消息总大小超过阈值时触发清理
按时间保留RocksmqCfg.RetentionTimeInMinutes当消息确认时间超过指定分钟数时触发清理
func checkRetention() bool {
    params := paramtable.Get()
    return params.RocksmqCfg.RetentionSizeInMB.GetAsInt64() != -1 || 
           params.RocksmqCfg.RetentionTimeInMinutes.GetAsInt64() != -1
}

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:30-40

#### 页面清理流程

消息清理基于分页(Page)维度进行,每次清理遍历已确认的分页列表:

graph TD
    A[开始清理检查] --> B{检查保留策略}
    B -->|按大小| C[计算已确认消息总大小]
    B -->|按时间| D[检查消息确认时间戳]
    C --> E{超过阈值?}
    D --> F{已过期?}
    E -->|是| G[定位待清理分页]
    F -->|是| G
    E -->|否| H[结束]
    F -->|否| H
    G --> I[执行 RocksDB 范围删除]
    I --> J[更新元数据]
    J --> H
// 按消息时间过期检查
func msgTimeExpiredCheck(ackedTs int64) bool {
    // 检查消息确认时间是否超过保留时间
}

// 按消息大小过期检查  
func msgSizeExpiredCheck(curDeleteSize, totalAckedSize int64) bool {
    // 检查已删除大小是否超过总已确认大小的阈值
}

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:50-150

#### 消息删除实现

清理操作通过 RocksDB 的范围删除(Range Delete)实现,保证高效的批量删除:

func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error {
    startKey := path.Join(topic, strconv.FormatInt(startID, 10))
    endKey := path.Join(topic, strconv.FormatInt(endID+1, 10))
    writeBatch := gorocksdb.NewWriteBatch()
    defer writeBatch.Destroy()
    writeBatch.DeleteRange([]byte(startKey), []byte(endKey))
    opts := gorocksdb.NewDefaultWriteOptions()
    defer opts.Destroy()
    return db.Write(opts, writeBatch)
}

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:150-200

存储层架构

分层存储设计

Milvus 采用分层存储架构,将不同类型的数据存储在最适合的存储介质中:

存储层存储介质存储内容特点
元数据层RocksDB/EtcdCollection Schema、Segment 元信息、索引元数据高可靠、低延迟
原始数据层对象存储(MinIO/S3)原始向量数据、Delete Log高吞吐、低成本
索引层对象存储IVF、HNSW、DiskANN 等索引结构高效检索
缓存层内存/SSD热数据缓存、索引缓存加速访问

Segment 管理

Segment 是 Milvus 数据存储的基本单元,分为两类:

Segment 类型说明存储位置
Growing Segment正在写入的 SegmentDataNode 内存 + 磁盘
Sealed Segment已完成写入的 Segment对象存储 + 内存索引缓存

协调层组件

RootCoord(根协调器)

RootCoord 负责管理所有元数据和系统级操作:

  • 时间戳服务:分配全局唯一递增时间戳
  • DDL 操作:Create/Drop Collection、Create/Drop Partition 等
  • 时间轴管理:维护集合版本和操作历史

DataCoord(数据协调器)

DataCoord 负责数据写入的协调工作:

  • Segment 分配:为写入请求分配 Segment
  • 垃圾回收:协调 Segment 压缩和清理
  • 健康检查:监控 DataNode 状态

QueryCoord(查询协调器)

QueryCoord 负责查询执行的协调:

  • 查询调度:将查询请求路由到合适的 QueryNode
  • 负载均衡:动态调整 QueryNode 负载
  • 副本管理:维护查询副本和高可用

流式协调子系统

PChannel 管理

PChannel(Physical Channel)是 Milvus 流式系统的核心抽象,负责管理消息通道的物理资源:

// PChannel 元数据
type PChannelMeta struct {
    inner *streamingpb.PChannelMeta
}

// 通道状态
func (c *PChannelMeta) State() streamingpb.PChannelMetaState {
    return c.inner.State
}

// 检查通道分配状态
func (c *PChannelMeta) IsAssigned() bool {
    return c.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED
}

// 检查通道分配中或已分配状态
func (c *PChannelMeta) IsAssignedOrAssigning() bool {
    return c.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED || 
           c.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNING
}

资料来源:internal/streamingcoord/server/balancer/channel/pchannel.go:1-50

通道状态机

graph LR
    A[空闲] --> B[分配中]
    B --> C[已分配]
    C --> D[回收中]
    D --> A
    
    B -.->|失败| A
    C -.->|节点故障| D

通道状态包括:

  • 空闲(IDLE):可供分配的通道
  • 分配中(ASSIGNING):正在分配给流式节点
  • 已分配(ASSIGNED):已分配给流式节点
  • 回收中(RECOVERING):正在从故障中恢复

历史记录追踪

系统维护通道的完整分配历史:

func (c *PChannelMeta) GetHistories() []types.PChannelInfoAssigned {
    history := make([]types.PChannelInfoAssigned, 0, len(c.inner.Histories))
    for _, h := range c.inner.Histories {
        history = append(history, types.PChannelInfoAssigned{
            Channel: types.PChannelInfo{
                Name:       c.inner.GetChannel().GetName(),
                Term:       h.Term,
                AccessMode: types.AccessMode(h.AccessMode),
            },
            Node: types.NewStreamingNodeInfoFromProto(h.Node),
        })
    }
    return history
}

数据流架构

写入数据流

sequenceDiagram
    participant Client as SDK 客户端
    participant Proxy as Proxy
    participant MQ as 消息队列
    participant DataNode as DataNode
    participant DataCoord as DataCoord
    participant Storage as 对象存储
    
    Client->>Proxy: Insert/Upsert 请求
    Proxy->>RootCoord: 获取时间戳
    RootCoord-->>Proxy: 时间戳
    Proxy->>DataCoord: 获取可写入 Segment
    DataCoord-->>Proxy: Segment 信息
    Proxy->>MQ: 发送消息
    Proxy-->>Client: 返回成功
    DataNode->>MQ: 消费消息
    DataNode->>DataNode: 写入数据
    DataNode->>Storage: 持久化
    DataNode->>DataCoord: 汇报状态

查询数据流

sequenceDiagram
    participant Client as SDK 客户端
    participant Proxy as Proxy
    participant QueryCoord as QueryCoord
    participant QueryNode as QueryNode
    participant Storage as 对象存储
    
    Client->>Proxy: Search/Query 请求
    Proxy->>QueryCoord: 获取 Shard Leaders
    QueryCoord-->>Proxy: Shard Leader 地址
    Proxy->>QueryNode: 并行发送查询
    QueryNode->>Storage: 加载索引
    QueryNode->>QueryNode: 执行向量检索
    QueryNode-->>Proxy: 返回 Top-K 结果
    Proxy->>Proxy: 结果聚合与排序
    Proxy-->>Client: 返回最终结果

部署模式

单机部署(Standalone)

组件部署方式说明
allinone单进程所有组件运行在单一进程中
RocksMQ内置使用内置 RocksMQ 作为消息队列
RocksDB内置元数据存储

集群部署(Cluster)

组件副本数建议高可用支持
Proxy2+负载均衡
RootCoord2+主备切换
DataCoord2+主备切换
QueryCoord2+主备切换
DataNode按需水平扩展
QueryNode按需水平扩展

开发工具

Milvus 提供了 mgit.py 开发工具,用于规范化的 Git 工作流程:

# 格式化代码
make fmt

# 静态检查
make static-check

# 提交类型
fix:      错误修复
feat:     新功能
enhance:  代码优化
refactor: 代码重构
test:     测试相关
docs:     文档更新
chore:    构建工具变更

资料来源:tools/README.md:1-80

关键配置参数

RocksMQ 配置

参数默认值说明
rocksmq.pageSize10MB单页消息大小
rocksmq.retentionSizeInMB-1按大小保留,-1 表示禁用
rocksmq.retentionTimeInMinutes-1按时间保留,-1 表示禁用
rocksmq.tickerTimeInSeconds1保留检查间隔

系统级配置

参数说明
etcd.endpointsEtcd 集群地址
minio.addressMinIO 对象存储地址
pulsar.addressPulsar 集群地址(可选)
kafka.brokersKafka Broker 列表(可选)

社区热点与已知限制

根据社区反馈,以下功能需求较高:

Issue描述状态
#4430支持 String 类型字段长期需求
#1924支持 String ID长期需求
#9685备份与恢复功能功能请求
#20405修改已有数据的 Collection Schema功能请求
#2771ScaNN 索引支持索引增强

总结

Milvus 的系统架构采用分层微服务设计,通过消息队列实现组件间解耦。核心设计特点包括:

  1. 可扩展性:各组件支持水平扩展,适应不同规模数据
  2. 高可用:关键组件支持多副本部署和故障自动转移
  3. 灵活性:支持多种消息队列和存储后端适配
  4. 性能优化:分层存储和多种索引类型支持高效向量检索

系统的核心协调机制依赖于 RootCoord、DataCoord、QueryCoord 三个协调组件,配合 Proxy 实现统一的入口管理。数据存储采用分层架构,元数据、原始数据和索引分离存储,兼顾可靠性和检索效率。

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:1-50

集合与 Schema 设计

Milvus 是一个云原生的向量数据库,集合(Collection)是其核心数据组织单元。每个集合由 Schema(模式)定义其数据结构,包括字段定义、分区策略和元数据配置。Schema 设计直接影响向量检索性能、存储效率和查询灵活性。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 数据模型层次结构

继续阅读本节完整说明和来源证据。

章节 字段类型体系

继续阅读本节完整说明和来源证据。

章节 Collection 模型结构

继续阅读本节完整说明和来源证据。

概述

Milvus 是一个云原生的向量数据库,集合(Collection)是其核心数据组织单元。每个集合由 Schema(模式)定义其数据结构,包括字段定义、分区策略和元数据配置。Schema 设计直接影响向量检索性能、存储效率和查询灵活性。

核心概念

数据模型层次结构

graph TD
    A[Milvus 数据库] --> B[集合 Collection]
    B --> C[Schema 定义]
    C --> D[字段 Fields]
    C --> E[分区 Partitions]
    C --> F[元数据 Metadata]
    D --> D1[主键字段 Primary Key]
    D --> D2[向量字段 Vector Fields]
    D --> D3[标量字段 Scalar Fields]
    D --> D4[Array 字段]
    E --> E1[分区名称]
    E --> E2[分区状态]

字段类型体系

字段类型说明可索引可搜索
Primary Key唯一标识符,支持 Int64
Float Vector浮点向量 (128-32768 维)
Binary Vector二进制向量
Sparse Vector稀疏向量
Int8 VectorInt8 向量
Bool布尔值
Int8/16/32/64整数类型
Float/Double浮点类型
VarChar可变长度字符串
JSONJSON 格式数据
Array数组类型

资料来源:internal/metastore/model/field.go

Schema 定义架构

Collection 模型结构

type Collection struct {
    ID               int64
    Name             string
    Schema           *schemapb.CollectionSchema
    UID              int64
    PartitionIDs     []int64
    PartitionNames   []string
    CreatedUTC       uint64
    CreatedTimestamp uint64
    ConsistencyLevel commonpb.ConsistencyLevel
    Properties       map[string]string
    Status           .pb.Status
}

资料来源:internal/metastore/model/collection.go

Field 模型结构

type Field struct {
    ID               int64
    Name             string
    Description      string
    DataType         schemapb.DataType
    TypeParams       []*commonpb.KeyValuePair
    IndexParams      []*commonpb.KeyValuePair
    State            .pb.State
    IsPrimaryKey     bool
    AutoID           bool
    Nullable         bool  // 支持 nullable 向量列
}

资料来源:internal/metastore/model/field.go

向量字段设计

向量类型支持

Milvus 支持多种向量类型,最新版本(v2.6.4+)在 Go SDK 中完善了 struct-array 向量子字段支持:

向量类型维度范围索引类型应用场景
FloatVector1-32768IVF, HNSW, DiskANN, ScaNN文本/图像嵌入
BinaryVector1-65536BIN_IVF_FLAT图像特征二进制码
SparseVector任意稀疏度SPARSE_WAND稀疏文本检索
Int8Vector1-32768IVF_FLAT高效量化向量

资料来源:client/v2.6.4 Release Notes

Nullable 向量列

从 v2.6.5 版本开始,Go SDK 支持可空向量列:

graph LR
    A[向量字段] -->|Dense| B[Float/Binary/Int8/Sparse]
    A -->|Nullable| C[允许 NULL 值]
    A -->|EmbeddingList| D[支持列表形式]

添加可空向量字段到现有集合时,系统会验证字段必须为 nullable: 资料来源:client/v2.6.5 Release Notes

分区设计

Partition 模型结构

type Partition struct {
    ID           int64
    CollectionID int64
    Name         string
    Description  string
    CreatedTimestamp uint64
    CreatedUTCTimestamp   uint64
    State        pb.PartitionState
}

资料来源:internal/metastore/model/partition.go

分区策略

graph TD
    A[数据写入] --> B{分区策略选择}
    B -->|时间序列| C[按时间分区]
    B -->|业务分类| D[按业务分区]
    B -->|数据量| E[大集合分区]
    C --> F[分区键字段]
    D --> F
    E --> G[负载均衡]

集合生命周期管理

创建集合流程

sequenceDiagram
    participant Client
    participant RootCoord
    participant MetaStore
    participant Proxy
    
    Client->>Proxy: CreateCollection
    Proxy->>RootCoord: 创建集合任务
    RootCoord->>RootCoord: 验证 Schema
    RootCoord->>MetaStore: 保存集合元数据
    MetaStore-->>RootCoord: 确认保存
    RootCoord->>Proxy: 返回成功
    Proxy-->>Client: 创建完成

集合操作 API

操作Go SDKPython SDK说明
CreateCollectionCreateCollection()create_collection()创建集合
DropCollectionDropCollection()drop_collection()删除集合
TruncateCollectionTruncateCollection()-清空数据(不删除集合)
AddCollectionFieldAddCollectionField()-向现有集合添加字段
HasCollectionHasCollection()has_collection()检查集合是否存在

资料来源:client/v2.6.3 Release Notes - TruncateCollection API

Schema 设计最佳实践

字段设计原则

  1. 合理选择主键类型
  • Int64 主键性能最优
  • 社区请求支持 String ID(Issue #1924)- 当前版本仅支持 Int64
  1. 向量维度优化
  • 根据实际 Embedding 模型选择维度
  • 避免过大维度导致存储和检索开销增加
  1. 分区策略
  • 大数据量集合建议分区
  • 按时间或业务属性分区便于数据管理

Schema 约束

约束项限制值
单集合最大字段数128
单字段最大维度32768
主键字段必须唯一,每个集合仅一个
向量字段每个集合至少一个
字段名称最大长度256 字符

社区热点问题

热门 Issue 关注

Issue标题状态
#4430Support "string" type field社区功能请求
#20405Modify the collection schema once collection is createdSchema 修改需求
#1924Support string id主键类型扩展

Schema 修改限制

当前 Milvus 对已创建集合的 Schema 有以下限制:

  • 已存在的非空集合无法直接修改 Schema
  • 仅支持添加 nullable 字段
  • 向量字段添加到现有集合时必须标记为 nullable

资料来源:client/v2.6.5 Release Notes

存储架构与 Topic 管理

RocksMQ Topic 结构

集合数据在底层 RocksMQ 中的存储组织:

graph TD
    A[Topic] --> B[Page Messages]
    A --> C[Page Size Index]
    A --> D[Page Timestamp]
    A --> E[Acked Timestamp]
    
    B -->|按 ID 范围| F[Message Data]
    C -->|PageMsgSizeTitle| G[大小索引]
    D -->|PageTsTitle| H[时间索引]
    E -->|AckedTsTitle| I[确认状态]

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go

消息保留策略

策略类型配置参数说明
按大小保留RetentionSizeInMB超过阈值删除旧消息
按时间保留RetentionTimeInMinutes超过时间删除旧消息
页清理自动触发基于确认状态清理已消费页面

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go

索引与查询

索引类型支持

索引类型向量类型特点
IVF_FLATFloat/Binary/Int8精确聚类,平衡精度与速度
HNSWFloat/Binary图索引,高速高recall
DiskANNFloat磁盘索引,超大规模数据
SPARSE_WANDSparse稀疏向量专用
ScaNNFloatGoogle 高性能算法(Issue #2771)

总结

Milvus 的集合与 Schema 设计采用分层架构:

  • 逻辑层:Collection、Schema、Field 抽象
  • 元数据层:通过 Metastore 持久化模型定义
  • 存储层:RocksMQ 消息队列与 RocksDB 存储
  • 索引层:多种向量索引算法支持

Schema 设计时需综合考虑业务需求、数据规模、查询模式等因素,合理选择字段类型、向量维度和分区策略,以获得最优的检索性能和存储效率。

资料来源:internal/metastore/model/field.go

数据插入与查询流程

Milvus 的数据插入与查询流程是向量数据库核心操作的基础架构,涵盖了从客户端数据摄入到查询执行的完整链路。该流程涉及多个关键组件的协作,包括消息队列(MQ)、存储层、索引服务以及查询节点。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 系统组件层次

继续阅读本节完整说明和来源证据。

章节 RocksMQ 角色定位

继续阅读本节完整说明和来源证据。

章节 客户端请求阶段

继续阅读本节完整说明和来源证据。

概述

Milvus 的数据插入与查询流程是向量数据库核心操作的基础架构,涵盖了从客户端数据摄入到查询执行的完整链路。该流程涉及多个关键组件的协作,包括消息队列(MQ)、存储层、索引服务以及查询节点。

Milvus 支持多种消息队列后端,其中 RocksMQ 是默认的消息队列实现。RocksMQ 基于 RocksDB 构建,提供持久化的消息发布与订阅功能,确保数据的可靠性和一致性。消息队列在整个数据流程中扮演着缓冲层和异步处理的关键角色,使得数据写入与查询可以解耦运行。资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq.go:1-30

架构设计

系统组件层次

Milvus 的数据流架构可以分为四个主要层次:接入层、协调层、执行层和存储层。接入层负责处理客户端请求,执行协议转换;协调层管理元数据和集群状态;执行层处理实际的数据操作;存储层负责数据的持久化。

graph TD
    A[客户端 SDK] --> B[Proxy 接入层]
    B --> C[消息队列 MQ]
    C --> D[DataNode 数据节点]
    D --> E[RocksDB 存储]
    D --> F[QueryNode 查询节点]
    F --> G[索引缓存]
    C --> H[Consumer 消费者]
    H --> F

RocksMQ 角色定位

RocksMQ 是 Milvus 架构中的消息传输中枢,负责在数据写入阶段接收来自 Proxy 的插入请求,并将这些消息分发给下游的 DataNode 进行处理。RocksMQ 采用基于主题(Topic)的消息模型,每个 Collection 对应一个独立的消息 Topic,确保数据隔离。资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:50-80

数据插入流程

客户端请求阶段

客户端通过 SDK 构建插入请求,包含向量数据、元数据字段以及时间戳信息(GuaranteeTimestamp)。SDK 负责将数据序列化为 protobuf 格式,并通过 gRPC 发送到 Milvus 集群的 Proxy 节点。Proxy 作为请求的入口,负责请求验证、负载均衡和结果聚合。资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq.go:15-25

消息队列处理阶段

当数据到达 Proxy 后,插入请求被转换为 ProducerMessage 格式并发布到 RocksMQ。RocksMQ 的 Produce 方法负责将消息持久化到 RocksDB 存储引擎中。消息按照 Page 进行组织,每个 Page 包含多个消息记录,当 Page 达到配置的大小上限时会创建新的 Page。资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:150-200

graph LR
    A[Insert Request] --> B[Proxy Validate]
    B --> C[Construct ProducerMessage]
    C --> D[RocksMQ Produce]
    D --> E[RocksDB Persist]
    E --> F[Return Message IDs]

消息分页机制

RocksMQ 使用基于分页的消息存储策略,消息按照固定大小组织成 Page。每个 Page 有一个唯一的 PageID(pageEndID),记录该 Page 中最后一条消息的位置。Page 的元数据信息包括:

元数据键用途
topic_id/{topicName}主题唯一标识
message_size/{topicName}当前 Page 的消息总大小
page_message_size/{topicName}/{pageID}各 Page 的结束 ID
page_ts/{topicName}/{pageID}Page 最后更新的时间戳
acked_ts/{topicName}/{pageID}Page 最新的确认时间戳

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:200-260

DataNode 消费处理

DataNode 通过订阅相应的 Topic 来消费消息。DataNode 启动时会创建 Consumer 实例,并注册到 RocksMQ 的消费者列表中。消费者使用消费者组(ConsumerGroup)机制实现负载均衡,多个 DataNode 可以协同消费同一个 Topic 的消息。资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:80-120

消费者管理机制

消费者列表数据结构

RocksMQ 使用 consumerList 结构管理消费者,每个 Topic 维护一个消费者映射表,键为消费者组名称(GroupName),值为 Consumer 实例指针。该结构使用读写锁(sync.RWMutex)保证并发安全。资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:35-70

type consumerList struct {
    consumers map[string]*Consumer // GroupName -> *Consumer
    mu        sync.RWMutex
}

消费者操作

消费者管理提供以下核心操作:

  • Add:将新消费者添加到列表,使用组名作为唯一标识
  • Remove:根据组名移除消费者,返回被移除的 Consumer 指针
  • Get:根据组名获取消费者实例,支持并发读取
  • Notify:通知指定消费者组有新消息到达

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:35-70

消费者组检查

ExistConsumerGroup 方法用于检查消费者组是否存在并返回对应的 Consumer 实例。该方法首先检查消费者 ID 映射表中是否存在该组名,然后查询消费者列表获取详细信息。如果消费者组存在但未注册消费者,则返回 false。资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:280-310

数据查询流程

查询请求入口

查询请求首先到达 Proxy 节点,Proxy 负责解析查询条件并确定目标查询节点。查询类型包括向量相似度搜索(ANN Search)和标量字段过滤查询。Proxy 根据一致性哈希或负载均衡策略将请求路由到合适的 QueryNode。资料来源:internal/streamingcoord/server/balancer/channel/pchannel.go:1-30

索引与搜索

QueryNode 负责执行实际的向量搜索操作。Milvus 支持多种索引类型,包括 IVF、HNSW、DiskANN 等。查询时,QueryNode 首先在内存中加载已构建的索引,然后执行近似最近邻搜索算法找到与查询向量最相似的 Top-K 结果。资料来源:internal/streamingcoord/server/balancer/channel/pchannel.go:20-50

流式协调通道

PChannel(Physical Channel)是 Milvus 流式系统的核心概念,每个 PChannel 对应一个物理消息通道。PChannelMeta 管理通道的元数据信息,包括通道状态(Assigned、Assigning、Unassigned)、历史分配记录以及当前分配的节点信息。查询节点通过订阅 PChannel 来接收实时的数据更新。资料来源:internal/streamingcoord/server/balancer/channel/pchannel.go:50-90

graph TD
    A[Query Request] --> B[Proxy Route]
    B --> C[QueryNode Pool]
    C --> D[Load Index]
    D --> E[ANN Search]
    E --> F[Post-filter]
    F --> G[Return Results]

数据保留机制

保留策略

RocksMQ 实现了两层数据保留机制:基于消息大小和基于时间的保留策略。通过配置参数可以控制保留行为:

配置参数说明默认值
RetentionSizeInMB消息总大小阈值,超过后清理旧消息-1(禁用)
RetentionTimeInMinutes消息保留时间阈值-1(禁用)
PageSize单个 Page 的最大消息大小可配置

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:1-60

保留检查函数

checkRetention 函数判断是否启用了保留机制,当任一保留参数不为 -1 时返回 true,表示系统需要执行定期的清理任务。保留任务由 retentionInfo 组件在后台定期执行,遍历所有已确认的 Page 并删除超过阈值的旧数据。资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:25-35

过期页面清理流程

保留机制的核心是遍历所有已确认的 Page,检查其确认时间戳(acked_ts)是否超过配置的保留时间。对于大小超限的情况,系统会累加已清理的 Page 大小,当总大小超过 RetentionSizeInMB 时停止清理。资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:100-180

graph TD
    A[Start Retention Check] --> B[Get All Pages]
    B --> C{Page Acked?}
    C -->|Yes| D{Check Time Expire}
    C -->|No| G[Break Loop]
    D -->|Yes| E[Mark for Deletion]
    D -->|No| G
    E --> F[Accumulate Size]
    F --> C
    G --> H[Delete Marked Pages]

消息删除实现

DeleteMessages 函数通过 RocksDB 的范围删除操作移除指定 ID 范围内的消息。删除操作使用 WriteBatch 批量处理,确保原子性。起始键为 topic/startID,结束键为 topic/(endID+1),实现左闭右开的区间删除。资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:60-90

主题生命周期管理

主题创建

CreateTopic 方法负责初始化新的 Topic。创建前会检查 Topic 名称是否合法(不能包含"/"字符),然后在 RocksDB 中写入初始元数据。如果 Topic 已存在则直接返回,避免重复创建。资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:180-220

主题销毁

DestroyTopic 方法执行完整的 Topic 清理工作,包括:删除消费者列表、清理消息存储、删除 Page 元数据、清理确认时间戳信息以及释放相关资源。销毁操作使用互斥锁确保并发安全。资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:250-290

主题名称验证

系统禁止在主题名称中使用"/"字符,这是为了避免与内部键名分隔符冲突。创建主题时如果检测到非法字符,会返回 Unrecoverable 错误并记录警告日志。资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:185-195

状态管理

RocksMQ 状态机

RocksMQ 实例维护两种状态:RmqStateStopped 和 RmqStateHealthy。状态转换由系统自动管理,初始化时为 Stopped 状态,Start 方法调用后进入 Healthy 状态。资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:260-280

状态值含义转换条件
RmqStateStopped已停止初始化或 Close 调用后
RmqStateHealthy运行正常Start 方法调用后

关闭流程

Close 方法首先停止保留任务(stopRetention),然后关闭 RocksDB 数据库连接。关闭前会等待所有进行中的操作完成,确保数据一致性。资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:220-250

测试验证

基本功能测试

rocksmq_impl_test.go 提供了完整的功能测试覆盖,包括主题创建、消息生产消费、消费者组管理以及错误场景验证。测试使用独立的 RocksDB 实例,通过 assert 断言验证预期行为。资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go:1-100

保留机制测试

rocksmq_retention_test.go 验证保留策略的正确性,包括时间过期和大小超限两种场景。测试创建大量消息,消费后等待指定时间或累积足够数据量,然后验证过期消息已被清理。资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go:1-80

常见问题与社区反馈

根据社区反馈,以下是用户在使用数据插入与查询流程时经常遇到的问题:

RocksMQ 独立部署崩溃问题:部分用户在独立部署模式下使用默认的 RocksMQ 时遇到崩溃问题,这通常与 RocksDB 配置或资源限制相关。建议在生产环境使用 Pulsar 或 Kafka 作为消息队列后端。资料来源:GitHub Issue #28583

字符串类型支持:社区持续关注字符串类型字段的支持情况,这是影响数据插入灵活性的重要功能需求。资料来源:GitHub Issue #4430

Schema 修改限制:当前 Collection 创建后不允许修改 Schema,这在某些动态场景下造成了不便。用户希望能够在 Collection 非空时仍能添加新字段。资料来源:GitHub Issue #20405

总结

Milvus 的数据插入与查询流程是一个复杂但设计精良的系统,涵盖了从客户端到存储层的完整数据链路。RocksMQ 作为默认消息队列,提供了可靠的持久化消息传递能力,配合 DataNode 和 QueryNode 实现了高效的数据摄入和查询执行。保留机制确保了系统的存储空间可控,而消费者组机制则支持了水平扩展和高可用部署。理解这一流程对于优化 Milvus 性能、排查问题以及进行二次开发都至关重要。

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:200-260

向量索引类型

Milvus 作为一款高性能的向量数据库,索引技术是实现快速近似最近邻(ANN)搜索的核心组件。本页面详细介绍 Milvus 支持的各种向量索引类型、它们的实现原理、适用场景以及配置参数。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 向量索引分类

继续阅读本节完整说明和来源证据。

章节 简介

继续阅读本节完整说明和来源证据。

章节 核心参数

继续阅读本节完整说明和来源证据。

概述

向量索引是一种数据结构,旨在加速在高维向量空间中的相似性搜索。传统的精确搜索(暴力搜索)在数据量增长时性能急剧下降,而索引通过预计算和图/树/量化等结构,将搜索复杂度从 O(n) 降低到 O(log n) 或更低。

Milvus 支持多种向量索引类型,以满足不同场景对搜索精度、速度和内存占用的需求。索引类型的选择需要综合考虑数据集规模、维度、查询延迟要求、内存限制等因素。

向量索引分类

graph TD
    A[向量索引类型] --> B[内存索引]
    A --> C[磁盘索引]
    A --> D[倒排索引]
    
    B --> B1[HNSW]
    B --> B2[IVF-Flat]
    B --> B3[IVF-PQ]
    B --> B4[SCaNN]
    B --> B5[FLAT]
    
    C --> C1[DISKANN]
    
    D --> D1[SPARSE倒排索引]

HNSW 索引

简介

HNSW(Hierarchical Navigable Small World)是一种基于图的近似最近邻搜索算法。它通过构建多层可导航小世界图来实现高效的向量搜索,在搜索时从最上层开始,逐步向下精确定位最近邻。

核心参数

参数名称类型默认值说明
Mint16每层最大连接数,影响图的密度和搜索精度
efConstructionint200构建时动态列表大小,影响索引质量
efint100搜索时动态列表大小,影响搜索精度和速度
threshold握手float0.5候选结果阈值

工作原理

graph TD
    A[查询向量] --> B[从顶层开始搜索]
    B --> C[找到局部最优点]
    C --> D[下沉到下一层]
    D --> E{是否到达底层?}
    E -->|否| D
    E -->|是| F[返回最近邻结果]
    
    G[图构建过程] --> G1[均匀分布节点到多层]
    G1 --> G2[每层随机选择概率递减]
    G2 --> G3[构建小世界图连接]

资料来源:client/index/hnsw.go:1-100

适用场景

  • 高精度需求:HNSW 提供优秀的搜索精度,ef 参数越大精度越高
  • 动态数据:支持实时插入和删除操作
  • 中等规模数据:通常用于百万到千万级向量
  • 延迟敏感场景:搜索延迟可控制在毫秒级

IVF 索引

简介

IVF(Inverted File System)是一种基于聚类的索引方法。它先将向量空间划分为多个聚类中心,搜索时只扫描与查询向量最近的几个聚类,从而大幅减少搜索范围。

变体类型

#### IVF-Flat

IVF-Flat 保持聚类内向量的原始形态,不进行量化压缩。它在精度和速度之间取得平衡,适合需要高精度且内存充足的环境。

#### IVF-PQ

IVF-PQ 在 IVF 基础上增加了 Product Quantization(产品量化),将高维向量分割成多个子空间并分别量化,大幅降低内存占用但会引入量化误差。

核心参数

参数名称类型默认值说明
nlistint1024聚类中心数量
nprobeint8搜索时探测的聚类数

资料来源:client/index/ivf.go:1-100

性能特性

  • 内存效率:IVF-PQ 可将内存占用降低到原始数据的 1/10 甚至更低
  • 搜索速度:nprobe 越大速度越慢但精度越高
  • 构建速度:相对较快,适合大规模数据批量构建

DISKANN 索引

简介

DISKANN 是一种设计用于磁盘存储的向量索引算法,特别适合超大规模数据集。它通过图索引结构和磁盘优化策略,使得在有限内存条件下仍能实现高效的向量搜索。

核心优势

graph LR
    A[超大规模数据] --> B[磁盘存储]
    B --> C[图索引结构]
    C --> D[内存缓存热点]
    D --> E[高效搜索]

技术实现

资料来源:internal/core/src/index/VectorDiskIndex.cpp:1-100

DISKANN 的核心设计理念是将大部分索引结构存储在磁盘上,只将搜索过程中频繁访问的图结构保留在内存中。这种设计使得单机可以支持数十亿甚至上百亿级别的向量搜索。

适用场景

  • 超大规模数据:单机无法容纳全部索引的场景
  • 内存受限环境:需要在有限内存下提供可接受的搜索性能
  • 冷热数据分离:大部分数据存储在磁盘,热数据保留在内存

SCaNN 索引

简介

SCaNN(Scalable Nearest Neighbors)是 Google 开发的先进向量索引算法,在多项基准测试中表现出色。它结合了量化技术和图搜索的优势,在保持高精度的同时实现极致的搜索速度。

资料来源:client/index/scann.go:1-100

社区关注

社区对 SCaNN 索引有较高期待,GitHub Issue #2771 专门讨论了此功能的需求。社区反馈表明,用户期望通过 SCaNN 获得比现有索引类型更好的性能表现。

核心特性

特性说明
高召回率可配置的高精度搜索
低延迟优化了搜索路径减少计算量
可扩展性良好的水平扩展能力

Sparse 稀疏向量索引

简介

Milvus 支持稀疏向量类型,适用于词袋模型、TF-IDF 等产生的稀疏特征表示。稀疏索引采用倒排索引结构,显著提高稀疏向量的搜索效率。

资料来源:client/index/sparse.go:1-100

倒排索引实现

graph TD
    A[稀疏向量集合] --> B[构建倒排列表]
    B --> C[词汇 -> 文档列表]
    C --> D[查询处理]
    D --> E[合并相关列表]
    E --> F[计算相似度]
    F --> G[返回排序结果]

资料来源:internal/index/InvertedIndexTantivy.cpp:1-100

适用场景

  • 文本嵌入:BGE、SPLADE 等稀疏文本模型
  • 词袋特征:传统 NLP 中的 TF-IDF 向量
  • 高维稀疏数据:维度利用率低于 5% 的场景

二值向量索引

支持的索引类型

Milvus 支持二值向量字段的专门索引,适用于二值化特征表示的场景。常见的索引类型包括:

索引类型描述适用场景
BIN_FLAT暴力搜索小规模数据、高精度需求
BIN_IVF倒排文件索引中等规模数据

特点

  • 内存效率极高:每个维度仅占 1 bit
  • 快速位运算:利用 CPU 位操作加速距离计算
  • 适用场景有限:主要用于二值化特征的相似性搜索

FLAT 索引

简介

FLAT 是最基础的索引类型,采用暴力搜索方式遍历所有向量。它不构建任何预处理数据结构,搜索时直接计算查询向量与所有向量的距离。

适用场景

  • 小规模数据:数据量在十万级以下
  • 高精度需求:需要 100% 召回率的场景
  • 调试测试:作为基准对比其他索引的性能

性能特点

  • 搜索延迟:O(n × d),与数据规模和维度成正比
  • 内存占用:仅存储原始向量,无额外开销
  • 精度:100% 召回率,无量化误差

索引选择指南

graph TD
    A[选择索引类型] --> B{数据规模?}
    B -->|百万级以下| C{内存充足?}
    C -->|是| D[推荐 HNSW 或 FLAT]
    C -->|否| E[推荐 IVF-PQ]
    B -->|百万到千万级| F{延迟要求?}
    F -->|极高| G[推荐 HNSW]
    F -->|中等| H[推荐 IVF]
    B -->|千万到亿级| I{磁盘限制?}
    I -->|是| J[推荐 DISKANN]
    I -->|否| K[推荐 HNSW 或 SCaNN]
    B -->|亿级以上| L[推荐 DISKANN]

选择决策表

数据规模内存限制延迟要求推荐索引
< 10万高精度FLAT
< 100万充足< 10msHNSW
< 100万有限< 50msIVF-PQ
100万-1000万充足< 20msHNSW
100万-1000万有限< 100msIVF
1000万-1亿有限< 200msDISKANN
> 1亿严重受限< 500msDISKANN

索引构建与查询

构建索引

索引构建通常在数据导入后执行,可以使用以下方式:

  1. 自动构建:创建集合时设置 auto_index 参数
  2. 手动构建:调用 create_index 接口显式构建

查询参数调整

搜索时可动态调整查询参数以平衡精度和性能:

# Python SDK 示例
search_params = {
    "metric_type": "L2",
    "params": {
        "ef": 64  # HNSW 搜索参数
    }
}

常见问题

Q: 如何选择合适的索引类型?

A: 根据数据规模、内存限制、延迟要求三个维度综合评估。小规模数据优先考虑精度,大规模数据优先考虑内存效率。

Q: HNSW 的 ef 参数如何设置?

A: ef 值越大搜索精度越高,但延迟也相应增加。建议从默认值 100 开始,根据精度需求逐步调整。

Q: DISKANN 和内存索引如何选择?

A: 当数据量超过单机内存容量时,选择 DISKANN;在内存充足的情况下,HNSW 通常能提供更低的搜索延迟。

相关资源

资料来源:client/index/hnsw.go:1-100

协调节点详解

RocksMQ 是 Milvus 中基于 RocksDB 实现的消息队列组件,作为默认的消息存储后端。在 Milvus 架构中,RocksMQ 负责处理数据写入、消息分页、消费者组管理以及消息保留策略等核心功能。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 创建主题

继续阅读本节完整说明和来源证据。

章节 销毁主题

继续阅读本节完整说明和来源证据。

章节 消费者列表结构

继续阅读本节完整说明和来源证据。

RocksMQ 消息队列实现详解

概述

RocksMQ 是 Milvus 中基于 RocksDB 实现的消息队列组件,作为默认的消息存储后端。在 Milvus 架构中,RocksMQ 负责处理数据写入、消息分页、消费者组管理以及消息保留策略等核心功能。

注:本页内容基于源码文件中的 RocksMQ 实现代码,关于协调节点的详细内容请参阅 rootcoord、datacoord、querycoordv2 等相关源码。

核心架构

RocksMQ 采用 RocksDB 作为底层存储,通过键值对的方式管理主题(Topic)、消息页(Page)和消费者组(Consumer Group)。

graph TD
    A[Producer 发送消息] --> B[RocksMQ 消息分页]
    B --> C[消息存储到 RocksDB]
    C --> D[Consumer Group 消费]
    D --> E[消息确认 ACK]
    E --> F[Retention 清理过期消息]
    
    G[RocksDB Key 结构] --> G1[TopicID: topicName]
    G --> G2[PageMsgSize: topic/pageEndID]
    G --> G3[PageTs: topic/pageEndID]
    G --> G4[AckedTs: topic/consumerGroup/pageID]

主题管理

创建主题

// 资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:125-145
func (rmq *rocksmq) CreateTopic(topicName string) error {
    if rmq.isClosed() {
        return errors.New(RmqNotServingErrMsg)
    }
    
    // 检查主题名是否包含 "/"
    if strings.Contains(topicName, "/") {
        log.Warn("rocksmq failed to create topic for topic name contains \"/\"")
        return retry.Unrecoverable(fmt.Errorf("topic name = %s contains \"/\"", topicName))
    }
    
    // 主题ID键是主题的唯一标识
    topicIDKey := TopicIDTitle + topicName
    val, err := rmq.kv.Load(context.TODO(), topicIDKey)
    if err != nil {
        return err
    }
    if val != "" {
        log.Warn("rocksmq topic already exists")
        return nil
    }
    
    topicMu.LoadOrStore(topicName, new(sync.Mutex))
    rmq.consumers.LoadOrStore(topicName, &consumerList{consumers: make(map[string]*Consumer)})
    // ... 初始化操作
}

关键约束:

  • 主题名称不能包含字符 /
  • 主题创建时会初始化消费者列表和互斥锁
  • 已存在的主题不会重复创建

销毁主题

// 资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:180-230
func (rmq *rocksmq) DestroyTopic(topicName string) error {
    ll, ok := topicMu.Load(topicName)
    if !ok {
        return fmt.Errorf("topic name = %s not exist", topicName)
    }
    
    rmq.consumers.Delete(topicName)
    rmq.topicName2LatestMsgID.Delete(topicName)
    
    // 清理主题数据
    fixTopicName := topicName + "/"
    err := rmq.kv.RemoveWithPrefix(context.TODO(), fixTopicName)
    
    // 清理页大小信息
    pageMsgSizeKey := constructKey(PageMsgSizeTitle, topicName)
    err = rmq.kv.RemoveWithPrefix(context.TODO(), pageMsgSizeKey)
    
    // 清理页时间戳信息
    pageMsgTsKey := constructKey(PageTsTitle, topicName)
    err = rmq.kv.RemoveWithPrefix(context.TODO(), pageMsgTsKey)
    
    // 清理已确认消息时间戳
    ackedTsKey := constructKey(AckedTsTitle, topicName)
    err = rmq.kv.RemoveWithPrefix(context.TODO(), ackedTsKey)
    
    // 清理主题信息和消息大小
    topicMu.Delete(topicName)
    rmq.retentionInfo.topicRetetionTime.GetAndRemove(topicName)
}

消费者组管理

消费者列表结构

// 资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:50-85
type consumerList struct {
    consumers map[string]*Consumer  // GroupName -> *Consumer
    mu        sync.RWMutex
}

func (l *consumerList) Add(consumer *Consumer) {
    l.mu.Lock()
    defer l.mu.Unlock()
    if _, ok := l.consumers[consumer.GroupName]; ok {
        return
    }
    l.consumers[consumer.GroupName] = consumer
}

func (l *consumerList) Remove(groupName string) *Consumer {
    l.mu.Lock()
    defer l.mu.Unlock()
    delete(l.consumers, groupName)
    return nil
}

func (l *consumerList) Get(groupName string) *Consumer {
    l.mu.RLock()
    defer l.mu.RUnlock()
    if consumer, ok := l.consumers[groupName]; ok {
        return consumer
    }
    return nil
}

消费者组存在性检查

// 资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:250-265
func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer, error) {
    key := constructCurrentID(topicName, groupName)
    _, ok := rmq.consumersID.Load(key)
    if ok {
        if val, ok := rmq.consumers.Load(topicName); ok {
            c := val.(*consumerList).Get(groupName)
            return c != nil, c, nil
        }
    }
    return false, nil, nil
}

消息保留策略

保留策略配置

// 资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:40-45
func checkRetention() bool {
    params := paramtable.Get()
    return params.RocksmqCfg.RetentionSizeInMB.GetAsInt64() != -1 || 
           params.RocksmqCfg.RetentionTimeInMinutes.GetAsInt64() != -1
}

消息删除实现

// 资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:80-100
func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error {
    startKey := path.Join(topic, strconv.FormatInt(startID, 10))
    endKey := path.Join(topic, strconv.FormatInt(endID+1, 10))
    
    writeBatch := gorocksdb.NewWriteBatch()
    defer writeBatch.Destroy()
    writeBatch.DeleteRange([]byte(startKey), []byte(endKey))
    
    opts := gorocksdb.NewDefaultWriteOptions()
    defer opts.Destroy()
    err := db.Write(opts, writeBatch)
    return err
}

基于大小的过期清理

// 资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:150-200
// 遍历已确认页,计算总大小
func (ri *RetentionInfo) getTotalAckedSize(topic string) (int64, error) {
    fixedAckedTsKey := constructKey(AckedTsTitle, topic)
    pageMsgPrefix := constructKey(PageMsgSizeTitle, topic) + "/"
    
    pageReadOpts := gorocksdb.NewDefaultReadOptions()
    defer pageReadOpts.Destroy()
    
    pageIter := rocksdbkv.NewRocksIteratorWithUpperBound(ri.kv.DB, typeutil.AddOne(pageMsgPrefix), pageReadOpts)
    defer pageIter.Close()
    pageIter.Seek([]byte(pageMsgPrefix))
    
    var ackedSize int64
    for ; pageIter.Valid(); pageIter.Next() {
        key := pageIter.Key()
        pageID, err := parsePageID(string(key.Data()))
        // ... 检查页是否已确认
        ackedTsKey := fixedAckedTsKey + "/" + strconv.FormatInt(pageID, 10)
        ackedTsVal, err := ri.kv.Load(context.TODO(), ackedTsKey)
        if ackedTsVal == "" {
            break  // 未确认,停止计算
        }
        // 累加已确认消息大小
        ackedSize += size
    }
    return ackedSize, nil
}

RocksDB 键结构设计

键类型格式说明
TopicIDTopicIDTitle + topicName主题唯一标识
PageMsgSizePageMsgSizeTitle + topicName + "/" + pageEndID页消息大小
PageTsPageTsTitle + topicName + "/" + pageEndID页时间戳
AckedTsAckedTsTitle + topicName + "/" + consumerGroup + "/" + pageID已确认消息时间戳
MessageSizeMessageSizeTitle + topicName主题消息总大小
CurrentIDCurrentIDTitle + topicName + "/" + groupName消费者当前位置

键解析函数

// 资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:20-30
func parsePageID(key string) (int64, error) {
    stringSlice := strings.Split(key, "/")
    if len(stringSlice) != 3 {
        return 0, fmt.Errorf("invalid page id %s", key)
    }
    return strconv.ParseInt(stringSlice[2], 10, 64)
}

func constructKey(metaName, topic string) string {
    return metaName + topic
}

分页存储机制

// 资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:300-340
// 当页大小超过配置阈值时创建新页
mutateBuffer := make(map[string]string)
for _, id := range msgIDs {
    msgSize := msgSizes[id]
    if curMsgSize+msgSize > params.RocksmqCfg.PageSize.GetAsInt64() {
        // 当前页已满,创建新页
        newPageSize := curMsgSize + msgSize
        pageEndID := id
        
        // 更新页消息大小
        pageMsgSizeKey := fixedPageSizeKey + "/" + strconv.FormatInt(pageEndID, 10)
        mutateBuffer[pageMsgSizeKey] = strconv.FormatInt(newPageSize, 10)
        
        // 更新页时间戳
        pageTsKey := fixedPageTsKey + "/" + strconv.FormatInt(pageEndID, 10)
        mutateBuffer[pageTsKey] = nowTs
        
        curMsgSize = 0
    } else {
        curMsgSize += msgSize
    }
}

测试覆盖

消费者组测试

// 资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go:100-150
func TestRocksmq_ExistConsumerGroup(t *testing.T) {
    rmq, err := NewRocksMQ(rocksdbPath)
    assert.NoError(t, err)
    defer rmq.Close()
    
    topicName := "topic_a"
    rmq.CreateTopic(topicName)
    defer rmq.DestroyTopic(topicName)
    
    groupName := "test"
    consumer := &Consumer{
        Topic:     topicName,
        GroupName: groupName,
        MsgMutex:  make(chan struct{}),
    }
    rmq.RegisterConsumer(consumer)
    
    exist, _, _ := rmq.ExistConsumerGroup(topicName, groupName)
    assert.Equal(t, exist, true)
    
    dummyGrpName := "group_dummy"
    exist, _, _ = rmq.ExistConsumerGroup(topicName, dummyGrpName)
    assert.Equal(t, exist, false)
}

保留策略测试

// 资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go:50-100
func TestRetentionInfo_PageTimeExpire(t *testing.T) {
    params := paramtable.Get()
    paramtable.Init()
    
    // 测试页时间过期
    params.Save(params.RocksmqCfg.RetentionTimeInMinutes.Key, "1")
    params.Save(params.RocksmqCfg.RetentionSizeInMB.Key, "-1")
    
    topicName := "topic_a"
    err = rmq.CreateTopic(topicName)
    
    // 生产大量消息
    msgNum := 100000
    pMsgs := make([]ProducerMessage, msgNum)
    for i := 0; i < msgNum; i++ {
        msg := "message_" + strconv.Itoa(i)
        pMsgs[i] = ProducerMessage{Payload: []byte(msg)}
    }
    ids, err := rmq.Produce(topicName, pMsgs)
    assert.Equal(t, len(pMsgs), len(ids))
    
    // 消费所有消息
    for i := 0; i < msgNum; i++ {
        cMsg, err := rmq.Consume(topicName, groupName, 1)
        cMsgs = append(cMsgs, cMsg[0])
    }
    
    // 等待页面过期
    time.Sleep(time.Duration(2) * time.Second)
    err = rmq.ForceSeek(topicName, groupName, ids[0])
    assert.NoError(t, err)
}

配置参数

参数默认值说明
rocksmq.retentionTimeInMinutes-1 (禁用)消息保留时间(分钟)
rocksmq.retentionSizeInMB-1 (禁用)消息保留大小(MB)
rocksmq.pageSize10 MB单页最大容量
rocksmq.tickerTimeInSeconds1保留检查间隔

状态流程

stateDiagram-v2
    [*] --> Created: CreateTopic
    Created --> Active: Produce Messages
    Active --> Consuming: Register Consumer
    Consuming --> Acknowledged: ACK Message
    Acknowledged --> Retained: Wait for Retention
    Retained --> Expired: Time/Size Check
    Expired --> [*]: DeleteMessages
    
    Created --> [*]: DestroyTopic
    Active --> [*]: DestroyTopic

线程安全机制

// 资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:35-40
var topicMu = sync.Map{}

// 主题操作使用互斥锁保护
func (rmq *rocksmq) DestroyTopic(topicName string) error {
    ll, ok := topicMu.Load(topicName)
    if !ok {
        return fmt.Errorf("topic name = %s not exist", topicName)
    }
    lock, ok := ll.(*sync.Mutex)
    if !ok {
        return fmt.Errorf("get mutex failed")
    }
    lock.Lock()
    defer lock.Unlock()
    // ... 清理操作
}

常见问题与社区反馈

相关社区 Issue

根据社区反馈,以下问题与 RocksMQ 相关:

  • 独立部署崩溃问题 (#28583):使用 RocksMQ 作为默认消息队列时,独立部署可能出现崩溃问题,涉及版本 v2.2 及后续版本

生产环境建议

  1. 监控保留策略:确保 retentionTimeInMinutesretentionSizeInMB 配置适当
  2. 磁盘空间管理:RocksMQ 会持续累积数据,需监控 RocksDB 磁盘使用
  3. 消费者组管理:及时清理不再使用的消费者组,避免孤儿消息堆积

相关文档

来源:https://github.com/milvus-io/milvus / 项目说明书

执行节点详解

执行节点(Execution Node)是 Milvus 分布式向量数据库系统中负责数据写入、查询执行和消息队列处理的核心组件。Milvus 的执行节点架构采用分层设计,主要包括 DataNode(数据节点)和 QueryNode(查询节点)两大类,它们通过消息队列进行解耦通信,实现高并发、高可用的数据处理能力。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 消息队列与执行节点的关系

继续阅读本节完整说明和来源证据。

章节 RocksMQ 执行节点实现

继续阅读本节完整说明和来源证据。

章节 分页存储策略

继续阅读本节完整说明和来源证据。

概述

执行节点(Execution Node)是 Milvus 分布式向量数据库系统中负责数据写入、查询执行和消息队列处理的核心组件。Milvus 的执行节点架构采用分层设计,主要包括 DataNode(数据节点)和 QueryNode(查询节点)两大类,它们通过消息队列进行解耦通信,实现高并发、高可用的数据处理能力。

根据 Milvus 的架构设计,执行节点的主要职责包括:

  • 接收并处理来自代理(Proxy)的数据写入请求
  • 管理分片(Segment)的生命周期
  • 执行向量相似度搜索和标量查询
  • 处理消息队列中的数据消费和确认

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:1-50

核心架构

消息队列与执行节点的关系

Milvus 采用发布-订阅模式进行节点间通信。执行节点通过消息队列(如 RocksMQ、Pulsar、Kafka)实现数据的异步处理和系统组件的解耦。

graph TD
    A[Proxy 代理节点] -->|写入请求| B[消息队列]
    B -->|消费消息| C[DataNode 数据节点]
    B -->|消费消息| D[QueryNode 查询节点]
    C -->|数据写入| E[对象存储]
    D -->|索引构建| F[索引缓存]
    E -->|数据加载| D

RocksMQ 执行节点实现

RocksMQ 是 Milvus 默认的消息队列实现,基于 RocksDB 数据库构建。执行节点通过 RocksMQ 实现高效的消息持久化和消费。

#### Topic 管理

Topic 是 RocksMQ 中的核心概念,每个 Collection 对应一个独立的 Topic。执行节点通过 Topic 进行消息的发布和订阅。

// Topic 创建逻辑
func (rmq *rocksmq) CreateTopic(topicName string) error {
    if rmq.isClosed() {
        return errors.New(RmqNotServingErrMsg)
    }
    
    // 检查 topicName 是否包含 "/" 字符
    if strings.Contains(topicName, "/") {
        return retry.Unrecoverable(fmt.Errorf("topic name = %s contains \"/\"", topicName))
    }
    
    // topicIDKey 是 Topic 的唯一标识
    topicIDKey := TopicIDTitle + topicName
    val, err := rmq.kv.Load(context.TODO(), topicIDKey)
    if err != nil {
        return err
    }
    if val != "" {
        return nil  // Topic 已存在,直接返回
    }
    
    // 初始化互斥锁用于并发控制
    topicMu.LoadOrStore(topicName, new(sync.Mutex))
    return nil
}

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:95-130

#### 消费者组管理

消费者组(Consumer Group)是执行节点处理消息的基本单位。每个消费者组维护独立的消费进度,实现消息的负载均衡和故障恢复。

type consumerList struct {
    consumers map[string]*Consumer  // GroupName -> Consumer 映射
    mu        sync.RWMutex
}

func (l *consumerList) Add(consumer *Consumer) {
    l.mu.Lock()
    defer l.mu.Unlock()
    
    if _, ok := l.consumers[consumer.GroupName]; ok {
        return  // 消费者已存在
    }
    l.consumers[consumer.GroupName] = consumer
}

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:40-60

数据分页机制

分页存储策略

RocksMQ 采用分页机制管理消息,每个分页(Page)包含多条消息。当分页大小达到配置阈值时,系统会创建新的分页继续存储。

// 消息分页处理逻辑
for _, id := range msgIDs {
    msgSize := msgSizes[id]
    if curMsgSize + msgSize > params.RocksmqCfg.PageSize.GetAsInt64() {
        // 当前分页已满,创建新分页
        newPageSize := curMsgSize + msgSize
        pageEndID := id
        
        // 更新分页消息大小元数据
        pageMsgSizeKey := fixedPageSizeKey + "/" + strconv.FormatInt(pageEndID, 10)
        mutateBuffer[pageMsgSizeKey] = strconv.FormatInt(newPageSize, 10)
        
        // 更新分页时间戳元数据
        pageTsKey := fixedPageTsKey + "/" + strconv.FormatInt(pageEndID, 10)
        mutateBuffer[pageTsKey] = nowTs
        
        curMsgSize = 0
    } else {
        curMsgSize += msgSize
    }
}

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:180-210

分页元数据结构

元数据类型Key 格式说明
PageMsgSizeTitle{prefix}/{pageEndID}记录每个分页的消息总大小
PageTsTitle{prefix}/{pageEndID}记录每个分页的创建时间戳
AckedTsTitle{prefix}/{pageEndID}记录已确认消息的时间戳

保留策略与数据清理

保留策略配置

RocksMQ 支持基于大小和时间两种保留策略,用于自动清理过期数据。

func checkRetention() bool {
    params := paramtable.Get()
    return params.RocksmqCfg.RetentionSizeInMB.GetAsInt64() != -1 || 
           params.RocksmqCfg.RetentionTimeInMinutes.GetAsInt64() != -1
}

配置参数说明:

参数默认值说明
RetentionSizeInMB-1(禁用)保留消息总大小阈值(MB)
RetentionTimeInMinutes-1(禁用)保留消息时间阈值(分钟)
PageSize10MB单个分页的最大大小

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:25-30

过期数据清理流程

graph TD
    A[启动保留检查] --> B{检查大小保留策略}
    B -->|已启用| C[计算已确认消息总大小]
    B -->|未启用| D{检查时间保留策略}
    C --> E{超过阈值?}
    E -->|是| F[标记过期分页]
    E -->|否| G[结束检查]
    D -->|已启用| H[遍历分页时间戳]
    H --> I{消息时间过期?}
    I -->|是| F
    I -->|否| G
    F --> J[删除过期分页]
    J --> K[更新元数据]

分页删除实现

// DeleteMessages 按 ID 范围删除消息
func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error {
    startKey := path.Join(topic, strconv.FormatInt(startID, 10))
    endKey := path.Join(topic, strconv.FormatInt(endID+1, 10))
    
    writeBatch := gorocksdb.NewWriteBatch()
    defer writeBatch.Destroy()
    
    // 删除 [startID, endID) 范围内的所有消息
    writeBatch.DeleteRange([]byte(startKey), []byte(endKey))
    
    opts := gorocksdb.NewDefaultWriteOptions()
    defer opts.Destroy()
    
    err := db.Write(opts, writeBatch)
    return err
}

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:50-70

Topic 生命周期管理

Topic 销毁流程

当删除 Collection 时,系统需要清理对应的 Topic 及其所有关联数据。

func (rmq *rocksmq) DestroyTopic(topicName string) error {
    ll, ok := topicMu.Load(topicName)
    if !ok {
        return fmt.Errorf("topic name = %s not exist", topicName)
    }
    
    lock := ll.(*sync.Mutex)
    lock.Lock()
    defer lock.Unlock()
    
    // 清理消费者组信息
    rmq.consumers.Delete(topicName)
    rmq.topicName2LatestMsgID.Delete(topicName)
    
    // 清理消息数据
    fixTopicName := topicName + "/"
    rmq.kv.RemoveWithPrefix(context.TODO(), fixTopicName)
    
    // 清理分页大小信息
    pageMsgSizeKey := constructKey(PageMsgSizeTitle, topicName)
    rmq.kv.RemoveWithPrefix(context.TODO(), pageMsgSizeKey)
    
    // 清理分页时间戳信息
    pageMsgTsKey := constructKey(PageTsTitle, topicName)
    rmq.kv.RemoveWithPrefix(context.TODO(), pageMsgTsKey)
    
    // 清理已确认时间戳信息
    ackedTsKey := constructKey(AckedTsTitle, topicName)
    rmq.kv.RemoveWithPrefix(context.TODO(), ackedTsKey)
    
    // 清理保留信息
    topicMu.Delete(topicName)
    rmq.retentionInfo.topicRetetionTime.GetAndRemove(topicName)
    
    return nil
}

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:150-190

消费者组状态管理

消费者组存在性检查

func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer, error) {
    key := constructCurrentID(topicName, groupName)
    _, ok := rmq.consumersID.Load(key)
    
    if ok {
        if val, ok := rmq.consumers.Load(topicName); ok {
            c := val.(*consumerList).Get(groupName)
            return c != nil, c, nil
        }
    }
    return false, nil, nil
}

消息消费确认流程

sequenceDiagram
    participant CN as Consumer 消费者
    participant RMQ as RocksMQ
    participant KV as RocksDB KV Store
    
    CN->>RMQ: Consume(topicName, groupName, num)
    RMQ->>KV: Load current position
    KV-->>RMQ: CurrentID
    RMQ->>KV: Seek to currentID
    KV-->>RMQ: Messages
    RMQ-->>CN: ConsumerMessages
    
    CN->>RMQ: Ack(messageID)
    RMQ->>KV: UpdateAckedInfo
    RMQ->>KV: Update page metadata

执行节点配置

关键配置参数

配置项默认值描述影响
PageSize10MB分页存储大小影响内存使用和 I/O 效率
RetentionSizeInMB-1保留大小阈值-1 表示禁用
RetentionTimeInMinutes-1保留时间阈值-1 表示禁用
TickerTimeInSeconds1清理任务执行间隔影响资源回收及时性

压缩类型配置

RocksMQ 支持对不同类型的数据进行压缩:

// 压缩类型配置格式: "compressionType1,compressionType2,..."
// 其中每个值代表对应数据块的压缩算法

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go:200-220

错误处理与故障恢复

常见错误场景

错误类型错误码处理方式
Topic 不存在ErrMqTopicNotFound检查 Topic 是否已创建
消费者组不存在ErrMqConsumerGroupNotFound先创建消费者组
RocksMQ 已关闭RmqNotServingErrMsg重启服务或等待恢复
无效的分页 IDinvalid page id检查 Key 格式是否正确

错误恢复策略

// Topic 创建时的不允许字符检查
if strings.Contains(topicName, "/") {
    log.Warn("rocksmq failed to create topic for topic name contains \"/\"", 
        zap.String("topic", topicName))
    return retry.Unrecoverable(
        fmt.Errorf("topic name = %s contains \"/\"", topicName))
}

性能优化建议

分页大小调优

  • 小分页(<5MB):适合高频写入场景,减少内存占用
  • 中等分页(5-15MB):平衡 I/O 效率和内存使用
  • 大分页(>15MB):适合批量写入,减少分页数量

保留策略优化

  1. 根据磁盘容量设置 RetentionSizeInMB
  2. 根据业务需求设置 RetentionTimeInMinutes
  3. 建议同时启用两个策略以确保数据安全

相关资源

总结

执行节点是 Milvus 架构中的核心处理单元,通过消息队列(如 RocksMQ)与代理节点解耦,实现高并发的数据写入和查询处理。理解执行节点的工作原理对于优化 Milvus 性能、排查问题至关重要。主要关注点包括:

  1. Topic 管理:确保 Topic 命名规范,避免特殊字符
  2. 消费者组管理:正确管理消费进度,支持故障恢复
  3. 保留策略:合理配置数据清理策略,控制存储成本
  4. 分页机制:优化分页大小,提升 I/O 效率

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:1-50

Go SDK (client/v2) 使用指南

Go SDK (client/v2) 是 Milvus 向量数据库的官方 Go 语言客户端,提供了一套完整的 API 用于与 Milvus 服务器进行交互。该 SDK 支持向量存储、相似度搜索、标量字段管理、集合操作、权限控制以及数据复制等核心功能。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 基本连接

继续阅读本节完整说明和来源证据。

章节 客户端配置选项

继续阅读本节完整说明和来源证据。

章节 创建集合

继续阅读本节完整说明和来源证据。

概述

Go SDK (client/v2) 是 Milvus 向量数据库的官方 Go 语言客户端,提供了一套完整的 API 用于与 Milvus 服务器进行交互。该 SDK 支持向量存储、相似度搜索、标量字段管理、集合操作、权限控制以及数据复制等核心功能。

当前版本:2.6.5

主要特性包括:

  • 支持 dense、binary、sparse、int8 类型的可空(nullable)向量列
  • 结构化数组(struct-array)的向量子字段支持
  • EmbeddingList/MAX_SIM 搜索功能
  • Array 字段的部分更新(ARRAY_APPEND、ARRAY_REMOVE)
  • 自定义 gRPC DialOptions 保留默认连接配置

核心组件架构

graph TD
    A[Client] --> B[Collection Operations]
    A --> C[Database Operations]
    A --> D[RBAC Operations]
    A --> E[Replication Operations]
    
    B --> B1[Create/Drop Collection]
    B --> B2[Schema Management]
    B --> B3[Index Operations]
    
    C --> C1[Collection Management]
    C --> C2[Data Query]
    
    D --> D1[User Management]
    D --> D2[Role Management]
    D --> D3[Privilege Management]
    
    E --> E1[Replication Config]
    E --> E2[Truncate Collection]

客户端初始化

基本连接

使用 New 函数创建客户端连接:

import "github.com/milvus-io/milvus/client/v2/milvusclient"

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

milvusAddr := "YOUR_MILVUS_ENDPOINT"

cli, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
    Address: milvusAddr,
})
if err != nil {
    // 处理错误
}
defer cli.Close(ctx)

客户端配置选项

配置项类型描述
AddressstringMilvus 服务器地址(必需)
DialOptions[]grpc.DialOption自定义 gRPC 连接选项
Usernamestring认证用户名
Passwordstring认证密码

注意:自定义 DialOptions 不会覆盖 SDK 的默认 keepalive、backoff 和 max receive message-size 设置。

资料来源:client/README.md

集合操作(Collection Operations)

创建集合

使用 CreateCollection 方法创建集合:

// 定义集合 Schema
fields := []*schema.Field{
    {
        Name:       "id",
        DataType:   schemapb.DataType_Int64,
        IsPrimary:  true,
    },
    {
        Name:       "vector",
        DataType:   schemapb.DataType_FloatVector,
        ElementType: schemapb.DataType_Float,
        TypeParams: map[string]string{
            "dim": "128",
        },
    },
}

err = cli.CreateCollection(ctx, &milvusclient.CreateCollectionOption{
    Collection:  "my_collection",
    Schema:      schema.NewSchema().WithFields(fields),
})

向集合添加字段

使用 AddCollectionField 方法向现有集合添加字段:

err = cli.AddCollectionField(ctx, "my_collection", 
    milvusclient.NewAddCollectionFieldOption("new_field", schemapb.DataType_VarChar).
        WithMaxLength(256).
        WithElementType(schemapb.DataType_VarChar))

重要:新增的向量字段必须是 nullable 的,否则请求会被 SDK 验证拒绝。

截断集合数据

TruncateCollection API 可以快速清除集合中的所有数据,而无需删除和重建集合:

err = cli.TruncateCollection(ctx, NewTruncateCollectionOption("my_collection"))

资料来源:client/milvusclient/collection.go

数据写入操作

插入数据

// 准备数据
ids := []int64{1, 2, 3, 4, 5}
vectors := [][]float32{
    {0.1, 0.2, /* ... 128 维 */},
    {0.1, 0.2, /* ... */},
    // ...
}

result, err := cli.Insert(ctx, &milvusclient.InsertOption{
    Collection: "my_collection",
    Partition:  "my_partition",  // 可选
    FieldsData: map[string]interface{}{
        "id":     ids,
        "vector": vectors,
    },
})

Upsert 操作

支持 Array 字段的部分更新:

// ARRAY_APPEND 操作
result, err := cli.Upsert(ctx, &milvusclient.UpsertOption{
    Collection: "my_collection",
    FieldsData: fieldsData,
}, milvusclient.WithArrayAppend("array_field", "new_element"))

// ARRAY_REMOVE 操作  
result, err := cli.Upsert(ctx, &milvusclient.UpsertOption{
    Collection: "my_collection",
    FieldsData: fieldsData,
}, milvusclient.WithArrayRemove("array_field", "element_to_remove"))

资料来源:client/milvusclient/write.go

数据查询与搜索

向量相似度搜索

// 定义搜索向量
searchVectors := [][]float32{
    {0.1, 0.2, /* ... 128 维 */},
}

// 执行搜索
results, err := cli.Search(ctx, &milvusclient.SearchOption{
    Collection:    "my_collection",
    VecFieldName:  "vector",
    QueryVector:   searchVectors,
    Limit:         10,
    Offset:        0,
    Filter:        "id > 100",  // 可选:标量过滤
    OutputFields:  []string{"id", "vector"},
})

MAX_SIM 搜索

支持基于 EmbeddingList 的最大相似度搜索:

results, err := cli.Search(ctx, &milvusclient.SearchOption{
    Collection:    "my_collection",
    VecFieldName:  "embedding_field",
    QueryVector:   queryEmbedding,
    SearchParams: map[string]interface{}{
        "metric_type": "IP",  // 内积
    },
})

查询标量字段

results, err := cli.Query(ctx, &milvusclient.QueryOption{
    Collection: "my_collection",
    Filter:    "id >= 1 AND id <= 100",
    OutputFields: []string{"id", "scalar_field"},
    Limit:    100,
})

资料来源:client/milvusclient/read.go

权限控制(RBAC)

用户管理

// 创建用户
err = cli.CreateUser(ctx, &milvusclient.CreateUserOption{
    Username: "new_user",
    Password: "password",
})

// 更新密码
err = cli.UpdatePassword(ctx, &milvusclient.UpdatePasswordOption{
    Username:    "existing_user",
    OldPassword: "old_pass",
    NewPassword: "new_pass",
})

// 删除用户
err = cli.DropUser(ctx, &milvusclient.DropUserOption{
    Username: "user_to_delete",
})

角色管理

// 创建角色
err = cli.CreateRole(ctx, &milvusclient.CreateRoleOption{
    RoleName: "admin_role",
})

// 授予角色权限
err = cli.GrantRole(ctx, &milvusclient.GrantRoleOption{
    RoleName:     "admin_role",
    ObjectType:   "Collection",
    ObjectName:   "*",
    Privilege:    "Admin",
})

// 添加用户到角色
err = cli.AddUserToRole(ctx, &milvusclient.AddUserToRoleOption{
    Username: "new_user",
    RoleName: "admin_role",
})

权限检查

// 检查权限
granted, err := cli.CheckRole_granted(ctx, &milvusclient.CheckRoleOption{
    RoleName: "admin_role",
})

资料来源:client/milvusclient/rbac.go

数据复制功能

获取复制配置

config, err := cli.GetReplicateConfiguration(ctx, &milvusclient.GetReplicateConfigurationOption{
    CollectionName: "my_collection",
})

复制状态管理

SDK 提供了完整的复制配置接口,支持跨集群数据同步场景。

资料来源:client/milvusclient/replicate.go

向量类型支持

向量类型描述Nullable 支持索引类型
FloatVector浮点向量IVF, HNSW, DISKANN
BinaryVector二进制向量BIN_IVF
SparseVector稀疏向量SPARSE_INVERTED_INDEX
Int8VectorInt8 向量IVF_FLAT

常见问题与社区讨论

社区热点问题

根据社区反馈,以下功能需求受到较多关注:

Issue描述状态
#4430字符串类型字段支持功能请求中
#20405集合创建后修改 Schema功能请求中
#1924字符串 ID 支持功能请求中
#9685备份与恢复功能功能请求中

使用注意事项

  1. RocksMQ 独立部署崩溃:如果使用 RocksMQ 作为消息队列,在独立部署模式下可能遇到崩溃问题,建议生产环境使用 Kafka 或 Pulsar。
  1. Schema 修改限制:当前 Milvus 不支持直接修改已创建且非空集合的 Schema。
  1. 版本兼容性:确保 Go SDK 版本与 Milvus 服务器版本兼容:
Milvus 版本Go SDK 版本
2.6.172.6.4
2.6.152.6.3
2.6.132.6.1
2.5.272.5.14

最佳实践

连接管理

// 建议使用带超时的上下文
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// 确保客户端资源释放
cli, err := milvusclient.New(ctx, config)
if err != nil {
    return err
}
defer cli.Close(ctx)

批量操作

// 使用批量插入提高性能
const batchSize = 1000
for i := 0; i < len(data); i += batchSize {
    end := i + batchSize
    if end > len(data) {
        end = len(data)
    }
    _, err = cli.Insert(ctx, &milvusclient.InsertOption{
        Collection: "my_collection",
        FieldsData: data[i:end],
    })
    if err != nil {
        return err
    }
}

错误处理

result, err := cli.Search(ctx, options)
if err != nil {
    // 检查错误类型
    if status, ok := status.FromError(err); ok {
        switch status.Code() {
        case codes.NotFound:
            // 处理资源不存在
        case codes.Unavailable:
            // 处理服务不可用
        }
    }
    return err
}

相关资源

资料来源:client/README.md

部署指南

本页面介绍 Milvus 向量数据库的部署方式、架构组成以及配置方法。Milvus 支持单机部署(Standalone)和分布式集群部署(Cluster)两种模式,用户可根据业务规模选择合适的部署方案。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 单机部署(Standalone)

继续阅读本节完整说明和来源证据。

章节 集群部署(Cluster)

继续阅读本节完整说明和来源证据。

章节 主要组件说明

继续阅读本节完整说明和来源证据。

部署模式概述

Milvus 提供两种核心部署模式,分别适用于不同的使用场景。

单机部署(Standalone)

单机部署将所有 Milvus 组件运行在单一节点上,适用于开发、测试以及对高可用性要求较低的场景。单机部署使用 Docker Compose 方式启动,依赖 etcd 存储元数据、MinIO 存储向量数据,以及 RocksMQ 作为消息队列。

集群部署(Cluster)

集群部署采用分布式架构,将各组件分离部署以实现水平扩展和高可用性。集群模式支持 Kubernetes 环境,通过微服务化设计将协调节点(Coordinator)、查询节点(Query Node)、数据节点(Data Node)等组件独立部署。

Milvus 组件架构

Milvus 采用分层架构设计,各组件协同工作完成向量数据的存储、索引和检索。

graph TB
    subgraph 协调层["协调层 Coordinator"]
        RC[Root Coord]
        QC[Query Coord]
        DC[Data Coord]
        IC[Index Coord]
        HC[Proxy/访问层]
    end
    
    subgraph 执行层["执行层 Workers"]
        QN[Query Node]
        DN[Data Node]
        IN[Index Node]
        SN[Data Node]
    end
    
    subgraph 存储层["存储层 Storage"]
        ET[etcd<br/>元数据]
        MS[MinIO/S3<br/>对象存储]
        MQ[Kafka/Pulsar<br/>消息队列]
    end
    
    HC --> RC
    HC --> QC
    HC --> DC
    QC --> QN
    DC --> DN
    IC --> IN
    RC --> ET
    QC --> ET
    DC --> ET
    RC --> MQ
    QN --> MS
    DN --> MS
    IN --> MS

主要组件说明

组件名称功能描述
Proxy负责接收客户端请求,进行请求验证和路由
Root Coord管理整个 Milvus 系统的元数据和协调操作
Query Coord管理查询节点,负责搜索和查询调度
Data Coord管理数据节点,负责数据写入和存储
Index Coord管理索引构建任务
Query Node执行向量相似度搜索
Data Node处理数据写入和持久化
Index Node构建向量索引以加速检索

单机部署指南

环境要求

资源类型最低要求推荐配置
CPU2 核8 核以上
内存4 GB16 GB 以上
磁盘50 GB100 GB SSD
操作系统Ubuntu 18.04+ / CentOS 7+Ubuntu 20.04+

Docker Compose 部署步骤

  1. 下载 docker-compose 配置文件
wget https://raw.githubusercontent.com/milvus-io/milvus/master/deployments/docker/standalone/docker-compose.yml
  1. 启动服务
docker-compose up -d
  1. 验证服务状态
docker-compose ps
  1. 查看日志
docker-compose logs -f milvus

单机部署配置参数

单机部署的配置文件位于 configs/milvus.yaml,主要配置项包括:

配置项默认值说明
etcd.endpointslocalhost:2379etcd 服务地址
minio.addresslocalhost:9000MinIO 服务地址
rocksmq.retentionTimeInMinutes4320RocksMQ 消息保留时间(分钟)
rocksmq.retentionSizeInMB-1RocksMQ 消息保留大小(MB),-1 表示不限制
common.retentionDuration432000数据保留时长(秒)

集群部署指南

Kubernetes 部署架构

graph LR
    Client[客户端] --> LB[Load Balancer]
    LB --> Proxy[Proxy Pods]
    Proxy --> RC[Root Coord]
    Proxy --> QC[Query Coord]
    Proxy --> DC[Data Coord]
    Proxy --> IC[Index Coord]
    QC --> QN1[Query Node 1]
    QC --> QN2[Query Node 2]
    DC --> DN1[Data Node 1]
    DC --> DN2[Data Node 2]
    IC --> IN1[Index Node 1]
    IC --> IN2[Index Node 2]

部署前准备

组件版本要求用途
Kubernetes1.18+容器编排平台
Helm3.0+包管理工具
etcd3.5+元数据存储
MinIORELEASE.2022+对象存储

Helm Chart 部署步骤

  1. 添加 Helm 仓库
helm repo add milvus https://milvus-io.github.io/milvus-helm
helm repo update
  1. 配置 values.yaml
cluster:
  enabled: true

etcd:
  enabled: true

minio:
  enabled: true

pulsar:
  enabled: false

kafka:
  enabled: false

proxy:
  serviceType: LoadBalancer
  1. 执行部署
helm install my-milvus milvus/milvus -n milvus --create-namespace -f values.yaml
  1. 验证部署状态
kubectl get pods -n milvus
kubectl get svc -n milvus

消息队列配置

Milvus 支持多种消息队列作为日志存储后端,默认使用 RocksMQ,在分布式环境中推荐使用 Kafka 或 Pulsar。

RocksMQ 配置

RocksMQ 是单机部署默认的消息队列实现,配置参数位于 milvus.yaml 中的 rocksmq 段落:

rocksmq:
  # RocksMQ 消息保留时间(分钟)
  retentionTimeInMinutes: 4320
  
  # RocksMQ 消息保留大小(MB),-1 表示不限制
  retentionSizeInMB: -1
  
  # RocksMQ 页面大小配置
  pageSize: 1048576
  
  # 压缩类型配置
  compressionTypes: 0,0,0,0,0

消息保留机制

RocksMQ 的保留机制通过 rocksmq_retention.go 实现,支持两种清理策略:

graph TD
    A[启动保留检查] --> B{检查保留配置}
    B -->|时间策略| C[按时间过期清理]
    B -->|大小策略| D[按大小限制清理]
    C --> E[删除已确认消息]
    D --> E
    E --> F[更新页面信息]

清理流程说明:

  • 时间过期检查:遍历已确认的消息页面,检查其确认时间是否超过 retentionTimeInMinutes 配置
  • 大小过期检查:统计已确认消息的总大小,当超过 retentionSizeInMB 时触发清理
  • 原子操作:使用 RocksDB 的批量删除接口确保数据一致性

依赖服务配置

etcd 元数据存储

配置项默认值说明
etcd.endpointslocalhost:2379etcd 集群地址
etcd.rootPathby-devMilvus 根路径
etcd.metaSubPathmeta元数据子路径

MinIO 对象存储

配置项默认值说明
minio.addresslocalhost:9000MinIO 服务地址
minio.accessKeyIDminioadmin访问密钥
minio.secretAccessKeyminioadmin访问密钥
minio.bucketNamemilvus-bucket存储桶名称
minio.useSSLfalse是否使用 SSL

高可用性配置

多副本部署

为保证服务高可用性,可在 values.yaml 中配置各组件的副本数:

queryNode:
  replicas: 3

dataNode:
  replicas: 3

indexNode:
  replicas: 2

健康检查配置

检查项间隔时间超时时间重试次数
存活探针10s5s3
就绪探针10s5s3

安全配置

认证与授权

Milvus 支持基于角色的访问控制(RBAC),配置项包括:

配置项默认值说明
common.security.authorizationEnabledfalse是否启用授权
common.security.userNameminioadmin默认用户名
common.security.passwordminioadmin默认密码

TLS 加密传输

proxy:
  http:
    enabled: true
    port: 9091
  grpc:
    port: 19530
    tls:
      enabled: true
      certPath: /path/to/cert.pem
      keyPath: /path/to/key.pem

性能调优

内存配置

配置项说明
queryNode.cache.memoryLimit查询节点缓存内存上限
dataNode.memory.memoryLimit数据节点内存上限

索引构建配置

配置项默认值说明
indexCoord.buildIndexConcurrency1索引构建并发数
indexCoord.minSegmentSizeToMerge512合并最小段大小

监控与日志

Prometheus 监控

Milvus 集成 Prometheus 指标导出,配置项位于 milvus.yaml

metrics:
  enabled: true
  port: 9091
  path: /metrics

日志配置

配置项默认值说明
log.levelinfo日志级别
log.formattext日志格式
log.file.rotation.maxSize300单文件最大大小(MB)
log.file.rotation.maxAge10文件保留天数

常见部署问题排查

服务启动失败

  1. 检查依赖服务(etcd、MinIO)是否正常运行
  2. 确认端口未被占用:netstat -tlnp | grep <port>
  3. 查看容器日志:docker-compose logs -f milvus

连接超时

可能原因解决方案
防火墙阻止开放必要端口
服务未就绪增加就绪探针等待时间
网络配置错误检查 Docker 网络配置

存储空间不足

# 清理未使用的 Docker 资源
docker system prune -a

# 扩展 MinIO 存储卷
kubectl patch pvc data-my-minio-0 -p '{"spec":{"resources":{"requests":{"storage":"500Gi"}}}}'

相关资源

  • 官方部署文档:https://milvus.io/docs/v2.0.x/install_standalone-docker.md
  • Helm Chart 仓库:https://artifacthub.io/packages/helm/milvus/milvus
  • 最新版本:v2.6.x(持续更新中)

来源:https://github.com/milvus-io/milvus / 项目说明书

监控与运维

Milvus 的监控与运维体系涵盖消息队列状态管理、数据保留机制、消费者组监控以及系统健康检查等多个维度。作为云原生向量数据库,Milvus 提供了完善的运维工具和机制,确保系统在高并发场景下的稳定运行。

章节 相关页面

继续阅读本节完整说明和来源证据。

章节 RocksMQ 在 Milvus 中的定位

继续阅读本节完整说明和来源证据。

章节 关键组件关系

继续阅读本节完整说明和来源证据。

章节 创建主题

继续阅读本节完整说明和来源证据。

概述

Milvus 的监控与运维体系涵盖消息队列状态管理、数据保留机制、消费者组监控以及系统健康检查等多个维度。作为云原生向量数据库,Milvus 提供了完善的运维工具和机制,确保系统在高并发场景下的稳定运行。

RocksMQ 作为 Milvus 默认的消息队列实现,承担着数据传输和异步处理的关键职责。其监控与运维主要围绕以下几个方面展开:

  • 主题(Topic)生命周期管理:创建、销毁、状态追踪
  • 消费者组(Consumer Group)管理:注册、注销、消费位点追踪
  • 数据保留策略:基于时间和基于大小的双重保留机制
  • 消息分页管理:分页存储与清理机制

核心架构

RocksMQ 在 Milvus 中的定位

graph TB
    subgraph "Milvus 架构"
        A[客户端请求] --> B[Proxy 组件]
        B --> C[RocksMQ 消息队列]
        C --> D[QueryNode 消费]
        C --> E[DataNode 消费]
        C --> F[其他消费者]
    end
    
    subgraph "RocksMQ 内部"
        G[Topic 管理器] --> H[分页存储]
        G --> I[消费者列表]
        I --> J[位点追踪]
        H --> K[保留策略]
        K --> L{保留类型}
        L --> M[时间保留]
        L --> N[大小保留]
    end

关键组件关系

RocksMQ 的核心数据结构包括:

组件类型作用
topicMusync.Map主题级别互斥锁映射
consumerListstruct消费者列表封装
rocksmqstruct主消息队列实例
retentionInfostruct保留策略执行器

主题管理

创建主题

主题创建时执行以下校验和初始化操作:

func (rmq *rocksmq) CreateTopic(topicName string) error {
    // 1. 检查关闭状态
    if rmq.isClosed() {
        return errors.New(RmqNotServingErrMsg)
    }
    
    // 2. 校验主题名不包含 "/"
    if strings.Contains(topicName, "/") {
        return retry.Unrecoverable(fmt.Errorf("topic name = %s contains \"/\"", topicName))
    }
    
    // 3. 检查主题是否已存在
    topicIDKey := TopicIDTitle + topicName
    val, err := rmq.kv.Load(context.TODO(), topicIDKey)
    
    // 4. 初始化主题锁和消费者映射
    topicMu.LoadOrStore(topicName, new(sync.Mutex))
    rmq.consumers.LoadOrStore(topicName, &consumerList{consumers: make(map[string]*Consumer)})
}

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:146-175

销毁主题

销毁操作会清理所有相关数据:

func (rmq *rocksmq) DestroyTopic(topicName string) error {
    // 1. 获取主题互斥锁
    ll, ok := topicMu.Load(topicName)
    lock, ok := ll.(*sync.Mutex)
    lock.Lock()
    defer lock.Unlock()
    
    // 2. 清理消费者映射
    rmq.consumers.Delete(topicName)
    rmq.topicName2LatestMsgID.Delete(topicName)
    
    // 3. 删除主题消息数据
    fixTopicName := topicName + "/"
    err := rmq.kv.RemoveWithPrefix(context.TODO(), fixTopicName)
    
    // 4. 清理分页大小和时间戳信息
    pageMsgSizeKey := constructKey(PageMsgSizeTitle, topicName)
    pageMsgTsKey := constructKey(PageTsTitle, topicName)
    rmq.kv.RemoveWithPrefix(context.TODO(), pageMsgSizeKey)
    rmq.kv.RemoveWithPrefix(context.TODO(), pageMsgTsKey)
    
    // 5. 清理确认时间戳和主题信息
    ackedTsKey := constructKey(AckedTsTitle, topicName)
    rmq.kv.RemoveWithPrefix(context.TODO(), ackedTsKey)
}

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:185-240

消费者组管理

消费者注册与追踪

type consumerList struct {
    consumers map[string]*Consumer  // GroupName -> Consumer
    mu        sync.RWMutex
}

func (l *consumerList) Add(consumer *Consumer) {
    l.mu.Lock()
    defer l.mu.Unlock()
    if _, ok := l.consumers[consumer.GroupName]; ok {
        return  // 已存在则跳过
    }
    l.consumers[consumer.GroupName] = consumer
}

func (l *consumerList) Get(groupName string) *Consumer {
    l.mu.RLock()
    defer l.mu.RUnlock()
    if consumer, ok := l.consumers[groupName]; ok {
        return consumer
    }
    return nil
}

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:62-87

消费者组存在性检查

func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer, error) {
    key := constructCurrentID(topicName, groupName)
    _, ok := rmq.consumersID.Load(key)
    if ok {
        if val, ok := rmq.consumers.Load(topicName); ok {
            c := val.(*consumerList).Get(groupName)
            return c != nil, c, nil
        }
    }
    return false, nil, nil
}

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:260-270

数据保留机制

RocksMQ 实现了双重保留策略,可通过配置独立启用或同时生效:

func checkRetention() bool {
    params := paramtable.Get()
    return params.RocksmqCfg.RetentionSizeInMB.GetAsInt64() != -1 || 
           params.RocksmqCfg.RetentionTimeInMinutes.GetAsInt64() != -1
}

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:50-54

保留策略执行流程

graph TD
    A[开始保留检查] --> B{启用大小保留?}
    B -->|是| C[遍历已确认分页]
    B -->|否| D{启用时间保留?}
    C --> E{当前删除大小 > 总确认大小?}
    E -->|是| F[删除该分页]
    E -->|否| G[停止清理]
    F --> H[累加已删除大小]
    H --> C
    G --> I[流程结束]
    D -->|是| J[检查分页确认时间]
    J --> K{确认时间过期?}
    K -->|是| F
    K -->|否| G
    D -->|否| I

消息删除实现

func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error {
    // 删除 [startID, endID) 范围内的消息
    startKey := path.Join(topic, strconv.FormatInt(startID, 10))
    endKey := path.Join(topic, strconv.FormatInt(endID+1, 10))
    
    writeBatch := gorocksdb.NewWriteBatch()
    defer writeBatch.Destroy()
    writeBatch.DeleteRange([]byte(startKey), []byte(endKey))
    
    opts := gorocksdb.NewDefaultWriteOptions()
    defer opts.Destroy()
    err := db.Write(opts, writeBatch)
    return err
}

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:45-60

分页过期检测

保留机制通过迭代已确认的分页来检测过期数据:

// 遍历分页并检查确认时间
pageMsgPrefix := constructKey(PageMsgSizeTitle, topic) + "/"
pageIter := rocksdbkv.NewRocksiteratorWithUpperBound(ri.kv.DB, 
    typeutil.AddOne(pageMsgPrefix), pageReadOpts)

for ; pageIter.Valid(); pageIter.Next() {
    pageID, err := parsePageID(string(pKey.Data()))
    ackedTsKey := fixedAckedTsKey + "/" + strconv.FormatInt(pageID, 10)
    ackedTsVal, err := ri.kv.Load(context.TODO(), ackedTsKey)
    
    // 未确认的分页,停止清理
    if ackedTsVal == "" {
        break
    }
    
    if msgTimeExpiredCheck(ackedTs) {
        // 确认时间过期,执行清理
        pageEndID = pageID
        deletedAckedSize += size
        pageCleaned++
    }
}

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go:120-180

分页管理机制

分页存储结构

Key 格式用途
{PageMsgSizeTitle}{topic}/{pageEndID}记录分页消息总大小
{PageTsTitle}{topic}/{pageEndID}记录分页创建时间戳
{AckedTsTitle}{topic}/{pageEndID}记录分页确认时间戳

分页大小更新

func (rmq *rocksmq) updatePageMsgSize(topicName string, msgIDs []UniqueID, msgSizes map[UniqueID]int64) error {
    curMsgSize := 0
    nowTs := strconv.FormatInt(time.Now().Unix(), 10)
    mutateBuffer := make(map[string]string)
    
    for _, id := range msgIDs {
        msgSize := msgSizes[id]
        if curMsgSize + msgSize > params.RocksmqCfg.PageSize.GetAsInt64() {
            // 当前分页已满,创建新分页
            pageMsgSizeKey := fixedPageSizeKey + "/" + strconv.FormatInt(id, 10)
            mutateBuffer[pageMsgSizeKey] = strconv.FormatInt(curMsgSize + msgSize, 10)
            pageTsKey := fixedPageTsKey + "/" + strconv.FormatInt(id, 10)
            mutateBuffer[pageTsKey] = nowTs
            curMsgSize = 0
        } else {
            curMsgSize += msgSize
        }
    }
    return rmq.kv.MultiSave(context.TODO(), mutateBuffer)
}

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:310-340

配置参数

RocksMQ 相关配置项

参数名作用默认值
rocksmq.compressionTypes消息压缩类型0,0,0,0,0
rocksmq.retentionSizeInMB大小保留阈值(MB),-1 表示禁用-1
rocksmq.retentionTimeInMinutes时间保留阈值(分钟),-1 表示禁用-1
rocksmq.pageSize单个分页的最大消息大小-
rocksmq.tickerTimeInSeconds保留检查间隔(秒)1

配置校验

func TestRocksmq_ParseCompressionTypeError(t *testing.T) {
    params := paramtable.Get()
    params.Save(params.RocksmqCfg.CompressionTypes.Key, "invalid,1")
    _, err := parseCompressionType(params)
    assert.Error(t, err)  // 无效值应返回错误
    
    params.Save(params.RocksmqCfg.CompressionTypes.Key, "-1,-1")
    _, err = parseCompressionType(params)
    assert.Error(t, err)  // 负数应返回错误
}

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go:350-365

运维最佳实践

主题命名规范

  • 禁止在主题名中使用 / 字符
  • 主题名应具有业务语义,便于问题排查
  • 生产环境建议使用统一的主题命名规范

消费者组管理

  1. 正确注销:消费组不再使用时,应调用 DestroyConsumerGroup 释放资源
  2. 位点管理:定期检查消费者位点,避免消息积压
  3. 异常恢复:使用 ForceSeek 定位到指定消息 ID 进行消费位点重置
// 强制跳转消费位点示例
err = rmq.ForceSeek(topicName, groupName, ids[0])

保留策略调优

graph LR
    A[数据量增长快] --> B[启用大小保留]
    A --> C[减小 PageSize]
    D[数据访问周期固定] --> E[启用时间保留]
    D --> F[调整 RetentionTime]
    G[两者结合] --> H[双重保护机制]

常见问题排查

症状可能原因排查方法
消息未清理保留策略未启用检查 RetentionSizeInMBRetentionTimeInMinutes 配置
消费者无法消费消费者组未注册调用 ExistConsumerGroup 确认
主题创建失败主题名包含非法字符检查名称是否包含 /
内存占用高分页未及时清理检查保留机制是否正常运行

社区关注的问题

根据社区反馈,以下运维相关问题受到较多关注:

  • #28583 - Milvus standalone 崩溃问题(涉及 RocksMQ 相关错误)
  • #9685 - 备份与恢复功能请求
  • 独立部署模式下 RocksMQ 的稳定性问题

相关文档

资料来源:pkg/mq/mqimpl/rocksmq/server/rocksmq_impl.go:146-175

失败模式与踩坑日记

保留 Doramagic 在发现、验证和编译中沉淀的项目专属风险,不把社区讨论只当作装饰信息。

medium 来源证据:Milvus standalone crashed

可能阻塞安装或首次运行。

medium 能力判断依赖假设

假设不成立时,用户拿不到承诺的能力。

medium 运行可能依赖外部服务

本地安装成功不等于能力可用,外部服务不可用会阻断体验。

medium 维护活跃度未知

新项目、停更项目和活跃项目会被混在一起,推荐信任度下降。

Pitfall Log / 踩坑日志

项目:milvus-io/milvus

摘要:发现 8 个潜在踩坑项,其中 0 个为 high/blocking;最高优先级:安装坑 - 来源证据:Milvus standalone crashed。

1. 安装坑 · 来源证据:Milvus standalone crashed

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:GitHub 社区证据显示该项目存在一个安装相关的待验证问题:Milvus standalone crashed
  • 对用户的影响:可能阻塞安装或首次运行。
  • 建议检查:来源显示可能已有修复、规避或版本变化,说明书中必须标注适用版本。
  • 防护动作:不得脱离来源链接放大为确定性结论;需要标注适用版本和复核状态。
  • 证据:community_evidence:github | cevd_f0bbd70e35ef4c65a374c802b7614b46 | https://github.com/milvus-io/milvus/issues/28583 | 来源讨论提到 linux 相关条件,需在安装/试用前复核。

2. 能力坑 · 能力判断依赖假设

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:README/documentation is current enough for a first validation pass.
  • 对用户的影响:假设不成立时,用户拿不到承诺的能力。
  • 建议检查:将假设转成下游验证清单。
  • 防护动作:假设必须转成验证项;没有验证结果前不能写成事实。
  • 证据:capability.assumptions | github_repo:208728772 | https://github.com/milvus-io/milvus | README/documentation is current enough for a first validation pass.

3. 运行坑 · 运行可能依赖外部服务

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:项目说明出现 external service/cloud/webhook/database 等运行依赖关键词。
  • 对用户的影响:本地安装成功不等于能力可用,外部服务不可用会阻断体验。
  • 建议检查:确认是否有离线 demo、mock 数据或可替代服务。
  • 防护动作:外部服务依赖未明确时,不把本地安装成功等同于能力可用。
  • 证据:packet_text.keyword_scan | github_repo:208728772 | https://github.com/milvus-io/milvus | matched external service / cloud / webhook / database keyword

4. 维护坑 · 维护活跃度未知

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:未记录 last_activity_observed。
  • 对用户的影响:新项目、停更项目和活跃项目会被混在一起,推荐信任度下降。
  • 建议检查:补 GitHub 最近 commit、release、issue/PR 响应信号。
  • 防护动作:维护活跃度未知时,推荐强度不能标为高信任。
  • 证据:evidence.maintainer_signals | github_repo:208728772 | https://github.com/milvus-io/milvus | last_activity_observed missing

5. 安全/权限坑 · 下游验证发现风险项

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:no_demo
  • 对用户的影响:下游已经要求复核,不能在页面中弱化。
  • 建议检查:进入安全/权限治理复核队列。
  • 防护动作:下游风险存在时必须保持 review/recommendation 降级。
  • 证据:downstream_validation.risk_items | github_repo:208728772 | https://github.com/milvus-io/milvus | no_demo; severity=medium

6. 安全/权限坑 · 存在评分风险

  • 严重度:medium
  • 证据强度:source_linked
  • 发现:no_demo
  • 对用户的影响:风险会影响是否适合普通用户安装。
  • 建议检查:把风险写入边界卡,并确认是否需要人工复核。
  • 防护动作:评分风险必须进入边界卡,不能只作为内部分数。
  • 证据:risks.scoring_risks | github_repo:208728772 | https://github.com/milvus-io/milvus | no_demo; severity=medium

7. 维护坑 · issue/PR 响应质量未知

  • 严重度:low
  • 证据强度:source_linked
  • 发现:issue_or_pr_quality=unknown。
  • 对用户的影响:用户无法判断遇到问题后是否有人维护。
  • 建议检查:抽样最近 issue/PR,判断是否长期无人处理。
  • 防护动作:issue/PR 响应未知时,必须提示维护风险。
  • 证据:evidence.maintainer_signals | github_repo:208728772 | https://github.com/milvus-io/milvus | issue_or_pr_quality=unknown

8. 维护坑 · 发布节奏不明确

  • 严重度:low
  • 证据强度:source_linked
  • 发现:release_recency=unknown。
  • 对用户的影响:安装命令和文档可能落后于代码,用户踩坑概率升高。
  • 建议检查:确认最近 release/tag 和 README 安装命令是否一致。
  • 防护动作:发布节奏未知或过期时,安装说明必须标注可能漂移。
  • 证据:evidence.maintainer_signals | github_repo:208728772 | https://github.com/milvus-io/milvus | release_recency=unknown

来源:Doramagic 发现、验证与编译记录