from __future__ import annotations
import math
from typing import Any
import warnings
import copy
import h5py
import numpy as np
from .arraydict import ArrayDict
from .typing import ArrayLike
from .interval import Interval
from .irregular_ts import IrregularTimeSeries
# Every one-character code that ``numpy.dtype.kind`` can report.
# From https://numpy.org/doc/2.2/reference/generated/numpy.dtype.kind.html
_NP_DTYPE_KINDS = set("biufcmMOSUV")

# Per-kind fill value used for missing samples when the caller does not pass
# an explicit ``gap_value``. Kinds that are absent here have no default.
_DEFAULT_GAP_VALUE = dict(
    b=False,  # boolean
    i=-1,  # signed integers
    u=0,  # unsigned integers
    f=np.nan,  # floating
)
def _validate_gap_value_dict(gap_value):
    """Check a ``{dtype kind: fill value}`` mapping before it is used.

    Every key must be one of the codes in ``_NP_DTYPE_KINDS``. For the kinds
    ``'b'``, ``'i'``, ``'u'`` and ``'f'`` the associated value must also have
    a matching Python/numpy scalar type; values stored under any other valid
    kind are accepted without further checks.

    Args:
        gap_value: Mapping from ``numpy.dtype.kind`` code to fill value.

    Raises:
        ValueError: If a key is not a known dtype kind, or if the value for
            ``'b'``/``'i'``/``'u'``/``'f'`` has the wrong type (or is negative
            for ``'u'``).
    """
    for kind, fill in gap_value.items():
        if kind not in _NP_DTYPE_KINDS:
            raise ValueError(
                f"gap_value dict has unsupported key {kind!r}; valid keys "
                f"are {sorted(_NP_DTYPE_KINDS)} "
            )

        # ``bool`` subclasses ``int`` in Python, so booleans are classified
        # first and explicitly excluded from the integer category.
        boolean = isinstance(fill, (bool, np.bool_))
        integral = not boolean and isinstance(fill, (int, np.integer))
        floating = isinstance(fill, (float, np.floating))

        if kind == "b":
            if not boolean:
                raise ValueError(f"gap_value['b'] must be a bool, got {fill!r}")
        elif kind == "i":
            if not integral:
                raise ValueError(f"gap_value['i'] must be an integer, got {fill!r}")
        elif kind == "u":
            if not integral:
                raise ValueError(f"gap_value['u'] must be an integer, got {fill!r}")
            if fill < 0:
                raise ValueError(f"gap_value['u'] must be non-negative, got {fill}")
        elif kind == "f":
            # Integers are allowed for float arrays; booleans are not.
            if not (integral or floating):
                raise ValueError(f"gap_value['f'] must be a number, got {fill!r}")
def _validate_gap_value_matches_array_dtype(v, array: np.ndarray, name: str):
    """Validate that ``v`` can be stored in ``array``'s dtype without loss.

    The gap value is cast to ``array.dtype`` and rejected when either

    1. numpy emits a ``RuntimeWarning`` during the cast, or
    2. the cast result differs from the original value (NaN compares equal
       to NaN for this purpose).

    Args:
        v: Candidate gap (fill) value.
        array: Array whose dtype the value must fit into; only ``array.dtype``
            is read.
        name: Name of the array, used in the error message.

    Raises:
        ValueError: If the value cannot be represented exactly in
            ``array.dtype``. When the rejection was triggered by a numpy
            warning, that warning is attached as ``__cause__``.
    """
    src = np.array(v)

    # Numpy sometimes emits a RuntimeWarning when doing a risky cast; promote
    # it to an exception for the duration of the cast so it can be caught.
    with warnings.catch_warnings():
        warnings.simplefilter("error", RuntimeWarning)
        try:
            dst = src.astype(array.dtype)
        except RuntimeWarning as err:
            # Chain explicitly so the traceback shows the numpy warning as the
            # direct cause rather than as an unrelated secondary exception.
            raise ValueError(
                f"gap_value={v} cannot be losslessly stored in {name!r}; "
                f"cannot cast {src.dtype!r} into {array.dtype!r}"
            ) from err

    if not np.array_equal(src, dst, equal_nan=True):
        raise ValueError(
            f"gap_value={v} cannot be losslessly stored in {name!r}; "
            f"numpy would silently cast it from {src.item()!r} to {dst.item()!r}"
        )
