Split webui into Flask blueprints and data domain modules

- Split app.py (66 routes) into 3 blueprints: pages (public), api (JSON), admin (@admin_required)
- Split data.py (4,360 LOC) into 7 domain modules: drafts, authors, ratings, gaps, analysis, search, proposals
- Add data/__init__.py re-exporting all public functions for backward compatibility
- Add custom 404/500 error pages matching dark theme
- Add request timing logging via before_request/after_request hooks
- Refactor app.py into create_app() factory pattern
- All 106 tests pass, all 66 routes preserved

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-09 03:37:15 +01:00
parent c066b04d74
commit 3fb17100d7
17 changed files with 4144 additions and 5170 deletions

View File

@@ -5,935 +5,81 @@ Run with: python src/webui/app.py
from __future__ import annotations from __future__ import annotations
import logging
import os
import sys import sys
import time
from pathlib import Path from pathlib import Path
# Ensure project src is on path # Ensure project src is on path
_project_root = Path(__file__).resolve().parent.parent.parent _project_root = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(_project_root / "src")) sys.path.insert(0, str(_project_root / "src"))
import csv from flask import Flask, g, render_template, request
import io
import json
import time
import functools
from collections import defaultdict
from flask import Flask, render_template, request, jsonify, abort, g, Response, redirect, url_for from webui.auth import init_auth
from webui.analytics import init_analytics
from webui.blueprints import register_blueprints
from webui.data import get_db
from webui.auth import admin_required, init_auth
from webui.analytics import init_analytics, get_analytics_data
from webui.obsidian_export import build_obsidian_vault
from webui.data import (
get_db,
get_overview_stats,
get_category_counts,
get_drafts_page,
get_draft_detail,
get_rating_distributions,
get_timeline_data,
get_ideas_by_type,
get_all_gaps,
get_gap_detail,
get_generated_drafts,
read_generated_draft,
get_top_authors,
get_org_data,
get_category_radar_data,
get_score_histogram,
get_coauthor_network,
get_cross_org_data,
get_landscape_tsne,
get_similarity_graph,
get_timeline_animation_data,
get_idea_clusters,
get_monitor_status,
get_author_network_full,
get_citation_graph,
get_comparison_data,
get_ask_search,
get_ask_synthesize,
get_category_summary,
global_search,
get_architecture,
get_source_comparison,
get_false_positive_profile,
get_citation_influence,
get_bcp_analysis,
get_trends_data,
get_complexity_data,
get_idea_analysis,
get_all_proposals,
get_proposal_detail,
get_proposals_for_gap,
)
app = Flask( # --- App factory ---
def create_app(dev: bool = False) -> Flask:
"""Create and configure the Flask application."""
application = Flask(
__name__, __name__,
template_folder=str(Path(__file__).parent / "templates"), template_folder=str(Path(__file__).parent / "templates"),
static_folder=str(Path(__file__).parent / "static"), static_folder=str(Path(__file__).parent / "static"),
static_url_path="/static", static_url_path="/static",
) )
import os application.config["SECRET_KEY"] = os.environ.get("FLASK_SECRET_KEY", os.urandom(24).hex())
app.config["SECRET_KEY"] = os.environ.get("FLASK_SECRET_KEY", os.urandom(24).hex())
# Auth is initialized at startup — see __main__ block and create_app()
# Default: production mode (admin disabled)
init_auth(app, dev=False)
# Analytics (GDPR-compliant, no cookies) # Auth
_analytics_db = str(_project_root / "data" / "analytics.db") init_auth(application, dev=dev)
init_analytics(app, db_path=_analytics_db)
# Analytics (GDPR-compliant, no cookies)
analytics_db = str(_project_root / "data" / "analytics.db")
init_analytics(application, db_path=analytics_db)
# --- Rate limiting for Claude-calling endpoints --- # Register blueprints
register_blueprints(application)
_rate_limit_store: dict[str, list[float]] = defaultdict(list) # Database lifecycle (per-request)
_RATE_LIMIT_MAX = 10 # max requests @application.teardown_appcontext
_RATE_LIMIT_WINDOW = 60 # per 60 seconds def close_db(exception=None):
def rate_limit(f):
    """In-memory sliding-window rate limiter: max 10 requests per minute per IP."""
    @functools.wraps(f)
    def limited(*args, **kwargs):
        client_ip = request.remote_addr or "unknown"
        current = time.time()
        # Keep only timestamps still inside the sliding window, then store back.
        recent = [t for t in _rate_limit_store[client_ip]
                  if current - t < _RATE_LIMIT_WINDOW]
        _rate_limit_store[client_ip] = recent
        if len(recent) >= _RATE_LIMIT_MAX:
            return jsonify({"error": "Rate limit exceeded. Try again later."}), 429
        recent.append(current)
        return f(*args, **kwargs)
    return limited
# --- Database lifecycle (per-request to avoid SQLite threading issues) ---
def db():
    """Return the per-request database handle, opening one on first use.

    The handle is cached on ``flask.g`` so each request gets its own
    connection (avoids SQLite cross-thread issues); the teardown handler
    closes it when the app context ends.
    """
    if "db" not in g:
        g.db = get_db()
    return g.db
@app.teardown_appcontext
def close_db(exception=None):
database = g.pop("db", None) database = g.pop("db", None)
if database is not None: if database is not None:
database.close() database.close()
# Error handlers
# --- Routes --- @application.errorhandler(404)
def not_found(e):
return render_template("errors/404.html"), 404
@app.route("/")
def overview(): @application.errorhandler(500)
stats = get_overview_stats(db()) def server_error(e):
categories = get_category_counts(db()) return render_template("errors/500.html"), 500
timeline = get_timeline_data(db())
scores = get_score_histogram(db()) # Request timing
radar = get_category_radar_data(db()) @application.before_request
return render_template( def start_timer():
"overview.html", g.start_time = time.time()
stats=stats,
categories=categories, @application.after_request
timeline=timeline, def log_request(response):
scores=scores, if hasattr(g, "start_time"):
radar=radar, duration = (time.time() - g.start_time) * 1000
) logger = logging.getLogger("webui")
logger.info("%s %s %s %.1fms", request.method, request.path,
response.status_code, duration)
@app.route("/drafts") return response
def drafts():
page = request.args.get("page", 1, type=int) return application
search = request.args.get("q", "")
category = request.args.get("cat", "")
source = request.args.get("source", "") # Module-level app instance for backward compatibility (import from webui.app import app)
min_score = request.args.get("min_score", 0.0, type=float) app = create_app(dev=False)
sort = request.args.get("sort", "score")
sort_dir = request.args.get("dir", "desc")
result = get_drafts_page(
db(),
page=page,
search=search,
category=category,
min_score=min_score,
sort=sort,
sort_dir=sort_dir,
source=source,
)
categories = get_category_counts(db())
cat_summary = get_category_summary(db(), category) if category else None
return render_template(
"drafts.html",
result=result,
categories=categories,
cat_summary=cat_summary,
search=search,
current_cat=category,
current_source=source,
min_score=min_score,
sort=sort,
sort_dir=sort_dir,
)
@app.route("/drafts/<string:name>")
def draft_detail(name: str):
    """Render the detail page for one draft; 404 if it is unknown.

    Also computes which referenced drafts exist locally so the template can
    render them as internal links instead of plain text.
    """
    database = db()
    detail = get_draft_detail(database, name)
    if not detail:
        abort(404)
    # Build set of draft ref IDs that exist in our DB for internal linking
    ref_draft_ids = [r["id"] for r in detail.get("refs", []) if r["type"] == "draft"]
    known_drafts = set()
    if ref_draft_ids:
        # Single IN (...) query with bound parameters; only the placeholder
        # count is interpolated, never the values themselves.
        placeholders = ",".join("?" * len(ref_draft_ids))
        rows = database.conn.execute(
            f"SELECT name FROM drafts WHERE name IN ({placeholders})", ref_draft_ids
        ).fetchall()
        known_drafts = {r["name"] for r in rows}
    return render_template("draft_detail.html", draft=detail, known_drafts=known_drafts)
@app.route("/ideas")
def ideas():
data = get_ideas_by_type(db())
return render_template("ideas.html", data=data)
@app.route("/gaps")
@admin_required
def gaps():
gap_list = get_all_gaps(db())
generated = get_generated_drafts()
return render_template("gaps.html", gaps=gap_list, generated_drafts=generated)
@app.route("/gaps/demo")
@admin_required
def gaps_demo():
    """Show a pre-generated example draft so users can see output without API calls."""
    generated = get_generated_drafts()
    selected = request.args.get("file", "")
    draft_text = None
    draft_info = None
    if selected:
        # Explicit selection via ?file= — look up its metadata entry.
        draft_text = read_generated_draft(selected)
        draft_info = next(
            (gd for gd in generated if gd["filename"] == selected), None
        )
    elif generated:
        # No selection: fall back to the first available generated draft.
        draft_info = generated[0]
        draft_text = read_generated_draft(draft_info["filename"])
    return render_template(
        "gap_demo.html",
        generated_drafts=generated,
        draft_text=draft_text,
        draft_info=draft_info,
        selected=selected,
    )
@app.route("/gaps/<int:gap_id>")
@admin_required
def gap_detail(gap_id: int):
gap = get_gap_detail(db(), gap_id)
if not gap:
abort(404)
generated = get_generated_drafts()
gap_proposals = get_proposals_for_gap(db(), gap_id)
return render_template("gap_detail.html", gap=gap, generated_drafts=generated, proposals=gap_proposals)
@app.route("/gaps/<int:gap_id>/generate", methods=["POST"])
@admin_required
def gap_generate(gap_id: int):
    """Trigger draft generation for a gap. Returns JSON with the generated text.

    Uses the Claude-backed DraftGenerator (costs tokens); admin-only.
    Returns 404 JSON if the gap does not exist, 500 JSON on generation failure.
    """
    gap = get_gap_detail(db(), gap_id)
    if not gap:
        return jsonify({"error": "Gap not found"}), 404
    try:
        # Imported lazily: the analyzer stack is heavy and only needed here.
        from ietf_analyzer.config import Config
        from ietf_analyzer.analyzer import Analyzer
        from ietf_analyzer.draftgen import DraftGenerator
        cfg = Config.load()
        database = db()
        analyzer = Analyzer(cfg, database)
        generator = DraftGenerator(cfg, database, analyzer)
        # Generate into a file named after the gap topic (slug capped at 40 chars).
        slug = gap["topic"].lower().replace(" ", "-")[:40]
        # _project_root is already a Path — no need to re-wrap it.
        output_path = str(
            _project_root / "data" / "reports" / "generated-drafts"
            / f"draft-gap-{gap_id}-{slug}.txt"
        )
        path = generator.generate(gap["topic"], output_path=output_path)
        draft_text = Path(path).read_text(errors="replace")
        return jsonify({
            "success": True,
            "text": draft_text,
            "filename": Path(path).name,
            "path": path,
        })
    except Exception as e:
        # Surface generation failures to the UI as JSON rather than a bare 500 page.
        return jsonify({"error": str(e)}), 500
@app.route("/ratings")
def ratings():
distributions = get_rating_distributions(db())
radar = get_category_radar_data(db())
return render_template(
"ratings.html",
dist=distributions,
radar=radar,
)
@app.route("/landscape")
@admin_required
def landscape():
distributions = get_rating_distributions(db())
tsne_data = get_landscape_tsne(db())
return render_template(
"landscape.html",
dist=distributions,
tsne_data=tsne_data,
)
@app.route("/timeline")
def timeline_animation():
data = get_timeline_animation_data(db())
return render_template("timeline.html", animation=data)
@app.route("/idea-clusters")
def idea_clusters():
data = get_idea_clusters(db())
return render_template("idea_clusters.html", clusters=data)
@app.route("/architecture")
def architecture():
data = get_architecture(db())
return render_template("architecture.html", arch=data)
@app.route("/api/architecture")
def api_architecture():
return jsonify(get_architecture(db()))
@app.route("/similarity")
@admin_required
def similarity():
network = get_similarity_graph(db())
return render_template("similarity.html", network=network)
@app.route("/authors")
def authors():
top = get_top_authors(db(), limit=50)
orgs = get_org_data(db(), limit=20)
network = get_author_network_full(db())
cross_org = get_cross_org_data(db(), limit=20)
return render_template(
"authors.html",
authors=top,
orgs=orgs,
orgs_data=orgs,
network=network,
cross_org=cross_org,
)
@app.route("/citations")
def citations():
    """Citation graph page; influence and BCP analyses are admin-only extras."""
    from webui.auth import is_admin as check_admin
    graph = get_citation_graph(db())
    # Heavier analyses are only computed (and passed to the template) for admins;
    # non-admins get None so the template can hide those sections.
    influence = get_citation_influence(db()) if check_admin() else None
    bcp = get_bcp_analysis(db()) if check_admin() else None
    return render_template("citations.html", graph=graph, influence=influence, bcp=bcp)
