"""
PRODUCTION-READY TRUTH REVELATION API
Complete system with proper architecture, error handling, and scalability
"""
| |
|
import asyncio
import hashlib
import json
import logging
import os
import tempfile
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass, asdict
from enum import Enum
from typing import Dict, List, Any, Optional, Tuple

import aiofiles
import cv2
import numpy as np
import prometheus_client
import psutil
import torch
import torch.nn as nn
from fastapi import FastAPI, HTTPException, UploadFile, File, Form, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, Response
from PIL import Image
from prometheus_client import Counter, Histogram, Gauge
from pydantic import BaseModel, Field
from redis import asyncio as aioredis
from scipy import ndimage
from torchvision import models, transforms
| |
|
| | |
class Config:
    """Process-wide settings, resolved from environment variables at import time.

    Every value has a default, so the service starts with an empty
    environment. The integer settings are parsed with ``int()``; a
    non-numeric override raises ``ValueError`` while this class body executes.
    """

    # Connection URL handed to the Redis client by the cache layer.
    REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
    # NOTE(review): not referenced by the code visible in this file.
    MODEL_CACHE_SIZE = int(os.getenv("MODEL_CACHE_SIZE", "100"))
    # Upload size ceiling in bytes; the default is 10 * 1024 * 1024.
    MAX_IMAGE_SIZE = int(os.getenv("MAX_IMAGE_SIZE", "10485760"))
    # NOTE(review): presumably seconds; not referenced by the visible code.
    REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "30"))
    # Name of a ``logging`` level constant, e.g. "INFO" or "DEBUG".
    LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")

    # Score cut-offs.
    # NOTE(review): none of these is referenced by the code visible in this file.
    HIGH_TRUTH_THRESHOLD = 0.75
    MEDIUM_TRUTH_THRESHOLD = 0.6
    MIN_CONFIDENCE = 0.3
