diff --git a/src/holosoma_inference/holosoma_inference/config/config_types/task.py b/src/holosoma_inference/holosoma_inference/config/config_types/task.py index 0b34f312e..c2874664f 100644 --- a/src/holosoma_inference/holosoma_inference/config/config_types/task.py +++ b/src/holosoma_inference/holosoma_inference/config/config_types/task.py @@ -67,11 +67,22 @@ class TaskConfig: """ use_joystick: bool = False - """Shortcut: set both velocity_input and state_input to "joystick". + """Shortcut: set both velocity_input and state_input to "interface". + + Reads from the SDK's wireless controller (the dongle/controller shipped + with Unitree G1, Booster T1, etc.). For host-side USB gamepads + (Xbox/Logitech via /dev/input/event*), use ``use_usb_joystick`` instead. Cannot be combined with explicit input settings. """ + use_usb_joystick: bool = False + """Shortcut: set both velocity_input and state_input to "joystick". + + Reads a USB gamepad on the host via evdev (``/dev/input/event*``). + Linux-only. Cannot be combined with explicit input settings. + """ + joystick_type: str = "xbox" """Joystick type.""" @@ -113,16 +124,29 @@ class TaskConfig: """Debug overrides for quick testing.""" def __post_init__(self): - """Resolve use_keyboard/use_joystick shortcuts into velocity_input/state_input.""" - if self.use_keyboard and self.use_joystick: + """Resolve use_keyboard/use_joystick/use_usb_joystick shortcuts into velocity_input/state_input.""" + active_shortcuts = [ + name + for name, enabled in ( + ("keyboard", self.use_keyboard), + ("joystick", self.use_joystick), + ("usb-joystick", self.use_usb_joystick), + ) + if enabled + ] + if len(active_shortcuts) > 1: + joined = ", ".join(f"--task.use-{n}" for n in active_shortcuts) raise ValueError( - "Cannot combine --task.use-keyboard with --task.use-joystick. " + f"Cannot combine multiple input shortcuts ({joined}). " "Use one shortcut or set --task.velocity-input and --task.state-input individually." ) shortcut: InputSource | None = None flag_name: str | None = None - if self.use_joystick: + if self.use_usb_joystick: + shortcut = "joystick" + flag_name = "usb-joystick" + elif self.use_joystick: shortcut = "interface" flag_name = "joystick" elif self.use_keyboard: diff --git a/src/holosoma_inference/holosoma_inference/config/tests/test_task_config.py b/src/holosoma_inference/holosoma_inference/config/tests/test_task_config.py index 9e71922c3..897e2811d 100644 --- a/src/holosoma_inference/holosoma_inference/config/tests/test_task_config.py +++ b/src/holosoma_inference/holosoma_inference/config/tests/test_task_config.py @@ -71,6 +71,21 @@ def test_conflicts_with_both_inputs(self): TaskConfig(model_path="test.onnx", use_joystick=True, velocity_input="ros2", state_input="ros2") +class TestUseUsbJoystickShortcut: + def test_sets_both_channels(self): + tc = TaskConfig(model_path="test.onnx", use_usb_joystick=True) + assert tc.velocity_input == "joystick" + assert tc.state_input == "joystick" + + def test_conflicts_with_velocity_input(self): + with pytest.raises(Exception, match="Cannot combine"): + TaskConfig(model_path="test.onnx", use_usb_joystick=True, velocity_input="ros2") + + def test_conflicts_with_use_joystick(self): + with pytest.raises(Exception, match="Cannot combine multiple input shortcuts"): + TaskConfig(model_path="test.onnx", use_joystick=True, use_usb_joystick=True) + + class TestUseKeyboardShortcut: def test_sets_both_channels(self): tc = TaskConfig(model_path="test.onnx", use_keyboard=True) @@ -88,7 +103,7 @@ def test_conflicts_with_state_input(self): class TestShortcutMutualExclusion: def test_both_shortcuts_rejected(self): - with pytest.raises(Exception, match="Cannot combine.*use-keyboard.*use-joystick"): + with pytest.raises(Exception, match="Cannot combine multiple input shortcuts"): TaskConfig(model_path="test.onnx", use_keyboard=True, use_joystick=True) diff --git a/src/holosoma_inference/holosoma_inference/inputs/__init__.py b/src/holosoma_inference/holosoma_inference/inputs/__init__.py index c5318f9a4..2b3d45815 100644 --- a/src/holosoma_inference/holosoma_inference/inputs/__init__.py +++ b/src/holosoma_inference/holosoma_inference/inputs/__init__.py @@ -17,9 +17,14 @@ def create_input(policy: BasePolicy, source: InputSource, role: str) -> VelCmdPr if not policy.use_joystick and source in ("interface", "joystick"): source = "keyboard" - if source in ("interface", "joystick"): + if source == "interface": return InterfaceInput(policy.interface) + if source == "joystick": + from holosoma_inference.inputs.impl.usb_joystick import UsbJoystickInput + + return UsbJoystickInput(device_index=policy.config.task.joystick_device) + if source == "keyboard": vel_keys = KEYBOARD_VELOCITY_LOCOMOTION if role == "velocity" else None return KeyboardInput.create(velocity_keys=vel_keys) diff --git a/src/holosoma_inference/holosoma_inference/inputs/impl/usb_joystick.py b/src/holosoma_inference/holosoma_inference/inputs/impl/usb_joystick.py new file mode 100644 index 000000000..b6543462b --- /dev/null +++ b/src/holosoma_inference/holosoma_inference/inputs/impl/usb_joystick.py @@ -0,0 +1,273 @@ +"""USB joystick input provider (evdev-based). + +Reads a USB gamepad (Xbox / Logitech / similar) directly from +``/dev/input/event*`` via ``python-evdev``. Bypasses the SDK +:class:`InterfaceInput` path so that SDKs without a built-in wireless +controller can still drive policies from a host-side controller. + +Implements both :class:`VelCmdProvider` and :class:`StateCommandProvider` +in a single class — the policy factory assigns the same instance to both +slots when ``velocity_input == state_input == "joystick"``. +""" + +from __future__ import annotations + +import threading + +import evdev +from loguru import logger + +from holosoma_inference.inputs.api.base import InputProvider +from holosoma_inference.inputs.api.commands import StateCommand, VelCmd +from holosoma_inference.inputs.impl.joystick import JOYSTICK_COMMANDS + +STICK_DEADZONE = 0.1 +TRIGGER_THRESHOLD = 128 # 0-255 typical for analog triggers; >threshold counts as pressed. + +# Match interface_wrapper.py's _default_wc_key_map bit layout so policies that +# inspect raw key codes still see consistent values. +_BIT_R1 = 1 +_BIT_L1 = 2 +_BIT_START = 4 +_BIT_SELECT = 8 +_BIT_R2 = 16 +_BIT_L2 = 32 +_BIT_A = 256 +_BIT_B = 512 +_BIT_X = 1024 +_BIT_Y = 2048 +_BIT_UP = 4096 +_BIT_RIGHT = 8192 +_BIT_DOWN = 16384 +_BIT_LEFT = 32768 + +_BUTTON_BIT = { + evdev.ecodes.BTN_A: _BIT_A, + evdev.ecodes.BTN_B: _BIT_B, + evdev.ecodes.BTN_X: _BIT_X, + evdev.ecodes.BTN_Y: _BIT_Y, + evdev.ecodes.BTN_TL: _BIT_L1, + evdev.ecodes.BTN_TR: _BIT_R1, + evdev.ecodes.BTN_TL2: _BIT_L2, + evdev.ecodes.BTN_TR2: _BIT_R2, + evdev.ecodes.BTN_START: _BIT_START, + evdev.ecodes.BTN_SELECT: _BIT_SELECT, +} + +# Subset of the Unitree wireless-controller bitmask map sufficient for the +# combinations referenced in JOYSTICK_COMMANDS. Kept inline so this module +# does not depend on the SDK. +_KEY_LABEL = { + _BIT_R1: "R1", + _BIT_L1: "L1", + _BIT_L1 | _BIT_R1: "L1+R1", + _BIT_START: "start", + _BIT_SELECT: "select", + _BIT_R2: "R2", + _BIT_L2: "L2", + _BIT_A: "A", + _BIT_SELECT | _BIT_A: "select+A", + _BIT_B: "B", + _BIT_SELECT | _BIT_B: "select+B", + _BIT_X: "X", + _BIT_SELECT | _BIT_X: "select+X", + _BIT_Y: "Y", + _BIT_SELECT | _BIT_Y: "select+Y", + _BIT_UP: "up", + _BIT_DOWN: "down", + _BIT_LEFT: "left", + _BIT_RIGHT: "right", +} + + +def _list_gamepads() -> list[evdev.InputDevice]: + """Return evdev devices that look like gamepads (have sticks + buttons).""" + candidates: list[evdev.InputDevice] = [] + for path in evdev.list_devices(): + try: + dev = evdev.InputDevice(path) + except (PermissionError, OSError): + continue + caps = dev.capabilities() + if evdev.ecodes.EV_ABS not in caps or evdev.ecodes.EV_KEY not in caps: + dev.close() + continue + abs_codes = {code for code, _ in caps[evdev.ecodes.EV_ABS]} + if {evdev.ecodes.ABS_X, evdev.ecodes.ABS_Y, evdev.ecodes.ABS_RX} <= abs_codes: + candidates.append(dev) + else: + dev.close() + return candidates + + +class UsbJoystickInput(InputProvider): + """Reads stick + button state from a USB gamepad via evdev. + + A daemon thread continuously consumes events and updates internal state. + :meth:`poll_velocity` returns the latest sticks; :meth:`poll_commands` + edge-detects button transitions and emits :class:`StateCommand` values + using :data:`JOYSTICK_COMMANDS`. + """ + + def __init__(self, device_index: int = 0): + if device_index < 0: + raise ValueError(f"joystick_device must be >= 0, got {device_index}") + + self._mapping = dict(JOYSTICK_COMMANDS) + + gamepads = _list_gamepads() + if not gamepads: + raise RuntimeError( + "No USB joystick found via evdev. Is the controller plugged in " + "and is /dev/input mounted into this container?" + ) + if device_index >= len(gamepads): + for d in gamepads: + d.close() + raise RuntimeError(f"joystick_device={device_index} but only {len(gamepads)} gamepad(s) detected") + + self._device = gamepads[device_index] + for d in gamepads: + if d is not self._device: + d.close() + + abs_caps = dict(self._device.capabilities()[evdev.ecodes.EV_ABS]) + self._abs_info = { + evdev.ecodes.ABS_X: abs_caps[evdev.ecodes.ABS_X], + evdev.ecodes.ABS_Y: abs_caps[evdev.ecodes.ABS_Y], + evdev.ecodes.ABS_RX: abs_caps[evdev.ecodes.ABS_RX], + } + + self._lock = threading.Lock() + self._lx = 0.0 # left stick X, normalized [-1, 1], left=+ after sign flip + self._ly = 0.0 # left stick Y, normalized [-1, 1], up=+ after sign flip + self._rx = 0.0 # right stick X, normalized [-1, 1], left=+ after sign flip + self._key_bits = 0 # OR'd _BIT_* of currently-held buttons + self._dpad_x = 0 # -1 / 0 / 1 from ABS_HAT0X + self._dpad_y = 0 # -1 / 0 / 1 from ABS_HAT0Y + + self._last_label: str = "" + + self._stop = threading.Event() + self._thread = threading.Thread(target=self._run, daemon=True, name="usb_joystick") + self._thread.start() + + # -- Lifecycle -------------------------------------------------------- + + def start(self) -> None: + pass # Thread already running from __init__. + + def close(self) -> None: + self._stop.set() + try: + self._device.close() + except OSError as e: + logger.debug(f"evdev close raised {type(e).__name__}: {e}") + + # -- VelCmdProvider protocol ----------------------------------------- + + def poll_velocity(self) -> VelCmd | None: + with self._lock: + keys = self._effective_key_bits_locked() + lx, ly, rx = self._lx, self._ly, self._rx + + # Match InterfaceInput: suppress sticks while any button is held. + if keys != 0: + return None + + lin_x = ly if abs(ly) > STICK_DEADZONE else 0.0 + lin_y = -lx if abs(lx) > STICK_DEADZONE else 0.0 + ang_z = -rx if abs(rx) > STICK_DEADZONE else 0.0 + return VelCmd((lin_x, lin_y), ang_z) + + def zero(self) -> None: + pass + + # -- StateCommandProvider protocol ----------------------------------- + + def poll_commands(self) -> list[StateCommand]: + with self._lock: + keys = self._effective_key_bits_locked() + label = _KEY_LABEL.get(keys, "") + + commands: list[StateCommand] = [] + if label and label != self._last_label: + cmd = self._mapping.get(label) + if cmd is not None: + commands.append(cmd) + self._last_label = label + return commands + + # -- Read loop ------------------------------------------------------- + + def _run(self) -> None: + try: + for event in self._device.read_loop(): + if self._stop.is_set(): + return + if event.type == evdev.ecodes.EV_ABS: + self._handle_abs(event.code, event.value) + elif event.type == evdev.ecodes.EV_KEY: + self._handle_key(event.code, event.value) + except OSError: + return # Device unplugged. + + def _handle_abs(self, code: int, value: int) -> None: + if code in self._abs_info: + info = self._abs_info[code] + span = info.max - info.min + if span <= 0: + return + normalized = (value - info.min) / span * 2.0 - 1.0 # → [-1, 1] + with self._lock: + if code == evdev.ecodes.ABS_X: + self._lx = normalized # stick-left → -1 (matches SDK wireless-controller convention) + elif code == evdev.ecodes.ABS_Y: + self._ly = -normalized # stick-up (forward) → +1 + elif code == evdev.ecodes.ABS_RX: + self._rx = normalized + elif code == evdev.ecodes.ABS_HAT0X: + with self._lock: + self._dpad_x = int(value) + elif code == evdev.ecodes.ABS_HAT0Y: + with self._lock: + self._dpad_y = int(value) + elif code == evdev.ecodes.ABS_Z: # analog L2 + self._set_bit(_BIT_L2, value > TRIGGER_THRESHOLD) + elif code == evdev.ecodes.ABS_RZ: # analog R2 + self._set_bit(_BIT_R2, value > TRIGGER_THRESHOLD) + + def _handle_key(self, code: int, value: int) -> None: + bit = _BUTTON_BIT.get(code) + if bit is None: + # Some controllers expose dpad as buttons rather than HAT axes. + if code == evdev.ecodes.BTN_DPAD_UP: + self._set_bit(_BIT_UP, value != 0) + elif code == evdev.ecodes.BTN_DPAD_DOWN: + self._set_bit(_BIT_DOWN, value != 0) + elif code == evdev.ecodes.BTN_DPAD_LEFT: + self._set_bit(_BIT_LEFT, value != 0) + elif code == evdev.ecodes.BTN_DPAD_RIGHT: + self._set_bit(_BIT_RIGHT, value != 0) + return + self._set_bit(bit, value != 0) + + def _set_bit(self, bit: int, pressed: bool) -> None: + with self._lock: + if pressed: + self._key_bits |= bit + else: + self._key_bits &= ~bit + + def _effective_key_bits_locked(self) -> int: + """Combine button bits with HAT-axis-derived dpad bits.""" + bits = self._key_bits + if self._dpad_x < 0: + bits |= _BIT_LEFT + elif self._dpad_x > 0: + bits |= _BIT_RIGHT + if self._dpad_y < 0: + bits |= _BIT_UP + elif self._dpad_y > 0: + bits |= _BIT_DOWN + return bits diff --git a/src/holosoma_inference/holosoma_inference/inputs/tests/test_providers.py b/src/holosoma_inference/holosoma_inference/inputs/tests/test_providers.py index 7f3eef578..4ae7b78eb 100644 --- a/src/holosoma_inference/holosoma_inference/inputs/tests/test_providers.py +++ b/src/holosoma_inference/holosoma_inference/inputs/tests/test_providers.py @@ -758,12 +758,17 @@ def test_interface_returns_interface_input(self): assert isinstance(result, InterfaceInput) assert result._mapping == JOYSTICK_COMMANDS - def test_joystick_maps_to_interface(self): - from holosoma_inference.inputs.impl.interface import InterfaceInput + def test_joystick_maps_to_usb_joystick(self, monkeypatch): + pytest.importorskip("evdev") + from holosoma_inference.inputs.impl import usb_joystick + + sentinel = MagicMock() + monkeypatch.setattr(usb_joystick, "UsbJoystickInput", lambda **_: sentinel) p = self._make_policy_for_factory() + p.config = SimpleNamespace(task=SimpleNamespace(joystick_device=0)) result = create_input(p, "joystick", "velocity") - assert isinstance(result, InterfaceInput) + assert result is sentinel def test_ros2_returns_ros2_input(self): from holosoma_inference.inputs.impl.ros2 import Ros2Input @@ -1141,3 +1146,10 @@ def test_use_joystick_maps_to_interface(self): tc = TaskConfig(model_path="test.onnx", use_joystick=True) assert tc.velocity_input == "interface" assert tc.state_input == "interface" + + def test_use_usb_joystick_maps_to_joystick(self): + from holosoma_inference.config.config_types.task import TaskConfig + + tc = TaskConfig(model_path="test.onnx", use_usb_joystick=True) + assert tc.velocity_input == "joystick" + assert tc.state_input == "joystick" diff --git a/src/holosoma_inference/holosoma_inference/policies/base.py b/src/holosoma_inference/holosoma_inference/policies/base.py index 2811fa4fb..88537c071 100644 --- a/src/holosoma_inference/holosoma_inference/policies/base.py +++ b/src/holosoma_inference/holosoma_inference/policies/base.py @@ -149,10 +149,12 @@ def _init_communication_components(self): if hasattr(self, "_shared_hardware_source"): self.interface = self._shared_hardware_source.interface return - # Derive use_joystick for SDK: True if interface/joystick is used for either channel + # The SDK's own wireless-controller path is only needed when an + # "interface" channel is selected. The "joystick" channel is read + # via host-side evdev (UsbJoystickInput) and does not touch the SDK. vel = self.config.task.velocity_input other = self.config.task.state_input - need_joystick = bool({"interface", "joystick"} & {vel, other}) + need_joystick = "interface" in {vel, other} self.interface = create_interface( self.robot_config, self.config.task.domain_id,