@app.route("/monitor")
@admin_required
def monitor_page():
status = get_monitor_status(db())
return render_template("monitor.html", status=status)
@app.route("/admin/analytics")
@admin_required
def analytics_dashboard():
data = get_analytics_data(_analytics_db)
return render_template("analytics.html", data=data)
@app.route("/about")
def about():
from ietf_analyzer.config import Config
cfg = Config.load()
stats = get_overview_stats(db())
return render_template("about.html", stats=stats, search_keywords=cfg.search_keywords,
fetch_since=cfg.fetch_since)
@app.route("/impressum")
def impressum():
return render_template("impressum.html")
@app.route("/datenschutz")
def datenschutz():
return render_template("datenschutz.html")
@app.route("/search")
def search():
q = request.args.get("q", "").strip()
results = global_search(db(), q) if q else {"drafts": [], "ideas": [], "authors": [], "gaps": []}
total = sum(len(v) for v in results.values())
return render_template("search_results.html", query=q, results=results, total=total)
@app.route("/ask")
def ask_page():
question = request.args.get("q", "")
result = None
if question:
top_k = request.args.get("top", 5, type=int)
# Search only (free) — returns sources + cached answer if available
result = get_ask_search(db(), question, top_k=top_k)
return render_template("ask.html", question=question, result=result)
@app.route("/api/ask/synthesize", methods=["POST"])
@admin_required
@rate_limit
def api_ask_synthesize():
"""Synthesize an answer via Claude (costs tokens, cached permanently). Returns JSON."""
data = request.get_json(force=True, silent=True)
if not data or "question" not in data:
return jsonify({"error": "Missing 'question' in request body"}), 400
question = data["question"]
top_k = data.get("top_k", 5)
result = get_ask_synthesize(db(), question, top_k=top_k, cheap=True)
return jsonify(result)
@app.route("/api/ask", methods=["POST"])
def api_ask():
"""Search only (free). Returns JSON with sources + cached answer if available."""
data = request.get_json(force=True, silent=True)
if not data or "question" not in data:
return jsonify({"error": "Missing 'question' in request body"}), 400
question = data["question"]
top_k = data.get("top_k", 5)
result = get_ask_search(db(), question, top_k=top_k)
return jsonify(result)
@app.route("/compare")
@admin_required
def compare_page():
draft_names = request.args.get("drafts", "")
names = [n.strip() for n in draft_names.split(",") if n.strip()] if draft_names else []
data = None
if len(names) >= 2:
data = get_comparison_data(db(), names)
return render_template("comparison.html", names=names, data=data)
@app.route("/api/compare", methods=["POST"])
@admin_required
@rate_limit
def api_compare():
"""Run Claude comparison for drafts. Returns JSON with comparison text."""
req_data = request.get_json(force=True, silent=True)
if not req_data or "drafts" not in req_data:
return jsonify({"error": "Missing 'drafts' in request body"}), 400
names = req_data["drafts"]
if len(names) < 2:
return jsonify({"error": "Need at least 2 drafts to compare"}), 400
try:
from ietf_analyzer.config import Config
from ietf_analyzer.analyzer import Analyzer
cfg = Config.load()
database = db()
analyzer = Analyzer(cfg, database)
result = analyzer.compare_drafts(names)
return jsonify(result)
except Exception as e:
return jsonify({"error": str(e)}), 500
# --- API endpoints for AJAX (used by client-side charts) ---
def _to_csv_response(rows: list[dict], filename: str = "export.csv") -> Response:
    """Convert a list of dicts to a CSV download response.

    Column order comes from the first row's keys; list/dict values are
    JSON-encoded so the CSV stays one scalar per cell. Empty input yields
    an empty (but still valid) CSV attachment.
    """
    # Bug fix: the filename parameter was never interpolated into the header.
    headers = {"Content-Disposition": f"attachment; filename={filename}"}
    if not rows:
        return Response("", mimetype="text/csv", headers=headers)
    si = io.StringIO()
    writer = csv.DictWriter(si, fieldnames=rows[0].keys())
    writer.writeheader()
    for row in rows:
        # Flatten any list/dict values to JSON strings
        flat = {k: json.dumps(v) if isinstance(v, (list, dict)) else v
                for k, v in row.items()}
        writer.writerow(flat)
    return Response(si.getvalue(), mimetype="text/csv", headers=headers)
def _results_to_csv(results: dict) -> Response:
    """Convert global search results (multi-category) to a single CSV."""
    # Flatten {category: [items]} into rows tagged with their category.
    flattened = [
        {"_category": category, **item}
        for category, items in results.items()
        for item in items
    ]
    return _to_csv_response(flattened, "search_results.csv")
@app.route("/api/drafts")
def api_drafts():
page = request.args.get("page", 1, type=int)
search = request.args.get("q", "")
category = request.args.get("cat", "")
source = request.args.get("source", "")
min_score = request.args.get("min_score", 0.0, type=float)
sort = request.args.get("sort", "score")
sort_dir = request.args.get("dir", "desc")
data = get_drafts_page(db(), page=page, search=search, category=category,
min_score=min_score, sort=sort, sort_dir=sort_dir,
source=source)
if request.args.get("format") == "csv":
return _to_csv_response(data.get("drafts", []), "drafts.csv")
return jsonify(data)
@app.route("/api/stats")
def api_stats():
return jsonify(get_overview_stats(db()))
@app.route("/api/authors/network")
def api_author_network():
return jsonify(get_author_network_full(db()))
@app.route("/api/citations")
def api_citations():
min_refs = request.args.get("min_refs", 2, type=int)
return jsonify(get_citation_graph(db(), min_refs=min_refs))
@app.route("/api/search")
def api_search():
q = request.args.get("q", "").strip()
results = global_search(db(), q) if q else {"drafts": [], "ideas": [], "authors": [], "gaps": []}
if request.args.get("format") == "csv":
return _results_to_csv(results)
return jsonify(results)
@app.route("/api/ideas")
def api_ideas():
data = get_ideas_by_type(db())
if request.args.get("format") == "csv":
return _to_csv_response(data.get("ideas", []), "ideas.csv")
return jsonify(data)
@app.route("/api/gaps")
@admin_required
def api_gaps():
data = get_all_gaps(db())
if request.args.get("format") == "csv":
return _to_csv_response(data, "gaps.csv")
return jsonify(data)
@app.route("/api/gaps/<int:gap_id>")
@admin_required
def api_gap_detail(gap_id: int):
gap = get_gap_detail(db(), gap_id)
if not gap:
return jsonify({"error": "Gap not found"}), 404
return jsonify(gap)
@app.route("/api/ratings")
def api_ratings():
    """Rating distributions as JSON, or one-row-per-draft CSV with ?format=csv."""
    data = get_rating_distributions(db())
    if request.args.get("format") == "csv":
        # Transpose the columnar payload into rows: column i of each list
        # becomes one CSV row. zip() replaces the index-based loop.
        columns = ("names", "scores", "novelty", "maturity", "overlap",
                   "momentum", "relevance", "categories")
        fields = ("name", "score", "novelty", "maturity", "overlap",
                  "momentum", "relevance", "category")
        rows = [
            dict(zip(fields, values))
            for values in zip(*(data.get(col, []) for col in columns))
        ]
        return _to_csv_response(rows, "ratings.csv")
    return jsonify(data)
@app.route("/api/timeline")
def api_timeline():
data = get_timeline_data(db())
return jsonify(data)
@app.route("/api/landscape")
@admin_required
def api_landscape():
data = get_landscape_tsne(db())
if request.args.get("format") == "csv":
return _to_csv_response(data, "landscape.csv")
return jsonify(data)
@app.route("/api/similarity")
@admin_required
def api_similarity():
data = get_similarity_graph(db())
return jsonify(data)
@app.route("/api/idea-clusters")
def api_idea_clusters():
data = get_idea_clusters(db())
return jsonify(data)
@app.route("/api/monitor")
@admin_required
def api_monitor():
data = get_monitor_status(db())
return jsonify(data)
@app.route("/api/drafts/<string:name>")
def api_draft_detail(name: str):
detail = get_draft_detail(db(), name)
if not detail:
return jsonify({"error": "Draft not found"}), 404
return jsonify(detail)
@app.route("/api/categories")
def api_categories():
data = get_category_counts(db())
if request.args.get("format") == "csv":
rows = [{"category": k, "count": v} for k, v in data.items()]
return _to_csv_response(rows, "categories.csv")
return jsonify(data)
@app.route("/api/drafts/<string:name>/annotate", methods=["POST"])
@admin_required
def api_annotate(name: str):
    """Add or update annotation for a draft.

    JSON body may contain: ``note``, ``tags`` (full replacement),
    ``add_tag``, ``remove_tag``. Incremental add/remove operations are
    merged against the stored tag list and override ``tags``.
    """
    # Removed unused `import json as _json` — nothing in this handler uses it.
    database = db()
    draft = database.get_draft(name)
    if not draft:
        return jsonify({"error": "Draft not found"}), 404
    data = request.get_json(force=True, silent=True)
    if not data:
        return jsonify({"error": "Invalid JSON body"}), 400
    note = data.get("note")
    tags = data.get("tags")
    add_tag = data.get("add_tag")
    remove_tag = data.get("remove_tag")
    # Handle add/remove tag operations against the currently stored tags.
    if add_tag or remove_tag:
        existing = database.get_annotation(name)
        current_tags = existing["tags"] if existing else []
        if add_tag and add_tag not in current_tags:
            current_tags.append(add_tag)
        if remove_tag and remove_tag in current_tags:
            current_tags.remove(remove_tag)
        tags = current_tags
    database.upsert_annotation(name, note=note, tags=tags)
    annotation = database.get_annotation(name)
    return jsonify({"success": True, "annotation": annotation})
@app.route("/export/obsidian")
@admin_required
def export_obsidian():
"""Download the entire research corpus as an Obsidian vault (ZIP)."""
data = build_obsidian_vault(db())
return Response(
data,
mimetype="application/zip",
headers={"Content-Disposition": "attachment; filename=IETF-AI-Agent-Drafts.zip"},
)
def create_app(dev: bool = False) -> Flask:
    """Re-initialize auth mode. Call before run() if needed.

    NOTE(review): despite the name, this mutates the module-level ``app``
    (re-running ``init_auth``) rather than building a fresh instance, so it
    is a mode toggle, not a true application factory.
    """
    init_auth(app, dev=dev)
    return app
# ── Sources & False Positives ────────────────────────────────────────────
@app.route("/sources")
@admin_required
def sources_page():
data = get_source_comparison(db())
return render_template("sources.html", data=data)
@app.route("/false-positives")
@admin_required
def false_positives_page():
data = get_false_positive_profile(db())
return render_template("false_positives.html", data=data)
@app.route("/api/sources")
@admin_required
def api_sources():
data = get_source_comparison(db())
return jsonify(data)
@app.route("/api/false-positives")
@admin_required
def api_false_positives():
data = get_false_positive_profile(db())
return jsonify(data)
# ── Citation Influence & BCP ─────────────────────────────────────────────
@app.route("/api/citations/influence")
@admin_required
def api_citation_influence():
return jsonify(get_citation_influence(db()))
@app.route("/api/citations/bcp")
@admin_required
def api_bcp_analysis():
return jsonify(get_bcp_analysis(db()))
# ── Idea Analysis ────────────────────────────────────────────────────────
@app.route("/idea-analysis")
@admin_required
def idea_analysis():
data = get_idea_analysis(db())
return render_template("idea_analysis.html", data=data)
@app.route("/api/idea-analysis")
@admin_required
def api_idea_analysis():
data = get_idea_analysis(db())
return jsonify(data)
# ── Trends & Complexity ──────────────────────────────────────────────────
@app.route("/trends")
@admin_required
def trends():
data = get_trends_data(db())
return render_template("trends_analysis.html", data=data)
@app.route("/complexity")
@admin_required
def complexity():
data = get_complexity_data(db())
return render_template("complexity.html", data=data)
@app.route("/api/trends")
@admin_required
def api_trends():
return jsonify(get_trends_data(db()))
@app.route("/api/complexity")
@admin_required
def api_complexity():
return jsonify(get_complexity_data(db()))
# ── Proposals (dev-only) ────────────────────────────────────────────────
@app.route("/proposals")
@admin_required
def proposals():
proposal_list = get_all_proposals(db())
gap_list = get_all_gaps(db())
return render_template("proposals.html", proposals=proposal_list, gaps=gap_list)
@app.route("/proposals/new", methods=["GET", "POST"])
@admin_required
def proposal_new():
    """Create a new proposal; GET renders the blank form, POST saves it."""
    if request.method == "POST":
        form = request.form
        slug = form.get("slug", "").strip()
        if not slug:
            # Derive a URL slug from the title when none was supplied.
            import re
            slug = re.sub(r'[^a-z0-9]+', '-', form["title"].lower()).strip('-')
        selected_gaps = [int(g) for g in request.form.getlist("gap_ids") if g]
        proposal = {
            "title": form["title"],
            "slug": slug,
            "status": form.get("status", "idea"),
            "gap_ids": selected_gaps,
        }
        # Free-text fields all default to the empty string.
        for key in ("description", "content_md", "source_paper",
                    "source_url", "intended_wg", "draft_name"):
            proposal[key] = form.get(key, "")
        pid = db().upsert_proposal(proposal)
        return redirect(url_for("proposal_detail", proposal_id=pid))
    gap_list = get_all_gaps(db())
    return render_template("proposal_edit.html", proposal=None, gaps=gap_list)
