Move Go reference implementation to refimpl/go-lang/ and add new Python reference implementation in refimpl/python/. Update build.sh with renamed draft and simplified tool paths. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
98 lines
3.2 KiB
Python
98 lines
3.2 KiB
Python
"""DAG validation per Section 6."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from abc import ABC, abstractmethod
|
|
from typing import TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
from ect.types import Payload
|
|
|
|
from ect.validate import DEFAULT_MAX_PAR_LENGTH
|
|
|
|
DEFAULT_CLOCK_SKEW_TOLERANCE = 30
|
|
DEFAULT_MAX_ANCESTOR_LIMIT = 10000
|
|
|
|
|
|
class ECTStore(ABC):
|
|
"""Lookup of ECTs by task ID for DAG validation."""
|
|
|
|
@abstractmethod
|
|
def get_by_tid(self, tid: str) -> "Payload | None":
|
|
pass
|
|
|
|
@abstractmethod
|
|
def contains(self, tid: str, wid: str) -> bool:
|
|
pass
|
|
|
|
|
|
class DAGConfig:
|
|
def __init__(
|
|
self,
|
|
clock_skew_tolerance: int = DEFAULT_CLOCK_SKEW_TOLERANCE,
|
|
max_ancestor_limit: int = DEFAULT_MAX_ANCESTOR_LIMIT,
|
|
max_par_length: int = 0,
|
|
):
|
|
self.clock_skew_tolerance = clock_skew_tolerance or DEFAULT_CLOCK_SKEW_TOLERANCE
|
|
self.max_ancestor_limit = max_ancestor_limit or DEFAULT_MAX_ANCESTOR_LIMIT
|
|
self.max_par_length = max_par_length or 0
|
|
|
|
|
|
def default_dag_config() -> DAGConfig:
|
|
return DAGConfig()
|
|
|
|
|
|
def _has_cycle(
|
|
target_tid: str,
|
|
parent_ids: list[str],
|
|
store: ECTStore,
|
|
visited: set[str],
|
|
max_depth: int,
|
|
) -> bool:
|
|
if len(visited) >= max_depth:
|
|
return True
|
|
for parent_id in parent_ids:
|
|
if parent_id == target_tid:
|
|
return True
|
|
if parent_id in visited:
|
|
continue
|
|
visited.add(parent_id)
|
|
parent = store.get_by_tid(parent_id)
|
|
if parent is not None:
|
|
if _has_cycle(target_tid, parent.par, store, visited, max_depth):
|
|
return True
|
|
return False
|
|
|
|
|
|
def validate_dag(
|
|
payload: "Payload",
|
|
store: ECTStore,
|
|
cfg: DAGConfig,
|
|
) -> None:
|
|
"""Section 6.2: uniqueness (by jti), parent existence, temporal ordering, acyclicity, parent policy."""
|
|
if cfg.max_par_length > 0 and len(payload.par) > cfg.max_par_length:
|
|
raise ValueError("ect: par exceeds max length")
|
|
if store.contains(payload.jti, payload.wid or ""):
|
|
raise ValueError(f"ect: task ID (jti) already exists: {payload.jti}")
|
|
from ect.types import POL_DECISION_REJECTED, POL_DECISION_PENDING_HUMAN_REVIEW
|
|
|
|
for parent_id in payload.par:
|
|
parent = store.get_by_tid(parent_id)
|
|
if parent is None:
|
|
raise ValueError(f"ect: parent task not found: {parent_id}")
|
|
if parent.iat >= payload.iat + cfg.clock_skew_tolerance:
|
|
raise ValueError(f"ect: parent task not earlier than current: {parent_id}")
|
|
|
|
visited: set[str] = set()
|
|
if _has_cycle(payload.jti, payload.par, store, visited, cfg.max_ancestor_limit):
|
|
raise ValueError("ect: circular dependency or depth limit exceeded")
|
|
|
|
# Parent policy decision: only when parent has policy claims per spec
|
|
for parent_id in payload.par:
|
|
parent = store.get_by_tid(parent_id)
|
|
if parent and parent.has_policy_claims() and parent.pol_decision in (POL_DECISION_REJECTED, POL_DECISION_PENDING_HUMAN_REVIEW):
|
|
if not payload.compensation_required():
|
|
raise ValueError(
|
|
"ect: parent has non-approved pol_decision; current ECT must be compensation/remediation or have ext.compensation_required true"
|
|
)
|