464 lines
15 KiB
Python
464 lines
15 KiB
Python
# Copyright (c) OpenMMLab. All rights reserved.
|
|
import tempfile
|
|
|
|
import mmengine
|
|
import numpy as np
|
|
import pytest
|
|
import torch
|
|
|
|
from mmdeploy.codebase import import_codebase
|
|
from mmdeploy.core import RewriterContext, patch_model
|
|
from mmdeploy.utils import Backend, Codebase
|
|
from mmdeploy.utils.test import (WrapModel, check_backend, get_model_outputs,
|
|
get_rewrite_outputs)
|
|
|
|
try:
|
|
import_codebase(Codebase.MMOCR)
|
|
except ImportError:
|
|
pytest.skip(f'{Codebase.MMOCR} is not installed.', allow_module_level=True)
|
|
|
|
from mmocr.models.textdet.necks import FPNC
|
|
|
|
# Dictionary config shared by the CRNN decoder / recognizer builders below.
dictionary = {
    'type': 'Dictionary',
    'dict_file':
    'tests/test_codebase/test_mmocr/data/lower_english_digits.txt',
    'with_padding': True,
}
|
|
|
|
|
|
class FPNCNeckModel(FPNC):
    """FPNC wrapper that expands one tensor into per-level neck inputs.

    An inner ``FPNC`` is stored as ``self.neck``; ``forward`` feeds it one
    copy of the input per entry of ``in_channels``, each repeated along
    dim 1 by that entry.
    """

    def __init__(self, in_channels, init_cfg=None):
        super().__init__(in_channels, init_cfg=init_cfg)
        self.in_channels = in_channels
        self.neck = FPNC(in_channels, init_cfg=init_cfg)

    def forward(self, inputs):
        """Repeat ``inputs`` once per level and run the inner neck on them."""
        multi_level_inputs = []
        for channel in self.in_channels:
            multi_level_inputs.append(inputs.repeat([1, channel, 1, 1]))
        return self.neck.forward(multi_level_inputs)
|
|
|
|
|
|
def get_bidirectionallstm_model():
    """Return a ``BidirectionalLSTM(32, 16, 16)`` with gradients disabled."""
    from mmocr.models.textrecog.layers.lstm_layer import BidirectionalLSTM

    bilstm = BidirectionalLSTM(32, 16, 16)
    bilstm.requires_grad_(False)
    return bilstm
|
|
|
|
|
|
def get_single_stage_text_detector_model():
    """Return a ``SingleStageTextDetector`` with gradients disabled.

    The detector is assembled from an ``mmdet.ResNet`` (depth 18) backbone
    config, an ``FPNC`` neck config and a ``DBHead`` head config.
    """
    from mmocr.models.textdet import SingleStageTextDetector

    backbone_cfg = {
        'type': 'mmdet.ResNet',
        'depth': 18,
        'num_stages': 4,
        'out_indices': (0, 1, 2, 3),
        'frozen_stages': -1,
        'norm_cfg': {
            'type': 'BN',
            'requires_grad': True
        },
        'init_cfg': {
            'type': 'Pretrained',
            'checkpoint': 'torchvision://resnet18'
        },
        'norm_eval': False,
        'style': 'caffe',
    }
    neck_cfg = {
        'type': 'FPNC',
        'in_channels': [64, 128, 256, 512],
        'lateral_channels': 256,
    }
    det_head_cfg = {
        'type': 'DBHead',
        'in_channels': 256,
        'module_loss': {
            'type': 'DBModuleLoss'
        },
        'postprocessor': {
            'type': 'DBPostprocessor',
            'text_repr_type': 'quad'
        },
    }

    detector = SingleStageTextDetector(backbone_cfg, det_head_cfg, neck_cfg)
    detector.requires_grad_(False)
    return detector
|
|
|
|
|
|
def get_crnn_decoder_model(rnn_flag):
    """Return a ``CRNNDecoder`` with gradients disabled.

    Args:
        rnn_flag: Forwarded unchanged as the decoder's ``rnn_flag`` argument.
    """
    from mmocr.models.textrecog.decoders import CRNNDecoder

    decoder = CRNNDecoder(32, dictionary, rnn_flag=rnn_flag)
    decoder.requires_grad_(False)
    return decoder
|
|
|
|
|
|
def get_fpnc_neck_model():
    """Return an ``FPNCNeckModel([2, 4, 8, 16])`` with gradients disabled."""
    neck_model = FPNCNeckModel([2, 4, 8, 16])
    neck_model.requires_grad_(False)
    return neck_model
