# Copyright (c) OpenMMLab. All rights reserved.
import copy
from unittest.mock import MagicMock, patch

import numpy as np
import pytest
import torch
from parameterized import parameterized

from mmengine.logging import HistoryBuffer, MessageHub, MMLogger
from mmengine.runner import LogProcessor
from mmengine.testing import RunnerTestCase


class TestLogProcessor(RunnerTestCase):

    def test_init(self):
        log_processor = LogProcessor(
            window_size=10, by_epoch=True, custom_cfg=None)
        assert log_processor.by_epoch
        assert log_processor.window_size == 10
        assert log_processor.custom_cfg == []

    def test_check_custom_cfg(self):
        # ``by_epoch==False`` and ``window_size='epoch'`` in the log config
        # will raise an AssertionError.
        custom_cfg = [dict(data_src='loss', window_size='epoch')]
        with pytest.raises(AssertionError):
            LogProcessor(by_epoch=False, custom_cfg=custom_cfg)
        # A duplicate log_name will raise an AssertionError.
        custom_cfg = [
            dict(data_src='loss', log_name='loss_1'),
            dict(data_src='loss', log_name='loss_1')
        ]
        with pytest.raises(AssertionError):
            LogProcessor(custom_cfg=custom_cfg)
        # Overwriting the loss item twice will raise an AssertionError.
        custom_cfg = [dict(data_src='loss'), dict(data_src='loss')]
        with pytest.raises(AssertionError):
            LogProcessor(custom_cfg=custom_cfg)
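        # A valid config: the same data_src may appear several times as long
        # as each extra entry writes to its own log_name.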
        custom_cfg = [
            dict(data_src='loss_cls', window_size=100, method_name='min'),
            dict(data_src='loss', log_name='loss_min', method_name='max'),
            dict(data_src='loss', log_name='loss_max', method_name='max')
        ]
        LogProcessor(custom_cfg=custom_cfg)

    def test_parse_windows_size(self):
        log_processor = LogProcessor()
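        # setUp gives runner.iter = 10 and the batch_idx argument below is 1,
        # so 'epoch' is expected to resolve to batch_idx + 1 = 2 and 'global'
        # to runner.iter + 1 = 11, matching the assertions below.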
        # Test parse 'epoch' window_size.
        custom_cfg = [dict(data_src='loss_cls', window_size='epoch')]
        custom_cfg = log_processor._parse_windows_size(self.runner, 1,
                                                       custom_cfg)
        assert custom_cfg[0]['window_size'] == 2
        # Test parse 'global' window_size.
        custom_cfg = [dict(data_src='loss_cls', window_size='global')]
        custom_cfg = log_processor._parse_windows_size(self.runner, 1,
                                                       custom_cfg)
        assert custom_cfg[0]['window_size'] == 11
        # Test parse int window_size
        custom_cfg = [dict(data_src='loss_cls', window_size=100)]
        custom_cfg = log_processor._parse_windows_size(self.runner, 1,
                                                       custom_cfg)
        assert custom_cfg[0]['window_size'] == 100
        # Invalid type window_size will raise TypeError.
        custom_cfg = [dict(data_src='loss_cls', window_size=[])]
        with pytest.raises(TypeError):
            log_processor._parse_windows_size(self.runner, 1, custom_cfg)
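
    # Each parameter tuple below is (by_epoch, mode, log_with_hierarchy).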
    # yapf: disable
    @parameterized.expand(
        ([True, 'train', True], [True, 'train', False], [False, 'train', True],
         [False, 'train', False], [True, 'val', True], [True, 'val', False],
         [False, 'val', True], [False, 'val', False], [True, 'test', True],
         [True, 'test', False], [False, 'test', True], [False, 'test', False]))
    # yapf: enable
    def test_get_log_after_iter(self, by_epoch, mode, log_with_hierarchy):
        # Prepare LogProcessor
        log_processor = LogProcessor(
            by_epoch=by_epoch, log_with_hierarchy=log_with_hierarchy)
        log_processor._get_max_memory = MagicMock(return_value='100')
        eta = 40
        self.runner.message_hub.update_info('eta', eta)
        # Prepare training information.
        if mode == 'train':
            train_logs = dict(lr=0.1, time=1.0, data_time=1.0, loss_cls=1.0)
        else:
            train_logs = dict(time=1.0, data_time=1.0, loss_cls=1.0)
        log_processor._collect_scalars = \
            lambda *args, **kwargs: copy.deepcopy(train_logs)
        tag, out = log_processor.get_log_after_iter(self.runner, 1, mode)
        # Verify that the correct context has been logged.
        cur_loop = log_processor._get_cur_loop(self.runner, mode)
        if by_epoch:
            if mode in ['train', 'val']:
                cur_epoch = log_processor._get_epoch(self.runner, mode)
                log_str = (f'Epoch({mode}) [{cur_epoch}][ 2/'
                           f'{len(cur_loop.dataloader)}] ')
            else:
                log_str = (f'Epoch({mode}) [2/{len(cur_loop.dataloader)}] ')
            if mode == 'train':
                log_str += f"lr: {train_logs['lr']:.4e} "
            else:
                log_str += ' '
            log_str += (f'eta: 0:00:40 '
                        f"time: {train_logs['time']:.4f} "
                        f"data_time: {train_logs['data_time']:.4f} ")
            if torch.cuda.is_available():
                log_str += 'memory: 100 '
            if mode == 'train':
                log_str += f"loss_cls: {train_logs['loss_cls']:.4f}"
            assert out == log_str
            if mode in ['train', 'val']:
                assert 'epoch' in tag
        else:
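            # With by_epoch=False, training logs the global iteration
            # (runner.iter + 1 = 11 here), while val/test still log the
            # in-loop batch index (batch_idx + 1 = 2).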
            if mode == 'train':
                max_iters = self.runner.max_iters
                log_str = f'Iter({mode}) [11/{max_iters}] '
            elif mode == 'val':
                max_iters = len(cur_loop.dataloader)
                log_str = f'Iter({mode}) [ 2/{max_iters}] '
            else:
                max_iters = len(cur_loop.dataloader)
                log_str = f'Iter({mode}) [2/{max_iters}] '
            if mode == 'train':
                log_str += f"lr: {train_logs['lr']:.4e} "
            else:
                log_str += ' '
            log_str += (f'eta: 0:00:40 '
                        f"time: {train_logs['time']:.4f} "
                        f"data_time: {train_logs['data_time']:.4f} ")
            if torch.cuda.is_available():
                log_str += 'memory: 100 '
            if mode == 'train':
                log_str += f"loss_cls: {train_logs['loss_cls']:.4f}"
            assert out == log_str
        # tag always has "iter" key
        assert 'iter' in tag

    @parameterized.expand(
        ([True, 'val', True], [True, 'val', False], [False, 'val', True],
         [False, 'val', False], [True, 'test', True], [False, 'test', False]))
    def test_log_val(self, by_epoch, mode, log_with_hierarchy):
        # Prepare LogProcessor
        log_processor = LogProcessor(
            by_epoch=by_epoch, log_with_hierarchy=log_with_hierarchy)
        # Prepare validation information.
        scalar_logs = dict(accuracy=0.9, data_time=1.0)
        non_scalar_logs = dict(
            recall={
                'cat': 1,
                'dog': 0
            }, cm=torch.tensor([1, 2, 3]))
        log_processor._collect_scalars = MagicMock(return_value=scalar_logs)
        log_processor._collect_non_scalars = MagicMock(
            return_value=non_scalar_logs)
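        # Non-scalar metrics (the dict and the tensor) are expected to be
        # rendered via their str() form in the final message.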
        _, out = log_processor.get_log_after_epoch(self.runner, 2, mode)
        expect_metric_str = ("accuracy: 0.9000 recall: {'cat': 1, 'dog': 0} "
                             'cm: \ntensor([1, 2, 3])\n data_time: 1.0000')
        if by_epoch:
            if mode == 'test':
                assert out == 'Epoch(test) [5/5] ' + expect_metric_str
            else:
                assert out == 'Epoch(val) [1][10/10] ' + expect_metric_str
        else:
            if mode == 'test':
                assert out == 'Iter(test) [5/5] ' + expect_metric_str
            else:
                assert out == 'Iter(val) [10/10] ' + expect_metric_str

    def test_collect_scalars(self):
        history_count = np.ones(100)
        time_scalars = np.random.randn(100)
        loss_cls_scalars = np.random.randn(100)
        lr_scalars = np.random.randn(100)
        metric_scalars = np.random.randn(100)
        history_time_buffer = HistoryBuffer(time_scalars, history_count)
        history_loss_cls = HistoryBuffer(loss_cls_scalars, history_count)
        history_lr_buffer = HistoryBuffer(lr_scalars, history_count)
        history_metric_buffer = HistoryBuffer(metric_scalars, history_count)
        custom_cfg = [
            dict(data_src='time', method_name='max', log_name='time_max')
        ]
        log_processor = LogProcessor(custom_cfg=custom_cfg)
        # Scalars are stored in the message hub with a mode prefix.
        log_scalars = {
            'train/time': history_time_buffer,
            'lr': history_lr_buffer,
            'train/loss_cls': history_loss_cls,
            'val/metric': history_metric_buffer
        }
        self.runner.message_hub._log_scalars = log_scalars
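        # With the default window_size=10, 'time' and 'loss_cls' should be
        # averaged over the last 10 values, while the custom cfg adds a
        # 'time_max' entry computed with 'max' over the whole buffer.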
        tag = log_processor._collect_scalars(
            copy.deepcopy(custom_cfg), self.runner, mode='train')
        # Only keys with the 'train/' prefix (plus custom entries) are kept.
        assert list(tag.keys()) == ['time', 'loss_cls', 'time_max']
        # 'time' and 'loss_cls' use the default 'mean' statistics, while
        # 'time_max' takes the maximum over the whole buffer.
        assert tag['time'] == time_scalars[-10:].mean()
        assert tag['time_max'] == time_scalars.max()
        assert tag['loss_cls'] == loss_cls_scalars[-10:].mean()
        # Validation keys report the latest value.
        tag = log_processor._collect_scalars(
            copy.deepcopy(custom_cfg), self.runner, mode='val')
        assert list(tag.keys()) == ['metric']
        assert tag['metric'] == metric_scalars[-1]
        # reserve_prefix=True keeps the 'train/' prefix in the returned keys.
        tag = log_processor._collect_scalars(
            copy.deepcopy(custom_cfg),
            self.runner,
            mode='train',
            reserve_prefix=True)
        assert list(
            tag.keys()) == ['train/time', 'train/loss_cls', 'train/time_max']
        # The statistics are the same as above; only the keys are prefixed.
        assert tag['train/time'] == time_scalars[-10:].mean()
        assert tag['train/time_max'] == time_scalars.max()
        assert tag['train/loss_cls'] == loss_cls_scalars[-10:].mean()

    def test_collect_non_scalars(self):
        metric1 = np.random.rand(10)
        metric2 = torch.tensor(10)
        log_processor = LogProcessor()
        # Non-scalar infos are stored in the message hub with a mode prefix.
        log_infos = {'test/metric1': metric1, 'test/metric2': metric2}
        self.runner.message_hub._runtime_info = log_infos
        tag = log_processor._collect_non_scalars(self.runner, mode='test')
        # Keys are returned without the 'test/' prefix.
        assert list(tag.keys()) == ['metric1', 'metric2']
        # Non-scalar values are returned as-is (the same objects).
        assert tag['metric1'] is metric1
        assert tag['metric2'] is metric2
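
    # CUDA memory queries are patched below so the test also runs on
    # machines without a GPU.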
    @patch('torch.cuda.max_memory_allocated', MagicMock())
    @patch('torch.cuda.reset_peak_memory_stats', MagicMock())
    def test_get_max_memory(self):
        logger_hook = LogProcessor()
        runner = MagicMock()
        runner.world_size = 1
        runner.model = torch.nn.Linear(1, 1)
        logger_hook._get_max_memory(runner)
        torch.cuda.max_memory_allocated.assert_called()
        torch.cuda.reset_peak_memory_stats.assert_called()

    def test_get_iter(self):
        log_processor = LogProcessor()
        # `by_epoch=True`: the in-epoch batch index is reported
        # (batch_idx + 1 = 2).
        iter = log_processor._get_iter(self.runner, 1)
        assert iter == 2
        # `by_epoch=False`: the global iteration is reported
        # (runner.iter + 1 = 11).
        log_processor.by_epoch = False
        iter = log_processor._get_iter(self.runner, 1)
        assert iter == 11

    def test_get_epoch(self):
        log_processor = LogProcessor()
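        # setUp sets runner.epoch = 1: the ongoing training epoch is reported
        # as epoch + 1 = 2, while validation reports the epoch just finished
        # (1). 'test' mode has no epoch counter and is expected to raise.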
        epoch = log_processor._get_epoch(self.runner, 'train')
        assert epoch == 2
        epoch = log_processor._get_epoch(self.runner, 'val')
        assert epoch == 1
        with pytest.raises(ValueError):
            log_processor._get_epoch(self.runner, 'test')

    def test_get_cur_loop(self):
        log_processor = LogProcessor()
        loop = log_processor._get_cur_loop(self.runner, 'train')
        assert len(loop.dataloader) == 20
        loop = log_processor._get_cur_loop(self.runner, 'val')
        assert len(loop.dataloader) == 10
        loop = log_processor._get_cur_loop(self.runner, 'test')
        assert len(loop.dataloader) == 5

    def setUp(self):
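        # Build a mocked runner: 1 finished epoch, 10 finished iterations,
        # and dataloaders of length 20/10/5 for train/val/test respectively.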
        runner = MagicMock()
        runner.epoch = 1
        runner.max_epochs = 10
        runner.iter = 10
        runner.max_iters = 50
        runner.train_dataloader = [0] * 20
        runner.val_dataloader = [0] * 10
        runner.test_dataloader = [0] * 5
        runner.train_loop.dataloader = [0] * 20
        runner.val_loop.dataloader = [0] * 10
        runner.test_loop.dataloader = [0] * 5
        logger = MMLogger.get_instance('log_processor_test')
        runner.logger = logger
        message_hub = MessageHub.get_instance('log_processor_test')
        for i in range(10):
            message_hub.update_scalar('train/loss', 10 - i)
        for i in range(10):
            message_hub.update_scalar('val/acc', i * 0.1)
        runner.message_hub = message_hub
        self.runner = runner
        super().setUp()
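
    # End-to-end check: exercise LogProcessor with a real Runner in both
    # epoch-based and iteration-based modes.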
    def test_with_runner(self):
        cfg = self.epoch_based_cfg.copy()
        cfg.log_processor = dict(
            custom_cfg=[
                dict(
                    data_src='time',
                    window_size='epoch',
                    log_name='iter_time',
                    method_name='mean')
            ],
            log_with_hierarchy=True)
        runner = self.build_runner(cfg)
        runner.train()
        runner.val()
        runner.test()

        cfg = self.iter_based_cfg.copy()
        cfg.log_processor = dict(
            by_epoch=False,
            custom_cfg=[
                dict(
                    data_src='time',
                    window_size=100,
                    log_name='iter_time',
                    method_name='mean')
            ],
            log_with_hierarchy=True)
        runner = self.build_runner(cfg)
        runner.train()
        runner.val()
        runner.test()