"""Code for RandOm Convolutional KErnel Transformation."""
# Author: Johann Faouzi <johann.faouzi@gmail.com>
# License: BSD-3-Clause
from numba import njit, prange
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.utils.validation import (
check_array, check_is_fitted, check_random_state)
from ..base import UnivariateTransformerMixin
@njit()
def generate_kernels(n_kernels, n_timestamps, kernel_sizes, seed):
    """Generate the random convolutional kernels.

    Parameters
    ----------
    n_kernels : int
        Number of kernels

    n_timestamps : int
        Number of timestamps

    kernel_sizes : array
        Possible sizes for the kernels.

    seed : int
        Seed for the random number generator.

    Returns
    -------
    weights : array, shape = (n_kernels, max(kernel_sizes))
        Weights of the kernels. Zero padding values are added.

    lengths : array, shape = (n_kernels,)
        Length of each kernel.

    biases : array, shape = (n_kernels,)
        Bias of each kernel.

    dilations : array, shape = (n_kernels,)
        Dilation of each kernel.

    paddings : array, shape = (n_kernels,)
        Padding of each kernel.

    """
    # Fix the random seed for reproducibility
    np.random.seed(seed)

    # Lengths of the kernels, drawn uniformly from the possible sizes
    lengths = np.random.choice(kernel_sizes, size=n_kernels)

    # Weights of the kernels: draw all of them at once, then mean-center
    # each kernel's slice. Columns beyond a kernel's length stay zero-padded.
    cumsum_lengths = np.concatenate((np.array([0]), np.cumsum(lengths)))
    weights_all = np.random.randn(cumsum_lengths[-1])
    weights = np.zeros((n_kernels, np.int64(np.max(kernel_sizes))))
    for i in prange(n_kernels):
        weights[i, :lengths[i]] = (
            weights_all[cumsum_lengths[i]: cumsum_lengths[i+1]] -
            np.mean(weights_all[cumsum_lengths[i]: cumsum_lengths[i+1]])
        )

    # Biases drawn uniformly in [-1, 1]
    biases = np.random.uniform(-1, 1, size=n_kernels)

    # Dilations: 2**x with x uniform in [0, upper_bound], where the upper
    # bound keeps the dilated kernel no longer than the time series
    upper_bounds = np.log2(np.floor_divide(n_timestamps - 1, lengths - 1))
    powers = np.empty(n_kernels)
    for i in prange(n_kernels):
        powers[i] = np.random.uniform(0, upper_bounds[i])
    dilations = np.floor(np.power(2, powers))

    # Paddings: a random half of the kernels gets padding
    # ((length - 1) * dilation) // 2; the other half gets no padding
    paddings = np.zeros(n_kernels)
    padding_cond = np.random.randint(0, 2, n_kernels).astype(np.bool_)
    paddings[padding_cond] = np.floor_divide(
        (lengths - 1) * dilations, 2)[padding_cond]

    return weights, lengths, biases, dilations, paddings
@njit(fastmath=True)
def apply_one_kernel_one_sample(
    x, n_timestamps, weight, length, bias, dilation, padding
):
    """Apply one kernel to one time series.

    Parameters
    ----------
    x : array, shape = (n_timestamps,)
        One time series.

    n_timestamps : int
        Number of timestamps.

    weight : array, shape = (length,)
        Weights of the kernel. Zero padding values are added.

    length : int
        Length of the kernel.

    bias : float
        Bias of the kernel.

    dilation : int
        Dilation of the kernel.

    padding : int
        Padding of the kernel.

    Returns
    -------
    x_new : array, shape = (2,)
        Extracted features using the kernel: maximum of the convolution
        and proportion of positive values.

    """
    # Compute padded x (zeros added on both sides when padding > 0)
    n_conv = n_timestamps - ((length - 1) * dilation) + (2 * padding)
    if padding > 0:
        x_pad = np.zeros(n_timestamps + 2 * padding)
        x_pad[padding:-padding] = x
    else:
        x_pad = x

    # Compute the convolutions (dilation spreads the kernel taps apart)
    x_conv = np.zeros(n_conv)
    for i in prange(n_conv):
        for j in prange(length):
            x_conv[i] += weight[j] * x_pad[i + (j * dilation)]
    x_conv += bias

    # Return the features: maximum and proportion of positive values
    return np.max(x_conv), np.mean(x_conv > 0)
@njit()
def apply_all_kernels(X, weights, lengths, biases, dilations, paddings):
    """Apply all the kernels to a data set of time series.

    Parameters
    ----------
    X : array, shape = (n_samples, n_timestamps)
        Input data.

    weights : array, shape = (n_kernels, max(kernel_sizes))
        Weights of the kernels. Zero padding values are added.

    lengths : array, shape = (n_kernels,)
        Length of each kernel.

    biases : array, shape = (n_kernels,)
        Bias of each kernel.

    dilations : array, shape = (n_kernels,)
        Dilation of each kernel.

    paddings : array, shape = (n_kernels,)
        Padding of each kernel.

    Returns
    -------
    X_new : array, shape = (n_samples, 2 * n_kernels)
        Extracted features using all the kernels. Columns (2j, 2j + 1)
        hold the two features extracted by kernel j.

    """
    n_samples, n_timestamps = X.shape
    n_kernels = lengths.size
    X_new = np.empty((n_samples, 2 * n_kernels))
    for i in prange(n_samples):
        for j in prange(n_kernels):
            X_new[i, (2 * j):(2 * j + 2)] = apply_one_kernel_one_sample(
                X[i], n_timestamps, weights[j], lengths[j],
                biases[j], dilations[j], paddings[j]
            )
    return X_new
