Support mmpose:dev-1.x (#884)

* modify mmpose rewritings

* test exporting to ort with hrnet

* support mspn

* update tests

* update tests

* support flip_test

* support mmpose 1.x

* update mmpose.yml

* fix adaptive pool

* align with master for adaptive_pool rewriting

* fix pipeline

* add batch_size to test

* resolve comment
pull/1056/head
RunningLeon 2022-09-19 21:40:53 +08:00 committed by GitHub
parent 99aa1fded9
commit 2fdba2c523
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 632 additions and 1191 deletions

View File

@ -5,10 +5,11 @@ codebase_config = dict(model_type='sdk')
backend_config = dict(pipeline=[
dict(type='LoadImageFromFile', channel_order='bgr'),
dict(
type='Collect',
type='PackPoseInputs',
keys=['img'],
meta_keys=[
'image_file', 'bbox', 'rotation', 'bbox_score', 'flip_pairs'
'id', 'img_id', 'img_path', 'ori_shape', 'img_shape', 'input_size',
'flip_indices', 'category'
])
])

View File

@ -1,5 +1,5 @@
# Copyright (c) OpenMMLab. All rights reserved.
from .deploy import MMPose, PoseDetection
from .deploy import PoseDetection
from .models import * # noqa: F401,F403
__all__ = ['MMPose', 'PoseDetection']
__all__ = ['PoseDetection']

View File

@ -1,5 +1,4 @@
# Copyright (c) OpenMMLab. All rights reserved.
from mmdeploy.codebase.mmpose.deploy.mmpose import MMPose
from mmdeploy.codebase.mmpose.deploy.pose_detection import PoseDetection
__all__ = ['MMPose', 'PoseDetection']
__all__ = ['PoseDetection']

View File

@ -1,134 +0,0 @@
# Copyright (c) OpenMMLab. All rights reserved.
from typing import Optional, Union
import mmengine
import torch
from mmcv.utils import Registry
from torch.utils.data import DataLoader, Dataset
from mmdeploy.codebase.base import CODEBASE, BaseTask, MMCodebase
from mmdeploy.utils import Codebase, get_task_type, load_config
def __build_mmpose_task(model_cfg: mmengine.Config,
deploy_cfg: mmengine.Config, device: str,
registry: Registry) -> BaseTask:
task = get_task_type(deploy_cfg)
return registry.module_dict[task.value](model_cfg, deploy_cfg, device)
MMPOSE_TASK = Registry('mmpose_tasks', build_func=__build_mmpose_task)
@CODEBASE.register_module(Codebase.MMPOSE.value, force=True)
class MMPose(MMCodebase):
"""mmpose codebase class."""
task_registry = MMPOSE_TASK
def __init__(self):
super(MMCodebase, self).__init__()
@staticmethod
def build_task_processor(model_cfg: mmengine.Config,
deploy_cfg: mmengine.Config,
device: str) -> BaseTask:
"""The interface to build the task processors of mmpose.
Args:
model_cfg (mmengine.Config): Model config file.
deploy_cfg (mmengine.Config): Deployment config file.
device (str): A string specifying device type.
Returns:
BaseTask: A task processor.
"""
return MMPOSE_TASK.build(model_cfg, deploy_cfg, device)
@staticmethod
def build_dataset(dataset_cfg: Union[str, mmengine.Config],
dataset_type: str = 'test',
**kwargs) -> Dataset:
"""Build dataset for mmpose.
Args:
dataset_cfg (str | mmengine.Config): The input dataset config.
dataset_type (str): A string represents dataset type, e.g.: 'train'
, 'test', 'val'. Defaults to 'test'.
Returns:
Dataset: A PyTorch dataset.
"""
from mmpose.datasets import build_dataset
dataset_cfg = load_config(dataset_cfg)[0]
assert dataset_type in dataset_cfg.data
data_cfg = dataset_cfg.data[dataset_type]
data_cfg.test_mode = True
dataset = build_dataset(data_cfg, dict(test_mode=True))
return dataset
@staticmethod
def build_dataloader(dataset: Dataset,
samples_per_gpu: int,
workers_per_gpu: int,
num_gpus: int = 1,
dist: bool = False,
shuffle: bool = False,
seed: Optional[int] = None,
drop_last: bool = False,
pin_memory: bool = True,
**kwargs) -> DataLoader:
"""Build PyTorch DataLoader.
Args:
dataset (Dataset): A PyTorch dataset.
samples_per_gpu (int): Number of training samples on each GPU,
i.e., batch size of each GPU.
workers_per_gpu (int): How many subprocesses to use for data
loading for each GPU.
num_gpus (int): Number of GPUs. Only used in non-distributed
training.
dist (bool): Distributed training/test or not. Default: True.
shuffle (bool): Whether to shuffle the data at every epoch.
Default: False.
seed (int): An integer set to be seed. Default is ``None``.
drop_last (bool): Whether to drop the last incomplete batch
in epoch. Default: False.
pin_memory (bool): Whether to use pin_memory in DataLoader.
Default: True.
kwargs: Other keyword arguments to be used to initialize
DataLoader.
Returns:
DataLoader: A PyTorch dataloader.
"""
from mmpose.datasets import build_dataloader
return build_dataloader(
dataset,
samples_per_gpu,
workers_per_gpu,
num_gpus=num_gpus,
dist=dist,
shuffle=shuffle,
seed=seed,
drop_last=drop_last,
pin_memory=pin_memory,
**kwargs)
@staticmethod
def single_gpu_test(model: torch.nn.Module, data_loader: DataLoader,
show: bool, out_dir: str, **kwargs) -> list:
"""Run test with single gpu.
Args:
model (torch.nn.Module): Input model from nn.Module.
data_loader (DataLoader): PyTorch data loader.
show (bool): Specifying whether to show plotted results. Defaults
to ``False``.
out_dir (str): A directory to save results, defaults to ``None``.
Returns:
list: The prediction results.
"""
from mmpose.apis import single_gpu_test
return single_gpu_test(model, data_loader)

View File

@ -1,20 +1,19 @@
# Copyright (c) OpenMMLab. All rights reserved.
import copy
import logging
import os
from typing import Any, Dict, Optional, Sequence, Tuple, Union
from collections import defaultdict
from typing import Callable, Dict, Optional, Sequence, Tuple, Union
import mmcv
import mmengine
import numpy as np
import torch
from mmcv.parallel import collate
from torch.utils.data import Dataset
from mmengine.model import BaseDataPreprocessor
from mmengine.registry import Registry
from mmdeploy.codebase.base import BaseTask
from mmdeploy.codebase.mmpose.deploy.mmpose import MMPOSE_TASK
from mmdeploy.utils import Task, get_input_shape
from mmdeploy.codebase.base import CODEBASE, BaseTask, MMCodebase
from mmdeploy.utils import Codebase, Task, get_input_shape, get_root_logger
def process_model_config(
@ -35,12 +34,12 @@ def process_model_config(
mmengine.Config: the model config after processing.
"""
cfg = copy.deepcopy(model_cfg)
test_pipeline = cfg.data.test.pipeline
test_pipeline = cfg.test_dataloader.dataset.pipeline
data_preprocessor = cfg.model.data_preprocessor
sdk_pipeline = []
color_type = 'color'
channel_order = 'rgb'
if input_shape is None:
input_shape = np.array(cfg.data_cfg['image_size'])
input_shape = np.array(cfg.codec.input_size)
idx = 0
while idx < len(test_pipeline):
@ -58,11 +57,9 @@ def process_model_config(
idx = idx + 2
continue
if trans.type == 'LoadImageFromFile':
if 'color_type' in trans:
color_type = trans['color_type'] # NOQA
if 'channel_order' in trans:
channel_order = trans['channel_order']
if trans.type == 'LoadImage':
if not data_preprocessor.bgr_to_rgb:
channel_order = 'bgr'
if trans.type == 'TopDownAffine':
trans['image_size'] = input_shape
if trans.type == 'TopDownGetBboxCenterScale':
@ -70,10 +67,54 @@ def process_model_config(
sdk_pipeline.append(trans)
idx = idx + 1
cfg.data.test.pipeline = sdk_pipeline
cfg.test_dataloader.dataset.pipeline = sdk_pipeline
return cfg
def _get_dataset_metainfo(model_cfg: mmengine.Config):
"""Get metainfo of dataset.
Args:
model_cfg Config: Input model Config object.
Returns:
(list[str], list[np.ndarray]): Class names and palette
"""
from mmpose import datasets # noqa
from mmpose.registry import DATASETS
module_dict = DATASETS.module_dict
for dataloader_name in [
'test_dataloader', 'val_dataloader', 'train_dataloader'
]:
if dataloader_name not in model_cfg:
continue
dataloader_cfg = model_cfg[dataloader_name]
dataset_cfg = dataloader_cfg.dataset
dataset_mmpose = module_dict.get(dataset_cfg.type, None)
if dataset_mmpose is None:
continue
if hasattr(dataset_mmpose, '_load_metainfo') and isinstance(
dataset_mmpose._load_metainfo, Callable):
meta = dataset_mmpose._load_metainfo(
dataset_cfg.get('metainfo', None))
if meta is not None:
return meta
if hasattr(dataset_mmpose, 'METAINFO'):
return dataset_mmpose.METAINFO
return None
MMPOSE_TASK = Registry('mmpose_tasks')
@CODEBASE.register_module(Codebase.MMPOSE.value)
class MMPose(MMCodebase):
"""mmpose codebase class."""
task_registry = MMPOSE_TASK
@MMPOSE_TASK.register_module(Task.POSE_DETECTION.value)
class PoseDetection(BaseTask):
"""Pose detection task class.
@ -89,6 +130,22 @@ class PoseDetection(BaseTask):
device: str):
super().__init__(model_cfg, deploy_cfg, device)
def build_pytorch_model(self,
model_checkpoint: Optional[str] = None,
cfg_options: Optional[Dict] = None,
**kwargs) -> torch.nn.Module:
from mmpose.apis import init_model
from mmpose.utils import register_all_modules
register_all_modules()
self.model_cfg.model.test_cfg['flip_test'] = False
model = init_model(
self.model_cfg,
model_checkpoint,
device=self.device,
cfg_options=cfg_options)
return model
def build_backend_model(self,
model_files: Sequence[str] = None,
**kwargs) -> torch.nn.Module:
@ -101,37 +158,20 @@ class PoseDetection(BaseTask):
nn.Module: An initialized backend model.
"""
from .pose_detection_model import build_pose_detection_model
data_preprocessor = self.model_cfg.model.data_preprocessor
model = build_pose_detection_model(
model_files,
self.model_cfg,
self.deploy_cfg,
device=self.device,
data_preprocessor=data_preprocessor,
**kwargs)
return model.eval()
def build_pytorch_model(self,
model_checkpoint: Optional[str] = None,
**kwargs) -> torch.nn.Module:
"""Initialize torch model.
Args:
model_checkpoint (str): The checkpoint file of torch model,
defaults to `None`.
Returns:
nn.Module: An initialized torch model generated by other OpenMMLab
codebases.
"""
from mmcv.cnn.utils import revert_sync_batchnorm
from mmpose.apis import init_pose_model
model = init_pose_model(self.model_cfg, model_checkpoint, self.device)
model = revert_sync_batchnorm(model)
model.eval()
return model
return model.eval().to(self.device)
def create_input(self,
imgs: Union[str, np.ndarray],
input_shape: Sequence[int] = None,
data_preprocessor: Optional[BaseDataPreprocessor] = None,
**kwargs) -> Tuple[Dict, torch.Tensor]:
"""Create input for pose detection.
@ -142,16 +182,12 @@ class PoseDetection(BaseTask):
format specifying input shape. Defaults to ``None``.
Returns:
tuple: (data, img), meta information for the input image and input.
tuple: (data, inputs), meta information for the input image
and input.
"""
from mmpose.datasets.dataset_info import DatasetInfo
from mmpose.datasets.pipelines import Compose
from mmcv.transforms import Compose
from mmpose.registry import TRANSFORMS
cfg = self.model_cfg
dataset_info = cfg.data.test.dataset_info
dataset_info = DatasetInfo(dataset_info)
if isinstance(imgs, str):
imgs = mmcv.imread(imgs)
height, width = imgs.shape[:2]
@ -160,64 +196,47 @@ class PoseDetection(BaseTask):
bboxes = np.array([box['bbox'] for box in person_results])
# build the data pipeline
test_pipeline = Compose(cfg.test_pipeline)
dataset_name = dataset_info.dataset_name
flip_pairs = dataset_info.flip_pairs
batch_data = []
test_pipeline = [
TRANSFORMS.build(c) for c in cfg.test_dataloader.dataset.pipeline
]
test_pipeline = Compose(test_pipeline)
if input_shape is not None:
image_size = input_shape
else:
image_size = np.array(cfg.data_cfg['image_size'])
if isinstance(cfg.codec, dict):
codec = cfg.codec
elif isinstance(cfg.codec, list):
codec = cfg.codec[0]
else:
raise TypeError(f'Unsupported type {type(cfg.codec)}')
input_size = codec['input_size']
if tuple(input_shape) != tuple(input_size):
logger = get_root_logger()
logger.warning(f'Input shape from deploy config is not '
f'same as input_size in model config:'
f'{input_shape} vs {input_size}')
batch_data = defaultdict(list)
meta_data = _get_dataset_metainfo(self.model_cfg)
for bbox in bboxes:
# prepare data
bbox_score = np.array([bbox[4] if len(bbox) == 5 else 1
]) # shape (1,)
data = {
'img':
imgs,
'bbox_score':
bbox[4] if len(bbox) == 5 else 1,
'bbox_id':
0, # need to be assigned if batch_size > 1
'dataset':
dataset_name,
'joints_3d':
np.zeros((cfg.data_cfg.num_joints, 3), dtype=np.float32),
'joints_3d_visible':
np.zeros((cfg.data_cfg.num_joints, 3), dtype=np.float32),
'rotation':
0,
'ann_info': {
'image_size': np.array(image_size),
'num_joints': cfg.data_cfg['num_joints'],
'flip_pairs': flip_pairs
}
'img': imgs,
'bbox_score': bbox_score,
'bbox': bbox[None], # shape (1, 4)
}
# for compatibility of mmpose
try:
# for mmpose<=v0.25.1
from mmpose.apis.inference import _box2cs
center, scale = _box2cs(cfg, bbox)
data['center'] = center
data['scale'] = scale
except ImportError:
# for mmpose>=v0.26.0
data['bbox'] = bbox
data.update(meta_data)
data = test_pipeline(data)
batch_data.append(data)
data['inputs'] = data['inputs'].to(self.device)
batch_data['inputs'].append(data['inputs'])
batch_data['data_samples'].append(data['data_samples'])
batch_data = collate(batch_data, samples_per_gpu=1)
# scatter not work so just move image to cuda device
batch_data['img'] = batch_data['img'].to(torch.device(self.device))
# get all img_metas of each bounding box
batch_data['img_metas'] = [
img_metas[0] for img_metas in batch_data['img_metas'].data
]
return batch_data, batch_data['img']
if data_preprocessor is not None:
batch_data = data_preprocessor(batch_data, False)
input_tensor = batch_data['inputs']
return batch_data, input_tensor
def visualize(self,
model: torch.nn.Module,
image: Union[str, np.ndarray],
result: list,
output_file: str,
@ -227,7 +246,6 @@ class PoseDetection(BaseTask):
"""Visualize predictions of a model.
Args:
model (nn.Module): Input model.
image (str | np.ndarray): Input image to draw predictions on.
result (list): A list of predictions.
output_file (str): Output file to save drawn image.
@ -236,74 +254,25 @@ class PoseDetection(BaseTask):
show_result (bool): Whether to show result in windows, defaults
to `False`.
"""
from mmpose.datasets.dataset_info import DatasetInfo
dataset_info = self.model_cfg.data.test.dataset_info
dataset_info = DatasetInfo(dataset_info)
skeleton = dataset_info.skeleton
pose_kpt_color = dataset_info.pose_kpt_color
pose_link_color = dataset_info.pose_link_color
if hasattr(model, 'module'):
model = model.module
from mmpose.apis.inference import dataset_meta_from_config
from mmpose.visualization import PoseLocalVisualizer
save_dir, filename = os.path.split(output_file)
name = os.path.splitext(filename)[0]
dataset_meta = dataset_meta_from_config(
self.model_cfg, dataset_mode='test')
visualizer = PoseLocalVisualizer(name=name, save_dir=save_dir)
visualizer.set_dataset_meta(dataset_meta)
if isinstance(image, str):
image = mmcv.imread(image)
# convert result
result = [dict(keypoints=pose) for pose in result['preds']]
model.show_result(
image = mmcv.imread(image, channel_order='rgb')
visualizer.add_datasample(
name,
image,
result,
skeleton=skeleton,
pose_kpt_color=pose_kpt_color,
pose_link_color=pose_link_color,
out_file=output_file,
data_sample=result,
draw_gt=False,
show=show_result,
win_name=window_name)
@staticmethod
def evaluate_outputs(model_cfg: mmengine.Config,
outputs: Sequence,
dataset: Dataset,
metrics: Optional[str] = None,
out: Optional[str] = None,
metric_options: Optional[dict] = None,
format_only: bool = False,
log_file: Optional[str] = None,
**kwargs):
"""Perform post-processing to predictions of model.
Args:
model_cfg (mmengine.Config): The model config.
outputs (list): A list of predictions of model inference.
dataset (Dataset): Input dataset to run test.
metrics (str): Evaluation metrics, which depends on
the codebase and the dataset, e.g., e.g., "mIoU" for generic
datasets, and "cityscapes" for Cityscapes in mmseg.
out (str): Output result file in pickle format, defaults to `None`.
metric_options (dict): Custom options for evaluation, will be
kwargs for dataset.evaluate() function. Defaults to `None`.
format_only (bool): Format the output results without perform
evaluation. It is useful when you want to format the result
to a specific format and submit it to the test server. Defaults
to `False`.
log_file (str | None): The file to write the evaluation results.
Defaults to `None` and the results will only print on stdout.
"""
from mmcv.utils import get_logger
logger = get_logger('test', log_file=log_file, log_level=logging.INFO)
res_folder = '.'
if out:
logger.info(f'\nwriting results to {out}')
mmcv.dump(outputs, out)
res_folder, _ = os.path.split(out)
os.makedirs(res_folder, exist_ok=True)
eval_config = model_cfg.get('evaluation', {}).copy()
if metrics is not None:
eval_config.update(dict(metric=metrics))
results = dataset.evaluate(outputs, res_folder, **eval_config)
for k, v in sorted(results.items()):
logger.info(f'{k}: {v:.4f}')
out_file=output_file)
def get_model_name(self, *args, **kwargs) -> str:
"""Get the model name.
@ -330,9 +299,11 @@ class PoseDetection(BaseTask):
Return:
dict: Composed of the preprocess information.
"""
# TODO: make it work with sdk
input_shape = get_input_shape(self.deploy_cfg)
model_cfg = process_model_config(self.model_cfg, [''], input_shape)
preprocess = model_cfg.data.test.pipeline
preprocess = model_cfg.test_dataloader.dataset.pipeline
preprocess[0].type = 'LoadImageFromFile'
return preprocess
def get_postprocess(self, *args, **kwargs) -> Dict:
@ -343,39 +314,3 @@ class PoseDetection(BaseTask):
'type'] = self.model_cfg.model.keypoint_head.type + 'Decode'
postprocess.update(self.model_cfg.model.test_cfg)
return postprocess
@staticmethod
def get_tensor_from_input(input_data: Dict[str, Any],
**kwargs) -> torch.Tensor:
"""Get input tensor from input data.
Args:
input_data (dict): Input data containing meta info and image
tensor.
Returns:
torch.Tensor: An image in `Tensor`.
"""
img = input_data['img']
if isinstance(img, (list, tuple)):
img = img[0]
return img
@staticmethod
def run_inference(model, model_inputs: Dict[str, torch.Tensor]):
"""Run inference once for a pose model of mmpose.
Args:
model (nn.Module): Input model.
model_inputs (dict): A dict containing model inputs tensor and
meta info.
Returns:
list: The predictions of model inference.
"""
output = model(
**model_inputs,
return_loss=False,
return_heatmap=False,
target=None,
target_weight=None)
return [output]

View File

@ -1,11 +1,15 @@
# Copyright (c) OpenMMLab. All rights reserved.
from itertools import zip_longest
from typing import List, Optional, Sequence, Union
import mmcv
import mmengine
import numpy as np
import torch
from mmcv.utils import Registry
import torch.nn as nn
from mmengine import Config
from mmengine.model import BaseDataPreprocessor
from mmengine.registry import Registry
from mmengine.structures import BaseDataElement
from mmdeploy.codebase.base import BaseBackendModel
from mmdeploy.utils import (Backend, get_backend, get_codebase_config,
@ -16,8 +20,7 @@ def __build_backend_model(cls_name: str, registry: Registry, *args, **kwargs):
return registry.module_dict[cls_name](*args, **kwargs)
__BACKEND_MODEL = mmcv.utils.Registry(
'backend_pose_detectors', build_func=__build_backend_model)
__BACKEND_MODEL = Registry('backend_segmentors')
@__BACKEND_MODEL.register_module('end2end')
@ -41,21 +44,21 @@ class End2EndModel(BaseBackendModel):
device: str,
deploy_cfg: Union[str, mmengine.Config] = None,
model_cfg: Union[str, mmengine.Config] = None,
data_preprocessor: Optional[Union[dict, nn.Module]] = None,
**kwargs):
super(End2EndModel, self).__init__(deploy_cfg=deploy_cfg)
super(End2EndModel, self).__init__(
deploy_cfg=deploy_cfg, data_preprocessor=data_preprocessor)
from mmpose.models import builder
self.deploy_cfg = deploy_cfg
self.model_cfg = model_cfg
self.device = device
self._init_wrapper(
backend=backend,
backend_files=backend_files,
device=device,
**kwargs)
# create base_head for decoding heatmap
base_head = builder.build_head(model_cfg.model.keypoint_head)
base_head.test_cfg = model_cfg.model.test_cfg
self.base_head = base_head
# create head for decoding heatmap
self.head = builder.build_head(model_cfg.model.head)
def _init_wrapper(self, backend, backend_files, device, **kwargs):
"""Initialize backend wrapper.
@ -76,13 +79,17 @@ class End2EndModel(BaseBackendModel):
deploy_cfg=self.deploy_cfg,
**kwargs)
def forward(self, img: torch.Tensor, img_metas: Sequence[Sequence[dict]],
*args, **kwargs):
def forward(self,
inputs: torch.Tensor,
data_samples: Optional[List[BaseDataElement]],
mode: str = 'predict',
**kwargs):
"""Run forward inference.
Args:
img (torch.Tensor): Input image(s) in [N x C x H x W] format.
img_metas (Sequence[Sequence[dict]]): A list of meta info for
inputs (torch.Tensor): Input image(s) in [N x C x H x W]
format.
data_samples (Sequence[Sequence[dict]]): A list of meta info for
image(s).
*args: Other arguments.
**kwargs: Other key-pair arguments.
@ -90,69 +97,65 @@ class End2EndModel(BaseBackendModel):
Returns:
list: A list contains predictions.
"""
batch_size, _, img_height, img_width = img.shape
input_img = img.contiguous()
outputs = self.forward_test(input_img, img_metas, *args, **kwargs)
heatmaps = outputs[0]
key_points = self.base_head.decode(
img_metas, heatmaps, img_size=[img_width, img_height])
return key_points
assert mode == 'predict', \
'Backend model only support mode==predict,' f' but get {mode}'
inputs = inputs.contiguous().to(self.device)
batch_outputs = self.wrapper({self.input_name: inputs})
batch_outputs = self.wrapper.output_to_list(batch_outputs)
batch_heatmaps = batch_outputs[0]
# flip test
test_cfg = self.model_cfg.model.test_cfg
if test_cfg.get('flip_test', False):
from mmpose.models.utils.tta import flip_heatmaps
batch_inputs_flip = inputs.flip(-1).contiguous()
batch_outputs_flip = self.wrapper(
{self.input_name: batch_inputs_flip})
batch_heatmaps_flip = self.wrapper.output_to_list(
batch_outputs_flip)[0]
flip_indices = data_samples[0].metainfo['flip_indices']
batch_heatmaps_flip = flip_heatmaps(
batch_heatmaps_flip,
flip_mode=test_cfg.get('flip_mode', 'heatmap'),
flip_indices=flip_indices,
shift_heatmap=test_cfg.get('shift_heatmap', False))
batch_heatmaps = (batch_heatmaps + batch_heatmaps_flip) * 0.5
results = self.pack_result(batch_heatmaps, data_samples)
return results
def forward_test(self, imgs: torch.Tensor, *args, **kwargs) -> \
List[np.ndarray]:
"""The interface for forward test.
def pack_result(self, heatmaps, data_samples):
preds = self.head.decode(heatmaps)
if isinstance(preds, tuple):
batch_pred_instances, batch_pred_fields = preds
else:
batch_pred_instances = preds
batch_pred_fields = None
assert len(batch_pred_instances) == len(data_samples)
if batch_pred_fields is None:
batch_pred_fields = []
Args:
imgs (torch.Tensor): Input image(s) in [N x C x H x W] format.
for pred_instances, pred_fields, data_sample in zip_longest(
batch_pred_instances, batch_pred_fields, data_samples):
Returns:
List[np.ndarray]: A list of segmentation map.
"""
outputs = self.wrapper({self.input_name: imgs})
outputs = self.wrapper.output_to_list(outputs)
outputs = [out.detach().cpu().numpy() for out in outputs]
return outputs
gt_instances = data_sample.gt_instances
def show_result(self,
img: np.ndarray,
result: list,
win_name: str = '',
skeleton: Optional[Sequence[Sequence[int]]] = None,
pose_kpt_color: Optional[Sequence[Sequence[int]]] = None,
pose_link_color: Optional[Sequence[Sequence[int]]] = None,
show: bool = False,
out_file: Optional[str] = None,
**kwargs):
"""Show predictions of pose.
# convert keypoint coordinates from input space to image space
bbox_centers = gt_instances.bbox_centers
bbox_scales = gt_instances.bbox_scales
input_size = data_sample.metainfo['input_size']
Args:
img: (np.ndarray): Input image to draw predictions.
result (list): A list of predictions.
win_name (str): The name of visualization window. Default is ''.
skeleton (Sequence[Sequence[int]])The connection of keypoints.
skeleton is 0-based indexing.
pose_kpt_color (np.array[Nx3]): Color of N keypoints.
If ``None``, do not draw keypoints.
pose_link_color (np.array[Mx3]): Color of M links.
If ``None``, do not draw links.
show (bool): Whether to show plotted image in windows.
Defaults to ``True``.
out_file (str): Output image file to save drawn predictions.
pred_instances.keypoints = pred_instances.keypoints / input_size \
* bbox_scales + bbox_centers - 0.5 * bbox_scales
Returns:
np.ndarray: Drawn image, only if not ``show`` or ``out_file``.
"""
from mmpose.models.detectors import TopDown
return TopDown.show_result(
self,
img,
result,
skeleton=skeleton,
pose_kpt_color=pose_kpt_color,
pose_link_color=pose_link_color,
show=show,
out_file=out_file,
win_name=win_name)
# add bbox information into pred_instances
pred_instances.bboxes = gt_instances.bboxes
pred_instances.bbox_scores = gt_instances.bbox_scores
data_sample.pred_instances = pred_instances
if pred_fields is not None:
data_sample.pred_fields = pred_fields
return data_samples
@__BACKEND_MODEL.register_module('sdk')
@ -160,7 +163,8 @@ class SDKEnd2EndModel(End2EndModel):
"""SDK inference class, converts SDK output to mmcls format."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
kwargs['data_preprocessor'] = None
super(SDKEnd2EndModel, self).__init__(*args, **kwargs)
self.ext_info = self.deploy_cfg.ext_info
def _xywh2cs(self, x, y, w, h, padding=1.25):
@ -192,11 +196,11 @@ class SDKEnd2EndModel(End2EndModel):
"""convert xywh to x1 y1 x2 y2."""
return x, y, x + w - 1, y + h - 1
def forward(self, img: List[torch.Tensor], *args, **kwargs) -> list:
def forward(self, inputs: List[torch.Tensor], *args, **kwargs) -> list:
"""Run forward inference.
Args:
img (List[torch.Tensor]): A list contains input image(s)
inputs (List[torch.Tensor]): A list contains input image(s)
in [N x C x H x W] format.
*args: Other arguments.
**kwargs: Other key-pair arguments.
@ -205,7 +209,7 @@ class SDKEnd2EndModel(End2EndModel):
list: A list contains predictions.
"""
image_paths = []
boxes = np.zeros(shape=(img.shape[0], 6))
boxes = np.zeros(shape=(inputs.shape[0], 6))
bbox_ids = []
sdk_boxes = []
for i, img_meta in enumerate(kwargs['img_metas']):
@ -219,8 +223,8 @@ class SDKEnd2EndModel(End2EndModel):
image_paths.append(img_meta['image_file'])
bbox_ids.append(img_meta['bbox_id'])
pred = self.wrapper.handle(img[0].contiguous().detach().cpu().numpy(),
sdk_boxes)
pred = self.wrapper.handle(
[inputs[0].contiguous().detach().cpu().numpy()], sdk_boxes)
result = dict(
preds=pred,
@ -230,10 +234,14 @@ class SDKEnd2EndModel(End2EndModel):
return result
def build_pose_detection_model(model_files: Sequence[str],
model_cfg: Union[str, mmengine.Config],
deploy_cfg: Union[str, mmengine.Config],
device: str, **kwargs):
def build_pose_detection_model(
model_files: Sequence[str],
model_cfg: Union[str, mmengine.Config],
deploy_cfg: Union[str, mmengine.Config],
device: str,
data_preprocessor: Optional[Union[Config,
BaseDataPreprocessor]] = None,
**kwargs):
"""Build object segmentation model for different backends.
Args:
@ -247,19 +255,27 @@ def build_pose_detection_model(model_files: Sequence[str],
Returns:
BaseBackendModel: Pose model for a configured backend.
"""
from mmpose.models.data_preprocessors import PoseDataPreprocessor
# load cfg if necessary
deploy_cfg, model_cfg = load_config(deploy_cfg, model_cfg)
backend = get_backend(deploy_cfg)
model_type = get_codebase_config(deploy_cfg).get('model_type', 'end2end')
if isinstance(data_preprocessor, dict):
dp = data_preprocessor.copy()
dp_type = dp.pop('type')
assert dp_type == 'PoseDataPreprocessor'
data_preprocessor = PoseDataPreprocessor(**dp)
backend_pose_model = __BACKEND_MODEL.build(
model_type,
backend=backend,
backend_files=model_files,
device=device,
model_cfg=model_cfg,
deploy_cfg=deploy_cfg,
**kwargs)
dict(
type=model_type,
backend=backend,
backend_files=model_files,
device=device,
deploy_cfg=deploy_cfg,
model_cfg=model_cfg,
data_preprocessor=data_preprocessor,
**kwargs))
return backend_pose_model

View File

@ -1,5 +1,4 @@
# Copyright (c) OpenMMLab. All rights reserved.
from .backbones import * # noqa: F401,F403
from .detectors import * # noqa: F401,F403
from .heads import * # noqa: F401,F403
from .pose_estimators import * # noqa: F401,F403

View File

@ -1,5 +0,0 @@
# Copyright (c) OpenMMLab. All rights reserved.
from .litehrnet import cross_resolution_weighting__forward
__all__ = ['cross_resolution_weighting__forward']

View File

@ -1,29 +0,0 @@
# Copyright (c) OpenMMLab. All rights reserved.
import torch
import torch.nn.functional as F
from mmdeploy.core import FUNCTION_REWRITER
@FUNCTION_REWRITER.register_rewriter(
'mmpose.models.backbones.litehrnet.CrossResolutionWeighting.forward')
def cross_resolution_weighting__forward(ctx, self, x):
"""Rewrite ``forward`` for default backend.
Rewrite this function to support export ``adaptive_avg_pool2d``.
Args:
x (list): block input.
"""
mini_size = [int(_) for _ in x[-1].shape[-2:]]
out = [F.adaptive_avg_pool2d(s, mini_size) for s in x[:-1]] + [x[-1]]
out = torch.cat(out, dim=1)
out = self.conv1(out)
out = self.conv2(out)
out = torch.split(out, self.channels, dim=1)
out = [
s * F.interpolate(a, size=s.size()[-2:], mode='nearest')
for s, a in zip(x, out)
]
return out

View File

@ -1,5 +0,0 @@
# Copyright (c) OpenMMLab. All rights reserved.
from .top_down import top_down__forward
__all__ = ['top_down__forward']

View File

@ -1,25 +0,0 @@
# Copyright (c) OpenMMLab. All rights reserved.
from mmdeploy.core import FUNCTION_REWRITER
@FUNCTION_REWRITER.register_rewriter(
'mmpose.models.detectors.top_down.TopDown.forward')
def top_down__forward(ctx, self, img, *args, **kwargs):
"""Rewrite `forward_test` of TopDown for default backend.'.
Rewrite this function to run the model directly.
Args:
img (torch.Tensor[NxCxHxW]): Input images.
Returns:
torch.Tensor: The predicted heatmaps.
"""
features = self.backbone(img)
if self.with_neck:
features = self.neck(features)
assert self.with_keypoint
output_heatmap = self.keypoint_head.inference_model(
features, flip_pairs=None)
return output_heatmap

View File

@ -1,12 +1,4 @@
# Copyright (c) OpenMMLab. All rights reserved.
from .deeppose_regression_head import deeppose_regression_head__inference_model
from .topdown_heatmap_multi_stage_head import \
topdown_heatmap_msmu_head__inference_model
from .topdown_heatmap_simple_head import \
topdown_heatmap_simple_head__inference_model
from . import heatmap_head, mspn_head, regression_head
__all__ = [
'topdown_heatmap_simple_head__inference_model',
'topdown_heatmap_msmu_head__inference_model',
'deeppose_regression_head__inference_model'
]
__all__ = ['heatmap_head', 'mspn_head', 'regression_head']

View File

@ -1,24 +0,0 @@
# Copyright (c) OpenMMLab. All rights reserved.
from mmdeploy.core import FUNCTION_REWRITER
@FUNCTION_REWRITER.register_rewriter(
'mmpose.models.heads.DeepposeRegressionHead.inference_model')
def deeppose_regression_head__inference_model(ctx, self, x, flip_pairs=None):
"""Rewrite `forward_test` of TopDown for default backend.
Rewrite this function to run forward directly. And we don't need to
transform result to np.ndarray.
Args:
x (torch.Tensor[N,K,H,W]): Input features.
flip_pairs (None | list[tuple]):
Pairs of keypoints which are mirrored.
Returns:
output_heatmap (torch.Tensor): Output heatmaps.
"""
assert flip_pairs is None
output = self.forward(x)
return output

View File

@ -0,0 +1,23 @@
# Copyright (c) OpenMMLab. All rights reserved.
from mmdeploy.core import FUNCTION_REWRITER
@FUNCTION_REWRITER.register_rewriter(
'mmpose.models.heads.heatmap_heads.HeatmapHead.predict')
def heatmap_head__predict(ctx, self, feats, batch_data_samples, test_cfg=None):
"""Rewrite `predict` of HeatmapHead for default backend.
1. skip heatmaps decoding and return heatmaps directly.
Args:
feats (tuple[Tensor]): Input features.
batch_data_samples (list[SampleList]): Data samples contain
image meta information.
test_cfg (ConfigType): test config.
Returns:
output_heatmap (torch.Tensor): Output heatmaps.
"""
batch_heatmaps = self.forward(feats)
return batch_heatmaps

View File

@ -0,0 +1,24 @@
# Copyright (c) OpenMMLab. All rights reserved.
from mmdeploy.core import FUNCTION_REWRITER
@FUNCTION_REWRITER.register_rewriter(
'mmpose.models.heads.heatmap_heads.MSPNHead.predict')
def mspn_head__predict(ctx, self, feats, batch_data_samples, test_cfg=None):
"""Rewrite `predict` of HeatmapHead for default backend.
1. skip heatmaps decoding and return heatmaps directly.
Args:
feats (tuple[Tensor]): Input features.
batch_data_samples (list[SampleList]): Data samples contain
image meta information.
test_cfg (ConfigType): test config.
Returns:
output_heatmap (torch.Tensor): Output heatmaps.
"""
msmu_batch_heatmaps = self.forward(feats)
batch_heatmaps = msmu_batch_heatmaps[-1]
return batch_heatmaps

View File

@ -0,0 +1,28 @@
# Copyright (c) OpenMMLab. All rights reserved.
from mmdeploy.core import FUNCTION_REWRITER
@FUNCTION_REWRITER.register_rewriter(
'mmpose.models.heads.regression_heads.regression_head'
'.RegressionHead.predict')
def regression_head__predict(ctx,
self,
feats,
batch_data_samples,
test_cfg=None):
"""Rewrite `predict` of RegressionHead for default backend.
1. skip heatmaps decoding and return heatmaps directly.
Args:
feats (tuple[Tensor]): Input features.
batch_data_samples (list[SampleList]): Data samples contain
image meta information.
test_cfg (ConfigType): test config.
Returns:
output_heatmap (torch.Tensor): Output heatmaps.
"""
batch_heatmaps = self.forward(feats)
return batch_heatmaps

View File

@ -1,52 +0,0 @@
# Copyright (c) OpenMMLab. All rights reserved.
from mmdeploy.core import FUNCTION_REWRITER
@FUNCTION_REWRITER.register_rewriter(
'mmpose.models.heads.TopdownHeatmapMSMUHead.inference_model')
def topdown_heatmap_msmu_head__inference_model(ctx, self, x, flip_pairs=None):
"""Rewrite ``inference_model`` for default backend.
Rewrite this function to run forward directly. And we don't need to
transform result to np.ndarray.
Args:
x (list[torch.Tensor[N,K,H,W]]): Input features.
flip_pairs (None | list[tuple]):
Pairs of keypoints which are mirrored.
Returns:
output_heatmap (torch.Tensor): Output heatmaps.
"""
assert flip_pairs is None
output = self.forward(x)
assert isinstance(output, list)
output = output[-1]
return output
@FUNCTION_REWRITER.register_rewriter(
'mmpose.models.heads.TopdownHeatmapMultiStageHead.inference_model')
def topdown_heatmap_multi_stage_head__inference_model(ctx,
self,
x,
flip_pairs=None):
"""Rewrite ``inference_model`` for default backend.
Rewrite this function to run forward directly. And we don't need to
transform result to np.ndarray.
Args:
x (list[torch.Tensor[N,K,H,W]]): Input features.
flip_pairs (None | list[tuple]):
Pairs of keypoints which are mirrored.
Returns:
output_heatmap (torch.Tensor): Output heatmaps.
"""
assert flip_pairs is None
output = self.forward(x)
assert isinstance(output, list)
output = output[-1]
return output

View File

@ -1,27 +0,0 @@
# Copyright (c) OpenMMLab. All rights reserved.
from mmdeploy.core import FUNCTION_REWRITER
@FUNCTION_REWRITER.register_rewriter(
'mmpose.models.heads.TopdownHeatmapSimpleHead.inference_model')
def topdown_heatmap_simple_head__inference_model(ctx,
self,
x,
flip_pairs=None):
"""Rewrite `forward_test` of TopDown for default backend.
Rewrite this function to run forward directly. And we don't need to
transform result to np.ndarray.
Args:
x (torch.Tensor[N,K,H,W]): Input features.
flip_pairs (None | list[tuple]):
Pairs of keypoints which are mirrored.
Returns:
output_heatmap (torch.Tensor): Output heatmaps.
"""
assert flip_pairs is None
output = self.forward(x)
return output

View File

@ -0,0 +1,4 @@
# Copyright (c) OpenMMLab. All rights reserved.
from . import base, topdown
__all__ = ['base', 'topdown']

View File

@ -0,0 +1,36 @@
# Copyright (c) OpenMMLab. All rights reserved.
from mmdeploy.core import FUNCTION_REWRITER
@FUNCTION_REWRITER.register_rewriter(
'mmpose.models.pose_estimators.base.BasePoseEstimator.forward')
def base_pose_estimator__forward(ctx,
self,
inputs,
data_samples=None,
mode='predict',
**kwargs):
"""Rewrite `forward_test` of TopDown for default backend.'.
1. only support mode='predict'.
2. create data_samples if necessary
Args:
inputs (torch.Tensor[NxCxHxW]): Input images.
data_samples (SampleList | None): Data samples contain
image meta information.
Returns:
torch.Tensor: The predicted heatmaps.
"""
if data_samples is None:
from mmpose.structures import PoseDataSample
_, c, h, w = [int(_) for _ in inputs.shape]
metainfo = dict(
img_shape=(h, w, c),
input_size=(w, h),
heatmap_size=self.cfg.codec.heatmap_size)
data_sample = PoseDataSample(metainfo=metainfo)
data_samples = [data_sample]
return self.predict(inputs, data_samples)

View File

@ -0,0 +1,24 @@
# Copyright (c) OpenMMLab. All rights reserved.
from mmdeploy.core import FUNCTION_REWRITER
@FUNCTION_REWRITER.register_rewriter(
'mmpose.models.pose_estimators.topdown.TopdownPoseEstimator.predict')
def topdown_pose_estimator__predict(ctx, self, inputs, data_samples, **kwargs):
"""Rewrite `predict` of TopdownPoseEstimator for default backend.'.
1. skip flip_test
2. avoid call `add_pred_to_datasample`
Args:
inputs (torch.Tensor[NxCxHxW]): Input images.
data_samples (SampleList | None): Data samples contain
image meta information.
Returns:
torch.Tensor: The predicted heatmaps.
"""
assert self.with_head, ('The model must have head to perform prediction.')
feats = self.extract_feat(inputs)
preds = self.head.predict(feats, data_samples, test_cfg=self.test_cfg)
return preds

View File

@ -20,8 +20,8 @@ __all__ = [
'tensor__getattribute__ncnn', 'group_norm__ncnn', 'interpolate__ncnn',
'interpolate__tensorrt', 'linear__ncnn', 'tensor__repeat__tensorrt',
'tensor__size__ncnn', 'topk__dynamic', 'topk__tensorrt', 'chunk__ncnn',
'triu__default', 'atan2__default', 'adaptive_avg_pool2d__default',
'normalize__ncnn', 'expand__ncnn', 'chunk__torchscript',
'masked_fill__onnxruntime', 'tensor__setitem__default',
'adaptive_avg_pool2d__ncnn'
'triu__default', 'atan2__default', 'normalize__ncnn', 'expand__ncnn',
'chunk__torchscript', 'masked_fill__onnxruntime',
'tensor__setitem__default', 'adaptive_avg_pool2d__ncnn',
'adaptive_avg_pool2d__default'
]

View File

@ -4,7 +4,6 @@ globals:
images:
img_human_pose: &img_human_pose ../mmpose/tests/data/coco/000000000785.jpg
img_human_pose_256x192: &img_human_pose_256x192 ./demo/resources/human-pose.jpg
img_blank: &img_blank
metric_info: &metric_info
AP: # named after metafile.Results.Metrics
eval_name: mAP # test.py --metrics args
@ -50,7 +49,6 @@ tensorrt:
openvino:
pipeline_openvino_static_fp32: &pipeline_openvino_static_fp32
convert_image: *convert_image
backend_test: *default_backend_test
deploy_config: configs/mmpose/pose-detection_openvino_static-256x192.py
ncnn:
@ -68,35 +66,43 @@ pplnn:
torchscript:
pipeline_ts_static_fp32: &pipeline_ts_fp32
convert_image: *convert_image
backend_test: False
backend_test: *default_backend_test
sdk_config: *sdk_static
deploy_config: configs/mmpose/pose-detection_torchscript.py
models:
- name: HRNET
metafile: configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/hrnet_coco.yml
metafile: configs/body_2d_keypoint/topdown_heatmap/coco/hrnet_coco.yml
model_configs:
- configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/hrnet_w48_coco_256x192.py
- configs/body_2d_keypoint/topdown_heatmap/coco/td-hm_hrnet-w32_8xb64-210e_coco-256x192.py
pipelines:
- *pipeline_ort_static_fp32
- *pipeline_trt_static_fp16
- *pipeline_ncnn_static_fp32
- *pipeline_openvino_static_fp32
- *pipeline_ts_fp32
- *pipeline_pplnn_static_fp32
- name: LiteHRNet
metafile: configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/litehrnet_coco.yml
metafile: configs/body_2d_keypoint/topdown_heatmap/coco/litehrnet_coco.yml
model_configs:
- configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/litehrnet_30_coco_256x192.py
- configs/body_2d_keypoint/topdown_heatmap/coco/td-hm_litehrnet-30_8xb64-210e_coco-256x192.py
pipelines:
- *pipeline_ort_static_fp32
- *pipeline_trt_static_fp32
- *pipeline_ncnn_static_fp32
- *pipeline_openvino_static_fp32
- *pipeline_ts_fp32
- *pipeline_pplnn_static_fp32
- name: MSPN
metafile: configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/mspn_coco.yml
metafile: configs/body_2d_keypoint/topdown_heatmap/coco/mspn_coco.yml
model_configs:
- configs/body/2d_kpt_sview_rgb_img/topdown_heatmap/coco/4xmspn50_coco_256x192.py
- configs/body_2d_keypoint/topdown_heatmap/coco/td-hm_4xmspn50_8xb32-210e_coco-256x192.py
pipelines:
- *pipeline_ort_static_fp32
- *pipeline_trt_static_fp16
- *pipeline_ncnn_static_fp32
- *pipeline_openvino_static_fp32
- *pipeline_ts_fp32
- *pipeline_pplnn_static_fp32

View File

@ -0,0 +1,8 @@
# Copyright (c) OpenMMLab. All rights reserved.
from .utils import (generate_datasample, generate_mmpose_deploy_config,
generate_mmpose_task_processor)
__all__ = [
'generate_datasample', 'generate_mmpose_deploy_config',
'generate_mmpose_task_processor'
]

View File

@ -1,257 +1,67 @@
# Copyright (c) OpenMMLab. All rights reserved.
# model settings
import mmpose
from packaging import version
channel_cfg = dict(
num_output_channels=17,
dataset_joints=17,
dataset_channel=[
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16],
],
inference_channel=[
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16
])
codec = dict(
type='MSRAHeatmap', input_size=(192, 256), heatmap_size=(48, 64), sigma=2)
test_cfg = dict(
flip_test=False,
flip_mode='heatmap',
shift_heatmap=True,
)
model = dict(
type='TopDown',
pretrained=None,
type='TopdownPoseEstimator',
data_preprocessor=dict(
type='PoseDataPreprocessor',
mean=[123.675, 116.28, 103.53],
std=[58.395, 57.12, 57.375],
bgr_to_rgb=True),
backbone=dict(type='ResNet', depth=18),
keypoint_head=dict(
type='TopdownHeatmapSimpleHead',
head=dict(
type='HeatmapHead',
in_channels=512,
out_channels=17,
loss_keypoint=dict(type='JointsMSELoss', use_target_weight=True)),
train_cfg=dict(),
test_cfg=dict(
flip_test=False,
post_process='default',
shift_heatmap=False,
modulate_kernel=11))
deconv_out_channels=None,
loss=dict(type='KeypointMSELoss', use_target_weight=True),
decoder=codec),
test_cfg=test_cfg)
data_cfg = dict(
image_size=[192, 256],
heatmap_size=[48, 64],
num_output_channels=channel_cfg['num_output_channels'],
num_joints=channel_cfg['dataset_joints'],
dataset_channel=channel_cfg['dataset_channel'],
inference_channel=channel_cfg['inference_channel'],
soft_nms=False,
nms_thr=1.0,
oks_thr=0.9,
vis_thr=0.2,
# here use_gt_bbox must be true in ut, or should use predicted
# bboxes.
use_gt_bbox=True,
det_bbox_thr=0.0,
bbox_file='tests/test_codebase/test_mmpose/data/coco/' +
'person_detection_results' +
'/COCO_val2017_detections_AP_H_56_person.json',
)
# dataset settings
dataset_type = 'CocoDataset'
data_mode = 'topdown'
data_root = 'tests/test_codebase/test_mmpose/data/'
file_client_args = dict(backend='disk')
test_pipeline = [
dict(type='LoadImageFromFile'),
# dict(type='TopDownGetBboxCenterScale'),
dict(type='TopDownAffine'),
dict(type='ToTensor'),
dict(
type='NormalizeTensor',
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
dict(
type='Collect',
keys=['img'],
meta_keys=[
'image_file', 'center', 'scale', 'rotation', 'bbox_score',
'flip_pairs'
]),
dict(type='LoadImage', file_client_args=file_client_args),
dict(type='GetBBoxCenterScale'),
dict(type='TopdownAffine', input_size=codec['input_size']),
dict(type='PackPoseInputs')
]
# compatible with mmpose >=v0.26.0
if version.parse(mmpose.__version__) >= version.parse('0.26.0'):
test_pipeline.insert(1, dict(type='TopDownGetBboxCenterScale'))
dataset_info = dict(
dataset_name='coco',
paper_info=dict(),
keypoint_info={
0:
dict(name='nose', id=0, color=[51, 153, 255], type='upper', swap=''),
1:
dict(
name='left_eye',
id=1,
color=[51, 153, 255],
type='upper',
swap='right_eye'),
2:
dict(
name='right_eye',
id=2,
color=[51, 153, 255],
type='upper',
swap='left_eye'),
3:
dict(
name='left_ear',
id=3,
color=[51, 153, 255],
type='upper',
swap='right_ear'),
4:
dict(
name='right_ear',
id=4,
color=[51, 153, 255],
type='upper',
swap='left_ear'),
5:
dict(
name='left_shoulder',
id=5,
color=[0, 255, 0],
type='upper',
swap='right_shoulder'),
6:
dict(
name='right_shoulder',
id=6,
color=[255, 128, 0],
type='upper',
swap='left_shoulder'),
7:
dict(
name='left_elbow',
id=7,
color=[0, 255, 0],
type='upper',
swap='right_elbow'),
8:
dict(
name='right_elbow',
id=8,
color=[255, 128, 0],
type='upper',
swap='left_elbow'),
9:
dict(
name='left_wrist',
id=9,
color=[0, 255, 0],
type='upper',
swap='right_wrist'),
10:
dict(
name='right_wrist',
id=10,
color=[255, 128, 0],
type='upper',
swap='left_wrist'),
11:
dict(
name='left_hip',
id=11,
color=[0, 255, 0],
type='lower',
swap='right_hip'),
12:
dict(
name='right_hip',
id=12,
color=[255, 128, 0],
type='lower',
swap='left_hip'),
13:
dict(
name='left_knee',
id=13,
color=[0, 255, 0],
type='lower',
swap='right_knee'),
14:
dict(
name='right_knee',
id=14,
color=[255, 128, 0],
type='lower',
swap='left_knee'),
15:
dict(
name='left_ankle',
id=15,
color=[0, 255, 0],
type='lower',
swap='right_ankle'),
16:
dict(
name='right_ankle',
id=16,
color=[255, 128, 0],
type='lower',
swap='left_ankle')
},
skeleton_info={
0:
dict(link=('left_ankle', 'left_knee'), id=0, color=[0, 255, 0]),
1:
dict(link=('left_knee', 'left_hip'), id=1, color=[0, 255, 0]),
2:
dict(link=('right_ankle', 'right_knee'), id=2, color=[255, 128, 0]),
3:
dict(link=('right_knee', 'right_hip'), id=3, color=[255, 128, 0]),
4:
dict(link=('left_hip', 'right_hip'), id=4, color=[51, 153, 255]),
5:
dict(link=('left_shoulder', 'left_hip'), id=5, color=[51, 153, 255]),
6:
dict(link=('right_shoulder', 'right_hip'), id=6, color=[51, 153, 255]),
7:
dict(
link=('left_shoulder', 'right_shoulder'),
id=7,
color=[51, 153, 255]),
8:
dict(link=('left_shoulder', 'left_elbow'), id=8, color=[0, 255, 0]),
9:
dict(
link=('right_shoulder', 'right_elbow'), id=9, color=[255, 128, 0]),
10:
dict(link=('left_elbow', 'left_wrist'), id=10, color=[0, 255, 0]),
11:
dict(link=('right_elbow', 'right_wrist'), id=11, color=[255, 128, 0]),
12:
dict(link=('left_eye', 'right_eye'), id=12, color=[51, 153, 255]),
13:
dict(link=('nose', 'left_eye'), id=13, color=[51, 153, 255]),
14:
dict(link=('nose', 'right_eye'), id=14, color=[51, 153, 255]),
15:
dict(link=('left_eye', 'left_ear'), id=15, color=[51, 153, 255]),
16:
dict(link=('right_eye', 'right_ear'), id=16, color=[51, 153, 255]),
17:
dict(link=('left_ear', 'left_shoulder'), id=17, color=[51, 153, 255]),
18:
dict(
link=('right_ear', 'right_shoulder'), id=18, color=[51, 153, 255])
},
joint_weights=[
1., 1., 1., 1., 1., 1., 1., 1.2, 1.2, 1.5, 1.5, 1., 1., 1.2, 1.2, 1.5,
1.5
],
sigmas=[
0.026, 0.025, 0.025, 0.035, 0.035, 0.079, 0.079, 0.072, 0.072, 0.062,
0.062, 0.107, 0.107, 0.087, 0.087, 0.089, 0.089
])
data = dict(
samples_per_gpu=64,
workers_per_gpu=2,
test_dataloader=dict(samples_per_gpu=32),
test=dict(
type='TopDownCocoDataset',
ann_file='tests/test_codebase/test_mmpose/data/annotations/' +
'person_keypoints_val2017.json',
img_prefix='tests/test_codebase/test_mmpose/data/val2017/',
data_cfg=data_cfg,
val_dataloader = dict(
batch_size=1,
num_workers=1,
persistent_workers=True,
drop_last=False,
sampler=dict(type='DefaultSampler', shuffle=False),
dataset=dict(
type=dataset_type,
data_root=data_root,
ann_file='annotations/person_keypoints_val2017.json',
data_prefix=dict(img='val2017/'),
test_mode=True,
lazy_init=True,
serialize_data=False,
pipeline=test_pipeline,
dataset_info=dataset_info),
)
))
test_dataloader = val_dataloader
val_evaluator = dict(
type='CocoMetric',
ann_file=data_root + 'annotations/person_keypoints_val2017.json')
test_evaluator = val_evaluator
# default_runtime
default_scope = 'mmpose'
default_hooks = dict()
vis_backends = [dict(type='LocalVisBackend')]
visualizer = dict(
type='PoseLocalVisualizer', vis_backends=vis_backends, name='visualizer')

View File

@ -1,11 +1,10 @@
# Copyright (c) OpenMMLab. All rights reserved.
import mmcv
import numpy as np
import pytest
import torch
from mmdeploy.codebase import import_codebase
from mmdeploy.utils import Backend, Codebase, Task
from mmdeploy.utils import Backend, Codebase
from mmdeploy.utils.test import WrapModel, check_backend, get_rewrite_outputs
try:
@ -14,142 +13,62 @@ except ImportError:
pytest.skip(
f'{Codebase.MMPOSE} is not installed.', allow_module_level=True)
from .utils import generate_mmpose_deploy_config # noqa: E402
from .utils import generate_mmpose_task_processor # noqa: E402
def get_top_down_heatmap_simple_head_model():
from mmpose.models.heads import TopdownHeatmapSimpleHead
model = TopdownHeatmapSimpleHead(
def get_heatmap_head():
from mmpose.models.heads import HeatmapHead
model = HeatmapHead(
2,
4,
num_deconv_filters=(16, 16, 16),
loss_keypoint=dict(type='JointsMSELoss', use_target_weight=False))
deconv_out_channels=(16, 16, 16),
loss=dict(type='KeypointMSELoss', use_target_weight=False))
model.requires_grad_(False)
return model
@pytest.mark.parametrize('backend_type',
[Backend.ONNXRUNTIME, Backend.TENSORRT])
def test_top_down_heatmap_simple_head_inference_model(backend_type: Backend):
@pytest.mark.parametrize('backend_type', [Backend.ONNXRUNTIME])
def test_heatmaphead_predict(backend_type: Backend):
check_backend(backend_type, True)
model = get_top_down_heatmap_simple_head_model()
model = get_heatmap_head()
model.cpu().eval()
if backend_type == Backend.TENSORRT:
deploy_cfg = mmcv.Config(
dict(
backend_config=dict(
type=backend_type.value,
common_config=dict(max_workspace_size=1 << 30),
model_inputs=[
dict(
input_shapes=dict(
input=dict(
min_shape=[1, 3, 32, 48],
opt_shape=[1, 3, 32, 48],
max_shape=[1, 3, 32, 48])))
]),
onnx_config=dict(
input_shape=[32, 48], output_names=['output']),
codebase_config=dict(
type=Codebase.MMPOSE.value,
task=Task.POSE_DETECTION.value)))
else:
deploy_cfg = mmcv.Config(
dict(
backend_config=dict(type=backend_type.value),
onnx_config=dict(input_shape=None, output_names=['output']),
codebase_config=dict(
type=Codebase.MMPOSE.value,
task=Task.POSE_DETECTION.value)))
img = torch.rand((1, 2, 32, 48))
model_outputs = model.inference_model(img)
wrapped_model = WrapModel(model, 'inference_model')
rewrite_inputs = {'x': img}
rewrite_outputs, is_backend_output = get_rewrite_outputs(
deploy_cfg = generate_mmpose_deploy_config(backend_type.value)
feats = [torch.rand(1, 2, 32, 48)]
wrapped_model = WrapModel(model, 'predict', batch_data_samples=None)
rewrite_inputs = {'feats': feats}
rewrite_outputs, _ = get_rewrite_outputs(
wrapped_model=wrapped_model,
model_inputs=rewrite_inputs,
deploy_cfg=deploy_cfg)
if isinstance(rewrite_outputs, dict):
rewrite_outputs = rewrite_outputs['output']
for model_output, rewrite_output in zip(model_outputs, rewrite_outputs):
if isinstance(rewrite_output, torch.Tensor):
rewrite_output = rewrite_output.cpu().numpy()
assert np.allclose(
model_output, rewrite_output, rtol=1e-03, atol=1e-05)
deploy_cfg=deploy_cfg,
run_with_backend=False)
assert isinstance(rewrite_outputs, torch.Tensor)
def get_top_down_heatmap_msmu_head_model():
class DummyMSMUHead(torch.nn.Module):
def __init__(self, out_shape):
from mmpose.models.heads import TopdownHeatmapMSMUHead
super().__init__()
self.model = TopdownHeatmapMSMUHead(
out_shape,
unit_channels=2,
out_channels=17,
num_stages=1,
num_units=1,
loss_keypoint=dict(
type='JointsMSELoss', use_target_weight=False))
def inference_model(self, x):
assert isinstance(x, torch.Tensor)
return self.model.inference_model([[x]], flip_pairs=None)
model = DummyMSMUHead((32, 48))
def get_msmu_head():
from mmpose.models.heads import MSPNHead
model = MSPNHead(
num_stages=1, num_units=1, out_shape=(32, 48), unit_channels=16)
model.requires_grad_(False)
return model
@pytest.mark.parametrize('backend_type',
[Backend.ONNXRUNTIME, Backend.TENSORRT])
def test_top_down_heatmap_msmu_head_inference_model(backend_type: Backend):
@pytest.mark.parametrize('backend_type', [Backend.ONNXRUNTIME])
def test_msmuhead_predict(backend_type: Backend):
check_backend(backend_type, True)
model = get_top_down_heatmap_msmu_head_model()
model = get_msmu_head()
model.cpu().eval()
if backend_type == Backend.TENSORRT:
deploy_cfg = mmcv.Config(
dict(
backend_config=dict(
type=backend_type.value,
common_config=dict(max_workspace_size=1 << 30),
model_inputs=[
dict(
input_shapes=dict(
input=dict(
min_shape=[1, 3, 32, 48],
opt_shape=[1, 3, 32, 48],
max_shape=[1, 3, 32, 48])))
]),
onnx_config=dict(
input_shape=[32, 48], output_names=['output']),
codebase_config=dict(
type=Codebase.MMPOSE.value,
task=Task.POSE_DETECTION.value)))
else:
deploy_cfg = mmcv.Config(
dict(
backend_config=dict(type=backend_type.value),
onnx_config=dict(input_shape=None, output_names=['output']),
codebase_config=dict(
type=Codebase.MMPOSE.value,
task=Task.POSE_DETECTION.value)))
img = torch.rand((1, 2, 32, 48))
model_outputs = model.inference_model(img)
wrapped_model = WrapModel(model, 'inference_model')
rewrite_inputs = {'x': img}
rewrite_outputs, is_backend_output = get_rewrite_outputs(
deploy_cfg = generate_mmpose_deploy_config(backend_type.value)
feats = [[torch.rand(1, 16, 32, 48)]]
wrapped_model = WrapModel(model, 'predict', batch_data_samples=None)
rewrite_inputs = {'feats': feats}
rewrite_outputs, _ = get_rewrite_outputs(
wrapped_model=wrapped_model,
model_inputs=rewrite_inputs,
deploy_cfg=deploy_cfg)
if isinstance(rewrite_outputs, dict):
rewrite_outputs = rewrite_outputs['output']
for model_output, rewrite_output in zip(model_outputs, rewrite_outputs):
if isinstance(rewrite_output, torch.Tensor):
rewrite_output = rewrite_output.cpu().numpy()
assert np.allclose(
model_output, rewrite_output, rtol=1e-03, atol=1e-05)
deploy_cfg=deploy_cfg,
run_with_backend=False)
assert isinstance(rewrite_outputs, torch.Tensor)
def get_cross_resolution_weighting_model():
@ -170,29 +89,13 @@ def get_cross_resolution_weighting_model():
return model
@pytest.mark.parametrize('backend_type', [Backend.ONNXRUNTIME, Backend.NCNN])
@pytest.mark.parametrize('backend_type', [Backend.ONNXRUNTIME])
def test_cross_resolution_weighting_forward(backend_type: Backend):
check_backend(backend_type, True)
model = get_cross_resolution_weighting_model()
model.cpu().eval()
imgs = torch.rand(1, 16, 16, 16)
if backend_type == Backend.NCNN:
deploy_cfg = mmcv.Config(
dict(
backend_config=dict(type=backend_type.value, use_vulkan=False),
onnx_config=dict(input_shape=None, output_names=['output']),
codebase_config=dict(
type=Codebase.MMPOSE.value,
task=Task.POSE_DETECTION.value)))
else:
deploy_cfg = mmcv.Config(
dict(
backend_config=dict(type=backend_type.value),
onnx_config=dict(input_shape=None, output_names=['output']),
codebase_config=dict(
type=Codebase.MMPOSE.value,
task=Task.POSE_DETECTION.value)))
deploy_cfg = generate_mmpose_deploy_config(backend_type.value)
rewrite_inputs = {'x': imgs}
model_outputs = model.forward(imgs)
wrapped_model = WrapModel(model, 'forward')
@ -210,86 +113,19 @@ def test_cross_resolution_weighting_forward(backend_type: Backend):
model_output, rewrite_output, rtol=1e-03, atol=1e-05)
def get_top_down_model():
from mmpose.models.detectors.top_down import TopDown
model_cfg = dict(
type='TopDown',
pretrained=None,
backbone=dict(type='ResNet', depth=18),
keypoint_head=dict(
type='TopdownHeatmapSimpleHead',
in_channels=512,
out_channels=17,
loss_keypoint=dict(type='JointsMSELoss', use_target_weight=True)),
train_cfg=dict(),
test_cfg=dict(
flip_test=False,
post_process='default',
shift_heatmap=False,
modulate_kernel=11))
model = TopDown(model_cfg['backbone'], None, model_cfg['keypoint_head'],
model_cfg['train_cfg'], model_cfg['test_cfg'],
model_cfg['pretrained'])
model.requires_grad_(False)
return model
@pytest.mark.parametrize('backend_type',
[Backend.ONNXRUNTIME, Backend.TENSORRT])
def test_top_down_forward(backend_type: Backend):
@pytest.mark.parametrize('backend_type', [Backend.ONNXRUNTIME])
def test_estimator_forward(backend_type: Backend):
check_backend(backend_type, True)
model = get_top_down_model()
deploy_cfg = generate_mmpose_deploy_config(backend_type.value)
task_processor = generate_mmpose_task_processor(deploy_cfg=deploy_cfg)
model = task_processor.build_pytorch_model()
model.requires_grad_(False)
model.cpu().eval()
if backend_type == Backend.TENSORRT:
deploy_cfg = mmcv.Config(
dict(
backend_config=dict(
type=backend_type.value,
common_config=dict(max_workspace_size=1 << 30),
model_inputs=[
dict(
input_shapes=dict(
input=dict(
min_shape=[1, 3, 32, 32],
opt_shape=[1, 3, 32, 32],
max_shape=[1, 3, 32, 32])))
]),
onnx_config=dict(
input_shape=[32, 32], output_names=['output']),
codebase_config=dict(
type=Codebase.MMPOSE.value,
task=Task.POSE_DETECTION.value)))
else:
deploy_cfg = mmcv.Config(
dict(
backend_config=dict(type=backend_type.value),
onnx_config=dict(input_shape=None, output_names=['output']),
codebase_config=dict(
type=Codebase.MMPOSE.value,
task=Task.POSE_DETECTION.value)))
img = torch.rand((1, 3, 32, 32))
img_metas = {
'image_file':
'tests/test_codebase/test_mmpose' + '/data/imgs/dataset/blank.jpg',
'center': torch.tensor([0.5, 0.5]),
'scale': 1.,
'location': torch.tensor([0.5, 0.5]),
'bbox_score': 0.5
}
model_outputs = model.forward(
img, img_metas=[img_metas], return_loss=False, return_heatmap=True)
model_outputs = model_outputs['output_heatmap']
wrapped_model = WrapModel(model, 'forward', return_loss=False)
rewrite_inputs = {'img': img}
rewrite_outputs, is_backend_output = get_rewrite_outputs(
wrapped_model = WrapModel(model, 'forward', data_samples=None)
rewrite_inputs = {'inputs': torch.rand(1, 3, 256, 192)}
rewrite_outputs, _ = get_rewrite_outputs(
wrapped_model=wrapped_model,
model_inputs=rewrite_inputs,
run_with_backend=False,
deploy_cfg=deploy_cfg)
if isinstance(rewrite_outputs, dict):
rewrite_outputs = rewrite_outputs['output']
for model_output, rewrite_output in zip(model_outputs, rewrite_outputs):
if isinstance(rewrite_output, torch.Tensor):
rewrite_output = rewrite_output.cpu().numpy()
assert np.allclose(
model_output, rewrite_output, rtol=1e-03, atol=1e-05)
assert isinstance(rewrite_outputs, torch.Tensor)

View File

@ -1,17 +1,14 @@
# Copyright (c) OpenMMLab. All rights reserved.
import os
from tempfile import NamedTemporaryFile, TemporaryDirectory
import mmcv
import numpy as np
import pytest
import torch
import mmdeploy.backend.onnxruntime as ort_apis
from mmdeploy.apis import build_task_processor
from mmdeploy.codebase import import_codebase
from mmdeploy.utils import Backend, Codebase, Task, load_config
from mmdeploy.utils.test import DummyModel, SwitchBackendWrapper
from mmdeploy.utils import Codebase, load_config
from mmdeploy.utils.test import SwitchBackendWrapper
try:
import_codebase(Codebase.MMPOSE)
@ -19,56 +16,34 @@ except ImportError:
pytest.skip(
f'{Codebase.MMPOSE.value} is not installed.', allow_module_level=True)
from .utils import (generate_datasample, generate_mmpose_deploy_config,
generate_mmpose_task_processor)
model_cfg_path = 'tests/test_codebase/test_mmpose/data/model.py'
model_cfg = load_config(model_cfg_path)[0]
deploy_cfg = mmcv.Config(
dict(
backend_config=dict(type='onnxruntime'),
codebase_config=dict(type='mmpose', task='PoseDetection'),
onnx_config=dict(
type='onnx',
export_params=True,
keep_initializers_as_inputs=False,
opset_version=11,
save_file='end2end.onnx',
input_names=['input'],
output_names=['output'],
input_shape=None)))
deploy_cfg = generate_mmpose_deploy_config()
onnx_file = NamedTemporaryFile(suffix='.onnx').name
task_processor = build_task_processor(model_cfg, deploy_cfg, 'cpu')
task_processor = generate_mmpose_task_processor()
img_shape = (192, 256)
heatmap_shape = (48, 64)
# mmpose.apis.inference.LoadImage uses opencv, needs float32 in
# cv2.cvtColor.
img = np.random.rand(*img_shape, 3).astype(np.float32)
num_output_channels = model_cfg['data_cfg']['num_output_channels']
img_path = 'tests/data/tiger.jpeg'
num_output_channels = 17
def test_create_input():
deploy_cfg = mmcv.Config(
dict(
backend_config=dict(type=Backend.ONNXRUNTIME.value),
codebase_config=dict(
type=Codebase.MMPOSE.value, task=Task.POSE_DETECTION.value),
onnx_config=dict(
type='onnx',
export_params=True,
keep_initializers_as_inputs=False,
opset_version=11,
save_file='end2end.onnx',
input_names=['input'],
output_names=['output'],
input_shape=None)))
task_processor = build_task_processor(model_cfg, deploy_cfg, 'cpu')
inputs = task_processor.create_input(img, input_shape=img_shape)
@pytest.mark.parametrize('imgs', [img, img_path])
def test_create_input(imgs):
inputs = task_processor.create_input(imgs, input_shape=img_shape)
assert isinstance(inputs, tuple) and len(inputs) == 2
def test_build_pytorch_model():
from mmpose.models.detectors.base import BasePose
from mmpose.models.pose_estimators.base import BasePoseEstimator
model = task_processor.build_pytorch_model(None)
assert isinstance(model, BasePose)
assert isinstance(model, BasePoseEstimator)
@pytest.fixture
@ -89,25 +64,18 @@ def test_build_backend_model(backend_model):
assert isinstance(backend_model, torch.nn.Module)
def test_run_inference(backend_model):
input_dict, _ = task_processor.create_input(img, input_shape=img_shape)
results = task_processor.run_inference(backend_model, input_dict)
assert results is not None
def test_visualize(backend_model):
input_dict, _ = task_processor.create_input(img, input_shape=img_shape)
results = task_processor.run_inference(backend_model, input_dict)
with TemporaryDirectory() as dir:
filename = dir + 'tmp.jpg'
task_processor.visualize(backend_model, img, results[0], filename, '')
assert os.path.exists(filename)
def test_visualize():
datasample = generate_datasample(img.shape[:2])
output_file = NamedTemporaryFile(suffix='.jpg').name
task_processor.visualize(
img, datasample, output_file, show_result=False, window_name='test')
def test_get_tensor_from_input():
input_data = {'img': torch.ones(3, 4, 5)}
data = torch.ones(3, 4, 5)
input_data = {'inputs': data}
inputs = task_processor.get_tensor_from_input(input_data)
assert torch.equal(inputs, torch.ones(3, 4, 5))
assert torch.equal(inputs, data)
def test_get_partition_cfg():
@ -124,24 +92,26 @@ def test_get_model_name():
def test_build_dataset_and_dataloader():
from torch.utils.data import DataLoader, Dataset
val_dataloader = model_cfg['val_dataloader']
dataset = task_processor.build_dataset(
dataset_cfg=model_cfg, dataset_type='test')
dataset_cfg=val_dataloader['dataset'])
assert isinstance(dataset, Dataset), 'Failed to build dataset'
dataloader = task_processor.build_dataloader(dataset, 1, 1)
dataloader = task_processor.build_dataloader(val_dataloader)
assert isinstance(dataloader, DataLoader), 'Failed to build dataloader'
def test_single_gpu_test_and_evaluate():
from mmcv.parallel import MMDataParallel
dataset = task_processor.build_dataset(
dataset_cfg=model_cfg, dataset_type='test')
dataloader = task_processor.build_dataloader(dataset, 1, 1)
def test_build_test_runner(backend_model):
from mmdeploy.codebase.base.runner import DeployTestRunner
temp_dir = TemporaryDirectory().name
runner = task_processor.build_test_runner(backend_model, temp_dir)
assert isinstance(runner, DeployTestRunner)
# Prepare dummy model
model = DummyModel(outputs=[torch.rand([1, 1000])])
model = MMDataParallel(model, device_ids=[0])
assert model is not None
# Run test
outputs = task_processor.single_gpu_test(model, dataloader)
assert outputs is not None
task_processor.evaluate_outputs(model_cfg, outputs, dataset)
def test_get_preprocess():
process = task_processor.get_preprocess()
assert process is not None
def test_get_postprocess():
process = task_processor.get_postprocess()
assert isinstance(process, dict)

View File

@ -1,15 +1,10 @@
# Copyright (c) OpenMMLab. All rights reserved.
import os.path as osp
from tempfile import NamedTemporaryFile
import mmcv
import numpy as np
import pytest
import torch
import mmdeploy.backend.onnxruntime as ort_apis
from mmdeploy.codebase import import_codebase
from mmdeploy.utils import Backend, Codebase
from mmdeploy.utils import Backend, Codebase, load_config
from mmdeploy.utils.test import SwitchBackendWrapper, backend_checker
IMAGE_H = 192
@ -21,6 +16,9 @@ except ImportError:
pytest.skip(
f'{Codebase.MMPOSE} is not installed.', allow_module_level=True)
from .utils import generate_datasample # noqa: E402
from .utils import generate_mmpose_deploy_config # noqa: E402
@backend_checker(Backend.ONNXRUNTIME)
class TestEnd2EndModel:
@ -34,19 +32,15 @@ class TestEnd2EndModel:
# simplify backend inference
cls.wrapper = SwitchBackendWrapper(ORTWrapper)
cls.outputs = {
'outputs': torch.rand(1, 1, IMAGE_H, IMAGE_W),
'output': torch.rand(1, 1, IMAGE_H, IMAGE_W),
}
cls.wrapper.set(outputs=cls.outputs)
deploy_cfg = mmcv.Config(
{'onnx_config': {
'output_names': ['outputs']
}})
from mmdeploy.utils import load_config
model_cfg_path = 'tests/test_codebase/test_mmpose/data/model.py'
model_cfg = load_config(model_cfg_path)[0]
from mmdeploy.codebase.mmpose.deploy.pose_detection_model import \
End2EndModel
model_cfg_path = 'tests/test_codebase/test_mmpose/data/model.py'
model_cfg = load_config(model_cfg_path)[0]
deploy_cfg = generate_mmpose_deploy_config()
cls.end2end_model = End2EndModel(
Backend.ONNXRUNTIME, [''],
device='cpu',
@ -59,45 +53,17 @@ class TestEnd2EndModel:
def test_forward(self):
img = torch.rand(1, 3, IMAGE_H, IMAGE_W)
img_metas = [{
'image_file':
'tests/test_codebase/test_mmpose' + '/data/imgs/dataset/blank.jpg',
'center': torch.tensor([0.5, 0.5]),
'scale': 1.,
'location': torch.tensor([0.5, 0.5]),
'bbox_score': 0.5
}]
results = self.end2end_model.forward(img, img_metas)
data_samples = [generate_datasample((IMAGE_H, IMAGE_W))]
results = self.end2end_model.forward(img, data_samples)
assert results is not None, 'failed to get output using '\
'End2EndModel'
def test_forward_test(self):
imgs = torch.rand(2, 3, IMAGE_H, IMAGE_W)
results = self.end2end_model.forward_test(imgs)
assert isinstance(results[0], np.ndarray)
def test_show_result(self):
input_img = np.zeros([IMAGE_H, IMAGE_W, 3])
img_path = NamedTemporaryFile(suffix='.jpg').name
pred_bbox = torch.rand(1, 5)
pred_keypoint = torch.rand((1, 10, 2))
result = [{'bbox': pred_bbox, 'keypoints': pred_keypoint}]
self.end2end_model.show_result(
input_img, result, '', show=False, out_file=img_path)
assert osp.exists(img_path), 'Fails to create drawn image.'
@backend_checker(Backend.ONNXRUNTIME)
def test_build_pose_detection_model():
from mmdeploy.utils import load_config
model_cfg_path = 'tests/test_codebase/test_mmpose/data/model.py'
model_cfg = load_config(model_cfg_path)[0]
deploy_cfg = mmcv.Config(
dict(
backend_config=dict(type=Backend.ONNXRUNTIME.value),
onnx_config=dict(output_names=['outputs']),
codebase_config=dict(type=Codebase.MMPOSE.value)))
deploy_cfg = generate_mmpose_deploy_config()
from mmdeploy.backend.onnxruntime import ORTWrapper
ort_apis.__dict__.update({'ORTWrapper': ORTWrapper})

View File

@ -0,0 +1,64 @@
# Copyright (c) OpenMMLab. All rights reserved.
import mmengine
import torch
from mmengine.structures import InstanceData, PixelData
from mmdeploy.apis import build_task_processor
from mmdeploy.utils import IR, Backend, Codebase, Task, load_config
def generate_datasample(img_size, heatmap_size=(64, 48)):
from mmpose.structures import PoseDataSample
h, w = img_size[:2]
metainfo = dict(
img_shape=(h, w, 3),
crop_size=(h, w),
input_size=(h, w),
heatmap_size=heatmap_size)
pred_instances = InstanceData()
pred_instances.bboxes = torch.rand((1, 4)).numpy()
pred_instances.bbox_scales = torch.ones(1, 2).numpy()
pred_instances.bbox_scores = torch.ones(1).numpy()
pred_instances.bbox_centers = torch.ones(1, 2).numpy()
pred_instances.keypoints = torch.rand((1, 17, 2))
pred_instances.keypoints_visible = torch.rand((1, 17, 1))
gt_fields = PixelData()
gt_fields.heatmaps = torch.rand((17, 64, 48))
data_sample = PoseDataSample(metainfo=metainfo)
data_sample.pred_instances = pred_instances
data_sample.gt_instances = pred_instances
data_sample.gt_fields = gt_fields
return data_sample
def generate_mmpose_deploy_config(backend=Backend.ONNXRUNTIME.value,
cfg_options=None):
deploy_cfg = mmengine.Config(
dict(
backend_config=dict(type=backend),
codebase_config=dict(
type=Codebase.MMPOSE.value, task=Task.POSE_DETECTION.value),
onnx_config=dict(
type=IR.ONNX.value,
export_params=True,
keep_initializers_as_inputs=False,
opset_version=11,
input_shape=None,
input_names=['input'],
output_names=['output'])))
if cfg_options is not None:
deploy_cfg.update(cfg_options)
return deploy_cfg
def generate_mmpose_task_processor(model_cfg=None, deploy_cfg=None):
if model_cfg is None:
model_cfg = 'tests/test_codebase/test_mmpose/data/model.py'
if deploy_cfg is None:
deploy_cfg = generate_mmpose_deploy_config()
model_cfg, deploy_cfg = load_config(model_cfg, deploy_cfg)
task_processor = build_task_processor(model_cfg, deploy_cfg, 'cpu')
return task_processor

View File

@ -874,7 +874,7 @@ def get_backend_result(pipeline_info: dict, model_cfg_path: Path,
f'--device {device_type} ' \
'--log-level INFO'
if sdk_config is not None:
if sdk_config is not None and test_type == 'precision':
cmd_str += ' --dump-info'
if test_img_path is not None:

View File

@ -109,6 +109,7 @@ def main():
# prepare the dataset loader
test_dataloader = deepcopy(model_cfg['test_dataloader'])
test_dataloader['batch_size'] = args.batch_size
dataset = task_processor.build_dataset(test_dataloader['dataset'])
test_dataloader['dataset'] = dataset
dataloader = task_processor.build_dataloader(test_dataloader)