"""Inference pipeline for abstractive summarization: a frozen PhoBERT encoder
(PyTorch) produces contextual embeddings that a TensorFlow/Keras Transformer
decoder turns into a summary with beam search."""

import os
import re
import pickle
import unicodedata

import numpy as np
import torch
from transformers import AutoTokenizer, AutoModel

# Hyperparameters and artifact file names shared by the tokenizer and decoder.
CONFIG = {
    "NUM_LAYERS": 2,            # number of Transformer decoder blocks
    "MAX_VOCAB_OUT": 20000,     # output vocabulary size
    "MAX_TEXT_LEN": 256,        # maximum input length fed to PhoBERT
    "MAX_SUMMARY_LEN": 50,      # maximum number of generated summary tokens
    "EMBED_DIM": 512,
    "NUM_HEADS": 4,
    "FF_DIM": 2048,
    "DROPOUT": 0.2,
    "TOKENIZER_FILE": "decoder_tokenizer_re.pkl",
    "WEIGHTS_FILE": "decoder_only.weights.h5",
}


def clean_text_inference(text: str) -> str:
    """Normalize input text: NFC-normalize, strip HTML tags, keep only letters,
    digits, spaces, '/' and '-', lowercase, and collapse whitespace."""
    if not text:
        return ""
    text = unicodedata.normalize("NFC", str(text))
    text = re.sub(r"<.*?>", " ", text)
    text = "".join(
        ch if (ch.isalpha() or ch.isspace() or ch.isdigit() or ch in ['/', '-']) else " "
        for ch in text
    )
    text = text.lower()
    text = re.sub(r"\s+", " ", text).strip()
    return text


class PhoBERTEncoderTorch:
    """Frozen PhoBERT-base encoder run with PyTorch on CPU."""

    def __init__(self):
        print(">>> Loading PhoBERT (PyTorch)...")
        self.device = torch.device("cpu")
        self.model = AutoModel.from_pretrained("vinai/phobert-base").to(self.device)
        self.model.eval()
        print(">>> PhoBERT loaded successfully.")

    def encode(self, input_ids, attention_mask):
        """Return the last hidden states as a float32 array of shape (batch, seq_len, 768)."""
        with torch.no_grad():
            ids = torch.tensor(input_ids, dtype=torch.long).to(self.device)
            mask = torch.tensor(attention_mask, dtype=torch.long).to(self.device)
            outputs = self.model(ids, attention_mask=mask)
        return outputs.last_hidden_state.detach().cpu().numpy().astype(np.float32)


# TensorFlow is optional at import time: record availability here and let
# AbstractiveSummarizer raise with the original import error if it is missing.
TF_AVAILABLE = True
try:
    import tensorflow as tf
except Exception as e:
    TF_AVAILABLE = False
    _TF_ERR = e
    tf = None


if TF_AVAILABLE:

    class TransformerDecoderBlock(tf.keras.layers.Layer):
        """Transformer decoder block: causal self-attention, cross-attention over the
        encoder output, and a feed-forward network, each with dropout, a residual
        connection and layer normalization."""

        def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
            super().__init__(**kwargs)
            self.att1 = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
            self.att2 = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
            self.ffn = tf.keras.Sequential([
                tf.keras.layers.Dense(ff_dim, activation="relu"),
                tf.keras.layers.Dense(embed_dim),
            ])
            self.ln1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
            self.ln2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
            self.ln3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
            self.drop1 = tf.keras.layers.Dropout(rate)
            self.drop2 = tf.keras.layers.Dropout(rate)
            self.drop3 = tf.keras.layers.Dropout(rate)

        def call(self, x, enc_output, training=None):
            # Masked self-attention over the generated prefix.
            attn1 = self.att1(x, x, use_causal_mask=True)
            out1 = self.ln1(x + self.drop1(attn1, training=training))
            # Cross-attention: queries from the decoder, keys/values from the encoder.
            attn2 = self.att2(out1, enc_output)
            out2 = self.ln2(out1 + self.drop2(attn2, training=training))
            ffn_out = self.ffn(out2)
            return self.ln3(out2 + self.drop3(ffn_out, training=training))


def build_inference_model():
    """Rebuild the decoder-only Keras model used for inference.

    Inputs: raw PhoBERT hidden states of shape (batch, src_len, 768) and the decoder
    token ids generated so far; output: a softmax over the output vocabulary at
    every decoder position.
    """
    enc_raw_input = tf.keras.Input(shape=(None, 768), name='enc_raw_input')
    dec_inputs_inf = tf.keras.Input(shape=(None,), dtype=tf.int32, name='dec_inputs_inf')

    # Project the 768-dim PhoBERT states to the decoder's embedding size.
    enc_out = tf.keras.layers.Dense(CONFIG["EMBED_DIM"], activation="linear", name="encoder_projection")(enc_raw_input)
    enc_out = tf.keras.layers.Dropout(CONFIG["DROPOUT"], name="encoder_dropout")(enc_out)

    # Token embeddings plus learned positional embeddings for the decoder input.
    dec_token_emb = tf.keras.layers.Embedding(CONFIG["MAX_VOCAB_OUT"], CONFIG["EMBED_DIM"], mask_zero=True, name='dec_token_emb')
    dec_pos_emb = tf.keras.layers.Embedding(CONFIG["MAX_SUMMARY_LEN"], CONFIG["EMBED_DIM"], name='dec_pos_emb')

    def add_pos_emb_inf(x):
        tokens = dec_token_emb(x)
        seq_len = tf.shape(x)[1]
        pos_idx = tf.range(seq_len)
        pos_emb = dec_pos_emb(pos_idx)
        pos_emb = tf.expand_dims(pos_emb, 0)
        return tokens + pos_emb

    dec_emb_inf = tf.keras.layers.Lambda(add_pos_emb_inf, name='dec_emb_plus_pos_inf')(dec_inputs_inf)

    # Stack of decoder blocks attending to the projected encoder output.
    dec_out = dec_emb_inf
    for i in range(CONFIG["NUM_LAYERS"]):
        block = TransformerDecoderBlock(
            CONFIG["EMBED_DIM"],
            CONFIG["NUM_HEADS"],
            CONFIG["FF_DIM"],
            CONFIG["DROPOUT"],
            name=f"decoder_block_{i}"
        )
        dec_out = block(dec_out, enc_out, training=False)

    outputs_inf = tf.keras.layers.Dense(CONFIG["MAX_VOCAB_OUT"], activation='softmax', name='output_dense')(dec_out)

    model = tf.keras.Model(inputs=[enc_raw_input, dec_inputs_inf], outputs=outputs_inf, name="inference_decoder_export")
    return model


class AbstractiveSummarizer:
    """End-to-end inference pipeline: clean text, encode with PhoBERT, decode with beam search."""

    def __init__(self, model_dir="./models"):
        if not TF_AVAILABLE:
            raise RuntimeError(f"TensorFlow is not available: {_TF_ERR}")

        self.model_dir = model_dir

        # PhoBERT encoder and its tokenizer (PyTorch side).
        self.phobert = PhoBERTEncoderTorch()
        self.phobert_tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base")

        self._load_resources()

    def _load_resources(self):
        """Load the output tokenizer, rebuild the decoder graph and load its weights."""
        tok_path = os.path.join(self.model_dir, CONFIG["TOKENIZER_FILE"])
        weights_path = os.path.join(self.model_dir, CONFIG["WEIGHTS_FILE"])

        print(f"📥 Loading tokenizer from {tok_path}...")
        with open(tok_path, "rb") as f:
            self.tokenizer = pickle.load(f)

        print("⚙️ Building Inference Model...")
        self.decoder_model = build_inference_model()

        print(f"📥 Loading weights from {weights_path}...")
        try:
            self.decoder_model.load_weights(weights_path)
            print("✅ Weights loaded successfully!")
        except Exception as e:
            print(f"❌ Error loading weights: {e}")
            print("Please make sure you have run the Export step on Colab and downloaded 'decoder_only.weights.h5'.")

    def beam_search(self, enc_out, k=3):
        """Beam search over the decoder; returns the highest-scoring token id sequence."""
        start_token = self.tokenizer.word_index.get('startseq', 1)
        end_token = self.tokenizer.word_index.get('endseq', 2)

        # Each beam is a (cumulative log-probability, token sequence) pair.
        sequences = [(0.0, [start_token])]

        for _ in range(CONFIG["MAX_SUMMARY_LEN"] - 1):
            all_candidates = []
            for score, seq in sequences:
                # Finished beams are carried over unchanged.
                if seq[-1] == end_token:
                    all_candidates.append((score, seq))
                    continue

                dec_inp = np.array([seq])
                preds = self.decoder_model.predict([enc_out, dec_inp], verbose=0)

                # Probability distribution for the next token (last position).
                last_token_probs = preds[0, -1, :]

                # Expand this beam with its k most probable continuations.
                top_idx = np.argsort(last_token_probs)[-k:][::-1]
                for idx in top_idx:
                    candidate_seq = seq + [int(idx)]
                    candidate_score = score + float(np.log(last_token_probs[idx] + 1e-12))
                    all_candidates.append((candidate_score, candidate_seq))

            # Keep only the k best beams.
            sequences = sorted(all_candidates, key=lambda x: x[0], reverse=True)[:k]

            # Stop early once every beam has produced the end token.
            if all(s[-1] == end_token for _, s in sequences):
                break

        return sequences[0][1]

    def summarize(self, text, k=3):
        """Generate an abstractive summary for a single input text."""
        text_clean = clean_text_inference(text)

        # Tokenize for PhoBERT with fixed-length padding/truncation.
        inp = self.phobert_tokenizer(
            [text_clean],
            max_length=CONFIG["MAX_TEXT_LEN"],
            truncation=True, padding='max_length',
            return_tensors='np'
        )

        # Contextual embeddings from the frozen PhoBERT encoder.
        enc_out = self.phobert.encode(inp['input_ids'], inp['attention_mask'])

        # Decode with beam search, then map token ids back to words.
        seq = self.beam_search(enc_out, k=k)

        decoded_text = self.tokenizer.sequences_to_texts([seq])[0]
        summary = decoded_text.replace('startseq', '').replace('endseq', '').strip()

        return summary
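

# Minimal usage sketch. Assumptions: the tokenizer pickle and weights file named in
# CONFIG have been exported (e.g. from the Colab training run) into ./models, and the
# sample text below is a placeholder to replace with a real document.
if __name__ == "__main__":
    summarizer = AbstractiveSummarizer(model_dir="./models")
    sample_text = "..."  # replace with the text to summarize
    print(summarizer.summarize(sample_text, k=3))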