-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathpyhyper.py
More file actions
100 lines (82 loc) · 2.8 KB
/
pyhyper.py
File metadata and controls
100 lines (82 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import re
from pathlib import Path
import httpx
# import pyFAI
# import PyHyperScattering
from prefect import get_run_logger, task
# from utils import get_tiled_client
PATH = "/nsls2/data/dssi/scratch/prefect-outputs/rsoxs/"
DATA_SESSION_PATTERN = re.compile("[passGUCP]*-([0-9]+)")
def lookup_directory(start_doc):
"""
Return the path for the proposal directory.
PASS gives us a *list* of cycles, and we have created a proposal directory under each cycle.
"""
DATA_SESSION_PATTERN = re.compile("[GUPCpass]*-([0-9]+)")
client = httpx.Client(base_url="https://api-staging.nsls2.bnl.gov")
data_session = start_doc[
"data_session"
] # works on old-style Header or new-style BlueskyRun
try:
digits = int(DATA_SESSION_PATTERN.match(data_session).group(1))
except AttributeError:
raise AttributeError(f"incorrect data_session: {data_session}")
response = client.get(f"/proposal/{digits}/directories")
response.raise_for_status()
paths = [path_info["path"] for path_info in response.json()]
# Filter out paths from other beamlines.
paths = [path for path in paths if "sst" == path.lower().split("/")[3]]
# Filter out paths from other cycles and paths for commissioning.
paths = [
path
for path in paths
if path.lower().split("/")[5] == "commissioning"
or path.lower().split("/")[5] == start_doc["cycle"]
]
# There should be only one path remaining after these filters.
# Convert it to a pathlib.Path.
return Path(paths[0])
#######################################################################
# WIP: Commenting out this function to avoid masking real linter errors
# OK to uncomment when development resumes.
#######################################################################
# @task
# def write_run_artifacts(scan_id):
# """
# Example live-analysis function
#
# Parameters:
# run_to_plot (int): the local scan id from DataBroker
# """
# start_doc = tiled_client_raw[scan_id].start
# directory = (
# lookup_directory(start_doc)
# / start_doc["project_name"]
# / f"{start_doc['scan_id']}"
# )
# directory.mkdir(parents=True, exist_ok=True)
#
# logger = get_run_logger()
# logger.info(f"starting pyhyper export to {directory}")
#
# logger.info(f"{PyHyperScattering.__version__}")
#
# c = get_tiled_client()
# logger.info("Loaded RSoXS Profile...")
#
# logger.info("created RSoXS catalog loader...")
#
# # except Exception:
# # logger.warning("Couldn't save as NeXus file.")
# logger.info("Done!")
# return integratedimages
#
#
# @flow
# def pyhyper_flow(scan_id=36106):
# write_run_artifacts(scan_id)
# log_status()
@task
def log_status():
logger = get_run_logger()
logger.info("Done!")