Source code for pyts.classification.learning_shapelets

"""Code for Learning Time-Series Shapelets algorithm."""

# Author: Johann Faouzi <johann.faouzi@gmail.com>
# License: BSD-3-Clause

from itertools import chain
from math import ceil
from numba import njit, prange
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.cluster import KMeans
from sklearn.exceptions import ConvergenceWarning
from sklearn.multiclass import OneVsOneClassifier, OneVsRestClassifier
from sklearn.multiclass import _ovr_decision_function
from sklearn.preprocessing import LabelBinarizer, LabelEncoder
from sklearn.utils import compute_class_weight, check_array
from sklearn.utils.validation import (
    check_is_fitted, check_random_state, check_X_y, _check_sample_weight)
from sklearn.utils.multiclass import check_classification_targets
import warnings
from ..utils.utils import _windowed_view
from ..base import UnivariateClassifierMixin


@njit(fastmath=True)
def _expit(x):
    """Compute the expit (logistic) function."""
    return 1 / (1 + np.exp(-x))


@njit(fastmath=True)
def _xlogy(x, y):
    """Compute the x * log(y) function."""
    return x * np.log(y)


@njit(fastmath=True)
def _softmin(arr, alpha):
    """Derive the soft-minimum of an array."""
    maximum = np.max(alpha * arr)
    exp = np.exp(alpha * arr - maximum)
    num = np.sum(arr * exp)
    den = np.sum(exp)
    return num / den


@njit(fastmath=True)
def _softmin_grad(arr, alpha):
    """Derive the gradient of the softmin function."""
    minimum = _softmin(arr, alpha)
    maximum = np.max(alpha * arr)
    exp = np.exp(alpha * arr - maximum)
    num = exp * (1 + alpha * (arr - minimum))
    den = np.sum(exp)
    return num / den


@njit(fastmath=True)
def _softmax(X, n_samples, n_classes):
    """Derive the softmax of a 2D-array."""
    maximum = np.empty((n_samples, 1))
    for i in prange(n_samples):
        maximum[i, 0] = np.max(X[i])
    exp = np.exp(X - maximum)
    sum_ = np.empty((n_samples, 1))
    for i in prange(n_samples):
        sum_[i, 0] = np.sum(exp[i])
    return exp / sum_


@njit(fastmath=True)
def _derive_shapelet_distances(X, shapelet, alpha):
    """Derive the distance between a shapelet and all the time series."""
    n_samples, n_windows, _ = X.shape

    # Derive all squared distances
    mean = np.empty((n_samples, n_windows))
    for i in prange(n_samples):
        for j in prange(n_windows):
            mean[i, j] = np.mean((X[i, j] - shapelet) ** 2)

    # Derive the soft minimum of all the distances
    dist = np.empty(n_samples)
    for i in prange(n_samples):
        dist[i] = _softmin(mean[i], alpha)

    return dist


@njit()
def _derive_all_squared_distances(
    X, n_samples, n_timestamps, shapelets, lengths, alpha
):
    """Derive the squared distances between all shapelets and time series."""
    distances = []  # save the distances in a list

    for i in prange(len(lengths)):
        window_size = lengths[i][0]
        X_window = _windowed_view(X, n_samples, n_timestamps,
                                  window_size, window_step=1)
        for j in prange(shapelets[i].shape[0]):
            dist = _derive_shapelet_distances(X_window, shapelets[i][j], alpha)
            distances.append(dist)

    return distances


@njit()
def _reshape_list_shapelets(shapelets, lengths):
    """Reshape shapelets from a 1D-array to a list of 2D-arrays."""
    shapelets_reshaped = []
    start = 0
    for length in lengths:
        n_shapelets = length.size
        length_ = length[0]
        end = start + n_shapelets * length_
        shapelets_reshaped.append(shapelets[start: end].reshape(-1, length_))
        start = end
    return shapelets_reshaped


@njit()
def _reshape_array_shapelets(shapelets, lengths):
    """Reshape shapelets from a tuple of 2D-arrays to a 1D-array."""
    lengths_concatenated = np.concatenate(lengths)
    size = np.sum(lengths_concatenated)
    shapelets_reshaped = np.empty(size)
    start = 0
    for i in range(len(shapelets)):
        end = start + np.sum(lengths[i])
        shapelets_reshaped[start:end] = np.ravel(shapelets[i])
        start = end
    return shapelets_reshaped


