|
| 1 | +"""Framebuffer command helpers. |
| 2 | +
|
| 3 | +Provides utilities to interact with devices exposing a framebuffer (e.g. LCD |
| 4 | +modules) via the `framebuffer:read` command. |
| 5 | +
|
| 6 | +Exposed helpers: |
| 7 | +* framebuffer_read -> raw response (contains base64 PNG at result.png) |
| 8 | +* framebuffer_png_bytes -> decoded PNG bytes |
| 9 | +* save_framebuffer_png -> save PNG to disk |
| 10 | +* compare_framebuffer_png -> compare current framebuffer against reference |
| 11 | +""" |
| 12 | + |
| 13 | +# SPDX-FileCopyrightText: 2025-present CodeMagic LTD |
| 14 | +# |
| 15 | +# SPDX-License-Identifier: MIT |
| 16 | + |
| 17 | +from __future__ import annotations |
| 18 | + |
| 19 | +import base64 |
| 20 | +from pathlib import Path |
| 21 | + |
| 22 | +from .exceptions import WokwiError |
| 23 | +from .protocol_types import ResponseMessage |
| 24 | +from .transport import Transport |
| 25 | + |
| 26 | +__all__ = [ |
| 27 | + "read_framebuffer", |
| 28 | + "read_framebuffer_png_bytes", |
| 29 | + "save_framebuffer_png", |
| 30 | +] |
| 31 | + |
| 32 | + |
| 33 | +async def read_framebuffer(transport: Transport, *, id: str) -> ResponseMessage: |
| 34 | + """Issue `framebuffer:read` for the given device id and return raw response.""" |
| 35 | + return await transport.request("framebuffer:read", {"id": id}) |
| 36 | + |
| 37 | + |
| 38 | +def _extract_png_b64(resp: ResponseMessage) -> str: |
| 39 | + result = resp.get("result", {}) |
| 40 | + png_b64 = result.get("png") |
| 41 | + if not isinstance(png_b64, str): # pragma: no cover - defensive |
| 42 | + raise WokwiError("Malformed framebuffer:read response: missing 'png' base64 string") |
| 43 | + return png_b64 |
| 44 | + |
| 45 | + |
| 46 | +async def read_framebuffer_png_bytes(transport: Transport, *, id: str) -> bytes: |
| 47 | + """Return decoded PNG bytes for the framebuffer of device `id`.""" |
| 48 | + resp = await read_framebuffer(transport, id=id) |
| 49 | + return base64.b64decode(_extract_png_b64(resp)) |
| 50 | + |
| 51 | + |
| 52 | +async def save_framebuffer_png( |
| 53 | + transport: Transport, *, id: str, path: Path, overwrite: bool = True |
| 54 | +) -> Path: |
| 55 | + """Save the framebuffer PNG to `path` and return the path. |
| 56 | +
|
| 57 | + Args: |
| 58 | + transport: Active transport. |
| 59 | + id: Device id (e.g. "lcd1"). |
| 60 | + path: Destination file path. |
| 61 | + overwrite: Overwrite existing file (default True). If False and file |
| 62 | + exists, raises WokwiError. |
| 63 | + """ |
| 64 | + if path.exists() and not overwrite: |
| 65 | + raise WokwiError(f"File already exists and overwrite=False: {path}") |
| 66 | + data = await read_framebuffer_png_bytes(transport, id=id) |
| 67 | + path.parent.mkdir(parents=True, exist_ok=True) |
| 68 | + with open(path, "wb") as f: |
| 69 | + f.write(data) |
| 70 | + return path |
0 commit comments