81 lines
3.3 KiB
Python
81 lines
3.3 KiB
Python
|
import torch
|
||
|
from transformers import (
|
||
|
AutoTokenizer,
|
||
|
AutoModelForSequenceClassification,
|
||
|
DataCollatorWithPadding,
|
||
|
)
|
||
|
import torch.nn.functional as F
|
||
|
|
||
|
|
||
|
|
||
|
class Retriever:
|
||
|
def __init__(self, input_texts, model_checkpoint):
|
||
|
# we need to generate the embedding from list of input strings
|
||
|
self.embeddings = []
|
||
|
self.inputs = input_texts
|
||
|
model_checkpoint = model_checkpoint
|
||
|
self.tokenizer = AutoTokenizer.from_pretrained(model_checkpoint, return_tensors="pt", clean_up_tokenization_spaces=True)
|
||
|
|
||
|
model = AutoModelForSequenceClassification.from_pretrained(model_checkpoint)
|
||
|
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
||
|
# device = "cpu"
|
||
|
model.to(self.device)
|
||
|
self.model = model.eval()
|
||
|
|
||
|
|
||
|
def make_embedding(self, batch_size=64):
|
||
|
all_embeddings = self.embeddings
|
||
|
input_texts = self.inputs
|
||
|
|
||
|
for i in range(0, len(input_texts), batch_size):
|
||
|
batch_texts = input_texts[i:i+batch_size]
|
||
|
# Tokenize the input text
|
||
|
inputs = self.tokenizer(batch_texts, return_tensors="pt", padding=True, truncation=True, max_length=64)
|
||
|
input_ids = inputs.input_ids.to(self.device)
|
||
|
attention_mask = inputs.attention_mask.to(self.device)
|
||
|
|
||
|
|
||
|
# Pass the input through the encoder and retrieve the embeddings
|
||
|
with torch.no_grad():
|
||
|
encoder_outputs = self.model(input_ids, attention_mask=attention_mask, output_hidden_states=True)
|
||
|
# get last layer
|
||
|
embeddings = encoder_outputs.hidden_states[-1]
|
||
|
# get cls token embedding
|
||
|
cls_embeddings = embeddings[:, 0, :] # Shape: (batch_size, hidden_size)
|
||
|
all_embeddings.append(cls_embeddings)
|
||
|
|
||
|
# remove the batch list and makes a single large tensor, dim=0 increases row-wise
|
||
|
all_embeddings = torch.cat(all_embeddings, dim=0)
|
||
|
|
||
|
self.embeddings = all_embeddings
|
||
|
|
||
|
def cosine_similarity_chunked(batch1, batch2, chunk_size=1024):
|
||
|
device = 'cuda'
|
||
|
batch1_size = batch1.size(0)
|
||
|
batch2_size = batch2.size(0)
|
||
|
batch2.to(device)
|
||
|
|
||
|
# Prepare an empty tensor to store results
|
||
|
cos_sim = torch.empty(batch1_size, batch2_size, device=device)
|
||
|
|
||
|
# Process batch1 in chunks
|
||
|
for i in range(0, batch1_size, chunk_size):
|
||
|
batch1_chunk = batch1[i:i + chunk_size] # Get chunk of batch1
|
||
|
|
||
|
batch1_chunk.to(device)
|
||
|
# Expand batch1 chunk and entire batch2 for comparison
|
||
|
# batch1_chunk_exp = batch1_chunk.unsqueeze(1) # Shape: (chunk_size, 1, seq_len)
|
||
|
# batch2_exp = batch2.unsqueeze(0) # Shape: (1, batch2_size, seq_len)
|
||
|
batch2_norms = batch2.norm(dim=1, keepdim=True)
|
||
|
|
||
|
|
||
|
# Compute cosine similarity for the chunk and store it in the final tensor
|
||
|
# cos_sim[i:i + chunk_size] = F.cosine_similarity(batch1_chunk_exp, batch2_exp, dim=-1)
|
||
|
|
||
|
# Compute cosine similarity by matrix multiplication and normalizing
|
||
|
sim_chunk = torch.mm(batch1_chunk, batch2.T) / (batch1_chunk.norm(dim=1, keepdim=True) * batch2_norms.T + 1e-8)
|
||
|
|
||
|
# Store the results in the appropriate part of the final tensor
|
||
|
cos_sim[i:i + chunk_size] = sim_chunk
|
||
|
|
||
|
return cos_sim
|