"""Data access layer for the web dashboard. Thin wrapper around ietf_analyzer.db.Database that returns plain dicts ready for JSON serialization or Jinja2 template rendering. """ from __future__ import annotations import json import sys from collections import Counter, defaultdict from pathlib import Path # Add project root to path so we can import ietf_analyzer _project_root = Path(__file__).resolve().parent.parent.parent if str(_project_root) not in sys.path: sys.path.insert(0, str(_project_root / "src")) from ietf_analyzer.config import Config from ietf_analyzer.db import Database def get_db() -> Database: """Get a Database instance using default config.""" config = Config.load() return Database(config) def get_overview_stats(db: Database) -> dict: """Return high-level stats for the dashboard home page.""" total_drafts = db.count_drafts() rated_pairs = db.drafts_with_ratings(limit=1000) rated_count = len(rated_pairs) author_count = db.author_count() idea_count = db.idea_count() gaps = db.all_gaps() input_tok, output_tok = db.total_tokens_used() return { "total_drafts": total_drafts, "rated_count": rated_count, "author_count": author_count, "idea_count": idea_count, "gap_count": len(gaps), "input_tokens": input_tok, "output_tokens": output_tok, } def get_category_counts(db: Database) -> dict[str, int]: """Return {category: draft_count} for all categories.""" pairs = db.drafts_with_ratings(limit=1000) counts: dict[str, int] = Counter() for _, rating in pairs: for cat in rating.categories: counts[cat] += 1 return dict(counts.most_common()) def get_drafts_page( db: Database, page: int = 1, per_page: int = 50, search: str = "", category: str = "", min_score: float = 0.0, sort: str = "score", sort_dir: str = "desc", ) -> dict: """Return a paginated, filtered list of drafts with ratings. Returns dict with keys: drafts, total, page, per_page, pages. 
""" pairs = db.drafts_with_ratings(limit=1000) # Filter filtered = [] for draft, rating in pairs: if min_score > 0 and rating.composite_score < min_score: continue if category and category not in rating.categories: continue if search: haystack = f"{draft.name} {draft.title} {rating.summary}".lower() if not all(w in haystack for w in search.lower().split()): continue filtered.append((draft, rating)) # Sort sort_keys = { "score": lambda p: p[1].composite_score, "name": lambda p: p[0].name, "date": lambda p: p[0].time or "", "novelty": lambda p: p[1].novelty, "maturity": lambda p: p[1].maturity, "relevance": lambda p: p[1].relevance, "overlap": lambda p: p[1].overlap, "momentum": lambda p: p[1].momentum, } key_fn = sort_keys.get(sort, sort_keys["score"]) reverse = sort_dir == "desc" filtered.sort(key=key_fn, reverse=reverse) total = len(filtered) pages = max(1, (total + per_page - 1) // per_page) page = max(1, min(page, pages)) start = (page - 1) * per_page page_items = filtered[start : start + per_page] drafts = [] for draft, rating in page_items: drafts.append({ "name": draft.name, "title": draft.title, "date": draft.date, "url": draft.datatracker_url, "pages": draft.pages or 0, "group": draft.group or "individual", "score": round(rating.composite_score, 2), "novelty": rating.novelty, "maturity": rating.maturity, "overlap": rating.overlap, "momentum": rating.momentum, "relevance": rating.relevance, "categories": rating.categories, "summary": rating.summary, }) return { "drafts": drafts, "total": total, "page": page, "per_page": per_page, "pages": pages, } def get_draft_detail(db: Database, name: str) -> dict | None: """Return full detail for a single draft.""" draft = db.get_draft(name) if not draft: return None rating = db.get_rating(name) authors = db.get_authors_for_draft(name) ideas = db.get_ideas_for_draft(name) refs = db.get_refs_for_draft(name) result = { "name": draft.name, "title": draft.title, "rev": draft.rev, "abstract": draft.abstract, "date": draft.date, "time": draft.time, "url": draft.datatracker_url, "text_url": draft.text_url, "pages": draft.pages, "words": draft.words, "group": draft.group or "individual", "categories": draft.categories, "tags": draft.tags, "authors": [ {"name": a.name, "affiliation": a.affiliation, "person_id": a.person_id} for a in authors ], "ideas": ideas, "refs": [{"type": t, "id": rid} for t, rid in refs], } if rating: result["rating"] = { "score": round(rating.composite_score, 2), "novelty": rating.novelty, "maturity": rating.maturity, "overlap": rating.overlap, "momentum": rating.momentum, "relevance": rating.relevance, "summary": rating.summary, "novelty_note": rating.novelty_note, "maturity_note": rating.maturity_note, "overlap_note": rating.overlap_note, "momentum_note": rating.momentum_note, "relevance_note": rating.relevance_note, "categories": rating.categories, } return result def get_rating_distributions(db: Database) -> dict: """Return arrays for each rating dimension, suitable for Plotly.""" pairs = db.drafts_with_ratings(limit=1000) dims = { "novelty": [], "maturity": [], "overlap": [], "momentum": [], "relevance": [], "scores": [], "categories": [], "names": [], } for draft, rating in pairs: dims["novelty"].append(rating.novelty) dims["maturity"].append(rating.maturity) dims["overlap"].append(rating.overlap) dims["momentum"].append(rating.momentum) dims["relevance"].append(rating.relevance) dims["scores"].append(round(rating.composite_score, 2)) dims["categories"].append(rating.categories[0] if rating.categories else "Other") 
dims["names"].append(draft.name) return dims def get_timeline_data(db: Database) -> dict: """Return monthly counts by category for timeline chart.""" pairs = db.drafts_with_ratings(limit=1000) all_drafts = db.list_drafts(limit=1000, order_by="time ASC") rating_map = {d.name: r for d, r in pairs} month_cat: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int)) for d in all_drafts: month = d.time[:7] if d.time else "unknown" r = rating_map.get(d.name) if r: cat = r.categories[0] if r.categories else "Other" month_cat[month][cat] += 1 months = sorted(month_cat.keys()) cat_totals: Counter = Counter() for mc in month_cat.values(): for c, cnt in mc.items(): cat_totals[c] += cnt top_cats = [c for c, _ in cat_totals.most_common(10)] series = {} for cat in top_cats: series[cat] = [month_cat[m].get(cat, 0) for m in months] return {"months": months, "series": series, "categories": top_cats} def get_ideas_by_type(db: Database) -> dict: """Return ideas grouped by type with counts.""" all_ideas = db.all_ideas() type_counts = Counter(i.get("type", "other") or "other" for i in all_ideas) return { "total": len(all_ideas), "by_type": dict(type_counts.most_common()), "ideas": all_ideas, } def get_all_gaps(db: Database) -> list[dict]: """Return all gap analysis results.""" return db.all_gaps() def get_gap_detail(db: Database, gap_id: int) -> dict | None: """Return a single gap by ID, or None if not found.""" gaps = db.all_gaps() for g in gaps: if g["id"] == gap_id: return g return None def get_generated_drafts() -> list[dict]: """Return list of pre-generated draft files in data/reports/generated-drafts/.""" drafts_dir = _project_root / "data" / "reports" / "generated-drafts" if not drafts_dir.exists(): return [] results = [] for f in sorted(drafts_dir.glob("draft-*.txt")): # Extract title from first non-empty content line after header title = f.stem text = f.read_text(errors="replace") for line in text.splitlines(): stripped = line.strip() if stripped and not stripped.startswith("Internet-Draft") and \ not stripped.startswith("Intended status") and \ not stripped.startswith("Expires:") and stripped != "": title = stripped break results.append({ "filename": f.name, "stem": f.stem, "title": title, "size": f.stat().st_size, "path": str(f), }) return results def read_generated_draft(filename: str) -> str | None: """Read a generated draft file by filename. 

    Returns the file text, or None if the file doesn't exist.
    """
    drafts_dir = _project_root / "data" / "reports" / "generated-drafts"
    path = drafts_dir / filename
    if not path.exists() or not path.is_file():
        return None
    # Safety: ensure we're not reading outside the directory (guards against
    # path traversal via "../" in the filename; Path.is_relative_to needs Python 3.9+)
    if not path.resolve().is_relative_to(drafts_dir.resolve()):
        return None
    return path.read_text(errors="replace")


def get_top_authors(db: Database, limit: int = 30) -> list[dict]:
    """Return top authors by draft count."""
    rows = db.top_authors(limit=limit)
    return [
        {"name": name, "affiliation": aff, "draft_count": cnt, "drafts": drafts}
        for name, aff, cnt, drafts in rows
    ]


def get_org_data(db: Database, limit: int = 20) -> list[dict]:
    """Return organization contribution data."""
    rows = db.top_orgs(limit=limit)
    return [
        {"org": org, "author_count": authors, "draft_count": drafts}
        for org, authors, drafts in rows
    ]


def get_category_radar_data(db: Database) -> dict:
    """Return average rating profiles per category for radar chart."""
    pairs = db.drafts_with_ratings(limit=1000)
    cat_ratings: dict[str, list] = defaultdict(list)
    for _, r in pairs:
        for c in r.categories:
            cat_ratings[c].append(r)
    top_cats = sorted(cat_ratings.keys(), key=lambda c: len(cat_ratings[c]), reverse=True)[:8]
    result = {}
    for cat in top_cats:
        ratings = cat_ratings[cat]
        n = len(ratings)
        result[cat] = {
            "count": n,
            "novelty": round(sum(r.novelty for r in ratings) / n, 2),
            "maturity": round(sum(r.maturity for r in ratings) / n, 2),
            "relevance": round(sum(r.relevance for r in ratings) / n, 2),
            "momentum": round(sum(r.momentum for r in ratings) / n, 2),
            "low_overlap": round(sum(6 - r.overlap for r in ratings) / n, 2),
        }
    return result


def get_score_histogram(db: Database) -> list[float]:
    """Return list of composite scores for histogram."""
    pairs = db.drafts_with_ratings(limit=1000)
    return [round(r.composite_score, 2) for _, r in pairs]


def get_coauthor_network(db: Database, min_shared: int = 1) -> dict:
    """Return co-authorship network data for force-directed graph.

    Returns {nodes: [{id, name, org, draft_count}], edges: [{source, target, weight}]}
    """
    pairs = db.coauthor_pairs()
    top = db.top_authors(limit=100)
    # Build node set from authors who have co-authorships
    author_info = {name: {"org": aff, "draft_count": cnt} for name, aff, cnt, _ in top}
    node_set = set()
    edges = []
    for a, b, shared in pairs:
        if shared >= min_shared:
            node_set.add(a)
            node_set.add(b)
            edges.append({"source": a, "target": b, "weight": shared})
    nodes = []
    for name in node_set:
        info = author_info.get(name, {"org": "", "draft_count": 1})
        nodes.append({
            "id": name,
            "name": name,
            "org": info["org"],
            "draft_count": info["draft_count"],
        })
    return {"nodes": nodes, "edges": edges}


def get_similarity_graph(db: Database, threshold: float = 0.75) -> dict:
    """Return draft similarity network for force-directed graph.

    Returns {nodes: [{name, title, category, score}],
             edges: [{source, target, similarity}],
             stats: {node_count, edge_count, avg_similarity}}
    """
    import numpy as np

    embeddings = db.all_embeddings()
    if len(embeddings) < 2:
        return {"nodes": [], "edges": [],
                "stats": {"node_count": 0, "edge_count": 0, "avg_similarity": 0}}

    pairs = db.drafts_with_ratings(limit=1000)
    rating_map = {d.name: r for d, r in pairs}
    draft_map = {d.name: d for d, _ in pairs}

    # Filter to drafts with both embeddings and ratings
    names = [n for n in embeddings if n in rating_map]
    if len(names) < 2:
        return {"nodes": [], "edges": [],
                "stats": {"node_count": 0, "edge_count": 0, "avg_similarity": 0}}

    matrix = np.array([embeddings[n] for n in names])

    # L2-normalize and compute cosine similarity
    norms = np.linalg.norm(matrix, axis=1, keepdims=True)
    norms[norms == 0] = 1.0
    normalized = matrix / norms
    sim_matrix = normalized @ normalized.T

    # Find pairs above threshold (upper triangle only)
    edges = []
    node_set = set()
    for i in range(len(names)):
        for j in range(i + 1, len(names)):
            sim = float(sim_matrix[i, j])
            if sim >= threshold:
                edges.append({"source": names[i], "target": names[j], "similarity": round(sim, 4)})
                node_set.add(names[i])
                node_set.add(names[j])

    # Build nodes from connected drafts only
    nodes = []
    for name in names:
        if name not in node_set:
            continue
        r = rating_map[name]
        d = draft_map.get(name)
        nodes.append({
            "name": name,
            "title": d.title if d else name,
            "category": r.categories[0] if r.categories else "Other",
            "score": round(r.composite_score, 2),
        })

    avg_sim = round(sum(e["similarity"] for e in edges) / max(len(edges), 1), 4)
    return {
        "nodes": nodes,
        "edges": edges,
        "stats": {"node_count": len(nodes), "edge_count": len(edges), "avg_similarity": avg_sim},
    }


def get_cross_org_data(db: Database, limit: int = 20) -> list[dict]:
    """Return cross-org collaboration pairs."""
    rows = db.cross_org_collaborations(limit=limit)
    return [
        {"org_a": a, "org_b": b, "shared_drafts": cnt}
        for a, b, cnt in rows
    ]


def get_author_network_full(db: Database) -> dict:
    """Return enriched co-authorship network with avg scores and cluster info.

    Returns {
        nodes: [{id, name, org, draft_count, avg_score, drafts: [name,...]}],
        edges: [{source, target, weight}],
        clusters: [{id, members: [name,...], org_mix: {org: count}, size}],
    }
    """
    pairs = db.coauthor_pairs()
    top = db.top_authors(limit=500)

    # Build rating lookup for avg scores
    rated = db.drafts_with_ratings(limit=2000)
    draft_score = {d.name: r.composite_score for d, r in rated}

    # Author info map
    author_info = {}
    for name, aff, cnt, drafts in top:
        scores = [draft_score[dn] for dn in drafts if dn in draft_score]
        avg = round(sum(scores) / len(scores), 2) if scores else 0
        author_info[name] = {
            "org": aff,
            "draft_count": cnt,
            "drafts": drafts,
            "avg_score": avg,
        }

    # Build node set: authors with 2+ drafts OR 1+ co-authorship
    node_set = set()
    edges = []
    for a, b, shared in pairs:
        if shared >= 1:
            node_set.add(a)
            node_set.add(b)
            edges.append({"source": a, "target": b, "weight": shared})

    # Also include authors with 2+ drafts even if no co-authorships
    for name, info in author_info.items():
        if info["draft_count"] >= 2:
            node_set.add(name)

    nodes = []
    for name in node_set:
        info = author_info.get(name, {"org": "", "draft_count": 1, "drafts": [], "avg_score": 0})
        nodes.append({
            "id": name,
            "name": name,
            "org": info["org"],
            "draft_count": info["draft_count"],
            "avg_score": info["avg_score"],
            "drafts": info["drafts"][:8],  # cap for JSON size
        })

    # Cluster detection via connected components (BFS)
    adjacency: dict[str, set[str]] = defaultdict(set)
    for e in edges:
        adjacency[e["source"]].add(e["target"])
        adjacency[e["target"]].add(e["source"])

    visited: set[str] = set()
    clusters = []
    for node in sorted(node_set):
        if node in visited:
            continue
        component: list[str] = []
        queue = [node]
        while queue:
            current = queue.pop(0)
            if current in visited:
                continue
            visited.add(current)
            component.append(current)
            for neighbor in adjacency.get(current, []):
                if neighbor not in visited:
                    queue.append(neighbor)
        if len(component) >= 2:
            org_mix: dict[str, int] = Counter()
            for m in component:
                org = author_info.get(m, {}).get("org", "")
                if org:
                    org_mix[org] += 1
            clusters.append({
                "id": len(clusters),
                "members": component,
                "org_mix": dict(org_mix.most_common()),
                "size": len(component),
            })
    clusters.sort(key=lambda c: c["size"], reverse=True)

    return {"nodes": nodes, "edges": edges, "clusters": clusters}


def get_idea_clusters(db: Database) -> dict:
    """Cluster ideas by embedding similarity, return clusters + t-SNE scatter."""
    import numpy as np

    embeddings = db.all_idea_embeddings()
    if not embeddings:
        return {"clusters": [], "scatter": [],
                "stats": {"total": 0, "clustered": 0, "num_clusters": 0}, "empty": True}

    # Fetch ideas with IDs for metadata lookup
    rows = db.conn.execute(
        "SELECT id, title, description, idea_type, draft_name FROM ideas"
    ).fetchall()
    idea_map = {
        r["id"]: {
            "title": r["title"],
            "description": r["description"],
            "type": r["idea_type"],
            "draft_name": r["draft_name"],
        }
        for r in rows
    }

    # Build matrix from embeddings that have matching ideas
    idea_ids = [iid for iid in embeddings if iid in idea_map]
    if len(idea_ids) < 5:
        return {"clusters": [], "scatter": [],
                "stats": {"total": len(idea_ids), "clustered": 0, "num_clusters": 0}, "empty": True}

    matrix = np.array([embeddings[iid] for iid in idea_ids])

    # Agglomerative clustering with cosine distance
    try:
        from sklearn.cluster import AgglomerativeClustering
        clustering = AgglomerativeClustering(
            n_clusters=None,
            distance_threshold=0.5,
            metric="cosine",
            linkage="average",
        )
        labels = clustering.fit_predict(matrix)
    except Exception:
        return {"clusters": [], "scatter": [],
                "stats": {"total": len(idea_ids), "clustered": 0, "num_clusters": 0}, "empty": True}

    # Build cluster data
    cluster_ideas: dict[int, list] = defaultdict(list)
    for idx, iid in enumerate(idea_ids):
        cluster_ideas[labels[idx]].append(iid)

    # Filter to clusters with 2+ ideas
    stop = {"a", "an", "the", "of", "for", "in", "to", "and", "or", "with", "on",
            "by", "is", "as", "at", "from", "that", "this", "it"}
    clusters = []
    for cid in sorted(cluster_ideas.keys()):
        members = cluster_ideas[cid]
        if len(members) < 2:
            continue
        ideas_in_cluster = [idea_map[iid] for iid in members if iid in idea_map]
        # Theme: most common significant words in titles
        words = Counter()
        for idea in ideas_in_cluster:
            for w in idea["title"].lower().split():
                w_clean = w.strip("()[].,;:-\"'")
                if len(w_clean) > 2 and w_clean not in stop:
                    words[w_clean] += 1
        top_words = [w for w, _ in words.most_common(4)]
        theme = " ".join(top_words).title() if top_words else f"Cluster {cid}"
        drafts = list({idea["draft_name"] for idea in ideas_in_cluster})
        clusters.append({
            "id": len(clusters),
            "theme": theme,
            "size": len(ideas_in_cluster),
            "ideas": ideas_in_cluster[:20],
            "drafts": drafts,
        })

    # t-SNE for scatter
    scatter = []
    try:
        from sklearn.manifold import TSNE
        perp = min(30, len(idea_ids) - 1)
        tsne = TSNE(n_components=2, perplexity=perp, random_state=42, max_iter=500)
        coords = tsne.fit_transform(matrix)
        for idx, iid in enumerate(idea_ids):
            info = idea_map.get(iid, {})
            scatter.append({
                "x": round(float(coords[idx, 0]), 3),
                "y": round(float(coords[idx, 1]), 3),
                "cluster_id": int(labels[idx]),
                "title": info.get("title", ""),
                "draft_name": info.get("draft_name", ""),
            })
    except Exception:
        pass

    total = len(idea_ids)
    clustered = sum(c["size"] for c in clusters)
    return {
        "clusters": clusters,
        "scatter": scatter,
        "stats": {"total": total, "clustered": clustered, "num_clusters": len(clusters)},
        "empty": False,
    }


def get_timeline_animation_data(db: Database) -> dict:
    """Compute t-SNE on all drafts, return points with month info + category_monthly.

    t-SNE is computed once on ALL drafts so coordinates are stable across
    animation frames. Each point carries a ``month`` field (YYYY-MM) so the
    front-end can build cumulative animation frames.
