Strax

Github page: https://github.com/AxFoundation/strax

Strax is an analysis framework for pulse-only digitization data, specialized for live data processing at speeds of 50-100 MB (raw) per core per second.

For comparison, this is more than 100x faster than the XENON1T processor pax, and does not require a preprocessing stage (‘eventbuilder’). It achieves this by using numpy structured arrays internally, which are supported by the amazing just-in-time compiler numba.

Strax is primarily developed for the XENONnT experiment, although the configuration and specific algorithms for XENONnT are hosted at https://github.com/XENONnT/straxen. You can find its documentation here.

Setting up strax

To install the latest stable version (from pypi), run pip install strax. Dependencies should install automatically: numpy, pandas, numba, two compression libraries (blosc and zstd) and a few miscellaneous pure-python packages. Strax requires python >= 3.8.

If you want to try out strax on XENON1T data, you’re probably better off installing strax’s XENON bindings at https://github.com/XENONnT/straxen. Strax will be automatically installed along with straxen.

You might want to install some dependencies (such as numpy and numba) via conda rather than pip, but it’s up to you.

You can also clone the repository, then setup a developer installation with python setup.py develop.

If you experience problems during installation, try installing exactly the same version of the dependencies as used on the Github Actions integrated testing. Clone the repository, then do pip install -r strax/extra_requirements/requirements-tests.txt.

Using strax

Strax is built around a central class: the Context. Users interact with the context to process and request data. On this page, we will build up an example context from scratch and show how to interact with it.

A user always interacts with the context; the context then figures out the required action to perform, such as loading data from disk or computing it anew.

_images/context.svg

To start a new context, initialize the class from strax:

>>> import strax

>>> context = strax.Context()

At this point, we still need to add the plugins and storage (explained below) to use the context.

Plugins

Several classes, called plugins, are registered to a context; they are responsible for processing or generating the data. In the example figure above, we have three plugins:

  • PulseProcessing

  • PeakProcessing

  • EventProcessing

Let’s start by adding the first plugin (PulseProcessing):

>>> import strax
>>> from strax.testutils import Records as PulseProcessing
>>> context = strax.Context(register = [PulseProcessing],
                            storage = [],  # We will explain the storage below
                           )
>>> print(f'Our context provides the following data-types {context.provided_dtypes().keys()}')
Our context provides the following data-types dict_keys(['records'])

Now one can load data via the context (our PulseProcessing plugin generates new data on request):

>>> pulses = context.get_array(run_id = 'some_run',
                               targets = 'records',)
>>> len(pulses)
100

You can see in the figure above that we asked for data at A, at which point the context checked which plugin is responsible for generating ‘records’, processed this data (C) and returned it to the user (A).

This data is not stored, but generated on the fly:

>>> context.is_stored(run_id = 'some_run', target = 'records',)
False

Storage

If we want to add one or more storage sites, we can do this while creating our context (using the storage keyword argument, which we set to storage=[] above) or by adding storage later.

If we now add a storage site (called a storage frontend, see the developer documentation), we can check again whether the data is stored. Just after adding the storage site, the data is not stored. However, if we now ask for data again, the context will again process the data (C) and, apart from returning it to the user (A), also store it to disk (B). The next time a user requests this data, the context will load it directly from disk.

>>> my_storage_site = strax.DataDirectory('./strax_data')
>>> context.storage.append(my_storage_site)
>>> context.is_stored(run_id = 'some_run', target = 'records',)
False
>>> pulses = context.get_array(run_id = 'some_run',
                               targets = 'records',
                              )
>>> context.is_stored(run_id = 'some_run', target = 'records',)
True

Options

Plugins take options (also see the advanced documentation on plugins and configs). A basic plugin for peak processing could look like this:

import numpy as np

class PeakProcessing(strax.Plugin):
    """Base every peak on a record, and make all peaks of the same type"""
    __version__ = '0.0.0'

    # Which input is this plugin based on
    depends_on = 'records'

    # Which data type does the plugin provide
    provides = 'peaks'

    # A configuration option, which we can use in the computation (self.compute)
    peak_type = strax.Config(default=1,
                             type=int,
                             help='Classify all records as this type of peak'
                             )

    # The numpy-dtype of the output
    dtype = strax.peak_dtype()


    def compute(self, records):
        """Compute the peaks based on the records"""
        result = np.zeros(len(records), dtype=self.dtype)

        # Use the 'peak_type' config to set the type of this data
        result['type'] = self.config['peak_type']

        # Strax always needs time fields, see advanced documentation
        result['time'] = records['time']
        result['dt'] = records['dt']
        result['length'] = records['length']

        return result

>>> context.register(PeakProcessing)

Now we can load peaks just as we did for our records-data. Because of the peak_type configuration, we can now test that all the data is of the same type:

>>> peaks = context.get_array('some_run', targets='peaks')
>>> assert np.all(peaks['type']==1)

Strax tracks options via the “lineage” of a plugin: it bookkeeps the options with which data was processed.
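
A sketch of what this looks like for our current context, using the default peak_type of 1 (the exact options of the Records test plugin may differ between strax versions):

>>> peak_processing = context.get_single_plugin(run_id='some_run', data_name='peaks')
>>> peak_processing.lineage
{'peaks': ('PeakProcessing', '0.0.0', {'peak_type': 1}),
 'records': ('Records', '0.0.0', {'crash': False, 'dummy_tracked_option': 42})}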

We can see that for peaks, this lineage contains the peak_type option (and its value). Additionally, the lineage contains the name of the plugin providing the data ('PeakProcessing') and its version number ('0.0.0'), as well as the same information for the lower-lying ‘records’ plugin: its plugin name, version and configuration.

We can also change the options set in plugins, again, using the context:

>>> context.set_config(dict(peak_type=2))
>>> peak_processing = context.get_single_plugin(run_id='some_run', data_name='peaks')
>>> peak_processing.lineage
{'peaks': ('PeakProcessing', '0.0.0', {'peak_type': 2}),
 'records': ('Records', '0.0.0', {'crash': False, 'dummy_tracked_option': 42})}

If we now request data again, the context will check whether data with this configuration is stored, and since this is not the case, recompute it.

>>> peaks = context.get_array('some_run', targets='peaks')
>>> assert np.all(peaks['type']==2)

If we now check the data folder, we see that there are two versions of the data: one for each configuration of our option peak_type.

>>> import os
>>> os.listdir('./strax_data')
['some_run-peaks-3g2rc4f3jg',
 'some_run-peaks-vqo4oamp76',
 'some_run-records-j3nd2fjbiq']

Strax creates a hash of the lineage, as in the examples above, which changed when we changed peak_type. You can check the lineage and its hash e.g. by using the context.key_for method (which computes the lineage and corresponding hash for the requested data type):

>>> context.set_config(dict(peak_type=1))
>>> print(context.key_for(run_id='some_run', target='peaks'))
>>> context.set_config(dict(peak_type=2))
>>> print(context.key_for(run_id='some_run', target='peaks'))
some_run-peaks-3g2rc4f3jg
some_run-peaks-vqo4oamp76

For more examples, check out the developer and advanced documentation.

Plugin development

Special time fields

The time, endtime, dt and length fields have special meaning for strax.

It is useful for most plugins to output a time and endtime field, indicating the start and (exclusive) end time of the entities you are producing. If you do not do this, your plugin cannot be loaded for part of a run (e.g. with seconds_range).

Both time and endtime should be 64-bit integer timestamps in nanoseconds since the unix epoch. Instead of endtime, you can provide dt (an integer time resolution in ns) and length (integer); strax will then compute the endtime as time + dt * length. Lower-level datatypes commonly use this.
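
As a small illustration of this rule, the sketch below uses a toy dtype (plain field names, without the usual (title, name) tuples); strax.endtime computes the end times for arrays that have dt and length but no endtime field:

import numpy as np
import strax

# A toy array that provides dt and length instead of an explicit endtime field
toy_dtype = [('time', np.int64), ('dt', np.int32), ('length', np.int32)]
x = np.zeros(1, dtype=toy_dtype)
x['time'], x['dt'], x['length'] = 1_000, 10, 5

# endtime = time + dt * length
print(strax.endtime(x))  # [1050]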

Usually, you simply pass these time-related fields through from one of your dependencies (e.g. events or peak_basics). You should only modify them if you are changing data kind. If your plugin defines a new data kind, you set the values yourself, depending on the thing you are making (events, peaks, etc.).

Multiple outputs

Plugins can have multiple outputs. Do not use this if you just want to return a multi-column output (e.g. the area and width of a peak), that’s what the structured arrays are for. But if you want to return incompatible kinds of rows, e.g. both records and hits, or peaks and some metadata for each chunk, multi-output support is essential.

To return multiple outputs from a plugin:
  • The provides tuple should have multiple elements, listing the provided data type names

  • The dtype and data_kind attributes should be dictionaries mapping data type names to dtypes and data kind names, respectively. The values of these dicts can be specified in the same way as the entire attribute would be for a single-output plugin

  • The compute method must return a dictionary mapping data types to results (structured numpy arrays or field/array dictionaries).
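
For illustration, a minimal sketch of a multi-output plugin (the data type names record_areas and chunk_info are made up for this example) could look like this:

import numpy as np
import strax

class AreaAndInfo(strax.Plugin):
    """Sketch: provide two data types of different kinds from one plugin."""
    __version__ = '0.0.0'
    depends_on = 'records'

    # Multiple provided data types
    provides = ('record_areas', 'chunk_info')

    # Map each provided data type to its data kind and dtype
    data_kind = {'record_areas': 'records', 'chunk_info': 'chunk_info'}
    dtype = {
        'record_areas': strax.time_fields + [(('Total ADC counts', 'area'), np.int32)],
        'chunk_info': strax.time_fields + [(('Number of records', 'n_records'), np.int32)],
    }

    def compute(self, records):
        areas = np.zeros(len(records), dtype=self.dtype['record_areas'])
        areas['time'] = records['time']
        areas['endtime'] = strax.endtime(records)
        areas['area'] = np.sum(records['data'], axis=1)

        info = np.zeros(1, dtype=self.dtype['chunk_info'])
        info['time'] = records['time'][0]
        info['endtime'] = strax.endtime(records)[-1]
        info['n_records'] = len(records)

        # Return a dict mapping data type name -> result
        return dict(record_areas=areas, chunk_info=info)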

Plugin types

There are several plugin types:
  • Plugin: The general type of plugin. Should contain at least depends_on = <datakind>, provides = <datatype>, def compute(self, <datakind>), and dtype = <dtype> or def infer_dtype(): <>.

  • OverlapWindowPlugin: Allows a plugin to look for data in adjacent chunks. An OverlapWindowPlugin assumes all inputs are sorted by endtime. This only works for disjoint intervals such as peaks or events, but NOT for records! The user has to define get_window_size(self), which returns the required chunk extension in nanoseconds.

  • LoopPlugin: Allows the user to loop over a given data kind and find the corresponding data of a lower data kind, using for example def compute_loop(self, events, peaks), where we loop over events and get the corresponding peaks that are within the time range of the event. By default the second argument (peaks) must be fully contained in the first argument (events). If a touching time window is desired, set the class attribute time_selection to 'touching'.

  • CutPlugin: Plugin type where using def cut_by(self, <datakind>) inside the plugin a user can return a boolean array that can be used to select data.

  • MergeOnlyPlugin: For internal use; it only merges data types of the same kind into a new one. See as an example in straxen the EventInfo plugin, where the following data types are merged: 'events', 'event_basics', 'event_positions', 'corrected_areas', 'energy_estimates'.

  • ParallelSourcePlugin: For internal use only, to parallelize the processing of low-level plugins. This can be activated by stating parallel = 'process' in a plugin.

Minimal examples

Below, each of the plugin types is worked out minimally. Each can be developed in much greater detail; see e.g. the plugins in straxen.

strax.Plugin

In the example below, we are going to set BasePlugin.__version__ to None, which is useful during development, since the class then automatically generates a version from its code whenever the code changes. For mature pipelines, like straxen, specifying the __version__ manually is recommended, since it allows developers to decide whether a code change should be reflected in the plugin’s lineage.

# For testing, one can use these dummy Peaks and Records plugins from strax
import strax
import numpy as np
from strax.testutils import Records, Peaks, run_id
st = strax.Context(register=[Records, Peaks])

class BasePlugin(strax.Plugin):
    """The most common plugin where computations on data are performed in strax"""
    depends_on = 'records'

    # For good practice, always specify the provides argument and the version
    provides = 'simple_data'
    # When you are developing a new plugin, you can set this version to None to
    # auto-infer this from the code of the plugin.
    __version__ = None  # Only for development! Otherwise '0.0.0'

    # We need to specify the datatype, for this example, we are
    # going to calculate some areas
    dtype = strax.time_fields + [(("Total ADC counts",'area'), np.int32)]

    def compute(self, records):
        result = np.zeros(len(records), dtype=self.dtype)

        # All data in strax must have some sort of time fields
        result['time'] = records['time']
        result['endtime'] = strax.endtime(records)

        # For this example, we calculate the total sum of the records-data
        result['area'] = np.sum(records['data'], axis = 1)
        return result

st.register(BasePlugin)
st.get_df(run_id, 'simple_data')

strax.OverlapWindowPlugin

class OverlapPlugin(strax.OverlapWindowPlugin):
    """
    Allow peaks get_window_size() left and right to get peaks
        within the time range
    """
    depends_on = 'peaks'
    provides = 'overlap_data'

    dtype = strax.time_fields + [(("total peaks", 'n_peaks'), np.int16)]

    def get_window_size(self):
        # Look 10 ns left and right of each peak
        return 10

    def compute(self, peaks):
        result = np.zeros(1, dtype=self.dtype)
        result['time'] = np.min(peaks['time'])
        result['endtime'] = np.max(strax.endtime(peaks))
        result['n_peaks'] = len(peaks)
        return result

st.register(OverlapPlugin)
st.get_df(run_id, 'overlap_data')

strax.LoopPlugin

class LoopData(strax.LoopPlugin):
    """Loop over peaks and find the records within each of those peaks."""
    depends_on = 'peaks', 'records'
    provides = 'looped_data'

    dtype = strax.time_fields + [(("total records", 'n_records'), np.int16)]

    # The LoopPlugin specific requirements
    time_selection = 'fully_contained' # other option is 'touching'
    loop_over = 'peaks'

    # Use the compute_loop() instead of compute()
    def compute_loop(self, peaks, records):
        result = np.zeros(len(peaks), dtype=self.dtype)
        result['time'] = np.min(peaks['time'])
        result['endtime'] = np.max(strax.endtime(peaks))
        result['n_records'] = len(records)
        return result
st.register(LoopData)
st.get_df(run_id, 'looped_data')

strax.CutPlugin

class CutData(strax.CutPlugin):
    """
    Create a boolean array if an entry passes a given cut,
        in this case if the peak has a positive area
    """
    depends_on = 'peaks'
    provides = 'cut_data'

    # Use cut_by() instead of compute() to generate a boolean array
    def cut_by(self, peaks):
        return peaks['area']>0

st.register(CutData)
st.get_df(run_id, 'cut_data')

strax.MergeOnlyPlugin

class MergeData(strax.MergeOnlyPlugin):
    """Merge datatypes of the same datakind into a single datatype"""
    depends_on = ('peaks', 'cut_data')
    provides = 'merged_data'

    # You only need to specify the dependencies; they are merged automatically.

st.register(MergeData)
st.get_array(run_id, 'merged_data')

Plugin inheritance

It is possible for a plugin to inherit the compute() method of an existing plugin. We call these types of plugins child plugins. Child plugins are recognized by strax when the child_plugin attribute of the plugin is set to True. Below you can find a simple example of a child plugin with its parent plugin:

@strax.takes_config(
    strax.Option('by_child_overwrite_option', type=int, default=5,
                 help="Option we will overwrite in our child plugin"),
    strax.Option('parent_unique_option', type=int, default=2,
                 help='Option which is not touched by the child and '
                      'therefore the same for parent and child'),
)
class ParentPlugin(strax.Plugin):
    provides = 'odd_peaks'
    depends_on = 'peaks'
    __version__ = '0.0.1'
    dtype = parent_dtype  # some peak-like dtype defined elsewhere

    def compute(self, peaks):
        peaks['area'] *= self.config['parent_unique_option']
        peaks['time'] *= self.config['by_child_overwrite_option']
        return peaks


# Child:
@strax.takes_config(
    strax.Option('by_child_overwrite_option_child',
                 default=3,
                 child_option=True,
                 parent_option_name='by_child_overwrite_option',
                 help="Option we will overwrite in our child plugin"),
    strax.Option('option_unique_child',
                  default=10,
                  help="Option we will overwrite in our child plugin"),
)
class ChildPlugin(ParentPlugin):
    provides = 'odd_peaks_child'
    depends_on = 'peaks'
    __version__ = '0.0.1'
    child_plugin = True

    def compute(self, peaks):
        res = super().compute(peaks)
        res['width'] = self.config['option_unique_child']
        return res

The super().compute() statement in the compute method of ChildPlugin allows us to execute the code of the parent’s compute method without duplicating it. Additionally, if needed, we can extend the code with computation steps unique to the child plugin.

To allow the child plugin to have different settings than its parent (e.g. 'by_child_overwrite_option' in self.config['by_child_overwrite_option'] of the parent’s compute method), we have to use dedicated child options. These options are recognized by strax and overwrite the config values of the corresponding parent option during the initialization of the child plugin. Hence, these changes only affect the child, not the parent.

An option can be flagged as a child option by setting the option attribute child_option=True. Further, the name of the parent option which should be overwritten must be specified via the option attribute parent_option_name.

The lineage of a child plugin contains in addition to its options the name and version of the parent plugin.

Options and defaults

You can specify options using the strax.takes_config decorator and the strax.Option objects. See any plugin source code for example.

There is a single configuration dictionary in a strax context, shared by all plugins. Be judicious in how you name your options to avoid clashes. “Threshold” is probably a bad name, “peak_min_channels” is better.

If an option is not set in the context’s configuration, strax will use its default value if the option defines one. If the plugin specifies no default, you will get a RuntimeError when attempting to start processing.

Even when a default is used, the value used will be stored in the metadata of the produced data. Thus, if you only change a default value for a plugin’s option, you do NOT have to increment the plugin version to ensure data is rebuilt when needed.

You can specify defaults in several ways:

  • default: Use the given value as default.

  • default_factory: Call the given function (with no arguments) to produce a default. Use for mutable values such as lists.

  • default_per_run: Specify a list of 2-tuples: (start_run, default). Here start_run is a numerized run name (e.g. 170118_1327; note the underscore is valid in integers since python 3.6) and default the option value that applies from that run onwards.

  • The strax_defaults dictionary in the run metadata. This overrides any defaults specified in the plugin code, but take care – if you change a value here, there will be no record anywhere of what value was used previously, so you cannot reproduce your results anymore!

Example

@strax.takes_config(
    strax.Option('config_name', type=int, default=1)
)
class DummyPlugin(strax.Plugin):
    depends_on = ('records', )
    provides = 'dummy_data'
    ...

    def compute(self, records):
        value = self.config_name
        # or
        value = self.config['config_name']
        ...
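
For a mutable default (such as a list), a sketch using default_factory could look as follows; the option name channel_list is made up for this example:

@strax.takes_config(
    strax.Option('channel_list', default_factory=list,
                 help='Channels to consider; defaults to an empty list')
)
class AnotherDummyPlugin(strax.Plugin):
    depends_on = ('records', )
    provides = 'another_dummy_data'
    ...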

Descriptor Options

An alternative way to define plugin configuration is with the Config class as follows:

class DummyPlugin(strax.Plugin):
    depends_on = ('records', )

    config_name = strax.Config(type=int, default=1)

    def compute(self, records):
        # configs should be accessed as attributes for runtime evaluation
        value = self.config_name*2

Some projects require more flexible plugin configuration that is evaluated at runtime. For these cases it is recommended to subclass the Config class and override the fetch(self, plugin) method to compute the value from the current plugin state at runtime, when the attribute is accessed.

A few tips when implementing such workflows:
  • You should limit yourself to a single syntax for your plugin configuration. Mixing multiple approaches in a single project increases the complexity and mental burden on analysts, who will need to remember multiple configuration syntaxes and which one is used in each case.

  • Remember that whatever syntax is used, strax assumes the same set of user configs will always create the same data. When defining complex lookups for the plugin configuration at runtime it is up to you to keep this implicit promise.

  • When defining time-consuming lookups, it is recommended to implement a caching mechanism. Configuration values may be accessed many times during processing, and expensive runtime computation of these values can reduce performance significantly.
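
As a sketch of such a caching mechanism (the class and its expensive_lookup parameter are made up for this example; only hashable config values can be cached this way):

import strax
from functools import lru_cache


class CachedConfig(strax.Config):
    """Sketch: cache the result of an expensive runtime lookup."""

    def __init__(self, expensive_lookup, cache_size=16, **kwargs):
        super().__init__(**kwargs)
        # Wrap the user-supplied lookup so repeated fetches reuse the result
        self._cached_lookup = lru_cache(maxsize=cache_size)(expensive_lookup)

    def fetch(self, plugin):
        # The user-set value (or default) from the plugin's configuration
        value = super().fetch(plugin)
        return self._cached_lookup(value)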

Reference implementations

Lookup by key

import strax
import typing as ty


class LookupConfig(strax.Config):
    mapping: ty.Mapping
    keys: ty.Iterable

    def __init__(self, mapping: ty.Mapping, keys=('name', 'value'), **kwargs):
        super().__init__(**kwargs)
        self.mapping = mapping
        keys = strax.to_str_tuple(keys)
        self.keys = keys

    def fetch(self, plugin):
        key = []
        for k in self.keys:
            if k=='name':
                v = self.name
            elif k=='value':
                v = plugin.config[self.name]
            elif isinstance(k, str) and hasattr(plugin, k):
                v = getattr(plugin, k)
            else:
                v = k
            key.append(v)
        if len(key)==1:
            key = key[0]
        else:
            key = tuple(key)
        return self.mapping[key]

Find a config value from a list of value stores.

import strax
import typing as ty


class RemoteConfig(strax.Config):
    storages: ty.Iterable
    name_key: str
    value_key: str

    def __init__(self, storages, name_key='name', value_key='value', **kwargs):
        super().__init__(**kwargs)
        self.storages = storages
        self.name_key = name_key
        self.value_key = value_key

    def fetch(self, plugin, **kwargs):
        kwargs[self.name_key] = self.name
        kwargs[self.value_key] = plugin.config[self.name]
        for store in self.storages:
            v = store.get_value(**kwargs)
            if v is not None:
                break
        else:
            raise KeyError(f'A value for the {self.name} config has not been '
                           'found in any of its registered storages.')
        return v

Fetch config value from a callable

import strax
import typing as ty


class CallableConfig(strax.Config):
    func: ty.Callable

    def __init__(self, func: ty.Callable, args=(), kwargs: dict=None, **extra_kwargs):
        if not isinstance(func, ty.Callable):
            raise TypeError('func parameter must be of type Callable.')
        self.func = func
        self.args = args
        if kwargs is None:
            kwargs = {}
        self.kwargs = kwargs
        super().__init__(**extra_kwargs)

    def fetch(self, plugin):
        args = []
        for arg in self.args:
            if isinstance(arg, str) and hasattr(plugin, arg):
                args.append(getattr(plugin, arg))
            else:
                args.append(arg)

        kwargs = {}
        for k,v in self.kwargs.items():
            if isinstance(v, str) and hasattr(plugin, v):
                kwargs[k] = getattr(plugin, v)
            else:
                kwargs[k] = v

        value = super().fetch(plugin)
        value = self.func(value, *args, **kwargs)
        return value

URL-style configuration (used in straxen)

import strax
from strax.config import OMITTED  # sentinel for options without a value
import inspect
from urllib.parse import urlparse, parse_qs
from ast import literal_eval
from functools import lru_cache

def parse_val(val):
    # Try to interpret the value as a python literal; fall back to the raw string
    try:
        val = literal_eval(val)
    except (ValueError, SyntaxError):
        pass
    return val

class URLConfig(strax.Config):
    """Dispatch on URL protocol.
    unrecognized protocol returns identity
    inspired by dasks Dispatch and fsspec fs protocols.
    """

    _LOOKUP = {}
    SCHEME_SEP = '://'
    QUERY_SEP = '?'
    PLUGIN_ATTR_PREFIX = 'plugin.'

    def __init__(self, cache=False, **kwargs):
        self.final_type = OMITTED
        super().__init__(**kwargs)
        # Ensure backwards compatibility with Option validation
        # type of the config value can be different from the fetched value.
        if self.type is not OMITTED:
            self.final_type = self.type
            self.type = OMITTED # do not enforce type on the URL
        if cache:
            maxsize = cache if isinstance(cache, int) else None
            self.dispatch = lru_cache(maxsize)(self.dispatch)

    @classmethod
    def register(cls, protocol, func=None):
        """Register dispatch of `func` on urls
        starting with protocol name `protocol` """

        def wrapper(func):
            if isinstance(protocol, tuple):
                for t in protocol:
                    cls.register(t, func)
                return func

            if not isinstance(protocol, str):
                raise ValueError('Protocol name must be a string.')

            if protocol in cls._LOOKUP:
                raise ValueError(f'Protocol with name {protocol} already registered.')
            cls._LOOKUP[protocol] = func
            return func
        return wrapper(func) if func is not None else wrapper

    def dispatch(self, url, *args, **kwargs):
        """
        Call the corresponding method based on protocol in url.
        chained protocols will be called with the result of the
        previous protocol as input
        overrides are passed to any protocol whose signature can accept them.
        """

        # separate the protocol name from the path
        protocol, _, path = url.partition(self.SCHEME_SEP)

        # find the corresponding protocol method
        meth = self._LOOKUP.get(protocol, None)
        if meth is None:
            # unrecognized protocol
            # evaluate as string-literal
            return url

        if self.SCHEME_SEP in path:
            # url contains a nested protocol
            # first call sub-protocol
            arg = self.dispatch(path, **kwargs)
        else:
            # we are at the end of the chain
            # method should be called with path as argument
            arg = path

        # filter kwargs to pass only the kwargs
        #  accepted by the method.
        kwargs = self.filter_kwargs(meth, kwargs)

        return meth(arg, *args, **kwargs)

    def split_url_kwargs(self, url):
        """Split a url into a path and a dict of query kwargs."""
        path, _, _ = url.partition(self.QUERY_SEP)
        kwargs = {}
        for k,v in parse_qs(urlparse(url).query).items():
            # values of query arguments are evaluated as lists
            # split logic depending on length
            n = len(v)
            if not n:
                kwargs[k] = None
            elif n==1:
                kwargs[k] = parse_val(v[0])
            else:
                kwargs[k] = list(map(parse_val, v))
        return path, kwargs

    @staticmethod
    def filter_kwargs(func, kwargs):
        """Filter out keyword arguments that
            are not in the call signature of func
            and return filtered kwargs dictionary
        """
        params = inspect.signature(func).parameters
        if any([str(p).startswith('**') for p in params.values()]):
            # if func accepts wildcard kwargs, return all
            return kwargs
        return {k:v for k,v in kwargs.items() if k in params}

    def fetch(self, plugin):
        # first fetch the user-set value
        # from the config dictionary
        url = super().fetch(plugin)

        if not isinstance(url, str):
            # if the value is not a string it is evaluated
            # as a literal config and returned as is.
            return url

        if self.SCHEME_SEP not in url:
            # no protocol in the url so its evaluated
            # as string-literal config and returned as is
            return url

        # separate out the query part of the URL, which
        # will become the method kwargs
        url, url_kwargs = self.split_url_kwargs(url)

        kwargs = {}
        for k,v in url_kwargs.items():
            if isinstance(v, str) and v.startswith(self.PLUGIN_ATTR_PREFIX):
                # kwarg is referring to a plugin attribute, lets fetch it
                kwargs[k] = getattr(plugin, v[len(self.PLUGIN_ATTR_PREFIX):], v)
            else:
                # kwarg is a literal, add its value to the kwargs dict
                kwargs[k] = v

        return self.dispatch(url, **kwargs)

Strax data model

Data type and kind

All data lives in data types, such as raw_records or peak_basics. Each of these has a fixed numpy datatype.

If a single row of two data types refers to the same physical / logical thing, such as an event or a peak, we say those data types have the same data_kind.

The Laws of Chunking

You shall obey them.

  1. Each data row corresponds to a time interval. Time and (endtime or (dt and length)) are mandatory fields in all datatypes.

  2. Strax handles data in chunks. A chunk is also an interval (containing rows of data which are individually intervals).

  3. Suppose you have a chunk of some datatype reaching from [t0, t1), then

    1. It contains all and only data that starts >= t0 and ends <= t1;

    2. All data outside the chunk ends <= t0, or starts >= t1. (Remember intervals are half-open; the boundary cases are not ambiguous.)

    3. In particular, every data row lies completely in one chunk. No data whatsoever lies partially in more than one chunk. This means chunks cannot be split at arbitrary times.

  4. Zero-duration data rows are not allowed. Zero-duration chunks are allowed, but they cannot contain data.
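
As a sketch of what laws 3 and 4 mean for the rows inside a chunk covering [t0, t1) (a helper you might write yourself; it is not part of strax):

import numpy as np
import strax

def obeys_the_laws(data, t0, t1):
    """Check that every row lies fully inside [t0, t1) and has nonzero duration."""
    ends = strax.endtime(data)  # endtime field, or time + dt * length
    fully_contained = np.all(data['time'] >= t0) and np.all(ends <= t1)
    no_zero_duration = np.all(ends > data['time'])
    return bool(fully_contained and no_zero_duration)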

Incoming data

From the perspective of a plugin, all incoming data is time-synchronized and merged by kind. Specifically:

  • Data of the same kind is merged into a single array. If you depend on events, peaks and peak_basics, you will get two arrays: events and peaks. The second will be the merged array of peaks and peak_basics.

  • Data of different kinds are synchronized by time. Strax will fetch a chunk of the first kind (events), then fetch as much as needed from the second kind (peaks) until you have all peaks that end before or at exactly the same time as the last event.

This example is a bit odd: when loading multiple kinds of data that are contained in each other, e.g. events and peaks, you very often want to use a LoopPlugin rather than a straight-up Plugin.

Outgoing data

Plugins can chunk their output as they wish, including withholding some data until the next chunk is sent out. Of course this requires keeping state, which means you cannot parallelize: see the chunk boundary handling section later in this documentation.

Savers, too, are free to chunk their data as they like; for example, to create files of convenient sizes. This affects the chunks you get when loading or reprocessing data. If you don’t want this, e.g. if the next plugin in line assumes a particular kind of chunking you want to preserve, set the attribute rechunk_on_save = False.

In cases where rechunking is permitted, a plugin can also specify a desired minimum uncompressed chunk size in bytes via the chunk_target_size attribute, with 200 MB as the default value. Chunks are concatenated until this desired size is exceeded, or all chunks have been combined, whereupon the data is compressed and written to disk.

Sorted output requirement

Strax requires that all output is sorted by time inside chunks.
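
As a minimal sketch (plain numpy; the helper name is made up), a plugin could enforce this on its output before returning it:

import numpy as np

def sort_chunk_by_time(result):
    """Return the rows of a structured array ordered by their 'time' field."""
    return result[np.argsort(result['time'], kind='stable')]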

Additionally, most or all plugins will assume that incoming data is time-ordered between chunks. That is, a subsequent chunk should not contain any data that starts before an item from a previous chunk ends. Strax data must therefore either consist of disjoint things, or, if there are overlaps, chunk boundaries must fall in places where gaps exist.

It would be much harder to code algorithms if you did not know when you had seen all input before a certain time. Essentially you would have to wait until the end of the run before you could process any data, which goes against the idea of processing your data as a stream.

If your plugin removes or adds items from the original incoming array, it must output a different data kind. For example, during the initial data reduction steps, we remove items from ‘raw_records’ to make a new data kind ‘records’. Here we change data kind, even though the fields in the output data type are identical to the fields in the input data type.

Superruns

Overview and motivation

A superrun is a run defined by (parts of) other runs, which are called ‘subruns’. Superrun names start with an underscore. Regular run names cannot start with an underscore.

Strax builds data for a superrun by loading (and potentially building) each of the subruns, then slicing and concatenating them as necessary. In addition, superruns can be stored to disk as a rechunked representation of their subruns. This currently only works for static lineages, i.e. without default-per-run_id settings. Stored superruns have the advantage that loading data is much faster, and different data_types of the same kind can be combined.

Superruns are useful to track common groupings of data. For example:

  • ‘Minimum bias’ runs, consisting only of low-energy events, events passing some cuts, DM candidates, PMT flashes, or other things of interest. The low-level data of these is much smaller than that of all the full runs, and can be brought to a local analysis facility, enabling on-site low-level waveform watching.

  • Grouping similar runs. For example, shifters might group good runs from a week of calibration data with some source under a single name, e.g. _kr_feb2019.

Superruns can be built from other superruns. Thus, _sr1_v0.2 could be built from _background_january, _background_february, etc.

Defining superruns and making data:

Use the define_run context method to define a new superrun. Currently it is only supported to define superruns from a list of run_ids:

st.define_run('_awesome_superrun', ['123', '124'])

From a dictionary of time range tuples. The times must be 64-bit integer UTC timestamps since the unix epoch:

st.define_run('_awesome_superrun', {
    '123': [(start, stop), (start, stop), ...],
    '124': [(start, stop), (start, stop), ...],})

From a dataframe (or record array) with strax data:

st.define_run('_awesome_superrun', events_df)
st.define_run('_awesome_superrun', events_df, from_run='123')

In this case, the run will be made of the time ranges that correspond exactly to events_df. If events_df already has a run_id field (e.g. because it consists of data from multiple runs), you do not need to pass from_run; it will be read off from the data.

It is up to the storage frontend to process your request for defining a run. As a normal user, you generally only have permissions to create a new run in the DataDirectory (local files) storage frontend, where runs are recorded in json files.

Making superrun data is as easy as creating any other data. Once a superrun is defined we can make for example event_info via:

st.make('_awesome_superrun', 'event_info')

For bookkeeping, each stored superrun chunk contains information about its constituents in a field called subruns, e.g.:

{'0': {'end': 10, 'start': 0},
 '1': {'end': 30, 'start': 20},
 '2': {'end': 50, 'start': 40}}

Here the keys represent the subrun_ids, and start/end give the start and end of the corresponding first/last chunk included in the superrun chunk. The same information can also be found in the metadata of the individual chunks:

{'chunk_i': 0,
 'end': 50,
 'filename': 'records-j3nd2fjbiq-000000',
 'filesize': 2343,
 'first_endtime': 1,
 'first_time': 0,
 'last_endtime': 50,
 'last_time': 49,
 'n': 300,
 'nbytes': 77100,
 'run_id': '_superrun_test',
 'start': 0,
 'subruns': {'0': {'end': 10, 'start': 0},
             '1': {'end': 30, 'start': 20},
             '2': {'end': 50, 'start': 40}}}

After creating data we can load the superrun as we are used to and combine it with other data_types of the same kind too.

To work more easily with superruns, all chunks also have the properties chunk.is_superrun as well as chunk.first_subrun and chunk.last_subrun.

If you wish to make/store a superrun you have to specify the context option:

st.set_context_config({'write_superruns': True})

Superruns follow the same saving rules (SaveWhen.TARGET, SaveWhen.EXPLICIT or SaveWhen.ALWAYS) as regular runs.

How superruns work

As mentioned above, strax builds data for superruns by slicing data of the subruns. Thus, peaks from a superrun come from the peaks of the subruns, which are built from their own records as usual.

Defaults for settings can be run_id-dependent in strax, although this is no longer preferred. If an option specifies default_per_run=[(run, setting), (run2, setting2)], then runs in between run and run2 will use setting, and runs after run2 will use setting2. Superruns store a deterministic hash of this default_per_run specification for tracking purposes.

You cannot currently go directly from the superrun’s records to the superrun’s peaks. This would be tricky to implement, since, even with the same settings, many plugins choose to do something different depending on the run_id. For example, in straxen the gain model is specified by a file, but which gains from the file are actually used depends on the run_id.

Thus, superruns won’t help build data faster, but they will speed up loading data after it has been built. This is important, because strax’s overhead for loading a run is larger than that of hax, due to its version and option tracking (this is only true if per-run-default options are allowed).

Out of core computation

Overview and motivation

Analyses often involve performing some computation not implemented by a plugin (e.g. plotting) that requires loading more data than can fit into memory. These types of tasks are commonly referred to as out-of-core computations. Out-of-core algorithms usually involve a few repeating steps:

  1. chunk the dataset into manageable sizes

  2. load the data chunk by chunk

  3. perform some computation on each chunk

  4. save a summary of the results for each chunk

  5. perform some combination of the per-chunk results into a final result.

While it is of course possible to implement these operations yourself, it can be tedious and repetitive, and the code becomes very rigid to the specific calculations being performed. A better approach is to use abstractions of commonly performed operations that use out-of-core algorithms under the hood to get the same result as if the operations were performed on the entire dataset. Code written using these abstractions can then run on in-memory datasets and out-of-core datasets alike. More importantly, the implementations of these algorithms can be written once, packaged, and then used by all.

Data chunking

The zarr package provides an abstraction of the data-access API of numpy arrays for chunked and compressed data stored in memory or on disk. zarr provides an array abstraction with behavior identical to a numpy array when accessing data, but where the underlying data is actually a collection of (optionally compressed) chunks. The strax context provides a convenience method for loading data directly into zarr arrays.

import strax

context = strax.Context(**CONTEXT_KWARGS)

# you can pass the same arguments you pass to context.get_array()
zgrp = context.get_zarr(RUN_IDs, DATA_TYPES, **GET_ARRAY_KWARGS)

# the zarr group contains multiple arrays, one for each data type
z = zgrp.data_type

# individual arrays are also accessible via the __getitem__ interface
z = zgrp['data_type']

# numpy-like data access, abstracting away the underlying
# data reading which may include readin multiple chunks from disk/memory
# and decompression then concatenation to return an in memory numpy array
z[:100]

Data processing

The dask package provides abstractions for most of the numpy and pandas APIs. The dask.Array and dask.DataFrame objects implement their respective APIs using fully distributed algorithms, only loading a fraction of the total data into memory at any given moment for a given computing partition (thread/process/HPC job).

import dask.array as da

# easily convert to dask.Array abstraction for processing
darr = da.from_zarr(z)

# it's recommended to rechunk to sizes more appropriate for processing;
# note that rechunk() returns a new array (see the dask documentation for details)
darr = darr.rechunk(CHUNK_SIZE)

# you can also convert the dask.Array abstraction
# to a dask.DataFrame abstraction if you need the pandas api
ddf = darr.to_dask_dataframe()

Recompressing & moving data

There are two options for recompressing data:
  • via the context context.copy_to_frontend()

  • via a dedicated script rechunker that only works for filesystem backends and works outside the context.

In order to recompress data with another compression algorithm, the context.copy_to_frontend() function can be used. The function works on a per-run_id, per-datatype basis. In the example below, records data is copied to a second frontend.

import strax
import os
# Naturally, these plugins (Records and Peaks) only serve as examples
# and are best replaced by a fully constructed context
from strax.testutils import Records, Peaks, run_id

# Initialize context (st):
st = strax.Context(register=[Records, Peaks])

# Initialize frontends
storage_frontend_A = strax.DataDirectory('./folder_A')
storage_frontend_B = strax.DataDirectory('./folder_B',
                                         readonly=True)
st.storage = [storage_frontend_A,
              storage_frontend_B]

# In this example, we will only consider records
target = "records"

print(f'Are records stored?\n{st.is_stored(run_id, target)}')

# Make the data (stores to every frontend available)
st.get_array(run_id, 'records')

for sf in st.storage:
    print(f'{target} stored in\n\t{sf}?\n\t{st._is_stored_in_sf(run_id, target, sf)}')

Which prints:

Are records stored?
False
records stored in
    strax.storage.files.DataDirectory, path: ./folder_A?
    True
records stored in
    strax.storage.files.DataDirectory, readonly: True, path: ./folder_B?
    False

Copy

In the example above, storage_frontend_B was readonly; therefore, when creating records, no data is stored there. Below, we will copy the data from storage_frontend_A to storage_frontend_B.

# First set the storage_frontend_B for readonly=False such that we can copy
# data there
storage_frontend_B.readonly = False

# In the st.storage-list, storage_frontend_B is index 1
index_frontend_B = 1
st.copy_to_frontend(run_id, target,
                    target_frontend_id=index_frontend_B)

for sf in [storage_frontend_A,  storage_frontend_B]:
    print(f'{target} stored in\n\t{sf}?\n\t{st._is_stored_in_sf(run_id, target, sf)}')

Which prints the following (so we can see that the copy to folder_B was successful):

records stored in
    strax.storage.files.DataDirectory, path: ./folder_A?
    True
records stored in
    strax.storage.files.DataDirectory, path: ./folder_B?
    True

Copy and recompress

Now, with a third storage frontend, we will recompress the data to reduce the size on disk.

# Recompression with a different compressor
# See strax.io.COMPRESSORS for more compressors
target_compressor = 'bz2'

# Add the extra storage frontend
index_frontend_C = 2
storage_frontend_C = strax.DataDirectory('./folder_C')
st.storage.append(storage_frontend_C)

# Copy and recompress
st.copy_to_frontend(run_id, target,
                    target_frontend_id=index_frontend_C,
                    target_compressor=target_compressor)

for sf in st.storage:
    first_chunk = os.path.join(sf.path,
                               '0-records-sqcyyhsfpv',
                               'records-sqcyyhsfpv-000000')
    print(f'In {sf.path}, the first chunk is {os.path.getsize(first_chunk)} bytes')

Which outputs:

In ./folder_A, the first chunk is 275 bytes
In ./folder_B, the first chunk is 275 bytes
In ./folder_C, the first chunk is 65 bytes

From the output we can see that the first chunk in folder_C is much smaller than in folder_A/folder_B. This is because bz2 compresses this data much more than the default compressor blosc.

How does this work?

Strax knows from the metadata stored with the data which compressor the data was written with. It is possible to use a different compressor when re-writing the data to disk (as done for folder_C in the example above).

As such, for further use it does not matter from which of the folders folder_A-folder_C the data is loaded, as the metadata will tell strax which compressor to use. Different compressors may have different performance for loading/writing data.
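
For example, one way to inspect which compressor was used is to look at the stored metadata (a sketch continuing the example above; the exact metadata keys depend on the storage backend):

metadata = st.get_metadata(run_id, target)
print(metadata.get('compressor'))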

Rechunker script

From strax v1.2.2 onwards, a rechunker script is automatically installed with strax. It can be used to re-write data in the FileSystem backend.

For example:

rechunker --source 009104-raw_records_aqmon-rfzvpzj4mf --compressor zstd

will output:

Will write to /tmp/tmpoj0xpr78 and make sub-folder 009104-raw_records_aqmon-rfzvpzj4mf
Rechunking 009104-raw_records_aqmon-rfzvpzj4mf to /tmp/tmpoj0xpr78/009104-raw_records_aqmon-rfzvpzj4mf
move /tmp/tmpoj0xpr78/009104-raw_records_aqmon-rfzvpzj4mf to 009104-raw_records_aqmon-rfzvpzj4mf
Re-compressed 009104-raw_records_aqmon-rfzvpzj4mf
        backend_key             009104-raw_records_aqmon-rfzvpzj4mf
        load_time               0.4088103771209717
        write_time              0.07699322700500488
        uncompressed_mb         1.178276
        source_compressor       zstd
        dest_compressor         zstd
        source_mb               0.349217
        dest_mb                 0.349218

This script can easily be used to profile different compressors:

for COMPRESSOR in zstd bz2 lz4 blosc zstd; \
    do echo $COMPRESSOR; \
    rechunker \
        --source 009104-raw_records-rfzvpzj4mf \
        --write_stats_to test.csv \
        --compressor $COMPRESSOR; \
    done

We can check the output in python using:

>>> import pandas as pd
>>> df = pd.read_csv('test.csv')
>>> df['read_mbs'] = df['uncompressed_mb']/df['load_time']
>>> df['write_mbs'] = df['uncompressed_mb']/df['write_time']
>>> print(df[['source_compressor', 'read_mbs', 'dest_compressor', 'write_mbs']].to_string())
    source_compressor    read_mbs dest_compressor   write_mbs
  0              zstd  313.922890            zstd  298.429123
  1              zstd  284.530054             bz2    8.932259
  2               bz2   20.289876             lz4  228.932498
  3               lz4  372.491150           blosc  433.494794
  4             blosc  725.154966            zstd  215.765177

Fuzzy for functionality

Since strax tracks lineages, updates to low-level plugins may change the availability of high-level data. When a low-level plugin is changed (for example, its version is incremented), strax will recognize that the data corresponding to the changed plugin is not stored (since only the previous version is stored). This safeguards that the data the user is loading is always consistent with the context.

This functionality can partially be disabled using fuzzy-for settings. This should only be done temporarily or for quick checks, as strax then no longer checks whether the entire ancestry of the requested and the delivered data is consistent.

When to use

There are situations where the above robustness of the context is not what the user wants. For example, a user may be developing a new plugin on the master branch while the master branch has changes in the lower-level plugins. The user in this case cannot easily check if the plugin works on data, as no data is available in the context of the master branch. In this case, the user might want to tell the context to just load whatever data is available, ignoring changes in a specific plugin. Another example would be a dataset simulated with specific instructions, where a user wants to quickly look at the data without having to manually check which context was used for simulating it (of course, the best way to solve this would be to open the metadata that is stored with the simulation files and construct the context from those options).

How to use

There are two ways of ignoring the lineage. Both are set in the context config (see context.context_config):

  • fuzzy_for_options: a tuple of option names; any option whose name is in the tuple is ignored when matching lineages.

  • fuzzy_for: a tuple of data types whose lineage is ignored.

In the example below, we will set the fuzzy_for option. We will use the online context from straxen to illustrate how the options are set.

import straxen
# Use a context that can load data of the datatype 'peak_basics'
st = straxen.contexts.xenonnt_online()
run_id, target = '022880', 'peak_basics'

# Check if the data is stored for this run and datatype
print(f'{run_id} {target} is stored: {st.is_stored(run_id, target)}')

# Now let's mimic the situation wherein the version of the plugin that provides
# peak basics has changed (it has a different version). We will do so by changing
# the version of the plugin below
PeakBasics = st._plugin_class_registry[target]
PeakBasics.__version__ = 'does_not_exist'
print(f'{run_id} {target} is stored: {st.is_stored(run_id, target)}')

# The print statement will tell us the data is not stored. To load the data
# from the default version of PeakBasics we will use the fuzzy-for option:
st.context_config['fuzzy_for'] = (target,)
print(f'{run_id} {target} is stored: {st.is_stored(run_id, target)}')

The block above prints:

022880 peak_basics is stored: True
022880 peak_basics is stored: False
022880 peak_basics is stored: True

Is it advisable / safe to use?

For running production analyses, one should never base results on a context where fuzzy-ness is enabled.

For quick tests, it is safe to use. If new data is made based on a fuzzy context, it is not stored, to prevent the creation of data files with unreproducible results.

Additionally (depending on the StorageFrontend), loading data with fuzzy options will generally be much slower. For example, the most commonly used StorageFrontend, the DataDirectory, scans all folders within its directory and filters the metadata in search of a folder with a lineage compatible with the fuzzy-for options.

Pipeline

This describes how strax chains computations from multiple plugins together in a pipeline.

In python, pipeline components can offer two semantics. In pull-semantics, usually implemented with generators, somebody calls next to pull output, and StopIteration signals nothing more is coming. In push-semantics, usually implemented with coroutines, input is pushed in with a send method. If cleanup is required, a close method must be invoked. These can be chained together to make pipelines. Either can also be implemented with custom classes instead of standard python generators/coroutines.

Strax primarily uses pull-semantics:
  • Loaders are plain iterators;

  • Plugins iterate over inputs, and expect their results to be iterated over;

  • Savers use both semantics. Usually they iterate over their input. However, during multiprocessing, savers have their inputs sent into them, and must be explicitly closed.

Mailboxes

Strax could not be built by just chaining iterators or coroutines.
  • Pipelines can have multiple inputs and outputs, which generally come at different speeds; we cannot simply push on or pull from one endpoint.

  • For parallelization, we must run the same computation on several chunks at a time, then gather the results.

The mailbox class provides the additional machinery that handles this. During processing, each data type has a mailbox. A data type’s mailbox iterates over the results of the plugin or loader that produces it. It also provides an iterator to each plugin that needs it as an input.

The input iterators given to the plugins must be somewhat magical. If we call next but the input is not yet available, we must pause (and do something else) until it is. To enable this suspending, strax runs each plugin in a separate thread. (We could use a framework like asyncio instead if we wanted to run our own scheduler; for now we just use the OS’s scheduler.)

The threads in strax are thus motivated by concurrency, not parallelism. As a bonus, they do allow different plugins to run simultaneously. The benefit is limited by python’s global interpreter lock, but this does not affect IO or computations in numpy and numba.

Exception propagation

TODO: document MailboxKilled etc.

Parallelization

Strax can process data at 50-100 MB (raw) per second on a single core, which is not enough for live online processing at high DAQ rates. We must thus parallelize at least some of the signal processing.

Not all plugins can be parallelized. For example, we cannot assign event numbers (0, 1, 2, …) in parallel if we want unique numbers that increment without gaps. We also cannot save to a single file in parallel.

Multithreading

To get parallelization, plugins can defer computations to a pool of threads or processes. If they do, they yield futures to the output mailbox instead of the actual results (numpy arrays). The mailbox awaits the futures and ensures each consumer gets the results in order.

A plugin indicates to strax that it is parallelizable by setting its parallel attribute to True. This usually causes strax to outsource computations to a pool of threads. Every chunk will result in a call to the thread pool. This has little overhead, though the performance gain is limited by the global interpreter lock. If the computation is in pure python, there is no benefit; however, numpy and numba code can benefit significantly (until the pure-python overhead around it becomes the limiting factor, at high numbers of cores).

Loaders use multithreading by default, since their work is eminently parallelizable: they just load some data and decompress it (using low-level compressors that happily release the GIL). Savers that rechunk the data (e.g. to achieve more sysadmin-friendly filesizes) are not parallelizable. Savers that do not rechunk use multithreading just like loaders.

Multiprocessing

Strax can also use multiprocessing for parallelization. This is useful to free pure-python computations from the shackles of the GIL. Low-level plugins deal with a massive data flow, so parallelizing their computations in separate processes is very inefficient due to data transfer overhead. Thread parallelization works fine (since the algorithms are implemented in numba) until you reach ~10 cores, when the GIL becomes binding due to pure-python overhead.

You can set the parallel attribute to 'process' to suggest strax should use a process pool instead of a thread pool. This is often not a good idea: multiprocessing incurs overhead from (1) forking the strax process and (2) pickling and unpickling the results in the child and parent processes. Strax will still not use multiprocessing at all unless you:
  • Set the allow_multiprocess context option to True,

  • Set max_workers to a value higher than 1 in the get_xxx call.
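
Putting both together, a sketch of what this could look like (the run_id 'some_run' and target 'peaks' are placeholders):

import strax

st = strax.Context()  # plus your usual plugin registrations and storage
st.set_context_config({'allow_multiprocess': True})

# max_workers > 1 enables the pool; parallel='process' plugins can then
# be outsourced to a process pool
peaks = st.get_array('some_run', 'peaks', max_workers=4)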

During multiprocessing, computations of chunks from parallel='process' plugins will be outsourced to a process pool. Additionally, to avoid data transfer overhead, strax attempts to gather as many savers, dependencies, and savers of dependencies of a parallel='process' plugin as possible and “inline” them: their computations are set to happen immediately after the main plugin’s computation, in the same process. This is achieved behind the scenes by replacing the plugin with a container-like plugin called ParallelSourcePlugin. Only parallelizable plugins and savers that do not rechunk will be inlined.

Since savers can become inlined, they should work even if they are forked. That implies they cannot keep state, and must store metadata for each chunk in their backend as it arrives. For example, the FileStore backend produces a json file with metadata for each chunk. When the saver is closed, all the json files are read in and concatenated. A saver that can’t do this should set allow_fork = False.

Multi-run parallelization: a different problem

Parallelizing quick (re)processing of many runs is a different problem altogether. It is easier in one way: since runs are assumed to be independent, we can simply process each run on a single core, and use our multiple cores to process multiple runs. However, it is harder in another: the large volume of desired result data may exceed available RAM. We can use Dask dataframes for this. Probably we can just copy/reuse the code in hax.

Chunk boundary handling

Many algorithms need to look back and ahead in the data. For example, we want to group nearby PMT pulses together into peaks. Or, to ensure non-overlapping events, we want to group triggers with others that occur just before or ahead.

During online processing, however, not all data is available. How can you look ahead if the data has not been acquired yet? Even during offline processing, you may not be able to keep all data of a given type for a run in RAM.

Overlap window plugins

Strax includes the OverlapWindowPlugin to deal with this case. To use it, specify a window size: a maximum duration which the algorithm needs to look back or ahead. Then write your algorithm as if there are no chunk breaks – everything will be taken care of behind the scenes.
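A minimal sketch of such a plugin is shown below. The plugin name and the 1 ms window are made up for illustration; get_window_size is the method OverlapWindowPlugin expects you to override.

    import numpy as np
    import strax

    class EventFinding(strax.OverlapWindowPlugin):
        """Hypothetical event finder: groups peaks into events."""
        depends_on = ('peaks',)
        provides = 'events'
        dtype = strax.time_fields  # just (time, endtime) for this sketch

        def get_window_size(self):
            # Maximum time (in ns) the algorithm needs to look back or ahead
            return int(1e6)

        def compute(self, peaks):
            # Write this as if there were no chunk breaks; strax caches input
            # near chunk edges and discards/recomputes outputs as needed.
            return np.zeros(0, dtype=self.dtype)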

The illustration below shows how OverlapWindowPlugin works. Imagine this is an event finder plugin, which finds events (green and red dashes) in a stream of peaks (continuous blue line).

_images/overlap_window.jpg
  • Outputs too close to the end of a chunk are discarded, except for the last chunk. When a chunk arrives, it is generally not known to be the last, so we keep this data around internally and emit it once we get word there will be no more chunks.

  • Inputs close to the end of a chunk (pink region) are cached, and added to the input for the next chunk. Note we must cache two windows of input: to interpret data at (end - one window) we need all data from (end - two windows) to (end).

  • For the next chunk, outputs fully in the region between (end - window) and (end) of the previous chunk are discarded. These are invalid, and moreover, we already sent out the valid outputs for that range during the previous chunk.

Note from the figure that outputs straddling (end - window) are initially discarded; they are recomputed during the next chunk.

If the endtimes of two objects are separated by more than a window size, they must be guaranteed to not influence each other. If your algorithm does not have this guarantee, you cannot use OverlapWindowPlugin and must implement a custom solution. Make sure your window is large enough so this guarantee holds even if the objects themselves have a relevant length.

Chunk breaks and the DAQ reader

Low-level datastreams are too large to be routed through a single core. Instead, each CPU sees only a chunk of data. However, the OverlapWindowPlugin will not work here: because it keeps state (the cached input and the temporarily cached output), it cannot be parallelized.

For the low-level datastream, we take a different approach:

TODO: document pre- and post-chunk stuff here.

Storage

Overview

Players in strax’s storage system take on one of three roles:
  • StorageFrontend: Find data locations, and communicate this to one or more StorageBackend instances;

  • StorageBackend: load pieces of data, and create instances of Saver;

  • Saver: save pieces of data to a specific location.

As an example, a StorageFrontend could talk to a database that tracks which data is stored where. A StorageBackend then retrieves data from local disks, while another might retrieve it remotely using SSH or other transfer systems. The front-end decides which backend is appropriate for a given request. Finally, a Saver guides the process of writing a particular piece of data to disk or a database (potentially from multiple cores), compressing and rechunking as needed.
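For example, the DataDirectory frontend shipped with strax (documented in the reference below) keeps FileStore folders inside a single directory. A hedged sketch of wiring it into a context (the ./strax_data path is arbitrary):

    import strax
    from strax.testutils import Records

    # The context consults the DataDirectory frontend before (re)processing;
    # processed data ends up as subfolders of ./strax_data.
    context = strax.Context(
        register=[Records],
        storage=[strax.DataDirectory('./strax_data')],
    )

    context.make('some_run', 'records')              # process and save
    print(context.is_stored('some_run', 'records'))  # True once saved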

To implement a new way of storing and/or tracking data, you must subclass some or all of these classes and override a few specific methods (called ‘abstract methods’ because they raise NotImplementedError if they are not overridden).

Keys

In strax, a piece of data is identified by a DataKey, consisting of three components:
  • The run id

  • The data type

  • The complete lineage of the data. This includes, for the data type itself and all types it depends on (and their dependencies, and so forth):

    - The plugin class name that produced the data;

    - The version string of the plugin;

    - The values of all configuration options the plugin took (whether they were explicitly specified or left as default).

When you ask for data using Context.get_xxx, the context will produce a key like this, and pass it to the StorageFrontend. It then looks for a filename or database collection name that matches this key – something a StorageBackend understands, which is therefore generically called a backend key. The matching between DataKey and backend key can be done very strictly, or more loosely, depending on how the context is configured. This way you can choose to be completely sure about what data you get, or be more flexible and load whatever is available. TODO: ref context documentation.
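As an illustration, a DataKey for a simple, dependency-free data type might be built like this. The plugin name, version, and option values are made up; only the three-component structure and the lineage format matter.

    import strax

    # Lineage: {data_type: (plugin class name, version, {option: value, ...}), ...},
    # including all dependencies (none in this hypothetical example).
    lineage = {'records': ('PulseProcessing', '0.0.1', {'some_option': 10})}

    key = strax.DataKey(run_id='some_run', data_type='records', lineage=lineage)
    print(key.lineage_hash)  # deterministic hash identifying this exact lineage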

Run-level metadata

Metadata can be associated with a run but not with any particular data type. The StorageFrontend must take care of saving and loading this run-level metadata.

Such run-level metadata can be crucial in providing run-dependent default settings for configuration options, for example calibrated quantities necessary for data processing (e.g. electron lifetime and PMT gains).

## Contribution guidelines

You’re welcome to contribute to strax!

Currently, many features are still in significant flux, and the documentation is still very basic. Until more people start getting involved in development, we’re probably not even following our own advice below…

### Please fork

Please work in a fork, then submit pull requests. Only maintainers sometimes work in branches, if there is a good reason for it.

### No large files

Avoid committing large (> 100 kB) files. We’d like to keep the repository to no more than a few MB.

For example, do not commit jupyter notebooks with high-resolution plots (clear the output first), or long configuration files, or binary test data.

While it’s possible to rewrite history to remove large files, this is a bit of work and messes with the repository’s consistency. Once data has made it into master it is especially difficult: there is a risk of others merging the files back in later unless they cooperate in the history-rewriting.

This is one reason to prefer forks over branches; if you commit a huge file by mistake it’s just in your fork.

### Code style

Of course, please write nice and clean code :-)

PEP8-compatibility is great (you can test with flake8) but not as important as other good coding habits such as avoiding duplication. See e.g. the [famous beyond PEP8 talk](https://www.youtube.com/watch?v=wf-BqAjZb8M).

In particular, don’t go into code someone else is maintaining to “PEP8-ify” it (or worse, use some automatic styling tool).

Other style guidelines (docstrings etc.) are yet to be determined.

### Pull requests

When accepting pull requests, preferably squash, as this attributes all the commits to one single pull request. One might consider merging the pull request without squashing if it consists of a few commits that mostly outline discrete steps of an implementation which seem worth keeping.

Writing documentation

To write documentation, please refer to the existing pages for examples. To add new pages:
  • Add a new .rst file in the basics/advanced/developer folder within ./docs.

  • Add the link to the file in the docs/index.rst

  • Run bash make_docs.sh. This runs sphinx locally, which allows one to preview whether the results are as desired. Several modules need to be installed in order to run this script.

  • Add the .rst file and the index.rst to git.

Updating docs/reference

The docs/reference is only updated with bash make_docs.sh. In case modules are added or removed, one needs to rerun this script and commit the changes to the files in docs/reference.

Release procedure

  • Update personal fork & local master to the AxFoundation fork

  • Edit and commit HISTORY.md

  • bumpversion patch (or minor/major, as appropriate)

  • Push to personal and AxFoundation fork, with --tags

  • Fast-forward and push AxFoundation/stable

  • Add release info on release page of github website

The above pages describe how strax’s processing framework works under the hood and explain some implementation choices. They are meant for people who want to do core development on strax; users or even plugin developers should not need them.

strax package

Subpackages

strax.processing package

Submodules
strax.processing.data_reduction module

Functions to perform in-place pulse-level data reduction.

class strax.processing.data_reduction.ReductionLevel(value)[source]

Bases: IntEnum

Identifies what type of data reduction has been used on a record.

BASELINE_CUT = 1
HITS_ONLY = 2
METADATA_ONLY = 4
NO_REDUCTION = 0
WAVEFORM_REPLACED = 3
strax.processing.data_reduction.cut_baseline(records, n_before=48, n_after=30)[source]

Replace first n_before and last n_after samples of pulses by 0.

strax.processing.data_reduction.cut_outside_hits(records, hits, left_extension=2, right_extension=15)[source]

Return records with waveforms zeroed if not within left_extension or right_extension of hits. These extensions properly account for breaking of pulses into records.

If you pass an incomplete (e.g. cut) set of records, we will not save data around hits found in the removed records, even if this stretches into records that you did pass.

strax.processing.general module
exception strax.processing.general.NoBreakFound[source]

Bases: Exception

strax.processing.general.abs_time_to_prev_next_interval(things, intervals)[source]

Function which determines the time difference of things to previous and next interval, e.g., events to veto intervals. Assumes that things do not overlap.

Parameters:
  • things – Numpy structured array containing strax time fields

  • intervals – Numpy structured array containing time fields

Returns:

Two integer arrays with the time difference to the previous and next intervals.

strax.processing.general.endtime(x)[source]

Return endtime of intervals x.

strax.processing.general.from_break(x, safe_break, not_before=0, left=True, tolerant=False)[source]

Return records on the side of a break at least safe_break long. If there is no such break, return the best break found.

strax.processing.general.fully_contained_in(things, containers)[source]

Return array of len(things) with the index of the interval in containers in which each thing is fully contained, or -1 if no such container exists.

We assume all things and containers are sorted by time. If containers are overlapping, the first container of the thing is chosen.
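A small usage sketch with bare (time, endtime) intervals; strax.time_fields is used here only to keep the example short, and the printed values are the expected result for these inputs.

    import numpy as np
    import strax

    things = np.zeros(3, dtype=strax.time_fields)
    things['time'] = [0, 12, 30]
    things['endtime'] = [5, 18, 35]

    containers = np.zeros(1, dtype=strax.time_fields)
    containers['time'] = [10]
    containers['endtime'] = [20]

    # Only the middle thing lies fully inside the single container
    print(strax.fully_contained_in(things, containers))  # [-1  0 -1]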

strax.processing.general.overlap_indices(a1, n_a, b1, n_b)[source]

Given interval [a1, a1 + n_a), and [b1, b1 + n_b) of integers, return indices [a_start, a_end), [b_start, b_end) of overlapping region.

strax.processing.general.sort_by_time(x)[source]

Sorts things.

Either by time, or by time and then channel if both fields are in the given array.

strax.processing.general.split_by_containment(things, containers)[source]

Return list of thing-arrays contained in each container. Result is returned as a numba.typed.List or list if containers are empty.

Assumes everything is sorted, and containers are non-overlapping.

strax.processing.general.split_touching_windows(things, containers, window=0)[source]

Split things by their containers and return a list with one entry per container.

Parameters:
  • things – Sorted array of interval-like data

  • containers – Sorted array of interval-like data

  • window – threshold distance for touching check.

For example:
  • window = 0: things must overlap one sample

  • window = -1: things can start right after container ends

    (i.e. container endtime equals the thing starttime, since strax endtimes are exclusive)

Returns:

strax.processing.general.touching_windows(things, containers, window=0)[source]

Return array of (start, exclusive end) indices into things which extend to within window of the container, for each container in containers.

Parameters:
  • things – Sorted array of interval-like data. We assume all things and containers are sorted by time. When endtimes are not sorted, it will return indices of the first and last things which are touching the container.

  • containers – Sorted array of interval-like data. Containers are allowed to overlap.

  • window – threshold distance for touching check.

For example:
  • window = 0: things must overlap one sample

  • window = -1: things can start right after container ends

    (i.e. container endtime equals the thing starttime, since strax endtimes are exclusive)
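A short sketch of the return format; the printed indices are the expected result for these inputs.

    import numpy as np
    import strax

    things = np.zeros(4, dtype=strax.time_fields)
    things['time'] = [0, 10, 20, 40]
    things['endtime'] = [2, 12, 22, 42]

    containers = np.zeros(1, dtype=strax.time_fields)
    containers['time'] = [9]
    containers['endtime'] = [25]

    # For each container: (start, exclusive end) indices into things
    print(strax.touching_windows(things, containers, window=3))  # [[1 3]]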

strax.processing.hitlets module
strax.processing.hitlets.concat_overlapping_hits(hits, extensions, pmt_channels, start, end)[source]

Function which concatenates hits which may overlap after left and right hit extension. Assumes that hits are sorted correctly.

Note:

This function only updates time, and length of the hit.

Parameters:
  • hits – Hits in records.

  • extensions – Tuple of the left and right hit extension.

  • pmt_channels – Tuple of the detector’s first and last PMT

  • start – Start time of the chunk

  • end – Endtime of the chunk

Returns:

Array with concatenated hits.

strax.processing.hitlets.conditional_entropy(hitlets, template='flat', square_data=False)[source]

Function which estimates the conditional entropy based on the specified template.

In order to compute the conditional entropy each hitlet will be aligned such that its maximum falls into the same sample as for the template. If the maximum is ambiguous the first maximum is taken.

Parameters:
  • hitlets – Hitlets for which the entropy shall be computed. Can be any data_kind which offers the fields data and length.

  • template – Template to compare the data with. Can be either specified as “flat” to use a flat distribution or as a numpy array containing any normalized template.

  • square_data – If true data will be squared and normalized before estimating the entropy. Otherwise the data will only be normalized.

Returns:

Array containing the entropy values for each hitlet.

Note:

The template has to be normalized such that its total area is 1. Independently of the specified options, only samples for which the content is greater than zero are used to compute the entropy.

In the non-squared case, negative samples are omitted from the calculation.

strax.processing.hitlets.create_hitlets_from_hits(hits, save_outside_hits, channel_range, chunk_start=None, chunk_end=None)[source]

Function which creates hitlets from a bunch of hits.

Parameters:
  • hits – Hits found in records.

  • save_outside_hits – Tuple with left and right hit extension.

  • channel_range – Detector’s channel range from the channel map.

  • chunk_start – (optional) start time of a chunk. Ensures that no hitlet is earlier than this timestamp.

  • chunk_end – (optional) end time of a chunk. Ensures that no hitlet ends later than this timestamp.

Returns:

Hitlets with temporary fields (data, max_goodness_of_split…)

strax.processing.hitlets.get_fwxm(hitlet, fraction=0.5)[source]

Estimates the left and right edge of a specific height percentage.

Parameters:
  • hitlet – Single hitlet

  • fraction – Level for which the width shall be computed.

Returns:

Two floats, left edge and right edge in ns

Notes:

The function searches for the last sample below and above the specified height level on the left and right hand side of the maximum. When the samples are found, the width is estimated based upon a linear interpolation between the respective samples. In case the samples cannot be found for either of the sides, the corresponding outermost bin edges are used: left 0; right last sample + 1.

strax.processing.hitlets.get_hitlets_data(hitlets, records, to_pe, min_hitlet_sample=200)[source]

Function which searches, for every hitlet in a given chunk, for the corresponding records data. Additionally computes the total area of the signal.

Parameters:
  • hitlets – Hitlets found in a chunk of records.

  • records – Records of the chunk.

  • to_pe – Array with area conversion factors from adc/sample to pe/sample. Please make sure that to_pe has the correct shape. The array index should match the channel number.

  • min_hitlet_sample – Minimal length of the hitlet data field. Prevents numba compilation from running into race conditions.

Returns:

Hitlets including data stored in the “data” field (if it did not exist before, it will be added).

strax.processing.hitlets.hitlet_properties(hitlets)[source]

Computes additional hitlet properties such as amplitude, FWHM, etc.

strax.processing.peak_building module
strax.processing.peak_building.find_hit_integration_bounds(hits, excluded_intervals, records, save_outside_hits, n_channels, allow_bounds_beyond_records=False)[source]

Update (lone) hits to include integration bounds. Please note that time and length of the original hit are not changed!

Parameters:
  • hits – Hits or lone hits which should be extended by integration bounds.

  • excluded_intervals – Regions into which hits should not extend, e.g. peaks for lone hits. If not needed, just pass a zero-length strax.time_fields array.

  • records – Records in which hits were found.

  • save_outside_hits – Hit extension to the left and right, in ns, not samples!

  • n_channels – Number of channels for given detector.

  • allow_bounds_beyond_records – If true, extend left/right_integration beyond record boundaries, e.g. to negative samples for the left side.

strax.processing.peak_building.find_peak_groups(peaks, gap_threshold, left_extension=0, right_extension=0, max_duration=1000000000)[source]

Return boundaries of groups of peaks separated by gap_threshold, extended left and right.

Parameters:
  • peaks – Peaks to group

  • gap_threshold – Minimum gap between peaks

  • left_extension – Extend groups by this many ns left

  • right_extension – Extend groups by this many ns right

  • max_duration – max duration time of merged peak in ns

Returns:

time, endtime arrays of group boundaries

strax.processing.peak_building.find_peaks(hits, adc_to_pe, gap_threshold=300, left_extension=20, right_extension=150, min_area=0, min_channels=2, max_duration=10000000, _result_buffer=None, result_dtype=None)[source]

Return peaks made from grouping hits together. Assumes all hits have the same dt.

Parameters:
  • hits – Hit (or any interval) to group

  • left_extension – Extend peaks by this many ns left

  • right_extension – Extend peaks by this many ns right

  • gap_threshold – No hits for this much ns means new peak

  • min_area – Peaks with less than min_area are not returned

  • min_channels – Peaks with less contributing channels are not returned

  • max_duration – max duration time of merged peak in ns

strax.processing.peak_building.integrate_lone_hits(lone_hits, records, peaks, save_outside_hits, n_channels)[source]

Update the area of lone_hits to the integral in ADCcounts x samples.

Parameters:
  • lone_hits – Hits outside of peaks

  • records – Records in which hits and peaks were found

  • peaks – Peaks

  • save_outside_hits – (left, right) time (in ns) by which we should extend the integration window of the hits

  • n_channels – Number of channels

TODO: this doesn’t extend the integration range beyond record boundaries

strax.processing.peak_building.store_downsampled_waveform(p, wv_buffer, store_in_data_top=False, wv_buffer_top=array([1.], dtype=float32))[source]

Downsample the waveform in buffer and store it in p[‘data’] and in p[‘data_top’] if indicated to do so.

Parameters:
  • p – Row of a strax peak array, or compatible type. Note that p[‘dt’] is adjusted to match the downsampling.

  • wv_buffer – numpy array containing sum waveform during the peak at the input peak’s sampling resolution p[‘dt’].

  • store_in_data_top – Boolean which indicates whether to also store into p[‘data_top’]. When downsampling results in a fractional number of samples, the peak is shortened rather than extended. This causes data loss, but it is necessary to prevent overlaps between peaks.

strax.processing.peak_building.sum_waveform(peaks, hits, records, record_links, adc_to_pe, n_top_channels=0, select_peaks_indices=None)[source]

Compute sum waveforms for all peaks in peaks. Only builds the summed waveform over regions in which hits were found. This is required to avoid any bias due to zero-padding and baselining. Will downsample sum waveforms if they do not fit in the per-peak buffer.

Parameters:
  • peaks – Peaks for which the summed waveform should be built.

  • hits – Hits which are inside peaks. Must be sorted according to record_i.

  • records – Records to be used to build peaks.

  • record_links – Tuple of previous and next records.

  • n_top_channels – Number of top array channels.

  • select_peaks_indices – Indices of the peaks for partial processing. In the form of np.array([np.int, np.int, ..]). If None (default), all the peaks are used for the summation. Assumes all peaks AND pulses have the same dt!

strax.processing.peak_merging module
strax.processing.peak_merging.add_lone_hits(peaks, lone_hits, to_pe, n_top_channels=0)[source]

Function which adds information from lone hits to peaks if the lone hit is inside a peak (e.g. after merging). Modifies peak area and data in place.

Parameters:
  • peaks – Numpy array of peaks

  • lone_hits – Numpy array of lone_hits

  • to_pe – Gain values to convert lone hit area into PE.

  • n_top_channels – Number of top array channels.

strax.processing.peak_merging.merge_peaks(peaks, start_merge_at, end_merge_at, max_buffer=100000)[source]

Merge specified peaks with their neighbors, return merged peaks.

Parameters:
  • peaks – Record array of strax peak dtype.

  • start_merge_at – Indices to start merge at

  • end_merge_at – EXCLUSIVE indices to end merge at

  • max_buffer – Maximum number of samples in the sum_waveforms and other waveforms of the resulting peaks (after merging). Peaks must be constructed based on the properties of constituent peaks, it being too time-consuming to revert to records/hits.

strax.processing.peak_merging.replace_merged(orig, merge)[source]

Return sorted array of ‘merge’ and members of ‘orig’ that do not touch any of merge.

Parameters:
  • orig – Array of interval-like objects (e.g. peaks)

  • merge – Array of interval-like objects (e.g. peaks)

strax.processing.peak_properties module
strax.processing.peak_properties.compute_index_of_fraction(peak, fractions_desired, result)[source]

Store the (fractional) indices at which peak reaches fractions_desired of their area in result.

Parameters:
  • peak – single strax peak(let) or other data-bearing dtype

  • fractions_desired – array of floats between 0 and 1

Returns:

len(fractions_desired) array of floats

strax.processing.peak_properties.compute_widths(peaks, select_peaks_indices=None)[source]

Compute widths in ns at desired area fractions for peaks.

Parameters:
  • peaks – single strax peak(let) or other data-bearing dtype

  • select_peaks_indices – Array of integers indicating which peaks to compute; defaults to None, in which case widths are computed for all peaks.

strax.processing.peak_properties.index_of_fraction(peaks, fractions_desired)[source]

Return the (fractional) indices at which the peaks reach fractions_desired of their area.

Parameters:
  • peaks – strax peak(let)s or other data-bearing dtype

  • fractions_desired – array of floats between 0 and 1

Returns:

(len(peaks), len(fractions_desired)) array of floats

strax.processing.peak_splitting module
strax.processing.peak_splitting.natural_breaks_gof(w, dt, normalize=False, split_low=False, filter_wing_width=0)[source]

Return natural breaks goodness of split/fit for the waveform w: a sharp peak gives ~0, two widely separated peaks ~1.

strax.processing.peak_splitting.split_peaks(peaks, hits, records, rlinks, to_pe, algorithm='local_minimum', data_type='peaks', n_top_channels=0, **kwargs)[source]

Return peaks split according to algorithm, with waveforms summed and widths computed.

Note:

Can also be used for hitlets splitting with local_minimum splitter. Just put hitlets instead of peaks.

Parameters:
  • peaks – Original peaks. Sum waveform must have been built and properties must have been computed (if you use them).

  • hits – Hits found in records (or None in case of hitlets splitting).

  • records – Records from which peaks were built

  • rlinks – strax.record_links for given records (or None in case of hitlets splitting.)

  • to_pe – ADC to PE conversion factor array (of n_channels)

  • algorithm – ‘local_minimum’ or ‘natural_breaks’.

  • data_type – ‘peaks’ or ‘hitlets’. Specifies whether to use sum_waveform or get_hitlets_data to compute the waveform of the new split peaks/hitlets.

  • n_top_channels – Number of top array channels.

  • result_dtype – dtype of the result.

Any other options are passed to the algorithm.

strax.processing.peak_splitting.symmetric_moving_average(a, wing_width)[source]

Return the moving average of a, over windows of length [2 * wing_width + 1] centered on each sample.

(i.e. the window covers each sample itself, plus a ‘wing’ of width wing_width on either side)

strax.processing.pulse_processing module

Functions that perform processing on pulses (other than data reduction functions, which are in data_reduction.py)

strax.processing.pulse_processing.baseline(records, baseline_samples=40, flip=True, allow_sloppy_chunking=False, fallback_baseline=16000)[source]

Determine baseline as the average of the first baseline_samples of each pulse. Subtract the pulse data from int(baseline), and store the baseline mean and rms.

Parameters:
  • baseline_samples – number of samples at start of pulse to average to determine the baseline.

  • flip – If true, flip sign of data

  • allow_sloppy_chunking – Allow use of the fallback_baseline in case the 0th fragment of a pulse is missing

  • fallback_baseline – Fallback baseline (ADC counts). Assumes records are sorted in time (or at least by channel, then time). Assumes record_i information is accurate – so don’t cut pulses before baselining them!

strax.processing.pulse_processing.filter_records(r, ir)[source]

Apply filter with impulse response ir over the records r. Assumes the filter origin is at the impulse response maximum.

Parameters:
  • ws – Waveform matrix, must be float

  • ir – Impulse response, must have odd length. Will normalize.

  • prev_r – Previous record map from strax.record_links

  • next_r – Next record map from strax.record_links

strax.processing.pulse_processing.filter_waveforms(ws, ir, prev_r, next_r)[source]

Convolve filter with impulse response ir over each row of ws. Assumes the filter origin is at the impulse response maximum.

Parameters:
  • ws – Waveform matrix, must be float

  • ir – Impulse response, must have odd length.

  • prev_r – Previous record map from strax.record_links

  • next_r – Next record map from strax.record_links

strax.processing.pulse_processing.find_hits(records, min_amplitude: int | ndarray = 15, min_height_over_noise: int | ndarray = 0)[source]

Return hits (intervals >= threshold) found in records. Hits that straddle record boundaries are split (perhaps we should fix this?)

NB: returned hits are NOT sorted yet!

strax.processing.pulse_processing.integrate(records)[source]

Integrate records in-place.

strax.processing.pulse_processing.raw_to_records(raw_records)[source]
strax.processing.pulse_processing.record_length_from_dtype(dtype)[source]

Return (prev_r, next_r), each an array of indices of the previous/next record in the same pulse, or -1 if this is not applicable.

strax.processing.pulse_processing.zero_out_of_bounds(records)[source]

Set waveforms to zero out of pulse bounds.

strax.processing.statistics module
strax.processing.statistics.highest_density_region(data, fractions_desired, only_upper_part=False, _buffer_size=10)[source]

Computes for a given sampled distribution the highest density region of the desired fractions. Does not assume anything on the normalisation of the data.

Parameters:
  • data – Sampled distribution

  • fractions_desired – numpy.array Area/probability for which the hdr should be computed.

  • _buffer_size – Size of the result buffer. The size is equivalent to the maximal number of allowed intervals.

  • only_upper_part – Boolean, if true only computes area/probability between maximum and current height.

Returns:

Two arrays: the first one stores the start and inclusive end index of the highest density region. The second array holds the amplitude for which the desired fraction was reached.

Note:

Also goes by the name highest posterior density. Please note that the right edge corresponds to the right side of the sample. Hence the corresponding index is -= 1.

Module contents

strax.storage package

Submodules
strax.storage.common module

Base classes for storage backends, frontends, and savers in strax.

Please see the developer documentation for more details on strax’ storage hierarchy.

exception strax.storage.common.DataCorrupted[source]

Bases: Exception

exception strax.storage.common.DataExistsError(at, message='')[source]

Bases: Exception

Raised when attempting to write a piece of data that is already written.

class strax.storage.common.DataKey(run_id, data_type, lineage)[source]

Bases: object

Request for data to a storage registry.

Instances of this class uniquely identify a single piece of strax data abstractly – that is, it describes the full history of algorithms that have to be run to reproduce it.

It is used for communication between the main Context class and storage frontends.

data_type: str
lineage: dict
property lineage_hash

Deterministic hash of the lineage.

run_id: str
exception strax.storage.common.DataNotAvailable[source]

Bases: Exception

Raised when requested data is not available.

exception strax.storage.common.EmptyDataWarning[source]

Bases: UserWarning

exception strax.storage.common.RunMetadataNotAvailable[source]

Bases: Exception

class strax.storage.common.Saver(metadata, saver_timeout=300)[source]

Bases: object

Interface for saving a data type.

Must work even if forked. Do NOT add unpickleable things as attributes (such as loggers)!

allow_fork = True
allow_rechunk = True
close(wait_for: list | tuple = ())[source]
closed = False
got_exception = None
is_forked = False
save(chunk: Chunk, chunk_i: int, executor=None)[source]

Save a chunk, returning future to wait on or None.

save_from(source: Generator, rechunk=True, executor=None)[source]

Iterate over source and save the results under key along with metadata.

class strax.storage.common.StorageBackend[source]

Bases: object

Storage backend for strax data.

This is a ‘dumb’ interface to data. Each bit of data stored is described by backend-specific keys (e.g. directory names). Finding and assigning backend keys is the responsibility of the StorageFrontend.

The backend class name + backend_key must together uniquely identify a piece of data. So don’t make __init__ take options like ‘path’ or ‘host’, these have to be hardcoded (or made part of the key).

get_metadata(backend_key: DataKey | str, **kwargs) dict[source]

Get the metadata using the backend_key and the backend-specific _get_metadata method. When an unforeseen error occurs, raises a strax.DataCorrupted error. Any kwargs are passed on to _get_metadata.

Parameters:

backend_key – The key the backend should look for (can be string or strax.DataKey)

Returns:

metadata for the data associated to the requested backend-key

Raises:
  • strax.DataCorrupted – This backend is not able to read the metadata but it should exist

  • strax.DataNotAvailable – When there is no data associated with this backend-key

loader(backend_key, time_range=None, chunk_number=None, executor=None)[source]

Iterates over strax data in backend_key.

Parameters:
  • time_range – 2-length arraylike of (start,exclusive end) of desired data. Will return all data that partially overlaps with the range. Default is None, which means get the entire run.

  • chunk_number – Chunk number to load exclusively

  • executor – Executor to push load/decompress operations to

saver(key, metadata, **kwargs)[source]

Return saver for data described by key.

class strax.storage.common.StorageFrontend(readonly=False, provide_run_metadata=None, overwrite='if_broken', take_only=(), exclude=())[source]

Bases: object

Interface to something that knows data-locations and run-level metadata.

For example, a runs database, or a data directory on the file system.

backends: list
can_define_runs = False
define_run(name, sub_run_spec, **metadata)[source]
find(key: DataKey, write=False, check_broken=True, allow_incomplete=False, fuzzy_for=(), fuzzy_for_options=())[source]

Return (str: backend class name, backend-specific) key to get at / write data, or raise exception.

Parameters:
  • key – DataKey of data to load {data_type: (plugin_name, version, {config_option: value, …}, …}

  • write – Set to True if writing new data. The data is immediately registered, so you must follow up on the write!

  • check_broken – If True, raise DataNotAvailable if data has not been completely written, or writing terminated with an exception.

find_several(keys, **kwargs)[source]

Return list with backend keys or False for several data keys.

Options are as for find()

get_metadata(key, allow_incomplete=False, fuzzy_for=(), fuzzy_for_options=())[source]

Retrieve data-level metadata for the specified key.

Other parameters are the same as for .find

loader(key: DataKey, time_range=None, allow_incomplete=False, fuzzy_for=(), fuzzy_for_options=(), chunk_number=None, executor=None)[source]

Return loader for data described by DataKey.

Parameters:
  • key – DataKey describing data

  • time_range – 2-length arraylike of (start, exclusive end) of row numbers to get. Default is None, which means get the entire run.

  • allow_incomplete – Allow loading of data which has not been completely written to disk yet.

  • fuzzy_for – list/tuple of plugin names for which no plugin name, version, or option check is performed.

  • fuzzy_for_options – list/tuple of configuration options for which no check is performed.

  • chunk_number – Chunk number to load exclusively.

  • executor – Executor for pushing load computation to

provide_run_metadata = False
provide_superruns = False
remove(key)[source]

Removes a registration.

Does not delete any actual data

run_metadata(run_id, projection=None)[source]

Return run metadata dictionary, or raise RunMetadataNotAvailable.

saver(key, metadata, **kwargs)[source]

Return saver for data described by DataKey.

storage_type = 1
write_run_metadata(run_id, metadata)[source]

Stores metadata for run_id.

Silently overwrites any previously stored run-level metadata.

class strax.storage.common.StorageType(value)[source]

Bases: IntEnum

Class attribute of how far/close data is when fetched from a given storage frontend.

This is used to prioritize which frontend will be asked first for data (prevents loading data from slow frontends when fast frontends might also have the data)

COMPRESSED = 3
LOCAL = 1
MEMORY = 0
ONLINE = 2
REMOTE = 4
TAPE = 10
strax.storage.files module
class strax.storage.files.DataDirectory(path='.', *args, deep_scan=False, **kwargs)[source]

Bases: StorageFrontend

Simplest registry: single directory with FileStore data sitting in subdirectories.

Run-level metadata is stored in loose json files in the directory.

backend_key(dirname)[source]
can_define_runs = True
provide_run_metadata = False
provide_superruns = True
static raise_if_non_compatible_run_id(run_id)[source]
remove(key)[source]

Removes a registration.

Does not delete any actual data

run_metadata(run_id, projection=None)[source]

Return run metadata dictionary, or raise RunMetadataNotAvailable.

write_run_metadata(run_id, metadata)[source]

Stores metadata for run_id.

Silently overwrites any previously stored run-level metadata.

class strax.storage.files.FileSaver(dirname, metadata, **kwargs)[source]

Bases: Saver

Saves data to compressed binary files.

json_options = {'indent': 4, 'sort_keys': True}
class strax.storage.files.FileSytemBackend(*args, set_target_chunk_mb: int | None = None, **kwargs)[source]

Bases: StorageBackend

Store data locally in a directory of binary files.

Files are named after the chunk number (without extension). Metadata is stored in a file called metadata.json.

exception strax.storage.files.InvalidFolderNameFormat[source]

Bases: Exception

strax.storage.files.dirname_to_prefix(dirname)[source]

Return filename prefix from dirname.

strax.storage.mongo module

I/O format for MongoDB.

This plugin is designed with data monitoring in mind, to put smaller amounts of extracted data into a database for quick access. However it should work with any plugin.

Note that there is no check to make sure the 16MB document size limit is respected!

class strax.storage.mongo.MongoBackend(uri: str, database: str, col_name: str)[source]

Bases: StorageBackend

Mongo storage backend.

class strax.storage.mongo.MongoFrontend(uri: str, database: str, col_name: str, *args, **kwargs)[source]

Bases: StorageFrontend

MongoDB storage frontend.

property collection
storage_type = 2
class strax.storage.mongo.MongoSaver(key: str, metadata: dict, col: Collection, **kwargs)[source]

Bases: Saver

allow_rechunk = False
strax.storage.zipfiles module
class strax.storage.zipfiles.ZipDirectory(path='.', *args, readonly=True, **kwargs)[source]

Bases: StorageFrontend

ZipFile-based storage frontend for strax.

All data for one run is assumed to be in a single zip file <run_id>.zip, with the same file/directory structure as created by FileStore.

We cannot write zip files directly (this would result in concurrency hell), instead these zip files are made by zipping stuff from FileSytemBackend.

remove(key)[source]

Removes a registration.

Does not delete any actual data

run_metadata(run_id)[source]

Return run metadata dictionary, or raise RunMetadataNotAvailable.

storage_typ = 3
write_run_metadata(run_id, metadata)[source]

Stores metadata for run_id.

Silently overwrites any previously stored run-level metadata.

static zip_dir(input_dir, output_zipfile, delete=False)[source]

Zips subdirectories of input_dir to output_zipfile (without compression).

Travels into subdirectories, but not sub-subdirectories. Skips any other files in the directory.

Parameters:
  • delete – If True, delete original directories

class strax.storage.zipfiles.ZipFileBackend[source]

Bases: StorageBackend

saver(*args, **kwargs)[source]

Return saver for data described by key.

Module contents

Submodules

strax.chunk module

exception strax.chunk.CannotSplit[source]

Bases: Exception

class strax.chunk.Chunk(*, data_type, data_kind, dtype, run_id, start, end, data, subruns=None, target_size_mb=200)[source]

Bases: object

Single chunk of strax data of one data type.

classmethod concatenate(chunks, allow_hyperrun=False)[source]

Create chunk by concatenating chunks of the same data type. You can pass None’s; they will be ignored.

data: ndarray
data_kind: str
data_type: str
dtype: dtype
property duration
end: int
property first_subrun
property is_superrun
property last_subrun
classmethod merge(chunks, data_type='<UNKNOWN>')[source]

Create chunk by merging columns of chunks of same data kind.

Parameters:
  • chunks – Chunks to merge. None is allowed and will be ignored.

  • data_type – data_type name of new created chunk. Set to <UNKNOWN> if not provided.

property nbytes
run_id: str
split(t: int | None, allow_early_split=False)[source]

Return (chunk_left, chunk_right) split at time t.

Parameters:
  • t – Time at which to split the data. All data in the left chunk will have their (exclusive) end <= t, all data in the right chunk will have (inclusive) start >=t.

  • allow_early_split – If False, raise CannotSplit if the requirements above cannot be met. If True, split at the closest possible time before t.

start: int
subruns: dict
target_size_mb: int
strax.chunk.continuity_check(chunk_iter)[source]

Check continuity of chunks yielded by chunk_iter as they are yielded.

strax.chunk.split_array(data, t, allow_early_split=False)[source]

Return (data left of t, data right of t, t), or raise CannotSplit if that would split a data element in two.

Parameters:
  • data – strax numpy data

  • t – Time to split data

  • allow_early_split – Instead of raising CannotSplit, split at t_split as close as possible before t where a split can happen. The new split time replaces t in the return value.
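A small sketch of the return value; the printed lengths are the expected result for these inputs.

    import numpy as np
    import strax

    data = np.zeros(3, dtype=strax.time_fields)
    data['time'] = [0, 10, 20]
    data['endtime'] = [5, 14, 25]

    # Everything ending at or before t=15 goes left, the rest goes right
    left, right, t = strax.split_array(data, 15)
    print(len(left), len(right), t)  # 2 1 15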

strax.chunk.transform_chunk_to_superrun_chunk(superrun_id, chunk)[source]

Function which transforms/creates a new superrun chunk from a subrun chunk.

Parameters:
  • superrun_id – id/name of the superrun.

  • chunk – strax.Chunk of a superrun subrun.

Returns:

strax.Chunk

strax.config module

class strax.config.Config(**kwargs)[source]

Bases: Option

An alternative to the takes_config class decorator which uses the descriptor protocol to return the config value when the attribute is accessed from within a plugin.

fetch(plugin)[source]

This function is called when the attribute is being accessed.

Should be overridden by subclasses to customize behavior.

exception strax.config.InvalidConfiguration[source]

Bases: Exception

class strax.config.Option(name: str, type: str | type | tuple | list = '<OMITTED>', default: Any = '<OMITTED>', default_factory: str | Callable = '<OMITTED>', default_by_run='<OMITTED>', child_option: bool = False, parent_option_name: str | None = None, track: bool = True, infer_type=False, help: str = '')[source]

Bases: object

Configuration option taken by a strax plugin.

get_default(run_id, run_defaults: dict | None = None)[source]

Return default value for the option.

taken_by: str
validate(config, run_id=None, run_defaults=None, set_defaults=True)[source]

Checks if the option is in config and sets defaults if needed.

strax.config.combine_configs(old_config, new_config=None, mode='update')[source]
strax.config.takes_config(*options)[source]

Decorator for plugin classes, to specify which options it takes.

Parameters:

options – Option instances of options this plugin takes.
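A sketch of both configuration styles on a made-up plugin; the option name, default, and data types are illustrative only.

    import strax

    # Decorator style with Option: values arrive via self.config
    @strax.takes_config(
        strax.Option('gap_threshold', default=300, type=int,
                     help='Minimum gap (ns) between peaks'),
    )
    class MyPeaks(strax.Plugin):
        depends_on = ('records',)
        provides = 'peaks'
        dtype = strax.time_fields

        def compute(self, records):
            gap = self.config['gap_threshold']
            ...

    # Descriptor style with Config: the value is read as a plugin attribute
    class MyOtherPeaks(strax.Plugin):
        depends_on = ('records',)
        provides = 'peaks'
        dtype = strax.time_fields

        gap_threshold = strax.Config(default=300, type=int,
                                     help='Minimum gap (ns) between peaks')

        def compute(self, records):
            gap = self.gap_threshold
            ...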

strax.context module

class strax.context.Context(storage=None, config=None, register=None, register_all=None, **kwargs)[source]

Bases: object

Context for strax analysis.

A context holds info on HOW to process data, such as which plugins provide what data types, where to store which results, and configuration options for the plugins.

You start all strax processing through a context.

accumulate(run_id: str, targets: Tuple[str] | List[str], fields=None, function=None, store_first_for_others=True, function_takes_fields=False, **kwargs)[source]

Return a dictionary with the sum of the result of get_array.

Parameters:
  • function

    Apply this function to the array before summing the results. Will be called as function(array), where array is a chunk of the get_array result. Should return either:

    • A scalar or 1d array -> accumulated result saved under ‘result’

    • A record array or dict -> fields accumulated individually

    • None -> nothing accumulated

    If not provided, the identity function is used.

    NB: Additionally and independently, if there are any functions registered under context_config[‘apply_data_function’] these are applied first directly after loading the data.

  • fields – Fields of the function output to accumulate. If not provided, all output fields will be accumulated.

  • store_first_for_others – if True (default), for fields included in the data but not fields, store the first value seen in the data (if any value is seen).

  • function_takes_fields – If True, function will be called as function(data, fields) instead of function(data).

All other options are as for get_iter.

Return dictionary:

Dictionary with the accumulated result; see function and store_first_for_others arguments. Four fields are always added:

start: start time of the first processed chunk
end: end time of the last processed chunk
n_chunks: number of chunks in run
n_rows: number of data entries in run
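For instance, to sum a single field over a run without holding the full array in memory (a sketch using the Records test plugin from strax.testutils; any context and target with an ‘area’ field would work the same way):

    import numpy as np
    import strax
    from strax.testutils import Records

    st = strax.Context(register=[Records], storage=[])

    result = st.accumulate(
        'some_run', 'records',
        function=lambda records: {'total_area': np.sum(records['area'])},
    )
    print(result['total_area'], result['n_chunks'])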

classmethod add_method(f)[source]

Add f as a new Context method.

available_for_run(run_id: str, include_targets: None | list | tuple | str = None, exclude_targets: None | list | tuple | str = None, pattern_type: str = 'fnmatch') DataFrame

For a given single run, check all the targets if they are stored. Excludes the target if never stored anyway.

Parameters:
  • run_id – requested run

  • include_targets – targets to include e.g. raw_records, raw_records* or *_nv. If multiple targets (e.g. a list) is provided, the target should match any of the arguments!

  • exclude_targets – targets to exclude e.g. raw_records, raw_records* or *_nv. If multiple targets (e.g. a list) is provided, the target should match none of the arguments!

  • pattern_type – either ‘fnmatch’ (Unix filename pattern matching) or ‘re’ (Regular expression operations).

Returns:

Table of available data per target

check_hyperrun()[source]

Raise if non-hyperrun plugins depend on hyperrun plugins.

compare_metadata(data1, data2, return_results=False)[source]

Compare the metadata between two strax data.

Parameters:
  • data1, data2 – either a list/tuple of a run_id + target pair, a path to metadata to compare, or a dictionary of the metadata

  • return_results – bool; if True, returns a dictionary with the metadata and lineages that are found for the inputs and does not do the comparison

Example usage:

    context.compare_metadata(("053877", "peak_basics"), "./my_path_to/JSONfile.json")
    first_metadata = context.get_metadata(run_id, "events")
    context.compare_metadata(("053877", "peak_basics"), first_metadata)
    context.compare_metadata(("053877", "records"), ("053899", "records"))
    results_dict = context.compare_metadata(
        ("053877", "peak_basics"), ("053877", "events_info"), return_results=True)

config: dict
context_config: dict
copy_to_frontend(run_id: str, target: str, target_frontend_id: int | None = None, target_compressor: str | None = None, rechunk: bool = False, rechunk_to_mb: int = 200)[source]

Copy data from one frontend to another.

Parameters:
  • run_id – run_id

  • target – target datakind

  • target_frontend_id – index of the frontend that the data should go to in context.storage. If no index is specified, try all.

  • target_compressor – if specified, recompress with this compressor.

  • rechunk – allow re-chunking for saving

  • rechunk_to_mb – rechunk to specified target size. Only works if rechunk is True.

data_info(data_name: str) DataFrame[source]

Return pandas DataFrame describing fields in data_name.

define_run(name: str, data: ndarray | DataFrame | dict | list | tuple, from_run: str | None = None)

Function for defining new superruns from a list of run_ids.

Note:

The function also allows creating a superrun from data (numpy.arrays/pandas.DataFrames). However, this is currently not supported from the data loading side.

Parameters:
  • name – Name/run_id of the superrun. Superrun names must start with an underscore.

  • data – Data from which the superrun should be created. Can be either one of the following: a tuple/list of run_ids or a numpy.array/pandas.DataFrame containing some data.

  • from_run – List of run_ids which were used to create the numpy.array/pandas.DataFrame passed in data.

deregister_plugins_with_missing_dependencies()[source]

Deregister plugins in case a data_type the plugin depends on is not provided by any other plugin.

estimate_run_start_and_end(run_id, targets=None)[source]

Return run start and end time in ns since epoch.

This fetches from run metadata, and if this fails, it estimates it using data metadata from the targets or the underlying data-types (if it is stored).

get_array(run_id: str | tuple | list, targets, save=(), max_workers=None, **kwargs) ndarray[source]

Compute target for run_id and return as numpy array.

Parameters:
  • run_id – run id to get

  • targets – list/tuple of strings of data type names to get

  • ignore_errors – Return the data for the runs that successfully loaded, even if some runs failed executing.

  • save – extra data types you would like to save to cache, if they occur in intermediate computations. Many plugins save automatically anyway.

  • max_workers – Number of worker threads/processes to spawn. In practice more CPUs may be used due to strax’s multithreading.

  • allow_multiple – Allow multiple targets to be computed simultaneously without merging the results of the target. This can be used when mass producing plugins that are not of the same datakind. Don’t try to use this in get_array or get_df because the data is not returned.

  • add_run_id_field – Boolean whether to add a run_id field in case of multi-runs.

  • run_id_as_bytes – Boolean if true uses byte string instead of an unicode string added to a multi-run array. This can save a lot of memory when loading many runs.

  • selection – Query string, sequence of strings, or simple function to apply. The function must take a single argument which represents the structured numpy array of the loaded data.

  • selection_str – Same as selection (deprecated)

  • keep_columns – Array field/dataframe column names to keep. Useful to reduce amount of data in memory. (You can only specify either keep or drop column.)

  • drop_columns – Array field/dataframe column names to drop. Useful to reduce amount of data in memory. (You can only specify either keep or drop column.)

  • time_range – (start, stop) range to load, in ns since the epoch

  • seconds_range – (start, stop) range of seconds since the start of the run to load.

  • time_within – row of strax data (e.g. event) to use as time range

  • time_selection – Kind of time selection to apply: - fully_contained: (default) select things fully contained in the range - touching: select things that (partially) overlap with the range - skip: Do not select a time range, even if other arguments say so

  • _chunk_number – For internal use: return data from one chunk.

  • progress_bar – Display a progress bar if metadata exists.

  • multi_run_progress_bar – Display a progress bar for loading multiple runs

get_components(run_id: str, targets=(), save=(), time_range=None, chunk_number=None, multi_run_progress_bar=False) ProcessorComponents[source]

Return components for setting up a processor.

Parameters:
  • run_id – run id to get

  • targets – list/tuple of strings of data type names to get

  • ignore_errors – Return the data for the runs that successfully loaded, even if some runs failed executing.

  • save – extra data types you would like to save to cache, if they occur in intermediate computations. Many plugins save automatically anyway.

  • max_workers – Number of worker threads/processes to spawn. In practice more CPUs may be used due to strax’s multithreading.

  • allow_multiple – Allow multiple targets to be computed simultaneously without merging the results of the target. This can be used when mass producing plugins that are not of the same datakind. Don’t try to use this in get_array or get_df because the data is not returned.

  • add_run_id_field – Boolean whether to add a run_id field in case of multi-runs.

  • run_id_as_bytes – Boolean if true uses byte string instead of an unicode string added to a multi-run array. This can save a lot of memory when loading many runs.

  • selection – Query string, sequence of strings, or simple function to apply. The function must take a single argument which represents the structured numpy array of the loaded data.

  • selection_str – Same as selection (deprecated)

  • keep_columns – Array field/dataframe column names to keep. Useful to reduce amount of data in memory. (You can only specify either keep or drop column.)

  • drop_columns – Array field/dataframe column names to drop. Useful to reduce amount of data in memory. (You can only specify either keep or drop column.)

  • time_range – (start, stop) range to load, in ns since the epoch

  • seconds_range – (start, stop) range of seconds since the start of the run to load.

  • time_within – row of strax data (e.g. event) to use as time range

  • time_selection – Kind of time selection to apply: - fully_contained: (default) select things fully contained in the range - touching: select things that (partially) overlap with the range - skip: Do not select a time range, even if other arguments say so

  • _chunk_number – For internal use: return data from one chunk.

  • progress_bar – Display a progress bar if metadata exists.

  • multi_run_progress_bar – Display a progress bar for loading multiple runs

get_data_kinds() Tuple[source]

Return two dictionaries: 1. one with all available data_kind as key and their data_types(list) as values 2. one with all available data_type as key and their data_kind(str) as values

get_df(run_id: str | tuple | list, targets, save=(), max_workers=None, **kwargs) DataFrame[source]

Compute target for run_id and return as pandas DataFrame.

Parameters:
  • run_id – run id to get

  • targets – list/tuple of strings of data type names to get

  • ignore_errors – Return the data for the runs that successfully loaded, even if some runs failed executing.

  • save – extra data types you would like to save to cache, if they occur in intermediate computations. Many plugins save automatically anyway.

  • max_workers – Number of worker threads/processes to spawn. In practice more CPUs may be used due to strax’s multithreading.

  • allow_multiple – Allow multiple targets to be computed simultaneously without merging the results of the target. This can be used when mass producing plugins that are not of the same datakind. Don’t try to use this in get_array or get_df because the data is not returned.

  • add_run_id_field – Boolean whether to add a run_id field in case of multi-runs.

  • run_id_as_bytes – Boolean if true uses byte string instead of an unicode string added to a multi-run array. This can save a lot of memory when loading many runs.

  • selection – Query string, sequence of strings, or simple function to apply. The function must take a single argument which represents the structured numpy array of the loaded data.

  • selection_str – Same as selection (deprecated)

  • keep_columns – Array field/dataframe column names to keep. Useful to reduce amount of data in memory. (You can only specify either keep or drop column.)

  • drop_columns – Array field/dataframe column names to drop. Useful to reduce amount of data in memory. (You can only specify either keep or drop column.)

  • time_range – (start, stop) range to load, in ns since the epoch

  • seconds_range – (start, stop) range of seconds since the start of the run to load.

  • time_within – row of strax data (e.g. event) to use as time range

  • time_selection – Kind of time selection to apply: - fully_contained: (default) select things fully contained in the range - touching: select things that (partially) overlap with the range - skip: Do not select a time range, even if other arguments say so

  • _chunk_number – For internal use: return data from one chunk.

  • progress_bar – Display a progress bar if metadata exists.

  • multi_run_progress_bar – Display a progress bar for loading multiple runs

get_iter(run_id: str, targets: Tuple[str] | List[str], save=(), max_workers=None, time_range=None, seconds_range=None, time_within=None, time_selection='fully_contained', selection=None, selection_str=None, keep_columns=None, drop_columns=None, allow_multiple=False, progress_bar=True, multi_run_progress_bar=True, _chunk_number=None, **kwargs) Iterator[Chunk][source]

Compute target for run_id and iterate over results.

Do NOT interrupt the iterator (i.e. break): it will keep running stuff in background threads…

Parameters:
  • run_id – run id to get

  • targets – list/tuple of strings of data type names to get

  • ignore_errors – Return the data for the runs that successfully loaded, even if some runs failed executing.

  • save – extra data types you would like to save to cache, if they occur in intermediate computations. Many plugins save automatically anyway.

  • max_workers – Number of worker threads/processes to spawn. In practice more CPUs may be used due to strax’s multithreading.

  • allow_multiple – Allow multiple targets to be computed simultaneously without merging the results of the target. This can be used when mass producing plugins that are not of the same datakind. Don’t try to use this in get_array or get_df because the data is not returned.

  • add_run_id_field – Boolean whether to add a run_id field in case of multi-runs.

  • run_id_as_bytes – Boolean; if True, use a byte string instead of a unicode string for the run_id field added to a multi-run array. This can save a lot of memory when loading many runs.

  • selection – Query string, sequence of strings, or simple function to apply. The function must take a single argument which represents the structured numpy array of the loaded data.

  • selection_str – Same as selection (deprecated)

  • keep_columns – Array field/dataframe column names to keep. Useful to reduce amount of data in memory. (You can only specify either keep_columns or drop_columns.)

  • drop_columns – Array field/dataframe column names to drop. Useful to reduce amount of data in memory. (You can only specify either keep_columns or drop_columns.)

  • time_range – (start, stop) range to load, in ns since the epoch

  • seconds_range – (start, stop) range of seconds since the start of the run to load.

  • time_within – row of strax data (e.g. event) to use as time range

  • time_selection – Kind of time selection to apply: - fully_contained: (default) select things fully contained in the range - touching: select things that (partially) overlap with the range - skip: Do not select a time range, even if other arguments say so

  • _chunk_number – For internal use: return data from one chunk.

  • progress_bar – Display a progress bar if metadata exists.

  • multi_run_progress_bar – Display a progress bar for loading multiple runs
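
As a minimal sketch (same toy context as above), iterating over the chunks of a data type and inspecting each strax.Chunk as it arrives:

>>> for chunk in context.get_iter(run_id='some_run', targets='records'):
...     print(chunk.start, chunk.end, len(chunk.data))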

get_meta(run_id, target) dict[source]

Return metadata for target for run_id, or raise DataNotAvailable if data is not yet available.

Parameters:
  • run_id – run id to get

  • target – data type to get

get_metadata(run_id, target) dict

Return metadata for target for run_id, or raise DataNotAvailable if data is not yet available.

Parameters:
  • run_id – run id to get

  • target – data type to get

get_save_when(target: str) SaveWhen | int[source]

For a given target, get the plugin's save_when attribute, which is either a dict or a number.

get_single_plugin(run_id, data_name)[source]

Return a single fully initialized plugin that produces data_name for run_id.

For use in custom processing.

get_source(run_id: str, target: str, check_forbidden: bool = True) set | None[source]

For a given run_id and target, get the stored bases from which we can start processing. If no base is available, return None.

Parameters:
  • run_id – run_id

  • target – target

  • check_forbidden – Check that we are not requesting to make a plugin that is forbidden by the context to be created.

Returns:

set of plugin names from which processing can start and which are needed to build this target.

get_source_sf(run_id, target, should_exist=False)[source]

Get the source storage frontends for a given run_id and target.

Parameters:
  • run_id – run id

  • target – target data type

  • should_exist – Raise a ValueError if we cannot find one (e.g. we already checked the data is stored)

Returns:

list of strax.StorageFrontend (when should_exist is False)

get_zarr(run_ids, targets, storage='./strax_temp_data', progress_bar=False, overwrite=True, **kwargs)[source]

Get persistent arrays using zarr. This is useful when loading large amounts of data that cannot fit in memory; zarr is very compatible with dask. Targets are loaded into separate arrays and runs are merged. The data is added to any existing data in the storage location.

Parameters:
  • run_ids – (Iterable) Run ids you wish to load.

  • targets – (Iterable) targets to load.

  • storage – (str, optional) fsspec path to store array. Defaults to ‘./strax_temp_data’.

  • overwrite – (boolean, optional) whether to overwrite existing arrays for targets at given path.

Return zarr.Group:

zarr group containing the persistent arrays available at the storage location after loading the requested data. The runs loaded into a given array can be seen in the array's .attrs['RUNS'] field.
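
A minimal sketch (the run id, target and storage path are placeholders; this requires the optional zarr dependency, and assumes each target appears as an array under its own name in the returned group):

>>> group = context.get_zarr(run_ids=['some_run'], targets=['records'],
...                          storage='./strax_temp_data')
>>> group['records'].attrs['RUNS']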

static hyperrun_id(run_id)[source]
property inverse_tree

Inverse tree whose key is depends_on and value is provides.

is_stored(run_id, target, detailed=False, **kwargs)[source]

Return whether data type target has been saved for run_id through any of the registered storage frontends.

Note that even if False is returned, the data type may still be made with a trivial computation.

key_for(run_id, target)[source]

Get the DataKey for a given run and a given target plugin. The DataKey is inferred from the plugin lineage. The lineage can come either from the _fixed_plugin_cache or computed on the fly.

Parameters:
  • run_id – run id to get

  • target – data type to get

Returns:

strax.DataKey of the target

keys_for_runs(target: str, run_ids: ndarray | list | tuple | str) List[DataKey]

Get the data-keys for a multitude of runs. If use_per_run_defaults is False, which it preferably is (#246), getting many keys should be fast as we only compute the lineage once.

Parameters:
  • run_ids – Runs to get datakeys for

  • target – datatype requested

Returns:

list of datakeys of the target for the given runs.

lineage(run_id, data_type)[source]

Return lineage dictionary for data_type and run_id, based on the options in this context.

list_available(target, runs=None, **kwargs) list

Return sorted list of run_id’s for which target is available.

Parameters:
  • target – Data type to check

  • runs – Runs to check. If None, check all runs.

make(run_id: str | tuple | list, targets, save=(), max_workers=None, _skip_if_built=True, **kwargs) None[source]

Compute target for run_id. Returns nothing (None).

Parameters:
  • run_id – run id to get

  • targets – list/tuple of strings of data type names to get

  • ignore_errors – Return the data for the runs that successfully loaded, even if some runs failed executing.

  • save – extra data types you would like to save to cache, if they occur in intermediate computations. Many plugins save automatically anyway.

  • max_workers – Number of worker threads/processes to spawn. In practice more CPUs may be used due to strax’s multithreading.

  • allow_multiple – Allow multiple targets to be computed simultaneously without merging the results of the target. This can be used when mass producing plugins that are not of the same datakind. Don’t try to use this in get_array or get_df because the data is not returned.

  • add_run_id_field – Boolean whether to add a run_id field in case of multi-runs.

  • run_id_as_bytes – Boolean; if True, use a byte string instead of a unicode string for the run_id field added to a multi-run array. This can save a lot of memory when loading many runs.

  • selection – Query string, sequence of strings, or simple function to apply. The function must take a single argument which represents the structured numpy array of the loaded data.

  • selection_str – Same as selection (deprecated)

  • keep_columns – Array field/dataframe column names to keep. Useful to reduce amount of data in memory. (You can only specify either keep_columns or drop_columns.)

  • drop_columns – Array field/dataframe column names to drop. Useful to reduce amount of data in memory. (You can only specify either keep_columns or drop_columns.)

  • time_range – (start, stop) range to load, in ns since the epoch

  • seconds_range – (start, stop) range of seconds since the start of the run to load.

  • time_within – row of strax data (e.g. event) to use as time range

  • time_selection – Kind of time selection to apply: - fully_contained: (default) select things fully contained in the range - touching: select things that (partially) overlap with the range - skip: Do not select a time range, even if other arguments say so

  • _chunk_number – For internal use: return data from one chunk.

  • progress_bar – Display a progress bar if metadata exists.

  • multi_run_progress_bar – Display a progress bar for loading multiple runs
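
A minimal sketch: make computes (and, assuming a writable storage frontend is registered, stores) the target without returning it, after which is_stored should report it as available:

>>> context.make(run_id='some_run', targets='records')
>>> context.is_stored('some_run', 'records')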

new_context(storage=(), config=None, register=None, register_all=None, replace=False, **kwargs)[source]

Return a new context with new settings added to those in this context.

Parameters:

replace – If True, replaces settings rather than adding them. See Context.__init__ for documentation on other parameters.
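
For instance, a small sketch of deriving contexts from an existing one (my_option is a hypothetical config name):

>>> st2 = context.new_context(config=dict(my_option=42))                # add to current settings
>>> st3 = context.new_context(config=dict(my_option=42), replace=True)  # start from these settings only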

provided_dtypes(runid='0')[source]

Summarize dtype information provided by this context.

Returns:

dictionary of provided dtypes with their corresponding lineage hash, save_when, version

purge_unused_configs()[source]

Purge unused configs from the context.

register(plugin_class)[source]

Register plugin_class as provider for data types in provides.

Parameters:

plugin_class – class inheriting from strax.Plugin. You can also pass a sequence of plugins to register, but then you must omit the provides argument. If a plugin class omits the .provides attribute, we will construct one from its class name (CamelCase -> snake_case). Returns plugin_class (so this can be used as a decorator).
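
A minimal sketch of the different ways to register (MyPlugin, PluginA, PluginB and MyOtherPlugin are hypothetical strax.Plugin subclasses):

>>> context.register(MyPlugin)            # single plugin class
>>> context.register([PluginA, PluginB])  # sequence of plugin classes
>>> @context.register                     # register returns the class, so it works as a decorator
... class MyOtherPlugin(strax.Plugin):
...     """A hypothetical plugin."""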

register_all(module)[source]

Register all plugins defined in module.

Can pass a list/tuple of modules to register all in each.

register_cut_list(cut_list)[source]

Register cut lists to strax context.

Parameters:

cut_list – cut lists to be registered. Can be a cut list object or a list/tuple of cut list objects.

property root_data_types

Root data_types, i.e. data types that do not depend on anything.

run_defaults(run_id)[source]

Get configuration defaults from the run metadata (if these exist)

This will only call the rundb once for each run while the context is in existence; further calls to this will return a cached value.

run_metadata(run_id, projection=None) dict[source]

Return run-level metadata for run_id, or raise DataNotAvailable if this is not available.

Parameters:
  • run_id – run id to get

  • projection – Selection of fields to get, following MongoDB syntax. May not be supported by frontend.

runs: DataFrame | None = None
scan_runs(check_available=(), if_check_available='raise', store_fields=()) DataFrame

Update and return self.runs with runs currently available in all storage frontends.

Parameters:
  • check_available – Check whether these data types are available. Availability of xxx is stored as a boolean in the xxx_available column.

  • if_check_available – ‘raise’ (default) or ‘skip’, whether to do the check

  • store_fields – Additional fields from run doc to include as rows in the dataframe. The context options scan_availability and store_run_fields list data types and run fields, respectively, that will always be scanned.

search_field(pattern: str, include_code_usage: bool = True, return_matches: bool = False)[source]

Find and print which plugin(s) provides a field that matches pattern (fnmatch).

Parameters:
  • pattern – pattern to match, e.g. ‘time’ or ‘tim*’

  • include_code_usage – Also include the code occurrences of the fields that match the pattern.

  • return_matches – If set, return a dictionary with the matching fields and the occurrences in code.

Returns:

When return_matches is set, a dictionary with the matching fields and the occurrences in code. Otherwise, nothing is returned and the results are only printed.

search_field_usage(search_string: str, plugin: Plugin | List[Plugin] | None = None) List[str][source]

Find and return which plugin(s) use a given field.

Parameters:
  • search_string – a field name to match exactly

  • plugin – plugin(s) in which to look for the field

Returns:

list of code occurrences in the form of PLUGIN.FUNCTION

select_runs(run_mode=None, run_id=None, include_tags=None, exclude_tags=None, available=(), pattern_type='fnmatch', ignore_underscore=True, force_reload=False)

Return pandas.DataFrame with basic info from runs that match selection criteria.

Parameters:
  • run_mode – Pattern to match run modes (reader.ini.name)

  • run_id – Pattern to match a run_id or run_ids

  • available – str or tuple of strs of data types for which data must be available according to the runs DB.

  • include_tags – String or list of strings of patterns for required tags

  • exclude_tags – String / list of strings of patterns for forbidden tags. Exclusion criteria have higher priority than inclusion criteria.

  • pattern_type – Type of pattern matching to use. Defaults to ‘fnmatch’, which means you can use unix shell-style wildcards (?, *). The alternative is ‘re’, which means you can use full python regular expressions.

  • ignore_underscore – Ignore the underscore at the start of tags (indicating some degree of officialness or automation).

  • force_reload – Force reloading of runs from storage. Otherwise, runs are cached after the first time they are loaded in self.runs.

Examples:
  • run_selection(include_tags=’blinded’)

    select all datasets with a blinded or _blinded tag.

  • run_selection(include_tags=’*blinded’)

    … with blinded or _blinded, unblinded, blablinded, etc.

  • run_selection(include_tags=[‘blinded’, ‘unblinded’])

    … with blinded OR unblinded, but not blablinded.

  • run_selection(include_tags=’blinded’, exclude_tags=[‘bad’, ‘messy’])

    … select blinded datasets that aren’t bad or messy.

set_config(config=None, mode='update')[source]

Set new configuration options.

Parameters:
  • config – dict of new options

  • mode – can be either - update: Add to or override current options in context - setdefault: Add to current options, but do not override - replace: Erase config, then set only these options
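
For example (my_option is a hypothetical option name):

>>> context.set_config(dict(my_option=10))                     # update (default)
>>> context.set_config(dict(my_option=10), mode='setdefault')  # only if not set yet
>>> context.set_config(dict(my_option=10), mode='replace')     # wipe config first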

set_context_config(context_config=None, mode='update')[source]

Set new context configuration options.

Parameters:
  • context_config – dict of new context configuration options

  • mode – can be either - update: Add to or override current options in context - setdefault: Add to current options, but do not override - replace: Erase config, then set only these options

show_config(data_type=None, pattern='*', run_id='99999999999999999999')[source]

Return configuration options that affect data_type.

Parameters:
  • data_type – Data type name

  • pattern – Show only options that match (fnmatch) pattern

  • run_id – Run id to use for run-dependent config options. If omitted, will show defaults active for new runs.

size_mb(run_id, target)[source]

Return megabytes of memory required to hold data.

storage: List[StorageFrontend]
stored_dependencies(run_id: str, target: str | list | tuple, check_forbidden: bool = True, _targets_stored: dict | None = None) dict | None[source]

For a given run_id and target(s) get a dictionary of all the datatypes that are required to build the requested target.

Parameters:
  • run_id – run_id

  • target – target or a list of targets

  • check_forbidden – Check that we are not requesting to make a plugin that is forbidden by the context to be created.

Returns:

dictionary of data types (keys) required for building the requested target(s) and if they are stored (values)

Raises:

strax.DataNotAvailable – if there is at least one data type that is not stored and has no dependency or if it cannot be created

takes_config = immutabledict({'storage_converter': <strax.config.Option object>, 'fuzzy_for': <strax.config.Option object>, 'fuzzy_for_options': <strax.config.Option object>, 'allow_incomplete': <strax.config.Option object>, 'allow_rechunk': <strax.config.Option object>, 'allow_multiprocess': <strax.config.Option object>, 'allow_shm': <strax.config.Option object>, 'allow_lazy': <strax.config.Option object>, 'forbid_creation_of': <strax.config.Option object>, 'store_run_fields': <strax.config.Option object>, 'check_available': <strax.config.Option object>, 'max_messages': <strax.config.Option object>, 'timeout': <strax.config.Option object>, 'saver_timeout': <strax.config.Option object>, 'use_per_run_defaults': <strax.config.Option object>, 'free_options': <strax.config.Option object>, 'apply_data_function': <strax.config.Option object>, 'write_superruns': <strax.config.Option object>})
to_absolute_time_range(run_id, targets=None, time_range=None, seconds_range=None, time_within=None, full_range=None)[source]

Return (start, stop) time in ns since unix epoch corresponding to time range.

Parameters:
  • run_id – run id to get

  • time_range – (start, stop) time in ns since unix epoch. Will be returned without modification

  • targets – data types. Used only if run metadata is unavailable, so run start time has to be estimated from data.

  • seconds_range – (start, stop) seconds since start of run

  • time_within – row of strax data (e.g. event)

  • full_range – If True returns full time_range of the run.

exception strax.context.OutsideException[source]

Bases: Exception

strax.corrections module

I/O format for corrections using MongoDB.

This module lists, reads and writes, among other functionalities, to a MongoDB where corrections are stored.

class strax.corrections.CorrectionsInterface(client=None, database_name='corrections', host=None, username=None, password=None)[source]

Bases: object

A class to manage corrections that are stored in a MongoDB. Corrections are defined as a pandas.DataFrame with a pandas.DatetimeIndex in UTC; a v1 and an online version must be specified. Online versions are meant for online processing, whereas v1, v2, v3… are meant for offline processing. A global configuration can be set; this means a unique set of correction maps.

static after_date_query(date, limit=1)[source]
static before_date_query(date, limit=1)[source]
static check_timezone(date)[source]

Smart logic to check date is given in UTC time zone.

Raises ValueError if not.

Parameters:

date – date, e.g. datetime(2020, 8, 12, 21, 4, 32, 7, tzinfo=pytz.utc)

Returns:

the inserted date

get_context_config(when, global_config='global', global_version='v1')[source]

Global configuration logic.

Parameters:
  • when – date e.g. datetime(2020, 8, 12, 21, 4, 32, 7, tzinfo=pytz.utc)

  • global_config – a map of corrections

  • global_version – global configuration’s version

Returns:

configuration (type: dict)

interpolate(what, when, how='interpolate', **kwargs)[source]

Interpolate values of a given quantity (‘what’) of a given correction.

For information on interpolation methods see: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.interpolate.html

Parameters:
  • what – what you want to interpolate, i.e. which correction (DataFrame)

  • when – date, e.g. datetime(2020, 8, 12, 21, 4, 32, 7, tzinfo=pytz.utc)

  • how – interpolation method, can be either ‘interpolate’ or ‘fill’

  • kwargs – are forwarded to the interpolation

Returns:

DataFrame of the correction with the interpolated time (‘when’)

list_corrections()[source]

Smart logic to list all corrections available in the corrections database.

read(correction)[source]

Smart logic to read corrections.

Parameters:

correction – pandas.DataFrame object name in the DB (str type).

Returns:

DataFrame as read from the corrections database with time index or None if an empty DataFrame is read from the database

read_at(correction, when, limit=1)[source]

Smart logic to read corrections at a given time (index), i.e. by datetime index.

Parameters:
  • correction – pandas.DataFrame object name in the DB (str type).

  • when – when, datetime to read the corrections, e.g. datetime(2020, 8, 12, 21, 4, 32, 7, tzinfo=pytz.utc)

  • limit – how many indexes after and before when, i.e. limit=1 will return 1 index before and 1 after

Returns:

DataFrame as read from the corrections database with time index or None if an empty DataFrame is read from the database

static sort_by_index(df)[source]

Smart logic to sort dataframe by index using time column.

Returns:

df sorted by index (time)

write(correction, df, required_columns=('ONLINE', 'v1'))[source]

Smart logic to write corrections to the corrections database.

Parameters:
  • correction – corrections name (str type)

  • df – pandas.DataFrame object a DatetimeIndex

  • required_columns – the DataFrame must include these two columns: an ONLINE version and an offline version (e.g. v1)
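
A minimal end-to-end sketch of this interface, assuming a local MongoDB is running; the connection details, correction name and values are placeholders:

from datetime import datetime

import pandas as pd
import pytz
from pymongo import MongoClient

from strax.corrections import CorrectionsInterface

client = MongoClient('mongodb://localhost:27017/')  # hypothetical connection
ci = CorrectionsInterface(client=client, database_name='corrections')

# A correction is a DataFrame with a UTC DatetimeIndex and ONLINE + offline (v1) columns
df = pd.DataFrame(
    {'ONLINE': [1.00, 1.10], 'v1': [1.00, 1.05]},
    index=pd.DatetimeIndex(['2020-08-01', '2020-09-01'], tz=pytz.utc),
)
ci.write('my_correction', df)

# Read back the entries closest to a given (UTC) time
nearby = ci.read_at('my_correction', when=datetime(2020, 8, 12, tzinfo=pytz.utc))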

strax.dtypes module

Fundamental dtypes for use in strax.

Note that if you change the dtype titles (comments), numba will crash if there is an existing numba cache. Clear __pycache__ and restart. TODO: file numba issue.

strax.dtypes.copy_to_buffer(source: ndarray, buffer: ndarray, func_name: str, field_names: Tuple[str] | None = None)[source]

Copy the data from the source to the destination e.g. raw_records to records. To this end, we dynamically create the njitted function with the name ‘func_name’ (should start with “_”).

Parameters:
  • source – array of input

  • buffer – array of buffer to fill with values from input

  • func_name – how to store the dynamically created function. Should start with an _underscore

  • field_names – dtype names to copy (if none, use all in the source)
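
A minimal sketch of copying matching fields between two structured arrays (the field names here are arbitrary examples):

import numpy as np

from strax.dtypes import copy_to_buffer

source = np.zeros(5, dtype=[('time', np.int64), ('length', np.int32)])
buffer = np.zeros(5, dtype=[('time', np.int64), ('length', np.int32), ('area', np.float32)])

# Copies the fields of `source` into the matching fields of `buffer`
# using a dynamically created, numba-compiled function named '_copy_basics'
copy_to_buffer(source, buffer, '_copy_basics')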

strax.dtypes.hitlet_dtype()[source]

Hitlet dtype: same as the peaklet or peak dtype, but for hit-kind objects.

strax.dtypes.hitlet_with_data_dtype(n_samples=2)[source]

Hitlet dtype with data field. Required within the plugins to compute hitlet properties.

Parameters:

n_samples – Buffer length of the data field. Make sure it can hold the longest hitlet.

strax.dtypes.peak_dtype(n_channels=100, n_sum_wv_samples=200, n_widths=11, digitize_top=True, hits_timing=True)[source]

Data type for peaks - ranges across all channels in a detector. Remember to set channel to -1 (todo: make enum).

strax.dtypes.raw_record_dtype(samples_per_record=110)[source]

Data type for a waveform raw_record.

Length can be shorter than the number of samples in data; this indicates a record with zero-padding at the end.

strax.dtypes.record_dtype(samples_per_record=110)[source]

Data type for a waveform record.

Length can be shorter than the number of samples in data; this indicates a record with zero-padding at the end.
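
For example, allocating an empty array of records with this dtype:

>>> import numpy as np
>>> from strax.dtypes import record_dtype
>>> records = np.zeros(10, dtype=record_dtype(samples_per_record=110))
>>> records.dtype.names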

strax.io module

Read/write numpy arrays to/from compressed files or file-like objects.

strax.io.dry_load_files(dirname, chunk_number=None)[source]
strax.io.load_file(f, compressor, dtype)[source]

Read and return data from file.

Parameters:
  • f – file name or handle to read from

  • compressor – compressor to use for decompressing. If not passed, will try to load it from json metadata file.

  • dtype – numpy dtype of data to load

strax.io.save_file(f, data, compressor='zstd')[source]

Save data to file and return number of bytes written.

Parameters:
  • f – file name or handle to save to

  • data – data (numpy array) to save

  • compressor – compressor to use
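
A small round-trip sketch using the two functions above (the file name is a placeholder):

import numpy as np

from strax.dtypes import record_dtype
from strax.io import load_file, save_file

data = np.zeros(5, dtype=record_dtype())
n_bytes = save_file('records_example.bin', data, compressor='zstd')
loaded = load_file('records_example.bin', compressor='zstd', dtype=data.dtype)
assert np.array_equal(loaded, data)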

strax.mailbox module

exception strax.mailbox.InvalidMessageNumber[source]

Bases: MailboxException

exception strax.mailbox.MailBoxAlreadyClosed[source]

Bases: MailboxException

class strax.mailbox.Mailbox(name='mailbox', timeout=None, lazy=False, max_messages=None)[source]

Bases: object

Publish/subscribe mailbox for building complex pipelines out of simple iterators, using multithreading.

A sender can be any iterable. To read from the mailbox, either:
  1. Use .subscribe() to get an iterator. You can only use one of these per thread.

  2. Use .add_subscriber(f) to subscribe the function f. f should take an iterator as its first argument (and actually iterate over it, of course).

Each sender and receiver is wrapped in a thread, so they can be paused:
  • senders, if the mailbox is full;

  • readers, if they call next() but the next message is not yet available.

Any futures sent in are awaited before they are passed to receivers.

Exceptions in a sender cause MailboxKilled to be raised in each reader. If the reader doesn’t catch this, and it writes to another mailbox, this therefore kills that mailbox (raises MailboxKilled for each reader) as well. Thus MailboxKilled exceptions travel downstream in pipelines.

Sender threads are not killed by exceptions raised in readers. To kill sender threads too, use .kill(upstream=True). Even this does not propagate further upstream than the immediate sender threads.
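
A minimal sketch of wiring a sender and a reader together, based on the API listed below (the mailbox and thread names are illustrative):

from strax.mailbox import Mailbox

mb = Mailbox(name='example_mailbox')
mb.add_sender(iter([1, 2, 3]), name='numbers')

def reader(source):
    # `source` is a generator over the messages sent into the mailbox
    for msg in source:
        print('received', msg)

mb.add_reader(reader)
mb.start()    # start the sender/reader threads
mb.cleanup()  # wait for the threads to finish and clean up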

DEFAULT_MAX_MESSAGES = 4
DEFAULT_TIMEOUT = 300
add_reader(subscriber, name=None, can_drive=True, **kwargs)[source]

Subscribe a function to the mailbox.

Parameters:
  • subscriber – Function which accepts a generator over messages as the first argument. Any kwargs will also be passed to the function.

  • name – Name of the thread in which the function will run. Defaults to read_<number>:<mailbox_name>

  • can_drive – Whether this reader can cause new messages to be generated when in lazy mode.

add_sender(source, name=None)[source]

Configure mailbox to read from an iterable source.

Parameters:
  • source – Iterable to read from

  • name – Name of the thread in which the function will run. Defaults to source:<mailbox_name>

cleanup()[source]
close()[source]
kill(upstream=True, reason=None)[source]
kill_from_exception(e, reraise=True)[source]

Kill the mailbox following a caught exception e.

send(msg, msg_number: int | None = None)[source]

Send a message.

If the message is a future, receivers will be passed its result. (possibly waiting for completion if needed)

If the mailbox is currently full, sleep until there is room for your message (or timeout occurs)

start()[source]
subscribe(can_drive=True)[source]

Return generator over messages in the mailbox.

exception strax.mailbox.MailboxException[source]

Bases: Exception

exception strax.mailbox.MailboxFullTimeout[source]

Bases: MailboxException

exception strax.mailbox.MailboxKilled[source]

Bases: MailboxException

exception strax.mailbox.MailboxReadTimeout[source]

Bases: MailboxException

strax.mailbox.divide_outputs(source, mailboxes: Dict[str, Mailbox], lazy=False, flow_freely=(), outputs=None)[source]

This code is a ‘mail sorter’ which gets dicts of arrays from source and sends the right array to the right mailbox.

strax.plugin module

Plugin system for strax.

A ‘plugin’ is something that outputs an array and gets arrays from one or more other plugins.

exception strax.plugin.InputTimeoutExceeded

Bases: Exception

class strax.plugin.Plugin

Bases: object

Plugin containing strax computation.

You should NOT instantiate plugins directly. Do NOT add unpickleable things (e.g. loggers) as attributes.

allow_hyperrun = False
can_rechunk(data_type)
child_plugin = False
chunk(*, start, end, data, data_type=None, run_id=None)
chunk_target_size_mb = 200
cleanup(wait_for)
compressor = 'blosc'
compute(**kwargs)
compute_takes_chunk_i = False
compute_takes_start_end = False
config: Dict
data_kind: str | immutabledict | dict
data_kind_for(data_type)
dependencies_by_kind()

Return dependencies grouped by data kind i.e. {kind1: [dep0, dep1], kind2: [dep, dep]}

Parameters:

require_time – If True, one dependency of each kind must provide time information. It will be put first in the list. If require_time is omitted, we will require time only if there is more than one data kind in the dependencies.

depends_on: str | tuple | list
deps: Dict
do_compute(chunk_i=None, **kwargs)

Wrapper for the user-defined compute method.

This is the ‘job’ that gets executed in different processes/threads during multiprocessing

dtype: tuple | dtype | immutabledict | dict
dtype_for(data_type)

Provide the dtype of one of the provide arguments of the plugin.

NB: does not simply provide the dtype of any datatype but must be one of the provide arguments known to the plugin.

empty_result()
fix_dtype()
infer_dtype()

Return dtype of computed data; used only if no dtype attribute defined.

input_buffer: Dict[str, Chunk]
input_timeout = 80
is_ready(chunk_i)

Return whether the chunk chunk_i is ready for reading.

Returns True by default; override if you make an online input plugin.

iter(iters, executor=None)

Iterate over dependencies and yield results.

Parameters:
  • iters – dict with iterators over dependencies

  • executor – Executor to punt computation tasks to. If None, will compute inside the plugin’s thread.

property log
max_messages = None
metadata(run_id, data_type)

Metadata to save along with produced data.

property multi_output
parallel: str | bool = False
provides: str | tuple | list
rechunk_on_save = True
run_i: int
run_id: str
save_when = 3
setup()

Hook if plugin wants to do something on initialization.

source_finished()

Return whether all chunks the plugin wants to read have been written.

Only called for online input plugins.

takes_config = immutabledict({})
version(run_id=None)

Return version number applicable to the run_id.

Most plugins just have a single version (in .__version__) but some may be at different versions for different runs (e.g. time-dependent corrections).

exception strax.plugin.PluginGaveWrongOutput

Bases: Exception

class strax.plugin.SaveWhen(value)

Bases: IntEnum

Plugin’s preference for having its data saved.

ALWAYS = 3
EXPLICIT = 1
NEVER = 0
TARGET = 2

strax.processor module

class strax.processor.ProcessorComponents(plugins: Dict[str, Plugin], loaders: Dict[str, Callable], loader_plugins: Dict[str, Plugin], savers: Dict[str, List[Saver]], targets: Tuple[str])[source]

Bases: NamedTuple

Specification to assemble a processor.

loader_plugins: Dict[str, Plugin]

Alias for field number 2

loaders: Dict[str, Callable]

Alias for field number 1

plugins: Dict[str, Plugin]

Alias for field number 0

savers: Dict[str, List[Saver]]

Alias for field number 3

targets: Tuple[str]

Alias for field number 4

class strax.processor.ThreadedMailboxProcessor(components: ProcessorComponents, allow_rechunk=True, allow_shm=False, allow_multiprocess=False, allow_lazy=True, max_workers=None, max_messages=4, timeout=60, is_superrun=False)[source]

Bases: object

iter()[source]
mailboxes: Dict[str, Mailbox]

strax.run_selection module

Context methods dealing with run scanning and selection.

strax.run_selection.flatten_run_metadata(md)[source]

strax.testutils module

strax.utils module

class strax.utils.NumpyJSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Bases: JSONEncoder

Special json encoder for numpy types. Edited from mpld3: mpld3/_display.py

default(obj)[source]

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
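
For instance, the encoder above can be passed as the cls argument to json.dumps so that numpy arrays and scalars serialize cleanly (a small sketch):

>>> import json
>>> import numpy as np
>>> from strax.utils import NumpyJSONEncoder
>>> json.dumps({'a': np.array([1, 2, 3]), 'b': np.int64(7)}, cls=NumpyJSONEncoder)
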
strax.utils.apply_selection(x, selection=None, selection_str=None, keep_columns=None, drop_columns=None, time_range=None, time_selection='fully_contained')[source]

Return x after applying selections.

Parameters:
  • x – Numpy structured array

  • selection – Query string, sequence of strings, or simple function to apply.

  • selection_str – Same as selection (deprecated)

  • time_range – (start, stop) range to load, in ns since the epoch

  • keep_columns – Field names of the columns to keep.

  • drop_columns – Field names of the columns to drop.

  • time_selection – Kind of time selection to apply: - skip: Do not select a time range, even if other arguments say so - touching: select things that (partially) overlap with the range - fully_contained: (default) select things fully contained in the range

The right bound is, as always in strax, considered exclusive. Thus, data that ends (exclusively) exactly at the right edge of a fully_contained selection is returned.
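
A minimal sketch (selection strings are evaluated against the array's field names, so ‘time’ here assumes a dtype with a time field):

>>> import numpy as np
>>> from strax.dtypes import record_dtype
>>> from strax.utils import apply_selection
>>> records = np.zeros(5, dtype=record_dtype())
>>> records['time'] = np.arange(5)
>>> selected = apply_selection(records, selection='time < 3')
>>> len(selected)
3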

strax.utils.camel_to_snake(x)[source]

Convert x from CamelCase to snake_case.

strax.utils.compare_dict(old: dict, new: dict)[source]

Compare two dictionaries and print the differences.

strax.utils.convert_tuple_to_list(init_func_input)[source]

Convert tuples into lists in an arbitrarily nested dictionary.

strax.utils.count_tags(ds)[source]

Return how often each tag occurs in the datasets DataFrame ds.

strax.utils.deterministic_hash(thing, length=10)[source]

Return a base32 lowercase string of length determined from hashing a container hierarchy.

strax.utils.dict_to_rec(x, dtype=None)[source]

Convert a dictionary {field_name: array} to a record array. Optionally, provide the dtype.

strax.utils.exporter(export_self=False)[source]

Export utility modified from https://stackoverflow.com/a/41895194. Returns (export decorator, __all__ list).

strax.utils.flatten_dict(d, separator=':', _parent_key='', keep=())[source]

Flatten nested dictionaries into a single dictionary, indicating levels by separator.

Don’t set the _parent_key argument; it is used for recursive calls. Stolen from http://stackoverflow.com/questions/6027558

Parameters:

keep – key or list of keys whose values should not be flattened
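
For example (a small sketch):

>>> from strax.utils import flatten_dict
>>> flatten_dict({'a': {'b': 1, 'c': 2}, 'd': 3}, separator='.')
{'a.b': 1, 'a.c': 2, 'd': 3}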

strax.utils.formatted_exception()[source]

Return human-readable multiline string with info about the exception that is currently being handled.

If no exception, or StopIteration, is being handled, returns an empty string.

For MailboxKilled exceptions, we return the original exception instead.

strax.utils.group_by_kind(dtypes, plugins=None, context=None) Dict[str, List][source]

Return dtypes grouped by data kind i.e. {kind1: [d, d, …], kind2: [d, d, …], …}

Parameters:
  • plugins – plugins providing the dtypes.

  • context – context to get plugins from if not given.

strax.utils.growing_result(dtype=<class 'numpy.int64'>, chunk_size=10000)[source]

Decorator factory for functions that fill numpy arrays.

Functions must obey following API:

  • accept _result_buffer keyword argument with default None; this will be the buffer array of specified dtype and length chunk_size (it’s an optional argument so this decorator preserves signature)

  • ‘yield N’ from the function will cause the first N elements of the buffer to be saved

  • function is responsible for tracking offset, calling yield on time, and clearing the buffer afterwards.

  • optionally, accept result_dtype argument with default None; this allows function user to specify return dtype

See test_utils.py for a simple example (I can’t get it to run as a doctest unfortunately)
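
A minimal sketch following the API described above (the function name and contents are illustrative; the decorated function is assumed to return the concatenated result array):

import numpy as np

from strax.utils import growing_result

@growing_result(dtype=np.int64, chunk_size=1000)
def find_squares(n, _result_buffer=None):
    buffer = _result_buffer
    offset = 0
    for i in range(n):
        buffer[offset] = i ** 2
        offset += 1
        if offset == len(buffer):
            yield offset   # save the first `offset` elements of the buffer
            offset = 0
    yield offset           # save whatever is left at the end

squares = find_squares(5)  # e.g. array([ 0,  1,  4,  9, 16])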

strax.utils.hashablize(obj)[source]

Convert a container hierarchy into one that can be hashed.

See http://stackoverflow.com/questions/985294

strax.utils.inherit_docstring_from(cls)[source]

Decorator for inheriting doc strings, stolen from https://groups.google.com/forum/#!msg/comp.lang.python/HkB1uhDcvdk/lWzWtPy09yYJ

strax.utils.iter_chunk_meta(md)[source]

Iterate over chunk info from metadata md adding n_from and n_to fields.

strax.utils.merge_arrs(arrs, dtype=None)[source]

Merge structured arrays of equal length. On field name collisions, data from later arrays is kept.

If you pass one array, it is returned without copying. TODO: hmm… inconsistent

Much faster than the similar function in numpy.lib.recfunctions.

strax.utils.merged_dtype(dtypes)[source]
strax.utils.multi_run(exec_function, run_ids, *args, max_workers=None, throw_away_result=False, multi_run_progress_bar=True, ignore_errors=False, log=None, **kwargs)[source]

Execute exec_function(run_id, *args, **kwargs) over multiple runs, then return list of result arrays, each with a run_id column added.

Parameters:
  • exec_function – Function to run

  • run_ids – list/tuple of run_ids

  • max_workers – number of worker threads/processes to spawn. If set to None, defaults to 1.

  • throw_away_result – instead of collecting result, return None.

  • multi_run_progress_bar – show a tqdm progressbar for multiple runs.

  • ignore_errors – Return the data for the runs that successfully loaded, even if some runs failed executing.

  • log – logger to be used. Other (kw)args will be passed to the exec_function.

strax.utils.parse_selection(x, selection)[source]

Parse a selection string into a mask that can be used to filter data.

Parameters:

selection – Query string, sequence of strings, or simple function to apply.

Returns:

Boolean array indicating the selected items.

strax.utils.print_record(x, skip_array=True)[source]

Print record(s) x in human-readable format.

Parameters:

skip_array – Omit printing array fields.

strax.utils.profile_threaded(filename)[source]
strax.utils.remove_titles_from_dtype(dtype)[source]

Return np.dtype with titles removed from fields.

strax.utils.to_numpy_dtype(field_spec)[source]
strax.utils.to_str_tuple(x) Tuple[Any, ...][source]
strax.utils.unpack_dtype(dtype)[source]

Return list of tuples needed to construct the dtype.

dtype == np.dtype(unpack_dtype(dtype))

Module contents

Release notes

1.6.3 / 2024-04-30

Full Changelog: https://github.com/AxFoundation/strax/compare/v1.6.2...v1.6.3

1.6.2 / 2024-04-04

New Contributors

Full Changelog: https://github.com/AxFoundation/strax/compare/v1.6.1...v1.6.2

1.6.1 / 2024-02-17

Full Changelog: https://github.com/AxFoundation/strax/compare/v1.6.0...v1.6.1

1.6.0 / 2024-01-15

Full Changelog: https://github.com/AxFoundation/strax/compare/v1.5.5...v1.6.0

1.5.5 / 2023-12-21

Full Changelog: https://github.com/AxFoundation/strax/compare/v1.5.4...v1.5.5

1.5.4 / 2023-09-19

New Contributors

Full Changelog: https://github.com/AxFoundation/strax/compare/v1.5.3...v1.5.4

1.5.3 / 2023-08-29

Full Changelog: https://github.com/AxFoundation/strax/compare/v1.5.2...v1.5.3

1.5.2 / 2023-07-06

Full Changelog: https://github.com/AxFoundation/strax/compare/v1.5.1...v1.5.2

1.5.1 / 2023-06-22

New Contributors

Full Changelog: https://github.com/AxFoundation/strax/compare/v1.5.0...v1.5.1

1.5.0 / 2023-05-02

Full Changelog: https://github.com/AxFoundation/strax/compare/v1.4.3...v1.5.0

1.4.3 / 2023-04-22

New Contributors

Full Changelog: https://github.com/AxFoundation/strax/compare/v1.4.2...v1.4.3

1.4.2 / 2023-03-08

Full Changelog: https://github.com/AxFoundation/strax/compare/v1.4.1...v1.4.2

1.4.1 / 2023-02-13

Full Changelog: https://github.com/AxFoundation/strax/compare/v1.4.0...v1.4.1

1.4.0 / 2022-10-17

Notes

Full Changelog: https://github.com/AxFoundation/strax/compare/v1.3.0...v1.4.0

1.3.0 / 2022-09-09

1.2.3 / 2022-06-07

1.2.2 / 2022-05-11

  • Add option to ignore errors in multirun loading (#653)

  • Auto version, fix #217 (#689)

  • Add basics documentation - split Config and Plugin docs (#691)

  • Add n_hits comment in code (#692)

  • Rechunker script (#686)

1.2.1 / 2022-04-12

  • run dependabot remotely (#683)

  • Docs fixing (#684)

  • Allow different chunk size (#687)

1.2.0 / 2022-03-09

  • Added lone hit area to area per channel (#649)

1.1.8 / 2022-03-08

  • Fix saving behavior of multioutput plugins with different SaveWhens (#674)

  • Change tempdirs in test (#657)

  • Define extra kwargs based on cut_by (db14f809414fe91c4e16d04bd7f166970891e591)

  • Update run_selection.py (#658)

  • Ignore raises on testing (#669)

  • Documentation tweaks (#670)

  • Test for inline plugin (#673)

1.1.7 / 2022-02-16

  • Fix savewhen issues (#648)

  • Fix testing from straxen (#650)

  • Small fix superruns define run (#651)

1.1.6 / 2022-02-03

  • Extend search field to also print occurrences (#638)

  • Extend save when (#631)

  • test straxen for coverage and backward compatibility (#635)

  • Failing example for test_conditional_entropy (#544)

  • python 3.10 (#630)

  • deprecate py3.6 py3.7 (#636)

  • remove deprecated function (#632)

  • Numba 0.55 (#634)

1.1.5 / 2022-01-10

  • StorageFrontend remoteness attribute and test (#617)

  • fix line endings (#618)

  • Bump numpy (#627)

  • Don’t cache hitlet_properties (#616)

1.1.4 / 2021-12-16

  • Make truly HDR (#613)

  • Remove tight coincidence channel from data_type (#614)

1.1.3 / 2021-12-13

  • Add mode and tags to superrun. (#593)

  • cache deps (#595)

  • Fix online monitor bug for only md stored (#596)

  • speedup get_source with lookupdict (#599)

  • remove config warning and infer_dtype=False (#600)

  • Require pymongo 3.* (#611)

1.1.2 / 2021-11-19

  • Descriptor configs (#550)

  • Add funcs for getting stored source (#590)

  • use logged warnings (#582)

  • Fail for - run_ids (#567)

  • Infer type from default value in Option (#569, #586, #587)

  • Fix buffer issue in highest density regions, adds tests (#591)

  • Fix memory usage multi runs (#581)

  • Update CONTRIBUTING.md (#585)

Notes:

  • PRs #569, #586, #587 may cause a lot of warnings for options

1.1.1 / 2021-10-27

  • Fix memory leak (#561)

  • Fix superrun creation (#562)

  • Add deregister plugins (#560)

  • Script for testing python setup.py install (#557)

1.1.0 / 2021-10-15

major / minor:

  • Fix hitlet splitting (#549)

  • Add tight channel (#551)

patch:

  • Add read by index plus some extra checks (#529)

  • Add drop column option (#530)

  • Remove context.apply_selection (#531)

  • Add option to support superruns for storage frontends. Adds test (#532)

  • Fix issue #536 (#537)

  • Two pbar patches (#538)

  • Add get_zarr method to context (#540)

  • Broken metadata error propagation (#541)

  • few tests for MongoStorage frontend (#542)

  • Fix caching (#545)

  • dds information about failing chunk (#548)

  • remove rucio (#552)

  • Allow loading SaveWhen.EXPLICIT time range selection (#553)

  • Changes to savewhen behavior (#554)

1.0.0 / 2021-09-1

major / minor:

  • Fixing peaklet baseline bias (#486)

  • Fix double dependency (#512)

patch:

  • Parallel plugin timeout (#489)

  • Added pytest.ini (#492)

  • Fix nveto processing (#491)

  • disable testing py3.6 (#505)

  • Fix peaks merging (#506)

  • Added export (#508)

  • Simplify get_components (#510)

  • Allow creation and storing of superruns if SaveWhen > 0 (#509)

  • re-use instance of plugin for multi output (#516)

  • Add raise if instance are not equal (#517)

0.16.1 / 2021-07-16

  • Cached lineage if per run default is not allowed (#483, #485)

  • Fix define runs and allow storing of superruns (#472, #488)

  • Change default pbar behavior (for multiple runs) (#480)

  • Reduce numpy warnings (#481, #484)

  • Reduce codefactor (#487)

0.16.0 / 2021-06-23

  • Add select index to compute width (#465)

  • int blaseline (#464)

  • Fix #452 assert there is a mailbox for the final generator (#463)

  • Document fuzzy-for and documentation itself (#471)

  • Re ordered time field in cut plugins (#473)

  • Github actions for CI (#454, #460)

  • Remove travis for testing (#474)

  • Remove outdated files/configs (#462)

  • Remove overwrite from options (#467)

0.15.3 / 2021-06-03

  • Match cached buffer chunk start times OverlapWindowPlugin (#450)

  • Prevent late creation of unattended mailboxes (#452)

  • Temporary patch div/zero in hitlets (#447)

  • Relax zstd requirements again (#443)

  • Don’t ignore if pattern also startswith underscore (#445)

  • kB/s pbar (#449)

0.15.2 / 2021-05-20

  • Speed up run selection by ~100x for fixed defaults (#440)

  • Use zstd for from base-env for testing (#441)

  • Add MB/s pbar (#442)

0.15.1 / 2021-05-04

  • Refactor hitlets (#430, #436)

  • Update classifiers for pipy #437

  • Allow Py39 in travis tests (#427)

0.15.0 / 2021-04-16

  • Use int32 for peak dt, fix #397 (#403, #426)

  • max peak duration (#420)

  • Loopplugin touching windows + plugin documentation (#424)

  • Move apply selection from context to utils (#425)

  • Context testing functions + copy_to_frontend documented (#423)

  • Apply function to data & test (#422)

0.14.0 / 2021-04-09

  • Check data availability for single run (#416)

0.13.11 / 2021-04-02

  • Allow re-compression at copy to frontend (#407)

  • Bug fix, in processing hitlets (#411)

  • Cleanup requirements for boto3 (#414)

0.13.10 / 2021-03-24

  • Allow multiple targets to be computed simultaneously (#408, #409)

  • Numbafy split by containment (#402)

  • Infer start/stop from any dtype (#405)

  • Add property provided_dtypes to Context (#404)

  • Updated OverlapWindowPlugin docs (#401)

0.13.9 / 2021-02-22

  • Clip progress progressbar (#399)

0.13.8 / 2021-02-09

  • Specify saver timeout at context level (#394)

  • Allow dynamic function creation for dtype copy (#395)

  • Close inlined savers on exceptions in multiprocessing (#390)

  • Allow option to be overwritten to allow subclassing (#392)

  • Fix availability checks (#389)

  • Don’t print for temp plugins (#391)

0.13.7 / 2021-01-29

  • Warn for non saved plugins in run selection (#387)

  • Cleanup progressbar (#386)

0.13.4 / 2021-01-22

  • Nveto changes + highest density regions (#384)

  • Parse requirements for testing (#383)

  • Added keep_columns into docstring (#382)

  • remove slow operators from mongo storage (#382)

0.13.3 / 2021-01-15

  • Better online monitor queries (#375)

  • Multiprocess fix (#376)

  • Bugfix (#377)

0.13.2 / 2021-01-04

  • Speed up st.select_runs by ~100x (#371)

  • Finding latest OnlineMonitor data (#374)

0.13.1 / 2020-12-21

  • Fix bug in baselining (#367)

0.12.7 / 2020-12-21

  • Fix for select_runs with Nones(#370)

  • Numpy requirement fix (#369)

  • Documentation maintenance (cad6bce8, 9c023b0d)

0.12.6 / 2020-12-09

  • Muveto + hitlet fixes (#355)

  • Add more tests to strax (#359)

  • More clear print statement (#362)

  • Fix reproducibility of peak split test (#363)

  • Fix numpy deps (#364)

0.12.5 / 2020-12-06

  • Finally fix time selection bug (#345)

  • Allow multioutput for loop plugin (#357)

  • Allow copy from frontend to frontend (#351)

  • Add more tests to strax (#359)

  • Small bugfix progressbar (#353)

  • Smooth database initiation CMT (#356)

  • Remove s3 storage (#358)

0.12.4 / 2020-11-13

  • Limit mongo backend memory usage (#349)

  • Small CMT simplification (#347)

0.12.3 / 2020-11-05

  • Updates to mongo.py (#335, #336 )

  • Several bug-fixes (#340, #343, #344, #338)

  • Contributions to documentation (#342, #344)

  • Accommodate scada (#318)

0.12.2 / 2020-10-15

  • OnlineMonitor in mongo.py (#315, #323)

  • Several bugfixes (#320, #321, #327, #329, #334)

  • Option to give range to sum_waveform (#322)

0.12.1 / 2020-09-10

  • Added the “Corrections Management Tool” (#303)

  • Check of loop dependency for multioutput plugins (#312)

  • Fix memory leak peaksplitting (#309)

0.12.0 / 2020-08-17

  • Add backend for rucio (#300)

  • Hitlets data types and utilities (#275)

  • Apply function to data prior to delivering (#304)

  • Child options for inherited plugins (#297)

  • Introducing a template for pull-requests (#302)

  • Make fuzzy_for option more intuitive (#306)

0.11.3 / 2020-07-29

  • bugfix in new progressbar feature (#296)

0.11.2 / 2020-07-21

  • new plugin type: CutPlugin (#274)

  • add progressbar (#276)

  • allow for plugin-specific chunk-sizes (#277)

  • broaden scope of endtime check in chunk.py (#281)

  • change dtype of saturated channels (#286)

  • several (bug-)fixes (#289, #288, #284, #278, #273)

0.11.1 / 2020-04-28

  • Per-datatype rechunking control (#272)

  • Check defaults are consistent across plugins (#271)

  • Documentation / comment updates (#269, #268)

  • Peak splitter test (#267)

  • Fix bug in pulse_processing when not flipping waveforms (#266)

  • Fix lineage hash caching (#263)

  • Flexible run start time estimation (905335)

0.11.0 / 2020-04-28

  • accumulate method for basic out-of-core processing (#253)

  • Lone hit integration routines (#261)

  • Record amplitude bit shift, fix saturation counting (#260)

  • Make index_of_fraction more reusable (#257)

  • DataDirectory does not deep_scan or provide_run_metadata by default

  • Numba 0.49 compatibility

0.10.3 / 2020-04-13

  • Disable timeout / make it 24 hours (#255)

  • Minor fixes for warning messages

0.10.2 / 2020-04-06

  • Fix loading of incomplete data (#251)

  • Fx exception handling (#252)

  • Fix hitfinder buffer overrun if too few thresholds specified (bc2c2b)

  • keep_columns selection option (4e2550)

  • Assume all run metadata is in UTC (4e223e)

  • Can now specify * in forbid_creation_of (86552f)

  • Simplify length computations (#250)

0.10.1 / 2020-03-18

  • Even lazier processing (#248)

  • Fix multiprocessing bug for multi-output plugins (0f1b1d, 1e388a)

  • Make baselining work in non-jitted mode (8f1f23)

  • Better exception handling in estimate_run_start (9e2f88, #249)

  • Disable run defaults by default (c1f094)

0.10.0 / 2020-03-15

  • Lazy mailbox for processing (#241)

  • Baselining checks for nonzero first fragment (#243)

  • Add size_mb context method

  • Force time range to be integer

  • Update messages and exceptions (#244, #245)

0.9.1 / 2020-03-08

  • Fix bug in input synchronization

0.9.0 / 2020-03-05

  • Use chunks with defined start/end rather than plain arrays (#235)

  • Mandate time fields in all datatypes (#235)

  • Remove unnecessary fields from raw-records (#235, #237)

  • Allow compute to take start and end fields (#239)

  • Channel-dependent hitfinder threshold (#234)

  • Wait on Executors during shutdown (#236)

  • Protect hitfinder against buffer overruns

0.8.8 / 2020-02-11

  • Fixes for time range selection (#231)

  • Mailbox timeout and max_messages accessible as context options

  • Fix output inference for ParallelSourcePlugin (straxen #46)

  • Sane default for max_workers in multi-run loading

  • run_id field is now a string when loading multiple runs

0.8.7 / 2020-02-07

  • Small bugfixes:

    • Fixes for multi-output plugins

    • Use frozendict for Plugin.takes_config

0.8.6 / 2020-01-17

  • Peak merging code (from straxen)

  • time_range selection for user-defined plugins that never save

  • Add hit height, store fpart-baseline corrected hit area

  • Preserve cached input on empty chunks in OverlapWindowPlugin

0.8.5 / 2020-01-16

  • Natural breaks splitting (#225)

  • Add ptype, max_gap and max_goodness_of_split to peaks dtype

  • Fixes for multiprocessing

  • Fixes for time selection

  • Fix for final data in same-length merging

0.8.4 / 2019-12-24

  • Export downsampling function (#224)

  • Fix show_config

  • Fix pulse_processing for empty chunks

0.8.3 / 2019-12-23

  • Fix for skipping data near end of run during multi-kind merge

  • Add tight coincidence field to peaks dtype

  • Pulse filtering optimization

  • max_messages configurable per plugin, defaults to 4

0.8.2 / 2019-12-19

  • Specify defaults via run doc (#223)

  • Fix hypothesis test deadline issue during build (5bf2ad7)

  • Fix: use selection_str also when selecting time range (87faeab)

0.8.1 / 2019-11-13

  • Protect OverlapWindowPlugin against empty chunks (#212)

  • Make test helpers accessible, test with numba on (#219)

0.8.0 / 2019-09-16

  • Superruns (#207)

  • Pulse processing fixes (#207)

  • LZ4 compression (#207)

  • Fixes for edge cases (#201)

0.7.5 / 2019-07-06

  • Time selection refactor and context extensibility (#195)

0.7.4 / 2019-06-26

  • Fix availability checks (#194)

  • Allow selection of runs by name (#192)

  • Fix some context methods for multi-output plugins

0.7.3 / 2019-06-17

  • Multiple outputs per plugin (#190)

  • Minor fixes and additions (#188, #182, #175, #185)

0.7.2 / 2019-06-06

  • Area per channel in PE (#187)

  • Update pinned dependencies, notably numba to 0.44.0 (#186)

  • Fixes to empty chunk handling and chunk_arrays

0.7.1 / 2019-05-11

  • Sum waveform now operates on all channels (#158)

  • MongoDB output (#159)

  • Better exception handling in saver (#160)

  • Force plugins to produce correct dtype (#161)

0.7.0 / 2019-05-04

  • Pulse processing upgrades (filtering etc) (#154)

  • Run selection and run-level metadata handling (#155)

  • Stabilize and shorten lineage hash (#152)

  • Shared memory transfers, parallel save/load (#150)

  • Ensure unique filenames (#143)

  • Many processing fixes (#134, #129)

0.6.1 / 2019-01-20

  • Many bugfixes from DAQ test (#118)

  • Fix dtype merging bug, add saturation info (#120)

  • Fixes to sum waveform (cd0cd2f)

0.6.0 / 2018-10-09

  • strax / straxen split (#107)

  • Support incomplete data loading (#99)

  • Fix for loading data made by ParallelSourcePlugin (#104)

  • Runs DB frontend (#100) (moved to straxen)

  • Fix MANIFEST.in

0.5.0 / 2018-09-02

  • Directory name delimiter changed from _ to - (#76)

  • Time-based random access (#80)

  • Throw original exceptions on crashes (#87)

  • Check for corrupted data (#88)

  • FIX: edge cases in processing (#94)

  • FIX: prevent saving during time range or fuzzy selection (#89)

  • FIX: Workaround for memory leak in single-core mode (#91)

  • XENON: Example cuts (#84)

  • XENON: proper S1-S2 pairing (#82)

  • XENON: Fix pax conversion (#95)

  • DOCS: Datastructure docs (#85)

0.4.0 / 2018-08-27

  • S3-protocol I/O (#68, #71, #74)

  • Fuzzy matching, context options (#66)

  • Fix bug with PyPI lacking MANIFEST (e9771db79bd0c6a148afe1fa8c2ed3d13495da88)

  • Zenodo badge (#58)

0.3.0 / 2018-08-13

  • Storage frontend/backend split, several fixes (#46)

  • XENON: pax conversion fix (#47)

  • Globally configurable mailbox settings (#55, #57)

0.2.0 / 2018-07-03

  • Start documentation

  • ParallelSourcePlugin to better distribute low-level processing over multiple cores

  • OverlapWindowPlugin to simplify algorithms that look back and ahead in the data

  • Run-dependent config defaults

  • XENON: Position reconstruction (tensorflow NN) and corrections

0.1.2 / 2018-05-09

  • Failed to make last patch release.

0.1.1 / 2018-05-09

  • #19 (https://github.com/AxFoundation/strax/pull/19): list subpackages in setup.py, so numba can find cached code

  • Autodeploy from Travis to PyPI

  • README badges

0.1.0 / 2018-05-06

  • Initial release