Source code for pyts.bag_of_words.bow

"""Code for Bag-of-words representation for time series."""

# Author: Johann Faouzi <johann.faouzi@gmail.com>
# License: BSD-3-Clause

import numpy as np
from math import ceil
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import make_pipeline
from sklearn.utils.validation import check_array
from ..approximation import (
    PiecewiseAggregateApproximation, SymbolicAggregateApproximation)
from ..base import UnivariateTransformerMixin
from ..preprocessing import KBinsDiscretizer, StandardScaler
from ..preprocessing.discretizer import _digitize
from ..utils.utils import _windowed_view


class WordExtractor(BaseEstimator, UnivariateTransformerMixin):
    r"""Turn discretized time series into sequences of words.

    A window slides along each (already discretized) time series. The
    symbols found inside one window are concatenated into a word, and the
    words extracted from one time series are joined with a whitespace.

    Parameters
    ----------
    window_size : int or float (default = 0.1)
        Size of the sliding window (i.e. the number of symbols per word).
        If float, it is a fraction of the number of timestamps: it must be
        greater than 0 and lower than or equal to 1, and the window size
        is computed as ``ceil(window_size * n_timestamps)``.

    window_step : int or float (default = 1)
        Step of the sliding window. If float, it is a fraction of the
        number of timestamps: it must be greater than 0 and lower than or
        equal to 1, and the window step is computed as
        ``ceil(window_step * n_timestamps)``.

    numerosity_reduction : bool (default = True)
        If True, every run of back-to-back identical words inside a sample
        is collapsed into a single occurrence of that word.

    Examples
    --------
    >>> from pyts.bag_of_words import WordExtractor
    >>> X = [['a', 'a', 'b', 'a', 'b', 'b', 'b', 'b', 'a'],
    ...      ['a', 'b', 'c', 'c', 'c', 'c', 'a', 'a', 'c']]
    >>> word = WordExtractor(window_size=2)
    >>> print(word.transform(X))
    ['aa ab ba ab bb ba' 'ab bc cc ca aa ac']
    >>> word = WordExtractor(window_size=2, numerosity_reduction=False)
    >>> print(word.transform(X))
    ['aa ab ba ab bb bb bb ba' 'ab bc cc cc cc ca aa ac']

    """

    def __init__(self, window_size=0.1, window_step=1,
                 numerosity_reduction=True):
        self.window_size = window_size
        self.window_step = window_step
        self.numerosity_reduction = numerosity_reduction

    def fit(self, X=None, y=None):
        """Do nothing and return the estimator unchanged.

        Parameters
        ----------
        X
            Ignored

        y
            Ignored

        Returns
        -------
        self : object

        """
        return self

    def transform(self, X):
        """Extract the sequence of words of each time series.

        Parameters
        ----------
        X : array-like, shape = (n_samples, n_timestamps)
            Discretized time series.

        Returns
        -------
        X_new : array, shape = (n_samples,)
            Transformed data. Each entry is a string made of the words of
            one time series, separated by a whitespace.

        """
        X = check_array(X, dtype=None)
        n_samples, n_timestamps = X.shape
        window_size, window_step = self._check_params(n_timestamps)
        n_windows = (n_timestamps - window_size + window_step) // window_step

        windows = _windowed_view(
            X, n_samples, n_timestamps, window_size, window_step
        )

        # One word per (sample, window): glue the symbols of the window.
        words = np.asarray([
            [''.join(windows[row, col]) for col in range(n_windows)]
            for row in range(n_samples)
        ])

        # Mask of the words to keep. Without numerosity reduction every
        # word is kept; with it, a word is dropped when the next word is
        # identical, so the last word of each run (and of each sample)
        # always survives.
        keep = np.ones((n_samples, n_windows), dtype=bool)
        if self.numerosity_reduction:
            keep[:, :-1] = words[:, 1:] != words[:, :-1]

        return np.asarray([
            ' '.join(words[row, keep[row]]) for row in range(n_samples)
        ])

    def _check_params(self, n_timestamps):
        """Validate the window parameters and resolve them to integers.

        Parameters
        ----------
        n_timestamps : int
            Number of timestamps of the input time series.

        Returns
        -------
        window_size : int
            Resolved size of the sliding window.

        window_step : int
            Resolved step of the sliding window.

        Raises
        ------
        TypeError
            If a parameter is neither an integer nor a float.

        ValueError
            If a parameter is outside its valid range.

        """
        integer_types = (int, np.integer)
        real_types = (int, np.integer, float, np.floating)

        # --- window_size ---
        size = self.window_size
        if not isinstance(size, real_types):
            raise TypeError("'window_size' must be an integer or a float.")
        if isinstance(size, integer_types):
            if not 1 <= size <= n_timestamps:
                raise ValueError(
                    "If 'window_size' is an integer, it must be greater "
                    "than or equal to 1 and lower than or equal to "
                    "n_timestamps (got {0}).".format(size)
                )
            window_size = size
        else:
            if not 0 < size <= 1:
                raise ValueError(
                    "If 'window_size' is a float, it must be greater "
                    "than 0 and lower than or equal to 1 (got {0})."
                    .format(size)
                )
            window_size = ceil(size * n_timestamps)

        # --- window_step ---
        step = self.window_step
        if not isinstance(step, real_types):
            raise TypeError("'window_step' must be an integer or a float.")
        if isinstance(step, integer_types):
            if not 1 <= step <= n_timestamps:
                raise ValueError(
                    "If 'window_step' is an integer, it must be greater "
                    "than or equal to 1 and lower than or equal to "
                    "n_timestamps (got {0}).".format(step)
                )
            window_step = step
        else:
            if not 0 < step <= 1:
                raise ValueError(
                    "If 'window_step' is a float, it must be greater "
                    "than 0 and lower than or equal to 1 (got {0})."
                    .format(step)
                )
            window_step = ceil(step * n_timestamps)

        return window_size, window_step
