# Modified from https://github.com/PaddlePaddle/PaddleOCR/tree/release/2.6/ppocr/data/imaug
import math
import random
import sys

import cv2
import imgaug
import imgaug.augmenters as iaa
import numpy as np
import pyclipper
from shapely.geometry import Polygon

from easycv.datasets.registry import PIPELINES
from easycv.framework.errors import RuntimeError


class AugmenterBuilder(object):
    """Build ``imgaug`` augmenters from a nested list/dict description."""

    def __init__(self):
        pass

    def build(self, args, root=True):
        """Recursively turn ``args`` into an ``imgaug`` augmenter.

        Args:
            args: ``None``/empty, a list, or a dict.

                * list with ``root=True``: every element is built with
                  ``root=False`` and the results are wrapped in
                  ``iaa.Sequential``.
                * list with ``root=False``: ``args[0]`` is the name of an
                  attribute of ``imgaug.augmenters`` and ``args[1:]`` are its
                  positional arguments.
                * dict: ``args['type']`` names the augmenter and
                  ``args['args']`` holds its keyword arguments.
            root (bool): whether ``args`` is the top-level description.

        Returns:
            The built augmenter, or ``None`` when ``args`` is ``None`` or
            empty.

        Raises:
            RuntimeError: if ``args`` is neither a list nor a dict.
        """
        if args is None or len(args) == 0:
            return None

        if isinstance(args, list):
            if root:
                sequence = [self.build(value, root=False) for value in args]
                return iaa.Sequential(sequence)
            augmenter_cls = getattr(iaa, args[0])
            return augmenter_cls(
                *[self.to_tuple_if_list(a) for a in args[1:]])

        if isinstance(args, dict):
            augmenter_cls = getattr(iaa, args['type'])
            return augmenter_cls(
                **{
                    k: self.to_tuple_if_list(v)
                    for k, v in args['args'].items()
                })

        raise RuntimeError('unknown augmenter arg: ' + str(args))

    def to_tuple_if_list(self, obj):
        """Return ``tuple(obj)`` if ``obj`` is a list, else ``obj`` itself."""
        if isinstance(obj, list):
            return tuple(obj)
        return obj


@PIPELINES.register_module()
class IaaAugment():
    """Apply an ``imgaug`` pipeline to ``data['img']`` and ``data['polys']``.

    The same deterministic copy of the augmenter is used for the image and
    for the polygon vertices, so both receive the same transform.
    """

    def __init__(self, augmenter_args=None, **kwargs):
        """
        Args:
            augmenter_args (list, optional): description understood by
                ``AugmenterBuilder.build``. When ``None``, a default of
                ``Fliplr(p=0.5)``, ``Affine(rotate=(-10, 10))`` and
                ``Resize(size=(0.5, 3))`` is used.
            **kwargs: accepted and ignored.
        """
        if augmenter_args is None:
            augmenter_args = [{
                'type': 'Fliplr',
                'args': {
                    'p': 0.5
                }
            }, {
                'type': 'Affine',
                'args': {
                    'rotate': [-10, 10]
                }
            }, {
                'type': 'Resize',
                'args': {
                    'size': [0.5, 3]
                }
            }]
        self.augmenter = AugmenterBuilder().build(augmenter_args)

    def __call__(self, data):
        """Augment ``data['img']`` and ``data['polys']`` in ``data``.

        ``data`` is returned unchanged when ``self.augmenter`` is falsy.
        """
        image = data['img']
        shape = image.shape

        if self.augmenter:
            # Freeze the random state so image and polygons share a transform.
            aug = self.augmenter.to_deterministic()
            data['img'] = aug.augment_image(image)
            data = self.may_augment_annotation(aug, data, shape)
        return data

    def may_augment_annotation(self, aug, data, shape):
        """Replace ``data['polys']`` with the augmented polygons.

        Args:
            aug: deterministic augmenter, or ``None`` to do nothing.
            data (dict): sample dict; ``data['polys']`` is iterated.
            shape: shape of the image before augmentation.

        Returns:
            dict: ``data`` with ``data['polys']`` set to a numpy array.
        """
        if aug is None:
            return data

        line_polys = [
            self.may_augment_poly(aug, shape, poly) for poly in data['polys']
        ]
        data['polys'] = np.array(line_polys)
        return data

    def may_augment_poly(self, aug, img_shape, poly):
        """Transform the vertices of one polygon with ``aug``.

        Args:
            aug: deterministic augmenter.
            img_shape: shape of the image the polygon lives on.
            poly: iterable of points indexed as ``p[0]`` (x) and ``p[1]`` (y).

        Returns:
            list[tuple]: the transformed ``(x, y)`` vertices.
        """
        keypoints = [imgaug.Keypoint(p[0], p[1]) for p in poly]
        augmented = aug.augment_keypoints(
            [imgaug.KeypointsOnImage(keypoints, shape=img_shape)])
        return [(p.x, p.y) for p in augmented[0].keypoints]


