Skip to content
110 changes: 63 additions & 47 deletions src/PyHyperScattering/SST1RSoXSDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ def subtract_dark(img, pedestal=100, darks=None):
monitors = (
monitors.rename({"time": "system"})
.reset_index("system")
.assign_coords(system=index)
.assign_coords(mindex_coords)
)

if "system_" in monitors.indexes.keys():
Expand Down Expand Up @@ -824,7 +824,8 @@ def loadMonitors(
entry,
integrate_onto_images: bool = True,
useShutterThinning: bool = True,
n_thinning_iters: int = 5,
n_thinning_iters: int = 1,
directLoadPulsedMonitors: bool = True
):
"""Load the monitor streams for entry.

Expand All @@ -845,51 +846,53 @@ def loadMonitors(
useShutterThinning : bool, optional
Whether or not to attempt to thin (filter) the raw time streams to remove data collected during shutter opening/closing, by default False
As of 9 Feb 2023 at NSLS2 SST1, using useShutterThinning= True for exposure times of < 0.5s is
not recommended because the shutter data is unreliable and too many points will be culled
not recommended because the shutter data is unreliable and too many points will be removed
n_thinning_iters : int, optional
how many iterations of thinning to perform, by default 5
If the data is becoming too sparse, try fewer iterations
how many iterations of thinning to perform, by default 1
(former default was 5 before gated monitor loading was added)
If you receive errors in assigning image timepoints to counters, try fewer iterations
directLoadPulsedMonitors : bool, optional
Whether or not to load the pulsed monitors using direct reading, by default True
This only applies if integrate_onto_images is True; otherwise you'll get very raw data.
If False, the pulsed monitors will be loaded using a shutter-thinning and masking approach as with continuous counters

Returns
-------
xr.Dataset
xarray dataset containing all monitor streams as data variables mapped against the dimension "time"
"""

monitors = None
raw_monitors = None


# Iterate through the list of streams held by the Bluesky document 'entry'
# Iterate through the list of streams held by the Bluesky document 'entry', and build
for stream_name in list(entry.keys()):
# Add monitor streams to the output xr.Dataset
if "monitor" in stream_name:
if monitors is None: # First one
if raw_monitors is None: # First one
# incantation to extract the dataset from the bluesky stream
monitors = entry[stream_name].data.read()
raw_monitors = entry[stream_name].data.read()
else: # merge into the to existing output xarray
monitors = xr.merge((monitors, entry[stream_name].data.read()))
raw_monitors = xr.merge((raw_monitors, entry[stream_name].data.read()))

# At this stage monitors has dimension time and all streams as data variables
# the time dimension inherited all time values from all streams
# the data variables (Mesh current, sample current etc.) are all sparse, with lots of nans

# if there are no monitors, return an empty xarray Dataset
if monitors is None:
if raw_monitors is None:
return xr.Dataset()

# For each nan value, replace with the closest value ahead of it in time
# For remaining nans, replace with closest value behind it in time
monitors = monitors.ffill("time").bfill("time")
monitors = raw_monitors.ffill("time").bfill("time")

# If we need to remap timepoints to match timepoints for data acquisition
if integrate_onto_images:
try:
# Pull out ndarray of 'primary' timepoints (measurement timepoints)
try:
primary_time = entry.primary.data["time"].values
except AttributeError:
if type(entry.primary.data["time"]) == tiled.client.array.DaskArrayClient:
primary_time = entry.primary.data["time"].read().compute()
elif type(entry.primary.data["time"]) == tiled.client.array.ArrayClient:
primary_time = entry.primary.data["time"].read()
primary_time = entry.primary.data["time"].__array__()
primary_time_bins = np.insert(primary_time, 0,0)

# If we want to exclude values for when the shutter was opening or closing
# This doesn't work for exposure times ~ < 0.5 s, because shutter stream isn't reliable
Expand All @@ -911,22 +914,50 @@ def loadMonitors(
"time"
)

