# stdlib
import copy  # needed by CustomRegression.fit (deepcopy of the model)
from inspect import getfullargspec, getsource

# third-party
import numpy as np
import pandas as pd
from scipy import optimize as o, stats
from sklearn.base import BaseEstimator
# from sklearn.utils.estimator_checks import check_estimator
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.metrics import (
    mean_absolute_percentage_error as mape,
    # mean_squared_error as mse,
    root_mean_squared_error as rmse,
    mean_absolute_error as mae,
    r2_score as r2,
)

# local
import utils
class CustomRegression(BaseEstimator):
    """Scikit-learn compatible regressor that fits an arbitrary model
    equation by numerically minimising an (optionally regularised) cost.

    All attributes learned during ``fit`` end with an underscore.
    """

    def __init__(self, model, method="Nelder-Mead", cost=None,
                 alpha=0, l1_ratio=0.5, maxiter=500, maxfev=1_000):
        self.model = model
        self.method = method
        # Default to the built-in weighted MSE when no cost is supplied.
        self.cost = cost if cost is not None else self.mse
        self.alpha = alpha        # regularisation strength (0 = disabled)
        self.l1_ratio = l1_ratio  # elastic-net mixing used by the "l3" penalty
        self.maxiter = maxiter
        self.maxfev = maxfev

    def __str__(self):
        return str(self.model_)

    def __repr__(self):
        return str(self)

    def mse(self, pred, true, sample_weight):
        """Weighted mean squared error of ``pred`` against ``true``."""
        error = pred - true
        cost = error ** 2
        # NOTE: a median would be more robust to outliers than the mean.
        return np.mean(sample_weight * cost)

    def l1(self, params, l1_ratio=None):
        """L1 distance of ``params`` from the initial guess.

        ``l1_ratio`` is accepted (and ignored) so every penalty shares the
        same call signature in ``regularization_cost`` — the original raised
        a TypeError for ``penalty_type="l1"``.
        """
        return np.sum(np.abs(params - self.model_.param_initial_guess))

    def l2(self, params, l1_ratio=None):
        """Squared L2 distance of ``params`` from the initial guess
        (``l1_ratio`` accepted and ignored, see ``l1``)."""
        return np.sum((params - self.model_.param_initial_guess) ** 2)

    def l3(self, params, l1_ratio=0.5):
        """Elastic-net style blend of the l1 and l2 penalties."""
        return l1_ratio * self.l1(params) + (1 - l1_ratio) * self.l2(params)

    def regularization_cost(self, params, alpha, penalty_type="l3"):
        """Regularization requires standardised parameters."""
        # BUG FIX: the original called the non-existent ``get_attr``;
        # the builtin is ``getattr``.
        penalty = getattr(self, penalty_type)(params, self.l1_ratio)
        return (alpha * penalty) / self.sample_size_

    def cost_total(self, params, X, y):
        """Objective minimised by ``fit``: data cost + optional penalty."""
        pred = self.model_.equation(X, *params)
        # BUG FIX: pass (pred, true) in the order the cost functions declare;
        # the original passed (y, pred), silently swapping the arguments for
        # asymmetric losses (harmless only for symmetric ones such as MSE).
        cost = self.cost(pred, y, self.sample_weight)
        if self.alpha != 0:
            cost += self.regularization_cost(params, self.alpha)
        return cost

    def check_enough_samples(self):
        return self.enough_samples_

    def fit(self, X, y, sample_weight=None):
        """Fit the model equation to (X, y) with scipy.optimize.minimize."""
        check_X_y(X, y)
        # Using ``self.X, self.y = check_X_y(X, y)`` would strip column names.
        self.X, self.y = X, y
        self.n_features_in_ = self.X.shape[1]
        self.sample_size_ = self.X.shape[0]
        self.sample_weight = (
            sample_weight
            if sample_weight is not None
            else np.full(self.sample_size_, 1)  # default: equal weights
        )
        # Only rows with a positive weight actually contribute.
        self.sample_size_ = self.sample_weight[self.sample_weight > 0].shape[0]
        # Work on a private copy so repeated fits don't mutate ``self.model``
        # (requires ``import copy``, which the original left commented out).
        self.model_ = copy.deepcopy(self.model)
        self.dof_ = self.sample_size_ - self.model_.k  # n - k
        if self.dof_ <= 0:
            self.success_ = False
            self.enough_samples_ = False
            # raise Exception("Not enough samples")
            return self
        self.enough_samples_ = True
        self.optimization_ = o.minimize(
            self.cost_total,
            x0=self.model_.param_initial_guess,
            args=(self.X, self.y),
            method=self.method,  # e.g. "L-BFGS-B", "Nelder-Mead", "SLSQP"
            constraints=self.model_.constraints,
            bounds=self.model_.param_bounds,
            options=dict(
                maxiter=self.maxiter,
                maxfev=self.maxfev,
                # xatol=1e-4,
                # fatol=1e-4,
                # adaptive=False,
            ),
        )
        self.success_ = self.optimization_.success
        self.fitted_coeff_ = self.optimization_.x
        # Latex-style "{value}" strings for the report/printout.
        self.fitted_coeff_formatted_ = [
            f"""{{{utils.round_f(popt, 4)}}}""" for popt in self.fitted_coeff_
        ]
        self.model_.set_fitted_coeff(*self.fitted_coeff_)
        self.model_.set_fitted_coeff_formatted(*self.fitted_coeff_formatted_)
        # BUG FIX: ``root_mean_squared_error`` was imported under the alias
        # ``rmse``; the original name raised a NameError here.
        self.rmse_response = rmse(y, self.output(X))
        self.rmse_link = rmse(y, self.output(X))
        return self

    def output(self, X):
        """Raw model prediction with the fitted coefficients."""
        return np.array(self.model_.equation(X, *self.fitted_coeff_))

    def get_pred_se(self, X):
        """Standard error of prediction (currently the response RMSE)."""
        return self.rmse_response

    def predict(self, X):
        check_is_fitted(self, "fitted_coeff_")  # verify .fit() was called
        check_array(X)  # ``X = check_array(X)`` would strip column names
        return self.output(X).astype(np.float32)

    def predict_quantile(self, X, q):
        """Quantile ``q`` of the model's predictive distribution."""
        return self.model_.quantile(
            X, self.X, self.y,
            link_distribution_dof=self.dof_,
            link_distribution_q=q,
        )


