Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions Dockerfile.dev
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
FROM python:3.11

WORKDIR /app

COPY . /app


RUN apt-get update \
&& apt-get install -y --no-install-recommends git \
&& rm -rf /var/lib/apt/lists/* \
&& pip install debugpy

RUN pip install --upgrade pip && \
pip install -e .[all]

RUN pip install debugpy


CMD ["echo", "Hello World"]
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"


[project]
name = "arroyoGISAXS"
name = "arroyoSAS"
version = "0.0.1"
description = "A package to perform computations and suggestions during a time-resolved GISAXS experiment."
authors = [{ name = "Dylan McReynolds", email = "dmcreynolds@lbl.gov" }]
Expand All @@ -22,7 +22,6 @@ dependencies = [
"typer",
"websockets",
"zarr",
"tiled >=0.1.0b14, <0.2",
"transformers",
"redis"]

Expand Down
2 changes: 1 addition & 1 deletion src/_test/test_tiled_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
GISAXSRawEvent,
SerializableNumpyArrayModel,
)
from arroyogisaxs.tiled import unsent_frame_numbers
from arroyogisaxs.tiled.tiled import unsent_frame_numbers

# import pytest_asyncio
# from arroyopy.operator import Operator
Expand Down
4 changes: 3 additions & 1 deletion src/arroyogisaxs/app/frame_listener_sim_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ async def process_images(
@app.command()
def main(cycles: int = 10000, frames: int = 50, pause: float = 5):
async def run():
print("Starting frame listener simulation")
print(f"Cycles: {cycles}, Frames: {frames}, Pause: {pause}")
context = zmq.asyncio.Context()
socket = context.socket(zmq.PUB)
address = settings.tiled_poller.publish_address
address = settings.tiled_poller.zmq_frame_publisher.address
print(f"Connecting to {address}")
socket.bind(address)
await process_images(socket, cycles, frames, pause)
Expand Down
2 changes: 1 addition & 1 deletion src/arroyogisaxs/app/lse_operator_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from ..log_utils import setup_logger
from ..lse.lse_operator import LatentSpaceOperator
from ..lse.lse_ws_publisher import LSEWSResultPublisher
from ..tiled import TiledProcessedPublisher
from ..tiled.tiled import TiledProcessedPublisher
from ..zmq import ZMQFrameListener

app = typer.Typer()
Expand Down
136 changes: 136 additions & 0 deletions src/arroyogisaxs/app/tiled_file_watcher_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import asyncio
import logging
import os
from pathlib import Path
from typing import Optional


from arroyopy import Listener, Operator, Publisher
from arroyopy.files import FileWatcherMessage
from tiled.client import from_uri
from tiled.client.base import BaseClient
import typer
from watchfiles import awatch, Change

from ..tiled.ingestor import TiledIngestor

# -----------------------------------------------------------------------------
# Logging Setup
# -----------------------------------------------------------------------------
logger = logging.getLogger("data_watcher")


def setup_logging(log_level: str = "INFO"):
level = getattr(logging, log_level.upper(), logging.INFO)
logger.setLevel(level)

if not logger.hasHandlers():
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)

logger.propagate = False # Prevent duplication through root logger






# -----------------------------------------------------------------------------
# CLI App
# -----------------------------------------------------------------------------
app = typer.Typer(help="Watch a directory and publish new .gb files to Redis.")


class TiledPublisher(Publisher):
def __init__(self, tiled_ingestor: TiledIngestor):
self.tiled_ingestor = tiled_ingestor
super().__init__()

async def publish(self, message: FileWatcherMessage):
logger.debug(f"Publishing message to Tiled: {message}")
self.tiled_ingestor.add_scan_tiled(message.file_path)

class FileWatcherOperator(Operator):
def __init__(self, publisher: Publisher):
self.publisher = publisher

async def process(self, message):
logger.info(f"Processing message: {message}")
await self.publisher.publish(message)


class NullPublisher(Publisher):
async def publish(self, message):
logger.debug(f"NullPublisher: {message} - No action taken.")

class FileWatcherListener(Listener):
def __init__(self, directory: str, operator: Operator, force_polling: bool = True):
self.directory = directory
self.operator = operator
self.force_polling = force_polling

async def start(self):
logger.info(f"🔍 Watching directory recursively: {self.directory} (force_polling={self.force_polling})")
async for changes in awatch(self.directory, force_polling=self.force_polling):
for change_type, path_str in changes:
path = Path(path_str)
if not path.exists():
logger.debug(f"⚠️ Skipping non-existent path: {path}")
continue
if path.suffix not in [".gb", ".edf"]:
logger.debug(f"⚠️ Skipping non-supported file type: {path.suffix}")
continue
if path.is_dir():
logger.debug(f"⚠️ Skipping directory: {path}")
continue
logger.info(f"📦 Detected: {change_type} on {path}")
message = FileWatcherMessage(
file_path=str(path), is_directory=path.is_dir()
)
await self.operator.process(message)

async def stop(self):
pass


@app.command()
def main(
directory: Path = typer.Argument(..., help="Directory to watch for new files"),
tiled_uri: str = typer.Option(str, help="Tiled server URI, can bet set with TILED_URI env var"),
tiled_raw_root: str = typer.Option(str, help= "Root locatiuon of tile raw data can be set with TILED_RAW_ROOT env var"),
tiled_api_key: Optional[int] = typer.Option(None, help="tiled apikey can be set with TILED_API_KEY env vars"),
log_level: str = typer.Option(
"INFO", help="Logging level (DEBUG, INFO, WARNING, ERROR)"
),
):
setup_logging(log_level)

loop = asyncio.get_event_loop()


if not tiled_uri:
tiled_uri = os.getenv("TILED_URI")
if not tiled_api_key:
tiled_api_key = os.getenv("TILED_API_KEY")
if not tiled_raw_root:
tiled_raw_root = os.getenv("TILED_RAW_ROOT")


logger.info(f"Connecting to Tiled server at {tiled_uri} with root {tiled_raw_root}")
tiled_client = from_uri(tiled_uri, api_key=tiled_api_key)
tiled_ingestor = TiledIngestor(tiled_client, tiled_raw_root, directory)
publisher = TiledPublisher(tiled_ingestor)
logger.info("Using Tiled publisher")
else:
publisher = NullPublisher()
logger.info("Using default null publisher")

operator = FileWatcherOperator(publisher)
listener = FileWatcherListener(str(directory), operator)
loop.run_until_complete(listener.start())


if __name__ == "__main__":
app()
2 changes: 1 addition & 1 deletion src/arroyogisaxs/app/tiled_poller_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import typer

from arroyogisaxs.config import settings
from arroyogisaxs.tiled import TiledPollingFrameListener, TiledRawFrameOperator
from arroyogisaxs.tiled.tiled import TiledPollingFrameListener, TiledRawFrameOperator
from arroyogisaxs.zmq import ZMQFramePublisher

from ..log_utils import setup_logger
Expand Down
2 changes: 1 addition & 1 deletion src/arroyogisaxs/app/viz_operator_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ..log_utils import setup_logger
from ..one_d_reduction.operator import OneDReductionOperator

from ..tiled import TiledProcessedPublisher
from ..tiled.tiled import TiledProcessedPublisher
from ..websockets import OneDWSPublisher

app = typer.Typer()
Expand Down
2 changes: 1 addition & 1 deletion src/arroyogisaxs/one_d_reduction/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
GISAXSStop,
SerializableNumpyArrayModel,
)
from ..tiled import get_nested_client
from ..tiled.tiled import get_nested_client
from .detector import VerticalPilatus900kw
from .reduce import pixel_roi_horizontal_cut

Expand Down
Empty file.
126 changes: 126 additions & 0 deletions src/arroyogisaxs/tiled/ingestor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import logging
import os
import pathlib

import numpy as np
from tiled.client.base import BaseClient
from tiled.structures.array import ArrayStructure, BuiltinDtype
from tiled.structures.core import Spec, StructureFamily
from tiled.structures.data_source import Asset, DataSource, Management
from tiled.utils import ensure_uri

logger = logging.getLogger("data_watcher.tiled_ingest")


class TiledIngestor:
def __init__(self, tiled_client: BaseClient, raw_tiled_root: str, path_to_raw_data: str):
self.tiled_client = tiled_client
self.raw_tiled_root = raw_tiled_root
self.path_to_raw_data = path_to_raw_data


def add_scan_tiled(self, scan_filepath: str) -> str:
# gives a root path path, lie /raw_data/scan1/good.file, would give you /raw_data/scan1
common_path = os.path.commonpath([self.path_to_raw_data, scan_filepath])
if common_path is None:
return None

relative_scan_filepath = os.path.relpath(scan_filepath, self.path_to_raw_data)
scan_container, scan = os.path.split(relative_scan_filepath)
scan_container_parts = os.path.normpath(scan_container).split(os.sep)

# scan_container_parts is [raw_data scan1]

# create containers in tiled if they do not exist
current_container_client = self.tiled_client[self.raw_tiled_root]
for part in scan_container_parts:
if part in current_container_client:
current_container_client = current_container_client[part]
else:
current_container_client = current_container_client.create_container(
key=part
)
key = os.path.splitext(scan)[0]

if key in current_container_client:
current_container_client.delete(key)

# hard coded for bl733 and P03
structure = ArrayStructure(
data_type=BuiltinDtype.from_numpy_dtype(
np.dtype("float32") if scan_filepath.endswith(".gb") else np.dtype("int32")
),
shape=(1679, 1475),
chunks=((1679,), (1475,)),
)

if scan_filepath.endswith(".edf"):
metadata = parse_txt_accompanying_edf(scan_filepath)
else:
metadata = {}

# TODO: Add metadata and spec

scan_client = current_container_client.new(
key=key,
structure_family=StructureFamily.array,
data_sources=[
DataSource(
management=Management.external,
mimetype=(
"application/x-gb"
if scan_filepath.endswith(".gb")
else "application/x-edf"
),
structure_family=StructureFamily.array,
structure=structure,
assets=[
Asset(
data_uri=ensure_uri(scan_filepath),
is_directory=False,
parameter="data_uri",
)
],
),
],
metadata=metadata,
specs=[Spec("gb") if scan_filepath.endswith(".gb") else Spec("edf")],
)
return scan_client.uri


def parse_txt_accompanying_edf(filepath):
"""Pase a .txt file produced at ALS beamline 7.3.3 into a dictionary.

Parameters
----------
filepath: str or pathlib.Path
Filepath of the .edf file.
"""
txt_filepath = None
if isinstance(filepath, str):
txt_filepath = filepath.replace(".edf", ".txt")
if isinstance(filepath, pathlib.Path):
txt_filepath = filepath.with_suffix(".txt")

# File does not exist, return empty dictionary
if not os.path.isfile(txt_filepath):
return dict()

with open(txt_filepath, "r") as file:
lines = file.readlines()

# Some lines have the format
# key: value
# others are just values with no key
keyless_lines = 0
txt_params = dict()
for line in lines:
line_components = list(map(str.strip, line.split(":", maxsplit=1)))
if len(line_components) >= 2:
txt_params[line_components[0]] = line_components[1]
else:
if line_components[0] != "!0":
txt_params[f"Keyless Parameter #{keyless_lines}"] = line_components[0]
keyless_lines += 1
return txt_params
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# from tiled.client.dataframe import DataFrameClient
from tiled.client.container import Container

from .schemas import (
from ..schemas import (
GISAXS1DReduction,
GISAXSLatentSpaceEvent,
GISAXSMessage,
Expand Down
6 changes: 6 additions & 0 deletions tiled_script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import os

from tiled.client import from_uri

c = from_uri("http://tiled:8000", api_key=os.getenv("TILED_API_KEY"))
raw = c["bl733/raw"]