# %%
import pandas as pd
import numpy as np
from typing import List
from tqdm import tqdm
from utils import Retriever, cosine_similarity_chunked
import glob
import os

# %%
class Embedder():
    input_df: pd.DataFrame
    fold: int

    def __init__(self, input_df):
        self.input_df = input_df

    def make_embedding(self, checkpoint_path):
        def generate_input_list(df):
            input_list = []
            for _, row in df.iterrows():
                # name = f"{row['tag_name']}"
                desc = f"{row['tag_description']}"
                # element = f"{name}{desc}"
                element = f"{desc}"
                input_list.append(element)
            return input_list

        # prepare reference embeddings
        train_data = list(generate_input_list(self.input_df))
        retriever_train = Retriever(train_data, checkpoint_path)
        retriever_train.make_mean_embedding(batch_size=64)
        return retriever_train.embeddings.to('cpu')

# %%
# input data
fold = 2
data_path = f"../../data_preprocess/exports/dataset/group_{fold}/test_all.csv"
test_df = pd.read_csv(data_path, skipinitialspace=True)
ships_list = list(set(test_df['ships_idx']))

# %%
data_path = '../../data_preprocess/exports/preprocessed_data.csv'
full_df = pd.read_csv(data_path, skipinitialspace=True)
train_df = full_df[~full_df['ships_idx'].isin(ships_list)]

# %%
checkpoint_directory = "../../train/baseline"
directory = os.path.join(checkpoint_directory, f'checkpoint_fold_{fold}')
# Use glob to find matching paths
# path is usually checkpoint_fold_1/checkpoint-
# we are guaranteed to save only 1 checkpoint from training
pattern = 'checkpoint-*'
checkpoint_path = glob.glob(os.path.join(directory, pattern))[0]

train_embedder = Embedder(input_df=train_df)
train_embeds = train_embedder.make_embedding(checkpoint_path)

# %%
train_embeds.shape

# %%
# now we need to generate the class labels
data_path = '../../data_import/exports/data_mapping_mdm.csv'
full_df = pd.read_csv(data_path, skipinitialspace=True)
mdm_list = sorted(list(set(full_df['pattern'])))

# %%
# based on mdm_list, assign an integer class label to each row of the dataframe;
# patterns not found in mdm_list are mapped to class 0 (the "out" class)
def generate_labels(df, mdm_list):
    label_list = []
    for _, row in df.iterrows():
        pattern = row['pattern']
        try:
            index = mdm_list.index(pattern)
            label_list.append(index + 1)
        except ValueError:
            label_list.append(0)
    return label_list

# %%
label_list = generate_labels(train_df, mdm_list)

# %%
from collections import Counter
frequency = Counter(label_list)
frequency

####################################################
# %%
# we can start classifying

# %%
import torch
import torch.nn as nn
import torch.optim as optim

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define the neural network with non-linearity
class NeuralNet(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(NeuralNet, self).__init__()
        self.fc1 = nn.Linear(input_dim, 512)   # First layer (input to hidden)
        self.relu = nn.ReLU()                  # Non-linearity
        self.fc2 = nn.Linear(512, 256)         # Second hidden layer
        self.fc3 = nn.Linear(256, output_dim)  # Output layer

    def forward(self, x):
        out = self.fc1(x)     # Input to hidden
        out = self.relu(out)  # Apply non-linearity
        out = self.fc2(out)   # Hidden to hidden
        out = self.relu(out)
        out = self.fc3(out)   # Hidden to output logits
        return out

# Example usage
input_dim = 512   # Example input dimension (adjust based on your mean embedding size)
output_dim = 203  # 202 classes + 1 "out" class (label 0)
model = NeuralNet(input_dim, output_dim)
model = torch.compile(model)
model = model.to(device)
torch.set_float32_matmul_precision('high')
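
# %%
# Optional sanity check (not part of the original pipeline): run one dummy
# batch through the freshly built classifier to confirm the output shape is
# (batch_size, output_dim) before wiring up the real embeddings.
# The batch size of 4 here is arbitrary.
with torch.no_grad():
    dummy_batch = torch.randn(4, input_dim, device=device)
    print(model(dummy_batch).shape)  # expected: torch.Size([4, 203])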

# %%
from torch.utils.data import DataLoader, TensorDataset

# Example mean embeddings and labels (replace these with your actual data)
# mean_embeddings = torch.randn(1000, embedding_dim)  # 1000 samples of embedding_dim size
mean_embeddings = train_embeds
# labels = torch.randint(0, 2, (1000,))  # Random binary labels (0 for OOD, 1 for ID)
train_labels = generate_labels(train_df, mdm_list)
labels = torch.tensor(train_labels)

# Create a dataset and DataLoader
dataset = TensorDataset(mean_embeddings, labels)
dataloader = DataLoader(dataset, batch_size=256, shuffle=True)

# %%
# Define loss function and optimizer
# criterion = nn.BCELoss()  # Binary cross entropy loss
# criterion = nn.BCEWithLogitsLoss()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

num_epochs = 200  # Adjust as needed

# Define the scheduler
# The lambda function for linear decay should return the multiplier for the
# learning rate (starts at 1.0 and goes to 0)
def linear_decay(epoch):
    return 1 - epoch / num_epochs

scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=linear_decay)

# Training loop
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, targets in dataloader:
        # Forward pass
        inputs = inputs.to(device)
        targets = targets.to(device)
        outputs = model(inputs)
        # loss = criterion(outputs.squeeze(), targets.float().squeeze())  # Ensure the target is float
        loss = criterion(outputs, targets)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    scheduler.step()
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss / len(dataloader)}")

# %%
data_path = f"../../data_preprocess/exports/dataset/group_{fold}/test_all.csv"
test_df = pd.read_csv(data_path, skipinitialspace=True)

checkpoint_directory = "../../train/baseline"
directory = os.path.join(checkpoint_directory, f'checkpoint_fold_{fold}')
# Use glob to find matching paths
# path is usually checkpoint_fold_1/checkpoint-
# we are guaranteed to save only 1 checkpoint from training
pattern = 'checkpoint-*'
checkpoint_path = glob.glob(os.path.join(directory, pattern))[0]

test_embedder = Embedder(input_df=test_df)
test_embeds = test_embedder.make_embedding(checkpoint_path)
test_labels = generate_labels(test_df, mdm_list)

# %%
mean_embeddings = test_embeds
labels = torch.tensor(test_labels)

dataset = TensorDataset(mean_embeddings, labels)
dataloader = DataLoader(dataset, batch_size=64, shuffle=False)

model.eval()
output_classes = []
output_probs = []
for inputs, _ in dataloader:
    with torch.no_grad():
        inputs = inputs.to(device)
        logits = model(inputs)
        probabilities = torch.softmax(logits, dim=1)
        # predicted_classes = torch.argmax(probabilities, dim=1)
        max_probabilities, predicted_classes = torch.max(probabilities, dim=1)
        output_classes.extend(predicted_classes.to('cpu').numpy())
        output_probs.extend(max_probabilities.to('cpu').numpy())

# %%
# evaluation
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix

y_true = test_labels
y_pred = output_classes

# Compute metrics
accuracy = accuracy_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred, average='macro')
precision = precision_score(y_true, y_pred, average='macro')
recall = recall_score(y_true, y_pred, average='macro')

# Print the results
print(f'Accuracy: {accuracy:.2f}')
print(f'F1 Score: {f1:.2f}')
print(f'Precision: {precision:.2f}')
print(f'Recall: {recall:.2f}')

# %%
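# Optional follow-up (a sketch, not part of the original evaluation):
# confusion_matrix is imported above but unused, and the max softmax
# probabilities in output_probs are collected but never inspected.
# Collapsing the 203-way predictions into a binary view -- "in MDM"
# (label > 0) vs "not in MDM" (label == 0) -- gives a quick read on how
# the classifier handles the "out" class, alongside the macro metrics above.
y_true_binary = [int(label > 0) for label in y_true]
y_pred_binary = [int(label > 0) for label in y_pred]
print(confusion_matrix(y_true_binary, y_pred_binary))

# %%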