package ect

import (
	"errors"
	"fmt"
)

// DefaultClockSkewTolerance is the recommended clock skew between agents, in
// seconds (Section 6.2).
const DefaultClockSkewTolerance = 30

// DefaultMaxAncestorLimit is the recommended max ancestor traversal for cycle
// detection (Section 6.3).
const DefaultMaxAncestorLimit = 10000

// ECTStore provides lookup of ECTs by task ID (and optionally workflow ID) for
// DAG validation. It is implemented by a ledger or an in-memory cache of
// verified parent ECTs.
type ECTStore interface {
	// GetByTid returns the payload for the given task ID, or nil if not found.
	GetByTid(tid string) *Payload
	// Contains returns true if (tid, wid) already exists. wid may be empty for
	// global scope.
	Contains(tid, wid string) bool
}

// DAGConfig holds parameters for DAG validation.
type DAGConfig struct {
	ClockSkewTolerance int // seconds; recommended 30
	MaxAncestorLimit   int // recommended 10000
}

// DefaultDAGConfig returns a DAGConfig populated with the recommended
// defaults DefaultClockSkewTolerance and DefaultMaxAncestorLimit.
func DefaultDAGConfig() DAGConfig {
	return DAGConfig{
		ClockSkewTolerance: DefaultClockSkewTolerance,
		MaxAncestorLimit:   DefaultMaxAncestorLimit,
	}
}

// ValidateDAG runs the Section 6.2 validation rules against ect, in order:
// task ID uniqueness, parent existence, temporal ordering, acyclicity (with an
// ancestor limit), and parent policy decision.
//
// store is required and is used to resolve parents and detect duplicate task
// IDs. Non-positive values in cfg are replaced by the package defaults, so a
// zero tolerance or zero ancestor limit cannot be configured through cfg.
//
// It returns nil when every rule passes, and otherwise an error describing the
// first rule that failed. A nil store or a nil ect is reported as an error
// rather than causing a panic.
func ValidateDAG(ect *Payload, store ECTStore, cfg DAGConfig) error {
	if store == nil {
		return errors.New("ect: ECTStore required for DAG validation")
	}
	if ect == nil {
		return errors.New("ect: payload required for DAG validation")
	}
	if cfg.ClockSkewTolerance <= 0 {
		cfg.ClockSkewTolerance = DefaultClockSkewTolerance
	}
	if cfg.MaxAncestorLimit <= 0 {
		cfg.MaxAncestorLimit = DefaultMaxAncestorLimit
	}

	// 1. Task ID Uniqueness
	if store.Contains(ect.Tid, ect.Wid) {
		return fmt.Errorf("ect: task ID already exists: %s", ect.Tid)
	}

	// 2. Parent Existence and 3. Temporal Ordering
	//
	// The resolved parents are kept so the policy check below works on the
	// same payloads that were validated here, instead of looking them up again
	// and dereferencing a result that might by then be nil.
	parents := make([]*Payload, 0, len(ect.Par))
	for _, parentID := range ect.Par {
		parent := store.GetByTid(parentID)
		if parent == nil {
			return fmt.Errorf("ect: parent task not found: %s", parentID)
		}
		// Require parent.iat < child.iat + clock_skew_tolerance, i.e. reject
		// when parent.iat - child.iat >= clock_skew_tolerance.
		if parent.Iat >= ect.Iat+int64(cfg.ClockSkewTolerance) {
			return fmt.Errorf("ect: parent task not earlier than current: %s", parentID)
		}
		parents = append(parents, parent)
	}

	// 4. Acyclicity (and depth limit)
	visited := make(map[string]struct{})
	if hasCycle(ect.Tid, ect.Par, store, visited, cfg.MaxAncestorLimit) {
		return errors.New("ect: circular dependency or depth limit exceeded")
	}

	// 5. Parent Policy Decision
	for _, parent := range parents {
		nonApproved := parent.PolDecision == PolDecisionRejected ||
			parent.PolDecision == PolDecisionPendingHumanReview
		if nonApproved && !ect.CompensationRequired {
			return errors.New("ect: parent has non-approved pol_decision; current ECT must be compensation/remediation or have compensation_required true")
		}
	}
	return nil
}

// hasCycle reports whether following Par links from parentIDs leads back to
// targetTid, or whether the traversal limit has been hit.
//
// visited is mutated: every parent ID examined is recorded in it, and IDs
// already present are skipped. The limit is checked once on entry to each
// call, and the function returns true as soon as len(visited) has reached
// maxDepth. Parent IDs that the store cannot resolve are recorded as visited
// but not expanded further.
func hasCycle(targetTid string, parentIDs []string, store ECTStore, visited map[string]struct{}, maxDepth int) bool {
	if len(visited) >= maxDepth {
		return true
	}
	for _, parentID := range parentIDs {
		if parentID == targetTid {
			return true
		}
		if _, seen := visited[parentID]; seen {
			continue
		}
		visited[parentID] = struct{}{}

		parent := store.GetByTid(parentID)
		if parent == nil {
			continue
		}
		if hasCycle(targetTid, parent.Par, store, visited, maxDepth) {
			return true
		}
	}
	return false
}