diff --git a/.gitignore b/.gitignore
index 3b2cf55..274b80d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -163,9 +163,6 @@ cython_debug/
-
-.vscode/
-
 # Ignore dynaconf secret files
 .secrets.*
diff --git a/.vscode/launch.json b/.vscode/launch.json
new file mode 100644
index 0000000..3aad8d6
--- /dev/null
+++ b/.vscode/launch.json
@@ -0,0 +1,59 @@
+{
+    // Use IntelliSense to learn about possible attributes.
+    // Hover to view descriptions of existing attributes.
+    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
+    "version": "0.2.0",
+    "configurations": [
+        {
+            "name": "tiled_ws_cli",
+            "type": "debugpy",
+            "request": "launch",
+            "module": "arroyosas.app.tiled_ws_cli",
+            "env": {
+                "TILED_URI": "https://tiled-dev.nsls2.bnl.gov"
+            },
+            "justMyCode": true
+        },
+        {
+            "name": "tiled_websocket",
+            "type": "debugpy",
+            "request": "launch",
+            "module": "arroyosas.tiled.tiled_websocket",
+            "env": {}
+        },
+        {
+            "name": "lse_operator_cli",
+            "type": "debugpy",
+            "request": "launch",
+            "module": "arroyosas.app.lse_operator_cli"
+        },
+        {
+            "name": "viz_operator_cli",
+            "type": "debugpy",
+            "request": "launch",
+            "module": "arroyosas.app.viz_operator_cli"
+        },
+        {
+            "name": "frame_listener_sim",
+            "type": "debugpy",
+            "request": "launch",
+            "module": "arroyosas.app.frame_listener_sim_cli"
+        },
+        {
+            "name": "tiled_event_simulator",
+            "type": "debugpy",
+            "request": "launch",
+            "module": "arroyosas.app.tiled_event_sim_cli",
+            "args": [
+                "tiled_event_logs"
+            ],
+            "env": {
+                "TILED_API_KEY": "32bf4b533d638f470270e0725d14a7bc153114641a7dfda58f96e7926160e9e126272add"
+            }
+        }
+    ]
+}
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index 45a96dd..479a0c3 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,13 +1,16 @@
 FROM python:3.11
+
+# Install git for cloning the tiled repository
+RUN apt-get update && apt-get install -y git && rm -rf /var/lib/apt/lists/*
+
 WORKDIR /app
+
 COPY . /app
 
-RUN pip install --upgrade pip && \
-    pip install .[all]
+RUN pip install .[all]
 
 RUN pip install debugpy
 
-
 CMD ["echo", "Hello World"]
diff --git a/Dockerfile.dev b/Dockerfile.dev
deleted file mode 100644
index ab7a6e7..0000000
--- a/Dockerfile.dev
+++ /dev/null
@@ -1,19 +0,0 @@
-FROM python:3.11
-
-WORKDIR /app
-
-COPY . /app
-
-
-RUN apt-get update \
-    && apt-get install -y --no-install-recommends git \
-    && rm -rf /var/lib/apt/lists/* \
-    && pip install debugpy
-
-RUN pip install --upgrade pip && \
-    pip install -e .[all]
-
-RUN pip install debugpy
-
-
-CMD ["echo", "Hello World"]
diff --git a/Dockerfile_frontend_dev b/Dockerfile_frontend_dev
new file mode 100644
index 0000000..5372625
--- /dev/null
+++ b/Dockerfile_frontend_dev
@@ -0,0 +1,9 @@
+FROM node:18
+
+WORKDIR /frontend
+COPY ./frontend/package*.json /frontend/
+RUN npm ci
+COPY ./frontend /frontend/
+COPY .env_frontend /frontend/.env
+
+
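Both images install debugpy for remote debugging. A minimal sketch of how a containerized service might expose the debug adapter for the VS Code launch configurations above — the port (5678) and the wait-for-client call are assumptions, not part of this diff:

```python
# Hypothetical sketch: enable remote attach inside a container.
# Port 5678 and the blocking wait are assumptions, not part of this PR.
import debugpy

debugpy.listen(("0.0.0.0", 5678))  # expose the debug adapter on all interfaces
debugpy.wait_for_client()          # optionally block until VS Code attaches
```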
diff --git a/frontend/.vscode/launch.json b/frontend/.vscode/launch.json
new file mode 100644
index 0000000..822e5d1
--- /dev/null
+++ b/frontend/.vscode/launch.json
@@ -0,0 +1,17 @@
+{
+    // Use IntelliSense to learn about possible attributes.
+    // Hover to view descriptions of existing attributes.
+    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
+    "version": "0.2.0",
+    "configurations": [
+        {
+            "type": "node",
+            "request": "launch",
+            "name": "Launch Program",
+            "skipFiles": [
+                "<node_internals>/**"
+            ],
+            "program": "${file}"
+        }
+    ]
+}
\ No newline at end of file
diff --git a/frontend/package-lock.json b/frontend/package-lock.json
index af78595..52f05cc 100644
--- a/frontend/package-lock.json
+++ b/frontend/package-lock.json
@@ -20,7 +20,7 @@
         "react-dom": "^18.3.1",
         "react-plotly.js": "^2.5.1",
         "react-router": "^7.1.5",
-        "react-scripts": "5.0.1",
+        "react-scripts": "^5.0.1",
         "react-tooltip": "^5.28.0",
         "web-vitals": "^2.1.4"
       },
@@ -17754,6 +17754,7 @@
       "version": "5.0.1",
       "resolved": "https://registry.npmjs.org/react-scripts/-/react-scripts-5.0.1.tgz",
       "integrity": "sha512-8VAmEm/ZAwQzJ+GOMLbBsTdDKOpuZh7RPs0UymvBR2vRk4iZWCskjbFnxqjrzoIvlNNRZ3QJFx6/qDSi6zSnaQ==",
+      "license": "MIT",
       "dependencies": {
         "@babel/core": "^7.16.0",
         "@pmmmwh/react-refresh-webpack-plugin": "^0.5.3",
diff --git a/frontend/package.json b/frontend/package.json
index 80d3356..13dbe74 100644
--- a/frontend/package.json
+++ b/frontend/package.json
@@ -15,7 +15,7 @@
     "react-dom": "^18.3.1",
     "react-plotly.js": "^2.5.1",
     "react-router": "^7.1.5",
-    "react-scripts": "5.0.1",
+    "react-scripts": "^5.0.1",
     "react-tooltip": "^5.28.0",
     "web-vitals": "^2.1.4"
   },
diff --git a/frontend/src/components/Header.jsx b/frontend/src/components/Header.jsx
index a3f6173..3bc6cee 100644
--- a/frontend/src/components/Header.jsx
+++ b/frontend/src/components/Header.jsx
@@ -18,7 +18,7 @@ export default function Header({isExperimentRunning=false, showStatus=false, sta
           Build:{buildNumber}
-          GISAXS Data Viewer
+          SAS Data Viewer
           illumine logo ml exchange logo mwet logo
diff --git a/frontend/src/utils/connectionHelper.js b/frontend/src/utils/connectionHelper.js
index 66f9623..defcfca 100644
--- a/frontend/src/utils/connectionHelper.js
+++ b/frontend/src/utils/connectionHelper.js
@@ -4,7 +4,7 @@
  */
 const getWsUrl = () => {
   const currentWebsiteIP = window.location.hostname;
-  const pathname = "/viz";
+  const pathname = "/ws";
   const port = window.location.port;
   var wsUrl;
diff --git a/pyproject.toml b/pyproject.toml
index ff48065..9965525 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -8,36 +8,10 @@
 name = "arroyoSAS"
 version = "0.0.1"
 description = "A package to perform computations and suggestions during a time-resolved GISAXS experiment."
 authors = [{ name = "Dylan McReynolds", email = "dmcreynolds@lbl.gov" }]
-dependencies = [
-    "arroyopy",
-    "dynaconf",
-    "python-dotenv",
-    "pandas",
-    "msgpack",
-    "numpy",
-    "Pillow",
-    "bluesky-tiled-plugins==2.0.0b57",
-    "pyzmq",
-    "tqdm",
-    "typer",
-    "websockets",
-    "zarr",
-    "pyFAI",
-    "transformers",
-    "redis"]
+dependencies = []
 
 [project.optional-dependencies]
-workflow-viz = [
-    "joblib",
-    "numpy<2.0.0",
-    "pandas",
-    "plotly-express",
-    "pyFAI==2023.9.0",
-    "python-dotenv",
-    "pyarrow>=14.0.1",
-    "requests==2.26.0",
-    "diskcache==5.6.3",
-    "scikit-learn==1.3.0"]
+
 
 dev = [
     "fakeredis",
@@ -45,7 +19,6 @@ dev = [
     "pre-commit",
     "pytest-asyncio",
     "pytest-mock",
-    "tiled[server] >=0.1.0b14, <0.2"
 ]
 
 lse = [
@@ -53,6 +26,10 @@ lse = [
     "joblib"
 ]
 
+viz = [
+
+]
+
 all = [
     "arroyopy",
     "bluesky",
@@ -61,20 +38,19 @@
     "pandas",
     "msgpack",
     "numpy",
-    "ophyd",
+    "tiled[client] >=0.1.0b14, <0.2",
     "Pillow",
     "pyzmq",
     "tqdm",
     "typer",
     "websockets",
-    "tiled[client] >=0.1.0b14, <0.2",
     "redis",
     "numpy<2.0.0",
     "pandas",
     "python-dotenv",
     "pyarrow>=14.0.1",
+    "pyFAI==2025.03",
    "aiosqlite",
-
 ]
 
 [tool.isort]
diff --git a/settings_container.yaml b/settings_container.yaml
deleted file mode 100644
index e825dc4..0000000
--- a/settings_container.yaml
+++ /dev/null
@@ -1,17 +0,0 @@
----
-logging_level: INFO
-redis_host: redis
-resis_port: 6379
-models:
-  models_root_dir: /data/models
-  torch_models:
-    - name: CNNAutoencoder
-      checkpoint_dir: /torch/cnnautoencoder
-      python_class: CNNAutoencoder
-
-  dim_reduction_models:
-    - name: PCA
-      dir: /dim_reduction/pca
-
-  # - name: UMAP
-  #   directory: /dim_reduction/umap
diff --git a/settings_dev.yaml b/settings_dev.yaml
new file mode 100644
index 0000000..5f0d8ea
--- /dev/null
+++ b/settings_dev.yaml
@@ -0,0 +1,23 @@
+---
+logging_level: INFO
+redis_host: redis
+redis_port: 6379
+
+viz_operator:
+  listeners:
+    listener:
+      zmq_address: tcp://tiled_socket:5000 # only safe in containers
+
+  operator:
+    redis:
+      host: kvrocks
+      port: 6666
+
+  publishers:
+    websockets:
+      host: 0.0.0.0
+      port: 8021
+    tiled:
+      raw:
+        uri: https://tiled-dev.nsls2.bnl.gov
+        api_key: "@format {env[TILED_API_KEY]}"
\ No newline at end of file
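settings_dev.yaml pulls the Tiled API key from the environment through dynaconf's `@format` token. A minimal sketch of loading it, assuming a plain `Dynaconf` loader (the project's actual config module may differ):

```python
# Minimal sketch of loading these settings with dynaconf.
# The settings file name and TILED_API_KEY env var come from this diff;
# the Dynaconf arguments are an assumption about the project's config module.
import os

from dynaconf import Dynaconf

os.environ.setdefault("TILED_API_KEY", "dummy-key-for-local-testing")

settings = Dynaconf(settings_files=["settings_dev.yaml"])
# "@format {env[TILED_API_KEY]}" is interpolated lazily, at access time:
print(settings.viz_operator.publishers.tiled.raw.api_key)
```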
diff --git a/src/arroyosas/app/frame_listener_sim_cli.py b/src/arroyosas/app/frame_listener_sim_cli.py
index b265166..a43166f 100644
--- a/src/arroyosas/app/frame_listener_sim_cli.py
+++ b/src/arroyosas/app/frame_listener_sim_cli.py
@@ -22,9 +22,9 @@
 onto ZMQ, taking care of pydantic messages, serialization and msgpack
 """
 
-FRAME_WIDTH = 1475
+FRAME_WIDTH = 475
 FRAME_HEIGHT = 619
-DATA_TYPE = "float32"
+DATA_TYPE = "uint8"
 
 app = typer.Typer()
@@ -59,6 +59,7 @@ async def process_images(
         )
         print("event")
         await socket.send(msgpack.packb(event.model_dump()))
+        await asyncio.sleep(0.5)
     stop = SASStop(num_frames=frames)
     print("stop")
     await socket.send(msgpack.packb(stop.model_dump()))
@@ -76,7 +77,7 @@ async def run():
     print(f"Cycles: {cycles}, Frames: {frames}, Pause: {pause}")
     context = zmq.asyncio.Context()
     socket = context.socket(zmq.PUB)
-    address = settings.tiled_poller.zmq_frame_publisher.address
+    address = settings.frame_listener_sim.zmq_frame_publisher.address
     print(f"Connecting to {address}")
     socket.bind(address)
     await process_images(socket, cycles, frames, pause)
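The simulator publishes msgpack-packed pydantic dumps on a ZMQ PUB socket. A minimal subscriber sketch; the localhost address is an assumption for local testing (settings_dev.yaml binds tcp://tiled_socket:5000 inside containers):

```python
# Minimal SUB-side sketch for the simulator's PUB socket.
# The address is an assumption for local use; the msgpack-over-ZMQ
# framing follows the simulator code in this diff.
import asyncio

import msgpack
import zmq
import zmq.asyncio


async def main() -> None:
    context = zmq.asyncio.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:5000")
    socket.setsockopt_string(zmq.SUBSCRIBE, "")  # subscribe to every message
    while True:
        message = msgpack.unpackb(await socket.recv())
        # Each message is a model_dump() of SASStart / RawFrameEvent / SASStop
        print(message.keys() if isinstance(message, dict) else message)


asyncio.run(main())
```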
diff --git a/src/arroyosas/app/lse_operator_cli.py b/src/arroyosas/app/lse_operator_cli.py
deleted file mode 100644
index 2a1fb7e..0000000
--- a/src/arroyosas/app/lse_operator_cli.py
+++ /dev/null
@@ -1,40 +0,0 @@
-import asyncio
-import logging
-
-import typer
-
-from ..config import settings
-from ..log_utils import setup_logger
-from ..lse.lse_operator import LatentSpaceOperator
-from ..lse.lse_ws_publisher import LSEWSResultPublisher
-from ..tiled.tiled_poller import TiledProcessedPublisher
-from ..zmq import ZMQFrameListener
-
-app = typer.Typer()
-logger = logging.getLogger("arroyosas")
-setup_logger(logger, settings.logging_level)
-
-
-@app.command()
-async def start() -> None:
-    app_settings = settings.lse_operator
-    logger.info("Getting settings")
-    logger.info(f"{settings.lse_operator}")
-
-    logger.info("Starting ZMQ PubSub Listener")
-    logger.info(f"ZMQPubSubListener settings: {app_settings}")
-    operator = LatentSpaceOperator.from_settings(app_settings, settings.lse_reducer)
-
-    ws_publisher = LSEWSResultPublisher.from_settings(app_settings.ws_publisher)
-    tiled_event_publisher = TiledProcessedPublisher.from_settings(
-        settings.tiled_processed
-    )
-    operator.add_publisher(ws_publisher)
-    operator.add_publisher(tiled_event_publisher)
-
-    listener = ZMQFrameListener.from_settings(app_settings.listener, operator)
-    await asyncio.gather(listener.start(), ws_publisher.start())
-
-
-if __name__ == "__main__":
-    asyncio.run(start())
\ No newline at end of file
diff --git a/src/arroyosas/app/tiled_event_sim_cli.py b/src/arroyosas/app/tiled_event_sim_cli.py
index 5ef335d..3ba5887 100644
--- a/src/arroyosas/app/tiled_event_sim_cli.py
+++ b/src/arroyosas/app/tiled_event_sim_cli.py
@@ -208,7 +208,7 @@ def main(
     log_dir: str = typer.Argument(..., help="Directory containing Tiled event logs"),
     host: str = typer.Option("0.0.0.0", help="Host to bind the WebSocket server to"),
     port: int = typer.Option(8000, help="Port to bind the WebSocket server to"),
-    stream_path: str = typer.Option("/stream", help="WebSocket path for the stream endpoint"),
+    stream_path: str = typer.Option("/", help="WebSocket path for the stream endpoint"),
     run_id: str = typer.Option(None, help="Specific run ID to replay (defaults to first run found)")
 ):
     """
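With `stream_path` now defaulting to "/", a replay client connects at the server root. A minimal sketch using the `websockets` package (already a project dependency); the host and port match the CLI defaults above, and the message handling is illustrative only:

```python
# Minimal client sketch for the event simulator's replay stream.
# ws://localhost:8000/ matches the CLI defaults (stream_path is now "/");
# what each replayed message contains is an assumption here.
import asyncio

import websockets


async def main() -> None:
    async with websockets.connect("ws://localhost:8000/") as ws:
        async for message in ws:
            print(f"received {len(message)} bytes")


asyncio.run(main())
```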
diff --git a/src/arroyosas/app/tiled_ws_cli.py b/src/arroyosas/app/tiled_ws_cli.py
index 445f0e3..1fc818b 100644
--- a/src/arroyosas/app/tiled_ws_cli.py
+++ b/src/arroyosas/app/tiled_ws_cli.py
@@ -1,23 +1,24 @@
 import asyncio
 import logging
+import time
 
 import typer
 
 from arroyosas.config import settings
 from arroyosas.log_utils import setup_logger
 from arroyosas.tiled.tiled_poller import TiledRawFrameOperator
-from arroyosas.tiled.tiled_websocket import TiledWebSocketListener
+from arroyosas.tiled.tiled_websocket import TiledClientListener
 from arroyosas.zmq import ZMQFramePublisher
 
 app = typer.Typer()
 logger = logging.getLogger("arroyosas")
-app_settings = settings.tiled_poller
+app_settings = settings.tiled_ws_listener
 setup_logger(logger, log_level=settings.logging_level)
 
 
 @app.command()
-async def start(
+def start(
     tiled_url: str = typer.Option(None, help="Tiled base URL"),
     websocket_url: str = typer.Option(None, help="WebSocket URL"),
     zmq_url: str = typer.Option(None, help="ZMQ publisher URL"),
@@ -47,9 +48,8 @@ async def start(
     operator.add_publisher(publisher)
 
     # Create and start listener
-    listener = TiledWebSocketListener.from_settings(app_settings, operator)
-    await listener.start()
-
+    listener = TiledClientListener.from_settings(app_settings, operator)
+    asyncio.run(listener.start())
 
 if __name__ == "__main__":
-    asyncio.run(start())
+    app()
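Typer invokes commands synchronously, which is why `start` becomes a plain function that drives the coroutine with `asyncio.run`. A generic sketch of the pattern (the command and coroutine names here are hypothetical):

```python
# Generic sketch of the pattern applied above: Typer calls commands
# synchronously, so the async work is wrapped with asyncio.run.
import asyncio

import typer

app = typer.Typer()


async def _serve() -> None:
    await asyncio.sleep(1)  # stand-in for listener.start()


@app.command()
def start() -> None:
    asyncio.run(_serve())


if __name__ == "__main__":
    app()
```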
diff --git a/src/arroyosas/app/viz_operator_cli.py b/src/arroyosas/app/viz_operator_cli.py
index bc31f48..aad0051 100644
--- a/src/arroyosas/app/viz_operator_cli.py
+++ b/src/arroyosas/app/viz_operator_cli.py
@@ -8,7 +8,7 @@
 from ..config import settings
 from ..log_utils import setup_logger
 from ..one_d_reduction.operator import OneDReductionOperator
-from ..tiled.tiled_poller import TiledProcessedPublisher
+from ..tiled.publisher import Tiled1DResultsPublisher
 from ..websockets import OneDWSPublisher
 
 app = typer.Typer()
@@ -22,14 +22,14 @@ async def start():
     logger.info("Starting Tiled Poller")
     logger.info("Getting settings")
     logger.info(f"{settings.viz_operator}")
-    operator = OneDReductionOperator.from_settings(app_settings)
-    tiled_event_publisher = TiledProcessedPublisher.from_settings(
-        settings.tiled_processed
+    operator = OneDReductionOperator.from_settings(app_settings.operator)
+    tiled_event_publisher = Tiled1DResultsPublisher.from_settings(
+        app_settings.publishers.tiled
     )
     ws_publisher = OneDWSPublisher.from_settings(app_settings.ws_publisher)
     operator.add_publisher(ws_publisher)
     operator.add_publisher(tiled_event_publisher)
-    listener = ZMQFrameListener.from_settings(app_settings.listener, operator)
+    listener = ZMQFrameListener.from_settings(app_settings.listeners.listener, operator)
     await asyncio.gather(listener.start(), ws_publisher.start())
diff --git a/src/arroyosas/one_d_reduction/operator.py b/src/arroyosas/one_d_reduction/operator.py
index 5aee943..c6dc7fd 100644
--- a/src/arroyosas/one_d_reduction/operator.py
+++ b/src/arroyosas/one_d_reduction/operator.py
@@ -25,9 +25,8 @@
 class OneDReductionOperator(Operator):
-    def __init__(self, tiled_client: BaseClient, redis_conn: RedisConn):
+    def __init__(self, redis_conn: RedisConn):
         super().__init__()
-        self.tiled_client = tiled_client
         self.redis_conn = redis_conn
         self.current_scan_metadata = None
         # self.mask = None
@@ -163,7 +162,4 @@ def generate_masked_image(image, mask):
     @classmethod
     def from_settings(cls, settings) -> "OneDReductionOperator":
         redis_conn = RedisConn.from_settings(settings.redis)
-        tiled_client = from_uri(
-            settings.tiled.raw.uri, api_key=settings.tiled.raw.api_key
-        )
-        return cls(tiled_client, redis_conn)
+        return cls(redis_conn)
diff --git a/src/arroyosas/tiled/publisher.py b/src/arroyosas/tiled/publisher.py
new file mode 100644
index 0000000..f965774
--- /dev/null
+++ b/src/arroyosas/tiled/publisher.py
@@ -0,0 +1,307 @@
+import asyncio
+import logging
+import re
+import time
+from datetime import datetime
+
+import numpy as np
+import pandas as pd
+import pytz
+from arroyopy.publisher import Publisher
+from tiled.client import from_uri
+
+from ..schemas import SAS1DReduction
+
+logger = logging.getLogger(__name__)
+
+# Matches a canonical UUID inside a tiled_url (the pattern is an assumption;
+# the original code referenced UUID_PATTERN without defining it).
+UUID_PATTERN = r"([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
+
+# One container per day, e.g. "2025-07-26" (the timezone is an assumption;
+# the original code referenced DAILY_RUN_ID without defining it).
+DAILY_RUN_ID = datetime.now(pytz.timezone("America/New_York")).strftime("%Y-%m-%d")
+
+
+class Tiled1DResultsPublisher(Publisher):
+    """Publisher that saves 1D reduction results to a Tiled server."""
+
+    def __init__(self, tiled_uri, tiled_api_key, root_segments=None):
+        super().__init__()
+        self.tiled_uri = tiled_uri
+        self.tiled_api_key = tiled_api_key
+        self.root_segments = root_segments or ["1D_live_results"]
+        self.client = None
+        self.root_container = None
+        self.daily_container = None
+        # State referenced throughout but missing from the original file:
+        self.default_table_name = "unknown_run"  # fallback when no UUID is found
+        self.existing_uuids = set()  # table keys already written to Tiled
+        self.uuid_dataframes = {}  # per-UUID buffers of rows awaiting a write
+        self.current_uuid = None  # UUID currently being collected
+
+        logger.info("Initialized publisher with UUID-based table grouping")
+
+    async def start(self):
+        """Connect to Tiled server and initialize containers."""
+        try:
+            # Run the entire initialization in a separate thread
+            await asyncio.to_thread(self._start_sync)
+        except Exception as e:
+            logger.error(f"Failed to initialize Tiled client: {e}")
+            import traceback
+            logger.error(traceback.format_exc())
+
+    def _start_sync(self):
+        """Synchronous implementation of start() to be run in a thread."""
+        try:
+            self.client = from_uri(self.tiled_uri, api_key=self.tiled_api_key)
+
+            # Navigate to the root container and create the daily run container inside it
+            self._setup_containers_sync()
+
+            # List all existing tables in the daily container
+            if self.daily_container is not None:
+                table_keys = list(self.daily_container)
+                logger.info(f"Found {len(table_keys)} existing tables in daily container")
+
+                # Add all existing tables to our set of existing UUIDs
+                self.existing_uuids.update(table_keys)
+                logger.info(f"Tracking {len(self.existing_uuids)} existing UUIDs")
+
+                # Log some examples of existing UUIDs for debugging
+                if self.existing_uuids:
+                    examples = list(self.existing_uuids)[:3]
+                    logger.info(f"Examples of existing UUIDs: {', '.join(examples)}")
+
+            logger.info(f"Connected to Tiled server at {self.tiled_uri}")
+            logger.info(f"Using container path: {'/'.join(self.root_segments)}/{DAILY_RUN_ID}")
+        except Exception as e:
+            logger.error(f"Error in _start_sync: {e}")
+            import traceback
+            logger.error(traceback.format_exc())
+            raise  # Re-raise so the async method can catch it
+
+    def _extract_uuid_from_url(self, url):
+        """Extract UUID from tiled_url."""
+        if not url:
+            return self.default_table_name
+
+        # Log the URL for debugging
+        logger.debug(f"Extracting UUID from URL: {url}")
+
+        match = re.search(UUID_PATTERN, url)
+        if match:
+            uuid = match.group(1)
+            logger.debug(f"Extracted UUID: {uuid}")
+            return uuid
+
+        logger.debug(f"No UUID found in URL, using default: {self.default_table_name}")
+        return self.default_table_name
+
+    def _setup_containers_sync(self):
+        pass
+        # """Set up the container structure (synchronous version)."""
+        # try:
+        #     # Navigate through root_segments
+        #     container = self.client
+        #     for segment in self.root_segments:
+        #         if segment in container:
+        #             logger.info(f"Using existing container: {segment}")
+        #             container = container[segment]
+        #         else:
+        #             logger.info(f"Creating container: {segment}")
+        #             container = container.create_container(segment)
+
+        #     # Store reference to the root container
+        #     self.root_container = container
+
+        #     # Now create the daily run container inside the root container
+        #     if DAILY_RUN_ID not in self.root_container:
+        #         logger.info(f"Creating daily container: {DAILY_RUN_ID}")
+        #         self.root_container.create_container(DAILY_RUN_ID)
+        #     else:
+        #         logger.info(f"Using existing daily container: {DAILY_RUN_ID}")
+
+        #     # Store reference to daily container
+        #     self.daily_container = self.root_container[DAILY_RUN_ID]
+
+        # except Exception as e:
+        #     logger.error(f"Error setting up containers: {e}")
+        #     import traceback
+        #     logger.error(traceback.format_exc())
+        #     raise  # Re-raise to propagate the error
+
+    async def publish(self, message):
+        """Publish a message to Tiled server."""
+        if not isinstance(message, SAS1DReduction):
+            return
+
+        try:
+            # Run the entire publish operation in a separate thread
+            uuid_to_write = await asyncio.to_thread(self._publish_sync, message)
+
+            # If there's a UUID to write, write it
+            if uuid_to_write:
+                await self.write_table_to_tiled(uuid_to_write)
+
+        except Exception as e:
+            logger.error(f"Error publishing to Tiled: {e}")
+            import traceback
+            logger.error(traceback.format_exc())
+
+    def _publish_sync(self, message):
+        """Synchronous implementation of publish() to be run in a thread."""
+        try:
+            # Ensure daily container exists
+            if self.daily_container is None:
+                logger.error("Daily container not initialized, cannot publish")
+                return None
+
+            # Format vector and metadata
+            vector = np.array(message.feature_vector, dtype=np.float32)
+            if vector.ndim == 1:
+                # Extract UUID from tiled_url
+                tiled_url = getattr(message, "tiled_url", None)
+                uuid = self._extract_uuid_from_url(tiled_url)
+
+                # Check if this UUID already exists
+                if uuid in self.existing_uuids:
+                    logger.debug(f"Skipping vector for existing UUID: {uuid}")
+                    return None
+
+                # Check if this is a new UUID
+                uuid_to_write = None
+
+                if self.current_uuid is not None and uuid != self.current_uuid and self.current_uuid in self.uuid_dataframes:
+                    # We have a new UUID, so write the data for the previous UUID (if it's not an existing UUID)
+                    if self.current_uuid not in self.existing_uuids and not self.uuid_dataframes[self.current_uuid].empty:
+                        logger.info(f"New UUID detected, marking previous UUID for writing: {self.current_uuid}")
+                        uuid_to_write = self.current_uuid
+
+                # Update current UUID
+                self.current_uuid = uuid
+
+                # Initialize tracking for this UUID if needed
+                if uuid not in self.uuid_dataframes:
+                    self.uuid_dataframes[uuid] = pd.DataFrame()
+
+                # Create a record with metadata and the vector
+                record = {
+                    "tiled_url": tiled_url,
+                    "autoencoder_model": getattr(message, "autoencoder_model", None),
+                    "dimred_model": getattr(message, "dimred_model", None),
+                    "timestamp": getattr(message, "timestamp", time.time()),
+                    "total_processing_time": getattr(message, "total_processing_time", None),
+                    "autoencoder_time": getattr(message, "autoencoder_time", None),
+                    "dimred_time": getattr(message, "dimred_time", None)
+                }
+
+                # Add vector elements as columns (limit to first 20 to keep it manageable)
+                for i, val in enumerate(vector[:20]):
+                    record[f"feature_{i}"] = float(val)
+
+                # Append to DataFrame for this UUID
+                new_row = pd.DataFrame([record])
+                self.uuid_dataframes[uuid] = pd.concat([self.uuid_dataframes[uuid], new_row], ignore_index=True)
+
+                logger.debug(f"Added vector to table '{uuid}'")
+
+                return uuid_to_write
+            else:
+                logger.warning(f"Received vector with unexpected dimensions: {vector.shape}")
+                return None
+        except Exception as e:
+            logger.error(f"Error in _publish_sync: {e}")
+            import traceback
+            logger.error(traceback.format_exc())
+            return None
+
+    async def write_table_to_tiled(self, table_key):
+        """Write the collected vectors for a specific UUID to Tiled."""
+        try:
+            # Run the write operation in a separate thread
+            await asyncio.to_thread(self._write_table_to_tiled_sync, table_key)
+        except Exception as e:
+            logger.error(f"Error in write_table_to_tiled for {table_key}: {e}")
+            import traceback
+            logger.error(traceback.format_exc())
+
+    def _write_table_to_tiled_sync(self, table_key):
+        """Synchronous implementation of write_table_to_tiled to be run in a thread."""
+        try:
+            # Check if this UUID already exists
+            if table_key in self.existing_uuids:
+                logger.info(f"Skipping write for existing UUID: {table_key}")
+                return
+
+            # Get the DataFrame for this UUID
+            df = self.uuid_dataframes.get(table_key)
+            if df is None:
+                logger.warning(f"No DataFrame found for {table_key}")
+                return
+
+            # Log DataFrame info for debugging
+            logger.info(f"Writing {len(df)} vectors to new table '{table_key}'")
+
+            # Check if DataFrame is empty
+            if df.empty:
+                logger.warning(f"DataFrame for {table_key} is empty, nothing to write")
+                return
+
+            # Simply write the DataFrame to Tiled
+            try:
+                # Use write_dataframe with the UUID as the key
+                self.daily_container.write_dataframe(df, key=table_key)
+
+                logger.info(f"Successfully wrote {len(df)} vectors to '{table_key}'")
+
+                # Add this UUID to our set of existing UUIDs
+                self.existing_uuids.add(table_key)
+
+                # Clear the DataFrame for this UUID
+                self.uuid_dataframes[table_key] = pd.DataFrame()
+
+            except Exception as e:
+                logger.error(f"Error writing DataFrame for {table_key}: {e}")
+                import traceback
+                logger.error(traceback.format_exc())
+
+        except Exception as e:
+            logger.error(f"Error in _write_table_to_tiled_sync for {table_key}: {e}")
+            import traceback
+            logger.error(traceback.format_exc())
+
+    async def stop(self):
+        """Write any remaining data for new UUIDs before stopping."""
+        try:
+            # Run the stopping operation in a separate thread to get UUID to write
+            uuid_to_write = await asyncio.to_thread(self._stop_sync)
+
+            # If there's a UUID to write, write it
+            if uuid_to_write:
+                logger.info(f"Writing final data for UUID: {uuid_to_write}")
+                await self.write_table_to_tiled(uuid_to_write)
+
+            logger.info("Publisher stopped")
+        except Exception as e:
+            logger.error(f"Error stopping publisher: {e}")
+            import traceback
+            logger.error(traceback.format_exc())
+
+    def _stop_sync(self):
+        """Synchronous implementation of stop() to be run in a thread.
+
+        Returns:
+            str or None: UUID that needs to be written, or None if no writing needed
+        """
+        try:
+            logger.info("Publisher stopping, checking if current UUID needs writing")
+
+            # Check if the current UUID needs writing
+            if (self.current_uuid is not None and
+                    self.current_uuid not in self.existing_uuids and
+                    self.current_uuid in self.uuid_dataframes and
+                    not self.uuid_dataframes[self.current_uuid].empty):
+
+                return self.current_uuid
+
+            return None
+        except Exception as e:
+            logger.error(f"Error in _stop_sync: {e}")
+            import traceback
+            logger.error(traceback.format_exc())
+            return None
+
+    @classmethod
+    def from_settings(cls, settings):
+        """Create a Tiled1DResultsPublisher from settings.
+
+        Assumes a settings node shaped like viz_operator.publishers.tiled in
+        settings_dev.yaml (a "raw" block holding uri and api_key).
+        """
+        return cls(
+            settings.raw.uri,
+            settings.raw.api_key,
+            root_segments=settings.get("root_segments"),
+        )
\ No newline at end of file
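A hypothetical wiring sketch for the new publisher; the URI and API key are placeholders, and the lifecycle calls follow the class above:

```python
# Hypothetical usage of Tiled1DResultsPublisher; the URI and API key are
# placeholders, not values from this PR.
import asyncio

from arroyosas.tiled.publisher import Tiled1DResultsPublisher


async def main() -> None:
    publisher = Tiled1DResultsPublisher(
        "https://tiled-dev.nsls2.bnl.gov",
        "example-api-key",
        root_segments=["1D_live_results"],
    )
    await publisher.start()  # connects to Tiled in a worker thread
    # publish() silently ignores anything that is not a SAS1DReduction
    await publisher.stop()   # flushes any buffered rows for the current UUID


asyncio.run(main())
```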
diff --git a/src/arroyosas/tiled/tiled_websocket.py b/src/arroyosas/tiled/tiled_websocket.py
index 6c8e78b..cfd62e6 100644
--- a/src/arroyosas/tiled/tiled_websocket.py
+++ b/src/arroyosas/tiled/tiled_websocket.py
@@ -6,18 +6,18 @@
 from collections import defaultdict
 from typing import Any, Dict
 
-# import numpy as np
+from arroyosas.schemas import (  # SASStop,; SerializableNumpyArrayModel,
+    RawFrameEvent,
+    SASMessage,
+    SASStart,
+    SerializableNumpyArrayModel
+)
 from arroyopy.listener import Listener
 from arroyopy.operator import Operator
 from tiled.client import from_uri
 from tiled.client.base import BaseClient
 from tiled.client.stream import Subscription
 
-from arroyosas.schemas import (  # SASStop,; SerializableNumpyArrayModel,
-    RawFrameEvent,
-    SASMessage,
-    SASStart,
-)
 
 logger = logging.getLogger(__name__)
@@ -35,15 +35,16 @@
         self,
         operator: Operator,
         tiled_client: BaseClient,
-        stream_name: str,
-        target: str = "img",
-        create_run_logs: bool = True,
+        stream_name: str = "primary",
+        target: str = "pil2M_image",
+        create_run_logs: bool = False,
         log_dir: str = "tiled_logs",
     ):
+        self.operator = operator
         self.tiled_client = tiled_client
-        self.stream_name = stream_name
-        self.target = target
+        self.stream_name = stream_name  # name of the stream to listen to, e.g. 'primary'
+        self.target = target  # name of the data source to listen to, e.g. 'pil2M_image'
         self.create_run_logs = create_run_logs
         if not os.path.exists(log_dir):
             os.makedirs(log_dir, exist_ok=True)
@@ -57,109 +58,139 @@
     def on_new_run(self, sub: Subscription, data: Dict[str, Any]):
         """
         Handle new run events by creating a subscription for the run.
         """
-        uid = data["key"]
-        logger.debug(f"New run {uid}") if logger.isEnabledFor(logging.DEBUG) else None
-
-        # Create new folder for this run
-        if self.create_run_logs:
-            self.create_run_folder(uid)
-            self.log_message_to_json("on_new_run", sub, data)
-
-        # Subscribe to the run
-        run_sub = Subscription(self.tiled_client.context, [uid], start=0)
-        run_sub.add_callback(self.on_streams_namespace)
-        run_sub.start()
-        # Publish start event
-        self.publish_start(data)
+        try:
+            uid = data["key"]
+            logger.debug(f"New run {uid}") if logger.isEnabledFor(logging.DEBUG) else None
+
+            # Create new folder for this run
+            if self.create_run_logs:
+                self.create_run_folder(uid)
+                self.log_message_to_json("on_new_run", sub, data)
+
+            # Subscribe to the run
+            run_sub = Subscription(self.tiled_client.context, [uid], start=0)
+            run_sub.add_callback(self.on_streams_namespace)
+            run_sub.start()
+            # Publish start event
+            self.publish_start(data)
+        except Exception as e:
+            logger.error(f"Error processing new run {data}: {e}")
+            return
 
     def on_streams_namespace(self, sub, data):
         """
         Handle new streams namespace events by subscribing to the 'streams' segment.
         For example, this might be the creation of 'baseline' or 'primary' streams.
         """
-        logger.debug(data) if logger.isEnabledFor(logging.DEBUG) else None
-
-        # Log the event
-        if self.create_run_logs:
-            self.log_message_to_json("on_streams_namespace", sub, data)
-
-        streams_sub = Subscription(
-            self.tiled_client.context, sub.segments + ["streams"], start=0
-        )
-        streams_sub.add_callback(self.on_new_stream)
-        streams_sub.start()
+        try:
+            logger.debug(data) if logger.isEnabledFor(logging.DEBUG) else None
+
+            # Log the event
+            if self.create_run_logs:
+                self.log_message_to_json("on_streams_namespace", sub, data)
+            streams_sub = Subscription(
+                self.tiled_client.context, sub.segments + ["streams"], start=0
+            )
+            streams_sub.add_callback(self.on_new_stream)
+            streams_sub.start()
+        except Exception as e:
+            logger.error(f"Error processing streams namespace {data}: {e}")
+            return
 
     def on_new_stream(self, sub, data):
         """
         Handle new stream
         """
-        logger.debug(data) if logger.isEnabledFor(logging.DEBUG) else None
-        stream_name = data["key"]
-        logger.info(f"new stream {stream_name}") if logger.isEnabledFor(
-            logging.INFO
-        ) else None
-
-        if self.create_run_logs:
-            self.log_message_to_json("on_new_stream", sub, data)
-
-        stream_sub = Subscription(
-            self.tiled_client.context, sub.segments + [stream_name], start=0
-        )
-        stream_sub.add_callback(self.on_node_in_stream)
-        stream_sub.start()
-
-    def on_event(self, sub: Subscription, data: Dict[str, Any]) -> None:
-        """
-        Handle new event
-        """
-
-        logger.info(data) if logger.isEnabledFor(logging.INFO) else None
-        if self.create_run_logs:
-            self.log_message_to_json("on_event", sub, data)
+        try:
+            logger.debug(data) if logger.isEnabledFor(logging.DEBUG) else None
+            stream_name = data["key"]
+            if stream_name != self.stream_name:
+                logger.debug(f"Skipping stream {stream_name}, not the target stream") if logger.isEnabledFor(
+                    logging.DEBUG
+                ) else None
+                return
+            logger.info(f"new stream {stream_name}") if logger.isEnabledFor(
+                logging.INFO
+            ) else None
+
+            if self.create_run_logs:
+                self.log_message_to_json("on_new_stream", sub, data)
+
+            stream_sub = Subscription(
+                self.tiled_client.context, sub.segments + [stream_name], start=0
+            )
+            stream_sub.add_callback(self.on_node_in_stream)
+            stream_sub.start()
+        except Exception as e:
+            logger.error(f"Error processing new stream {data}: {e}")
+            return
+
+    def load_data(self, sub, data):
+        try:
+            patch = data['patch']
+            logger.debug(data['uri']) if logger.isEnabledFor(logging.DEBUG) else None
+            slice_ = tuple(
+                slice(offset, offset + shape)
+                for offset, shape in zip(patch["offset"], patch["shape"])
+            )
+            # GET /metadata/... -- wasteful to do on each load_data call
+            node = self.tiled_client['/'.join(sub.segments)]
+            # GET /array/full/... ; could be sub.node.read(...)
+            data['images'] = node.read(slice=slice_)
+            logger.debug(f"images shape {data['images'].shape}") if logger.isEnabledFor(logging.DEBUG) else None
+            self.publish_event(data)
+        except Exception as e:
+            logger.error(f"Error loading data for {sub.segments}: {e}")
+            return
 
     def on_node_in_stream(self, sub, data):
         logger.debug(data) if logger.isEnabledFor(logging.DEBUG) else None
         key = data["key"]
-
         if self.create_run_logs:
             self.log_message_to_json("on_node_in_stream", sub, data)
-        # Log what we're comparing for debugging
-        logger.info(
-            f"Checking key '{key}' against target '{self.target}'"
-        ) if logger.isEnabledFor(logging.INFO) else None
-
         if key != self.target:
-            logger.info(
-                f"Key '{key}' does not match target '{self.target}', skipping"
-            ) if logger.isEnabledFor(logging.INFO) else None
+            logger.debug(f"Skipping node {key}, not the target node") if logger.isEnabledFor(
+                logging.DEBUG
+            ) else None
             return
-
-        logger.info(
-            f"Key '{key}' matches target '{self.target}', proceeding"
-        ) if logger.isEnabledFor(logging.INFO) else None
-
-        stream_sub = Subscription(
-            self.tiled_client.context, sub.segments + [key], start=0
-        )
-        # stream_sub.add_callback(print)
-        stream_sub.add_callback(self.on_event)
-        stream_sub.start()
-        self.publish_event(data)
+        try:
+            stream_sub = Subscription(
+                self.tiled_client.context, sub.segments + [key], start=0
+            )
+            stream_sub.add_callback(self.load_data)
+            stream_sub.start()
+        except Exception as e:
+            logger.error(f"Error processing node {sub.segments + [key]}: {e}")
 
     async def start(self) -> None:
         """Start the listener by calling _start method."""
         self._running = True
         await asyncio.to_thread(self._start)
+        while True:
+            if not self._running:
+                break
+            await asyncio.sleep(1)
 
     def _start(self) -> None:
-        """Subscribe to the socket at the provided base segments level"""
+        """
+        Subscribe to the socket at the provided base segments level.
+
+        When TiledWriter puts bluesky data into tiled, it lands in a newish tree
+        (names in quotes are literal):
+
+        catalog:
+          - run: (run start and stop documents as metadata)
+            - "streams": (namespace)
+              - stream_name: (e.g. 'primary')
+                - key: (e.g. 'pil2M_image')
+                - "internal": (table of data from events)
+        """
         node = self.tiled_client
         catalog_sub = Subscription(node.context)
         catalog_sub.add_callback(self.on_new_run)
         # catalog_sub.add_callback(self.test)
-        catalog_sub.start()
+        try:
+            catalog_sub.start()
+        except Exception as e:
+            # self.context.revoke_api_key(key_info["first_eight"])
+            logger.error(f"Error starting catalog subscription: {e}")
+            return
         print("I'm running")
 
     async def stop(self) -> None:
@@ -171,19 +202,34 @@
     def send_to_operator(self, message: SASMessage) -> None:
         asyncio.run(self.operator.process(message))
 
     def publish_start(self, data: Dict[str, Any]) -> None:
+        # We need to make a request to get image information
+        # structure = ArrayStructure.from_json(data["data_source"]["structure"])
+        if self.tiled_client[data['key']]['streams'].get(self.stream_name) is None:
+            logger.debug(f"Stream {self.stream_name} not found for key {data['key']}")
+            return
+
+        structure = self.tiled_client[data['key']]['streams'][self.stream_name][self.target]._structure
         start = SASStart(
-            data=data,  # Include any relevant data for the start event
+            run_name=data['key'],
+            run_id=data['key'],
+            width=structure.shape[1],
+            height=structure.shape[2],
+            data_type=structure.data_type.to_numpy_dtype().str,
+            tiled_url=self.tiled_client.uri,
         )
+        logger.debug(f"sending start message: {start}") if logger.isEnabledFor(logging.DEBUG) else None
         self.send_to_operator(start)
 
     def publish_event(self, data: Dict[str, Any]) -> None:
         """
         Publish an event to the operator.
         """
+        serializable_array = SerializableNumpyArrayModel(array=data['images'])
         event = RawFrameEvent(
-            image=None,
+            image=serializable_array,
             frame_number=data.get("sequence", 0),
-            tiled_url="",  # Placeholder for actual URL if needed
+            tiled_url=data['uri'],
         )
         self.send_to_operator(event)
@@ -242,24 +288,21 @@ def from_settings(cls, settings: Any, op: Operator) -> "TiledClientListener":
             settings.uri,
             api_key=settings.api_key,
         )
-        # for key in client.context.whoami()['api_keys']:
-        #     client.context.revoke_api_key(key['first_eight'])
-        logger.info(f"#### Listening for runs at {settings.base_segments}")
-        # logger.info(f"#### Frames segments: {settings.frames_segments}")
 
         # Create log directory if specified in settings
         log_dir = getattr(settings, "log_dir", "tiled_logs")
-        return cls(
-            op,
-            client,
-            settings.stream_name,
-            settings.target,
-            log_dir,
+        return cls(
+            op,
+            tiled_client=client,
+            stream_name=settings.stream_name,
+            target=settings.target,
+            create_run_logs=getattr(settings, "create_run_logs", False),
+            log_dir=log_dir,
         )
 
 
 if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
 
     # Example usage
     settings = {
@@ -268,9 +311,8 @@
         "api_key": None,  # Replace with actual API key if needed
         "base_segments": [],
         # "frames_segments": ["primary", "data"],
-        "stream_name": "primary",
-        "target": "pil2M_image",
-        "log_dir": "tiled_event_logs",  # Directory for JSON logs
+        "stream_name": "primary",
+        "log_dir": "tiled_event_logs",  # Directory for JSON logs
     }
 
     class Settings:
@@ -288,6 +330,3 @@ async def process(self, message: SASMessage) -> None:
 
     listener = TiledClientListener.from_settings(settings, n_operator)
     asyncio.run(listener.start())
-
-    while True:
-        time.sleep(5)  # Keep the script running
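The `_start` docstring above describes the tree that TiledWriter produces; a small sketch of walking it directly with the tiled client (the run UID comes from the logs below; the URI, stream name, and detector key are placeholders):

```python
# Sketch of navigating the TiledWriter tree described in the docstring above.
# The URI and API key are placeholders; the run UID is taken from the
# committed event logs below.
from tiled.client import from_uri

client = from_uri("https://tiled-dev.nsls2.bnl.gov", api_key="example-key")
run = client["0d7125fa-96e5-48ac-bb47-6c578c9a3177"]  # a BlueskyRun container
stream = run["streams"]["primary"]                     # the 'primary' event stream
frames = stream["pil2M_image"]                         # detector image array
print(frames.shape)                                    # e.g. (num_events, height, width)
```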
diff --git a/tiled_logs/run_0d7125fa-96e5-48ac-bb47-6c578c9a3177/on_new_run_0001.json b/tiled_logs/run_0d7125fa-96e5-48ac-bb47-6c578c9a3177/on_new_run_0001.json
new file mode 100644
index 0000000..b9ae074
--- /dev/null
+++ b/tiled_logs/run_0d7125fa-96e5-48ac-bb47-6c578c9a3177/on_new_run_0001.json
@@ -0,0 +1,130 @@
+{
+  "event_name": "on_new_run",
+  "sequence": 1,
+  "timestamp": 1753492065.6542416,
+  "subscription_segments": [
+    "/"
+  ],
+  "callback_data": {
+    "sequence": 285,
+    "timestamp": "2025-07-26T01:07:45.616488",
+    "key": "0d7125fa-96e5-48ac-bb47-6c578c9a3177",
+    "structure_family": "container",
+    "specs": [
+      {
+        "name": "BlueskyRun",
+        "version": "3.0"
+      }
+    ],
+    "metadata": {
+      "start": {
+        "uid": "0d7125fa-96e5-48ac-bb47-6c578c9a3177",
+        "time": 1753492065.4753332,
+        "username": "egann",
+        "beamline_name": "SMI",
+        "facility": "NSLS-II",
+        "tstamp": 1752211775.3666577,
+        "beamline_attenuators": {
+          "att1_7_status": {
+            "material": "Sn_60um",
+            "thickness": "4x"
+          }
+        },
+        "sample_name": "test_test_16.10keV_wa16.0_sdd5.0m",
+        "SAXS_setup": {
+          "sdd": 5000,
+          "beam_centre": [
+            746,
+            1105
+          ],
+          "bs": "rod",
+          "energy": 16100
+        },
+        "x0": 1499.8220000000001,
+        "beamline_beamsize": "3.5um",
+        "start_datetime": "2025-07-16T13:25:25.696186",
+        "proposal": {
+          "proposal_id": "317903",
+          "title": "Advancing Real-Time Characterization of Polymer Membrane Formation Using X-Ray Scattering",
+          "type": "General User",
+          "pi_name": "Gregory Su"
+        },
+        "beamline_sample_environment": "in-air",
+        "versions": {
+          "ophyd": "1.10.5",
+          "bluesky": "1.8.2.dev1119+gfb1e9960",
+          "ophyd_async": "0.10.0a4"
+        },
+        "alignment_LUT": {
+          "-2000": {
+            "x": -2000,
+            "y": 7105.626,
+            "z": 2000,
+            "th": 0.4
+          },
+          "0": {
+            "x": 0,
+            "y": 7092.411,
+            "z": 2000,
+            "th": 0.4
+          },
+          "2000": {
+            "x": 2000,
+            "y": 7075.913,
+            "z": 2000,
+            "th": 0.4
+          }
+        },
+        "project_name": "solvent_blanks",
+        "cycle": "2025-2",
+        "scan_id": 971395,
+        "th0": 1.180005,
+        "data_session": "pass-317903",
+        "plan_type": "generator",
+        "plan_name": "rel_scan",
+        "detectors": [
+          "pil2M"
+        ],
+        "motors": [
+          "piezo_y"
+        ],
+        "num_points": 21,
+        "num_intervals": 20,
+        "plan_args": {
+          "detectors": [
+            "SAXS_Detector(prefix='XF:12ID2-ES{Pilatus:Det-2M}', name='pil2M', read_attrs=['cam', 'cam.file_number', 'tiff', 'stats1', 'stats1.total'], configuration_attrs=['cam', 'cam.acquire_period', 'cam.acquire_time', 'cam.image_mode', 'cam.manufacturer', 'cam.model', 'cam.num_exposures', 'cam.num_images', 'cam.trigger_mode', 'tiff', 'stats1', 'stats1.bgd_width', 'stats1.centroid_threshold', 'stats1.compute_centroid', 'stats1.compute_histogram', 'stats1.compute_profiles', 'stats1.compute_statistics', 'stats1.hist_max', 'stats1.hist_min', 'stats1.hist_size', 'stats1.profile_cursor', 'stats1.profile_size', 'stats1.ts_num_points'])"
+          ],
+          "num": 21,
+          "args": [
+            "EpicsMotor(prefix='XF:12IDC-ES:2{MCS:1-Ax:3}Mtr', name='piezo_y', parent='piezo', settle_time=0.0, timeout=None, read_attrs=['user_readback', 'user_setpoint'], configuration_attrs=['user_readback', 'user_setpoint'])",
+            -2000,
+            2000
+          ],
+          "per_step": "None"
+        },
+        "hints": {
+          "dimensions": [
+            [
+              [
+                "piezo_y"
+              ],
+              "primary"
+            ]
+          ]
+        },
+        "plan_pattern": "inner_product",
+        "plan_pattern_module": "bluesky.plan_patterns",
+        "plan_pattern_args": {
+          "num": 21,
+          "args": [
+            "EpicsMotor(prefix='XF:12IDC-ES:2{MCS:1-Ax:3}Mtr', name='piezo_y', parent='piezo', settle_time=0.0, timeout=None, read_attrs=['user_readback', 'user_setpoint'], configuration_attrs=['user_readback', 'user_setpoint'])",
+            -2000,
+            2000
+          ]
+        }
+      }
+    },
+    "data_sources": [],
+    "uri": "http://tiled-dev.nsls2.bnl.gov/api/v1/array/full/"
+  }
+}
\ No newline at end of file
diff --git a/tiled_logs/run_0d7125fa-96e5-48ac-bb47-6c578c9a3177/on_new_stream_0001.json b/tiled_logs/run_0d7125fa-96e5-48ac-bb47-6c578c9a3177/on_new_stream_0001.json
new file mode 100644
index 0000000..7d8a2fb
--- /dev/null
+++ b/tiled_logs/run_0d7125fa-96e5-48ac-bb47-6c578c9a3177/on_new_stream_0001.json
@@ -0,0 +1,369 @@
+{
+  "event_name": "on_new_stream",
+  "sequence": 1,
+  "timestamp": 1753492088.4022586,
+  "subscription_segments": [
+    "0d7125fa-96e5-48ac-bb47-6c578c9a3177",
+    "streams"
+  ],
+  "callback_data": {
+    "sequence": 2,
+    "timestamp": "2025-07-26T01:08:08.362981",
+    "key": "primary",
+    "structure_family": "composite",
+    "specs": [
+      {
+        "name": "BlueskyEventStream",
+        "version": "3.0"
+      }
+    ],
+    "metadata": {
+      "configuration": {
+        "pil2M": {
+          "data": {
+            "pil2M_cam_acquire_period": 0.501,
+            "pil2M_cam_acquire_time": 0.5,
+            "pil2M_cam_image_mode": 1,
+            "pil2M_cam_manufacturer": "Dectris",
+            "pil2M_cam_model": "Pilatus",
+            "pil2M_cam_num_exposures": 1,
+            "pil2M_cam_num_images": 1,
+            "pil2M_cam_trigger_mode": 0,
+            "pil2M_stats1_bgd_width": 1,
+            "pil2M_stats1_centroid_threshold": 1.0,
+            "pil2M_stats1_compute_centroid": "No",
+            "pil2M_stats1_compute_histogram": "No",
+            "pil2M_stats1_compute_profiles": "No",
+            "pil2M_stats1_compute_statistics": "Yes",
+            "pil2M_stats1_hist_max": 255.0,
+            "pil2M_stats1_hist_min": 0.0,
+            "pil2M_stats1_hist_size": 256,
+            "pil2M_stats1_ts_num_points": 2048
+          },
+          "timestamps": {
+            "pil2M_cam_acquire_period": 1753389986.881245,
+            "pil2M_cam_acquire_time": 1753389986.872906,
+            "pil2M_cam_image_mode": 1746550981.403281,
+            "pil2M_cam_manufacturer": 1746550977.988163,
+            "pil2M_cam_model": 1746550977.988235,
+            "pil2M_cam_num_exposures": 1746550977.98888,
+            "pil2M_cam_num_images": 1753380750.712442,
+            "pil2M_cam_trigger_mode": 1746550977.988523,
+            "pil2M_stats1_bgd_width": 1746550977.988214,
+            "pil2M_stats1_centroid_threshold": 1746550977.987016,
+            "pil2M_stats1_compute_centroid": 1746550977.987631,
+            "pil2M_stats1_compute_histogram": 1746550977.987861,
+            "pil2M_stats1_compute_profiles": 1746550977.987757,
+            "pil2M_stats1_compute_statistics": 1746550977.987549,
+            "pil2M_stats1_hist_max": 1746550977.987272,
+            "pil2M_stats1_hist_min": 1746550977.987138,
+            "pil2M_stats1_hist_size": 1746550977.988419,
+            "pil2M_stats1_ts_num_points": 1746550977.949789
+          },
+          "data_keys": {
+            "pil2M_cam_acquire_period": {
+              "source": "PV:XF:12ID2-ES{Pilatus:Det-2M}cam1:AcquirePeriod_RBV",
+              "dtype": "number",
+              "shape": [],
+              "units": "",
+              "lower_ctrl_limit": 0.0,
+              "upper_ctrl_limit": 0.0,
+              "precision": 3,
+              "dtype_numpy": "