-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathpyhyper.py
More file actions
80 lines (60 loc) · 2.18 KB
/
pyhyper.py
File metadata and controls
80 lines (60 loc) · 2.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import prefect
from prefect import task, Flow, Parameter
from prefect.triggers import all_finished
import warnings
from tiled.client import from_profile
import PyHyperScattering
print(f"{PyHyperScattering.__version__}")
RUN_TO_PLOT = 36106
PATH = "/nsls2/data/tst/proposals/2022-2/prefect-test/"
@task
def write_run_artifacts(RUN_TO_PLOT):
"""
Example live-analysis function
Parameters:
run_to_plot (int): the local scan id from DataBroker
"""
# Prefect logger
logger = prefect.context.get("logger")
c = from_profile("nsls2")
rsoxsload = PyHyperScattering.load.SST1RSoXSDB(corr_mode="none", catalog=c["rsoxs"])
itp = rsoxsload.loadRun(c["rsoxs"][RUN_TO_PLOT], dims=["energy"])
if itp.rsoxs_config == "waxs":
maskmethod = "nika"
mask = "/nsls2/data/sst/legacy/RSoXS/analysis/SST1_WAXS_mask.hdf"
elif itp.rsoxs_config == "saxs":
maskmethod = "nika"
mask = "/nsls2/data/sst/legacy/RSoXS/analysis/SST1-SAXS_mask.hdf"
else:
maskmethod = "none"
warnings.warn(
f"Bad rsoxs_config, expected saxs or waxs but found {itp.rsoxs_config}. This will disable masking and certainly cause issues later.",
stacklevel=2,
)
integ = PyHyperScattering.integrate.PFEnergySeriesIntegrator(
maskmethod=maskmethod,
maskpath=mask,
geomethod="template_xr",
template_xr=itp,
integration_method="csr_ocl",
)
name = itp.attrs["sample_name"]
# DataArray
integratedimages = integ.integrateImageStack(itp)
# print(integratedimages)
try:
integratedimages.fileio.saveNexus(f"{PATH}reduced_{RUN_TO_PLOT}_{name}.nxs")
except Exception:
logger.warning("Couldn't save as NeXus file.")
# integratedimages.fileio.saveNexus(f'{PATH}reduced_{RUN_TO_PLOT}_{name}.nxs')
return integratedimages
@task
def log_status(trigger=all_finished):
logger = prefect.context.get("logger")
logger.info("Done!")
with Flow("pyhyper-flow") as flow:
scan_id = Parameter("scan_id", default=36106)
da = write_run_artifacts(scan_id)
# log_task = log_status()
# log_task.set_upstream(da)
log_status(upstream_tasks=[da])