@app.route("/proposals/<int:proposal_id>")
@admin_required
def proposal_detail(proposal_id):
proposal = get_proposal_detail(db(), proposal_id)
if not proposal:
abort(404)
return render_template("proposal_detail.html", proposal=proposal)
@app.route("/proposals/<int:proposal_id>/edit", methods=["GET", "POST"])
@admin_required
def proposal_edit(proposal_id):
    """Edit an existing proposal; GET renders the form, POST saves and redirects."""
    if request.method == "POST":
        data = request.form
        slug = data.get("slug", "").strip()
        if not slug:
            # Derive a slug from the title when the field was left blank.
            import re
            slug = re.sub(r'[^a-z0-9]+', '-', data["title"].lower()).strip('-')
        gap_ids = [int(g) for g in request.form.getlist("gap_ids") if g]
        # Including "id" makes upsert_proposal update the existing row.
        proposal = {
            "id": proposal_id,
            "title": data["title"],
            "slug": slug,
            "status": data.get("status", "idea"),
            "description": data.get("description", ""),
            "content_md": data.get("content_md", ""),
            "source_paper": data.get("source_paper", ""),
            "source_url": data.get("source_url", ""),
            "intended_wg": data.get("intended_wg", ""),
            "draft_name": data.get("draft_name", ""),
            "gap_ids": gap_ids,
        }
        db().upsert_proposal(proposal)
        return redirect(url_for("proposal_detail", proposal_id=proposal_id))
    proposal = get_proposal_detail(db(), proposal_id)
    if not proposal:
        abort(404)
    gap_list = get_all_gaps(db())
    return render_template("proposal_edit.html", proposal=proposal, gaps=gap_list)
@app.route("/proposals/<int:proposal_id>/delete", methods=["POST"])
@admin_required
def proposal_delete(proposal_id):
db().delete_proposal(proposal_id)
return redirect(url_for("proposals"))
@app.route("/api/proposals")
@admin_required
def api_proposals():
data = get_all_proposals(db())
return jsonify(data)
@app.route("/api/proposals/<int:proposal_id>")
@admin_required
def api_proposal_detail(proposal_id):
p = get_proposal_detail(db(), proposal_id)
if not p:
return jsonify({"error": "Proposal not found"}), 404
return jsonify(p)
@app.route("/proposals/intake", methods=["GET", "POST"])
@admin_required
def proposal_intake():
"""Paste text/URLs → Claude generates proposals automatically."""
if request.method == "POST":
raw_input = request.form.get("input_text", "").strip()
if not raw_input:
return jsonify({"error": "No input provided"}), 400
try:
from ietf_analyzer.config import Config
from ietf_analyzer.proposal_intake import ProposalIntake
cfg = Config.load()
intake = ProposalIntake(cfg, db())
proposals, usage = intake.process(raw_input, cheap=True)
return jsonify({
"success": True,
"count": len(proposals),
"proposals": [
{"id": p.get("id"), "title": p.get("title"), "slug": p.get("slug"),
"gap_ids": p.get("gap_ids", []), "description": p.get("description", ""),
"content_md": p.get("content_md", ""),
"intended_wg": p.get("intended_wg", ""), "draft_name": p.get("draft_name", ""),
"source_paper": p.get("source_paper", ""), "source_url": p.get("source_url", "")}
for p in proposals
],
"usage": usage,
})
except Exception as e:
return jsonify({"error": str(e)}), 500
return render_template("proposal_intake.html")
if __name__ == "__main__": if __name__ == "__main__":
@@ -946,7 +92,7 @@ if __name__ == "__main__":
parser.add_argument("--port", type=int, default=5000) parser.add_argument("--port", type=int, default=5000)
args = parser.parse_args() args = parser.parse_args()
init_auth(app, dev=args.dev) app = create_app(dev=args.dev)
mode = "\033[33mDEV\033[0m (admin enabled)" if args.dev else "\033[32mPRODUCTION\033[0m (admin disabled)" mode = "\033[33mDEV\033[0m (admin enabled)" if args.dev else "\033[32mPRODUCTION\033[0m (admin disabled)"
print(f"Starting IETF Draft Analyzer — {mode}") print(f"Starting IETF Draft Analyzer — {mode}")

View File

@@ -0,0 +1,15 @@
"""Flask blueprints for the IETF Draft Analyzer web UI."""
from __future__ import annotations
from flask import Flask
from webui.blueprints.pages import pages_bp
from webui.blueprints.api import api_bp
from webui.blueprints.admin import admin_bp
def register_blueprints(app: Flask) -> None:
    """Register all blueprints with the Flask app (pages, api, admin — in order)."""
    for blueprint in (pages_bp, api_bp, admin_bp):
        app.register_blueprint(blueprint)

View File

@@ -0,0 +1,562 @@
"""Admin-only routes (require @admin_required)."""
from __future__ import annotations
import functools
import time
from collections import defaultdict
from pathlib import Path
from flask import Blueprint, render_template, request, jsonify, abort, g, Response, redirect, url_for
from webui.auth import admin_required
from webui.analytics import get_analytics_data
from webui.obsidian_export import build_obsidian_vault
from webui.data import (
get_db,
get_overview_stats,
get_rating_distributions,
get_all_gaps,
get_gap_detail,
get_generated_drafts,
read_generated_draft,
get_monitor_status,
get_landscape_tsne,
get_similarity_graph,
get_comparison_data,
get_ask_search,
get_ask_synthesize,
get_source_comparison,
get_false_positive_profile,
get_citation_influence,
get_bcp_analysis,
get_idea_analysis,
get_trends_data,
get_complexity_data,
get_all_proposals,
get_proposal_detail,
get_proposals_for_gap,
)
admin_bp = Blueprint("admin", __name__)
_project_root = Path(__file__).resolve().parent.parent.parent.parent
# --- Rate limiting for Claude-calling endpoints ---
_rate_limit_store: dict[str, list[float]] = defaultdict(list)
_RATE_LIMIT_MAX = 10 # max requests
_RATE_LIMIT_WINDOW = 60 # per 60 seconds
def rate_limit(f):
    """Simple in-memory rate limiter: max 10 requests per minute per IP.

    Sliding window of ``_RATE_LIMIT_WINDOW`` seconds keyed by client IP.
    State lives in a module-level dict, so limits apply per process.
    Returns a 429 JSON error when the limit is exceeded.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        ip = request.remote_addr or "unknown"
        now = time.time()
        # Drop timestamps that have aged out of the sliding window.
        timestamps = _rate_limit_store[ip]
        _rate_limit_store[ip] = [t for t in timestamps if now - t < _RATE_LIMIT_WINDOW]
        if len(_rate_limit_store[ip]) >= _RATE_LIMIT_MAX:
            return jsonify({"error": "Rate limit exceeded. Try again later."}), 429
        _rate_limit_store[ip].append(now)
        return f(*args, **kwargs)
    return wrapper
def db():
    """Return the per-request database handle, cached on ``flask.g``."""
    if "db" not in g:
        g.db = get_db()
    return g.db
# ── Gap pages ────────────────────────────────────────────────────────────
@admin_bp.route("/gaps")
@admin_required
def gaps():
gap_list = get_all_gaps(db())
generated = get_generated_drafts()
return render_template("gaps.html", gaps=gap_list, generated_drafts=generated)
@admin_bp.route("/gaps/demo")
@admin_required
def gaps_demo():
"""Show a pre-generated example draft so users can see output without API calls."""
generated = get_generated_drafts()
selected = request.args.get("file", "")
draft_text = None
draft_info = None
if selected:
draft_text = read_generated_draft(selected)
for gd in generated:
if gd["filename"] == selected:
draft_info = gd
break
elif generated:
draft_info = generated[0]
draft_text = read_generated_draft(draft_info["filename"])
return render_template(
"gap_demo.html",
generated_drafts=generated,
draft_text=draft_text,
draft_info=draft_info,
selected=selected,
)
@admin_bp.route("/gaps/<int:gap_id>")
@admin_required
def gap_detail(gap_id: int):
gap = get_gap_detail(db(), gap_id)
if not gap:
abort(404)
generated = get_generated_drafts()
gap_proposals = get_proposals_for_gap(db(), gap_id)
return render_template("gap_detail.html", gap=gap, generated_drafts=generated, proposals=gap_proposals)
@admin_bp.route("/gaps/<int:gap_id>/generate", methods=["POST"])
@admin_required
def gap_generate(gap_id: int):
"""Trigger draft generation for a gap. Returns JSON with the generated text."""
gap = get_gap_detail(db(), gap_id)
if not gap:
return jsonify({"error": "Gap not found"}), 404
try:
from ietf_analyzer.config import Config
from ietf_analyzer.analyzer import Analyzer
from ietf_analyzer.draftgen import DraftGenerator
cfg = Config.load()
database = db()
analyzer = Analyzer(cfg, database)
generator = DraftGenerator(cfg, database, analyzer)
slug = gap["topic"].lower().replace(" ", "-")[:40]
output_path = str(Path(_project_root) / "data" / "reports" / "generated-drafts" / f"draft-gap-{gap_id}-{slug}.txt")
path = generator.generate(gap["topic"], output_path=output_path)
draft_text = Path(path).read_text(errors="replace")
return jsonify({
"success": True,
"text": draft_text,
"filename": Path(path).name,
"path": path,
})
except Exception as e:
return jsonify({"error": str(e)}), 500
@admin_bp.route("/api/gaps")
@admin_required
def api_gaps():
    """All gaps as JSON, or a CSV download via ?format=csv."""
    from webui.blueprints.api import _to_csv_response

    rows = get_all_gaps(db())
    if request.args.get("format") == "csv":
        return _to_csv_response(rows, "gaps.csv")
    return jsonify(rows)
@admin_bp.route("/api/gaps/<int:gap_id>")
@admin_required
def api_gap_detail(gap_id: int):
    """One gap as JSON; 404 payload when the id is unknown."""
    gap = get_gap_detail(db(), gap_id)
    if gap:
        return jsonify(gap)
    return jsonify({"error": "Gap not found"}), 404
# ── Monitor ──────────────────────────────────────────────────────────────
@admin_bp.route("/monitor")
@admin_required
def monitor_page():
    """Render the pipeline monitor page."""
    return render_template("monitor.html", status=get_monitor_status(db()))
@admin_bp.route("/api/monitor")
@admin_required
def api_monitor():
    """Monitor status as JSON."""
    return jsonify(get_monitor_status(db()))
# ── Analytics ────────────────────────────────────────────────────────────
@admin_bp.route("/admin/analytics")
@admin_required
def analytics_dashboard():
    """Render the visitor-analytics dashboard sourced from the analytics DB."""
    analytics_db = str(_project_root / "data" / "analytics.db")
    return render_template("analytics.html", data=get_analytics_data(analytics_db))
# ── Landscape & Similarity ───────────────────────────────────────────────
@admin_bp.route("/landscape")
@admin_required
def landscape():
    """Render the t-SNE landscape page with rating distributions."""
    database = db()
    return render_template(
        "landscape.html",
        dist=get_rating_distributions(database),
        tsne_data=get_landscape_tsne(database),
    )
@admin_bp.route("/api/landscape")
@admin_required
def api_landscape():
    """t-SNE landscape points as JSON, or CSV via ?format=csv."""
    from webui.blueprints.api import _to_csv_response

    rows = get_landscape_tsne(db())
    if request.args.get("format") == "csv":
        return _to_csv_response(rows, "landscape.csv")
    return jsonify(rows)
@admin_bp.route("/similarity")
@admin_required
def similarity():
    """Render the draft-similarity network page."""
    return render_template("similarity.html", network=get_similarity_graph(db()))
@admin_bp.route("/api/similarity")
@admin_required
def api_similarity():
    """Similarity graph as JSON."""
    return jsonify(get_similarity_graph(db()))
# ── Compare ──────────────────────────────────────────────────────────────
@admin_bp.route("/compare")
@admin_required
def compare_page():
    """Render the comparison page for a comma-separated ?drafts= list."""
    raw = request.args.get("drafts", "")
    names = [part.strip() for part in raw.split(",") if part.strip()] if raw else []
    # Comparison data only makes sense for two or more drafts.
    data = get_comparison_data(db(), names) if len(names) >= 2 else None
    return render_template("comparison.html", names=names, data=data)
@admin_bp.route("/api/compare", methods=["POST"])
@admin_required
@rate_limit
def api_compare():
    """Run Claude comparison for drafts. Returns JSON with comparison text."""
    payload = request.get_json(force=True, silent=True)
    if not payload or "drafts" not in payload:
        return jsonify({"error": "Missing 'drafts' in request body"}), 400
    names = payload["drafts"]
    if len(names) < 2:
        return jsonify({"error": "Need at least 2 drafts to compare"}), 400
    try:
        from ietf_analyzer.config import Config
        from ietf_analyzer.analyzer import Analyzer

        cfg = Config.load()
        analyzer = Analyzer(cfg, db())
        return jsonify(analyzer.compare_drafts(names))
    except Exception as e:  # surface comparison failures to the client as JSON
        return jsonify({"error": str(e)}), 500
# ── Annotations ──────────────────────────────────────────────────────────
@admin_bp.route("/api/drafts/<string:name>/annotate", methods=["POST"])
@admin_required
def api_annotate(name: str):
    """Add or update annotation (note and/or tags) for a draft.

    Accepts either a full ``tags`` list or incremental ``add_tag`` /
    ``remove_tag`` edits, which are merged into the stored tag list.
    Returns the resulting annotation as JSON.
    """
    # Removed an unused `import json as _json` left over from the refactor.
    database = db()
    if not database.get_draft(name):
        return jsonify({"error": "Draft not found"}), 404
    payload = request.get_json(force=True, silent=True)
    if not payload:
        return jsonify({"error": "Invalid JSON body"}), 400
    note = payload.get("note")
    tags = payload.get("tags")
    add_tag = payload.get("add_tag")
    remove_tag = payload.get("remove_tag")
    if add_tag or remove_tag:
        # Incremental edits start from the currently stored tag list.
        existing = database.get_annotation(name)
        current = existing["tags"] if existing else []
        if add_tag and add_tag not in current:
            current.append(add_tag)
        if remove_tag and remove_tag in current:
            current.remove(remove_tag)
        tags = current
    database.upsert_annotation(name, note=note, tags=tags)
    return jsonify({"success": True, "annotation": database.get_annotation(name)})
# ── Ask/Synthesize (Claude-powered) ──────────────────────────────────────
@admin_bp.route("/api/ask/synthesize", methods=["POST"])
@admin_required
@rate_limit
def api_ask_synthesize():
    """Synthesize an answer via Claude (costs tokens, cached permanently). Returns JSON."""
    payload = request.get_json(force=True, silent=True)
    if not payload or "question" not in payload:
        return jsonify({"error": "Missing 'question' in request body"}), 400
    result = get_ask_synthesize(
        db(), payload["question"], top_k=payload.get("top_k", 5), cheap=True
    )
    return jsonify(result)
# ── Sources & False Positives ────────────────────────────────────────────
@admin_bp.route("/sources")
@admin_required
def sources_page():
    """Render the source-comparison page."""
    return render_template("sources.html", data=get_source_comparison(db()))
@admin_bp.route("/false-positives")
@admin_required
def false_positives_page():
    """Render the false-positive profile page."""
    return render_template("false_positives.html", data=get_false_positive_profile(db()))
@admin_bp.route("/api/sources")
@admin_required
def api_sources():
    """Source comparison as JSON."""
    return jsonify(get_source_comparison(db()))
@admin_bp.route("/api/false-positives")
@admin_required
def api_false_positives():
    """False-positive profile as JSON."""
    return jsonify(get_false_positive_profile(db()))
# ── Citation Influence & BCP ─────────────────────────────────────────────
@admin_bp.route("/api/citations/influence")
@admin_required
def api_citation_influence():
    """Citation influence metrics as JSON."""
    influence = get_citation_influence(db())
    return jsonify(influence)
@admin_bp.route("/api/citations/bcp")
@admin_required
def api_bcp_analysis():
    """BCP citation analysis as JSON."""
    bcp = get_bcp_analysis(db())
    return jsonify(bcp)
# ── Idea Analysis ────────────────────────────────────────────────────────
@admin_bp.route("/idea-analysis")
@admin_required
def idea_analysis():
    """Render the idea-analysis page."""
    return render_template("idea_analysis.html", data=get_idea_analysis(db()))
@admin_bp.route("/api/idea-analysis")
@admin_required
def api_idea_analysis():
    """Idea-analysis data as JSON."""
    return jsonify(get_idea_analysis(db()))
# ── Trends & Complexity ──────────────────────────────────────────────────
@admin_bp.route("/trends")
@admin_required
def trends():
    """Render the trends-analysis page."""
    return render_template("trends_analysis.html", data=get_trends_data(db()))
@admin_bp.route("/complexity")
@admin_required
def complexity():
    """Render the complexity-analysis page."""
    return render_template("complexity.html", data=get_complexity_data(db()))
@admin_bp.route("/api/trends")
@admin_required
def api_trends():
    """Trends data as JSON."""
    data = get_trends_data(db())
    return jsonify(data)
@admin_bp.route("/api/complexity")
@admin_required
def api_complexity():
    """Complexity data as JSON."""
    data = get_complexity_data(db())
    return jsonify(data)
# ── Proposals ────────────────────────────────────────────────────────────
@admin_bp.route("/proposals")
@admin_required
def proposals():
    """List all proposals; the gap list feeds the gap-linking UI."""
    database = db()
    return render_template(
        "proposals.html",
        proposals=get_all_proposals(database),
        gaps=get_all_gaps(database),
    )
@admin_bp.route("/proposals/new", methods=["GET", "POST"])
@admin_required
def proposal_new():
    """Create a proposal from the submitted form; GET renders the empty form."""
    if request.method == "POST":
        form = request.form
        slug = form.get("slug", "").strip()
        if not slug:
            import re
            # Derive a URL slug from the title when none was supplied.
            slug = re.sub(r'[^a-z0-9]+', '-', form["title"].lower()).strip('-')
        record = {
            "title": form["title"],
            "slug": slug,
            "status": form.get("status", "idea"),
            "description": form.get("description", ""),
            "content_md": form.get("content_md", ""),
            "source_paper": form.get("source_paper", ""),
            "source_url": form.get("source_url", ""),
            "intended_wg": form.get("intended_wg", ""),
            "draft_name": form.get("draft_name", ""),
            "gap_ids": [int(raw) for raw in request.form.getlist("gap_ids") if raw],
        }
        pid = db().upsert_proposal(record)
        return redirect(url_for("admin.proposal_detail", proposal_id=pid))
    return render_template("proposal_edit.html", proposal=None, gaps=get_all_gaps(db()))
@admin_bp.route("/proposals/<int:proposal_id>")
@admin_required
def proposal_detail(proposal_id):
    """Render one proposal's detail page (404 when unknown)."""
    found = get_proposal_detail(db(), proposal_id)
    if not found:
        abort(404)
    return render_template("proposal_detail.html", proposal=found)