#return monitors
# Bin the indexes in 'time' based on the intervales between timepoints in 'primary_time' and evaluate their mean
# Then rename the 'time_bin' dimension that results to 'time'
monitors = (
monitors.groupby_bins("time", np.insert(primary_time, 0, 0))
monitors.groupby_bins("time",primary_time_bins,include_lowest=True)
.mean()
.rename_dims({"time_bins": "time"})
.rename({"time_bins": "time"})
)

'''
# Add primary measurement time as a coordinate in monitors that is named 'time'
# Remove the coordinate 'time_bins' from the array
monitors = (
monitors.assign_coords({"time": primary_time})
.drop_indexes("time_bins")
.reset_coords("time_bins", drop=True)
)

)'''

# load direct/pulsed monitors

for stream_name in list(entry.keys()):
if "monitor" in stream_name and ("Beamstop" in stream_name or "Sample" in stream_name):
# the pulsed monitors we know about are "SAXS Beamstop", "WAXS Beamstop", "Sample Current"
# if others show up here, they could be added
out_name = stream_name.replace("_monitor", "")
mon = entry[stream_name].data.read()[out_name].compute()
SIGNAL_THRESHOLD = 0.1
threshold = SIGNAL_THRESHOLD*mon.mean('time')
mon_filter = xr.zeros_like(mon)
mon_filter[mon<threshold] = 0
mon_filter[mon>threshold] = 1
mon_filter.values = scipy.ndimage.binary_erosion(mon_filter)
mon_filtered = mon.where(mon_filter==1)
mon_binned = (mon_filtered.groupby_bins("time",primary_time_bins,include_lowest=True)
.mean()
.rename({"time_bins":"time"})
)

if not directLoadPulsedMonitors:
out_name = 'pl_' + out_name

monitors[out_name] = mon_binned
monitors = monitors.assign_coords({"time": primary_time})


except Exception as e:
# raise e # for testing
warnings.warn(
Expand Down Expand Up @@ -1043,19 +1074,12 @@ def loadMd(self, run):
# print(f'Loading from primary: {phs}, value {primary[rsoxs].values}')
except (KeyError, HTTPStatusError):
try:
blval = baseline[rsoxs]
if (
type(blval) == tiled.client.array.ArrayClient
or type(blval) == tiled.client.array.DaskArrayClient
):
blval = blval.read()
blval = baseline[rsoxs].__array__()
md[phs] = blval.mean().round(4)
if blval.var() > 0:
if blval.var() > 1e-4*abs(blval.mean()):
warnings.warn(
(
f"While loading {rsoxs} to infill metadata entry for {phs}, found"
f" beginning and end values unequal: {baseline[rsoxs]}. It is"
" possible something is messed up."
f"{phs} changed during scan: {blval}."
),
stacklevel=2,
)
Expand All @@ -1064,28 +1088,20 @@ def loadMd(self, run):
md[phs] = primary[md_secondary_lookup[phs]].read()
except (KeyError, HTTPStatusError):
try:
blval = baseline[md_secondary_lookup[phs]]
if (
type(blval) == tiled.client.array.ArrayClient
or type(blval) == tiled.client.array.DaskArrayClient
):
blval = blval.read()
blval = baseline[md_secondary_lookup[phs]].__array__()
md[phs] = blval.mean().round(4)
if blval.var() > 0:
if blval.var() > 1e-4*abs(blval.mean()):
warnings.warn(
(
f"While loading {md_secondary_lookup[phs]} to infill"
f" metadata entry for {phs}, found beginning and end"
f" values unequal: {baseline[rsoxs]}. It is possible"
" something is messed up."
f"{phs} changed during scan: {blval}."
),
stacklevel=2,
)
except (KeyError, HTTPStatusError):
warnings.warn(
(
f"Could not find {rsoxs} in either baseline or primary. "
f" Needed to infill value {phs}. Setting to None."
f"Could not find {rsoxs} in either baseline or primary while"
f" looking for {phs}. Setting to None."
),
stacklevel=2,
)
Expand Down
Loading