AI Agent 六大趋势怎么看
2026/5/15 21:33:09
在大语言模型(LLM)、计算机视觉、推荐系统等人工智能应用落地过程中,非结构化数据(文本、图片、音频、视频)的相似性检索成为核心需求 —— 这类数据需先通过模型转化为高维向量,再通过向量相似性计算实现快速匹配。Milvus 作为开源的分布式向量数据库,专为高维向量的存储、索引与相似性检索设计,能够解决传统关系型数据库、键值数据库在向量处理上的性能瓶颈。企业与开发者开展 “Milvus 向量数据库使用尝试” 的需求,源于传统数据存储方式在向量处理场景的痛点,以及对 “高效、可扩展、低成本” 实现非结构化数据检索的核心诉求。
随着 AI 应用的普及,高维向量数据(如文本的 Embedding 向量、图片的特征向量)的规模呈指数级增长,传统数据存储方式在处理这类数据时,暴露出难以逾越的短板:
Milvus 作为专为向量检索设计的数据库,从底层架构上解决了传统存储方式的痛点。开发者与企业开展 “使用尝试”,本质是验证其在向量存储、检索、扩展等方面的能力,探索将其融入 AI 应用的可行性,核心价值体现在:
开发者与企业开展 Milvus 使用尝试,并非单纯的技术验证,而是为了:
综上,Milvus 向量数据库使用尝试的需求,源于 AI 应用对高维向量数据高效处理的迫切诉求,以及传统数据存储方式在向量场景的全面失效。这一尝试不仅是开发者探索新技术的过程,更是企业将 AI 应用从 “演示级原型” 推向 “生产级落地” 的关键步骤 —— 通过验证 Milvus 的能力,解决非结构化数据检索的性能瓶颈,为 AI 应用的规模化部署提供核心数据存储支撑。
pip install pymilvusimport sys import os import random import numpy as np from pymilvus import ( connections, db, Collection, FieldSchema, CollectionSchema, DataType, utility ) # 解决Windows控制台中文乱码问题 if sys.platform == 'win32': # 设置标准输出编码为UTF-8 if hasattr(sys.stdout, 'reconfigure'): sys.stdout.reconfigure(encoding='utf-8') if hasattr(sys.stderr, 'reconfigure'): sys.stderr.reconfigure(encoding='utf-8') # 设置环境变量 os.environ['PYTHONIOENCODING'] = 'utf-8'host = "xxx" port = "19530" database_name = "test_db_01" collection_name = "test_collection_01" # 连接到Milvus print(f"正在连接到 Milvus {host}:{port}...") connections.connect( alias="default", host=host, port=port ) print("连接成功!\n")databases = db.list_database()db.create_database(database_name)db.using_database(database_name) # 定义Collection Schema print(f"正在创建Collection {collection_name}...") # 定义字段 fields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False), FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=64), FieldSchema(name="a", dtype=DataType.INT64), FieldSchema(name="b", dtype=DataType.FLOAT) ] # 创建Schema schema = CollectionSchema( fields=fields, description="测试Collection,包含64维向量和标量字段a、b" ) # 检查Collection是否存在,如果存在则删除 if utility.has_collection(collection_name): print(f"Collection {collection_name} 已存在,正在删除...") utility.drop_collection(collection_name) # 创建Collection collection = Collection( name=collection_name, schema=schema )# 生成随机数据并插入 print("正在生成随机数据并插入...") data_id = random.randint(1, 1000000) data_vector = np.random.random((1, 64)).tolist()[0] # 64维随机向量 data_a = random.randint(1, 100) data_b = random.uniform(0.0, 100.0) data = [ [data_id], [data_vector], [data_a], [data_b] ] print(f"插入数据:") print(f" id: {data_id}") print(f" vector: {data_vector[:5]}... (显示前5维)") print(f" a: {data_a}") print(f" b: {data_b:.2f}") collection.insert(data) print("数据插入成功!\n")print("正在为向量字段创建索引...") index_params = { "metric_type": "L2", "index_type": "IVF_FLAT", "params": {"nlist": 128} } collection.create_index( field_name="vector", index_params=index_params ) print("索引创建成功!\n")# 加载Collection到内存 print("正在加载Collection到内存...") collection.load() print("加载成功!\n") # 进行查询 print("正在查询数据...") # 使用向量搜索 search_vector = [data_vector] # 使用插入的向量进行搜索 search_params = { "metric_type": "L2", "params": {"nprobe": 10} } results = collection.search( data=search_vector, anns_field="vector", param=search_params, limit=10, output_fields=["id", "a", "b"] ) print("查询结果:") for hits in results: for hit in hits: print(f" id: {hit.id}, 距离: {hit.distance:.4f}, a: {hit.entity.get('a')}, b: {hit.entity.get('b'):.2f}") # 也可以使用表达式查询 print("\n使用表达式查询:") query_result = collection.query( expr=f"id == {data_id}", output_fields=["id", "a", "b", "vector"] ) for result in query_result: print(f" id: {result['id']}") print(f" a: {result['a']}") print(f" b: {result['b']:.2f}") print(f" vector: {result['vector'][:5]}... (显示前5维)")