# %% # from datasets import load_from_disk import os import glob os.environ['NCCL_P2P_DISABLE'] = '1' os.environ['NCCL_IB_DISABLE'] = '1' os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3" import torch from torch.utils.data import DataLoader from transformers import ( AutoTokenizer, AutoModelForSequenceClassification, DataCollatorWithPadding, Trainer, EarlyStoppingCallback, TrainingArguments ) import evaluate import numpy as np import pandas as pd # import matplotlib.pyplot as plt from datasets import Dataset, DatasetDict from tqdm import tqdm torch.set_float32_matmul_precision('high') # %% # we need to create the mdm_list # import the full mdm-only file data_path = '../../../data_import/exports/data_mapping_mdm.csv' full_df = pd.read_csv(data_path, skipinitialspace=True) mdm_list = sorted(list((set(full_df['pattern'])))) # %% id2label = {} label2id = {} for idx, val in enumerate(mdm_list): id2label[idx] = val label2id[val] = idx # %% # outputs a list of dictionaries # processes dataframe into lists of dictionaries # each element maps input to output # input: tag_description # output: class label def process_df_to_dict(df, mdm_list): output_list = [] for _, row in df.iterrows(): desc = f"{row['tag_description']}" pattern = row['pattern'] try: index = mdm_list.index(pattern) except ValueError: index = -1 element = { 'text' : f"{desc}", 'label': index, } output_list.append(element) return output_list def create_dataset(fold, mdm_list): data_path = f"../../../data_preprocess/exports/dataset/group_{fold}/test_all.csv" test_df = pd.read_csv(data_path, skipinitialspace=True) # we only use the mdm subset test_df = test_df[test_df['MDM']].reset_index(drop=True) test_dataset = Dataset.from_list(process_df_to_dict(test_df, mdm_list)) return test_dataset # %% # function to perform training for a given fold # def train(fold): fold = 1 test_dataset = create_dataset(fold, mdm_list) # prepare tokenizer checkpoint_directory = f'../checkpoint_fold_{fold}' # Use glob to find matching paths # path is usually checkpoint_fold_1/checkpoint- # we are guaranteed to save only 1 checkpoint from training pattern = 'checkpoint-*' model_checkpoint = glob.glob(os.path.join(checkpoint_directory, pattern))[0] tokenizer = AutoTokenizer.from_pretrained(model_checkpoint, return_tensors="pt", clean_up_tokenization_spaces=True) # Define additional special tokens # additional_special_tokens = ["", "", "", "", "", "", "", "", ""] # Add the additional special tokens to the tokenizer # tokenizer.add_special_tokens({"additional_special_tokens": additional_special_tokens}) # %% # compute max token length max_length = 0 for sample in test_dataset['text']: # Tokenize the sample and get the length input_ids = tokenizer(sample, truncation=False, add_special_tokens=True)["input_ids"] length = len(input_ids) # Update max_length if this sample is longer if length > max_length: max_length = length print(max_length) # %% max_length = 64 # given a dataset entry, run it through the tokenizer def preprocess_function(example): input = example['text'] # text_target sets the corresponding label to inputs # there is no need to create a separate 'labels' model_inputs = tokenizer( input, max_length=max_length, # truncation=True, padding='max_length' ) return model_inputs # map maps function to each "row" in the dataset # aka the data in the immediate nesting datasets = test_dataset.map( preprocess_function, batched=True, num_proc=8, remove_columns="text", ) datasets.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label']) # %% temp # tokenized_datasets['train'].rename_columns() # %% # create data collator data_collator = DataCollatorWithPadding(tokenizer=tokenizer, padding="max_length") # %% # compute metrics # metric = evaluate.load("accuracy") # # # def compute_metrics(eval_preds): # preds, labels = eval_preds # preds = np.argmax(preds, axis=1) # return metric.compute(predictions=preds, references=labels) model = AutoModelForSequenceClassification.from_pretrained( model_checkpoint, num_labels=len(mdm_list), id2label=id2label, label2id=label2id) # important! after extending tokens vocab model.resize_token_embeddings(len(tokenizer)) model = model.eval() device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') model.to(device) pred_labels = [] actual_labels = [] BATCH_SIZE = 64 dataloader = DataLoader(datasets, batch_size=BATCH_SIZE, shuffle=False) for batch in tqdm(dataloader): # Inference in batches input_ids = batch['input_ids'] attention_mask = batch['attention_mask'] # save labels too actual_labels.extend(batch['label']) # Move to GPU if available input_ids = input_ids.to(device) attention_mask = attention_mask.to(device) # Perform inference with torch.no_grad(): logits = model( input_ids, attention_mask).logits predicted_class_ids = logits.argmax(dim=1).to("cpu") pred_labels.extend(predicted_class_ids) pred_labels = [tensor.item() for tensor in pred_labels] # %% from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix y_true = actual_labels y_pred = pred_labels # Compute metrics accuracy = accuracy_score(y_true, y_pred) f1 = f1_score(y_true, y_pred, average='macro') precision = precision_score(y_true, y_pred, average='macro') recall = recall_score(y_true, y_pred, average='macro') # Print the results print(f'Accuracy: {accuracy:.2f}') print(f'F1 Score: {f1:.2f}') print(f'Precision: {precision:.2f}') print(f'Recall: {recall:.2f}') # %%