# Source code for pyts.metrics.dtw

```"""Code for Dynamic Time Warping and its variants."""

# Author: Johann Faouzi <johann.faouzi@gmail.com>

import numpy as np
from math import ceil, log2, sqrt
from numba import njit, prange
from sklearn.utils import check_array

@njit()
def _square(x, y):
return (x - y) ** 2

@njit()
def _absolute(x, y):
return abs(x - y)

@njit()
def _cost_matrix_region(x, y, dist, region):
n_timestamps_1, n_timestamps_2 = x.size, y.size
cost_mat = np.full((n_timestamps_1, n_timestamps_2), np.inf)
for i in prange(n_timestamps_1):
for j in prange(region[0, i], region[1, i]):
cost_mat[i, j] = dist(x[i], y[j])
return cost_mat

@njit()
def _project_cost_matrix_region(cost_mat, region):
n_timestamps_1, n_timestamps_2 = cost_mat.shape
cost_mat_projected = np.full((n_timestamps_1, n_timestamps_2), np.inf)
for i in prange(n_timestamps_1):
for j in prange(region[0, i], region[1, i]):
cost_mat_projected[i, j] = cost_mat[i, j]
return cost_mat_projected

@njit()
def _cost_matrix_no_region(x, y, dist):
n_timestamps_1, n_timestamps_2 = x.size, y.size
cost_mat = np.empty((n_timestamps_1, n_timestamps_2))
for j in prange(n_timestamps_2):
for i in prange(n_timestamps_1):
cost_mat[i, j] = dist(x[i], y[j])
return cost_mat

def _check_input_dtw(x, y, precomputed_cost, dist, method):
if dist == "precomputed" and method in ["multiscale", "fast"]:
raise ValueError("The method '{0}' cannot be used with "
"a precomputed cost. Provide the raw time series or"
"use one of the methods: 'classic', 'sakoechiba', "
"'itakura'".format(method))
if dist == "precomputed":
cost = check_array(precomputed_cost, ensure_min_samples=2,
ensure_min_features=2, ensure_2d=True,
force_all_finite=False, dtype='float64')
n_timestamps_1, n_timestamps_2 = precomputed_cost.shape
else:
cost = None
x = check_array(x, ensure_2d=False, dtype='float64')
y = check_array(y, ensure_2d=False, dtype='float64')
if x.ndim != 1:
raise ValueError("'x' must be a one-dimensional array.")
if y.ndim != 1:
raise ValueError("'y' must be a one-dimensional array.")
n_timestamps_1 = x.size
n_timestamps_2 = y.size

return x, y, cost, n_timestamps_1, n_timestamps_2

def _input_to_cost(x, y, dist, precomputed_cost, region):
"""Computes cost matrix from dtw input."""
if dist == "precomputed":
if region is not None:
cost_mat = _project_cost_matrix_region(precomputed_cost, region)
else:
cost_mat = precomputed_cost.copy()
else:
cost_mat = cost_matrix(x, y, dist=dist, region=region)
cost_mat = check_array(cost_mat, ensure_min_samples=2,
ensure_min_features=2, ensure_2d=True,
force_all_finite=False, dtype='float64')
return cost_mat

def _check_region(region, n_timestamps_1, n_timestamps_2):
"""Project region on the feasible set."""
region = np.clip(region[:, :n_timestamps_1], 0, n_timestamps_2)
return region

def cost_matrix(x, y, dist='square', region=None):
"""Compute the cost matrix between two samples.

Parameters
----------
x : array-like, shape = (n_timestamps_1,)
First sample.

y : array-like, shape = (n_timestamps_2,)
Second sample.

dist : 'square', 'absolute' or callable (default = 'square')
Distance used. If 'square', the squared difference is used.
If 'absolute', the absolute difference is used. If callable,
it must be a function with a numba.njit() decorator that takes
as input two numbers (two arguments) and returns a number.

region : None or array-like, shape = (2, n_timestamps_1) (default = None)
Constraint region. If None, there is no contraint region.
If array-like, the first row indicates the starting indices (included)
and the second row the ending indices (excluded) of the valid rows
for each column.

Returns
-------
cost_matrix : array, shape = (n_timestamps_1, n_timestamps_2)
Cost matrix.

"""
if dist == 'square':
dist_ = _square
elif dist == 'absolute':
dist_ = _absolute
elif isinstance(dist, str):
raise ValueError("'dist' must be either 'square', 'absolute' or "
"callable (got {0}).".format(dist))
else:
try:
temp = dist(1, 1)
except:
raise ValueError("Calling dist(1, 1) did not work.")
else:
if not isinstance(temp, (int, float)):
raise ValueError("Calling dist(1, 1) did not return a float "
"or an integer.")
dist_ = dist
if region is not None:
region = check_array(region)
region_shape = region.shape
if region_shape != (2, x.size):
raise ValueError(
"The shape of 'region' must be equal to (2, n_timestamps_1) "
"(got {0}).".format(region_shape)
)
if region is None:
cost_mat = _cost_matrix_no_region(x, y, dist_)
else:
cost_mat = _cost_matrix_region(x, y, dist_, region)
return cost_mat

@njit()
def _accumulated_cost_matrix_region(cost_matrix, region):
n_timestamps_1, n_timestamps_2 = cost_matrix.shape
acc_cost_mat = np.ones((n_timestamps_1, n_timestamps_2)) * np.inf
acc_cost_mat[0, 0: region[1, 0]] = np.cumsum(
cost_matrix[0, 0: region[1, 0]]
)
acc_cost_mat[0: region[1, 0], 0] = np.cumsum(
cost_matrix[0: region[1, 0], 0]
)
region_ = np.copy(region)
region_ = np.maximum(region_, 1)
for i in range(1, n_timestamps_1):
for j in range(region_[0, i], region_[1, i]):
acc_cost_mat[i, j] = cost_matrix[i, j] + min(
acc_cost_mat[i - 1][j - 1],
acc_cost_mat[i - 1][j],
acc_cost_mat[i][j - 1]
)
return acc_cost_mat

@njit()
def _accumulated_cost_matrix_no_region(cost_matrix):
n_timestamps_1, n_timestamps_2 = cost_matrix.shape
acc_cost_mat = np.empty((n_timestamps_1, n_timestamps_2))
acc_cost_mat = np.cumsum(cost_matrix)
acc_cost_mat[:, 0] = np.cumsum(cost_matrix[:, 0])
for j in range(1, n_timestamps_2):
for i in range(1, n_timestamps_1):
acc_cost_mat[i, j] = cost_matrix[i, j] + min(
acc_cost_mat[i - 1][j - 1],
acc_cost_mat[i - 1][j],
acc_cost_mat[i][j - 1]
)
return acc_cost_mat

def accumulated_cost_matrix(cost_mat, region=None):
"""Compute the accumulated cost matrix.

Parameters
----------
cost_mat : array-like, shape = (n_timestamps_1, n_timestamps_2)
Cost matrix.

region : None or tuple, shape = (2, n_timestamps_1) (default = None)
Constraint region. If None, there is no contraint region.
If array-like, the first row indicates the starting indices (included)
and the second row the ending indices (excluded) of the valid rows
for each column.

Returns
-------
acc_cost_mat : array, shape = (n_timestamps_1, n_timestamps_2)
Accumulated cost matrix.

"""
cost_mat = check_array(cost_mat, ensure_min_samples=2,
ensure_min_features=2, ensure_2d=True,
force_all_finite=False, dtype='float64')
cost_mat_shape = cost_mat.shape

if region is None:
acc_cost_mat = _accumulated_cost_matrix_no_region(cost_mat)
else:
region = check_array(region, dtype='int64')
region_shape = region.shape
if region_shape != (2, cost_mat_shape):
raise ValueError("The shape of 'region' must be equal to "
"(2, n_timestamps_1) "
"(got {0})".format(region_shape)
)
acc_cost_mat = _accumulated_cost_matrix_region(cost_mat, region)
return acc_cost_mat

@njit()
def _return_path(acc_cost_mat):
n_timestamps_1, n_timestamps_2 = acc_cost_mat.shape
path = [(n_timestamps_1 - 1, n_timestamps_2 - 1)]
while path[-1] != (0, 0):
i, j = path[-1]
if i == 0:
path.append((0, j - 1))
elif j == 0:
path.append((i - 1, 0))
else:
arr = np.array([acc_cost_mat[i - 1][j - 1],
acc_cost_mat[i - 1][j],
acc_cost_mat[i][j - 1]])
argmin = np.argmin(arr)
if argmin == 0:
path.append((i - 1, j - 1))
elif argmin == 1:
path.append((i - 1, j))
else:
path.append((i, j - 1))
return np.transpose(np.array(path)[::-1])

def _return_results(dtw_dist, cost_mat, acc_cost_mat,
return_cost=False, return_accumulated=False,
return_path=False):
"""Return the results according to the parameters."""
res = (dtw_dist, )
if return_cost:
res += (cost_mat, )
if return_accumulated:
res += (acc_cost_mat, )
if return_path:
path = _return_path(acc_cost_mat)
res += (path, )
if len(res) == 1:
return res
else:
return res

[docs]def dtw_classic(x=None, y=None, dist='square', precomputed_cost=None,
return_cost=False, return_accumulated=False,
return_path=False):
"""Classic Dynamic Time Warping (DTW) distance between two time series.

Parameters
----------
x : array-like, shape = (n_timestamps_1,)
First array. Ignored if ``dist == 'precomputed'``.

y : array-like, shape = (n_timestamps_2,)
Second array. Ignored if ``dist == 'precomputed'``.

dist : 'square', 'absolute', 'precomputed' or callable (default = 'square')
Distance used. If 'square', the squared difference is used.
If 'absolute', the absolute difference is used. If 'precomputed',
`precomputed_cost` must be the cost matrix. If callable,
it must be a function with a numba.njit() decorator that takes
as input two numbers (two arguments) and returns a number.

precomputed_cost : array-like, shape = (n_timestamps_1, n_timestamps_2) \
(default = None).
Precomputed cost matrix between the time series.
Ignored if ``dist != 'precomputed'``.

return_cost : bool (default = False)
If True, the cost matrix is returned.

return_accumulated : bool (default = False)
If True, the accumulated cost matrix is returned.

return_path : bool (default = False)
If True, the optimal path is returned.

Returns
-------
dtw_dist : float
The DTW distance between the two arrays.

cost_mat : array, shape = (n_timestamps_1, n_timestamps_2)
Cost matrix. Only returned if ``return_cost=True``.

acc_cost_mat : array, shape = (n_timestamps_1, n_timestamps_2)
Accumulated cost matrix. Only returned if ``return_accumulated=True``.

path : array, shape = (2, path_length)
The optimal path along the cost matrix. The first row consists
of the indices of the optimal path for x while the second row
consists of the indices of the optimal path for y. Only returned
if ``return_path=True``.

References
----------
..  H. Sakoe and S. Chiba, “Dynamic programming algorithm optimization
for spoken word recognition”. IEEE Transactions on Acoustics,
Speech, and Signal Processing, 26(1), 43-49 (1978).

Examples
--------
>>> from pyts.metrics import dtw_classic
>>> x = [0, 1, 1]
>>> y = [2, 0, 1]
>>> dtw_classic(x, y)
2.0

"""
x, y, precomputed_cost, n_timestamps_1, n_timestamps_2 = \
_check_input_dtw(x, y, precomputed_cost, dist, method="classic")
cost_mat = _input_to_cost(x, y, dist, precomputed_cost, region=None)
acc_cost_mat = accumulated_cost_matrix(cost_mat, region=None)
dtw_dist = acc_cost_mat[-1, -1]
if dist == 'square':
dtw_dist = sqrt(dtw_dist)

res = _return_results(dtw_dist, cost_mat, acc_cost_mat,
return_cost, return_accumulated, return_path)
return res

[docs]def dtw_region(x=None, y=None, dist='square', region=None,
precomputed_cost=None, return_cost=False,
return_accumulated=False, return_path=False):
"""Dynamic Time Warping (DTW) distance with a constraint region.

Parameters
----------
x : array-like, shape = (n_timestamps_1,)
First array. Ignored if ``dist == 'precomputed'``.

y : array-like, shape = (n_timestamps_2,)
Second array. Ignored if ``dist == 'precomputed'``.

dist : 'square', 'absolute', 'precomputed' or callable (default = 'square')
Distance used. If 'square', the squared difference is used.
If 'absolute', the absolute difference is used. If 'precomputed',
`precomputed_cost` must be the cost matrix. If callable,
it must be a function with a numba.njit() decorator that takes
as input two numbers (two arguments) and returns a number.

region : None or array-like, shape = (2, n_timestamps_1)
Constraint region. If None, no constraint region is used. Otherwise,
the first row consists of the starting indices (included) and the
second row consists of the ending indices (excluded) of the valid rows
for each column.

precomputed_cost : array-like, shape = (n_timestamps_1, n_timestamps_2) \
(default = None).
Precomputed cost matrix between the time series.
Ignored if ``dist != 'precomputed'``.

return_cost : bool (default = False)
If True, the cost matrix is returned.

return_accumulated : bool (default = False)
If True, the accumulated cost matrix is returned.

return_path : bool (default = False)
If True, the optimal path is returned.

Returns
-------
dtw_dist : float
The DTW distance between the two arrays.

cost_mat : array, shape = (n_timestamps_1, n_timestamps_2)
Cost matrix. Only returned if ``return_cost=True``.

acc_cost_mat : array, shape = (n_timestamps_1 n_timestamps_2)
Accumulated cost matrix. Only returned if ``return_accumulated=True``.

path : array, shape = (2, path_length)
The optimal path along the cost matrix. The first row consists
of the indices of the optimal path for x while the second row
consists of the indices of the optimal path for y. Only returned
if ``return_path=True``.

Examples
--------
>>> from pyts.metrics import dtw_region
>>> x = [0, 1, 1]
>>> y = [2, 0, 1]
>>> region = [[0, 1, 1], [2, 2, 3]]
>>> dtw_region(x, y, region=region)
2.23...

"""
x, y, precomputed_cost, n_timestamps_1, n_timestamps_2 = \
_check_input_dtw(x, y, precomputed_cost, dist, method="region")
cost_mat = _input_to_cost(x, y, dist, precomputed_cost, region=region)
acc_cost_mat = accumulated_cost_matrix(cost_mat, region)
dtw_dist = acc_cost_mat[-1, -1]
if dist == 'square':
dtw_dist = sqrt(dtw_dist)

res = _return_results(dtw_dist, cost_mat, acc_cost_mat,
return_cost, return_accumulated, return_path)
return res

def _check_sakoe_chiba_params(n_timestamps_1, n_timestamps_2, window_size):
"""Check and set some parameters of the sakoe-chiba band."""
if not isinstance(n_timestamps_1, (int, np.integer)):
raise TypeError("'n_timestamps_1' must be an integer.")
else:
if not n_timestamps_1 >= 2:
raise ValueError("'n_timestamps_1' must be an integer greater than"
" or equal to 2.")
if not isinstance(window_size, (int, np.integer, float, np.floating)):
raise TypeError("'window_size' must be an integer or a float.")
n_timestamps = max(n_timestamps_1, n_timestamps_2)

if isinstance(window_size, (float, np.floating)):
if not 0. <= window_size <= 1.:
raise ValueError("The given 'window_size' is a float, "
"it must be between "
"0. and 1. To set the size of the sakoe-chiba "
"manually, 'window_size' must be an integer.")
window_size = ceil(window_size * (n_timestamps - 1))
else:
if not 0 <= window_size <= (n_timestamps - 1):
raise ValueError(
"The given 'window_size' is an integer, it must "
"be greater "
"than or equal to 0 and lower than max('n_timestamps_1', "
"'n_timestamps_2')."
)

scale = (n_timestamps_2 - 1) / (n_timestamps_1 - 1)

if n_timestamps_2 > n_timestamps_1:
window_size = max(window_size, scale / 2)
horizontal_shift = 0
vertical_shift = window_size
elif n_timestamps_1 > n_timestamps_2:
window_size = max(window_size, 0.5 / scale)
horizontal_shift = window_size
vertical_shift = 0
else:
horizontal_shift = 0
vertical_shift = window_size
return scale, horizontal_shift, vertical_shift

[docs]def sakoe_chiba_band(n_timestamps_1, n_timestamps_2=None, window_size=0.1):
"""Compute the Sakoe-Chiba band.

Parameters
----------
n_timestamps_1 : int
The size of the first time series.

n_timestamps_2 : int (optional, default None)
The size of the second time series. If None, set to `n_timestamps_1`.

window_size : float or int (default = 0.1)
The window size above and below the diagonale.
If float, `window_size must be between 0 and
1, and the actual window size will be computed as:
``ceil(window_size * max((n_timestamps_1, n_timestamps_2) - 1))``.
If int, `window_size` must be the largest temporal shift allowed.
Each cell whose distance with the diagonale is lower than or equal to
'window_size' becomes a valid cell for the path.

