Skip to content

Custom

Regression with Custom Loss Function

import utils

from sklearn.base import BaseEstimator
#from sklearn.utils.estimator_checks import check_estimator
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.metrics import (
    mean_absolute_percentage_error as mape,
    # mean_squared_error as mse,
    root_mean_squared_error as rmse,
    mean_absolute_error as mae,
    r2_score as r2
)

import numpy as np
import pandas as pd
from scipy import optimize as o, stats

from inspect import getfullargspec, getsource
# import copy
class CustomRegression(BaseEstimator):
    """
    All variables inside the Class should end with underscore
    """
    def __init__(self, model, method="Nelder-Mead", cost = None, alpha=0, l1_ratio=0.5, maxiter = 500, maxfev = 1_000):
        self.model = model
        self.method = method
        self.cost = cost if cost is not None else self.mse
        self.alpha = alpha
        self.l1_ratio = l1_ratio
        self.maxiter = maxiter
        self.maxfev = maxfev

    def __str__(self):
        return str(self.model_)
    def __repr__(self):
        return str(self)

    def mse(self, pred, true, sample_weight):
        error = pred - true

        cost = error**2

        cost = (
            np.mean( # median is robust to outliers than mean
                sample_weight * cost
            )
        )

        return cost

    def l1(self, params):
        return np.sum(np.abs(params-self.model_.param_initial_guess))
    def l2(self, params):
        return np.sum((params-self.model_.param_initial_guess) ** 2)
    def l3(self, params, l1_ratio=0.5):
        return (
            l1_ratio * self.l1(params) +
            (1 - l1_ratio) * self.l2(params)
        )

    def regularization_cost(self, params, alpha, penalty_type="l3"):
        """
        Regularization requires standardised parameters
        """
        penalty = get_attr(self, penalty_type)(params, self.l1_ratio)

        return (alpha * penalty)/self.sample_size_

    def cost_total(self, params, X, y):
        pred = self.model_.equation(X, *params)
        cost = self.cost(y, pred, self.sample_weight)

        if self.alpha == 0:
            pass
        else:
            cost += self.regularization_cost(params, self.alpha)
        return cost

    def check_enough_samples(self):
        return self.enough_samples_

    def fit(self, X, y, sample_weight=None):
        check_X_y(X, y) # Using self.X,self.y = check_X_y(self.X,self.y) removes column names

        self.X, self.y = X, y

        self.n_features_in_ = self.X.shape[1]

        self.sample_size_ = self.X.shape[0]

        self.sample_weight = (
            sample_weight
            if sample_weight is not None
            else np.full(self.sample_size_, 1) # set Sample_Weight as 1 by default
        )

        self.sample_size_ = self.sample_weight[self.sample_weight > 0].shape[0]

        self.model_ = copy.deepcopy(self.model)

        self.dof_ = self.sample_size_ - self.model_.k # - 1 # n-k-1
        if self.dof_ <= 0:
            self.success_ = False
            self.enough_samples_ = False
            # raise Exception("Not enough samples")
            return self
        else:
            self.enough_samples_ = True

        params_all = getfullargspec(self.model_.equation).args        
        params = [param for param in params_all if param not in ['self', "x"]]

        self.optimization_ = o.minimize(
            self.cost_total,
            x0 = self.model_.param_initial_guess,
            args = (self.X, self.y),
            method = self.method, # "L-BFGS-B", "Nelder-Mead", "SLSQP",
            constraints = self.model_.constraints,
            bounds = self.model_.param_bounds,
            # [
            #     (-1, None) for param in params # variables must be positive
            # ]
            options = dict(
                maxiter = self.maxiter,
                maxfev = self.maxfev,
                # xatol=1e-4,
                # fatol=1e-4,
                # adaptive=False,
            )
        )
        self.success_ = self.optimization_.success

        self.fitted_coeff_ = (
            self.optimization_.x
        )

        self.fitted_coeff_formatted_ = [
            f"""{{ {utils.round_f(popt, 4)} }}""" for popt in self.fitted_coeff_
        ]

        self.model_.set_fitted_coeff(*self.fitted_coeff_)
        self.model_.set_fitted_coeff_formatted(*self.fitted_coeff_formatted_)

        self.rmse_response = root_mean_squared_error(
            y,
            self.output(X),
        )
        self.rmse_link = root_mean_squared_error(
            y,
            self.output(X),
        )

        return self

    def output(self, X):
        return np.array(
            self.model_
            .equation(X, *self.fitted_coeff_)
        )

    def get_pred_se(self, X):
        return self.rmse_response

    def predict(self, X):
        check_is_fitted(self, "fitted_coeff_") # Check to verify if .fit() has been called
        check_array(X) #X = check_array(X) # removes column names

        pred = (
            self.output(X)
            .astype(np.float32)
        )

        return pred

    def predict_quantile(self, X, q):
        return self.model_.quantile(X, self.X, self.y, link_distribution_dof=self.dof_, link_distribution_q=q)


