Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ vectorized feature function support #58

Merged
merged 9 commits into from
Mar 23, 2022
4 changes: 3 additions & 1 deletion docs/pdoc_template/css.mako
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,10 @@
.admonition-title {
font-weight: bold;
}
.admonition.info {
background: #edfcf4;
}
.admonition.note,
.admonition.info,
.admonition.important {
background: #ebf3ff;
}
Expand Down
112 changes: 112 additions & 0 deletions tests/test_features_feature_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,98 @@ def test_mixed_featuredescriptors_time_data(dummy_data):
out = fc.calculate([df_eda, df_tmp], return_df=True)
assert all(out.notna().sum(axis=0))


### Test vectorized features

def test_basic_vectorized_features(dummy_data):
    """Sequential and vectorized ``np.max`` must produce identical outputs for
    sample-based windows/strides."""
    sample_rate = 4  # sample frequency in Hz
    window, stride = 250 * sample_rate, 75 * sample_rate

    sequential_fd = FeatureDescriptor(np.max, "EDA", window, stride)
    vectorized_fd = FeatureDescriptor(
        FuncWrapper(np.max, output_names="max_", vectorized=True, axis=-1),
        "EDA",
        window,
        stride,
    )
    fc = FeatureCollection(feature_descriptors=[sequential_fd, vectorized_fd])

    res = fc.calculate(dummy_data)

    assert len(res) == 2
    assert len(res[0]) > 1
    assert len(res[1]) > 1
    assert np.all(res[0].index == res[1].index)
    assert np.all(res[0].values == res[1].values)


def test_time_based_vectorized_features(dummy_data):
    """Sequential and vectorized ``np.max`` must agree for time-based windows."""
    window, stride = "5min", "3min"

    fc = FeatureCollection(
        feature_descriptors=[
            FeatureDescriptor(np.max, "EDA", window, stride),
            FeatureDescriptor(
                FuncWrapper(np.max, output_names="max_", vectorized=True, axis=-1),
                "EDA",
                window,
                stride,
            ),
        ]
    )

    res = fc.calculate(dummy_data)

    assert len(res) == 2
    # Both outputs contain multiple windows and match exactly.
    assert min(len(res[0]), len(res[1])) > 1
    assert np.all(res[0].index == res[1].index)
    assert np.all(res[0].values == res[1].values)


def test_multiple_outputs_vectorized_features(dummy_data):
    """A vectorized function returning a tuple yields one column per output name,
    each matching its sequential counterpart."""

    def sum_mean(x, axis):
        # Return both the windowed sum and the windowed mean in one call.
        total = np.sum(x, axis)
        return total, total / x.shape[axis]

    sample_rate = 4  # sample frequency in Hz
    window, stride = 250 * sample_rate, 75 * sample_rate

    fc = FeatureCollection(
        feature_descriptors=[
            FeatureDescriptor(np.sum, "EDA", window, stride),
            FeatureDescriptor(np.mean, "EDA", window, stride),
            FeatureDescriptor(
                FuncWrapper(
                    sum_mean,
                    output_names=["sum_vect", "mean_vect"],
                    vectorized=True,
                    axis=1,
                ),
                "EDA",
                window,
                stride,
            ),
        ]
    )

    res = fc.calculate(dummy_data, return_df=True)

    assert res.shape[1] == 4
    s = "EDA__"
    p = "__w=1000_s=300"
    assert np.all(res[s + "sum" + p].values == res[s + "sum_vect" + p].values)
    assert np.all(res[s + "mean" + p].values == res[s + "mean_vect" + p].values)


def test_multiple_inputs_vectorized_features(dummy_data):
    """A vectorized function over two series ("EDA", "TMP") must equal the
    difference of the individually computed windowed sums."""

    # NOTE: the function name determines the output column name, so it must
    # stay `windowed_diff`.
    def windowed_diff(eda_windows, tmp_windows):
        return np.sum(eda_windows, axis=-1) - np.sum(tmp_windows, axis=-1)

    window, stride = "5min", "2.5min"
    fc = FeatureCollection(
        feature_descriptors=[
            FeatureDescriptor(np.sum, "EDA", window, stride),
            FeatureDescriptor(np.sum, "TMP", window, stride),
            FeatureDescriptor(
                FuncWrapper(windowed_diff, vectorized=True),
                ("EDA", "TMP"),
                window,
                stride,
            ),
        ]
    )

    res = fc.calculate(dummy_data, return_df=True)

    assert res.shape[1] == 3
    assert res.shape[0] > 1
    p = "__w=5m_s=2m30s"
    manual_diff = res["EDA__sum" + p].values - res["TMP__sum" + p].values
    assert np.all(res["EDA|TMP__windowed_diff" + p].values == manual_diff)