def is_poly_in_rect(poly, x, y, w, h):
    """Return True if every vertex of ``poly`` lies in the given rectangle.

    The rectangle spans ``[x, x + w]`` by ``[y, y + h]`` (bounds inclusive).
    """
    poly = np.array(poly)
    if poly[:, 0].min() < x or poly[:, 0].max() > x + w:
        return False
    if poly[:, 1].min() < y or poly[:, 1].max() > y + h:
        return False
    return True


def is_poly_outside_rect(poly, x, y, w, h):
    """Return True if the bounding box of ``poly`` misses the rectangle.

    The rectangle spans ``[x, x + w]`` by ``[y, y + h]``. Only the
    axis-aligned extent of ``poly`` is tested.
    """
    poly = np.array(poly)
    if poly[:, 0].max() < x or poly[:, 0].min() > x + w:
        return True
    if poly[:, 1].max() < y or poly[:, 1].min() > y + h:
        return True
    return False


def split_regions(axis):
    """Split a 1-D array of indices into runs of consecutive values.

    A run ends wherever ``axis[i] != axis[i - 1] + 1``.

    NOTE(review): the final run (from the last break to the end of ``axis``)
    is never appended, so a fully consecutive ``axis`` yields ``[]``. This is
    kept as-is to preserve the existing crop sampling behaviour; confirm
    whether it is intended.

    Returns:
        list: the runs found, as slices of ``axis``.
    """
    regions = []
    min_axis = 0
    for i in range(1, axis.shape[0]):
        if axis[i] != axis[i - 1] + 1:
            regions.append(axis[min_axis:i])
            min_axis = i
    return regions


def random_select(axis, max_size):
    """Draw two values from ``axis`` and return them as ``(min, max)``.

    Both values are clipped to ``[0, max_size - 1]``.
    """
    xx = np.random.choice(axis, size=2)
    xmin = np.clip(np.min(xx), 0, max_size - 1)
    xmax = np.clip(np.max(xx), 0, max_size - 1)
    return xmin, xmax


def region_wise_random_select(regions, max_size):
    """Pick two regions at random and one value from each.

    Args:
        regions (list): candidate index arrays, e.g. from ``split_regions``.
        max_size: unused; kept for signature compatibility.

    Returns:
        tuple[int, int]: the two picked values as ``(min, max)``.
    """
    selected_index = list(np.random.choice(len(regions), 2))
    selected_values = []
    for index in selected_index:
        axis = regions[index]
        # Index the 1-element result instead of int(array): converting an
        # array with ndim > 0 to a scalar is deprecated in recent NumPy.
        selected_values.append(int(np.random.choice(axis, size=1)[0]))
    return min(selected_values), max(selected_values)


