# Provenance (hosting-page residue, kept as a comment so the file stays valid
# Python): uploaded by HIEHEU — "Update py/abstractive.py", commit 41e35d1 (verified).
# --- START OF FILE abstractive.py ---
import os
import re
import pickle
import unicodedata
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModel
# -------------------------------------------------------------------
# CONFIG
# -------------------------------------------------------------------
CONFIG = {
    # Decoder hyperparameters — presumably must match the Colab training run
    # whose weights are loaded below (TODO confirm against the export script).
    "NUM_LAYERS": 2,
    "MAX_VOCAB_OUT": 20000,    # output-vocabulary size (softmax width)
    "MAX_TEXT_LEN": 256,       # max PhoBERT input length, used in summarize()
    "MAX_SUMMARY_LEN": 50,     # positional-embedding size and beam-search length cap
    "EMBED_DIM": 512,
    "NUM_HEADS": 4,
    "FF_DIM": 2048,
    "DROPOUT": 0.2,
    "TOKENIZER_FILE": "decoder_tokenizer_re.pkl",
    # NOTE: this is the new weights file produced in Step 1 (Colab export).
    "WEIGHTS_FILE": "decoder_only.weights.h5"
}
# -------------------------------------------------------------------
# CLEAN TEXT (Cố gắng khớp logic với Colab nhất có thể)
# -------------------------------------------------------------------
def clean_text_inference(text: str) -> str:
    """Normalize raw text the same way the Colab training pipeline did.

    Steps: NFC-normalize, strip HTML-like tags, keep only letters, digits,
    whitespace, '/' and '-', lowercase, then collapse whitespace runs.
    Falsy input (None, "") yields "".
    """
    if not text:
        return ""
    normalized = unicodedata.normalize("NFC", str(text))
    # Remove anything that looks like a markup tag.
    untagged = re.sub(r"<.*?>", " ", normalized)
    # Keep the same basic character set the Colab notebook kept.
    kept = []
    for ch in untagged:
        if ch.isalpha() or ch.isdigit() or ch.isspace() or ch in "/-":
            kept.append(ch)
        else:
            kept.append(" ")
    lowered = "".join(kept).lower()
    return re.sub(r"\s+", " ", lowered).strip()
# -------------------------------------------------------------------
# PHOBERT ENCODER (PyTorch) - [ĐÃ SỬA ĐỂ KHỚP DATA TYPE]
# -------------------------------------------------------------------
class PhoBERTEncoderTorch:
    """CPU-only wrapper around the pretrained vinai/phobert-base encoder."""

    def __init__(self):
        print(">>> Loading PhoBERT (PyTorch)...")
        self.device = torch.device("cpu")
        self.model = AutoModel.from_pretrained("vinai/phobert-base").to(self.device)
        self.model.eval()
        print(">>> PhoBERT loaded successfully.")

    def encode(self, input_ids, attention_mask):
        """Run the encoder and return its last hidden state as float32 numpy.

        Inputs are cast to int64 ("long") because the model expects integer
        index tensors on CPU; the output is forced to float32 so the
        downstream TensorFlow/Keras decoder receives the dtype it expects.
        """
        ids = torch.tensor(input_ids, dtype=torch.long).to(self.device)
        mask = torch.tensor(attention_mask, dtype=torch.long).to(self.device)
        with torch.no_grad():
            hidden = self.model(ids, attention_mask=mask).last_hidden_state
        return hidden.detach().cpu().numpy().astype(np.float32)
# -------------------------------------------------------------------
# SAFE IMPORT TENSORFLOW
# -------------------------------------------------------------------
# Import TensorFlow defensively: the summarizer reports a clear error later
# (in AbstractiveSummarizer.__init__) instead of crashing at import time.
try:
    import tensorflow as tf
except Exception as import_err:
    TF_AVAILABLE = False
    _TF_ERR = import_err
    tf = None
else:
    TF_AVAILABLE = True
