## Compare several sklearn pipelines on a held-out test set and report the best one.
from sklearn.pipeline import make_pipeline, Pipeline  ## just `Pipeline` involves naming each step
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

pipeline_lr = make_pipeline(StandardScaler(), LogisticRegression())

## more controlled way: name each step explicitly
pipeline_dt = Pipeline([('scaler', StandardScaler()),
                        ('classifier', DecisionTreeClassifier())])

## BUG FIX: the original referenced `pipeline_randomforest` without ever
## defining it; a random-forest pipeline is created here so the list is valid.
pipeline_randomforest = make_pipeline(StandardScaler(), RandomForestClassifier())

pipelines = [pipeline_lr, pipeline_dt, pipeline_randomforest]

## Dictionary of pipelines and classifier types for ease of reference
## (key 2 was missing in the original, so a random-forest win would KeyError)
pipe_dict = {0: 'Logistic Regression', 1: 'Decision Tree', 2: 'Random Forest'}

best_accuracy = 0.0
best_classifier = 0
best_pipeline = ""
for i, model in enumerate(pipelines):
    ## NOTE(review): the pipelines are scored without being fitted here —
    ## presumably a fit loop exists elsewhere; confirm before running standalone.
    accuracy = model.score(X_test, y_test)  # hoisted: original called score() twice
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        best_classifier = i
        best_pipeline = model
print('Classifier with best accuracy:{}'.format(pipe_dict[best_classifier]))
## Compute OLS coefficient standard errors "by hand" via the normal equations.
## Assumes X is a pandas DataFrame, y a one-column DataFrame, and `model` a
## fitted LinearRegression -- TODO confirm against the surrounding script.
N = len(X)
p = len(X.columns) + 1  ## plus one because LinearRegression adds an intercept term

## BUG FIX: `np.float` was deprecated in NumPy 1.20 and removed in 1.24;
## the builtin `float` is the documented replacement.
X_with_intercept = np.empty(shape=(N, p), dtype=float)
X_with_intercept[:, 0] = 1
X_with_intercept[:, 1:p] = X.values

## beta_hat = (X'X)^-1 X'y (normal equations)
beta_hat = np.linalg.inv(X_with_intercept.T @ X_with_intercept) @ X_with_intercept.T @ y.values
print(beta_hat)

y_hat = model.predict(X)
residuals = y.values - y_hat
residual_sum_of_squares = residuals.T @ residuals
## [0, 0] indexing assumes y is 2-D (one-column DataFrame) -- TODO confirm
sigma_squared_hat = residual_sum_of_squares[0, 0] / (N - p)
var_beta_hat = np.linalg.inv(X_with_intercept.T @ X_with_intercept) * sigma_squared_hat
for p_ in range(p):
    # diagonal entries of the covariance matrix are the coefficient variances
    standard_error = var_beta_hat[p_, p_] ** 0.5
    print(f"SE(beta_hat[{p_}]): {standard_error}")
import numpy as np
import pandas as pd
from scipy import stats


def get_conf_int(X, y, model, alpha=0.05):
    """Return (1 - alpha) two-sided confidence intervals for the coefficients
    of a fitted linear model as a pandas DataFrame.

    alpha = 0.05 gives a 95% CI; alpha = 0.01 gives a 99% CI.

    Parameters
    ----------
    X : pd.DataFrame -- training features (no intercept column).
    y : array-like -- training target.
    model : fitted estimator exposing ``intercept_``, ``coef_`` and ``predict``.
    alpha : float -- significance level.
    """
    ## BUG FIX: the original read the globals ``lr.intercept_`` / ``lr.coef_``
    ## instead of the ``model`` argument it was given.
    coefs = np.r_[[model.intercept_], model.coef_]
    X_aux = X.copy()
    X_aux.insert(0, 'const', 1)
    dof = -np.diff(X_aux.shape)[0]  # n_samples - n_params
    mse = np.sum((y - model.predict(X)) ** 2) / dof
    var_params = np.diag(np.linalg.inv(X_aux.T.dot(X_aux)))
    t_val = stats.t.isf(alpha / 2, dof)  # two-sided critical value
    gap = t_val * np.sqrt(mse * var_params)
    return pd.DataFrame({'lower': coefs - gap, 'upper': coefs + gap},
                        index=X_aux.columns)


