diff --git a/isaaclab_arena/embodiments/franka/franka.py b/isaaclab_arena/embodiments/franka/franka.py index 7ebf8d46..667a5be2 100644 --- a/isaaclab_arena/embodiments/franka/franka.py +++ b/isaaclab_arena/embodiments/franka/franka.py @@ -46,6 +46,7 @@ def __init__( self, enable_cameras: bool = False, initial_pose: Pose | None = None, + initial_joint_pose: list[float] | None = None, concatenate_observation_terms: bool = False, arm_mode: ArmMode | None = None, ): @@ -55,6 +56,8 @@ def __init__( self.observation_config = FrankaObservationsCfg() self.observation_config.policy.concatenate_terms = self.concatenate_observation_terms self.event_config = FrankaEventCfg() + if initial_joint_pose is not None: + self.set_initial_joint_pose(initial_joint_pose) self.reward_config = FrankaRewardsCfg() self.mimic_env = FrankaMimicEnv @@ -68,6 +71,9 @@ def _update_scene_cfg_with_robot_initial_pose(self, scene_config: Any, pose: Pos scene_config.stand.init_state.rot = pose.rotation_wxyz return scene_config + def set_initial_joint_pose(self, initial_joint_pose: list[float]) -> None: + self.event_config.init_franka_arm_pose.params["default_pose"] = initial_joint_pose + def get_ee_frame_name(self, arm_mode: ArmMode) -> str: return "ee_frame" @@ -176,7 +182,7 @@ def __post_init__(self): @configclass class FrankaEventCfg: - """Configuration for Franek.""" + """Configuration for Franka.""" init_franka_arm_pose = EventTerm( func=franka_stack_events.set_default_joint_pose, diff --git a/isaaclab_arena/tasks/goal_pose_task.py b/isaaclab_arena/tasks/goal_pose_task.py new file mode 100644 index 00000000..39e196af --- /dev/null +++ b/isaaclab_arena/tasks/goal_pose_task.py @@ -0,0 +1,136 @@ +# Copyright (c) 2025, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +import numpy as np +from dataclasses import MISSING + +import isaaclab.envs.mdp as mdp_isaac_lab +from isaaclab.envs.common import ViewerCfg +from isaaclab.managers import EventTermCfg, SceneEntityCfg, TerminationTermCfg +from isaaclab.scene import InteractiveSceneCfg +from isaaclab.utils import configclass + +from isaaclab_arena.assets.asset import Asset +from isaaclab_arena.metrics.metric_base import MetricBase +from isaaclab_arena.metrics.object_moved import ObjectMovedRateMetric +from isaaclab_arena.metrics.success_rate import SuccessRateMetric +from isaaclab_arena.tasks.task_base import TaskBase +from isaaclab_arena.tasks.terminations import goal_pose_task_termination +from isaaclab_arena.terms.events import set_object_pose +from isaaclab_arena.utils.cameras import get_viewer_cfg_look_at_object + + +class GoalPoseTask(TaskBase): + def __init__( + self, + object: Asset, + episode_length_s: float | None = None, + target_x_range: tuple[float, float] | None = None, + target_y_range: tuple[float, float] | None = None, + target_z_range: tuple[float, float] | None = None, + target_orientation_wxyz: tuple[float, float, float, float] | None = None, + target_orientation_tolerance_rad: float | None = None, + ): + """ + Args: + object: The object asset for the goal pose task. + episode_length_s: Episode length in seconds. + target_x_range: Success zone x-range [min, max] in meters. + target_y_range: Success zone y-range [min, max] in meters. + target_z_range: Success zone z-range [min, max] in meters. + target_orientation_wxyz: Target quaternion [w, x, y, z]. + target_orientation_tolerance_rad: Angular tolerance in radians (default: 0.1). + """ + super().__init__(episode_length_s=episode_length_s) + self.object = object + # this is needed to revise the default env_spacing in arena_env_builder: priority task > embodiment > scene > default + self.scene_config = InteractiveSceneCfg(num_envs=1, env_spacing=3.0, replicate_physics=False) + self.events_cfg = GoalPoseEventCfg(self.object) + self.termination_cfg = self.make_termination_cfg( + target_x_range=target_x_range, + target_y_range=target_y_range, + target_z_range=target_z_range, + target_orientation_wxyz=target_orientation_wxyz, + target_orientation_tolerance_rad=target_orientation_tolerance_rad, + ) + + def get_scene_cfg(self): + return self.scene_config + + def get_termination_cfg(self): + return self.termination_cfg + + def make_termination_cfg( + self, + target_x_range: tuple[float, float] | None = None, + target_y_range: tuple[float, float] | None = None, + target_z_range: tuple[float, float] | None = None, + target_orientation_wxyz: tuple[float, float, float, float] | None = None, + target_orientation_tolerance_rad: float | None = None, + ): + params: dict = {"object_cfg": SceneEntityCfg(self.object.name)} + if target_x_range is not None: + params["target_x_range"] = target_x_range + if target_y_range is not None: + params["target_y_range"] = target_y_range + if target_z_range is not None: + params["target_z_range"] = target_z_range + if target_orientation_wxyz is not None: + params["target_orientation_wxyz"] = target_orientation_wxyz + if target_orientation_tolerance_rad is not None: + params["target_orientation_tolerance_rad"] = target_orientation_tolerance_rad + + success = TerminationTermCfg( + func=goal_pose_task_termination, + params=params, + ) + return TerminationsCfg(success=success) + + def get_events_cfg(self): + return self.events_cfg + + def get_prompt(self): + raise NotImplementedError("Function not implemented yet.") + + def get_mimic_env_cfg(self, embodiment_name: str): + raise NotImplementedError("Function not implemented yet.") + + def get_metrics(self) -> list[MetricBase]: + return [ + SuccessRateMetric(), + ObjectMovedRateMetric(self.object), + ] + + def get_viewer_cfg(self) -> ViewerCfg: + return get_viewer_cfg_look_at_object(lookat_object=self.object, offset=np.array([1.5, 1.5, 1.5])) + + +@configclass +class TerminationsCfg: + """Termination terms for the MDP.""" + + time_out: TerminationTermCfg = TerminationTermCfg(func=mdp_isaac_lab.time_out) + success: TerminationTermCfg = MISSING + + +@configclass +class GoalPoseEventCfg: + """Configuration for goal pose.""" + + reset_object_pose: EventTermCfg = MISSING + + def __init__(self, object: Asset): + initial_pose = object.get_initial_pose() + if initial_pose is not None: + self.reset_object_pose = EventTermCfg( + func=set_object_pose, + mode="reset", + params={ + "pose": initial_pose, + "asset_cfg": SceneEntityCfg(object.name), + }, + ) + else: + raise ValueError(f"Initial pose is not set for the object {object.name}") diff --git a/isaaclab_arena/tasks/terminations.py b/isaaclab_arena/tasks/terminations.py index e8fd2ea6..70c0eeb1 100644 --- a/isaaclab_arena/tasks/terminations.py +++ b/isaaclab_arena/tasks/terminations.py @@ -3,6 +3,7 @@ # # SPDX-License-Identifier: Apache-2.0 +import math import torch from isaaclab.assets import RigidObject @@ -77,6 +78,71 @@ def objects_in_proximity( return done +def goal_pose_task_termination( + env: ManagerBasedRLEnv, + object_cfg: SceneEntityCfg = SceneEntityCfg("object"), + target_x_range: tuple[float, float] | None = None, + target_y_range: tuple[float, float] | None = None, + target_z_range: tuple[float, float] | None = None, + target_orientation_wxyz: tuple[float, float, float, float] | None = None, + target_orientation_tolerance_rad: float = 0.1, +) -> torch.Tensor: + """Terminate when the object's pose is within the thresholds (BBox + Orientation). + + Args: + env: The RL environment instance. + object_cfg: The configuration of the object to track. + target_x_range: Success zone x-range [min, max] in meters. + target_y_range: Success zone y-range [min, max] in meters. + target_z_range: Success zone z-range [min, max] in meters. + target_orientation_wxyz: Target quaternion [w, x, y, z]. + target_orientation_tolerance_rad: Angular tolerance in radians (default: 0.1). + + Returns: + A boolean tensor of shape (num_envs, ) + """ + object_instance: RigidObject = env.scene[object_cfg.name] + object_root_pos_w = object_instance.data.root_pos_w + object_root_quat_w = object_instance.data.root_quat_w + + device = env.device + num_envs = env.num_envs + + has_any_threshold = any([ + target_x_range is not None, + target_y_range is not None, + target_z_range is not None, + target_orientation_wxyz is not None, + ]) + + if not has_any_threshold: + return torch.zeros(num_envs, dtype=torch.bool, device=device) + + success = torch.ones(num_envs, dtype=torch.bool, device=device) + + # Position range checks + ranges = [target_x_range, target_y_range, target_z_range] + for idx, range_val in enumerate(ranges): + if range_val is not None: + range_min, range_max = range_val + in_range = (object_root_pos_w[:, idx] >= range_min) & (object_root_pos_w[:, idx] <= range_max) + success &= in_range + + # Orientation check + if target_orientation_wxyz is not None: + target_quat = torch.tensor(target_orientation_wxyz, device=device, dtype=torch.float32).unsqueeze(0) + + # Formula: || > cos(tolerance / 2) + quat_dot = torch.sum(object_root_quat_w * target_quat, dim=-1) + abs_dot = torch.abs(quat_dot) + min_cos = math.cos(target_orientation_tolerance_rad / 2.0) + + ori_success = abs_dot >= min_cos + success &= ori_success + + return success + + def object_above( env: ManagerBasedRLEnv, object_cfg: SceneEntityCfg, diff --git a/isaaclab_arena/tests/test_achieve_cube_goal_pose.py b/isaaclab_arena/tests/test_achieve_cube_goal_pose.py new file mode 100644 index 00000000..791cbe33 --- /dev/null +++ b/isaaclab_arena/tests/test_achieve_cube_goal_pose.py @@ -0,0 +1,254 @@ +# Copyright (c) 2025, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +import gymnasium as gym +import torch + +from isaaclab_arena.tests.utils.subprocess import run_simulation_app_function + +NUM_STEPS = 10 +HEADLESS = True + + +def get_test_environment(num_envs: int): + """Returns a scene which we use for these tests.""" + + from isaaclab_arena.assets.asset_registry import AssetRegistry + from isaaclab_arena.cli.isaaclab_arena_cli import get_isaaclab_arena_cli_parser + from isaaclab_arena.embodiments.franka.franka import FrankaEmbodiment + from isaaclab_arena.environments.arena_env_builder import ArenaEnvBuilder + from isaaclab_arena.environments.isaaclab_arena_environment import IsaacLabArenaEnvironment + from isaaclab_arena.scene.scene import Scene + from isaaclab_arena.tasks.goal_pose_task import GoalPoseTask + from isaaclab_arena.utils.pose import Pose + + args_parser = get_isaaclab_arena_cli_parser() + args_cli = args_parser.parse_args(["--num_envs", str(num_envs)]) + + asset_registry = AssetRegistry() + background = asset_registry.get_asset_by_name("table")() + light = asset_registry.get_asset_by_name("light")() + dex_cube = asset_registry.get_asset_by_name("dex_cube")() + + # Set the initial pose of the cube + dex_cube.set_initial_pose( + Pose( + position_xyz=(0.1, 0.0, 0.05), + rotation_wxyz=(1.0, 0.0, 0.0, 0.0), + ) + ) + + scene = Scene(assets=[background, light, dex_cube]) + + # Define success thresholds: z in [0.0, 0.5] and yaw 90 degrees + task = GoalPoseTask( + dex_cube, + target_z_range=(0.0, 0.5), + target_orientation_wxyz=(0.7071, 0.0, 0.0, 0.7071), # yaw 90 degrees + target_orientation_tolerance_rad=0.2, + ) + + embodiment = FrankaEmbodiment() + embodiment.set_initial_pose( + Pose( + position_xyz=(-0.4, 0.0, 0.0), + rotation_wxyz=(1.0, 0.0, 0.0, 0.0), + ) + ) + + isaaclab_arena_environment = IsaacLabArenaEnvironment( + name="test_achieve_cube_goal_pose", + embodiment=embodiment, + scene=scene, + task=task, + ) + + env_builder = ArenaEnvBuilder(isaaclab_arena_environment, args_cli) + name, cfg = env_builder.build_registered() + env = gym.make(name, cfg=cfg).unwrapped + env.reset() + + return env, dex_cube + + +def _test_achieve_cube_goal_pose_initial_state(simulation_app) -> bool: + """Test that the cube is not in success state initially.""" + + from isaaclab.envs.manager_based_env import ManagerBasedEnv + + from isaaclab_arena.tests.utils.simulation import step_zeros_and_call + + env, dex_cube = get_test_environment(num_envs=1) + + def assert_not_success(env: ManagerBasedEnv, terminated: torch.Tensor): + # Initially the cube is at (0.1, 0.0, 0.2) with identity quaternion (1, 0, 0, 0) + # The target orientation is (0.7071, 0, 0, 0.7071) - yaw 90 degrees + # So the task should NOT be successful + assert not terminated.item(), "Task should not be successful initially" + + try: + print("Testing initial state - cube should not be in success state") + step_zeros_and_call(env, NUM_STEPS, assert_not_success) + print("Initial state test passed: cube is not in success state") + + except Exception as e: + print(f"Error: {e}") + return False + + finally: + env.close() + + return True + + +def _test_achieve_cube_goal_pose_success(simulation_app) -> bool: + """Test that the cube reaches success state when moved to target pose.""" + + from isaaclab.assets import RigidObject + + env, dex_cube = get_test_environment(num_envs=1) + + try: + print("Testing success state - moving cube to target pose") + + with torch.inference_mode(): + # Get the cube rigid object from the scene + cube_object: RigidObject = env.scene[dex_cube.name] + + # Set the cube to target pose: + # - Position: z > 0.2 (in success zone) + # - Orientation: yaw 90 degrees (0.7071, 0, 0, 0.7071) + target_pos = torch.tensor([[0.3, 0.0, 0.05]], device=env.device) # z=0.5 is in [0.2, 1.0] + target_quat = torch.tensor([[0.7071, 0.0, 0.0, 0.7071]], device=env.device) # yaw 90 degrees + + # Step the environment to let the physics settle + for _ in range(NUM_STEPS): + + # Write the new pose to the simulation + cube_object.write_root_pose_to_sim( + root_pose=torch.cat([target_pos + env.scene.env_origins, target_quat], dim=-1) + ) + # Also set velocity to zero to stabilize + cube_object.write_root_velocity_to_sim(root_velocity=torch.zeros((1, 6), device=env.device)) + + actions = torch.zeros(env.action_space.shape, device=env.device) + _, _, terminated, _, info = env.step(actions) + + # Check if the task is successful + # The 'success' termination should be triggered + print(f"Terminated: {terminated}") + assert terminated.item(), "Task should be successful after moving to target pose" + print("Success state test passed: cube reached target pose") + + except Exception as e: + print(f"Error: {e}") + return False + + finally: + env.close() + + return True + + +def _test_achieve_cube_goal_pose_multiple_envs(simulation_app) -> bool: + """Test goal pose cube pose with multiple environments.""" + + from isaaclab.assets import RigidObject + + from isaaclab_arena.tests.utils.simulation import step_zeros_and_call + + env, dex_cube = get_test_environment(num_envs=2) + + try: + print("Testing multiple environments") + + with torch.inference_mode(): + cube_object: RigidObject = env.scene[dex_cube.name] + + # Initially, both envs should not be successful + step_zeros_and_call(env, 1) + + # Move only the first env's cube to target pose + current_poses = cube_object.data.root_state_w.clone() + + # Set first env to success pose + target_pos_0 = env.scene.env_origins[0] + torch.tensor([0.1, 0.0, 0.5], device=env.device) + target_quat = torch.tensor([0.7071, 0.0, 0.0, 0.7071], device=env.device) + + new_poses = current_poses.clone() + new_poses[0, :3] = target_pos_0 + new_poses[0, 3:7] = target_quat + new_poses[0, 7:] = 0 # zero velocity + + # Step and check + for _ in range(NUM_STEPS): + cube_object.write_root_state_to_sim(new_poses) + actions = torch.zeros(env.action_space.shape, device=env.device) + _, _, terminated, _, _ = env.step(actions) + + print(f"Expected: [True, False], got: {terminated}") + assert terminated[0].item(), "First env should be successful" + assert not terminated[1].item(), "Second env should not be successful" + + # Now move second env to success pose too + current_poses = cube_object.data.root_state_w.clone() + target_pos_1 = env.scene.env_origins[1] + torch.tensor([0.1, 0.0, 0.5], device=env.device) + + new_poses = current_poses.clone() + new_poses[0, :3] = env.scene.env_origins[0] + torch.tensor([0.1, 0.0, 0.5], device=env.device) + new_poses[0, 3:7] = target_quat + new_poses[0, 7:] = 0 + new_poses[1, :3] = target_pos_1 + new_poses[1, 3:7] = target_quat + new_poses[1, 7:] = 0 + + for _ in range(NUM_STEPS): + actions = torch.zeros(env.action_space.shape, device=env.device) + cube_object.write_root_state_to_sim(new_poses) + _, _, terminated, _, _ = env.step(actions) + + print(f"Expected: [True, True], got: {terminated}") + assert torch.all(terminated), "Both envs should be successful" + + print("Multiple environments test passed") + + except Exception as e: + print(f"Error: {e}") + return False + + finally: + env.close() + + return True + + +def test_achieve_cube_goal_pose_initial_state(): + result = run_simulation_app_function( + _test_achieve_cube_goal_pose_initial_state, + headless=HEADLESS, + ) + assert result, f"Test {_test_achieve_cube_goal_pose_initial_state.__name__} failed" + + +def test_achieve_cube_goal_pose_success(): + result = run_simulation_app_function( + _test_achieve_cube_goal_pose_success, + headless=HEADLESS, + ) + assert result, f"Test {_test_achieve_cube_goal_pose_success.__name__} failed" + + +def test_achieve_cube_goal_pose_multiple_envs(): + result = run_simulation_app_function( + _test_achieve_cube_goal_pose_multiple_envs, + headless=HEADLESS, + ) + assert result, f"Test {_test_achieve_cube_goal_pose_multiple_envs.__name__} failed" + + +if __name__ == "__main__": + test_achieve_cube_goal_pose_initial_state() + test_achieve_cube_goal_pose_success() + test_achieve_cube_goal_pose_multiple_envs() diff --git a/isaaclab_arena_environments/cli.py b/isaaclab_arena_environments/cli.py index 006bd68d..77478ace 100644 --- a/isaaclab_arena_environments/cli.py +++ b/isaaclab_arena_environments/cli.py @@ -8,6 +8,7 @@ from typing import Any from isaaclab_arena.cli.isaaclab_arena_cli import get_isaaclab_arena_cli_parser +from isaaclab_arena_environments.cube_goal_pose_environment import CubeGoalPoseEnvironment from isaaclab_arena_environments.galileo_g1_locomanip_pick_and_place_environment import ( GalileoG1LocomanipPickAndPlaceEnvironment, ) @@ -32,6 +33,7 @@ GalileoPickAndPlaceEnvironment.name: GalileoPickAndPlaceEnvironment, GalileoG1LocomanipPickAndPlaceEnvironment.name: GalileoG1LocomanipPickAndPlaceEnvironment, PressButtonEnvironment.name: PressButtonEnvironment, + CubeGoalPoseEnvironment.name: CubeGoalPoseEnvironment, LiftObjectEnvironment.name: LiftObjectEnvironment, TableTopPlaceUprightEnvironment.name: TableTopPlaceUprightEnvironment, } diff --git a/isaaclab_arena_environments/cube_goal_pose_environment.py b/isaaclab_arena_environments/cube_goal_pose_environment.py new file mode 100644 index 00000000..092da729 --- /dev/null +++ b/isaaclab_arena_environments/cube_goal_pose_environment.py @@ -0,0 +1,85 @@ +# Copyright (c) 2025, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +import argparse + +from isaaclab_arena_environments.example_environment_base import ExampleEnvironmentBase + +# NOTE(alexmillane, 2025.09.04): There is an issue with type annotation in this file. +# We cannot annotate types which require the simulation app to be started in order to +# import, because this file is used to retrieve CLI arguments, so it must be imported +# before the simulation app is started. +# TODO(alexmillane, 2025.09.04): Fix this. + + +class CubeGoalPoseEnvironment(ExampleEnvironmentBase): + """ + A environment for achieving the goal pose of a cube. + """ + + name = "cube_goal_pose" + + def get_env(self, args_cli: argparse.Namespace): + + from isaaclab_arena.environments.isaaclab_arena_environment import IsaacLabArenaEnvironment + from isaaclab_arena.scene.scene import Scene + from isaaclab_arena.tasks.goal_pose_task import GoalPoseTask + from isaaclab_arena.utils.pose import Pose + + # Add the asset registry from the arena migration package + background = self.asset_registry.get_asset_by_name(args_cli.background)() + light = self.asset_registry.get_asset_by_name("light")() + object = self.asset_registry.get_asset_by_name(args_cli.object)() + object.set_initial_pose( + Pose( + position_xyz=(0.1, 0.0, 0.2), + rotation_wxyz=(1.0, 0.0, 0.0, 0.0), + ) + ) + embodiment = self.asset_registry.get_asset_by_name(args_cli.embodiment)(enable_cameras=args_cli.enable_cameras) + embodiment.set_initial_pose( + Pose( + position_xyz=(-0.4, 0.0, 0.0), + rotation_wxyz=(1.0, 0.0, 0.0, 0.0), + ) + ) + # order: [panda_joint1, panda_joint2, panda_joint3, panda_joint4, panda_joint5, panda_joint6, panda_joint7, panda_finger_joint1, panda_finger_joint2] + embodiment.set_initial_joint_pose( + initial_joint_pose=[0.0444, -0.1894, -0.1107, -2.5148, 0.0044, 2.3775, 0.6952, 0.0400, 0.0400] + ) + + if args_cli.teleop_device is not None: + teleop_device = self.device_registry.get_device_by_name(args_cli.teleop_device)() + # increase sensitivity for teleop device + teleop_device.pos_sensitivity = 0.25 + teleop_device.rot_sensitivity = 0.5 + else: + teleop_device = None + + scene = Scene(assets=[background, light, object]) + + task = GoalPoseTask( + object, + target_z_range=(0.2, 1), + target_orientation_wxyz=(0.7071, 0.0, 0.0, 0.7071), # yaw 90 degrees + target_orientation_tolerance_rad=0.2, + ) + + isaaclab_arena_environment = IsaacLabArenaEnvironment( + name=self.name, + embodiment=embodiment, + scene=scene, + task=task, + teleop_device=teleop_device, + ) + return isaaclab_arena_environment + + @staticmethod + def add_cli_args(parser: argparse.ArgumentParser) -> None: + parser.add_argument("--object", type=str, default="dex_cube") + parser.add_argument("--background", type=str, default="table") + parser.add_argument("--embodiment", type=str, default="franka") + parser.add_argument("--enable_cameras", type=bool, default=False) + parser.add_argument("--teleop_device", type=str, default=None)