-
Notifications
You must be signed in to change notification settings - Fork 0
/
siamese_model.py.bak
137 lines (111 loc) · 5.29 KB
/
siamese_model.py.bak
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import os
import json
import torch
from torch import nn
from transformers.models.t5.modeling_t5 import T5Stack, T5PreTrainedModel
from transformers import T5ForConditionalGeneration, T5Tokenizer, T5Config
from transformers.utils.model_parallel_utils import assert_device_map, get_device_map
class T5Siamese(T5PreTrainedModel):
def __init__(self, encoder_left=None, encoder_right=None, embeddings=None, config=None):
super(T5Siamese, self).__init__(config)
self.encoder_left = encoder_left
self.encoder_right = encoder_right
self.embeddings = embeddings
self.cos = nn.CosineSimilarity(dim=1)
self.config = config
@staticmethod
def init_from_base_t5_model(model_name_or_path='t5-base', output_root='./'):
MODEL = model_name_or_path
MODEL_OUTPUT = output_root
EMBEDDINGS_OUTPUT_FILE = os.path.join(MODEL_OUTPUT, 'embedding.pth')
os.makedirs(MODEL_OUTPUT, exist_ok=True)
tokenizer = T5Tokenizer.from_pretrained(MODEL)
model = T5ForConditionalGeneration.from_pretrained(MODEL)
left_dir = os.path.join(MODEL_OUTPUT, 'left')
right_dir = os.path.join(MODEL_OUTPUT, 'right')
os.makedirs(left_dir, exist_ok=True)
os.makedirs(right_dir, exist_ok=True)
torch.save(model.encoder.embed_tokens.state_dict(), EMBEDDINGS_OUTPUT_FILE)
model.encoder.save_pretrained(left_dir)
model.encoder.save_pretrained(right_dir)
tokenizer.save_pretrained(MODEL_OUTPUT)
model.config.save_pretrained(MODEL_OUTPUT)
@staticmethod
def from_pretrained(model_path, with_tokenizer=False):
MODEL_OUTPUT = model_path
EMBEDDINGS_OUTPUT_FILE = os.path.join(MODEL_OUTPUT, 'embedding.pth')
left_dir = os.path.join(MODEL_OUTPUT, 'left')
right_dir = os.path.join(MODEL_OUTPUT, 'right')
config_left = json.load(open(os.path.join(left_dir, 'config.json')))
embedding = nn.Embedding(config_left['vocab_size'], config_left['d_model'])
embedding.load_state_dict(torch.load(EMBEDDINGS_OUTPUT_FILE))
encoder_left = T5Stack.from_pretrained(left_dir, embed_tokens=embedding)
encoder_right = T5Stack.from_pretrained(right_dir, embed_tokens=embedding)
config = T5Config.from_pretrained(MODEL_OUTPUT)
if with_tokenizer:
tokenizer = T5Tokenizer.from_pretrained(MODEL_OUTPUT)
return tokenizer, T5Siamese(encoder_left=encoder_left,
encoder_right=encoder_right,
embeddings=embedding,
config=config)
return T5Siamese(encoder_left=encoder_left,
encoder_right=encoder_right,
embeddings=embedding)
def save_pretrained(self, model_path):
MODEL_OUTPUT = model_path
EMBEDDINGS_OUTPUT_FILE = os.path.join(MODEL_OUTPUT, 'embedding.pth')
left_dir = os.path.join(MODEL_OUTPUT, 'left')
right_dir = os.path.join(MODEL_OUTPUT, 'right')
os.makedirs(left_dir, exist_ok=True)
os.makedirs(right_dir, exist_ok=True)
torch.save(self.embeddings.state_dict(), EMBEDDINGS_OUTPUT_FILE)
self.encoder_left.save_pretrained(left_dir)
self.encoder_right.save_pretrained(right_dir)
def parallelize(self, device_map=None):
self.device_map = (
get_device_map(len(self.encoder_left.block), range(torch.cuda.device_count()))
if device_map is None
else device_map
)
assert_device_map(self.device_map, len(self.encoder_left.block))
self.encoder_left.parallelize(self.device_map)
self.encoder_right.parallelize(self.device_map)
self.model_parallel = True
def deparallelize(self):
self.encoder_left.deparallelize()
self.encoder_right.deparallelize()
self.encoder_left = self.encoder.to("cpu")
self.encoder_right = self.encoder.to("cpu")
self.model_parallel = False
self.device_map = None
torch.cuda.empty_cache()
def forward(self,
input_left_ids,
input_right_ids,
attention_mask_left,
attention_mask_right,
labels=None):
if input_left_ids.shape[0] != input_right_ids.shape[0]:
raise Exception('In cosine similarity you should pass equal size batches')
output_left = self.encoder_left(
input_ids=input_left_ids,
attention_mask=attention_mask_left,
return_dict=True
)
with torch.no_grad():
output_right = self.encoder_right(
input_ids=input_right_ids,
attention_mask=attention_mask_right,
return_dict=True
)
# get the final hidden states
emb_left = output_left.last_hidden_state # * batch_left_tensor["attention_mask"]
emb_right = output_right.last_hidden_state # * batch_right_tensor["attention_mask"]
emb_left = torch.mean(emb_left, dim=1)
emb_right = torch.mean(emb_right, dim=1)
cos = self.cos(emb_left, emb_right)
if labels is not None:
cross_entropy_loss = nn.BCEWithLogitsLoss()
loss = cross_entropy_loss(cos, labels)
return loss, cos
return cos