# -------------------------------------------------------------------
# DECODER TRANSFORMER (TensorFlow)
# -------------------------------------------------------------------
if TF_AVAILABLE:
    class TransformerDecoderBlock(tf.keras.layers.Layer):
        """Transformer decoder block: causal self-attention, cross-attention
        over the encoder output, then a position-wise feed-forward network,
        each sub-layer followed by dropout, a residual add and LayerNorm.
        """
        def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
            super().__init__(**kwargs)
            # att1: masked self-attention over the decoder's own tokens.
            self.att1 = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
            # att2: cross-attention — decoder queries attend to encoder output.
            self.att2 = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
            self.ffn = tf.keras.Sequential([
                tf.keras.layers.Dense(ff_dim, activation="relu"),
                tf.keras.layers.Dense(embed_dim),
            ])
            self.ln1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
            self.ln2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
            self.ln3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
            self.drop1 = tf.keras.layers.Dropout(rate)
            self.drop2 = tf.keras.layers.Dropout(rate)
            self.drop3 = tf.keras.layers.Dropout(rate)

        def call(self, x, enc_output, training=None):
            # use_causal_mask=True requires Keras 3 (TensorFlow > 2.16).
            attn1 = self.att1(x, x, use_causal_mask=True)
            out1 = self.ln1(x + self.drop1(attn1, training=training))
            attn2 = self.att2(out1, enc_output)
            out2 = self.ln2(out1 + self.drop2(attn2, training=training))
            ffn_out = self.ffn(out2)
            return self.ln3(out2 + self.drop3(ffn_out, training=training))
# -------------------------------------------------------------------
# BUILD DECODER MODEL
# -------------------------------------------------------------------
def build_inference_model():
    """Build the Keras decoder-only model used at inference time.

    Returns a tf.keras.Model taking [enc_raw_input (batch, seq, 768),
    dec_inputs_inf (batch, cur_len)] and producing a per-position softmax
    over MAX_VOCAB_OUT tokens.

    NOTE(review): the layer names and topology appear chosen to match the
    model exported from Colab so load_weights() can map the saved weights —
    do not rename layers without re-exporting.
    """
    # 1. Inputs
    # enc_raw_input: receives the 768-dim output of the PyTorch PhoBERT encoder.
    enc_raw_input = tf.keras.Input(shape=(None, 768), name='enc_raw_input')
    dec_inputs_inf = tf.keras.Input(shape=(None,), dtype=tf.int32, name='dec_inputs_inf')
    # 2. Projection layer: map 768-dim PhoBERT features to the decoder width.
    enc_out = tf.keras.layers.Dense(CONFIG["EMBED_DIM"], activation="linear", name="encoder_projection")(enc_raw_input)
    enc_out = tf.keras.layers.Dropout(CONFIG["DROPOUT"], name="encoder_dropout")(enc_out)
    # 3. Embeddings: token embedding (mask_zero for padding) + learned positions.
    dec_token_emb = tf.keras.layers.Embedding(CONFIG["MAX_VOCAB_OUT"], CONFIG["EMBED_DIM"], mask_zero=True, name='dec_token_emb')
    dec_pos_emb = tf.keras.layers.Embedding(CONFIG["MAX_SUMMARY_LEN"], CONFIG["EMBED_DIM"], name='dec_pos_emb')

    def add_pos_emb_inf(x):
        # Token embeddings plus a learned positional embedding for 0..seq_len-1.
        tokens = dec_token_emb(x)
        seq_len = tf.shape(x)[1]
        pos_idx = tf.range(seq_len)
        pos_emb = dec_pos_emb(pos_idx)
        pos_emb = tf.expand_dims(pos_emb, 0)
        return tokens + pos_emb

    # Wrapped in a Lambda layer to keep the exported model's structure.
    dec_emb_inf = tf.keras.layers.Lambda(add_pos_emb_inf, name='dec_emb_plus_pos_inf')(dec_inputs_inf)
    # 4. Decoder blocks (training=False: dropout disabled at inference).
    dec_out = dec_emb_inf
    for i in range(CONFIG["NUM_LAYERS"]):
        block = TransformerDecoderBlock(
            CONFIG["EMBED_DIM"],
            CONFIG["NUM_HEADS"],
            CONFIG["FF_DIM"],
            CONFIG["DROPOUT"],
            name=f"decoder_block_{i}"
        )
        dec_out = block(dec_out, enc_out, training=False)
    # 5. Output projection to vocabulary probabilities.
    outputs_inf = tf.keras.layers.Dense(CONFIG["MAX_VOCAB_OUT"], activation='softmax', name='output_dense')(dec_out)
    model = tf.keras.Model(inputs=[enc_raw_input, dec_inputs_inf], outputs=outputs_inf, name="inference_decoder_export")
    return model