Returns
-------
region : array, shape = (2, n_timestamps_1)
Constraint region. The first row consists of the starting indices
(included) and the second row consists of the ending indices (excluded)
of the valid rows for each column.

References
----------
..  H. Sakoe and S. Chiba, “Dynamic programming algorithm optimization
for spoken word recognition”. IEEE Transactions on Acoustics,
Speech, and Signal Processing, 26(1), 43-49 (1978).

Examples
--------
>>> from pyts.metrics import sakoe_chiba_band
>>> print(sakoe_chiba_band(5, window_size=0.5))
[[0 0 0 1 2]
[3 4 5 5 5]]

"""
if n_timestamps_2 is None:
n_timestamps_2 = n_timestamps_1
scale, horizontal_shift, vertical_shift = \
_check_sakoe_chiba_params(n_timestamps_1, n_timestamps_2, window_size)

lower_bound = scale * (np.arange(n_timestamps_1) - horizontal_shift) \
- vertical_shift
lower_bound = np.round(lower_bound, 2)
lower_bound = np.ceil(lower_bound)
upper_bound = scale * (np.arange(n_timestamps_1) + horizontal_shift) \
+ vertical_shift
upper_bound = np.round(upper_bound, 2)
upper_bound = np.floor(upper_bound) + 1
region = np.asarray([lower_bound, upper_bound]).astype('int64')
region = _check_region(region, n_timestamps_1, n_timestamps_2)
return region

[docs]def dtw_sakoechiba(x=None, y=None, dist='square', window_size=0.1,
precomputed_cost=None, return_cost=False,
return_accumulated=False, return_path=False):
"""Dynamic Time Warping (DTW) distance with Sakoe-Chiba band constraint.