### Test 'error' use-cases


Expand Down Expand Up @@ -1117,3 +1209,23 @@ def test_serialization(dummy_data):
out[col_order].values, out_deserialized[col_order].values, equal_nan=True
)
os.remove(save_path)


def test_vectorized_irregularly_sampled_data(dummy_data):
    """Vectorized feature functions must raise on irregularly sampled data.

    Regular sampling is a strict requirement for vectorized execution, since
    the segmented windows are materialized as a strided view with a fixed
    number of samples per window.
    """
    fc = FeatureCollection(
        feature_descriptors=FeatureDescriptor(
            FuncWrapper(np.std, vectorized=True, axis=1),
            "EDA", window="5min", stride="3s"
        )
    )

    # Knock out a handful of samples so the series is no longer regularly sampled.
    df_eda = dummy_data["EDA"].dropna()
    df_eda.iloc[[3 + 66 * i for i in range(5)]] = np.nan
    df_eda = df_eda.dropna()

    assert len(df_eda) < len(dummy_data["EDA"].dropna())

    # Fails bc of irregularly sampled data
    # -> is a strict requirement to apply a vectorized feature function
    with pytest.raises(Exception):
        fc.calculate(df_eda)
17 changes: 16 additions & 1 deletion tests/test_features_func_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,21 @@ def max_diff(x: pd.Series, mult=1):
assert func2(dummy_data["EDA"]) == 0.25*3


def test_vectorized_func_wrapper(dummy_data):
    """A vectorized FuncWrapper forwards its axis kwarg to the wrapped function
    and derives the default output name from the function (`np.max` -> 'amax')."""
    func_cols = FuncWrapper(np.max, vectorized=True, axis=0)  # Axis = columns
    func_rows = FuncWrapper(np.max, vectorized=True, axis=1)  # Axis = rows

    assert func_cols.output_names == ['amax']
    assert func_rows.output_names == ['amax']
    assert np.allclose(func_cols(dummy_data.values), dummy_data.max().values)
    assert np.allclose(func_rows(dummy_data.values), dummy_data.max(axis=1).values)


def test_error_func_wrapper_wrong_outputnames_type():
    # output_names must be a str or a list of str; an int is rejected.
    with pytest.raises(TypeError):
        FuncWrapper(np.min, output_names=5)


def test_illegal_func_wrapper_vectorized_wrong_input_type():
    # vectorized=True requires input_type np.array; pd.Series must be rejected.
    with pytest.raises(AssertionError):
        FuncWrapper(np.min, input_type=pd.Series, vectorized=True, axis=1)
36 changes: 28 additions & 8 deletions tsflex/features/function_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
from ..utils.classes import FrozenClass
from .. import __pdoc__

__pdoc__['FuncWrapper.__call__'] = True
__pdoc__["FuncWrapper.__call__"] = True