class ROCKET(BaseEstimator, UnivariateTransformerMixin):
    """RandOm Convolutional KErnel Transformation.

    This algorithm randomly generates a great variety of convolutional kernels
    and extracts two features for each convolution: the maximum and the
    proportion of positive values.

    Parameters
    ----------
    n_kernels : int (default = 10000)
        Number of kernels.

    kernel_sizes : array-like (default = (7, 9, 11))
        The possible sizes of the kernels.

    random_state : None, int or RandomState instance (default = None)
        The seed of the pseudo random number generator to use when shuffling
        the data. If int, random_state is the seed used by the random number
        generator. If RandomState instance, random_state is the random number
        generator. If None, the random number generator is the RandomState
        instance used by `np.random`.

    Attributes
    ----------
    weights_ : array, shape = (n_kernels, max(kernel_sizes))
        Weights of the kernels. Zero padding values are added.

    length_ : array, shape = (n_kernels,)
        Length of each kernel.

    bias_ : array, shape = (n_kernels,)
        Bias of each kernel.

    dilation_ : array, shape = (n_kernels,)
        Dilation of each kernel.

    padding_ : array, shape = (n_kernels,)
        Padding of each kernel.

    References
    ----------
    .. [1] A. Dempster, F. Petitjean and G. I. Webb, "ROCKET: Exceptionally
           fast and accurate time series classification using random
           convolutional kernels". https://arxiv.org/abs/1910.13051.

    Examples
    --------
    >>> import numpy as np
    >>> from pyts.transformation import ROCKET
    >>> X = np.arange(100).reshape(5, 20)
    >>> rocket = ROCKET(n_kernels=10)
    >>> rocket.fit_transform(X).shape
    (5, 20)

    """

    def __init__(self, n_kernels=10000, kernel_sizes=(7, 9, 11),
                 random_state=None):
        self.n_kernels = n_kernels
        self.kernel_sizes = kernel_sizes
        self.random_state = random_state

    def fit(self, X, y=None):
        """Fit the model according to the given training data.

        Parameters
        ----------
        X : array-like, shape = (n_samples, n_timestamps)
            Training vector.

        y : None or array-like, shape = (n_samples,)
            Class labels for each data sample. Ignored.

        Returns
        -------
        self : object

        """
        X = check_array(X, dtype='float64')
        n_timestamps = X.shape[1]
        kernel_sizes, seed = self._check_params(n_timestamps)

        # Generate the kernels and store them as fitted attributes
        weights, lengths, biases, dilations, paddings = generate_kernels(
            self.n_kernels, n_timestamps, kernel_sizes, seed)
        self.weights_ = weights
        self.length_ = lengths
        self.bias_ = biases
        # Dilations and paddings are produced as floats by the numba
        # routine; cast them to integers for use as array indices.
        self.dilation_ = dilations.astype('int64')
        self.padding_ = paddings.astype('int64')

        return self

    def _check_params(self, n_timestamps):
        """Validate the hyper-parameters and derive an integer RNG seed.

        Parameters
        ----------
        n_timestamps : int
            Number of timestamps of the training data; every kernel size
            must be lower than or equal to it.

        Returns
        -------
        kernel_sizes : array of int
            Validated kernel sizes.

        seed : int
            Seed derived from ``random_state``, suitable for the numba
            kernel-generation routine.

        """
        if not isinstance(self.n_kernels, (int, np.integer)):
            raise TypeError("'n_kernels' must be an integer (got {})."
                            .format(self.n_kernels))

        if not isinstance(self.kernel_sizes, (list, tuple, np.ndarray)):
            raise TypeError("'kernel_sizes' must be a list, a tuple or "
                            "an array (got {}).".format(self.kernel_sizes))
        kernel_sizes = check_array(self.kernel_sizes, ensure_2d=False,
                                   dtype='int64', accept_large_sparse=False)
        if not np.all(1 <= kernel_sizes):
            raise ValueError("All the values in 'kernel_sizes' must be "
                             "greater than or equal to 1 ({} < 1)."
                             .format(kernel_sizes.min()))
        if not np.all(kernel_sizes <= n_timestamps):
            raise ValueError("All the values in 'kernel_sizes' must be lower "
                             "than or equal to 'n_timestamps' ({} > {})."
                             .format(kernel_sizes.max(), n_timestamps))

        # Derive a plain integer seed because np.random.seed inside the
        # numba-compiled routine cannot take a RandomState instance.
        rng = check_random_state(self.random_state)
        seed = rng.randint(np.iinfo(np.uint32).max, dtype='u8')
        return kernel_sizes, seed