def _loss(X, y, n_classes, weights, shapelets, lengths, alpha, penalty, C,
          fit_intercept, intercept_scaling, sample_weight):
    """Compute the objective function."""
    n_samples, n_timestamps = X.shape

    # Derive distances between shapelets and time series
    distances = _derive_all_squared_distances(
        X, n_samples, n_timestamps, shapelets, lengths, alpha)
    distances = np.asarray(distances).T

    # Add intercept
    if fit_intercept:
        distances = np.c_[np.ones(n_samples) * intercept_scaling, distances]

    # Derive probabilities and cross-entropy loss
    if weights.ndim == 1:
        proba = _expit(distances @ weights)
        proba = np.clip(proba, 1e-8, 1 - 1e-8)
        loss_value = - np.mean(
            sample_weight * (_xlogy(y, proba) + _xlogy(1 - y, 1 - proba)))
    else:
        proba = _softmax(distances @ weights, n_samples, n_classes)
        proba = np.clip(proba, 1e-8, 1 - 1e-8)
        loss_value = - np.mean(
            sample_weight * np.sum(y * np.log(proba), axis=1))

    # Add regularization
    if penalty == 'l2':
        loss_value += (1 / C) * np.square(weights).sum()
    elif penalty == 'l1':
        loss_value += (1 / C) * np.abs(weights).sum()

    return loss_value


def _grad_weights(X, y, n_classes, weights, shapelets, lengths, alpha, penalty,
                  C, fit_intercept, intercept_scaling, sample_weight):
    """Compute the gradient of the loss with regards to the weights."""
    n_samples, n_timestamps = X.shape

    # Derive distances between shapelets and time series
    distances = _derive_all_squared_distances(
        X, n_samples, n_timestamps, shapelets, lengths, alpha)
    distances = np.asarray(distances).T

    # Add intercept
    if fit_intercept:
        distances = np.c_[np.ones(n_samples) * intercept_scaling, distances]

    # Derive probabilities and binary cross-entropy loss
    if weights.ndim == 1:
        proba = _expit(distances @ weights)
        proba = np.clip(proba, 1e-8, 1 - 1e-8)
        gradients = ((proba - y)[:, None] *
                     distances *
                     sample_weight).mean(axis=0)
    else:
        proba = _softmax(distances @ weights, n_samples, n_classes)
        proba = np.clip(proba, 1e-8, 1 - 1e-8)
        gradients = ((proba - y)[:, None, :] *
                     (distances * sample_weight)[:, :, None]).mean(axis=0)

    if penalty == 'l2':
        gradients += (2 / C) * weights
    elif penalty == 'l1':
        gradients += (1 / C) * np.sign(weights)

    return gradients


@njit()
def _compute_shapelet_grad(
    X, n_samples, n_timestamps, weights, shapelets, lengths, alpha,
    proba_minus_y, weight_idx, sample_weight,
):

    gradients = []

    for i in range(len(lengths)):
        X_window = _windowed_view(X, n_samples, n_timestamps,
                                  window_size=lengths[i][0], window_step=1)
        n_windows = X_window.shape[1]
        size = shapelets[i][0].size
        for j in range(shapelets[i].shape[0]):
            # Get the current shapelet
            shapelet = shapelets[i][j]

            # Derive the difference and the distance
            diff = shapelet - X_window
            dist = np.empty((n_samples, n_windows))
            for k in prange(n_samples):
                for m in prange(n_windows):
                    dist[k, m] = np.mean(diff[k, m] ** 2)

            # Derive the softmin gradient
            softmin_gradient = np.empty((n_samples, n_windows))
            for k in prange(n_samples):
                softmin_gradient[k] = _softmin_grad(dist[k], alpha)

            # Normalize the difference for actual gradient
            diff *= (2 / size)

            # Compute the gradient
            grad = np.empty(size)
            if weights.ndim == 1:
                for k in prange(size):
                    grad[k] = np.mean(
                        np.sum(diff[:, :, k] * softmin_gradient, axis=1) *
                        weights[weight_idx] *
                        proba_minus_y *
                        sample_weight
                    )
            else:
                for k in prange(size):
                    grad[k] = np.mean(
                        np.sum(diff[:, :, k] * softmin_gradient, axis=1) *
                        np.sum(weights[weight_idx] * proba_minus_y, axis=1) *
                        sample_weight
                    )
            gradients.append(grad)

            # Update the weight index
            weight_idx += 1

    return gradients


def _grad_shapelets(X, y, n_classes, weights, shapelets, lengths, alpha,
                    penalty, C, fit_intercept, intercept_scaling,
                    sample_weight):
    """Compute the gradient of the loss with regards to the shapelets."""
    n_samples, n_timestamps = X.shape

    # Derive distances between shapelets and time series
    distances = _derive_all_squared_distances(
        X, n_samples, n_timestamps, shapelets, lengths, alpha)
    distances = np.asarray(distances).T

    # Add intercept
    if fit_intercept:
        distances = np.c_[np.ones(n_samples) * intercept_scaling, distances]
        weight_idx = 1
    else:
        weight_idx = 0

    # Derive probabilities and cross-entropy loss
    if weights.ndim == 1:
        proba = _expit(distances @ weights)
        proba = np.clip(proba, 1e-8, 1 - 1e-8)
    else:
        proba = _softmax(distances @ weights, n_samples, n_classes)
        proba = np.clip(proba, 1e-8, 1 - 1e-8)

    # Reshape some arrays
    if weights.ndim == 1:
        proba_minus_y = (proba - y)[:, None]
    else:
        proba_minus_y = proba - y

    # Compute the gradients
    gradients = _compute_shapelet_grad(
        X, n_samples, n_timestamps, weights, shapelets, lengths,
        alpha, proba_minus_y, weight_idx, sample_weight
    )
    gradients = np.concatenate(gradients)

    return gradients