|
|
|
|
|
|
def get_base_recognizer_model():
    """Return a ``CRNN`` recognizer with gradients disabled.

    The recognizer uses a ``MiniVGG`` backbone, no encoder, a
    ``CRNNDecoder`` and a ``TextRecogDataPreprocessor``.
    """
    from mmocr.models.textrecog.recognizers import CRNN

    backbone_cfg = {
        'type': 'MiniVGG',
        'leaky_relu': False,
        'input_channels': 1,
    }
    decoder_cfg = {
        'type': 'CRNNDecoder',
        'in_channels': 512,
        'rnn_flag': True,
        'module_loss': {
            'type': 'CTCModuleLoss',
            'letter_case': 'lower'
        },
        'postprocessor': {
            'type': 'CTCPostProcessor'
        },
        'dictionary': dictionary,
    }
    data_preprocessor_cfg = {
        'type': 'mmocr.TextRecogDataPreprocessor',
        'mean': [127],
        'std': [127],
    }

    recognizer = CRNN(
        backbone=backbone_cfg,
        encoder=None,
        decoder=decoder_cfg,
        data_preprocessor=data_preprocessor_cfg)
    recognizer.requires_grad_(False)
    return recognizer
|
|
|
|
|
|
@pytest.mark.parametrize('backend', [Backend.NCNN])
def test_bidirectionallstm(backend: Backend):
    """Check the rewritten BidirectionalLSTM forward against PyTorch."""
    check_backend(backend)
    lstm = get_bidirectionallstm_model()
    lstm.cpu().eval()

    deploy_cfg = mmengine.Config({
        'backend_config': {
            'type': backend.value
        },
        'onnx_config': {
            'output_names': ['output'],
            'input_shape': None
        },
        'codebase_config': {
            'type': 'mmocr',
            'task': 'TextRecognition'
        },
    })

    dummy_input = torch.rand(1, 1, 32)

    # Reference output from the unmodified PyTorch module.
    reference_inputs = {'input': dummy_input}
    model_outputs = get_model_outputs(lstm, 'forward', reference_inputs)

    # Output of the rewritten model, run through the backend.
    wrapped_model = WrapModel(lstm, 'forward')
    rewrite_inputs = {'input': dummy_input}
    rewrite_outputs, is_backend_output = get_rewrite_outputs(
        wrapped_model=wrapped_model,
        model_inputs=rewrite_inputs,
        deploy_cfg=deploy_cfg,
        run_with_backend=True)

    if not is_backend_output:
        assert rewrite_outputs is not None
    else:
        expected = model_outputs.cpu().numpy()
        actual = rewrite_outputs[0].cpu().numpy()
        assert np.allclose(expected, actual, rtol=1e-3, atol=1e-4)
|
|
|
|
|
|
@pytest.mark.parametrize('backend', [Backend.ONNXRUNTIME])
def test_simple_test_of_single_stage_text_detector(backend: Backend):
    """Compare SingleStageTextDetector ``_forward`` before/after rewrite."""
    check_backend(backend)
    detector = get_single_stage_text_detector_model()
    detector.eval()

    deploy_cfg = mmengine.Config({
        'backend_config': {
            'type': backend.value
        },
        'onnx_config': {
            'input_shape': None
        },
        'codebase_config': {
            'type': 'mmocr',
            'task': 'TextDetection'
        },
    })

    dummy_input = torch.rand(1, 3, 64, 64)
    model_outputs = detector._forward(dummy_input)

    wrapped_model = WrapModel(detector, '_forward')
    rewrite_outputs, is_backend_output = get_rewrite_outputs(
        wrapped_model=wrapped_model,
        model_inputs={'inputs': dummy_input},
        deploy_cfg=deploy_cfg,
        run_with_backend=True)

    # Backend results arrive as a sequence; compare its first entry.
    if is_backend_output:
        rewrite_outputs = rewrite_outputs[0]

    expected = model_outputs.cpu().numpy()
    actual = rewrite_outputs.cpu().numpy()
    assert np.allclose(expected, actual, rtol=1e-03, atol=1e-05)
