Milvus 是一款开源的向量数据库,支持针对 TB 级向量的增删改操作和近实时查询,具有高度灵活、稳定可靠以及高速查询等特点。。Milvus 集成了 Faiss、NMSLIB、Annoy 等广泛应用的向量索引库,提供了一整套简单直观的 API,让你可以针对不同场景选择不同的索引类型。
Milvus 在 Apache 2 License 协议下发布,于 2019 年 10 月正式开源,是 LF AI & DATA 基金会的孵化项目。Milvus 的源代码被托管于 Github。
Milvus 文档:https://milvus.io/docs
1. 数据库安装
我们这里采用基于 Docker 的安装方式,使用下面命令,下载 docker-compose.yam 文件:
$ wget https://github.com/milvus-io/milvus/releases/download/v2.2.3/milvus-standalone-docker-compose.yml -O docker-compose.yml
该文件的内容如下:
version: '3.5' services: etcd: container_name: milvus-etcd image: quay.io/coreos/etcd:v3.5.5 environment: - ETCD_AUTO_COMPACTION_MODE=revision - ETCD_AUTO_COMPACTION_RETENTION=1000 - ETCD_QUOTA_BACKEND_BYTES=4294967296 - ETCD_SNAPSHOT_COUNT=50000 volumes: - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd minio: container_name: milvus-minio image: minio/minio:RELEASE.2022-03-17T06-34-49Z environment: MINIO_ACCESS_KEY: minioadmin MINIO_SECRET_KEY: minioadmin ports: - "9001:9001" - "9000:9000" volumes: - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data command: minio server /minio_data --console-address ":9001" healthcheck: test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] interval: 30s timeout: 20s retries: 3 standalone: container_name: milvus-standalone image: milvusdb/milvus:v2.2.3 command: ["milvus", "run", "standalone"] environment: ETCD_ENDPOINTS: etcd:2379 MINIO_ADDRESS: minio:9000 volumes: - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus ports: - "19530:19530" - "9091:9091" depends_on: - "etcd" - "minio" networks: default: name: milvus
接着,使用下面命令执行上面下载的文件,该文件会使得 Docker 下载并实例化 3 个容器对象:etcd、minio、standalone,所以执行完毕后,会运行这三个容器:
sudo docker-compose up -d
其他有用的一些命令:
# 查看容器运行状态, 注意在刚下载的 docker-compose.yam 目录下执行该命令 sudo docker-compose ps # 停止所有容器 sudo docker-compose down # 删除容器关联的数据文件目录,默认在 docker-compose.yml 同级目录下会创建一个 volumes 目录,用于存储相关数据文件 # sudo rm -rf volumes
前面提到 Milvus 提供了多语言的 SDK,我们这里下载其 Python 语言的版本,安装命令为:
pip install pymilvus==2.2.2
2. 数据库连接
pymilvus 中通过维护一个全局、唯一的 Connections 类型的 connections 对象来管理所有的数据库连接对象,我们可以通过该对象相关方法来设置和连接相关的操作:
示例代码:
from pymilvus import connections # 导入的 connections 是一个单例对象,维护用户所有的连接 def test(): # 连接数据库, 如果 alias 没有指定的话,默认名字为 default connections.connect(alias='main1', host='localhost', port=19530) # 判断连接对象状态 print(connections.has_connection('main1')) # 获得 alias 对应的连接地址 print(connections.get_connection_addr('main1')) # 该函数可以添加连接信息,当创建数据库时可以直接使用 alias 标记在那个连接中进行操作 # connections.add_connection(main2={"host": "localhost", "port": '19530'}) ''' connections.add_connection( default={"host": "localhost", "port": "19530"}, dev1={"host": "localhost", "port": "19531"}, dev2={"uri": "http://random.com/random"}, dev3={"uri": "http://localhost:19530"}, dev4={"uri": "tcp://localhost:19530"}, dev5={"address": "localhost:19530"}, prod={"uri": "http://random.random.random.com:19530"}, ) ''' # 获得所有连接对象 print(connections.list_connections()) # 断开名字为 alias 连接 connections.disconnect('main') if __name__ == '__main__': test()
程序输出结果:
True {'address': 'localhost:19530', 'user': ''} [('default', None), ('main1', <pymilvus.client.grpc_handler.GrpcHandler object at 0x7fc6700dc0d0>)]
3. 集合创建操作
在 Milvus中,没有传统意义上的关系数据库中数据库、表的概念。Milvus 使用向量数据库 Vector Database和集合 Collection 来存储和组织向量数据。
Vector Database 是一种专门用于存储向量数据的数据库,一个 Vector Database 可以包含多个 Collection,每个 Collection 都是一个独立的向量数据集合,每个Collection可以包含多个向量数据。
接下来,我们就在向量数据库中创建一个可以存储向量数据的 Collection 集合,其创建步骤如下:
- 定义 Collection 的字段信息
- 定义 Collection 的配置信息
- 创建 Collection 集合到向量数据库中
支持的字段类型:
For primary key field: DataType.INT64 (numpy.int64) DataType.VARCHAR (VARCHAR) For scalar field: DataType.BOOL (Boolean) DataType.INT64 (numpy.int64) DataType.FLOAT (numpy.float32) DataType.DOUBLE (numpy.double) For vector field: BINARY_VECTOR (Binary vector) FLOAT_VECTOR (Float vector)
示例代码:
from pymilvus import connections from pymilvus import CollectionSchema from pymilvus import Collection from pymilvus import FieldSchema from pymilvus import DataType from pymilvus import list_collections from pymilvus import has_collection from pymilvus import drop_collection def test(): # 连接数据库 connections.connect(alias='main', host='localhost', port=19530) # 1. 定义 collection 字段信息 field1 = FieldSchema(name='id', dtype=DataType.INT64, is_primary=True, description='主键') field2 = FieldSchema(name='name', dtype=DataType.VARCHAR, max_length=100, description='名字') field3 = FieldSchema(name='vector', dtype=DataType.FLOAT_VECTOR, dim=328, description='向量') # 2. 定义 collection 配置信息 collection_schema = CollectionSchema(fields=[field1, field2, field3], description='数据库') # 3. 创建数据库和表 # using 如果不指定,则使用 default 连接 collection = Collection(name='my_collection', # collection 的名字 schema=collection_schema, # collection 的配置信息 using='main') # 向哪个连接中创建数据库和表 # 4. 其他关于 collection 的操作 # 获得指定连接中所有的 collection 名字 print('获得所有的集合:', list_collections(using='main')) # 判断是否存在指定名字的 collection 集合 print('是否存在某集合:', has_collection(collection_name='my_collection', using='main')) # 删除指定名字的 collection 集合 drop_collection(collection_name='my_collection', using='main') print('是否存在某集合:', has_collection(collection_name='my_collection', using='main')) # 断开数据库 connections.disconnect('main') if __name__ == '__main__': test()
程序输出内容:
获得所有的集合: ['first_milvus', 'my_collection'] 是否存在某集合: True 是否存在某集合: False
4. 集合插入操作
在进行插入操作时,重复插入 PK 相同的数据,insert_count 始终返回 10,它表示插入了数据 10 条,但是可能这 10 条数据在真正 flush 到 collection 时,由于 PK 重复被过滤掉。
from pymilvus import connections from pymilvus import Collection from pymilvus import list_collections import numpy as np def test(): # 连接数据库 connections.connect(alias='main', host='localhost', port=19530) # 打印所有的 collection 集合 print('获得所有的集合:', list_collections(using='main')) # 获得已存在集合对象 collection = Collection(name='my_collection',using='main') # 构建要插入的数据(id, name, vector) # 注意: 数据需要按照字段顺序构建 data_num = 10 user_data = [np.arange(data_num), ['name_' + str(i) for i in range(data_num)], np.random.randn(data_num, 328)] # 向默认的 default 分区中插入 data_num 条数据 # 如果 id 存在则进行更新操作,需要 flush 将内存中的更新写入到数据库 res = collection.insert(data=user_data, partition_name='_default') collection.flush() print('插入行数:', res.insert_count) print('插入主键:', res.primary_keys) # 删除数据, Milvus 仅支持删除具有明确指定主键的实体,什么主键大于几、小于几都是不支持的 expr = 'id > 40' res = collection.delete(expr=expr) print('成功操作:', res.succ_count) print('删除行数:', res.delete_count) print('删除主键', res.primary_keys) # 断开数据库 connections.disconnect('main') if __name__ == '__main__': test()
程序输出内容:
获得所有的集合: ['my_collection'] 插入行数: 10 插入主键: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
5. 集合查询操作
在 Milvus 中进行查询操作,必须对要进行查询的字段、以及向量字段构建索引(及时不搜索该字段,仍然需要构建索引),并且要将 collection load 到内存中,因为 Milvus 的所有查询操作都是在内存中进行的。
另外,一个 collection 可以存在多个分区 Partition。例如:我们有 1 万条数据,插入时可以将这些数据按照某些条件分成 5 个分区 p1、p2、p3、p4、p5,插入时候就可以指定数据的插入分区。
当 load 数据时,可以只 load 某个分区数据,加快搜索速度。
距离度量和索引类型:https://milvus.io/docs/metric.md#floating
示例代码:
from pymilvus import connections from pymilvus import Collection def test(): # 连接数据库 connections.connect(alias='main', host='localhost', port=19530) # 获得 collection 对象 collection = Collection(name='my_collection',using='main') collection.release() # 我们要查询 ID 字段,给其构建索引 # 构建索引需要在 collection 未加载到内存时进行 collection.create_index(field_name='id', index_name='PK_index') # 给向量字段构建索引,并指定索引类型,以及相似度度量方式 # nlist 表示簇的个数,该参数可以将向量划分成多个区域,有利于加快搜索 index_params = { "metric_type": "L2", "index_type": "IVF_FLAT", "params": {"nlist": 1024} } collection.create_index(field_name='vector', index_name='vector_index', index_params=index_params) # 将整个 collection 加载到内存中,也可以只加载某个 Partition collection.load() # 根据某个字段查询 # query 函数的 output_fields 字段可以返回任意字段 res = collection.query(expr='id > 0', output_fields=['id', 'name']) print(len(res), res) # 查询相似的向量 data = np.random.randn(1, 328) res = collection.search(data=data, # 要搜索的向量 limit=3, # 返回记录数量 anns_field='vector', # 要搜索的字段 param={'nprobe': 10, 'metric_type': 'L2'}, # nprobe 在最近的10个簇中搜索 output_fields=['id']) # 输出字段名字,只能是标量字段(数字或者小数字段,字符串无法返回) print(res) # 断开数据库 connections.disconnect('main') if __name__ == '__main__': test()
程序输出结果:
# query 函数根据 ID 字段的搜索结果 9 [{'id': 1, 'name': 'name_1'}, {'id': 2, 'name': 'name_%s2'}, {'id': 3, 'name': 'name_3'}, {'id': 4, 'name': 'name_%s4'}, {'id': 5, 'name': 'name_5'}, {'id': 6, 'name': 'name_6'}, {'id': 7, 'name': 'name_7'}, {'id': 8, 'name': 'name_8'}, {'id': 9, 'name': 'name_9'}] # search 函数搜索的最近向量 ["['(distance: 535.2747802734375, id: 2)', '(distance: 552.648681640625, id: 1)', '(distance: 558.37158203125, id: 7)']"]