class CrossEntropyLearningShapelets(BaseEstimator, UnivariateClassifierMixin):
    """Learning Shapelets algorithm with cross-entropy loss.

    Parameters
    ----------
    n_shapelets_per_size : int or float (default = 0.2)
        Number of shapelets per size. If float, it represents
        a fraction of the number of timestamps and the number
        of shapelets per size is equal to
        ``ceil(n_shapelets_per_size * n_timestamps)``.

    min_shapelet_length : int or float (default = 0.1)
        Minimum length of the shapelets. If float, it represents
        a fraction of the number of timestamps and the minimum
        length of the shapelets per size is equal to
        ``ceil(min_shapelet_length * n_timestamps)``.

    shapelet_scale : int (default = 3)
        The different scales for the lengths of the shapelets.
        The lengths of the shapelets are equal to
        ``min_shapelet_length * np.arange(1, shapelet_scale + 1)``.
        The total number of shapelets (and features)
        is equal to ``n_shapelets_per_size * shapelet_scale``.

    penalty : 'l1' or 'l2' (default = 'l2')
        Used to specify the norm used in the penalization.

    tol : float (default = 1e-3)
        Relative tolerance for stopping criterion.

    C : float (default = 1000)
        Inverse of regularization strength. It must be a positive float.
        Smaller values specify stronger regularization.

    learning_rate : float (default = 1.)
        Learning rate for gradient descent optimization. It must be a positive
        float. Note that the learning rate will be automatically decreased
        if the loss function is not decreasing.

    max_iter : int (default = 1000)
        Maximum number of iterations for gradient descent algorithm.

    alpha : float (default = -100)
        Scaling term in the softmin function. The lower, the more precised
        the soft minimum will be. Default value should be good for
        standardized time series.

    fit_intercept : bool (default = True)
        Specifies if a constant (a.k.a. bias or intercept) should be
        added to the decision function.

    intercept_scaling : float (default = 1.)
        Scaling of the intercept. Only used if ``fit_intercept=True``.

    class_weight : dict, None or 'balanced' (default = None)
        Weights associated with classes in the form ``{class_label: weight}``.
        If not given, all classes are supposed to have unit weight.
        The "balanced" mode uses the values of y to automatically adjust
        weights inversely proportional to class frequencies in the input data
        as ``n_samples / (n_classes * np.bincount(y))``.

    verbose : int (default = 0)
        Controls the verbosity. It must be a non-negative integer.
        If positive, loss at each iteration is printed.

    random_state : None, int or RandomState instance (default = None)
        The seed of the pseudo random number generator to use when shuffling
        the data. If int, random_state is the seed used by the random number
        generator. If RandomState instance, random_state is the random number
        generator. If None, the random number generator is the RandomState
        instance used by `np.random`.

    Attributes
    ----------
    classes_ : array, shape = (n_classes,)
        An array of class labels known to the classifier.

    shapelets_ : array, shape = (n_shapelets,)
        Learned shapelets.

    coef_ : array, shape = (1, n_shapelets) or (n_classes, n_shapelets)
        Coefficients for each shapelet in the decision function.

    intercept_ : array, shape = (1,) or (n_classes,)
        Intercepts (a.k.a. biases) added to the decision function.
        If ``fit_intercept=False``, the intercepts are set to zero.

    n_iter_ : int
        Actual number of iterations.

    References
    ----------
    .. [1] J. Grabocka, N. Schilling, M. Wistuba and L. Schmidt-Thieme,
           "Learning Time-Series Shapelets". International Conference on Data
           Mining, 14, 392-401 (2014).

    """

    def __init__(self, n_shapelets_per_size=0.2, min_shapelet_length=0.1,
                 shapelet_scale=3, penalty='l2', tol=0.001, C=1000,
                 learning_rate=1., max_iter=1000, alpha=-100,
                 fit_intercept=True, intercept_scaling=1.,
                 class_weight=None, verbose=0, random_state=None):
        self.n_shapelets_per_size = n_shapelets_per_size
        self.min_shapelet_length = min_shapelet_length
        self.shapelet_scale = shapelet_scale
        self.penalty = penalty
        self.tol = tol
        self.C = C
        self.learning_rate = learning_rate
        self.max_iter = max_iter
        self.alpha = alpha
        self.fit_intercept = fit_intercept
        self.intercept_scaling = intercept_scaling
        self.class_weight = class_weight
        self.verbose = verbose
        self.random_state = random_state

    def fit(self, X, y, sample_weight=None):
        """Fit the model according to the given training data.

        Parameters
        ----------
        X : array-like, shape = (n_samples, n_timestamps)
            Training vector.

        y : array-like, shape = (n_samples,)
            Class labels for each data sample.

        sample_weight : None or array-like, shape = (n_samples,) (default = None)
            Array of weights that are assigned to individual samples.
            If not provided, then each sample is given unit weight.

        Returns
        -------
        self : object

        """  # noqa: E501
        X, y = check_X_y(X, y)
        n_samples, n_timestamps = X.shape
        check_classification_targets(y)
        le = LabelEncoder().fit(y)
        y_ind = le.transform(y)
        self.classes_ = le.classes_
        n_classes = len(le.classes_)

        (n_shapelets_per_size, min_shapelet_length, sample_weight,
         rng) = self._check_params(X, y, y_ind, le.classes_, sample_weight)

        if n_classes > 2:
            y_ind = LabelBinarizer().fit_transform(y)

        # Shapelet initialization
        window_sizes = np.arange(
            min_shapelet_length,
            min_shapelet_length * (self.shapelet_scale + 1),
            min_shapelet_length
        )

        n_shapelets_per_cluster = n_timestamps - window_sizes + 1
        if np.any(n_shapelets_per_size > n_shapelets_per_cluster):
            raise ValueError("'n_shapelets_per_size' is too high given "
                             "'min_shapelet_length' and 'shapelet_scale'.")

        shapelets = []
        lengths = []
        for window_size in window_sizes:
            X_window = _windowed_view(
                X, n_samples, n_timestamps, window_size, window_step=1)
            X_window = X_window.reshape(-1, window_size)
            kmeans = KMeans(
                n_init=10, n_clusters=n_shapelets_per_size, random_state=rng
            )
            kmeans.fit(X_window)
            shapelets.append(kmeans.cluster_centers_)
            lengths.append(np.full(n_shapelets_per_size, window_size))
        shapelets = tuple(shapelets)
        lengths = tuple(lengths)

        # Weight initialization
        n_shapelets = n_shapelets_per_size * self.shapelet_scale
        if n_classes == 2:
            if self.fit_intercept:
                weights = rng.randn(n_shapelets + 1) / 100
            else:
                weights = rng.randn(n_shapelets) / 100
        else:
            if self.fit_intercept:
                weights = rng.randn(n_shapelets + 1, n_classes) / 100
            else:
                weights = rng.randn(n_shapelets, n_classes) / 100

        # Gradient descent
        learning_rate = self.learning_rate
        losses = []
        iteration = 0
        loss_iteration = _loss(
            X, y_ind, n_classes, weights, shapelets, lengths, self.alpha,
            self.penalty, self.C, self.fit_intercept, self.intercept_scaling,
            sample_weight
        )
        if self.verbose:
            print('Iteration {0}: loss = {1:0.6f}'.format(0, loss_iteration))
        losses.append(loss_iteration)
        for iteration in range(1, self.max_iter + 1):

            # Update weights
            gradient_weights = _grad_weights(
                X, y_ind, n_classes, weights, shapelets, lengths, self.alpha,
                self.penalty, self.C, self.fit_intercept,
                self.intercept_scaling, sample_weight
            )
            weights -= learning_rate * gradient_weights

            # Update shapelets
            gradient_shapelets = _grad_shapelets(
                X, y_ind, n_classes, weights, shapelets, lengths, self.alpha,
                self.penalty, self.C, self.fit_intercept,
                self.intercept_scaling, sample_weight
            )
            shapelets_array = _reshape_array_shapelets(shapelets, lengths)
            shapelets_array -= learning_rate * gradient_shapelets
            shapelets = tuple(
                _reshape_list_shapelets(shapelets_array, lengths))

            # Compute current loss
            loss_iteration = _loss(
                X, y_ind, n_classes, weights, shapelets, lengths, self.alpha,
                self.penalty, self.C, self.fit_intercept,
                self.intercept_scaling, sample_weight
            )

            # If loss is increasing, decrease the learning rate
            if losses[-1] < loss_iteration:
                while losses[-1] < loss_iteration:
                    # Go back to previous state
                    weights += learning_rate * gradient_weights
                    shapelets_array = _reshape_array_shapelets(
                        shapelets, lengths)
                    shapelets_array += learning_rate * gradient_shapelets
                    shapelets = tuple(
                        _reshape_list_shapelets(shapelets_array, lengths))

                    # Update learning  rate
                    learning_rate /= 5

                    # Recompute shapelet gradient
                    weights -= learning_rate * gradient_weights
                    gradient_shapelets = _grad_shapelets(
                        X, y_ind, n_classes, weights, shapelets, lengths,
                        self.alpha, self.penalty, self.C, self.fit_intercept,
                        self.intercept_scaling, sample_weight
                    )
                    shapelets_array = _reshape_array_shapelets(
                        shapelets, lengths)
                    shapelets_array -= learning_rate * gradient_shapelets
                    shapelets = tuple(
                        _reshape_list_shapelets(shapelets_array, lengths))

                    loss_iteration = _loss(
                        X, y_ind, n_classes, weights, shapelets, lengths,
                        self.alpha, self.penalty, self.C, self.fit_intercept,
                        self.intercept_scaling, sample_weight
                    )
            if self.verbose:
                print('Iteration {0}: loss = {1:0.6f}'
                      .format(iteration, loss_iteration))
            losses.append(loss_iteration)

            # Stopping criterion
            if abs(losses[-2] - losses[-1]) < self.tol * losses[-1]:
                break

        if iteration == self.max_iter:
            warnings.warn('Maximum number of iterations reached without '
                          'converging. Increase the maximum number of '
                          'iterations.', ConvergenceWarning)

        # Save results in attributes
        self._shapelets = shapelets
        self._lengths = lengths
        self.shapelets_ = [list(shapelet) for shapelet in shapelets]
        self.shapelets_ = np.asarray(
            list(chain.from_iterable(self.shapelets_)), dtype='object')
        if n_classes == 2:
            if self.fit_intercept:
                self.intercept_ = np.array([weights[0]])
                self.coef_ = weights[1:].reshape(1, -1)
            else:
                self.intercept_ = np.array([0])
                self.coef_ = weights.reshape(1, -1)
        else:
            if self.fit_intercept:
                self.intercept_ = weights[0]
                self.coef_ = weights[1:].T
            else:
                self.intercept_ = np.zeros(weights.shape[1])
                self.coef_ = weights.T
        self.n_iter_ = iteration

    def decision_function(self, X):
        """Decision function scores.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_timestamps)
            Test samples.

        Returns
        -------
        T : array-like of shape (n_samples,) or (n_samples, n_classes)
            Decision function scores for each sample for each class in the
            model, where classes are ordered as they are in ``self.classes_``.

        """
        check_is_fitted(self, ['shapelets_', 'coef_', 'intercept_', 'n_iter_'])

        X = check_array(X)
        n_samples, n_timestamps = X.shape

        # Derive distances between shapelets and time series
        distances = _derive_all_squared_distances(
            X, n_samples, n_timestamps, self._shapelets,
            self._lengths, self.alpha
        )
        distances = np.asarray(distances).T

        # Add intercept
        if self.fit_intercept:
            distances = np.c_[np.ones(n_samples) * self.intercept_scaling,
                              distances]

        # Derive decision function
        if self.fit_intercept:
            if len(self.classes_) == 2:
                weights = np.r_[self.intercept_, np.squeeze(self.coef_)]
            else:
                weights = np.r_[self.intercept_.reshape(1, -1), self.coef_.T]
        else:
            weights = self.coef_.T
        X_new = np.squeeze(distances @ weights)

        return X_new

    def predict_proba(self, X):
        """Probability estimates.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_timestamps)
            Test samples.

        Returns
        -------
        T : array-like of shape (n_samples, n_classes)
            Probability of the samples for each class in the model,
            where classes are ordered as they are in ``self.classes_``.

        """
        X_new = self.decision_function(X)
        n_samples = X_new.shape[0]
        if len(self.classes_) == 2:
            proba = _expit(X_new)
            X_proba = np.c_[1 - proba, proba]
        else:
            X_proba = _softmax(X_new, n_samples, len(self.classes_))
        return X_proba

    def predict(self, X):
        """Predict the class labels for the provided data.

        Parameters
        ----------
        X : array-like, shape = (n_samples, n_timestamps)
            Test samples.

        Returns
        -------
        y_pred : array-like, shape = (n_samples,)
            Class labels for each data sample.

        """
        if len(self.classes_) == 2:
            y_pred = (self.decision_function(X) > 0.).astype('int64')
        else:
            y_pred = self.decision_function(X).argmax(axis=1)
        return self.classes_[y_pred]

    def _check_params(self, X, y, y_ind, classes, sample_weight):
        """Parameter check"""
        n_samples, n_timestamps = X.shape

        if not isinstance(self.n_shapelets_per_size,
                          (int, np.integer, float, np.floating)):
            raise TypeError("'n_shapelets_per_size' must be an integer or a "
                            "float (got {})."
                            .format(self.n_shapelets_per_size))
        if isinstance(self.n_shapelets_per_size, (int, np.integer)):
            if not 1 <= self.n_shapelets_per_size <= n_timestamps:
                raise ValueError(
                    "If 'n_shapelets_per_size' is an integer, it must be "
                    "greater than or equal to 1 and lower than or equal to "
                    "n_timestamps (got {0}).".format(self.n_shapelets_per_size)
                )
            n_shapelets_per_size = self.n_shapelets_per_size
        else:
            if not (0 < self.n_shapelets_per_size <= 1.):
                raise ValueError(
                    "If 'n_shapelets_per_size' is a float, it must be greater "
                    "than 0 and lower than or equal to 1 (got {0})."
                    .format(self.n_shapelets_per_size)
                )
            n_shapelets_per_size = ceil(
                self.n_shapelets_per_size * n_timestamps)

        if not isinstance(self.min_shapelet_length,
                          (int, np.integer, float, np.floating)):
            raise TypeError("'min_shapelet_length' must be an integer or a "
                            "float (got {}).".format(self.min_shapelet_length))
        if isinstance(self.min_shapelet_length, (int, np.integer)):
            if not 1 <= self.min_shapelet_length <= n_timestamps:
                raise ValueError(
                    "If 'min_shapelet_length' is an integer, it must be "
                    "greater than or equal to 1 and lower than or equal to "
                    "n_timestamps (got {0}).".format(self.min_shapelet_length)
                )
            min_shapelet_length = self.min_shapelet_length
        else:
            if not (0 < self.min_shapelet_length <= 1.):
                raise ValueError(
                    "If 'min_shapelet_length' is a float, it must be greater "
                    "than 0 and lower than or equal to 1 (got {0})."
                    .format(self.min_shapelet_length)
                )
            min_shapelet_length = ceil(self.min_shapelet_length * n_timestamps)

        if not (isinstance(self.shapelet_scale, (int, np.integer)) and
                self.shapelet_scale > 0):
            raise ValueError("'shapelet_scale' must be a positive integer "
                             "(got {}).".format(self.shapelet_scale))

        if self.shapelet_scale * min_shapelet_length > n_timestamps:
            raise ValueError(
                "'shapelet_scale' and 'min_shapelet_length' must be "
                "such that shapelet_scale * min_shapelet_length is "
                "smaller than or equal to n_timestamps."
            )

        if self.penalty not in ('l1', 'l2'):
            raise ValueError("'penalty' must be either 'l2' or 'l1' "
                             "(got {}).".format(self.penalty))

        if not (isinstance(self.C, (int, np.integer, float, np.floating)) and
                self.C > 0):
            raise ValueError("'C' must be a positive float (got {})."
                             .format(self.C))

        if not (isinstance(self.tol, (int, np.integer, float, np.floating)) and
                self.tol > 0):
            raise ValueError("'tol' must be a positive float (got {})."
                             .format(self.tol))

        if not (isinstance(self.learning_rate,
                           (int, np.integer, float, np.floating)) and
                self.learning_rate > 0):
            raise ValueError("'learning_rate' must be a positive float "
                             "(got {}).".format(self.learning_rate))

        if not (isinstance(self.max_iter, (int, np.integer)) and
                self.max_iter >= 0):
            raise ValueError("'max_iter' must be a non-negative integer "
                             "(got {}).".format(self.max_iter))

        if not (isinstance(self.alpha, (int, np.integer, float, np.floating))
                and self.alpha < 0):
            raise ValueError("'alpha' must be a negative float (got {})."
                             .format(self.alpha))

        if not isinstance(self.intercept_scaling,
                          (int, np.integer, float, np.floating)):
            raise ValueError("'intercept_scaling' must be a float (got {})."
                             .format(self.intercept_scaling))

        class_weight_balanced = (isinstance(self.class_weight, str) and
                                 self.class_weight == 'balanced')
        if not (self.class_weight is None or
                class_weight_balanced or
                isinstance(self.class_weight, dict)):
            raise ValueError("'class_weight' must be None, a dictionary "
                             " or 'balanced' (got {})."
                             .format(self.class_weight))
        class_weight = compute_class_weight(
            self.class_weight, classes=classes, y=y)

        sample_weight = _check_sample_weight(sample_weight, X, dtype='float64')
        sample_weight *= class_weight[y_ind]
        sample_weight = sample_weight.reshape(-1, 1)

        rng = check_random_state(self.random_state)

        if not (isinstance(self.verbose, (int, np.integer)) and
                self.verbose >= 0):
            raise ValueError("'verbose' must be a non-negative integer "
                             "(got {}).".format(self.verbose))

        return n_shapelets_per_size, min_shapelet_length, sample_weight, rng