class CustomRegressionGrouped(CustomRegression):
    """Fits one independent ``CustomRegression`` per level of ``group``."""

    def __init__(self, model, cost, method="Nelder-Mead", group=None):
        super().__init__(model=model, cost=cost, method=method)
        self.group = group
        # NOTE: the original re-assigned model/method/cost after super(),
        # which would reset ``self.cost`` to None when cost=None; the
        # redundant assignments are dropped.
        self.optimization_ = dict()
        self.model_ = dict()
        self.enough_samples_ = dict()
        self.dof_ = 0
        self.sample_size_ = 0

    def __str__(self):
        x = ""
        for key, value in self.get_params().items():
            x += f"{str(key)}: {str([utils.round_f(v, 4) for v in list(value)])}\n\n"
        return str(x)

    def get_params(self, how="dict"):
        """Fitted coefficients per group as a dict or (how="df") DataFrame.

        NOTE(review): this shadows sklearn's ``BaseEstimator.get_params``
        with an incompatible signature — kept for backward compatibility.
        """
        params = dict()
        for key, value in self.model_.items():
            # Groups whose fit failed have no fitted_coeff_ attribute.
            params[key] = getattr(value, "fitted_coeff_", None)
        if how == "df":
            params = pd.DataFrame(params).T
        return params

    def model_group(self, X, y, model, method, error, sample_weight):
        # NOTE: the explicit model/method/error arguments are accepted for
        # interface compatibility but the instance-level settings are used,
        # exactly as in the original.
        return CustomRegression(
            model=self.model,
            cost=self.cost,
            method=self.method,
        ).fit(X, y, sample_weight)

    def check_enough_samples(self, how="all"):
        """True when all (or, with how="any", at least one) group had
        enough samples to be fitted."""
        if how == "all":
            return all(self.enough_samples_.values())
        if how == "any":
            # BUG FIX: the original returned the dict itself here instead
            # of evaluating "any".
            return any(self.enough_samples_.values())
        # BUG FIX: the original fell through to an unbound local variable.
        raise ValueError(f"Invalid 'how': {how!r}")

    def apply_model_multiple_group(self, X, y, group, model, method, cost,
                                   sample_weight):
        """Fit one sub-model per unique value of the grouping column."""
        for g in X[self.group].unique():
            # BUG FIX: direct comparison supports string group labels; the
            # original ``X.eval(f"{group} == {g}")`` failed for them.
            mask = X[self.group] == g
            m = self.model_group(
                X[mask], y[mask], model, method, cost,
                sample_weight[mask] if sample_weight is not None else sample_weight,
            )
            if m.success_:
                # Only successful fits carry an ``optimization_`` attribute,
                # so all bookkeeping happens inside this branch.
                self.model_[g] = m
                self.enough_samples_[g] = m.enough_samples_
                self.optimization_[g] = m.optimization_
                self.sample_size_ += m.sample_size_
        # Renamed loop variable: the original used ``o``, shadowing the
        # module-level scipy.optimize alias.
        self.success_ = all(
            opt.success for opt in self.optimization_.values()
        )

    def fit(self, X, y, sample_weight=None):
        self.model_ = dict()
        self.apply_model_multiple_group(
            X, y, self.group, self.model, self.method, self.cost, sample_weight
        )
        return self  # sklearn convention (the original returned None)

    def predict(self, X):
        """Per-group predictions, re-assembled in the original row order.

        Returns None when any group in X has no trained model (original
        behaviour, preserved).
        """
        preds = pd.DataFrame()
        for g in X[self.group].unique():
            if g not in self.model_.keys():
                return
            Xg = X[X[self.group] == g]  # BUG FIX: f-string query broke for strings
            index = Xg.index
            pred = self.model_[g].predict(
                X=Xg.copy().reset_index(drop=True),
            )
            preds = pd.concat([preds, pd.Series(pred, index=index)])
        return preds.sort_index()

    def predict_quantile(self, X, q):
        """Per-group quantile predictions, re-assembled in row order."""
        preds = pd.DataFrame()
        for g in X[self.group].unique():
            if g not in self.model_.keys():
                # BUG FIX: the original *returned* the Exception object.
                raise ValueError(f"Model not trained for {g}")
            Xg = X[X[self.group] == g]  # BUG FIX: f-string query broke for strings
            index = Xg.index
            pred = self.model_[g].predict_quantile(
                X=Xg.copy().reset_index(drop=True), q=q
            )
            preds = pd.concat([preds, pd.Series(pred, index=index)])
        return preds.sort_index()
