12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781 |
import datetime
import io
import os
import time
import traceback
from typing import List, Tuple, Optional, Union

import faiss
import numpy as np
import torch
import torch.nn.functional as F
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
from pymongo import MongoClient
from scipy import ndimage
from torchvision.models import ResNet50_Weights
class ImageSearchEngine:
    """Image similarity search built on ResNet50 features, FAISS and MongoDB.

    Feature vectors are persisted in a MongoDB collection together with a
    numeric ``faiss_id`` and the caller's ``product_id``. The FAISS index
    lives in memory only and is rebuilt from MongoDB when the engine is
    constructed.
    """

    # NOTE(security): this default embeds credentials in source. It is kept so
    # existing deployments keep working; prefer passing ``mongo_uri`` or
    # setting the environment variable named by ``MONGO_URI_ENV``.
    DEFAULT_MONGO_URI = "mongodb://root:faiss_image_search@localhost:27017/"
    MONGO_URI_ENV = "IMAGE_SEARCH_MONGO_URI"

    # Blend weights used when combining the two feature extractors.
    MULTI_SCALE_WEIGHT = 0.6
    SLIDING_WINDOW_WEIGHT = 0.4

    def __init__(self, mongo_uri: Optional[str] = None):
        """Connect to MongoDB, load the CNN backbone and rebuild the index.

        Args:
            mongo_uri: MongoDB connection string. When omitted, the value of
                the ``IMAGE_SEARCH_MONGO_URI`` environment variable is used,
                falling back to ``DEFAULT_MONGO_URI``.
        """
        if mongo_uri is None:
            mongo_uri = os.environ.get(self.MONGO_URI_ENV, self.DEFAULT_MONGO_URI)

        # MongoDB storage for vectors and id mappings.
        self.mongo_client = MongoClient(mongo_uri)
        self.mongo_db = self.mongo_client["faiss_index"]
        self.mongo_collection = self.mongo_db["mat_vectors"]
        self.mongo_collection.create_index([("product_id", 1)], unique=True)
        self.mongo_collection.create_index([("faiss_id", 1)], unique=True)

        # Highest faiss_id handed out so far; new ids are this value + 1.
        self.faiss_id_max = 0

        # The GPU (if any) is used for the PyTorch model only.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"使用设备: {self.device}")

        # Shared preprocessing: grayscale replicated to 3 channels, tensor
        # conversion, then normalization with the ImageNet statistics.
        self.base_transform = transforms.Compose([
            transforms.Grayscale(num_output_channels=3),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

        # Pretrained ResNet50 with the final fully connected layer removed.
        backbone = models.resnet50(weights=ResNet50_Weights.IMAGENET1K_V2)
        self.model = torch.nn.Sequential(*list(backbone.children())[:-1])
        self.model = self.model.to(self.device)
        self.model.eval()

        # 2048 is the feature dimension configured for the index. IndexIDMap
        # wraps the flat L2 index so vectors can be added/removed by id.
        self.dimension = 2048
        self.index = faiss.IndexIDMap(faiss.IndexFlatL2(self.dimension))

        if self._load_index():
            print("成功加载现有索引")

    def _use_half_precision(self) -> bool:
        """Return True when fp16 inference should be used (CUDA only)."""
        return self.device.type == "cuda"

    def _autocast(self):
        """Return an autocast context that is only active on CUDA."""
        return torch.autocast(
            device_type=self.device.type,
            enabled=self._use_half_precision(),
        )

    def _combine_features(self, multi_scale: torch.Tensor, sliding_window: torch.Tensor) -> torch.Tensor:
        """Blend the two feature vectors and L2-normalize the result."""
        combined = (
            multi_scale * self.MULTI_SCALE_WEIGHT
            + sliding_window * self.SLIDING_WINDOW_WEIGHT
        )
        return F.normalize(combined, p=2, dim=0)

    @staticmethod
    def _pool_window_features(feature_batches: List[torch.Tensor]) -> torch.Tensor:
        """Average per-window features into a single vector.

        Each batch is flattened to ``(batch, features)`` before averaging over
        all windows, so the result keeps one value per feature channel.
        """
        per_window = torch.cat(
            [batch.flatten(start_dim=1) for batch in feature_batches], dim=0
        )
        return per_window.mean(dim=0)

    def _batch_generator(self, cursor, batch_size):
        """Yield lists of up to ``batch_size`` items taken from ``cursor``.

        Args:
            cursor: Any iterable (here a MongoDB cursor).
            batch_size: Number of items per yielded list; the last list may
                be shorter.
        """
        pending = []
        for item in cursor:
            pending.append(item)
            if len(pending) == batch_size:
                yield pending
                pending = []
        if pending:
            yield pending

    def _process_image(self, image_path: str) -> Optional[torch.Tensor]:
        """Load an image from disk and compute its combined feature vector.

        Args:
            image_path: Path of the image file.

        Returns:
            The L2-normalized combined feature, or None if any step fails.
        """
        try:
            # Close the file handle as soon as the pixels are decoded;
            # convert() returns an independent RGB image.
            with Image.open(image_path) as source:
                image = source.convert("RGB")

            start_ms_time = time.time()
            multi_scale_features = self._extract_multi_scale_features(image)
            end_ms_time = time.time()
            print(f"提取多尺度特征耗时: {end_ms_time - start_ms_time} s")
            if multi_scale_features is None:
                return None

            start_sw_time = time.time()
            sliding_window_features = self._extract_sliding_window_features(image)
            end_sw_time = time.time()
            print(f"提取滑动窗口耗时: {end_sw_time - start_sw_time} s")
            if sliding_window_features is None:
                return None

            return self._combine_features(multi_scale_features, sliding_window_features)
        except Exception as e:
            print(f"处理图片时出错: {e}")
            return None

    def _extract_multi_scale_features(self, image: "Image.Image") -> Optional[torch.Tensor]:
        """Extract features at several scales derived from the image size.

        Args:
            image: PIL image.

        Returns:
            Weighted sum of the per-scale features, or None on failure.
        """
        try:
            orig_w, orig_h = image.size
            aspect_ratio = orig_w / orig_h

            # Base size follows the longest edge, capped at 3000 px.
            base_size = min(max(orig_w, orig_h), 3000)

            # Four geometrically spaced scales between 1x and 2x of the base
            # size, clamped to [224, 3000] and de-duplicated.
            min_size = 224
            num_scales = 4
            scale_factors = np.logspace(0, 1, num_scales, base=2)
            window_sizes = sorted({
                min(max(int(base_size * factor), min_size), 3000)
                for factor in scale_factors
            })

            # Stretch the sizes for strongly non-square images.
            if aspect_ratio > 1.5:
                window_sizes = [int(s * aspect_ratio) for s in window_sizes]
            elif aspect_ratio < 0.67:
                window_sizes = [int(s / aspect_ratio) for s in window_sizes]

            # The square base image uses the largest power of two that does
            # not exceed the base size.
            base_size = 2 ** int(np.log2(base_size))
            to_base = transforms.Compose([
                transforms.Resize(
                    (base_size, base_size),
                    interpolation=transforms.InterpolationMode.LANCZOS,
                ),
                self.base_transform,
            ])
            img_base = to_base(image).unsqueeze(0).to(self.device)

            # fp16 is only used on CUDA; half-precision convolutions are not
            # reliably available on CPU.
            if self._use_half_precision():
                self.model.half()
                img_base = img_base.half()

            features = []
            for size in window_sizes:
                # NOTE(review): interpolate() takes (height, width); for wide
                # images this passes the scaled width first. Left unchanged
                # because altering it would change every stored vector.
                if aspect_ratio > 1:
                    target_size = (int(size * aspect_ratio), size)
                else:
                    target_size = (size, int(size / aspect_ratio))

                # 'area' for downsampling, 'bicubic' for upsampling.
                # align_corners is only accepted by interpolating modes, so it
                # must not be passed together with 'area'.
                if size < base_size:
                    img_tensor = F.interpolate(img_base, size=target_size, mode="area")
                else:
                    img_tensor = F.interpolate(
                        img_base,
                        size=target_size,
                        mode="bicubic",
                        align_corners=False,
                    )

                # Optional hook: applied only if the instance defines it.
                if hasattr(self, 'adaptive_normalize'):
                    img_tensor = self.adaptive_normalize(img_tensor)

                with torch.no_grad(), self._autocast():
                    feature = self.model(img_tensor)
                features.append(feature.squeeze().float())

            # Scales closer to the base size get larger weights.
            size_diffs = [abs(size - base_size) for size in window_sizes]
            weights = 1 / (torch.tensor(size_diffs, device=self.device) + 1e-6)
            weights = weights / weights.sum()

            return torch.stack([f * w for f, w in zip(features, weights)]).sum(dim=0)
        except Exception as e:
            print(f"智能特征提取失败: {e}")
            return None

    def _extract_multi_scale_features_bak(self, image: "Image.Image") -> Optional[torch.Tensor]:
        """Previous multi-scale extractor using a fixed list of square sizes.

        NOTE(review): the input stays float32, so this fails (and returns
        None) once the model has been switched to fp16 by the active
        extractors.

        Args:
            image: PIL image.

        Returns:
            Weighted sum of per-scale features, or None on failure.
        """
        try:
            features_list = []
            window_sizes = [256, 512, 1024, 1560, 2048, 2560, 3000]
            for size in window_sizes:
                transform = transforms.Compose([
                    transforms.Resize(
                        (size, size),
                        interpolation=transforms.InterpolationMode.LANCZOS,
                    ),
                    self.base_transform,
                ])
                img_tensor = transform(image).unsqueeze(0).to(self.device)

                with torch.no_grad():
                    feature = self.model(img_tensor)
                features_list.append(feature.squeeze())

            # Linearly increasing weights: larger scales count more.
            weights = torch.linspace(1, 2, len(features_list)).to(self.device)
            weights = weights / weights.sum()

            weighted_features = torch.stack([f * w for f, w in zip(features_list, weights)])
            return weighted_features.sum(dim=0)
        except Exception as e:
            print(f"提取多尺度特征时出错: {e}")
            return None

    def _extract_sliding_window_features(self, image: "Image.Image") -> Optional[torch.Tensor]:
        """Extract features from square crops slid over the resized image.

        The per-window outputs are averaged per feature channel. Earlier
        versions collapsed them to a single scalar, so vectors stored before
        this fix should be re-indexed to stay comparable with new queries.

        Args:
            image: PIL image.

        Returns:
            Mean feature vector over all windows, or None on failure.
        """
        try:
            orig_w, orig_h = image.size
            aspect_ratio = orig_w / orig_h
            max_dim = max(orig_w, orig_h)

            # Powers of two near 10%, 50% and 100% of the longest edge,
            # restricted to the supported preset sizes.
            candidates = {
                int(2 ** np.round(np.log2(max_dim * 0.1))),
                int(2 ** np.floor(np.log2(max_dim * 0.5))),
                int(2 ** np.ceil(np.log2(max_dim))),
            }
            window_sizes = sorted(candidates & {256, 512, 1024, 2048, 3000})
            if not window_sizes:
                print("滑动窗口特征提取失败: 图片尺寸没有匹配的窗口尺寸")
                return None

            # Relative stride per window size.
            stride_ratios = {256: 0.5, 512: 0.4, 1024: 0.3, 2048: 0.2, 3000: 0.15}

            # Resize so the shorter edge equals the largest window size.
            max_win_size = window_sizes[-1]
            if aspect_ratio > 1:
                base_w, base_h = int(max_win_size * aspect_ratio), max_win_size
            else:
                base_w, base_h = max_win_size, int(max_win_size / aspect_ratio)

            transform = transforms.Compose([
                transforms.Resize(
                    (base_h, base_w),
                    interpolation=transforms.InterpolationMode.LANCZOS,
                ),
                self.base_transform,
            ])
            base_img = transform(image).to(self.device)

            # fp16 is only used on CUDA.
            if self._use_half_precision():
                self.model.half()
                base_img = base_img.half()

            height, width = base_img.shape[1:]
            max_windows = 32  # cap on windows per size, to bound memory use
            batch_size = 8    # windows per forward pass

            all_features = []
            for win_size in window_sizes:
                stride = int(win_size * stride_ratios.get(win_size, 0.3))
                num_h = (height - win_size) // stride + 1
                num_w = (width - win_size) // stride + 1

                # Too many windows: fall back to a much larger stride.
                if num_h * num_w > max_windows:
                    stride = int(np.sqrt(height * width * win_size ** 2 / max_windows))
                    num_h = (height - win_size) // stride + 1
                    num_w = (width - win_size) // stride + 1

                windows = [
                    base_img[
                        :,
                        i * stride:i * stride + win_size,
                        j * stride:j * stride + win_size,
                    ]
                    for i in range(num_h)
                    for j in range(num_w)
                ]
                if not windows:
                    continue

                with torch.no_grad(), self._autocast():
                    for start in range(0, len(windows), batch_size):
                        batch = torch.stack(windows[start:start + batch_size])
                        features = self.model(batch)
                        # Move results off the device right away.
                        all_features.append(features.cpu().float())

            if not all_features:
                return None

            return self._pool_window_features(all_features).to(self.device)
        except Exception as e:
            print(f"滑动窗口特征提取失败: {e}")
            return None

    def _extract_sliding_window_features_bak(self, image: "Image.Image") -> Optional[torch.Tensor]:
        """Previous sliding-window extractor using fixed sizes, one crop at a time.

        NOTE(review): the input stays float32, so this fails (and returns
        None) once the model has been switched to fp16 by the active
        extractors.

        Args:
            image: PIL image.

        Returns:
            Mean of all window features, or None on failure or when no
            window fits.
        """
        try:
            window_sizes = [256, 512, 1024, 1560, 2048, 2560, 3000]
            stride_ratio = 0.25
            aspect_ratio = image.size[0] / image.size[1]

            features_list = []
            for window_size in window_sizes:
                # Resize keeping the aspect ratio; shorter edge == window size.
                if aspect_ratio > 1:
                    new_width = int(window_size * aspect_ratio)
                    new_height = window_size
                else:
                    new_width = window_size
                    new_height = int(window_size / aspect_ratio)

                transform = transforms.Compose([
                    transforms.Resize(
                        (new_height, new_width),
                        interpolation=transforms.InterpolationMode.LANCZOS,
                    ),
                    self.base_transform,
                ])
                img_tensor = transform(image)
                stride = int(window_size * stride_ratio)

                for top in range(0, img_tensor.size(1) - window_size + 1, stride):
                    for left in range(0, img_tensor.size(2) - window_size + 1, stride):
                        window = img_tensor[
                            :, top:top + window_size, left:left + window_size
                        ].unsqueeze(0).to(self.device)

                        with torch.no_grad():
                            feature = self.model(window)
                        features_list.append(feature.squeeze())

            if not features_list:
                return None

            return torch.stack(features_list).mean(dim=0)
        except Exception as e:
            print(f"提取滑动窗口特征时出错: {e}")
            return None

    def extract_features(self, img: "Image.Image") -> np.ndarray:
        """Compute the combined feature vector for an in-memory image.

        Args:
            img: PIL image.

        Returns:
            The L2-normalized combined feature as a NumPy array.

        Raises:
            ValueError: If either extractor fails to produce a feature.
        """
        try:
            multi_scale_features = self._extract_multi_scale_features(img)
            if multi_scale_features is None:
                raise ValueError("无法提取多尺度特征")

            sliding_window_features = self._extract_sliding_window_features(img)
            if sliding_window_features is None:
                raise ValueError("无法提取滑动窗口特征")

            combined_feature = self._combine_features(
                multi_scale_features, sliding_window_features
            )
            return combined_feature.cpu().numpy()
        except Exception as e:
            print(f"特征提取失败: {e}")
            raise

    def add_image_from_url(self, image_path: str, product_id: str) -> bool:
        """Index one image and persist its vector.

        Args:
            image_path: Path of the image file to index.
            product_id: Product id associated with the image.

        Returns:
            True on success, False on any failure.
        """
        try:
            feature = self._process_image(image_path)
            if feature is None:
                print("无法提取特征")
                return False

            feature_np = feature.cpu().numpy().reshape(1, -1).astype('float32')

            idx = self.faiss_id_max + 1
            print(f"当前: idx {idx}")
            if not isinstance(idx, int) or idx <= 0:
                print("ID生成失败")
                return False
            self.faiss_id_max = idx

            record = {
                "faiss_id": idx,
                "product_id": product_id,
                "vector": feature_np.flatten().tolist(),
                "created_at": datetime.datetime.now(datetime.timezone.utc),
            }
            self.mongo_collection.insert_one(record)

            try:
                self.index.add_with_ids(feature_np, np.array([idx], dtype='int64'))
            except Exception:
                # Keep MongoDB and the index consistent: drop the record that
                # has no matching vector in the index.
                self.mongo_collection.delete_one({"faiss_id": idx})
                raise

            print(f"已添加图片: product_id: {product_id}, faiss_id: {idx}")
            return True
        except Exception as e:
            print(f"添加图片时出错: {e}")
            return False

    def get_product_id_by_faiss_id(self, faiss_id: int) -> Optional[str]:
        """Look up the product id stored for a FAISS id.

        Args:
            faiss_id: Id of a vector in the FAISS index.

        Returns:
            The product id as a string, or None if the id is negative, not
            found, lacks a product id, or the lookup fails.
        """
        try:
            faiss_id = int(faiss_id)
            if faiss_id < 0:
                print(f"无效的 faiss_id: {faiss_id}")
                return None

            record = self.mongo_collection.find_one({"faiss_id": faiss_id})
            if record is None:
                print(f"未找到 faiss_id 为 {faiss_id} 的记录")
                return None

            product_id = record.get("product_id")
            if product_id is None:
                print(f"记录中缺少 product_id 字段: {record}")
                return None

            return str(product_id)
        except Exception as e:
            print(f"查询 faiss_id 为 {faiss_id} 的记录时出错: {e}")
            return None

    def search(self, image_path: Optional[str] = None, top_k: int = 5) -> List[Tuple[str, float]]:
        """Find the products whose images are most similar to the query image.

        Args:
            image_path: Path of the query image; None yields no results.
            top_k: Maximum number of results.

        Returns:
            ``(product_id, similarity)`` pairs in the order FAISS returned
            them, where similarity is ``1 / (1 + distance)``. Empty on
            failure, for an empty index, or when ``top_k`` is not positive.
        """
        try:
            if image_path is None:
                print("搜索图片下载失败!")
                return []

            # FAISS rejects k == 0, so bail out before doing any work.
            k = min(top_k, self.index.ntotal)
            if k <= 0:
                return []

            feature = self._process_image(image_path)
            if feature is None:
                print("无法提取查询图片的特征")
                return []

            feature_np = feature.cpu().numpy().reshape(1, -1).astype('float32')

            start_vector_time = time.time()
            distances, indices = self.index.search(feature_np, k)
            end_vector_time = time.time()
            print(f"搜索vector耗时: {end_vector_time - start_vector_time}")

            start_other_time = time.time()
            results = []
            for faiss_id, dist in zip(indices[0], distances[0]):
                # FAISS pads missing results with id -1.
                if faiss_id == -1:
                    continue

                # Plain float so callers can serialize the score directly.
                similarity = float(1.0 / (1.0 + dist))

                print(f"搜索结果->faiss_id: {faiss_id}")
                product_id = self.get_product_id_by_faiss_id(faiss_id)
                if product_id:
                    results.append((product_id, similarity))
            end_other_time = time.time()
            print(f"查询结果耗时: {end_other_time - start_other_time}")
            return results
        except Exception as e:
            print(f"搜索图片时出错: {e}")
            return []

    def _load_index(self) -> bool:
        """Rebuild the FAISS index from MongoDB and restore the id counter.

        Documents whose vector length or faiss_id type is invalid are
        skipped.

        Returns:
            True on success (including an empty collection), False on error.
        """
        try:
            batch_size = 10000

            if self.mongo_collection.count_documents({}) == 0:
                print("数据库为空,跳过索引加载")
                return True

            max_faiss_id = -1

            # Only the fields needed to rebuild the index are fetched.
            cursor = self.mongo_collection.find(
                {}, {"faiss_id": 1, "vector": 1}
            ).batch_size(batch_size)

            for batch in self._batch_generator(cursor, batch_size):
                batch_vectors = []
                batch_ids = []

                for doc in batch:
                    try:
                        vector = doc['vector']
                        raw_id = doc['faiss_id']
                        if len(vector) != self.dimension:
                            continue
                        if not isinstance(raw_id, int):
                            continue

                        faiss_id = int(raw_id)
                        print(f"load faiss_id :{faiss_id}")

                        batch_vectors.append(vector)
                        batch_ids.append(faiss_id)
                    except Exception as e:
                        print(f"文档处理异常: {str(e)}")
                        continue

                if batch_vectors:
                    vectors_np = np.array(batch_vectors, dtype='float32')
                    ids_np = np.array(batch_ids, dtype='int64')
                    self.index.add_with_ids(vectors_np, ids_np)
                    max_faiss_id = max(max_faiss_id, max(batch_ids))

                print(f"向量总数: {self.index.ntotal}")

            # Never move the counter backwards.
            if max_faiss_id != -1:
                self.faiss_id_max = max(self.faiss_id_max, max_faiss_id)
                print(f"ID计数器初始化完成,当前值: {self.faiss_id_max}")

            return True
        except Exception as e:
            print(f"索引加载失败: {str(e)}")
            return False

    def clear(self) -> bool:
        """Remove every vector from MongoDB and from the FAISS index.

        Returns:
            True on success, False on failure.
        """
        try:
            if not hasattr(self.index, "reset"):
                print("当前索引不支持重置操作")
                return False

            # Delete the persisted records first: if that fails, the
            # in-memory index is left untouched and still matches MongoDB.
            result = self.mongo_collection.delete_many({})
            print(f"已从 MongoDB 中删除 {result.deleted_count} 条记录")

            self.index.reset()
            print("已清除 Faiss 索引中的所有向量")

            self.faiss_id_max = 0
            return True
        except Exception as e:
            print(f"清除索引时出错: {e}")
            return False

    def remove_image(self, image_path: str) -> bool:
        """Legacy path-based removal; not supported by the current storage.

        Image paths are not stored with the vectors, so an entry cannot be
        located by path. Use ``remove_by_product_id`` instead.

        Args:
            image_path: Path of the image to remove.

        Returns:
            Always False.
        """
        print(f"不支持按图片路径移除,请使用 remove_by_product_id: {image_path}")
        return False

    def remove_by_product_id(self, product_id: str) -> bool:
        """Delete the vector and the MongoDB record of one product.

        Args:
            product_id: Product id to delete.

        Returns:
            True if the record was deleted, False otherwise.
        """
        try:
            if not product_id or not isinstance(product_id, str):
                print(f"无效的 product_id: {product_id}")
                return False

            record = self.mongo_collection.find_one({"product_id": product_id})
            if record is None:
                print(f"未找到 product_id 为 {product_id} 的记录")
                return False

            faiss_id = record.get("faiss_id")
            if faiss_id is None:
                print(f"记录中缺少 faiss_id 字段: {record}")
                return False

            if not isinstance(self.index, faiss.IndexIDMap):
                print("当前索引不支持删除操作")
                return False

            self.index.remove_ids(np.array([faiss_id], dtype='int64'))
            print(f"已从 Faiss 索引中删除 faiss_id: {faiss_id}")

            result = self.mongo_collection.delete_one({"faiss_id": faiss_id})
            if result.deleted_count == 1:
                print(f"已从 MongoDB 中删除 faiss_id: {faiss_id}")
                return True

            print(f"未找到 faiss_id 为 {faiss_id} 的记录")
            return False
        except Exception as e:
            print(f"删除 product_id 为 {product_id} 的记录时出错: {e}")
            traceback.print_exc()
            return False

    def get_index_size(self) -> int:
        """Return the number of vectors currently held in the FAISS index."""
        return int(self.index.ntotal)
|