import numpy as np
from PIL import Image
from typing import Optional
import torch
import torchvision.transforms as transforms
import torchvision.models as models
from torchvision.models import ResNet50_Weights
import torch.nn.functional as F
import time


class ImageSearchEngine:
    """Extract a 2048-d image descriptor with a headless ResNet-50.

    A descriptor is the L2-normalised blend of two views of the image:
    a multi-scale feature (whole image resampled to several sizes) and a
    sliding-window feature (mean over crops taken on a regular grid).
    """

    # Cap on the number of crops per window size (limits GPU memory use).
    _MAX_WINDOWS = 32
    # Number of crops pushed through the network per forward pass.
    _BATCH_SIZE = 8
    # Window sizes the sliding-window extractor is allowed to use.
    _PRESET_WINDOW_SIZES = (256, 512, 1024, 2048, 3000)
    # Stride as a fraction of the window size, keyed by window size.
    _STRIDE_RATIOS = {256: 0.5, 512: 0.4, 1024: 0.3, 2048: 0.2, 3000: 0.15}

    def __init__(self):
        # Use the GPU when available (only relevant to the PyTorch model).
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"使用设备: {self.device}")

        # Shared preprocessing: grayscale replicated to 3 channels, then the
        # ImageNet mean/std normalisation the pretrained weights expect.
        self.base_transform = transforms.Compose([
            transforms.Grayscale(num_output_channels=3),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
        ])

        # Pretrained ResNet-50 with the final fully connected layer removed,
        # so the network ends at the global average pool.
        self.model = models.resnet50(weights=ResNet50_Weights.IMAGENET1K_V2)
        self.model = torch.nn.Sequential(*list(self.model.children())[:-1])
        self.model = self.model.to(self.device)
        self.model.eval()

        # Feature dimension of ResNet-50 (intended for a FAISS index).
        self.dimension = 2048
        # self.index = faiss.IndexFlatL2(self.dimension)
        # Switched to an index that supports deletion:
        # base_index = faiss.IndexFlatL2(self.dimension)
        # self.index = faiss.IndexIDMap(base_index)

    def _enable_half_if_cuda(self) -> bool:
        """Switch the model to float16 when running on CUDA.

        Returns:
            True when half precision is active, False on any other device
            (the model is then left in its current precision).
        """
        if self.device.type != "cuda":
            return False
        self.model.half()
        return True

    @staticmethod
    def _window_origins(height: int, width: int, win_size: int,
                        stride: int, max_windows: int) -> list:
        """Return (top, left) origins of a crop grid holding at most max_windows crops.

        The stride is widened step by step until the grid fits the cap, so
        the crops stay spread over the whole image instead of collapsing to
        a single one.
        """
        span_h = height - win_size
        span_w = width - win_size
        if span_h < 0 or span_w < 0 or stride <= 0 or max_windows < 1:
            return []

        def grid(step):
            return span_h // step + 1, span_w // step + 1

        rows, cols = grid(stride)
        while rows * cols > max_windows:
            # Grow by ~10% (at least 1 px); once the stride exceeds both
            # spans the grid is 1x1, so the loop always terminates.
            stride += max(1, stride // 10)
            rows, cols = grid(stride)

        return [(r * stride, c * stride) for r in range(rows) for c in range(cols)]

    def _process_image(self, image_path: str) -> Optional[torch.Tensor]:
        """Load one image and compute its combined feature vector.

        Args:
            image_path: Path of the image file.

        Returns:
            The L2-normalised feature vector, or None if any step fails.
        """
        try:
            # Decode inside a context manager so the file handle is released;
            # convert() returns an independent RGB copy of the pixel data.
            with Image.open(image_path) as source:
                image = source.convert('RGB')

            started = time.time()
            multi_scale_features = self._extract_multi_scale_features(image)
            print(f"提取多尺度特征耗时: {time.time() - started} s")
            if multi_scale_features is None:
                return None

            started = time.time()
            sliding_window_features = self._extract_sliding_window_features(image)
            print(f"提取滑动窗口耗时: {time.time() - started} s")
            if sliding_window_features is None:
                return None

            # Weighted blend of the two views, then L2 normalisation.
            combined_feature = multi_scale_features * 0.6 + sliding_window_features * 0.4
            return F.normalize(combined_feature, p=2, dim=0)

        except Exception as e:
            print(f"处理图片时出错: {e}")
            return None

    def _extract_multi_scale_features(self, image: Image.Image) -> Optional[torch.Tensor]:
        """Extract a feature from the whole image resampled to several sizes.

        Args:
            image: PIL image.

        Returns:
            The weighted sum of the per-scale features, or None on failure.
        """
        try:
            orig_w, orig_h = image.size
            max_edge = max(orig_w, orig_h)
            aspect_ratio = orig_w / orig_h

            # Strategy 1: base size follows the longest edge, capped at 3000.
            base_size = min(max_edge, 3000)

            # Strategy 2: geometric series of sizes between 1x and 2x the
            # base size, clamped to [224, 3000] and de-duplicated.
            min_size = 224
            num_scales = 4
            scale_factors = np.logspace(0, 1, num_scales, base=2)
            window_sizes = sorted({
                min(max(int(base_size * factor), min_size), 3000)
                for factor in scale_factors
            })

            # Strategy 3: enlarge the sizes for strongly non-square images.
            if aspect_ratio > 1.5:      # wide image
                window_sizes = [int(s * aspect_ratio) for s in window_sizes]
            elif aspect_ratio < 0.67:   # tall image
                window_sizes = [int(s / aspect_ratio) for s in window_sizes]

            # Square base image whose side is the largest power of two not
            # exceeding the base size.
            base_size = 2 ** int(np.log2(base_size))
            base_transform = transforms.Compose([
                transforms.Resize((base_size, base_size),
                                  interpolation=transforms.InterpolationMode.LANCZOS),
                self.base_transform,
            ])

            # Half precision only on CUDA; other devices stay in float32.
            use_half = self._enable_half_if_cuda()
            img_base = base_transform(image).unsqueeze(0).to(self.device)
            if use_half:
                img_base = img_base.half()

            features = []
            for size in window_sizes:
                # Target size that restores the original aspect ratio.
                if aspect_ratio > 1:
                    target_size = (int(size * aspect_ratio), size)
                else:
                    target_size = (size, int(size / aspect_ratio))

                # 'area' for downsampling, bicubic for upsampling.
                # align_corners is only accepted by the interpolating modes,
                # so it must not be passed together with 'area'.
                if size < base_size:
                    img_tensor = F.interpolate(img_base, size=target_size, mode='area')
                else:
                    img_tensor = F.interpolate(img_base, size=target_size,
                                               mode='bicubic', align_corners=False)

                # Optional hook: applied only if the instance provides it.
                if hasattr(self, 'adaptive_normalize'):
                    img_tensor = self.adaptive_normalize(img_tensor)

                with torch.no_grad(), torch.autocast(device_type=self.device.type,
                                                     enabled=use_half):
                    feature = self.model(img_tensor)
                features.append(feature.flatten().float())

            # Weight each scale by the inverse of its distance to the base
            # size, so scales closest to the base size dominate.
            size_diffs = torch.tensor(
                [abs(size - base_size) for size in window_sizes],
                dtype=torch.float32, device=self.device,
            )
            weights = 1 / (size_diffs + 1e-6)
            weights = weights / weights.sum()

            return (torch.stack(features) * weights.unsqueeze(1)).sum(dim=0)

        except Exception as e:
            print(f"智能特征提取失败: {e}")
            return None

    def _extract_sliding_window_features(self, image: Image.Image) -> Optional[torch.Tensor]:
        """Extract the mean feature of square crops taken on a regular grid.

        Args:
            image: PIL image.

        Returns:
            The mean of all crop features as a 1-D tensor on self.device,
            or None when no crop could be produced or a step fails.
        """
        try:
            orig_w, orig_h = image.size
            aspect_ratio = orig_w / orig_h
            max_dim = max(orig_w, orig_h)

            # Candidate window sizes: powers of two near 10%, 50% and 100%
            # of the longest edge, restricted to the preset sizes.
            candidates = {
                int(2 ** np.round(np.log2(max_dim * 0.1))),
                int(2 ** np.floor(np.log2(max_dim * 0.5))),
                int(2 ** np.ceil(np.log2(max_dim))),
            }
            window_sizes = sorted(candidates & set(self._PRESET_WINDOW_SIZES))
            if not window_sizes:
                # Very small (or extremely large) images match no preset;
                # fall back to the smallest preset instead of failing.
                window_sizes = [min(self._PRESET_WINDOW_SIZES)]

            # Base image: shorter side equals the largest window size, with
            # the aspect ratio preserved. base_size is (width, height);
            # Resize expects (height, width).
            max_win_size = window_sizes[-1]
            if aspect_ratio > 1:
                base_size = (int(max_win_size * aspect_ratio), max_win_size)
            else:
                base_size = (max_win_size, int(max_win_size / aspect_ratio))
            transform = transforms.Compose([
                transforms.Resize(base_size[::-1],
                                  interpolation=transforms.InterpolationMode.LANCZOS),
                self.base_transform,
            ])
            base_img = transform(image).to(self.device)

            # Half precision only on CUDA; other devices stay in float32.
            use_half = self._enable_half_if_cuda()
            if use_half:
                base_img = base_img.half()

            height, width = base_img.shape[1:]
            all_features = []
            for win_size in window_sizes:
                stride = int(win_size * self._STRIDE_RATIOS.get(win_size, 0.3))
                origins = self._window_origins(height, width, win_size,
                                               stride, self._MAX_WINDOWS)
                if not origins:
                    continue

                windows = [
                    base_img[:, top:top + win_size, left:left + win_size]
                    for top, left in origins
                ]

                # Run the crops in small batches to bound peak memory.
                with torch.no_grad(), torch.autocast(device_type=self.device.type,
                                                     enabled=use_half):
                    for start in range(0, len(windows), self._BATCH_SIZE):
                        batch = torch.stack(windows[start:start + self._BATCH_SIZE])
                        features = self.model(batch)
                        # flatten(1) keeps one row per crop: (B, C, 1, 1) -> (B, C).
                        # Moved to CPU so the GPU copy can be freed.
                        all_features.append(features.flatten(1).cpu().float())

            if not all_features:
                return None

            # Average over every crop of every window size.
            final_feature = torch.cat(all_features, dim=0).mean(dim=0)
            return final_feature.to(self.device)

        except Exception as e:
            print(f"滑动窗口特征提取失败: {e}")
            return None