From 651e6ba9c414fadbe8a229d86d97343fc69c2b49 Mon Sep 17 00:00:00 2001
From: liaoxingyu
Date: Sat, 9 May 2020 18:23:36 +0800
Subject: [PATCH] feat: support multiprocess predictor

add asyncpredictor to support multiprocessing feature extraction with dataloader
---
Reviewer notes (this section is ignored by git am):
- Line breaks and indentation were reconstructed from a whitespace-collapsed
  copy of this patch. Hunk sizes match the @@ headers and the diffstat, but
  the whitespace of context lines is inferred; check it against the target
  tree (or apply with --ignore-whitespace) before relying on it.
- The --device default was changed from 'cuda: 1' to 'cuda:1'; the embedded
  space is not a well-formed torch device string.
- Docstrings in demo/predictor.py were corrected in place (no line added or
  removed); the blob hashes on the index lines are those of the original
  commit.

 demo/demo.py                |  35 +++----
 demo/predictor.py           | 185 ++++++++++++++++++++++++++++++++++++
 fastreid/engine/defaults.py |  36 +++----
 3 files changed, 216 insertions(+), 40 deletions(-)
 create mode 100644 demo/predictor.py

diff --git a/demo/demo.py b/demo/demo.py
index 35c947c..6eca7fb 100644
--- a/demo/demo.py
+++ b/demo/demo.py
@@ -17,7 +17,7 @@ from torch.backends import cudnn
 sys.path.append('..')
 
 from fastreid.config import get_cfg
-from fastreid.engine import DefaultPredictor
+from predictor import FeatureExtractionDemo
 
 cudnn.benchmark = True
 
@@ -32,26 +32,28 @@ def setup_cfg(args):
 
 
 def get_parser():
-    parser = argparse.ArgumentParser(description="FastReID demo for builtin models")
+    parser = argparse.ArgumentParser(description="Feature extraction with reid models")
     parser.add_argument(
         "--config-file",
-        default="configs/quick_schedules/mask_rcnn_R_50_FPN_inference_acc_test.yaml",
         metavar="FILE",
         help="path to config file",
     )
+    parser.add_argument(
+        '--device',
+        default='cuda:1',
+        help='CUDA device to use'
+    )
+    parser.add_argument(
+        '--parallel',
+        action='store_true',
+        help='If use multiprocess for feature extraction.'
+    )
     parser.add_argument(
         "--input",
         nargs="+",
         help="A list of space separated input images; "
         "or a single glob pattern such as 'directory/*.jpg'",
     )
