|
|
|
|
|
|
|
|
""" |
|
|
STRUCTURAL INQUIRY SYSTEM v2.5

Engineering-Focused Knowledge Discovery with Concrete Improvements
|
|
""" |
|
|
|
|
|
from enum import Enum |
|
|
from dataclasses import dataclass, field |
|
|
from typing import List, Dict, Any, Optional, Tuple, Mapping, Callable |
|
|
import hashlib |
|
|
from datetime import datetime |
|
|
from types import MappingProxyType |
|
|
import numpy as np |
|
|
|
|
|
|
|
|
# Glyphs used when rendering output.  KNOWLEDGE_NODE prefixes generated
# structural descriptions and console status lines; the remaining three are
# only used by the demo's printed output.
(
    KNOWLEDGE_NODE,
    PATTERN_RECOGNITION,
    INQUIRY_MARKER,
    VALIDATION_SYMBOL,
) = ("●", "⟁", "?", "✓")
|
|
|
|
|
|
|
|
|
|
|
class KnowledgeStateType(Enum):
    """Classification attached to a knowledge state.

    Every member's value is its own name in lower case.  That string value
    is what ``KnowledgeState`` mixes into its state hash.
    """

    # Structure found in the data itself.
    PATTERN_DETECTION = "pattern_detection"
    DATA_CORRELATION = "data_correlation"

    # How the finding relates to its context and method.
    CONTEXTUAL_ALIGNMENT = "contextual_alignment"
    METHODOLOGICAL_STRUCTURE = "methodological_structure"

    # Trustworthiness and timing of the underlying material.
    SOURCE_VERIFICATION = "source_verification"
    TEMPORAL_CONSISTENCY = "temporal_consistency"

    # Breadth and coverage of the knowledge.
    CROSS_DOMAIN_SYNTHESIS = "cross_domain_synthesis"
    KNOWLEDGE_GAP_IDENTIFICATION = "knowledge_gap_identification"
|
|
|
|
|
@dataclass(frozen=True)
class KnowledgeState:
    """Frozen record describing one research finding.

    ``state_hash`` is not passed by the caller; it is computed in
    ``__post_init__`` from the id, type, confidence, provenance, rigor, the
    first ten data patterns and the knowledge domains.  The remaining fields
    (temporal markers, constraints, description, signature) do not
    contribute to the hash.
    """

    state_id: str
    state_type: 'KnowledgeStateType'
    confidence_score: float
    confidence_provenance: str
    methodological_rigor: float
    data_patterns: Tuple[float, ...]
    knowledge_domains: Tuple[str, ...]
    temporal_markers: Tuple[str, ...]
    research_constraints: Tuple[str, ...]
    structural_description: str
    validation_signature: str
    state_hash: str = field(init=False)

    def __post_init__(self):
        """Compute and store the 32-hex-character SHA3-512 based state hash."""
        # Each scalar is rendered followed by a ':' separator.
        scalar_part = "".join(
            f"{part}:"
            for part in (
                self.state_id,
                self.state_type.value,
                self.confidence_score,
                self.confidence_provenance,
                self.methodological_rigor,
            )
        )
        pattern_part = ":".join(map(str, self.data_patterns[:10]))
        domain_part = ":".join(self.knowledge_domains)

        # NOTE(review): pattern_part and domain_part are concatenated with no
        # separator between them; kept as-is so existing hashes stay stable.
        payload = scalar_part + pattern_part + domain_part
        digest = hashlib.sha3_512(payload.encode()).hexdigest()

        # The dataclass is frozen, so the derived field has to be written
        # through object.__setattr__.
        object.__setattr__(self, "state_hash", digest[:32])
|
|
|
|
|
|
|
|
|
|
|
class InquiryCategory(Enum):
    """Kinds of follow-up inquiry that an analysis can request.

    Every member's value is its own name in lower case; that string is what
    inquiry artifacts hash and report as their category.
    """

    # Questions about the numbers and the method behind them.
    CONFIDENCE_DISCREPANCY_ANALYSIS = "confidence_discrepancy_analysis"
    METHODOLOGICAL_CONSISTENCY_CHECK = "methodological_consistency_check"

    # Questions about the data's shape and timing.
    PATTERN_ANOMALY_DETECTION = "pattern_anomaly_detection"
    TEMPORAL_ALIGNMENT_VALIDATION = "temporal_alignment_validation"

    # Questions about sources, corroboration and coverage.
    SOURCE_RELIABILITY_ASSESSMENT = "source_reliability_assessment"
    CROSS_REFERENCE_VALIDATION = "cross_reference_validation"
    KNOWLEDGE_COMPLETENESS_EVALUATION = "knowledge_completeness_evaluation"
|
|
|
|
|
|
|
|
|
|
|
class AnalysisResult:
    """Plain value holder for one inquiry candidate produced by an analyzer.

    The constructor stores its arguments unchanged (no copying and no
    validation) under attributes of the same names.
    """

    def __init__(
        self,
        category: 'InquiryCategory',
        basis_code: str,
        basis_kwargs: Dict[str, Any],
        verification_requirements: List[str],
        investigation_confidence: float,
        research_completion_estimate: float,
        priority_score: float
    ):
        """Record the candidate's classification, description inputs and scores.

        Args:
            category: Inquiry category of the candidate.
            basis_code: Key used to look up the description template.
            basis_kwargs: Values substituted into that template.
            verification_requirements: Names of the checks being requested.
            investigation_confidence: Score the generator compares against
                its confidence threshold.
            research_completion_estimate: Completion estimate for the inquiry.
            priority_score: Raw priority; rounded and clamped to 1..10 when
                an artifact is created from this result.
        """
        # What kind of inquiry this is and how to describe it.
        self.category, self.basis_code = category, basis_code
        self.basis_kwargs = basis_kwargs
        self.verification_requirements = verification_requirements

        # Numeric scores attached to the candidate.
        self.investigation_confidence = investigation_confidence
        self.research_completion_estimate = research_completion_estimate
        self.priority_score = priority_score
