diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index e60af27..5ec4ecf 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -2,6 +2,7 @@ # from data_validation import general_data_validation from export import export +from pyhyper import pyhyper_flow @task @@ -15,4 +16,5 @@ def end_of_run_workflow(stop_doc): uid = stop_doc["run_start"] # general_data_validation(uid) export(uid) + pyhyper_flow(uid) log_completion() diff --git a/pyhyper.py b/pyhyper.py index c59a907..f9894e8 100644 --- a/pyhyper.py +++ b/pyhyper.py @@ -2,18 +2,74 @@ from pathlib import Path import httpx - -# import pyFAI -# import PyHyperScattering -from prefect import get_run_logger, task - -# from tiled.client import from_profile +import PyHyperScattering +from prefect import flow, get_run_logger, task +from tiled.client import from_profile +from tiled.client.xarray import write_xarray_dataset PATH = "/nsls2/data/dssi/scratch/prefect-outputs/rsoxs/" DATA_SESSION_PATTERN = re.compile("[passGUCP]*-([0-9]+)") +@task +def load_and_reduce( + scanid, override_bcx=None, override_bcy=None, override_dist=None, override_mask=None +): + scan = load.loadRun(load.c[scanid]) + scan_us = scan.unstack("system") # this is expensive, do it only once + if "energy" in scan_us.dims: + integrator = PyHyperScattering.integrate.PFEnergySeriesIntegrator + else: + integrator = PyHyperScattering.integrate.PFGeneralIntegrator + integrator = integrator( + integration_method="csr", geomethod="template_xr", template_xr=scan + ) + if override_bcx is not None: + integrator.ni_beamcenter_x = override_bcx + if override_bcy is not None: + integrator.ni_beamcenter_y = override_bcy + if override_dist is not None: + integrator.ni_distance = override_dist + if override_mask is not None: + integrator.mask = override_mask + return ( + integrator.integrateImageStack(scan) + .to_dataset(name="reduced") + .unstack("system") + ) + + +load = PyHyperScattering.load.SST1RSoXSDB(corr_mode="none") +writable_tiled = from_profile("rsoxs")["reduced_sandbox"] + + +@task +def load_reduce_and_write_to_tiled(scanid): + logger = get_run_logger() + try: + scan = load_and_reduce(scanid) + except Exception as e: + logger.warning(f"Exception during reduction {e}") + write_xarray_dataset(writable_tiled, scan) + return scan + + +""" +def auto_reduce_recent_data_if_not_reduced(number): + for scan_ref in range (-number,-1): + try: + if load.c[scan_ref].stop is not None: # if the scan is currently running, stop will be None. + local_scan_id = load.c[scan_ref].metadata['summary']['scan_id'] + if len(analyzed_tiled.search(Eq('attrs.start.scan_id',local_scan_id))) > 0: + continue + load_reduce_and_write_to_tiled(scan_ref) + except Exception as e: + print(f'error in processing {scan_ref}, {e}') + +""" + + def lookup_directory(start_doc): """ Return the path for the proposal directory. @@ -52,46 +108,12 @@ def lookup_directory(start_doc): return Path(paths[0]) -####################################################################### -# WIP: Commenting out this function to avoid masking real linter errors -# OK to uncomment when development resumes. -####################################################################### -# @task -# def write_run_artifacts(scan_id): -# """ -# Example live-analysis function -# -# Parameters: -# run_to_plot (int): the local scan id from DataBroker -# """ -# start_doc = tiled_client_raw[scan_id].start -# directory = ( -# lookup_directory(start_doc) -# / start_doc["project_name"] -# / f"{start_doc['scan_id']}" -# ) -# directory.mkdir(parents=True, exist_ok=True) -# -# logger = get_run_logger() -# logger.info(f"starting pyhyper export to {directory}") -# -# logger.info(f"{PyHyperScattering.__version__}") -# -# c = from_profile("nsls2") -# logger.info("Loaded RSoXS Profile...") -# -# logger.info("created RSoXS catalog loader...") -# -# # except Exception: -# # logger.warning("Couldn't save as NeXus file.") -# logger.info("Done!") -# return integratedimages -# -# -# @flow -# def pyhyper_flow(scan_id=36106): -# write_run_artifacts(scan_id) -# log_status() +@flow +def pyhyper_flow(scan_id=36106): + load_reduce_and_write_to_tiled(scan_id) + # scan = load_reduce_and_write_to_tiled(scan_id) + # TODO: decide and save these artifacts write_run_artifacts(scan) + log_status() @task