|
|
| ```python |
| """ |
| IMMUTABLE REALITY ENGINE v6.2.2 - PRODUCTION-READY ADVANCED ARCHITECTURE |
| Fixed all identified issues with proper error handling and guarantees |
| """ |
|
|
| import asyncio |
| import hashlib |
| import json |
| import os |
| import secrets |
| import time |
| import uuid |
| from collections import Counter, defaultdict |
| from dataclasses import dataclass, field, asdict |
| from datetime import datetime, timedelta |
| from enum import Enum |
| from typing import Any, Dict, List, Optional, Tuple, Union, Callable |
| from abc import ABC, abstractmethod |
| import aiohttp |
| from aiohttp import ClientTimeout, ClientSession |
| import logging |
| from logging.handlers import RotatingFileHandler |
| from queue import Queue |
| from concurrent.futures import ThreadPoolExecutor |
| import base64 |
|
|
| |
|
|
| class ProductionConfig: |
| """Production configuration with proper type safety""" |
| |
| |
| N8N_WEBHOOK_URL: str = os.getenv("N8N_WEBHOOK_URL", "http://localhost:5678/webhook/ire") |
| N8N_API_KEY: str = os.getenv("N8N_API_KEY", "") |
| N8N_TIMEOUT_SECONDS: int = int(os.getenv("N8N_TIMEOUT", "30")) |
| N8N_MAX_RETRIES: int = int(os.getenv("N8N_MAX_RETRIES", "3")) |
| |
| |
| HASH_ALGORITHM: str = "SHA3-512" |
| SIGNATURE_SCHEME: str = "ED25519_WITH_SHA3" |
| |
| |
| MAX_CONCURRENT_DETECTIONS: int = 10 |
| DETECTION_TIMEOUT_SECONDS: int = 30 |
| LEDGER_BATCH_SIZE: int = 50 |
| VALIDATION_TIMEOUT_SECONDS: int = 5 |
| |
| |
| DATA_DIR: str = "./ire_production_data" |
| LEDGER_PATH: str = "./ire_production_data/ledger" |
| CACHE_PATH: str = "./ire_production_data/cache" |
| LOG_PATH: str = "./ire_production_data/logs" |
| |
| |
| MIN_VALIDATORS: int = 3 |
| QUORUM_THRESHOLD: float = 0.67 |
| DISSENT_THRESHOLD: float = 0.33 |
| |
| |
| MAX_FUTURE_TOLERANCE_SECONDS: int = 300 |
| MAX_PAST_TOLERANCE_DAYS: int = 365 * 10 |
| |
| |
| WORKFLOW_IDS: Dict[str, str] = { |
| "lens_analysis": "lens-detection-v5", |
| "method_execution": "method-execution-v5", |
| "equilibrium_detection": "equilibrium-detection-v5", |
| "threat_analysis": "stride-e-threat-v5", |
| "validator_attestation": "validator-quorum-v5", |
| "ledger_commit": "ledger-commit-v5", |
| "quorum_calculation": "quorum-calculation-v5" |
| } |
| |
| @classmethod |
| def ensure_directories(cls): |
| """Ensure all required directories exist""" |
| for path in [cls.DATA_DIR, cls.LEDGER_PATH, cls.CACHE_PATH, cls.LOG_PATH]: |
| os.makedirs(path, exist_ok=True) |
|
|
| |
| ProductionConfig.ensure_directories() |
|
|
| |
|
|
| class ProductionLogger: |
| """Production-grade logging with rotation""" |
| |
| def __init__(self, name: str = "IRE_Engine"): |
| self.logger = logging.getLogger(name) |
| self.logger.setLevel(logging.INFO) |
| |
| |
| console_handler = logging.StreamHandler() |
| console_handler.setLevel(logging.INFO) |
| console_format = logging.Formatter( |
| '%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
| ) |
| console_handler.setFormatter(console_format) |
| |
| |
| log_file = os.path.join(ProductionConfig.LOG_PATH, f"{name}.log") |
| file_handler = RotatingFileHandler( |
| log_file, |
| maxBytes=10 * 1024 * 1024, |
| backupCount=5 |
| ) |
| file_handler.setLevel(logging.DEBUG) |
| file_format = logging.Formatter( |
| '%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s' |
| ) |
| file_handler.setFormatter(file_format) |
| |
| |
| self.logger.addHandler(console_handler) |
| self.logger.addHandler(file_handler) |
| |
| def info(self, message: str, **kwargs): |
| self.logger.info(f"{message} | {kwargs}") |
| |
| def warning(self, message: str, **kwargs): |
| self.logger.warning(f"{message} | {kwargs}") |
| |
| def error(self, message: str, **kwargs): |
| self.logger.error(f"{message} | {kwargs}") |
| |
| def critical(self, message: str, **kwargs): |
| self.logger.critical(f"{message} | {kwargs}") |
|
|
| |
| logger = ProductionLogger() |
|
|
| |
|
|
| class Primitive(str, Enum): |
| """14 Primitives - clearly labeled as concepts, not cryptographic guarantees""" |
| ERASURE = "ERASURE" |
| INTERRUPTION = "INTERRUPTION" |
| FRAGMENTATION = "FRAGMENTATION" |
| NARRATIVE_CAPTURE = "NARRATIVE_CAPTURE" |
| MISDIRECTION = "MISDIRECTION" |
| SATURATION = "SATURATION" |
| DISCREDITATION = "DISCREDITATION" |
| ATTRITION = "ATTRITION" |
| ACCESS_CONTROL = "ACCESS_CONTROL" |
| TEMPORAL = "TEMPORAL" |
| CONDITIONING = "CONDITIONING" |
| META = "META" |
| ABSORPTIVE = "ABSORPTIVE" |
| EXHAUSTION = "EXHAUSTION" |
| |
| @property |
| def is_equilibrium_primitive(self) -> bool: |
| """Check if primitive is for equilibrium detection""" |
| return self in [Primitive.ABSORPTIVE, Primitive.EXHAUSTION] |
|
|
| class SuppressionPhase(str, Enum): |
| """Suppression lifecycle phases""" |
| ACTIVE_SUPPRESSION = "ACTIVE_SUPPRESSION" |
| ESTABLISHING_SUPPRESSION = "ESTABLISHING_SUPPRESSION" |
| POST_SUPPRESSION_EQUILIBRIUM = "POST_SUPPRESSION_EQUILIBRIUM" |
| |
| @classmethod |
| def detect(cls, metrics: Dict[str, float]) -> 'SuppressionPhase': |
| """Deterministic phase detection""" |
| equilibrium_score = metrics.get("equilibrium_score", 0) |
| active_score = metrics.get("active_suppression_score", 0) |
| |
| if equilibrium_score > 0.7: |
| return cls.POST_SUPPRESSION_EQUILIBRIUM |
| elif equilibrium_score > 0.3: |
| return cls.ESTABLISHING_SUPPRESSION |
| else: |
| return cls.ACTIVE_SUPPRESSION |
|
|
| class ValidatorArchetype(str, Enum): |
| """Validator archetypes for attestation""" |
| HUMAN_SOVEREIGN = "HUMAN_SOVEREIGN" |
| SYSTEM_EPISTEMIC = "SYSTEM_EPISTEMIC" |
| SOURCE_PROVENANCE = "SOURCE_PROVENANCE" |
| TEMPORAL_INTEGRITY = "TEMPORAL_INTEGRITY" |
| COMMUNITY_PLURALITY = "COMMUNITY_PLURALITY" |
| QUANTUM_GUARDIAN = "QUANTUM_GUARDIAN" |
| |
| @property |
| def requires_external_orchestration(self) -> bool: |
| """Check if validator requires external process""" |
| return self in [ |
| ValidatorArchetype.HUMAN_SOVEREIGN, |
| ValidatorArchetype.COMMUNITY_PLURALITY |
| ] |
|
|
| |
|
|
| @dataclass |
| class QuantumAwareSignature: |
| """ |
| Quantum-aware signature (not quantum-resistant) |
| Clearly labeled as using quantum-aware algorithms, not quantum-resistant cryptography |
| """ |
| algorithm: str = ProductionConfig.SIGNATURE_SCHEME |
| signature: str = "" |
| public_key_hash: str = "" |
| timestamp: str = "" |
| nonce: str = "" |
| proof_of_work: Optional[str] = None |
| |
| def __post_init__(self): |
| """Initialize with proper values""" |
| if not self.timestamp: |
| self.timestamp = datetime.utcnow().isoformat() + "Z" |
| if not self.nonce: |
| self.nonce = secrets.token_hex(16) |
| |
| @classmethod |
| def create(cls, data: Any, private_key_context: str = "") -> 'QuantumAwareSignature': |
| """ |
| Create quantum-aware signature using SHA3-512 |
| Note: This is quantum-aware, not quantum-resistant |
| """ |
| |
| if isinstance(data, dict): |
| data_str = json.dumps(data, sort_keys=True) |
| else: |
| data_str = str(data) |
| |
| |
| data_hash = hashlib.sha3_512(data_str.encode()).hexdigest() |
| |
| |
| signature_parts = [ |
| "SIG", |
| data_hash[:32], |
| datetime.utcnow().strftime("%Y%m%d%H%M%S"), |
| hashlib.sha3_512(private_key_context.encode()).hexdigest()[:16] if private_key_context else secrets.token_hex(8) |
| ] |
| |
| signature = "_".join(signature_parts) |
| |
| return cls( |
| signature=signature, |
| public_key_hash=hashlib.sha3_512(private_key_context.encode()).hexdigest()[:32] if private_key_context else secrets.token_hex(32), |
| proof_of_work=cls._optional_proof_of_work(data_hash) |
| ) |
| |
| @staticmethod |
| def _optional_proof_of_work(data_hash: str, difficulty: int = 2) -> Optional[str]: |
| """ |
| Optional proof-of-work for rate limiting |
| Not for cryptographic security |
| """ |
| if difficulty <= 0: |
| return None |
| |
| nonce = 0 |
| target = "0" * difficulty |
| |
| |
| max_iterations = 10000 |
| while nonce < max_iterations: |
| test_hash = hashlib.sha3_512(f"{data_hash}{nonce}".encode()).hexdigest() |
| if test_hash.startswith(target): |
| return f"{nonce}:{test_hash}" |
| nonce += 1 |
| |
| return None |
| |
| def verify(self, data: Any) -> Tuple[bool, Optional[str]]: |
| """ |
| Verify quantum-aware signature |
| Returns (is_valid, error_message) |
| """ |
| try: |
| |
| if isinstance(data, dict): |
| data_str = json.dumps(data, sort_keys=True) |
| else: |
| data_str = str(data) |
| |
| data_hash = hashlib.sha3_512(data_str.encode()).hexdigest() |
| |
| |
| if not self.signature.startswith("SIG_"): |
| return False, "Invalid signature format" |
| |
| |
| parts = self.signature.split("_") |
| if len(parts) != 4: |
| return False, "Malformed signature" |
| |
| sig_type, signed_hash, timestamp, context = parts |
| |
| |
| if signed_hash != data_hash[:32]: |
| return False, "Hash mismatch" |
| |
| |
| try: |
| sig_time = datetime.strptime(timestamp, "%Y%m%d%H%M%S") |
| now = datetime.utcnow() |
| if (now - sig_time).total_seconds() > 86400: |
| return False, "Signature expired" |
| except ValueError: |
| return False, "Invalid timestamp format" |
| |
| |
| if self.proof_of_work: |
| try: |
| nonce, pow_hash = self.proof_of_work.split(":") |
| test_hash = hashlib.sha3_512(f"{data_hash}{nonce}".encode()).hexdigest() |
| if test_hash != pow_hash: |
| return False, "Proof of work invalid" |
| except (ValueError, AttributeError): |
| return False, "Malformed proof of work" |
| |
| return True, None |
| |
| except Exception as e: |
| return False, f"Verification error: {str(e)}" |
|
|
| |
|
|
| @dataclass |
| class RealityNode: |
| """ |
| Immutable reality node with proper validation |
| Quantum-aware but not quantum-resistant |
| """ |
| content_hash: str |
| node_type: str |
| source_id: str |
| signature: QuantumAwareSignature |
| temporal_anchor: str |
| content: Dict[str, Any] |
| metadata: Dict[str, Any] = field(default_factory=dict) |
| witness_signatures: List[Dict] = field(default_factory=list) |
| cross_references: Dict[str, List[str]] = field(default_factory=dict) |
| proof_of_existence: Optional[str] = None |
| n8n_execution_id: Optional[str] = None |
| |
| def __post_init__(self): |
| """Initialize with proof of existence""" |
| if not self.proof_of_existence: |
| self.proof_of_existence = self._create_proof_of_existence() |
| |
| def _create_proof_of_existence(self) -> str: |
| """Create proof of existence using external time simulation""" |
| proof_data = { |
| "content_hash": self.content_hash, |
| "temporal_anchor": self.temporal_anchor, |
| "witness_count": len(self.witness_signatures), |
| "timestamp": datetime.utcnow().isoformat() + "Z", |
| "external_anchor": self._simulate_external_time_anchor() |
| } |
| |
| return hashlib.sha3_512( |
| json.dumps(proof_data, sort_keys=True).encode() |
| ).hexdigest() |
| |
| def _simulate_external_time_anchor(self) -> str: |
| """Simulate external time oracle - clearly labeled as simulation""" |
| current_timestamp = int(time.time()) |
| |
| return hashlib.sha3_512( |
| f"simulated_anchor_{current_timestamp // 600}".encode() |
| ).hexdigest() |
| |
| def add_witness(self, validator_id: str, signature: QuantumAwareSignature, |
| attestation_data: Dict = None) -> None: |
| """Add witness signature with attestation data""" |
| witness_entry = { |
| "validator_id": validator_id, |
| "signature": signature.signature, |
| "timestamp": datetime.utcnow().isoformat() + "Z", |
| "public_key_hash": signature.public_key_hash, |
| "attestation": attestation_data or {} |
| } |
| |
| self.witness_signatures.append(witness_entry) |
| self.metadata.setdefault("witnesses", []).append(validator_id) |
| |
| def validate(self) -> Tuple[bool, List[str]]: |
| """ |
| Comprehensive node validation with clear error messages |
| Returns (is_valid, errors) |
| """ |
| errors = [] |
| |
| |
| try: |
| content_str = json.dumps(self.content, sort_keys=True) |
| computed_hash = hashlib.sha3_512(content_str.encode()).hexdigest() |
| |
| if computed_hash != self.content_hash: |
| errors.append(f"Content hash mismatch: expected {self.content_hash[:16]}..., got {computed_hash[:16]}...") |
| except (TypeError, ValueError) as e: |
| errors.append(f"Content serialization error: {str(e)}") |
| |
| |
| is_valid_sig, sig_error = self.signature.verify(self.content) |
| if not is_valid_sig: |
| errors.append(f"Signature validation failed: {sig_error}") |
| |
| |
| try: |
| node_time = datetime.fromisoformat(self.temporal_anchor.replace('Z', '+00:00')) |
| now = datetime.utcnow() |
| |
| |
| time_diff = (node_time - now).total_seconds() |
| |
| if time_diff > ProductionConfig.MAX_FUTURE_TOLERANCE_SECONDS: |
| errors.append(f"Future timestamp beyond tolerance: {time_diff:.0f}s ahead") |
| elif time_diff > 0: |
| logger.info(f"Timestamp {time_diff:.0f}s in future (within tolerance)") |
| |
| |
| past_diff = (now - node_time).total_seconds() |
| if past_diff > ProductionConfig.MAX_PAST_TOLERANCE_DAYS * 86400: |
| errors.append(f"Timestamp too far in past: {past_diff/86400:.0f} days") |
| |
| except ValueError as e: |
| errors.append(f"Invalid temporal anchor format: {str(e)}") |
| |
| |
| if not self.proof_of_existence: |
| errors.append("Missing proof of existence") |
| |
| |
| if len(self.witness_signatures) < ProductionConfig.MIN_VALIDATORS: |
| errors.append(f"Insufficient witnesses: {len(self.witness_signatures)}/{ProductionConfig.MIN_VALIDATORS}") |
| |
| |
| for i, witness in enumerate(self.witness_signatures): |
| |
| if not witness.get("validator_id"): |
| errors.append(f"Witness {i} missing validator_id") |
| if not witness.get("signature"): |
| errors.append(f"Witness {i} missing signature") |
| if not witness.get("timestamp"): |
| errors.append(f"Witness {i} missing timestamp") |
| |
| return len(errors) == 0, errors |
| |
| def calculate_quorum(self) -> Tuple[float, float, Dict[str, List[str]]]: |
| """ |
| Calculate quorum statistics |
| Returns (agreement_score, dissent_score, groups) |
| """ |
| if not self.witness_signatures: |
| return 0.0, 0.0, {} |
| |
| |
| attestation_groups = defaultdict(list) |
| for witness in self.witness_signatures: |
| attestation = witness.get("attestation", {}) |
| |
| group_key = hashlib.sha3_512( |
| json.dumps(attestation, sort_keys=True).encode() |
| ).hexdigest()[:16] |
| attestation_groups[group_key].append(witness["validator_id"]) |
| |
| |
| total_witnesses = len(self.witness_signatures) |
| group_sizes = [len(ids) for ids in attestation_groups.values()] |
| |
| if not group_sizes: |
| return 0.0, 0.0, {} |
| |
| max_group_size = max(group_sizes) |
| agreement_score = max_group_size / total_witnesses |
| |
| |
| second_largest = sorted(group_sizes, reverse=True)[1] if len(group_sizes) > 1 else 0 |
| dissent_score = second_largest / total_witnesses |
| |
| |
| readable_groups = {} |
| for group_key, validator_ids in attestation_groups.items(): |
| readable_groups[group_key[:8]] = { |
| "validators": validator_ids, |
| "size": len(validator_ids), |
| "percentage": len(validator_ids) / total_witnesses |
| } |
| |
| return agreement_score, dissent_score, readable_groups |
| |
| def to_transport_format(self) -> Dict[str, Any]: |
| """Convert to transport format for n8n/webhooks""" |
| return { |
| "node_id": self.content_hash[:32], |
| "node_type": self.node_type, |
| "source": self.source_id, |
| "content_preview": str(self.content)[:500] + "..." if len(str(self.content)) > 500 else str(self.content), |
| "timestamp": self.temporal_anchor, |
| "witness_count": len(self.witness_signatures), |
| "proof_of_existence": self.proof_of_existence[:32] + "..." if self.proof_of_existence else None, |
| "metadata_summary": { |
| "keys": list(self.metadata.keys()), |
| "witness_ids": [w.get("validator_id", "unknown") for w in self.witness_signatures] |
| }, |
| "execution_id": self.n8n_execution_id or f"exec_{uuid.uuid4().hex[:8]}" |
| } |
|
|
| |
|
|
| class N8NClient: |
| """n8n client with proper async session management""" |
| |
| def __init__(self): |
| self.base_url = ProductionConfig.N8N_WEBHOOK_URL |
| self.api_key = ProductionConfig.N8N_API_KEY |
| self.timeout = ProductionConfig.N8N_TIMEOUT_SECONDS |
| self.max_retries = ProductionConfig.N8N_MAX_RETRIES |
| |
| |
| self._session: Optional[aiohttp.ClientSession] = None |
| self._session_lock = asyncio.Lock() |
| |
| async def get_session(self) -> aiohttp.ClientSession: |
| """Get or create session with proper cleanup""" |
| async with self._session_lock: |
| if self._session is None or self._session.closed: |
| timeout = ClientTimeout(total=self.timeout) |
| headers = { |
| "User-Agent": "ImmutableRealityEngine/5.0", |
| "Content-Type": "application/json" |
| } |
| |
| if self.api_key: |
| headers["Authorization"] = f"Bearer {self.api_key}" |
| |
| self._session = ClientSession( |
| timeout=timeout, |
| headers=headers |
| ) |
| logger.info("Created new n8n session") |
| |
| return self._session |
| |
| async def execute_workflow(self, workflow_id: str, payload: Dict) -> Dict[str, Any]: |
| """ |
| Execute n8n workflow with exponential backoff and proper error handling |
| """ |
| session = await self.get_session() |
| url = f"{self.base_url}/{workflow_id}" |
| |
| for attempt in range(self.max_retries): |
| try: |
| async with session.post(url, json=payload) as response: |
| if response.status == 200: |
| result = await response.json() |
| return { |
| "success": True, |
| "workflow_id": workflow_id, |
| "execution_id": result.get("executionId", f"exec_{uuid.uuid4().hex[:8]}"), |
| "data": result.get("data", {}), |
| "metrics": result.get("metrics", {}), |
| "status_code": response.status, |
| "attempt": attempt + 1, |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| else: |
| error_text = await response.text() |
| logger.warning(f"n8n workflow {workflow_id} failed (attempt {attempt + 1}/{self.max_retries}): {response.status} - {error_text}") |
| |
| |
| if attempt < self.max_retries - 1: |
| await asyncio.sleep(2 ** attempt) |
| continue |
| |
| return { |
| "success": False, |
| "error": f"n8n returned {response.status}: {error_text[:200]}", |
| "workflow_id": workflow_id, |
| "status_code": response.status, |
| "attempt": attempt + 1, |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| |
| except asyncio.TimeoutError: |
| logger.warning(f"n8n timeout for {workflow_id} (attempt {attempt + 1}/{self.max_retries})") |
| if attempt < self.max_retries - 1: |
| await asyncio.sleep(2 ** attempt) |
| continue |
| return { |
| "success": False, |
| "error": f"Timeout after {self.timeout}s", |
| "workflow_id": workflow_id, |
| "attempt": attempt + 1, |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| |
| except aiohttp.ClientError as e: |
| logger.warning(f"n8n connection error for {workflow_id} (attempt {attempt + 1}/{self.max_retries}): {str(e)}") |
| if attempt < self.max_retries - 1: |
| await asyncio.sleep(2 ** attempt) |
| continue |
| return { |
| "success": False, |
| "error": f"Connection error: {str(e)}", |
| "workflow_id": workflow_id, |
| "attempt": attempt + 1, |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| |
| |
| return { |
| "success": False, |
| "error": "Max retries exceeded", |
| "workflow_id": workflow_id, |
| "attempt": self.max_retries, |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| |
| async def batch_execute(self, workflows: List[Dict[str, Any]]) -> List[Dict[str, Any]]: |
| """Execute multiple workflows in parallel with proper limits""" |
| semaphore = asyncio.Semaphore(ProductionConfig.MAX_CONCURRENT_DETECTIONS) |
| |
| async def execute_with_limit(workflow: Dict[str, Any]) -> Dict[str, Any]: |
| async with semaphore: |
| return await self.execute_workflow( |
| workflow["workflow_id"], |
| workflow["payload"] |
| ) |
| |
| tasks = [execute_with_limit(wf) for wf in workflows] |
| results = await asyncio.gather(*tasks, return_exceptions=True) |
| |
| |
| processed_results = [] |
| for i, result in enumerate(results): |
| if isinstance(result, Exception): |
| processed_results.append({ |
| "success": False, |
| "error": str(result), |
| "workflow_id": workflows[i]["workflow_id"], |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| }) |
| else: |
| processed_results.append(result) |
| |
| return processed_results |
| |
| async def close(self): |
| """Properly close session""" |
| async with self._session_lock: |
| if self._session and not self._session.closed: |
| await self._session.close() |
| self._session = None |
| logger.info("Closed n8n session") |
|
|
| |
|
|
| class ImmutableLedger: |
| """ |
| Immutable ledger with proper sync/async separation |
| Quantum-aware append-only log (not a blockchain) |
| """ |
| |
| def __init__(self, n8n_client: N8NClient, storage_path: str = None): |
| self.n8n = n8n_client |
| self.storage_path = storage_path or ProductionConfig.LEDGER_PATH |
| os.makedirs(self.storage_path, exist_ok=True) |
| |
| self.chain: List[Dict] = [] |
| self.node_index: Dict[str, List[str]] = defaultdict(list) |
| self.validator_index: Dict[str, List[str]] = defaultdict(list) |
| self.temporal_index: Dict[str, List[str]] = defaultdict(list) |
| |
| |
| self._bootstrap_sync() |
| |
| def _bootstrap_sync(self): |
| """Synchronous bootstrap - no async calls""" |
| ledger_file = os.path.join(self.storage_path, "ledger.json") |
| |
| if os.path.exists(ledger_file): |
| try: |
| with open(ledger_file, 'r') as f: |
| data = json.load(f) |
| self.chain = data.get("chain", []) |
| self._rebuild_indexes_sync() |
| logger.info(f"Loaded ledger: {len(self.chain)} blocks, {len(self.node_index)} nodes indexed") |
| |
| |
| if not self._validate_chain_sync(): |
| logger.warning("Ledger integrity check failed, creating new genesis") |
| self._create_genesis_sync() |
| except Exception as e: |
| logger.error(f"Failed to load ledger: {e}") |
| self._create_genesis_sync() |
| else: |
| self._create_genesis_sync() |
| |
| def _create_genesis_sync(self): |
| """Create genesis block synchronously""" |
| genesis = { |
| "id": "genesis_v5", |
| "prev": "0" * 128, |
| "timestamp": datetime.utcnow().isoformat() + "Z", |
| "nodes": [], |
| "metadata": { |
| "version": "IRE_v5.0", |
| "genesis": True, |
| "created_by": "ImmutableLedger", |
| "hash_algorithm": ProductionConfig.HASH_ALGORITHM, |
| "note": "Quantum-aware, not quantum-resistant" |
| }, |
| "hash": self._hash_block_sync({"genesis": True}), |
| "signatures": [] |
| } |
| |
| self.chain.append(genesis) |
| self._save_ledger_sync() |
| logger.info("Created genesis block") |
| |
| def _hash_block_sync(self, data: Dict) -> str: |
| """Synchronous hashing""" |
| return hashlib.sha3_512( |
| json.dumps(data, sort_keys=True).encode() |
| ).hexdigest() |
| |
| def _rebuild_indexes_sync(self): |
| """Rebuild indexes synchronously""" |
| self.node_index.clear() |
| self.validator_index.clear() |
| self.temporal_index.clear() |
| |
| for block in self.chain: |
| block_id = block["id"] |
| |
| |
| for node in block.get("nodes", []): |
| node_hash = node.get("content_hash") |
| if node_hash: |
| self.node_index[node_hash].append(block_id) |
| |
| |
| for sig in block.get("signatures", []): |
| validator = sig.get("validator_id") |
| if validator: |
| self.validator_index[validator].append(block_id) |
| |
| |
| timestamp = block.get("timestamp", "") |
| if timestamp: |
| date_key = timestamp[:10] |
| self.temporal_index[date_key].append(block_id) |
| |
| def _validate_chain_sync(self) -> bool: |
| """Validate chain integrity synchronously""" |
| if not self.chain: |
| return False |
| |
| if self.chain[0]["id"] != "genesis_v5": |
| return False |
| |
| for i in range(1, len(self.chain)): |
| current = self.chain[i] |
| previous = self.chain[i - 1] |
| |
| if current["prev"] != previous["hash"]: |
| return False |
| |
| return True |
| |
| def _save_ledger_sync(self): |
| """Save ledger synchronously with atomic write""" |
| ledger_data = { |
| "chain": self.chain, |
| "metadata": { |
| "version": "IRE_v5.0", |
| "total_blocks": len(self.chain), |
| "total_nodes": sum(len(b.get("nodes", [])) for b in self.chain), |
| "last_updated": datetime.utcnow().isoformat() + "Z", |
| "hash_algorithm": ProductionConfig.HASH_ALGORITHM |
| } |
| } |
| |
| ledger_file = os.path.join(self.storage_path, "ledger.json") |
| temp_file = ledger_file + ".tmp" |
| |
| try: |
| |
| with open(temp_file, 'w') as f: |
| json.dump(ledger_data, f, indent=2) |
| |
| |
| os.replace(temp_file, ledger_file) |
| |
| except Exception as e: |
| logger.error(f"Failed to save ledger: {e}") |
| |
| if os.path.exists(temp_file): |
| os.remove(temp_file) |
| raise |
| |
| async def commit_node(self, node: RealityNode, validators: List[str]) -> Dict[str, Any]: |
| """Commit node to ledger via n8n orchestration""" |
| |
| |
| is_valid, errors = node.validate() |
| if not is_valid: |
| return { |
| "success": False, |
| "error": f"Node validation failed: {errors}", |
| "node_id": node.content_hash[:32], |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| |
| |
| payload = { |
| "operation": "ledger_commit", |
| "node": node.to_transport_format(), |
| "validators": validators, |
| "current_chain_length": len(self.chain), |
| "previous_block_hash": self.chain[-1]["hash"] if self.chain else "0" * 128, |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| |
| |
| response = await self.n8n.execute_workflow( |
| ProductionConfig.WORKFLOW_IDS["ledger_commit"], |
| payload |
| ) |
| |
| if response.get("success"): |
| block_data = response.get("data", {}).get("block", {}) |
| |
| |
| if self._validate_block_sync(block_data): |
| self.chain.append(block_data) |
| self._update_indexes_sync(block_data) |
| self._save_ledger_sync() |
| |
| logger.info(f"Committed node {node.content_hash[:16]}... in block {block_data.get('id', 'unknown')}") |
| |
| return { |
| "success": True, |
| "block_id": block_data.get("id", "unknown"), |
| "block_hash": block_data.get("hash", "unknown")[:32] + "...", |
| "node_id": node.content_hash[:32], |
| "validator_count": len(validators), |
| "ledger_length": len(self.chain), |
| "n8n_execution_id": response.get("execution_id"), |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| else: |
| return { |
| "success": False, |
| "error": "Block validation failed", |
| "n8n_response": response, |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| |
| return { |
| "success": False, |
| "error": "Failed to commit node via n8n", |
| "n8n_response": response, |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| |
| def _validate_block_sync(self, block: Dict) -> bool: |
| """Validate block structure synchronously""" |
| required_fields = ["id", "prev", "timestamp", "hash", "nodes"] |
| for field in required_fields: |
| if field not in block: |
| logger.error(f"Block missing required field: {field}") |
| return False |
| |
| |
| if self.chain and block["prev"] != self.chain[-1]["hash"]: |
| logger.error(f"Block prev hash mismatch: {block['prev'][:16]}... != {self.chain[-1]['hash'][:16]}...") |
| return False |
| |
| return True |
| |
| def _update_indexes_sync(self, block: Dict): |
| """Update indexes synchronously""" |
| block_id = block["id"] |
| |
| |
| for node in block.get("nodes", []): |
| node_hash = node.get("content_hash") |
| if node_hash: |
| self.node_index[node_hash].append(block_id) |
| |
| |
| for sig in block.get("signatures", []): |
| validator = sig.get("validator_id") |
| if validator: |
| self.validator_index[validator].append(block_id) |
| |
| |
| timestamp = block.get("timestamp", "") |
| if timestamp: |
| date_key = timestamp[:10] |
| self.temporal_index[date_key].append(block_id) |
| |
| def get_node_history_sync(self, node_hash: str) -> List[Dict]: |
| """Get node history synchronously""" |
| block_ids = self.node_index.get(node_hash, []) |
| history = [] |
| |
| for block_id in block_ids: |
| block = next((b for b in self.chain if b["id"] == block_id), None) |
| if block: |
| history.append({ |
| "block_id": block_id, |
| "timestamp": block["timestamp"], |
| "block_hash": block["hash"][:16] + "...", |
| "validator_count": len(block.get("signatures", [])), |
| "block_index": self.chain.index(block) |
| }) |
| |
| return sorted(history, key=lambda x: x["timestamp"]) |
| |
| def analyze_health_sync(self) -> Dict[str, Any]: |
| """Analyze ledger health synchronously""" |
| if not self.chain: |
| return {"status": "EMPTY", "health_score": 0.0} |
| |
| total_blocks = len(self.chain) |
| total_nodes = sum(len(b.get("nodes", [])) for b in self.chain) |
| |
| |
| integrity_ok = self._validate_chain_sync() |
| |
| |
| block_intervals = [] |
| for i in range(1, len(self.chain)): |
| try: |
| prev_time = datetime.fromisoformat(self.chain[i-1]["timestamp"].replace('Z', '+00:00')) |
| curr_time = datetime.fromisoformat(self.chain[i]["timestamp"].replace('Z', '+00:00')) |
| interval = (curr_time - prev_time).total_seconds() |
| block_intervals.append(interval) |
| except (ValueError, KeyError): |
| pass |
| |
| |
| factors = [] |
| |
| |
| factors.append(1.0 if integrity_ok else 0.0) |
| |
| |
| factors.append(min(1.0, total_blocks / 100.0)) |
| |
| |
| factors.append(min(1.0, total_nodes / 500.0)) |
| |
| |
| unique_validators = len(self.validator_index) |
| factors.append(min(1.0, unique_validators / 10.0)) |
| |
| |
| unique_days = len(self.temporal_index) |
| factors.append(min(1.0, unique_days / 30.0)) |
| |
| |
| health_score = sum(factors) / len(factors) if factors else 0.0 |
| |
| |
| if health_score >= 0.8: |
| status = "HEALTHY" |
| elif health_score >= 0.6: |
| status = "DEGRADED" |
| elif health_score >= 0.4: |
| status = "WARNING" |
| else: |
| status = "CRITICAL" |
| |
| return { |
| "status": status, |
| "health_score": round(health_score, 3), |
| "metrics": { |
| "total_blocks": total_blocks, |
| "total_nodes": total_nodes, |
| "unique_nodes": len(self.node_index), |
| "unique_validators": unique_validators, |
| "unique_days": unique_days, |
| "chain_integrity": integrity_ok, |
| "average_block_interval": statistics.mean(block_intervals) if block_intervals else 0, |
| "hash_algorithm": ProductionConfig.HASH_ALGORITHM |
| }, |
| "factors": {f"factor_{i}": round(v, 3) for i, v in enumerate(factors)}, |
| "recommendations": self._generate_health_recommendations_sync(health_score, total_blocks, unique_validators) |
| } |
| |
| def _generate_health_recommendations_sync(self, health_score: float, |
| total_blocks: int, |
| unique_validators: int) -> List[str]: |
| """Generate health recommendations synchronously""" |
| recommendations = [] |
| |
| if health_score < 0.5: |
| recommendations.append("Ledger health critical - add more nodes and validators") |
| |
| if total_blocks < 10: |
| recommendations.append("Increase ledger activity to establish chain history") |
| |
| if unique_validators < ProductionConfig.MIN_VALIDATORS: |
| recommendations.append(f"Add more validators (currently {unique_validators}, need {ProductionConfig.MIN_VALIDATORS})") |
| |
| if not recommendations: |
| recommendations.append("Ledger operating within optimal parameters") |
| |
| return recommendations |
|
|
| |
|
|
| class LensMethodRegistry: |
| """ |
| Registry for lenses and methods with n8n orchestration |
| Cross-referential and externally managed |
| """ |
| |
| def __init__(self, n8n_client: N8NClient): |
| self.n8n = n8n_client |
| self.lenses: Dict[str, Dict] = {} |
| self.methods: Dict[str, Dict] = {} |
| self.cross_references: Dict[str, List[str]] = defaultdict(list) |
| self.inverse_references: Dict[str, List[str]] = defaultdict(list) |
| self.last_sync: Optional[str] = None |
| self.sync_lock = asyncio.Lock() |
| |
| async def sync_from_n8n(self) -> bool: |
| """Sync registry from n8n with proper locking""" |
| async with self.sync_lock: |
| try: |
| logger.info("Syncing registry from n8n...") |
| |
| |
| lenses_response = await self.n8n.execute_workflow( |
| ProductionConfig.WORKFLOW_IDS["lens_analysis"], |
| {"operation": "get_registry", "type": "lenses"} |
| ) |
| |
| if lenses_response.get("success"): |
| self.lenses = lenses_response.get("data", {}).get("lenses", {}) |
| logger.info(f"Loaded {len(self.lenses)} lenses") |
| else: |
| logger.error(f"Failed to load lenses: {lenses_response.get('error')}") |
| return False |
| |
| |
| methods_response = await self.n8n.execute_workflow( |
| ProductionConfig.WORKFLOW_IDS["method_execution"], |
| {"operation": "get_registry", "type": "methods"} |
| ) |
| |
| if methods_response.get("success"): |
| self.methods = methods_response.get("data", {}).get("methods", {}) |
| logger.info(f"Loaded {len(self.methods)} methods") |
| else: |
| logger.error(f"Failed to load methods: {methods_response.get('error')}") |
| return False |
| |
| |
| self._build_cross_references() |
| |
| self.last_sync = datetime.utcnow().isoformat() + "Z" |
| logger.info("Registry sync completed successfully") |
| return True |
| |
| except Exception as e: |
| logger.error(f"Registry sync failed: {e}") |
| return False |
| |
| def _build_cross_references(self): |
| """Build cross-references between lenses and methods""" |
| self.cross_references.clear() |
| self.inverse_references.clear() |
| |
| |
| for method_id, method in self.methods.items(): |
| lens_ids = method.get("lens_ids", []) |
| for lens_id in lens_ids: |
| if lens_id in self.lenses: |
| self.cross_references[lens_id].append(method_id) |
| self.inverse_references[method_id].append(lens_id) |
| |
| logger.info(f"Built cross-references: {len(self.cross_references)} lenses ↔ {len(self.inverse_references)} methods") |
| |
| def get_lens(self, lens_id: str) -> Optional[Dict]: |
| """Get lens by ID""" |
| return self.lenses.get(str(lens_id)) |
| |
| def get_method(self, method_id: str) -> Optional[Dict]: |
| """Get method by ID""" |
| return self.methods.get(str(method_id)) |
| |
| def get_methods_for_lens(self, lens_id: str) -> List[Dict]: |
| """Get all methods for a lens""" |
| method_ids = self.cross_references.get(str(lens_id), []) |
| return [self.get_method(mid) for mid in method_ids if self.get_method(mid)] |
| |
| def get_lenses_for_method(self, method_id: str) -> List[Dict]: |
| """Get all lenses for a method""" |
| lens_ids = self.inverse_references.get(str(method_id), []) |
| return [self.get_lens(lid) for lid in lens_ids if self.get_lens(lid)] |
| |
| def find_similar_lenses(self, query: str, limit: int = 5) -> List[Dict]: |
| """Find lenses similar to query (simple keyword matching)""" |
| query_lower = query.lower() |
| results = [] |
| |
| for lens_id, lens in self.lenses.items(): |
| score = 0 |
| |
| |
| if query_lower in lens.get("name", "").lower(): |
| score += 3 |
| |
| |
| if query_lower in lens.get("description", "").lower(): |
| score += 2 |
| |
| |
| keywords = lens.get("keywords", []) |
| for keyword in keywords: |
| if query_lower in keyword.lower(): |
| score += 1 |
| |
| if score > 0: |
| result = lens.copy() |
| result["match_score"] = score |
| results.append(result) |
| |
| results.sort(key=lambda x: x.get("match_score", 0), reverse=True) |
| return results[:limit] |
| |
| async def execute_method_via_n8n(self, method_id: str, content: Dict, |
| context: Dict = None) -> Dict[str, Any]: |
| """Execute method via n8n orchestration""" |
| method = self.get_method(method_id) |
| if not method: |
| return { |
| "success": False, |
| "error": f"Method {method_id} not found", |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| |
| payload = { |
| "operation": "execute_method", |
| "method_id": method_id, |
| "method_name": method.get("name", "Unknown"), |
| "content": content, |
| "context": context or {}, |
| "registry_version": self.last_sync, |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| |
| return await self.n8n.execute_workflow( |
| ProductionConfig.WORKFLOW_IDS["method_execution"], |
| payload |
| ) |
|
|
| |
|
|
| class QuorumSystem: |
| """Proper quorum calculation and validation system""" |
| |
| def __init__(self): |
| self.quorum_threshold = ProductionConfig.QUORUM_THRESHOLD |
| self.dissent_threshold = ProductionConfig.DISSENT_THRESHOLD |
| |
| def calculate_quorum(self, attestations: List[Dict]) -> Dict[str, Any]: |
| """ |
| Calculate quorum statistics from attestations |
| Returns detailed quorum analysis |
| """ |
| if not attestations: |
| return { |
| "quorum_met": False, |
| "agreement_score": 0.0, |
| "dissent_score": 0.0, |
| "total_votes": 0, |
| "analysis": "No attestations" |
| } |
| |
| total_votes = len(attestations) |
| |
| |
| decision_groups = defaultdict(list) |
| for att in attestations: |
| decision = att.get("decision", "unknown") |
| decision_hash = hashlib.sha3_512( |
| json.dumps(decision, sort_keys=True).encode() |
| ).hexdigest()[:16] |
| decision_groups[decision_hash].append(att) |
| |
| |
| group_sizes = [len(group) for group in decision_groups.values()] |
| if not group_sizes: |
| return { |
| "quorum_met": False, |
| "agreement_score": 0.0, |
| "dissent_score": 0.0, |
| "total_votes": total_votes, |
| "analysis": "No valid decisions" |
| } |
| |
| |
| group_sizes.sort(reverse=True) |
| largest_group = group_sizes[0] |
| second_largest = group_sizes[1] if len(group_sizes) > 1 else 0 |
| |
| |
| agreement_score = largest_group / total_votes |
| dissent_score = second_largest / total_votes if second_largest > 0 else 0 |
| |
| |
| quorum_met = agreement_score >= self.quorum_threshold |
| dissent_issue = dissent_score >= self.dissent_threshold |
| |
| |
| analysis_parts = [] |
| if quorum_met: |
| analysis_parts.append(f"Quorum met ({agreement_score:.1%} ≥ {self.quorum_threshold:.1%})") |
| else: |
| analysis_parts.append(f"Quorum not met ({agreement_score:.1%} < {self.quorum_threshold:.1%})") |
| |
| if dissent_issue: |
| analysis_parts.append(f"Significant dissent ({dissent_score:.1%} ≥ {self.dissent_threshold:.1%})") |
| |
| |
| group_details = {} |
| for decision_hash, group in decision_groups.items(): |
| group_details[decision_hash[:8]] = { |
| "size": len(group), |
| "percentage": len(group) / total_votes, |
| "validators": [a.get("validator_id", "unknown") for a in group], |
| "sample_decision": group[0].get("decision", "unknown") if group else None |
| } |
| |
| return { |
| "quorum_met": quorum_met, |
| "agreement_score": round(agreement_score, 3), |
| "dissent_score": round(dissent_score, 3), |
| "total_votes": total_votes, |
| "group_count": len(decision_groups), |
| "largest_group_size": largest_group, |
| "analysis": "; ".join(analysis_parts), |
| "group_details": group_details, |
| "thresholds": { |
| "quorum": self.quorum_threshold, |
| "dissent": self.dissent_threshold |
| } |
| } |
| |
| async def validate_quorum_via_n8n(self, node: RealityNode, |
| attestations: List[Dict]) -> Dict[str, Any]: |
| """Validate quorum via n8n for complex cases""" |
| payload = { |
| "operation": "quorum_validation", |
| "node_hash": node.content_hash[:32], |
| "attestations": attestations, |
| "total_witnesses": len(node.witness_signatures), |
| "quorum_threshold": self.quorum_threshold, |
| "dissent_threshold": self.dissent_threshold, |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| |
| return await self.n8n.execute_workflow( |
| ProductionConfig.WORKFLOW_IDS["quorum_calculation"], |
| payload |
| ) |
|
|
| |
|
|
| class ProductionDetectionEngine: |
| """ |
| Production-ready detection engine with all fixes applied |
| Proper async/await, error handling, and clear guarantees |
| """ |
| |
| def __init__(self): |
| |
| self.n8n_client = N8NClient() |
| self.registry = LensMethodRegistry(self.n8n_client) |
| self.ledger = ImmutableLedger(self.n8n_client) |
| self.quorum_system = QuorumSystem() |
| |
| |
| self.metrics = { |
| "total_detections": 0, |
| "successful_detections": 0, |
| "failed_detections": 0, |
| "average_execution_time": 0.0, |
| "phase_distribution": Counter(), |
| "equilibrium_detections": 0, |
| "quorum_validations": 0, |
| "ledger_commits": 0 |
| } |
| |
| |
| self.result_cache: Dict[str, Dict] = {} |
| self.cache_lock = asyncio.Lock() |
| |
| |
| self._background_tasks: List[asyncio.Task] = [] |
| |
| logger.info("Production Detection Engine initialized") |
| |
| async def initialize(self): |
| """Async initialization""" |
| try: |
| |
| success = await self.registry.sync_from_n8n() |
| if not success: |
| logger.warning("Registry sync failed, using empty registry") |
| |
| |
| cleanup_task = asyncio.create_task(self._cleanup_loop()) |
| self._background_tasks.append(cleanup_task) |
| |
| logger.info("Engine initialization completed") |
| |
| except Exception as e: |
| logger.error(f"Engine initialization failed: {e}") |
| raise |
| |
| async def detect_suppression(self, content: Dict, context: Dict = None) -> Dict[str, Any]: |
| """ |
| Main detection pipeline with proper error handling and metrics |
| """ |
| detection_id = f"det_{uuid.uuid4().hex[:16]}" |
| start_time = time.time() |
| |
| try: |
| logger.info(f"Starting detection {detection_id}") |
| |
| |
| content_hash = hashlib.sha3_512( |
| json.dumps(content, sort_keys=True).encode() |
| ).hexdigest() |
| |
| node = RealityNode( |
| content_hash=content_hash, |
| node_type="suppression_detection", |
| source_id=context.get("source", "unknown") if context else "unknown", |
| signature=QuantumAwareSignature.create(content), |
| temporal_anchor=datetime.utcnow().isoformat() + "Z", |
| content=content, |
| metadata={ |
| "detection_id": detection_id, |
| "context": context or {}, |
| "engine_version": "IRE_v5.0_Production" |
| } |
| ) |
| |
| |
| content_analysis = await self._analyze_content(content, context) |
| |
| |
| pattern_analysis = await self._detect_patterns(content, content_analysis) |
| |
| |
| current_phase = self._determine_phase(pattern_analysis) |
| |
| |
| method_results = await self._apply_methods(content, current_phase, pattern_analysis) |
| |
| |
| equilibrium_analysis = await self._detect_equilibrium(pattern_analysis, method_results) |
| |
| |
| threat_analysis = await self._analyze_threats({ |
| "content": content, |
| "patterns": pattern_analysis, |
| "methods": method_results, |
| "equilibrium": equilibrium_analysis |
| }) |
| |
| |
| composite_analysis = self._create_composite_analysis( |
| content_analysis, pattern_analysis, method_results, |
| equilibrium_analysis, threat_analysis |
| ) |
| |
| |
| node.metadata["analysis"] = composite_analysis |
| node.metadata["detection_phase"] = current_phase |
| node.n8n_execution_id = f"exec_{uuid.uuid4().hex[:8]}" |
| |
| |
| validators = self._select_validators(threat_analysis, current_phase) |
| |
| |
| attestations = await self._get_attestations(node, validators, composite_analysis) |
| |
| |
| successful_attestations = 0 |
| for att in attestations: |
| if att.get("success"): |
| validator_id = att.get("validator_id") |
| signature_data = att.get("signature_data", {}) |
| signature = QuantumAwareSignature(**signature_data) |
| node.add_witness(validator_id, signature, att.get("attestation", {})) |
| successful_attestations += 1 |
| |
| |
| quorum_result = self.quorum_system.calculate_quorum( |
| [a.get("attestation", {}) for a in attestations if a.get("success")] |
| ) |
| |
| |
| ledger_result = None |
| if quorum_result.get("quorum_met", False) and successful_attestations >= ProductionConfig.MIN_VALIDATORS: |
| ledger_result = await self.ledger.commit_node(node, validators) |
| if ledger_result.get("success"): |
| self.metrics["ledger_commits"] += 1 |
| |
| execution_time = time.time() - start_time |
| |
| |
| self._update_metrics( |
| success=True, |
| execution_time=execution_time, |
| phase=current_phase, |
| has_equilibrium=equilibrium_analysis.get("has_equilibrium", False), |
| quorum_met=quorum_result.get("quorum_met", False) |
| ) |
| |
| |
| result = { |
| "success": True, |
| "detection_id": detection_id, |
| "execution_time": execution_time, |
| "current_phase": current_phase, |
| "reality_node": { |
| "hash": node.content_hash[:32], |
| "proof_of_existence": node.proof_of_existence[:32] + "..." if node.proof_of_existence else None, |
| "witness_count": len(node.witness_signatures) |
| }, |
| "analysis": composite_analysis, |
| "quorum_result": quorum_result, |
| "attestation_result": { |
| "requested": len(validators), |
| "successful": successful_attestations, |
| "quorum_met": quorum_result.get("quorum_met", False) |
| }, |
| "ledger_result": ledger_result, |
| "metrics": { |
| "patterns_found": len(pattern_analysis.get("patterns", [])), |
| "methods_applied": method_results.get("methods_applied", 0), |
| "threat_level": threat_analysis.get("threat_level", "UNKNOWN"), |
| "equilibrium_detected": equilibrium_analysis.get("has_equilibrium", False) |
| }, |
| "engine_metadata": { |
| "version": "IRE_v5.0_Production", |
| "quantum_aware": True, |
| "n8n_integrated": True, |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| } |
| |
| |
| await self._cache_result(detection_id, result) |
| |
| logger.info(f"Detection {detection_id} completed successfully in {execution_time:.2f}s") |
| |
| return result |
| |
| except Exception as e: |
| execution_time = time.time() - start_time |
| error_id = f"err_{uuid.uuid4().hex[:8]}" |
| |
| self._update_metrics(success=False, execution_time=execution_time) |
| |
| logger.error(f"Detection {detection_id} failed: {e}", error_id=error_id) |
| |
| return { |
| "success": False, |
| "detection_id": detection_id, |
| "error_id": error_id, |
| "error": str(e), |
| "execution_time": execution_time, |
| "timestamp": datetime.utcnow().isoformat() + "Z", |
| "engine_metadata": { |
| "version": "IRE_v5.0_Production", |
| "error_reported": True |
| } |
| } |
| |
| async def _analyze_content(self, content: Dict, context: Dict = None) -> Dict: |
| """Analyze content via n8n""" |
| payload = { |
| "operation": "content_analysis", |
| "content": content, |
| "context": context or {}, |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| |
| response = await self.n8n_client.execute_workflow( |
| ProductionConfig.WORKFLOW_IDS["lens_analysis"], |
| payload |
| ) |
| |
| return response.get("data", {}) if response.get("success") else {} |
| |
| async def _detect_patterns(self, content: Dict, content_analysis: Dict) -> Dict: |
| """Detect patterns via n8n""" |
| payload = { |
| "operation": "pattern_detection", |
| "content": content, |
| "content_analysis": content_analysis, |
| "lens_count": len(self.registry.lenses), |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| |
| response = await self.n8n_client.execute_workflow( |
| ProductionConfig.WORKFLOW_IDS["lens_analysis"], |
| payload |
| ) |
| |
| return response.get("data", {}) if response.get("success") else {} |
| |
| def _determine_phase(self, pattern_analysis: Dict) -> str: |
| """Determine suppression phase""" |
| patterns = pattern_analysis.get("patterns", []) |
| |
| |
| equilibrium_count = sum(1 for p in patterns if p.get("type") == "equilibrium") |
| |
| if equilibrium_count >= 3: |
| return SuppressionPhase.POST_SUPPRESSION_EQUILIBRIUM.value |
| elif equilibrium_count >= 1: |
| return SuppressionPhase.ESTABLISHING_SUPPRESSION.value |
| else: |
| return SuppressionPhase.ACTIVE_SUPPRESSION.value |
| |
| async def _apply_methods(self, content: Dict, phase: str, |
| pattern_analysis: Dict) -> Dict: |
| """Apply detection methods""" |
| payload = { |
| "operation": "method_execution", |
| "content": content, |
| "phase": phase, |
| "pattern_analysis": pattern_analysis, |
| "method_count": len(self.registry.methods), |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| |
| response = await self.n8n_client.execute_workflow( |
| ProductionConfig.WORKFLOW_IDS["method_execution"], |
| payload |
| ) |
| |
| return response.get("data", {}) if response.get("success") else {} |
| |
| async def _detect_equilibrium(self, pattern_analysis: Dict, |
| method_results: Dict) -> Dict: |
| """Detect equilibrium patterns""" |
| payload = { |
| "operation": "equilibrium_detection", |
| "pattern_analysis": pattern_analysis, |
| "method_results": method_results, |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| |
| response = await self.n8n_client.execute_workflow( |
| ProductionConfig.WORKFLOW_IDS["equilibrium_detection"], |
| payload |
| ) |
| |
| return response.get("data", {}) if response.get("success") else {} |
| |
| async def _analyze_threats(self, system_state: Dict) -> Dict: |
| """Analyze STRIDE-E threats""" |
| payload = { |
| "operation": "threat_analysis", |
| "system_state": system_state, |
| "threat_model": "STRIDE-E", |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| |
| response = await self.n8n_client.execute_workflow( |
| ProductionConfig.WORKFLOW_IDS["threat_analysis"], |
| payload |
| ) |
| |
| return response.get("data", {}) if response.get("success") else {} |
| |
| def _create_composite_analysis(self, content_analysis: Dict, |
| pattern_analysis: Dict, |
| method_results: Dict, |
| equilibrium_analysis: Dict, |
| threat_analysis: Dict) -> Dict: |
| """Create composite analysis""" |
| |
| pattern_score = pattern_analysis.get("confidence", 0.0) |
| method_score = method_results.get("confidence", 0.0) |
| equilibrium_score = equilibrium_analysis.get("equilibrium_score", 0.0) |
| threat_score = threat_analysis.get("risk_score", 0.0) |
| |
| |
| weights = {"pattern": 0.3, "method": 0.4, "equilibrium": 0.2, "threat": 0.1} |
| composite_score = ( |
| pattern_score * weights["pattern"] + |
| method_score * weights["method"] + |
| equilibrium_score * weights["equilibrium"] + |
| (1 - threat_score) * weights["threat"] |
| ) |
| |
| |
| if threat_score > 0.7: |
| system_status = "CRITICAL" |
| elif threat_score > 0.4: |
| system_status = "DEGRADED" |
| elif composite_score > 0.7: |
| system_status = "HEALTHY" |
| elif composite_score > 0.4: |
| system_status = "MONITOR" |
| else: |
| system_status = "UNKNOWN" |
| |
| return { |
| "composite_score": round(composite_score, 3), |
| "system_status": system_status, |
| "component_scores": { |
| "pattern": round(pattern_score, 3), |
| "method": round(method_score, 3), |
| "equilibrium": round(equilibrium_score, 3), |
| "threat": round(threat_score, 3) |
| }, |
| "has_equilibrium": equilibrium_analysis.get("has_equilibrium", False), |
| "threat_level": threat_analysis.get("threat_level", "UNKNOWN"), |
| "pattern_count": len(pattern_analysis.get("patterns", [])), |
| "method_count": method_results.get("methods_applied", 0), |
| "timestamp": datetime.utcnow().isoformat() + "Z", |
| "note": "Quantum-aware analysis, not quantum-resistant" |
| } |
| |
| def _select_validators(self, threat_analysis: Dict, phase: str) -> List[str]: |
| """Select validators based on analysis""" |
| validators = [] |
| |
| |
| validators.append("system_epistemic_v5") |
| validators.append("temporal_integrity_v5") |
| |
| |
| threat_level = threat_analysis.get("threat_level", "UNKNOWN") |
| if threat_level in ["HIGH", "CRITICAL"]: |
| validators.append("human_sovereign_v5") |
| |
| if phase == SuppressionPhase.POST_SUPPRESSION_EQUILIBRIUM.value: |
| validators.append("quantum_guardian_v5") |
| |
| |
| while len(validators) < ProductionConfig.MIN_VALIDATORS: |
| validators.append(f"backup_validator_{len(validators)}") |
| |
| return validators |
| |
| async def _get_attestations(self, node: RealityNode, |
| validators: List[str], |
| analysis: Dict) -> List[Dict]: |
| """Get validator attestations""" |
| attestations = [] |
| |
| for validator_id in validators: |
| payload = { |
| "operation": "validator_attestation", |
| "validator_id": validator_id, |
| "node": node.to_transport_format(), |
| "analysis": analysis, |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| } |
| |
| response = await self.n8n_client.execute_workflow( |
| ProductionConfig.WORKFLOW_IDS["validator_attestation"], |
| payload |
| ) |
| |
| if response.get("success"): |
| attestations.append({ |
| "validator_id": validator_id, |
| "success": True, |
| "signature_data": response.get("data", {}).get("signature"), |
| "attestation": response.get("data", {}).get("attestation"), |
| "timestamp": response.get("timestamp") |
| }) |
| else: |
| attestations.append({ |
| "validator_id": validator_id, |
| "success": False, |
| "error": response.get("error", "Unknown error"), |
| "timestamp": datetime.utcnow().isoformat() + "Z" |
| }) |
| |
| return attestations |
| |
| def _update_metrics(self, success: bool, execution_time: float, |
| phase: str = None, has_equilibrium: bool = False, |
| quorum_met: bool = False): |
| """Update engine metrics""" |
| self.metrics["total_detections"] += 1 |
| |
| if success: |
| self.metrics["successful_detections"] += 1 |
| else: |
| self.metrics["failed_detections"] += 1 |
| |
| |
| old_avg = self.metrics["average_execution_time"] |
| total = self.metrics["total_detections"] |
| self.metrics["average_execution_time"] = ( |
| (old_avg * (total - 1)) + execution_time |
| ) / total if total > 0 else execution_time |
| |
| if phase: |
| self.metrics["phase_distribution"][phase] += 1 |
| |
| if has_equilibrium: |
| self.metrics["equilibrium_detections"] += 1 |
| |
| if quorum_met: |
| self.metrics["quorum_validations"] += 1 |
| |
| async def _cache_result(self, detection_id: str, result: Dict): |
| """Cache result with TTL""" |
| async with self.cache_lock: |
| self.result_cache[detection_id] = { |
| "result": result, |
| "timestamp": datetime.utcnow().isoformat() + "Z", |
| "expires": (datetime.utcnow() + timedelta(hours=24)).isoformat() + "Z" |
| } |
| |
| async def _cleanup_loop(self): |
| """Background cleanup loop""" |
| while True: |
| try: |
| await asyncio.sleep(3600) |
| |
| now = datetime.utcnow() |
| expired_keys = [] |
| |
| async with self.cache_lock: |
| for key, entry in self.result_cache.items(): |
| expires = datetime.fromisoformat(entry["expires"].replace('Z', '+00:00')) |
| if now > expires: |
| expired_keys.append(key) |
| |
| for key in expired_keys: |
| del self.result_cache[key] |
| |
| if expired_keys: |
| logger.info(f"Cleaned up {len(expired_keys)} expired cache entries") |
| |
| except asyncio.CancelledError: |
| break |
| except Exception as e: |
| logger.error(f"Cleanup loop error: {e}") |
| |
| async def get_system_report(self) -> Dict[str, Any]: |
| """Get comprehensive system report""" |
| ledger_health = self.ledger.analyze_health_sync() |
| |
| |
| total = self.metrics["total_detections"] |
| successful = self.metrics["successful_detections"] |
| success_rate = successful / total if total > 0 else 0.0 |
| |
| |
| phase_dist = dict(self.metrics["phase_distribution"]) |
| phase_percentages = { |
| phase: (count / total if total > 0 else 0) |
| for phase, count in phase_dist.items() |
| } |
| |
| return { |
| "report_timestamp": datetime.utcnow().isoformat() + "Z", |
| "engine_version": "IRE_v5.0_Production_Fixed", |
| "guarantees": { |
| "quantum_aware": True, |
| "quantum_resistant": False, |
| "n8n_integrated": True, |
| "async_processing": True, |
| "immutable_ledger": True, |
| "quorum_validation": True |
| }, |
| "metrics": { |
| **self.metrics, |
| "success_rate": round(success_rate, 3), |
| "phase_distribution": phase_percentages |
| }, |
| "registry_status": { |
| "lenses": len(self.registry.lenses), |
| "methods": len(self.registry.methods), |
| "last_sync": self.registry.last_sync |
| }, |
| "ledger_health": ledger_health, |
| "performance": { |
| "average_execution_time": round(self.metrics["average_execution_time"], 2), |
| "cache_size": len(self.result_cache), |
| "background_tasks": len(self._background_tasks) |
| }, |
| "config_summary": { |
| "min_validators": ProductionConfig.MIN_VALIDATORS, |
| "quorum_threshold": ProductionConfig.QUORUM_THRESHOLD, |
| "dissent_threshold": ProductionConfig.DISSENT_THRESHOLD, |
| "hash_algorithm": ProductionConfig.HASH_ALGORITHM |
| }, |
| "recommendations": self._generate_system_recommendations(ledger_health, success_rate) |
| } |
| |
| def _generate_system_recommendations(self, ledger_health: Dict, |
| success_rate: float) -> List[str]: |
| """Generate system recommendations""" |
| recommendations = [] |
| |
| |
| if ledger_health.get("health_score", 0) < 0.7: |
| recommendations.append("Improve ledger health by adding more nodes and validators") |
| |
| |
| if success_rate < 0.8 and self.metrics["total_detections"] > 10: |
| recommendations.append(f"Investigate failed detections (success rate: {success |