# PaddleClas/ppcls/data/dataloader/dali.py
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
import copy
import os
from collections import defaultdict
from typing import Any, Callable, Dict, List, Tuple, Union, Optional
import numpy as np
import nvidia.dali.fn as fn
import nvidia.dali.ops as ops
import nvidia.dali.pipeline as pipeline
import nvidia.dali.types as types
import paddle
from nvidia.dali.plugin.paddle import DALIGenericIterator
from nvidia.dali.plugin.base_iterator import LastBatchPolicy
from ppcls.data.preprocess.ops.dali_operators import ColorJitter
from ppcls.data.preprocess.ops.dali_operators import CropImage
from ppcls.data.preprocess.ops.dali_operators import CropMirrorNormalize
from ppcls.data.preprocess.ops.dali_operators import DecodeImage
from ppcls.data.preprocess.ops.dali_operators import DecodeRandomResizedCrop
from ppcls.data.preprocess.ops.dali_operators import NormalizeImage
from ppcls.data.preprocess.ops.dali_operators import Pad
from ppcls.data.preprocess.ops.dali_operators import RandCropImage
from ppcls.data.preprocess.ops.dali_operators import RandCropImageV2
from ppcls.data.preprocess.ops.dali_operators import RandFlipImage
from ppcls.data.preprocess.ops.dali_operators import RandomCropImage
from ppcls.data.preprocess.ops.dali_operators import RandomRot90
from ppcls.data.preprocess.ops.dali_operators import RandomRotation
from ppcls.data.preprocess.ops.dali_operators import ResizeImage
from ppcls.data.preprocess.ops.dali_operators import ToCHWImage
from ppcls.engine.train.utils import type_name
from ppcls.utils import logger

INTERP_MAP = {
"nearest": types.DALIInterpType.INTERP_NN, # cv2.INTER_NEAREST
"bilinear": types.DALIInterpType.INTERP_LINEAR, # cv2.INTER_LINEAR
"bicubic": types.DALIInterpType.INTERP_CUBIC, # cv2.INTER_CUBIC
"lanczos": types.DALIInterpType.INTERP_LANCZOS3, # cv2.INTER_LANCZOS4
}


def make_pair(x: Union[Any, Tuple[Any], List[Any]]) -> Tuple[Any]:
    """Repeat x into a pair if it is a single element, else return x directly.

    Args:
        x (Union[Any, Tuple[Any], List[Any]]): input x

    Returns:
        Tuple[Any]: tupled input
    """
return x if isinstance(x, (tuple, list)) else (x, x)
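
# Usage sketch for make_pair (values are illustrative, not from a real config):
#   make_pair(224)        -> (224, 224)
#   make_pair((256, 192)) -> (256, 192)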


def parse_value_with_key(content: Union[Dict, List[Dict]],
                         key: str) -> Union[None, Any]:
    """Recursively search nested dict/list content for `key`; return None if not found.

    Args:
        content (Union[Dict, List[Dict]]): content to be parsed
        key (str): given key

    Returns:
        Union[None, Any]: the first value found for `key`, or None
    """
if isinstance(content, dict):
if key in content:
return content[key]
for content_ in content.values():
value = parse_value_with_key(content_, key)
if value is not None:
return value
elif isinstance(content, (tuple, list)):
for content_ in content:
value = parse_value_with_key(content_, key)
if value is not None:
return value
return None
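
# Usage sketch for parse_value_with_key (hypothetical transform config): the
# search is depth-first and the first match wins, so for
#   cfg = [{"ResizeImage": {"size": 256}}, {"CropImage": {"size": 224}}]
# parse_value_with_key(cfg, "size") returns 256.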


def convert_cfg_to_dali(op_name: str, device: str, **op_cfg) -> Dict[str, Any]:
    """Convert original preprocess op params into DALI-based op params.

    Args:
        op_name (str): name of the operator
        device (str): device the operator runs on, "cpu" or "gpu"

    Returns:
        Dict[str, Any]: converted arguments for DALI operator initialization
    """
    assert device in ["cpu", "gpu"], \
        f"device({device}) must be in [\"cpu\", \"gpu\"]"
dali_op_cfg = {}
if op_name == "DecodeImage":
device = "cpu" if device == "cpu" else "mixed"
to_rgb = op_cfg.get("to_rgb", True)
channel_first = op_cfg.get("channel_first", False)
assert channel_first is False, \
f"`channel_first` must set to False when using DALI, but got {channel_first}"
dali_op_cfg.update({"device": device})
dali_op_cfg.update({
"output_type": types.DALIImageType.RGB
if to_rgb else types.DALIImageType.BGR
})
dali_op_cfg.update({
"device_memory_padding":
op_cfg.get("device_memory_padding", 211025920)
})
dali_op_cfg.update({
"host_memory_padding": op_cfg.get("host_memory_padding", 140544512)
})
elif op_name == "ResizeImage":
size = op_cfg.get("size", None)
resize_short = op_cfg.get("resize_short", None)
interpolation = op_cfg.get("interpolation", None)
if size is not None:
size = make_pair(size)
dali_op_cfg.update({"resize_y": size[0], "resize_x": size[1]})
if resize_short is not None:
dali_op_cfg.update({"resize_shorter": resize_short})
if interpolation is not None:
dali_op_cfg.update({"interp_type": INTERP_MAP[interpolation]})
elif op_name == "CropImage":
size = op_cfg.get("size", 224)
size = make_pair(size)
dali_op_cfg.update({"crop_h": size[1], "crop_w": size[0]})
dali_op_cfg.update({"crop_pos_x": 0.5, "crop_pos_y": 0.5})
elif op_name == "RandomCropImage":
size = op_cfg.get("size", 224)
if size is not None:
size = make_pair(size)
dali_op_cfg.update({"crop_h": size[1], "crop_w": size[0]})
elif op_name == "RandCropImage":
size = op_cfg.get("size", 224)
size = make_pair(size)
scale = op_cfg.get("scale", [0.08, 1.0])
ratio = op_cfg.get("ratio", [3.0 / 4, 4.0 / 3])
interpolation = op_cfg.get("interpolation", "bilinear")
dali_op_cfg.update({"size": size})
if scale is not None:
dali_op_cfg.update({"random_area": scale})
if ratio is not None:
dali_op_cfg.update({"random_aspect_ratio": ratio})
if interpolation is not None:
dali_op_cfg.update({"interp_type": INTERP_MAP[interpolation]})
elif op_name == "RandCropImageV2":
size = op_cfg.get("size", 224)
size = make_pair(size)
dali_op_cfg.update({"crop_h": size[1], "crop_w": size[0]})
elif op_name == "RandFlipImage":
prob = op_cfg.get("prob", 0.5)
flip_code = op_cfg.get("flip_code", 1)
dali_op_cfg.update({"prob": prob})
dali_op_cfg.update({"flip_code": flip_code})
elif op_name == "NormalizeImage":
# scale * (in - mean) / stddev + shift
        scale = op_cfg.get("scale", 1.0 / 255.0)
        # yaml configs may give scale as a string expression such as "1.0/255.0"
        if isinstance(scale, str):
            scale = eval(scale)
        mean = op_cfg.get("mean", [0.485, 0.456, 0.406])
        std = op_cfg.get("std", [0.229, 0.224, 0.225])
        # fold scale into mean/std: (scale*in - mean)/std == (in - mean/scale)/(std/scale)
        mean = [v / scale for v in mean]
        std = [v / scale for v in std]
order = op_cfg.get("order", "chw")
channel_num = op_cfg.get("channel_num", 3)
output_fp16 = op_cfg.get("output_fp16", False)
dali_op_cfg.update({
"mean": np.reshape(
np.array(
mean, dtype="float32"), [channel_num, 1, 1]
if order == "chw" else [1, 1, channel_num])
})
dali_op_cfg.update({
"stddev": np.reshape(
np.array(
std, dtype="float32"), [channel_num, 1, 1]
if order == "chw" else [1, 1, channel_num])
})
if output_fp16:
dali_op_cfg.update({"dtype": types.FLOAT16})
elif op_name == "ToCHWImage":
dali_op_cfg.update({"perm": [2, 0, 1]})
elif op_name == "ColorJitter":
prob = op_cfg.get("prob", 1.0)
brightness = op_cfg.get("brightness", 0.0)
contrast = op_cfg.get("contrast", 0.0)
saturation = op_cfg.get("saturation", 0.0)
hue = op_cfg.get("hue", 0.0)
dali_op_cfg.update({"prob": prob})
dali_op_cfg.update({"brightness_factor": brightness})
dali_op_cfg.update({"contrast_factor": contrast})
dali_op_cfg.update({"saturation_factor": saturation})
dali_op_cfg.update({"hue_factor": hue})
elif op_name == "RandomRotation":
prob = op_cfg.get("prob", 0.5)
degrees = op_cfg.get("degrees", 90)
interpolation = op_cfg.get("interpolation", "bilinear")
dali_op_cfg.update({"prob": prob})
dali_op_cfg.update({"angle": degrees})
dali_op_cfg.update({"interp_type": INTERP_MAP[interpolation]})
elif op_name == "Pad":
size = op_cfg.get("size", 224)
size = make_pair(size)
padding = op_cfg.get("padding", 0)
fill = op_cfg.get("fill", 0)
dali_op_cfg.update({
"crop_h": padding + size[1] + padding,
"crop_w": padding + size[0] + padding
})
dali_op_cfg.update({"fill_values": fill})
dali_op_cfg.update({"out_of_bounds_policy": "pad"})
elif op_name == "RandomRot90":
interpolation = op_cfg.get("interpolation", "nearest")
elif op_name == "DecodeRandomResizedCrop":
device = "cpu" if device == "cpu" else "mixed"
output_type = op_cfg.get("output_type", types.DALIImageType.RGB)
device_memory_padding = op_cfg.get("device_memory_padding", 211025920)
host_memory_padding = op_cfg.get("host_memory_padding", 140544512)
scale = op_cfg.get("scale", [0.08, 1.0])
ratio = op_cfg.get("ratio", [3.0 / 4, 4.0 / 3])
num_attempts = op_cfg.get("num_attempts", 100)
size = op_cfg.get("size", 224)
dali_op_cfg.update({"device": device})
if output_type is not None:
dali_op_cfg.update({"output_type": output_type})
if device_memory_padding is not None:
dali_op_cfg.update({
"device_memory_padding": device_memory_padding
})
if host_memory_padding is not None:
dali_op_cfg.update({"host_memory_padding": host_memory_padding})
if scale is not None:
dali_op_cfg.update({"random_area": scale})
if ratio is not None:
dali_op_cfg.update({"random_aspect_ratio": ratio})
if num_attempts is not None:
dali_op_cfg.update({"num_attempts": num_attempts})
if size is not None:
dali_op_cfg.update({"resize_x": size, "resize_y": size})
elif op_name == "CropMirrorNormalize":
dtype = types.FLOAT16 if op_cfg.get("output_fp16",
False) else types.FLOAT
output_layout = op_cfg.get("output_layout", "CHW")
size = op_cfg.get("size", None)
scale = op_cfg.get("scale", 1 / 255.0)
if isinstance(scale, str):
scale = eval(scale)
mean = op_cfg.get("mean", [0.485, 0.456, 0.406])
mean = [v / scale for v in mean]
std = op_cfg.get("std", [0.229, 0.224, 0.225])
std = [v / scale for v in std]
pad_output = op_cfg.get("channel_num", 3) == 4
prob = op_cfg.get("prob", 0.5)
dali_op_cfg.update({"dtype": dtype})
if output_layout is not None:
dali_op_cfg.update({"output_layout": output_layout})
if size is not None:
dali_op_cfg.update({"crop": (size, size)})
if mean is not None:
dali_op_cfg.update({"mean": mean})
if std is not None:
dali_op_cfg.update({"std": std})
if pad_output is not None:
dali_op_cfg.update({"pad_output": pad_output})
if prob is not None:
dali_op_cfg.update({"prob": prob})
else:
raise ValueError(
f"DALI operator \"{op_name}\" in PaddleClas is not implemented now. please refer to docs/zh_CN/training/config_description/develop_with_DALI.md"
)
if "device" not in dali_op_cfg:
dali_op_cfg.update({"device": device})
return dali_op_cfg
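
# Conversion sketch (hypothetical values): a ResizeImage entry from a yaml
# transform list maps onto DALI resize arguments, e.g.
#   convert_cfg_to_dali("ResizeImage", "gpu", resize_short=256,
#                       interpolation="bilinear")
# returns {"resize_shorter": 256,
#          "interp_type": types.DALIInterpType.INTERP_LINEAR,
#          "device": "gpu"}.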


def build_dali_transforms(op_cfg_list: List[Dict[str, Any]],
                          mode: str,
                          device: str="gpu",
                          enable_fuse: bool=True) -> List[Callable]:
    """Create DALI operators based on the config.

    Args:
        op_cfg_list (List[Dict[str, Any]]): a list of single-key dicts, each describing one operator, such as the config below
            --------------------------------
            - DecodeImage:
                to_rgb: True
                channel_first: False
            - ResizeImage:
                size: 224
            - NormalizeImage:
                scale: 0.00392157
                mean: [0.485, 0.456, 0.406]
                std: [0.229, 0.224, 0.225]
                order: ""
            --------------------------------
        mode (str): running mode tag, only used in log messages.
        device (str): device the DALI operator(s) run on. Defaults to "gpu".
        enable_fuse (bool): whether to replace adjacent operators with fused DALI operators, such as DecodeRandomResizedCrop. Defaults to True.

    Returns:
        List[Callable]: callable DALI operators in a list.
    """
assert isinstance(op_cfg_list, list), "operator config should be a list"
# build dali transforms list
dali_op_list = []
idx = 0
num_cfg_node = len(op_cfg_list)
while idx < num_cfg_node:
op_cfg = op_cfg_list[idx]
op_name = list(op_cfg)[0]
op_param = {} if op_cfg[op_name] is None else copy.deepcopy(op_cfg[
op_name])
fused_success = False
if enable_fuse:
# fuse operators if enabled
if idx + 1 < num_cfg_node:
op_name_nxt = list(op_cfg_list[idx + 1])[0]
if (op_name == "DecodeImage" and
op_name_nxt == "RandCropImage"):
fused_op_name = "DecodeRandomResizedCrop"
fused_op_param = convert_cfg_to_dali(
fused_op_name, device, **{
** op_param, ** (op_cfg_list[idx + 1][op_name_nxt])
})
fused_dali_op = eval(fused_op_name)(**fused_op_param)
idx += 2
dali_op_list.append(fused_dali_op)
fused_success = True
logger.info(
f"DALI fused Operator conversion({mode}): [DecodeImage, RandCropImage] -> {type_name(dali_op_list[-1])}: {fused_op_param}"
)
if not fused_success and 0 < idx and idx + 1 < num_cfg_node:
op_name_pre = list(op_cfg_list[idx - 1])[0]
op_name_nxt = list(op_cfg_list[idx + 1])[0]
if (op_name_pre == "RandCropImage" and
op_name == "RandFlipImage" and
op_name_nxt == "NormalizeImage"):
fused_op_name = "CropMirrorNormalize"
fused_op_param = convert_cfg_to_dali(
fused_op_name, device, **{
** op_param, **
(op_cfg_list[idx - 1][op_name_pre]), **
(op_cfg_list[idx + 1][op_name_nxt])
})
fused_dali_op = eval(fused_op_name)(**fused_op_param)
idx += 2
dali_op_list.append(fused_dali_op)
fused_success = True
logger.info(
f"DALI fused Operator conversion({mode}): [RandCropImage, RandFlipImage, NormalizeImage] -> {type_name(dali_op_list[-1])}: {fused_op_param}"
)
if not fused_success and idx + 1 < num_cfg_node:
op_name_nxt = list(op_cfg_list[idx + 1])[0]
if (op_name == "CropImage" and
op_name_nxt == "NormalizeImage"):
fused_op_name = "CropMirrorNormalize"
fused_op_param = convert_cfg_to_dali(
fused_op_name, device, **{
**
op_param,
**
(op_cfg_list[idx + 1][op_name_nxt]),
"prob": 0.0
})
fused_dali_op = eval(fused_op_name)(**fused_op_param)
idx += 2
dali_op_list.append(fused_dali_op)
fused_success = True
logger.info(
f"DALI fused Operator conversion({mode}): [CropImage, NormalizeImage] -> {type_name(dali_op_list[-1])}: {fused_op_param}"
)
if not enable_fuse or not fused_success:
            assert isinstance(op_cfg, dict) and len(op_cfg) == 1, \
                "yaml format error: each transform entry must be a single-key dict"
if op_name == "Pad":
# NOTE: Argument `size` must be provided for DALI operator
op_param.update({
"size": parse_value_with_key(op_cfg_list[:idx], "size")
})
dali_param = convert_cfg_to_dali(op_name, device, **op_param)
dali_op = eval(op_name)(**dali_param)
dali_op_list.append(dali_op)
idx += 1
logger.info(
f"DALI Operator conversion({mode}): {op_name} -> {type_name(dali_op_list[-1])}: {dali_param}"
)
return dali_op_list
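
# Fusion sketch (hypothetical yaml): with enable_fuse=True the common training
# chain [DecodeImage, RandCropImage, RandFlipImage, NormalizeImage] collapses
# into [DecodeRandomResizedCrop, CropMirrorNormalize]; with enable_fuse=False
# each entry is converted one-to-one into its DALI counterpart.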


class ExternalSource_RandomIdentity(object):
    """PKSampler implemented as a DALI ExternalSource.

    Args:
        batch_size (int): batch size
        sample_per_id (int): number of instances per class
        device_id (int): device id
        shard_id (int): shard id
        num_gpus (int): number of gpus
        image_root (str): image root directory
        cls_label_path (str): path to annotation file, such as `train_list.txt` or `val_list.txt`
        delimiter (Optional[str], optional): delimiter. Defaults to None.
        relabel (bool, optional): whether to relabel when original labels do not start from 0 or are discontinuous. Defaults to False.
        sample_method (str, optional): sample method when generating prob_list. Defaults to "sample_avg_prob".
        id_list (List[int], optional): list of (start_id, end_id, start_id, end_id, ...) ranges of ids to be reweighted. Defaults to None.
        ratio (List[Union[int, float]], optional): list of (ratio1, ratio2, ...), the sampling weight for each id range in id_list. Defaults to None.
        shuffle (bool): whether to shuffle the label list. Defaults to True.
    """

def __init__(self,
batch_size: int,
sample_per_id: int,
device_id: int,
shard_id: int,
num_gpus: int,
image_root: str,
cls_label_path: str,
delimiter: Optional[str]=None,
relabel: bool=False,
sample_method: str="sample_avg_prob",
id_list: List[int]=None,
ratio: List[Union[int, float]]=None,
shuffle: bool=True):
self.batch_size = batch_size
self.sample_per_id = sample_per_id
self.label_per_batch = self.batch_size // self.sample_per_id
self.device_id = device_id
self.shard_id = shard_id
self.num_gpus = num_gpus
self._img_root = image_root
self._cls_path = cls_label_path
self.delimiter = delimiter if delimiter is not None else " "
self.relabel = relabel
self.sample_method = sample_method
self.image_paths = []
self.labels = []
self.epoch = 0
# NOTE: code from ImageNetDataset below
with open(self._cls_path, "r") as fd:
lines = fd.readlines()
if self.relabel:
label_set = set()
for line in lines:
line = line.strip().split(self.delimiter)
label_set.add(np.int64(line[1]))
label_map = {
oldlabel: newlabel
for newlabel, oldlabel in enumerate(label_set)
}
for line in lines:
line = line.strip().split(self.delimiter)
self.image_paths.append(os.path.join(self._img_root, line[0]))
if self.relabel:
self.labels.append(label_map[np.int64(line[1])])
else:
self.labels.append(np.int64(line[1]))
assert os.path.exists(self.image_paths[
-1]), f"path {self.image_paths[-1]} does not exist."
# NOTE: code from PKSampler below
# group sample indexes into their label bucket
self.label_dict = defaultdict(list)
for idx, label in enumerate(self.labels):
self.label_dict[label].append(idx)
# get all label
self.label_list = list(self.label_dict)
        assert len(self.label_list) * self.sample_per_id >= self.batch_size, \
            f"batch size({self.batch_size}) should not be bigger than #classes({len(self.label_list)})*sample_per_id({self.sample_per_id})"
if self.sample_method == "id_avg_prob":
self.prob_list = np.array([1 / len(self.label_list)] *
len(self.label_list))
elif self.sample_method == "sample_avg_prob":
counter = []
for label_i in self.label_list:
counter.append(len(self.label_dict[label_i]))
self.prob_list = np.array(counter) / sum(counter)
# reweight prob_list according to id_list and ratio if provided
        if id_list and ratio:
            assert len(id_list) % 2 == 0 and len(id_list) == len(ratio) * 2, \
                "id_list must consist of (start_id, end_id) pairs and be twice as long as ratio"
for i in range(len(self.prob_list)):
for j in range(len(ratio)):
if i >= id_list[j * 2] and i <= id_list[j * 2 + 1]:
self.prob_list[i] = self.prob_list[i] * ratio[j]
break
self.prob_list = self.prob_list / sum(self.prob_list)
assert os.path.exists(
self._cls_path), f"path {self._cls_path} does not exist."
assert os.path.exists(
self._img_root), f"path {self._img_root} does not exist."
        diff = np.abs(sum(self.prob_list) - 1)
        if diff > 1e-8:
            self.prob_list[-1] = 1 - sum(self.prob_list[:-1])
            if self.prob_list[-1] > 1 or self.prob_list[-1] < 0:
                logger.error("PKSampler prob list error")
            else:
                logger.info(
                    f"sum of prob_list is not equal to 1, diff is {diff}, adjusted the last prob"
                )
        # whole dataset size
        self.data_set_len = len(self.image_paths)
        # sharded dataset size per GPU
        self.sharded_data_set_len = self.data_set_len // self.num_gpus
        # iteration bookkeeping
        self.shuffle = shuffle
        self.total_iter = self.sharded_data_set_len // batch_size
        self.iter_count = 0

    def __iter__(self):
        if self.shuffle:
            seed = self.shard_id * 12345 + self.epoch
            # seeding two RandomState objects identically produces the same
            # permutation, so label_list and prob_list stay aligned
            np.random.RandomState(seed).shuffle(self.label_list)
            np.random.RandomState(seed).shuffle(self.prob_list)
            self.epoch += 1
        return self

    def __next__(self):
        if self.iter_count >= self.total_iter:
            self.__iter__()
            self.iter_count = 0
        batch_indexes = []
        # resample until a full batch of exactly batch_size indexes is collected
        for _ in range(self.sharded_data_set_len):
batch_label_list = np.random.choice(
self.label_list,
size=self.label_per_batch,
replace=False,
p=self.prob_list)
for label_i in batch_label_list:
label_i_indexes = self.label_dict[label_i]
if self.sample_per_id <= len(label_i_indexes):
batch_indexes.extend(
np.random.choice(
label_i_indexes,
size=self.sample_per_id,
replace=False))
else:
batch_indexes.extend(
np.random.choice(
label_i_indexes,
size=self.sample_per_id,
replace=True))
if len(batch_indexes) == self.batch_size:
break
batch_indexes = []
batch_raw_images = []
batch_labels = []
for index in batch_indexes:
batch_raw_images.append(
np.fromfile(
self.image_paths[index], dtype="uint8"))
batch_labels.append(self.labels[index])
self.iter_count += 1
return (batch_raw_images, np.array(batch_labels, dtype="int64"))

    def __len__(self):
return self.sharded_data_set_len
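
# Usage sketch (hypothetical paths and sizes): each __next__ call yields a
# PK-sampled batch of (raw image bytes, labels) suitable for fn.external_source.
# For example,
#   ext_src = ExternalSource_RandomIdentity(
#       batch_size=64, sample_per_id=4, device_id=0, shard_id=0, num_gpus=1,
#       image_root="./dataset/market1501/",
#       cls_label_path="./dataset/market1501/train_list.txt")
# would emit batches containing 16 identities with 4 samples each.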


class HybridPipeline(pipeline.Pipeline):
    """Hybrid DALI pipeline that reads, decodes and transforms images.

    Args:
        device (str): device
        batch_size (int): batch size
        py_num_workers (int): number of python worker(s)
        num_threads (int): number of thread(s)
        device_id (int): device id
        seed (int): random seed
        file_root (str): file root path
        file_list (str): path to annotation file, such as `train_list.txt` or `val_list.txt`
        transform_list (List[Callable]): list of DALI transform operator(s)
        shard_id (int, optional): shard id. Defaults to 0.
        num_shards (int, optional): number of shard(s). Defaults to 1.
        random_shuffle (bool, optional): whether to shuffle data during training. Defaults to True.
        ext_src (optional): custom external source. Defaults to None.
    """

def __init__(self,
device: str,
batch_size: int,
py_num_workers: int,
num_threads: int,
device_id: int,
seed: int,
file_root: str,
file_list: str,
transform_list: List[Callable],
shard_id: int=0,
num_shards: int=1,
random_shuffle: bool=True,
ext_src=None):
        super(HybridPipeline, self).__init__(
            batch_size=batch_size,
            device_id=device_id,
            seed=seed,
            # the parallel external_source path requires "spawn" worker processes
            py_start_method="fork" if ext_src is None else "spawn",
            py_num_workers=py_num_workers,
            num_threads=num_threads)
self.device = device
self.ext_src = ext_src
if ext_src is None:
self.reader = ops.readers.File(
file_root=file_root,
file_list=file_list,
shard_id=shard_id,
num_shards=num_shards,
random_shuffle=random_shuffle)
self.transforms = ops.Compose(transform_list)
self.cast = ops.Cast(dtype=types.DALIDataType.INT64, device=device)

    def define_graph(self):
if self.ext_src:
raw_images, labels = fn.external_source(
source=self.ext_src,
num_outputs=2,
dtype=[types.DALIDataType.UINT8, types.DALIDataType.INT64],
batch=True,
parallel=True)
else:
raw_images, labels = self.reader(name="Reader")
images = self.transforms(raw_images)
return [
images, self.cast(labels.gpu() if self.device == "gpu" else labels)
]

    def __len__(self):
if self.ext_src is not None:
return len(self.ext_src)
return self.epoch_size(name="Reader")
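
# Usage sketch (hypothetical arguments): a file-reader pipeline for evaluation
# could be assembled as
#   pipe = HybridPipeline("gpu", 64, 1, 4, 0, 42, "./dataset/ILSVRC2012/",
#                         "./dataset/ILSVRC2012/val_list.txt", dali_ops)
#   pipe.build()
# and then wrapped by DALIImageNetIterator below.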


class DALIImageNetIterator(DALIGenericIterator):
    def __init__(self, *kargs, **kwargs):
        super(DALIImageNetIterator, self).__init__(*kargs, **kwargs)
        # cache the dygraph flag so each batch can be converted to paddle.Tensor
        self.in_dynamic_mode = paddle.in_dynamic_mode()

def __next__(self) -> List[paddle.Tensor]:
data_batch = super(DALIImageNetIterator,
self).__next__() # List[Dict[str, Tensor], ...]
# reformat to List[Tensor1, Tensor2, ...]
data_batch = [
paddle.to_tensor(data_batch[0][key])
if self.in_dynamic_mode else data_batch[0][key]
for key in self.output_map
]
return data_batch
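
# Iteration sketch: each __next__ call returns [images, labels] ordered by
# output_map (e.g. ["data", "label"]), converted to paddle.Tensor in dygraph
# mode and left as the iterator's native tensors in static-graph mode.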


def dali_dataloader(config: Dict[str, Any],
                    mode: str,
                    device: str,
                    py_num_workers: int=1,
                    num_threads: int=4,
                    seed: Optional[int]=None,
                    enable_fuse: bool=True) -> DALIImageNetIterator:
    """Build a HybridPipeline and wrap it into an iterable DALI dataloader.

    Args:
        config (Dict[str, Any]): train/eval dataloader configuration
        mode (str): dataloader mode, one of "Train", "Eval", "Gallery" or "Query" (must match a key in config)
        device (str): device string, such as "gpu:0"
        py_num_workers (int, optional): number of python worker(s). Defaults to 1.
        num_threads (int, optional): number of thread(s). Defaults to 4.
        seed (Optional[int], optional): random seed. Defaults to None.
        enable_fuse (bool, optional): enable fused operator(s). Defaults to True.

    Returns:
        DALIImageNetIterator: iterable DALI dataloader
    """
assert "gpu" in device, f"device must be \"gpu\" when running with DALI, but got {device}"
config_dataloader = config[mode]
device_id = int(device.split(":")[1])
device = "gpu"
seed = 42 if seed is None else seed
env = os.environ
num_gpus = paddle.distributed.get_world_size()
batch_size = config_dataloader["sampler"]["batch_size"]
file_root = config_dataloader["dataset"]["image_root"]
file_list = config_dataloader["dataset"]["cls_label_path"]
sampler_name = config_dataloader["sampler"].get("name",
"DistributedBatchSampler")
transform_ops_cfg = config_dataloader["dataset"]["transform_ops"]
random_shuffle = config_dataloader["sampler"].get("shuffle", None)
dali_transforms = build_dali_transforms(
transform_ops_cfg, mode, device, enable_fuse=enable_fuse)
if "ToCHWImage" not in [type_name(op) for op in dali_transforms] and (
"CropMirrorNormalize" not in
[type_name(op) for op in dali_transforms]):
dali_transforms.append(ToCHWImage(perm=[2, 0, 1], device=device))
        logger.info(
            "Appended DALI operator \"ToCHWImage\" at the end of dali_transforms to produce output in \"CHW\" layout"
        )
if mode.lower() in ["train"]:
if "PADDLE_TRAINER_ID" in env and "PADDLE_TRAINERS_NUM" in env and "FLAGS_selected_gpus" in env:
shard_id = int(env["PADDLE_TRAINER_ID"])
num_shards = int(env["PADDLE_TRAINERS_NUM"])
device_id = int(env["FLAGS_selected_gpus"])
else:
shard_id = 0
num_shards = 1
logger.info(
f"Building DALI {mode} pipeline with num_shards: {num_shards}, num_gpus: {num_gpus}"
)
random_shuffle = random_shuffle if random_shuffle is not None else True
if sampler_name in ["PKSampler", "DistributedRandomIdentitySampler"]:
ext_src = ExternalSource_RandomIdentity(
batch_size=batch_size,
sample_per_id=config_dataloader["sampler"][
"sample_per_id"
if sampler_name == "PKSampler" else "num_instances"],
device_id=device_id,
shard_id=shard_id,
num_gpus=num_gpus,
image_root=file_root,
cls_label_path=file_list,
delimiter=None,
relabel=config_dataloader["dataset"].get("relabel", False),
sample_method=config_dataloader["sampler"].get(
"sample_method", "sample_avg_prob"),
id_list=config_dataloader["sampler"].get("id_list", None),
ratio=config_dataloader["sampler"].get("ratio", None),
shuffle=random_shuffle)
logger.info(
f"Building DALI {mode} pipeline with ext_src({type_name(ext_src)})"
)
else:
ext_src = None
pipe = HybridPipeline(device, batch_size, py_num_workers, num_threads,
device_id, seed + shard_id, file_root, file_list,
dali_transforms, shard_id, num_shards,
random_shuffle, ext_src)
pipe.build()
pipelines = [pipe]
if ext_src is None:
return DALIImageNetIterator(
pipelines, ["data", "label"], reader_name="Reader")
else:
return DALIImageNetIterator(
pipelines,
["data", "label"],
size=len(ext_src),
                last_batch_policy=LastBatchPolicy.DROP  # drop the last partial batch so reset() works
            )
elif mode.lower() in ["eval", "gallery", "query"]:
assert sampler_name in ["DistributedBatchSampler"], \
f"sampler_name({sampler_name}) must in [\"DistributedBatchSampler\"]"
if "PADDLE_TRAINER_ID" in env and "PADDLE_TRAINERS_NUM" in env and "FLAGS_selected_gpus" in env:
shard_id = int(env["PADDLE_TRAINER_ID"])
num_shards = int(env["PADDLE_TRAINERS_NUM"])
device_id = int(env["FLAGS_selected_gpus"])
else:
shard_id = 0
num_shards = 1
logger.info(
f"Building DALI {mode} pipeline with num_shards: {num_shards}, num_gpus: {num_gpus}..."
)
random_shuffle = random_shuffle if random_shuffle is not None else False
pipe = HybridPipeline(device, batch_size, py_num_workers, num_threads,
device_id, seed + shard_id, file_root, file_list,
dali_transforms, shard_id, num_shards,
random_shuffle)
pipe.build()
pipelines = [pipe]
return DALIImageNetIterator(
pipelines, ["data", "label"], reader_name="Reader")
else:
raise ValueError(f"Invalid mode({mode}) when building DALI pipeline")
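

if __name__ == "__main__":
    # Minimal usage sketch with hypothetical config values: running it for real
    # requires DALI, a visible GPU and an ImageNet-style file list, so treat
    # this as an illustration of the expected config layout rather than a
    # tested entry point.
    example_config = {
        "Train": {
            "dataset": {
                "image_root": "./dataset/ILSVRC2012/",  # hypothetical path
                "cls_label_path": "./dataset/ILSVRC2012/train_list.txt",
                "transform_ops": [
                    {"DecodeImage": {"to_rgb": True, "channel_first": False}},
                    {"RandCropImage": {"size": 224}},
                    {"RandFlipImage": {"flip_code": 1}},
                    {"NormalizeImage": {"scale": 1.0 / 255.0,
                                        "mean": [0.485, 0.456, 0.406],
                                        "std": [0.229, 0.224, 0.225],
                                        "order": ""}},
                ],
            },
            "sampler": {
                "name": "DistributedBatchSampler",
                "batch_size": 64,
                "shuffle": True,
            },
        }
    }
    loader = dali_dataloader(example_config, "Train", "gpu:0")
    for batch in loader:
        images, labels = batch  # images: NCHW float tensor, labels: int64
        break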