PaddleClas/deploy/python/postprocess.py

162 lines
5.3 KiB
Python

# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import copy
import shutil
from functools import partial
import importlib
import numpy as np
import paddle
import paddle.nn.functional as F
def build_postprocess(config):
    """Instantiate the post-processing pipeline described by ``config``.

    Args:
        config: Mapping whose keys are names of post-processor classes
            defined in this module and whose values are the keyword
            arguments for each constructor. An optional ``"main_indicator"``
            entry is removed and handed to ``PostProcesser``. ``None``
            disables post-processing.

    Returns:
        A ``PostProcesser`` holding one instance per configured class, or
        ``None`` when ``config`` is ``None``.
    """
    if config is None:
        return None

    # Work on a private copy so that removing "main_indicator" never alters
    # the caller's configuration object.
    settings = copy.deepcopy(config)
    # A missing or falsy indicator is normalised to the empty string.
    indicator = settings.pop("main_indicator", None) or ""

    # Post-processor classes are resolved by name from this very module.
    this_module = importlib.import_module(__name__)
    processors = [
        getattr(this_module, class_name)(**settings[class_name])
        for class_name in settings
    ]
    return PostProcesser(processors, indicator)
class PostProcesser(object):
    """Run a list of post-processing callables over one model output."""

    def __init__(self, func_list, main_indicator="Topk"):
        """Remember the callables and the indicator that selects the result.

        Args:
            func_list: Callables invoked as ``func(x, image_file)``.
            main_indicator: Container (string or collection) tested with
                ``in`` against each callable's class name.
        """
        self.func_list = func_list
        self.main_indicator = main_indicator

    def __call__(self, x, image_file=None):
        """Apply every callable and return the selected output.

        Every callable is always invoked, even when its result is discarded.

        Args:
            x: Model output forwarded unchanged to each callable.
            image_file: Second positional argument forwarded to each callable.

        Returns:
            The output of the last callable whose class name is contained in
            ``main_indicator``, or ``None`` when no callable matches.
        """
        selected = None
        for processor in self.func_list:
            output = processor(x, image_file)
            processor_name = type(processor).__name__
            # Containment test: substring match when main_indicator is a
            # string, membership when it is a list/tuple/set.
            if processor_name in self.main_indicator:
                selected = output
        return selected
class Topk(object):
    """Convert per-image score vectors into top-k (or thresholded) results."""

    def __init__(self, topk=1, class_id_map_file=None):
        """Configure how many classes to report and the optional label map.

        Args:
            topk: Number of highest-scoring classes reported per image.
            class_id_map_file: Optional path to a text file with one
                ``"<class id> <label>"`` entry per line.
        """
        assert isinstance(topk, (int, ))
        self.class_id_map = self.parse_class_id_map(class_id_map_file)
        self.topk = topk

    def parse_class_id_map(self, class_id_map_file):
        """Load the ``class id -> label`` mapping from a text file.

        Each line holds an integer class id, a single space, and the label
        (which may itself contain spaces). Blank lines are skipped so that a
        stray empty line does not invalidate the whole file.

        Args:
            class_id_map_file: Path to the mapping file, or ``None``.

        Returns:
            A dict mapping ``int`` ids to ``str`` labels, or ``None`` when no
            path was given, the path does not exist, or the file could not
            be parsed (the error is printed, not raised).
        """
        if class_id_map_file is None:
            return None

        if not os.path.exists(class_id_map_file):
            print(
                "Warning: If want to use your own label_dict, please input legal path!\nOtherwise label_names will be empty!"
            )
            return None

        try:
            class_id_map = {}
            with open(class_id_map_file, "r") as fin:
                for line in fin:
                    entry = line.rstrip("\n")
                    if not entry.strip():
                        # A blank line would make int("") raise and, through
                        # the handler below, throw away every valid entry.
                        continue
                    class_id, _, label = entry.partition(" ")
                    class_id_map[int(class_id)] = label
        except Exception as ex:
            # Best effort: a malformed file only disables label names.
            print(ex)
            class_id_map = None
        return class_id_map

    def __call__(self, x, file_names=None, multilabel=False):
        """Build one result dict per row of ``x``.

        Args:
            x: Array-like indexed by image along its first axis; each row is
                a 1-D score vector.
            file_names: Optional sequence with one file name per row.
            multilabel: When true, report every class whose score is at
                least 0.5 (in ascending id order) instead of the ``topk``
                best classes (in descending score order).

        Returns:
            A list of dicts with keys ``"class_ids"``, ``"scores"`` (rounded
            to 5 decimals), ``"label_names"`` (empty when no label map is
            loaded) and, if ``file_names`` was given, ``"file_name"``.
        """
        if file_names is not None:
            assert x.shape[0] == len(file_names)

        results = []
        for idx, probs in enumerate(x):
            if multilabel:
                index = np.where(probs >= 0.5)[0].astype("int32")
            else:
                # argsort is ascending: take the last k and reverse them.
                index = probs.argsort(axis=0)[-self.topk:][::-1].astype(
                    "int32")

            class_ids = [i.item() for i in index]
            scores = [probs[i].item() for i in index]
            if self.class_id_map is not None:
                label_names = [self.class_id_map[cid] for cid in class_ids]
            else:
                label_names = []

            result = {
                "class_ids": class_ids,
                "scores": np.around(scores, decimals=5).tolist(),
            }
            if file_names is not None:
                result["file_name"] = file_names[idx]
            result["label_names"] = label_names
            results.append(result)
        return results
