# Source: domain_mapping/biomedical_train/ncbi/train.py
#
# 369 lines
# 10 KiB
# Python

# %%
# from datasets import load_from_disk
import os
# NCCL transport settings: switch off the GPU peer-to-peer and InfiniBand transports.
# NOTE(review): presumably a workaround for multi-GPU communication problems on the
# training machine - confirm before removing.
os.environ['NCCL_P2P_DISABLE'] = '1'
os.environ['NCCL_IB_DISABLE'] = '1'
# Number GPUs by PCI bus order and expose only devices 0-3 to this process.
# All four variables are assigned before `import torch` further down, so they are
# already in the environment when torch is loaded.
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"
import re
import random
import torch
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
DataCollatorWithPadding,
Trainer,
EarlyStoppingCallback,
TrainingArguments
)
import evaluate
import numpy as np
import pandas as pd
# import matplotlib.pyplot as plt
from datasets import Dataset, DatasetDict
# Allow float32 matrix multiplications to use faster reduced-precision kernels
# (e.g. TF32) when the hardware offers them.
torch.set_float32_matmul_precision('high')
# %%
def set_seed(seed):
    """
    Seed every random number generator this script relies on.

    Covers Python's `random` module, NumPy and PyTorch (CPU and all CUDA
    devices), then puts cuDNN into deterministic, non-benchmarking mode.
    """
    # the same seed value is handed to each library in turn
    seeders = (
        random.seed,                 # Python random module
        np.random.seed,              # NumPy random
        torch.manual_seed,           # PyTorch CPU
        torch.cuda.manual_seed,      # PyTorch current GPU
        torch.cuda.manual_seed_all,  # every visible GPU
    )
    for seed_fn in seeders:
        seed_fn(seed)

    # deterministic cuDNN kernels; no benchmark-based kernel selection
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False
# Seed all random number generators with a fixed value as soon as the module runs.
set_seed(42)
# Number of word-order shuffles generated per mention as extra training examples.
SHUFFLES=0 # 0 shuffles means it does not re-sample
# %%
# Map every entity_id to a consecutive class index (and back).
# The label set is taken from the entity ids that actually occur in the
# training file rather than from a pattern.
data_path = '../../biomedical_data_import/bc2gm_train.csv'
train_df = pd.read_csv(data_path, skipinitialspace=True)
entity_ids = train_df['entity_id'].to_list()
# sorting the distinct ids fixes the order in which indices are handed out
target_id_list = sorted(set(entity_ids))
# %%
# index -> entity id, and its inverse entity id -> index
id2label = dict(enumerate(target_id_list))
label2id = {entity: class_idx for class_idx, entity in id2label.items()}
# %%
# introduce pre-processing functions
def preprocess_text(text):
    """
    Normalise a mention string: lowercase it, collapse every run of
    whitespace into a single space and trim both ends.
    """
    lowered = text.lower()
    # (replacing digits with a placeholder was tried here and is disabled)
    single_spaced = re.sub(r'\s+', ' ', lowered)
    return single_spaced.strip()
def generate_random_shuffles(text, n):
    """
    Produce n word-order permutations of the input text.

    Args:
        text (str): Whitespace-separated input string.
        n (int): How many shuffled copies to create.
    Returns:
        list: n strings, each holding the words of `text` in a random order.
    """
    tokens = text.split()

    def one_shuffle():
        # shuffle a fresh copy so the token list itself keeps its order
        permuted = list(tokens)
        random.shuffle(permuted)
        return " ".join(permuted)

    return [one_shuffle() for _ in range(n)]
# generate n more shuffled examples
def shuffle_text(text, n_shuffles=SHUFFLES):
    """
    Return the input string followed by n_shuffles word-shuffled copies of it.

    Args:
        text (str): The input string.
        n_shuffles (int): Number of shuffled copies to append.
    Returns:
        list: The original string first, then the shuffled variations.
    """
    # the unmodified text always leads the list
    return [text, *generate_random_shuffles(text, n_shuffles)]
######################################
# augmentation by text corruption
def corrupt_word(word):
    """Apply one random typo (character deletion or adjacent swap) to a word."""
    # empty and single-character words are returned untouched
    if len(word) < 2:
        return word
    mode = random.choice(["delete", "swap"])
    if mode == "delete":
        # drop the character at a random position
        pos = random.randint(0, len(word) - 1)
        return word[:pos] + word[pos + 1:]
    # swap: exchange a random character with its right-hand neighbour
    pos = random.randint(0, len(word) - 2)
    left, right = word[pos], word[pos + 1]
    return word[:pos] + right + left + word[pos + 2:]
def corrupt_string(sentence, corruption_probability=0.01):
    """Return the sentence with each word independently corrupted with the given probability."""
    result = []
    for token in sentence.split():
        # one random draw per word decides whether it receives a typo
        if random.random() < corruption_probability:
            token = corrupt_word(token)
        result.append(token)
    return " ".join(result)
