"""Code for Time Series Forest."""
# Author: Johann Faouzi <johann.faouzi@gmail.com>
# License: BSD-3-Clause
from math import ceil
from numba import njit
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.utils.validation import (
check_array, check_is_fitted, check_random_state)
from ..base import UnivariateClassifierMixin, UnivariateTransformerMixin
@njit()
def extract_features(X, n_samples, n_windows, indices):
X_new = np.empty((n_samples, 3 * n_windows))
for j in range(n_windows):
start, end = indices[j]
arange = np.arange((start - end + 1) / 2, (end + 1 - start) / 2)
if end - start == 1:
var_arange = 1.
else:
var_arange = np.sum(arange ** 2)
for i in range(n_samples):
mean = np.mean(X[i, start:end])
X_new[i, 3 * j] = mean
X_new[i, 3 * j + 1] = np.std(X[i, start:end])
X_new[i, 3 * j + 2] = (
np.sum((X[i, start:end] - mean) * arange) / var_arange
)
return X_new
class WindowFeatureExtractor(BaseEstimator, UnivariateTransformerMixin):
"""Feature extractor over a window.
This transformer extracts 3 features from each window: the mean, the
standard deviation and the slope.
Parameters
----------
n_windows : int or float (default = 1.)
The number of windows from which features are extracted.
min_window_size : int or float (default = 1)
The minimum length of the windows. If float, it represents a percentage
of the size of each time series.
random_state : None, int or RandomState instance (default = None)
The seed of the pseudo random number generator to use when shuffling
the data. If int, random_state is the seed used by the random number
generator. If RandomState instance, random_state is the random number
generator. If None, the random number generator is the RandomState
instance used by `np.random`.
Attributes
----------
indices_ : array, shape = (n_windows, 2)
The indices for the windows.
The first column consists of the starting indices (included)
of the windows. The second column consists of the ending indices
(excluded) of the windows.
"""
def __init__(self, n_windows=1., min_window_size=1, random_state=None):
self.n_windows = n_windows
self.min_window_size = min_window_size
self.random_state = random_state
def fit(self, X, y=None):
"""Fit the model according to the given training data.
It generates the indices of the windows from which the features will be
extracted.
Parameters
----------
X : array-like, shape = (n_samples, n_timestamps)
Univariate time series.
y
Ignored
Returns
-------
self : object
"""
# Check
X = check_array(X, dtype='float64')
n_timestamps = X.shape[1]
n_windows, min_window_size, rng = self._check_params(X)
# Generate the start and end indices
start = rng.randint(0, n_timestamps - min_window_size, size=n_windows)
end = rng.randint(start + min_window_size, n_timestamps + 1,
size=n_windows)
self.indices_ = np.c_[start, end]
return self
def transform(self, X):
"""Transform the provided data.
It extracts the three features from all the selected windows
for all the samples.
Parameters
----------
X : array-like, shape = (n_samples, n_timestamps)
Univariate time series.
Returns
-------
X_new : array, shape = (n_samples, 3 * n_windows)
Extracted features.
"""
X = check_array(X, dtype='float64')
check_is_fitted(self)
# Extract the features from each window
n_samples = X.shape[0]
n_windows = self.indices_.shape[0]
return extract_features(X, n_samples, n_windows, self.indices_)
def _check_params(self, X):
n_samples, n_timestamps = X.shape
if not isinstance(self.n_windows,
(int, np.integer, float, np.floating)):
raise TypeError("'n_windows' must be an integer or a float.")
if isinstance(self.n_windows, (int, np.integer)):
if self.n_windows < 1:
raise ValueError(
"If 'n_windows' is an integer, it must be positive "
"(got {0}).".format(self.n_windows)
)
n_windows = self.n_windows
else:
if self.n_windows <= 0:
raise ValueError(
"If 'n_windows' is a float, it must be greater "
"than 0 (got {0}).".format(self.n_windows)
)
n_windows = ceil(self.n_windows * n_timestamps)
if not isinstance(self.min_window_size,
(int, np.integer, float, np.floating)):
raise TypeError("'min_window_size' must be an integer or a float.")
if isinstance(self.min_window_size, (int, np.integer)):
if not 1 <= self.min_window_size <= n_timestamps:
raise ValueError(
"If 'min_window_size' is an integer, it must be greater "
"than or equal to 1 and lower than or equal to "
"n_timestamps (got {0}).".format(self.min_window_size)
)
min_window_size = self.min_window_size
else:
if not 0 < self.min_window_size <= 1:
raise ValueError(
"If 'min_window_size' is a float, it must be greater "
"than 0 and lower than or equal to 1 (got {}).".
format(self.min_window_size)
)
min_window_size = ceil(self.min_window_size * n_timestamps)
rng = check_random_state(self.random_state)
return n_windows, min_window_size, rng
[docs]class TimeSeriesForest(BaseEstimator, UnivariateClassifierMixin):
"""A random forest classifier for time series.
A random forest is a meta estimator that fits a number of decision tree
classifiers on various sub-samples of the dataset and uses averaging to
improve the predictive accuracy and control over-fitting.
This transformer extracts 3 features from each window: the mean, the
standard deviation and the slope. The total number of features is thus
equal to ``3 * n_windows``. Then a random forest is built using
these features as input data.
Parameters
----------
n_estimators : int (default = 500)
The number of trees in the forest.
n_windows : int or float (default = 1.)
The number of windows from which features are extracted.
min_window_size : int or float (default = 1)
The minimum length of the windows. If float, it represents a percentage
of the size of each time series.
criterion : str (default = "entropy")
The function to measure the quality of a split. Supported criteria are
"gini" for the Gini impurity and "entropy" for the information gain.
Note: this parameter is tree-specific.
max_depth : integer or None (default = None)
The maximum depth of the tree. If None, then nodes are expanded until
all leaves are pure or until all leaves contain less than
``min_samples_split`` samples.
min_samples_split : int or float (default = 2)
The minimum number of samples required to split an internal node:
- If int, then consider ``min_samples_split`` as the minimum number.
- If float, then ``min_samples_split`` is a fraction and
``ceil(min_samples_split * n_samples)`` are the minimum
number of samples for each split.
min_samples_leaf : int or float (default = 1)
The minimum number of samples required to be at a leaf node.
A split point at any depth will only be considered if it leaves at
least ``min_samples_leaf`` training samples in each of the left and
right branches. This may have the effect of smoothing the model.
- If int, then consider ``min_samples_leaf`` as the minimum number.
- If float, then ``min_samples_leaf`` is a fraction and
``ceil(min_samples_leaf * n_samples)`` are the minimum
number of samples for each node.
min_weight_fraction_leaf : float (default = 0.)
The minimum weighted fraction of the sum total of weights (of all
the input samples) required to be at a leaf node.
max_features : int, float, str or None (default = "sqrt")
The number of features to consider when looking for the best split:
- If int, then consider ``max_features`` features at each split.
- If float, then ``max_features`` is a fraction and
``int(max_features * n_features)`` features are considered at each
split.
- If "sqrt", then ``max_features=sqrt(n_features)``.
- If "log2", then ``max_features=log2(n_features)``.
- If None, then ``max_features=n_features``.
max_leaf_nodes : int or None (default = None)
Grow trees with ``max_leaf_nodes`` in best-first fashion.
Best nodes are defined as relative reduction in impurity.
If None then unlimited number of leaf nodes.
min_impurity_decrease : float (default = 0.)
A node will be split if this split induces a decrease of the impurity
greater than or equal to this value.
The weighted impurity decrease equation is the following::
N_t / N * (impurity - N_t_R / N_t * right_impurity
- N_t_L / N_t * left_impurity)
where ``N`` is the total number of samples, ``N_t`` is the number of
samples at the current node, ``N_t_L`` is the number of samples in the
left child, and ``N_t_R`` is the number of samples in the right child.
bootstrap : bool (default = True)
Whether bootstrap samples are used when building trees. If False, the
whole datset is used to build each tree.
oob_score : bool (default = False)
Whether to use out-of-bag samples to estimate
the generalization accuracy.
n_jobs : int or None, optional (default = None)
The number of jobs to run in parallel. :meth:`fit`, :meth:`predict`,
:meth:`decision_path` and :meth:`apply` are all parallelized over the
trees. ``None`` means 1 unless in a ``joblib.parallel_backend``
context. ``-1`` means using all processors.
random_state : int, RandomState instance or None (default = None)
Controls both the randomness of the bootstrapping of the samples used
when building trees (if ``bootstrap=True``) and the sampling of the
features to consider when looking for the best split at each node
(if ``max_features < n_features``).
verbose : int (default = 0)
Controls the verbosity when fitting and predicting.
class_weight : dict, "balanced", "balanced_subsample" or None (default = None)
Weights associated with classes in the form ``{class_label: weight}``.
If not given, all classes are supposed to have weight one.
The "balanced" mode uses the values of y to automatically adjust
weights inversely proportional to class frequencies in the input data
as ``n_samples / (n_classes * np.bincount(y))``
The "balanced_subsample" mode is the same as "balanced" except that
weights are computed based on the bootstrap sample for every tree
grown.
ccp_alpha : float (default = 0.)
Complexity parameter used for Minimal Cost-Complexity Pruning. The
subtree with the largest cost complexity that is smaller than
``ccp_alpha`` will be chosen. By default, no pruning is performed.
It must be non-negative.
max_samples : int, float or None (default = None)
If bootstrap is True, the number of samples to draw from X
to train each base estimator:
- If None (default), then draw ``X.shape[0]`` samples.
- If int, then draw ``max_samples`` samples.
- If float, then draw ``max_samples * X.shape[0]`` samples. Thus,
``max_samples`` should be in the interval `(0, 1)`.
Attributes
----------
estimator_ : DecisionTreeClassifier
The child estimator template used to create the collection of fitted
sub-estimators.
classes_ : array, shape = (n_classes,)
The classes labels.
estimators_ : list of DecisionTreeClassifier
The collection of fitted sub-estimators.
feature_importances_ : array, shape = (n_features,)
The feature importances (the higher, the more important the feature).
indices_ : array, shape = (n_windows, 2)
The indices for the windows.
The first column consists of the starting indices (included)
of the windows. The second column consists of the ending indices
(excluded) of the windows.
n_features_in_ : int
The number of features when ``fit`` is performed.
oob_decision_function_ : None or array, shape = (n_samples, n_classes)
Decision function computed with out-of-bag estimate on the training
set. If n_estimators is small it might be possible that a data point
was never left out during the bootstrap. In this case,
`oob_decision_function_` might contain NaN. This attribute is not None
only when ``oob_score`` is True.
oob_score_ : None or float
Score of the training dataset obtained using an out-of-bag estimate.
This attribute is not None only when ``oob_score`` is True.
Examples
--------
>>> from pyts.datasets import load_gunpoint
>>> from pyts.classification import TimeSeriesForest
>>> X_train, X_test, y_train, y_test = load_gunpoint(return_X_y=True)
>>> clf = TimeSeriesForest(random_state=43)
>>> clf.fit(X_train, y_train)
TimeSeriesForest(...)
>>> clf.score(X_test, y_test)
0.97333...
Notes
-----
The default values for the parameters controlling the size of the trees
(e.g. ``max_depth``, ``min_samples_leaf``, etc.) lead to fully grown and
unpruned trees which can potentially be very large on some data sets. To
reduce memory consumption, the complexity and size of the trees should be
controlled by setting those parameter values.
The features are always randomly permuted at each split. Therefore,
the best found split may vary, even with the same training data,
``max_features=n_features`` and ``bootstrap=False``, if the improvement
of the criterion is identical for several splits enumerated during the
search of the best split. To obtain a deterministic behaviour during
fitting, ``random_state`` has to be fixed.
References
----------
.. [1] H. Deng, G. Runger, E. Tuv and M. Vladimir, "A Time Series
Forest for Classification and Feature Extraction".
Information Sciences, 239, 142-153 (2013).
.. [2] Leo Breiman, "Random Forests", Machine Learning, 45(1), 5-32, 2001.
""" # noqa: E501
[docs] def __init__(self,
n_estimators=500,
n_windows=1.,
min_window_size=1,
criterion="entropy",
max_depth=None,
min_samples_split=2,
min_samples_leaf=1,
min_weight_fraction_leaf=0.,
max_features="sqrt",
max_leaf_nodes=None,
min_impurity_decrease=0.,
bootstrap=True,
oob_score=False,
n_jobs=None,
random_state=None,
verbose=0,
class_weight=None,
ccp_alpha=0.0,
max_samples=None):
self.n_estimators = n_estimators
self.n_windows = n_windows
self.min_window_size = min_window_size
self.criterion = criterion
self.max_depth = max_depth
self.min_samples_split = min_samples_split
self.min_samples_leaf = min_samples_leaf
self.min_weight_fraction_leaf = min_weight_fraction_leaf
self.max_features = max_features
self.max_leaf_nodes = max_leaf_nodes
self.min_impurity_decrease = min_impurity_decrease
self.bootstrap = bootstrap
self.oob_score = oob_score
self.n_jobs = n_jobs
self.random_state = random_state
self.verbose = verbose
self.class_weight = class_weight
self.ccp_alpha = ccp_alpha
self.max_samples = max_samples
[docs] def apply(self, X):
"""Apply trees in the forest to X, return leaf indices.
Parameters
----------
X : array-like, shape = (n_samples, n_timestamps)
Univariate time series.
Returns
-------
X_leaves : array_like, shape = (n_samples, n_estimators)
For each datapoint x in X and for each tree in the forest,
return the index of the leaf x ends up in.
"""
check_is_fitted(self)
X = check_array(X, dtype='float64')
X_new = self._pipeline['fe'].transform(X)
return self._pipeline['rfc'].apply(X_new)
[docs] def decision_path(self, X):
"""Return the decision path in the forest.
Parameters
----------
X : array-like, shape = (n_samples, n_timestamps)
Univariate time series.
Returns
-------
indicator : sparse csr array, shape = (n_samples, n_nodes)
Return a node indicator matrix where non zero elements
indicates that the samples goes through the nodes.
n_nodes_ptr : array, shape = (n_estimators + 1,)
The columns from indicator[n_nodes_ptr[i]:n_nodes_ptr[i+1]]
gives the indicator value for the i-th estimator.
"""
check_is_fitted(self)
X = check_array(X, dtype='float64')
X_new = self._pipeline['fe'].transform(X)
return self._pipeline['rfc'].decision_path(X_new)
[docs] def fit(self, X, y):
"""Fit the model according to the given training data.
It build a forest of trees from the training set.
Parameters
----------
X : array-like, shape = (n_samples, n_timestamps)
Univariate time series.
y : array-like, shape = (n_samples,)
Class labels for each sample.
Returns
-------
self : object
"""
# Create and fit the pipeline
feature_extractor = WindowFeatureExtractor(
n_windows=self.n_windows, min_window_size=self.min_window_size,
random_state=self.random_state
)
rfc = RandomForestClassifier(
n_estimators=self.n_estimators,
criterion=self.criterion,
max_depth=self.max_depth,
min_samples_split=self.min_samples_split,
min_samples_leaf=self.min_samples_leaf,
min_weight_fraction_leaf=self.min_weight_fraction_leaf,
max_features=self.max_features,
max_leaf_nodes=self.max_leaf_nodes,
min_impurity_decrease=self.min_impurity_decrease,
bootstrap=self.bootstrap,
oob_score=self.oob_score,
n_jobs=self.n_jobs,
random_state=self.random_state,
verbose=self.verbose,
class_weight=self.class_weight,
ccp_alpha=self.ccp_alpha,
max_samples=self.max_samples,
warm_start=False
)
self._pipeline = Pipeline([('fe', feature_extractor), ('rfc', rfc)])
self._pipeline.fit(X, y)
# Get attributes
self.estimator_ = self._pipeline['rfc'].estimator_
self.classes_ = self._pipeline['rfc'].classes_
self.estimators_ = self._pipeline['rfc'].estimators_
self.feature_importances_ = self._pipeline['rfc'].feature_importances_
self.indices_ = self._pipeline['fe'].indices_
self.n_features_in_ = (
self._pipeline['rfc'].n_features_in_
if hasattr(self._pipeline['rfc'], 'n_features_in_')
else self._pipeline['rfc'].n_features_
)
self.oob_decision_function_ = getattr(
self._pipeline['rfc'], 'oob_decision_function_', None)
self.oob_score_ = getattr(self._pipeline['rfc'], 'oob_score_', None)
return self
[docs] def predict(self, X):
"""Predict class for X.
The predicted class of an input time series is a vote by the trees
in the forest, weighted by their probability estimates.
That is, the predicted class is the one with highest mean
probability estimate across the trees.
Parameters
----------
X : array-like, shape = (n_samples, n_timestamps)
Univariate time series.
Returns
-------
y : array, shape = (n_samples,)
The predicted classes.
"""
check_is_fitted(self)
return self._pipeline.predict(X)
[docs] def predict_proba(self, X):
"""Predict class probabilities for X.
The predicted class probabilities of an input time series are computed
as the mean predicted class probabilities of the trees in the forest.
The class probability of a single tree is the fraction of samples
of the same class in a leaf.
Parameters
----------
X : array-like, shape = (n_samples, n_timestamps)
Univariate time series.
Returns
-------
p : array, shape = (n_samples, n_classes)
The class probabilities of the input time series.
The order of the classes corresponds to that in the
attribute `classes_`.
"""
check_is_fitted(self)
return self._pipeline.predict_proba(X)
[docs] def score(self, X, y):
"""Return the mean accuracy on the given test data and labels.
Parameters
----------
X : array-like, shape = (n_samples, n_timestamps)
Test samples.
y : array-like, shape = (n_samples,)
True labels for X.
Returns
-------
score : float
Mean accuracy of self.predict(X) wrt. y.
"""
check_is_fitted(self)
return self._pipeline.score(X, y)