Source code for datawaza.model

# model.py – Model module of Datawaza
#
# Datawaza  Copyright (C) 2024  Jim Beno
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details:
# https://github.com/jbeno/datawaza/blob/main/LICENSE
"""
This module provides tools to streamline data modeling workflows. It contains
functions to set up pipelines, iterate over models, and evaluate and plot results.

Functions:
    - :func:`~datawaza.model.compare_models` - Find the best classification model and hyper-parameters for a dataset.
    - :func:`~datawaza.model.create_nn_binary` - Create a binary classification neural network model.
    - :func:`~datawaza.model.create_nn_multi` - Create a multi-class classification neural network model.
    - :func:`~datawaza.model.create_pipeline` - Create a custom pipeline for data preprocessing and modeling.
    - :func:`~datawaza.model.create_results_df` - Initialize the results_df DataFrame with the columns required for `iterate_model`.
    - :func:`~datawaza.model.eval_model` - Produce a detailed evaluation report for a classification model.
    - :func:`~datawaza.model.iterate_model` - Iterate and evaluate a model pipeline with specified parameters.
    - :func:`~datawaza.model.plot_acf_residuals` - Plot residuals, histogram, ACF, and PACF of a time series ARIMA model.
    - :func:`~datawaza.model.plot_results` - Plot the results of model iterations and select the best metric.
    - :func:`~datawaza.model.plot_train_history` - Plot the training and validation history of a fitted Keras model.
"""

# Metadata
# Module-level authorship, contact, version, and license information, stored in
# the conventional dunder attributes so it can be read from the imported module.
__author__ = "Jim Beno"
__email__ = "jim@jimbeno.net"
__version__ = "0.1.3"  # NOTE(review): presumably mirrors the package release number - confirm it is kept in sync
__license__ = "GNU GPLv3"  # Full license text is linked from the header comment at the top of this file

# Standard library imports
import os
from datetime import datetime
import time
import math

# Data manipulation and analysis
import numpy as np
import pandas as pd
import pytz

# Visualization libraries
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
import seaborn as sns

# Scikit-learn imports
from sklearn.compose import ColumnTransformer, TransformedTargetRegressor
from sklearn.ensemble import (AdaBoostClassifier, AdaBoostRegressor, BaggingClassifier, BaggingRegressor,
                              GradientBoostingClassifier, GradientBoostingRegressor, RandomForestClassifier,
                              RandomForestRegressor, VotingRegressor)
from sklearn.feature_selection import RFE, SequentialFeatureSelector
from sklearn.impute import KNNImputer, SimpleImputer
from sklearn.linear_model import (Lasso, LinearRegression, LogisticRegression, Ridge)
from sklearn.metrics import (mean_absolute_error, mean_squared_error, confusion_matrix, classification_report,
                             ConfusionMatrixDisplay, RocCurveDisplay, roc_curve, precision_recall_curve, PrecisionRecallDisplay,
                             roc_auc_score, make_scorer, precision_score, recall_score, f1_score, accuracy_score)
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, KFold, cross_val_score, train_test_split
from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import (FunctionTransformer, MinMaxScaler, OneHotEncoder, OrdinalEncoder,
                                   PolynomialFeatures, RobustScaler, StandardScaler)
from sklearn.svm import SVC, SVR
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor

# XGBoost
from xgboost import XGBClassifier

# Statsmodels
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

# Imbalanced learn - Package: imbalanced-learn
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler

# Miscellaneous imports
from joblib import dump

# Local Datawaza helper function imports
from datawaza.tools import calc_pfi, calc_vif, extract_coef, log_transform, thousands, DebugPrinter, model_summary

# Typing imports
from typing import Optional, Union, Tuple, List, Dict, Any

# TensorFlow and Keras
# NOTE: keep the environment assignment ahead of the TensorFlow import below.
# Its purpose is to quiet log output emitted while TensorFlow is being imported,
# so moving it after the import (e.g. via an import sorter) would defeat it.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # Suppress TensorFlow warning on import
import tensorflow as tf
import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Input
from keras.regularizers import L2