class CustomRegressionGrouped(CustomRegression):
    def __str__(self):
        x = ""
        for key, value in self.get_params().items():
            x += f"{str(key)}: {str([utils.round_f(v, 4) for v in list(value)])} \n\n"

        return str(x)
    def __init__(self, model, cost, method="Nelder-Mead", group=None):
        super().__init__(model=model, cost=cost, method=method)
        self.group = group
        self.model = model
        self.method = method
        self.cost = cost

        self.optimization_ = dict()
        self.model_ = dict()
        self.enough_samples_ = dict()

        self.dof_ = 0
        self.sample_size_ = 0

    def get_params(self, how="dict"):
        params = dict()
        for key, value in self.model_.items():
            popt = "fitted_coeff_"
            if popt in dir(value):
                params[key] = getattr(value, popt)
            else:
                params[key] = None

        if how == "df":
            params = pd.DataFrame(params).T
        return params

    def model_group(self, X, y, model, method, error, sample_weight):
        return (
            CustomRegression(
                model = self.model,
                cost = self.cost,
                method = self.method,
            )
            .fit(
                X,
                y,
                sample_weight
            )
        )

    def check_enough_samples(self, how="all"):
        if how == "all":
            enough_samples = True
            for e in self.enough_samples_.values():
                if not e:
                    enough_samples = False
        elif how == "any":
            enough_samples = self.enough_samples_
        else:
            pass

        return enough_samples

    def apply_model_multiple_group(self, X, y, group, model, method, cost, sample_weight):
        for g in X[self.group].unique():
            mask = X.eval(f"{self.group} == {g}")

            m = self.model_group(
                X[mask],
                y[mask],
                model,
                method,
                cost,
                sample_weight[mask] if sample_weight is not None else sample_weight
            )

            if m.success_:
                self.model_[g] = m
                self.enough_samples_[g] = m.enough_samples_
                self.optimization_[g] = m.optimization_
                self.sample_size_ += m.sample_size_

        success = True
        for o in self.optimization_.values():
            if not o.success:
                success = False

        self.success_ = success

    def fit(self, X, y, sample_weight=None):
        self.model_ = dict()

        self.apply_model_multiple_group(X, y, self.group, self.model, self.method, self.cost, sample_weight)
    def predict(self, X):
        Xs = []
        preds = pd.DataFrame()

        for g in X[self.group].unique():
            if g not in self.model_.keys():
                return
            else:
                Xg = X.query(f"{self.group} == {g}")
                index = Xg.index

                pred = self.model_[g].predict(
                    X = Xg.copy().reset_index(drop=True),
                )

                preds = pd.concat([
                    preds, pd.Series(pred, index=index)
                ])
        return preds.sort_index()

    def predict_quantile(self, X, q):
        Xs = []
        preds = pd.DataFrame()

        for g in X[self.group].unique():
            if g not in self.model_.keys():
                return Exception(f"Model not trained for {g}")
            else:
                Xg = X.query(f"{self.group} == {g}")
                index = Xg.index

                pred = self.model_[g].predict_quantile(
                    X = Xg.copy().reset_index(drop=True),
                    q = q
                )

                preds = pd.concat([
                    preds, pd.Series(pred, index=index)
                ])

        return preds.sort_index()