|
|
|
|
|
class InquiryAnalyzer:
    """Base class for pluggable analyzers.

    Subclasses override ``analyze``; the base implementation always raises.
    """

    def analyze(self, state: 'KnowledgeState') -> 'List[AnalysisResult]':
        """Return the inquiry candidates for ``state``.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError
|
|
|
|
|
|
|
|
|
|
|
class DefaultInquiryAnalyzer(InquiryAnalyzer):
    """Default analyzer that emits one candidate per weakness found in a state.

    A state can trigger several checks at once (low confidence, low rigor,
    too few data patterns, too few temporal markers, many domains).  When no
    check triggers, a single baseline-verification candidate is returned, so
    ``analyze`` never returns an empty list.
    """

    # Trigger thresholds, shared by ``analyze`` and the per-check builders so
    # that the reported "expected" figures always match the trigger.
    CONFIDENCE_THRESHOLD = 0.7
    RIGOR_THRESHOLD = 0.65
    EXPECTED_PATTERNS = 8
    EXPECTED_TEMPORAL_MARKERS = 3
    CROSS_DOMAIN_MIN_DOMAINS = 2  # cross-domain check fires above this count

    def __init__(self, basis_templates: Dict[str, Dict[str, Any]]):
        """Store the description templates associated with this analyzer.

        Args:
            basis_templates: Mapping of basis code to template data.  It is
                only stored here; the analysis methods do not read it.
        """
        self.basis_templates = basis_templates

    def analyze(self, state: KnowledgeState) -> List[AnalysisResult]:
        """Generate inquiry candidates for every check that ``state`` trips.

        Args:
            state: Knowledge state to inspect.

        Returns:
            One ``AnalysisResult`` per triggered check, in the fixed order
            confidence, rigor, patterns, temporal, cross-domain; or a single
            baseline result when nothing triggers.
        """
        checks = (
            (state.confidence_score < self.CONFIDENCE_THRESHOLD,
             self._confidence_analysis),
            (state.methodological_rigor < self.RIGOR_THRESHOLD,
             self._methodological_analysis),
            (len(state.data_patterns) < self.EXPECTED_PATTERNS,
             self._pattern_analysis),
            (len(state.temporal_markers) < self.EXPECTED_TEMPORAL_MARKERS,
             self._temporal_analysis),
            (len(state.knowledge_domains) > self.CROSS_DOMAIN_MIN_DOMAINS,
             self._cross_domain_analysis),
        )
        results = [build(state) for triggered, build in checks if triggered]

        if not results:
            results.append(self._default_analysis(state))

        return results

    def _confidence_analysis(self, state: KnowledgeState) -> AnalysisResult:
        """Build the candidate for a low confidence score."""
        # Lower confidence gives a larger factor, floored at 0.1.
        confidence_factor = max(0.1, 0.8 - state.confidence_score)
        return AnalysisResult(
            category=InquiryCategory.CONFIDENCE_DISCREPANCY_ANALYSIS,
            basis_code="CONFIDENCE_ANOMALY_INVESTIGATION",
            basis_kwargs={
                # Rounded so the description does not show float noise
                # such as 56.99999999999999.
                "score": round(state.confidence_score * 100, 2),
                "expected": 75.0,
                "provenance": state.confidence_provenance,
            },
            verification_requirements=[
                "statistical_reanalysis",
                "source_review",
                "methodology_audit",
            ],
            investigation_confidence=confidence_factor,
            research_completion_estimate=self._calculate_completion_estimate(3, confidence_factor),
            priority_score=self._calculate_priority_score(confidence_factor, 0.9),
        )

    def _methodological_analysis(self, state: KnowledgeState) -> AnalysisResult:
        """Build the candidate for low methodological rigor."""
        # Lower rigor gives a larger factor, floored at 0.1.
        rigor_factor = max(0.1, 0.7 - state.methodological_rigor)
        return AnalysisResult(
            category=InquiryCategory.METHODOLOGICAL_CONSISTENCY_CHECK,
            basis_code="METHODOLOGICAL_CONSISTENCY_QUESTION",
            basis_kwargs={
                "rigor": round(state.methodological_rigor * 100, 2),
                "method_type": "research_protocol",
            },
            verification_requirements=[
                "protocol_review",
                "reproducibility_check",
                "peer_validation",
            ],
            investigation_confidence=rigor_factor,
            research_completion_estimate=self._calculate_completion_estimate(3, rigor_factor),
            priority_score=self._calculate_priority_score(rigor_factor, 0.8),
        )

    def _pattern_analysis(self, state: KnowledgeState) -> AnalysisResult:
        """Build the candidate for an incomplete set of data patterns."""
        # Completeness is measured against the same count that triggers the
        # check and that is reported as "expected_patterns"; clamped so a
        # direct call with a full set cannot produce negative confidence.
        pattern_factor = min(
            1.0, len(state.data_patterns) / float(self.EXPECTED_PATTERNS)
        )
        return AnalysisResult(
            category=InquiryCategory.PATTERN_ANOMALY_DETECTION,
            basis_code="PATTERN_DEVIATION_ANALYSIS",
            basis_kwargs={
                "pattern_completeness": round(pattern_factor * 100, 2),
                "expected_patterns": self.EXPECTED_PATTERNS,
            },
            verification_requirements=[
                "pattern_completeness_check",
                "data_collection_review",
                "statistical_validation",
            ],
            investigation_confidence=1.0 - pattern_factor,
            research_completion_estimate=self._calculate_completion_estimate(3, pattern_factor),
            priority_score=self._calculate_priority_score(1.0 - pattern_factor, 0.7),
        )

    def _temporal_analysis(self, state: KnowledgeState) -> AnalysisResult:
        """Build the candidate for too few temporal markers."""
        marker_count = len(state.temporal_markers)
        # Clamped for the same reason as the pattern factor.
        temporal_factor = min(
            1.0, marker_count / float(self.EXPECTED_TEMPORAL_MARKERS)
        )
        return AnalysisResult(
            category=InquiryCategory.TEMPORAL_ALIGNMENT_VALIDATION,
            basis_code="TEMPORAL_CONSISTENCY_CHECK",
            basis_kwargs={
                "marker_count": marker_count,
                "expected_markers": self.EXPECTED_TEMPORAL_MARKERS,
            },
            verification_requirements=[
                "temporal_sequence_verification",
                "chronological_consistency_check",
            ],
            investigation_confidence=1.0 - temporal_factor,
            research_completion_estimate=self._calculate_completion_estimate(2, temporal_factor),
            priority_score=self._calculate_priority_score(1.0 - temporal_factor, 0.6),
        )

    def _cross_domain_analysis(self, state: KnowledgeState) -> AnalysisResult:
        """Build the candidate for a state spanning several domains."""
        domain_count = len(state.knowledge_domains)
        # Five or more domains saturate the factor at 1.0.
        domain_factor = min(1.0, domain_count / 5.0)
        return AnalysisResult(
            category=InquiryCategory.CROSS_REFERENCE_VALIDATION,
            basis_code="CROSS_DOMAIN_ALIGNMENT_CHECK",
            basis_kwargs={
                "domain_count": domain_count,
                # Only the first three domains are named in the description.
                "domains": list(state.knowledge_domains)[:3],
            },
            verification_requirements=[
                "cross_domain_correlation",
                "independent_verification",
            ],
            investigation_confidence=domain_factor,
            research_completion_estimate=self._calculate_completion_estimate(2, domain_factor),
            priority_score=self._calculate_priority_score(domain_factor, 0.5),
        )

    def _default_analysis(self, state: KnowledgeState) -> AnalysisResult:
        """Build the fixed-score baseline candidate used when no check fires."""
        return AnalysisResult(
            category=InquiryCategory.KNOWLEDGE_COMPLETENESS_EVALUATION,
            basis_code="BASELINE_VERIFICATION",
            basis_kwargs={
                "confidence_score": round(state.confidence_score * 100, 2),
                "rigor_score": round(state.methodological_rigor * 100, 2),
            },
            verification_requirements=["comprehensive_review"],
            investigation_confidence=0.3,
            research_completion_estimate=0.9,
            priority_score=2.0,
        )

    def _calculate_completion_estimate(self, requirement_count: int, confidence: float) -> float:
        """Return a completion estimate capped at 0.95.

        The estimate is ``0.5 * 0.9 ** requirement_count + 0.4 * confidence``:
        each extra requirement shrinks the base term, and the supplied
        factor raises the result.
        """
        base = 0.5
        requirement_impact = 0.9 ** requirement_count
        confidence_impact = confidence * 0.4
        return min(0.95, base * requirement_impact + confidence_impact)

    def _calculate_priority_score(self, investigation_confidence: float, weight: float) -> float:
        """Return ``investigation_confidence * weight`` scaled by ten, to 2 decimals."""
        return round(investigation_confidence * weight * 10, 2)
|
|
|
|
|
|
|
|
|
|
|
# Description templates keyed by basis code.  Each entry holds a
# ``str.format`` template whose placeholders are filled from an analysis
# result's ``basis_kwargs``, plus an ``investigation_focus`` label.
INQUIRY_BASIS_TEMPLATES = {
    "CONFIDENCE_ANOMALY_INVESTIGATION": dict(
        template="Confidence score of {score}% ({provenance}) differs from expected baseline of {expected}%",
        investigation_focus="confidence_validation",
    ),
    "METHODOLOGICAL_CONSISTENCY_QUESTION": dict(
        template="Methodological rigor rating of {rigor}% suggests review of {method_type} may be beneficial",
        investigation_focus="methodological_review",
    ),
    "PATTERN_DEVIATION_ANALYSIS": dict(
        template="Pattern completeness at {pattern_completeness}% with {expected_patterns} expected patterns",
        investigation_focus="pattern_analysis",
    ),
    "TEMPORAL_CONSISTENCY_CHECK": dict(
        template="Temporal markers: {marker_count} present, {expected_markers} expected",
        investigation_focus="temporal_validation",
    ),
    "CROSS_DOMAIN_ALIGNMENT_CHECK": dict(
        template="Cross-domain analysis across {domain_count} domains: {domains}",
        investigation_focus="cross_domain_validation",
    ),
    "BASELINE_VERIFICATION": dict(
        template="Baseline verification: confidence={confidence_score}%, rigor={rigor_score}%",
        investigation_focus="comprehensive_review",
    ),
}
|
|
|
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
class InquiryArtifact:
    """Frozen record of one generated inquiry.

    Instances are normally built through ``create``, which renders the
    description, clamps the priority to 1..10 and derives the artifact hash
    and id from the source state, the analysis result and the creation
    context.
    """

    artifact_id: str
    source_state_hash: str
    inquiry_category: 'InquiryCategory'
    investigation_priority: int
    knowledge_domains_involved: Tuple[str, ...]
    basis_code: str
    inquiry_description: str
    verification_requirements: Tuple[str, ...]
    investigation_confidence: float
    research_completion_estimate: float
    confidence_provenance: str
    artifact_hash: str
    creation_context: 'CreationContext'

    @classmethod
    def create(
        cls,
        knowledge_state: 'KnowledgeState',
        analysis_result: 'AnalysisResult',
        basis_templates: Dict[str, Dict[str, Any]],
        creation_context: 'CreationContext'
    ) -> 'InquiryArtifact':
        """Build an artifact from a state and one analysis result.

        Args:
            knowledge_state: State the inquiry is about; supplies the state
                hash, domains and confidence provenance.
            analysis_result: Candidate to materialise.
            basis_templates: Mapping of basis code to template data.  A
                missing code or missing ``"template"`` key falls back to
                the text ``"Analysis required"``.
            creation_context: Context whose ``context_hash`` is folded into
                the artifact hash.

        Returns:
            The new artifact.

        Raises:
            KeyError: If the template names a placeholder that is absent
                from ``analysis_result.basis_kwargs``.
        """
        # Render the human-readable description.
        template_entry = basis_templates.get(analysis_result.basis_code, {})
        inquiry_description = template_entry.get(
            "template", "Analysis required"
        ).format(**analysis_result.basis_kwargs)

        # Round to the nearest integer, then clamp into the 1..10 scale.
        rounded_priority = int(round(analysis_result.priority_score))
        priority_value = min(10, max(1, rounded_priority))

        # Hash input: state hash, category, basis code and priority (each
        # followed by ':'), the ':'-joined requirements, then the context
        # hash appended directly.
        fingerprint_source = "".join((
            f"{knowledge_state.state_hash}:{analysis_result.category.value}:",
            f"{analysis_result.basis_code}:{priority_value}:",
            ":".join(analysis_result.verification_requirements),
            creation_context.context_hash,
        ))
        artifact_hash = hashlib.sha3_512(fingerprint_source.encode()).hexdigest()[:32]

        return cls(
            artifact_id=f"inq_{artifact_hash[:16]}",
            source_state_hash=knowledge_state.state_hash,
            inquiry_category=analysis_result.category,
            investigation_priority=priority_value,
            knowledge_domains_involved=knowledge_state.knowledge_domains,
            basis_code=analysis_result.basis_code,
            inquiry_description=inquiry_description,
            verification_requirements=tuple(analysis_result.verification_requirements),
            investigation_confidence=analysis_result.investigation_confidence,
            research_completion_estimate=analysis_result.research_completion_estimate,
            confidence_provenance=knowledge_state.confidence_provenance,
            artifact_hash=artifact_hash,
            creation_context=creation_context,
        )

    def reference_information(self) -> Mapping[str, Any]:
        """Return a read-only summary mapping of this artifact.

        The outer mapping is a ``MappingProxyType``; the nested ``basis``
        dict and the list values are ordinary, freshly built objects.
        """
        basis_section = {
            "code": self.basis_code,
            "description": self.inquiry_description,
            "confidence_provenance": self.confidence_provenance,
        }
        summary = {
            "artifact_id": self.artifact_id,
            # Hashes are abbreviated to their first 12 characters here.
            "source_state": self.source_state_hash[:12],
            "inquiry_category": self.inquiry_category.value,
            "investigation_priority": self.investigation_priority,
            "priority_semantics": self._priority_semantics(),
            "knowledge_domains": list(self.knowledge_domains_involved),
            "basis": basis_section,
            "verification_requirements": list(self.verification_requirements),
            "investigation_confidence": round(self.investigation_confidence, 3),
            "research_completion_estimate": round(self.research_completion_estimate, 3),
            "artifact_hash": self.artifact_hash,
            "creation_context": self.creation_context.reference_data(),
        }
        return MappingProxyType(summary)

    def _priority_semantics(self) -> str:
        """Translate the 1..10 priority into its named band."""
        # Bands are checked from the highest floor downwards.
        bands = (
            (9, "critical_immediate_attention"),
            (7, "high_priority_review"),
            (5, "moderate_priority"),
            (3, "low_priority_backlog"),
        )
        for floor, label in bands:
            if self.investigation_priority >= floor:
                return label
        return "informational_only"
|
|
|
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
class CreationContext:
    """Frozen description of the environment in which artifacts were created.

    ``context_hash`` is not passed by the caller; it is computed in
    ``__post_init__`` from the other four fields.  Because the generation
    timestamp is part of that hash, two contexts only share a hash when
    they also share a timestamp.
    """

    system_version: str
    generation_timestamp: str
    research_environment: str
    deterministic_seed: Optional[int]
    context_hash: str = field(init=False)

    def __post_init__(self):
        """Compute and store the 32-hex-character SHA3-512 based context hash."""
        # Test against None explicitly: a seed of 0 is a valid seed and must
        # not be hashed as if no seed had been supplied.
        seed_label = "none" if self.deterministic_seed is None else self.deterministic_seed
        hash_input = ":".join((
            f"{self.system_version}",
            f"{self.generation_timestamp}",
            f"{self.research_environment}",
            f"{seed_label}",
        ))
        context_hash = hashlib.sha3_512(hash_input.encode()).hexdigest()[:32]

        # The dataclass is frozen, so the derived field has to be written
        # through object.__setattr__.
        object.__setattr__(self, 'context_hash', context_hash)

    @classmethod
    def create(
        cls,
        research_environment: str = "knowledge_discovery_system",
        deterministic_seed: Optional[int] = None,
        clock_source: Callable[[], datetime] = datetime.now
    ) -> 'CreationContext':
        """Build a context stamped with the current system version and time.

        Args:
            research_environment: Free-form environment label.
            deterministic_seed: Seed recorded in the context, or ``None``.
            clock_source: Zero-argument callable returning the ``datetime``
                to stamp; inject a fixed clock to obtain a reproducible
                ``context_hash``.

        Returns:
            The new context.
        """
        return cls(
            system_version="structural_inquiry_v2.5",
            generation_timestamp=clock_source().isoformat(),
            research_environment=research_environment,
            deterministic_seed=deterministic_seed
        )

    def reference_data(self) -> Mapping[str, Any]:
        """Return a read-only summary; the hash is cut to 12 characters."""
        return MappingProxyType({
            "system_version": self.system_version,
            "generation_timestamp": self.generation_timestamp,
            "research_environment": self.research_environment,
            "deterministic_mode": self.deterministic_seed is not None,
            "context_hash": self.context_hash[:12]
        })
|
|
|
|
|
|
|
|
|
|
|
class InquiryGenerator:
    """
    Inquiry generator with pluggable analysis.

    Runs an analyzer over knowledge states, keeps the candidates whose
    investigation confidence reaches a threshold, and turns them into
    ``InquiryArtifact`` objects.  Every artifact produced is also appended
    to ``generated_inquiries`` for later statistics.
    """

    def __init__(
        self,
        analyzer: Optional[InquiryAnalyzer] = None,
        creation_context: Optional[CreationContext] = None,
        deterministic_seed: Optional[int] = None
    ):
        """Set up the analyzer and creation context.

        Args:
            analyzer: Analyzer to use; defaults to a ``DefaultInquiryAnalyzer``
                built from ``INQUIRY_BASIS_TEMPLATES``.
            creation_context: Context stamped onto every artifact; a fresh
                one is created when omitted.
            deterministic_seed: Recorded in a freshly created context and,
                when not ``None``, used to seed NumPy's global RNG.
        """
        self.analyzer = analyzer or DefaultInquiryAnalyzer(INQUIRY_BASIS_TEMPLATES)
        self.creation_context = creation_context or CreationContext.create(
            deterministic_seed=deterministic_seed
        )
        self.generated_inquiries: List[InquiryArtifact] = []

        # NOTE(review): nothing in this class draws random numbers; the
        # global seeding is kept because other code may rely on it.
        if deterministic_seed is not None:
            np.random.seed(deterministic_seed)

    def _basis_templates(self) -> Dict[str, Dict[str, Any]]:
        """Return the templates used to render descriptions.

        Templates carried by the analyzer (its ``basis_templates``
        attribute, when present) override the module defaults, so a custom
        analyzer's basis codes are rendered with its own wording while the
        built-in codes still resolve.
        """
        analyzer_templates = getattr(self.analyzer, "basis_templates", None)
        if not analyzer_templates:
            return INQUIRY_BASIS_TEMPLATES
        return {**INQUIRY_BASIS_TEMPLATES, **analyzer_templates}

    def generate_inquiries(
        self,
        knowledge_states: Tuple[KnowledgeState, ...],
        confidence_threshold: float = 0.7
    ) -> Tuple[InquiryArtifact, ...]:
        """Generate inquiries from knowledge states.

        Args:
            knowledge_states: States to analyze, processed in order.
            confidence_threshold: Minimum ``investigation_confidence`` a
                candidate needs (inclusive) to become an artifact.

        Returns:
            The artifacts created by this call, in generation order.
        """
        basis_templates = self._basis_templates()
        inquiries = []

        for state in knowledge_states:
            for result in self.analyzer.analyze(state):
                # Candidates below the threshold are dropped silently.
                if result.investigation_confidence < confidence_threshold:
                    continue

                inquiry = InquiryArtifact.create(
                    knowledge_state=state,
                    analysis_result=result,
                    basis_templates=basis_templates,
                    creation_context=self.creation_context
                )
                inquiries.append(inquiry)
                self.generated_inquiries.append(inquiry)

        return tuple(inquiries)
|
|
|
|
|
|
|
|
|
|
|
class ResearchSystem:
    """Base class that concrete research back-ends derive from.

    ``IntegratedKnowledgeDiscovery`` accepts only instances of this class
    (or subclasses).
    """

    async def research(self, topic: str, **kwargs) -> Dict[str, Any]:
        """Research ``topic`` and return the findings as a dict.

        Raises:
            NotImplementedError: Always, in this base class; subclasses
                must override the method.
        """
        raise NotImplementedError
|
|
|
|
|
|
|
|
|
|
|
class IntegratedKnowledgeDiscovery:
    """
    Integrated system with clear async boundaries and determinism.

    Wraps a ``ResearchSystem``: each research result is converted into a
    ``KnowledgeState``, passed to an ``InquiryGenerator``, and the outcome
    is recorded in ``discovery_history``.
    """

    def __init__(
        self,
        research_system: ResearchSystem,
        deterministic_seed: Optional[int] = None
    ):
        """
        Initialize with concrete research system

        Args:
            research_system: Must implement ResearchSystem interface
            deterministic_seed: Optional seed for reproducible results

        Raises:
            TypeError: If ``research_system`` is not a ``ResearchSystem``.
        """
        if not isinstance(research_system, ResearchSystem):
            raise TypeError("research_system must implement ResearchSystem interface")

        self.research_system = research_system
        self.deterministic_seed = deterministic_seed
        self.inquiry_generator = InquiryGenerator(deterministic_seed=deterministic_seed)
        self.discovery_history: List[Dict[str, Any]] = []

    async def conduct_research_with_inquiries(
        self,
        research_topic: str,
        confidence_threshold: float = 0.7,
        **research_kwargs
    ) -> Dict[str, Any]:
        """Conduct research and generate knowledge inquiries.

        Args:
            research_topic: Topic handed to the research system.
            confidence_threshold: Minimum investigation confidence an
                inquiry candidate needs to be kept.
            **research_kwargs: Forwarded unchanged to ``research``.

        Returns:
            A summary dict with the topic, the research scores and the
            inquiry-generation outcome.  The full record (raw result,
            state, artifacts) is appended to ``discovery_history``.
        """
        research_result = await self.research_system.research(research_topic, **research_kwargs)
        knowledge_state = self._convert_to_knowledge_state(research_result)

        inquiry_artifacts = self.inquiry_generator.generate_inquiries(
            (knowledge_state,),
            confidence_threshold
        )

        # The collection id depends only on the state hash, so the same
        # state always maps to the same collection id.
        collection_digest = hashlib.sha256(knowledge_state.state_hash.encode()).hexdigest()
        inquiry_collection = {
            "collection_id": f"inq_coll_{collection_digest[:16]}",
            "research_topic": research_topic,
            "knowledge_state_hash": knowledge_state.state_hash[:12],
            "inquiry_count": len(inquiry_artifacts),
            "generation_timestamp": datetime.utcnow().isoformat(),
            "confidence_threshold": confidence_threshold,
            "deterministic_mode": self.deterministic_seed is not None,
            "inquiries": [i.reference_information() for i in inquiry_artifacts]
        }

        self.discovery_history.append({
            "research_topic": research_topic,
            "research_result": research_result,
            "knowledge_state": knowledge_state,
            "inquiry_collection": inquiry_collection,
            "inquiry_artifacts": inquiry_artifacts
        })

        return {
            "research_topic": research_topic,
            "research_summary": {
                "confidence_score": research_result.get("confidence_score", 0.5),
                "methodological_rigor": research_result.get("methodological_rigor", 0.5),
                "domains": research_result.get("knowledge_domains", [])
            },
            "inquiry_generation": {
                "inquiries_generated": len(inquiry_artifacts),
                "inquiry_collection_id": inquiry_collection["collection_id"],
                "priority_distribution": self._summarize_priorities(inquiry_artifacts),
                "confidence_threshold_met": len(inquiry_artifacts) > 0
            }
        }

    def _convert_to_knowledge_state(
        self,
        research_result: Dict[str, Any]
    ) -> KnowledgeState:
        """Convert a research result dict into a ``KnowledgeState``.

        Missing keys fall back to defaults (confidence and rigor 0.5,
        domains ``["general"]``).  In deterministic mode the data patterns
        are regenerated from the seed and the result's ``content_hash``;
        any ``data_patterns`` in the result are then ignored.
        """
        confidence_score = research_result.get("confidence_score", 0.5)
        confidence_provenance = research_result.get(
            "confidence_provenance",
            "derived_from_research"
        )
        content_hash = research_result.get("content_hash", "")

        # First matching rule wins: low confidence, then a "pattern"
        # mention in the description, then breadth of domains.
        if confidence_score < 0.6:
            state_type = KnowledgeStateType.SOURCE_VERIFICATION
        elif "pattern" in str(research_result.get("structural_description", "")).lower():
            state_type = KnowledgeStateType.PATTERN_DETECTION
        elif len(research_result.get("knowledge_domains", [])) > 2:
            state_type = KnowledgeStateType.CROSS_DOMAIN_SYNTHESIS
        else:
            state_type = KnowledgeStateType.DATA_CORRELATION

        if self.deterministic_seed is not None:
            # Derive the seed with hashlib rather than the builtin hash():
            # str hashes are randomized per process, which would make the
            # "deterministic" patterns differ between runs.
            seed_material = f"{self.deterministic_seed}:{content_hash}".encode()
            pattern_seed = int.from_bytes(hashlib.sha256(seed_material).digest()[:4], "big")
            # A private RandomState keeps NumPy's global RNG untouched.
            data_patterns = tuple(np.random.RandomState(pattern_seed).randn(8).tolist())
        else:
            provided_patterns = research_result.get("data_patterns", [])
            if provided_patterns:
                data_patterns = tuple(provided_patterns[:8])
            else:
                # Fallback: eight samples of a sine wave.
                data_patterns = tuple(np.sin(np.arange(8) * 0.785).tolist())

        structural_description = self._generate_structural_description(research_result)

        # Test against None explicitly so a seed of 0 is not labelled
        # "stochastic".
        seed_label = "stochastic" if self.deterministic_seed is None else self.deterministic_seed
        validation_signature = hashlib.sha3_512(
            f"{content_hash}:{seed_label}".encode()
        ).hexdigest()[:32]

        return KnowledgeState(
            state_id=f"knowledge_state_{research_result.get('content_hash', 'unknown')[:12]}",
            state_type=state_type,
            confidence_score=confidence_score,
            confidence_provenance=confidence_provenance,
            methodological_rigor=research_result.get("methodological_rigor", 0.5),
            data_patterns=data_patterns,
            knowledge_domains=tuple(research_result.get("knowledge_domains", ["general"])),
            # Two markers: the result's own timestamp and the conversion time.
            temporal_markers=(
                research_result.get("timestamp", ""),
                datetime.utcnow().isoformat()
            ),
            research_constraints=self._extract_constraints(research_result),
            structural_description=structural_description,
            validation_signature=validation_signature
        )

    def _generate_structural_description(
        self,
        research_result: Dict[str, Any]
    ) -> str:
        """Summarise notable properties of a result as one ``;``-joined line.

        Returns:
            The ``KNOWLEDGE_NODE`` glyph followed by the noted properties,
            or by "Standard research structure" when nothing stands out.
        """
        components = []

        confidence = research_result.get("confidence_score", 0.5)
        provenance = research_result.get("confidence_provenance", "unstated")
        if confidence < 0.6:
            components.append(f"Low confidence ({confidence:.2f}) from {provenance}")
        elif confidence > 0.8:
            components.append(f"High confidence ({confidence:.2f}) from {provenance}")

        rigor = research_result.get("methodological_rigor", 0.5)
        if rigor < 0.6:
            components.append(f"Methodological rigor: {rigor:.2f}")

        domains = research_result.get("knowledge_domains", [])
        if len(domains) > 2:
            components.append(f"Cross-domain: {len(domains)} domains")

        if not components:
            components.append("Standard research structure")

        return f"{KNOWLEDGE_NODE} " + "; ".join(components)

    def _extract_constraints(
        self,
        research_result: Dict[str, Any]
    ) -> Tuple[str, ...]:
        """Derive constraint labels from a result.

        Missing confidence or rigor count as 0 here, so an incomplete
        result is flagged for review.  At least one label is always
        returned.
        """
        rules = (
            (research_result.get("confidence_score", 0) < 0.7,
             "confidence_verification_needed"),
            (research_result.get("methodological_rigor", 0) < 0.6,
             "methodology_review_recommended"),
            (not research_result.get("source_references", []),
             "source_corroboration_required"),
        )
        constraints = [label for applies, label in rules if applies]

        if not constraints:
            constraints.append("standard_verification_protocol")

        return tuple(constraints)

    def _summarize_priorities(
        self,
        inquiry_artifacts: Tuple[InquiryArtifact, ...]
    ) -> Dict[str, Any]:
        """Group artifacts by priority level.

        Returns:
            For an empty input, a message dict with empty
            ``priority_levels``.  Otherwise a dict with the number of
            distinct levels, the highest level, and per-level count,
            domains and semantics.
        """
        if not inquiry_artifacts:
            return {"message": "No inquiries generated", "priority_levels": {}}

        priority_summary = {}
        for artifact in inquiry_artifacts:
            entry = priority_summary.setdefault(
                artifact.investigation_priority,
                {
                    "count": 0,
                    "domains": set(),
                    "semantics": artifact._priority_semantics()
                }
            )
            entry["count"] += 1
            entry["domains"].update(artifact.knowledge_domains_involved)

        # Sets were only needed for de-duplication; expose plain lists.
        for entry in priority_summary.values():
            entry["domains"] = list(entry["domains"])

        return {
            "total_priorities": len(priority_summary),
            "highest_priority": max(priority_summary),
            "priority_distribution": priority_summary
        }

    def get_statistics(self) -> Dict[str, Any]:
        """Return aggregate figures over every inquiry generated so far."""
        generated = self.inquiry_generator.generated_inquiries
        total_inquiries = len(generated)

        category_counts = {}
        for inquiry in generated:
            category = inquiry.inquiry_category.value
            category_counts[category] = category_counts.get(category, 0) + 1

        # Averages are reported as 0.0 when nothing has been generated.
        if total_inquiries > 0:
            avg_confidence = np.mean([i.investigation_confidence for i in generated])
            avg_priority = np.mean([i.investigation_priority for i in generated])
        else:
            avg_confidence = 0.0
            avg_priority = 0.0

        return {
            "system": "Integrated Knowledge Discovery v2.5",
            "research_sessions": len(self.discovery_history),
            "total_inquiries_generated": total_inquiries,
            "category_distribution": category_counts,
            "average_investigation_confidence": round(float(avg_confidence), 3),
            "average_investigation_priority": round(float(avg_priority), 1),
            "deterministic_mode": self.deterministic_seed is not None,
            "engineering_properties": {
                "immutable_data_structures": True,
                "deterministic_hashes": True,
                "pluggable_analyzers": True,
                "clear_async_boundaries": True,
                "priority_semantics_documented": True
            }
        }
|
|
|
|
|
|
|
|
|
|
|
class ConcreteResearchSystem(ResearchSystem):
    """Example research system that simulates results for a topic.

    With a ``deterministic_seed`` the scores and the number of source
    references are derived from a SHA-256 digest of the topic, so the same
    topic yields the same result in every process.  Without a seed they
    are drawn from NumPy's global random generator.
    """

    def __init__(self, deterministic_seed: Optional[int] = None):
        """Remember the seed and, when one is given, seed NumPy's global RNG."""
        self.deterministic_seed = deterministic_seed
        if deterministic_seed is not None:
            np.random.seed(deterministic_seed)

    async def research(self, topic: str, **kwargs) -> Dict[str, Any]:
        """Conduct research (simulated for example).

        Args:
            topic: Topic to "research".
            **kwargs: Accepted for interface compatibility; not used.

        Returns:
            A result dict with content hash, confidence, rigor, domains,
            description, timestamp, data patterns and source references.
        """
        import asyncio

        # Stand-in for real I/O latency.
        await asyncio.sleep(0.1)

        content_digest = hashlib.sha256(topic.encode()).hexdigest()

        if self.deterministic_seed is not None:
            # The builtin hash() of a str is randomized per process, so a
            # stable digest is used to derive the simulated scores.
            topic_hash = int(content_digest, 16) % 1000
            confidence = 0.5 + (topic_hash % 500) / 1000
            rigor = 0.4 + (topic_hash % 600) / 1000
            # Same 1..3 range as the stochastic branch, without touching
            # the global RNG.
            reference_count = 1 + topic_hash % 3
        else:
            confidence = np.random.random() * 0.3 + 0.5
            rigor = np.random.random() * 0.4 + 0.4
            reference_count = np.random.randint(1, 4)

        return {
            "topic": topic,
            "content_hash": content_digest[:32],
            "confidence_score": confidence,
            "confidence_provenance": "simulated_analysis",
            "methodological_rigor": rigor,
            "knowledge_domains": self._identify_domains(topic),
            "structural_description": f"Research on {topic}",
            "timestamp": datetime.utcnow().isoformat(),
            "data_patterns": np.sin(np.arange(10) * 0.628).tolist(),
            "source_references": [f"ref_{i}" for i in range(reference_count)]
        }

    def _identify_domains(self, topic: str) -> List[str]:
        """Map keywords found in ``topic`` to domain names.

        Matching is a case-insensitive substring test.  Domains are
        returned in the fixed order physics, history, psychology,
        mathematics; ``["interdisciplinary"]`` is returned when no keyword
        matches.
        """
        topic_lower = topic.lower()
        keyword_table = (
            ("physics", ("quantum", "physics")),
            ("history", ("history", "ancient")),
            ("psychology", ("consciousness", "mind")),
            ("mathematics", ("pattern", "analysis")),
        )
        domains = [
            domain
            for domain, keywords in keyword_table
            if any(word in topic_lower for word in keywords)
        ]
        return domains if domains else ["interdisciplinary"]