def crop_area(im, text_polys, min_crop_side_ratio, max_tries):
    """Randomly choose a crop rectangle whose borders avoid text extents.

    Columns/rows covered by the bounding extent of any polygon are marked as
    occupied; crop borders are sampled only from unoccupied columns/rows.

    Args:
        im: image array; its shape is unpacked as ``(h, w, channels)``.
        text_polys: iterable of polygons, each indexable as ``[:, 0]`` (x)
            and ``[:, 1]`` (y) after rounding.
        min_crop_side_ratio (float): a candidate is rejected if its width or
            height is below this fraction of the image width or height.
        max_tries (int): number of sampling attempts.

    Returns:
        tuple: ``(x, y, w, h)`` of the crop. Falls back to the full image
        ``(0, 0, w, h)`` when no free row/column exists or when no attempt
        yields a large enough crop that touches at least one polygon.
    """
    h, w, _ = im.shape
    h_array = np.zeros(h, dtype=np.int32)
    w_array = np.zeros(w, dtype=np.int32)
    for points in text_polys:
        points = np.round(points, decimals=0).astype(np.int32)
        # Clamp at 0: a negative bound would be read by the slice as an
        # index from the end and mark the wrong columns/rows.
        minx = max(np.min(points[:, 0]), 0)
        maxx = max(np.max(points[:, 0]), 0)
        w_array[minx:maxx] = 1
        miny = max(np.min(points[:, 1]), 0)
        maxy = max(np.max(points[:, 1]), 0)
        h_array[miny:maxy] = 1

    # ensure the cropped area not across a text
    h_axis = np.where(h_array == 0)[0]
    w_axis = np.where(w_array == 0)[0]

    if len(h_axis) == 0 or len(w_axis) == 0:
        return 0, 0, w, h

    h_regions = split_regions(h_axis)
    w_regions = split_regions(w_axis)

    for _ in range(max_tries):
        if len(w_regions) > 1:
            xmin, xmax = region_wise_random_select(w_regions, w)
        else:
            xmin, xmax = random_select(w_axis, w)
        if len(h_regions) > 1:
            ymin, ymax = region_wise_random_select(h_regions, h)
        else:
            ymin, ymax = random_select(h_axis, h)

        crop_w = xmax - xmin
        crop_h = ymax - ymin
        if crop_w < min_crop_side_ratio * w or crop_h < min_crop_side_ratio * h:
            # area too small
            continue

        # Accept the crop only if at least one polygon is not outside it.
        if any(not is_poly_outside_rect(poly, xmin, ymin, crop_w, crop_h)
               for poly in text_polys):
            return xmin, ymin, crop_w, crop_h

    return 0, 0, w, h


@PIPELINES.register_module()
class EastRandomCropData(object):
    """
    crop method for ocr detection, ensure the cropped area not across a text,
    and keep min side larger than min_crop_side_ratio
    """

    def __init__(self,
                 size=(640, 640),
                 max_tries=10,
                 min_crop_side_ratio=0.1,
                 keep_ratio=True,
                 **kwargs):
        """
        Args:
            size (tuple, optional): target size to crop. Defaults to (640, 640).
            max_tries (int, optional): max try times. Defaults to 10.
            min_crop_side_ratio (float, optional): min side should larger than this. Defaults to 0.1.
            keep_ratio (bool, optional): whether to keep ratio. Defaults to True.
        """
        self.size = size
        self.max_tries = max_tries
        self.min_crop_side_ratio = min_crop_side_ratio
        self.keep_ratio = keep_ratio

    def __call__(self, data):
        """Crop and resize ``data['img']`` and remap the annotations.

        Reads ``data['img']``, ``data['polys']``, ``data['ignore_tags']`` and
        ``data['texts']`` and overwrites all four. Polygons whose bounding
        extent falls outside the resized crop are dropped together with their
        tag and text.

        NOTE(review): with ``keep_ratio=False`` the image is stretched to
        ``self.size`` (different horizontal and vertical scale) while the
        polygons are still scaled by the single ``scale`` factor. Confirm
        this is intended.
        """
        img = data['img']
        text_polys = data['polys']
        ignore_tags = data['ignore_tags']
        texts = data['texts']
        # Only polygons that are not ignored constrain the crop.
        all_care_polys = [
            text_polys[i] for i, tag in enumerate(ignore_tags) if not tag
        ]
        # compute crop area
        crop_x, crop_y, crop_w, crop_h = crop_area(img, all_care_polys,
                                                   self.min_crop_side_ratio,
                                                   self.max_tries)
        # crop img
        scale_w = self.size[0] / crop_w
        scale_h = self.size[1] / crop_h
        scale = min(scale_w, scale_h)
        h = int(crop_h * scale)
        w = int(crop_w * scale)
        cropped = img[crop_y:crop_y + crop_h, crop_x:crop_x + crop_w]
        if self.keep_ratio:
            # Resize isotropically and pad with zeros to the target size.
            padimg = np.zeros((self.size[1], self.size[0], img.shape[2]),
                              img.dtype)
            padimg[:h, :w] = cv2.resize(cropped, (w, h))
            img = padimg
        else:
            img = cv2.resize(cropped, tuple(self.size))

        # crop text
        text_polys_crop = []
        ignore_tags_crop = []
        texts_crop = []
        for poly, text, tag in zip(text_polys, texts, ignore_tags):
            poly = ((poly - (crop_x, crop_y)) * scale).tolist()
            if not is_poly_outside_rect(poly, 0, 0, w, h):
                text_polys_crop.append(poly)
                ignore_tags_crop.append(tag)
                texts_crop.append(text)

        data['img'] = img
        data['polys'] = np.array(text_polys_crop)
        data['ignore_tags'] = ignore_tags_crop
        data['texts'] = texts_crop
        return data