@admin_bp.route("/proposals/<int:proposal_id>/edit", methods=["GET", "POST"])
@admin_required
def proposal_edit(proposal_id):
    """Edit an existing proposal; GET renders the form, POST saves it."""
    if request.method == "POST":
        form = request.form
        slug = form.get("slug", "").strip()
        if not slug:
            import re
            # Derive a URL slug from the title when none was supplied.
            slug = re.sub(r'[^a-z0-9]+', '-', form["title"].lower()).strip('-')
        record = {
            "id": proposal_id,
            "title": form["title"],
            "slug": slug,
            "status": form.get("status", "idea"),
            "description": form.get("description", ""),
            "content_md": form.get("content_md", ""),
            "source_paper": form.get("source_paper", ""),
            "source_url": form.get("source_url", ""),
            "intended_wg": form.get("intended_wg", ""),
            "draft_name": form.get("draft_name", ""),
            "gap_ids": [int(raw) for raw in request.form.getlist("gap_ids") if raw],
        }
        db().upsert_proposal(record)
        return redirect(url_for("admin.proposal_detail", proposal_id=proposal_id))
    proposal = get_proposal_detail(db(), proposal_id)
    if not proposal:
        abort(404)
    return render_template("proposal_edit.html", proposal=proposal, gaps=get_all_gaps(db()))
@admin_bp.route("/proposals/<int:proposal_id>/delete", methods=["POST"])
@admin_required
def proposal_delete(proposal_id):
    """Delete a proposal, then return to the proposal list."""
    db().delete_proposal(proposal_id)
    return redirect(url_for("admin.proposals"))
@admin_bp.route("/api/proposals")
@admin_required
def api_proposals():
    """All proposals as JSON."""
    return jsonify(get_all_proposals(db()))
@admin_bp.route("/api/proposals/<int:proposal_id>")
@admin_required
def api_proposal_detail(proposal_id):
    """One proposal as JSON; 404 payload when unknown."""
    proposal = get_proposal_detail(db(), proposal_id)
    if proposal:
        return jsonify(proposal)
    return jsonify({"error": "Proposal not found"}), 404
@admin_bp.route("/proposals/intake", methods=["GET", "POST"])
@admin_required
def proposal_intake():
    """Paste text/URLs -> Claude generates proposals automatically."""
    if request.method != "POST":
        return render_template("proposal_intake.html")
    raw_input = request.form.get("input_text", "").strip()
    if not raw_input:
        return jsonify({"error": "No input provided"}), 400
    try:
        from ietf_analyzer.config import Config
        from ietf_analyzer.proposal_intake import ProposalIntake

        intake = ProposalIntake(Config.load(), db())
        proposals_result, usage = intake.process(raw_input, cheap=True)
        summarized = [
            {
                "id": p.get("id"),
                "title": p.get("title"),
                "slug": p.get("slug"),
                "gap_ids": p.get("gap_ids", []),
                "description": p.get("description", ""),
                "content_md": p.get("content_md", ""),
                "intended_wg": p.get("intended_wg", ""),
                "draft_name": p.get("draft_name", ""),
                "source_paper": p.get("source_paper", ""),
                "source_url": p.get("source_url", ""),
            }
            for p in proposals_result
        ]
        return jsonify({
            "success": True,
            "count": len(summarized),
            "proposals": summarized,
            "usage": usage,
        })
    except Exception as e:  # surface intake failures to the client as JSON
        return jsonify({"error": str(e)}), 500
# ── Obsidian Export ──────────────────────────────────────────────────────
@admin_bp.route("/export/obsidian")
@admin_required
def export_obsidian():
    """Download the entire research corpus as an Obsidian vault (ZIP)."""
    payload = build_obsidian_vault(db())
    headers = {"Content-Disposition": "attachment; filename=IETF-AI-Agent-Drafts.zip"}
    return Response(payload, mimetype="application/zip", headers=headers)

180
src/webui/blueprints/api.py Normal file
View File

@@ -0,0 +1,180 @@
"""Public API endpoints (JSON responses)."""
from __future__ import annotations
import csv
import io
import json
from flask import Blueprint, request, jsonify, g, Response
from webui.data import (
get_db,
get_overview_stats,
get_drafts_page,
get_draft_detail,
get_rating_distributions,
get_timeline_data,
get_ideas_by_type,
get_category_counts,
get_author_network_full,
get_citation_graph,
get_idea_clusters,
global_search,
get_architecture,
get_ask_search,
)
api_bp = Blueprint("api", __name__)
def db():
    """Return the per-request Database, creating and caching it on flask.g."""
    database = getattr(g, "db", None)
    if database is None:
        database = g.db = get_db()
    return database
def _to_csv_response(rows: list[dict], filename: str = "export.csv") -> Response:
    """Convert a list of dicts to a CSV download response.

    Fixes:
    - The Content-Disposition f-strings previously emitted the literal text
      "(unknown)" instead of interpolating *filename* (corrupted literal).
    - The header row is now the union of keys across all rows (first-seen
      order) with ``restval=""``; previously only the first row's keys were
      used, so heterogeneous rows (e.g. merged search categories) raised
      ValueError in DictWriter.

    List/dict cell values are JSON-encoded so the CSV stays one value per cell.
    """
    disposition = {"Content-Disposition": f"attachment; filename={filename}"}
    if not rows:
        return Response("", mimetype="text/csv", headers=disposition)
    # Union of keys across all rows, preserving first-seen order.
    fieldnames = list(dict.fromkeys(k for row in rows for k in row))
    si = io.StringIO()
    writer = csv.DictWriter(si, fieldnames=fieldnames, restval="")
    writer.writeheader()
    for row in rows:
        flat = {k: json.dumps(v) if isinstance(v, (list, dict)) else v
                for k, v in row.items()}
        writer.writerow(flat)
    return Response(si.getvalue(), mimetype="text/csv", headers=disposition)
def _results_to_csv(results: dict) -> Response:
    """Convert global search results (multi-category) to a single CSV."""
    rows = [
        {"_category": category, **item}
        for category, items in results.items()
        for item in items
    ]
    return _to_csv_response(rows, "search_results.csv")
@api_bp.route("/api/drafts")
def api_drafts():
    """Paginated/filterable draft listing as JSON, or CSV via ?format=csv."""
    data = get_drafts_page(
        db(),
        page=request.args.get("page", 1, type=int),
        search=request.args.get("q", ""),
        category=request.args.get("cat", ""),
        min_score=request.args.get("min_score", 0.0, type=float),
        sort=request.args.get("sort", "score"),
        sort_dir=request.args.get("dir", "desc"),
        source=request.args.get("source", ""),
    )
    if request.args.get("format") == "csv":
        return _to_csv_response(data.get("drafts", []), "drafts.csv")
    return jsonify(data)
@api_bp.route("/api/stats")
def api_stats():
    """Overview statistics as JSON."""
    stats = get_overview_stats(db())
    return jsonify(stats)
@api_bp.route("/api/authors/network")
def api_author_network():
    """Full co-author network as JSON."""
    network = get_author_network_full(db())
    return jsonify(network)
@api_bp.route("/api/citations")
def api_citations():
    """Citation graph as JSON; ?min_refs= prunes weakly-cited nodes (default 2)."""
    min_refs = request.args.get("min_refs", 2, type=int)
    graph = get_citation_graph(db(), min_refs=min_refs)
    return jsonify(graph)
@api_bp.route("/api/search")
def api_search():
    """Global search across drafts/ideas/authors/gaps; CSV via ?format=csv."""
    query = request.args.get("q", "").strip()
    if query:
        results = global_search(db(), query)
    else:
        results = {"drafts": [], "ideas": [], "authors": [], "gaps": []}
    if request.args.get("format") == "csv":
        return _results_to_csv(results)
    return jsonify(results)
@api_bp.route("/api/ideas")
def api_ideas():
    """Ideas grouped by type as JSON, or CSV via ?format=csv."""
    data = get_ideas_by_type(db())
    if request.args.get("format") == "csv":
        return _to_csv_response(data.get("ideas", []), "ideas.csv")
    return jsonify(data)
@api_bp.route("/api/ratings")
def api_ratings():
    """Rating distributions as JSON; flattened per-draft rows via ?format=csv."""
    data = get_rating_distributions(db())
    if request.args.get("format") != "csv":
        return jsonify(data)
    # The distributions come as parallel lists; zip them into per-draft rows.
    rows = [
        {
            "name": data["names"][i],
            "score": data["scores"][i],
            "novelty": data["novelty"][i],
            "maturity": data["maturity"][i],
            "overlap": data["overlap"][i],
            "momentum": data["momentum"][i],
            "relevance": data["relevance"][i],
            "category": data["categories"][i],
        }
        for i in range(len(data.get("names", [])))
    ]
    return _to_csv_response(rows, "ratings.csv")
