|
@@ -74,7 +74,6 @@ class ImageSearchEngine:
|
|
|
print(f"提取滑动窗口耗时: { end_sw_time - start_sw_time } s",)
|
|
|
if sliding_window_features is None:
|
|
|
return None
|
|
|
-
|
|
|
# 组合特征(加权平均)
|
|
|
combined_feature = multi_scale_features * 0.6 + sliding_window_features * 0.4
|
|
|
|
|
@@ -88,179 +87,195 @@ class ImageSearchEngine:
|
|
|
return None
|
|
|
|
|
|
def _extract_multi_scale_features(self, image: Image.Image) -> Optional[torch.Tensor]:
    """Extract a single feature by fusing model outputs over several scales.

    For every candidate size the image is resized so that its shorter edge
    equals ``size`` (aspect ratio kept), centre-cropped to ``size x size``,
    passed through ``self.base_transform`` and then through ``self.model``.
    The per-scale outputs are combined with a weighted average whose
    weights grow linearly from 1 to 2 with the scale.

    Args:
        image: PIL image to describe.

    Returns:
        The fused feature tensor, or ``None`` when no candidate size is
        usable or any step raises.
    """
    try:
        width, height = image.size
        min_dim = min(width, height)
        max_dim = max(width, height)

        # Candidate square sizes: fractions of the shorter edge plus a few
        # fixed sizes, limited to [64, 1.5 * longer edge] and de-duplicated.
        scales = [0.25, 0.5, 0.75, 1.0, 1.5, 2.0]
        fixed_sizes = [256, 512, 1024, 2048]
        candidate_sizes = [int(min_dim * s) for s in scales] + fixed_sizes
        max_allowed = int(max_dim * 1.5)
        window_sizes = sorted(
            {size for size in candidate_sizes if 64 <= size <= max_allowed}
        )

        if not window_sizes:
            return None

        features_list = []
        for size in window_sizes:
            # Resize the shorter edge to `size` keeping the aspect ratio,
            # then centre-crop to a square.
            # NOTE(review): self.base_transform presumably converts the PIL
            # image to a normalised tensor -- confirm against its definition.
            transform = transforms.Compose([
                transforms.Resize(size, interpolation=transforms.InterpolationMode.LANCZOS),
                transforms.CenterCrop(size),
                self.base_transform,
            ])
            img_tensor = transform(image).unsqueeze(0).to(self.device)

            with torch.no_grad():
                feature = self.model(img_tensor)
            features_list.append(feature.squeeze())

        # Weighted average (larger scales weigh more).
        weights = torch.linspace(1, 2, len(features_list), device=self.device)
        weights /= weights.sum()
        stacked = torch.stack(features_list)
        # Shape the weights as (n, 1, ..., 1) so they broadcast over whatever
        # rank squeeze() left behind; for 1-D features this equals
        # weights[:, None].
        weights = weights.view(-1, *([1] * (stacked.dim() - 1)))
        return (stacked * weights).sum(dim=0)

    except Exception as e:
        print(f"提取多尺度特征时出错: {e}")
        return None
|
|
|
-
|
|
|
+
|
|
|
def _extract_sliding_window_features(self, image: Image.Image) -> Optional[torch.Tensor]:
    """Extract a single feature by averaging model outputs over sliding windows.

    The image is resized so that its shorter edge equals the largest window
    size, square windows of each selected size are cut from it on a regular
    grid, the windows are run through ``self.model`` in batches, and all
    resulting feature rows are averaged.

    Args:
        image: PIL image to describe.

    Returns:
        The mean feature tensor on ``self.device``, or ``None`` when the
        image is missing or smaller than 64 px on either side, when no
        feature could be extracted, or when any step raises.
    """
    MAX_WINDOWS_PER_SIZE = 64   # above this the stride is enlarged
    MAX_BATCH_SIZE = 16
    MIN_BATCH_SIZE = 4          # batches are not halved at or below this size
    MAX_FEATURE_ROWS = 1000     # more rows than this are randomly subsampled

    def get_stride_ratio(size):
        # Stride as a fraction of the window size; larger windows get a
        # larger fraction. With the clip to [0.2, 0.8] the result lies in
        # [0.26, 0.44].
        size_ratio = np.clip(size / 2048, 0.2, 0.8)
        return 0.2 + size_ratio * 0.3

    def encode_windows(windows, batch_size):
        # Run the model over `windows` in chunks of `batch_size`. On a
        # RuntimeError the chunk size is halved and the SAME windows are
        # retried; once the size cannot shrink further the failing chunk
        # is skipped.
        outputs = []
        start = 0
        while start < len(windows):
            chunk = windows[start:start + batch_size]
            try:
                with torch.no_grad():
                    features = self.model(torch.stack(chunk))
                outputs.append(features.cpu())  # move to CPU to free device memory
            except RuntimeError as e:
                if batch_size > MIN_BATCH_SIZE:
                    print(f"批处理失败,尝试减小批次大小: {e}")
                    batch_size //= 2
                    continue
                print(f"处理剩余窗口失败: {e}")
            start += len(chunk)
        return outputs

    try:
        # Basic image validation.
        if image is None or image.size[0] < 64 or image.size[1] < 64:
            print("图片无效或尺寸过小")
            return None

        orig_w, orig_h = image.size
        aspect_ratio = orig_w / orig_h
        max_dim = max(orig_w, orig_h)

        # Window sizes --------------------------------------------------
        # Powers of two at about half of and just above the longer edge,
        # restricted to the preset sizes.
        base_sizes = {256, 512, 1024, 2048}
        log_size = np.log2(max_dim)
        dynamic_sizes = {
            int(2 ** exponent)
            for exponent in (np.floor(log_size - 1), np.ceil(log_size))
        }
        window_sizes = sorted(base_sizes & dynamic_sizes)

        if not window_sizes:
            # No preset matches: fall back to the preset closest in log scale.
            closest_size = min(base_sizes, key=lambda x: abs(np.log2(x) - log_size))
            window_sizes = [closest_size]

        # Base image ----------------------------------------------------
        # Resize so the shorter edge equals the largest window size.
        max_win_size = max(window_sizes)
        if aspect_ratio > 1:
            base_size = (int(max_win_size * aspect_ratio), max_win_size)
        else:
            base_size = (max_win_size, int(max_win_size / aspect_ratio))

        # base_size is (width, height); Resize expects (height, width).
        transform = transforms.Compose([
            transforms.Resize(base_size[::-1], interpolation=transforms.InterpolationMode.LANCZOS),
            self.base_transform,
        ])

        try:
            base_img = transform(image).to(self.device)
        except Exception as e:
            print(f"图像转换失败: {e}")
            return None

        # Feature extraction --------------------------------------------
        h, w = base_img.shape[1:]
        all_features = []
        total_windows = 0

        for win_size in window_sizes:
            stride = max(int(win_size * get_stride_ratio(win_size)), 16)  # minimum stride
            num_h = (h - win_size) // stride + 1
            num_w = (w - win_size) // stride + 1

            # Too many windows for this size: enlarge the stride.
            if num_h * num_w > MAX_WINDOWS_PER_SIZE:
                adjusted_stride = int(np.sqrt((h * w) / MAX_WINDOWS_PER_SIZE))
                stride = max(stride, adjusted_stride)
                num_h = (h - win_size) // stride + 1
                num_w = (w - win_size) // stride + 1

            print(f"处理窗口 {win_size}x{win_size}, 步长 {stride}, 窗口数 {num_h * num_w}")

            # Collect the valid windows; these are slices of base_img.
            windows = []
            for i in range(num_h):
                top = i * stride
                for j in range(num_w):
                    left = j * stride
                    window = base_img[:, top:top + win_size, left:left + win_size]
                    # Skip windows containing NaN or Inf values.
                    if torch.isnan(window).any() or torch.isinf(window).any():
                        continue
                    windows.append(window)

            if not windows:
                continue

            total_windows += len(windows)
            all_features.extend(
                encode_windows(windows, min(MAX_BATCH_SIZE, len(windows)))
            )

        # Feature fusion --------------------------------------------------
        if not all_features:
            print("未能提取到有效特征")
            return None

        print(f"总处理窗口数: {total_windows}")

        try:
            final_features = torch.cat(
                [f.view(-1, f.shape[-1]) for f in all_features], dim=0
            )

            # Too many feature rows: average a random subset instead.
            if final_features.size(0) > MAX_FEATURE_ROWS:
                indices = torch.randperm(final_features.size(0))[:MAX_FEATURE_ROWS]
                final_features = final_features[indices]

            return final_features.mean(dim=0).to(self.device)

        except Exception as e:
            print(f"特征融合失败: {e}")
            return None

    except Exception as e:
        print(f"滑动窗口特征提取失败: {e}")
        return None
|