# Fit the selected model equation with a robust log-cosh loss.
curve_fit = CustomRegression(
    model=model_selected,
    cost=regression.LogCosh().cost,
    method=solver,
)
print(curve_fit)  ## prints latex

curve_fit.fit(
    X_train,
    y_train,
    sample_weight=df_train["Sample_Weight"],
)
print(curve_fit)  ## prints latex with coefficient values

# Point predictions plus a 95% predictive interval.
pred = curve_fit.predict(X_test)
ll = curve_fit.predict_quantile(X_test, q=0.025)
ul = curve_fit.predict_quantile(X_test, q=0.975)
class HoltWinters(BaseEstimator):
    """Scikit-learn like interface for the additive Holt-Winters method."""

    def __init__(self, season_len=24, alpha=0.5, beta=0.5, gamma=0.5):
        self.beta = beta
        self.alpha = alpha
        self.gamma = gamma
        self.season_len = season_len

    def fit(self, series):
        # Note that unlike scikit-learn's fit method, this doesn't learn
        # the optimal model parameters alpha, beta, gamma — it takes
        # whatever values the user specified and produces the in-sample
        # predicted series. This could of course be changed.
        beta = self.beta
        alpha = self.alpha
        gamma = self.gamma
        season_len = self.season_len
        seasonals = self._initial_seasonal(series)

        # initial values
        predictions = []
        smooth = series[0]
        trend = self._initial_trend(series)
        predictions.append(smooth)

        for i in range(1, len(series)):
            value = series[i]
            previous_smooth = smooth
            seasonal = seasonals[i % season_len]
            # Standard additive Holt-Winters recursions.
            smooth = alpha * (value - seasonal) + (1 - alpha) * (previous_smooth + trend)
            trend = beta * (smooth - previous_smooth) + (1 - beta) * trend
            seasonals[i % season_len] = gamma * (value - smooth) + (1 - gamma) * seasonal
            predictions.append(smooth + trend + seasonals[i % season_len])

        self.trend_ = trend
        self.smooth_ = smooth
        self.seasonals_ = seasonals
        self.predictions_ = predictions
        return self

    def _initial_trend(self, series):
        """Initial trend: average one-season-apart difference."""
        season_len = self.season_len
        total = 0.0
        for i in range(season_len):
            total += (series[i + season_len] - series[i]) / season_len
        return total / season_len

    def _initial_seasonal(self, series):
        """Initial additive seasonal component for each in-season position."""
        season_len = self.season_len
        n_seasons = len(series) // season_len
        season_averages = np.zeros(n_seasons)
        for j in range(n_seasons):
            start_index = season_len * j
            end_index = start_index + season_len
            season_averages[j] = np.sum(series[start_index:end_index]) / season_len

        seasonals = np.zeros(season_len)
        index = np.arange(n_seasons) * season_len
        for i in range(season_len):
            # Deviation of position i from its season's average, averaged
            # over all observed seasons.
            seasonals[i] = np.sum(series[index + i] - season_averages) / n_seasons
        return seasonals

    def predict(self, n_preds=10):
        """
        Parameters
        ----------
        n_preds : int, default 10
            Predictions horizon. e.g. if the original input time series to
            the .fit method has a length of 50, then specifying
            n_preds = 10 will generate predictions for the next 10 steps,
            resulting in a prediction length of 60.
        """
        # BUG FIX: copy the fitted in-sample predictions. The original
        # aliased and appended to ``self.predictions_`` itself, so every
        # call to ``predict`` permanently grew the fitted state.
        predictions = list(self.predictions_)
        original_series_len = len(predictions)
        for i in range(original_series_len, original_series_len + n_preds):
            m = i - original_series_len + 1
            prediction = self.smooth_ + m * self.trend_ + self.seasonals_[i % self.season_len]
            predictions.append(prediction)
        return predictions