@PIPELINES.register_module()
class MakeBorderMap(object):
    """
    Making Border binary mask from DBNet algorithm
    """

    def __init__(self,
                 shrink_ratio=0.4,
                 thresh_min=0.3,
                 thresh_max=0.7,
                 **kwargs):
        """
        Args:
            shrink_ratio (float): ratio used to derive the offset distance
                around each polygon.
            thresh_min (float): value the border map takes where the raw
                canvas is 0.
            thresh_max (float): value the border map takes where the raw
                canvas is 1.
            **kwargs: accepted and ignored.
        """
        self.shrink_ratio = shrink_ratio
        self.thresh_min = thresh_min
        self.thresh_max = thresh_max

    def __call__(self, data):
        """Add ``data['threshold_map']`` and ``data['threshold_mask']``.

        Both are float32 arrays with the first two dimensions of
        ``data['img']``. Polygons whose ``ignore_tags`` entry is truthy are
        skipped.
        """
        img = data['img']
        text_polys = data['polys']
        ignore_tags = data['ignore_tags']

        canvas = np.zeros(img.shape[:2], dtype=np.float32)
        mask = np.zeros(img.shape[:2], dtype=np.float32)

        for i in range(len(text_polys)):
            if ignore_tags[i]:
                continue
            self.draw_border_map(text_polys[i], canvas, mask=mask)
        # Map the raw [0, 1] canvas linearly onto [thresh_min, thresh_max].
        canvas = canvas * (self.thresh_max - self.thresh_min) + self.thresh_min

        data['threshold_map'] = canvas
        data['threshold_mask'] = mask
        return data

    def draw_border_map(self, polygon, canvas, mask):
        """Draw the border response of one polygon into ``canvas``/``mask``.

        Args:
            polygon: array-like of shape ``(num_points, 2)``; it is copied
                before being modified.
            canvas (np.ndarray): 2-D map updated in place with the
                element-wise maximum of its current value and
                ``1 - normalised distance to the nearest polygon edge``.
            mask (np.ndarray): 2-D map in which the dilated polygon is
                filled with 1.0, in place.

        Polygons with non-positive area are skipped.
        """
        polygon = np.array(polygon)
        assert polygon.ndim == 2
        assert polygon.shape[1] == 2

        polygon_shape = Polygon(polygon)
        if polygon_shape.area <= 0:
            return
        distance = polygon_shape.area * (
            1 - np.power(self.shrink_ratio, 2)) / polygon_shape.length
        subject = [tuple(pt) for pt in polygon]
        padding = pyclipper.PyclipperOffset()
        padding.AddPath(subject, pyclipper.JT_ROUND,
                        pyclipper.ET_CLOSEDPOLYGON)

        # Offset the polygon outwards by `distance`.
        padded_polygon = np.array(padding.Execute(distance)[0])
        cv2.fillPoly(mask, [padded_polygon.astype(np.int32)], 1.0)

        xmin = padded_polygon[:, 0].min()
        xmax = padded_polygon[:, 0].max()
        ymin = padded_polygon[:, 1].min()
        ymax = padded_polygon[:, 1].max()
        width = xmax - xmin + 1
        height = ymax - ymin + 1

        # Work in coordinates local to the padded bounding box.
        polygon[:, 0] = polygon[:, 0] - xmin
        polygon[:, 1] = polygon[:, 1] - ymin

        xs = np.broadcast_to(
            np.linspace(0, width - 1, num=width).reshape(1, width),
            (height, width))
        ys = np.broadcast_to(
            np.linspace(0, height - 1, num=height).reshape(height, 1),
            (height, width))

        # One distance plane per polygon edge, normalised by `distance`.
        distance_map = np.zeros((polygon.shape[0], height, width),
                                dtype=np.float32)
        for i in range(polygon.shape[0]):
            j = (i + 1) % polygon.shape[0]
            absolute_distance = self._distance(xs, ys, polygon[i], polygon[j])
            distance_map[i] = np.clip(absolute_distance / distance, 0, 1)
        distance_map = distance_map.min(axis=0)

        # Clamp the bounding box to the canvas before writing.
        xmin_valid = min(max(0, xmin), canvas.shape[1] - 1)
        xmax_valid = min(max(0, xmax), canvas.shape[1] - 1)
        ymin_valid = min(max(0, ymin), canvas.shape[0] - 1)
        ymax_valid = min(max(0, ymax), canvas.shape[0] - 1)
        canvas[ymin_valid:ymax_valid + 1, xmin_valid:xmax_valid + 1] = np.fmax(
            1 - distance_map[ymin_valid - ymin:ymax_valid - ymax + height,
                             xmin_valid - xmin:xmax_valid - xmax + width],
            canvas[ymin_valid:ymax_valid + 1, xmin_valid:xmax_valid + 1])

    def _distance(self, xs, ys, point_1, point_2):
        '''
        compute the distance from point to a line
        ys: coordinates in the first axis
        xs: coordinates in the second axis
        point_1, point_2: (x, y), the end of the line

        Returns an array shaped like ``xs``. Where ``cosin`` is negative the
        distance to the nearer end point is used instead.
        '''
        square_distance_1 = np.square(xs - point_1[0]) + np.square(
            ys - point_1[1])
        square_distance_2 = np.square(xs - point_2[0]) + np.square(
            ys - point_2[1])
        square_distance = np.square(point_1[0] - point_2[0]) + np.square(
            point_1[1] - point_2[1])

        cosin = (square_distance - square_distance_1 - square_distance_2) / (
            2 * np.sqrt(square_distance_1 * square_distance_2))
        square_sin = 1 - np.square(cosin)
        # Pixels that coincide with an end point produce NaN above.
        square_sin = np.nan_to_num(square_sin)
        result = np.sqrt(square_distance_1 * square_distance_2 * square_sin /
                         square_distance)

        result[cosin < 0] = np.sqrt(
            np.fmin(square_distance_1, square_distance_2))[cosin < 0]
        return result

    def extend_line(self, point_1, point_2, result, shrink_ratio):
        """Draw extensions of the segment beyond both end points.

        Each end point is extended away from the other by
        ``(1 + shrink_ratio)`` times the segment vector, and a line with
        value 4096.0 is drawn into ``result`` from the extended point to the
        original end point.

        Returns:
            tuple: the two extended points ``(ex_point_1, ex_point_2)``.
        """
        ex_point_1 = (
            int(
                round(point_1[0] +
                      (point_1[0] - point_2[0]) * (1 + shrink_ratio))),
            int(
                round(point_1[1] +
                      (point_1[1] - point_2[1]) * (1 + shrink_ratio))))
        cv2.line(
            result,
            tuple(ex_point_1),
            tuple(point_1),
            4096.0,
            1,
            lineType=cv2.LINE_AA,
            shift=0)
        ex_point_2 = (
            int(
                round(point_2[0] +
                      (point_2[0] - point_1[0]) * (1 + shrink_ratio))),
            int(
                round(point_2[1] +
                      (point_2[1] - point_1[1]) * (1 + shrink_ratio))))
        cv2.line(
            result,
            tuple(ex_point_2),
            tuple(point_2),
            4096.0,
            1,
            lineType=cv2.LINE_AA,
            shift=0)
        return ex_point_1, ex_point_2