# Functions
[docs] def compare_models( x: pd.DataFrame, y: pd.Series, models: List[str], config: Dict[str, Any], class_map: Optional[Dict[Any, Any]] = None, pos_label: Optional[Any] = None, test_size: float = 0.25, search_type: str = 'grid', grid_cv: Union[int, str] = 5, plot_perf: bool = False, scorer: str = 'accuracy', random_state: int = 42, decimal: int = 4, verbose: int = 4, title: Optional[str] = None, fig_size: Tuple[int, int] = (12, 6), figmulti: float = 1.5, multi_class: str = 'ovr', average: str = None, legend_loc: str = 'best', model_eval: bool = False, svm_proba: bool = False, threshold: float = 0.5, class_weight: Optional[Dict[Any, float]] = None, stratify: Optional[pd.Series] = None, imputer: Optional[str] = None, impute_first: bool = True, transformers: Optional[List[str]] = None, scaler: Optional[str] = None, selector: Optional[str] = None, cat_columns: Optional[List[str]] = None, num_columns: Optional[List[str]] = None, max_iter: int = 10000, rotation: Optional[int] = None, plot_curve: bool = True, under_sample: Optional[float] = None, over_sample: Optional[float] = None, notes: Optional[str] = None, svm_knn_resample: Optional[float] = None, n_jobs: Optional[int] = None, output: bool = True, timezone: str = 'UTC', debug: bool = False ) -> pd.DataFrame: """ Find the best classification model and hyper-parameters for a dataset by automating the workflow for multiple models and comparing results. This function integrates a number of steps in a typical classification model workflow, and it does this for multiple models, all with one command line: * Auto-detecting single vs. 
multi-class classification problems * Option to Under-sample or Over-sample imbalanced data, * Option to use a sub-sample of data for SVC or KNN, which can be computationally intensive * Ability to split the Train/Test data at a specified ratio, * Creation of a multiple-step Pipeline, including Imputation, multiple Column Transformer/Encoding steps, Scaling, Feature selection, and the Model, * Grid Search of hyper-parameters, either full or random, * Calculating performance metrics from the standard Classification Report (Accuracy, Precision, Recall, F1) but also with ROC AUC, and if binary, True Positive Rate, True Negative Rate, False Positive Rate, False Negative Rate, * Evaluating this performance based on a customizable Threshold, * Visually showing performance by plotting (a) a Confusion Matrix, and if binary, (b) a Histogram of Predicted Probabilities, (c) an ROC Curve, and (d) a Precision-Recall Curve. * Save all the results in a DataFrame for reference and comparison, and * Option to plot the results to visually compare performance of the specified metric across multiple model pipelines with their best parameters. To use this function, a configuration should be created that defines the desired model configurations and parameters you want to search. When `compare_models` is run, for each model in the `models` parameter, the `create_pipeline` function will be called to create a pipeline from the specified parameters. Each model iteration will have the same pipeline construction, except for the final model, which will vary. Here are the major pipeline parameters, along with the config sections they map to: * `imputer` (str) is selected from `config['imputers']` * `transformers` (list or str) are selected from `config['transformers']` * `scaler` (str) is selected from `config['scalers']` * `selector` (str) is selected from `config['selectors']` * `models` (list or str) are selected from `config['models']` Here is an example of the configuration dictionary structure. 
It is based on what `create_pipeline` requires to assemble the pipeline. But it adds some additional configuration parameters referenced by `compare_models`, which are `params` (grid search parameters, required) and `cv` (cross-validation parameters, optional if `grid_cv` is an integer). The configuration dictionary is passed to `compare_models` as the `config` parameter: >>> config = { # doctest: +SKIP ... 'models' : { ... 'logreg': LogisticRegression(max_iter=max_iter, ... random_state=random_state, class_weight=class_weight), ... 'knn_class': KNeighborsClassifier(), ... 'tree_class': DecisionTreeClassifier(random_state=random_state, ... class_weight=class_weight) ... }, ... 'imputers': { ... 'simple_imputer': SimpleImputer() ... }, ... 'transformers': { ... 'ohe': (OneHotEncoder(drop='if_binary', handle_unknown='ignore'), ... ohe_columns) ... }, ... 'scalers': { ... 'stand': StandardScaler() ... }, ... 'selectors': { ... 'sfs_logreg': SequentialFeatureSelector(LogisticRegression( ... max_iter=max_iter, random_state=random_state, ... class_weight=class_weight)) ... }, ... 'params' : { ... 'logreg': { ... 'logreg__C': [0.0001, 0.001, 0.01, 0.1, 1, 10, 100], ... 'logreg__solver': ['newton-cg', 'lbfgs', 'saga'] ... }, ... 'knn_class': { ... 'knn_class__n_neighbors': [3, 5, 10, 15, 20, 25], ... 'knn_class__weights': ['uniform', 'distance'], ... 'knn_class__metric': ['euclidean', 'manhattan'] ... }, ... 'tree_class': { ... 'tree_class__max_depth': [3, 5, 7], ... 'tree_class__min_samples_split': [5, 10, 15], ... 'tree_class__criterion': ['gini', 'entropy'], ... 'tree_class__min_samples_leaf': [2, 4, 6] ... }, ... }, ... 'cv': { ... 'kfold_5': KFold(n_splits=5, shuffle=True, random_state=42) ... }, ... 'no_scale': ['tree_class'], ... 'no_poly': ['knn_class', 'tree_class'] ... } In addition to the configuration file, you will need to define any column lists if you want to target certain transformations to a subset of columns. 
For example, you might define a 'ohe' transformer for One-Hot Encoding, and reference 'ohe_columns' or 'cat_columns' in its definition in the config. Here is an example of how to call this function in an organized manner: >>> results_df = dw.compare_models( # doctest: +SKIP ... ... # Data split and sampling ... x=X, y=y, test_size=0.25, stratify=None, under_sample=None, ... over_sample=None, svm_knn_resample=None, ... ... # Models and pipeline steps ... imputer=None, transformers=None, scaler='stand', selector=None, ... models=['logreg', 'knn_class', 'svm_proba', 'tree_class', ... 'forest_class', 'xgb_class', 'keras_class'], svm_proba=True, ... ... # Grid search ... search_type='random', scorer='accuracy', grid_cv='kfold_5', verbose=4, ... ... # Model evaluation and charts ... model_eval=True, plot_perf=True, plot_curve=True, fig_size=(12,6), ... legend_loc='lower left', rotation=45, threshold=0.5, ... class_map=class_map, pos_label=1, title='Breast Cancer', ... ... # Config, preferences and notes ... config=my_config, class_weight=None, random_state=42, decimal=4, ... n_jobs=None, debug=False, notes='Test Size=0.25, Threshold=0.50' ... ) Use this function when you want to find the best classification model and hyper-parameters for a dataset, after doing any required pre-processing or cleaning. It is a significant time saver, replacing numerous manual coding steps with one command. Parameters ---------- x : pd.DataFrame The feature matrix. y : pd.Series The target vector. test_size : float, optional (default=0.25) The proportion of the dataset to include in the test split. models : List[str] A list of model names to iterate over. config : Dict[str, Any] A configuration dictionary that defines the pipeline steps, models, grid search parameters, and cross-validation functions. It should have the following keys: 'imputers', 'transformers', 'scalers', 'selectors', 'models', 'params', 'cv', 'no_scale', and 'no_poly'. 
class_map : Dict[Any, Any], optional (default=None) A dictionary to map class labels to new values. search_type : str, optional (default='grid') The type of hyperparameter search to perform. Can be either 'grid' for GridSearchCV or 'random' for RandomizedSearchCV. grid_cv : Union[int, str], optional (default=5) The number of cross-validation folds for GridSearchCV or RandomizedSearchCV, or a string to select a cross-validation function from config['cv']. Default is 5. plot_perf : bool, optional (default=False) Whether to plot the model performance. scorer : str, optional (default='accuracy') The scorer to use for model evaluation. pos_label : Any, optional (default=None) The positive class label. random_state : int, optional (default=42) The random state for reproducibility. decimal : int, optional (default=4) The number of decimal places to round the results to. verbose : int, optional (default=4) The verbosity level for the search. title : str, optional (default=None) The title for the plots. fig_size : Tuple[int, int], optional (default=(12, 6)) The figure size for the plots. figmulti : float, optional (default=1.5) The multiplier for the figure size in multi-class classification. multi_class : str, optional The method for handling multi-class ROC AUC calculation. Can be 'ovr' (one-vs-rest) or 'ovo' (one-vs-one). Default is 'ovr'. average : str, optional The averaging method for multi-class classification metrics. Can be 'macro', 'micro', 'weighted', or 'samples'. Default is 'macro'. legend_loc : str, optional (default='best') The location of the legend in the plots. model_eval : bool, optional (default=False) Whether to perform a detailed model evaluation. svm_proba : bool, optional (default=False) Whether to enable probability estimates for SVC. threshold : float, optional (default=0.5) The classification threshold for binary classification. class_weight : Dict[Any, float], optional (default=None) The class weights for balancing imbalanced classes. 
stratify : pd.Series, optional (default=None) The stratification variable for train-test split. imputer : str, optional (default=None) The imputation strategy. impute_first : bool, optional (default=True) Whether to impute before other preprocessing steps. transformers : List[str], optional (default=None) A list of transformers to apply. scaler : str, optional (default=None) The scaling strategy. selector : str, optional (default=None) The feature selection strategy. config : Dict[str, Any], optional (default=None) A configuration dictionary for customizing the pipeline. cat_columns : List[str], optional (default=None) A list of categorical columns in X. num_columns : List[str], optional (default=None) A list of numerical columns in X. max_iter : int, optional (default=10000) The maximum number of iterations for the solvers. rotation : int, optional (default=None) The rotation angle for the x-axis labels in the plots. plot_curve : bool, optional (default=True) Whether to plot the learning curve for KerasClassifier. under_sample : float, optional (default=None) The under-sampling ratio. over_sample : float, optional (default=None) The over-sampling ratio. notes : str, optional (default=None) Additional notes or comments. svm_knn_resample : float, optional (default=None) The resampling ratio for SVC and KNeighborsClassifier. n_jobs : int, optional (default=None) The number of parallel jobs to run. output : bool, optional (default=True) Whether to print the progress and results. timezone : str, optional Timezone to be used for timestamps. Default is 'UTC'. debug : bool, optional Flag to show debugging information. Returns ------- pd.DataFrame A DataFrame containing the performance metrics and other details for each model. 
Examples -------- Prepare the data for the examples: >>> pd.set_option('display.max_columns', None) # For test consistency >>> pd.set_option('display.width', None) # For test consistency >>> from sklearn.datasets import make_classification >>> X, y = make_classification(n_samples=1000, n_classes=2, n_features=20, ... weights=[0.4, 0.6], random_state=42) >>> X = pd.DataFrame(X, columns=[f'Feature_{i+1}' for i in range(X.shape[1])]) >>> y = pd.Series(y, name='Target') >>> class_map = {0: 'Malignant', 1: 'Benign'} Example 1: Define the configuration for the models: >>> # Set some variables referenced in the config >>> random_state = 42 >>> class_weight = None >>> max_iter = 10000 >>> >>> # Set column lists referenced in the config >>> num_columns = list(X.columns) >>> cat_columns = [] >>> >>> # Create a custom configuration file with 3 models and grid search params >>> my_config = { ... 'models' : { ... 'logreg': LogisticRegression(max_iter=max_iter, ... random_state=random_state, class_weight=class_weight), ... 'knn_class': KNeighborsClassifier(), ... 'tree_class': DecisionTreeClassifier(random_state=random_state, ... class_weight=class_weight), ... 'svm_proba': SVC(random_state=random_state, probability=True, ... class_weight=class_weight), ... }, ... 'imputers': { ... 'simple_imputer': SimpleImputer() ... }, ... 'transformers': { ... 'ohe': (OneHotEncoder(drop='if_binary', handle_unknown='ignore'), ... cat_columns), ... 'poly2': (PolynomialFeatures(degree=2, include_bias=False), num_columns) ... }, ... 'scalers': { ... 'stand': StandardScaler() ... }, ... 'selectors': { ... 'sfs_logreg': SequentialFeatureSelector(LogisticRegression( ... max_iter=max_iter, random_state=random_state, ... class_weight=class_weight)) ... }, ... 'params' : { ... 'logreg': { ... 'logreg__C': [0.0001, 0.001, 0.01, 0.1, 1, 10, 100], ... 'logreg__solver': ['newton-cg', 'lbfgs', 'saga'] ... }, ... 'knn_class': { ... 'knn_class__n_neighbors': [3, 5, 10, 15, 20, 25], ... 
'knn_class__weights': ['uniform', 'distance'], ... 'knn_class__metric': ['euclidean', 'manhattan'] ... }, ... 'tree_class': { ... 'tree_class__max_depth': [3, 5, 7], ... 'tree_class__min_samples_split': [5, 10, 15], ... 'tree_class__criterion': ['gini', 'entropy'], ... 'tree_class__min_samples_leaf': [2, 4, 6] ... }, ... 'svm_proba': { ... 'svm_proba__C': [0.01, 0.1, 1, 10, 100], ... 'svm_proba__kernel': ['linear', 'poly'] ... }, ... }, ... 'cv': { ... 'kfold_5': KFold(n_splits=5, shuffle=True, random_state=42) ... }, ... 'no_scale': ['tree_class'], ... 'no_poly': ['knn_class', 'tree_class'] ... } Example 1: Compare models with default parameters: >>> results_df = compare_models( ... ... # Data split and sampling ... x=X, y=y, test_size=0.25, stratify=None, under_sample=None, ... over_sample=None, svm_knn_resample=None, ... ... # Models and pipeline steps ... imputer=None, transformers=None, scaler='stand', selector=None, ... models=['logreg', 'knn_class', 'tree_class'], svm_proba=True, ... ... # Grid search ... search_type='random', scorer='accuracy', grid_cv='kfold_5', verbose=1, ... ... # Model evaluation and charts ... model_eval=True, plot_perf=True, plot_curve=True, fig_size=(12,6), ... legend_loc='lower left', rotation=45, threshold=0.5, ... class_map=class_map, pos_label=1, title='Breast Cancer', ... ... # Config, preferences and notes ... config=my_config, class_weight=None, random_state=42, decimal=2, ... n_jobs=None, notes='Test Size=0.25, Threshold=0.50' ... ) #doctest: +NORMALIZE_WHITESPACE, +ELLIPSIS <BLANKLINE> ----------------------------------------------------------------------------------------- Starting Data Processing - ... 
UTC ----------------------------------------------------------------------------------------- <BLANKLINE> Classification type detected: binary Unique values in y: [0 1] <BLANKLINE> Train/Test split, test_size: 0.25 X_train, X_test, y_train, y_test shapes: (750, 20) (250, 20) (750,) (250,) <BLANKLINE> ----------------------------------------------------------------------------------------- 1/3: Starting LogisticRegression Random Search - ... UTC ----------------------------------------------------------------------------------------- <BLANKLINE> Fitting 5 folds for each of 10 candidates, totalling 50 fits <BLANKLINE> Total Time: ... seconds Average Fit Time: ... seconds Inference Time: ... Best CV Accuracy Score: 0.88 Train Accuracy Score: 0.89 Test Accuracy Score: 0.86 Overfit: Yes Overfit Difference: 0.03 Best Parameters: {'logreg__solver': 'saga', 'logreg__C': 0.1} <BLANKLINE> LogisticRegression Binary Classification Report <BLANKLINE> precision recall f1-score support <BLANKLINE> Malignant 0.81 0.82 0.81 92 Benign 0.89 0.89 0.89 158 <BLANKLINE> accuracy 0.86 250 macro avg 0.85 0.85 0.85 250 weighted avg 0.86 0.86 0.86 250 <BLANKLINE> ROC AUC: 0.92 <BLANKLINE> Predicted:0 1 Actual: 0 75 17 Actual: 1 18 140 <BLANKLINE> True Positive Rate / Sensitivity: 0.89 True Negative Rate / Specificity: 0.82 False Positive Rate / Fall-out: 0.18 False Negative Rate / Miss Rate: 0.11 <BLANKLINE> Positive Class: Benign (1) Threshold: 0.5 <BLANKLINE> ----------------------------------------------------------------------------------------- 2/3: Starting KNeighborsClassifier Random Search - ... UTC ----------------------------------------------------------------------------------------- <BLANKLINE> Fitting 5 folds for each of 10 candidates, totalling 50 fits <BLANKLINE> Total Time: ... seconds Average Fit Time: ... seconds Inference Time: ... 
Best CV Accuracy Score: 0.86 Train Accuracy Score: 1.00 Test Accuracy Score: 0.84 Overfit: Yes Overfit Difference: 0.16 Best Parameters: {'knn_class__weights': 'distance', 'knn_class__n_neighbors': 20, 'knn_class__metric': 'manhattan'} <BLANKLINE> KNeighborsClassifier Binary Classification Report <BLANKLINE> precision recall f1-score support <BLANKLINE> Malignant 0.75 0.84 0.79 92 Benign 0.90 0.84 0.87 158 <BLANKLINE> accuracy 0.84 250 macro avg 0.82 0.84 0.83 250 weighted avg 0.84 0.84 0.84 250 <BLANKLINE> ROC AUC: 0.91 <BLANKLINE> Predicted:0 1 Actual: 0 77 15 Actual: 1 26 132 <BLANKLINE> True Positive Rate / Sensitivity: 0.84 True Negative Rate / Specificity: 0.84 False Positive Rate / Fall-out: 0.16 False Negative Rate / Miss Rate: 0.16 <BLANKLINE> Positive Class: Benign (1) Threshold: 0.5 <BLANKLINE> ----------------------------------------------------------------------------------------- 3/3: Starting DecisionTreeClassifier Random Search - ... UTC ----------------------------------------------------------------------------------------- <BLANKLINE> Fitting 5 folds for each of 10 candidates, totalling 50 fits <BLANKLINE> Total Time: ... seconds Average Fit Time: ... seconds Inference Time: ... 
Best CV Accuracy Score: 0.88 Train Accuracy Score: 0.93 Test Accuracy Score: 0.86 Overfit: Yes Overfit Difference: 0.08 Best Parameters: {'tree_class__min_samples_split': 15, 'tree_class__min_samples_leaf': 6, 'tree_class__max_depth': 5, 'tree_class__criterion': 'entropy'} <BLANKLINE> DecisionTreeClassifier Binary Classification Report <BLANKLINE> precision recall f1-score support <BLANKLINE> Malignant 0.76 0.89 0.82 92 Benign 0.93 0.84 0.88 158 <BLANKLINE> accuracy 0.86 250 macro avg 0.84 0.86 0.85 250 weighted avg 0.87 0.86 0.86 250 <BLANKLINE> ROC AUC: 0.92 <BLANKLINE> Predicted:0 1 Actual: 0 82 10 Actual: 1 26 132 <BLANKLINE> True Positive Rate / Sensitivity: 0.84 True Negative Rate / Specificity: 0.89 False Positive Rate / Fall-out: 0.11 False Negative Rate / Miss Rate: 0.16 <BLANKLINE> Positive Class: Benign (1) Threshold: 0.5 >>> results_df.head() #doctest: +NORMALIZE_WHITESPACE, +ELLIPSIS Model Test Size Over Sample Under Sample Resample Total Fit Time Fit Count Average Fit Time Inference Time Grid Scorer Best Params Best CV Score Train Score Test Score Overfit Overfit Difference Train Accuracy Score Test Accuracy Score Train Precision Score Test Precision Score Train Recall Score Test Recall Score Train F1 Score Test F1 Score Train ROC AUC Score Test ROC AUC Score Threshold True Positives False Positives True Negatives False Negatives TPR FPR TNR FNR False Rate Pipeline Notes Timestamp 0 LogisticRegression 0.25 None None None ... 50 ... Accuracy {'logreg__solver': 'saga', 'logreg__C': 0.1} 0.877333 0.888 0.860 Yes 0.028 0.888 0.860 0.903153 0.891720 0.907240 0.886076 0.905192 0.888889 0.935388 0.922675 0.5 140 17 75 18 0.886076 0.184783 0.815217 0.113924 0.298707 [stand, logreg] Test Size=0.25, Threshold=0.50... 1 KNeighborsClassifier 0.25 None None None ... 50 ... Accuracy {'knn_class__weights': 'distance', 'knn_class_... 
0.861333 1.000 0.836 Yes 0.164 1.000 0.836 1.000000 0.897959 1.000000 0.835443 1.000000 0.865574 1.000000 0.911805 0.5 132 15 77 26 0.835443 0.163043 0.836957 0.164557 0.327600 [stand, knn_class] Test Size=0.25, Threshold=0.50... 2 DecisionTreeClassifier 0.25 None None None ... 50 ... Accuracy {'tree_class__min_samples_split': 15, 'tree_cl... 0.882667 0.932 0.856 Yes 0.076 0.932 0.856 0.955711 0.929577 0.927602 0.835443 0.941447 0.880000 0.974926 0.919889 0.5 132 10 82 26 0.835443 0.108696 0.891304 0.164557 0.273253 [tree_class] Test Size=0.25, Threshold=0.50... Example 2: Compare models with more pipeline steps, stratification, under sampling, and resampling for SVM, with SVM probabilities enabled: >>> results_df = compare_models( ... ... # Data split and sampling ... x=X, y=y, test_size=0.25, stratify=y, under_sample=0.8, ... over_sample=None, svm_knn_resample=0.2, ... ... # Models and pipeline steps ... imputer='simple_imputer', transformers=None, scaler='stand', selector=None, ... models=['logreg', 'svm_proba'], svm_proba=True, ... ... # Grid search ... search_type='random', scorer='accuracy', grid_cv='kfold_5', verbose=1, ... ... # Model evaluation and charts ... model_eval=True, plot_perf=True, plot_curve=True, fig_size=(12,6), ... legend_loc='lower left', rotation=45, threshold=0.5, ... class_map=class_map, pos_label=1, title='Breast Cancer', ... ... # Config, preferences and notes ... config=my_config, class_weight=None, random_state=42, decimal=2, ... n_jobs=None, notes='Test Size=0.25, Threshold=0.50' ... ) #doctest: +NORMALIZE_WHITESPACE, +ELLIPSIS <BLANKLINE> ----------------------------------------------------------------------------------------- Starting Data Processing - ... 
UTC ----------------------------------------------------------------------------------------- <BLANKLINE> Classification type detected: binary Unique values in y: [0 1] <BLANKLINE> Train/Test split, test_size: 0.25 X_train, X_test, y_train, y_test shapes: (750, 20) (250, 20) (750,) (250,) <BLANKLINE> Undersampling via RandomUnderSampler strategy: 0.8 X_train, y_train shapes before: (750, 20) (750,) y_train value counts before: Target 1 450 0 300 Name: count, dtype: int64 Running RandomUnderSampler on X_train, y_train... X_train, y_train shapes after: (675, 20) (675,) y_train value counts after: Target 1 375 0 300 Name: count, dtype: int64 <BLANKLINE> ----------------------------------------------------------------------------------------- 1/2: Starting LogisticRegression Random Search - ... UTC ----------------------------------------------------------------------------------------- <BLANKLINE> Fitting 5 folds for each of 10 candidates, totalling 50 fits <BLANKLINE> Total Time: ... seconds Average Fit Time: ... seconds Inference Time: ... Best CV Accuracy Score: 0.87 Train Accuracy Score: 0.88 Test Accuracy Score: 0.86 Overfit: Yes Overfit Difference: 0.01 Best Parameters: {'logreg__solver': 'saga', 'logreg__C': 0.1} <BLANKLINE> LogisticRegression Binary Classification Report <BLANKLINE> precision recall f1-score support <BLANKLINE> Malignant 0.84 0.82 0.83 100 Benign 0.88 0.89 0.89 150 <BLANKLINE> accuracy 0.86 250 macro avg 0.86 0.86 0.86 250 weighted avg 0.86 0.86 0.86 250 <BLANKLINE> ROC AUC: 0.92 <BLANKLINE> Predicted:0 1 Actual: 0 82 18 Actual: 1 16 134 <BLANKLINE> True Positive Rate / Sensitivity: 0.89 True Negative Rate / Specificity: 0.82 False Positive Rate / Fall-out: 0.18 False Negative Rate / Miss Rate: 0.11 <BLANKLINE> Positive Class: Benign (1) Threshold: 0.5 <BLANKLINE> ----------------------------------------------------------------------------------------- 2/2: Starting SVC Random Search - ... 
UTC ----------------------------------------------------------------------------------------- <BLANKLINE> Training data resampled to 20.0% of original for KNN and SVM speed improvement X_train, y_train shapes after: (135, 20) (135,) y_train value counts after: Target 1 75 0 60 Name: count, dtype: int64 <BLANKLINE> Fitting 5 folds for each of 10 candidates, totalling 50 fits <BLANKLINE> Total Time: ... seconds Average Fit Time: ... seconds Inference Time: ... Best CV Accuracy Score: 0.87 Train Accuracy Score: 0.90 Test Accuracy Score: 0.86 Overfit: Yes Overfit Difference: 0.05 Best Parameters: {'svm_proba__kernel': 'linear', 'svm_proba__C': 0.01} <BLANKLINE> SVC Binary Classification Report <BLANKLINE> precision recall f1-score support <BLANKLINE> Malignant 0.83 0.85 0.84 100 Benign 0.90 0.88 0.89 150 <BLANKLINE> accuracy 0.87 250 macro avg 0.86 0.86 0.86 250 weighted avg 0.87 0.87 0.87 250 <BLANKLINE> ROC AUC: 0.92 <BLANKLINE> Predicted:0 1 Actual: 0 85 15 Actual: 1 18 132 <BLANKLINE> True Positive Rate / Sensitivity: 0.88 True Negative Rate / Specificity: 0.85 False Positive Rate / Fall-out: 0.15 False Negative Rate / Miss Rate: 0.12 <BLANKLINE> Positive Class: Benign (1) Threshold: 0.5 """ # Initialize debugging, controlled via 'debug' parameter db = DebugPrinter(debug = debug) db.print('-' * 40) db.print('START compare_models') db.print('-' * 40, '\n') db.print('x shape:', x.shape) db.print('y shape:', y.shape) db.print('models:', models) db.print('imputer:', imputer) db.print('impute_first:', impute_first) db.print('transformers:', transformers) db.print('scaler:', scaler) db.print('selector:', selector) db.print('cat_columns:', cat_columns) db.print('num_columns:', num_columns) db.print('class_map:', class_map) db.print('pos_label:', pos_label) db.print('test_size:', test_size) db.print('threshold:', threshold) db.print('class_weight:', class_weight) db.print('stratify:', stratify) db.print('search_type:', search_type) db.print('cv_folds:', grid_cv) 
db.print('plot_perf:', plot_perf) db.print('scorer:', scorer) db.print('random_state:', random_state) db.print('decimal:', decimal) db.print('verbose:', verbose) db.print('title:', title) db.print('fig_size:', fig_size) db.print('figmulti:', figmulti) db.print('multi_class:', multi_class) db.print('average:', average) db.print('legend_loc:', legend_loc) db.print('model_eval:', model_eval) db.print('svm_proba:', svm_proba) db.print('max_iter:', max_iter) db.print('rotation:', rotation) db.print('plot_curve:', plot_curve) db.print('under_sample:', under_sample) db.print('over_sample:', over_sample) db.print('notes:', notes) db.print('svm_knn_resample:', svm_knn_resample) db.print('n_jobs:', n_jobs) db.print('output:', output) db.print('timezone:', timezone) db.print('config:', config) # Define required parameters required_params = { 'x': x, 'y': y, 'models': models, 'config': config } # Find which parameters are missing db.print('\nChecking for missing parameters...') missing_params = [name for name, value in required_params.items() if value is None] # Show error message if required parameters are missing if missing_params: missing_str = ", ".join(missing_params) raise ValueError(f"Missing required parameters: {missing_str}.") # Define required keys required_keys = ['models', 'params'] # Check for missing keys missing_keys = [key for key in required_keys if key not in config] if missing_keys: missing_str = ", ".join(missing_keys) raise ValueError(f"Missing required configuration keys: {missing_str}") # Create a mapping from model key to class name based on the provided configuration # model_map = {key: value.__class__.__name__ for key, value in config['models'].items()} model_map = {key: (value, value.__class__.__name__) for key, value in config['models'].items()} db.print('model_map:', model_map) # Check if all provided model keys exist in the model_map missing_models = [model_key for model_key in models if model_key not in model_map] # If there are missing models, 
raise an error now instead of finding out later if missing_models: known_models = ', '.join(model_map.keys()) missing_models_str = ', '.join(missing_models) raise ValueError(f"'{missing_models_str}' not in config['models']. Please add them to your configuration. Known models are: {known_models}") # Store the grid search params from the config in grid_params # To-do: Make grid search optional grid_params = config['params'] db.print('grid_params:', grid_params) # Configure the cross-validation function for Grid Search if isinstance(grid_cv, int): db.print(f'\ngrid_cv is int: {grid_cv}. Using KFold cross-validation...') cv_func = KFold(n_splits=grid_cv, shuffle=True, random_state=random_state) elif isinstance(grid_cv, str): db.print(f"\ngrid_cv is str: {grid_cv}. Looking for function in config['cv']...") if 'cv' not in config: raise ValueError("Key 'cv' not found in config. Please define a cross-validation function in config['cv'] and set grid_cv to that string name. Alternatively, specify an int for the number of folds, or don't specify grid_cv to go with default of 5.") elif config['cv'] is None: raise ValueError("config['cv'] is None. Please define a cross-validation function in config['cv'] and set grid_cv to that string name. Alternatively, specify an int for the number of folds, or don't specify grid_cv to go with default of 5.") # Get the cross-validation function from the config elif grid_cv in config['cv']: cv_func = config['cv'][grid_cv] db.print("grid_cv found in config['cv']. Using specified instance for cross-validation...") else: raise ValueError(f"Invalid grid_cv: {grid_cv}. Please define a cross-validation function in config['cv'] and set grid_cv to that string name. Alternatively, specify an int for the number of folds, or don't specify grid_cv to go with default of 5.") else: db.print(f"\ngrid_cv is None or not an int or str. 
Using default KFold cross-validation with 5 splits...") cv_func = KFold(n_splits=5, shuffle=True, random_state=random_state) db.print('cv_func:', cv_func) # Function to create a scorer and a display name from the scorer param def get_scorer_and_name(scorer, pos_label=None): # Define valid average types for multi-class/multi-label scenarios average_types = ['micro', 'macro', 'weighted', 'samples'] # Define valid scorers, including those with specific average types valid_scorers = [ 'accuracy', 'balanced_accuracy', 'neg_log_loss', 'average_precision', 'roc_auc', 'roc_auc_ovr', 'roc_auc_ovo', 'roc_auc_ovr_weighted', 'roc_auc_ovo_weighted', *['precision', 'recall', 'f1'], # Basic forms for binary classification with pos_label *[ f'{metric}_{avg}' for metric in ['precision', 'recall', 'f1'] for avg in average_types ] ] # Function to build the scoring function and display name def build_scoring_function(score_type, pos_label=None, average='macro', zero_division=0): if pos_label is not None: # For binary classification tasks requiring a pos_label return (make_scorer(eval(f'{score_type}_score'), pos_label=pos_label, zero_division=zero_division), f'{score_type.capitalize()} (pos_label={pos_label})') elif average in average_types: # For multi-class/multi-label tasks specifying an average type return make_scorer(eval(f'{score_type}_score'), average=average, zero_division=zero_division), f'{score_type.capitalize()} ({average})' else: raise ValueError(f"Invalid average type: {average}. 
Valid options are: {', '.join(average_types)}") # Determine the scorer and display name based on input db.print('\nCreating scoring function...') if scorer in valid_scorers: if scorer in ['precision', 'recall', 'f1'] and pos_label is None: # Default to 'macro' average for multi-class tasks if pos_label is not specified db.print('Using macro average for multi-class tasks...') scoring_function, display_name = build_scoring_function(scorer, average='macro') elif scorer.startswith(('precision_', 'recall_', 'f1_')): # Extract score type and average type from scorer string db.print('Extracting score type and average type from scorer string...') score_type, avg_type = scorer.split('_') scoring_function, display_name = build_scoring_function(score_type, average=avg_type, zero_division=0) elif scorer == 'accuracy': db.print('Using accuracy as the scoring function...') scoring_function, display_name = 'accuracy', 'Accuracy' else: # Use predefined scikit-learn scorer strings for other cases db.print('Using predefined scikit-learn scorer strings...') scoring_function, display_name = scorer, scorer.capitalize() else: # Show an error message if the scorer is invalid raise ValueError(f"Unsupported scorer: {scorer}. 
Valid options are: {', '.join(valid_scorers)}") return scoring_function, display_name # Define the scorer and display name scorer, scorer_name = get_scorer_and_name(scorer=scorer, pos_label=pos_label) db.print('scorer:', scorer) db.print('scorer_name:', scorer_name) # Empty timestamp by default for test cases where we don't want time differences to trigger a failure timestamp = '' # Set initial timestamp for data processing current_time = datetime.now(pytz.timezone(timezone)) timestamp = current_time.strftime(f'%b %d, %Y %I:%M %p {timezone}') if output: print(f"\n-----------------------------------------------------------------------------------------") print(f"Starting Data Processing - {timestamp}") print(f"-----------------------------------------------------------------------------------------\n") # Detect the type of classification problem unique_y = np.unique(y) num_classes = len(unique_y) db.print('unique_y:', unique_y) db.print('num_classes:', num_classes) if num_classes > 2: class_type = 'multi' if average is None: average = 'macro' elif num_classes == 2: class_type = 'binary' average = 'binary' else: raise ValueError(f"Check data, cannot classify. 
Number of classes in y_test ({num_classes}) is less than 2: {unique_y}") if output: print(f"Classification type detected: {class_type}") print("Unique values in y:", unique_y) # Change data type of y if necessary # if y.dtype.kind in 'biufc': # If y is numeric # y = y.astype(int) # Convert to int for numeric labels # else: # y = y.astype(str) # Convert to str for categorical labels # # if output: # print(f"y data type after conversion: {y.dtype}") # Make sure y is a Series or a one-dimensional array if isinstance(y, pd.DataFrame): # Check if y is a DataFrame with only one column if y.shape[1] == 1: # Convert the single-column DataFrame to a Series db.print('\nConverting y from DataFrame to Series...') y = y.squeeze() db.print('y shape after conversion:', y.shape) else: # Handle the case where y is a DataFrame with multiple columns raise ValueError("y should be a Series or a one-dimensional array, but a DataFrame with multiple columns was provided.") # Perform the train/test split X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=test_size, stratify=stratify, random_state=random_state) if output: print("\nTrain/Test split, test_size: ", test_size) print("X_train, X_test, y_train, y_test shapes: ", X_train.shape, X_test.shape, y_train.shape, y_test.shape) # Over sample with SMOTE, if requested if over_sample: if output: print("\nOversampling via SMOTE strategy: ", over_sample) print("X_train, y_train shapes before: ", X_train.shape, y_train.shape) print("y_train value counts before: ", y_train.value_counts()) print("Running SMOTE on X_train, y_train...") over = SMOTE(sampling_strategy=over_sample, random_state=random_state) X_train, y_train = over.fit_resample(X_train, y_train) if output: print("X_train, y_train shapes after: ", X_train.shape, y_train.shape) print("y_train value counts after: ", y_train.value_counts()) # Under sample with RandomUnderSampler, if requested if under_sample: if output: print("\nUndersampling via RandomUnderSampler 
strategy: ", under_sample) print("X_train, y_train shapes before: ", X_train.shape, y_train.shape) print("y_train value counts before: ", y_train.value_counts()) print("Running RandomUnderSampler on X_train, y_train...") under = RandomUnderSampler(sampling_strategy=under_sample, random_state=random_state) X_train, y_train = under.fit_resample(X_train, y_train) if output: print("X_train, y_train shapes after: ", X_train.shape, y_train.shape) print("y_train value counts after: ", y_train.value_counts()) # Initialized some variables and lists timestamp_list = [] model_name_list = [] pipeline_list = [] fit_time_list = [] fit_count_list = [] avg_fit_time_list = [] inference_time_list = [] train_score_list = [] test_score_list = [] overfit_list = [] overfit_diff_list = [] best_param_list = [] best_cv_score_list = [] best_estimator_list = [] train_accuracy_list = [] test_accuracy_list = [] train_precision_list = [] test_precision_list = [] train_recall_list = [] test_recall_list = [] train_f1_list = [] test_f1_list = [] train_roc_auc_list = [] test_roc_auc_list = [] binary_metrics = None tp_list = [] fp_list = [] tn_list = [] fn_list = [] tpr_list = [] fpr_list = [] tnr_list = [] fnr_list = [] fr_list = [] resample_list = [] resample_completed = False # Function to use a subset of the data for KNN and SVM which can be compute intensive def resample_for_knn_svm(X_train, y_train): X_train, _, y_train, _ = train_test_split( X_train, y_train, test_size=1-svm_knn_resample, stratify=y_train, random_state=random_state ) if output: print(f"Training data resampled to {svm_knn_resample*100}% of original for KNN and SVM speed improvement") print("X_train, y_train shapes after: ", X_train.shape, y_train.shape) print("y_train value counts after: ", y_train.value_counts(), "\n") return X_train, y_train # Function to create the grid search object based on the model_type key def create_grid(model_type): # Ensure the model type is in the params dictionary if model_type not in grid_params: 
raise ValueError(f"Parameters for {model_type} are not defined in the grid_params dictionary") # Grab the model params for the grid search combined_params = grid_params[model_type] # Add optional params for pipeline components, they all need to be in one dict for the search if imputer is not None and imputer in grid_params: combined_params = {**combined_params, **grid_params[imputer]} if selector is not None and selector in grid_params: combined_params = {**combined_params, **grid_params[selector]} if scaler is not None and scaler in grid_params: combined_params = {**combined_params, **grid_params[scaler]} # Select the appropriate search method if search_type == 'grid': grid = GridSearchCV(pipe, param_grid=combined_params, scoring=scorer, verbose=verbose, cv=cv_func, n_jobs=n_jobs) elif search_type == 'random': grid = RandomizedSearchCV(pipe, param_distributions=combined_params, scoring=scorer, verbose=verbose, cv=cv_func, random_state=random_state, n_jobs=n_jobs) else: raise ValueError("search_type should be either 'grid' for GridSearchCV, or 'random' for RandomizedSearchCV") return grid # Clean up the grid search type for display search_string = search_type.capitalize() # Set count of total models to iterate through total_models = len(models) # Model Loop: Iterate through each model in the list and run the workflow for each for i, model_key in enumerate(models): # Get the model class and a text version of the name from the mapping we did earlier model_class, model_name = model_map[model_key] db.print(f'\nStarting iteration. 
i: {i}, total_models: {total_models}, model_key: {model_key}, model_class:{model_class}, model_name: {model_name}:') # Create the timestamp for this model's iteration current_time = datetime.now(pytz.timezone(timezone)) timestamp = current_time.strftime(f'%b %d, %Y %I:%M %p {timezone}') timestamp_list.append(timestamp) # Show a banner with number, model name, search type, timestamp, for this model's iteration if output: print(f"\n-----------------------------------------------------------------------------------------") print(f"{i+1}/{total_models}: Starting {model_name} {search_string} Search - {timestamp}") print(f"-----------------------------------------------------------------------------------------\n") # Resample the data only for KNN and SVC, if svn_knn_resample is defined if svm_knn_resample is not None and model_name in ['KNeighborsClassifier', 'SVC']: db.print('\nResampling for KNN and SVM...') X_train, y_train = resample_for_knn_svm(X_train, y_train) resample_list.append(svm_knn_resample) else: resample_list.append("None") # Set the random seed to random_state for models using TensorFlow if model_name == 'KerasClassifier': db.print('\nSetting random seed for Keras Classifier:', random_state) tf.random.set_seed(random_state) db.print('\nCreating pipeline from transformer and model parameters...') # Create a pipeline from transformer and model parameters pipe = create_pipeline(imputer_key=imputer, transformer_keys=transformers, scaler_key=scaler, selector_key=selector, model_key=model_key, config=config, cat_columns=cat_columns, num_columns=num_columns, class_weight=class_weight, random_state=random_state, max_iter=max_iter, impute_first=impute_first) db.print('pipe:', pipe) db.print('\nCreating grid search object...') grid = create_grid(model_type=model_key) db.print('grid:', grid) # Append to each list the value from this iteration, starting with model name, pipeline, etc. 
model_name_list.append(model_name) pipeline_list.append(list(pipe.named_steps.keys())) # Fit the model and measure total fit time, append to list start_time = time.time() db.print('\nFitting grid...') grid.fit(X_train, y_train) db.print('\nGrid fit complete.') db.print('\nGrid search results:') db.print(grid.cv_results_) fit_time = time.time() - start_time fit_time_list.append(fit_time) if output: print(f"\nTotal Time: {fit_time:.{decimal}f} seconds") # Calculate average fit time (for each fold in the CV search) and append to list db.print('\nCalculating average fit time...') n_splits = cv_func.get_n_splits() db.print('n_splits:', n_splits) n_folds = len(grid.cv_results_['params']) db.print('n_folds:', n_folds) fit_count = n_splits * n_folds db.print('fit_count:', fit_count) fit_count_list.append(fit_count) db.print('fit_time:', fit_time) avg_fit_time = fit_time / fit_count avg_fit_time_list.append(avg_fit_time) if output: print(f"Average Fit Time: {avg_fit_time:.{decimal}f} seconds") # Function to apply different thresholds for binary classification def apply_threshold(probs, threshold): return np.where(probs >= threshold, 1, 0) # Debugging data for detecting support of predict_proba db.print("grid.best_estimator_:", grid.best_estimator_) db.print("hasattr(grid.best_estimator_, 'predict_proba'):", hasattr(grid.best_estimator_, 'predict_proba')) db.print("hasattr(grid.best_estimator_, 'decision_function'):", hasattr(grid.best_estimator_, 'decision_function')) # Generate train predictions based on class type and threshold db.print('\nGenerating train predictions based on class type and threshold...') if class_type == 'binary': if hasattr(grid.best_estimator_, 'predict_proba'): # Model supports probability estimates if threshold != 0.5: db.print(f'Class: {class_type}, Method: predict_proba, Threshold: {threshold}, Data: Train') # Get probabilities for the positive class probabilities_train = grid.predict_proba(X_train)[:, 1] # Apply the custom threshold to get binary 
predictions y_train_pred = apply_threshold(probabilities_train, threshold) else: db.print(f'Class: {class_type}, Method: predict, Threshold: {threshold}, Data: Train') # Use default predictions for binary classification y_train_pred = grid.predict(X_train) elif hasattr(grid.best_estimator_, 'decision_function'): db.print(f'Class: {class_type}, Method: decision_function, Threshold: {threshold}, Data: Train') # Model does not support probability estimates but has a decision function (ex: SVC without probability) decision_values_train = grid.decision_function(X_train) # Apply the custom threshold to the decision function values y_train_pred = apply_threshold(decision_values_train, threshold) else: db.print(f'Class: {class_type}, Method: predict, Threshold: {threshold}, Data: Train') # Use default predictions if neither predict_proba nor decision_function are available y_train_pred = grid.predict(X_train) elif class_type == 'multi': db.print(f'Class: {class_type}, Method: predict, Threshold: {threshold}, Data: Train') # Use default predictions for multi-class classification y_train_pred = grid.predict(X_train) # Start tracking the inference time, or test predictions time start_time = time.time() # Generate test predictions based on class type and threshold db.print('\nGenerating test predictions based on class type and threshold...') if class_type == 'binary': if hasattr(grid.best_estimator_, 'predict_proba'): if threshold != 0.5: db.print(f'Class: {class_type}, Method: predict_proba, Threshold: {threshold}, Data: Test') probabilities_test = grid.predict_proba(X_test)[:, 1] y_test_pred = apply_threshold(probabilities_test, threshold) else: db.print(f'Class: {class_type}, Method: predict, Threshold: {threshold}, Data: Test') y_test_pred = grid.predict(X_test) elif hasattr(grid.best_estimator_, 'decision_function'): db.print(f'Class: {class_type}, Method: decision_function, Threshold: {threshold}, Data: Test') decision_values_test = grid.decision_function(X_test) 
y_test_pred = apply_threshold(decision_values_test, threshold) else: db.print(f'Class: {class_type}, Method: predict, Threshold: {threshold}, Data: Test') y_test_pred = grid.predict(X_test) elif class_type == 'multi': db.print(f'Class: {class_type}, Method: predict, Threshold: {threshold}, Data: Test') y_test_pred = grid.predict(X_test) # Capture the inference time, or test predictions time inference_time = time.time() - start_time inference_time_list.append(inference_time) if output: print(f"Inference Time: {inference_time:.{decimal}f}") # Calculate ROC AUC, based on class type and predict_proba support def calculate_roc_auc(grid, X, y, class_type, note): try: # Attempt to use predict_proba or decision_function based on class_type if class_type == 'multi': # Ensure predict_proba is available for the grid (model) if hasattr(grid, 'predict_proba'): db.print(f'Class: {class_type}, Method: predict_proba(X), Threshold: {threshold}, Data: {note}, Score: ROC AUC') pred_proba = grid.predict_proba(X) # Check if predict_proba output is 2D and correct shape, adjust if necessary if pred_proba.ndim == 1: db.print(f'pred_proba.ndim == 1, Before: {pred_proba.shape}') db.print('pred_proba:', pred_proba) pred_proba = np.expand_dims(pred_proba, axis=1) db.print(f'After: {pred_proba.shape}') db.print('pred_proba:', pred_proba) return roc_auc_score(y, pred_proba, multi_class='ovr') else: print(f"Model does not support 'predict_proba' for multi-class ROC AUC calculation.") return None else: # For binary classification, directly use predict_proba or decision_function if hasattr(grid, 'predict_proba'): db.print(f'Class: {class_type}, Method: predict_proba(X)[:, 1], Threshold: {threshold}, Data: {note}, Score: ROC AUC') pred_proba = grid.predict_proba(X)[:, 1] db.print('pred_proba:', pred_proba) return roc_auc_score(y, pred_proba) elif hasattr(grid, 'decision_function'): db.print(f'Class: {class_type}, Method: decision_function(X), Threshold: {threshold}, Data: {note}, Score: ROC AUC') 
decision_values = grid.decision_function(X) db.print('decision_values:', decision_values) return roc_auc_score(y, decision_values) else: print(f"Model does not support 'predict_proba' or 'decision_function' for binary ROC AUC calculation.") return None except Exception as e: print(f"An error occurred during ROC AUC calculation: {str(e)}") return None # Calculate the train and test ROC AUC db.print('\nCalculating ROC AUC...') train_roc_auc = calculate_roc_auc(grid, X_train, y_train, class_type=class_type, note='Train') test_roc_auc = calculate_roc_auc(grid, X_test, y_test, class_type=class_type, note='Test') # Calculate train metrics db.print('\nCalculating train metrics...') train_accuracy = accuracy_score(y_train, y_train_pred) train_precision = precision_score(y_train, y_train_pred, average=average, zero_division=0, pos_label=pos_label) train_recall = recall_score(y_train, y_train_pred, average=average, pos_label=pos_label) train_f1 = f1_score(y_train, y_train_pred, average=average, pos_label=pos_label) # Calculate test metrics db.print('\nCalculating test metrics...') test_accuracy = accuracy_score(y_test, y_test_pred) test_precision = precision_score(y_test, y_test_pred, average=average, zero_division=0, pos_label=pos_label) test_recall = recall_score(y_test, y_test_pred, average=average, pos_label=pos_label) test_f1 = f1_score(y_test, y_test_pred, average=average, pos_label=pos_label) # Append train metrics to lists db.print('\nAppending train metrics to lists...') train_accuracy_list.append(train_accuracy) train_precision_list.append(train_precision) train_recall_list.append(train_recall) train_f1_list.append(train_f1) train_roc_auc_list.append(train_roc_auc) # Append test metrics to lists db.print('\nAppending test metrics to lists...') test_accuracy_list.append(test_accuracy) test_precision_list.append(test_precision) test_recall_list.append(test_recall) test_f1_list.append(test_f1) test_roc_auc_list.append(test_roc_auc) # Get the best Grid Search CV score 
and append to list db.print('\nGetting the best Grid Search CV score...') best_cv_score = grid.best_score_ best_cv_score_list.append(best_cv_score) if output: print(f"Best CV {scorer_name} Score: {best_cv_score:.{decimal}f}") # Get the best Grid Search Train score and append to list db.print('\nGetting the best Grid Search Train score...') train_score = grid.score(X_train, y_train) train_score_list.append(train_score) if output: print(f"Train {scorer_name} Score: {train_score:.{decimal}f}") # Get the best Grid Search Test score and append to list db.print('\nGetting the best Grid Search Test score...') test_score = grid.score(X_test, y_test) test_score_list.append(test_score) if output: print(f"Test {scorer_name} Score: {test_score:.{decimal}f}") # Assess the degree of overfit (train score higher than test score) db.print('\nAssessing the degree of overfit...') overfit_diff = train_score - test_score overfit_diff_list.append(overfit_diff) if train_score > test_score: overfit = 'Yes' else: overfit = 'No' overfit_list.append(overfit) if output: print(f"Overfit: {overfit}") print(f"Overfit Difference: {overfit_diff:.{decimal}f}") # Capture the best model and params from grid search db.print('\nCapturing the best model and params from grid search...') best_estimator = grid.best_estimator_ best_estimator_list.append(best_estimator) best_params = grid.best_params_ best_param_list.append(best_params) if output: print(f"Best Parameters: {best_params}") # Output the neural network layers for KerasClassifier if model_name == 'KerasClassifier': db.print('\nOutputting the neural network layers for KerasClassifier...') keras_classifier = grid.best_estimator_.named_steps['keras_class'] keras_model = keras_classifier.model_ if output: print('') # Empty line for spacing # Access the Keras model from the best estimator in the grid search keras_model.summary() # Display model evaluation metrics and plots by calling 'eval_model' function # Note: Some of this duplicates what we just 
calculated, room for future optimization if model_eval: db.print('\nDisplaying model evaluation metrics and plots...') # Handle binary vs. multi-class, and special case for SVC that requires svm_proba=True if model_name != 'SVC' or (model_name == 'SVC' and svm_proba == True): if class_type == 'binary': # Capture binary metrics for processing later, only in the binary case binary_metrics = eval_model(y_test=y_test, y_pred=y_test_pred, x_test=X_test, estimator=grid, class_map=class_map, pos_label=pos_label, debug=debug, class_type=class_type, model_name=model_name, threshold=threshold, decimal=decimal, plot=True, figsize=(12,11), class_weight=class_weight, return_metrics=True, output=output) elif class_type == 'multi': multi_metrics = eval_model(y_test=y_test, y_pred=y_test_pred, x_test=X_test, estimator=grid, class_map=class_map, pos_label=pos_label, debug=debug, class_type=class_type, model_name=model_name, average=average, decimal=decimal, plot=True, figmulti=figmulti, class_weight=class_weight, return_metrics=True, output=output, multi_class=multi_class) # For neural network, if plot_curves=True, plot training history if model_name == 'KerasClassifier' and plot_curve: # Access the training history db.print('best_estimator:', best_estimator) db.print('keras_classifier:', keras_classifier) db.print('keras_model:', keras_model) db.print('keras_classifier.history_:', keras_classifier.history_) history = keras_classifier.history_ # Plot the training history plot_train_history(history=history) # Set the binary metric values based on the list of binary metrics, if it was produced by 'eval_model' if binary_metrics is not None: db.print('\nSetting the binary metric values based on the list of binary metrics...') tp = binary_metrics['True Positives'] fp = binary_metrics['False Positives'] tn = binary_metrics['True Negatives'] fn = binary_metrics['False Negatives'] tpr = binary_metrics['TPR'] fpr = binary_metrics['FPR'] tnr = binary_metrics['TNR'] fnr = 
binary_metrics['FNR'] fr = fnr + fpr # If no binary metrics, set the values as NaN (better than string, allows numeric formatting from 'format_df') else: db.print('\nSetting the binary metric values as NaN...') tp = np.nan fp = np.nan tn = np.nan fn = np.nan tpr = np.nan fpr = np.nan tnr = np.nan fnr = np.nan fr = np.nan # Append the binary metrics to the list db.print('\nAppending the binary metrics to the list...') tp_list.append(tp) fp_list.append(fp) tn_list.append(tn) fn_list.append(fn) tpr_list.append(tpr) fpr_list.append(fpr) tnr_list.append(tnr) fnr_list.append(fnr) fr_list.append(fr) # To debug lists not being the same length, print the lengths db.print('\nLength of each list:') db.print('Model', len(model_name_list)) db.print('Test Size', len([test_size] * len(model_name_list))) db.print('Over Sample', len([over_sample] * len(model_name_list))) db.print('Under Sample', len([under_sample] * len(model_name_list))) db.print('Resample', len(resample_list)) db.print('Total Fit Time', len(fit_time_list)) db.print('Fit Count', len(fit_count_list)) db.print('Average Fit Time', len(avg_fit_time_list)) db.print('Inference Time', len(inference_time_list)) db.print('Grid Scorer', len([scorer_name] * len(model_name_list))) db.print('Best Params', len(best_param_list)) db.print('Best CV Score', len(best_cv_score_list)) db.print('Train Score', len(train_score_list)) db.print('Test Score', len(test_score_list)) db.print('Overfit', len(overfit_list)) db.print('Overfit Difference', len(overfit_diff_list)) db.print('Train Accuracy Score', len(train_accuracy_list)) db.print('Test Accuracy Score', len(test_accuracy_list)) db.print('Train Precision Score', len(train_precision_list)) db.print('Test Precision Score', len(test_precision_list)) db.print('Train Recall Score', len(train_recall_list)) db.print('Test Recall Score', len(test_recall_list)) db.print('Train F1 Score', len(train_f1_list)) db.print('Test F1 Score', len(test_f1_list)) db.print('Train ROC AUC Score', 
len(train_roc_auc_list)) db.print('Test ROC AUC Score', len(test_roc_auc_list)) db.print('Threshold', len([threshold] * len(model_name_list))) db.print('True Positives', len(tp_list)) db.print('False Positives', len(fp_list)) db.print('True Negatives', len(tn_list)) db.print('False Negatives', len(fn_list)) db.print('TPR', len(tpr_list)) db.print('TNR', len(tnr_list)) db.print('FNR', len(fnr_list)) db.print('False Rate', len(fr_list)) db.print('Pipeline', len(pipeline_list)) db.print('Notes', len([notes] * len(model_name_list))) db.print('Timestamp', len(timestamp_list)) # Create the results DataFrame with each list as a column, with a row for model iteration in this run db.print('\nCreating the results DataFrame...') results_df = pd.DataFrame({'Model': model_name_list, 'Test Size': [test_size] * len(model_name_list), 'Over Sample': [over_sample] * len(model_name_list), 'Under Sample': [under_sample] * len(model_name_list), 'Resample': resample_list, 'Total Fit Time': fit_time_list, 'Fit Count': fit_count_list, 'Average Fit Time': avg_fit_time_list, 'Inference Time': inference_time_list, 'Grid Scorer': [scorer_name] * len(model_name_list), 'Best Params': best_param_list, 'Best CV Score': best_cv_score_list, 'Train Score': train_score_list, 'Test Score': test_score_list, 'Overfit': overfit_list, 'Overfit Difference': overfit_diff_list, 'Train Accuracy Score': train_accuracy_list, 'Test Accuracy Score': test_accuracy_list, 'Train Precision Score': train_precision_list, 'Test Precision Score': test_precision_list, 'Train Recall Score': train_recall_list, 'Test Recall Score': test_recall_list, 'Train F1 Score': train_f1_list, 'Test F1 Score': test_f1_list, 'Train ROC AUC Score': train_roc_auc_list, 'Test ROC AUC Score': test_roc_auc_list, 'Threshold': [threshold] * len(model_name_list), 'True Positives': tp_list, 'False Positives': fp_list, 'True Negatives': tn_list, 'False Negatives': fn_list, 'TPR': tpr_list, 'FPR': fpr_list, 'TNR': tnr_list, 'FNR': fnr_list, 'False 
Rate': fr_list, 'Pipeline': pipeline_list, 'Notes': [notes] * len(model_name_list), 'Timestamp': timestamp_list }) # Plot a chart showing the performance of each model, if requested if plot_perf: db.print('\nPlotting a chart showing the performance of each model...') # Melt the results_df so we can plot the scores for each model db.print('Melting the results_df so we can plot the scores for each model...') score_df = results_df.melt(id_vars=['Model'], value_vars=[f'Best CV Score', f'Train Score', f'Test Score'], var_name='Split', value_name=f'{scorer_name}') # Create the bar plot of Scores by Model and Data Split plt.figure(figsize=fig_size) sns.barplot(data=score_df, x='Model', y=f'{scorer_name}', hue='Split') plt.title(f'{title} {scorer_name} Scores by Model and Data Split', fontsize=18, pad=15) plt.yticks(np.arange(0,1.1,0.1)) plt.xticks(rotation=rotation) plt.xlabel('Model', fontsize=14, labelpad=10) plt.ylabel(f'{scorer_name}', fontsize=14, labelpad=10) plt.legend(loc=legend_loc) plt.show() # Create the bar plot of Fit Time by Model plt.figure(figsize=fig_size) sns.barplot(data=results_df, x='Model', y='Average Fit Time') plt.title(f'{title} Average Fit Time by Model', fontsize=18, pad=15) plt.xticks(rotation=rotation) plt.xlabel('Model', fontsize=14, labelpad=10) plt.ylabel('Average Fit Time (seconds)', fontsize=14, labelpad=10) plt.show() # Return the results as a DataFrame return results_df
def create_nn_binary(
        hidden_layer_dim: int,
        dropout_rate: float,
        l2_reg: float,
        second_layer_dim: Optional[int] = None,
        third_layer_dim: Optional[int] = None,
        meta: Optional[Dict[str, Any]] = None
) -> keras.models.Sequential:
    """
    Create a binary classification neural network model.

    This function allows for flexible configuration of the neural network
    structure for binary classification using a scikit-learn compatible
    KerasClassifier. It supports up to three hidden layers with customizable
    dimensions, dropout regularization, and L2 regularization.

    Use this function to create a neural network model with a specific
    structure and regularization settings for binary classification tasks. It
    is set as the `model` parameter of a KerasClassifier instance referenced in
    the configuration file for `compare_models`.

    Parameters
    ----------
    hidden_layer_dim : int
        The number of neurons in the first hidden layer.
    dropout_rate : float
        The dropout rate to be applied after each hidden layer.
    l2_reg : float
        The L2 regularization strength. If greater than 0, L2 regularization
        is applied to the kernel weights of the hidden dense layers. Otherwise
        no regularizer is used.
    second_layer_dim : Optional[int], optional
        The number of neurons in an additional hidden layer. If not None, an
        additional hidden layer is added. Default is None.
    third_layer_dim : Optional[int], optional
        The number of neurons in a third hidden layer. If not None, a third
        hidden layer is added. Default is None.
    meta : Dict[str, Any], optional
        A dictionary containing metadata about the input features and shape.
        Although it defaults to None, it is required: the input layer is sized
        from ``meta["X_shape_"][1]`` (the number of features).

    Returns
    -------
    keras.models.Sequential
        The constructed neural network model for binary classification. The
        output layer is a single sigmoid unit.

    Raises
    ------
    TypeError
        If `meta` is None.
    KeyError
        If `meta` does not contain the ``"X_shape_"`` key.

    Examples
    --------
    >>> pd.set_option('display.max_columns', None)  # For test consistency
    >>> pd.set_option('display.width', None)  # For test consistency
    >>> from sklearn.datasets import make_classification
    >>> from sklearn.model_selection import train_test_split
    >>> X, y = make_classification(n_samples=100, n_features=10, random_state=42)
    >>> X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,
    ...     random_state=42)
    >>> meta = {"n_features_in_": 10, "X_shape_": (80, 10)}

    Example 1: Create a basic neural network with default settings:

    >>> model = create_nn_binary(hidden_layer_dim=32, dropout_rate=0.2, l2_reg=0.01,
    ...     meta=meta)
    >>> model_summary(model)  #doctest: +NORMALIZE_WHITESPACE
            Item                  Name         Type Activation Output Shape  Parameters   Bytes
    0      Model            Sequential   Sequential       None         None         NaN     NaN
    1      Input                 Input  KerasTensor       None   (None, 10)         0.0     0.0
    2      Layer              Hidden_1        Dense       relu   (None, 32)       352.0  1408.0
    3      Layer             Dropout_1      Dropout       None   (None, 32)         0.0     0.0
    4      Layer                Output        Dense    sigmoid    (None, 1)        33.0   132.0
    5  Statistic          Total Params         None       None         None       385.0  1540.0
    6  Statistic      Trainable Params         None       None         None       385.0  1540.0
    7  Statistic  Non-Trainable Params         None       None         None         0.0     0.0

    Example 2: Create a neural network with additional layers and regularization:

    >>> model = create_nn_binary(hidden_layer_dim=64, dropout_rate=0.3, l2_reg=0.05,
    ...     second_layer_dim=32, third_layer_dim=16, meta=meta)
    >>> model_summary(model)  #doctest: +NORMALIZE_WHITESPACE
             Item                  Name         Type Activation Output Shape  Parameters    Bytes
    0       Model            Sequential   Sequential       None         None         NaN      NaN
    1       Input                 Input  KerasTensor       None   (None, 10)         0.0      0.0
    2       Layer              Hidden_1        Dense       relu   (None, 64)       704.0   2816.0
    3       Layer             Dropout_1      Dropout       None   (None, 64)         0.0      0.0
    4       Layer              Hidden_2        Dense       relu   (None, 32)      2080.0   8320.0
    5       Layer             Dropout_2      Dropout       None   (None, 32)         0.0      0.0
    6       Layer              Hidden_3        Dense       relu   (None, 16)       528.0   2112.0
    7       Layer             Dropout_3      Dropout       None   (None, 16)         0.0      0.0
    8       Layer                Output        Dense    sigmoid    (None, 1)        17.0     68.0
    9   Statistic          Total Params         None       None         None      3329.0  13316.0
    10  Statistic      Trainable Params         None       None         None      3329.0  13316.0
    11  Statistic  Non-Trainable Params         None       None         None         0.0      0.0
    """
    # `meta` only has a default so it can trail the optional layer arguments;
    # fail with an actionable message instead of "'NoneType' object is not
    # subscriptable" when it is omitted.
    if meta is None:
        raise TypeError(
            "create_nn_binary() requires `meta` (a dict with an 'X_shape_' entry), "
            "but got None."
        )

    # Shape of a single sample: only the feature count from X_shape_ is needed
    input_shape = (meta["X_shape_"][1],)
    n_outputs = 1  # For binary classification, a single sigmoid unit

    # Adjust L2 regularization based on the parameter
    reg = L2(l2_reg) if l2_reg > 0 else None

    # Create a sequential model, starting with the input layer
    model = keras.models.Sequential(name='Sequential')
    model.add(Input(shape=input_shape, name='Input'))

    # The first hidden layer is always present; the second and third are only
    # added when a dimension is given. Positions are kept so that layer names
    # stay 'Hidden_<n>' / 'Dropout_<n>' regardless of which layers are used.
    hidden_layers = [(1, hidden_layer_dim)]
    hidden_layers += [
        (position, dim)
        for position, dim in ((2, second_layer_dim), (3, third_layer_dim))
        if dim is not None
    ]
    for position, dim in hidden_layers:
        model.add(Dense(dim, activation='relu', kernel_regularizer=reg,
                        name=f'Hidden_{position}'))
        model.add(Dropout(dropout_rate, name=f'Dropout_{position}'))

    # Add the output layer for binary classification
    model.add(Dense(n_outputs, activation='sigmoid', name='Output'))

    return model