class BagOfWords(BaseEstimator, TransformerMixin):
    """Bag-of-words representation for time series.

    This algorithm uses a sliding window to extract subsequences from the
    time series and transforms each subsequence into a word using the
    Piecewise Aggregate Approximation and the Symbolic Aggregate
    approXimation algorithms. Thus it transforms each time series into a
    bag of words.

    Parameters
    ----------
    window_size : int or float (default = 0.5)
        Length of the sliding window. If float, it represents a percentage
        of the size of each time series and must be greater than 0 and
        lower than or equal to 1. The window size is then computed as
        ``ceil(window_size * n_timestamps)``.

    word_size : int or float (default = 0.5)
        Length of the words. If float, it represents a percentage of the
        length of the sliding window and must be greater than 0 and lower
        than or equal to 1. The word size is then computed as
        ``ceil(word_size * window_size)``.

    n_bins : int (default = 4)
        The number of bins to produce. It must be between 2 and
        ``min(n_timestamps, 26)``.

    strategy : 'uniform', 'quantile' or 'normal' (default = 'normal')
        Strategy used to define the widths of the bins:

        - 'uniform': All bins in each sample have identical widths
        - 'quantile': All bins in each sample have the same number of points
        - 'normal': Bin edges are quantiles from a standard normal
          distribution

    numerosity_reduction : bool (default = True)
        If True, delete sample-wise all but one occurrence of back to back
        identical occurrences of the same words.

    window_step : int or float (default = 1)
        Step of the sliding window. If float, it represents the percentage
        of the size of each time series and must be greater than 0 and
        lower than or equal to 1. The window step is then computed as
        ``ceil(window_step * n_timestamps)``.

    threshold_std : int, float or None (default = 0.01)
        Threshold used to determine whether a subsequence is standardized.
        Subsequences whose standard deviations are lower than this
        threshold are not standardized. If None, all the subsequences are
        standardized.

    norm_mean : bool (default = True)
        If True, center each subseries before scaling.

    norm_std : bool (default = True)
        If True, scale each subseries to unit variance.

    overlapping : bool (default = True)
        If True, time points may belong to two bins when decreasing the
        size of the subsequence with the Piecewise Aggregate Approximation
        algorithm. If False, each time point belong to one single bin, but
        the size of the bins may vary.

    raise_warning : bool (default = False)
        If True, a warning is raised when the number of bins is smaller for
        at least one subsequence. In this case, you should consider
        decreasing the number of bins, using another strategy to compute
        the bins or removing the corresponding time series.

    alphabet : None or array-like, shape = (n_bins,)
        Alphabet to use. If None, the first `n_bins` letters of the Latin
        alphabet are used. Symbols longer than one character are kept
        whole in the output words.

    References
    ----------
    .. [1] J. Lin, R. Khade and Y. Li, "Rotation-invariant similarity in
           time series using bag-of-patterns representation". Journal of
           Intelligent Information Systems, 39 (2), 287-315 (2012).

    Examples
    --------
    >>> import numpy as np
    >>> from pyts.bag_of_words import BagOfWords
    >>> X = np.arange(12).reshape(2, 6)
    >>> bow = BagOfWords(window_size=4, word_size=4)
    >>> bow.transform(X)
    array(['abcd', 'abcd'], dtype='<U4')
    >>> bow.set_params(numerosity_reduction=False)
    BagOfWords(...)
    >>> bow.transform(X)
    array(['abcd abcd abcd', 'abcd abcd abcd'], dtype='<U14')

    """

    def __init__(self, window_size=0.5, word_size=0.5, n_bins=4,
                 strategy='normal', numerosity_reduction=True, window_step=1,
                 threshold_std=0.01, norm_mean=True, norm_std=True,
                 overlapping=True, raise_warning=False, alphabet=None):
        self.window_size = window_size
        self.word_size = word_size
        self.n_bins = n_bins
        self.strategy = strategy
        self.numerosity_reduction = numerosity_reduction
        self.window_step = window_step
        self.threshold_std = threshold_std
        self.norm_mean = norm_mean
        self.norm_std = norm_std
        self.overlapping = overlapping
        self.raise_warning = raise_warning
        self.alphabet = alphabet

    def fit(self, X, y=None):
        """Pass.

        Parameters
        ----------
        X
            Ignored

        y
            Ignored

        Returns
        -------
        self : object

        """
        return self

    def transform(self, X):
        """Transform each time series into a bag of words.

        Parameters
        ----------
        X : array-like, shape = (n_samples, n_timestamps)
            Data to transform.

        Returns
        -------
        X_new : array, shape = (n_samples,)
            Bags of words. Each entry is a string made of the words of one
            time series, separated by a whitespace.

        """
        X = check_array(X, dtype='float64')
        n_samples, n_timestamps = X.shape
        window_size, word_size, window_step, alphabet = self._check_params(
            n_timestamps)
        n_windows = (n_timestamps - window_size + window_step) // window_step

        # Standardize time series if quantile from standard normal
        # distribution
        if self.strategy == 'normal':
            X_scaled = StandardScaler().transform(X)
        else:
            X_scaled = X

        # Extract subsequences using a sliding window. The windows of all
        # the samples are stacked: row ``i * n_windows + j`` is the j-th
        # window of the i-th sample.
        X_window = _windowed_view(
            X_scaled, n_samples, n_timestamps, window_size, window_step
        ).reshape(n_samples * n_windows, window_size)

        if self.threshold_std is not None:
            # Identify subsequences whose standard deviation is below the
            # threshold
            idx = np.std(X_window, axis=1) < self.threshold_std
            if np.any(idx):
                # Subsequences with standard deviations below threshold:
                # they skip the window-wise standardization and are
                # discretized with bin edges computed from the whole
                # (scaled) time series.
                X_paa = PiecewiseAggregateApproximation(
                    window_size=None, output_size=word_size,
                    overlapping=self.overlapping
                ).transform(X_window[idx])

                # Compute the bin edges
                discretizer = KBinsDiscretizer(
                    n_bins=self.n_bins, strategy=self.strategy,
                    raise_warning=self.raise_warning
                )
                bin_edges = discretizer._compute_bins(
                    X_scaled, n_samples, self.n_bins, self.strategy)

                # Tile the bin edges for subsequences from the same time
                # series: the integer division maps each selected window
                # back to its sample, and the edges of that sample are
                # repeated once per selected window so that the rows line
                # up with the rows of ``X_paa``.
                if self.strategy != 'normal':
                    count = np.bincount(
                        np.floor_divide(np.nonzero(idx)[0], n_windows)
                    )
                    bin_edges = np.vstack([
                        np.tile(bin_edges[i], (count[i], 1))
                        for i in range(count.size) if count[i] != 0
                    ])
                X_sax_below_thresh = alphabet[_digitize(X_paa, bin_edges)]

        # Subsequences with standard deviations above threshold
        if (self.threshold_std is None) or (not np.all(idx)):
            pipeline = make_pipeline(
                StandardScaler(
                    with_mean=self.norm_mean, with_std=self.norm_std
                ),
                PiecewiseAggregateApproximation(
                    window_size=None, output_size=word_size,
                    overlapping=self.overlapping
                ),
                SymbolicAggregateApproximation(
                    n_bins=self.n_bins, strategy=self.strategy,
                    alphabet=self.alphabet, raise_warning=self.raise_warning
                )
            )
            if self.threshold_std is None:
                X_sax_above_thresh = pipeline.fit_transform(X_window)
            else:
                X_sax_above_thresh = pipeline.fit_transform(X_window[~idx])

        # Concatenate SAX words
        if (self.threshold_std is None) or (not np.any(idx)):
            X_sax = X_sax_above_thresh
        elif np.all(idx):
            X_sax = X_sax_below_thresh
        else:
            # Allocate with the promoted dtype of both parts rather than a
            # fixed one-character dtype, so that symbols of a user-defined
            # alphabet longer than one character are not silently
            # truncated when the two groups are merged.
            sax_dtype = np.result_type(
                X_sax_below_thresh.dtype, X_sax_above_thresh.dtype
            )
            X_sax = np.empty((n_samples * n_windows, word_size),
                             dtype=sax_dtype)
            X_sax[idx] = X_sax_below_thresh
            X_sax[~idx] = X_sax_above_thresh
        X_sax = X_sax.reshape(n_samples, n_windows, word_size)

        # Join letters to make words
        X_word = np.asarray([[''.join(X_sax[i, j])
                              for j in range(n_windows)]
                             for i in range(n_samples)])

        # Apply numerosity reduction: a word is kept only if the next word
        # differs from it; the last word of each sample is always kept.
        if self.numerosity_reduction:
            not_equal = np.c_[X_word[:, 1:] != X_word[:, :-1],
                              np.full(n_samples, True)]
            X_bow = np.asarray([' '.join(X_word[i, not_equal[i]])
                                for i in range(n_samples)])
        else:
            X_bow = np.asarray([' '.join(X_word[i])
                                for i in range(n_samples)])

        return X_bow

    def _check_params(self, n_timestamps):
        """Validate the parameters and resolve the derived values.

        Parameters
        ----------
        n_timestamps : int
            Number of timestamps of the input time series.

        Returns
        -------
        window_size : int
            Resolved size of the sliding window.

        word_size : int
            Resolved size of the words.

        window_step : int
            Resolved step of the sliding window.

        alphabet : array, shape = (n_bins,)
            Symbols used to build the words.

        Raises
        ------
        TypeError
            If a parameter has an unsupported type.

        ValueError
            If a parameter is outside its valid range.

        """
        if not isinstance(self.window_size,
                          (int, np.integer, float, np.floating)):
            raise TypeError("'window_size' must be an integer or a float.")
        if isinstance(self.window_size, (int, np.integer)):
            if not 1 <= self.window_size <= n_timestamps:
                raise ValueError(
                    "If 'window_size' is an integer, it must be greater "
                    "than or equal to 1 and lower than or equal to "
                    "n_timestamps (got {0}).".format(self.window_size)
                )
            window_size = self.window_size
        else:
            if not 0 < self.window_size <= 1:
                raise ValueError(
                    "If 'window_size' is a float, it must be greater "
                    "than 0 and lower than or equal to 1 "
                    "(got {0}).".format(self.window_size)
                )
            window_size = ceil(self.window_size * n_timestamps)

        # The word size is bounded by the resolved window size, so it has
        # to be validated after the window size.
        if not isinstance(self.word_size,
                          (int, np.integer, float, np.floating)):
            raise TypeError("'word_size' must be an integer or a float.")
        if isinstance(self.word_size, (int, np.integer)):
            if not 1 <= self.word_size <= window_size:
                raise ValueError(
                    "If 'word_size' is an integer, it must be greater "
                    "than or equal to 1 and lower than or equal to "
                    "window_size (got {0}).".format(self.word_size)
                )
            word_size = self.word_size
        else:
            if not 0 < self.word_size <= 1:
                raise ValueError(
                    "If 'word_size' is a float, it must be greater "
                    "than 0 and lower than or equal to 1 "
                    "(got {0}).".format(self.word_size)
                )
            word_size = ceil(self.word_size * window_size)

        if not isinstance(self.n_bins, (int, np.integer)):
            raise TypeError("'n_bins' must be an integer.")
        if not 2 <= self.n_bins <= 26:
            raise ValueError(
                "'n_bins' must be greater than or equal to 2 and lower than "
                "or equal to 26 (got {0}).".format(self.n_bins)
            )

        if self.strategy not in ['uniform', 'quantile', 'normal']:
            raise ValueError("'strategy' must be either 'uniform', 'quantile' "
                             "or 'normal' (got {0})".format(self.strategy))

        if not isinstance(self.window_step,
                          (int, np.integer, float, np.floating)):
            raise TypeError("'window_step' must be an integer or a float.")
        if isinstance(self.window_step, (int, np.integer)):
            if not 1 <= self.window_step <= n_timestamps:
                raise ValueError(
                    "If 'window_step' is an integer, it must be greater "
                    "than or equal to 1 and lower than or equal to "
                    "n_timestamps (got {0}).".format(self.window_step)
                )
            window_step = self.window_step
        else:
            if not 0 < self.window_step <= 1:
                raise ValueError(
                    "If 'window_step' is a float, it must be greater "
                    "than 0 and lower than or equal to 1 (got {0})."
                    .format(self.window_step)
                )
            window_step = ceil(self.window_step * n_timestamps)

        threshold_std_none = self.threshold_std is None
        threshold_std_int_float = isinstance(
            self.threshold_std, (int, np.integer, float, np.floating))
        if not (threshold_std_none or threshold_std_int_float):
            raise TypeError(
                "'threshold_std' must be an integer, a float or None."
            )
        # ``not x >= 0.`` (rather than ``x < 0.``) also rejects NaN.
        if threshold_std_int_float and (not self.threshold_std >= 0.):
            raise ValueError("If 'threshold_std' is an integer or a float, it "
                             "must be non-negative (got {0})."
                             .format(self.threshold_std))

        if not ((self.alphabet is None)
                or (isinstance(self.alphabet, (list, tuple, np.ndarray)))):
            raise TypeError("'alphabet' must be None or array-like "
                            "with shape (n_bins,) (got {0})."
                            .format(self.alphabet))
        if self.alphabet is None:
            # chr(97) is 'a': use the first n_bins lowercase Latin letters.
            alphabet = np.array([chr(i) for i in range(97, 97 + self.n_bins)])
        else:
            alphabet = check_array(self.alphabet, ensure_2d=False, dtype=None)
            if alphabet.shape != (self.n_bins,):
                raise ValueError("If 'alphabet' is array-like, its shape "
                                 "must be equal to (n_bins,).")

        return window_size, word_size, window_step, alphabet