Parameters
----------
x : array-like, shape = (n_timestamps_1,)
First array. Ignored if ``dist == 'precomputed'``.

y : array-like, shape = (n_timestamps_2,)
Second array. Ignored if ``dist == 'precomputed'``.

dist : 'square', 'absolute', 'precomputed' or callable (default = 'square')
Distance used. If 'square', the squared difference is used.
If 'absolute', the absolute difference is used. If 'precomputed',
`precomputed_cost` must be the cost matrix. If callable,
it must be a function with a numba.njit() decorator that takes
as input two numbers (two arguments) and returns a number.

window_size : float or int (default = 0.1)
The window size above and below the diagonale.
If float, `window_size must be between 0 and
1, and the actual window size will be computed as:
``ceil(window_size * max((n_timestamps_1, n_timestamps_2) - 1))``.
If int, `window_size` must be the largest temporal shift allowed.
Each cell whose distance with the diagonale is lower than or equal to
'window_size' becomes a valid cell for the path.

precomputed_cost : array-like, shape = (n_timestamps_1, n_timestamps_2) \
(default = None).
Precomputed cost matrix between the time series.
Ignored if ``dist != 'precomputed'``.

return_cost : bool (default = False)
If True, the cost matrix is returned.

return_accumulated : bool (default = False)
If True, the accumulated cost matrix is returned.

return_path : bool (default = False)
If True, the optimal path is returned.

Returns
-------
dtw_dist : float
The DTW distance between the two arrays.

cost_mat : array, shape = (n_timestamps_1, n_timestamps_2)
Cost matrix. Only returned if ``return_cost=True``.

acc_cost_mat : array, shape = (n_timestamps_1, n_timestamps_2)
Accumulated cost matrix. Only returned if ``return_accumulated=True``.

path : array, shape = (2, path_length)
The optimal path along the cost matrix. The first row consists
of the indices of the optimal path for x while the second row
consists of the indices of the optimal path for y. Only returned
if ``return_path=True``.

References
----------
..  H. Sakoe and S. Chiba, “Dynamic programming algorithm optimization
for spoken word recognition”. IEEE Transactions on Acoustics,
Speech, and Signal Processing, 26(1), 43-49 (1978).

Examples
--------
>>> from pyts.metrics import dtw_sakoechiba
>>> x = [0, 1, 1]
>>> y = [2, 0, 1]
>>> dtw_sakoechiba(x, y, window_size=1)
2.0

"""
x, y, precomputed_cost, n_timestamps_1, n_timestamps_2 = \
_check_input_dtw(x, y, precomputed_cost, dist, method="sakoechiba")
region = sakoe_chiba_band(n_timestamps_1, n_timestamps_2, window_size)
cost_mat = _input_to_cost(x, y, dist, precomputed_cost, region=region)

acc_cost_mat = accumulated_cost_matrix(cost_mat, region)
dtw_dist = acc_cost_mat[-1, -1]
if dist == 'square':
dtw_dist = sqrt(dtw_dist)

res = _return_results(dtw_dist, cost_mat, acc_cost_mat,
return_cost, return_accumulated, return_path)
return res

def _get_itakura_slopes(n_timestamps_1, n_timestamps_2, max_slope):
"""Compute the slopes of the parallelogram bounds."""
if not isinstance(n_timestamps_1, (int, np.integer)):
raise TypeError("'n_timestamps_1' must be an integer.")
else:
if not n_timestamps_1 >= 2:
raise ValueError("'n_timestamps_1' must be an integer greater than"
" or equal to 2.")

if not isinstance(max_slope, (int, np.integer, float, np.floating)):
raise TypeError("'max_slope' must be an integer or a float.")
else:
if not max_slope >= 1:
raise ValueError("'max_slope' must be a number greater "
"than or equal to 1.")

min_slope = 1 / max_slope
scale_max = (n_timestamps_2 - 1) / (n_timestamps_1 - 2)
max_slope *= scale_max
max_slope = max(1., max_slope)

scale_min = (n_timestamps_2 - 2) / (n_timestamps_1 - 1)

min_slope *= scale_min
min_slope = min(1., min_slope)
return min_slope, max_slope

[docs]def itakura_parallelogram(n_timestamps_1, n_timestamps_2=None, max_slope=2.):
"""Compute the Itakura parallelogram.

Parameters
----------
n_timestamps_1 : int
The size of the first time series.

n_timestamps_2 : int (optional, default None)
The size of the second time series. If None, set to `n_timestamps_1`.

max_slope : float (default = 2.)
Maximum slope for the parallelogram. Must be >= 1.

Returns
-------
region : array, shape = (2, n_timestamps_1)
Constraint region. The first row consists of the starting indices
(included) and the second row consists of the ending indices (excluded)
of the valid rows for each column.

References
----------
..  F. Itakura, “Minimum prediction residual principle applied to speech
recognition”. IEEE Transactions on Acoustics, Speech, and Signal
Processing, 23(1), 67–72 (1975).

Examples
--------
>>> from pyts.metrics import itakura_parallelogram
>>> print(itakura_parallelogram(5))
[[0 1 1 2 4]
[1 3 4 4 5]]

"""
if n_timestamps_2 is None:
n_timestamps_2 = n_timestamps_1
min_slope_, max_slope_ = _get_itakura_slopes(
n_timestamps_1, n_timestamps_2, max_slope)

# Now we create the piecewise linear functions defining the parallelogram
# lower_bound = min_slope * x
# lower_bound = max_slope * (x - n_timestamps_1) + n_timestamps_2

centered_scale = np.arange(n_timestamps_1) - n_timestamps_1 + 1
lower_bound = np.empty((2, n_timestamps_1))
lower_bound = min_slope_ * np.arange(n_timestamps_1)
lower_bound = max_slope_ * centered_scale + n_timestamps_2 - 1

# take the max of the lower linear funcs
lower_bound = np.round(lower_bound, 2)
lower_bound = np.ceil(np.max(lower_bound, axis=0))

# upper_bound = max_slope * x
# upper_bound = min_slope * (x - n_timestamps_1) + n_timestamps_2

upper_bound = np.empty((2, n_timestamps_1))
upper_bound = max_slope_ * np.arange(n_timestamps_1) + 1
upper_bound = min_slope_ * centered_scale + n_timestamps_2

# take the min of the upper linear funcs
upper_bound = np.round(upper_bound, 2)
upper_bound = np.floor(np.min(upper_bound, axis=0))

# Little fix for max_slope = 1
if max_slope == 1:
if n_timestamps_2 > n_timestamps_1:
upper_bound[:-1] = lower_bound[1:]
else:
upper_bound = lower_bound + 1

region = np.asarray([lower_bound, upper_bound]).astype('int64')
region = _check_region(region, n_timestamps_1, n_timestamps_2)
return region

[docs]def dtw_itakura(x=None, y=None, dist='square', max_slope=2.,
precomputed_cost=None, return_cost=False,
return_accumulated=False, return_path=False):
"""Dynamic Time Warping distance with Itakura parallelogram constraint.

Parameters
----------
x : array-like, shape = (n_timestamps_1,)
First array. Ignored if ``dist == 'precomputed'``.

y : array-like, shape = (n_timestamps_2,)
Second array. Ignored if ``dist == 'precomputed'``.

dist : 'square', 'absolute', 'precomputed' or callable (default = 'square')
Distance used. If 'square', the squared difference is used.
If 'absolute', the absolute difference is used. If 'precomputed',
`precomputed_cost` must be the cost matrix. If callable,
it must be a function with a numba.njit() decorator that takes
as input two numbers (two arguments) and returns a number.

max_slope : float (default = 2.)
Maximum slope for the parallelogram.

precomputed_cost : array-like, shape = (n_timestamps_1, n_timestamps_2) \
(default = None).
Precomputed cost matrix between the time series.
Ignored if ``dist != 'precomputed'``.

return_cost : bool (default = False)
If True, the cost matrix is returned.

return_accumulated : bool (default = False)
If True, the accumulated cost matrix is returned.

return_path : bool (default = False)
If True, the optimal path is returned.

Returns
-------
dtw_dist : float
The DTW distance between the two arrays.

cost_mat : ndarray, shape = (n_timestamps_1, n_timestamps_2)
Cost matrix. Only returned if ``return_cost=True``.

acc_cost_mat : ndarray, shape = (n_timestamps_1, n_timestamps_2)
Accumulated cost matrix. Only returned if ``return_accumulated=True``.

path : array, shape = (2, path_length)
The optimal path along the cost matrix. The first row consists
of the indices of the optimal path for x while the second row
consists of the indices of the optimal path for y. Only returned
if ``return_path=True``.

References
----------
..  F. Itakura, “Minimum prediction residual principle applied to speech
recognition”. IEEE Transactions on Acoustics, Speech, and Signal
Processing, 23(1), 67–72 (1975).

Examples
--------
>>> from pyts.metrics import dtw_itakura
>>> x = [0, 1, 1]
>>> y = [2, 0, 1]
>>> dtw_itakura(x, y, max_slope=1.5)
2.23...

"""
x, y, precomputed_cost, n_timestamps_1, n_timestamps_2 = \
_check_input_dtw(x, y, precomputed_cost, dist, method="itakura")
region = itakura_parallelogram(n_timestamps_1, n_timestamps_2, max_slope)
cost_mat = _input_to_cost(x, y, dist, precomputed_cost, region=region)
acc_cost_mat = accumulated_cost_matrix(cost_mat, region)
dtw_dist = acc_cost_mat[-1, -1]
if dist == 'square':
dtw_dist = sqrt(dtw_dist)

res = _return_results(dtw_dist, cost_mat, acc_cost_mat,
return_cost, return_accumulated, return_path)
return res

def _blurred_path_region(n_timestamps_1, n_timestamps_2, resolution_level,
n_timestamps_reduced_1, n_timestamps_reduced_2,
path_length = path.shape
path_down = path_up.copy()
path_left = path_up.copy()
path_right = path_up.copy()

for i in range(1, radius + 1):
start = path_length * (i - 1)
end = path_length * i
path_up[0, start: end] += i
path_down[0, start: end] -= i
path_left[1, start: end] -= i
path_right[1, start: end] += i

path_radius = np.c_[path, path_up, path_down, path_left, path_right]

region_reduced = np.empty((2, n_timestamps_reduced_1))
for i in range(n_timestamps_reduced_1):
min_, max_ = np.min(arr), np.max(arr)
region_reduced[0, i] = min_ * resolution_level
region_reduced[1, i] = (max_ + 1) * resolution_level

region = np.repeat(region_reduced, resolution_level, axis=1)
region = _check_region(region, n_timestamps_1, n_timestamps_2)

return region.astype('int64')

def _multiscale_region(x, y, dist, resolution=2, radius=0):
n_timestamps_1 = len(x)
n_timestamps_2 = len(y)
if not isinstance(resolution, (int, np.integer)):
raise TypeError("'resolution' must be an integer.")
if resolution < 1:
raise ValueError("'resolution' must be a positive integer.")
raise TypeError("'radius' must be an integer.")
raise ValueError("'radius' must be a non-negative integer.")

if resolution == 1:
region = None
else:
remainder_1 = n_timestamps_1 % resolution
remainder_2 = n_timestamps_2 % resolution

if remainder_1 != 0:
x_padded = np.append(x, np.repeat(x[-1], resolution - remainder_1))
else:
if remainder_2 != 0:
y_padded = np.append(y, np.repeat(y[-1], resolution - remainder_2))
else:
acc_cost_mat_res = accumulated_cost_matrix(cost_mat_res, region=None)
path_res = _return_path(acc_cost_mat_res)
region = _blurred_path_region(n_timestamps_1, n_timestamps_2,
return region

raise TypeError("'radius' must be an integer.")
raise ValueError("'radius' must be a non-negative integer.")
n_timestamps_1, n_timestamps_2 = len(x), len(y)
region = None
n_timestamps = min(n_timestamps_1, n_timestamps_2)
if n_timestamps > min_size:
n_recursions = ceil(log2(n_timestamps / min_size))
for i in range(n_recursions):
resolution = 2 ** (n_recursions - i)
remainder_1 = n_timestamps_1 % resolution
remainder_2 = n_timestamps_2 % resolution

if remainder_1 != 0:
x, np.repeat(x[-1], resolution - remainder_1)
)
else:
if remainder_2 != 0:
y, np.repeat(y[-1], resolution - remainder_2)
)
else:

dist=dist, region=region)
acc_cost_mat_res = accumulated_cost_matrix(cost_mat_res,
region=region)
path_res = _return_path(acc_cost_mat_res)
n_timestamps_next_1 = ceil((2 * n_timestamps_1) / resolution)
n_timestamps_next_2 = ceil((2 * n_timestamps_2) / resolution)

region = _blurred_path_region(n_timestamps_next_1,
n_timestamps_next_2,

return region

def _compute_region(n_timestamps_1, n_timestamps_2, method, dist,
x=None, y=None, **options):
"""Compute the region of feasible alignment paths."""

if options is None:
options = dict()

if method == 'classic':
region = None
elif method == 'sakoechiba':
region = sakoe_chiba_band(n_timestamps_1, n_timestamps_2, **options)
elif method == 'itakura':
region = itakura_parallelogram(n_timestamps_1, n_timestamps_2,
**options)
elif method == 'multiscale':
region = _multiscale_region(x, y, dist, **options)
elif method == 'fast':
region = _fast_region(x, y, dist, **options)
else:
raise ValueError("'method' must be either 'classic', 'sakoechiba', "
"'itakura', 'multiscale' or 'fast'.")

return region

[docs]def dtw_multiscale(x, y, dist='square', resolution=2, radius=0,
return_cost=False, return_accumulated=False,
return_path=False):
"""Multiscale Dynamic Time Warping distance.

Parameters
----------
x : array-like, shape = (n_timestamps_1,)
First array.

y : array-like, shape = (n_timestamps_2,)
Second array.

dist : 'square', 'absolute', 'precomputed' or callable (default = 'square')
Distance used. If 'square', the squared difference is used.
If 'absolute', the absolute difference is used. If callable,
it must be a function with a numba.njit() decorator that takes
as input two numbers (two arguments) and returns a number.

resolution : int (default = 2)
The resolution level.

radius : int (default = 0)
The radius used to expand the constraint region. The optimal path
computed at the resolution level is expanded with `radius` cells to the
top, bottom, left and right of every cell belonging to the optimal
path. It is computed at the resolution level.

return_cost : bool (default = False)
If True, the cost matrix is returned.

return_accumulated : bool (default = False)
If True, the accumulated cost matrix is returned.

return_path : bool (default = False)
If True, the optimal path is returned.

Returns
-------
dtw_dist : float
The DTW distance between the two arrays.

cost_mat : ndarray, shape = (n_timestamps_1, n_timestamps_2)
Cost matrix. Only returned if ``return_cost=True``.

acc_cost_mat : ndarray, shape = (n_timestamps_1, n_timestamps_2)
Accumulated cost matrix. Only returned if ``return_accumulated=True``.

path : array, shape = (2, path_length)
The optimal path along the cost matrix. The first row consists
of the indices of the optimal path for x while the second row
consists of the indices of the optimal path for y. Only returned
if ``return_path=True``.

References
----------
..  M. Müller, H. Mattes and F. Kurth, “An efficient multiscale approach
to audio synchronization”. International Conference on Music
Information Retrieval, 6(1), 192-197 (2006).

Examples
--------
>>> from pyts.metrics import dtw_multiscale
>>> x = [0, 1, 1]
>>> y = [2, 0, 1]
>>> dtw_multiscale(x, y, resolution=2)
2.23...

"""
x, y, precomputed_cost, n_timestamps_1, n_timestamps_2 = \
_check_input_dtw(x, y, precomputed_cost=None, dist=dist,
method="multiscale")

region = _multiscale_region(x, y, dist, resolution=resolution,
cost_mat = cost_matrix(x, y, dist=dist, region=region)
acc_cost_mat = accumulated_cost_matrix(cost_mat, region=region)
dtw_dist = acc_cost_mat[-1, -1]
if dist == 'square':
dtw_dist = sqrt(dtw_dist)

res = _return_results(dtw_dist, cost_mat, acc_cost_mat,
return_cost, return_accumulated, return_path)
return res

[docs]def dtw_fast(x, y, dist='square', radius=0, return_cost=False,
return_accumulated=False, return_path=False):
"""Fast Dynamic Time Warping distance.

Parameters
----------
x : array-like, shape = (n_timestamps_1,)
First array.

y : array-like, shape = (n_timestamps_2,)
Second array.

dist : 'square', 'absolute', 'precomputed' or callable (default = 'square')
Distance used. If 'square', the squared difference is used.
If 'absolute', the absolute difference is used. If 'precomputed',
`precomputed_cost` must be the cost matrix. If callable,
it must be a function with a numba.njit() decorator that takes
as input two numbers (two arguments) and returns a number.

radius : int (default = 0)
The radius used to expand the constraint region. The optimal path
computed at the resolution level is expanded with `radius` cells to the
top, bottom, left and right of every cell belonging to the optimal
path. It is computed at the resolution level.

return_cost : bool (default = False)
If True, the cost matrix is returned.

return_accumulated : bool (default = False)
If True, the accumulated cost matrix is returned.

return_path : bool (default = False)
If True, the optimal path is returned.

Returns
-------
dtw_dist : float
The DTW distance between the two arrays.

cost_mat : ndarray, shape = (n_timestamps_1, n_timestamps_2)
Cost matrix. Only returned if ``return_cost=True``.

acc_cost_mat : ndarray, shape = (n_timestamps_1, n_timestamps_2)
Accumulated cost matrix. Only returned if ``return_accumulated=True``.

path : ndarray, shape = (2, path_length)
The optimal path along the cost matrix. The first row consists
of the indices of the optimal path for x while the second row
consists of the indices of the optimal path for y. Only returned
if ``return_path=True``.

References
----------
..  S. Salvador ans P. Chan, “FastDTW: Toward Accurate Dynamic Time
Warping in Linear Time and Space”. KDD Workshop on Mining Temporal
and Sequential Data, 70–80 (2004).

Examples
--------
>>> from pyts.metrics import dtw_fast
>>> x = [0, 1, 1]
>>> y = [2, 0, 1]
2.0

"""
x, y, precomputed_cost, n_timestamps_1, n_timestamps_2 = \
_check_input_dtw(x, y, precomputed_cost=None, dist=dist, method="fast")
cost_mat = cost_matrix(x, y, dist=dist, region=region)
acc_cost_mat = accumulated_cost_matrix(cost_mat, region=region)
dtw_dist = acc_cost_mat[-1, -1]
if dist == 'square':
dtw_dist = sqrt(dtw_dist)

res = _return_results(dtw_dist, cost_mat, acc_cost_mat,
return_cost, return_accumulated, return_path)
return res

[docs]def dtw(x=None, y=None, dist='square', method='classic', options=None,
precomputed_cost=None, return_cost=False,
return_accumulated=False, return_path=False):
"""Dynamic Time Warping (DTW) distance between two samples.

Parameters
----------
x : array-like, shape = (n_timestamps_1,)
First array. Ignored if ``dist == 'precomputed'``.

y : array-like, shape = (n_timestamps_2,)
Second array. Ignored if ``dist == 'precomputed'``.

dist : 'square', 'absolute', 'precomputed' or callable (default = 'square')
Distance used. If 'square', the squared difference is used.
If 'absolute', the absolute difference is used. If callable,
it must be a function with a numba.njit() decorator that takes
as input two numbers (two arguments) and returns a number.
If 'precomputed', ``precomputed_cost`` must be the cost matrix and
``method`` must be 'classic', 'sakoechiba' or 'itakura'.

method : str (default = 'classic')
Method used.  Should be one of

- 'classic': Classic DTW
- 'sakoechiba': DTW with Sakoe-Chiba band constraint
- 'itakura': DTW with Itakura parallelogram constraint
- 'multiscale': MultiscaleDTW
- 'fast': FastDTW

options : None or dict (default = None)
Dictionary of method options

- 'classic': None
- 'sakoechiba': window_size (int or float)
- 'itakura': max_slope (float)
- 'multiscale': resolution (int) and radius (int)

section.

precomputed_cost : array-like, shape = (n_timestamps_1, n_timestamps_2) \
(default = None).
Precomputed cost matrix between the time series.
Ignored if ``dist != 'precomputed'``.

return_cost : bool (default = False)
If True, the cost matrix is returned.

return_accumulated : bool (default = False)
If True, the accumulated cost matrix is returned.

return_path : bool (default = False)
If True, the optimal path is returned.

Returns
-------
dist : float
The DTW distance between the two arrays.

cost_mat : ndarray, shape = (n_timestamps_1, n_timestamps_2)
Cost matrix. Only returned if ``return_cost=True``.

acc_cost_mat : ndarray, shape = (n_timestamps_1, n_timestamps_2)
Accumulated cost matrix. Only returned if ``return_accumulated=True``.

path : ndarray, shape = (2, path_length)
The optimal path along the cost matrix. The first row consists
of the indices of the optimal path for x while the second row
consists of the indices of the optimal path for y. Only returned
if ``return_path=True``.

Other Parameters
----------------
window_size : float or int (default = 0.1)
The window size above and below the diagonale.
If float, `window_size must be between 0 and
1, and the actual window size will be computed as
``ceil(window_size * max((n_timestamps_1, n_timestamps_2) - 1))``.
If int, `window_size` must be the largest temporal shift allowed.
Each cell whose distance with the diagonale is lower than or equal to
'window_size' becomes a valid cell for the path.

