import os
import warnings
from collections import OrderedDict, defaultdict
import json_tricks as json
import numpy as np
from xtcocotools.coco import COCO
from xtcocotools.cocoeval import COCOeval
from ....core.post_processing import oks_nms, soft_oks_nms
from ...builder import DATASETS
from .topdown_base_dataset import TopDownBaseDataset
[文档]@DATASETS.register_module()
class TopDownCocoDataset(TopDownBaseDataset):
"""CocoDataset dataset for top-down pose estimation.
`Microsoft COCO: Common Objects in Context' ECCV'2014
More details can be found in the `paper
<https://arxiv.org/abs/1405.0312>`__ .
The dataset loads raw features and apply specified transforms
to return a dict containing the image tensors and other information.
COCO keypoint indexes::
0: 'nose',
1: 'left_eye',
2: 'right_eye',
3: 'left_ear',
4: 'right_ear',
5: 'left_shoulder',
6: 'right_shoulder',
7: 'left_elbow',
8: 'right_elbow',
9: 'left_wrist',
10: 'right_wrist',
11: 'left_hip',
12: 'right_hip',
13: 'left_knee',
14: 'right_knee',
15: 'left_ankle',
16: 'right_ankle'
Args:
ann_file (str): Path to the annotation file.
img_prefix (str): Path to a directory where images are held.
Default: None.
data_cfg (dict): config
pipeline (list[dict | callable]): A sequence of data transforms.
test_mode (bool): Store True when building test or
validation dataset. Default: False.
"""
def __init__(self,
ann_file,
img_prefix,
data_cfg,
pipeline,
test_mode=False):
super().__init__(
ann_file, img_prefix, data_cfg, pipeline, test_mode=test_mode)
self.use_gt_bbox = data_cfg['use_gt_bbox']
self.bbox_file = data_cfg['bbox_file']
self.det_bbox_thr = data_cfg.get('det_bbox_thr', 0.0)
if 'image_thr' in data_cfg:
warnings.warn(
'image_thr is deprecated, '
'please use det_bbox_thr instead', DeprecationWarning)
self.det_bbox_thr = data_cfg['image_thr']
self.use_nms = data_cfg.get('use_nms', True)
self.soft_nms = data_cfg['soft_nms']
self.nms_thr = data_cfg['nms_thr']
self.oks_thr = data_cfg['oks_thr']
self.vis_thr = data_cfg['vis_thr']
self.ann_info['flip_pairs'] = [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10],
[11, 12], [13, 14], [15, 16]]
self.ann_info['upper_body_ids'] = (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
self.ann_info['lower_body_ids'] = (11, 12, 13, 14, 15, 16)
self.ann_info['use_different_joint_weights'] = False
self.ann_info['joint_weights'] = np.array(
[
1., 1., 1., 1., 1., 1., 1., 1.2, 1.2, 1.5, 1.5, 1., 1., 1.2,
1.2, 1.5, 1.5
],
dtype=np.float32).reshape((self.ann_info['num_joints'], 1))
# 'https://github.com/cocodataset/cocoapi/blob/master/PythonAPI/'
# 'pycocotools/cocoeval.py#L523'
self.sigmas = np.array([
.26, .25, .25, .35, .35, .79, .79, .72, .72, .62, .62, 1.07, 1.07,
.87, .87, .89, .89
]) / 10.0
self.coco = COCO(ann_file)
cats = [
cat['name'] for cat in self.coco.loadCats(self.coco.getCatIds())
]
self.classes = ['__background__'] + cats
self.num_classes = len(self.classes)
self._class_to_ind = dict(zip(self.classes, range(self.num_classes)))
self._class_to_coco_ind = dict(zip(cats, self.coco.getCatIds()))
self._coco_ind_to_class_ind = dict(
(self._class_to_coco_ind[cls], self._class_to_ind[cls])
for cls in self.classes[1:])
self.img_ids = self.coco.getImgIds()
self.num_images = len(self.img_ids)
self.id2name, self.name2id = self._get_mapping_id_name(self.coco.imgs)
self.dataset_name = 'coco'
self.db = self._get_db()
print(f'=> num_images: {self.num_images}')
print(f'=> load {len(self.db)} samples')
@staticmethod
def _get_mapping_id_name(imgs):
"""
Args:
imgs (dict): dict of image info.
Returns:
tuple: Image name & id mapping dicts.
- id2name (dict): Mapping image id to name.
- name2id (dict): Mapping image name to id.
"""
id2name = {}
name2id = {}
for image_id, image in imgs.items():
file_name = image['file_name']
id2name[image_id] = file_name
name2id[file_name] = image_id
return id2name, name2id
def _get_db(self):
"""Load dataset."""
if (not self.test_mode) or self.use_gt_bbox:
# use ground truth bbox
gt_db = self._load_coco_keypoint_annotations()
else:
# use bbox from detection
gt_db = self._load_coco_person_detection_results()
return gt_db
def _load_coco_keypoint_annotations(self):
"""Ground truth bbox and keypoints."""
gt_db = []
for img_id in self.img_ids:
gt_db.extend(self._load_coco_keypoint_annotation_kernel(img_id))
return gt_db
def _load_coco_keypoint_annotation_kernel(self, img_id):
"""load annotation from COCOAPI.
Note:
bbox:[x1, y1, w, h]
Args:
img_id: coco image id
Returns:
dict: db entry
"""
img_ann = self.coco.loadImgs(img_id)[0]
width = img_ann['width']
height = img_ann['height']
num_joints = self.ann_info['num_joints']
ann_ids = self.coco.getAnnIds(imgIds=img_id, iscrowd=False)
objs = self.coco.loadAnns(ann_ids)
# sanitize bboxes
valid_objs = []
for obj in objs:
if 'bbox' not in obj:
continue
x, y, w, h = obj['bbox']
x1 = max(0, x)
y1 = max(0, y)
x2 = min(width - 1, x1 + max(0, w - 1))
y2 = min(height - 1, y1 + max(0, h - 1))
if ('area' not in obj or obj['area'] > 0) and x2 > x1 and y2 > y1:
obj['clean_bbox'] = [x1, y1, x2 - x1, y2 - y1]
valid_objs.append(obj)
objs = valid_objs
bbox_id = 0
rec = []
for obj in objs:
if 'keypoints' not in obj:
continue
if max(obj['keypoints']) == 0:
continue
if 'num_keypoints' in obj and obj['num_keypoints'] == 0:
continue
joints_3d = np.zeros((num_joints, 3), dtype=np.float32)
joints_3d_visible = np.zeros((num_joints, 3), dtype=np.float32)
keypoints = np.array(obj['keypoints']).reshape(-1, 3)
joints_3d[:, :2] = keypoints[:, :2]
joints_3d_visible[:, :2] = np.minimum(1, keypoints[:, 2:3])
center, scale = self._xywh2cs(*obj['clean_bbox'][:4])
image_file = os.path.join(self.img_prefix, self.id2name[img_id])
rec.append({
'image_file': image_file,
'center': center,
'scale': scale,
'bbox': obj['clean_bbox'][:4],
'rotation': 0,
'joints_3d': joints_3d,
'joints_3d_visible': joints_3d_visible,
'dataset': self.dataset_name,
'bbox_score': 1,
'bbox_id': bbox_id
})
bbox_id = bbox_id + 1
return rec
def _xywh2cs(self, x, y, w, h):
"""This encodes bbox(x,y,w,h) into (center, scale)
Args:
x, y, w, h
Returns:
tuple: A tuple containing center and scale.
- center (np.ndarray[float32](2,)): center of the bbox (x, y).
- scale (np.ndarray[float32](2,)): scale of the bbox w & h.
"""
aspect_ratio = self.ann_info['image_size'][0] / self.ann_info[
'image_size'][1]
center = np.array([x + w * 0.5, y + h * 0.5], dtype=np.float32)
if (not self.test_mode) and np.random.rand() < 0.3:
center += 0.4 * (np.random.rand(2) - 0.5) * [w, h]
if w > aspect_ratio * h:
h = w * 1.0 / aspect_ratio
elif w < aspect_ratio * h:
w = h * aspect_ratio
# pixel std is 200.0
scale = np.array([w / 200.0, h / 200.0], dtype=np.float32)
# padding to include proper amount of context
scale = scale * 1.25
return center, scale
def _load_coco_person_detection_results(self):
"""Load coco person detection results."""
num_joints = self.ann_info['num_joints']
all_boxes = None
with open(self.bbox_file, 'r') as f:
all_boxes = json.load(f)
if not all_boxes:
raise ValueError('=> Load %s fail!' % self.bbox_file)
print(f'=> Total boxes: {len(all_boxes)}')
kpt_db = []
bbox_id = 0
for det_res in all_boxes:
if det_res['category_id'] != 1:
continue
image_file = os.path.join(self.img_prefix,
self.id2name[det_res['image_id']])
box = det_res['bbox']
score = det_res['score']
if score < self.det_bbox_thr:
continue
center, scale = self._xywh2cs(*box[:4])
joints_3d = np.zeros((num_joints, 3), dtype=np.float32)
joints_3d_visible = np.ones((num_joints, 3), dtype=np.float32)
kpt_db.append({
'image_file': image_file,
'center': center,
'scale': scale,
'rotation': 0,
'bbox': box[:4],
'bbox_score': score,
'dataset': self.dataset_name,
'joints_3d': joints_3d,
'joints_3d_visible': joints_3d_visible,
'bbox_id': bbox_id
})
bbox_id = bbox_id + 1
print(f'=> Total boxes after filter '
f'low score@{self.det_bbox_thr}: {bbox_id}')
return kpt_db
[文档] def evaluate(self, outputs, res_folder, metric='mAP', **kwargs):
"""Evaluate coco keypoint results. The pose prediction results will be
saved in `${res_folder}/result_keypoints.json`.
Note:
batch_size: N
num_keypoints: K
heatmap height: H
heatmap width: W
Args:
outputs (list(dict))
:preds (np.ndarray[N,K,3]): The first two dimensions are
coordinates, score is the third dimension of the array.
:boxes (np.ndarray[N,6]): [center[0], center[1], scale[0]
, scale[1],area, score]
:image_paths (list[str]): For example, ['data/coco/val2017
/000000393226.jpg']
:heatmap (np.ndarray[N, K, H, W]): model output heatmap
:bbox_id (list(int)).
res_folder (str): Path of directory to save the results.
metric (str | list[str]): Metric to be performed. Defaults: 'mAP'.
Returns:
dict: Evaluation results for evaluation metric.
"""
metrics = metric if isinstance(metric, list) else [metric]
allowed_metrics = ['mAP']
for metric in metrics:
if metric not in allowed_metrics:
raise KeyError(f'metric {metric} is not supported')
res_file = os.path.join(res_folder, 'result_keypoints.json')
kpts = defaultdict(list)
for output in outputs:
preds = output['preds']
boxes = output['boxes']
image_paths = output['image_paths']
bbox_ids = output['bbox_ids']
batch_size = len(image_paths)
for i in range(batch_size):
image_id = self.name2id[image_paths[i][len(self.img_prefix):]]
kpts[image_id].append({
'keypoints': preds[i],
'center': boxes[i][0:2],
'scale': boxes[i][2:4],
'area': boxes[i][4],
'score': boxes[i][5],
'image_id': image_id,
'bbox_id': bbox_ids[i]
})
kpts = self._sort_and_unique_bboxes(kpts)
# rescoring and oks nms
num_joints = self.ann_info['num_joints']
vis_thr = self.vis_thr
oks_thr = self.oks_thr
valid_kpts = []
for image_id in kpts.keys():
img_kpts = kpts[image_id]
for n_p in img_kpts:
box_score = n_p['score']
kpt_score = 0
valid_num = 0
for n_jt in range(0, num_joints):
t_s = n_p['keypoints'][n_jt][2]
if t_s > vis_thr:
kpt_score = kpt_score + t_s
valid_num = valid_num + 1
if valid_num != 0:
kpt_score = kpt_score / valid_num
# rescoring
n_p['score'] = kpt_score * box_score
if self.use_nms:
nms = soft_oks_nms if self.soft_nms else oks_nms
keep = nms(img_kpts, oks_thr, sigmas=self.sigmas)
valid_kpts.append([img_kpts[_keep] for _keep in keep])
else:
valid_kpts.append(img_kpts)
self._write_coco_keypoint_results(valid_kpts, res_file)
info_str = self._do_python_keypoint_eval(res_file)
name_value = OrderedDict(info_str)
return name_value
def _write_coco_keypoint_results(self, keypoints, res_file):
"""Write results into a json file."""
data_pack = [{
'cat_id': self._class_to_coco_ind[cls],
'cls_ind': cls_ind,
'cls': cls,
'ann_type': 'keypoints',
'keypoints': keypoints
} for cls_ind, cls in enumerate(self.classes)
if not cls == '__background__']
results = self._coco_keypoint_results_one_category_kernel(data_pack[0])
with open(res_file, 'w') as f:
json.dump(results, f, sort_keys=True, indent=4)
def _coco_keypoint_results_one_category_kernel(self, data_pack):
"""Get coco keypoint results."""
cat_id = data_pack['cat_id']
keypoints = data_pack['keypoints']
cat_results = []
for img_kpts in keypoints:
if len(img_kpts) == 0:
continue
_key_points = np.array(
[img_kpt['keypoints'] for img_kpt in img_kpts])
key_points = _key_points.reshape(-1,
self.ann_info['num_joints'] * 3)
result = [{
'image_id': img_kpt['image_id'],
'category_id': cat_id,
'keypoints': key_point.tolist(),
'score': float(img_kpt['score']),
'center': img_kpt['center'].tolist(),
'scale': img_kpt['scale'].tolist()
} for img_kpt, key_point in zip(img_kpts, key_points)]
cat_results.extend(result)
return cat_results
def _do_python_keypoint_eval(self, res_file):
"""Keypoint evaluation using COCOAPI."""
coco_det = self.coco.loadRes(res_file)
coco_eval = COCOeval(self.coco, coco_det, 'keypoints', self.sigmas)
coco_eval.params.useSegm = None
coco_eval.evaluate()
coco_eval.accumulate()
coco_eval.summarize()
stats_names = [
'AP', 'AP .5', 'AP .75', 'AP (M)', 'AP (L)', 'AR', 'AR .5',
'AR .75', 'AR (M)', 'AR (L)'
]
info_str = list(zip(stats_names, coco_eval.stats))
return info_str
def _sort_and_unique_bboxes(self, kpts, key='bbox_id'):
"""sort kpts and remove the repeated ones."""
for img_id, persons in kpts.items():
num = len(persons)
kpts[img_id] = sorted(kpts[img_id], key=lambda x: x[key])
for i in range(num - 1, 0, -1):
if kpts[img_id][i][key] == kpts[img_id][i - 1][key]:
del kpts[img_id][i]
return kpts