from __future__ import annotations
import copy
from typing import Dict, List
import logging
import h5py
import numpy as np
import pandas as pd
from .utils import _size_repr
[docs]
class ArrayDict(object):
    r"""A dictionary of arrays that share the same first dimension. The number of
    dimensions for each array can be different, but they need to be at least
    1-dimensional.

    Public attributes (names that do not start with an underscore) are the arrays;
    they are validated on assignment. Private attributes (leading underscore) are
    stored without any validation.

    Args:
        **kwargs: arrays that share the same first dimension.

    Example ::

        >>> from temporaldata import ArrayDict
        >>> import numpy as np

        >>> units = ArrayDict(
        ...     unit_id=np.array(["unit01", "unit02"]),
        ...     brain_region=np.array(["M1", "M1"]),
        ...     waveform_mean=np.random.rand(2, 48),
        ... )

        >>> units
        ArrayDict(
          unit_id=[2],
          brain_region=[2],
          waveform_mean=[2, 48]
        )
    """

    def __init__(self, **kwargs: np.ndarray):
        # go through __setattr__ so that every array is validated
        for key, value in kwargs.items():
            self.__setattr__(key, value)

    def keys(self) -> List[str]:
        r"""Returns a list of all array attribute names."""
        # private attributes (leading underscore) are not considered arrays
        return list(filter(lambda x: not x.startswith("_"), self.__dict__))

    def _maybe_first_dim(self):
        # If self has at least one attribute, returns the first dimension of
        # the first attribute. Otherwise, returns :obj:`None`.
        keys = self.keys()
        if len(keys) == 0:
            return None
        else:
            return self.__dict__[keys[0]].shape[0]

    def __len__(self):
        r"""Returns the first dimension shared by all attributes.

        Raises:
            ValueError: if the object holds no array attribute.
        """
        first_dim = self._maybe_first_dim()
        if first_dim is None:
            raise ValueError(f"{self.__class__.__name__} is empty.")
        return first_dim

    def __setattr__(self, name, value):
        # for non-private attributes, we want to check that they are ndarrays
        # and that they match the first dimension of existing attributes
        if not name.startswith("_"):
            # only ndarrays are accepted
            # NOTE(review): this check is an assert, so it is skipped when Python
            # runs with -O; kept as is so that the raised exception type does not
            # change for existing callers.
            assert isinstance(
                value, np.ndarray
            ), f"{name} must be a numpy array, got object of type {type(value)}"

            if value.ndim == 0:
                raise ValueError(
                    f"{name} must be at least 1-dimensional, got 0-dimensional array."
                )

            first_dim = self._maybe_first_dim()
            if first_dim is not None and value.shape[0] != first_dim:
                raise ValueError(
                    f"All elements of {self.__class__.__name__} must have the same "
                    f"first dimension. The first dimension of {name} is "
                    f"{value.shape[0]} but the first dimension of existing attributes "
                    f"is {first_dim}."
                )
        super(ArrayDict, self).__setattr__(name, value)

    def __contains__(self, key: str) -> bool:
        r"""Returns :obj:`True` if the attribute :obj:`key` is present in the data."""
        return key in self.keys()

    def __repr__(self) -> str:
        cls = self.__class__.__name__
        # one entry per array attribute, formatted by the _size_repr helper
        info = [_size_repr(k, self.__dict__[k], indent=2) for k in self.keys()]
        info = ",\n".join(info)
        return f"{cls}(\n{info}\n)"

    def select_by_mask(self, mask: np.ndarray, **kwargs):
        r"""Return a new :obj:`ArrayDict` object where all array attributes are indexed
        using the boolean mask.

        Args:
            mask: Boolean array used for masking. The mask needs to be 1-dimensional,
                and of equal length as the first dimension of the :obj:`ArrayDict`.
            **kwargs: Private attributes that will not be masked will need to be passed
                as arguments.

        Raises:
            ValueError: if the length of the mask does not match the first dimension
                of the arrays (this includes calling it on an empty object).

        Example ::

            >>> from temporaldata import ArrayDict
            >>> import numpy as np

            >>> units = ArrayDict(
            ...     unit_id=np.array(["unit01", "unit02"]),
            ...     brain_region=np.array(["M1", "M1"]),
            ...     waveform_mean=np.random.rand(2, 48),
            ... )

            >>> units_subset = units.select_by_mask(np.array([True, False]))
            >>> units_subset
            ArrayDict(
              unit_id=[1],
              brain_region=[1],
              waveform_mean=[1, 48]
            )
        """
        assert mask.ndim == 1, f"mask must be 1D, got {mask.ndim}D mask"
        assert mask.dtype == bool, f"mask must be boolean, got {mask.dtype}"

        first_dim = self._maybe_first_dim()
        if mask.shape[0] != first_dim:
            raise ValueError(
                f"mask length {mask.shape[0]} does not match first dimension of arrays "
                f"({first_dim})."
            )

        # kwargs are other private attributes
        # TODO automatically add private attributes
        return self.__class__(
            **{k: getattr(self, k)[mask].copy() for k in self.keys()}, **kwargs
        )

    @classmethod
    def from_dataframe(cls, df, unsigned_to_long=True, **kwargs):
        r"""Creates an :obj:`ArrayDict` object from a pandas DataFrame.

        The columns in the DataFrame are converted to arrays when possible, otherwise
        they will be skipped (a warning is logged for every skipped column).

        Args:
            df (pandas.DataFrame): DataFrame.
            unsigned_to_long (bool, optional): If :obj:`True`, automatically converts
                unsigned integers to int64. Defaults to :obj:`True`.
            **kwargs: Additional attributes forwarded to the constructor alongside
                the converted columns.

        Raises:
            ValueError: if a column is named like an attribute that already exists
                on the class or on one of its base classes.
        """
        data = {**kwargs}
        # With zero rows there is no element to inspect, so the content type of a
        # non-numeric column cannot be determined.
        has_rows = len(df) > 0

        for column in df.columns:
            # Look through the whole MRO and not only cls.__dict__, otherwise a
            # subclass would not detect a clash with an inherited member (a column
            # named "keys" would shadow the keys() method on the instance).
            if any(column in klass.__dict__ for klass in cls.__mro__):
                # We don't let users override existing attributes with this method,
                # since that is most likely a mistake.
                # Example: A dataframe might contain a 'split' attribute signifying
                # train/val/test splits.
                raise ValueError(
                    f"Attribute '{column}' already exists. Cannot override this "
                    f"attribute with the from_dataframe method. Please rename the "
                    f"attribute in the dataframe. If you really meant to override "
                    f"this attribute, please do so manually after the object is "
                    f"created."
                )

            series = df[column]

            if pd.api.types.is_numeric_dtype(series):
                # Directly convert numeric columns to numpy arrays
                np_arr = series.to_numpy()
                # Convert unsigned integers to long
                if np.issubdtype(np_arr.dtype, np.unsignedinteger) and unsigned_to_long:
                    np_arr = np_arr.astype(np.int64)
                data[column] = np_arr
            elif not has_rows:
                # Empty non-numeric column: the checks below would be vacuously true
                # and then fail on iloc[0], so skip the column instead.
                logging.warning(
                    f"Unable to convert column '{column}' to a numpy array. Skipping."
                )
            elif series.apply(lambda x: isinstance(x, np.ndarray)).all():
                # Check if all ndarrays in the column have the same shape
                first_shape = series.iloc[0].shape
                if all(arr.shape == first_shape for arr in series):
                    # If all elements in the column are ndarrays with the same shape,
                    # stack them
                    np_arr = np.stack(series.values)
                    if (
                        np.issubdtype(np_arr.dtype, np.unsignedinteger)
                        and unsigned_to_long
                    ):
                        np_arr = np_arr.astype(np.int64)
                    data[column] = np_arr
                else:
                    logging.warning(
                        f"The ndarrays in column '{column}' do not all have the same shape."
                    )
            elif isinstance(series.iloc[0], str):
                # only the first element is inspected to detect a string column
                try:  # try to see if unicode strings can be converted to fixed length ASCII bytes
                    series.to_numpy(dtype="S")
                except UnicodeEncodeError:
                    logging.warning(
                        f"Unable to convert column '{column}' to a numpy array. Skipping."
                    )
                else:
                    # the ASCII conversion is only a check, the original values are kept
                    data[column] = series.to_numpy()
            else:
                logging.warning(
                    f"Unable to convert column '{column}' to a numpy array. Skipping."
                )

        return cls(**data)

    def to_hdf5(self, file):
        r"""Saves the data object to an HDF5 file.

        Args:
            file (h5py.File): HDF5 file.

        Raises:
            NotImplementedError: if a unicode array cannot be converted to
                fixed-length ASCII bytes.

        .. code-block:: python

            import h5py
            import numpy as np
            from temporaldata import ArrayDict

            data = ArrayDict(
                unit_id=np.array(["unit01", "unit02"]),
                brain_region=np.array(["M1", "M1"]),
                waveform_mean=np.zeros((2, 48)),
            )

            with h5py.File("data.h5", "w") as f:
                data.to_hdf5(f)
        """
        # save class name
        file.attrs["object"] = self.__class__.__name__

        # save attributes
        _unicode_keys = []
        for key in self.keys():
            value = getattr(self, key)

            if value.dtype.kind == "U":  # if its a unicode string type
                try:
                    # convert string arrays to fixed length ASCII bytes
                    value = value.astype("S")
                except UnicodeEncodeError as err:
                    raise NotImplementedError(
                        f"Unable to convert column '{key}' from numpy 'U' string type "
                        "to fixed-length ASCII (np.dtype('S')). HDF5 does not support "
                        "numpy 'U' strings."
                    ) from err
                # keep track of the keys of the arrays that were originally unicode
                _unicode_keys.append(key)

            file.create_dataset(key, data=value)

        # save a list of the keys of the arrays that were originally unicode to
        # convert them back to unicode when loading
        file.attrs["_unicode_keys"] = np.array(_unicode_keys, dtype="S")

    @classmethod
    def from_hdf5(cls, file):
        r"""Loads the data object from an HDF5 file.

        Args:
            file (h5py.File): HDF5 file.

        Raises:
            ValueError: if the class name stored in the file is not the name of
                this class.

        .. note::
            This method will load all data in memory, if you would like to use lazy
            loading, call :meth:`LazyArrayDict.from_hdf5` instead.

        .. code-block:: python

            import h5py
            from temporaldata import ArrayDict

            with h5py.File("data.h5", "r") as f:
                data = ArrayDict.from_hdf5(f)
        """
        if file.attrs["object"] != cls.__name__:
            raise ValueError(
                f"File contains data for a {file.attrs['object']} object, expected "
                f"{cls.__name__} object."
            )

        _unicode_keys = file.attrs["_unicode_keys"].astype(str).tolist()

        data = {}
        for key, value in file.items():
            # value[:] reads the whole dataset
            data[key] = value[:]
            # if the values were originally unicode but stored as fixed length ASCII bytes
            if key in _unicode_keys:
                data[key] = data[key].astype("U")

        obj = cls(**data)
        return obj

    def __copy__(self):
        # create a shallow copy of the object: the new object gets its own
        # __dict__ but shares the arrays with the original
        cls = self.__class__
        result = cls.__new__(cls)
        result.__dict__.update(self.__dict__)
        return result

    def __deepcopy__(self, memo):
        # create a deep copy of the object
        cls = self.__class__
        result = cls.__new__(cls)
        memo[id(self)] = result
        for k, v in self.__dict__.items():
            if isinstance(v, h5py.Dataset):
                # h5py objects cannot be deepcopied, the dataset handle is shared
                result.__dict__[k] = v
            else:
                result.__dict__[k] = copy.deepcopy(v, memo)
        return result

    def materialize(self) -> "ArrayDict":
        r"""Materializes the data object, i.e., loads into memory all of the data that
        is still referenced in the HDF5 file.

        Returns:
            The object itself.
        """
        for key in self.keys():
            # simply access all attributes to trigger the lazy loading
            getattr(self, key)

        return self
