# Development image: installs the project in editable mode together with
# debugpy so a debugger can be attached to processes in the container.
FROM python:3.11

WORKDIR /app

# System packages go first so this layer is reused when only sources change.
# NOTE(review): git is presumably needed for VCS-based pip dependencies — confirm.
RUN apt-get update \
    && apt-get install -y --no-install-recommends git \
    && rm -rf /var/lib/apt/lists/*

COPY . /app

# debugpy is installed exactly once, alongside the project itself
# (it was previously installed in two separate RUN steps).
RUN pip install --no-cache-dir --upgrade pip \
    && pip install --no-cache-dir -e ".[all]" debugpy

# Placeholder command; the real entry point is expected to be supplied at run time.
CMD ["echo", "Hello World"]
# -----------------------------------------------------------------------------
# Logging Setup
# -----------------------------------------------------------------------------
logger = logging.getLogger("data_watcher")


def setup_logging(log_level: str = "INFO") -> None:
    """Configure the ``data_watcher`` logger with a single stream handler.

    Parameters
    ----------
    log_level:
        Case-insensitive name of a standard logging level ("DEBUG", "INFO",
        ...). Anything that is not a known level falls back to INFO.
    """
    level = getattr(logging, log_level.upper(), None)
    # getattr can also hit non-level attributes of the logging module
    # (e.g. BASIC_FORMAT is a str); only accept real numeric levels.
    if not isinstance(level, int):
        level = logging.INFO
    logger.setLevel(level)

    # Check this logger's *own* handlers. hasHandlers() also looks at ancestor
    # loggers, and since propagation is switched off below, relying on a root
    # handler would leave this logger with no output at all.
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
        )
        logger.addHandler(handler)

    logger.propagate = False  # Prevent duplication through root logger
# -----------------------------------------------------------------------------
# CLI App
# -----------------------------------------------------------------------------
app = typer.Typer(
    help="Watch a directory and register new .gb/.edf files with Tiled."
)


class TiledPublisher(Publisher):
    """Publisher that hands each reported file to a ``TiledIngestor``."""

    def __init__(self, tiled_ingestor: TiledIngestor):
        self.tiled_ingestor = tiled_ingestor
        super().__init__()

    async def publish(self, message: FileWatcherMessage) -> None:
        """Register ``message.file_path`` with Tiled."""
        logger.debug(f"Publishing message to Tiled: {message}")
        # NOTE(review): add_scan_tiled is a plain synchronous call, so it runs
        # on the event loop until it returns.
        self.tiled_ingestor.add_scan_tiled(message.file_path)


class FileWatcherOperator(Operator):
    """Operator that forwards every message unchanged to its publisher."""

    def __init__(self, publisher: Publisher):
        self.publisher = publisher

    async def process(self, message) -> None:
        """Log the message and publish it."""
        logger.info(f"Processing message: {message}")
        await self.publisher.publish(message)


class NullPublisher(Publisher):
    """Publisher that only logs; used when no Tiled server is configured."""

    async def publish(self, message) -> None:
        logger.debug(f"NullPublisher: {message} - No action taken.")


class FileWatcherListener(Listener):
    """Watch a directory tree and send supported files to an operator."""

    # File extensions that are forwarded; everything else is ignored.
    supported_suffixes = (".gb", ".edf")

    def __init__(self, directory: str, operator: Operator, force_polling: bool = True):
        self.directory = directory
        self.operator = operator
        self.force_polling = force_polling

    async def start(self) -> None:
        """Run the watch loop, processing one message per changed file."""
        logger.info(f"🔍 Watching directory recursively: {self.directory} (force_polling={self.force_polling})")
        async for changes in awatch(self.directory, force_polling=self.force_polling):
            for change_type, path_str in changes:
                path = Path(path_str)
                # Deleted paths show up as changes too; nothing to publish.
                if not path.exists():
                    logger.debug(f"⚠️ Skipping non-existent path: {path}")
                    continue
                if path.suffix not in self.supported_suffixes:
                    logger.debug(f"⚠️ Skipping non-supported file type: {path.suffix}")
                    continue
                if path.is_dir():
                    logger.debug(f"⚠️ Skipping directory: {path}")
                    continue
                logger.info(f"📦 Detected: {change_type} on {path}")
                # Directories were filtered out just above.
                message = FileWatcherMessage(file_path=str(path), is_directory=False)
                await self.operator.process(message)

    async def stop(self) -> None:
        """No-op; the watch loop ends when ``start`` is cancelled or returns."""
        pass


@app.command()
def main(
    directory: Path = typer.Argument(..., help="Directory to watch for new files"),
    tiled_uri: Optional[str] = typer.Option(
        None,
        envvar="TILED_URI",
        help="Tiled server URI; can also be set with the TILED_URI env var",
    ),
    tiled_raw_root: Optional[str] = typer.Option(
        None,
        envvar="TILED_RAW_ROOT",
        help="Root location of raw data in Tiled; can also be set with the TILED_RAW_ROOT env var",
    ),
    tiled_api_key: Optional[str] = typer.Option(
        None,
        envvar="TILED_API_KEY",
        help="Tiled API key; can also be set with the TILED_API_KEY env var",
    ),
    log_level: str = typer.Option(
        "INFO", help="Logging level (DEBUG, INFO, WARNING, ERROR)"
    ),
):
    """Watch ``directory`` and register every new supported file with Tiled.

    When no Tiled URI is given (neither as an option nor through TILED_URI),
    detected files are only logged by a ``NullPublisher``.
    """
    setup_logging(log_level)

    publisher: Publisher
    if tiled_uri:
        # The ingestor indexes the client with the raw root, so it must be set.
        if not tiled_raw_root:
            raise typer.BadParameter(
                "--tiled-raw-root (or TILED_RAW_ROOT) is required when a Tiled URI is given"
            )
        logger.info(f"Connecting to Tiled server at {tiled_uri} with root {tiled_raw_root}")
        tiled_client = from_uri(tiled_uri, api_key=tiled_api_key)
        tiled_ingestor = TiledIngestor(tiled_client, tiled_raw_root, str(directory))
        publisher = TiledPublisher(tiled_ingestor)
        logger.info("Using Tiled publisher")
    else:
        publisher = NullPublisher()
        logger.info("Using default null publisher")

    operator = FileWatcherOperator(publisher)
    listener = FileWatcherListener(str(directory), operator)
    # asyncio.run creates and closes the loop; get_event_loop() is deprecated
    # when no loop is running.
    asyncio.run(listener.start())


if __name__ == "__main__":
    app()
logger = logging.getLogger("data_watcher.tiled_ingest")


class TiledIngestor:
    """Register raw scan files with a Tiled server as external arrays."""

    def __init__(self, tiled_client: "BaseClient", raw_tiled_root: str, path_to_raw_data: str):
        """Store the client and the two roots used to place scans.

        Parameters
        ----------
        tiled_client:
            Tiled client; it is indexed with ``raw_tiled_root``.
        raw_tiled_root:
            Key of the container under which scans are registered.
        path_to_raw_data:
            Filesystem directory that scan paths are made relative to.
        """
        self.tiled_client = tiled_client
        self.raw_tiled_root = raw_tiled_root
        self.path_to_raw_data = path_to_raw_data

    @staticmethod
    def _split_scan_path(path_to_raw_data, scan_filepath):
        """Return ``(container_parts, filename)`` of a scan relative to the root.

        Returns ``None`` when the scan is not located below the root. A file
        sitting directly in the root yields an empty ``container_parts`` list.
        """
        # Absolute paths on both sides: commonpath raises ValueError when
        # absolute and relative paths are mixed.
        root = os.path.abspath(os.fspath(path_to_raw_data))
        target = os.path.abspath(os.fspath(scan_filepath))
        try:
            common = os.path.commonpath([root, target])
        except ValueError:
            # e.g. paths on different drives
            return None
        if common != root or target == root:
            return None
        container, filename = os.path.split(os.path.relpath(target, root))
        # An empty container (file directly in the root) must not turn into a
        # "." path component.
        parts = [part for part in container.split(os.sep) if part and part != os.curdir]
        return parts, filename

    def add_scan_tiled(self, scan_filepath: str) -> "str | None":
        """Register one scan file in Tiled, mirroring its directory layout.

        Containers are created for each sub-directory between the raw-data
        root and the file. An existing entry with the same key is deleted and
        replaced.

        Parameters
        ----------
        scan_filepath:
            Path of the ``.gb`` or ``.edf`` file to register.

        Returns
        -------
        The ``uri`` of the newly created node, or ``None`` when the file is
        not located under ``path_to_raw_data``.
        """
        scan_filepath = os.fspath(scan_filepath)
        location = self._split_scan_path(self.path_to_raw_data, scan_filepath)
        if location is None:
            logger.warning(
                "Skipping %s: not located under %s", scan_filepath, self.path_to_raw_data
            )
            return None
        scan_container_parts, scan = location

        # create containers in tiled if they do not exist
        current_container_client = self.tiled_client[self.raw_tiled_root]
        for part in scan_container_parts:
            if part in current_container_client:
                current_container_client = current_container_client[part]
            else:
                current_container_client = current_container_client.create_container(
                    key=part
                )

        key = os.path.splitext(scan)[0]
        if key in current_container_client:
            current_container_client.delete(key)

        is_gb = scan_filepath.endswith(".gb")

        # hard coded for bl733 and P03
        structure = ArrayStructure(
            data_type=BuiltinDtype.from_numpy_dtype(
                np.dtype("float32") if is_gb else np.dtype("int32")
            ),
            shape=(1679, 1475),
            chunks=((1679,), (1475,)),
        )

        # Only .edf files get metadata, read from the .txt file next to them.
        if scan_filepath.endswith(".edf"):
            metadata = parse_txt_accompanying_edf(scan_filepath)
        else:
            metadata = {}

        scan_client = current_container_client.new(
            key=key,
            structure_family=StructureFamily.array,
            data_sources=[
                DataSource(
                    management=Management.external,
                    mimetype="application/x-gb" if is_gb else "application/x-edf",
                    structure_family=StructureFamily.array,
                    structure=structure,
                    assets=[
                        Asset(
                            data_uri=ensure_uri(scan_filepath),
                            is_directory=False,
                            parameter="data_uri",
                        )
                    ],
                ),
            ],
            metadata=metadata,
            specs=[Spec("gb") if is_gb else Spec("edf")],
        )
        return scan_client.uri


def parse_txt_accompanying_edf(filepath):
    """Parse a .txt file produced at ALS beamline 7.3.3 into a dictionary.

    The .txt file is looked up next to the .edf file, with the same name and
    only the final suffix replaced.

    Parameters
    ----------
    filepath: str or pathlib.Path
        Filepath of the .edf file.

    Returns
    -------
    dict
        ``key: value`` lines become entries; lines without a colon are stored
        under ``"Keyless Parameter #<n>"`` (lines equal to ``"!0"`` are
        dropped). Empty when the .txt file does not exist.
    """
    # with_suffix swaps only the trailing extension; a plain str.replace would
    # also rewrite ".edf" inside directory names.
    txt_filepath = pathlib.Path(os.fspath(filepath)).with_suffix(".txt")

    # File does not exist, return empty dictionary
    if not txt_filepath.is_file():
        return {}

    # Some lines have the format
    # key: value
    # others are just values with no key
    keyless_lines = 0
    txt_params = {}
    with open(txt_filepath, "r") as file:
        for line in file:
            # Split on the first colon only; values may contain colons.
            name, separator, value = line.partition(":")
            if separator:
                txt_params[name.strip()] = value.strip()
                continue
            bare_value = name.strip()
            if bare_value != "!0":
                txt_params[f"Keyless Parameter #{keyless_lines}"] = bare_value
                keyless_lines += 1
    return txt_params