def create_nn_multi(
        hidden_layer_dim: int,
        dropout_rate: float,
        l2_reg: float,
        second_layer_dim: Optional[int] = None,
        third_layer_dim: Optional[int] = None,
        meta: Optional[Dict[str, Any]] = None
) -> keras.models.Sequential:
    """
    Create a multi-class classification neural network model.

    This function allows for flexible configuration of the neural network
    structure for multi-class classification using a scikit-learn compatible
    KerasClassifier. It supports up to three hidden layers with customizable
    dimensions, dropout regularization, and L2 regularization.

    Use this function to create a neural network model with a specific
    structure and regularization settings for multi-class classification tasks.
    It is set as the `model` parameter of a KerasClassifier instance referenced
    in the configuration file for `compare_models`.

    Parameters
    ----------
    hidden_layer_dim : int
        The number of neurons in the first hidden layer.
    dropout_rate : float
        The dropout rate to be applied after each hidden layer.
    l2_reg : float
        The L2 regularization strength. If greater than 0, L2 regularization
        is applied to the kernel weights of the hidden dense layers. Otherwise
        no regularizer is used.
    second_layer_dim : Optional[int], optional
        The number of neurons in an additional hidden layer. If not None, an
        additional hidden layer is added. Default is None.
    third_layer_dim : Optional[int], optional
        The number of neurons in a third hidden layer. If not None, a third
        hidden layer is added. Default is None.
    meta : Dict[str, Any], optional
        A dictionary containing metadata about the input features, shape, and
        number of classes. Although it defaults to None, it is required: the
        input layer is sized from ``meta["X_shape_"][1]`` and the output layer
        from ``meta["n_classes_"]``.

    Returns
    -------
    keras.models.Sequential
        The constructed neural network model for multi-class classification.
        The output layer is a softmax with one unit per class.

    Raises
    ------
    TypeError
        If `meta` is None.
    KeyError
        If `meta` does not contain the ``"X_shape_"`` or ``"n_classes_"`` keys.

    Examples
    --------
    >>> pd.set_option('display.max_columns', None)  # For test consistency
    >>> pd.set_option('display.width', None)  # For test consistency
    >>> from sklearn.datasets import load_iris
    >>> from sklearn.model_selection import train_test_split
    >>> X, y = load_iris(return_X_y=True)
    >>> X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,
    ...     random_state=42)
    >>> meta = {"n_features_in_": 4, "X_shape_": (120, 4), "n_classes_": 3}

    Example 1: Create a basic neural network with default settings:

    >>> model = create_nn_multi(hidden_layer_dim=64, dropout_rate=0.2, l2_reg=0.01,
    ...     meta=meta)
    >>> model_summary(model)  #doctest: +NORMALIZE_WHITESPACE
            Item                  Name         Type Activation Output Shape  Parameters   Bytes
    0      Model            Sequential   Sequential       None         None         NaN     NaN
    1      Input                 Input  KerasTensor       None    (None, 4)         0.0     0.0
    2      Layer              Hidden_1        Dense       relu   (None, 64)       320.0  1280.0
    3      Layer             Dropout_1      Dropout       None   (None, 64)         0.0     0.0
    4      Layer                Output        Dense    softmax    (None, 3)       195.0   780.0
    5  Statistic          Total Params         None       None         None       515.0  2060.0
    6  Statistic      Trainable Params         None       None         None       515.0  2060.0
    7  Statistic  Non-Trainable Params         None       None         None         0.0     0.0

    Example 2: Create a neural network with an additional hidden layer:

    >>> model = create_nn_multi(hidden_layer_dim=128, dropout_rate=0.3, l2_reg=0.05,
    ...     second_layer_dim=64, meta=meta)
    >>> model_summary(model)  #doctest: +NORMALIZE_WHITESPACE
            Item                  Name         Type Activation Output Shape  Parameters    Bytes
    0      Model            Sequential   Sequential       None         None         NaN      NaN
    1      Input                 Input  KerasTensor       None    (None, 4)         0.0      0.0
    2      Layer              Hidden_1        Dense       relu  (None, 128)       640.0   2560.0
    3      Layer             Dropout_1      Dropout       None  (None, 128)         0.0      0.0
    4      Layer              Hidden_2        Dense       relu   (None, 64)      8256.0  33024.0
    5      Layer             Dropout_2      Dropout       None   (None, 64)         0.0      0.0
    6      Layer                Output        Dense    softmax    (None, 3)       195.0    780.0
    7  Statistic          Total Params         None       None         None      9091.0  36364.0
    8  Statistic      Trainable Params         None       None         None      9091.0  36364.0
    9  Statistic  Non-Trainable Params         None       None         None         0.0      0.0
    """
    # `meta` only has a default so it can trail the optional layer arguments;
    # fail with an actionable message instead of "'NoneType' object is not
    # subscriptable" when it is omitted.
    if meta is None:
        raise TypeError(
            "create_nn_multi() requires `meta` (a dict with 'X_shape_' and "
            "'n_classes_' entries), but got None."
        )

    # Shape of a single sample: only the feature count from X_shape_ is needed
    input_shape = (meta["X_shape_"][1],)
    n_outputs = meta["n_classes_"]  # One softmax unit per class

    # Adjust L2 regularization based on the parameter
    reg = L2(l2_reg) if l2_reg > 0 else None

    # Create a sequential model, starting with the input layer
    model = keras.models.Sequential(name='Sequential')
    model.add(Input(shape=input_shape, name='Input'))

    # The first hidden layer is always present; the second and third are only
    # added when a dimension is given. Positions are kept so that layer names
    # stay 'Hidden_<n>' / 'Dropout_<n>' regardless of which layers are used.
    hidden_layers = [(1, hidden_layer_dim)]
    hidden_layers += [
        (position, dim)
        for position, dim in ((2, second_layer_dim), (3, third_layer_dim))
        if dim is not None
    ]
    for position, dim in hidden_layers:
        model.add(Dense(dim, activation='relu', kernel_regularizer=reg,
                        name=f'Hidden_{position}'))
        model.add(Dropout(dropout_rate, name=f'Dropout_{position}'))

    # Output layer for multi-class classification
    model.add(Dense(n_outputs, activation='softmax', name='Output'))

    return model