[docs]
class LazyArrayDict(ArrayDict):
    r"""Lazy variant of :obj:`ArrayDict`. The data is not loaded until it is accessed.
    This class is meant to be used when the data is too large to fit in memory, and
    is intended to be instantiated via :obj:`LazyArrayDict.from_hdf5`.

    .. note:: To access an attribute without triggering the in-memory loading use
        self.__dict__[key] otherwise using self.key or getattr(self, key) will trigger
        the lazy loading and will automatically convert the h5py dataset to a numpy
        array as well as apply any outstanding masks.
    """

    # Class-level defaults. from_hdf5 and select_by_mask always give every
    # instance its own _lazy_ops / _unicode_keys.
    _lazy_ops = dict()
    _unicode_keys = []

    def _maybe_first_dim(self):
        # Returns the first dimension without loading any data, or None when
        # there is no array attribute.
        if len(self.keys()) == 0:
            return None
        else:
            for key in self.keys():
                value = self.__dict__[key]
                # check if an array is already loaded, return its first dimension
                if isinstance(value, np.ndarray):
                    return value.shape[0]

            # no array was loaded, check if there is a mask in _lazy_ops
            if "mask" in self._lazy_ops:
                return self._lazy_ops["mask"].sum()

            # otherwise nothing was loaded, return the first dim of the h5py dataset
            return self.__dict__[self.keys()[0]].shape[0]

    def __getattribute__(self, name):
        # "__dict__" and "keys" are needed by the interception logic itself, they
        # must go straight to the default lookup to avoid infinite recursion
        if name not in ["__dict__", "keys"]:
            # intercept attribute calls. this is where data that is not loaded is loaded
            # and when any lazy operations are applied
            if name in self.keys():
                out = self.__dict__[name]

                if isinstance(out, h5py.Dataset):
                    # apply any mask, and return the numpy array
                    if "mask" in self._lazy_ops:
                        out = out[self._lazy_ops["mask"]]
                    else:
                        out = out[:]

                    # if the array was originally unicode, convert it back to unicode
                    if name in self._unicode_keys:
                        out = out.astype("U")

                    # store it, now the array is loaded
                    self.__dict__[name] = out

                # if all attributes are loaded, we can remove the lazy flag
                all_loaded = all(
                    isinstance(self.__dict__[key], np.ndarray) for key in self.keys()
                )
                if all_loaded:
                    self.__class__ = ArrayDict
                    # delete special private attributes; pop() instead of del so
                    # that an instance that only relied on the class-level
                    # defaults does not raise AttributeError here
                    self.__dict__.pop("_lazy_ops", None)
                    self.__dict__.pop("_unicode_keys", None)

                return out

        return super(LazyArrayDict, self).__getattribute__(name)

    def select_by_mask(self, mask: np.ndarray):
        r"""Return a new :obj:`LazyArrayDict` restricted to the entries selected by
        the boolean mask.

        Arrays that are already loaded in memory are masked right away. For arrays
        that are still h5py datasets, the mask is only recorded and is applied when
        the attribute is first accessed.

        Args:
            mask: Boolean array used for masking. The mask needs to be 1-dimensional,
                and of equal length as the first dimension of the object.

        Raises:
            ValueError: if the length of the mask does not match the first dimension.
        """
        assert mask.ndim == 1, f"mask must be 1D, got {mask.ndim}D mask"
        assert mask.dtype == bool, f"mask must be boolean, got {mask.dtype}"

        first_dim = self._maybe_first_dim()
        if mask.shape[0] != first_dim:
            raise ValueError(
                f"mask length {mask.shape[0]} does not match first dimension of arrays "
                f"({first_dim})."
            )

        # make a copy
        out = self.__class__.__new__(self.__class__)

        # private attributes
        out._unicode_keys = self._unicode_keys
        out._lazy_ops = {}

        # array attributes
        for key in self.keys():
            value = self.__dict__[key]
            if isinstance(value, h5py.Dataset):
                # the mask will be applied when the getattr is called for this key
                # the details of the mask operation are stored in _lazy_ops
                out.__dict__[key] = value
            else:
                # this is a numpy array that is already loaded in memory, apply the mask
                out.__dict__[key] = value[mask].copy()

        # store the mask operation in _lazy_ops for deferred execution
        if "mask" not in self._lazy_ops:
            # keep a private copy: the mask is used later, and must not change if
            # the caller modifies its array in the meantime
            out._lazy_ops["mask"] = mask.copy()
        else:
            # if a mask was already applied, we need to combine the masks: the new
            # mask indexes the entries that the previous mask had selected
            out._lazy_ops["mask"] = self._lazy_ops["mask"].copy()
            out._lazy_ops["mask"][out._lazy_ops["mask"]] = mask

        return out

    @classmethod
    def from_dataframe(cls, df, unsigned_to_long=True, **kwargs):
        r"""Not supported for lazy objects, always raises
        :obj:`NotImplementedError`."""
        raise NotImplementedError("Cannot convert a dataframe to a lazy array dict.")

    def to_hdf5(self, file):
        r"""Not supported for lazy objects, always raises
        :obj:`NotImplementedError`."""
        raise NotImplementedError("Cannot save a lazy array dict to hdf5.")

    @classmethod
    def from_hdf5(cls, file):
        r"""Loads the data object from an HDF5 file, without reading the arrays.

        The h5py datasets are stored as they are and are only read when the
        corresponding attribute is accessed.

        Args:
            file (h5py.File): HDF5 file. It must have been written for an
                :obj:`ArrayDict` object.

        .. note::
            Since the data is read on first access, the file presumably needs to
            still be open at that time.

        .. code-block:: python

            import h5py
            from temporaldata import LazyArrayDict

            with h5py.File("data.h5", "r") as f:
                data = LazyArrayDict.from_hdf5(f)
        """
        assert file.attrs["object"] == ArrayDict.__name__, (
            f"File contains data for a {file.attrs['object']} object, expected "
            f"{ArrayDict.__name__} object."
        )

        obj = cls.__new__(cls)
        for key, value in file.items():
            # write to __dict__ directly: __setattr__ only accepts numpy arrays
            obj.__dict__[key] = value

        obj._unicode_keys = file.attrs["_unicode_keys"].astype(str).tolist()
        obj._lazy_ops = {}

        return obj