# -------------------------------------------------------------------
# MAIN SUMMARIZER
# -------------------------------------------------------------------
class AbstractiveSummarizer:
    """End-to-end abstractive summarizer.

    Pipeline: clean text -> PhoBERT (PyTorch) encoder -> Keras Transformer
    decoder -> beam-search generation -> detokenized summary string.
    """

    def __init__(self, model_dir="./models"):
        if not TF_AVAILABLE:
            raise RuntimeError(f"TensorFlow không khả dụng: {_TF_ERR}")
        self.model_dir = model_dir
        # Encoder side lives in PyTorch; decoder side in TensorFlow.
        self.phobert = PhoBERTEncoderTorch()
        self.phobert_tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base")
        self._load_resources()

    def _load_resources(self):
        """Load the output tokenizer (pickle) and the exported decoder weights."""
        tok_path = os.path.join(self.model_dir, CONFIG["TOKENIZER_FILE"])
        weights_path = os.path.join(self.model_dir, CONFIG["WEIGHTS_FILE"])
        print(f"📥 Loading tokenizer from {tok_path}...")
        # NOTE(review): pickle.load is only safe on trusted files.
        with open(tok_path, "rb") as f:
            self.tokenizer = pickle.load(f)
        print("⚙️ Building Inference Model...")
        self.decoder_model = build_inference_model()
        print(f"📥 Loading weights from {weights_path}...")
        try:
            self.decoder_model.load_weights(weights_path)
            print("✅ Weights loaded successfully!")
        except Exception as e:
            # Best-effort: report the failure but keep the (unweighted) model.
            print(f"❌ Error loading weights: {e}")
            print("Vui lòng đảm bảo bạn đã chạy bước Export trên Colab và tải file 'decoder_only.weights.h5' về.")

    def beam_search(self, enc_out, k=3):
        """Beam-search decode against `enc_out` (the encoder features).

        Returns the highest-scoring token-id sequence, including the
        'startseq'/'endseq' marker tokens.
        """
        start_token = self.tokenizer.word_index.get('startseq', 1)
        end_token = self.tokenizer.word_index.get('endseq', 2)
        # Each beam is (cumulative log-probability, token-id list).
        beams = [(0.0, [start_token])]
        for _ in range(CONFIG["MAX_SUMMARY_LEN"] - 1):
            expanded = []
            for score, seq in beams:
                # Finished hypotheses are carried over unchanged.
                if seq[-1] == end_token:
                    expanded.append((score, seq))
                    continue
                step_input = np.array([seq])  # shape (1, cur_len)
                # Model inputs: [enc_out (1, seq, 768), step_input (1, cur_len)].
                probs = self.decoder_model.predict([enc_out, step_input], verbose=0)[0, -1, :]
                # Expand with the k most probable next tokens, best first.
                for idx in np.argsort(probs)[-k:][::-1]:
                    # Accumulate log-probabilities to avoid underflow.
                    next_score = score + float(np.log(probs[idx] + 1e-12))
                    expanded.append((next_score, seq + [int(idx)]))
            # Keep only the k best hypotheses.
            beams = sorted(expanded, key=lambda c: c[0], reverse=True)[:k]
            # Stop early once every surviving beam has emitted the end token.
            if all(seq[-1] == end_token for _, seq in beams):
                break
        return beams[0][1]

    def summarize(self, text, k=3):
        """Summarize `text` and return the generated summary string."""
        cleaned = clean_text_inference(text)
        # Tokenize for PhoBERT (padded/truncated to MAX_TEXT_LEN, numpy tensors).
        encoded = self.phobert_tokenizer(
            [cleaned],
            max_length=CONFIG["MAX_TEXT_LEN"],
            truncation=True, padding='max_length',
            return_tensors='np'
        )
        # PyTorch encoder output: (1, seq, 768) float32 features.
        enc_out = self.phobert.encode(encoded['input_ids'], encoded['attention_mask'])
        best_seq = self.beam_search(enc_out, k=k)
        decoded_text = self.tokenizer.sequences_to_texts([best_seq])[0]
        return decoded_text.replace('startseq', '').replace('endseq', '').strip()