Skip to content
1 change: 1 addition & 0 deletions entities/data/vision.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
BallData = namedtuple("BallData", ["x", "y", "z"])
RobotData = namedtuple("RobotData", ["x", "y", "orientation"])
FrameData = namedtuple("FrameData", ["ts", "yellow_robots", "blue_robots", "ball"])
TeamRobotCoords = namedtuple("TeamRobotCoords", ["team_color", "robots"])
33 changes: 9 additions & 24 deletions team_controller/src/controllers/robot_startup_controller.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,36 @@
import time
import threading
import numpy as np
from typing import Tuple, Optional, Dict, List, Union
from typing import Tuple, Optional, Dict, Union

from global_utils.math_utils import rotate_vector
from team_controller.src.data.vision_receiver import VisionDataReceiver
from team_controller.src.controllers.sim_robot_controller import SimRobotController
from motion_planning.src.pid.pid import PID
from team_controller.src.config.settings import (
PID_PARAMS,
LOCAL_HOST,
YELLOW_TEAM_SIM_PORT,
BLUE_TEAM_SIM_PORT,
YELLOW_START,
)
from team_controller.src.utils import network_manager

from team_controller.src.generated_code.ssl_simulation_robot_control_pb2 import (
RobotControl,
)

# TODO: To be moved to a High-level Descision making repo


class StartUpController:
def __init__(
self,
vision_receiver: VisionDataReceiver,
address=LOCAL_HOST,
port=(YELLOW_TEAM_SIM_PORT, BLUE_TEAM_SIM_PORT),
debug=False,
):
self.vision_receiver = vision_receiver

self.net = network_manager.NetworkManager(address=(address, port[0]))
self.sim_robot_controller = SimRobotController(debug=debug)

# TODO: Tune PID parameters further when going from sim to real(it works for Grsim)
# potentially have a set tunig parameters for each robot
self.pid_oren = PID(0.0167, 8, -8, 5, 0.01, 0, num_robots=6)
self.pid_trans = PID(0.0167, 1.5, -1.5, 5, 0.01, 0, num_robots=6)
self.pid_oren = PID(0.0167, 8, -8, 4.5, 0, 0, num_robots=6)
self.pid_trans = PID(0.0167, 1.5, -1.5, 4.5, 0, 0, num_robots=6)

self.lock = threading.Lock()

Expand All @@ -58,11 +51,12 @@ def startup(self):
command = self._calculate_robot_velocities(
robot_id, target_coords, robots, balls, face_ball=True
)
self._add_robot_command(out_packet, command)
self.sim_robot_controller.add_robot_command(command)

if self.debug:
print(out_packet)
self.net.send_command(out_packet)
self.sim_robot_controller.send_robot_commands(team_is_yellow=True)
self.sim_robot_controller.robot_has_ball(robot_id=3, team_is_yellow=True)

