Source code for act_json_writer

# -*- coding: utf-8 -*-
"""
Kwiver process to write ``object_track_set`` from ACT and ``detected_object_set``
from an object detector in NIST specified JSON 
format

@author: Ameya Shringi
"""

#kwiver/sprokit import
from sprokit.pipeline import process
from kwiver.kwiver_process import KwiverProcess

# ACT imports
from exp_config import experiment_config, expcfg_from_file
from virat_dataset import ViratDataset

import threading
import os
import cv2
import numpy as np
import json


[docs]class ACTJsonWriter(KwiverProcess): """ Write ``object_track_set`` from ACT and ``detected_object_set`` from an object detector in NIST specified JSON format * Input Ports: * ``object_track_set`` Tracks obtained from ACT (Required) * ``timestamp`` Timestamp associated with the input from which tracks were obtained (Required) * ``file_name`` Name of the input source (Required) * ``detected_object_set`` Detections obtained from object detector * Output Ports: * None * Configuration: * ``exp`` Experiment configuration for ACT (Eg. `exp.yml`_) * ``is_aod`` Flag to specify the task for which ACT is used (Default=False) * ``confidence_threshold`` Lower bound for confidence associated with an activity (Default=0.2) * ``json_path`` Path to json file produced by the writer (default=sysfile.json) .. Repo Links .. _exp.yml: https://gitlab.kitware.com/kwiver/act_detector/blob/act-detector/virat-act-detector-scripts/rgb_actev.yml """ def _parse_bool(self, bool_str): """ Helper function parse boolean string :param bool_str: boolean string that must be parsed :return boolean associated with the string """ if bool_str == "True": return True else: return False def __init__(self, conf): """ Constructor for ACT json writer :param conf: Configuration for ACT json writer :return None """ KwiverProcess.__init__(self, conf) self.add_config_trait("exp", "exp", "experiment.yml", "experiment configuration") self.declare_config_using_trait("exp") self.add_config_trait("is_aod", "is_aod", "False", "Flag to specify if AD/AOD annotation are generated") self.declare_config_using_trait("is_aod") self.add_config_trait("confidence_threshold", "confidence_threshold", "0.2", "Confidence threshold for the scorer") self.declare_config_using_trait("confidence_threshold") self.add_config_trait("json_path", "json_path", "sysfile.json", "Json path for system output") self.declare_config_using_trait("json_path") # set up required flags required = process.PortFlags() required.add(self.flag_required) optional = process.PortFlags() self.declare_input_port_using_trait("object_track_set", required) self.declare_input_port_using_trait("timestamp", required) self.declare_input_port_using_trait("file_name", required) self.is_aod = self._parse_bool(self.config_value("is_aod")) self.declare_input_port_using_trait("detected_object_set", optional) self.all_files = [] def _configure(self): """ Configure ACT json writer process """ expcfg_from_file(self.config_value("exp")) if os.path.exists(self.config_value("json_path")): os.remove(self.config_value("json_path")) self.virat_dataset = ViratDataset(#experiment_config.data.data_root, experiment_config.data.frame_roots, experiment_config.data.flow_roots, experiment_config.data.train_annotation_dirs, experiment_config.data.test_annotation_dirs, experiment_config.data.class_index, experiment_config.data.save_directory, experiment_config.train.kpf_mode, experiment_config.train.json_mode, experiment_config.data.save_prefix) self.activity_id = 1 self.all_activities = [] if self.is_aod: self.detected_object_sets = {} def _tubescore(self, tube): """ Helper function to compute average scores of a track :param tube: track in form of an ndarray :return average of scores """ return np.mean(tube) def _video_name_from_path(self, video_path): """ Helper function to get video name form the path :param video_path: absolute path to the video :return name of the video """ return os.path.split(video_path)[-1] + ".mp4" def _iou(self, activity_detection, object_detection): """ Helper function to compute iou between an activity track and an object detection :activity detection: instance of activity track :object detection: instance of object detection :return iou between two detections """ min_x = max(activity_detection.min_x(), object_detection.min_x()) min_y = max(activity_detection.min_y(), object_detection.min_y()) max_x = min(activity_detection.max_x(), object_detection.max_x()) max_y = min(activity_detection.max_y(), object_detection.max_y()) if min_x > max_x or min_y > max_y: return 0.0 else: intersection = ( max_x - min_x ) * ( max_y - min_y ) union = activity_detection.area() + \ object_detection.area() - \ intersection try: iou = float(intersection)/union except ZeroDivisionError: iou = 0.0 return iou def _compute_participating_objects(self, track, video_name): """ Helper function to find out all the object detections associated with an activity track and create NIST specifi annotation for those object detections :param track: Instance of activity detection :param video_name: Input source :return List of object annotations """ object_annotations = [] object_id = 1 for track_state in track: frame_id = track_state.frame_id track_detection = track_state.detection.bounding_box() if frame_id in self.detected_object_sets.keys(): object_detections = self.detected_object_sets[frame_id] else: continue for object_detection in object_detections: if self._iou(track_detection, object_detection.bounding_box()) > 0.5: object_annotation = { "objectType": object_detection.type().get_most_likely_class(), "objectID": int(object_id), "localization": { video_name: { str(int(frame_id)): { "presenceConf": float(object_detection.confidence()), "boundingBox": { "x": int(object_detection.bounding_box().min_x()), "y": int(object_detection.bounding_box().min_y()), "w": int(object_detection.bounding_box().width()), "h": int(object_detection.bounding_box().height()) } }, str(int(frame_id + 1)): {} } } } object_id += 1 object_annotations.append(object_annotation) if len(object_annotations) > 0: print "Added object annotations" return object_annotations def _create_annotation_from_track(self, track, activity_id, video_name, is_aod=False): """ Helper function to create activity annotation from track :param track: An instance of the activity track :param activity_id: Current value of the unique id used in activity annotation :param video_name: Input source :param is_aod: Flag to specify is the annotation is for activity detection or activity object detection (default=False) :return (activity annoation, new value of activity id ) """ start_frame = track.first_frame + 1 end_frame = track.last_frame + 1 num_frames = len(track) tube_scores = np.zeros(num_frames) label = int(track[track.first_frame].detection.type().get_most_likely_class()) for track_index, track_state in enumerate(track): tube_scores[track_index] = track_state.detection.confidence() confidence = self._tubescore(tube_scores) if confidence > float(self.config_value("confidence_threshold")): if is_aod: participating_objects = self._compute_participating_objects(track, video_name) class_label = self.virat_dataset.labels.keys()[self.virat_dataset.labels.\ values().index(label)] if is_aod: if len(participating_objects) > 0: return {"activity": class_label, "activityID": int(activity_id), "presenceConf": float(confidence), "alertFrame": int(end_frame), "localization": { video_name: { str(start_frame): 1, str(end_frame): 0 } }, "objects": participating_objects }, activity_id+1 else: return None, activity_id else: return {"activity": class_label, "activityID": int(activity_id), "presenceConf": float(confidence), "alertFrame": int(end_frame), "localization": { video_name : { str(start_frame): 1, str(end_frame): 0 } } }, activity_id+1 else: return None, activity_id def _step(self): """ Step function for ACT Json Writer """ object_track_set = self.grab_input_using_trait('object_track_set') timestamp = self.grab_input_using_trait('timestamp') file_name = self.grab_input_using_trait('file_name') if self.is_aod: detected_object_set = self.grab_input_using_trait('detected_object_set') self.detected_object_sets[timestamp.get_frame()] = detected_object_set else: detected_object_set = None if len(object_track_set) > 0: for track in object_track_set.tracks(): activity_annotation, self.activity_id = \ self._create_annotation_from_track(track, self.activity_id, file_name, self.is_aod) if activity_annotation is not None: self.all_activities.append(activity_annotation) if file_name not in self.all_files: self.all_files.append(file_name) #if self.is_aod: # self.detected_object_sets = {} json_dict = {"filesProcessed": self.all_files, \ "activities": self.all_activities} with open(self.config_value("json_path"), 'w') as json_f: json.dump(json_dict, json_f, indent=4)
def __sprokit_register__(): """ Sprokit registration for the process """ from sprokit.pipeline import process_factory module_name = 'python:kwiver.ACTJsonWriter' if process_factory.is_process_module_loaded(module_name): return process_factory.add_process('ACTJsonWriter', 'Produce NIST specified JSON file for ACT', ACTJsonWriter) process_factory.mark_process_module_as_loaded(module_name)