"""Audit ledger per Section 9.""" from __future__ import annotations import time from abc import ABC, abstractmethod from dataclasses import dataclass from typing import TYPE_CHECKING from ect.types import Payload if TYPE_CHECKING: pass class ErrTaskIDExists(Exception): """Raised when appending an ECT whose tid already exists.""" @dataclass class LedgerEntry: ledger_sequence: int task_id: str agent_id: str action: str predecessors: list[str] ect_jws: str signature_verified: bool verification_timestamp: float stored_timestamp: float class Ledger(ABC): """Append-only audit ledger; lookup by task id (jti).""" @abstractmethod def append(self, ect_jws: str, payload: Payload) -> int: """Returns new ledger sequence number.""" pass @abstractmethod def get_by_tid(self, tid: str) -> Payload | None: pass @abstractmethod def contains(self, tid: str, wid: str) -> bool: pass class MemoryLedger(Ledger): """In-memory append-only ECT store implementing Ledger and ECTStore.""" def __init__(self) -> None: self._seq = 0 self._by_tid: dict[str, "Payload"] = {} self._entries: list[LedgerEntry] = [] self._lock = __import__("threading").Lock() def append(self, ect_jws: str, payload: Payload) -> int: if payload is None: return 0 with self._lock: wid = payload.wid or "" if self._contains_locked(payload.jti, wid): raise ErrTaskIDExists("ect: task ID (jti) already exists in ledger") self._seq += 1 now = time.time() entry = LedgerEntry( ledger_sequence=self._seq, task_id=payload.jti, agent_id=payload.iss, action=payload.exec_act, predecessors=list(payload.pred) if payload.pred else [], ect_jws=ect_jws, signature_verified=True, verification_timestamp=now, stored_timestamp=now, ) self._by_tid[payload.jti] = payload self._entries.append(entry) return self._seq def get_by_tid(self, tid: str) -> Payload | None: with self._lock: return self._by_tid.get(tid) def contains(self, tid: str, wid: str) -> bool: with self._lock: return self._contains_locked(tid, wid) def _contains_locked(self, tid: str, wid: str) -> bool: p = self._by_tid.get(tid) if p is None: return False if not wid: return True return (p.wid or "") == wid