Add full proposal system: DB schema (proposals + proposal_gaps tables), CLI `ietf intake` command, and web UI with Quick Generate on /proposals/new. The new page merges AI intake (paste URL/text → Haiku generates multiple proposals auto-linked to gaps) with manual form entry. Generated proposals are clickable cards that fill the editor below for refinement. Uses claude_model_cheap (Haiku) for cost-efficient web intake. Includes CaML-inspired draft proposals from arXiv:2503.18813 analysis. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
"""SQLite database layer with FTS5 full-text search."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import sqlite3
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
|
|
from .config import Config
|
|
from .models import Author, Draft, Rating, normalize_category
|
|
|
|
SCHEMA = """
|
|
CREATE TABLE IF NOT EXISTS drafts (
|
|
name TEXT PRIMARY KEY,
|
|
rev TEXT NOT NULL,
|
|
title TEXT NOT NULL,
|
|
abstract TEXT NOT NULL DEFAULT '',
|
|
time TEXT,
|
|
dt_id INTEGER,
|
|
pages INTEGER,
|
|
words INTEGER,
|
|
"group" TEXT,
|
|
group_uri TEXT,
|
|
expires TEXT,
|
|
ad TEXT,
|
|
shepherd TEXT,
|
|
states TEXT DEFAULT '[]', -- JSON array
|
|
full_text TEXT,
|
|
categories TEXT DEFAULT '[]', -- JSON array
|
|
tags TEXT DEFAULT '[]', -- JSON array
|
|
fetched_at TEXT
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS ratings (
|
|
draft_name TEXT PRIMARY KEY REFERENCES drafts(name),
|
|
novelty INTEGER NOT NULL,
|
|
maturity INTEGER NOT NULL,
|
|
overlap INTEGER NOT NULL,
|
|
momentum INTEGER NOT NULL,
|
|
relevance INTEGER NOT NULL,
|
|
summary TEXT NOT NULL DEFAULT '',
|
|
novelty_note TEXT DEFAULT '',
|
|
maturity_note TEXT DEFAULT '',
|
|
overlap_note TEXT DEFAULT '',
|
|
momentum_note TEXT DEFAULT '',
|
|
relevance_note TEXT DEFAULT '',
|
|
categories TEXT DEFAULT '[]', -- JSON array
|
|
rated_at TEXT,
|
|
false_positive INTEGER DEFAULT 0 -- 1 = flagged as not AI-agent related
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS embeddings (
|
|
draft_name TEXT PRIMARY KEY REFERENCES drafts(name),
|
|
model TEXT NOT NULL,
|
|
vector BLOB NOT NULL, -- numpy float32 array as bytes
|
|
created_at TEXT
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS llm_cache (
|
|
draft_name TEXT NOT NULL,
|
|
prompt_hash TEXT NOT NULL,
|
|
model TEXT NOT NULL,
|
|
request_json TEXT NOT NULL, -- full prompt sent
|
|
response_json TEXT NOT NULL, -- raw Claude response
|
|
input_tokens INTEGER,
|
|
output_tokens INTEGER,
|
|
created_at TEXT,
|
|
PRIMARY KEY (draft_name, prompt_hash)
|
|
);
|
|
|
|
CREATE VIRTUAL TABLE IF NOT EXISTS drafts_fts USING fts5(
|
|
name, title, abstract, full_text,
|
|
content='drafts',
|
|
content_rowid='rowid'
|
|
);
|
|
|
|
-- Authors (fetched from Datatracker)
|
|
CREATE TABLE IF NOT EXISTS authors (
|
|
person_id INTEGER PRIMARY KEY,
|
|
name TEXT NOT NULL,
|
|
ascii_name TEXT,
|
|
affiliation TEXT DEFAULT '',
|
|
resource_uri TEXT,
|
|
fetched_at TEXT
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS draft_authors (
|
|
draft_name TEXT NOT NULL REFERENCES drafts(name),
|
|
person_id INTEGER NOT NULL REFERENCES authors(person_id),
|
|
author_order INTEGER DEFAULT 1,
|
|
affiliation TEXT DEFAULT '',
|
|
PRIMARY KEY (draft_name, person_id)
|
|
);
|
|
|
|
-- Extracted ideas
|
|
CREATE TABLE IF NOT EXISTS ideas (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
draft_name TEXT NOT NULL REFERENCES drafts(name),
|
|
title TEXT NOT NULL,
|
|
description TEXT NOT NULL,
|
|
idea_type TEXT DEFAULT '',
|
|
extracted_at TEXT
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_ideas_draft ON ideas(draft_name);
|
|
|
|
-- Idea embeddings (for clustering)
|
|
CREATE TABLE IF NOT EXISTS idea_embeddings (
|
|
idea_id INTEGER PRIMARY KEY REFERENCES ideas(id),
|
|
model TEXT NOT NULL,
|
|
vector BLOB NOT NULL,
|
|
created_at TEXT
|
|
);
|
|
|
|
-- Gap analysis results
|
|
CREATE TABLE IF NOT EXISTS gaps (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
topic TEXT NOT NULL,
|
|
description TEXT NOT NULL,
|
|
category TEXT DEFAULT '',
|
|
evidence TEXT DEFAULT '',
|
|
severity TEXT DEFAULT 'medium',
|
|
analyzed_at TEXT
|
|
);
|
|
|
|
-- Cross-references (RFC, draft, BCP references found in draft text)
|
|
CREATE TABLE IF NOT EXISTS draft_refs (
|
|
draft_name TEXT NOT NULL REFERENCES drafts(name),
|
|
ref_type TEXT NOT NULL, -- 'rfc', 'draft', 'bcp'
|
|
ref_id TEXT NOT NULL, -- e.g. '8259', 'draft-ietf-httpbis-semantics', 'BCP14'
|
|
UNIQUE(draft_name, ref_type, ref_id)
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_draft_refs_ref ON draft_refs(ref_type, ref_id);
|
|
|
|
-- Generated drafts from gap-to-draft pipeline
|
|
CREATE TABLE IF NOT EXISTS generated_drafts (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
gap_topic TEXT NOT NULL,
|
|
draft_name TEXT NOT NULL,
|
|
title TEXT NOT NULL,
|
|
abstract TEXT NOT NULL DEFAULT '',
|
|
outline_json TEXT DEFAULT '{}',
|
|
sections_json TEXT DEFAULT '[]',
|
|
full_text TEXT,
|
|
family_name TEXT DEFAULT '',
|
|
family_role TEXT DEFAULT '',
|
|
version INTEGER DEFAULT 0,
|
|
rating_json TEXT DEFAULT '{}',
|
|
novelty_score REAL DEFAULT 0.0,
|
|
quality_score REAL DEFAULT 0.0,
|
|
status TEXT DEFAULT 'draft',
|
|
created_at TEXT
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS generation_runs (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
family_name TEXT DEFAULT '',
|
|
gap_ids TEXT DEFAULT '[]',
|
|
total_input_tokens INTEGER DEFAULT 0,
|
|
total_output_tokens INTEGER DEFAULT 0,
|
|
model_used TEXT DEFAULT '',
|
|
status TEXT DEFAULT 'running',
|
|
started_at TEXT,
|
|
completed_at TEXT
|
|
);
|
|
|
|
-- Observatory tables
|
|
CREATE TABLE IF NOT EXISTS sources (
|
|
name TEXT PRIMARY KEY,
|
|
last_fetch TEXT,
|
|
doc_count INTEGER DEFAULT 0
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS observatory_snapshots (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
snapshot_at TEXT NOT NULL,
|
|
total_docs INTEGER DEFAULT 0,
|
|
new_since_last INTEGER DEFAULT 0,
|
|
changed_gaps INTEGER DEFAULT 0
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS gap_history (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
snapshot_id INTEGER REFERENCES observatory_snapshots(id),
|
|
gap_topic TEXT NOT NULL,
|
|
gap_description TEXT NOT NULL,
|
|
severity TEXT DEFAULT 'medium',
|
|
status TEXT DEFAULT 'open',
|
|
recorded_at TEXT
|
|
);
|
|
|
|
-- Draft proposals (user's own IETF draft ideas)
|
|
CREATE TABLE IF NOT EXISTS proposals (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
title TEXT NOT NULL,
|
|
slug TEXT NOT NULL UNIQUE,
|
|
status TEXT DEFAULT 'idea',
|
|
description TEXT DEFAULT '',
|
|
content_md TEXT DEFAULT '',
|
|
source_paper TEXT DEFAULT '',
|
|
source_url TEXT DEFAULT '',
|
|
intended_wg TEXT DEFAULT '',
|
|
draft_name TEXT DEFAULT '',
|
|
created_at TEXT,
|
|
updated_at TEXT
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS proposal_gaps (
|
|
proposal_id INTEGER NOT NULL REFERENCES proposals(id) ON DELETE CASCADE,
|
|
gap_id INTEGER NOT NULL REFERENCES gaps(id),
|
|
PRIMARY KEY (proposal_id, gap_id)
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_proposal_gaps_gap ON proposal_gaps(gap_id);
|
|
|
|
-- Annotations (user notes + tags per draft)
|
|
CREATE TABLE IF NOT EXISTS annotations (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
draft_name TEXT NOT NULL REFERENCES drafts(name),
|
|
note TEXT DEFAULT '',
|
|
tags TEXT DEFAULT '[]',
|
|
created_at TEXT,
|
|
updated_at TEXT,
|
|
UNIQUE(draft_name)
|
|
);
|
|
|
|
-- Monitor runs
|
|
CREATE TABLE IF NOT EXISTS monitor_runs (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
started_at TEXT NOT NULL,
|
|
completed_at TEXT,
|
|
status TEXT DEFAULT 'running',
|
|
new_drafts_found INTEGER DEFAULT 0,
|
|
drafts_analyzed INTEGER DEFAULT 0,
|
|
drafts_embedded INTEGER DEFAULT 0,
|
|
ideas_extracted INTEGER DEFAULT 0,
|
|
error_message TEXT DEFAULT '',
|
|
duration_seconds REAL DEFAULT 0
|
|
);
|
|
|
|
-- Triggers to keep FTS index in sync
|
|
CREATE TRIGGER IF NOT EXISTS drafts_ai AFTER INSERT ON drafts BEGIN
|
|
INSERT INTO drafts_fts(rowid, name, title, abstract, full_text)
|
|
VALUES (new.rowid, new.name, new.title, new.abstract, new.full_text);
|
|
END;
|
|
|
|
CREATE TRIGGER IF NOT EXISTS drafts_ad AFTER DELETE ON drafts BEGIN
|
|
INSERT INTO drafts_fts(drafts_fts, rowid, name, title, abstract, full_text)
|
|
VALUES ('delete', old.rowid, old.name, old.title, old.abstract, old.full_text);
|
|
END;
|
|
|
|
CREATE TRIGGER IF NOT EXISTS drafts_au AFTER UPDATE ON drafts BEGIN
|
|
INSERT INTO drafts_fts(drafts_fts, rowid, name, title, abstract, full_text)
|
|
VALUES ('delete', old.rowid, old.name, old.title, old.abstract, old.full_text);
|
|
INSERT INTO drafts_fts(rowid, name, title, abstract, full_text)
|
|
VALUES (new.rowid, new.name, new.title, new.abstract, new.full_text);
|
|
END;
|
|
"""


class Database:
    def __init__(self, config: Config | None = None):
        self.config = config or Config.load()
        self.db_path = self.config.db_path
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
        self._conn: sqlite3.Connection | None = None

    @property
    def conn(self) -> sqlite3.Connection:
        if self._conn is None:
            self._conn = sqlite3.connect(self.db_path)
            self._conn.row_factory = sqlite3.Row
            self._conn.execute("PRAGMA journal_mode=WAL")
            self._conn.execute("PRAGMA foreign_keys=ON")
            self._conn.executescript(SCHEMA)
            self._migrate_schema()
        return self._conn

    def _migrate_schema(self) -> None:
        """Additive migration — add columns if missing."""
        cols = {r[1] for r in self._conn.execute("PRAGMA table_info(drafts)").fetchall()}
        migrations = [
            ("source", "TEXT DEFAULT 'ietf'"),
            ("source_id", "TEXT DEFAULT ''"),
            ("source_url", "TEXT DEFAULT ''"),
            ("doc_status", "TEXT DEFAULT ''"),
        ]
        for col, typedef in migrations:
            if col not in cols:
                self._conn.execute(f"ALTER TABLE drafts ADD COLUMN {col} {typedef}")

        # ratings table migrations
        rating_cols = {r[1] for r in self._conn.execute("PRAGMA table_info(ratings)").fetchall()}
        if "false_positive" not in rating_cols:
            self._conn.execute("ALTER TABLE ratings ADD COLUMN false_positive INTEGER DEFAULT 0")

        # ideas table migrations
        idea_cols = {r[1] for r in self._conn.execute("PRAGMA table_info(ideas)").fetchall()}
        if "novelty_score" not in idea_cols:
            self._conn.execute("ALTER TABLE ideas ADD COLUMN novelty_score INTEGER")

        self._conn.commit()

    def close(self) -> None:
        if self._conn:
            self._conn.close()
            self._conn = None

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()

    # --- Drafts ---

    def upsert_draft(self, draft: Draft) -> None:
        self.conn.execute(
            """INSERT INTO drafts (name, rev, title, abstract, time, dt_id, pages, words,
                "group", group_uri, expires, ad, shepherd, states, full_text, categories, tags, fetched_at,
                source, source_id, source_url, doc_status)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(name) DO UPDATE SET
                rev=excluded.rev, title=excluded.title, abstract=excluded.abstract,
                time=excluded.time, dt_id=excluded.dt_id, pages=excluded.pages,
                words=excluded.words, "group"=excluded."group", group_uri=excluded.group_uri,
                expires=excluded.expires, ad=excluded.ad, shepherd=excluded.shepherd,
                states=excluded.states,
                full_text=COALESCE(excluded.full_text, full_text),
                categories=excluded.categories, tags=excluded.tags,
                fetched_at=excluded.fetched_at,
                source=excluded.source, source_id=excluded.source_id,
                source_url=excluded.source_url, doc_status=excluded.doc_status
            """,
            (
                draft.name, draft.rev, draft.title, draft.abstract, draft.time,
                draft.dt_id, draft.pages, draft.words, draft.group, draft.group_uri,
                draft.expires, draft.ad, draft.shepherd,
                json.dumps(draft.states), draft.full_text,
                json.dumps(draft.categories), json.dumps(draft.tags),
                draft.fetched_at or datetime.now(timezone.utc).isoformat(),
                draft.source, draft.source_id, draft.source_url, draft.doc_status,
            ),
        )
        self.conn.commit()
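
    # Illustrative usage (not executed here; the draft name and fields are made
    # up, and the remaining Draft fields are assumed to have defaults):
    #
    #     db = Database()
    #     db.upsert_draft(Draft(name="draft-example-agent-auth-00", rev="00",
    #                           title="Agent Authentication", abstract="..."))
    #
    # Re-upserting the same name updates the row in place; the COALESCE above
    # keeps any previously stored full_text when the new Draft carries none.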

    def get_draft(self, name: str) -> Draft | None:
        row = self.conn.execute("SELECT * FROM drafts WHERE name = ?", (name,)).fetchone()
        if row is None:
            return None
        return self._row_to_draft(row)

    def get_drafts_by_names(self, names: list[str]) -> dict[str, "Draft"]:
        """Batch-fetch drafts by name. Returns {name: Draft} dict."""
        if not names:
            return {}
        result = {}
        # SQLite has a variable limit (~999), so chunk if needed
        for i in range(0, len(names), 900):
            chunk = names[i : i + 900]
            placeholders = ",".join("?" for _ in chunk)
            rows = self.conn.execute(
                f"SELECT * FROM drafts WHERE name IN ({placeholders})", chunk
            ).fetchall()
            for r in rows:
                d = self._row_to_draft(r)
                result[d.name] = d
        return result

    def list_drafts(
        self,
        limit: int = 100,
        offset: int = 0,
        order_by: str = "time DESC",
    ) -> list[Draft]:
        # Sanitize order_by to prevent injection
        allowed = {"time", "name", "title", "pages", "words", "fetched_at"}
        parts = order_by.split()
        col = parts[0] if parts else "time"
        direction = parts[1].upper() if len(parts) > 1 else "DESC"
        if col not in allowed:
            col = "time"
        if direction not in ("ASC", "DESC"):
            direction = "DESC"
        safe_order = f'"{col}" {direction}' if col == "group" else f"{col} {direction}"

        rows = self.conn.execute(
            f"SELECT * FROM drafts ORDER BY {safe_order} LIMIT ? OFFSET ?",
            (limit, offset),
        ).fetchall()
        return [self._row_to_draft(r) for r in rows]

    def count_drafts(self, include_false_positives: bool = True) -> int:
        if include_false_positives:
            return self.conn.execute("SELECT COUNT(*) FROM drafts").fetchone()[0]
        return self.conn.execute(
            """SELECT COUNT(*) FROM drafts d
            WHERE NOT EXISTS (
                SELECT 1 FROM ratings r
                WHERE r.draft_name = d.name AND r.false_positive = 1
            )"""
        ).fetchone()[0]

    def search_drafts(self, query: str, limit: int = 50) -> list[Draft]:
        rows = self.conn.execute(
            """SELECT d.* FROM drafts d
            JOIN drafts_fts f ON d.rowid = f.rowid
            WHERE drafts_fts MATCH ?
            ORDER BY rank
            LIMIT ?""",
            (query, limit),
        ).fetchall()
        return [self._row_to_draft(r) for r in rows]
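
    # search_drafts passes the query string straight to FTS5 MATCH, so standard
    # FTS5 query syntax applies. A few illustrative calls (query text made up):
    #
    #     db.search_drafts('"model context protocol"')       # phrase search
    #     db.search_drafts('agent AND (auth OR identity)')    # boolean operators
    #     db.search_drafts('title:discovery')                 # restrict to one column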

    def drafts_without_text(self, limit: int = 100) -> list[Draft]:
        rows = self.conn.execute(
            "SELECT * FROM drafts WHERE full_text IS NULL LIMIT ?", (limit,)
        ).fetchall()
        return [self._row_to_draft(r) for r in rows]

    # --- Ratings ---

    def upsert_rating(self, rating: Rating) -> None:
        self.conn.execute(
            """INSERT INTO ratings (draft_name, novelty, maturity, overlap, momentum, relevance,
                summary, novelty_note, maturity_note, overlap_note, momentum_note, relevance_note,
                categories, rated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(draft_name) DO UPDATE SET
                novelty=excluded.novelty, maturity=excluded.maturity, overlap=excluded.overlap,
                momentum=excluded.momentum, relevance=excluded.relevance, summary=excluded.summary,
                novelty_note=excluded.novelty_note, maturity_note=excluded.maturity_note,
                overlap_note=excluded.overlap_note, momentum_note=excluded.momentum_note,
                relevance_note=excluded.relevance_note, categories=excluded.categories,
                rated_at=excluded.rated_at
            """,
            (
                rating.draft_name, rating.novelty, rating.maturity, rating.overlap,
                rating.momentum, rating.relevance, rating.summary,
                rating.novelty_note, rating.maturity_note, rating.overlap_note,
                rating.momentum_note, rating.relevance_note,
                json.dumps(rating.categories),
                rating.rated_at or datetime.now(timezone.utc).isoformat(),
            ),
        )
        self.conn.commit()

    def get_rating(self, draft_name: str) -> Rating | None:
        row = self.conn.execute(
            "SELECT * FROM ratings WHERE draft_name = ?", (draft_name,)
        ).fetchone()
        if row is None:
            return None
        return self._row_to_rating(row)

    def unrated_drafts(self, limit: int = 100) -> list[Draft]:
        rows = self.conn.execute(
            """SELECT d.* FROM drafts d
            LEFT JOIN ratings r ON d.name = r.draft_name
            WHERE r.draft_name IS NULL
            LIMIT ?""",
            (limit,),
        ).fetchall()
        return [self._row_to_draft(r) for r in rows]

    def drafts_with_ratings(
        self, limit: int = 200, include_false_positives: bool = False,
    ) -> list[tuple[Draft, Rating]]:
        fp_clause = "" if include_false_positives else "WHERE COALESCE(r.false_positive, 0) = 0"
        rows = self.conn.execute(
            f"""SELECT d.*, r.novelty, r.maturity, r.overlap, r.momentum, r.relevance,
                r.summary, r.novelty_note, r.maturity_note, r.overlap_note,
                r.momentum_note, r.relevance_note, r.categories as r_categories, r.rated_at
            FROM drafts d
            JOIN ratings r ON d.name = r.draft_name
            {fp_clause}
            ORDER BY (r.novelty * 0.30 + r.relevance * 0.25 + r.maturity * 0.20
                + r.momentum * 0.15 + (6 - r.overlap) * 0.10) DESC
            LIMIT ?""",
            (limit,),
        ).fetchall()
        results = []
        for r in rows:
            draft = self._row_to_draft(r)
            rating = Rating(
                draft_name=r["draft_name"] if "draft_name" in r.keys() else draft.name,
                novelty=r["novelty"], maturity=r["maturity"], overlap=r["overlap"],
                momentum=r["momentum"], relevance=r["relevance"], summary=r["summary"],
                novelty_note=r["novelty_note"], maturity_note=r["maturity_note"],
                overlap_note=r["overlap_note"], momentum_note=r["momentum_note"],
                relevance_note=r["relevance_note"],
                categories=[normalize_category(c) for c in json.loads(r["r_categories"])] if r["r_categories"] else [],
                rated_at=r["rated_at"],
            )
            results.append((draft, rating))
        return results
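
    # The ORDER BY above ranks drafts by a weighted composite of the rating axes:
    #     score = 0.30*novelty + 0.25*relevance + 0.20*maturity
    #           + 0.15*momentum + 0.10*(6 - overlap)
    # overlap is inverted so that less overlap with existing work scores higher.
    # Worked example: novelty 5, relevance 4, maturity 3, momentum 2, overlap 1
    # gives 1.5 + 1.0 + 0.6 + 0.3 + 0.5 = 3.9.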

    # --- Embeddings ---

    def store_embedding(self, draft_name: str, model: str, vector: np.ndarray) -> None:
        self.conn.execute(
            """INSERT INTO embeddings (draft_name, model, vector, created_at)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(draft_name) DO UPDATE SET
                model=excluded.model, vector=excluded.vector, created_at=excluded.created_at
            """,
            (draft_name, model, vector.astype(np.float32).tobytes(),
             datetime.now(timezone.utc).isoformat()),
        )
        self.conn.commit()

    def get_embedding(self, draft_name: str) -> np.ndarray | None:
        row = self.conn.execute(
            "SELECT vector FROM embeddings WHERE draft_name = ?", (draft_name,)
        ).fetchone()
        if row is None:
            return None
        return np.frombuffer(row["vector"], dtype=np.float32)

    def all_embeddings(self) -> dict[str, np.ndarray]:
        rows = self.conn.execute("SELECT draft_name, vector FROM embeddings").fetchall()
        return {
            r["draft_name"]: np.frombuffer(r["vector"], dtype=np.float32)
            for r in rows
        }

    def drafts_without_embeddings(self, limit: int = 500) -> list[str]:
        rows = self.conn.execute(
            """SELECT d.name FROM drafts d
            LEFT JOIN embeddings e ON d.name = e.draft_name
            WHERE e.draft_name IS NULL
            LIMIT ?""",
            (limit,),
        ).fetchall()
        return [r["name"] for r in rows]

    # --- LLM Cache ---

    def cache_response(
        self, draft_name: str, prompt_hash: str, model: str,
        request_json: str, response_json: str,
        input_tokens: int = 0, output_tokens: int = 0,
    ) -> None:
        self.conn.execute(
            """INSERT INTO llm_cache (draft_name, prompt_hash, model, request_json,
                response_json, input_tokens, output_tokens, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(draft_name, prompt_hash) DO UPDATE SET
                model=excluded.model, response_json=excluded.response_json,
                input_tokens=excluded.input_tokens, output_tokens=excluded.output_tokens,
                created_at=excluded.created_at
            """,
            (draft_name, prompt_hash, model, request_json, response_json,
             input_tokens, output_tokens, datetime.now(timezone.utc).isoformat()),
        )
        self.conn.commit()

    def get_cached_response(self, draft_name: str, prompt_hash: str) -> str | None:
        row = self.conn.execute(
            "SELECT response_json FROM llm_cache WHERE draft_name = ? AND prompt_hash = ?",
            (draft_name, prompt_hash),
        ).fetchone()
        return row["response_json"] if row else None

    def total_tokens_used(self) -> tuple[int, int]:
        row = self.conn.execute(
            "SELECT COALESCE(SUM(input_tokens),0), COALESCE(SUM(output_tokens),0) FROM llm_cache"
        ).fetchone()
        return (row[0], row[1])
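
    # The caller computes prompt_hash; this layer only keys the cache on
    # (draft_name, prompt_hash). One plausible scheme — an assumption, not
    # necessarily what the analyzer module actually does:
    #
    #     import hashlib
    #     prompt_hash = hashlib.sha256(f"{model}:{prompt}".encode()).hexdigest()
    #     cached = db.get_cached_response(draft.name, prompt_hash)
    #     if cached is None:
    #         ...  # call the model, then db.cache_response(...)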

    # --- Authors ---

    def upsert_author(self, author: Author) -> None:
        self.conn.execute(
            """INSERT INTO authors (person_id, name, ascii_name, affiliation, resource_uri, fetched_at)
            VALUES (?, ?, ?, ?, ?, ?)
            ON CONFLICT(person_id) DO UPDATE SET
                name=excluded.name, ascii_name=excluded.ascii_name,
                affiliation=excluded.affiliation, resource_uri=excluded.resource_uri,
                fetched_at=excluded.fetched_at
            """,
            (author.person_id, author.name, author.ascii_name,
             author.affiliation, author.resource_uri, author.fetched_at),
        )
        self.conn.commit()

    def upsert_draft_author(
        self, draft_name: str, person_id: int, order: int = 1, affiliation: str = ""
    ) -> None:
        self.conn.execute(
            """INSERT INTO draft_authors (draft_name, person_id, author_order, affiliation)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(draft_name, person_id) DO UPDATE SET
                author_order=excluded.author_order, affiliation=excluded.affiliation
            """,
            (draft_name, person_id, order, affiliation),
        )
        self.conn.commit()

    def get_authors_for_draft(self, draft_name: str) -> list[Author]:
        rows = self.conn.execute(
            """SELECT a.* FROM authors a
            JOIN draft_authors da ON a.person_id = da.person_id
            WHERE da.draft_name = ?
            ORDER BY da.author_order""",
            (draft_name,),
        ).fetchall()
        results = []
        for r in rows:
            d = dict(r)
            results.append(Author(
                person_id=d["person_id"], name=d["name"],
                ascii_name=d.get("ascii_name", ""),
                affiliation=d.get("affiliation", ""),
                resource_uri=d.get("resource_uri", ""),
                fetched_at=d.get("fetched_at"),
            ))
        return results

    def drafts_without_authors(self, limit: int = 500) -> list[str]:
        rows = self.conn.execute(
            """SELECT d.name FROM drafts d
            LEFT JOIN draft_authors da ON d.name = da.draft_name
            WHERE da.draft_name IS NULL
            LIMIT ?""",
            (limit,),
        ).fetchall()
        return [r["name"] for r in rows]

    def author_count(self) -> int:
        return self.conn.execute("SELECT COUNT(*) FROM authors").fetchone()[0]

    def top_authors(self, limit: int = 20) -> list[tuple[str, str, int, list[str]]]:
        """Return (name, affiliation, draft_count, [draft_names])."""
        rows = self.conn.execute(
            """SELECT a.name, a.affiliation, COUNT(da.draft_name) as cnt,
                GROUP_CONCAT(da.draft_name, '||') as drafts
            FROM authors a
            JOIN draft_authors da ON a.person_id = da.person_id
            GROUP BY a.person_id
            ORDER BY cnt DESC
            LIMIT ?""",
            (limit,),
        ).fetchall()
        return [
            (r["name"], r["affiliation"], r["cnt"],
             r["drafts"].split("||") if r["drafts"] else [])
            for r in rows
        ]

    def top_orgs(self, limit: int = 20) -> list[tuple[str, int, int]]:
        """Return (org, author_count, draft_count)."""
        rows = self.conn.execute(
            """SELECT da.affiliation as org,
                COUNT(DISTINCT da.person_id) as authors,
                COUNT(DISTINCT da.draft_name) as drafts
            FROM draft_authors da
            WHERE da.affiliation != ''
            GROUP BY da.affiliation
            ORDER BY drafts DESC
            LIMIT ?""",
            (limit,),
        ).fetchall()
        return [(r["org"], r["authors"], r["drafts"]) for r in rows]

    def coauthor_pairs(self) -> list[tuple[str, str, int]]:
        """Return (author_a, author_b, shared_drafts) for all co-author pairs."""
        rows = self.conn.execute(
            """SELECT a1.name as a, a2.name as b, COUNT(*) as shared
            FROM draft_authors da1
            JOIN draft_authors da2 ON da1.draft_name = da2.draft_name AND da1.person_id < da2.person_id
            JOIN authors a1 ON da1.person_id = a1.person_id
            JOIN authors a2 ON da2.person_id = a2.person_id
            GROUP BY da1.person_id, da2.person_id
            ORDER BY shared DESC"""
        ).fetchall()
        return [(r["a"], r["b"], r["shared"]) for r in rows]

    def cross_org_collaborations(self, limit: int = 20) -> list[tuple[str, str, int]]:
        """Return (org_a, org_b, shared_drafts) for cross-org collaboration."""
        rows = self.conn.execute(
            """SELECT da1.affiliation as org_a, da2.affiliation as org_b,
                COUNT(DISTINCT da1.draft_name) as shared
            FROM draft_authors da1
            JOIN draft_authors da2 ON da1.draft_name = da2.draft_name
                AND da1.person_id < da2.person_id
            WHERE da1.affiliation != '' AND da2.affiliation != ''
                AND da1.affiliation != da2.affiliation
            GROUP BY da1.affiliation, da2.affiliation
            ORDER BY shared DESC
            LIMIT ?""",
            (limit,),
        ).fetchall()
        return [(r["org_a"], r["org_b"], r["shared"]) for r in rows]

    def org_data_raw(self) -> list[tuple[str, int, str]]:
        """Return (affiliation, person_id, draft_name) for all draft_authors with affiliation."""
        rows = self.conn.execute(
            "SELECT affiliation, person_id, draft_name FROM draft_authors WHERE affiliation != ''"
        ).fetchall()
        return [(r[0], r[1], r[2]) for r in rows]

    def author_draft_counts(self) -> dict[int, int]:
        """Return {person_id: draft_count} for all authors."""
        rows = self.conn.execute(
            "SELECT person_id, COUNT(*) FROM draft_authors GROUP BY person_id"
        ).fetchall()
        return {r[0]: r[1] for r in rows}

    def author_draft_sets(self) -> dict[int, set[str]]:
        """Return {person_id: set(draft_names)} for all authors."""
        rows = self.conn.execute(
            "SELECT person_id, draft_name FROM draft_authors"
        ).fetchall()
        result: dict[int, set[str]] = {}
        for r in rows:
            result.setdefault(r[0], set()).add(r[1])
        return result
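
    # Illustrative sketch: author_draft_sets() feeds set-overlap analytics, e.g.
    # Jaccard similarity between two authors' draft portfolios (the person IDs
    # here are placeholders, not real Datatracker IDs):
    #
    #     sets = db.author_draft_sets()
    #     a, b = sets.get(101, set()), sets.get(202, set())
    #     jaccard = len(a & b) / len(a | b) if (a | b) else 0.0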

    # --- Ideas ---

    def insert_ideas(self, draft_name: str, ideas: list[dict]) -> None:
        # Clear existing ideas for this draft first
        self.conn.execute("DELETE FROM ideas WHERE draft_name = ?", (draft_name,))
        now = datetime.now(timezone.utc).isoformat()
        for idea in ideas:
            self.conn.execute(
                """INSERT INTO ideas (draft_name, title, description, idea_type, extracted_at)
                VALUES (?, ?, ?, ?, ?)""",
                (draft_name, idea["title"], idea["description"],
                 idea.get("type", ""), now),
            )
        self.conn.commit()

    def delete_ideas(self, draft_name: str | None = None) -> int:
        """Delete ideas from the ideas table.

        Args:
            draft_name: If provided, delete only ideas for this draft.
                If None, delete all ideas.

        Returns:
            Number of rows deleted.
        """
        if draft_name:
            self.conn.execute(
                "DELETE FROM idea_embeddings WHERE idea_id IN (SELECT id FROM ideas WHERE draft_name = ?)", (draft_name,)
            )
            cursor = self.conn.execute(
                "DELETE FROM ideas WHERE draft_name = ?", (draft_name,)
            )
        else:
            self.conn.execute("DELETE FROM idea_embeddings")
            cursor = self.conn.execute("DELETE FROM ideas")
        self.conn.commit()
        return cursor.rowcount

    def get_ideas_for_draft(self, draft_name: str) -> list[dict]:
        rows = self.conn.execute(
            "SELECT * FROM ideas WHERE draft_name = ?", (draft_name,)
        ).fetchall()
        return [{"id": r["id"], "title": r["title"], "description": r["description"],
                 "type": r["idea_type"], "draft_name": r["draft_name"],
                 "novelty_score": r["novelty_score"]} for r in rows]

    def delete_idea(self, idea_id: int) -> None:
        """Delete a single idea and its embedding by ID."""
        self.conn.execute("DELETE FROM idea_embeddings WHERE idea_id = ?", (idea_id,))
        self.conn.execute("DELETE FROM ideas WHERE id = ?", (idea_id,))
        self.conn.commit()

    def drafts_without_ideas(self, limit: int = 500) -> list[str]:
        rows = self.conn.execute(
            """SELECT d.name FROM drafts d
            LEFT JOIN ideas i ON d.name = i.draft_name
            LEFT JOIN llm_cache lc ON d.name = lc.draft_name
                AND lc.request_json LIKE 'batch-ideas[%'
            WHERE i.draft_name IS NULL AND lc.draft_name IS NULL
            LIMIT ?""",
            (limit,),
        ).fetchall()
        return [r["name"] for r in rows]

    def all_ideas(self, include_false_positives: bool = False) -> list[dict]:
        if include_false_positives:
            rows = self.conn.execute(
                "SELECT * FROM ideas ORDER BY draft_name"
            ).fetchall()
        else:
            rows = self.conn.execute(
                "SELECT i.* FROM ideas i "
                "WHERE i.draft_name NOT IN "
                "(SELECT draft_name FROM ratings WHERE false_positive = 1) "
                "ORDER BY i.draft_name"
            ).fetchall()
        return [{"title": r["title"], "description": r["description"],
                 "type": r["idea_type"], "draft_name": r["draft_name"],
                 "novelty_score": r["novelty_score"]} for r in rows]

    def idea_count(self, include_false_positives: bool = False) -> int:
        if include_false_positives:
            return self.conn.execute("SELECT COUNT(*) FROM ideas").fetchone()[0]
        return self.conn.execute(
            "SELECT COUNT(*) FROM ideas "
            "WHERE draft_name NOT IN "
            "(SELECT draft_name FROM ratings WHERE false_positive = 1)"
        ).fetchone()[0]

    def ideas_with_drafts(self, unscored_only: bool = False, limit: int = 5000) -> list[dict]:
        """Return ideas joined with draft title, optionally only unscored ones."""
        where = "WHERE i.novelty_score IS NULL" if unscored_only else ""
        rows = self.conn.execute(
            f"""SELECT i.id, i.draft_name, i.title, i.description, i.idea_type,
                i.novelty_score, d.title AS draft_title
            FROM ideas i JOIN drafts d ON i.draft_name = d.name
            {where}
            ORDER BY i.id LIMIT ?""",
            (limit,),
        ).fetchall()
        return [dict(r) for r in rows]

    def update_idea_score(self, idea_id: int, score: int) -> None:
        """Set the novelty_score for a single idea."""
        self.conn.execute(
            "UPDATE ideas SET novelty_score = ? WHERE id = ?",
            (score, idea_id),
        )
        self.conn.commit()

    def update_idea_scores_bulk(self, scores: dict[int, int]) -> None:
        """Bulk-update novelty scores. scores maps idea_id -> score."""
        self.conn.executemany(
            "UPDATE ideas SET novelty_score = ? WHERE id = ?",
            [(score, idea_id) for idea_id, score in scores.items()],
        )
        self.conn.commit()

    def delete_low_score_ideas(self, min_score: int) -> int:
        """Delete ideas with novelty_score below min_score. Returns count deleted."""
        # Also clean up associated idea embeddings
        self.conn.execute(
            """DELETE FROM idea_embeddings WHERE idea_id IN
                (SELECT id FROM ideas WHERE novelty_score IS NOT NULL AND novelty_score < ?)""",
            (min_score,),
        )
        cursor = self.conn.execute(
            "DELETE FROM ideas WHERE novelty_score IS NOT NULL AND novelty_score < ?",
            (min_score,),
        )
        self.conn.commit()
        return cursor.rowcount

    def idea_score_distribution(self) -> dict[int, int]:
        """Return {score: count} for scored ideas."""
        rows = self.conn.execute(
            "SELECT novelty_score, COUNT(*) as cnt FROM ideas "
            "WHERE novelty_score IS NOT NULL GROUP BY novelty_score ORDER BY novelty_score"
        ).fetchall()
        return {r["novelty_score"]: r["cnt"] for r in rows}

    def ideas_below_score(self, min_score: int) -> list[dict]:
        """Return ideas with novelty_score below min_score."""
        rows = self.conn.execute(
            """SELECT i.id, i.draft_name, i.title, i.description, i.novelty_score,
                d.title AS draft_title
            FROM ideas i JOIN drafts d ON i.draft_name = d.name
            WHERE i.novelty_score IS NOT NULL AND i.novelty_score < ?
            ORDER BY i.novelty_score, i.title""",
            (min_score,),
        ).fetchall()
        return [dict(r) for r in rows]

    # --- Idea Embeddings ---

    def store_idea_embedding(self, idea_id: int, model: str, vector: np.ndarray) -> None:
        self.conn.execute(
            """INSERT INTO idea_embeddings (idea_id, model, vector, created_at)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(idea_id) DO UPDATE SET
                model=excluded.model, vector=excluded.vector, created_at=excluded.created_at
            """,
            (idea_id, model, vector.astype(np.float32).tobytes(),
             datetime.now(timezone.utc).isoformat()),
        )
        self.conn.commit()

    def all_idea_embeddings(self) -> dict[int, np.ndarray]:
        rows = self.conn.execute("SELECT idea_id, vector FROM idea_embeddings").fetchall()
        return {
            r["idea_id"]: np.frombuffer(r["vector"], dtype=np.float32)
            for r in rows
        }

    def ideas_without_embeddings(self, limit: int = 500) -> list[dict]:
        rows = self.conn.execute(
            """SELECT i.id, i.title, i.description, i.idea_type, i.draft_name
            FROM ideas i
            LEFT JOIN idea_embeddings ie ON i.id = ie.idea_id
            WHERE ie.idea_id IS NULL
            LIMIT ?""",
            (limit,),
        ).fetchall()
        return [{"id": r["id"], "title": r["title"], "description": r["description"],
                 "type": r["idea_type"], "draft_name": r["draft_name"]} for r in rows]

    # --- Gaps ---

    def insert_gaps(self, gaps: list[dict]) -> None:
        self.conn.execute("DELETE FROM gaps")  # Replace old analysis
        now = datetime.now(timezone.utc).isoformat()
        for g in gaps:
            self.conn.execute(
                """INSERT INTO gaps (topic, description, category, evidence, severity, analyzed_at)
                VALUES (?, ?, ?, ?, ?, ?)""",
                (g["topic"], g["description"], g.get("category", ""),
                 g.get("evidence", ""), g.get("severity", "medium"), now),
            )
        self.conn.commit()

    def all_gaps(self) -> list[dict]:
        rows = self.conn.execute("SELECT * FROM gaps ORDER BY id").fetchall()
        return [{"id": r["id"], "topic": r["topic"], "description": r["description"],
                 "category": r["category"], "evidence": r["evidence"],
                 "severity": r["severity"]} for r in rows]

    # --- Proposals ---

    def all_proposals(self) -> list[dict]:
        rows = self.conn.execute(
            "SELECT * FROM proposals ORDER BY updated_at DESC"
        ).fetchall()
        result = []
        for r in rows:
            p = dict(r)
            gap_rows = self.conn.execute(
                "SELECT gap_id FROM proposal_gaps WHERE proposal_id = ?", (r["id"],)
            ).fetchall()
            p["gap_ids"] = [gr["gap_id"] for gr in gap_rows]
            result.append(p)
        return result

    def get_proposal(self, proposal_id: int) -> dict | None:
        row = self.conn.execute(
            "SELECT * FROM proposals WHERE id = ?", (proposal_id,)
        ).fetchone()
        if not row:
            return None
        p = dict(row)
        gap_rows = self.conn.execute(
            "SELECT gap_id FROM proposal_gaps WHERE proposal_id = ?", (proposal_id,)
        ).fetchall()
        p["gap_ids"] = [gr["gap_id"] for gr in gap_rows]
        return p

    def get_proposal_by_slug(self, slug: str) -> dict | None:
        row = self.conn.execute(
            "SELECT * FROM proposals WHERE slug = ?", (slug,)
        ).fetchone()
        if not row:
            return None
        p = dict(row)
        gap_rows = self.conn.execute(
            "SELECT gap_id FROM proposal_gaps WHERE proposal_id = ?", (p["id"],)
        ).fetchall()
        p["gap_ids"] = [gr["gap_id"] for gr in gap_rows]
        return p

    def upsert_proposal(self, proposal: dict) -> int:
        """Insert or update a proposal. Returns the proposal ID."""
        now = datetime.now(timezone.utc).isoformat()
        if proposal.get("id"):
            self.conn.execute(
                """UPDATE proposals SET title=?, slug=?, status=?, description=?,
                    content_md=?, source_paper=?, source_url=?, intended_wg=?,
                    draft_name=?, updated_at=?
                WHERE id=?""",
                (proposal["title"], proposal["slug"], proposal.get("status", "idea"),
                 proposal.get("description", ""), proposal.get("content_md", ""),
                 proposal.get("source_paper", ""), proposal.get("source_url", ""),
                 proposal.get("intended_wg", ""), proposal.get("draft_name", ""),
                 now, proposal["id"]),
            )
            pid = proposal["id"]
        else:
            cur = self.conn.execute(
                """INSERT INTO proposals (title, slug, status, description, content_md,
                    source_paper, source_url, intended_wg, draft_name, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (proposal["title"], proposal["slug"], proposal.get("status", "idea"),
                 proposal.get("description", ""), proposal.get("content_md", ""),
                 proposal.get("source_paper", ""), proposal.get("source_url", ""),
                 proposal.get("intended_wg", ""), proposal.get("draft_name", ""),
                 now, now),
            )
            pid = cur.lastrowid
        # Update gap links
        self.conn.execute("DELETE FROM proposal_gaps WHERE proposal_id = ?", (pid,))
        for gid in proposal.get("gap_ids", []):
            self.conn.execute(
                "INSERT OR IGNORE INTO proposal_gaps (proposal_id, gap_id) VALUES (?, ?)",
                (pid, gid),
            )
        self.conn.commit()
        return pid

    def delete_proposal(self, proposal_id: int) -> bool:
        self.conn.execute("DELETE FROM proposal_gaps WHERE proposal_id = ?", (proposal_id,))
        cur = self.conn.execute("DELETE FROM proposals WHERE id = ?", (proposal_id,))
        self.conn.commit()
        return cur.rowcount > 0

    def get_proposals_for_gap(self, gap_id: int) -> list[dict]:
        rows = self.conn.execute(
            """SELECT p.* FROM proposals p
            JOIN proposal_gaps pg ON p.id = pg.proposal_id
            WHERE pg.gap_id = ?
            ORDER BY p.updated_at DESC""",
            (gap_id,),
        ).fetchall()
        return [dict(r) for r in rows]
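
    # Illustrative sketch of what the `ietf intake` CLI / Quick Generate web flow
    # hands to this layer (title, slug, and gap IDs below are made-up examples;
    # the arXiv reference comes from the commit this change belongs to):
    #
    #     pid = db.upsert_proposal({
    #         "title": "Capability Manifests for AI Agents",
    #         "slug": "capability-manifests",
    #         "description": "Derived from arXiv:2503.18813 analysis",
    #         "source_url": "https://arxiv.org/abs/2503.18813",
    #         "gap_ids": [3, 7],
    #     })
    #     db.get_proposals_for_gap(3)  # the new proposal now appears here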

    # --- Refs ---

    def insert_refs(self, draft_name: str, refs: list[tuple[str, str]]) -> None:
        """Insert cross-references for a draft. refs = [(ref_type, ref_id), ...]."""
        for ref_type, ref_id in refs:
            self.conn.execute(
                """INSERT OR IGNORE INTO draft_refs (draft_name, ref_type, ref_id)
                VALUES (?, ?, ?)""",
                (draft_name, ref_type, ref_id),
            )
        self.conn.commit()

    def get_refs_for_draft(self, draft_name: str) -> list[tuple[str, str]]:
        """Return [(ref_type, ref_id)] for a draft."""
        rows = self.conn.execute(
            "SELECT ref_type, ref_id FROM draft_refs WHERE draft_name = ?",
            (draft_name,),
        ).fetchall()
        return [(r["ref_type"], r["ref_id"]) for r in rows]

    def top_referenced(self, ref_type: str = "rfc", limit: int = 30) -> list[tuple[str, int, list[str]]]:
        """Return (ref_id, count, [draft_names]) for most-referenced items."""
        rows = self.conn.execute(
            """SELECT ref_id, COUNT(*) as cnt,
                GROUP_CONCAT(draft_name, '||') as drafts
            FROM draft_refs
            WHERE ref_type = ?
            GROUP BY ref_id
            ORDER BY cnt DESC
            LIMIT ?""",
            (ref_type, limit),
        ).fetchall()
        return [
            (r["ref_id"], r["cnt"], r["drafts"].split("||") if r["drafts"] else [])
            for r in rows
        ]

    def drafts_referencing(self, ref_type: str, ref_id: str) -> list[str]:
        """Return draft names that reference a specific RFC/draft/BCP."""
        rows = self.conn.execute(
            "SELECT draft_name FROM draft_refs WHERE ref_type = ? AND ref_id = ?",
            (ref_type, ref_id),
        ).fetchall()
        return [r["draft_name"] for r in rows]

    def ref_counts_by_draft(self) -> list[tuple[str, int, int, int]]:
        """Return (draft_name, rfc_count, draft_count, bcp_count) for all drafts with refs."""
        rows = self.conn.execute(
            """SELECT draft_name,
                SUM(CASE WHEN ref_type = 'rfc' THEN 1 ELSE 0 END) as rfcs,
                SUM(CASE WHEN ref_type = 'draft' THEN 1 ELSE 0 END) as drafts,
                SUM(CASE WHEN ref_type = 'bcp' THEN 1 ELSE 0 END) as bcps
            FROM draft_refs
            GROUP BY draft_name
            ORDER BY rfcs DESC"""
        ).fetchall()
        return [(r["draft_name"], r["rfcs"], r["drafts"], r["bcps"]) for r in rows]

    def drafts_without_refs(self, limit: int = 500) -> list[str]:
        """Return draft names that have full_text but no refs extracted yet."""
        rows = self.conn.execute(
            """SELECT d.name FROM drafts d
            LEFT JOIN draft_refs dr ON d.name = dr.draft_name
            WHERE d.full_text IS NOT NULL AND dr.draft_name IS NULL
            LIMIT ?""",
            (limit,),
        ).fetchall()
        return [r["name"] for r in rows]

    def ref_stats(self) -> dict:
        """Return summary stats for refs table."""
        row = self.conn.execute(
            """SELECT COUNT(DISTINCT draft_name) as drafts_with_refs,
                COUNT(*) as total_refs,
                SUM(CASE WHEN ref_type = 'rfc' THEN 1 ELSE 0 END) as rfc_refs,
                SUM(CASE WHEN ref_type = 'draft' THEN 1 ELSE 0 END) as draft_refs,
                SUM(CASE WHEN ref_type = 'bcp' THEN 1 ELSE 0 END) as bcp_refs,
                COUNT(DISTINCT ref_id) as unique_refs
            FROM draft_refs"""
        ).fetchone()
        return dict(row)

    # --- Generated Drafts ---

    def upsert_generated_draft(self, data: dict) -> int:
        """Insert or update a generated draft. Returns row id."""
        now = datetime.now(timezone.utc).isoformat()
        existing = self.conn.execute(
            "SELECT id FROM generated_drafts WHERE draft_name = ? AND version = ?",
            (data["draft_name"], data.get("version", 0)),
        ).fetchone()
        if existing:
            self.conn.execute(
                """UPDATE generated_drafts SET
                    gap_topic=?, title=?, abstract=?, outline_json=?,
                    sections_json=?, full_text=?, family_name=?, family_role=?,
                    rating_json=?, novelty_score=?, quality_score=?, status=?
                WHERE id=?""",
                (data["gap_topic"], data["title"], data.get("abstract", ""),
                 json.dumps(data.get("outline", {})), json.dumps(data.get("sections", [])),
                 data.get("full_text"), data.get("family_name", ""),
                 data.get("family_role", ""), json.dumps(data.get("rating", {})),
                 data.get("novelty_score", 0.0), data.get("quality_score", 0.0),
                 data.get("status", "draft"), existing["id"]),
            )
            self.conn.commit()
            return existing["id"]
        else:
            cur = self.conn.execute(
                """INSERT INTO generated_drafts
                    (gap_topic, draft_name, title, abstract, outline_json, sections_json,
                     full_text, family_name, family_role, version, rating_json,
                     novelty_score, quality_score, status, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (data["gap_topic"], data["draft_name"], data["title"],
                 data.get("abstract", ""), json.dumps(data.get("outline", {})),
                 json.dumps(data.get("sections", [])), data.get("full_text"),
                 data.get("family_name", ""), data.get("family_role", ""),
                 data.get("version", 0), json.dumps(data.get("rating", {})),
                 data.get("novelty_score", 0.0), data.get("quality_score", 0.0),
                 data.get("status", "draft"), now),
            )
            self.conn.commit()
            return cur.lastrowid

    def get_generated_drafts(self, status: str | None = None) -> list[dict]:
        query = "SELECT * FROM generated_drafts"
        params: list = []
        if status:
            query += " WHERE status = ?"
            params.append(status)
        query += " ORDER BY created_at DESC"
        rows = self.conn.execute(query, params).fetchall()
        return [dict(r) for r in rows]

    def get_generated_draft(self, draft_id: int) -> dict | None:
        row = self.conn.execute(
            "SELECT * FROM generated_drafts WHERE id = ?", (draft_id,)
        ).fetchone()
        return dict(row) if row else None

    def get_family_drafts(self, family_name: str) -> list[dict]:
        rows = self.conn.execute(
            "SELECT * FROM generated_drafts WHERE family_name = ? ORDER BY family_role",
            (family_name,),
        ).fetchall()
        return [dict(r) for r in rows]

    def log_generation_run(self, data: dict) -> int:
        now = datetime.now(timezone.utc).isoformat()
        cur = self.conn.execute(
            """INSERT INTO generation_runs
                (family_name, gap_ids, total_input_tokens, total_output_tokens,
                 model_used, status, started_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (data.get("family_name", ""), json.dumps(data.get("gap_ids", [])),
             data.get("total_input_tokens", 0), data.get("total_output_tokens", 0),
             data.get("model_used", ""), data.get("status", "running"), now),
        )
        self.conn.commit()
        return cur.lastrowid

    _GENERATION_RUN_COLUMNS = frozenset({
        "family_name", "gap_ids", "total_input_tokens", "total_output_tokens",
        "model_used", "status", "started_at", "completed_at",
    })

    def update_generation_run(self, run_id: int, **kwargs) -> None:
        sets = []
        params = []
        for k, v in kwargs.items():
            if k not in self._GENERATION_RUN_COLUMNS:
                raise ValueError(f"Invalid column for generation_runs: {k!r}")
            sets.append(f"{k} = ?")
            params.append(v)
        if not sets:
            return
        params.append(run_id)
        self.conn.execute(
            f"UPDATE generation_runs SET {', '.join(sets)} WHERE id = ?", params
        )
        self.conn.commit()
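
    # Because column names are interpolated into the UPDATE, update_generation_run
    # whitelists them against _GENERATION_RUN_COLUMNS; only values go through
    # placeholders. Typical call (values illustrative):
    #
    #     db.update_generation_run(run_id, status="completed",
    #                              total_output_tokens=12345,
    #                              completed_at=datetime.now(timezone.utc).isoformat())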

    # --- Observatory ---

    def upsert_source(self, name: str, doc_count: int = 0) -> None:
        now = datetime.now(timezone.utc).isoformat()
        self.conn.execute(
            """INSERT INTO sources (name, last_fetch, doc_count)
            VALUES (?, ?, ?)
            ON CONFLICT(name) DO UPDATE SET last_fetch=excluded.last_fetch, doc_count=excluded.doc_count""",
            (name, now, doc_count),
        )
        self.conn.commit()

    def get_source(self, name: str) -> dict | None:
        row = self.conn.execute("SELECT * FROM sources WHERE name = ?", (name,)).fetchone()
        return dict(row) if row else None

    def all_sources(self) -> list[dict]:
        rows = self.conn.execute("SELECT * FROM sources ORDER BY name").fetchall()
        return [dict(r) for r in rows]

    def create_snapshot(self) -> int:
        now = datetime.now(timezone.utc).isoformat()
        total = self.count_drafts()
        # Count new since last snapshot
        last = self.conn.execute(
            "SELECT snapshot_at FROM observatory_snapshots ORDER BY id DESC LIMIT 1"
        ).fetchone()
        new_count = 0
        if last:
            new_count = self.conn.execute(
                "SELECT COUNT(*) FROM drafts WHERE fetched_at > ?", (last["snapshot_at"],)
            ).fetchone()[0]
        else:
            new_count = total
        cur = self.conn.execute(
            """INSERT INTO observatory_snapshots (snapshot_at, total_docs, new_since_last, changed_gaps)
            VALUES (?, ?, ?, 0)""",
            (now, total, new_count),
        )
        self.conn.commit()
        return cur.lastrowid

    def record_gap_history(self, snapshot_id: int, gaps: list[dict]) -> None:
        now = datetime.now(timezone.utc).isoformat()
        for g in gaps:
            self.conn.execute(
                """INSERT INTO gap_history (snapshot_id, gap_topic, gap_description, severity, status, recorded_at)
                VALUES (?, ?, ?, ?, ?, ?)""",
                (snapshot_id, g["topic"], g["description"],
                 g.get("severity", "medium"), g.get("status", "open"), now),
            )
        self.conn.commit()

    def gap_history_timeline(self) -> list[dict]:
        rows = self.conn.execute(
            """SELECT gh.*, os.snapshot_at FROM gap_history gh
            JOIN observatory_snapshots os ON gh.snapshot_id = os.id
            ORDER BY os.snapshot_at, gh.gap_topic"""
        ).fetchall()
        return [dict(r) for r in rows]

    def get_snapshots(self, limit: int = 20) -> list[dict]:
        rows = self.conn.execute(
            "SELECT * FROM observatory_snapshots ORDER BY id DESC LIMIT ?", (limit,)
        ).fetchall()
        return [dict(r) for r in rows]

    def drafts_by_source(self, source: str, limit: int = 500) -> list[Draft]:
        rows = self.conn.execute(
            "SELECT * FROM drafts WHERE source = ? ORDER BY time DESC LIMIT ?",
            (source, limit),
        ).fetchall()
        return [self._row_to_draft(r) for r in rows]

    # --- WG/Status ---

    def draft_adoption_status(self) -> list[dict]:
        """Return adoption status for all drafts based on naming convention.

        Returns list of dicts: {name, title, time, wg_adopted, wg_name, stream}
        """
        import re
        rows = self.conn.execute(
            'SELECT name, title, time FROM drafts'
        ).fetchall()
        results = []
        for r in rows:
            name = r["name"]
            wg_adopted = False
            wg_name = ""
            stream = "individual"

            # Primary signal: draft-ietf-{wg}-* naming convention
            m = re.match(r'^draft-ietf-(\w+)-', name)
            if m:
                wg_adopted = True
                wg_name = m.group(1)
                stream = "ietf"
            elif name.startswith("draft-irtf-"):
                m2 = re.match(r'^draft-irtf-(\w+)-', name)
                wg_name = m2.group(1) if m2 else ""
                stream = "irtf"

            results.append({
                "name": name,
                "title": r["title"],
                "time": r["time"],
                "wg_adopted": wg_adopted,
                "wg_name": wg_name,
                "stream": stream,
            })
        return results

    def revision_velocity(self) -> list[dict]:
        """Return revision data for all drafts.

        Returns list of dicts: {name, title, time, rev, rev_int}
        """
        rows = self.conn.execute(
            "SELECT name, title, time, rev FROM drafts"
        ).fetchall()
        return [
            {
                "name": r["name"],
                "title": r["title"],
                "time": r["time"],
                "rev": r["rev"],
                "rev_int": int(r["rev"]) if r["rev"].isdigit() else 0,
            }
            for r in rows
        ]
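
    # How draft_adoption_status() classifies names (the draft names below are
    # illustrative, not records in the database):
    #   draft-ietf-oauth-ai-agents-00    -> wg_adopted=True,  wg_name="oauth", stream="ietf"
    #   draft-irtf-nmrg-ai-challenges-01 -> wg_adopted=False, wg_name="nmrg",  stream="irtf"
    #   draft-smith-agent-discovery-00   -> wg_adopted=False, wg_name="",      stream="individual"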

    # --- Working Groups ---

    def wg_summary(self) -> list[dict]:
        """Return per-WG summary: group, draft_count, avg scores, categories, idea_count.

        Excludes 'none' (individual submissions) — those are returned separately.
        """
        rows = self.conn.execute("""
            SELECT d."group" as wg, COUNT(*) as draft_count,
                AVG(r.novelty) as avg_novelty, AVG(r.maturity) as avg_maturity,
                AVG(r.overlap) as avg_overlap, AVG(r.momentum) as avg_momentum,
                AVG(r.relevance) as avg_relevance,
                (SELECT COUNT(*) FROM ideas i WHERE i.draft_name IN
                    (SELECT name FROM drafts WHERE "group" = d."group")) as idea_count
            FROM drafts d
            LEFT JOIN ratings r ON d.name = r.draft_name
            WHERE d."group" IS NOT NULL AND d."group" != '' AND d."group" != 'none'
            GROUP BY d."group"
            ORDER BY draft_count DESC
        """).fetchall()

        # Build categories per WG from a separate query
        cat_rows = self.conn.execute("""
            SELECT d."group" as wg, r.categories
            FROM drafts d JOIN ratings r ON d.name = r.draft_name
            WHERE d."group" IS NOT NULL AND d."group" != '' AND d."group" != 'none'
        """).fetchall()
        wg_cats: dict[str, dict[str, int]] = {}
        for cr in cat_rows:
            wg = cr["wg"]
            if wg not in wg_cats:
                wg_cats[wg] = {}
            try:
                for c in json.loads(cr["categories"]):
                    c = normalize_category(c)
                    wg_cats[wg][c] = wg_cats[wg].get(c, 0) + 1
            except (json.JSONDecodeError, TypeError):
                pass

        results = []
        for r in rows:
            results.append({
                "wg": r["wg"],
                "draft_count": r["draft_count"],
                "avg_novelty": round(r["avg_novelty"] or 0, 1),
                "avg_maturity": round(r["avg_maturity"] or 0, 1),
                "avg_overlap": round(r["avg_overlap"] or 0, 1),
                "avg_momentum": round(r["avg_momentum"] or 0, 1),
                "avg_relevance": round(r["avg_relevance"] or 0, 1),
                "categories": wg_cats.get(r["wg"], {}),
                "idea_count": r["idea_count"],
            })
        return results

    def wg_drafts(self, wg: str) -> list[Draft]:
        """Return all drafts for a specific working group."""
        rows = self.conn.execute(
            'SELECT * FROM drafts WHERE "group" = ? ORDER BY time DESC', (wg,)
        ).fetchall()
        return [self._row_to_draft(r) for r in rows]

    def wg_category_matrix(self) -> dict[str, dict[str, int]]:
        """Return {wg: {category: count}} matrix for all WGs (excluding 'none')."""
        rows = self.conn.execute("""
            SELECT d."group" as wg, r.categories
            FROM drafts d
            JOIN ratings r ON d.name = r.draft_name
            WHERE d."group" IS NOT NULL AND d."group" != '' AND d."group" != 'none'
        """).fetchall()
        matrix: dict[str, dict[str, int]] = {}
        for r in rows:
            wg = r["wg"]
            if wg not in matrix:
                matrix[wg] = {}
            try:
                for c in json.loads(r["categories"]):
                    c = normalize_category(c)
                    matrix[wg][c] = matrix[wg].get(c, 0) + 1
            except (json.JSONDecodeError, TypeError):
                pass
        return matrix

    def wg_idea_overlap(self) -> list[dict]:
        """Find ideas that appear across multiple WGs — signals for alignment.

        Returns list of {idea_title, wgs: [{wg, draft_name, draft_title}], wg_count}.
        """
        rows = self.conn.execute("""
            SELECT i.title as idea_title, i.description, d."group" as wg,
                d.name as draft_name, d.title as draft_title
            FROM ideas i
            JOIN drafts d ON i.draft_name = d.name
            WHERE d."group" IS NOT NULL AND d."group" != ''
            ORDER BY i.title, d."group"
        """).fetchall()

        # Group by idea title
        from collections import defaultdict
        idea_groups: dict[str, list[dict]] = defaultdict(list)
        for r in rows:
            idea_groups[r["idea_title"]].append({
                "wg": r["wg"],
                "draft_name": r["draft_name"],
                "draft_title": r["draft_title"],
            })

        # Only keep ideas spanning 2+ distinct WGs
        results = []
        for title, entries in idea_groups.items():
            wgs = set(e["wg"] for e in entries)
            if len(wgs) >= 2:
                results.append({
                    "idea_title": title,
                    "wgs": entries,
                    "wg_count": len(wgs),
                    "wg_names": sorted(wgs),
                })
        return sorted(results, key=lambda x: x["wg_count"], reverse=True)

    def individual_vs_wg_categories(self) -> dict[str, dict[str, int]]:
        """Compare category distribution: individual submissions vs WG-adopted.

        Returns {"individual": {cat: count}, "wg_adopted": {cat: count}}.
        """
        rows = self.conn.execute("""
            SELECT CASE WHEN d."group" = 'none' OR d."group" IS NULL THEN 'individual'
                        ELSE 'wg_adopted' END as stream,
                r.categories
            FROM drafts d
            JOIN ratings r ON d.name = r.draft_name
        """).fetchall()
        result: dict[str, dict[str, int]] = {"individual": {}, "wg_adopted": {}}
        for r in rows:
            stream = r["stream"]
            try:
                for c in json.loads(r["categories"]):
                    c = normalize_category(c)
                    result[stream][c] = result[stream].get(c, 0) + 1
            except (json.JSONDecodeError, TypeError):
                pass
        return result

    def category_wg_spread(self) -> list[dict]:
        """For each category, which WGs contribute drafts? High spread = alignment opportunity.

        Returns [{category, wgs: [{wg, count}], wg_count, total_drafts}].
        """
        rows = self.conn.execute("""
            SELECT d."group" as wg, r.categories
            FROM drafts d
            JOIN ratings r ON d.name = r.draft_name
            WHERE d."group" IS NOT NULL AND d."group" != ''
        """).fetchall()

        from collections import defaultdict
        cat_wgs: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
        for r in rows:
            wg = r["wg"]
            try:
                for c in json.loads(r["categories"]):
                    c = normalize_category(c)
                    cat_wgs[c][wg] += 1
            except (json.JSONDecodeError, TypeError):
                pass

        results = []
        for cat, wg_counts in cat_wgs.items():
            wg_list = sorted(wg_counts.items(), key=lambda x: x[1], reverse=True)
            results.append({
                "category": cat,
                "wgs": [{"wg": wg, "count": cnt} for wg, cnt in wg_list],
                "wg_count": len(wg_list),
                "total_drafts": sum(wg_counts.values()),
            })
        return sorted(results, key=lambda x: x["wg_count"], reverse=True)

    # --- Monitor Runs ---

    def start_monitor_run(self) -> int:
        now = datetime.now(timezone.utc).isoformat()
        cur = self.conn.execute(
            "INSERT INTO monitor_runs (started_at, status) VALUES (?, 'running')",
            (now,),
        )
        self.conn.commit()
        return cur.lastrowid

    def complete_monitor_run(self, run_id: int, stats: dict) -> None:
        now = datetime.now(timezone.utc).isoformat()
        started = self.conn.execute(
            "SELECT started_at FROM monitor_runs WHERE id = ?", (run_id,)
        ).fetchone()
        duration = 0.0
        if started:
            try:
                start_dt = datetime.fromisoformat(started["started_at"])
                duration = (datetime.now(timezone.utc) - start_dt).total_seconds()
            except (ValueError, TypeError):
                pass
        self.conn.execute(
            """UPDATE monitor_runs SET
                status='completed', completed_at=?,
                new_drafts_found=?, drafts_analyzed=?,
                drafts_embedded=?, ideas_extracted=?,
                duration_seconds=?
            WHERE id=?""",
            (now, stats.get("new_drafts_found", 0), stats.get("drafts_analyzed", 0),
             stats.get("drafts_embedded", 0), stats.get("ideas_extracted", 0),
             duration, run_id),
        )
        self.conn.commit()

    def fail_monitor_run(self, run_id: int, error: str) -> None:
        now = datetime.now(timezone.utc).isoformat()
        started = self.conn.execute(
            "SELECT started_at FROM monitor_runs WHERE id = ?", (run_id,)
        ).fetchone()
        duration = 0.0
        if started:
            try:
                start_dt = datetime.fromisoformat(started["started_at"])
                duration = (datetime.now(timezone.utc) - start_dt).total_seconds()
            except (ValueError, TypeError):
                pass
        self.conn.execute(
            """UPDATE monitor_runs SET
                status='failed', completed_at=?, error_message=?, duration_seconds=?
            WHERE id=?""",
            (now, error, duration, run_id),
        )
        self.conn.commit()

    def get_monitor_runs(self, limit: int = 20) -> list[dict]:
        rows = self.conn.execute(
            "SELECT * FROM monitor_runs ORDER BY started_at DESC LIMIT ?", (limit,)
        ).fetchall()
        return [dict(r) for r in rows]

    def get_last_successful_run(self) -> dict | None:
        row = self.conn.execute(
            "SELECT * FROM monitor_runs WHERE status='completed' ORDER BY started_at DESC LIMIT 1"
        ).fetchone()
        return dict(row) if row else None

    # --- Annotations ---

    def upsert_annotation(self, draft_name: str, note: str | None = None, tags: list[str] | None = None) -> None:
        """Insert or update an annotation for a draft."""
        now = datetime.now(timezone.utc).isoformat()
        existing = self.conn.execute(
            "SELECT id, note, tags FROM annotations WHERE draft_name = ?",
            (draft_name,),
        ).fetchone()
        if existing:
            current_note = note if note is not None else existing["note"]
            current_tags = tags if tags is not None else json.loads(existing["tags"] or "[]")
            self.conn.execute(
                "UPDATE annotations SET note = ?, tags = ?, updated_at = ? WHERE draft_name = ?",
                (current_note, json.dumps(current_tags), now, draft_name),
            )
        else:
            self.conn.execute(
                """INSERT INTO annotations (draft_name, note, tags, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?)""",
                (draft_name, note or "", json.dumps(tags or []), now, now),
            )
        self.conn.commit()

    def get_annotation(self, draft_name: str) -> dict | None:
        """Return annotation for a draft, or None."""
        row = self.conn.execute(
            "SELECT * FROM annotations WHERE draft_name = ?", (draft_name,)
        ).fetchone()
        if not row:
            return None
        return {
            "id": row["id"],
            "draft_name": row["draft_name"],
            "note": row["note"],
            "tags": json.loads(row["tags"] or "[]"),
            "created_at": row["created_at"],
            "updated_at": row["updated_at"],
        }

    def get_all_annotations(self) -> list[dict]:
        """Return all annotations."""
        rows = self.conn.execute(
            "SELECT * FROM annotations ORDER BY updated_at DESC"
        ).fetchall()
        return [
            {
                "id": r["id"],
                "draft_name": r["draft_name"],
                "note": r["note"],
                "tags": json.loads(r["tags"] or "[]"),
                "created_at": r["created_at"],
                "updated_at": r["updated_at"],
            }
            for r in rows
        ]

    def search_by_tag(self, tag: str) -> list[str]:
        """Return draft names that have a specific tag in their annotation."""
        rows = self.conn.execute(
            "SELECT draft_name, tags FROM annotations"
        ).fetchall()
        results = []
        for r in rows:
            tags = json.loads(r["tags"] or "[]")
            if tag in tags:
                results.append(r["draft_name"])
        return results

    # --- Helpers ---

    @staticmethod
    def _row_to_draft(row: sqlite3.Row) -> Draft:
        d = dict(row)
        return Draft(
            name=d["name"], rev=d["rev"], title=d["title"], abstract=d["abstract"],
            time=d["time"], dt_id=d.get("dt_id"), pages=d.get("pages"),
            words=d.get("words"), group=d.get("group"), group_uri=d.get("group_uri"),
            expires=d.get("expires"), ad=d.get("ad"), shepherd=d.get("shepherd"),
            states=json.loads(d.get("states") or "[]"),
            full_text=d.get("full_text"),
            categories=json.loads(d.get("categories") or "[]"),
            tags=json.loads(d.get("tags") or "[]"),
            fetched_at=d.get("fetched_at"),
            source=d.get("source", "ietf"),
            source_id=d.get("source_id", ""),
            source_url=d.get("source_url", ""),
            doc_status=d.get("doc_status", ""),
        )

    @staticmethod
    def _row_to_rating(row: sqlite3.Row) -> Rating:
        d = dict(row)
        raw_cats = json.loads(d.get("categories") or "[]")
        return Rating(
            draft_name=d["draft_name"], novelty=d["novelty"], maturity=d["maturity"],
            overlap=d["overlap"], momentum=d["momentum"], relevance=d["relevance"],
            summary=d["summary"],
            novelty_note=d.get("novelty_note", ""),
            maturity_note=d.get("maturity_note", ""),
            overlap_note=d.get("overlap_note", ""),
            momentum_note=d.get("momentum_note", ""),
            relevance_note=d.get("relevance_note", ""),
            categories=[normalize_category(c) for c in raw_cats],
            rated_at=d.get("rated_at"),
        )
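

# Minimal smoke-test sketch (illustrative only; relies solely on methods defined
# above and whatever database Config.load() points at — not a real test suite):
if __name__ == "__main__":
    with Database() as db:
        print("drafts:", db.count_drafts())
        print("ideas:", db.idea_count())
        print("proposals:", len(db.all_proposals()))
        in_tok, out_tok = db.total_tokens_used()
        print("llm tokens:", in_tok, "in /", out_tok, "out")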