[docs]class LearningShapelets(BaseEstimator, UnivariateClassifierMixin): """Learning Shapelets algorithm. This estimator consists of two steps: computing the distances between the shapelets and the time series, then computing a logistic regression using these distances as features. This algorithm learns the shapelets as well as the coefficients of the logistic regression. Parameters ---------- n_shapelets_per_size : int or float (default = 0.2) Number of shapelets per size. If float, it represents a fraction of the number of timestamps and the number of shapelets per size is equal to ``ceil(n_shapelets_per_size * n_timestamps)``. min_shapelet_length : int or float (default = 0.1) Minimum length of the shapelets. If float, it represents a fraction of the number of timestamps and the minimum length of the shapelets per size is equal to ``ceil(min_shapelet_length * n_timestamps)``. shapelet_scale : int (default = 3) The different scales for the lengths of the shapelets. The lengths of the shapelets are equal to ``min_shapelet_length * np.arange(1, shapelet_scale + 1)``. The total number of shapelets (and features) is equal to ``n_shapelets_per_size * shapelet_scale``. penalty : 'l1' or 'l2' (default = 'l2') Used to specify the norm used in the penalization. tol : float (default = 1e-3) Tolerance for stopping criterion. C : float (default = 1000) Inverse of regularization strength. It must be a positive float. Smaller values specify stronger regularization. learning_rate : float (default = 1.) Learning rate for gradient descent optimization. It must be a positive float. Note that the learning rate will be automatically decreased if the loss function is not decreasing. max_iter : int (default = 1000) Maximum number of iterations for gradient descent algorithm. multi_class : {'multinomial', 'ovr', 'ovo'} (default = 'multinomial') Strategy for multiclass classification. 'multinomial' stands for multinomial cross-entropy loss. 'ovr' stands for one-vs-rest strategy. 'ovo' stands for one-vs-one strategy. Ignored if the classification task is binary. alpha : float (default = -100) Scaling term in the softmin function. The lower, the more precised the soft minimum will be. Default value should be good for standardized time series. fit_intercept : bool (default = True) Specifies if a constant (a.k.a. bias or intercept) should be added to the decision function. intercept_scaling : float (default = 1.) Scaling of the intercept. Only used if ``fit_intercept=True``. class_weight : dict, None or 'balanced' (default = None) Weights associated with classes in the form ``{class_label: weight}``. If not given, all classes are supposed to have unit weight. The "balanced" mode uses the values of y to automatically adjust weights inversely proportional to class frequencies in the input data as ``n_samples / (n_classes * np.bincount(y))``. verbose : int (default = 0) Controls the verbosity. It must be a non-negative integer. If positive, loss at each iteration is printed. random_state : None, int or RandomState instance (default = None) The seed of the pseudo random number generator to use when shuffling the data. If int, random_state is the seed used by the random number generator. If RandomState instance, random_state is the random number generator. If None, the random number generator is the RandomState instance used by `np.random`. n_jobs : None or int (default = None) The number of jobs to use for the computation. Only used if ``multi_class`` is 'ovr' or 'ovo'. Attributes ---------- classes_ : array, shape = (n_classes,) An array of class labels known to the classifier. shapelets_ : array shape = (n_tasks, n_shapelets) Learned shapelets. Each element of this array is a learned shapelet. coef_ : array, shape = (n_tasks, n_shapelets) or (n_classes, n_shapelets) Coefficients for each shapelet in the decision function. intercept_ : array, shape = (n_tasks,) or (n_classes,) Intercepts (a.k.a. biases) added to the decision function. If ``fit_intercept=False``, the intercepts are set to zero. n_iter_ : array, shape = (n_tasks,) Actual number of iterations. Notes ----- The number of tasks (n_tasks) depends on the value of ``multi_class`` and the number of classes. If there are two classes, the number of tasks is equal to 1. If there are more than two classes, the number of tasks is equal to: - 1 if ``multi_class='multinomial'`` - n_classes if ``multi_class='ovr'`` - n_classes * (n_classes - 1) / 2 if ``multi_class='ovo'`` References ---------- .. [1] J. Grabocka, N. Schilling, M. Wistuba and L. Schmidt-Thieme, "Learning Time-Series Shapelets". International Conference on Data Mining, 14, 392-401 (2014). Examples -------- >>> from pyts.classification import LearningShapelets >>> X = [[1, 2, 2, 1, 2, 3, 2], ... [0, 2, 0, 2, 0, 2, 3], ... [0, 1, 2, 2, 1, 2, 2]] >>> y = [0, 1, 0] >>> clf = LearningShapelets(random_state=42, tol=0.01) >>> clf.fit(X, y) LearningShapelets(...) >>> clf.coef_.shape (1, 6) """
[docs] def __init__(self, n_shapelets_per_size=0.2, min_shapelet_length=0.1, shapelet_scale=3, penalty='l2', tol=0.001, C=1000, learning_rate=1., max_iter=1000, multi_class='multinomial', alpha=-100, fit_intercept=True, intercept_scaling=1., class_weight=None, verbose=0, random_state=None, n_jobs=None): self.n_shapelets_per_size = n_shapelets_per_size self.min_shapelet_length = min_shapelet_length self.shapelet_scale = shapelet_scale self.penalty = penalty self.tol = tol self.C = C self.learning_rate = learning_rate self.max_iter = max_iter self.multi_class = multi_class self.alpha = alpha self.fit_intercept = fit_intercept self.intercept_scaling = intercept_scaling self.class_weight = class_weight self.verbose = verbose self.random_state = random_state self.n_jobs = n_jobs
[docs] def fit(self, X, y, sample_weight=None): """Fit the model according to the given training data. Parameters ---------- X : array-like, shape = (n_samples, n_timestamps) Training vector. y : array-like, shape = (n_samples,) Class labels for each data sample. sample_weight : None or array-like, shape = (n_samples,) (default = None) Array of weights that are assigned to individual samples. If not provided, then each sample is given unit weight. Returns ------- self : object """ # noqa: E501 X, y = check_X_y(X, y) n_classes = len(LabelEncoder().fit(y).classes_) multi_class = self._check_params(n_classes) params = self.get_params() params.pop('n_jobs', None) params.pop('multi_class') clf = CrossEntropyLearningShapelets(**params) if multi_class == 'ovr': clf = OneVsRestClassifier(clf, n_jobs=self.n_jobs) elif multi_class == 'ovo': clf = OneVsOneClassifier(clf, n_jobs=self.n_jobs) clf.fit(X, y) self.classes_ = clf.classes_ self._multi_class = multi_class if multi_class in ('ovr', 'ovo'): self._estimators = clf.estimators_ self.shapelets_ = np.array( [est.shapelets_ for est in clf.estimators_]) self.coef_ = np.squeeze(np.asarray( [est.coef_ for est in clf.estimators_])) self.intercept_ = np.squeeze(np.array( [est.intercept_ for est in clf.estimators_])) self.n_iter_ = np.array( [est.n_iter_ for est in clf.estimators_]) else: self._clf = clf self.shapelets_ = np.array([clf.shapelets_]) self.coef_ = clf.coef_ self.intercept_ = clf.intercept_ self.n_iter_ = np.array([clf.n_iter_]) return self
[docs] def decision_function(self, X): """Decision function scores. Parameters ---------- X : array-like, shape = (n_samples, n_timestamps) Test samples. Returns ------- T : array, shape = (n_samples,) or (n_samples, n_classes) Decision function scores for each sample for each class in the model, where classes are ordered as they are in ``self.classes_``. """ X = check_array(X) if self._multi_class == 'ovr': X_new = np.empty((X.shape[0], self.classes_.size)) for i, estimator in enumerate(self._estimators): X_new[:, i] = estimator.decision_function(X) elif self._multi_class == 'ovo': predictions = np.vstack( [est.predict(X) for est in self._estimators]).T confidences = np.vstack( [est.decision_function(X) for est in self._estimators]).T X_new = _ovr_decision_function( predictions, confidences, len(self.classes_)) else: X_new = self._clf.decision_function(X) return X_new
[docs] def predict_proba(self, X): """Probability estimates. Parameters ---------- X : array-like, shape = (n_samples, n_timestamps) Test samples. Returns ------- T : array, shape = (n_samples, n_classes) Probability of the samples for each class in the model, where classes are ordered as they are in ``self.classes_``. """ X_new = self.decision_function(X) n_samples = X_new.shape[0] if self._multi_class == 'binary': proba = _expit(X_new) return np.c_[1 - proba, proba] elif self._multi_class == 'multinomial': proba = _softmax(X_new, n_samples, len(self.classes_)) else: # OvR normalization, like LibLinear's predict_probability proba = _expit(X_new) proba /= proba.sum(axis=1).reshape((proba.shape[0], -1)) return proba
[docs] def predict(self, X): """Predict the class labels for the provided data. Parameters ---------- X : array-like, shape = (n_samples, n_timestamps) Test samples. Returns ------- y_pred : array-like, shape = (n_samples,) Class labels for each data sample. """ X_new = self.decision_function(X) if X_new.ndim == 2: y_pred = X_new.argmax(axis=1) else: y_pred = (X_new > 0).astype('int64') return self.classes_[y_pred]
def _check_params(self, n_classes): if self.multi_class not in ('multinomial', 'ovr', 'ovo'): raise ValueError( "'multi_class' must be either 'multinomial', " "'ovr' or 'ovo' (got {}).".format(self.multi_class) ) multi_class = 'binary' if n_classes == 2 else self.multi_class class_weight_dict = isinstance(self.class_weight, dict) if multi_class in ('ovr', 'ovo') and class_weight_dict: raise ValueError("'class_weight' must be None or 'balanced' if " "'multi_class' is either 'ovr' or 'ovo'.") n_jobs_int = (isinstance(self.n_jobs, (int, np.integer)) and self.n_jobs != 0) if not (self.n_jobs is None or n_jobs_int): raise ValueError("'n_jobs' must be None or an integer not equal " "to zero (got {}).".format(self.n_jobs)) return multi_class