# Wine Quality Classification

https://www.scirp.org/journal/paperinformation?paperid=107796 about Wine Datasets

https://www.jair.org/index.php/jair/article/view/10302/24590 about SMOTE

```python
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from seaborn import pairplot
```

```python
# The UCI wine-quality CSVs are semicolon-separated.
white_wine = pd.read_csv("winequality-white.csv", sep=";")
red_wine = pd.read_csv("winequality-red.csv", sep=";")
red_wine.head()
```

```python
# Tag each row with its wine type before merging into one frame.
red_wine["is_red"] = True
white_wine["is_red"] = False
wine = pd.concat([white_wine, red_wine]).reset_index(drop=True)
wine
```

```python
# Red vs. white class balance (red is the minority).
print(wine[wine["is_red"]].shape[0], wine[~wine["is_red"]].shape[0])
```

```python
colors = {True: "darkred", False: "gold"}
ax = sns.countplot(wine, x="quality", hue="is_red", palette=colors)

# Annotate every bar with its exact count.
for p in ax.patches:
    ax.annotate(f'{int(p.get_height())}',
                (p.get_x() + p.get_width() / 2., p.get_height()),
                ha='center', va='bottom', fontsize=12, color='black')

# Hand-built legend handles so marker colors match the palette above.
legend_labels = ['Red Wine', 'White Wine']
legend_colors = [plt.Line2D([0], [0], marker='o', color='w', markersize=10,
                            markerfacecolor=colors[True]),
                 plt.Line2D([0], [0], marker='o', color='w', markersize=10,
                            markerfacecolor=colors[False])]
plt.legend(legend_colors, legend_labels, title="Wine Type", loc="upper right")

plt.title('Count of Each Class in the Dataset')
plt.xlabel('Wine quality class')
plt.ylabel('Amount of instances')
```

```python
import warnings
warnings.filterwarnings('ignore')  # silence seaborn/pairplot chatter for the grids below

# Pairwise feature views, colored by quality, grouped into digestible subsets.
pairplot(wine, vars=["density", "pH", "alcohol"], hue="quality")
pairplot(wine, vars=["chlorides", "free sulfur dioxide", "total sulfur dioxide", "sulphates"], hue="quality")
pairplot(wine, vars=["fixed acidity", "volatile acidity", "citric acid", "residual sugar"], hue="quality")
```

```python
# Feature correlation heat table (bool `is_red` participates as 0/1).
c = wine.corr()
c.style.background_gradient(cmap='coolwarm')
```

```python
from sklearn.model_selection import train_test_split

data, target = wine.drop("quality", axis=1), wine["quality"]
```

```python
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

# PCA assumes zero-mean, unit-variance features.
scaler = StandardScaler()
data_scaled = scaler.fit_transform(data)

# One full PCA fit yields the entire cumulative-variance curve;
# the original refit PCA once per component count for the same numbers.
pca = PCA(n_components=data.shape[1])
pca.fit(data_scaled)
data_pca_variations = list(np.cumsum(pca.explained_variance_ratio_))

plt.plot(range(1, data.shape[1] + 1), data_pca_variations)
for i, txt in enumerate(data_pca_variations):
    plt.text(i + 1, txt, f"{txt:.2f}", fontsize=10, ha='right', va='bottom', color='black')
plt.xlabel("Number of PCA Components")
plt.ylabel("Cumulative Explained Variance")
plt.title("PCA Total Variation vs. Number of Components")
plt.show()
```

```python
# 64% train / 16% validation / 20% test (fixed seeds for reproducibility).
train_data, test_data, train_target, test_target = train_test_split(data, target, test_size=0.2, random_state=82)
train_data, valid_data, train_target, valid_target = train_test_split(train_data, train_target, test_size=0.2, random_state=72)

# Train + validation combined: used by the cross-validated grid searches.
train_valid_data = pd.concat([train_data, valid_data])
train_valid_target = pd.concat([train_target, valid_target])
```

## Models

- Naive Bayes
- Regression Tree
- Linear Regression
- SVM
- Neural Network

