Source code for strax.chunk

import typing as ty
from warnings import warn

import numpy as np
import numba

import strax

export, __all__ = strax.exporter()
__all__.extend(["DEFAULT_CHUNK_SIZE_MB", "DEFAULT_CHUNK_SPLIT_NS"])


DEFAULT_CHUNK_SIZE_MB = 200
DEFAULT_CHUNK_SPLIT_NS = 1000


[docs] @export class Chunk: """Single chunk of strax data of one data type.""" data_type: str data_kind: str dtype: np.dtype # run_id is not superfluous to track: # this could change during the run in superruns (in the future) run_id: str start: int end: int data: np.ndarray target_size_mb: int def __init__( self, *, data_type, data_kind, dtype, run_id, start, end, data, subruns=None, superrun=None, target_size_mb=DEFAULT_CHUNK_SIZE_MB, ): self.data_type = data_type self.data_kind = data_kind self.dtype = np.dtype(dtype) self.run_id = run_id self.start = start self.end = end self.subruns = subruns if data is None: data = np.empty(0, dtype) self.data = data self.target_size_mb = target_size_mb if not ( isinstance(self.start, (int, np.integer)) and isinstance(self.end, (int, np.integer)) ): raise ValueError(f"Attempt to create chunk {self} with non-integer start times") # Convert to bona fide python integers self.start = int(self.start) self.end = int(self.end) if not isinstance(self.data, np.ndarray): raise ValueError(f"Attempt to create chunk {self} with data that isn't a numpy array") expected_dtype = strax.remove_titles_from_dtype(dtype) got_dtype = strax.remove_titles_from_dtype(dtype) if expected_dtype != got_dtype: raise ValueError( f"Attempt to create chunk {self} with data of {dtype}, should be {expected_dtype}" ) if self.start < 0: raise ValueError(f"Attempt to create chunk {self} with negative start time") if self.start > self.end: raise ValueError(f"Attempt to create chunk {self} with negative length") if len(self.data): data_starts_at = self.data[0]["time"] # Check the last 500 samples (arbitrary number) as sanity check data_ends_at = strax.endtime(self.data[-500:]).max() if data_starts_at < self.start: raise ValueError( f"Attempt to create chunk {self} whose data starts early at {data_starts_at}" ) if data_ends_at > self.end: raise ValueError( f"Attempt to create chunk {self} whose data ends late at {data_ends_at}" ) self.superrun = superrun def __len__(self): return len(self.data) @staticmethod def _t_fmt(t): return f"{t // int(1e9)}sec {t % int(1e9)} ns" def __repr__(self): return ( f"({self.run_id}.{self.data_type}: " f"{self._t_fmt(self.start)} - {self._t_fmt(self.end)}, " f"{len(self)} items, " + "{0:.1f} MB/s)".format(self._mbs()) ) @property def nbytes(self): return self.data.nbytes @property def duration(self): return self.end - self.start @property def is_superrun(self): return bool(self.subruns) and self.run_id.startswith("_") @property def subruns(self): return self._subruns @subruns.setter def subruns(self, subruns): if isinstance(subruns, dict) and None in subruns: raise ValueError( f"Attempt to create chunk {self} with None as run_id in subrun {subruns}" ) if subruns is None: self._subruns = None else: self._subruns = dict(sorted(subruns.items(), key=lambda x: x[1]["start"])) _sorted_subruns_check(self._subruns) @property def first_subrun(self): _subrun = None if self.is_superrun: _subrun = self._get_subrun(0) return _subrun @property def last_subrun(self): _subrun = None if self.is_superrun: _subrun = self._get_subrun(-1) return _subrun @property def promised_continuity(self): if not self.is_superrun: return True # TODO: be more clever on this? # Subruns start and end does not match with chunk start and end. # This might mean that you are using ExhaustPlugin. return self.first_subrun["start"] == self.start and self.last_subrun["end"] == self.end def _get_subrun(self, index): """Returns subrun according to position in chunk.""" run_id = list(self.subruns.keys())[index] _subrun = { "run_id": run_id, "start": self.subruns[run_id]["start"], "end": self.subruns[run_id]["end"], } return _subrun @property def superrun(self): return self._superrun @superrun.setter def superrun(self, superrun): """Superrun can only be None or dict with non-None keys.""" if not isinstance(superrun, dict) and superrun is not None: raise ValueError( "When creating chunk, superrun can only be dict or None. " f"But got {superrun} for {self}." ) if superrun is None: superrun = {self.run_id: {"start": self.start, "end": self.end}} if len(superrun) == 0: raise ValueError(f"Attempt to create chunk {self} with empty superrun") if None in superrun: raise ValueError( f"Attempt to create chunk {self} with None as run_id in superrun {superrun}" ) # The only chance self.run_id to be None is that self is concatenated from different runs if len(superrun) == 1 and self.run_id is None: raise ValueError( f"If superrun {superrun} of {self} has only one run_id, run_id should be provided." ) self._superrun = dict(sorted(superrun.items(), key=lambda x: x[1]["start"])) _sorted_subruns_check(self._superrun) def _mbs(self): if self.duration: return (self.nbytes / 1e6) / (self.duration / 1e9) else: # This is strange. We have a zero duration chunk. However, this is # not the right place to raise an error message. Return -1 for now. return -1
[docs] def split(self, t: ty.Union[int, None], allow_early_split=False): """Return (chunk_left, chunk_right) split at time t. :param t: Time at which to split the data. All data in the left chunk will have their (exclusive) end <= t, all data in the right chunk will have (inclusive) start >=t. :param allow_early_split: If False, raise CannotSplit if the requirements above cannot be met. If True, split at the closest possible time before t. """ t = max(min(t, self.end), self.start) # type: ignore if t == self.end: data1, data2 = self.data, self.data[:0].copy() elif t == self.start: data1, data2 = self.data[:0].copy(), self.data else: data1, data2, t = split_array(data=self.data, t=t, allow_early_split=allow_early_split) common_kwargs = dict( dtype=self.dtype, data_type=self.data_type, data_kind=self.data_kind, target_size_mb=self.target_size_mb, ) if self.promised_continuity: subruns_first_chunk, subruns_second_chunk = _split_runs_in_chunk(self.subruns, t) else: # The split will not update the subruns or superrun. subruns_first_chunk = subruns_second_chunk = self.subruns superrun_first_chunk, superrun_second_chunk = _split_runs_in_chunk(self.superrun, t) # If the superrun is split and the fragment cover only one run, # you need to recover the run_id if superrun_first_chunk is None or len(superrun_first_chunk) == 1: run_id_first_chunk = list(self.superrun.keys())[0] else: run_id_first_chunk = self.run_id if superrun_second_chunk is None or len(superrun_second_chunk) == 1: run_id_second_chunk = list(self.superrun.keys())[-1] else: run_id_second_chunk = self.run_id c1 = strax.Chunk( start=self.start, end=max(self.start, t), # type: ignore data=data1, subruns=subruns_first_chunk, superrun=superrun_first_chunk, **{**common_kwargs, "run_id": run_id_first_chunk}, ) c2 = strax.Chunk( start=max(self.start, t), # type: ignore end=max(t, self.end), # type: ignore data=data2, subruns=subruns_second_chunk, superrun=superrun_second_chunk, **{**common_kwargs, "run_id": run_id_second_chunk}, ) return c1, c2
[docs] @classmethod def merge(cls, chunks, data_type="<UNKNOWN>"): """Create chunk by merging columns of chunks of same data kind. :param chunks: Chunks to merge. None is allowed and will be ignored. :param data_type: data_type name of new created chunk. Set to <UNKNOWN> if not provided. """ chunks = [c for c in chunks if c is not None] if not chunks: raise ValueError("Need at least one chunk to merge") if len(chunks) == 1: return chunks[0] data_kinds = [c.data_kind for c in chunks] if len(set(data_kinds)) != 1: raise ValueError(f"Cannot merge chunks {chunks} of different data kinds: {data_kinds}") data_kind = data_kinds[0] run_ids = [c.run_id for c in chunks] if len(set(run_ids)) != 1: raise ValueError(f"Cannot merge chunks of different run_ids: {chunks}") run_id = run_ids[0] if len(set([len(c) for c in chunks])) != 1: raise ValueError(f"Cannot merge chunks with different number of items: {chunks}") tranges = [(c.start, c.end) for c in chunks] if len(set(tranges)) != 1: raise ValueError(f"Cannot merge chunks with different time ranges: {tranges}") start, end = tranges[0] data = strax.merge_arrs( # The chunks order matters. A specific field in the merged data # takes the LAST chunk's value, even though the dtype is sorted below by data_type. # The order of chunks is defined in depends_on. [c.data for c in chunks], # Make sure dtype field order is consistent, regardless of the # order in which chunks are passed to merge: dtype=strax.merged_dtype([c.dtype for c in sorted(chunks, key=lambda x: x.data_type)]), ) return cls( start=start, end=end, dtype=data.dtype, data_type=data_type, data_kind=data_kind, run_id=run_id, data=data, subruns=_merge_subruns_in_chunk(chunks, merge=True), superrun=_merge_superrun_in_chunk(chunks, merge=True), target_size_mb=max([c.target_size_mb for c in chunks]), )
[docs] @classmethod def concatenate(cls, chunks, allow_superrun=False): """Create chunk by concatenating chunks of same data type You can pass None's, they will be ignored.""" chunks = [c for c in chunks if c is not None] if not chunks: raise ValueError("Need at least one chunk to concatenate") if len(chunks) == 1: return chunks[0] data_types = [c.data_type for c in chunks] if len(set(data_types)) != 1: raise ValueError(f"Cannot concatenate chunks of different data types: {data_types}") data_type = data_types[0] run_ids = [c.run_id for c in chunks] if len(set(run_ids)) != 1 and not allow_superrun: raise ValueError( f"Cannot concatenate {data_type} chunks with different run ids: {run_ids}" ) if len(set(run_ids)) == 1: run_id = run_ids[0] superrun = None else: run_id = None superrun = _merge_superrun_in_chunk(chunks) try: subruns = _merge_subruns_in_chunk(chunks, merge=False) except ValueError: warn("The subruns are not continuous, try merge mode.") subruns = _merge_subruns_in_chunk(chunks, merge=True) prev_end = 0 for c in chunks: if c.start < prev_end: raise ValueError( f"Attempt to concatenate overlapping or out-of-order chunks: {chunks} " ) prev_end = c.end return cls( start=chunks[0].start, end=chunks[-1].end, dtype=chunks[0].dtype, data_type=data_type, data_kind=chunks[0].data_kind, run_id=run_id, subruns=subruns, superrun=superrun, data=np.concatenate([c.data for c in chunks]), target_size_mb=max([c.target_size_mb for c in chunks]), )
[docs] @export def continuity_check(chunk_iter): """Check continuity of chunks yielded by chunk_iter as they are yielded.""" last_end = None last_runid = None last_subrun = {"run_id": None} for chunk in chunk_iter: if chunk.run_id != last_runid: last_end = None last_subrun = {"run_id": None} if chunk.is_superrun: if chunk.first_subrun["run_id"] != last_subrun["run_id"]: last_end = None else: last_end = last_subrun["end"] if last_end is not None and chunk.promised_continuity: if chunk.start != last_end: raise ValueError( f"Data is not continuous. Chunk {chunk} should have started at {last_end}" ) yield chunk last_end = chunk.end last_runid = chunk.run_id last_subrun = chunk.last_subrun
[docs] @export class CannotSplit(Exception): pass
[docs] @export @numba.njit(cache=False, nogil=True) def split_array(data, t, allow_early_split=False): """Return (data left of t, data right of t, t), or raise CannotSplit if that would split a data element in two. :param data: strax numpy data :param t: Time to split data :param allow_early_split: Instead of raising CannotSplit, split at t_split as close as possible before t where a split can happen. The new split time replaces t in the return value. """ # Slitting an empty array is easy if not len(data): return data[:0], data[:0], t # Splitting off a bit of nothing from the start is easy # since the data is sorted by time. if data[0]["time"] >= t: return data[:0], data, t # Find: # i_first_beyond: the first element starting after t # splittable_i: nearest index left of t where we can safely split BEFORE latest_end_seen = -1 splittable_i = 0 i_first_beyond = -1 for i, d in enumerate(data): # only non-overlapping data can be split if d["time"] >= latest_end_seen: splittable_i = i # can not split beyond t if d["time"] >= t: i_first_beyond = i break latest_end_seen = max(latest_end_seen, strax.endtime(d)) if latest_end_seen > t: # Cannot split anywhere after this break else: if latest_end_seen <= t: return data, data[:0], t if splittable_i != i_first_beyond or latest_end_seen > t: if not allow_early_split: # Raise custom exception, make better one outside numba raise CannotSplit() t = min(data[splittable_i]["time"], t) return data[:splittable_i], data[splittable_i:], t
def _sorted_subruns_check(subruns): """Check if subruns are not overlapping.""" if subruns is None: return runs_start_end = list(subruns.values()) for i in range(len(runs_start_end) - 1): if runs_start_end[i]["end"] > runs_start_end[i + 1]["start"]: raise ValueError(f"Subruns are overlapping: {subruns}.") def _merge_runs_in_chunk(subruns, merged_runs): """Merge subruns information during concatenation or merge.""" if subruns is None: return for run_id, run_start_end in subruns.items(): merged_runs.setdefault(run_id, []) merged_runs[run_id].append([run_start_end["start"], run_start_end["end"]]) def _mergable_check(merged_runs, merge=False): """Check continuity of runs in a superrun chunk.""" for run_id in merged_runs.keys(): merged_runs[run_id].sort(key=lambda x: x[0]) if not merge: for i in range(1, len(merged_runs[run_id])): mask = merged_runs[run_id][i][0] != merged_runs[run_id][i - 1][1] if mask: raise ValueError( "Chunks are not continuous. " f"Run {run_id} was split into chunks {merged_runs[run_id]}." ) else: for i in range(1, len(merged_runs[run_id])): mask = merged_runs[run_id][i][0] != merged_runs[run_id][0][0] mask |= merged_runs[run_id][i][1] != merged_runs[run_id][0][1] if mask: raise ValueError( "If merging, all chunks should have the same start/end time. " f"But run {run_id} was split into chunks {merged_runs[run_id]}." ) merged_runs[run_id] = { "start": merged_runs[run_id][0][0], "end": merged_runs[run_id][-1][1], } def _merge_subruns_in_chunk(chunks, merge=False): """Merge list of subruns in a superrun chunk during concatenation. Updates also their start/ends too. """ subruns = dict() for c_i, c in enumerate(chunks): _merge_runs_in_chunk(c.subruns, subruns) _mergable_check(subruns, merge) if subruns: return subruns else: return None def _merge_superrun_in_chunk(chunks, merge=False): """Updates superrun in a superrun chunk during concatenation.""" superrun = dict() for c_i, c in enumerate(chunks): _merge_runs_in_chunk(c.superrun, superrun) _mergable_check(superrun, merge) return superrun def _pop_out_empty_run_id(subruns): """Remove empty run_id from chunks in superrun.""" keys_to_remove = [] for key in subruns.keys(): if subruns[key]["start"] == subruns[key]["end"]: keys_to_remove.append(key) for key in keys_to_remove: subruns.pop(key) def _split_runs_in_chunk(subruns, t): """Split list of runs in a superrun chunk during split. Updates also their start/ends too. """ if subruns is None: return None, None runs_first_chunk = {} runs_second_chunk = {} for run_id, run_start_end in subruns.items(): if t <= run_start_end["start"]: runs_second_chunk[run_id] = run_start_end elif run_start_end["start"] < t < run_start_end["end"]: runs_first_chunk[run_id] = {"start": run_start_end["start"], "end": int(t)} runs_second_chunk[run_id] = {"start": int(t), "end": run_start_end["end"]} elif run_start_end["end"] <= t: runs_first_chunk[run_id] = run_start_end # Pop out empty run_id _pop_out_empty_run_id(runs_first_chunk) _pop_out_empty_run_id(runs_second_chunk) # Make sure that either dictionary with content or None is assigned to Chunk if runs_first_chunk == {}: runs_first_chunk = None if runs_second_chunk == {}: runs_second_chunk = None return runs_first_chunk, runs_second_chunk
[docs] @export class Rechunker: """Helper class for rechunking. Send in chunks via receive, which returns either None (no chunk to send) or a chunk to send. Don't forget a final call to .flush() to get any final data out! """ def __init__(self, rechunk=False, run_id=None): self.rechunk = rechunk self.is_superrun = run_id and run_id.startswith("_") self.run_id = run_id self.cache = None
[docs] def receive(self, chunk) -> list: """Receive a chunk, return list of chunks to send out after merging and splitting.""" if not self.rechunk: # We aren't rechunking return [chunk] if self.cache is not None: # We have an old chunk, so we need to concatenate # We do not expect after concatenation that the chunk will be very large because # the self.cache is already after splitting according to the target size chunk = strax.Chunk.concatenate([self.cache, chunk], allow_superrun=self.is_superrun) target_size_b = chunk.target_size_mb * 1e6 # Get the split indices according to the allowed minimum gaps # between data and the target size of chunk split_indices = self.get_splits(chunk.data, target_size_b, DEFAULT_CHUNK_SPLIT_NS) # Split the cache into chunks and return list of chunks chunks = [] for index in np.diff(split_indices): _chunk, chunk = chunk.split( t=chunk.data["time"][index] - int(DEFAULT_CHUNK_SPLIT_NS // 2), allow_early_split=False, ) chunks.append(_chunk) self.cache = chunk return chunks
[docs] def flush(self) -> list: """Flush the cache and return the remaining chunk in a list.""" if self.cache is None: return [] else: result = self.cache self.cache = None return [result]
[docs] @staticmethod def get_splits(data, target_size, min_gap=DEFAULT_CHUNK_SPLIT_NS): """Get indices where to split the data into chunks of approximately target_size.""" assumed_i = int(target_size // data.itemsize) if assumed_i == 0: raise ValueError("Target size is too small.") gap_indices = np.argwhere(strax.diff(data) > min_gap).flatten() + 1 split_indices = [0] argmin = 0 if len(gap_indices) != 0: n = 0 while split_indices[-1] + assumed_i < gap_indices[-1]: if n > len(data): raise ValueError("Trapped in infinite loop!") _argmin = np.abs(gap_indices[argmin + 1 :] - assumed_i - split_indices[-1]).argmin() split_indices.append(gap_indices[argmin + 1 :][_argmin]) argmin += _argmin + 1 n += 1 split_indices = np.array(split_indices) return split_indices