diff --git a/pslab/connection/connection.py b/pslab/connection/connection.py index d87fd18..35860e0 100644 --- a/pslab/connection/connection.py +++ b/pslab/connection/connection.py @@ -70,6 +70,36 @@ def write(self, data: bytes) -> int: """ ... + def exchange(self, cmd: bytes, data: bytes = b"") -> bytes: + """Send command and input data to device, and return output data. + + Parameters + ---------- + cmd : int + Command code. + data : bytes, default b'' + Input data for command, if any. + + Returns + ------- + bytes + Output data from command, if any. + """ + (cmd_int,) = CP.ShortInt.unpack(cmd) + header = CP.Header.pack(cmd_int, len(data)) + self.write(header + data) + status, response_size = CP.Header.unpack(self.read(CP.Header.size)) + + if status: + raise Exception(status) + + response = self.read(response_size) + + if len(response) < response_size: + raise TimeoutError + + return response + def get_byte(self) -> int: """Read a single one-byte of integer value. @@ -164,10 +194,7 @@ def get_version(self) -> str: str Version string. """ - self.send_byte(CP.COMMON) - self.send_byte(CP.GET_VERSION) - version_length = 9 - version = self.read(version_length) + version = self.exchange(CP.COMMON + CP.GET_VERSION) try: if b"PSLab" not in version: @@ -177,23 +204,16 @@ def get_version(self) -> str: msg = "device not found" raise ConnectionError(msg) from exc - return version.decode("utf-8") + return version.rstrip(b"\x00").decode("utf-8") def get_firmware_version(self) -> FirmwareVersion: """Get firmware version. Returns ------- - tuple[int, int, int] + FirmwareVersion major, minor, patch. """ - self.send_byte(CP.COMMON) - self.send_byte(CP.GET_FW_VERSION) - - # Firmware version query was added in firmware version 3.0.0. - major = self.get_byte() - minor = self.get_byte() - patch = self.get_byte() - + major, minor, patch = self.exchange(CP.COMMON + CP.GET_FW_VERSION) return FirmwareVersion(major, minor, patch) diff --git a/pslab/instrument/oscilloscope.py b/pslab/instrument/oscilloscope.py index b8d3e30..9372d73 100644 --- a/pslab/instrument/oscilloscope.py +++ b/pslab/instrument/oscilloscope.py @@ -38,8 +38,8 @@ def __init__(self, device: ConnectionHandler | None = None): self._trigger_voltage = None self._trigger_enabled = False self._trigger_channel = "CH1" - self._set_gain("CH1", 1) - self._set_gain("CH2", 1) + # self._set_gain("CH1", 1) + # self._set_gain("CH2", 1) def capture( self, @@ -375,12 +375,6 @@ def select_range(self, channel: str, voltage_range: Union[int, float]): self._set_gain(channel, gain) def _set_gain(self, channel: str, gain: int): - spi_config_supported = self._check_spi_config() - - if not spi_config_supported: - spi_parameters = SPIMaster.get_parameters() - spi = SPIMaster(self._device) # Initializing SPIMaster will reset config. - self._channels[channel].gain = gain pga = self._channels[channel].programmable_gain_amplifier gain_idx = GAIN_VALUES.index(gain) @@ -390,9 +384,6 @@ def _set_gain(self, channel: str, gain: int): self._device.send_byte(gain_idx) self._device.get_ack() - if not spi_config_supported: - spi.set_parameters(*spi_parameters) - @staticmethod def _check_spi_config() -> bool: """Check whether current SPI config is supported by PGA. diff --git a/pslab/protocol.py b/pslab/protocol.py index de7f258..8f5dfa7 100644 --- a/pslab/protocol.py +++ b/pslab/protocol.py @@ -9,6 +9,8 @@ ShortInt = struct.Struct("H") # size 2 Integer = struct.Struct("I") # size 4 +Header = struct.Struct(">> psl.rgb_led([[10,0,0],[0,10,10],[10,0,10]], output="SQ1", order="RGB") """ - if "6" in self.device.version: + if "6" in self.version: pins = {"ONBOARD": 0, "SQ1": 1, "SQ2": 2, "SQ3": 3, "SQ4": 4} + if output == "RGB": + output = "ONBOARD" else: pins = {"RGB": CP.SET_RGB1, "PGC": CP.SET_RGB2, "SQ1": CP.SET_RGB3} @@ -189,24 +192,13 @@ def rgb_led(self, colors: List, output: str = "RGB", order: str = "GRB"): f"Invalid order: {order}. order must contain 'R', 'G', and 'B'." ) - self.device.send_byte(CP.COMMON) - - if "6" in self.device.version: - self.device.send_byte(CP.SET_RGB_COMMON) - else: - self.device.send_byte(pin) - - self.device.send_byte(len(colors) * 3) - - for color in colors: - self.device.send_byte(color[order.index("R")]) - self.device.send_byte(color[order.index("G")]) - self.device.send_byte(color[order.index("B")]) - - if "6" in self.device.version: - self.device.send_byte(pin) - - self.device.get_ack() + cmd = CP.COMMON + CP.SET_RGB_COMMON + args = CP.Byte.pack(pin) + args += CP.Byte.pack(len(colors) * 3) + args += bytes( + color[order.index(channel)] for channel in "RGB" for color in colors + ) + self.device.exchange(cmd, args) def _read_program_address(self, address: int): """Return the value stored at the specified address in program memory.