Skip to content
2 changes: 2 additions & 0 deletions utama_core/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
BAUD_RATE = 115200
PORT = "/dev/ttyUSB0"
TIMEOUT = 0.1
KICKER_PERSIST_TIME = 10 # in seconds to keep kick HIGH after command
KICKER_PERSIST_TIMESTEPS = KICKER_PERSIST_TIME * CONTROL_FREQUENCY
Comment thread
energy-in-joles marked this conversation as resolved.
Outdated

MAX_GAME_HISTORY = 20 # number of previous game states to keep in Game

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import dataclasses
import logging
import time
import warnings
Expand All @@ -7,7 +8,13 @@
from serial import EIGHTBITS, PARITY_EVEN, STOPBITS_TWO, Serial

from utama_core.config.robot_params import REAL_PARAMS
from utama_core.config.settings import BAUD_RATE, PORT, TIMEOUT, TIMESTEP
from utama_core.config.settings import (
BAUD_RATE,
KICKER_PERSIST_TIMESTEPS,
PORT,
TIMEOUT,
TIMESTEP,
)
Comment thread
energy-in-joles marked this conversation as resolved.
from utama_core.entities.data.command import RobotCommand, RobotResponse
from utama_core.skills.src.utils.move_utils import empty_command
from utama_core.team_controller.src.controllers.common.robot_controller_abstract import (
Expand All @@ -21,6 +28,12 @@
MAX_ANGULAR_VEL = REAL_PARAMS.MAX_ANGULAR_VEL


@dataclasses.dataclass
class KickTrackerEntry:
remaining_timesteps: int
is_kick: bool # True for kick, False for chip


class RealRobotController(AbstractRobotController):
"""Robot Controller for Real Robots.

Expand All @@ -39,6 +52,9 @@ def __init__(self, is_team_yellow: bool, n_friendly: int):
logger.debug(f"Serial port: {PORT} opened with baudrate: {BAUD_RATE} and timeout {TIMEOUT}")
self._assigned_mapping = {} # mapping of robot_id to index in the out_packet

# track last kick time for each robot to transmit kick as HIGH for n timesteps after command
self._kicker_tracker: Dict[int, KickTrackerEntry] = {}

def get_robots_responses(self) -> Optional[List[RobotResponse]]:
### TODO: Not implemented yet
return None
Expand All @@ -58,6 +74,18 @@ def send_robot_commands(self) -> None:
# print(data_in)
# TODO: add receiving feedback from the robots

### update kick and chip trackers. We persist the kick/chip command for KICKER_PERSIST_TIMESTEPS
### this feature is to combat packet loss and to ensure the robot does not kick within its cooldown period
### Embedded only registers rising edge of kick/chip command
# TODO: this logic has to be reconsidered. Solution for qualification now.

for robot_id in list(self._kicker_tracker.keys()):
if self._kicker_tracker[robot_id].remaining_timesteps > 1:
self._kicker_tracker[robot_id].remaining_timesteps -= 1
else:
# reset kick command to 0 in the out_packet
Comment thread
energy-in-joles marked this conversation as resolved.
Outdated
del self._kicker_tracker[robot_id]

self._out_packet = self._empty_command() # flush the out_packet
self._assigned_mapping = {} # reset assigned mapping

Expand Down Expand Up @@ -98,6 +126,16 @@ def _add_robot_command(self, command: RobotCommand, robot_id: int) -> None:
command_buffer = self._generate_command_buffer(robot_id, c_command)
self._out_packet[start_idx + 1 : start_idx + self._rbt_cmd_size + 1] = command_buffer

if command.kick and robot_id not in self._kicker_tracker:
self._kicker_tracker[robot_id] = KickTrackerEntry(
remaining_timesteps=KICKER_PERSIST_TIMESTEPS, is_kick=True
)
# if for some reason we are kicking and chipping at the same time, we prioritise kick
if command.chip and robot_id not in self._kicker_tracker:
self._kicker_tracker[robot_id] = KickTrackerEntry(
remaining_timesteps=KICKER_PERSIST_TIMESTEPS, is_kick=False
)

# def _populate_robots_info(self, data_in: bytes) -> None:
# """
# Populates the robots_info list with the data received from the robots.
Expand Down Expand Up @@ -140,9 +178,10 @@ def _generate_command_buffer(self, robot_id: int, c_command: RobotCommand) -> by
)

kicker_byte = 0
if c_command.kick:
# check kick and chip command but also check the kicker tracker to see if we need to persist the command
if c_command.kick or (robot_id in self._kicker_tracker and self._kicker_tracker[robot_id].is_kick):
kicker_byte |= 0xF0 # upper kicker full power
if c_command.chip:
if c_command.chip or (robot_id in self._kicker_tracker and not self._kicker_tracker[robot_id].is_kick):
kicker_byte |= 0x0F
Comment thread
energy-in-joles marked this conversation as resolved.
Outdated
packet.append(kicker_byte) # Kicker controls # Frame end

Expand All @@ -155,7 +194,6 @@ def _convert_float16_command(self, robot_id, command: RobotCommand) -> RobotComm

Also converts angular velocity to degrees per second.
"""

angular_vel = self._sanitise_float(command.angular_vel)
local_forward_vel = self._sanitise_float(command.local_forward_vel)
local_left_vel = self._sanitise_float(command.local_left_vel)
Expand Down
Loading