123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- import sys
- from config import MILVUS_HOST, MILVUS_PORT, VECTOR_DIMENSION, METRIC_TYPE
- from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
- from logs import LOGGER
class MilvusHelper:
    """
    MilvusHelper class to manage the Milvus Collection.

    Every method traps ``Exception``, reports it through ``LOGGER.error`` and
    then calls ``sys.exit(1)``; a failure therefore surfaces to the caller as
    ``SystemExit`` rather than as the original exception.

    Args:
        host (`str`):
            Milvus server Host.
        port (`str|int`):
            Milvus server port.
    """

    def __init__(self, host=MILVUS_HOST, port=MILVUS_PORT):
        # Collection handle most recently bound by set_collection() or
        # create_collection(); None until one of them has run.
        self.collection = None
        try:
            connections.connect(host=host, port=port)
            # Log the endpoint actually used, not the configured defaults.
            LOGGER.debug(f"Successfully connect to Milvus with IP:{host} and PORT:{port}")
        except Exception as e:
            LOGGER.error(f"Failed to connect Milvus: {e}")
            sys.exit(1)

    def set_collection(self, collection_name):
        """Bind ``self.collection`` to the collection named ``collection_name``."""
        try:
            self.collection = Collection(name=collection_name)
        except Exception as e:
            LOGGER.error(f"Failed to set Milvus collection: {e}")
            sys.exit(1)

    def has_collection(self, collection_name):
        """Return the result of ``utility.has_collection`` for ``collection_name``."""
        try:
            return utility.has_collection(collection_name)
        except Exception as e:
            LOGGER.error(f"Failed to check Milvus collection: {e}")
            sys.exit(1)

    def create_collection(self, collection_name):
        """Create the collection (and its index) if missing, else bind to it.

        The schema holds three fields: ``im_hash`` (VARCHAR primary key,
        not auto-generated), ``product_id`` (VARCHAR) and ``im_vector``
        (FLOAT_VECTOR of ``VECTOR_DIMENSION`` dimensions).

        Returns:
            The string ``"OK"`` in both the created and already-existing cases.
        """
        try:
            if not self.has_collection(collection_name):
                im_hash = FieldSchema(
                    name='im_hash',
                    dtype=DataType.VARCHAR,
                    description='id to image',
                    max_length=500,
                    is_primary=True,
                    auto_id=False,
                )
                product_id = FieldSchema(
                    name='product_id',
                    dtype=DataType.VARCHAR,
                    description='product_id to image',
                    max_length=500,
                    is_primary=False,
                    auto_id=False,
                )
                im_vector = FieldSchema(
                    name="im_vector",
                    dtype=DataType.FLOAT_VECTOR,
                    description="image vectors",
                    dim=VECTOR_DIMENSION,
                    is_primary=False,
                )
                schema = CollectionSchema(
                    fields=[im_hash, product_id, im_vector],
                    description="collection_name: " + collection_name,
                )
                self.collection = Collection(name=collection_name, schema=schema)
                self.create_index(collection_name)
                LOGGER.debug(f"Create Milvus collection: {collection_name}")
            else:
                self.set_collection(collection_name)
            return "OK"
        except Exception as e:
            LOGGER.error(f"Failed to create Milvus collection: {e}")
            sys.exit(1)

    def create_partition(self, partition_name):
        """Create ``partition_name`` on the currently bound collection if absent.

        Does nothing when ``partition_name`` is ``None``. Reads
        ``self.collection``, so a collection must already have been bound
        (otherwise the resulting error is logged and the process exits).

        Returns:
            The string ``"OK"``.
        """
        try:
            if partition_name is not None and not self.collection.has_partition(partition_name):
                self.collection.create_partition(partition_name)
                LOGGER.debug(f"Create Milvus partition: {partition_name}")
            return "OK"
        except Exception as e:
            LOGGER.error(f"Failed to create Milvus partition: {e}")
            sys.exit(1)

    def insert(self, collection_name, partition_name, im_hash, product_id, im_vector):
        """Insert column data into a collection partition and load the collection.

        ``im_hash``, ``product_id`` and ``im_vector`` are passed to Milvus as
        one column list in that order; presumably they are equal-length
        sequences matching the schema fields of the same names -- confirm
        against callers.

        Returns:
            ``primary_keys`` of the insert result.
        """
        try:
            data = [im_hash, product_id, im_vector]
            self.set_collection(collection_name)
            # Ensures the target partition exists (no-op when partition_name is None).
            self.create_partition(partition_name)
            mr = self.collection.insert(data, partition_name)
            ids = mr.primary_keys
            self.collection.load()
            LOGGER.debug(
                f"Insert vectors to Milvus in collection: {collection_name} with {len(im_vector)} rows")
            return ids
        except Exception as e:
            LOGGER.error(f"Failed to load data to Milvus: {e}")
            sys.exit(1)

    def create_index(self, collection_name):
        """Create an IVF_FLAT index (nlist=16384, ``METRIC_TYPE``) on ``im_vector``.

        Returns:
            The status object returned by ``Collection.create_index`` when its
            ``code`` is falsy; a truthy ``code`` is treated as a failure.
        """
        try:
            self.set_collection(collection_name)
            default_index = {"index_type": "IVF_FLAT", "metric_type": METRIC_TYPE, "params": {"nlist": 16384}}
            status = self.collection.create_index(field_name="im_vector", index_params=default_index)
            if status.code:
                # Routed through the handler below: logged, then exit.
                raise Exception(status.message)
            LOGGER.debug(
                f"Successfully create index in collection:{collection_name} with param:{default_index}")
            return status
        except Exception as e:
            LOGGER.error(f"Failed to create index: {e}")
            sys.exit(1)

    def delete_collection(self, collection_name):
        """Drop the collection named ``collection_name``.

        Returns:
            The string ``"ok"``.
        """
        try:
            self.set_collection(collection_name)
            self.collection.drop()
            LOGGER.debug("Successfully drop collection!")
            return "ok"
        except Exception as e:
            LOGGER.error(f"Failed to drop collection: {e}")
            sys.exit(1)

    def delete_record(self, collection_name, partition_name, expr):
        """Delete the records matching ``expr`` from a collection partition.

        Returns:
            Whatever ``Collection.delete`` returns.
        """
        try:
            self.set_collection(collection_name)
            res = self.collection.delete(expr, partition_name)
            LOGGER.debug("Successfully delete record")
            return res
        except Exception as e:
            LOGGER.error(f"Failed to delete record: {e}")
            sys.exit(1)

    def search_vectors(self, collection_name, partition_names, im_vector, top_k):
        """Search ``im_vector`` against the ``im_vector`` field of a collection.

        Uses ``METRIC_TYPE`` with ``nprobe=16``, no filter expression, limits
        each query to ``top_k`` hits and requests ``product_id`` as an output
        field.

        Returns:
            Whatever ``Collection.search`` returns.
        """
        try:
            self.set_collection(collection_name)
            search_params = {"metric_type": METRIC_TYPE, "params": {"nprobe": 16}}
            res = self.collection.search(
                im_vector,
                anns_field="im_vector",
                param=search_params,
                limit=top_k,
                expr=None,
                partition_names=partition_names,
                output_fields=["product_id"],
            )
            LOGGER.debug(f"Successfully search in collection: {res}")
            return res
        except Exception as e:
            LOGGER.error(f"Failed to search vectors in Milvus: {e}")
            sys.exit(1)

    def count(self, collection_name):
        """Return ``num_entities`` of the collection named ``collection_name``."""
        try:
            self.set_collection(collection_name)
            num = self.collection.num_entities
            LOGGER.debug(f"Successfully get the num:{num} of the collection:{collection_name}")
            return num
        except Exception as e:
            LOGGER.error(f"Failed to count vectors in Milvus: {e}")
            sys.exit(1)
|