Source code for pyts.classification.tsbf

"""Code for Time Series Bag-of-Features."""

# Author: Johann Faouzi <johann.faouzi@gmail.com>
# License: BSD-3-Clause

from math import ceil
from numba import njit
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.ensemble import RandomForestClassifier
from sklearn.utils.validation import (
    check_array, check_is_fitted, check_random_state, check_X_y
)
from ..base import UnivariateClassifierMixin, UnivariateTransformerMixin


@njit
def extract_features(X, interval_indices, n_samples, n_subseries, n_intervals):
    # There are 3 features for each interval (mean, std, slope) plus
    # the mean, std over the whole subsequence and indices of the subsequence
    n_features = n_intervals * 3 + 4
    X_features = np.zeros((n_samples * n_subseries, n_features))

    for j in range(n_subseries):
        for k in range(n_intervals):

            # Get the start and end indices of the interval
            start, end = interval_indices[:, j, k]
            arange = np.arange((start - end + 1) / 2, (end + 1 - start) / 2)
            if end - start == 1:
                var_arange = 1.
            else:
                var_arange = np.sum(arange ** 2)

            for i in range(n_samples):
                i_j = i * n_subseries + j
                mean = np.mean(X[i, start:end])

                # Compute the three statistics of the interval
                X_features[i_j, 3 * k] = mean
                X_features[i_j, 3 * k + 1] = np.std(X[i, start:end])
                X_features[i_j, 3 * k + 2] = (
                    np.sum((X[i, start:end] - mean) * arange) / var_arange
                )

        start, end = interval_indices[0, j, 0], interval_indices[1, j, -1]
        for i in range(n_samples):
            i_j = i * n_subseries + j

            # Compute the four statistics of the subseries
            X_features[i_j, -4] = np.mean(X[i, start:end])
            X_features[i_j, -3] = np.std(X[i, start:end])
            X_features[i_j, -2] = start
            X_features[i_j, -1] = end

    return X_features


@njit
def histogram(X, bins, n_bins, n_samples, n_classes):
    X_new = np.empty((n_samples, (n_bins + 1) * n_classes))
    for i in range(n_samples):
        for j in range(n_classes):
            X_new[i, j * n_bins: (j + 1) * n_bins] = np.histogram(
                X[i, :, j], bins, (0., 1.))[0]
            X_new[i, - n_classes + j] = np.mean(X[i, :, j])
    return X_new