def create_pipeline(
        imputer_key: Optional[str] = None,
        transformer_keys: Optional[Union[List[str], str]] = None,
        scaler_key: Optional[str] = None,
        selector_key: Optional[str] = None,
        model_key: Optional[str] = None,
        impute_first: bool = True,
        config: Optional[Dict[str, Any]] = None,
        cat_columns: Optional[List[str]] = None,
        num_columns: Optional[List[str]] = None,
        random_state: int = 42,
        class_weight: Optional[Dict[int, float]] = None,
        max_iter: int = 10000,
        debug: bool = False
) -> Pipeline:
    """
    Create a custom pipeline for data preprocessing and modeling.

    This function allows you to define a custom pipeline by specifying the
    desired preprocessing steps (imputation, transformation, scaling, feature
    selection) and the model to use for predictions. Provide the keys for the
    steps you want to include in the pipeline. If a step is not specified, it
    will be skipped.

    The definition of the keys are defined in a configuration dictionary that
    is passed to the function. If no external configuration is provided, a
    default one will be used.

    * `imputer_key` (str) is selected from `config['imputers']`
    * `transformer_keys` (list or str) are selected from `config['transformers']`
    * `scaler_key` (str) is selected from `config['scalers']`
    * `selector_key` (str) is selected from `config['selectors']`
    * `model_key` (str) is selected from `config['models']`
    * `config['no_scale']` lists model keys that should not be scaled.
    * `config['no_poly']` lists models that should not be polynomial transformed.

    The `no_scale` and `no_poly` entries are optional in a custom
    configuration; when absent, no model is excluded from scaling or
    polynomial transformation.

    By default, the sequence of the Pipeline steps are: Imputer > Column
    Transformer > Scaler > Selector > Model. However, if `impute_first` is
    False, the data will be imputed after the column transformations. Scaling
    will not be done for any Model that is listed in `config['no_scale']` (ex:
    for decision trees, which don't require scaling).

    A column transformer will be created based on the specified
    `transformer_keys`. Any number of column transformations can be defined
    here. For example, you can define `transformer_keys = ['ohe', 'poly2',
    'log']` to One-Hot Encode some columns, Polynomial transform some columns,
    and Log transform others. Just define each of these in your config file to
    reference the appropriate column lists. By default, these will transform
    the columns passed in as `cat_columns` or `num_columns`. But you may want
    to apply different transformations to your categorical features. For
    example, if you One-Hot Encode some, but Ordinal Encode others, you could
    define separate column lists for these as 'ohe_columns' and 'ord_columns',
    and then define `transformer_keys` in your config dictionary that
    reference them.

    Here is an example of the configuration dictionary structure:

    >>> config = {  # doctest: +SKIP
    ...     'imputers': {
    ...         'knn_imputer': KNNImputer().set_output(transform='pandas'),
    ...         'simple_imputer': SimpleImputer()
    ...     },
    ...     'transformers': {
    ...         'ohe': (OneHotEncoder(drop='if_binary', handle_unknown='ignore'),
    ...                 cat_columns),
    ...         'ord': (OrdinalEncoder(), cat_columns),
    ...         'poly2': (PolynomialFeatures(degree=2, include_bias=False),
    ...                   num_columns),
    ...         'log': (FunctionTransformer(np.log1p, validate=True),
    ...                 num_columns)
    ...     },
    ...     'scalers': {
    ...         'stand': StandardScaler(),
    ...         'minmax': MinMaxScaler()
    ...     },
    ...     'selectors': {
    ...         'rfe_logreg': RFE(LogisticRegression(max_iter=max_iter,
    ...                                              random_state=random_state,
    ...                                              class_weight=class_weight)),
    ...         'sfs_linreg': SequentialFeatureSelector(LinearRegression())
    ...     },
    ...     'models': {
    ...         'linreg': LinearRegression(),
    ...         'logreg': LogisticRegression(max_iter=max_iter,
    ...                                      random_state=random_state,
    ...                                      class_weight=class_weight),
    ...         'tree_class': DecisionTreeClassifier(random_state=random_state),
    ...         'tree_reg': DecisionTreeRegressor(random_state=random_state)
    ...     },
    ...     'no_scale': ['tree_class', 'tree_reg'],
    ...     'no_poly': ['tree_class', 'tree_reg'],
    ... }

    Use this function to quickly create a pipeline during model iteration and
    evaluation. You can easily experiment with different combinations of
    preprocessing steps and models to find the best performing pipeline. This
    function is utilized by `iterate_model`, `compare_models`, and
    `compare_reg_models` to dynamically build pipelines as part of that larger
    modeling workflow.

    Parameters
    ----------
    imputer_key : str, optional
        The key corresponding to the imputer to use for handling missing
        values. If not provided, no imputation will be performed.
    transformer_keys : list of str, str, or None, optional
        The keys corresponding to the transformers to apply to the data. This
        can be a list (or other non-string sequence) of string keys or a single
        string key. If not provided, no transformers will be applied.
    scaler_key : str or None, optional
        The key corresponding to the scaler to use for scaling the data. If not
        provided, no scaling will be performed.
    selector_key : str or None, optional
        The key corresponding to the feature selector to use for selecting
        relevant features. If not provided, no feature selection will be
        performed.
    model_key : str, optional
        The key corresponding to the model to use for predictions.
    impute_first : bool, default=True
        Whether to perform imputation before applying the transformers. If
        False, imputation will be performed after the transformers.
    config : dict or None, optional
        A dictionary containing the configuration for the pipeline components.
        If not provided, a default configuration will be used.
    cat_columns : list-like, optional
        List of categorical columns from the input dataframe. This is used in
        the default configuration for the relevant transformers.
    num_columns : list-like, optional
        List of numeric columns from the input dataframe. This is used in the
        default configuration for the relevant transformers.
    random_state : int, default=42
        The random state to use for reproducibility.
    class_weight : dict or None, optional
        A dictionary mapping class labels to weights for imbalanced
        classification problems. If not provided, equal weights will be used.
    max_iter : int, default=10000
        The maximum number of iterations for iterative models.
    debug : bool, optional
        Flag to show debugging information.

    Returns
    -------
    pipeline : sklearn.pipeline.Pipeline
        The constructed pipeline based on the specified components and
        configuration.

    Raises
    ------
    ValueError
        If `config` is None and neither `cat_columns` nor `num_columns` is
        provided.
    KeyError
        If a requested key is not defined in the corresponding section of
        `config`.

    Examples
    --------
    Prepare sample data for the examples:

    >>> from sklearn.datasets import fetch_california_housing
    >>> X, y = fetch_california_housing(return_X_y=True)
    >>> cat_columns = ['ocean_proximity']
    >>> num_columns = ['longitude', 'latitude', 'housing_median_age',
    ...                'total_rooms', 'total_bedrooms', 'population',
    ...                'households', 'median_income']

    Example 1: Create a pipeline with Standard Scaler and Linear Regression:

    >>> pipeline = create_pipeline(scaler_key='stand', model_key='linreg',
    ...                            cat_columns=cat_columns,
    ...                            num_columns=num_columns)
    >>> pipeline.steps
    [('stand', StandardScaler()), ('linreg', LinearRegression())]

    Example 2: Create a pipeline with One-Hot Encoding, Standard Scaler, and a
    Logistic Regression model:

    >>> pipeline = create_pipeline(transformer_keys=['ohe'],
    ...                            scaler_key='stand',
    ...                            model_key='logreg',
    ...                            cat_columns=cat_columns,
    ...                            num_columns=num_columns)
    >>> pipeline.steps  #doctest: +NORMALIZE_WHITESPACE
    [('ohe', ColumnTransformer(force_int_remainder_cols=False, remainder='passthrough',
                      transformers=[('ohe',
                                     OneHotEncoder(drop='if_binary',
                                                   handle_unknown='ignore'),
                                     ['ocean_proximity'])])), ('stand', StandardScaler()), ('logreg', LogisticRegression(max_iter=10000, random_state=42))]

    Example 3: Create a pipeline with KNN Imputer, One-Hot Encoding, Polynomial
    Transformation, Log Transformation, Standard Scaler, and Gradient Boost
    Regressor for the model:

    >>> pipeline = create_pipeline(imputer_key='knn_imputer',
    ...                            transformer_keys=['ohe', 'poly2', 'log'],
    ...                            scaler_key='stand',
    ...                            model_key='boost_reg',
    ...                            cat_columns=cat_columns,
    ...                            num_columns=num_columns)
    >>> pipeline.steps  #doctest: +NORMALIZE_WHITESPACE
    [('knn_imputer', KNNImputer()), ('ohe_poly2_log', ColumnTransformer(force_int_remainder_cols=False, remainder='passthrough',
                      transformers=[('ohe',
                                     OneHotEncoder(drop='if_binary',
                                                   handle_unknown='ignore'),
                                     ['ocean_proximity']),
                                    ('poly2',
                                     PolynomialFeatures(include_bias=False),
                                     ['longitude', 'latitude',
                                      'housing_median_age', 'total_rooms',
                                      'total_bedrooms', 'population',
                                      'households', 'median_income']),
                                    ('log',
                                     FunctionTransformer(func=<ufunc 'log1p'>,
                                                         validate=True),
                                     ['longitude', 'latitude',
                                      'housing_median_age', 'total_rooms',
                                      'total_bedrooms', 'population',
                                      'households', 'median_income'])])), ('stand', StandardScaler()), ('boost_reg', GradientBoostingRegressor(random_state=42))]
    """
    # Check for configuration file parameter, if none, use default in library
    if config is None:
        # If no column lists are provided, raise an error
        if not cat_columns and not num_columns:
            raise ValueError("If no config is provided, cat_columns and num_columns must be passed.")
        config = {
            'imputers': {
                'knn_imputer': KNNImputer().set_output(transform='pandas'),
                # 20 neighbors, as the key name states (previously this was
                # indistinguishable from 'knn_imputer')
                'knn20_imputer': KNNImputer(n_neighbors=20).set_output(transform='pandas'),
                'simple_imputer': SimpleImputer(),
                # Fill missing values with 0, as the key name states
                # (previously this silently imputed the column mean)
                'zero_imputer': SimpleImputer(strategy='constant', fill_value=0),
                'mean_imputer': SimpleImputer(strategy='mean')
            },
            'transformers': {
                'ohe': (OneHotEncoder(drop='if_binary', handle_unknown='ignore'), cat_columns),
                'ord': (OrdinalEncoder(), cat_columns),
                'poly2': (PolynomialFeatures(degree=2, include_bias=False), num_columns),
                'poly2_bias': (PolynomialFeatures(degree=2, include_bias=True), num_columns),
                'poly3': (PolynomialFeatures(degree=3, include_bias=False), num_columns),
                'poly3_bias': (PolynomialFeatures(degree=3, include_bias=True), num_columns),
                'log': (FunctionTransformer(np.log1p, validate=True), num_columns)
            },
            'scalers': {
                'stand': StandardScaler(),
                'robust': RobustScaler(),
                'minmax': MinMaxScaler()
            },
            'selectors': {
                'rfe_logreg': RFE(LogisticRegression(max_iter=max_iter,
                                                     random_state=random_state,
                                                     class_weight=class_weight)),
                'sfs_logreg': SequentialFeatureSelector(
                    LogisticRegression(max_iter=max_iter,
                                       random_state=random_state,
                                       class_weight=class_weight)),
                'sfs_linreg': SequentialFeatureSelector(LinearRegression()),
                'sfs_7': SequentialFeatureSelector(LinearRegression(), n_features_to_select=7),
                'sfs_6': SequentialFeatureSelector(LinearRegression(), n_features_to_select=6),
                'sfs_5': SequentialFeatureSelector(LinearRegression(), n_features_to_select=5),
                'sfs_4': SequentialFeatureSelector(LinearRegression(), n_features_to_select=4),
                'sfs_3': SequentialFeatureSelector(LinearRegression(), n_features_to_select=3),
                'sfs_bw': SequentialFeatureSelector(LinearRegression(), direction='backward')
            },
            'models': {
                'linreg': LinearRegression(),
                'knn_reg': KNeighborsRegressor(),
                'ttr_log': TransformedTargetRegressor(
                    regressor=LinearRegression(), func=np.log, inverse_func=np.exp),
                'svr': SVR(),
                'logreg': LogisticRegression(max_iter=max_iter,
                                             random_state=random_state,
                                             class_weight=class_weight),
                'ridge': Ridge(random_state=random_state),
                'lasso': Lasso(random_state=random_state),
                'tree_class': DecisionTreeClassifier(random_state=random_state),
                'tree_reg': DecisionTreeRegressor(random_state=random_state),
                'knn': KNeighborsClassifier(),
                'svm': SVC(random_state=random_state, class_weight=class_weight),
                'svm_proba': SVC(random_state=random_state, probability=True,
                                 class_weight=class_weight),
                'forest_reg': RandomForestRegressor(random_state=random_state),
                'forest_class': RandomForestClassifier(random_state=random_state,
                                                       class_weight=class_weight),
                'vot_reg': VotingRegressor([('linreg', LinearRegression()),
                                            ('knn_reg', KNeighborsRegressor()),
                                            ('tree_reg', DecisionTreeRegressor(
                                                random_state=random_state)),
                                            ('ridge', Ridge(
                                                random_state=random_state)),
                                            ('svr', SVR())]),
                'bag_reg': BaggingRegressor(random_state=random_state),
                'bag_class': BaggingClassifier(random_state=random_state),
                'boost_reg': GradientBoostingRegressor(
                    random_state=random_state),
                'boost_class': GradientBoostingClassifier(
                    random_state=random_state),
                'ada_class': AdaBoostClassifier(random_state=random_state),
                'ada_reg': AdaBoostRegressor(random_state=random_state)
            },
            'no_scale': ['tree_class', 'tree_reg', 'forest_reg', 'forest_class'],
            'no_poly': ['knn', 'tree_reg', 'tree_class', 'forest_reg', 'forest_class']
        }

    def get_component(section: str, key: Any) -> Any:
        """Return `config[section][key]`, naming the section and valid keys on failure."""
        try:
            return config[section][key]
        except KeyError:
            available = list(config.get(section, {}))
            raise KeyError(
                f"'{key}' is not defined in config['{section}']. "
                f"Available keys: {available}"
            ) from None

    # The exclusion lists are optional in a user-supplied config
    no_scale_models = config.get('no_scale', ())
    no_poly_models = config.get('no_poly', ())

    # Transformer keys that are treated as polynomial expansions
    poly_keys = ('poly2', 'poly2_bias', 'poly3', 'poly3_bias')

    # Initialize an empty list for the transformation steps
    steps = []

    # Function to add imputer to the pipeline steps
    def add_imputer_step() -> None:
        if imputer_key is not None:
            steps.append((imputer_key, get_component('imputers', imputer_key)))

    # Add imputer step before column transformers if impute_first is True
    if impute_first:
        add_imputer_step()

    # If transformers are provided, add them to the steps
    if transformer_keys is not None:
        # Accept a single key or any sequence of keys (list, tuple, ...)
        if isinstance(transformer_keys, str):
            key_list = [transformer_keys]
        else:
            key_list = list(transformer_keys)

        transformer_steps = []
        for key in key_list:
            transformer, cols = get_component('transformers', key)
            if key in poly_keys and model_key in no_poly_models:
                continue  # Skip polynomial transformers if the model is in 'no_poly'
            transformer_steps.append((key, transformer, cols))

        # Create column transformer
        col_trans = ColumnTransformer(transformer_steps, remainder='passthrough',
                                      force_int_remainder_cols=False)
        # The step name lists every requested key, including any skipped above
        transformer_name = '_'.join(key_list)
        steps.append((transformer_name, col_trans))
        if debug:
            print('col_trans:', col_trans)
            print('transformer_name:', transformer_name)
            print('steps:', steps)

    # Add imputer step after column transformers if impute_first is False
    if not impute_first:
        add_imputer_step()

    # If a scaler is provided, add it to the steps, unless model listed in
    # no_scale config
    if scaler_key is not None and model_key not in no_scale_models:
        steps.append((scaler_key, get_component('scalers', scaler_key)))

    # If a selector is provided, add it to the steps
    if selector_key is not None:
        steps.append((selector_key, get_component('selectors', selector_key)))

    # If a model is provided, add it to the steps
    if model_key is not None:
        steps.append((model_key, get_component('models', model_key)))

    if debug:
        print('steps:', steps)

    # Create and return pipeline
    return Pipeline(steps)
