# Copyright (c) OpenMMLab. All rights reserved.
import os
import os.path as osp
import tempfile
import unittest
from unittest import TestCase
from unittest.mock import patch

import torch
import torch.distributed as torch_dist

import mmengine.dist as dist
from mmengine.dist.dist import sync_random_seed
from mmengine.testing._internal import MultiProcessTestCase
from mmengine.utils import TORCH_VERSION, digit_version


class TestDist(TestCase):
    """Test dist module in non-distributed environment."""

    def test_all_reduce(self):
        data = torch.arange(2, dtype=torch.int64)
        expected = torch.arange(2, dtype=torch.int64)
        dist.all_reduce(data)
        self.assertTrue(torch.allclose(data, expected))

    def test_all_gather(self):
        data = torch.arange(2, dtype=torch.int64)
        expected = torch.arange(2, dtype=torch.int64)
        output = dist.all_gather(data)
        self.assertTrue(torch.allclose(output[0], expected))

    def test_gather(self):
        data = torch.arange(2, dtype=torch.int64)
        expected = torch.arange(2, dtype=torch.int64)
        output = dist.gather(data)
        self.assertTrue(torch.allclose(output[0], expected))

    def test_broadcast(self):
        data = torch.arange(2, dtype=torch.int64)
        expected = torch.arange(2, dtype=torch.int64)
        dist.broadcast(data)
        self.assertTrue(torch.allclose(data, expected))

    @patch('numpy.random.randint', return_value=10)
    def test_sync_random_seed(self, mock):
        self.assertEqual(sync_random_seed(), 10)

    def test_broadcast_object_list(self):
        with self.assertRaises(AssertionError):
            # input should be list of object
            dist.broadcast_object_list('foo')

        data = ['foo', 12, {1: 2}]
        expected = ['foo', 12, {1: 2}]
        dist.broadcast_object_list(data)
        self.assertEqual(data, expected)

    def test_all_reduce_dict(self):
        with self.assertRaises(AssertionError):
            # input should be dict
            dist.all_reduce_dict('foo')

        data = {
            'key1': torch.arange(2, dtype=torch.int64),
            'key2': torch.arange(3, dtype=torch.int64)
        }
        expected = {
            'key1': torch.arange(2, dtype=torch.int64),
            'key2': torch.arange(3, dtype=torch.int64)
        }
        dist.all_reduce_dict(data)
        for key in data:
            self.assertTrue(torch.allclose(data[key], expected[key]))

    def test_all_gather_object(self):
        data = 'foo'
        expected = 'foo'
        gather_objects = dist.all_gather_object(data)
        self.assertEqual(gather_objects[0], expected)

    def test_gather_object(self):
        data = 'foo'
        expected = 'foo'
        gather_objects = dist.gather_object(data)
        self.assertEqual(gather_objects[0], expected)

    def test_collect_results(self):
        data = ['foo', {1: 2}]
        size = 2
        expected = ['foo', {1: 2}]

        # test `device=cpu`
        output = dist.collect_results(data, size, device='cpu')
        self.assertEqual(output, expected)

        # test `device=gpu`
        output = dist.collect_results(data, size, device='gpu')
        self.assertEqual(output, expected)