max_slope : float (default = 2.)
Maximum slope for the parallelogram.

resolution : int (default = 2)
The resolution level.

radius : int (default = 0)
The radius used to expand the constraint region. The optimal path
computed at the resolution level is expanded with `radius` cells to the
top, bottom, left and right of every cell belonging to the optimal
path. It is computed at the resolution level.

References
----------
..  H. Sakoe and S. Chiba, "Dynamic programming algorithm optimization
for spoken word recognition". IEEE Transactions on Acoustics,
Speech, and Signal Processing, 26(1), 43-49 (1978).

..  F. Itakura, "Minimum prediction residual principle applied to
speech recognition". IEEE Transactions on Acoustics,
Speech, and Signal Processing, 23(1), 67–72 (1975).

..  M. Müller, H. Mattes and F. Kurth, "An efficient multiscale approach
to audio synchronization". International Conference on Music
Information Retrieval, 6(1), 192-197 (2006).

..  S. Salvador ans P. Chan, "FastDTW: Toward Accurate Dynamic Time
Warping in Linear Time and Space". KDD Workshop on Mining Temporal
and Sequential Data, 70–80 (2004).

Examples
--------
>>> from pyts.metrics import dtw
>>> x = [0, 1, 1]
>>> y = [2, 0, 1]
>>> dtw(x, y, method='sakoechiba', options={'window_size': 0.5})
2.0

