import pickle
import numpy as np
import pandas as pd
[docs]def load_pickle(filepath):
"""
Load a pickle file.
Args:
filepath (str): A string that indicates the path to the pickle file (including the pickle file itself).
Returns:
Object that was stored in filepath.
"""
with open(filepath, 'rb') as f:
file = pickle.load(f)
return file
[docs]def insert_zeros_between(x, j):
"""
Inserts a specified number of zeros between each element in a 1D numpy array.
The first set of zeros are inserted between the first and second elements in x.
No zeros are inserted after the last element in x.
Args:
x (np.ndarray): A 1D numpy array.
j (int): Number of zeros to insert between elements of x.
Returns:
np.ndarray: A 1D numpy array.
"""
assert len(x.shape) == 1
new_x = np.zeros(len(x) + (len(x)-1)*(j))
new_x[::j+1] = x
return new_x
[docs]def circ_conv(signal, ker):
"""
Perform circular convolution. Note that signal and ker must have same shape.
Reference: https://stackoverflow.com/questions/35474078/python-1d-array-circular-convolution
Args:
signal (np.ndarray): A 1D numpy array.
ker (np.ndarray): A 1D numpy array.
Returns:
np.ndarray: A 1D numpy array.
"""
assert len(signal.shape) == len(ker.shape) == 1 # Both signal and ker are 1D numpy arrays.
assert signal.shape[0] == ker.shape[0] # Both signal and ker have the same shape.
return np.real(np.fft.ifft( np.fft.fft(signal)*np.fft.fft(ker)))
[docs]def add_lags(x, n_lags, pandas_output = False):
'''
Creates a DataFrame (or a NumPy array) where each column is a lagged version of the input series.
Parameters
----------
x : array-like
Input sequence of data points.
n_lags : int
Number of lags to include in the output.
pandas_output : bool, optional
If True, the output will be a pandas DataFrame.
If False, the output will be a NumPy array.
Defaults to False.
Returns
-------
output : pandas.DataFrame or numpy.ndarray
DataFrame (or NumPy array) with original series and its lagged versions.
Each column corresponds to a lag (from 0 to n_lags).
The output excludes rows where lagged data is not available due to shifting (NA values).
Example
-------
>>> add_lags([1, 2, 3, 4, 5], 2, True)
0 1 2
2 3 2 1
3 4 3 2
4 5 4 3
'''
x = pd.Series(x)
cols = [x]
for i in range(1, n_lags+1):
cols.append(x.shift(i))
if pandas_output:
return pd.concat(cols, axis=1).dropna()
else:
return pd.concat(cols, axis=1).dropna().to_numpy()
[docs]def make_lag_names(n_inputs, n_lags):
"""
Creates a list of string names for original and lagged inputs.
Parameters
----------
n_inputs : int
Number of original input variables.
n_lags : int
Number of lags for each input variable.
Returns
-------
out : list of str
List of names for the original and lagged input variables. Each original input variable is named as 'Xn',
where n is the input number (1-indexed). Each lagged version of an input variable is named as 'Xn_lag_m',
where n is the input number (1-indexed) and m is the lag number. The lag number for the original (unlagged)
variables is dropped, so they are named just 'Xn'.
Example
-------
>>> make_lag_names(2, 3)
['X1', 'X1_lag_1', 'X1_lag_2', 'X1_lag_3', 'X2', 'X2_lag_1', 'X2_lag_2', 'X2_lag_3']
"""
orig_input_names = ["X" + str(i) for i in range(1, n_inputs+1)]
lag_names = ["lag_" + str(i) for i in range(n_lags+1)]
out = [i + "_" + j for i in orig_input_names for j in lag_names]
out = [i.replace('_lag_0', '') for i in out]
return out
[docs]def make_lag_names_from_list(orig_input_names, n_lags):
"""
Creates a list of string names for original and lagged inputs, based on the original input names provided.
Parameters
----------
orig_input_names : list of str
List of original input variable names.
n_lags : int
Number of lags for each input variable.
Returns
-------
out : list of str
List of names for the original and lagged input variables. Each original input variable name is appended with
'_lag_m', where m is the lag number. The lag number for the original (unlagged) variables is dropped.
Example
-------
>>> make_lag_names_from_list(['temp', 'humidity'], 3)
['temp', 'temp_lag_1', 'temp_lag_2', 'temp_lag_3', 'humidity', 'humidity_lag_1', 'humidity_lag_2', 'humidity_lag_3']
"""
lag_names = ["lag_" + str(i) for i in range(n_lags+1)]
out = [i + "_" + j for i in orig_input_names for j in lag_names]
out = [i.replace('_lag_0', '') for i in out]
return out
[docs]def add_lagged_variables(X, y=None, n_lags=1):
"""
Add lagged variables to a given input dataset X, and optionally adjust the target variable y to match the new structure.
Parameters
----------
X : numpy.ndarray or pandas.DataFrame
Input dataset with shape (n_samples, n_features). Each feature is transformed to include its lags.
y : numpy.ndarray or pandas.Series or pandas.DataFrame, optional
Target variable with shape (n_samples,). If provided, it is adjusted to match the new structure of X.
The first n_lags samples are dropped to match the size of X after adding the lagged variables.
Defaults to None, in which case only X is processed and returned.
n_lags : int, optional
Number of lags to add for each feature in X. Defaults to 1.
Returns
-------
out : numpy.ndarray or pandas.DataFrame
Transformed input dataset with added lagged variables.
If X was a pandas DataFrame, out is also a pandas DataFrame, with column names adjusted to reflect the lags.
If X was a numpy array, out is also a numpy array.
y : numpy.ndarray or pandas.Series, optional
Adjusted target variable. Only returned if y was provided as input.
If y was a pandas Series or DataFrame, the output y is a pandas Series.
If y was a numpy array, the output y is also a numpy array.
Notes
-----
This function requires the add_lags and make_lag_names_from_list functions to work.
Raises
------
AssertionError
If the shape of X is not (n_samples, n_features) with n_samples > n_features.
If y is provided and its adjusted shape does not match the number of samples in the transformed X.
Example
-------
>>> X = pd.DataFrame({'temp': [1, 2, 3, 4], 'humidity': [30, 40, 50, 60]})
>>> y = pd.Series([0, 1, 0, 1])
>>> add_lagged_variables(X, y, 2)
( temp temp_lag_1 temp_lag_2 humidity humidity_lag_1 humidity_lag_2
0 3.0 2.0 1.0 50.0 40.0 30.0
1 4.0 3.0 2.0 60.0 50.0 40.0, 0 0
1 1
Name: y, dtype: int64)
"""
assert len(X.shape) > 1
assert X.shape[1] > 0
assert X.shape[0] > X.shape[1]
# If X is a pandas data frame
pandas_output = isinstance(X, pd.DataFrame)
if pandas_output:
original_X_colnames = list(X)
X = X.to_numpy()
n_inputs = X.shape[1]
out = np.apply_along_axis(add_lags, 0, X, n_lags, False)
out = np.split(out, n_inputs, 2)
out = [i.squeeze() for i in out]
out = np.concatenate(out, axis=1)
# If X was given as a pandas data frame by the user
if pandas_output:
out = pd.DataFrame(out, columns=make_lag_names_from_list(original_X_colnames, n_lags))
if y is not None:
if isinstance(y, pd.Series) or isinstance(y, pd.DataFrame):
y = y.to_numpy()
y = y.squeeze()
assert isinstance(y, np.ndarray)
assert len(y.shape) == 1
y = y[n_lags:]
assert out.shape[0] == y.shape[0]
# If X was given as a pandas data frame by the user
if pandas_output:
y = pd.Series(y, name="y")
return out, y
else:
return out
[docs]def test_size(X, test_frac=0.2):
"""
Determines the size of the test set based on the provided fraction.
Parameters
----------
X : numpy.ndarray
Input dataset with shape (n_samples, n_features).
test_frac : float, optional
Fraction of the total samples to be used for the test set.
Default is 0.2 (20% of total samples).
Returns
-------
test_size : int
Number of samples in the test set.
Raises
------
AssertionError
If `test_frac` is greater or equal to 1, raising a ValueError.
Example
-------
>>> X = np.array([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12]])
>>> test_size(X, test_frac=0.3)
1
"""
# Check that the fraction is valid.
assert test_frac < 1, "Invalid split fraction."
# Get test set size
test_size = int(test_frac * X.shape[0])
return test_size
[docs]def val_test_sizes(X, val_frac=0.1, test_frac=0.2):
"""
Determines the size of the validation and test sets based on the provided fractions.
Parameters
----------
X : numpy.ndarray
Input dataset with shape (n_samples, n_features).
val_frac : float, optional
Fraction of the total samples to be used for the validation set.
Default is 0.1 (10% of total samples).
test_frac : float, optional
Fraction of the total samples to be used for the test set.
Default is 0.2 (20% of total samples).
Returns
-------
val_size : int
Number of samples in the validation set.
test_size : int
Number of samples in the test set.
Raises
------
AssertionError
If the sum of `test_frac` and `val_frac` is greater or equal to 1, raising a ValueError.
Example
-------
>>> X = np.array([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12]])
>>> val_test_sizes(X, val_frac=0.2, test_frac=0.3)
(1, 1)
"""
# Check that the fractions are valid.
assert test_frac + val_frac < 1, "Invalid split fractions."
# Get validation and test set sizes
val_size = int(val_frac * X.shape[0])
test_size = int(test_frac * X.shape[0])
return val_size, test_size
[docs]def absolute_split_2(X, y, ntest):
"""
Splits the input dataset (X, y) into training and test sets based on an absolute number.
Parameters
----------
X : numpy.ndarray
Input dataset with shape (n_samples, n_features).
y : numpy.ndarray
Target variable with shape (n_samples,).
ntest : int
Number of samples to include in the test set.
Returns
-------
X : numpy.ndarray
Training input dataset.
X_test : numpy.ndarray
Test input dataset.
y : numpy.ndarray
Training target variable.
y_test : numpy.ndarray
Test target variable.
Raises
------
AssertionError
If the input dimensions do not match the requirements.
If ntest is not a positive integer.
If the total number of samples is less than ntest.
If the final training or test set do not match the expected sizes.
Example
-------
>>> X = np.array([[1, 2], [3, 4], [5, 6], [7, 8]])
>>> y = np.array([1, 0, 1, 0])
>>> absolute_split_2(X, y, 1)
(array([[1, 2], [3, 4], [5, 6]]),
array([[7, 8]]),
array([1, 0, 1]),
array([0]))
"""
assert len(X.shape) == 2
assert len(y.shape) == 1
assert ntest > 0
nrows = X.shape[0]
assert nrows > ntest
# Test set
# ---------
X_test = X[(nrows-ntest):, :]
y_test = y[(nrows-ntest):]
assert X_test.shape[0] == ntest
# Training set
# -------------
# Remove test set observations from X and y
X = X[:(nrows-ntest), :]
y = y[:(nrows-ntest)]
assert X.shape[0] == (nrows-ntest)
return X, X_test, y, y_test
[docs]def absolute_split_3(X, y, nval, ntest):
"""
Splits the input dataset (X, y) into training, validation, and test sets based on absolute numbers.
Parameters
----------
X : numpy.ndarray
Input dataset with shape (n_samples, n_features).
y : numpy.ndarray
Target variable with shape (n_samples,).
nval : int
Number of samples to include in the validation set.
ntest : int
Number of samples to include in the test set.
Returns
-------
X : numpy.ndarray
Training input dataset.
X_val : numpy.ndarray
Validation input dataset.
X_test : numpy.ndarray
Test input dataset.
y : numpy.ndarray
Training target variable.
y_val : numpy.ndarray
Validation target variable.
y_test : numpy.ndarray
Test target variable.
Raises
------
AssertionError
If the input dimensions do not match the requirements.
If nval or ntest are not positive integers.
If the total number of samples is less than nval + ntest.
If the final training, validation, or test sets do not match the expected sizes.
Example
-------
>>> X = np.array([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]])
>>> y = np.array([1, 0, 1, 0, 1])
>>> absolute_split_3(X, y, 2, 1)
(array([[1, 2],
[3, 4]]), array([[5, 6],
[7, 8]]), array([[ 9, 10]]), array([1, 0]), array([1, 0]), array([1]))
"""
assert len(X.shape) == 2
assert len(y.shape) == 1
assert nval > 0
assert ntest > 0
nrows = X.shape[0]
assert nrows > nval + ntest
# Test set
# ---------
X_test = X[(nrows-ntest):, :]
y_test = y[(nrows-ntest):]
assert X_test.shape[0] == ntest
# Validation set
# ---------------
# Remove test set observations from X and y
X = X[:(nrows-ntest), :]
y = y[:(nrows-ntest)]
assert X.shape[0] == (nrows-ntest)
# nrows is now the size of training set + size of validation set
nrows = X.shape[0]
X_val = X[(nrows-nval):, :]
y_val = y[(nrows-nval):]
assert X_val.shape[0] == nval
# Training set
# -------------
X = X[:(nrows-nval), :]
y = y[:(nrows-nval)]
return X, X_val, X_test, y, y_val, y_test
[docs]def prep_forecast_data(X, y, h, auto_regress_y = False):
"""
Prepare an input feature set X and target y for forecasting by specifying the forecast horizon h.
The output of this function is a tuple with input features and target such that each row of input features maps to
a future observation of the target. This setup allows cross validation to be used when evaluating machine learning models.
Args:
X (np.ndarray or pd.DataFrame): A 2D numpy array or pandas data frame.
y (np.ndarray, pd.Series, or pd.DataFrame): A 1D numpy array, pandas series or pandas data frame.
h (int): Forecast horizon.
auto_regress_y (bool): Whether the target should be included as an auto-regressive feature (to exploit autocorrelations present in the target variable).
Returns:
tuple:
if auto_regress = False (the default):
First element is a 2D numpy array with X.shape[1] columns. If X was given as a pandas data frame, the output will be a pandas data frame.
The number of rows will be h less than X.shape[0] of the originally provided X.
Second element is a 1D array corresponding to the target y provided by the user.
If X was given as a pandas data frame, the output will be a pandas series with name "y".
The number of values will be h less than y.shape[0] of the originally provided y.
if auto_regress = True:
First element is a 2D numpy array with X.shape[1]+1 columns.
The first column will contain the auto-regressive target feature (essentially a lagged version of the target).
If X was given as a pandas data frame, the output will be a pandas data frame.
The number of rows will be h less than X.shape[0] of the originally provided X.
Second element is a 1D array corresponding to the target y provided by the user.
If X was given as a pandas data frame, the output will be a pandas series with name "y".
The number of values will be h less than y.shape[0] of the originally provided y.
"""
assert len(X.shape) == 2
# If X is a pandas data frame
pandas_output = isinstance(X, pd.DataFrame)
if pandas_output:
original_X_colnames = list(X)
X = X.to_numpy()
if isinstance(y, pd.Series) or isinstance(y, pd.DataFrame):
y = y.to_numpy()
y = y.squeeze()
assert isinstance(y, np.ndarray)
assert len(y.shape) == 1
assert X.shape[0] == y.shape[0]
# Add h padding rows on top of X
X = np.pad(X, ((h, 0), (0, 0)), mode='constant', constant_values=np.nan)
if auto_regress_y:
y1 = np.pad(y, (h, 0), mode='constant', constant_values=np.nan)
X = np.hstack((y1[:, None], X))
if pandas_output:
if "y_lagged" in original_X_colnames:
raise ValueError('The name y_lagged cannot exist in your input feature data frame X.')
original_X_colnames = ["y_lagged"] + original_X_colnames
# Add h padding elements to the end of y
y = np.pad(y, (0, h), mode='constant', constant_values=np.nan)
yX = np.hstack((y[:, None], X))
yX = yX[~np.isnan(yX).any(axis=1)]
if pandas_output:
return pd.DataFrame(yX[:, 1:], columns = original_X_colnames), pd.Series(yX[:, 0], name="y")
else:
return yX[:, 1:], yX[:, 0]