class MultiLabelTopk(Topk):
    """Multi-label variant of ``Topk``: reports every class scoring >= 0.5."""

    def __init__(self, topk=1, class_id_map_file=None):
        """Configure the post-processor.

        Args:
            topk: Stored on the instance; the multi-label path selects
                classes by threshold and does not use it.
            class_id_map_file: Optional path to the ``"<id> <label>"`` file
                used to fill ``"label_names"`` in the results.
        """
        # Forward the arguments: the bare super().__init__() call silently
        # dropped class_id_map_file, so label names were never loaded.
        super().__init__(topk=topk, class_id_map_file=class_id_map_file)

    def __call__(self, x, file_names=None):
        """Return the ``Topk`` results computed in multi-label mode.

        Args:
            x: Array-like of per-image score vectors.
            file_names: Optional sequence with one file name per row of ``x``.

        Returns:
            The list of result dicts produced by ``Topk.__call__`` with
            ``multilabel=True``.
        """
        return super().__call__(x, file_names, multilabel=True)
class SavePreLabel(object):
    """Copy each image into a folder named after its highest-scoring class."""

    def __init__(self, save_dir):
        """Remember the root output directory.

        Args:
            save_dir: Directory under which one sub-directory per class id
                is created.

        Raises:
            Exception: If ``save_dir`` is ``None``.
        """
        if save_dir is None:
            raise Exception(
                "Please specify save_dir if SavePreLabel specified.")
        # Callable that joins further path components onto save_dir.
        self.save_dir = partial(os.path.join, save_dir)

    def __call__(self, x, file_names=None):
        """Copy every listed file into the directory of its predicted class.

        Does nothing when ``file_names`` is ``None``.

        Args:
            x: Array-like of per-image score vectors, one row per file.
            file_names: Paths of the source images, aligned with the rows
                of ``x``.
        """
        if file_names is None:
            return
        assert x.shape[0] == len(file_names)
        for row, scores in enumerate(x):
            # The last entry of the ascending argsort is the best class.
            ranked = scores.argsort(axis=0)
            best_id = ranked[-1].astype("int32")
            self.save(best_id, file_names[row])

    def save(self, id, image_file):
        """Copy ``image_file`` into ``<save_dir>/<id>``, creating it if needed.

        Args:
            id: Class id; its string form names the sub-directory.
            image_file: Path of the file to copy.
        """
        target_dir = self.save_dir(str(id))
        os.makedirs(target_dir, exist_ok=True)
        shutil.copy(image_file, target_dir)
class Binarize(object):
    """Pack an embedding matrix into bytes, eight columns per byte."""

    def __init__(self, method="round"):
        """Select the binarisation rule.

        Args:
            method: ``"round"`` or ``"sign"``; any other value leaves the
                input values untouched before packing.
        """
        self.method = method
        # Column vector of bit weights, most significant bit first.
        self.unit = np.array(
            [[128], [64], [32], [16], [8], [4], [2], [1]])

    def __call__(self, x, file_names=None):
        """Binarise ``x`` and pack each group of 8 columns into one byte.

        Args:
            x: 2-D array whose second dimension is a multiple of 8.
            file_names: Accepted for interface compatibility; not used.

        Returns:
            A ``uint8`` array of shape ``(x.shape[0], x.shape[1] // 8)``.

        Raises:
            AssertionError: If the second dimension is not a multiple of 8.
        """
        method = self.method
        if method == "round":
            x = np.round(x + 1).astype("uint8") - 1
        elif method == "sign":
            # Maps sign values -1 / +1 to bits 0 / 1.
            x = ((np.sign(x) + 1) / 2).astype("uint8")

        embedding_size = x.shape[1]
        assert embedding_size % 8 == 0, "The Binary index only support vectors with sizes multiple of 8"

        n_bytes = embedding_size // 8
        packed = np.zeros([x.shape[0], n_bytes], dtype=np.uint8)
        for byte_idx in range(n_bytes):
            bits = x[:, byte_idx * 8:(byte_idx + 1) * 8]
            # Weighted sum of the eight bits gives the byte value.
            packed[:, byte_idx:byte_idx + 1] = np.dot(bits, self.unit)
        return packed