mmengine/tests/test_runner/test_log_processor.py

292 lines
12 KiB
Python

# Copyright (c) OpenMMLab. All rights reserved.
import copy
from unittest.mock import MagicMock, patch
import numpy as np
import pytest
import torch
from mmengine.logging import HistoryBuffer, MessageHub, MMLogger
from mmengine.runner import LogProcessor
class TestLogProcessor:
def test_init(self):
log_processor = LogProcessor(
window_size=10, by_epoch=True, custom_cfg=None)
assert log_processor.by_epoch
assert log_processor.window_size == 10
assert log_processor.custom_cfg == []
def test_check_custom_cfg(self):
# ``by_epoch==False`` and `window_size='epoch'` in log config will
# raise AssertionError.
custom_cfg = [dict(data_src='loss', window_size='epoch')]
with pytest.raises(AssertionError):
LogProcessor(by_epoch=False, custom_cfg=custom_cfg)
# Duplicate log_name will raise AssertionError.
custom_cfg = [
dict(data_src='loss', log_name='loss_1'),
dict(data_src='loss', log_name='loss_1')
]
with pytest.raises(AssertionError):
LogProcessor(custom_cfg=custom_cfg)
# Overwrite loss item twice will raise AssertionError.
custom_cfg = [dict(data_src='loss'), dict(data_src='loss')]
with pytest.raises(AssertionError):
LogProcessor(custom_cfg=custom_cfg)
custom_cfg = [
dict(data_src='loss_cls', window_size=100, method_name='min'),
dict(data_src='loss', log_name='loss_min', method_name='max'),
dict(data_src='loss', log_name='loss_max', method_name='max')
]
LogProcessor(custom_cfg=custom_cfg)
def test_parse_windows_size(self):
log_processor = LogProcessor()
# Test parse 'epoch' window_size.
custom_cfg = [dict(data_src='loss_cls', window_size='epoch')]
custom_cfg = log_processor._parse_windows_size(self.runner, 1,
custom_cfg)
assert custom_cfg[0]['window_size'] == 2
# Test parse 'global' window_size.
custom_cfg = [dict(data_src='loss_cls', window_size='global')]
custom_cfg = log_processor._parse_windows_size(self.runner, 1,
custom_cfg)
assert custom_cfg[0]['window_size'] == 11
# Test parse int window_size
custom_cfg = [dict(data_src='loss_cls', window_size=100)]
custom_cfg = log_processor._parse_windows_size(self.runner, 1,
custom_cfg)
assert custom_cfg[0]['window_size'] == 100
# Invalid type window_size will raise TypeError.
custom_cfg = [dict(data_src='loss_cls', window_size=[])]
with pytest.raises(TypeError):
log_processor._parse_windows_size(self.runner, 1, custom_cfg)
@pytest.mark.parametrize(
'by_epoch,mode,log_with_hierarchy',
([True, 'train', True], [True, 'train', False], [False, 'train', True],
[False, 'train', False], [True, 'val', True], [True, 'val', False],
[False, 'val', True], [False, 'val', False], [True, 'test', True],
[True, 'test', False], [False, 'test', True], [False, 'test', False]))
def test_get_log_after_iter(self, by_epoch, mode, log_with_hierarchy):
# Prepare LoggerHook
log_processor = LogProcessor(
by_epoch=by_epoch, log_with_hierarchy=log_with_hierarchy)
log_processor._get_max_memory = MagicMock(return_value='100')
eta = 40
self.runner.message_hub.update_info('eta', eta)
# Prepare training information.
if mode == 'train':
train_logs = dict(lr=0.1, time=1.0, data_time=1.0, loss_cls=1.0)
else:
train_logs = dict(time=1.0, data_time=1.0, loss_cls=1.0)
log_processor._collect_scalars = \
lambda *args, **kwargs: copy.deepcopy(train_logs)
_, out = log_processor.get_log_after_iter(self.runner, 1, mode)
# Verify that the correct context have been logged.
cur_loop = log_processor._get_cur_loop(self.runner, mode)
if by_epoch:
if mode in ['train', 'val']:
cur_epoch = log_processor._get_epoch(self.runner, mode)
log_str = (f'Epoch({mode}) [{cur_epoch}][ 2/'
f'{len(cur_loop.dataloader)}] ')
else:
log_str = (f'Epoch({mode}) [2/{len(cur_loop.dataloader)}] ')
if mode == 'train':
log_str += f"lr: {train_logs['lr']:.4e} "
else:
log_str += ' '
log_str += (f'eta: 0:00:40 '
f"time: {train_logs['time']:.4f} "
f"data_time: {train_logs['data_time']:.4f} ")
if torch.cuda.is_available():
log_str += 'memory: 100 '
if mode == 'train':
log_str += f"loss_cls: {train_logs['loss_cls']:.4f}"
assert out == log_str
else:
if mode == 'train':
max_iters = self.runner.max_iters
log_str = f'Iter({mode}) [11/{max_iters}] '
elif mode == 'val':
max_iters = len(cur_loop.dataloader)
log_str = f'Iter({mode}) [ 2/{max_iters}] '
else:
max_iters = len(cur_loop.dataloader)
log_str = f'Iter({mode}) [2/{max_iters}] '
if mode == 'train':
log_str += f"lr: {train_logs['lr']:.4e} "
else:
log_str += ' '
log_str += (f'eta: 0:00:40 '
f"time: {train_logs['time']:.4f} "
f"data_time: {train_logs['data_time']:.4f} ")
if torch.cuda.is_available():
log_str += 'memory: 100 '
if mode == 'train':
log_str += f"loss_cls: {train_logs['loss_cls']:.4f}"
assert out == log_str
@pytest.mark.parametrize(
'by_epoch,mode,log_with_hierarchy',
([True, 'val', True], [True, 'val', False], [False, 'val', True],
[False, 'val', False], [True, 'test', True], [False, 'test', False]))
def test_log_val(self, by_epoch, mode, log_with_hierarchy):
# Prepare LoggerHook
log_processor = LogProcessor(
by_epoch=by_epoch, log_with_hierarchy=log_with_hierarchy)
# Prepare validation information.
scalar_logs = dict(accuracy=0.9, data_time=1.0)
non_scalar_logs = dict(
recall={
'cat': 1,
'dog': 0
}, cm=torch.tensor([1, 2, 3]))
log_processor._collect_scalars = MagicMock(return_value=scalar_logs)
log_processor._collect_non_scalars = MagicMock(
return_value=non_scalar_logs)
_, out = log_processor.get_log_after_epoch(self.runner, 2, mode)
expect_metric_str = ("accuracy: 0.9000 recall: {'cat': 1, 'dog': 0} "
'cm: \ntensor([1, 2, 3])\n data_time: 1.0000')
if by_epoch:
if mode == 'test':
assert out == 'Epoch(test) [5/5] ' + expect_metric_str
else:
assert out == 'Epoch(val) [1][10/10] ' + expect_metric_str
else:
if mode == 'test':
assert out == 'Iter(test) [5/5] ' + expect_metric_str
else:
assert out == 'Iter(val) [10/10] ' + expect_metric_str
def test_collect_scalars(self):
history_count = np.ones(100)
time_scalars = np.random.randn(100)
loss_cls_scalars = np.random.randn(100)
lr_scalars = np.random.randn(100)
metric_scalars = np.random.randn(100)
history_time_buffer = HistoryBuffer(time_scalars, history_count)
histroy_loss_cls = HistoryBuffer(loss_cls_scalars, history_count)
history_lr_buffer = HistoryBuffer(lr_scalars, history_count)
history_metric_buffer = HistoryBuffer(metric_scalars, history_count)
custom_cfg = [
dict(data_src='time', method_name='max', log_name='time_max')
]
logger_hook = LogProcessor(custom_cfg=custom_cfg)
# Collect with prefix.
log_scalars = {
'train/time': history_time_buffer,
'lr': history_lr_buffer,
'train/loss_cls': histroy_loss_cls,
'val/metric': history_metric_buffer
}
self.runner.message_hub._log_scalars = log_scalars
tag = logger_hook._collect_scalars(
copy.deepcopy(custom_cfg), self.runner, mode='train')
# Test training key in tag.
assert list(tag.keys()) == ['time', 'loss_cls', 'time_max']
# Test statistics lr with `current`, loss and time with 'mean'
assert tag['time'] == time_scalars[-10:].mean()
assert tag['time_max'] == time_scalars.max()
assert tag['loss_cls'] == loss_cls_scalars[-10:].mean()
tag = logger_hook._collect_scalars(
copy.deepcopy(custom_cfg), self.runner, mode='val')
assert list(tag.keys()) == ['metric']
assert tag['metric'] == metric_scalars[-1]
def test_collect_non_scalars(self):
metric1 = np.random.rand(10)
metric2 = torch.tensor(10)
log_processor = LogProcessor()
# Collect with prefix.
log_infos = {'test/metric1': metric1, 'test/metric2': metric2}
self.runner.message_hub._runtime_info = log_infos
tag = log_processor._collect_non_scalars(self.runner, mode='test')
# Test training key in tag.
assert list(tag.keys()) == ['metric1', 'metric2']
# Test statistics lr with `current`, loss and time with 'mean'
assert tag['metric1'] is metric1
assert tag['metric2'] is metric2
@patch('torch.cuda.max_memory_allocated', MagicMock())
@patch('torch.cuda.reset_peak_memory_stats', MagicMock())
def test_get_max_memory(self):
logger_hook = LogProcessor()
runner = MagicMock()
runner.world_size = 1
runner.model = torch.nn.Linear(1, 1)
logger_hook._get_max_memory(runner)
torch.cuda.max_memory_allocated.assert_called()
torch.cuda.reset_peak_memory_stats.assert_called()
def test_get_iter(self):
log_processor = LogProcessor()
# Get global iter when `inner_iter=False`
iter = log_processor._get_iter(self.runner)
assert iter == 11
# Get inner iter
iter = log_processor._get_iter(self.runner, 1)
assert iter == 2
# Still get global iter when `logger_hook.by_epoch==False`
log_processor.by_epoch = False
iter = log_processor._get_iter(self.runner, 1)
assert iter == 11
def test_get_epoch(self):
log_processor = LogProcessor()
epoch = log_processor._get_epoch(self.runner, 'train')
assert epoch == 2
epoch = log_processor._get_epoch(self.runner, 'val')
assert epoch == 1
with pytest.raises(ValueError):
log_processor._get_epoch(self.runner, 'test')
def test_get_cur_loop(self):
log_processor = LogProcessor()
loop = log_processor._get_cur_loop(self.runner, 'train')
assert len(loop.dataloader) == 20
loop = log_processor._get_cur_loop(self.runner, 'val')
assert len(loop.dataloader) == 10
loop = log_processor._get_cur_loop(self.runner, 'test')
assert len(loop.dataloader) == 5
def setup_method(self):
runner = MagicMock()
runner.epoch = 1
runner.max_epochs = 10
runner.iter = 10
runner.max_iters = 50
runner.train_dataloader = [0] * 20
runner.val_dataloader = [0] * 10
runner.test_dataloader = [0] * 5
runner.train_loop.dataloader = [0] * 20
runner.val_loop.dataloader = [0] * 10
runner.test_loop.dataloader = [0] * 5
logger = MMLogger.get_instance('log_processor_test')
runner.logger = logger
message_hub = MessageHub.get_instance('log_processor_test')
for i in range(10):
message_hub.update_scalar('train/loss', 10 - i)
for i in range(10):
message_hub.update_scalar('val/acc', i * 0.1)
runner.message_hub = message_hub
self.runner = runner