def timeseries_cv_score(params, series, loss_function, season_len=24, n_splits=3):
    """
    Iterating over folds, train model on each fold's training set,
    forecast and calculate error on each fold's test set.
    """
    alpha, beta, gamma = params
    splitter = TimeSeriesSplit(n_splits=n_splits)

    errors = []
    for train_idx, test_idx in splitter.split(series):
        model = HoltWinters(season_len, alpha, beta, gamma)
        model.fit(series[train_idx])

        # evaluate the prediction on the test set only
        forecast = model.predict(n_preds=len(test_idx))
        test_predictions = forecast[-len(test_idx):]
        errors.append(loss_function(series[test_idx], test_predictions))

    return np.mean(errors)
# BUG FIX: ``clone`` is used in FuzzyTargetClassifier.fit but was never
# imported (NameError at fit time).
from sklearn.base import BaseEstimator, ClassifierMixin, clone
from sklearn.utils.validation import check_array
from scipy.special import softmax
import numpy as np


def _log_odds_ratio_scale(X):
    """Map probabilities to log-odds ratios, clipping away exact 0/1."""
    X = np.clip(X, 1e-8, 1 - 1e-8)   # numerical stability
    X = np.log(X / (1 - X))          # transform to log-odds-ratio space
    return X


class FuzzyTargetClassifier(ClassifierMixin, BaseEstimator):
    def __init__(self, regressor):
        '''
        Fits regressor in the log odds ratio space (inverse crossentropy) of
        the target variable. During transform, rescales back to probability
        space with the softmax function.

        Parameters
        ----------
        regressor : sklearn regressor
            Base regressor fitted in log-odds-ratio space. Any valid
            sklearn regressor can be used here.
        '''
        self.regressor = regressor

    def fit(self, X, y=None, **kwargs):
        # ensure passed y is onehotencoded-like (one membership column per class)
        y = check_array(y, accept_sparse=True, dtype='numeric', ensure_min_features=1)
        # One independent clone of the base regressor per class column.
        self.regressors_ = [clone(self.regressor) for _ in range(y.shape[1])]
        for i in range(y.shape[1]):
            self._fit_single_regressor(self.regressors_[i], X, y[:, i], **kwargs)
        return self

    def _fit_single_regressor(self, regressor, X, ysub, **kwargs):
        # Fit one regressor on one class's membership column, in log-odds space.
        ysub = _log_odds_ratio_scale(ysub)
        regressor.fit(X, ysub, **kwargs)
        return regressor

    def decision_function(self, X):
        """Stack each per-class regressor's raw predictions column-wise."""
        all_results = []
        for reg in self.regressors_:
            results = reg.predict(X)
            if results.ndim < 2:
                results = results.reshape(-1, 1)
            all_results.append(results)
        results = np.hstack(all_results)
        return results

    def predict_proba(self, X):
        """Class probabilities: softmax over the per-class scores."""
        results = self.decision_function(X)
        results = softmax(results, axis=1)
        return results

    def predict(self, X):
        """Hard class index: argmax of the per-class scores."""
        results = self.decision_function(X)
        results = results.argmax(1)
        return results
