fast-reid/fastreid/utils/compute_dist.py

# encoding: utf-8
"""
@author: xingyu liao
@contact: sherlockliao01@gmail.com
"""
# Modified from: https://github.com/open-mmlab/OpenUnReID/blob/66bb2ae0b00575b80fbe8915f4d4f4739cc21206/openunreid/core/utils/compute_dist.py
import faiss
import numpy as np
import torch
import torch.nn.functional as F

from .faiss_utils import (
    index_init_cpu,
    index_init_gpu,
    search_index_pytorch,
    search_raw_array_pytorch,
)

__all__ = [
    "build_dist",
    "compute_jaccard_distance",
    "compute_euclidean_distance",
    "compute_cosine_distance",
]


@torch.no_grad()
def build_dist(feat_1: torch.Tensor, feat_2: torch.Tensor, metric: str = "euclidean", **kwargs) -> np.ndarray:
    r"""Compute distance between two feature embeddings.

    Args:
        feat_1 (torch.Tensor): 2-D feature with batch dimension.
        feat_2 (torch.Tensor): 2-D feature with batch dimension.
        metric (str): one of "euclidean", "cosine" or "jaccard". The "jaccard"
            metric additionally requires the keyword arguments ``k1`` and ``k2``
            for the k-reciprocal neighbourhood sizes.

    Returns:
        numpy.ndarray: distance matrix of shape (feat_1.size(0), feat_2.size(0)).
    """
    assert metric in ["cosine", "euclidean", "jaccard"], \
        "Expected metrics are cosine, euclidean and jaccard, but got {}".format(metric)

    if metric == "euclidean":
        return compute_euclidean_distance(feat_1, feat_2)
    elif metric == "cosine":
        return compute_cosine_distance(feat_1, feat_2)
    elif metric == "jaccard":
        # Jaccard distance is defined on the concatenated feature set; the
        # query-to-gallery block is sliced out of the full matrix afterwards.
        feat = torch.cat((feat_1, feat_2), dim=0)
        dist = compute_jaccard_distance(feat, k1=kwargs["k1"], k2=kwargs["k2"], search_option=0)
        return dist[: feat_1.size(0), feat_1.size(0):]
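
# Usage sketch (illustrative, not part of the original file): `query_feat` and
# `gallery_feat` below are assumed to be 2-D torch tensors of extracted features.
#
#   dist = build_dist(query_feat, gallery_feat, metric="cosine")
#   rerank_dist = build_dist(query_feat, gallery_feat, metric="jaccard", k1=30, k2=6)
#
# Note that metric="jaccard" uses the default search_option=0 inside build_dist,
# which runs the faiss search on GPU and therefore expects a CUDA environment.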


def k_reciprocal_neigh(initial_rank, i, k1):
    # Keep only those of sample i's top-k1 neighbours that also rank i among
    # their own top-k1 neighbours (the "k-reciprocal" neighbours).
    forward_k_neigh_index = initial_rank[i, : k1 + 1]
    backward_k_neigh_index = initial_rank[forward_k_neigh_index, : k1 + 1]
    fi = np.where(backward_k_neigh_index == i)[0]
    return forward_k_neigh_index[fi]


@torch.no_grad()
def compute_jaccard_distance(features, k1=20, k2=6, search_option=0, fp16=False):
    """Compute the k-reciprocal Jaccard distance matrix used for re-ranking.

    ``search_option`` selects the faiss backend: 0 and 1 search with PyTorch
    CUDA tensors, 2 uses a (multi-)GPU flat index, anything else falls back
    to CPU search.
    """
    if search_option < 3:
        # torch.cuda.empty_cache()
        features = features.cuda()

    ngpus = faiss.get_num_gpus()
    N = features.size(0)
    mat_type = np.float16 if fp16 else np.float32

    if search_option == 0:
        # GPU + PyTorch CUDA Tensors (1)
        res = faiss.StandardGpuResources()
        res.setDefaultNullStreamAllDevices()
        _, initial_rank = search_raw_array_pytorch(res, features, features, k1)
        initial_rank = initial_rank.cpu().numpy()
    elif search_option == 1:
        # GPU + PyTorch CUDA Tensors (2)
        res = faiss.StandardGpuResources()
        index = faiss.GpuIndexFlatL2(res, features.size(-1))
        index.add(features.cpu().numpy())
        _, initial_rank = search_index_pytorch(index, features, k1)
        res.syncDefaultStreamCurrentDevice()
        initial_rank = initial_rank.cpu().numpy()
    elif search_option == 2:
        # GPU
        index = index_init_gpu(ngpus, features.size(-1))
        index.add(features.cpu().numpy())
        _, initial_rank = index.search(features.cpu().numpy(), k1)
    else:
        # CPU
        index = index_init_cpu(features.size(-1))
        index.add(features.cpu().numpy())
        _, initial_rank = index.search(features.cpu().numpy(), k1)

    nn_k1 = []
    nn_k1_half = []
    for i in range(N):
        nn_k1.append(k_reciprocal_neigh(initial_rank, i, k1))
        nn_k1_half.append(k_reciprocal_neigh(initial_rank, i, int(np.around(k1 / 2))))

    V = np.zeros((N, N), dtype=mat_type)
    for i in range(N):
        k_reciprocal_index = nn_k1[i]
        k_reciprocal_expansion_index = k_reciprocal_index
        for candidate in k_reciprocal_index:
            candidate_k_reciprocal_index = nn_k1_half[candidate]
            if len(
                np.intersect1d(candidate_k_reciprocal_index, k_reciprocal_index)
            ) > 2 / 3 * len(candidate_k_reciprocal_index):
                k_reciprocal_expansion_index = np.append(
                    k_reciprocal_expansion_index, candidate_k_reciprocal_index
                )

        k_reciprocal_expansion_index = np.unique(
            k_reciprocal_expansion_index
        )  # element-wise unique
        x = features[i].unsqueeze(0).contiguous()
        y = features[k_reciprocal_expansion_index]
        m, n = x.size(0), y.size(0)
        dist = (
            torch.pow(x, 2).sum(dim=1, keepdim=True).expand(m, n)
            + torch.pow(y, 2).sum(dim=1, keepdim=True).expand(n, m).t()
        )
        dist.addmm_(x, y.t(), beta=1, alpha=-2)

        if fp16:
            V[i, k_reciprocal_expansion_index] = (
                F.softmax(-dist, dim=1).view(-1).cpu().numpy().astype(mat_type)
            )
        else:
            V[i, k_reciprocal_expansion_index] = (
                F.softmax(-dist, dim=1).view(-1).cpu().numpy()
            )

    del nn_k1, nn_k1_half, x, y

    features = features.cpu()

    if k2 != 1:
        V_qe = np.zeros_like(V, dtype=mat_type)
        for i in range(N):
            V_qe[i, :] = np.mean(V[initial_rank[i, :k2], :], axis=0)
        V = V_qe
        del V_qe

    del initial_rank

    invIndex = []
    for i in range(N):
        invIndex.append(np.where(V[:, i] != 0)[0])  # len(invIndex)=all_num

    jaccard_dist = np.zeros((N, N), dtype=mat_type)
    for i in range(N):
        temp_min = np.zeros((1, N), dtype=mat_type)
        indNonZero = np.where(V[i, :] != 0)[0]
        indImages = [invIndex[ind] for ind in indNonZero]
        for j in range(len(indNonZero)):
            temp_min[0, indImages[j]] = temp_min[0, indImages[j]] + np.minimum(
                V[i, indNonZero[j]], V[indImages[j], indNonZero[j]]
            )
        jaccard_dist[i] = 1 - temp_min / (2 - temp_min)

    del invIndex, V

    pos_bool = jaccard_dist < 0
    jaccard_dist[pos_bool] = 0.0

    return jaccard_dist
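
# Sketch of a typical consumer (an assumption for illustration, not code from this
# repo): the pairwise Jaccard matrix can be fed to a precomputed-metric clusterer
# such as scikit-learn's DBSCAN during unsupervised re-ID training.
#
#   from sklearn.cluster import DBSCAN
#   labels = DBSCAN(eps=0.6, min_samples=4, metric="precomputed").fit_predict(
#       compute_jaccard_distance(features, k1=30, k2=6)
#   )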


@torch.no_grad()
def compute_euclidean_distance(features, others):
    m, n = features.size(0), others.size(0)
    dist_m = (
        torch.pow(features, 2).sum(dim=1, keepdim=True).expand(m, n)
        + torch.pow(others, 2).sum(dim=1, keepdim=True).expand(n, m).t()
    )
    # addmm_(beta, alpha, mat1, mat2) is the deprecated signature; use the
    # keyword form so the in-place GEMM keeps working on recent PyTorch.
    dist_m.addmm_(features, others.t(), beta=1, alpha=-2)

    return dist_m.cpu().numpy()


@torch.no_grad()
def compute_cosine_distance(features, others):
    """Computes cosine distance.

    Args:
        features (torch.Tensor): 2-D feature matrix.
        others (torch.Tensor): 2-D feature matrix.

    Returns:
        numpy.ndarray: distance matrix.
    """
    features = F.normalize(features, p=2, dim=1)
    others = F.normalize(others, p=2, dim=1)
    dist_m = 1 - torch.mm(features, others.t())
    return dist_m.cpu().numpy()
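

if __name__ == "__main__":
    # Minimal self-check (added for illustration, not part of the upstream file):
    # build random query/gallery features and exercise the CPU-friendly metrics.
    # The jaccard path is skipped here because it relies on faiss GPU resources.
    query = F.normalize(torch.randn(8, 256), dim=1)
    gallery = F.normalize(torch.randn(16, 256), dim=1)

    euc = build_dist(query, gallery, metric="euclidean")
    cos = build_dist(query, gallery, metric="cosine")
    print("euclidean:", euc.shape, "cosine:", cos.shape)  # both (8, 16)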