We train the models listed above to achieve the best quality possible without upsampling. Later we will use upsampling methods to try to beat this baseline.
```python
from sklearn.metrics import precision_score, mean_squared_error, accuracy_score, balanced_accuracy_score, get_scorer_names
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.pipeline import make_pipeline, Pipeline
from sklearn.experimental import enable_halving_search_cv
from sklearn.model_selection import HalvingGridSearchCV
from sklearn.naive_bayes import GaussianNB
from sklearn.tree import DecisionTreeRegressor
from sklearn.svm import SVR
from sklearn.linear_model import Ridge
from sklearn.neural_network import MLPClassifier, MLPRegressor
from sklearn.utils.class_weight import compute_class_weight


def evaluate_regressor(predict_fn, title):
    """Report MSE, accuracy and balanced accuracy on train+valid and test sets,
    and draw a row-normalized confusion matrix for the test set.

    Quality is predicted as a continuous value, so predictions are rounded to
    the nearest integer class before the classification metrics.
    (Extracted: this block was copy-pasted after every model in the original.)
    """
    train_pred = predict_fn(train_valid_data)
    test_pred = predict_fn(test_data)
    print("mean_squared_error Train: ", mean_squared_error(train_valid_target, train_pred))
    print("mean_squared_error Test: ", mean_squared_error(test_target, test_pred))
    print("")
    print("accuracy Train: ", accuracy_score(train_valid_target, np.round(train_pred)))
    print("accuracy Test: ", accuracy_score(test_target, np.round(test_pred)))
    print("")
    print("balanced accuracy Train: ", balanced_accuracy_score(train_valid_target, np.round(train_pred)))
    print("balanced accuracy Test: ", balanced_accuracy_score(test_target, np.round(test_pred)))
    ConfusionMatrixDisplay.from_predictions(test_target, np.round(test_pred), normalize="true")
    plt.title(title)
```

### Naive Bayes

```python
method = "NB"
pipeline = make_pipeline(StandardScaler(), PCA(), GaussianNB())
param_grid = {"pca__n_components": list(range(2, 13))}
search = HalvingGridSearchCV(pipeline, param_grid, scoring="neg_mean_squared_error").fit(train_valid_data, train_valid_target)
search.best_params_
```

```python
evaluate_regressor(search.best_estimator_.predict, 'Evaluation of test data (Naive Bayes)')
```

### Decision Tree

```python
method = "Tree"
pipeline = Pipeline([("scaler", StandardScaler()),
                     ("rTree", DecisionTreeRegressor(criterion="squared_error"))])
# FIX: the original grid started at 0, but DecisionTreeRegressor requires
# max_depth >= 1 — that candidate could never be fit.
param_grid = {"rTree__max_depth": list(range(1, 20))}
search = HalvingGridSearchCV(pipeline, param_grid,
                             scoring="neg_mean_squared_error",
                             cv=10).fit(train_valid_data, train_valid_target)
search.best_params_
```

```python
evaluate_regressor(search.best_estimator_.predict, 'Evaluation of test data (Dec. tree)')
```

### Linear Regression

```python
method = "LR"
pipeline = Pipeline([("scaler", StandardScaler()), ("pca", PCA()), ("ridge", Ridge())])
param_grid = {"ridge__alpha": list(range(1, 50)),
              "pca__n_components": list(range(2, 13))}
search = HalvingGridSearchCV(pipeline, param_grid,
                             scoring="neg_mean_squared_error",
                             cv=10).fit(train_valid_data, train_valid_target)
search.best_params_
```

```python
evaluate_regressor(search.best_estimator_.predict, 'Evaluation of test data (Line reg.)')
```

### SVM

```python
method = "SVM"
pipeline = Pipeline([("scaler", StandardScaler()), ("svr", SVR())])
# One sub-grid per kernel: degree only applies to poly, gamma not to linear.
param_grid = [
    {"svr__kernel": ["linear"], "svr__C": [0.1, 1, 10],
     "svr__epsilon": [0, 0.1, 1, 10]},
    {"svr__kernel": ["poly"], "svr__C": [0.1, 1, 10],
     "svr__degree": list(range(2, 4)),
     "svr__epsilon": [0, 0.1, 1, 10], "svr__gamma": ["scale", "auto"]},
    {"svr__kernel": ["rbf"], "svr__C": [0.1, 1, 10],
     "svr__epsilon": [0, 0.1, 1, 10], "svr__gamma": ["scale", "auto"]},
    {"svr__kernel": ["sigmoid"], "svr__C": [0.1, 1, 10],
     "svr__epsilon": [0, 0.1, 1, 10], "svr__gamma": ["scale", "auto"]},
]
search = HalvingGridSearchCV(pipeline, param_grid,
                             scoring="neg_mean_squared_error",
                             cv=5).fit(train_valid_data, train_valid_target)
search.best_params_
```