""" import numpy as np embeddings = db.all_embeddings() if len(embeddings) < 5: return {"points": [], "months": [], "category_monthly": {}} pairs = db.drafts_with_ratings(limit=1000) rating_map = {d.name: r for d, r in pairs} draft_map = {d.name: d for d, _ in pairs} # Filter to drafts that have both embeddings and ratings names = [n for n in embeddings if n in rating_map] if len(names) < 5: return {"points": [], "months": [], "category_monthly": {}} matrix = np.array([embeddings[n] for n in names]) try: from sklearn.manifold import TSNE tsne = TSNE(n_components=2, perplexity=min(30, len(names) - 1), random_state=42, max_iter=500) coords = tsne.fit_transform(matrix) except Exception: return {"points": [], "months": [], "category_monthly": {}} # Build points with month points = [] month_set: set[str] = set() category_monthly: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int)) for i, name in enumerate(names): r = rating_map[name] d = draft_map.get(name) month = (d.time[:7] if d and d.time else "unknown") cat = r.categories[0] if r.categories else "Other" month_set.add(month) category_monthly[month][cat] += 1 points.append({ "name": name, "title": d.title if d else name, "x": round(float(coords[i, 0]), 3), "y": round(float(coords[i, 1]), 3), "category": cat, "score": round(r.composite_score, 2), "month": month, }) months = sorted(month_set) # Convert defaultdict to plain dict for JSON cat_monthly_plain = {m: dict(cats) for m, cats in category_monthly.items()} return { "points": points, "months": months, "category_monthly": cat_monthly_plain, } def get_monitor_status(db: Database) -> dict: """Return monitoring status data for dashboard.""" runs = db.get_monitor_runs(limit=20) last = runs[0] if runs else None unrated = len(db.unrated_drafts(limit=9999)) unembedded = len(db.drafts_without_embeddings(limit=9999)) no_ideas = len(db.drafts_without_ideas(limit=9999)) return { "last_run": last, "runs": runs, "unprocessed": {"unrated": unrated, "unembedded": unembedded, "no_ideas": no_ideas}, "total_runs": len(runs), } def get_landscape_tsne(db: Database) -> list[dict]: """Compute t-SNE from embeddings, return [{name, title, x, y, category, score}]. Uses cached coordinates if available, otherwise computes fresh. """ import numpy as np embeddings = db.all_embeddings() if len(embeddings) < 5: return [] pairs = db.drafts_with_ratings(limit=1000) rating_map = {d.name: r for d, r in pairs} draft_map = {d.name: d for d, _ in pairs} # Filter to drafts that have both embeddings and ratings names = [n for n in embeddings if n in rating_map] if len(names) < 5: return [] matrix = np.array([embeddings[n] for n in names]) try: from sklearn.manifold import TSNE tsne = TSNE(n_components=2, perplexity=min(30, len(names) - 1), random_state=42, max_iter=500) coords = tsne.fit_transform(matrix) except Exception: return [] result = [] for i, name in enumerate(names): r = rating_map[name] d = draft_map.get(name) result.append({ "name": name, "title": d.title if d else name, "x": round(float(coords[i, 0]), 3), "y": round(float(coords[i, 1]), 3), "category": r.categories[0] if r.categories else "Other", "score": round(r.composite_score, 2), }) return result