mirror of https://github.com/open-mmlab/mmcv.git
158 lines
5.3 KiB
Python
158 lines
5.3 KiB
Python
# Copyright (c) OpenMMLab. All rights reserved.
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
import torch
|
|
import torch.nn as nn
|
|
from torch.nn.parallel import DataParallel, DistributedDataParallel
|
|
|
|
from mmcv.parallel import (MODULE_WRAPPERS, MMDataParallel,
|
|
MMDistributedDataParallel, is_module_wrapper)
|
|
from mmcv.parallel._functions import Scatter, get_input_device, scatter
|
|
from mmcv.parallel.distributed_deprecated import \
|
|
MMDistributedDataParallel as DeprecatedMMDDP
|
|
|
|
|
|
def mock(*args, **kwargs):
    """No-op stand-in that accepts any arguments and returns ``None``.

    It is used in this file as the replacement object when patching
    ``torch.distributed`` / ``DistributedDataParallel`` internals.
    """
    return None
|
|
|
|
|
|
@pytest.mark.skipif(
    torch.__version__ == 'parrots', reason='not supported in parrots now')
@patch('torch.distributed._broadcast_coalesced', mock)
@patch('torch.distributed.broadcast', mock)
@patch('torch.nn.parallel.DistributedDataParallel._ddp_init_helper', mock)
def test_is_module_wrapper():
    """Check ``is_module_wrapper`` on a plain module and on wrappers.

    A bare ``nn.Module`` must not be reported as a wrapper, while
    ``DataParallel``, ``MMDataParallel``, ``DistributedDataParallel``,
    ``MMDistributedDataParallel``, the deprecated MMDDP and a class
    registered in ``MODULE_WRAPPERS`` must all be reported as wrappers.
    """

    class Model(nn.Module):

        def __init__(self):
            super().__init__()
            self.conv = nn.Conv2d(2, 2, 1)

        def forward(self, x):
            return self.conv(x)

    # _verify_model_across_ranks is added in torch1.9.0,
    # _verify_params_across_processes is added in torch1.11.0,
    # so we should check whether _verify_model_across_ranks
    # and _verify_params_across_processes are the member of
    # torch.distributed before mocking.
    # patch.object is used instead of a plain attribute assignment so
    # that the original functions are restored when the test finishes
    # and the mock does not leak into other tests.
    verify_hooks = ('_verify_model_across_ranks',
                    '_verify_params_across_processes')
    patchers = [
        patch.object(torch.distributed, name, mock) for name in verify_hooks
        if hasattr(torch.distributed, name)
    ]
    for patcher in patchers:
        patcher.start()

    try:
        model = Model()
        assert not is_module_wrapper(model)

        dp = DataParallel(model)
        assert is_module_wrapper(dp)

        mmdp = MMDataParallel(model)
        assert is_module_wrapper(mmdp)

        ddp = DistributedDataParallel(model, process_group=MagicMock())
        assert is_module_wrapper(ddp)

        mmddp = MMDistributedDataParallel(model, process_group=MagicMock())
        assert is_module_wrapper(mmddp)

        deprecated_mmddp = DeprecatedMMDDP(model)
        assert is_module_wrapper(deprecated_mmddp)

        # test module wrapper registry
        @MODULE_WRAPPERS.register_module()
        class ModuleWrapper:

            def __init__(self, module):
                self.module = module

            def forward(self, *args, **kwargs):
                return self.module(*args, **kwargs)

        module_wrapper = ModuleWrapper(model)
        assert is_module_wrapper(module_wrapper)
    finally:
        # undo the torch.distributed patches in reverse order
        for patcher in reversed(patchers):
            patcher.stop()