```python
evaluate_regressor(search.best_estimator_.predict, 'Evaluation of test data (SVR)')
```

### Neural Network

```python
import os
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms
from torchvision.transforms import ToTensor
from tqdm.notebook import tqdm
import albumentations as A
from albumentations.pytorch import ToTensorV2
```

```python
device = 'cpu'  # torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
print(f"Using {device} device")
```

```python
config = {
    'batch_size': 64,
    'num_workers': 1,
    'dropout': 0.5,
    'lr': 0.0001,
    'optimizer': torch.optim.Adam,
    "epochs": 500,
    "input_shape": train_data.shape[1],
}
method = "NN"
```

```python
# Fit the scaler on train+valid only so test-set statistics never leak in.
st_scaler = StandardScaler()
st_scaler.fit(train_valid_data)


class WineDataset(Dataset):
    """Tabular dataset: standardized features and float32 targets as tensors.

    NOTE(review): `transform` is stored but never applied — kept only for
    interface compatibility; confirm before relying on it.
    """

    def __init__(self, data, target, transform=ToTensor()):
        self.transform = transform
        self.data = torch.tensor(st_scaler.transform(data).astype(np.float32))
        # Column vector (N, 1) to match the network's single-output shape.
        self.target = torch.tensor(target.values.astype(np.float32)).reshape(-1, 1)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.target[idx]


train_dataset = WineDataset(train_data, train_target)
valid_dataset = WineDataset(valid_data, valid_target)
test_dataset = WineDataset(test_data, test_target)

training_loader = torch.utils.data.DataLoader(train_dataset, batch_size=config["batch_size"], shuffle=True)
validation_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=config["batch_size"], shuffle=False)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=config["batch_size"], shuffle=False)
```

```python
class NeuralNetwork(nn.Module):
    """MLP regressor: input -> 32 -> 128 -> 64 -> 16 -> 8 -> 1.

    FIX: the original Sequential contained a stray `plot` token after the
    third ReLU, which made this cell a syntax error.
    """

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(config["input_shape"], 32), nn.ReLU(),
            nn.Linear(32, 128), nn.ReLU(),
            nn.Linear(128, 64), nn.ReLU(),
            nn.Linear(64, 16), nn.ReLU(),
            nn.Linear(16, 8), nn.ReLU(),
            nn.Linear(8, 1),
        )
        # He (Kaiming) init suits ReLU activations; loop replaces the six
        # hand-indexed init calls of the original (same layers covered).
        for layer in self.linear_relu_stack:
            if isinstance(layer, nn.Linear):
                torch.nn.init.kaiming_normal_(layer.weight, nonlinearity='relu')

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits


model = NeuralNetwork().to(device)
print(model)
```

```python
loss_fn = torch.nn.MSELoss()
optimizer = config["optimizer"](model.parameters(), lr=config["lr"])
```

