diff --git a/docs/pdoc_template/css.mako b/docs/pdoc_template/css.mako index 519cae6e..b8db6fda 100644 --- a/docs/pdoc_template/css.mako +++ b/docs/pdoc_template/css.mako @@ -282,8 +282,10 @@ .admonition-title { font-weight: bold; } + .admonition.info { + background: #edfcf4; + } .admonition.note, - .admonition.info, .admonition.important { background: #ebf3ff; } diff --git a/tests/test_features_feature_collection.py b/tests/test_features_feature_collection.py index 9dc93c89..91affffd 100644 --- a/tests/test_features_feature_collection.py +++ b/tests/test_features_feature_collection.py @@ -868,6 +868,98 @@ def test_mixed_featuredescriptors_time_data(dummy_data): out = fc.calculate([df_eda, df_tmp], return_df=True) assert all(out.notna().sum(axis=0)) + +### Test vectorized features + +def test_basic_vectorized_features(dummy_data): + fs = 4 # The sample frequency in Hz + fc = FeatureCollection( + feature_descriptors=[ + FeatureDescriptor(np.max, "EDA", 250*fs, 75*fs), + FeatureDescriptor( + FuncWrapper(np.max, output_names="max_", vectorized=True, axis=-1), + "EDA", 250*fs, 75*fs, + ) + ] + ) + res = fc.calculate(dummy_data) + + assert len(res) == 2 + assert (len(res[0]) > 1) and (len(res[1]) > 1) + assert np.all(res[0].index == res[1].index) + assert np.all(res[0].values == res[1].values) + + +def test_time_based_vectorized_features(dummy_data): + fc = FeatureCollection( + feature_descriptors=[ + FeatureDescriptor(np.max, "EDA", "5min", "3min"), + FeatureDescriptor( + FuncWrapper(np.max, output_names="max_", vectorized=True, axis=-1), + "EDA", "5min", "3min", + ) + ] + ) + res = fc.calculate(dummy_data) + + assert len(res) == 2 + assert (len(res[0]) > 1) and (len(res[1]) > 1) + assert np.all(res[0].index == res[1].index) + assert np.all(res[0].values == res[1].values) + + +def test_multiple_outputs_vectorized_features(dummy_data): + def sum_mean(x, axis): + s = np.sum(x, axis) + return s, s / x.shape[axis] + + fs = 4 # The sample frequency in Hz + fc = FeatureCollection( + feature_descriptors=[ + FeatureDescriptor(np.sum, "EDA", 250*fs, 75*fs), + FeatureDescriptor(np.mean, "EDA", 250*fs, 75*fs), + FeatureDescriptor( + FuncWrapper( + sum_mean, output_names=["sum_vect", "mean_vect"], + vectorized=True, axis=1 + ), + "EDA", 250*fs, 75*fs, + ) + ] + ) + + res = fc.calculate(dummy_data, return_df=True) + + assert res.shape[1] == 4 + s = "EDA__"; p = "__w=1000_s=300" + assert np.all(res[s+"sum"+p].values == res[s+"sum_vect"+p].values) + assert np.all(res[s+"mean"+p].values == res[s+"mean_vect"+p].values) + + +def test_multiple_inputs_vectorized_features(dummy_data): + def windowed_diff(x1, x2): + return np.sum(x1, axis=-1) - np.sum(x2, axis=-1) + + fc = FeatureCollection( + feature_descriptors=[ + FeatureDescriptor(np.sum, "EDA", "5min", "2.5min"), + FeatureDescriptor(np.sum, "TMP", "5min", "2.5min"), + FeatureDescriptor( + FuncWrapper(windowed_diff, vectorized=True), + ("EDA", "TMP"), "5min", "2.5min" + ) + ] + ) + + res = fc.calculate(dummy_data, return_df=True) + + assert res.shape[1] == 3 + assert res.shape[0] > 1 + p = "__w=5m_s=2m30s" + manual_diff = res["EDA__sum"+p].values - res["TMP__sum"+p].values + assert np.all(res["EDA|TMP__windowed_diff"+p].values == manual_diff) + + ### Test 'error' use-cases @@ -1117,3 +1209,23 @@ def test_serialization(dummy_data): out[col_order].values, out_deserialized[col_order].values, equal_nan=True ) os.remove(save_path) + + +def test_vectorized_irregularly_sampled_data(dummy_data): + fc = FeatureCollection( + feature_descriptors=FeatureDescriptor( + FuncWrapper(np.std, vectorized=True, axis=1), + "EDA", window="5min", stride="3s" + ) + ) + + df_eda = dummy_data["EDA"].dropna() + df_eda.iloc[[3+66*i for i in range(5)]] = np.nan + df_eda = df_eda.dropna() + + assert len(df_eda) < len(dummy_data["EDA"].dropna()) + + # Fails bc of irregularly sampled data + # -> is a strict requirement to apply a vectorized feature function + with pytest.raises(Exception): + fc.calculate(df_eda) diff --git a/tests/test_features_func_wrapper.py b/tests/test_features_func_wrapper.py index 401b1de5..cbff8459 100644 --- a/tests/test_features_func_wrapper.py +++ b/tests/test_features_func_wrapper.py @@ -85,6 +85,21 @@ def max_diff(x: pd.Series, mult=1): assert func2(dummy_data["EDA"]) == 0.25*3 -def test_error_func_wrapper_wrong_outputnames_type(dummy_data): +def test_vectorized_func_wrapper(dummy_data): + func_cols = FuncWrapper(np.max, vectorized=True, axis=0) # Axis = columns + func_rows = FuncWrapper(np.max, vectorized=True, axis=1) # Axis = rows + + assert func_cols.output_names == ['amax'] + assert func_rows.output_names == ['amax'] + assert np.allclose(func_cols(dummy_data.values), dummy_data.max().values) + assert np.allclose(func_rows(dummy_data.values), dummy_data.max(axis=1).values) + + +def test_error_func_wrapper_wrong_outputnames_type(): with pytest.raises(TypeError): FuncWrapper(np.min, output_names=5) + + +def test_illegal_func_wrapper_vectorized_wrong_input_type(): + with pytest.raises(AssertionError): + FuncWrapper(np.min, input_type=pd.Series, vectorized=True, axis=1) diff --git a/tsflex/features/function_wrapper.py b/tsflex/features/function_wrapper.py index c49bbb52..4a639833 100644 --- a/tsflex/features/function_wrapper.py +++ b/tsflex/features/function_wrapper.py @@ -10,14 +10,14 @@ from ..utils.classes import FrozenClass from .. import __pdoc__ -__pdoc__['FuncWrapper.__call__'] = True +__pdoc__["FuncWrapper.__call__"] = True class FuncWrapper(FrozenClass): """Function wrapper. - A function wrapper which takes a numpy array / pandas series as input and returns - one or multiple values. It also defines the names of the function outputs, and + A function wrapper which takes a numpy array / pandas series as input and returns + one or multiple values. It also defines the names of the function outputs, and stores the function its keyword arguments. Parameters @@ -27,13 +27,28 @@ class FuncWrapper(FrozenClass): output_names : Union[List[str], str], optional The name of the outputs of the function, by default None. input_type: Union[np.array, pd.Series], optional - The input type that the function requires (either np.array or pd.Series), by + The input type that the function requires (either np.array or pd.Series), by default np.array. - .. Note:: + .. Note:: Make sure to only set this argument to pd.Series if the function requires - a pd.Series, since pd.Series strided-rolling is significantly less efficient. - For a np.array it is possible to create very efficient views, but there is no + a pd.Series, since pd.Series strided-rolling is significantly less efficient. + For a np.array it is possible to create very efficient views, but there is no such thing as a pd.Series view. Thus, for each stroll, a new series is created. + vectorized: bool, optional + Flag indicating whether `func` should be executed vectorized over all the + segmented windows, by default False. + .. Info:: + A vectorized function should take one or multiple series that each have the + shape (nb. segmented windows, window size). + For example a vectorized version of `np.max` is + ``FuncWrapper(np.max, vectorized=True, axis=1)``. + .. Note:: + * A function can only be applied in vectorized manner when the required + series are REGULARLY sampled (and have the same index in case of multiple + required series). + * The `input_type` should be `np.array` when `vectorized` is True. It does + not make sense to use a `pd.Series`, as the index should be regularly + sampled (see requirement above). **kwargs: dict, optional Keyword arguments which will be also passed to the `function` @@ -49,6 +64,7 @@ def __init__( func: Callable, output_names: Optional[Union[List[str], str]] = None, input_type: Optional[Union[np.array, pd.Series]] = np.array, + vectorized: bool = False, **kwargs, ): """Create FuncWrapper instance.""" @@ -64,8 +80,12 @@ def __init__( else: raise TypeError(f"`output_names` is unexpected type {type(output_names)}") - assert input_type in SUPPORTED_STROLL_TYPES + assert input_type in SUPPORTED_STROLL_TYPES, "Invalid input_type!" + assert not ( + vectorized & (input_type is not np.array) + ), "The input_type must be np.array if vectorized is True!" self.input_type = input_type + self.vectorized = vectorized self._freeze() diff --git a/tsflex/features/segmenter/strided_rolling.py b/tsflex/features/segmenter/strided_rolling.py index 304e098d..3c7d3d2b 100644 --- a/tsflex/features/segmenter/strided_rolling.py +++ b/tsflex/features/segmenter/strided_rolling.py @@ -250,20 +250,80 @@ def apply_func(self, func: FuncWrapper) -> pd.DataFrame: # expression only once, whereas a list comprehension evaluates its expression # every time). # See more why: https://stackoverflow.com/a/59838723 - out = np.array( - list( - map( - func, - *[ - [ - sc.values[sc.start_indexes[idx]: sc.end_indexes[idx]] - for idx in range(len(self.index)) - ] - for sc in self.series_containers - ], + out: np.array = None + if func.vectorized: + # Vectorized function execution + + ## IMPL 1 + ## Results in a high memory peak as a new np.array is created (and thus no + ## view is being used) + # out = np.asarray( + # func( + # *[ + # np.array([ + # sc.values[sc.start_indexes[idx]: sc.end_indexes[idx]] + # for idx in range(len(self.index)) + # ]) + # for sc in self.series_containers + # ], + # ) + # ) + + ## IMPL 2 + ## Is a good implementation (equivalent to the one below), will also fail in + ## the same cases, but it does not perform clear assertions (with their + ## accompanied clear messages). + # out = np.asarray( + # func( + # *[ + # _sliding_strided_window_1d(sc.values, self.window, self.stride) + # for sc in self.series_containers + # ], + # ) + # ) + + views = [] + for sc in self.series_containers: + windows = sc.end_indexes - sc.start_indexes + strides = sc.start_indexes[1:] - sc.start_indexes[:-1] + assert np.all( + windows == windows[0] + ), "Vectorized functions require same number of samples in each segmented window!" + assert np.all( + strides == strides[0] + ), "Vectorized functions require same number of samples as stride!" + views.append( + _sliding_strided_window_1d(sc.values, windows[0], strides[0]) + ) + out = func(*views) + + out_type = type(out) + out = np.asarray(out) + # When multiple outputs are returned (= tuple) they should be transposed + # when combining into an array + out = out.T if out_type is tuple else out + + else: + # Sequential function execution (default) + out = np.array( + list( + map( + func, + *[ + [ + sc.values[sc.start_indexes[idx] : sc.end_indexes[idx]] + for idx in range(len(self.index)) + ] + for sc in self.series_containers + ], + ) ) ) - ) + + # Check if the function output is valid. + # This assertion will be raised when e.g. np.max is applied vectorized without + # specifying axis=1. + assert out.ndim > 0, "Vectorized function returned only 1 (non-array) value!" # Aggregate function output in a dictionary feat_out = {} @@ -489,3 +549,48 @@ def _construct_start_end_times(self) -> Tuple[np.ndarray, np.ndarray]: ) np_end_times = np_start_times + self.window return np_start_times, np_end_times + + +def _sliding_strided_window_1d(data: np.ndarray, window: int, step: int): + """View based sliding strided-window for 1-dimensional data. + + Parameters + ---------- + data: np.array + The 1-dimensional series to slide over. + window: int + The window size, in number of samples. + step: int + The step size (i.e., the stride), in number of samples. + + Returns + ------- + nd.array + A view of the sliding strided window of the data. + + """ + # window and step in samples + assert data.ndim == 1, "data must be 1 dimensional" + + if isinstance(window, float): + assert window.is_integer(), "window must be an int!" + window = int(window) + if isinstance(step, float): + assert step.is_integer(), "step must be an int!" + step = int(step) + + assert (step >= 1) & (window < len(data)) + + shape = [ + np.ceil(len(data) / step - window / step).astype(int), + window, + ] + + strides = [ + data.strides[0] * step, + data.strides[0], + ] + + return np.lib.stride_tricks.as_strided( + data, shape=shape, strides=strides#, writeable=False + ) diff --git a/tsflex/features/utils.py b/tsflex/features/utils.py index a762dc5a..b08cfe94 100644 --- a/tsflex/features/utils.py +++ b/tsflex/features/utils.py @@ -75,6 +75,35 @@ def _get_name(func: Callable) -> str: except: return type(func).__name__ +def _get_funcwrapper_func_and_kwargs(func: FuncWrapper) -> Tuple[Callable, dict]: + """Extract the function and keyword arguments from the given FuncWrapper. + + Parameters + ---------- + func: FuncWrapper + The FuncWrapper to extract the function and kwargs from. + + Returns + ------- + Tuple[Callable, dict] + Tuple of 1st the function of the FuncWrapper (is a Callable) and 2nd the keyword + arguments of the FuncWrapper. + + """ + assert isinstance(func, FuncWrapper) + + # Extract the function (is a Callable) + function = func.func + + # Extract the keyword arguments + func_wrapper_kwargs = {} + func_wrapper_kwargs["output_names"] = func.output_names + func_wrapper_kwargs["input_type"] = func.input_type + func_wrapper_kwargs["vectorized"] = func.vectorized + func_wrapper_kwargs.update(func.kwargs) + + return function, func_wrapper_kwargs + def _make_single_func_robust( func: Union[Callable, FuncWrapper], @@ -105,14 +134,10 @@ def _make_single_func_robust( """ assert isinstance(func, FuncWrapper) or isinstance(func, Callable) - # Extract the keyword arguments from the function wrapper func_wrapper_kwargs = {} if isinstance(func, FuncWrapper): - _func = func - func = _func.func - func_wrapper_kwargs["output_names"] = _func.output_names - func_wrapper_kwargs["input_type"] = _func.input_type - func_wrapper_kwargs.update(_func.kwargs) + # Extract the function and keyword arguments from the function wrapper + func, func_wrapper_kwargs = _get_funcwrapper_func_and_kwargs(func) output_names = func_wrapper_kwargs.get("output_names")