mmengine/tests/test_runner/test_log_processor.py
Mashiro 8063d2cce7
[Enhancement] Support writing data to vis_backend with prefix (#972)
* Log with prefix

* Fix test of loggerhook

* minor refine

* minor refine

* Fix unit test

* clean the code

* deepcopy in method

* replace regex

* Fix as comment

* Enhance readable

* rename reserve_prefix to remove_prefix

* Fix as comment

* Refine unit test

* Adjust sequence

* clean the code

* clean the code

* revert renaming reserve prefix

* Count the dataloader length in _get_dataloader_size
2023-03-13 13:07:37 +08:00

292 lines
12 KiB
Python

# Copyright (c) OpenMMLab. All rights reserved.
import copy
from unittest.mock import MagicMock, patch
import numpy as np
import pytest
import torch
from mmengine.logging import HistoryBuffer, MessageHub, MMLogger
from mmengine.runner import LogProcessor
class TestLogProcessor:
def test_init(self):
log_processor = LogProcessor(
window_size=10, by_epoch=True, custom_cfg=None)
assert log_processor.by_epoch
assert log_processor.window_size == 10
assert log_processor.custom_cfg == []
def test_check_custom_cfg(self):
# ``by_epoch==False`` and `window_size='epoch'` in log config will
# raise AssertionError.
custom_cfg = [dict(data_src='loss', window_size='epoch')]
with pytest.raises(AssertionError):
LogProcessor(by_epoch=False, custom_cfg=custom_cfg)
# Duplicate log_name will raise AssertionError.
custom_cfg = [
dict(data_src='loss', log_name='loss_1'),
dict(data_src='loss', log_name='loss_1')
]
with pytest.raises(AssertionError):
LogProcessor(custom_cfg=custom_cfg)
# Overwrite loss item twice will raise AssertionError.
custom_cfg = [dict(data_src='loss'), dict(data_src='loss')]
with pytest.raises(AssertionError):
LogProcessor(custom_cfg=custom_cfg)
custom_cfg = [
dict(data_src='loss_cls', window_size=100, method_name='min'),
dict(data_src='loss', log_name='loss_min', method_name='max'),
dict(data_src='loss', log_name='loss_max', method_name='max')
]
LogProcessor(custom_cfg=custom_cfg)
def test_parse_windows_size(self):
log_processor = LogProcessor()
# Test parse 'epoch' window_size.
custom_cfg = [dict(data_src='loss_cls', window_size='epoch')]
custom_cfg = log_processor._parse_windows_size(self.runner, 1,
custom_cfg)
assert custom_cfg[0]['window_size'] == 2
# Test parse 'global' window_size.
custom_cfg = [dict(data_src='loss_cls', window_size='global')]
custom_cfg = log_processor._parse_windows_size(self.runner, 1,
custom_cfg)
assert custom_cfg[0]['window_size'] == 11
# Test parse int window_size
custom_cfg = [dict(data_src='loss_cls', window_size=100)]
custom_cfg = log_processor._parse_windows_size(self.runner, 1,
custom_cfg)
assert custom_cfg[0]['window_size'] == 100
# Invalid type window_size will raise TypeError.
custom_cfg = [dict(data_src='loss_cls', window_size=[])]
with pytest.raises(TypeError):
log_processor._parse_windows_size(self.runner, 1, custom_cfg)
@pytest.mark.parametrize(
'by_epoch,mode,log_with_hierarchy',
([True, 'train', True], [True, 'train', False], [False, 'train', True],
[False, 'train', False], [True, 'val', True], [True, 'val', False],
[False, 'val', True], [False, 'val', False], [True, 'test', True],
[True, 'test', False], [False, 'test', True], [False, 'test', False]))
def test_get_log_after_iter(self, by_epoch, mode, log_with_hierarchy):
# Prepare LoggerHook
log_processor = LogProcessor(
by_epoch=by_epoch, log_with_hierarchy=log_with_hierarchy)
log_processor._get_max_memory = MagicMock(return_value='100')
eta = 40
self.runner.message_hub.update_info('eta', eta)
# Prepare training information.
if mode == 'train':
train_logs = dict(lr=0.1, time=1.0, data_time=1.0, loss_cls=1.0)
else:
train_logs = dict(time=1.0, data_time=1.0, loss_cls=1.0)
log_processor._collect_scalars = \
lambda *args, **kwargs: copy.deepcopy(train_logs)
_, out = log_processor.get_log_after_iter(self.runner, 1, mode)
# Verify that the correct context have been logged.
cur_loop = log_processor._get_cur_loop(self.runner, mode)
if by_epoch:
if mode in ['train', 'val']:
cur_epoch = log_processor._get_epoch(self.runner, mode)
log_str = (f'Epoch({mode}) [{cur_epoch}][ 2/'
f'{len(cur_loop.dataloader)}] ')
else:
log_str = (f'Epoch({mode}) [2/{len(cur_loop.dataloader)}] ')
if mode == 'train':
log_str += f"lr: {train_logs['lr']:.4e} "
else:
log_str += ' '
log_str += (f'eta: 0:00:40 '
f"time: {train_logs['time']:.4f} "
f"data_time: {train_logs['data_time']:.4f} ")
if torch.cuda.is_available():
log_str += 'memory: 100 '
if mode == 'train':
log_str += f"loss_cls: {train_logs['loss_cls']:.4f}"
assert out == log_str
else:
if mode == 'train':
max_iters = self.runner.max_iters
log_str = f'Iter({mode}) [11/{max_iters}] '
elif mode == 'val':
max_iters = len(cur_loop.dataloader)
log_str = f'Iter({mode}) [ 2/{max_iters}] '
else:
max_iters = len(cur_loop.dataloader)
log_str = f'Iter({mode}) [2/{max_iters}] '
if mode == 'train':
log_str += f"lr: {train_logs['lr']:.4e} "
else:
log_str += ' '
log_str += (f'eta: 0:00:40 '
f"time: {train_logs['time']:.4f} "
f"data_time: {train_logs['data_time']:.4f} ")
if torch.cuda.is_available():
log_str += 'memory: 100 '
if mode == 'train':
log_str += f"loss_cls: {train_logs['loss_cls']:.4f}"
assert out == log_str
@pytest.mark.parametrize(
'by_epoch,mode,log_with_hierarchy',
([True, 'val', True], [True, 'val', False], [False, 'val', True],
[False, 'val', False], [True, 'test', True], [False, 'test', False]))
def test_log_val(self, by_epoch, mode, log_with_hierarchy):
# Prepare LoggerHook
log_processor = LogProcessor(
by_epoch=by_epoch, log_with_hierarchy=log_with_hierarchy)
# Prepare validation information.
scalar_logs = dict(accuracy=0.9, data_time=1.0)
non_scalar_logs = dict(
recall={
'cat': 1,
'dog': 0
}, cm=torch.tensor([1, 2, 3]))
log_processor._collect_scalars = MagicMock(return_value=scalar_logs)
log_processor._collect_non_scalars = MagicMock(
return_value=non_scalar_logs)
_, out = log_processor.get_log_after_epoch(self.runner, 2, mode)
expect_metric_str = ("accuracy: 0.9000 recall: {'cat': 1, 'dog': 0} "
'cm: \ntensor([1, 2, 3])\ndata_time: 1.0000 ')
if by_epoch:
if mode == 'test':
assert out == 'Epoch(test) [5/5] ' + expect_metric_str
else:
assert out == 'Epoch(val) [1][10/10] ' + expect_metric_str
else:
if mode == 'test':
assert out == 'Iter(test) [5/5] ' + expect_metric_str
else:
assert out == 'Iter(val) [10/10] ' + expect_metric_str
def test_collect_scalars(self):
history_count = np.ones(100)
time_scalars = np.random.randn(100)
loss_cls_scalars = np.random.randn(100)
lr_scalars = np.random.randn(100)
metric_scalars = np.random.randn(100)
history_time_buffer = HistoryBuffer(time_scalars, history_count)
histroy_loss_cls = HistoryBuffer(loss_cls_scalars, history_count)
history_lr_buffer = HistoryBuffer(lr_scalars, history_count)
history_metric_buffer = HistoryBuffer(metric_scalars, history_count)
custom_cfg = [
dict(data_src='time', method_name='max', log_name='time_max')
]
logger_hook = LogProcessor(custom_cfg=custom_cfg)
# Collect with prefix.
log_scalars = {
'train/time': history_time_buffer,
'lr': history_lr_buffer,
'train/loss_cls': histroy_loss_cls,
'val/metric': history_metric_buffer
}
self.runner.message_hub._log_scalars = log_scalars
tag = logger_hook._collect_scalars(
copy.deepcopy(custom_cfg), self.runner, mode='train')
# Test training key in tag.
assert list(tag.keys()) == ['time', 'loss_cls', 'time_max']
# Test statistics lr with `current`, loss and time with 'mean'
assert tag['time'] == time_scalars[-10:].mean()
assert tag['time_max'] == time_scalars.max()
assert tag['loss_cls'] == loss_cls_scalars[-10:].mean()
tag = logger_hook._collect_scalars(
copy.deepcopy(custom_cfg), self.runner, mode='val')
assert list(tag.keys()) == ['metric']
assert tag['metric'] == metric_scalars[-1]
def test_collect_non_scalars(self):
metric1 = np.random.rand(10)
metric2 = torch.tensor(10)
log_processor = LogProcessor()
# Collect with prefix.
log_infos = {'test/metric1': metric1, 'test/metric2': metric2}
self.runner.message_hub._runtime_info = log_infos
tag = log_processor._collect_non_scalars(self.runner, mode='test')
# Test training key in tag.
assert list(tag.keys()) == ['metric1', 'metric2']
# Test statistics lr with `current`, loss and time with 'mean'
assert tag['metric1'] is metric1
assert tag['metric2'] is metric2
@patch('torch.cuda.max_memory_allocated', MagicMock())
@patch('torch.cuda.reset_peak_memory_stats', MagicMock())
def test_get_max_memory(self):
logger_hook = LogProcessor()
runner = MagicMock()
runner.world_size = 1
runner.model = torch.nn.Linear(1, 1)
logger_hook._get_max_memory(runner)
torch.cuda.max_memory_allocated.assert_called()
torch.cuda.reset_peak_memory_stats.assert_called()
def test_get_iter(self):
log_processor = LogProcessor()
# Get global iter when `inner_iter=False`
iter = log_processor._get_iter(self.runner)
assert iter == 11
# Get inner iter
iter = log_processor._get_iter(self.runner, 1)
assert iter == 2
# Still get global iter when `logger_hook.by_epoch==False`
log_processor.by_epoch = False
iter = log_processor._get_iter(self.runner, 1)
assert iter == 11
def test_get_epoch(self):
log_processor = LogProcessor()
epoch = log_processor._get_epoch(self.runner, 'train')
assert epoch == 2
epoch = log_processor._get_epoch(self.runner, 'val')
assert epoch == 1
with pytest.raises(ValueError):
log_processor._get_epoch(self.runner, 'test')
def test_get_cur_loop(self):
log_processor = LogProcessor()
loop = log_processor._get_cur_loop(self.runner, 'train')
assert len(loop.dataloader) == 20
loop = log_processor._get_cur_loop(self.runner, 'val')
assert len(loop.dataloader) == 10
loop = log_processor._get_cur_loop(self.runner, 'test')
assert len(loop.dataloader) == 5
def setup_method(self):
runner = MagicMock()
runner.epoch = 1
runner.max_epochs = 10
runner.iter = 10
runner.max_iters = 50
runner.train_dataloader = [0] * 20
runner.val_dataloader = [0] * 10
runner.test_dataloader = [0] * 5
runner.train_loop.dataloader = [0] * 20
runner.val_loop.dataloader = [0] * 10
runner.test_loop.dataloader = [0] * 5
logger = MMLogger.get_instance('log_processor_test')
runner.logger = logger
message_hub = MessageHub.get_instance('log_processor_test')
for i in range(10):
message_hub.update_scalar('train/loss', 10 - i)
for i in range(10):
message_hub.update_scalar('val/acc', i * 0.1)
runner.message_hub = message_hub
self.runner = runner