```python
def loss_bch(model, loss_func, xb, yb, dev, opt=None):
    """One batch: forward + loss; also backward + optimizer step when `opt` is given.

    Returns (loss_value, batch_size) so callers can size-weight the average.
    """
    xb, yb = xb.to(dev), yb.to(dev)
    loss = loss_func(model(xb), yb)
    if opt is not None:
        loss.backward()
        opt.step()
        opt.zero_grad()
    return loss.item(), len(xb)


def train(net, train_dataloader, loss_func, device, optimizer):
    """Run one training epoch; return the size-weighted mean batch loss."""
    net.train()
    loss, size = 0, 0
    for b_idx, (xb, yb) in tqdm(enumerate(train_dataloader), total=len(train_dataloader), leave=False):
        b_loss, b_size = loss_bch(net, loss_func, xb, yb, device, optimizer)
        loss += b_loss * b_size
        size += b_size
    return loss / size


def validate(net, val_dataloader, loss_func, device, optimizer=None):
    """Return the size-weighted mean loss over a loader, without gradients."""
    net.eval()
    with torch.no_grad():
        losses, nums = zip(
            *[loss_bch(net, loss_func, xb, yb, device) for xb, yb in val_dataloader]
        )
    return np.sum(np.multiply(losses, nums)) / np.sum(nums)


def predict(net, data_loader, device):
    """Return flat (predictions, targets) lists for an entire loader."""
    results = []
    targets = []
    net.eval()
    with torch.no_grad():
        for xb, yb in data_loader:
            xb = xb.to(device)
            # FIX: the original called the global `model` here instead of the
            # `net` argument, silently ignoring whichever model was passed in.
            results += map(lambda x: x.item(), list(net(xb)))
            targets += map(lambda x: x.item(), list(yb))
    return results, targets


class EarlyStopper:
    """Stop training when validation loss stops improving; checkpoint the best model."""

    def __init__(self, model, patience=1, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.min_validation_loss = float('inf')
        self.model_path = "best_model.pt"
        self.model = model

    def early_stop(self, validation_loss):
        """Return True once `patience` epochs pass without sufficient improvement.

        Every new best loss resets the counter and saves a checkpoint.
        """
        if validation_loss < self.min_validation_loss:
            self.min_validation_loss = validation_loss
            self.counter = 0
            torch.save(self.model.state_dict(), self.model_path)
        elif validation_loss > (self.min_validation_loss + self.min_delta):
            self.counter += 1
            if self.counter >= self.patience:
                return True
        return False

    def load_model(self):
        """Reload the best checkpoint into a fresh network (in eval mode, on CPU)."""
        model = NeuralNetwork()
        model.load_state_dict(torch.load(self.model_path, weights_only=True))
        model.eval()
        return model


def fit(net, epochs, train_dataloader, val_dataloader, loss, optimizer, device):
    """Train with early stopping; return (best_net, final_train_loss, final_val_loss)."""
    early_stopper = EarlyStopper(model=net, patience=5, min_delta=0.01)
    for epoch in tqdm(range(epochs)):
        train_loss = train(net, train_dataloader, loss, device, optimizer)
        val_loss = validate(net, val_dataloader, loss, device)
        if early_stopper.early_stop(val_loss):
            net = early_stopper.load_model()
            break
        if (epoch % 10 == 0):
            print(f'\nepoch {epoch+1}/{epochs}, loss: {train_loss:.05f},'
                  f'validation loss: {val_loss:.05f}')
    print("Training finished!")
    return net, validate(net, train_dataloader, loss, device), validate(net, val_dataloader, loss, device)
```

```python
model, train_loss, val_loss = fit(model, config["epochs"], training_loader, validation_loader, loss_fn, optimizer, device)
```

```python
train_loss, val_loss
```

```python
tr_pred, tr_target = predict(model, training_loader, device)
val_pred, val_target = predict(model, validation_loader, device)
train_valid_predictions, tr_val_target = tr_pred + val_pred, tr_target + val_target
test_predictions, ts_target = predict(model, test_loader, device)

# FIX: the first two labels both read "mean_squared_error" in the original,
# with no Train/Test distinction.
print("mean_squared_error Train: ", mean_squared_error(tr_val_target, train_valid_predictions))
print("mean_squared_error Test: ", mean_squared_error(ts_target, test_predictions))
print("")
print("accuracy Train: ", accuracy_score(tr_val_target, np.round(train_valid_predictions)))
print("accuracy Test: ", accuracy_score(ts_target, np.round(test_predictions)))
print("")
print("balanced accuracy Train: ", balanced_accuracy_score(tr_val_target, np.round(train_valid_predictions)))
print("balanced accuracy Test: ", balanced_accuracy_score(ts_target, np.round(test_predictions)))

ConfusionMatrixDisplay.from_predictions(ts_target, np.round(test_predictions), normalize="true")
plt.title('Evaluation of test data (NN)')
```

## Interpretability

```python
from sklearn.inspection import permutation_importance

# FIX: the original passed the raw, never-fitted `pipeline` (grid searches fit
# clones), which raises NotFittedError — permutation importance needs the
# fitted best estimator from the last search (the SVR at this point).
r = permutation_importance(search.best_estimator_, train_valid_data, train_valid_target,
                           n_repeats=5, random_state=0)
```

```python
# Features whose mean importance exceeds two standard deviations, best first.
for i in r.importances_mean.argsort()[::-1]:
    if r.importances_mean[i] - 2 * r.importances_std[i] > 0:
        print(f"{train_data.columns[i]:<25}"
              f"{r.importances_mean[i]:.3f}"
              f" +/- {r.importances_std[i]:.3f}")
```