Files
ietf-draft-analyzer/src/webui/app.py
Christian Nennemann dec8667193 Add 6 new analysis pages and 5 CLI reports
New web UI pages with Plotly charts:
- /sources: cross-source comparison (ratings, categories by standards body)
- /false-positives: profiling of 73 false positives (box plots, terms)
- /trends: temporal evolution (submissions, ratings, safety ratio over time)
- /complexity: draft complexity matrix (correlations, scatter plots)
- /idea-analysis: idea novelty deep dive (sunburst, distribution, shared ideas)
- /citations: enhanced with influence analysis and BCP dependency tabs

New CLI reports (ietf report <name>):
- sources, false-positives, citations, complexity, idea-analysis

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 20:35:32 +01:00

796 lines
23 KiB
Python

"""IETF Draft Analyzer — Web Dashboard.
Run with: python src/webui/app.py
"""
from __future__ import annotations
import sys
from pathlib import Path
# Ensure project src is on path
_project_root = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(_project_root / "src"))
import csv
import io
import json
import time
import functools
from collections import defaultdict
from flask import Flask, render_template, request, jsonify, abort, g, Response
from webui.auth import admin_required, init_auth
from webui.analytics import init_analytics, get_analytics_data
from webui.obsidian_export import build_obsidian_vault
from webui.data import (
get_db,
get_overview_stats,
get_category_counts,
get_drafts_page,
get_draft_detail,
get_rating_distributions,
get_timeline_data,
get_ideas_by_type,
get_all_gaps,
get_gap_detail,
get_generated_drafts,
read_generated_draft,
get_top_authors,
get_org_data,
get_category_radar_data,
get_score_histogram,
get_coauthor_network,
get_cross_org_data,
get_landscape_tsne,
get_similarity_graph,
get_timeline_animation_data,
get_idea_clusters,
get_monitor_status,
get_author_network_full,
get_citation_graph,
get_comparison_data,
get_ask_search,
get_ask_synthesize,
get_category_summary,
global_search,
get_architecture,
get_source_comparison,
get_false_positive_profile,
get_citation_influence,
get_bcp_analysis,
get_trends_data,
get_complexity_data,
get_idea_analysis,
)
# Flask application object. Templates and static assets are resolved relative
# to the directory containing this file.
app = Flask(
__name__,
template_folder=str(Path(__file__).parent / "templates"),
static_folder=str(Path(__file__).parent / "static"),
static_url_path="/static",
)
import os
# Secret key: read from FLASK_SECRET_KEY when set; otherwise a fresh random
# value is generated at import time, so it differs on every process start.
app.config["SECRET_KEY"] = os.environ.get("FLASK_SECRET_KEY", os.urandom(24).hex())
# Auth is initialized here at import time; the __main__ block and create_app()
# call init_auth() again to select a different mode.
# Default: production mode (admin disabled)
init_auth(app, dev=False)
# Analytics (GDPR-compliant, no cookies)
# The analytics database path is derived from the project root computed above.
_analytics_db = str(_project_root / "data" / "analytics.db")
init_analytics(app, db_path=_analytics_db)
# --- Rate limiting for Claude-calling endpoints ---
# Maps a client key (remote address) to the timestamps of its recent requests.
# The store is a plain in-process dict shared by all decorated views.
_rate_limit_store: dict[str, list[float]] = defaultdict(list)
_RATE_LIMIT_MAX = 10 # max requests
_RATE_LIMIT_WINDOW = 60 # per 60 seconds
def rate_limit(f):
    """Wrap a view with an in-memory, per-client sliding-window request cap.

    Clients are keyed on ``request.remote_addr`` (``"unknown"`` when absent).
    On every call, timestamps older than ``_RATE_LIMIT_WINDOW`` seconds are
    dropped; if ``_RATE_LIMIT_MAX`` recent calls remain, the wrapped view is
    not invoked and a JSON error with HTTP status 429 is returned instead.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        client = request.remote_addr or "unknown"
        now = time.time()
        # Keep only the calls that still fall inside the window.
        recent = [
            stamp for stamp in _rate_limit_store[client]
            if now - stamp < _RATE_LIMIT_WINDOW
        ]
        _rate_limit_store[client] = recent
        if len(recent) >= _RATE_LIMIT_MAX:
            return jsonify({"error": "Rate limit exceeded. Try again later."}), 429
        # `recent` is the stored list, so this records the current call.
        recent.append(now)
        return f(*args, **kwargs)
    return wrapper
# --- Database lifecycle (per-request to avoid SQLite threading issues) ---
def db():
    """Return the database handle stored on ``g``, creating it on first use."""
    if "db" in g:
        return g.db
    g.db = get_db()
    return g.db
@app.teardown_appcontext
def close_db(exception=None):
    """Close the database handle stored on ``g``, if one was opened."""
    handle = g.pop("db", None)
    if handle is None:
        return
    handle.close()
# --- Routes ---
@app.route("/")
def overview():
    """Render ``overview.html`` with stats, categories, timeline, scores and radar data."""
    conn = db()
    context = {
        "stats": get_overview_stats(conn),
        "categories": get_category_counts(conn),
        "timeline": get_timeline_data(conn),
        "scores": get_score_histogram(conn),
        "radar": get_category_radar_data(conn),
    }
    return render_template("overview.html", **context)
@app.route("/drafts")
def drafts():
    """Render ``drafts.html`` for the page/filter/sort options in the query string.

    Query parameters read: ``page``, ``q``, ``cat``, ``source``, ``min_score``,
    ``sort`` and ``dir``. A category summary is fetched only when ``cat`` is
    non-empty.
    """
    params = request.args
    query = {
        "page": params.get("page", 1, type=int),
        "search": params.get("q", ""),
        "category": params.get("cat", ""),
        "min_score": params.get("min_score", 0.0, type=float),
        "sort": params.get("sort", "score"),
        "sort_dir": params.get("dir", "desc"),
        "source": params.get("source", ""),
    }
    result = get_drafts_page(db(), **query)
    categories = get_category_counts(db())
    if query["category"]:
        cat_summary = get_category_summary(db(), query["category"])
    else:
        cat_summary = None
    return render_template(
        "drafts.html",
        result=result,
        categories=categories,
        cat_summary=cat_summary,
        search=query["search"],
        current_cat=query["category"],
        current_source=query["source"],
        min_score=query["min_score"],
        sort=query["sort"],
        sort_dir=query["sort_dir"],
    )
@app.route("/drafts/<string:name>")
def draft_detail(name: str):
    """Render ``draft_detail.html`` for one draft, or abort with 404 if it is unknown.

    Also passes ``known_drafts``: the names of referenced drafts (refs whose
    ``type`` is ``"draft"``) that exist in the ``drafts`` table, so the
    template can link to them internally.
    """
    conn = db()
    detail = get_draft_detail(conn, name)
    if not detail:
        abort(404)
    referenced = [ref["id"] for ref in detail.get("refs", []) if ref["type"] == "draft"]
    known_drafts = set()
    if referenced:
        # One "?" placeholder per referenced name; values are bound, not interpolated.
        marks = ",".join("?" for _ in referenced)
        found = conn.conn.execute(
            f"SELECT name FROM drafts WHERE name IN ({marks})", referenced
        ).fetchall()
        known_drafts = {row["name"] for row in found}
    return render_template("draft_detail.html", draft=detail, known_drafts=known_drafts)
@app.route("/ideas")
def ideas():
    """Render ``ideas.html`` with the result of ``get_ideas_by_type``."""
    return render_template("ideas.html", data=get_ideas_by_type(db()))
@app.route("/gaps")
@admin_required
def gaps():
    """Render ``gaps.html`` with all gaps and the list of generated drafts."""
    return render_template(
        "gaps.html",
        gaps=get_all_gaps(db()),
        generated_drafts=get_generated_drafts(),
    )
@app.route("/gaps/demo")
@admin_required
def gaps_demo():
    """Show a pre-generated example draft so users can see output without API calls.

    The draft is chosen with the ``file`` query parameter; when it is absent,
    the first generated draft (if any) is shown. A requested file is only read
    when its name matches an entry returned by ``get_generated_drafts()``, so
    the query parameter can never be used to read an arbitrary path.
    """
    generated = get_generated_drafts()
    selected = request.args.get("file", "")
    draft_text = None
    draft_info = None
    if selected:
        # Resolve the user-supplied name against the known list first.
        draft_info = next(
            (gd for gd in generated if gd["filename"] == selected), None
        )
        if draft_info is not None:
            draft_text = read_generated_draft(draft_info["filename"])
    elif generated:
        draft_info = generated[0]
        draft_text = read_generated_draft(draft_info["filename"])
    return render_template(
        "gap_demo.html",
        generated_drafts=generated,
        draft_text=draft_text,
        draft_info=draft_info,
        selected=selected,
    )
@app.route("/gaps/<int:gap_id>")
@admin_required
def gap_detail(gap_id: int):
    """Render ``gap_detail.html`` for one gap, or abort with 404 if it is not found."""
    gap = get_gap_detail(db(), gap_id)
    if gap:
        return render_template(
            "gap_detail.html",
            gap=gap,
            generated_drafts=get_generated_drafts(),
        )
    abort(404)
@app.route("/gaps/<int:gap_id>/generate", methods=["POST"])
@admin_required
def gap_generate(gap_id: int):
    """Trigger draft generation for a gap. Returns JSON with the generated text.

    Responses:
        200: ``{"success": True, "text", "filename", "path"}``.
        404: ``{"error": "Gap not found"}`` when the gap id is unknown.
        500: ``{"error": <message>}`` when generation raises.
    """
    import re

    gap = get_gap_detail(db(), gap_id)
    if not gap:
        return jsonify({"error": "Gap not found"}), 404
    try:
        from ietf_analyzer.config import Config
        from ietf_analyzer.analyzer import Analyzer
        from ietf_analyzer.draftgen import DraftGenerator

        cfg = Config.load()
        database = db()
        analyzer = Analyzer(cfg, database)
        generator = DraftGenerator(cfg, database, analyzer)
        # Build a filesystem-safe slug: anything other than [a-z0-9] (including
        # path separators and dots) collapses to a single "-", so the topic text
        # cannot alter the output directory or produce an invalid filename.
        slug = re.sub(r"[^a-z0-9]+", "-", gap["topic"].lower()).strip("-")[:40].strip("-")
        stem = f"draft-gap-{gap_id}-{slug}" if slug else f"draft-gap-{gap_id}"
        output_path = str(
            _project_root / "data" / "reports" / "generated-drafts" / f"{stem}.txt"
        )
        path = generator.generate(gap["topic"], output_path=output_path)
        draft_text = Path(path).read_text(errors="replace")
        return jsonify({
            "success": True,
            "text": draft_text,
            "filename": Path(path).name,
            "path": path,
        })
    except Exception as e:
        # Top-level boundary for the request: record the traceback, then report.
        app.logger.exception("Draft generation failed for gap %s", gap_id)
        return jsonify({"error": str(e)}), 500
@app.route("/ratings")
def ratings():
    """Render ``ratings.html`` with rating distributions and category radar data."""
    context = {
        "dist": get_rating_distributions(db()),
        "radar": get_category_radar_data(db()),
    }
    return render_template("ratings.html", **context)
@app.route("/landscape")
def landscape():
    """Render ``landscape.html`` with rating distributions and t-SNE landscape data."""
    context = {
        "dist": get_rating_distributions(db()),
        "tsne_data": get_landscape_tsne(db()),
    }
    return render_template("landscape.html", **context)
@app.route("/timeline")
def timeline_animation():
    """Render ``timeline.html`` with the timeline animation data."""
    return render_template("timeline.html", animation=get_timeline_animation_data(db()))
@app.route("/idea-clusters")
def idea_clusters():
    """Render ``idea_clusters.html`` with the idea cluster data."""
    return render_template("idea_clusters.html", clusters=get_idea_clusters(db()))
@app.route("/architecture")
def architecture():
    """Render ``architecture.html`` with the result of ``get_architecture``."""
    return render_template("architecture.html", arch=get_architecture(db()))
@app.route("/api/architecture")
def api_architecture():
    """Return the result of ``get_architecture`` as JSON."""
    payload = get_architecture(db())
    return jsonify(payload)
@app.route("/similarity")
def similarity():
    """Render ``similarity.html`` with the similarity graph."""
    return render_template("similarity.html", network=get_similarity_graph(db()))
@app.route("/authors")
def authors():
    """Render ``authors.html`` with top authors, organisations and network data.

    The organisation data is passed to the template under both ``orgs`` and
    ``orgs_data``.
    """
    conn = db()
    top_authors = get_top_authors(conn, limit=50)
    org_rows = get_org_data(conn, limit=20)
    author_graph = get_author_network_full(conn)
    cross_org_rows = get_cross_org_data(conn, limit=20)
    return render_template(
        "authors.html",
        authors=top_authors,
        orgs=org_rows,
        orgs_data=org_rows,
        network=author_graph,
        cross_org=cross_org_rows,
    )
@app.route("/citations")
def citations():
    """Render ``citations.html`` with the citation graph, influence and BCP data."""
    context = {
        "graph": get_citation_graph(db()),
        "influence": get_citation_influence(db()),
        "bcp": get_bcp_analysis(db()),
    }
    return render_template("citations.html", **context)
@app.route("/monitor")
@admin_required
def monitor_page():
    """Render ``monitor.html`` with the current monitor status."""
    return render_template("monitor.html", status=get_monitor_status(db()))
@app.route("/admin/analytics")
@admin_required
def analytics_dashboard():
    """Render ``analytics.html`` with data read from the analytics database path."""
    return render_template("analytics.html", data=get_analytics_data(_analytics_db))
@app.route("/about")
def about():
    """Render ``about.html`` with overview stats and two values from the loaded config."""
    from ietf_analyzer.config import Config

    cfg = Config.load()
    context = {
        "stats": get_overview_stats(db()),
        "search_keywords": cfg.search_keywords,
        "fetch_since": cfg.fetch_since,
    }
    return render_template("about.html", **context)
@app.route("/impressum")
def impressum():
    """Render the static ``impressum.html`` template."""
    page = "impressum.html"
    return render_template(page)
@app.route("/datenschutz")
def datenschutz():
    """Render the static ``datenschutz.html`` template."""
    page = "datenschutz.html"
    return render_template(page)
@app.route("/search")
def search():
    """Render ``search_results.html`` for the ``q`` query parameter.

    A blank query skips the search and yields empty result lists. ``total`` is
    the number of hits summed over all result categories.
    """
    q = request.args.get("q", "").strip()
    if q:
        results = global_search(db(), q)
    else:
        results = {"drafts": [], "ideas": [], "authors": [], "gaps": []}
    total = sum(map(len, results.values()))
    return render_template("search_results.html", query=q, results=results, total=total)
@app.route("/ask")
def ask_page():
    """Render ``ask.html``; when ``q`` is given, include search results for it.

    Only ``get_ask_search`` is called here (the free, search-only path that
    returns sources plus a cached answer if available). ``top`` sets how many
    sources to request and defaults to 5.
    """
    question = request.args.get("q", "")
    if not question:
        return render_template("ask.html", question=question, result=None)
    top_k = request.args.get("top", 5, type=int)
    result = get_ask_search(db(), question, top_k=top_k)
    return render_template("ask.html", question=question, result=result)
@app.route("/api/ask/synthesize", methods=["POST"])
@admin_required
@rate_limit
def api_ask_synthesize():
    """Synthesize an answer via Claude (costs tokens, cached permanently). Returns JSON.

    Expects a JSON object body with a non-empty string ``question`` and an
    optional positive integer ``top_k`` (default 5). Malformed bodies get a
    400 response instead of reaching the synthesis call.
    """
    data = request.get_json(force=True, silent=True)
    # get_json may return any JSON value (list, string, number); only an
    # object can carry the expected keys.
    if not isinstance(data, dict):
        return jsonify({"error": "Missing 'question' in request body"}), 400
    question = data.get("question")
    if not isinstance(question, str) or not question.strip():
        return jsonify({"error": "Missing 'question' in request body"}), 400
    top_k = data.get("top_k", 5)
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(top_k, bool) or not isinstance(top_k, int) or top_k < 1:
        return jsonify({"error": "'top_k' must be a positive integer"}), 400
    result = get_ask_synthesize(db(), question, top_k=top_k, cheap=True)
    return jsonify(result)
@app.route("/api/ask", methods=["POST"])
def api_ask():
    """Search only (free). Returns JSON with sources + cached answer if available.

    Expects a JSON object body with a non-empty string ``question`` and an
    optional positive integer ``top_k`` (default 5). Malformed bodies get a
    400 response instead of reaching the search call.
    """
    data = request.get_json(force=True, silent=True)
    # get_json may return any JSON value (list, string, number); only an
    # object can carry the expected keys.
    if not isinstance(data, dict):
        return jsonify({"error": "Missing 'question' in request body"}), 400
    question = data.get("question")
    if not isinstance(question, str) or not question.strip():
        return jsonify({"error": "Missing 'question' in request body"}), 400
    top_k = data.get("top_k", 5)
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(top_k, bool) or not isinstance(top_k, int) or top_k < 1:
        return jsonify({"error": "'top_k' must be a positive integer"}), 400
    result = get_ask_search(db(), question, top_k=top_k)
    return jsonify(result)
@app.route("/compare")
@admin_required
def compare_page():
    """Render ``comparison.html`` for the comma-separated ``drafts`` query parameter.

    Blank entries are ignored. Comparison data is loaded only when at least
    two names remain; otherwise ``data`` is ``None``.
    """
    raw = request.args.get("drafts", "")
    trimmed = (part.strip() for part in raw.split(","))
    names = [part for part in trimmed if part]
    data = get_comparison_data(db(), names) if len(names) >= 2 else None
    return render_template("comparison.html", names=names, data=data)
@app.route("/api/compare", methods=["POST"])
@admin_required
@rate_limit
def api_compare():
    """Run Claude comparison for drafts. Returns JSON with comparison text.

    Expects a JSON object body whose ``drafts`` value is a list of at least
    two draft-name strings.

    Responses:
        200: the comparison result.
        400: missing/invalid ``drafts``.
        500: ``{"error": <message>}`` when the comparison raises.
    """
    req_data = request.get_json(force=True, silent=True)
    # get_json may return any JSON value; only an object can hold "drafts".
    if not isinstance(req_data, dict) or "drafts" not in req_data:
        return jsonify({"error": "Missing 'drafts' in request body"}), 400
    names = req_data["drafts"]
    # A bare string would otherwise satisfy the length check character-wise.
    if not isinstance(names, list) or not all(isinstance(n, str) for n in names):
        return jsonify({"error": "'drafts' must be a list of draft names"}), 400
    if len(names) < 2:
        return jsonify({"error": "Need at least 2 drafts to compare"}), 400
    try:
        from ietf_analyzer.config import Config
        from ietf_analyzer.analyzer import Analyzer

        cfg = Config.load()
        database = db()
        analyzer = Analyzer(cfg, database)
        result = analyzer.compare_drafts(names)
        return jsonify(result)
    except Exception as e:
        # Top-level boundary for the request: record the traceback, then report.
        app.logger.exception("Draft comparison failed")
        return jsonify({"error": str(e)}), 500
# --- API endpoints for AJAX (used by client-side charts) ---
def _to_csv_response(rows: list[dict], filename: str = "export.csv") -> Response:
    """Convert a list of dicts to a CSV download response.

    The header is the union of all keys across ``rows`` in first-seen order,
    so rows with differing key sets (e.g. merged search results) export
    cleanly; cells a row does not provide are left empty. List and dict
    values are serialized as JSON strings.

    Args:
        rows: Records to export; an empty list yields an empty CSV body.
        filename: Name offered to the client in ``Content-Disposition``.

    Returns:
        A ``text/csv`` attachment response.
    """
    headers = {"Content-Disposition": f"attachment; filename={filename}"}
    if not rows:
        return Response("", mimetype="text/csv", headers=headers)
    # dict.fromkeys de-duplicates while preserving insertion order.
    fieldnames = list(dict.fromkeys(key for row in rows for key in row))
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, restval="")
    writer.writeheader()
    for row in rows:
        # Flatten any list/dict values to JSON strings
        writer.writerow({
            key: json.dumps(value) if isinstance(value, (list, dict)) else value
            for key, value in row.items()
        })
    return Response(buffer.getvalue(), mimetype="text/csv", headers=headers)
def _results_to_csv(results: dict) -> Response:
    """Flatten multi-category search results into one ``search_results.csv`` download.

    Each item becomes one row, prefixed with a ``_category`` column naming the
    result category it came from.
    """
    flattened = []
    for group_name, group_items in results.items():
        for entry in group_items:
            record = dict(_category=group_name)
            record.update(entry)
            flattened.append(record)
    return _to_csv_response(flattened, "search_results.csv")
@app.route("/api/drafts")
def api_drafts():
    """Return one page of drafts as JSON, or as ``drafts.csv`` when ``format=csv``.

    Accepts the same query parameters as the ``/drafts`` page: ``page``, ``q``,
    ``cat``, ``source``, ``min_score``, ``sort`` and ``dir``.
    """
    params = request.args
    query = {
        "page": params.get("page", 1, type=int),
        "search": params.get("q", ""),
        "category": params.get("cat", ""),
        "min_score": params.get("min_score", 0.0, type=float),
        "sort": params.get("sort", "score"),
        "sort_dir": params.get("dir", "desc"),
        "source": params.get("source", ""),
    }
    data = get_drafts_page(db(), **query)
    wants_csv = params.get("format") == "csv"
    if wants_csv:
        return _to_csv_response(data.get("drafts", []), "drafts.csv")
    return jsonify(data)
@app.route("/api/stats")
def api_stats():
    """Return the overview statistics as JSON."""
    payload = get_overview_stats(db())
    return jsonify(payload)
@app.route("/api/authors/network")
def api_author_network():
    """Return the full author network as JSON."""
    payload = get_author_network_full(db())
    return jsonify(payload)
@app.route("/api/citations")
def api_citations():
    """Return the citation graph as JSON; ``min_refs`` (default 2) is forwarded as-is."""
    threshold = request.args.get("min_refs", 2, type=int)
    graph = get_citation_graph(db(), min_refs=threshold)
    return jsonify(graph)
@app.route("/api/search")
def api_search():
    """Return global search results for ``q`` as JSON, or as CSV when ``format=csv``.

    A blank query skips the search and yields empty result lists.
    """
    q = request.args.get("q", "").strip()
    if q:
        results = global_search(db(), q)
    else:
        results = {"drafts": [], "ideas": [], "authors": [], "gaps": []}
    wants_csv = request.args.get("format") == "csv"
    return _results_to_csv(results) if wants_csv else jsonify(results)
@app.route("/api/ideas")
def api_ideas():
    """Return ideas-by-type data as JSON, or its ``ideas`` list as ``ideas.csv``."""
    data = get_ideas_by_type(db())
    wants_csv = request.args.get("format") == "csv"
    if not wants_csv:
        return jsonify(data)
    return _to_csv_response(data.get("ideas", []), "ideas.csv")
@app.route("/api/gaps")
@admin_required
def api_gaps():
    """Return all gaps as JSON, or as ``gaps.csv`` when ``format=csv``."""
    data = get_all_gaps(db())
    wants_csv = request.args.get("format") == "csv"
    if not wants_csv:
        return jsonify(data)
    return _to_csv_response(data, "gaps.csv")
@app.route("/api/gaps/<int:gap_id>")
@admin_required
def api_gap_detail(gap_id: int):
    """Return one gap as JSON, or a 404 JSON error when it is not found."""
    gap = get_gap_detail(db(), gap_id)
    if gap:
        return jsonify(gap)
    return jsonify({"error": "Gap not found"}), 404
@app.route("/api/ratings")
def api_ratings():
    """Return rating distributions as JSON, or as ``ratings.csv`` when ``format=csv``.

    The data is columnar (one list per field), so the CSV path transposes it
    into one row per entry of ``names``.
    """
    data = get_rating_distributions(db())
    if request.args.get("format") != "csv":
        return jsonify(data)
    # CSV column name -> key of the source column in `data`.
    columns = {
        "name": "names",
        "score": "scores",
        "novelty": "novelty",
        "maturity": "maturity",
        "overlap": "overlap",
        "momentum": "momentum",
        "relevance": "relevance",
        "category": "categories",
    }
    row_count = len(data.get("names", []))
    rows = [
        {out_key: data[src_key][idx] for out_key, src_key in columns.items()}
        for idx in range(row_count)
    ]
    return _to_csv_response(rows, "ratings.csv")
@app.route("/api/timeline")
def api_timeline():
    """Return the timeline data as JSON."""
    return jsonify(get_timeline_data(db()))
@app.route("/api/landscape")
def api_landscape():
    """Return the t-SNE landscape data as JSON, or as ``landscape.csv`` when ``format=csv``."""
    data = get_landscape_tsne(db())
    wants_csv = request.args.get("format") == "csv"
    if not wants_csv:
        return jsonify(data)
    return _to_csv_response(data, "landscape.csv")
@app.route("/api/similarity")
def api_similarity():
    """Return the similarity graph as JSON."""
    return jsonify(get_similarity_graph(db()))
@app.route("/api/idea-clusters")
def api_idea_clusters():
    """Return the idea cluster data as JSON."""
    return jsonify(get_idea_clusters(db()))
@app.route("/api/monitor")
@admin_required
def api_monitor():
    """Return the monitor status as JSON."""
    return jsonify(get_monitor_status(db()))
@app.route("/api/drafts/<string:name>")
def api_draft_detail(name: str):
    """Return one draft's detail as JSON, or a 404 JSON error when it is not found."""
    detail = get_draft_detail(db(), name)
    if detail:
        return jsonify(detail)
    return jsonify({"error": "Draft not found"}), 404
