Source code for pyts.classification.saxvsm

"""Code for SAX-VSM."""

# Author: Johann Faouzi <johann.faouzi@gmail.com>
# License: BSD-3-Clause

import numpy as np
from sklearn.utils.validation import check_X_y, check_is_fitted
from sklearn.utils.multiclass import check_classification_targets
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import LabelEncoder
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from ..bag_of_words import BagOfWords
from ..approximation import SymbolicAggregateApproximation

import re
import sklearn

# Compare scikit-learn versions numerically: a plain string comparison would
# misorder versions such as '0.9' and '0.22'.
SKLEARN_VERSION = tuple(
    int(x) for x in re.findall(r'\d+', sklearn.__version__)[:2]
)


class SAXVSM(BaseEstimator, ClassifierMixin):
    """Classifier based on SAX-VSM representation and tf-idf statistics.

    Time series are first transformed into bags of words using the Symbolic
    Aggregate approXimation (SAX) algorithm and a bag-of-words model. Then
    the classes are transformed into a Vector Space Model (VSM) using term
    frequencies (tf) and inverse document frequencies (idf).

    Parameters
    ----------
    n_bins : int (default = 4)
        The number of bins to produce. It must be between 2 and
        ``min(n_timestamps, 26)``.

    strategy : 'uniform', 'quantile' or 'normal' (default = 'quantile')
        Strategy used to define the widths of the bins:

        - 'uniform': All bins in each sample have identical widths
        - 'quantile': All bins in each sample have the same number of points
        - 'normal': Bin edges are quantiles from a standard normal
          distribution

    window_size : int or float (default = 4)
        Size of the sliding window (i.e. the size of each word). If float,
        it represents the percentage of the size of each time series and
        must be between 0 and 1. The window size will be computed as
        ``ceil(window_size * n_timestamps)``.

    window_step : int or float (default = 1)
        Step of the sliding window. If float, it represents the percentage
        of the size of each time series and must be between 0 and 1. The
        window step will be computed as ``ceil(window_step * n_timestamps)``.

    numerosity_reduction : bool (default = True)
        If True, delete sample-wise all but one occurrence of back-to-back
        identical words.

    use_idf : bool (default = True)
        Enable inverse-document-frequency reweighting.

    smooth_idf : bool (default = False)
        Smooth idf weights by adding one to document frequencies, as if an
        extra document was seen containing every term in the collection
        exactly once. Prevents zero divisions.

    sublinear_tf : bool (default = True)
        Apply sublinear tf scaling, i.e. replace tf with 1 + log(tf).

    alphabet : None or array-like, shape = (n_bins,)
        Alphabet to use. If None, the first `n_bins` letters of the Latin
        alphabet are used.

    Attributes
    ----------
    classes_ : array, shape = (n_classes,)
        An array of class labels known to the classifier.

    idf_ : array, shape = (n_features,), or None
        The learned idf vector (global term weights) when ``use_idf=True``,
        None otherwise.

    tfidf_ : array, shape = (n_classes, n_words)
        Term-document matrix.

    vocabulary_ : dict
        A mapping of feature indices to terms.

    References
    ----------
    .. [1] P. Senin, and S. Malinchik, "SAX-VSM: Interpretable Time Series
           Classification Using SAX and Vector Space Model". International
           Conference on Data Mining, 13, 1175-1180 (2013).

    Examples
    --------
    >>> from pyts.classification import SAXVSM
    >>> from pyts.datasets import load_gunpoint
    >>> X_train, X_test, y_train, y_test = load_gunpoint(return_X_y=True)
    >>> clf = SAXVSM(window_size=34, sublinear_tf=False, use_idf=False)
    >>> clf.fit(X_train, y_train)  # doctest: +ELLIPSIS
    SAXVSM(...)
    >>> clf.score(X_test, y_test)
    0.76

    """
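
    # Illustrative sketch of the pipeline implemented below (hypothetical
    # symbols, not actual output): a time series such as
    #     [0.1, 0.9, 0.5, 0.3, ...]
    # is discretized into symbols with SAX, e.g. 'adbb...', and a sliding
    # window of length ``window_size`` extracts the words 'adb', 'dbb', ...
    # (here with window_size=3). All the words of all the samples of a
    # class then form one document on which tf-idf is computed.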

    def __init__(self, n_bins=4, strategy='quantile', window_size=4,
                 window_step=1, numerosity_reduction=True, use_idf=True,
                 smooth_idf=False, sublinear_tf=True, alphabet=None):
        self.n_bins = n_bins
        self.strategy = strategy
        self.window_size = window_size
        self.window_step = window_step
        self.numerosity_reduction = numerosity_reduction
        self.use_idf = use_idf
        self.smooth_idf = smooth_idf
        self.sublinear_tf = sublinear_tf
        self.alphabet = alphabet

    def fit(self, X, y):
        """Fit the model according to the given training data.

        Parameters
        ----------
        X : array-like, shape = (n_samples, n_timestamps)
            Training vector.

        y : array-like, shape = (n_samples,)
            Class labels for each data sample.

        Returns
        -------
        self : object

        """
        X, y = check_X_y(X, y)
        self._check_params()
        check_classification_targets(y)
        le = LabelEncoder()
        y_ind = le.fit_transform(y)
        self.classes_ = le.classes_
        n_classes = self.classes_.size

        # Discretize each time series into a sequence of symbols with SAX,
        # then slide a window over the symbols to extract words.
        sax = SymbolicAggregateApproximation(
            self.n_bins, self.strategy, self.alphabet)
        X_sax = sax.fit_transform(X)
        bow = BagOfWords(self.window_size, self.window_step,
                         self.numerosity_reduction)
        X_bow = bow.fit_transform(X_sax)

        # Join the words of all the samples of a class into one document,
        # so that tf-idf is computed on ``n_classes`` documents.
        X_class = [' '.join(X_bow[y_ind == classe])
                   for classe in range(n_classes)]

        tfidf = TfidfVectorizer(
            norm=None, use_idf=self.use_idf, smooth_idf=self.smooth_idf,
            sublinear_tf=self.sublinear_tf
        )
        self.tfidf_ = tfidf.fit_transform(X_class).toarray()
        self.vocabulary_ = {value: key for key, value in
                            tfidf.vocabulary_.items()}
        if self.use_idf:
            self.idf_ = tfidf.idf_
        else:
            self.idf_ = None
        self._tfidf = tfidf
        self._sax = sax
        self._bow = bow
        return self
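
    # Illustrative sketch (hypothetical words) of the documents built in
    # ``fit``: with X_bow = ['aab abb', 'abb bba', 'bba bab'] and
    # y_ind = [0, 0, 1], the per-class documents are
    # X_class = ['aab abb abb bba', 'bba bab'], and ``TfidfVectorizer``
    # treats each class as a single document.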

    def decision_function(self, X):
        """Evaluate the cosine similarity between document-term matrix and X.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_timestamps)
            Test samples.

        Returns
        -------
        X : array-like, shape (n_samples, n_classes)
            Cosine similarity between the document-term matrix and X.

        """
        if SKLEARN_VERSION >= (0, 22):
            check_is_fitted(self)
        else:
            check_is_fitted(self, ['vocabulary_', 'tfidf_', 'idf_',
                                   '_tfidf', 'classes_'])
        X_sax = self._sax.transform(X)
        X_bow = self._bow.transform(X_sax)
        # Count the word occurrences of each sample with the vocabulary
        # learned during ``fit``, then compare the counts to the per-class
        # tf-idf vectors.
        vectorizer = CountVectorizer(vocabulary=self._tfidf.vocabulary_)
        X_transformed = vectorizer.transform(X_bow).toarray()
        return cosine_similarity(X_transformed, self.tfidf_)
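
    # For the word-count vector x of a test sample and the tf-idf vector
    # t_c of class c, the decision value is the cosine similarity
    #     cos(x, t_c) = (x . t_c) / (||x|| * ||t_c||),
    # so samples are scored against each class independently of the
    # lengths of the class documents.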

    def predict(self, X):
        """Predict the class labels for the provided data.

        Parameters
        ----------
        X : array-like, shape = (n_samples, n_timestamps)
            Test samples.

        Returns
        -------
        y_pred : array-like, shape = (n_samples,)
            Class labels for each data sample.

        """
        return self.classes_[self.decision_function(X).argmax(axis=1)]

    def _check_params(self):
        if not isinstance(self.n_bins, (int, np.integer)):
            raise TypeError("'n_bins' must be an integer.")
        if not 2 <= self.n_bins <= 26:
            raise ValueError("'n_bins' must be between 2 and 26.")
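

# Minimal usage sketch (not part of the original module): fit SAXVSM on a
# small synthetic dataset and inspect the learned attributes. The data and
# the parameter values below are illustrative assumptions.
if __name__ == '__main__':
    rng = np.random.RandomState(42)
    X = rng.randn(20, 48)
    y = rng.randint(2, size=20)

    clf = SAXVSM(n_bins=4, window_size=8, window_step=1)
    clf.fit(X, y)

    # One tf-idf vector per class, one column per word in the vocabulary.
    print(clf.tfidf_.shape)                 # (n_classes, n_words)
    # Cosine similarity of each sample to each class tf-idf vector.
    print(clf.decision_function(X).shape)   # (n_samples, n_classes)
    print(clf.predict(X)[:5])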