Source code for act_visualizer

# -*- coding: utf-8 -*-
"""
Kwiver process to visualize act predictions

@author: Ameya Shringi
"""

#kwiver/sprokit import
from sprokit.pipeline import process
from kwiver.kwiver_process import KwiverProcess
from vital.types import ImageContainer, DetectedObjectType, ObjectTrackSet
from vital.util import VitalPIL

# ACT imports
from exp_config import experiment_config, expcfg_from_file
from virat_dataset import ViratDataset

import threading
import os
import cv2
import numpy as np
import json
from PIL import Image

# Global colours for 19 activities
ACTIVITY_COLORS = [
                    (51, 0, 0),
                    (153, 0, 0),
                    (102, 51, 0),
                    (200, 0, 0),
                    (51, 51, 0),
                    (25, 51, 0),
                    (16, 148, 46),
                    (119, 124, 57),
                    (0, 102, 51),
                    (0, 102, 102),
                    (0, 76, 153),
                    (0, 0, 204),
                    (25, 0, 51),
                    (0, 0, 153),
                    (102, 0, 204),
                    (102, 0, 102),
                    (153, 0, 76),
                    (88, 10, 49),
                    (31, 96,119)
                ]

[docs]class ACTVisualizer(KwiverProcess): """ Render ``track_object_set`` from ACT and ``detected_object_set`` from object detector on an image .. note:: The tracks visualized are intermediate tracks and not the final track. * Input Ports: * ``image`` Input RGB image used by ACT (Required) * ``timestamp`` Timestamp associated with the image (Required) * ``object_track_set`` Tracks obtained from ACT (Required) * ``detected_object_set`` Detections obtained from object detector (Optional) * Output Ports: * ``image`` Output image with bounding box representing spatial localization of an activity * Configuration: * ``exp`` Experiment configuration for ACT (Eg. `exp.yml`_) * ``is_aod`` Flag to specify the task for which ACT is used (Default=False) .. Repo Links .. _exp.yml: https://gitlab.kitware.com/kwiver/act_detector/blob/act-detector/virat-act-detector-scripts/rgb_actev.yml """ def _parse_bool(self, bool_str): """ Helper function parse boolean string :param bool_str: boolean string that must be parsed :return boolean associated with the string """ if bool_str == "True": return True else: return False def __init__(self, conf): """ Constructor for ACT visualizer :param conf: Configuration for ACT visualizer :return None """ KwiverProcess.__init__(self, conf) self.add_config_trait("exp", "exp", "experiment.yml", "experiment configuration") self.declare_config_using_trait("exp") self.add_config_trait("is_aod", "is_aod", "False", "Flag to specify if AD/AOD annotation are generated") self.declare_config_using_trait("is_aod") # set up required flags required = process.PortFlags() required.add(self.flag_required) optional = process.PortFlags() self.declare_input_port_using_trait("image", required) self.declare_input_port_using_trait("timestamp", required) self.declare_input_port_using_trait("object_track_set", required) self.is_aod = self._parse_bool(self.config_value("is_aod")) self.declare_input_port_using_trait("detected_object_set", optional) self.declare_output_port_using_trait("image", optional) self.object_track_set = ObjectTrackSet() def _configure(self): """ Configure ACT process """ expcfg_from_file(self.config_value("exp")) self.virat_dataset = ViratDataset( experiment_config.data.frame_roots, experiment_config.data.flow_roots, experiment_config.data.train_annotation_dirs, experiment_config.data.test_annotation_dirs, experiment_config.data.class_index, experiment_config.data.save_directory, experiment_config.train.kpf_mode, experiment_config.train.json_mode, experiment_config.data.save_prefix) def _draw_trajectory(self, image, detection): """ Helper function to draw small circle at center of detection on a image :param image: Input image :param detection: Input detection :return Image with small circle computed using the detection """ x_coor = detection.bounding_box().max_x() y_coor = float(detection.bounding_box().min_y() + \ detection.bounding_box().max_y())/2 cv2.circle(image, tuple(map(int,(x_coor, y_coor))), 2, (255, 0, 0) ) return image def _draw_detection(self, image, detection, is_activity): """ Helper function to draw activity and object detection on an image :param image: Input image :param detection: Instance of detection (Activity/object) :param is_activity: flag to specify is the detection is from an activity :return Image with bounding box rendered """ if is_activity: label_id = int(detection.type().get_most_likely_class()) label = self.virat_dataset.labels.keys()[self.\ virat_dataset.labels.values().index(label_id)] color = ACTIVITY_COLORS[label_id-1] else: label = detection.type().get_most_likely_class() color = (32, 32, 32) cv2.rectangle(image, tuple(map(int, (detection.bounding_box().min_x(), detection.bounding_box().max_y()))), tuple(map(int, (detection.bounding_box().max_x(), detection.bounding_box().min_y()))), color, 3) font = cv2.FONT_HERSHEY_SIMPLEX label += " ({:4.4f})".format(detection.confidence()) text_size, _ = cv2.getTextSize(label, font, 0.5, 1) text_width, text_height = text_size label_top = detection.bounding_box().max_x() - text_width - 2, \ detection.bounding_box().min_y() - text_height - 2 if label_top[0] > 0 and label_top[1] > 0: # draw the other box cv2.rectangle(image, tuple(map(int, label_top)), tuple(map(int, ( label_top[0] + text_width + 2, label_top[1] + text_height + 2 ))), color, cv2.FILLED) cv2.putText(image, label, tuple(map(int, (label_top[0]+1, label_top[1]+text_height+1))), font, 0.5, (255, 255, 255), 1) return image def _draw_track(self, image, track): """ Draw activity track on an image :param image: Input image :param track: Instance of track :return Image with track rendered """ label_id = int(track[track.last_frame].detection.type().get_most_likely_class()) if label_id == 0: return image if track.size > 1: for track_id in range(track.first_frame, track.last_frame): image = self._draw_trajectory(image, track[track_id].detection) image = self._draw_detection(image, track[track.last_frame].detection, True) return image def _step(self): """ Step function for the visualizer """ object_track_set = self.grab_input_using_trait('object_track_set') timestamp = self.grab_input_using_trait('timestamp') image = self.grab_input_using_trait('image').image().asarray().astype(np.uint8) image = cv2.resize(image, (1920, 1080)) image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) if self.is_aod: detected_object_set = self.grab_input_using_trait('detected_object_set') else: detected_object_set = None # Draw tracks if len(object_track_set) > 0 and timestamp.get_frame() > 1: for track in object_track_set.tracks(): image = self._draw_track(image, track) self.object_track_set = object_track_set elif len(object_track_set) == 0 and timestamp.get_frame() > 1: for track in self.object_track_set.tracks(): image = self._draw_track(image, track) # Draw detected object set if self.is_aod: if len(detected_object_set) > 0: for detected_object in detected_object_set: image = self._draw_detection(image, detected_object, False) image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) pil_image = Image.fromarray(image) vital_image = VitalPIL.from_pil(pil_image) image_container = ImageContainer(vital_image) self.push_to_port_using_trait("image", image_container)
def __sprokit_register__(): """ Sprokit registration for the process """ from sprokit.pipeline import process_factory module_name = 'python:kwiver.ACTVisualizer' if process_factory.is_process_module_loaded(module_name): return process_factory.add_process('ACTVisualizer', 'Visualize predictions from ACT and Object detector', ACTVisualizer) process_factory.mark_process_module_as_loaded(module_name)