@app.route("/api/categories")
def api_categories():
    """Return category counts as JSON, or as ``categories.csv`` when ``format=csv``."""
    data = get_category_counts(db())
    if request.args.get("format") != "csv":
        return jsonify(data)
    rows = []
    for label, total in data.items():
        rows.append({"category": label, "count": total})
    return _to_csv_response(rows, "categories.csv")
@app.route("/api/drafts/<string:name>/annotate", methods=["POST"])
@admin_required
def api_annotate(name: str):
    """Add or update annotation for a draft.

    Expects a non-empty JSON object body. Recognized keys: ``note``, ``tags``,
    ``add_tag`` and ``remove_tag``. When ``add_tag``/``remove_tag`` is given,
    the stored tag list is loaded, adjusted, and used in place of ``tags``.

    Responses:
        200: ``{"success": True, "annotation": <stored annotation>}``.
        400: body is not a non-empty JSON object.
        404: the draft does not exist.
    """
    database = db()
    draft = database.get_draft(name)
    if not draft:
        return jsonify({"error": "Draft not found"}), 404
    data = request.get_json(force=True, silent=True)
    # A JSON list/string/number would otherwise crash on .get() below.
    if not data or not isinstance(data, dict):
        return jsonify({"error": "Invalid JSON body"}), 400
    note = data.get("note")
    tags = data.get("tags")
    add_tag = data.get("add_tag")
    remove_tag = data.get("remove_tag")
    # Handle add/remove tag operations
    if add_tag or remove_tag:
        existing = database.get_annotation(name)
        # Work on a copy so the object returned by the database layer is not
        # mutated; a null tag list is treated as empty.
        current_tags = list(existing["tags"] or []) if existing else []
        if add_tag and add_tag not in current_tags:
            current_tags.append(add_tag)
        if remove_tag and remove_tag in current_tags:
            current_tags.remove(remove_tag)
        tags = current_tags
    database.upsert_annotation(name, note=note, tags=tags)
    annotation = database.get_annotation(name)
    return jsonify({"success": True, "annotation": annotation})