def create_results_df() -> pd.DataFrame:
    """
    Build the empty `results_df` DataFrame that `iterate_model` appends to.

    The returned DataFrame has no rows and the following columns, in order:
    'Iteration', 'Train MSE', 'Test MSE', 'Train RMSE', 'Test RMSE',
    'Train MAE', 'Test MAE', 'Train R^2 Score', 'Test R^2 Score', 'Pipeline',
    'Best Grid Params', 'Note', 'Date'.

    Call this once to create `results_df`, then pass it as a parameter to
    `iterate_model`. The results of each model iteration will be appended to
    `results_df`.

    Returns
    -------
    pd.DataFrame
        The initialized (empty) results_df DataFrame.

    Examples
    --------
    Create a DataFrame with the columns required for `iterate_model`:

    >>> results_df = create_results_df()
    >>> results_df.columns  #doctest: +NORMALIZE_WHITESPACE
    Index(['Iteration', 'Train MSE', 'Test MSE', 'Train RMSE', 'Test RMSE',
           'Train MAE', 'Test MAE', 'Train R^2 Score', 'Test R^2 Score',
           'Pipeline', 'Best Grid Params', 'Note', 'Date'],
          dtype='object')
    """
    # Column groups, concatenated below in display order
    run_columns = ['Iteration']
    error_columns = [
        'Train MSE', 'Test MSE',
        'Train RMSE', 'Test RMSE',
        'Train MAE', 'Test MAE',
    ]
    fit_columns = ['Train R^2 Score', 'Test R^2 Score']
    detail_columns = ['Pipeline', 'Best Grid Params', 'Note', 'Date']

    ordered_columns = run_columns + error_columns + fit_columns + detail_columns
    return pd.DataFrame(columns=ordered_columns)
