Source code for km3pipe.dataclasses

# Filename: dataclasses.py
# pylint: disable=W0232,C0103,C0111
# vim:set ts=4 sts=4 sw=4 et syntax=python:
"""
Dataclasses for internal use. Heavily based on Numpy arrays.
"""
from __future__ import absolute_import, print_function, division

import itertools

import numpy as np
from numpy.lib import recfunctions as rfn

from .dataclass_templates import TEMPLATES
from .logger import get_logger
from .tools import istype

# Module metadata.
__author__ = "Tamas Gal and Moritz Lotze"
__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
__credits__ = []
__license__ = "MIT"
__maintainer__ = "Tamas Gal and Moritz Lotze"
__email__ = "tgal@km3net.de"
__status__ = "Development"
# Names exported by a star-import of this module. Note that the ``NDArray``
# and ``Vec3`` classes defined further down are not listed here.
__all__ = ('Table', 'is_structured', 'has_structured_dt', 'inflate_dtype')

# Defaults for the ``Table`` metadata attributes. They serve as keyword
# defaults in ``Table.__new__`` and as fallbacks in
# ``Table.__array_finalize__`` when a view/slice source lacks the attribute.
DEFAULT_H5LOC = '/misc'    # default ``h5loc``
DEFAULT_NAME = 'Generic Table'    # default ``name``
DEFAULT_SPLIT = False    # default ``split_h5``
DEFAULT_H5SINGLETON = False    # default ``h5singleton``

# Module-level logger, used by ``Table.__new__`` to report dtype mismatches.
log = get_logger(__name__)


def has_structured_dt(arr):
    """Tell whether the array form of ``arr`` carries a structured dtype.

    ``arr`` is passed through ``np.asanyarray`` first, so any array-like
    input is accepted.
    """
    return is_structured(np.asanyarray(arr).dtype)
def is_structured(dt):
    """Tell whether ``dt`` is a structured dtype.

    Objects without a ``fields`` attribute (e.g. a plain list used as a
    dtype specification) are reported as not structured.
    """
    return getattr(dt, 'fields', None) is not None
def inflate_dtype(arr, names):
    """Build a structured dtype for an array with unstructured dtype.

    If the array form of ``arr`` is already structured, its dtype is
    returned unchanged. Otherwise every entry of ``names`` becomes a field
    whose type is the (flat) dtype of ``arr``.
    """
    arr = np.asanyarray(arr)
    if has_structured_dt(arr):
        return arr.dtype
    flat_dt = arr.dtype
    return np.dtype([(field, flat_dt) for field in names])
