diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index d8958c9c..53b7e946 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -3,4 +3,6 @@ - Do not comment on docstring grammar or punctuation. - Focus on logic, correctness, and API design - Assume Black and Ruff enforce formatting -- Ignore trivial, non functional changes, namely unused imports and formatting changes. \ No newline at end of file +- Ignore trivial, non functional changes, namely unused imports and formatting +changes. +- Ignore unused imports. \ No newline at end of file diff --git a/main.py b/main.py index 0d6c8f2a..7ec61e67 100644 --- a/main.py +++ b/main.py @@ -1,3 +1,4 @@ +from utama_core.entities.game.field import FieldBounds from utama_core.replay import ReplayWriterConfig from utama_core.run import StrategyRunner from utama_core.strategy.examples import ( @@ -5,18 +6,22 @@ GoToBallExampleStrategy, RobotPlacementStrategy, StartupStrategy, + TwoRobotPlacementStrategy, ) def main(): + # Setup for real testing + # Custom field size based setup in real + custom_bounds = FieldBounds(top_left=(2.25, 1.5), bottom_right=(4.5, -1.5)) + runner = StrategyRunner( - strategy=StartupStrategy(), + strategy=TwoRobotPlacementStrategy(first_robot_id=0, second_robot_id=1, field_bounds=custom_bounds), my_team_is_yellow=True, my_team_is_right=True, mode="rsim", - exp_friendly=6, - exp_enemy=3, - control_scheme="dwa", + exp_friendly=2, + exp_enemy=0, replay_writer_config=ReplayWriterConfig(replay_name="test_replay", overwrite_existing=True), print_real_fps=True, profiler_name=None, diff --git a/utama_core/entities/game/field.py b/utama_core/entities/game/field.py index 6c79359e..5e9692db 100644 --- a/utama_core/entities/game/field.py +++ b/utama_core/entities/game/field.py @@ -16,6 +16,13 @@ class FieldBounds: top_left: tuple[float, float] bottom_right: tuple[float, float] + @property + def center(self) -> tuple[float, float]: + """Calculates the geometric center of the field bounds.""" + cx = (self.top_left[0] + self.bottom_right[0]) / 2.0 + cy = (self.top_left[1] + self.bottom_right[1]) / 2.0 + return (cx, cy) + class Field: """Field class that contains all the information about the field. @@ -163,6 +170,10 @@ def half_width(self) -> float: def field_bounds(self) -> FieldBounds: return self._field_bounds + @property + def center(self) -> tuple[float, float]: + return self._field_bounds.center + ### Class Properties for standard field dimensions ### @ClassProperty diff --git a/utama_core/strategy/examples/__init__.py b/utama_core/strategy/examples/__init__.py index 2d12e9d4..c97860c0 100644 --- a/utama_core/strategy/examples/__init__.py +++ b/utama_core/strategy/examples/__init__.py @@ -4,3 +4,4 @@ RobotPlacementStrategy, ) from utama_core.strategy.examples.startup_strategy import StartupStrategy +from utama_core.strategy.examples.two_robot_placement import TwoRobotPlacementStrategy diff --git a/utama_core/strategy/examples/go_to_ball_ex.py b/utama_core/strategy/examples/go_to_ball_ex.py index 8f47e8a1..36b707f2 100644 --- a/utama_core/strategy/examples/go_to_ball_ex.py +++ b/utama_core/strategy/examples/go_to_ball_ex.py @@ -9,6 +9,7 @@ from utama_core.entities.game import Game from utama_core.skills.src.go_to_ball import go_to_ball from utama_core.strategy.common import AbstractBehaviour, AbstractStrategy +from utama_core.strategy.examples.utils import SetBlackboardVariable class HasBall(AbstractBehaviour): @@ -100,29 +101,6 @@ def update(self) -> py_trees.common.Status: return py_trees.common.Status.RUNNING -class SetBlackboardVariable(AbstractBehaviour): - """ - Writes a constant `value` onto the blackboard with the key `variable_name`. - **Blackboard Interaction:** - - Writes: - - `variable_name` (Any): The name of the blackboard variable to be set. - **Returns:** - - `py_trees.common.Status.SUCCESS`: The variable has been set. - """ - - def __init__(self, name: str, variable_name: str, value: Any): - super().__init__(name=name) - self.variable_name = variable_name - self.value = value - - def setup_(self): - self.blackboard.register_key(key=self.variable_name, access=py_trees.common.Access.WRITE) - - def update(self) -> py_trees.common.Status: - self.blackboard.set(self.variable_name, self.value, overwrite=True) - return py_trees.common.Status.SUCCESS - - def go_to_ball_subtree(rd_robot_id: str) -> py_trees.behaviour.Behaviour: """ Builds a selector that drives the robot to the ball until it gains possession. diff --git a/utama_core/strategy/examples/one_robot_placement_strategy.py b/utama_core/strategy/examples/one_robot_placement_strategy.py index 4bb4b5b7..8c570731 100644 --- a/utama_core/strategy/examples/one_robot_placement_strategy.py +++ b/utama_core/strategy/examples/one_robot_placement_strategy.py @@ -7,13 +7,17 @@ from py_trees.composites import Sequence from utama_core.config.settings import TIMESTEP -from utama_core.entities.game.field import FieldBounds +from utama_core.entities.game.field import Field, FieldBounds from utama_core.global_utils.math_utils import Vector2D from utama_core.skills.src.utils.move_utils import move from utama_core.strategy.common.abstract_behaviour import AbstractBehaviour # from robot_control.src.tests.utils import one_robot_placement from utama_core.strategy.common.abstract_strategy import AbstractStrategy +from utama_core.strategy.examples.utils import ( + CalculateFieldCenter, + SetBlackboardVariable, +) class RobotPlacementStep(AbstractBehaviour): @@ -30,18 +34,39 @@ class RobotPlacementStep(AbstractBehaviour): - `py_trees.common.Status.RUNNING`: The behaviour is actively commanding the robot to move. """ - def __init__(self, rd_robot_id: str, invert: bool = False): + def __init__(self, rd_robot_id: str, field_center_key: str = "FieldCenter"): super().__init__() + self.field_center_key = field_center_key self.robot_id_key = rd_robot_id - - self.ty = -1 - self.tx = -1 if invert else 1 + self.initialized = False + self.center_x = 0.0 + self.center_y = 0.0 + self.tx = 0.0 + self.ty = 0.0 def setup_(self): self.blackboard.register_key(key=self.robot_id_key, access=py_trees.common.Access.READ) + self.blackboard.register_key(key=self.field_center_key, access=py_trees.common.Access.READ) def update(self) -> py_trees.common.Status: """Closure which advances the simulation by one step.""" + + # Initialize targets if not ready + if not self.initialized: + try: + center = self.blackboard.get(self.field_center_key) + if center: + self.center_x, self.center_y = center + self.tx = self.center_x + self.ty = self.center_y + 0.5 + self.initialized = True + except KeyError: + # Center not yet available + return py_trees.common.Status.FAILURE + + if not self.initialized: + return py_trees.common.Status.FAILURE + game = self.blackboard.game rsim_env = self.blackboard.rsim_env id: int = self.blackboard.get(self.robot_id_key) @@ -52,15 +77,25 @@ def update(self) -> py_trees.common.Status: cx, cy = rp.x, rp.y error = math.dist((self.tx, self.ty), (cx, cy)) - switch = error < 0.05 - if switch: - if self.ty == -1: - self.ty = 1 - self.tx = random.choice([0, 1]) - else: - self.ty = -1 - self.tx = 1 - # self.tx = random.choice([0, 1]) + if game.friendly_robots and game.ball is not None: + friendly_robots = game.friendly_robots + bx, by = game.ball.p.x, game.ball.p.y + rp = friendly_robots[id].p + cx, cy, _ = rp.x, rp.y, friendly_robots[id].orientation + error = math.dist((self.tx, self.ty), (cx, cy)) + + # Ensure target x is always the center x + self.tx = self.center_x + + switch = error < 0.1 + if switch: + upper_target = self.center_y + 0.5 + lower_target = self.center_y - 0.5 + + if math.isclose(self.ty, lower_target, abs_tol=0.1): + self.ty = upper_target + else: + self.ty = lower_target # changed so the robot tracks the ball while moving oren = np.atan2(by - cy, bx - cx) @@ -81,31 +116,16 @@ def update(self) -> py_trees.common.Status: return py_trees.common.Status.RUNNING -class SetBlackboardVariable(AbstractBehaviour): - """A generic behaviour to set a variable on the blackboard.""" - - def __init__(self, name: str, variable_name: str, value: Any): - super().__init__(name=name) - self.variable_name = variable_name - self.value = value - - def setup_(self): - self.blackboard.register_key(key=self.variable_name, access=py_trees.common.Access.WRITE) - - def update(self) -> py_trees.common.Status: - # print(f"Setting {self.variable_name} to {self.value} on the blackboard.") - self.blackboard.set(self.variable_name, self.value, overwrite=True) - return py_trees.common.Status.SUCCESS - - class RobotPlacementStrategy(AbstractStrategy): - def __init__(self, robot_id: int): + def __init__(self, robot_id: int, field_bounds: Optional[FieldBounds] = None): """ Initializes the RobotPlacementStrategy with a specific robot ID. :param robot_id: The ID of the robot this strategy will control. + :param field_bounds: The bounds of the field to operate within. """ self.robot_id = robot_id + self.field_bounds = field_bounds if field_bounds else Field.FULL_FIELD_BOUNDS super().__init__() def assert_exp_robots(self, n_runtime_friendly: int, n_runtime_enemy: int): @@ -124,6 +144,7 @@ def create_behaviour_tree(self) -> py_trees.behaviour.Behaviour: """Factory function to create a complete behaviour tree.""" robot_id_key = "target_robot_id" + field_center_key = "FieldCenter" coach_root = Sequence(name="CoachRoot", memory=False) @@ -135,6 +156,15 @@ def create_behaviour_tree(self) -> py_trees.behaviour.Behaviour: ### Assemble the tree ### - coach_root.add_children([set_rbt_id, RobotPlacementStep(rd_robot_id=robot_id_key)]) + # Calculate Field Center from custom field_bounds + calc_center = CalculateFieldCenter(field_bounds=self.field_bounds, output_key=field_center_key) + + coach_root.add_children( + [ + set_rbt_id, + calc_center, + RobotPlacementStep(rd_robot_id=robot_id_key, field_center_key=field_center_key), + ] + ) return coach_root diff --git a/utama_core/strategy/examples/two_robot_placement.py b/utama_core/strategy/examples/two_robot_placement.py new file mode 100644 index 00000000..32b7ff38 --- /dev/null +++ b/utama_core/strategy/examples/two_robot_placement.py @@ -0,0 +1,231 @@ +import math +from typing import Optional + +import numpy as np +import py_trees +from py_trees.composites import Parallel, Sequence + +from utama_core.config.settings import TIMESTEP +from utama_core.entities.game.field import Field, FieldBounds +from utama_core.global_utils.math_utils import Vector2D +from utama_core.skills.src.utils.move_utils import move +from utama_core.strategy.common.abstract_behaviour import AbstractBehaviour +from utama_core.strategy.common.abstract_strategy import AbstractStrategy +from utama_core.strategy.examples.utils import ( + CalculateFieldCenter, + SetBlackboardVariable, +) + + +class RobotPlacementStep(AbstractBehaviour): + """ + A behaviour that commands a robot to move between two specific positions on the field. + + **Args:** + `role` (str): Role of the robot (horizontal or vertical). + `my_turn_idx` (int): The turn index for this robot. + `next_turn_idx` (int): The turn index to set when movement is complete. + + **Blackboard Interaction:** + Reads: + - `rd_robot_id` (int): The ID of the robot to check for ball possession. Typically from the `SetBlackboardVariable` node. + - `field_center_key` (str): Blackboard key for field center coordinates. + Writes: + - `turn_key` (int): The turn index for this robot. + """ + + def __init__( + self, + rd_robot_id: str, + role: str, + turn_key: str, + my_turn_idx: int, + next_turn_idx: int, + field_center_key: str = "FieldCenter", + ): + super().__init__() + self.robot_id_key = rd_robot_id + self.role = role + self.turn_key = turn_key + self.my_turn_idx = my_turn_idx + self.next_turn_idx = next_turn_idx + self.field_center_key = field_center_key + + # State management + self.current_target = None + self.initialized = False + self.start_point = None + self.end_point = None + + def setup_(self): + self.blackboard.register_key(key=self.robot_id_key, access=py_trees.common.Access.READ) + self.blackboard.register_key(key=self.turn_key, access=py_trees.common.Access.READ) + self.blackboard.register_key(key=self.turn_key, access=py_trees.common.Access.WRITE) + self.blackboard.register_key(key=self.field_center_key, access=py_trees.common.Access.READ) + + def update(self) -> py_trees.common.Status: + """Closure which advances the simulation by one step.""" + game = self.blackboard.game + rsim_env = self.blackboard.rsim_env + id: int = self.blackboard.get(self.robot_id_key) + + # Initialize targets if not ready + if not self.initialized: + try: + center = self.blackboard.get(self.field_center_key) + if center: + cx, cy = center + if self.role == "horizontal": + self.start_point = (cx - 0.5, cy) + self.end_point = (cx + 0.5, cy) + elif self.role == "vertical": + self.start_point = (cx, cy - 0.5) + self.end_point = (cx, cy + 0.5) + else: + return py_trees.common.Status.FAILURE + + self.current_target = self.end_point + self.initialized = True + except KeyError: + # Center not yet available + return py_trees.common.Status.FAILURE + + if not self.initialized: + return py_trees.common.Status.FAILURE + + # Check whose turn it is + current_turn = self.blackboard.get(self.turn_key) + + friendly_robots = game.friendly_robots + if not friendly_robots or id not in friendly_robots: + return py_trees.common.Status.FAILURE + + rp = friendly_robots[id].p + cx, cy = rp.x, rp.y + + cmd = None + + # Only move if it is my turn + if current_turn == self.my_turn_idx: + tx, ty = self.current_target + dist_error = math.dist((tx, ty), (cx, cy)) + + # Check if reached target + if dist_error < 0.05: + # Switch target for next time + if self.current_target == self.start_point: + self.current_target = self.end_point + else: + self.current_target = self.start_point + + # Pass turn to the other robot + self.blackboard.set(self.turn_key, self.next_turn_idx, overwrite=True) + + else: + # Move towards target + bx, by = game.ball.p.x, game.ball.p.y + oren = np.atan2(by - cy, bx - cx) + cmd = move( + game, + self.blackboard.motion_controller, + id, + Vector2D(tx, ty), + oren, + ) + + if rsim_env: + rsim_env.draw_point(tx, ty, color="red") + v = game.friendly_robots[id].v + p = game.friendly_robots[id].p + rsim_env.draw_point( + p.x + v.x * TIMESTEP * 5, + p.y + v.y * TIMESTEP * 5, + color="green", + ) + + if cmd: + self.blackboard.cmd_map[id] = cmd + + return py_trees.common.Status.RUNNING + + +class TwoRobotPlacementStrategy(AbstractStrategy): + def __init__( + self, + first_robot_id: int, + second_robot_id: int, + field_bounds: Optional[FieldBounds] = None, + ): + """ + Initialize the TwoRobotPlacementStrategy with two robot IDs and optional field bounds. + """ + self.first_robot_id = first_robot_id + self.second_robot_id = second_robot_id + self.field_bounds = field_bounds if field_bounds else Field.FULL_FIELD_BOUNDS + super().__init__() + + def assert_exp_robots(self, n_runtime_friendly: int, n_runtime_enemy: int): + if n_runtime_friendly == 2: + return True + return False + + def assert_exp_goals(self, includes_my_goal_line: bool, includes_opp_goal_line: bool): + return True # No specific goal line requirements + + def get_min_bounding_zone(self) -> Optional[FieldBounds]: + # toggles robot between (1, -1) and (1, 1) + # Using full field bounds logic now, but maybe should return specific bounds if needed. + # For now, keeping consistent with previous simple bounds or updating if needed. + return FieldBounds(top_left=(-1, 1), bottom_right=(1, -1)) + + def create_behaviour_tree(self) -> py_trees.behaviour.Behaviour: + """Factory function to create a complete behaviour tree.""" + + first_robot_key = "first_robot_id" + second_robot_key = "second_robot_id" + turn_key = "CrossMovementTurn" + field_center_key = "FieldCenter" + + coach_root = Sequence(name="CoachRoot", memory=True) + + set_first_robot = SetBlackboardVariable(name="SetR1", variable_name=first_robot_key, value=self.first_robot_id) + set_second_robot = SetBlackboardVariable( + name="SetR2", variable_name=second_robot_key, value=self.second_robot_id + ) + + # Initialize turn to 0 (Robot 1 starts) + set_turn = SetBlackboardVariable(name="InitTurn", variable_name=turn_key, value=0) + + # Calculate Field Center from custom field_bounds + calc_center = CalculateFieldCenter(field_bounds=self.field_bounds, output_key=field_center_key) + + # Robot 1 (X-mover): Centered at (center_x, center_y), move range +/- 0.5 in X + move_robot1 = RobotPlacementStep( + rd_robot_id=first_robot_key, + role="horizontal", + turn_key=turn_key, + my_turn_idx=0, + next_turn_idx=1, + field_center_key=field_center_key, + ) + + # Robot 2 (Y-mover): Centered at (center_x, center_y), move range +/- 0.5 in Y + move_robot2 = RobotPlacementStep( + rd_robot_id=second_robot_key, + role="vertical", + turn_key=turn_key, + my_turn_idx=1, + next_turn_idx=0, + field_center_key=field_center_key, + ) + + # Use Parallel to allow both robots to be ticked + action_parallel = Parallel( + name="RobotActions", + policy=py_trees.common.ParallelPolicy.SuccessOnAll(synchronise=False), + ) + action_parallel.add_children([move_robot1, move_robot2]) + + coach_root.add_children([set_first_robot, set_second_robot, set_turn, calc_center, action_parallel]) + + return coach_root diff --git a/utama_core/strategy/examples/utils.py b/utama_core/strategy/examples/utils.py new file mode 100644 index 00000000..5d833e71 --- /dev/null +++ b/utama_core/strategy/examples/utils.py @@ -0,0 +1,53 @@ +from typing import Any + +import py_trees + +from utama_core.entities.game import FieldBounds +from utama_core.strategy.common.abstract_behaviour import AbstractBehaviour + + +class SetBlackboardVariable(AbstractBehaviour): + """ + Writes a constant `value` onto the blackboard with the key `variable_name`. + **Blackboard Interaction:** + - Writes: + - `variable_name` (Any): The name of the blackboard variable to be set. + **Returns:** + - `py_trees.common.Status.SUCCESS`: The variable has been set. + """ + + def __init__(self, name: str, variable_name: str, value: Any): + super().__init__(name=name) + self.variable_name = variable_name + self.value = value + + def setup_(self): + self.blackboard.register_key(key=self.variable_name, access=py_trees.common.Access.WRITE) + + def update(self) -> py_trees.common.Status: + self.blackboard.set(self.variable_name, self.value, overwrite=True) + return py_trees.common.Status.SUCCESS + + +class CalculateFieldCenter(AbstractBehaviour): + """ + Calculates the center of the provided field bounds and writes it to the blackboard. + """ + + def __init__(self, field_bounds: FieldBounds, output_key: str = "FieldCenter"): + super().__init__(name="CalculateFieldCenter") + self.output_key = output_key + self.field_bounds = field_bounds + self.calculated = False + + def setup_(self): + self.blackboard.register_key(key=self.output_key, access=py_trees.common.Access.WRITE) + + def update(self) -> py_trees.common.Status: + if self.calculated: + return py_trees.common.Status.SUCCESS + + center = self.field_bounds.center + self.blackboard.set(self.output_key, center, overwrite=True) + self.calculated = True + return py_trees.common.Status.SUCCESS diff --git a/utama_core/team_controller/src/controllers/real/real_robot_controller.py b/utama_core/team_controller/src/controllers/real/real_robot_controller.py index ce1101eb..023a96d6 100644 --- a/utama_core/team_controller/src/controllers/real/real_robot_controller.py +++ b/utama_core/team_controller/src/controllers/real/real_robot_controller.py @@ -207,9 +207,7 @@ def _empty_command(self) -> bytearray: # * first byte is robot ID (here INVALID_RBT_ID) # * remaining (self._rbt_cmd_size - 1) bytes are zeros # - 0x55: end byte - cmd = bytearray( - [0xAA] + [INVALID_RBT_ID] + [0] * (self._rbt_cmd_size - 1) + [0x55] - ) + cmd = bytearray([0xAA] + [INVALID_RBT_ID] + [0] * (self._rbt_cmd_size - 1) + [0x55]) commands.extend(cmd) self._cached_empty_command = commands return self._cached_empty_command.copy() diff --git a/utama_core/tests/strategy_examples/test_placement_coords.py b/utama_core/tests/strategy_examples/test_placement_coords.py new file mode 100644 index 00000000..21053065 --- /dev/null +++ b/utama_core/tests/strategy_examples/test_placement_coords.py @@ -0,0 +1,150 @@ +"""Integration tests for RobotPlacementStrategy using AbstractTestManager. + +These tests verify that: +1. The robot moves to positions based on the provided field_bounds center +2. The robot oscillates between the expected start and end points +3. Custom field_bounds correctly shift the placement region +""" + +import math +from typing import Optional + +import pytest + +from utama_core.config.formations import LEFT_START_ONE, RIGHT_START_ONE +from utama_core.entities.game import Game +from utama_core.entities.game.field import FieldBounds +from utama_core.global_utils.mapping_utils import ( + map_friendly_enemy_to_colors, + map_left_right_to_colors, +) +from utama_core.run import StrategyRunner +from utama_core.strategy.examples.one_robot_placement_strategy import ( + RobotPlacementStrategy, +) +from utama_core.team_controller.src.controllers import AbstractSimController +from utama_core.tests.common.abstract_test_manager import ( + AbstractTestManager, + TestingStatus, +) + + +class RobotPlacementTestManager(AbstractTestManager): + """Test manager that verifies robot oscillates around the expected center.""" + + n_episodes = 1 + + def __init__(self, expected_center: tuple[float, float], tolerance: float = 0.15): + super().__init__() + self.expected_center = expected_center + self.expected_upper = (expected_center[0], expected_center[1] + 0.5) + self.expected_lower = (expected_center[0], expected_center[1] - 0.5) + self.reached_upper = False + self.reached_lower = False + self.tolerance = tolerance + + def reset_field(self, sim_controller: AbstractSimController, game: Game): + """Reset robot and ball positions for the test.""" + ini_yellow, ini_blue = map_left_right_to_colors( + game.my_team_is_yellow, + game.my_team_is_right, + RIGHT_START_ONE, + LEFT_START_ONE, + ) + + y_robots, b_robots = map_friendly_enemy_to_colors( + game.my_team_is_yellow, game.friendly_robots, game.enemy_robots + ) + + for i in b_robots.keys(): + sim_controller.teleport_robot(False, i, ini_blue[i][0], ini_blue[i][1], ini_blue[i][2]) + for j in y_robots.keys(): + sim_controller.teleport_robot(True, j, ini_yellow[j][0], ini_yellow[j][1], ini_yellow[j][2]) + + sim_controller.teleport_robot(game.my_team_is_yellow, self.my_strategy.robot_id, 0, 0) + sim_controller.teleport_ball(3, 3) + + def eval_status(self, game: Game) -> TestingStatus: + """Verify robot reaches both oscillation targets.""" + robot = game.friendly_robots.get(self.my_strategy.robot_id) + if not robot: + return TestingStatus.IN_PROGRESS + + robot_pos = (robot.p.x, robot.p.y) + + if math.dist(robot_pos, self.expected_upper) < self.tolerance: + self.reached_upper = True + if math.dist(robot_pos, self.expected_lower) < self.tolerance: + self.reached_lower = True + + if self.reached_upper and self.reached_lower: + return TestingStatus.SUCCESS + return TestingStatus.IN_PROGRESS + + +def _run_placement_test(field_bounds: Optional[FieldBounds], expected_center: tuple[float, float]): + """Helper to run a placement strategy test with given bounds.""" + strategy = RobotPlacementStrategy(robot_id=0, field_bounds=field_bounds) + + runner = StrategyRunner( + strategy=strategy, + my_team_is_yellow=True, + my_team_is_right=False, + mode="rsim", + exp_friendly=1, + exp_enemy=0, + ) + + test_manager = RobotPlacementTestManager(expected_center=expected_center) + passed = runner.run_test(test_manager=test_manager, episode_timeout=15, rsim_headless=True) + + return passed, test_manager + + +class TestFieldBoundsCenter: + """Tests for FieldBounds center calculation.""" + + def test_full_field_center_is_origin(self): + """Full field bounds should have center at (0, 0).""" + bounds = FieldBounds(top_left=(-4.5, 3.0), bottom_right=(4.5, -3.0)) + assert bounds.center == (0.0, 0.0) + + def test_custom_bounds_center(self): + """Custom bounds should correctly calculate center.""" + bounds = FieldBounds(top_left=(1.0, 2.0), bottom_right=(3.0, 0.0)) + assert bounds.center == (2.0, 1.0) + + def test_custom_differs_from_default(self): + """Custom and default bounds must have different centers.""" + default = FieldBounds(top_left=(-4.5, 3.0), bottom_right=(4.5, -3.0)) + custom = FieldBounds(top_left=(1.0, 2.0), bottom_right=(3.0, 0.0)) + assert default.center != custom.center + + +class TestRobotPlacementStrategy: + """Integration tests for RobotPlacementStrategy behavior.""" + + def test_oscillation_with_custom_bounds(self): + """Robot oscillates around custom bounds center (2.0, 1.0).""" + bounds = FieldBounds(top_left=(1.0, 2.0), bottom_right=(3.0, 0.0)) + expected_center = bounds.center + + passed, manager = _run_placement_test(bounds, expected_center) + + assert manager.reached_upper, f"Never reached upper target {manager.expected_upper}" + assert manager.reached_lower, f"Never reached lower target {manager.expected_lower}" + assert passed + + def test_oscillation_with_default_bounds(self): + """Robot oscillates around default field center (0, 0).""" + expected_center = (0.0, 0.0) + + passed, manager = _run_placement_test(field_bounds=None, expected_center=expected_center) + + assert manager.reached_upper, f"Never reached upper target {manager.expected_upper}" + assert manager.reached_lower, f"Never reached lower target {manager.expected_lower}" + assert passed + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/utama_core/tests/strategy_examples/test_two_robot_placement_coords.py b/utama_core/tests/strategy_examples/test_two_robot_placement_coords.py new file mode 100644 index 00000000..40c892a9 --- /dev/null +++ b/utama_core/tests/strategy_examples/test_two_robot_placement_coords.py @@ -0,0 +1,176 @@ +"""Integration tests for TwoRobotPlacementStrategy using AbstractTestManager. + +These tests verify that: +1. Both robots move to positions based on the provided field_bounds center +2. Robot 1 oscillates horizontally (center ± 0.5 in X) +3. Robot 2 oscillates vertically (center ± 0.5 in Y) +4. Turn-based coordination works correctly +5. Custom field_bounds correctly shift the placement region +""" + +import math +from typing import Optional + +import pytest + +from utama_core.config.formations import LEFT_START_ONE, RIGHT_START_ONE +from utama_core.entities.game import Game +from utama_core.entities.game.field import FieldBounds +from utama_core.global_utils.mapping_utils import ( + map_friendly_enemy_to_colors, + map_left_right_to_colors, +) +from utama_core.run import StrategyRunner +from utama_core.strategy.examples.two_robot_placement import TwoRobotPlacementStrategy +from utama_core.team_controller.src.controllers import AbstractSimController +from utama_core.tests.common.abstract_test_manager import ( + AbstractTestManager, + TestingStatus, +) + + +class TwoRobotPlacementTestManager(AbstractTestManager): + """Test manager that verifies both robots oscillate around the expected center.""" + + n_episodes = 1 + + def __init__( + self, + expected_center: tuple[float, float], + first_robot_id: int, + second_robot_id: int, + tolerance: float = 0.15, + ): + super().__init__() + self.expected_center = expected_center + self.first_robot_id = first_robot_id + self.second_robot_id = second_robot_id + self.tolerance = tolerance + + # Robot 1 (horizontal mover) targets + self.r1_left = (expected_center[0] - 0.5, expected_center[1]) + self.r1_right = (expected_center[0] + 0.5, expected_center[1]) + self.r1_reached_left = False + self.r1_reached_right = False + + # Robot 2 (vertical mover) targets + self.r2_upper = (expected_center[0], expected_center[1] + 0.5) + self.r2_lower = (expected_center[0], expected_center[1] - 0.5) + self.r2_reached_upper = False + self.r2_reached_lower = False + + def reset_field(self, sim_controller: AbstractSimController, game: Game): + """Reset robot and ball positions for the test.""" + ini_yellow, ini_blue = map_left_right_to_colors( + game.my_team_is_yellow, + game.my_team_is_right, + RIGHT_START_ONE, + LEFT_START_ONE, + ) + + y_robots, b_robots = map_friendly_enemy_to_colors( + game.my_team_is_yellow, game.friendly_robots, game.enemy_robots + ) + + for i in b_robots.keys(): + sim_controller.teleport_robot(False, i, ini_blue[i][0], ini_blue[i][1], ini_blue[i][2]) + for j in y_robots.keys(): + sim_controller.teleport_robot(True, j, ini_yellow[j][0], ini_yellow[j][1], ini_yellow[j][2]) + + # Position robots near the center for faster test convergence + cx, cy = self.expected_center + sim_controller.teleport_robot(game.my_team_is_yellow, self.first_robot_id, cx, cy) + sim_controller.teleport_robot(game.my_team_is_yellow, self.second_robot_id, cx, cy) + sim_controller.teleport_ball(3, 3) + + def eval_status(self, game: Game) -> TestingStatus: + """Verify both robots reach their oscillation targets.""" + robot1 = game.friendly_robots.get(self.first_robot_id) + robot2 = game.friendly_robots.get(self.second_robot_id) + if not robot1 or not robot2: + return TestingStatus.IN_PROGRESS + + r1_pos = (robot1.p.x, robot1.p.y) + r2_pos = (robot2.p.x, robot2.p.y) + + # Check Robot 1 horizontal targets + if math.dist(r1_pos, self.r1_left) < self.tolerance: + self.r1_reached_left = True + if math.dist(r1_pos, self.r1_right) < self.tolerance: + self.r1_reached_right = True + + # Check Robot 2 vertical targets + if math.dist(r2_pos, self.r2_upper) < self.tolerance: + self.r2_reached_upper = True + if math.dist(r2_pos, self.r2_lower) < self.tolerance: + self.r2_reached_lower = True + + # Success when both robots reach both their targets + if self.r1_reached_left and self.r1_reached_right and self.r2_reached_upper and self.r2_reached_lower: + return TestingStatus.SUCCESS + return TestingStatus.IN_PROGRESS + + +def _run_two_robot_placement_test( + field_bounds: Optional[FieldBounds], + expected_center: tuple[float, float], + first_robot_id: int = 0, + second_robot_id: int = 1, +): + """Helper to run a two-robot placement strategy test with given bounds.""" + strategy = TwoRobotPlacementStrategy( + first_robot_id=first_robot_id, + second_robot_id=second_robot_id, + field_bounds=field_bounds, + ) + + runner = StrategyRunner( + strategy=strategy, + my_team_is_yellow=True, + my_team_is_right=False, + mode="rsim", + exp_friendly=2, + exp_enemy=0, + ) + + test_manager = TwoRobotPlacementTestManager( + expected_center=expected_center, + first_robot_id=first_robot_id, + second_robot_id=second_robot_id, + ) + passed = runner.run_test(test_manager=test_manager, episode_timeout=20, rsim_headless=True) + + return passed, test_manager + + +class TestTwoRobotPlacementStrategy: + """Integration tests for TwoRobotPlacementStrategy behavior.""" + + def test_oscillation_with_default_bounds(self): + """Both robots oscillate around default field center (0, 0).""" + expected_center = (0.0, 0.0) + + passed, manager = _run_two_robot_placement_test(field_bounds=None, expected_center=expected_center) + + assert manager.r1_reached_left, f"Robot 1 never reached left target {manager.r1_left}" + assert manager.r1_reached_right, f"Robot 1 never reached right target {manager.r1_right}" + assert manager.r2_reached_upper, f"Robot 2 never reached upper target {manager.r2_upper}" + assert manager.r2_reached_lower, f"Robot 2 never reached lower target {manager.r2_lower}" + assert passed + + def test_oscillation_with_custom_bounds(self): + """Both robots oscillate around custom bounds center (2.0, 1.0).""" + bounds = FieldBounds(top_left=(1.0, 2.0), bottom_right=(3.0, 0.0)) + expected_center = bounds.center + + passed, manager = _run_two_robot_placement_test(bounds, expected_center) + + assert manager.r1_reached_left, f"Robot 1 never reached left target {manager.r1_left}" + assert manager.r1_reached_right, f"Robot 1 never reached right target {manager.r1_right}" + assert manager.r2_reached_upper, f"Robot 2 never reached upper target {manager.r2_upper}" + assert manager.r2_reached_lower, f"Robot 2 never reached lower target {manager.r2_lower}" + assert passed + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/utama_core/tests/strategy_runner/test_error_handling.py b/utama_core/tests/strategy_runner/test_error_handling.py new file mode 100644 index 00000000..3499c334 --- /dev/null +++ b/utama_core/tests/strategy_runner/test_error_handling.py @@ -0,0 +1,140 @@ +"""Tests for error handling and safety mechanisms in strategies (REAL mode only).""" + +from unittest.mock import MagicMock, patch + +import pytest + +from utama_core.config.enums import Mode + + +@pytest.fixture +def mock_runner(): + """Create a mock StrategyRunner with REAL mode for testing.""" + from utama_core.run.strategy_runner import StrategyRunner + + with patch.object(StrategyRunner, "__init__", lambda self: None): + runner = StrategyRunner() + runner.mode = Mode.REAL + runner.logger = MagicMock() + runner.profiler = None + runner.replay_writer = None + runner.rsim_env = None + runner._fps_live = None + runner._stop_event = MagicMock() + runner._stop_event.is_set.return_value = False + runner.opp_game = None + runner.opp_strategy = None + + # Mock game with robots + runner.my_game = MagicMock() + runner.my_game.friendly_robots = {0: MagicMock(), 1: MagicMock()} + + # Mock strategy and robot controller + runner.my_strategy = MagicMock() + runner.my_strategy.robot_controller = MagicMock() + + yield runner + + +class TestStopRobotsOnClose: + """Tests for _stop_robots behavior when close() is called.""" + + def test_stop_commands_sent_in_real_mode(self, mock_runner): + """Verify stop commands are sent when close() is invoked in REAL mode.""" + mock_runner.close(stop_command_mult=5) + + controller = mock_runner.my_strategy.robot_controller + assert controller.add_robot_commands.call_count == 5 + assert controller.send_robot_commands.call_count == 5 + + def test_stop_commands_have_zero_velocity(self, mock_runner): + """Verify stop commands have all zero velocities and disabled actuators.""" + mock_runner.my_game.friendly_robots = { + 0: MagicMock(), + 1: MagicMock(), + 2: MagicMock(), + } + mock_runner._stop_robots(stop_command_mult=1) + + controller = mock_runner.my_strategy.robot_controller + commands_dict = controller.add_robot_commands.call_args[0][0] + + for robot_id, cmd in commands_dict.items(): + assert cmd.local_forward_vel == 0, f"Robot {robot_id}: non-zero forward vel" + assert cmd.local_left_vel == 0, f"Robot {robot_id}: non-zero left vel" + assert cmd.angular_vel == 0, f"Robot {robot_id}: non-zero angular vel" + assert not cmd.kick, f"Robot {robot_id}: kick enabled" + assert not cmd.chip, f"Robot {robot_id}: chip enabled" + assert not cmd.dribble, f"Robot {robot_id}: dribble enabled" + + def test_stop_not_called_in_rsim_mode(self, mock_runner): + """Verify stop commands are NOT sent in RSIM mode.""" + mock_runner.mode = Mode.RSIM + mock_runner.rsim_env = MagicMock() + mock_runner._stop_robots = MagicMock() + + mock_runner.close() + + mock_runner._stop_robots.assert_not_called() + + +class TestStopRobotsOnError: + """Tests for stop behavior when errors occur during execution.""" + + def test_stop_on_runtime_exception(self, mock_runner): + """Verify stop commands sent when exception occurs during run().""" + call_count = {"value": 0} + + def failing_run_step(): + call_count["value"] += 1 + if call_count["value"] >= 3: + raise RuntimeError("Test exception") + + mock_runner._run_step = failing_run_step + + with pytest.raises(RuntimeError, match="Test exception"): + mock_runner.run() + + controller = mock_runner.my_strategy.robot_controller + assert controller.add_robot_commands.call_count >= 1 + assert controller.send_robot_commands.call_count >= 1 + + def test_stop_on_vision_loss(self, mock_runner): + """Verify stop commands sent when vision data is lost.""" + call_count = {"value": 0} + + def vision_loss_run_step(): + call_count["value"] += 1 + if call_count["value"] >= 3: + raise KeyError("No vision data - camera disconnected") + + mock_runner._run_step = vision_loss_run_step + + with pytest.raises(KeyError, match="No vision data"): + mock_runner.run() + + controller = mock_runner.my_strategy.robot_controller + assert controller.add_robot_commands.call_count >= 1, "Stop commands not sent after vision loss" + assert controller.send_robot_commands.call_count >= 1, "Stop commands not transmitted" + + def test_stop_on_stop_event_signal(self, mock_runner): + """Verify stop commands sent when stop event is signaled (e.g., SIGINT handler).""" + call_count = {"value": 0} + + def signaled_run_step(): + call_count["value"] += 1 + if call_count["value"] >= 3: + # Simulate SIGINT being handled - sets stop_event + mock_runner._stop_event.is_set.return_value = True + + mock_runner._run_step = signaled_run_step + + # Should exit gracefully when stop_event is set + mock_runner.run() + + controller = mock_runner.my_strategy.robot_controller + assert controller.add_robot_commands.call_count >= 1 + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])