import re
import os

import networkx as nx
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# py_vncorenlp wraps the Java VnCoreNLP toolkit: it needs a Java runtime and a
# downloaded model, so treat it as an optional dependency and degrade to a
# regex-based sentence splitter when it cannot be imported.
try:
    import py_vncorenlp
    _VNCORENLP_AVAILABLE = True
except Exception:
    _VNCORENLP_AVAILABLE = False

DEFAULT_STOPWORDS_PATH = os.path.join("data", "Vietnamese-stopwords.txt")


def load_stopwords(path=None):
    """Read one stopword per line from `path`; return None if the file is missing or unreadable."""
    p = path or DEFAULT_STOPWORDS_PATH
    if os.path.exists(p):
        try:
            with open(p, "r", encoding="utf-8") as f:
                return [line.strip().lower() for line in f if line.strip()]
        except Exception:
            return None
    return None


def clean_text_basic(text):
    """Strip common Vietnamese news boilerplate before sentence splitting."""
    if not text:
        return ""
    # Drop whole lines containing photo/video/source credits ("Ảnh: ...", "Nguồn: ...").
    text = re.sub(r'^.*(Ảnh|Video|Clip|Nguồn)\s*:[^(\n)]*.*$', '', text, flags=re.MULTILINE | re.IGNORECASE)
    # Drop inline credits such as "(Ảnh: ...)" or "[Video: ...]".
    text = re.sub(r'\(?\[?(Ảnh|Video|Clip)\s*:[^)]+[\)\]]?', '', text, flags=re.IGNORECASE)
    # Drop a trailing all-caps author byline on its own line.
    text = re.sub(r'\n\s*[A-ZÁÀẢÃẠÂĂÊÔƠƯĐ\s\.]{2,50}$', '', text.strip(), flags=re.MULTILINE)
    # Collapse runs of spaces and tabs.
    text = re.sub(r'[ \t]+', ' ', text)
    return text.strip()


def split_sentences_fallback(text):
    """Regex sentence splitter used when VnCoreNLP is unavailable."""
    sentences = re.split(r'(?<=[.!?])\s+', text)
    # Keep only fragments long enough to be real sentences.
    return [s.strip() for s in sentences if len(s.strip()) > 15]


class ExtractiveSummarizer:
    """TextRank-style extractive summarizer for Vietnamese text."""

    def __init__(self, stopwords_path=None):
        self.stopwords = load_stopwords(stopwords_path)

        # Word segmentation matters for Vietnamese TF-IDF: multi-syllable words
        # such as "Hà Nội" become single tokens ("Hà_Nội"). Only the "wseg"
        # annotator is needed here; VnCoreNLP loads its model from the current
        # directory by default (fetch it once with py_vncorenlp.download_model).
        self.rdrsegmenter = None
        if _VNCORENLP_AVAILABLE:
            try:
                self.rdrsegmenter = py_vncorenlp.VnCoreNLP(annotators=["wseg"])
            except Exception:
                self.rdrsegmenter = None

    def process_text_vncorenlp(self, text):
        """Clean `text` and split it into sentences.

        Returns a list of dicts holding each sentence in two forms: 'raw'
        (human-readable) and 'segmented' (underscore-joined compound words,
        used for TF-IDF).
        """
        text = clean_text_basic(text)
        paragraphs = [p.strip() for p in text.replace('\r\n', '\n').replace('\r', '\n').split('\n') if p.strip()]

        processed_sentences = []
        if self.rdrsegmenter:
            try:
                for p in paragraphs:
                    # word_segment() returns one string per sentence, with
                    # compound words joined by underscores.
                    sentences = self.rdrsegmenter.word_segment(p)
                    for s in sentences:
                        raw_sent = s.replace("_", " ")
                        # Skip fragments that are too short or contain no letters.
                        if len(raw_sent) > 15 and any(c.isalpha() for c in raw_sent):
                            processed_sentences.append({'raw': raw_sent, 'segmented': s})
            except Exception:
                processed_sentences = []

        if not processed_sentences:
            # VnCoreNLP is missing or failed: fall back to the regex splitter
            # and use the unsegmented sentence for both forms.
            raw_sents = split_sentences_fallback(text)
            processed_sentences = [{'raw': s, 'segmented': s} for s in raw_sents]

        return processed_sentences

    def summarize(self, text, top_n=3):
        """Return the `top_n` highest-ranked sentences, joined in document order."""
        if not text or not text.strip():
            return ""
        sent_data = self.process_text_vncorenlp(text)
        if not sent_data:
            return ""
        if len(sent_data) <= top_n:
            # Nothing to rank: the document is already short enough.
            return " ".join([s['raw'] for s in sent_data])

        # Build a TF-IDF matrix over the segmented sentences, turn pairwise
        # cosine similarities into a graph, and rank sentences with PageRank.
        corpus = [item['segmented'] for item in sent_data]
        try:
            vectorizer = TfidfVectorizer(stop_words=self.stopwords)
            tfidf_matrix = vectorizer.fit_transform(corpus)
        except Exception:
            # Vectorization can fail on degenerate input (e.g. stopwords only);
            # fall back to the lead sentence.
            return sent_data[0]['raw']

        similarity_matrix = cosine_similarity(tfidf_matrix, tfidf_matrix)
        nx_graph = nx.from_numpy_array(similarity_matrix)
        try:
            scores = nx.pagerank(nx_graph, max_iter=500)
        except Exception:
            # PageRank may fail to converge; fall back to uniform scores.
            scores = {i: 1 for i in range(len(sent_data))}

        # Take the top_n sentences by score, then restore document order so the
        # summary reads naturally.
        ranked_sentences = sorted(((scores[i], sent_data[i]['raw'], i)
                                   for i in range(len(sent_data))), reverse=True)
        selected_sentences = ranked_sentences[:top_n]
        selected_sentences.sort(key=lambda x: x[2])
        summary = " ".join([s for _, s, _ in selected_sentences])
        return summary
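

# A minimal usage sketch; the sample text below is illustrative only. Without a
# local VnCoreNLP model or stopword file, this exercises the regex fallback
# splitter and the default (stopword-free) TF-IDF path.
if __name__ == "__main__":
    summarizer = ExtractiveSummarizer()
    sample = (
        "Hà Nội là thủ đô của Việt Nam và là trung tâm chính trị của cả nước. "
        "Thành phố có lịch sử hơn một nghìn năm với nhiều di tích nổi tiếng. "
        "Kinh tế Hà Nội tăng trưởng nhanh trong những năm gần đây. "
        "Du lịch đóng góp một phần quan trọng vào ngân sách của thành phố."
    )
    print(summarizer.summarize(sample, top_n=2))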