[docs] def eval_model( *, y_test: np.ndarray, y_pred: np.ndarray, class_map: Dict[Any, Any] = None, estimator: Optional[Any] = None, x_test: Optional[np.ndarray] = None, class_type: Optional[str] = None, pos_label: Optional[Any] = 1, threshold: float = 0.5, multi_class: str = 'ovr', average: str = 'macro', title: Optional[str] = None, model_name: str = 'Model', class_weight: Optional[str] = None, decimal: int = 2, bins: int = 10, bin_strategy: str = None, plot: bool = False, figsize: Tuple[int, int] = (12, 11), figmulti: float = 1.7, conf_fontsize: int = 14, return_metrics: bool = False, output: bool = True, debug: bool = False ) -> Optional[Dict[str, Union[int, float]]]: """ Evaluate a classification model's performance and plot results. This function provides a comprehensive evaluation of a binary or multi-class classification model based on `y_test` (the actual target values) and `y_pred` (the predicted target values). It displays a text-based classification report enhanced with True/False Positives/Negatives (if binary), and 4 charts if `plot` is True: Confusion Matrix, Histogram of Predicted Probabilities, ROC Curve, and Precision-Recall Curve. If `class_type` is 'binary', it will treat this as a binary classification. If `class_type` is 'multi', it will treat this as a multi-class problem. If `class_type` is not specified, it will be detected based on the number of unique values in `y_test`. To plot the curves or adjust the `threshold` (default 0.5), both `x_test` and `estimator` must be provided so that proababilities can be calculated. For binary classification, `pos_label` is required. This defaults to 1 as an integer, but can be set to any value that matches one of the values in `y_test` and `y_pred`. The `class_map` can be used to provide display names for the classes. If not provided, the actual class values will be used. A number of classification metrics are shown in the report: Accuracy, Precision, Recall, F1, and ROC AUC. 
In addition, for binary classification, True Positive Rate, False Positive Rate, True Negative Rate, and False Negative Rate are shown. The metrics are calculated at the default threshold of 0.5, but can be adjusted with the `threshold` parameter. You can customize the `title` of the report completely, or pass the `model_name` and it will be displayed in a dynamically generated title. You can also specify the number of `decimal` places to show, and size of the figure (`fig_size`). For multi-class, you can set a `figmulti` scaling factor for the plot. You can set the `class_weight` as a display only string that is not used in any functions within `eval_model`. This is useful if you trained the model with a 'balanced' class_weight, and now want to pass that to this report to see the effects. A dictionary of metrics can be returned if `return_metrics` is True, and the output can be disabled by setting `output` to False. These are used by parent functions (ex: `compare_models`) to gather the data into a DataFrame of the results. Use this function to assess the performance of a trained classification model. You can experiment with different thresholds to see how they affect metrics like Precision, Recall, False Positive Rate and False Negative Rate. The plots make it easy to see if you're getting good separation and maximum area under the curve. Parameters ---------- y_test : np.ndarray The true labels of the test set. y_pred : np.ndarray The predicted labels of the test set. class_map : Dict[Any, Any], optional A dictionary mapping class labels to their string representations. Default is None. estimator : Any, optional The trained estimator object used for prediction. Required for generating probabilities. Default is None. x_test : np.ndarray, optional The test set features. Required for generating probabilities. Default is None. class_type : str, optional The type of classification problem. Can be 'binary' or 'multi'. 
If not provided, it will be inferred from the number of unique labels. Default is None. pos_label : Any, optional The positive class label for binary classification. Default is 1. threshold : float, optional The threshold for converting predicted probabilities to class labels. Default is 0.5. multi_class : str, optional The method for handling multi-class ROC AUC calculation. Can be 'ovr' (one-vs-rest) or 'ovo' (one-vs-one). Default is 'ovr'. average : str, optional The averaging method for multi-class classification metrics. Can be 'macro', 'micro', 'weighted', or 'samples'. Default is 'macro'. title : str, optional The title for the plots. Default is None. model_name : str, optional The name of the model for labeling the plots. Default is 'Model'. class_weight : str, optional The class weight settings used for training the model. Default is None. decimal : int, optional The number of decimal places to display in the output and plots. Default is 4. bins : int, optional The number of bins for the predicted probabilities histogram when `bin_strategy` is None. Default is 10. bin_strategy : str, optional The strategy for determining the number of bins for the predicted probabilities histogram. Can be 'sqrt', 'sturges', 'rice', 'freed', 'scott', or 'doane'. Default is None. plot : bool, optional Whether to display the evaluation plots. Default is False. figsize : Tuple[int, int], optional The figure size for the plots in inches. Default is (12, 11). figmulti : float, optional The multiplier for the figure size in multi-class classification. Default is 1.7. conf_fontsize : int, optional The font size for the numbers in the confusion matrix. Default is 14. return_metrics : bool, optional Whether to return the evaluation metrics as a dictionary. Default is False. output : bool, optional Whether to print the evaluation results. Default is True. debug : bool, optional Whether to print debug information. Default is False. 
Returns ------- metrics : Dict[str, Union[int, float]], optional A dictionary containing the evaluation metrics. Returned only if `return_metrics` is True and the classification type is binary. Examples -------- Prepare data and model for the examples: >>> from sklearn.datasets import make_classification >>> from sklearn.model_selection import train_test_split >>> from sklearn.svm import SVC >>> X, y = make_classification(n_samples=1000, n_classes=2, weights=[0.4, 0.6], ... random_state=42) >>> class_map = {0: 'Malignant', 1: 'Benign'} >>> X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, ... random_state=42) >>> model = SVC(kernel='linear', probability=True, random_state=42) >>> model.fit(X_train, y_train) SVC(kernel='linear', probability=True, random_state=42) >>> y_pred = model.predict(X_test) Example 1: Basic evaluation with default settings: >>> eval_model(y_test=y_test, y_pred=y_pred) #doctest: +NORMALIZE_WHITESPACE <BLANKLINE> Binary Classification Report <BLANKLINE> precision recall f1-score support <BLANKLINE> 0 0.76 0.74 0.75 72 1 0.85 0.87 0.86 128 <BLANKLINE> accuracy 0.82 200 macro avg 0.81 0.80 0.80 200 weighted avg 0.82 0.82 0.82 200 <BLANKLINE> Predicted:0 1 Actual: 0 53 19 Actual: 1 17 111 <BLANKLINE> True Positive Rate / Sensitivity: 0.87 True Negative Rate / Specificity: 0.74 False Positive Rate / Fall-out: 0.26 False Negative Rate / Miss Rate: 0.13 <BLANKLINE> Positive Class: 1 (1) Threshold: 0.5 Example 2: Evaluation with custom settings: >>> eval_model(y_test=y_test, y_pred=y_pred, estimator=model, x_test=X_test, ... class_type='binary', class_map=class_map, pos_label=0, ... threshold=0.35, model_name='SVM', class_weight='balanced', ... decimal=4, plot=True, figsize=(13, 13), conf_fontsize=18, ... 
bins=20) #doctest: +NORMALIZE_WHITESPACE <BLANKLINE> SVM Binary Classification Report <BLANKLINE> precision recall f1-score support <BLANKLINE> Benign 0.9545 0.8203 0.8824 128 Malignant 0.7444 0.9306 0.8272 72 <BLANKLINE> accuracy 0.8600 200 macro avg 0.8495 0.8754 0.8548 200 weighted avg 0.8789 0.8600 0.8625 200 <BLANKLINE> ROC AUC: 0.9220 <BLANKLINE> Predicted:1 0 Actual: 1 105 23 Actual: 0 5 67 <BLANKLINE> True Positive Rate / Sensitivity: 0.9306 True Negative Rate / Specificity: 0.8203 False Positive Rate / Fall-out: 0.1797 False Negative Rate / Miss Rate: 0.0694 <BLANKLINE> Positive Class: Malignant (0) Class Weight: balanced Threshold: 0.35 Example 3: Evaluate model with no output and return a dictionary: >>> metrics = eval_model(y_test=y_test, y_pred=y_pred, estimator=model, ... x_test=X_test, class_map=class_map, pos_label=0, ... return_metrics=True, output=False) >>> print(metrics) {'True Positives': 53, 'False Positives': 17, 'True Negatives': 111, 'False Negatives': 19, 'TPR': 0.7361111111111112, 'TNR': 0.8671875, 'FPR': 0.1328125, 'FNR': 0.2638888888888889, 'Benign': {'precision': 0.8538461538461538, 'recall': 0.8671875, 'f1-score': 0.8604651162790697, 'support': 128.0}, 'Malignant': {'precision': 0.7571428571428571, 'recall': 0.7361111111111112, 'f1-score': 0.7464788732394366, 'support': 72.0}, 'accuracy': 0.82, 'macro avg': {'precision': 0.8054945054945055, 'recall': 0.8016493055555556, 'f1-score': 0.8034719947592532, 'support': 200.0}, 'weighted avg': {'precision': 0.819032967032967, 'recall': 0.82, 'f1-score': 0.819430068784802, 'support': 200.0}, 'ROC AUC': 0.9219835069444444, 'Threshold': 0.5, 'Class Type': 'binary', 'Class Map': {0: 'Malignant', 1: 'Benign'}, 'Positive Label': 0, 'Title': None, 'Model Name': 'Model', 'Class Weight': None, 'Multi-Class': 'ovr', 'Average': 'macro'} Prepare multi-class example data: >>> from sklearn.datasets import load_iris >>> X, y = load_iris(return_X_y=True) >>> X = pd.DataFrame(X, columns=['sepal_length', 
'sepal_width', 'petal_length', ... 'petal_width']) >>> y = pd.Series(y) >>> class_map = {0: 'Setosa', 1: 'Versicolor', 2: 'Virginica'} >>> X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, ... random_state=42) >>> model = SVC(kernel='linear', probability=True, random_state=42) >>> model.fit(X_train, y_train) SVC(kernel='linear', probability=True, random_state=42) >>> y_pred = model.predict(X_test) Example 4: Evaluate multi-class model with default settings: >>> metrics = eval_model(y_test=y_test, y_pred=y_pred, class_map=class_map, ... return_metrics=True) #doctest: +NORMALIZE_WHITESPACE <BLANKLINE> Multi-Class Classification Report <BLANKLINE> precision recall f1-score support <BLANKLINE> Setosa 1.00 1.00 1.00 10 Versicolor 1.00 1.00 1.00 9 Virginica 1.00 1.00 1.00 11 <BLANKLINE> accuracy 1.00 30 macro avg 1.00 1.00 1.00 30 weighted avg 1.00 1.00 1.00 30 <BLANKLINE> Predicted Setosa Versicolor Virginica Actual Setosa 10 0 0 Versicolor 0 9 0 Virginica 0 0 11 <BLANKLINE> >>> print(metrics) {'Setosa': {'precision': 1.0, 'recall': 1.0, 'f1-score': 1.0, 'support': 10.0}, 'Versicolor': {'precision': 1.0, 'recall': 1.0, 'f1-score': 1.0, 'support': 9.0}, 'Virginica': {'precision': 1.0, 'recall': 1.0, 'f1-score': 1.0, 'support': 11.0}, 'accuracy': 1.0, 'macro avg': {'precision': 1.0, 'recall': 1.0, 'f1-score': 1.0, 'support': 30.0}, 'weighted avg': {'precision': 1.0, 'recall': 1.0, 'f1-score': 1.0, 'support': 30.0}, 'ROC AUC': None, 'Threshold': 0.5, 'Class Type': 'multi', 'Class Map': {0: 'Setosa', 1: 'Versicolor', 2: 'Virginica'}, 'Positive Label': None, 'Title': None, 'Model Name': 'Model', 'Class Weight': None, 'Multi-Class': 'ovr', 'Average': 'macro'} """ # Initialize debugging, controlled via 'debug' parameter db = DebugPrinter(debug = debug) db.print('-' * 40) db.print('START eval_model') db.print('-' * 40, '\n') db.print('y_test shape:', y_test.shape) db.print('y_pred shape:', y_pred.shape) db.print('class_map:', class_map) 
db.print('pos_label:', pos_label) db.print('class_type:', class_type) db.print('estimator:', estimator) if x_test is not None: db.print('x_test shape:', x_test.shape) else: db.print('x_test:', x_test) db.print('threshold:', threshold) # Convert y_test DataFrame to a Series if it's not already if isinstance(y_test, pd.DataFrame): db.print('\nConverting y_test DataFrame to Series...') db.print('y_test shape before:', y_test.shape) y_test = y_test.squeeze() db.print('y_test shape after:', y_test.shape) # Convert y_test DataFrame to a Series if it's not already if isinstance(y_pred, pd.DataFrame): db.print('\nConverting y_pred DataFrame to Series...') db.print('y_pred shape before:', y_pred.shape) y_pred = y_pred.squeeze() db.print('y_pred shape after:', y_pred.shape) # Get the unique labels and display labels for the confusion matrix if class_map is not None: # Make sure class_map is a dictionary if isinstance(class_map, dict): db.print('\nGetting labels from class_map...') unique_labels = list(class_map.keys()) display_labels = list(class_map.values()) else: raise TypeError("class_map must be a dictionary") # Make sure every unique_label has a corresponding entry in y_test missing_labels = set(np.unique(y_test)) - set(unique_labels) if missing_labels: db.print('y_test[:5]:', list(y_test[:5])) db.print('set(unique_labels):', set(unique_labels)) db.print('set(np.unique(y_test)):', set(np.unique(y_test))) db.print('missing_labels:', missing_labels) raise ValueError(f"The following labels in y_test are missing from class_map: {missing_labels}") else: db.print('\nGetting labels from unique values in y_test...') unique_labels = np.unique(y_test) display_labels = [str(label) for label in unique_labels] db.print('Creating class_map...') class_map = {label: str(label) for label in unique_labels} db.print('class_map:', class_map) db.print('unique_labels:', unique_labels) db.print('display_labels:', display_labels) # Count the number of classes num_classes = len(unique_labels) 
db.print('num_classes:', num_classes) # If class_type is not passed, auto-detect based on unique values of y_test if class_type is None: if num_classes > 2: class_type = 'multi' elif num_classes == 2: class_type = 'binary' else: raise ValueError(f"Check data, cannot classify. Number of classes in y_test ({num_classes}) is less than 2: {unique_labels}") db.print(f"\nClassification type detected: {class_type}") db.print("Unique values in y:", num_classes) elif class_type not in ['binary', 'multi']: # If class type is invalid, raise an error raise ValueError(f"Class type '{class_type}' is invalid, must be 'binary' or 'multi'. Number of classes in y_test: {num_classes}, unique labels: {unique_labels}") # Check to ensure num_classes matches the passed class_type if class_type == 'binary' and num_classes != 2: raise ValueError(f"Class type is {class_type}, but number of classes in y_test ({num_classes}) is not 2: {unique_labels}") elif class_type == 'multi' and num_classes < 3: raise ValueError(f"Class type is {class_type}, but number of classes in y_test ({num_classes}) is less than 3: {unique_labels}") elif num_classes < 2: raise ValueError(f"Check data, cannot classify. 
Class type is {class_type}, and number of classes in y_test ({num_classes}) is less than 2: {unique_labels}") # Evaluation for multi-class classification if class_type == 'multi': # Set pos_label to None for multi-class pos_label = None # Calculate confusion matrix cm = confusion_matrix(y_test, y_pred) # Run the classification report db.print('\nRun the Classification Report...') class_report = classification_report(y_test, y_pred, digits=decimal, target_names=display_labels, zero_division=0, output_dict=True) db.print('class_report:', class_report) # Calculate ROC AUC if we have x_test and estimator if x_test is not None and estimator is not None: db.print('\nCalculating ROC AUC...') roc_auc = roc_auc_score(y_test, estimator.predict_proba(x_test), multi_class=multi_class, average=average) else: roc_auc = None db.print('roc_auc:', roc_auc) if output: # Display the best title we can create if title is not None: print(f"\n{title}\n") elif model_name != 'Model': print(f"\n{model_name} Multi-Class Classification Report\n") else: print(f"\nMulti-Class Classification Report\n") # Display the classification report print(classification_report(y_test, y_pred, digits=decimal, target_names=display_labels, zero_division=0)) # Display the ROC AUC if roc_auc is not None: if isinstance(roc_auc, float): print(f'ROC AUC: {round(roc_auc, decimal)}\n') elif isinstance(roc_auc, np.ndarray): # It's an array, handle different cases if roc_auc.size == 1: print(f'ROC AUC: {round(roc_auc[0], decimal)}\n') else: # If it's an array with multiple elements, print the mean value, rounded mean_roc_auc = np.mean(roc_auc) print(f'ROC AUC (mean): {round(mean_roc_auc, decimal)}\n') else: # Print it raw print(f'ROC AUC: {roc_auc}\n') # Display the class weight for reference only if class_weight is not None: print(f'Class Weight: {class_weight}\n') # Create a DataFrame from the confusion matrix df_cm = pd.DataFrame(cm, index=display_labels, columns=display_labels) df_cm.index.name = 'Actual' 
df_cm.columns.name = 'Predicted' print(f'{df_cm}\n') # Pre-processing for binary classification if class_type == 'binary': # Check if pos_label is in unique_labels if pos_label not in unique_labels: db.print('pos_label:', pos_label) db.print('type(pos_label):', type(pos_label).__name__) db.print('unique_labels:', unique_labels) db.print('unique_labels[0]:', unique_labels[0]) db.print('unique_labels[1]:', unique_labels[1]) db.print('type(unique_labels[0]):', type(unique_labels[0]).__name__) db.print('type(unique_labels[1]):', type(unique_labels[1]).__name__) raise ValueError(f"Positive label: {pos_label} ({type(pos_label).__name__}) is not in y_test unique values: {unique_labels}. Please specify the correct 'pos_label'.") # Encode labels if binary classification problem db.print('\nEncoding labels for binary classification...') # Assign neg_label based on pos_label neg_label = np.setdiff1d(unique_labels, [pos_label])[0] db.print('pos_label:', pos_label) db.print('neg_label:', neg_label) # Create a label_map for encoding label_map = {neg_label: 0, pos_label: 1} db.print('label_map:', label_map) # Encode new labels as 0 and 1 db.print('\nEncoding y_test and y_pred...') y_test_enc = np.array([label_map[label] for label in y_test]) y_pred_enc = np.array([label_map[label] for label in y_pred]) db.print('y_test[:5]:', list(y_test[:5])) db.print('y_test_enc[:5]:', y_test_enc[:5]) db.print('y_pred[:5]:', y_pred[:5]) db.print('y_pred_enc[:5]:', y_pred_enc[:5]) db.print('Overwriting y_test and y_pred...') y_test = y_test_enc y_pred = y_pred_enc db.print('y_test[:5]:', list(y_test[:5])) db.print('y_pred[:5]:', y_pred[:5]) # Create a map for the new labels db.print('\nGetting the display labels...') pos_display = class_map[pos_label] neg_display = class_map[neg_label] db.print('pos_display:', pos_display) db.print('neg_display:', neg_display) if class_map is not None: display_map = {0: neg_display, 1: pos_display} else: display_map = {0: str(neg_label), 1: str(pos_label)} 
db.print('display_map:', display_map) # Update the unique labels and display labels for the confusion matrix db.print('\nUpdating labels from display_map...') unique_labels = list(display_map.keys()) display_labels = list(display_map.values()) db.print('New unique_labels:', unique_labels) db.print('New display_labels:', display_labels) # Calculate the probabilities if class_type == 'binary' and x_test is not None and estimator is not None: db.print('\nCalculating probabilities...') pos_class_index = np.where(estimator.classes_ == pos_label)[0][0] db.print('estimator.classes_:', estimator.classes_) db.print('pos_label:', pos_label) db.print('pos_class_index:', pos_class_index) probabilities = estimator.predict_proba(x_test)[:, pos_class_index] all_probabilities = estimator.predict_proba(x_test) db.print('probabilities[:5]:', probabilities[:5]) db.print('all_probabilities[:5]:', all_probabilities[:5]) db.print('all_probabilities shape:', np.shape(all_probabilities)) # Apply the threshold to the probabilities if plot or threshold != 0.5: db.print(f'\nApplying threshold {threshold} to probabilities...') y_pred_thresh = (probabilities >= threshold).astype(int) db.print('y_pred[:5]:', y_pred[:5]) db.print('y_pred_thresh[:5]:', y_pred_thresh[:5]) db.print('Overwriting y_pred with y_pred_thres...') y_pred = y_pred_thresh db.print('y_pred[:5]:', y_pred[:5]) else: db.print(f'\nUsing default threshold of {threshold}...') db.print('plot:', plot) else: probabilities = None db.print(f'\nSkipping probabilities. 
class_type: {class_type}, x_test shape: {np.shape(x_test)}, estimator: {estimator.__class__.__name__}') # Evaluation for binary classification if class_type == 'binary': if output: # Display the best title we can create if title is not None: print(f"\n{title}\n") elif model_name != 'Model': print(f"\n{model_name} Binary Classification Report\n") else: print(f"\nBinary Classification Report\n") # Run the classification report db.print('\nRun the Classification Report...') class_report = classification_report(y_test, y_pred, labels=unique_labels, target_names=display_labels, digits=decimal, zero_division=0, output_dict=True) db.print('class_report:', class_report) if output: print(classification_report(y_test, y_pred, labels=unique_labels, target_names=display_labels, digits=decimal, zero_division=0)) # Calculate the confusion matrix db.print('\nCalculating confusion matrix and metrics...') cm = confusion_matrix(y_test, y_pred, labels=unique_labels) # Calculate the binary metrics tn, fp, fn, tp = cm.ravel() tpr = tp / (tp + fn) fpr = fp / (fp + tn) tnr = tn / (tn + fp) fnr = fn / (fn + tp) db.print('cm:\n', cm) db.print('\ncm.ravel:', cm.ravel()) db.print(f'TN: {tn}') db.print(f'FP: {fp}') db.print(f'FN: {fn}') db.print(f'TP: {tp}') binary_metrics = { "True Positives": tp, "False Positives": fp, "True Negatives": tn, "False Negatives": fn, "TPR": tpr, "TNR": tnr, "FPR": fpr, "FNR": fnr, } # Calculate the ROC AUC score if binary classification with probabilities if class_type == 'binary' and probabilities is not None: # Calculate ROC AUC score db.print('\nCalculating ROC AUC score...') roc_auc = roc_auc_score(y_test, probabilities, labels=unique_labels) db.print('y_test[:5]:', y_test[:5]) db.print('probabilities[:5]:', probabilities[:5]) db.print('unique_labels:', unique_labels) if output: print(f'ROC AUC: {roc_auc:.{decimal}f}\n') # Calculate false positive rate, true positive rate, and thresholds for ROC curve db.print('\nCalculating ROC curve...') fpr_array, 
tpr_array, thresholds = roc_curve(y_test, probabilities, pos_label=1) if len(thresholds) == 0 or len(fpr_array) == 0 or len(tpr_array) == 0: raise ValueError(f"Error in ROC curve calculation, at least one empty array. fpr_array length: {len(fpr_array)}, tpr_array length: {len(tpr_array)}, thresholds length: {len(thresholds)}.") db.print('y_test[:5]:', y_test[:5]) db.print('probabilities[:5]:', probabilities[:5]) db.print('Arrays from roc_curve:') db.print('fpr_array[:5]:', fpr_array[:5]) db.print('tpr_array[:5]:', tpr_array[:5]) db.print('thresholds[:5]:', thresholds[:5]) # Print the binary classification output if class_type == 'binary' and output: # Print confusion matrix print(f"{'':<15}{'Predicted:':<10}{neg_label:<10}{pos_label:<10}") print(f"{'Actual: ' + str(neg_label):<25}{cm[0][0]:<10}{cm[0][1]:<10}") print(f"{'Actual: ' + str(pos_label):<25}{cm[1][0]:<10}{cm[1][1]:<10}") # Print evaluation metrics print("\nTrue Positive Rate / Sensitivity:", round(tpr, decimal)) print("True Negative Rate / Specificity:", round(tnr, decimal)) print("False Positive Rate / Fall-out:", round(fpr, decimal)) print("False Negative Rate / Miss Rate:", round(fnr, decimal)) print(f"\nPositive Class: {pos_display} ({pos_label})") if class_weight is not None: print("Class Weight:", class_weight) print("Threshold:", threshold) # Plot the evaluation metrics if plot and output: # Define a blue color for plots blue = (0.12156862745098039, 0.4666666666666667, 0.7058823529411765) # Just plot a confusion matrix for multi-class if class_type == 'multi': # Calculate the figure size for multi-class plots multiplier = figmulti max_size = 20 size = min(len(unique_labels) * multiplier, max_size) figsize = (size, size) # Create a figure and axis for multi-class confusion matrix fig, ax1 = plt.subplots(1, 1, figsize=figsize) # Plot the confusion matrix cm_display = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=display_labels) cm_display.plot(cmap='Blues', ax=ax1, colorbar=False) for 
text in cm_display.text_: for t in text: t.set_fontsize(conf_fontsize - 2) # Reduce font size for multi-class ax1.set_title(f'Confusion Matrix', fontsize=18, pad=15) ax1.set_xlabel('Predicted Label', fontsize=14, labelpad=15) ax1.set_ylabel('True Label', fontsize=14, labelpad=10) ax1.tick_params(axis='both', which='major', labelsize=10) plt.tight_layout() plt.show() # Just plot a confusion matrix for binary classification without probabilities elif class_type == 'binary' and probabilities is None: # Calculate the figure size for a single-chart plot multiplier = figmulti max_size = 20 size = min(len(unique_labels) * multiplier, max_size) + 1.5 # Extra size for just 2 classes figsize = (size, size) # Create a figure and axis for a confusion matrix fig, ax1 = plt.subplots(1, 1, figsize=figsize) # Plot the confusion matrix cm_display = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=display_labels) cm_display.plot(cmap='Blues', ax=ax1, colorbar=False) for text in cm_display.text_: for t in text: t.set_fontsize(conf_fontsize) ax1.set_title(f'Confusion Matrix', fontsize=18, pad=15) ax1.set_xlabel('Predicted Label', fontsize=14, labelpad=15) ax1.set_ylabel('True Label', fontsize=14, labelpad=10) ax1.tick_params(axis='both', which='major', labelsize=10) plt.tight_layout() plt.show() # Plot 4 charts for binary classification elif class_type == 'binary' and probabilities is not None: # Calculate the number of bins if bin_strategy is not None: # Calculate the number of bins based on the specified strategy data_len = len(probabilities) if bin_strategy == 'sqrt': num_bins = int(np.sqrt(data_len)) elif bin_strategy == 'sturges': num_bins = int(np.ceil(np.log2(data_len)) + 1) elif bin_strategy == 'rice': num_bins = int(2 * data_len ** (1/3)) elif bin_strategy == 'freed': iqr = np.subtract(*np.percentile(probabilities, [75, 25])) bin_width = 2 * iqr * data_len ** (-1/3) num_bins = int(np.ceil((probabilities.max() - probabilities.min()) / bin_width)) elif bin_strategy == 
'scott': std_dev = np.std(probabilities) bin_width = 3.5 * std_dev * data_len ** (-1/3) num_bins = int(np.ceil((probabilities.max() - probabilities.min()) / bin_width)) elif bin_strategy == 'doane': std_dev = np.std(probabilities) skewness = ((np.mean(probabilities) - np.median(probabilities)) / std_dev) sigma_g1 = np.sqrt(6 * (data_len - 2) / ((data_len + 1) * (data_len + 3))) num_bins = int(np.ceil(np.log2(data_len) + 1 + np.log2(1 + abs(skewness) / sigma_g1))) else: raise ValueError("Invalid bin strategy, possible values of 'bin_strategy' are 'sqrt', 'sturges', 'rice', 'freed', 'scott', and 'doane'") else: # Use default behavior of bins=10 for X axis range of 0 to 1.0 num_bins = bins # Create a figure and subplots for binary classification plots fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=figsize) # 1. Confusion Matrix cm_matrix = ConfusionMatrixDisplay.from_predictions(y_true=y_test, y_pred=y_pred, labels=unique_labels, display_labels=display_labels, cmap='Blues', colorbar=False, normalize=None, ax=ax1) for text in cm_matrix.text_: for t in text: t.set_fontsize(conf_fontsize) ax1.set_title(f'Confusion Matrix', fontsize=18, pad=15) ax1.set_xlabel('Predicted Label', fontsize=14, labelpad=15) ax1.set_ylabel('True Label', fontsize=14, labelpad=10) ax1.tick_params(axis='both', which='major', labelsize=11) # 2. Histogram of Predicted Probabilities ax2.hist(probabilities, color=blue, edgecolor='black', alpha=0.7, bins=num_bins, label=f'{model_name} Probabilities') ax2.axvline(x=threshold, color='red', linestyle='--', linewidth=1, label=f'Threshold: {threshold:.{decimal}f}') ax2.set_title('Histogram of Predicted Probabilities', fontsize=18, pad=15) ax2.set_xlabel('Probability', fontsize=14, labelpad=15) ax2.set_ylabel('Frequency', fontsize=14, labelpad=10) ax2.set_xticks(np.arange(0, 1.1, 0.1)) ax2.legend() # 3. 
ROC Curve ax3.plot([0, 1], [0, 1], color='grey', linestyle=':', label='Chance Baseline') ax3.plot(fpr_array, tpr_array, color=blue, marker='.', lw=2, label=f'{model_name} ROC Curve') ax3.scatter(fpr, tpr, color='red', s=80, zorder=5, label=f'Threshold {threshold:.{decimal}f}') ax3.axvline(x=fpr, ymax=tpr-0.027, color='red', linestyle='--', lw=1, label=f'TPR: {tpr:.{decimal}f}, FPR: {fpr:.{decimal}f}') ax3.axhline(y=tpr, xmax=fpr+0.04, color='red', linestyle='--', lw=1) ax3.set_xticks(np.arange(0, 1.1, 0.1)) ax3.set_yticks(np.arange(0, 1.1, 0.1)) ax3.set_ylim(0,1.05) ax3.set_xlim(-0.05,1.0) ax3.grid(which='both', color='lightgrey', linewidth=0.5) ax3.set_title('ROC Curve', fontsize=18, pad=15) ax3.set_xlabel('False Positive Rate', fontsize=14, labelpad=15) ax3.set_ylabel('True Positive Rate', fontsize=14, labelpad=10) ax3.legend(loc='lower right') # 4. Precision-Recall Curve db.print('\nCalculating precision-recall curve...') db.print('y_test[:5]:', y_test[:5]) db.print('probabilities[:5]:', probabilities[:5]) db.print('pos_label:', pos_label) precision_array, recall_array, _ = precision_recall_curve(y_test, probabilities, pos_label=1) db.print('precision_array[:5]:', precision_array[:5]) db.print('recall_array[:5]:', recall_array[:5]) precision = class_report[pos_display]['precision'] recall = class_report[pos_display]['recall'] db.print('precision:', precision) db.print('recall:', recall) # Plot the Precision-Recall curve ax4.plot(recall_array, precision_array, marker='.', label=f'{model_name} Precision-Recall', color=blue) ax4.scatter(recall, precision, color='red', s=80, zorder=5, label=f'Threshold: {threshold:.{decimal}f}') ax4.axvline(x=recall, ymax=precision-0.025, color='red', linestyle='--', lw=1, label=f'Precision: {precision:.{decimal}f}, Recall: {recall:.{decimal}f}') ax4.axhline(y=precision, xmax=recall-0.025, color='red', linestyle='--', lw=1) ax4.set_xticks(np.arange(0, 1.1, 0.1)) ax4.set_yticks(np.arange(0, 1.1, 0.1)) ax4.set_ylim(0,1.05) 
ax4.set_xlim(0,1.05) ax4.grid(which='both', color='lightgrey', linewidth=0.5) ax4.set_title('Precision-Recall Curve', fontsize=18, pad=15) ax4.set_xlabel('Recall', fontsize=14, labelpad=15) ax4.set_ylabel('Precision', fontsize=14, labelpad=10) ax4.legend(loc='lower left') plt.tight_layout() plt.show() # Package up the metrics if requested if return_metrics: # Custom metrics dictionary db.print('\nPackaging metrics dictionary...') custom_metrics = { "ROC AUC": roc_auc, "Threshold": threshold, "Class Type": class_type, "Class Map": class_map, "Positive Label": pos_label, "Title": title, "Model Name": model_name, "Class Weight": class_weight, "Multi-Class": multi_class, "Average": average } # Assemble the final metrics based on class type if class_type == 'binary': metrics = {**binary_metrics, **class_report, **custom_metrics} else: metrics = {**class_report, **custom_metrics} db.print('metrics:', metrics) # Return a dictionary of metrics return metrics
[docs] def iterate_model( x_train: pd.DataFrame, x_test: pd.DataFrame, y_train: pd.Series, y_test: pd.Series, model: Optional[str] = None, imputer: Optional[str] = None, transformers: Optional[Union[List[str], str]] = None, scaler: Optional[str] = None, selector: Optional[str] = None, drop: Optional[List[str]] = None, config: Optional[Dict[str, Any]] = None, iteration: str = '1', note: str = '', save: bool = False, save_df: Optional[pd.DataFrame] = None, export: bool = False, plot: bool = False, coef: bool = False, perm: bool = False, vif: bool = False, cross: bool = False, cv_folds: int = 5, grid: bool = False, grid_params: Optional[str] = None, grid_cv: Optional[str] = None, grid_score: str = 'r2', grid_verbose: int = 1, search_type: str = 'grid', random_state: int = 42, n_jobs: Optional[int] = None, decimal: int = 2, lowess: bool = False, timezone: str = 'UTC', debug: bool = False ) -> Tuple[pd.DataFrame, Pipeline, Optional[Dict[str, Any]]]: """ Iterate and evaluate a model pipeline with specified parameters. This function creates a pipeline from specified parameters for imputers, column transformers, scalers, feature selectors, and models. Parameters must be defined in a configuration dictionary containing the sections described below. If `config` is not defined, the `create_pipeline` function will revert to the default config embedded in its code. After creating the pipeline, it fits the pipeline to the passed training data, and evaluates performance with both test and training data. There are options to see plots of residuals and actuals vs. predicted, save results to a save_df with user-defined note, display coefficients, calculate permutation feature importance, variance inflation factor (VIF), and perform cross-validation. If `grid` is set to True, a Grid Search CV will run to find the best hyper- parameters. You must also specify a `grid_params` string that matches a key in the `config['params']` dictionary. 
This needs to point to a dictionary whose keys exactly match the names of the pipeline steps and parameters you want to search. See the example config. You can also specify a different `grid_score` and control the `grid_verbose` level (set it to 4 to see a full log). If you want to do a Randomized Grid Search, set `search_type` to 'random'. `random_state` defaults to 42. `n_jobs` is None by default, but you can increase the number (however, you may not see the real-time output of the search if you have `grid_verbose` set high). When `iterate_model` is run, the `create_pipeline` function is called to create a pipeline from the specified parameters: * `imputer_key` (str) is selected from `config['imputers']` * `transformer_keys` (list or str) are selected from `config['transformers']` * `scaler_key` (str) is selected from `config['scalers']` * `selector_key` (str) is selected from `config['selectors']` * `model_key` (str) is selected from `config['models']` * `config['no_scale']` lists model keys that should not be scaled. * `config['no_poly']` lists models that should not be polynomial transformed. Here is an example of the configuration dictionary structure. It is based on what `create_pipeline` requires to assemble the pipeline. But it adds some additional configuration parameters only required by `iterate_model`, which are `params` (grid search parameters) and `cv` (cross-validation parameters): >>> config = { # doctest: +SKIP ... 'imputers': { ... 'knn_imputer': KNNImputer().set_output(transform='pandas'), ... 'simple_imputer': SimpleImputer() ... }, ... 'transformers': { ... 'ohe': (OneHotEncoder(drop='if_binary', handle_unknown='ignore'), ... cat_columns), ... 'ord': (OrdinalEncoder(), cat_columns), ... 'poly2': (PolynomialFeatures(degree=2, include_bias=False), ... num_columns), ... 'log': (FunctionTransformer(np.log1p, validate=True), ... num_columns) ... }, ... 'scalers': { ... 'stand': StandardScaler(), ... 'minmax': MinMaxScaler() ... }, ... 
'selectors': { ... 'rfe_logreg': RFE(LogisticRegression(max_iter=max_iter, ... random_state=random_state, ... class_weight=class_weight)), ... 'sfs_linreg': SequentialFeatureSelector(LinearRegression()) ... }, ... 'models': { ... 'linreg': LinearRegression(), ... 'logreg': LogisticRegression(max_iter=max_iter, ... random_state=random_state, ... class_weight=class_weight), ... 'tree_class': DecisionTreeClassifier(random_state=random_state), ... 'tree_reg': DecisionTreeRegressor(random_state=random_state) ... }, ... 'no_scale': ['tree_class', 'tree_reg'], ... 'no_poly': ['tree_class', 'tree_reg'], ... 'params': { ... 'sfs': { ... 'Selector: sfs__n_features_to_select': np.arange(3, 13, 1), ... }, ... 'linreg': { ... 'Model: linreg__fit_intercept': [True], ... }, ... 'ridge': { ... 'Model: ridge__alpha': np.array([0.001, 0.1, 1, 10, 100, 1000, 10000, 100000]), ... } ... }, ... 'cv': { ... 'kfold_5': KFold(n_splits=5, shuffle=True, random_state=42), ... 'kfold_10': KFold(n_splits=10, shuffle=True, random_state=42), ... 'skf_5': StratifiedKFold(n_splits=5, shuffle=True, random_state=42), ... 'skf_10': StratifiedKFold(n_splits=10, shuffle=True, random_state=42) ... } ... } In addition to the configuration file, you will need to define any column lists if you want to target certain transformations to a subset of columns. For example, you might define an 'ohe' transformer for One-Hot Encoding, and reference 'ohe_columns' or 'cat_columns' in its definition in the config. When `iterate_model` completes, it will print out the results and performance metrics, as well as any requested charts. It will return the best model, and also the grid search results (if a grid search was run). In addition, if `save = True` it will append the results to a global variable `results_df`. This should be created using `create_results_df` beforehand. If `export=True` it will save the best model to disk using joblib dump with a timestamp. 
Use this function to iterate and evaluate different model pipeline configurations, analyze their performance, and select the best model. With one line of code, you can quickly explore a change to the model pipeline, or grid search parameters, and see how it impacts performance. You can also track the results of these iterations in a `results_df` DataFrame that can be used to evaluate the best model, or to plot the progress you made from each iteration. Parameters ---------- x_train : pd.DataFrame Training feature set. x_test : pd.DataFrame Test feature set. y_train : pd.Series Training target set. y_test : pd.Series Test target set. model : str, optional Key for the model to be used (ex: 'linreg', 'lasso', 'ridge'). imputer : str, optional Key for the imputer to be applied (ex: 'simple_imputer'). transformers : List[str], optional List of transformation keys to apply (ex: ['ohe', 'poly2']). scaler : str, optional Key for the scaler to be applied (ex: 'stand'). selector : str, optional Key for the selector to be applied (ex: 'sfs'). drop : List[str], optional List of columns to be dropped from the training and test sets. iteration : str, optional A string identifier for the iteration (default '1'). note : str, optional Any note or comment to be added for the iteration. save : bool, optional Boolean flag to save the results to the global results dataframe. save_df : pd.DataFrame, optional DataFrame to store the results of each iteration. export : bool, optional Boolean flag to export the trained model. plot : bool, optional Flag to plot residual and actual vs predicted for train/test data. coef : bool, optional Flag to print and plot model coefficients. perm : bool, optional Flag to compute and display permutation feature importance. vif : bool, optional Flag to calculate and display Variance Inflation Factor. cross : bool, optional Flag to perform cross-validation and print results. 
cv_folds : int, optional Number of folds for cross-validation if cross=True (default 5). config : Dict[str, Any], optional Configuration dictionary for pipeline construction. grid : bool, optional Flag to perform grid search for hyperparameter tuning. grid_params : str, optional Key for the grid search parameters in the config dictionary. grid_cv : str, optional Key for the grid search cross-validation in the config dictionary. grid_score : str, optional Scoring metric for grid search (default 'r2'). grid_verbose : int, optional Verbosity level for grid search (default 1). search_type : str, optional Choose type of grid search: 'grid' for GridSearchCV, or 'random' for RandomizedSearchCV. Default is 'grid'. random_state : int, optional Random state seed, necessary for reproducibility with RandomizedSearchCV. Default is 42. n_jobs : int, optional Number of jobs to run in parallel for Grid Search or Randomized Search. Default is None. decimal : int, optional Number of decimal places for displaying metrics (default 2). lowess : bool, optional Flag to display lowess curve in residual plots (default False). timezone : str, optional Timezone to be used for timestamps. Default is 'UTC'. debug : bool, optional Flag to show debugging information. Returns ------- Tuple[DataFrame, Pipeline, Optional[Dict[str, Any]]] A tuple containing the save_df DataFrame, the best model pipeline, and the grid search results (if grid=True, else None). Examples -------- Prepare some sample data for the examples: >>> from sklearn.datasets import make_regression >>> from sklearn.model_selection import train_test_split >>> X, y = make_regression(n_samples=100, n_features=5, noise=0.5, ... random_state=42) >>> X_df = pd.DataFrame(X, ... columns=[f"Feature {i+1}" for i in range(X.shape[1])]) >>> y_df = pd.DataFrame(y, columns=['Target']) >>> X_train, X_test, y_train, y_test = train_test_split(X_df, y_df, ... 
test_size=0.2, random_state=42) Create column lists and set some variables: >>> num_columns = ['Feature 1','Feature 2','Feature 3','Feature 4','Feature 5'] >>> cat_columns = [] >>> random_state = 42 Create a dataframe to store the results of each iteration (optional): >>> results_df = create_results_df() Create a custom configuration file: >>> my_config = { ... 'imputers': { ... 'simple_imputer': SimpleImputer() ... }, ... 'transformers': { ... 'poly2': (PolynomialFeatures(degree=2, include_bias=False), ... num_columns) ... }, ... 'scalers': { ... 'stand': StandardScaler() ... }, ... 'selectors': { ... 'sfs_linreg': SequentialFeatureSelector(LinearRegression()) ... }, ... 'models': { ... 'linreg': LinearRegression(), ... 'ridge': Ridge(random_state=random_state) ... }, ... 'no_scale': [], ... 'no_poly': [], ... 'params': { ... 'linreg': { ... 'linreg__fit_intercept': [True], ... }, ... 'ridge': { ... 'ridge__alpha': np.array([0.1, 1, 10, 100]), ... } ... }, ... 'cv': { ... 'kfold_5': KFold(n_splits=5, shuffle=True, random_state=42) ... } ... } Example 1: Iterate a linear regression model with default parameters: >>> model = iterate_model(X_train, X_test, y_train, y_test, ... model='linreg') #doctest: +ELLIPSIS <BLANKLINE> ITERATION 1 RESULTS <BLANKLINE> Pipeline: linreg ...UTC <BLANKLINE> Predictions: Train Test MSE: 0.20 0.28 RMSE: 0.45 0.53 MAE: 0.36 0.42 R^2 Score: 1.00 1.00 Example 2: Iterate a pipeline with transformers and scalers >>> results_df, model, grid = iterate_model(X_train, X_test, y_train, y_test, ... transformers=['poly2'], scaler='stand', model='ridge', iteration='2', ... grid=True, grid_params='ridge', grid_cv='kfold_5', plot=True, ... coef=True, perm=True, vif=True, config=my_config, ... 
save=True, save_df=results_df) #doctest: +ELLIPSIS <BLANKLINE> ITERATION 2 RESULTS <BLANKLINE> Pipeline: poly2 -> stand -> ridge ...UTC <BLANKLINE> Grid Search: <BLANKLINE> Fitting 5 folds for each of 4 candidates, totalling 20 fits <BLANKLINE> Best Grid mean score (r2): 1.00 Best Grid parameters: ridge__alpha: 0.1 <BLANKLINE> Predictions: Train Test MSE: 0.20 0.43 RMSE: 0.45 0.66 MAE: 0.37 0.50 R^2 Score: 1.00 1.00 <BLANKLINE> Permutation Feature Importance: Feature Importance Mean Importance Std Feature 2 0.83 0.14 Feature 1 0.47 0.03 Feature 4 0.33 0.03 Feature 3 0.31 0.03 Feature 5 0.11 0.01 <BLANKLINE> Variance Inflation Factor: Features VIF Multicollinearity Feature 1 1.03 Low Feature 4 1.03 Low Feature 5 1.02 Low Feature 3 1.02 Low Feature 2 1.01 Low <BLANKLINE> <BLANKLINE> Coefficients: Feature Coefficient 1 Feature 1 65.68 2 Feature 2 90.96 3 Feature 3 53.72 4 Feature 4 56.56 5 Feature 5 33.85 6 Feature 1^2 0.02 7 Feature 1 Feature 2 0.03 8 Feature 1 Feature 3 -0.16 9 Feature 1 Feature 4 -0.08 10 Feature 1 Feature 5 0.03 11 Feature 2^2 -0.03 12 Feature 2 Feature 3 -0.03 13 Feature 2 Feature 4 0.07 14 Feature 2 Feature 5 -0.05 15 Feature 3^2 -0.06 16 Feature 3 Feature 4 0.03 17 Feature 3 Feature 5 -0.07 18 Feature 4^2 0.01 19 Feature 4 Feature 5 -0.04 20 Feature 5^2 -0.05 """ # Drop specified columns from Xn_train and Xn_test if drop is not None: x_train = x_train.drop(columns=drop) x_test = x_test.drop(columns=drop) if debug: print('Drop:', drop) print('x_train.columns', x_train.columns) print('x_test.columns', x_test.columns) # Check for configuration file parameter, if none, use default in library if config is None: num_columns = x_train.select_dtypes(include=[np.number]).columns.tolist() cat_columns = x_train.select_dtypes(exclude=[np.number]).columns.tolist() if debug: print('Config:', config) print('num_columns:', num_columns) print('cat_columns:', cat_columns) else: num_columns = None cat_columns = None # Create a pipeline from transformer and model 
parameters if debug: print('BEFORE create_pipeline') print('transformers:', transformers) pipe = create_pipeline(imputer_key=imputer, transformer_keys=transformers, scaler_key=scaler, selector_key=selector, model_key=model, config=config, cat_columns=cat_columns, num_columns=num_columns, debug=debug) if debug: print('AFTER create_pipeline') print('Pipeline:', pipe) print('Pipeline Parameters:', pipe.get_params()) # Construct format string format_str = f',.{decimal}f' # Print some metadata print(f'\nITERATION {iteration} RESULTS\n') pipe_steps = " -> ".join(pipe.named_steps.keys()) print(f'Pipeline: {pipe_steps}') if note: print(f'Note: {note}') # Get the current date and time current_time = datetime.now(pytz.timezone(timezone)) timestamp = current_time.strftime(f'%b %d, %Y %I:%M %p {timezone}') print(f'{timestamp}\n') if cross: print('Cross Validation:\n') # Before fitting the pipeline, check if cross-validation is desired: if cross: # Flatten yn_train for compatibility yn_train_flat = y_train.values.flatten() if isinstance(y_train, pd.Series) else np.array(y_train).flatten() cv_scores = cross_val_score(pipe, x_train, yn_train_flat, cv=cv_folds, scoring='r2') print(f'Cross-Validation (R^2) Scores for {cv_folds} Folds:') for i, score in enumerate(cv_scores, 1): print(f'Fold {i}: {score:{format_str}}') print(f'Average: {np.mean(cv_scores):{format_str}}') print(f'Standard Deviation: {np.std(cv_scores):{format_str}}\n') if grid: # Select the appropriate search method if search_type == 'grid': print('Grid Search:\n') grid = GridSearchCV(pipe, param_grid=config['params'][grid_params], scoring=grid_score, verbose=grid_verbose, cv=config['cv'][grid_cv], n_jobs=n_jobs) elif search_type == 'random': print('Randomized Grid Search:\n') grid = RandomizedSearchCV(pipe, param_distributions=config['params'][grid_params], scoring=grid_score, verbose=grid_verbose, cv=config['cv'][grid_cv], random_state=random_state, n_jobs=n_jobs) else: raise ValueError("search_type should be either 
'grid' for GridSearchCV, or 'random' for RandomizedSearchCV") if debug: print('Grid: ', grid) print('Grid Parameters: ', grid.get_params()) # Fit the grid and predict grid.fit(x_train, y_train) #best_model = grid.best_estimator_ best_model = grid yn_train_pred = grid.predict(x_train) yn_test_pred = grid.predict(x_test) if debug: print("First 10 actual train values:", y_train[:10]) print("First 10 predicted train values:", yn_train_pred[:10]) print("First 10 actual test values:", y_test[:10]) print("First 10 predicted test values:", yn_test_pred[:10]) best_grid_params = grid.best_params_ best_grid_score = grid.best_score_ best_grid_estimator = grid.best_estimator_ best_grid_index = grid.best_index_ grid_results = grid.cv_results_ else: best_grid_params = np.nan best_grid_score = np.nan # Fit the pipeline and predict pipe.fit(x_train, y_train) best_model = pipe yn_train_pred = pipe.predict(x_train) yn_test_pred = pipe.predict(x_test) # MSE yn_train_mse = mean_squared_error(y_train, yn_train_pred) yn_test_mse = mean_squared_error(y_test, yn_test_pred) # RMSE yn_train_rmse = np.sqrt(yn_train_mse) yn_test_rmse = np.sqrt(yn_test_mse) # MAE yn_train_mae = mean_absolute_error(y_train, yn_train_pred) yn_test_mae = mean_absolute_error(y_test, yn_test_pred) # R^2 Score if grid: if grid_score == 'r2': train_score = grid.score(x_train, y_train) test_score = grid.score(x_test, y_test) else: train_score = 0 test_score = 0 else: train_score = pipe.score(x_train, y_train) test_score = pipe.score(x_test, y_test) # Print Grid best parameters if grid: print(f'\nBest Grid mean score ({grid_score}): {best_grid_score:{format_str}}') #print(f'Best Grid parameters: {best_grid_params}\n') param_str = ', '.join(f"{key}: {value}" for key, value in best_grid_params.items()) print(f"Best Grid parameters: {param_str}\n") #print(f'Best Grid estimator: {best_grid_estimator}') #print(f'Best Grid index: {best_grid_index}') #print(f'Grid results: {grid_results}') # Print the results 
print('Predictions:') print(f'{"":<15} {"Train":>15} {"Test":>15}') #print('-'*55) print(f'{"MSE:":<15} {yn_train_mse:>15{format_str}} {yn_test_mse:>15{format_str}}') print(f'{"RMSE:":<15} {yn_train_rmse:>15{format_str}} {yn_test_rmse:>15{format_str}}') print(f'{"MAE:":<15} {yn_train_mae:>15{format_str}} {yn_test_mae:>15{format_str}}') print(f'{"R^2 Score:":<15} {train_score:>15{format_str}} {test_score:>15{format_str}}') # Save the results if save=True if save: if save_df is not None: results_df = save_df else: # Create results_df if it doesn't exist with predefined columns results_df = pd.DataFrame(columns=['Iteration', 'Train MSE', 'Test MSE', 'Train RMSE', 'Test RMSE', 'Train MAE', 'Test MAE', 'Train R^2 Score', 'Test R^2 Score', 'Best Grid Mean Score', 'Best Grid Params', 'Pipeline', 'Note', 'Date']) # Store results in a dictionary results = { 'Iteration': iteration, 'Train MSE': yn_train_mse, 'Test MSE': yn_test_mse, 'Train RMSE': yn_train_rmse, 'Test RMSE': yn_test_rmse, 'Train MAE': yn_train_mae, 'Test MAE': yn_test_mae, 'Train R^2 Score': train_score, 'Test R^2 Score': test_score, 'Best Grid Mean Score': best_grid_score, 'Best Grid Params': best_grid_params, 'Pipeline': pipe_steps, 'Note': note, 'Date': timestamp } # Convert the results dictionary to a pd.Series results_series = pd.Series(results) # Append the series to the DataFrame results_df = pd.concat([results_df, results_series.to_frame().T], ignore_index=True) # Permutation Feature Importance if perm: print("\nPermutation Feature Importance:") if grid: pfi_df = calc_pfi(grid, x_train, y_train) else: pfi_df = calc_pfi(pipe, x_train, y_train) print(pfi_df.to_string(index=False)) # Variance Inflation Factor if vif: print("\nVariance Inflation Factor:") if pipe is not None: if debug: print(type(pipe)) print(pipe.steps) print(hasattr(pipe, '_final_estimator')) if pipe.steps: last_step = pipe.steps[-1][1] if hasattr(last_step, 'transform'): vif_data = pipe.transform(x_train) else: vif_data = x_train else: 
vif_data = x_train # Convert vif_data to a DataFrame if it's a NumPy array if isinstance(vif_data, np.ndarray): vif_df = pd.DataFrame(vif_data, columns=[f"Feature_{i}" for i in range(vif_data.shape[1])]) else: vif_df = vif_data vif_results = calc_vif(vif_df) print(vif_results.to_string(index=False)) elif grid is not None: if grid.best_estimator_.steps: last_step = grid.best_estimator_.steps[-1][1] if hasattr(last_step, 'transform'): vif_data = grid.best_estimator_.transform(x_train) else: vif_data = x_train else: vif_data = x_train # Convert vif_data to a DataFrame if it's a NumPy array if isinstance(vif_data, np.ndarray): vif_df = pd.DataFrame(vif_data, columns=[f"Feature_{i}" for i in range(vif_data.shape[1])]) else: vif_df = vif_data vif_results = calc_vif(vif_df) print(vif_results.to_string(index=False)) else: print("No pipeline or grid found. Skipping VIF calculation.") if plot: print('') y_train = y_train.values.flatten() if isinstance(y_train, pd.Series) else np.array(y_train).flatten() y_test = y_test.values.flatten() if isinstance(y_test, pd.Series) else np.array(y_test).flatten() yn_train_pred = yn_train_pred.flatten() yn_test_pred = yn_test_pred.flatten() # Generate residual plots plt.figure(figsize=(12, 4)) plt.subplot(1, 2, 1) sns.residplot(x=y_train, y=yn_train_pred, lowess=lowess, scatter_kws={'s': 30, 'edgecolor': 'white'}, line_kws={'color': 'red', 'lw': '1'}) plt.gca().xaxis.set_major_formatter(FuncFormatter(thousands)) plt.gca().yaxis.set_major_formatter(FuncFormatter(thousands)) plt.title(f'Training Residuals - Iteration {iteration}') plt.xlabel('Predicted') plt.ylabel('Residuals') plt.subplot(1, 2, 2) sns.residplot(x=y_test, y=yn_test_pred, lowess=lowess, scatter_kws={'s': 30, 'edgecolor': 'white'}, line_kws={'color': 'red', 'lw': '1'}) plt.gca().xaxis.set_major_formatter(FuncFormatter(thousands)) plt.gca().yaxis.set_major_formatter(FuncFormatter(thousands)) plt.title(f'Test Residuals - Iteration {iteration}') plt.xlabel('Predicted') 
plt.ylabel('Residuals') plt.tight_layout() plt.show() # Generate predicted vs actual plots plt.figure(figsize=(12, 4)) plt.subplot(1, 2, 1) sns.scatterplot(x=y_train, y=yn_train_pred, s=30, edgecolor='white') plt.plot([y_train.min(), y_train.max()], [y_train.min(), y_train.max()], color='red', linewidth=1) plt.gca().xaxis.set_major_formatter(FuncFormatter(thousands)) plt.gca().yaxis.set_major_formatter(FuncFormatter(thousands)) plt.xlabel('Actual') plt.ylabel('Predicted') plt.title(f'Training Predicted vs. Actual - Iteration {iteration}') plt.subplot(1, 2, 2) sns.scatterplot(x=y_test, y=yn_test_pred, s=30, edgecolor='white') plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], color='red', linewidth=1) plt.gca().xaxis.set_major_formatter(FuncFormatter(thousands)) plt.gca().yaxis.set_major_formatter(FuncFormatter(thousands)) plt.xlabel('Actual') plt.ylabel('Predicted') plt.title(f'Test Predicted vs. Actual - Iteration {iteration}') plt.tight_layout() plt.show() # Calculate coefficients if model supports if coef: # Extract features and coefficients using the function coefficients_df = extract_coef( grid.best_estimator_ if grid else pipe, x_train, format=False, debug=debug ) # Check if there are any non-NaN coefficients if coefficients_df['Coefficient'].notna().any(): # Ensure the coefficients are shaped as a 2D numpy array coefficients = coefficients_df[['Coefficient']].values else: coefficients = None # Debugging information if debug: print("Coefficients: ", coefficients) # Print the number of coefficients and selected rows print(f"Number of coefficients: {len(coefficients)}") if coefficients is not None: print("\nCoefficients:") with pd.option_context('display.float_format', lambda x: f'{x:,.{decimal}f}'.replace('-0.00', '0.00')): coefficients_df.index = coefficients_df.index + 1 coefficients_df = coefficients_df.rename(columns={'feature_name': 'Feature', 'coefficients': 'Value'}) print(coefficients_df) if plot: # Flatten the coefficients array for 
plotting coefficients = coefficients_df['Coefficient'].values.flatten() feature_names = coefficients_df['Feature'].values.flatten() plt.figure(figsize=(12, 4)) x_values = range(len(feature_names)) plt.bar(x_values, coefficients, align='center') # Set the x-ticks labels to be the feature names plt.xticks(x_values, feature_names, rotation=90, ha='right') plt.xlabel('') plt.ylabel('') plt.title('Coefficients') plt.axhline(y=0, color='black', linestyle='dotted', lw=1) plt.gca().yaxis.set_major_formatter(FuncFormatter(thousands)) plt.tight_layout() plt.show() if export: filestamp = current_time.strftime('%Y%m%d_%H%M%S') filename = f'iteration_{iteration}_model_{filestamp}.joblib' dump(best_model, filename) # Check if file exists and display a message if os.path.exists(filename): print(f"\nModel saved successfully as {filename}") else: print(f"\nFAILED to save the model as {filename}") if save: if grid: return results_df, best_model, grid_results else: return results_df, best_model else: if grid: return best_model, grid_results else: return best_model
def plot_acf_residuals(
    results: Any,
    figsize: Tuple[float, float] = (12, 8),
    rotation: int = 45,
    bins: int = 30,
    lags: int = 40,
    legend_loc: str = 'best',
    show_std: bool = True,
    pacf_method: str = 'ywm',
    alpha: float = 0.7
) -> None:
    """
    Plot residuals, histogram, ACF, and PACF of a time series ARIMA model.

    This function reads the residuals from a fitted model result and draws a
    2x2 grid: the residual series (top left), a histogram of the residuals
    (top right), the autocorrelation function (bottom left), and the partial
    autocorrelation function (bottom right). When `show_std` is True, red
    reference lines are drawn at ±1 and ±2 standard deviations around zero on
    both the residual plot and the histogram, together with a legend.

    Use this function in time series analysis to assess the residuals of an
    ARIMA model and check for any patterns or autocorrelations that may
    indicate inadequacies in the model.

    Parameters
    ----------
    results : Any
        The result object typically obtained after fitting an ARIMA model.
        It must expose a `resid` attribute containing the residuals.
    figsize : Tuple[float, float], optional
        The size of the figure in inches, as (width, height). Default is
        (12, 8).
    rotation : int, optional
        The rotation angle for the x-axis tick labels of the residual plot,
        in degrees. Default is 45.
    bins : int, optional
        The number of bins to use in the histogram of residuals. Default is 30.
    lags : int, optional
        The number of lags to plot in the ACF and PACF plots. Default is 40.
    legend_loc : str, optional
        The location of the legend in the residual plot and histogram.
        Default is 'best'.
    show_std : bool, optional
        Whether to draw the standard deviation lines (and their legend) in
        the residual plot and histogram. Default is True.
    pacf_method : str, optional
        The method passed to the PACF plot. Default is 'ywm'. Other options
        include 'ywadjusted', 'ywmle' and 'ols'.
    alpha : float, optional
        The transparency of the histogram bars, between 0 and 1. Default
        is 0.7.

    Returns
    -------
    None
        The function displays a 2x2 grid of plots using matplotlib.

    Examples
    --------
    Prepare the necessary data and model:

    >>> from statsmodels.tsa.arima.model import ARIMA
    >>> import numpy as np
    >>> data = np.random.random(100)
    >>> model = ARIMA(data, order=(1, 1, 1))
    >>> results = model.fit()

    Example 1: Plot residuals with default parameters:

    >>> plot_acf_residuals(results)

    Example 2: Plot residuals without standard deviation lines:

    >>> plot_acf_residuals(results, show_std=False)

    Example 3: Plot residuals with custom figsize, bins, and PACF method:

    >>> plot_acf_residuals(results, figsize=(12, 10), bins=20, pacf_method='ols')
    """
    resid = results.resid
    sigma = resid.std()

    fig, axes = plt.subplots(2, 2, figsize=figsize)
    resid_ax, hist_ax = axes[0, 0], axes[0, 1]
    acf_ax, pacf_ax = axes[1, 0], axes[1, 1]

    def _draw_std_bands(axis, horizontal):
        # Draw the ±1 / ±2 STD reference lines on `axis`, then its legend.
        # Only the positive lines carry a label, so each band appears once
        # in the legend.
        draw = axis.axhline if horizontal else axis.axvline
        coord = 'y' if horizontal else 'x'
        bands = (
            (sigma, '--', f'1 STD (±{sigma:.2f})'),
            (2 * sigma, ':', f'2 STD (±{2 * sigma:.2f})'),
            (-sigma, '--', None),
            (2 * -sigma, ':', None),
        )
        for level, dash, text in bands:
            options = {coord: level, 'color': 'red', 'linestyle': dash, 'lw': 1}
            if text is not None:
                options['label'] = text
            draw(**options)
        axis.legend(loc=legend_loc)

    # Top left: residual series over a zero baseline
    resid_ax.axhline(y=0, color='lightgrey', linestyle='-', lw=1)
    if show_std:
        # NOTE: the legend is created before the residual series is added
        _draw_std_bands(resid_ax, horizontal=True)
    resid_ax.plot(resid, label='Residuals')
    resid_ax.tick_params(axis='x', rotation=rotation)
    resid_ax.set_title('Residuals from ARIMA Model', fontsize=15, pad=10)
    resid_ax.set_xlabel("Time", fontsize=12, labelpad=10)
    resid_ax.set_ylabel("Residual Value", fontsize=12, labelpad=10)

    # Top right: distribution of the residuals
    hist_ax.hist(resid, bins=bins, edgecolor='k', alpha=alpha)
    if show_std:
        _draw_std_bands(hist_ax, horizontal=False)
    hist_ax.set_title("Histogram of Residuals", fontsize=15, pad=10)
    hist_ax.set_xlabel("Residual Value", fontsize=12, labelpad=10)
    hist_ax.set_ylabel("Frequency", fontsize=12, labelpad=10)

    # Bottom row: ACF then PACF, each titled after its plotter has run
    correlograms = (
        (acf_ax, plot_acf, {}, "ACF of Residuals", "Autocorrelation"),
        (pacf_ax, plot_pacf, {'method': pacf_method}, "PACF of Residuals",
         "Partial Autocorrelation"),
    )
    for axis, plotter, extra, heading, y_text in correlograms:
        plotter(resid, lags=lags, ax=axis, **extra)
        axis.set_title(heading, fontsize=15, pad=10)
        axis.set_xlabel("Lag", fontsize=12, labelpad=10)
        axis.set_ylabel(y_text, fontsize=12, labelpad=10)

    plt.tight_layout(pad=2)
    plt.show()
