51 changes: 51 additions & 0 deletions spine/data/optical.py
@@ -20,6 +20,8 @@ class Flash(PosDataBase):
----------
id : int
Index of the flash in the list
interaction_id : int
Index of the interaction in the list
volume_id : int
Index of the optical volume in which the flash was recorded
time : float
@@ -48,6 +50,7 @@ class Flash(PosDataBase):
Units in which the position coordinates are expressed
"""
id: int = -1
interaction_id: int = -1
volume_id: int = -1
frame: int = -1
in_beam_frame: bool = False
@@ -108,3 +111,51 @@ def from_larcv(cls, flash):
time_abs=flash.absTime(), time_width=flash.timeWidth(),
total_pe=flash.TotalPE(), pe_per_ch=pe_per_ch,
center=center, width=width)

@classmethod
def from_hypothesis(cls, flash, interaction_id, id, volume_id=None, negative_id=True):
"""Builds and returns a Flash object from a flashmatch::Flash_t
hypothesis flash.

Parameters
----------
flash : flashmatch::Flash_t
Flash object
interaction_id : int
ID of the interaction from which the hypothesis flash was generated
id : int
ID of the flash
volume_id : int, optional
Volume ID to use for the flash, set manually if provided
negative_id : bool, default True
If `True`, store the negative of the ID as the flash ID. A hypothesis keeps a negative ID until it is matched to a flash, so a non-negative ID marks a matched hypothesis.

Returns
-------
Flash
Flash object
"""
# Get the number of PEs per optical channel
pe_per_ch = np.array(flash.pe_v, dtype=np.float32)

# Get the center and width of the flash
center = np.array([flash.x, flash.y, flash.z])
width = np.array([flash.x_err, flash.y_err, flash.z_err])

# Get the volume ID
if volume_id is None:
volume_id = -1
for attr in ('tpc', 'volume_id'):
if hasattr(flash, attr):
volume_id = getattr(flash, attr)()

# Create the Flash object
if negative_id:
id = -(id + 1)  # Offset by 1 to avoid a negative zero
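# e.g. id=0 -> -1, id=3 -> -4; the hypothesis keeps a negative ID until it is matched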
return cls(id=id, interaction_id=interaction_id, volume_id=volume_id,
time=flash.time,
time_width=flash.time_width,
total_pe=flash.TotalPE(), pe_per_ch=pe_per_ch,
center=center, width=width)

def __str__(self):
return f"Flash(id={self.id}, interaction_id={self.interaction_id}, volume_id={self.volume_id}, time={self.time}, time_width={self.time_width}, total_pe={self.total_pe}, center={self.center}, width={self.width})"
1 change: 1 addition & 0 deletions spine/post/optical/__init__.py
@@ -1 +1,2 @@
from .flash_matching import *
from .fill_hypothesis import *
167 changes: 167 additions & 0 deletions spine/post/optical/fill_hypothesis.py
@@ -0,0 +1,167 @@
"""
Post-processor in charge of filling the hypothesis into the data product.
"""

import numpy as np

from spine.post.base import PostBase
from spine.utils.geo import Geometry
from spine.data.out.base import OutBase

from .hypothesis import Hypothesis

__all__ = ['FillFlashHypothesisProcessor']

class FillFlashHypothesisProcessor(PostBase):
"""Fills the hypothesis into the data product."""

# Name of the post-processor (as specified in the configuration)
name = 'fill_hypothesis'

# Alternative allowed names of the post-processor
aliases = ('fill_hypo',)

def __init__(self, flash_key, volume, ref_volume_id=None, detector=None, parent_path=None,
geometry_file=None, run_mode='reco', truth_point_mode='points',
truth_dep_mode='depositions', hypothesis_key='flash_hypos', **kwargs):
"""Initialize the fill hypothesis processor.

Parameters
----------
flash_key : str
Flash data product name. In most cases, this is unambiguous, unless
there are multiple types of segregated optical detectors
volume : str
Physical volume corresponding to each flash ('module' or 'tpc')
ref_volume_id : int, optional
If specified, all interactions/flashes are expected to live in this
reference optical volume; interaction positions are shifted into it
before the hypotheses are built.
detector : str, optional
Detector to get the geometry from
geometry_file : str, optional
Path to a `.yaml` geometry file to load the geometry from
parent_path : str, optional
Path to the parent directory of the main analysis configuration.
This allows for the use of relative paths in the post-processors.
hypothesis_key : str, default 'flash_hypos'
Key to use for the hypothesis data product
**kwargs : dict, optional
Additional parameters passed to the underlying `Hypothesis` algorithm
"""
# Initialize the parent class
super().__init__(
'interaction', run_mode, truth_point_mode, truth_dep_mode,
parent_path=parent_path)

# Initialize the hypothesis key
self.flash_key = flash_key
self.hypothesis_key = hypothesis_key

# Initialize the detector geometry
self.geo = Geometry(detector, geometry_file)

# Get the volume within which each flash is confined
assert volume in ('tpc', 'module'), (
"The `volume` must be one of 'tpc' or 'module'.")
self.volume = volume
self.ref_volume_id = ref_volume_id

# Initialize the hypothesis algorithm
self.hypothesis = Hypothesis(detector=detector, parent_path=self.parent_path, **kwargs)

# Make sure the flash data product is available in the input
self.update_keys({self.flash_key: True})

def match_hypothesis(self, hypothesis_v, flash_info_v):
"""Match the hypothesis to the flash. The hypothesis has the interaction ID,
whereas the interaction has the flash ID. So we will match the hypothesis interaction ID to
the interaction that's matched to the flash, then set the flash ID to the hypothesis ID.

Parameters
----------
hypothesis_v : list
List of hypothesis objects
flash_info_v : list
List of (interaction ID, flash ID, flash volume ID) tuples

Returns
-------
None
Modifies the hypothesis objects in place
"""

# Map each interaction ID to its (flash ID, flash volume ID)
int_id_dict = {int_id: (flash_id, flash_vol) for int_id, flash_id, flash_vol in flash_info_v}

# Modify the hypothesis objects in place
for hypo in hypothesis_v:
# If the interaction is matched to a flash in the same optical volume,
# assign that flash ID to the hypothesis
if hypo.interaction_id in int_id_dict and int_id_dict[hypo.interaction_id][1] == hypo.volume_id:
hypo.id = int_id_dict[hypo.interaction_id][0]


def process(self, data):
"""Fills the hypothesis into the data product.

Parameters
----------
data : dict
Dictionary of data products

Returns
-------
dict
Dictionary containing the list of flash hypotheses under `hypothesis_key`
"""

volume_ids = np.asarray([f.volume_id for f in data[self.flash_key]])

# Loop over optical volumes and make the hypotheses in each
for k in self.interaction_keys:
# Fetch interactions, nothing to do if there are not any
interactions = data[k]
if not len(interactions):
continue

# Make sure the interaction coordinates are expressed in cm
self.check_units(interactions[0])

# Loop over the optical volumes
id_offset = 0
hypothesis_v = []
for volume_id in np.unique(volume_ids):
# Crop interactions to only include depositions in the optical volume
interactions_v = []
flash_info_v = []
for inter in interactions:
# Fetch the points in the current optical volume
sources = self.get_sources(inter)
if self.volume == 'module':
index = self.geo.get_volume_index(sources, volume_id)

elif self.volume == 'tpc':
num_cpm = self.geo.tpc.num_chambers_per_module
module_id, tpc_id = volume_id // num_cpm, volume_id % num_cpm
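# e.g. with 2 TPCs per module (hypothetical), volume_id=3 -> module_id=1, tpc_id=1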
index = self.geo.get_volume_index(sources, module_id, tpc_id)

# If there are no points in this volume, proceed
if len(index) == 0:
continue

# Fetch points and depositions
points = self.get_points(inter)[index]
depositions = self.get_depositions(inter)[index]
if self.ref_volume_id is not None:
# If the reference volume is specified, shift positions
points = self.geo.translate(
points, volume_id, self.ref_volume_id)

# Create an interaction which holds positions/depositions
inter_v = OutBase(
id=inter.id, points=points, depositions=depositions)
interactions_v.append(inter_v)
for fid, fvol in zip(inter.flash_ids, inter.flash_volume_ids):
if fvol == volume_id:
flash_info_v.append((inter.id, fid, fvol))  # Needed for the matching step
# Make the hypothesis
_hypo_v = self.hypothesis.make_hypothesis_list(interactions_v, id_offset, volume_id)
hypothesis_v.extend(_hypo_v)
id_offset += len(_hypo_v)  # Increment the offset for the next volume

# Match the hypotheses to the flashes associated with the interactions
self.match_hypothesis(hypothesis_v, flash_info_v)

# Fill the hypothesis into the data product
return {self.hypothesis_key: hypothesis_v}
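
A hypothetical configuration sketch for this post-processor, written as a Python dict (keys mirror the `__init__` parameters above; the flash product, detector and OpT0Finder configuration names are placeholders, and the exact nesting inside the analysis configuration is not shown):

# Hypothetical configuration sketch; values are placeholders
fill_hypothesis_cfg = {
    'flash_key': 'flashes',           # flash data product to match against
    'volume': 'module',               # optical volume granularity: 'module' or 'tpc'
    'detector': 'icarus',             # placeholder detector name for the geometry
    'hypothesis_key': 'flash_hypos',  # output key for the list of hypothesis flashes
    'cfg': 'flash_match.cfg',         # forwarded to the Hypothesis algorithm via **kwargs
}
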
154 changes: 154 additions & 0 deletions spine/post/optical/hypothesis.py
@@ -0,0 +1,154 @@
"""Module for storing flash hypotheses into flash objects."""

# TODO: Make a common base class between the likelihood matching and this

import os
import sys
import numpy as np
import re

# Import the base class and Flash data structure
from .opt0_interface import OpT0Interface
from spine.data.optical import Flash


class Hypothesis(OpT0Interface):
"""
Interface class between flash hypothesis generation and OpT0Finder.

Inherits common initialization and QCluster creation from OpT0Interface.
Uses an OpT0Finder hypothesis algorithm (e.g., SemiAnalyticalModel,
PhotonLibHypothesis) to generate predicted flash PEs from TPC interactions.
"""

def __init__(self, cfg, detector, parent_path=None, scaling=1., alpha=0.21,
recombination_mip=0.6, legacy=False):
"""Initialize the flash hypothesis algorithm.

Parameters
----------
cfg : str
Flash matching configuration file path
detector : str, optional
Detector to get the geometry from
parent_path : str, optional
Path to the parent configuration file (allows for relative paths)
scaling : Union[float, str], default 1.
Global scaling factor for the depositions (can be an expression)
alpha : float, default 0.21
Number of excitons (Ar*) divided by number of electron-ion pairs (e-,Ar+)
recombination_mip : float, default 0.6
Recombination factor for MIP-like particles in LAr
legacy : bool, default False
Use the legacy OpT0Finder function(s). TODO: remove when dropping legacy
"""
# Call the parent class initializer for common setup
super().__init__(cfg, detector, parent_path, scaling, alpha,
recombination_mip, legacy)

# Initialize hypothesis-specific attributes
self.hypothesis_v = None

def _initialize_algorithm(self, cfg_params):
"""
Initialize the specific Hypothesis algorithm based on configuration.

Parameters
----------
cfg_params : flashmatch::PSet
The loaded OpT0Finder configuration parameters.
"""
from flashmatch import flashmatch
# Get FlashMatchManager configuration section to find the HypothesisAlgo name
# Assuming the relevant parameters are under 'FlashMatchManager' PSet
# Adjust 'FlashMatchManager' if your config structure is different
manager_params = cfg_params.get['flashmatch::FMParams']('FlashMatchManager')

# Parse the configuration dump to find the HypothesisAlgo value
config_dump = manager_params.dump()
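# The dump is expected to contain a line such as: HypothesisAlgo : "SemiAnalyticalModel" (hypothetical example)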
match = re.search(r'HypothesisAlgo\s*:\s*"([^"]+)"', config_dump)
if match:
algo_name = match.group(1)
else:
# Fallback: Check if the hypothesis algo config exists directly under top level
# This depends on how the .cfg file is structured
found_algo = False
for name in ['SemiAnalyticalModel', 'PhotonLibHypothesis']: # Add other known hypothesis algos
if cfg_params.contains_pset(name):
algo_name = name
found_algo = True
break
if not found_algo:
raise ValueError(f"Could not find HypothesisAlgo parameter within "
f"'FlashMatchManager' PSet in configuration: {config_dump}")


print(f'HypothesisAlgo: {algo_name}')

# Create the hypothesis algorithm based on the extracted name
# Ensure the factory name matches the class name used in OpT0Finder registration
try:
self.hypothesis = flashmatch.FlashHypothesisFactory.get().create(
algo_name, algo_name) # Factory name and instance name often match
except Exception as e:
raise ValueError(f"Failed to create hypothesis algorithm '{algo_name}'. "
f"Is it registered correctly in OpT0Finder? Error: {e}")

# Configure the hypothesis algorithm using its own PSet
try:
algo_pset = cfg_params.get['flashmatch::FMParams'](algo_name)
self.hypothesis.Configure(algo_pset)
except Exception as e:
raise ValueError(f"Failed to configure hypothesis algorithm '{algo_name}' "
f"using PSet '{algo_name}'. Error: {e}")

def make_hypothesis_list(self, interactions, id_offset=0, volume_id=None):
"""
Runs the hypothesis algorithm on a list of interactions to create
a list of spine Flash objects representing the predicted light.

Parameters
----------
interactions : List[Union[Interaction, TruthInteraction]]
List of TPC interactions
id_offset : int, default 0
Offset to add to the flash ID
volume_id : int, optional
Volume ID to use for the hypothesis

Returns
-------
List[Flash]
List of generated spine Flash objects.
"""
# Make the QCluster_t objects using the base class method
# Store them in self.qcluster_v as the base class expects
self.qcluster_v = self.make_qcluster_list(interactions)

# Map original interaction index to qcluster object for easy lookup
qcluster_map = {qc.idx: qc for qc in self.qcluster_v}

# Initialize the list of generated spine Flash objects
self.hypothesis_v = []

# Run the hypothesis algorithm for each interaction that produced a valid qcluster
for i, inter in enumerate(interactions):
# Find the corresponding QCluster_t object using the original index
qcluster = qcluster_map.get(inter.id) # Assuming inter.id is the original index used in make_qcluster_list

# Skip if no valid qcluster was created for this interaction
if qcluster is None:
continue

# Run the hypothesis algorithm
flash_hypothesis_fm = self.hypothesis.GetEstimate(qcluster) # flashmatch::Flash_t

# Create a new spine Flash object from the hypothesis result
# Pass the original interaction ID (inter.id)
flash = Flash.from_hypothesis(flash_hypothesis_fm, inter.id, i + id_offset, volume_id)

# Append the generated spine Flash object
self.hypothesis_v.append(flash)

return self.hypothesis_v
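
A minimal usage sketch of the hypothesis interface (hypothetical file and detector names; `interactions_v` is assumed to already be cropped to a single optical volume, as done by the post-processor above):

# Hypothetical sketch: generate flash hypotheses for one optical volume
hypo_maker = Hypothesis(cfg='flash_match.cfg', detector='icarus')  # placeholder names
hypos = hypo_maker.make_hypothesis_list(interactions_v, id_offset=0, volume_id=0)
for h in hypos:
    print(h.interaction_id, h.total_pe)  # each Flash keeps its source interaction ID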
