diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..27e87d2
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+fullpipeline_results/*
+gt/*
+keys
diff --git a/baseline.py b/baseline.py
index 6fb7706..a430e98 100644
--- a/baseline.py
+++ b/baseline.py
@@ -4,10 +4,9 @@ import time
 from collections import deque
 from tqdm import tqdm
-import pdb
 import math
 import base64, json, os
-import openai, pdb, cv2
+import openai, cv2
 from utils import img_proc_utils, mobilesam, file_utils
 from utils import process_utils
 from ast import literal_eval
@@ -20,10 +19,21 @@ from PIL import Image
 from io import BytesIO
 
-class VLM:
+from utils.ros_vlm import VLM
+
+def frame_paths_from_folder(folder_path):
+    frames = []
+    for f in os.listdir(folder_path):
+        frame = os.path.join(folder_path, f)
+        frames.append(frame)
+    return frames
+
+class Baseline_VLM:
     def __init__(self, config, args) -> None:
         self.root = args.root
         self.config = file_utils.load_yaml(config)
+        self.config['root'] = self.root
+
         self.img_queue = deque(maxlen=self.config['exp']['prompt_img_len'])
         self.video_name = None
         self.model_name = self.config['exp']['model_name']
@@ -110,23 +120,15 @@ def create_crops(self, img_path=None):
             if not os.path.exists(f"{self.root}/{self.config['sam']['output_rotated_crop_folder']}/{self.video_name}"):
                 os.makedirs(f"{self.root}/{self.config['sam']['output_rotated_crop_folder']}/{self.video_name}")
 
-            try:
-                temp_crop_name_list = list()
-                rot_im_lst , bbox_lst, conf_lst = img_proc_utils.get_rotated_image_crops(img_path, self.crop_model)
-                for idx, rot_im in enumerate(rot_im_lst):
-                    cv2.imwrite(f"{rot_img_path[:-4]}_{idx}.jpg", rot_im)
-                    self.img_dict['full'] = f"{img_path}"
-                    temp_crop_name_list.append(f"{rot_img_path[:-4]}_{idx}.jpg")
-                self.img_dict['rot_crops'] = temp_crop_name_list
-                self.img_dict['bbox'] = bbox_lst
-                self.img_dict['conf'] = conf_lst
-
-            except Exception as e:
-                print(e)
-                print(f"while running {self.config['sam']['model_name']} on {img_path}")
-                print('cropping messup - using full image for this!')
-                print('not saved any rotation img as NONE generated')
-                pdb.set_trace()
+            temp_crop_name_list = list()
+            rot_im_lst, bbox_lst, conf_lst = img_proc_utils.get_rotated_image_crops(img_path, self.crop_model)
+            for idx, rot_im in enumerate(rot_im_lst):
+                cv2.imwrite(f"{rot_img_path[:-4]}_{idx}.jpg", rot_im)
+                self.img_dict['full'] = f"{img_path}"
+                temp_crop_name_list.append(f"{rot_img_path[:-4]}_{idx}.jpg")
+            self.img_dict['rot_crops'] = temp_crop_name_list
+            self.img_dict['bbox'] = bbox_lst
+            self.img_dict['conf'] = conf_lst
 
         elif isinstance(img_path, deque):
             for x, img_p in enumerate(img_path):
@@ -134,25 +136,17 @@ def create_crops(self, img_path=None):
                 if not os.path.exists(f"{self.root}/{self.config['sam']['output_rotated_crop_folder']}/{self.video_name}"):
                     os.makedirs(f"{self.root}/{self.config['sam']['output_rotated_crop_folder']}/{self.video_name}")
 
-                try:
-                    temp_crop_name_list = list()
-                    rot_im_lst , bbox_lst , conf_lst = img_proc_utils.get_image_crops(img_p, self.crop_model)
-                    for idx, rot_im in enumerate(rot_im_lst):
-                        cv2.imwrite(f"{rot_img_path[:-4]}_{idx}.jpg", rot_im)
-                        self.img_dict[x]['full'] = f"{img_p}"
-                        temp_crop_name_list.append(f"{rot_img_path[:-4]}_{idx}.jpg")
-                    self.img_dict[x]['rot_crops'] = temp_crop_name_list
-                    self.img_dict[x]['bbox'] = bbox_lst
-                    self.img_dict[x]['conf'] = conf_lst
-
-                except Exception as e:
-                    print(e)
-                    print(f"UNSUCCESS-- at crop generation")
-                    print(f"while running {self.config['sam']['model_name']} on {img_p}")
-                    print('cropping messup - using full image for this!')
-                    print('not saved any rotation img as NONE generated')
-                    pdb.set_trace()
+                temp_crop_name_list = list()
+                rot_im_lst, bbox_lst, conf_lst = img_proc_utils.get_image_crops(img_p, self.crop_model)
+                for idx, rot_im in enumerate(rot_im_lst):
+                    cv2.imwrite(f"{rot_img_path[:-4]}_{idx}.jpg", rot_im)
+                    self.img_dict[x]['full'] = f"{img_p}"
+                    temp_crop_name_list.append(f"{rot_img_path[:-4]}_{idx}.jpg")
+                self.img_dict[x]['rot_crops'] = temp_crop_name_list
+                self.img_dict[x]['bbox'] = bbox_lst
+                self.img_dict[x]['conf'] = conf_lst
 
 
     def create_message(self, image_path):
         self.create_prompt()
@@ -205,13 +199,6 @@ def create_message(self, image_path):
             all_messages.append(messages)
         return all_messages
 
-    def frame_paths_from_folder(self, folder_path):
-        frames= []
-        for f in os.listdir(folder_path):
-            frame = os.path.join(folder_path,f)
-            frames.append(frame)
-        return frames
-
     def post_process_gemini_response(self, text, width, height):
         lines = text.splitlines()
         for i, line in enumerate(lines):
@@ -232,10 +219,12 @@ def process_gpt_output(self, resp):
         temp_3 = [te.replace('python','') for te in temp_2]
         temp_4 = [te.replace('\t','') for te in temp_3]
         temp_final = [json.loads(te) for te in temp_4]
         return temp_final
 
     def prompt_model(self, image_path):
-
         if self.config['exp']['prompt_img_len'] == 1:
             self.list_message = self.create_message(image_path)
             mega_resp = []
@@ -250,6 +239,8 @@ def prompt_model(self, image_path):
                         n=self.config['exp']['voting_iter_count']
                     )
                     prc_resp = self.process_gpt_output(completion.choices)
+                    print(prc_resp)
+                    print(type(prc_resp), type(prc_resp[0]))
                     mega_resp.append(prc_resp)
                     break
                 except Exception as e:
@@ -275,23 +266,24 @@ def get_image_string(self, image_path):
     parser = argparse.ArgumentParser(description="baseline")
     parser.add_argument('--root', type=str, help='/path/to/Sign-Understanding')
     args = parser.parse_args()
+
+    root = args.root
+
     while True:
-        try:
-            r = input("Recognition or Full-Pipeline Evaluation? R/F")
-            if r.upper() == 'R':
-                config = os.path.join(root, 'config/recognition_eval_config.yaml')
-            elif r.upper() == 'F':
-                config = os.path.join(root,'config/full_pipeline_eval_config.yaml')
+        r = input("Recognition or Full-Pipeline Evaluation? R/F\n")
+        if r.upper() == 'R':
+            config = os.path.join(root, 'config/recognition_eval_config.yaml')
+            break
+        elif r.upper() == 'F':
+            config = os.path.join(root, 'config/full_pipeline_eval_config.yaml')
             break
-        except Exception as e:
-            print('Please enter valid response....')
+        print('Please enter a valid response....')
 
-    vlm = VLM(config=config, args)
+    vlm = VLM(config, args)
     print(f"You are using this config: {config}")
     print(f"You are using this model: {vlm.config['exp']['model_name']} and version {vlm.config['exp']['model_version']}")
     print(f"You are using this prompt: {vlm.config['exp']['prompt_file']} and symbol_list {vlm.config['exp']['symbols']}")
-    print('Do you agree (c) or disagree (q)?')
-    pdb.set_trace()
 
     confidence_tries = vlm.config['exp']['voting_iter_count']
     if vlm.config['exp']['source'] == 'selected-frames' and vlm.config['name'] == 'recognition':
@@ -299,7 +291,8 @@
     elif vlm.config['exp']['source'] == 'selected-frames' and vlm.config['name'] == 'full-pipeline':
         names = [f"{vlm.config['exp']['crop_gen_model']}-{vlm.config['exp']['model_name']}"]
 
-    for nm in tqdm(names):
+    print("Starting eval...")
+    for nm in names:
         vlm.video_name = nm
         if vlm.config['name'] == 'full-pipeline':
             vlm.crop_model.video_name = nm
@@ -327,7 +320,7 @@
                     'symbol labels' : ann['symbol labels']})
             bbox_gt_dict[item['imagePath']] = gt_boxes #xyxy list
             gt_resp_dict[item['imagePath']] = recg_ann
-        frame_paths = vlm.frame_paths_from_folder(all_frame_folder)
+        frame_paths = frame_paths_from_folder(all_frame_folder)
 
         base = 0
         correct = 0
@@ -335,16 +328,28 @@
         match_history = list()
         vlm.img_queue = deque(maxlen=vlm.config['exp']['prompt_img_len'])
 
+        print(f"Evaluating {len(frame_paths)} frames...")
         bbox_preds = dict()
         for cnt, frame_path in tqdm(enumerate(frame_paths)):
-            if cnt == 1:
-                break
             result = dict()
             vlm_decider_flag = True
             vlm.img_queue.append(frame_path)
             vlm.img_dict = {i: {'full': None, 'rot_crops': None, 'bbox': None} for i in range(len(vlm.img_queue))}
             vlm.last_message = None
-            resp = vlm.prompt_model(vlm.img_queue)
+
+            if vlm.config['exp']['rot_crops']:
+                if len(vlm.img_queue) != 1:
+                    # The per-crop path below assumes a single source frame
+                    raise NotImplementedError
+                tmp_deq = deque(maxlen=1)
+                tmp_deq.append(frame_path)
+                vlm.create_crops(tmp_deq)
+
+                # Prompt the VLM once per rotated crop of this frame
+                resp = []
+                for rot_im in vlm.img_dict[0]['rot_crops']:
+                    individual_resp = vlm.prompt_model([rot_im], return_json=False)
+                    resp.append(individual_resp)
+            else:
+                resp = vlm.prompt_model(vlm.img_queue)
 
             outputs = []
             for r in resp:
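Note on the new module-level `frame_paths_from_folder`: `os.listdir` makes no ordering guarantee, while `vlm.img_queue` is a deque meant to hold consecutive frames. A minimal sorted variant, assuming frame filenames sort lexicographically into temporal order:

```python
import os

def frame_paths_from_folder(folder_path):
    # os.listdir returns entries in arbitrary order; sort so the
    # deque of recent frames actually reflects temporal order.
    return [os.path.join(folder_path, f) for f in sorted(os.listdir(folder_path))]
```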
diff --git a/utils/file_utils.py b/utils/file_utils.py
index b30f20d..5cea766 100644
--- a/utils/file_utils.py
+++ b/utils/file_utils.py
@@ -1,14 +1,8 @@
 import yaml, json, os
 
 def load_yaml(filepath):
-    try:
-        with open(f"{filepath}", "r") as file:
-            # print(os.getcwd())
-            data = yaml.safe_load(file) # Use safe_load to avoid potential security issues
-            # print(data)
-    except FileNotFoundError:
-        print(os.getcwd())
-        print("File not found.")
+    with open(filepath, "r") as file:
+        data = yaml.safe_load(file)  # safe_load avoids executing arbitrary YAML tags
     return data
 
 def save_file_json(file_path, data):
@@ -40,4 +34,4 @@ def read_json(filepath):
 
 def makeCheck(fol_path):
     if not os.path.exists(fol_path):
-        os.makedirs(fol_path)
\ No newline at end of file
+        os.makedirs(fol_path)
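With the try/except removed, `load_yaml` now raises `FileNotFoundError` instead of printing and then hitting an unbound `data` on the `return` (the old except path ended in `UnboundLocalError`). A sketch of caller-side handling, assuming callers want to fail fast (the exit message is illustrative):

```python
from utils import file_utils

try:
    cfg = file_utils.load_yaml("config/recognition_eval_config.yaml")
except FileNotFoundError as e:
    # Surface the offending path instead of a later UnboundLocalError
    raise SystemExit(f"Config not found: {e.filename}")
```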
diff --git a/utils/img_proc_utils.py b/utils/img_proc_utils.py
index 5362f72..98f5cce 100644
--- a/utils/img_proc_utils.py
+++ b/utils/img_proc_utils.py
@@ -1,8 +1,6 @@
 import numpy as np
 from scipy.ndimage import label, center_of_mass
-import cv2, os, pdb
-from utils.mobilesam import GroundedSAM
-from utils import file_utils
+import cv2
 
 def crop_buffer_bbox(img_path, bbox_cords, buffer = 10):
     '''
@@ -75,4 +73,86 @@ def greedy_match(preds, gts, iou_threshold=0.75):
             matched_pred_indices.add(i)
             matched_gt_indices.add(j)
 
-    return matches
\ No newline at end of file
+    return matches
+
+
+def convert_to_binary(img, bbox=None, mode="bbox"):
+    if mode == "bbox":
+        assert bbox is not None and len(bbox), 'provide bbox if you choose bbox mode for binary conversion'
+        img_bin = np.zeros((img.shape[0], img.shape[1]), dtype=np.uint8)
+        x_min, y_min, x_max, y_max = bbox
+        img_bin[int(y_min):int(y_max) + 1, int(x_min):int(x_max) + 1] = 255
+        return img_bin
+
+    elif mode == 'irregular':
+        raise NotImplementedError
+
+def calculate_orientation(binary_image):
+    """
+    Calculate the orientation vector of a 2D shape in a binary image.
+    Returns angle in radians and the unit vector of orientation.
+    """
+    # Centroid of the foreground pixels
+    y_coords, x_coords = np.nonzero(binary_image)
+    x_bar, y_bar = np.mean(x_coords), np.mean(y_coords)
+
+    # Second-order central moments
+    u20 = np.sum((x_coords - x_bar) ** 2)
+    u02 = np.sum((y_coords - y_bar) ** 2)
+    u11 = np.sum((x_coords - x_bar) * (y_coords - y_bar))
+
+    # Orientation of the principal axis
+    theta = 0.5 * np.arctan2(2 * u11, u20 - u02)
+
+    # Unit vector along that axis
+    direction_vector = np.array([np.cos(theta), np.sin(theta)])
+
+    return theta, direction_vector
+
+def get_shape_properties(binary_image):
+    """
+    Get basic properties of the shape including centroid and orientation.
+    """
+    # Find centroid
+    labeled_array, num_features = label(binary_image)
+    cy, cx = center_of_mass(binary_image)
+
+    # Get orientation
+    theta, direction = calculate_orientation(binary_image)
+
+    return {
+        'centroid': (cx, cy),
+        'angle_rad': theta,
+        'angle_deg': np.degrees(theta),
+        'direction_vector': direction
+    }
+
+def rotate_sign_to_align_bbox(crop_img, bbox_cords1, irregular_binary_mask, ablation=False):
+    x_min, y_min, x_max, y_max = bbox_cords1  # regular shape
+    bbox_binary = convert_to_binary(crop_img, bbox_cords1)
+    center = get_shape_properties(irregular_binary_mask)['centroid']  # center of the irregular mask
+    angle_bbox = np.degrees(calculate_orientation(bbox_binary)[0])
+    angle_irr = np.degrees(calculate_orientation(irregular_binary_mask)[0])
+    # Clamp the correction to +/- 10 degrees
+    if angle_irr > 0:
+        angle = min(10, angle_irr)
+    else:
+        angle = max(-10, angle_irr)
+
+    scale = 1
+    rotation_matrix = cv2.getRotationMatrix2D(center, angle, scale)
+    rotated_image = cv2.warpAffine(crop_img, rotation_matrix, (crop_img.shape[1], crop_img.shape[0]))
+    if ablation:
+        return angle_irr, crop_img
+    return angle_irr, rotated_image
+
+def get_rotated_image_crops(img_path, crop_model, ocr_map_queue=None, ablation=True):
+    # ablation=True means we are not canonicalizing the crop
+    ctd, area = crop_model.execute_model(img_path, ocr_map_queue, type='box')
+    crop_img = crop_buffer_bbox(img_path, crop_model.detections.xyxy[0])
+    # if not ablation:
+    #     crop_model.execute_model(crop_img, ocr_map_queue, type='mask')
+    irregular_binary_mask = crop_model.detections.mask[0].astype(np.uint8) * 255
+    ang, rot_img = rotate_sign_to_align_bbox(crop_img, crop_model.detections.xyxy[0], irregular_binary_mask, ablation)
+    # Returns the detection centroid, its area, the estimated tilt angle, and the (possibly rotated) crop
+    return ctd, area, ang, rot_img
\ No newline at end of file
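`calculate_orientation` estimates the shape's principal axis from second-order central moments: theta = 0.5 * atan2(2*u11, u20 - u02). A quick synthetic self-check (not part of the repo): rotate a wide rectangle mask by a known angle and confirm the moments recover it:

```python
import numpy as np
import cv2

mask = np.zeros((200, 300), dtype=np.uint8)
mask[70:130, 60:240] = 255  # wide rectangle, principal axis horizontal

# Rotate the mask by a known 8 degrees
M = cv2.getRotationMatrix2D((150, 100), 8, 1.0)
rotated = cv2.warpAffine(mask, M, (300, 200), flags=cv2.INTER_NEAREST)

y, x = np.nonzero(rotated)
xb, yb = x.mean(), y.mean()
u20 = ((x - xb) ** 2).sum()
u02 = ((y - yb) ** 2).sum()
u11 = ((x - xb) * (y - yb)).sum()
theta = 0.5 * np.arctan2(2 * u11, u20 - u02)
print(abs(np.degrees(theta)))  # ~8; the sign depends on the y-down image convention
```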
diff --git a/utils/mobilesam.py b/utils/mobilesam.py
index 043682d..9d23cb9 100644
--- a/utils/mobilesam.py
+++ b/utils/mobilesam.py
@@ -9,7 +9,7 @@
 
 class GroundedSAM:
-    def __init__(self, config, args, model ='mobile_sam'):
+    def __init__(self, config, model='mobile_sam'):
         self.config = config
         self.model_name = model
         self.video_name = None
@@ -84,10 +84,10 @@ def get_masks(self, image):
 
     def save_binary_annotations(self):
         self.binary_mask = self.detections.mask[0].astype(np.uint8)*255
-        if not os.path.exists(f"{args.root}/{self.config['sam']['output_binary_mask_folder']}/{self.video_name}/"):
-            os.makedirs(f"{args.root}/{self.config['sam']['output_binary_mask_folder']}/{self.video_name}/")
+        if not os.path.exists(f"{self.config['root']}/{self.config['sam']['output_binary_mask_folder']}/{self.video_name}/"):
+            os.makedirs(f"{self.config['root']}/{self.config['sam']['output_binary_mask_folder']}/{self.video_name}/")
 
-        cv2.imwrite(f"{args.root}/{self.config['sam']['output_binary_mask_folder']}/{self.video_name}/{os.path.basename(self.temp_image_path)}", self.binary_mask)
+        cv2.imwrite(f"{self.config['root']}/{self.config['sam']['output_binary_mask_folder']}/{self.video_name}/{os.path.basename(self.temp_image_path)}", self.binary_mask)
 
     def save_rgb_annotations(self, image):
         box_annotator = sv.BoxAnnotator()
@@ -99,10 +99,10 @@ def save_rgb_annotations(self, image):
                   in self.detections]
         annotated_frame = box_annotator.annotate(scene=image.copy(), detections=self.detections)
         annotated_frame = label_annotator.annotate(scene=annotated_frame, detections=self.detections, labels=labels)
-        if not os.path.exists(f"{args.root}/{self.config['sam']['output_ann_box_folder']}/{self.video_name}/"):
-            os.makedirs(f"{args.root}/{self.config['sam']['output_ann_box_folder']}/{self.video_name}/")
+        if not os.path.exists(f"{self.config['root']}/{self.config['sam']['output_ann_box_folder']}/{self.video_name}/"):
+            os.makedirs(f"{self.config['root']}/{self.config['sam']['output_ann_box_folder']}/{self.video_name}/")
 
-        cv2.imwrite(f"{args.root}/{self.config['sam']['output_ann_box_folder']}/{self.video_name}/{os.path.basename(self.temp_image_path)}", annotated_frame)
+        cv2.imwrite(f"{self.config['root']}/{self.config['sam']['output_ann_box_folder']}/{self.video_name}/{os.path.basename(self.temp_image_path)}", annotated_frame)
 
     def max_conf_process(self):
         conf = np.array([c for c in self.detections.confidence])
@@ -158,4 +158,4 @@ def largest_box_process(self):
         self.largest_area = areas[idx]
         self.detections.confidence = np.array([self.detections.confidence[idx]])
         self.detections.class_id = np.array([self.detections.class_id[idx]])
-        self.detections.xyxy = np.reshape(self.detections.xyxy[idx], (1,4))
\ No newline at end of file
+        self.detections.xyxy = np.reshape(self.detections.xyxy[idx], (1,4))
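Switching from the global `args.root` to `self.config['root']` is the right call: the class no longer depends on a name that only exists in baseline.py's `__main__`. The repeated exists-then-makedirs pairs here (and in `create_crops`) could also reuse the repo's own `file_utils.makeCheck`, or `exist_ok=True`; a sketch with a hypothetical helper name:

```python
import os

def ensure_dir(path):
    # Same effect as the exists/makedirs pairs (and file_utils.makeCheck),
    # minus the check-then-create race between processes.
    os.makedirs(path, exist_ok=True)
    return path
```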
diff --git a/utils/ros_vlm.py b/utils/ros_vlm.py
new file mode 100644
index 0000000..6bbb2af
--- /dev/null
+++ b/utils/ros_vlm.py
@@ -0,0 +1,233 @@
+from collections import deque, Counter
+import openai
+from openai import OpenAI
+import base64, time
+import os, cv2, json
+# google-genai is only needed when crop_gen_model is 'gemini-2.0-flash'
+from google import genai
+from google.genai import types
+# from cv_bridge import CvBridge
+from utils import file_utils, img_proc_utils, mobilesam
+
+class VLM:
+    def __init__(self, config, args=None) -> None:
+        self.config = file_utils.load_yaml(config)
+        if 'root' not in self.config.keys():
+            self.config['root'] = args.root
+        if 'pkg_path' not in self.config.keys():
+            self.config['pkg_path'] = ''
+        self.root = self.config['root']
+        self.model_name = self.config['exp']['model_name']
+        self.max_retry_count = self.config['exp']['retry_count']
+        self.video_name = None
+        self.last_message = None
+        self.setup_gen_model()
+        # self.bridge = CvBridge()
+        if self.config['name'] == 'full-pipeline':
+            self.setup_crop_model()
+
+    def setup_crop_model(self):
+        if self.config['exp']['crop_gen_model'] == 'gemini-2.0-flash':
+            self.crop_model = genai.Client(api_key=file_utils.load_yaml(os.path.join(self.root, self.config['exp']['gemini_detection_api_key']))['api_key'])
+            self.crop_safety_settings = [
+                types.SafetySetting(
+                    category="HARM_CATEGORY_DANGEROUS_CONTENT",
+                    threshold="BLOCK_ONLY_HIGH",
+                ),
+            ]
+        elif self.config['exp']['crop_gen_model'] == 'g-dino':
+            groundedsam = mobilesam.GroundedSAM(self.config)
+            self.crop_model = groundedsam
+
+    def setup_gen_model(self):
+        if self.model_name == 'gemini':
+            api_key = file_utils.load_yaml(os.path.join(self.config['root'], self.config['exp']['gemini_api_key_path']))['api_key']
+            self.client = OpenAI(api_key=api_key, base_url="https://generativelanguage.googleapis.com/v1beta/openai/")
+        elif self.model_name == 'openai':
+            api_key = file_utils.load_yaml(os.path.join(self.config['root'], self.config['exp']['openai_api_key_path']))['api_key']
+            self.client = openai
+            self.client.api_key = api_key
+
+    def encode_rosmsg_image(self, rosmsg_image):
+        cv_image = self.bridge.imgmsg_to_cv2(rosmsg_image, desired_encoding='passthrough')
+        _, buffer = cv2.imencode('.jpg', cv_image)
+        return base64.b64encode(buffer).decode("utf-8")
+
+    def encode_image_path(self, image_path):
+        with open(image_path, "rb") as image_file:
+            return base64.b64encode(image_file.read()).decode("utf-8")
+
+    def get_image_string(self, image):
+        if isinstance(image, str):
+            # An image path
+            encoded = self.encode_image_path(image)
+        else:
+            # A ROS Image message
+            encoded = self.encode_rosmsg_image(image)
+        return f"data:image/jpg;base64,{encoded}"
+
+    def create_prompt(self):
+        filedata = file_utils.read_prompt(os.path.join(self.config['root'], self.config['pkg_path'], self.config['exp']['prompt_file']))
+        filedata = filedata.replace('REPLACE_DIRECTION_LIST', f"{self.config['exp']['directions']}")
+        self.prompt = filedata
+
+    def create_temporal_message(self, arm_image_queue):
+        self.create_prompt()
+        content_msg = [{
+            "type": "text",
+            "text": f"{self.prompt}"
+        }]
+        for i in range(len(arm_image_queue)):
+            content_msg.append({
+                "type": "image_url",
+                "image_url": {"url": self.get_image_string(arm_image_queue[i]), "detail": "high"}
+            })
+
+        if self.config['exp']['rot_crops']:
+            messages = [
+                {"role": "system", "content": "You are a helpful assistant capable of understanding navigational signs."},
+                {
+                    "role": "user",
+                    "content": content_msg
+                }
+            ]
+        else:
+            raise NotImplementedError
+
+        return messages
+
+    def generative_prompting(self, arm_image_queue, retry_count):
+        # Build the message only on the first attempt; retries reuse it
+        if not retry_count:
+            self.last_message = self.create_temporal_message(arm_image_queue)
+        completion = self.client.chat.completions.create(
+            model=self.config['exp']['model_version'],
+            messages=self.last_message,
+            n=self.config['exp']['voting_iter_count']
+        )
+
+        temp_final = self.process_gpt_output(completion.choices, type='gen')
+        return temp_final
+
+    def process_gpt_output(self, resp, type='discr'):
+        temp = [choice.message.content.replace('\n', '').replace('#', '') for choice in resp]
+        temp_1 = [te.replace('json', '') for te in temp]
+        temp_2 = [te.replace('```', '') for te in temp_1]
+        temp_3 = [te.replace('python', '') for te in temp_2]
+        temp_4 = [te.replace('\t', '') for te in temp_3]
+        if type == 'discr':
+            temp_final = [json.loads(te) for te in temp_4]
+            dr_lst = []
+            for dic in temp_final:
+                assert dic['direction'].upper() in self.config['exp']['directions'], 'Got unknown direction ... reprompting'
+                dr_lst.append(dic['direction'].upper())
+            # Majority vote over the n sampled responses, normalized to a fraction
+            temp_final_0 = Counter(dr_lst).most_common()
+            temp_final = [(t[0], t[1] / self.config['exp']['discr_iter_count']) for t in temp_final_0]
+
+        elif type == 'gen':
+            temp_final = [json.loads(te) for te in temp_4]
+        return temp_final
+
+    def prompt_model(self, crop_image_queue, return_json=True):
+        retry_count = 0
+        while retry_count < self.max_retry_count:
+            try:
+                # step-1 generative prompting
+                gen_temp_final = self.generative_prompting(crop_image_queue, retry_count)
+                print(gen_temp_final)
+                break
+
+            except Exception as e:
+                print(e)
+                print('failed to get a py dict : re-prompting....')
+                retry_count += 1
+
+        if retry_count >= self.max_retry_count:
+            print('prompting failure')
+            gen_temp_final = -1
+
+        if return_json:
+            return json.dumps(gen_temp_final)
+        else:
+            return gen_temp_final
+
+    def create_crops(self, img_path=None):
+        '''uses your crop model to create crops of navigational sign boards that are fed to the VLM'''
+        if self.config['exp']['crop_gen_model'] == 'gemini-2.0-flash':
+            raise NotImplementedError
+
+        elif self.config['exp']['crop_gen_model'] == 'g-dino':
+            if isinstance(img_path, str):
+                rot_img_path = f"{self.root}/{self.config['sam']['output_rotated_crop_folder']}/{self.video_name}/{os.path.basename(img_path)}"
+                if not os.path.exists(f"{self.root}/{self.config['sam']['output_rotated_crop_folder']}/{self.video_name}"):
+                    os.makedirs(f"{self.root}/{self.config['sam']['output_rotated_crop_folder']}/{self.video_name}")
+
+                temp_crop_name_list = list()
+                rot_im_lst, bbox_lst, conf_lst = img_proc_utils.get_rotated_image_crops(img_path, self.crop_model)
+                for idx, rot_im in enumerate(rot_im_lst):
+                    cv2.imwrite(f"{rot_img_path[:-4]}_{idx}.jpg", rot_im)
+                    self.img_dict['full'] = f"{img_path}"
+                    temp_crop_name_list.append(f"{rot_img_path[:-4]}_{idx}.jpg")
+                self.img_dict['rot_crops'] = temp_crop_name_list
+                self.img_dict['bbox'] = bbox_lst
+                self.img_dict['conf'] = conf_lst
+
+            elif isinstance(img_path, deque):
+                for x, img_p in enumerate(img_path):
+                    rot_img_path = f"{self.root}/{self.config['sam']['output_rotated_crop_folder']}/{self.video_name}/{os.path.basename(img_p)}"
+                    if not os.path.exists(f"{self.root}/{self.config['sam']['output_rotated_crop_folder']}/{self.video_name}"):
+                        os.makedirs(f"{self.root}/{self.config['sam']['output_rotated_crop_folder']}/{self.video_name}")
+
+                    temp_crop_name_list = list()
+                    rot_im_lst, bbox_lst, conf_lst = img_proc_utils.get_image_crops(img_p, self.crop_model)
+                    for idx, rot_im in enumerate(rot_im_lst):
+                        cv2.imwrite(f"{rot_img_path[:-4]}_{idx}.jpg", rot_im)
+                        self.img_dict[x]['full'] = f"{img_p}"
+                        temp_crop_name_list.append(f"{rot_img_path[:-4]}_{idx}.jpg")
+                    self.img_dict[x]['rot_crops'] = temp_crop_name_list
+                    self.img_dict[x]['bbox'] = bbox_lst
+                    self.img_dict[x]['conf'] = conf_lst
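`VLM.create_crops` in utils/ros_vlm.py is a near-verbatim copy of `Baseline_VLM.create_crops` in baseline.py. A refactor sketch (helper name and placement are hypothetical) that pulls the shared per-frame work into one function both classes could call:

```python
import os
import cv2

def write_rotated_crops(img_p, crop_model, out_dir, crop_fn):
    """Run crop_fn on one frame, save each rotated crop under out_dir,
    and return (crop_paths, bbox_lst, conf_lst)."""
    os.makedirs(out_dir, exist_ok=True)
    stem = os.path.join(out_dir, os.path.basename(img_p))[:-4]  # drop '.jpg'
    rot_im_lst, bbox_lst, conf_lst = crop_fn(img_p, crop_model)
    crop_paths = []
    for idx, rot_im in enumerate(rot_im_lst):
        cv2.imwrite(f"{stem}_{idx}.jpg", rot_im)
        crop_paths.append(f"{stem}_{idx}.jpg")
    return crop_paths, bbox_lst, conf_lst
```

Both the single-path and deque branches would then reduce to one call each, passing `img_proc_utils.get_rotated_image_crops` or `img_proc_utils.get_image_crops` as `crop_fn`.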