package ect

import (
	"errors"
	"fmt"
)

const (
	// DefaultClockSkewTolerance is the recommended clock skew between agents,
	// in seconds (Section 6.2).
	DefaultClockSkewTolerance = 30

	// DefaultMaxAncestorLimit is the recommended max ancestor traversal for
	// cycle detection (Section 6.3).
	DefaultMaxAncestorLimit = 10000
)

// ECTStore provides lookup of ECTs by task ID (and optionally workflow ID) for
// DAG validation. Implemented by ledger or in-memory cache of verified parent
// ECTs.
type ECTStore interface {
	// GetByTid returns the payload for the given task ID, or nil if not found.
	GetByTid(tid string) *Payload
	// Contains returns true if (tid, wid) already exists. wid may be empty for
	// global scope.
	Contains(tid, wid string) bool
}

// DAGConfig holds parameters for DAG validation.
type DAGConfig struct {
	ClockSkewTolerance int // seconds; recommended 30; values <= 0 fall back to the default
	MaxAncestorLimit   int // recommended 10000; values <= 0 fall back to the default
	MaxPredLength      int // max pred length (0 = no limit; recommended 100)
}

// DefaultDAGConfig returns recommended defaults.
func DefaultDAGConfig() DAGConfig {
	return DAGConfig{
		ClockSkewTolerance: DefaultClockSkewTolerance,
		MaxAncestorLimit:   DefaultMaxAncestorLimit,
		MaxPredLength:      DefaultMaxPredLength,
	}
}

// ValidateDAG runs Section 6.2 validation rules against ect, in this order:
// predecessor-list length, task ID uniqueness, predecessor existence and
// temporal ordering, acyclicity, and predecessor policy decision.
//
// store resolves predecessor task IDs and must not be nil; ect must not be
// nil. Non-positive ClockSkewTolerance and MaxAncestorLimit values in cfg are
// replaced by the package defaults. The first failing rule is returned as an
// error; nil means every rule passed.
func ValidateDAG(ect *Payload, store ECTStore, cfg DAGConfig) error {
	if store == nil {
		return errors.New("ect: ECTStore required for DAG validation")
	}
	if ect == nil {
		// Guard before the first field access instead of panicking.
		return errors.New("ect: payload required for DAG validation")
	}
	if cfg.ClockSkewTolerance <= 0 {
		cfg.ClockSkewTolerance = DefaultClockSkewTolerance
	}
	if cfg.MaxAncestorLimit <= 0 {
		cfg.MaxAncestorLimit = DefaultMaxAncestorLimit
	}
	if cfg.MaxPredLength > 0 && len(ect.Pred) > cfg.MaxPredLength {
		return ErrPredLength
	}

	// 1. Task ID Uniqueness (task id = jti per spec)
	if store.Contains(ect.Jti, ect.Wid) {
		return fmt.Errorf("ect: task ID (jti) already exists: %s", ect.Jti)
	}

	// 2. Predecessor Existence and 3. Temporal Ordering
	//
	// Resolved predecessors are kept so that step 5 inspects exactly the
	// payloads validated here rather than querying the store a second time.
	preds := make([]*Payload, 0, len(ect.Pred))
	for _, predID := range ect.Pred {
		pred := store.GetByTid(predID)
		if pred == nil {
			return fmt.Errorf("ect: predecessor task not found: %s", predID)
		}
		// Valid only when pred.iat < child.iat + clock_skew_tolerance.
		if pred.Iat >= ect.Iat+int64(cfg.ClockSkewTolerance) {
			return fmt.Errorf("ect: predecessor task not earlier than current: %s", predID)
		}
		preds = append(preds, pred)
	}

	// 4. Acyclicity (and ancestor limit)
	visited := make(map[string]struct{})
	if hasCycle(ect.Jti, ect.Pred, store, visited, cfg.MaxAncestorLimit) {
		return errors.New("ect: circular dependency or depth limit exceeded")
	}

	// 5. Predecessor Policy Decision (only when predecessor has policy claims in ext per -01)
	for _, pred := range preds {
		if !pred.HasPolicyClaims() {
			continue
		}
		switch pred.PolDecision() {
		case "rejected", "pending_human_review":
			if !ect.CompensationRequired() {
				return errors.New("ect: predecessor has non-approved pol_decision; current ECT must be compensation/remediation or have ext.compensation_required true")
			}
		}
	}
	return nil
}

// hasCycle reports whether following pred links from predIDs leads back to
// targetTid, or whether the traversal budget is exhausted.
//
// visited records every predecessor ID already expanded and is mutated; it is
// shared across the whole recursion so each ID is looked up at most once.
// maxDepth is compared against len(visited), so despite its name it bounds the
// number of distinct ancestors visited rather than the recursion depth; the
// check runs on entry to each call, including calls for predecessors that have
// no further parents. IDs that the store cannot resolve are recorded as
// visited but not expanded.
func hasCycle(targetTid string, predIDs []string, store ECTStore, visited map[string]struct{}, maxDepth int) bool {
	if len(visited) >= maxDepth {
		return true
	}
	for _, predID := range predIDs {
		if predID == targetTid {
			return true
		}
		if _, seen := visited[predID]; seen {
			continue
		}
		visited[predID] = struct{}{}
		pred := store.GetByTid(predID)
		if pred == nil {
			continue
		}
		if hasCycle(targetTid, pred.Pred, store, visited, maxDepth) {
			return true
		}
	}
	return false
}