|
|
|
|
|
|
|
|
|
|
|
def run_deterministic_test() -> bool:
    """Check that two identically seeded systems produce identical results.

    Two ``IntegratedKnowledgeDiscovery`` instances are built with seed 42
    and asked to research the same topic; their returned summaries are
    compared.

    The function is synchronous but may be called while an event loop is
    already running in the current thread (as ``main`` does).  In that
    case the coroutines run on a fresh loop in a worker thread, because
    driving a second loop from inside a running one raises
    ``RuntimeError``.

    Returns:
        ``True`` when both summaries are equal, ``False`` otherwise.
    """
    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    print("Testing deterministic reproducibility...")

    research_system1 = ConcreteResearchSystem(deterministic_seed=42)
    system1 = IntegratedKnowledgeDiscovery(research_system1, deterministic_seed=42)

    research_system2 = ConcreteResearchSystem(deterministic_seed=42)
    system2 = IntegratedKnowledgeDiscovery(research_system2, deterministic_seed=42)

    async def _run_both():
        first = await system1.conduct_research_with_inquiries("Test topic")
        second = await system2.conduct_research_with_inquiries("Test topic")
        return first, second

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: asyncio.run creates one and cleans it up.
        result1, result2 = asyncio.run(_run_both())
    else:
        # A loop is already running here; use a separate thread so that
        # asyncio.run gets a thread without a running loop.
        with ThreadPoolExecutor(max_workers=1) as pool:
            result1, result2 = pool.submit(asyncio.run, _run_both()).result()

    inquiries1 = result1["inquiry_generation"]["inquiries_generated"]
    inquiries2 = result2["inquiry_generation"]["inquiries_generated"]

    # Compare the whole summaries (scores, collection id, priorities), not
    # just the counts: equal counts alone do not show reproducibility.
    identical = result1 == result2

    print(f" System 1 inquiries: {inquiries1}")
    print(f" System 2 inquiries: {inquiries2}")
    print(f" Results identical: {identical}")

    return identical