class TreeBasedProximity():  # BaseEstimator, TransformerMixin
    """
    Create a proximity matrix from a fitted tree ensemble.

    normalization_type = column_wise : normalize columns to sum to 1
    normalization_type = n_trees     : pm / n_trees
    """

    def __init__(self, estimator, **init_params):
        self.estimator = estimator
        # Maps estimator family (by class-name substring) to how leaf
        # indices are extracted in ``fit``.
        self.supported_types = {
            "RandomForest": "a",
            "XGBoost": "a",
            "GradientBoosting": "b",
        }
        for name, kind in self.supported_types.items():
            if name in self.estimator.__class__.__name__:
                self.estimator_type = kind
                break
        else:
            # BUG FIX: the original *returned* an Exception from __init__,
            # which is itself a TypeError at runtime; raise instead.
            raise ValueError("Unsupported estimator")
        self.n_trees = len(self.estimator.estimators_)

    def fit(self, X, y=None, **fit_params):
        # Get leaf indices with shape = (X.shape[0], n_trees).
        if self.estimator_type == "a":
            leaf_indices = self.estimator.apply(X)
        elif self.estimator_type == "b":
            leaf_indices = np.array(
                [tree[0].apply(X) for tree in self.estimator.estimators_]
            ).T
        # pm_[i, j] = number of trees in which samples i and j land in the
        # same leaf.
        self.pm_ = (
            (leaf_indices[:, None, :] == leaf_indices[None, :, :]).sum(axis=-1)
        )
        # np.fill_diagonal(self.pm_, 0)
        return self

    def normalize_(self, pm, normalization="n_trees"):
        """Scale a raw co-occurrence matrix into proximities."""
        if normalization == "n_trees":
            divisor = self.n_trees
        elif normalization == "col_wise":
            divisor = pm.sum(axis=0, keepdims=True)
        else:
            # BUG FIX: the original returned the Exception object instead
            # of raising it.
            raise ValueError("Invalid normalization type")
        return pm / divisor

    def transform(self, X=None, y=None, metric="similarity",
                  normalization="n_trees"):
        """Return the normalized proximity (or distance) matrix."""
        pm = self.normalize_(self.pm_, normalization)
        np.fill_diagonal(pm, 1)  # a sample is maximally similar to itself
        if metric == "distance":
            pm = 1 - pm
        return pm
# Train a shallow forest and derive sample-to-sample proximities from it.
model = RandomForestClassifier(  # or Regressor
    n_estimators=100,
    max_depth=5,  # make sure not too deep
    n_jobs=-1,
)
model.fit(X, y)

rfp = TreeBasedProximity(model)  # randomforest model
rfp.fit(X)
rfp.transform(metric="similarity", normalization="n_trees")
# Install the necessary libraries.
# BUG FIX: "!pip install" is IPython/Jupyter magic and a SyntaxError in a
# plain .py module, so it is kept here as a comment:
# !pip install -q matplotlib pandas scipy scikit-learn numpy dcor

import matplotlib.pylab as plt
import numpy as np
import pandas as pd
from scipy.spatial.distance import squareform
from scipy.cluster.hierarchy import dendrogram, linkage
from sklearn.feature_selection import mutual_info_regression
import dcor


