|
|
""" |
|
|
Document Scraper - Handles PDF and document processing |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import json |
|
|
import logging |
|
|
import os |
|
|
import hashlib |
|
|
import tempfile |
|
|
import requests |
|
|
import urllib3 |
|
|
from datetime import datetime |
|
|
from typing import List, Dict, Any, Union
|
|
from urllib.parse import urlparse, urlunparse, unquote |
|
|
|
|
|
|
|
|
from scraper_common import ( |
|
|
WEBSITE_CONFIG, MAX_PDF_LIMIT, MAX_ARTICLE_LIMIT, MAX_PAGE_LIMIT, |
|
|
ensure_archive_directory, convert_to_absolute_url, |
|
|
set_scraping_cancelled, scraping_cancelled, force_close_browser, |
|
|
reset_global_pdf_count, increment_global_pdf_count, get_global_pdf_count, is_pdf_limit_reached, |
|
|
get_pdf_websites |
|
|
) |
|
|
|
|
|
|
|
|
from date_filter import is_date_in_range, parse_date_input, standardize_date |
|
|
|
|
|
|
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) |
|
|
|
|
|
|
|
|
logging.basicConfig( |
|
|
level=logging.INFO, |
|
|
format='%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s' |
|
|
) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
def construct_navigation_url(base_url: str, nav_addition: str) -> str: |
|
|
""" |
|
|
Construct navigation URL by properly handling trailing slashes and query parameters |
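
    Examples (illustrative; URLs are hypothetical):
        >>> construct_navigation_url("https://example.org/reports/", "?page=2")
        'https://example.org/reports?page=2'
        >>> construct_navigation_url("https://example.org/reports", "page/3")
        'https://example.org/reports/page/3'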
|
|
""" |
|
|
|
|
|
    base_url = base_url.rstrip('/')
|
|
|
|
|
|
|
|
if nav_addition.startswith('/'): |
|
|
|
|
|
return base_url + nav_addition |
|
|
elif nav_addition.startswith('?'): |
|
|
|
|
|
return base_url + nav_addition |
|
|
else: |
|
|
|
|
|
return base_url + '/' + nav_addition |
|
|
|
|
|
|
|
|
mopnd_article_dates = {} |
|
|
mopnd_article_titles = {} |
|
|
|
|
|
def clear_mopnd_cache(): |
|
|
"""Clear MOPND article cache when starting a new scraping session""" |
|
|
global mopnd_article_dates, mopnd_article_titles |
|
|
mopnd_article_dates.clear() |
|
|
mopnd_article_titles.clear() |
|
|
logger.info("π§Ή Cleared MOPND article cache") |
|
|
|
|
|
def get_pdf_hash(pdf_url: str) -> str: |
|
|
"""Generate a hash for the PDF URL to use as cache key""" |
|
|
return hashlib.md5(pdf_url.encode()).hexdigest() |
|
|
|
|
|
def is_pdf_archived(pdf_url: str, source: str) -> bool: |
|
|
"""Check if PDF is already archived""" |
|
|
ensure_archive_directory() |
|
|
hash_key = get_pdf_hash(pdf_url) |
|
|
archive_dir = f"archive/{source}" |
|
|
date_folder = datetime.now().strftime("%Y-%m-%d") |
|
|
archive_path = f"{archive_dir}/{date_folder}" |
|
|
|
|
|
if os.path.exists(archive_path): |
|
|
for file in os.listdir(archive_path): |
|
|
if file.startswith(hash_key): |
|
|
return True |
|
|
return False |
|
|
|
|
|
def get_archived_pdf_path(pdf_url: str, source: str) -> str: |
|
|
"""Get the archived PDF file path""" |
|
|
ensure_archive_directory() |
|
|
hash_key = get_pdf_hash(pdf_url) |
|
|
archive_dir = f"archive/{source}" |
|
|
date_folder = datetime.now().strftime("%Y-%m-%d") |
|
|
archive_path = f"{archive_dir}/{date_folder}" |
|
|
|
|
|
if os.path.exists(archive_path): |
|
|
for file in os.listdir(archive_path): |
|
|
if file.startswith(hash_key): |
|
|
return os.path.join(archive_path, file) |
|
|
return None |
|
|
|
|
|
def archive_pdf(pdf_url: str, content: bytes, source: str) -> str: |
|
|
"""Archive PDF content and return the local file path""" |
|
|
logger.info(f"πΎ Starting PDF archiving process...") |
|
|
ensure_archive_directory() |
|
|
|
|
|
|
|
|
archive_dir = f"archive/{source}" |
|
|
date_folder = datetime.now().strftime("%Y-%m-%d") |
|
|
archive_path = f"{archive_dir}/{date_folder}" |
|
|
|
|
|
|
|
|
os.makedirs(archive_path, exist_ok=True) |
|
|
|
|
|
|
|
|
hash_key = get_pdf_hash(pdf_url) |
|
|
filename = f"{hash_key}.pdf" |
|
|
file_path = os.path.join(archive_path, filename) |
|
|
|
|
|
|
|
|
with open(file_path, 'wb') as f: |
|
|
f.write(content) |
|
|
|
|
|
logger.info(f"π PDF archived to: {file_path}") |
|
|
|
|
|
|
|
|
update_archive_index(pdf_url, file_path, source) |
|
|
|
|
|
return file_path |
|
|
|
|
|
def archive_file(file_url: str, content: bytes, source: str, file_extension: str = "csv") -> str: |
|
|
"""Archive file content (CSV, etc.) and return the local file path""" |
|
|
logger.info(f"πΎ Starting file archiving process for {file_extension.upper()}...") |
|
|
ensure_archive_directory() |
|
|
|
|
|
|
|
|
archive_dir = f"archive/{source}" |
|
|
date_folder = datetime.now().strftime("%Y-%m-%d") |
|
|
archive_path = f"{archive_dir}/{date_folder}" |
|
|
|
|
|
|
|
|
os.makedirs(archive_path, exist_ok=True) |
|
|
|
|
|
|
|
|
hash_key = get_pdf_hash(file_url) |
|
|
filename = f"{hash_key}.{file_extension}" |
|
|
file_path = os.path.join(archive_path, filename) |
|
|
|
|
|
|
|
|
with open(file_path, 'wb') as f: |
|
|
f.write(content) |
|
|
|
|
|
logger.info(f"π File archived to: {file_path}") |
|
|
|
|
|
|
|
|
update_archive_index(file_url, file_path, source) |
|
|
|
|
|
return file_path |
|
|
|
|
|
def update_archive_index(pdf_url: str, local_path: str, source: str): |
|
|
"""Update the archive index with PDF information""" |
|
|
ensure_archive_directory() |
|
|
index_file = f"archive/{source}/index.json" |
|
|
|
|
|
|
|
|
if os.path.exists(index_file): |
|
|
try: |
|
|
with open(index_file, 'r') as f: |
|
|
index = json.load(f) |
|
|
        except (json.JSONDecodeError, OSError):
|
|
index = {} |
|
|
else: |
|
|
index = {} |
|
|
|
|
|
|
|
|
hash_key = get_pdf_hash(pdf_url) |
|
|
index[hash_key] = { |
|
|
"url": pdf_url, |
|
|
"local_path": local_path, |
|
|
"source": source, |
|
|
"archived_date": datetime.now().isoformat() |
|
|
} |
|
|
|
|
|
|
|
|
with open(index_file, 'w') as f: |
|
|
json.dump(index, f, indent=2) |
|
|
|
|
|
def download_and_save_pdf(pdf_url: str, source: str = "unknown") -> dict: |
|
|
""" |
|
|
Download PDF and save it to archive, return metadata |
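
    Illustrative usage (URL and source are hypothetical; performs network I/O):
        result = download_and_save_pdf("https://example.org/report.pdf", source="reliefweb")
        # result -> {"success": True,
        #            "path": "archive/reliefweb/<date>/<md5>.pdf",
        #            "size": 123456,
        #            "message": "PDF downloaded and archived successfully"}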
|
|
""" |
|
|
try: |
|
|
logger.info(f"β¬οΈ Downloading PDF: {pdf_url}") |
|
|
logger.info(f"π Source: {source}") |
|
|
|
|
|
|
|
|
if is_pdf_archived(pdf_url, source): |
|
|
logger.info(f"β
PDF already archived: {pdf_url}") |
|
|
cached_path = get_archived_pdf_path(pdf_url, source) |
|
|
return { |
|
|
"success": True, |
|
|
"path": cached_path, |
|
|
"size": os.path.getsize(cached_path), |
|
|
"message": "PDF already archived" |
|
|
} |
|
|
|
|
|
|
|
|
parsed_url = urlparse(pdf_url) |
|
|
base_domain = f"{parsed_url.scheme}://{parsed_url.netloc}" |
|
|
|
|
|
headers = { |
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", |
|
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8", |
|
|
"Accept-Language": "en-US,en;q=0.9", |
|
|
"Connection": "keep-alive", |
|
|
"Referer": base_domain |
|
|
} |
|
|
|
|
|
logger.info(f"π Using base domain as referer: {base_domain}") |
|
|
|
|
|
|
|
|
try: |
|
|
session = requests.Session() |
|
|
|
|
|
session.verify = False |
|
|
|
|
|
|
|
|
session.get(base_domain, headers=headers, timeout=30, verify=False) |
|
|
logger.info(f"πͺ Visited domain homepage to gather cookies") |
|
|
|
|
|
|
|
|
response = session.get(pdf_url, headers=headers, timeout=30, verify=False) |
|
|
response.raise_for_status() |
|
|
logger.info(f"β
PDF downloaded successfully. Size: {len(response.content)} bytes") |
|
|
except Exception as e: |
|
|
logger.error(f"β Error downloading PDF: {str(e)}") |
|
|
raise |
|
|
|
|
|
|
|
|
archived_path = archive_pdf(pdf_url, response.content, source) |
|
|
logger.info(f"π PDF archived to: {archived_path}") |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"path": archived_path, |
|
|
"size": len(response.content), |
|
|
"message": "PDF downloaded and archived successfully" |
|
|
} |
|
|
except Exception as e: |
|
|
|
|
|
logger.error(f"β PDF download failed for {pdf_url}: {str(e)}") |
|
|
return { |
|
|
"success": False, |
|
|
"path": None, |
|
|
"size": 0, |
|
|
"message": f"Error downloading PDF: {str(e)}" |
|
|
} |
|
|
|
|
|
def download_and_save_file(file_url: str, source: str = "unknown", file_type: str = "csv") -> dict: |
|
|
""" |
|
|
Download file (CSV, etc.) and save it to archive, return metadata |
|
|
""" |
|
|
try: |
|
|
logger.info(f"β¬οΈ Downloading {file_type.upper()}: {file_url}") |
|
|
logger.info(f"π Source: {source}") |
|
|
|
|
|
|
|
|
file_extension = file_type.lower() |
|
|
if file_extension not in ["csv", "xlsx", "xls", "png", "jpg", "jpeg", "gif", "webp"]: |
|
|
|
|
|
if file_url.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')): |
|
|
file_extension = file_url.lower().split('.')[-1] |
|
|
else: |
|
|
file_extension = "csv" |
|
|
|
|
|
|
|
|
if is_pdf_archived(file_url, source): |
|
|
logger.info(f"β
File already archived: {file_url}") |
|
|
cached_path = get_archived_pdf_path(file_url, source) |
|
|
|
|
|
if cached_path and os.path.exists(cached_path): |
|
|
return { |
|
|
"success": True, |
|
|
"path": cached_path, |
|
|
"size": os.path.getsize(cached_path), |
|
|
"file_type": file_type, |
|
|
"message": "File already archived" |
|
|
} |
|
|
|
|
|
|
|
|
parsed_url = urlparse(file_url) |
|
|
base_domain = f"{parsed_url.scheme}://{parsed_url.netloc}" |
|
|
|
|
|
headers = { |
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", |
|
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8", |
|
|
"Accept-Language": "en-US,en;q=0.9", |
|
|
"Connection": "keep-alive", |
|
|
"Referer": base_domain |
|
|
} |
|
|
|
|
|
logger.info(f"π Using base domain as referer: {base_domain}") |
|
|
|
|
|
|
|
|
try: |
|
|
session = requests.Session() |
|
|
|
|
|
session.verify = False |
|
|
|
|
|
|
|
|
session.get(base_domain, headers=headers, timeout=30, verify=False) |
|
|
logger.info(f"πͺ Visited domain homepage to gather cookies") |
|
|
|
|
|
|
|
|
response = session.get(file_url, headers=headers, timeout=30, verify=False) |
|
|
response.raise_for_status() |
|
|
logger.info(f"β
{file_type.upper()} downloaded successfully. Size: {len(response.content)} bytes") |
|
|
except Exception as e: |
|
|
logger.error(f"β Error downloading {file_type.upper()}: {str(e)}") |
|
|
raise |
|
|
|
|
|
|
|
|
archived_path = archive_file(file_url, response.content, source, file_extension) |
|
|
logger.info(f"π {file_type.upper()} archived to: {archived_path}") |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"path": archived_path, |
|
|
"size": len(response.content), |
|
|
"file_type": file_type, |
|
|
"message": f"{file_type.upper()} downloaded and archived successfully" |
|
|
} |
|
|
except Exception as e: |
|
|
|
|
|
logger.error(f"β {file_type.upper()} download failed for {file_url}: {str(e)}") |
|
|
return { |
|
|
"success": False, |
|
|
"path": None, |
|
|
"size": 0, |
|
|
"file_type": file_type, |
|
|
"message": f"Error downloading {file_type.upper()}: {str(e)}" |
|
|
} |
|
|
|
|
|
def get_website_type_from_source(source: str) -> str: |
|
|
""" |
|
|
Map source name to website type for config lookup |
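
    Example (from the mapping below): "NBS Somalia" maps to "nbs"; unrecognized
    sources fall back to "fscluster".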
|
|
""" |
|
|
source_to_type = { |
|
|
"FS Cluster": "fscluster", |
|
|
"ReliefWeb": "reliefweb", |
|
|
"NBS Somalia": "nbs", |
|
|
"HDX": "hdx", |
|
|
"HDX Humanitarian Data Exchange": "hdx", |
|
|
"LogCluster": "logcluster", |
|
|
"FSNau": "fsnau", |
|
|
"FSNau - Food Security and Nutrition Analysis Unit": "fsnau", |
|
|
"FSNau Publications": "fsnau_publications", |
|
|
"FEWS NET": "fews", |
|
|
"FEWS NET - Famine Early Warning Systems Network": "fews", |
|
|
"ICPAC": "icpac", |
|
|
"ICPAC - IGAD Climate Prediction and Applications Centre": "icpac", |
|
|
"ICPAC - IGAD Climate Prediction and Applications Centre - Seasonal Forecast": "icpac_seasonal_forecast", |
|
|
"FAO SWALIM": "faoswalim", |
|
|
"FAO SWALIM Publications": "faoswalim_publications", |
|
|
"FAO SWALIM Journals": "faoswalim_journals", |
|
|
"FAO SWALIM Events": "faoswalim_events", |
|
|
"FAO SWALIM Articles": "faoswalim_articles", |
|
|
"FAO SWALIM Flood Watch": "faoswalim_flood_watch", |
|
|
"FAO SWALIM Water Publications": "faoswalim_water_publications", |
|
|
"MOPND Somaliland": "mopnd", |
|
|
"Copernicus Drought Observatory": "copernicus_drought", |
|
|
"fscluster": "fscluster", |
|
|
"reliefweb": "reliefweb", |
|
|
"NBS": "nbs", |
|
|
"HDX": "hdx", |
|
|
"LogCluster": "logcluster", |
|
|
"FSNau": "fsnau", |
|
|
"FSNau Publications": "fsnau_publications", |
|
|
"FEWS NET": "fews", |
|
|
"ICPAC": "icpac", |
|
|
"FAO SWALIM": "faoswalim" |
|
|
} |
|
|
return source_to_type.get(source, "fscluster") |
|
|
|
|
|
|
|
|
def extract_pdf_text(pdf_url: str, source: str = "unknown") -> str: |
|
|
""" |
|
|
Extract text content from archived PDF using multiple methods |
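
    Illustrative usage (URL is hypothetical):
        text = extract_pdf_text("/documents/report.pdf", source="FS Cluster")
        # A relative URL is resolved against the configured base_url, the PDF is
        # downloaded (or reused from the archive), and its extracted text is returned.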
|
|
""" |
|
|
try: |
|
|
logger.info(f"π Starting PDF text extraction for URL: {pdf_url}") |
|
|
logger.info(f"π Source: {source}") |
|
|
|
|
|
|
|
|
parsed_url = urlparse(pdf_url) |
|
|
|
|
|
|
|
|
if not parsed_url.scheme and pdf_url.startswith('/'): |
|
|
|
|
|
website_type = get_website_type_from_source(source) |
|
|
config = WEBSITE_CONFIG.get(website_type, {}) |
|
|
base_url = config.get('base_url', 'https://fscluster.org') |
|
|
|
|
|
logger.info(f"π Using base_url from config for {website_type}: {base_url}") |
|
|
|
|
|
|
|
|
complete_url = f"{base_url}{pdf_url}" |
|
|
logger.info(f"π Converted relative URL {pdf_url} to absolute URL: {complete_url}") |
|
|
pdf_url = complete_url |
|
|
|
|
|
|
|
|
if is_pdf_archived(pdf_url, source): |
|
|
cached_path = get_archived_pdf_path(pdf_url, source) |
|
|
logger.info(f"π Using archived PDF: {cached_path}") |
|
|
result = extract_text_from_pdf_file(cached_path) |
|
|
logger.info(f"π Extracted text length: {len(result)} characters") |
|
|
|
|
|
if not result.strip(): |
|
|
logger.warning("β οΈ No text extracted from PDF - might be image-based or corrupted") |
|
|
else: |
|
|
logger.info(f"β
Successfully extracted text from PDF") |
|
|
|
|
|
return result |
|
|
else: |
|
|
|
|
|
logger.info(f"β PDF not found in archive: {pdf_url}") |
|
|
logger.info(f"β¬οΈ Attempting to download PDF now...") |
|
|
|
|
|
|
|
|
download_result = download_and_save_pdf(pdf_url, source) |
|
|
if download_result["success"]: |
|
|
logger.info(f"β
Successfully downloaded PDF: {download_result['path']}") |
|
|
|
|
|
result = extract_text_from_pdf_file(download_result["path"]) |
|
|
return result |
|
|
else: |
|
|
logger.error(f"β Failed to download PDF: {download_result['message']}") |
|
|
|
|
|
|
|
|
if source.lower() == "fscluster" and "403" in download_result["message"]: |
|
|
return f"PDF download blocked by fscluster.org (403 Forbidden). Try visiting the document page first in your browser before scraping, or use authenticated session cookies: {pdf_url}" |
|
|
else: |
|
|
return f"PDF not found in archive and download failed: {pdf_url}" |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"β Error extracting PDF text from {pdf_url}: {str(e)}") |
|
|
return f"Error extracting PDF: {str(e)}" |
|
|
|
|
|
def extract_text_from_pdf_file(pdf_file_or_path): |
|
|
""" |
|
|
Extract text from PDF using multiple methods for better compatibility |
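
    Extraction order (each backend is tried until one yields text): pypdf,
    pdfplumber, PyMuPDF (fitz), then OCR via pytesseract + pdf2image as a last
    resort when fewer than 500 characters were recovered.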
|
|
""" |
|
|
text_content = "" |
|
|
|
|
|
try: |
|
|
logger.info(f"π Starting PDF text extraction...") |
|
|
|
|
|
|
|
|
try: |
|
|
logger.info(f"π Trying pypdf extraction...") |
|
|
import pypdf |
|
|
|
|
|
if isinstance(pdf_file_or_path, str): |
|
|
|
|
|
logger.info(f"π Reading from file path: {pdf_file_or_path}") |
|
|
with open(pdf_file_or_path, 'rb') as file: |
|
|
pdf_reader = pypdf.PdfReader(file) |
|
|
logger.info(f"π PDF has {len(pdf_reader.pages)} pages") |
|
|
for i, page in enumerate(pdf_reader.pages): |
|
|
page_text = page.extract_text() |
|
|
if page_text: |
|
|
text_content += page_text + "\n" |
|
|
else: |
|
|
|
|
|
logger.info(f"π Reading from BytesIO object") |
|
|
pdf_reader = pypdf.PdfReader(pdf_file_or_path) |
|
|
logger.info(f"π PDF has {len(pdf_reader.pages)} pages") |
|
|
for i, page in enumerate(pdf_reader.pages): |
|
|
page_text = page.extract_text() |
|
|
if page_text: |
|
|
text_content += page_text + "\n" |
|
|
|
|
|
if text_content.strip(): |
|
|
logger.info(f"β
Successfully extracted text using pypdf: {len(text_content)} characters") |
|
|
return text_content.strip() |
|
|
else: |
|
|
logger.warning("β οΈ pypdf extracted no text") |
|
|
except Exception as e: |
|
|
logger.warning(f"β οΈ pypdf extraction failed: {str(e)}") |
|
|
|
|
|
|
|
|
try: |
|
|
logger.info(f"π Trying pdfplumber extraction...") |
|
|
import pdfplumber |
|
|
|
|
|
if isinstance(pdf_file_or_path, str): |
|
|
with pdfplumber.open(pdf_file_or_path) as pdf: |
|
|
logger.info(f"π PDF has {len(pdf.pages)} pages") |
|
|
for i, page in enumerate(pdf.pages): |
|
|
page_text = page.extract_text() |
|
|
if page_text: |
|
|
text_content += page_text + "\n" |
|
|
else: |
|
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file: |
|
|
temp_file.write(pdf_file_or_path.getvalue()) |
|
|
temp_file.flush() |
|
|
|
|
|
with pdfplumber.open(temp_file.name) as pdf: |
|
|
logger.info(f"π PDF has {len(pdf.pages)} pages") |
|
|
for i, page in enumerate(pdf.pages): |
|
|
page_text = page.extract_text() |
|
|
if page_text: |
|
|
text_content += page_text + "\n" |
|
|
|
|
|
|
|
|
os.unlink(temp_file.name) |
|
|
logger.info(f"ποΈ Temp file cleaned up") |
|
|
|
|
|
if text_content.strip(): |
|
|
logger.info(f"β
Successfully extracted text using pdfplumber: {len(text_content)} characters") |
|
|
return text_content.strip() |
|
|
else: |
|
|
logger.warning("β οΈ pdfplumber extracted no text") |
|
|
except ImportError: |
|
|
logger.warning("β οΈ pdfplumber not available") |
|
|
except Exception as e: |
|
|
logger.warning(f"β οΈ pdfplumber extraction failed: {str(e)}") |
|
|
|
|
|
|
|
|
try: |
|
|
logger.info(f"π Trying PyMuPDF extraction...") |
|
|
import fitz |
|
|
|
|
|
if isinstance(pdf_file_or_path, str): |
|
|
doc = fitz.open(pdf_file_or_path) |
|
|
else: |
|
|
doc = fitz.open(stream=pdf_file_or_path.getvalue(), filetype="pdf") |
|
|
|
|
|
logger.info(f"π PDF has {doc.page_count} pages") |
|
|
for page_num in range(doc.page_count): |
|
|
page = doc.load_page(page_num) |
|
|
page_text = page.get_text() |
|
|
if page_text: |
|
|
text_content += page_text + "\n" |
|
|
|
|
|
doc.close() |
|
|
|
|
|
if text_content.strip(): |
|
|
logger.info(f"β
Successfully extracted text using PyMuPDF: {len(text_content)} characters") |
|
|
return text_content.strip() |
|
|
else: |
|
|
logger.warning("β οΈ PyMuPDF extracted no text") |
|
|
except ImportError: |
|
|
logger.warning("β οΈ PyMuPDF not available") |
|
|
except Exception as e: |
|
|
logger.warning(f"β οΈ PyMuPDF extraction failed: {str(e)}") |
|
|
|
|
|
|
|
|
|
|
|
if not text_content.strip() or len(text_content.strip()) < 500: |
|
|
try: |
|
|
logger.info(f"π Trying OCR extraction as last resort...") |
|
|
import pytesseract |
|
|
from PIL import Image |
|
|
from pdf2image import convert_from_path, convert_from_bytes |
|
|
|
|
|
if isinstance(pdf_file_or_path, str): |
|
|
|
|
|
images = convert_from_path(pdf_file_or_path, dpi=300) |
|
|
else: |
|
|
|
|
|
images = convert_from_bytes(pdf_file_or_path.getvalue(), dpi=300) |
|
|
|
|
|
logger.info(f"πΌοΈ Converted PDF to {len(images)} images for OCR") |
|
|
|
|
|
for i, image in enumerate(images): |
|
|
|
|
|
page_text = pytesseract.image_to_string(image, lang='eng') |
|
|
if page_text.strip(): |
|
|
text_content += f"Page {i+1} (OCR):\n{page_text}\n" |
|
|
logger.info(f"π OCR extracted {len(page_text)} characters from page {i+1}") |
|
|
|
|
|
if text_content.strip(): |
|
|
logger.info(f"β
Successfully extracted text using OCR: {len(text_content)} characters") |
|
|
return text_content.strip() |
|
|
else: |
|
|
logger.warning("β οΈ OCR extracted no text") |
|
|
except ImportError: |
|
|
logger.warning("β οΈ OCR libraries not available (pytesseract, pdf2image)") |
|
|
except Exception as e: |
|
|
logger.warning(f"β OCR extraction failed: {str(e)}") |
|
|
|
|
|
|
|
|
if text_content.strip(): |
|
|
logger.info(f"β οΈ Returning partial text extraction ({len(text_content.strip())} characters)") |
|
|
return text_content.strip() |
|
|
|
|
|
|
|
|
logger.warning("β All PDF extraction methods failed") |
|
|
return "PDF text extraction failed - document may be image-based or corrupted" |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"β Error in PDF text extraction: {str(e)}") |
|
|
return f"PDF text extraction failed: {str(e)}" |
|
|
|
|
|
async def download_all_pdfs_from_page(page, url: str, config: dict, source: str, start_date: str = None, end_date: str = None) -> List[dict]: |
|
|
""" |
|
|
Download all PDFs from multiple pages with pagination support |
|
|
Supports both approaches: |
|
|
1. Direct PDF discovery (pdf_links only) |
|
|
2. Page links first, then PDF discovery (page_links + pdf_links) |
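
    Illustrative config shapes (keys match the lookups performed below; the
    selector values are hypothetical):
        Approach 1: {"pdf_links": "a[href$='.pdf']",
                     "navigation_selector": "a.next",
                     "navigation_url_addition": "?page={page_no}",
                     "start_page": 1}
        Approach 2: {"page_links": "h3.title a", "pdf_links": "a.download",
                     "title": "h1", "date": ".posted-on"}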
|
|
""" |
|
|
try: |
|
|
logger.info(f"π Starting PDF download from page: {url}") |
|
|
logger.info(f"π Source: {source}") |
|
|
|
|
|
|
|
|
if source == "mopnd": |
|
|
clear_mopnd_cache() |
|
|
|
|
|
|
|
|
reset_global_pdf_count() |
|
|
logger.info(f"π Reset global PDF counter. Limit: {MAX_PDF_LIMIT}") |
|
|
|
|
|
|
|
|
extract_table_as_csv = config.get("extract_table_as_csv", False) |
|
|
if extract_table_as_csv: |
|
|
logger.info("π Using table extraction mode: Extract table data and convert to CSV") |
|
|
return await extract_table_as_csv_file(page, url, config, source, start_date, end_date) |
|
|
|
|
|
|
|
|
page_links_selector = config.get("page_links") |
|
|
pdf_links_selector = config.get("pdf_links") |
|
|
file_links_selector = config.get("file_links") |
|
|
|
|
|
|
|
|
logger.debug(f"π Config check for source '{source}': page_links={page_links_selector}, pdf_links={pdf_links_selector}, file_links={file_links_selector}") |
|
|
|
|
|
|
|
|
|
|
|
if page_links_selector and pdf_links_selector: |
|
|
|
|
|
logger.info("π Using Approach 2: Page links first, then PDF discovery") |
|
|
return await download_pdfs_via_page_links(page, url, config, source, start_date, end_date) |
|
|
elif page_links_selector and file_links_selector: |
|
|
|
|
|
logger.info("π Using Approach 2: Page links first, then file discovery") |
|
|
return await download_pdfs_via_page_links(page, url, config, source, start_date, end_date) |
|
|
elif pdf_links_selector or file_links_selector: |
|
|
|
|
|
logger.info("π Using Approach 1: Direct PDF/file discovery") |
|
|
return await download_pdfs_direct(page, url, config, source, start_date, end_date) |
|
|
else: |
|
|
logger.error("β No pdf_links, file_links, or page_links configured") |
|
|
return [] |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"β Error downloading PDFs from pages: {str(e)}") |
|
|
return [] |
|
|
|
|
|
|
|
|
async def extract_table_as_csv_file(page, url: str, config: dict, source: str, start_date: str = None, end_date: str = None) -> List[dict]: |
|
|
""" |
|
|
Special function to extract table data and convert to CSV |
|
|
""" |
|
|
try: |
|
|
logger.info(f"π Starting table extraction from page: {url}") |
|
|
logger.info(f"π Source: {source}") |
|
|
|
|
|
|
|
|
await page.goto(url, wait_until="domcontentloaded", timeout=30000) |
|
|
|
|
|
|
|
|
content_selector = config.get("content") |
|
|
if not content_selector: |
|
|
logger.error("β No content selector configured for table extraction") |
|
|
return [] |
|
|
|
|
|
logger.info(f"π Extracting table data using selector: {content_selector}") |
|
|
|
|
|
|
|
|
cell_elements = await page.query_selector_all(content_selector) |
|
|
logger.info(f"π Found {len(cell_elements)} table cells") |
|
|
|
|
|
if not cell_elements: |
|
|
logger.warning("β οΈ No table cells found") |
|
|
return [] |
|
|
|
|
|
|
|
|
cells_data = [] |
|
|
for element in cell_elements: |
|
|
try: |
|
|
cell_text = await element.text_content() |
|
|
if cell_text: |
|
|
cells_data.append(cell_text.strip()) |
|
|
else: |
|
|
cells_data.append("") |
|
|
except Exception as e: |
|
|
logger.debug(f"β οΈ Error extracting cell text: {str(e)}") |
|
|
cells_data.append("") |
|
|
|
|
|
|
|
|
|
|
|
table_rows = [] |
|
|
try: |
|
|
|
|
|
row_elements = await page.query_selector_all("tr") |
|
|
if row_elements: |
|
|
logger.info(f"π Found {len(row_elements)} table rows") |
|
|
for row_element in row_elements: |
|
|
row_cells = await row_element.query_selector_all("td, th") |
|
|
row_data = [] |
|
|
for cell in row_cells: |
|
|
try: |
|
|
cell_text = await cell.text_content() |
|
|
row_data.append(cell_text.strip() if cell_text else "") |
|
|
except: |
|
|
row_data.append("") |
|
|
if row_data: |
|
|
table_rows.append(row_data) |
|
|
except Exception as e: |
|
|
logger.warning(f"β οΈ Could not extract table rows: {str(e)}") |
|
|
|
|
|
|
|
|
        # Fall back to the flat cell list as a single CSV row only when no
        # structured rows were recovered above
        if not table_rows and cells_data:
            table_rows = [cells_data]
|
|
|
|
|
if not table_rows: |
|
|
logger.warning("β οΈ No table rows extracted") |
|
|
return [] |
|
|
|
|
|
|
|
|
import csv |
|
|
import io |
|
|
|
|
|
csv_buffer = io.StringIO() |
|
|
csv_writer = csv.writer(csv_buffer) |
|
|
|
|
|
|
|
|
for row in table_rows: |
|
|
csv_writer.writerow(row) |
|
|
|
|
|
csv_content = csv_buffer.getvalue() |
|
|
csv_buffer.close() |
|
|
|
|
|
logger.info(f"π Generated CSV content: {len(csv_content)} characters, {len(table_rows)} rows") |
|
|
|
|
|
|
|
|
from datetime import datetime |
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
filename = f"river_levels_{timestamp}.csv" |
|
|
|
|
|
|
|
|
csv_bytes = csv_content.encode('utf-8') |
|
|
csv_file_path = archive_file(url, csv_bytes, source, "csv") |
|
|
|
|
|
logger.info(f"π CSV file saved to: {csv_file_path}") |
|
|
|
|
|
|
|
|
document = { |
|
|
"url": url, |
|
|
"local_path": csv_file_path, |
|
|
"size": len(csv_bytes), |
|
|
"title": f"River Levels Data - {datetime.now().strftime('%Y-%m-%d')}", |
|
|
"source": source, |
|
|
"extracted_text": f"CSV File: {filename}\nFile Path: {csv_file_path}\nTotal Rows: {len(table_rows)}\n\nPreview:\n{csv_content[:500]}...", |
|
|
"file_type": "CSV", |
|
|
"date": datetime.now().strftime("%Y-%m-%d") |
|
|
} |
|
|
|
|
|
|
|
|
increment_global_pdf_count() |
|
|
|
|
|
logger.info(f"β
Successfully extracted table data and saved as CSV") |
|
|
return [document] |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"β Error extracting table as CSV: {str(e)}") |
|
|
return [] |
|
|
|
|
|
|
|
|
async def download_pdfs_direct(page, url: str, config: dict, source: str, start_date: str = None, end_date: str = None) -> List[dict]: |
|
|
""" |
|
|
Approach 1: Direct PDF discovery on listing pages |
|
|
""" |
|
|
try: |
|
|
|
|
|
navigation_selector = config.get("navigation_selector") |
|
|
navigation_url_addition = config.get("navigation_url_addition") |
|
|
start_page = config.get("start_page", 1) |
|
|
|
|
|
all_pdfs = [] |
|
|
seen_pdf_urls = set() |
|
|
current_page = start_page |
|
|
consecutive_empty_pages = 0 |
|
|
max_consecutive_empty = 2 |
|
|
|
|
|
|
|
|
await page.goto(url, wait_until="domcontentloaded", timeout=30000) |
|
|
|
|
|
|
|
|
if navigation_selector and navigation_url_addition: |
|
|
logger.info(f"π§ Navigation configured: selector={navigation_selector}, url_addition={navigation_url_addition}") |
|
|
logger.info(f"π Starting from page: {start_page}") |
|
|
|
|
|
while True: |
|
|
logger.info(f"π Processing page {current_page}") |
|
|
|
|
|
|
|
|
if MAX_PAGE_LIMIT is not None and current_page > MAX_PAGE_LIMIT: |
|
|
logger.info(f"π Reached MAX_PAGE_LIMIT ({MAX_PAGE_LIMIT}), stopping pagination") |
|
|
break |
|
|
|
|
|
|
|
|
if current_page > start_page: |
|
|
nav_url_addition = navigation_url_addition.replace("{page_no}", str(current_page)) |
|
|
nav_url = construct_navigation_url(url, nav_url_addition) |
|
|
logger.info(f"π§ Navigating to: {nav_url}") |
|
|
await page.goto(nav_url, wait_until="domcontentloaded", timeout=30000) |
|
|
|
|
|
captcha_result = await check_and_wait_for_recaptcha(page, config) |
|
|
if captcha_result == "CAPTCHA_TIMEOUT": |
|
|
logger.error("β Captcha detected but not solved within timeout period") |
|
|
return [] |
|
|
|
|
|
|
|
|
nav_element = await page.query_selector(navigation_selector) |
|
|
if current_page == start_page and nav_element: |
|
|
logger.info("β
Navigation element found, more pages available") |
|
|
elif current_page > start_page and not nav_element: |
|
|
logger.info("π No more navigation elements found, stopping pagination") |
|
|
break |
|
|
|
|
|
|
|
|
if is_pdf_limit_reached(): |
|
|
logger.info(f"π Global PDF limit reached ({MAX_PDF_LIMIT}), stopping pagination") |
|
|
break |
|
|
|
|
|
|
|
|
page_pdfs = await extract_pdfs_from_current_page(page, config, source, start_date, end_date) |
|
|
|
|
|
if page_pdfs: |
|
|
|
|
|
new_pdfs = [] |
|
|
for pdf in page_pdfs: |
|
|
pdf_url = pdf.get("url", "") |
|
|
if pdf_url and pdf_url not in seen_pdf_urls: |
|
|
seen_pdf_urls.add(pdf_url) |
|
|
new_pdfs.append(pdf) |
|
|
|
|
|
if new_pdfs: |
|
|
all_pdfs.extend(new_pdfs) |
|
|
consecutive_empty_pages = 0 |
|
|
logger.info(f"π Found {len(new_pdfs)} new PDFs on page {current_page} (total: {len(page_pdfs)} PDFs on page)") |
|
|
else: |
|
|
consecutive_empty_pages += 1 |
|
|
logger.info(f"π No new PDFs found on page {current_page} (all {len(page_pdfs)} PDFs were duplicates)") |
|
|
|
|
|
|
|
|
if consecutive_empty_pages >= max_consecutive_empty: |
|
|
logger.info(f"π Stopping pagination: {consecutive_empty_pages} consecutive pages with no new content") |
|
|
break |
|
|
else: |
|
|
consecutive_empty_pages += 1 |
|
|
logger.info(f"π No PDFs found on page {current_page}") |
|
|
|
|
|
|
|
|
if consecutive_empty_pages >= max_consecutive_empty: |
|
|
logger.info(f"π Stopping pagination: {consecutive_empty_pages} consecutive pages with no content") |
|
|
break |
|
|
|
|
|
current_page += 1 |
|
|
|
|
|
else: |
|
|
|
|
|
logger.info("π No navigation configured - scraping single page only") |
|
|
page_pdfs = await extract_pdfs_from_current_page(page, config, source, start_date, end_date) |
|
|
all_pdfs.extend(page_pdfs) |
|
|
|
|
|
logger.info(f"π Total unique PDFs found across all pages: {len(all_pdfs)}") |
|
|
return all_pdfs |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"β Error in direct PDF discovery: {str(e)}") |
|
|
return [] |
|
|
|
|
|
|
|
|
async def download_pdfs_via_page_links(page, url: str, config: dict, source: str, start_date: str = None, end_date: str = None) -> List[dict]: |
|
|
""" |
|
|
Approach 2: Page links first, then PDF discovery |
|
|
1. Go through pagination to collect all page links |
|
|
2. Visit each individual page link |
|
|
3. Find and download PDFs from each page |
|
|
""" |
|
|
try: |
|
|
logger.info("π Starting Approach 2: Page links first, then PDF discovery") |
|
|
|
|
|
|
|
|
logger.info("π Step 1: Collecting all page links through pagination") |
|
|
all_page_links = await collect_all_page_links(page, url, config, source) |
|
|
|
|
|
if not all_page_links: |
|
|
logger.warning("β οΈ No page links found") |
|
|
return [] |
|
|
|
|
|
logger.info(f"π Collected {len(all_page_links)} page links") |
|
|
|
|
|
|
|
|
logger.info("π Step 2: Visiting individual pages to find PDFs") |
|
|
all_pdfs = [] |
|
|
seen_pdf_urls = set() |
|
|
|
|
|
for i, page_url in enumerate(all_page_links, 1): |
|
|
if scraping_cancelled(): |
|
|
logger.info("π Scraping cancelled, stopping PDF downloads") |
|
|
break |
|
|
|
|
|
|
|
|
if is_pdf_limit_reached(): |
|
|
logger.info(f"π Global PDF limit reached ({MAX_PDF_LIMIT}), stopping page processing") |
|
|
break |
|
|
|
|
|
logger.info(f"π Processing page {i}/{len(all_page_links)}: {page_url}") |
|
|
logger.info(f"π Global PDF count: {get_global_pdf_count()}/{MAX_PDF_LIMIT}") |
|
|
|
|
|
try: |
|
|
|
|
|
await page.goto(page_url, wait_until="domcontentloaded", timeout=30000) |
|
|
|
|
|
|
|
|
captcha_result = await check_and_wait_for_recaptcha(page, config) |
|
|
if captcha_result == "CAPTCHA_TIMEOUT": |
|
|
logger.error("β Captcha detected but not solved within timeout period") |
|
|
return [{ |
|
|
"title": "CAPTCHA_ERROR", |
|
|
"content": "Captcha detected. Please try again later. The website requires captcha verification which could not be completed automatically.", |
|
|
"date": datetime.now().strftime("%Y-%m-%d"), |
|
|
"url": page_url |
|
|
}] |
|
|
|
|
|
|
|
|
page_title = "" |
|
|
|
|
|
|
|
|
if source == "mopnd": |
|
|
|
|
|
if page_url in mopnd_article_titles: |
|
|
page_title = mopnd_article_titles[page_url] |
|
|
logger.info(f"π Using MOPND cached title from listing page: {page_title}") |
|
|
else: |
|
|
|
|
|
page_url_parsed = urlparse(page_url) |
|
|
page_url_normalized = urlunparse((page_url_parsed.scheme, page_url_parsed.netloc, page_url_parsed.path, '', '', '')) |
|
|
|
|
|
|
|
|
matching_url = None |
|
|
for cached_url in mopnd_article_titles.keys(): |
|
|
cached_parsed = urlparse(cached_url) |
|
|
cached_normalized = urlunparse((cached_parsed.scheme, cached_parsed.netloc, cached_parsed.path, '', '', '')) |
|
|
if cached_normalized == page_url_normalized: |
|
|
matching_url = cached_url |
|
|
break |
|
|
|
|
|
if matching_url: |
|
|
page_title = mopnd_article_titles[matching_url] |
|
|
logger.info(f"π Using MOPND cached title (matched normalized URL): {page_title}") |
|
|
else: |
|
|
logger.warning(f"β οΈ MOPND title not found in cache for URL: {page_url}") |
|
|
logger.debug(f"π Available URLs in cache: {list(mopnd_article_titles.keys())[:3]}") |
|
|
else: |
|
|
|
|
|
title_selector = config.get("title") |
|
|
if title_selector: |
|
|
try: |
|
|
title_element = await page.query_selector(title_selector) |
|
|
if title_element: |
|
|
page_title = await title_element.text_content() |
|
|
if page_title: |
|
|
page_title = page_title.strip() |
|
|
logger.info(f"π Extracted title from page: {page_title}") |
|
|
else: |
|
|
logger.debug(f"β οΈ Title element found but no text content") |
|
|
else: |
|
|
logger.debug(f"β οΈ Title element not found with selector: {title_selector}") |
|
|
except Exception as e: |
|
|
logger.warning(f"β οΈ Error extracting title from page: {str(e)}") |
|
|
|
|
|
|
|
|
page_pdfs = await extract_pdfs_from_current_page(page, config, source, start_date, end_date, use_page_title_for_pdfs=True, page_title=page_title) |
|
|
|
|
|
if page_pdfs: |
|
|
|
|
|
new_pdfs = [] |
|
|
for pdf in page_pdfs: |
|
|
pdf_url = pdf.get("url", "") |
|
|
if pdf_url and pdf_url not in seen_pdf_urls: |
|
|
seen_pdf_urls.add(pdf_url) |
|
|
new_pdfs.append(pdf) |
|
|
|
|
|
if new_pdfs: |
|
|
all_pdfs.extend(new_pdfs) |
|
|
logger.info(f"π Found {len(new_pdfs)} new PDFs on page {i} (total: {len(page_pdfs)} PDFs on page)") |
|
|
else: |
|
|
logger.info(f"π No new PDFs found on page {i} (all {len(page_pdfs)} PDFs were duplicates)") |
|
|
else: |
|
|
logger.info(f"π No PDFs found on page {i}") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"β Error processing page {i} ({page_url}): {str(e)}") |
|
|
continue |
|
|
|
|
|
logger.info(f"π Total unique PDFs found across all pages: {len(all_pdfs)}") |
|
|
|
|
|
|
|
|
if all_pdfs: |
|
|
logger.info(f"π Sample PDF structure: {all_pdfs[0]}") |
|
|
else: |
|
|
logger.warning("β οΈ No PDFs found - this might be the issue") |
|
|
|
|
|
return all_pdfs |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"β Error in page-links-first approach: {str(e)}") |
|
|
return [] |
|
|
|
|
|
|
|
|
async def check_and_wait_for_recaptcha(page, config: dict) -> Union[bool, str]:
|
|
""" |
|
|
Check if recaptcha is present on the page and wait for user to solve it |
|
|
|
|
|
Returns: |
|
|
        True if the recaptcha was detected and solved, the string "CAPTCHA_TIMEOUT"
        if it was detected but not solved within the timeout, False otherwise
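
    Illustrative caller pattern (mirrors how this helper is used elsewhere in
    this module):
        result = await check_and_wait_for_recaptcha(page, config)
        if result == "CAPTCHA_TIMEOUT":
            return []  # abort the current scrape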
|
|
""" |
|
|
from scraper_common import set_captcha_status, clear_captcha_status |
|
|
|
|
|
recaptcha_text = config.get("recaptcha_text") |
|
|
if not recaptcha_text: |
|
|
return False |
|
|
|
|
|
try: |
|
|
|
|
|
page_content = await page.content() |
|
|
if recaptcha_text.lower() in page_content.lower(): |
|
|
logger.warning(f"π‘οΈ Recaptcha detected on page: {recaptcha_text}") |
|
|
logger.info("β³ Waiting for user to solve recaptcha (max 60 seconds)...") |
|
|
logger.info("π‘ Please solve the recaptcha in the browser window") |
|
|
|
|
|
|
|
|
set_captcha_status("π‘οΈ Captcha detected! Please complete the captcha challenge in the browser window. Waiting for you to solve it...") |
|
|
|
|
|
|
|
|
max_wait_time = 60 |
|
|
wait_interval = 2 |
|
|
waited_time = 0 |
|
|
|
|
|
while waited_time < max_wait_time: |
|
|
await asyncio.sleep(wait_interval) |
|
|
waited_time += wait_interval |
|
|
|
|
|
|
|
|
remaining_time = max_wait_time - waited_time |
|
|
set_captcha_status(f"π‘οΈ Captcha detected! Please complete the captcha challenge in the browser window. Time remaining: {remaining_time}s...") |
|
|
|
|
|
|
|
|
current_content = await page.content() |
|
|
if recaptcha_text.lower() not in current_content.lower(): |
|
|
logger.info("β
Recaptcha appears to be solved, continuing...") |
|
|
|
|
|
clear_captcha_status() |
|
|
|
|
|
await asyncio.sleep(2) |
|
|
return True |
|
|
|
|
|
logger.debug(f"β³ Still waiting for recaptcha to be solved... ({waited_time}/{max_wait_time}s)") |
|
|
|
|
|
logger.warning(f"β οΈ Recaptcha wait timeout ({max_wait_time}s). Continuing anyway...") |
|
|
|
|
|
clear_captcha_status() |
|
|
|
|
|
return "CAPTCHA_TIMEOUT" |
|
|
else: |
|
|
|
|
|
clear_captcha_status() |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"β οΈ Error checking for recaptcha: {str(e)}") |
|
|
clear_captcha_status() |
|
|
return False |
|
|
|
|
|
return False |
|
|
|
|
|
|
|
|
async def collect_all_page_links(page, url: str, config: dict, source: str) -> List[str]: |
|
|
""" |
|
|
Collect all page links through pagination |
|
|
""" |
|
|
try: |
|
|
logger.info("π Starting page link collection through pagination") |
|
|
|
|
|
|
|
|
navigation_selector = config.get("navigation_selector") |
|
|
navigation_url_addition = config.get("navigation_url_addition") |
|
|
start_page = config.get("start_page", 1) |
|
|
page_links_selector = config.get("page_links") |
|
|
|
|
|
if not page_links_selector: |
|
|
logger.error("β No page_links selector configured") |
|
|
return [] |
|
|
|
|
|
all_page_links = [] |
|
|
seen_page_urls = set() |
|
|
current_page = start_page |
|
|
consecutive_empty_pages = 0 |
|
|
max_consecutive_empty = 2 |
|
|
|
|
|
|
|
|
await page.goto(url, wait_until="domcontentloaded", timeout=30000) |
|
|
|
|
|
|
|
|
captcha_result = await check_and_wait_for_recaptcha(page, config) |
|
|
if captcha_result == "CAPTCHA_TIMEOUT": |
|
|
logger.error("β Captcha detected but not solved within timeout period") |
|
|
return [{ |
|
|
"title": "CAPTCHA_ERROR", |
|
|
"content": "Captcha detected. Please try again later. The website requires captcha verification which could not be completed automatically.", |
|
|
"date": datetime.now().strftime("%Y-%m-%d"), |
|
|
"url": url |
|
|
}] |
|
|
|
|
|
|
|
|
if navigation_selector and navigation_url_addition: |
|
|
logger.info(f"π§ Navigation configured: selector={navigation_selector}, url_addition={navigation_url_addition}") |
|
|
logger.info(f"π Starting from page: {start_page}") |
|
|
|
|
|
while True: |
|
|
logger.info(f"π Collecting page links from page {current_page}") |
|
|
|
|
|
|
|
|
if MAX_PAGE_LIMIT is not None and current_page > MAX_PAGE_LIMIT: |
|
|
logger.info(f"π Reached MAX_PAGE_LIMIT ({MAX_PAGE_LIMIT}), stopping pagination") |
|
|
break |
|
|
|
|
|
|
|
|
if current_page > start_page: |
|
|
nav_url_addition = navigation_url_addition.replace("{page_no}", str(current_page)) |
|
|
nav_url = construct_navigation_url(url, nav_url_addition) |
|
|
logger.info(f"π§ Navigating to: {nav_url}") |
|
|
await page.goto(nav_url, wait_until="domcontentloaded", timeout=30000) |
|
|
|
|
|
captcha_result = await check_and_wait_for_recaptcha(page, config) |
|
|
if captcha_result == "CAPTCHA_TIMEOUT": |
|
|
logger.error("β Captcha detected but not solved within timeout period") |
|
|
return [] |
|
|
|
|
|
|
|
|
nav_element = await page.query_selector(navigation_selector) |
|
|
if current_page == start_page and nav_element: |
|
|
logger.info("β
Navigation element found, more pages available") |
|
|
|
|
|
elif current_page > start_page and not nav_element: |
|
|
logger.info("π No more navigation elements found, stopping pagination") |
|
|
break |
|
|
|
|
|
|
|
|
|
|
|
if source == "mopnd": |
|
|
page_links = await extract_mopnd_page_links_with_dates(page, config) |
|
|
else: |
|
|
page_links = await extract_page_links_from_current_page(page, config) |
|
|
|
|
|
if page_links: |
|
|
|
|
|
new_page_links = [] |
|
|
for page_link in page_links: |
|
|
if page_link and page_link not in seen_page_urls: |
|
|
seen_page_urls.add(page_link) |
|
|
new_page_links.append(page_link) |
|
|
|
|
|
if new_page_links: |
|
|
all_page_links.extend(new_page_links) |
|
|
consecutive_empty_pages = 0 |
|
|
logger.info(f"π Found {len(new_page_links)} new page links on page {current_page} (total: {len(page_links)} page links on page)") |
|
|
else: |
|
|
consecutive_empty_pages += 1 |
|
|
logger.info(f"π No new page links found on page {current_page} (all {len(page_links)} page links were duplicates)") |
|
|
|
|
|
|
|
|
if consecutive_empty_pages >= max_consecutive_empty: |
|
|
logger.info(f"π Stopping pagination: {consecutive_empty_pages} consecutive pages with no new content") |
|
|
break |
|
|
else: |
|
|
consecutive_empty_pages += 1 |
|
|
logger.info(f"π No page links found on page {current_page}") |
|
|
|
|
|
|
|
|
if consecutive_empty_pages >= max_consecutive_empty: |
|
|
logger.info(f"π Stopping pagination: {consecutive_empty_pages} consecutive pages with no content") |
|
|
break |
|
|
|
|
|
current_page += 1 |
|
|
|
|
|
else: |
|
|
|
|
|
logger.info("π No navigation configured - collecting page links from single page only") |
|
|
|
|
|
if source == "mopnd": |
|
|
page_links = await extract_mopnd_page_links_with_dates(page, config) |
|
|
else: |
|
|
page_links = await extract_page_links_from_current_page(page, config) |
|
|
all_page_links.extend(page_links) |
|
|
|
|
|
logger.info(f"π Total unique page links collected: {len(all_page_links)}") |
|
|
return all_page_links |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"β Error collecting page links: {str(e)}") |
|
|
return [] |
|
|
|
|
|
|
|
|
async def extract_page_links_from_current_page(page, config: dict) -> List[str]: |
|
|
""" |
|
|
Extract page links from the current page |
|
|
""" |
|
|
try: |
|
|
|
|
|
page_links = [] |
|
|
page_links_selector = config.get("page_links") |
|
|
|
|
|
if isinstance(page_links_selector, list): |
|
|
for selector in page_links_selector: |
|
|
logger.info(f"π Looking for page links with selector: {selector}") |
|
|
elements = await page.query_selector_all(selector) |
|
|
logger.info(f"π° Found {len(elements)} elements with selector: {selector}") |
|
|
for element in elements: |
|
|
href = await element.get_attribute("href") |
|
|
if href: |
|
|
absolute_url = convert_to_absolute_url(href, page.url) |
|
|
page_links.append(absolute_url) |
|
|
else: |
|
|
|
|
|
|
|
|
link_element = await element.query_selector("a") |
|
|
if link_element: |
|
|
href = await link_element.get_attribute("href") |
|
|
if href: |
|
|
absolute_url = convert_to_absolute_url(href, page.url) |
|
|
page_links.append(absolute_url) |
|
|
continue |
|
|
|
|
|
|
|
|
try: |
|
|
parent = await element.evaluate_handle("el => el.parentElement") |
|
|
if parent: |
|
|
parent_link = await parent.query_selector("a") |
|
|
if parent_link: |
|
|
href = await parent_link.get_attribute("href") |
|
|
if href: |
|
|
absolute_url = convert_to_absolute_url(href, page.url) |
|
|
page_links.append(absolute_url) |
|
|
except Exception as e: |
|
|
logger.debug(f"β οΈ Could not find link in parent: {str(e)}") |
|
|
elif isinstance(page_links_selector, str): |
|
|
logger.info(f"π Looking for page links with selector: {page_links_selector}") |
|
|
elements = await page.query_selector_all(page_links_selector) |
|
|
logger.info(f"π° Found {len(elements)} elements with selector: {page_links_selector}") |
|
|
for element in elements: |
|
|
href = await element.get_attribute("href") |
|
|
if href: |
|
|
absolute_url = convert_to_absolute_url(href, page.url) |
|
|
page_links.append(absolute_url) |
|
|
else: |
|
|
|
|
|
|
|
|
link_element = await element.query_selector("a") |
|
|
if link_element: |
|
|
href = await link_element.get_attribute("href") |
|
|
if href: |
|
|
absolute_url = convert_to_absolute_url(href, page.url) |
|
|
page_links.append(absolute_url) |
|
|
continue |
|
|
|
|
|
|
|
|
try: |
|
|
parent = await element.evaluate_handle("el => el.parentElement") |
|
|
if parent: |
|
|
parent_link = await parent.query_selector("a") |
|
|
if parent_link: |
|
|
href = await parent_link.get_attribute("href") |
|
|
if href: |
|
|
absolute_url = convert_to_absolute_url(href, page.url) |
|
|
page_links.append(absolute_url) |
|
|
except Exception as e: |
|
|
logger.debug(f"β οΈ Could not find link in parent: {str(e)}") |
|
|
|
|
|
logger.info(f"π Found {len(page_links)} page links on current page") |
|
|
return page_links |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"β Error extracting page links from current page: {str(e)}") |
|
|
return [] |
|
|
|
|
|
|
|
|
async def extract_mopnd_page_links_with_dates(page, config: dict) -> List[str]: |
|
|
""" |
|
|
Extract MOPND page links with dates and titles (special handling for MOPND) |
|
|
""" |
|
|
try: |
|
|
logger.info("π Extracting MOPND page links with dates and titles") |
|
|
|
|
|
|
|
|
page_links_selector = config.get("page_links") |
|
|
if not page_links_selector: |
|
|
logger.warning("β οΈ No page_links selector found in config") |
|
|
return [] |
|
|
|
|
|
|
|
|
date_selector = config.get("date") |
|
|
if not date_selector: |
|
|
logger.warning("β οΈ No date selector found in config") |
|
|
return [] |
|
|
|
|
|
|
|
|
title_selector = config.get("title") |
|
|
if not title_selector: |
|
|
logger.warning("β οΈ No title selector found in config") |
|
|
return [] |
|
|
|
|
|
|
|
|
logger.info(f"π Looking for page links with selector: {page_links_selector}") |
|
|
link_elements = await page.query_selector_all(page_links_selector) |
|
|
logger.info(f"π° Found {len(link_elements)} page link elements") |
|
|
|
|
|
|
|
|
logger.info(f"π Looking for dates with selector: {date_selector}") |
|
|
date_elements = await page.query_selector_all(date_selector) |
|
|
logger.info(f"π
Found {len(date_elements)} date elements") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
page_links = [] |
|
|
for i, link_element in enumerate(link_elements): |
|
|
try: |
|
|
|
|
|
href = await link_element.get_attribute("href") |
|
|
if href: |
|
|
|
|
|
absolute_url = convert_to_absolute_url(href, page.url) |
|
|
page_links.append(absolute_url) |
|
|
|
|
|
|
|
|
try: |
|
|
title_text = await link_element.text_content() |
|
|
if title_text and title_text.strip(): |
|
|
|
|
|
mopnd_article_titles[absolute_url] = title_text.strip() |
|
|
logger.debug(f"β
Stored title for {absolute_url}: {title_text.strip()}") |
|
|
except Exception as e: |
|
|
logger.debug(f"β οΈ Could not extract title from link {i}: {str(e)}") |
|
|
|
|
|
|
|
|
|
|
|
date_found = False |
|
|
if i < len(date_elements): |
|
|
try: |
|
|
date_text = await date_elements[i].text_content() |
|
|
if date_text and date_text.strip(): |
|
|
|
|
|
mopnd_article_dates[absolute_url] = date_text.strip() |
|
|
logger.debug(f"β
Stored date for {absolute_url}: {date_text.strip()}") |
|
|
date_found = True |
|
|
except Exception as e: |
|
|
logger.debug(f"β οΈ Could not extract date for link {i}: {str(e)}") |
|
|
|
|
|
|
|
|
if not date_found: |
|
|
try: |
|
|
|
|
|
parent = await link_element.evaluate_handle("el => el.closest('.post_info, .post, [class*=\"post\"], [class*=\"item\"], [class*=\"entry\"]')") |
|
|
if parent: |
|
|
|
|
|
date_in_parent = await parent.query_selector(date_selector) |
|
|
if date_in_parent: |
|
|
date_text = await date_in_parent.text_content() |
|
|
if date_text and date_text.strip(): |
|
|
mopnd_article_dates[absolute_url] = date_text.strip() |
|
|
logger.debug(f"β
Stored date from parent container for {absolute_url}: {date_text.strip()}") |
|
|
date_found = True |
|
|
except Exception as e: |
|
|
logger.debug(f"β οΈ Could not find date in parent container: {str(e)}") |
|
|
|
|
|
if not date_found: |
|
|
logger.warning(f"β οΈ Could not extract date for link {i} ({absolute_url})") |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"β Error extracting link {i}: {str(e)}") |
|
|
continue |
|
|
|
|
|
logger.info(f"π Found {len(page_links)} MOPND page links with dates and titles") |
|
|
logger.info(f"π Stored {len(mopnd_article_titles)} titles and {len(mopnd_article_dates)} dates") |
|
|
|
|
|
|
|
|
if mopnd_article_titles: |
|
|
sample_titles = list(mopnd_article_titles.items())[:3] |
|
|
logger.debug(f"π Sample titles: {sample_titles}") |
|
|
if mopnd_article_dates: |
|
|
sample_dates = list(mopnd_article_dates.items())[:3] |
|
|
logger.debug(f"π Sample dates: {sample_dates}") |
|
|
|
|
|
return page_links |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"β Error extracting MOPND page links: {str(e)}") |
|
|
return [] |
|
|
|
|
|
|
|
|
async def _extract_nbs_pdfs_grouped_by_title(page, config: dict, source: str, start_date: str = None, end_date: str = None) -> List[dict]: |
|
|
""" |
|
|
Special NBS handler: Multiple titles on one page, each title can have multiple PDFs |
|
|
Approach 1: Extract all titles and PDFs, then group PDFs sequentially by title |
|
|
""" |
|
|
try: |
|
|
logger.info(f"π· NBS special handling (Approach 1): Processing multiple titles with grouped PDFs") |
|
|
|
|
|
|
|
|
title_selector = config.get("title") |
|
|
titles = [] |
|
|
if title_selector: |
|
|
try: |
|
|
title_elements = await page.query_selector_all(title_selector) |
|
|
for element in title_elements: |
|
|
try: |
|
|
title_text = await element.text_content() |
|
|
if title_text: |
|
|
title_text = title_text.strip() |
|
|
titles.append(title_text) |
|
|
logger.debug(f"π Found title: {title_text}") |
|
|
except Exception as e: |
|
|
logger.debug(f"β οΈ Could not extract title text: {str(e)}") |
|
|
except Exception as e: |
|
|
logger.warning(f"β οΈ Error extracting titles: {str(e)}") |
|
|
|
|
|
if not titles: |
|
|
logger.warning("β οΈ No titles found on NBS page, falling back to standard processing") |
|
|
return [] |
|
|
|
|
|
logger.info(f"π Found {len(titles)} titles on page") |
|
|
|
|
|
|
|
|
pdf_selector = config.get("pdf_links") |
|
|
all_pdf_links = [] |
|
|
if isinstance(pdf_selector, list): |
|
|
for selector in pdf_selector: |
|
|
try: |
|
|
elements = await page.query_selector_all(selector) |
|
|
for element in elements: |
|
|
href = await element.get_attribute("href") |
|
|
if href: |
|
|
absolute_url = convert_to_absolute_url(href, page.url) |
|
|
try: |
|
|
link_text = await element.text_content() |
|
|
pdf_name = link_text.strip() if link_text else "" |
|
|
except: |
|
|
pdf_name = "" |
|
|
|
|
|
if not pdf_name: |
|
|
url_path = urlparse(absolute_url).path |
|
|
if url_path: |
|
|
pdf_name = unquote(os.path.basename(url_path)) |
|
|
if pdf_name.lower().endswith('.pdf'): |
|
|
pdf_name = pdf_name[:-4] |
|
|
|
|
|
|
|
|
if pdf_name and pdf_name.strip().lower() == "read more": |
|
|
logger.debug(f"βοΈ Skipping PDF with 'Read More' name: {absolute_url}") |
|
|
continue |
|
|
|
|
|
all_pdf_links.append({ |
|
|
"url": absolute_url, |
|
|
"name": pdf_name |
|
|
}) |
|
|
except Exception as e: |
|
|
logger.debug(f"β οΈ Error with PDF selector '{selector}': {str(e)}") |
|
|
elif isinstance(pdf_selector, str): |
|
|
try: |
|
|
elements = await page.query_selector_all(pdf_selector) |
|
|
for element in elements: |
|
|
href = await element.get_attribute("href") |
|
|
if href: |
|
|
absolute_url = convert_to_absolute_url(href, page.url) |
|
|
try: |
|
|
link_text = await element.text_content() |
|
|
pdf_name = link_text.strip() if link_text else "" |
|
|
except: |
|
|
pdf_name = "" |
|
|
|
|
|
if not pdf_name: |
|
|
url_path = urlparse(absolute_url).path |
|
|
if url_path: |
|
|
pdf_name = unquote(os.path.basename(url_path)) |
|
|
if pdf_name.lower().endswith('.pdf'): |
|
|
pdf_name = pdf_name[:-4] |
|
|
|
|
|
|
|
|
if pdf_name and pdf_name.strip().lower() == "read more": |
|
|
logger.debug(f"βοΈ Skipping PDF with 'Read More' name: {absolute_url}") |
|
|
continue |
|
|
|
|
|
all_pdf_links.append({ |
|
|
"url": absolute_url, |
|
|
"name": pdf_name |
|
|
}) |
|
|
except Exception as e: |
|
|
logger.warning(f"β οΈ Error extracting PDF elements: {str(e)}") |
|
|
|
|
|
logger.info(f"π Found {len(all_pdf_links)} PDF links on page") |
|
|
|
|
|
if not all_pdf_links: |
|
|
logger.warning("β οΈ No PDF links found on NBS page") |
|
|
return [] |
|
|
|
|
|
|
|
|
|
|
|
pdfs_per_title = len(all_pdf_links) // len(titles) if len(titles) > 0 else 0 |
|
|
remainder = len(all_pdf_links) % len(titles) |
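        # Illustrative split: with 3 titles and 7 PDF links, pdfs_per_title is 2 and
        # remainder is 1, so the first title gets 3 PDFs and the other two get 2 each,
        # following page order.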
|
|
|
|
|
title_pdf_groups = [] |
|
|
pdf_index = 0 |
|
|
|
|
|
for i, title in enumerate(titles): |
|
|
|
|
|
num_pdfs = pdfs_per_title + (1 if i < remainder else 0) |
|
|
|
|
|
|
|
|
title_pdfs = all_pdf_links[pdf_index:pdf_index + num_pdfs] |
|
|
pdf_index += num_pdfs |
|
|
|
|
|
if title_pdfs: |
|
|
title_pdf_groups.append({ |
|
|
"title": title, |
|
|
"pdfs": title_pdfs |
|
|
}) |
|
|
logger.info(f"π Title '{title}': {len(title_pdfs)} associated PDFs") |
|
|
|
|
|
if not title_pdf_groups: |
|
|
logger.warning("β οΈ No title-PDF groups created") |
|
|
return [] |
|
|
|
|
|
|
|
|
date_selector = config.get("date") |
|
|
date_elements = [] |
|
|
if date_selector: |
|
|
try: |
|
|
date_elements = await page.query_selector_all(date_selector) |
|
|
except Exception as e: |
|
|
logger.debug(f"β οΈ Could not extract date elements: {str(e)}") |
|
|
|
|
|
|
|
|
all_documents = [] |
|
|
|
|
|
for group_idx, group in enumerate(title_pdf_groups): |
|
|
if scraping_cancelled(): |
|
|
logger.info("π Scraping cancelled, stopping NBS processing") |
|
|
break |
|
|
|
|
|
if is_pdf_limit_reached(): |
|
|
logger.info(f"π Global PDF limit reached ({MAX_PDF_LIMIT}), stopping NBS processing") |
|
|
break |
|
|
|
|
|
title = group["title"] |
|
|
pdf_list = group["pdfs"] |
|
|
|
|
|
logger.info(f"π· Processing title {group_idx+1}/{len(title_pdf_groups)}: '{title}' ({len(pdf_list)} PDFs)") |
|
|
|
|
|
|
|
|
successful_pdfs = [] |
|
|
combined_text_parts = [] |
|
|
all_pdf_paths = [] |
|
|
total_size = 0 |
|
|
|
|
|
for pdf_idx, pdf_info in enumerate(pdf_list): |
|
|
if scraping_cancelled(): |
|
|
break |
|
|
|
|
|
if is_pdf_limit_reached(): |
|
|
break |
|
|
|
|
|
pdf_url = pdf_info["url"] |
|
|
pdf_link_name = pdf_info.get("name", "") or f"PDF {pdf_idx+1}" |
|
|
|
|
|
|
|
|
if pdf_link_name and pdf_link_name.strip().lower() == "read more": |
|
|
logger.info(f" βοΈ Skipping PDF with 'Read More' name: {pdf_url}") |
|
|
continue |
|
|
|
|
|
logger.info(f" β¬οΈ Trying PDF {pdf_idx+1}/{len(pdf_list)}: {pdf_link_name}") |
|
|
|
|
|
try: |
|
|
download_result = download_and_save_pdf(pdf_url, source) |
|
|
if download_result["success"]: |
|
|
local_pdf_path = download_result["path"] |
|
|
extracted_text = extract_text_from_pdf_file(local_pdf_path) |
|
|
|
|
|
if extracted_text and len(extracted_text.strip()) > 10: |
|
|
current_count = increment_global_pdf_count() |
|
|
|
|
|
successful_pdfs.append({ |
|
|
"url": pdf_url, |
|
|
"path": local_pdf_path, |
|
|
"name": pdf_link_name, |
|
|
"size": download_result["size"], |
|
|
"text": extracted_text |
|
|
}) |
|
|
|
|
|
combined_text_parts.append(f"=== {pdf_link_name} ===\n{extracted_text}") |
|
|
all_pdf_paths.append(local_pdf_path) |
|
|
total_size += download_result["size"] |
|
|
|
|
|
logger.info(f" β
Successfully processed PDF '{pdf_link_name}' (Global: {current_count}/{MAX_PDF_LIMIT})") |
|
|
else: |
|
|
logger.warning(f" β οΈ PDF downloaded but no text extracted: {pdf_link_name}") |
|
|
else: |
|
|
logger.warning(f" β Failed to download PDF: {download_result.get('message', 'Unknown error')}") |
|
|
except Exception as e: |
|
|
logger.error(f" β Error processing PDF: {str(e)}") |
|
|
continue |
|
|
|
|
|
|
|
|
if successful_pdfs: |
|
|
|
|
|
pdf_date_raw = "" |
|
|
if date_elements: |
|
|
date_idx = min(group_idx, len(date_elements) - 1) |
|
|
try: |
|
|
date_text = await date_elements[date_idx].text_content() |
|
|
if date_text: |
|
|
pdf_date_raw = date_text.strip() |
|
|
except: |
|
|
pass |
|
|
|
|
|
|
|
|
pdf_date = standardize_date(pdf_date_raw, default_to_current=True) |
|
|
if not pdf_date: |
|
|
pdf_date = datetime.now().strftime("%Y-%m-%d") |
|
|
|
|
|
|
|
|
if start_date or end_date: |
|
|
start_dt = parse_date_input(start_date) if start_date else None |
|
|
end_dt = parse_date_input(end_date) if end_date else None |
|
|
if not is_date_in_range(pdf_date, start_dt, end_dt, include_missing=False): |
|
|
logger.info(f"π
Title date {pdf_date} is outside date range - skipping") |
|
|
continue |
|
|
|
|
|
|
|
|
combined_text = "\n\n".join(combined_text_parts) |
|
|
primary_path = all_pdf_paths[0] if all_pdf_paths else "" |
|
|
|
|
|
all_documents.append({ |
|
|
"url": successful_pdfs[0]["url"], |
|
|
"local_path": primary_path, |
|
|
"size": total_size, |
|
|
"title": title, |
|
|
"source": source, |
|
|
"extracted_text": combined_text, |
|
|
"file_type": "PDF", |
|
|
"date": pdf_date, |
|
|
"nbs_pdf_count": len(successful_pdfs), |
|
|
"nbs_all_paths": all_pdf_paths |
|
|
}) |
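|
# Illustrative shape of the record appended above (hypothetical values for a title that grouped two PDFs): |
#   {"url": "https://nbs.example/part1.pdf", "local_path": "archive/nbs/<date>/<hash>.pdf", |
#    "size": 2048576, "title": "Quarterly Bulletin", "source": "nbs", |
#    "extracted_text": "=== Part 1 ===\n...", "file_type": "PDF", "date": "2024-05-01", |
#    "nbs_pdf_count": 2, "nbs_all_paths": ["<path1>", "<path2>"]} |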
|
|
|
|
|
logger.info(f"β Created document for title '{title}' with {len(successful_pdfs)}/{len(pdf_list)} successful PDFs") |
|
|
else: |
|
|
logger.warning(f"β οΈ No PDFs successfully processed for title: '{title}' - moving forward") |
|
|
|
|
|
logger.info(f"π NBS Processing Summary: {len(all_documents)} documents created from {len(title_pdf_groups)} titles") |
|
|
return all_documents |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"β Error in NBS PDF extraction: {str(e)}") |
|
|
return [] |
|
|
|
|
|
|
|
|
async def extract_pdfs_from_current_page(page, config: dict, source: str, start_date: str = None, end_date: str = None, use_page_title_for_pdfs: bool = False, page_title: str = None) -> List[dict]: |
|
|
""" |
|
|
Extract PDFs from the current page |
|
|
Special handling for NBS: Multiple titles on one page, each title can have multiple PDFs |
|
|
|
|
|
Args: |
|
|
page: Playwright page object |
|
|
config: Website configuration dict |
|
|
source: Source name |
|
|
start_date: Optional start date for filtering |
|
|
end_date: Optional end date for filtering |
|
|
use_page_title_for_pdfs: If True, use page title for PDFs (Approach 2 behavior) |
|
|
page_title: Pre-extracted page title (optional, will extract if not provided and use_page_title_for_pdfs is True) |
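|
Example (hypothetical usage; assumes an open Playwright page already showing a listing page and a WEBSITE_CONFIG entry named "some_source"): |
    docs = await extract_pdfs_from_current_page(page, WEBSITE_CONFIG["some_source"], "some_source", start_date="2024-01-01", end_date="2024-12-31") |
Each returned dict carries "url", "local_path", "size", "title", "source", "extracted_text", "file_type" and "date" keys. |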
|
|
""" |
|
|
try: |
|
|
|
|
|
is_nbs = source.lower() in ["nbs", "nbs somalia"] |
|
|
if is_nbs: |
|
|
return await _extract_nbs_pdfs_grouped_by_title(page, config, source, start_date, end_date) |
|
|
|
|
|
|
|
|
|
|
|
pdf_links = [] |
|
|
pdf_selector = config.get("pdf_links") |
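|
# config["pdf_links"] may be either a single CSS selector string or a list of selectors |
# (hypothetical examples: "a[href$='.pdf']" or ["a.download-link", "a[href$='.pdf']"]); |
# both shapes are handled below. |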
|
|
|
|
|
if isinstance(pdf_selector, list): |
|
|
for selector in pdf_selector: |
|
|
elements = await page.query_selector_all(selector) |
|
|
for element in elements: |
|
|
|
|
|
href = await element.get_attribute("href") |
|
|
if not href: |
|
|
href = await element.get_attribute("button-url") |
|
|
if href: |
|
|
absolute_url = convert_to_absolute_url(href, page.url) |
|
|
|
|
|
try: |
|
|
link_text = await element.text_content() |
|
|
pdf_name = link_text.strip() if link_text else "" |
|
|
except Exception as e: |
|
|
logger.debug(f"β οΈ Could not extract link text: {str(e)}") |
|
|
pdf_name = "" |
|
|
|
|
|
|
|
|
if not pdf_name: |
|
|
url_path = urlparse(absolute_url).path |
|
|
if url_path: |
|
|
pdf_name = unquote(os.path.basename(url_path)) |
|
|
|
|
|
if pdf_name.lower().endswith('.pdf'): |
|
|
pdf_name = pdf_name[:-4] |
|
|
|
|
|
pdf_links.append({ |
|
|
"url": absolute_url, |
|
|
"name": pdf_name, |
|
|
"file_type": "PDF" |
|
|
}) |
|
|
elif isinstance(pdf_selector, str): |
|
|
elements = await page.query_selector_all(pdf_selector) |
|
|
for element in elements: |
|
|
|
|
|
href = await element.get_attribute("href") |
|
|
if not href: |
|
|
href = await element.get_attribute("button-url") |
|
|
if href: |
|
|
absolute_url = convert_to_absolute_url(href, page.url) |
|
|
|
|
|
try: |
|
|
link_text = await element.text_content() |
|
|
pdf_name = link_text.strip() if link_text else "" |
|
|
except Exception as e: |
|
|
logger.debug(f"β οΈ Could not extract link text: {str(e)}") |
|
|
pdf_name = "" |
|
|
|
|
|
|
|
|
if not pdf_name: |
|
|
|
|
url_path = urlparse(absolute_url).path |
|
|
if url_path: |
|
|
pdf_name = unquote(os.path.basename(url_path)) |
|
|
|
|
|
if pdf_name.lower().endswith('.pdf'): |
|
|
pdf_name = pdf_name[:-4] |
|
|
|
|
|
pdf_links.append({ |
|
|
"url": absolute_url, |
|
|
"name": pdf_name, |
|
|
"file_type": "PDF" |
|
|
}) |
|
|
|
|
|
|
|
|
file_links = [] |
|
|
file_selector = config.get("file_links") |
|
|
|
|
|
if file_selector: |
|
|
|
|
|
file_type = "CSV" |
|
|
|
|
|
if isinstance(file_selector, list): |
|
|
for selector in file_selector: |
|
|
elements = await page.query_selector_all(selector) |
|
|
for element in elements: |
|
|
href = await element.get_attribute("href") |
|
|
if href: |
|
|
absolute_url = convert_to_absolute_url(href, page.url) |
|
|
|
|
|
if absolute_url.lower().endswith('.csv'): |
|
|
file_type = "CSV" |
|
|
elif absolute_url.lower().endswith(('.xlsx', '.xls')): |
|
|
file_type = "XLSX" |
|
|
elif absolute_url.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')): |
|
|
file_type = "PNG" |
|
|
else: |
|
|
file_type = "CSV" |
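|
# Extension-to-type mapping above (examples): ".csv" -> CSV, ".xlsx"/".xls" -> XLSX, |
# any image extension (.png/.jpg/.jpeg/.gif/.webp) -> PNG, and anything else defaults to CSV. |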
|
|
|
|
|
|
|
|
try: |
|
|
link_text = await element.text_content() |
|
|
file_name = link_text.strip() if link_text else "" |
|
|
except Exception as e: |
|
|
logger.debug(f"β οΈ Could not extract link text: {str(e)}") |
|
|
file_name = "" |
|
|
|
|
|
|
|
|
if not file_name: |
|
|
url_path = urlparse(absolute_url).path |
|
|
if url_path: |
|
|
file_name = unquote(os.path.basename(url_path)) |
|
|
|
|
|
for ext in ['.csv', '.xlsx', '.xls', '.png', '.jpg', '.jpeg', '.gif', '.webp']: |
|
|
if file_name.lower().endswith(ext): |
|
|
file_name = file_name[:-len(ext)] |
|
|
break |
|
|
|
|
|
file_links.append({ |
|
|
"url": absolute_url, |
|
|
"name": file_name, |
|
|
"file_type": file_type |
|
|
}) |
|
|
elif isinstance(file_selector, str): |
|
|
elements = await page.query_selector_all(file_selector) |
|
|
for element in elements: |
|
|
href = await element.get_attribute("href") |
|
|
if href: |
|
|
absolute_url = convert_to_absolute_url(href, page.url) |
|
|
|
|
|
if absolute_url.lower().endswith('.csv'): |
|
|
file_type = "CSV" |
|
|
elif absolute_url.lower().endswith(('.xlsx', '.xls')): |
|
|
file_type = "XLSX" |
|
|
elif absolute_url.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')): |
|
|
file_type = "PNG" |
|
|
else: |
|
|
file_type = "CSV" |
|
|
|
|
|
|
|
|
try: |
|
|
link_text = await element.text_content() |
|
|
file_name = link_text.strip() if link_text else "" |
|
|
except Exception as e: |
|
|
logger.debug(f"β οΈ Could not extract link text: {str(e)}") |
|
|
file_name = "" |
|
|
|
|
|
|
|
|
if not file_name: |
|
|
url_path = urlparse(absolute_url).path |
|
|
if url_path: |
|
|
file_name = unquote(os.path.basename(url_path)) |
|
|
|
|
|
for ext in ['.csv', '.xlsx', '.xls', '.png', '.jpg', '.jpeg', '.gif', '.webp']: |
|
|
if file_name.lower().endswith(ext): |
|
|
file_name = file_name[:-len(ext)] |
|
|
break |
|
|
|
|
|
file_links.append({ |
|
|
"url": absolute_url, |
|
|
"name": file_name, |
|
|
"file_type": file_type |
|
|
}) |
|
|
|
|
|
|
|
|
all_links = pdf_links + file_links |
|
|
|
|
|
logger.info(f"π Found {len(pdf_links)} PDF links and {len(file_links)} file links on current page (total: {len(all_links)})") |
|
|
|
|
|
|
|
|
csv_files = [link for link in file_links if link.get("file_type") == "CSV"] |
|
|
if csv_files: |
|
|
logger.info(f"π Found {len(csv_files)} CSV file(s) to process:") |
|
|
for csv_file in csv_files: |
|
|
logger.info(f" - CSV: {csv_file.get('name', 'Unknown')} at {csv_file.get('url', 'Unknown URL')}") |
|
|
|
|
|
|
|
|
if page_title is None: |
|
|
page_title = "" |
|
|
title_selector = config.get("title") |
|
|
if title_selector: |
|
|
try: |
|
|
title_element = await page.query_selector(title_selector) |
|
|
if title_element: |
|
|
page_title = await title_element.text_content() |
|
|
if page_title: |
|
|
page_title = page_title.strip() |
|
|
logger.info(f"π Extracted page title: {page_title}") |
|
|
else: |
|
|
logger.debug(f"β οΈ Title element found but no text content") |
|
|
else: |
|
|
logger.debug(f"β οΈ Title element not found with selector: {title_selector}") |
|
|
except Exception as e: |
|
|
logger.warning(f"β οΈ Error extracting page title: {str(e)}") |
|
|
elif page_title: |
|
|
logger.info(f"π Using provided page title: {page_title}") |
|
|
|
|
|
|
|
|
date_selector = config.get("date") |
|
|
date_elements = [] |
|
|
if date_selector: |
|
|
try: |
|
|
date_elements = await page.query_selector_all(date_selector) |
|
|
logger.debug(f"π Found {len(date_elements)} date elements on current page") |
|
|
except Exception as e: |
|
|
logger.debug(f"β οΈ Could not extract date elements: {str(e)}") |
|
|
|
|
|
|
|
|
downloaded_pdfs = [] |
|
|
for i, file_info in enumerate(all_links): |
|
|
if scraping_cancelled(): |
|
|
logger.info("π Scraping cancelled, stopping file downloads") |
|
|
break |
|
|
|
|
|
|
|
|
if is_pdf_limit_reached(): |
|
|
logger.info(f"π Global PDF limit reached ({MAX_PDF_LIMIT}), stopping file processing") |
|
|
break |
|
|
|
|
|
file_url = file_info["url"] |
|
|
file_name = file_info.get("name", "") |
|
|
file_type = file_info.get("file_type", "PDF") |
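|
# Naming priority applied below: page title when use_page_title_for_pdfs is set (Approach 2), |
# otherwise the file link text, then the page title as a fallback, and finally a generated |
# "<file_type> <global count>" placeholder name. |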
|
|
|
|
|
|
|
|
if use_page_title_for_pdfs and page_title: |
|
|
|
|
|
file_name = page_title |
|
|
logger.info(f"π Using page title for {file_type} (Approach 2): {file_name}") |
|
|
elif file_name and file_name != "": |
|
|
|
|
|
|
|
|
file_name = " ".join(file_name.split()) |
|
|
logger.info(f"π Using {file_type} link text as name: {file_name}") |
|
|
elif page_title: |
|
|
|
|
|
file_name = page_title |
|
|
logger.info(f"π Using page title as fallback for {file_type}: {file_name}") |
|
|
else: |
|
|
|
|
|
current_count = get_global_pdf_count() + 1 |
|
|
file_name = f"{file_type} {current_count}" |
|
|
logger.info(f"π Using fallback name: {file_name}") |
|
|
|
|
|
logger.info(f"β¬οΈ Downloading {file_type} {i+1}/{len(all_links)}: {file_url}") |
|
|
logger.info(f"π {file_type} name: {file_name}") |
|
|
logger.info(f"π Global PDF count: {get_global_pdf_count()}/{MAX_PDF_LIMIT}") |
|
|
|
|
|
try: |
|
|
|
|
|
if file_type == "PDF": |
|
|
download_result = download_and_save_pdf(file_url, source) |
|
|
else: |
|
|
|
|
|
download_result = download_and_save_file(file_url, source, file_type.lower()) |
|
|
|
|
|
if download_result["success"]: |
|
|
local_file_path = download_result["path"] |
|
|
extracted_text = "" |
|
|
|
|
|
|
|
|
if file_type == "PDF": |
|
|
logger.info(f"π Extracting text from local file: {local_file_path}") |
|
|
extracted_text = extract_text_from_pdf_file(local_file_path) |
|
|
logger.info(f"π Extracted text length: {len(extracted_text)} characters") |
|
|
if not extracted_text: |
|
|
logger.warning("β οΈ No text extracted from PDF") |
|
|
elif file_type == "CSV": |
|
|
|
|
|
try: |
|
|
import csv |
|
|
logger.info(f"π Reading CSV file preview: {local_file_path}") |
|
|
with open(local_file_path, 'r', encoding='utf-8', errors='ignore') as csv_file: |
|
|
csv_reader = csv.reader(csv_file) |
|
|
|
|
|
preview_rows = [] |
|
|
for idx, row in enumerate(csv_reader): |
|
|
if idx >= 10: |
|
|
break |
|
|
preview_rows.append(row) |
|
|
|
|
|
|
|
|
if preview_rows: |
|
|
|
|
|
headers = preview_rows[0] if len(preview_rows) > 0 else [] |
|
|
data_rows = preview_rows[1:] if len(preview_rows) > 1 else [] |
|
|
|
|
|
|
|
|
location_info = "" |
|
|
if source == "icpac_seasonal_forecast" and file_name: |
|
|
location_info = f"Location: {file_name}\n" |
|
|
|
|
|
|
|
|
preview_text = f"CSV File: {file_name}\n" |
|
|
if location_info: |
|
|
preview_text += location_info |
|
|
preview_text += f"File Path: {local_file_path}\n" |
|
|
preview_text += f"Total Rows Previewed: {len(preview_rows)}\n\n" |
|
|
|
|
|
if headers: |
|
|
preview_text += "Headers: " + ", ".join(str(h) for h in headers) + "\n\n" |
|
|
|
|
|
if data_rows: |
|
|
preview_text += "Sample Data (first few rows):\n" |
|
|
for row in data_rows[:5]: |
|
|
preview_text += ", ".join(str(cell) for cell in row) + "\n" |
|
|
|
|
|
extracted_text = preview_text |
|
|
logger.info(f"π CSV preview extracted: {len(extracted_text)} characters") |
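|
# Illustrative preview_text layout (hypothetical CSV contents): |
#   CSV File: Rainfall Outlook |
#   File Path: archive/<source>/<date>/<hash>.csv |
#   Total Rows Previewed: 6 |
# |
#   Headers: station, month, rainfall_mm |
# |
#   Sample Data (first few rows): |
#   Mogadishu, Jan, 3.2 |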
|
|
else: |
|
|
location_info = "" |
|
|
if source == "icpac_seasonal_forecast" and file_name: |
|
|
location_info = f"Location: {file_name}\n" |
|
|
extracted_text = f"CSV File: {file_name}\n" |
|
|
if location_info: |
|
|
extracted_text += location_info |
|
|
extracted_text += f"File Path: {local_file_path}\n(File is empty or could not be read)" |
|
|
logger.warning("β οΈ CSV file appears to be empty") |
|
|
except Exception as e: |
|
|
logger.warning(f"β οΈ Could not read CSV preview: {str(e)}") |
|
|
location_info = "" |
|
|
if source == "icpac_seasonal_forecast" and file_name: |
|
|
location_info = f"Location: {file_name}\n" |
|
|
extracted_text = f"CSV File: {file_name}\n" |
|
|
if location_info: |
|
|
extracted_text += location_info |
|
|
extracted_text += f"File Path: {local_file_path}\n(Preview could not be generated: {str(e)})" |
|
|
elif file_type == "PNG": |
|
|
|
|
|
location_info = "" |
|
|
if source == "icpac_seasonal_forecast" and file_name: |
|
|
location_info = f"Location: {file_name}\n" |
|
|
|
|
|
extracted_text = f"PNG File: {file_name}\n" |
|
|
if location_info: |
|
|
extracted_text += location_info |
|
|
extracted_text += f"File Path: {local_file_path}\n" |
|
|
extracted_text += "(PNG image file downloaded successfully)" |
|
|
logger.info(f"π PNG file info extracted: {file_name}") |
|
|
else: |
|
|
|
|
|
logger.info(f"π {file_type} file downloaded (no text extraction needed)") |
|
|
extracted_text = f"{file_type} File: {file_name}\nFile Path: {local_file_path}" |
|
|
|
|
|
|
|
|
file_date_raw = "" |
|
|
if source == "mopnd": |
|
|
|
|
|
current_page_url = page.url |
|
|
|
|
|
if current_page_url in mopnd_article_dates: |
|
|
file_date_raw = mopnd_article_dates[current_page_url] |
|
|
logger.debug(f"β Using MOPND date from cache (page URL: {current_page_url}): {file_date_raw}") |
|
|
else: |
|
|
|
|
|
page_url_parsed = urlparse(current_page_url) |
|
|
page_url_normalized = urlunparse((page_url_parsed.scheme, page_url_parsed.netloc, page_url_parsed.path, '', '', '')) |
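|
# e.g. a hypothetical "https://mopnd.example/news/report?page=2#top" normalizes to |
# "https://mopnd.example/news/report" (params, query and fragment dropped), so the |
# cache lookup below tolerates differing query strings. |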
|
|
|
|
|
|
|
|
matching_url = None |
|
|
for cached_url in mopnd_article_dates.keys(): |
|
|
cached_parsed = urlparse(cached_url) |
|
|
cached_normalized = urlunparse((cached_parsed.scheme, cached_parsed.netloc, cached_parsed.path, '', '', '')) |
|
|
if cached_normalized == page_url_normalized: |
|
|
matching_url = cached_url |
|
|
break |
|
|
|
|
|
if matching_url: |
|
|
file_date_raw = mopnd_article_dates[matching_url] |
|
|
logger.debug(f"β Using MOPND date from cache (matched normalized URL): {file_date_raw}") |
|
|
else: |
|
|
logger.warning(f"β οΈ MOPND date not found in cache for page URL: {current_page_url}") |
|
|
logger.debug(f"π Available page URLs in cache: {list(mopnd_article_dates.keys())[:3]}") |
|
|
elif i < len(date_elements): |
|
|
try: |
|
|
date_text = await date_elements[i].text_content() |
|
|
if date_text: |
|
|
file_date_raw = date_text.strip() |
|
|
logger.debug(f"β Extracted raw date from listing page: {file_date_raw}") |
|
|
except Exception as e: |
|
|
logger.debug(f"β οΈ Could not extract date for {file_type} {i+1}: {str(e)}") |
|
|
|
|
|
|
|
|
file_date = standardize_date(file_date_raw, default_to_current=True) |
|
|
if not file_date: |
|
|
file_date = datetime.now().strftime("%Y-%m-%d") |
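|
# At this point file_date is an ISO "YYYY-MM-DD" string: either the standardized raw date |
# (assuming standardize_date emits the same ISO format as the fallback) or today's date |
# when nothing usable was found. |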
|
|
|
|
|
|
|
|
if start_date or end_date: |
|
|
start_dt = parse_date_input(start_date) if start_date else None |
|
|
end_dt = parse_date_input(end_date) if end_date else None |
|
|
if not is_date_in_range(file_date, start_dt, end_dt, include_missing=False): |
|
|
logger.info(f"π {file_type} date {file_date} is outside date range [{start_date}, {end_date}] - filtering out") |
|
|
continue |
|
|
|
|
|
|
|
|
current_count = increment_global_pdf_count() |
|
|
|
|
|
downloaded_pdfs.append({ |
|
|
"url": file_url, |
|
|
"local_path": local_file_path, |
|
|
"size": download_result["size"], |
|
|
"title": file_name, |
|
|
"source": source, |
|
|
"extracted_text": extracted_text, |
|
|
"file_type": file_type, |
|
|
"date": file_date |
|
|
}) |
|
|
logger.info(f"β Successfully downloaded and processed {file_type} '{file_name}' (Global: {current_count}/{MAX_PDF_LIMIT})") |
|
|
else: |
|
|
logger.warning(f"β Failed to download {file_type} {i+1}: {download_result.get('message', 'Unknown error')}") |
|
|
except Exception as e: |
|
|
logger.error(f"β Error downloading {file_type} {i+1}: {str(e)}") |
|
|
continue |
|
|
|
|
|
return downloaded_pdfs |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"β Error extracting PDFs from current page: {str(e)}") |
|
|
return [] |
|
|
|
|
|
async def extract_document_content_unified(page, document_url: str, config: dict, website_type: str = None, pdf_count: int = 0, start_date: str = None, end_date: str = None) -> dict: |
|
|
""" |
|
|
Unified function to extract content from a single document (PDF-focused) |
|
|
With 5 retry attempts for loading documents |
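|
Example (hypothetical usage; assumes a Playwright page and a WEBSITE_CONFIG entry named "some_source"): |
    result = await extract_document_content_unified(page, "https://example.org/some-doc", WEBSITE_CONFIG["some_source"], website_type="some_source", start_date="2024-01-01") |
May return None when the document date falls outside the requested range, or (for PDF-based sites) when no PDF text could be extracted. |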
|
|
""" |
|
|
try: |
|
|
|
|
|
max_retries = 5 |
|
|
retry_count = 0 |
|
|
page_loaded = False |
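|
# Retry strategy used below: attempt 1 waits for "domcontentloaded" (30s timeout), |
# attempt 2 uses Playwright's default "load" wait (20s), attempt 3 waits for "networkidle" (15s), |
# and attempts 4-5 fall back to the default wait with a 10s timeout. |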
|
|
|
|
|
while retry_count < max_retries and not page_loaded: |
|
|
try: |
|
|
retry_count += 1 |
|
|
logger.info(f"π Loading document (attempt {retry_count}/{max_retries}): {document_url}") |
|
|
|
|
|
|
|
|
if retry_count == 1: |
|
|
|
|
|
await page.goto(document_url, wait_until="domcontentloaded", timeout=30000) |
|
|
elif retry_count == 2: |
|
|
|
|
|
await page.goto(document_url, timeout=20000) |
|
|
elif retry_count == 3: |
|
|
|
|
|
await page.goto(document_url, wait_until="networkidle", timeout=15000) |
|
|
else: |
|
|
|
|
|
await page.goto(document_url, timeout=10000) |
|
|
|
|
|
logger.info(f"β Successfully loaded document on attempt {retry_count}") |
|
|
page_loaded = True |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"β οΈ Attempt {retry_count} failed for {document_url}: {str(e)}") |
|
|
|
|
|
if retry_count >= max_retries: |
|
|
logger.error(f"β Failed to load document after {max_retries} attempts: {document_url}") |
|
|
return { |
|
|
"title": "Network Error", |
|
|
"content": f"Failed to access document after {max_retries} attempts: {str(e)}", |
|
|
"date": datetime.now().strftime("%Y-%m-%d"), |
|
|
"url": document_url |
|
|
} |
|
|
|
|
|
|
|
|
await asyncio.sleep(2) |
|
|
|
|
|
if not page_loaded: |
|
|
return { |
|
|
"title": "Network Error", |
|
|
"content": f"Failed to access document after {max_retries} attempts", |
|
|
"date": datetime.now().strftime("%Y-%m-%d"), |
|
|
"url": document_url |
|
|
} |
|
|
|
|
|
|
|
|
title = "" |
|
|
title_extracted_from_page = False |
|
|
|
|
|
|
|
|
if website_type == "mopnd" and document_url in mopnd_article_titles: |
|
|
title = mopnd_article_titles[document_url] |
|
|
title_extracted_from_page = True |
|
|
logger.debug(f"β Using MOPND title from main page: {title}") |
|
|
elif website_type == "mopnd": |
|
|
logger.warning(f"β οΈ MOPND title not found in cache for URL: {document_url}") |
|
|
logger.debug(f"π Available titles: {list(mopnd_article_titles.keys())[:3]}") |
|
|
else: |
|
|
|
|
|
title_selector = config.get("title") |
|
|
if title_selector: |
|
|
try: |
|
|
title_element = await page.query_selector(title_selector) |
|
|
if title_element: |
|
|
title = await title_element.text_content() |
|
|
if title: |
|
|
title = title.strip() |
|
|
title_extracted_from_page = True |
|
|
logger.info(f"β Extracted title from page using selector '{title_selector}': {title}") |
|
|
else: |
|
|
logger.debug(f"β οΈ Title element found but no text content with selector: {title_selector}") |
|
|
else: |
|
|
logger.debug(f"β οΈ Title element not found with selector: {title_selector}") |
|
|
except Exception as e: |
|
|
logger.warning(f"Error extracting title with selector '{title_selector}': {str(e)}") |
|
|
else: |
|
|
logger.warning("β οΈ No title selector found in config") |
|
|
|
|
|
|
|
|
if website_type is None: |
|
|
for site_type, site_config in WEBSITE_CONFIG.items(): |
|
|
if site_config == config: |
|
|
website_type = site_type |
|
|
break |
|
|
if website_type is None: |
|
|
website_type = "unknown" |
|
|
|
|
|
content = "" |
|
|
pdf_path = "" |
|
|
|
|
|
|
|
|
|
|
|
pdf_websites = get_pdf_websites() |
|
|
if website_type in pdf_websites: |
|
|
pdf_links = [] |
|
|
try: |
|
|
|
|
|
pdf_links_selector = config.get("pdf_links") |
|
|
|
|
|
|
|
|
pdf_elements = [] |
|
|
|
|
|
|
|
|
if isinstance(pdf_links_selector, list): |
|
|
|
|
|
logger.info(f"π Processing array of {len(pdf_links_selector)} PDF selectors") |
|
|
for selector in pdf_links_selector: |
|
|
try: |
|
|
elements = await page.query_selector_all(selector) |
|
|
logger.info(f"π Found {len(elements)} elements with selector {selector}") |
|
|
pdf_elements.extend(elements) |
|
|
except Exception as e: |
|
|
logger.warning(f"β Error with selector '{selector}': {str(e)}") |
|
|
elif isinstance(pdf_links_selector, str): |
|
|
|
|
|
logger.info(f"π Using string selector: {pdf_links_selector}") |
|
|
pdf_elements = await page.query_selector_all(pdf_links_selector) |
|
|
else: |
|
|
logger.warning("β οΈ No pdf_links selector in config, skipping PDF extraction") |
|
|
|
|
|
|
|
|
logger.debug(f"π Processing {len(pdf_elements)} PDF elements for {website_type}") |
|
|
for i, element in enumerate(pdf_elements): |
|
|
try: |
|
|
logger.debug(f"π Extracting PDF URL from element {i+1}/{len(pdf_elements)}") |
|
|
|
|
|
|
|
|
href = await element.get_attribute("href") |
|
|
if not href: |
|
|
href = await element.get_attribute("button-url") |
|
|
if href: |
|
|
|
|
|
absolute_url = convert_to_absolute_url(href, page.url) |
|
|
|
|
|
|
|
|
try: |
|
|
link_text = await element.text_content() |
|
|
pdf_name = link_text.strip() if link_text else "" |
|
|
except Exception as e: |
|
|
logger.debug(f"β οΈ Could not extract link text: {str(e)}") |
|
|
pdf_name = "" |
|
|
|
|
|
|
|
|
if not pdf_name: |
|
|
|
|
url_path = urlparse(absolute_url).path |
|
|
if url_path: |
|
|
pdf_name = unquote(os.path.basename(url_path)) |
|
|
|
|
|
if pdf_name.lower().endswith('.pdf'): |
|
|
pdf_name = pdf_name[:-4] |
|
|
|
|
|
pdf_links.append({ |
|
|
"url": absolute_url, |
|
|
"name": pdf_name |
|
|
}) |
|
|
logger.info(f"π Found PDF URL: {absolute_url}") |
|
|
if pdf_name: |
|
|
logger.info(f"π PDF name: {pdf_name}") |
|
|
else: |
|
|
logger.debug(f"β οΈ No href or button-url attribute found on element {i+1}") |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"β Error extracting PDF URL from element {i+1}: {str(e)}") |
|
|
continue |
|
|
except Exception as e: |
|
|
logger.warning(f"Error extracting PDF links: {str(e)}") |
|
|
pdf_links = [] |
|
|
|
|
|
if pdf_links: |
|
|
logger.info(f"π Found {len(pdf_links)} PDF links, processing...") |
|
|
|
|
|
pdf_content_parts = [] |
|
|
for i, pdf_info in enumerate(pdf_links): |
|
|
if MAX_PDF_LIMIT is not None and pdf_count >= MAX_PDF_LIMIT:  # check for None before comparing to avoid a TypeError |
|
|
logger.info(f"π Reached PDF limit ({MAX_PDF_LIMIT}), stopping PDF processing") |
|
|
break |
|
|
|
|
|
|
|
|
if isinstance(pdf_info, dict): |
|
|
pdf_url = pdf_info["url"] |
|
|
pdf_name = pdf_info.get("name", "") |
|
|
else: |
|
|
|
|
|
pdf_url = pdf_info |
|
|
pdf_name = "" |
|
|
|
|
|
try: |
|
|
logger.info(f"π Processing PDF {i+1}/{len(pdf_links)}: {pdf_url}") |
|
|
if pdf_name: |
|
|
logger.info(f"π PDF name: {pdf_name}") |
|
|
|
|
|
|
|
|
download_result = download_and_save_pdf(pdf_url, website_type) |
|
|
if download_result["success"]: |
|
|
|
|
|
pdf_path = download_result["path"] |
|
|
logger.info(f"π PDF downloaded to: {pdf_path}") |
|
|
|
|
|
|
|
|
pdf_content = extract_text_from_pdf_file(pdf_path) |
|
|
|
|
|
if pdf_content and len(pdf_content.strip()) > 10: |
|
|
|
|
|
pdf_label = pdf_name if pdf_name else f"PDF {i+1}" |
|
|
pdf_content_parts.append(f"{pdf_label} Content:\n{pdf_content}") |
|
|
logger.info(f"β Extracted {len(pdf_content)} characters from {pdf_label}") |
|
|
|
|
|
|
|
|
|
|
|
if pdf_name and not title_extracted_from_page and not title: |
|
|
title = pdf_name |
|
|
logger.info(f"π Using PDF name as title (page title extraction failed): {title}") |
|
|
else: |
|
|
logger.warning(f"β οΈ No content extracted from PDF {i+1}") |
|
|
else: |
|
|
logger.warning(f"β Failed to download PDF {i+1}: {download_result.get('message', 'Unknown error')}") |
|
|
|
|
|
pdf_count += 1 |
|
|
logger.info(f"π PDF {pdf_count}/{MAX_PDF_LIMIT} processed") |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"β Error processing PDF {i+1}: {str(e)}") |
|
|
continue |
|
|
|
|
|
|
|
|
if pdf_content_parts: |
|
|
content = "\n\n".join(pdf_content_parts) |
|
|
logger.info(f"π Combined PDF content: {len(content)} characters total") |
|
|
|
|
|
|
|
|
|
|
|
if not title_extracted_from_page and not title and content and len(content) > 50: |
|
|
lines = content.split('\n')[:5] |
|
|
for line in lines: |
|
|
if line.strip() and len(line.strip()) > 10 and len(line.strip()) < 100: |
|
|
title = line.strip() |
|
|
logger.info(f"π Using title extracted from PDF content (page title extraction failed): {title}") |
|
|
break |
|
|
else: |
|
|
logger.warning("β οΈ No PDF content extracted, skipping document") |
|
|
content = "" |
|
|
else: |
|
|
|
|
|
logger.info("π No PDF links found, skipping document") |
|
|
content = "" |
|
|
|
|
|
|
|
|
date_raw = "" |
|
|
|
|
|
|
|
|
if website_type == "mopnd" and document_url in mopnd_article_dates: |
|
|
date_raw = mopnd_article_dates[document_url] |
|
|
logger.debug(f"β Using MOPND date from main page: {date_raw}") |
|
|
elif website_type == "mopnd": |
|
|
logger.warning(f"β οΈ MOPND date not found in cache for URL: {document_url}") |
|
|
logger.debug(f"π Available dates: {list(mopnd_article_dates.keys())[:3]}") |
|
|
else: |
|
|
|
|
|
date_selector = config.get("date") |
|
|
|
|
|
if date_selector: |
|
|
try: |
|
|
date_element = await page.query_selector(date_selector) |
|
|
if date_element: |
|
|
date_raw = await date_element.text_content() |
|
|
if date_raw: |
|
|
date_raw = date_raw.strip() |
|
|
logger.debug(f"β Extracted raw date: {date_raw}") |
|
|
except Exception as e: |
|
|
logger.warning(f"Error extracting date with selector {date_selector}: {str(e)}") |
|
|
|
|
|
|
|
|
date = standardize_date(date_raw, default_to_current=True) |
|
|
if not date: |
|
|
date = datetime.now().strftime("%Y-%m-%d") |
|
|
logger.info(f"No date found with config selector, using current date: {date}") |
|
|
|
|
|
|
|
|
if start_date or end_date: |
|
|
start_dt = parse_date_input(start_date) if start_date else None |
|
|
end_dt = parse_date_input(end_date) if end_date else None |
|
|
if not is_date_in_range(date, start_dt, end_dt, include_missing=False): |
|
|
logger.info(f"π Document date {date} is outside date range [{start_date}, {end_date}] - filtering out") |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
pdf_websites = get_pdf_websites() |
|
|
if website_type in pdf_websites: |
|
|
if not content or len(content.strip()) < 10: |
|
|
logger.info(f"π Skipping document with no PDF content: {document_url}") |
|
|
return None |
|
|
|
|
|
result = { |
|
|
"title": title or "No title found", |
|
|
"content": content or "No content found", |
|
|
"date": date, |
|
|
"url": document_url |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pdf_websites = get_pdf_websites() |
|
|
if website_type in pdf_websites: |
|
|
if pdf_path: |
|
|
result["pdf_path"] = pdf_path |
|
|
logger.info(f"π Added PDF path to result: {pdf_path}") |
|
|
else: |
|
|
logger.warning("β οΈ No PDF path available for PDF-based site") |
|
|
|
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error extracting content from {document_url}: {str(e)}") |
|
|
return { |
|
|
"title": "Error", |
|
|
"content": f"Error extracting content: {str(e)}", |
|
|
"date": datetime.now().strftime("%Y-%m-%d"), |
|
|
"url": document_url |
|
|
} |
|
|
|