Skip to content

Conversation

laurensvalk
Copy link
Member

@laurensvalk laurensvalk commented Aug 18, 2025

This implements the pybricks.iodevices.I2CDevice class. It is compatible with the class we had in Pybricks 2.0, but adds new features. You can use use either LEGO-style (autodetected) devices to raise ERROR_NO_DEV if it isn't found, or disable auto-detection to allow custom devices.

image

You can gracefully switch between them. If you run a new program with an auto-detectable sensor after using a custom sensor last time, you can do so without rebooting the hub. This takes a little bit of time (less than a second), but should be more stable than in 2.0, where it would take much longer and sometimes not revert modes.

from pybricks.iodevices import I2CDevice
from pybricks.parameters import Port
from ustruct import unpack

# Using custom=True will disable auto-detection, allowing for non-LEGO I2C devices too.
# Using powered=True turns on battery voltage which is needed for some devices.
# Using nxt_quirk=True applies I2C workarounds for certain LEGO sensors, off by default.
therm = I2CDevice(Port.S4, 0x4C, custom=True, powered=False, nxt_quirk=False)

# There is a new write-then-read command to efficiently combine certain operations.
# This sets the temperature resolution to 0.125 degrees, which could also be done
# with the old-style write.
therm.write_then_read(bytes([0x01, (1 << 6) | (0 << 5)]), 0)

old_data = 0