if __name__ == "__main__":
    ## Demo moved under a main guard: the original ran at import time and
    ## mixed ``Y_train`` / ``y_train`` (NameError for one of them).
    from sklearn.linear_model import LinearRegression
    model = LinearRegression().fit(X_train, y_train)
    print(get_conf_int(X_train, y_train, model, alpha=0.05))
## demonstrate data normalization with sklearn
from sklearn.preprocessing import MinMaxScaler

## load data (placeholder -- replace with a real array/DataFrame)
data = ...

## create scaler, then fit and transform in one step
scaler = MinMaxScaler()
normalized = scaler.fit_transform(data)

## map the scaled values back to the original range
inverse = scaler.inverse_transform(normalized)
X and V denote the training and validation sets, respectively. "||" indicates a gap (parameter `n_gap: int > 0`) truncated at the beginning of the validation set, in order to prevent leakage effects.
from sklearn.base import RegressorMixin, TransformerMixin
from sklearn.utils.validation import check_is_fitted, check_array, check_X_y


class HybridBoostingRegressor(RegressorMixin, TransformerMixin):
    """Boosting-style ensemble: each model is fitted on the running residual
    (subclass-defined: subtraction or division) of the previous models.

    Parameters
    ----------
    kwargs : iterable of (name, model, features, target) tuples.
        ``features`` is a list of column names (empty = use all columns);
        ``target`` is a list naming an intermediate-target column of
        ``y_intermediate`` (empty = fit on the residual of ``y``).
    """

    def __init__(self, kwargs):
        self.names = []
        self.models = []
        self.features_list = []
        self.targets = []
        for name, model, features, target in kwargs:
            self.names.append(name)
            self.models.append(model)
            self.features_list.append(features)
            # a single-element target list collapses to its bare column name
            if len(target) == 1:
                self.targets.append(target[0])
            else:
                self.targets.append(target)

    def filter_X(self, X, features):
        """Restrict X to `features`; an empty list keeps all columns."""
        if len(features) > 0:
            X = X[features]
        return X

    def get_y(self, y, y_intermediate, target):
        """Pick the fitting target: the running residual, or an intermediate column."""
        if len(target) == 0:
            return y
        return y_intermediate[target]

    def fit(self, X, y, y_intermediate=None, **fit_params):
        """Fit each model in sequence on the residual of its predecessors."""
        if y_intermediate is None or y_intermediate.shape[0] == 0:
            for target in self.targets:
                if len(target) > 0:
                    # BUG FIX: the original *returned* an Exception instead of raising it
                    raise ValueError(f"y_intermediate not found for {target}")
        y_res = y.copy()
        # BUG FIX: the original called .copy() unconditionally, crashing when
        # y_intermediate is None (a valid state when no targets are named)
        y_intermediate_res = None if y_intermediate is None else y_intermediate.copy()
        self.fitted_models_ = []
        for model, features, target in zip(self.models, self.features_list, self.targets):
            # BUG FIX: use locals here -- the original clobbered X/y each
            # iteration, so later models saw already-filtered, array-converted
            # data and column selection by name failed
            X_i = self.filter_X(X.copy(), features)
            y_i = self.get_y(y_res, y_intermediate_res, target)
            # check that X and y have correct shape
            X_i, y_i = check_X_y(X_i, y_i)
            model.fit(X_i, y_i, **fit_params)
            self.fitted_models_.append(model)
            # original had an if/else with identical branches; one call suffices
            pred = model.predict(X_i)
            # accumulate the intermediate-target prediction, then update residual
            if len(target) != 0:
                y_intermediate_res[target] = self.accumulate(y_intermediate_res[target], pred)
            y_res = self.get_residual(y_res, pred)
        return self

    def predict(self, X):
        """Combine the fitted models' predictions via `accumulate`."""
        check_is_fitted(self, "fitted_models_")
        pred = self.pred_init
        for fitted_model, features, target in zip(self.fitted_models_,
                                                  self.features_list,
                                                  self.targets):
            X_i = self.filter_X(X.copy(), features)
            X_i = check_array(X_i)  # input validation
            pred = self.accumulate(pred, fitted_model.predict(X_i))
        return pred