@api_bp.route("/api/timeline")
def api_timeline():
    """Timeline data as JSON."""
    return jsonify(get_timeline_data(db()))
@api_bp.route("/api/idea-clusters")
def api_idea_clusters():
    """Idea clusters as JSON."""
    return jsonify(get_idea_clusters(db()))
@api_bp.route("/api/categories")
def api_categories():
    """Category counts as JSON, or a two-column CSV via ?format=csv."""
    counts = get_category_counts(db())
    if request.args.get("format") == "csv":
        rows = [{"category": cat, "count": n} for cat, n in counts.items()]
        return _to_csv_response(rows, "categories.csv")
    return jsonify(counts)
@api_bp.route("/api/drafts/<string:name>")
def api_draft_detail(name: str):
    """Full detail for one draft as JSON; 404 payload when unknown."""
    detail = get_draft_detail(db(), name)
    if detail:
        return jsonify(detail)
    return jsonify({"error": "Draft not found"}), 404
@api_bp.route("/api/architecture")
def api_architecture():
    """Architecture overview as JSON."""
    arch = get_architecture(db())
    return jsonify(arch)
@api_bp.route("/api/ask", methods=["POST"])
def api_ask():
    """Search only (free). Returns JSON with sources + cached answer if available."""
    payload = request.get_json(force=True, silent=True)
    if not payload or "question" not in payload:
        return jsonify({"error": "Missing 'question' in request body"}), 400
    result = get_ask_search(db(), payload["question"], top_k=payload.get("top_k", 5))
    return jsonify(result)

View File

@@ -0,0 +1,206 @@
"""Public page routes (no admin required)."""
from __future__ import annotations
from flask import Blueprint, render_template, request, abort, g
from webui.data import (
get_db,
get_overview_stats,
get_category_counts,
get_drafts_page,
get_draft_detail,
get_rating_distributions,
get_timeline_data,
get_timeline_animation_data,
get_ideas_by_type,
get_top_authors,
get_org_data,
get_category_radar_data,
get_score_histogram,
get_author_network_full,
get_cross_org_data,
get_citation_graph,
get_idea_clusters,
get_category_summary,
global_search,
get_architecture,
get_ask_search,
get_citation_influence,
get_bcp_analysis,
)
pages_bp = Blueprint("pages", __name__)
def db():
    """Return the per-request Database, creating and caching it on flask.g."""
    database = getattr(g, "db", None)
    if database is None:
        database = g.db = get_db()
    return database
@pages_bp.route("/")
def overview():
    """Dashboard home: headline stats plus category/timeline/score/radar charts."""
    database = db()
    return render_template(
        "overview.html",
        stats=get_overview_stats(database),
        categories=get_category_counts(database),
        timeline=get_timeline_data(database),
        scores=get_score_histogram(database),
        radar=get_category_radar_data(database),
    )
@pages_bp.route("/drafts")
def drafts():
    """Paginated draft listing with search/filter/sort controls."""
    page = request.args.get("page", 1, type=int)
    search = request.args.get("q", "")
    category = request.args.get("cat", "")
    source = request.args.get("source", "")
    min_score = request.args.get("min_score", 0.0, type=float)
    sort = request.args.get("sort", "score")
    sort_dir = request.args.get("dir", "desc")
    database = db()
    result = get_drafts_page(
        database, page=page, search=search, category=category,
        min_score=min_score, sort=sort, sort_dir=sort_dir, source=source,
    )
    # A category summary panel is only shown when a category filter is active.
    cat_summary = get_category_summary(database, category) if category else None
    return render_template(
        "drafts.html",
        result=result,
        categories=get_category_counts(database),
        cat_summary=cat_summary,
        search=search,
        current_cat=category,
        current_source=source,
        min_score=min_score,
        sort=sort,
        sort_dir=sort_dir,
    )
@pages_bp.route("/drafts/<string:name>")
def draft_detail(name: str):
    """Detail page for one draft; 404 when the name is unknown."""
    database = db()
    detail = get_draft_detail(database, name)
    if not detail:
        abort(404)
    # Referenced drafts that exist in our DB become internal links.
    ref_draft_ids = [ref["id"] for ref in detail.get("refs", []) if ref["type"] == "draft"]
    known_drafts: set[str] = set()
    if ref_draft_ids:
        placeholders = ",".join("?" * len(ref_draft_ids))
        rows = database.conn.execute(
            f"SELECT name FROM drafts WHERE name IN ({placeholders})", ref_draft_ids
        ).fetchall()
        known_drafts = {row["name"] for row in rows}
    return render_template("draft_detail.html", draft=detail, known_drafts=known_drafts)
@pages_bp.route("/ideas")
def ideas():
    """Ideas page, grouped by idea type."""
    return render_template("ideas.html", data=get_ideas_by_type(db()))
@pages_bp.route("/ratings")
def ratings():
    """Rating distributions page with the per-category radar chart."""
    database = db()
    return render_template(
        "ratings.html",
        dist=get_rating_distributions(database),
        radar=get_category_radar_data(database),
    )
@pages_bp.route("/timeline")
def timeline_animation():
    """Animated timeline page."""
    animation = get_timeline_animation_data(db())
    return render_template("timeline.html", animation=animation)
@pages_bp.route("/idea-clusters")
def idea_clusters():
    """Idea clusters page."""
    clusters = get_idea_clusters(db())
    return render_template("idea_clusters.html", clusters=clusters)
@pages_bp.route("/architecture")
def architecture():
    """Architecture overview page."""
    arch = get_architecture(db())
    return render_template("architecture.html", arch=arch)
@pages_bp.route("/authors")
def authors():
    """Authors page: top authors, orgs, co-author network and cross-org pairs."""
    database = db()
    top = get_top_authors(database, limit=50)
    orgs = get_org_data(database, limit=20)
    return render_template(
        "authors.html",
        authors=top,
        orgs=orgs,
        orgs_data=orgs,  # template expects the same data under both names
        network=get_author_network_full(database),
        cross_org=get_cross_org_data(database, limit=20),
    )
@pages_bp.route("/citations")
def citations():
    """Citation graph page; influence/BCP panels are admin-only extras."""
    from webui.auth import is_admin as check_admin

    database = db()
    graph = get_citation_graph(database)
    influence = get_citation_influence(database) if check_admin() else None
    bcp = get_bcp_analysis(database) if check_admin() else None
    return render_template("citations.html", graph=graph, influence=influence, bcp=bcp)
@pages_bp.route("/about")
def about():
    """About page with corpus stats and the configured search parameters."""
    from ietf_analyzer.config import Config

    cfg = Config.load()
    return render_template(
        "about.html",
        stats=get_overview_stats(db()),
        search_keywords=cfg.search_keywords,
        fetch_since=cfg.fetch_since,
    )
@pages_bp.route("/impressum")
def impressum():
    """Render the static legal-notice (Impressum) page."""
    return render_template("impressum.html")
@pages_bp.route("/datenschutz")
def datenschutz():
    """Render the static privacy-policy (Datenschutz) page."""
    return render_template("datenschutz.html")
@pages_bp.route("/search")
def search():
    """Site-wide search results page."""
    q = request.args.get("q", "").strip()
    empty = {"drafts": [], "ideas": [], "authors": [], "gaps": []}
    results = global_search(db(), q) if q else empty
    total = sum(len(hits) for hits in results.values())
    return render_template("search_results.html", query=q, results=results, total=total)
@pages_bp.route("/ask")
def ask_page():
    """Ask page; runs the free search when ?q= is present."""
    question = request.args.get("q", "")
    result = None
    if question:
        top_k = request.args.get("top", 5, type=int)
        result = get_ask_search(db(), question, top_k=top_k)
    return render_template("ask.html", question=question, result=result)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,97 @@
"""Data access layer for the web dashboard.
Thin wrapper around ietf_analyzer.db.Database that returns plain dicts
ready for JSON serialization or Jinja2 template rendering.
All public functions are re-exported here for backward compatibility:
from webui.data import get_overview_stats
"""
from __future__ import annotations
# Shared utilities
from webui.data._shared import get_db, _cached, _extract_month # noqa: F401
# Drafts
from webui.data.drafts import ( # noqa: F401
OverviewStats,
DraftListItem,
DraftsPage,
get_overview_stats,
get_category_counts,
get_category_summary,
get_drafts_page,
get_draft_detail,
get_generated_drafts,
read_generated_draft,
)
# Authors
from webui.data.authors import ( # noqa: F401
AuthorInfo,
AuthorNetworkNode,
AuthorNetworkEdge,
AuthorCluster,
AuthorNetwork,
get_top_authors,
get_org_data,
get_coauthor_network,
get_cross_org_data,
get_author_network_full,
)
# Ratings
from webui.data.ratings import ( # noqa: F401
get_rating_distributions,
get_category_radar_data,
get_score_histogram,
get_false_positive_profile,
)
# Gaps
from webui.data.gaps import ( # noqa: F401
get_all_gaps,
get_gap_detail,
)
# Analysis & Visualization
from webui.data.analysis import ( # noqa: F401
TimelineData,
SimilarityGraphStats,
SimilarityGraph,
CitationGraphStats,
CitationGraph,
MonitorCost,
MonitorPipeline,
MonitorStatus,
get_ideas_by_type,
get_timeline_data,
get_similarity_graph,
get_idea_clusters,
get_timeline_animation_data,
get_monitor_status,
get_citation_graph,
get_landscape_tsne,
get_comparison_data,
get_architecture,
get_idea_analysis,
get_trends_data,
get_complexity_data,
get_source_comparison,
get_citation_influence,
get_bcp_analysis,
)
# Search
from webui.data.search import ( # noqa: F401
SearchResults,
global_search,
get_ask_search,
get_ask_synthesize,
)
# Proposals
from webui.data.proposals import ( # noqa: F401
get_all_proposals,
get_proposal_detail,
get_proposals_for_gap,
)

46
src/webui/data/_shared.py Normal file
View File

@@ -0,0 +1,46 @@
"""Shared utilities for webui data modules."""
from __future__ import annotations
import sys
import time
from pathlib import Path
# Ensure project src is on path
_project_root = Path(__file__).resolve().parent.parent.parent.parent
if str(_project_root) not in sys.path:
sys.path.insert(0, str(_project_root / "src"))
from ietf_analyzer.config import Config
from ietf_analyzer.db import Database
from ietf_analyzer.readiness import compute_readiness, compute_readiness_batch
# Simple TTL cache for expensive computations (t-SNE, clustering, similarity)
_cache: dict[str, tuple[float, object]] = {}
_CACHE_TTL = 300 # 5 minutes
def _extract_month(time_str: str | None) -> str:
"""Normalize a date string to YYYY-MM format."""
if not time_str:
return "unknown"
if len(time_str) >= 7 and time_str[4] == '-':
return time_str[:7] # Already YYYY-MM-DD
if len(time_str) >= 6 and time_str[:4].isdigit():
return time_str[:4] + '-' + time_str[4:6] # YYYYMMDD → YYYY-MM
return time_str[:7]
def _cached(key: str, fn, ttl: float = _CACHE_TTL):
    """Return the cached value for *key*, or compute it via fn() and store it."""
    now = time.monotonic()
    entry = _cache.get(key)
    if entry is not None:
        stored_at, value = entry
        if now - stored_at < ttl:
            return value
    # Miss or expired: recompute and refresh the timestamp.
    value = fn()
    _cache[key] = (now, value)
    return value
def get_db() -> Database:
    """Construct a Database bound to the default configuration."""
    return Database(Config.load())

1968
src/webui/data/analysis.py Normal file

File diff suppressed because it is too large Load Diff

276
src/webui/data/authors.py Normal file
View File

@@ -0,0 +1,276 @@
"""Author-related data access functions."""
from __future__ import annotations
import re
from collections import Counter, defaultdict
from typing import TypedDict
from ietf_analyzer.db import Database
from webui.data._shared import _cached
class AuthorInfo(TypedDict):
    """Author entry from :func:`get_top_authors`."""
    name: str          # author display name
    affiliation: str   # affiliation string (may be empty)
    draft_count: int   # number of drafts by this author
    drafts: list[str]  # names of those drafts
class AuthorNetworkNode(TypedDict):
    """Node in the author network graph."""
    id: str            # author name, doubles as the graph key (same as ``name``)
    name: str
    org: str           # affiliation string (empty when unknown)
    draft_count: int
    avg_score: float   # mean composite score over the author's rated drafts (0 if none)
    drafts: list[str]  # draft names, capped at 8 entries to bound JSON size
class AuthorNetworkEdge(TypedDict):
    """Edge in the author network graph."""
    source: str  # author name
    target: str  # author name
    weight: int  # number of drafts the pair co-authored
class AuthorCluster(TypedDict):
    """Cluster (connected co-author component) in the author network.

    Declares every key actually present on the dicts built by
    ``_compute_author_network_full`` — including ``member_orgs`` and the
    display ``name``, which were previously assigned but not declared here.
    """
    id: int
    members: list[str]
    member_orgs: dict[str, str]  # member name -> org, only members with a known org
    org_mix: dict[str, int]      # org -> member count, most common first
    size: int
    drafts: list[dict[str, str]]  # [{"name": ..., "title": ...}] per draft
    draft_count: int
    name: str                    # human-readable cluster label