def plot_dendrogram(correlation_matrix, ax, title='Dendrogram',
                    min_distance=0.0, method='ward'):
    """
    Plot a dendrogram based on a correlation matrix using hierarchical clustering.

    Parameters:
        correlation_matrix (pd.DataFrame): Correlation matrix with numerical values.
        ax (matplotlib.axes.Axes): Axis to plot the dendrogram on.
        title (str): Title for the dendrogram plot. Default is 'Dendrogram'.
        min_distance (float): Minimum distance threshold to adjust the linkage distances.
        method (str): Linkage method used for clustering. Default is 'ward'.

    Returns:
        None
    """
    # Convert correlation to distance (1 - absolute correlation)
    distance_matrix = 1 - correlation_matrix.abs()

    # Set diagonal elements to 0 (distance to self is 0)
    np.fill_diagonal(distance_matrix.values, 0)

    # Convert the distance matrix to condensed form
    condensed_distances = squareform(distance_matrix)

    # Perform hierarchical clustering
    linkage_matrix = linkage(condensed_distances, method=method)

    # Apply minimum distance threshold
    linkage_matrix[:, 2] = np.maximum(linkage_matrix[:, 2], min_distance)

    # Plot the dendrogram on the provided axis
    dendro = dendrogram(
        linkage_matrix,
        labels=distance_matrix.columns,
        leaf_font_size=6,
        ax=ax,
    )

    # Annotate the dendrogram with distance values
    for i, d in zip(dendro['icoord'], dendro['dcoord']):
        ax.text((i[1] + i[2]) / 2, d[1], f'{d[1]:.2f}',
                va='center', ha='right', fontsize=8)

    # Set plot title and labels
    ax.set_title(title)
    ax.set_xlabel("Features")
    ax.set_ylabel("Distance")


def distance_correlation_matrix(data):
    """
    Calculate the distance correlation between all pairs of columns in a
    pandas DataFrame. Distance correlation is a measure of statistical
    dependence between two features. It can detect both linear and
    non-linear relationships.

    Parameters:
        data (pd.DataFrame): Input data with numerical columns.
                             Each column represents a feature.

    Returns:
        pd.DataFrame: A symmetric matrix of distance correlation scores for
                      each pair of columns. The diagonal elements represent
                      the distance correlation of a column with itself.
    """
    # Ensure input consists of numeric columns only
    data = data.select_dtypes(include=[np.number])

    n = data.shape[1]          # Number of features (columns)
    columns = data.columns     # Column names
    dc_matrix = np.zeros((n, n))  # Initialize the distance correlation matrix

    # Compute distance correlation for each pair of features (i, j)
    for i in range(n):
        for j in range(i, n):  # Compute only the upper triangular part
            dc = dcor.distance_correlation(data.iloc[:, i], data.iloc[:, j])
            # Fill the symmetric matrix with the computed distance correlation
            dc_matrix[i, j] = dc
            dc_matrix[j, i] = dc  # Matrix is symmetric

    # Convert the matrix to a pandas DataFrame for readability
    dc_df = pd.DataFrame(dc_matrix, index=columns, columns=columns)
    return dc_df


def mutual_information_matrix(data):
    """
    Calculate the normalized mutual information between all pairs of columns
    in a pandas DataFrame. Mutual information measures the dependency between
    two features and can detect both linear and non-linear monotonic
    relationships. The values are normalized to a range between 0 and 1.

    Parameters:
        data (pd.DataFrame): Input data with numerical columns.
                             Each column represents a feature.

    Returns:
        pd.DataFrame: A symmetric matrix of normalized mutual information
                      scores (range: 0 to 1). The diagonal elements represent
                      the mutual information of a column with itself.
    """
    # Ensure input consists of numeric columns only
    data = data.select_dtypes(include=[np.number])

    n = data.shape[1]          # Number of features (columns)
    columns = data.columns     # Column names
    mi_matrix = np.zeros((n, n))  # Initialize the mutual information matrix

    # Compute mutual information for each pair of features (i, j)
    for i in range(n):
        for j in range(i, n):  # Compute only the upper triangular part
            x = data.iloc[:, i].values.reshape(-1, 1)
            y = data.iloc[:, j].values
            if i == j:
                # Self-mutual information (highest value for a single feature)
                mi = mutual_info_regression(x, x.ravel())[0]
            else:
                # Compute mutual information between different features
                mi = mutual_info_regression(x, y)[0]
            # Fill the symmetric matrix with the computed mutual information
            mi_matrix[i, j] = mi
            mi_matrix[j, i] = mi  # Matrix is symmetric

    # Convert the matrix to a pandas DataFrame for readability
    mi_df = pd.DataFrame(mi_matrix, index=columns, columns=columns)

    # Scale by the global maximum so values fall in [0, 1].
    # NOTE(review): this makes only the largest entry exactly 1, not every
    # diagonal element as the original comment claimed — the self-MI values
    # on the diagonal generally differ per column.
    mi_df /= mi_df.max().max()
    return mi_df
import numpy as np


