# --- START OF FILE abstractive.py ---
import os
import re
import pickle
import unicodedata

import numpy as np
import torch
from transformers import AutoTokenizer, AutoModel

# -------------------------------------------------------------------
# CONFIG
# -------------------------------------------------------------------
CONFIG = {
    "NUM_LAYERS": 2,
    "MAX_VOCAB_OUT": 20000,
    "MAX_TEXT_LEN": 256,
    "MAX_SUMMARY_LEN": 50,
    "EMBED_DIM": 512,
    "NUM_HEADS": 4,
    "FF_DIM": 2048,
    "DROPOUT": 0.2,
    "TOKENIZER_FILE": "decoder_tokenizer_re.pkl",
    # NOTE: this is the new weights file produced in Step 1
    "WEIGHTS_FILE": "decoder_only.weights.h5"
}

# -------------------------------------------------------------------
# CLEAN TEXT (kept as close to the Colab training logic as possible)
# -------------------------------------------------------------------
def clean_text_inference(text: str) -> str:
    if not text:
        return ""
    text = unicodedata.normalize("NFC", str(text))
    text = re.sub(r"<.*?>", " ", text)
    # Keep the same basic character set as the Colab preprocessing
    text = "".join(
        ch if (ch.isalpha() or ch.isspace() or ch.isdigit() or ch in ['/', '-']) else " "
        for ch in text
    )
    text = text.lower()
    text = re.sub(r"\s+", " ", text).strip()
    return text

# -------------------------------------------------------------------
# PHOBERT ENCODER (PyTorch) - [FIXED TO MATCH DATA TYPES]
# -------------------------------------------------------------------
class PhoBERTEncoderTorch:
    def __init__(self):
        print(">>> Loading PhoBERT (PyTorch)...")
        self.device = torch.device("cpu")
        self.model = AutoModel.from_pretrained("vinai/phobert-base").to(self.device)
        self.model.eval()
        print(">>> PhoBERT loaded successfully.")

    def encode(self, input_ids, attention_mask):
        with torch.no_grad():
            # [FIX 1] Cast inputs to long (int64): PyTorch embedding lookups
            # require integer index tensors, so this avoids CPU dtype errors.
            ids = torch.tensor(input_ids, dtype=torch.long).to(self.device)
            mask = torch.tensor(attention_mask, dtype=torch.long).to(self.device)
            outputs = self.model(ids, attention_mask=mask)
            # [FIX 2] Most important: cast the output to float32, the dtype
            # the TensorFlow/Keras 3 decoder expects. Feeding it any other
            # dtype makes the model compute garbage.
            return outputs.last_hidden_state.detach().cpu().numpy().astype(np.float32)
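# -------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): a quick sanity
# check that the encoder emits float32 tensors of shape
# (batch, MAX_TEXT_LEN, 768) before they are handed to the TF decoder:
#
#     encoder = PhoBERTEncoderTorch()
#     tok = AutoTokenizer.from_pretrained("vinai/phobert-base")
#     batch = tok(["xin chào"], max_length=CONFIG["MAX_TEXT_LEN"],
#                 truncation=True, padding="max_length", return_tensors="np")
#     enc = encoder.encode(batch["input_ids"], batch["attention_mask"])
#     assert enc.dtype == np.float32
#     assert enc.shape == (1, CONFIG["MAX_TEXT_LEN"], 768)
# -------------------------------------------------------------------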
# -------------------------------------------------------------------
# SAFE IMPORT TENSORFLOW
# -------------------------------------------------------------------
TF_AVAILABLE = True
try:
    import tensorflow as tf
except Exception as e:
    TF_AVAILABLE = False
    _TF_ERR = e
    tf = None

# -------------------------------------------------------------------
# DECODER TRANSFORMER (TensorFlow)
# -------------------------------------------------------------------
if TF_AVAILABLE:
    class TransformerDecoderBlock(tf.keras.layers.Layer):
        def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
            super().__init__(**kwargs)
            self.att1 = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
            self.att2 = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
            self.ffn = tf.keras.Sequential([
                tf.keras.layers.Dense(ff_dim, activation="relu"),
                tf.keras.layers.Dense(embed_dim),
            ])
            self.ln1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
            self.ln2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
            self.ln3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
            self.drop1 = tf.keras.layers.Dropout(rate)
            self.drop2 = tf.keras.layers.Dropout(rate)
            self.drop3 = tf.keras.layers.Dropout(rate)

        def call(self, x, enc_output, training=None):
            # Uses use_causal_mask=True, which requires Keras 3 (TensorFlow >= 2.16)
            attn1 = self.att1(x, x, use_causal_mask=True)
            out1 = self.ln1(x + self.drop1(attn1, training=training))
            attn2 = self.att2(out1, enc_output)
            out2 = self.ln2(out1 + self.drop2(attn2, training=training))
            ffn_out = self.ffn(out2)
            return self.ln3(out2 + self.drop3(ffn_out, training=training))
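# Note (sketch, not from the original code): on Keras builds where
# MultiHeadAttention lacks the `use_causal_mask` argument, an explicit
# lower-triangular mask inside call() gives the same behaviour:
#
#     seq_len = tf.shape(x)[1]
#     causal = tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
#     attn1 = self.att1(x, x, attention_mask=tf.cast(causal, tf.bool)[tf.newaxis, ...])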
# -------------------------------------------------------------------
# BUILD DECODER MODEL
# -------------------------------------------------------------------
def build_inference_model():
    # 1. Inputs
    # enc_raw_input receives the 768-dim output of the PyTorch PhoBERT encoder
    enc_raw_input = tf.keras.Input(shape=(None, 768), name='enc_raw_input')
    dec_inputs_inf = tf.keras.Input(shape=(None,), dtype=tf.int32, name='dec_inputs_inf')

    # 2. Projection layer
    enc_out = tf.keras.layers.Dense(CONFIG["EMBED_DIM"], activation="linear",
                                    name="encoder_projection")(enc_raw_input)
    enc_out = tf.keras.layers.Dropout(CONFIG["DROPOUT"], name="encoder_dropout")(enc_out)

    # 3. Embeddings
    dec_token_emb = tf.keras.layers.Embedding(CONFIG["MAX_VOCAB_OUT"], CONFIG["EMBED_DIM"],
                                              mask_zero=True, name='dec_token_emb')
    dec_pos_emb = tf.keras.layers.Embedding(CONFIG["MAX_SUMMARY_LEN"], CONFIG["EMBED_DIM"],
                                            name='dec_pos_emb')

    def add_pos_emb_inf(x):
        tokens = dec_token_emb(x)
        seq_len = tf.shape(x)[1]
        pos_idx = tf.range(seq_len)
        pos_emb = dec_pos_emb(pos_idx)
        pos_emb = tf.expand_dims(pos_emb, 0)
        return tokens + pos_emb

    # Wrap in a Lambda so the layer structure matches the exported weights
    dec_emb_inf = tf.keras.layers.Lambda(add_pos_emb_inf, name='dec_emb_plus_pos_inf')(dec_inputs_inf)

    # 4. Decoder blocks
    dec_out = dec_emb_inf
    for i in range(CONFIG["NUM_LAYERS"]):
        block = TransformerDecoderBlock(
            CONFIG["EMBED_DIM"], CONFIG["NUM_HEADS"], CONFIG["FF_DIM"], CONFIG["DROPOUT"],
            name=f"decoder_block_{i}"
        )
        dec_out = block(dec_out, enc_out, training=False)

    # 5. Output
    outputs_inf = tf.keras.layers.Dense(CONFIG["MAX_VOCAB_OUT"], activation='softmax',
                                        name='output_dense')(dec_out)

    model = tf.keras.Model(inputs=[enc_raw_input, dec_inputs_inf],
                           outputs=outputs_inf, name="inference_decoder_export")
    return model

# -------------------------------------------------------------------
# MAIN SUMMARIZER
# -------------------------------------------------------------------
class AbstractiveSummarizer:
    def __init__(self, model_dir="./models"):
        if not TF_AVAILABLE:
            raise RuntimeError(f"TensorFlow is not available: {_TF_ERR}")
        self.model_dir = model_dir

        # Load PhoBERT (PyTorch)
        self.phobert = PhoBERTEncoderTorch()
        self.phobert_tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base")

        self._load_resources()

    def _load_resources(self):
        tok_path = os.path.join(self.model_dir, CONFIG["TOKENIZER_FILE"])
        weights_path = os.path.join(self.model_dir, CONFIG["WEIGHTS_FILE"])

        print(f"📥 Loading tokenizer from {tok_path}...")
        with open(tok_path, "rb") as f:
            self.tokenizer = pickle.load(f)

        print("⚙️ Building Inference Model...")
        self.decoder_model = build_inference_model()

        print(f"📥 Loading weights from {weights_path}...")
        try:
            # Load weights.
            self.decoder_model.load_weights(weights_path)
            print("✅ Weights loaded successfully!")
        except Exception as e:
            print(f"❌ Error loading weights: {e}")
            print("Please make sure you ran the export step on Colab and downloaded 'decoder_only.weights.h5'.")

    def beam_search(self, enc_out, k=3):
        start_token = self.tokenizer.word_index.get('startseq', 1)
        end_token = self.tokenizer.word_index.get('endseq', 2)

        # Initial hypothesis: (score, [start_token])
        sequences = [(0.0, [start_token])]

        for _ in range(CONFIG["MAX_SUMMARY_LEN"] - 1):
            all_candidates = []
            for score, seq in sequences:
                if seq[-1] == end_token:
                    all_candidates.append((score, seq))
                    continue

                dec_inp = np.array([seq])  # (1, cur_len)
                # Predict with [enc_out (1, seq, 768), dec_inp (1, cur_len)]
                preds = self.decoder_model.predict([enc_out, dec_inp], verbose=0)

                # Distribution over the vocabulary at the last position
                last_token_probs = preds[0, -1, :]

                # Top-k candidate tokens
                top_idx = np.argsort(last_token_probs)[-k:][::-1]

                for idx in top_idx:
                    candidate_seq = seq + [int(idx)]
                    # Accumulate log-probabilities to avoid underflow
                    candidate_score = score + float(np.log(last_token_probs[idx] + 1e-12))
                    all_candidates.append((candidate_score, candidate_seq))

            # Keep the k best hypotheses
            sequences = sorted(all_candidates, key=lambda x: x[0], reverse=True)[:k]

            # Stop early once every hypothesis has ended
            if all(s[-1] == end_token for _, s in sequences):
                break

        return sequences[0][1]

    def summarize(self, text, k=3):
        # 1. Clean the input text
        text_clean = clean_text_inference(text)

        # 2. Tokenize for PhoBERT
        inp = self.phobert_tokenizer(
            [text_clean],
            max_length=CONFIG["MAX_TEXT_LEN"],
            truncation=True,
            padding='max_length',
            return_tensors='np'
        )

        # 3. Encode with PyTorch (768-dim vectors, already cast to float32)
        enc_out = self.phobert.encode(inp['input_ids'], inp['attention_mask'])

        # 4. Generate the summary with the TF decoder
        seq = self.beam_search(enc_out, k=k)

        # 5. Decode the result
        decoded_text = self.tokenizer.sequences_to_texts([seq])[0]
        summary = decoded_text.replace('startseq', '').replace('endseq', '').strip()
        return summary
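# -------------------------------------------------------------------
# Example usage (illustrative sketch, not part of the original file).
# Assumes ./models already contains decoder_tokenizer_re.pkl and
# decoder_only.weights.h5 exported from the Colab notebook; the sample
# article below is made up.
# -------------------------------------------------------------------
if __name__ == "__main__":
    summarizer = AbstractiveSummarizer(model_dir="./models")
    sample_article = (
        "Sáng nay, hàng nghìn người dân đã đổ về trung tâm thành phố "
        "để tham dự lễ hội hoa xuân thường niên."
    )
    print(summarizer.summarize(sample_article, k=3))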