diff --git a/main.py b/main.py index 5202d447..90ce387f 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,11 @@ from utama_core.replay import ReplayWriterConfig from utama_core.run import StrategyRunner -from utama_core.strategy import DefenceStrategy, RobotPlacementStrategy, StartupStrategy +from utama_core.strategy.examples import ( + DefenceStrategy, + GoToBallExampleStrategy, + RobotPlacementStrategy, + StartupStrategy, +) def main(): diff --git a/utama_core/config/robot_params.py b/utama_core/config/robot_params.py index 184bef9f..a0569f41 100644 --- a/utama_core/config/robot_params.py +++ b/utama_core/config/robot_params.py @@ -34,10 +34,10 @@ class RobotParams: ) REAL_PARAMS = RobotParams( - MAX_VEL=0.2, - MAX_ANGULAR_VEL=0.5, - MAX_ACCELERATION=2, - MAX_ANGULAR_ACCELERATION=0.2, + MAX_VEL=1, + MAX_ANGULAR_VEL=2, + MAX_ACCELERATION=4, + MAX_ANGULAR_ACCELERATION=50, KICK_SPD=5, DRIBBLE_SPD=3, CHIP_ANGLE=pi / 4, diff --git a/utama_core/config/settings.py b/utama_core/config/settings.py index f03994aa..5eb5326d 100644 --- a/utama_core/config/settings.py +++ b/utama_core/config/settings.py @@ -24,7 +24,7 @@ ### REAL CONTROLLER SETTINGS ### BAUD_RATE = 115200 -PORT = "/dev/ttyACM0" +PORT = "/dev/ttyUSB0" TIMEOUT = 0.1 MAX_GAME_HISTORY = 20 diff --git a/utama_core/rsoccer_simulator/src/ssl/ssl_gym_base.py b/utama_core/rsoccer_simulator/src/ssl/ssl_gym_base.py index 08877ba8..1b1a5d23 100644 --- a/utama_core/rsoccer_simulator/src/ssl/ssl_gym_base.py +++ b/utama_core/rsoccer_simulator/src/ssl/ssl_gym_base.py @@ -268,8 +268,8 @@ def _render(self): overlay = RenderOverlay(self.overlay, self.field_renderer.scale) overlay.draw(self.window_surface) - for i in range(self.n_robots_blue): - robot = self.frame.robots_blue[i] + for blue_rb in self.frame.robots_blue.values(): + robot = blue_rb x, y = self._pos_transform(robot.x, robot.y) rbt = RenderSSLRobot( x, @@ -281,8 +281,8 @@ def _render(self): ) rbt.draw(self.window_surface) - for i in range(self.n_robots_yellow): - robot = self.frame.robots_yellow[i] + for yellow_rb in self.frame.robots_yellow.values(): + robot = yellow_rb x, y = self._pos_transform(robot.x, robot.y) rbt = RenderSSLRobot( x, diff --git a/utama_core/run/refiners/position.py b/utama_core/run/refiners/position.py index a758653e..b36bb667 100644 --- a/utama_core/run/refiners/position.py +++ b/utama_core/run/refiners/position.py @@ -128,11 +128,10 @@ def _get_most_confident_ball(balls: List[VisionBallData]) -> Ball: def _combine_single_team_positions( self, - game_robots: Dict[int, Robot], + new_game_robots: Dict[int, Robot], vision_robots: List[VisionRobotData], friendly: bool, ) -> Dict[int, Robot]: - new_game_robots = game_robots.copy() for robot in vision_robots: new_x, new_y = robot.x, robot.y diff --git a/utama_core/strategy/__init__.py b/utama_core/strategy/__init__.py index c92c3b09..e69de29b 100644 --- a/utama_core/strategy/__init__.py +++ b/utama_core/strategy/__init__.py @@ -1,9 +0,0 @@ -from .examples.strategies.defense_strategy import DefenceStrategy -from .examples.strategies.one_robot_placement_strategy import RobotPlacementStrategy -from .examples.strategies.startup_strategy import StartupStrategy - -__all__ = [ - "StartupStrategy", - "DefenceStrategy", - "RobotPlacementStrategy", -] diff --git a/utama_core/strategy/examples/__init__.py b/utama_core/strategy/examples/__init__.py new file mode 100644 index 00000000..2d12e9d4 --- /dev/null +++ b/utama_core/strategy/examples/__init__.py @@ -0,0 +1,6 @@ +from utama_core.strategy.examples.defense_strategy import DefenceStrategy +from utama_core.strategy.examples.go_to_ball_ex import GoToBallExampleStrategy +from utama_core.strategy.examples.one_robot_placement_strategy import ( + RobotPlacementStrategy, +) +from utama_core.strategy.examples.startup_strategy import StartupStrategy diff --git a/utama_core/strategy/examples/strategies/defense_strategy.py b/utama_core/strategy/examples/defense_strategy.py similarity index 96% rename from utama_core/strategy/examples/strategies/defense_strategy.py rename to utama_core/strategy/examples/defense_strategy.py index 3485f651..62ce6073 100644 --- a/utama_core/strategy/examples/strategies/defense_strategy.py +++ b/utama_core/strategy/examples/defense_strategy.py @@ -38,14 +38,19 @@ class BlockAttackerStep(AbstractBehaviour): **Blackboard Interaction:** Reads: - - `rd_defender_id` (int): The ID of the robot to check for ball possession. Typically from the `SetBlackboardVariable` node. + - `rd_defender_id` (int): The ID of the defending robot that will block the attacker. Typically from the `SetBlackboardVariable` node. - `rd_blocking_target` (int): The ID of the enemy robot to block. Typically from the `FindBlockingTarget` node. **Returns:** - `py_trees.common.Status.RUNNING`: The behaviour is actively commanding the robot to block the attacker. """ - def __init__(self, rd_defender_id: str, rd_locking_target: str, name: Optional[str] = "BlockAttackerStep"): + def __init__( + self, + rd_defender_id: str, + rd_locking_target: str, + name: Optional[str] = "BlockAttackerStep", + ): super().__init__(name=name) self.defender_id_key = rd_defender_id self.locking_target_key = rd_locking_target diff --git a/utama_core/strategy/examples/go_to_ball_ex.py b/utama_core/strategy/examples/go_to_ball_ex.py new file mode 100644 index 00000000..8f47e8a1 --- /dev/null +++ b/utama_core/strategy/examples/go_to_ball_ex.py @@ -0,0 +1,189 @@ +from typing import Any, Optional + +import py_trees +from py_trees.composites import Selector, Sequence + +from utama_core.config.physical_constants import ROBOT_RADIUS +from utama_core.config.settings import TIMESTEP +from utama_core.entities.data.vector import Vector2D +from utama_core.entities.game import Game +from utama_core.skills.src.go_to_ball import go_to_ball +from utama_core.strategy.common import AbstractBehaviour, AbstractStrategy + + +class HasBall(AbstractBehaviour): + """ + Checks whether the configured robot currently owns the ball. + **Args:** + - visual (bool): If True, uses distance-based possession check; otherwise uses game state. + **Blackboard Interaction:** + - Reads: + - `robot_id` (int): The ID of the robot to check for ball possession. + **Returns:** + - `py_trees.common.Status.SUCCESS`: The robot has possession of the ball. + - `py_trees.common.Status.FAILURE`: The robot does not have possession of the ball. + """ + + def __init__(self, rd_robot_id: str, visual: bool = True, name: Optional[str] = None): + super().__init__(name) + self.robot_id_key = rd_robot_id + + self.visual = visual + DISTANCE_BUFFER = 0.04 # meters + self.ball_capture_dist = DISTANCE_BUFFER + ROBOT_RADIUS + + def setup_(self): + self.blackboard.register_key(key=self.robot_id_key, access=py_trees.common.Access.READ) + + def update(self): + # print(f"Checking if robot {self.blackboard.get(self.robot_id_key)} has the ball") + game = self.blackboard.game + robot_id = self.blackboard.get(self.robot_id_key) + + if self.visual: + return self._has_ball_visual(game, robot_id) + else: + return self._has_ball_from_state(game, robot_id) + + def _has_ball_visual(self, game: Game, robot_id: int) -> py_trees.common.Status: + """ + Visual possession: success if the robot is within `ball_capture_radius` of the ball. + Success if the robot is within `ball_capture_radius` of the ball (uses Euclidean distance). + """ + robot = game.friendly_robots[robot_id] + ball = game.ball + + r_pos = Vector2D(robot.p.x, robot.p.y) + b_pos = Vector2D(ball.p.x, ball.p.y) + + dist = r_pos.distance_to(b_pos) + return py_trees.common.Status.SUCCESS if dist < self.ball_capture_dist else py_trees.common.Status.FAILURE + + def _has_ball_from_state(self, game: Game, robot_id: int) -> py_trees.common.Status: + """ + State possession: success if the game state's `has_ball` flag is true. + """ + has_ball = game.current.friendly_robots[robot_id].has_ball + + # print(f"Robot {robot_id} has_ball: {has_ball}") + return py_trees.common.Status.SUCCESS if has_ball else py_trees.common.Status.FAILURE + + +class GoToBallStep(AbstractBehaviour): + """ + Continuously moves the robot towards the ball using the core go_to_ball skill. + **Blackboard Interaction:** + - Reads: + - `robot_id` (int): The ID of the robot to move towards the ball. + **Returns:** + - `py_trees.common.Status.RUNNING`: The behaviour is actively commanding the robot to go to the ball. + """ + + def __init__(self, rd_robot_id: str, name: str = "GoToBallStep"): + super().__init__(name) + self.robot_id_key = rd_robot_id + + def setup_(self): + self.blackboard.register_key(key=self.robot_id_key, access=py_trees.common.Access.READ) + + def update(self) -> py_trees.common.Status: + game = self.blackboard.game + env = self.blackboard.rsim_env + robot_id = self.blackboard.get(self.robot_id_key) + if env: + v = game.friendly_robots[robot_id].v + p = game.friendly_robots[robot_id].p + env.draw_point(p.x + v.x * TIMESTEP * 5, p.y + v.y * TIMESTEP * 5, color="green") + + command = go_to_ball(game, self.blackboard.motion_controller, robot_id) + self.blackboard.cmd_map[robot_id] = command + return py_trees.common.Status.RUNNING + + +class SetBlackboardVariable(AbstractBehaviour): + """ + Writes a constant `value` onto the blackboard with the key `variable_name`. + **Blackboard Interaction:** + - Writes: + - `variable_name` (Any): The name of the blackboard variable to be set. + **Returns:** + - `py_trees.common.Status.SUCCESS`: The variable has been set. + """ + + def __init__(self, name: str, variable_name: str, value: Any): + super().__init__(name=name) + self.variable_name = variable_name + self.value = value + + def setup_(self): + self.blackboard.register_key(key=self.variable_name, access=py_trees.common.Access.WRITE) + + def update(self) -> py_trees.common.Status: + self.blackboard.set(self.variable_name, self.value, overwrite=True) + return py_trees.common.Status.SUCCESS + + +def go_to_ball_subtree(rd_robot_id: str) -> py_trees.behaviour.Behaviour: + """ + Builds a selector that drives the robot to the ball until it gains possession. + **Args:** + - rd_robot_id: Blackboard key that holds the robot ID to control. **Required.** + **Status:** + - py_trees.common.Status.SUCCESS: The robot already has the ball. + - py_trees.common.Status.RUNNING: The robot is being commanded to go to the ball. + **Returns:** + - py_trees.behaviour.Behaviour: The root node of the Go-To-Ball subtree. + """ + root = Selector( + name="GoToBallSubtree", + memory=False, + ) + + ### Assemble the tree ### + + root.add_children( + [ + HasBall(name="HasBall?", rd_robot_id=rd_robot_id), + GoToBallStep(name="GoToBallStep", rd_robot_id=rd_robot_id), + ] + ) + + return root + + +class GoToBallExampleStrategy(AbstractStrategy): + def __init__(self, robot_id: int): + """Initializes the GoToBallStrategy with a specific robot ID. + :param robot_id: The ID of the robot this strategy will control. + """ + self.robot_id = robot_id + self.robot_id_key = "robot_id" + super().__init__() + + def assert_exp_robots(self, n_runtime_friendly: int, n_runtime_enemy: int): + if 1 <= n_runtime_friendly <= 3: + return True + return False + + def assert_exp_goals(self, includes_my_goal_line: bool, includes_opp_goal_line: bool): + return True + + def get_min_bounding_zone(self): + return None + + def create_behaviour_tree(self) -> py_trees.behaviour.Behaviour: + """Factory function to create a complete go_to_ball behaviour tree.""" + + root = Sequence(name="GoToBallRoot", memory=True) + + set_robot_id = SetBlackboardVariable( + name="SetRobotID", + variable_name=self.robot_id_key, + value=self.robot_id, + ) + + ### Assemble the tree ### + + root.add_children([set_robot_id, go_to_ball_subtree(rd_robot_id=self.robot_id_key)]) + + return root diff --git a/utama_core/strategy/examples/strategies/one_robot_placement_strategy.py b/utama_core/strategy/examples/one_robot_placement_strategy.py similarity index 83% rename from utama_core/strategy/examples/strategies/one_robot_placement_strategy.py rename to utama_core/strategy/examples/one_robot_placement_strategy.py index 1b0325f3..4bb4b5b7 100644 --- a/utama_core/strategy/examples/strategies/one_robot_placement_strategy.py +++ b/utama_core/strategy/examples/one_robot_placement_strategy.py @@ -6,6 +6,7 @@ import py_trees from py_trees.composites import Sequence +from utama_core.config.settings import TIMESTEP from utama_core.entities.game.field import FieldBounds from utama_core.global_utils.math_utils import Vector2D from utama_core.skills.src.utils.move_utils import move @@ -46,23 +47,20 @@ def update(self) -> py_trees.common.Status: id: int = self.blackboard.get(self.robot_id_key) friendly_robots = game.friendly_robots - - if game.friendly_robots and game.ball is not None: - friendly_robots = game.friendly_robots - bx, by = game.ball.p.x, game.ball.p.y - rp = friendly_robots[id].p - cx, cy, _ = rp.x, rp.y, friendly_robots[id].orientation - error = math.dist((self.tx, self.ty), (cx, cy)) - - switch = error < 0.05 - if switch: - if self.ty == -1: - self.ty = 1 - self.tx = random.choice([0, 1]) - else: - self.ty = -1 - self.tx = 1 - # self.tx = random.choice([0, 1]) + bx, by = game.ball.p.x, game.ball.p.y + rp = friendly_robots[id].p + cx, cy = rp.x, rp.y + error = math.dist((self.tx, self.ty), (cx, cy)) + + switch = error < 0.05 + if switch: + if self.ty == -1: + self.ty = 1 + self.tx = random.choice([0, 1]) + else: + self.ty = -1 + self.tx = 1 + # self.tx = random.choice([0, 1]) # changed so the robot tracks the ball while moving oren = np.atan2(by - cy, bx - cx) @@ -77,10 +75,10 @@ def update(self) -> py_trees.common.Status: rsim_env.draw_point(self.tx, self.ty, color="red") v = game.friendly_robots[id].v p = game.friendly_robots[id].p - rsim_env.draw_point(p.x + v.x * 0.167 * 5, p.y + v.y * 0.167 * 5, color="green") + rsim_env.draw_point(p.x + v.x * TIMESTEP * 5, p.y + v.y * TIMESTEP * 5, color="green") - self.blackboard.cmd_map[id] = cmd - return py_trees.common.Status.RUNNING + self.blackboard.cmd_map[id] = cmd + return py_trees.common.Status.RUNNING class SetBlackboardVariable(AbstractBehaviour): diff --git a/utama_core/strategy/examples/strategies/startup_strategy.py b/utama_core/strategy/examples/startup_strategy.py similarity index 100% rename from utama_core/strategy/examples/strategies/startup_strategy.py rename to utama_core/strategy/examples/startup_strategy.py diff --git a/utama_core/team_controller/src/controllers/real/real_robot_controller.py b/utama_core/team_controller/src/controllers/real/real_robot_controller.py index bb583089..0dd3ee65 100644 --- a/utama_core/team_controller/src/controllers/real/real_robot_controller.py +++ b/utama_core/team_controller/src/controllers/real/real_robot_controller.py @@ -1,4 +1,5 @@ import logging +import time import warnings from typing import Dict, List, Optional, Union @@ -6,8 +7,9 @@ from serial import EIGHTBITS, PARITY_EVEN, STOPBITS_TWO, Serial from utama_core.config.robot_params import REAL_PARAMS -from utama_core.config.settings import BAUD_RATE, PORT, TIMEOUT +from utama_core.config.settings import BAUD_RATE, PORT, TIMEOUT, TIMESTEP from utama_core.entities.data.command import RobotCommand, RobotResponse +from utama_core.skills.src.utils.move_utils import empty_command from utama_core.team_controller.src.controllers.common.robot_controller_abstract import ( AbstractRobotController, ) @@ -34,14 +36,22 @@ def __init__(self, is_team_yellow: bool, n_friendly: int): self._out_packet = self._empty_command() self._in_packet_size = 1 # size of the feedback packet received from the robots self._robots_info: List[RobotResponse] = [None] * self._n_friendly - + self._n_robots_unassigned = self._n_friendly logger.debug(f"Serial port: {PORT} opened with baudrate: {BAUD_RATE} and timeout {TIMEOUT}") + def get_robots_responses(self) -> Optional[List[RobotResponse]]: + ### TODO: Not implemented yet + return None + def send_robot_commands(self) -> None: """Sends the robot commands to the appropriate team (yellow or blue).""" # print(list(self.out_packet)) # binary_representation = [f"{byte:08b}" for byte in self.out_packet] # print(binary_representation) + if self._n_robots_unassigned != 0: + warnings.warn( + f"Not all robot commands have been assigned. {self._n_robots_unassigned} robots left unassigned. Sending empty commands for them." + ) self._serial_port.write(self.out_packet) self._serial_port.read_all() # data_in = self._serial.read_all() @@ -50,6 +60,7 @@ def send_robot_commands(self) -> None: # TODO: add receiving feedback from the robots self._out_packet = self._empty_command() # flush the out_packet + self._n_robots_unassigned = self._n_friendly def add_robot_commands( self, @@ -66,13 +77,17 @@ def _add_robot_command(self, command: RobotCommand, robot_id: int) -> None: robot_id (int): The ID of the robot. command (RobotCommand): A named tuple containing the robot command with keys: 'local_forward_vel', 'local_left_vel', 'angular_vel', 'kick', 'chip', 'dribble'. """ + if self._n_robots_unassigned <= 0: + raise RuntimeError(f"All {self._n_friendly} robot command slots are already assigned this frame.") c_command = self._convert_float16_command(robot_id, command) command_buffer = self._generate_command_buffer(robot_id, c_command) - print(command_buffer) - start_idx = robot_id * self._rbt_cmd_size + 1 # account for the start frame byte + start_idx = ( + self._n_friendly - self._n_robots_unassigned + ) * self._rbt_cmd_size + 1 # account for the start frame byte self._out_packet[start_idx : start_idx + self._rbt_cmd_size] = ( command_buffer # +1 to account for start frame byte ) + self._n_robots_unassigned -= 1 # def _populate_robots_info(self, data_in: bytes) -> None: # """ @@ -187,12 +202,13 @@ def _sanitise_float(self, val: float) -> float: def _empty_command(self) -> bytearray: if not hasattr(self, "_cached_empty_command"): + INVALID_RBT_ID = 0xFF commands = bytearray() - for robot_id in range(self._n_friendly): - cmd = bytearray([robot_id] + [0] * (self._rbt_cmd_size - 1)) # empty command for each robot - commands += cmd + for _ in range(self._n_friendly): + cmd = bytearray([INVALID_RBT_ID] + [0] * (self._rbt_cmd_size - 1)) # empty command for each robot + commands.extend(cmd) self._cached_empty_command = bytearray([0xAA]) + commands + bytearray([0x55]) - return self._cached_empty_command + return self._cached_empty_command.copy() def _init_serial(self) -> Serial: """Establish serial connection.""" @@ -245,12 +261,14 @@ def in_packet_size(self) -> int: chip=0, dribble=False, ) - for _ in range(15): + for _ in range(100): robot_controller.add_robot_commands(cmd, 0) - # robot_controller.send_robot_commands() - # for _ in range(10): - # robot_controller.add_robot_commands(empty_command(), 0) - # robot_controller.send_robot_commands() + robot_controller.send_robot_commands() + time.sleep(TIMESTEP) + for _ in range(10): + robot_controller.add_robot_commands(empty_command(), 0) + robot_controller.send_robot_commands() + time.sleep(TIMESTEP) # print(list(robot_controller.out_packet)) # binary_representation = [f"{byte:08b}" for byte in robot_controller.out_packet] diff --git a/utama_core/tests/refiners/position_refiner_integration_test.py b/utama_core/tests/refiners/position_refiner_integration_test.py index db62f6e6..06d68607 100644 --- a/utama_core/tests/refiners/position_refiner_integration_test.py +++ b/utama_core/tests/refiners/position_refiner_integration_test.py @@ -3,6 +3,7 @@ import time from collections import deque +from utama_core.config.settings import TIMESTEP from utama_core.entities.game import Field from utama_core.run import GameGater from utama_core.run.receivers.vision_receiver import VisionReceiver @@ -50,7 +51,7 @@ def main(): print(game) assert len(game.friendly_robots) == NUM_FRIENDLY assert len(game.enemy_robots) == NUM_ENEMY - time.sleep(0.0167) + time.sleep(TIMESTEP) if __name__ == "__main__": diff --git a/utama_core/tests/replay/test_replay.py b/utama_core/tests/replay/test_replay.py new file mode 100644 index 00000000..17bdb3ac --- /dev/null +++ b/utama_core/tests/replay/test_replay.py @@ -0,0 +1,80 @@ +from unittest.mock import patch + +import pytest + +from utama_core.entities.data.vector import Vector2D, Vector3D +from utama_core.entities.game import Ball, GameFrame, Robot +from utama_core.replay.replay_player import ReplayStandardSSL + +# Example frame with non-sequential IDs +frame = GameFrame( + ts=0.0, + my_team_is_yellow=True, + my_team_is_right=True, + friendly_robots={ + 1: Robot( + id=1, + is_friendly=True, + has_ball=False, + p=Vector2D(1, 1), + v=Vector2D(0, 0), + a=Vector2D(0, 0), + orientation=0.0, + ) + }, + enemy_robots={}, + ball=Ball(p=Vector3D(0, 0, 0), v=Vector3D(0, 0, 0), a=Vector3D(0, 0, 0)), +) + +second_frame = GameFrame( + ts=0.1, + my_team_is_yellow=True, + my_team_is_right=True, + friendly_robots={ + 3: Robot( + id=3, + is_friendly=True, + has_ball=False, + p=Vector2D(1.1, 1.1), + v=Vector2D(0, 0), + a=Vector2D(0, 0), + orientation=0.0, + ), + 5: Robot( + id=5, + is_friendly=True, + has_ball=False, + p=Vector2D(2, 2), + v=Vector2D(0, 0), + a=Vector2D(0, 0), + orientation=0.0, + ), + }, + enemy_robots={ + 2: Robot( + id=2, + is_friendly=False, + has_ball=False, + p=Vector2D(-1, -1), + v=Vector2D(0, 0), + a=Vector2D(0, 0), + orientation=0.0, + ) + }, + ball=Ball(p=Vector3D(0, 0, 0), v=Vector3D(0, 0, 0), a=Vector3D(0, 0, 0)), +) + + +@pytest.mark.parametrize("test_frame", [frame, second_frame]) +def test_non_sequential_robot_ids(test_frame): + n_yellow = len(test_frame.friendly_robots) + n_blue = len(test_frame.enemy_robots) + + replay_env = ReplayStandardSSL(n_robots_yellow=n_yellow, n_robots_blue=n_blue) + + # Patch render to do nothing so we don't open a pygame window + with patch.object(replay_env, "render", return_value=None): + try: + replay_env.step_replay(test_frame) + except Exception as e: + pytest.fail(f"Replay failed with non-sequential robot IDs: {e}")