import sys
from config import MILVUS_HOST, MILVUS_PORT, VECTOR_DIMENSION, METRIC_TYPE
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
from logs import LOGGER


class MilvusHelper:
    """
    MilvusHelper class to manage a Milvus collection.

    Every method logs failures through ``LOGGER`` and then terminates the
    process with ``sys.exit(1)`` rather than raising an exception.

    Args:
        host (`str`):
            Milvus server host.
        port (`str|int`):
            Milvus server port.
    """

    def __init__(self, host=MILVUS_HOST, port=MILVUS_PORT):
        """Connect to the Milvus server at ``host``:``port``."""
        try:
            # Currently selected collection; assigned by set_collection / create_collection.
            self.collection = None
            connections.connect(host=host, port=port)
            # Log the address actually used, not the config defaults.
            LOGGER.debug(f"Successfully connect to Milvus with IP:{host} and PORT:{port}")
        except Exception as e:
            LOGGER.error(f"Failed to connect Milvus: {e}")
            sys.exit(1)

    def set_collection(self, collection_name):
        """Select ``collection_name`` as the collection later operations act on."""
        try:
            self.collection = Collection(name=collection_name)
        except Exception as e:
            LOGGER.error(f"Failed to set Milvus collection {collection_name}: {e}")
            sys.exit(1)

    def has_collection(self, collection_name):
        """Return whether Milvus has a collection named ``collection_name``."""
        try:
            return utility.has_collection(collection_name)
        except Exception as e:
            LOGGER.error(f"Failed to check Milvus collection {collection_name}: {e}")
            sys.exit(1)

    def create_collection(self, collection_name):
        """Create ``collection_name`` (with its vector index) if missing, then select it.

        Schema:
            - ``im_hash``: VARCHAR(500) primary key, supplied by the caller (``auto_id=False``).
            - ``product_id``: VARCHAR(500).
            - ``im_vector``: FLOAT_VECTOR of dimension ``VECTOR_DIMENSION``.

        Returns:
            "OK"
        """
        try:
            if self.has_collection(collection_name):
                self.set_collection(collection_name)
                return "OK"

            im_hash = FieldSchema(name='im_hash', dtype=DataType.VARCHAR, description='id to image',
                                  max_length=500, is_primary=True, auto_id=False)
            product_id = FieldSchema(name='product_id', dtype=DataType.VARCHAR,
                                     description='product_id to image',
                                     max_length=500, is_primary=False, auto_id=False)
            im_vector = FieldSchema(name="im_vector", dtype=DataType.FLOAT_VECTOR,
                                    description="image vectors",
                                    dim=VECTOR_DIMENSION, is_primary=False)
            schema = CollectionSchema(fields=[im_hash, product_id, im_vector],
                                      description="collection_name: " + collection_name)
            self.collection = Collection(name=collection_name, schema=schema)
            self.create_index(collection_name)
            LOGGER.debug(f"Create Milvus collection: {collection_name}")
            return "OK"
        except Exception as e:
            LOGGER.error(f"Failed to create Milvus collection {collection_name}: {e}")
            sys.exit(1)

    def create_partition(self, partition_name):
        """Create ``partition_name`` in the selected collection if it does not exist yet.

        A ``None`` partition name is a no-op. A collection must already be
        selected (see ``set_collection``).

        Returns:
            "OK"
        """
        try:
            if partition_name is not None and not self.collection.has_partition(partition_name):
                self.collection.create_partition(partition_name)
                LOGGER.debug(f"Create Milvus partition: {partition_name}")
            return "OK"
        except Exception as e:
            LOGGER.error(f"Failed to create Milvus partition {partition_name}: {e}")
            sys.exit(1)

    def insert(self, collection_name, partition_name, im_hash, product_id, im_vector):
        """Batch-insert rows into ``collection_name``, optionally into ``partition_name``.

        The data is passed column-wise in schema order (``im_hash``,
        ``product_id``, ``im_vector``); presumably each is a list with one
        entry per row -- TODO confirm against callers.

        Returns:
            The primary keys of the inserted rows.
        """
        try:
            data = [im_hash, product_id, im_vector]
            self.set_collection(collection_name)
            self.create_partition(partition_name)
            mr = self.collection.insert(data, partition_name)
            ids = mr.primary_keys
            # Presumably loaded here so search_vectors can query the new rows.
            self.collection.load()
            LOGGER.debug(
                f"Insert vectors to Milvus in collection: {collection_name} with {len(im_vector)} rows")
            return ids
        except Exception as e:
            LOGGER.error(f"Failed to load data to Milvus: {e}")
            sys.exit(1)

    def create_index(self, collection_name):
        """Create an IVF_FLAT index (nlist=16384, ``METRIC_TYPE``) on ``im_vector``.

        Returns:
            The status object returned by ``Collection.create_index``.
        """
        try:
            self.set_collection(collection_name)
            default_index = {"index_type": "IVF_FLAT", "metric_type": METRIC_TYPE, "params": {"nlist": 16384}}
            status = self.collection.create_index(field_name="im_vector", index_params=default_index)
            # NOTE(review): assumes the pymilvus version in use returns an object with
            # `.code` / `.message` (a Status); a zero code means success.
            if not status.code:
                LOGGER.debug(
                    f"Successfully create index in collection:{collection_name} with param:{default_index}")
                return status
            raise Exception(status.message)
        except Exception as e:
            LOGGER.error(f"Failed to create index: {e}")
            sys.exit(1)

    def delete_collection(self, collection_name):
        """Drop ``collection_name`` from Milvus.

        Returns:
            "ok"
        """
        try:
            self.set_collection(collection_name)
            self.collection.drop()
            # Don't keep a handle to a collection that no longer exists.
            self.collection = None
            LOGGER.debug("Successfully drop collection!")
            return "ok"
        except Exception as e:
            LOGGER.error(f"Failed to drop collection: {e}")
            sys.exit(1)

    def delete_record(self, collection_name, partition_name, expr):
        """Delete the records matching the boolean expression ``expr``.

        Only ``partition_name`` is searched when it is given.

        Returns:
            The result object of ``Collection.delete``.
        """
        try:
            self.set_collection(collection_name)
            res = self.collection.delete(expr, partition_name)
            LOGGER.debug("Successfully delete record")
            return res
        except Exception as e:
            LOGGER.error(f"Failed to delete record: {e}")
            sys.exit(1)

    def search_vectors(self, collection_name, partition_names, im_vector, top_k):
        """Search ``im_vector`` in ``collection_name`` and return the ``top_k`` nearest hits.

        Hits carry the ``product_id`` output field. ``im_vector`` is passed
        straight to ``Collection.search`` as its query data (presumably a list
        of query vectors -- confirm with callers).
        """
        try:
            self.set_collection(collection_name)
            search_params = {"metric_type": METRIC_TYPE, "params": {"nprobe": 16}}
            res = self.collection.search(
                im_vector,
                anns_field="im_vector",
                param=search_params,
                limit=top_k,
                expr=None,
                partition_names=partition_names,
                output_fields=["product_id"])
            LOGGER.debug(f"Successfully search in collection: {res}")
            return res
        except Exception as e:
            LOGGER.error(f"Failed to search vectors in Milvus: {e}")
            sys.exit(1)

    def count(self, collection_name):
        """Return the number of entities in ``collection_name``."""
        try:
            self.set_collection(collection_name)
            num = self.collection.num_entities
            LOGGER.debug(f"Successfully get the num:{num} of the collection:{collection_name}")
            return num
        except Exception as e:
            LOGGER.error(f"Failed to count vectors in Milvus: {e}")
            sys.exit(1)