class TestDistWithGLOOBackend(MultiProcessTestCase):

    def _init_dist_env(self, rank, world_size):
        """Initialize the distributed environment."""
        os.environ['MASTER_ADDR'] = '127.0.0.1'
        os.environ['MASTER_PORT'] = '29505'
        os.environ['RANK'] = str(rank)
        torch_dist.init_process_group(
            backend='gloo', rank=rank, world_size=world_size)

    def setUp(self):
        super().setUp()
        self._spawn_processes()

    def test_all_reduce(self):
        self._init_dist_env(self.rank, self.world_size)
        for tensor_type, reduce_op in zip([torch.int64, torch.float32],
                                          ['sum', 'mean']):
            if dist.get_rank() == 0:
                data = torch.tensor([1, 2], dtype=tensor_type)
            else:
                data = torch.tensor([3, 4], dtype=tensor_type)

            if reduce_op == 'sum':
                expected = torch.tensor([4, 6], dtype=tensor_type)
            else:
                expected = torch.tensor([2, 3], dtype=tensor_type)

            dist.all_reduce(data, reduce_op)
            self.assertTrue(torch.allclose(data, expected))

    def test_all_gather(self):
        self._init_dist_env(self.rank, self.world_size)
        if dist.get_rank() == 0:
            data = torch.tensor([0, 1])
        else:
            data = torch.tensor([1, 2])

        expected = [torch.tensor([0, 1]), torch.tensor([1, 2])]

        output = dist.all_gather(data)
        self.assertTrue(
            torch.allclose(output[dist.get_rank()],
                           expected[dist.get_rank()]))

    def test_gather(self):
        self._init_dist_env(self.rank, self.world_size)
        if dist.get_rank() == 0:
            data = torch.tensor([0, 1])
        else:
            data = torch.tensor([1, 2])

        output = dist.gather(data)

        if dist.get_rank() == 0:
            expected = [torch.tensor([0, 1]), torch.tensor([1, 2])]
            for i in range(2):
                assert torch.allclose(output[i], expected[i])
        else:
            assert output == []

    def test_broadcast_dist(self):
        self._init_dist_env(self.rank, self.world_size)
        if dist.get_rank() == 0:
            data = torch.tensor([0, 1])
        else:
            data = torch.tensor([1, 2])

        expected = torch.tensor([0, 1])
        dist.broadcast(data, 0)
        assert torch.allclose(data, expected)

    def test_sync_random_seed(self):
        self._init_dist_env(self.rank, self.world_size)
        with patch.object(
                torch, 'tensor',
                return_value=torch.tensor(1024)) as mock_tensor:
            output = dist.sync_random_seed()
            assert output == 1024
            mock_tensor.assert_called()

    def test_broadcast_object_list(self):
        self._init_dist_env(self.rank, self.world_size)
        if dist.get_rank() == 0:
            data = ['foo', 12, {1: 2}]
        else:
            data = [None, None, None]

        expected = ['foo', 12, {1: 2}]

        dist.broadcast_object_list(data)
        self.assertEqual(data, expected)

    def test_all_reduce_dict(self):
        self._init_dist_env(self.rank, self.world_size)
        for tensor_type, reduce_op in zip([torch.int64, torch.float32],
                                          ['sum', 'mean']):
            if dist.get_rank() == 0:
                data = {
                    'key1': torch.tensor([0, 1], dtype=tensor_type),
                    'key2': torch.tensor([1, 2], dtype=tensor_type),
                }
            else:
                data = {
                    'key1': torch.tensor([2, 3], dtype=tensor_type),
                    'key2': torch.tensor([3, 4], dtype=tensor_type),
                }

            if reduce_op == 'sum':
                expected = {
                    'key1': torch.tensor([2, 4], dtype=tensor_type),
                    'key2': torch.tensor([4, 6], dtype=tensor_type),
                }
            else:
                expected = {
                    'key1': torch.tensor([1, 2], dtype=tensor_type),
                    'key2': torch.tensor([2, 3], dtype=tensor_type),
                }

            dist.all_reduce_dict(data, reduce_op)

            for key in data:
                assert torch.allclose(data[key], expected[key])

        # `torch.cat` in torch1.5 can not concatenate different types so we
        # fallback to convert them all to float type.
        if digit_version(TORCH_VERSION) == digit_version('1.5.0'):
            if dist.get_rank() == 0:
                data = {
                    'key1': torch.tensor([0, 1], dtype=torch.float32),
                    'key2': torch.tensor([1, 2], dtype=torch.int32)
                }
            else:
                data = {
                    'key1': torch.tensor([2, 3], dtype=torch.float32),
                    'key2': torch.tensor([3, 4], dtype=torch.int32),
                }

            expected = {
                'key1': torch.tensor([2, 4], dtype=torch.float32),
                'key2': torch.tensor([4, 6], dtype=torch.float32),
            }

            dist.all_reduce_dict(data, 'sum')

            for key in data:
                assert torch.allclose(data[key], expected[key])

    def test_all_gather_object(self):
        self._init_dist_env(self.rank, self.world_size)
        if dist.get_rank() == 0:
            data = 'foo'
        else:
            data = {1: 2}

        expected = ['foo', {1: 2}]

        output = dist.all_gather_object(data)

        self.assertEqual(output, expected)

    def test_gather_object(self):
        self._init_dist_env(self.rank, self.world_size)
        if dist.get_rank() == 0:
            data = 'foo'
        else:
            data = {1: 2}

        output = dist.gather_object(data, dst=0)

        if dist.get_rank() == 0:
            self.assertEqual(output, ['foo', {1: 2}])
        else:
            self.assertIsNone(output)


