Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,22 @@ class TaskConfig:
"""

use_joystick: bool = False
"""Shortcut: set both velocity_input and state_input to "joystick".
"""Shortcut: set both velocity_input and state_input to "interface".

Reads from the SDK's wireless controller (the dongle/controller shipped
with Unitree G1, Booster T1, etc.). For host-side USB gamepads
(Xbox/Logitech via /dev/input/event*), use ``use_usb_joystick`` instead.

Cannot be combined with explicit input settings.
"""

use_usb_joystick: bool = False
"""Shortcut: set both velocity_input and state_input to "joystick".

Reads a USB gamepad on the host via evdev (``/dev/input/event*``).
Linux-only. Cannot be combined with explicit input settings.
"""

joystick_type: str = "xbox"
"""Joystick type."""

Expand Down Expand Up @@ -113,16 +124,29 @@ class TaskConfig:
"""Debug overrides for quick testing."""

def __post_init__(self):
"""Resolve use_keyboard/use_joystick shortcuts into velocity_input/state_input."""
if self.use_keyboard and self.use_joystick:
"""Resolve use_keyboard/use_joystick/use_usb_joystick shortcuts into velocity_input/state_input."""
active_shortcuts = [
name
for name, enabled in (
("keyboard", self.use_keyboard),
("joystick", self.use_joystick),
("usb-joystick", self.use_usb_joystick),
)
if enabled
]
if len(active_shortcuts) > 1:
joined = ", ".join(f"--task.use-{n}" for n in active_shortcuts)
raise ValueError(
"Cannot combine --task.use-keyboard with --task.use-joystick. "
f"Cannot combine multiple input shortcuts ({joined}). "
"Use one shortcut or set --task.velocity-input and --task.state-input individually."
)

shortcut: InputSource | None = None
flag_name: str | None = None
if self.use_joystick:
if self.use_usb_joystick:
shortcut = "joystick"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Behavior change — see the top-level review comment. This single character change ("interface""joystick") silently flips the meaning of --task.use-joystick for every existing G1/T1 user from "SDK wireless controller" to "USB gamepad on the host." Please confirm intent and either:

  • add a fallback so "joystick" reverts to "interface" when no USB gamepad is present, or
  • preserve --task.use-joystick → "interface" and add a separate --task.use-usb-joystick shortcut for the new behavior.

Whichever direction we pick, please update the use_joystick docstring just above to spell it out.

flag_name = "usb-joystick"
elif self.use_joystick:
shortcut = "interface"
flag_name = "joystick"
elif self.use_keyboard:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,21 @@ def test_conflicts_with_both_inputs(self):
TaskConfig(model_path="test.onnx", use_joystick=True, velocity_input="ros2", state_input="ros2")


class TestUseUsbJoystickShortcut:
def test_sets_both_channels(self):
tc = TaskConfig(model_path="test.onnx", use_usb_joystick=True)
assert tc.velocity_input == "joystick"
assert tc.state_input == "joystick"

def test_conflicts_with_velocity_input(self):
with pytest.raises(Exception, match="Cannot combine"):
TaskConfig(model_path="test.onnx", use_usb_joystick=True, velocity_input="ros2")

def test_conflicts_with_use_joystick(self):
with pytest.raises(Exception, match="Cannot combine multiple input shortcuts"):
TaskConfig(model_path="test.onnx", use_joystick=True, use_usb_joystick=True)


class TestUseKeyboardShortcut:
def test_sets_both_channels(self):
tc = TaskConfig(model_path="test.onnx", use_keyboard=True)
Expand All @@ -88,7 +103,7 @@ def test_conflicts_with_state_input(self):

class TestShortcutMutualExclusion:
def test_both_shortcuts_rejected(self):
with pytest.raises(Exception, match="Cannot combine.*use-keyboard.*use-joystick"):
with pytest.raises(Exception, match="Cannot combine multiple input shortcuts"):
TaskConfig(model_path="test.onnx", use_keyboard=True, use_joystick=True)


