"""Code for Dynamic Time Warping and its variants."""
# Author: Johann Faouzi <johann.faouzi@gmail.com>
# License: BSD-3-Clause
import numpy as np
from math import ceil, log2, sqrt
from numba import njit, prange
from sklearn.utils import check_array
@njit()
def _square(x, y):
return (x - y) ** 2
@njit()
def _absolute(x, y):
return abs(x - y)
@njit()
def _cost_matrix_region(x, y, dist, region):
n_timestamps_1, n_timestamps_2 = x.size, y.size
cost_mat = np.full((n_timestamps_1, n_timestamps_2), np.inf)
for i in prange(n_timestamps_1):
for j in prange(region[0, i], region[1, i]):
cost_mat[i, j] = dist(x[i], y[j])
return cost_mat
@njit()
def _project_cost_matrix_region(cost_mat, region):
n_timestamps_1, n_timestamps_2 = cost_mat.shape
cost_mat_projected = np.full((n_timestamps_1, n_timestamps_2), np.inf)
for i in prange(n_timestamps_1):
for j in prange(region[0, i], region[1, i]):
cost_mat_projected[i, j] = cost_mat[i, j]
return cost_mat_projected
@njit()
def _cost_matrix_no_region(x, y, dist):
n_timestamps_1, n_timestamps_2 = x.size, y.size
cost_mat = np.empty((n_timestamps_1, n_timestamps_2))
for j in prange(n_timestamps_2):
for i in prange(n_timestamps_1):
cost_mat[i, j] = dist(x[i], y[j])
return cost_mat
def _check_input_dtw(x, y, precomputed_cost, dist, method):
if dist == "precomputed" and method in ["multiscale", "fast"]:
raise ValueError("The method '{0}' cannot be used with "
"a precomputed cost. Provide the raw time series or"
"use one of the methods: 'classic', 'sakoechiba', "
"'itakura'".format(method))
if dist == "precomputed":
cost = check_array(precomputed_cost, ensure_min_samples=2,
ensure_min_features=2, ensure_2d=True,
force_all_finite=False, dtype='float64')
n_timestamps_1, n_timestamps_2 = precomputed_cost.shape
else:
cost = None
x = check_array(x, ensure_2d=False, dtype='float64')
y = check_array(y, ensure_2d=False, dtype='float64')
if x.ndim != 1:
raise ValueError("'x' must be a one-dimensional array.")
if y.ndim != 1:
raise ValueError("'y' must be a one-dimensional array.")
n_timestamps_1 = x.size
n_timestamps_2 = y.size
return x, y, cost, n_timestamps_1, n_timestamps_2
def _input_to_cost(x, y, dist, precomputed_cost, region):
"""Computes cost matrix from dtw input."""
if dist == "precomputed":
if region is not None:
cost_mat = _project_cost_matrix_region(precomputed_cost, region)
else:
cost_mat = precomputed_cost.copy()
else:
cost_mat = cost_matrix(x, y, dist=dist, region=region)
cost_mat = check_array(cost_mat, ensure_min_samples=2,
ensure_min_features=2, ensure_2d=True,
force_all_finite=False, dtype='float64')
return cost_mat
def _check_region(region, n_timestamps_1, n_timestamps_2):
"""Project region on the feasible set."""
region = np.clip(region[:, :n_timestamps_1], 0, n_timestamps_2)
return region
def cost_matrix(x, y, dist='square', region=None):
"""Compute the cost matrix between two samples.
Parameters
----------
x : array-like, shape = (n_timestamps_1,)
First sample.
y : array-like, shape = (n_timestamps_2,)
Second sample.
dist : 'square', 'absolute' or callable (default = 'square')
Distance used. If 'square', the squared difference is used.
If 'absolute', the absolute difference is used. If callable,
it must be a function with a numba.njit() decorator that takes
as input two numbers (two arguments) and returns a number.
region : None or array-like, shape = (2, n_timestamps_1) (default = None)
Constraint region. If None, there is no contraint region.
If array-like, the first row indicates the starting indices (included)
and the second row the ending indices (excluded) of the valid rows
for each column.
Returns
-------
cost_matrix : array, shape = (n_timestamps_1, n_timestamps_2)
Cost matrix.
"""
if dist == 'square':
dist_ = _square
elif dist == 'absolute':
dist_ = _absolute
elif isinstance(dist, str):
raise ValueError("'dist' must be either 'square', 'absolute' or "
"callable (got {0}).".format(dist))
else:
try:
temp = dist(1, 1)
except: # noqa: E722
raise ValueError("Calling dist(1, 1) did not work.")
else:
if not isinstance(temp, (int, float)):
raise ValueError("Calling dist(1, 1) did not return a float "
"or an integer.")
dist_ = dist
if region is not None:
region = check_array(region)
region_shape = region.shape
if region_shape != (2, x.size):
raise ValueError(
"The shape of 'region' must be equal to (2, n_timestamps_1) "
"(got {0}).".format(region_shape)
)
if region is None:
cost_mat = _cost_matrix_no_region(x, y, dist_)
else:
cost_mat = _cost_matrix_region(x, y, dist_, region)
return cost_mat
@njit()
def _accumulated_cost_matrix_region(cost_matrix, region):
n_timestamps_1, n_timestamps_2 = cost_matrix.shape
acc_cost_mat = np.ones((n_timestamps_1, n_timestamps_2)) * np.inf
acc_cost_mat[0, 0: region[1, 0]] = np.cumsum(
cost_matrix[0, 0: region[1, 0]]
)
acc_cost_mat[0: region[1, 0], 0] = np.cumsum(
cost_matrix[0: region[1, 0], 0]
)
region_ = np.copy(region)
region_[0] = np.maximum(region_[0], 1)
for i in range(1, n_timestamps_1):
for j in range(region_[0, i], region_[1, i]):
acc_cost_mat[i, j] = cost_matrix[i, j] + min(
acc_cost_mat[i - 1][j - 1],
acc_cost_mat[i - 1][j],
acc_cost_mat[i][j - 1]
)
return acc_cost_mat
@njit()
def _accumulated_cost_matrix_no_region(cost_matrix):
n_timestamps_1, n_timestamps_2 = cost_matrix.shape
acc_cost_mat = np.empty((n_timestamps_1, n_timestamps_2))
acc_cost_mat[0] = np.cumsum(cost_matrix[0])
acc_cost_mat[:, 0] = np.cumsum(cost_matrix[:, 0])
for j in range(1, n_timestamps_2):
for i in range(1, n_timestamps_1):
acc_cost_mat[i, j] = cost_matrix[i, j] + min(
acc_cost_mat[i - 1][j - 1],
acc_cost_mat[i - 1][j],
acc_cost_mat[i][j - 1]
)
return acc_cost_mat
def accumulated_cost_matrix(cost_mat, region=None):
"""Compute the accumulated cost matrix.
Parameters
----------
cost_mat : array-like, shape = (n_timestamps_1, n_timestamps_2)
Cost matrix.
region : None or tuple, shape = (2, n_timestamps_1) (default = None)
Constraint region. If None, there is no contraint region.
If array-like, the first row indicates the starting indices (included)
and the second row the ending indices (excluded) of the valid rows
for each column.
Returns
-------
acc_cost_mat : array, shape = (n_timestamps_1, n_timestamps_2)
Accumulated cost matrix.
"""
cost_mat = check_array(cost_mat, ensure_min_samples=2,
ensure_min_features=2, ensure_2d=True,
force_all_finite=False, dtype='float64')
cost_mat_shape = cost_mat.shape
if region is None:
acc_cost_mat = _accumulated_cost_matrix_no_region(cost_mat)
else:
region = check_array(region, dtype='int64')
region_shape = region.shape
if region_shape != (2, cost_mat_shape[0]):
raise ValueError("The shape of 'region' must be equal to "
"(2, n_timestamps_1) "
"(got {0})".format(region_shape)
)
acc_cost_mat = _accumulated_cost_matrix_region(cost_mat, region)
return acc_cost_mat
@njit()
def _return_path(acc_cost_mat):
n_timestamps_1, n_timestamps_2 = acc_cost_mat.shape
path = [(n_timestamps_1 - 1, n_timestamps_2 - 1)]
while path[-1] != (0, 0):
i, j = path[-1]
if i == 0:
path.append((0, j - 1))
elif j == 0:
path.append((i - 1, 0))
else:
arr = np.array([acc_cost_mat[i - 1][j - 1],
acc_cost_mat[i - 1][j],
acc_cost_mat[i][j - 1]])
argmin = np.argmin(arr)
if argmin == 0:
path.append((i - 1, j - 1))
elif argmin == 1:
path.append((i - 1, j))
else:
path.append((i, j - 1))
return np.transpose(np.array(path)[::-1])
def _return_results(dtw_dist, cost_mat, acc_cost_mat,
return_cost=False, return_accumulated=False,
return_path=False):
"""Return the results according to the parameters."""
res = (dtw_dist, )
if return_cost:
res += (cost_mat, )
if return_accumulated:
res += (acc_cost_mat, )
if return_path:
path = _return_path(acc_cost_mat)
res += (path, )
if len(res) == 1:
return res[0]
else:
return res
def _dtw_classic(x=None, y=None, dist='square', precomputed_cost=None,
return_cost=False, return_accumulated=False,
return_path=False):
"""Classic Dynamic Time Warping (DTW) distance between two time series.
The classic version of DTW has no constraint region, allowing for possibly
very large time shifts. This can be an upside or a downside depending on
the application. If you do not want to allow large time shifts,
consider using another method with a constraint region.
Options
-------
This method has no options.
References
----------
.. [1] H. Sakoe and S. Chiba, "Dynamic programming algorithm optimization
for spoken word recognition". IEEE Transactions on Acoustics,
Speech, and Signal Processing, 26(1), 43-49 (1978).
"""
x, y, precomputed_cost, n_timestamps_1, n_timestamps_2 = \
_check_input_dtw(x, y, precomputed_cost, dist, method="classic")
cost_mat = _input_to_cost(x, y, dist, precomputed_cost, region=None)
acc_cost_mat = accumulated_cost_matrix(cost_mat, region=None)
dtw_dist = acc_cost_mat[-1, -1]
if dist == 'square':
dtw_dist = sqrt(dtw_dist)
res = _return_results(dtw_dist, cost_mat, acc_cost_mat,
return_cost, return_accumulated, return_path)
return res
def _dtw_region(x=None, y=None, dist='square', region=None,
precomputed_cost=None, return_cost=False,
return_accumulated=False, return_path=False):
"""Dynamic Time Warping (DTW) distance with a constraint region.
This version of DTW allows users to provide their own constraint region.
If not provided, no constraint region is used, which is exactly the
classic DTW.
Options
-------
region : None or array-like, shape = (2, n_timestamps_1) (default = None)
Constraint region. If None, no constraint region is used. Otherwise,
the first row consists of the starting indices (included) and the
second row consists of the ending indices (excluded) of the valid rows
for each column.
"""
x, y, precomputed_cost, n_timestamps_1, n_timestamps_2 = \
_check_input_dtw(x, y, precomputed_cost, dist, method="region")
cost_mat = _input_to_cost(x, y, dist, precomputed_cost, region=region)
acc_cost_mat = accumulated_cost_matrix(cost_mat, region)
dtw_dist = acc_cost_mat[-1, -1]
if dist == 'square':
dtw_dist = sqrt(dtw_dist)
res = _return_results(dtw_dist, cost_mat, acc_cost_mat,
return_cost, return_accumulated, return_path)
return res
def _check_sakoe_chiba_params(n_timestamps_1, n_timestamps_2, window_size):
"""Check and set some parameters of the sakoe-chiba band."""
if not isinstance(n_timestamps_1, (int, np.integer)):
raise TypeError("'n_timestamps_1' must be an integer.")
else:
if not n_timestamps_1 >= 2:
raise ValueError("'n_timestamps_1' must be an integer greater than"
" or equal to 2.")
if not isinstance(window_size, (int, np.integer, float, np.floating)):
raise TypeError("'window_size' must be an integer or a float.")
n_timestamps = max(n_timestamps_1, n_timestamps_2)
if isinstance(window_size, (float, np.floating)):
if not 0. <= window_size <= 1.:
raise ValueError("The given 'window_size' is a float, "
"it must be between "
"0. and 1. To set the size of the sakoe-chiba "
"manually, 'window_size' must be an integer.")
window_size = ceil(window_size * (n_timestamps - 1))
else:
if not 0 <= window_size <= (n_timestamps - 1):
raise ValueError(
"The given 'window_size' is an integer, it must "
"be greater "
"than or equal to 0 and lower than max('n_timestamps_1', "
"'n_timestamps_2')."
)
scale = (n_timestamps_2 - 1) / (n_timestamps_1 - 1)
if n_timestamps_2 > n_timestamps_1:
window_size = max(window_size, scale / 2)
horizontal_shift = 0
vertical_shift = window_size
elif n_timestamps_1 > n_timestamps_2:
window_size = max(window_size, 0.5 / scale)
horizontal_shift = window_size
vertical_shift = 0
else:
horizontal_shift = 0
vertical_shift = window_size
return scale, horizontal_shift, vertical_shift
[docs]def sakoe_chiba_band(n_timestamps_1, n_timestamps_2=None, window_size=0.1):
"""Compute the Sakoe-Chiba band.
Parameters
----------
n_timestamps_1 : int
The size of the first time series.
n_timestamps_2 : int (optional, default None)
The size of the second time series. If None, set to `n_timestamps_1`.
window_size : float or int (default = 0.1)
The window size above and below the diagonale.
If float, `window_size must be between 0 and
1, and the actual window size will be computed as:
``ceil(window_size * max((n_timestamps_1, n_timestamps_2) - 1))``.
If int, `window_size` must be the largest temporal shift allowed.
Each cell whose distance with the diagonale is lower than or equal to
'window_size' becomes a valid cell for the path.
Returns
-------
region : array, shape = (2, n_timestamps_1)
Constraint region. The first row consists of the starting indices
(included) and the second row consists of the ending indices (excluded)
of the valid rows for each column.
References
----------
.. [1] H. Sakoe and S. Chiba, “Dynamic programming algorithm optimization
for spoken word recognition”. IEEE Transactions on Acoustics,
Speech, and Signal Processing, 26(1), 43-49 (1978).
Examples
--------
>>> from pyts.metrics import sakoe_chiba_band
>>> print(sakoe_chiba_band(5, window_size=0.5))
[[0 0 0 1 2]
[3 4 5 5 5]]
"""
if n_timestamps_2 is None:
n_timestamps_2 = n_timestamps_1
scale, horizontal_shift, vertical_shift = \
_check_sakoe_chiba_params(n_timestamps_1, n_timestamps_2, window_size)
lower_bound = scale * (np.arange(n_timestamps_1) - horizontal_shift) \
- vertical_shift
lower_bound = np.round(lower_bound, 2)
lower_bound = np.ceil(lower_bound)
upper_bound = scale * (np.arange(n_timestamps_1) + horizontal_shift) \
+ vertical_shift
upper_bound = np.round(upper_bound, 2)
upper_bound = np.floor(upper_bound) + 1
region = np.asarray([lower_bound, upper_bound]).astype('int64')
region = _check_region(region, n_timestamps_1, n_timestamps_2)
return region
def _dtw_sakoechiba(x=None, y=None, dist='square', window_size=0.1,
precomputed_cost=None, return_cost=False,
return_accumulated=False, return_path=False):
"""Dynamic Time Warping (DTW) distance with Sakoe-Chiba band constraint.
The Sakoe-Chiba constraint region is a band centered around the main
diagonal. Any cell whose distance to the diagonale is lower than a
given value is valid for the path.
Options
-------
window_size : float or int (default = 0.1)
The window size above and below the diagonal.
If float, `window_size` must be between 0 and
1, and the actual window size will be computed as
``ceil(window_size * max((n_timestamps_1, n_timestamps_2) - 1))``.
If int, `window_size` must be the largest temporal shift allowed.
Each cell whose distance with the diagonal is lower than or equal to
`window_size` becomes a valid cell for the path.
References
----------
.. [1] H. Sakoe and S. Chiba, "Dynamic programming algorithm optimization
for spoken word recognition". IEEE Transactions on Acoustics,
Speech, and Signal Processing, 26(1), 43-49 (1978).
"""
x, y, precomputed_cost, n_timestamps_1, n_timestamps_2 = \
_check_input_dtw(x, y, precomputed_cost, dist, method="sakoechiba")
region = sakoe_chiba_band(n_timestamps_1, n_timestamps_2, window_size)
cost_mat = _input_to_cost(x, y, dist, precomputed_cost, region=region)
acc_cost_mat = accumulated_cost_matrix(cost_mat, region)
dtw_dist = acc_cost_mat[-1, -1]
if dist == 'square':
dtw_dist = sqrt(dtw_dist)
res = _return_results(dtw_dist, cost_mat, acc_cost_mat,
return_cost, return_accumulated, return_path)
return res
def _get_itakura_slopes(n_timestamps_1, n_timestamps_2, max_slope):
"""Compute the slopes of the parallelogram bounds."""
if not isinstance(n_timestamps_1, (int, np.integer)):
raise TypeError("'n_timestamps_1' must be an integer.")
else:
if not n_timestamps_1 >= 2:
raise ValueError("'n_timestamps_1' must be an integer greater than"
" or equal to 2.")
if not isinstance(max_slope, (int, np.integer, float, np.floating)):
raise TypeError("'max_slope' must be an integer or a float.")
else:
if not max_slope >= 1:
raise ValueError("'max_slope' must be a number greater "
"than or equal to 1.")
min_slope = 1 / max_slope
scale_max = (n_timestamps_2 - 1) / (n_timestamps_1 - 2)
max_slope *= scale_max
max_slope = max(1., max_slope)
scale_min = (n_timestamps_2 - 2) / (n_timestamps_1 - 1)
min_slope *= scale_min
min_slope = min(1., min_slope)
return min_slope, max_slope
[docs]def itakura_parallelogram(n_timestamps_1, n_timestamps_2=None, max_slope=2.):
"""Compute the Itakura parallelogram.
Parameters
----------
n_timestamps_1 : int
The size of the first time series.
n_timestamps_2 : int (optional, default None)
The size of the second time series. If None, set to `n_timestamps_1`.
max_slope : float (default = 2.)
Maximum slope for the parallelogram. Must be >= 1.
Returns
-------
region : array, shape = (2, n_timestamps_1)
Constraint region. The first row consists of the starting indices
(included) and the second row consists of the ending indices (excluded)
of the valid rows for each column.
References
----------
.. [1] F. Itakura, "Minimum prediction residual principle applied to
speech recognition". IEEE Transactions on Acoustics,
Speech, and Signal Processing, 23(1), 67–72 (1975).
Examples
--------
>>> from pyts.metrics import itakura_parallelogram
>>> print(itakura_parallelogram(5))
[[0 1 1 2 4]
[1 3 4 4 5]]
"""
if n_timestamps_2 is None:
n_timestamps_2 = n_timestamps_1
min_slope_, max_slope_ = _get_itakura_slopes(
n_timestamps_1, n_timestamps_2, max_slope)
# Now we create the piecewise linear functions defining the parallelogram
# lower_bound[0] = min_slope * x
# lower_bound[1] = max_slope * (x - n_timestamps_1) + n_timestamps_2
centered_scale = np.arange(n_timestamps_1) - n_timestamps_1 + 1
lower_bound = np.empty((2, n_timestamps_1))
lower_bound[0] = min_slope_ * np.arange(n_timestamps_1)
lower_bound[1] = max_slope_ * centered_scale + n_timestamps_2 - 1
# take the max of the lower linear funcs
lower_bound = np.round(lower_bound, 2)
lower_bound = np.ceil(np.max(lower_bound, axis=0))
# upper_bound[0] = max_slope * x
# upper_bound[1] = min_slope * (x - n_timestamps_1) + n_timestamps_2
upper_bound = np.empty((2, n_timestamps_1))
upper_bound[0] = max_slope_ * np.arange(n_timestamps_1) + 1
upper_bound[1] = min_slope_ * centered_scale + n_timestamps_2
# take the min of the upper linear funcs
upper_bound = np.round(upper_bound, 2)
upper_bound = np.floor(np.min(upper_bound, axis=0))
# Little fix for max_slope = 1
if max_slope == 1:
if n_timestamps_2 > n_timestamps_1:
upper_bound[:-1] = lower_bound[1:]
else:
upper_bound = lower_bound + 1
region = np.asarray([lower_bound, upper_bound]).astype('int64')
region = _check_region(region, n_timestamps_1, n_timestamps_2)
return region
def _dtw_itakura(x=None, y=None, dist='square', max_slope=2.,
precomputed_cost=None, return_cost=False,
return_accumulated=False, return_path=False):
"""Dynamic Time Warping distance with Itakura parallelogram constraint.
The Itakura constraint region is a parallelogram. Contrary to the
Sakoe-Chiba band, whose width is constant, the Itakura parallelogram
has a varying width, allowing for larger time shifts in the middle than
at the first and last time points.
Options
-------
max_slope : float (default = 2.)
Maximum slope of the parallelogram.
References
----------
.. [1] F. Itakura, "Minimum prediction residual principle applied to
speech recognition". IEEE Transactions on Acoustics,
Speech, and Signal Processing, 23(1), 67–72 (1975).
"""
x, y, precomputed_cost, n_timestamps_1, n_timestamps_2 = \
_check_input_dtw(x, y, precomputed_cost, dist, method="itakura")
region = itakura_parallelogram(n_timestamps_1, n_timestamps_2, max_slope)
cost_mat = _input_to_cost(x, y, dist, precomputed_cost, region=region)
acc_cost_mat = accumulated_cost_matrix(cost_mat, region)
dtw_dist = acc_cost_mat[-1, -1]
if dist == 'square':
dtw_dist = sqrt(dtw_dist)
res = _return_results(dtw_dist, cost_mat, acc_cost_mat,
return_cost, return_accumulated, return_path)
return res
def _blurred_path_region(n_timestamps_1, n_timestamps_2, resolution_level,
n_timestamps_reduced_1, n_timestamps_reduced_2,
path, radius):
path_length = path.shape[1]
path_up = np.tile(path, radius)
path_down = path_up.copy()
path_left = path_up.copy()
path_right = path_up.copy()
for i in range(1, radius + 1):
start = path_length * (i - 1)
end = path_length * i
path_up[0, start: end] += i
path_down[0, start: end] -= i
path_left[1, start: end] -= i
path_right[1, start: end] += i
path_radius = np.c_[path, path_up, path_down, path_left, path_right]
path_radius[0] = np.clip(path_radius[0], 0, n_timestamps_reduced_1 - 1)
path_radius[1] = np.clip(path_radius[1], 0, n_timestamps_reduced_2 - 1)
region_reduced = np.empty((2, n_timestamps_reduced_1))
for i in range(n_timestamps_reduced_1):
arr = path_radius[1, path_radius[0] == i]
min_, max_ = np.min(arr), np.max(arr)
region_reduced[0, i] = min_ * resolution_level
region_reduced[1, i] = (max_ + 1) * resolution_level
region = np.repeat(region_reduced, resolution_level, axis=1)
region = _check_region(region, n_timestamps_1, n_timestamps_2)
return region.astype('int64')
def _multiscale_region(x, y, dist, resolution=2, radius=0):
n_timestamps_1 = len(x)
n_timestamps_2 = len(y)
if not isinstance(resolution, (int, np.integer)):
raise TypeError("'resolution' must be an integer.")
if resolution < 1:
raise ValueError("'resolution' must be a positive integer.")
if not isinstance(radius, (int, np.integer)):
raise TypeError("'radius' must be an integer.")
if radius < 0:
raise ValueError("'radius' must be a non-negative integer.")
if resolution == 1:
region = None
else:
remainder_1 = n_timestamps_1 % resolution
remainder_2 = n_timestamps_2 % resolution
if remainder_1 != 0:
x_padded = np.append(x, np.repeat(x[-1], resolution - remainder_1))
x_padded = x_padded.reshape(-1, resolution).mean(axis=1)
else:
x_padded = x.reshape(-1, resolution).mean(axis=1)
if remainder_2 != 0:
y_padded = np.append(y, np.repeat(y[-1], resolution - remainder_2))
y_padded = y_padded.reshape(-1, resolution).mean(axis=1)
else:
y_padded = y.reshape(-1, resolution).mean(axis=1)
cost_mat_res = cost_matrix(x_padded, y_padded, dist=dist, region=None)
acc_cost_mat_res = accumulated_cost_matrix(cost_mat_res, region=None)
path_res = _return_path(acc_cost_mat_res)
region = _blurred_path_region(n_timestamps_1, n_timestamps_2,
resolution, x_padded.size,
y_padded.size, path_res,
radius)
return region
def _fast_region(x, y, dist, radius=0):
if not isinstance(radius, (int, np.integer)):
raise TypeError("'radius' must be an integer.")
if radius < 0:
raise ValueError("'radius' must be a non-negative integer.")
n_timestamps_1, n_timestamps_2 = len(x), len(y)
min_size = radius + 2
region = None
n_timestamps = min(n_timestamps_1, n_timestamps_2)
if n_timestamps > min_size:
n_recursions = ceil(log2(n_timestamps / min_size))
for i in range(n_recursions):
resolution = 2 ** (n_recursions - i)
remainder_1 = n_timestamps_1 % resolution
remainder_2 = n_timestamps_2 % resolution
if remainder_1 != 0:
x_padded = np.append(
x, np.repeat(x[-1], resolution - remainder_1)
)
x_padded = x_padded.reshape(-1, resolution).mean(axis=1)
else:
x_padded = x.reshape(-1, resolution).mean(axis=1)
if remainder_2 != 0:
y_padded = np.append(
y, np.repeat(y[-1], resolution - remainder_2)
)
y_padded = y_padded.reshape(-1, resolution).mean(axis=1)
else:
y_padded = y.reshape(-1, resolution).mean(axis=1)
cost_mat_res = cost_matrix(x_padded, y_padded,
dist=dist, region=region)
acc_cost_mat_res = accumulated_cost_matrix(cost_mat_res,
region=region)
path_res = _return_path(acc_cost_mat_res)
n_timestamps_next_1 = ceil((2 * n_timestamps_1) / resolution)
n_timestamps_next_2 = ceil((2 * n_timestamps_2) / resolution)
region = _blurred_path_region(n_timestamps_next_1,
n_timestamps_next_2,
2, x_padded.size, y_padded.size,
path_res, radius)
return region
def _compute_region(n_timestamps_1, n_timestamps_2, method, dist,
x=None, y=None, **options):
"""Compute the region of feasible alignment paths."""
if method == 'classic':
region = None
elif method == 'sakoechiba':
region = sakoe_chiba_band(n_timestamps_1, n_timestamps_2, **options)
elif method == 'itakura':
region = itakura_parallelogram(n_timestamps_1, n_timestamps_2,
**options)
elif method == 'multiscale':
region = _multiscale_region(x, y, dist, **options)
elif method == 'fast':
region = _fast_region(x, y, dist, **options)
else:
raise ValueError("'method' must be either 'classic', 'sakoechiba', "
"'itakura', 'multiscale' or 'fast'.")
return region
def _dtw_multiscale(x, y, dist='square', resolution=2, radius=0,
return_cost=False, return_accumulated=False,
return_path=False):
"""Multiscale Dynamic Time Warping (DTW) distance.
This version of DTW builds an adaptive constraint region. First both time
series are downsampled, with the sizes of the original times series being
divided by ``resolution``. Then the optimal path between the two
downsampled time series is computed. Finally this optimal path is
projected on the orginal scale and used as the constraint region.
Options
-------
resolution : int (default = 2)
The resolution level.
radius : int (default = 0)
The radius used to expand the constraint region. The optimal path
computed at the resolution level is expanded with `radius` cells to the
top, bottom, left and right of every cell belonging to the optimal
path. It is computed at the resolution level.
References
----------
.. [1] M. Müller, H. Mattes and F. Kurth, "An efficient multiscale approach
to audio synchronization". International Conference on Music
Information Retrieval, 6(1), 192-197 (2006).
"""
x, y, precomputed_cost, n_timestamps_1, n_timestamps_2 = \
_check_input_dtw(x, y, precomputed_cost=None, dist=dist,
method="multiscale")
region = _multiscale_region(x, y, dist, resolution=resolution,
radius=radius)
cost_mat = cost_matrix(x, y, dist=dist, region=region)
acc_cost_mat = accumulated_cost_matrix(cost_mat, region=region)
dtw_dist = acc_cost_mat[-1, -1]
if dist == 'square':
dtw_dist = sqrt(dtw_dist)
res = _return_results(dtw_dist, cost_mat, acc_cost_mat,
return_cost, return_accumulated, return_path)
return res
def _dtw_fast(x, y, dist='square', radius=0, return_cost=False,
return_accumulated=False, return_path=False):
"""Fast Dynamic Time Warping distance.
This version of DTW builds an adaptive constraint region. The constraint
region is created recursively by downsampling the time series, computing
the optimal path and projecting it to a higher resolution. This process
is repeated until the resolution matches the original resolution.
Options
-------
radius : int (default = 0)
The radius used to expand the constraint region. The optimal path
computed at the resolution level is expanded with `radius` cells to the
top, bottom, left and right of every cell belonging to the optimal
path. It is computed at the resolution level.
References
----------
.. [1] S. Salvador ans P. Chan, "FastDTW: Toward Accurate Dynamic Time
Warping in Linear Time and Space". KDD Workshop on Mining Temporal
and Sequential Data, 70–80 (2004).
"""
x, y, precomputed_cost, n_timestamps_1, n_timestamps_2 = \
_check_input_dtw(x, y, precomputed_cost=None, dist=dist, method="fast")
region = _fast_region(x, y, dist, radius=radius)
cost_mat = cost_matrix(x, y, dist=dist, region=region)
acc_cost_mat = accumulated_cost_matrix(cost_mat, region=region)
dtw_dist = acc_cost_mat[-1, -1]
if dist == 'square':
dtw_dist = sqrt(dtw_dist)
res = _return_results(dtw_dist, cost_mat, acc_cost_mat,
return_cost, return_accumulated, return_path)
return res
[docs]def dtw(x=None, y=None, dist='square', method='classic', options=None,
precomputed_cost=None, return_cost=False,
return_accumulated=False, return_path=False):
"""Dynamic Time Warping (DTW) distance between two samples.
Parameters
----------
x : array-like, shape = (n_timestamps_1,)
First array. Ignored if ``dist == 'precomputed'``.
y : array-like, shape = (n_timestamps_2,)
Second array. Ignored if ``dist == 'precomputed'``.
dist : 'square', 'absolute', 'precomputed' or callable (default = 'square')
Distance used. If 'square', the squared difference is used.
If 'absolute', the absolute difference is used. If callable,
it must be a function with a numba.njit() decorator that takes
as input two numbers (two arguments) and returns a number.
If 'precomputed', ``precomputed_cost`` must be the cost matrix and
``method`` must be 'classic', 'sakoechiba' or 'itakura'.
method : str (default = 'classic')
Method used. Should be one of
- 'classic': :ref:`(see here) <metrics.dtw-classic>`
- 'sakoechiba': :ref:`(see here) <metrics.dtw-sakoechiba>`
- 'itakura': :ref:`(see here) <metrics.dtw-itakura>`
- 'region': :ref:`(see here) <metrics.dtw-region>`
- 'multiscale': :ref:`(see here) <metrics.dtw-multiscale>`
- 'fast': :ref:`(see here) <metrics.dtw-fast>`
options : None or dict (default = None)
Dictionary of method options. Here is a quick summary of the available
options for each method:
- 'classic': None
- 'sakoechiba': window_size (int or float)
- 'itakura': max_slope (float)
- 'region' : region (array-like)
- 'multiscale': resolution (int) and radius (int)
- 'fast': radius (int)
For more information on these options, see :func:`show_options()`.
precomputed_cost : array-like, shape = (n_timestamps_1, n_timestamps_2) \
(default = None)
Precomputed cost matrix between the time series.
Ignored if ``dist != 'precomputed'``.
return_cost : bool (default = False)
If True, the cost matrix is returned.
return_accumulated : bool (default = False)
If True, the accumulated cost matrix is returned.
return_path : bool (default = False)
If True, the optimal path is returned.
Returns
-------
dist : float
The DTW distance between the two arrays.
cost_mat : ndarray, shape = (n_timestamps_1, n_timestamps_2)
Cost matrix. Only returned if ``return_cost=True``.
acc_cost_mat : ndarray, shape = (n_timestamps_1, n_timestamps_2)
Accumulated cost matrix. Only returned if ``return_accumulated=True``.
path : ndarray, shape = (2, path_length)
The optimal path along the cost matrix. The first row consists
of the indices of the optimal path for x while the second row
consists of the indices of the optimal path for y. Only returned
if ``return_path=True``.
Notes
-----
This section describes the available versions of Dynamic Time Warping (DTW)
that can be selected by the `method` parameter.
**Unconstrained path**
Method :ref:`'classic'<metrics.dtw-classic>` computes the original DTW
score between two time series with no constraint region: each cell is a
valid cell to find the optimal path minimizing the total cost.
**Global constraint region**
Using no constraint region allows for very large distortion between both
time series, which may be ill-suited for some cases. One possible solution
to this issue is to use a global constraint region for the optimal path.
A cell can be part of the optimal path if and only if this cell is in the
constraint region. Another advantage of using a global constraint region is
to decrease the computational complexity to find the optimal path.
Method :ref:`'sakoechiba'<metrics.dtw-sakoechiba>` uses a band as the
constraint region, allowing for a maximum fixed time shift at every time
point.
Method :ref:`'itakura'<metrics.dtw-itakura>` uses a parallelogram as
the constraint region, allowing for time shifts of varying length: small
time shifts at the beginning and at the end of the time series, larger
time shifts in the middle.
**Adaptative constraint region**
One drawback of global constraint regions is that they do not depend on
the time series and might be ill-suited for some time series. Adaptative
constraint regions are regions that are specific to the two provided
time series. However, their computational complexities are larger that
the ones of global regions.
Method :ref:`'multiscale'<metrics.dtw-multiscale>` downsamples both time
series by a given factor, computes the optimal path with no constraint
region for the downsampled time series, and projects the optimal path on
the original scale to define the constraint region.
Method :ref:`'fast'<metrics.dtw-fast>` can be seen as a recursive version
of 'multiscale': the time series are downsampled so that the new time
series are very small, the optimal path for these time series is computed
and is projected at the scale that is two times larger. This process is
repeated until the original scale of the time series is reached.
References
----------
.. [1] H. Sakoe and S. Chiba, "Dynamic programming algorithm optimization
for spoken word recognition". IEEE Transactions on Acoustics,
Speech, and Signal Processing, 26(1), 43-49 (1978).
.. [2] F. Itakura, "Minimum prediction residual principle applied to
speech recognition". IEEE Transactions on Acoustics,
Speech, and Signal Processing, 23(1), 67–72 (1975).
.. [3] M. Müller, H. Mattes and F. Kurth, "An efficient multiscale approach
to audio synchronization". International Conference on Music
Information Retrieval, 6(1), 192-197 (2006).
.. [4] S. Salvador ans P. Chan, "FastDTW: Toward Accurate Dynamic Time
Warping in Linear Time and Space". KDD Workshop on Mining Temporal
and Sequential Data, 70–80 (2004).
Examples
--------
>>> from pyts.metrics import dtw
>>> x = [0, 1, 1]
>>> y = [2, 0, 1]
>>> dtw(x, y, method='sakoechiba', options={'window_size': 0.5})
2.0
"""
if options is None:
options = dict()
x, y, precomputed_cost, n_timestamps_1, n_timestamps_2 = \
_check_input_dtw(x, y, precomputed_cost, dist, method)
region = _compute_region(n_timestamps_1, n_timestamps_2, method, dist, x=x,
y=y, **options)
cost_mat = _input_to_cost(x, y, dist, precomputed_cost, region)
acc_cost_mat = accumulated_cost_matrix(cost_mat, region=region)
dtw_dist = acc_cost_mat[-1, -1]
if dist == 'square':
dtw_dist = sqrt(dtw_dist)
res = _return_results(dtw_dist, cost_mat, acc_cost_mat,
return_cost, return_accumulated, return_path)
return res
[docs]def show_options(method=None, disp=True):
"""Show documentation for additional options of Dynamic Time Warping
methods.
These are method-specific options that can be supplied through the
``options`` dict.
Parameters
----------
method : None or str (default = None)
If None, shows all methods. Otherwise, shows only the options for the
specified method. If str, it must be either 'classic', 'sakoechiba',
'itakura', 'region', 'multiscale' or 'fast'.
disp : bool (default = True)
Whether to print the result rather than returning it.
Returns
-------
text : None or str
Either None (if ``disp=True``) or the text string (if ``disp=False``).
Notes
-----
The available methods are:
- :ref:`'classic' <metrics.dtw-classic>`
- :ref:`'sakoechiba' <metrics.dtw-sakoechiba>`
- :ref:`'itakura' <metrics.dtw-itakura>`
- :ref:`'region' <metrics.dtw-region>`
- :ref:`'multiscale' <metrics.dtw-multiscale>`
- :ref:`'fast' <metrics.dtw-fast>`
"""
import textwrap
def get_options(string):
start = ' Options'
end = ' References'
str_lines = string.splitlines()
start_idx = str_lines.index(start)
end_idx = str_lines.index(end)
doc = textwrap.dedent('\n'.join(str_lines[:start_idx])) + '\n'
doc += textwrap.dedent('\n'.join(str_lines[start_idx:end_idx])) + '\n'
return doc
text = """\n"""
if method is None:
methods = ('classic', 'sakoechiba', 'itakura', 'region',
'multiscale', 'fast')
for met in methods:
text += show_options(met, disp=False)
elif method == 'classic':
text += "classic\n=======\n\n"
text += get_options(_dtw_classic.__doc__)
elif method == 'sakoechiba':
text += "sakoechiba\n==========\n\n"
text += get_options(_dtw_sakoechiba.__doc__)
elif method == 'itakura':
text += "itakura\n=======\n\n"
text += get_options(_dtw_itakura.__doc__)
elif method == 'region':
text += "region\n=======\n\n"
text += get_options(_dtw_itakura.__doc__)
elif method == 'multiscale':
text += "multiscale\n==========\n\n"
text += get_options(_dtw_multiscale.__doc__)
elif method == 'fast':
text += "fast\n====\n\n"
text += get_options(_dtw_fast.__doc__)
else:
raise ValueError("'method' must be either None, 'classic', "
"'sakoechiba', 'itakura', 'region', 'multiscale' "
"or 'fast'.")
if disp:
print(text)
return
else:
return text