|
|
|
|
|
|
@pytest.mark.parametrize('backend', [Backend.NCNN])
@pytest.mark.parametrize('rnn_flag', [True, False])
def test_crnndecoder(backend: Backend, rnn_flag: bool):
    """Test forward rewrite of crnndecoder.

    Runs ``CRNNDecoder.forward_train`` in PyTorch and through the rewritten
    model on ``backend`` and checks that the outputs agree within tolerance.

    Args:
        backend: Backend the rewritten model is executed with.
        rnn_flag: Passed to ``get_crnn_decoder_model`` when building the
            decoder.
    """
    check_backend(backend)
    crnn_decoder = get_crnn_decoder_model(rnn_flag)
    crnn_decoder.cpu().eval()

    deploy_cfg = mmengine.Config(
        dict(
            backend_config=dict(type=backend.value),
            onnx_config=dict(input_shape=None),
            codebase_config=dict(
                type='mmocr',
                task='TextRecognition',
            )))

    feat = torch.rand(1, 32, 1, 64)
    out_enc = None
    data_samples = None

    # to get outputs of pytorch model
    model_inputs = {
        'feat': feat,
        'out_enc': out_enc,
        'data_samples': data_samples
    }
    model_outputs = get_model_outputs(crnn_decoder, 'forward_train',
                                      model_inputs)

    # to get outputs of onnx model after rewrite
    wrapped_model = WrapModel(
        crnn_decoder,
        'forward_train',
        out_enc=out_enc,
        data_samples=data_samples)
    rewrite_inputs = {'feat': feat}
    rewrite_outputs, is_backend_output = get_rewrite_outputs(
        wrapped_model=wrapped_model,
        model_inputs=rewrite_inputs,
        deploy_cfg=deploy_cfg,
        run_with_backend=True)
    # Only the last rewritten output takes part in the comparison.
    rewrite_outputs = [rewrite_outputs[-1]]
    if is_backend_output:
        for model_output, rewrite_output in zip(model_outputs,
                                                rewrite_outputs):
            model_output = model_output.squeeze().cpu().numpy()
            rewrite_output = rewrite_output.squeeze()
            assert np.allclose(
                model_output, rewrite_output, rtol=1e-03, atol=1e-04)
    else:
        assert rewrite_outputs is not None
|
|
|
|
|
|
@pytest.mark.parametrize('backend', [Backend.ONNXRUNTIME])
@pytest.mark.parametrize(
    'data_samples', [[[{}]], [[{
        'resize_shape': [32, 32],
        'valid_ratio': 1.0
    }]]])
@pytest.mark.parametrize('is_dynamic', [True, False])
def test_forward_of_encoder_decoder_recognizer(data_samples, is_dynamic,
                                               backend):
    """Compare the CRNN recognizer ``forward`` before and after rewrite."""
    check_backend(backend)
    recognizer = get_base_recognizer_model()
    recognizer.eval()

    # The static and dynamic configs differ only in ``dynamic_axes``.
    onnx_config = {'input_shape': None}
    if is_dynamic:
        onnx_config['dynamic_axes'] = {
            'input': {
                0: 'batch',
                2: 'height',
                3: 'width'
            },
            'output': {
                0: 'batch',
                2: 'height',
                3: 'width'
            },
        }
    deploy_cfg = mmengine.Config({
        'backend_config': {
            'type': backend.value
        },
        'onnx_config': onnx_config,
        'codebase_config': {
            'type': 'mmocr',
            'task': 'TextRecognition'
        },
    })

    dummy_input = torch.rand(1, 1, 32, 32)

    model_outputs = recognizer.forward(dummy_input)
    wrapped_model = WrapModel(
        recognizer, 'forward', data_samples=data_samples[0])
    rewrite_outputs, is_backend_output = get_rewrite_outputs(
        wrapped_model=wrapped_model,
        model_inputs={'batch_inputs': dummy_input},
        deploy_cfg=deploy_cfg)

    # Backend results arrive as a sequence; compare its first entry.
    if is_backend_output:
        rewrite_outputs = rewrite_outputs[0]

    expected = model_outputs.cpu().numpy()
    actual = rewrite_outputs.cpu().numpy()
    assert np.allclose(expected, actual, rtol=1e-03, atol=1e-05)
|
|
|
|
|
|
@pytest.mark.parametrize('backend', [Backend.TENSORRT])
def test_forward_of_fpnc(backend: Backend):
    """Compare the FPNC neck ``forward`` before and after rewrite on CUDA."""
    check_backend(backend)
    neck = get_fpnc_neck_model().cuda()
    neck.eval()

    fixed_shape = {
        'min_shape': [1, 1, 64, 64],
        'opt_shape': [1, 1, 64, 64],
        'max_shape': [1, 1, 64, 64],
    }
    deploy_cfg = mmengine.Config({
        'backend_config': {
            'type': backend.value,
            'common_config': {
                'max_workspace_size': 1 << 30
            },
            'model_inputs': [{
                'input_shapes': {
                    'inputs': fixed_shape
                }
            }],
        },
        'onnx_config': {
            'input_shape': None,
            'input_names': ['inputs'],
            'output_names': ['output'],
        },
        'codebase_config': {
            'type': 'mmocr',
            'task': 'TextDetection'
        },
    })

    dummy_input = torch.rand(1, 1, 64, 64).cuda()
    reference_inputs = {'inputs': dummy_input}
    model_outputs = get_model_outputs(neck, 'forward', reference_inputs)

    wrapped_model = WrapModel(neck, 'forward')
    rewrite_inputs = {'inputs': dummy_input}
    rewrite_outputs, is_backend_output = get_rewrite_outputs(
        wrapped_model=wrapped_model,
        model_inputs=rewrite_inputs,
        deploy_cfg=deploy_cfg)

    # Backend results arrive as a sequence; compare its first entry.
    if is_backend_output:
        rewrite_outputs = rewrite_outputs[0]

    expected = model_outputs.cpu().numpy()
    actual = rewrite_outputs.cpu().numpy()
    assert np.allclose(expected, actual, rtol=1e-03, atol=1e-05)
