diff --git a/utama_core/config/settings.py b/utama_core/config/settings.py index d07bef9d..e2a2bce3 100644 --- a/utama_core/config/settings.py +++ b/utama_core/config/settings.py @@ -31,6 +31,9 @@ BAUD_RATE = 115200 PORT = "/dev/ttyUSB0" TIMEOUT = 0.1 +KICKER_COOLDOWN_TIME = 10 # in seconds to prevent kicker from being actuated too frequently +KICKER_COOLDOWN_TIMESTEPS = int(KICKER_COOLDOWN_TIME * CONTROL_FREQUENCY) # in timesteps +KICKER_PERSIST_TIMESTEPS = 10 # in timesteps to persist the kick command MAX_GAME_HISTORY = 20 # number of previous game states to keep in Game diff --git a/utama_core/team_controller/src/controllers/real/real_robot_controller.py b/utama_core/team_controller/src/controllers/real/real_robot_controller.py index 023a96d6..c93ff98d 100644 --- a/utama_core/team_controller/src/controllers/real/real_robot_controller.py +++ b/utama_core/team_controller/src/controllers/real/real_robot_controller.py @@ -1,3 +1,4 @@ +import dataclasses import logging import time import warnings @@ -7,7 +8,14 @@ from serial import EIGHTBITS, PARITY_EVEN, STOPBITS_TWO, Serial from utama_core.config.robot_params import REAL_PARAMS -from utama_core.config.settings import BAUD_RATE, PORT, TIMEOUT, TIMESTEP +from utama_core.config.settings import ( + BAUD_RATE, + KICKER_COOLDOWN_TIMESTEPS, + KICKER_PERSIST_TIMESTEPS, + PORT, + TIMEOUT, + TIMESTEP, +) from utama_core.entities.data.command import RobotCommand, RobotResponse from utama_core.skills.src.utils.move_utils import empty_command from utama_core.team_controller.src.controllers.common.robot_controller_abstract import ( @@ -21,6 +29,13 @@ MAX_ANGULAR_VEL = REAL_PARAMS.MAX_ANGULAR_VEL +@dataclasses.dataclass +class KickTrackerEntry: + remaining_persist: int + remaining_cooldown: int + is_kick: bool # True for kick, False for chip + + class RealRobotController(AbstractRobotController): """Robot Controller for Real Robots. @@ -39,6 +54,9 @@ def __init__(self, is_team_yellow: bool, n_friendly: int): logger.debug(f"Serial port: {PORT} opened with baudrate: {BAUD_RATE} and timeout {TIMEOUT}") self._assigned_mapping = {} # mapping of robot_id to index in the out_packet + # track last kick time for each robot to transmit kick as HIGH for n timesteps after command + self._kicker_tracker: Dict[int, KickTrackerEntry] = {} + def get_robots_responses(self) -> Optional[List[RobotResponse]]: ### TODO: Not implemented yet return None @@ -58,6 +76,21 @@ def send_robot_commands(self) -> None: # print(data_in) # TODO: add receiving feedback from the robots + ### update kick and chip trackers. We persist the kick/chip command for KICKER_PERSIST_TIMESTEPS + ### this feature is to combat packet loss and to ensure the robot does not kick within its cooldown period + ### Embedded only registers rising edge of kick/chip command + # TODO: this logic has to be reconsidered. Solution for qualification now. + + for robot_id in list(self._kicker_tracker.keys()): + if self._kicker_tracker[robot_id].remaining_cooldown > 1: + self._kicker_tracker[robot_id].remaining_cooldown -= 1 + + if self._kicker_tracker[robot_id].remaining_persist > 0: + self._kicker_tracker[robot_id].remaining_persist -= 1 + else: + # remove kicker tracker entry once cooldown is over + del self._kicker_tracker[robot_id] + self._out_packet = self._empty_command() # flush the out_packet self._assigned_mapping = {} # reset assigned mapping @@ -98,6 +131,20 @@ def _add_robot_command(self, command: RobotCommand, robot_id: int) -> None: command_buffer = self._generate_command_buffer(robot_id, c_command) self._out_packet[start_idx + 1 : start_idx + self._rbt_cmd_size + 1] = command_buffer + if command.kick and robot_id not in self._kicker_tracker: + self._kicker_tracker[robot_id] = KickTrackerEntry( + remaining_persist=KICKER_PERSIST_TIMESTEPS, + remaining_cooldown=KICKER_COOLDOWN_TIMESTEPS, + is_kick=True, + ) + # if for some reason we are kicking and chipping at the same time, we prioritise kick + if command.chip and robot_id not in self._kicker_tracker: + self._kicker_tracker[robot_id] = KickTrackerEntry( + remaining_persist=KICKER_PERSIST_TIMESTEPS, + remaining_cooldown=KICKER_COOLDOWN_TIMESTEPS, + is_kick=False, + ) + # def _populate_robots_info(self, data_in: bytes) -> None: # """ # Populates the robots_info list with the data received from the robots. @@ -139,12 +186,31 @@ def _generate_command_buffer(self, robot_id: int, c_command: RobotCommand) -> by ] ) + #### KICKER LOGIC #### + # cooldown ensures that we do not resend kick/chip command within cooldown period + # persist ensures that we resend kick/chip command for n timesteps after initial command + # embedded detects rising edge of kick/chip command only + ###################### + kicker_byte = 0 - if c_command.kick: + tracker = self._kicker_tracker.get(robot_id) + + # If tracker_entry exists but persistence expired → send empty kicker byte and return + if tracker and tracker.remaining_persist <= 0: + packet.append(kicker_byte) + return packet + + # Decide whether we're kicking or chipping + kick_active = c_command.kick or (tracker and tracker.remaining_persist > 0 and tracker.is_kick) + + chip_active = c_command.chip or (tracker and tracker.remaining_persist > 0 and not tracker.is_kick) + + if kick_active: kicker_byte |= 0xF0 # upper kicker full power - if c_command.chip: + elif chip_active: kicker_byte |= 0x0F - packet.append(kicker_byte) # Kicker controls # Frame end + + packet.append(kicker_byte) # packet_str = " ".join(f"{byte:08b}" for byte in packet) @@ -155,7 +221,6 @@ def _convert_float16_command(self, robot_id, command: RobotCommand) -> RobotComm Also converts angular velocity to degrees per second. """ - angular_vel = self._sanitise_float(command.angular_vel) local_forward_vel = self._sanitise_float(command.local_forward_vel) local_left_vel = self._sanitise_float(command.local_left_vel)