# %% # from datasets import load_from_disk import os os.environ['NCCL_P2P_DISABLE'] = '1' os.environ['NCCL_IB_DISABLE'] = '1' os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3" import re import random import torch from transformers import ( AutoTokenizer, AutoModelForSequenceClassification, DataCollatorWithPadding, Trainer, EarlyStoppingCallback, TrainingArguments ) import evaluate import numpy as np import pandas as pd # import matplotlib.pyplot as plt from datasets import Dataset, DatasetDict torch.set_float32_matmul_precision('high') # %% def set_seed(seed): """ Set the random seed for reproducibility. """ random.seed(seed) # Python random module np.random.seed(seed) # NumPy random torch.manual_seed(seed) # PyTorch CPU torch.cuda.manual_seed(seed) # PyTorch GPU torch.cuda.manual_seed_all(seed) # If using multiple GPUs torch.backends.cudnn.deterministic = True # Ensure deterministic behavior torch.backends.cudnn.benchmark = False # Disable optimization for reproducibility set_seed(42) SHUFFLES=0 # 0 shuffles means it does not re-sample # %% # We want to map the entity_id to a consecutive set of id's # import training file data_path = '../../biomedical_data_import/bc2gm_train.csv' train_df = pd.read_csv(data_path, skipinitialspace=True) # rather than use pattern, we use the real thing and property entity_ids = train_df['entity_id'].to_list() target_id_list = sorted(list(set(entity_ids))) # %% id2label = {} label2id = {} for idx, val in enumerate(target_id_list): id2label[idx] = val label2id[val] = idx # %% # introduce pre-processing functions def preprocess_text(text): # 1. Make all uppercase text = text.lower() # Substitute digits with 'x' # text = re.sub(r'\d+', '#', text) # standardize spacing text = re.sub(r'\s+', ' ', text).strip() return text def generate_random_shuffles(text, n): """ Generate n strings with randomly shuffled words from the input text. Args: text (str): The input text. n (int): The number of random variations to generate. Returns: list: A list of strings with shuffled words. """ words = text.split() # Split the input into words shuffled_variations = [] for _ in range(n): shuffled = words[:] # Copy the word list to avoid in-place modification random.shuffle(shuffled) # Randomly shuffle the words shuffled_variations.append(" ".join(shuffled)) # Join the words back into a string return shuffled_variations # generate n more shuffled examples def shuffle_text(text, n_shuffles=SHUFFLES): """ Preprocess a list of texts and add n random shuffles for each string. Args: texts (list): An input strings. n_shuffles (int): Number of random shuffles to generate for each string. Returns: list: A list of preprocessed and shuffled strings. """ all_processed = [] # add the original text all_processed.append(text) # Generate random shuffles shuffled_variations = generate_random_shuffles(text, n_shuffles) all_processed.extend(shuffled_variations) return all_processed ###################################### # augmentation by text corruption def corrupt_word(word): """Corrupt a single word using random corruption techniques.""" if len(word) <= 1: # Skip corruption for single-character words return word corruption_type = random.choice(["delete", "swap"]) if corruption_type == "delete": # Randomly delete a character idx = random.randint(0, len(word) - 1) word = word[:idx] + word[idx + 1:] elif corruption_type == "swap": # Swap two adjacent characters if len(word) > 1: idx = random.randint(0, len(word) - 2) word = (word[:idx] + word[idx + 1] + word[idx] + word[idx + 2:]) return word def corrupt_string(sentence, corruption_probability=0.01): """Corrupt each word in the string with a given probability.""" words = sentence.split() corrupted_words = [ corrupt_word(word) if random.random() < corruption_probability else word for word in words ] return " ".join(corrupted_words) ############################################################# # Data Run code here # outputs a list of dictionaries # processes dataframe into lists of dictionaries # each element maps input to output # input: tag_description # output: class label def process_df_to_dict(df): output_list = [] for _, row in df.iterrows(): # produce shuffling index = row['entity_id'] parent_desc = row['mention'] if isinstance(parent_desc, float): print(parent_desc) parent_desc = f'{parent_desc}' parent_desc = preprocess_text(parent_desc) # unaugmented data element = { 'text' : parent_desc, 'label': label2id[index], # ensure labels starts from 0 } output_list.append(element) # # short sequences are rare, and we must compensate by including more examples # # mutation of other longer sequences might drown out rare short sequences # words = parent_desc.split() # word_count = len(words) # if word_count < 3: # for _ in range(10): # element = { # 'text': parent_desc, # 'label': label2id[index], # } # output_list.append(element) # add shuffled strings processed_descs = shuffle_text(parent_desc, n_shuffles=SHUFFLES) for desc in processed_descs: if (desc != parent_desc): element = { 'text' : desc, 'label': label2id[index], # ensure labels starts from 0 } output_list.append(element) # # corrupt string # desc = corrupt_string(parent_desc, corruption_probability=0.1) # if (desc != parent_desc): # element = { # 'text' : desc, # 'label': label2id[index], # ensure labels starts from 0 # } # output_list.append(element) # # augmentation # # remove all non-alphanumerics # desc = re.sub(r'[^\w\s]', ' ', parent_desc) # Retains only alphanumeric and spaces # if (desc != parent_desc): # element = { # 'text' : desc, # 'label': label2id[index], # ensure labels starts from 0 # } # output_list.append(element) return output_list def create_dataset(): # train data_path = '../../biomedical_data_import/bc2gm_train.csv' train_df = pd.read_csv(data_path, skipinitialspace=True) combined_data = DatasetDict({ 'train': Dataset.from_list(process_df_to_dict(train_df)), }) return combined_data # %% ######################################### # training function def train(): save_path = f'checkpoint' split_datasets = create_dataset() # prepare tokenizer model_checkpoint = "distilbert/distilbert-base-uncased" # model_checkpoint = 'google-bert/bert-base-cased' # model_checkpoint = 'prajjwal1/bert-small' tokenizer = AutoTokenizer.from_pretrained(model_checkpoint, return_tensors="pt", clean_up_tokenization_spaces=True) # max_length = 120 # given a dataset entry, run it through the tokenizer def preprocess_function(example): input = example['text'] # text_target sets the corresponding label to inputs # there is no need to create a separate 'labels' model_inputs = tokenizer( input, truncation=True, # enable truncation for efficiency ) return model_inputs # map maps function to each "row" in the dataset # aka the data in the immediate nesting tokenized_datasets = split_datasets.map( preprocess_function, batched=True, num_proc=8, remove_columns="text", # we only need the tokenization, not the original strings ) # %% # create data collator data_collator = DataCollatorWithPadding(tokenizer=tokenizer) # %% # compute metrics metric = evaluate.load("accuracy") def compute_metrics(eval_preds): preds, labels = eval_preds preds = np.argmax(preds, axis=1) return metric.compute(predictions=preds, references=labels) # %% # create id2label and label2id # %% model = AutoModelForSequenceClassification.from_pretrained( model_checkpoint, num_labels=len(target_id_list), id2label=id2label, label2id=label2id) # important! after extending tokens vocab model.resize_token_embeddings(len(tokenizer)) # model = torch.compile(model, backend="inductor", dynamic=True) # %% # Trainer training_args = TrainingArguments( output_dir=f"{save_path}", # eval_strategy="epoch", eval_strategy="no", logging_dir="tensorboard-log", logging_strategy="epoch", # save_strategy="epoch", load_best_model_at_end=False, learning_rate=1e-3, per_device_train_batch_size=512, # per_device_eval_batch_size=64, auto_find_batch_size=False, ddp_find_unused_parameters=False, weight_decay=0.01, save_total_limit=1, num_train_epochs=40, warmup_steps=400, bf16=True, push_to_hub=False, remove_unused_columns=False, ) trainer = Trainer( model, training_args, train_dataset=tokenized_datasets["train"], tokenizer=tokenizer, data_collator=data_collator, # data_collator performs dynamic padding compute_metrics=compute_metrics, # callbacks=[EarlyStoppingCallback(early_stopping_patience=3)], ) # uncomment to load training from checkpoint # checkpoint_path = 'default_40_1/checkpoint-5600' # trainer.train(resume_from_checkpoint=checkpoint_path) trainer.train() # execute training train() # %%