"""Code for Singular Spectrum Analysis."""
# Author: Johann Faouzi <johann.faouzi@gmail.com>
# License: BSD-3-Clause
import numpy as np
from math import ceil
from numba import njit, prange
from joblib import Parallel, delayed
from sklearn.base import BaseEstimator
from sklearn.utils.validation import check_array
from ..base import UnivariateTransformerMixin
from ..utils.utils import _windowed_view
@njit
def _outer_dot(v, X, n_samples, window_size, n_windows):
X_new = np.empty((n_samples, window_size, window_size, n_windows))
for i in prange(n_samples):
for j in prange(window_size):
X_new[i, j] = np.dot(np.outer(v[i, :, j], v[i, :, j]), X[i])
return X_new
@njit
def _diagonal_averaging(X, n_samples, n_timestamps, window_size,
n_windows, grouping_size, gap):
X_new = np.empty((n_samples, grouping_size, n_timestamps))
first_row = [(0, col) for col in range(n_windows)]
last_col = [(row, n_windows - 1) for row in range(1, window_size)]
indices = first_row + last_col
for i in prange(n_samples):
for group in prange(grouping_size):
for (j, k) in indices:
X_new[i, group, j + k] = np.diag(
X[i, group, :, ::-1], gap - j - k - 1
).mean()
return X_new
[docs]class SingularSpectrumAnalysis(BaseEstimator, UnivariateTransformerMixin):
"""Singular Spectrum Analysis.
Parameters
----------
window_size : int or float (default = 4)
Size of the sliding window (i.e. the size of each word). If float, it
represents the percentage of the size of each time series and must be
between 0 and 1. The window size will be computed as
``max(2, ceil(window_size * n_timestamps))``.
groups : None, int, 'auto', or array-like (default = None)
The way the elementary matrices are grouped. If None, no grouping is
performed. If an integer, it represents the number of groups and the
bounds of the groups are computed as
``np.linspace(0, window_size, groups + 1).astype('int64')``.
If 'auto', then three groups are determined, containing trend,
seasonal, and residual. If array-like, each element must be array-like
and contain the indices for each group.
lower_frequency_bound : float (default = 0.075)
The boundary of the periodogram to characterize trend, seasonal and
residual components. It must be between 0 and 0.5.
Ignored if ``groups`` is not set to 'auto'.
lower_frequency_contribution : float (default = 0.85)
The relative threshold to characterize trend, seasonal and
residual components by considering the periodogram.
It must be between 0 and 1. Ignored if ``groups`` is not set to 'auto'.
chunksize : int or None (default = None)
If int, the transformation of the whole dataset is performed using
chunks (batches) and ``chunksize`` corresponds to the maximum size of
each chunk (batch). If None, the transformation is performed on the
whole dataset at once. Performing the transformation with chunks is
likely to be a bit slower but requires less memory.
n_jobs : None or int (default = None)
The number of jobs to use for the computation. Only used if
``chunksize`` is set to an integer.
References
----------
.. [1] N. Golyandina, and A. Zhigljavsky, "Singular Spectrum Analysis for
Time Series". Springer-Verlag Berlin Heidelberg (2013).
.. [2] T. Alexandrov, "A Method of Trend Extraction Using Singular
Spectrum Analysis", REVSTAT (2008).
Examples
--------
>>> from pyts.datasets import load_gunpoint
>>> from pyts.decomposition import SingularSpectrumAnalysis
>>> X, _, _, _ = load_gunpoint(return_X_y=True)
>>> transformer = SingularSpectrumAnalysis(window_size=5)
>>> X_new = transformer.transform(X)
>>> X_new.shape
(50, 5, 150)
"""
[docs] def __init__(self, window_size=4, groups=None,
lower_frequency_bound=0.075,
lower_frequency_contribution=0.85,
chunksize=None, n_jobs=1):
self.window_size = window_size
self.groups = groups
self.lower_frequency_bound = lower_frequency_bound
self.lower_frequency_contribution = lower_frequency_contribution
self.chunksize = chunksize
self.n_jobs = n_jobs
[docs] def fit(self, X=None, y=None):
"""Pass.
Parameters
----------
X
ignored
y
Ignored
Returns
-------
self : object
"""
return self
def _transform(self, X):
n_samples, n_timestamps = X.shape
window_size, grouping_size = self._check_params(n_timestamps)
n_windows = n_timestamps - window_size + 1
X_window = np.transpose(
_windowed_view(X, n_samples, n_timestamps,
window_size, window_step=1), axes=(0, 2, 1)
).copy()
X_tranpose = np.matmul(X_window,
np.transpose(X_window, axes=(0, 2, 1)))
w, v = np.linalg.eigh(X_tranpose)
w, v = w[:, ::-1], v[:, :, ::-1]
del X_tranpose
X_elem = _outer_dot(v, X_window, n_samples, window_size, n_windows)
X_groups, grouping_size = self._grouping(
X_elem, v, n_samples, window_size, n_windows, grouping_size,
)
if window_size >= n_windows:
X_groups = np.transpose(X_groups, axes=(0, 1, 3, 2))
gap = window_size
else:
gap = n_windows
del X_elem
X_ssa = _diagonal_averaging(
X_groups, n_samples, n_timestamps, window_size,
n_windows, grouping_size, gap
)
return np.squeeze(X_ssa)
def _grouping(
self, X, v, n_samples, window_size, n_windows, grouping_size
):
if self.groups is None:
X_new = X
elif self.groups == "auto":
f = np.arange(0, 1 + window_size // 2) / window_size
Pxx = np.abs(np.fft.rfft(v, axis=1, norm='ortho')) ** 2
if Pxx.shape[-1] % 2 == 0:
Pxx[:, 1:-1, :] *= 2
else:
Pxx[:, 1:, :] *= 2
Pxx_cumsum = np.cumsum(Pxx, axis=1)
idx_trend = np.where(f < self.lower_frequency_bound)[0][-1]
idx_resid = Pxx_cumsum.shape[1] // 2
c = self.lower_frequency_contribution
trend = Pxx_cumsum[:, idx_trend, :] / Pxx_cumsum[:, -1, :] > c
resid = Pxx_cumsum[:, idx_resid, :] / Pxx_cumsum[:, -1, :] < c
season = np.logical_and(~trend, ~resid)
X_new = np.zeros((n_samples, grouping_size,
window_size, n_windows))
for i in range(n_samples):
for j, arr in enumerate((trend, season, resid)):
X_new[i, j] = X[i, arr[i]].sum(axis=0)
elif isinstance(self.groups, (int, np.integer)):
grouping = np.linspace(0, window_size,
self.groups + 1).astype('int64')
X_new = np.zeros((n_samples, grouping_size,
window_size, n_windows))
for i, (j, k) in enumerate(zip(grouping[:-1], grouping[1:])):
X_new[:, i] = X[:, j:k].sum(axis=1)
else:
X_new = np.zeros((n_samples, grouping_size,
window_size, n_windows))
for i, group in enumerate(self.groups):
X_new[:, i] = X[:, group].sum(axis=1)
return X_new, grouping_size
def _check_params(self, n_timestamps):
if not isinstance(self.window_size,
(int, np.integer, float, np.floating)):
raise TypeError("'window_size' must be an integer or a float.")
if isinstance(self.window_size, (int, np.integer)):
if not 2 <= self.window_size <= n_timestamps:
raise ValueError(
"If 'window_size' is an integer, it must be greater "
"than or equal to 2 and lower than or equal to "
"n_timestamps (got {0}).".format(self.window_size)
)
window_size = self.window_size
else:
if not 0 < self.window_size <= 1:
raise ValueError(
"If 'window_size' is a float, it must be greater "
"than 0 and lower than or equal to 1 "
"(got {0}).".format(self.window_size)
)
window_size = max(2, ceil(self.window_size * n_timestamps))
if not (self.groups is None
or (isinstance(self.groups, str) and self.groups == "auto")
or isinstance(self.groups, (int, list, tuple, np.ndarray))):
raise TypeError("'groups' must be either None, an integer, "
"'auto' or array-like.")
if self.groups is None:
grouping_size = window_size
elif (isinstance(self.groups, str) and self.groups == "auto"):
grouping_size = 3
elif isinstance(self.groups, (int, np.integer)):
if not 1 <= self.groups <= self.window_size:
raise ValueError(
"If 'groups' is an integer, it must be greater than or "
"equal to 1 and lower than or equal to 'window_size'."
)
grouping = np.linspace(
0, window_size, self.groups + 1).astype('int64')
grouping_size = len(grouping) - 1
if isinstance(self.groups, (list, tuple, np.ndarray)):
idx = np.concatenate(self.groups)
diff = np.setdiff1d(idx, np.arange(self.window_size))
flat_list = [item for group in self.groups for item in group]
if ((diff.size > 0)
or not (all(isinstance(x, (int, np.integer))
for x in flat_list))):
raise ValueError(
"If 'groups' is array-like, all the values in 'groups' "
"must be integers between 0 and ('window_size' - 1)."
)
grouping_size = len(self.groups)
if not isinstance(self.lower_frequency_bound, (float, np.floating)):
raise TypeError("'lower_frequency_bound' must be a float.")
else:
if not 0 < self.lower_frequency_bound < 0.5:
raise ValueError(
"'lower_frequency_bound' must be greater than 0 and "
"lower than 0.5."
)
if not isinstance(self.lower_frequency_contribution,
(float, np.floating)):
raise TypeError("'lower_frequency_contribution' must be a float.")
else:
if not 0 < self.lower_frequency_contribution < 1:
raise ValueError(
"'lower_frequency_contribution' must be greater than 0 "
"and lower than 1."
)
chunksize_int = isinstance(self.chunksize, (int, np.integer))
if not (self.chunksize is None or chunksize_int):
raise TypeError("'chunksize' must be None or an integer.")
if chunksize_int and self.chunksize < 1:
raise ValueError("If 'chunksize' is an integer, it must be "
"positive (got {})".format(self.chunksize))
n_jobs_int = (isinstance(self.n_jobs, (int, np.integer)) and
self.n_jobs != 0)
if not (self.n_jobs is None or n_jobs_int):
raise ValueError("'n_jobs' must be None or an integer not equal "
"to zero (got {}).".format(self.n_jobs))
return window_size, grouping_size