# (stray "[docs]" link text left over from the rendered API documentation removed)
class RegularTimeSeries(ArrayDict):
    r"""A regular time series is the same as an irregular time series, but it has a
    regular sampling rate. This allows for faster indexing, possibility of patching data
    and meaningful Fourier operations. The first dimension of all attributes must be
    the time dimension.

    The :attr:`domain` is computed automatically as
    ``[domain_start, domain_start + len(self) / sampling_rate)``.

    .. note::
        If you have a matrix of shape :math:`(N, T)`, where :math:`N` is the number of
        channels and :math:`T` is the number of time points, you should transpose it to
        :math:`(T, N)` before passing it to the constructor, since the first dimension
        should always be time.

    Args:
        sampling_rate: Sampling rate in Hz.
        domain_start: Absolute starting time offset (in seconds) of this signal.
            Defaults to :obj:`0.0`.
        **kwargs: Arbitrary keyword arguments where the values are arbitrary
            multi-dimensional (2d, 3d, ..., nd) arrays with shape (N, \*).

    Raises:
        ValueError: If a ``domain`` keyword other than ``"auto"`` is passed
            (``"auto"`` itself only emits a :obj:`DeprecationWarning`), or if
            ``domain_start`` is not an ``int`` or ``float``.

    See Also:
        :meth:`from_gappy_timeseries` to construct from regular timeseries that has
        gaps or missing values.

    Example ::

        >>> import numpy as np
        >>> from temporaldata import RegularTimeSeries

        >>> lfp = RegularTimeSeries(
        ...     raw=np.zeros((1000, 128)),
        ...     sampling_rate=250.,
        ... )

        >>> lfp.slice(0, 1)
        RegularTimeSeries(
          raw=[250, 128]
        )

        >>> lfp.to_irregular()
        IrregularTimeSeries(
          timestamps=[1000],
          raw=[1000, 128]
        )
    """

    _domain: Interval

    def __init__(
        self,
        *,
        sampling_rate: float,  # in Hz
        domain_start: float = 0.0,
        **kwargs: ArrayLike,
    ):
        if "domain" in kwargs:
            domain = kwargs.pop("domain")
            # Check the type before comparing: ``array_like == "auto"`` is
            # evaluated elementwise and would fail with an unrelated
            # "truth value is ambiguous" error instead of the message below.
            if isinstance(domain, str) and domain == "auto":
                warnings.warn(
                    "The `domain` argument of `RegularTimeSeries` is deprecated "
                    "and will be removed in a future version. The domain is "
                    "always computed automatically as "
                    "[domain_start, domain_start + len(self) / sampling_rate); "
                    'you can drop `domain="auto"` from your call.',
                    DeprecationWarning,
                    stacklevel=2,
                )
            else:
                raise ValueError(
                    "Manually setting the domain of `RegularTimeSeries` to a "
                    "custom `Interval` is no longer supported; the domain is "
                    "always computed automatically as "
                    "[domain_start, domain_start + len(self) / sampling_rate) "
                    "so that its boundaries stay aligned to the sample grid. "
                    "Use `domain_start` to set the start time."
                )

        super().__init__(**kwargs)

        self._sampling_rate = sampling_rate

        if not isinstance(domain_start, (int, float)):
            raise ValueError(
                f"domain_start must be a number, got {type(domain_start)}."
            )

        self._domain = Interval(
            start=domain_start,
            end=domain_start + len(self) / sampling_rate,
        )

    @property
    def sampling_rate(self) -> float:
        r"""Sampling rate in Hz"""
        return self._sampling_rate

    @property
    def timestamps(self) -> np.ndarray:
        r"""Sample timestamps, one per position along the time axis.

        Computed on every access as
        ``domain.start[0] + arange(len(self)) / sampling_rate``, so positions
        that hold gap-fill values also receive a timestamp.
        """
        return (
            self.domain.start[0]
            + np.arange(len(self), dtype=np.float64) / self.sampling_rate
        )

    @property
    def domain(self) -> Interval:
        r"""Domain of this time series"""
        return self._domain

    def index_mask(self) -> np.ndarray:
        r"""Boolean mask marking which samples fall inside :attr:`domain`.

        For a gappy :obj:`RegularTimeSeries` (one whose :attr:`domain` consists
        of more than one interval), some positions along the time axis are
        fill values rather than real observations. This method returns a
        1-D boolean array of length ``len(self)`` where ``True`` marks a real
        sample and ``False`` marks a gap (fill).

        For a contiguous :obj:`RegularTimeSeries` (single-interval domain) the
        result is all ``True``.

        Returns:
            np.ndarray: 1-D boolean array of shape ``(len(self),)``.

        Raises:
            RuntimeError: If the last interval of the domain does not end
                exactly at sample index ``len(self)``.

        Example ::

            >>> import numpy as np
            >>> from temporaldata import RegularTimeSeries

            >>> # Contiguous (non-gappy) series: every sample is real.
            >>> rts = RegularTimeSeries(
            ...     raw=np.arange(4), sampling_rate=100.0,
            ... )
            >>> rts.index_mask()
            array([ True,  True,  True,  True])

            >>> # Gappy series: 0.02s and 0.05s samples are missing.
            >>> ts = [0.0, 0.01, 0.03, 0.04, 0.06]
            >>> raw = [1, 2, 3, 4, 5]
            >>> rts = RegularTimeSeries.from_gappy_timeseries(
            ...     ts, sampling_rate=100.0, raw=raw,
            ... )
            >>> rts.index_mask()
            array([ True,  True, False,  True,  True, False,  True])
            >>> rts.raw  # contains fill values
            array([ 1,  2, -1,  3,  4, -1,  5])
            >>> rts.raw[rts.index_mask()]
            array([1, 2, 3, 4, 5])
        """
        n = len(self)
        domain = self.domain

        if len(domain) == 1:
            return np.full(n, True, dtype=bool)

        sampling_rate = self.sampling_rate
        start_ts, end_ts = domain.start, domain.end

        # Interval boundaries expressed as sample indices relative to the
        # first interval's start.
        start_id = np.round((start_ts - start_ts[0]) * sampling_rate).astype(int)
        end_id = np.round((end_ts - start_ts[0]) * sampling_rate).astype(int)

        if end_id[-1] != n:
            raise RuntimeError(  # pragma: no cover
                f"This should never happen. Debug info:\n"
                f"{n=}\n"
                f"{start_id=}\n"
                f"{end_id=}\n"
            )

        # Mark the start of each True run with +1 and the start of each False
        # run with -1. The marks are accumulated (not assigned) so that an
        # interval ending on the very index where the next one starts, or an
        # empty interval, nets out to zero instead of the -1 overwriting the +1.
        diff = np.zeros(n + 1, dtype=np.int32)
        np.add.at(diff, start_id, 1)
        np.add.at(diff, end_id, -1)

        # The running sum is positive exactly on the positions covered by an
        # interval. Done with vectorised numpy ops to avoid a Python loop.
        return diff.cumsum()[:n] > 0

    def select_by_mask(self, mask: np.ndarray):
        """Raises a NotImplementedError as this method is not supported
        for :obj:`RegularTimeSeries`.

        Raises:
            NotImplementedError: Always, because this method cannot
                be implemented for this class.
        """
        # TODO: Implement once we support "gappy" regular timeseries
        raise NotImplementedError("Not implemented for RegularTimeSeries.")

    def _time_to_idx(
        self,
        time: float,
        eps: float = 1e-9,
    ) -> tuple[int, float]:
        """Converts a timestamp to a sample index and its exact reconstructed time.

        Times at or before the start of the domain map to index ``0``; times
        strictly after the end of the domain map to ``len(self)``. Any other
        time is snapped *up* to the next grid point.

        Args:
            time: The timestamp to convert.
            eps: Tolerance for floating-point precision. If the calculated index
                is within ``eps`` of an integer, it is snapped to that integer.
                This prevents tiny precision errors (e.g., 3.999999999999999) from
                causing off-by-one errors when applying ``math.ceil``.

        Returns:
            tuple[int, float]: A tuple containing:

            * **index**: The calculated integer sample index within the array.
            * **reconstructed_time**: The exact timestamp in seconds that corresponds
              to the selected **index** (i.e. the actual time of the sample).
        """
        domain_start = self.domain.start[0]
        domain_end = self.domain.end[-1]

        # Clamp to domain bounds
        if time <= domain_start:
            return 0, domain_start
        if time > domain_end:
            return len(self), domain_end

        # Calculate relative index
        rel_t = time - domain_start
        idx_float = rel_t * self.sampling_rate

        # Precision check: if it's "close enough" to an integer, treat it as that integer
        rounded = round(idx_float)
        if abs(idx_float - rounded) < eps:
            idx_float = float(rounded)

        # Determine index and reconstruct the actual timestamp of that sample
        idx = math.ceil(idx_float)
        actual_time = domain_start + (idx / self.sampling_rate)

        return idx, actual_time

    def slice(
        self,
        start: float,
        end: float,
        reset_origin: bool = True,
        eps: float = 1e-9,
    ):
        r"""Returns a new :obj:`RegularTimeSeries` object that contains the data between
        the start (inclusive) and end (exclusive) times (i.e., [start, end)).

        :obj:`start` and :obj:`end` are snapped up to the next grid point (the next
        multiple of ``1/sampling_rate``).

        - Gap-filled samples at the start or end of the result are trimmed, so
          returned data always begins and ends on real samples.
        - Gaps in the middle of the window are preserved as-is and remain filled
          with the gap value.
        - Slices that fall fully outside the domain or entirely within a gap
          return empty data.

        Args:
            start: Start time.
            end: End time.
            reset_origin: If :obj:`True`, all time attributes will be updated to be
                relative to the new start time. Defaults to :obj:`True`.
            eps: A tiny 'rounding buffer' to handle floating-point noise when computing indices.
                If your sampling rate is very high, you may need to increase
                this (e.g., to 1e-7) to avoid off-by-one errors.

        Returns:
            RegularTimeSeries: A new instance of the same class
            containing a subset of the data. The new object will have a modified
            :obj:`Interval` domain reflecting the actual sampled boundaries.
            The arrays are copies of the selected rows.
        """
        start_id, out_start = self._time_to_idx(start, eps=eps)
        end_id, out_end = self._time_to_idx(end, eps=eps)

        # Intersect with the (possibly multi-interval) domain
        new_domain = self.domain & Interval(out_start, out_end)

        # Bypass __init__: the domain is assigned directly below.
        out = self.__class__.__new__(self.__class__)
        out._sampling_rate = self.sampling_rate

        # No real samples
        is_empty = len(new_domain) == 0 or new_domain.start[0] == new_domain.end[-1]
        if is_empty:
            out._domain = (
                Interval(start=0.0, end=0.0)
                if reset_origin
                else Interval(start=out_start, end=out_start)
            )
            for key in self.keys():
                out.__dict__[key] = self.__dict__[key][0:0].copy()
            return out

        # Trim leading/trailing gap samples. Internal gaps stay in the array
        # as gap-filled values.
        leading_trim = int(
            round((new_domain.start[0] - out_start) * self.sampling_rate)
        )
        trailing_trim = int(round((out_end - new_domain.end[-1]) * self.sampling_rate))
        start_id += leading_trim
        end_id -= trailing_trim

        if reset_origin:
            # Times become relative to the *requested* start, not the snapped one.
            new_domain.start = new_domain.start - start
            new_domain.end = new_domain.end - start
        out._domain = new_domain

        for key in self.keys():
            out.__dict__[key] = self.__dict__[key][start_id:end_id].copy()

        return out

    def to_irregular(self):
        r"""Converts the :obj:`RegularTimeSeries` object to an :obj:`IrregularTimeSeries` object.

        Gap-fill samples (where :meth:`index_mask` is :obj:`False`) are dropped.

        The returned arrays (timestamps, values, and domain) are independent
        copies; mutating them will not affect this :obj:`RegularTimeSeries`.

        Returns:
            :obj:`IrregularTimeSeries` with timestamps and all attributes copied.

        Example ::

            >>> import numpy as np
            >>> from temporaldata import RegularTimeSeries

            >>> # Contiguous (non-gappy) series: every sample is kept.
            >>> rts = RegularTimeSeries(raw=np.arange(4), sampling_rate=10.0)
            >>> irts = rts.to_irregular()
            >>> irts.timestamps
            array([0. , 0.1, 0.2, 0.3])
            >>> irts.raw
            array([0, 1, 2, 3])

            >>> # Gappy series: gap-fill samples are dropped.
            >>> ts = [0.0, 0.01, 0.03, 0.04, 0.06]
            >>> raw = [1, 2, 3, 4, 5]
            >>> rts = RegularTimeSeries.from_gappy_timeseries(
            ...     ts, sampling_rate=100.0, raw=raw,
            ... )
            >>> rts.raw  # contains fill values
            array([ 1,  2, -1,  3,  4, -1,  5])
            >>> irts = rts.to_irregular()
            >>> irts.timestamps
            array([0.  , 0.01, 0.03, 0.04, 0.06])
            >>> irts.raw
            array([1, 2, 3, 4, 5])
        """
        if not self.is_gappy():
            # Every sample is real, skip the mask.
            return IrregularTimeSeries(
                timestamps=self.timestamps,
                **{k: getattr(self, k).copy() for k in self.keys()},
                domain=copy.deepcopy(self.domain),
            )

        # Boolean-mask indexing already yields new arrays, so no explicit copy.
        mask = self.index_mask()
        return IrregularTimeSeries(
            timestamps=self.timestamps[mask],
            **{k: getattr(self, k)[mask] for k in self.keys()},
            domain=copy.deepcopy(self.domain),
        )

    def to_hdf5(self, file):
        r"""Saves the data object to an HDF5 file.

        Each attribute is written as a dataset named after its key, the domain
        is written to a ``"domain"`` group, and the class name and sampling
        rate are stored as file attributes.

        Args:
            file (h5py.File): HDF5 file.

        .. code-block:: python

            import h5py
            from temporaldata import RegularTimeSeries

            data = RegularTimeSeries(
                raw=np.zeros((1000, 128)),
                sampling_rate=250.,
            )

            with h5py.File("data.h5", "w") as f:
                data.to_hdf5(f)
        """
        for key in self.keys():
            value = getattr(self, key)
            file.create_dataset(key, data=value)

        # domain is of type Interval
        grp = file.create_group("domain")
        self._domain.to_hdf5(grp)

        file.attrs["object"] = self.__class__.__name__
        file.attrs["sampling_rate"] = self.sampling_rate

    @classmethod
    def from_hdf5(cls, file):
        r"""Loads the data object from an HDF5 file.

        Args:
            file (h5py.File): HDF5 file.

        Raises:
            AssertionError: If the ``"object"`` attribute stored in the file
                does not match the name of this class.

        .. note::
            This method will load all data in memory, if you would like to use lazy
            loading, call :meth:`LazyRegularTimeSeries.from_hdf5` instead.

        .. code-block:: python

            import h5py
            from temporaldata import RegularTimeSeries

            with h5py.File("data.h5", "r") as f:
                data = RegularTimeSeries.from_hdf5(f)
        """
        # Raised explicitly (rather than via ``assert``) so the check is not
        # stripped when Python runs with ``-O``; the exception type is unchanged.
        if file.attrs["object"] != cls.__name__:
            raise AssertionError("object type mismatch")

        data = {}
        for key, value in file.items():
            if key != "domain":
                data[key] = value[:]

        domain = Interval.from_hdf5(file["domain"])
        obj = cls(
            **data,
            sampling_rate=file.attrs["sampling_rate"],
            domain_start=float(domain.start[0]),
        )
        # Replace the auto-computed single-interval domain with the stored one,
        # which may contain several intervals.
        obj._domain = domain

        return obj

    @classmethod
    def from_gappy_timeseries(
        cls,
        timestamps: ArrayLike,
        sampling_rate: float,
        gap_value: Any | dict[str, Any] | None = None,
        rtol: float = 1e-3,
        **kwargs: ArrayLike,
    ) -> RegularTimeSeries:
        r"""Regularize an approximately-regular but gappy timeseries.

        Construct a :obj:`RegularTimeSeries` from approximately-regular but
        gappy timestamps and value arrays by snapping each sample to a regular
        grid at :obj:`sampling_rate` and filling missing samples with
        :obj:`gap_value`.

        Useful for signals that are nominally regular (e.g. behavioral streams
        at a fixed sampling rate) but contain missing samples, which would
        otherwise have to be carried as an :obj:`IrregularTimeSeries` and would
        suffer numerical-precision issues during slicing.

        Args:
            timestamps: 1-D array-like of timestamps, strictly increasing, with
                at least two entries. Each entry must lie within :obj:`rtol`
                samples of a regular grid at :obj:`sampling_rate`, anchored at
                :obj:`timestamps[0]`.
            sampling_rate: Sampling rate in Hz.
            gap_value: Value used to fill missing samples. May be:

                * :obj:`None` (default) — uses per-kind defaults: ``-1`` for
                  signed integers, ``0`` for unsigned integers,
                  :obj:`numpy.nan` for floats, ``False`` for bools.
                * A scalar (``int``, ``float``, or ``bool``) — used for every
                  kwarg array regardless of dtype.
                * A ``dict`` mapping :obj:`numpy.dtype.kind` codes to fill
                  values. Recognized kinds: ``'b'`` (bool), ``'i'`` (signed
                  int), ``'u'`` (unsigned int), ``'f'`` (float). Example:
                  ``{'i': -1, 'u': 0, 'f': np.nan}``. Raises :obj:`KeyError`
                  if a kwarg's dtype kind is not in the dict.
            rtol: Maximum allowed deviation, in samples, of any input timestamp
                from the regular grid.
            **kwargs: Named array-like values whose first dimension equals
                ``len(timestamps)``.

        Returns:
            RegularTimeSeries: A regular time series with the same named
            arrays, gaps filled with :obj:`gap_value`.

        Raises:
            ValueError: If ``timestamps`` is not 1-D, has fewer than two
                entries, or is not strictly increasing; if timestamps deviate
                from the regular grid by more than :obj:`rtol`; if two
                timestamps land on the same grid point; if no two consecutive
                timestamps are exactly one grid step apart (the sampling rate
                appears too high); if a kwarg's length differs from
                ``len(timestamps)``; or if the gap value is invalid or cannot
                be stored losslessly in a kwarg's dtype.
            KeyError: If a kwarg's dtype kind has no entry in the gap-value
                dict (including the default dict used when ``gap_value`` is
                :obj:`None`).

        See Also:
            * :meth:`is_gappy` to check whether a series has gaps.
            * :meth:`index_mask` for a boolean mask of real vs. gap-fill samples.

        Example ::

            >>> import numpy as np
            >>> from temporaldata import RegularTimeSeries

            >>> # 4 samples at 100 Hz, the 0.02s sample is missing.
            >>> ts = np.array([0.0, 0.01, 0.03, 0.04])
            >>> raw = np.array([1.0, 2.0, 3.0, 4.0])
            >>> rts = RegularTimeSeries.from_gappy_timeseries(
            ...     ts, sampling_rate=100.0, raw=raw,
            ... )
            >>> rts.raw
            array([ 1.,  2., nan,  3.,  4.])
            >>> rts.domain.start
            array([0.  , 0.03])
            >>> rts.domain.end
            array([0.02, 0.05])
            >>> rts.index_mask()  # indicates valid and filled-in timestamps
            array([ True,  True, False,  True,  True])
        """
        timestamps = np.asarray(timestamps)
        if timestamps.ndim != 1:
            raise ValueError(f"timestamps must be 1-D, got shape {timestamps.shape}")
        if len(timestamps) < 2:
            raise ValueError(
                f"timestamps must have at least 2 entries, got {len(timestamps)}"
            )
        if not (np.diff(timestamps) > 0).all():
            raise ValueError("timestamps must be strictly increasing")

        if gap_value is None:
            gap_value = _DEFAULT_GAP_VALUE

        if isinstance(gap_value, dict):
            _validate_gap_value_dict(gap_value)

        # Position of every timestamp on the grid anchored at the first one,
        # both as a float and snapped to the nearest integer grid point.
        start_time = float(timestamps[0])
        rel_idx = (timestamps - start_time) * sampling_rate
        grid_idx = np.round(rel_idx).astype(np.int64)

        max_dev = float(np.max(np.abs(rel_idx - grid_idx)))
        if max_dev > rtol:
            raise ValueError(
                f"timestamps deviate from a regular grid at sampling_rate="
                f"{sampling_rate} Hz by up to {max_dev:.3g} samples, "
                f"exceeding rtol={rtol}. Pick a different sampling_rate, "
                f"increase rtol, or use IrregularTimeSeries if this signal "
                f"is inherently irregular."
            )

        idx_diffs = np.diff(grid_idx)
        min_idx_gap = int(idx_diffs.min())
        if min_idx_gap < 1:
            raise ValueError(
                f"timestamps contain duplicate or sub-sample-spaced entries "
                f"at sampling_rate={sampling_rate} Hz"
            )
        if min_idx_gap > 1:
            # Not a single pair of neighbours is one step apart, so the grid
            # is finer than the data.
            raise ValueError(
                f"sampling_rate={sampling_rate} appears too high: the smallest "
                f"gap between consecutive timestamps is {min_idx_gap} grid "
                f"steps (expected 1). The true sampling rate may be closer to "
                f"{sampling_rate / min_idx_gap}."
            )

        num_timesteps = int(grid_idx[-1]) + 1

        # Build a multi-interval domain that excludes gaps: a run of
        # consecutive grid points ends wherever the next index jumps by > 1.
        gap_after = idx_diffs > 1
        is_run_start = np.concatenate([[True], gap_after])
        is_run_end = np.concatenate([gap_after, [True]])
        domain = Interval(
            start=start_time + grid_idx[is_run_start] / sampling_rate,
            end=start_time + (grid_idx[is_run_end] + 1) / sampling_rate,
        )

        filled: dict[str, np.ndarray] = {}
        for key, arr in kwargs.items():
            arr = np.asarray(arr)
            if len(arr) != len(timestamps):
                raise ValueError(
                    f"{key!r} has length {len(arr)}, expected "
                    f"{len(timestamps)} to match timestamps"
                )

            if isinstance(gap_value, dict):
                kind = arr.dtype.kind
                if kind not in gap_value:
                    raise KeyError(
                        f"{key!r} has dtype {arr.dtype} (kind {kind!r}) which is "
                        f"not in gap_value dict (keys: {list(gap_value)})"
                    )
                _gap_value = gap_value[kind]
            else:
                _gap_value = gap_value

            _validate_gap_value_matches_array_dtype(_gap_value, array=arr, name=key)

            # Start from an all-gap array and scatter the real samples onto
            # their grid positions.
            out = np.full((num_timesteps, *arr.shape[1:]), _gap_value, dtype=arr.dtype)
            out[grid_idx] = arr
            filled[key] = out

        obj = cls(sampling_rate=sampling_rate, domain_start=start_time, **filled)
        obj._domain = domain  # replace single-interval auto domain with gappy one
        return obj

    def is_gappy(self) -> bool:
        r"""Returns :obj:`True` if this :obj:`RegularTimeSeries` has gaps.

        A series is *gappy* when its :attr:`domain` is made up of more than one
        interval; positions inside the gaps are filled with the configured
        gap value (see :meth:`from_gappy_timeseries`). A contiguous series
        (single-interval domain) returns :obj:`False`.

        Returns:
            bool: :obj:`True` if the domain has more than one interval.

        See Also:
            :meth:`index_mask` for a boolean mask of real vs. gap-fill samples.

        Example ::

            >>> import numpy as np
            >>> from temporaldata import RegularTimeSeries

            >>> rts = RegularTimeSeries(raw=np.arange(4), sampling_rate=100.0)
            >>> rts.is_gappy()
            False

            >>> rts = RegularTimeSeries.from_gappy_timeseries(
            ...     [0.0, 0.01, 0.03], sampling_rate=100.0, raw=[1, 2, 3],
            ... )
            >>> rts.is_gappy()
            True
        """
        return len(self.domain) > 1