curve_fit = CustomRegression(
    model = model_selected,
    cost = regression.LogCosh().cost,
    method = solver
)
print(curve_fit) ## prints latex

curve_fit.fit(
    X_train,
    y_train,
    sample_weight=df_train["Sample_Weight"],
)

print(curve_fit) ## prints latex with coefficent values

pred = curve_fit.predict(X_test)
ll = curve_fit.predict_quantile(X_test, q=0.025)
ul = curve_fit.predict_quantile(X_test, q=0.975)

Holt-Winters

class HoltWinters(BaseEstimator):
    """Scikit-learn like interface for Holt-Winters method."""

    def __init__(self, season_len=24, alpha=0.5, beta=0.5, gamma=0.5):
        self.beta = beta
        self.alpha = alpha
        self.gamma = gamma
        self.season_len = season_len

    def fit(self, series):
        # note that unlike scikit-learn's fit method, it doesn't learn
        # the optimal model paramters, alpha, beta, gamma instead it takes
        # whatever the value the user specified the produces the predicted time
        # series, this of course can be changed.
        beta = self.beta
        alpha = self.alpha
        gamma = self.gamma
        season_len = self.season_len
        seasonals = self._initial_seasonal(series)

        # initial values
        predictions = []
        smooth = series[0]
        trend = self._initial_trend(series)
        predictions.append(smooth)

        for i in range(1, len(series)):
            value = series[i]
            previous_smooth = smooth
            seasonal = seasonals[i % season_len]
            smooth = alpha * (value - seasonal) + (1 - alpha) * (previous_smooth + trend)
            trend = beta * (smooth - previous_smooth) + (1 - beta) * trend
            seasonals[i % season_len] = gamma * (value - smooth) + (1 - gamma) * seasonal
            predictions.append(smooth + trend + seasonals[i % season_len])

        self.trend_ = trend
        self.smooth_ = smooth
        self.seasonals_ = seasonals
        self.predictions_ = predictions
        return self

    def _initial_trend(self, series):
        season_len = self.season_len
        total = 0.0
        for i in range(season_len):
            total += (series[i + season_len] - series[i]) / season_len

        trend = total / season_len
        return trend

    def _initial_seasonal(self, series):
        season_len = self.season_len
        n_seasons = len(series) // season_len

        season_averages = np.zeros(n_seasons)
        for j in range(n_seasons):
            start_index = season_len * j
            end_index = start_index + season_len
            season_average = np.sum(series[start_index:end_index]) / season_len
            season_averages[j] = season_average

        seasonals = np.zeros(season_len)
        seasons = np.arange(n_seasons)
        index = seasons * season_len
        for i in range(season_len):
            seasonal = np.sum(series[index + i] - season_averages) / n_seasons
            seasonals[i] = seasonal

        return seasonals

    def predict(self, n_preds=10):
        """
        Parameters
        ----------
        n_preds: int, default 10
            Predictions horizon. e.g. If the original input time series to the .fit
            method has a length of 50, then specifying n_preds = 10, will generate
            predictions for the next 10 steps. Resulting in a prediction length of 60.
        """
        predictions = self.predictions_
        original_series_len = len(predictions)
        for i in range(original_series_len, original_series_len + n_preds):
            m = i - original_series_len + 1
            prediction = self.smooth_ + m * self.trend_ + self.seasonals_[i % self.season_len]
            predictions.append(prediction)

        return predictions