class IntervalFeatureExtractor(BaseEstimator, UnivariateTransformerMixin):
    """Feature extractor over the intervals of a subsequence.

    This transformer extracts 3 features from each interval of each
    subsequence: the mean, the standard deviation and the slope.

    Parameters
    ----------
    min_subsequence_size : int or float (default = 0.5)
        The minimum length of the subsequences. If float, it represents a
        percentage of the size of each time series. Note that the actual
        minimum length of the subsequences may be slighty lower due to the use
        of the floor function.

    min_interval_size : int or float (default = 0.1)
        The minimum length of the intervals. If float, it represents a
        percentage of the size of each time series. Must be lower than
        `min_subsequence_size`.

    n_subsequences : 'auto', int or float (default = 'auto')
        The number of considered subsequences. If 'auto', it is automatically
        calculated given the size of the time series and the values of the
        parameters. If float, it represents a percentage of the size of each
        time series.

    bins : int or array-like (default = 10)
        If bins is an int, it defines the number of equal-width bins in range
        [0, 1]. If bins is array-like, it defines a monotonically increasing
        array of bin edges, including the rightmost edge, allowing for
        non-uniform bin widths.

    random_state : None, int or RandomState instance (default = None)
        The seed of the pseudo random number generator to use when shuffling
        the data. If int, random_state is the seed used by the random number
        generator. If RandomState instance, random_state is the random number
        generator. If None, the random number generator is the RandomState
        instance used by `np.random`.

    Attributes
    ----------
    interval_indices_ : array, shape = (n_subsequences, n_intervals + 1)
        The indices for the intervals of each subsequence.

    min_subsequence_size_ : int
        The actual minimum length of the subsequences.

    """
    def __init__(self, min_subsequence_size=0.5, min_interval_size=0.1,
                 n_subsequences='auto', random_state=None):
        self.min_subsequence_size = min_subsequence_size
        self.min_interval_size = min_interval_size
        self.n_subsequences = n_subsequences
        self.random_state = random_state

    def fit(self, X, y=None):
        """Fit the model according to the given training data.

        It generates the indices of the windows from which the features will be
        extracted.

        Parameters
        ----------
        X : array-like, shape = (n_samples, n_timestamps)
            Univariate time series.

        y
            Ignored

        Returns
        -------
        self : object

        """
        X = check_array(X, dtype='float64')
        n_timestamps = X.shape[1]

        (min_subsequence_size, min_interval_size, n_subsequences,
         rng) = self._check_params(X, n_timestamps)

        # Define the parameters of the subseries and intervals
        n_intervals = int(min_subsequence_size / min_interval_size)
        min_subseries_size = min_interval_size * n_intervals
        if n_subsequences == 'auto':
            n_subseries = max(
                1, n_timestamps // min_interval_size - n_intervals)
        else:
            n_subseries = n_subsequences

        # Generate the indices of the subseries
        subseries_start = rng.randint(
            0, n_timestamps - min_subseries_size + 1, size=n_subseries
        )
        subseries_end = rng.randint(
            subseries_start + min_subseries_size, n_timestamps + 1,
            size=n_subseries
        )

        # Generate the indices of the intervals
        interval_indices = np.linspace(
            subseries_start, subseries_end, n_intervals + 1, axis=1
        ).astype('int64')

        # Save the relevant parameters as attributes
        self.interval_indices_ = interval_indices
        self.min_subsequence_size_ = min_subsequence_size

        return self

    def transform(self, X):
        """Transform the provided data.

        It extracts the three features from all the selected windows
        for all the samples.

        Parameters
        ----------
        X : array-like, shape = (n_samples, n_timestamps)
            Univariate time series.

        Returns
        -------
        X_new : array, shape = (n_samples * n_subseries, 3 * n_intervals + 4)
            Extracted features.

        """
        X = check_array(X, dtype='float64')
        n_samples = X.shape[0]

        # Extract the features from each interval
        interval_indices = np.array(
            [self.interval_indices_[:, :-1], self.interval_indices_[:, 1:]]
        )

        n_subseries, n_intervals = interval_indices.shape[1:]
        X_features = extract_features(X, interval_indices, n_samples,
                                      n_subseries, n_intervals)
        return X_features

    def _check_params(self, X, n_timestamps):

        if not isinstance(self.min_subsequence_size,
                          (int, np.integer, float, np.floating)):
            raise TypeError("'min_subsequence_size' must be an integer or a "
                            "float.")
        if isinstance(self.min_subsequence_size, (int, np.integer)):
            if not 1 <= self.min_subsequence_size <= n_timestamps:
                raise ValueError(
                    "If 'min_subsequence_size' is an integer, it must be "
                    "greater than or equal to 1 and lower than or equal to "
                    "n_timestamps (got {0}).".format(self.min_subsequence_size)
                )
            min_subsequence_size = self.min_subsequence_size
        else:
            if not (0 < self.min_subsequence_size <= 1.):
                raise ValueError(
                    "If 'min_subsequence_size' is a float, it must be greater "
                    "than 0 and lower than or equal to 1 (got {0})."
                    .format(self.min_subsequence_size)
                )
            min_subsequence_size = ceil(
                self.min_subsequence_size * n_timestamps)

        if not isinstance(self.min_interval_size,
                          (int, np.integer, float, np.floating)):
            raise TypeError("'min_interval_size' must be an integer or a "
                            "float.")
        if isinstance(self.min_interval_size, (int, np.integer)):
            if self.min_interval_size < 1:
                raise ValueError(
                    "If 'min_interval_size' is an integer, it must be "
                    "positive (got {0}).".format(self.min_interval_size)
                )
            min_interval_size = self.min_interval_size
        else:
            if self.min_interval_size <= 0:
                raise ValueError(
                    "If 'min_interval_size' is a float, it must be greater "
                    "than 0 (got {0}).".format(self.min_interval_size)
                )
            min_interval_size = ceil(self.min_interval_size * n_timestamps)

        if min_interval_size > min_subsequence_size:
            raise ValueError("'min_interval_size' must be lower than or equal "
                             "to 'min_subsequence_size' ({} > {})."
                             .format(min_interval_size, min_subsequence_size))

        n_subsequences_auto = (isinstance(self.n_subsequences, str) and
                               self.n_subsequences == 'auto')
        if not (n_subsequences_auto or isinstance(
            self.n_subsequences, (int, np.integer, float, np.floating))
        ):
            raise TypeError("'n_subsequences' must be 'auto', an integer or a "
                            "float.")
        if n_subsequences_auto:
            n_subsequences = 'auto'
        elif isinstance(self.n_subsequences, (int, np.integer)):
            if self.n_subsequences < 1:
                raise ValueError(
                    "If 'n_subsequences' is an integer, it must be positive "
                    "(got {0}).".format(self.n_subsequences)
                )
            n_subsequences = self.n_subsequences
        else:
            if self.n_subsequences <= 0:
                raise ValueError(
                    "If 'n_subsequences' is a float, it must be greater "
                    "than 0 (got {0}).".format(self.n_subsequences)
                )
            n_subsequences = ceil(self.n_subsequences * n_timestamps)

        rng = check_random_state(self.random_state)

        return (min_subsequence_size, min_interval_size,
                n_subsequences, rng)


[docs]class TSBF(BaseEstimator, UnivariateClassifierMixin): """Time Series Bag-of-Features algorithm. Parameters ---------- n_estimators : int (default = 500) The number of trees in the forest. min_subsequence_size : int or float (default = 0.5) The minimum length of the subsequences. If float, it represents a percentage of the size of each time series. Note that the actual minimum length of the subsequences may be slighty lower due to the use of the floor function. min_interval_size : int or float (default = 0.1) The minimum length of the intervals. If float, it represents a percentage of the size of each time series. Must be lower than `min_subsequence_size`. n_subsequences : 'auto', int or float (default = 'auto') The number of considered subsequences. If 'auto', it is automatically calculated given the size of the time series and the values of the parameters. If float, it represents a percentage of the size of each time series. bins : int or array-like (default = 10) If bins is an int, it defines the number of equal-width bins in range [0, 1]. If bins is array-like, it defines a monotonically increasing array of bin edges, including the rightmost edge, allowing for non-uniform bin widths. criterion : str (default = "entropy") The function to measure the quality of a split. Supported criteria are "gini" for the Gini impurity and "entropy" for the information gain. Note: this parameter is tree-specific. max_depth : integer or None (default = None) The maximum depth of the tree. If None, then nodes are expanded until all leaves are pure or until all leaves contain less than ``min_samples_split`` samples. min_samples_split : int or float (default = 2) The minimum number of samples required to split an internal node: - If int, then consider ``min_samples_split`` as the minimum number. - If float, then ``min_samples_split`` is a fraction and ``ceil(min_samples_split * n_samples)`` are the minimum number of samples for each split. min_samples_leaf : int or float (default = 1) The minimum number of samples required to be at a leaf node. A split point at any depth will only be considered if it leaves at least ``min_samples_leaf`` training samples in each of the left and right branches. This may have the effect of smoothing the model. - If int, then consider ``min_samples_leaf`` as the minimum number. - If float, then ``min_samples_leaf`` is a fraction and ``ceil(min_samples_leaf * n_samples)`` are the minimum number of samples for each node. min_weight_fraction_leaf : float (default = 0.) The minimum weighted fraction of the sum total of weights (of all the input samples) required to be at a leaf node. max_features : int, float, str or None (default = "sqrt") The number of features to consider when looking for the best split: - If int, then consider ``max_features`` features at each split. - If float, then ``max_features`` is a fraction and ``int(max_features * n_features)`` features are considered at each split. - If "sqrt", then ``max_features=sqrt(n_features)``. - If "log2", then ``max_features=log2(n_features)``. - If None, then ``max_features=n_features``. max_leaf_nodes : int or None (default = None) Grow trees with ``max_leaf_nodes`` in best-first fashion. Best nodes are defined as relative reduction in impurity. If None then unlimited number of leaf nodes. min_impurity_decrease : float (default = 0.) A node will be split if this split induces a decrease of the impurity greater than or equal to this value. The weighted impurity decrease equation is the following:: N_t / N * (impurity - N_t_R / N_t * right_impurity - N_t_L / N_t * left_impurity) where ``N`` is the total number of samples, ``N_t`` is the number of samples at the current node, ``N_t_L`` is the number of samples in the left child, and ``N_t_R`` is the number of samples in the right child. bootstrap : bool (default = True) Whether bootstrap samples are used when building trees. If False, the whole datset is used to build each tree. oob_score : bool (default = False) Whether to use out-of-bag samples to estimate the generalization accuracy. n_jobs : int or None, optional (default = None) The number of jobs to run in parallel. :meth:`fit`, :meth:`predict`, :meth:`decision_path` and :meth:`apply` are all parallelized over the trees. ``None`` means 1 unless in a ``joblib.parallel_backend`` context. ``-1`` means using all processors. random_state : int, RandomState instance or None (default = None) Controls both the randomness of the bootstrapping of the samples used when building trees (if ``bootstrap=True``) and the sampling of the features to consider when looking for the best split at each node (if ``max_features < n_features``). verbose : int (default = 0) Controls the verbosity when fitting and predicting. class_weight : dict, "balanced", "balanced_subsample" or None (default = None) Weights associated with classes in the form ``{class_label: weight}``. If not given, all classes are supposed to have weight one. The "balanced" mode uses the values of y to automatically adjust weights inversely proportional to class frequencies in the input data as ``n_samples / (n_classes * np.bincount(y))`` The "balanced_subsample" mode is the same as "balanced" except that weights are computed based on the bootstrap sample for every tree grown. ccp_alpha : float (default = 0.) Complexity parameter used for Minimal Cost-Complexity Pruning. The subtree with the largest cost complexity that is smaller than ``ccp_alpha`` will be chosen. By default, no pruning is performed. It must be non-negative. max_samples : int, float or None (default = None) If bootstrap is True, the number of samples to draw from X to train each base estimator: - If None (default), then draw ``X.shape[0]`` samples. - If int, then draw ``max_samples`` samples. - If float, then draw ``max_samples * X.shape[0]`` samples. Thus, ``max_samples`` should be in the interval `(0, 1)`. Attributes ---------- estimator_ : DecisionTreeClassifier The child estimator template used to create the collection of fitted sub-estimators. classes_ : array, shape = (n_classes,) The classes labels. estimators_ : list of DecisionTreeClassifier The collection of fitted sub-estimators. feature_importances_ : array, shape = (n_features,) The feature importances (the higher, the more important the feature). interval_indices_ : array, shape = (n_subsequences, n_intervals + 1) The indices for the intervals of each subsequence. min_subsequence_size_ : int The actual minimum length of the subsequences. n_features_in_ : int The number of features when ``fit`` is performed. oob_decision_function_ : None or array, shape = (n_samples, n_classes) Decision function computed with out-of-bag estimate on the training set. If n_estimators is small it might be possible that a data point was never left out during the bootstrap. In this case, `oob_decision_function_` might contain NaN. This attribute is not None only when ``oob_score`` is True. oob_score_ : None or float Score of the training dataset obtained using an out-of-bag estimate. This attribute is not None only when ``oob_score`` is True. Examples -------- >>> from pyts.datasets import load_gunpoint >>> from pyts.classification import TSBF >>> X_train, X_test, y_train, y_test = load_gunpoint(return_X_y=True) >>> clf = TSBF(random_state=43) >>> clf.fit(X_train, y_train) TSBF(...) >>> clf.score(X_test, y_test) 0.97333... Notes ----- The default values for the parameters controlling the size of the trees (e.g. ``max_depth``, ``min_samples_leaf``, etc.) lead to fully grown and unpruned trees which can potentially be very large on some data sets. To reduce memory consumption, the complexity and size of the trees should be controlled by setting those parameter values. The features are always randomly permuted at each split. Therefore, the best found split may vary, even with the same training data, ``max_features=n_features`` and ``bootstrap=False``, if the improvement of the criterion is identical for several splits enumerated during the search of the best split. To obtain a deterministic behaviour during fitting, ``random_state`` has to be fixed. References ---------- .. [1] M.G. Baydogan, G. Runger and E. Tuv, "A Bag-of-Features Framework to Classify Time Series". IEEE Transactions on Pattern Analysis and Machine Intelligence, 35(11), 2796-2802 (2013). .. [2] Leo Breiman, "Random Forests", Machine Learning, 45(1), 5-32, 2001. """ # noqa: E501
[docs] def __init__(self, n_estimators=500, min_subsequence_size=0.5, min_interval_size=0.1, n_subsequences='auto', bins=10, criterion="entropy", max_depth=None, min_samples_split=2, min_samples_leaf=1, min_weight_fraction_leaf=0., max_features="sqrt", max_leaf_nodes=None, min_impurity_decrease=0., bootstrap=True, oob_score=False, n_jobs=None, random_state=None, verbose=0, class_weight=None, ccp_alpha=0.0, max_samples=None): self.n_estimators = n_estimators self.min_subsequence_size = min_subsequence_size self.min_interval_size = min_interval_size self.n_subsequences = n_subsequences self.bins = bins self.criterion = criterion self.max_depth = max_depth self.min_samples_split = min_samples_split self.min_samples_leaf = min_samples_leaf self.min_weight_fraction_leaf = min_weight_fraction_leaf self.max_features = max_features self.max_leaf_nodes = max_leaf_nodes self.min_impurity_decrease = min_impurity_decrease self.bootstrap = bootstrap self.oob_score = oob_score self.n_jobs = n_jobs self.random_state = random_state self.verbose = verbose self.class_weight = class_weight self.ccp_alpha = ccp_alpha self.max_samples = max_samples
[docs] def apply(self, X): """Apply trees in the forest to X, return leaf indices. Parameters ---------- X : array-like, shape = (n_samples, n_timestamps) Univariate time series. Returns ------- X_leaves : array_like, shape = (n_samples, n_estimators) For each datapoint x in X and for each tree in the forest, return the index of the leaf x ends up in. """ X_binned = self._transform(X) return self._clf.apply(X_binned)
[docs] def decision_path(self, X): """Return the decision path in the forest. Parameters ---------- X : array-like, shape = (n_samples, n_timestamps) Univariate time series. Returns ------- indicator : sparse csr array, shape = (n_samples, n_nodes) Return a node indicator matrix where non zero elements indicates that the samples goes through the nodes. n_nodes_ptr : array, shape = (n_estimators + 1,) The columns from indicator[n_nodes_ptr[i]:n_nodes_ptr[i+1]] gives the indicator value for the i-th estimator. """ X_binned = self._transform(X) return self._clf.decision_path(X_binned)
[docs] def fit(self, X, y): """Fit the model according to the given training data. It build a forest of trees from the training set. Parameters ---------- X : array-like, shape = (n_samples, n_timestamps) Univariate time series. y : array-like, shape = (n_samples,) Class labels for each sample. Returns ------- self : object """ # Extract some information from the dataset X, y = check_X_y(X, y) n_samples, n_timestamps = X.shape n_classes = np.unique(y).size bins = self._check_params() self._bins = bins # Create the instances of the feature extractor anf classifiers feature_extractor = IntervalFeatureExtractor( min_subsequence_size=self.min_subsequence_size, min_interval_size=self.min_interval_size, n_subsequences=self.n_subsequences, random_state=self.random_state ) rfc = RandomForestClassifier( n_estimators=self.n_estimators, criterion=self.criterion, max_depth=self.max_depth, min_samples_split=self.min_samples_split, min_samples_leaf=self.min_samples_leaf, min_weight_fraction_leaf=self.min_weight_fraction_leaf, max_features=self.max_features, max_leaf_nodes=self.max_leaf_nodes, min_impurity_decrease=self.min_impurity_decrease, bootstrap=self.bootstrap, n_jobs=self.n_jobs, random_state=self.random_state, verbose=self.verbose, class_weight=self.class_weight, ccp_alpha=self.ccp_alpha, max_samples=self.max_samples, warm_start=False, oob_score=True ) clf = RandomForestClassifier( n_estimators=self.n_estimators, criterion=self.criterion, max_depth=self.max_depth, min_samples_split=self.min_samples_split, min_samples_leaf=self.min_samples_leaf, min_weight_fraction_leaf=self.min_weight_fraction_leaf, max_features=self.max_features, max_leaf_nodes=self.max_leaf_nodes, min_impurity_decrease=self.min_impurity_decrease, bootstrap=self.bootstrap, oob_score=self.oob_score, n_jobs=self.n_jobs, random_state=self.random_state, verbose=self.verbose, class_weight=self.class_weight, ccp_alpha=self.ccp_alpha, max_samples=self.max_samples, warm_start=False, ) # Fit the random forest and get out-of-bag probabilities X_features = feature_extractor.fit_transform(X) n_subsequences = feature_extractor.interval_indices_.shape[0] y_features = np.repeat(y, n_subsequences) rfc.fit(X_features, y_features) X_oob_proba = rfc.oob_decision_function_.reshape( n_samples, n_subsequences, n_classes) # Check for subsequences without OOB scores no_oob_scores = ( (np.isnan(X_oob_proba).any() or np.all(X_oob_proba == 0., axis=2).any()) ) if no_oob_scores: raise ValueError( "At least one sample was never left out during the bootstrap. " "Increase the number of trees (n_estimators)." ) # Bin the probabilities if isinstance(bins, (int, np.integer)): n_bins = bins else: n_bins = len(bins) - 1 X_binned = histogram(X_oob_proba, bins, n_bins, n_samples, n_classes) # Fit a random forest on the binned probabilities clf.fit(X_binned, y) # Get attributes self.estimator_ = clf.estimator_ self.classes_ = clf.classes_ self.estimators_ = clf.estimators_ self.feature_importances_ = clf.feature_importances_ self.interval_indices_ = feature_extractor.interval_indices_ self.min_subsequence_size_ = feature_extractor.min_subsequence_size_ self.n_features_in_ = ( clf.n_features_in_ if hasattr(clf, 'n_features_in_') else clf.n_features_ ) self.oob_decision_function_ = getattr( clf, 'oob_decision_function_', None) self.oob_score_ = getattr(clf, 'oob_score_', None) self._feature_extractor = feature_extractor self._rfc = rfc self._clf = clf return self
[docs] def predict(self, X): """Predict class for X. The predicted class of an input time series is a vote by the trees in the forest, weighted by their probability estimates. That is, the predicted class is the one with highest mean probability estimate across the trees. Parameters ---------- X : array-like, shape = (n_samples, n_timestamps) Univariate time series. Returns ------- y : array, shape = (n_samples,) The predicted classes. """ X_binned = self._transform(X) return self._clf.predict(X_binned)
[docs] def predict_proba(self, X): """Predict class probabilities for X. The predicted class probabilities of an input time series are computed as the mean predicted class probabilities of the trees in the forest. The class probability of a single tree is the fraction of samples of the same class in a leaf. Parameters ---------- X : array-like, shape = (n_samples, n_timestamps) Univariate time series. Returns ------- p : array, shape = (n_samples, n_classes) The class probabilities of the input time series. The order of the classes corresponds to that in the attribute `classes_`. """ X_binned = self._transform(X) return self._clf.predict_proba(X_binned)
def _check_params(self): if not isinstance(self.bins, (int, np.integer, list, tuple, np.ndarray)): raise TypeError("'bins' must be an integer or array-like.") if isinstance(self.bins, (int, np.integer)): bins = self.bins else: bins = np.asarray(self.bins) if (np.diff(bins) < 0).any(): raise ValueError("If 'bins' is array-like, the bin edges " "must increase monotonically.") return bins def _transform(self, X): check_is_fitted(self) X = check_array(X, dtype='float64') n_samples = X.shape[0] n_subsequences = self.interval_indices_.shape[0] n_classes = len(self.classes_) # Predict probabilities using the inner random forest X_features = self._feature_extractor.transform(X) X_proba = self._rfc.predict_proba(X_features).reshape( n_samples, n_subsequences, n_classes) # Bin the probabilities if isinstance(self._bins, (int, np.integer)): n_bins = self._bins else: n_bins = len(self._bins) - 1 X_binned = histogram(X_proba, self._bins, n_bins, n_samples, n_classes) return X_binned