Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] On-the-fly data loading of global models #1495

Closed
wants to merge 17 commits into from
1 change: 1 addition & 0 deletions neuralprophet/data/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,4 +623,5 @@ def _create_dataset(model, df, predict_mode, prediction_frequency=None):
config_regressors=model.config_regressors,
config_missing=model.config_missing,
prediction_frequency=prediction_frequency,
config_train=model.config_train,
)
5 changes: 3 additions & 2 deletions neuralprophet/forecaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@
# Only display the plot if the session is interactive, eg. do not show in github actions since it
# causes an error in the Windows and MacOS environment
if matplotlib.is_interactive():
fig

Check warning on line 1008 in neuralprophet/forecaster.py

View workflow job for this annotation

GitHub Actions / pyright

Expression value is unused (reportUnusedExpression)

self.fitted = True
return metrics_df
Expand Down Expand Up @@ -1774,6 +1774,7 @@
predict_mode=True,
config_missing=self.config_missing,
prediction_frequency=self.prediction_frequency,
config_train=self.config_train,
)
loader = DataLoader(dataset, batch_size=min(4096, len(df)), shuffle=False, drop_last=False)
predicted = {}
Expand Down Expand Up @@ -2684,7 +2685,7 @@
val_dataloaders=val_loader,
**self.config_train.lr_finder_args,
)
# Estimate the optimat learning rate from the loss curve
# Estimate the optimal learning rate from the loss curve
assert lr_finder is not None
_, _, lr_suggestion = utils.smooth_loss_and_suggest(lr_finder.results)
self.model.learning_rate = lr_suggestion
Expand All @@ -2706,7 +2707,7 @@
**self.config_train.lr_finder_args,
)
assert lr_finder is not None
# Estimate the optimat learning rate from the loss curve
# Estimate the optimal learning rate from the loss curve
_, _, lr_suggestion = utils.smooth_loss_and_suggest(lr_finder.results)
self.model.learning_rate = lr_suggestion
start = time.time()
Expand Down
160 changes: 118 additions & 42 deletions neuralprophet/time_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,107 @@
"events",
"regressors",
]
inputs, targets, drop_missing = tabularize_univariate_datetime(df, **kwargs)
self.init_after_tabularized(inputs, targets)
self.filter_samples_after_init(kwargs["prediction_frequency"])
self.drop_nan_after_init(df, kwargs["predict_steps"], drop_missing)
self.kwargs = kwargs

learning_rate = kwargs["config_train"].learning_rate
if (
kwargs["predict_mode"]
or (learning_rate is None)
or kwargs["config_lagged_regressors"]
or kwargs["config_country_holidays"]
or kwargs["config_events"]
or kwargs["prediction_frequency"]
):
inputs, targets = tabularize_univariate_datetime(df, **kwargs)
self.init_after_tabularized(inputs, targets)
self.filter_samples_after_init(kwargs["prediction_frequency"])
self.drop_nan_after_init(df, kwargs["predict_steps"], kwargs["config_missing"].drop_missing)
else:
self.df = df

