diff --git a/examples/vucm/ni-daq/Dockerfile b/examples/vucm/ni-daq/Dockerfile new file mode 100644 index 0000000..296cfb0 --- /dev/null +++ b/examples/vucm/ni-daq/Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.11-alpine3.22 + +WORKDIR /app + +RUN apk add build-base + +RUN python -m venv .venv +COPY requirements.txt requirements.txt +RUN .venv/bin/pip install -r requirements.txt + +COPY script.py script.py + +CMD [".venv/bin/python", "script.py"] diff --git a/examples/vucm/ni-daq/docker_run.sh b/examples/vucm/ni-daq/docker_run.sh new file mode 100755 index 0000000..577a15e --- /dev/null +++ b/examples/vucm/ni-daq/docker_run.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +set -euo pipefail +IFS=$'\n\t' + +SCRIPT_DIR="$(realpath "$(dirname "$0")")" +IMAGE_TAG="${IMAGE_TAG:-"enapter-vucm-examples/$(basename "$SCRIPT_DIR"):latest"}" + +docker build --tag "$IMAGE_TAG" "$SCRIPT_DIR" + +docker run --rm -it \ + --name "ni-daq" \ + --network host \ + -e ENAPTER_LOG_LEVEL="${ENAPTER_LOG_LEVEL:-info}" \ + -e ENAPTER_VUCM_BLOB="$ENAPTER_VUCM_BLOB" \ + -e LISTEN_TCP_PORT="$LISTEN_TCP_PORT" \ + "$IMAGE_TAG" diff --git a/examples/vucm/ni-daq/manifest.yml b/examples/vucm/ni-daq/manifest.yml new file mode 100644 index 0000000..8e2d3d9 --- /dev/null +++ b/examples/vucm/ni-daq/manifest.yml @@ -0,0 +1,104 @@ +blueprint_spec: "device/1.0" + +display_name: ATS stack + +communication_module: + product: ENP-VIRTUAL + +properties: + vendor: + display_name: Vendor + type: string + model: + display_name: Model + type: string + +alerts: + parse_error: + display_name: Data processing failed + severity: error +telemetry: + status: + display_name: Status + type: string + enum: + - ok + - error + - no_data + T1: + display_name: T1 + type: float + T2: + display_name: T2 + type: float + T3: + display_name: T2 + type: float + Current: + display_name: Current + type: float + PSU: + display_name: Current + type: float + P1: + display_name: P1 + type: float + P2: + display_name: P2 + type: float + P3: + display_name: P3 + type: float + Flow: + display_name: Flow + type: float + Conductivity: + display_name: Conductivity + type: float + MFMH2: + display_name: MFMH2 + type: float + Theoretical_h2: + display_name: MFMH2 + type: float + MCM02: + display_name: MCM02 + type: float + Refilling: + display_name: Refilling + type: float + PC: + display_name: PC + type: float + C1: + display_name: Cell 1 + type: float + C2: + display_name: Cell 2 + type: float + C3: + display_name: Cell 3 + type: float + C4: + display_name: Cell 4 + type: float + C5: + display_name: Cell 5 + type: float + C6: + display_name: Cell 6 + type: float + C7: + display_name: Cell 7 + type: float + C8: + display_name: Cell 8 + type: float + C9: + display_name: Cell 9 + type: float + C10: + display_name: Cell 10 + type: float + +commands: {} diff --git a/examples/vucm/ni-daq/requirements.txt b/examples/vucm/ni-daq/requirements.txt new file mode 100644 index 0000000..081e9d2 --- /dev/null +++ b/examples/vucm/ni-daq/requirements.txt @@ -0,0 +1 @@ +enapter==0.9.2 diff --git a/examples/vucm/ni-daq/script.py b/examples/vucm/ni-daq/script.py new file mode 100644 index 0000000..fe3ae59 --- /dev/null +++ b/examples/vucm/ni-daq/script.py @@ -0,0 +1,91 @@ +import asyncio +import functools +import json +import os +import socket +from datetime import datetime + +import enapter + + +async def main(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) + sock.bind(("127.0.0.1", int(os.environ["LISTEN_TCP_PORT"]))) + sock.listen() + sock.setblocking(False) + device_factory = functools.partial(NIDAQ, socket=sock) + await enapter.vucm.run(device_factory) + + +class NIDAQ(enapter.vucm.Device): + def __init__(self, socket, **kwargs): + super().__init__(**kwargs) + self.socket = socket + + async def task_accept_conns(self): + sock = self.socket + while True: + (conn, addr) = await asyncio.get_event_loop().sock_accept(sock) + asyncio.create_task(self.handle_conn(conn, addr)) + + async def handle_conn(self, conn, addr): + print(addr, "accept") + data = bytearray() + try: + while True: + try: + async with asyncio.timeout(5): + chunk = await asyncio.get_event_loop().sock_recv(conn, 1024) + except TimeoutError: + print(addr, "timeout") + return + if chunk: + print(addr, "read chunk: ", chunk) + data.extend(chunk) + continue + print(addr, "read data: ", data) + await self._process_and_send_telemetry(data) + return + finally: + print(addr, "close") + conn.close() + + async def task_properties_sender(self): + """Periodically send device properties.""" + while True: + await self.send_properties( + {"vendor": "National Instruments", "model": "cDAQ 9178"} + ) + await asyncio.sleep(10) + + async def _process_and_send_telemetry(self, data): + """Parse, enrich, and send telemetry data.""" + telemetry = {} + status = "no_data" + try: + if data: + status = "ok" + telemetry = json.loads(data.decode()) + self._add_timestamp_if_present(telemetry) + telemetry["status"] = status + await self.send_telemetry(telemetry) + self.alerts.clear() + except Exception as e: + self.alerts.add("parse_error") + await self.log.error(f"Failed to process data: {e}") + + def _add_timestamp_if_present(self, telemetry): + """If 'Date' and 'Time' are present, combine and convert to timestamp.""" + date_str = telemetry.get("Date") + time_str = telemetry.get("Time") + if date_str and time_str: + dt_str = f"{date_str} {time_str}" + date = datetime.strptime(dt_str, "%d/%m/%Y %H:%M:%S") + telemetry.pop("Date") + telemetry.pop("Time") + telemetry["timestamp"] = int(date.timestamp()) + + +if __name__ == "__main__": + asyncio.run(main())