Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/default_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ camera_index: 0
# Link to camera feed to be used for video capture
# camera_index: http://unctalos.student.rit.edu:5000/video_feed

tracking_priority: 'largest' # Determines which bounding box to track when multiple are detected
acceptable_box_percent: 0.4 # Percentage of the frame width and height used to define the size of the acceptable box
vertical_field_of_view: 48 # vertical FOV of camera being used
horizontal_field_of_view: 89 # horizontal FOV of camera being used
Expand Down
3 changes: 3 additions & 0 deletions src/connection/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ class Connection:
port: int
video_connection: VideoConnection
publisher: Publisher = field(init=False)
tracking_priority: str = field(
default_factory=lambda: load_config().get("tracking_priority", "largest") #default tracking priority will be largest bbox
)
is_manual: bool = True
is_manual_only: bool = field(
default_factory=lambda: load_config().get("default_manual_only", False)
Expand Down
42 changes: 37 additions & 5 deletions src/directors/base_director.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class ControlFeed:
manual: bool
frame_shape: tuple
publisher: Publisher
tracking_priority: str


class BaseDirector(ABC):
Expand All @@ -40,6 +41,7 @@ def __init__(
manual=conn.is_manual,
frame_shape=shape,
publisher=conn.publisher,
tracking_priority=conn.tracking_priority,
)
else:
print(
Expand All @@ -50,10 +52,10 @@ def __init__(
self.start_auto_control()

def add_control_feed(
self, host: str, manual: bool, frame_shape: tuple, publisher: Publisher
self, host: str, manual: bool, frame_shape: tuple, publisher: Publisher, tracking_priority: str
) -> ControlFeed:
cf = ControlFeed(
host=host, manual=manual, frame_shape=frame_shape, publisher=publisher
host=host, manual=manual, frame_shape=frame_shape, publisher=publisher, tracking_priority=tracking_priority
)
self.control_feeds[host] = cf
if self.scheduler is not None and self.control_task is None:
Expand Down Expand Up @@ -82,11 +84,18 @@ def process_frame(
raise NotImplementedError("Subclasses must implement this method.")

def track_obj(self) -> Any | None:
bboxes = self.tracker.get_bboxes()
for host, bbox in bboxes.items():
if host in self.control_feeds and bbox is not None and len(bbox) > 0:
all_bboxes = self.tracker.get_bboxes()
for host, bboxes in all_bboxes.items():
if host in self.control_feeds and bboxes is not None and len(bboxes) > 0:
if self.control_feeds[host].manual:
continue # skip manual feeds
bbox = bboxes[0]
if len(bboxes) > 1: # Find which bbox to track
match self.control_feeds[host].tracking_priority:
case "largest":
bbox = self.track_largest(bboxes)
case "smallest":
bbox = self.track_smallest(bboxes)
return self.process_frame(
host,
bbox,
Expand Down Expand Up @@ -114,3 +123,26 @@ def stop_auto_control(self) -> None:
if self._term is not None:
remove_termination_handler(self._term)
self._term = None

def track_largest(self, bboxes: list[list[int]]) -> list[int]:
'''Returns the largest bounding box from a list of bounding boxes.'''
largest_bbox = bboxes[0]
largest_area = (largest_bbox[2] - largest_bbox[0]) * (largest_bbox[3] - largest_bbox[1])
for bbox in bboxes:
bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
if bbox_area > largest_area:
largest_bbox = bbox
largest_area = bbox_area
return largest_bbox

def track_smallest(self, bboxes: list[list[int]]) -> list[int]:
'''Returns the smallest bounding box from a list of bounding boxes.'''
smallest_bbox = bboxes[0]
smallest_area = (smallest_bbox[2] - smallest_bbox[0]) * (smallest_bbox[3] - smallest_bbox[1])
for bbox in bboxes:
bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
if bbox_area < smallest_area:
smallest_bbox = bbox
smallest_area = bbox_area
return smallest_bbox

7 changes: 3 additions & 4 deletions src/directors/continuous_director.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,18 @@ def process_frame(
acceptable_box_bottom,
) = calculate_acceptable_box(frame_width, frame_height)

# C alculate where the middle point of the bounding box lies in relation to the box
# Calculate where the middle point of the bounding box lies in relation to the box
# Unpack bounding box
# If there is a single speaker it should return one bounding box anyway
first_face = bounding_box[0]
x, y, w, h = first_face
x, y, w, h = bounding_box

top = y
bottom = y + h
center = frame_height // 2
average = (top + bottom) // 2

# Calculate the center of the bounding box
bbox_center_x, bbox_center_y = calculate_center_bbox(first_face)
bbox_center_x, bbox_center_y = calculate_center_bbox(bounding_box)

# Are we inside the acceptable box
if (
Expand Down
2 changes: 1 addition & 1 deletion src/directors/discrete_director.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def process_frame(
# TODO: Right now I am going to assume we only want the first face

# Calculate the center of the bounding box
bbox_center_x, bbox_center_y = calculate_center_bbox(bounding_box[0])
bbox_center_x, bbox_center_y = calculate_center_bbox(bounding_box)

# Are we inside the acceptable box
if (
Expand Down
4 changes: 2 additions & 2 deletions src/talos_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ def open_connection(
conf = self.config.get(hostname, {})
camera = conf["camera_index"] if camera is None else camera
vid_conn = self.tracker.add_capture(hostname, camera)
conn = Connection(hostname, port or conf["socket_port"], vid_conn)
conn = Connection(hostname, port or conf["socket_port"], vid_conn, conf["tracking_priority"])
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
conn = Connection(hostname, port or conf["socket_port"], vid_conn, conf["tracking_priority"])
conn = Connection(hostname, port or conf["socket_port"], vid_conn, conf.get("tracking_priority", "largest")

self.connections[hostname] = conn
self.set_active_connection(hostname)
if write_config:
self.config = load_config()
if self.director is not None and vid_conn.shape is not None:
self.director.add_control_feed(
hostname, conn.is_manual, vid_conn.shape, conn.publisher
hostname, conn.is_manual, vid_conn.shape, conn.publisher, conn.tracking_priority
)

def start_move(self, direction: Direction) -> None:
Expand Down
47 changes: 45 additions & 2 deletions src/tracking/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def _detect_person_worker(

# Class for handling video feed and object detection model usage
class Tracker:
speaker_bbox: tuple[int, int, int, int] | None = None
speaker_bbox: list[int] | None = None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert this change

config: dict = load_config()
default_config: dict = load_default_config()
max_fps = 1 # this will be set dynamically based on captures
Expand Down Expand Up @@ -189,6 +189,7 @@ def poll_bboxes(self) -> None:

bboxes_by_host: dict = {host: [] for host, _ in self.frame_order}

# Return the bboxes straightaway if there's only one host
if 1 == len(self.frame_order):
(host, _) = self.frame_order[0]
self._bboxes = {host: raw_bboxes}
Expand All @@ -209,7 +210,49 @@ def poll_bboxes(self) -> None:
[max(0, x1 - dx), y1, max(0, x2 - dx), y2]
)

self._bboxes = bboxes_by_host
# culled_bboxes: dict[str, ] = self.cull_bboxes(bboxes_by_host)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete commented out code


# self._bboxes = culled_bboxes

# def cull_bboxes(self, bboxes_by_host) -> dict[str, ]:
# print(f"before: {bboxes_by_host}")
# # Eliminate hosts that are in manual mode, and find the relevant bbox to track for those with multiple in frame
# culled_bboxes = dict()
# for host, bboxes in bboxes_by_host.items():
# if self.config[host].manual:
# continue
# if len(bboxes) > 1:
# match self.config[host].tracking_priority:
# case "largest":
# bbox = self.track_largest(bboxes)
# case "smallest":
# bbox = self.track_smallest(bboxes)
# culled_bboxes[host] = bbox
# else:
# culled_bboxes[host] = bboxes
# print(f"after: {bboxes_by_host}")

# def track_largest(self, bboxes: list[list[int]]) -> list[int]:
# '''Returns the largest bounding box from a list of bounding boxes.'''
# largest_bbox = bboxes[0]
# largest_area = (largest_bbox[2] - largest_bbox[0]) * (largest_bbox[3] - largest_bbox[1])
# for bbox in bboxes:
# bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
# if bbox_area > largest_area:
# largest_bbox = bbox
# largest_area = bbox_area
# return largest_bbox

# def track_smallest(self, bboxes: list[list[int]]) -> list[int]:
# '''Returns the smallest bounding box from a list of bounding boxes.'''
# smallest_bbox = bboxes[0]
# smallest_area = (smallest_bbox[2] - smallest_bbox[0]) * (smallest_bbox[3] - smallest_bbox[1])
# for bbox in bboxes:
# bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
# if bbox_area < smallest_area:
# smallest_bbox = bbox
# smallest_area = bbox_area
# return smallest_bbox

def get_total_frame_shape(self):
"""
Expand Down
2 changes: 1 addition & 1 deletion src/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def calculate_center_box(x, y, w, h):
return (x + w) // 2, (y + h) // 2


def calculate_center_bbox(bbox: tuple[int, int, int, int]):
def calculate_center_bbox(bbox: list[int]):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert this back

return calculate_center_box(*bbox)


Expand Down