@app.route("/export/obsidian")
def export_obsidian():
    """Download the entire research corpus as an Obsidian vault (ZIP)."""
    archive = build_obsidian_vault(db())
    disposition = {"Content-Disposition": "attachment; filename=IETF-AI-Agent-Drafts.zip"}
    return Response(archive, mimetype="application/zip", headers=disposition)
def create_app(dev: bool = False) -> Flask:
    """Return the module-level ``app`` after re-running ``init_auth`` with ``dev``.

    No new application is constructed; call this before ``run()`` when the
    auth mode chosen at import time needs to change.
    """
    init_auth(app, dev=dev)
    application = app
    return application
# ── Sources & False Positives ────────────────────────────────────────────
@app.route("/sources")
def sources_page():
    """Render ``sources.html`` with the source comparison data."""
    return render_template("sources.html", data=get_source_comparison(db()))
@app.route("/false-positives")
def false_positives_page():
    """Render ``false_positives.html`` with the false-positive profile data."""
    return render_template("false_positives.html", data=get_false_positive_profile(db()))
@app.route("/api/sources")
def api_sources():
    """Return the source comparison data as JSON."""
    return jsonify(get_source_comparison(db()))
@app.route("/api/false-positives")
def api_false_positives():
    """Return the false-positive profile data as JSON."""
    return jsonify(get_false_positive_profile(db()))