-    parser.add_argument(
-        "--output",
-        default="traced_module/",
-        help="A file or directory to save export jit module.",
-
-    )
-
     parser.add_argument(
         "--opts",
         help="Modify config options using the command-line 'KEY VALUE' pairs",
@@ -64,19 +66,18 @@ def get_parser():
 if __name__ == '__main__':
     args = get_parser().parse_args()
     cfg = setup_cfg(args)
-    demo = DefaultPredictor(cfg)
+    demo = FeatureExtractionDemo(cfg, device=args.device, parallel=args.parallel)
 
     feats = []
     if args.input:
         if len(args.input) == 1:
             args.input = glob.glob(os.path.expanduser(args.input[0]))
             assert args.input, "The input path(s) was not found"
-        for path in tqdm.tqdm(args.input, disable=not args.output):
+        for path in tqdm.tqdm(args.input):
             img = cv2.imread(path)
-            feats.append(demo(img))
+            feat = demo.run_on_image(img)
+            feats.append(feat.numpy())
 
-    cos_12 = np.dot(feats[0], feats[1].T).item()
-    cos_13 = np.dot(feats[0], feats[2].T).item()
-    cos_23 = np.dot(feats[1], feats[2].T).item()
+    cos_sim = np.dot(feats[0], feats[1].T).item()
 
-    print('cosine similarity is {:.4f}, {:.4f}, {:.4f}'.format(cos_12, cos_13, cos_23))
+    print('cosine similarity of the first two images is {:.4f}'.format(cos_sim))
diff --git a/demo/predictor.py b/demo/predictor.py
new file mode 100644
index 0000000..1d3c9b9
--- /dev/null
+++ b/demo/predictor.py
@@ -0,0 +1,185 @@
+# encoding: utf-8
+"""
+@author: xingyu liao
+@contact: liaoxingyu5@jd.com
+"""
+
+import atexit
+import bisect
+
+import cv2
+import torch
+import torch.multiprocessing as mp
+from collections import deque
+
+from fastreid.engine import DefaultPredictor
+
+try:
+    mp.set_start_method('spawn')
+except RuntimeError:
+    pass
+
+
+class FeatureExtractionDemo(object):
+    def __init__(self, cfg, device='cuda:0', parallel=False):
+        """
+        Args:
+            cfg (CfgNode): model config; ``device`` is only used when ``parallel`` is False.
+            parallel (bool): whether to run the model in separate worker processes
+                (one per GPU reported by ``torch.cuda.device_count()``).
+        """
+        self.cfg = cfg
+        self.parallel = parallel
+
+        if parallel:
+            self.num_gpus = torch.cuda.device_count()
+            self.predictor = AsyncPredictor(cfg, self.num_gpus)
+        else:
+            self.predictor = DefaultPredictor(cfg, device)
+
+        num_channels = len(cfg.MODEL.PIXEL_MEAN)
+        self.mean = torch.tensor(cfg.MODEL.PIXEL_MEAN).view(1, num_channels, 1, 1)
+        self.std = torch.tensor(cfg.MODEL.PIXEL_STD).view(1, num_channels, 1, 1)
+
+    def run_on_image(self, original_image):
+        """
+
+        Args:
+            original_image (np.ndarray): an image of shape (H, W, C) (in BGR order).
+                This is the format used by OpenCV.
+
+        Returns:
+            predictions (torch.Tensor): normalized feature of the model.
+        """
+        # the model expects RGB inputs
+        original_image = original_image[:, :, ::-1]
+        # Apply pre-processing to image.
+        image = cv2.resize(original_image, tuple(self.cfg.INPUT.SIZE_TEST[::-1]), interpolation=cv2.INTER_CUBIC)
+        image = torch.as_tensor(image.astype("float32").transpose(2, 0, 1))[None]
+        image.sub_(self.mean).div_(self.std)
+        predictions = self.predictor(image)
+        return predictions
+
+    def run_on_loader(self, data_loader):
+
+        image_gen = self._image_from_loader(data_loader)
+        if self.parallel:
+            buffer_size = self.predictor.default_buffer_size
+
+            batch_data = deque()
+
+            for cnt, batch in enumerate(image_gen):
+                batch_data.append(batch)
+                self.predictor.put(batch['images'])
+
+                if cnt >= buffer_size:
+                    batch = batch_data.popleft()
+                    predictions = self.predictor.get()
+                    yield predictions, batch['targets'].numpy(), batch['camid'].numpy()
+
+            while len(batch_data):
+                batch = batch_data.popleft()
+                predictions = self.predictor.get()
+                yield predictions, batch['targets'].numpy(), batch['camid'].numpy()
+        else:
+            for batch in image_gen:
+                predictions = self.predictor(batch['images'])
+                yield predictions, batch['targets'].numpy(), batch['camid'].numpy()
+
+    def _image_from_loader(self, data_loader):
+        data_loader.reset()
+        data = data_loader.next()
+        while data is not None:
+            yield data
+            data = data_loader.next()
+
+
+class AsyncPredictor:
+    """
+    A predictor that runs the model asynchronously, possibly on >1 GPUs.
+    Useful when the amount of data is large.
+    """
+
+    class _StopToken:
+        pass
+
+    class _PredictWorker(mp.Process):
+        def __init__(self, cfg, device, task_queue, result_queue):
+            self.cfg = cfg
+            self.device = device
+            self.task_queue = task_queue
+            self.result_queue = result_queue
+            super().__init__()
+
+        def run(self):
+            predictor = DefaultPredictor(self.cfg, self.device)
+
+            while True:
+                task = self.task_queue.get()
+                if isinstance(task, AsyncPredictor._StopToken):
+                    break
+                idx, data = task
+                result = predictor(data)
+                self.result_queue.put((idx, result))
+
+    def __init__(self, cfg, num_gpus: int = 1):
+        """
+
+        Args:
+            cfg (CfgNode):
+            num_gpus (int): if 0, will run on CPU
+        """
+        num_workers = max(num_gpus, 1)
+        self.task_queue = mp.Queue(maxsize=num_workers * 3)
+        self.result_queue = mp.Queue(maxsize=num_workers * 3)
+        self.procs = []
+        for gpuid in range(max(num_gpus, 1)):
+            device = "cuda:{}".format(gpuid) if num_gpus > 0 else "cpu"
+            self.procs.append(
+                AsyncPredictor._PredictWorker(cfg, device, self.task_queue, self.result_queue)
+            )
+
+        self.put_idx = 0
+        self.get_idx = 0
+        self.result_rank = []
+        self.result_data = []
+
+        for p in self.procs:
+            p.start()
+
+        atexit.register(self.shutdown)
+
+    def put(self, image):
+        self.put_idx += 1
+        self.task_queue.put((self.put_idx, image))
+
+    def get(self):
+        self.get_idx += 1
+        if len(self.result_rank) and self.result_rank[0] == self.get_idx:
+            res = self.result_data[0]
+            del self.result_data[0], self.result_rank[0]
+            return res
+
+        while True:
+            # Make sure the results are returned in the correct order
+            idx, res = self.result_queue.get()
+            if idx == self.get_idx:
+                return res
+            insert = bisect.bisect(self.result_rank, idx)
+            self.result_rank.insert(insert, idx)
+            self.result_data.insert(insert, res)
+
+    def __len__(self):
+        return self.put_idx - self.get_idx
+
+    def __call__(self, image):
+        self.put(image)
+        return self.get()
+
+    def shutdown(self):
+        for _ in self.procs:
+            self.task_queue.put(AsyncPredictor._StopToken())
+
+    @property
+    def default_buffer_size(self):
+        return len(self.procs) * 5
diff --git a/fastreid/engine/defaults.py b/fastreid/engine/defaults.py
index 2065447..fd871d4 100644
--- a/fastreid/engine/defaults.py
+++ b/fastreid/engine/defaults.py
@@ -13,8 +13,6 @@ import logging
 import os
 from collections import OrderedDict
 
-import cv2
-import numpy as np
 import torch
 import torch.nn.functional as F
 from torch.nn import DataParallel
@@ -131,40 +129,32 @@ class DefaultPredictor:
         outputs = pred(inputs)
     """
 
-    def __init__(self, cfg):
+    def __init__(self, cfg, device='cpu'):
         self.cfg = cfg.clone()  # cfg can be modified by model
-        model = build_model(self.cfg)
-        self.model = DataParallel(model)
-        self.model.cuda()
+        self.cfg.defrost()
+        self.cfg.MODEL.BACKBONE.PRETRAIN = False
+        self.device = device
+        self.model = build_model(self.cfg)
+        self.model.to(device)
         self.model.eval()
         checkpointer = Checkpointer(self.model)
         checkpointer.load(cfg.MODEL.WEIGHTS)
 
-        num_channels = len(cfg.MODEL.PIXEL_MEAN)
-        self.mean = torch.tensor(cfg.MODEL.PIXEL_MEAN).view(1, num_channels, 1, 1)
-        self.std = torch.tensor(cfg.MODEL.PIXEL_STD).view(1, num_channels, 1, 1)
-
-    def __call__(self, original_image):
+    def __call__(self, image):
         """
         Args:
-            original_image (np.ndarray): an image of shape (H, W, C) (in BGR order).
+            image (torch.tensor): an image tensor of shape (B, C, H, W).
 
         Returns:
-            predictions (np.ndarray): the output of the model
+            predictions (torch.tensor): the output features of the model
         """
         with torch.no_grad():  # https://github.com/sphinx-doc/sphinx/issues/4258
-            # Apply pre-processing to image.
-            # the model expects RGB inputs
-            original_image = original_image[:, :, ::-1]
-            image = cv2.resize(original_image, tuple(self.cfg.INPUT.SIZE_TEST[::-1]), interpolation=cv2.INTER_CUBIC)
-            image = torch.as_tensor(image.astype("float32").transpose(2, 0, 1))[None]
-            image.sub_(self.mean).div_(self.std)
-
+            image = image.to(self.device)
             inputs = {"images": image}
-            pred_feat = self.model(inputs)
+            predictions = self.model(inputs)
             # Normalize feature to compute cosine distance
-            pred_feat = F.normalize(pred_feat)
-            pred_feat = pred_feat.cpu().data.numpy()
+            pred_feat = F.normalize(predictions)
+            pred_feat = pred_feat.cpu().data
             return pred_feat
 
 