-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathpost_processors.py
More file actions
31 lines (25 loc) · 943 Bytes
/
post_processors.py
File metadata and controls
31 lines (25 loc) · 943 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from prefect import flow, task, get_run_logger
from prefect.blocks.system import Secret
from tiled.client import from_profile
from exporters import export_E_step, export_E_fly
BEAMLINE_ACRONYM = "tes"
api_key = Secret.load("tiled-tes-api-key", _sync=True).get()
tiled_client = from_profile("nsls2", api_key=api_key)[BEAMLINE_ACRONYM]["raw"]
processor_map = {
'export_E_step': export_E_step,
'export_E_fly': export_E_fly,
}
@task
def dispatcher(run_uid):
logger = get_run_logger()
run = tiled_client[run_uid]
for processor in run.start["prefect_post_processors"]:
logger.info(f"Start post-processor '{processor}'...")
processor_map[processor](run)
logger.info(f"Finish post-processor '{processor}'")
@flow(log_prints=True)
def post_processors(run_uid):
logger = get_run_logger()
logger.info("Start post_processors...")
dispatcher(run_uid)
logger.info("Finish post_processors.")