class AuthorNetwork(TypedDict):
    """Full author network from :func:`get_author_network_full`."""
    nodes: list[AuthorNetworkNode]
    edges: list[AuthorNetworkEdge]
    clusters: list[AuthorCluster]  # connected components, sorted largest first
def get_top_authors(db: Database, limit: int = 30) -> list[AuthorInfo]:
    """Return the most prolific authors, ordered as db.top_authors yields them."""
    authors: list[AuthorInfo] = []
    for name, affiliation, count, drafts in db.top_authors(limit=limit):
        authors.append({
            "name": name,
            "affiliation": affiliation,
            "draft_count": count,
            "drafts": drafts,
        })
    return authors
def get_org_data(db: Database, limit: int = 20) -> list[dict]:
    """Return organization contribution data."""
    return [
        {"org": org_name, "author_count": n_authors, "draft_count": n_drafts}
        for org_name, n_authors, n_drafts in db.top_orgs(limit=limit)
    ]
def get_coauthor_network(db: Database, min_shared: int = 1) -> dict:
    """Return co-authorship network data for a force-directed graph.

    Returns {nodes: [{id, name, org, draft_count}], edges: [{source, target, weight}]}.
    Only author pairs with at least *min_shared* shared drafts become edges,
    and only authors on at least one edge become nodes.
    """
    author_meta = {
        name: {"org": org, "draft_count": count}
        for name, org, count, _drafts in db.top_authors(limit=100)
    }
    connected: set[str] = set()
    edges = []
    for left, right, shared in db.coauthor_pairs():
        if shared < min_shared:
            continue
        connected.add(left)
        connected.add(right)
        edges.append({"source": left, "target": right, "weight": shared})
    nodes = []
    for person in connected:
        # Authors outside the top list still get a minimal placeholder entry.
        meta = author_meta.get(person, {"org": "", "draft_count": 1})
        nodes.append({
            "id": person,
            "name": person,
            "org": meta["org"],
            "draft_count": meta["draft_count"],
        })
    return {"nodes": nodes, "edges": edges}
def get_cross_org_data(db: Database, limit: int = 20) -> list[dict]:
    """Return cross-org collaboration pairs."""
    return [
        {"org_a": org_a, "org_b": org_b, "shared_drafts": shared}
        for org_a, org_b, shared in db.cross_org_collaborations(limit=limit)
    ]
def get_author_network_full(db: Database) -> AuthorNetwork:
    """Return the enriched co-author network (cached for 5 min)."""
    def compute():
        return _compute_author_network_full(db)
    return _cached("author_network", compute)
def _compute_author_network_full(db: Database) -> AuthorNetwork:
    """Return enriched co-authorship network with avg scores and cluster info.

    Fix: the connected-components BFS used ``list.pop(0)``, which is O(n) per
    pop and O(n^2) per component; it now uses ``collections.deque.popleft``.

    Returns {
        nodes: [{id, name, org, draft_count, avg_score, drafts: [name,...]}],
        edges: [{source, target, weight}],
        clusters: [{id, members: [name,...], org_mix: {org: count}, size}],
    }
    """
    from collections import deque  # O(1) popleft for the BFS below

    pairs = db.coauthor_pairs()
    top = db.top_authors(limit=500)
    # Build rating lookup for avg scores
    rated = db.drafts_with_ratings(limit=2000)
    draft_score = {d.name: r.composite_score for d, r in rated}
    # Author info map
    author_info = {}
    for name, aff, cnt, drafts in top:
        scores = [draft_score[dn] for dn in drafts if dn in draft_score]
        avg = round(sum(scores) / len(scores), 2) if scores else 0
        author_info[name] = {
            "org": aff, "draft_count": cnt, "drafts": drafts, "avg_score": avg
        }
    # Build node set: authors with meaningful collaboration (2+ shared drafts)
    node_set = set()
    edges = []
    for a, b, shared in pairs:
        if shared >= 2:
            node_set.add(a)
            node_set.add(b)
            edges.append({"source": a, "target": b, "weight": shared})
    # Also include authors with 3+ drafts even if no co-authorships
    for name, info in author_info.items():
        if info["draft_count"] >= 3:
            node_set.add(name)
    nodes = []
    for name in node_set:
        info = author_info.get(name, {"org": "", "draft_count": 1, "drafts": [], "avg_score": 0})
        nodes.append({
            "id": name,
            "name": name,
            "org": info["org"],
            "draft_count": info["draft_count"],
            "avg_score": info["avg_score"],
            "drafts": info["drafts"][:8],  # cap for JSON size
        })
    # Cluster detection via connected components (BFS)
    adjacency: dict[str, set[str]] = defaultdict(set)
    for e in edges:
        adjacency[e["source"]].add(e["target"])
        adjacency[e["target"]].add(e["source"])
    visited: set[str] = set()
    clusters = []
    # Batch-load all drafts referenced by authors (avoid N+1 in cluster loop)
    _all_dn = set()
    for _ai in author_info.values():
        _all_dn.update(_ai.get("drafts", []))
    _all_drafts_map = db.get_drafts_by_names(list(_all_dn))
    for node in sorted(node_set):
        if node in visited:
            continue
        component: list[str] = []
        queue = deque([node])
        while queue:
            current = queue.popleft()  # was list.pop(0): O(n) per pop
            if current in visited:
                continue
            visited.add(current)
            component.append(current)
            for neighbor in adjacency.get(current, []):
                if neighbor not in visited:
                    queue.append(neighbor)
        if len(component) >= 2:
            org_mix: dict[str, int] = Counter()
            member_orgs: dict[str, str] = {}
            cluster_drafts: dict[str, str] = {}  # name -> title
            for m in component:
                org = author_info.get(m, {}).get("org", "")
                if org:
                    org_mix[org] += 1
                    member_orgs[m] = org
                for dn in author_info.get(m, {}).get("drafts", []):
                    if dn not in cluster_drafts:
                        d = _all_drafts_map.get(dn)
                        cluster_drafts[dn] = d.title[:80] if d else dn
            clusters.append({
                "id": len(clusters),
                "members": component,
                "member_orgs": member_orgs,
                "org_mix": dict(org_mix.most_common()),
                "size": len(component),
                "drafts": [{"name": n, "title": t} for n, t in list(cluster_drafts.items())],
                "draft_count": len(cluster_drafts),
            })
    clusters.sort(key=lambda c: c["size"], reverse=True)
    # Generate meaningful names for clusters
    for cl in clusters:
        cl["name"] = _author_cluster_name(cl)
    return {"nodes": nodes, "edges": edges, "clusters": clusters}
def _normalize_org(name: str) -> str:
"""Shorten verbose org names for display."""
# Remove common suffixes
for suffix in (", Inc.", " Inc.", ", Ltd.", " Ltd.", " Co.", " Technologies",
" Corporation", " Corp.", " Limited", " GmbH", " AG",
" Europe Ltd", " Research", " Systems"):
name = name.replace(suffix, "")
return name.strip().rstrip(",").rstrip("&").rstrip()
def _author_cluster_name(cluster: dict) -> str:
    """Derive a meaningful display name for an author cluster.

    Combines the top 1-2 normalized organizations with the most common
    keywords across the cluster's draft titles, e.g. "ACME — Quantum Routing".

    Fix: restores the non-ASCII separator and truncation ellipsis that were
    stripped by a mis-encoding pass — the code previously concatenated the
    org label and topic with nothing between them (f"{org}{topic}") and
    appended an empty string after truncating to 47 characters.
    """
    # Org part: top 1-2 orgs, normalized and de-duplicated case-insensitively
    raw_orgs = list(cluster.get("org_mix", {}).keys())
    orgs = []
    seen_short: set[str] = set()
    for o in raw_orgs:
        short = _normalize_org(o)
        if short.lower() not in seen_short:
            seen_short.add(short.lower())
            orgs.append(short)
    if len(orgs) >= 2:
        org_label = f"{orgs[0]} + {orgs[1]}"
    elif orgs:
        org_label = orgs[0]
    else:
        # Fall back to first member's last name
        members = cluster.get("members", [])
        org_label = members[0].split()[-1] if members else "Unknown"
    # Topic part: extract common keywords from draft titles
    stopwords = {
        "a", "an", "the", "of", "for", "in", "to", "and", "on", "with",
        "using", "based", "draft", "internet", "ietf", "protocol", "framework",
        "requirements", "architecture", "considerations", "use", "cases", "via",
        "towards", "over", "from", "into", "between", "specification", "extension",
        "extensions", "mechanisms", "mechanism", "version", "new", "general",
    }
    word_counts: Counter = Counter()
    for d in cluster.get("drafts", []):
        title = d.get("title", "")
        words = re.findall(r"[A-Za-z]{3,}", title)
        for w in words:
            wl = w.lower()
            if wl not in stopwords:
                word_counts[wl] += 1
    # Pick top keyword(s) that appear in multiple drafts
    top_words = [w for w, c in word_counts.most_common(3) if c >= 2]
    if not top_words:
        top_words = [w for w, _ in word_counts.most_common(1)]
    if top_words:
        topic = " ".join(w.capitalize() for w in top_words[:2])
        name = f"{org_label} — {topic}"
    else:
        name = org_label
    # Truncate for display: 47 chars plus the ellipsis keeps it within 48.
    return name if len(name) <= 50 else name[:47] + "…"

381
src/webui/data/drafts.py Normal file
View File

@@ -0,0 +1,381 @@
"""Draft-related data access functions."""
from __future__ import annotations
import json
import re
from collections import Counter, defaultdict
from pathlib import Path
from typing import TypedDict
from ietf_analyzer.db import Database
from ietf_analyzer.readiness import compute_readiness, compute_readiness_batch
from webui.data._shared import _project_root
class OverviewStats(TypedDict):
    """High-level dashboard statistics from :func:`get_overview_stats`."""
    total_drafts: int           # drafts counted, excluding flagged false positives
    rated_count: int            # drafts that have a rating (false positives excluded)
    author_count: int           # distinct authors in the database
    idea_count: int             # extracted ideas in the database
    gap_count: int              # gap-analysis results in the database
    input_tokens: int           # cumulative LLM input tokens consumed
    output_tokens: int          # cumulative LLM output tokens consumed
    false_positive_count: int   # drafts flagged as false positives
class DraftListItem(TypedDict):
    """Single draft in the paginated listing from :func:`get_drafts_page`."""
    name: str               # canonical draft name, e.g. "draft-ietf-..."
    title: str
    date: str | None
    url: str                # source URL for non-IETF drafts, datatracker URL otherwise
    pages: int
    group: str              # working group, or "individual"
    source: str             # originating corpus, defaults to "ietf"
    score: float            # composite rating score, rounded to 2 dp
    novelty: float
    maturity: float
    overlap: float
    momentum: float
    relevance: float
    categories: list[str]
    summary: str
    readiness: float        # batch-computed readiness score
class DraftsPage(TypedDict):
    """Paginated draft listing from :func:`get_drafts_page`."""
    drafts: list[DraftListItem]  # rows for the requested page only
    total: int                   # total rows after filtering, before pagination
    page: int                    # 1-based page number, clamped into range
    per_page: int
    pages: int                   # total page count, always >= 1
def get_overview_stats(db: Database) -> OverviewStats:
    """Return high-level stats for the dashboard home page.

    Rated and total counts exclude drafts flagged as false positives;
    the FP count itself is reported separately for transparency.
    """
    visible_drafts = db.count_drafts(include_false_positives=False)
    drafts_including_fp = db.count_drafts(include_false_positives=True)
    tokens_in, tokens_out = db.total_tokens_used()
    return {
        "total_drafts": visible_drafts,
        # drafts_with_ratings already excludes false positives.
        "rated_count": len(db.drafts_with_ratings(limit=1000)),
        "author_count": db.author_count(),
        "idea_count": db.idea_count(),
        "gap_count": len(db.all_gaps()),
        "input_tokens": tokens_in,
        "output_tokens": tokens_out,
        "false_positive_count": drafts_including_fp - visible_drafts,
    }
def get_category_counts(db: Database) -> dict[str, int]:
    """Map each category name to the number of drafts tagged with it."""
    counts = db.category_counts()
    return counts
