"""Text-embedding retrieval utilities: a transformer-based CLS-embedding
encoder (`Retriever`) and a chunked cosine-similarity helper."""

import torch
import torch.nn.functional as F
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    DataCollatorWithPadding,
)


class Retriever:
    """Encodes a list of input strings into CLS-token embeddings.

    Call `make_embedding()` after construction; the resulting
    `(n_texts, hidden_size)` tensor is stored in `self.embeddings`.
    """

    def __init__(self, input_texts, model_checkpoint):
        # Raw input strings to be embedded by make_embedding().
        self.inputs = input_texts
        self.embeddings = []
        # FIX: original did `model_checkpoint = model_checkpoint` (a no-op
        # self-assignment); keep the checkpoint name on the instance instead.
        self.model_checkpoint = model_checkpoint
        # FIX: `return_tensors` is a tokenizer *call* argument, not a
        # `from_pretrained` argument — dropped here (it is passed correctly
        # at tokenization time in make_embedding()).
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_checkpoint, clean_up_tokenization_spaces=True
        )
        model = AutoModelForSequenceClassification.from_pretrained(model_checkpoint)
        # Prefer GPU when available; fall back to CPU.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model.to(self.device)
        # Inference only — disable dropout etc.
        self.model = model.eval()

    def make_embedding(self, batch_size=64):
        """Embed `self.inputs` in batches and store the result.

        Args:
            batch_size: number of texts tokenized/encoded per forward pass.

        Side effect: sets `self.embeddings` to a `(n_texts, hidden_size)`
        tensor of CLS-token embeddings from the last hidden layer.
        """
        # FIX: use a fresh local list. The original aliased self.embeddings,
        # which becomes a Tensor after the first call, so a second call would
        # crash on `.append`.
        all_embeddings = []
        input_texts = self.inputs
        for start in range(0, len(input_texts), batch_size):
            batch_texts = input_texts[start:start + batch_size]
            # Tokenize the batch (pad/truncate to a fixed max length).
            inputs = self.tokenizer(
                batch_texts,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=64,
            )
            input_ids = inputs.input_ids.to(self.device)
            attention_mask = inputs.attention_mask.to(self.device)
            # Forward pass without gradients; request hidden states so we can
            # read the last encoder layer.
            with torch.no_grad():
                encoder_outputs = self.model(
                    input_ids,
                    attention_mask=attention_mask,
                    output_hidden_states=True,
                )
            last_hidden = encoder_outputs.hidden_states[-1]
            # CLS (first) token embedding per sequence: (batch, hidden_size).
            cls_embeddings = last_hidden[:, 0, :]
            all_embeddings.append(cls_embeddings)
        # FIX: guard the empty-input case — torch.cat([]) raises.
        if all_embeddings:
            # Concatenate batches row-wise into one (n_texts, hidden) tensor.
            self.embeddings = torch.cat(all_embeddings, dim=0)
        else:
            self.embeddings = torch.empty(0, device=self.device)


def cosine_similarity_chunked(batch1, batch2, chunk_size=1024):
    """Pairwise cosine similarity between rows of two 2-D tensors.

    Processes `batch1` in chunks of `chunk_size` rows to bound peak memory.

    Args:
        batch1: `(n1, d)` tensor.
        batch2: `(n2, d)` tensor.
        chunk_size: rows of `batch1` handled per iteration.

    Returns:
        `(n1, n2)` tensor where entry (i, j) is the cosine similarity of
        `batch1[i]` and `batch2[j]`.
    """
    # FIX: original hard-coded device='cuda', crashing on CPU-only hosts;
    # fall back to CPU like the rest of the module.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # FIX: Tensor.to() is NOT in-place — the original discarded its result,
    # so tensors never moved and torch.mm could mix devices. Rebind instead.
    batch2 = batch2.to(device)
    batch1_size = batch1.size(0)
    batch2_size = batch2.size(0)
    # Pre-allocate the full result.
    cos_sim = torch.empty(batch1_size, batch2_size, device=device)
    # Hoisted out of the loop: batch2 row norms are loop-invariant.
    batch2_norms = batch2.norm(dim=1, keepdim=True)
    for i in range(0, batch1_size, chunk_size):
        chunk = batch1[i:i + chunk_size].to(device)
        # Cosine similarity via matmul of the chunk against batch2, divided
        # by the outer product of row norms; epsilon avoids division by zero.
        sim_chunk = torch.mm(chunk, batch2.T) / (
            chunk.norm(dim=1, keepdim=True) * batch2_norms.T + 1e-8
        )
        cos_sim[i:i + chunk_size] = sim_chunk
    return cos_sim