diff --git a/examples/README.md b/examples/README.md
index 6e9a28fb1..56c88d7e8 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -9,3 +9,4 @@ Examples Overview:
* `lgsvl`: Examples for the LGSVL simulator
* `visualizer`: Examples for the built in Scenic visualizer
* `webots`: Examples for the Webots robotics simulator
+* `mujoco`: Examples for the MuJoCo physics simulator
\ No newline at end of file
diff --git a/examples/mujoco/README.md b/examples/mujoco/README.md
new file mode 100644
index 000000000..3a0b99951
--- /dev/null
+++ b/examples/mujoco/README.md
@@ -0,0 +1,7 @@
+# MuJoCo Examples
+
+To run the examples, type `mjpython -m` prior to the Scenic command, i.e.,
+
+```
+mjpython -m scenic -S farm-ng/example_simulation.scenic
+```
\ No newline at end of file
diff --git a/examples/mujoco/farm-ng/amiga.scenic b/examples/mujoco/farm-ng/amiga.scenic
new file mode 100644
index 000000000..c949e39ac
--- /dev/null
+++ b/examples/mujoco/farm-ng/amiga.scenic
@@ -0,0 +1,202 @@
+from scenic.simulators.mujoco.model import MujocoBody, DynamicMujocoBody
+
+class Amiga(DynamicMujocoBody):
+
+ # Scenic property - can be overridden in .scenic files.
+ semanticColor: [255, 0, 0] # Default green.
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.instance_id = None
+ # For torque control.
+ self.target_velocity = 0.0 # Target wheel velocity (rad/s).
+ self.kp = 50.0 # Proportional gain for velocity control.
+ self.kd = 5.0 # Derivative gain for damping.
+
+ def semanticColorForGeom(self, geom_name):
+ """
+ Return semantic color for a specific geom.
+
+ This allows custom coloring of different parts of the Amiga.
+ Override this method or use the default semanticColor for all geoms.
+
+ Args:
+ geom_name: Name of the geometry (e.g., "fl_wheel_amiga_0", "rail_left_amiga_0")
+
+ Returns:
+ [R, G, B] color list, or None to use default semanticColor
+ """
+ # Example: Color wheels differently from the frame.
+ if 'wheel' in geom_name.lower():
+ return [50, 50, 50] # Dark gray for wheels.
+
+ # # Example: Color rails and crossbars with frame color.
+ # if any(part in geom_name.lower() for part in ['rail', 'crossbar', 'control_head']):
+ # return [200, 200, 200] # Light gray for structure
+
+ # # Example: Color module sides.
+ # if 'module_side' in geom_name.lower():
+ # return [220, 220, 210] # Off-white for module sides
+
+ # Default: use the object's semanticColor.
+ return None
+
+ def get_mujoco_xml(self, obj_counter, position, quaternion):
+ u_id = f"amiga_{obj_counter}"
+
+ # Dedicated chassis body for the camera.
+ self.body_name = f"chassis_{u_id}"
+ self.instance_id = u_id
+
+ complete_xml = f'''
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ '''
+
+ return complete_xml
+
+ def control(self, model, data):
+ """Apply PD control to achieve target velocity with torque actuators."""
+ if not hasattr(self, 'target_velocities') or not self.target_velocities:
+ return
+
+ if not hasattr(self, 'instance_id') or not self.instance_id:
+ return
+
+ fl_target, rl_target, fr_target, rr_target = self.target_velocities
+
+ try:
+ # Get joint indices.
+ fl_joint_id = model.joint(f'fl_axle_{self.instance_id}').id
+ rl_joint_id = model.joint(f'rl_axle_{self.instance_id}').id
+ fr_joint_id = model.joint(f'fr_axle_{self.instance_id}').id
+ rr_joint_id = model.joint(f'rr_axle_{self.instance_id}').id
+
+ # Get actuator indices.
+ fl_idx = model.actuator(f'fl_motor_{self.instance_id}').id
+ rl_idx = model.actuator(f'rl_motor_{self.instance_id}').id
+ fr_idx = model.actuator(f'fr_motor_{self.instance_id}').id
+ rr_idx = model.actuator(f'rr_motor_{self.instance_id}').id
+
+ # Get current velocities.
+ fl_vel = data.qvel[fl_joint_id]
+ rl_vel = data.qvel[rl_joint_id]
+ fr_vel = data.qvel[fr_joint_id]
+ rr_vel = data.qvel[rr_joint_id]
+
+ # PD control: torque = kp * (target - current) - kd * current.
+ # Simple proportional control with velocity damping.
+ data.ctrl[fl_idx] = self.kp * (fl_target - fl_vel) - self.kd * fl_vel
+ data.ctrl[rl_idx] = self.kp * (rl_target - rl_vel) - self.kd * rl_vel
+ data.ctrl[fr_idx] = self.kp * (fr_target - fr_vel) - self.kd * fr_vel
+ data.ctrl[rr_idx] = self.kp * (rr_target - rr_vel) - self.kd * rr_vel
+
+ except Exception as e:
+ pass
+
+
+class Plant(MujocoBody):
+
+ # Scenic property - can be overridden in .scenic files.
+ semanticColor: [0, 255, 0] # Default green.
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.instance_id = None
+
+ def semanticColorForGeom(self, geom_name):
+ """Return semantic color for a specific geom of the plant.
+
+ Args:
+ geom_name: Name of the geometry (e.g., "stem_plant_0", "leaf_plant_0")
+
+ Returns:
+ [R, G, B] color list, or None to use default semanticColor
+ """
+ # Example: Color stem differently from leaves.
+ if 'stem' in geom_name.lower():
+ return [139, 69, 19] # Brown for stem.
+
+ if 'leaf' in geom_name.lower():
+ return [34, 139, 34] # Forest green for leaves.
+
+ # Default: use the object's semanticColor.
+ return None
+
+ def get_mujoco_xml(self, obj_counter, position, quaternion):
+
+ u_id = f"plant_{obj_counter}"
+ self.instance_id = u_id
+ self.body_name = f"frame_{u_id}"
+
+ complete_xml = f'''
+
+
+
+ '''
+
+ return complete_xml
\ No newline at end of file
diff --git a/examples/mujoco/farm-ng/amiga_base.xml b/examples/mujoco/farm-ng/amiga_base.xml
new file mode 100644
index 000000000..3beee17c1
--- /dev/null
+++ b/examples/mujoco/farm-ng/amiga_base.xml
@@ -0,0 +1,73 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/examples/mujoco/farm-ng/example_simulation.scenic b/examples/mujoco/farm-ng/example_simulation.scenic
new file mode 100644
index 000000000..0348e7cd0
--- /dev/null
+++ b/examples/mujoco/farm-ng/example_simulation.scenic
@@ -0,0 +1,52 @@
+from amiga import Amiga, Plant
+from scenic.simulators.mujoco.simulator import MujocoSimulator
+from scenic.simulators.mujoco.model import Ground, Hill
+
+from scenic.core.regions import RectangularRegion
+from scenic.core.vectors import Vector
+
+simulator MujocoSimulator(base_xml_file='amiga_base.xml', use_viewer=True)
+
+# Creating region for spawning objects.
+field_region = RectangularRegion(Vector(-20, -10), 0, 40, 20)
+
+# Creating irregular terrain (two hills).
+hill1 = new Hill at (10, 10, 0), with height 2, with spread 0.3
+hill2 = new Hill at (-10, -10, 0), with height 1.5, with spread 0.4
+
+new Ground at (0, 0, 0),
+ with width 50,
+ with length 50,
+ with gridSize 50,
+ with terrain (hill1, hill2)
+
+# Defining regions for the crop rows to ensure that they're parallel.
+row_y_positions = [-6, -3, 0, 3, 6]
+
+# Spawning multiple Amiga objects at different positions.
+amiga1 = new Amiga at (8, 1, 0.5),
+ facing 0 deg
+
+amiga2 = new Amiga at (-10, 2, 0.5),
+ facing 0 deg
+
+# Creating plants along the crop rows.
+# Row 1 (y = -6)
+for i in range(-8, 6, 2):
+ new Plant at (i, -6, 0.1), with plant_type "corn"
+
+# Row 2 (y = -3)
+for j in range(-8, 6, 2):
+ new Plant at (j, -3, 0.1), with plant_type "wheat"
+
+# # Row 3 (y = 0)
+for k in range(-8, 6, 2):
+ new Plant at (k, 0, 0.1), with plant_type "soybean"
+
+# # Row 4 (y = 3)
+for l in range(-8, 6, 2):
+ new Plant at (l, 3, 0.1), with plant_type "lettuce"
+
+# Row 5 (y = 6)
+for m in range(-8, 6, 2):
+ new Plant at (m, 6, 0.1), with plant_type "tomato"
\ No newline at end of file
diff --git a/examples/mujoco/farm-ng/sensor_simulation.scenic b/examples/mujoco/farm-ng/sensor_simulation.scenic
new file mode 100644
index 000000000..4cc04529b
--- /dev/null
+++ b/examples/mujoco/farm-ng/sensor_simulation.scenic
@@ -0,0 +1,77 @@
+from amiga import Amiga, Plant
+from scenic.simulators.mujoco.model import Ground, Hill, Terrain
+from scenic.simulators.mujoco.simulator import MujocoSimulator
+from scenic.simulators.mujoco.sensors import MujocoRGBSensor, MujocoSSSensor
+
+param timestep = 0.01
+
+simulator MujocoSimulator(base_xml_file='amiga_base.xml', use_viewer=True)
+
+hill1 = new Hill at (3, 0, 0), with height 0.5, with spread 0.3, with width 4, with length 4
+
+new Ground at (0, 0, 0),
+ with width 20,
+ with length 20,
+ with gridSize 20,
+ with terrain (hill1,)
+
+ego = new Amiga at (0, 0, 0),
+ with sensors {
+ "rgb_camera": MujocoRGBSensor(
+ offset=(0.0, 0.0, 1.5), # On top of the robot.
+ width=640,
+ height=480,
+ attributes={
+ "fovy": 60,
+ "look_direction": (1, 0, 0) # Look forward (positive x in robot frame).
+ }
+ ),
+ "ss_camera": MujocoSSSensor(
+ offset=(0.0, 0.0, 1.5),
+ width=640,
+ height=480,
+ attributes={
+ "fovy": 60,
+ "look_direction": (1, 0, 0) # Look forward.
+ },
+ use_object_types=False
+ )
+ }
+
+param recordFolder = "out/{simulation}"
+record ego.observations["rgb_camera"] to "videos/rgb/test_video_{simulation}.mp4"
+record ego.observations["ss_camera"] every 1 seconds to "pictures/ss/ss{simulation}_{time}.jpg"
+record ego.observations["rgb_camera"] every 1 seconds to "pictures/rgb/ss{simulation}_{time}.jpg"
+
+# Row 1 (y = -6)
+for i in range(-8, 6, 2):
+ new Plant at (i, -6, 0.1), with plant_type "corn"
+
+# Row 2 (y = -3)
+for j in range(-8, 6, 2):
+ new Plant at (j, -3, 0.1), with plant_type "wheat"
+
+# Row 4 (y = 3)
+for l in range(-8, 6, 2):
+ new Plant at (l, 3, 0.1), with plant_type "lettuce"
+
+# Row 5 (y = 6)
+for m in range(-8, 6, 2):
+ new Plant at (m, 6, 0.1), with plant_type "tomato"
+
+behavior RealisticDriving():
+ """
+ Realistic driving behavior using torque control.
+ Target velocities are in rad/s for the wheels.
+
+ With torque control, the robot will:
+ - Slow down going uphill (gravity resists)
+ - Speed up going downhill (gravity assists)
+ - React realistically to terrain changes
+ """
+ while True:
+ self.target_velocities = [4.0, 4.0, 4.0, 4.0]
+ wait
+
+ego.behavior = RealisticDriving()
+terminate after 25 seconds
\ No newline at end of file
diff --git a/examples/mujoco/pusher/PPO_pusher_improved.zip b/examples/mujoco/pusher/PPO_pusher_improved.zip
new file mode 100644
index 000000000..aa23d8f71
Binary files /dev/null and b/examples/mujoco/pusher/PPO_pusher_improved.zip differ
diff --git a/examples/mujoco/pusher/example_pusher.scenic b/examples/mujoco/pusher/example_pusher.scenic
new file mode 100644
index 000000000..8d193d979
--- /dev/null
+++ b/examples/mujoco/pusher/example_pusher.scenic
@@ -0,0 +1,15 @@
+from pusher import Pusher
+from scenic.simulators.mujoco.simulator import MujocoSimulator
+
+simulator MujocoSimulator(base_xml_file="pusher_base.xml", use_viewer=True)
+
+# Load pusher XML content
+# pusher_xml = open("pusher.xml").read()
+
+# Create pusher using the XML's default positioning
+# The pusher.xml already has pos="0 -0.6 0" for the shoulder
+# ego = new Pusher at (0, 0, 0), # Let XML handle positioning
+# with xml_content pusher_xml,
+# with sb3_model "PPO_pusher.zip"
+
+ego = new Pusher at (0, 0, 0)
\ No newline at end of file
diff --git a/examples/mujoco/pusher/pusher.scenic b/examples/mujoco/pusher/pusher.scenic
new file mode 100644
index 000000000..30f05ef8b
--- /dev/null
+++ b/examples/mujoco/pusher/pusher.scenic
@@ -0,0 +1,234 @@
+import os
+import numpy as np
+from stable_baselines3 import PPO
+from scenic.simulators.mujoco.model import DynamicMujocoBody
+
+
+class Pusher(DynamicMujocoBody):
+ """Mujoco Body for Pusher v4 using pretrained Stable Baselines 3 model"""
+
+ def __init__(self, *args, **kwargs):
+ # Extract sb3_model from kwargs if present - can be full path or just filename
+ self.sb3_model_path = kwargs.pop('sb3_model', "./PPO_pusher_improved.zip")
+
+ super().__init__(*args, **kwargs)
+
+ self.controller = None # Will be loaded lazily
+ self.instance_id = None # Will be set by simulator
+
+ def get_mujoco_xml(self, obj_counter, position, quaternion):
+ """Generate MuJoCo XML for this Pusher instance - single XML block approach."""
+
+ u_id = f"pusher_{obj_counter}"
+ self.instance_id = u_id
+ self.body_name = f"r_shoulder_pan_link_{u_id}" # Main arm body instead of frame
+
+ # Complete XML in one natural block - matches original pusher.xml structure
+ complete_xml = f'''
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ '''
+
+ return complete_xml
+
+ def control(self, model, data):
+ """Control using Stable Baselines 3 model"""
+ if not self.controller:
+ try:
+ print(f"Loading SB3 model from: {self.sb3_model_path}")
+ self.controller = PPO.load(self.sb3_model_path, device="cpu")
+ print("SB3 model loaded successfully!")
+ except FileNotFoundError:
+ print(f"ERROR: SB3 model not found at: {self.sb3_model_path}")
+ print("Please check the path and ensure the model file exists.")
+ return
+ except Exception as e:
+ print(f"ERROR: Failed to load SB3 model: {e}")
+ print(f"Model path: {self.sb3_model_path}")
+ return
+
+ if not self.instance_id:
+ return # Not ready yet
+
+ try:
+ # Create proper observation matching Pusher-v5 format (23 dimensions)
+ observation = self._get_observation(model, data)
+
+ # Get control commands from SB3 model
+ action, _ = self.controller.predict(observation, deterministic=True)
+
+ # Motor names with instance_id
+ motor_names = [
+ f"motor_shoulder_pan_{self.instance_id}",
+ f"motor_shoulder_lift_{self.instance_id}",
+ f"motor_upper_arm_roll_{self.instance_id}",
+ f"motor_elbow_flex_{self.instance_id}",
+ f"motor_forearm_roll_{self.instance_id}",
+ f"motor_wrist_flex_{self.instance_id}",
+ f"motor_wrist_roll_{self.instance_id}"
+ ]
+
+ # Apply control commands to actuators with proper scaling
+ for i, motor_name in enumerate(motor_names):
+ if i < len(action):
+ try:
+ motor_idx = model.actuator(motor_name).id
+ # Scale action like in the working bridge version
+ scaled_action = np.clip(action[i] * 0.08, -0.4, 0.4)
+ data.ctrl[motor_idx] = scaled_action
+ except:
+ # Silently fail during early simulation steps
+ pass
+
+ except Exception as e:
+ # Silently handle errors during early simulation steps
+ pass
+
+ def _get_observation(self, model, data):
+ """Create observation vector matching Gym Pusher-v5 environment (23 dimensions)"""
+
+ # Joint names
+ joint_names = [
+ f"r_shoulder_pan_joint_{self.instance_id}",
+ f"r_shoulder_lift_joint_{self.instance_id}",
+ f"r_upper_arm_roll_joint_{self.instance_id}",
+ f"r_elbow_flex_joint_{self.instance_id}",
+ f"r_forearm_roll_joint_{self.instance_id}",
+ f"r_wrist_flex_joint_{self.instance_id}",
+ f"r_wrist_roll_joint_{self.instance_id}"
+ ]
+
+ joint_positions = []
+ joint_velocities = []
+
+ # Get joint states
+ for joint_name in joint_names:
+ try:
+ joint_data = data.joint(joint_name)
+ joint_positions.append(joint_data.qpos[0] if len(joint_data.qpos) > 0 else 0.0)
+ joint_velocities.append(joint_data.qvel[0] if len(joint_data.qvel) > 0 else 0.0)
+ except:
+ joint_positions.append(0.0)
+ joint_velocities.append(0.0)
+
+ # Ensure exactly 7 joint values
+ joint_positions = np.array(joint_positions[:7] + [0.0] * (7 - len(joint_positions)))
+ joint_velocities = np.array(joint_velocities[:7] + [0.0] * (7 - len(joint_velocities)))
+
+ # Get fingertip position (3D: x, y, z)
+ try:
+ fingertip_id = model.body(f"tips_arm_{self.instance_id}").id
+ fingertip_pos = data.xpos[fingertip_id][:3]
+ except:
+ try:
+ fingertip_id = model.body(f"r_wrist_roll_link_{self.instance_id}").id
+ fingertip_pos = data.xpos[fingertip_id][:3]
+ except:
+ fingertip_pos = np.array([0.0, 0.0, 0.0])
+
+ # Get object position (3D: x, y, z)
+ try:
+ obj_id = model.body(f"object_{self.instance_id}").id
+ obj_pos = data.xpos[obj_id][:3]
+ except:
+ obj_pos = np.array([0.0, 0.0, 0.0])
+
+ # Get goal position (3D: x, y, z)
+ try:
+ goal_id = model.body(f"goal_{self.instance_id}").id
+ goal_pos = data.xpos[goal_id][:3]
+ except:
+ goal_pos = np.array([0.0, 0.0, 0.0])
+
+ # Construct observation vector: [joint_pos(7), joint_vel(7), fingertip_pos(3), obj_pos(3), goal_pos(3)]
+ observation = np.concatenate([
+ joint_positions.astype(np.float32),
+ joint_velocities.astype(np.float32),
+ fingertip_pos.astype(np.float32),
+ obj_pos.astype(np.float32),
+ goal_pos.astype(np.float32)
+ ])
+
+ return observation
\ No newline at end of file
diff --git a/examples/mujoco/pusher/pusher.xml b/examples/mujoco/pusher/pusher.xml
new file mode 100644
index 000000000..100c35e5f
--- /dev/null
+++ b/examples/mujoco/pusher/pusher.xml
@@ -0,0 +1,112 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/examples/mujoco/pusher/pusher_base.xml b/examples/mujoco/pusher/pusher_base.xml
new file mode 100644
index 000000000..bc2d291eb
--- /dev/null
+++ b/examples/mujoco/pusher/pusher_base.xml
@@ -0,0 +1,34 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/examples/mujoco/pusher/pusher_old.xml b/examples/mujoco/pusher/pusher_old.xml
new file mode 100644
index 000000000..fff8578c3
--- /dev/null
+++ b/examples/mujoco/pusher/pusher_old.xml
@@ -0,0 +1,91 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/examples/mujoco/pusher/pusher_scenic.py b/examples/mujoco/pusher/pusher_scenic.py
new file mode 100644
index 000000000..3d36d4c7b
--- /dev/null
+++ b/examples/mujoco/pusher/pusher_scenic.py
@@ -0,0 +1,27 @@
+import scenic
+from scenic.core.scenarios import Scene
+from scenic.simulators.mujoco.__archive__.simulator3 import MujocoSimulator
+
+if __name__ == "__main__":
+ SAMPLES = 100
+
+ right_falls = 0
+ left_falls = 0
+
+ for sample_index in range(SAMPLES):
+ simulator = MujocoSimulator(xml="", actual=False, use_default_arena=False)
+ scenario = scenic.scenarioFromFile("./src/pusher-scenic/pusher.scenic")
+
+ scene, _ = scenario.generate()
+ simulation = simulator.simulate(scene, maxSteps=500000)
+
+ result = simulation.result
+
+ # final_state = result.finalState
+ # if final_state[0].x < 0.0:
+ # left_falls += 1
+ # elif final_state[0].x > 0.0:
+ # right_falls += 1
+
+ print("Left falls: ", left_falls)
+ print("Right falls: ", right_falls)
diff --git a/examples/mujoco/pusher/pusher_scenic_body.scenic b/examples/mujoco/pusher/pusher_scenic_body.scenic
new file mode 100644
index 000000000..52b89a580
--- /dev/null
+++ b/examples/mujoco/pusher/pusher_scenic_body.scenic
@@ -0,0 +1,49 @@
+import os
+
+from stable_baselines3 import PPO
+from scenic.simulators.mujoco.model import *
+
+
+class Pusher(DynamicMujocoBody):
+ """Mujoco Body for Pusher v4 using pretrained Stable Baselines 3 model"""
+ def __init__(self, xml: str="", sb3_model: str=None, *args, **kwargs):
+ super().__init__(xml, *args, **kwargs)
+ root_path = os.getcwd()
+
+
+ if not sb3_model:
+ sb3_model = "PPO_pusher.zip"
+
+ self.controller = PPO.load(os.path.join(root_path, sb3_model), device="cpu")
+
+ def control(self, model, data):
+ joints = [
+ "r_shoulder_pan_joint",
+ "r_shoulder_lift_joint",
+ "r_upper_arm_roll_joint",
+ "r_elbow_flex_joint",
+ "r_forearm_roll_joint",
+ "r_wrist_flex_joint",
+ "r_wrist_roll_joint"
+ ]
+
+ joint_positions = []
+ joint_velocities = []
+
+
+
+ for joint in joints:
+ full_joint_name = self.body_name + joint
+ joint_positions.append(data.joint(full_joint_name).qpos[0])
+ joint_velocities.append(data.joint(full_joint_name).qvel[0])
+
+ observation = joint_positions + joint_velocities + [0 for i in range(9)]
+ ctrl = self.controller.predict(observation)
+
+ for i, joint in enumerate(joints):
+ full_joint_name = self.body_name + "/" + f"unnamed_actuator_{i}"
+
+ actuator = data.actuator(full_joint_name)
+
+ actuator.ctrl = ctrl[0][i]
+
diff --git a/examples/mujoco/pusher/train-pusher.py b/examples/mujoco/pusher/train-pusher.py
new file mode 100644
index 000000000..67592321d
--- /dev/null
+++ b/examples/mujoco/pusher/train-pusher.py
@@ -0,0 +1,158 @@
+import os
+
+import gymnasium as gym
+import numpy as np
+from stable_baselines3 import PPO, SAC
+from stable_baselines3.common.callbacks import CheckpointCallback, EvalCallback
+from stable_baselines3.common.env_util import make_vec_env
+from stable_baselines3.common.vec_env import VecNormalize
+import torch
+
+# Create directories
+os.makedirs("pusher_models", exist_ok=True)
+os.makedirs("pusher_logs", exist_ok=True)
+os.makedirs("pusher_tensorboard", exist_ok=True)
+
+
+def make_env():
+ return gym.make("Pusher-v5", render_mode="rgb_array")
+
+
+# Create vectorized environment with normalization for better training
+vec_env = make_vec_env(make_env, n_envs=8) # More parallel environments
+vec_env = VecNormalize(vec_env, norm_obs=True, norm_reward=True, clip_obs=10.0)
+
+# Create evaluation environment
+eval_env = make_vec_env(make_env, n_envs=1)
+eval_env = VecNormalize(
+ eval_env, norm_obs=True, norm_reward=True, clip_obs=10.0, training=False
+)
+
+# Callbacks
+checkpoint_callback = CheckpointCallback(
+ save_freq=50000,
+ save_path="./pusher_models/",
+ name_prefix="pusher_improved_checkpoint",
+)
+
+eval_callback = EvalCallback(
+ eval_env,
+ best_model_save_path="./pusher_models/",
+ log_path="./pusher_logs/",
+ eval_freq=25000,
+ deterministic=True,
+ render=False,
+ n_eval_episodes=10,
+)
+
+# Option 1: Improved PPO with better hyperparameters
+print("Training with improved PPO...")
+model = PPO(
+ "MlpPolicy",
+ vec_env,
+ learning_rate=1e-4, # Lower learning rate for stability
+ n_steps=4096, # More steps per update
+ batch_size=128, # Smaller batch size
+ n_epochs=20, # More epochs per update
+ gamma=0.995, # Higher discount factor
+ gae_lambda=0.98, # Higher GAE lambda
+ clip_range=0.2, # Standard clip range
+ normalize_advantage=True,
+ ent_coef=0.05, # Higher entropy for more exploration
+ vf_coef=0.5,
+ max_grad_norm=0.5,
+ verbose=1,
+ tensorboard_log="./pusher_tensorboard/",
+ policy_kwargs=dict(
+ net_arch=dict(pi=[512, 512, 256], vf=[512, 512, 256]), # Deeper networks
+ activation_fn=torch.nn.ReLU,
+ ),
+)
+
+# Train for much longer
+total_timesteps = 5_000_000 # 5 million timesteps
+
+print(f"Starting training for {total_timesteps:,} timesteps...")
+try:
+ model.learn(
+ total_timesteps=total_timesteps,
+ callback=[checkpoint_callback, eval_callback],
+ progress_bar=True,
+ )
+ print("Training completed!")
+except KeyboardInterrupt:
+ print("Training interrupted by user")
+
+# Save the model and normalization statistics
+model.save("pusher/PPO_pusher_improved_v2.zip")
+vec_env.save("pusher/vec_normalize.pkl")
+
+# Test the model
+print("\nTesting the improved model...")
+test_env = gym.make("Pusher-v5", render_mode="rgb_array")
+
+episode_rewards = []
+success_count = 0
+num_test_episodes = 10
+
+for episode in range(num_test_episodes):
+ obs, info = test_env.reset()
+ episode_reward = 0
+
+ # Track object and goal positions
+ initial_obj_pos = None
+ final_obj_pos = None
+ goal_pos = None
+
+ for step in range(500): # Shorter episodes for testing
+ # Normalize observation the same way as during training
+ if hasattr(vec_env, "normalize_obs"):
+ obs = vec_env.normalize_obs(obs)
+
+ action, _states = model.predict(obs, deterministic=True)
+ obs, reward, terminated, truncated, info = test_env.step(action)
+ episode_reward += reward
+
+ # Extract positions for success measurement (approximate indices)
+ if step == 0 and len(obs) >= 23:
+ initial_obj_pos = obs[17:19] # Object x,y position
+ goal_pos = obs[20:22] # Goal x,y position
+
+ if step == 499 or terminated or truncated:
+ if len(obs) >= 23:
+ final_obj_pos = obs[17:19]
+ break
+
+ # Check if successful (object within 0.1 units of goal)
+ if initial_obj_pos is not None and final_obj_pos is not None and goal_pos is not None:
+ initial_distance = np.linalg.norm(initial_obj_pos - goal_pos)
+ final_distance = np.linalg.norm(final_obj_pos - goal_pos)
+
+ if final_distance < 0.1: # Success threshold
+ success_count += 1
+ success_marker = "✓"
+ else:
+ success_marker = "✗"
+
+ print(
+ f"Episode {episode+1}: Reward={episode_reward:.3f}, "
+ f"Initial dist={initial_distance:.3f}, Final dist={final_distance:.3f} {success_marker}"
+ )
+ else:
+ print(f"Episode {episode+1}: Reward={episode_reward:.3f}")
+
+ episode_rewards.append(episode_reward)
+
+print(f"\n=== Results ===")
+print(
+ f"Success rate: {success_count}/{num_test_episodes} ({100*success_count/num_test_episodes:.1f}%)"
+)
+print(f"Average reward: {np.mean(episode_rewards):.3f} ± {np.std(episode_rewards):.3f}")
+print(f"Best episode: {np.max(episode_rewards):.3f}")
+
+test_env.close()
+vec_env.close()
+eval_env.close()
+
+print("\nModel saved as pusher/PPO_pusher_improved_v2.zip")
+print("Don't forget to update your pusher_bridge.py to use the new model!")
diff --git a/examples/mujoco/pusher/train_pusher_old.py b/examples/mujoco/pusher/train_pusher_old.py
new file mode 100644
index 000000000..b58baaef2
--- /dev/null
+++ b/examples/mujoco/pusher/train_pusher_old.py
@@ -0,0 +1,18 @@
+import gymnasium as gym
+from stable_baselines3 import PPO
+
+# Use Pusher-v5 which is compatible with MuJoCo 3.x
+env = gym.make("Pusher-v5", render_mode="rgb_array")
+
+model = PPO("MlpPolicy", env, verbose=1)
+model.learn(total_timesteps=1_000_000)
+
+# Test the trained model
+vec_env = model.get_env()
+obs = vec_env.reset()
+for i in range(1000):
+ action, _state = model.predict(obs, deterministic=True)
+ obs, reward, done, info = vec_env.step(action)
+
+model.save("PPO_pusher.zip")
+print("Model saved as PPO_pusher.zip")
diff --git a/examples/mujoco/simple/run.py b/examples/mujoco/simple/run.py
new file mode 100644
index 000000000..f966f8cb5
--- /dev/null
+++ b/examples/mujoco/simple/run.py
@@ -0,0 +1,15 @@
+import scenic
+from scenic.core.scenarios import Scene
+from scenic.simulators.mujoco.__archive__.simulator3 import MujocoSimulator
+
+if __name__ == "__main__":
+ SAMPLES = 1
+
+ for sample_index in range(SAMPLES):
+ simulator = MujocoSimulator(xml="")
+ scenario = scenic.scenarioFromFile("./examples/mujoco/simple.scenic")
+
+ scene, _ = scenario.generate()
+ simulation = simulator.simulate(scene, maxSteps=100000)
+
+ result = simulation.result
diff --git a/examples/mujoco/simple/simple.scenic b/examples/mujoco/simple/simple.scenic
new file mode 100644
index 000000000..665f902d7
--- /dev/null
+++ b/examples/mujoco/simple/simple.scenic
@@ -0,0 +1,16 @@
+ego = new Object at (0, 0, 5),
+ with width 0.5,
+ with length 0.5,
+ with height 0.5,
+ with color (0.5, 0.5, 0.5, 1),
+ with shape BoxShape(),
+ facing (-90 deg, 45 deg, 0)
+
+
+cone = new Object at (2, 0, 20),
+ with color (0.75, 0.5, 0.5, 1),
+ with width 1,
+ with length 1,
+ with height 1,
+ with shape Uniform(SpheroidShape(), BoxShape(), CylinderShape())
+
diff --git a/src/scenic/simulators/mujoco/model.scenic b/src/scenic/simulators/mujoco/model.scenic
new file mode 100644
index 000000000..c143e4f49
--- /dev/null
+++ b/src/scenic/simulators/mujoco/model.scenic
@@ -0,0 +1,224 @@
+import math
+from collections.abc import Callable
+from typing import List
+
+from scenic.core.object_types import Object
+
+from scenic.core.distributions import distributionFunction
+
+import dm_control
+from dm_control import mjcf
+import numpy as np
+
+import mujoco
+
+class MujocoBody(Object):
+ semanticColor: None
+
+ """Abstract class for Mujoco objects."""
+ def __init__(self, properties, *args, **kwargs):
+ super().__init__(properties, *args, **kwargs)
+ self.body_name = None
+ # self.semanticColor = None
+
+ def semanticColorFromGeom(self, geom_name):
+ """Override to provide per-geom colors for segmentation."""
+ return None
+
+ def model(self):
+ return None
+
+class DynamicMujocoBody(MujocoBody):
+ """Dynamic Mujoco Body"""
+ def __init__(self, properties, *args, **kwargs):
+ super().__init__(properties, *args, **kwargs)
+
+ def control(self, model, data):
+ """Apply control inputs (forces/torques) to the simulation."""
+ raise NotImplementedError("Error: control not implemented for object")
+
+class Terrain(MujocoBody):
+ """
+ Abstract class for objects added together to make a Ground.
+
+ This is not rendered as a separate MuJoCo body since it doesn't actually
+ correspond to a MuJoCo body. Only the overall Ground has a body with hfield geometry.
+ """
+
+ allowCollisions: True
+ render: False # Terrain objects are not rendered separately.
+
+ def heightAt(self, pt):
+ """Get height at a given point."""
+ offset = pt - self.position
+ return self.heightAtOffset(offset)
+
+ def heightAtOffset(self, offset):
+ """Get height at offset from terrain center. Must be implemented by subclasses."""
+ raise NotImplementedError('should be implemented by subclasses')
+
+ def get_mujoco_xml(self, obj_counter, position, quaternion):
+ """Terrain objects don't generate XML - they only contribute to Ground."""
+ return {
+ 'body': '',
+ 'assets': '',
+ 'actuators': '',
+ 'sensors': ''
+ }
+
+class Hill(Terrain):
+ """
+ Terrain shaped like a Gaussian.
+
+ Attributes:
+ height (float): height of the hill (default 1).
+ spread (float): standard deviation as a fraction of the hill's size
+ (default 0.25).
+ """
+
+ height: 1
+ spread: 0.25
+
+ def heightAtOffset(self, offset):
+ """Calculate Gaussian height at offset from hill center."""
+ dx, dy, _ = offset
+ if not (-self.hw < dx < self.hw and -self.hl < dy < self.hl):
+ return 0
+ sx, sy = dx / (self.width * self.spread), dy / (self.length * self.spread)
+ nh = math.exp(-((sx * sx) + (sy * sy)) * 0.5)
+ return self.height * nh
+
+
+class Ground(MujocoBody):
+ """
+ Ground surface with irregular terrain using MuJoCo heightfield.
+
+ Implemented using MuJoCo's hfield asset type.
+
+ Attributes:
+ allowCollisions (bool): default value True (overriding default from Object).
+ width (float): width of the ground in meters (default 10).
+ length (float): length of the ground in meters (default 10).
+ gridSize (int): resolution of the heightfield grid (default 20).
+ gridSizeX (int): X resolution (defaults to gridSize).
+ gridSizeY (int): Y resolution (defaults to gridSize).
+ baseThickness (float): thickness of the base below lowest terrain point (default 0.1).
+ terrain (tuple): tuple of Terrain objects to combine (default empty).
+ heights (tuple): computed height values for the grid.
+ """
+
+ allowCollisions: True
+
+ width: 10
+ length: 10
+
+ gridSize: 20
+ gridSizeX: self.gridSize
+ gridSizeY: self.gridSize
+ baseThickness: 0.1
+ terrain: ()
+ heights: Ground.heightsFromTerrain(self.terrain, self.gridSizeX, self.gridSizeY,
+ self.width, self.length)
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.instance_id = None
+ self.semanticColor = [128, 128, 128] # Gray for ground.
+
+ @staticmethod
+ @distributionFunction
+ def heightsFromTerrain(terrain, gridSizeX, gridSizeY, width, length):
+ """Generate height data from terrain objects."""
+ for elem in terrain:
+ if not isinstance(elem, Terrain):
+ raise RuntimeError(f'Ground terrain element {elem} is not a Terrain')
+
+ heights = []
+ if gridSizeX < 2 or gridSizeY < 2:
+ raise RuntimeError(f'invalid grid size {gridSizeX} x {gridSizeY} for Ground')
+
+ dx, dy = width / (gridSizeX - 1), length / (gridSizeY - 1)
+
+ # Sample grid points from -length/2 to +length/2 and -width/2 to +width/2.
+ y = -length / 2
+ for i in range(gridSizeY):
+ row = []
+ x = -width / 2
+ for j in range(gridSizeX):
+ # Create a Scenic Vector for the grid point.
+ from scenic.core.vectors import Vector
+ pt = Vector(x, y, 0)
+
+ # Sum heights from all terrain elements at this point.
+ height = sum(elem.heightAt(pt) for elem in terrain)
+ row.append(height)
+ x += dx
+ heights.append(tuple(row))
+ y += dy
+ return tuple(heights)
+
+ def get_mujoco_xml(self, obj_counter, position, quaternion):
+ """Generate MuJoCo XML for heightfield ground."""
+ u_id = f"ground_{obj_counter}"
+ self.instance_id = u_id
+ self.body_name = f"frame_{u_id}"
+
+ # Convert heights to flat array for MuJoCo.
+ # MuJoCo expects heights in row-major order.
+ heights_flat = []
+ for row in self.heights:
+ heights_flat.extend(row)
+
+ # Convert to numpy array for statistics.
+ heights_array = np.array(heights_flat)
+ min_height = float(np.min(heights_array))
+ max_height = float(np.max(heights_array))
+ height_range = max_height - min_height
+
+ # If terrain is flat, use a small range to avoid issues.
+ if height_range < 0.001:
+ height_range = 0.001
+
+ # Normalize heights to [0, 1] range for MuJoCo.
+ normalized_heights = (heights_array - min_height) / height_range
+
+ # Create height data string for XML.
+ height_data_str = ' '.join(f'{h:.6f}' for h in normalized_heights)
+
+ # MuJoCo hfield size: [size_x, size_y, elevation_z, base_z]
+ # size_x, size_y: half-sizes in x and y
+ # elevation_z: maximum elevation (scaled by height data)
+ # base_z: depth of the base below the lowest point
+ size_x = self.width / 2
+ size_y = self.length / 2
+ elevation_z = height_range
+ base_z = self.baseThickness
+
+ # Return as dictionary with separate sections.
+ asset_xml = f''' '''
+
+ body_xml = f'''
+
+ '''
+
+ return {
+ 'body': body_xml,
+ 'assets': asset_xml,
+ 'actuators': '',
+ 'sensors': ''
+ }
+
+ def startDynamicSimulation(self):
+ """Called when dynamic simulation starts."""
+ super().startDynamicSimulation()
+ # Terrain geometry is set via XML, no runtime updates needed.
+ pass
\ No newline at end of file
diff --git a/src/scenic/simulators/mujoco/sensors.py b/src/scenic/simulators/mujoco/sensors.py
new file mode 100644
index 000000000..78660cb8a
--- /dev/null
+++ b/src/scenic/simulators/mujoco/sensors.py
@@ -0,0 +1,489 @@
+import random
+import traceback
+
+import mujoco
+import numpy as np
+
+from scenic.core.sensors import RGBSensor, SSSensor
+
+
+class MujocoRGBSensor(RGBSensor):
+ """A simulated RGB camera for MuJoCo that supports efficient data sharing."""
+
+ def __init__(
+ self,
+ offset=(0, 0, 0),
+ rotation=(0, 0, 0),
+ width=None,
+ height=None,
+ attributes=None,
+ ):
+ if width is None or height is None:
+ raise ValueError("Width and height are required for sensors.")
+
+ self.offset = offset
+ self.rotation = rotation
+ self.width = width
+ self.height = height
+ self.attributes = attributes or {}
+
+ # These properties are set in setup().
+ self.camera_name = None
+ self.model = None
+ self.data = None
+ self.renderer = None
+
+ def getObservation(self):
+ """Capture RGB image from the camera."""
+ if self.camera_name is None or self.model is None or self.data is None:
+ raise RuntimeError("MuJoCo sensor not properly initialized by simulator.")
+
+ if self.renderer is None or not self.renderer.is_initialized():
+ return np.zeros((self.height, self.width, 3), dtype=np.uint8)
+
+ current_time = self.data.time
+
+ # Check if we can reuse shared RGB data from this timestep.
+ if (
+ hasattr(self.renderer, "_shared_rgb_time")
+ and hasattr(self.renderer, "_shared_rgb_data")
+ and self.renderer._shared_rgb_time == current_time
+ and self.renderer._shared_rgb_camera == self.camera_name
+ ):
+ return self.renderer._shared_rgb_data.copy()
+
+ # Otherwise render and cache for sharing.
+ try:
+ pixels = self.renderer.render(data=self.data, camera_name=self.camera_name)
+
+ if pixels is not None and pixels.size > 0:
+ # Cache for sharing with SS sensors.
+ self.renderer._shared_rgb_data = pixels
+ self.renderer._shared_rgb_time = current_time
+ self.renderer._shared_rgb_camera = self.camera_name
+ return pixels.copy()
+ else:
+ return np.zeros((self.height, self.width, 3), dtype=np.uint8)
+
+ except Exception as e:
+ print(f"RGB sensor rendering failed: {e}")
+ return np.zeros((self.height, self.width, 3), dtype=np.uint8)
+
+
+class MujocoSSSensor(SSSensor):
+ """
+ Native MuJoCo semantic segmentation sensor using built-in segmentation rendering.
+ - Parses geom names (e.g., 'wheel_car_0') to extract semantic classes ('car').
+ - Colors objects based on their 'semanticColor' attribute in Scenic.
+ - Falls back to a generated color palette for unassigned classes.
+ """
+
+ def __init__(
+ self,
+ offset=None,
+ rotation=(0, 0, 0),
+ width=None,
+ height=None,
+ attributes=None,
+ use_object_types=False,
+ color_map=None,
+ color_by_instance=True,
+ ):
+ if width is None or height is None:
+ raise ValueError("width and height are required for sensors")
+
+ self.offset = offset if offset else (0, 0, 0)
+ self.rotation = rotation
+ self.width = width
+ self.height = height
+ self.attributes = attributes or {}
+ self.use_object_types = use_object_types
+ self.color_by_instance = (
+ color_by_instance # Color whole Scenic objects vs individual geoms.
+ )
+
+ # Color mapping: dict of keyword -> [R, G, B]
+ # If None, uses default palette-based coloring.
+ self.color_map = color_map
+
+ # Set by simulator during setup.
+ self.camera_name = None
+ self.model = None
+ self.data = None
+ self.renderer = None
+ self._last_render_time = -1
+ self._class_color_map = {} # Maps class names to colors.
+
+ # Default color palette for unmapped objects.
+ self._color_palette = [
+ [255, 0, 0], # Red
+ [0, 255, 0], # Green
+ [0, 0, 255], # Blue
+ [255, 255, 0], # Yellow
+ [255, 0, 255], # Magenta
+ [0, 255, 255], # Cyan
+ [255, 128, 0], # Orange
+ [128, 0, 255], # Purple
+ [255, 128, 128], # Light Red
+ [128, 255, 128], # Light Green
+ [128, 128, 255], # Light Blue
+ [192, 192, 192], # Silver
+ ]
+
+ def getObservation(self):
+ """Generate semantic segmentation using MuJoCo's native segmentation rendering."""
+ if self.camera_name is None or self.model is None or self.data is None:
+ raise RuntimeError("MuJoCo sensor not properly initialized by simulator")
+
+ if self.renderer is None or not self.renderer.is_initialized():
+ return np.zeros((self.height, self.width, 3), dtype=np.uint8)
+
+ current_time = self.data.time
+
+ # Simple caching to avoid reprocessing.
+ if current_time == self._last_render_time and hasattr(
+ self, "_cached_segmentation"
+ ):
+ return self._cached_segmentation.copy()
+
+ try:
+ # Create a dedicated segmentation renderer if it doesn't exist.
+ if not hasattr(self.renderer, "_seg_renderer"):
+ self.renderer._seg_renderer = mujoco.Renderer(
+ self.model, height=self.height, width=self.width
+ )
+ self.renderer._seg_renderer.enable_segmentation_rendering()
+
+ # Use dedicated segmentation renderer.
+ camera_id = -1
+ if self.camera_name:
+ camera_id = mujoco.mj_name2id(
+ self.model, mujoco.mjtObj.mjOBJ_CAMERA, self.camera_name
+ )
+ if camera_id < 0:
+ print(f"WARNING: Camera '{self.camera_name}' not found")
+
+ if camera_id >= 0:
+ self.renderer._seg_renderer.update_scene(self.data, camera=camera_id)
+ else:
+ self.renderer._seg_renderer.update_scene(self.data)
+
+ # Render segmentation.
+ seg_data = self.renderer._seg_renderer.render()
+
+ if seg_data is None or seg_data.size == 0:
+ return np.zeros((self.height, self.width, 3), dtype=np.uint8)
+
+ # Convert to colored segmentation.
+ segmentation = self._segmentation_to_colors(seg_data)
+
+ # Delete seg_data immediately after use to free memory.
+ del seg_data
+
+ # Cache the result.
+ self._cached_segmentation = segmentation.copy()
+ self._last_render_time = current_time
+
+ return segmentation
+
+ except Exception as e:
+ print(f"Native segmentation sensor failed: {e}")
+ traceback.print_exc()
+ return np.zeros((self.height, self.width, 3), dtype=np.uint8)
+
+ def _segmentation_to_colors(self, seg_data):
+ """Convert MuJoCo segmentation data to colored visualization."""
+ if self.use_object_types:
+ ids = seg_data[:, :, 1] # Use object types.
+ else:
+ ids = seg_data[:, :, 0] # Use geom IDs.
+
+ # Handle background.
+ ids = ids.astype(np.int32)
+ ids[ids == -1] = 0 # Background.
+
+ # Create colored output.
+ colored = np.zeros((self.height, self.width, 3), dtype=np.uint8)
+
+ # Assign colors based on object categories.
+ unique_ids = np.unique(ids)
+ for obj_id in unique_ids:
+ if obj_id == 0: # Background.
+ color = [64, 64, 64] # Dark gray for background.
+ else:
+ color = self._get_semantic_color(obj_id)
+
+ mask = ids == obj_id
+ colored[mask] = color
+
+ return colored
+
+ def _get_semantic_color(self, geom_id):
+ """
+ Get semantic color for a geom.
+
+ Priority:
+ 1. Check if object has semanticColorForGeom method and use it.
+ 2. Otherwise, use object's semanticColor property.
+ 3. Fallback to default coloring (color_map or palette).
+ """
+ if not self.model:
+ return [128, 128, 128] # Gray fallback.
+
+ try:
+ # Get geom name.
+ if 0 <= geom_id < self.model.ngeom:
+ geom_name = mujoco.mj_id2name(
+ self.model, mujoco.mjtObj.mjOBJ_GEOM, geom_id
+ )
+
+ if geom_name:
+ name_lower = geom_name.lower()
+
+ # Special case: ground/floor always gets gray.
+ if any(word in name_lower for word in ["ground", "floor", "plane"]):
+ return [128, 128, 128]
+
+ # Try to find the Scenic object for this geom.
+ scenic_obj = None
+ if hasattr(self, "geom_to_object") and self.geom_to_object:
+ scenic_obj = self.geom_to_object.get(geom_id)
+
+ if scenic_obj:
+ # Priority 1: Check if object has semanticColorForGeom method.
+ if hasattr(scenic_obj, "semanticColorForGeom") and callable(
+ scenic_obj.semanticColorForGeom
+ ):
+ try:
+ geom_color = scenic_obj.semanticColorForGeom(geom_name)
+ if geom_color is not None:
+ if (
+ isinstance(geom_color, (list, tuple))
+ and len(geom_color) == 3
+ ):
+ return list(geom_color)
+ except Exception as e:
+ print(
+ f"Warning: semanticColorForGeom failed for {geom_name}: {e}"
+ )
+
+ # Priority 2: Use object's semanticColor property.
+ if (
+ hasattr(scenic_obj, "semanticColor")
+ and scenic_obj.semanticColor is not None
+ ):
+ color = scenic_obj.semanticColor
+ if isinstance(color, (list, tuple)) and len(color) == 3:
+ return list(color)
+
+ # Priority 3: Fallback to class-based coloring.
+ class_name = self._extract_class_name(geom_name)
+
+ if class_name:
+ # Check if we've already assigned a color to this class.
+ if class_name not in self._class_color_map:
+ # Assign color based on settings.
+ if self.color_by_instance:
+ # Check custom color map first.
+ color_assigned = False
+ if self.color_map:
+ for keyword, color in self.color_map.items():
+ if keyword.lower() in class_name.lower():
+ self._class_color_map[class_name] = color
+ color_assigned = True
+ break
+
+ # Otherwise use palette.
+ if not color_assigned:
+ available_color = self._get_available_palette_color()
+ self._class_color_map[class_name] = available_color
+ else:
+ # Color by geom_id (old behavior).
+ color_idx = geom_id % len(self._color_palette)
+ self._class_color_map[class_name] = self._color_palette[
+ color_idx
+ ]
+
+ return self._class_color_map[class_name]
+
+ # No class name found, use geom-based coloring.
+ color_idx = geom_id % len(self._color_palette)
+ return self._color_palette[color_idx]
+
+ return [255, 255, 255] # White for unknown.
+
+ except Exception as e:
+ print(f"Error getting semantic color for geom {geom_id}: {e}")
+ return [128, 128, 128] # Gray fallback.
+
+ def _extract_class_name(self, geom_name):
+ """
+ Extract class name from geometry name.
+
+ Assumes naming convention: "partname_classname_instancenumber"
+ E.g., "wheel_amiga_0" -> "amiga"
+ "fl_wheel_amiga_0" -> "amiga"
+ "stem_plant_1" -> "plant"
+ "rail_left_amiga_0" -> "amiga"
+ """
+ # Split by underscore.
+ parts = geom_name.split("_")
+
+ # Look for pattern where last part is a number.
+ # The class name should be the second-to-last part.
+ if len(parts) >= 2 and parts[-1].isdigit():
+ # Class name is second to last.
+ return parts[-2]
+
+ # Fallback: return the full name.
+ return geom_name
+
+ def _get_scenic_color_for_class(self, class_name):
+ """Check if any Scenic object with this class name has semanticColor defined."""
+ # We need access to the geom_to_object mapping.
+ if not hasattr(self, "geom_to_object") or not self.geom_to_object:
+ return None
+
+ # Look through all mapped objects to find one with this class name.
+ for geom_id, scenic_obj in self.geom_to_object.items():
+ if hasattr(scenic_obj, "instance_id") and scenic_obj.instance_id:
+ # Check if this object's instance_id contains the class_name.
+ # E.g., instance_id = "amiga_0", class_name = "amiga"
+ if class_name in scenic_obj.instance_id:
+ # Found a matching object, check if it has semanticColor.
+ if (
+ hasattr(scenic_obj, "semanticColor")
+ and scenic_obj.semanticColor is not None
+ ):
+ color = scenic_obj.semanticColor
+ if isinstance(color, (list, tuple)) and len(color) == 3:
+ return list(color)
+
+ return None
+
+ def _get_all_semantic_colors(self):
+ """Get all semanticColors that are explicitly defined on Scenic objects."""
+ if not hasattr(self, "geom_to_object") or not self.geom_to_object:
+ return set()
+
+ semantic_colors = set()
+ seen_objects = set() # Avoid checking same object multiple times.
+
+ for geom_id, scenic_obj in self.geom_to_object.items():
+ # Use object id to avoid duplicates.
+ obj_id = id(scenic_obj)
+ if obj_id in seen_objects:
+ continue
+ seen_objects.add(obj_id)
+
+ if (
+ hasattr(scenic_obj, "semanticColor")
+ and scenic_obj.semanticColor is not None
+ ):
+ color = scenic_obj.semanticColor
+ if isinstance(color, (list, tuple)) and len(color) == 3:
+ # Convert to tuple for set storage.
+ semantic_colors.add(tuple(color))
+
+ return semantic_colors
+
+ def _get_available_palette_color(self):
+ """
+ Get a palette color that doesn't conflict with semanticColors.
+
+ Returns:
+ [R, G, B] color list from palette that's not used by semanticColor
+ """
+ # Get all colors already used by semanticColor.
+ reserved_colors = self._get_all_semantic_colors()
+
+ # Also reserve colors already assigned to other classes.
+ used_colors = set(tuple(c) for c in self._class_color_map.values())
+ reserved_colors.update(used_colors)
+
+ # Find first palette color not in reserved set.
+ for color in self._color_palette:
+ if tuple(color) not in reserved_colors:
+ return color
+
+ # If all palette colors are taken, start generating variations.
+ while True:
+ # Generate a random color that's not reserved.
+ color = [
+ random.randint(50, 255),
+ random.randint(50, 255),
+ random.randint(50, 255),
+ ]
+ if tuple(color) not in reserved_colors:
+ return color
+
+
+class MujocoRenderer:
+ """MuJoCo renderer with shared RGB data caching."""
+
+ def __init__(self, model, width, height):
+ self.model = model
+ self.width = width
+ self.height = height
+
+ # Shared RGB data for sensors using the same camera.
+ self._shared_rgb_data = None
+ self._shared_rgb_time = -1
+ self._shared_rgb_camera = None
+
+ try:
+ self.renderer = mujoco.Renderer(model, height=height, width=width)
+ self._initialized = True
+ except Exception as e:
+ print(f"Failed to initialize renderer: {e}")
+ self.renderer = None
+ self._initialized = False
+
+ def is_initialized(self):
+ """
+ Check if the underlying MuJoCo renderer context is active and valid.
+ """
+ return self._initialized and self.renderer is not None
+
+ def render(self, data=None, camera_name=None):
+ """Render RGB image."""
+ if not self.is_initialized():
+ return np.zeros((self.height, self.width, 3), dtype=np.uint8)
+
+ try:
+ if data is not None:
+ camera_id = -1
+ if camera_name:
+ camera_id = mujoco.mj_name2id(
+ self.model, mujoco.mjtObj.mjOBJ_CAMERA, camera_name
+ )
+ if camera_id < 0:
+ print(f"WARNING: Camera '{camera_name}' not found")
+
+ if camera_id >= 0:
+ self.renderer.update_scene(data, camera=camera_id)
+ else:
+ self.renderer.update_scene(data)
+
+ pixels = self.renderer.render()
+ return pixels
+
+ except Exception as e:
+ print(f"Rendering failed: {e}")
+ return np.zeros((self.height, self.width, 3), dtype=np.uint8)
+
+ def close(self):
+ """Release MuJoCo rendering context and clear shared caches."""
+ # Clean up shared data.
+ if hasattr(self, "_shared_rgb_data") and self._shared_rgb_data is not None:
+ del self._shared_rgb_data
+ self._shared_rgb_data = None
+
+ if hasattr(self, "renderer") and self.renderer:
+ try:
+ self.renderer.close()
+ except Exception as e:
+ print(f"WARNING: Error closing renderer: {e}")
+ self.renderer = None
+
+ self._initialized = False
diff --git a/src/scenic/simulators/mujoco/simulator.py b/src/scenic/simulators/mujoco/simulator.py
new file mode 100644
index 000000000..33850fa7f
--- /dev/null
+++ b/src/scenic/simulators/mujoco/simulator.py
@@ -0,0 +1,550 @@
+import math
+import re
+import xml.etree.ElementTree as ET
+
+import mujoco
+import mujoco.viewer
+from scipy.spatial.transform import Rotation
+
+from scenic.core.sensors import RGBSensor, SSSensor
+from scenic.core.simulators import Simulation, Simulator
+from scenic.core.vectors import Vector
+
+from .sensors import MujocoRenderer, MujocoRGBSensor, MujocoSSSensor
+
+VERTICAL_THRESHOLD = (
+ 0.99 # Threshold to detect gimbal lock when camera looks straight up and down.
+)
+MIN_VECTOR_LENGTH = 0.001
+
+
+class MujocoSimulator(Simulator):
+ """
+ Class that serves as the interface between Scenic and the MuJoCo physics engine.
+ Configures the simulation environment (loading the base XML and setting the viewer preferences).
+ Spawns the individual `MujocoSimulation` instances for each scenario run.
+ """
+
+ def __init__(self, base_xml_file=None, actual=False, use_viewer=True):
+ super().__init__()
+ self.base_xml_file = base_xml_file
+ self.actual = actual
+ self.use_viewer = use_viewer
+
+ def createSimulation(self, scene, **kwargs):
+ return MujocoSimulation(
+ scene, self.base_xml_file, self.actual, self.use_viewer, **kwargs
+ )
+
+
+class MujocoSimulation(Simulation):
+ """
+ Manages a single execution of a Scenic scenario within MuJoCo.
+ Handles the lifecycle of a simulation run which includes
+ - Scene Geneeration
+ - XML Injection
+ - Sensor Management
+ - Physics Stepping
+ Uses string manipulation and regex to inject object XML into the
+ base model. Assumes target bodies for cameras/sensors are "leaf"
+ nodes or strictly defined containers. Complex nesting in `get_mujoco_xml`
+ may require careful structure to avoid regex misplacement.
+ """
+
+ def __init__(
+ self, scene, base_xml_file=None, actual=False, use_viewer=True, **kwargs
+ ):
+ self.base_xml_file = base_xml_file
+ self.actual = actual
+ self.use_viewer = use_viewer
+ self.scene = scene
+
+ self.mujocohandle = None
+
+ # Shared renderer management to prevent memory leaks.
+ # Key: (width, height) / Value: renderer
+ self.shared_renderers = {}
+ self.camera_counter = 0
+
+ kwargs.pop("timestep")
+
+ # Default timestep.
+ self.timestep = 0.01
+ if scene.params.get("timestep"):
+ self.timestep = scene.params.get("timestep")
+
+ # Set defaults.
+ if "maxSteps" not in kwargs:
+ kwargs["maxSteps"] = 1000000
+ if "name" not in kwargs:
+ kwargs["name"] = "MujocoSimulation"
+
+ super().__init__(scene, timestep=self.timestep, **kwargs)
+
+ def createObjectInSimulator(self, obj):
+ # Objects are created in setup() through XML templates.
+ pass
+
+ def setup(self):
+ super().setup()
+
+ # Load base XML.
+ if self.base_xml_file:
+ with open(self.base_xml_file, "r") as f:
+ complete_xml = f.read()
+ else:
+ complete_xml = self._get_minimal_base_xml()
+
+ # Process all objects with get_mujoco_xml and their sensors.
+ all_bodies = []
+ all_actuators = []
+ all_sensors = []
+ all_cameras = []
+ all_assets = []
+
+ object_counter = 0
+
+ for obj in self.objects:
+ if callable(getattr(obj, "get_mujoco_xml")):
+
+ # Get position and orientation from Scenic.
+ pos_str = f"{float(obj.position.x)} {float(obj.position.y)} {float(obj.position.z)}"
+ quat_str = self._get_quaternion_string(obj)
+
+ # Get XML from the object's get_mujoco_xml method.
+ xml_result = obj.get_mujoco_xml(object_counter, pos_str, quat_str)
+
+ # Parse the XML block into sections.
+ if isinstance(xml_result, str):
+ sections = self._parse_single_xml_block(xml_result)
+ else:
+ sections = xml_result
+
+ # Add to appropriate sections.
+ if "body" in sections and sections["body"]:
+ all_bodies.append(sections["body"])
+ if "actuators" in sections and sections["actuators"]:
+ all_actuators.append(sections["actuators"])
+ if "sensors" in sections and sections["sensors"]:
+ all_sensors.append(sections["sensors"])
+ if "assets" in sections and sections["assets"]:
+ all_assets.append(sections["assets"])
+
+ # Process object sensors.
+ camera_data = self._process_object_sensors(obj, object_counter)
+ if camera_data:
+ all_cameras.extend(camera_data)
+
+ object_counter += 1
+
+ # Insert assets first.
+ if all_assets:
+ assets_xml = "\n".join(all_assets)
+ complete_xml = complete_xml.replace("", f"{assets_xml}\n")
+
+ # Insert bodies.
+ if all_bodies:
+ bodies_xml = "\n".join(all_bodies)
+ complete_xml = complete_xml.replace(
+ "", f"{bodies_xml}\n"
+ )
+
+ # Insert cameras into parent bodies.
+ if all_cameras:
+ for camera_xml, parent_body_name in all_cameras:
+
+ # WARNING: This regex finds the *first* )'
+
+ def insert_camera(match):
+ return match.group(1) + "\n" + camera_xml + "\n " + match.group(2)
+
+ complete_xml = re.sub(
+ pattern, insert_camera, complete_xml, flags=re.DOTALL
+ )
+
+ if all_actuators:
+ actuators_xml = "\n".join(all_actuators)
+ complete_xml = complete_xml.replace(
+ "", f"{actuators_xml}\n"
+ )
+
+ if all_sensors:
+ sensors_xml = "\n".join(all_sensors)
+ complete_xml = complete_xml.replace("", f"{sensors_xml}\n")
+
+ # Load into MuJoCo.
+ try:
+ self.model = mujoco.MjModel.from_xml_string(complete_xml)
+ self.data = mujoco.MjData(self.model)
+ except Exception as e:
+ print(f"ERROR: MuJoCo model creation failed: {e}")
+ raise
+
+ self._initialize_sensors()
+
+ # Launch viewer.
+ if self.use_viewer:
+ try:
+ self.mujocohandle = mujoco.viewer.launch_passive(self.model, self.data)
+ except Exception as e:
+ print(f"WARNING: Viewer launch failed: {e}")
+ self.mujocohandle = None
+ else:
+ self.mujocohandle = None
+
+ def _process_object_sensors(self, obj, obj_counter):
+ camera_xmls = []
+
+ sensors = getattr(obj, "sensors", None)
+ if not sensors:
+ return camera_xmls
+
+ for _, sensor in obj.sensors.items():
+ if isinstance(sensor, (RGBSensor, SSSensor)):
+ camera_xml = self._generate_camera_xml(sensor, obj_counter)
+ if camera_xml:
+ camera_xmls.append((camera_xml, obj.body_name))
+
+ return camera_xmls
+
+ def _generate_camera_xml(self, sensor, obj_counter):
+
+ attrs = getattr(sensor, "attributes", None) or {}
+
+ # Get offset.
+ offset = getattr(sensor, "offset", None) or (3.0, 2.0, 2.0)
+ offset_x, offset_y, offset_z = offset
+
+ # Get field of view.
+ fovy = attrs.get("fovy", 60)
+
+ # Set look direction from attributes; default to looking forward.
+ look_x, look_y, look_z = attrs.get("look_direction", (1.0, 0.0, 0.0))
+
+ # Create camera name.
+ self.camera_counter += 1
+ camera_name = f"camera_{obj_counter}_{self.camera_counter}"
+ sensor.camera_name = camera_name
+
+ # Normalize the look direction.
+ length = math.sqrt(look_x * look_x + look_y * look_y + look_z * look_z)
+ if length > 0:
+ look_x /= length
+ look_y /= length
+ look_z /= length
+
+ # Calculate camera axes for MuJoCo xyaxes.
+ # Check if looking nearly straight up or down.
+ if abs(look_z) > VERTICAL_THRESHOLD: # Nearly vertical.
+ if look_z < 0: # Looking down.
+ xyaxes = "1 0 0 0 1 0"
+ else: # Looking up.
+ xyaxes = "1 0 0 0 -1 0"
+ else:
+ # Normal case: calculate right and up vectors.
+ world_up_x, world_up_y, world_up_z = 0, 0, 1
+
+ right_x = look_y * world_up_z - look_z * world_up_y
+ right_y = look_z * world_up_x - look_x * world_up_z
+ right_z = look_x * world_up_y - look_y * world_up_x
+
+ right_length = math.sqrt(
+ right_x * right_x + right_y * right_y + right_z * right_z
+ )
+ if right_length > MIN_VECTOR_LENGTH:
+ right_x /= right_length
+ right_y /= right_length
+ right_z /= right_length
+ else:
+ # Fallback if right vector is too small.
+ right_x, right_y, right_z = 0, 1, 0
+
+ up_x = right_y * look_z - right_z * look_y
+ up_y = right_z * look_x - right_x * look_z
+ up_z = right_x * look_y - right_y * look_x
+
+ up_length = math.sqrt(up_x * up_x + up_y * up_y + up_z * up_z)
+ if up_length > MIN_VECTOR_LENGTH:
+ up_x /= up_length
+ up_y /= up_length
+ up_z /= up_length
+
+ xyaxes = f"{right_x:.3f} {right_y:.3f} {right_z:.3f} {up_x:.3f} {up_y:.3f} {up_z:.3f}"
+
+ # Return camera XML with relative position.
+ camera_xml = f""" """
+
+ return camera_xml
+
+ def _build_geom_to_object_mapping(self):
+ """Build mapping from geom_id to Scenic object for semantic segmentation."""
+ geom_to_object = {}
+
+ for obj in self.objects:
+ # Get the instance_id to match geom names.
+ if getattr(obj, "instance_id", None):
+ instance_id = obj.instance_id
+
+ # Iterate through all geoms in the model.
+ for geom_id in range(self.model.ngeom):
+ geom_name = mujoco.mj_id2name(
+ self.model, mujoco.mjtObj.mjOBJ_GEOM, geom_id
+ )
+
+ # Check if this geom belongs to this object by checking if instance_id is in the name.
+ if geom_name and instance_id in geom_name:
+ geom_to_object[geom_id] = obj
+
+ return geom_to_object
+
+ def _initialize_sensors(self):
+ """Initialize all sensors with shared renderers and geom-to-object mappings."""
+
+ # Build geom-to-object mapping for semantic segmentation.
+ geom_to_object_map = self._build_geom_to_object_mapping()
+
+ for obj in self.objects:
+ if getattr(obj, "sensors", None):
+ for sensor_name, sensor in obj.sensors.items():
+ if isinstance(sensor, (MujocoRGBSensor, MujocoSSSensor)):
+ sensor.model = self.model
+ sensor.data = self.data
+
+ # For SS sensors, provide geom-to-object mapping.
+ if isinstance(sensor, MujocoSSSensor):
+ sensor.geom_to_object = geom_to_object_map
+
+ # Use shared renderer based on resolution.
+ key = (sensor.width, sensor.height)
+ if key not in self.shared_renderers:
+ renderer = MujocoRenderer(
+ self.model, sensor.width, sensor.height
+ )
+ if renderer.is_initialized():
+ self.shared_renderers[key] = renderer
+ else:
+ print(f"ERROR: Failed to create renderer for {key}")
+ continue
+
+ sensor.renderer = self.shared_renderers[key]
+
+ # Verify camera exists in model.
+ if sensor.camera_name:
+ camera_id = mujoco.mj_name2id(
+ self.model, mujoco.mjtObj.mjOBJ_CAMERA, sensor.camera_name
+ )
+ if camera_id == -1:
+ print(
+ f"WARNING: Camera '{sensor.camera_name}' not found in model"
+ )
+
+ def _parse_single_xml_block(self, xml_block):
+ """Parse a single XML block into body, actuators, and sensors sections."""
+ sections = {"body": "", "actuators": "", "sensors": ""}
+
+ try:
+ # Wrap in a root element for parsing.
+ root_xml = f"{xml_block}"
+ root = ET.fromstring(root_xml)
+
+ # Extract all top-level body elements.
+ body_elements = []
+ for child in root:
+ if child.tag == "body":
+ body_elements.append(ET.tostring(child, encoding="unicode"))
+
+ if body_elements:
+ sections["body"] = "\n".join(body_elements)
+
+ # Extract actuators section.
+ actuators_elements = root.findall(".//actuators")
+ if actuators_elements:
+ actuators_content = []
+ for actuator_section in actuators_elements:
+ for child in actuator_section:
+ actuators_content.append(ET.tostring(child, encoding="unicode"))
+ if actuators_content:
+ sections["actuators"] = "\n".join(actuators_content)
+
+ # Extract sensors section.
+ sensors_elements = root.findall(".//sensors")
+ if sensors_elements:
+ sensors_content = []
+ for sensor_section in sensors_elements:
+ for child in sensor_section:
+ sensors_content.append(ET.tostring(child, encoding="unicode"))
+ if sensors_content:
+ sections["sensors"] = "\n".join(sensors_content)
+
+ except ET.ParseError as e:
+ print(f"Warning: Failed to parse XML block: {e}")
+ # Fallback: treat entire block as body.
+ sections["body"] = xml_block
+
+ return sections
+
+ def _get_quaternion_string(self, obj):
+ q = obj.orientation.q
+ return f"{q[0]} {q[1]} {q[2]} {q[3]}"
+
+ def _get_minimal_base_xml(self):
+ return """
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+"""
+
+ def step(self):
+ if self.mujocohandle and not self.mujocohandle.is_running():
+ raise KeyboardInterrupt("MuJoCo Viewer closed by user.")
+
+ # Let objects control themselves.
+ for obj in self.objects:
+ if getattr(obj, "control", None):
+ obj.control(self.model, self.data)
+
+ mujoco.mj_step(self.model, self.data)
+
+ if self.mujocohandle:
+ self.mujocohandle.sync()
+
+ self._update_observations()
+
+ def _update_observations(self):
+ """Update observations with proper sensor coordination to prevent double rendering."""
+
+ for obj in self.objects:
+ if not getattr(obj, "sensors", None):
+ continue
+
+ if not getattr(obj, "observations", None):
+ obj.observations = {}
+
+ # Categorize sensors by type for ordered processing.
+ # RGB sensors must be processed first to establish shared cache for SS sensors.
+ sensor_groups = {"rgb": [], "ss": [], "other": []}
+
+ for sensor_name, sensor in obj.sensors.items():
+ if isinstance(sensor, MujocoRGBSensor):
+ sensor_groups["rgb"].append((sensor_name, sensor))
+ elif isinstance(sensor, MujocoSSSensor):
+ sensor_groups["ss"].append((sensor_name, sensor))
+ else:
+ sensor_groups["other"].append((sensor_name, sensor))
+
+ # Process in order: RGB -> SS -> other
+ # This ensures SS sensors can reuse RGB rendering cache.
+ for group_name in ["rgb", "ss", "other"]:
+ for sensor_name, sensor in sensor_groups[group_name]:
+ try:
+ obj.observations[sensor_name] = sensor.getObservation()
+ except Exception as e:
+ print(
+ f"Failed to update {group_name.upper()} sensor {sensor_name}: {e}"
+ )
+ obj.observations[sensor_name] = None
+
+ def getProperties(self, obj, properties):
+ if not getattr(obj, "body_name", None):
+ # Return minimal properties for non-body objects (like Terrain).
+ return {
+ "position": obj.position,
+ "velocity": Vector(0, 0, 0),
+ "speed": 0,
+ "angularSpeed": 0,
+ "angularVelocity": Vector(0, 0, 0),
+ "yaw": 0,
+ "pitch": 0,
+ "roll": 0,
+ }
+
+ try:
+ body_data = self.data.body(obj.body_name)
+
+ # Position
+ x, y, z = body_data.subtree_com
+ position = Vector(x, y, z)
+
+ # Velocity
+ vel_x, vel_y, vel_z = body_data.cvel[0:3]
+ velocity = Vector(vel_x, vel_y, vel_z)
+ speed = math.hypot(*velocity)
+
+ # Angular velocity
+ ang_x, ang_y, ang_z = body_data.cvel[3:6]
+ angularVelocity = Vector(ang_x, ang_y, ang_z)
+ angularSpeed = math.hypot(*angularVelocity)
+
+ # Orientation
+ try:
+ orientation_matrix = body_data.ximat.reshape(3, 3)
+ r = Rotation.from_matrix(orientation_matrix)
+ yaw, pitch, roll = r.as_euler("zyx")
+ except:
+ yaw = pitch = roll = 0.0
+
+ return {
+ "position": position,
+ "velocity": velocity,
+ "speed": speed,
+ "angularSpeed": angularSpeed,
+ "angularVelocity": angularVelocity,
+ "yaw": yaw,
+ "pitch": pitch,
+ "roll": roll,
+ }
+
+ except Exception as e:
+ print(f"Error getting properties for {obj}: {e}")
+ return {}
+
+ def destroy(self):
+ """Clean up all resources properly."""
+
+ # Clean up shared renderers first.
+ for key, renderer in self.shared_renderers.items():
+ try:
+ renderer.close()
+ except Exception as e:
+ print(f"WARNING: Error closing renderer {key}: {e}")
+
+ self.shared_renderers.clear()
+
+ # Close MuJoCo viewer.
+ if self.mujocohandle:
+ try:
+ self.mujocohandle.close()
+ except Exception as e:
+ print(f"WARNING: Error closing viewer: {e}")
+ self.mujocohandle = None
+
+ # Call parent cleanup.
+ super().destroy()
diff --git a/tests/simulators/mujoco/basic.scenic b/tests/simulators/mujoco/basic.scenic
new file mode 100644
index 000000000..0348e7cd0
--- /dev/null
+++ b/tests/simulators/mujoco/basic.scenic
@@ -0,0 +1,52 @@
+from amiga import Amiga, Plant
+from scenic.simulators.mujoco.simulator import MujocoSimulator
+from scenic.simulators.mujoco.model import Ground, Hill
+
+from scenic.core.regions import RectangularRegion
+from scenic.core.vectors import Vector
+
+simulator MujocoSimulator(base_xml_file='amiga_base.xml', use_viewer=True)
+
+# Creating region for spawning objects.
+field_region = RectangularRegion(Vector(-20, -10), 0, 40, 20)
+
+# Creating irregular terrain (two hills).
+hill1 = new Hill at (10, 10, 0), with height 2, with spread 0.3
+hill2 = new Hill at (-10, -10, 0), with height 1.5, with spread 0.4
+
+new Ground at (0, 0, 0),
+ with width 50,
+ with length 50,
+ with gridSize 50,
+ with terrain (hill1, hill2)
+
+# Defining regions for the crop rows to ensure that they're parallel.
+row_y_positions = [-6, -3, 0, 3, 6]
+
+# Spawning multiple Amiga objects at different positions.
+amiga1 = new Amiga at (8, 1, 0.5),
+ facing 0 deg
+
+amiga2 = new Amiga at (-10, 2, 0.5),
+ facing 0 deg
+
+# Creating plants along the crop rows.
+# Row 1 (y = -6)
+for i in range(-8, 6, 2):
+ new Plant at (i, -6, 0.1), with plant_type "corn"
+
+# Row 2 (y = -3)
+for j in range(-8, 6, 2):
+ new Plant at (j, -3, 0.1), with plant_type "wheat"
+
+# # Row 3 (y = 0)
+for k in range(-8, 6, 2):
+ new Plant at (k, 0, 0.1), with plant_type "soybean"
+
+# # Row 4 (y = 3)
+for l in range(-8, 6, 2):
+ new Plant at (l, 3, 0.1), with plant_type "lettuce"
+
+# Row 5 (y = 6)
+for m in range(-8, 6, 2):
+ new Plant at (m, 6, 0.1), with plant_type "tomato"
\ No newline at end of file
diff --git a/tests/simulators/mujoco/test_mujoco.py b/tests/simulators/mujoco/test_mujoco.py
new file mode 100644
index 000000000..15c971e8d
--- /dev/null
+++ b/tests/simulators/mujoco/test_mujoco.py
@@ -0,0 +1,152 @@
+import os
+from pathlib import Path
+
+import numpy as np
+import pytest
+
+import scenic
+
+try:
+ import mujoco
+
+ from scenic.simulators.mujoco.simulator import MujocoSimulator
+except ModuleNotFoundError:
+ pytest.skip("MuJoCo package not installed.", allow_module_level=True)
+
+from tests.utils import compileScenic, pickle_test, sampleScene, tryPickling
+
+WINDOW_ERR = "Could not open window."
+
+
+# Helper to run a simulation but skip cleanly on CI.
+def simulate_or_skip(simulator, scene):
+ try:
+ return simulator.simulate(scene)
+ except Exception as e:
+ if WINDOW_ERR in str(e) or "display" in str(e).lower():
+ pytest.skip("MuJoCo viewer cannot open on this platform/CI")
+ else:
+ raise
+
+
+@pytest.fixture(scope="package")
+def getMujocoSimulator():
+ """Factory for creating MuJoCo simulators with/without viewer."""
+
+ def _getMujocoSimulator(use_viewer=False):
+ simulator = MujocoSimulator(use_viewer=use_viewer)
+ return simulator
+
+ yield _getMujocoSimulator
+
+
+def test_basic_ground(getMujocoSimulator):
+ """Test that a basic scene with ground can be created."""
+ simulator = getMujocoSimulator(use_viewer=False)
+ code = """
+ model scenic.simulators.mujoco.model
+
+ workspace = Workspace(RectangularRegion(0 @ 0, 0, 10, 10))
+
+ ground = new Ground with width 10, with length 10
+
+ terminate after 1 steps
+ """
+ scenario = compileScenic(code)
+ scene = sampleScene(scenario)
+ simulation = simulator.simulate(scene)
+ assert simulation is not None
+ assert len(simulation.result.trajectory) > 0
+
+
+def test_ground_with_terrain(getMujocoSimulator):
+ """Test ground with hills (terrain)."""
+ simulator = getMujocoSimulator(use_viewer=False)
+ code = """
+ model scenic.simulators.mujoco.model
+
+ workspace = Workspace(RectangularRegion(0 @ 0, 0, 20, 20))
+
+ hill1 = new Hill at 2 @ 2, with width 3, with length 3, with height 0.5
+ hill2 = new Hill at -2 @ -2, with width 2, with length 2, with height 0.3
+
+ ground = new Ground with width 20, with length 20,
+ with terrain (hill1, hill2),
+ with gridSize 30
+
+ terminate after 2 steps
+ """
+ scenario = compileScenic(code)
+ scene = sampleScene(scenario)
+ simulation = simulator.simulate(scene)
+ assert simulation is not None
+ # Verify ground was created.
+ assert len(scene.objects) == 3 # 2 hills + 1 ground.
+
+
+@pickle_test
+def test_pickle(getMujocoSimulator):
+ """Test that scenarios and scenes can be pickled."""
+ code = """
+ model scenic.simulators.mujoco.model
+
+ workspace = Workspace(RectangularRegion(0 @ 0, 0, 10, 10))
+ ground = new Ground with width 10, with length 10
+
+ terminate after 1 steps
+ """
+ scenario = tryPickling(compileScenic(code))
+ tryPickling(sampleScene(scenario))
+
+
+def test_headless_execution(getMujocoSimulator):
+ """Test that simulator can run completely headless without viewer."""
+ simulator = getMujocoSimulator(use_viewer=False)
+ code = """
+ model scenic.simulators.mujoco.model
+
+ workspace = Workspace(RectangularRegion(0 @ 0, 0, 10, 10))
+
+ hill = new Hill at 0 @ 0, with width 5, with length 5, with height 1.0
+ ground = new Ground with width 10, with length 10,
+ with terrain (hill,),
+ with gridSize 25
+
+ terminate after 5 steps
+ """
+ scenario = compileScenic(code)
+ scene = sampleScene(scenario)
+
+ # This should NOT try to open a viewer window.
+ simulation = simulator.simulate(scene)
+
+ assert simulation is not None
+ assert len(simulation.result.trajectory) == 6 # 5 steps + initial state.
+
+
+def test_amiga_robot_integration(getMujocoSimulator):
+ """Integration test: Loads the full Amiga example to verify robot XML assets."""
+
+ current_dir = Path(__file__).parent
+ repo_root = current_dir.parents[2]
+
+ example_path = (
+ repo_root / "examples" / "mujoco" / "farm-ng" / "example_simulation.scenic"
+ )
+
+ if not example_path.exists():
+ pytest.skip(f"Amiga example not found at {example_path}")
+
+ # Load the scenario using the 'scenic' library directly.
+ # Override use_viewer=False to ensure it runs on CI/Headless.
+ scenario = scenic.scenarioFromFile(str(example_path), params={"use_viewer": False})
+
+ # Generate a scene (this verifies 'amiga.scenic' parses correctly).
+ scene, _ = scenario.generate()
+
+ # Simulate for a few steps (this verifies 'amiga_base.xml' and physics load correctly).
+ simulator = getMujocoSimulator(use_viewer=False)
+ simulation = simulator.simulate(scene, maxSteps=5)
+
+ assert simulation is not None
+ assert len(simulation.result.trajectory) >= 6 # Initial state + 5 steps.