# %% # from datasets import load_from_disk import os os.environ['NCCL_P2P_DISABLE'] = '1' os.environ['NCCL_IB_DISABLE'] = '1' os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3" import re import random import torch from transformers import ( AutoTokenizer, AutoModelForSequenceClassification, DataCollatorWithPadding, Trainer, EarlyStoppingCallback, TrainingArguments ) import evaluate import numpy as np import pandas as pd # import matplotlib.pyplot as plt from datasets import Dataset, DatasetDict torch.set_float32_matmul_precision('high') # %% def set_seed(seed): """ Set the random seed for reproducibility. """ random.seed(seed) # Python random module np.random.seed(seed) # NumPy random torch.manual_seed(seed) # PyTorch CPU torch.cuda.manual_seed(seed) # PyTorch GPU torch.cuda.manual_seed_all(seed) # If using multiple GPUs torch.backends.cudnn.deterministic = True # Ensure deterministic behavior torch.backends.cudnn.benchmark = False # Disable optimization for reproducibility set_seed(42) SHUFFLES=2 # %% # import training file data_path = '../../esAppMod_data_import/train.csv' train_df = pd.read_csv(data_path, skipinitialspace=True) # rather than use pattern, we use the real thing and property entity_ids = train_df['entity_id'].to_list() target_id_list = sorted(list(set(entity_ids))) def compute_normalized_class_weights(class_counts, max_resamples=SHUFFLES): """ Compute normalized class weights inversely proportional to class counts. The weights are normalized so that they sum to 1. Args: class_counts (array-like): An array or list where each element represents the count of samples for a class. Returns: numpy.ndarray: A normalized array of weights for each class. """ class_counts = np.array(class_counts) total_samples = np.sum(class_counts) class_weights = total_samples / class_counts # so that highest weight is 1 normalized_weights = class_weights / np.max(class_weights) # Scale weights such that the highest weight corresponds to `max_resamples` resample_counts = normalized_weights * max_resamples # Round resamples to nearest integer resample_counts = np.round(resample_counts).astype(int) return resample_counts # %% id_counts = train_df['entity_id'].value_counts() id_weights = compute_normalized_class_weights(id_counts, max_resamples=SHUFFLES) id_index = id_counts.index label2weight = {} for idx, label in enumerate(id_index): label2weight[label] = id_weights[idx] # %% id2label = {} label2id = {} for idx, val in enumerate(target_id_list): id2label[idx] = val label2id[val] = idx # %% # introduce pre-processing functions def preprocess_text(text): # 1. Make all uppercase text = text.lower() # Remove any non alphanumeric character # text = re.sub(r'[^\w\s]', ' ', text) # Retains only alphanumeric and spaces # replace dashes text = re.sub(r"[-;:]", " ", text) # Add space between digit followed by a letter text = re.sub(r"(\d)([A-Z])", r"\1 \2", text) # Add space between letter followed by a digit text = re.sub(r"([A-Z])(\d)", r"\1 \2", text) # Substitute digits with 'x' text = re.sub(r'\d+', 'x', text) # standardize spacing text = re.sub(r'\s+', ' ', text).strip() return text def generate_random_shuffles(text, n): """ Generate n strings with randomly shuffled words from the input text. Args: text (str): The input text. n (int): The number of random variations to generate. Returns: list: A list of strings with shuffled words. """ words = text.split() # Split the input into words shuffled_variations = [] for _ in range(n): shuffled = words[:] # Copy the word list to avoid in-place modification random.shuffle(shuffled) # Randomly shuffle the words shuffled_variations.append(" ".join(shuffled)) # Join the words back into a string return shuffled_variations # generate n more shuffled examples def shuffle_text(text, n_shuffles=SHUFFLES): """ Preprocess a list of texts and add n random shuffles for each string. Args: texts (list): An input strings. n_shuffles (int): Number of random shuffles to generate for each string. Returns: list: A list of preprocessed and shuffled strings. """ all_processed = [] all_processed.append(text) # Generate random shuffles shuffled_variations = generate_random_shuffles(text, n_shuffles) all_processed.extend(shuffled_variations) return all_processed term_to_abbrev = { r'job entry system': 'jes', r'subversion': 'svn', r'borland database engine': 'bde', r'business intelligence and reporting tools': 'birt', r'lan management solution': 'lms', r'laboratory information management system': 'lims', r'ibm database 2': 'db/2', r'integrated development environment': 'ide', r'software development kit': 'sdk', r'hp operations orchestration': 'hpoo', r'hp server automation': 'hpsa', r'internet information server': 'iis', r'release 2': 'r2', r'red hat enterprise linux': 'rhel', r'oracle enterprise linux': 'oel', r'websphere application server': 'was', r'application development facility': 'adf', r'server analysis services': 'ssas' } abbrev_to_term = {rf'\b{value}\b': key for key, value in term_to_abbrev.items()} def replace_terms_with_abbreviations(text): for input, replacement in term_to_abbrev.items(): text = re.sub(input, replacement, text) return text def replace_abbreivations_with_terms(text): for input, replacement in abbrev_to_term.items(): text = re.sub(input, replacement, text) return text # outputs a list of dictionaries # processes dataframe into lists of dictionaries # each element maps input to output # input: tag_description # output: class label def process_df_to_dict(df): output_list = [] for _, row in df.iterrows(): # produce shuffling index = row['entity_id'] parent_desc = row['mention'] parent_desc = preprocess_text(parent_desc) # ensure at least 1 shuffle # no_of_shuffles = label2weight[index] + 1 no_of_shuffles = SHUFFLES processed_descs = shuffle_text(parent_desc, n_shuffles=no_of_shuffles) for desc in processed_descs: element = { 'text' : desc, 'label': label2id[index], # ensure labels starts from 0 } output_list.append(element) # perform abbrev_to_term desc = replace_terms_with_abbreviations(parent_desc) no_of_shuffles = SHUFFLES processed_descs = shuffle_text(desc, n_shuffles=no_of_shuffles) for desc in processed_descs: element = { 'text' : desc, 'label': label2id[index], # ensure labels starts from 0 } output_list.append(element) # perform term to abbrev desc = replace_abbreivations_with_terms(parent_desc) no_of_shuffles = SHUFFLES processed_descs = shuffle_text(desc, n_shuffles=no_of_shuffles) for desc in processed_descs: element = { 'text' : desc, 'label': label2id[index], # ensure labels starts from 0 } output_list.append(element) return output_list def create_dataset(): # train data_path = '../../esAppMod_data_import/train.csv' train_df = pd.read_csv(data_path, skipinitialspace=True) combined_data = DatasetDict({ 'train': Dataset.from_list(process_df_to_dict(train_df)), }) return combined_data # %% def train(): save_path = f'checkpoint' split_datasets = create_dataset() # prepare tokenizer model_checkpoint = "distilbert/distilbert-base-uncased" # model_checkpoint = 'google-bert/bert-base-cased' # model_checkpoint = 'prajjwal1/bert-small' tokenizer = AutoTokenizer.from_pretrained(model_checkpoint, return_tensors="pt", clean_up_tokenization_spaces=True) # Define additional special tokens # additional_special_tokens = [""] # Add the additional special tokens to the tokenizer # tokenizer.add_special_tokens({"additional_special_tokens": additional_special_tokens}) max_length = 120 # given a dataset entry, run it through the tokenizer def preprocess_function(example): input = example['text'] # text_target sets the corresponding label to inputs # there is no need to create a separate 'labels' model_inputs = tokenizer( input, max_length=max_length, truncation=True, padding=True ) return model_inputs # map maps function to each "row" in the dataset # aka the data in the immediate nesting tokenized_datasets = split_datasets.map( preprocess_function, batched=True, num_proc=8, remove_columns="text", ) # %% temp # tokenized_datasets['train'].rename_columns() # %% # create data collator data_collator = DataCollatorWithPadding(tokenizer=tokenizer) # %% # compute metrics metric = evaluate.load("accuracy") def compute_metrics(eval_preds): preds, labels = eval_preds preds = np.argmax(preds, axis=1) return metric.compute(predictions=preds, references=labels) # %% # create id2label and label2id # %% model = AutoModelForSequenceClassification.from_pretrained( model_checkpoint, num_labels=len(target_id_list), id2label=id2label, label2id=label2id) # important! after extending tokens vocab model.resize_token_embeddings(len(tokenizer)) # model = torch.compile(model, backend="inductor", dynamic=True) # %% # Trainer training_args = TrainingArguments( output_dir=f"{save_path}", # eval_strategy="epoch", eval_strategy="no", logging_dir="tensorboard-log", logging_strategy="epoch", # save_strategy="epoch", load_best_model_at_end=False, learning_rate=5e-5, per_device_train_batch_size=64, per_device_eval_batch_size=64, auto_find_batch_size=False, ddp_find_unused_parameters=False, weight_decay=0.01, save_total_limit=1, num_train_epochs=80, warmup_steps=400, bf16=True, push_to_hub=False, remove_unused_columns=False, ) trainer = Trainer( model, training_args, train_dataset=tokenized_datasets["train"], tokenizer=tokenizer, data_collator=data_collator, compute_metrics=compute_metrics, # callbacks=[EarlyStoppingCallback(early_stopping_patience=3)], ) # uncomment to load training from checkpoint # checkpoint_path = 'default_40_1/checkpoint-5600' # trainer.train(resume_from_checkpoint=checkpoint_path) trainer.train() # execute training train() # %%