""" |
|
|
Common scraper functions - shared utilities for document and text scraping |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import logging |
|
|
import os |
|
|
import json |
|
|
import hashlib |
|
|
from datetime import datetime |
|
|
from typing import List, Dict, Any |
|
|
from urllib.parse import urljoin, urlparse |
|
|
from playwright.async_api import async_playwright |
|
|
|
|
|
|
|
|
os.environ.setdefault("PLAYWRIGHT_BROWSERS_PATH", "/root/.cache/ms-playwright") |
|
|
|
|
|

PLAYWRIGHT_LAUNCH_KW = dict(
    headless=True,
    args=[
        "--no-sandbox",
        "--disable-setuid-sandbox",
        "--disable-dev-shm-usage",
        "--disable-gpu",
        "--no-zygote",
        "--single-process",
        "--disable-extensions",
        "--disable-background-networking",
    ],
)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
)
logger = logging.getLogger(__name__)

# URLs that timed out during scraping
TIMEOUT_URLS = set()

# When True, only documents (PDFs/files) are scraped rather than article text
DOCUMENT_ONLY_MODE = False

# Cooperative cancellation flag checked by long-running scraping loops
_scraping_cancelled = False

# Currently active browser/page instances, kept so force_close_browser() can reach them
current_browser = None
current_page = None

# Latest captcha status message, if any
_captcha_status = None

# Hard limits per scraping run (MAX_PDF_LIMIT may be set to None to disable it)
MAX_PDF_LIMIT = 50
MAX_ARTICLE_LIMIT = 50
MAX_PAGE_LIMIT = 50

# Number of PDFs downloaded so far in the current run
global_pdf_count = 0


def reset_global_pdf_count():
    """Reset the global PDF counter"""
    global global_pdf_count
    global_pdf_count = 0


def increment_global_pdf_count():
    """Increment the global PDF counter and return the new count"""
    global global_pdf_count
    global_pdf_count += 1
    return global_pdf_count


def get_global_pdf_count():
    """Get the current global PDF count"""
    return global_pdf_count


def is_pdf_limit_reached():
    """Check if the global PDF limit has been reached"""
    if MAX_PDF_LIMIT is None:
        return False
    return global_pdf_count >= MAX_PDF_LIMIT
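
# Usage sketch (hypothetical call site in a download loop): the document scraper
# is expected to consult the counter before fetching each PDF, roughly like:
#
#     if is_pdf_limit_reached():
#         logger.info("PDF limit reached, skipping remaining documents")
#     else:
#         increment_global_pdf_count()
#         # ... download the PDF ...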
ARCHIVE_DIR = "archive" |
|
|
ARCHIVE_INDEX = os.path.join(ARCHIVE_DIR, "archive_index.json") |
|
|
|
|
|
|
|
|
def load_website_config(): |
|
|
"""Load website configuration from JSON file""" |
|
|
try: |
|
|
with open('website_config.json', 'r') as f: |
|
|
config = json.load(f) |
|
|
logger.info("β
Website configuration loaded successfully") |
|
|
return config |
|
|
except Exception as e: |
|
|
logger.error(f"β Error loading website configuration: {str(e)}") |
|
|
return {} |
|
|
|
|
|
|
|
|
WEBSITE_CONFIG = load_website_config() |
|
|
|
|
|


def get_pdf_websites() -> List[str]:
    """
    Dynamically get the list of PDF websites from website_config.json.
    A website is considered a PDF website if its config has 'pdf_links',
    'file_links', or 'extract_table_as_csv'.
    """
    pdf_websites = []
    for website_type, config in WEBSITE_CONFIG.items():
        if config and isinstance(config, dict):
            if config.get("pdf_links") or config.get("file_links") or config.get("extract_table_as_csv"):
                pdf_websites.append(website_type)
    return pdf_websites


def get_content_websites() -> List[str]:
    """
    Dynamically get the list of content (text) websites from website_config.json.
    A website is considered a content website if its config has neither
    'pdf_links' nor 'file_links'.
    """
    content_websites = []
    for website_type, config in WEBSITE_CONFIG.items():
        if config and isinstance(config, dict):
            if not config.get("pdf_links") and not config.get("file_links"):
                content_websites.append(website_type)
    return content_websites


# Log the detected classification once at import time for easier debugging.
_debug_pdf_websites = get_pdf_websites()
_debug_content_websites = get_content_websites()
logger.debug(f"PDF websites configured ({len(_debug_pdf_websites)}): {sorted(_debug_pdf_websites)}")
logger.debug(f"Content websites configured ({len(_debug_content_websites)}): {sorted(_debug_content_websites)}")


def validate_website_config(config: dict) -> tuple[bool, str]:
    """
    Validate website configuration structure

    Args:
        config: Configuration dictionary to validate

    Returns:
        Tuple of (is_valid, error_message)
    """
    try:
        if not isinstance(config, dict):
            return False, "Configuration must be a dictionary"

        for website_type, website_config in config.items():
            if not isinstance(website_type, str):
                return False, f"Website type must be a string, got {type(website_type)}"

            # Website type must be a non-empty identifier without spaces
            if ' ' in website_type or not website_type:
                return False, f"Website type '{website_type}' must be a valid identifier (no spaces)"

            if not isinstance(website_config, dict):
                return False, f"Configuration for '{website_type}' must be a dictionary"

            # Every website needs at least a title or a content selector
            if 'title' not in website_config and 'content' not in website_config:
                return False, f"Website '{website_type}' must have at least 'title' or 'content' field"

            # Selector-style fields may be a string, a list of strings, or null
            string_fields = ['article_links', 'page_links', 'title', 'content', 'date',
                             'navigation_selector', 'navigation_url_addition', 'recaptcha_text']
            for field in string_fields:
                if field in website_config:
                    value = website_config[field]
                    if value is not None and not isinstance(value, (str, list)):
                        return False, f"Field '{field}' in '{website_type}' must be string, list, or null"

            # 'start_page' must be a non-negative integer when present
            if 'start_page' in website_config:
                start_page = website_config['start_page']
                if start_page is not None:
                    try:
                        start_page_int = int(start_page)
                        if start_page_int < 0:
                            return False, f"'start_page' in '{website_type}' must be >= 0"
                    except (ValueError, TypeError):
                        return False, f"'start_page' in '{website_type}' must be an integer"

            # 'pdf_links' and 'file_links' may be a list, a single string, or null
            array_fields = ['pdf_links', 'file_links']
            for field in array_fields:
                if field in website_config:
                    value = website_config[field]
                    if value is not None:
                        if isinstance(value, str):
                            # A single selector string is accepted as-is
                            pass
                        elif not isinstance(value, list):
                            return False, f"Field '{field}' in '{website_type}' must be a list or null"

        return True, "Configuration is valid"

    except Exception as e:
        return False, f"Validation error: {str(e)}"


def save_website_config(config_data: dict) -> tuple[bool, str]:
    """
    Save validated website configuration to file

    Args:
        config_data: Configuration dictionary to save

    Returns:
        Tuple of (success, message)
    """
    global WEBSITE_CONFIG

    try:
        # Validate before writing anything to disk
        is_valid, error_message = validate_website_config(config_data)
        if not is_valid:
            return False, f"Invalid configuration: {error_message}"

        with open('website_config.json', 'w', encoding='utf-8') as f:
            json.dump(config_data, f, indent=4, ensure_ascii=False)

        # Reload the in-memory configuration so the change takes effect immediately
        WEBSITE_CONFIG = load_website_config()

        logger.info("Website configuration saved successfully")
        return True, "Website configuration saved successfully"

    except Exception as e:
        error_msg = f"Error saving website config: {str(e)}"
        logger.error(error_msg)
        return False, error_msg


def set_document_only_mode(value: bool):
    """Set the global document-only mode flag."""
    global DOCUMENT_ONLY_MODE
    DOCUMENT_ONLY_MODE = value


def is_document_mode_enabled() -> bool:
    """Check if document-only mode is enabled."""
    return DOCUMENT_ONLY_MODE


def set_scraping_cancelled(value: bool):
    """Set the global cancellation flag"""
    global _scraping_cancelled
    _scraping_cancelled = value


def scraping_cancelled() -> bool:
    """Check if scraping has been cancelled"""
    return _scraping_cancelled


def get_captcha_status():
    """Get the current captcha status message"""
    return _captcha_status


def set_captcha_status(status: str):
    """Set the captcha status message"""
    global _captcha_status
    _captcha_status = status


def clear_captcha_status():
    """Clear the captcha status"""
    global _captcha_status
    _captcha_status = None


async def force_close_browser():
    """Force close browser and page instances"""
    global current_browser, current_page
    try:
        if current_page:
            await current_page.close()
            current_page = None
        if current_browser:
            await current_browser.close()
            current_browser = None
    except Exception as e:
        logger.error(f"Error closing browser: {str(e)}")


def convert_to_absolute_url(href: str, base_url: str) -> str:
    """
    Convert relative URL to absolute URL
    """
    if href.startswith(('http://', 'https://')):
        return href
    return urljoin(base_url, href)
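
# Example (hypothetical URLs): relative links found on a listing page are
# resolved against that page's URL before being fetched.
#
#     convert_to_absolute_url("/docs/report.pdf", "https://example.org/news/")
#     # -> "https://example.org/docs/report.pdf"
#     convert_to_absolute_url("https://example.org/a.pdf", "https://other.org/")
#     # -> returned unchanged, already absolute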


def ensure_archive_directory():
    """Ensure archive directory exists"""
    if not os.path.exists(ARCHIVE_DIR):
        os.makedirs(ARCHIVE_DIR)
        logger.info(f"Created archive directory: {ARCHIVE_DIR}")


async def scrape_news_async(url: str, website_type: str, custom_keywords: str = "", start_date: str = None, end_date: str = None, force_mode: str = None) -> List[dict]:
    """
    Main entry point for scraping - delegates to the appropriate scraper

    Args:
        url: URL to scrape
        website_type: Website type identifier
        custom_keywords: Custom keywords for filtering
        start_date: Optional start date for filtering
        end_date: Optional end date for filtering
        force_mode: Force scraper mode - "text" for the text scraper, "document" for the document scraper, None for auto-detect
    """
    try:
        logger.info(f"Starting scraping for {website_type} at {url}")

        # Decide which scraper to use: force_mode wins when given, otherwise
        # auto-detect from the website configuration.
        use_document_scraper = False

        if force_mode == "text":
            use_document_scraper = False
            logger.info(f"Forcing text scraper mode for {website_type}")
        elif force_mode == "document":
            use_document_scraper = True
            logger.info(f"Forcing document scraper mode for {website_type}")
        else:
            pdf_websites = get_pdf_websites()
            use_document_scraper = website_type in pdf_websites
            if use_document_scraper:
                logger.info(f"Auto-detected: using document scraper for {website_type}")
            else:
                logger.info(f"Auto-detected: using text scraper for {website_type}")

        # Deferred imports: only the scraper module that is actually needed gets loaded
        if use_document_scraper:
            from document_scraper import extract_document_content_unified, download_all_pdfs_from_page
        else:
            from text_scraper import extract_article_content_unified, get_all_article_links_unified, extract_all_articles_unified

        config = WEBSITE_CONFIG.get(website_type)
        if not config:
            logger.error(f"No configuration found for website type: {website_type}")
            return [{
                "title": "Configuration Error",
                "content": f"No configuration found for website type: {website_type}",
                "date": datetime.now().strftime("%Y-%m-%d"),
                "url": url
            }]

        async with async_playwright() as p:
            browser = await p.chromium.launch(**PLAYWRIGHT_LAUNCH_KW)
            page = await browser.new_page()

            # Abort requests for ads, trackers, styling, images, fonts and media
            # so page loads stay fast; everything else is allowed through.
            await page.route("**/*", lambda route: (
                route.abort() if any(blocked in route.request.url.lower() for blocked in [
                    # Ad and tracking networks
                    "googleads", "doubleclick", "googlesyndication", "google-analytics",
                    "facebook.com/tr", "googletagmanager", "amazon-adsystem", "adsystem",
                    "googletagservices", "ads.yahoo.com",
                    # Stylesheets
                    ".css", "stylesheet", "font-awesome", "bootstrap.css",
                    # Images
                    ".jpg", ".jpeg", ".png", ".gif", ".webp", ".svg", ".ico",
                    "image/", "img/", "images/", "photos/", "pictures/",
                    # Fonts
                    ".woff", ".woff2", ".ttf", ".eot", "fonts/", "font/",
                    # Video and other media
                    ".mp4", ".avi", ".mov", ".wmv", ".flv", "video/", "media/",
                    # Analytics and telemetry
                    "analytics", "tracking", "metrics", "stats", "telemetry"
                ]) else route.continue_()
            ))

            # Expose the live browser/page so force_close_browser() can shut them down
            global current_browser, current_page
            current_browser = browser
            current_page = page

            try:
                # Load the page, retrying with progressively looser wait conditions
                max_retries = 5
                retry_count = 0
                page_loaded = False

                while retry_count < max_retries and not page_loaded:
                    try:
                        retry_count += 1
                        logger.info(f"Loading website (attempt {retry_count}/{max_retries}): {url}")

                        if retry_count == 1:
                            # First attempt: wait for DOM content with a generous timeout
                            await page.goto(url, wait_until="domcontentloaded", timeout=30000)
                        elif retry_count == 2:
                            # Second attempt: default wait condition, shorter timeout
                            await page.goto(url, timeout=20000)
                        elif retry_count == 3:
                            # Third attempt: wait for network idle
                            await page.goto(url, wait_until="networkidle", timeout=15000)
                        else:
                            # Remaining attempts: default wait condition, shortest timeout
                            await page.goto(url, timeout=10000)

                        logger.info(f"Successfully loaded website on attempt {retry_count}")
                        page_loaded = True

                    except Exception as e:
                        logger.warning(f"Attempt {retry_count} failed for {url}: {str(e)}")

                        if retry_count >= max_retries:
                            logger.error(f"Failed to load website after {max_retries} attempts: {url}")
                            return [{
                                "title": "WEBSITE_LOAD_ERROR",
                                "content": f"Website is not working. Please try again later. Failed to access website after {max_retries} attempts: {str(e)}",
                                "date": datetime.now().strftime("%Y-%m-%d"),
                                "url": url
                            }]

                        # Brief pause before the next attempt
                        await asyncio.sleep(2)

                if not page_loaded:
                    return [{
                        "title": "WEBSITE_LOAD_ERROR",
                        "content": f"Website is not working. Please try again later. Failed to access website after {max_retries} attempts",
                        "date": datetime.now().strftime("%Y-%m-%d"),
                        "url": url
                    }]

                # Document sites may be protected by a captcha; check and wait for it
                if use_document_scraper:
                    from document_scraper import check_and_wait_for_recaptcha
                    captcha_result = await check_and_wait_for_recaptcha(page, config)
                    if captcha_result == "CAPTCHA_TIMEOUT":
                        logger.error("Captcha detected but not solved within timeout period")
                        return [{
                            "title": "CAPTCHA_ERROR",
                            "content": "Captcha detected. Please try again later. The website requires captcha verification which could not be completed automatically.",
                            "date": datetime.now().strftime("%Y-%m-%d"),
                            "url": url
                        }]

                if use_document_scraper:
                    # Document scraper: download all matching PDFs/files from the page
                    all_articles = await download_all_pdfs_from_page(page, url, config, website_type, start_date, end_date)
                else:
                    # Text scraper: collect article links first, then extract each article
                    all_article_links = await get_all_article_links_unified(page, url, config, website_type)

                    if not all_article_links:
                        return [{
                            "title": "No articles found",
                            "content": "No articles were found on the specified page",
                            "date": datetime.now().strftime("%Y-%m-%d"),
                            "url": url
                        }]

                    all_articles = await extract_all_articles_unified(page, all_article_links, config, website_type, custom_keywords, start_date, end_date)

                return all_articles

            finally:
                # Always release the browser, even if scraping failed
                await browser.close()
                current_browser = None
                current_page = None

    except Exception as e:
        logger.error(f"Error in main scraping function: {str(e)}")
        return [{
            "title": "Scraping Error",
            "content": f"Error during scraping: {str(e)}",
            "date": datetime.now().strftime("%Y-%m-%d"),
            "url": url
        }]
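

# Minimal usage sketch (assumes a hypothetical "example_news" entry exists in
# website_config.json and that the URL below is reachable); intended only for
# ad-hoc local testing of this module.
if __name__ == "__main__":
    async def _demo():
        articles = await scrape_news_async(
            url="https://example.org/news",   # hypothetical URL
            website_type="example_news",      # hypothetical config key
            custom_keywords="budget",
            force_mode="text",                # skip auto-detection for the demo
        )
        for article in articles:
            print(article.get("date"), article.get("title"))

    asyncio.run(_demo())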