time_to_sleep = max(0, 0.0167 - (time.time() - start_time))
time.sleep(time_to_sleep)
Expand Down Expand Up @@ -91,7 +85,7 @@ def _calculate_robot_velocities(
target_coords (Tuple[float, float] | Tuple[float, float, float]): Target coordinates the robot should move towards.
Can be a (x, y) or (x, y, orientation) tuple. If `face_ball` is True, the robot will face the ball instead of
using the orientation value in target_coords.
robots (Dict[int, Optional[Tuple[float, float, float]]]): All the Current coordinates of the robots sepateated
robots (Dict[int, Optional[Tuple[float, float, float]]]): All the Current coordinates of the robots sepateated
by thier robot_id which containts a tuple (x, y, orientation).
balls (Dict[int, Tuple[float, float, float]]): All the Coordinates of the detected balls (int) , typically (x, y, z/height in 3D space). face_ball (bool, optional): If True, the robot will orient itself to face the ball's position. Defaults to False.

Expand Down Expand Up @@ -142,12 +136,3 @@ def _calculate_robot_velocities(
)

return out

def _add_robot_command(self, out_packet, command) -> None:
robot = out_packet.robot_commands.add()
robot.id = command["id"]
local_vel = robot.move_command.local_velocity
local_vel.forward = command["xvel"]
local_vel.left = command["yvel"]
local_vel.angular = command["wvel"]
# print(f"Robot {command['id']} command: ({command['xvel']:.3f}, {command['yvel']:.3f}, {command['wvel']:.3f})")
107 changes: 107 additions & 0 deletions team_controller/src/controllers/sim_robot_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import time
import threading
import numpy as np
from typing import Tuple, Optional, Dict, List, Union

from global_utils.math_utils import rotate_vector
from team_controller.src.data.vision_receiver import VisionDataReceiver
from motion_planning.src.pid.pid import PID
from team_controller.src.config.settings import (
PID_PARAMS,
LOCAL_HOST,
YELLOW_TEAM_SIM_PORT,
BLUE_TEAM_SIM_PORT,
YELLOW_START,
)
from team_controller.src.utils import network_manager

from team_controller.src.generated_code.ssl_simulation_robot_control_pb2 import (
RobotControl,
)
from team_controller.src.generated_code.ssl_simulation_robot_feedback_pb2 import (
RobotControlResponse, RobotFeedback
)

# TODO: To be moved to a High-level Descision making repo


class SimRobotController:
def __init__(
self,
address=LOCAL_HOST,
port=(YELLOW_TEAM_SIM_PORT, BLUE_TEAM_SIM_PORT),
debug=False,
):

self.out_packet = RobotControl()

self.net_yellow = network_manager.NetworkManager(address=(address, port[0]))
self.net_blue = network_manager.NetworkManager(address=(address, port[1]))

self.yellow_robot_info = RobotControlResponse()
self.blue_robot_info = RobotControlResponse()

self.debug = debug

def send_robot_commands(self, team_is_yellow: bool) -> None:
"""
Sends the robot commands to the appropriate team (yellow or blue).

Args:
team_is_yellow (bool): True if the team is yellow, False if the team is blue.
"""
if self.debug:
print(f"Sending Robot Commands")

if team_is_yellow:
data = self.net_yellow.send_command(self.out_packet, is_sim_robot_cmd=True)
if data:
self.yellow_robot_info = RobotControlResponse()
self.yellow_robot_info.ParseFromString(data)
self.out_packet.Clear()
else:
data = self.net_blue.send_command(self.out_packet, is_sim_robot_cmd=True)
if data:
self.blue_robot_info = RobotControlResponse()
self.blue_robot_info.ParseFromString(data)
self.out_packet.Clear()

def add_robot_command(self, command: Dict) -> None:
"""
Adds a robot command to the out_packet.

Args:
command (dict): A dictionary containing the robot command with keys 'id', 'xvel', 'yvel', and 'wvel'.
"""
robot = self.out_packet.robot_commands.add()
robot.id = command["id"]
local_vel = robot.move_command.local_velocity
local_vel.forward = command["xvel"]
local_vel.left = command["yvel"]
local_vel.angular = command["wvel"]
# print(f"Robot {command['id']} command: ({command['xvel']:.3f}, {command['yvel']:.3f}, {command['wvel']:.3f})")

def robot_has_ball(self, robot_id: int, team_is_yellow: bool) -> bool:
"""
Checks if the specified robot has the ball.

Args:
robot_id (int): The ID of the robot.
team_is_yellow (bool): True if the team is yellow, False if the team is blue.

Returns:
bool: True if the robot has the ball, False otherwise.
"""
if team_is_yellow:
response = self.yellow_robot_info
else:
response = self.blue_robot_info

for robot_feedback in response.feedback:
if robot_feedback.HasField("dribbler_ball_contact") and robot_feedback.id == robot_id:
if robot_feedback.dribbler_ball_contact:
if self.debug:
print(f"Robot: {robot_id}: HAS the Ball")
return True
else:
return False
99 changes: 89 additions & 10 deletions team_controller/src/data/vision_receiver.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import threading
import time
from typing import List
from entities.data.vision import BallData, RobotData, FrameData
from entities.data.vision import BallData, RobotData, FrameData, TeamRobotCoords
from team_controller.src.utils import network_manager
from team_controller.src.config.settings import MULTICAST_GROUP, VISION_PORT, NUM_ROBOTS
from team_controller.src.generated_code.ssl_vision_wrapper_pb2 import SSL_WrapperPacket
Expand Down Expand Up @@ -67,14 +67,15 @@ def __update_team_robots_pos(
) -> None:
# Generic method to update robots for both teams.
for robot in robots_data:
robots[robot.robot_id] = RobotData(
robot.x,
robot.y,
robot.orientation if robot.HasField("orientation") else 0,
)
# TODO: When do we not have orientation?

def _print_frame_info(self, t_received: float, detection: object):
if 0 <= robot.robot_id < len(robots):
robots[robot.robot_id] = RobotData(
robot.x,
robot.y,
robot.orientation if robot.HasField("orientation") else 0,
)
# TODO: When do we not have orientation?

def _print_frame_info(self, t_received: float, detection: object) -> None:
t_now = time.time()
print(f"Time Now: {t_now:.3f}s")
print(
Expand All @@ -91,7 +92,7 @@ def _print_frame_info(self, t_received: float, detection: object):
f"{((t_now - t_received) + (detection.t_sent - detection.t_capture)) * 1000.0:.3f}ms"
)

def get_frame_data(self):
def get_frame_data(self) -> FrameData:
"""
Retrieve all relevant data for this frame. Combination of timestep, ball and robot data.

Expand Down Expand Up @@ -174,3 +175,81 @@ def pull_game_data(self) -> None:
print(f"Robots: {self.get_robots_pos(True)}\n")
print(f"Ball: {self.get_ball_pos()}\n")
time.sleep(0.0083)

def get_robot_coords(self, is_yellow: bool) -> TeamRobotCoords:
"""
Retrieves the current positions of all robots on the specified team.

Args:
is_yellow (bool): If True, retrieves data for the yellow team; otherwise, for the blue team.

Returns:
TeamRobotCoords: A named tuple containing the team color and a list of RobotData.
"""
with self.lock:
team_color = "yellow" if is_yellow else "blue"
robots = self.robots_yellow_pos if is_yellow else self.robots_blue_pos
return TeamRobotCoords(team_color=team_color, robots=robots)

def get_robot_by_id(self, is_yellow: bool, robot_id: int) -> RobotData:
"""
Retrieves the position data for a specific robot by ID.

Args:
is_yellow (bool): If True, retrieves data for the yellow team; otherwise, for the blue team.
robot_id (int): The ID of the robot.

Returns:
RobotData: The position data of the specified robot.
"""
with self.lock:
robots = self.robots_yellow_pos if is_yellow else self.robots_blue_pos
if 0 <= robot_id < len(robots) and robots[robot_id] is not None:
return robots[robot_id]
else:
return None # Or raise an exception. TODO

def get_closest_robot_to_point(self, is_yellow: bool, x: float, y: float) -> RobotData:
"""
Finds the robot closest to a given point.

Args:
is_yellow (bool): If True, searches within the yellow team; otherwise, within the blue team.
x (float): The x-coordinate of the point.
y (float): The y-coordinate of the point.

Returns:
RobotData: The position data of the closest robot.
"""
with self.lock:
robots = self.robots_yellow_pos if is_yellow else self.robots_blue_pos
min_distance = float('inf')
closest_robot = None
for robot in robots:
if robot is not None:
distance = ((robot.x - x) ** 2 + (robot.y - y) ** 2) ** 0.5
if distance < min_distance:
min_distance = distance
closest_robot = robot
# Haven't been tested TODO
return closest_robot

def get_ball_velocity(self) -> tuple:
"""
Calculates the ball's velocity based on position changes over time.

Returns:
tuple: The velocity components (vx, vy).
"""
# TODO Find a method to store the data and get velocity. --> self.previour_ball_pos
# with self.lock:
# if self.previous_ball_pos and self.ball_pos:
# dt = self.time_received - self.previous_time_received
# if dt > 0:
# vx = (self.ball_pos.x - self.previous_ball_pos.x) / dt
# vy = (self.ball_pos.y - self.previous_ball_pos.y) / dt
# return (vx, vy)
# return (0.0, 0.0)
pass


Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# TODO: Imcomplete implementation

# Add the project root directory to sys.path
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../"))
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../"))
print(project_root)
sys.path.insert(0, project_root)

Expand Down
2 changes: 1 addition & 1 deletion team_controller/src/tests/referee_receiver_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@


# Add the project root directory to sys.path
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../"))
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../"))
print(project_root)
sys.path.insert(0, project_root)

Expand Down
27 changes: 27 additions & 0 deletions team_controller/src/tests/robot_controller_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import os
import sys
import threading

# Add the project root directory to sys.path
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../"))
print(project_root)
sys.path.insert(0, project_root)

from team_controller.src.data.vision_receiver import VisionDataReceiver
from team_controller.src.controllers.robot_startup_controller import StartUpController

if __name__ == "__main__":
vision_receiver = VisionDataReceiver(debug=True)
decision_maker = StartUpController(vision_receiver, debug=False)

vision_thread = threading.Thread(target=vision_receiver.pull_game_data)
command_thread = threading.Thread(target=decision_maker.startup)

vision_thread.start()
command_thread.start()

try:
vision_thread.join()
command_thread.join()
except KeyboardInterrupt:
print("Exiting...")
4 changes: 2 additions & 2 deletions team_controller/src/tests/sim_controller_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
import sys

# Add the project root directory to sys.path
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../"))
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../"))
print(project_root)
sys.path.insert(0, project_root)

from controllers.sim_controller import SimulatorController
from team_controller.src.controllers.sim_controller import SimulatorController

def main():
controller = SimulatorController()
Expand Down
Loading