def timeseries_cv_score(params, series, loss_function, season_len=24, n_splits=3):
    """
    Iterating over folds, train model on each fold's training set,
    forecast and calculate error on each fold's test set.
    """
    errors = []    
    alpha, beta, gamma = params
    time_series_split = TimeSeriesSplit(n_splits=n_splits) 

    for train, test in time_series_split.split(series):
        model = HoltWinters(season_len, alpha, beta, gamma)
        model.fit(series[train])

        # evaluate the prediction on the test set only
        predictions = model.predict(n_preds=len(test))
        test_predictions = predictions[-len(test):]
        test_actual = series[test]
        error = loss_function(test_actual, test_predictions)
        errors.append(error)

    return np.mean(errors)
x = [0, 0, 0]
test_size = 20
data = series.values[:-test_size]
opt = minimize(
  timeseries_cv_score,
  x0=x, 
  args=(data, mean_squared_log_error),
  method='TNC',
  bounds=((0, 1), (0, 1), (0, 1))
)
alpha_final, beta_final, gamma_final = opt.x

model = HoltWinters(season_len, alpha_final, beta_final, gamma_final)
model.fit(data)
predictions = model.predict(n_preds=50)

print('original series length: ', len(series))
print('prediction length: ', len(predictions))

Soft Labels/Label Smoothing

from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils.validation import check_array
from scipy.special import softmax
import numpy as np

def _log_odds_ratio_scale(X):
    X = np.clip(X, 1e-8, 1 - 1e-8)   # numerical stability
    X = np.log(X / (1 - X))  # transform to log-odds-ratio space
    return X

class FuzzyTargetClassifier(ClassifierMixin, BaseEstimator):

    def __init__(self, regressor):
        '''
        Fits regressor in the log odds ratio space (inverse crossentropy) of target variable.
        during transform, rescales back to probability space with softmax function

        Parameters
        ---------
        regressor: Sklearn Regressor
            base regressor to fit log odds ratio space. Any valid sklearn regressor can be used here.

        '''

        self.regressor = regressor
        return

    def fit(self, X, y=None, **kwargs):
        #ensure passed y is onehotencoded-like
        y = check_array(y, accept_sparse=True, dtype = 'numeric', ensure_min_features=1)
        self.regressors_ = [clone(self.regressor) for _ in range(y.shape[1])]
        for i in range(y.shape[1]):
            self._fit_single_regressor(self.regressors_[i], X, y[:,i], **kwargs)

        return self

    def _fit_single_regressor(self, regressor, X, ysub, **kwargs):
        ysub = _log_odds_ratio_scale(ysub)        
        regressor.fit(X, ysub, **kwargs)
        return regressor    

    def decision_function(self,X):
        all_results = []
        for reg in self.regressors_:
            results = reg.predict(X)
            if results.ndim < 2:
                results = results.reshape(-1,1)
            all_results.append(results)

        results = np.hstack(all_results)                
        return results

    def predict_proba(self, X):
        results = self.decision_function(X)
        results = softmax(results, axis = 1)
        return results

    def predict(self, X):
        results = self.decision_function(X)
        results = results.argmax(1)
        return results

Tree-Based Proximity

Random Forest Proximity

