Source code for stardust.metric.tracking

import collections
import os
import json
from typing import Dict, List, Generator

import numpy as np
from scipy.optimize import linear_sum_assignment

from stardust.components.annotations.box3d import Box3D
from stardust.components.annotations.point3d import Point3D
from stardust.components.annotations.box import Box2D
from stardust.components.annotations.point import Point
from stardust.metric.object_detection import BoxIou2D, BoxIou3D


[docs] class TrackingData: def __init__(self, track_id: int, box_type: str, obj_info: List): """ Define a tracking object Args: track_id: int tracking id of object box_type: str type of object, which should be chosen from '2D' and '3D' obj_info: List info of box, [x, y, z, l, w, h, ry] for box 3d and [x, y, w, h] for box 2d Returns: TrackingData object Examples: .. code-block:: python from stardust.metric.tracking import TrackingData t_data = TrackingData(0, '3D', [1, 2, 3, 4.5, 1.8, 1.5, 0.1]) t_data = TrackingData(0, '2D', [1, 2, 4.5, 1.8]) """ if box_type is None or box_type not in ['2D', '3D']: raise ValueError(f"Got invalid box_type {box_type}") self.track_id = track_id if box_type == '2D': if len(obj_info) != 4: raise ValueError(f"The input {box_type} box should be [x, y, w, h]") self.x = obj_info[0] self.y = obj_info[1] self.w = obj_info[2] self.h = obj_info[3] elif box_type == '3D': if len(obj_info) != 7: raise ValueError(f"The input {box_type} box should be [x, y, z, l, w, h, ry]") self.x = obj_info[0] self.y = obj_info[1] self.z = obj_info[2] self.l = obj_info[3] self.w = obj_info[4] self.h = obj_info[5] self.yaw = obj_info[6] self.valid = False self.tracker = None
[docs] class TrackingMetric: def __repr__(self): description = 'Calculate tracking metric' return description def __init__(self, trajectory_gt: List, trajectory_pd: List, IoU_thr: float, metric_tyoe: str) -> None: """ Define a TrackMetric object and prepare metric data Args: trajectory_gt: List trajectory of annotation trajectory_pd: List trajectory of prediction IoU_thr: float The iou threshold of tp boxes metric_tyoe: str type of input object, which should be chosen from '2D' and '3D' Returns: TrackingMetric obj """ self.trajectory_gt = trajectory_gt self.trajectory_pd = trajectory_pd self.IoU_thr = IoU_thr self.metric_tyoe = metric_tyoe self.gt = 0 self.pd = 0 for frame_id in trajectory_gt: self.gt += len(trajectory_gt[frame_id]) for frame_id in trajectory_pd: self.pd += len(trajectory_pd[frame_id]) self.tp = 0 self.id_switches = 0 self.MOTA = 0 self.ious = []
[docs] def compute(self): """ compute tracking metric Args: Returns: Dict """ max_cost = 1e9 self.seq_trajectories = collections.defaultdict(list) for f in range(len(self.trajectory_gt)): g = self.trajectory_gt[f] t = self.trajectory_pd[f] cost_matrix_iou = [] iou = np.zeros((len(g), len(t))) for gi, gg in enumerate(g): gg.tracker = -1 cost_row_iou = [] for ti, tt in enumerate(t): if self.metric_tyoe == '3D': pred_bbox = Box3D(Point3D(tt.x, tt.y, tt.z), Point3D(tt.l, tt.w, tt.h), [0, 0, tt.yaw]) gt_bbox = Box3D(Point3D(gg.x, gg.y, gg.z), Point3D(gg.l, gg.w, gg.h), [0, 0, gg.yaw]) iou[gi][ti] = BoxIou3D().compute_IoU(gt_bbox, pred_bbox, 'IoU') else: pred_bbox = Box2D(center=Point(tt.x, tt.y), size=[tt.w, tt.h]) gt_bbox = Box2D(center=Point(gg.x, gg.y), size=[gg.w, gg.h]) iou[gi][ti] = BoxIou2D().compute_IoU(gt_bbox, pred_bbox, 'IoU') if iou[gi][ti] >= self.IoU_thr: cost_row_iou.append(1 - iou[gi][ti]) else: cost_row_iou.append(max_cost) self.seq_trajectories[gg.track_id].append(-1) cost_matrix_iou.append(cost_row_iou) if len(g) == 0: cost_matrix_iou = [[]] row_inds, col_inds = linear_sum_assignment(cost_matrix_iou) for row, col in zip(row_inds, col_inds): c_iou = float(cost_matrix_iou[row][col]) if c_iou < max_cost: self.ious.append(1 - c_iou) g[row].tracker = t[col].track_id t[col].valid = True self.tp += 1 self.seq_trajectories[g[row].track_id][-1] = t[col].track_id else: g[row].tracker = -1 self.fn = self.gt - self.tp self.fp = self.pd - self.tp for g in self.seq_trajectories.values(): last_id = g[0] for f in range(1, len(g)): if last_id != g[f] and last_id != -1 and g[f] != -1 and g[f - 1] != -1: self.id_switches += 1 if self.gt == 0: self.MOTA = float('-inf') else: self.MOTA = 1 - (self.fn + self.fp + self.id_switches) / float(self.gt) self.recall = float('nan') if self.gt == 0 else self.tp / self.gt self.precision = float('nan') if self.pd == 0 else self.tp / self.pd self.f1 = float('nan') if self.recall + self.precision == 0 else 2 * self.recall * self.precision / (self.recall + self.precision) self.miou = sum(self.ious) / len(self.ious) if len(self.ious) else 0 self.map07 = len([x for x in self.ious if x >= 0.7]) / self.pd if self.pd != 0 else 0 self.map08 = len([x for x in self.ious if x >= 0.8]) / self.pd if self.pd != 0 else 0 self.map09 = len([x for x in self.ious if x >= 0.9]) / self.pd if self.pd != 0 else 0 metric = dict(gt=self.gt, pd=self.pd, tp=self.tp, recall=self.recall, precision=[self.map07, self.map08, self.map09], f1=self.f1, miou=self.miou, idsw=self.id_switches, MOTA=self.MOTA) return metric
[docs] def compute_metric(data: Generator = None, IoU_thr: float = None, metric_types: List = None, save_path: str = None): """ Computing IoU of all objects in all frames Args: data: Generator A generator object to get all information from all frames IoU_thr: float The iou threshold of tp boxes metric_types: List which type of box to compute metric, which can be chosen from ['2D'], ['3D'] and ['2D', '3D'] save_path: str Local path to save metric results Returns: metric: List The metric of dataset which include two dict, the first represents metric of every single frame and the second represents metric of all frames Examples: .. code-block:: python from stardust.metric.tracking import compute_metric project_id = 856 Export(project_id, 'top', input_path, True).export() json_datas = read_rosetta(project_id=project_id, input_path=input_path, ) metric = compute_metric(json_datas, 0.5, ['2D', '3D'], 'local/') """ assert data assert IoU_thr assert metric_types and metric_types in [['2D'], ['3D'], ['2D', '3D']] task_dict = collections.defaultdict(dict) for json_data in data: task_info = json_data.task_info task_id = str(task_info.task_id) frame_num = str(task_info.frame_num) task_dict[task_id][frame_num] = json_data metric_output = collections.defaultdict(dict) metric_total = {} for metric_type in metric_types: total_gt = total_pd = total_tp = total_idsw = 0 if metric_type == '3D': for task_id in task_dict: task_info = sorted(task_dict[task_id].items(), key=lambda x: x[0]) trajectory_gt = collections.defaultdict(list) trajectory_pd = collections.defaultdict(list) for frame_id in range(len(task_info)): gts = task_info[frame_id][1].annotation.box3d_lst pds = task_info[frame_id][1].prediction.box3d_lst for obj_id, obj_info in gts.items(): x, y, z = obj_info.center l, w, h = obj_info.size ry = obj_info.rotation[-1] t_data = TrackingData(obj_id, metric_type, [x, y, z, l, w, h, ry]) trajectory_gt[frame_id].append(t_data) for obj_id, obj_info in pds.items(): x, y, z = obj_info.center l, w, h = obj_info.size ry = obj_info.rotation[-1] t_data = TrackingData(obj_id, metric_type, [x, y, z, l, w, h, ry]) trajectory_pd[frame_id].append(t_data) metric_output[task_id][metric_type] = TrackingMetric(trajectory_gt, trajectory_pd, IoU_thr, metric_type).compute() total_gt += metric_output[task_id]['gt'] total_pd += metric_output[task_id]['pd'] total_tp += metric_output[task_id]['tp'] total_idsw += metric_output[task_id]['idsw'] if metric_type == '2D': for task_id in task_dict: task_info = sorted(task_dict[task_id].items(), key=lambda x: x[0]) trajectory_gt = collections.defaultdict(list) trajectory_pd = collections.defaultdict(list) for frame_id in range(len(task_info)): gts = task_info[frame_id][1].annotation.box2d_lst pds = task_info[frame_id][1].prediction.box2d_lst for obj_id, obj_info in gts.items(): x, y = obj_info.center w, h = obj_info.size t_data = TrackingData(obj_id, metric_type, [x, y, w, h]) trajectory_gt[frame_id].append(t_data) for obj_id, obj_info in pds.items(): x, y = obj_info.center w, h = obj_info.size t_data = TrackingData(obj_id, metric_type, [x, y, w, h]) trajectory_pd[frame_id].append(t_data) metric_output[task_id] = TrackingMetric(trajectory_gt, trajectory_pd, IoU_thr, metric_type).compute() total_gt += metric_output[task_id]['gt'] total_pd += metric_output[task_id]['pd'] total_tp += metric_output[task_id]['tp'] total_idsw += metric_output[task_id]['idsw'] total_fn = total_gt - total_tp total_fp = total_pd - total_tp total_recall = float('nan') if total_gt == 0 else total_tp / total_gt total_precision = float('nan') if total_pd == 0 else total_tp / total_pd total_f1 = float('nan') if total_recall + total_precision == 0 else 2 * total_recall * total_precision / (total_recall + total_precision) total_MOTA = float('nan') if total_gt == 0 else 1 - (total_fn + total_fp + total_idsw) / float(total_gt) metric_total[metric_type] = dict(gt=total_gt, pd=total_pd, tp=total_tp, recall=total_recall, precision=total_precision, f1=total_f1, MOTA=total_MOTA, idsw=total_idsw) if save_path: os.makedirs(os.path.join(save_path, 'metric', 'tracking'), exist_ok=True) with open(os.path.join(save_path, 'metric', 'tracking', 'metric_by_task_id.json'), 'w') as f: json.dump(metric_output, f) with open(os.path.join(save_path, 'metric', 'tracking', 'metric_summary.json'), 'w') as f: json.dump(metric_total, f) return [metric_output, metric_total]