class Table(np.recarray):
    """2D generic Table with grouping index.

    This is a `np.recarray` subclass with some metadata and helper methods.

    You can initialize it directly from a structured numpy array,
    a pandas DataFrame, a dictionary of (columnar) arrays; or, initialize it
    from a list of rows/list of columns using the appropriate factory.

    This class adds the following to ``np.recarray``:

    Parameters
    ----------
    data: array-like or dict(array-like)
        numpy array with structured dtype, pandas DataFrame, or dict of
        arrays. Lists/tuples and arrays without structured dtype are
        rejected; use ``from_rows`` or ``from_columns`` for those.
    h5loc: str
        Location in HDF5 file where to store the data. [default: '/misc']
    h5singleton: bool
        Tables defined as h5singletons are only written once to an HDF5 file.
        This is used for headers for example (default=False).
    dtype: numpy dtype
        Datatype over array. If not specified, the dtype of ``data`` is
        used. [default: None]

    Attributes
    ----------
    h5loc: str
        HDF5 group where to write into. (default='/misc')
    split_h5: bool
        Split the array into separate arrays, column-wise, when saving
        to hdf5? (default=False)
    name: str
        Human-readable name, e.g. 'Hits'
    h5singleton: bool
        Tables defined as h5singletons are only written once to an HDF5 file.
        This is used for headers for example (default=False).

    Methods
    -------
    from_dict(arr_dict, dtype=None, **kwargs)
        Create a Table from a dict of arrays (similar to pandas).
    from_template(data, template)
        Create a Table with a predefined dtype and metadata.
    sorted(by)
        Sort the table by one of its columns.
    append_columns(colnames, values)
        Append new columns to the table.
    drop_columns(colnames)
        Drop columns from the table.
    to_dataframe()
        Return as pandas dataframe.
    from_dataframe(df, **kwargs)
        Instantiate from a dataframe.
    from_rows(list_of_rows, **kwargs)
        Instantiate from an array-like with shape (n_rows, n_columns).
    from_columns(list_of_columns, **kwargs)
        Instantiate from an array-like with shape (n_columns, n_rows).
    merge(tables, fillna=False)
        Concatenate a list of tables.
    """

    def __new__(
            cls,
            data,
            h5loc=DEFAULT_H5LOC,
            dtype=None,
            split_h5=DEFAULT_SPLIT,
            name=DEFAULT_NAME,
            h5singleton=DEFAULT_H5SINGLETON,
            **kwargs
    ):
        # Dicts and DataFrames are dispatched to their dedicated factories,
        # which call back into this constructor with a record array.
        if isinstance(data, dict):
            return cls.from_dict(
                data,
                h5loc=h5loc,
                dtype=dtype,
                split_h5=split_h5,
                name=name,
                h5singleton=h5singleton,
                **kwargs
            )
        if istype(data, 'DataFrame'):
            return cls.from_dataframe(
                data,
                h5loc=h5loc,
                dtype=dtype,
                split_h5=split_h5,
                name=name,
                h5singleton=h5singleton,
                **kwargs
            )
        if isinstance(data, (list, tuple)):
            raise ValueError(
                "Lists/tuples are not supported! "
                "Please use the `from_rows` or `from_columns` method instead!"
            )
        if not has_structured_dt(data):
            # flat (nonstructured) dtypes fail miserably!
            # default to `|V8` whyever
            raise ValueError(
                "Arrays without structured dtype are not supported! "
                "Please use the `from_rows` or `from_columns` method instead!"
            )

        if dtype is None:
            dtype = data.dtype
        else:
            # Accept any valid dtype specification (e.g. a list of
            # ``(name, type)`` tuples), not only ``np.dtype`` instances.
            dtype = np.dtype(dtype)

        assert is_structured(dtype)

        if dtype != data.dtype:
            dtype_names = set(dtype.names)
            data_dtype_names = set(data.dtype.names)
            if dtype_names == data_dtype_names:
                if not all(dtype[f] == data.dtype[f] for f in dtype_names):
                    log.critical(
                        "dtype mismatch! Matching field names but differing "
                        "field types, no chance to reorder.\n"
                        "dtype of data: %s\n"
                        "requested dtype: %s" % (data.dtype, dtype)
                    )
                    raise ValueError("dtype mismatch")
                log.once(
                    "dtype mismatch, but matching field names and types. "
                    "Rordering input data...",
                    identifier=h5loc
                )
                # Same fields, different order: rebuild column by column.
                data = Table({f: data[f] for f in dtype_names}, dtype=dtype)
            else:
                log.critical(
                    "dtype mismatch, no chance to reorder due to differing "
                    "fields!\n"
                    "dtype of data: %s\n"
                    "requested dtype: %s" % (data.dtype, dtype)
                )
                raise ValueError("dtype mismatch")

        obj = np.asanyarray(data, dtype=dtype).view(cls)
        obj.h5loc = h5loc
        obj.split_h5 = split_h5
        obj.name = name
        obj.h5singleton = h5singleton
        return obj

    def __array_finalize__(self, obj):
        if obj is None:
            # called from explicit constructor
            return
        # views or slices: inherit the metadata of the source object
        self.h5loc = getattr(obj, 'h5loc', DEFAULT_H5LOC)
        self.split_h5 = getattr(obj, 'split_h5', DEFAULT_SPLIT)
        self.name = getattr(obj, 'name', DEFAULT_NAME)
        self.h5singleton = getattr(obj, 'h5singleton', DEFAULT_H5SINGLETON)
        # attribute access returns void instances on slicing/iteration
        # kudos to
        # https://github.com/numpy/numpy/issues/3581#issuecomment-108957200
        if type(obj) is not type(self):
            self.dtype = np.dtype((np.record, obj.dtype))

    def __array_wrap__(self, out_arr, context=None):
        # let the parent wrap the result, then re-attach our metadata
        return Table(
            np.recarray.__array_wrap__(self, out_arr, context),
            h5loc=self.h5loc,
            split_h5=self.split_h5,
            name=self.name,
            h5singleton=self.h5singleton,
        )

    @staticmethod
    def _expand_scalars(arr_dict):
        """Broadcast scalar entries of ``arr_dict`` to full-length arrays.

        Scalars and 0-dimensional arrays are replaced by arrays filled with
        that value; their length is the length of the longest non-scalar
        entry (at least 1). The dictionary is modified in place and
        returned.
        """
        scalars = []
        maxlen = 1    # have at least 1-elem arrays
        for k, v in arr_dict.items():
            if np.isscalar(v):
                scalars.append(k)
                continue
            if hasattr(v, 'ndim') and v.ndim == 0:    # np.array(1)
                # A 0-d array is a scalar in disguise: unwrap it and
                # broadcast it below like any other scalar.
                arr_dict[k] = v.item()
                scalars.append(k)
                continue
            if len(v) > maxlen:
                maxlen = len(v)
        for s in scalars:
            arr_dict[s] = np.full(maxlen, arr_dict[s])
        return arr_dict

    @classmethod
    def from_dict(cls, arr_dict, dtype=None, fillna=False, **kwargs):
        """Generate a table from a dictionary of arrays.

        Parameters
        ----------
        arr_dict: dict
            Maps column names to arrays or scalars. Scalars are broadcast
            to the length of the longest column. The dictionary itself is
            not modified.
        dtype: numpy dtype, optional
            Structured dtype defining the columns and their order. Without
            it, all keys are used, in sorted order.
        fillna: bool
            Fill columns that ``dtype`` requests but ``arr_dict`` lacks
            with NaN instead of raising. [default: False]
        **kwargs
            Passed on to the ``Table`` constructor.

        Raises
        ------
        KeyError
            If ``dtype`` has fields missing in ``arr_dict`` and ``fillna``
            is false.
        """
        # Shallow copy: NaN filling and scalar expansion below must not
        # leak into the caller's dictionary.
        arr_dict = dict(arr_dict)
        if dtype is None:
            names = sorted(arr_dict)
        else:
            dtype = np.dtype(dtype)
            missing_names = set(dtype.names) - set(arr_dict)
            if missing_names:
                if not fillna:
                    raise KeyError(
                        'Dictionary keys and dtype fields do not match!'
                    )
                for missing_name in missing_names:
                    arr_dict[missing_name] = np.nan
            names = list(dtype.names)

        arr_dict = cls._expand_scalars(arr_dict)
        data = [arr_dict[key] for key in names]
        return cls(np.rec.fromarrays(data, names=names, dtype=dtype), **kwargs)

    @classmethod
    def from_columns(cls, column_list, dtype=None, colnames=None, **kwargs):
        """Instantiate from an array-like with shape (n_columns, n_rows).

        Either a structured ``dtype`` or ``colnames`` has to be given;
        otherwise a ``ValueError`` is raised. A ``ValueError`` is also
        raised when the number of columns differs from the number of dtype
        fields.
        """
        if dtype is None or not is_structured(dtype):
            # infer structured dtype from array data + column names
            if colnames is None:
                raise ValueError(
                    "Need to either specify column names or a "
                    "structured dtype when passing unstructured arrays!"
                )
            dtype = inflate_dtype(column_list, colnames)
            colnames = dtype.names
        if len(column_list) != len(dtype.names):
            raise ValueError(
                "Number of columns mismatch between data and dtype!"
            )
        data = {k: column_list[i] for i, k in enumerate(dtype.names)}
        return cls(data, dtype=dtype, colnames=colnames, **kwargs)

    @classmethod
    def from_rows(cls, row_list, dtype=None, colnames=None, **kwargs):
        """Instantiate from an array-like with shape (n_rows, n_columns).

        Either a structured ``dtype`` or ``colnames`` has to be given;
        otherwise a ``ValueError`` is raised.
        """
        if dtype is None or not is_structured(dtype):
            # infer structured dtype from array data + column names
            if colnames is None:
                raise ValueError(
                    "Need to either specify column names or a "
                    "structured dtype when passing unstructured arrays!"
                )
            dtype = inflate_dtype(row_list, colnames)
        # this *should* have been checked above, but do this
        # just to be sure in case I screwed up the logic above;
        # users will never see this, this should only show in tests
        assert is_structured(dtype)
        # NOTE(review): ``view`` reinterprets the row buffer, so this
        # presumably requires the element type of ``row_list`` to match the
        # field types of ``dtype`` -- confirm against callers.
        data = np.asanyarray(row_list).view(dtype)
        # drop useless 2nd dim
        data = data.reshape((data.shape[0], ))
        return cls(data, **kwargs)

    @property
    def templates_avail(self):
        """Sorted list of the available template names."""
        return sorted(TEMPLATES)

    @classmethod
    def from_template(cls, data, template):
        """Create a table from a predefined datatype.

        See the ``templates_avail`` property for available names.

        Parameters
        ----------
        data
            Data in a format that the ``__init__`` understands.
        template: str or dict
            Name of the dtype template to use from ``dataclass_templates``
            or a ``dict`` containing the required attributes (see the other
            templates for reference).
        """
        name = DEFAULT_NAME
        if isinstance(template, str):
            name = template
            table_info = TEMPLATES[name]
        else:
            table_info = template
        # an explicit 'name' entry overrides the template key
        if 'name' in table_info:
            name = table_info['name']
        dt = table_info['dtype']
        loc = table_info['h5loc']
        split = table_info['split_h5']
        h5singleton = table_info['h5singleton']

        return cls(
            data,
            h5loc=loc,
            dtype=dt,
            split_h5=split,
            name=name,
            h5singleton=h5singleton
        )

    @staticmethod
    def _check_column_length(values, n):
        """Raise ``ValueError`` unless every row of ``values`` has length n."""
        values = np.atleast_2d(values)
        for v in values:
            if len(v) != n:
                raise ValueError(
                    "Trying to append more than one column, but "
                    "some arrays mismatch in length!"
                )

    def append_columns(self, colnames, values, **kwargs):
        """Append new columns to the table.

        When appending a single column, ``values`` can be a scalar or an
        array of either length 1 or the same length as this array (the one
        it's appended to). In case of multiple columns, values must have
        the shape ``list(arrays)``, and the dimension of each array
        has to match the length of this array.

        See the docs for ``numpy.lib.recfunctions.append_fields`` for an
        explanation of the remaining options.
        """
        n = len(self)
        if np.isscalar(values):
            values = np.full(n, values)

        values = np.atleast_1d(values)
        if not isinstance(colnames, str) and len(colnames) > 1:
            values = np.atleast_2d(values)
            self._check_column_length(values, n)

        if values.ndim == 1:
            if len(values) > n:
                raise ValueError("New Column is longer than existing table!")
            elif len(values) > 1 and len(values) < n:
                raise ValueError(
                    "New Column is shorter than existing table, "
                    "but not just one element!"
                )
            elif len(values) == 1:
                # single element: broadcast to the table length
                values = np.full(n, values[0])
        new_arr = rfn.append_fields(
            self, colnames, values, usemask=False, asrecarray=True, **kwargs
        )
        return self.__class__(
            new_arr,
            h5loc=self.h5loc,
            split_h5=self.split_h5,
            name=self.name,
            h5singleton=self.h5singleton
        )

    def drop_columns(self, colnames, **kwargs):
        """Drop columns from the table.

        See the docs for ``numpy.lib.recfunctions.drop_fields`` for an
        explanation of the remaining options.
        """
        new_arr = rfn.drop_fields(
            self, colnames, usemask=False, asrecarray=True, **kwargs
        )
        return self.__class__(
            new_arr,
            h5loc=self.h5loc,
            split_h5=self.split_h5,
            name=self.name,
            h5singleton=self.h5singleton
        )

    def sorted(self, by, **kwargs):
        """Sort array by a column.

        Parameters
        ----------
        by: str
            Name of the column to sort by (e.g. 'time').
        **kwargs
            Passed on to ``np.argsort``.

        Returns
        -------
        A new table with the rows reordered; all metadata (including
        ``h5singleton``) is carried over.
        """
        sort_idc = np.argsort(self[by], **kwargs)
        return self.__class__(
            self[sort_idc],
            h5loc=self.h5loc,
            split_h5=self.split_h5,
            name=self.name,
            h5singleton=self.h5singleton
        )

    def to_dataframe(self):
        """Return the table as a pandas ``DataFrame``."""
        from pandas import DataFrame
        return DataFrame(self)

    @classmethod
    def from_dataframe(cls, df, **kwargs):
        """Instantiate from a pandas ``DataFrame`` (the index is dropped)."""
        rec = df.to_records(index=False)
        return cls(rec, **kwargs)

    @classmethod
    def merge(cls, tables, fillna=False):
        """Merge a list of tables.

        Parameters
        ----------
        tables: list of Table
            Tables to concatenate, in order.
        fillna: bool
            Add columns missing in some tables, filled with NaN. Only
            possible for float columns. [default: False]

        Returns
        -------
        The concatenated table; its metadata is taken from the first table.

        Raises
        ------
        ValueError
            If the columns differ and ``fillna`` is false, or if a missing
            column is not of float type.
        """
        cols = set(
            itertools.chain.from_iterable(
                table.dtype.descr for table in tables
            )
        )

        tables_to_merge = []
        for table in tables:
            missing_cols = cols - set(table.dtype.descr)

            if missing_cols:
                if not fillna:
                    raise ValueError(
                        "Table columns do not match. Use fillna=True"
                        " if you want to append missing values with NaNs"
                    )
                n = len(table)
                n_cols = len(missing_cols)
                col_names = []
                for col_name, col_dtype in missing_cols:
                    if 'f' not in col_dtype:
                        raise ValueError(
                            "Cannot create NaNs for non-float"
                            " type column '{}'".format(col_name)
                        )
                    col_names.append(col_name)

                table = table.append_columns(
                    col_names, np.full((n_cols, n), np.nan)
                )
            tables_to_merge.append(table)

        first_table = tables_to_merge[0]

        merged_table = sum(tables_to_merge[1:], first_table)

        merged_table.h5loc = first_table.h5loc
        merged_table.h5singleton = first_table.h5singleton
        merged_table.split_h5 = first_table.split_h5
        merged_table.name = first_table.name

        return merged_table

    def __add__(self, other):
        """Concatenate the rows of ``other`` below the rows of this table.

        Raises ``TypeError`` when the column names differ and
        ``NotImplementedError`` when the names match but the types do not.
        """
        if set(self.dtype.descr) != set(other.dtype.descr):
            if set(self.dtype.names) == set(other.dtype.names):
                raise NotImplementedError
            raise TypeError("Table columns do not match")
        col_order = list(self.dtype.names)
        ret = self.copy()
        len_self = len(self)
        len_other = len(other)
        final_length = len_self + len_other
        ret.resize(final_length, refcheck=False)
        # index ``other`` by our column order so the fields line up
        ret[len_self:] = other[col_order]
        return Table(
            ret,
            h5loc=self.h5loc,
            h5singleton=self.h5singleton,
            split_h5=self.split_h5,
            name=self.name
        )

    def __str__(self):
        name = self.name
        spl = 'split' if self.split_h5 else 'no split'
        s = "{} {}\n".format(name, type(self))
        s += "HDF5 location: {} ({})\n".format(self.h5loc, spl)
        # one line per column: "<name> (dtype: <dtype>) = <values>"
        s += "\n".join(
            "{} (dtype: {}) = {}".format(d[0], d[1], self[d[0]])
            for d in self.dtype.descr
        )
        return s

    def __repr__(self):
        s = "{} {} (rows: {})".format(self.name, type(self), self.size)
        return s

    def __contains__(self, elem):
        # membership refers to column names, not to values
        return elem in self.dtype.names

    @property
    def pos(self):
        """Positions as array of shape (n_rows, 3), from ``pos_{x,y,z}``."""
        return np.array([self.pos_x, self.pos_y, self.pos_z]).T

    @pos.setter
    def pos(self, arr):
        try:
            self.pos_x
            self.pos_y
            self.pos_z
        except AttributeError:
            raise ValueError(
                "Table has no existing 'pos_{x,y,z}' entries. If you'd like "
                "to append positions to this table, please use the "
                "`.append_columns(['pos_x', 'pos_y', 'pos_z'], "
                "[pos_x, pos_y, pos_z])` method."
            )
        arr = np.atleast_2d(arr)
        assert arr.shape[1] == 3
        assert len(arr) == len(self)
        self.pos_x = arr[:, 0]
        self.pos_y = arr[:, 1]
        self.pos_z = arr[:, 2]

    @property
    def dir(self):
        """Directions as array of shape (n_rows, 3), from ``dir_{x,y,z}``."""
        return np.array([self.dir_x, self.dir_y, self.dir_z]).T

    @dir.setter
    def dir(self, arr):
        try:
            self.dir_x
            self.dir_y
            self.dir_z
        except AttributeError:
            raise ValueError(
                "Table has no existing 'dir_{x,y,z}' entries. If you'd like "
                "to append directions to this table, please use the "
                "`.append_columns(['dir_x', 'dir_y', 'dir_z'], "
                "[dir_x, dir_y, dir_z])` method."
            )
        arr = np.atleast_2d(arr)
        assert arr.shape[1] == 3
        assert len(arr) == len(self)
        self.dir_x = arr[:, 0]
        self.dir_y = arr[:, 1]
        self.dir_z = arr[:, 2]

    @property
    def phi(self):
        """Result of ``km3pipe.math.phi_separg`` for ``dir_x``, ``dir_y``."""
        from km3pipe.math import phi_separg
        return phi_separg(self.dir_x, self.dir_y)

    @property
    def theta(self):
        """Result of ``km3pipe.math.theta_separg`` for ``dir_z``."""
        from km3pipe.math import theta_separg
        return theta_separg(self.dir_z)

    @property
    def zenith(self):
        """Second value returned by ``neutrino_to_source_direction``."""
        from km3pipe.math import neutrino_to_source_direction
        _, zen = neutrino_to_source_direction(self.phi, self.theta)
        return zen

    @property
    def azimuth(self):
        """First value returned by ``neutrino_to_source_direction``."""
        from km3pipe.math import neutrino_to_source_direction
        azi, _ = neutrino_to_source_direction(self.phi, self.theta)
        return azi

    @property
    def triggered_rows(self):
        """Rows whose ``triggered`` column is truthy.

        Raises ``KeyError`` if the table has no ``triggered`` column.
        """
        if not hasattr(self, 'triggered'):
            raise KeyError("Table has no 'triggered' column!")
        return self[self.triggered.astype(bool)]