def plot_results(
    df: pd.DataFrame,
    metrics: Optional[Union[str, List[str]]] = None,
    select_metric: Optional[str] = None,
    select_criteria: str = 'max',
    chart_type: str = 'line',
    decimal: int = 2,
    return_df: bool = False,
    x_column: str = 'Iteration',
    y_label: Optional[str] = None,
    rotation: int = 45,
    title: Optional[str] = None
) -> Optional[pd.DataFrame]:
    """
    Plot the results of model iterations and select the best metric.

    This function creates a line or bar chart to visualize the performance of
    a model over multiple iterations, or to compare the performance of
    multiple models. Specify one or more `metrics` columns to plot (ex:
    'Train MAE', 'Test MAE') in a list, and specify the name of the `x_column`
    whose values will become the X axis of the plot. The default is
    'Iteration', which aligns with the format of the 'results_df' DataFrame
    created by the `create_results_df` function. But this could be any column
    in the provided `df` that you want to compare across (for example,
    'Model', 'Epoch', 'Dataset').

    In addition, if you specify `select_metric` (any metric column in the
    `df`) and `select_criteria` ('min' or 'max'), the best result will be
    selected and highlighted. On a line chart it is marked with a vertical
    dashed line and a dot; on a bar chart it is marked with a horizontal
    dashed line at the best value. In both cases a legend label describes the
    value. If several rows tie for the best value, the first one is used. The
    number of decimal places can be controlled by setting `decimal` (default
    is 2).

    The title of the chart is generated dynamically when `y_label` is
    defined, in this format: '{y_label} over {x_column}' (ex: 'MSE over
    Iteration'). You can always pass a custom title by setting `title` to any
    string of text. If neither `title` nor `y_label` is defined, there will
    be no title on the chart.

    Use this function to easily visualize and compare the performance of a
    model across different metrics, and identify the best iteration based on
    a chosen metric and criteria.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame containing the model evaluation results.
    metrics : Optional[Union[str, List[str]]], optional
        The metric(s) to plot. If a single string is provided, it will be
        converted to a list. If None, an error will be raised. Default is None.
    select_metric : Optional[str], optional
        The metric to use for selecting the best result. It does not have to
        be one of the plotted `metrics`. If None, then no best result will be
        selected. Default is None.
    select_criteria : str, optional
        The criteria for selecting the best result. Can be either 'max' or
        'min'. Only checked if `select_metric` is specified. Default is 'max'.
    chart_type : str, optional
        The type of chart to plot, either 'line' or 'bar'. Default is 'line'.
    decimal : int, optional
        The number of decimal places to display in the plot and legend.
        Default is 2.
    return_df : bool, optional
        Whether to return the melted DataFrame used for plotting. Default is
        False.
    x_column : str, optional
        The column in `df` to use as the x-axis. Default is 'Iteration'.
    y_label : Optional[str], optional
        The text to display as the label for the y-axis, and to also include
        in the dynamically generated title of the chart. If None, the y-axis
        is labelled 'Value'. Default is None.
    rotation : int, optional
        The rotation angle for the x-axis tick labels in degrees. Default
        is 45.
    title : Optional[str], optional
        The title of the plot. If None, a default title will be generated
        from `y_label` and `x_column`. If `y_label` is also None, the title
        will be blank. Default is None.

    Returns
    -------
    Optional[pd.DataFrame]
        If `return_df` is True, returns the melted DataFrame used for
        plotting. Otherwise, returns None.

    Raises
    ------
    ValueError
        If `metrics` is None, if `chart_type` is not 'line' or 'bar', or if
        `select_metric` is given and `select_criteria` is not 'max' or 'min'.
        All of these are raised before any figure is created.

    Examples
    --------
    Prepare some example data:

    >>> df = pd.DataFrame({
    ...     'Iteration': [1, 2, 3, 4, 5],
    ...     'Train Accuracy': [0.8510, 0.9017, 0.8781, 0.9209, 0.8801],
    ...     'Test Accuracy': [0.8056, 0.8509, 0.8232, 0.8889, 0.8415]
    ... })

    Example 1: Plot a single metric with default parameters:

    >>> plot_results(df, metrics='Test Accuracy')

    Example 2: Plot multiple metrics, select the best result based on the
    maximum value of 'Test Accuracy', and customize the Y-axis label:

    >>> plot_results(df, metrics=['Train Accuracy', 'Test Accuracy'],
    ...              select_metric='Test Accuracy', select_criteria='max',
    ...              y_label='Accuracy')

    Example 3: Plot multiple metrics, customize the title and decimal, and
    return the melted DataFrame:

    >>> long_df = plot_results(df, metrics=['Train Accuracy', 'Test Accuracy'],
    ...              select_metric='Test Accuracy', select_criteria='max',
    ...              title='Train vs. Test Accuracy by Model Iteration',
    ...              return_df=True, decimal=4)
    >>> long_df
       Iteration          Metric   Value
    0          1  Train Accuracy  0.8510
    1          2  Train Accuracy  0.9017
    2          3  Train Accuracy  0.8781
    3          4  Train Accuracy  0.9209
    4          5  Train Accuracy  0.8801
    5          1   Test Accuracy  0.8056
    6          2   Test Accuracy  0.8509
    7          3   Test Accuracy  0.8232
    8          4   Test Accuracy  0.8889
    9          5   Test Accuracy  0.8415

    Example 4: Plot a single metric as a bar chart:

    >>> plot_results(df, metrics='Test Accuracy', chart_type='bar')

    Example 5: Plot multiple metrics as a bar chart:

    >>> plot_results(df, metrics=['Train Accuracy', 'Test Accuracy'],
    ...              select_metric='Test Accuracy', select_criteria='max',
    ...              y_label='Accuracy', chart_type='bar')
    """
    # Validate all arguments first, so a bad call never leaves a half-drawn
    # figure open
    if metrics is None:
        raise ValueError("At least one metric must be provided.")
    if chart_type not in ('line', 'bar'):
        raise ValueError("chart_type must be either 'line' or 'bar'.")
    if select_metric is not None and select_criteria not in ('max', 'min'):
        raise ValueError("To select a best result, select_criteria must be either 'max' or 'min'.")

    # Convert metrics to a list if it's a single string
    if isinstance(metrics, str):
        metrics = [metrics]

    # Melt dataframe to long format
    df_long = df.melt(id_vars=[x_column], value_vars=metrics,
                      var_name='Metric', value_name='Value')

    # Locate the best result before drawing. The value is read straight from
    # `df`, so `select_metric` does not need to be one of the plotted metrics.
    best_iter = best_val = best_label = None
    if select_metric is not None:
        scores = df[select_metric]
        target = scores.max() if select_criteria == 'max' else scores.min()
        # First row holding the best value wins when there are ties
        best_iter = df.loc[scores == target, x_column].values[0]
        best_val = df.loc[df[x_column] == best_iter, select_metric].values[0]
        best_label = f"{x_column} {best_iter}: {select_metric}: {best_val:,.{decimal}f}"

    # Start the plot
    plt.figure(figsize=(12, 6))
    plt.grid(linestyle='-', linewidth=0.5, color='#DDDDDD', zorder=0)

    # Draw the metric series
    if chart_type == 'line':
        sns.lineplot(data=df_long, x=x_column, y='Value', hue='Metric', zorder=2)
    else:
        sns.barplot(data=df_long, x=x_column, y='Value', hue='Metric', zorder=2)

    # Highlight the best result if one was requested
    if select_metric is not None:
        if chart_type == 'line':
            # Vertical dashed line at the best x value, with a dot on the value
            plt.axvline(x=best_iter, color='green', linestyle='--', zorder=3,
                        label=best_label)
            plt.scatter(best_iter, best_val, color='green', s=60, zorder=3)
        else:
            # Horizontal dashed line at the best value
            plt.axhline(y=best_val, color='green', linestyle='--', zorder=3,
                        label=best_label)

    plt.legend(loc='best')

    # Format the X axis
    plt.xticks(df[x_column].unique(), rotation=rotation)
    plt.xlabel(x_column, fontsize=14, labelpad=10)

    # Plot the title, with whatever parameters we have
    if title is not None:
        plt.title(f'{title}', fontsize=18, pad=15)
    elif y_label is not None:
        plt.title(f'{y_label} over {x_column}', fontsize=18, pad=15)
    else:
        plt.title('')

    # Custom formatter that adds commas and respects decimal parameter
    def format_tick(value, pos):
        return f'{value:,.{decimal}f}'

    # Format the Y axis
    plt.ylabel(y_label if y_label is not None else 'Value', fontsize=14, labelpad=10)
    plt.gca().yaxis.set_major_formatter(FuncFormatter(format_tick))

    plt.show()

    # Return the long format df if requested
    if return_df:
        return df_long.reset_index(drop=True)
    return None
