diff --git a/src/PyHyperScattering/SST1RSoXSDB.py b/src/PyHyperScattering/SST1RSoXSDB.py index 9840adea..7c156daf 100644 --- a/src/PyHyperScattering/SST1RSoXSDB.py +++ b/src/PyHyperScattering/SST1RSoXSDB.py @@ -767,7 +767,7 @@ def subtract_dark(img, pedestal=100, darks=None): monitors = ( monitors.rename({"time": "system"}) .reset_index("system") - .assign_coords(system=index) + .assign_coords(mindex_coords) ) if "system_" in monitors.indexes.keys(): @@ -824,7 +824,8 @@ def loadMonitors( entry, integrate_onto_images: bool = True, useShutterThinning: bool = True, - n_thinning_iters: int = 5, + n_thinning_iters: int = 1, + directLoadPulsedMonitors: bool = True ): """Load the monitor streams for entry. @@ -845,51 +846,53 @@ def loadMonitors( useShutterThinning : bool, optional Whether or not to attempt to thin (filter) the raw time streams to remove data collected during shutter opening/closing, by default False As of 9 Feb 2023 at NSLS2 SST1, using useShutterThinning= True for exposure times of < 0.5s is - not recommended because the shutter data is unreliable and too many points will be culled + not recommended because the shutter data is unreliable and too many points will be removed n_thinning_iters : int, optional - how many iterations of thinning to perform, by default 5 - If the data is becoming too sparse, try fewer iterations + how many iterations of thinning to perform, by default 1 + (former default was 5 before gated monitor loading was added) + If you receive errors in assigning image timepoints to counters, try fewer iterations + directLoadPulsedMonitors : bool, optional + Whether or not to load the pulsed monitors using direct reading, by default True + This only applies if integrate_onto_images is True; otherwise you'll get very raw data. + If False, the pulsed monitors will be loaded using a shutter-thinning and masking approach as with continuous counters + Returns ------- xr.Dataset xarray dataset containing all monitor streams as data variables mapped against the dimension "time" """ - monitors = None + raw_monitors = None + - # Iterate through the list of streams held by the Bluesky document 'entry' + # Iterate through the list of streams held by the Bluesky document 'entry', and build for stream_name in list(entry.keys()): # Add monitor streams to the output xr.Dataset if "monitor" in stream_name: - if monitors is None: # First one + if raw_monitors is None: # First one # incantation to extract the dataset from the bluesky stream - monitors = entry[stream_name].data.read() + raw_monitors = entry[stream_name].data.read() else: # merge into the to existing output xarray - monitors = xr.merge((monitors, entry[stream_name].data.read())) + raw_monitors = xr.merge((raw_monitors, entry[stream_name].data.read())) # At this stage monitors has dimension time and all streams as data variables # the time dimension inherited all time values from all streams # the data variables (Mesh current, sample current etc.) are all sparse, with lots of nans # if there are no monitors, return an empty xarray Dataset - if monitors is None: + if raw_monitors is None: return xr.Dataset() # For each nan value, replace with the closest value ahead of it in time # For remaining nans, replace with closest value behind it in time - monitors = monitors.ffill("time").bfill("time") + monitors = raw_monitors.ffill("time").bfill("time") # If we need to remap timepoints to match timepoints for data acquisition if integrate_onto_images: try: # Pull out ndarray of 'primary' timepoints (measurement timepoints) - try: - primary_time = entry.primary.data["time"].values - except AttributeError: - if type(entry.primary.data["time"]) == tiled.client.array.DaskArrayClient: - primary_time = entry.primary.data["time"].read().compute() - elif type(entry.primary.data["time"]) == tiled.client.array.ArrayClient: - primary_time = entry.primary.data["time"].read() + primary_time = entry.primary.data["time"].__array__() + primary_time_bins = np.insert(primary_time, 0,0) # If we want to exclude values for when the shutter was opening or closing # This doesn't work for exposure times ~ < 0.5 s, because shutter stream isn't reliable @@ -911,22 +914,50 @@ def loadMonitors( "time" ) + #return monitors # Bin the indexes in 'time' based on the intervales between timepoints in 'primary_time' and evaluate their mean # Then rename the 'time_bin' dimension that results to 'time' monitors = ( - monitors.groupby_bins("time", np.insert(primary_time, 0, 0)) + monitors.groupby_bins("time",primary_time_bins,include_lowest=True) .mean() - .rename_dims({"time_bins": "time"}) + .rename({"time_bins": "time"}) ) - + ''' # Add primary measurement time as a coordinate in monitors that is named 'time' # Remove the coordinate 'time_bins' from the array monitors = ( monitors.assign_coords({"time": primary_time}) .drop_indexes("time_bins") .reset_coords("time_bins", drop=True) - ) - + )''' + + # load direct/pulsed monitors + + for stream_name in list(entry.keys()): + if "monitor" in stream_name and ("Beamstop" in stream_name or "Sample" in stream_name): + # the pulsed monitors we know about are "SAXS Beamstop", "WAXS Beamstop", "Sample Current" + # if others show up here, they could be added + out_name = stream_name.replace("_monitor", "") + mon = entry[stream_name].data.read()[out_name].compute() + SIGNAL_THRESHOLD = 0.1 + threshold = SIGNAL_THRESHOLD*mon.mean('time') + mon_filter = xr.zeros_like(mon) + mon_filter[monthreshold] = 1 + mon_filter.values = scipy.ndimage.binary_erosion(mon_filter) + mon_filtered = mon.where(mon_filter==1) + mon_binned = (mon_filtered.groupby_bins("time",primary_time_bins,include_lowest=True) + .mean() + .rename({"time_bins":"time"}) + ) + + if not directLoadPulsedMonitors: + out_name = 'pl_' + out_name + + monitors[out_name] = mon_binned + monitors = monitors.assign_coords({"time": primary_time}) + + except Exception as e: # raise e # for testing warnings.warn( @@ -1043,19 +1074,12 @@ def loadMd(self, run): # print(f'Loading from primary: {phs}, value {primary[rsoxs].values}') except (KeyError, HTTPStatusError): try: - blval = baseline[rsoxs] - if ( - type(blval) == tiled.client.array.ArrayClient - or type(blval) == tiled.client.array.DaskArrayClient - ): - blval = blval.read() + blval = baseline[rsoxs].__array__() md[phs] = blval.mean().round(4) - if blval.var() > 0: + if blval.var() > 1e-4*abs(blval.mean()): warnings.warn( ( - f"While loading {rsoxs} to infill metadata entry for {phs}, found" - f" beginning and end values unequal: {baseline[rsoxs]}. It is" - " possible something is messed up." + f"{phs} changed during scan: {blval}." ), stacklevel=2, ) @@ -1064,28 +1088,20 @@ def loadMd(self, run): md[phs] = primary[md_secondary_lookup[phs]].read() except (KeyError, HTTPStatusError): try: - blval = baseline[md_secondary_lookup[phs]] - if ( - type(blval) == tiled.client.array.ArrayClient - or type(blval) == tiled.client.array.DaskArrayClient - ): - blval = blval.read() + blval = baseline[md_secondary_lookup[phs]].__array__() md[phs] = blval.mean().round(4) - if blval.var() > 0: + if blval.var() > 1e-4*abs(blval.mean()): warnings.warn( ( - f"While loading {md_secondary_lookup[phs]} to infill" - f" metadata entry for {phs}, found beginning and end" - f" values unequal: {baseline[rsoxs]}. It is possible" - " something is messed up." + f"{phs} changed during scan: {blval}." ), stacklevel=2, ) except (KeyError, HTTPStatusError): warnings.warn( ( - f"Could not find {rsoxs} in either baseline or primary. " - f" Needed to infill value {phs}. Setting to None." + f"Could not find {rsoxs} in either baseline or primary while" + f" looking for {phs}. Setting to None." ), stacklevel=2, )