domain_mapping/biomedical_train/ncbi/prediction/predict.py

237 lines
6.1 KiB
Python

# %%
# from datasets import load_from_disk
import os
import glob
os.environ['NCCL_P2P_DISABLE'] = '1'
os.environ['NCCL_IB_DISABLE'] = '1'
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"
import re
import torch
from torch.utils.data import DataLoader
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
DataCollatorWithPadding,
)
import evaluate
import numpy as np
import pandas as pd
# import matplotlib.pyplot as plt
from datasets import Dataset, DatasetDict
from tqdm import tqdm
torch.set_float32_matmul_precision('high')
BATCH_SIZE = 256
# %%
# construct the target id list
data_path = '../../../biomedical_data_import/bc2gm_train.csv'
train_df = pd.read_csv(data_path, skipinitialspace=True)
entity_ids = train_df['entity_id'].to_list()
target_id_list = sorted(list(set(entity_ids)))
# target_id_list = [id for id in target_id_list]
# %%
id2label = {}
label2id = {}
for idx, val in enumerate(target_id_list):
id2label[idx] = val
label2id[val] = idx
# introduce pre-processing functions
def preprocess_text(text):
# 1. Make all uppercase
text = text.lower()
# Substitute digits with '#'
# text = re.sub(r'\d+', '#', text)
# standardize spacing
text = re.sub(r'\s+', ' ', text).strip()
return text
# outputs a list of dictionaries
# processes dataframe into lists of dictionaries
# each element maps input to output
# input: tag_description
# output: class label
def process_df_to_dict(df):
output_list = []
for _, row in df.iterrows():
desc = row['mention']
desc = preprocess_text(desc)
row_id = row['entity_id']
element = {
'text' : desc,
'labels': label2id[row_id], # ensure labels starts from 0
}
output_list.append(element)
return output_list
def create_dataset():
# train
data_path = '../../../biomedical_data_import/bc2gm_test.csv'
test_df = pd.read_csv(data_path, skipinitialspace=True)
combined_data = DatasetDict({
'test': Dataset.from_list(process_df_to_dict(test_df)),
})
return combined_data
# %%
def test():
test_dataset = create_dataset()
# prepare tokenizer
checkpoint_directory = f'../checkpoint'
# Use glob to find matching paths
# path is usually checkpoint_fold_1/checkpoint-<step number>
# we are guaranteed to save only 1 checkpoint from training
pattern = 'checkpoint-*'
model_checkpoint = glob.glob(os.path.join(checkpoint_directory, pattern))[0]
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint, return_tensors="pt", clean_up_tokenization_spaces=True)
# %%
# given a dataset entry, run it through the tokenizer
def preprocess_function(example):
input = example['text']
# text_target sets the corresponding label to inputs
# there is no need to create a separate 'labels'
model_inputs = tokenizer(
input,
truncation=True,
)
return model_inputs
# map maps function to each "row" in the dataset
# aka the data in the immediate nesting
datasets = test_dataset.map(
preprocess_function,
batched=True,
num_proc=8,
remove_columns="text",
)
datasets.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])
# print datasets['test'] columns
column_info = datasets['test'].features
for column, dtype in column_info.items():
print(f"Column: {column}, Type: {dtype}")
model = AutoModelForSequenceClassification.from_pretrained(
model_checkpoint,
num_labels=len(target_id_list),
id2label=id2label,
label2id=label2id)
# important! after extending tokens vocab
model.resize_token_embeddings(len(tokenizer))
model = model.eval()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
pred_labels = []
actual_labels = []
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
dataloader = DataLoader(
datasets['test'],
batch_size=BATCH_SIZE,
shuffle=False,
collate_fn=data_collator)
for batch in tqdm(dataloader):
# Inference in batches
input_ids = batch['input_ids']
attention_mask = batch['attention_mask']
# save labels too
actual_labels.extend(batch['labels'])
# Move to GPU if available
input_ids = input_ids.to(device)
attention_mask = attention_mask.to(device)
# Perform inference
with torch.no_grad():
logits = model(
input_ids,
attention_mask).logits
predicted_class_ids = logits.argmax(dim=1).to("cpu")
pred_labels.extend(predicted_class_ids)
pred_labels = [tensor.item() for tensor in pred_labels]
# %%
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
y_true = actual_labels
y_pred = pred_labels
# Compute metrics
accuracy = accuracy_score(y_true, y_pred)
average_parameter = 'weighted'
zero_division_parameter = 0
f1 = f1_score(y_true, y_pred, average=average_parameter, zero_division=zero_division_parameter)
precision = precision_score(y_true, y_pred, average=average_parameter, zero_division=zero_division_parameter)
recall = recall_score(y_true, y_pred, average=average_parameter, zero_division=zero_division_parameter)
with open("output.txt", "a") as f:
print('*' * 80, file=f)
# Print the results
print(f'Accuracy: {accuracy:.5f}', file=f)
print(f'F1 Score: {f1:.5f}', file=f)
print(f'Precision: {precision:.5f}', file=f)
print(f'Recall: {recall:.5f}', file=f)
# export result
label_list = [id2label[id] for id in pred_labels]
df = pd.DataFrame({
'class_prediction': pd.Series(label_list)
})
# we can save the t5 generation output here
df.to_csv(f"exports/result.csv", index=False)
# %%
# reset file before writing to it
with open("output.txt", "w") as f:
print('', file=f)
test()