"""
Common scraper functions - shared utilities for document and text scraping
"""
import asyncio
import logging
import os
import json
import hashlib
from datetime import datetime
from typing import List, Dict, Any, Optional
from urllib.parse import urljoin, urlparse
from playwright.async_api import async_playwright
# --- Minimal Playwright hardening for headless containers (ADDED) ---
os.environ.setdefault("PLAYWRIGHT_BROWSERS_PATH", "/root/.cache/ms-playwright")
PLAYWRIGHT_LAUNCH_KW = dict(
headless=True, # critical in HF Spaces/containers (no X server)
args=[
"--no-sandbox",
"--disable-setuid-sandbox",
"--disable-dev-shm-usage",
"--disable-gpu",
"--no-zygote",
"--single-process",
"--disable-extensions",
"--disable-background-networking",
],
)
# --------------------------------------------------------------------
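# Usage sketch: these launch kwargs are unpacked directly into Playwright's
# chromium.launch(), as done later in scrape_news_async(). A minimal standalone
# example (assuming Playwright and its Chromium build are installed; the URL is
# a placeholder):
#
#     async with async_playwright() as p:
#         browser = await p.chromium.launch(**PLAYWRIGHT_LAUNCH_KW)
#         page = await browser.new_page()
#         await page.goto("https://example.org")
#         await browser.close()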
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
)
logger = logging.getLogger(__name__)
# Global timeout tracking for problematic URLs
TIMEOUT_URLS = set()
# Global flag for document-only scraping mode (text tab should ignore documents)
DOCUMENT_ONLY_MODE = False
# Global cancellation flag
_scraping_cancelled = False
# Global browser instance for cancellation
current_browser = None
current_page = None
# Global captcha status for UI updates
_captcha_status = None
# Global constants for limiting scraping scope
# Set these to None to disable limits, or to a number to limit
MAX_PDF_LIMIT = 50  # Global limit on the number of PDFs processed/downloaded across all pages
MAX_ARTICLE_LIMIT = 50  # Global limit on the number of articles processed
MAX_PAGE_LIMIT = 50  # Global limit on the number of pages scraped
# Global PDF counter to track PDFs across all pages
global_pdf_count = 0
def reset_global_pdf_count():
"""Reset the global PDF counter"""
global global_pdf_count
global_pdf_count = 0
def increment_global_pdf_count():
"""Increment the global PDF counter and return the new count"""
global global_pdf_count
global_pdf_count += 1
return global_pdf_count
def get_global_pdf_count():
"""Get the current global PDF count"""
return global_pdf_count
def is_pdf_limit_reached():
"""Check if the global PDF limit has been reached"""
if MAX_PDF_LIMIT is None:
return False
return global_pdf_count >= MAX_PDF_LIMIT
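# Usage sketch for the counter helpers above (hypothetical loop; the real
# per-page logic lives in document_scraper):
#
#     reset_global_pdf_count()
#     for pdf_url in pdf_urls:          # pdf_urls is a hypothetical list
#         if is_pdf_limit_reached():
#             break
#         increment_global_pdf_count()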
# Archive management
ARCHIVE_DIR = "archive"
ARCHIVE_INDEX = os.path.join(ARCHIVE_DIR, "archive_index.json")
# Load website configuration
def load_website_config():
"""Load website configuration from JSON file"""
try:
with open('website_config.json', 'r') as f:
config = json.load(f)
logger.info("βœ… Website configuration loaded successfully")
return config
except Exception as e:
logger.error(f"❌ Error loading website configuration: {str(e)}")
return {}
# Load the website configuration
WEBSITE_CONFIG = load_website_config()
def get_pdf_websites() -> List[str]:
"""
Dynamically get list of PDF websites from website_config.json
A website is considered a PDF website if it has 'pdf_links', 'file_links', or 'extract_table_as_csv' in its config
"""
pdf_websites = []
for website_type, config in WEBSITE_CONFIG.items():
if config and isinstance(config, dict):
# Check if config has pdf_links, file_links, or extract_table_as_csv
if config.get("pdf_links") or config.get("file_links") or config.get("extract_table_as_csv"):
pdf_websites.append(website_type)
return pdf_websites
def get_content_websites() -> List[str]:
"""
Dynamically get list of content (text) websites from website_config.json
    A website is considered a content website if it does NOT have 'pdf_links', 'file_links', or 'extract_table_as_csv'
"""
content_websites = []
for website_type, config in WEBSITE_CONFIG.items():
if config and isinstance(config, dict):
if not config.get("pdf_links") and not config.get("file_links"):
content_websites.append(website_type)
return content_websites
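# Illustrative website_config.json shape (an assumption inferred from the keys
# the validator below accepts; the entry names and selectors are placeholders,
# not real configuration):
#
#     {
#         "example_docs_site": {"title": "h1", "pdf_links": ["a.pdf-download"]},
#         "example_news_site": {"title": "h1.entry-title", "content": "div.entry-content", "date": "time"}
#     }
#
# With that config, get_pdf_websites() would return ["example_docs_site"] and
# get_content_websites() would return ["example_news_site"].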
# Debug: Print configured website types when module loads
_debug_pdf_websites = get_pdf_websites()
_debug_content_websites = get_content_websites()
logger.debug(f"πŸ“„ PDF Websites configured ({len(_debug_pdf_websites)}): {sorted(_debug_pdf_websites)}")
logger.debug(f"πŸ“° Content Websites configured ({len(_debug_content_websites)}): {sorted(_debug_content_websites)}")
def validate_website_config(config: dict) -> tuple[bool, str]:
"""
Validate website configuration structure
Args:
config: Configuration dictionary to validate
Returns:
Tuple of (is_valid, error_message)
"""
try:
if not isinstance(config, dict):
return False, "Configuration must be a dictionary"
for website_type, website_config in config.items():
if not isinstance(website_type, str):
return False, f"Website type must be a string, got {type(website_type)}"
# Validate website type name (no spaces, valid identifier)
if ' ' in website_type or not website_type:
return False, f"Website type '{website_type}' must be a valid identifier (no spaces)"
if not isinstance(website_config, dict):
return False, f"Configuration for '{website_type}' must be a dictionary"
            # Require at least one of the 'title' or 'content' fields
if 'title' not in website_config and 'content' not in website_config:
return False, f"Website '{website_type}' must have at least 'title' or 'content' field"
# Validate field types
string_fields = ['article_links', 'page_links', 'title', 'content', 'date',
'navigation_selector', 'navigation_url_addition', 'recaptcha_text']
for field in string_fields:
if field in website_config:
value = website_config[field]
# Allow string, None, or list (for content field)
if value is not None and not isinstance(value, (str, list)):
return False, f"Field '{field}' in '{website_type}' must be string, list, or null"
# Validate start_page (must be integer >= 0)
if 'start_page' in website_config:
start_page = website_config['start_page']
if start_page is not None:
try:
start_page_int = int(start_page)
if start_page_int < 0:
return False, f"'start_page' in '{website_type}' must be >= 0"
except (ValueError, TypeError):
return False, f"'start_page' in '{website_type}' must be an integer"
# Validate array fields
array_fields = ['pdf_links', 'file_links']
for field in array_fields:
if field in website_config:
value = website_config[field]
if value is not None:
if isinstance(value, str):
# Allow string, will be converted to array
pass
elif not isinstance(value, list):
return False, f"Field '{field}' in '{website_type}' must be a list or null"
return True, "Configuration is valid"
except Exception as e:
return False, f"Validation error: {str(e)}"
def save_website_config(config_data: dict) -> tuple[bool, str]:
"""
Save validated website configuration to file
Args:
config_data: Configuration dictionary to save
Returns:
Tuple of (success, message)
"""
global WEBSITE_CONFIG
try:
# Validate the structure first
is_valid, error_message = validate_website_config(config_data)
if not is_valid:
return False, f"Invalid configuration: {error_message}"
# Save to file
with open('website_config.json', 'w', encoding='utf-8') as f:
json.dump(config_data, f, indent=4, ensure_ascii=False)
# Reload the global config
WEBSITE_CONFIG = load_website_config()
logger.info("βœ… Website configuration saved successfully")
return True, "Website configuration saved successfully"
except Exception as e:
error_msg = f"Error saving website config: {str(e)}"
logger.error(f"❌ {error_msg}")
return False, error_msg
def set_document_only_mode(value: bool):
"""Set the global document-only mode flag."""
global DOCUMENT_ONLY_MODE
DOCUMENT_ONLY_MODE = value
def is_document_mode_enabled() -> bool:
"""Check if document-only mode is enabled."""
return DOCUMENT_ONLY_MODE
def set_scraping_cancelled(value: bool):
"""Set the global cancellation flag"""
global _scraping_cancelled
_scraping_cancelled = value
def scraping_cancelled() -> bool:
"""Check if scraping has been cancelled"""
return _scraping_cancelled
def get_captcha_status():
"""Get the current captcha status message"""
global _captcha_status
return _captcha_status
def set_captcha_status(status: str):
"""Set the captcha status message"""
global _captcha_status
_captcha_status = status
def clear_captcha_status():
"""Clear the captcha status"""
global _captcha_status
_captcha_status = None
async def force_close_browser():
"""Force close browser and page instances"""
global current_browser, current_page
try:
if current_page:
await current_page.close()
current_page = None
if current_browser:
await current_browser.close()
current_browser = None
except Exception as e:
logger.error(f"Error closing browser: {str(e)}")
def convert_to_absolute_url(href: str, base_url: str) -> str:
"""
Convert relative URL to absolute URL
"""
if href.startswith(('http://', 'https://')):
return href
return urljoin(base_url, href)
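# Examples (standard urljoin behaviour):
#     convert_to_absolute_url("/docs/report.pdf", "https://example.org/news/")
#     # -> "https://example.org/docs/report.pdf"
#     convert_to_absolute_url("https://example.org/a.pdf", "https://other.org/")
#     # -> "https://example.org/a.pdf" (absolute URLs pass through unchanged)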
def ensure_archive_directory():
"""Ensure archive directory exists"""
if not os.path.exists(ARCHIVE_DIR):
os.makedirs(ARCHIVE_DIR)
logger.info(f"πŸ“ Created archive directory: {ARCHIVE_DIR}")
async def scrape_news_async(url: str, website_type: str, custom_keywords: str = "", start_date: Optional[str] = None, end_date: Optional[str] = None, force_mode: Optional[str] = None) -> List[dict]:
"""
Main entry point for scraping - delegates to appropriate scraper
Args:
url: URL to scrape
website_type: Website type identifier
custom_keywords: Custom keywords for filtering
start_date: Optional start date for filtering
end_date: Optional end date for filtering
force_mode: Force scraper mode - "text" for text scraper, "document" for document scraper, None for auto-detect
"""
try:
logger.info(f"πŸš€ Starting scraping for {website_type} at {url}")
# Determine which scraper to use
use_document_scraper = False
if force_mode == "text":
# Force text scraper
use_document_scraper = False
logger.info(f"πŸ“° Forcing text scraper mode for {website_type}")
elif force_mode == "document":
# Force document scraper
use_document_scraper = True
logger.info(f"πŸ“„ Forcing document scraper mode for {website_type}")
else:
# Auto-detect based on config (backward compatible)
pdf_websites = get_pdf_websites()
use_document_scraper = website_type in pdf_websites
if use_document_scraper:
logger.info(f"πŸ“„ Auto-detected: Using document scraper for {website_type}")
else:
logger.info(f"πŸ“° Auto-detected: Using text scraper for {website_type}")
# Import the appropriate scraper
if use_document_scraper:
# Document-focused sites
from document_scraper import extract_document_content_unified, download_all_pdfs_from_page
else:
# Text-focused sites
from text_scraper import extract_article_content_unified, get_all_article_links_unified, extract_all_articles_unified
# Get website configuration
config = WEBSITE_CONFIG.get(website_type)
if not config:
logger.error(f"❌ No configuration found for website type: {website_type}")
return [{
"title": "Configuration Error",
"content": f"No configuration found for website type: {website_type}",
"date": datetime.now().strftime("%Y-%m-%d"),
"url": url
}]
# Initialize browser
async with async_playwright() as p:
# CHANGED: use hardened, headless launch to avoid X server errors
browser = await p.chromium.launch(**PLAYWRIGHT_LAUNCH_KW)
page = await browser.new_page()
# Block ads, CSS, and images for better performance
await page.route("**/*", lambda route: (
route.abort() if any(blocked in route.request.url.lower() for blocked in [
# Ad domains
"googleads", "doubleclick", "googlesyndication", "google-analytics",
"facebook.com/tr", "googletagmanager", "amazon-adsystem", "adsystem",
"googletagservices", "ads.yahoo.com", "googletagservices",
# CSS files
".css", "stylesheet", "font-awesome", "bootstrap.css",
# Images
".jpg", ".jpeg", ".png", ".gif", ".webp", ".svg", ".ico",
"image/", "img/", "images/", "photos/", "pictures/",
# Fonts
".woff", ".woff2", ".ttf", ".eot", "fonts/", "font/",
# Videos and media
".mp4", ".avi", ".mov", ".wmv", ".flv", "video/", "media/",
# Analytics and tracking
"analytics", "tracking", "metrics", "stats", "telemetry"
]) else route.continue_()
))
# Store browser instance for cancellation
global current_browser, current_page
current_browser = browser
current_page = page
try:
# Navigate to the main page with retry logic (5 attempts)
max_retries = 5
retry_count = 0
page_loaded = False
while retry_count < max_retries and not page_loaded:
try:
retry_count += 1
logger.info(f"πŸ”„ Loading website (attempt {retry_count}/{max_retries}): {url}")
# Navigate with different strategies based on attempt
if retry_count == 1:
# First attempt: Use domcontentloaded for faster loading
await page.goto(url, wait_until="domcontentloaded", timeout=30000)
elif retry_count == 2:
# Second attempt: Use basic loading
await page.goto(url, timeout=20000)
elif retry_count == 3:
# Third attempt: Use networkidle
await page.goto(url, wait_until="networkidle", timeout=15000)
else:
# Fourth and fifth attempts: Try with shorter timeouts
await page.goto(url, timeout=10000)
logger.info(f"βœ… Successfully loaded website on attempt {retry_count}")
page_loaded = True
except Exception as e:
logger.warning(f"⚠️ Attempt {retry_count} failed for {url}: {str(e)}")
if retry_count >= max_retries:
logger.error(f"❌ Failed to load website after {max_retries} attempts: {url}")
return [{
"title": "WEBSITE_LOAD_ERROR",
"content": f"Website is not working. Please try again later. Failed to access website after {max_retries} attempts: {str(e)}",
"date": datetime.now().strftime("%Y-%m-%d"),
"url": url
}]
# Wait before retry
await asyncio.sleep(2)
if not page_loaded:
return [{
"title": "WEBSITE_LOAD_ERROR",
"content": f"Website is not working. Please try again later. Failed to access website after {max_retries} attempts",
"date": datetime.now().strftime("%Y-%m-%d"),
"url": url
}]
# Check for captcha on initial page load
if use_document_scraper:
from document_scraper import check_and_wait_for_recaptcha
captcha_result = await check_and_wait_for_recaptcha(page, config)
if captcha_result == "CAPTCHA_TIMEOUT":
logger.error("❌ Captcha detected but not solved within timeout period")
return [{
"title": "CAPTCHA_ERROR",
"content": "Captcha detected. Please try again later. The website requires captcha verification which could not be completed automatically.",
"date": datetime.now().strftime("%Y-%m-%d"),
"url": url
}]
# Delegate to appropriate scraper based on determined mode
if use_document_scraper:
# Document processing
all_articles = await download_all_pdfs_from_page(page, url, config, website_type, start_date, end_date)
else:
# Text processing
all_article_links = await get_all_article_links_unified(page, url, config, website_type)
if not all_article_links:
return [{
"title": "No articles found",
"content": "No articles were found on the specified page",
"date": datetime.now().strftime("%Y-%m-%d"),
"url": url
}]
# Extract content from all articles
all_articles = await extract_all_articles_unified(page, all_article_links, config, website_type, custom_keywords, start_date, end_date)
return all_articles
finally:
# Clean up browser
await browser.close()
current_browser = None
current_page = None
except Exception as e:
logger.error(f"❌ Error in main scraping function: {str(e)}")
return [{
"title": "Scraping Error",
"content": f"Error during scraping: {str(e)}",
"date": datetime.now().strftime("%Y-%m-%d"),
"url": url
}]
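# Minimal manual-test sketch (the URL and website type below are placeholders;
# substitute a real entry from website_config.json before running):
if __name__ == "__main__":
    async def _demo():
        results = await scrape_news_async(
            url="https://example.org/news",   # placeholder URL (assumption)
            website_type="example_site",      # placeholder config key (assumption)
            force_mode="text",
        )
        for item in results:
            print(item.get("title"), "-", item.get("url"))
    asyncio.run(_demo())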