@PIPELINES.register_module()
class MakeShrinkMap(object):
    r'''
    Making binary mask from detection data with ICDAR format.
    Typically following the process of class `MakeICDARData`.
    '''

    def __init__(self, min_text_size=8, shrink_ratio=0.4, **kwargs):
        """
        Args:
            min_text_size (int): polygons whose smaller bounding-box side is
                below this value are treated as ignored.
            shrink_ratio (float): first shrink ratio to try; also the step
                by which the ratio is increased on retries.
            **kwargs: accepted and ignored.
        """
        self.min_text_size = min_text_size
        self.shrink_ratio = shrink_ratio

    def __call__(self, data):
        """Add ``data['shrink_map']`` and ``data['shrink_mask']``.

        ``shrink_map`` is 1 inside the shrunk polygons and 0 elsewhere;
        ``shrink_mask`` is 0 over ignored or too-small polygons and over
        polygons that could not be shrunk, and 1 elsewhere. ``ignore_tags``
        is updated in place for polygons that end up ignored.
        """
        image = data['img']
        text_polys = data['polys']
        ignore_tags = data['ignore_tags']

        h, w = image.shape[:2]
        text_polys, ignore_tags = self.validate_polygons(
            text_polys, ignore_tags, h, w)
        gt = np.zeros((h, w), dtype=np.float32)
        mask = np.ones((h, w), dtype=np.float32)
        for i in range(len(text_polys)):
            polygon = text_polys[i]
            height = max(polygon[:, 1]) - min(polygon[:, 1])
            width = max(polygon[:, 0]) - min(polygon[:, 0])
            if ignore_tags[i] or min(height, width) < self.min_text_size:
                cv2.fillPoly(mask,
                             polygon.astype(np.int32)[np.newaxis, :, :], 0)
                ignore_tags[i] = True
                continue

            polygon_shape = Polygon(polygon)
            subject = [tuple(pt) for pt in polygon]
            padding = pyclipper.PyclipperOffset()
            padding.AddPath(subject, pyclipper.JT_ROUND,
                            pyclipper.ET_CLOSEDPOLYGON)
            shrinked = []

            # Increase the shrink ratio every time we get multiple polygon
            # returned back. np.append returns a new array, so its result
            # must be assigned for the final ratio 1 to be tried at all.
            possible_ratios = np.arange(self.shrink_ratio, 1,
                                        self.shrink_ratio)
            possible_ratios = np.append(possible_ratios, 1)
            for ratio in possible_ratios:
                distance = polygon_shape.area * (
                    1 - np.power(ratio, 2)) / polygon_shape.length
                shrinked = padding.Execute(-distance)
                if len(shrinked) == 1:
                    break

            if shrinked == []:
                # Nothing left after shrinking: treat the polygon as ignored.
                cv2.fillPoly(mask,
                             polygon.astype(np.int32)[np.newaxis, :, :], 0)
                ignore_tags[i] = True
                continue

            for each_shrink in shrinked:
                shrink = np.array(each_shrink).reshape(-1, 2)
                cv2.fillPoly(gt, [shrink.astype(np.int32)], 1)

        data['shrink_map'] = gt
        data['shrink_mask'] = mask
        return data

    def validate_polygons(self, polygons, ignore_tags, h, w):
        '''
        polygons (numpy.array, required): of shape (num_instances, num_points, 2)

        Clips every vertex to ``[0, w - 1]`` x ``[0, h - 1]`` in place, marks
        polygons whose absolute signed area is below 1 as ignored, and
        reverses the vertex order of polygons whose signed area is positive.

        Returns the (modified) ``polygons`` and ``ignore_tags``.
        '''
        if len(polygons) == 0:
            return polygons, ignore_tags
        assert len(polygons) == len(ignore_tags)
        for polygon in polygons:
            polygon[:, 0] = np.clip(polygon[:, 0], 0, w - 1)
            polygon[:, 1] = np.clip(polygon[:, 1], 0, h - 1)

        for i in range(len(polygons)):
            area = self.polygon_area(polygons[i])
            if abs(area) < 1:
                ignore_tags[i] = True
            if area > 0:
                polygons[i] = polygons[i][::-1, :]
        return polygons, ignore_tags

    def polygon_area(self, polygon):
        """
        compute polygon area

        Returns the signed area: half the sum of ``p.x * q.y - p.y * q.x``
        over consecutive vertices, where ``q`` is the vertex preceding ``p``.
        """
        area = 0
        q = polygon[-1]
        for p in polygon:
            area += p[0] * q[1] - p[1] * q[0]
            q = p
        return area / 2.0