# ── Citation Influence & BCP ─────────────────────────────────────────────
@app.route("/api/citations/influence")
def api_citation_influence():
    """Return the citation influence data as JSON."""
    payload = get_citation_influence(db())
    return jsonify(payload)
@app.route("/api/citations/bcp")
def api_bcp_analysis():
    """Return the BCP analysis data as JSON."""
    payload = get_bcp_analysis(db())
    return jsonify(payload)
# ── Idea Analysis ────────────────────────────────────────────────────────
@app.route("/idea-analysis")
def idea_analysis():
    """Render ``idea_analysis.html`` with the idea analysis data."""
    return render_template("idea_analysis.html", data=get_idea_analysis(db()))
@app.route("/api/idea-analysis")
def api_idea_analysis():
    """Return the idea analysis data as JSON."""
    return jsonify(get_idea_analysis(db()))
# ── Trends & Complexity ──────────────────────────────────────────────────
@app.route("/trends")
def trends():
    """Render ``trends_analysis.html`` with the trends data."""
    return render_template("trends_analysis.html", data=get_trends_data(db()))
@app.route("/complexity")
def complexity():
    """Render ``complexity.html`` with the complexity data."""
    return render_template("complexity.html", data=get_complexity_data(db()))
@app.route("/api/trends")
def api_trends():
    """Return the trends data as JSON."""
    payload = get_trends_data(db())
    return jsonify(payload)
