向量数据库 milvus 使用

Milvus 是一款开源的向量数据库,支持针对 TB 级向量的增删改操作和近实时查询,具有高度灵活、稳定可靠以及高速查询等特点。。Milvus 集成了 Faiss、NMSLIB、Annoy 等广泛应用的向量索引库,提供了一整套简单直观的 API,让你可以针对不同场景选择不同的索引类型。

Milvus 在 Apache 2 License 协议下发布,于 2019 年 10 月正式开源,是 LF AI & DATA 基金会的孵化项目。Milvus 的源代码被托管于 Github。

Milvus 文档:https://milvus.io/docs

1. 数据库安装

我们这里采用基于 Docker 的安装方式,使用下面命令,下载 docker-compose.yam 文件:

$ wget https://github.com/milvus-io/milvus/releases/download/v2.2.3/milvus-standalone-docker-compose.yml -O docker-compose.yml

该文件的内容如下:

version: '3.5'

services:
  etcd:
    container_name: milvus-etcd
    image: quay.io/coreos/etcd:v3.5.5
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
      - ETCD_SNAPSHOT_COUNT=50000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
    command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd

  minio:
    container_name: milvus-minio
    image: minio/minio:RELEASE.2022-03-17T06-34-49Z
    environment:
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
    ports:
      - "9001:9001"
      - "9000:9000"
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
    command: minio server /minio_data --console-address ":9001"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      interval: 30s
      timeout: 20s
      retries: 3

  standalone:
    container_name: milvus-standalone
    image: milvusdb/milvus:v2.2.3
    command: ["milvus", "run", "standalone"]
    environment:
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
    ports:
      - "19530:19530"
      - "9091:9091"
    depends_on:
      - "etcd"
      - "minio"

networks:
  default:
    name: milvus

接着,使用下面命令执行上面下载的文件,该文件会使得 Docker 下载并实例化 3 个容器对象:etcd、minio、standalone,所以执行完毕后,会运行这三个容器:

sudo docker-compose up -d

其他有用的一些命令:

# 查看容器运行状态, 注意在刚下载的 docker-compose.yam 目录下执行该命令
sudo docker-compose ps

# 停止所有容器
sudo docker-compose down

# 删除容器关联的数据文件目录,默认在 docker-compose.yml 同级目录下会创建一个 volumes 目录,用于存储相关数据文件
# sudo rm -rf  volumes

前面提到 Milvus 提供了多语言的 SDK,我们这里下载其 Python 语言的版本,安装命令为:

pip install pymilvus==2.2.2

2. 数据库连接

pymilvus 中通过维护一个全局、唯一的 Connections 类型的 connections 对象来管理所有的数据库连接对象,我们可以通过该对象相关方法来设置和连接相关的操作:

示例代码:

from pymilvus import connections
# 导入的 connections 是一个单例对象,维护用户所有的连接


def test():

    # 连接数据库, 如果 alias 没有指定的话,默认名字为 default
    connections.connect(alias='main1', host='localhost', port=19530)

    # 判断连接对象状态
    print(connections.has_connection('main1'))

    # 获得 alias 对应的连接地址
    print(connections.get_connection_addr('main1'))

    # 该函数可以添加连接信息,当创建数据库时可以直接使用 alias 标记在那个连接中进行操作
    # connections.add_connection(main2={"host": "localhost", "port": '19530'})
    '''
         connections.add_connection(
                default={"host": "localhost", "port": "19530"},
                dev1={"host": "localhost", "port": "19531"},
                dev2={"uri": "http://random.com/random"},
                dev3={"uri": "http://localhost:19530"},
                dev4={"uri": "tcp://localhost:19530"},
                dev5={"address": "localhost:19530"},
                prod={"uri": "http://random.random.random.com:19530"},
            )
    '''

    # 获得所有连接对象
    print(connections.list_connections())

    # 断开名字为 alias 连接
    connections.disconnect('main')


if __name__ == '__main__':
    test()

程序输出结果:

True
{'address': 'localhost:19530', 'user': ''}
[('default', None), ('main1', <pymilvus.client.grpc_handler.GrpcHandler object at 0x7fc6700dc0d0>)]

3. 集合创建操作

在 Milvus中,没有传统意义上的关系数据库中数据库、表的概念。Milvus 使用向量数据库 Vector Database和集合 Collection 来存储和组织向量数据。

Vector Database 是一种专门用于存储向量数据的数据库,一个 Vector Database 可以包含多个 Collection,每个 Collection 都是一个独立的向量数据集合,每个Collection可以包含多个向量数据。

接下来,我们就在向量数据库中创建一个可以存储向量数据的 Collection 集合,其创建步骤如下:

  1. 定义 Collection 的字段信息
  2. 定义 Collection 的配置信息
  3. 创建 Collection 集合到向量数据库中

支持的字段类型:

For primary key field:
	DataType.INT64 (numpy.int64)
	DataType.VARCHAR (VARCHAR)

For scalar field:
	DataType.BOOL (Boolean)
	DataType.INT64 (numpy.int64)
	DataType.FLOAT (numpy.float32)
	DataType.DOUBLE (numpy.double)

For vector field:
	BINARY_VECTOR (Binary vector)
	FLOAT_VECTOR (Float vector)

示例代码:

from pymilvus import connections
from pymilvus import CollectionSchema
from pymilvus import Collection
from pymilvus import FieldSchema
from pymilvus import DataType
from pymilvus import list_collections
from pymilvus import has_collection
from pymilvus import drop_collection

def test():

    # 连接数据库
    connections.connect(alias='main', host='localhost', port=19530)


    # 1. 定义 collection 字段信息
    field1 = FieldSchema(name='id', dtype=DataType.INT64, is_primary=True, description='主键')
    field2 = FieldSchema(name='name', dtype=DataType.VARCHAR, max_length=100, description='名字')
    field3 = FieldSchema(name='vector', dtype=DataType.FLOAT_VECTOR, dim=328, description='向量')

    # 2. 定义 collection 配置信息
    collection_schema = CollectionSchema(fields=[field1, field2, field3], description='数据库')

    # 3. 创建数据库和表
    # using 如果不指定,则使用 default 连接
    collection = Collection(name='my_collection',        # collection 的名字
                            schema=collection_schema,    # collection 的配置信息
                            using='main')                # 向哪个连接中创建数据库和表

    # 4. 其他关于 collection 的操作
    # 获得指定连接中所有的 collection 名字
    print('获得所有的集合:', list_collections(using='main'))

    # 判断是否存在指定名字的 collection 集合
    print('是否存在某集合:', has_collection(collection_name='my_collection', using='main'))

    # 删除指定名字的 collection 集合
    drop_collection(collection_name='my_collection', using='main')
    print('是否存在某集合:', has_collection(collection_name='my_collection', using='main'))


    # 断开数据库
    connections.disconnect('main')


if __name__ == '__main__':
    test()

程序输出内容:

获得所有的集合: ['first_milvus', 'my_collection']
是否存在某集合: True
是否存在某集合: False

4. 集合插入操作

在进行插入操作时,重复插入 PK 相同的数据,insert_count 始终返回 10,它表示插入了数据 10 条,但是可能这 10 条数据在真正 flush 到 collection 时,由于 PK 重复被过滤掉。

from pymilvus import connections
from pymilvus import Collection
from pymilvus import list_collections
import numpy as np