| |
|
| | |
# Configure root logging once, at import time.
# The level name comes from the environment, so it is normalised to upper case
# and anything that does not resolve to a numeric logging level falls back to
# INFO instead of raising during import.
_configured_level = getattr(logging, str(Config.LOG_LEVEL).upper(), None)
logging.basicConfig(
    level=_configured_level if isinstance(_configured_level, int) else logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Shared logger used by every component in this module.
logger = logging.getLogger("truth_revelation_api")
| |
|
| | |
# Prometheus metrics, registered in the default registry at import time.
# Requests received, labelled by HTTP method and endpoint path.
REQUEST_COUNT = Counter('request_total', 'Total requests', ['method', 'endpoint'])
# Wall-clock time spent handling a request, in seconds.
REQUEST_DURATION = Histogram('request_duration_seconds', 'Request duration')
# Number of requests currently being handled.
ACTIVE_REQUESTS = Gauge('active_requests', 'Active requests')
# Distribution of the truth scores returned to clients (scores are capped at 1.0).
TRUTH_SCORE_DISTRIBUTION = Histogram(
    'truth_score',
    'Truth score distribution',
    buckets=[0.1, 0.3, 0.5, 0.7, 0.9, 1.0],
)
| |
|
| | |
class AnalysisRequest(BaseModel):
    """JSON request body accepted by the text analysis endpoint."""

    # Text to score; the endpoint treats a missing value as the empty string.
    text_content: Optional[str] = Field(None, description="Text content to analyze")
    # Free-form domain label; when present it is woven into the generated prompt.
    domain: Optional[str] = Field(None, description="Artistic domain")
    # Arbitrary caller-supplied metadata; a fresh dict is created per request.
    context: Dict[str, Any] = Field(default_factory=dict)
| |
|
class ImageAnalysisRequest(BaseModel):
    """Metadata model for an image analysis request.

    NOTE(review): the image endpoint visible in this file reads its fields from
    multipart form data and does not reference this model.
    """

    # Optional human-written description of the image.
    description: Optional[str] = Field(None, description="Image description for context")
    # Arbitrary caller-supplied metadata; a fresh dict is created per request.
    context: Dict[str, Any] = Field(default_factory=dict)
| |
|
class AnalysisResponse(BaseModel):
    """Response body shared by the text and image analysis endpoints."""

    # Identifier generated by the endpoint for this request.
    request_id: str
    # Outcome label; the endpoints in this file always send "completed".
    status: str
    # Heuristic score produced by the analyzer (capped at 1.0).
    truth_score: float
    # Fixed per-endpoint confidence value.
    confidence: float
    # Archetype labels detected by the analyzer.
    archetypes: List[str]
    # Visual pattern labels; the text endpoint always sends an empty list.
    patterns: List[str]
    # Comma-separated prompt assembled from the analysis.
    visualization_prompt: Optional[str] = None
    # Seconds the analyzer spent on the input.
    processing_time: float
    # Server-local time formatted as "%Y-%m-%d %H:%M:%S".
    timestamp: str
| |
|
class HealthResponse(BaseModel):
    """Response body of the health check endpoint."""

    # Overall service state label.
    status: str
    # API version string.
    version: str
    # True when the cache layer currently holds a Redis client.
    redis_connected: bool
    # Memory used by this process, as a percentage (psutil ``memory_percent``).
    memory_usage: float
    # Number of requests being handled when the check ran.
    active_requests: int
| |
|
| | |
class ArtisticDomain(str, Enum):
    """Closed set of artistic domain labels.

    Members are also ``str`` instances, so they compare equal to their plain
    string values and serialise as such.
    """

    LITERATURE = "literature"
    VISUAL_ARTS = "visual_arts"
    MUSIC = "music"
    PERFORMING_ARTS = "performing_arts"
    ARCHITECTURE = "architecture"
| |
|
class TruthArchetype(str, Enum):
    """Closed set of archetype labels.

    The values match the archetype keys used by the text analyzer's keyword
    table. Members are also ``str`` instances.
    """

    COSMIC_REVELATION = "cosmic_revelation"
    HISTORICAL_CIPHER = "historical_cipher"
    CONSCIOUSNESS_CODE = "consciousness_code"
    ESOTERIC_SYMBOL = "esoteric_symbol"
| |
|
| | |
class ProductionImageAnalyzer:
    """Score an image with hand-written OpenCV/NumPy heuristics.

    The analysis combines edge density and histogram entropy ("complexity"),
    left/right mirror similarity ("symmetry"), the share of pixels falling in
    three HSV colour bands, Hough-circle detection and a few thresholds on
    those values into one score capped at 1.0.

    A ResNet-50 and a matching preprocessing pipeline are created in
    ``__init__``; none of the analysis methods in this class uses them.
    """

    # Weight of each component in the final score (the weights sum to 1.0).
    _SCORE_WEIGHTS = {
        'complexity': 0.25,
        'symmetry': 0.20,
        'color': 0.25,
        'patterns': 0.15,
        'archetypes': 0.15,
    }

    # Inclusive HSV (lower, upper) bounds passed to cv2.inRange, keyed by the
    # label reported in the colour analysis.
    _COLOR_RANGES = {
        'spiritual_gold': ([20, 100, 100], [30, 255, 255]),
        'divine_purple': ([130, 50, 50], [160, 255, 255]),
        'cosmic_blue': ([100, 50, 50], [130, 255, 255]),
    }

    def __init__(self):
        # Loaded eagerly so a broken model installation fails at construction.
        self.model = self._load_model()
        # Resize to 224x224, convert to a tensor and normalise per channel.
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

    def _load_model(self):
        """Load a pretrained ResNet-50 in eval mode, on the GPU when available.

        Raises:
            Exception: whatever the model constructor raises, after logging it.
        """
        try:
            # NOTE(review): recent torchvision releases deprecate
            # ``pretrained=True`` in favour of ``weights=...``; kept as-is so
            # the call still works on older versions.
            model = models.resnet50(pretrained=True)
            model.eval()
            if torch.cuda.is_available():
                model = model.cuda()
            logger.info("Production model loaded successfully")
            return model
        except Exception as e:
            logger.error(f"Failed to load model: {e}")
            raise

    async def analyze_image(self, image_path: str) -> Dict[str, Any]:
        """Run every heuristic on the image stored at ``image_path``.

        All of the work is synchronous NumPy/OpenCV code, so awaiting this
        coroutine occupies the event loop for the whole analysis.

        Args:
            image_path: Path of an image file that PIL can open.

        Returns:
            A dict with the keys ``truth_score``, ``complexity``, ``symmetry``,
            ``color_analysis``, ``patterns``, ``archetypes`` and
            ``processing_time`` (seconds).

        Raises:
            Exception: any error from opening or decoding the image is logged
                and re-raised. The individual heuristics fall back to neutral
                values instead of raising.
        """
        try:
            start_time = time.time()

            # The context manager releases the file handle as soon as the
            # pixel data has been copied into the array.
            with Image.open(image_path) as image:
                img_array = np.array(image.convert('RGB'))

            complexity = self._calculate_complexity(img_array)
            symmetry = self._analyze_symmetry(img_array)
            color_analysis = await self._analyze_colors(img_array)
            patterns = await self._detect_patterns(img_array)
            archetypes = await self._detect_archetypes(img_array)

            truth_score = self._calculate_truth_score(
                complexity, symmetry, color_analysis, patterns, archetypes
            )

            processing_time = time.time() - start_time
            logger.info(f"Image analysis completed in {processing_time:.2f}s")

            return {
                "truth_score": truth_score,
                "complexity": complexity,
                "symmetry": symmetry,
                "color_analysis": color_analysis,
                "patterns": patterns,
                "archetypes": archetypes,
                "processing_time": processing_time
            }

        except Exception as e:
            logger.error(f"Image analysis failed: {e}")
            raise

    def _calculate_complexity(self, img_array: np.ndarray) -> float:
        """Return the mean of edge density and normalised entropy, capped at 1.0.

        Edge density is the fraction of pixels marked by Canny; entropy is the
        Shannon entropy of the 256-bin grayscale histogram divided by 8 (its
        maximum in bits). Returns 0.5 when the computation fails.
        """
        try:
            gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
            edges = cv2.Canny(gray, 50, 150)
            edge_density = np.sum(edges > 0) / edges.size

            hist = cv2.calcHist([gray], [0], None, [256], [0, 256])
            hist = hist / hist.sum()
            # The small epsilon keeps log2 finite for empty bins.
            entropy = -np.sum(hist * np.log2(hist + 1e-8)) / 8.0

            return float(min(1.0, (edge_density + entropy) / 2))
        except Exception as e:
            logger.warning(f"Complexity calculation failed: {e}")
            return 0.5

    @staticmethod
    def _mirror_similarity(gray: np.ndarray) -> float:
        """Return the left/right mirror similarity of a 2-D uint8 image.

        The left half is compared with the horizontally flipped right half;
        for an odd width the centre column is ignored. The result is
        ``1 - mean(|left - right|) / 255``.
        """
        width = gray.shape[1]
        left = gray[:, :width // 2]
        mirrored_right = gray[:, width // 2:][:, ::-1]
        columns = min(left.shape[1], mirrored_right.shape[1])
        if columns == 0 or gray.shape[0] == 0:
            # Nothing to compare (image narrower than two pixels): treat it
            # as trivially symmetric rather than returning NaN.
            return 1.0

        # Widen before subtracting: uint8 arithmetic wraps around, which made
        # large differences look small.
        difference = np.abs(
            left[:, :columns].astype(np.int16)
            - mirrored_right[:, :columns].astype(np.int16)
        )
        return float(1.0 - difference.mean() / 255.0)

    def _analyze_symmetry(self, img_array: np.ndarray) -> float:
        """Return the left/right mirror similarity of an RGB image.

        Returns 0.5 when the computation fails.
        """
        try:
            gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
            return self._mirror_similarity(gray)
        except Exception as e:
            logger.warning(f"Symmetry analysis failed: {e}")
            return 0.5

    async def _analyze_colors(self, img_array: np.ndarray) -> Dict[str, float]:
        """Return, per colour band, the share of matching pixels times 5, capped at 1.0.

        Returns an empty dict when the computation fails.
        """
        try:
            hsv = cv2.cvtColor(img_array, cv2.COLOR_RGB2HSV)

            color_presence = {}
            for color_name, (lower, upper) in self._COLOR_RANGES.items():
                mask = cv2.inRange(hsv, np.array(lower), np.array(upper))
                presence = np.sum(mask > 0) / mask.size
                color_presence[color_name] = float(min(1.0, presence * 5))

            return color_presence
        except Exception as e:
            logger.warning(f"Color analysis failed: {e}")
            return {}

    async def _detect_patterns(self, img_array: np.ndarray) -> List[str]:
        """Return pattern labels found in the image.

        ``"sacred_geometry"`` is reported when the Hough transform finds more
        than two circles, ``"harmonic_balance"`` when the symmetry score
        exceeds 0.7. Returns an empty list when detection fails.
        """
        try:
            patterns = []
            gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)

            circles = cv2.HoughCircles(gray, cv2.HOUGH_GRADIENT, 1, 20,
                                       param1=50, param2=30, minRadius=5, maxRadius=100)
            if circles is not None and len(circles[0]) > 2:
                patterns.append("sacred_geometry")

            if self._analyze_symmetry(img_array) > 0.7:
                patterns.append("harmonic_balance")

            return patterns
        except Exception as e:
            logger.warning(f"Pattern detection failed: {e}")
            return []

    async def _detect_archetypes(self, img_array: np.ndarray) -> List[str]:
        """Return archetype labels derived from complexity and colour.

        ``"complex_symbolism"`` is reported when complexity exceeds 0.7,
        ``"cosmic_revelation"`` when the ``cosmic_blue`` presence exceeds 0.3.
        Returns an empty list when detection fails.
        """
        try:
            archetypes = []

            if self._calculate_complexity(img_array) > 0.7:
                archetypes.append("complex_symbolism")

            color_analysis = await self._analyze_colors(img_array)
            if color_analysis.get('cosmic_blue', 0) > 0.3:
                archetypes.append("cosmic_revelation")

            return archetypes
        except Exception as e:
            logger.warning(f"Archetype detection failed: {e}")
            return []

    def _calculate_truth_score(self, complexity: float, symmetry: float,
                               color_analysis: Dict[str, float], patterns: List[str],
                               archetypes: List[str]) -> float:
        """Combine the component scores into one weighted score capped at 1.0.

        The colour component is the mean of the colour presences (0.0 when
        there are none); patterns and archetypes each contribute 0.1 per
        label before weighting.
        """
        weights = self._SCORE_WEIGHTS

        color_score = np.mean(list(color_analysis.values())) if color_analysis else 0.0
        pattern_score = len(patterns) * 0.1
        archetype_score = len(archetypes) * 0.1

        score = (complexity * weights['complexity'] +
                 symmetry * weights['symmetry'] +
                 color_score * weights['color'] +
                 pattern_score * weights['patterns'] +
                 archetype_score * weights['archetypes'])

        # Plain float keeps the value serialisable regardless of NumPy types.
        return float(min(1.0, score))
| |
|
class TextAnalyzer:
    """Score text with keyword heuristics.

    The score combines the share of "symbolic" words, the share of
    "emotional" words and the number of archetypes whose keywords occur in
    the text.
    """

    # Words counted by _calculate_symbolic_density.
    _SYMBOLIC_TERMS = frozenset({
        'light', 'dark', 'water', 'fire', 'earth', 'air', 'journey',
        'transformation', 'truth', 'reality', 'consciousness', 'cosmic',
    })

    # Words counted by _assess_emotional_impact.
    _EMOTIONAL_TERMS = frozenset({
        'love', 'fear', 'hope', 'despair', 'joy', 'sorrow', 'passion',
        'rage', 'ecstasy', 'terror', 'bliss', 'anguish',
    })

    # Archetype label -> keywords; one keyword occurrence is enough to report it.
    _ARCHETYPE_KEYWORDS = {
        'cosmic_revelation': ['cosmic', 'universe', 'galaxy', 'star', 'nebula'],
        'historical_cipher': ['ancient', 'civilization', 'lost', 'artifact'],
        'consciousness_code': ['mind', 'awareness', 'consciousness', 'dream'],
        'esoteric_symbol': ['symbol', 'sacred', 'mystery', 'hidden'],
    }

    # Punctuation removed from both ends of a token before matching.
    _STRIP_CHARS = ".,;:!?\"'()[]{}<>-"

    async def analyze_text(self, text: str, domain: Optional[str] = None) -> Dict[str, Any]:
        """Analyze ``text`` and return its scores.

        Args:
            text: The text to score.
            domain: Accepted for interface compatibility; not used by the
                analysis.

        Returns:
            A dict with the keys ``truth_score``, ``word_count``
            (whitespace-separated tokens), ``symbolic_density``,
            ``emotional_impact``, ``archetypes`` and ``processing_time``
            (seconds).

        Raises:
            Exception: any failure is logged and re-raised.
        """
        try:
            start_time = time.time()

            word_count = len(text.split())
            symbolic_density = self._calculate_symbolic_density(text)
            emotional_impact = self._assess_emotional_impact(text)
            archetypes = self._detect_text_archetypes(text)

            truth_score = self._calculate_text_truth_score(
                symbolic_density, emotional_impact, archetypes
            )

            processing_time = time.time() - start_time

            return {
                "truth_score": truth_score,
                "word_count": word_count,
                "symbolic_density": symbolic_density,
                "emotional_impact": emotional_impact,
                "archetypes": archetypes,
                "processing_time": processing_time
            }

        except Exception as e:
            logger.error(f"Text analysis failed: {e}")
            raise

    def _tokenize(self, text: str) -> List[str]:
        """Return lower-cased words with surrounding punctuation removed.

        Tokens that consist only of punctuation are dropped, so "truth," and
        "(truth)" both yield "truth".
        """
        stripped = (token.strip(self._STRIP_CHARS) for token in text.lower().split())
        return [token for token in stripped if token]

    def _term_ratio(self, text: str, terms, scale: float) -> float:
        """Return the share of words found in ``terms`` times ``scale``, capped at 1.0."""
        words = self._tokenize(text)
        if not words:
            return 0.0
        matches = sum(1 for word in words if word in terms)
        return min(1.0, matches / len(words) * scale)

    def _calculate_symbolic_density(self, text: str) -> float:
        """Return the share of symbolic words times 5, capped at 1.0 (0.0 for no words)."""
        return self._term_ratio(text, self._SYMBOLIC_TERMS, 5)

    def _assess_emotional_impact(self, text: str) -> float:
        """Return the share of emotional words times 3, capped at 1.0 (0.0 for no words)."""
        return self._term_ratio(text, self._EMOTIONAL_TERMS, 3)

    def _detect_text_archetypes(self, text: str) -> List[str]:
        """Return the archetypes that have at least one keyword in ``text``.

        Matching is a case-insensitive substring test, so a keyword also
        matches inside longer words ("star" matches "stars" and "start").
        Labels are returned in the order of the keyword table.
        """
        text_lower = text.lower()
        return [
            archetype
            for archetype, keywords in self._ARCHETYPE_KEYWORDS.items()
            if any(keyword in text_lower for keyword in keywords)
        ]

    def _calculate_text_truth_score(self, symbolic_density: float,
                                    emotional_impact: float, archetypes: List[str]) -> float:
        """Return ``0.4 * density + 0.3 * impact + 0.1 per archetype``, capped at 1.0."""
        base_score = (symbolic_density * 0.4 + emotional_impact * 0.3)
        archetype_boost = len(archetypes) * 0.1
        return min(1.0, base_score + archetype_boost)
| |
|
| | |
class CacheManager:
    """Best-effort Redis cache.

    Every operation degrades to a no-op (or a miss) when Redis is not
    connected or a call fails, so callers never have to handle cache errors.
    """

    def __init__(self):
        # Redis client; None until connect() succeeds and again after close().
        self.redis = None

    async def connect(self):
        """Create the Redis client and verify the connection with a ping.

        On any failure the error is logged, the half-open client is closed
        and ``self.redis`` is left as ``None``.
        """
        try:
            self.redis = await aioredis.from_url(Config.REDIS_URL, decode_responses=True)
            await self.redis.ping()
            logger.info("Redis connected successfully")
        except Exception as e:
            logger.error(f"Redis connection failed: {e}")
            # Release whatever was created before the failure.
            await self.close()

    async def get(self, key: str) -> Optional[str]:
        """Return the cached value for ``key``, or ``None`` on a miss or error."""
        if not self.redis:
            return None
        try:
            return await self.redis.get(key)
        except Exception as e:
            logger.warning(f"Cache get failed: {e}")
            return None

    async def set(self, key: str, value: str, expire: int = 3600):
        """Store ``value`` under ``key`` for ``expire`` seconds; errors are logged."""
        if not self.redis:
            return
        try:
            await self.redis.set(key, value, ex=expire)
        except Exception as e:
            logger.warning(f"Cache set failed: {e}")

    async def close(self):
        """Close the Redis client, if any, and forget it.

        The reference is cleared first so the health check stops reporting a
        connection, and a failing close cannot abort application shutdown.
        """
        client, self.redis = self.redis, None
        if client is None:
            return
        try:
            await client.close()
        except Exception as e:
            logger.warning(f"Redis close failed: {e}")
| |
|
| | |
class TruthRevelationAPI:
    """Assemble the FastAPI application: middleware, lifecycle hooks and routes.

    The instance owns the cache and both analyzers; the configured
    application is exposed as ``self.app``.
    """

    def __init__(self):
        self.app = FastAPI(
            title="Truth Revelation API",
            description="Production-ready API for artistic and visual truth analysis",
            version="1.0.0"
        )
        self.cache = CacheManager()
        self.image_analyzer = ProductionImageAnalyzer()
        self.text_analyzer = TextAnalyzer()
        self.setup_middleware()
        self.setup_routes()

    def setup_middleware(self):
        """Install the CORS middleware."""
        # NOTE(review): wildcard origins together with allow_credentials=True
        # lets any site make credentialed requests. Left unchanged because
        # existing clients may rely on it; restrict the origins before
        # exposing this service publicly.
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

    def setup_routes(self):
        """Register the lifecycle hooks and all HTTP endpoints on ``self.app``."""

        @self.app.on_event("startup")
        async def startup():
            await self.cache.connect()
            logger.info("Truth Revelation API started")

        @self.app.on_event("shutdown")
        async def shutdown():
            await self.cache.close()
            logger.info("Truth Revelation API stopped")

        @self.app.get("/health", response_model=HealthResponse)
        async def health_check():
            """Health check endpoint"""
            return HealthResponse(
                status="healthy",
                version="1.0.0",
                redis_connected=self.cache.redis is not None,
                memory_usage=psutil.Process().memory_percent(),
                # The gauge stores a float; the response field is an int.
                active_requests=int(ACTIVE_REQUESTS._value.get())
            )

        @self.app.post("/analyze/text", response_model=AnalysisResponse)
        async def analyze_text(request: AnalysisRequest):
            """Analyze text content for truth revelation"""
            REQUEST_COUNT.labels(method="POST", endpoint="/analyze/text").inc()

            # The context managers time the awaited work and always restore
            # the in-flight gauge; decorating the coroutine function with the
            # timer would wrap it in a synchronous function instead.
            with ACTIVE_REQUESTS.track_inprogress(), REQUEST_DURATION.time():
                try:
                    text = request.text_content or ""
                    # The prompt depends on the domain, so it is part of the key.
                    digest = self._digest(text, request.domain or "")
                    request_id = f"text_{int(time.time())}_{digest[:16]}"
                    cache_key = f"text_analysis:{digest}"

                    result = await self._load_cached(cache_key, request_id)
                    if result is not None:
                        logger.info(f"Serving cached text analysis for {request_id}")
                    else:
                        analysis = await self.text_analyzer.analyze_text(text, request.domain)
                        result = self._build_result(
                            request_id,
                            analysis,
                            confidence=0.8,
                            patterns=[],
                            prompt=self._generate_prompt(analysis, request.domain),
                        )
                        await self.cache.set(cache_key, json.dumps(result))

                    TRUTH_SCORE_DISTRIBUTION.observe(result["truth_score"])
                    return self._to_response(result)

                except HTTPException:
                    raise
                except Exception as e:
                    logger.error(f"Text analysis failed: {e}")
                    raise HTTPException(status_code=500, detail="Text analysis failed")

        @self.app.post("/analyze/image", response_model=AnalysisResponse)
        async def analyze_image(
            file: UploadFile = File(...),
            description: Optional[str] = Form(None),
            context: str = Form("{}")
        ):
            """Analyze image content for truth revelation"""
            REQUEST_COUNT.labels(method="POST", endpoint="/analyze/image").inc()

            with ACTIVE_REQUESTS.track_inprogress(), REQUEST_DURATION.time():
                try:
                    # content_type is None when the client sends no part header.
                    if not (file.content_type or "").startswith('image/'):
                        raise HTTPException(status_code=400, detail="Invalid image file")

                    # Read one byte past the limit: enough to detect an
                    # oversized upload without buffering all of it.
                    content = await file.read(Config.MAX_IMAGE_SIZE + 1)
                    if len(content) > Config.MAX_IMAGE_SIZE:
                        raise HTTPException(status_code=400, detail="File too large")

                    # The prompt depends on the description, so it is part of the key.
                    digest = self._digest(content, description or "")
                    request_id = f"image_{int(time.time())}_{digest[:16]}"
                    cache_key = f"image_analysis:{digest}"

                    result = await self._load_cached(cache_key, request_id)
                    if result is not None:
                        logger.info(f"Serving cached image analysis for {request_id}")
                    else:
                        analysis = await self._analyze_upload(content)
                        result = self._build_result(
                            request_id,
                            analysis,
                            confidence=0.7,
                            patterns=analysis["patterns"],
                            prompt=self._generate_image_prompt(analysis, description),
                        )
                        await self.cache.set(cache_key, json.dumps(result))

                    TRUTH_SCORE_DISTRIBUTION.observe(result["truth_score"])
                    return self._to_response(result)

                except HTTPException:
                    raise
                except Exception as e:
                    logger.error(f"Image analysis failed: {e}")
                    raise HTTPException(status_code=500, detail="Image analysis failed")

        @self.app.get("/metrics")
        async def metrics():
            """Prometheus metrics endpoint"""
            # Sent as raw bytes with the exposition content type; returning
            # the bytes directly would route them through the JSON encoder.
            return Response(
                content=prometheus_client.generate_latest(),
                media_type=prometheus_client.CONTENT_TYPE_LATEST,
            )

    @staticmethod
    def _digest(*parts) -> str:
        """Return a SHA-256 hex digest of ``parts`` that is stable across processes.

        The built-in ``hash()`` is salted per process for ``str`` and
        ``bytes``, so keys derived from it never match in another worker or
        after a restart. Each part is length-prefixed so that ("a", "bc") and
        ("ab", "c") produce different digests.
        """
        hasher = hashlib.sha256()
        for part in parts:
            data = part if isinstance(part, bytes) else str(part).encode("utf-8")
            hasher.update(len(data).to_bytes(8, "big"))
            hasher.update(data)
        return hasher.hexdigest()

    async def _load_cached(self, cache_key: str, request_id: str) -> Optional[Dict[str, Any]]:
        """Return the cached result for ``cache_key``, or ``None`` on a miss.

        An entry that is not a JSON object is treated as a miss. The returned
        dict is flagged as cached and carries the current ``request_id``.
        """
        raw = await self.cache.get(cache_key)
        if not raw:
            return None
        try:
            result = json.loads(raw)
        except ValueError as e:
            logger.warning(f"Ignoring unreadable cache entry {cache_key}: {e}")
            return None
        if not isinstance(result, dict):
            return None
        result['cached'] = True
        result['request_id'] = request_id
        return result

    @staticmethod
    def _build_result(request_id: str, analysis: Dict[str, Any], *, confidence: float,
                      patterns: List[str], prompt: str) -> Dict[str, Any]:
        """Assemble the JSON-serialisable result stored in the cache and returned."""
        return {
            "request_id": request_id,
            "status": "completed",
            "truth_score": analysis["truth_score"],
            "confidence": confidence,
            "archetypes": analysis["archetypes"],
            "patterns": patterns,
            "visualization_prompt": prompt,
            "processing_time": analysis["processing_time"],
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "cached": False
        }

    @staticmethod
    def _to_response(result: Dict[str, Any]):
        """Convert a result dict into an AnalysisResponse, dropping the cache flag."""
        return AnalysisResponse(**{k: v for k, v in result.items() if k != 'cached'})

    async def _analyze_upload(self, content: bytes) -> Dict[str, Any]:
        """Write ``content`` to a private temporary file, analyze it and clean up.

        The file name is generated by ``tempfile`` and never derived from the
        client-supplied name, and the file is removed on both success and
        failure.
        """
        fd, file_path = tempfile.mkstemp(prefix="truth_upload_")
        os.close(fd)
        try:
            async with aiofiles.open(file_path, 'wb') as f:
                await f.write(content)
            return await self.image_analyzer.analyze_image(file_path)
        finally:
            try:
                os.remove(file_path)
            except OSError as e:
                logger.warning(f"Could not remove temporary file {file_path}: {e}")

    def _generate_prompt(self, analysis: Dict[str, Any], domain: Optional[str]) -> str:
        """Build the comma-separated visualization prompt for a text analysis.

        Args:
            analysis: Analysis result; only ``analysis["archetypes"]`` is read.
            domain: Optional domain label, added as "<domain> theme".

        Returns:
            The style prefix, the domain theme (if any), at most two
            archetypes and three fixed quality terms, joined by ", ".
        """
        components = ["middle-ages-islamic-art style"]

        if domain:
            components.append(f"{domain} theme")

        if analysis["archetypes"]:
            components.extend(analysis["archetypes"][:2])

        components.extend(["intricate details", "symbolic meaning", "high resolution"])

        return ", ".join(components)

    def _generate_image_prompt(self, analysis: Dict[str, Any], description: Optional[str]) -> str:
        """Build the comma-separated visualization prompt for an image analysis.

        Args:
            analysis: Analysis result; ``analysis["archetypes"]`` and
                ``analysis["patterns"]`` are read.
            description: Optional caller-supplied description, inserted verbatim.

        Returns:
            The style prefix, the description (if any), at most two
            archetypes, at most two patterns and three fixed style terms,
            joined by ", ".
        """
        components = ["middle-ages-islamic-art style"]

        if description:
            components.append(description)

        if analysis["archetypes"]:
            components.extend(analysis["archetypes"][:2])

        if analysis["patterns"]:
            components.extend(analysis["patterns"][:2])

        components.extend(["detailed", "symbolic", "illuminated manuscript style"])

        return ", ".join(components)
| |
|
| | |
# Module-level ASGI application, built at import time so that an ASGI server
# can load it as "<module>:app".
app = TruthRevelationAPI().app
| |
|
if __name__ == "__main__":
    import uvicorn

    # Serve the application object directly. The "main:app" import string
    # only resolved when this file happened to be named main.py, and an
    # import string is only required for reload or multiple workers, neither
    # of which is used here.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        reload=False,
        access_log=True,
        timeout_keep_alive=30
    )