@app.route("/api/complexity")
def api_complexity():
    """Return the complexity data as JSON."""
    payload = get_complexity_data(db())
    return jsonify(payload)
if __name__ == "__main__":
    # Script entry point: parse CLI flags, select the auth mode, print a
    # startup banner, then start Flask's built-in server.
    import argparse

    cli = argparse.ArgumentParser(description="IETF Draft Analyzer Web UI")
    cli.add_argument(
        "--dev",
        action="store_true",
        help="Development mode: enables admin features (gaps, monitor, compare, annotations)",
    )
    cli.add_argument("--host", default="127.0.0.1")
    cli.add_argument("--port", type=int, default=5000)
    args = cli.parse_args()

    # Re-run auth setup with the mode requested on the command line.
    init_auth(app, dev=args.dev)

    # ANSI colour codes: yellow for DEV, green for PRODUCTION.
    if args.dev:
        mode = "\033[33mDEV\033[0m (admin enabled)"
    else:
        mode = "\033[32mPRODUCTION\033[0m (admin disabled)"
    print(f"Starting IETF Draft Analyzer — {mode}")
    print(f" http://{args.host}:{args.port}")
    if args.dev:
        print(" Admin features: gaps, monitor, compare, annotations, AI synthesis")

    # Flask's debug mode follows the --dev flag.
    app.run(debug=args.dev, host=args.host, port=args.port)