@PIPELINES.register_module()
class OCRDetResize(object):
    """resize function for ocr det test

    The resize mode is chosen from the keyword arguments, in this order:

    * ``image_shape``: resize to exactly ``(h, w)`` (type 1).
    * ``limit_side_len`` (with optional ``limit_type``, default ``'min'``):
      scale by a side-length limit, then round to a multiple of 32 (type 0).
    * ``resize_long``: scale the longer side to this length, then round up
      to a multiple of 128 (type 2).
    * none of the above: type 0 with ``limit_side_len=736`` and
      ``limit_type='min'``.
    """

    def __init__(self, **kwargs):
        super(OCRDetResize, self).__init__()
        self.resize_type = 0
        if 'image_shape' in kwargs:
            self.image_shape = kwargs['image_shape']
            self.resize_type = 1
        elif 'limit_side_len' in kwargs:
            self.limit_side_len = kwargs['limit_side_len']
            self.limit_type = kwargs.get('limit_type', 'min')
        elif 'resize_long' in kwargs:
            self.resize_type = 2
            self.resize_long = kwargs.get('resize_long', 960)
        else:
            self.limit_side_len = 736
            self.limit_type = 'min'

    def __call__(self, data):
        """Resize ``data['img']`` in ``data`` and return ``data``.

        The image shape is unpacked as ``(h, w, channels)``, so a 3-D array
        is required.
        """
        img = data['img']
        src_h, src_w, _ = img.shape

        if self.resize_type == 0:
            img, [ratio_h, ratio_w] = self.resize_image_type0(img)
        elif self.resize_type == 2:
            img, [ratio_h, ratio_w] = self.resize_image_type2(img)
        else:
            img, [ratio_h, ratio_w] = self.resize_image_type1(img)
        data['img'] = img
        return data

    def resize_image_type1(self, img):
        """Resize ``img`` to exactly ``self.image_shape`` (``(h, w)``).

        Returns:
            tuple: ``(img, [ratio_h, ratio_w])``.
        """
        resize_h, resize_w = self.image_shape
        ori_h, ori_w = img.shape[:2]  # (h, w, c)
        ratio_h = float(resize_h) / ori_h
        ratio_w = float(resize_w) / ori_w
        img = cv2.resize(img, (int(resize_w), int(resize_h)))
        return img, [ratio_h, ratio_w]

    def resize_image_type0(self, img):
        """
        resize image to a size multiple of 32 which is required by the network
        args:
            img(array): array with shape [h, w, c]
        return(tuple):
            img, (ratio_h, ratio_w)

        ``self.limit_type`` selects how ``self.limit_side_len`` is applied:
        ``'max'`` shrinks the image when its longer side exceeds the limit,
        ``'min'`` enlarges it when its shorter side is below the limit, and
        ``'resize_long'`` always scales the longer side to the limit.

        raises:
            ValueError: if ``self.limit_type`` is not one of the above.
        """
        limit_side_len = self.limit_side_len
        h, w, c = img.shape

        if self.limit_type == 'max':
            # limit the max side
            if max(h, w) > limit_side_len:
                ratio = float(limit_side_len) / max(h, w)
            else:
                ratio = 1.
        elif self.limit_type == 'min':
            # limit the min side
            if min(h, w) < limit_side_len:
                ratio = float(limit_side_len) / min(h, w)
            else:
                ratio = 1.
        elif self.limit_type == 'resize_long':
            ratio = float(limit_side_len) / max(h, w)
        else:
            raise ValueError('not support limit type, image ')

        resize_h = int(h * ratio)
        resize_w = int(w * ratio)

        # Round to the nearest multiple of 32, never below 32.
        resize_h = max(int(round(resize_h / 32) * 32), 32)
        resize_w = max(int(round(resize_w / 32) * 32), 32)

        try:
            img = cv2.resize(img, (int(resize_w), int(resize_h)))
        except Exception:
            # Print the offending sizes, then let the error propagate
            # instead of exiting the process with a success status.
            print(img.shape, resize_w, resize_h)
            raise
        ratio_h = resize_h / float(h)
        ratio_w = resize_w / float(w)
        return img, [ratio_h, ratio_w]

    def resize_image_type2(self, img):
        """Scale the longer side to ``self.resize_long``.

        Both sides are then rounded up to a multiple of 128.

        Returns:
            tuple: ``(img, [ratio_h, ratio_w])``.
        """
        h, w, _ = img.shape

        ratio = float(self.resize_long) / (h if h > w else w)
        resize_h = int(h * ratio)
        resize_w = int(w * ratio)

        max_stride = 128
        resize_h = (resize_h + max_stride - 1) // max_stride * max_stride
        resize_w = (resize_w + max_stride - 1) // max_stride * max_stride
        img = cv2.resize(img, (int(resize_w), int(resize_h)))
        ratio_h = resize_h / float(h)
        ratio_w = resize_w / float(w)

        return img, [ratio_h, ratio_w]