|
|
|
|
|
|
def get_sar_model_cfg(decoder_type: str):
    """Return an ``mmengine.Config`` holding a ``SARNet`` model config.

    Args:
        decoder_type: Decoder class name; it is prefixed with ``mmocr.`` to
            form the decoder's ``type`` entry.
    """
    data_preprocessor_cfg = {
        'type': 'mmocr.TextRecogDataPreprocessor',
        'mean': [127, 127, 127],
        'std': [127, 127, 127],
    }
    encoder_cfg = {
        'type': 'mmocr.SAREncoder',
        'enc_bi_rnn': False,
        'enc_do_rnn': 0.1,
        'enc_gru': False,
    }
    dictionary_cfg = {
        'type': 'Dictionary',
        'dict_file': 'tests/test_codebase/test_mmocr/'
        'data/lower_english_digits.txt',
        'with_start': True,
        'with_end': True,
        'same_start_end': True,
        'with_padding': True,
        'with_unknown': True,
    }
    decoder_cfg = {
        'type': f'mmocr.{decoder_type}',
        'enc_bi_rnn': False,
        'dec_bi_rnn': False,
        'dec_do_rnn': 0,
        'dec_gru': False,
        'pred_dropout': 0.1,
        'd_k': 512,
        'pred_concat': True,
        'postprocessor': {
            'type': 'AttentionPostprocessor'
        },
        'module_loss': {
            'type': 'CEModuleLoss',
            'ignore_first_char': True,
            'reduction': 'mean'
        },
        'dictionary': dictionary_cfg,
        'max_seq_len': 30,
    }
    model_cfg = {
        'type': 'SARNet',
        'data_preprocessor': data_preprocessor_cfg,
        'backbone': {
            'type': 'ResNet31OCR'
        },
        'encoder': encoder_cfg,
        'decoder': decoder_cfg,
    }
    return mmengine.Config({'model': model_cfg})
|
|
|
|
|
|
@pytest.mark.parametrize('backend', [Backend.ONNXRUNTIME])
@pytest.mark.parametrize('decoder_type',
                         ['SequentialSARDecoder', 'ParallelSARDecoder'])
def test_sar_model(backend: Backend, decoder_type):
    """Export a patched SARNet to ONNX and validate the exported file.

    Outputs are not compared with PyTorch because the rewrite is expected
    to change them; the test only checks that the export produces a file
    that loads and passes ``onnx.checker.check_model``.

    Args:
        backend: Backend used for patching and for the rewriter context.
        decoder_type: SAR decoder class name handed to
            ``get_sar_model_cfg``.
    """
    check_backend(backend)
    import os.path as osp

    import onnx
    from mmocr.models.textrecog import SARNet

    sar_cfg = get_sar_model_cfg(decoder_type)
    # 'type' only names the model class; it is not a constructor argument.
    sar_cfg.model.pop('type')
    pytorch_model = SARNet(**(sar_cfg.model))

    # No data sample metadata is supplied; ``data_samples`` is passed as None.
    model_inputs = {'inputs': torch.rand(1, 3, 48, 160), 'data_samples': None}

    deploy_cfg = mmengine.Config(
        dict(
            backend_config=dict(type=backend.value),
            onnx_config=dict(input_shape=None),
            codebase_config=dict(
                type='mmocr',
                task='TextRecognition',
            )))
    # patch model
    pytorch_model.cfg = sar_cfg
    patched_model = patch_model(
        pytorch_model, cfg=deploy_cfg, backend=backend.value)
    input_names = [name for name in model_inputs if name != 'ctx']

    # Export into a temporary directory so the ONNX file is always removed,
    # instead of reusing the name of an already-deleted NamedTemporaryFile.
    with tempfile.TemporaryDirectory() as tmp_dir:
        onnx_file_path = osp.join(tmp_dir, 'sar_model.onnx')
        with RewriterContext(
                cfg=deploy_cfg, backend=backend.value), torch.no_grad():
            torch.onnx.export(
                patched_model,
                tuple(model_inputs.values()),
                onnx_file_path,
                export_params=True,
                input_names=input_names,
                output_names=None,
                opset_version=11,
                dynamic_axes=None,
                keep_initializers_as_inputs=False)

        # The result should be different due to the rewrite.
        # So we only check if the file exists
        assert osp.exists(onnx_file_path)

        # Load while the temporary directory still exists.
        model = onnx.load(onnx_file_path)

    assert model is not None
    # Let a ValidationError propagate so the failure shows the checker's
    # message instead of a bare ``assert False``.
    onnx.checker.check_model(model)
|