def __getitem__(self, index):
    """Overrides parent class method to get an item at index.
    Parameters
    ----------
        index : int
            Sample location in dataset
    Returns
    -------
        OrderedDict
            Model inputs, each of len(df) but with varying dimensions
    Note
    ----
        Contains the following data:
        Model Inputs
            * ``time`` (np.array, float), dims: (num_samples, 1)
            * ``seasonalities`` (OrderedDict), named seasonalities
            each with features (np.array, float) - dims: (num_samples, n_features[name])
            * ``lags`` (np.array, float), dims: (num_samples, n_lags)
            * ``covariates`` (OrderedDict), named covariates,
            each with features (np.array, float) of dims: (num_samples, n_lags)
            * ``events`` (OrderedDict), events,
            each with features (np.array, float) of dims: (num_samples, n_lags)
            * ``regressors`` (OrderedDict), regressors,
            each with features (np.array, float) of dims: (num_samples, n_lags)
        np.array, float
            Targets to be predicted of same length as each of the model inputs, dims: (num_samples, n_forecasts)
    """
    # TODO: Drop config_train from self!
    learning_rate = self.kwargs["config_train"].learning_rate
    # Same eligibility gate as in __init__: when any of these features is
    # active, all samples were tabularized up front and can be indexed directly.
    precomputed = (
        self.kwargs["predict_mode"]
        or (learning_rate is None)
        or self.kwargs["config_lagged_regressors"]
        or self.kwargs["config_country_holidays"]
        or self.kwargs["config_events"]
        or self.kwargs["prediction_frequency"]
    )
    if precomputed:
        return self.samples[index], self.targets[index], self.meta

    # On-the-fly mode: tabularize only the window of df needed for this sample.
    start_idx = index

    # Lagged Regressors
    if self.kwargs["config_lagged_regressors"]:
        # NOTE(review): this branch is unreachable in on-the-fly mode (lagged
        # regressors force the precomputed path above); kept for safety should
        # the gate change. The window must cover the largest lag over both the
        # autoregression lags and every lagged regressor.
        regressor_lags = [cfg.n_lags for cfg in self.kwargs["config_lagged_regressors"].values()]
        max_lag = max(self.kwargs["n_lags"], *regressor_lags)
    else:
        max_lag = self.kwargs["n_lags"]
    end_idx = start_idx + max_lag + self.kwargs["n_forecasts"]

    df_slice = self.df.iloc[start_idx:end_idx]

    # Tabularize just this window; in on-the-fly mode this yields exactly one
    # sample (tabularize_univariate_datetime sets n_samples = 1 on this path).
    inputs, targets = tabularize_univariate_datetime(df_slice, **self.kwargs)
    self.init_after_tabularized(inputs, targets)
    self.filter_samples_after_init(self.kwargs["prediction_frequency"])
    self.drop_nan_after_init(self.df, self.kwargs["predict_steps"], self.kwargs["config_missing"].drop_missing)

    # Bug fix: the freshly tabularized window holds a single sample, so it must
    # be read at position 0 — indexing self.samples[index] raised IndexError
    # for every index > 0.
    return self.samples[0], self.targets[0], self.meta

def __len__(self):
    """Overrides Parent class method to get data length."""
    # NOTE(review): self.length is assigned in init_after_tabularized, which
    # the on-the-fly __init__ path never calls — verify that length is defined
    # before a DataLoader queries len() in that mode.
    return self.length

def drop_nan_init(self, drop_missing):
    """Checks if inputs/targets contain any NaN values and drops them, if user opts to.

    Currently a no-op stub: the body contains no logic yet.
    Parameters
    ----------
        drop_missing : bool
            whether to automatically drop missing samples from the data
    """