class AdditiveHybridBoostingRegressor(HybridBoostingRegressor):
    """y_hat = f_1(x_1) + f_2(x_2) + ..."""

    def __init__(self, kwargs):
        super().__init__(kwargs)
        self.pred_init = 0  # additive identity

    def get_residual(self, y, pred):
        return y - pred

    def accumulate(self, y, pred):
        return y + pred


class MultiplicativeHybridBoostingRegressor(HybridBoostingRegressor):
    """y_hat = f_1(x_1) * f_2(x_2) * ..."""

    def __init__(self, kwargs):
        super().__init__(kwargs)
        self.pred_init = 1  # multiplicative identity

    def get_residual(self, y, pred):
        return y / pred

    def accumulate(self, y, pred):
        return y * pred
import numpy as np
from matplotlib import pyplot as plt
from scipy.cluster.hierarchy import dendrogram
from sklearn.cluster import FeatureAgglomeration


def plot_dendrogram(feature_clustering, names=None, title=None, **dendogram_params):
    """Build a scipy linkage matrix from a fitted agglomeration and plot it.

    Parameters
    ----------
    feature_clustering : fitted clustering exposing ``children_``,
        ``labels_`` and ``distances_`` (e.g. FeatureAgglomeration fitted
        with ``distance_threshold`` set).
    names : optional sequence used to label the leaves by index.
    title : optional plot title.
    **dendogram_params : forwarded to ``scipy.cluster.hierarchy.dendrogram``.
    """
    # count the original samples that sit under each internal merge node
    n_leaves = len(feature_clustering.labels_)
    counts = np.zeros(feature_clustering.children_.shape[0])
    for node_idx, merge in enumerate(feature_clustering.children_):
        total = 0
        for child in merge:
            if child < n_leaves:
                total += 1  # a leaf node
            else:
                total += counts[child - n_leaves]  # an earlier merge
        counts[node_idx] = total

    linkage_matrix = np.column_stack(
        [feature_clustering.children_, feature_clustering.distances_, counts]
    ).astype(float)

    leaf_labels = None if names is None else (lambda col_index: names[col_index])
    dendrogram(linkage_matrix, leaf_label_func=leaf_labels, **dendogram_params)
    if title is not None:
        plt.title(title)
    plt.show()
## Turn a feature-similarity matrix into distances and cluster the features.
matrix_similarity = mutual_information  # df.corr() or df.corr().abs()
matrix_distance = np.sqrt(2 * (1 - matrix_similarity))  # matrix_similarity.abs()

feature_clustering = FeatureAgglomeration(
    distance_threshold=0,
    n_clusters=None,
    metric="precomputed",
    linkage="average",
)
feature_clustering.fit(matrix_distance)

# plot the top three levels of the dendrogram
plot_dendrogram(
    feature_clustering,
    matrix_distance.columns,
    title="Mutual Information",
    orientation="right",
    truncate_mode="level",
)
import numpy as np
import pandas as pd


class TransformedTargetRegressorBC:
    """TransformedTargetRegressor with Box-Cox back-transform bias correction.

    Wraps sklearn's ``TransformedTargetRegressor``; after fitting, predictions
    are back-transformed with Hyndman's bias correction using the fitted
    Box-Cox lambda and the residual standard deviation on the transformed scale.
    """

    def __init__(self, **init_params):
        # imported lazily so this module stays importable without sklearn
        from sklearn.compose import TransformedTargetRegressor
        self.estimator = TransformedTargetRegressor(**init_params)

    def bias_correction(self, y, lam, sig):
        """Back-transform Box-Cox values with bias correction.

        See https://robjhyndman.com/hyndsight/backtransforming
        """
        # BUG FIX: the original signature was missing ``self``, so the
        # 3-argument call in predict() shifted every argument by one
        if lam == 0:
            return np.exp(y) * (1 + 0.5 * sig ** 2)
        res = np.power(lam * y + 1., 1 / lam)
        res *= (1 + 0.5 * sig ** 2 * (1 - lam) / (lam * y + 1.) ** 2)
        return res

    def fit(self, X, y, **fit_params):
        """Fit the wrapped estimator and estimate lambda and residual sigma."""
        from sklearn.base import clone  # BUG FIX: clone was never imported
        self.estimator_ = clone(self.estimator)
        self.estimator_.fit(X, y, **fit_params)
        # BUG FIX: the original read the globals model_trans / X_train /
        # y_train instead of the fitted estimator and the fit() arguments
        self.lam_ = self.estimator_.transformer_.lambdas_[0]
        # standard deviation of the transformed target's residuals
        z_train = self.estimator_.transformer_.transform(np.asarray(y)[:, np.newaxis]).ravel()
        z_predict = self.estimator_.regressor_.predict(X)
        sig = np.sum((z_train - z_predict) ** 2)
        # degrees of freedom: n - n_coefficients - 1 (linear regressor assumed)
        self.sig_ = np.sqrt(sig / (len(y) - len(self.estimator_.regressor_.coef_) - 1))
        return self

    def predict(self, X):
        """Predict on the original scale with bias correction applied."""
        return self.bias_correction(self.estimator_.predict(X), self.lam_, self.sig_)