|
|
|
|
|
|
def test_get_input_device():
    """Exercise ``get_input_device`` on CPU, CUDA and invalid inputs."""
    shapes = ([1, 3, 3, 3], [1, 4, 4, 4])

    # CPU tensors, alone or in a list, are reported as device -1
    cpu_tensor = torch.zeros(shapes[0])
    assert get_input_device(cpu_tensor) == -1
    cpu_tensors = [torch.zeros(shape) for shape in shapes]
    assert get_input_device(cpu_tensors) == -1

    # CUDA tensors are reported by the index of their device
    if torch.cuda.is_available():
        gpu_tensor = torch.zeros(shapes[0]).cuda()
        assert get_input_device(gpu_tensor) == 0
        gpu_tensors = [torch.zeros(shape).cuda() for shape in shapes]
        assert get_input_device(gpu_tensors) == 0

    # anything that is neither a tensor nor a list of tensors must raise
    with pytest.raises(Exception):
        get_input_device(5)
|
|
|
|
|
|
def test_scatter():
    """Exercise ``scatter`` for CPU targets, GPU targets and bad input."""
    shapes = ([1, 3, 3, 3], [1, 4, 4, 4])

    # target device -1 (CPU): the result must equal the input
    cpu_tensor = torch.zeros(shapes[0])
    scattered = scatter(input=cpu_tensor, devices=[-1])
    assert torch.allclose(cpu_tensor, scattered)

    cpu_tensors = [torch.zeros(shape) for shape in shapes]
    scattered_list = scatter(input=cpu_tensors, devices=[-1])
    for expected, actual in zip(cpu_tensors, scattered_list):
        assert torch.allclose(expected, actual)

    # target device 0 (GPU): the result must equal the input moved to CUDA
    if torch.cuda.is_available():
        host_tensor = torch.zeros(shapes[0])
        scattered = scatter(input=host_tensor, devices=[0])
        assert torch.allclose(host_tensor.cuda(), scattered)

        host_tensors = [torch.zeros(shape) for shape in shapes]
        scattered_list = scatter(input=host_tensors, devices=[0])
        for expected, actual in zip(host_tensors, scattered_list):
            assert torch.allclose(expected.cuda(), actual)

    # anything that is neither a tensor nor a list of tensors must raise
    with pytest.raises(Exception):
        scatter(5, [-1])
|
|
|
|
|
|
@pytest.mark.skipif(
    torch.__version__ == 'parrots', reason='not supported in parrots now')
def test_Scatter():
    """Exercise ``Scatter.forward`` for CPU and GPU target devices.

    For a single tensor and for a list of tensors the result must be a
    tuple whose elements equal the inputs (moved to CUDA when the target
    device is a GPU).
    """
    shapes = ([1, 3, 3, 3], [1, 4, 4, 4])

    # if the device is CPU, just return the input
    target_gpus = [-1]
    cpu_tensor = torch.zeros(shapes[0])
    outputs = Scatter.forward(target_gpus, cpu_tensor)
    assert isinstance(outputs, tuple)
    assert torch.allclose(cpu_tensor, outputs[0])

    cpu_tensors = [torch.zeros(shape) for shape in shapes]
    outputs = Scatter.forward(target_gpus, cpu_tensors)
    assert isinstance(outputs, tuple)
    for expected, actual in zip(cpu_tensors, outputs):
        assert torch.allclose(expected, actual)

    # if the device is GPU, copy the input from CPU to GPU
    if torch.cuda.is_available():
        target_gpus = [0]
        host_tensor = torch.zeros(shapes[0])
        outputs = Scatter.forward(target_gpus, host_tensor)
        assert isinstance(outputs, tuple)
        assert torch.allclose(host_tensor.cuda(), outputs[0])

        host_tensors = [torch.zeros(shape) for shape in shapes]
        outputs = Scatter.forward(target_gpus, host_tensors)
        assert isinstance(outputs, tuple)
        for expected, actual in zip(host_tensors, outputs):
            # Compare each scattered element as a whole, exactly like the
            # CPU branch does; indexing it with [0] would drop its leading
            # dimension and only pass through broadcasting.
            assert torch.allclose(expected.cuda(), actual)
|