def test():

    # 连接数据库
    connections.connect(alias='main', host='localhost', port=19530)

    # 打印所有的 collection 集合
    print('获得所有的集合:', list_collections(using='main'))

    # 获得已存在集合对象
    collection = Collection(name='my_collection',using='main')

    # 构建要插入的数据(id, name, vector)
    # 注意: 数据需要按照字段顺序构建
    data_num = 10
    user_data = [np.arange(data_num), ['name_' + str(i) for i in range(data_num)], np.random.randn(data_num, 328)]

    # 向默认的 default 分区中插入 data_num 条数据
    # 如果 id 存在则进行更新操作,需要 flush 将内存中的更新写入到数据库
    res = collection.insert(data=user_data, partition_name='_default')
    collection.flush()
    print('插入行数:', res.insert_count)
    print('插入主键:', res.primary_keys)

    # 删除数据, Milvus 仅支持删除具有明确指定主键的实体,什么主键大于几、小于几都是不支持的
    expr = 'id > 40'
    res = collection.delete(expr=expr)
    print('成功操作:', res.succ_count)
    print('删除行数:', res.delete_count)
    print('删除主键', res.primary_keys)

    # 断开数据库
    connections.disconnect('main')


if __name__ == '__main__':
    test()

程序输出内容:

获得所有的集合: ['my_collection']
插入行数: 10
插入主键: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

5. 集合查询操作

在 Milvus 中进行查询操作,必须对要进行查询的字段、以及向量字段构建索引(及时不搜索该字段,仍然需要构建索引),并且要将 collection load 到内存中,因为 Milvus 的所有查询操作都是在内存中进行的。

另外,一个 collection 可以存在多个分区 Partition。例如:我们有 1 万条数据,插入时可以将这些数据按照某些条件分成 5 个分区 p1、p2、p3、p4、p5,插入时候就可以指定数据的插入分区。

当 load 数据时,可以只 load 某个分区数据,加快搜索速度。

距离度量和索引类型:https://milvus.io/docs/metric.md#floating

示例代码:

from pymilvus import connections
from pymilvus import Collection

def test():

    # 连接数据库
    connections.connect(alias='main', host='localhost', port=19530)

    # 获得 collection 对象
    collection = Collection(name='my_collection',using='main')
    collection.release()

    # 我们要查询 ID 字段,给其构建索引
    # 构建索引需要在 collection 未加载到内存时进行
    collection.create_index(field_name='id', index_name='PK_index')

    # 给向量字段构建索引,并指定索引类型,以及相似度度量方式
    # nlist 表示簇的个数,该参数可以将向量划分成多个区域,有利于加快搜索
    index_params = {
        "metric_type": "L2",
        "index_type": "IVF_FLAT",
        "params": {"nlist": 1024}
    }
    collection.create_index(field_name='vector', index_name='vector_index', index_params=index_params)

    # 将整个 collection 加载到内存中,也可以只加载某个 Partition
    collection.load()

    # 根据某个字段查询
    # query 函数的 output_fields 字段可以返回任意字段
    res = collection.query(expr='id > 0', output_fields=['id', 'name'])
    print(len(res), res)


    # 查询相似的向量
    data = np.random.randn(1, 328)
    res = collection.search(data=data,   # 要搜索的向量
                            limit=3,     # 返回记录数量
                            anns_field='vector',  # 要搜索的字段
                            param={'nprobe': 10, 'metric_type': 'L2'},   # nprobe 在最近的10个簇中搜索
                            output_fields=['id'])  # 输出字段名字,只能是标量字段(数字或者小数字段,字符串无法返回)

    print(res)


    # 断开数据库
    connections.disconnect('main')


if __name__ == '__main__':
    test()

程序输出结果:

# query 函数根据 ID 字段的搜索结果
9 [{'id': 1, 'name': 'name_1'}, {'id': 2, 'name': 'name_%s2'}, {'id': 3, 'name': 'name_3'}, {'id': 4, 'name': 'name_%s4'}, {'id': 5, 'name': 'name_5'}, {'id': 6, 'name': 'name_6'}, {'id': 7, 'name': 'name_7'}, {'id': 8, 'name': 'name_8'}, {'id': 9, 'name': 'name_9'}]

# search 函数搜索的最近向量
["['(distance: 535.2747802734375, id: 2)', '(distance: 552.648681640625, id: 1)', '(distance: 558.37158203125, id: 7)']"]
未经允许不得转载:一亩三分地 » 向量数据库 milvus 使用
评论 (0)

6 + 8 =