diff --git a/can/interfaces/__init__.py b/can/interfaces/__init__.py index 884665934..9aa560b20 100644 --- a/can/interfaces/__init__.py +++ b/can/interfaces/__init__.py @@ -9,6 +9,7 @@ "serial": ("can.interfaces.serial.serial_can", "SerialBus"), "pcan": ("can.interfaces.pcan", "PcanBus"), "usb2can": ("can.interfaces.usb2can", "Usb2canBus"), + "usb2can_libusb": ("can.interfaces.usb2can_libusb", "Usb2CanLibUsbBus"), "ixxat": ("can.interfaces.ixxat", "IXXATBus"), "nican": ("can.interfaces.nican", "NicanBus"), "iscan": ("can.interfaces.iscan", "IscanBus"), diff --git a/can/interfaces/usb2can_libusb/__init__.py b/can/interfaces/usb2can_libusb/__init__.py new file mode 100644 index 000000000..53e84b325 --- /dev/null +++ b/can/interfaces/usb2can_libusb/__init__.py @@ -0,0 +1,4 @@ +""" +""" + +from .usb2can_libusb_bus import Usb2CanLibUsbBus diff --git a/can/interfaces/usb2can_libusb/can_8dev_usb_device.py b/can/interfaces/usb2can_libusb/can_8dev_usb_device.py new file mode 100644 index 000000000..3402aed62 --- /dev/null +++ b/can/interfaces/usb2can_libusb/can_8dev_usb_device.py @@ -0,0 +1,152 @@ +import logging +import queue +from threading import Thread + +from .can_8dev_usb_utils import * + +logger = logging.getLogger(__name__) + +try: + import usb.core + import usb.util +except ImportError: + logger.warning( + "The PyUSB module is not installed. Install it using `python3 -m pip install pyusb`" + ) + + +class Can8DevUSBDevice: + cmd_rx_ep: usb.core.Endpoint + cmd_tx_ep: usb.core.Endpoint + data_rx_ep: usb.core.Endpoint + data_tx_ep: usb.core.Endpoint + serial_number: str + _close: bool + _rx_queue: queue.Queue + _recv_thread: Thread + + def __init__(self, serial_number=None): + if serial_number is not None: + dev = usb.core.find( + idVendor=USB_8DEV_VENDOR_ID, + idProduct=USB_8DEV_PRODUCT_ID, + serial_number=serial_number, + ) + else: + dev = usb.core.find( + idVendor=USB_8DEV_VENDOR_ID, idProduct=USB_8DEV_PRODUCT_ID + ) + + if dev is None: + raise ValueError( + "8Devices CAN interface not found! Serial number provided: %s" + % serial_number + ) + + self.serial_number = dev.serial_number + + dev.reset() + # set the active configuration. With no arguments, the first + # configuration will be the active one + dev.set_configuration() + + # get an endpoint instance + cfg = dev.get_active_configuration() + intf = cfg[(0, 0)] + + self.cmd_rx_ep: usb.core.Endpoint = usb.util.find_descriptor( + intf, bEndpointAddress=USB_8DEV_ENDP_CMD_RX + ) + self.cmd_tx_ep: usb.core.Endpoint = usb.util.find_descriptor( + intf, bEndpointAddress=USB_8DEV_ENDP_CMD_TX + ) + self.data_rx_ep: usb.core.Endpoint = usb.util.find_descriptor( + intf, bEndpointAddress=USB_8DEV_ENDP_DATA_RX + ) + self.data_tx_ep: usb.core.Endpoint = usb.util.find_descriptor( + intf, bEndpointAddress=USB_8DEV_ENDP_DATA_TX + ) + + if ( + self.cmd_rx_ep is None + or self.cmd_tx_ep is None + or self.data_rx_ep is None + or self.data_tx_ep is None + ): + raise ValueError("Could not configure 8Devices CAN endpoints!") + + self._rx_queue = queue.Queue(MAX_8DEV_RECV_QUEUE) + + def _recv_thread_loop(self): + while True: + byte_buffer = bytes() + try: + # We must read the full possible buffer size each iteration or we risk a buffer overrun exception losing data. + byte_buffer = self.data_rx_ep.read(512, 0).tobytes() + except Exception: + pass + for i in range(0, len(byte_buffer), 21): + # We could have read multiple frames in a single bulk xfer + self._rx_queue.put(Can8DevRxFrame(byte_buffer[i : i + 21])) + if self._close: + return + + def _start_recv_thread(self): + self._close = False + self._recv_thread = Thread(target=self._recv_thread_loop, daemon=True) + self._recv_thread.start() + + def _stop_recv_thread(self): + self._close = True + + def send_command(self, cmd: Can8DevCommandFrame): + self.cmd_tx_ep.write(cmd.to_bytes()) + return Can8DevCommandFrame.from_bytes(self.cmd_rx_ep.read(16)) + + def open( + self, + phase_seg1: int, + phase_seg2: int, + sjw: int, + brp: int, + loopback: bool = False, + listenonly: bool = False, + oneshot: bool = False, + ): + self.send_command(Can8DevCommandFrame(Can8DevCommand.USB_8DEV_RESET)) + open_command = can_8dev_open_frame( + phase_seg1, phase_seg2, sjw, brp, loopback, listenonly, oneshot + ) + if self.send_command(open_command).opt1 == 0: + self._start_recv_thread() + return True + else: + return False + + def close(self): + self._stop_recv_thread() + close_command = Can8DevCommand.USB_8DEV_CLOSE + self.send_command(Can8DevCommandFrame(close_command)) + + def recv(self, timeout=None): + try: + return self._rx_queue.get(True, timeout=timeout / 1000) + except queue.Empty: + return None + + def send(self, tx_frame: Can8DevTxFrame, timeout=None): + self.data_tx_ep.write(tx_frame.to_bytes(), timeout) + + def get_version(self): + cmd_response = self.send_command( + Can8DevCommandFrame(Can8DevCommand.USB_8DEV_GET_SOFTW_HARDW_VER) + ) + version = int.from_bytes(cmd_response.data[0:4], byteorder="big") + return version + + def get_firmware_version(self): + version = self.get_version() + return "%d.%d" % ((version >> 24) & 0xFF, (version >> 16) & 0xFF) + + def get_serial_number(self): + return self.serial_number diff --git a/can/interfaces/usb2can_libusb/can_8dev_usb_utils.py b/can/interfaces/usb2can_libusb/can_8dev_usb_utils.py new file mode 100644 index 000000000..b6836303d --- /dev/null +++ b/can/interfaces/usb2can_libusb/can_8dev_usb_utils.py @@ -0,0 +1,221 @@ +from enum import Enum + +MAX_8DEV_RECV_QUEUE = 128 # Maximum number of slots in the recv queue + +USB_8DEV_VENDOR_ID = ( + 0x0483 +) # Unfortunately this is actually the ST Microelectronics Vendor ID +USB_8DEV_PRODUCT_ID = 0x1234 # Unfortunately this is pretty bogus +USB_8DEV_PRODUCT_STRING = "USB2CAN converter" # So we use this instead. Not great. + +USB_8DEV_ABP_CLOCK = 32000000 + +# USB Bulk Endpoint identifiers + +USB_8DEV_ENDP_DATA_RX = 0x81 +USB_8DEV_ENDP_DATA_TX = 0x2 +USB_8DEV_ENDP_CMD_RX = 0x83 +USB_8DEV_ENDP_CMD_TX = 0x4 + +# Open Device Options + +USB_8DEV_SILENT = 0x01 +USB_8DEV_LOOPBACK = 0x02 +USB_8DEV_DISABLE_AUTO_RESTRANS = 0x04 +USB_8DEV_STATUS_FRAME = 0x08 + +# Command options +USB_8DEV_BAUD_MANUAL = 0x09 +USB_8DEV_CMD_START = 0x11 +USB_8DEV_CMD_END = 0x22 + +USB_8DEV_CMD_SUCCESS = 0 +USB_8DEV_CMD_ERROR = 255 + +USB_8DEV_CMD_TIMEOUT = 1000 + +# Framing definitions +USB_8DEV_DATA_START = 0x55 +USB_8DEV_DATA_END = 0xAA + +USB_8DEV_TYPE_CAN_FRAME = 0 +USB_8DEV_TYPE_ERROR_FRAME = 3 + +USB_8DEV_EXTID = 0x01 +USB_8DEV_RTR = 0x02 +USB_8DEV_ERR_FLAG = 0x04 + +# Status messages +USB_8DEV_STATUSMSG_OK = 0x00 +USB_8DEV_STATUSMSG_OVERRUN = 0x01 # Overrun occured when sending */ +USB_8DEV_STATUSMSG_BUSLIGHT = 0x02 # Error counter has reached 96 */ +USB_8DEV_STATUSMSG_BUSHEAVY = 0x03 # Error count. has reached 128 */ +USB_8DEV_STATUSMSG_BUSOFF = 0x04 # Device is in BUSOFF */ +USB_8DEV_STATUSMSG_STUFF = 0x20 # Stuff Error */ +USB_8DEV_STATUSMSG_FORM = 0x21 # Form Error */ +USB_8DEV_STATUSMSG_ACK = 0x23 # Ack Error */ +USB_8DEV_STATUSMSG_BIT0 = 0x24 # Bit1 Error */ +USB_8DEV_STATUSMSG_BIT1 = 0x25 # Bit0 Error */ +USB_8DEV_STATUSMSG_CRC = 0x27 # CRC Error */ + +USB_8DEV_RP_MASK = 0x7F # Mask for Receive Error Bit */ + +# Available Commands + + +class Can8DevCommand(Enum): + USB_8DEV_RESET = 1 # Reset Device + USB_8DEV_OPEN = 2 # Open Port + USB_8DEV_CLOSE = 3 # Close Port + USB_8DEV_SET_SPEED = 4 + USB_8DEV_SET_MASK_FILTER = ( + 5 + ) # Unfortunately unknown parameters and supposedly un-implemented on early firmwares + USB_8DEV_GET_STATUS = 6 + USB_8DEV_GET_STATISTICS = 7 + USB_8DEV_GET_SERIAL = 8 + USB_8DEV_GET_SOFTW_VER = 9 + USB_8DEV_GET_HARDW_VER = 0xA + USB_8DEV_RESET_TIMESTAMP = 0xB + USB_8DEV_GET_SOFTW_HARDW_VER = 0xC + + +class Can8DevTxFrame: + flags: int + id: int + dlc: int + data: bytes + + def __init__( + self, can_id: int, dlc: int, data: bytes, is_ext: bool, is_remote: bool + ): + self.can_id = can_id + self.dlc = dlc + self.data = data + self.flags = 0 + if is_ext: + self.flags |= USB_8DEV_EXTID + if is_remote: + self.flags |= USB_8DEV_RTR + + def _pad_data(self, data: bytes): + data_bytes = bytearray(8) + for i in range(0, 7): + if i < len(data): + data_bytes[i] = data[i] + return bytes(data_bytes) + + def to_bytes(self): + cmd_buf = bytearray() + cmd_buf.append(USB_8DEV_DATA_START) + cmd_buf.append(self.flags) + id_bytes = self.can_id.to_bytes(4, byteorder="big") + cmd_buf.extend(id_bytes) + cmd_buf.append(self.dlc) + cmd_buf.extend(self._pad_data(self.data)) + cmd_buf.append(USB_8DEV_DATA_END) + return bytes(cmd_buf) + + +class Can8DevRxFrame: + data: bytes + id: int + dlc: int + timestamp: int + ext_id: bool + is_error: bool + is_remote: bool + + def __init__(self, bytes_in: bytes): + if len(bytes_in) != 21: + raise ValueError("Did not receive 21 bytes for 8Dev Data Frame") + if bytes_in[0] != USB_8DEV_DATA_START: + raise ValueError("Did not receive a valid 8Dev Data Frame") + if bytes_in[1] == USB_8DEV_TYPE_CAN_FRAME: + self.data = bytes_in[8:16] + self.dlc = bytes_in[7] + self.ext_id = bytes_in[2] & USB_8DEV_EXTID + self.is_remote = bytes_in[2] & USB_8DEV_RTR + self.id = int.from_bytes(bytes_in[3:7], byteorder="big") + self.timestamp = int.from_bytes(bytes_in[16:20], byteorder="big") + self.is_error = False + elif bytes_in[1] == USB_8DEV_TYPE_ERROR_FRAME: + self.is_error = True + self.data = bytes_in[7:15] + self.timestamp = int.from_bytes(bytes_in[16:20], byteorder="big") + else: + raise ValueError("8Dev Data Frame with Unknown Type") + + +class Can8DevCommandFrame: + command: Can8DevCommand + opt1: int + opt2: int + data: bytes + + def __init__(self, command, data=bytes(), opt1=0, opt2=0): + self.command = command + self.data = data + self.opt1 = opt1 + self.opt2 = opt2 + + def _pad_data(self, data: bytes): + data_bytes = bytearray(10) + for i in range(0, 9): + if i < len(data): + data_bytes[i] = data[i] + return bytes(data_bytes) + + def to_bytes(self): + cmd_buf = bytearray() + cmd_buf.append(USB_8DEV_CMD_START) + cmd_buf.append(0) # Supposedly could be a channel value, but unknown + cmd_buf.append(self.command.value) + cmd_buf.append(self.opt1) + cmd_buf.append(self.opt2) + cmd_buf.extend(self._pad_data(self.data)) + cmd_buf.append(USB_8DEV_CMD_END) + return bytes(cmd_buf) + + def from_bytes(byte_input: bytes): + if len(byte_input) != 16: + raise ValueError("Did not receive 16 bytes for 8Dev Command Frame") + return Can8DevCommandFrame( + Can8DevCommand(byte_input[2]), + byte_input[5:15], + byte_input[3], + byte_input[4], + ) + + +def can_8dev_open_frame( + phase_seg1: int, + phase_seg2: int, + sjw: int, + brp: int, + loopback: bool = False, + listenonly: bool = False, + oneshot: bool = False, +) -> Can8DevCommandFrame: + open_command = Can8DevCommand.USB_8DEV_OPEN + opt1 = USB_8DEV_BAUD_MANUAL + flags = 0 + if loopback: + flags |= USB_8DEV_LOOPBACK + if listenonly: + flags |= USB_8DEV_SILENT + if oneshot: + flags |= USB_8DEV_DISABLE_AUTO_RESTRANS + flags_bytes = flags.to_bytes(4, "big") + brp_bytes = brp.to_bytes(2, "big") + data = bytearray(10) + data[0] = phase_seg1 + data[1] = phase_seg2 + data[2] = sjw + data[3] = brp_bytes[0] + data[4] = brp_bytes[1] + data[5] = flags_bytes[0] + data[6] = flags_bytes[1] + data[7] = flags_bytes[2] + data[8] = flags_bytes[3] + return Can8DevCommandFrame(open_command, data, opt1) diff --git a/can/interfaces/usb2can_libusb/usb2can_libusb_bus.py b/can/interfaces/usb2can_libusb/usb2can_libusb_bus.py new file mode 100644 index 000000000..5259709e7 --- /dev/null +++ b/can/interfaces/usb2can_libusb/usb2can_libusb_bus.py @@ -0,0 +1,114 @@ +""" +This interface requires LibUSB and `pyusb` to be installed on your system. +The interface will bind by default to the first device with VID +""" + +import logging +from ctypes import byref + +from can import BusABC, Message, BitTiming +from .can_8dev_usb_utils import * + +# Set up logging +log = logging.getLogger("can.usb2can_libusb") + +try: + from .can_8dev_usb_device import * +except NameError: + log.warning( + "The PyUSB module is not installed, but it is required for USB2Can_LibUSB support. Install it using `python3 -m pip install pyusb`" + ) + + +def message_convert_tx(msg): + """convert message from PythonCAN Message to 8Devices frame""" + return Can8DevTxFrame( + can_id=msg.arbitration_id, + dlc=msg.dlc, + data=msg.data, + is_ext=msg.is_extended_id, + is_remote=msg.is_remote_frame, + ) + + +def message_convert_rx(message_rx: Can8DevRxFrame): + """convert message from 8Devices frame to PythonCAN Message""" + + if message_rx.is_error: + return Message( + timestamp=message_rx.timestamp / 1000, + is_error_frame=message_rx.is_error, + data=message_rx.data, + ) + + return Message( + timestamp=message_rx.timestamp / 1000, + is_remote_frame=message_rx.is_remote, + is_extended_id=message_rx.ext_id, + is_error_frame=message_rx.is_error, + arbitration_id=message_rx.id, + dlc=message_rx.dlc, + data=message_rx.data[: message_rx.dlc], + ) + + +class Usb2CanLibUsbBus(BusABC): + """Interface to an 8Devices USB2CAN Bus. + + This device should work on any platform with a working LibUSB and PyUSB. It was tested with a "Korlan USB2Can" but should work with the older module as well. + + Hardware filtering is not provided, if anyone knows how the 8Devices filtering command works, this would be valuable. + + Based on the in-tree Linux kernel SocketCAN driver for USB2CAN. + + :param str channel (optional): + The device's serial number. If not provided, the first matching VID/DID will match (WARNING: 8Devices reuse a random ST VID/DID, so other devices may match!) + + :param int bitrate (optional): + Bitrate of channel in bit/s. Values will be limited to a maximum of 1000 Kb/s. + Default is 500 Kbs + + :param int flags (optional): + Flags to directly pass to open function of the usb2can abstraction layer. + """ + + def __init__(self, channel=None, *args, bitrate=500000, **kwargs): + + self.can = Can8DevUSBDevice() + + # convert to kb/s and cap: max rate is 1000 kb/s + baudrate = min(int(bitrate // 1000), 1000) + + self.channel_info = "USB2CAN LibUSB device {}".format( + self.can.get_serial_number() + ) + + connector = "{}; {}".format("USB2Can_LibUSB", baudrate) + + timing = BitTiming( + tseg1=6, tseg2=1, sjw=1, bitrate=bitrate, f_clock=USB_8DEV_ABP_CLOCK + ) + self.can.open(timing.tseg1, timing.tseg2, timing.sjw, timing.brp) + + super().__init__(channel=channel, bitrate=bitrate, *args, **kwargs) + + def send(self, msg, timeout=None): + tx = message_convert_tx(msg) + if timeout is not None: + timeout *= 1000 + self.can.send(tx, timeout) + + def _recv_internal(self, timeout): + if timeout is not None: + timeout *= 1000 + messagerx = self.can.recv(timeout) + rx = None + if messagerx is not None: + rx = message_convert_rx(messagerx) + return rx, False + + def shutdown(self): + """ + Shuts down connection to the device. + """ + self.can.close() diff --git a/doc/interfaces.rst b/doc/interfaces.rst index 361ea4097..2613d5e06 100644 --- a/doc/interfaces.rst +++ b/doc/interfaces.rst @@ -28,6 +28,7 @@ The available interfaces are: interfaces/socketcan interfaces/systec interfaces/usb2can + interfaces/usb2can_libusb interfaces/vector interfaces/virtual diff --git a/doc/interfaces/usb2can_libusb.rst b/doc/interfaces/usb2can_libusb.rst new file mode 100644 index 000000000..09ddffb26 --- /dev/null +++ b/doc/interfaces/usb2can_libusb.rst @@ -0,0 +1,57 @@ +USB2CAN Interface +================= + +OVERVIEW +-------- + +The `USB2CAN `_ is a cheap CAN interface. Two versions exist, "Korlan" and a previous edition. +Both are based on STM32 chips. + +There is support for this device on Linux through the :doc:`socketcan` interface and for Windows using the +``usb2can`` interface. + +This interface supports any platform with a working "pyusb". + +The device has been tested on OS X at 500kbaud against a device spamming 300+ messages/second, so it should be decently robust. + +INSTALL +_______ + +Install `pyusb` and a working pyusb backend (on most platforms, this is `libusb1`). + +This should be the only required action. + +Interface Layout +---------------- + +`USB2CanLibUsbBus` implements the basic `python-can` Bus for this device. + +`Can8DevUSBDevice` implements an abstract device driver for the hardware, based on `pyusb` and the SocketCAN kernel module for the device. + +`Can8DevUSBDevice` uses a Queue and a read thread to ensure that messages are read into host hardware before they overflow the internal buffers in the device. The `recv` methods simply poll the read queue for available messages. + +Interface Specific Items +------------------------ + +This device is really an oddball. It works well, but documentation is quite sparse. + +Filtering is not implemented because the details of how to use it are not documented in any way. + + +.. warning:: + + Currently message filtering is not implemented. Contributions are most welcome! + + +Bus +--- + +.. autoclass:: can.interfaces.usb2can_libusb.Usb2CanLibUsbBus + + +Internals +--------- + +.. autoclass:: can.interfaces.usb2can_libusb.Can8DevUSBDevice + :members: + :undoc-members: diff --git a/test/test_usb2can_libusb.py b/test/test_usb2can_libusb.py new file mode 100644 index 000000000..695f5dfb6 --- /dev/null +++ b/test/test_usb2can_libusb.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python +# coding: utf-8 + +""" +Test for USB2Can_LibUSB +""" + +import unittest +from unittest.mock import Mock + +import pytest + +import can +from can import Message +from can.interfaces.usb2can_libusb.usb2can_libusb_bus import ( + message_convert_rx, + message_convert_tx, + Usb2CanLibUsbBus, +) +from can.interfaces.usb2can_libusb.can_8dev_usb_utils import ( + Can8DevRxFrame, + Can8DevCommandFrame, + Can8DevCommand, + can_8dev_open_frame, +) + + +class TestUsb2CanLibUsbBus(unittest.TestCase): + def test_receive_deserialize(self) -> None: + recv_packet = bytes.fromhex("55000000000121084ef1ff1f00007e00008355e0aa") + recv_rx_frame = Can8DevRxFrame(recv_packet) + recv_msg = message_convert_rx(recv_rx_frame) + self.assertEqual(recv_msg.arbitration_id, 0x121) + self.assertEqual(recv_msg.dlc, 8) + self.assertEqual( + recv_msg.data, bytes([0x4E, 0xF1, 0xFF, 0x1F, 0x00, 0x00, 0x7E, 0x00]) + ) + self.assertEqual(recv_msg.timestamp, 8607.200000) + + def test_transmit_serialize(self) -> None: + send_packet = Message( + arbitration_id=0xC0FFEE, + is_extended_id=True, + data=[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08], + ) + tx_frame = message_convert_tx(send_packet) + tx_bytes = tx_frame.to_bytes() + self.assertEqual(tx_bytes.hex(), "550100c0ffee080102030405060700aa") + + def test_command_serialize(self) -> None: + frame = Can8DevCommandFrame(Can8DevCommand.USB_8DEV_RESET) + self.assertEqual(frame.to_bytes().hex(), "11000100000000000000000000000022") + deserialized_frame = Can8DevCommandFrame.from_bytes(frame.to_bytes()) + self.assertEqual(deserialized_frame.command, Can8DevCommand.USB_8DEV_RESET) + + def test_open_command(self) -> None: + phase_seg1 = 6 + phase_seg2 = 1 + sjw = 1 + brp = 8 + loopback = True + listenonly = False + oneshot = False + open_command = can_8dev_open_frame( + phase_seg1, phase_seg2, sjw, brp, loopback, listenonly, oneshot + ) + self.assertEqual( + open_command.to_bytes().hex(), "11000209000601010008000000020022" + ) + + +if __name__ == "__main__": + unittest.main()