# PaddleClas/ppcls/loss/tripletangularmarginloss.py
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import paddle
import paddle.nn as nn
from ppcls.loss.xbm import CrossBatchMemory


class TripletAngularMarginLoss(nn.Layer):
"""A more robust triplet loss with hard positive/negative mining on angular margin instead of relative distance between d(a,p) and d(a,n).
Args:
margin (float, optional): angular margin. Defaults to 0.5.
normalize_feature (bool, optional): whether to apply L2-norm in feature before computing distance(cos-similarity). Defaults to True.
reduction (str, optional): reducing option within an batch . Defaults to "mean".
add_absolute (bool, optional): whether add absolute loss within d(a,p) or d(a,n). Defaults to False.
absolute_loss_weight (float, optional): weight for absolute loss. Defaults to 1.0.
ap_value (float, optional): weight for d(a, p). Defaults to 0.9.
an_value (float, optional): weight for d(a, n). Defaults to 0.5.
feature_from (str, optional): which key feature from. Defaults to "features".
"""

    def __init__(self,
margin=0.5,
normalize_feature=True,
reduction="mean",
add_absolute=False,
absolute_loss_weight=1.0,
ap_value=0.9,
an_value=0.5,
feature_from="features"):
super(TripletAngularMarginLoss, self).__init__()
self.margin = margin
self.feature_from = feature_from
self.ranking_loss = paddle.nn.loss.MarginRankingLoss(
margin=margin, reduction=reduction)
self.normalize_feature = normalize_feature
self.add_absolute = add_absolute
self.ap_value = ap_value
self.an_value = an_value
self.absolute_loss_weight = absolute_loss_weight

    def forward(self, input, target):
        """
        Args:
            input (dict): contains the feature matrix under key
                ``self.feature_from``, shape (batch_size, feat_dim).
            target: ground-truth labels, shape (batch_size,).
        """
inputs = input[self.feature_from]
if self.normalize_feature:
inputs = paddle.divide(
inputs, paddle.norm(
inputs, p=2, axis=-1, keepdim=True))
bs = inputs.shape[0]
        # similarity matrix; equals cosine similarity when features are L2-normalized
dist = paddle.matmul(inputs, inputs.t())
        # hard example mining: pairwise masks of same-label / different-label pairs
        is_pos = paddle.expand(target, (
            bs, bs)).equal(paddle.expand(target, (bs, bs)).t())
        is_neg = paddle.expand(target, (
            bs, bs)).not_equal(paddle.expand(target, (bs, bs)).t())
        # `dist_ap` means distance(anchor, positive), shape [N, 1]
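        # NOTE: masked_select flattens its output, so the reshape to (bs, -1)
        # assumes every sample has the same number of positives (and, below,
        # negatives) in the batch, e.g. a PK-style identity-balanced sampler.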
dist_ap = paddle.min(paddle.reshape(
paddle.masked_select(dist, is_pos), (bs, -1)),
axis=1,
keepdim=True)
        # `dist_an` means distance(anchor, negative), shape [N, 1]
dist_an = paddle.max(paddle.reshape(
paddle.masked_select(dist, is_neg), (bs, -1)),
axis=1,
keepdim=True)
# shape [N]
dist_ap = paddle.squeeze(dist_ap, axis=1)
dist_an = paddle.squeeze(dist_an, axis=1)
        # ranking hinge loss: with y = 1, MarginRankingLoss computes
        # max(0, margin - (dist_ap - dist_an)), i.e. the hardest positive must be
        # at least `margin` more similar to the anchor than the hardest negative
y = paddle.ones_like(dist_an)
loss = self.ranking_loss(dist_ap, dist_an, y)
        if self.add_absolute:
            # hinge on the absolute similarity: penalize d(a,p) below ap_value ...
            absolute_loss_ap = self.ap_value - dist_ap
            absolute_loss_ap = paddle.where(absolute_loss_ap > 0,
                                            absolute_loss_ap,
                                            paddle.zeros_like(absolute_loss_ap))

            # ... and d(a,n) above an_value; both hinges clamp at zero
            absolute_loss_an = dist_an - self.an_value
            absolute_loss_an = paddle.where(absolute_loss_an > 0,
                                            absolute_loss_an,
                                            paddle.zeros_like(absolute_loss_an))

            loss = (absolute_loss_an.mean() + absolute_loss_ap.mean()
                    ) * self.absolute_loss_weight + loss.mean()
return {"TripletAngularMarginLoss": loss}


class TripletAngularMarginLoss_XBM(TripletAngularMarginLoss):
    """TripletAngularMarginLoss combined with CrossBatchMemory.

    Args:
        start_iter (int): step from which CrossBatchMemory is enabled.
        xbm_size (int): size of the CrossBatchMemory queue.
        xbm_weight (float): weight of the CrossBatchMemory loss term.
        feat_dim (int): feature dimension stored in CrossBatchMemory.
        margin (float, optional): angular margin. Defaults to 0.5.
        normalize_feature (bool, optional): whether to L2-normalize features
            before computing the cosine-similarity matrix. Defaults to True.
        reduction (str, optional): reduction over the batch, passed to
            MarginRankingLoss. Defaults to "mean".
        add_absolute (bool, optional): whether to add absolute-margin terms on
            d(a,p) and d(a,n). Defaults to False.
        absolute_loss_weight (float, optional): weight for the absolute-margin
            terms. Defaults to 1.0.
        ap_value (float, optional): target cosine similarity for d(a,p); values
            below it are penalized. Defaults to 0.9.
        an_value (float, optional): target cosine similarity for d(a,n); values
            above it are penalized. Defaults to 0.5.
        feature_from (str, optional): key of the input dict to read features
            from. Defaults to "features".
    """

    def __init__(self,
start_iter: int,
xbm_size: int,
xbm_weight: float,
feat_dim: int,
margin=0.5,
normalize_feature=True,
reduction="mean",
add_absolute=False,
absolute_loss_weight=1.0,
ap_value=0.9,
an_value=0.5,
feature_from="features"):
super(TripletAngularMarginLoss_XBM, self).__init__(
margin, normalize_feature, reduction, add_absolute,
absolute_loss_weight, ap_value, an_value, feature_from)
self.start_iter = start_iter
self.xbm = CrossBatchMemory(xbm_size, feat_dim)
self.xbm_weight = xbm_weight
        self.inf = 10  # large enough to act as infinity for cosine similarities in [-1, 1]
self.register_buffer("iter", paddle.to_tensor(0, dtype="int64"))

    def forward(self, input, target):
        """
        Args:
            input (dict): contains the feature matrix under key
                ``self.feature_from``, shape (batch_size, feat_dim).
            target: ground-truth labels, shape (batch_size,) or (batch_size, 1).
        """
feats = input[self.feature_from]
if self.normalize_feature:
feats = nn.functional.normalize(feats, p=2, axis=1)
labels = target
if labels.ndim >= 2 and labels.shape[-1] == 1:
labels = paddle.squeeze(labels, axis=[-1])
loss = self._compute_loss(feats, labels, feats, labels)
# XBM loss below
self.iter += 1
if self.iter.item() > self.start_iter:
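            # the memory stores detached copies, so gradients flow only through
            # the current batch's side of the Q x K similarity matrix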
self.xbm.enqueue_dequeue(feats.detach(), labels.detach())
xbm_feats, xbm_labels = self.xbm.get()
xbm_loss = self._compute_loss(feats, labels, xbm_feats, xbm_labels)
loss = loss + self.xbm_weight * xbm_loss
return {"TripletAngularMarginLoss_XBM": loss}
def _masked_max(self, tensor, mask, axis):
masked = paddle.multiply(tensor, mask.astype(tensor.dtype))
neg_inf = paddle.zeros_like(tensor)
neg_inf.stop_gradient = True
neg_inf[paddle.logical_not(mask)] = -self.inf
return paddle.max(masked + neg_inf, axis=axis, keepdim=True)

    def _masked_min(self, tensor, mask, axis):
masked = paddle.multiply(tensor, mask.astype(tensor.dtype))
pos_inf = paddle.zeros_like(tensor)
pos_inf.stop_gradient = True
pos_inf[paddle.logical_not(mask)] = self.inf
return paddle.min(masked + pos_inf, axis=axis, keepdim=True)

    def _compute_loss(self,
inputs_q: paddle.Tensor,
targets_q: paddle.Tensor,
inputs_k: paddle.Tensor,
targets_k: paddle.Tensor) -> paddle.Tensor:
Q = inputs_q.shape[0]
K = inputs_k.shape[0]
# compute distance(cos-similarity)
dist = paddle.matmul(inputs_q, inputs_k.t()) # [Q, K]
        # hard example mining: pairwise label masks between queries and keys
is_pos = paddle.expand(paddle.unsqueeze(targets_q, 1), (Q, K)).equal(
paddle.expand(paddle.unsqueeze(targets_k, 1),
(K, Q)).t()) # [Q, K]
is_neg = paddle.expand(paddle.unsqueeze(targets_q, 1),
(Q, K)).not_equal(
paddle.expand(
paddle.unsqueeze(targets_k, 1),
(K, Q)).t()) # [Q, K]
        dist_ap = self._masked_min(dist, is_pos, axis=1)  # [Q, 1]
        dist_an = self._masked_max(dist, is_neg, axis=1)  # [Q, 1]
# Compute ranking hinge loss
y = paddle.ones_like(dist_an)
loss = self.ranking_loss(dist_ap, dist_an, y)
        if self.add_absolute:
            # hinge on the absolute similarity: penalize d(a,p) below ap_value ...
            absolute_loss_ap = self.ap_value - dist_ap
            absolute_loss_ap = paddle.where(absolute_loss_ap > 0,
                                            absolute_loss_ap,
                                            paddle.zeros_like(absolute_loss_ap))

            # ... and d(a,n) above an_value; both hinges clamp at zero
            absolute_loss_an = dist_an - self.an_value
            absolute_loss_an = paddle.where(absolute_loss_an > 0,
                                            absolute_loss_an,
                                            paddle.zeros_like(absolute_loss_an))

            loss = (absolute_loss_an.mean() + absolute_loss_ap.mean()
                    ) * self.absolute_loss_weight + loss.mean()
return loss
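

if __name__ == "__main__":
    # Minimal smoke test (not part of the original module). Assumptions: paddle
    # is installed, ppcls.loss.xbm is importable, and each label appears at
    # least twice in the batch so every anchor has a positive.
    paddle.seed(42)
    feats = paddle.randn([8, 64])
    labels = paddle.to_tensor([0, 0, 1, 1, 2, 2, 3, 3], dtype="int64")

    loss_fn = TripletAngularMarginLoss(margin=0.5, add_absolute=True)
    print(loss_fn({"features": feats}, labels))

    # start_iter=0 enables the cross-batch memory from the first step;
    # xbm_size and xbm_weight below are illustrative values.
    xbm_loss_fn = TripletAngularMarginLoss_XBM(
        start_iter=0, xbm_size=32, xbm_weight=0.5, feat_dim=64)
    print(xbm_loss_fn({"features": feats}, labels))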