class NDArray(np.ndarray):
    """Array with HDF5 metadata."""

    def __new__(cls, array, dtype=None, order=None, **kwargs):
        instance = np.asarray(array, dtype=dtype, order=order).view(cls)
        # metadata comes from the keyword arguments, with fixed fallbacks
        instance.h5loc = kwargs.get('h5loc', '/misc')
        instance.title = kwargs.get('title', 'Unnamed NDArray')
        instance.group_id = kwargs.get('group_id', None)
        return instance

    def __array_finalize__(self, obj):
        if obj is None:
            return
        # views/slices inherit the metadata of their source (None if absent)
        for attr in ('h5loc', 'title', 'group_id'):
            setattr(self, attr, getattr(obj, attr, None))


class Vec3(object):
    """Three-component vector with element-wise arithmetic via numpy."""

    def __init__(self, x, y, z):
        self.x = x
        self.y = y
        self.z = z

    def __add__(self, other):
        return Vec3(*np.add(self, other))

    def __radd__(self, other):
        return Vec3(*np.add(other, self))

    def __sub__(self, other):
        return Vec3(*np.subtract(self, other))

    def __rsub__(self, other):
        return Vec3(*np.subtract(other, self))

    def __mul__(self, other):
        return Vec3(*np.multiply(self, other))

    def __rmul__(self, other):
        return Vec3(*np.multiply(other, self))

    def __div__(self, other):
        # Python 2 division operator: same as true division
        return self.__truediv__(other)

    def __truediv__(self, other):
        return Vec3(*np.divide(self, other))

    def __array__(self, dtype=None):
        # ``dtype=None`` lets numpy infer the type from the components
        components = [self.x, self.y, self.z]
        return np.array(components, dtype=dtype)

    def __getitem__(self, index):
        return self.__array__()[index]