diff --git a/scripts/setup_inference.sh b/scripts/setup_inference.sh index c1aa75d6e..30f3fe0cd 100644 --- a/scripts/setup_inference.sh +++ b/scripts/setup_inference.sh @@ -80,7 +80,7 @@ if [[ ! -f $SENTINEL_FILE ]]; then # Note: On macOS, only Unitree SDK is supported (Booster SDK is Linux-only) if [[ $OS == "Darwin" ]]; then echo "Note: Installing Unitree SDK only (Booster SDK is not supported on macOS)" - pip install -e $ROOT_DIR/src/holosoma_inference[unitree] + pip install -e $ROOT_DIR/src/holosoma_inference else pip install -e $ROOT_DIR/src/holosoma_inference[unitree,booster] fi diff --git a/scripts/setup_mujoco.sh b/scripts/setup_mujoco.sh index 8f295ded3..13cf60e8e 100644 --- a/scripts/setup_mujoco.sh +++ b/scripts/setup_mujoco.sh @@ -135,7 +135,17 @@ if [[ ! -f $SENTINEL_FILE ]]; then pip install -e $ROOT_DIR/src/holosoma[unitree, booster] elif [[ "$OS_NAME" == "Darwin" ]]; then echo "Warning: only unitree support for osx" - pip install -e $ROOT_DIR/src/holosoma[unitree] + pip install -e $ROOT_DIR/src/holosoma + + # Install unitree_sdk2_python + if [[ ! -d $WORKSPACE_DIR/unitree_sdk2_python ]]; then + git clone https://github.com/unitreerobotics/unitree_sdk2_python.git $WORKSPACE_DIR/unitree_sdk2_python + fi + # Install into your conda environment + cd $WORKSPACE_DIR/unitree_sdk2_python + pip install -e . + cd .. + else echo "Unsupported OS: $OS_NAME" exit 1 diff --git a/src/holosoma/holosoma/bridge/unitree/__init__.py b/src/holosoma/holosoma/bridge/unitree/__init__.py index 9738a9b72..775bb1813 100644 --- a/src/holosoma/holosoma/bridge/unitree/__init__.py +++ b/src/holosoma/holosoma/bridge/unitree/__init__.py @@ -1,7 +1,46 @@ """ Unitree SDK2Py bridge implementation. + +Automatically selects the appropriate backend based on available SDK: +- unitree_interface (C++ bindings) for Linux +- unitree_sdk2py (pure Python) for Mac """ -from .unitree_sdk2py_bridge import UnitreeSdk2Bridge +from loguru import logger + + +# from .unitree_sdk2py_bridge import UnitreeSdk2Bridge +def _get_unitree_bridge_class(): + """Return the appropriate bridge class based on available SDK.""" + # Try C++ bindings first (Linux) + try: + from unitree_interface import UnitreeInterface # noqa: F401 + + from .unitree_sdk2py_bridge import UnitreeSdk2Bridge + + logger.debug("Using unitree_interface (C++ bindings) backend") + return UnitreeSdk2Bridge + except ImportError: + pass + + # Fall back to pure Python SDK (Mac) + try: + from unitree_sdk2py.core.channel import ChannelPublisher # noqa: F401 + + from .unitree_sdk2py_bridge_mac import UnitreeSdk2Bridge + + logger.debug("Using unitree_sdk2py (pure Python) backend") + return UnitreeSdk2Bridge + except ImportError: + pass + + raise ImportError( + "No Unitree SDK found. Install either:\n" + " - unitree_sdk2 (with unitree_interface) for Linux\n" + " - unitree_sdk2py for Mac" + ) + + +UnitreeSdk2Bridge = _get_unitree_bridge_class() __all__ = ["UnitreeSdk2Bridge"] diff --git a/src/holosoma/holosoma/bridge/unitree/unitree_sdk2py_bridge_mac.py b/src/holosoma/holosoma/bridge/unitree/unitree_sdk2py_bridge_mac.py new file mode 100644 index 000000000..66758eddf --- /dev/null +++ b/src/holosoma/holosoma/bridge/unitree/unitree_sdk2py_bridge_mac.py @@ -0,0 +1,242 @@ +from loguru import logger +import numpy as np + +# Import from unitree_sdk2_python (official SDK) +from unitree_sdk2py.core.channel import ( + ChannelPublisher, + ChannelSubscriber, + ChannelFactoryInitialize, +) +from unitree_sdk2py.utils.crc import CRC + +from holosoma.bridge.base.basic_sdk2py_bridge import BasicSdk2Bridge + + +class UnitreeSdk2Bridge(BasicSdk2Bridge): + """Unitree SDK bridge implementation using unitree_sdk2_python (official Python SDK).""" + + SUPPORTED_ROBOT_TYPES = {"g1_29dof", "h1", "h1-2", "go2_12dof"} + + # Robot type to message type mapping + # HG = humanoid (G1, H1-2), GO = quadruped (Go2, H1) + ROBOT_MESSAGE_TYPE = { + "g1_29dof": "hg", + "h1": "go", + "h1-2": "hg", + "go2_12dof": "go", + } + + def _init_sdk_components(self): + """Initialize Unitree SDK-specific components using unitree_sdk2_python.""" + robot_type = self.robot.asset.robot_type + + # Validate robot type first + if robot_type not in self.SUPPORTED_ROBOT_TYPES: + raise ValueError(f"Invalid robot type '{robot_type}'. Unitree SDK supports: {self.SUPPORTED_ROBOT_TYPES}") + + # Get network interface from config + interface_name = self.bridge_config.interface or "eth0" + + # Initialize DDS channel factory + # First argument is domain_id (0 is default) + ChannelFactoryInitialize(0, interface_name) + + # Determine message type based on robot + msg_type = self.ROBOT_MESSAGE_TYPE[robot_type] + + # Import appropriate message types based on robot + if msg_type == "hg": + # Humanoid robots (G1, H1-2) use unitree_hg messages + from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_ + from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowState_ + from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_ + from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowState_ + + self.low_cmd = unitree_hg_msg_dds__LowCmd_() + self._LowCmd = LowCmd_ + self._LowState = LowState_ + else: + # Quadruped robots (Go2, H1) use unitree_go messages + from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_ + from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowState_ + from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_ + from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_ + + self.low_cmd = unitree_go_msg_dds__LowCmd_() + self._LowCmd = LowCmd_ + self._LowState = LowState_ + + # Initialize Go2-specific command header + self.low_cmd.head[0] = 0xFE + self.low_cmd.head[1] = 0xEF + self.low_cmd.level_flag = 0xFF + self.low_cmd.gpio = 0 + + # Initialize data structures + self.low_state = None + self.crc = CRC() + self._msg_type = msg_type + + # Create subscriber for low-level commands (from external controller) + self.lowcmd_subscriber = ChannelSubscriber("rt/lowcmd", self._LowCmd) + self.lowcmd_subscriber.Init(self._low_cmd_callback, 10) + + # Create publisher for low-level state (to external controller) + self.lowstate_publisher = ChannelPublisher("rt/lowstate", self._LowState) + self.lowstate_publisher.Init() + + # Initialize motor commands + for i in range(self.num_motor): + self.low_cmd.motor_cmd[i].mode = 0x01 # Enable motor (PMSM mode) + self.low_cmd.motor_cmd[i].q = 0.0 + self.low_cmd.motor_cmd[i].dq = 0.0 + self.low_cmd.motor_cmd[i].kp = 0.0 + self.low_cmd.motor_cmd[i].kd = 0.0 + self.low_cmd.motor_cmd[i].tau = 0.0 + + logger.info(f"Initialized unitree_sdk2_python bridge for {robot_type} on interface {interface_name}") + + def _low_state_callback(self, msg): + """Callback for receiving low-level state from robot.""" + self.low_state = msg + + def _low_cmd_callback(self, msg): + """Callback for receiving low-level commands from external controller.""" + self.low_cmd = msg + + def low_cmd_handler(self, msg=None): + """Handle Unitree low-level command messages. + + Note: In unitree_sdk2_python, commands are received via the subscriber + callback. This method is kept for interface compatibility. + """ + # The low_state is updated via _low_state_callback + # This method can be used to poll/process the latest state if needed + pass + + def publish_low_state(self): + """Publish Unitree low-level state using unitree_sdk2_python. + + This publishes the simulator state to the DDS network so external + controllers can read it. + """ + # Get simulator data + positions, velocities, accelerations = self._get_dof_states() + actuator_forces = self._get_actuator_forces() + quaternion, gyro, acceleration = self._get_base_imu_data() + + # Create a new state message for publishing + if self._msg_type == "hg": + from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowState_ + + state_msg = unitree_hg_msg_dds__LowState_() + else: + from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowState_ + + state_msg = unitree_go_msg_dds__LowState_() + + # Populate motor state + for i in range(self.num_motor): + state_msg.motor_state[i].q = float(positions[i]) + state_msg.motor_state[i].dq = float(velocities[i]) + state_msg.motor_state[i].ddq = float(accelerations[i]) + state_msg.motor_state[i].tau_est = float(actuator_forces[i]) + + # Populate IMU state + quat_array = quaternion.detach().cpu().numpy() + state_msg.imu_state.quaternion[0] = float(quat_array[0]) # w + state_msg.imu_state.quaternion[1] = float(quat_array[1]) # x + state_msg.imu_state.quaternion[2] = float(quat_array[2]) # y + state_msg.imu_state.quaternion[3] = float(quat_array[3]) # z + + gyro_np = gyro.detach().cpu().numpy() + state_msg.imu_state.gyroscope[0] = float(gyro_np[0]) + state_msg.imu_state.gyroscope[1] = float(gyro_np[1]) + state_msg.imu_state.gyroscope[2] = float(gyro_np[2]) + + accel_np = acceleration.detach().cpu().numpy() + state_msg.imu_state.accelerometer[0] = float(accel_np[0]) + state_msg.imu_state.accelerometer[1] = float(accel_np[1]) + state_msg.imu_state.accelerometer[2] = float(accel_np[2]) + + # Calculate CRC and publish + state_msg.crc = self.crc.Crc(state_msg) + + # Publish state to external controller + self.lowstate_publisher.Write(state_msg) + + def publish_wireless_controller(self): + """Publish wireless controller data using unitree_sdk2_python.""" + # Call base class to populate wireless_controller from joystick + super().publish_wireless_controller() + + # Note: unitree_sdk2_python uses a different wireless controller interface + # This would need to be implemented based on the specific requirements + if self.joystick is not None: + logger.debug("Wireless controller state updated") + + def compute_torques(self): + """Compute torques using Unitree's unified command structure. + + Reads commands from the DDS subscriber and computes PD control torques. + """ + + try: + # Extract motor commands from low_cmd (set by external controller) + # The external controller writes to rt/lowcmd, we read and apply + tau_ff = np.zeros(self.num_motor) + kp = np.zeros(self.num_motor) + kd = np.zeros(self.num_motor) + q_target = np.zeros(self.num_motor) + dq_target = np.zeros(self.num_motor) + + for i in range(self.num_motor): + tau_ff[i] = self.low_cmd.motor_cmd[i].tau + kp[i] = self.low_cmd.motor_cmd[i].kp + kd[i] = self.low_cmd.motor_cmd[i].kd + q_target[i] = self.low_cmd.motor_cmd[i].q + dq_target[i] = self.low_cmd.motor_cmd[i].dq + + return self._compute_pd_torques( + tau_ff=tau_ff, + kp=kp, + kd=kd, + q_target=q_target, + dq_target=dq_target, + ) + except Exception as e: + logger.error(f"Error computing torques: {e}") + raise + + def send_command(self, tau_ff, kp, kd, q_target, dq_target): + """Send motor commands to the robot via DDS. + + This method allows the simulator to send commands to a real robot. + + Parameters + ---------- + tau_ff : array-like + Feedforward torques + kp : array-like + Proportional gains + kd : array-like + Derivative gains + q_target : array-like + Target positions + dq_target : array-like + Target velocities + """ + # Populate motor commands + for i in range(self.num_motor): + self.low_cmd.motor_cmd[i].mode = 0x01 # Enable + self.low_cmd.motor_cmd[i].tau = float(tau_ff[i]) + self.low_cmd.motor_cmd[i].kp = float(kp[i]) + self.low_cmd.motor_cmd[i].kd = float(kd[i]) + self.low_cmd.motor_cmd[i].q = float(q_target[i]) + self.low_cmd.motor_cmd[i].dq = float(dq_target[i]) + + # Calculate CRC for message integrity + self.low_cmd.crc = self.crc.Crc(self.low_cmd) + + # Publish command + self.lowcmd_publisher.Write(self.low_cmd) diff --git a/src/holosoma_inference/holosoma_inference/sdk/interface_wrapper.py b/src/holosoma_inference/holosoma_inference/sdk/interface_wrapper.py index fa3b4c3a5..05a42cab2 100644 --- a/src/holosoma_inference/holosoma_inference/sdk/interface_wrapper.py +++ b/src/holosoma_inference/holosoma_inference/sdk/interface_wrapper.py @@ -50,7 +50,16 @@ def _init_sdk_components(self): try: import unitree_interface except ImportError as e: - raise ImportError("unitree_interface python binding not found.") from e + # raise ImportError("unitree_interface python binding not found.") from e + self.logger.warning("unitree_interface python binding not found; falling back to unitree_sdk2py") + from unitree_sdk2py.core.channel import ChannelFactory + + if not ChannelFactory().Init(self.domain_id, self.interface_str): + raise RuntimeError("unitree_sdk2py ChannelFatory.Init failed") + self.backend = "sdk2py" + self.command_sender = create_command_sender(self.robot_config) + self.state_processor = create_state_processor(self.robot_config) + return # Use C++/pybind11 binding (unitree only) self.backend = "binding" # Parse robot type diff --git a/src/holosoma_inference/holosoma_inference/sdk/unitree/unitree_sdk2py_interface.py b/src/holosoma_inference/holosoma_inference/sdk/unitree/unitree_sdk2py_interface.py new file mode 100644 index 000000000..316c09b2b --- /dev/null +++ b/src/holosoma_inference/holosoma_inference/sdk/unitree/unitree_sdk2py_interface.py @@ -0,0 +1,203 @@ +"""Unitree robot interface using unitree_sdk2py (official Python SDK).""" + +import numpy as np +from loguru import logger + +from holosoma_inference.config.config_types import RobotConfig +from holosoma_inference.sdk.base.base_interface import BaseInterface + + +class UnitreeSdk2pyInterface(BaseInterface): + """Interface for Unitree robots using unitree_sdk2py (pure Python SDK).""" + + # Robot type to message type mapping + ROBOT_MESSAGE_TYPE = { + "g1": "hg", + "g1_29dof": "hg", + "h1": "go", + "h1_2": "hg", + "h1-2": "hg", + "go2": "go", + "go2_12dof": "go", + } + + def __init__(self, robot_config: RobotConfig, domain_id=0, interface_str=None, use_joystick=True): + super().__init__(robot_config, domain_id, interface_str, use_joystick) + self._unitree_motor_order = None + self._kp_level = 1.0 + self._kd_level = 1.0 + self._low_state = None + self._init_sdk2py() + + def _init_sdk2py(self): + """Initialize unitree_sdk2py components.""" + try: + from unitree_sdk2py.core.channel import ( + ChannelPublisher, + ChannelSubscriber, + ChannelFactoryInitialize, + ) + from unitree_sdk2py.utils.crc import CRC + except ImportError as e: + raise ImportError("unitree_sdk2py not found. Install with: pip install unitree_sdk2py") from e + + # Initialize DDS channel factory + ChannelFactoryInitialize(self.domain_id, self.interface_str) + + # Determine message type based on robot + robot_key = self.robot_config.robot.lower().replace("-", "_") + msg_type = self.ROBOT_MESSAGE_TYPE.get(robot_key) + if msg_type is None: + raise ValueError( + f"Unknown robot type: {self.robot_config.robot}. Supported: {list(self.ROBOT_MESSAGE_TYPE.keys())}" + ) + + # Import appropriate message types based on robot + if msg_type == "hg": + from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_ + from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_, LowState_ + + self._low_cmd = unitree_hg_msg_dds__LowCmd_() + self._LowCmd = LowCmd_ + self._LowState = LowState_ + else: + from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_ + from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_, LowState_ + + self._low_cmd = unitree_go_msg_dds__LowCmd_() + self._LowCmd = LowCmd_ + self._LowState = LowState_ + + # Initialize Go2-specific command header + self._low_cmd.head[0] = 0xFE + self._low_cmd.head[1] = 0xEF + self._low_cmd.level_flag = 0xFF + self._low_cmd.gpio = 0 + + self._msg_type = msg_type + self._crc = CRC() + + # Create publisher for low-level commands + self._lowcmd_publisher = ChannelPublisher("rt/lowcmd", self._LowCmd) + self._lowcmd_publisher.Init() + + # Create subscriber for low-level state + self._lowstate_subscriber = ChannelSubscriber("rt/lowstate", self._LowState) + self._lowstate_subscriber.Init(self._low_state_callback, 10) + + # Initialize motor commands + for i in range(self.robot_config.num_motors): + self._low_cmd.motor_cmd[i].mode = 0x01 # Enable motor (PMSM mode) + self._low_cmd.motor_cmd[i].q = 0.0 + self._low_cmd.motor_cmd[i].dq = 0.0 + self._low_cmd.motor_cmd[i].kp = 0.0 + self._low_cmd.motor_cmd[i].kd = 0.0 + self._low_cmd.motor_cmd[i].tau = 0.0 + + # GO2 SDK motor order differs from joint order + if self.robot_config.robot.lower() == "go2": + self._unitree_motor_order = (3, 4, 5, 0, 1, 2, 9, 10, 11, 6, 7, 8) + + logger.info( + f"Initialized unitree_sdk2py interface for {self.robot_config.robot} on interface {self.interface_str}" + ) + + def _low_state_callback(self, msg): + """Callback for receiving low-level state from robot.""" + self._low_state = msg + + def get_low_state(self) -> np.ndarray: + """Get robot state as numpy array.""" + if self._low_state is None: + # Return zeros if no state received yet + state_size = 3 + 4 + self.robot_config.num_joints * 2 + 3 + 3 + return np.zeros((1, state_size)) + + state = self._low_state + base_pos = np.zeros(3) + quat = np.array(state.imu_state.quaternion) + base_lin_vel = np.zeros(3) + base_ang_vel = np.array(state.imu_state.gyroscope) + + # Extract motor positions and velocities + motor_pos = np.array([state.motor_state[i].q for i in range(self.robot_config.num_motors)]) + motor_vel = np.array([state.motor_state[i].dq for i in range(self.robot_config.num_motors)]) + + joint_pos = np.zeros(self.robot_config.num_joints) + joint_vel = np.zeros(self.robot_config.num_joints) + motor_order = self._unitree_motor_order or self.robot_config.joint2motor + + for j_id in range(self.robot_config.num_joints): + m_id = motor_order[j_id] + joint_pos[j_id] = float(motor_pos[m_id]) + joint_vel[j_id] = float(motor_vel[m_id]) + + return np.concatenate([base_pos, quat, joint_pos, base_lin_vel, base_ang_vel, joint_vel]).reshape(1, -1) + + def send_low_command( + self, + cmd_q: np.ndarray, + cmd_dq: np.ndarray, + cmd_tau: np.ndarray, + dof_pos_latest: np.ndarray = None, + kp_override: np.ndarray = None, + kd_override: np.ndarray = None, + ): + """Send low-level command to robot.""" + motor_order = self._unitree_motor_order or self.robot_config.joint2motor + + for j_id in range(self.robot_config.num_joints): + m_id = motor_order[j_id] + self._low_cmd.motor_cmd[m_id].q = float(cmd_q[j_id]) + self._low_cmd.motor_cmd[m_id].dq = float(cmd_dq[j_id]) + self._low_cmd.motor_cmd[m_id].tau = float(cmd_tau[j_id]) + + if kp_override is not None: + self._low_cmd.motor_cmd[m_id].kp = float(kp_override[j_id]) * self._kp_level + else: + self._low_cmd.motor_cmd[m_id].kp = float(self.robot_config.motor_kp[j_id]) * self._kp_level + + if kd_override is not None: + self._low_cmd.motor_cmd[m_id].kd = float(kd_override[j_id]) * self._kd_level + else: + self._low_cmd.motor_cmd[m_id].kd = float(self.robot_config.motor_kd[j_id]) * self._kd_level + + # Calculate CRC for message integrity + self._low_cmd.crc = self._crc.Crc(self._low_cmd) + + # Publish command + self._lowcmd_publisher.Write(self._low_cmd) + + def get_joystick_msg(self): + """Get wireless controller message.""" + # unitree_sdk2py doesn't have direct wireless controller access + # You may need to subscribe to a wireless controller topic if needed + return None + + def get_joystick_key(self, wc_msg=None): + """Get current key from joystick message.""" + if wc_msg is None: + wc_msg = self.get_joystick_msg() + if wc_msg is None: + return None + return self._wc_key_map.get(getattr(wc_msg, "keys", 0), None) + + @property + def kp_level(self): + """Get proportional gain level.""" + return self._kp_level + + @kp_level.setter + def kp_level(self, value): + """Set proportional gain level.""" + self._kp_level = value + + @property + def kd_level(self): + """Get derivative gain level.""" + return self._kd_level + + @kd_level.setter + def kd_level(self, value): + """Set derivative gain level.""" + self._kd_level = value diff --git a/src/holosoma_inference/setup.py b/src/holosoma_inference/setup.py index b1d7637bb..e049cf142 100644 --- a/src/holosoma_inference/setup.py +++ b/src/holosoma_inference/setup.py @@ -61,6 +61,7 @@ entry_points={ "holosoma.sdk": [ "unitree = holosoma_inference.sdk.unitree.unitree_interface:UnitreeInterface", + "unitree_sdk2py = holosoma_inference.sdk.unitree.unitree_sdk2py_interface:UnitreeSdk2pyInterface", "booster = holosoma_inference.sdk.booster.booster_interface:BoosterInterface", ], },