"""DAG validation per Section 6.""" from __future__ import annotations from abc import ABC, abstractmethod from typing import TYPE_CHECKING if TYPE_CHECKING: from ect.types import Payload from ect.validate import DEFAULT_MAX_PRED_LENGTH DEFAULT_CLOCK_SKEW_TOLERANCE = 30 DEFAULT_MAX_ANCESTOR_LIMIT = 10000 class ECTStore(ABC): """Lookup of ECTs by task ID for DAG validation.""" @abstractmethod def get_by_tid(self, tid: str) -> "Payload | None": pass @abstractmethod def contains(self, tid: str, wid: str) -> bool: pass class DAGConfig: def __init__( self, clock_skew_tolerance: int = DEFAULT_CLOCK_SKEW_TOLERANCE, max_ancestor_limit: int = DEFAULT_MAX_ANCESTOR_LIMIT, max_pred_length: int = 0, ): self.clock_skew_tolerance = clock_skew_tolerance or DEFAULT_CLOCK_SKEW_TOLERANCE self.max_ancestor_limit = max_ancestor_limit or DEFAULT_MAX_ANCESTOR_LIMIT self.max_pred_length = max_pred_length or 0 def default_dag_config() -> DAGConfig: return DAGConfig() def _has_cycle( target_tid: str, pred_ids: list[str], store: ECTStore, visited: set[str], max_depth: int, ) -> bool: if len(visited) >= max_depth: return True for pred_id in pred_ids: if pred_id == target_tid: return True if pred_id in visited: continue visited.add(pred_id) pred = store.get_by_tid(pred_id) if pred is not None: if _has_cycle(target_tid, pred.pred, store, visited, max_depth): return True return False def validate_dag( payload: "Payload", store: ECTStore, cfg: DAGConfig, ) -> None: """Section 6.2: uniqueness (by jti), predecessor existence, temporal ordering, acyclicity, predecessor policy.""" if cfg.max_pred_length > 0 and len(payload.pred) > cfg.max_pred_length: raise ValueError("ect: pred exceeds max length") if store.contains(payload.jti, payload.wid or ""): raise ValueError(f"ect: task ID (jti) already exists: {payload.jti}") for pred_id in payload.pred: pred = store.get_by_tid(pred_id) if pred is None: raise ValueError(f"ect: predecessor task not found: {pred_id}") if pred.iat >= payload.iat + cfg.clock_skew_tolerance: raise ValueError(f"ect: predecessor task not earlier than current: {pred_id}") visited: set[str] = set() if _has_cycle(payload.jti, payload.pred, store, visited, cfg.max_ancestor_limit): raise ValueError("ect: circular dependency or depth limit exceeded") # Predecessor policy decision: only when predecessor has policy claims in ext per -01 for pred_id in payload.pred: pred = store.get_by_tid(pred_id) if pred and pred.has_policy_claims() and pred.pol_decision() in ("rejected", "pending_human_review"): if not payload.compensation_required(): raise ValueError( "ect: predecessor has non-approved pol_decision; current ECT must be compensation/remediation or have ext.compensation_required true" )