Source code for gaitmod.models.regression_models

from gaitmod.models.base_model import BaseModel
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from gaitmod.utils.utils import load_config

import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam, SGD, RMSprop
from tensorflow.keras.callbacks import Callback,  ModelCheckpoint, TensorBoard, EarlyStopping, ReduceLROnPlateau

import inspect
import numpy as np

[docs] class RegressionModels(BaseModel): def __init__(self, config_path=None, **kwargs): if config_path is None: raise ValueError(f"Config file is required for Regression Models.") super().__init__(config_path) self.feature_scaler = StandardScaler() if self.config['features']['feature_scaler'] else None self.target_scaler = StandardScaler() if self.config['features']['target_scaler'] else None
[docs] class LinearRegressionModel(RegressionModels): def __init__(self, config_path=None, **kwargs): super().__init__(config_path) MODELS = ["linear", "ridge", "lasso"] model_params = self.config['model'].get("parameters", {}) if self.model_type == "linear": self.model = LinearRegression(**model_params) elif self.model_type == "ridge": self.model = Ridge(**model_params) elif self.model_type == "lasso": self.model = Lasso(**model_params) else: raise ValueError(f"Unsupported model type. Choose from {MODELS}.")
[docs] def get_coefficients(self): return self.model.coef_
[docs] def get_intercept(self): return self.model.intercept_
[docs] def fit(self, X_train, y_train): if self.feature_scaler: X_train = self.feature_scaler.fit_transform(X_train) if self.target_scaler: y_train = self.target_scaler.fit_transform(y_train) self.model.fit(X_train, y_train) return self
[docs] def predict(self, X_test): if self.feature_scaler: X_test = self.feature_scaler.transform(X_test) y_pred = self.model.predict(X_test) if self.target_scaler: y_pred = self.target_scaler.inverse_transform(y_pred) return y_pred
[docs] class RegressionLSTMModel(RegressionModels): def __init__(self, config_path=None, **kwargs): super().__init__(config_path)
[docs] def build_model(self, input_shape): model = Sequential() for idx, layer in enumerate(self.config['model']['layers']): if layer['type'] == 'LSTM': if idx == 0: # Ensure input_shape is only passed to the first layer model.add(LSTM( units=layer['units'], activation=layer.get('activation', 'tanh'), recurrent_activation=layer.get('recurrent_activation', 'sigmoid'), return_sequences=layer.get('return_sequences', False), input_shape=input_shape)) else: model.add(LSTM( units=layer['units'], activation=layer.get('activation', 'tanh'), recurrent_activation=layer.get('recurrent_activation', 'sigmoid'), return_sequences=layer.get('return_sequences', False))) elif layer['type'] == 'Dropout': model.add(Dropout(rate=layer['rate'])) elif layer['type'] == 'Dense': units = layer['units'] if isinstance(units, str) and units.startswith("input_shape"): units = eval(units, {}, {"input_shape": input_shape}) model.add(Dense( units=units, activation=layer.get('activation', 'linear' if idx == len(self.config['model']['layers']) - 1 else 'relu'))) # Compilation logic optimizer_config = self.config['model'].get( 'optimizer', {'type': 'Adam', 'learning_rate': 0.001}) if optimizer_config['type'] == 'Adam': optimizer = Adam(learning_rate=optimizer_config['learning_rate']) elif optimizer_config['type'] == 'SGD': optimizer = SGD(learning_rate=optimizer_config['learning_rate']) elif optimizer_config['type'] == 'RMSprop': optimizer = RMSprop(learning_rate=optimizer_config['learning_rate']) else: raise ValueError(f"Unsupported optimizer type: {optimizer_config['type']}") model.compile( loss=self.config['model']['training'].get('loss', 'mean_squared_error'), # Default to MSE optimizer=optimizer, metrics= self.metrics, # Use metrics initialized in BaseModel # self.config['model']['training'].get('metrics', ['mean_squared_error', 'mean_absolute_error']), # Default metrics run_eagerly=True ) return model
[docs] def data_generator(self, X, y, batch_size=32): while True: for i in range(0, len(X), batch_size): # print(f"Batch {i // batch_size}: X shape = {X[i:i + batch_size].shape}, y shape = {y[i:i + batch_size].shape}") yield X[i:i + batch_size], y[i:i + batch_size]
[docs] def fit(self, X_train, y_train, callbacks): #TODO: Optionally scale target data # Flatten for scaling X_train_reshaped = X_train.reshape(-1, X_train.shape[-1]) y_train_reshaped = y_train.reshape(-1, y_train.shape[-1]) # Scale input data X_train_scaled = self.feature_scaler.fit_transform(X_train_reshaped) y_train_scaled = self.target_scaler.fit_transform(y_train_reshaped) X_train_scaled = X_train_scaled.reshape(X_train.shape) y_train_scaled = y_train_scaled.reshape(y_train.shape) # Build the model self.model = self.build_model((y_train_scaled.shape[1], y_train_scaled.shape[2])) # (time_steps, features) # Get the batch size and steps per epoch batch_size = self.config['model']['training']['batch_size'] if len(X_train_scaled) < batch_size: # Set batch_size to the number of samples if it's too large batch_size = len(X_train_scaled) steps_per_epoch = max(1, len(X_train_scaled) // batch_size) print(f"Using batch size: {batch_size}, steps per epoch: {steps_per_epoch}") # Prepare the data generator train_generator = self.data_generator( X_train_scaled, y_train_scaled, batch_size ) # Check if a GPU is available if tf.config.list_physical_devices('GPU'): print("Training on GPU") with tf.device('/device:GPU:0'): self.model.fit( train_generator, epochs=self.config['model']['training']['epochs'], steps_per_epoch=steps_per_epoch, verbose=0, callbacks=callbacks ) else: print("Training on CPU") self.model.fit( train_generator, epochs=self.config['model']['training']['epochs'], steps_per_epoch=steps_per_epoch, verbose=0, callbacks=callbacks )
[docs] def predict(self, X_test): if self.feature_scaler: X_test_reshaped = X_test.reshape(-1, X_test.shape[-1]) X_test_scaled = self.feature_scaler.transform(X_test_reshaped) X_test_scaled = X_test_scaled.reshape(X_test.shape) else: X_test_scaled = X_test # Perform prediction y_pred = self.model.predict(X_test_scaled) if self.target_scaler: y_pred_reshaped = y_pred.reshape(-1, y_pred.shape[-1]) y_pred_original = self.target_scaler.inverse_transform(y_pred_reshaped) y_pred_original = y_pred_original.reshape(y_pred.shape) else: y_pred_original = y_pred return y_pred_original