# Wine Quality Classification

https://www.scirp.org/journal/paperinformation?paperid=107796 about Wine Datasets

https://www.jair.org/index.php/jair/article/view/10302/24590 about SMOTE

```python
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from seaborn import pairplot

import warnings
warnings.filterwarnings('ignore')
```

```python
white_wine = pd.read_csv("winequality-white.csv", sep=";")
red_wine = pd.read_csv("winequality-red.csv", sep=";")

red_wine["is_red"] = True
white_wine["is_red"] = False

wine = pd.concat([white_wine, red_wine]).reset_index(drop=True)
print(len(wine))
wine.head()
```

```python
print(wine[wine['is_red'] == True].shape[0], wine[wine['is_red'] == False].shape[0])
```

```python
colors = {True: "darkred", False: "gold"}
ax = sns.countplot(wine, x="quality", hue="is_red", palette=colors)

# Annotate each bar with its count
for p in ax.patches:
    ax.annotate(f'{int(p.get_height())}',
                (p.get_x() + p.get_width() / 2., p.get_height()),
                ha='center', va='bottom', fontsize=12, color='black')

legend_labels = ['Red Wine', 'White Wine']
legend_colors = [plt.Line2D([0], [0], marker='o', color='w', markersize=10, markerfacecolor=colors[True]),
                 plt.Line2D([0], [0], marker='o', color='w', markersize=10, markerfacecolor=colors[False])]
plt.legend(legend_colors, legend_labels, title="Wine Type", loc="upper right")

plt.title('Count of Each Class in the Dataset')
plt.xlabel('Wine quality class')
plt.ylabel('Number of instances')
```

```python
from sklearn.model_selection import train_test_split

data, target = wine.drop("quality", axis=1), wine["quality"]
```

```python
train_data, test_data, train_target, test_target = train_test_split(data, target, test_size=0.2, random_state=82)
train_data, valid_data, train_target, valid_target = train_test_split(train_data, train_target, test_size=0.2, random_state=72)

train_valid_data = pd.concat([train_data, valid_data])
train_valid_target = pd.concat([train_target, valid_target])
```

## Models

- Naive Bayes
- Regression Tree
- Linear Regression
- SVM
- Neural Network

We train the models listed above to achieve the best quality possible without upsampling. Afterwards we apply upsampling methods and try to beat this baseline. Every model is scored the same way, as in the helper sketched below.
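Each model section reports MSE on the raw predictions plus accuracy and balanced accuracy after rounding to the nearest quality class. A minimal sketch of that shared evaluation, assuming a fitted estimator with a `predict` method (`evaluate_regressor` is a hypothetical name, not part of the original notebook):

```python
import numpy as np
from sklearn.metrics import mean_squared_error, accuracy_score, balanced_accuracy_score

def evaluate_regressor(estimator, X, y):
    """Hypothetical helper bundling the scores printed in each model section."""
    pred = estimator.predict(X)
    return {
        "MSE": mean_squared_error(y, pred),
        "acc": accuracy_score(y, np.round(pred)),
        "bal acc": balanced_accuracy_score(y, np.round(pred)),
    }

# Usage, e.g. after a grid search:
# evaluate_regressor(search.best_estimator_, test_data, test_target)
```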
```python
from sklearn.metrics import precision_score, mean_squared_error, accuracy_score, balanced_accuracy_score, get_scorer_names
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.pipeline import make_pipeline, Pipeline
from sklearn.experimental import enable_halving_search_cv
from sklearn.model_selection import HalvingGridSearchCV
from sklearn.naive_bayes import GaussianNB
from sklearn.tree import DecisionTreeRegressor
from sklearn.svm import SVR
from sklearn.linear_model import Ridge
from sklearn.neural_network import MLPClassifier, MLPRegressor
from sklearn.utils.class_weight import compute_class_weight
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
```

#### Neural Network Init

```python
import os
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms
from torchvision.transforms import ToTensor
from tqdm.notebook import tqdm
import albumentations as A
from albumentations.pytorch import ToTensorV2
```

```python
device = 'cpu'  # torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
print(f"Using {device} device")
```

```python
config = {
    'batch_size': 64,
    'num_workers': 1,
    'dropout': 0.5,
    'lr': 0.0001,
    'optimizer': torch.optim.Adam,
    "epochs": 500,
    "input_shape": train_data.shape[1]
}
method = "NN"
```

```python
st_scaler = StandardScaler()
st_scaler.fit(train_valid_data)


class WineDataset(Dataset):
    """Tabular dataset: standardizes the features with the scaler fitted above."""

    def __init__(self, data, target, transform=ToTensor()):
        self.transform = transform
        self.data = torch.tensor(st_scaler.transform(data).astype(np.float32))
        self.target = torch.tensor(target.values.astype(np.float32)).reshape(-1, 1)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.target[idx]


train_dataset = WineDataset(train_data, train_target)
valid_dataset = WineDataset(valid_data, valid_target)
test_dataset = WineDataset(test_data, test_target)

training_loader = torch.utils.data.DataLoader(train_dataset, batch_size=config["batch_size"], shuffle=True)
validation_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=config["batch_size"], shuffle=False)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=config["batch_size"], shuffle=False)
```

```python
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(config["input_shape"], 32),
            nn.ReLU(),
            nn.Linear(32, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 16),
            nn.ReLU(),
            nn.Linear(16, 8),
            nn.ReLU(),
            nn.Linear(8, 1)
        )
        # He (Kaiming) initialization for every linear layer, since each feeds a ReLU
        for layer in self.linear_relu_stack:
            if isinstance(layer, nn.Linear):
                torch.nn.init.kaiming_normal_(layer.weight, nonlinearity='relu')

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits


model = NeuralNetwork().to(device)
print(model)
```

```python
loss_fn = torch.nn.MSELoss()
optimizer = config["optimizer"](model.parameters(), lr=config["lr"])
```
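Quality is an ordinal label, but the network is trained as a regressor with MSE; class predictions are recovered later by rounding the continuous output to the nearest integer. A tiny illustration of that mapping (the values are made up):

```python
import numpy as np

raw = np.array([5.4, 6.6, 4.49])  # hypothetical regression outputs
print(np.round(raw))              # -> [5. 7. 4.], the classes scored by accuracy
# Caveat: np.round rounds exact halves to the nearest even integer,
# so np.round(6.5) == 6.0, not 7.0.
```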
```python
def loss_bch(model, loss_func, xb, yb, dev, opt=None):
    xb, yb = xb.to(dev), yb.to(dev)
    loss = loss_func(model(xb), yb)

    if opt is not None:
        loss.backward()
        opt.step()
        opt.zero_grad()

    return loss.item(), len(xb)


def train(net, train_dataloader, loss_func, device, optimizer):
    net.train()
    loss, size = 0, 0
    for b_idx, (xb, yb) in tqdm(enumerate(train_dataloader), total=len(train_dataloader), leave=False):
        b_loss, b_size = loss_bch(net, loss_func, xb, yb, device, optimizer)
        loss += b_loss * b_size
        size += b_size
    return loss / size


def validate(net, val_dataloader, loss_func, device, optimizer=None):
    net.eval()
    with torch.no_grad():
        losses, nums = zip(
            *[loss_bch(net, loss_func, xb, yb, device) for xb, yb in val_dataloader]
        )
    return np.sum(np.multiply(losses, nums)) / np.sum(nums)


def predict(net, data_loader, device):
    results = []
    targets = []
    net.eval()
    with torch.no_grad():
        for xb, yb in data_loader:
            xb = xb.to(device)
            results += map(lambda x: x.item(), list(net(xb)))  # use the passed-in net, not the global model
            targets += map(lambda x: x.item(), list(yb))
    return results, targets


class EarlyStopper:
    def __init__(self, model, patience=1, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.min_validation_loss = float('inf')
        self.model_path = "best_model.pt"
        self.model = model

    def early_stop(self, validation_loss):
        if validation_loss < self.min_validation_loss:
            self.min_validation_loss = validation_loss
            self.counter = 0
            torch.save(self.model.state_dict(), self.model_path)
        elif validation_loss > (self.min_validation_loss + self.min_delta):
            self.counter += 1
            if self.counter >= self.patience:
                return True
        return False

    def load_model(self):
        model = NeuralNetwork()
        model.load_state_dict(torch.load(self.model_path, weights_only=True))
        model.eval()
        return model


def fit(net, epochs, train_dataloader, val_dataloader, loss, optimizer, device):
    early_stopper = EarlyStopper(model=net, patience=20, min_delta=0.05)
    for epoch in tqdm(range(epochs)):
        train_loss = train(net, train_dataloader, loss, device, optimizer)
        val_loss = validate(net, val_dataloader, loss, device)
        pred, target = predict(net, val_dataloader, device)
        val_accuracy = accuracy_score(target, np.round(pred))

        if early_stopper.early_stop(val_loss):
            net = early_stopper.load_model()
            break

        if (epoch % 10 == 0):
            print(f'\nepoch {epoch+1}/{epochs}, loss: {train_loss:.05f}, '
                  f'validation loss: {val_loss:.05f}, '
                  f'validation accuracy: {val_accuracy:.05f}')

    print("Training finished!")
    return net, validate(net, train_dataloader, loss, device), validate(net, val_dataloader, loss, device)
```

## Upsampling

```python
results = pd.DataFrame(columns=["name", "MSE", "acc", "bal acc"])
```

### Create datasets

```python
import imblearn
```

```python
train_valid_target.value_counts()
```

```python
train_valid_data_random, train_valid_target_random = imblearn.over_sampling.RandomOverSampler().fit_resample(train_valid_data, train_valid_target)
train_data_random, train_target_random = imblearn.over_sampling.RandomOverSampler().fit_resample(train_data, train_target)
valid_data_random, valid_target_random = imblearn.over_sampling.RandomOverSampler().fit_resample(valid_data, valid_target)
# train_data_random, valid_data_random, train_target_random, valid_target_random = train_test_split(train_valid_data_random, train_valid_target_random, test_size=0.2, random_state=72)
train_valid_target_random.value_counts()
```

```python
train_valid_data_smote, train_valid_target_smote = imblearn.over_sampling.SMOTE(k_neighbors=4).fit_resample(train_valid_data, train_valid_target)
train_data_smote, train_target_smote = imblearn.over_sampling.SMOTE(k_neighbors=2).fit_resample(train_data, train_target)
valid_data_smote, valid_target_smote = imblearn.over_sampling.SMOTE(k_neighbors=1).fit_resample(valid_data, valid_target)
# train_data_smote, valid_data_smote, train_target_smote, valid_target_smote = train_test_split(train_valid_data_smote, train_valid_target_smote, test_size=0.2, random_state=72)
train_valid_target_smote.value_counts()
```
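SMOTE (see the JAIR paper linked at the top) creates each synthetic minority sample by interpolating between a real sample and one of its `k_neighbors` nearest same-class neighbors, which is why the smaller splits above use smaller `k_neighbors` values. A minimal sketch of that core step on made-up feature vectors, not the library implementation:

```python
import numpy as np

rng = np.random.default_rng(0)

def smote_one(x, neighbor):
    """Synthesize one point on the segment between a sample and a same-class neighbor."""
    lam = rng.random()  # uniform in [0, 1)
    return x + lam * (neighbor - x)

x = np.array([7.0, 0.27, 0.36])         # hypothetical minority-class sample
neighbor = np.array([6.3, 0.30, 0.34])  # one of its k nearest neighbors
print(smote_one(x, neighbor))           # lies between the two originals
```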
```python
train_valid_data_KMsmote, train_valid_target_KMsmote = imblearn.over_sampling.KMeansSMOTE(cluster_balance_threshold=0.001, random_state=82).fit_resample(train_valid_data, train_valid_target)
train_data_KMsmote, train_target_KMsmote = imblearn.over_sampling.KMeansSMOTE(k_neighbors=1, cluster_balance_threshold=0.001, random_state=182).fit_resample(train_data, train_target)
valid_data_KMsmote, valid_target_KMsmote = imblearn.over_sampling.KMeansSMOTE(k_neighbors=1, cluster_balance_threshold=0.001, random_state=22).fit_resample(valid_data, valid_target)
# train_data_KMsmote, valid_data_KMsmote, train_target_KMsmote, valid_target_KMsmote = train_test_split(train_valid_data_KMsmote, train_valid_target_KMsmote, test_size=0.2)
train_valid_target_KMsmote.value_counts()
```

```python
train_valid_data["quality"] = train_valid_target
train_valid_data.to_csv("train_valid_to_chat.csv", index=False)
train_valid_data = train_valid_data.drop(["quality"], axis=1)
```

ChatGPT prompt:

> For the uploaded dataset, do upsampling so that the number of samples is balanced for the quality attribute. The dataset contains physicochemical analysis of northern Portuguese wines. For upsampling interpret each sample as row and predict the data only based on your opinion. For upsampling use deep AI generative approach. Provide me upsampled dataset.

Link to chat: https://chatgpt.com/share/68053f08-2e4c-8011-a21c-1b7f9f6cacd5

```python
train_valid_GPT = pd.read_csv("train_valid_chatGPT.csv")
train_valid_data_GPT, train_valid_target_GPT = train_valid_GPT.drop(["quality"], axis=1), train_valid_GPT["quality"]
# The generated CSV stores is_red inconsistently (strings and floats), so coerce it back to bool
train_valid_data_GPT["is_red"] = train_valid_data_GPT["is_red"].apply(lambda x: x != 'False' and (x == 'True' or float(x) >= 0.5))
train_data_GPT, valid_data_GPT, train_target_GPT, valid_target_GPT = train_test_split(train_valid_data_GPT, train_valid_target_GPT, test_size=0.2, random_state=72)
train_valid_target_GPT.value_counts()
```

### NB

```python
method = "NB"
for name, data, target in [("no_upsampling", train_valid_data, train_valid_target),
                           ("random", train_valid_data_random, train_valid_target_random),
                           ("SMOTE", train_valid_data_smote, train_valid_target_smote),
                           ("KMsmote", train_valid_data_KMsmote, train_valid_target_KMsmote),
                           ("GPT", train_valid_data_GPT, train_valid_target_GPT)]:
    pipeline = make_pipeline(StandardScaler(), PCA(), GaussianNB())
    param_grid = {"pca__n_components": list(range(2, 13))}
    search = HalvingGridSearchCV(pipeline, param_grid, scoring="neg_mean_squared_error", cv=5).fit(data, target)

    print(name, ":", search.best_params_)
    print("mean_squared_error Train: ", mean_squared_error(train_valid_target, search.best_estimator_.predict(train_valid_data)))
    print("mean_squared_error Test: ", mean_squared_error(test_target, search.best_estimator_.predict(test_data)))
    print("accuracy Train: ", accuracy_score(train_valid_target, np.round(search.best_estimator_.predict(train_valid_data))))
    print("accuracy Test: ", accuracy_score(test_target, np.round(search.best_estimator_.predict(test_data))))
    print("balanced accuracy Train: ", balanced_accuracy_score(train_valid_target, np.round(search.best_estimator_.predict(train_valid_data))))
    print("balanced accuracy Test: ", balanced_accuracy_score(test_target, np.round(search.best_estimator_.predict(test_data))))
    print("")

    results = pd.concat([results, pd.DataFrame({"name": [name + " " + method],
                                                "MSE": [mean_squared_error(test_target, search.best_estimator_.predict(test_data))],
                                                "acc": [accuracy_score(test_target, np.round(search.best_estimator_.predict(test_data)))],
                                                "bal acc": [balanced_accuracy_score(test_target, np.round(search.best_estimator_.predict(test_data)))]})])
```
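HalvingGridSearchCV starts with all candidates on a small sample of the data and, at each iteration, keeps only the best-scoring fraction while increasing the training budget. To see how a fitted search pruned its candidates, its successive-halving attributes expose the schedule (a sketch assuming `search` from the loop above):

```python
print(search.n_iterations_)  # number of halving iterations actually run
print(search.n_candidates_)  # candidates evaluated at each iteration
print(search.n_resources_)   # training samples allotted at each iteration
print(search.best_score_)    # best cross-validated score (here negated MSE)
```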
### Tree

```python
method = "Tree"
for name, data, target in [("no_upsampling", train_valid_data, train_valid_target),
                           ("random", train_valid_data_random, train_valid_target_random),
                           ("SMOTE", train_valid_data_smote, train_valid_target_smote),
                           ("KMsmote", train_valid_data_KMsmote, train_valid_target_KMsmote),
                           ("GPT", train_valid_data_GPT, train_valid_target_GPT)]:
    pipeline = Pipeline([("scaler", StandardScaler()), ("rTree", DecisionTreeRegressor(criterion="squared_error"))])
    param_grid = {"rTree__max_depth": list(range(1, 20))}
    search = HalvingGridSearchCV(pipeline, param_grid, scoring="neg_mean_squared_error", cv=10, random_state=0).fit(data, target)

    print(name, ":", search.best_params_)
    print("mean_squared_error Train: ", mean_squared_error(train_valid_target, search.best_estimator_.predict(train_valid_data)))
    print("mean_squared_error Test: ", mean_squared_error(test_target, search.best_estimator_.predict(test_data)))
    print("accuracy Train: ", accuracy_score(train_valid_target, np.round(search.best_estimator_.predict(train_valid_data))))
    print("accuracy Test: ", accuracy_score(test_target, np.round(search.best_estimator_.predict(test_data))))
    print("balanced accuracy Train: ", balanced_accuracy_score(train_valid_target, np.round(search.best_estimator_.predict(train_valid_data))))
    print("balanced accuracy Test: ", balanced_accuracy_score(test_target, np.round(search.best_estimator_.predict(test_data))))
    print("")

    results = pd.concat([results, pd.DataFrame({"name": [name + " " + method],
                                                "MSE": [mean_squared_error(test_target, search.best_estimator_.predict(test_data))],
                                                "acc": [accuracy_score(test_target, np.round(search.best_estimator_.predict(test_data)))],
                                                "bal acc": [balanced_accuracy_score(test_target, np.round(search.best_estimator_.predict(test_data)))]})])
```

### LR

```python
method = "LR"
for name, data, target in [("no_upsampling", train_valid_data, train_valid_target),
                           ("random", train_valid_data_random, train_valid_target_random),
                           ("SMOTE", train_valid_data_smote, train_valid_target_smote),
                           ("KMsmote", train_valid_data_KMsmote, train_valid_target_KMsmote),
                           ("GPT", train_valid_data_GPT, train_valid_target_GPT)]:
    pipeline = Pipeline([("scaler", StandardScaler()), ("pca", PCA()), ("ridge", Ridge())])
    param_grid = {"ridge__alpha": list(range(1, 50)),
                  "pca__n_components": list(range(2, 13))}
    search = HalvingGridSearchCV(pipeline, param_grid, scoring="neg_mean_squared_error", cv=5).fit(data, target)

    print(search.best_params_)
    print("mean_squared_error Train: ", mean_squared_error(train_valid_target, search.best_estimator_.predict(train_valid_data)))
    print("mean_squared_error Test: ", mean_squared_error(test_target, search.best_estimator_.predict(test_data)))
    print("accuracy Train: ", accuracy_score(train_valid_target, np.round(search.best_estimator_.predict(train_valid_data))))
    print("accuracy Test: ", accuracy_score(test_target, np.round(search.best_estimator_.predict(test_data))))
    print("balanced accuracy Train: ", balanced_accuracy_score(train_valid_target, np.round(search.best_estimator_.predict(train_valid_data))))
    print("balanced accuracy Test: ", balanced_accuracy_score(test_target, np.round(search.best_estimator_.predict(test_data))))
    print("")

    results = pd.concat([results, pd.DataFrame({"name": [name + " " + method],
                                                "MSE": [mean_squared_error(test_target, search.best_estimator_.predict(test_data))],
                                                "acc": [accuracy_score(test_target, np.round(search.best_estimator_.predict(test_data)))],
                                                "bal acc": [balanced_accuracy_score(test_target, np.round(search.best_estimator_.predict(test_data)))]})])
```
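The `ridge__alpha` grid above tunes the strength of Ridge's L2 penalty. Without an intercept, the ridge weights have the closed form w = (XᵀX + αI)⁻¹Xᵀy, which a quick numpy check against scikit-learn confirms (synthetic data, purely illustrative):

```python
import numpy as np
from sklearn.linear_model import Ridge

rng = np.random.default_rng(0)
X = rng.normal(size=(100, 5))
y = rng.normal(size=100)
alpha = 10.0

# Closed-form ridge solution: w = (X^T X + alpha * I)^{-1} X^T y
w = np.linalg.solve(X.T @ X + alpha * np.eye(X.shape[1]), X.T @ y)

sk = Ridge(alpha=alpha, fit_intercept=False).fit(X, y)
print(np.allclose(w, sk.coef_))  # True: both solve the same penalized least squares
```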
### SVM

```python
method = "SVM"

param_grid = [
    {
        "svr__kernel": ["linear"],
        "svr__C": [0.1, 1, 10],
        "svr__epsilon": [0, 0.1, 1, 10],
    },
    {
        "svr__kernel": ["poly"],
        "svr__C": [0.1, 1, 10],
        "svr__degree": list(range(2, 4)),
        "svr__epsilon": [0, 0.1, 1, 10],
        "svr__gamma": ["scale", "auto"],
    },
    {
        "svr__kernel": ["rbf"],
        "svr__C": [0.1, 1, 10],
        "svr__epsilon": [0, 0.1, 1, 10],
        "svr__gamma": ["scale", "auto"],
    },
    {
        "svr__kernel": ["sigmoid"],
        "svr__C": [0.1, 1, 10],
        "svr__epsilon": [0, 0.1, 1, 10],
        "svr__gamma": ["scale", "auto"],
    },
]

for name, data, target in [("no_upsampling", train_valid_data, train_valid_target),
                           ("random", train_valid_data_random, train_valid_target_random),
                           ("SMOTE", train_valid_data_smote, train_valid_target_smote),
                           ("KMsmote", train_valid_data_KMsmote, train_valid_target_KMsmote),
                           ("GPT", train_valid_data_GPT, train_valid_target_GPT)]:
    pipeline = Pipeline([("scaler", StandardScaler()), ("svr", SVR())])
    search = HalvingGridSearchCV(pipeline, param_grid, scoring="neg_mean_squared_error").fit(data, target)

    print(name, ":", search.best_params_)
    print("mean_squared_error Train: ", mean_squared_error(train_valid_target, search.best_estimator_.predict(train_valid_data)))
    print("mean_squared_error Test: ", mean_squared_error(test_target, search.best_estimator_.predict(test_data)))
    print("accuracy Train: ", accuracy_score(train_valid_target, np.round(search.best_estimator_.predict(train_valid_data))))
    print("accuracy Test: ", accuracy_score(test_target, np.round(search.best_estimator_.predict(test_data))))
    print("balanced accuracy Train: ", balanced_accuracy_score(train_valid_target, np.round(search.best_estimator_.predict(train_valid_data))))
    print("balanced accuracy Test: ", balanced_accuracy_score(test_target, np.round(search.best_estimator_.predict(test_data))))
    print("")

    results = pd.concat([results, pd.DataFrame({"name": [name + " " + method],
                                                "MSE": [mean_squared_error(test_target, search.best_estimator_.predict(test_data))],
                                                "acc": [accuracy_score(test_target, np.round(search.best_estimator_.predict(test_data)))],
                                                "bal acc": [balanced_accuracy_score(test_target, np.round(search.best_estimator_.predict(test_data)))]})])
```
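The `svr__epsilon` values searched above set the width of SVR's epsilon-insensitive tube: residuals smaller than epsilon incur no loss at all, so larger values give flatter, sparser models. A one-function numpy illustration (inputs are made up):

```python
import numpy as np

def eps_insensitive(y_true, y_pred, epsilon):
    """SVR's epsilon-insensitive loss: errors inside the tube are free."""
    return np.maximum(0.0, np.abs(y_true - y_pred) - epsilon)

print(eps_insensitive(np.array([5.0, 6.0]), np.array([5.05, 7.0]), epsilon=0.1))
# -> [0.   0.9]: the first prediction sits inside the tube, the second does not
```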
### NN

```python
method = "NN"
for name, tr_data, tr_target, vl_data, vl_target in [
        ("no_upsampling", train_data, train_target, valid_data, valid_target),
        ("random", train_data_random, train_target_random, valid_data, valid_target),
        ("SMOTE", train_data_smote, train_target_smote, valid_data, valid_target),
        ("KMsmote", train_data_KMsmote, train_target_KMsmote, valid_data, valid_target),
        ("GPT", train_data_GPT, train_target_GPT, valid_data, valid_target)]:
    train_dataset = WineDataset(tr_data, tr_target)
    valid_dataset = WineDataset(vl_data, vl_target)
    training_loader = torch.utils.data.DataLoader(train_dataset, batch_size=config["batch_size"], shuffle=True)
    validation_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=config["batch_size"], shuffle=False)

    model = NeuralNetwork().to(device)
    loss_fn = torch.nn.MSELoss()
    optimizer = config["optimizer"](model.parameters(), lr=config["lr"])

    model, train_loss, val_loss = fit(model, config["epochs"], training_loader, validation_loader, loss_fn, optimizer, device)
    print(name, ":", train_loss, val_loss)

    tr_pred, tr_target = predict(model, training_loader, device)
    val_pred, val_target = predict(model, validation_loader, device)
    train_valid_predictions, tr_val_target = tr_pred + val_pred, tr_target + val_target
    test_predictions, ts_target = predict(model, test_loader, device)

    print("mean_squared_error Train: ", mean_squared_error(tr_val_target, train_valid_predictions))
    print("mean_squared_error Test: ", mean_squared_error(ts_target, test_predictions))
    print("accuracy Train: ", accuracy_score(tr_val_target, np.round(train_valid_predictions)))
    print("accuracy Test: ", accuracy_score(ts_target, np.round(test_predictions)))
    print("balanced accuracy Train: ", balanced_accuracy_score(tr_val_target, np.round(train_valid_predictions)))
    print("balanced accuracy Test: ", balanced_accuracy_score(ts_target, np.round(test_predictions)))
    print("")

    results = pd.concat([results, pd.DataFrame({"name": [name + " " + method],
                                                "MSE": [mean_squared_error(test_target, test_predictions)],
                                                "acc": [accuracy_score(test_target, np.round(test_predictions))],
                                                "bal acc": [balanced_accuracy_score(test_target, np.round(test_predictions))]})])
```

### Results

```python
results
```

```python
# results = results[~results.upsampling.isna()]
# results = results.drop(["name"], axis=1)
```

```python
# results["n"] = results["name"]
# results["type"] = results.apply(lambda x: x.n if str(x.n) != "nan" else x.type, axis=1)
# results["upsampling"] = results.apply(lambda x: x.n if str(x.n) != "nan" else x.upsampling, axis=1)
# results = results.drop(["name"], axis=1)
# results = results.drop(["n"], axis=1)
# results.head()
```

```python
# results["type"] = results["name"].apply(lambda x: x.split()[-1])
# results["upsampling"] = results["name"].apply(lambda x: x.split()[0])
# results = results.reset_index(drop=True)
# results = results.drop(["name"], axis=1)
# results
```

```python
results.to_csv("upsampling_whole_sep.csv")
```
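The commented-out cells above are earlier attempts at splitting the combined `name` column into separate model and upsampling columns. A working version of that idea could look like the following sketch (the `tidy` name and output filename are my own choices, not from the notebook):

```python
tidy = results.reset_index(drop=True).copy()
tidy["upsampling"] = tidy["name"].str.split().str[0]  # e.g. "SMOTE"
tidy["model"] = tidy["name"].str.split().str[-1]      # e.g. "NB"
tidy = tidy.drop(columns=["name"])
tidy.to_csv("upsampling_whole_sep_tidy.csv", index=False)
```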