Source code for pyts.approximation.dft

"""Code for Discrete Fourier Transform."""

# Author: Johann Faouzi <johann.faouzi@gmail.com>
# License: BSD-3-Clause

import numpy as np
from sklearn.base import BaseEstimator
from sklearn.feature_selection import f_classif
from sklearn.utils.validation import check_array, check_is_fitted, check_X_y
from math import ceil
from warnings import warn
from ..base import UnivariateTransformerMixin
from ..preprocessing import StandardScaler


[docs]class DiscreteFourierTransform(BaseEstimator, UnivariateTransformerMixin): """Discrete Fourier Transform. Parameters ---------- n_coefs : None, int or float (default = None) The number of Fourier coefficients to keep. If None, all the Fourier coeeficients are kept. If an integer, the ``n_coefs`` most significant Fourier coefficients are returned if ``anova=True``, otherwise the first ``n_coefs`` Fourier coefficients are returned. If a float, it represents a percentage of the size of each time series and must be between 0 and 1. The number of coefficients will be computed as ``ceil(n_coefs * (n_timestamps - 1))`` if ``drop_sum=True`` and ``ceil(n_coefs * n_timestamps)`` if ``drop_sum=False``. drop_sum : bool (default = False) If True, the first Fourier coefficient (i.e. the sum of the subseries) is dropped. Otherwise, it is kept. anova : bool (default = False) If True, the Fourier coefficient selection is done via a one-way ANOVA test. If False, the first Fourier coefficients are selected. norm_mean : bool (default = False) If True, center each time series before scaling. norm_std : bool (default = False) If True, scale each time series to unit variance. Attributes ---------- support_ : array, shape = (n_coefs,) Indices of the kept Fourier coefficients. References ---------- .. [1] P. Schäfer, and M. Högqvist, "SFA: A Symbolic Fourier Approximation and Index for Similarity Search in High Dimensional Datasets", International Conference on Extending Database Technology, 15, 516-527 (2012). Examples -------- >>> from pyts.approximation import DiscreteFourierTransform >>> from pyts.datasets import load_gunpoint >>> X, _, _, _ = load_gunpoint(return_X_y=True) >>> transformer = DiscreteFourierTransform(n_coefs=4) >>> X_new = transformer.fit_transform(X) >>> X_new.shape (50, 4) """
[docs] def __init__(self, n_coefs=None, drop_sum=False, anova=False, norm_mean=False, norm_std=False): self.n_coefs = n_coefs self.drop_sum = drop_sum self.anova = anova self.norm_mean = norm_mean self.norm_std = norm_std
[docs] def fit(self, X, y=None): """Learn indices of the Fourier coefficients to keep. Parameters ---------- X : array-like, shape = (n_samples, n_timestamps) Training vector. y : None or array-like, shape = (n_samples,) (default = None) Class labels for each data sample. Only used if ``anova=True``. Returns ------- self : object """ if self.anova: X, y = check_X_y(X, y, dtype='float64') else: X = check_array(X, dtype='float64') n_samples, n_timestamps = X.shape n_coefs = self._check_params(n_timestamps) if self.anova: ss = StandardScaler(self.norm_mean, self.norm_std) X = ss.fit_transform(X) X_fft = np.fft.rfft(X) X_fft = np.vstack([np.real(X_fft), np.imag(X_fft)]) if n_timestamps % 2 == 0: X_fft = X_fft.reshape(n_samples, n_timestamps + 2, order='F') X_fft = np.c_[X_fft[:, 0], X_fft[:, 2:-1]] else: X_fft = X_fft.reshape(n_samples, n_timestamps + 1, order='F') X_fft = np.c_[X_fft[:, 0], X_fft[:, 2:]] if self.drop_sum: X_fft = X_fft[:, 1:] self.support_ = self._anova(X_fft, y, n_coefs, n_timestamps) else: self.support_ = np.arange(n_coefs) return self
[docs] def transform(self, X): """Return the selected Fourier coefficients for each sample. Parameters ---------- X : array-like, shape (n_samples, n_timestamps) Input data. Returns ------- X_new : array, shape (n_samples, n_coefs) The selected Fourier coefficients for each sample. """ check_is_fitted(self, 'support_') X = check_array(X, dtype='float64') n_samples, n_timestamps = X.shape ss = StandardScaler(self.norm_mean, self.norm_std) X = ss.fit_transform(X) X_fft = np.fft.rfft(X) X_fft = np.vstack([np.real(X_fft), np.imag(X_fft)]) if n_timestamps % 2 == 0: X_fft = X_fft.reshape(n_samples, n_timestamps + 2, order='F') X_fft = np.c_[X_fft[:, 0], X_fft[:, 2:-1]] else: X_fft = X_fft.reshape(n_samples, n_timestamps + 1, order='F') X_fft = np.c_[X_fft[:, 0], X_fft[:, 2:]] if self.drop_sum: X_fft = X_fft[:, 1:] return X_fft[:, self.support_]
[docs] def fit_transform(self, X, y=None): """Learn and return the Fourier coeeficients to keep. Parameters ---------- X : array-like, shape = (n_samples, n_timestamps) Training vector, where n_samples in the number of samples and n_features is the number of features. y : None or array-like, shape = (n_samples,) (default = None) Class labels for each data sample. Returns ------- X_new : array, shape (n_samples, n_coefs) The selected Fourier coefficients for each sample. """ if self.anova: X, y = check_X_y(X, y, dtype='float64') else: X = check_array(X, dtype='float64') n_samples, n_timestamps = X.shape n_coefs = self._check_params(n_timestamps) scaler = StandardScaler(self.norm_mean, self.norm_std) X = scaler.fit_transform(X) X_fft = np.fft.rfft(X) X_fft = np.vstack([np.real(X_fft), np.imag(X_fft)]) if n_timestamps % 2 == 0: X_fft = X_fft.reshape(n_samples, n_timestamps + 2, order='F') X_fft = np.c_[X_fft[:, 0], X_fft[:, 2:-1]] else: X_fft = X_fft.reshape(n_samples, n_timestamps + 1, order='F') X_fft = np.c_[X_fft[:, 0], X_fft[:, 2:]] if self.drop_sum: X_fft = X_fft[:, 1:] if self.anova: self.support_ = self._anova(X_fft, y, n_coefs, n_timestamps) else: self.support_ = np.arange(n_coefs) return X_fft[:, self.support_]
def _anova(self, X_fft, y, n_coefs, n_timestamps): if n_coefs < X_fft.shape[1]: non_constant = np.where( ~np.isclose(X_fft.var(axis=0), np.zeros_like(X_fft.shape[1])) )[0] if non_constant.size == 0: raise ValueError("All the Fourier coefficients are constant. " "Your input data is weirdly homogeneous.") elif non_constant.size < n_coefs: warn("The number of non constant Fourier coefficients ({0}) " "is lower than the number of coefficients to keep ({1}). " "The number of coefficients to keep is truncated to {2}" ".".format(non_constant.size, n_coefs, non_constant.size)) support = non_constant else: _, p = f_classif(X_fft[:, non_constant], y) support = non_constant[np.argsort(p)[:n_coefs]] else: support = np.arange(n_coefs) return support def _check_params(self, n_timestamps): if not ((isinstance(self.n_coefs, (int, np.integer, float, np.floating))) or (self.n_coefs is None)): raise TypeError("'n_coefs' must be None, an integer or a float.") if isinstance(self.n_coefs, (int, np.integer)): if self.drop_sum and not (1 <= self.n_coefs <= n_timestamps - 1): raise ValueError( "If 'n_coefs' is an integer, it must be greater than or " "equal to 1 and lower than or equal to (n_timestamps - 1) " "if 'drop_sum=True'." ) if not self.drop_sum and not (1 <= self.n_coefs <= n_timestamps): raise ValueError( "If 'n_coefs' is an integer, it must be greater than or " "equal to 1 and lower than or equal to n_timestamps " "if 'drop_sum=False'." ) n_coefs = self.n_coefs elif isinstance(self.n_coefs, (float, np.floating)): if not 0 < self.n_coefs <= 1: raise ValueError( "If 'n_coefs' is a float, it must be greater " "than 0 and lower than or equal to 1." ) if self.drop_sum: n_coefs = ceil(self.n_coefs * (n_timestamps - 1)) else: n_coefs = ceil(self.n_coefs * n_timestamps) else: n_coefs = (n_timestamps - 1) if self.drop_sum else n_timestamps return n_coefs