class FuncWrapper(FrozenClass):
"""Function wrapper.

A function wrapper which takes a numpy array / pandas series as input and returns
one or multiple values. It also defines the names of the function outputs, and
A function wrapper which takes a numpy array / pandas series as input and returns
one or multiple values. It also defines the names of the function outputs, and
stores the function its keyword arguments.

Parameters
Expand All @@ -27,13 +27,28 @@ class FuncWrapper(FrozenClass):
output_names : Union[List[str], str], optional
The name of the outputs of the function, by default None.
input_type: Union[np.array, pd.Series], optional
The input type that the function requires (either np.array or pd.Series), by
The input type that the function requires (either np.array or pd.Series), by
default np.array.
.. Note::
.. Note::
Make sure to only set this argument to pd.Series if the function requires
a pd.Series, since pd.Series strided-rolling is significantly less efficient.
For a np.array it is possible to create very efficient views, but there is no
a pd.Series, since pd.Series strided-rolling is significantly less efficient.
For a np.array it is possible to create very efficient views, but there is no
such thing as a pd.Series view. Thus, for each stroll, a new series is created.
vectorized: bool, optional
Flag indicating whether `func` should be executed vectorized over all the
segmented windows, by default False.
.. Info::
A vectorized function should take one or multiple series that each have the
shape (nb. segmented windows, window size).
For example a vectorized version of `np.max` is
``FuncWrapper(np.max, vectorized=True, axis=1)``.
.. Note::
* A function can only be applied in vectorized manner when the required
series are REGULARLY sampled (and have the same index in case of multiple
required series).
* The `input_type` should be `np.array` when `vectorized` is True. It does
not make sense to use a `pd.Series`, as the index should be regularly
sampled (see requirement above).
**kwargs: dict, optional
Keyword arguments which will be also passed to the `function`

Expand All @@ -49,6 +64,7 @@ def __init__(
func: Callable,
output_names: Optional[Union[List[str], str]] = None,
input_type: Optional[Union[np.array, pd.Series]] = np.array,
vectorized: bool = False,
**kwargs,
):
"""Create FuncWrapper instance."""
Expand All @@ -64,8 +80,12 @@ def __init__(
else:
raise TypeError(f"`output_names` is unexpected type {type(output_names)}")

assert input_type in SUPPORTED_STROLL_TYPES
assert input_type in SUPPORTED_STROLL_TYPES, "Invalid input_type!"
assert not (
vectorized & (input_type is not np.array)
), "The input_type must be np.array if vectorized is True!"
self.input_type = input_type
self.vectorized = vectorized

self._freeze()

Expand Down
129 changes: 117 additions & 12 deletions tsflex/features/segmenter/strided_rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,20 +250,80 @@ def apply_func(self, func: FuncWrapper) -> pd.DataFrame:
# expression only once, whereas a list comprehension evaluates its expression
# every time).
# See more why: https://stackoverflow.com/a/59838723
out = np.array(
list(
map(
func,
*[
[
sc.values[sc.start_indexes[idx]: sc.end_indexes[idx]]
for idx in range(len(self.index))
]
for sc in self.series_containers
],
out: np.array = None
if func.vectorized:
# Vectorized function execution

## IMPL 1
## Results in a high memory peak as a new np.array is created (and thus no
## view is being used)
# out = np.asarray(
jvdd marked this conversation as resolved.
Show resolved Hide resolved
# func(
# *[
# np.array([
# sc.values[sc.start_indexes[idx]: sc.end_indexes[idx]]
# for idx in range(len(self.index))
# ])
# for sc in self.series_containers
# ],
# )
# )

## IMPL 2
## Is a good implementation (equivalent to the one below), will also fail in
## the same cases, but it does not perform clear assertions (with their
## accompanied clear messages).
# out = np.asarray(
jvdd marked this conversation as resolved.
Show resolved Hide resolved
# func(
# *[
# _sliding_strided_window_1d(sc.values, self.window, self.stride)
# for sc in self.series_containers
# ],
# )
# )

views = []
for sc in self.series_containers:
windows = sc.end_indexes - sc.start_indexes
strides = sc.start_indexes[1:] - sc.start_indexes[:-1]
assert np.all(
windows == windows[0]
), "Vectorized functions require same number of samples in each segmented window!"
assert np.all(
strides == strides[0]
), "Vectorized functions require same number of samples as stride!"
views.append(
_sliding_strided_window_1d(sc.values, windows[0], strides[0])
)
out = func(*views)

out_type = type(out)
out = np.asarray(out)
# When multiple outputs are returned (= tuple) they should be transposed
# when combining into an array
out = out.T if out_type is tuple else out

else:
# Sequential function execution (default)
out = np.array(
list(
map(
func,
*[
[
sc.values[sc.start_indexes[idx] : sc.end_indexes[idx]]
for idx in range(len(self.index))
]
for sc in self.series_containers
],
)
)
)
)

# Check if the function output is valid.
# This assertion will be raised when e.g. np.max is applied vectorized without
# specifying axis=1.
assert out.ndim > 0, "Vectorized function returned only 1 (non-array) value!"

# Aggregate function output in a dictionary
feat_out = {}
Expand Down Expand Up @@ -489,3 +549,48 @@ def _construct_start_end_times(self) -> Tuple[np.ndarray, np.ndarray]:
)
np_end_times = np_start_times + self.window
return np_start_times, np_end_times


def _sliding_strided_window_1d(data: np.ndarray, window: int, step: int):
"""View based sliding strided-window for 1-dimensional data.

Parameters
----------
data: np.array
The 1-dimensional series to slide over.
window: int
The window size, in number of samples.
step: int
The step size (i.e., the stride), in number of samples.

Returns
-------
nd.array
A view of the sliding strided window of the data.

"""
# window and step in samples
assert data.ndim == 1, "data must be 1 dimensional"

if isinstance(window, float):
assert window.is_integer(), "window must be an int!"
window = int(window)
if isinstance(step, float):
assert step.is_integer(), "step must be an int!"
step = int(step)

assert (step >= 1) & (window < len(data))

shape = [
np.ceil(len(data) / step - window / step).astype(int),
window,
]

strides = [
data.strides[0] * step,
data.strides[0],
]

return np.lib.stride_tricks.as_strided(
data, shape=shape, strides=strides#, writeable=False
)
Loading