Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions end_of_run_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

# from data_validation import general_data_validation
from export import export
from pyhyper import pyhyper_flow


@task
Expand All @@ -15,4 +16,5 @@ def end_of_run_workflow(stop_doc):
uid = stop_doc["run_start"]
# general_data_validation(uid)
export(uid)
pyhyper_flow(uid)
log_completion()
114 changes: 68 additions & 46 deletions pyhyper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,74 @@
from pathlib import Path

import httpx

# import pyFAI
# import PyHyperScattering
from prefect import get_run_logger, task

# from tiled.client import from_profile
import PyHyperScattering
from prefect import flow, get_run_logger, task
from tiled.client import from_profile
from tiled.client.xarray import write_xarray_dataset

PATH = "/nsls2/data/dssi/scratch/prefect-outputs/rsoxs/"

DATA_SESSION_PATTERN = re.compile("[passGUCP]*-([0-9]+)")


@task
def load_and_reduce(
    scanid, override_bcx=None, override_bcy=None, override_dist=None, override_mask=None
):
    """Load one scan from the beamline catalog and azimuthally integrate it.

    Parameters
    ----------
    scanid : key into the ``load.c`` catalog identifying the scan to reduce.
    override_bcx, override_bcy : optional beam-center overrides applied to
        the integrator after construction.
    override_dist : optional sample-detector distance override.
    override_mask : optional detector mask override.

    Returns
    -------
    An xarray Dataset holding the integration result under the name
    "reduced", with the "system" multi-index unstacked.
    """
    scan = load.loadRun(load.c[scanid])
    # Unstacking is expensive; do it a single time, only for the dims check.
    unstacked = scan.unstack("system")
    # Energy-series scans need the per-energy integrator variant.
    if "energy" in unstacked.dims:
        integrator_cls = PyHyperScattering.integrate.PFEnergySeriesIntegrator
    else:
        integrator_cls = PyHyperScattering.integrate.PFGeneralIntegrator
    integrator = integrator_cls(
        integration_method="csr", geomethod="template_xr", template_xr=scan
    )
    # Apply only the calibration overrides the caller actually supplied.
    if override_bcx is not None:
        integrator.ni_beamcenter_x = override_bcx
    if override_bcy is not None:
        integrator.ni_beamcenter_y = override_bcy
    if override_dist is not None:
        integrator.ni_distance = override_dist
    if override_mask is not None:
        integrator.mask = override_mask
    reduced = integrator.integrateImageStack(scan)
    return reduced.to_dataset(name="reduced").unstack("system")


# Module-level setup: both lines execute at import time.
# Catalog loader for SST-1 RSoXS data; corr_mode="none" skips intensity corrections.
load = PyHyperScattering.load.SST1RSoXSDB(corr_mode="none")
# Writable Tiled node where reduced datasets are written.
# NOTE(review): from_profile presumably opens a client connection on import —
# confirm that import-time connection is intended here.
writable_tiled = from_profile("rsoxs")["reduced_sandbox"]


@task
def load_reduce_and_write_to_tiled(scanid):
    """Reduce scan ``scanid`` and persist the result to the writable Tiled node.

    Returns
    -------
    The reduced dataset on success, or None if the reduction raised (the
    failure is logged as a warning; best-effort behavior is preserved).
    """
    logger = get_run_logger()
    try:
        scan = load_and_reduce(scanid)
    except Exception as e:
        # Fix: the original logged the warning and then fell through to
        # write_xarray_dataset(writable_tiled, scan) with `scan` unbound,
        # raising NameError and masking the real reduction error.
        logger.warning(f"Exception during reduction {e}")
        return None
    # NOTE(review): a @task calling another @task directly is rejected by
    # some Prefect 2 versions — confirm the deployed Prefect version allows it.
    write_xarray_dataset(writable_tiled, scan)
    return scan


"""
def auto_reduce_recent_data_if_not_reduced(number):
for scan_ref in range (-number,-1):
try:
if load.c[scan_ref].stop is not None: # if the scan is currently running, stop will be None.
local_scan_id = load.c[scan_ref].metadata['summary']['scan_id']
if len(analyzed_tiled.search(Eq('attrs.start.scan_id',local_scan_id))) > 0:
continue
load_reduce_and_write_to_tiled(scan_ref)
except Exception as e:
print(f'error in processing {scan_ref}, {e}')

"""


def lookup_directory(start_doc):
"""
Return the path for the proposal directory.
Expand Down Expand Up @@ -52,46 +108,12 @@ def lookup_directory(start_doc):
return Path(paths[0])


#######################################################################
# WIP: Commenting out this function to avoid masking real linter errors
# OK to uncomment when development resumes.
#######################################################################
# @task
# def write_run_artifacts(scan_id):
# """
# Example live-analysis function
#
# Parameters:
# run_to_plot (int): the local scan id from DataBroker
# """
# start_doc = tiled_client_raw[scan_id].start
# directory = (
# lookup_directory(start_doc)
# / start_doc["project_name"]
# / f"{start_doc['scan_id']}"
# )
# directory.mkdir(parents=True, exist_ok=True)
#
# logger = get_run_logger()
# logger.info(f"starting pyhyper export to {directory}")
#
# logger.info(f"{PyHyperScattering.__version__}")
#
# c = from_profile("nsls2")
# logger.info("Loaded RSoXS Profile...")
#
# logger.info("created RSoXS catalog loader...")
#
# # except Exception:
# # logger.warning("Couldn't save as NeXus file.")
# logger.info("Done!")
# return integratedimages
#
#
# @flow
# def pyhyper_flow(scan_id=36106):
# write_run_artifacts(scan_id)
# log_status()
@flow
def pyhyper_flow(scan_id=36106):
    """Flow entry point: reduce one scan, write it to Tiled, then log status."""
    # Return value is currently unused; kept for the planned artifact export.
    reduced = load_reduce_and_write_to_tiled(scan_id)
    # TODO: decide and save these artifacts write_run_artifacts(reduced)
    log_status()


@task
Expand Down