"""
if options is None:
options = dict()
x, y, precomputed_cost, n_timestamps_1, n_timestamps_2 = \
_check_input_dtw(x, y, precomputed_cost, dist, method)
region = _compute_region(n_timestamps_1, n_timestamps_2, method, dist, x=x,
y=y, **options)
cost_mat = _input_to_cost(x, y, dist, precomputed_cost, region)
acc_cost_mat = accumulated_cost_matrix(cost_mat, region=region)
dtw_dist = acc_cost_mat[-1, -1]
if dist == 'square':
dtw_dist = sqrt(dtw_dist)

res = _return_results(dtw_dist, cost_mat, acc_cost_mat,
return_cost, return_accumulated, return_path)
return res

[docs]def show_options(method=None, disp=True):
"""Show documentation for additional options of DTW methods.

These are method-specific options that can be supplied through the
``options`` dict.

Parameters
----------
method : None or str (default = None)
If None, shows all methods of the specified solver. Otherwise,
show only the options for the specified method. If str, it must be
either 'classic', 'sakoechiba', 'itakura', 'multiscale' or 'fast'.

disp : bool (default = True)
Whether to print the result rather than returning it.

Returns
-------
text
Either None (for disp=True) or the text string (disp=False).

"""
import textwrap

text = """\n\n"""
if method is None:
text += "classic\n=======\n\n" + dtw_classic.__doc__ + "\n"
text += "sakoechiba\n==========\n\n" + dtw_sakoechiba.__doc__ + "\n"
text += "itakura\n=======\n\n" + dtw_itakura.__doc__ + "\n"
text += "multiscale\n==========\n\n" + dtw_multiscale.__doc__ + "\n"
text += "fast\n====\n\n" + dtw_fast.__doc__ + "\n"
elif method == 'classic':
doc = textwrap.dedent(dtw_classic.__doc__).strip()
text += doc
elif method == 'sakoechiba':
doc = textwrap.dedent(dtw_sakoechiba.__doc__).strip()
text += doc
elif method == 'itakura':
doc = textwrap.dedent(dtw_itakura.__doc__).strip()
text += doc
elif method == 'multiscale':
doc = textwrap.dedent(dtw_multiscale.__doc__).strip()
text += doc
elif method == 'fast':
doc = textwrap.dedent(dtw_fast.__doc__).strip()
text += doc
else:
raise ValueError("'method' must be either None, 'classic', "
"'sakoechiba', 'itakura', 'multiscale' or 'fast'.")
if disp:
print(text)
return
else:
return text
```