# (stray "[docs]" link text left over from the rendered API documentation removed)
class LazyRegularTimeSeries(RegularTimeSeries):
    r"""Lazy variant of :obj:`RegularTimeSeries`. The data is not loaded until it is
    accessed. This class is meant to be used when the data is too large to fit in
    memory, and is intended to be instantiated via
    :obj:`LazyRegularTimeSeries.from_hdf5`.

    Once every attribute has been loaded, the instance converts itself into a
    plain :obj:`RegularTimeSeries`.

    .. note:: To access an attribute without triggering the in-memory loading use
        self.__dict__[key] otherwise using self.key or getattr(self, key) will trigger
        the lazy loading and will automatically convert the h5py dataset to a numpy
        array as well as apply any outstanding masks.
    """

    # Pending operations to apply when a dataset is finally read. The only key
    # used in this class is "slice", holding a ``(start_index, end_index)`` pair.
    _lazy_ops: dict

    def __init__(self, **kwargs):
        raise NotImplementedError(
            f"{self.__class__.__name__} cannot be constructed directly; use from_hdf5."
        )

    def _maybe_first_dim(self):
        """Return the length of the time axis without loading any dataset.

        Returns:
            ``None`` if there are no attributes; otherwise the first dimension
            of an already-loaded array if one exists, else the length implied
            by the domain and sampling rate when a slice is pending, else the
            first dimension of the first (unloaded) h5py dataset.
        """
        if len(self.keys()) == 0:
            return None
        else:
            # todo check _lazy_ops
            for key in self.keys():
                value = self.__dict__[key]
                if isinstance(value, np.ndarray):
                    return value.shape[0]

            if "slice" in self._lazy_ops:
                # TODO add more constraints to the domain in RegularTimeSeries
                # TODO it is always better to resolve another attribute before timestamps
                # this is because we are dealing with numerical noise
                # we know the domain and the sampling rate, we can infer the number of pts
                domain_length = self.domain.end[-1] - self.domain.start[0]
                return int(np.round(domain_length * self.sampling_rate))

            # otherwise nothing was loaded, return the first dim of the h5py dataset
            return self.__dict__[self.keys()[0]].shape[0]

    def __getattribute__(self, name):
        # "__dict__" and "keys" are used below, so they must bypass the
        # interception to avoid infinite recursion.
        if name not in ("__dict__", "keys"):
            # intercept attribute calls
            if name in self.keys():
                out = self.__dict__[name]

                if isinstance(out, h5py.Dataset):
                    # convert into numpy array
                    if "slice" in self._lazy_ops:
                        idx_l, idx_r = self._lazy_ops["slice"]
                        out = out[idx_l:idx_r]
                    else:
                        out = out[:]

                    # store it
                    self.__dict__[name] = out

                # If all attributes are loaded, we can remove the lazy flag
                all_loaded = all(
                    isinstance(self.__dict__[key], np.ndarray) for key in self.keys()
                )
                if all_loaded:
                    self.__class__ = RegularTimeSeries
                    del self._lazy_ops

                return out

        return super(LazyRegularTimeSeries, self).__getattribute__(name)

    def slice(
        self,
        start: float,
        end: float,
        reset_origin: bool = True,
        eps: float = 1e-9,
    ):
        r"""Returns a new :obj:`RegularTimeSeries` object that contains the data between
        the start (inclusive) and end (exclusive) times (i.e., [start, end)).

        :obj:`start` and :obj:`end` are snapped up to the next grid point (the next
        multiple of ``1/sampling_rate``).

        - Gap-filled samples at the start or end of the result are trimmed, so
          returned data always begins and ends on real samples.
        - Gaps in the middle of the window are preserved as-is and remain filled
          with the gap value.
        - Slices that fall fully outside the domain or entirely within a gap
          return empty data.

        Datasets that have not been loaded yet are not read here; the index
        range is recorded and applied when the attribute is first accessed.

        Args:
            start: Start time.
            end: End time.
            reset_origin: If :obj:`True`, all time attributes will be updated to be
                relative to the new start time. Defaults to :obj:`True`.
            eps: A tiny 'rounding buffer' to handle floating-point noise when computing indices.
                If your sampling rate is very high, you may need to increase
                this (e.g., to 1e-7) to avoid off-by-one errors.

        Returns:
            LazyRegularTimeSeries: A new instance of the same class
            containing a subset of the data. The new object will have a modified
            :obj:`Interval` domain reflecting the actual sampled boundaries.
            When the slice is empty, an eager :obj:`RegularTimeSeries` holding
            zero-length arrays is returned instead.
        """
        start_id, out_start = self._time_to_idx(start, eps=eps)
        end_id, out_end = self._time_to_idx(end, eps=eps)

        # Intersect with the (possibly multi-interval) domain
        new_domain = self.domain & Interval(out_start, out_end)

        is_empty = len(new_domain) == 0 or new_domain.start[0] == new_domain.end[-1]
        if is_empty:
            # No data to defer-load; return an eager RegularTimeSeries.
            out = RegularTimeSeries.__new__(RegularTimeSeries)
            out._sampling_rate = self.sampling_rate
            out._domain = (
                Interval(start=0.0, end=0.0)
                if reset_origin
                else Interval(start=out_start, end=out_start)
            )
            for key in self.keys():
                # Indexing an h5py dataset already yields a fresh array; the
                # copy makes already-loaded numpy arrays behave the same way,
                # so the empty result never keeps a view into the parent data.
                out.__dict__[key] = self.__dict__[key][0:0].copy()
            return out

        out = self.__class__.__new__(self.__class__)
        out._sampling_rate = self.sampling_rate
        out._lazy_ops = {}

        # Indices computed above are relative to this (possibly already
        # sliced) view; the offset converts them to on-disk dataset indices.
        parent_offset = self._lazy_ops["slice"][0] if "slice" in self._lazy_ops else 0

        # Trim leading/trailing gap samples
        leading_trim = int(
            round((new_domain.start[0] - out_start) * self.sampling_rate)
        )
        trailing_trim = int(round((out_end - new_domain.end[-1]) * self.sampling_rate))
        start_id += leading_trim
        end_id -= trailing_trim

        if reset_origin:
            new_domain.start = new_domain.start - start
            new_domain.end = new_domain.end - start
        out._domain = new_domain

        for key in self.keys():
            if isinstance(self.__dict__[key], h5py.Dataset):
                # Still on disk: share the dataset handle, defer the read.
                out.__dict__[key] = self.__dict__[key]
            else:
                out.__dict__[key] = self.__dict__[key][start_id:end_id].copy()

        out._lazy_ops["slice"] = (
            parent_offset + start_id,
            parent_offset + end_id,
        )

        return out

    def to_hdf5(self, file):
        """Not supported for lazy objects.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("Cannot save a lazy array dict to hdf5.")

    @classmethod
    def from_gappy_timeseries(cls, *_args, **_kwargs):
        r"""Not implemented for :obj:`LazyRegularTimeSeries`.

        Use :meth:`RegularTimeSeries.from_gappy_timeseries` instead.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError(
            "from_gappy_timeseries is not available on LazyRegularTimeSeries; "
            "use RegularTimeSeries.from_gappy_timeseries instead."
        )

    @classmethod
    def from_hdf5(cls, file):
        r"""Loads the data object lazily from an HDF5 file.

        Only the domain is read immediately. Every other entry is kept as an
        h5py dataset handle and read on first attribute access, so the file
        needs to remain open until the attributes of interest were accessed.

        Args:
            file (h5py.File): HDF5 file.

        Raises:
            AssertionError: If the ``"object"`` attribute stored in the file
                is not ``"RegularTimeSeries"``.

        .. code-block:: python

            import h5py
            from temporaldata import LazyRegularTimeSeries

            with h5py.File("data.h5", "r") as f:
                data = LazyRegularTimeSeries.from_hdf5(f)
        """
        # Files are written by RegularTimeSeries.to_hdf5, hence the comparison
        # against that class name. Raised explicitly (rather than via
        # ``assert``) so the check is not stripped when Python runs with ``-O``.
        if file.attrs["object"] != RegularTimeSeries.__name__:
            raise AssertionError("object type mismatch")

        obj = cls.__new__(cls)
        for key, value in file.items():
            if key == "domain":
                obj.__dict__["_domain"] = Interval.from_hdf5(file[key])
            else:
                obj.__dict__[key] = value

        obj._lazy_ops = {}
        obj._sampling_rate = file.attrs["sampling_rate"]

        return obj