Source code for strax.chunk

import typing as ty

import numpy as np
import numba

import strax

export, __all__ = strax.exporter()
__all__.extend(["default_chunk_size_mb"])


default_chunk_size_mb = 200


@export
class Chunk:
    """Single chunk of strax data of one data type."""

    data_type: str
    data_kind: str
    dtype: np.dtype
    # run_id is not superfluous to track:
    # this could change during the run in superruns (in the future)
    run_id: str
    subruns: dict
    start: int
    end: int
    data: np.ndarray
    target_size_mb: int

    def __init__(
        self,
        *,
        data_type,
        data_kind,
        dtype,
        run_id,
        start,
        end,
        data,
        subruns=None,
        target_size_mb=default_chunk_size_mb,
    ):
        self.data_type = data_type
        self.data_kind = data_kind
        self.dtype = np.dtype(dtype)
        self.run_id = run_id
        self.start = start
        self.end = end
        self.subruns = subruns

        if data is None:
            data = np.empty(0, dtype)
        self.data = data
        self.target_size_mb = target_size_mb

        if not (
            isinstance(self.start, (int, np.integer))
            and isinstance(self.end, (int, np.integer))
        ):
            raise ValueError(f"Attempt to create chunk {self} with non-integer start/end times")
        # Convert to bona fide python integers
        self.start = int(self.start)
        self.end = int(self.end)

        if not isinstance(self.data, np.ndarray):
            raise ValueError(f"Attempt to create chunk {self} with data that isn't a numpy array")

        expected_dtype = strax.remove_titles_from_dtype(self.dtype)
        got_dtype = strax.remove_titles_from_dtype(self.data.dtype)
        if expected_dtype != got_dtype:
            raise ValueError(
                f"Attempt to create chunk {self} with data of {got_dtype}, "
                f"should be {expected_dtype}"
            )

        if self.start < 0:
            raise ValueError(f"Attempt to create chunk {self} with negative start time")
        if self.start > self.end:
            raise ValueError(f"Attempt to create chunk {self} with negative length")

        if len(self.data):
            data_starts_at = self.data[0]["time"]
            # Check the last 500 samples (arbitrary number) as a sanity check
            data_ends_at = strax.endtime(self.data[-500:]).max()

            if data_starts_at < self.start:
                raise ValueError(
                    f"Attempt to create chunk {self} whose data starts early at {data_starts_at}"
                )
            if data_ends_at > self.end:
                raise ValueError(
                    f"Attempt to create chunk {self} whose data ends late at {data_ends_at}"
                )

        # This is commented out for performance, but it's perhaps useful
        # when debugging
        # if len(data) > 1:
        #     if min(np.diff(data['time'])) < 0:
        #         raise ValueError(f"Attempt to create chunk {self} "
        #                          "whose data is not sorted by time.")

    def __len__(self):
        return len(self.data)

    @staticmethod
    def _t_fmt(t):
        return f"{t // int(1e9)} sec {t % int(1e9)} ns"

    def __repr__(self):
        return (
            f"[{self.run_id}.{self.data_type}: "
            f"{self._t_fmt(self.start)} - {self._t_fmt(self.end)}, "
            f"{len(self)} items, "
            + "{0:.1f} MB/s]".format(self._mbs())
        )

    @property
    def nbytes(self):
        return self.data.nbytes

    @property
    def duration(self):
        return self.end - self.start

    @property
    def is_superrun(self):
        return bool(self.subruns) and self.run_id.startswith("_")

    @property
    def first_subrun(self):
        _subrun = None
        if self.is_superrun:
            _subrun = self._get_subrun(0)
        return _subrun

    @property
    def last_subrun(self):
        _subrun = None
        if self.is_superrun:
            _subrun = self._get_subrun(-1)
        return _subrun

    def _get_subrun(self, index):
        """Returns subrun according to position in chunk."""
        subrun_id = list(self.subruns.keys())[index]
        _subrun = {
            "run_id": subrun_id,
            "start": self.subruns[subrun_id]["start"],
            "end": self.subruns[subrun_id]["end"],
        }
        return _subrun

    def _mbs(self):
        if self.duration:
            return (self.nbytes / 1e6) / (self.duration / 1e9)
        else:
            # This is strange. We have a zero-duration chunk. However, this is
            # not the right place to raise an error message. Return -1 for now.
            return -1
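
    # A minimal usage sketch (not part of this module; the field names assume
    # strax.interval_dtype, i.e. "time", "length", "dt", "channel"):
    #
    #     >>> import numpy as np, strax
    #     >>> data = np.zeros(2, dtype=strax.interval_dtype)
    #     >>> data["time"] = [0, 10]
    #     >>> data["length"] = 5
    #     >>> data["dt"] = 1
    #     >>> c = strax.Chunk(
    #     ...     data_type="intervals", data_kind="intervals",
    #     ...     dtype=strax.interval_dtype, run_id="0",
    #     ...     start=0, end=20, data=data)
    #     >>> len(c)
    #     2
    #
    # Construction validates the time range: e.g. passing start=5 above would
    # raise ValueError, since the first data element starts at time 0.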

    def split(self, t: ty.Union[int, None], allow_early_split=False):
        """Return (chunk_left, chunk_right) split at time t.

        :param t: Time at which to split the data. All data in the left chunk
            will have their (exclusive) end <= t, all data in the right chunk
            will have (inclusive) start >= t.
        :param allow_early_split: If False, raise CannotSplit if the
            requirements above cannot be met. If True, split at the closest
            possible time before t.
        """
        t = max(min(t, self.end), self.start)  # type: ignore
        if t == self.end:
            data1, data2 = self.data, self.data[:0]
        elif t == self.start:
            data1, data2 = self.data[:0], self.data
        else:
            data1, data2, t = split_array(
                data=self.data, t=t, allow_early_split=allow_early_split
            )

        common_kwargs = dict(
            run_id=self.run_id,
            dtype=self.dtype,
            data_type=self.data_type,
            data_kind=self.data_kind,
            target_size_mb=self.target_size_mb,
        )

        c1 = strax.Chunk(
            start=self.start,
            end=max(self.start, t),  # type: ignore
            data=data1,
            **common_kwargs,
        )
        c2 = strax.Chunk(
            start=max(self.start, t),  # type: ignore
            end=max(t, self.end),  # type: ignore
            data=data2,
            **common_kwargs,
        )
        return c1, c2
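
    # Hedged sketch of split() on the chunk built in the example above:
    # splitting at t=10 puts the element starting at 10 in the right chunk.
    #
    #     >>> left, right = c.split(t=10)
    #     >>> len(left), len(right)
    #     (1, 1)
    #     >>> left.start, left.end, right.start, right.end
    #     (0, 10, 10, 20)
    #
    # If a data element straddles t, split() raises CannotSplit unless
    # allow_early_split=True, in which case t is moved back to the nearest
    # safe boundary and the adjusted time shows up in the chunk edges.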

    @classmethod
    def merge(cls, chunks, data_type="<UNKNOWN>"):
        """Create chunk by merging columns of chunks of the same data kind.

        :param chunks: Chunks to merge. None is allowed and will be ignored.
        :param data_type: data_type name of the newly created chunk. Set to
            <UNKNOWN> if not provided.
        """
        chunks = [c for c in chunks if c is not None]
        if not chunks:
            raise ValueError("Need at least one chunk to merge")
        if len(chunks) == 1:
            return chunks[0]

        data_kinds = [c.data_kind for c in chunks]
        if len(set(data_kinds)) != 1:
            raise ValueError(
                f"Cannot merge chunks {chunks} of different data kinds: {data_kinds}"
            )
        data_kind = data_kinds[0]

        run_ids = [c.run_id for c in chunks]
        if len(set(run_ids)) != 1:
            raise ValueError(f"Cannot merge chunks of different run_ids: {chunks}")
        run_id = run_ids[0]

        if len(set([len(c) for c in chunks])) != 1:
            raise ValueError(f"Cannot merge chunks with different numbers of items: {chunks}")

        tranges = [(c.start, c.end) for c in chunks]
        if len(set(tranges)) != 1:
            raise ValueError(f"Cannot merge chunks with different time ranges: {tranges}")
        start, end = tranges[0]

        data = strax.merge_arrs(
            [c.data for c in chunks],
            # Make sure dtype field order is consistent, regardless of the
            # order in which chunks are passed to merge:
            dtype=strax.merged_dtype(
                [c.dtype for c in sorted(chunks, key=lambda x: x.data_type)]
            ),
        )

        return cls(
            start=start,
            end=end,
            dtype=data.dtype,
            data_type=data_type,
            data_kind=data_kind,
            run_id=run_id,
            data=data,
            target_size_mb=max([c.target_size_mb for c in chunks]),
        )
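
    # Hedged sketch of merge(): two chunks of the same data kind, item count,
    # and time range but with different columns (the dtypes here are assumed
    # purely for illustration) become one chunk carrying both sets of fields:
    #
    #     >>> c_a = ...  # chunk with fields (time, length, dt, area)
    #     >>> c_b = ...  # chunk with fields (time, length, dt, width)
    #     >>> merged = strax.Chunk.merge([c_a, c_b], data_type="peak_info")
    #
    # merged.data then has the columns of both inputs; the field order follows
    # strax.merged_dtype over the chunks sorted by data_type, so it does not
    # depend on the order in which the chunks were passed.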

    @classmethod
    def concatenate(cls, chunks, allow_hyperrun=False):
        """Create chunk by concatenating chunks of the same data type.

        You can pass None's; they will be ignored.
        """
        chunks = [c for c in chunks if c is not None]
        if not chunks:
            raise ValueError("Need at least one chunk to concatenate")
        if len(chunks) == 1:
            return chunks[0]

        data_types = [c.data_type for c in chunks]
        if len(set(data_types)) != 1:
            raise ValueError(f"Cannot concatenate chunks of different data types: {data_types}")
        data_type = data_types[0]

        run_ids = [c.run_id for c in chunks]
        if len(set(run_ids)) != 1 and not allow_hyperrun:
            raise ValueError(
                f"Cannot concatenate {data_type} chunks with different run ids: {run_ids}"
            )
        run_id = run_ids[0]

        subruns = _update_subruns_in_chunk(chunks)

        prev_end = 0
        for c in chunks:
            if c.start < prev_end:
                raise ValueError(
                    f"Attempt to concatenate overlapping or out-of-order chunks: {chunks}"
                )
            prev_end = c.end

        return cls(
            start=chunks[0].start,
            end=chunks[-1].end,
            dtype=chunks[0].dtype,
            data_type=data_type,
            data_kind=chunks[0].data_kind,
            run_id=run_id,
            subruns=subruns,
            data=np.concatenate([c.data for c in chunks]),
            target_size_mb=max([c.target_size_mb for c in chunks]),
        )
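
# Hedged sketch of concatenate(): chunks of the same data type that tile the
# time axis without overlap can be joined back into one chunk. Reusing the
# split example from above:
#
#     >>> left, right = c.split(t=10)
#     >>> c2 = strax.Chunk.concatenate([left, right])
#     >>> c2.start, c2.end, len(c2)
#     (0, 20, 2)
#
# Overlapping or out-of-order inputs raise ValueError, and run ids must match
# unless allow_hyperrun=True.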


@export
def continuity_check(chunk_iter):
    """Check continuity of chunks yielded by chunk_iter as they are yielded."""
    last_end = None
    last_runid = None

    last_subrun = {"run_id": None}
    for chunk in chunk_iter:
        if chunk.run_id != last_runid:
            last_end = None
            last_subrun = {"run_id": None}

        if chunk.is_superrun:
            _subrun = chunk.first_subrun
            if _subrun["run_id"] != last_subrun["run_id"]:
                last_end = None
            else:
                last_end = last_subrun["end"]

        if last_end is not None:
            if chunk.start != last_end:
                raise ValueError(
                    f"Data is not continuous. Chunk {chunk} should have started at {last_end}"
                )
        yield chunk

        last_end = chunk.end
        last_runid = chunk.run_id
        last_subrun = chunk.last_subrun
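
# Hedged usage sketch: continuity_check is a pass-through generator, so you
# wrap a chunk iterator with it and consume the chunks as usual. With the
# left/right chunks from the split example above (which abut at t=10):
#
#     >>> for chunk in strax.continuity_check([left, right]):
#     ...     pass  # raises ValueError if a gap or overlap is found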


@export
class CannotSplit(Exception):
    pass


@export
@numba.njit(cache=True, nogil=True)
def split_array(data, t, allow_early_split=False):
    """Return (data left of t, data right of t, t), or raise CannotSplit if
    that would split a data element in two.

    :param data: strax numpy data
    :param t: Time to split the data
    :param allow_early_split: Instead of raising CannotSplit, split at the
        closest possible t_split before t where a split can happen. The new
        split time replaces t in the return value.
    """
    # Splitting an empty array is easy
    if not len(data):
        return data[:0], data[:0], t

    # Splitting off a bit of nothing from the start is easy
    # since the data is sorted by time.
    if data[0]["time"] >= t:
        return data[:0], data, t

    # Find:
    #  - i_first_beyond: the first element starting after t
    #  - splittable_i: nearest index left of t where we can safely split BEFORE
    latest_end_seen = -1
    splittable_i = 0
    i_first_beyond = -1
    for i, d in enumerate(data):
        if d["time"] >= latest_end_seen:
            splittable_i = i
        if d["time"] >= t:
            i_first_beyond = i
            break
        latest_end_seen = max(latest_end_seen, strax.endtime(d))
        if latest_end_seen > t:
            # Cannot split anywhere after this
            break
    else:
        if latest_end_seen <= t:
            return data, data[:0], t

    if splittable_i != i_first_beyond or latest_end_seen > t:
        if not allow_early_split:
            # Raise custom exception, make better one outside numba
            raise CannotSplit()
        t = min(data[splittable_i]["time"], t)

    return data[:splittable_i], data[splittable_i:], t
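
# Hedged sketch of split_array semantics, using the interval-style array from
# the Chunk example above (times [0, 10], endtimes [5, 15]):
#
#     >>> lhs, rhs, t = strax.split_array(data, t=10)
#     >>> len(lhs), len(rhs), t
#     (1, 1, 10)
#
# Asking for t=12 would cut through the second element (spanning 10-15): that
# raises CannotSplit, or with allow_early_split=True returns t=10 instead.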


@export
def transform_chunk_to_superrun_chunk(superrun_id, chunk):
    """Transform/create a new superrun chunk from a subrun chunk.

    :param superrun_id: id/name of the superrun.
    :param chunk: strax.Chunk of a superrun subrun.
    :return: strax.Chunk
    """
    if chunk is None:
        return chunk

    subruns = {chunk.run_id: {"start": chunk.start, "end": chunk.end}}

    return Chunk(
        start=chunk.start,
        end=chunk.end,
        dtype=chunk.dtype,
        data_type=chunk.data_type,
        data_kind=chunk.data_kind,
        run_id=superrun_id,
        subruns=subruns,
        data=chunk.data,
        target_size_mb=chunk.target_size_mb,
    )
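
# Hedged sketch: wrapping a subrun chunk into a superrun chunk keeps the data
# and time range, swaps in the superrun id (which starts with "_", cf.
# is_superrun above), and records the subrun's extent:
#
#     >>> sc = strax.transform_chunk_to_superrun_chunk("_superrun_0", c)
#     >>> sc.run_id, sc.subruns
#     ('_superrun_0', {'0': {'start': 0, 'end': 20}})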


def _update_subruns_in_chunk(chunks):
    """Update the list of subruns in a superrun chunk during concatenation,
    also updating their starts/ends."""
    subruns = None
    for c in chunks:
        if not subruns:
            subruns = c.subruns
            continue
        for subrun_id, subrun_start_end in c.subruns.items():
            if subrun_id in subruns:
                subruns[subrun_id] = {
                    "start": min(subruns[subrun_id]["start"], subrun_start_end["start"]),
                    "end": max(subruns[subrun_id]["end"], subrun_start_end["end"]),
                }
            else:
                subruns[subrun_id] = subrun_start_end
    return subruns