Source code for mmpose.datasets.pipelines.bottom_up_transform

# Copyright (c) OpenMMLab. All rights reserved.
import math
import random

import cv2
import numpy as np

from mmpose.core.post_processing import (get_affine_transform, get_warp_matrix,
                                         warp_affine_joints)
from mmpose.datasets.builder import PIPELINES
from .shared_transform import Compose


def _ceil_to_multiples_of(x, base=64):
    """Transform x to the integral multiple of the base."""
    return int(np.ceil(x / base)) * base
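A quick sanity check of the rounding behaviour (illustrative values, not part of the module):

# _ceil_to_multiples_of rounds up to the next multiple of `base`.
assert _ceil_to_multiples_of(481) == 512          # ceil(481 / 64) * 64
assert _ceil_to_multiples_of(512) == 512          # already a multiple of 64
assert _ceil_to_multiples_of(33, base=32) == 64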


def _get_multi_scale_size(image,
                          input_size,
                          current_scale,
                          min_scale,
                          base_length=64,
                          use_udp=False):
    """Get the size for multi-scale training.

    Args:
        image: Input image.
        input_size (np.ndarray[2]): Size (w, h) of the image input.
        current_scale (float): Scale factor.
        min_scale (float): Minimal scale.
        base_length (int): The width and height should be multiples of
            base_length. Default: 64.
        use_udp (bool): To use unbiased data processing.
            Paper ref: Huang et al. The Devil is in the Details: Delving into
            Unbiased Data Processing for Human Pose Estimation (CVPR 2020).

    Returns:
        tuple: A tuple containing multi-scale sizes.

        - (w_resized, h_resized) (tuple(int)): resized width/height
        - center (np.ndarray): image center
        - scale (np.ndarray): scales wrt width/height
    """
    assert len(input_size) == 2
    h, w, _ = image.shape

    # calculate the size for min_scale
    min_input_w = _ceil_to_multiples_of(min_scale * input_size[0], base_length)
    min_input_h = _ceil_to_multiples_of(min_scale * input_size[1], base_length)
    if w < h:
        w_resized = int(min_input_w * current_scale / min_scale)
        h_resized = int(
            _ceil_to_multiples_of(min_input_w / w * h, base_length) *
            current_scale / min_scale)
        if use_udp:
            scale_w = w - 1.0
            scale_h = (h_resized - 1.0) / (w_resized - 1.0) * (w - 1.0)
        else:
            scale_w = w / 200.0
            scale_h = h_resized / w_resized * w / 200.0
    else:
        h_resized = int(min_input_h * current_scale / min_scale)
        w_resized = int(
            _ceil_to_multiples_of(min_input_h / h * w, base_length) *
            current_scale / min_scale)
        if use_udp:
            scale_h = h - 1.0
            scale_w = (w_resized - 1.0) / (h_resized - 1.0) * (h - 1.0)
        else:
            scale_h = h / 200.0
            scale_w = w_resized / h_resized * h / 200.0
    if use_udp:
        center = (scale_w / 2.0, scale_h / 2.0)
    else:
        center = np.array([round(w / 2.0), round(h / 2.0)])
    return (w_resized, h_resized), center, np.array([scale_w, scale_h])
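A hedged usage sketch (the shapes and numbers below are illustrative, not taken from the module's tests): a 640x480 image evaluated at scale 1.0 against a 512x512 input size resolves as follows.

img = np.zeros((480, 640, 3), dtype=np.uint8)  # (h, w, c)
size, center, scale = _get_multi_scale_size(
    img, input_size=np.array([512, 512]), current_scale=1.0, min_scale=1.0)
# size   == (704, 512): 640 * 512 / 480 = 682.7, ceiled to a multiple of 64
# center == [320, 240]: the image center (w / 2, h / 2)
# scale  == [3.3, 2.4]: the padded size under the 200-pixel scale convention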


def _resize_align_multi_scale(image,
                              input_size,
                              current_scale,
                              min_scale,
                              base_length=64):
    """Resize the images for multi-scale training.

    Args:
        image: Input image
        input_size (np.ndarray[2]): Size (w, h) of the image input
        current_scale (float): Current scale
        min_scale (float): Minimal scale
        base_length (int): The width and height should be multiples of
            base_length. Default: 64.

    Returns:
        tuple: A tuple containing image info.

        - image_resized (np.ndarray): resized image
        - center (np.ndarray): center of image
        - scale (np.ndarray): scale
    """
    assert len(input_size) == 2
    size_resized, center, scale = _get_multi_scale_size(
        image, input_size, current_scale, min_scale, base_length)

    trans = get_affine_transform(center, scale, 0, size_resized)
    image_resized = cv2.warpAffine(image, trans, size_resized)

    return image_resized, center, scale


def _resize_align_multi_scale_udp(image,
                                  input_size,
                                  current_scale,
                                  min_scale,
                                  base_length=64):
    """Resize the images for multi-scale training.

    Args:
        image: Input image
        input_size (np.ndarray[2]): Size (w, h) of the image input
        current_scale (float): Current scale
        min_scale (float): Minimal scale
        base_length (int): The width and height should be multiples of
            base_length. Default: 64.

    Returns:
        tuple: A tuple containing image info.

        - image_resized (np.ndarray): resized image
        - center (np.ndarray): center of image
        - scale (np.ndarray): scale
    """
    assert len(input_size) == 2
    size_resized, _, _ = _get_multi_scale_size(image, input_size,
                                               current_scale, min_scale,
                                               base_length, True)

    _, center, scale = _get_multi_scale_size(image, input_size, min_scale,
                                             min_scale, base_length, True)

    trans = get_warp_matrix(
        theta=0,
        size_input=np.array(scale, dtype=np.float32),
        size_dst=np.array(size_resized, dtype=np.float32) - 1.0,
        size_target=np.array(scale, dtype=np.float32))
    image_resized = cv2.warpAffine(
        image.copy(), trans, size_resized, flags=cv2.INTER_LINEAR)

    return image_resized, center, scale


class HeatmapGenerator:
    """Generate heatmaps for bottom-up models.

    Args:
        num_joints (int): Number of keypoints
        output_size (np.ndarray): Size (w, h) of feature map
        sigma (int): Sigma of the heatmaps.
        use_udp (bool): To use unbiased data processing.
            Paper ref: Huang et al. The Devil is in the Details: Delving into
            Unbiased Data Processing for Human Pose Estimation (CVPR 2020).
    """

    def __init__(self, output_size, num_joints, sigma=-1, use_udp=False):
        if not isinstance(output_size, np.ndarray):
            output_size = np.array(output_size)
        if output_size.size > 1:
            assert len(output_size) == 2
            self.output_size = output_size
        else:
            self.output_size = np.array([output_size, output_size],
                                        dtype=int)
        self.num_joints = num_joints
        if sigma < 0:
            sigma = self.output_size.prod()**0.5 / 64
        self.sigma = sigma
        size = 6 * sigma + 3
        self.use_udp = use_udp
        if use_udp:
            self.x = np.arange(0, size, 1, np.float32)
            self.y = self.x[:, None]
        else:
            x = np.arange(0, size, 1, np.float32)
            y = x[:, None]
            x0, y0 = 3 * sigma + 1, 3 * sigma + 1
            self.g = np.exp(-((x - x0)**2 + (y - y0)**2) / (2 * sigma**2))

    def __call__(self, joints):
        """Generate heatmaps."""
        hms = np.zeros(
            (self.num_joints, self.output_size[1], self.output_size[0]),
            dtype=np.float32)

        sigma = self.sigma
        for p in joints:
            for idx, pt in enumerate(p):
                if pt[2] > 0:
                    x, y = int(pt[0]), int(pt[1])
                    if x < 0 or y < 0 or \
                       x >= self.output_size[0] or y >= self.output_size[1]:
                        continue

                    if self.use_udp:
                        # re-center the Gaussian at the sub-pixel location
                        x0 = 3 * sigma + 1 + pt[0] - x
                        y0 = 3 * sigma + 1 + pt[1] - y
                        g = np.exp(-((self.x - x0)**2 + (self.y - y0)**2) /
                                   (2 * sigma**2))
                    else:
                        g = self.g

                    # patch corners, then clip the patch to the map borders
                    ul = (int(np.round(x - 3 * sigma - 1)),
                          int(np.round(y - 3 * sigma - 1)))
                    br = (int(np.round(x + 3 * sigma + 2)),
                          int(np.round(y + 3 * sigma + 2)))

                    c, d = max(0, -ul[0]), min(br[0],
                                               self.output_size[0]) - ul[0]
                    a, b = max(0, -ul[1]), min(br[1],
                                               self.output_size[1]) - ul[1]

                    cc, dd = max(0, ul[0]), min(br[0], self.output_size[0])
                    aa, bb = max(0, ul[1]), min(br[1], self.output_size[1])
                    hms[idx, aa:bb, cc:dd] = np.maximum(
                        hms[idx, aa:bb, cc:dd], g[a:b, c:d])
        return hms

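A minimal usage sketch (hypothetical data: two people with 17 COCO-style keypoints on a 128x128 feature map; each row of `joints` is (x, y, vis)):

generator = HeatmapGenerator(output_size=128, num_joints=17, sigma=2)
joints = np.zeros((2, 17, 3), dtype=np.float32)
joints[0, 0] = (40, 50, 1)   # person 0, keypoint 0 visible at (40, 50)
joints[1, 0] = (90, 30, 1)   # person 1, keypoint 0 visible at (90, 30)
heatmaps = generator(joints)
# heatmaps.shape == (17, 128, 128), one channel per keypoint;
# heatmaps[0] peaks at 1.0 at both annotated locations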
class OffsetGenerator:
    """Generate offset maps for bottom-up models.

    Args:
        num_joints (int): Number of keypoints
        output_size (np.ndarray): Size (w, h) of feature map
        radius (int): Radius of area assigned with valid offset
    """

    def __init__(self, output_size, num_joints, radius=4):
        if not isinstance(output_size, np.ndarray):
            output_size = np.array(output_size)
        if output_size.size > 1:
            assert len(output_size) == 2
            self.output_size = output_size
        else:
            self.output_size = np.array([output_size, output_size],
                                        dtype=int)
        self.num_joints = num_joints
        assert radius > 0, f'`radius` must be a positive value, ' \
            f'but got {radius}'
        self.radius = radius

    def __call__(self, center, joints, area):
        """Generate offset maps."""
        offset_map = np.zeros(
            (self.num_joints * 2, self.output_size[1], self.output_size[0]),
            dtype=np.float32)
        weight_map = np.zeros(
            (self.num_joints * 2, self.output_size[1], self.output_size[0]),
            dtype=np.float32)
        area_map = np.zeros((self.output_size[1], self.output_size[0]),
                            dtype=np.float32)

        for i in range(len(center)):
            x_center, y_center = center[i, 0, 0], center[i, 0, 1]
            if center[i, 0, 2] < 1 or x_center < 0 or y_center < 0 \
                    or x_center >= self.output_size[0] \
                    or y_center >= self.output_size[1]:
                continue

            for j in range(self.num_joints):
                x, y = joints[i, j, :2]
                if joints[i, j, 2] < 1 or x >= self.output_size[0] \
                        or y >= self.output_size[1] or x < 0 or y < 0:
                    continue

                start_x = max(int(x_center - self.radius), 0)
                start_y = max(int(y_center - self.radius), 0)
                end_x = min(int(x_center + self.radius), self.output_size[0])
                end_y = min(int(y_center + self.radius), self.output_size[1])

                for pos_x in range(start_x, end_x):
                    for pos_y in range(start_y, end_y):
                        offset_x = pos_x - x
                        offset_y = pos_y - y
                        # keep the offset of the smaller instance when
                        # two instances overlap at this pixel
                        if offset_map[j * 2, pos_y, pos_x] != 0 \
                                or offset_map[j * 2 + 1, pos_y, pos_x] != 0:
                            if area_map[pos_y, pos_x] < area[i]:
                                continue
                        offset_map[j * 2, pos_y, pos_x] = offset_x
                        offset_map[j * 2 + 1, pos_y, pos_x] = offset_y
                        weight_map[j * 2, pos_y,
                                   pos_x] = 1. / np.sqrt(area[i])
                        weight_map[j * 2 + 1, pos_y,
                                   pos_x] = 1. / np.sqrt(area[i])
                        area_map[pos_y, pos_x] = area[i]

        return offset_map, weight_map

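A hedged example with one person and a single keypoint (`center` and `joints` use the (x, y, vis) layout; `area` scales the loss weight):

gen = OffsetGenerator(output_size=128, num_joints=1, radius=4)
center = np.array([[[64, 64, 2]]], dtype=np.float32)  # [N, 1, 3]
joints = np.array([[[60, 70, 2]]], dtype=np.float32)  # [N, K, 3]
area = np.array([900.], dtype=np.float32)             # [N]
offsets, weights = gen(center, joints, area)
# offsets.shape == (2, 128, 128): a (dx, dy) channel pair per keypoint.
# Within the 4-pixel square around the center, offsets point from the
# pixel to the keypoint and weights equal 1 / sqrt(area):
# offsets[:, 64, 64] == [4., -6.]   (64 - 60, 64 - 70)
# weights[0, 64, 64] == 1 / 30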
class JointsEncoder:
    """Encodes the visible joints into (coordinates, score); The coordinate
    of one joint and its score are of `int` type.

    (idx * output_size**2 + y * output_size + x, 1) or (0, 0).

    Args:
        max_num_people(int): Max number of people in an image
        num_joints(int): Number of keypoints
        output_size(np.ndarray): Size (w, h) of feature map
        tag_per_joint(bool): Option to use one tag map per joint.
    """

    def __init__(self, max_num_people, num_joints, output_size,
                 tag_per_joint):
        self.max_num_people = max_num_people
        self.num_joints = num_joints
        if not isinstance(output_size, np.ndarray):
            output_size = np.array(output_size)
        if output_size.size > 1:
            assert len(output_size) == 2
            self.output_size = output_size
        else:
            self.output_size = np.array([output_size, output_size],
                                        dtype=int)
        self.tag_per_joint = tag_per_joint

    def __call__(self, joints):
        """
        Note:
            - number of people in image: N
            - number of keypoints: K
            - max number of people in an image: M

        Args:
            joints (np.ndarray[N,K,3])

        Returns:
            visible_kpts (np.ndarray[M,K,2]).
        """
        visible_kpts = np.zeros((self.max_num_people, self.num_joints, 2),
                                dtype=np.float32)
        for i in range(len(joints)):
            tot = 0
            for idx, pt in enumerate(joints[i]):
                x, y = int(pt[0]), int(pt[1])
                if (pt[2] > 0 and 0 <= y < self.output_size[1]
                        and 0 <= x < self.output_size[0]):
                    if self.tag_per_joint:
                        visible_kpts[i][tot] = \
                            (idx * self.output_size.prod()
                             + y * self.output_size[0] + x, 1)
                    else:
                        visible_kpts[i][tot] = (y * self.output_size[0] + x,
                                                1)
                    tot += 1
        return visible_kpts

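The encoding is easy to verify by hand (hypothetical numbers): with `tag_per_joint`, a joint of index `idx` visible at (x, y) on a W x H map is stored as idx * W * H + y * W + x, an index into the flattened per-joint tag maps.

encoder = JointsEncoder(
    max_num_people=30, num_joints=17, output_size=128, tag_per_joint=True)
joints = np.zeros((1, 17, 3), dtype=np.float32)
joints[0, 1] = (10, 20, 1)   # keypoint index 1, visible at x=10, y=20
encoded = encoder(joints)
# encoded[0, 0] == [18954., 1.]: 1 * 128 * 128 + 20 * 128 + 10 = 18954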
class PAFGenerator:
    """Generate part affinity fields.

    Args:
        output_size (np.ndarray): Size (w, h) of feature map.
        limb_width (int): Limb width of part affinity fields.
        skeleton (list[list]): connections of joints.
    """

    def __init__(self, output_size, limb_width, skeleton):
        if not isinstance(output_size, np.ndarray):
            output_size = np.array(output_size)
        if output_size.size > 1:
            assert len(output_size) == 2
            self.output_size = output_size
        else:
            self.output_size = np.array([output_size, output_size],
                                        dtype=int)
        self.limb_width = limb_width
        self.skeleton = skeleton

    def _accumulate_paf_map_(self, pafs, src, dst, count):
        """Accumulate part affinity fields between two given joints.

        Args:
            pafs (np.ndarray[2,H,W]): paf maps (2 dimensions:x axis and
                y axis) for a certain limb connection. This argument will
                be modified inplace.
            src (np.ndarray[2,]): coordinates of the source joint.
            dst (np.ndarray[2,]): coordinates of the destination joint.
            count (np.ndarray[H,W]): count map that preserves the number
                of non-zero vectors at each point. This argument will be
                modified inplace.
        """
        limb_vec = dst - src
        norm = np.linalg.norm(limb_vec)
        if norm == 0:
            unit_limb_vec = np.zeros(2)
        else:
            unit_limb_vec = limb_vec / norm

        # bounding region of the limb, clipped to the feature map
        min_x = max(np.floor(min(src[0], dst[0]) - self.limb_width), 0)
        max_x = min(
            np.ceil(max(src[0], dst[0]) + self.limb_width),
            self.output_size[0] - 1)
        min_y = max(np.floor(min(src[1], dst[1]) - self.limb_width), 0)
        max_y = min(
            np.ceil(max(src[1], dst[1]) + self.limb_width),
            self.output_size[1] - 1)

        range_x = list(range(int(min_x), int(max_x + 1), 1))
        range_y = list(range(int(min_y), int(max_y + 1), 1))

        mask = np.zeros_like(count, dtype=bool)
        if len(range_x) > 0 and len(range_y) > 0:
            xx, yy = np.meshgrid(range_x, range_y)
            delta_x = xx - src[0]
            delta_y = yy - src[1]
            # perpendicular distance from each pixel to the limb axis
            dist = np.abs(delta_x * unit_limb_vec[1] -
                          delta_y * unit_limb_vec[0])
            mask_local = (dist < self.limb_width)
            mask[yy, xx] = mask_local

        pafs[0, mask] += unit_limb_vec[0]
        pafs[1, mask] += unit_limb_vec[1]
        count += mask

        return pafs, count

    def __call__(self, joints):
        """Generate the target part affinity fields."""
        pafs = np.zeros(
            (len(self.skeleton) * 2, self.output_size[1],
             self.output_size[0]),
            dtype=np.float32)

        for idx, sk in enumerate(self.skeleton):
            count = np.zeros((self.output_size[1], self.output_size[0]),
                             dtype=np.float32)

            for p in joints:
                src = p[sk[0]]
                dst = p[sk[1]]
                if src[2] > 0 and dst[2] > 0:
                    self._accumulate_paf_map_(pafs[2 * idx:2 * idx + 2],
                                              src[:2], dst[:2], count)

            # average overlapping vectors
            pafs[2 * idx:2 * idx + 2] /= np.maximum(count, 1)

        return pafs

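A hedged sketch with a single horizontal limb: every pixel within `limb_width` of the segment between the two joints receives the unit vector pointing from source to destination.

skeleton = [[0, 1]]
paf_gen = PAFGenerator(output_size=64, limb_width=1, skeleton=skeleton)
joints = np.zeros((1, 2, 3), dtype=np.float32)
joints[0, 0] = (10, 30, 1)   # source joint
joints[0, 1] = (50, 30, 1)   # destination joint on the same row
pafs = paf_gen(joints)
# pafs.shape == (2, 64, 64): one x- and one y-channel for the limb;
# pafs[:, 30, 30] == [1., 0.] on the limb, [0., 0.] far away from it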
@PIPELINES.register_module()
class GetKeypointCenterArea:
    """Compute center and area from keypoints for each instance.

    Required key: 'joints'.

    Modifies key: 'center' and 'area'.

    Args:
        minimal_area (float): Minimum of allowed area. Instance with
            smaller area will be ignored in training. Default: 32.
    """

    def __init__(self, minimal_area=32):
        self.minimal_area = minimal_area

    def __call__(self, results):
        """Compute center and area from keypoints for each instance."""
        center_list = []
        area_list = []
        for joints in results['joints']:
            area = np.zeros((joints.shape[0]), dtype=np.float32)
            center = np.zeros((joints.shape[0], 1, 3), dtype=np.float32)
            for i in range(joints.shape[0]):
                visible_joints = joints[i][joints[i][..., 2] > 0][..., :2]
                if visible_joints.size == 0:
                    continue
                center[i, 0, :2] = visible_joints.mean(axis=0, keepdims=True)
                center[i, 0, 2] = 1
                # area is the squared diagonal of the visible-keypoint box
                area[i] = np.power(
                    visible_joints.max(axis=0) - visible_joints.min(axis=0),
                    2)[:2].sum()
                if area[i] < self.minimal_area:
                    center[i, 0, 2] = 0
            center_list.append(center)
            area_list.append(area)

        results['center'] = center_list
        results['area'] = area_list
        return results

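The center/area rule can be checked in isolation (hypothetical keypoints): the center is the mean of the visible joints and the area is the squared diagonal of their bounding box.

joints = np.array([[[10, 10, 1], [20, 30, 1], [0, 0, 0]]], dtype=np.float32)
visible = joints[0][joints[0][..., 2] > 0][..., :2]
center = visible.mean(axis=0)                        # [15., 20.]
area = ((visible.max(0) - visible.min(0))**2).sum()  # 10**2 + 20**2 = 500
# the instance is kept, since 500 >= minimal_area (32 by default)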
@PIPELINES.register_module()
class BottomUpRandomFlip:
    """Data augmentation with random image flip for bottom-up.

    Args:
        flip_prob (float): Probability of flip.
    """

    def __init__(self, flip_prob=0.5):
        self.flip_prob = flip_prob

    def __call__(self, results):
        """Perform data augmentation with random image flip."""
        image, mask, joints = results['img'], results['mask'], results[
            'joints']
        self.flip_index = results['ann_info']['flip_index']
        self.output_size = results['ann_info']['heatmap_size']

        assert isinstance(mask, list)
        assert isinstance(joints, list)
        assert len(mask) == len(joints)
        assert len(mask) == len(self.output_size)

        if np.random.random() < self.flip_prob:
            image = image[:, ::-1].copy() - np.zeros_like(image)
            for i, _output_size in enumerate(self.output_size):
                if not isinstance(_output_size, np.ndarray):
                    _output_size = np.array(_output_size)
                if _output_size.size > 1:
                    assert len(_output_size) == 2
                else:
                    _output_size = np.array([_output_size, _output_size],
                                            dtype=int)
                mask[i] = mask[i][:, ::-1].copy()
                joints[i] = joints[i][:, self.flip_index]
                joints[i][:, :, 0] = _output_size[0] - joints[i][:, :, 0] - 1
                if i == 0 and 'bboxes' in results:
                    bbox = results['bboxes']
                    bbox = bbox[:, [1, 0, 3, 2]]
                    bbox[:, :, 0] = _output_size[0] - bbox[:, :, 0] - 1
                    results['bboxes'] = bbox
        results['img'], results['mask'], results[
            'joints'] = image, mask, joints
        return results

@PIPELINES.register_module()
class BottomUpRandomAffine:
    """Data augmentation with random scaling & rotating.

    Args:
        rot_factor (int): Rotating to [-rotation_factor, rotation_factor]
        scale_factor (list[float]): Scaling to
            [scale_factor[0], scale_factor[1]]
        scale_type: wrt ``long`` or ``short`` length of the image.
        trans_factor: Translation factor.
        use_udp (bool): To use unbiased data processing.
            Paper ref: Huang et al. The Devil is in the Details: Delving into
            Unbiased Data Processing for Human Pose Estimation (CVPR 2020).
    """

    def __init__(self,
                 rot_factor,
                 scale_factor,
                 scale_type,
                 trans_factor,
                 use_udp=False):
        self.max_rotation = rot_factor
        self.min_scale = scale_factor[0]
        self.max_scale = scale_factor[1]
        self.scale_type = scale_type
        self.trans_factor = trans_factor
        self.use_udp = use_udp

    def _get_scale(self, image_size, resized_size):
        w, h = image_size
        w_resized, h_resized = resized_size
        if w / w_resized < h / h_resized:
            if self.scale_type == 'long':
                w_pad = h / h_resized * w_resized
                h_pad = h
            elif self.scale_type == 'short':
                w_pad = w
                h_pad = w / w_resized * h_resized
            else:
                raise ValueError(f'Unknown scale type: {self.scale_type}')
        else:
            if self.scale_type == 'long':
                w_pad = w
                h_pad = w / w_resized * h_resized
            elif self.scale_type == 'short':
                w_pad = h / h_resized * w_resized
                h_pad = h
            else:
                raise ValueError(f'Unknown scale type: {self.scale_type}')

        scale = np.array([w_pad, h_pad], dtype=np.float32)
        return scale

    def __call__(self, results):
        """Perform data augmentation with random scaling & rotating."""
        image, mask, joints = results['img'], results['mask'], results[
            'joints']

        self.input_size = results['ann_info']['image_size']
        if not isinstance(self.input_size, np.ndarray):
            self.input_size = np.array(self.input_size)
        if self.input_size.size > 1:
            assert len(self.input_size) == 2
        else:
            self.input_size = [self.input_size, self.input_size]
        self.output_size = results['ann_info']['heatmap_size']

        assert isinstance(mask, list)
        assert isinstance(joints, list)
        assert len(mask) == len(joints)
        assert len(mask) == len(self.output_size), (len(mask),
                                                    len(self.output_size),
                                                    self.output_size)

        height, width = image.shape[:2]
        if self.use_udp:
            center = np.array(((width - 1.0) / 2, (height - 1.0) / 2))
        else:
            center = np.array((width / 2, height / 2))

        img_scale = np.array([width, height], dtype=np.float32)
        aug_scale = np.random.random() * (self.max_scale - self.min_scale) \
            + self.min_scale
        img_scale *= aug_scale
        aug_rot = (np.random.random() * 2 - 1) * self.max_rotation

        if self.trans_factor > 0:
            dx = np.random.randint(-self.trans_factor * img_scale[0] / 200.0,
                                   self.trans_factor * img_scale[0] / 200.0)
            dy = np.random.randint(-self.trans_factor * img_scale[1] / 200.0,
                                   self.trans_factor * img_scale[1] / 200.0)
            center[0] += dx
            center[1] += dy
        if self.use_udp:
            for i, _output_size in enumerate(self.output_size):
                if not isinstance(_output_size, np.ndarray):
                    _output_size = np.array(_output_size)
                if _output_size.size > 1:
                    assert len(_output_size) == 2
                else:
                    _output_size = [_output_size, _output_size]

                scale = self._get_scale(img_scale, _output_size)

                trans = get_warp_matrix(
                    theta=aug_rot,
                    size_input=center * 2.0,
                    size_dst=np.array(
                        (_output_size[0], _output_size[1]),
                        dtype=np.float32) - 1.0,
                    size_target=scale)
                mask[i] = cv2.warpAffine(
                    (mask[i] * 255).astype(np.uint8),
                    trans, (int(_output_size[0]), int(_output_size[1])),
                    flags=cv2.INTER_LINEAR) / 255
                mask[i] = (mask[i] > 0.5).astype(np.float32)
                joints[i][:, :, 0:2] = \
                    warp_affine_joints(joints[i][:, :, 0:2].copy(), trans)
                if results['ann_info']['scale_aware_sigma']:
                    joints[i][:, :, 3] = joints[i][:, :, 3] / aug_scale
            scale = self._get_scale(img_scale, self.input_size)
            mat_input = get_warp_matrix(
                theta=aug_rot,
                size_input=center * 2.0,
                size_dst=np.array((self.input_size[0], self.input_size[1]),
                                  dtype=np.float32) - 1.0,
                size_target=scale)
            image = cv2.warpAffine(
                image,
                mat_input,
                (int(self.input_size[0]), int(self.input_size[1])),
                flags=cv2.INTER_LINEAR)
        else:
            for i, _output_size in enumerate(self.output_size):
                if not isinstance(_output_size, np.ndarray):
                    _output_size = np.array(_output_size)
                if _output_size.size > 1:
                    assert len(_output_size) == 2
                else:
                    _output_size = [_output_size, _output_size]
                scale = self._get_scale(img_scale, _output_size)
                mat_output = get_affine_transform(
                    center=center,
                    scale=scale / 200.0,
                    rot=aug_rot,
                    output_size=_output_size)
                mask[i] = cv2.warpAffine(
                    (mask[i] * 255).astype(np.uint8), mat_output,
                    (int(_output_size[0]), int(_output_size[1]))) / 255
                mask[i] = (mask[i] > 0.5).astype(np.float32)

                joints[i][:, :, 0:2] = \
                    warp_affine_joints(joints[i][:, :, 0:2], mat_output)
                if results['ann_info']['scale_aware_sigma']:
                    joints[i][:, :, 3] = joints[i][:, :, 3] / aug_scale
                if i == 0 and 'bboxes' in results:
                    results['bboxes'] = warp_affine_joints(
                        results['bboxes'], mat_output)

            scale = self._get_scale(img_scale, self.input_size)
            mat_input = get_affine_transform(
                center=center,
                scale=scale / 200.0,
                rot=aug_rot,
                output_size=self.input_size)
            image = cv2.warpAffine(
                image, mat_input,
                (int(self.input_size[0]), int(self.input_size[1])))

        results['img'], results['mask'], results[
            'joints'] = image, mask, joints

        return results

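A hedged sketch of how this transform is typically configured in a bottom-up training pipeline (the numbers mirror common mmpose configs but are assumptions here):

random_affine = dict(
    type='BottomUpRandomAffine',
    rot_factor=30,              # rotate within [-30, 30] degrees
    scale_factor=[0.75, 1.5],   # sample the scale from [0.75, 1.5]
    scale_type='short',
    trans_factor=40)            # random translation of the center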
@PIPELINES.register_module()
class BottomUpGenerateHeatmapTarget:
    """Generate multi-scale heatmap target for bottom-up.

    Required key: 'joints', 'mask' and 'center'.

    Modifies key: 'target', 'heatmaps' and 'masks'.

    Args:
        sigma (int or tuple): Sigma of heatmap Gaussian. If sigma is a
            tuple, the first item should be the sigma of keypoints and the
            second item should be the sigma of center.
        bg_weight (float): Weight for background. Default: 1.0.
        gen_center_heatmap (bool): Whether to generate heatmaps for instance
            centers. Default: False.
        use_udp (bool): To use unbiased data processing.
            Paper ref: Huang et al. The Devil is in the Details: Delving into
            Unbiased Data Processing for Human Pose Estimation (CVPR 2020).
    """

    def __init__(self,
                 sigma,
                 bg_weight=1.0,
                 gen_center_heatmap=False,
                 use_udp=False):
        if isinstance(sigma, int):
            sigma = (sigma, )
        if gen_center_heatmap:
            assert len(sigma) == 2, 'sigma for centers must be given if ' \
                '`gen_center_heatmap` is True. e.g. sigma=(2, 4)'
        self.sigma = sigma
        self.bg_weight = bg_weight
        self.gen_center_heatmap = gen_center_heatmap
        self.use_udp = use_udp

    def _generate(self, num_joints, sigma, heatmap_size):
        """Get heatmap generator."""
        heatmap_generator = [
            HeatmapGenerator(output_size, num_joints, sigma, self.use_udp)
            for output_size in heatmap_size
        ]
        return heatmap_generator

    def __call__(self, results):
        """Generate multi-scale heatmap target for bottom-up."""
        target_list = list()
        joints_list = results['joints']
        mask_list = results['mask']
        output_mask_list = []

        heatmap_generator = \
            self._generate(results['ann_info']['num_joints'], self.sigma[0],
                           results['ann_info']['heatmap_size'])
        for scale_id in range(results['ann_info']['num_scales']):
            heatmaps = heatmap_generator[scale_id](joints_list[scale_id])
            target_list.append(heatmaps.astype(np.float32))

            if self.bg_weight != 1:
                mask = mask_list[scale_id].copy().astype(np.float32)
                mask = mask[None, ...].repeat(heatmaps.shape[0], axis=0)
                mask = mask * self.bg_weight
                mask[np.logical_and(heatmaps > 0, mask > 0)] = 1
                output_mask_list.append(mask)

        if self.gen_center_heatmap:
            center_list = results['center']

            heatmap_generator = self._generate(
                1, self.sigma[1], results['ann_info']['heatmap_size'])
            for scale_id in range(results['ann_info']['num_scales']):
                heatmaps = heatmap_generator[scale_id](
                    center_list[scale_id]).astype(np.float32)
                target_list[scale_id] = np.concatenate(
                    (heatmaps, target_list[scale_id]), axis=0)

                if self.bg_weight != 1:
                    mask = mask_list[scale_id].copy().astype(np.float32)
                    mask = mask[None, ...] * self.bg_weight
                    mask[np.logical_and(heatmaps > 0, mask > 0)] = 1
                    output_mask_list[scale_id] = np.concatenate(
                        (mask, output_mask_list[scale_id]), axis=0)

        results['target'] = target_list
        results['heatmaps'] = target_list
        results['masks'] = output_mask_list
        return results

@PIPELINES.register_module()
class BottomUpGenerateOffsetTarget:
    """Generate multi-scale offset target for bottom-up.

    Required key: 'center', 'joints' and 'area'.

    Modifies key: 'offsets', 'offset_weights'.

    Args:
        radius (int): Radius of labeled area for each instance.
    """

    def __init__(self, radius=4):
        self.radius = radius

    def _generate(self, num_joints, heatmap_size):
        """Get offset generator."""
        offset_generator = [
            OffsetGenerator(output_size, num_joints, self.radius)
            for output_size in heatmap_size
        ]
        return offset_generator

    def __call__(self, results):
        """Generate multi-scale offset target for bottom-up."""
        target_list = list()
        weight_list = list()
        center_list = results['center']
        joints_list = results['joints']
        area_list = results['area']

        offset_generator = self._generate(results['ann_info']['num_joints'],
                                          results['ann_info']['heatmap_size'])
        for scale_id in range(results['ann_info']['num_scales']):
            offset, offset_weight = offset_generator[scale_id](
                center_list[scale_id], joints_list[scale_id],
                area_list[scale_id])
            target_list.append(offset.astype(np.float32))
            weight_list.append(offset_weight)
        results['offsets'] = target_list
        results['offset_weights'] = weight_list
        return results

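Together with ``GetKeypointCenterArea``, the two target generators above form the target-preparation stage of a DEKR-style training pipeline; a hedged sketch with illustrative values:

dekr_target_steps = [
    dict(type='GetKeypointCenterArea'),
    dict(
        type='BottomUpGenerateHeatmapTarget',
        sigma=(2, 4),               # keypoint sigma, center sigma
        gen_center_heatmap=True,
        bg_weight=0.1),
    dict(type='BottomUpGenerateOffsetTarget', radius=4),
]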
@PIPELINES.register_module()
class BottomUpGenerateTarget:
    """Generate multi-scale heatmap target for associative embedding.

    Args:
        sigma (int): Sigma of heatmap Gaussian
        max_num_people (int): Maximum number of people in an image
        use_udp (bool): To use unbiased data processing.
            Paper ref: Huang et al. The Devil is in the Details: Delving into
            Unbiased Data Processing for Human Pose Estimation (CVPR 2020).
    """

    def __init__(self, sigma, max_num_people, use_udp=False):
        self.sigma = sigma
        self.max_num_people = max_num_people
        self.use_udp = use_udp

    def _generate(self, num_joints, heatmap_size):
        """Get heatmap generator and joint encoder."""
        heatmap_generator = [
            HeatmapGenerator(output_size, num_joints, self.sigma,
                             self.use_udp) for output_size in heatmap_size
        ]
        joints_encoder = [
            JointsEncoder(self.max_num_people, num_joints, output_size, True)
            for output_size in heatmap_size
        ]
        return heatmap_generator, joints_encoder

    def __call__(self, results):
        """Generate multi-scale heatmap target for bottom-up."""
        heatmap_generator, joints_encoder = \
            self._generate(results['ann_info']['num_joints'],
                           results['ann_info']['heatmap_size'])
        target_list = list()
        mask_list, joints_list = results['mask'], results['joints']

        for scale_id in range(results['ann_info']['num_scales']):
            target_t = heatmap_generator[scale_id](joints_list[scale_id])
            joints_t = joints_encoder[scale_id](joints_list[scale_id])

            target_list.append(target_t.astype(np.float32))
            mask_list[scale_id] = mask_list[scale_id].astype(np.float32)
            joints_list[scale_id] = joints_t.astype(np.int32)

        results['masks'], results['joints'] = mask_list, joints_list
        results['targets'] = target_list

        return results

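A hedged sketch of the corresponding entry in an associative-embedding training pipeline (the values mirror common mmpose configs and are assumptions here):

ae_target = dict(type='BottomUpGenerateTarget', sigma=2, max_num_people=30)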
@PIPELINES.register_module()
class CIDGenerateTarget:
    """Generate target for CID training.

    Args:
        max_num_people (int): Maximum number of people in an image
    """

    def __init__(self, max_num_people):
        self.max_num_people = max_num_people

    def gaussian2D(self, shape, sigma=1.):
        m, n = [(ss - 1.) / 2. for ss in shape]
        y, x = np.ogrid[-m:m + 1, -n:n + 1]

        h = np.exp(-(x * x + y * y) / (2 * sigma * sigma))
        h[h < np.finfo(h.dtype).eps * h.max()] = 0
        return h

    def gaussian_radius(self, det_size, min_overlap=0.7):
        height, width = det_size

        a1 = 1
        b1 = (height + width)
        c1 = width * height * (1 - min_overlap) / (1 + min_overlap)
        sq1 = np.sqrt(b1**2 - 4 * a1 * c1)
        r1 = (b1 + sq1) / 2

        a2 = 4
        b2 = 2 * (height + width)
        c2 = (1 - min_overlap) * width * height
        sq2 = np.sqrt(b2**2 - 4 * a2 * c2)
        r2 = (b2 + sq2) / 2

        a3 = 4 * min_overlap
        b3 = -2 * min_overlap * (height + width)
        c3 = (min_overlap - 1) * width * height
        sq3 = np.sqrt(b3**2 - 4 * a3 * c3)
        r3 = (b3 + sq3) / 2
        return min(r1, r2, r3)

    def heatmap_generator(self, output_size, keypoints, bboxes):
        num_keypoints = keypoints.shape[1]
        hms = np.zeros((num_keypoints, output_size, output_size),
                       dtype=np.float32)
        mask = np.ones((num_keypoints, 1, 1), dtype=np.float32)

        for kpt, bbox in zip(keypoints, bboxes):
            bbox = np.clip(bbox, 0, output_size - 1)
            h = np.sqrt(
                np.power(bbox[2, 0] - bbox[0, 0], 2) +
                np.power(bbox[2, 1] - bbox[0, 1], 2))
            w = np.sqrt(
                np.power(bbox[1, 0] - bbox[0, 0], 2) +
                np.power(bbox[1, 1] - bbox[0, 1], 2))
            radius = self.gaussian_radius((math.ceil(h), math.ceil(w)))
            radius = max(0, int(radius))
            diameter = 2 * radius + 1
            gaussian = self.gaussian2D((diameter, diameter),
                                       sigma=diameter / 6)

            height, width = output_size, output_size
            for idx, pt in enumerate(kpt):
                if pt[2] > 0:
                    x, y = int(pt[0]), int(pt[1])
                    if x < 0 or y < 0 or x >= output_size \
                            or y >= output_size:
                        continue

                    left, right = min(x, radius), min(width - x, radius + 1)
                    top, bottom = min(y, radius), min(height - y, radius + 1)

                    masked_heatmap = hms[idx][y - top:y + bottom,
                                              x - left:x + right]
                    masked_gaussian = gaussian[radius - top:radius + bottom,
                                               radius - left:radius + right]
                    if min(masked_gaussian.shape) > 0 and min(
                            masked_heatmap.shape) > 0:
                        np.maximum(
                            masked_heatmap,
                            masked_gaussian,
                            out=masked_heatmap)
                else:
                    mask[idx] = 0.0

        return hms, mask

    def __call__(self, results):
        """Generate global heatmap and instance heatmap target for CID."""
        num_keypoints = results['ann_info']['num_joints']
        output_size = results['ann_info']['heatmap_size'][0]
        keypoints = results['joints'][0]
        bboxes = results['bboxes']
        areas = results['areas']
        mask = results['mask'][0]
        num_people = keypoints.shape[0]

        # get center
        centers = np.zeros((num_people, 1, 3))
        for i in range(num_people):
            vis = (keypoints[i, :, 2:3] > 0).astype(np.float32)
            keypoints_sum = np.sum(keypoints[i, :, :2] * vis, axis=0)
            num_vis_keypoints = len(np.nonzero(keypoints[i, :, 2])[0])
            if num_vis_keypoints <= 0:
                centers[i, 0, 2] = 0
                continue
            centers[i, 0, :2] = keypoints_sum / num_vis_keypoints
            centers[i, 0, 2] = 2

        keypoints_with_centers = np.concatenate((keypoints, centers), axis=1)
        heatmap_with_center, _ = self.heatmap_generator(
            output_size, keypoints_with_centers, bboxes)

        # generate instance heatmap target
        ind_vis = []
        area_idx = np.argsort(areas.squeeze())
        inst_coords, inst_heatmaps, inst_masks = [], [], []
        for i in area_idx:
            inst_coord = []
            center = centers[i, 0]
            if center[2] < 1:
                continue
            x, y = int(center[0]), int(center[1])
            if x < 0 or x >= output_size or y < 0 or y >= output_size:
                continue
            # rand center point in 3x3 grid
            new_x = x + random.choice([-1, 0, 1])
            new_y = y + random.choice([-1, 0, 1])
            if new_x < 0 or new_x >= output_size or \
                    new_y < 0 or new_y >= output_size:
                new_x = x
                new_y = y
            x, y = new_x, new_y
            if [y, x] in ind_vis:
                continue
            inst_coord.append([y, x])
            ind_vis.append([y, x])
            inst_coords.append(np.array(inst_coord))
            inst_heatmap, inst_mask = self.heatmap_generator(
                output_size, keypoints[i:i + 1, :, :], bboxes[i:i + 1, :, :])
            inst_heatmaps.append(inst_heatmap[None, :, :, :])
            inst_masks.append(inst_mask[None, :, :, :])

        results['multi_heatmap'] = heatmap_with_center
        results['multi_mask'] = mask[None, :, :]

        # pad instance targets for batching
        instance_coord = np.zeros((self.max_num_people, 2), dtype=int)
        instance_heatmap = np.zeros(
            (self.max_num_people, num_keypoints, output_size, output_size),
            dtype=np.float32)
        instance_mask = np.zeros((self.max_num_people, num_keypoints, 1, 1),
                                 dtype=np.float32)
        instance_valid = np.zeros((self.max_num_people), dtype=int)
        if len(inst_coords) > 0:
            idx_list = list(range(len(inst_coords)))
            random.shuffle(idx_list)
            for i, idx in enumerate(idx_list):
                if idx >= self.max_num_people:
                    continue
                instance_coord[i] = inst_coords[idx]
                instance_heatmap[i] = inst_heatmaps[idx]
                instance_mask[i] = inst_masks[idx]
                instance_valid[i] = 1
        results['instance_coord'] = instance_coord
        results['instance_heatmap'] = instance_heatmap
        results['instance_mask'] = instance_mask
        results['instance_valid'] = instance_valid

        return results

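The radius rule is the CornerNet-style derivation: the three quadratic cases bound how far a corner may shift while keeping at least `min_overlap` IoU with the original box. A hedged numeric check:

cid = CIDGenerateTarget(max_num_people=30)
r = cid.gaussian_radius((20, 20))  # min(r1, r2, r3) ~= 5.47 for a 20x20 box
# int(r) == 5, so the drawn Gaussian has diameter 2 * 5 + 1 = 11
g = cid.gaussian2D((11, 11), sigma=11 / 6)
# g.shape == (11, 11), with a peak of 1.0 at the center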
@PIPELINES.register_module()
class BottomUpGeneratePAFTarget:
    """Generate multi-scale heatmaps and part affinity fields (PAF) target
    for bottom-up. Paper ref: Cao et al. Realtime Multi-Person 2D Human Pose
    Estimation using Part Affinity Fields (CVPR 2017).

    Args:
        limb_width (int): Limb width of part affinity fields.
        skeleton (list[list], optional): Connections of joints. If None,
            the skeleton from 'ann_info' is used. Default: None.
    """

    def __init__(self, limb_width, skeleton=None):
        self.limb_width = limb_width
        self.skeleton = skeleton

    def _generate(self, heatmap_size, skeleton):
        """Get PAF generator."""
        paf_generator = [
            PAFGenerator(output_size, self.limb_width, skeleton)
            for output_size in heatmap_size
        ]
        return paf_generator

    def __call__(self, results):
        """Generate multi-scale part affinity fields for bottom-up."""
        if self.skeleton is None:
            assert results['ann_info']['skeleton'] is not None
            self.skeleton = results['ann_info']['skeleton']

        paf_generator = \
            self._generate(results['ann_info']['heatmap_size'],
                           self.skeleton)
        target_list = list()
        joints_list = results['joints']

        for scale_id in range(results['ann_info']['num_scales']):
            pafs = paf_generator[scale_id](joints_list[scale_id])
            target_list.append(pafs.astype(np.float32))

        results['target'] = target_list

        return results

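A hedged pipeline entry (the skeleton usually comes from `results['ann_info']` and can be omitted; the `limb_width` value is an assumption):

paf_target = dict(type='BottomUpGeneratePAFTarget', limb_width=1)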
@PIPELINES.register_module()
class BottomUpGetImgSize:
    """Get multi-scale image sizes for bottom-up, including base_size and
    test_scale_factor. The aspect ratio is kept, and the image is resized to
    `results['ann_info']['image_size']` times `current_scale`.

    Args:
        test_scale_factor (List[float]): Multi-scale factors for testing.
        current_scale (int): Current scale. Default: 1.
        base_length (int): The width and height should be multiples of
            base_length. Default: 64.
        use_udp (bool): To use unbiased data processing.
            Paper ref: Huang et al. The Devil is in the Details: Delving into
            Unbiased Data Processing for Human Pose Estimation (CVPR 2020).
    """

    def __init__(self,
                 test_scale_factor,
                 current_scale=1,
                 base_length=64,
                 use_udp=False):
        self.test_scale_factor = test_scale_factor
        self.min_scale = min(test_scale_factor)
        self.current_scale = current_scale
        self.base_length = base_length
        self.use_udp = use_udp

    def __call__(self, results):
        """Get multi-scale image sizes for bottom-up."""
        input_size = results['ann_info']['image_size']
        if not isinstance(input_size, np.ndarray):
            input_size = np.array(input_size)
        if input_size.size > 1:
            assert len(input_size) == 2
        else:
            input_size = np.array([input_size, input_size], dtype=int)
        img = results['img']

        base_size, center, scale = _get_multi_scale_size(
            img, input_size, self.current_scale, self.min_scale,
            self.base_length, self.use_udp)

        results['ann_info']['test_scale_factor'] = self.test_scale_factor
        results['ann_info']['base_size'] = base_size
        results['ann_info']['center'] = center
        results['ann_info']['scale'] = scale

        return results

@PIPELINES.register_module()
class BottomUpResizeAlign:
    """Resize multi-scale size and align transform for bottom-up.

    Args:
        transforms (List): ToTensor & Normalize
        base_length (int): The width and height should be multiples of
            base_length. Default: 64.
        use_udp (bool): To use unbiased data processing.
            Paper ref: Huang et al. The Devil is in the Details: Delving into
            Unbiased Data Processing for Human Pose Estimation (CVPR 2020).
    """

    def __init__(self, transforms, base_length=64, use_udp=False):
        self.transforms = Compose(transforms)
        self.base_length = base_length
        if use_udp:
            self._resize_align_multi_scale = _resize_align_multi_scale_udp
        else:
            self._resize_align_multi_scale = _resize_align_multi_scale

    def __call__(self, results):
        """Resize multi-scale size and align transform for bottom-up."""
        input_size = results['ann_info']['image_size']
        if not isinstance(input_size, np.ndarray):
            input_size = np.array(input_size)
        if input_size.size > 1:
            assert len(input_size) == 2
        else:
            input_size = np.array([input_size, input_size], dtype=int)
        test_scale_factor = results['ann_info']['test_scale_factor']
        aug_data = []

        for _, s in enumerate(sorted(test_scale_factor, reverse=True)):
            _results = results.copy()
            image_resized, _, _ = self._resize_align_multi_scale(
                _results['img'], input_size, s, min(test_scale_factor),
                self.base_length)
            _results['img'] = image_resized
            _results = self.transforms(_results)
            transformed_img = _results['img'].unsqueeze(0)
            aug_data.append(transformed_img)

        results['ann_info']['aug_data'] = aug_data

        return results

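At test time the two steps above are chained: ``BottomUpGetImgSize`` records the base size, center and scale, then ``BottomUpResizeAlign`` produces one normalized tensor per test scale. A hedged sketch (the transform list and scale factors are illustrative, not from a specific config):

test_steps = [
    dict(type='BottomUpGetImgSize', test_scale_factor=[0.5, 1, 2]),
    dict(
        type='BottomUpResizeAlign',
        transforms=[
            dict(type='ToTensor'),
            dict(
                type='NormalizeTensor',
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]),
        ]),
]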