while True:

    # Original read and write API remain available.
    data = therm.read(reg=0x00, length=2)
    if data == old_data:
        continue

    old_data = data

    temperature_raw = unpack(">h", data)[0]

    print(temperature_raw // 16 / 16)

A new method called write_then_read is added to complement the existing read and write for advanced users.

Other sensors

Support for the NXT Ultrasonic Sensor and the Temperature Sensor has been added. The Energy Meter should follow shortly. Support for powered devices is added, which is needed for the Ultrasonic Sensor and selected other devices.

image image

Async support

Unlike in 2.0 where everything was blocking, the new API is also async-compatible. You can make background loops that poll your devices without blocking the rest of your code. The internal implementation of the I2CDevice class is such that we can easily add builtin support for more sensors without a big penalty on build size. Of course new sensors can just be implemented in Python too.

from pybricks.nxtdevices import TemperatureSensor, UltrasonicSensor
from pybricks.parameters import Port
from pybricks.tools import multitask, run_task

therm = TemperatureSensor(Port.S4)
sonar = UltrasonicSensor(Port.S3)


async def therm_thread():

    old = 0

    while True:
        new = await therm.temperature()
        if new == old:
            continue
        old = new
        print(f"Temperature: {new}°C")


async def sonar_thread():

    old = 0

    while True:
        new = await sonar.distance()
        if new == old:
            continue
        old = new
        print(f"Distance: {new} mm")


run_task(multitask(therm_thread(), sonar_thread()))

Multiple devices

Using multiple sensors with different addresses on one port also works by initializing two I2CDevice instances (or two regular sensor instances). It is then up to the user to poll them one at a time.

from pybricks.nxtdevices import TemperatureSensor, UltrasonicSensor
from pybricks.parameters import Port
from pybricks.tools import wait

sonar = UltrasonicSensor(Port.S4)
wait(100)
therm = TemperatureSensor(Port.S4)
wait(100)

while True:

    dist = sonar.distance()
    temp = therm.temperature()

    print(f"Dist: {dist} mm  Temp: {temp} °C")
    wait(200)

880cc8ac-5f2d-44c4-bcec-a8b77256c3a5

Thanks

Big thanks to @ArcaneNibble for implementing a low-level I2C driver using the PRU.

Testing to allow a range of devices like any I2C devices is more natural this way.
So far we assumed that if a device made it out of the passive device manager, it must be a LUMP device.

There may be other categories of active sensors that need a background process, so prepare for it now.
Reset isn't implemented here, so we don't need this argument.
Start off with a basic implementation. This is not async-compatible yet.
We used to turn power off only at the end of active sensor processes as well as at the end of a program. In general, we want to turn power off whenever something is disconnected, which is what we do here.

The goal here is to be able to turn off 9V to a sensor if a user has switched it on and then unplugs the device.
This is only an indentation change with continue replaced by returning success.

This thread should wait for a connection and then wait until disconnected. The outer sensor port loop will run this again instead.

This allows us to handle things like port reset in the outer sensor loop. This ensures 9V power is turned off once a device is unplugged, for example.
This will automatically turn off when unplugged.
Also supports async API.

This replaces the smbus version for ev3dev. Some missing methods will be reinstated in future commits.
This way we don't need to add the `max_read_size` parameter and only allocate as much as we need.
These are wrappers around write_then_read for backwards compatibility.
It appears that this was left on by accident.
A user could inadvertently make another async I2C request, but we don't allow this if another operation is ongoing. So we shouldn't override the state in this case.
Since this already deals with nxt quirks, include the required delay between operations too. This simplifies higher level code common to all I2C Sensors.
Creating duplicate async functions in Python for every method is not very nice. We can instead define callbacks to map bytes to relevant data when the operation completes.
Instead of having the driver memcpy it, let the caller handle it so it can do it after allocation. This also means we don't have to allocate bytes objects at all for most sensor methods in the nect commits.
We no longer need a double allocation just to shift the write data.
We've been gradually introducing this pattern for newer classes, so be consistent about it.
Allows sensor definitions to have an I2DDevice instance with awaitable operations with a result mapped to objects.
Given the firmware size limitations on NXT, using Python wasn't going to be scalable. With the recent changes to the I2CDevice implementation, we can use instances of it for sensor classes written in C.

This reverts commit a37f46f756cd8453ff8f99193263ac68f9b5a057.
This is reverted instead of dropped so we maintain the logic to import Python classes into the C module in git history for future reference.
This is commonly used for LEGO-style I2C device classes.

Also simplify passing in address to I2CDevice object creator.
@coveralls
Copy link

Coverage Status

coverage: 58.691% (-0.05%) from 58.741%
when pulling 5fba49d on work
into 5c37864 on master.

Copy link
Member

@dlech dlech left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So many commits! I think half of my comments were on code that got added and then promptly removed.

Just a few high-level comments:

  1. I'm not sure that write_then_read() as a user API is particularly useful. Pretty much the only time you actually write then read is on devices that use register addressing, so this is already covered by the read() method.

    For the less common cases when a device doesn't use register addressing, I think allowing None as the reg arg for the read() and write() methods would be simpler than adding a new method.

  2. For the case of multiple devices on a single port, why do we need the extra wait() calls? It seems like this is something that should be baked in if it is actually required.

@@ -46,6 +46,17 @@ pbio_error_t pbdrv_ioport_p5p6_set_mode(const pbdrv_ioport_pins_t *pins, pbdrv_i
pbdrv_gpio_alt(&pins->uart_tx, pins->uart_tx_alt_uart);
pbdrv_gpio_out_low(&pins->uart_buf);
return PBIO_SUCCESS;
} else if (mode == PBDRV_IOPORT_P5P6_MODE_I2C) {
// First reset all pins to inputs by going to GPIO mode recursively.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the advantage of doing this recursively vs. just setting each gpio once here?

Comment on lines +138 to +139
PBIO_OS_AWAIT_UNTIL(state, pbio_os_timer_is_expired(&i2c_dev->timer));
pbio_os_timer_set(&i2c_dev->timer, 100);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks reversed. Timer is set after awaiting. If something funny is going on that makes this correct, we should have a comment explaining it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    // This ensures a minimum delay without slowing down code that polls
    // less frequently.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That doesn't explain why we wait for the timer to expire first and then set the duration of the timer after that. I would expect to set the timer first, then wait for it.

try:
assert self._i2c.read(0x08, 4) == b"LEGO"
assert self._i2c.read(0x10, 5) == b"Sonar"
except:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should only catch Exception, otherwise it could swallow SystemExit or KeyboardInterrupt.

assert self._i2c.read(0x08, 4) == b"LEGO"
assert self._i2c.read(0x10, 5) == b"Sonar"
except:
raise OSError("Ultrasonic Sensor not found.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proper use of OSError should include an error code as the first argument. I wouldn't call this OSError though since it didn't come from a pbio error.

wait(100)

try:
assert self._i2c.read(0x08, 4) == b"LEGO"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically assert statements can be optimized out when compiling.

@@ -17,10 +17,22 @@

#include <pybricks/util_mp/pb_kwarg_helper.h>
#include <pybricks/util_mp/pb_obj_helper.h>

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unintentional blank line?

@@ -106,7 +106,7 @@ pbio_error_t pbdrv_i2c_write_then_read(
uint8_t dev_addr,
const uint8_t *wdata,
size_t wlen,
uint8_t *rdata,
uint8_t **rdata,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This scares me. I'm guessing it was done to avoid copying the same data twice. I would just pass in an array allocated from the MicroPython heap and leave the copy here. Then call mp_obj_str_set_data() to attach that array to a bytes object if calling this method is successful.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would just pass in an array allocated from the MicroPython heap

As we have seen several times with Bluetooth code, this tends to lead to issues. When pbio does the copying, problems arise when the allocated buffer no longer exists or the user program has ended.

When MicroPython drives the final copy step, we know that MicroPython is still running, and it is reading from a static pbio buffer, so this is always safe. If the async code never completes, there are no issues.

Alternatively, we could make a separate get_data call that does the copying. The end result is the same though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point.

}

// Otherwise need to prefix write data with given register.
uint8_t write_data[256];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This amount of stack is fine on EV3, but not so sure about NXT. Most I2C operations are going to be 32 bytes or less so that could be a sensible number to use.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants