Files
ietf-draft-analyzer/src/webui/data.py
Christian Nennemann 02049c37a8 Show related drafts in author cluster cards
Each cluster card now shows draft count badge and up to 5 linked draft
titles with clickable links to draft detail pages. Data collected from
member nodes' draft lists during cluster detection.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 21:23:13 +01:00

1232 lines
43 KiB
Python

"""Data access layer for the web dashboard.
Thin wrapper around ietf_analyzer.db.Database that returns plain dicts
ready for JSON serialization or Jinja2 template rendering.
"""
from __future__ import annotations
import json
import sys
from collections import Counter, defaultdict
from pathlib import Path
# Make the in-repo ``src`` directory importable so ``ietf_analyzer`` resolves
# even when the package has not been installed.
_project_root = Path(__file__).resolve().parent.parent.parent
_src_dir = str(_project_root / "src")
# Guard on the exact entry that gets inserted. Testing the project root while
# inserting ``src`` would skip the insert whenever the root is already on the
# path, leaving ``ietf_analyzer`` unimportable.
if _src_dir not in sys.path:
    sys.path.insert(0, _src_dir)
from ietf_analyzer.config import Config
from ietf_analyzer.db import Database
def get_db() -> Database:
    """Build a ``Database`` from the configuration returned by ``Config.load()``."""
    return Database(Config.load())
def get_overview_stats(db: Database) -> dict:
    """Collect the headline numbers shown on the dashboard home page.

    Returns a dict with the keys ``total_drafts``, ``rated_count``,
    ``author_count``, ``idea_count``, ``gap_count``, ``input_tokens`` and
    ``output_tokens``.
    """
    stats = {
        "total_drafts": db.count_drafts(),
        # Counted from a fetch that is capped at 1000 rated drafts.
        "rated_count": len(db.drafts_with_ratings(limit=1000)),
        "author_count": db.author_count(),
        "idea_count": db.idea_count(),
        "gap_count": len(db.all_gaps()),
    }
    stats["input_tokens"], stats["output_tokens"] = db.total_tokens_used()
    return stats
def get_category_counts(db: Database) -> dict[str, int]:
    """Count rated drafts per category, most frequent category first.

    A draft contributes once to every category listed on its rating.
    """
    tally = Counter(
        category
        for _draft, rating in db.drafts_with_ratings(limit=1000)
        for category in rating.categories
    )
    return dict(tally.most_common())
def get_drafts_page(
    db: Database,
    page: int = 1,
    per_page: int = 50,
    search: str = "",
    category: str = "",
    min_score: float = 0.0,
    sort: str = "score",
    sort_dir: str = "desc",
    source: str = "",
) -> dict:
    """Return one page of rated drafts after filtering and sorting.

    Args:
        db: Database handle used for all lookups.
        page: 1-based page number; clamped into the valid range.
        per_page: Page size; values below 1 are treated as 1.
        search: Whitespace-separated words that must all occur
            (case-insensitively) in the draft name, title, rating summary or
            author names.
        category: If set, keep only drafts whose rating lists this category.
        min_score: If positive, drop drafts with a lower composite score.
        sort: One of ``score``, ``name``, ``date``, ``novelty``, ``maturity``,
            ``relevance``, ``overlap``, ``momentum`` or ``readiness``; any
            other value falls back to ``score``.
        sort_dir: ``"desc"`` sorts descending, anything else ascending.
        source: If set, keep only drafts whose ``source`` equals this value.

    Returns:
        Dict with the keys ``drafts``, ``total``, ``page``, ``per_page`` and
        ``pages``.
    """
    # A page size of 0 would divide by zero below; negative sizes would give
    # nonsensical page counts and empty slices.
    per_page = max(1, per_page)

    pairs = db.drafts_with_ratings(limit=1000)

    # Normalize the query once instead of once per draft.
    search_words = search.lower().split()

    # draft_name -> "author1 author2 ...", only needed when searching.
    author_text_by_draft: dict[str, str] = {}
    if search_words:
        rows = db.conn.execute(
            """SELECT da.draft_name, GROUP_CONCAT(a.name, ' ') as names
               FROM draft_authors da JOIN authors a ON da.person_id = a.person_id
               GROUP BY da.draft_name"""
        ).fetchall()
        author_text_by_draft = {row[0]: row[1] or "" for row in rows}

    def matches(draft, rating) -> bool:
        """Apply the score, category, source and text filters to one draft."""
        if min_score > 0 and rating.composite_score < min_score:
            return False
        if category and category not in rating.categories:
            return False
        if source and draft.source != source:
            return False
        if search_words:
            author_names = author_text_by_draft.get(draft.name, "")
            haystack = f"{draft.name} {draft.title} {rating.summary} {author_names}".lower()
            return all(word in haystack for word in search_words)
        return True

    def quick_readiness(pair) -> float:
        """Cheap readiness proxy used only for ordering the full result set."""
        draft, rating = pair
        try:
            revision = int(draft.rev or "0")
        except (TypeError, ValueError):
            # A malformed revision should not break sorting; rank it lowest.
            revision = 0
        return (
            (1.0 if draft.name.startswith("draft-ietf-") else 0.0) * 0.25
            + min(revision / 5.0, 1.0) * 0.15
            + ((rating.momentum - 1) / 4.0) * 0.15
        )

    filtered = [pair for pair in pairs if matches(*pair)]

    sort_keys = {
        "score": lambda p: p[1].composite_score,
        "name": lambda p: p[0].name,
        "date": lambda p: p[0].time or "",
        "novelty": lambda p: p[1].novelty,
        "maturity": lambda p: p[1].maturity,
        "relevance": lambda p: p[1].relevance,
        "overlap": lambda p: p[1].overlap,
        "momentum": lambda p: p[1].momentum,
        "readiness": quick_readiness,
    }
    filtered.sort(key=sort_keys.get(sort, sort_keys["score"]), reverse=sort_dir == "desc")

    total = len(filtered)
    pages = max(1, (total + per_page - 1) // per_page)
    page = max(1, min(page, pages))
    start = (page - 1) * per_page
    page_items = filtered[start : start + per_page]

    # The full readiness computation is only run for the drafts on this page.
    from ietf_analyzer.readiness import compute_readiness

    drafts = []
    for draft, rating in page_items:
        readiness = compute_readiness(db, draft.name)
        drafts.append({
            "name": draft.name,
            "title": draft.title,
            "date": draft.date,
            "url": draft.source_url if draft.source != "ietf" else draft.datatracker_url,
            "pages": draft.pages or 0,
            "group": draft.group or "individual",
            "source": draft.source or "ietf",
            "score": round(rating.composite_score, 2),
            "novelty": rating.novelty,
            "maturity": rating.maturity,
            "overlap": rating.overlap,
            "momentum": rating.momentum,
            "relevance": rating.relevance,
            "categories": rating.categories,
            "summary": rating.summary,
            "readiness": readiness.get("score", 0),
        })

    return {
        "drafts": drafts,
        "total": total,
        "page": page,
        "per_page": per_page,
        "pages": pages,
    }
def get_draft_detail(db: Database, name: str) -> dict | None:
    """Assemble everything known about one draft, or ``None`` if it is unknown.

    The result holds the draft's own fields, its authors, ideas and
    references, a ``rating`` sub-dict when a rating exists, the ``readiness``
    value from ``compute_readiness`` and the stored ``annotation``.
    """
    draft = db.get_draft(name)
    if not draft:
        return None

    rating = db.get_rating(name)
    author_rows = db.get_authors_for_draft(name)
    ideas = db.get_ideas_for_draft(name)
    ref_pairs = db.get_refs_for_draft(name)

    detail = {
        "name": draft.name,
        "title": draft.title,
        "rev": draft.rev,
        "abstract": draft.abstract,
        "date": draft.date,
        "time": draft.time,
        "url": draft.datatracker_url,
        "text_url": draft.text_url,
        "pages": draft.pages,
        "words": draft.words,
        "group": draft.group or "individual",
        "categories": draft.categories,
        "tags": draft.tags,
    }

    people = []
    for author in author_rows:
        people.append({
            "name": author.name,
            "affiliation": author.affiliation,
            "person_id": author.person_id,
        })
    detail["authors"] = people
    detail["ideas"] = ideas
    detail["refs"] = [{"type": ref_type, "id": ref_id} for ref_type, ref_id in ref_pairs]

    if rating:
        dimensions = ("novelty", "maturity", "overlap", "momentum", "relevance")
        rating_view = {"score": round(rating.composite_score, 2)}
        for dim in dimensions:
            rating_view[dim] = getattr(rating, dim)
        rating_view["summary"] = rating.summary
        for dim in dimensions:
            rating_view[f"{dim}_note"] = getattr(rating, f"{dim}_note")
        rating_view["categories"] = rating.categories
        detail["rating"] = rating_view

    # Imported lazily, only once a draft has actually been found.
    from ietf_analyzer.readiness import compute_readiness

    detail["readiness"] = compute_readiness(db, name)
    detail["annotation"] = db.get_annotation(name)
    return detail
def get_rating_distributions(db: Database) -> dict:
    """Return parallel per-draft lists of rating values for plotting.

    Every list in the result has one entry per rated draft, in the order the
    database returned them: the five rating dimensions, the rounded composite
    ``scores``, the primary ``categories`` (``"Other"`` when a rating has
    none) and the draft ``names``.
    """
    axes = ("novelty", "maturity", "overlap", "momentum", "relevance")
    dims = {axis: [] for axis in axes}
    dims.update(scores=[], categories=[], names=[])

    for draft, rating in db.drafts_with_ratings(limit=1000):
        for axis in axes:
            dims[axis].append(getattr(rating, axis))
        dims["scores"].append(round(rating.composite_score, 2))
        primary = rating.categories[0] if rating.categories else "Other"
        dims["categories"].append(primary)
        dims["names"].append(draft.name)
    return dims
def get_timeline_data(db: Database) -> dict:
    """Return per-month draft counts for the ten most common categories.

    Only rated drafts are counted, each under its primary category
    (``"Other"`` when the rating has none). Drafts without a timestamp land
    in the ``"unknown"`` month. The result has ``months`` (sorted),
    ``categories`` (top ten by total count) and ``series`` mapping each of
    those categories to a list of counts aligned with ``months``.
    """
    rated = db.drafts_with_ratings(limit=1000)
    ordered = db.list_drafts(limit=1000, order_by="time ASC")
    ratings_by_name = {draft.name: rating for draft, rating in rated}

    per_month: dict[str, Counter] = defaultdict(Counter)
    for draft in ordered:
        month = draft.time[:7] if draft.time else "unknown"
        rating = ratings_by_name.get(draft.name)
        if not rating:
            continue
        category = rating.categories[0] if rating.categories else "Other"
        per_month[month][category] += 1

    months = sorted(per_month)

    totals: Counter = Counter()
    for counts in per_month.values():
        totals.update(counts)
    top_cats = [category for category, _ in totals.most_common(10)]

    series = {
        category: [per_month[month].get(category, 0) for month in months]
        for category in top_cats
    }
    return {"months": months, "series": series, "categories": top_cats}
def get_ideas_by_type(db: Database) -> dict:
    """Return all ideas together with a per-type tally.

    Ideas whose ``type`` is missing or falsy are counted as ``"other"``.
    ``by_type`` is ordered from most to least frequent.
    """
    ideas = db.all_ideas()
    tally: Counter = Counter()
    for idea in ideas:
        tally[idea.get("type", "other") or "other"] += 1
    return {
        "total": len(ideas),
        "by_type": dict(tally.most_common()),
        "ideas": ideas,
    }
def get_all_gaps(db: Database) -> list[dict]:
    """Return every gap, ordered critical, high, medium, low.

    A gap without a ``severity`` key sorts as ``low``; an unrecognized
    severity value sorts after everything else. The list returned by the
    database is sorted in place and handed back.
    """
    rank = {"critical": 0, "high": 1, "medium": 2, "low": 3}

    def severity_rank(gap: dict) -> int:
        return rank.get(gap.get("severity", "low"), 99)

    gaps = db.all_gaps()
    gaps.sort(key=severity_rank)
    return gaps
def get_gap_detail(db: Database, gap_id: int) -> dict | None:
    """Look up one gap by its ``id``; ``None`` when no gap has that id."""
    return next((gap for gap in db.all_gaps() if gap["id"] == gap_id), None)
def get_generated_drafts() -> list[dict]:
    """List the ``draft-*.txt`` files under ``data/reports/generated-drafts/``.

    Each entry carries ``filename``, ``stem``, ``title``, ``size`` and
    ``path``. The title is the first non-blank line that does not start with
    one of the header prefixes ``Internet-Draft``, ``Intended status`` or
    ``Expires:``; the file stem is used when no such line exists. Returns an
    empty list when the directory is missing.
    """
    drafts_dir = _project_root / "data" / "reports" / "generated-drafts"
    if not drafts_dir.exists():
        return []

    header_prefixes = ("Internet-Draft", "Intended status", "Expires:")
    listing = []
    for draft_file in sorted(drafts_dir.glob("draft-*.txt")):
        candidates = (
            line.strip()
            for line in draft_file.read_text(errors="replace").splitlines()
        )
        title = next(
            (text for text in candidates if text and not text.startswith(header_prefixes)),
            draft_file.stem,
        )
        listing.append({
            "filename": draft_file.name,
            "stem": draft_file.stem,
            "title": title,
            "size": draft_file.stat().st_size,
            "path": str(draft_file),
        })
    return listing
def read_generated_draft(filename: str) -> str | None:
    """Read one generated draft from ``data/reports/generated-drafts/``.

    Args:
        filename: Name of the file, relative to the generated-drafts
            directory.

    Returns:
        The file's text (undecodable bytes replaced), or ``None`` when the
        name does not resolve to a regular file inside that directory.
    """
    drafts_dir = _project_root / "data" / "reports" / "generated-drafts"
    try:
        base = drafts_dir.resolve()
        target = (drafts_dir / filename).resolve()
        # Containment is checked on whole path components and before touching
        # the file. A plain string-prefix test would also accept sibling
        # directories such as "generated-drafts-old".
        target.relative_to(base)
    except (ValueError, OSError, RuntimeError):
        # Outside the directory, or a name that cannot be resolved at all.
        return None
    if not target.is_file():
        return None
    return target.read_text(errors="replace")
def get_top_authors(db: Database, limit: int = 30) -> list[dict]:
    """Return up to ``limit`` rows from ``db.top_authors`` as dicts.

    Each dict has ``name``, ``affiliation``, ``draft_count`` and ``drafts``.
    """
    authors = []
    for name, affiliation, draft_count, drafts in db.top_authors(limit=limit):
        authors.append({
            "name": name,
            "affiliation": affiliation,
            "draft_count": draft_count,
            "drafts": drafts,
        })
    return authors
def get_org_data(db: Database, limit: int = 20) -> list[dict]:
    """Return up to ``limit`` rows from ``db.top_orgs`` as dicts.

    Each dict has ``org``, ``author_count`` and ``draft_count``.
    """
    orgs = []
    for org, author_count, draft_count in db.top_orgs(limit=limit):
        orgs.append({
            "org": org,
            "author_count": author_count,
            "draft_count": draft_count,
        })
    return orgs
def get_category_radar_data(db: Database) -> dict:
    """Return mean rating profiles for the eight most populated categories.

    A rating counts toward every category it lists. Each profile holds the
    ``count`` of ratings plus the means (rounded to two decimals) of
    novelty, maturity, relevance and momentum, and ``low_overlap``, the mean
    of ``6 - overlap``.
    """
    by_category: dict[str, list] = defaultdict(list)
    for _draft, rating in db.drafts_with_ratings(limit=1000):
        for category in rating.categories:
            by_category[category].append(rating)

    ranked = sorted(by_category, key=lambda name: len(by_category[name]), reverse=True)

    def mean(values: list) -> float:
        return round(sum(values) / len(values), 2)

    profiles = {}
    for category in ranked[:8]:
        group = by_category[category]
        profiles[category] = {
            "count": len(group),
            "novelty": mean([r.novelty for r in group]),
            "maturity": mean([r.maturity for r in group]),
            "relevance": mean([r.relevance for r in group]),
            "momentum": mean([r.momentum for r in group]),
            "low_overlap": mean([6 - r.overlap for r in group]),
        }
    return profiles
def get_score_histogram(db: Database) -> list[float]:
    """Return every rated draft's composite score, rounded to two decimals."""
    scores = []
    for _draft, rating in db.drafts_with_ratings(limit=1000):
        scores.append(round(rating.composite_score, 2))
    return scores
def get_coauthor_network(db: Database, min_shared: int = 1) -> dict:
    """Build the co-authorship graph for the force-directed view.

    An edge is kept when two authors share at least ``min_shared`` drafts;
    only authors that appear on a kept edge become nodes. Organization and
    draft count come from the top-100 author list; authors outside that list
    get an empty org and a draft count of 1.

    Returns ``{"nodes": [{id, name, org, draft_count}],
    "edges": [{source, target, weight}]}``.
    """
    pairs = db.coauthor_pairs()
    profiles = {
        name: {"org": affiliation, "draft_count": count}
        for name, affiliation, count, _drafts in db.top_authors(limit=100)
    }

    edges = [
        {"source": first, "target": second, "weight": shared}
        for first, second, shared in pairs
        if shared >= min_shared
    ]

    connected = set()
    for edge in edges:
        connected.update((edge["source"], edge["target"]))

    fallback = {"org": "", "draft_count": 1}
    nodes = []
    for author in connected:
        profile = profiles.get(author, fallback)
        nodes.append({
            "id": author,
            "name": author,
            "org": profile["org"],
            "draft_count": profile["draft_count"],
        })
    return {"nodes": nodes, "edges": edges}
def get_similarity_graph(db: Database, threshold: float = 0.75) -> dict:
    """Build the draft-similarity graph for the force-directed view.

    Cosine similarity is computed between the embeddings of all drafts that
    also have a rating. Every unordered pair whose similarity is at least
    ``threshold`` becomes an edge, and only drafts on at least one edge
    become nodes.

    Returns ``{"nodes": [{name, title, category, score}],
    "edges": [{source, target, similarity}],
    "stats": {node_count, edge_count, avg_similarity}}``. All three are
    empty/zero when fewer than two usable drafts exist.
    """
    import numpy as np

    def nothing() -> dict:
        return {"nodes": [], "edges": [], "stats": {"node_count": 0, "edge_count": 0, "avg_similarity": 0}}

    embeddings = db.all_embeddings()
    if len(embeddings) < 2:
        return nothing()

    rated = db.drafts_with_ratings(limit=1000)
    ratings = {draft.name: rating for draft, rating in rated}
    drafts = {draft.name: draft for draft, _ in rated}

    names = [name for name in embeddings if name in ratings]
    if len(names) < 2:
        return nothing()

    vectors = np.array([embeddings[name] for name in names])
    lengths = np.linalg.norm(vectors, axis=1, keepdims=True)
    lengths[lengths == 0] = 1.0  # leave all-zero vectors untouched instead of dividing by 0
    unit = vectors / lengths
    similarity = unit @ unit.T

    # Walk the strict upper triangle row by row so each pair is seen once.
    edges = []
    connected = set()
    for i, j in zip(*np.triu_indices(len(names), k=1)):
        value = float(similarity[i, j])
        if value < threshold:
            continue
        edges.append({"source": names[i], "target": names[j], "similarity": round(value, 4)})
        connected.add(names[i])
        connected.add(names[j])

    nodes = []
    for name in names:
        if name not in connected:
            continue
        rating = ratings[name]
        draft = drafts.get(name)
        nodes.append({
            "name": name,
            "title": draft.title if draft else name,
            "category": rating.categories[0] if rating.categories else "Other",
            "score": round(rating.composite_score, 2),
        })

    avg_sim = round(sum(edge["similarity"] for edge in edges) / max(len(edges), 1), 4)
    return {
        "nodes": nodes,
        "edges": edges,
        "stats": {"node_count": len(nodes), "edge_count": len(edges), "avg_similarity": avg_sim},
    }
def get_cross_org_data(db: Database, limit: int = 20) -> list[dict]:
    """Return up to ``limit`` rows from ``db.cross_org_collaborations`` as dicts.

    Each dict has ``org_a``, ``org_b`` and ``shared_drafts``.
    """
    collaborations = []
    for org_a, org_b, shared in db.cross_org_collaborations(limit=limit):
        collaborations.append({"org_a": org_a, "org_b": org_b, "shared_drafts": shared})
    return collaborations
def get_author_network_full(db: Database) -> dict:
    """Return the enriched co-authorship network with scores and clusters.

    Nodes are authors who share at least two drafts with another author,
    plus any author from the top-500 list with three or more drafts. Edges
    connect authors sharing at least two drafts. Clusters are the connected
    components of that edge set containing two or more authors, largest
    first.

    Returns:
        {
            nodes: [{id, name, org, draft_count, avg_score, drafts: [name,...]}],
            edges: [{source, target, weight}],
            clusters: [{id, members: [name,...], org_mix: {org: count}, size,
                        drafts: [{name, title}], draft_count}],
        }
        ``nodes[].drafts`` is capped at 8 entries and ``clusters[].drafts``
        at 15; ``draft_count`` on a cluster is the uncapped number of
        distinct drafts. Cluster ``id`` values are assigned in discovery
        order, before the clusters are sorted by size.
    """
    # deque gives O(1) pops from the left; list.pop(0) shifts the whole list.
    from collections import deque

    pairs = db.coauthor_pairs()
    top = db.top_authors(limit=500)

    # Composite score per rated draft, used for each author's average.
    rated = db.drafts_with_ratings(limit=2000)
    draft_score = {d.name: r.composite_score for d, r in rated}

    author_info = {}
    for name, aff, cnt, drafts in top:
        scores = [draft_score[dn] for dn in drafts if dn in draft_score]
        avg = round(sum(scores) / len(scores), 2) if scores else 0
        author_info[name] = {
            "org": aff, "draft_count": cnt, "drafts": drafts, "avg_score": avg
        }

    # Authors with meaningful collaboration (2+ shared drafts).
    node_set = set()
    edges = []
    for a, b, shared in pairs:
        if shared >= 2:
            node_set.add(a)
            node_set.add(b)
            edges.append({"source": a, "target": b, "weight": shared})

    # Prolific authors (3+ drafts) are shown even without co-authorships.
    for name, info in author_info.items():
        if info["draft_count"] >= 3:
            node_set.add(name)

    unknown_author = {"org": "", "draft_count": 1, "drafts": [], "avg_score": 0}
    nodes = []
    for name in node_set:
        info = author_info.get(name, unknown_author)
        nodes.append({
            "id": name,
            "name": name,
            "org": info["org"],
            "draft_count": info["draft_count"],
            "avg_score": info["avg_score"],
            "drafts": info["drafts"][:8],  # cap for JSON size
        })

    # Cluster detection via connected components (BFS).
    adjacency: dict[str, set[str]] = defaultdict(set)
    for e in edges:
        adjacency[e["source"]].add(e["target"])
        adjacency[e["target"]].add(e["source"])

    visited: set[str] = set()
    clusters = []
    for node in sorted(node_set):
        if node in visited:
            continue
        component: list[str] = []
        queue = deque([node])
        while queue:
            current = queue.popleft()
            if current in visited:
                continue
            visited.add(current)
            component.append(current)
            for neighbor in adjacency.get(current, ()):
                if neighbor not in visited:
                    queue.append(neighbor)
        if len(component) < 2:
            continue

        org_mix: Counter = Counter()
        cluster_drafts: dict[str, str] = {}  # draft name -> title
        for member in component:
            member_info = author_info.get(member, {})
            org = member_info.get("org", "")
            if org:
                org_mix[org] += 1
            for dn in member_info.get("drafts", []):
                if dn not in cluster_drafts:
                    d = db.get_draft(dn)
                    cluster_drafts[dn] = d.title[:80] if d else dn
        clusters.append({
            "id": len(clusters),
            "members": component,
            "org_mix": dict(org_mix.most_common()),
            "size": len(component),
            "drafts": [{"name": n, "title": t} for n, t in list(cluster_drafts.items())[:15]],
            "draft_count": len(cluster_drafts),
        })

    clusters.sort(key=lambda c: c["size"], reverse=True)
    return {"nodes": nodes, "edges": edges, "clusters": clusters}
def get_idea_clusters(db: Database) -> dict:
    """Cluster ideas by embedding similarity and project them with t-SNE.

    Ideas are grouped with Ward-linkage agglomerative clustering on
    L2-normalized embeddings (an approximation of cosine distance). The
    requested number of clusters is ``len(ideas) // 12`` clamped to 10..40.
    Groups with fewer than two ideas are not reported as clusters. Each
    reported cluster carries a theme built from its most common title words,
    its working-group and category breakdown, and at most 20 of its ideas.

    Returns:
        Dict with ``clusters`` (largest first, ``id`` equal to list
        position), ``scatter`` (one 2-D point per idea, empty when t-SNE is
        unavailable or fails), ``stats`` and ``empty``. ``empty`` is True
        when there are no embeddings, fewer than five usable ideas, or
        clustering fails.
    """
    import json as _json

    import numpy as np

    def _empty(total: int) -> dict:
        return {
            "clusters": [],
            "scatter": [],
            "stats": {"total": total, "clustered": 0, "num_clusters": 0},
            "empty": True,
        }

    embeddings = db.all_idea_embeddings()
    if not embeddings:
        return _empty(0)

    # Idea metadata keyed by idea id.
    rows = db.conn.execute(
        "SELECT id, title, description, idea_type, draft_name FROM ideas"
    ).fetchall()
    idea_map = {
        r["id"]: {
            "title": r["title"],
            "description": r["description"],
            "type": r["idea_type"],
            "draft_name": r["draft_name"],
        }
        for r in rows
    }

    # Draft -> working group, and draft -> rating categories.
    draft_rows = db.conn.execute('SELECT name, "group", title FROM drafts').fetchall()
    draft_wg = {r["name"]: r["group"] or "none" for r in draft_rows}
    rating_rows = db.conn.execute("SELECT draft_name, categories FROM ratings").fetchall()
    draft_cats: dict[str, list[str]] = {}
    for r in rating_rows:
        try:
            draft_cats[r["draft_name"]] = _json.loads(r["categories"]) if r["categories"] else []
        except (_json.JSONDecodeError, TypeError):
            draft_cats[r["draft_name"]] = []

    # Only embeddings that still have a matching idea row are usable.
    idea_ids = [iid for iid in embeddings if iid in idea_map]
    if len(idea_ids) < 5:
        return _empty(len(idea_ids))

    # scikit-learn is imported only once it is needed, so the cheap early
    # exits above do not depend on it being installed.
    from sklearn.preprocessing import normalize as sk_normalize

    matrix = np.array([embeddings[iid] for iid in idea_ids])
    matrix_norm = sk_normalize(matrix)

    n_target = max(10, min(40, len(idea_ids) // 12))
    try:
        from sklearn.cluster import AgglomerativeClustering
        clustering = AgglomerativeClustering(n_clusters=n_target, linkage="ward")
        # Plain ints keep the labels usable as dict keys and JSON-serializable.
        labels = [int(label) for label in clustering.fit_predict(matrix_norm)]
    except Exception:
        return _empty(len(idea_ids))

    members_by_label: dict[int, list] = defaultdict(list)
    for iid, label in zip(idea_ids, labels):
        members_by_label[label].append(iid)

    stop = {"a", "an", "the", "of", "for", "in", "to", "and", "or", "with",
            "on", "by", "is", "as", "at", "from", "that", "this", "it",
            "based", "using", "protocol", "mechanism", "framework", "system",
            "network", "agent", "agents"}

    # Each entry remembers the clustering label it was built from, so the
    # label -> final id mapping is exact instead of being recovered by
    # comparing idea titles afterwards.
    built: list[tuple[int, dict]] = []
    for label in sorted(members_by_label):
        ideas_in_cluster = [idea_map[iid] for iid in members_by_label[label] if iid in idea_map]
        if len(ideas_in_cluster) < 2:
            continue

        # Theme: most common significant words in the idea titles.
        words: Counter = Counter()
        for idea in ideas_in_cluster:
            for w in (idea["title"] or "").lower().split():
                w_clean = w.strip("()[].,;:-\"'")
                if len(w_clean) > 2 and w_clean not in stop:
                    words[w_clean] += 1
        top_words = [w for w, _ in words.most_common(4)]
        theme = " ".join(top_words).title() if top_words else f"Cluster {label}"

        # Distinct drafts in first-seen order, so the output is reproducible.
        drafts = list(dict.fromkeys(idea["draft_name"] for idea in ideas_in_cluster))

        wg_counts: Counter = Counter()
        cat_counts: Counter = Counter()
        for dname in drafts:
            wg_counts[draft_wg.get(dname, "none")] += 1
            for cat in draft_cats.get(dname, []):
                cat_counts[cat] += 1
        wg_list = [{"wg": wg, "count": cnt} for wg, cnt in wg_counts.most_common(5)]
        cat_list = [{"cat": cat, "count": cnt} for cat, cnt in cat_counts.most_common(3)]
        cross_wg = len([w for w in wg_counts if w != "none"]) >= 2

        built.append((label, {
            "id": len(built),
            "theme": theme,
            "size": len(ideas_in_cluster),
            "ideas": ideas_in_cluster[:20],
            "drafts": drafts,
            "wgs": wg_list,
            "categories": cat_list,
            "cross_wg": cross_wg,
            "wg_count": len(wg_counts),
        }))

    # Largest clusters first; ids are renumbered to match the final order.
    built.sort(key=lambda item: item[1]["size"], reverse=True)
    clusters = []
    label_to_new: dict[int, int] = {}
    for new_idx, (label, cluster) in enumerate(built):
        cluster["id"] = new_idx
        label_to_new[label] = new_idx
        clusters.append(cluster)

    # t-SNE projection for the scatter plot (best effort).
    scatter = []
    try:
        from sklearn.manifold import TSNE
        perp = min(30, len(idea_ids) - 1)
        tsne = TSNE(n_components=2, perplexity=perp, random_state=42, max_iter=500)
        coords = tsne.fit_transform(matrix_norm)
        for idx, iid in enumerate(idea_ids):
            info = idea_map.get(iid, {})
            label = labels[idx]
            scatter.append({
                "x": round(float(coords[idx, 0]), 3),
                "y": round(float(coords[idx, 1]), 3),
                # Ideas from a dropped (single-idea) group keep their raw
                # label, as before.
                # NOTE(review): a raw label can equal a reported cluster id.
                "cluster_id": label_to_new.get(label, label),
                "title": info.get("title", ""),
                "draft_name": info.get("draft_name", ""),
                "wg": draft_wg.get(info.get("draft_name", ""), ""),
            })
    except Exception:
        # The scatter plot is optional; clusters are still returned.
        pass

    total = len(idea_ids)
    clustered = sum(c["size"] for c in clusters)
    return {
        "clusters": clusters,
        "scatter": scatter,
        "stats": {"total": total, "clustered": clustered, "num_clusters": len(clusters)},
        "empty": False,
    }
def get_timeline_animation_data(db: Database) -> dict:
    """Return t-SNE points tagged with their month, plus monthly category counts.

    The projection is fitted once over every draft that has both an
    embedding and a rating, so a draft keeps the same coordinates in every
    animation frame. Each point carries a ``month`` (``YYYY-MM``, or
    ``"unknown"`` without a timestamp) from which the front-end can build
    cumulative frames.

    Returns ``{"points": [...], "months": [...], "category_monthly": {...}}``;
    all three are empty when fewer than five usable drafts exist or the
    t-SNE step fails.
    """
    import numpy as np

    def nothing() -> dict:
        return {"points": [], "months": [], "category_monthly": {}}

    embeddings = db.all_embeddings()
    if len(embeddings) < 5:
        return nothing()

    rated = db.drafts_with_ratings(limit=1000)
    ratings = {draft.name: rating for draft, rating in rated}
    drafts = {draft.name: draft for draft, _ in rated}

    names = [name for name in embeddings if name in ratings]
    if len(names) < 5:
        return nothing()

    vectors = np.array([embeddings[name] for name in names])
    try:
        from sklearn.manifold import TSNE
        projector = TSNE(
            n_components=2,
            perplexity=min(30, len(names) - 1),
            random_state=42,
            max_iter=500,
        )
        coords = projector.fit_transform(vectors)
    except Exception:
        return nothing()

    points = []
    monthly: dict[str, Counter] = defaultdict(Counter)
    for row, name in enumerate(names):
        rating = ratings[name]
        draft = drafts.get(name)
        month = draft.time[:7] if draft and draft.time else "unknown"
        category = rating.categories[0] if rating.categories else "Other"
        monthly[month][category] += 1
        points.append({
            "name": name,
            "title": draft.title if draft else name,
            "x": round(float(coords[row, 0]), 3),
            "y": round(float(coords[row, 1]), 3),
            "category": category,
            "score": round(rating.composite_score, 2),
            "month": month,
        })

    return {
        "points": points,
        "months": sorted(monthly),
        # Plain dicts so the structure serializes to JSON.
        "category_monthly": {month: dict(counts) for month, counts in monthly.items()},
    }
def get_monitor_status(db: Database) -> dict:
    """Summarize monitoring runs, pipeline progress and token cost.

    Returns a dict with ``last_run`` (most recent of the last 20 runs, or
    ``None``), ``runs``, ``total_runs``, ``unprocessed`` backlog counts,
    ``pipeline`` totals and ``cost``. The dollar estimate assumes $3 per
    million input tokens and $15 per million output tokens.
    """
    runs = db.get_monitor_runs(limit=20)
    total_drafts = db.count_drafts()
    rated = len(db.drafts_with_ratings(limit=10000))

    backlog = {
        "unrated": len(db.unrated_drafts(limit=9999)),
        "unembedded": len(db.drafts_without_embeddings(limit=9999)),
        "no_ideas": len(db.drafts_without_ideas(limit=9999)),
    }

    idea_total = db.idea_count()
    gap_count = len(db.all_gaps())
    tokens_in, tokens_out = db.total_tokens_used()

    usd_per_million_in = 3.0
    usd_per_million_out = 15.0
    estimate = (tokens_in * usd_per_million_in / 1_000_000) + (
        tokens_out * usd_per_million_out / 1_000_000
    )

    return {
        "last_run": runs[0] if runs else None,
        "runs": runs,
        "unprocessed": backlog,
        "total_runs": len(runs),
        "pipeline": {
            "total_drafts": total_drafts,
            "rated": rated,
            # Derived by subtracting the backlog from the draft total.
            "embedded": total_drafts - backlog["unembedded"],
            "with_ideas": total_drafts - backlog["no_ideas"],
            "idea_total": idea_total,
            "gap_count": gap_count,
        },
        "cost": {
            "input_tokens": tokens_in,
            "output_tokens": tokens_out,
            "estimated_usd": round(estimate, 2),
        },
    }
def get_citation_graph(db: Database, min_refs: int = 2) -> dict:
    """Return citation network data for the force-directed graph.

    Every row of ``draft_refs`` is an edge from a draft to a referenced item
    identified as ``"<ref_type>:<ref_id>"``. Only references cited at least
    ``min_refs`` times are kept, together with the drafts citing them. If
    that still yields more than 250 nodes, only the 80 most-cited references
    (and their citing drafts) are kept.

    Args:
        db: Database handle; ``db.conn`` must return rows addressable by
            column name.
        min_refs: Minimum number of citations for a reference to be shown.

    Returns:
        {nodes: [...], edges: [{source, target}],
         stats: {node_count, edge_count, rfc_count, draft_count}}.
        Reference nodes have ``id, type, title, influence, ref_id`` where
        ``influence`` is the citation count; draft nodes have ``id, type,
        title, influence, category`` where ``influence`` is the number of
        references the draft makes.
    """
    rows = db.conn.execute(
        "SELECT draft_name, ref_type, ref_id FROM draft_refs"
    ).fetchall()

    in_degree: Counter = Counter()   # citations received per reference key
    draft_out: Counter = Counter()   # references made per draft
    edges_raw = []
    for r in rows:
        ref_key = f"{r['ref_type']}:{r['ref_id']}"
        in_degree[ref_key] += 1
        draft_out[r["draft_name"]] += 1
        edges_raw.append((r["draft_name"], ref_key))

    # Titles for labeling draft nodes.
    draft_rows = db.conn.execute("SELECT name, title FROM drafts").fetchall()
    draft_titles = {r["name"]: r["title"] for r in draft_rows}

    # Primary rating category for coloring draft nodes.
    rating_rows = db.conn.execute("SELECT draft_name, categories FROM ratings").fetchall()
    draft_cats = {}
    for r in rating_rows:
        try:
            cats = json.loads(r["categories"]) if r["categories"] else []
            draft_cats[r["draft_name"]] = cats[0] if cats else "Other"
        except (ValueError, TypeError, KeyError, IndexError):
            # Malformed or unexpectedly shaped category data.
            draft_cats[r["draft_name"]] = "Other"

    def _select(allowed_refs) -> tuple[set, list]:
        """Keep the edges pointing at ``allowed_refs`` and collect their endpoints."""
        members: set = set()
        links = []
        for draft_name, ref_key in edges_raw:
            if ref_key in allowed_refs:
                members.add(draft_name)
                members.add(ref_key)
                links.append({"source": draft_name, "target": ref_key})
        return members, links

    top_refs = {k: v for k, v in in_degree.items() if v >= min_refs}
    node_set, filtered_edges = _select(top_refs)

    # Too dense to read: fall back to the 80 most-cited references.
    if len(node_set) > 250:
        sorted_refs = sorted(top_refs.items(), key=lambda x: x[1], reverse=True)
        node_set, filtered_edges = _select({k for k, _ in sorted_refs[:80]})

    nodes = []
    for nid in node_set:
        # A node is a reference exactly when its id was built as a reference
        # key above. Guessing from the id's spelling mislabels drafts whose
        # names contain ":" but do not start with "draft-".
        if nid in in_degree:
            ref_type, ref_id = nid.split(":", 1)
            if ref_type == "rfc":
                try:
                    title = f"RFC {int(ref_id)}"
                except ValueError:
                    title = f"RFC {ref_id}"
            else:
                title = f"{ref_type.upper()} {ref_id}"
            nodes.append({
                "id": nid,
                "type": ref_type,
                "title": title,
                "influence": in_degree.get(nid, 0),
                "ref_id": ref_id,
            })
        else:
            nodes.append({
                "id": nid,
                "type": "draft",
                "title": draft_titles.get(nid, nid),
                "influence": draft_out.get(nid, 0),
                "category": draft_cats.get(nid, "Other"),
            })

    rfc_count = sum(1 for n in nodes if n["type"] == "rfc")
    draft_count = sum(1 for n in nodes if n["type"] == "draft")
    return {
        "nodes": nodes,
        "edges": filtered_edges,
        "stats": {
            "node_count": len(nodes),
            "edge_count": len(filtered_edges),
            "rfc_count": rfc_count,
            "draft_count": draft_count,
        },
    }
def _like_pattern(text: str) -> str:
    """Wrap ``text`` in ``%`` wildcards, escaping LIKE metacharacters with a backslash."""
    escaped = text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    return f"%{escaped}%"


def _draft_hit(row) -> dict:
    """Convert one drafts row into the search-result shape."""
    return {
        "name": row["name"],
        "title": row["title"],
        "abstract": (row["abstract"] or "")[:200],
        "date": row["time"],
        "group": row["group"] or "individual",
    }


def global_search(db: Database, query: str) -> dict:
    """Search across drafts (FTS5), ideas, authors, and gaps.

    Drafts are matched through the ``drafts_fts`` full-text index, with every
    query word treated as a quoted phrase; if that query fails, a substring
    match on title, name and abstract is used instead. Ideas, authors and
    gaps are always matched by substring. ``%`` and ``_`` in the query are
    matched literally. Each section holds at most 50 hits.

    Args:
        db: Database handle; ``db.conn`` must return rows addressable by
            column name.
        query: Free-text query. Blank input yields empty result lists.

    Returns:
        {drafts: [...], ideas: [...], authors: [...], gaps: [...]}.
    """
    results: dict = {"drafts": [], "ideas": [], "authors": [], "gaps": []}
    if not query or not query.strip():
        return results
    q = query.strip()
    like = _like_pattern(q)

    # 1. Drafts via FTS5
    try:
        # Double any embedded quote so each word stays one valid FTS5 string.
        fts_query = " ".join('"' + w.replace('"', '""') + '"' for w in q.split())
        rows = db.conn.execute(
            """SELECT d.name, d.title, d.abstract, d.time, d."group"
               FROM drafts d
               JOIN drafts_fts f ON d.rowid = f.rowid
               WHERE drafts_fts MATCH ?
               ORDER BY rank
               LIMIT 50""",
            (fts_query,),
        ).fetchall()
    except Exception:
        # The full-text query can fail (unsupported syntax, missing index);
        # fall back to a plain substring match.
        rows = db.conn.execute(
            """SELECT name, title, abstract, time, "group" FROM drafts
               WHERE title LIKE ? ESCAPE '\\'
                  OR name LIKE ? ESCAPE '\\'
                  OR abstract LIKE ? ESCAPE '\\'
               LIMIT 50""",
            (like, like, like),
        ).fetchall()
    results["drafts"] = [_draft_hit(r) for r in rows]

    # 2. Ideas via LIKE
    rows = db.conn.execute(
        """SELECT id, title, description, idea_type, draft_name FROM ideas
           WHERE title LIKE ? ESCAPE '\\' OR description LIKE ? ESCAPE '\\'
           ORDER BY id LIMIT 50""",
        (like, like),
    ).fetchall()
    results["ideas"] = [
        {
            "id": r["id"],
            "title": r["title"],
            "description": (r["description"] or "")[:200],
            "type": r["idea_type"],
            "draft_name": r["draft_name"],
        }
        for r in rows
    ]

    # 3. Authors via LIKE
    rows = db.conn.execute(
        """SELECT person_id, name, affiliation FROM authors
           WHERE name LIKE ? ESCAPE '\\' OR affiliation LIKE ? ESCAPE '\\'
           ORDER BY name LIMIT 50""",
        (like, like),
    ).fetchall()
    results["authors"] = [
        {
            "person_id": r["person_id"],
            "name": r["name"],
            "affiliation": r["affiliation"] or "",
        }
        for r in rows
    ]

    # 4. Gaps via LIKE
    rows = db.conn.execute(
        """SELECT id, topic, description, category, severity FROM gaps
           WHERE topic LIKE ? ESCAPE '\\' OR description LIKE ? ESCAPE '\\'
           ORDER BY id LIMIT 50""",
        (like, like),
    ).fetchall()
    results["gaps"] = [
        {
            "id": r["id"],
            "topic": r["topic"],
            "description": (r["description"] or "")[:200],
            "category": r["category"],
            "severity": r["severity"],
        }
        for r in rows
    ]
    return results
def get_landscape_tsne(db: Database) -> list[dict]:
    """Project draft embeddings to 2-D with t-SNE for the landscape plot.

    Only drafts that have both an embedding and a rating are projected. The
    projection is computed on every call. Returns one
    ``{name, title, x, y, category, score}`` dict per draft, or an empty
    list when fewer than five usable drafts exist or the t-SNE step fails.
    """
    import numpy as np

    embeddings = db.all_embeddings()
    if len(embeddings) < 5:
        return []

    rated = db.drafts_with_ratings(limit=1000)
    ratings = {draft.name: rating for draft, rating in rated}
    drafts = {draft.name: draft for draft, _ in rated}

    names = [name for name in embeddings if name in ratings]
    if len(names) < 5:
        return []

    vectors = np.array([embeddings[name] for name in names])
    try:
        from sklearn.manifold import TSNE
        projector = TSNE(
            n_components=2,
            perplexity=min(30, len(names) - 1),
            random_state=42,
            max_iter=500,
        )
        coords = projector.fit_transform(vectors)
    except Exception:
        return []

    def point(row: int, name: str) -> dict:
        rating = ratings[name]
        draft = drafts.get(name)
        return {
            "name": name,
            "title": draft.title if draft else name,
            "x": round(float(coords[row, 0]), 3),
            "y": round(float(coords[row, 1]), 3),
            "category": rating.categories[0] if rating.categories else "Other",
            "score": round(rating.composite_score, 2),
        }

    return [point(row, name) for row, name in enumerate(names)]
def get_comparison_data(db: Database, names: list[str]) -> dict | None:
    """Build a side-by-side comparison of several drafts.

    Each distinct name is resolved through ``get_draft_detail``; names that
    resolve to nothing are skipped, and a name passed more than once is only
    considered once so a draft is never compared against itself.

    Args:
        db: Database handle, forwarded to ``get_draft_detail`` and used to
            load embeddings.
        names: Draft names to compare.

    Returns:
        ``None`` when fewer than two distinct drafts could be resolved,
        otherwise::

            {
                drafts: [detail dicts as returned by get_draft_detail],
                shared_ideas: [{title, drafts: [name,...]}],
                unique_ideas: {name: [{title, description}]},
                shared_refs: [{type, id, drafts: [name,...]}],
                unique_refs: {name: [{type, id}]},
                similarities: [{a, b, similarity}],
                comparison_text: None,
            }

        Ideas are matched by lower-cased, stripped title (``shared_ideas``
        reports that normalised title); references are matched by
        ``(type, id)``.  ``similarities`` holds the cosine similarity, rounded
        to 4 decimals, for every pair of drafts that both have an embedding.
        ``comparison_text`` is always ``None`` here.
    """
    from itertools import combinations

    drafts_data = []
    all_ideas: dict[str, list[dict]] = {}
    all_refs: dict[str, list[tuple[str, str]]] = {}
    # dict.fromkeys drops repeated names while keeping the requested order.
    for name in dict.fromkeys(names):
        detail = get_draft_detail(db, name)
        if not detail:
            continue
        drafts_data.append(detail)
        all_ideas[name] = detail.get("ideas", [])
        all_refs[name] = [(ref["type"], ref["id"]) for ref in detail.get("refs", [])]

    if len(drafts_data) < 2:
        return None

    # Shared vs unique ideas, matched on normalised title.
    idea_owners = _owners_by_key(all_ideas, _idea_key)
    shared_ideas = [
        {"title": title, "drafts": owners}
        for title, owners in idea_owners.items()
        if len(owners) > 1
    ]
    unique_ideas: dict[str, list[dict]] = {
        name: [
            {"title": idea["title"], "description": idea.get("description", "")}
            for idea in ideas
            if len(idea_owners[_idea_key(idea)]) <= 1
        ]
        for name, ideas in all_ideas.items()
    }

    # Shared vs unique references, matched on the (type, id) tuple itself.
    ref_owners = _owners_by_key(all_refs, lambda ref: ref)
    shared_refs = [
        {"type": ref[0], "id": ref[1], "drafts": owners}
        for ref, owners in ref_owners.items()
        if len(owners) > 1
    ]
    unique_refs: dict[str, list[dict]] = {
        name: [
            {"type": ref[0], "id": ref[1]}
            for ref in refs
            if len(ref_owners[ref]) <= 1
        ]
        for name, refs in all_refs.items()
    }

    # Pairwise cosine similarity between draft embeddings.
    import numpy as np

    embeddings = db.all_embeddings()
    similarities = []
    for a, b in combinations([detail["name"] for detail in drafts_data], 2):
        if a not in embeddings or b not in embeddings:
            continue
        vec_a = embeddings[a]
        vec_b = embeddings[b]
        norm = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
        # A zero-length vector has no direction; report 0.0 instead of dividing.
        sim = float(np.dot(vec_a, vec_b) / norm) if norm > 0 else 0.0
        similarities.append({"a": a, "b": b, "similarity": round(sim, 4)})

    return {
        "drafts": drafts_data,
        "shared_ideas": shared_ideas,
        "unique_ideas": unique_ideas,
        "shared_refs": shared_refs,
        "unique_refs": unique_refs,
        "similarities": similarities,
        "comparison_text": None,
    }


def _idea_key(idea: dict) -> str:
    """Return the normalised title used to decide whether two ideas match."""
    return idea["title"].lower().strip()


def _owners_by_key(items_by_draft: dict[str, list], key_of) -> dict:
    """Map each item key to the distinct drafts containing it, in first-seen order."""
    owners: dict = {}
    for draft_name, items in items_by_draft.items():
        for item in items:
            # Inner dict acts as an ordered set, so a draft that repeats an
            # item is still listed once.
            owners.setdefault(key_of(item), {})[draft_name] = None
    return {key: list(drafts) for key, drafts in owners.items()}
def get_ask_search(db: Database, question: str, top_k: int = 5) -> dict:
    """Look up sources for *question* without running synthesis.

    Builds a ``HybridSearch`` from the default config and returns whatever
    its ``search_only`` yields -- described by the original author as the
    free path: sources plus a cached answer if one is available.
    """
    from ietf_analyzer.config import Config
    from ietf_analyzer.search import HybridSearch

    return HybridSearch(Config.load(), db).search_only(question, top_k=top_k)
def get_ask_synthesize(db: Database, question: str, top_k: int = 5, cheap: bool = True) -> dict:
    """Answer *question* through ``HybridSearch.ask``.

    Builds a ``HybridSearch`` from the default config and forwards
    ``top_k`` and ``cheap``.  Per the original note, this path runs Claude
    synthesis, costs tokens, and its result is cached permanently.
    """
    from ietf_analyzer.config import Config
    from ietf_analyzer.search import HybridSearch

    return HybridSearch(Config.load(), db).ask(question, top_k=top_k, cheap=cheap)