"""SQLite database layer with FTS5 full-text search.""" from __future__ import annotations import json import sqlite3 from datetime import datetime, timezone from pathlib import Path import numpy as np from .config import Config from .models import Author, Draft, Rating, normalize_category SCHEMA = """ CREATE TABLE IF NOT EXISTS drafts ( name TEXT PRIMARY KEY, rev TEXT NOT NULL, title TEXT NOT NULL, abstract TEXT NOT NULL DEFAULT '', time TEXT, dt_id INTEGER, pages INTEGER, words INTEGER, "group" TEXT, group_uri TEXT, expires TEXT, ad TEXT, shepherd TEXT, states TEXT DEFAULT '[]', -- JSON array full_text TEXT, categories TEXT DEFAULT '[]', -- JSON array tags TEXT DEFAULT '[]', -- JSON array fetched_at TEXT ); CREATE TABLE IF NOT EXISTS ratings ( draft_name TEXT PRIMARY KEY REFERENCES drafts(name), novelty INTEGER NOT NULL, maturity INTEGER NOT NULL, overlap INTEGER NOT NULL, momentum INTEGER NOT NULL, relevance INTEGER NOT NULL, summary TEXT NOT NULL DEFAULT '', novelty_note TEXT DEFAULT '', maturity_note TEXT DEFAULT '', overlap_note TEXT DEFAULT '', momentum_note TEXT DEFAULT '', relevance_note TEXT DEFAULT '', categories TEXT DEFAULT '[]', -- JSON array rated_at TEXT, false_positive INTEGER DEFAULT 0 -- 1 = flagged as not AI-agent related ); CREATE TABLE IF NOT EXISTS embeddings ( draft_name TEXT PRIMARY KEY REFERENCES drafts(name), model TEXT NOT NULL, vector BLOB NOT NULL, -- numpy float32 array as bytes created_at TEXT ); CREATE TABLE IF NOT EXISTS llm_cache ( draft_name TEXT NOT NULL, prompt_hash TEXT NOT NULL, model TEXT NOT NULL, request_json TEXT NOT NULL, -- full prompt sent response_json TEXT NOT NULL, -- raw Claude response input_tokens INTEGER, output_tokens INTEGER, created_at TEXT, PRIMARY KEY (draft_name, prompt_hash) ); CREATE VIRTUAL TABLE IF NOT EXISTS drafts_fts USING fts5( name, title, abstract, full_text, content='drafts', content_rowid='rowid' ); -- Authors (fetched from Datatracker) CREATE TABLE IF NOT EXISTS authors ( person_id INTEGER PRIMARY KEY, name TEXT NOT NULL, ascii_name TEXT, affiliation TEXT DEFAULT '', resource_uri TEXT, fetched_at TEXT ); CREATE TABLE IF NOT EXISTS draft_authors ( draft_name TEXT NOT NULL REFERENCES drafts(name), person_id INTEGER NOT NULL REFERENCES authors(person_id), author_order INTEGER DEFAULT 1, affiliation TEXT DEFAULT '', PRIMARY KEY (draft_name, person_id) ); -- Extracted ideas CREATE TABLE IF NOT EXISTS ideas ( id INTEGER PRIMARY KEY AUTOINCREMENT, draft_name TEXT NOT NULL REFERENCES drafts(name), title TEXT NOT NULL, description TEXT NOT NULL, idea_type TEXT DEFAULT '', extracted_at TEXT ); CREATE INDEX IF NOT EXISTS idx_ideas_draft ON ideas(draft_name); -- Idea embeddings (for clustering) CREATE TABLE IF NOT EXISTS idea_embeddings ( idea_id INTEGER PRIMARY KEY REFERENCES ideas(id), model TEXT NOT NULL, vector BLOB NOT NULL, created_at TEXT ); -- Gap analysis results CREATE TABLE IF NOT EXISTS gaps ( id INTEGER PRIMARY KEY AUTOINCREMENT, topic TEXT NOT NULL, description TEXT NOT NULL, category TEXT DEFAULT '', evidence TEXT DEFAULT '', severity TEXT DEFAULT 'medium', analyzed_at TEXT ); -- Cross-references (RFC, draft, BCP references found in draft text) CREATE TABLE IF NOT EXISTS draft_refs ( draft_name TEXT NOT NULL REFERENCES drafts(name), ref_type TEXT NOT NULL, -- 'rfc', 'draft', 'bcp' ref_id TEXT NOT NULL, -- e.g. 
'8259', 'draft-ietf-httpbis-semantics', 'BCP14' UNIQUE(draft_name, ref_type, ref_id) ); CREATE INDEX IF NOT EXISTS idx_draft_refs_ref ON draft_refs(ref_type, ref_id); -- Generated drafts from gap-to-draft pipeline CREATE TABLE IF NOT EXISTS generated_drafts ( id INTEGER PRIMARY KEY AUTOINCREMENT, gap_topic TEXT NOT NULL, draft_name TEXT NOT NULL, title TEXT NOT NULL, abstract TEXT NOT NULL DEFAULT '', outline_json TEXT DEFAULT '{}', sections_json TEXT DEFAULT '[]', full_text TEXT, family_name TEXT DEFAULT '', family_role TEXT DEFAULT '', version INTEGER DEFAULT 0, rating_json TEXT DEFAULT '{}', novelty_score REAL DEFAULT 0.0, quality_score REAL DEFAULT 0.0, status TEXT DEFAULT 'draft', created_at TEXT ); CREATE TABLE IF NOT EXISTS generation_runs ( id INTEGER PRIMARY KEY AUTOINCREMENT, family_name TEXT DEFAULT '', gap_ids TEXT DEFAULT '[]', total_input_tokens INTEGER DEFAULT 0, total_output_tokens INTEGER DEFAULT 0, model_used TEXT DEFAULT '', status TEXT DEFAULT 'running', started_at TEXT, completed_at TEXT ); -- Observatory tables CREATE TABLE IF NOT EXISTS sources ( name TEXT PRIMARY KEY, last_fetch TEXT, doc_count INTEGER DEFAULT 0 ); CREATE TABLE IF NOT EXISTS observatory_snapshots ( id INTEGER PRIMARY KEY AUTOINCREMENT, snapshot_at TEXT NOT NULL, total_docs INTEGER DEFAULT 0, new_since_last INTEGER DEFAULT 0, changed_gaps INTEGER DEFAULT 0 ); CREATE TABLE IF NOT EXISTS gap_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, snapshot_id INTEGER REFERENCES observatory_snapshots(id), gap_topic TEXT NOT NULL, gap_description TEXT NOT NULL, severity TEXT DEFAULT 'medium', status TEXT DEFAULT 'open', recorded_at TEXT ); -- Annotations (user notes + tags per draft) CREATE TABLE IF NOT EXISTS annotations ( id INTEGER PRIMARY KEY AUTOINCREMENT, draft_name TEXT NOT NULL REFERENCES drafts(name), note TEXT DEFAULT '', tags TEXT DEFAULT '[]', created_at TEXT, updated_at TEXT, UNIQUE(draft_name) ); -- Monitor runs CREATE TABLE IF NOT EXISTS monitor_runs ( id INTEGER PRIMARY KEY AUTOINCREMENT, started_at TEXT NOT NULL, completed_at TEXT, status TEXT DEFAULT 'running', new_drafts_found INTEGER DEFAULT 0, drafts_analyzed INTEGER DEFAULT 0, drafts_embedded INTEGER DEFAULT 0, ideas_extracted INTEGER DEFAULT 0, error_message TEXT DEFAULT '', duration_seconds REAL DEFAULT 0 ); -- Triggers to keep FTS index in sync CREATE TRIGGER IF NOT EXISTS drafts_ai AFTER INSERT ON drafts BEGIN INSERT INTO drafts_fts(rowid, name, title, abstract, full_text) VALUES (new.rowid, new.name, new.title, new.abstract, new.full_text); END; CREATE TRIGGER IF NOT EXISTS drafts_ad AFTER DELETE ON drafts BEGIN INSERT INTO drafts_fts(drafts_fts, rowid, name, title, abstract, full_text) VALUES ('delete', old.rowid, old.name, old.title, old.abstract, old.full_text); END; CREATE TRIGGER IF NOT EXISTS drafts_au AFTER UPDATE ON drafts BEGIN INSERT INTO drafts_fts(drafts_fts, rowid, name, title, abstract, full_text) VALUES ('delete', old.rowid, old.name, old.title, old.abstract, old.full_text); INSERT INTO drafts_fts(rowid, name, title, abstract, full_text) VALUES (new.rowid, new.name, new.title, new.abstract, new.full_text); END; """ class Database: def __init__(self, config: Config | None = None): self.config = config or Config.load() self.db_path = self.config.db_path Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) self._conn: sqlite3.Connection | None = None @property def conn(self) -> sqlite3.Connection: if self._conn is None: self._conn = sqlite3.connect(self.db_path) self._conn.row_factory = sqlite3.Row 
self._conn.execute("PRAGMA journal_mode=WAL") self._conn.execute("PRAGMA foreign_keys=ON") self._conn.executescript(SCHEMA) self._migrate_schema() return self._conn def _migrate_schema(self) -> None: """Additive migration — add columns if missing.""" cols = {r[1] for r in self._conn.execute("PRAGMA table_info(drafts)").fetchall()} migrations = [ ("source", "TEXT DEFAULT 'ietf'"), ("source_id", "TEXT DEFAULT ''"), ("source_url", "TEXT DEFAULT ''"), ("doc_status", "TEXT DEFAULT ''"), ] for col, typedef in migrations: if col not in cols: self._conn.execute(f"ALTER TABLE drafts ADD COLUMN {col} {typedef}") # ratings table migrations rating_cols = {r[1] for r in self._conn.execute("PRAGMA table_info(ratings)").fetchall()} if "false_positive" not in rating_cols: self._conn.execute("ALTER TABLE ratings ADD COLUMN false_positive INTEGER DEFAULT 0") # ideas table migrations idea_cols = {r[1] for r in self._conn.execute("PRAGMA table_info(ideas)").fetchall()} if "novelty_score" not in idea_cols: self._conn.execute("ALTER TABLE ideas ADD COLUMN novelty_score INTEGER") self._conn.commit() def close(self) -> None: if self._conn: self._conn.close() self._conn = None # --- Drafts --- def upsert_draft(self, draft: Draft) -> None: self.conn.execute( """INSERT INTO drafts (name, rev, title, abstract, time, dt_id, pages, words, "group", group_uri, expires, ad, shepherd, states, full_text, categories, tags, fetched_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(name) DO UPDATE SET rev=excluded.rev, title=excluded.title, abstract=excluded.abstract, time=excluded.time, dt_id=excluded.dt_id, pages=excluded.pages, words=excluded.words, "group"=excluded."group", group_uri=excluded.group_uri, expires=excluded.expires, ad=excluded.ad, shepherd=excluded.shepherd, states=excluded.states, full_text=COALESCE(excluded.full_text, full_text), categories=excluded.categories, tags=excluded.tags, fetched_at=excluded.fetched_at """, ( draft.name, draft.rev, draft.title, draft.abstract, draft.time, draft.dt_id, draft.pages, draft.words, draft.group, draft.group_uri, draft.expires, draft.ad, draft.shepherd, json.dumps(draft.states), draft.full_text, json.dumps(draft.categories), json.dumps(draft.tags), draft.fetched_at or datetime.now(timezone.utc).isoformat(), ), ) self.conn.commit() def get_draft(self, name: str) -> Draft | None: row = self.conn.execute("SELECT * FROM drafts WHERE name = ?", (name,)).fetchone() if row is None: return None return self._row_to_draft(row) def list_drafts( self, limit: int = 100, offset: int = 0, order_by: str = "time DESC", ) -> list[Draft]: # Sanitize order_by to prevent injection allowed = {"time", "name", "title", "pages", "words", "fetched_at"} parts = order_by.split() col = parts[0] if parts else "time" direction = parts[1].upper() if len(parts) > 1 else "DESC" if col not in allowed: col = "time" if direction not in ("ASC", "DESC"): direction = "DESC" safe_order = f'"{col}" {direction}' if col == "group" else f"{col} {direction}" rows = self.conn.execute( f"SELECT * FROM drafts ORDER BY {safe_order} LIMIT ? OFFSET ?", (limit, offset), ).fetchall() return [self._row_to_draft(r) for r in rows] def count_drafts(self) -> int: return self.conn.execute("SELECT COUNT(*) FROM drafts").fetchone()[0] def search_drafts(self, query: str, limit: int = 50) -> list[Draft]: rows = self.conn.execute( """SELECT d.* FROM drafts d JOIN drafts_fts f ON d.rowid = f.rowid WHERE drafts_fts MATCH ? 
ORDER BY rank LIMIT ?""", (query, limit), ).fetchall() return [self._row_to_draft(r) for r in rows] def drafts_without_text(self, limit: int = 100) -> list[Draft]: rows = self.conn.execute( "SELECT * FROM drafts WHERE full_text IS NULL LIMIT ?", (limit,) ).fetchall() return [self._row_to_draft(r) for r in rows] # --- Ratings --- def upsert_rating(self, rating: Rating) -> None: self.conn.execute( """INSERT INTO ratings (draft_name, novelty, maturity, overlap, momentum, relevance, summary, novelty_note, maturity_note, overlap_note, momentum_note, relevance_note, categories, rated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(draft_name) DO UPDATE SET novelty=excluded.novelty, maturity=excluded.maturity, overlap=excluded.overlap, momentum=excluded.momentum, relevance=excluded.relevance, summary=excluded.summary, novelty_note=excluded.novelty_note, maturity_note=excluded.maturity_note, overlap_note=excluded.overlap_note, momentum_note=excluded.momentum_note, relevance_note=excluded.relevance_note, categories=excluded.categories, rated_at=excluded.rated_at """, ( rating.draft_name, rating.novelty, rating.maturity, rating.overlap, rating.momentum, rating.relevance, rating.summary, rating.novelty_note, rating.maturity_note, rating.overlap_note, rating.momentum_note, rating.relevance_note, json.dumps(rating.categories), rating.rated_at or datetime.now(timezone.utc).isoformat(), ), ) self.conn.commit() def get_rating(self, draft_name: str) -> Rating | None: row = self.conn.execute( "SELECT * FROM ratings WHERE draft_name = ?", (draft_name,) ).fetchone() if row is None: return None return self._row_to_rating(row) def unrated_drafts(self, limit: int = 100) -> list[Draft]: rows = self.conn.execute( """SELECT d.* FROM drafts d LEFT JOIN ratings r ON d.name = r.draft_name WHERE r.draft_name IS NULL LIMIT ?""", (limit,), ).fetchall() return [self._row_to_draft(r) for r in rows] def drafts_with_ratings(self, limit: int = 200) -> list[tuple[Draft, Rating]]: rows = self.conn.execute( """SELECT d.*, r.novelty, r.maturity, r.overlap, r.momentum, r.relevance, r.summary, r.novelty_note, r.maturity_note, r.overlap_note, r.momentum_note, r.relevance_note, r.categories as r_categories, r.rated_at FROM drafts d JOIN ratings r ON d.name = r.draft_name ORDER BY (r.novelty * 0.30 + r.relevance * 0.25 + r.maturity * 0.20 + r.momentum * 0.15 + (6 - r.overlap) * 0.10) DESC LIMIT ?""", (limit,), ).fetchall() results = [] for r in rows: draft = self._row_to_draft(r) rating = Rating( draft_name=r["draft_name"] if "draft_name" in r.keys() else draft.name, novelty=r["novelty"], maturity=r["maturity"], overlap=r["overlap"], momentum=r["momentum"], relevance=r["relevance"], summary=r["summary"], novelty_note=r["novelty_note"], maturity_note=r["maturity_note"], overlap_note=r["overlap_note"], momentum_note=r["momentum_note"], relevance_note=r["relevance_note"], categories=[normalize_category(c) for c in json.loads(r["r_categories"])] if r["r_categories"] else [], rated_at=r["rated_at"], ) results.append((draft, rating)) return results # --- Embeddings --- def store_embedding(self, draft_name: str, model: str, vector: np.ndarray) -> None: self.conn.execute( """INSERT INTO embeddings (draft_name, model, vector, created_at) VALUES (?, ?, ?, ?) 
ON CONFLICT(draft_name) DO UPDATE SET model=excluded.model, vector=excluded.vector, created_at=excluded.created_at """, (draft_name, model, vector.astype(np.float32).tobytes(), datetime.now(timezone.utc).isoformat()), ) self.conn.commit() def get_embedding(self, draft_name: str) -> np.ndarray | None: row = self.conn.execute( "SELECT vector FROM embeddings WHERE draft_name = ?", (draft_name,) ).fetchone() if row is None: return None return np.frombuffer(row["vector"], dtype=np.float32) def all_embeddings(self) -> dict[str, np.ndarray]: rows = self.conn.execute("SELECT draft_name, vector FROM embeddings").fetchall() return { r["draft_name"]: np.frombuffer(r["vector"], dtype=np.float32) for r in rows } def drafts_without_embeddings(self, limit: int = 500) -> list[str]: rows = self.conn.execute( """SELECT d.name FROM drafts d LEFT JOIN embeddings e ON d.name = e.draft_name WHERE e.draft_name IS NULL LIMIT ?""", (limit,), ).fetchall() return [r["name"] for r in rows] # --- LLM Cache --- def cache_response( self, draft_name: str, prompt_hash: str, model: str, request_json: str, response_json: str, input_tokens: int = 0, output_tokens: int = 0, ) -> None: self.conn.execute( """INSERT INTO llm_cache (draft_name, prompt_hash, model, request_json, response_json, input_tokens, output_tokens, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(draft_name, prompt_hash) DO UPDATE SET model=excluded.model, response_json=excluded.response_json, input_tokens=excluded.input_tokens, output_tokens=excluded.output_tokens, created_at=excluded.created_at """, (draft_name, prompt_hash, model, request_json, response_json, input_tokens, output_tokens, datetime.now(timezone.utc).isoformat()), ) self.conn.commit() def get_cached_response(self, draft_name: str, prompt_hash: str) -> str | None: row = self.conn.execute( "SELECT response_json FROM llm_cache WHERE draft_name = ? AND prompt_hash = ?", (draft_name, prompt_hash), ).fetchone() return row["response_json"] if row else None def total_tokens_used(self) -> tuple[int, int]: row = self.conn.execute( "SELECT COALESCE(SUM(input_tokens),0), COALESCE(SUM(output_tokens),0) FROM llm_cache" ).fetchone() return (row[0], row[1]) # --- Authors --- def upsert_author(self, author: Author) -> None: self.conn.execute( """INSERT INTO authors (person_id, name, ascii_name, affiliation, resource_uri, fetched_at) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(person_id) DO UPDATE SET name=excluded.name, ascii_name=excluded.ascii_name, affiliation=excluded.affiliation, resource_uri=excluded.resource_uri, fetched_at=excluded.fetched_at """, (author.person_id, author.name, author.ascii_name, author.affiliation, author.resource_uri, author.fetched_at), ) self.conn.commit() def upsert_draft_author( self, draft_name: str, person_id: int, order: int = 1, affiliation: str = "" ) -> None: self.conn.execute( """INSERT INTO draft_authors (draft_name, person_id, author_order, affiliation) VALUES (?, ?, ?, ?) ON CONFLICT(draft_name, person_id) DO UPDATE SET author_order=excluded.author_order, affiliation=excluded.affiliation """, (draft_name, person_id, order, affiliation), ) self.conn.commit() def get_authors_for_draft(self, draft_name: str) -> list[Author]: rows = self.conn.execute( """SELECT a.* FROM authors a JOIN draft_authors da ON a.person_id = da.person_id WHERE da.draft_name = ? 
ORDER BY da.author_order""", (draft_name,), ).fetchall() results = [] for r in rows: d = dict(r) results.append(Author( person_id=d["person_id"], name=d["name"], ascii_name=d.get("ascii_name", ""), affiliation=d.get("affiliation", ""), resource_uri=d.get("resource_uri", ""), fetched_at=d.get("fetched_at"), )) return results def drafts_without_authors(self, limit: int = 500) -> list[str]: rows = self.conn.execute( """SELECT d.name FROM drafts d LEFT JOIN draft_authors da ON d.name = da.draft_name WHERE da.draft_name IS NULL LIMIT ?""", (limit,), ).fetchall() return [r["name"] for r in rows] def author_count(self) -> int: return self.conn.execute("SELECT COUNT(*) FROM authors").fetchone()[0] def top_authors(self, limit: int = 20) -> list[tuple[str, str, int, list[str]]]: """Return (name, affiliation, draft_count, [draft_names]).""" rows = self.conn.execute( """SELECT a.name, a.affiliation, COUNT(da.draft_name) as cnt, GROUP_CONCAT(da.draft_name, '||') as drafts FROM authors a JOIN draft_authors da ON a.person_id = da.person_id GROUP BY a.person_id ORDER BY cnt DESC LIMIT ?""", (limit,), ).fetchall() return [ (r["name"], r["affiliation"], r["cnt"], r["drafts"].split("||") if r["drafts"] else []) for r in rows ] def top_orgs(self, limit: int = 20) -> list[tuple[str, int, int]]: """Return (org, author_count, draft_count).""" rows = self.conn.execute( """SELECT da.affiliation as org, COUNT(DISTINCT da.person_id) as authors, COUNT(DISTINCT da.draft_name) as drafts FROM draft_authors da WHERE da.affiliation != '' GROUP BY da.affiliation ORDER BY drafts DESC LIMIT ?""", (limit,), ).fetchall() return [(r["org"], r["authors"], r["drafts"]) for r in rows] def coauthor_pairs(self) -> list[tuple[str, str, int]]: """Return (author_a, author_b, shared_drafts) for all co-author pairs.""" rows = self.conn.execute( """SELECT a1.name as a, a2.name as b, COUNT(*) as shared FROM draft_authors da1 JOIN draft_authors da2 ON da1.draft_name = da2.draft_name AND da1.person_id < da2.person_id JOIN authors a1 ON da1.person_id = a1.person_id JOIN authors a2 ON da2.person_id = a2.person_id GROUP BY da1.person_id, da2.person_id ORDER BY shared DESC""" ).fetchall() return [(r["a"], r["b"], r["shared"]) for r in rows] def cross_org_collaborations(self, limit: int = 20) -> list[tuple[str, str, int]]: """Return (org_a, org_b, shared_drafts) for cross-org collaboration.""" rows = self.conn.execute( """SELECT da1.affiliation as org_a, da2.affiliation as org_b, COUNT(DISTINCT da1.draft_name) as shared FROM draft_authors da1 JOIN draft_authors da2 ON da1.draft_name = da2.draft_name AND da1.person_id < da2.person_id WHERE da1.affiliation != '' AND da2.affiliation != '' AND da1.affiliation != da2.affiliation GROUP BY da1.affiliation, da2.affiliation ORDER BY shared DESC LIMIT ?""", (limit,), ).fetchall() return [(r["org_a"], r["org_b"], r["shared"]) for r in rows] def org_data_raw(self) -> list[tuple[str, int, str]]: """Return (affiliation, person_id, draft_name) for all draft_authors with affiliation.""" rows = self.conn.execute( "SELECT affiliation, person_id, draft_name FROM draft_authors WHERE affiliation != ''" ).fetchall() return [(r[0], r[1], r[2]) for r in rows] def author_draft_counts(self) -> dict[int, int]: """Return {person_id: draft_count} for all authors.""" rows = self.conn.execute( "SELECT person_id, COUNT(*) FROM draft_authors GROUP BY person_id" ).fetchall() return {r[0]: r[1] for r in rows} def author_draft_sets(self) -> dict[int, set[str]]: """Return {person_id: set(draft_names)} for all authors.""" rows = 
self.conn.execute(
            "SELECT person_id, draft_name FROM draft_authors"
        ).fetchall()
        result: dict[int, set[str]] = {}
        for r in rows:
            result.setdefault(r[0], set()).add(r[1])
        return result

    # --- Ideas ---

    def insert_ideas(self, draft_name: str, ideas: list[dict]) -> None:
        # Clear existing ideas for this draft first. Their embeddings must be
        # removed before the ideas themselves, otherwise the DELETE would
        # violate the idea_embeddings -> ideas foreign key (foreign_keys is ON).
        self.conn.execute(
            "DELETE FROM idea_embeddings WHERE idea_id IN (SELECT id FROM ideas WHERE draft_name = ?)",
            (draft_name,),
        )
        self.conn.execute("DELETE FROM ideas WHERE draft_name = ?", (draft_name,))
        now = datetime.now(timezone.utc).isoformat()
        for idea in ideas:
            self.conn.execute(
                """INSERT INTO ideas (draft_name, title, description, idea_type, extracted_at)
                   VALUES (?, ?, ?, ?, ?)""",
                (draft_name, idea["title"], idea["description"], idea.get("type", ""), now),
            )
        self.conn.commit()

    def delete_ideas(self, draft_name: str | None = None) -> int:
        """Delete ideas from the ideas table.

        Args:
            draft_name: If provided, delete only ideas for this draft.
                        If None, delete all ideas.

        Returns:
            Number of rows deleted.
        """
        if draft_name:
            self.conn.execute(
                "DELETE FROM idea_embeddings WHERE idea_id IN (SELECT id FROM ideas WHERE draft_name = ?)",
                (draft_name,),
            )
            cursor = self.conn.execute(
                "DELETE FROM ideas WHERE draft_name = ?", (draft_name,)
            )
        else:
            self.conn.execute("DELETE FROM idea_embeddings")
            cursor = self.conn.execute("DELETE FROM ideas")
        self.conn.commit()
        return cursor.rowcount

    def get_ideas_for_draft(self, draft_name: str) -> list[dict]:
        rows = self.conn.execute(
            "SELECT * FROM ideas WHERE draft_name = ?", (draft_name,)
        ).fetchall()
        return [{"id": r["id"], "title": r["title"], "description": r["description"],
                 "type": r["idea_type"], "draft_name": r["draft_name"],
                 "novelty_score": r["novelty_score"]} for r in rows]

    def delete_idea(self, idea_id: int) -> None:
        """Delete a single idea and its embedding by ID."""
        self.conn.execute("DELETE FROM idea_embeddings WHERE idea_id = ?", (idea_id,))
        self.conn.execute("DELETE FROM ideas WHERE id = ?", (idea_id,))
        self.conn.commit()

    def drafts_without_ideas(self, limit: int = 500) -> list[str]:
        rows = self.conn.execute(
            """SELECT d.name FROM drafts d
               LEFT JOIN ideas i ON d.name = i.draft_name
               LEFT JOIN llm_cache lc ON d.name = lc.draft_name
                    AND lc.request_json LIKE 'batch-ideas[%'
               WHERE i.draft_name IS NULL AND lc.draft_name IS NULL
               LIMIT ?""",
            (limit,),
        ).fetchall()
        return [r["name"] for r in rows]

    def all_ideas(self) -> list[dict]:
        rows = self.conn.execute(
            "SELECT * FROM ideas ORDER BY draft_name"
        ).fetchall()
        return [{"title": r["title"], "description": r["description"],
                 "type": r["idea_type"], "draft_name": r["draft_name"],
                 "novelty_score": r["novelty_score"]} for r in rows]

    def idea_count(self) -> int:
        return self.conn.execute("SELECT COUNT(*) FROM ideas").fetchone()[0]

    def ideas_with_drafts(self, unscored_only: bool = False, limit: int = 5000) -> list[dict]:
        """Return ideas joined with draft title, optionally only unscored ones."""
        where = "WHERE i.novelty_score IS NULL" if unscored_only else ""
        rows = self.conn.execute(
            f"""SELECT i.id, i.draft_name, i.title, i.description, i.idea_type,
                       i.novelty_score, d.title AS draft_title
                FROM ideas i JOIN drafts d ON i.draft_name = d.name
                {where}
                ORDER BY i.id LIMIT ?""",
            (limit,),
        ).fetchall()
        return [dict(r) for r in rows]

    def update_idea_score(self, idea_id: int, score: int) -> None:
        """Set the novelty_score for a single idea."""
        self.conn.execute(
            "UPDATE ideas SET novelty_score = ? WHERE id = ?",
            (score, idea_id),
        )
        self.conn.commit()

    def update_idea_scores_bulk(self, scores: dict[int, int]) -> None:
        """Bulk-update novelty scores. scores maps idea_id -> score."""
        self.conn.executemany(
            "UPDATE ideas SET novelty_score = ?
WHERE id = ?", [(score, idea_id) for idea_id, score in scores.items()], ) self.conn.commit() def delete_low_score_ideas(self, min_score: int) -> int: """Delete ideas with novelty_score below min_score. Returns count deleted.""" # Also clean up associated idea embeddings self.conn.execute( """DELETE FROM idea_embeddings WHERE idea_id IN (SELECT id FROM ideas WHERE novelty_score IS NOT NULL AND novelty_score < ?)""", (min_score,), ) cursor = self.conn.execute( "DELETE FROM ideas WHERE novelty_score IS NOT NULL AND novelty_score < ?", (min_score,), ) self.conn.commit() return cursor.rowcount def idea_score_distribution(self) -> dict[int, int]: """Return {score: count} for scored ideas.""" rows = self.conn.execute( "SELECT novelty_score, COUNT(*) as cnt FROM ideas " "WHERE novelty_score IS NOT NULL GROUP BY novelty_score ORDER BY novelty_score" ).fetchall() return {r["novelty_score"]: r["cnt"] for r in rows} def ideas_below_score(self, min_score: int) -> list[dict]: """Return ideas with novelty_score below min_score.""" rows = self.conn.execute( """SELECT i.id, i.draft_name, i.title, i.description, i.novelty_score, d.title AS draft_title FROM ideas i JOIN drafts d ON i.draft_name = d.name WHERE i.novelty_score IS NOT NULL AND i.novelty_score < ? ORDER BY i.novelty_score, i.title""", (min_score,), ).fetchall() return [dict(r) for r in rows] # --- Idea Embeddings --- def store_idea_embedding(self, idea_id: int, model: str, vector: np.ndarray) -> None: self.conn.execute( """INSERT INTO idea_embeddings (idea_id, model, vector, created_at) VALUES (?, ?, ?, ?) ON CONFLICT(idea_id) DO UPDATE SET model=excluded.model, vector=excluded.vector, created_at=excluded.created_at """, (idea_id, model, vector.astype(np.float32).tobytes(), datetime.now(timezone.utc).isoformat()), ) self.conn.commit() def all_idea_embeddings(self) -> dict[int, np.ndarray]: rows = self.conn.execute("SELECT idea_id, vector FROM idea_embeddings").fetchall() return { r["idea_id"]: np.frombuffer(r["vector"], dtype=np.float32) for r in rows } def ideas_without_embeddings(self, limit: int = 500) -> list[dict]: rows = self.conn.execute( """SELECT i.id, i.title, i.description, i.idea_type, i.draft_name FROM ideas i LEFT JOIN idea_embeddings ie ON i.id = ie.idea_id WHERE ie.idea_id IS NULL LIMIT ?""", (limit,), ).fetchall() return [{"id": r["id"], "title": r["title"], "description": r["description"], "type": r["idea_type"], "draft_name": r["draft_name"]} for r in rows] # --- Gaps --- def insert_gaps(self, gaps: list[dict]) -> None: self.conn.execute("DELETE FROM gaps") # Replace old analysis now = datetime.now(timezone.utc).isoformat() for g in gaps: self.conn.execute( """INSERT INTO gaps (topic, description, category, evidence, severity, analyzed_at) VALUES (?, ?, ?, ?, ?, ?)""", (g["topic"], g["description"], g.get("category", ""), g.get("evidence", ""), g.get("severity", "medium"), now), ) self.conn.commit() def all_gaps(self) -> list[dict]: rows = self.conn.execute("SELECT * FROM gaps ORDER BY id").fetchall() return [{"id": r["id"], "topic": r["topic"], "description": r["description"], "category": r["category"], "evidence": r["evidence"], "severity": r["severity"]} for r in rows] # --- Refs --- def insert_refs(self, draft_name: str, refs: list[tuple[str, str]]) -> None: """Insert cross-references for a draft. 
refs = [(ref_type, ref_id), ...].""" for ref_type, ref_id in refs: self.conn.execute( """INSERT OR IGNORE INTO draft_refs (draft_name, ref_type, ref_id) VALUES (?, ?, ?)""", (draft_name, ref_type, ref_id), ) self.conn.commit() def get_refs_for_draft(self, draft_name: str) -> list[tuple[str, str]]: """Return [(ref_type, ref_id)] for a draft.""" rows = self.conn.execute( "SELECT ref_type, ref_id FROM draft_refs WHERE draft_name = ?", (draft_name,), ).fetchall() return [(r["ref_type"], r["ref_id"]) for r in rows] def top_referenced(self, ref_type: str = "rfc", limit: int = 30) -> list[tuple[str, int, list[str]]]: """Return (ref_id, count, [draft_names]) for most-referenced items.""" rows = self.conn.execute( """SELECT ref_id, COUNT(*) as cnt, GROUP_CONCAT(draft_name, '||') as drafts FROM draft_refs WHERE ref_type = ? GROUP BY ref_id ORDER BY cnt DESC LIMIT ?""", (ref_type, limit), ).fetchall() return [ (r["ref_id"], r["cnt"], r["drafts"].split("||") if r["drafts"] else []) for r in rows ] def drafts_referencing(self, ref_type: str, ref_id: str) -> list[str]: """Return draft names that reference a specific RFC/draft/BCP.""" rows = self.conn.execute( "SELECT draft_name FROM draft_refs WHERE ref_type = ? AND ref_id = ?", (ref_type, ref_id), ).fetchall() return [r["draft_name"] for r in rows] def ref_counts_by_draft(self) -> list[tuple[str, int, int, int]]: """Return (draft_name, rfc_count, draft_count, bcp_count) for all drafts with refs.""" rows = self.conn.execute( """SELECT draft_name, SUM(CASE WHEN ref_type = 'rfc' THEN 1 ELSE 0 END) as rfcs, SUM(CASE WHEN ref_type = 'draft' THEN 1 ELSE 0 END) as drafts, SUM(CASE WHEN ref_type = 'bcp' THEN 1 ELSE 0 END) as bcps FROM draft_refs GROUP BY draft_name ORDER BY rfcs DESC""" ).fetchall() return [(r["draft_name"], r["rfcs"], r["drafts"], r["bcps"]) for r in rows] def drafts_without_refs(self, limit: int = 500) -> list[str]: """Return draft names that have full_text but no refs extracted yet.""" rows = self.conn.execute( """SELECT d.name FROM drafts d LEFT JOIN draft_refs dr ON d.name = dr.draft_name WHERE d.full_text IS NOT NULL AND dr.draft_name IS NULL LIMIT ?""", (limit,), ).fetchall() return [r["name"] for r in rows] def ref_stats(self) -> dict: """Return summary stats for refs table.""" row = self.conn.execute( """SELECT COUNT(DISTINCT draft_name) as drafts_with_refs, COUNT(*) as total_refs, SUM(CASE WHEN ref_type = 'rfc' THEN 1 ELSE 0 END) as rfc_refs, SUM(CASE WHEN ref_type = 'draft' THEN 1 ELSE 0 END) as draft_refs, SUM(CASE WHEN ref_type = 'bcp' THEN 1 ELSE 0 END) as bcp_refs, COUNT(DISTINCT ref_id) as unique_refs FROM draft_refs""" ).fetchone() return dict(row) # --- Generated Drafts --- def upsert_generated_draft(self, data: dict) -> int: """Insert or update a generated draft. Returns row id.""" now = datetime.now(timezone.utc).isoformat() existing = self.conn.execute( "SELECT id FROM generated_drafts WHERE draft_name = ? AND version = ?", (data["draft_name"], data.get("version", 0)), ).fetchone() if existing: self.conn.execute( """UPDATE generated_drafts SET gap_topic=?, title=?, abstract=?, outline_json=?, sections_json=?, full_text=?, family_name=?, family_role=?, rating_json=?, novelty_score=?, quality_score=?, status=? 
WHERE id=?""", (data["gap_topic"], data["title"], data.get("abstract", ""), json.dumps(data.get("outline", {})), json.dumps(data.get("sections", [])), data.get("full_text"), data.get("family_name", ""), data.get("family_role", ""), json.dumps(data.get("rating", {})), data.get("novelty_score", 0.0), data.get("quality_score", 0.0), data.get("status", "draft"), existing["id"]), ) self.conn.commit() return existing["id"] else: cur = self.conn.execute( """INSERT INTO generated_drafts (gap_topic, draft_name, title, abstract, outline_json, sections_json, full_text, family_name, family_role, version, rating_json, novelty_score, quality_score, status, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (data["gap_topic"], data["draft_name"], data["title"], data.get("abstract", ""), json.dumps(data.get("outline", {})), json.dumps(data.get("sections", [])), data.get("full_text"), data.get("family_name", ""), data.get("family_role", ""), data.get("version", 0), json.dumps(data.get("rating", {})), data.get("novelty_score", 0.0), data.get("quality_score", 0.0), data.get("status", "draft"), now), ) self.conn.commit() return cur.lastrowid def get_generated_drafts(self, status: str | None = None) -> list[dict]: query = "SELECT * FROM generated_drafts" params: list = [] if status: query += " WHERE status = ?" params.append(status) query += " ORDER BY created_at DESC" rows = self.conn.execute(query, params).fetchall() return [dict(r) for r in rows] def get_generated_draft(self, draft_id: int) -> dict | None: row = self.conn.execute( "SELECT * FROM generated_drafts WHERE id = ?", (draft_id,) ).fetchone() return dict(row) if row else None def get_family_drafts(self, family_name: str) -> list[dict]: rows = self.conn.execute( "SELECT * FROM generated_drafts WHERE family_name = ? ORDER BY family_role", (family_name,), ).fetchall() return [dict(r) for r in rows] def log_generation_run(self, data: dict) -> int: now = datetime.now(timezone.utc).isoformat() cur = self.conn.execute( """INSERT INTO generation_runs (family_name, gap_ids, total_input_tokens, total_output_tokens, model_used, status, started_at) VALUES (?, ?, ?, ?, ?, ?, ?)""", (data.get("family_name", ""), json.dumps(data.get("gap_ids", [])), data.get("total_input_tokens", 0), data.get("total_output_tokens", 0), data.get("model_used", ""), data.get("status", "running"), now), ) self.conn.commit() return cur.lastrowid _GENERATION_RUN_COLUMNS = frozenset({ "family_name", "gap_ids", "total_input_tokens", "total_output_tokens", "model_used", "status", "started_at", "completed_at", }) def update_generation_run(self, run_id: int, **kwargs) -> None: sets = [] params = [] for k, v in kwargs.items(): if k not in self._GENERATION_RUN_COLUMNS: raise ValueError(f"Invalid column for generation_runs: {k!r}") sets.append(f"{k} = ?") params.append(v) if not sets: return params.append(run_id) self.conn.execute( f"UPDATE generation_runs SET {', '.join(sets)} WHERE id = ?", params ) self.conn.commit() # --- Observatory --- def upsert_source(self, name: str, doc_count: int = 0) -> None: now = datetime.now(timezone.utc).isoformat() self.conn.execute( """INSERT INTO sources (name, last_fetch, doc_count) VALUES (?, ?, ?) 
ON CONFLICT(name) DO UPDATE SET last_fetch=excluded.last_fetch, doc_count=excluded.doc_count""", (name, now, doc_count), ) self.conn.commit() def get_source(self, name: str) -> dict | None: row = self.conn.execute("SELECT * FROM sources WHERE name = ?", (name,)).fetchone() return dict(row) if row else None def all_sources(self) -> list[dict]: rows = self.conn.execute("SELECT * FROM sources ORDER BY name").fetchall() return [dict(r) for r in rows] def create_snapshot(self) -> int: now = datetime.now(timezone.utc).isoformat() total = self.count_drafts() # Count new since last snapshot last = self.conn.execute( "SELECT snapshot_at FROM observatory_snapshots ORDER BY id DESC LIMIT 1" ).fetchone() new_count = 0 if last: new_count = self.conn.execute( "SELECT COUNT(*) FROM drafts WHERE fetched_at > ?", (last["snapshot_at"],) ).fetchone()[0] else: new_count = total cur = self.conn.execute( """INSERT INTO observatory_snapshots (snapshot_at, total_docs, new_since_last, changed_gaps) VALUES (?, ?, ?, 0)""", (now, total, new_count), ) self.conn.commit() return cur.lastrowid def record_gap_history(self, snapshot_id: int, gaps: list[dict]) -> None: now = datetime.now(timezone.utc).isoformat() for g in gaps: self.conn.execute( """INSERT INTO gap_history (snapshot_id, gap_topic, gap_description, severity, status, recorded_at) VALUES (?, ?, ?, ?, ?, ?)""", (snapshot_id, g["topic"], g["description"], g.get("severity", "medium"), g.get("status", "open"), now), ) self.conn.commit() def gap_history_timeline(self) -> list[dict]: rows = self.conn.execute( """SELECT gh.*, os.snapshot_at FROM gap_history gh JOIN observatory_snapshots os ON gh.snapshot_id = os.id ORDER BY os.snapshot_at, gh.gap_topic""" ).fetchall() return [dict(r) for r in rows] def get_snapshots(self, limit: int = 20) -> list[dict]: rows = self.conn.execute( "SELECT * FROM observatory_snapshots ORDER BY id DESC LIMIT ?", (limit,) ).fetchall() return [dict(r) for r in rows] def drafts_by_source(self, source: str, limit: int = 500) -> list[Draft]: rows = self.conn.execute( "SELECT * FROM drafts WHERE source = ? ORDER BY time DESC LIMIT ?", (source, limit), ).fetchall() return [self._row_to_draft(r) for r in rows] # --- WG/Status --- def draft_adoption_status(self) -> list[dict]: """Return adoption status for all drafts based on naming convention. Returns list of dicts: {name, title, time, wg_adopted, wg_name, stream} """ import re rows = self.conn.execute( 'SELECT name, title, time FROM drafts' ).fetchall() results = [] for r in rows: name = r["name"] wg_adopted = False wg_name = "" stream = "individual" # Primary signal: draft-ietf-{wg}-* naming convention m = re.match(r'^draft-ietf-(\w+)-', name) if m: wg_adopted = True wg_name = m.group(1) stream = "ietf" elif name.startswith("draft-irtf-"): m2 = re.match(r'^draft-irtf-(\w+)-', name) wg_name = m2.group(1) if m2 else "" stream = "irtf" results.append({ "name": name, "title": r["title"], "time": r["time"], "wg_adopted": wg_adopted, "wg_name": wg_name, "stream": stream, }) return results def revision_velocity(self) -> list[dict]: """Return revision data for all drafts. 
Returns list of dicts: {name, title, time, rev, rev_int} """ rows = self.conn.execute( "SELECT name, title, time, rev FROM drafts" ).fetchall() return [ { "name": r["name"], "title": r["title"], "time": r["time"], "rev": r["rev"], "rev_int": int(r["rev"]) if r["rev"].isdigit() else 0, } for r in rows ] # --- Working Groups --- def wg_summary(self) -> list[dict]: """Return per-WG summary: group, draft_count, avg scores, categories, idea_count. Excludes 'none' (individual submissions) — those are returned separately. """ rows = self.conn.execute(""" SELECT d."group" as wg, COUNT(*) as draft_count, AVG(r.novelty) as avg_novelty, AVG(r.maturity) as avg_maturity, AVG(r.overlap) as avg_overlap, AVG(r.momentum) as avg_momentum, AVG(r.relevance) as avg_relevance, (SELECT COUNT(*) FROM ideas i WHERE i.draft_name IN (SELECT name FROM drafts WHERE "group" = d."group")) as idea_count FROM drafts d LEFT JOIN ratings r ON d.name = r.draft_name WHERE d."group" IS NOT NULL AND d."group" != '' AND d."group" != 'none' GROUP BY d."group" ORDER BY draft_count DESC """).fetchall() # Build categories per WG from a separate query cat_rows = self.conn.execute(""" SELECT d."group" as wg, r.categories FROM drafts d JOIN ratings r ON d.name = r.draft_name WHERE d."group" IS NOT NULL AND d."group" != '' AND d."group" != 'none' """).fetchall() wg_cats: dict[str, dict[str, int]] = {} for cr in cat_rows: wg = cr["wg"] if wg not in wg_cats: wg_cats[wg] = {} try: for c in json.loads(cr["categories"]): c = normalize_category(c) wg_cats[wg][c] = wg_cats[wg].get(c, 0) + 1 except (json.JSONDecodeError, TypeError): pass results = [] for r in rows: results.append({ "wg": r["wg"], "draft_count": r["draft_count"], "avg_novelty": round(r["avg_novelty"] or 0, 1), "avg_maturity": round(r["avg_maturity"] or 0, 1), "avg_overlap": round(r["avg_overlap"] or 0, 1), "avg_momentum": round(r["avg_momentum"] or 0, 1), "avg_relevance": round(r["avg_relevance"] or 0, 1), "categories": wg_cats.get(r["wg"], {}), "idea_count": r["idea_count"], }) return results def wg_drafts(self, wg: str) -> list[Draft]: """Return all drafts for a specific working group.""" rows = self.conn.execute( 'SELECT * FROM drafts WHERE "group" = ? ORDER BY time DESC', (wg,) ).fetchall() return [self._row_to_draft(r) for r in rows] def wg_category_matrix(self) -> dict[str, dict[str, int]]: """Return {wg: {category: count}} matrix for all WGs (excluding 'none').""" rows = self.conn.execute(""" SELECT d."group" as wg, r.categories FROM drafts d JOIN ratings r ON d.name = r.draft_name WHERE d."group" IS NOT NULL AND d."group" != '' AND d."group" != 'none' """).fetchall() matrix: dict[str, dict[str, int]] = {} for r in rows: wg = r["wg"] if wg not in matrix: matrix[wg] = {} try: for c in json.loads(r["categories"]): c = normalize_category(c) matrix[wg][c] = matrix[wg].get(c, 0) + 1 except (json.JSONDecodeError, TypeError): pass return matrix def wg_idea_overlap(self) -> list[dict]: """Find ideas that appear across multiple WGs — signals for alignment. Returns list of {idea_title, wgs: [{wg, draft_name, draft_title}], wg_count}. 
""" rows = self.conn.execute(""" SELECT i.title as idea_title, i.description, d."group" as wg, d.name as draft_name, d.title as draft_title FROM ideas i JOIN drafts d ON i.draft_name = d.name WHERE d."group" IS NOT NULL AND d."group" != '' ORDER BY i.title, d."group" """).fetchall() # Group by idea title from collections import defaultdict idea_groups: dict[str, list[dict]] = defaultdict(list) for r in rows: idea_groups[r["idea_title"]].append({ "wg": r["wg"], "draft_name": r["draft_name"], "draft_title": r["draft_title"], }) # Only keep ideas spanning 2+ distinct WGs results = [] for title, entries in idea_groups.items(): wgs = set(e["wg"] for e in entries) if len(wgs) >= 2: results.append({ "idea_title": title, "wgs": entries, "wg_count": len(wgs), "wg_names": sorted(wgs), }) return sorted(results, key=lambda x: x["wg_count"], reverse=True) def individual_vs_wg_categories(self) -> dict[str, dict[str, int]]: """Compare category distribution: individual submissions vs WG-adopted. Returns {"individual": {cat: count}, "wg_adopted": {cat: count}}. """ rows = self.conn.execute(""" SELECT CASE WHEN d."group" = 'none' OR d."group" IS NULL THEN 'individual' ELSE 'wg_adopted' END as stream, r.categories FROM drafts d JOIN ratings r ON d.name = r.draft_name """).fetchall() result: dict[str, dict[str, int]] = {"individual": {}, "wg_adopted": {}} for r in rows: stream = r["stream"] try: for c in json.loads(r["categories"]): c = normalize_category(c) result[stream][c] = result[stream].get(c, 0) + 1 except (json.JSONDecodeError, TypeError): pass return result def category_wg_spread(self) -> list[dict]: """For each category, which WGs contribute drafts? High spread = alignment opportunity. Returns [{category, wgs: [{wg, count}], wg_count, total_drafts}]. """ rows = self.conn.execute(""" SELECT d."group" as wg, r.categories FROM drafts d JOIN ratings r ON d.name = r.draft_name WHERE d."group" IS NOT NULL AND d."group" != '' """).fetchall() from collections import defaultdict cat_wgs: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int)) for r in rows: wg = r["wg"] try: for c in json.loads(r["categories"]): c = normalize_category(c) cat_wgs[c][wg] += 1 except (json.JSONDecodeError, TypeError): pass results = [] for cat, wg_counts in cat_wgs.items(): wg_list = sorted(wg_counts.items(), key=lambda x: x[1], reverse=True) results.append({ "category": cat, "wgs": [{"wg": wg, "count": cnt} for wg, cnt in wg_list], "wg_count": len(wg_list), "total_drafts": sum(wg_counts.values()), }) return sorted(results, key=lambda x: x["wg_count"], reverse=True) # --- Monitor Runs --- def start_monitor_run(self) -> int: now = datetime.now(timezone.utc).isoformat() cur = self.conn.execute( "INSERT INTO monitor_runs (started_at, status) VALUES (?, 'running')", (now,), ) self.conn.commit() return cur.lastrowid def complete_monitor_run(self, run_id: int, stats: dict) -> None: now = datetime.now(timezone.utc).isoformat() started = self.conn.execute( "SELECT started_at FROM monitor_runs WHERE id = ?", (run_id,) ).fetchone() duration = 0.0 if started: try: start_dt = datetime.fromisoformat(started["started_at"]) duration = (datetime.now(timezone.utc) - start_dt).total_seconds() except (ValueError, TypeError): pass self.conn.execute( """UPDATE monitor_runs SET status='completed', completed_at=?, new_drafts_found=?, drafts_analyzed=?, drafts_embedded=?, ideas_extracted=?, duration_seconds=? 
WHERE id=?""", (now, stats.get("new_drafts_found", 0), stats.get("drafts_analyzed", 0), stats.get("drafts_embedded", 0), stats.get("ideas_extracted", 0), duration, run_id), ) self.conn.commit() def fail_monitor_run(self, run_id: int, error: str) -> None: now = datetime.now(timezone.utc).isoformat() started = self.conn.execute( "SELECT started_at FROM monitor_runs WHERE id = ?", (run_id,) ).fetchone() duration = 0.0 if started: try: start_dt = datetime.fromisoformat(started["started_at"]) duration = (datetime.now(timezone.utc) - start_dt).total_seconds() except (ValueError, TypeError): pass self.conn.execute( """UPDATE monitor_runs SET status='failed', completed_at=?, error_message=?, duration_seconds=? WHERE id=?""", (now, error, duration, run_id), ) self.conn.commit() def get_monitor_runs(self, limit: int = 20) -> list[dict]: rows = self.conn.execute( "SELECT * FROM monitor_runs ORDER BY started_at DESC LIMIT ?", (limit,) ).fetchall() return [dict(r) for r in rows] def get_last_successful_run(self) -> dict | None: row = self.conn.execute( "SELECT * FROM monitor_runs WHERE status='completed' ORDER BY started_at DESC LIMIT 1" ).fetchone() return dict(row) if row else None # --- Annotations --- def upsert_annotation(self, draft_name: str, note: str | None = None, tags: list[str] | None = None) -> None: """Insert or update an annotation for a draft.""" now = datetime.now(timezone.utc).isoformat() existing = self.conn.execute( "SELECT id, note, tags FROM annotations WHERE draft_name = ?", (draft_name,), ).fetchone() if existing: current_note = note if note is not None else existing["note"] current_tags = tags if tags is not None else json.loads(existing["tags"] or "[]") self.conn.execute( "UPDATE annotations SET note = ?, tags = ?, updated_at = ? WHERE draft_name = ?", (current_note, json.dumps(current_tags), now, draft_name), ) else: self.conn.execute( """INSERT INTO annotations (draft_name, note, tags, created_at, updated_at) VALUES (?, ?, ?, ?, ?)""", (draft_name, note or "", json.dumps(tags or []), now, now), ) self.conn.commit() def get_annotation(self, draft_name: str) -> dict | None: """Return annotation for a draft, or None.""" row = self.conn.execute( "SELECT * FROM annotations WHERE draft_name = ?", (draft_name,) ).fetchone() if not row: return None return { "id": row["id"], "draft_name": row["draft_name"], "note": row["note"], "tags": json.loads(row["tags"] or "[]"), "created_at": row["created_at"], "updated_at": row["updated_at"], } def get_all_annotations(self) -> list[dict]: """Return all annotations.""" rows = self.conn.execute( "SELECT * FROM annotations ORDER BY updated_at DESC" ).fetchall() return [ { "id": r["id"], "draft_name": r["draft_name"], "note": r["note"], "tags": json.loads(r["tags"] or "[]"), "created_at": r["created_at"], "updated_at": r["updated_at"], } for r in rows ] def search_by_tag(self, tag: str) -> list[str]: """Return draft names that have a specific tag in their annotation.""" rows = self.conn.execute( "SELECT draft_name, tags FROM annotations" ).fetchall() results = [] for r in rows: tags = json.loads(r["tags"] or "[]") if tag in tags: results.append(r["draft_name"]) return results # --- Helpers --- @staticmethod def _row_to_draft(row: sqlite3.Row) -> Draft: d = dict(row) return Draft( name=d["name"], rev=d["rev"], title=d["title"], abstract=d["abstract"], time=d["time"], dt_id=d.get("dt_id"), pages=d.get("pages"), words=d.get("words"), group=d.get("group"), group_uri=d.get("group_uri"), expires=d.get("expires"), ad=d.get("ad"), 
shepherd=d.get("shepherd"), states=json.loads(d.get("states") or "[]"), full_text=d.get("full_text"), categories=json.loads(d.get("categories") or "[]"), tags=json.loads(d.get("tags") or "[]"), fetched_at=d.get("fetched_at"), source=d.get("source", "ietf"), source_id=d.get("source_id", ""), source_url=d.get("source_url", ""), doc_status=d.get("doc_status", ""), ) @staticmethod def _row_to_rating(row: sqlite3.Row) -> Rating: d = dict(row) raw_cats = json.loads(d.get("categories") or "[]") return Rating( draft_name=d["draft_name"], novelty=d["novelty"], maturity=d["maturity"], overlap=d["overlap"], momentum=d["momentum"], relevance=d["relevance"], summary=d["summary"], novelty_note=d.get("novelty_note", ""), maturity_note=d.get("maturity_note", ""), overlap_note=d.get("overlap_note", ""), momentum_note=d.get("momentum_note", ""), relevance_note=d.get("relevance_note", ""), categories=[normalize_category(c) for c in raw_cats], rated_at=d.get("rated_at"), )