def drop_nan_after_init(self, df, predict_steps, drop_missing):
"""Checks if inputs/targets contain any NaN values and drops them, if user opts to.
Expand Down Expand Up @@ -223,42 +320,6 @@
sample.pop("timestamps")
self.length = len(self.samples)

def __getitem__(self, index):
"""Overrides parent class method to get an item at index.
Parameters
----------
index : int
Sample location in dataset
Returns
-------
OrderedDict
Model inputs, each of len(df) but with varying dimensions
Note
----
Contains the following data:
Model Inputs
* ``time`` (np.array, float), dims: (num_samples, 1)
* ``seasonalities`` (OrderedDict), named seasonalities
each with features (np.array, float) - dims: (num_samples, n_features[name])
* ``lags`` (np.array, float), dims: (num_samples, n_lags)
* ``covariates`` (OrderedDict), named covariates,
each with features (np.array, float) of dims: (num_samples, n_lags)
* ``events`` (OrderedDict), events,
each with features (np.array, float) of dims: (num_samples, n_lags)
* ``regressors`` (OrderedDict), regressors,
each with features (np.array, float) of dims: (num_samples, n_lags)
np.array, float
Targets to be predicted of same length as each of the model inputs, dims: (num_samples, n_forecasts)
"""
sample = self.samples[index]
targets = self.targets[index]
meta = self.meta
return sample, targets, meta

def __len__(self):
"""Overrides Parent class method to get data length."""
return self.length


def tabularize_univariate_datetime(
df,
Expand All @@ -272,6 +333,7 @@
config_lagged_regressors: Optional[configure.ConfigLaggedRegressors] = None,
config_regressors: Optional[configure.ConfigFutureRegressors] = None,
config_missing=None,
config_train=None,
prediction_frequency=None,
):
"""Create a tabular dataset from univariate timeseries for supervised forecasting.
Expand Down Expand Up @@ -324,7 +386,21 @@
Targets to be predicted of same length as each of the model inputs, dims: (num_samples, n_forecasts)
"""
max_lags = get_max_num_lags(config_lagged_regressors, n_lags)
n_samples = len(df) - max_lags + 1 - n_forecasts
# n_samples = len(df) - max_lags + 1 - n_forecasts
# TODO
learning_rate = config_train.learning_rate
if (
predict_mode
or (learning_rate is None)
or config_lagged_regressors
or config_country_holidays
or config_events
or prediction_frequency
):
n_samples = len(df) - max_lags + 1 - n_forecasts
else:
n_samples = 1

# data is stored in OrderedDict
inputs = OrderedDict({})

Expand Down Expand Up @@ -481,7 +557,7 @@
tabularized_input_shapes_str += f" {key} {value.shape} \n"
log.debug(f"Tabularized inputs shapes: \n{tabularized_input_shapes_str}")

return inputs, targets, config_missing.drop_missing
return inputs, targets


def fourier_series(dates, period, series_order):
Expand Down
12 changes: 12 additions & 0 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1705,3 +1705,15 @@ def test_unused_future_regressors():
m.add_future_regressor("price")
m.add_lagged_regressor("cost")
m.fit(df, freq="D")


def test_on_the_fly_sampling():
    """Smoke test for the on-the-fly TimeDataset sampling path.

    Uses a plain daily series with an explicit learning rate and no lagged
    regressors, events, or holidays, so the non-precomputed (on-the-fly)
    branch of TimeDataset is exercised end to end through fit and predict.
    """
    start_date = "2022-10-16 00:00:00"
    end_date = "2022-12-30 00:00:00"
    date_range = pd.date_range(start=start_date, end=end_date, freq="D")
    y = np.random.randint(0, 20, size=(len(date_range),))
    df = pd.DataFrame({"ds": date_range, "y": y})

    m = NeuralProphet(epochs=1, learning_rate=0.01)
    # Fix: the series is daily, so fit with freq="D" (was "H", which
    # mislabeled the data frequency).
    m.fit(df, freq="D")
    _ = m.predict(df)
31 changes: 28 additions & 3 deletions tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,21 @@ def test_time_dataset():
n_forecasts = 1
valid_p = 0.2
config_missing = configure.MissingDataHandling()
config_train = configure.Train(
learning_rate=LR,
epochs=EPOCHS,
batch_size=BATCH_SIZE,
loss_func="SmoothL1Loss",
optimizer="AdamW",
)
df_train, df_val = df_utils.split_df(df_in, n_lags, n_forecasts, valid_p)
# create a tabularized dataset from time series
df, _, _ = df_utils.check_dataframe(df_train)
local_data_params, global_data_params = df_utils.init_data_params(df=df, normalize="minmax")
df = df.drop("ID", axis=1)
df = df_utils.normalize(df, global_data_params)
inputs, targets, _ = time_dataset.tabularize_univariate_datetime(
df, n_lags=n_lags, n_forecasts=n_forecasts, config_missing=config_missing
inputs, targets = time_dataset.tabularize_univariate_datetime(
df, n_lags=n_lags, n_forecasts=n_forecasts, config_missing=config_missing, config_train=config_train
)
log.debug(
"tabularized inputs: {}".format(
Expand Down Expand Up @@ -806,6 +813,13 @@ def test_too_many_NaN():
config_missing = configure.MissingDataHandling(
impute_missing=True, impute_linear=5, impute_rolling=5, drop_missing=False
)
config_train = configure.Train(
learning_rate=None,
epochs=EPOCHS,
batch_size=BATCH_SIZE,
loss_func="SmoothL1Loss",
optimizer="AdamW",
)
length = 100
days = pd.date_range(start="2017-01-01", periods=length)
y = np.ones(length)
Expand All @@ -825,7 +839,18 @@ def test_too_many_NaN():
df["ID"] = "__df__"
# Check if ValueError is thrown, if NaN values remain after auto-imputing
with pytest.raises(ValueError):
time_dataset.TimeDataset(df, "name", config_missing=config_missing, predict_steps=1, prediction_frequency=None)
time_dataset.TimeDataset(
df,
"name",
predict_mode=False,
config_missing=config_missing,
config_lagged_regressors=None,
config_country_holidays=None,
config_events=None,
config_train=config_train,
predict_steps=1,
prediction_frequency=None,
)


def test_future_df_with_nan():
Expand Down
Loading