if __name__ == "__main__":
    ## demo moved under a main guard; the original built this at import time
    ## using LinearRegression / PowerTransformer that were never imported
    from sklearn.compose import TransformedTargetRegressor
    from sklearn.linear_model import LinearRegression
    from sklearn.preprocessing import PowerTransformer
    model = TransformedTargetRegressor(
        regressor=LinearRegression(),
        transformer=PowerTransformer(method='box-cox', standardize=False),
    )
class IRWLS(BaseEstimator, RegressorMixin):
    """Iteratively Reweighted Least Squares meta-estimator.

    Repeatedly refits ``estimator`` with sample weights
    ``1 / max(delta, residual)`` until the coefficients or the reduced loss
    stop changing, or ``max_iter_irwls`` is reached.
    """

    def __init__(self, estimator, get_residual_=None, max_iter_irwls=10,
                 delta=1e-3, coef_delta_tol=1e-3, loss_delta_tol=1e-3,
                 loss_reduction=None, **init_params):
        ## BUG FIX: ``get_residual_`` had no default, so this file's own
        ## ``IRWLS(Ridge)`` call raised TypeError; None now selects the L1
        ## residual, matching the documented intent of the is-not-None check.
        self.estimator = estimator(**init_params)
        self.get_residual_ = (get_residual_ if get_residual_ is not None
                              else self.get_residual_l1_)
        self.max_iter_irwls = max_iter_irwls
        self.delta = delta
        self.coef_delta_tol = coef_delta_tol
        self.loss_delta_tol = loss_delta_tol
        self.loss_reduction = (loss_reduction if loss_reduction is not None
                               else np.median)
        self.trace = []  # per-iteration loss history

    def get_residual_l1_(self, y_true, y_pred):
        """Default residual: absolute error."""
        return np.abs(y_true - y_pred)

    def get_sample_weight_(self, X, y, estimator):
        ## the ``delta`` floor ensures that the weight does not explode
        return 1 / np.maximum(self.delta,
                              self.get_residual_(y, estimator.predict(X)))

    def get_delta_(self, old, new):
        """Median absolute change between two coefficient vectors."""
        return np.median(np.abs(new - old))

    def fit(self, X, y, sample_weight=None, **fit_params):
        """Run IRWLS; stores the fitted inner estimator as ``estimator_``."""
        self.estimator_ = clone(self.estimator)
        sample_weight_new = (sample_weight if sample_weight is not None
                             else np.ones(X.shape[0]))
        for iteration in range(self.max_iter_irwls):
            ## BUG FIX: the original tested the undefined name ``i``
            ## (NameError on the first iteration); the loop variable is
            ## ``iteration``
            if iteration > 0:
                coef_old = coef_new
                loss_old = loss_new
            self.estimator_.fit(X, y, sample_weight_new, **fit_params)
            coef_new = self.estimator_.coef_
            sample_weight_new = self.get_sample_weight_(X, y, self.estimator_)
            loss_new = self.loss_reduction(
                self.get_residual_(y, self.estimator_.predict(X)))
            self.trace.append({'iteration': iteration, "loss": loss_new})
            if iteration > 0:
                coef_delta = self.get_delta_(coef_old, coef_new)
                loss_delta = loss_new - loss_old
                if (coef_delta < self.coef_delta_tol) or (loss_delta < self.loss_delta_tol):
                    break
        ## BUG FIX: fit() must return self per the sklearn estimator contract
        return self

    def predict(self, X, y=None):
        return self.estimator_.predict(X)


irwls = IRWLS(Ridge)
irwls.fit(X, y)
irwls.predict(X)