def plot_train_history(
    model=None,
    history=None,
    metrics: Optional[Union[str, List[str]]] = None,
    plot_loss: bool = True
) -> None:
    """
    Visualize the training history of a fitted Keras model or history dictionary.

    This function creates a grid of subplots (two per row) to display the
    training and validation metrics over the epochs. You can pass a fitted
    model, in which case the history is read from `model.history.history`.
    Alternatively, you can pass the history dictionary itself. If both are
    given, `model` takes precedence. The metrics present in the history are
    detected automatically and all plotted, unless a specific list of metrics
    is provided. The loss is plotted by default, but can be excluded by
    setting `plot_loss` to False.

    Use this function to quickly analyze the model's performance during
    training and identify potential issues such as overfitting or
    underfitting.

    Parameters
    ----------
    model : keras.Model, optional
        The fitted Keras model whose training history will be plotted.
        Default is None.
    history : dict, optional
        A direct history dictionary obtained from the fitting process.
        Default is None.
    metrics : Union[str, List[str]], optional
        A metric name, or list of metric names, to plot. A single string is
        converted to a one-item list. Names that appear in the history
        neither directly nor with a 'val_' prefix are silently dropped. If
        None, every key in the history that is not 'loss' and does not start
        with 'val_' is plotted. Default is None.
    plot_loss : bool, optional
        Whether to plot the training and validation loss. The loss panel is
        only drawn if the history contains a 'loss' key. Default is True.

    Returns
    -------
    None
        The function displays the plot and does not return any value.

    Raises
    ------
    ValueError
        If neither `model` nor `history` is provided, if `model` has no
        history, or if there is nothing to plot (no loss panel and no
        requested metric found in the history).
    TypeError
        If `history` is provided but is not a dictionary.

    Examples
    --------
    Prepare a simple example model:

    >>> model = Sequential([
    ...     Input(shape=(8,)),
    ...     Dense(10, activation='relu'),
    ...     Dense(1, activation='sigmoid')
    ... ])
    >>> model.compile(optimizer='adam', loss='binary_crossentropy',
    ...               metrics=['accuracy', 'precision', 'recall'])

    Fit the model on some random data:

    >>> import numpy as np
    >>> X = np.random.rand(100, 8)
    >>> y = np.random.randint(0, 2, size=(100, 1))
    >>> model.fit(X, y, epochs=10, batch_size=32, validation_split=0.2,
    ...           verbose=0)  #doctest: +ELLIPSIS
    <keras...callbacks.history.History object at 0x...>
    >>> history = model.history.history

    Example 1: Plot all metrics in the training history from a model:

    >>> plot_train_history(model)

    Example 2: Plot the training history with specific metrics:

    >>> plot_train_history(model, metrics=['accuracy', 'precision'])

    Example 3: Plot the training history without the loss:

    >>> plot_train_history(model, plot_loss=False)

    Example 4: Plot the training history of a model without validation data:

    >>> model.fit(X, y, epochs=10, batch_size=32, verbose=0)  #doctest: +ELLIPSIS
    <keras...callbacks.history.History object at 0x...>
    >>> plot_train_history(model)

    Example 5: Plot the training history from a history dictionary:

    >>> plot_train_history(history=history)
    """
    # Determine the history source
    if model is not None:
        if not hasattr(model, 'history') or model.history is None:
            raise ValueError("The model has not been fitted yet. Please fit the model before plotting.")
        history_data = model.history.history
    elif history is not None:
        if not isinstance(history, dict):
            raise TypeError("The 'history' parameter must be a dictionary.")
        history_data = history
    else:
        raise ValueError("Either a fitted 'model' or 'history' dictionary is required for plotting.")

    if metrics is None:
        # Auto-detect metrics, excluding loss and the validation counterparts
        metrics = [key for key in history_data
                   if not key.startswith('val_') and key != 'loss']
    elif isinstance(metrics, str):
        # A bare string would otherwise be iterated character by character
        metrics = [metrics]

    # Filter out metrics not in history
    metrics = [metric for metric in metrics
               if metric in history_data or 'val_' + metric in history_data]

    # One panel per series; the loss panel (if any) always comes first
    show_loss = plot_loss and 'loss' in history_data
    panels = (['loss'] if show_loss else []) + metrics
    total_plots = len(panels)

    # Fail with a clear message before creating a figure, instead of letting
    # matplotlib reject a zero-row grid
    if total_plots == 0:
        raise ValueError("Nothing to plot: the history contains no loss to show "
                         "and none of the requested metrics.")

    # Lay the panels out two per row (a single panel gets the full width)
    rows = math.ceil(total_plots / 2)
    cols = 2 if total_plots > 1 else 1

    # squeeze=False always yields a 2D array of axes, so it can be flattened
    # uniformly whatever the grid shape
    fig, axs = plt.subplots(rows, cols, figsize=(12, 5.5 * rows), squeeze=False)
    axs = axs.ravel()

    # Plot each series and its validation counterpart if present
    for axis, name in zip(axs, panels):
        display_name = name.capitalize()
        if name in history_data:
            axis.plot(history_data[name], label=f'Training {display_name}', marker='.')
        val_name = 'val_' + name
        if val_name in history_data:
            axis.plot(history_data[val_name], label=f'Validation {display_name}', marker='.')
        axis.set_title(display_name, fontsize=18, pad=15)
        axis.set_xlabel('Epoch', fontsize=14, labelpad=15)
        axis.set_ylabel(display_name, fontsize=14, labelpad=10)
        axis.grid(which='both', color='lightgrey', linewidth=0.5)
        axis.legend()

    # Hide any unused axes in case of an odd number of total plots
    for axis in axs[total_plots:]:
        axis.set_visible(False)

    plt.tight_layout()
    plt.show()