@unittest.skipIf(
    torch.cuda.device_count() < 2, reason='need 2 gpu to test nccl')
class TestDistWithNCCLBackend(MultiProcessTestCase):

    def _init_dist_env(self, rank, world_size):
        """Initialize the distributed environment."""
        os.environ['MASTER_ADDR'] = '127.0.0.1'
        os.environ['MASTER_PORT'] = '29505'
        os.environ['RANK'] = str(rank)
        num_gpus = torch.cuda.device_count()
        torch.cuda.set_device(rank % num_gpus)
        torch_dist.init_process_group(
            backend='nccl', rank=rank, world_size=world_size)

    def setUp(self):
        super().setUp()
        self._spawn_processes()

    def test_all_reduce(self):
        self._init_dist_env(self.rank, self.world_size)
        for tensor_type, reduce_op in zip([torch.int64, torch.float32],
                                          ['sum', 'mean']):
            if dist.get_rank() == 0:
                data = torch.tensor([1, 2], dtype=tensor_type).cuda()
            else:
                data = torch.tensor([3, 4], dtype=tensor_type).cuda()

            if reduce_op == 'sum':
                expected = torch.tensor([4, 6], dtype=tensor_type).cuda()
            else:
                expected = torch.tensor([2, 3], dtype=tensor_type).cuda()

            dist.all_reduce(data, reduce_op)
            self.assertTrue(torch.allclose(data, expected))

    def test_all_gather(self):
        self._init_dist_env(self.rank, self.world_size)
        if dist.get_rank() == 0:
            data = torch.tensor([0, 1]).cuda()
        else:
            data = torch.tensor([1, 2]).cuda()

        expected = [torch.tensor([0, 1]).cuda(), torch.tensor([1, 2]).cuda()]

        output = dist.all_gather(data)
        self.assertTrue(
            torch.allclose(output[dist.get_rank()],
                           expected[dist.get_rank()]))

    def test_broadcast_dist(self):
        self._init_dist_env(self.rank, self.world_size)
        if dist.get_rank() == 0:
            data = torch.tensor([0, 1]).cuda()
        else:
            data = torch.tensor([1, 2]).cuda()

        expected = torch.tensor([0, 1]).cuda()
        dist.broadcast(data, 0)
        assert torch.allclose(data, expected)

    def test_sync_random_seed(self):
        self._init_dist_env(self.rank, self.world_size)
        with patch.object(
                torch, 'tensor',
                return_value=torch.tensor(1024)) as mock_tensor:
            output = dist.sync_random_seed()
            assert output == 1024
            mock_tensor.assert_called()

    def test_broadcast_object_list(self):
        self._init_dist_env(self.rank, self.world_size)
        if dist.get_rank() == 0:
            data = ['foo', 12, {1: 2}]
        else:
            data = [None, None, None]

        expected = ['foo', 12, {1: 2}]

        dist.broadcast_object_list(data)
        self.assertEqual(data, expected)

    def test_all_reduce_dict(self):
        self._init_dist_env(self.rank, self.world_size)
        for tensor_type, reduce_op in zip([torch.int64, torch.float32],
                                          ['sum', 'mean']):
            if dist.get_rank() == 0:
                data = {
                    'key1': torch.tensor([0, 1], dtype=tensor_type).cuda(),
                    'key2': torch.tensor([1, 2], dtype=tensor_type).cuda(),
                }
            else:
                data = {
                    'key1': torch.tensor([2, 3], dtype=tensor_type).cuda(),
                    'key2': torch.tensor([3, 4], dtype=tensor_type).cuda(),
                }

            if reduce_op == 'sum':
                expected = {
                    'key1': torch.tensor([2, 4], dtype=tensor_type).cuda(),
                    'key2': torch.tensor([4, 6], dtype=tensor_type).cuda(),
                }
            else:
                expected = {
                    'key1': torch.tensor([1, 2], dtype=tensor_type).cuda(),
                    'key2': torch.tensor([2, 3], dtype=tensor_type).cuda(),
                }

            dist.all_reduce_dict(data, reduce_op)

            for key in data:
                assert torch.allclose(data[key], expected[key])

        # `torch.cat` in torch1.5 can not concatenate different types so we
        # fallback to convert them all to float type.
        if digit_version(TORCH_VERSION) == digit_version('1.5.0'):
            if dist.get_rank() == 0:
                data = {
                    'key1': torch.tensor([0, 1], dtype=torch.float32).cuda(),
                    'key2': torch.tensor([1, 2], dtype=torch.int32).cuda(),
                }
            else:
                data = {
                    'key1': torch.tensor([2, 3], dtype=torch.float32).cuda(),
                    'key2': torch.tensor([3, 4], dtype=torch.int32).cuda(),
                }

            expected = {
                'key1': torch.tensor([2, 4], dtype=torch.float32).cuda(),
                'key2': torch.tensor([4, 6], dtype=torch.float32).cuda(),
            }

            dist.all_reduce_dict(data, 'sum')

            for key in data:
                assert torch.allclose(data[key], expected[key])

    def test_all_gather_object(self):
        self._init_dist_env(self.rank, self.world_size)
        if dist.get_rank() == 0:
            data = 'foo'
        else:
            data = {1: 2}

        expected = ['foo', {1: 2}]

        output = dist.all_gather_object(data)

        self.assertEqual(output, expected)

    def test_collect_results(self):
        self._init_dist_env(self.rank, self.world_size)
        if dist.get_rank() == 0:
            data = ['foo', {1: 2}]
        else:
            data = [24, {'a': 'b'}]

        size = 4

        expected = ['foo', 24, {1: 2}, {'a': 'b'}]

        # test `device=cpu`
        output = dist.collect_results(data, size, device='cpu')
        if dist.get_rank() == 0:
            self.assertEqual(output, expected)
        else:
            self.assertIsNone(output)

        # test `device=cpu` and `tmpdir is not None`
        tmpdir = tempfile.mkdtemp()
        # broadcast tmpdir to all ranks to make it consistent
        object_list = [tmpdir]
        dist.broadcast_object_list(object_list)
        output = dist.collect_results(
            data, size, device='cpu', tmpdir=object_list[0])
        if dist.get_rank() == 0:
            self.assertEqual(output, expected)
        else:
            self.assertIsNone(output)

        if dist.get_rank() == 0:
            # object_list[0] will be removed by `dist.collect_results`
            self.assertFalse(osp.exists(object_list[0]))

        # test `device=gpu`
        output = dist.collect_results(data, size, device='gpu')
        if dist.get_rank() == 0:
            self.assertEqual(output, expected)
        else:
            self.assertIsNone(output)