def get_category_summary(db: Database, category: str) -> dict | None:
    """Build a data-driven summary for a category. Returns None if category not found.

    Aggregates per-dimension rating averages, the top drafts, authors and
    organizations, and recent activity across all rated drafts tagged with
    *category*, then renders a short markdown narrative describing the
    category's profile.
    """
    pairs = db.drafts_with_ratings(limit=2000)
    all_authors = db.top_authors(limit=500)
    # Filter to drafts in this category
    cat_pairs = [(d, r) for d, r in pairs if category in r.categories]
    if not cat_pairs:
        return None
    # Author lookup: draft_name -> [author names]
    author_drafts_map: dict[str, list[str]] = defaultdict(list)
    for name, aff, cnt, drafts in all_authors:
        for dn in drafts:
            author_drafts_map[dn].append(name)
    # Dimension averages (rounded to 1 dp; 0 for empty input)
    n = len(cat_pairs)
    avg = lambda vals: round(sum(vals) / len(vals), 1) if vals else 0
    novelty_vals = [r.novelty for _, r in cat_pairs]
    maturity_vals = [r.maturity for _, r in cat_pairs]
    overlap_vals = [r.overlap for _, r in cat_pairs]
    momentum_vals = [r.momentum for _, r in cat_pairs]
    relevance_vals = [r.relevance for _, r in cat_pairs]
    scores = [r.composite_score for _, r in cat_pairs]
    # Top drafts
    sorted_pairs = sorted(cat_pairs, key=lambda p: p[1].composite_score, reverse=True)
    top_3 = [(d.name, d.title, round(r.composite_score, 1)) for d, r in sorted_pairs[:3]]
    # Top authors in this category (count one per draft they appear on)
    author_counter: Counter = Counter()
    org_counter: Counter = Counter()
    author_aff: dict[str, str] = {}
    for name, aff, cnt, drafts in all_authors:
        author_aff[name] = aff or ""
    for d, r in cat_pairs:
        for a in author_drafts_map.get(d.name, []):
            author_counter[a] += 1
            if author_aff.get(a):
                org_counter[author_aff[a]] += 1
    top_authors = author_counter.most_common(5)
    top_orgs = org_counter.most_common(5)
    # Strongest and weakest dimensions
    dim_avgs = {
        "Novelty": avg(novelty_vals),
        "Maturity": avg(maturity_vals),
        "Overlap": avg(overlap_vals),
        "Momentum": avg(momentum_vals),
        "Relevance": avg(relevance_vals),
    }
    strongest = max(dim_avgs, key=dim_avgs.get)
    weakest = min(dim_avgs, key=dim_avgs.get)
    # Activity trend: how many are recent (last 6 months)?
    # NOTE(review): the cutoff is a hard-coded date string and will go
    # stale -- consider deriving it from the current date.
    recent = sum(1 for d, _ in cat_pairs if d.time and d.time >= "2025-09")
    total_all = len(pairs)
    # Build text summary
    lines = []
    lines.append(f"**{n} drafts** ({n * 100 // total_all}% of all rated drafts) "
                 f"with an average composite score of **{avg(scores):.1f}/5.0**.")
    # Dimension profile
    lines.append(f"Strongest dimension: **{strongest}** ({dim_avgs[strongest]}), "
                 f"weakest: **{weakest}** ({dim_avgs[weakest]}).")
    # Maturity vs novelty insight
    if dim_avgs["Maturity"] < 2.5 and dim_avgs["Novelty"] >= 3.0:
        lines.append("This category has **high novelty but low maturity** — many early-stage proposals with fresh ideas that haven't been fully developed yet.")
    elif dim_avgs["Maturity"] >= 3.0 and dim_avgs["Novelty"] < 2.5:
        lines.append("This category is **mature but less novel** — established approaches being refined rather than introducing fundamentally new concepts.")
    elif dim_avgs["Maturity"] >= 3.0 and dim_avgs["Novelty"] >= 3.0:
        lines.append("This category shows **both high novelty and maturity** — well-developed proposals with genuinely new contributions.")
    # Overlap insight
    if dim_avgs["Overlap"] >= 3.5:
        lines.append(f"High overlap ({dim_avgs['Overlap']}) suggests **significant duplication** — multiple drafts cover similar ground, which may indicate convergence or fragmentation.")
    elif dim_avgs["Overlap"] <= 2.0:
        lines.append(f"Low overlap ({dim_avgs['Overlap']}) indicates **diverse approaches** — drafts in this category tackle distinct problems with little redundancy.")
    # Activity
    if recent > 0:
        lines.append(f"**{recent} draft{'s' if recent != 1 else ''}** submitted in the last 6 months, "
                     f"suggesting {'active' if recent >= 3 else 'moderate'} development.")
    return {
        "text": " ".join(lines),
        "count": n,
        "avg_score": avg(scores),
        "dimensions": dim_avgs,
        "top_drafts": top_3,
        "top_authors": top_authors,
        "top_orgs": top_orgs,
        "strongest": strongest,
        "weakest": weakest,
    }