class TreeBasedProximity(): # BaseEstimator, TransformerMixin
  """
  Create Proximity matrix

  normalization_type = column_wise  : Normalize columns to sum to 1
  normalization_type = n_trees      : pm / n_trees
  """

  def __init__(self, estimator, **init_params):
    self.estimator = estimator

    self.supported_types = {
      "RandomForest": "a",
      "XGBoost": "a",
      "GradientBoosting": "b"
    }

    for m, t in self.supported_types.items():
      if m in self.estimator.__class__.__name__:
        self.estimator_type = t
        break
    else:
        return Exception("Unsupported estimator")

    self.n_trees = len(self.estimator.estimators_)

  def fit(self, X, y=None, **fit_params):
    # Get leaf_indices indices with shape = (x.shape[0], n_trees)
    if self.estimator_type == "a":
      leaf_indices = self.estimator.apply(X)
    elif self.estimator_type == "b":
      leaf_indices = np.array([tree[0].apply(X) for tree in self.estimator.estimators_]).T

    self.pm_ = (
        (leaf_indices[:, None, :] == leaf_indices[None, :, :])
        .sum(axis=-1)
    )
    #np.fill_diagonal(self.pm_, 0)
    return self

  def normalize_(self, pm, normalization="n_trees", ):
    if normalization == "n_trees":
        divisor = self.n_trees
    elif normalization == "col_wise":
        divisor = pm.sum(axis=0, keepdims=True)
    else:
        return Exception("Invalid normalization type")
    return pm / divisor

  def transform(self, X=None, y=None, metric="similarity", normalization="n_trees", ):
    pm = self.normalize_(
        self.pm_,
        normalization
    )
    np.fill_diagonal(pm, 1)

    if metric=="distance":
      pm = 1-pm

    return pm
model = RandomForestClassifier( # or Regressor
    n_estimators=100,
    max_depth=5, # make sure not too deep
    n_jobs=-1,
)
model.fit(X, y)

rfp = TreeBasedProximity(model) # randomforest model
rfp.fit(X)
rfp.transform(metric="similarity", normalization="n_trees")

Pairwise Mutual Information Matrix

from joblib import Parallel, delayed

class PairwiseMutualInformation():
  def __init__(self, normalized=True, n_bins=None, sample=None, random_state=None, n_jobs=None, ):
    self.n_bins = n_bins
    self.sample = sample
    self.normalized = normalized
    self.random_state = random_state
    self.n_jobs = n_jobs if n_jobs is not None else 1

  def compute_histogram2d(self, i, j, X, n_bins):
        return np.histogram2d(X[:, i], X[:, j], bins=n_bins)[0]

  def joint_entropies(self, X):
    histograms2d = np.empty((self.n_variables, self.n_variables, self.n_bins, self.n_bins))

    results = (
        Parallel(n_jobs=self.n_jobs)
        (
            delayed(self.compute_histogram2d)
            (i, j, X, self.n_bins)
            for i in range(self.n_variables)
            for j in range(self.n_variables)
        )
    )

    index = 0
    for i in range(self.n_variables):
        for j in range(self.n_variables):
            histograms2d[i, j] = results[index]
            index += 1

    probs = histograms2d / len(X) + 1e-100
    joint_entropies = -(probs * np.log2(probs)).sum((2,3))
    return joint_entropies

  def get_mutual_info_matrix(self, X):
    j_entropies = self.joint_entropies(X)
    entropies = j_entropies.diagonal()
    entropies_tile = np.tile(entropies, (self.n_variables, 1))
    sum_entropies = entropies_tile + entropies_tile.T

    mi_matrix = sum_entropies - j_entropies
    if self.normalized:
        mi_matrix = mi_matrix * 2 / sum_entropies
    return mi_matrix

  def fit(self, X, y=None):
    self.columns_ = X.columns

    if self.sample is not None:
      if type(self.sample) == int:
        X = df.sample(n=self.sample, random_state=self.random_state)
      elif type(self.sample) == float:
        X = df.sample(frac=self.sample, random_state=self.random_state)
      else:
        pass

    X = X.to_numpy()

    self.n_variables = X.shape[-1]
    self.n_samples = X.shape[0]

    if self.n_bins == None:
        self.n_bins = int((self.n_samples/5)**.5)

    self.mi_matrix_ = self.get_mutual_info_matrix(X)
    return self

  def transform(self, X, y=None):
    return pd.DataFrame(self.mi_matrix_, index=self.columns_, columns=self.columns_)

  def fit_transform(self, X, y=None):
    return self.fit(X, y).transform(X, y)
matrix_similarity = PairwiseMutualInformation(normalized=True, n_jobs=-1, sample=0.10, random_state=0).fit_transform(df)

