hipom_data_mapping/post_process/de_duplication/utils.py

131 lines
5.4 KiB
Python

import torch
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
AutoModelForSeq2SeqLM,
)
class BertEmbedder:
def __init__(self, input_texts, model_checkpoint):
# we need to generate the embedding from list of input strings
self.embeddings = []
self.inputs = input_texts
model_checkpoint = model_checkpoint
self.tokenizer = AutoTokenizer.from_pretrained(model_checkpoint, return_tensors="pt", clean_up_tokenization_spaces=True)
model = AutoModelForSequenceClassification.from_pretrained(model_checkpoint)
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# device = "cpu"
model.to(self.device)
self.model = model.eval()
def make_embedding(self, batch_size=128):
all_embeddings = self.embeddings
input_texts = self.inputs
for i in range(0, len(input_texts), batch_size):
batch_texts = input_texts[i:i+batch_size]
# Tokenize the input text
inputs = self.tokenizer(batch_texts, return_tensors="pt", padding=True, truncation=True, max_length=120)
input_ids = inputs.input_ids.to(self.device)
attention_mask = inputs.attention_mask.to(self.device)
# Pass the input through the encoder and retrieve the embeddings
with torch.no_grad():
encoder_outputs = self.model(input_ids, attention_mask=attention_mask, output_hidden_states=True)
# get last layer
embeddings = encoder_outputs.hidden_states[-1]
# get cls token embedding
cls_embeddings = embeddings[:, 0, :] # Shape: (batch_size, hidden_size)
all_embeddings.append(cls_embeddings)
# remove the batch list and makes a single large tensor, dim=0 increases row-wise
all_embeddings = torch.cat(all_embeddings, dim=0)
self.embeddings = all_embeddings
class T5Embedder:
def __init__(self, input_texts, model_checkpoint):
# we need to generate the embedding from list of input strings
self.embeddings = []
self.inputs = input_texts
model_checkpoint = model_checkpoint
self.tokenizer = AutoTokenizer.from_pretrained("t5-base", return_tensors="pt", clean_up_tokenization_spaces=True)
# define additional special tokens
additional_special_tokens = ["<THING_START>", "<THING_END>", "<PROPERTY_START>", "<PROPERTY_END>", "<NAME>", "<DESC>", "<SIG>", "<UNIT>", "<DATA_TYPE>"]
# add the additional special tokens to the tokenizer
self.tokenizer.add_special_tokens({"additional_special_tokens": additional_special_tokens})
model = AutoModelForSeq2SeqLM.from_pretrained(model_checkpoint)
self.device = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")
# device = "cpu"
model.to(self.device)
self.model = model.eval()
def make_embedding(self, batch_size=128):
all_embeddings = self.embeddings
input_texts = self.inputs
for i in range(0, len(input_texts), batch_size):
batch_texts = input_texts[i:i+batch_size]
# Tokenize the input text
inputs = self.tokenizer(batch_texts, return_tensors="pt", padding=True, truncation=True, max_length=128)
input_ids = inputs.input_ids.to(self.device)
attention_mask = inputs.attention_mask.to(self.device)
# Pass the input through the encoder and retrieve the embeddings
with torch.no_grad():
encoder_outputs = self.model.encoder(input_ids, attention_mask=attention_mask)
embeddings = encoder_outputs.last_hidden_state
# Compute the mean pooling of the token embeddings
# mean_embedding = embeddings.mean(dim=1)
mean_embedding = (embeddings * attention_mask.unsqueeze(-1)).sum(dim=1) / attention_mask.sum(dim=1, keepdim=True)
all_embeddings.append(mean_embedding)
# remove the batch list and makes a single large tensor, dim=0 increases row-wise
all_embeddings = torch.cat(all_embeddings, dim=0)
self.embeddings = all_embeddings
def cosine_similarity_chunked(batch1, batch2, chunk_size=1024):
device = 'cuda'
batch1_size = batch1.size(0)
batch2_size = batch2.size(0)
batch2.to(device)
# Prepare an empty tensor to store results
cos_sim = torch.empty(batch1_size, batch2_size, device=device)
# Process batch1 in chunks
for i in range(0, batch1_size, chunk_size):
batch1_chunk = batch1[i:i + chunk_size] # Get chunk of batch1
batch1_chunk.to(device)
# Expand batch1 chunk and entire batch2 for comparison
# batch1_chunk_exp = batch1_chunk.unsqueeze(1) # Shape: (chunk_size, 1, seq_len)
# batch2_exp = batch2.unsqueeze(0) # Shape: (1, batch2_size, seq_len)
batch2_norms = batch2.norm(dim=1, keepdim=True)
# Compute cosine similarity for the chunk and store it in the final tensor
# cos_sim[i:i + chunk_size] = F.cosine_similarity(batch1_chunk_exp, batch2_exp, dim=-1)
# Compute cosine similarity by matrix multiplication and normalizing
sim_chunk = torch.mm(batch1_chunk, batch2.T) / (batch1_chunk.norm(dim=1, keepdim=True) * batch2_norms.T + 1e-8)
# Store the results in the appropriate part of the final tensor
cos_sim[i:i + chunk_size] = sim_chunk
return cos_sim