def get_drafts_page(
    db: Database,
    page: int = 1,
    per_page: int = 50,
    search: str = "",
    category: str = "",
    min_score: float = 0.0,
    sort: str = "score",
    sort_dir: str = "desc",
    source: str = "",
) -> DraftsPage:
    """Return a paginated, filtered list of drafts with ratings.

    Args:
        db: Open database handle.
        page: 1-based page number; clamped into the valid range.
        per_page: Rows per page.
        search: Space-separated terms; every term must appear in the draft
            name, title, rating summary or author names (AND semantics).
        category: Keep only drafts whose rating carries this category.
        min_score: Keep only drafts with composite score >= this value.
        sort: One of score/name/date/novelty/maturity/relevance/overlap/
            momentum/readiness; unknown values fall back to "score".
        sort_dir: "desc" for descending (default); anything else ascends.
        source: Keep only drafts from this source (e.g. "ietf").

    Returns dict with keys: drafts, total, page, per_page, pages.
    """
    pairs = db.drafts_with_ratings(limit=1000)
    # Build author lookup for search (draft_name -> "author1 author2 ...").
    # Only queried when a search term was given, to skip the cost otherwise.
    author_text_by_draft: dict[str, str] = {}
    if search:
        rows = db.conn.execute(
            """SELECT da.draft_name, GROUP_CONCAT(a.name, ' ') as names
            FROM draft_authors da JOIN authors a ON da.person_id = a.person_id
            GROUP BY da.draft_name"""
        ).fetchall()
        for r in rows:
            author_text_by_draft[r[0]] = r[1] or ""
    # Filter
    filtered = []
    for draft, rating in pairs:
        if min_score > 0 and rating.composite_score < min_score:
            continue
        if category and category not in rating.categories:
            continue
        if source and draft.source != source:
            continue
        if search:
            author_names = author_text_by_draft.get(draft.name, "")
            # AND semantics: every whitespace-separated term must match.
            haystack = f"{draft.name} {draft.title} {rating.summary} {author_names}".lower()
            if not all(w in haystack for w in search.lower().split()):
                continue
        filtered.append((draft, rating))
    # Sort
    sort_keys = {
        "score": lambda p: p[1].composite_score,
        "name": lambda p: p[0].name,
        "date": lambda p: p[0].time or "",
        "novelty": lambda p: p[1].novelty,
        "maturity": lambda p: p[1].maturity,
        "relevance": lambda p: p[1].relevance,
        "overlap": lambda p: p[1].overlap,
        "momentum": lambda p: p[1].momentum,
        # Cheap in-memory approximation of the readiness score so the whole
        # result set can be sorted without per-draft DB work.
        # NOTE(review): weights look like a subset of compute_readiness --
        # confirm they stay in sync with that implementation.
        "readiness": lambda p: (1.0 if p[0].name.startswith("draft-ietf-") else 0.0) * 0.25 +
        min(int(p[0].rev or "0") / 5.0, 1.0) * 0.15 +
        ((p[1].momentum - 1) / 4.0) * 0.15,
    }
    key_fn = sort_keys.get(sort, sort_keys["score"])
    reverse = sort_dir == "desc"
    filtered.sort(key=key_fn, reverse=reverse)
    total = len(filtered)
    # Pagination: always at least one page; clamp the requested page.
    pages = max(1, (total + per_page - 1) // per_page)
    page = max(1, min(page, pages))
    start = (page - 1) * per_page
    page_items = filtered[start : start + per_page]
    # Pre-compute readiness in batch (~6 queries total instead of ~200)
    readiness_cache = compute_readiness_batch(db, [d.name for d, _ in page_items])
    drafts = []
    for draft, rating in page_items:
        r_score = readiness_cache.get(draft.name, {}).get("score", 0)
        drafts.append({
            "name": draft.name,
            "title": draft.title,
            "date": draft.date,
            # External drafts link to their own source; IETF ones to datatracker.
            "url": draft.source_url if draft.source != "ietf" else draft.datatracker_url,
            "pages": draft.pages or 0,
            "group": draft.group or "individual",
            "source": draft.source or "ietf",
            "score": round(rating.composite_score, 2),
            "novelty": rating.novelty,
            "maturity": rating.maturity,
            "overlap": rating.overlap,
            "momentum": rating.momentum,
            "relevance": rating.relevance,
            "categories": rating.categories,
            "summary": rating.summary,
            "readiness": r_score,
        })
    return {
        "drafts": drafts,
        "total": total,
        "page": page,
        "per_page": per_page,
        "pages": pages,
    }
def get_draft_detail(db: Database, name: str) -> dict | None:
    """Return the full detail payload for one draft, or None if unknown.

    Combines draft metadata, authors, extracted ideas, references, the
    rating (when present), the readiness score and any reviewer annotation
    into a single template-ready dict.
    """
    draft = db.get_draft(name)
    if not draft:
        return None
    author_entries = [
        {"name": a.name, "affiliation": a.affiliation, "person_id": a.person_id}
        for a in db.get_authors_for_draft(name)
    ]
    ref_entries = [{"type": t, "id": rid} for t, rid in db.get_refs_for_draft(name)]
    detail = {
        "name": draft.name,
        "title": draft.title,
        "rev": draft.rev,
        "abstract": draft.abstract,
        "date": draft.date,
        "time": draft.time,
        "url": draft.datatracker_url,
        "text_url": draft.text_url,
        "pages": draft.pages,
        "words": draft.words,
        "group": draft.group or "individual",
        "categories": draft.categories,
        "tags": draft.tags,
        "authors": author_entries,
        "ideas": db.get_ideas_for_draft(name),
        "refs": ref_entries,
    }
    rating = db.get_rating(name)
    if rating:
        detail["rating"] = {
            "score": round(rating.composite_score, 2),
            "novelty": rating.novelty,
            "maturity": rating.maturity,
            "overlap": rating.overlap,
            "momentum": rating.momentum,
            "relevance": rating.relevance,
            "summary": rating.summary,
            "novelty_note": rating.novelty_note,
            "maturity_note": rating.maturity_note,
            "overlap_note": rating.overlap_note,
            "momentum_note": rating.momentum_note,
            "relevance_note": rating.relevance_note,
            "categories": rating.categories,
        }
    # Readiness score and reviewer annotation are attached last.
    detail["readiness"] = compute_readiness(db, name)
    detail["annotation"] = db.get_annotation(name)
    return detail
def get_generated_drafts() -> list[dict]:
    """Return list of pre-generated draft files in data/reports/generated-drafts/.

    Each entry carries: filename, stem, title (first content line after the
    Internet-Draft boilerplate header, falling back to the file stem),
    size in bytes, and the absolute path.
    """
    drafts_dir = _project_root / "data" / "reports" / "generated-drafts"
    if not drafts_dir.exists():
        return []
    # Boilerplate header lines that should never be taken as the title.
    header_prefixes = ("Internet-Draft", "Intended status", "Expires:")
    results = []
    for f in sorted(drafts_dir.glob("draft-*.txt")):
        # Extract title from the first non-empty, non-header content line.
        title = f.stem  # fallback when no such line exists
        text = f.read_text(errors="replace")
        for line in text.splitlines():
            stripped = line.strip()
            # startswith accepts a tuple, replacing the chained checks; the
            # old `stripped != ""` test was redundant with truthiness.
            if stripped and not stripped.startswith(header_prefixes):
                title = stripped
                break
        results.append({
            "filename": f.name,
            "stem": f.stem,
            "title": title,
            "size": f.stat().st_size,
            "path": str(f),
        })
    return results
def read_generated_draft(filename: str) -> str | None:
    """Read a generated draft file by filename. Returns text or None.

    Returns None when the file does not exist, is not a regular file, or
    when *filename* resolves outside the generated-drafts directory
    (path traversal via "../" or absolute paths).
    """
    drafts_dir = (_project_root / "data" / "reports" / "generated-drafts").resolve()
    path = (drafts_dir / filename).resolve()
    # Containment check: Path.is_relative_to is robust where the previous
    # string startswith() prefix test was not -- a sibling directory such
    # as ".../generated-drafts-evil" starts with the same prefix and would
    # have slipped through.
    if not path.is_relative_to(drafts_dir):
        return None
    # is_file() implies existence, so a separate exists() check is not needed.
    if not path.is_file():
        return None
    return path.read_text(errors="replace")

20
src/webui/data/gaps.py Normal file
View File

@@ -0,0 +1,20 @@
"""Gap analysis data access functions."""
from __future__ import annotations
from ietf_analyzer.db import Database
def get_all_gaps(db: Database) -> list[dict]:
    """Return every gap analysis result, most severe first.

    Unknown severities sort last; a missing severity counts as "low".
    """
    rank = {"critical": 0, "high": 1, "medium": 2, "low": 3}
    return sorted(db.all_gaps(), key=lambda gap: rank.get(gap.get("severity", "low"), 99))
def get_gap_detail(db: Database, gap_id: int) -> dict | None:
    """Return the gap whose id equals *gap_id*, or None when absent."""
    return next((gap for gap in db.all_gaps() if gap["id"] == gap_id), None)

View File

@@ -0,0 +1,26 @@
"""Proposal data access functions."""
from __future__ import annotations
from ietf_analyzer.db import Database
def get_all_proposals(db: Database) -> list[dict]:
    """Return every proposal, each annotated with its linked gap records.

    Each proposal gets a "gaps" key holding the full gap dicts referenced
    by its "gap_ids"; unknown ids are silently skipped.
    """
    gap_by_id = {gap["id"]: gap for gap in db.all_gaps()}
    proposals = db.all_proposals()
    for proposal in proposals:
        linked = proposal.get("gap_ids", [])
        proposal["gaps"] = [gap_by_id[gid] for gid in linked if gid in gap_by_id]
    return proposals
def get_proposal_detail(db: Database, proposal_id: int) -> dict | None:
    """Return one proposal with its gap records attached, or None if absent."""
    proposal = db.get_proposal(proposal_id)
    if not proposal:
        return None
    gap_by_id = {gap["id"]: gap for gap in db.all_gaps()}
    proposal["gaps"] = [
        gap_by_id[gid] for gid in proposal.get("gap_ids", []) if gid in gap_by_id
    ]
    return proposal
def get_proposals_for_gap(db: Database, gap_id: int) -> list[dict]:
    """Return the proposals that reference the gap *gap_id*."""
    linked = db.get_proposals_for_gap(gap_id)
    return linked

155
src/webui/data/ratings.py Normal file
View File

@@ -0,0 +1,155 @@
"""Rating-related data access functions."""
from __future__ import annotations
import json
import re
from collections import Counter, defaultdict
from ietf_analyzer.db import Database
def get_rating_distributions(db: Database) -> dict:
    """Return parallel arrays for each rating dimension, suitable for Plotly."""
    keys = ("novelty", "maturity", "overlap", "momentum", "relevance",
            "scores", "categories", "names", "sources")
    series: dict = {k: [] for k in keys}
    for draft, rating in db.drafts_with_ratings(limit=1000):
        series["novelty"].append(rating.novelty)
        series["maturity"].append(rating.maturity)
        series["overlap"].append(rating.overlap)
        series["momentum"].append(rating.momentum)
        series["relevance"].append(rating.relevance)
        series["scores"].append(round(rating.composite_score, 2))
        # The first category acts as the primary label; "Other" if none.
        series["categories"].append(rating.categories[0] if rating.categories else "Other")
        series["names"].append(draft.name)
        series["sources"].append(getattr(draft, "source", "ietf") or "ietf")
    return series
def get_category_radar_data(db: Database) -> dict:
    """Return average rating profiles per category for a radar chart.

    Only the eight most-populated categories are included; each profile
    holds the member count and the mean of every dimension (2 dp).
    """
    by_category: dict[str, list] = defaultdict(list)
    for _, rating in db.drafts_with_ratings(limit=1000):
        for cat in rating.categories:
            by_category[cat].append(rating)
    biggest = sorted(by_category, key=lambda c: len(by_category[c]), reverse=True)[:8]
    profiles = {}
    for cat in biggest:
        rs = by_category[cat]
        n = len(rs)

        def mean(attr: str) -> float:
            return round(sum(getattr(r, attr) for r in rs) / n, 2)

        profiles[cat] = {
            "count": n,
            "novelty": mean("novelty"),
            "maturity": mean("maturity"),
            "relevance": mean("relevance"),
            "momentum": mean("momentum"),
            # Invert overlap so "low overlap" reads as a strength on the radar.
            "low_overlap": round(sum(6 - r.overlap for r in rs) / n, 2),
        }
    return profiles
def get_score_histogram(db: Database) -> list[float]:
    """Return every composite score (rounded to 2 dp) for a histogram plot."""
    return [
        round(rating.composite_score, 2)
        for _, rating in db.drafts_with_ratings(limit=1000)
    ]
def get_false_positive_profile(db: Database) -> dict:
    """Profile drafts flagged as false positives (FPs).

    Compares FP dimension scores and category mix against the non-FP rated
    population, and extracts the most common terms from FP titles and
    abstracts.

    Returns a dict with: count, total_rated, total_drafts, pct_of_total,
    pct_of_rated, fp_list, fp_categories, fp_sources, fp_dims, nonfp_dims,
    top_terms, nonfp_categories.
    """
    # Get false positives
    fp_rows = db.false_positive_drafts_raw()
    # Get non-FP rated drafts for comparison
    nonfp_rows = db.non_false_positive_ratings_raw()
    total_rated = db.rated_count()
    total_drafts = db.count_drafts(include_false_positives=True)
    # Build FP list plus per-category/source tallies and dimension arrays.
    fp_list = []
    fp_categories: Counter = Counter()
    fp_sources: Counter = Counter()
    fp_dims = {"novelty": [], "maturity": [], "overlap": [], "momentum": [], "relevance": []}
    for row in fp_rows:
        cats = json.loads(row["r_categories"]) if row["r_categories"] else []
        src = row["source"] or "ietf"
        fp_list.append({
            "name": row["name"],
            "title": row["title"],
            "source": src,
            "categories": cats,
            "relevance": row["relevance"],
            "novelty": row["novelty"],
            "maturity": row["maturity"],
            "overlap": row["overlap"],
            "momentum": row["momentum"],
            "summary": row["summary"] or "",
        })
        for cat in cats:
            fp_categories[cat] += 1
        fp_sources[src] += 1
        fp_dims["novelty"].append(row["novelty"])
        fp_dims["maturity"].append(row["maturity"])
        fp_dims["overlap"].append(row["overlap"])
        fp_dims["momentum"].append(row["momentum"])
        fp_dims["relevance"].append(row["relevance"])
    # Non-FP dimensions for comparison
    nonfp_dims = {"novelty": [], "maturity": [], "overlap": [], "momentum": [], "relevance": []}
    nonfp_categories: Counter = Counter()
    for row in nonfp_rows:
        nonfp_dims["novelty"].append(row["novelty"])
        nonfp_dims["maturity"].append(row["maturity"])
        nonfp_dims["overlap"].append(row["overlap"])
        nonfp_dims["momentum"].append(row["momentum"])
        nonfp_dims["relevance"].append(row["relevance"])
        cats = json.loads(row["r_categories"]) if row["r_categories"] else []
        for cat in cats:
            nonfp_categories[cat] += 1
    # Top terms from FP abstracts and titles.
    # Fix: this block used `re.findall` without `re` ever being imported in
    # this module (NameError at runtime); `re` is now a module-level import.
    # The unused local `from collections import Counter as _Counter` was
    # removed -- the code below uses the module-level Counter.
    stop_words = {
        "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
        "of", "with", "by", "from", "is", "it", "that", "this", "are", "was",
        "be", "as", "can", "may", "will", "not", "has", "have", "been", "which",
        "their", "its", "also", "such", "these", "would", "should", "could",
        "more", "other", "than", "into", "about", "between", "over", "after",
        "all", "one", "two", "new", "they", "we", "our", "each", "some", "any",
        "there", "what", "when", "how", "where", "who", "does", "do", "did",
        "no", "if", "so", "up", "out", "only", "used", "using", "use", "based",
        "through", "both", "well", "within", "must", "while", "had", "were",
    }
    word_counter: Counter = Counter()
    for row in fp_rows:
        abstract = (row["abstract"] or "").lower()
        title = (row["title"] or "").lower()
        text = abstract + " " + title
        words = re.findall(r'[a-z]{3,}', text)
        for w in words:
            if w not in stop_words:
                word_counter[w] += 1
    top_terms = word_counter.most_common(30)
    return {
        "count": len(fp_list),
        "total_rated": total_rated,
        "total_drafts": total_drafts,
        "pct_of_total": round(100 * len(fp_list) / total_drafts, 1) if total_drafts else 0,
        "pct_of_rated": round(100 * len(fp_list) / total_rated, 1) if total_rated else 0,
        "fp_list": fp_list,
        "fp_categories": dict(fp_categories.most_common()),
        "fp_sources": dict(fp_sources.most_common()),
        "fp_dims": fp_dims,
        "nonfp_dims": nonfp_dims,
        "top_terms": top_terms,
        "nonfp_categories": dict(nonfp_categories.most_common(20)),
    }

107
src/webui/data/search.py Normal file
View File

@@ -0,0 +1,107 @@
"""Search and Q&A data access functions."""
from __future__ import annotations
import re
from typing import TypedDict
from ietf_analyzer.config import Config
from ietf_analyzer.db import Database
from ietf_analyzer.search import HybridSearch
class SearchResults(TypedDict):
    """Global search results from :func:`global_search`."""
    drafts: list[dict]   # FTS5 matches (or LIKE fallback) over draft metadata
    ideas: list[dict]    # LIKE matches over extracted ideas
    authors: list[dict]  # matches from db.search_authors
    gaps: list[dict]     # matches from db.search_gaps
def global_search(db: Database, query: str) -> SearchResults:
    """Search across drafts (FTS5), ideas, authors, and gaps.

    Returns {drafts: [...], ideas: [...], authors: [...], gaps: [...]}.
    Draft search uses the FTS5 index with a sanitized query, falling back
    to a LIKE scan when FTS5 rejects the syntax. An empty/blank query
    returns empty result lists.
    """
    results: dict = {"drafts": [], "ideas": [], "authors": [], "gaps": []}
    if not query or not query.strip():
        return results
    q = query.strip()

    def _draft_payload(row) -> dict:
        # Shared row -> payload mapping for both the FTS and LIKE paths
        # (previously duplicated in each branch).
        return {
            "name": row["name"],
            "title": row["title"],
            "abstract": (row["abstract"] or "")[:200],
            "date": row["time"],
            "group": row["group"] or "individual",
        }

    # 1. Drafts via FTS5. Strip punctuation and FTS operators, which would
    # otherwise be parsed as MATCH syntax and can raise.
    try:
        fts_query = re.sub(r'[^\w\s]', '', q)
        fts_query = re.sub(r'\b(NEAR|OR|AND|NOT)\b', '', fts_query, flags=re.IGNORECASE)
        fts_query = re.sub(r'\s+', ' ', fts_query).strip()
        if not fts_query:
            raise ValueError("empty query after sanitization")
        rows = db.conn.execute(
            """SELECT d.name, d.title, d.abstract, d.time, d."group"
            FROM drafts d
            JOIN drafts_fts f ON d.rowid = f.rowid
            WHERE drafts_fts MATCH ?
            ORDER BY rank
            LIMIT 50""",
            (fts_query,),
        ).fetchall()
        results["drafts"] = [_draft_payload(r) for r in rows]
    except Exception:
        # FTS5 match can fail on certain query syntax; fall back to LIKE
        like = f"%{q}%"
        rows = db.conn.execute(
            """SELECT name, title, abstract, time, "group" FROM drafts
            WHERE title LIKE ? OR name LIKE ? OR abstract LIKE ?
            LIMIT 50""",
            (like, like, like),
        ).fetchall()
        results["drafts"] = [_draft_payload(r) for r in rows]
    # 2. Ideas via LIKE
    like = f"%{q}%"
    rows = db.conn.execute(
        """SELECT id, title, description, idea_type, draft_name FROM ideas
        WHERE title LIKE ? OR description LIKE ?
        ORDER BY id LIMIT 50""",
        (like, like),
    ).fetchall()
    results["ideas"] = [
        {
            "id": r["id"],
            "title": r["title"],
            "description": (r["description"] or "")[:200],
            "type": r["idea_type"],
            "draft_name": r["draft_name"],
        }
        for r in rows
    ]
    # 3. Authors via LIKE
    results["authors"] = db.search_authors(q, limit=50)
    # 4. Gaps via LIKE
    results["gaps"] = db.search_gaps(q, limit=50)
    return results
def get_ask_search(db: Database, question: str, top_k: int = 5) -> dict:
    """Search-only (free): return sources plus any cached answer."""
    cfg = Config.load()
    return HybridSearch(cfg, db).search_only(question, top_k=top_k)
def get_ask_synthesize(db: Database, question: str, top_k: int = 5, cheap: bool = True) -> dict:
    """Run Claude synthesis (costs tokens; the result is cached permanently)."""
    cfg = Config.load()
    engine = HybridSearch(cfg, db)
    return engine.ask(question, top_k=top_k, cheap=cheap)

View File

@@ -0,0 +1,23 @@
{% extends "base.html" %}
{# 404 error page in the app's dark theme: large status code, short
   explanation, and links back to the overview and the draft search.
   Jinja comments are stripped at render time, so the emitted markup
   is unchanged. #}
{% block title %}404 — Not Found{% endblock %}
{% block content %}
<div class="flex items-center justify-center min-h-[60vh]">
  <div class="text-center max-w-lg">
    <h1 class="text-8xl font-bold text-gray-600 mb-4">404</h1>
    <h2 class="text-2xl font-semibold text-gray-300 mb-4">Page Not Found</h2>
    <p class="text-gray-400 mb-8">
      The page you're looking for doesn't exist or has been moved.
    </p>
    <div class="flex gap-4 justify-center">
      <a href="/" class="px-6 py-2 bg-blue-600 hover:bg-blue-700 text-white rounded-lg transition">
        Back to Overview
      </a>
      <a href="/search" class="px-6 py-2 bg-gray-700 hover:bg-gray-600 text-gray-200 rounded-lg transition">
        Search Drafts
      </a>
    </div>
  </div>
</div>
{% endblock %}

View File

@@ -0,0 +1,20 @@
{% extends "base.html" %}
{# 500 error page in the app's dark theme: large status code, generic
   apology (no internal details leaked), and a link back to the overview.
   Jinja comments are stripped at render time, so the emitted markup
   is unchanged. #}
{% block title %}500 — Server Error{% endblock %}
{% block content %}
<div class="flex items-center justify-center min-h-[60vh]">
  <div class="text-center max-w-lg">
    <h1 class="text-8xl font-bold text-gray-600 mb-4">500</h1>
    <h2 class="text-2xl font-semibold text-gray-300 mb-4">Internal Server Error</h2>
    <p class="text-gray-400 mb-8">
      Something went wrong on our end. Please try again later.
    </p>
    <div class="flex gap-4 justify-center">
      <a href="/" class="px-6 py-2 bg-blue-600 hover:bg-blue-700 text-white rounded-lg transition">
        Back to Overview
      </a>
    </div>
  </div>
</div>
{% endblock %}