Gradient Regularization

import numpy as np

def linear_regression_with_gradient_regularization(X, y, lambda_ridge, lambda_second_gradient):
    """
    Perform linear regression with gradient regularization.

    Parameters:
    X (np.array): Design matrix of shape (n_samples, n_features)
    y (np.array): Target vector of shape (n_samples,)
    lambda_ridge (float): Regularization parameter
    lambda_second_gradient (float): Regularization parameter

    Returns:
    beta (np.array): Coefficient vector
    loss (float): Total loss including regularization
    """

    # Compute X^T X
    XTX = X.T @ X

    # Compute (X^T X)^2
    XTX_squared = XTX @ XTX

    # Compute the regularized matrix
    reg_ridge = lambda_ridge * np.eye(n_features)
    reg_second_gradient = 4 * lambda_second_gradient * XTX_squared
    regularized_matrix = XTX + reg_ridge + reg_second_gradient

    # Compute X^T y
    XTy = X.T @ y

    # Compute the closed-form solution
    beta = np.linalg.solve(regularized_matrix, XTy)

    # Compute the loss
    residuals = y - X @ beta
    mse = np.mean(residuals**2)
    reg_term = lambda_ridge * np.sum(beta**2) + lambda_second_gradient * np.sum(XTX**2)
    total_loss = mse + reg_term

    return beta, total_loss

# Example usage
np.random.seed(42)
n_samples, n_features = 100, 5
X = np.random.randn(n_samples, n_features)
true_beta = np.array([1, 2, 3, 4, 5])
y = X @ true_beta + np.random.randn(n_samples) * 0.1

beta_hat, loss = linear_regression_with_gradient_regularization(X, y, lambda_ridge=1, lambda_second_gradient=1e-4)

print("Estimated coefficients:", beta_hat)
print("Total loss:", loss)

# Compare with OLS
beta_ols = np.linalg.solve(X.T @ X, X.T @ y)
print("\nOLS coefficients:", beta_ols)

Non-Linear Confidence Intervals

def nlpredict(X, y, model, loss, popt, xnew, alpha=0.05, ub=1e-5, ef=1.05):
    """Prediction error for a nonlinear fit.

    Parameters
    ----------
    model : model function with signature model(x, ...)
    loss : loss function the model was fitted with loss(...)
    popt : the optimized paramters
    xnew : x-values to predict at
    alpha : confidence level, 95% = 0.05
    ub : upper bound for smallest allowed Hessian eigenvalue
    ef : eigenvalue factor for scaling Hessian

    This function uses numdifftools for the Hessian and Jacobian.

    Returns
    -------
    y, yint, se

    y : predicted values
    yint : prediction interval at alpha confidence interval
    se : standard error of prediction
    """
    ypred = model(xnew, *popt)

    hessp = nd.Hessian(lambda p: loss(*p))(popt)
    # for making the Hessian better conditioned.
    eps = max(ub, ef * np.linalg.eigvals(hessp).min())

    sse = loss(*popt)
    n = len(y)
    mse = sse / n
    I_fisher = np.linalg.pinv(hessp + np.eye(len(popt)) * eps)

    gprime = nd.Jacobian(lambda p: model(xnew, *p))(popt)

    temp = np.diag(gprime @ I_fisher @ gprime.T)
    if interval_type == "confidence":
        pass
    elif interval_type == "prediction":
        # 1 + comes for the prediction interval, not for confidence interval
        # https://online.stat.psu.edu/stat501/lesson/7/7.2
        temp += 1

    sigmas = np.sqrt(
        mse *
        (1 + temp)
    )

    tval = t.ppf(1 - alpha / 2, len(y) - len(popt))

    return [
        ypred,
        np.array(
            [
                ypred + tval * sigmas,
                ypred - tval * sigmas,
            ]
        ).T,
        sigmas,
    ]
Last Updated: 2025-03-13 ; Contributors: AhmedThahir, web-flow

Comments