[Feature] Add colossalai strategy (#1299)
parent
03ad86cfd2
commit
db32234241
|
@ -25,7 +25,6 @@ Optimizer
|
|||
OptimWrapperDict
|
||||
DefaultOptimWrapperConstructor
|
||||
ZeroRedundancyOptimizer
|
||||
DeepSpeedOptimWrapper
|
||||
|
||||
.. autosummary::
|
||||
:toctree: generated
|
||||
|
|
|
@ -16,3 +16,26 @@ mmengine._strategy
|
|||
DDPStrategy
|
||||
DeepSpeedStrategy
|
||||
FSDPStrategy
|
||||
ColossalAIStrategy
|
||||
|
||||
|
||||
.. currentmodule:: mmengine._strategy.deepspeed
|
||||
|
||||
.. autosummary::
|
||||
:toctree: generated
|
||||
:nosignatures:
|
||||
:template: classtemplate.rst
|
||||
|
||||
MMDeepSpeedEngineWrapper
|
||||
DeepSpeedOptimWrapper
|
||||
|
||||
|
||||
.. currentmodule:: mmengine._strategy.colossalai
|
||||
|
||||
.. autosummary::
|
||||
:toctree: generated
|
||||
:nosignatures:
|
||||
:template: classtemplate.rst
|
||||
|
||||
CollosalAIModelWrapper
|
||||
ColossalAIOpitmWrapper
|
||||
|
|
|
@ -25,7 +25,7 @@ pip install deepspeed
|
|||
After installing DeepSpeed, you need to configure the `strategy` and `optim_wrapper` parameters of FlexibleRunner as follows:
|
||||
|
||||
- strategy: Set `type='DeepSpeedStrategy'` and configure other parameters. See [DeepSpeedStrategy](mmengine._strategy.DeepSpeedStrategy) for more details.
|
||||
- optim_wrapper: Set `type='DeepSpeedOptimWrapper'` and configure other parameters. See [DeepSpeedOptimWrapper](mmengine.optim.DeepSpeedOptimWrapper) for more details.
|
||||
- optim_wrapper: Set `type='DeepSpeedOptimWrapper'` and configure other parameters. See [DeepSpeedOptimWrapper](mmengine._strategy.deepspeed.DeepSpeedOptimWrapper) for more details.
|
||||
|
||||
Here is an example configuration related to DeepSpeed:
|
||||
|
||||
|
@ -182,3 +182,77 @@ torchrun --nproc-per-node 2 examples/distributed_training_with_flexible_runner.p
|
|||
```
|
||||
|
||||
</details>
|
||||
|
||||
## ColossalAI
|
||||
|
||||
[ColossalAI](https://colossalai.org/) is a comprehensive large-scale model training system that utilizes efficient parallelization techniques. Starting from MMEngine v0.8.5, it supports training models using optimization strategies from the ZeRO series in ColossalAI.
|
||||
|
||||
Install ColossalAI with a version greater than v0.3.1. This version requirement is due to a [bug](https://github.com/hpcaitech/ColossalAI/issues/4393) in v0.3.1 that causes some program blocking, which has been fixed in later versions. If the highest available version of ColossalAI is still v0.3.1, it is recommended to install ColossalAI from the source code on the main branch.
|
||||
|
||||
```{note}
|
||||
Note that if you encounter compilation errors like `nvcc fatal: Unsupported gpu architecture 'compute_90'` and your PyTorch version is higher than 2.0, you need to git clone the source code and follow the modifications in this [PR](https://github.com/hpcaitech/ColossalAI/pull/4357) before proceeding with the installation.
|
||||
```
|
||||
|
||||
```bash
|
||||
pip install git+https://github.com/hpcaitech/ColossalAI
|
||||
```
|
||||
|
||||
If the latest version of ColossalAI is higher than v0.3.1, you can directly install it using pip:
|
||||
|
||||
```bash
|
||||
pip install colossalai
|
||||
```
|
||||
|
||||
Once ColossalAI is installed, configure the `strategy` and `optim_wrapper` parameters for FlexibleRunner:
|
||||
|
||||
- `strategy`: Specify `type='ColossalAIStrategy'` and configure the parameters. Detailed parameter descriptions can be found in [ColossalAIStrategy](mmengine._strategy.ColossalAI).
|
||||
- `optim_wrapper`: Default to no `type` parameter or specify `type=ColossalAIOpitmWrapper`. It is recommended to choose `HybridAdam` as the optimizer type. Other configurable types are listed in [ColossalAIOptimWrapper](mmengine._strategy.ColossalAIOptimWrapper).
|
||||
|
||||
Here's the configuration related to ColossalAI:
|
||||
|
||||
```python
|
||||
from mmengine.runner._flexible_runner import FlexibleRunner
|
||||
|
||||
strategy = dict(type='ColossalAIStrategy')
|
||||
optim_wrapper = dict(optimizer=dict(type='HybridAdam', lr=1e-3))
|
||||
|
||||
# Initialize FlexibleRunner
|
||||
runner = FlexibleRunner(
|
||||
model=MMResNet50(),
|
||||
work_dir='./work_dirs',
|
||||
strategy=strategy,
|
||||
train_dataloader=train_dataloader,
|
||||
optim_wrapper=optim_wrapper,
|
||||
param_scheduler=dict(type='LinearLR'),
|
||||
train_cfg=dict(by_epoch=True, max_epochs=10, val_interval=1),
|
||||
val_dataloader=val_dataloader,
|
||||
val_cfg=dict(),
|
||||
val_evaluator=dict(type=Accuracy))
|
||||
|
||||
# Start training
|
||||
runner.train()
|
||||
```
|
||||
|
||||
To initiate distributed training using two GPUs:
|
||||
|
||||
```bash
|
||||
torchrun --nproc-per-node 2 examples/distributed_training_with_flexible_runner.py --use-colossalai
|
||||
```
|
||||
|
||||
<details>
|
||||
<summary>Training Logs</summary>
|
||||
|
||||
```
|
||||
08/18 11:56:34 - mmengine - INFO - Epoch(train) [1][ 10/196] lr: 3.3333e-04 eta: 0:10:31 time: 0.3238 data_time: 0.0344 memory: 597 loss: 3.8766
|
||||
08/18 11:56:35 - mmengine - INFO - Epoch(train) [1][ 20/196] lr: 3.3333e-04 eta: 0:06:56 time: 0.1057 data_time: 0.0338 memory: 597 loss: 2.3797
|
||||
08/18 11:56:36 - mmengine - INFO - Epoch(train) [1][ 30/196] lr: 3.3333e-04 eta: 0:05:45 time: 0.1068 data_time: 0.0342 memory: 597 loss: 2.3219
|
||||
08/18 11:56:37 - mmengine - INFO - Epoch(train) [1][ 40/196] lr: 3.3333e-04 eta: 0:05:08 time: 0.1059 data_time: 0.0337 memory: 597 loss: 2.2641
|
||||
08/18 11:56:38 - mmengine - INFO - Epoch(train) [1][ 50/196] lr: 3.3333e-04 eta: 0:04:45 time: 0.1062 data_time: 0.0338 memory: 597 loss: 2.2250
|
||||
08/18 11:56:40 - mmengine - INFO - Epoch(train) [1][ 60/196] lr: 3.3333e-04 eta: 0:04:31 time: 0.1097 data_time: 0.0339 memory: 597 loss: 2.1672
|
||||
08/18 11:56:41 - mmengine - INFO - Epoch(train) [1][ 70/196] lr: 3.3333e-04 eta: 0:04:21 time: 0.1096 data_time: 0.0340 memory: 597 loss: 2.1688
|
||||
08/18 11:56:42 - mmengine - INFO - Epoch(train) [1][ 80/196] lr: 3.3333e-04 eta: 0:04:13 time: 0.1098 data_time: 0.0338 memory: 597 loss: 2.1781
|
||||
08/18 11:56:43 - mmengine - INFO - Epoch(train) [1][ 90/196] lr: 3.3333e-04 eta: 0:04:06 time: 0.1097 data_time: 0.0338 memory: 597 loss: 2.0938
|
||||
08/18 11:56:44 - mmengine - INFO - Epoch(train) [1][100/196] lr: 3.3333e-04 eta: 0:04:01 time: 0.1097 data_time: 0.0339 memory: 597 loss: 2.1078
|
||||
08/18 11:56:45 - mmengine - INFO - Epoch(train) [1][110/196] lr: 3.3333e-04 eta: 0:04:01 time: 0.1395 data_time: 0.0340 memory: 597 loss: 2.0141
|
||||
08/18 11:56:46 - mmengine - INFO - Epoch(train) [1][120/196] lr: 3.3333
|
||||
```
|
||||
|
|
|
@ -25,7 +25,6 @@ Optimizer
|
|||
OptimWrapperDict
|
||||
DefaultOptimWrapperConstructor
|
||||
ZeroRedundancyOptimizer
|
||||
DeepSpeedOptimWrapper
|
||||
|
||||
.. autosummary::
|
||||
:toctree: generated
|
||||
|
|
|
@ -16,3 +16,26 @@ mmengine._strategy
|
|||
DDPStrategy
|
||||
DeepSpeedStrategy
|
||||
FSDPStrategy
|
||||
ColossalAIStrategy
|
||||
|
||||
|
||||
.. currentmodule:: mmengine._strategy.deepspeed
|
||||
|
||||
.. autosummary::
|
||||
:toctree: generated
|
||||
:nosignatures:
|
||||
:template: classtemplate.rst
|
||||
|
||||
MMDeepSpeedEngineWrapper
|
||||
DeepSpeedOptimWrapper
|
||||
|
||||
|
||||
.. currentmodule:: mmengine._strategy.colossalai
|
||||
|
||||
.. autosummary::
|
||||
:toctree: generated
|
||||
:nosignatures:
|
||||
:template: classtemplate.rst
|
||||
|
||||
CollosalAIModelWrapper
|
||||
ColossalAIOpitmWrapper
|
||||
|
|
|
@ -24,7 +24,7 @@ pip install deepspeed
|
|||
安装好 deepspeed 后,需配置 FlexibleRunner 的 strategy 和 optim_wrapper 参数:
|
||||
|
||||
- strategy:指定 `type='DeepSpeedStrategy'` 并配置参数。参数的详细介绍可阅读 [DeepSpeedStrategy](mmengine._strategy.DeepSpeedStrategy)。
|
||||
- optim_wrapper:指定 `type='DeepSpeedOptimWrapper'` 并配置参数。参数的详细介绍可阅读 [DeepSpeedOptimWrapper](mmengine.optim.DeepSpeedOptimWrapper)。
|
||||
- optim_wrapper:指定 `type='DeepSpeedOptimWrapper'` 并配置参数。参数的详细介绍可阅读 [DeepSpeedOptimWrapper](mmengine._strategy.deepspeed.DeepSpeedOptimWrapper)。
|
||||
|
||||
下面是 DeepSpeed 相关的配置:
|
||||
|
||||
|
@ -181,3 +181,85 @@ torchrun --nproc-per-node 2 examples/distributed_training_with_flexible_runner.p
|
|||
```
|
||||
|
||||
</details>
|
||||
|
||||
## ColossalAI
|
||||
|
||||
[ColossalAI](https://colossalai.org/) 是一个具有高效并行化技术的综合大规模模型训练系统。MMEngine 自 v0.8.5 开始,支持使用 ColossalAI 中的 ZeRO 系列优化策略训练模型。
|
||||
|
||||
安装版本大于 v0.3.1 的 ColossalAI。这个版本限制是由于 v0.3.1 存在一些程序阻塞的 [Bug](https://github.com/hpcaitech/ColossalAI/issues/4393),而该 Bug 在之后的版本中已经修复。如果目前 ColossalAI 的最高版本仍为 v0.3.1,建议从源码安装主分支的 ColossalAI。
|
||||
|
||||
```{note}
|
||||
需要注意的是,如果你的 PyTorch 版本高于 2.0,并遇到了 `nvcc fatal : Unsupported gpu architecture 'compute_90'` 类似的编译错误,则需要 git clone 源码,参考该 [PR](https://github.com/hpcaitech/ColossalAI/pull/4357) 进行修改源码,再进行安装
|
||||
```
|
||||
|
||||
```bash
|
||||
pip install git+https://github.com/hpcaitech/ColossalAI
|
||||
```
|
||||
|
||||
如果 ColossalAI 的最新版本大于 v0.3.1,可以直接使用 pip 安装:
|
||||
|
||||
```bash
|
||||
pip install colossalai
|
||||
```
|
||||
|
||||
安装好 ColossalAI 后,需配置 FlexibleRunner 的 strategy 和 optim_wrapper 参数:
|
||||
|
||||
- strategy:指定 `type='ColossalAIStrategy'` 并配置参数。参数的详细介绍可阅读 [ColossalAIStrategy](mmengine._strategy.ColossalAI)。
|
||||
- optim_wrapper:缺省 `type` 参数,或指定 `type=ColossalAIOpitmWrapper`,优化器类型时建议选择 `HybridAdam`。其他可配置类型可阅读 [ColossalAIOptimWrapper](mmengine._strategy.ColossalAIOptimWrapper)。
|
||||
|
||||
下面是 ColossalAI 相关的配置:
|
||||
|
||||
```python
|
||||
from mmengine.runner._flexible_runner import FlexibleRunner
|
||||
|
||||
strategy = dict(type='ColossalAIStrategy')
|
||||
optim_wrapper = dict(optimizer=dict(type='HybridAdam', lr=1e-3))
|
||||
|
||||
# 初始化 FlexibleRunner
|
||||
runner = FlexibleRunner(
|
||||
model=MMResNet50(),
|
||||
work_dir='./work_dirs',
|
||||
strategy=strategy,
|
||||
train_dataloader=train_dataloader,
|
||||
optim_wrapper=optim_wrapper,
|
||||
param_scheduler=dict(type='LinearLR'),
|
||||
train_cfg=dict(by_epoch=True, max_epochs=10, val_interval=1),
|
||||
val_dataloader=val_dataloader,
|
||||
val_cfg=dict(),
|
||||
val_evaluator=dict(type=Accuracy))
|
||||
|
||||
# 开始训练
|
||||
runner.train()
|
||||
```
|
||||
|
||||
使用两张卡启动分布式训练:
|
||||
|
||||
```bash
|
||||
torchrun --nproc-per-node 2 examples/distributed_training_with_flexible_runner.py --use-colossalai
|
||||
```
|
||||
|
||||
<details>
|
||||
<summary>训练日志</summary>
|
||||
|
||||
```
|
||||
08/18 11:56:34 - mmengine - INFO - Epoch(train) [1][ 10/196] lr: 3.3333e-04 eta: 0:10:31 time: 0.3238 data_time: 0.0344 memory: 597 loss: 3.8766
|
||||
08/18 11:56:35 - mmengine - INFO - Epoch(train) [1][ 20/196] lr: 3.3333e-04 eta: 0:06:56 time: 0.1057 data_time: 0.0338 memory: 597 loss: 2.3797
|
||||
08/18 11:56:36 - mmengine - INFO - Epoch(train) [1][ 30/196] lr: 3.3333e-04 eta: 0:05:45 time: 0.1068 data_time: 0.0342 memory: 597 loss: 2.3219
|
||||
08/18 11:56:37 - mmengine - INFO - Epoch(train) [1][ 40/196] lr: 3.3333e-04 eta: 0:05:08 time: 0.1059 data_time: 0.0337 memory: 597 loss: 2.2641
|
||||
08/18 11:56:38 - mmengine - INFO - Epoch(train) [1][ 50/196] lr: 3.3333e-04 eta: 0:04:45 time: 0.1062 data_time: 0.0338 memory: 597 loss: 2.2250
|
||||
08/18 11:56:40 - mmengine - INFO - Epoch(train) [1][ 60/196] lr: 3.3333e-04 eta: 0:04:31 time: 0.1097 data_time: 0.0339 memory: 597 loss: 2.1672
|
||||
08/18 11:56:41 - mmengine - INFO - Epoch(train) [1][ 70/196] lr: 3.3333e-04 eta: 0:04:21 time: 0.1096 data_time: 0.0340 memory: 597 loss: 2.1688
|
||||
08/18 11:56:42 - mmengine - INFO - Epoch(train) [1][ 80/196] lr: 3.3333e-04 eta: 0:04:13 time: 0.1098 data_time: 0.0338 memory: 597 loss: 2.1781
|
||||
08/18 11:56:43 - mmengine - INFO - Epoch(train) [1][ 90/196] lr: 3.3333e-04 eta: 0:04:06 time: 0.1097 data_time: 0.0338 memory: 597 loss: 2.0938
|
||||
08/18 11:56:44 - mmengine - INFO - Epoch(train) [1][100/196] lr: 3.3333e-04 eta: 0:04:01 time: 0.1097 data_time: 0.0339 memory: 597 loss: 2.1078
|
||||
08/18 11:56:45 - mmengine - INFO - Epoch(train) [1][110/196] lr: 3.3333e-04 eta: 0:04:01 time: 0.1395 data_time: 0.0340 memory: 597 loss: 2.0141
|
||||
08/18 11:56:46 - mmengine - INFO - Epoch(train) [1][120/196] lr: 3.3333e-04 eta: 0:03:56 time: 0.1090 data_time: 0.0338 memory: 597 loss: 2.0273
|
||||
08/18 11:56:48 - mmengine - INFO - Epoch(train) [1][130/196] lr: 3.3333e-04 eta: 0:03:52 time: 0.1096 data_time: 0.0339 memory: 597 loss: 2.0086
|
||||
08/18 11:56:49 - mmengine - INFO - Epoch(train) [1][140/196] lr: 3.3333e-04 eta: 0:03:49 time: 0.1096 data_time: 0.0339 memory: 597 loss: 1.9180
|
||||
08/18 11:56:50 - mmengine - INFO - Epoch(train) [1][150/196] lr: 3.3333e-04 eta: 0:03:46 time: 0.1092 data_time: 0.0339 memory: 597 loss: 1.9578
|
||||
08/18 11:56:51 - mmengine - INFO - Epoch(train) [1][160/196] lr: 3.3333e-04 eta: 0:03:43 time: 0.1097 data_time: 0.0339 memory: 597 loss: 1.9375
|
||||
08/18 11:56:52 - mmengine - INFO - Epoch(train) [1][170/196] lr: 3.3333e-04 eta: 0:03:40 time: 0.1092 data_time: 0.0339 memory: 597 loss: 1.9312
|
||||
08/18 11:56:53 - mmengine - INFO - Epoch(train) [1][180/196] lr: 3.3333e-04 eta: 0:03:37 time: 0.1070 data_time: 0.0339 memory: 597 loss: 1.9078
|
||||
```
|
||||
|
||||
</details>
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
# Copyright (c) OpenMMLab. All rights reserved.
|
||||
import argparse
|
||||
|
||||
import torch
|
||||
import torch.nn.functional as F
|
||||
import torchvision
|
||||
import torchvision.transforms as transforms
|
||||
|
@ -44,6 +45,7 @@ def parse_args():
|
|||
parser.add_argument('--local_rank', '--local-rank', type=int, default=0)
|
||||
parser.add_argument('--use-fsdp', action='store_true')
|
||||
parser.add_argument('--use-deepspeed', action='store_true')
|
||||
parser.add_argument('--use-colossalai', action='store_true')
|
||||
args = parser.parse_args()
|
||||
return args
|
||||
|
||||
|
@ -116,6 +118,25 @@ def main():
|
|||
model_wrapper=dict(auto_wrap_policy=size_based_auto_wrap_policy))
|
||||
optim_wrapper = dict(
|
||||
type='AmpOptimWrapper', optimizer=dict(type='AdamW', lr=1e-3))
|
||||
elif args.use_colossalai:
|
||||
from colossalai.tensor.op_wrapper import colo_op_impl
|
||||
|
||||
# ColossalAI overwrite some torch ops with their custom op to
|
||||
# make it compatible with `ColoTensor`. However, a backward error
|
||||
# is more likely to happen if there are inplace operation in the
|
||||
# model.
|
||||
# For example, layers like `conv` + `bn` + `relu` is OK when `relu` is
|
||||
# inplace since PyTorch builtin ops `batch_norm` could handle it.
|
||||
# However, if `relu` is an `inplaced` op while `batch_norm` is an
|
||||
# custom op, an error will be raised since PyTorch thinks the custom op
|
||||
# could not handle the backward graph modification caused by inplace
|
||||
# op.
|
||||
# In this example, the inplace op `add_` in resnet could raise an error
|
||||
# since PyTorch consider the custom op before it could not handle the
|
||||
# backward graph modification
|
||||
colo_op_impl(torch.Tensor.add_)(torch.add)
|
||||
strategy = dict(type='ColossalAIStrategy')
|
||||
optim_wrapper = dict(optimizer=dict(type='HybridAdam', lr=1e-3))
|
||||
else:
|
||||
strategy = None
|
||||
optim_wrapper = dict(
|
||||
|
|
|
@ -2,12 +2,14 @@
|
|||
from mmengine.utils import digit_version
|
||||
from mmengine.utils.dl_utils import TORCH_VERSION
|
||||
from .base import BaseStrategy
|
||||
from .colossalai import ColossalAIStrategy
|
||||
from .deepspeed import DeepSpeedStrategy
|
||||
from .distributed import DDPStrategy
|
||||
from .single_device import SingleDeviceStrategy
|
||||
|
||||
__all__ = [
|
||||
'BaseStrategy', 'DDPStrategy', 'SingleDeviceStrategy', 'DeepSpeedStrategy'
|
||||
'BaseStrategy', 'DDPStrategy', 'SingleDeviceStrategy', 'DeepSpeedStrategy',
|
||||
'ColossalAIStrategy'
|
||||
]
|
||||
|
||||
if digit_version(TORCH_VERSION) >= digit_version('2.0.0'):
|
||||
|
|
|
@ -0,0 +1,553 @@
|
|||
# Copyright (c) OpenMMLab. All rights reserved.
|
||||
import inspect
|
||||
import os.path as osp
|
||||
import time
|
||||
from contextlib import contextmanager
|
||||
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
|
||||
|
||||
try:
|
||||
import colossalai
|
||||
import colossalai.booster.mixed_precision as colo_precision
|
||||
import colossalai.booster.plugin as colo_plugin
|
||||
import colossalai.nn.optimizer as colo_optimizer
|
||||
from colossalai.booster import Booster
|
||||
from colossalai.interface import ModelWrapper
|
||||
except Exception as e: # noqa: F841
|
||||
colossalai = None
|
||||
colo_precision = None
|
||||
colo_plugin = None
|
||||
colo_optimizer = None
|
||||
Booster = None
|
||||
ModelWrapper = None
|
||||
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
|
||||
import mmengine
|
||||
from mmengine import mkdir_or_exist
|
||||
from mmengine._strategy import BaseStrategy
|
||||
from mmengine.device import get_device
|
||||
from mmengine.dist import init_dist, is_main_process
|
||||
from mmengine.fileio import join_path
|
||||
from mmengine.model import BaseDataPreprocessor
|
||||
from mmengine.optim import BaseOptimWrapper, OptimWrapper, _ParamScheduler
|
||||
from mmengine.registry import STRATEGIES, Registry
|
||||
from mmengine.registry.root import MODEL_WRAPPERS, OPTIM_WRAPPERS, OPTIMIZERS
|
||||
from mmengine.runner.checkpoint import _load_checkpoint, save_checkpoint
|
||||
from mmengine.utils import get_git_hash
|
||||
|
||||
# Component for colossalai `plugins` and `mixed_precisions`
|
||||
PLUGINS = Registry('plugin')
|
||||
MIXED_PRECISIONS = Registry('mixed_precision')
|
||||
|
||||
|
||||
def register_plugins():
|
||||
_plugins = inspect.getmembers(
|
||||
colo_plugin,
|
||||
lambda x: inspect.isclass(x) and issubclass(x, colo_plugin.Plugin))
|
||||
|
||||
for name, plugin in _plugins:
|
||||
PLUGINS.register_module(name=name, module=plugin)
|
||||
|
||||
|
||||
def register_optimizers():
|
||||
_colo_optimizer = inspect.getmembers(
|
||||
colo_optimizer,
|
||||
lambda x: inspect.isclass(x) and issubclass(x, torch.optim.Optimizer))
|
||||
for name, optim_type in _colo_optimizer:
|
||||
OPTIMIZERS.register_module(name=name, module=optim_type, force=True)
|
||||
|
||||
|
||||
def register_mixed_precisions():
|
||||
_mixed_precisions = inspect.getmembers(
|
||||
colo_precision, lambda x: inspect.isclass(x) and issubclass(
|
||||
x, colo_precision.MixedPrecision))
|
||||
|
||||
for name, mixed_precision in _mixed_precisions:
|
||||
MIXED_PRECISIONS.register_module(name=name, module=mixed_precision)
|
||||
|
||||
|
||||
@OPTIM_WRAPPERS.register_module()
|
||||
class ColossalAIOpitmWrapper(OptimWrapper):
|
||||
"""OptimWrapper for ColossalAI.
|
||||
|
||||
The available optimizers are:
|
||||
- CPUAdam
|
||||
- FusedAdam
|
||||
- FusedLAMB
|
||||
- FusedSGD
|
||||
- HybridAdam
|
||||
- Lamb
|
||||
- Lars
|
||||
|
||||
You can find more details in the `colossalai tutorial`_
|
||||
|
||||
Args:
|
||||
optimizer (dict or collossal.booster.Booster): The optimizer to be
|
||||
wrapped.
|
||||
accumulative_counts (int): The number of iterations to accumulate
|
||||
gradients. The parameters will be updated per
|
||||
``accumulative_counts``.
|
||||
|
||||
.. _colossalai tutorial: https://github.com/hpcaitech/ColossalAI/tree/main/colossalai/nn/optimizer
|
||||
""" # noqa: E501
|
||||
|
||||
def __init__(self,
|
||||
optimizer: torch.optim.Optimizer,
|
||||
booster: Booster,
|
||||
accumulative_counts: int = 1):
|
||||
super().__init__(optimizer, accumulative_counts=accumulative_counts)
|
||||
self.booster = booster
|
||||
|
||||
@contextmanager
|
||||
def optim_context(self, model: nn.Module):
|
||||
if self.booster.plugin.support_no_sync():
|
||||
sync_context = self.booster.no_sync(model, self.optimizer)
|
||||
else:
|
||||
yield
|
||||
return
|
||||
if not self.should_sync():
|
||||
with sync_context:
|
||||
yield
|
||||
|
||||
def backward(self, loss: torch.Tensor, **kwargs) -> None:
|
||||
self._inner_count += 1
|
||||
self.optimizer.backward(loss, **kwargs)
|
||||
|
||||
|
||||
@MODEL_WRAPPERS.register_module()
|
||||
class CollosalAIModelWrapper:
|
||||
|
||||
def __init__(self, model_wrapper: ModelWrapper, model: nn.Module):
|
||||
self.model_wrapper = model_wrapper
|
||||
self.model = model
|
||||
|
||||
def __call__(self, *args, **kwargs) -> Any:
|
||||
return self.model_wrapper(*args, **kwargs)
|
||||
|
||||
def train_step(
|
||||
self,
|
||||
data: Union[dict, tuple, list],
|
||||
optim_wrapper: ColossalAIOpitmWrapper,
|
||||
) -> Dict[str, torch.Tensor]:
|
||||
data = self.model.data_preprocessor(data, training=True)
|
||||
with optim_wrapper.optim_context(self.model):
|
||||
losses = self._run_forward(data, mode='loss')
|
||||
parsed_loss, log_vars = self.model.parse_losses(losses)
|
||||
optim_wrapper.update_params(parsed_loss)
|
||||
return log_vars
|
||||
|
||||
def val_step(self, data: Union[dict, tuple, list]) -> list:
|
||||
"""Gets the prediction of module during validation process.
|
||||
|
||||
Args:
|
||||
data (dict or tuple or list): Data sampled from dataset.
|
||||
|
||||
Returns:
|
||||
list: The predictions of given data.
|
||||
"""
|
||||
data = self.model.data_preprocessor(data, False)
|
||||
return self._run_forward(data, mode='predict')
|
||||
|
||||
test_step = val_step
|
||||
|
||||
def _run_forward(self, data: Union[dict, tuple, list], mode: str) -> Any:
|
||||
"""Unpacks data for :meth:`forward`
|
||||
|
||||
Args:
|
||||
data (dict or tuple or list): Data sampled from dataset.
|
||||
mode (str): Mode of forward.
|
||||
|
||||
Returns:
|
||||
dict or list: Results of training or testing mode.
|
||||
"""
|
||||
if isinstance(data, dict):
|
||||
results = self.model_wrapper(**data, mode=mode)
|
||||
elif isinstance(data, (list, tuple)):
|
||||
results = self.model_wrapper(*data, mode=mode)
|
||||
else:
|
||||
raise TypeError('Output of `data_preprocessor` should be '
|
||||
f'list, tuple or dict, but got {type(data)}')
|
||||
return results
|
||||
|
||||
def __getattr__(self, name):
|
||||
if hasattr(self.model_wrapper, name):
|
||||
return getattr(self.model_wrapper, name)
|
||||
elif hasattr(self.model, name):
|
||||
return getattr(self.model, name)
|
||||
else:
|
||||
raise AttributeError(
|
||||
f'{self.model_wrapper} and {self.model} has no '
|
||||
f'attribute {name}')
|
||||
|
||||
|
||||
@STRATEGIES.register_module()
|
||||
class ColossalAIStrategy(BaseStrategy):
|
||||
"""
|
||||
Args:
|
||||
config: (str or dict): The colossalai config file to setup distributed
|
||||
environment. See more details in the `colossalai config tutorial`_.
|
||||
mixed_precision (str or MixedPrecision): The mixed precision to run the
|
||||
training. Defaults to None. If the argument is a string, it can be
|
||||
'fp16', 'fp16_apex', 'bf16', or 'fp8' fp16' would use PyTorch AMP
|
||||
while `fp16_apex` would use Nvidia Apex.
|
||||
plugin (Plugin): The plugin to run the training. The type of `plugin`
|
||||
could be:
|
||||
|
||||
- str: The available plugins are ``gemini`` and ``lowlevel-zero``.
|
||||
|
||||
``gemini`` means a `ZeRO`_ implementation with chunk-based
|
||||
memory management. You could find more details in the
|
||||
`colossalai gemini tutorial`_. ``lowlevel-zero`` means a
|
||||
Zero-1 and Zero-2 implementation. Although gemini is more
|
||||
memory saving, some unexpceted error could happen for
|
||||
some spectial model structure. lowlevel-zero is more stable.
|
||||
|
||||
- dict: **dict-type style config to build a colossalai plugin**.
|
||||
|
||||
See the `booster plugin tutorial`_ for more details.
|
||||
|
||||
model_wrapper (dict, optional): Dict for model wrapper. Defaults to
|
||||
None.
|
||||
work_dir (str): The working directory to save checkpoints. The logs
|
||||
will be saved in the subdirectory of `work_dir` named
|
||||
:attr:`timestamp`. Defaults to 'work_dirs'.
|
||||
experiment_name (str, optional): Name of current experiment. If not
|
||||
specified, timestamp will be used as :attr:`experiment_name`.
|
||||
Defaults to None.
|
||||
env_kwargs (dict, optional): Environment config passed in
|
||||
:meth:`setup_env`. Defaults to None.
|
||||
log_kwargs (dict, optional): Logger config passed in
|
||||
:meth:`build_logger`. Defaults to None.
|
||||
auto_scale_lr (dict, Optional): Config to scale the learning rate
|
||||
automatically. It includes ``base_batch_size`` and ``enable``.
|
||||
``base_batch_size`` is the batch size that the optimizer lr is
|
||||
based on. ``enable`` is the switch to turn on and off the feature.
|
||||
|
||||
.. _colossalai config tutorial: https://colossalai.org/docs/basics/configure_parallelization
|
||||
.. _ZeRO: https://arxiv.org/abs/1910.02054
|
||||
.. _colossalai gemini tutorial: https://colossalai.org/docs/features/zero_with_chunk/#geminiddp
|
||||
.. _booster plugin tutorial: https://colossalai.org/docs/basics/booster_plugins
|
||||
|
||||
""" # noqa: E501
|
||||
OPTIMIZER_DIR = 'optimizer' # directory to save optimizer state.
|
||||
MODEL_DIR = 'model' # directory to save model
|
||||
SCHEDULER_DIR = 'scheduler' # directory to save scheduelrs
|
||||
model: CollosalAIModelWrapper # type: ignore
|
||||
optim_wrapper: ColossalAIOpitmWrapper # type: ignore
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
config: Union[str, dict, None] = None,
|
||||
mixed_precision: Union[str, dict, None] = None,
|
||||
plugin: str = 'gemini',
|
||||
model_wrapper: Optional[dict] = None,
|
||||
**kwargs,
|
||||
):
|
||||
if colossalai is None:
|
||||
raise ModuleNotFoundError(
|
||||
'Please install colossalai by `pip install -U colossalai`')
|
||||
register_plugins()
|
||||
register_mixed_precisions()
|
||||
register_optimizers()
|
||||
|
||||
self.config = config or {}
|
||||
super().__init__(**kwargs)
|
||||
if mixed_precision is not None:
|
||||
mixed_precision = self._build_mixed_precision(mixed_precision)
|
||||
|
||||
if plugin is not None:
|
||||
plugin = self._build_plugin(plugin)
|
||||
self.booster = Booster(mixed_precision=mixed_precision, plugin=plugin)
|
||||
self.model_wrapper = model_wrapper
|
||||
|
||||
def prepare(
|
||||
self,
|
||||
model: Union[nn.Module, dict],
|
||||
*,
|
||||
optim_wrapper: Union[BaseOptimWrapper, dict, None] = None,
|
||||
param_scheduler: Union[_ParamScheduler, Dict, List, None] = None,
|
||||
compile: Union[dict, bool] = False,
|
||||
dispatch_kwargs: Optional[dict] = None,
|
||||
):
|
||||
"""Prepare model and some components.
|
||||
|
||||
Args:
|
||||
model (:obj:`torch.nn.Module` or dict): The model to be run. It
|
||||
can be a dict used for build a model.
|
||||
|
||||
Keyword Args:
|
||||
optim_wrapper (BaseOptimWrapper or dict, optional): Computing the
|
||||
gradient of model parameters and updating them.
|
||||
Defaults to None.
|
||||
See :meth:`build_optim_wrapper` for examples.
|
||||
param_scheduler (_ParamScheduler or dict or list, optional):
|
||||
Parameter scheduler for updating optimizer parameters. If
|
||||
specified, :attr:`optim_wrapper` should also be specified.
|
||||
Defaults to None.
|
||||
See :meth:`build_param_scheduler` for examples.
|
||||
compile (dict, optional): Config to compile model.
|
||||
Defaults to False. Requires PyTorch>=2.0.
|
||||
dispatch_kwargs (dict, optional): Kwargs to be passed to other
|
||||
methods of Strategy. Defaults to None.
|
||||
If ``accumulative_counts`` is set in ``optim_wrapper``, you
|
||||
need to provide ``max_iters`` in ``dispatch_kwargs``.
|
||||
"""
|
||||
if self._prepared:
|
||||
return self._prepared_components()
|
||||
if dispatch_kwargs is not None:
|
||||
self.dispatch_kwargs.update(dispatch_kwargs)
|
||||
|
||||
model = self.build_model(model)
|
||||
model = self._init_model_weights(model)
|
||||
|
||||
# optim_wrapper is required by booster
|
||||
if optim_wrapper is not None and isinstance(optim_wrapper, dict):
|
||||
optim_wrapper.setdefault('type', 'ColossalAIOpitmWrapper')
|
||||
optim_wrapper.setdefault('booster', self.booster)
|
||||
optim_wrapper_type = OPTIM_WRAPPERS.get(optim_wrapper['type'])
|
||||
if optim_wrapper_type is None:
|
||||
raise ValueError(f'Failed to find {optim_wrapper["type"]} in '
|
||||
'`OPTIM_WRAPPERS`.')
|
||||
if 'clip_grad' in optim_wrapper:
|
||||
raise ValueError('`Please configure `clip_grad` in `plugin`')
|
||||
if not issubclass(optim_wrapper_type, ColossalAIOpitmWrapper):
|
||||
raise ValueError(
|
||||
'The type of `optim_wrapper` must be '
|
||||
'`ColossalAIOptimWrapper` (or subclass), but got '
|
||||
f'{optim_wrapper_type}')
|
||||
optim_wrapper = self.build_optim_wrapper(optim_wrapper, model)
|
||||
|
||||
if optim_wrapper is not None:
|
||||
self.model, self.optim_wrapper = self._wrap(
|
||||
model, optim_wrapper) # type: ignore
|
||||
else:
|
||||
self.model = self._wrap(model) # type: ignore
|
||||
# TODO: Check whether `compile` is compatible with colossalai.
|
||||
|
||||
if param_scheduler is not None:
|
||||
self.param_schedulers = self.build_param_scheduler(
|
||||
param_scheduler, optim_wrapper) # type: ignore
|
||||
|
||||
if optim_wrapper is not None:
|
||||
self._scale_lr()
|
||||
accumulative_counts = getattr(self.optim_wrapper,
|
||||
'_accumulative_counts', 1)
|
||||
if accumulative_counts > 1:
|
||||
if 'max_iters' not in self.dispatch_kwargs:
|
||||
raise ValueError(
|
||||
'"max_iters" must be specified because '
|
||||
'"accumulative_counts" was set as '
|
||||
f'{accumulative_counts} which is greater than 1.')
|
||||
|
||||
self.optim_wrapper.initialize_count_status( # type: ignore
|
||||
self.model, 0, self.dispatch_kwargs['max_iters'])
|
||||
self._prepared = True
|
||||
return self._prepared_components()
|
||||
|
||||
def resume(
|
||||
self,
|
||||
filename: str,
|
||||
*,
|
||||
resume_optimizer: bool = True,
|
||||
resume_param_scheduler: bool = True,
|
||||
map_location: Union[str, Callable] = 'default',
|
||||
callback: Optional[Callable] = None,
|
||||
) -> dict:
|
||||
"""override this method since colossalai resume optimizer from filename
|
||||
directly."""
|
||||
self.logger.info(f'Resume checkpoint from {filename}')
|
||||
|
||||
extra_ckpt = self.load_checkpoint(
|
||||
filename, map_location=map_location, callback=callback)
|
||||
|
||||
if resume_optimizer:
|
||||
self.booster.load_optimizer(
|
||||
self.optim_wrapper.optimizer,
|
||||
join_path(filename, self.OPTIMIZER_DIR))
|
||||
|
||||
if resume_param_scheduler:
|
||||
schedulers_dir = join_path(filename, self.SCHEDULER_DIR)
|
||||
for i, scheduler in enumerate(self.param_schedulers):
|
||||
self.booster.load_lr_scheduler(
|
||||
scheduler, f'{schedulers_dir}/scheduler_{i}.pth')
|
||||
|
||||
# resume random seed
|
||||
resumed_seed = extra_ckpt['meta'].get('seed', None)
|
||||
current_seed = self._randomness.get('seed')
|
||||
if resumed_seed is not None and resumed_seed != current_seed:
|
||||
if current_seed is not None:
|
||||
self.logger.warning(f'The value of random seed in the '
|
||||
f'checkpoint "{resumed_seed}" is '
|
||||
f'different from the value in '
|
||||
f'`randomness` config "{current_seed}"')
|
||||
self._randomness.update(seed=resumed_seed)
|
||||
self._set_randomness(**self._randomness)
|
||||
|
||||
# resume iter
|
||||
self.dispatch_kwargs['cur_iter'] = extra_ckpt['meta']['iter']
|
||||
|
||||
return extra_ckpt
|
||||
|
||||
def load_checkpoint(
|
||||
self,
|
||||
filename: str,
|
||||
*,
|
||||
map_location: Union[str, Callable] = 'cpu',
|
||||
strict: bool = False,
|
||||
revise_keys: list = [(r'^module.', '')],
|
||||
callback: Optional[Callable] = None,
|
||||
) -> dict:
|
||||
"""Load checkpoint from given ``filename``.
|
||||
|
||||
Warning:
|
||||
`map_localtion` and `callback` parameters are not supported yet.
|
||||
|
||||
Args:
|
||||
filename (str): Accept local filepath, URL, ``torchvision://xxx``,
|
||||
``open-mmlab://xxx``.
|
||||
"""
|
||||
self.logger.info(f'Load checkpoint from {filename}')
|
||||
self.booster.load_model(self.model.model_wrapper,
|
||||
join_path(filename, self.MODEL_DIR))
|
||||
meta = _load_checkpoint(osp.join(filename, 'meta.pth'))
|
||||
return meta
|
||||
|
||||
def save_checkpoint(
|
||||
self,
|
||||
filename: str,
|
||||
*,
|
||||
save_optimizer: bool = True,
|
||||
save_param_scheduler: bool = True,
|
||||
extra_ckpt: Optional[dict] = None,
|
||||
callback: Optional[Callable] = None,
|
||||
) -> None:
|
||||
# The checkpoint directory will be:
|
||||
# |--epoch_0.pth
|
||||
# |---model/
|
||||
# |---optimizer/
|
||||
# |---scheduler/
|
||||
if extra_ckpt is None:
|
||||
extra_ckpt = dict()
|
||||
if 'meta' not in extra_ckpt:
|
||||
extra_ckpt['meta'] = dict()
|
||||
extra_ckpt['meta'].update(
|
||||
seed=self.seed,
|
||||
time=time.strftime('%Y%m%d_%H%M%S', time.localtime()),
|
||||
mmengine=mmengine.__version__ + get_git_hash())
|
||||
|
||||
model_dir = join_path(filename, self.MODEL_DIR)
|
||||
optimizer_dir = join_path(filename, self.OPTIMIZER_DIR)
|
||||
schedulers_dir = join_path(filename, self.SCHEDULER_DIR)
|
||||
mkdir_or_exist(model_dir)
|
||||
mkdir_or_exist(optimizer_dir)
|
||||
mkdir_or_exist(schedulers_dir)
|
||||
|
||||
self.booster.save_model(
|
||||
self.model.model_wrapper, checkpoint=model_dir, shard=True)
|
||||
|
||||
if save_optimizer:
|
||||
self.booster.save_optimizer(
|
||||
self.optim_wrapper.optimizer,
|
||||
checkpoint=optimizer_dir,
|
||||
shard=True)
|
||||
|
||||
if is_main_process() and save_param_scheduler:
|
||||
for i, scheduler in enumerate(self.param_schedulers):
|
||||
self.booster.save_lr_scheduler(
|
||||
scheduler, f'{schedulers_dir}/scheduler_{i}.pth')
|
||||
|
||||
save_checkpoint(extra_ckpt, join_path(filename, 'meta.pth'))
|
||||
|
||||
def _build_plugin(self, plugin: Union[str, dict]):
|
||||
if isinstance(plugin, str):
|
||||
if plugin == 'gemini':
|
||||
plugin = colo_plugin.GeminiPlugin(
|
||||
precision='bf16', placement_policy='cuda')
|
||||
elif plugin == 'lowlevel-zero':
|
||||
plugin = colo_plugin.LowLevelZeroPlugin()
|
||||
else:
|
||||
raise ValueError('`plugin` must be "gemini" or '
|
||||
'"lowlevel-zero"')
|
||||
elif isinstance(plugin, dict):
|
||||
plugin = PLUGINS.build(plugin)
|
||||
else:
|
||||
raise ValueError('`plugin` must be dict or str, but got a '
|
||||
f'{type(plugin)} object)')
|
||||
return plugin
|
||||
|
||||
def _build_mixed_precision(self, mixed_precision: Union[str, dict]):
|
||||
if isinstance(mixed_precision, str):
|
||||
if mixed_precision == 'fp16':
|
||||
mixed_precision = colo_precision.FP16TorchMixedPrecision()
|
||||
elif mixed_precision == 'fp16_apex':
|
||||
mixed_precision = colo_precision.FP16ApexMixedPrecision()
|
||||
elif mixed_precision == 'bf16':
|
||||
mixed_precision = colo_precision.BF16MixedPrecision()
|
||||
elif mixed_precision == 'fp8':
|
||||
mixed_precision = colo_precision.FP8MixedPrecision()
|
||||
else:
|
||||
raise ValueError(
|
||||
'If `mixed_precision` is a string, it must be one of '
|
||||
'"fp16", "fp16_apex", "bf16" and "fp8", but got '
|
||||
f'{mixed_precision}')
|
||||
elif isinstance(mixed_precision, dict):
|
||||
mixed_precision = MIXED_PRECISIONS.build(mixed_precision)
|
||||
else:
|
||||
raise ValueError('mixed precision should be dict or str, but got '
|
||||
f'a {type(mixed_precision)} object')
|
||||
return mixed_precision
|
||||
|
||||
def _wrap(
|
||||
self,
|
||||
model: nn.Module,
|
||||
optim_wrapper: Optional[OptimWrapper] = None,
|
||||
) -> Union[Tuple[CollosalAIModelWrapper, ColossalAIOpitmWrapper],
|
||||
CollosalAIModelWrapper]: # type: ignore
|
||||
"""Wrap model with :class:`ModelWrapper`."""
|
||||
if self.model_wrapper is None:
|
||||
self.model_wrapper = {'type': 'CollosalAIModelWrapper'}
|
||||
|
||||
# For zero series parallel, move `data_preprocessor` to current device
|
||||
# is reasonable. We need to `BaseDataPreprocessor.to` manually since
|
||||
# framework like colossalai and deepspeed could not handle it, leading
|
||||
# to `data_preprocessor` move data to cpu.
|
||||
for module in model.modules():
|
||||
if isinstance(module, BaseDataPreprocessor):
|
||||
module.to(get_device())
|
||||
|
||||
if optim_wrapper is not None:
|
||||
optimizer = optim_wrapper.optimizer
|
||||
if not hasattr(optimizer, '_hook_for_profile'):
|
||||
# PyTorch 2.0 removes the `_hook_for_profile` in
|
||||
# `torch.optim.Optimizer`. We maintain this function here to
|
||||
# keep compatibility.
|
||||
# TODO: Remove this hardcode when ColossalAI supports
|
||||
# PyTorch 2.0
|
||||
optimizer.__class__._hook_for_profile = object
|
||||
|
||||
# We do not pass `scheduler` and `Dataloader` here for:
|
||||
# 1. `Booster.boost` cannot accept a list of schedulers.
|
||||
# 2. `Strategy` cannot not accept dataloader now.
|
||||
model_wrapper, optimizer, *_ = self.booster.boost(model, optimizer)
|
||||
optim_wrapper.optimizer = optimizer
|
||||
default_args = {'model_wrapper': model_wrapper, 'model': model}
|
||||
model_wrapper = MODEL_WRAPPERS.build(
|
||||
self.model_wrapper, default_args=default_args)
|
||||
return model_wrapper, optim_wrapper # type: ignore
|
||||
else:
|
||||
model_wrapper, *_ = self.booster.boost(model)
|
||||
default_args = {'model_wrapper': model_wrapper, 'model': model}
|
||||
model_wrapper = MODEL_WRAPPERS.build(
|
||||
self.model_wrapper, default_args=default_args)
|
||||
return model_wrapper
|
||||
|
||||
def _setup_distributed( # type: ignore
|
||||
self,
|
||||
launcher: Optional[str] = None,
|
||||
backend: str = 'nccl',
|
||||
**kwargs,
|
||||
):
|
||||
init_dist(
|
||||
launcher, backend, init_backend='colossalai', config=self.config)
|
|
@ -2,7 +2,9 @@
|
|||
import json
|
||||
import os.path as osp
|
||||
import time
|
||||
from typing import Callable, Dict, List, Optional, Union
|
||||
from typing import Any, Callable, Dict, List, Optional, Union
|
||||
|
||||
import torch
|
||||
|
||||
try:
|
||||
import deepspeed
|
||||
|
@ -13,13 +15,197 @@ import torch.nn as nn
|
|||
|
||||
import mmengine
|
||||
from mmengine.dist import init_dist
|
||||
from mmengine.model.wrappers._deepspeed import MMDeepSpeedEngineWrapper
|
||||
from mmengine.optim import BaseOptimWrapper, _ParamScheduler
|
||||
from mmengine.registry import STRATEGIES
|
||||
from mmengine.registry import (MODEL_WRAPPERS, OPTIM_WRAPPERS, OPTIMIZERS,
|
||||
STRATEGIES)
|
||||
from mmengine.utils import get_git_hash
|
||||
from .base import BaseStrategy
|
||||
|
||||
|
||||
def register_deepspeed_optimizers() -> List[str]:
|
||||
"""Register optimizers in ``deepspeed`` to the ``OPTIMIZERS`` registry.
|
||||
|
||||
Returns:
|
||||
List[str]: A list of registered optimizers' name.
|
||||
"""
|
||||
deepspeed_optimizers = []
|
||||
try:
|
||||
import deepspeed # noqa: F401
|
||||
except ImportError:
|
||||
pass
|
||||
else:
|
||||
from deepspeed.ops.adam import DeepSpeedCPUAdam, FusedAdam
|
||||
from deepspeed.ops.lamb import FusedLamb
|
||||
from deepspeed.runtime.fp16.onebit import (OnebitAdam, OnebitLamb,
|
||||
ZeroOneAdam)
|
||||
|
||||
OPTIMIZERS.register_module(module=DeepSpeedCPUAdam)
|
||||
deepspeed_optimizers.append('DeepSpeedCPUAdam')
|
||||
OPTIMIZERS.register_module(module=FusedAdam)
|
||||
deepspeed_optimizers.append('FusedAdam')
|
||||
OPTIMIZERS.register_module(module=FusedLamb)
|
||||
deepspeed_optimizers.append('FusedLamb')
|
||||
OPTIMIZERS.register_module(module=OnebitAdam)
|
||||
deepspeed_optimizers.append('OnebitAdam')
|
||||
OPTIMIZERS.register_module(module=OnebitLamb)
|
||||
deepspeed_optimizers.append('OnebitLamb')
|
||||
OPTIMIZERS.register_module(module=ZeroOneAdam)
|
||||
deepspeed_optimizers.append('ZeroOneAdam')
|
||||
|
||||
return deepspeed_optimizers
|
||||
|
||||
|
||||
@OPTIM_WRAPPERS.register_module()
|
||||
class DeepSpeedOptimWrapper(BaseOptimWrapper):
|
||||
|
||||
def __init__(self, optimizer):
|
||||
super().__init__(optimizer)
|
||||
self._model = None
|
||||
|
||||
@property
|
||||
def model(self):
|
||||
if self._model is None:
|
||||
raise ValueError('model attribute should be set before accessing.')
|
||||
return self._model
|
||||
|
||||
@model.setter
|
||||
def model(self, value):
|
||||
self._model = value
|
||||
|
||||
def update_params(self, loss) -> None: # type: ignore
|
||||
"""Update parameters in :attr:`optimizer`."""
|
||||
self.backward(loss)
|
||||
self.step()
|
||||
|
||||
def backward(self, loss: torch.Tensor, **kwargs) -> None:
|
||||
""""Perform gradient back propagation."""
|
||||
self.model.backward(loss)
|
||||
|
||||
def zero_grad(self, **kwargs) -> None:
|
||||
raise NotImplementedError(
|
||||
'DeepSpeedOptimWrapper does not support zero_grad method '
|
||||
'currently.')
|
||||
|
||||
def step(self, **kwargs):
|
||||
self.model.step()
|
||||
|
||||
def state_dict(self) -> dict:
|
||||
state_dict = {}
|
||||
if self.base_param_settings is not None:
|
||||
state_dict['base_param_settings'] = self.base_param_settings
|
||||
|
||||
return state_dict
|
||||
|
||||
def load_state_dict(self, state_dict: dict) -> None:
|
||||
base_param_settings = state_dict.pop('base_param_settings', None)
|
||||
|
||||
if base_param_settings is not None:
|
||||
self.base_param_settings = base_param_settings
|
||||
|
||||
|
||||
@MODEL_WRAPPERS.register_module()
|
||||
class MMDeepSpeedEngineWrapper:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
model: 'deepspeed.DeepSpeedEngine',
|
||||
inputs_to_half: Optional[List[Union[int, str]]] = None,
|
||||
):
|
||||
self.model = model
|
||||
self._inputs_to_half = inputs_to_half
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.model, name)
|
||||
|
||||
def train_step(
|
||||
self,
|
||||
data: Union[dict, tuple, list],
|
||||
optim_wrapper: DeepSpeedOptimWrapper,
|
||||
) -> Dict[str, torch.Tensor]:
|
||||
data = self.model.module.data_preprocessor(data, training=True)
|
||||
data = self._cast_inputs_half(data)
|
||||
losses = self._run_forward(data, mode='loss')
|
||||
parsed_loss, log_vars = self.model.module.parse_losses(losses)
|
||||
optim_wrapper.update_params(parsed_loss)
|
||||
|
||||
return log_vars
|
||||
|
||||
def val_step(self, data: Union[dict, tuple, list]) -> list:
|
||||
"""Gets the prediction of module during validation process.
|
||||
|
||||
Args:
|
||||
data (dict or tuple or list): Data sampled from dataset.
|
||||
|
||||
Returns:
|
||||
list: The predictions of given data.
|
||||
"""
|
||||
data = self.model.module.data_preprocessor(data, False)
|
||||
data = self._cast_inputs_half(data)
|
||||
return self._run_forward(data, mode='predict')
|
||||
|
||||
def test_step(self, data: Union[dict, tuple, list]) -> list:
|
||||
"""Gets the predictions of module during testing process.
|
||||
|
||||
Args:
|
||||
data (dict or tuple or list): Data sampled from dataset.
|
||||
|
||||
Returns:
|
||||
list: The predictions of given data.
|
||||
"""
|
||||
data = self.model.module.data_preprocessor(data, False)
|
||||
data = self._cast_inputs_half(data)
|
||||
return self._run_forward(data, mode='predict')
|
||||
|
||||
def _run_forward(self, data: Union[dict, tuple, list], mode: str) -> Any:
|
||||
"""Unpacks data for :meth:`forward`
|
||||
|
||||
Args:
|
||||
data (dict or tuple or list): Data sampled from dataset.
|
||||
mode (str): Mode of forward.
|
||||
|
||||
Returns:
|
||||
dict or list: Results of training or testing mode.
|
||||
"""
|
||||
if isinstance(data, dict):
|
||||
results = self.model(**data, mode=mode)
|
||||
elif isinstance(data, (list, tuple)):
|
||||
results = self.model(*data, mode=mode)
|
||||
else:
|
||||
raise TypeError('Output of `data_preprocessor` should be '
|
||||
f'list, tuple or dict, but got {type(data)}')
|
||||
return results
|
||||
|
||||
def _cast_inputs_half(self, inputs: Union[list, tuple, dict, None]):
|
||||
"""Cast inputs to half precision if needed.
|
||||
|
||||
Args:
|
||||
inputs (list or tuple or dict or None): Inputs to be casted.
|
||||
|
||||
Returns:
|
||||
list or tuple or dict or None: Casted inputs.
|
||||
"""
|
||||
if self._inputs_to_half is None:
|
||||
return inputs
|
||||
|
||||
if isinstance(inputs, (list, tuple)):
|
||||
new_inputs = []
|
||||
for i, v in enumerate(inputs):
|
||||
if i in self._inputs_to_half:
|
||||
new_inputs.append(v.half())
|
||||
else:
|
||||
new_inputs.append(v)
|
||||
return inputs.__class__(new_inputs)
|
||||
elif isinstance(inputs, dict):
|
||||
for k, v in inputs.items():
|
||||
if k in self._inputs_to_half:
|
||||
inputs[k] = v.half()
|
||||
return inputs
|
||||
else:
|
||||
raise TypeError('inputs should be list, tuple or dict, '
|
||||
f'but got {type(inputs)}')
|
||||
|
||||
|
||||
@STRATEGIES.register_module()
|
||||
class DeepSpeedStrategy(BaseStrategy):
|
||||
"""Support training models with DeepSpeed.
|
||||
|
@ -113,6 +299,8 @@ class DeepSpeedStrategy(BaseStrategy):
|
|||
self.config['steps_per_print'] = steps_per_print
|
||||
self._inputs_to_half = inputs_to_half
|
||||
|
||||
register_deepspeed_optimizers()
|
||||
|
||||
def _parse_config(self, config):
|
||||
if config is None:
|
||||
config = dict()
|
||||
|
|
|
@ -126,6 +126,9 @@ def _init_dist_pytorch(backend, init_backend='torch', **kwargs) -> None:
|
|||
elif init_backend == 'deepspeed':
|
||||
import deepspeed
|
||||
deepspeed.init_distributed(dist_backend=backend, **kwargs)
|
||||
elif init_backend == 'colossalai':
|
||||
import colossalai
|
||||
colossalai.launch_from_torch(backend=backend, **kwargs)
|
||||
else:
|
||||
raise ValueError(
|
||||
'supported "init_backend" is "torch" or "deepspeed", '
|
||||
|
@ -208,6 +211,14 @@ def _init_dist_slurm(backend,
|
|||
elif init_backend == 'deepspeed':
|
||||
import deepspeed
|
||||
deepspeed.init_distributed(dist_backend=backend, **kwargs)
|
||||
elif init_backend == 'colossalai':
|
||||
import colossalai
|
||||
colossalai.launch_from_slurm(
|
||||
backend=backend,
|
||||
host=os.environ['MASTER_ADDR'],
|
||||
port=os.environ['MASTER_PORT'],
|
||||
**kwargs,
|
||||
)
|
||||
else:
|
||||
raise ValueError('supported "init_backend" is "torch" or "deepspeed", '
|
||||
f'but got {init_backend}')
|
||||
|
|
|
@ -1,115 +0,0 @@
|
|||
# Copyright (c) OpenMMLab. All rights reserved.
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
import torch
|
||||
|
||||
from mmengine.optim.optimizer._deepspeed import DeepSpeedOptimWrapper
|
||||
from mmengine.registry import MODEL_WRAPPERS
|
||||
|
||||
try:
|
||||
from deepspeed.runtime.engine import DeepSpeedEngine
|
||||
except ImportError:
|
||||
DeepSpeedEngine = None
|
||||
|
||||
|
||||
@MODEL_WRAPPERS.register_module()
|
||||
class MMDeepSpeedEngineWrapper:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
model: DeepSpeedEngine,
|
||||
inputs_to_half: Optional[List[Union[int, str]]] = None,
|
||||
):
|
||||
self.model = model
|
||||
self._inputs_to_half = inputs_to_half
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.model, name)
|
||||
|
||||
def train_step(
|
||||
self,
|
||||
data: Union[dict, tuple, list],
|
||||
optim_wrapper: DeepSpeedOptimWrapper,
|
||||
) -> Dict[str, torch.Tensor]:
|
||||
data = self.model.module.data_preprocessor(data, training=True)
|
||||
data = self._cast_inputs_half(data)
|
||||
losses = self._run_forward(data, mode='loss')
|
||||
parsed_loss, log_vars = self.model.module.parse_losses(losses)
|
||||
optim_wrapper.update_params(parsed_loss)
|
||||
|
||||
return log_vars
|
||||
|
||||
def val_step(self, data: Union[dict, tuple, list]) -> list:
|
||||
"""Gets the prediction of module during validation process.
|
||||
|
||||
Args:
|
||||
data (dict or tuple or list): Data sampled from dataset.
|
||||
|
||||
Returns:
|
||||
list: The predictions of given data.
|
||||
"""
|
||||
data = self.model.module.data_preprocessor(data, False)
|
||||
data = self._cast_inputs_half(data)
|
||||
return self._run_forward(data, mode='predict')
|
||||
|
||||
def test_step(self, data: Union[dict, tuple, list]) -> list:
|
||||
"""Gets the predictions of module during testing process.
|
||||
|
||||
Args:
|
||||
data (dict or tuple or list): Data sampled from dataset.
|
||||
|
||||
Returns:
|
||||
list: The predictions of given data.
|
||||
"""
|
||||
data = self.model.module.data_preprocessor(data, False)
|
||||
data = self._cast_inputs_half(data)
|
||||
return self._run_forward(data, mode='predict')
|
||||
|
||||
def _run_forward(self, data: Union[dict, tuple, list], mode: str) -> Any:
|
||||
"""Unpacks data for :meth:`forward`
|
||||
|
||||
Args:
|
||||
data (dict or tuple or list): Data sampled from dataset.
|
||||
mode (str): Mode of forward.
|
||||
|
||||
Returns:
|
||||
dict or list: Results of training or testing mode.
|
||||
"""
|
||||
if isinstance(data, dict):
|
||||
results = self.model(**data, mode=mode)
|
||||
elif isinstance(data, (list, tuple)):
|
||||
results = self.model(*data, mode=mode)
|
||||
else:
|
||||
raise TypeError('Output of `data_preprocessor` should be '
|
||||
f'list, tuple or dict, but got {type(data)}')
|
||||
return results
|
||||
|
||||
def _cast_inputs_half(self, inputs: Union[list, tuple, dict, None]):
|
||||
"""Cast inputs to half precision if needed.
|
||||
|
||||
Args:
|
||||
inputs (list or tuple or dict or None): Inputs to be casted.
|
||||
|
||||
Returns:
|
||||
list or tuple or dict or None: Casted inputs.
|
||||
"""
|
||||
if self._inputs_to_half is None:
|
||||
return inputs
|
||||
|
||||
if isinstance(inputs, (list, tuple)):
|
||||
new_inputs = []
|
||||
for i, v in enumerate(inputs):
|
||||
if i in self._inputs_to_half:
|
||||
new_inputs.append(v.half())
|
||||
else:
|
||||
new_inputs.append(v)
|
||||
return inputs.__class__(new_inputs)
|
||||
elif isinstance(inputs, dict):
|
||||
for k, v in inputs.items():
|
||||
if k in self._inputs_to_half:
|
||||
inputs[k] = v.half()
|
||||
return inputs
|
||||
else:
|
||||
raise TypeError('inputs should be list, tuple or dict, '
|
||||
f'but got {type(inputs)}')
|
|
@ -1,9 +1,9 @@
|
|||
# Copyright (c) OpenMMLab. All rights reserved.
|
||||
from .optimizer import (OPTIM_WRAPPER_CONSTRUCTORS, OPTIMIZERS,
|
||||
AmpOptimWrapper, ApexOptimWrapper, BaseOptimWrapper,
|
||||
DeepSpeedOptimWrapper, DefaultOptimWrapperConstructor,
|
||||
OptimWrapper, OptimWrapperDict,
|
||||
ZeroRedundancyOptimizer, build_optim_wrapper)
|
||||
DefaultOptimWrapperConstructor, OptimWrapper,
|
||||
OptimWrapperDict, ZeroRedundancyOptimizer,
|
||||
build_optim_wrapper)
|
||||
# yapf: disable
|
||||
from .scheduler import (ConstantLR, ConstantMomentum, ConstantParamScheduler,
|
||||
CosineAnnealingLR, CosineAnnealingMomentum,
|
||||
|
@ -31,5 +31,5 @@ __all__ = [
|
|||
'OptimWrapperDict', 'OneCycleParamScheduler', 'OneCycleLR', 'PolyLR',
|
||||
'PolyMomentum', 'PolyParamScheduler', 'ReduceOnPlateauLR',
|
||||
'ReduceOnPlateauMomentum', 'ReduceOnPlateauParamScheduler',
|
||||
'ZeroRedundancyOptimizer', 'BaseOptimWrapper', 'DeepSpeedOptimWrapper'
|
||||
'ZeroRedundancyOptimizer', 'BaseOptimWrapper'
|
||||
]
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
# Copyright (c) OpenMMLab. All rights reserved.
|
||||
from ._deepspeed import DeepSpeedOptimWrapper
|
||||
from .amp_optimizer_wrapper import AmpOptimWrapper
|
||||
from .apex_optimizer_wrapper import ApexOptimWrapper
|
||||
from .base import BaseOptimWrapper
|
||||
|
@ -14,5 +13,5 @@ __all__ = [
|
|||
'OPTIM_WRAPPER_CONSTRUCTORS', 'OPTIMIZERS',
|
||||
'DefaultOptimWrapperConstructor', 'build_optim_wrapper', 'OptimWrapper',
|
||||
'AmpOptimWrapper', 'ApexOptimWrapper', 'OptimWrapperDict',
|
||||
'ZeroRedundancyOptimizer', 'BaseOptimWrapper', 'DeepSpeedOptimWrapper'
|
||||
'ZeroRedundancyOptimizer', 'BaseOptimWrapper'
|
||||
]
|
||||
|
|
|
@ -1,54 +0,0 @@
|
|||
# Copyright (c) OpenMMLab. All rights reserved.
|
||||
|
||||
import torch
|
||||
|
||||
from mmengine.registry import OPTIM_WRAPPERS
|
||||
from .base import BaseOptimWrapper
|
||||
|
||||
|
||||
@OPTIM_WRAPPERS.register_module()
|
||||
class DeepSpeedOptimWrapper(BaseOptimWrapper):
|
||||
|
||||
def __init__(self, optimizer):
|
||||
super().__init__(optimizer)
|
||||
self._model = None
|
||||
|
||||
@property
|
||||
def model(self):
|
||||
if self._model is None:
|
||||
raise ValueError('model attribute should be set before accessing.')
|
||||
return self._model
|
||||
|
||||
@model.setter
|
||||
def model(self, value):
|
||||
self._model = value
|
||||
|
||||
def update_params(self, loss) -> None: # type: ignore
|
||||
"""Update parameters in :attr:`optimizer`."""
|
||||
self.backward(loss)
|
||||
self.step()
|
||||
|
||||
def backward(self, loss: torch.Tensor, **kwargs) -> None:
|
||||
""""Perform gradient back propagation."""
|
||||
self.model.backward(loss)
|
||||
|
||||
def zero_grad(self, **kwargs) -> None:
|
||||
raise NotImplementedError(
|
||||
'DeepSpeedOptimWrapper does not support zero_grad method '
|
||||
'currently.')
|
||||
|
||||
def step(self, **kwargs):
|
||||
self.model.step()
|
||||
|
||||
def state_dict(self) -> dict:
|
||||
state_dict = {}
|
||||
if self.base_param_settings is not None:
|
||||
state_dict['base_param_settings'] = self.base_param_settings
|
||||
|
||||
return state_dict
|
||||
|
||||
def load_state_dict(self, state_dict: dict) -> None:
|
||||
base_param_settings = state_dict.pop('base_param_settings', None)
|
||||
|
||||
if base_param_settings is not None:
|
||||
self.base_param_settings = base_param_settings
|
|
@ -129,42 +129,6 @@ def register_sophia_optimizers() -> List[str]:
|
|||
SOPHIA_OPTIMIZERS = register_sophia_optimizers()
|
||||
|
||||
|
||||
def register_deepspeed_optimizers() -> List[str]:
|
||||
"""Register optimizers in ``deepspeed`` to the ``OPTIMIZERS`` registry.
|
||||
|
||||
Returns:
|
||||
List[str]: A list of registered optimizers' name.
|
||||
"""
|
||||
deepspeed_optimizers = []
|
||||
try:
|
||||
import deepspeed # noqa: F401
|
||||
except ImportError:
|
||||
pass
|
||||
else:
|
||||
from deepspeed.ops.adam import DeepSpeedCPUAdam, FusedAdam
|
||||
from deepspeed.ops.lamb import FusedLamb
|
||||
from deepspeed.runtime.fp16.onebit import (OnebitAdam, OnebitLamb,
|
||||
ZeroOneAdam)
|
||||
|
||||
OPTIMIZERS.register_module(module=DeepSpeedCPUAdam)
|
||||
deepspeed_optimizers.append('DeepSpeedCPUAdam')
|
||||
OPTIMIZERS.register_module(module=FusedAdam)
|
||||
deepspeed_optimizers.append('FusedAdam')
|
||||
OPTIMIZERS.register_module(module=FusedLamb)
|
||||
deepspeed_optimizers.append('FusedLamb')
|
||||
OPTIMIZERS.register_module(module=OnebitAdam)
|
||||
deepspeed_optimizers.append('OnebitAdam')
|
||||
OPTIMIZERS.register_module(module=OnebitLamb)
|
||||
deepspeed_optimizers.append('OnebitLamb')
|
||||
OPTIMIZERS.register_module(module=ZeroOneAdam)
|
||||
deepspeed_optimizers.append('ZeroOneAdam')
|
||||
|
||||
return deepspeed_optimizers
|
||||
|
||||
|
||||
DEEPSPEED_OPTIMIZERS = register_deepspeed_optimizers()
|
||||
|
||||
|
||||
def build_optim_wrapper(model: nn.Module,
|
||||
cfg: Union[dict, Config, ConfigDict]) -> OptimWrapper:
|
||||
"""Build function of OptimWrapper.
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
# Copyright (c) OpenMMLab. All rights reserved.
|
||||
import inspect
|
||||
import logging
|
||||
from typing import List, Optional, Union
|
||||
|
||||
|
@ -293,15 +294,23 @@ class DefaultOptimWrapperConstructor:
|
|||
optim_wrapper_cfg = self.optim_wrapper_cfg.copy()
|
||||
optim_wrapper_cfg.setdefault('type', 'OptimWrapper')
|
||||
optimizer_cfg = self.optimizer_cfg.copy()
|
||||
optimizer_cls = self.optimizer_cfg['type']
|
||||
# Optimizer like HybridAdam in colossalai requires the argument name
|
||||
# `model_params` rather than `params`. Here we get the first argument
|
||||
# name and fill it with the model parameters.
|
||||
if isinstance(optimizer_cls, str):
|
||||
optimizer_cls = OPTIMIZERS.get(self.optimizer_cfg['type'])
|
||||
fisrt_arg_name = next(
|
||||
iter(inspect.signature(optimizer_cls).parameters))
|
||||
# if no paramwise option is specified, just use the global setting
|
||||
if not self.paramwise_cfg:
|
||||
optimizer_cfg['params'] = model.parameters()
|
||||
optimizer_cfg[fisrt_arg_name] = model.parameters()
|
||||
optimizer = OPTIMIZERS.build(optimizer_cfg)
|
||||
else:
|
||||
# set param-wise lr and weight decay recursively
|
||||
params: List = []
|
||||
self.add_params(params, model)
|
||||
optimizer_cfg['params'] = params
|
||||
optimizer_cfg[fisrt_arg_name] = params
|
||||
optimizer = OPTIMIZERS.build(optimizer_cfg)
|
||||
optim_wrapper = OPTIM_WRAPPERS.build(
|
||||
optim_wrapper_cfg, default_args=dict(optimizer=optimizer))
|
||||
|
|
|
@ -122,10 +122,6 @@ class OptimWrapper(BaseOptimWrapper):
|
|||
assert accumulative_counts > 0, (
|
||||
'_accumulative_counts at least greater than or equal to 1')
|
||||
self._accumulative_counts = accumulative_counts
|
||||
|
||||
assert isinstance(optimizer, Optimizer), (
|
||||
'optimizer must be a `torch.optim.Optimizer` instance, but got '
|
||||
f'{type(optimizer)}')
|
||||
self.optimizer = optimizer
|
||||
|
||||
if clip_grad is not None:
|
||||
|
|
|
@ -1268,7 +1268,7 @@ class FlexibleRunner:
|
|||
try:
|
||||
getattr(hook, fn_name)(self, **kwargs)
|
||||
except TypeError as e:
|
||||
raise TypeError(f'{e} in {hook}') from None
|
||||
raise TypeError(f'{e} in {hook}') from e
|
||||
|
||||
def register_hook(
|
||||
self,
|
||||
|
|
Loading…
Reference in New Issue