
Source code for mmpose.datasets.datasets.top_down.topdown_posetrack18_video_dataset

# Copyright (c) OpenMMLab. All rights reserved.
import os
import os.path as osp
import tempfile
from collections import OrderedDict, defaultdict

import json_tricks as json
import numpy as np
from mmcv import deprecated_api_warning

from ....core.post_processing import oks_nms, soft_oks_nms
from ...builder import DATASETS
from ..base import Kpt2dSviewRgbVidTopDownDataset

try:
    from poseval import eval_helpers
    from poseval.evaluateAP import evaluateAP
    has_poseval = True
except (ImportError, ModuleNotFoundError):
    has_poseval = False
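
# Note (editorial comment, not in the original module): ``poseval`` is an
# optional dependency. The dataset can be built and iterated without it;
# only ``evaluate()`` requires it and raises an ImportError pointing to
# requirements/optional.txt when it is missing.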


@DATASETS.register_module()
class TopDownPoseTrack18VideoDataset(Kpt2dSviewRgbVidTopDownDataset):
    """PoseTrack18 dataset for top-down pose estimation.

    "Posetrack: A benchmark for human pose estimation and tracking",
    CVPR'2018. More details can be found in the
    `paper <https://arxiv.org/abs/1710.10000>`__ .

    The dataset loads raw features and applies specified transforms to
    return a dict containing the image tensors and other information.

    PoseTrack2018 keypoint indexes::

        0: 'nose',
        1: 'head_bottom',
        2: 'head_top',
        3: 'left_ear',
        4: 'right_ear',
        5: 'left_shoulder',
        6: 'right_shoulder',
        7: 'left_elbow',
        8: 'right_elbow',
        9: 'left_wrist',
        10: 'right_wrist',
        11: 'left_hip',
        12: 'right_hip',
        13: 'left_knee',
        14: 'right_knee',
        15: 'left_ankle',
        16: 'right_ankle'

    Args:
        ann_file (str): Path to the annotation file.
        img_prefix (str): Path to a directory where videos/images are held.
            Default: None.
        data_cfg (dict): Data config.
        pipeline (list[dict | callable]): A sequence of data transforms.
        dataset_info (DatasetInfo): A class containing all dataset info.
        test_mode (bool): Store True when building test or validation
            dataset. Default: False.
        ph_fill_len (int): The length of the placeholder to fill in the
            image filenames. Default: 6 in PoseTrack18.
    """

    def __init__(self,
                 ann_file,
                 img_prefix,
                 data_cfg,
                 pipeline,
                 dataset_info=None,
                 test_mode=False,
                 ph_fill_len=6):
        super().__init__(
            ann_file,
            img_prefix,
            data_cfg,
            pipeline,
            dataset_info=dataset_info,
            test_mode=test_mode)

        self.use_gt_bbox = data_cfg['use_gt_bbox']
        self.bbox_file = data_cfg['bbox_file']
        self.det_bbox_thr = data_cfg.get('det_bbox_thr', 0.0)
        self.use_nms = data_cfg.get('use_nms', True)
        self.soft_nms = data_cfg['soft_nms']
        self.nms_thr = data_cfg['nms_thr']
        self.oks_thr = data_cfg['oks_thr']
        self.vis_thr = data_cfg['vis_thr']
        self.frame_weight_train = data_cfg['frame_weight_train']
        self.frame_weight_test = data_cfg['frame_weight_test']
        self.frame_weight = self.frame_weight_test \
            if self.test_mode else self.frame_weight_train

        self.ph_fill_len = ph_fill_len

        # select the frame indices
        self.frame_index_rand = data_cfg.get('frame_index_rand', True)
        self.frame_index_range = data_cfg.get('frame_index_range', [-2, 2])
        self.num_adj_frames = data_cfg.get('num_adj_frames', 1)
        self.frame_indices_train = data_cfg.get('frame_indices_train', None)
        self.frame_indices_test = data_cfg.get('frame_indices_test',
                                               [-2, -1, 0, 1, 2])

        if self.frame_indices_train is not None:
            self.frame_indices_train.sort()
        self.frame_indices_test.sort()

        self.db = self._get_db()

        print(f'=> num_images: {self.num_images}')
        print(f'=> load {len(self.db)} samples')

    def _get_db(self):
        """Load dataset."""
        if (not self.test_mode) or self.use_gt_bbox:
            # use ground truth bbox
            gt_db = self._load_coco_keypoint_annotations()
        else:
            # use bbox from detection
            gt_db = self._load_posetrack_person_detection_results()
        return gt_db

    def _load_coco_keypoint_annotations(self):
        """Ground truth bbox and keypoints."""
        gt_db = []
        for img_id in self.img_ids:
            gt_db.extend(self._load_coco_keypoint_annotation_kernel(img_id))
        return gt_db

    def _load_coco_keypoint_annotation_kernel(self, img_id):
        """Load annotation from COCOAPI.

        Note:
            bbox: [x1, y1, w, h]

        Args:
            img_id: coco image id

        Returns:
            list[dict]: db entries
        """
        img_ann = self.coco.loadImgs(img_id)[0]
        width = img_ann['width']
        height = img_ann['height']
        num_joints = self.ann_info['num_joints']
        file_name = img_ann['file_name']
        nframes = int(img_ann['nframes'])
        frame_id = int(img_ann['frame_id'])

        ann_ids = self.coco.getAnnIds(imgIds=img_id, iscrowd=False)
        objs = self.coco.loadAnns(ann_ids)

        # sanitize bboxes
        valid_objs = []
        for obj in objs:
            if 'bbox' not in obj:
                continue
            x, y, w, h = obj['bbox']
            x1 = max(0, x)
            y1 = max(0, y)
            x2 = min(width - 1, x1 + max(0, w))
            y2 = min(height - 1, y1 + max(0, h))
            if ('area' not in obj or obj['area'] > 0) and x2 > x1 and y2 > y1:
                obj['clean_bbox'] = [x1, y1, x2 - x1, y2 - y1]
                valid_objs.append(obj)
        objs = valid_objs

        bbox_id = 0
        rec = []
        for obj in objs:
            if 'keypoints' not in obj:
                continue
            if max(obj['keypoints']) == 0:
                continue
            if 'num_keypoints' in obj and obj['num_keypoints'] == 0:
                continue
            joints_3d = np.zeros((num_joints, 3), dtype=np.float32)
            joints_3d_visible = np.zeros((num_joints, 3), dtype=np.float32)

            keypoints = np.array(obj['keypoints']).reshape(-1, 3)
            joints_3d[:, :2] = keypoints[:, :2]
            joints_3d_visible[:, :2] = np.minimum(1, keypoints[:, 2:3])

            image_files = []
            cur_image_file = osp.join(self.img_prefix, self.id2name[img_id])
            image_files.append(cur_image_file)

            # "images/val/012834_mpii_test/000000.jpg" -->> "000000.jpg"
            cur_image_name = file_name.split('/')[-1]
            ref_idx = int(cur_image_name.replace('.jpg', ''))

            # select the frame indices
            if not self.test_mode and self.frame_indices_train is not None:
                indices = self.frame_indices_train
            elif not self.test_mode and self.frame_index_rand:
                low, high = self.frame_index_range
                indices = np.random.randint(low, high + 1,
                                            self.num_adj_frames)
            else:
                indices = self.frame_indices_test

            for index in indices:
                if self.test_mode and index == 0:
                    continue
                # the supporting frame index
                support_idx = ref_idx + index
                # clip the frame index so that it does not exceed
                # the bounds of the valid frame indices
                support_idx = np.clip(support_idx, 0, nframes - 1)
                sup_image_file = cur_image_file.replace(
                    cur_image_name,
                    str(support_idx).zfill(self.ph_fill_len) + '.jpg')
                image_files.append(sup_image_file)

            rec.append({
                'image_file': image_files,
                'bbox': obj['clean_bbox'][:4],
                'rotation': 0,
                'joints_3d': joints_3d,
                'joints_3d_visible': joints_3d_visible,
                'dataset': self.dataset_name,
                'bbox_score': 1,
                'bbox_id': bbox_id,
                'nframes': nframes,
                'frame_id': frame_id,
                'frame_weight': self.frame_weight
            })
            bbox_id = bbox_id + 1

        return rec

    def _load_posetrack_person_detection_results(self):
        """Load PoseTrack person detection results.

        Only used in test mode.
        """
        num_joints = self.ann_info['num_joints']
        all_boxes = None
        with open(self.bbox_file, 'r') as f:
            all_boxes = json.load(f)

        if not all_boxes:
            raise ValueError('=> Load %s fail!' % self.bbox_file)

        print(f'=> Total boxes: {len(all_boxes)}')

        kpt_db = []
        bbox_id = 0
        for det_res in all_boxes:
            if det_res['category_id'] != 1:
                continue

            score = det_res['score']
            if score < self.det_bbox_thr:
                continue

            box = det_res['bbox']

            # deal with different bbox file formats
            if 'nframes' in det_res and 'frame_id' in det_res:
                nframes = int(det_res['nframes'])
                frame_id = int(det_res['frame_id'])
            elif 'image_name' in det_res:
                img_id = self.name2id[det_res['image_name']]
                img_ann = self.coco.loadImgs(img_id)[0]
                nframes = int(img_ann['nframes'])
                frame_id = int(img_ann['frame_id'])
            else:
                img_id = det_res['image_id']
                img_ann = self.coco.loadImgs(img_id)[0]
                nframes = int(img_ann['nframes'])
                frame_id = int(img_ann['frame_id'])

            image_files = []
            if 'image_name' in det_res:
                file_name = det_res['image_name']
            else:
                file_name = self.id2name[det_res['image_id']]

            cur_image_file = osp.join(self.img_prefix, file_name)
            image_files.append(cur_image_file)

            # "images/val/012834_mpii_test/000000.jpg" -->> "000000.jpg"
            cur_image_name = file_name.split('/')[-1]
            ref_idx = int(cur_image_name.replace('.jpg', ''))

            indices = self.frame_indices_test
            for index in indices:
                if self.test_mode and index == 0:
                    continue
                # the supporting frame index
                support_idx = ref_idx + index
                # clip the frame index so that it does not exceed
                # the bounds of the valid frame indices
                support_idx = np.clip(support_idx, 0, nframes - 1)
                sup_image_file = cur_image_file.replace(
                    cur_image_name,
                    str(support_idx).zfill(self.ph_fill_len) + '.jpg')
                image_files.append(sup_image_file)

            joints_3d = np.zeros((num_joints, 3), dtype=np.float32)
            joints_3d_visible = np.ones((num_joints, 3), dtype=np.float32)
            kpt_db.append({
                'image_file': image_files,
                'rotation': 0,
                'bbox': box[:4],
                'bbox_score': score,
                'dataset': self.dataset_name,
                'joints_3d': joints_3d,
                'joints_3d_visible': joints_3d_visible,
                'bbox_id': bbox_id,
                'nframes': nframes,
                'frame_id': frame_id,
                'frame_weight': self.frame_weight
            })
            bbox_id = bbox_id + 1
        print(f'=> Total boxes after filter '
              f'low score@{self.det_bbox_thr}: {bbox_id}')

        return kpt_db
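
    # Illustrative note (editorial, not in the original module): with
    # frame_indices_test=[-2, -1, 0, 1, 2], a reference frame
    # "images/val/012834_mpii_test/000003.jpg" in a 100-frame sequence
    # yields the supporting frames 000001.jpg, 000002.jpg, 000004.jpg and
    # 000005.jpg. Index 0 is skipped in test mode because it is the
    # reference frame itself, and out-of-range indices are clipped to
    # [0, nframes - 1] before the zero-padded filename is rebuilt.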
    @deprecated_api_warning(name_dict=dict(outputs='results'))
    def evaluate(self, results, res_folder=None, metric='mAP', **kwargs):
        """Evaluate posetrack keypoint results. The pose prediction results
        will be saved in ``${res_folder}/result_keypoints.json``.

        Note:
            - num_keypoints: K

        Args:
            results (list[dict]): Testing results containing the
                following items:

                - preds (np.ndarray[N,K,3]): The first two dimensions are \
                    coordinates, score is the third dimension of the array.
                - boxes (np.ndarray[N,6]): [center[0], center[1], scale[0], \
                    scale[1], area, score]
                - image_paths (list[str]): For example, ['val/010016_mpii_test\
                    /000024.jpg']
                - heatmap (np.ndarray[N, K, H, W]): model output heatmap.
                - bbox_id (list(int))
            res_folder (str, optional): The folder to save the testing
                results. If not specified, a temp folder will be created.
                Default: None.
            metric (str | list[str]): Metric to be performed. Default: 'mAP'.

        Returns:
            dict: Evaluation results for evaluation metric.
        """
        metrics = metric if isinstance(metric, list) else [metric]
        allowed_metrics = ['mAP']
        for metric in metrics:
            if metric not in allowed_metrics:
                raise KeyError(f'metric {metric} is not supported')

        if res_folder is not None:
            tmp_folder = None
        else:
            tmp_folder = tempfile.TemporaryDirectory()
            res_folder = tmp_folder.name

        gt_folder = osp.join(
            osp.dirname(self.ann_file),
            osp.splitext(self.ann_file.split('_')[-1])[0])

        kpts = defaultdict(list)

        for result in results:
            preds = result['preds']
            boxes = result['boxes']
            image_paths = result['image_paths']
            bbox_ids = result['bbox_ids']

            batch_size = len(image_paths)
            for i in range(batch_size):
                if not isinstance(image_paths[i], list):
                    image_id = self.name2id[image_paths[i]
                                            [len(self.img_prefix):]]
                else:
                    image_id = self.name2id[image_paths[i][0]
                                            [len(self.img_prefix):]]

                kpts[image_id].append({
                    'keypoints': preds[i],
                    'center': boxes[i][0:2],
                    'scale': boxes[i][2:4],
                    'area': boxes[i][4],
                    'score': boxes[i][5],
                    'image_id': image_id,
                    'bbox_id': bbox_ids[i]
                })
        kpts = self._sort_and_unique_bboxes(kpts)

        # rescoring and oks nms
        num_joints = self.ann_info['num_joints']
        vis_thr = self.vis_thr
        oks_thr = self.oks_thr
        valid_kpts = defaultdict(list)
        for image_id in kpts.keys():
            img_kpts = kpts[image_id]
            for n_p in img_kpts:
                box_score = n_p['score']
                kpt_score = 0
                valid_num = 0
                for n_jt in range(0, num_joints):
                    t_s = n_p['keypoints'][n_jt][2]
                    if t_s > vis_thr:
                        kpt_score = kpt_score + t_s
                        valid_num = valid_num + 1
                if valid_num != 0:
                    kpt_score = kpt_score / valid_num
                # rescoring
                n_p['score'] = kpt_score * box_score

            if self.use_nms:
                nms = soft_oks_nms if self.soft_nms else oks_nms
                keep = nms(img_kpts, oks_thr, sigmas=self.sigmas)
                valid_kpts[image_id].append(
                    [img_kpts[_keep] for _keep in keep])
            else:
                valid_kpts[image_id].append(img_kpts)

        self._write_keypoint_results(valid_kpts, gt_folder, res_folder)

        info_str = self._do_python_keypoint_eval(gt_folder, res_folder)
        name_value = OrderedDict(info_str)

        if tmp_folder is not None:
            tmp_folder.cleanup()

        return name_value
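
    # Illustrative sketch (an assumption about typical shapes, editorial and
    # not taken from this module): one entry of the ``results`` list consumed
    # by ``evaluate`` resembles
    #
    #     dict(
    #         preds=np.zeros((1, 17, 3)),   # x, y, score per keypoint
    #         boxes=np.zeros((1, 6)),       # center (2), scale (2), area, score
    #         image_paths=[['val/010016_mpii_test/000024.jpg']],
    #         bbox_ids=[0],
    #     )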
    @staticmethod
    def _write_keypoint_results(keypoint_results, gt_folder, pred_folder):
        """Write results into a json file.

        Args:
            keypoint_results (dict): keypoint results organized by image_id.
            gt_folder (str): Path of directory for official gt files.
            pred_folder (str): Path of directory to save the results.
        """
        categories = []

        cat = {}
        cat['supercategory'] = 'person'
        cat['id'] = 1
        cat['name'] = 'person'
        cat['keypoints'] = [
            'nose', 'head_bottom', 'head_top', 'left_ear', 'right_ear',
            'left_shoulder', 'right_shoulder', 'left_elbow', 'right_elbow',
            'left_wrist', 'right_wrist', 'left_hip', 'right_hip', 'left_knee',
            'right_knee', 'left_ankle', 'right_ankle'
        ]
        cat['skeleton'] = [[16, 14], [14, 12], [17, 15], [15, 13], [12, 13],
                           [6, 12], [7, 13], [6, 7], [6, 8], [7, 9], [8, 10],
                           [9, 11], [2, 3], [1, 2], [1, 3], [2, 4], [3, 5],
                           [4, 6], [5, 7]]
        categories.append(cat)

        json_files = [
            pos for pos in os.listdir(gt_folder) if pos.endswith('.json')
        ]
        for json_file in json_files:
            with open(osp.join(gt_folder, json_file), 'r') as f:
                gt = json.load(f)

            annotations = []
            images = []

            for image in gt['images']:
                im = {}
                im['id'] = image['id']
                im['file_name'] = image['file_name']
                images.append(im)

                img_kpts = keypoint_results[im['id']]

                if len(img_kpts) == 0:
                    continue
                for track_id, img_kpt in enumerate(img_kpts[0]):
                    ann = {}
                    ann['image_id'] = img_kpt['image_id']
                    ann['keypoints'] = np.array(
                        img_kpt['keypoints']).reshape(-1).tolist()
                    ann['scores'] = np.array(ann['keypoints']).reshape(
                        [-1, 3])[:, 2].tolist()
                    ann['score'] = float(img_kpt['score'])
                    ann['track_id'] = track_id
                    annotations.append(ann)

            info = {}
            info['images'] = images
            info['categories'] = categories
            info['annotations'] = annotations

            with open(osp.join(pred_folder, json_file), 'w') as f:
                json.dump(info, f, sort_keys=True, indent=4)

    def _do_python_keypoint_eval(self, gt_folder, pred_folder):
        """Keypoint evaluation using poseval.

        Args:
            gt_folder (str): The folder of the json files storing
                ground truth keypoint annotations.
            pred_folder (str): The folder of the json files storing
                prediction results.

        Returns:
            List: Evaluation results for evaluation metric.
        """
        if not has_poseval:
            raise ImportError('Please install poseval package for evaluation '
                              'on PoseTrack dataset '
                              '(see requirements/optional.txt)')

        argv = ['', gt_folder + '/', pred_folder + '/']

        print('Loading data')
        gtFramesAll, prFramesAll = eval_helpers.load_data_dir(argv)

        print('# gt frames  :', len(gtFramesAll))
        print('# pred frames:', len(prFramesAll))

        # evaluate per-frame multi-person pose estimation (AP)
        # compute AP
        print('Evaluation of per-frame multi-person pose estimation')
        apAll, _, _ = evaluateAP(gtFramesAll, prFramesAll, None, False, False)

        # print AP
        print('Average Precision (AP) metric:')
        eval_helpers.printTable(apAll)

        stats = eval_helpers.getCum(apAll)

        stats_names = [
            'Head AP', 'Shou AP', 'Elb AP', 'Wri AP', 'Hip AP', 'Knee AP',
            'Ankl AP', 'Total AP'
        ]

        info_str = list(zip(stats_names, stats))

        return info_str
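
A minimal usage sketch follows. The paths and most data_cfg values below are
illustrative assumptions rather than defaults shipped with this module; the
required keys are those read in __init__ above and by the base class.

    data_cfg = dict(
        image_size=[288, 384],
        heatmap_size=[72, 96],
        num_output_channels=17,
        num_joints=17,
        dataset_channel=[list(range(17))],
        inference_channel=list(range(17)),
        use_gt_bbox=True,        # required: use GT boxes instead of detections
        bbox_file='',            # required: detection json, read in test mode
                                 # when use_gt_bbox is False
        det_bbox_thr=0.0,
        use_nms=True,
        soft_nms=False,          # required keys consumed by evaluate()
        nms_thr=1.0,
        oks_thr=0.9,
        vis_thr=0.2,
        frame_weight_train=[0.0, 1.0],   # required: ref/support frame weights
        frame_weight_test=[0.0, 1.0],
        frame_indices_test=[-2, -1, 0, 1, 2],
    )

    dataset = TopDownPoseTrack18VideoDataset(
        ann_file='data/posetrack18/annotations/posetrack18_val.json',
        img_prefix='data/posetrack18/',
        data_cfg=data_cfg,
        pipeline=[],        # insert mmpose transform configs here
        test_mode=True)

Each sample in dataset.db then carries a list of image files (the reference
frame followed by its supporting frames), the bbox, joints and frame_weight
fields assembled by _get_db above.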