Source code for pyts.multivariate.transformation.multivariate

"""Utility class for multivariate time series transformation."""

# Author: Johann Faouzi <johann.faouzi@gmail.com>
# License: BSD-3-Clause

import numpy as np
from scipy.sparse import csr_matrix, hstack
from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.utils.validation import check_is_fitted
from ..utils import check_3d_array

import sklearn
SKLEARN_VERSION = sklearn.__version__


[docs]class MultivariateTransformer(BaseEstimator, TransformerMixin): r"""Transformer for multivariate time series. It provides a convenient class to transform multivariate time series with transformers that can only deal with univariate time series. Parameters ---------- estimator : estimator object or list thereof Transformer. If one estimator is provided, it is cloned and each clone transforms one feature. If a list of estimators is provided, each estimator transforms one feature. flatten : bool (default = True) Affect shape of transform output. If True, ``transform`` returns an array with shape (n_samples, \*). If False, the output of ``transform`` from each estimator must have the same shape and ``transform`` returns an array with shape (n_samples, n_features, \*). Ignored if the transformers return sparse matrices. Attributes ---------- estimators_ : list of estimator objects The collection of fitted transformers. Examples -------- >>> from pyts.datasets import load_basic_motions >>> from pyts.multivariate.transformation import MultivariateTransformer >>> from pyts.image import GramianAngularField >>> X, _, _, _ = load_basic_motions(return_X_y=True) >>> transformer = MultivariateTransformer(GramianAngularField(), ... flatten=False) >>> X_new = transformer.fit_transform(X) >>> X_new.shape (40, 6, 100, 100) """
[docs] def __init__(self, estimator, flatten=True): self.estimator = estimator self.flatten = flatten
[docs] def fit(self, X, y=None): """Pass. Parameters ---------- X : array-like, shape = (n_samples, n_features, n_timestamps) Multivariate time series. y : None or array-like, shape = (n_samples,) (default = None) Class labels. Returns ------- self : object """ X = check_3d_array(X) _, n_features, _ = X.shape self._check_params(n_features) for i, transformer in enumerate(self.estimators_): transformer.fit(X[:, i, :], y) return self
[docs] def transform(self, X): r"""Apply transform to each feature. Parameters ---------- X : array-like, shape = (n_samples, n_features, n_timestamps) Multivariate time series. Returns ------- X_new : array, shape = (n_samples, *) or (n_samples, n_features, *) Transformed time series. """ X = check_3d_array(X) n_samples, _, _ = X.shape if SKLEARN_VERSION >= '0.22': check_is_fitted(self) else: check_is_fitted(self, 'estimators_') X_transformed = [transformer.transform(X[:, i, :]) for i, transformer in enumerate(self.estimators_)] all_sparse = np.all([isinstance(X_transformed_i, csr_matrix) for X_transformed_i in X_transformed]) if all_sparse: X_new = hstack(X_transformed) else: X_new = [self._convert_to_array(X_transformed_i) for X_transformed_i in X_transformed] ndims = [X_new_i.ndim for X_new_i in X_new] shapes = [X_new_i.shape for X_new_i in X_new] one_dim = (np.unique(ndims).size == 1) if one_dim: one_shape = np.unique(shapes, axis=0).shape[0] == 1 else: one_shape = False if (not one_shape) or self.flatten: X_new = [X_new_i.reshape(n_samples, -1) for X_new_i in X_new] X_new = np.concatenate(X_new, axis=1) else: X_new = np.asarray(X_new) axes = [1, 0] + [i for i in range(2, X_new.ndim)] X_new = np.transpose(X_new, axes=axes) return X_new
def _check_params(self, n_features): """Check parameters.""" if (isinstance(self.estimator, BaseEstimator) and hasattr(self.estimator, 'transform')): self.estimators_ = [clone(self.estimator) for _ in range(n_features)] elif isinstance(self.estimator, list): if len(self.estimator) != n_features: raise ValueError( "If 'estimator' is a list, its length must be equal to " "the number of features ({0} != {1})" .format(len(self.estimator), n_features) ) for i, estimator in enumerate(self.estimator): if not (isinstance(estimator, BaseEstimator) and hasattr(estimator, 'transform')): raise ValueError("Estimator {} must be a transformer." .format(i)) self.estimators_ = self.estimator else: raise TypeError( "'estimator' must be a transformer that inherits from " "sklearn.base.BaseEstimator or a list thereof.") @staticmethod def _convert_to_array(X): """Convert the input data to an array if necessary.""" if isinstance(X, csr_matrix): return X.A elif isinstance(X, np.ndarray): return X else: raise ValueError('Unexpected type for X: {}.' .format(type(X).__name__))