Skip to content
3 changes: 3 additions & 0 deletions utama_core/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
BAUD_RATE = 115200
PORT = "/dev/ttyUSB0"
TIMEOUT = 0.1
KICKER_COOLDOWN_TIME = 10 # in seconds to prevent kicker from being actuated too frequently
KICKER_COOLDOWN_TIMESTEPS = int(KICKER_COOLDOWN_TIME * CONTROL_FREQUENCY) # in timesteps
KICKER_PERSIST_TIMESTEPS = 10 # in timesteps to persist the kick command

MAX_GAME_HISTORY = 20 # number of previous game states to keep in Game

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import dataclasses
import logging
import time
import warnings
Expand All @@ -7,7 +8,14 @@
from serial import EIGHTBITS, PARITY_EVEN, STOPBITS_TWO, Serial

from utama_core.config.robot_params import REAL_PARAMS
from utama_core.config.settings import BAUD_RATE, PORT, TIMEOUT, TIMESTEP
from utama_core.config.settings import (
BAUD_RATE,
KICKER_COOLDOWN_TIMESTEPS,
KICKER_PERSIST_TIMESTEPS,
PORT,
TIMEOUT,
TIMESTEP,
)
from utama_core.entities.data.command import RobotCommand, RobotResponse
from utama_core.skills.src.utils.move_utils import empty_command
from utama_core.team_controller.src.controllers.common.robot_controller_abstract import (
Expand All @@ -21,6 +29,13 @@
MAX_ANGULAR_VEL = REAL_PARAMS.MAX_ANGULAR_VEL


@dataclasses.dataclass
class KickTrackerEntry:
remaining_persist: int
remaining_cooldown: int
is_kick: bool # True for kick, False for chip


class RealRobotController(AbstractRobotController):
"""Robot Controller for Real Robots.

Expand All @@ -39,6 +54,9 @@ def __init__(self, is_team_yellow: bool, n_friendly: int):
logger.debug(f"Serial port: {PORT} opened with baudrate: {BAUD_RATE} and timeout {TIMEOUT}")
self._assigned_mapping = {} # mapping of robot_id to index in the out_packet

# track last kick time for each robot to transmit kick as HIGH for n timesteps after command
self._kicker_tracker: Dict[int, KickTrackerEntry] = {}

def get_robots_responses(self) -> Optional[List[RobotResponse]]:
### TODO: Not implemented yet
return None
Expand All @@ -58,6 +76,21 @@ def send_robot_commands(self) -> None:
# print(data_in)
# TODO: add receiving feedback from the robots

### update kick and chip trackers. We persist the kick/chip command for KICKER_PERSIST_TIMESTEPS
### this feature is to combat packet loss and to ensure the robot does not kick within its cooldown period
### Embedded only registers rising edge of kick/chip command
# TODO: this logic has to be reconsidered. Solution for qualification now.

for robot_id in list(self._kicker_tracker.keys()):
if self._kicker_tracker[robot_id].remaining_cooldown > 1:
self._kicker_tracker[robot_id].remaining_cooldown -= 1

if self._kicker_tracker[robot_id].remaining_persist > 0:
self._kicker_tracker[robot_id].remaining_persist -= 1
else:
# remove kicker tracker entry once cooldown is over
del self._kicker_tracker[robot_id]

self._out_packet = self._empty_command() # flush the out_packet
self._assigned_mapping = {} # reset assigned mapping

Expand Down Expand Up @@ -98,6 +131,20 @@ def _add_robot_command(self, command: RobotCommand, robot_id: int) -> None:
command_buffer = self._generate_command_buffer(robot_id, c_command)
self._out_packet[start_idx + 1 : start_idx + self._rbt_cmd_size + 1] = command_buffer

if command.kick and robot_id not in self._kicker_tracker:
self._kicker_tracker[robot_id] = KickTrackerEntry(
remaining_persist=KICKER_PERSIST_TIMESTEPS,
remaining_cooldown=KICKER_COOLDOWN_TIMESTEPS,
is_kick=True,
)
# if for some reason we are kicking and chipping at the same time, we prioritise kick
if command.chip and robot_id not in self._kicker_tracker:
self._kicker_tracker[robot_id] = KickTrackerEntry(
remaining_persist=KICKER_PERSIST_TIMESTEPS,
remaining_cooldown=KICKER_COOLDOWN_TIMESTEPS,
is_kick=False,
)

# def _populate_robots_info(self, data_in: bytes) -> None:
# """
# Populates the robots_info list with the data received from the robots.
Expand Down Expand Up @@ -139,12 +186,31 @@ def _generate_command_buffer(self, robot_id: int, c_command: RobotCommand) -> by
]
)

#### KICKER LOGIC ####
# cooldown ensures that we do not resend kick/chip command within cooldown period
# persist ensures that we resend kick/chip command for n timesteps after initial command
# embedded detects rising edge of kick/chip command only
######################

kicker_byte = 0
if c_command.kick:
tracker = self._kicker_tracker.get(robot_id)

# If tracker_entry exists but persistence expired → send empty kicker byte and return
if tracker and tracker.remaining_persist <= 0:
packet.append(kicker_byte)
return packet

# Decide whether we're kicking or chipping
kick_active = c_command.kick or (tracker and tracker.remaining_persist > 0 and tracker.is_kick)

chip_active = c_command.chip or (tracker and tracker.remaining_persist > 0 and not tracker.is_kick)

if kick_active:
kicker_byte |= 0xF0 # upper kicker full power
if c_command.chip:
elif chip_active:
kicker_byte |= 0x0F
packet.append(kicker_byte) # Kicker controls # Frame end

packet.append(kicker_byte)

# packet_str = " ".join(f"{byte:08b}" for byte in packet)

Expand All @@ -155,7 +221,6 @@ def _convert_float16_command(self, robot_id, command: RobotCommand) -> RobotComm

Also converts angular velocity to degrees per second.
"""

angular_vel = self._sanitise_float(command.angular_vel)
local_forward_vel = self._sanitise_float(command.local_forward_vel)
local_left_vel = self._sanitise_float(command.local_left_vel)
Expand Down
Loading