|
|
|
|
|
|
|
|
|
|
|
async def main():
    """Demonstrate the system.

    Prints a banner, runs the reproducibility self-test, researches three
    sample topics with a 0.6 confidence threshold, and prints the
    resulting statistics.
    """
    import asyncio

    print(f"""
{'='*70}
STRUCTURAL INQUIRY SYSTEM v2.5
Engineering-Focused Knowledge Discovery
{'='*70}
""")

    # The self-test is synchronous and drives its own event loop, which is
    # not allowed on a thread whose loop is already running; run it in the
    # default executor instead of calling it directly.
    loop = asyncio.get_running_loop()
    reproducible = await loop.run_in_executor(None, run_deterministic_test)
    if reproducible:
        print(f"\n{VALIDATION_SYMBOL} Deterministic reproducibility verified")
    else:
        print(f"\n{INQUIRY_MARKER} Non-deterministic behavior detected")

    research_system = ConcreteResearchSystem()
    discovery_system = IntegratedKnowledgeDiscovery(research_system)

    topics = [
        "Quantum pattern analysis techniques",
        "Historical methodology consistency",
        "Cross-domain verification protocols"
    ]

    for i, topic in enumerate(topics, 1):
        print(f"\n{PATTERN_RECOGNITION} RESEARCH SESSION {i}: {topic}")
        print(f"{'-'*60}")

        result = await discovery_system.conduct_research_with_inquiries(
            topic,
            confidence_threshold=0.6
        )

        generation = result["inquiry_generation"]
        inquiries = generation["inquiries_generated"]
        priorities = generation["priority_distribution"]

        print(f" {VALIDATION_SYMBOL} Research completed")
        print(f" {KNOWLEDGE_NODE} Inquiries generated: {inquiries}")

        if inquiries > 0:
            # One line per priority level that received inquiries.
            for priority, data in priorities.get("priority_distribution", {}).items():
                semantics = data.get("semantics", "unknown")
                print(f" Priority {priority} ({semantics}): {data['count']} inquiries")

    stats = discovery_system.get_statistics()
    print(f"\n{'='*70}")
    print("SYSTEM STATISTICS")
    print(f"{'='*70}")

    print(f"\nResearch sessions: {stats['research_sessions']}")
    print(f"Total inquiries: {stats['total_inquiries_generated']}")
    print(f"\nEngineering properties:")
    for prop, value in stats["engineering_properties"].items():
        status = "✓" if value else "✗"
        print(f" {status} {prop}: {value}")
|
|
|
|
|
if __name__ == "__main__":
    import asyncio

    # Script entry point: run the demo; Ctrl-C is reported as a normal
    # shutdown instead of a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print(f"\n\n{KNOWLEDGE_NODE} System shutdown complete.")