def linear_regression_with_gradient_regularization(X, y, lambda_ridge,
                                                   lambda_second_gradient):
    """
    Perform linear regression with gradient regularization.

    Parameters:
        X (np.array): Design matrix of shape (n_samples, n_features)
        y (np.array): Target vector of shape (n_samples,)
        lambda_ridge (float): Regularization parameter
        lambda_second_gradient (float): Regularization parameter

    Returns:
        beta (np.array): Coefficient vector
        loss (float): Total loss including regularization
    """
    # BUG FIX: the original read the *global* ``n_features`` defined only by
    # the example code below; derive it from the design matrix instead.
    n_features = X.shape[1]

    # Compute X^T X
    XTX = X.T @ X
    # Compute (X^T X)^2
    XTX_squared = XTX @ XTX

    # Compute the regularized matrix
    reg_ridge = lambda_ridge * np.eye(n_features)
    reg_second_gradient = 4 * lambda_second_gradient * XTX_squared
    regularized_matrix = XTX + reg_ridge + reg_second_gradient

    # Compute X^T y
    XTy = X.T @ y

    # Compute the closed-form solution
    beta = np.linalg.solve(regularized_matrix, XTy)

    # Compute the loss
    residuals = y - X @ beta
    mse = np.mean(residuals ** 2)
    # NOTE(review): the second-gradient term penalizes sum(XTX**2), which is
    # independent of beta — kept as in the original loss definition.
    reg_term = (lambda_ridge * np.sum(beta ** 2)
                + lambda_second_gradient * np.sum(XTX ** 2))
    total_loss = mse + reg_term
    return beta, total_loss


# Example usage
np.random.seed(42)
n_samples, n_features = 100, 5
X = np.random.randn(n_samples, n_features)
true_beta = np.array([1, 2, 3, 4, 5])
y = X @ true_beta + np.random.randn(n_samples) * 0.1

beta_hat, loss = linear_regression_with_gradient_regularization(
    X, y, lambda_ridge=1, lambda_second_gradient=1e-4
)
print("Estimated coefficients:", beta_hat)
print("Total loss:", loss)

# Compare with OLS
beta_ols = np.linalg.solve(X.T @ X, X.T @ y)
print("\nOLS coefficients:", beta_ols)
def nlpredict(X, y, model, loss, popt, xnew, alpha=0.05, ub=1e-5, ef=1.05,
              interval_type="prediction"):
    """Prediction error for a nonlinear fit.

    Parameters
    ----------
    model : model function with signature model(x, ...)
    loss : loss function the model was fitted with, loss(...)
    popt : the optimized parameters
    xnew : x-values to predict at
    alpha : confidence level, 95% = 0.05
    ub : upper bound for smallest allowed Hessian eigenvalue
    ef : eigenvalue factor for scaling the Hessian
    interval_type : "prediction" (default) or "confidence"
        BUG FIX: the original referenced an undefined global
        ``interval_type``; it is now an explicit keyword argument.

    This function uses numdifftools (``nd``) for the Hessian and Jacobian.
    NOTE(review): ``nd`` must be imported by the surrounding module
    (``import numdifftools as nd``) — confirm it is in scope.

    Returns
    -------
    y, yint, se
    y : predicted values
    yint : prediction interval at alpha confidence interval
    se : standard error of prediction
    """
    from scipy.stats import t  # Student-t quantiles for the interval width

    ypred = model(xnew, *popt)

    hessp = nd.Hessian(lambda p: loss(*p))(popt)
    # Shift eigenvalues to make the Hessian better conditioned before
    # (pseudo-)inverting it.
    eps = max(ub, ef * np.linalg.eigvals(hessp).min())

    sse = loss(*popt)
    n = len(y)
    mse = sse / n

    I_fisher = np.linalg.pinv(hessp + np.eye(len(popt)) * eps)
    gprime = nd.Jacobian(lambda p: model(xnew, *p))(popt)
    temp = np.diag(gprime @ I_fisher @ gprime.T)

    if interval_type == "confidence":
        pass
    elif interval_type == "prediction":
        # The "1 +" comes in for the prediction interval, not for the
        # confidence interval:
        # https://online.stat.psu.edu/stat501/lesson/7/7.2
        temp = temp + 1

    # BUG FIX: the original computed sqrt(mse * (1 + temp)), adding 1 a
    # second time on top of the branch above.
    sigmas = np.sqrt(mse * temp)
    tval = t.ppf(1 - alpha / 2, len(y) - len(popt))
    return [
        ypred,
        np.array([
            ypred + tval * sigmas,
            ypred - tval * sigmas,
        ]).T,
        sigmas,
    ]