"""Fundamental dtypes for use in strax.

Note that if you change the dtype titles (comments), numba will crash if
there is an existing numba cache. Clear __pycache__ and restart.
TODO: file numba issue.

"""

from typing import Optional, Tuple

import numpy as np
import numba  # noqa: F401
import strax


__all__ = (
    "interval_dtype raw_record_dtype record_dtype hit_dtype peak_dtype "
    "DIGITAL_SUM_WAVEFORM_CHANNEL DEFAULT_RECORD_LENGTH "
    "time_fields time_dt_fields hitlet_dtype hitlet_with_data_dtype "
    "copy_to_buffer peak_interval_dtype"
).split()

DIGITAL_SUM_WAVEFORM_CHANNEL = -1
DEFAULT_RECORD_LENGTH = 110


time_fields = [
    (("Start time since unix epoch [ns]", "time"), np.int64),
    (("Exclusive end time since unix epoch [ns]", "endtime"), np.int64),
]

time_dt_fields = [
    (("Start time since unix epoch [ns]", "time"), np.int64),
    # Don't try to make O(second) long intervals!
    (("Length of the interval in samples", "length"), np.int32),
    (("Width of one sample [ns]", "dt"), np.int16),
]

# Base dtype for interval-like objects (pulse, hit)
interval_dtype = time_dt_fields + [(("Channel/PMT number", "channel"), np.int16)]
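
# Illustrative sketch (not part of the original module; variable names are made
# up): the ((title, name), type) tuples above can be passed straight to numpy,
# and the implicit end time of an interval is time + length * dt.
_interval_example = np.zeros(1, dtype=interval_dtype)
_interval_example["time"] = 1_000_000_000  # ns since the unix epoch
_interval_example["length"] = 110  # samples
_interval_example["dt"] = 10  # ns per sample
_interval_endtime = _interval_example["time"] + (
    _interval_example["length"] * _interval_example["dt"]
)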

# Base dtype for long interval-like objects (peaks)
peak_interval_dtype = interval_dtype.copy()
# Allow peaks to have very long dts
peak_interval_dtype[2] = (("Width of one sample [ns]", "dt"), np.int32)
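
# Quick check (illustrative, not in the original source): hits store dt as
# int16 (at most ~32 us per sample), while peaks get an int32 dt.
assert np.dtype(interval_dtype)["dt"] == np.int16
assert np.dtype(peak_interval_dtype)["dt"] == np.int32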


def raw_record_dtype(samples_per_record=DEFAULT_RECORD_LENGTH):
    """Data type for a waveform raw_record.

    Length can be shorter than the number of samples in data; this indicates a record with
    zero-padding at the end.

    """
    return interval_dtype + [
        # np.int16 is not enough for some PMT flashes...
        (
            ("Length of pulse to which the record belongs (without zero-padding)", "pulse_length"),
            np.int32,
        ),
        (("Fragment number in the pulse", "record_i"), np.int16),
        (("Baseline determined by the digitizer (if this is supported)", "baseline"), np.int16),
        # Note this is defined as a SIGNED integer, so we can
        # still represent negative values after subtracting baselines
        (("Waveform data in raw ADC counts", "data"), np.int16, samples_per_record),
    ]
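

# Example (a sketch; the variable name is made up): allocating a raw_record
# buffer. The 'data' field becomes a 2-d block of shape
# (n_records, samples_per_record).
_raw_example = np.zeros(2, dtype=raw_record_dtype())
assert _raw_example["data"].shape == (2, DEFAULT_RECORD_LENGTH)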


def record_dtype(samples_per_record=DEFAULT_RECORD_LENGTH):
    """Data type for a waveform record.

    Length can be shorter than the number of samples in data; this indicates a record with
    zero-padding at the end.

    """
    return interval_dtype + [
        # np.int16 is not enough for some PMT flashes...
        (
            ("Length of pulse to which the record belongs (without zero-padding)", "pulse_length"),
            np.int32,
        ),
        (("Fragment number in the pulse", "record_i"), np.int16),
        (("Integral in ADC counts x samples", "area"), np.int32),
        (
            ("Level of data reduction applied (strax.ReductionLevel enum)", "reduction_level"),
            np.uint8,
        ),
        (("Baseline in ADC counts. data = int(baseline) - data_orig", "baseline"), np.float32),
        (("Baseline RMS in ADC counts. data = baseline - data_orig", "baseline_rms"), np.float32),
        (
            ("Multiply data by 2**(this number). Baseline is unaffected.", "amplitude_bit_shift"),
            np.int16,
        ),
        (
            ("Waveform data in raw counts above integer part of baseline", "data"),
            np.int16,
            samples_per_record,
        ),
    ]
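

# Worked example of the baseline convention above (illustrative only): with a
# digitizer baseline of 16000.25 ADC counts, a raw sample of 15990 counts is
# stored as data = int(16000.25) - 15990 = 10 counts above baseline.
_record_example = np.zeros(1, dtype=record_dtype())
_record_example["baseline"] = 16000.25
_record_example["data"][0, 0] = int(16000.25) - 15990  # = 10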


# Data type for a 'hit': a sub-range of a record
hit_dtype = interval_dtype + [
    (("Integral [ADC x samples]", "area"), np.float32),
    (("Index of sample in record in which hit starts", "left"), np.int16),
    (("Index of first sample in record just beyond hit (exclusive bound)", "right"), np.int16),
    (
        ("For lone hits, index of sample in record where integration starts", "left_integration"),
        np.int16,
    ),
    (
        ("For lone hits, index of first sample beyond integration region", "right_integration"),
        np.int16,
    ),
    (("Internal (temporary) index of fragment in which hit was found", "record_i"), np.int32),
    (("ADC threshold applied in order to find hits", "threshold"), np.float32),
    (("Maximum amplitude above baseline [ADC counts]", "height"), np.float32),
]
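

# Illustrative sketch: a hit covering samples 40..44 of its record has
# left = 40 and right = 45 (exclusive bound), i.e. right - left = 5 samples.
_hit_example = np.zeros(1, dtype=hit_dtype)
_hit_example["left"] = 40
_hit_example["right"] = 45
_hit_example["length"] = _hit_example["right"] - _hit_example["left"]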


def hitlet_dtype():
    """Hitlet dtype: the same as the peaklet or peak dtype, but for hit-kind objects."""
    dtype = interval_dtype + [
        (("Total hit area in pe", "area"), np.float32),
        (("Maximum of the PMT pulse in pe/sample", "amplitude"), np.float32),
        (('Position of the Amplitude in ns (minus "time")', "time_amplitude"), np.int16),
        (("Hit entropy", "entropy"), np.float32),
        (("Width (in ns) of the central 50% area of the hitlet", "range_50p_area"), np.float32),
        (("Width (in ns) of the central 80% area of the hitlet", "range_80p_area"), np.float32),
        (("Position of the 25% area decile [ns]", "left_area"), np.float32),
        (("Position of the 10% area decile [ns]", "low_left_area"), np.float32),
        (
            (
                "Width (in ns) of the highest density region covering a 50% area of the hitlet",
                "range_hdr_50p_area",
            ),
            np.float32,
        ),
        (
            (
                "Width (in ns) of the highest density region covering a 80% area of the hitlet",
                "range_hdr_80p_area",
            ),
            np.float32,
        ),
        (("Left edge of the 50% highest density region [ns]", "left_hdr"), np.float32),
        (("Left edge of the 80% highest density region [ns]", "low_left_hdr"), np.float32),
        (("FWHM of the PMT pulse [ns]", "fwhm"), np.float32),
        (('Left edge of the FWHM [ns] (minus "time")', "left"), np.float32),
        (("FWTM of the PMT pulse [ns]", "fwtm"), np.float32),
        (('Left edge of the FWTM [ns] (minus "time")', "low_left"), np.float32),
    ]
    return dtype


def hitlet_with_data_dtype(n_samples=2):
    """Hitlet dtype with data field. Required within the plugins to compute hitlet properties.

    :param n_samples: Buffer length of the data field. Make sure it can hold the longest hitlet.

    """
    if n_samples < 2:
        raise ValueError("n_samples must be at least 2!")

    dtype = hitlet_dtype()
    additional_fields = [
        (
            (
                "Hitlet data in PE/sample with ZLE (only the first length samples are filled)",
                "data",
            ),
            np.float32,
            n_samples,
        ),
        (("Dummy max_gap required for splitting", "max_gap"), np.int32),
        (("Dummy max_diff required for splitting", "max_diff"), np.int32),
        (("Dummy min_diff required for splitting", "min_diff"), np.int32),
        (("Maximum interior goodness of split", "max_goodness_of_split"), np.float32),
    ]

    return dtype + additional_fields
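

# Sketch (illustrative): size the data buffer to hold the longest expected
# hitlet; n_samples < 2 raises a ValueError.
_hitlet_example = np.zeros(1, dtype=hitlet_with_data_dtype(n_samples=200))
assert _hitlet_example["data"].shape == (1, 200)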


def peak_dtype(
    n_channels=100, n_sum_wv_samples=200, n_widths=11, digitize_top=True, hits_timing=True
):
    """Data type for peaks: ranges across all channels in a detector.

    Remember to set channel to -1 (TODO: make enum).

    """
    if n_channels == 1:
        raise ValueError("Must have more than one channel")
        # Otherwise array changes shape?? badness ensues
    dtype = peak_interval_dtype + [
        # For peaklets this is likely to be overwritten:
        (("Classification of the peak(let)", "type"), np.int8),
        (("Integral across channels [PE]", "area"), np.float32),
        (("Integral per channel [PE]", "area_per_channel"), np.float32, n_channels),
        (("Number of hits contributing at least one sample to the peak", "n_hits"), np.int32),
        (("Waveform data in PE/sample (not PE/ns!)", "data"), np.float32, n_sum_wv_samples),
        (("Peak widths in range of central area fraction [ns]", "width"), np.float32, n_widths),
        (
            ("Peak widths: time between nth and 5th area decile [ns]", "area_decile_from_midpoint"),
            np.float32,
            n_widths,
        ),
        (("Does the channel reach ADC saturation?", "saturated_channel"), np.int8, n_channels),
        (("Total number of saturated channels", "n_saturated_channels"), np.int16),
        (("Channel within tight range of mean", "tight_coincidence"), np.int16),
        (("Largest gap between hits inside peak [ns]", "max_gap"), np.int32),
        (("Maximum interior goodness of split", "max_goodness_of_split"), np.float32),
    ]
    if hits_timing:
        dtype += [
            (
                ("Largest time difference between apexes of hits inside peak [ns]", "max_diff"),
                np.int32,
            ),
            (
                ("Smallest time difference between apexes of hits inside peak [ns]", "min_diff"),
                np.int32,
            ),
        ]
    if digitize_top:
        top_field = (
            ("Waveform data in PE/sample (not PE/ns!), top array", "data_top"),
            np.float32,
            n_sum_wv_samples,
        )
        dtype.insert(9, top_field)
    return dtype
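

# Sketch (illustrative): with digitize_top=True the dtype gains a 'data_top'
# field shaped like 'data'; with digitize_top=False it is absent.
assert "data_top" in np.dtype(peak_dtype(n_channels=10, digitize_top=True)).names
assert "data_top" not in np.dtype(peak_dtype(n_channels=10, digitize_top=False)).names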


def copy_to_buffer(
    source: np.ndarray,
    buffer: np.ndarray,
    func_name: str,
    field_names: Optional[Tuple[str]] = None,
):
    """Copy the data from the source to the destination, e.g. raw_records to records.

    To this end, we dynamically create the njitted function with the name 'func_name' (should
    start with "_").

    :param source: array of input
    :param buffer: array of buffer to fill with values from input
    :param func_name: name under which to store the dynamically created function. Should start
        with an underscore
    :param field_names: dtype names to copy (if None, copy all fields the source and buffer
        have in common)

    """
    if np.shape(source) != np.shape(buffer):
        raise ValueError("Source should be the same length as the buffer")

    if field_names is None:
        # Too lazy to specify what to copy, just do all
        field_names = tuple(
            n for n in source.dtype.names if n in buffer.dtype.names
        )  # type: ignore
    elif not any([n in buffer.dtype.names for n in field_names]):
        raise ValueError("Trying to copy dtypes that are not in the destination")

    if not func_name.startswith("_"):
        raise ValueError('Start function with "_"')

    # Make a hash from the buffer dtype so we get a different copy function
    # if the dtype changes.
    field_names_hash = strax.deterministic_hash(repr(buffer.dtype))
    func_name += field_names_hash
    if func_name not in globals():
        # Create a numba function for us
        _create_copy_function(buffer.dtype, field_names, func_name)

    globals()[func_name](source, buffer)


def _create_copy_function(res_dtype, field_names, func_name):
    """Write out a numba-njitted function to copy data."""
    # Cannot use cache=True since we are creating the function dynamically
    code = f"""
@numba.njit(nogil=True)
def {func_name}(source, result):
    for i in range(len(source)):
        s = source[i]
        r = result[i]
"""
    for d in field_names:
        if d not in res_dtype.names:
            raise ValueError("This cannot happen")
        if np.shape(res_dtype[d]):
            # Copy array fields as arrays
            code += f'\n        r["{d}"][:] = s["{d}"][:]'
        else:
            code += f'\n        r["{d}"] = s["{d}"]'
    # pylint: disable=exec-used
    exec(code, globals())
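

# Usage sketch (illustrative; the array and function names are made up, and
# the call below compiles a numba function on first use): copy the fields
# shared between raw records and records into a fresh record buffer.
_src_example = np.zeros(10, dtype=raw_record_dtype())
_dst_example = np.zeros(10, dtype=record_dtype())
copy_to_buffer(_src_example, _dst_example, "_copy_raw_to_records_example")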