#############################################################
# Data preparation
# Converts a dataframe into a list of dictionaries,
# one per training example, each mapping an input to its output:
# input: 'text' - the preprocessed `mention` string
# output: 'label' - the class index of the row's `entity_id`
def process_df_to_dict(df):
    """
    Convert a mention dataframe into a list of training examples.

    Args:
        df (pandas.DataFrame): Must provide the columns 'entity_id' and 'mention'.
    Returns:
        list: Dictionaries of the form {'text': str, 'label': int}. Each row yields
        its preprocessed mention, followed by any shuffled variants that differ
        from it (shuffle_text is asked for SHUFFLES variants).
    Raises:
        KeyError: If a required column is missing from a non-empty frame, or an
        entity_id is not a key of label2id.
    """
    output_list = []
    # an empty frame produces no examples, whatever columns it has
    if df.empty:
        return output_list

    # Walk the two needed columns directly: iterrows() would build a Series per
    # row and upcast mixed dtypes along the way.
    for entity_id, mention in zip(df['entity_id'], df['mention']):
        # a float here means pandas parsed the cell as a number or as a missing
        # value (NaN); it is echoed and then used in its string form
        if isinstance(mention, float):
            print(mention)
            mention = f'{mention}'
        mention = preprocess_text(mention)
        label = label2id[entity_id]  # class index for this entity id

        # unaugmented data
        output_list.append({'text': mention, 'label': label})

        # augmentation by word shuffling; shuffle_text also returns the original
        # string, which is skipped together with any shuffle identical to it
        output_list.extend(
            {'text': variant, 'label': label}
            for variant in shuffle_text(mention, n_shuffles=SHUFFLES)
            if variant != mention
        )
        # Augmentations that were tried here and are currently disabled:
        #   - repeating mentions of fewer than 3 words 10 times to offset their rarity
        #   - corrupt_string(mention, corruption_probability=0.1)
        #   - replacing every non-alphanumeric character with a space
    return output_list
def create_dataset(data_path='../../biomedical_data_import/bc2gm_train.csv'):
    """
    Load a mention CSV and wrap it as a DatasetDict with a single 'train' split.

    Args:
        data_path (str): CSV file to read. Defaults to the bc2gm_train.csv file
            this function previously had hard-coded, so calls without arguments
            behave as before.
    Returns:
        DatasetDict: One 'train' Dataset built from the examples that
        process_df_to_dict produces for the file.
    """
    # skipinitialspace drops blanks that follow the CSV delimiter
    train_df = pd.read_csv(data_path, skipinitialspace=True)
    train_examples = process_df_to_dict(train_df)
    return DatasetDict({'train': Dataset.from_list(train_examples)})
# %%
#########################################
# training function
def train():
    """
    Fine-tune a DistilBERT sequence classifier that maps mention text to entity ids.

    Builds the training split, tokenizes it, creates the model with one output
    per entry of target_id_list and runs the Hugging Face Trainer with
    evaluation switched off. Checkpoints are written to 'checkpoint' and logs
    to 'tensorboard-log'.
    """
    output_dir = 'checkpoint'
    raw_datasets = create_dataset()

    # ---- tokenizer -------------------------------------------------------
    model_checkpoint = "distilbert/distilbert-base-uncased"
    # alternatives that were tried: 'google-bert/bert-base-cased', 'prajjwal1/bert-small'
    tokenizer = AutoTokenizer.from_pretrained(
        model_checkpoint,
        return_tensors="pt",
        clean_up_tokenization_spaces=True,
    )

    def preprocess_function(example):
        # called by a batched map, so example['text'] holds a batch of strings;
        # the 'label' column is already in the dataset and is left untouched
        return tokenizer(
            example['text'],
            truncation=True,  # enable truncation for efficiency
        )

    # tokenize the whole split; only the token ids are needed afterwards,
    # so the raw strings are dropped
    tokenized_datasets = raw_datasets.map(
        preprocess_function,
        batched=True,
        num_proc=8,
        remove_columns="text",
    )

    # ---- collator and metric ----------------------------------------------
    # the collator pads each batch dynamically
    data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

    accuracy = evaluate.load("accuracy")

    def compute_metrics(eval_preds):
        logits, references = eval_preds
        predicted = np.argmax(logits, axis=1)
        return accuracy.compute(predictions=predicted, references=references)

    # ---- model -------------------------------------------------------------
    model = AutoModelForSequenceClassification.from_pretrained(
        model_checkpoint,
        num_labels=len(target_id_list),
        id2label=id2label,
        label2id=label2id,
    )
    # size the embedding matrix to the tokenizer (matters once tokens are added)
    model.resize_token_embeddings(len(tokenizer))
    # torch.compile(model, backend="inductor", dynamic=True) was tried and is disabled

    # ---- trainer -----------------------------------------------------------
    training_args = TrainingArguments(
        # output and logging
        output_dir=output_dir,
        logging_dir="tensorboard-log",
        logging_strategy="epoch",
        save_total_limit=1,
        push_to_hub=False,
        # no evaluation during training
        eval_strategy="no",
        load_best_model_at_end=False,
        # optimisation
        learning_rate=1e-3,
        weight_decay=0.01,
        warmup_steps=400,
        num_train_epochs=40,
        per_device_train_batch_size=512,
        auto_find_batch_size=False,
        bf16=True,
        # distributed / data handling
        ddp_find_unused_parameters=False,
        remove_unused_columns=False,
    )
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_datasets["train"],
        tokenizer=tokenizer,
        data_collator=data_collator,
        compute_metrics=compute_metrics,
        # callbacks=[EarlyStoppingCallback(early_stopping_patience=3)],
    )

    # to resume from a checkpoint instead, call e.g.
    # trainer.train(resume_from_checkpoint='default_40_1/checkpoint-5600')
    trainer.train()
# Execute training: the fine-tuning job starts as soon as this file (or cell) is run.
train()
# %%