Skip to content
160 changes: 141 additions & 19 deletions bilby/gw/detector/interferometer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class Interferometer(object):
minimum_frequency = PropertyAccessor('strain_data', 'minimum_frequency')
maximum_frequency = PropertyAccessor('strain_data', 'maximum_frequency')
frequency_mask = PropertyAccessor('strain_data', 'frequency_mask')
frequency_domain_strain = PropertyAccessor('strain_data', 'frequency_domain_strain')
frequency_domain_strain = PropertyAccessor(
'strain_data', 'frequency_domain_strain')
time_domain_strain = PropertyAccessor('strain_data', 'time_domain_strain')

def __init__(self, name, power_spectral_density, minimum_frequency, maximum_frequency, length, latitude, longitude,
Expand Down Expand Up @@ -108,10 +109,14 @@ def __repr__(self):
'maximum_frequency={}, length={}, latitude={}, longitude={}, elevation={}, ' \
'xarm_azimuth={}, yarm_azimuth={}, xarm_tilt={}, yarm_tilt={})' \
.format(self.name, self.power_spectral_density, float(self.strain_data.minimum_frequency),
float(self.strain_data.maximum_frequency), float(self.geometry.length),
float(self.geometry.latitude), float(self.geometry.longitude),
float(self.geometry.elevation), float(self.geometry.xarm_azimuth),
float(self.geometry.yarm_azimuth), float(self.geometry.xarm_tilt),
float(self.strain_data.maximum_frequency), float(
self.geometry.length),
float(self.geometry.latitude), float(
self.geometry.longitude),
float(self.geometry.elevation), float(
self.geometry.xarm_azimuth),
float(self.geometry.yarm_azimuth), float(
self.geometry.xarm_tilt),
float(self.geometry.yarm_tilt))

def set_strain_data_from_gwpy_timeseries(self, time_series):
Expand Down Expand Up @@ -280,7 +285,8 @@ def antenna_response(self, ra, dec, time, psi, mode):

"""
if mode in ["plus", "cross", "x", "y", "breathing", "longitudinal"]:
polarization_tensor = get_polarization_tensor(ra, dec, time, psi, mode)
polarization_tensor = get_polarization_tensor(
ra, dec, time, psi, mode)
return three_by_three_matrix_contraction(self.geometry.detector_tensor, polarization_tensor)
elif mode == self.name:
return 1
Expand Down Expand Up @@ -342,7 +348,8 @@ def get_detector_response(self, waveform_polarizations, parameters, frequencies=
dt_geocent = parameters['geocent_time'] - self.strain_data.start_time
dt = dt_geocent + time_shift

signal_ifo[mask] = signal_ifo[mask] * np.exp(-1j * 2 * np.pi * dt * frequencies)
signal_ifo[mask] = signal_ifo[mask] * \
np.exp(-1j * 2 * np.pi * dt * frequencies)

signal_ifo[mask] *= self.calibration_model.get_calibration_factor(
frequencies, prefix='recalib_{}_'.format(self.name), **parameters
Expand Down Expand Up @@ -371,7 +378,8 @@ def check_signal_duration(self, parameters, raise_error=True):

if ("mass_1" not in parameters) and ("mass_2" not in parameters):
if raise_error:
raise AttributeError("Unable to check signal duration as mass not given")
raise AttributeError(
"Unable to check signal duration as mass not given")
else:
return

Expand All @@ -392,6 +400,111 @@ def check_signal_duration(self, parameters, raise_error=True):
else:
logger.warning(msg)

def adjust_optimal_snr(self, frequency_domain_strain, target_snr):
""" Rescale a frequency-domain strain array to a target optimal SNR.

Parameters
==========
frequency_domain_strain: numpy.array
Frequency-domain strain to rescale.
target_snr: float
Desired optimal SNR.

Returns
=======
numpy.ndarray
Frequency-domain strain rescaled so that its optimal SNR equals ``target_snr``.
"""
temporary_snr_squared = np.real(
self.optimal_snr_squared(signal=frequency_domain_strain))
return frequency_domain_strain * target_snr / np.sqrt(temporary_snr_squared)

def inject_glitch(self, glitch_parameters=None, glitch_time_domain_strain=None,
glitch_sample_times=None, glitch_waveform_generator=None):
""" Inject a glitch into the interferometer data.

Parameters
==========
glitch_parameters: dict, optional
Dictionary of glitch parameters.
Must contain ``onset_time`` (the GPS time at which the glitch peak should occur).
Must contain ``snr``. The glitch will be rescaled in such a
way that it's optimal SNR matches to this value.
glitch_time_domain_strain: numpy.array, optional
Time-domain strain data of the glitch to inject.
glitch_sample_times: numpy.array
Array of sample times corresponding to ``glitch_time_domain_strain``.
glitch_waveform_generator: bilby.gw.WaveformGenerator,
A waveform generator used to produce the glitch frequency-domain
strain.
"""

if glitch_parameters is None:
glitch_parameters = {}
for key, val in [('ra', 0.0), ('dec', 0.0), ('psi', 0.0)]:
glitch_parameters.setdefault(key, val)

if glitch_time_domain_strain is None and glitch_waveform_generator is None:
raise ValueError(
"inject_glitch needs one of glitch_waveform_generator or "
"glitch_time_domain_strain.")
elif glitch_time_domain_strain is not None:
# Populate ra, dec, psi. Necessary to make an injection.
glitch_sampling_frequency = 1.0 / (glitch_sample_times[1] - glitch_sample_times[0])
# Resample the glitch to match with the interferometer's sampling frequency
if glitch_sampling_frequency != self.sampling_frequency:
temp_sample_times = np.arange(
len(self.time_array)) / self.sampling_frequency
glitch_time_domain_strain = np.interp(
temp_sample_times, glitch_sample_times, glitch_time_domain_strain)
glitch_sample_times = temp_sample_times

# Padding the glitch to have the correct size for the fft.
padded_glitch = np.zeros(len(self.time_array))
padded_glitch[:len(glitch_time_domain_strain)
] += glitch_time_domain_strain

# Convert to frequency domain glitch
glitch_frequency_domain_strain, _ = utils.nfft(
padded_glitch, self.sampling_frequency)

# Adjust SNR
glitch_frequency_domain_strain = self.adjust_optimal_snr(
glitch_frequency_domain_strain, glitch_parameters['snr'])
injection_polarizations = {
self.name: glitch_frequency_domain_strain}

# Roll the glitch since the glitch maxima of the glitch may not align in the same way as the signal maxima.
glitch_parameters['geocent_time'] = glitch_parameters["onset_time"] \
- glitch_sample_times[np.argmax(glitch_time_domain_strain)]

# Usual inject method
self.inject_signal_from_waveform_polarizations(parameters=glitch_parameters,
injection_polarizations=injection_polarizations)

elif glitch_waveform_generator is not None:
glitch_frequency_domain_strain = glitch_waveform_generator.frequency_domain_strain(
glitch_parameters)
glitch_frequency_domain_strain = self.adjust_optimal_snr(
np.asarray(glitch_frequency_domain_strain[0]), glitch_parameters['snr'])
injection_polarizations = {self.name: glitch_frequency_domain_strain}

# Populate ra, dec, psi. Necessary to make an injection.
for key, val in [('ra', 0.0), ('dec', 0.0), ('psi', 0.0)]:
glitch_parameters.setdefault(key, val)
# Roll the glitch since the glitch maxima of the glitch may not align in the same way as the signal maxima.
glitch_time_domain_strain = glitch_waveform_generator.time_domain_strain(glitch_parameters)
glitch_parameters['geocent_time'] = glitch_parameters["onset_time"] - \
glitch_waveform_generator.time_array[np.argmax(glitch_time_domain_strain)]

self.inject_signal_from_waveform_polarizations(parameters=glitch_parameters,
injection_polarizations=injection_polarizations)

logger.info("Injected a glitch in: {}".format(self.name))
logger.info("Optimal SNR of the glitch: {}".format(glitch_parameters['snr']))

return glitch_time_domain_strain

def inject_signal(self, parameters, injection_polarizations=None,
waveform_generator=None, raise_error=True):
""" General signal injection method.
Expand Down Expand Up @@ -490,7 +603,8 @@ def inject_signal_from_waveform_polarizations(self, parameters, injection_polari
'Injecting signal outside segment, start_time={}, merger time={}.'
.format(self.strain_data.start_time, parameters['geocent_time']))

signal_ifo = self.get_detector_response(injection_polarizations, parameters)
signal_ifo = self.get_detector_response(
injection_polarizations, parameters)
self.strain_data.frequency_domain_strain += signal_ifo

self.meta_data['optimal_SNR'] = (
Expand All @@ -500,8 +614,10 @@ def inject_signal_from_waveform_polarizations(self, parameters, injection_polari
self.meta_data['parameters'] = parameters

logger.info("Injected signal in {}:".format(self.name))
logger.info(" optimal SNR = {:.2f}".format(self.meta_data['optimal_SNR']))
logger.info(" matched filter SNR = {:.2f}".format(self.meta_data['matched_filter_SNR']))
logger.info(" optimal SNR = {:.2f}".format(
self.meta_data['optimal_SNR']))
logger.info(" matched filter SNR = {:.2f}".format(
self.meta_data['matched_filter_SNR']))
for key in parameters:
logger.info(' {} = {}'.format(key, parameters[key]))

Expand All @@ -512,7 +628,8 @@ def _window_power_correction(self):
using the :code:`BILBY_INCORRECT_PSD_NORMALIZATION` environment variable.
"""
if string_to_boolean(
os.environ.get("BILBY_INCORRECT_PSD_NORMALIZATION", "FALSE").upper()
os.environ.get("BILBY_INCORRECT_PSD_NORMALIZATION",
"FALSE").upper()
):
return self.strain_data.window_factor
else:
Expand Down Expand Up @@ -657,11 +774,12 @@ def matched_filter_snr(self, signal):
"""
return gwutils.matched_filter_snr(
signal=signal[self.strain_data.frequency_mask],
frequency_domain_strain=self.strain_data.frequency_domain_strain[self.strain_data.frequency_mask],
frequency_domain_strain=self.strain_data.frequency_domain_strain[
self.strain_data.frequency_mask],
power_spectral_density=self.power_spectral_density_array[self.strain_data.frequency_mask],
duration=self.strain_data.duration)

def whiten_frequency_series(self, frequency_series : np.array) -> np.array:
def whiten_frequency_series(self, frequency_series: np.array) -> np.array:
"""Whitens a frequency series with the noise properties of the detector

.. math::
Expand All @@ -684,7 +802,7 @@ def whiten_frequency_series(self, frequency_series : np.array) -> np.array:

def get_whitened_time_series_from_whitened_frequency_series(
self,
whitened_frequency_series : np.array
whitened_frequency_series: np.array
) -> np.array:
"""Gets the whitened time series from a whitened frequency series.

Expand Down Expand Up @@ -769,10 +887,12 @@ def save_data(self, outdir, label=None):

if label is None:
filename_asd = '{}/{}_asd.dat'.format(outdir, self.name)
filename_data = '{}/{}_frequency_domain_data.dat'.format(outdir, self.name)
filename_data = '{}/{}_frequency_domain_data.dat'.format(
outdir, self.name)
else:
filename_asd = '{}/{}_{}_asd.dat'.format(outdir, self.name, label)
filename_data = '{}/{}_{}_frequency_domain_data.dat'.format(outdir, self.name, label)
filename_data = '{}/{}_{}_frequency_domain_data.dat'.format(
outdir, self.name, label)
np.savetxt(filename_data,
np.array(
[self.strain_data.frequency_array,
Expand All @@ -791,7 +911,8 @@ def plot_data(self, signal=None, outdir='.', label=None):
return

fig, ax = plt.subplots()
df = self.strain_data.frequency_array[1] - self.strain_data.frequency_array[0]
df = self.strain_data.frequency_array[1] - \
self.strain_data.frequency_array[0]
asd = gwutils.asd_from_freq_series(
freq_data=self.strain_data.frequency_domain_strain, df=df)

Expand Down Expand Up @@ -924,7 +1045,8 @@ def _filename_from_outdir_label_extension(outdir, label, extension="h5"):
))
def to_pickle(self, outdir="outdir", label=None):
utils.check_directory_exists_and_if_not_mkdir('outdir')
filename = self._filename_from_outdir_label_extension(outdir, label, extension="pkl")
filename = self._filename_from_outdir_label_extension(
outdir, label, extension="pkl")
safe_file_dump(self, filename, "dill")

@classmethod
Expand Down
Loading