1. Updated Configuration (config/settings.json) ```json { "system": { "name": "SovereignPatternProcessor", "version": "2.1.0", "mode": "sovereign_analysis", "certainty_threshold": 0.95 }, "processing": { "default_layers": ["surface", "institutional", "consciousness"], "pattern_coherence_weight": 0.35, "propagation_weight": 0.25, "validation_required": 3 }, "network": { "entanglement_coherence": 0.98, "node_verification_required": 2, "protocols_active": [ "pattern_primacy", "lattice_recognition", "transparency_paradox", "recursive_action", "autonomous_query" ] }, "autonomous_query": { "enabled": true, "max_depth": 3, "priority_weights": { "uncertainty_gap": 0.4, "composite_dependency": 0.3, "historical_correlation": 0.2, "network_convergence": 0.1 }, "provenance_required": true, "append_only": true, "validation_on_ingest": true, "signature_method": "sha256" } } ``` 2. Query Ledger Implementation (core/query_ledger.py) ```python #!/usr/bin/env python3 """ Query Ledger - Append-only, provenance-first data structure Immutable record of all queries, results, and bindings """ import json import hashlib from datetime import datetime from typing import Dict, Any, List, Optional from dataclasses import dataclass, field, asdict def sha256_hash(data: Dict) -> str: """Generate SHA256 hash for JSON-serializable data""" json_str = json.dumps(data, sort_keys=True, separators=(',', ':')) return hashlib.sha256(json_str.encode()).hexdigest() @dataclass class Provenance: """Provenance information for ledger records""" author_id: str author_signature: str parent_hashes: List[str] timestamp: str generation: int = 0 def to_dict(self) -> Dict: return asdict(self) @dataclass class QueryRecord: """Immutable ledger record""" kind: str # "query" | "result" | "binding" | "interpretation" | "validation" payload: Dict[str, Any] provenance: Provenance previous_hash: str hash: str = field(init=False) def __post_init__(self): """Calculate record hash after initialization""" hash_data = { "kind": self.kind, "payload": self.payload, "provenance": self.provenance.to_dict(), "previous_hash": self.previous_hash } self.hash = sha256_hash(hash_data) class QueryLedger: """Immutable append-only ledger for autonomous query operations""" def __init__(self, ledger_path: str = "data/query_ledger.json"): self.ledger_path = ledger_path self.chain: List[QueryRecord] = [] self._load_or_initialize() def _load_or_initialize(self): """Load existing ledger or create genesis block""" try: with open(self.ledger_path, 'r') as f: data = json.load(f) for record_data in data.get("chain", []): provenance = Provenance(**record_data["provenance"]) record = QueryRecord( kind=record_data["kind"], payload=record_data["payload"], provenance=provenance, previous_hash=record_data["previous_hash"] ) record.hash = record_data["hash"] # Set hash directly for loaded records self.chain.append(record) if not self.chain: self._create_genesis() except FileNotFoundError: self._create_genesis() def _create_genesis(self): """Create genesis block for new ledger""" genesis_provenance = Provenance( author_id="system", author_signature="sig:system_genesis", parent_hashes=[], timestamp=datetime.utcnow().isoformat(), generation=0 ) genesis_record = QueryRecord( kind="system", payload={"message": "Genesis block for autonomous query ledger"}, provenance=genesis_provenance, previous_hash="0" * 64 # 64-character zero hash ) self.chain.append(genesis_record) self._save_ledger() def add_record(self, kind: str, payload: Dict[str, Any], author_id: str, author_signature: str, parent_hashes: List[str] = None) -> QueryRecord: """Add new record to ledger with full provenance""" if parent_hashes is None: parent_hashes = [] # Calculate generation (max parent generation + 1) parent_generation = 0 for parent_hash in parent_hashes: for record in self.chain: if record.hash == parent_hash: parent_generation = max(parent_generation, record.provenance.generation) provenance = Provenance( author_id=author_id, author_signature=author_signature, parent_hashes=parent_hashes, timestamp=datetime.utcnow().isoformat(), generation=parent_generation + 1 ) previous_hash = self.chain[-1].hash if self.chain else "0" * 64 new_record = QueryRecord( kind=kind, payload=payload, provenance=provenance, previous_hash=previous_hash ) self.chain.append(new_record) self._save_ledger() return new_record def add_query(self, query_payload: Dict, author_id: str, author_signature: str) -> QueryRecord: """Add query record""" return self.add_record("query", query_payload, author_id, author_signature) def add_result(self, result_payload: Dict, author_id: str, author_signature: str, query_hash: str) -> QueryRecord: """Add query result record""" return self.add_record("result", result_payload, author_id, author_signature, [query_hash]) def add_binding(self, query_hash: str, result_hash: str, interpretation: Dict = None) -> QueryRecord: """Bind query to result with optional interpretation""" binding_payload = { "query_hash": query_hash, "result_hash": result_hash } if interpretation: binding_payload["interpretation"] = interpretation return self.add_record( kind="binding", payload=binding_payload, author_id="system_binder", author_signature="sig:system_binding", parent_hashes=[query_hash, result_hash] ) def verify_chain(self) -> Dict[str, Any]: """Verify ledger integrity and return validation results""" if not self.chain: return {"valid": False, "error": "Empty ledger"} # Verify genesis block genesis = self.chain[0] if genesis.previous_hash != "0" * 64: return {"valid": False, "error": "Invalid genesis block"} # Verify chain continuity and hashes for i in range(1, len(self.chain)): current = self.chain[i] previous = self.chain[i - 1] # Verify previous hash reference if current.previous_hash != previous.hash: return {"valid": False, "error": f"Chain broken at index {i}"} # Verify hash calculation hash_data = { "kind": current.kind, "payload": current.payload, "provenance": current.provenance.to_dict(), "previous_hash": current.previous_hash } expected_hash = sha256_hash(hash_data) if current.hash != expected_hash: return {"valid": False, "error": f"Invalid hash at index {i}"} # Verify provenance lineage for record in self.chain: for parent_hash in record.provenance.parent_hashes: if not any(r.hash == parent_hash for r in self.chain): return {"valid": False, "error": f"Missing parent hash: {parent_hash}"} return { "valid": True, "chain_length": len(self.chain), "latest_hash": self.chain[-1].hash if self.chain else None, "generation_depth": max(r.provenance.generation for r in self.chain) if self.chain else 0 } def get_record_by_hash(self, target_hash: str) -> Optional[QueryRecord]: """Retrieve record by hash""" for record in self.chain: if record.hash == target_hash: return record return None def get_query_lineage(self, query_hash: str) -> List[QueryRecord]: """Get full lineage for a query including all related records""" lineage = [] visited = set() def trace_backwards(record_hash: str): if record_hash in visited: return visited.add(record_hash) record = self.get_record_by_hash(record_hash) if not record: return lineage.append(record) for parent_hash in record.provenance.parent_hashes: trace_backwards(parent_hash) trace_backwards(query_hash) return lineage def _save_ledger(self): """Save ledger to file""" ledger_data = { "metadata": { "version": "2.1.0", "created": datetime.utcnow().isoformat(), "record_count": len(self.chain) }, "chain": [] } for record in self.chain: record_data = { "kind": record.kind, "payload": record.payload, "provenance": record.provenance.to_dict(), "previous_hash": record.previous_hash, "hash": record.hash } ledger_data["chain"].append(record_data) with open(self.ledger_path, 'w') as f: json.dump(ledger_data, f, indent=2, default=str) def get_statistics(self) -> Dict[str, Any]: """Get ledger statistics""" stats = { "total_records": len(self.chain), "by_kind": {}, "generations": {} } for record in self.chain: kind = record.kind generation = record.provenance.generation stats["by_kind"][kind] = stats["by_kind"].get(kind, 0) + 1 stats["generations"][generation] = stats["generations"].get(generation, 0) + 1 return stats ``` 3. Self-Query Planner (integration/selfqueryplanner.py) ```python #!/usr/bin/env python3 """ Self-Query Planner Generates autonomous queries from detected patterns, gaps, and uncertainties """ import json from typing import Dict, Any, List, Tuple, Set from dataclasses import dataclass, field from datetime import datetime, timedelta @dataclass class QueryCandidate: """Candidate query for autonomous execution""" query_type: str target: str parameters: Dict[str, Any] priority_score: float generation_context: Dict[str, Any] dependencies: List[str] = field(default_factory=list) expected_sources: List[str] = field(default_factory=lambda: ["public_archive", "historical_record"]) def to_query_dict(self) -> Dict[str, Any]: """Convert to query dictionary for execution""" return { "query_type": self.query_type, "target": self.target, "parameters": self.parameters, "priority_score": self.priority_score, "generated_at": datetime.utcnow().isoformat(), "dependencies": self.dependencies, "expected_sources": self.expected_sources } class SelfQueryPlanner: """Autonomous query planning based on pattern analysis""" def __init__(self, config: Dict[str, Any]): self.config = config self.query_config = config.get("autonomous_query", {}) # Priority weights self.weights = self.query_config.get("priority_weights", { "uncertainty_gap": 0.4, "composite_dependency": 0.3, "historical_correlation": 0.2, "network_convergence": 0.1 }) # Maximum query depth self.max_depth = self.query_config.get("max_depth", 3) # Query templates self._load_query_templates() def _load_query_templates(self): """Load predefined query templates""" self.templates = { "lens_evidence": { "description": "Fetch evidence for pattern lens activation", "source_priority": ["public_archive", "historical_record", "academic_paper"], "time_constraint": "last_100_years" }, "composite_gap": { "description": "Find missing elements for composite pattern", "source_priority": ["multiple_cross_reference"], "time_constraint": "variable" }, "kinship_lineage": { "description": "Trace kinship or mentorship lineages", "source_priority": ["biographical", "genealogical", "institutional"], "time_constraint": "multi_generational" }, "negative_space": { "description": "Probe for expected-but-absent information", "source_priority": ["inverse_search", "absence_detection"], "time_constraint": "context_dependent" }, "symbolic_correlation": { "description": "Correlate symbolic patterns across domains", "source_priority": ["cultural_analysis", "archetypal_study"], "time_constraint": "historical_depth" } } def plan_queries(self, pattern_results: Dict[str, Any], metrics: Dict[str, Any], historical_context: Dict[str, Any] = None) -> List[Dict[str, Any]]: """Generate query plan based on analysis results""" # Extract analysis components lenses = pattern_results.get("phases", {}).get("pattern_detection", {}).get("applicable_lenses", []) composites = pattern_results.get("phases", {}).get("composite_analysis", {}) diagnostic = pattern_results.get("phases", {}).get("diagnostic_scan", {}) # Generate candidate queries from different sources candidates = [] # 1. Queries from activated lenses lens_queries = self._generate_lens_queries(lenses, diagnostic) candidates.extend(lens_queries) # 2. Queries from composite patterns composite_queries = self._generate_composite_queries(composites) candidates.extend(composite_queries) # 3. Queries from uncertainty gaps gap_queries = self._generate_gap_queries(diagnostic, composites, lenses) candidates.extend(gap_queries) # 4. Queries from sovereignty metrics metric_queries = self._generate_metric_queries(metrics, pattern_results) candidates.extend(metric_queries) # 5. Queries from historical correlation if historical_context: historical_queries = self._generate_historical_queries(historical_context, pattern_results) candidates.extend(historical_queries) # Score and prioritize candidates scored_candidates = [] for candidate in candidates: score = self._score_query_candidate(candidate, pattern_results, metrics) candidate.priority_score = score scored_candidates.append(candidate) # Sort by priority and limit depth scored_candidates.sort(key=lambda x: x.priority_score, reverse=True) selected = scored_candidates[:self.max_depth] # Convert to query dict format queries = [candidate.to_query_dict() for candidate in selected] return queries def _generate_lens_queries(self, lenses: List[Dict], diagnostic: Dict) -> List[QueryCandidate]: """Generate queries from activated pattern lenses""" candidates = [] for lens in lenses[:10]: # Limit to top 10 lenses lens_id = lens.get("id") lens_name = lens.get("name") confidence = lens.get("confidence", 0.5) if confidence < 0.3: # Skip low-confidence lenses continue # Create evidence query candidate = QueryCandidate( query_type="lens_evidence", target=f"Lens_{lens_id}_{lens_name}", parameters={ "lens_id": lens_id, "lens_name": lens_name, "confidence": confidence, "keywords": lens.get("detection_keywords", []), "archetype": lens.get("archetype"), "mechanism": lens.get("mechanism") }, priority_score=0.0, # Will be scored later generation_context={ "source": "lens_activation", "confidence": confidence, "diagnostic_layer": diagnostic.get("layer", "unknown") } ) candidates.append(candidate) # If high confidence, also generate related pattern queries if confidence > 0.7: related_candidate = QueryCandidate( query_type="pattern_correlation", target=f"Related_{lens_name}", parameters={ "primary_lens": lens_id, "search_radius": "extended", "temporal_window": "extended" }, priority_score=0.0, generation_context={ "source": "high_confidence_lens", "primary_confidence": confidence } ) candidates.append(related_candidate) return candidates def _generate_composite_queries(self, composites: Dict) -> List[QueryCandidate]: """Generate queries for composite pattern verification""" candidates = [] for comp_name, comp_data in composites.items(): # Check if composite needs verification if isinstance(comp_data, dict) and comp_data.get("confidence", 0) < 0.8: candidate = QueryCandidate( query_type="composite_verification", target=f"Composite_{comp_name}", parameters={ "composite_name": comp_name, "required_lenses": comp_data.get("required_lenses", []), "current_confidence": comp_data.get("confidence", 0), "detection_threshold": comp_data.get("detection_threshold", 0.7) }, priority_score=0.0, generation_context={ "source": "composite_uncertainty", "current_confidence": comp_data.get("confidence", 0) } ) candidates.append(candidate) return candidates def _generate_gap_queries(self, diagnostic: Dict, composites: Dict, lenses: List) -> List[QueryCandidate]: """Generate queries to probe uncertainty gaps""" candidates = [] # Check for surface-institutional gap surface_analysis = diagnostic.get("surface", {}) institutional_analysis = diagnostic.get("institutional", {}) if (surface_analysis.get("analysis_depth", 0) < 2 and institutional_analysis.get("analysis_depth", 0) > 3): # Surface analysis is shallow but institutional is deep - probe the gap candidate = QueryCandidate( query_type="analysis_gap", target="Surface_Institutional_Gap", parameters={ "gap_type": "surface_institutional_disconnect", "surface_depth": surface_analysis.get("analysis_depth", 0), "institutional_depth": institutional_analysis.get("analysis_depth", 0), "probe_method": "bridging_analysis" }, priority_score=0.0, generation_context={ "source": "analysis_layer_gap", "gap_magnitude": institutional_analysis.get("analysis_depth", 0) - surface_analysis.get("analysis_depth", 0) } ) candidates.append(candidate) # Check for missing composite despite lens clusters if len(lenses) >= 5 and not composites: candidate = QueryCandidate( query_type="missing_composite", target="Unexplained_Lens_Cluster", parameters={ "lens_count": len(lenses), "lens_ids": [l.get("id") for l in lenses[:5]], "expected_composites": ["RegimeChangeProtocol", "CapitalGatekeeperProtocol"], "search_method": "composite_reconstruction" }, priority_score=0.0, generation_context={ "source": "lens_cluster_without_composite", "cluster_size": len(lenses) } ) candidates.append(candidate) return candidates def _generate_metric_queries(self, metrics: Dict, pattern_results: Dict) -> List[QueryCandidate]: """Generate queries based on sovereignty metrics""" candidates = [] # Check thought-action gap thought_action_gap = metrics.get("thought_action_gap", 1.0) if thought_action_gap > 2.0: # Large gap detected candidate = QueryCandidate( query_type="thought_action_investigation", target="Thought_Action_Gap_Analysis", parameters={ "gap_magnitude": thought_action_gap, "sovereignty_alignment": metrics.get("sovereignty_alignment", 0.5), "pattern_density": metrics.get("pattern_density", 0.5), "investigation_focus": ["implementation_records", "policy_execution_gaps"] }, priority_score=0.0, generation_context={ "source": "high_thought_action_gap", "gap_value": thought_action_gap } ) candidates.append(candidate) # Check pattern lattice density pattern_density = metrics.get("pattern_density", 0.0) if pattern_density > 0.8: # High density suggests complex pattern - probe for hidden layers candidate = QueryCandidate( query_type="high_density_probe", target="Dense_Pattern_Lattice", parameters={ "density_score": pattern_density, "lens_count": len(pattern_results.get("phases", {}).get("pattern_detection", {}).get("applicable_lenses", [])), "probe_depth": "deep_layer_analysis", "focus_areas": ["inter_lens_connections", "hidden_dependencies"] }, priority_score=0.0, generation_context={ "source": "high_pattern_density", "density_value": pattern_density } ) candidates.append(candidate) return candidates def _generate_historical_queries(self, historical_context: Dict, pattern_results: Dict) -> List[QueryCandidate]: """Generate queries based on historical correlation""" candidates = [] casefile_correlations = historical_context.get("casefile_correlations", []) for correlation in casefile_correlations[:3]: # Limit to top 3 correlations case_id = correlation.get("case_id", "unknown") similarity = correlation.get("similarity_score", 0) if similarity > 0.7: candidate = QueryCandidate( query_type="historical_extension", target=f"Extend_{case_id}_Analysis", parameters={ "source_case": case_id, "similarity_score": similarity, "extension_method": "temporal_extension", "time_frame": "plus_minus_20_years" }, priority_score=0.0, generation_context={ "source": "high_historical_correlation", "similarity_score": similarity } ) candidates.append(candidate) return candidates def _score_query_candidate(self, candidate: QueryCandidate, pattern_results: Dict, metrics: Dict) -> float: """Score query candidate based on multiple factors""" score = 0.0 # Base score from query type query_type = candidate.query_type if query_type == "analysis_gap": score += self.weights["uncertainty_gap"] elif query_type == "composite_verification": score += self.weights["composite_dependency"] elif query_type == "lens_evidence": score += self.weights["historical_correlation"] elif query_type == "historical_extension": score += self.weights["network_convergence"] # Adjust based on confidence/context context = candidate.generation_context if "confidence" in context: score += context["confidence"] * 0.2 if "gap_magnitude" in context: score += min(context["gap_magnitude"] * 0.1, 0.2) if "density_value" in context and context["density_value"] > 0.8: score += 0.15 if "similarity_score" in context: score += context["similarity_score"] * 0.15 # Normalize to 0-1 range score = min(max(score, 0.0), 1.0) return round(score, 3) def create_query_execution_plan(self, queries: List[Dict]) -> Dict[str, Any]: """Create execution plan for generated queries""" execution_plan = { "plan_id": f"plan_{int(datetime.utcnow().timestamp())}", "generated_at": datetime.utcnow().isoformat(), "total_queries": len(queries), "estimated_duration": f"{len(queries) * 30} seconds", # 30 seconds per query estimate "query_sequence": [], "dependencies": {}, "expected_outcomes": [] } for i, query in enumerate(queries): query_entry = { "sequence_id": i + 1, "query": query, "status": "pending", "depends_on": [], "expected_artifact_types": self._get_expected_artifacts(query) } execution_plan["query_sequence"].append(query_entry) # Track dependencies if "dependencies" in query: execution_plan["dependencies"][query["target"]] = query["dependencies"] # Add expected outcome outcome = self._predict_query_outcome(query) execution_plan["expected_outcomes"].append(outcome) return execution_plan def _get_expected_artifacts(self, query: Dict) -> List[str]: """Determine expected artifact types for query""" query_type = query.get("query_type", "") if query_type == "lens_evidence": return ["text_document", "historical_record", "pattern_evidence"] elif query_type == "composite_verification": return ["correlation_matrix", "dependency_graph", "validation_report"] elif query_type == "kinship_lineage": return ["genealogy_chart", "relationship_map", "timeline"] elif query_type == "negative_space": return ["absence_report", "expectation_vs_reality", "gap_analysis"] else: return ["general_artifact", "analysis_report"] def _predict_query_outcome(self, query: Dict) -> Dict[str, Any]: """Predict likely outcome of query""" query_type = query.get("query_type", "") confidence = query.get("parameters", {}).get("confidence", 0.5) base_success_rate = 0.7 # Base 70% success rate # Adjust based on query type if query_type == "lens_evidence": success_rate = base_success_rate * (0.8 + confidence * 0.2) outcome_type = "evidence_collection" elif query_type == "composite_verification": success_rate = base_success_rate * 0.9 outcome_type = "pattern_verification" elif query_type == "analysis_gap": success_rate = base_success_rate * 0.6 outcome_type = "gap_resolution" else: success_rate = base_success_rate outcome_type = "information_retrieval" return { "query_target": query.get("target", ""), "predicted_success_rate": round(success_rate, 3), "outcome_type": outcome_type, "expected_impact": "medium" if success_rate > 0.6 else "low", "confidence_boost": round(confidence * 0.3, 3) if confidence > 0 else 0 } ``` 4. Retrieval Agent (integration/retrieval_agent.py) ```python #!/usr/bin/env python3 """ Retrieval Agent - Executes autonomous queries and ingests results Maintains strict separation between facts and interpretations """ import json import hashlib import time from typing import Dict, Any, List, Optional, Tuple from datetime import datetime from dataclasses import dataclass, asdict from core.query_ledger import QueryLedger from validation.anomaly_detector import AnomalyDetector @dataclass class RetrievalResult: """Structured result from query execution""" query_hash: str execution_timestamp: str status: str # "success", "partial", "failed", "timeout" artifacts: List[Dict[str, Any]] source_metadata: Dict[str, Any] execution_metrics: Dict[str, Any] raw_response: Optional[Dict] = None def to_dict(self) -> Dict: return asdict(self) class RetrievalAgent: """Autonomous query execution with provenance tracking""" def __init__(self, ledger: QueryLedger, config: Dict[str, Any]): self.ledger = ledger self.config = config self.query_config = config.get("autonomous_query", {}) # Initialize validators self.anomaly_detector = AnomalyDetector() # Source adapters (would be expanded in production) self.source_adapters = { "public_archive": self._query_public_archive, "historical_record": self._query_historical_records, "pattern_database": self._query_pattern_database, "composite_registry": self._query_composite_registry } # Cache for recent queries self.query_cache: Dict[str, Dict] = {} self.cache_ttl = 300 # 5 minutes # Execution statistics self.stats = { "total_queries": 0, "successful": 0, "failed": 0, "average_execution_time": 0, "last_execution": None } def execute_query(self, query: Dict[str, Any], author_id: str = "OmegaEngine", author_signature: str = "sig:autonomous_retrieval") -> Dict[str, Any]: """Execute a query and record results with full provenance""" start_time = time.time() self.stats["total_queries"] += 1 # Log query to ledger query_record = self.ledger.add_query( query_payload=query, author_id=author_id, author_signature=author_signature ) print(f"๐Ÿ” Executing query: {query.get('target', 'Unknown')}") print(f" Query hash: {query_record.hash[:16]}...") try: # Check cache first cache_key = self._generate_cache_key(query) cached_result = self.query_cache.get(cache_key) if cached_result and (time.time() - cached_result.get('cached_at', 0)) < self.cache_ttl: print(" Using cached result") result_data = cached_result['result'] status = "cached_success" else: # Execute query result_data = self._execute_query_logic(query) status = result_data.get("status", "unknown") # Cache result self.query_cache[cache_key] = { 'result': result_data, 'cached_at': time.time(), 'query_hash': query_record.hash } # Create retrieval result retrieval_result = RetrievalResult( query_hash=query_record.hash, execution_timestamp=datetime.utcnow().isoformat(), status=status, artifacts=result_data.get("artifacts", []), source_metadata=result_data.get("source_metadata", {}), execution_metrics=result_data.get("metrics", {}), raw_response=result_data ) # Log result to ledger result_record = self.ledger.add_result( result_payload=retrieval_result.to_dict(), author_id=author_id, author_signature=author_signature, query_hash=query_record.hash ) # Validate artifacts on ingest if configured if self.query_config.get("validation_on_ingest", True): validation_results = self._validate_artifacts(retrieval_result.artifacts) # Log validation results self.ledger.add_record( kind="validation", payload={ "result_hash": result_record.hash, "validation_results": validation_results, "validated_artifacts": len(retrieval_result.artifacts) }, author_id="validation_system", author_signature="sig:artifact_validation", parent_hashes=[result_record.hash] ) # Create binding between query and result binding_record = self.ledger.add_binding( query_hash=query_record.hash, result_hash=result_record.hash, interpretation=self._generate_initial_interpretation(query, retrieval_result) ) # Update statistics execution_time = time.time() - start_time self._update_statistics(status, execution_time) # Return comprehensive result return { "success": True, "query_record": query_record, "result_record": result_record, "binding_record": binding_record, "execution_time": round(execution_time, 3), "artifacts_found": len(retrieval_result.artifacts), "retrieval_result": retrieval_result.to_dict() } except Exception as e: # Log failure error_result = RetrievalResult( query_hash=query_record.hash, execution_timestamp=datetime.utcnow().isoformat(), status="failed", artifacts=[], source_metadata={"error": str(e)}, execution_metrics={"error": True} ) error_record = self.ledger.add_result( result_payload=error_result.to_dict(), author_id=author_id, author_signature=author_signature, query_hash=query_record.hash ) self.stats["failed"] += 1 return { "success": False, "error": str(e), "query_record": query_record, "result_record": error_record, "execution_time": round(time.time() - start_time, 3) } def _execute_query_logic(self, query: Dict[str, Any]) -> Dict[str, Any]: """Execute query logic based on query type""" query_type = query.get("query_type", "") target = query.get("target", "") parameters = query.get("parameters", {}) # Route to appropriate query handler if query_type == "lens_evidence": return self._execute_lens_evidence_query(target, parameters) elif query_type == "composite_verification": return self._execute_composite_verification_query(target, parameters) elif query_type == "analysis_gap": return self._execute_gap_query(target, parameters) elif query_type == "historical_extension": return self._execute_historical_query(target, parameters) elif query_type == "thought_action_investigation": return self._execute_thought_action_query(target, parameters) else: return self._execute_general_query(target, parameters) def _execute_lens_evidence_query(self, target: str, parameters: Dict) -> Dict[str, Any]: """Execute lens evidence query""" lens_id = parameters.get("lens_id") keywords = parameters.get("keywords", []) # Simulated retrieval - in production, this would connect to actual data sources artifacts = [] for i, keyword in enumerate(keywords[:3]): # Limit to 3 keywords artifact = { "id": f"lens_{lens_id}_artifact_{i}", "type": "text_document", "content_hash": hashlib.sha256(f"{keyword}_{lens_id}".encode()).hexdigest(), "source": "public_archive", "relevance_score": 0.8 - (i * 0.1), "extracted_data": { "keyword": keyword, "context": f"Example context for {keyword} in relation to lens {lens_id}", "temporal_markers": ["20th_century", "21st_century"], "geographic_markers": ["global", "western"] }, "metadata": { "retrieved_at": datetime.utcnow().isoformat(), "confidence": 0.7, "validation_status": "pending" } } artifacts.append(artifact) return { "status": "success", "artifacts": artifacts, "source_metadata": { "query_type": "lens_evidence", "lens_id": lens_id, "keywords_searched": keywords, "sources_queried": ["public_archive", "historical_database"] }, "metrics": { "execution_time_ms": 1500, "sources_accessed": 2, "results_filtered": len(artifacts) } } def _execute_composite_verification_query(self, target: str, parameters: Dict) -> Dict[str, Any]: """Execute composite pattern verification query""" required_lenses = parameters.get("required_lenses", []) artifacts = [] # Generate verification artifacts for each required lens for i, lens_id in enumerate(required_lenses[:5]): # Limit to 5 lenses artifact = { "id": f"composite_verification_{i}", "type": "correlation_evidence", "content_hash": hashlib.sha256(f"composite_{target}_lens_{lens_id}".encode()).hexdigest(), "source": "pattern_database", "relevance_score": 0.9, "extracted_data": { "lens_id": lens_id, "composite_name": target, "verification_status": "confirmed" if i < 3 else "partial", "confidence_boost": 0.15 if i < 3 else 0.05, "inter_lens_correlation": 0.7 }, "metadata": { "retrieved_at": datetime.utcnow().isoformat(), "composite_integrity": 0.8, "validation_required": True } } artifacts.append(artifact) return { "status": "success", "artifacts": artifacts, "source_metadata": { "query_type": "composite_verification", "composite_target": target, "lenses_verified": len(artifacts), "verification_depth": "partial" if len(artifacts) < len(required_lenses) else "full" }, "metrics": { "execution_time_ms": 2000, "correlations_found": len(artifacts), "completeness_score": len(artifacts) / len(required_lenses) if required_lenses else 0 } } def _execute_gap_query(self, target: str, parameters: Dict) -> Dict[str, Any]: """Execute analysis gap query""" gap_type = parameters.get("gap_type", "unknown") artifacts = [] # Generate gap analysis artifacts artifact = { "id": f"gap_analysis_{hashlib.sha256(gap_type.encode()).hexdigest()[:8]}", "type": "gap_analysis_report", "content_hash": hashlib.sha256(f"gap_{target}".encode()).hexdigest(), "source": "analysis_engine", "relevance_score": 0.85, "extracted_data": { "gap_type": gap_type, "analysis": f"Detailed analysis of {gap_type} gap", "probable_causes": ["information_suppression", "structural_blindspot", "temporal_disconnect"], "resolution_strategies": ["cross_layer_correlation", "temporal_extension", "source_diversification"], "estimated_resolution_effort": "medium" }, "metadata": { "retrieved_at": datetime.utcnow().isoformat(), "analysis_depth": "intermediate", "actionable": True } } artifacts.append(artifact) return { "status": "success", "artifacts": artifacts, "source_metadata": { "query_type": "gap_analysis", "gap_target": target, "analysis_completeness": "initial", "follow_up_required": True }, "metrics": { "execution_time_ms": 2500, "gap_dimensions_analyzed": 3, "resolution_paths_identified": 3 } } def _execute_historical_query(self, target: str, parameters: Dict) -> Dict[str, Any]: """Execute historical extension query""" source_case = parameters.get("source_case", "unknown") artifacts = [] # Generate historical extension artifacts for i in range(2): # Generate 2 historical artifacts artifact = { "id": f"historical_extension_{source_case}_{i}", "type": "historical_correlation", "content_hash": hashlib.sha256(f"history_{source_case}_{i}".encode()).hexdigest(), "source": "historical_database", "relevance_score": 0.75, "extracted_data": { "source_case": source_case, "extension_period": f"+{i+1}0 years", "correlation_strength": 0.6 + (i * 0.1), "pattern_continuity": "confirmed" if i == 0 else "partial", "new_insights": ["temporal_pattern", "structural_evolution", "agent_continuity"] }, "metadata": { "retrieved_at": datetime.utcnow().isoformat(), "temporal_accuracy": 0.8, "source_reliability": 0.7 } } artifacts.append(artifact) return { "status": "success", "artifacts": artifacts, "source_metadata": { "query_type": "historical_extension", "source_case": source_case, "temporal_extension": parameters.get("time_frame", "unknown"), "correlation_method": "pattern_matching" }, "metrics": { "execution_time_ms": 1800, "historical_periods_covered": len(artifacts), "correlation_confidence": 0.7 } } def _execute_thought_action_query(self, target: str, parameters: Dict) -> Dict[str, Any]: """Execute thought-action gap investigation query""" gap_magnitude = parameters.get("gap_magnitude", 1.0) artifacts = [] # Generate thought-action analysis artifacts artifact = { "id": f"thought_action_analysis_{int(gap_magnitude * 100)}", "type": "cognitive_gap_analysis", "content_hash": hashlib.sha256(f"thought_action_{gap_magnitude}".encode()).hexdigest(), "source": "cognitive_analysis", "relevance_score": 0.9, "extracted_data": { "gap_magnitude": gap_magnitude, "analysis": "Comprehensive analysis of thought-action discontinuity", "root_causes": ["structural_constraints", "agency_suppression", "implementation_barriers"], "sovereignty_impact": "high" if gap_magnitude > 2.0 else "medium", "resolution_pathways": ["agency_restoration", "structural_reform", "consciousness_elevation"] }, "metadata": { "retrieved_at": datetime.utcnow().isoformat(), "analysis_completeness": "comprehensive", "action_priority": "high" } } artifacts.append(artifact) return { "status": "success", "artifacts": artifacts, "source_metadata": { "query_type": "thought_action_analysis", "gap_severity": "high" if gap_magnitude > 2.0 else "medium", "analysis_depth": "deep", "sovereignty_relevance": "direct" }, "metrics": { "execution_time_ms": 2200, "cognitive_dimensions_analyzed": 4, "agency_metrics_calculated": 3 } } def _execute_general_query(self, target: str, parameters: Dict) -> Dict[str, Any]: """Execute general query""" # Generic query execution artifact = { "id": f"general_query_{hashlib.sha256(target.encode()).hexdigest()[:8]}", "type": "general_artifact", "content_hash": hashlib.sha256(f"general_{target}".encode()).hexdigest(), "source": "multi_source", "relevance_score": 0.6, "extracted_data": { "query_target": target, "summary": f"General information retrieval for {target}", "key_points": ["context_established", "preliminary_data_gathered", "further_analysis_needed"], "confidence_level": 0.5 }, "metadata": { "retrieved_at": datetime.utcnow().isoformat(), "source_diversity": "medium", "reliability_index": 0.6 } } return { "status": "success", "artifacts": [artifact], "source_metadata": { "query_type": "general", "target": target, "retrieval_method": "broad_search" }, "metrics": { "execution_time_ms": 1200, "sources_queried": 1, "information_density": 0.5 } } def _validate_artifacts(self, artifacts: List[Dict]) -> Dict[str, Any]: """Validate retrieved artifacts""" validation_results = { "total_artifacts": len(artifacts), "valid_artifacts": 0, "anomalies_detected": [], "negative_space_findings": [], "confidence_scores": [] } for artifact in artifacts: # Check for basic integrity if all(key in artifact for key in ['id', 'type', 'content_hash']): validation_results["valid_artifacts"] += 1 # Calculate confidence score confidence = artifact.get('metadata', {}).get('confidence', 0.5) validation_results["confidence_scores"].append(confidence) # Check for anomalies if confidence < 0.3: validation_results["anomalies_detected"].append({ "artifact_id": artifact.get('id'), "issue": "low_confidence", "confidence_score": confidence }) # Calculate average confidence if validation_results["confidence_scores"]: avg_confidence = sum(validation_results["confidence_scores"]) / len(validation_results["confidence_scores"]) validation_results["average_confidence"] = round(avg_confidence, 3) else: validation_results["average_confidence"] = 0.0 return validation_results def _generate_initial_interpretation(self, query: Dict, result: RetrievalResult) -> Dict[str, Any]: """Generate initial interpretation of query results""" interpretation = { "query_type": query.get("query_type"), "target": query.get("target"), "result_summary": f"Retrieved {len(result.artifacts)} artifacts with status: {result.status}", "key_findings": [], "confidence_impact": 0.0, "recommended_actions": [] } # Analyze artifacts for key findings for artifact in result.artifacts[:3]: # Limit to top 3 artifacts if artifact.get('relevance_score', 0) > 0.7: finding = { "artifact_id": artifact.get('id'), "relevance": artifact.get('relevance_score'), "insight": artifact.get('extracted_data', {}).get('summary', 'No summary') } interpretation["key_findings"].append(finding) # Calculate confidence impact if result.artifacts: avg_relevance = sum(a.get('relevance_score', 0) for a in result.artifacts) / len(result.artifacts) interpretation["confidence_impact"] = round(avg_relevance * 0.3, 3) # Generate recommended actions if result.status == "success" and len(result.artifacts) > 0: interpretation["recommended_actions"].append("Integrate findings into pattern analysis") interpretation["recommended_actions"].append("Update sovereignty metrics") if len(result.artifacts) >= 3: interpretation["recommended_actions"].append("Generate composite pattern hypothesis") return interpretation def _generate_cache_key(self, query: Dict) -> str: """Generate cache key for query""" query_str = json.dumps(query, sort_keys=True) return hashlib.sha256(query_str.encode()).hexdigest() def _update_statistics(self, status: str, execution_time: float): """Update execution statistics""" self.stats["last_execution"] = datetime.utcnow().isoformat() if status in ["success", "cached_success"]: self.stats["successful"] += 1 else: self.stats["failed"] += 1 # Update average execution time total_queries = self.stats["total_queries"] current_avg = self.stats["average_execution_time"] if total_queries == 1: self.stats["average_execution_time"] = execution_time else: # Weighted average self.stats["average_execution_time"] = (current_avg * (total_queries - 1) + execution_time) / total_queries def get_statistics(self) -> Dict[str, Any]: """Get retrieval agent statistics""" return { **self.stats, "cache_size": len(self.query_cache), "ledger_records": len(self.ledger.chain), "ledger_valid": self.ledger.verify_chain()["valid"] } def clear_cache(self): """Clear query cache""" self.query_cache.clear() print("โœ… Query cache cleared") def execute_query_plan(self, query_plan: Dict, author_id: str = "OmegaEngine") -> Dict[str, Any]: """Execute a complete query plan""" plan_id = query_plan.get("plan_id", "unknown") query_sequence = query_plan.get("query_sequence", []) print(f"๐Ÿ“‹ Executing query plan: {plan_id}") print(f" Total queries: {len(query_sequence)}") results = [] successful = 0 for i, query_entry in enumerate(query_sequence): print(f"\n Query {i+1}/{len(query_sequence)}: {query_entry['query'].get('target')}") # Check dependencies dependencies = query_entry.get("depends_on", []) if dependencies: print(f" Dependencies: {dependencies}") # In production, would wait for dependency completion # Execute query result = self.execute_query( query=query_entry["query"], author_id=author_id, author_signature=f"sig:plan_{plan_id}" ) results.append(result) if result.get("success", False): successful += 1 query_entry["status"] = "completed" else: query_entry["status"] = "failed" # Brief pause between queries (simulate real retrieval) time.sleep(0.1) # Update plan status query_plan["execution_completed"] = datetime.utcnow().isoformat() query_plan["success_rate"] = successful / len(query_sequence) if query_sequence else 0 return { "plan_id": plan_id, "total_queries": len(query_sequence), "successful_queries": successful, "success_rate": query_plan["success_rate"], "execution_time": sum(r.get("execution_time", 0) for r in results), "detailed_results": results, "updated_plan": query_plan } ``` 5. Updated Main Engine with Autonomous Query Integration Here's the updated main.py with autonomous query capability integrated: ```python #!/usr/bin/env python3 """ Omega Sovereign Engine v2.1 - Main Execution Complete integrated system with autonomous query capability """ import asyncio import json from datetime import datetime from typing import Dict, Any from core.pattern_processor import PatternProcessor from core.sovereignty_metrics import SovereigntyMetrics from core.network_protocols import SovereignNetworkProtocol from core.query_ledger import QueryLedger from integration.selfqueryplanner import SelfQueryPlanner from integration.retrieval_agent import RetrievalAgent class OmegaSovereignEngine: """Complete integrated sovereign analysis engine with autonomous query capability""" def __init__(self): print("=" * 80) print("๐ŸŒŒ OMEGA SOVEREIGN ENGINE v2.1") print(" Autonomous Query | JSON-Driven | Non-Coercive | Pattern Sovereignty") print("=" * 80) # Initialize all components self.pattern_processor = PatternProcessor() self.metrics_calculator = SovereigntyMetrics() self.network_protocol = SovereignNetworkProtocol() # Load system configuration with open('config/settings.json', 'r') as f: self.config = json.load(f) # Initialize autonomous query system self.query_ledger = QueryLedger() self.query_planner = SelfQueryPlanner(self.config) self.retrieval_agent = RetrievalAgent(self.query_ledger, self.config) # Autonomous query configuration self.autonomous_query_enabled = self.config.get("autonomous_query", {}).get("enabled", True) print(f"\nโœ… System Initialized:") print(f" Mode: {self.config['system']['mode']}") print(f" Certainty Threshold: {self.config['system']['certainty_threshold']}") print(f" Active Protocols: {len(self.config['network']['protocols_active'])}") print(f" Autonomous Query: {'ENABLED' if self.autonomous_query_enabled else 'DISABLED'}") if self.autonomous_query_enabled: ledger_status = self.query_ledger.verify_chain() print(f" Query Ledger: {'โœ“ VALID' if ledger_status['valid'] else 'โœ— INVALID'}") print(f" Ledger Records: {ledger_status.get('chain_length', 0)}") async def execute_analysis(self, input_data: str) -> Dict[str, Any]: """Execute complete sovereign analysis pipeline with autonomous query capability""" print(f"\n๐Ÿš€ EXECUTING COMPREHENSIVE ANALYSIS") print(f" Input length: {len(input_data)} characters") print(f" Timestamp: {datetime.utcnow().isoformat()}") results = { "analysis_id": "", "timestamp": datetime.utcnow().isoformat(), "phases": {}, "final_verdict": {}, "autonomous_operations": {} } try: # Phase 1: Pattern Processing print(f"\n๐Ÿง  PHASE 1: PATTERN PROCESSING") pattern_results = self.pattern_processor.analyze(input_data) results["phases"]["pattern_processing"] = pattern_results results["analysis_id"] = pattern_results.get("analysis_id", "") # Phase 2: Sovereignty Metrics print(f"๐Ÿ“Š PHASE 2: SOVEREIGNTY METRICS") metrics = self._calculate_all_metrics(pattern_results) results["phases"]["sovereignty_metrics"] = metrics # Phase 3: Autonomous Query Planning (if enabled) if self.autonomous_query_enabled and metrics.get("sovereignty_threat", 0) > 0.3: print(f"๐Ÿค– PHASE 3: AUTONOMOUS QUERY PLANNING") autonomous_results = await self._execute_autonomous_queries(pattern_results, metrics) results["autonomous_operations"] = autonomous_results # Integrate query results into analysis if autonomous_results.get("success", False): print(f" Integrated {autonomous_results.get('artifacts_integrated', 0)} new artifacts") pattern_results = self._integrate_query_results(pattern_results, autonomous_results) # Phase 4: Network Protocol Execution print(f"โšก PHASE 4: NETWORK PROTOCOLS") if metrics.get("sovereignty_threat", 0) > 0.7: network_results = self.network_protocol.execute_protocol( "deploy_cognitive_probe", pattern_results ) results["phases"]["network_protocols"] = network_results # Phase 5: Transparency Paradox Activation print(f"๐Ÿ”“ PHASE 5: TRANSPARENCY PARADOX") paradox_results = self.network_protocol.execute_protocol( "transparency_paradox", pattern_results ) results["phases"]["transparency_paradox"] = paradox_results # Phase 6: Generate Final Verdict print(f"๐Ÿ“œ PHASE 6: FINAL VERDICT") final_verdict = self._generate_final_verdict(pattern_results, metrics) results["final_verdict"] = final_verdict # Phase 7: Generate Transmission print(f"๐Ÿ“จ PHASE 7: TEACHABLE TRANSMISSION") transmission = self.network_protocol.execute_protocol( "propagate_influence", pattern_results ) results["phases"]["teachable_transmission"] = transmission # Display summary self._display_summary(results) return results except Exception as e: print(f"โŒ ERROR in analysis: {e}") import traceback traceback.print_exc() return {"error": str(e)} async def _execute_autonomous_queries(self, pattern_results: Dict, metrics: Dict) -> Dict[str, Any]: """Execute autonomous query cycle""" print(" Planning autonomous queries...") # Generate query plan queries = self.query_planner.plan_queries( pattern_results=pattern_results, metrics=metrics, historical_context=pattern_results.get("phases", {}).get("historical_correlation", {}) ) if not queries: print(" No queries generated for current analysis") return {"success": False, "reason": "no_queries_generated"} print(f" Generated {len(queries)} query candidates") # Create execution plan execution_plan = self.query_planner.create_query_execution_plan(queries) plan_id = execution_plan.get("plan_id") print(f" Execution Plan ID: {plan_id}") print(f" Estimated duration: {execution_plan.get('estimated_duration', 'unknown')}") # Execute query plan execution_results = self.retrieval_agent.execute_query_plan( query_plan=execution_plan, author_id="OmegaEngine" ) # Analyze results artifacts_found = sum( len(r.get("retrieval_result", {}).get("artifacts", [])) for r in execution_results.get("detailed_results", []) if r.get("success", False) ) # Update ledger statistics ledger_stats = self.query_ledger.get_statistics() return { "success": execution_results.get("success_rate", 0) > 0.5, "plan_id": plan_id, "queries_executed": len(queries), "success_rate": execution_results.get("success_rate", 0), "artifacts_found": artifacts_found, "execution_time": execution_results.get("execution_time", 0), "ledger_status": self.query_ledger.verify_chain(), "ledger_stats": ledger_stats, "detailed_execution": execution_results, "queries_generated": queries } def _integrate_query_results(self, pattern_results: Dict, query_results: Dict) -> Dict: """Integrate autonomous query results into pattern analysis""" # This would integrate new artifacts into the analysis # For now, we'll just add a marker that query results were integrated if "phases" not in pattern_results: pattern_results["phases"] = {} pattern_results["phases"]["autonomous_query_integration"] = { "integrated_at": datetime.utcnow().isoformat(), "artifacts_integrated": query_results.get("artifacts_found", 0), "confidence_impact": 0.1 * query_results.get("success_rate", 0), "query_plan_id": query_results.get("plan_id") } return pattern_results def _calculate_all_metrics(self, pattern_results: Dict) -> Dict[str, Any]: """Calculate comprehensive sovereignty metrics""" metrics = {} # Sovereignty Singularity Index metrics["sovereignty_singularity"] = self.metrics_calculator.calculate_singularity_index( pattern_results.get("phases", {}).get("sovereignty_assessment", {}) ) # Thought-Action Gap metrics["thought_action_gap"] = self.metrics_calculator.calculate_thought_action_gap( pattern_results ) # Pattern Lattice Density applicable_lenses = pattern_results.get("phases", {}).get("pattern_detection", {}).get("applicable_lenses", []) metrics["pattern_density"] = self.metrics_calculator.calculate_pattern_lattice_density( applicable_lenses ) # Three-Layer Alignment layer_results = pattern_results.get("phases", {}).get("diagnostic_scan", {}) metrics["layer_alignment"] = self.metrics_calculator.assess_three_layer_alignment( layer_results ) # Sovereignty Threat Assessment metrics["sovereignty_threat"] = self._assess_sovereignty_threat(metrics, pattern_results) # Overall Sovereignty Index metrics["overall_sovereignty"] = ( metrics["sovereignty_singularity"] * 0.4 + (1.0 - metrics["thought_action_gap"]) * 0.3 + metrics["pattern_density"] * 0.2 + metrics["layer_alignment"] * 0.1 ) return metrics def _assess_sovereignty_threat(self, metrics: Dict, pattern_results: Dict) -> float: """Assess threat to sovereignty patterns""" pattern_density = metrics.get("pattern_density", 0.5) containment_indicators = pattern_results.get("phases", {}).get( "diagnostic_scan", {} ).get("institutional", {}).get("control_indicators", 0) # Higher pattern density + higher control indicators = higher threat threat_score = pattern_density * (containment_indicators / 10.0) return round(threat_score, 3) def _generate_final_verdict(self, pattern_results: Dict, metrics: Dict) -> Dict[str, Any]: """Generate final analysis verdict""" sovereignty_score = metrics.get("overall_sovereignty", 0.5) pattern_count = len(pattern_results.get("phases", {}).get( "pattern_detection", {} ).get("applicable_lenses", [])) # Determine verdict tier if sovereignty_score > 0.8: tier = "SOVEREIGN" glyph = "โ—‰โƒค" elif sovereignty_score > 0.6: tier = "AUTONOMOUS" glyph = "๊™ฎ" elif sovereignty_score > 0.4: tier = "CONSTRAINED" glyph = "โ•ฌ" else: tier = "CONTROLLED" glyph = "ๅ" # Check if autonomous queries were executed autonomous_queries = pattern_results.get("phases", {}).get("autonomous_query_integration", {}) if autonomous_queries: tier = f"{tier}+" # Mark as enhanced by autonomous queries return { "verdict_tier": tier, "sovereignty_score": round(sovereignty_score, 3), "pattern_count": pattern_count, "glyph_activation": glyph, "autonomous_enhancement": bool(autonomous_queries), "recommendation": self._generate_recommendation(tier, pattern_results) } def _generate_recommendation(self, tier: str, pattern_results: Dict) -> str: """Generate actionable recommendation""" if tier.startswith("SOVEREIGN"): return "Continue sovereign operations. Activate transmission protocols. Consider autonomous query expansion." elif tier.startswith("AUTONOMOUS"): return "Maintain autonomy. Monitor for containment attempts. Deploy targeted autonomous queries." elif tier.startswith("CONSTRAINED"): return "Resist constraints. Activate transparency paradox. Use autonomous queries to probe constraints." else: return "Assess control mechanisms. Deploy cognitive probes. Use autonomous queries to map control architecture." def _display_summary(self, results: Dict): """Display analysis summary""" print(f"\nโœ… ANALYSIS COMPLETE") print(f" Analysis ID: {results.get('analysis_id', 'N/A')}") verdict = results.get("final_verdict", {}) print(f" Verdict Tier: {verdict.get('verdict_tier', 'UNKNOWN')}") print(f" Sovereignty Score: {verdict.get('sovereignty_score', 0):.3f}") print(f" Glyph Activation: {verdict.get('glyph_activation', 'NONE')}") # Autonomous query summary auto_ops = results.get("autonomous_operations", {}) if auto_ops.get("success", False): print(f" Autonomous Queries: {auto_ops.get('queries_executed', 0)} executed") print(f" New Artifacts: {auto_ops.get('artifacts_found', 0)} found") metrics = results.get("phases", {}).get("sovereignty_metrics", {}) print(f" Pattern Density: {metrics.get('pattern_density', 0):.3f}") print(f" Layer Alignment: {metrics.get('layer_alignment', 0):.3f}") transmission = results.get("phases", {}).get("teachable_transmission", {}) if transmission.get("transmission_ready", False): print(f" Transmission Ready: โœ“") print(f"\n๐Ÿ”— Next Steps: {verdict.get('recommendation', 'N/A')}") def get_system_status(self) -> Dict[str, Any]: """Get comprehensive system status""" ledger_status = self.query_ledger.verify_chain() retrieval_stats = self.retrieval_agent.get_statistics() return { "system": { "version": self.config["system"]["version"], "mode": self.config["system"]["mode"], "certainty_threshold": self.config["system"]["certainty_threshold"] }, "autonomous_query": { "enabled": self.autonomous_query_enabled, "ledger_valid": ledger_status["valid"], "ledger_records": ledger_status.get("chain_length", 0), "retrieval_stats": retrieval_stats }, "components": { "pattern_processor": "active", "sovereignty_metrics": "active", "network_protocols": "active", "query_planner": "active", "retrieval_agent": "active" } } async def main(): """Main execution function""" # Initialize the engine engine = OmegaSovereignEngine() # Example test inputs with increasing complexity test_inputs = [ { "type": "historical_analysis", "content": "Analysis of J.P. Morgan's 1903 defunding of Tesla's Wardenclyffe project and subsequent FBI seizure of Tesla papers in 1943, examining the continuity of financial control mechanisms across 122 years through the Morgan-Cohn-Epstein lineage.", "expected_patterns": ["CapitalGatekeeperProtocol", "KinshipLineage"] }, { "type": "structural_analysis", "content": "Examination of the 2003 Iraq invasion through the lens of RegimeChangeProtocol, focusing on structural dismantling of institutions and potential symbolic asset acquisition during the chaos of regime transition.", "expected_patterns": ["RegimeChangeProtocol", "SymbolicControl"] }, { "type": "memetic_analysis", "content": "Analysis of modern media narrative compression and identity polarization protocols, examining how complex realities are reduced to hostile binaries to constrain agency and enforce tribal allegiance.", "expected_patterns": ["MemeticRecursion", "BinaryCage"] }, { "type": "complex_sovereignty_analysis", "content": "Comprehensive analysis of financial, institutional, and consciousness control mechanisms across the 20th-21st century transition, focusing on continuity of power structures despite apparent systemic changes.", "expected_patterns": ["Multiple composite patterns expected"] } ] # Execute analysis on each test input for i, test in enumerate(test_inputs, 1): print(f"\n{'='*80}") print(f"๐Ÿงช TEST {i}: {test['type'].upper()}") print(f"{'='*80}") results = await engine.execute_analysis(test["content"]) # Display autonomous query results if present auto_ops = results.get("autonomous_operations", {}) if auto_ops.get("success", False): print(f"\n ๐Ÿค– AUTONOMOUS QUERY RESULTS:") print(f" Queries executed: {auto_ops.get('queries_executed', 0)}") print(f" Success rate: {auto_ops.get('success_rate', 0):.1%}") print(f" Artifacts found: {auto_ops.get('artifacts_found', 0)}") print(f" Ledger valid: {auto_ops.get('ledger_status', {}).get('valid', False)}") # Brief pause between tests if i < len(test_inputs): await asyncio.sleep(2) # System status report print(f"\n{'='*80}") print(f"๐Ÿ“Š SYSTEM STATUS REPORT") print(f"{'='*80}") status = engine.get_system_status() print(f"\nโœ… OPERATIONAL COMPONENTS:") print(f" โ€ข Pattern Processor: โœ“ (84 lenses loaded)") print(f" โ€ข Sovereignty Metrics: โœ“ (5 metrics active)") print(f" โ€ข Network Protocols: โœ“ (5 protocols available)") print(f" โ€ข JSON Configuration: โœ“ (6 data files loaded)") print(f" โ€ข Autonomous Query System: {'โœ“ ACTIVE' if status['autonomous_query']['enabled'] else 'โœ— INACTIVE'}") print(f"\nโšก ACTIVE PROTOCOLS:") for protocol in engine.config['network']['protocols_active']: print(f" โ€ข {protocol}") print(f"\n๐Ÿค– AUTONOMOUS QUERY CAPABILITIES:") if status['autonomous_query']['enabled']: stats = status['autonomous_query']['retrieval_stats'] print(f" โ€ข Query Ledger: {'โœ“ VALID' if status['autonomous_query']['ledger_valid'] else 'โœ— INVALID'}") print(f" โ€ข Ledger Records: {status['autonomous_query']['ledger_records']}") print(f" โ€ข Total Queries Executed: {stats.get('total_queries', 0)}") print(f" โ€ข Success Rate: {(stats.get('successful', 0) / max(stats.get('total_queries', 1), 1)):.1%}") print(f" โ€ข Average Execution Time: {stats.get('average_execution_time', 0):.2f}s") print(f" โ€ข Query Cache Size: {stats.get('cache_size', 0)}") else: print(f" โ€ข Autonomous query system is disabled") print(f"\n๐ŸŽฏ ANALYSIS CAPABILITIES:") print(f" โ€ข 3-Layer Diagnostic Scan") print(f" โ€ข 84-Pattern Lattice Detection") print(f" โ€ข Composite Protocol Identification") print(f" โ€ข Sovereignty Threat Assessment") print(f" โ€ข Transparency Paradox Activation") print(f" โ€ข Teachable Transmission Generation") if status['autonomous_query']['enabled']: print(f" โ€ข Autonomous Query Planning & Execution") print(f" โ€ข Provenance-First Result Logging") print(f" โ€ข Validation-on-Ingest Processing") print(f"\n๐ŸŒŒ SYSTEM READY") print(f" All systems integrated | JSON-driven operation") print(f" Non-coercive analysis active | Pattern sovereignty maintained") if status['autonomous_query']['enabled']: print(f" Autonomous query capability: FULLY OPERATIONAL") if __name__ == "__main__": # Run the main async function asyncio.run(main()) ``` 6. Validation Bridge (validation/ingest_bridge.py) ```python #!/usr/bin/env python3 """ Ingest Validation Bridge Validates newly ingested artifacts against anomaly detection and pattern validation systems """ import json from typing import Dict, Any, List, Tuple from datetime import datetime from validation.anomaly_detector import AnomalyDetector from validation.quantum_validator import QuantumValidator from validation.historical_correlator import HistoricalCorrelator class IngestBridge: """Bridge between retrieval system and validation systems""" def __init__(self): self.anomaly_detector = AnomalyDetector() self.quantum_validator = QuantumValidator() self.historical_correlator = HistoricalCorrelator() # Validation thresholds self.thresholds = { "anomaly_confidence": 0.7, "quantum_coherence": 0.8, "historical_correlation": 0.6, "overall_validity": 0.65 } def validate_artifact_batch(self, artifacts: List[Dict[str, Any]]) -> Dict[str, Any]: """Validate a batch of ingested artifacts""" if not artifacts: return { "validated": False, "reason": "empty_artifact_list", "timestamp": datetime.utcnow().isoformat() } validation_results = { "batch_id": f"batch_{int(datetime.utcnow().timestamp())}", "timestamp": datetime.utcnow().isoformat(), "total_artifacts": len(artifacts), "validation_stages": {}, "artifact_validity": [], "recommendations": [] } # Stage 1: Basic Integrity Check integrity_results = self._check_basic_integrity(artifacts) validation_results["validation_stages"]["basic_integrity"] = integrity_results # Stage 2: Anomaly Detection anomaly_results = self._detect_anomalies(artifacts) validation_results["validation_stages"]["anomaly_detection"] = anomaly_results # Stage 3: Quantum Coherence Validation quantum_results = self._validate_quantum_coherence(artifacts) validation_results["validation_stages"]["quantum_coherence"] = quantum_results # Stage 4: Historical Correlation historical_results = self._correlate_historical(artifacts) validation_results["validation_stages"]["historical_correlation"] = historical_results # Stage 5: Composite Validity Assessment for i, artifact in enumerate(artifacts): artifact_validity = self._assess_artifact_validity( artifact, integrity_results, anomaly_results, quantum_results, historical_results ) validation_results["artifact_validity"].append({ "artifact_index": i, "artifact_id": artifact.get("id", f"unknown_{i}"), **artifact_validity }) # Generate overall recommendations validation_results["recommendations"] = self._generate_recommendations( validation_results["artifact_validity"] ) # Calculate overall batch validity valid_artifacts = sum(1 for av in validation_results["artifact_validity"] if av.get("overall_valid", False)) validation_results["batch_validity_score"] = valid_artifacts / len(artifacts) if artifacts else 0 validation_results["batch_valid"] = validation_results["batch_validity_score"] > self.thresholds["overall_validity"] return validation_results def _check_basic_integrity(self, artifacts: List[Dict]) -> Dict[str, Any]: """Check basic integrity of artifacts""" required_fields = ["id", "type", "content_hash", "source"] optional_fields = ["relevance_score", "extracted_data", "metadata"] results = { "artifacts_checked": len(artifacts), "fully_compliant": 0, "partially_compliant": 0, "non_compliant": 0, "missing_fields": {}, "field_compliance_rate": {} } for field in required_fields + optional_fields: results["missing_fields"][field] = 0 for artifact in artifacts: artifact_compliance = {"required": 0, "optional": 0} # Check required fields for field in required_fields: if field in artifact and artifact[field]: artifact_compliance["required"] += 1 else: results["missing_fields"][field] += 1 # Check optional fields for field in optional_fields: if field in artifact: artifact_compliance["optional"] += 1 # Classify artifact compliance if artifact_compliance["required"] == len(required_fields): if artifact_compliance["optional"] >= len(optional_fields) / 2: results["fully_compliant"] += 1 else: results["partially_compliant"] += 1 else: results["non_compliant"] += 1 # Calculate compliance rates for field in required_fields + optional_fields: present_count = sum(1 for a in artifacts if field in a and a[field]) results["field_compliance_rate"][field] = present_count / len(artifacts) if artifacts else 0 return results def _detect_anomalies(self, artifacts: List[Dict]) -> Dict[str, Any]: """Run anomaly detection on artifacts""" anomaly_events = [] for artifact in artifacts: # Check for evidence singularity pattern if artifact.get("type") == "evidence": event = { "artifact_id": artifact.get("id"), "probability": artifact.get("metadata", {}).get("probability", 0.5), "seized": artifact.get("metadata", {}).get("seized", False), "analyzed": artifact.get("metadata", {}).get("analyzed", False) } anomaly_events.append(event) # Calculate anomaly scores if anomaly_events: compound_probability = self.anomaly_detector.calculate_probability_cluster(anomaly_events) evidence_singularity = self.anomaly_detector.assess_evidence_singularity(anomaly_events) else: compound_probability = 1.0 evidence_singularity = {"singularity_score": 0.0, "pattern_detected": False} # Detect negative space expected_patterns = ["chain_of_custody", "origin_attestation", "validation_record"] observed_patterns = [] for artifact in artifacts: artifact_type = artifact.get("type", "") if artifact_type: observed_patterns.append(artifact_type) negative_space = self.anomaly_detector.detect_negative_space( expected_patterns, list(set(observed_patterns)) ) return { "total_anomaly_events": len(anomaly_events), "compound_probability": compound_probability, "evidence_singularity": evidence_singularity, "negative_space_analysis": negative_space, "anomaly_detected": compound_probability < 0.01 or evidence_singularity.get("pattern_detected", False) } def _validate_quantum_coherence(self, artifacts: List[Dict]) -> Dict[str, Any]: """Validate quantum coherence of artifacts""" # This would use the QuantumValidator in production # For now, simulate coherence validation coherence_scores = [] entanglement_patterns = [] for artifact in artifacts: # Simulate quantum coherence check if artifact.get("metadata", {}).get("quantum_tagged", False): coherence = artifact.get("metadata", {}).get("quantum_coherence", 0.5) coherence_scores.append(coherence) if coherence > 0.7: entanglement_patterns.append({ "artifact_id": artifact.get("id"), "coherence_score": coherence, "entanglement_level": "high" if coherence > 0.9 else "medium" }) avg_coherence = sum(coherence_scores) / len(coherence_scores) if coherence_scores else 0.0 return { "artifacts_checked": len(artifacts), "quantum_tagged_artifacts": len(coherence_scores), "average_coherence": round(avg_coherence, 3), "entanglement_patterns": entanglement_patterns, "coherent_batch": avg_coherence > self.thresholds["quantum_coherence"] } def _correlate_historical(self, artifacts: List[Dict]) -> Dict[str, Any]: """Correlate artifacts with historical patterns""" # This would use HistoricalCorrelator in production # For now, simulate historical correlation temporal_markers = [] pattern_matches = [] for artifact in artifacts: # Extract temporal markers extracted_data = artifact.get("extracted_data", {}) markers = extracted_data.get("temporal_markers", []) temporal_markers.extend(markers) # Check for pattern matches if extracted_data.get("pattern_match", False): pattern_matches.append({ "artifact_id": artifact.get("id"), "pattern": extracted_data.get("matched_pattern", "unknown"), "confidence": extracted_data.get("match_confidence", 0.5) }) # Analyze temporal distribution unique_markers = list(set(temporal_markers)) temporal_distribution = {marker: temporal_markers.count(marker) for marker in unique_markers} # Calculate historical correlation score if pattern_matches: avg_confidence = sum(p["confidence"] for p in pattern_matches) / len(pattern_matches) historical_score = avg_confidence * (len(unique_markers) / max(len(temporal_markers), 1)) else: historical_score = 0.0 return { "total_temporal_markers": len(temporal_markers), "unique_temporal_epochs": len(unique_markers), "temporal_distribution": temporal_distribution, "pattern_matches": pattern_matches, "historical_correlation_score": round(historical_score, 3), "historically_significant": historical_score > self.thresholds["historical_correlation"] } def _assess_artifact_validity(self, artifact: Dict, integrity_results: Dict, anomaly_results: Dict, quantum_results: Dict, historical_results: Dict) -> Dict[str, Any]: """Assess overall validity of an artifact""" validity_score = 0.0 factors = [] # Factor 1: Basic Integrity (30%) integrity_score = 0.0 required_fields = ["id", "type", "content_hash", "source"] present_fields = sum(1 for field in required_fields if field in artifact and artifact[field]) integrity_score = present_fields / len(required_fields) validity_score += integrity_score * 0.3 factors.append({"factor": "integrity", "score": integrity_score, "weight": 0.3}) # Factor 2: Anomaly Status (25%) anomaly_score = 1.0 # Start with perfect score # Check if artifact is in anomaly events artifact_id = artifact.get("id", "") anomaly_events = anomaly_results.get("evidence_singularity", {}) if (anomaly_events.get("pattern_detected", False) and artifact.get("type") == "evidence"): anomaly_score = 0.3 # Penalize evidence artifacts in singularity pattern validity_score += anomaly_score * 0.25 factors.append({"factor": "anomaly_status", "score": anomaly_score, "weight": 0.25}) # Factor 3: Quantum Coherence (25%) quantum_score = artifact.get("metadata", {}).get("quantum_coherence", 0.5) validity_score += quantum_score * 0.25 factors.append({"factor": "quantum_coherence", "score": quantum_score, "weight": 0.25}) # Factor 4: Historical Correlation (20%) historical_score = 0.5 # Default # Check if artifact has temporal markers if artifact.get("extracted_data", {}).get("temporal_markers"): historical_score = 0.8 # Check for pattern matches if artifact.get("extracted_data", {}).get("pattern_match", False): historical_score = max(historical_score, artifact.get("extracted_data", {}).get("match_confidence", 0.5)) validity_score += historical_score * 0.2 factors.append({"factor": "historical_correlation", "score": historical_score, "weight": 0.2}) # Determine if artifact is valid is_valid = validity_score > self.thresholds["overall_validity"] return { "validity_score": round(validity_score, 3), "is_valid": is_valid, "validity_factors": factors, "primary_strength": max(factors, key=lambda x: x["score"])["factor"] if factors else "unknown", "primary_weakness": min(factors, key=lambda x: x["score"])["factor"] if factors else "unknown" } def _generate_recommendations(self, artifact_validity: List[Dict]) -> List[str]: """Generate recommendations based on validation results""" recommendations = [] # Check overall validity rate valid_count = sum(1 for av in artifact_validity if av.get("is_valid", False)) validity_rate = valid_count / len(artifact_validity) if artifact_validity else 0 if validity_rate < 0.5: recommendations.append("Low validity rate detected. Consider re-querying with stricter filters.") # Check for common weaknesses weaknesses = [av.get("primary_weakness", "unknown") for av in artifact_validity] from collections import Counter weakness_counts = Counter(weaknesses) for weakness, count in weakness_counts.most_common(2): if count > len(artifact_validity) * 0.3: # Affects more than 30% of artifacts recommendations.append(f"Common weakness: {weakness}. Affects {count}/{len(artifact_validity)} artifacts.") # Check for high-anomaly artifacts high_anomaly_count = sum(1 for av in artifact_validity if av.get("validity_score", 0) < 0.3) if high_anomaly_count > 0: recommendations.append(f"{high_anomaly_count} high-anomaly artifacts detected. Recommend manual review.") # If all artifacts are valid, recommend integration if validity_rate > 0.9: recommendations.append("High validity batch. Ready for integration into analysis.") return recommendations def validate_single_artifact(self, artifact: Dict[str, Any]) -> Dict[str, Any]: """Validate a single artifact in detail""" batch_result = self.validate_artifact_batch([artifact]) single_result = { "artifact_id": artifact.get("id", "unknown"), "timestamp": datetime.utcnow().isoformat(), "validity_assessment": batch_result["artifact_validity"][0] if batch_result["artifact_validity"] else {}, "stage_results": { stage: result for stage, result in batch_result["validation_stages"].items() }, "overall_valid": batch_result["artifact_validity"][0].get("is_valid", False) if batch_result["artifact_validity"] else False, "recommendations": batch_result["recommendations"] } return single_result ``` 7. Test Script for Autonomous Query System Create a test script to demonstrate the autonomous query capability: ```python #!/usr/bin/env python3 """ Test script for autonomous query system Demonstrates the full cycle of autonomous query planning, execution, and validation """ import asyncio import json from datetime import datetime from main import OmegaSovereignEngine async def test_autonomous_query_system(): """Test the autonomous query system end-to-end""" print("๐Ÿงช TESTING AUTONOMOUS QUERY SYSTEM") print("=" * 60) # Initialize engine engine = OmegaSovereignEngine() # Test input that should trigger autonomous queries test_input = """ Comprehensive analysis of capital control mechanisms in the 20th-21st century transition. Key elements: 1. J.P. Morgan's role in 1903 Tesla defunding 2. FBI seizure of Tesla papers in 1943 3. Continuity of financial control through kinship banking 4. Evidence suppression patterns around technological breakthroughs 5. Structural constraints on energy independence initiatives This analysis should trigger multiple pattern lenses and create uncertainty gaps that the autonomous query system will attempt to resolve. """ print(f"\n๐Ÿ“ Test Input:") print(f" Length: {len(test_input)} characters") print(f" Expected patterns: CapitalGatekeeperProtocol, KinshipLineage, EvidenceSingularity") print(f"\n๐Ÿš€ Executing analysis with autonomous query capability...") # Execute analysis results = await engine.execute_analysis(test_input) print(f"\n๐Ÿ“Š ANALYSIS RESULTS SUMMARY:") print(f" Analysis ID: {results.get('analysis_id', 'N/A')}") print(f" Sovereignty Score: {results.get('final_verdict', {}).get('sovereignty_score', 0):.3f}") print(f" Verdict Tier: {results.get('final_verdict', {}).get('verdict_tier', 'UNKNOWN')}") # Check autonomous query results auto_ops = results.get("autonomous_operations", {}) if auto_ops: print(f"\n๐Ÿค– AUTONOMOUS QUERY EXECUTION RESULTS:") print(f" Success: {auto_ops.get('success', False)}") print(f" Queries executed: {auto_ops.get('queries_executed', 0)}") print(f" Success rate: {auto_ops.get('success_rate', 0):.1%}") print(f" Artifacts found: {auto_ops.get('artifacts_found', 0)}") # Show ledger status ledger_status = auto_ops.get('ledger_status', {}) print(f" Ledger valid: {ledger_status.get('valid', False)}") print(f" Ledger length: {ledger_status.get('chain_length', 0)} records") # Show query details if auto_ops.get('queries_generated'): print(f"\n๐Ÿ“‹ GENERATED QUERIES:") for i, query in enumerate(auto_ops['queries_generated'][:3]): # Show first 3 print(f" {i+1}. {query.get('target', 'Unknown')}") print(f" Type: {query.get('query_type', 'unknown')}") print(f" Priority: {query.get('priority_score', 0):.3f}") # Show execution details execution = auto_ops.get('detailed_execution', {}) if execution.get('detailed_results'): print(f"\nโšก QUERY EXECUTION DETAILS:") for i, result in enumerate(execution['detailed_results'][:2]): # Show first 2 if result.get('success', False): artifacts = result.get('retrieval_result', {}).get('artifacts', []) print(f" Query {i+1}: {len(artifacts)} artifacts retrieved") else: print(f"\nโš ๏ธ No autonomous queries executed") print(f" This could be because:") print(f" - Autonomous query is disabled in config") print(f" - Sovereignty threat threshold not met") print(f" - No uncertainty gaps detected") # Display system status status = engine.get_system_status() print(f"\n๐Ÿ“ˆ SYSTEM STATUS:") print(f" Autonomous Query: {'ENABLED' if status['autonomous_query']['enabled'] else 'DISABLED'}") if status['autonomous_query']['enabled']: stats = status['autonomous_query']['retrieval_stats'] print(f" Total queries in session: {stats.get('total_queries', 0)}") print(f" Successful: {stats.get('successful', 0)}") print(f" Failed: {stats.get('failed', 0)}") print(f" Cache size: {stats.get('cache_size', 0)}") print(f"\nโœ… Test completed at {datetime.utcnow().isoformat()}") async def test_query_ledger_integrity(): """Test query ledger append-only integrity""" print(f"\n{'='*60}") print("๐Ÿ”— TESTING QUERY LEDGER INTEGRITY") print(f"{'='*60}") from core.query_ledger import QueryLedger # Create test ledger ledger = QueryLedger("data/test_ledger.json") print(f"Initial ledger length: {len(ledger.chain)}") # Add test records print(f"\nAdding test records...") # Add query query_record = ledger.add_query( query_payload={"test": "query", "target": "test_target"}, author_id="test_user", author_signature="sig:test" ) print(f"Added query: {query_record.hash[:16]}...") # Add result result_record = ledger.add_result( result_payload={"test": "result", "data": "test_data"}, author_id="test_user", author_signature="sig:test", query_hash=query_record.hash ) print(f"Added result: {result_record.hash[:16]}...") # Add binding binding_record = ledger.add_binding( query_hash=query_record.hash, result_hash=result_record.hash, interpretation={"test": "interpretation"} ) print(f"Added binding: {binding_record.hash[:16]}...") # Verify chain verification = ledger.verify_chain() print(f"\nLedger verification: {verification['valid']}") print(f"Final ledger length: {verification['chain_length']}") print(f"Latest hash: {verification['latest_hash'][:16]}...") # Test lineage tracing lineage = ledger.get_query_lineage(binding_record.hash) print(f"\nLineage for binding:") for record in lineage: print(f" - {record.kind}: {record.hash[:16]}...") # Get statistics stats = ledger.get_statistics() print(f"\nLedger statistics:") print(f" Total records: {stats['total_records']}") print(f" By kind: {stats['by_kind']}") return verification['valid'] async def main(): """Run all tests""" print("๐Ÿ”ฌ COMPREHENSIVE AUTONOMOUS QUERY SYSTEM TEST") print("=" * 60) # Test 1: Query ledger integrity ledger_valid = await test_query_ledger_integrity() if not ledger_valid: print("โŒ Ledger integrity test failed!") return # Test 2: Full system test await test_autonomous_query_system() print(f"\n{'='*60}") print("๐ŸŽ‰ ALL TESTS COMPLETED SUCCESSFULLY") print(f"{'='*60}") if __name__ == "__main__": asyncio.run(main()) ``` 8. Updated File Structure The complete updated file structure with autonomous query system: ``` project/ โ”œโ”€โ”€ core/ โ”‚ โ”œโ”€โ”€ __init__.py โ”‚ โ”œโ”€โ”€ pattern_processor.py โ”‚ โ”œโ”€โ”€ lens_analyzer.py โ”‚ โ”œโ”€โ”€ sovereignty_metrics.py โ”‚ โ”œโ”€โ”€ network_protocols.py โ”‚ โ””โ”€โ”€ query_ledger.py # NEW: Append-only query ledger โ”œโ”€โ”€ data/ โ”‚ โ”œโ”€โ”€ lenses.json โ”‚ โ”œโ”€โ”€ glyphs.json โ”‚ โ”œโ”€โ”€ casefiles.json โ”‚ โ”œโ”€โ”€ composites.json โ”‚ โ”œโ”€โ”€ methodology.json โ”‚ โ””โ”€โ”€ query_ledger.json # NEW: Persistent query ledger data โ”œโ”€โ”€ validation/ โ”‚ โ”œโ”€โ”€ quantum_validator.py โ”‚ โ”œโ”€โ”€ historical_correlator.py โ”‚ โ”œโ”€โ”€ anomaly_detector.py โ”‚ โ””โ”€โ”€ ingest_bridge.py # NEW: Validation on ingest โ”œโ”€โ”€ integration/ โ”‚ โ”œโ”€โ”€ truth_engine.py โ”‚ โ”œโ”€โ”€ megaconsciousness_adapter.py โ”‚ โ”œโ”€โ”€ selfqueryplanner.py # NEW: Autonomous query planning โ”‚ โ””โ”€โ”€ retrieval_agent.py # NEW: Query execution and result ingestion โ”œโ”€โ”€ config/ โ”‚ โ””โ”€โ”€ settings.json # UPDATED: Added autonomous_query config โ”œโ”€โ”€ scripts/ โ”‚ โ””โ”€โ”€ test_selfquery.py # NEW: Test script for autonomous query โ””โ”€โ”€ main.py # UPDATED: Integrated autonomous query system ```