Expand Down
7 changes: 6 additions & 1 deletion src/holosoma_inference/holosoma_inference/inputs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@ def create_input(policy: BasePolicy, source: InputSource, role: str) -> VelCmdPr
if not policy.use_joystick and source in ("interface", "joystick"):
source = "keyboard"

if source in ("interface", "joystick"):
if source == "interface":
return InterfaceInput(policy.interface)

if source == "joystick":
from holosoma_inference.inputs.impl.usb_joystick import UsbJoystickInput

return UsbJoystickInput(device_index=policy.config.task.joystick_device)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lazy-importing UsbJoystickInput here is the right call — it keeps macOS users (where evdev doesn't build) able to import this module as long as they don't request "joystick". ✓

One thing to watch: the test in inputs/tests/test_providers.py does from holosoma_inference.inputs.impl import usb_joystick at the top of the test method, which will fail to collect on macOS regardless. Suggest pytest.importorskip("evdev") there.


if source == "keyboard":
vel_keys = KEYBOARD_VELOCITY_LOCOMOTION if role == "velocity" else None
return KeyboardInput.create(velocity_keys=vel_keys)
Expand Down
273 changes: 273 additions & 0 deletions src/holosoma_inference/holosoma_inference/inputs/impl/usb_joystick.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
"""USB joystick input provider (evdev-based).

Reads a USB gamepad (Xbox / Logitech / similar) directly from
``/dev/input/event*`` via ``python-evdev``. Bypasses the SDK
:class:`InterfaceInput` path so that SDKs without a built-in wireless
controller can still drive policies from a host-side controller.

Implements both :class:`VelCmdProvider` and :class:`StateCommandProvider`
in a single class — the policy factory assigns the same instance to both
slots when ``velocity_input == state_input == "joystick"``.
"""

from __future__ import annotations

import threading

import evdev
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

macOS test-collection failure: importing this module on macOS crashes at this line because evdev only builds on Linux (it links against <linux/input.h>). The lazy import in inputs/__init__.py keeps production code safe, but tests/test_providers.py::test_joystick_maps_to_usb_joystick does from holosoma_inference.inputs.impl import usb_joystick unconditionally, which will fail collection on darwin.

Two options:

  1. Easier: at the top of the test, pytest.importorskip("evdev").
  2. More invasive: defer import evdev to inside the function bodies that use it, so this module can be imported on darwin (just not instantiated).

Option 1 is sufficient.

from loguru import logger

from holosoma_inference.inputs.api.base import InputProvider
from holosoma_inference.inputs.api.commands import StateCommand, VelCmd
from holosoma_inference.inputs.impl.joystick import JOYSTICK_COMMANDS

STICK_DEADZONE = 0.1
TRIGGER_THRESHOLD = 128 # 0-255 typical for analog triggers; >threshold counts as pressed.

# Match interface_wrapper.py's _default_wc_key_map bit layout so policies that
# inspect raw key codes still see consistent values.
_BIT_R1 = 1
_BIT_L1 = 2
_BIT_START = 4
_BIT_SELECT = 8
_BIT_R2 = 16
_BIT_L2 = 32
_BIT_A = 256
_BIT_B = 512
_BIT_X = 1024
_BIT_Y = 2048
_BIT_UP = 4096
_BIT_RIGHT = 8192
_BIT_DOWN = 16384
_BIT_LEFT = 32768

_BUTTON_BIT = {
evdev.ecodes.BTN_A: _BIT_A,
evdev.ecodes.BTN_B: _BIT_B,
evdev.ecodes.BTN_X: _BIT_X,
evdev.ecodes.BTN_Y: _BIT_Y,
evdev.ecodes.BTN_TL: _BIT_L1,
evdev.ecodes.BTN_TR: _BIT_R1,
evdev.ecodes.BTN_TL2: _BIT_L2,
evdev.ecodes.BTN_TR2: _BIT_R2,
evdev.ecodes.BTN_START: _BIT_START,
evdev.ecodes.BTN_SELECT: _BIT_SELECT,
}

# Subset of the Unitree wireless-controller bitmask map sufficient for the
# combinations referenced in JOYSTICK_COMMANDS. Kept inline so this module
# does not depend on the SDK.
_KEY_LABEL = {
_BIT_R1: "R1",
_BIT_L1: "L1",
_BIT_L1 | _BIT_R1: "L1+R1",
_BIT_START: "start",
_BIT_SELECT: "select",
_BIT_R2: "R2",
_BIT_L2: "L2",
_BIT_A: "A",
_BIT_SELECT | _BIT_A: "select+A",
_BIT_B: "B",
_BIT_SELECT | _BIT_B: "select+B",
_BIT_X: "X",
_BIT_SELECT | _BIT_X: "select+X",
_BIT_Y: "Y",
_BIT_SELECT | _BIT_Y: "select+Y",
_BIT_UP: "up",
_BIT_DOWN: "down",
_BIT_LEFT: "left",
_BIT_RIGHT: "right",
}


def _list_gamepads() -> list[evdev.InputDevice]:
"""Return evdev devices that look like gamepads (have sticks + buttons)."""
candidates: list[evdev.InputDevice] = []
for path in evdev.list_devices():
try:
dev = evdev.InputDevice(path)
except (PermissionError, OSError):
continue
caps = dev.capabilities()
if evdev.ecodes.EV_ABS not in caps or evdev.ecodes.EV_KEY not in caps:
dev.close()
continue
abs_codes = {code for code, _ in caps[evdev.ecodes.EV_ABS]}
if {evdev.ecodes.ABS_X, evdev.ecodes.ABS_Y, evdev.ecodes.ABS_RX} <= abs_codes:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Requiring ABS_RX excludes single-stick / flight-style controllers but is fine for Xbox/Logitech-class gamepads. Worth a docstring note that this is gamepad-only by design — otherwise the next debug session for "my controller wasn't detected" goes through this whole function.

candidates.append(dev)
else:
dev.close()
return candidates
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Device identity is fragile. evdev.list_devices() returns /dev/input/event* in alphabetical order, which is not stable across reboots/replugs. Today's joystick_device=0 could be tomorrow's =1.

For a single-controller setup it's fine; for multi-controller or robust-replug setups it's a footgun.

Suggestion (potentially future-CR): add an optional joystick_device_id: str = "" config field with this match order:

  1. dev.uniq exact match (when populated, this is the closest evdev equivalent to a MAC — Bluetooth gamepads almost always have it; some wired USB gamepads do too if the vendor bothered with serials).
  2. Substring match against the /dev/input/by-id/usb-VENDOR_PRODUCT_SERIAL-event-joystick symlink target.
  3. Substring match against dev.name (warn on >1 match).
  4. Fall back to joystick_device index when joystick_device_id is empty (today's behavior).

dev.uniq is the strongest primitive when populated — worth exploring even if we don't ship the full chain in this PR.



class UsbJoystickInput(InputProvider):
"""Reads stick + button state from a USB gamepad via evdev.

A daemon thread continuously consumes events and updates internal state.
:meth:`poll_velocity` returns the latest sticks; :meth:`poll_commands`
edge-detects button transitions and emits :class:`StateCommand` values
using :data:`JOYSTICK_COMMANDS`.
"""

def __init__(self, device_index: int = 0):
if device_index < 0:
raise ValueError(f"joystick_device must be >= 0, got {device_index}")

self._mapping = dict(JOYSTICK_COMMANDS)

gamepads = _list_gamepads()
if not gamepads:
raise RuntimeError(
"No USB joystick found via evdev. Is the controller plugged in "
"and is /dev/input mounted into this container?"
)
if device_index >= len(gamepads):
for d in gamepads:
d.close()
raise RuntimeError(f"joystick_device={device_index} but only {len(gamepads)} gamepad(s) detected")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Negative device_index (e.g. -1) passes this check and silently selects the last gamepad due to Python list semantics — probably not what callers expect. Suggest:

if device_index < 0 or device_index >= len(gamepads):
    ...


self._device = gamepads[device_index]
for d in gamepads:
if d is not self._device:
d.close()

abs_caps = dict(self._device.capabilities()[evdev.ecodes.EV_ABS])
self._abs_info = {
evdev.ecodes.ABS_X: abs_caps[evdev.ecodes.ABS_X],
evdev.ecodes.ABS_Y: abs_caps[evdev.ecodes.ABS_Y],
evdev.ecodes.ABS_RX: abs_caps[evdev.ecodes.ABS_RX],
}

self._lock = threading.Lock()
self._lx = 0.0 # left stick X, normalized [-1, 1], left=+ after sign flip
self._ly = 0.0 # left stick Y, normalized [-1, 1], up=+ after sign flip
self._rx = 0.0 # right stick X, normalized [-1, 1], left=+ after sign flip
self._key_bits = 0 # OR'd _BIT_* of currently-held buttons
self._dpad_x = 0 # -1 / 0 / 1 from ABS_HAT0X
self._dpad_y = 0 # -1 / 0 / 1 from ABS_HAT0Y

self._last_label: str = ""

self._stop = threading.Event()
self._thread = threading.Thread(target=self._run, daemon=True, name="usb_joystick")
self._thread.start()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thread is started here in __init__, which makes start() (below) a no-op. That breaks the InputProvider lifecycle contract — BasePolicy._create_input_providers() calls self._velocity_input.start() after construction expecting that's when the input goes live.

Suggest moving thread creation into start() and having __init__ only enumerate + select the device. Matches KeyboardInput semantics and means construction is cheap/safe (e.g. for tests, fixtures, etc.).


# -- Lifecycle --------------------------------------------------------

def start(self) -> None:
pass # Thread already running from __init__.

def close(self) -> None:
self._stop.set()
try:
self._device.close()
except OSError as e:
logger.debug(f"evdev close raised {type(e).__name__}: {e}")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close() isn't currently called by anyone — InputProvider protocol has start() but no close(), and policies don't tear down their input providers explicitly. The daemon thread will be killed at process exit, which is fine for a long-running policy.

Worth a comment that this is intentional, or — better — register an atexit handler so the device fd doesn't linger if multiple UsbJoystickInputs are constructed (tests, accidental double-init, etc.).

Related: _list_gamepads() opens every /dev/input/event* and closes all but the chosen one. On a host with bluetoothd / accessibility devices that's 20+ opens — a try/finally around the close calls would prevent fd leaks if a RuntimeError fires mid-scan.


# -- VelCmdProvider protocol -----------------------------------------

def poll_velocity(self) -> VelCmd | None:
with self._lock:
keys = self._effective_key_bits_locked()
lx, ly, rx = self._lx, self._ly, self._rx

# Match InterfaceInput: suppress sticks while any button is held.
if keys != 0:
return None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re: stick suppression while a button is held — flagging for future discussion, not this PR.

This mirrors InterfaceInput.poll_velocity, which suppresses sticks because the SDK packs sticks + keys into a single wireless-controller frame and the SDK's own button-handling steals the frame. That rationale doesn't apply to evdev (sticks and buttons are independent event streams), so we're inheriting a UX choice here, not a hardware constraint.

Net effect: holding any button freezes the commanded velocity at whatever it last was. Benign (the robot keeps doing what it was doing), but not necessarily desirable for chord-while-moving flows. Worth re-examining as a follow-up — and if we change it, change InterfaceInput at the same time.


lin_x = ly if abs(ly) > STICK_DEADZONE else 0.0
lin_y = -lx if abs(lx) > STICK_DEADZONE else 0.0
ang_z = -rx if abs(rx) > STICK_DEADZONE else 0.0
return VelCmd((lin_x, lin_y), ang_z)

def zero(self) -> None:
pass

# -- StateCommandProvider protocol -----------------------------------

def poll_commands(self) -> list[StateCommand]:
with self._lock:
keys = self._effective_key_bits_locked()
label = _KEY_LABEL.get(keys, "")

commands: list[StateCommand] = []
if label and label != self._last_label:
cmd = self._mapping.get(label)
if cmd is not None:
commands.append(cmd)
self._last_label = label
return commands
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edge detection on _last_label can swallow rapid same-chord presses if both the press and release land between two poll_commands() cycles (~policy rl_rate, usually 50Hz). InterfaceInput has the same model and same theoretical bug, so not a regression — just noting that command repeat-rate is bounded by policy rate, not physical button rate.


# -- Read loop -------------------------------------------------------

def _run(self) -> None:
try:
for event in self._device.read_loop():
if self._stop.is_set():
return
if event.type == evdev.ecodes.EV_ABS:
self._handle_abs(event.code, event.value)
elif event.type == evdev.ecodes.EV_KEY:
self._handle_key(event.code, event.value)
except OSError:
return # Device unplugged.

def _handle_abs(self, code: int, value: int) -> None:
if code in self._abs_info:
info = self._abs_info[code]
span = info.max - info.min
if span <= 0:
return
normalized = (value - info.min) / span * 2.0 - 1.0 # → [-1, 1]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Operator precedence: (value - info.min) / span * 2.0 - 1.0 evaluates as ((value - info.min) / span) * 2.0 - 1.0 — correct, but adding outer parens reads more clearly:

normalized = ((value - info.min) / span) * 2.0 - 1.0

with self._lock:
if code == evdev.ecodes.ABS_X:
self._lx = normalized # stick-left → -1 (matches SDK wireless-controller convention)
elif code == evdev.ecodes.ABS_Y:
self._ly = -normalized # stick-up (forward) → +1
elif code == evdev.ecodes.ABS_RX:
self._rx = normalized
elif code == evdev.ecodes.ABS_HAT0X:
with self._lock:
self._dpad_x = int(value)
elif code == evdev.ecodes.ABS_HAT0Y:
with self._lock:
self._dpad_y = int(value)
elif code == evdev.ecodes.ABS_Z: # analog L2
self._set_bit(_BIT_L2, value > TRIGGER_THRESHOLD)
elif code == evdev.ecodes.ABS_RZ: # analog R2
self._set_bit(_BIT_R2, value > TRIGGER_THRESHOLD)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TRIGGER_THRESHOLD = 128 assumes an unsigned 0–255 range. Some controllers (especially older Xbox 360 wired pads) report ABS_Z/ABS_RZ as signed -32768..32767, in which case value > 128 is true for the entire "trigger pressed past quarter" range including the resting position on some pads.

Safer: read info.min / info.max from dev.capabilities()[EV_ABS] and threshold at min + (max - min) * 0.5. Out of scope for this PR if Xbox-Series-X-only is the immediate target, but worth a TODO.


def _handle_key(self, code: int, value: int) -> None:
bit = _BUTTON_BIT.get(code)
if bit is None:
# Some controllers expose dpad as buttons rather than HAT axes.
if code == evdev.ecodes.BTN_DPAD_UP:
self._set_bit(_BIT_UP, value != 0)
elif code == evdev.ecodes.BTN_DPAD_DOWN:
self._set_bit(_BIT_DOWN, value != 0)
elif code == evdev.ecodes.BTN_DPAD_LEFT:
self._set_bit(_BIT_LEFT, value != 0)
elif code == evdev.ecodes.BTN_DPAD_RIGHT:
self._set_bit(_BIT_RIGHT, value != 0)
return
self._set_bit(bit, value != 0)

def _set_bit(self, bit: int, pressed: bool) -> None:
with self._lock:
if pressed:
self._key_bits |= bit
else:
self._key_bits &= ~bit

def _effective_key_bits_locked(self) -> int:
"""Combine button bits with HAT-axis-derived dpad bits."""
bits = self._key_bits
if self._dpad_x < 0:
bits |= _BIT_LEFT
elif self._dpad_x > 0:
bits |= _BIT_RIGHT
if self._dpad_y < 0:
bits |= _BIT_UP
elif self._dpad_y > 0:
bits |= _BIT_DOWN
return bits
Loading