import numpy as np
import numba
import strax
export, __all__ = strax.exporter()
[docs]
@export
@numba.njit(cache=True, nogil=True)
def index_of_fraction(peaks, fractions_desired):
"""Return the (fractional) indices at which the peaks reach fractions_desired of their area.
:param peaks: strax peak(let)s or other data-bearing dtype
:param fractions_desired: array of floats between 0 and 1
:return: (len(peaks), len(fractions_desired)) array of floats
"""
results = np.zeros((len(peaks), len(fractions_desired)), dtype=np.float32)
for p_i, p in enumerate(peaks):
if p["area"] <= 0:
continue # TODO: These occur a lot. Investigate!
compute_index_of_fraction(p, fractions_desired, results[p_i])
return results
[docs]
@export
@numba.njit(nogil=True, cache=True)
def compute_index_of_fraction(peak, fractions_desired, result):
"""Store the (fractional) indices at which peak reaches fractions_desired of their area in
result.
:param peak: single strax peak(let) or other data-bearing dtype
:param fractions_desired: array of floats between 0 and 1
:return: len(fractions_desired) array of floats
"""
area_tot = peak["area"]
fraction_seen = 0
current_fraction_index = 0
needed_fraction = fractions_desired[current_fraction_index]
for i, x in enumerate(peak["data"][: peak["length"]]):
# How much of the area is in this sample?
fraction_this_sample = x / area_tot
# Are we passing any desired fractions in this sample?
while fraction_seen + fraction_this_sample >= needed_fraction:
area_needed = area_tot * (needed_fraction - fraction_seen)
if x != 0:
result[current_fraction_index] = i + area_needed / x
else:
result[current_fraction_index] = i
# Advance to the next fraction
current_fraction_index += 1
if current_fraction_index > len(fractions_desired) - 1:
break
needed_fraction = fractions_desired[current_fraction_index]
if current_fraction_index > len(fractions_desired) - 1:
break
# Add this sample's area to the area seen
fraction_seen += fraction_this_sample
if needed_fraction == 1:
# Sometimes floating-point errors prevent the full area
# from being reached before the waveform ends
result[-1] = peak["length"]
[docs]
@export
def compute_widths(peaks):
"""Compute widths in ns at desired area fractions for peaks.
:param peaks: single strax peak(let) or other data-bearing dtype
"""
desired_widths = np.linspace(0, 1, len(peaks[0]["width"]))
# 0% are width is 0 by definition, and it messes up the calculation below
desired_widths = desired_widths[1:]
# Which area fractions do we need times for?
desired_fr = np.concatenate([0.5 - desired_widths / 2, 0.5 + desired_widths / 2])
# We lose the 50% fraction with this operation, let's add it back
desired_fr = strax.stable_sort(np.unique(np.append(desired_fr, [0.5])))
fr_times = index_of_fraction(peaks, desired_fr)
fr_times *= peaks["dt"].reshape(-1, 1)
i = len(desired_fr) // 2
median_time = fr_times[:, i]
width = fr_times[:, i:] - fr_times[:, ::-1][:, i:]
area_decile_from_midpoint = fr_times[:, ::2] - fr_times[:, i].reshape(-1, 1)
return median_time, width, area_decile_from_midpoint
@numba.njit(cache=True, nogil=True)
def compute_center_time(peaks):
"""Compute the center time of the peaks.
:param peaks: single strax peak(let) or other data-bearing dtype
"""
center_time = np.zeros(len(peaks), dtype=np.int64)
for p_i, p in enumerate(peaks):
data = p["data"][: p["length"]]
if data.sum() == 0.0:
# Zero-area peaks have centertime at startime
center_time[p_i] = p["time"]
continue
t = np.average(np.arange(p["length"]), weights=data)
center_time[p_i] = (t + 1 / 2) * p["dt"]
center_time[p_i] += p["time"] # converting from float to int, implicit floor
center_time = np.clip(center_time, peaks["time"], strax.endtime(peaks))
return center_time
[docs]
@export
@numba.njit(cache=True, nogil=True)
def compute_area_fraction_top(peaks, n_top_channels):
"""Compute the area fraction top for peaks."""
area_fraction_top = np.zeros(len(peaks), dtype=np.float32)
for peak_i in range(len(peaks)):
p = peaks[peak_i]
area_top = p["area_per_channel"][:n_top_channels].sum()
# Non-positive-area peaks get NaN AFT
if p["area"] > 0:
area_fraction_top[peak_i] = area_top / p["area"]
else:
area_fraction_top[peak_i] = np.nan
return area_fraction_top
[docs]
@export
def compute_properties(peaks, n_top_channels=0, select_peaks_indices=None):
"""Compute properties: median_time, width, area_decile_from_midpoint,
center_time, and area_fraction_top for peaks.
:param peaks: single strax peak(let) or other data-bearing dtype
:param select_peaks_indices: array of integers informing which peaks to compute default to None
in which case compute for all peaks
"""
if not len(peaks) or (select_peaks_indices is not None and not len(select_peaks_indices)):
return
if select_peaks_indices is None:
select_peaks_indices = slice(None)
median_time, width, area_decile_from_midpoint = compute_widths(peaks[select_peaks_indices])
peaks["median_time"][select_peaks_indices] = median_time
peaks["width"][select_peaks_indices] = width
peaks["area_decile_from_midpoint"][select_peaks_indices] = area_decile_from_midpoint
center_time = compute_center_time(peaks[select_peaks_indices])
peaks["center_time"][select_peaks_indices] = center_time
if n_top_channels > 0:
area_fraction_top = compute_area_fraction_top(peaks[select_peaks_indices], n_top_channels)
peaks["area_fraction_top"][select_peaks_indices] = area_fraction_top