Files
ietf-draft-analyzer/src/ietf_analyzer/cli.py
Christian Nennemann 757b781c67 Platform upgrade: semantic search, citations, readiness, tests, Docker
Major features added by 5 parallel agent teams:
- Semantic "Ask" (NL queries via FTS5 + embeddings + Claude synthesis)
- Global search across drafts, ideas, authors, gaps
- REST API expansion (14 endpoints, up from 3) with CSV/JSON export
- Citation graph visualization (D3.js, 440 nodes, 2422 edges)
- Standards readiness scoring (0-100 composite from 6 factors)
- Side-by-side draft comparison view with shared/unique analysis
- Annotation system (notes + tags per draft, DB-persisted)
- Docker deployment (Dockerfile + docker-compose with Ollama)
- Scheduled updates (cron script with log rotation)
- Pipeline health dashboard (stage progress bars, cost tracking)
- Test suite foundation (54 pytest tests covering DB, models, web data)

Fixes: compare_drafts() stubbed→working, get_authors_for_draft() bug,
source-aware analysis prompts, config env var overrides + validation,
resilient batch error handling with --retry-failed, observatory --dry-run

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 20:52:56 +01:00

2996 lines
113 KiB
Python

"""CLI entry point — all user-facing commands."""
from __future__ import annotations
from pathlib import Path
import click
from rich.console import Console
from rich.table import Table
from .config import Config
from .db import Database
console = Console()
def _get_config() -> Config:
    """Load and return the application configuration."""
    return Config.load()
@click.group()
@click.version_option(version="0.1.0")
def main():
    """IETF Draft Analyzer — track, categorize, and rate AI/agent Internet-Drafts."""
    # Root command group; subcommands are registered via the decorators below.
# ── fetch ────────────────────────────────────────────────────────────────────
@main.command()
@click.option("--keywords", "-k", multiple=True, help="Extra keywords to search for")
@click.option("--since", "-s", help="Only fetch drafts newer than this date (YYYY-MM-DD)")
@click.option("--download-text/--no-download-text", default=True, help="Download full text of drafts")
def fetch(keywords: tuple[str, ...], since: str | None, download_text: bool):
    """Fetch AI/agent drafts from IETF Datatracker."""
    from .fetcher import Fetcher
    cfg = _get_config()
    db = Database(cfg)
    fetcher = Fetcher(cfg)
    # Configured default keywords first, then any extras from the CLI.
    search_terms = [*cfg.search_keywords, *keywords]
    try:
        found = fetcher.search_drafts(keywords=search_terms, since=since)
        for item in found:
            db.upsert_draft(item)
        console.print(f"Stored [bold green]{len(found)}[/] drafts in database")
        if not download_text:
            return
        # Backfill full text for any stored draft that still lacks it.
        pending = db.drafts_without_text()
        if pending:
            console.print(f"Downloading text for [bold]{len(pending)}[/] drafts...")
            for draft_name, text in fetcher.download_texts(pending).items():
                stored = db.get_draft(draft_name)
                if stored is None:
                    continue
                stored.full_text = text
                db.upsert_draft(stored)
    finally:
        # Close network and DB resources even on early return or error.
        fetcher.close()
        db.close()
# ── list ─────────────────────────────────────────────────────────────────────
@main.command("list")
@click.option("--limit", "-n", default=30, help="Number of drafts to show")
@click.option("--sort", "-s", default="time DESC", help="Sort order (e.g. 'time DESC', 'name ASC')")
def list_drafts(limit: int, sort: str):
"""List tracked drafts."""
cfg = _get_config()
db = Database(cfg)
try:
drafts = db.list_drafts(limit=limit, order_by=sort)
total = db.count_drafts()
table = Table(title=f"Tracked Drafts ({total} total, showing {len(drafts)})")
table.add_column("Date", style="dim", width=10)
table.add_column("Name", style="cyan", max_width=55)
table.add_column("Title", max_width=50)
table.add_column("Pg", justify="right", width=4)
table.add_column("Text", justify="center", width=4)
table.add_column("Rated", justify="center", width=5)
for d in drafts:
has_text = "\u2713" if d.full_text else ""
rated = "\u2713" if db.get_rating(d.name) else ""
table.add_row(d.date, d.name, d.title[:50], str(d.pages or ""), has_text, rated)
console.print(table)
finally:
db.close()
# ── search ───────────────────────────────────────────────────────────────────
@main.command()
@click.argument("query")
@click.option("--limit", "-n", default=20, help="Max results")
def search(query: str, limit: int):
    """Full-text search across stored drafts."""
    cfg = _get_config()
    db = Database(cfg)
    try:
        hits = db.search_drafts(query, limit=limit)
        if not hits:
            console.print(f"No results for [bold]{query}[/]")
            return
        table = Table(title=f"Search: {query} ({len(hits)} results)")
        table.add_column("Date", style="dim", width=10)
        table.add_column("Name", style="cyan")
        table.add_column("Title")
        for hit in hits:
            table.add_row(hit.date, hit.name, hit.title[:60])
        console.print(table)
    finally:
        db.close()
# ── show ─────────────────────────────────────────────────────────────────────
@main.command()
@click.argument("name")
def show(name: str):
    """Show detailed info for a draft.

    Prints the draft header, abstract, AI rating table (if rated),
    standards-readiness breakdown, and saves a markdown detail report.
    """
    from .reports import Reporter
    cfg = _get_config()
    db = Database(cfg)
    reporter = Reporter(cfg, db)
    try:
        draft = db.get_draft(name)
        if draft is None:
            console.print(f"[red]Draft not found: {name}[/]")
            return
        rating = db.get_rating(name)
        # Header block: title, identity line, group + datatracker link, abstract.
        console.print(f"\n[bold]{draft.title}[/]")
        console.print(f"[dim]{draft.name}[/] rev {draft.rev} | {draft.date} | {draft.pages or '?'} pages")
        console.print(f"Group: {draft.group or 'individual'} | {draft.datatracker_url}")
        console.print(f"\n[italic]{draft.abstract}[/]\n")
        if rating:
            console.print("[bold]AI Assessment[/]")
            console.print(f" Score: [bold green]{rating.composite_score:.1f}[/]")
            console.print(f" Summary: {rating.summary}\n")
            table = Table(show_header=True)
            table.add_column("Dimension", width=12)
            table.add_column("Score", justify="center", width=7)
            table.add_column("Notes")
            table.add_row("Novelty", f"{rating.novelty}/5", rating.novelty_note)
            table.add_row("Maturity", f"{rating.maturity}/5", rating.maturity_note)
            table.add_row("Overlap", f"{rating.overlap}/5", rating.overlap_note)
            table.add_row("Momentum", f"{rating.momentum}/5", rating.momentum_note)
            table.add_row("Relevance", f"{rating.relevance}/5", rating.relevance_note)
            console.print(table)
            if rating.categories:
                console.print(f"\nCategories: {', '.join(rating.categories)}")
        else:
            # BUG FIX: this was a plain string, so the literal text "{name}"
            # was printed instead of the draft name; it must be an f-string.
            console.print(f"[dim]Not yet rated — run: ietf analyze {name}[/]")
        # Readiness score (0-100 composite); only shown when non-zero.
        from .readiness import compute_readiness
        readiness = compute_readiness(db, name)
        if readiness["score"] > 0:
            console.print(f"\n[bold]Standards Readiness: [cyan]{readiness['score']}/100[/][/]")
            rtable = Table(show_header=True)
            rtable.add_column("Factor", width=20)
            rtable.add_column("Value", justify="center", width=10)
            rtable.add_column("Points", justify="right", width=8)
            rtable.add_column("Detail")
            for key, f in readiness["factors"].items():
                rtable.add_row(f["label"], f"{f['value']:.2f}", f"+{f['contribution']}", f["detail"])
            console.print(rtable)
        # Save detailed report too
        path = reporter.draft_detail(name)
        if path:
            console.print(f"\n[dim]Report saved: {path}[/]")
    finally:
        db.close()
# ── annotate ─────────────────────────────────────────────────────────────────
@main.command()
@click.argument("draft_name")
@click.option("--note", "-n", default=None, help="Set/update the note text")
@click.option("--tag", "-t", multiple=True, help="Add a tag (can be used multiple times)")
@click.option("--remove-tag", "-r", multiple=True, help="Remove a tag (can be used multiple times)")
def annotate(draft_name: str, note: str | None, tag: tuple[str, ...], remove_tag: tuple[str, ...]):
    """Add or view annotations (notes & tags) for a draft.

    With no options, displays the current annotation. With --note/--tag/
    --remove-tag, updates it and echoes the result.
    """
    cfg = _get_config()
    db = Database(cfg)
    try:
        draft = db.get_draft(draft_name)
        if draft is None:
            console.print(f"[red]Draft not found: {draft_name}[/]")
            return
        # If no options, display current annotation
        if note is None and not tag and not remove_tag:
            ann = db.get_annotation(draft_name)
            if ann:
                console.print(f"\n[bold]Annotation for {draft_name}[/]")
                console.print(f" Note: {ann['note'] or '(empty)'}")
                console.print(f" Tags: {', '.join(ann['tags']) if ann['tags'] else '(none)'}")
                console.print(f" Updated: {ann['updated_at']}")
            else:
                console.print(f"[dim]No annotation for {draft_name}. Use --note or --tag to add one.[/]")
            return
        # Fetch existing annotation so tag add/remove works incrementally.
        existing = db.get_annotation(draft_name)
        current_tags = existing["tags"] if existing else []
        for t in tag:
            if t not in current_tags:
                current_tags.append(t)
        for t in remove_tag:
            if t in current_tags:
                current_tags.remove(t)
        # BUG FIX: a tags-only invocation used to pass note=None to the
        # upsert, which could wipe a previously stored note (depending on
        # upsert semantics). Preserve the existing note when --note is absent.
        if note is None and existing:
            note = existing["note"]
        db.upsert_annotation(draft_name, note=note, tags=current_tags)
        ann = db.get_annotation(draft_name)
        console.print(f"[green]Annotation updated for {draft_name}[/]")
        console.print(f" Note: {ann['note'] or '(empty)'}")
        console.print(f" Tags: {', '.join(ann['tags']) if ann['tags'] else '(none)'}")
    finally:
        db.close()
# ── analyze ──────────────────────────────────────────────────────────────────
@main.command()
@click.argument("name", required=False)
@click.option("--all", "analyze_all", is_flag=True, help="Analyze all unrated drafts")
@click.option("--limit", "-n", default=50, help="Max drafts to analyze (with --all)")
@click.option("--retry-failed", is_flag=True, help="Re-analyze drafts that previously failed (clears cache)")
def analyze(name: str | None, analyze_all: bool, limit: int, retry_failed: bool):
    """Analyze and rate drafts using Claude."""
    from .analyzer import Analyzer
    cfg = _get_config()
    db = Database(cfg)
    analyzer = Analyzer(cfg, db)

    def _was_attempted(draft_name: str) -> bool:
        # A cache entry for an unrated draft means analysis was tried and failed.
        row = db.conn.execute(
            "SELECT COUNT(*) FROM llm_cache WHERE draft_name = ?",
            (draft_name,),
        ).fetchone()
        return row[0] > 0

    try:
        if retry_failed:
            # Unrated drafts with a cache entry are the previously failed ones.
            retryable = [d for d in db.unrated_drafts(limit=limit) if _was_attempted(d.name)]
            if not retryable:
                console.print("No previously failed drafts to retry.")
            else:
                console.print(f"Retrying [bold]{len(retryable)}[/] previously failed drafts...")
                ok = sum(
                    1 for d in retryable
                    if analyzer.rate_draft(d.name, use_cache=False)
                )
                console.print(f"Successfully re-analyzed [bold green]{ok}[/] of {len(retryable)} drafts")
        elif analyze_all:
            done = analyzer.rate_all_unrated(limit=limit)
            console.print(f"Analyzed [bold green]{done}[/] drafts")
        elif name:
            rating = analyzer.rate_draft(name)
            if not rating:
                console.print("[red]Analysis failed[/]")
            else:
                console.print(f"\n[bold green]Rating for {name}:[/]")
                console.print(f" Score: {rating.composite_score:.1f}")
                console.print(f" Summary: {rating.summary}")
                console.print(f" Novelty={rating.novelty} Maturity={rating.maturity} "
                              f"Overlap={rating.overlap} Momentum={rating.momentum} "
                              f"Relevance={rating.relevance}")
        else:
            console.print("Provide a draft name or use --all")
    finally:
        db.close()
# ── ask ──────────────────────────────────────────────────────────────────────
@main.command()
@click.argument("question")
@click.option("--top", "-n", default=5, help="Number of source drafts to use")
@click.option("--cheap/--quality", default=True, help="Use Haiku (cheap) vs Sonnet (quality)")
def ask(question: str, top: int, cheap: bool):
    """Ask a natural language question about the drafts.
    Examples:
    ietf ask "Which drafts address agent authentication?"
    ietf ask "What are the competing approaches to agent delegation?" --top 10
    ietf ask "How do safety mechanisms work?" --cheap
    """
    from .search import HybridSearch
    cfg = _get_config()
    db = Database(cfg)
    try:
        searcher = HybridSearch(cfg, db)
        console.print("\n[dim]Searching for relevant drafts...[/]")
        result = searcher.ask(question, top_k=top, cheap=cheap)
        # Synthesized answer first, separated by a rule.
        console.print()
        console.print("[bold cyan]Answer[/]")
        console.print("[dim]" + "-" * 60 + "[/]")
        console.print(result["answer"])
        console.print()
        # Then the drafts the answer was grounded in, if any.
        sources = result["sources"]
        if sources:
            table = Table(title="Source Drafts")
            table.add_column("#", style="dim", width=3)
            table.add_column("Draft", style="cyan", max_width=50)
            table.add_column("Title", max_width=45)
            table.add_column("Match", width=10)
            table.add_column("Score", justify="right", width=8)
            for idx, src in enumerate(sources, 1):
                similarity = src.get("similarity")
                table.add_row(
                    str(idx),
                    src["name"],
                    src["title"][:45],
                    src.get("match_type", ""),
                    f"{similarity:.3f}" if similarity else "-",
                )
            console.print(table)
    finally:
        db.close()
# ── compare ──────────────────────────────────────────────────────────────────
@main.command()
@click.argument("names", nargs=-1, required=True)
def compare(names: tuple[str, ...]):
    """Compare multiple drafts for overlap and unique contributions."""
    from .analyzer import Analyzer
    cfg = _get_config()
    db = Database(cfg)
    analyzer = Analyzer(cfg, db)
    try:
        result = analyzer.compare_drafts(list(names))
        if "error" in result:
            console.print(f"[red]{result['error']}[/]")
            return
        console.print(f"\n[bold cyan]Comparison of {len(result['drafts'])} drafts[/]")
        console.print("[dim]" + "-" * 60 + "[/]")
        console.print(result["text"])
    finally:
        db.close()
# ── embed ────────────────────────────────────────────────────────────────────
@main.command()
def embed():
    """Generate embeddings for all drafts (requires Ollama)."""
    from .embeddings import Embedder
    cfg = _get_config()
    db = Database(cfg)
    try:
        embedded = Embedder(cfg, db).embed_all_missing()
        console.print(f"Embedded [bold green]{embedded}[/] drafts")
    finally:
        db.close()
# ── embed-ideas ──────────────────────────────────────────────────────────────
@main.command("embed-ideas")
@click.option("--limit", default=0, help="Max ideas to embed (0=all)")
@click.option("--batch-size", default=50, help="Batch size for Ollama")
def embed_ideas(limit: int, batch_size: int):
"""Generate embeddings for extracted ideas via Ollama."""
import ollama as ollama_lib
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, MofNCompleteColumn
cfg = _get_config()
db = Database(cfg)
client = ollama_lib.Client(host=cfg.ollama_url)
try:
missing = db.ideas_without_embeddings(limit=limit if limit > 0 else 10000)
if not missing:
console.print("All ideas already have embeddings.")
return
total = len(missing)
console.print(f"Embedding [bold]{total}[/] ideas in batches of {batch_size}...")
count = 0
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
MofNCompleteColumn(),
console=console,
) as progress:
task = progress.add_task("Embedding ideas...", total=total)
for start in range(0, total, batch_size):
batch = missing[start:start + batch_size]
texts = [f"{idea['title']}. {idea['description']}" for idea in batch]
try:
resp = client.embed(model=cfg.ollama_embed_model, input=texts)
for i, idea in enumerate(batch):
import numpy as np
vec = np.array(resp["embeddings"][i], dtype=np.float32)
db.store_idea_embedding(idea["id"], cfg.ollama_embed_model, vec)
count += 1
progress.advance(task)
except Exception as e:
console.print(f"[red]Batch failed: {e}[/]")
for _ in batch:
progress.advance(task)
console.print(f"Embedded [bold green]{count}[/] ideas")
finally:
db.close()
# ── similar ──────────────────────────────────────────────────────────────────
@main.command()
@click.argument("name")
@click.option("--top", "-n", default=10, help="Number of similar drafts to show")
def similar(name: str, top: int):
    """Find drafts most similar to a given draft."""
    from .embeddings import Embedder
    cfg = _get_config()
    db = Database(cfg)
    embedder = Embedder(cfg, db)
    try:
        matches = embedder.find_similar(name, top_n=top)
        if not matches:
            console.print("[yellow]No similar drafts found (need embeddings — run `ietf embed` first)[/]")
            return
        table = Table(title=f"Drafts similar to {name}")
        table.add_column("Similarity", justify="right", width=10)
        table.add_column("Draft", style="cyan")
        table.add_column("Title")
        for other_name, score in matches:
            other = db.get_draft(other_name)
            table.add_row(f"{score:.3f}", other_name, other.title[:60] if other else "")
        console.print(table)
    finally:
        db.close()
# ── clusters ─────────────────────────────────────────────────────────────────
@main.command()
@click.option("--threshold", "-t", default=0.85, help="Similarity threshold for clustering")
def clusters(threshold: float):
    """Find clusters of highly similar (potentially overlapping) drafts."""
    from .embeddings import Embedder
    cfg = _get_config()
    db = Database(cfg)
    embedder = Embedder(cfg, db)
    try:
        found = embedder.find_clusters(threshold=threshold)
        if not found:
            console.print("No clusters found at this threshold.")
            return
        console.print(f"\n[bold]Found {len(found)} clusters[/] (threshold={threshold})\n")
        for idx, members in enumerate(found, 1):
            console.print(f"[bold cyan]Cluster {idx}[/] ({len(members)} drafts):")
            for member in members:
                record = db.get_draft(member)
                title = record.title[:60] if record else ""
                console.print(f" - {member} [dim]{title}[/]")
            console.print()
    finally:
        db.close()
# ── report ───────────────────────────────────────────────────────────────────
@main.group()
def report():
    """Generate markdown reports."""
    # Command group only; each report type is a subcommand.
def _run_report(method: str, *args, **kwargs) -> None:
    """Open the DB, invoke the named Reporter method, print the saved path.

    Shared plumbing for every `report` subcommand: Reporter construction,
    DB lifecycle, and the "Report saved" message were previously duplicated
    fifteen times.
    """
    from .reports import Reporter
    cfg = _get_config()
    db = Database(cfg)
    reporter = Reporter(cfg, db)
    try:
        path = getattr(reporter, method)(*args, **kwargs)
        console.print(f"Report saved: [bold]{path}[/]")
    finally:
        db.close()


@report.command()
def overview():
    """Overview table of all rated drafts."""
    _run_report("overview")


@report.command()
def landscape():
    """Category-grouped landscape view."""
    _run_report("landscape")


@report.command()
@click.option("--days", "-d", default=7, help="Look back N days")
def digest(days: int):
    """What's new digest."""
    _run_report("digest", since_days=days)


@report.command()
def timeline():
    """Timeline of draft submissions by month and category."""
    _run_report("timeline")


@report.command("overlap-matrix")
def overlap_matrix():
    """Full pairwise overlap matrix report."""
    # Kept standalone: needs an Embedder argument and an extra progress line.
    from .embeddings import Embedder
    from .reports import Reporter
    cfg = _get_config()
    db = Database(cfg)
    embedder = Embedder(cfg, db)
    reporter = Reporter(cfg, db)
    try:
        console.print("Computing 260x260 similarity matrix...")
        path = reporter.overlap_matrix(embedder)
        console.print(f"Report saved: [bold]{path}[/]")
    finally:
        db.close()


@report.command("authors")
def authors_report():
    """Author and organization network report."""
    _run_report("authors_report")


@report.command("ideas")
def ideas_report():
    """Report on extracted technical ideas."""
    _run_report("ideas_report")


@report.command("refs")
def refs_report():
    """Cross-reference report — which standards the ecosystem builds on."""
    _run_report("refs_report")


@report.command("trends")
def trends_report():
    """Category trend analysis report (markdown)."""
    _run_report("trends_report")


@report.command("idea-overlap")
def idea_overlap_report():
    """Cross-organization idea overlap report."""
    _run_report("idea_overlap_report")


@report.command("status")
def status_report():
    """WG adoption status report."""
    _run_report("status_report")


@report.command("revisions")
def revisions_report():
    """Draft revision velocity report."""
    _run_report("revisions_report")


@report.command("centrality")
def centrality_report():
    """Author network centrality report."""
    _run_report("centrality_report")


@report.command("co-occurrence")
def co_occurrence_report():
    """Category co-occurrence matrix report."""
    _run_report("co_occurrence_report")


@report.command("wg")
def wg_report():
    """Working group analysis report — overlaps, alignment, submission targets."""
    _run_report("wg_report")
# ── wg (working group analysis) ─────────────────────────────────────────
@main.group()
def wg():
    """Working group analysis — overlaps, alignment opportunities, submission targets."""
    # Command group only; analyses live in the subcommands.
@wg.command("list")
@click.option("--min-drafts", default=1, help="Minimum drafts to show a WG")
def wg_list(min_drafts: int):
"""List working groups with draft counts and average scores."""
cfg = _get_config()
db = Database(cfg)
try:
summaries = db.wg_summary()
if not summaries:
console.print("[yellow]No WG data. Run: python scripts/backfill-wg-names.py[/]")
return
summaries = [s for s in summaries if s["draft_count"] >= min_drafts]
table = Table(title=f"Working Groups ({len(summaries)} with >= {min_drafts} drafts)")
table.add_column("WG", style="cyan", width=12)
table.add_column("#", justify="right", width=4)
table.add_column("Ideas", justify="right", width=5)
table.add_column("Nov", justify="center", width=4)
table.add_column("Mat", justify="center", width=4)
table.add_column("Ovl", justify="center", width=4)
table.add_column("Mom", justify="center", width=4)
table.add_column("Rel", justify="center", width=4)
table.add_column("Top Categories")
for s in summaries:
top_cats = sorted(s["categories"].items(), key=lambda x: x[1], reverse=True)[:3]
cats_str = ", ".join(f"{c}({n})" for c, n in top_cats) if top_cats else "-"
table.add_row(
s["wg"], str(s["draft_count"]), str(s["idea_count"]),
str(s["avg_novelty"]), str(s["avg_maturity"]),
str(s["avg_overlap"]), str(s["avg_momentum"]),
str(s["avg_relevance"]), cats_str,
)
console.print(table)
# Also show individual submission count
indiv = db.conn.execute(
'SELECT COUNT(*) FROM drafts WHERE "group" = \'none\' OR "group" IS NULL'
).fetchone()[0]
console.print(f"\n[dim]Individual submissions (no WG): {indiv}[/]")
finally:
db.close()
@wg.command("show")
@click.argument("name")
def wg_show(name: str):
"""Show details for a specific working group."""
cfg = _get_config()
db = Database(cfg)
try:
drafts = db.wg_drafts(name)
if not drafts:
console.print(f"[red]No drafts found for WG: {name}[/]")
return
console.print(f"\n[bold]Working Group: {name}[/] ({len(drafts)} drafts)\n")
table = Table()
table.add_column("Date", style="dim", width=10)
table.add_column("Name", style="cyan")
table.add_column("Title", max_width=50)
table.add_column("Score", justify="right", width=6)
for d in drafts:
rating = db.get_rating(d.name)
score = f"{rating.composite_score:.1f}" if rating else "-"
table.add_row(d.date, d.name, d.title[:50], score)
console.print(table)
# Show ideas for this WG
ideas = []
for d in drafts:
ideas.extend(db.get_ideas_for_draft(d.name))
if ideas:
console.print(f"\n[bold]Ideas ({len(ideas)}):[/]")
for idea in ideas[:15]:
console.print(f" - [cyan]{idea['title']}[/]: {idea['description'][:80]}")
if len(ideas) > 15:
console.print(f" [dim]... and {len(ideas) - 15} more[/]")
finally:
db.close()
@wg.command("overlaps")
@click.option("--min-wgs", default=2, help="Minimum WGs sharing a category to show")
def wg_overlaps(min_wgs: int):
"""Find categories and ideas that span multiple WGs — alignment opportunities."""
cfg = _get_config()
db = Database(cfg)
try:
# Category spread across WGs
spread = db.category_wg_spread()
multi = [s for s in spread if s["wg_count"] >= min_wgs
and not all(w["wg"] == "none" for w in s["wgs"])]
if multi:
console.print(f"\n[bold]Categories spanning {min_wgs}+ WGs[/]\n")
for s in multi:
wg_strs = [f"{w['wg']}({w['count']})" for w in s["wgs"] if w["wg"] != "none"]
if wg_strs:
console.print(f" [cyan]{s['category']}[/] — {s['total_drafts']} drafts across {s['wg_count']} WGs")
console.print(f" WGs: {', '.join(wg_strs)}")
# Idea overlap across WGs
idea_overlaps = db.wg_idea_overlap()
cross_wg = [o for o in idea_overlaps
if not all(w == "none" for w in o["wg_names"])]
if cross_wg:
console.print(f"\n[bold]Ideas appearing in {min_wgs}+ WGs ({len(cross_wg)} found)[/]\n")
for o in cross_wg[:20]:
real_wgs = [w for w in o["wg_names"] if w != "none"]
console.print(f" [cyan]{o['idea_title']}[/] — WGs: {', '.join(real_wgs)}")
for entry in o["wgs"]:
if entry["wg"] != "none":
console.print(f" - [{entry['wg']}] {entry['draft_name']}")
if len(cross_wg) > 20:
console.print(f"\n [dim]... and {len(cross_wg) - 20} more[/]")
if not multi and not cross_wg:
console.print("[yellow]No cross-WG overlaps found.[/]")
finally:
db.close()
@wg.command("alignment")
def wg_alignment():
"""Identify where individual drafts should be consolidated into WG standards."""
cfg = _get_config()
db = Database(cfg)
try:
# Compare individual vs WG category distribution
dist = db.individual_vs_wg_categories()
indiv = dist["individual"]
adopted = dist["wg_adopted"]
console.print("\n[bold]Individual vs WG-Adopted Category Distribution[/]\n")
table = Table()
table.add_column("Category", width=25)
table.add_column("Individual", justify="right", width=10)
table.add_column("WG-Adopted", justify="right", width=10)
table.add_column("Signal", width=40)
all_cats = sorted(set(list(indiv.keys()) + list(adopted.keys())))
for cat in all_cats:
i_count = indiv.get(cat, 0)
w_count = adopted.get(cat, 0)
signal = ""
if i_count >= 5 and w_count == 0:
signal = "[yellow]High individual activity, no WG — needs WG?[/]"
elif i_count >= 3 and w_count >= 1:
signal = "[green]WG exists, individual drafts could target it[/]"
elif w_count > i_count and i_count > 0:
signal = "[dim]WG leading, some individual work[/]"
table.add_row(cat, str(i_count), str(w_count), signal)
console.print(table)
# Find overlap clusters within individual submissions that might warrant a WG
console.print("\n[bold]Consolidation Candidates[/]")
console.print("[dim]Categories with many individual drafts but no WG adoption — "
"potential for new WG or BoF[/]\n")
candidates = []
for cat in all_cats:
i_count = indiv.get(cat, 0)
w_count = adopted.get(cat, 0)
if i_count >= 5 and w_count == 0:
candidates.append((cat, i_count))
if candidates:
for cat, count in sorted(candidates, key=lambda x: x[1], reverse=True):
console.print(f" [yellow]{cat}[/]: {count} individual drafts, no WG home")
# Show sample drafts
rows = db.conn.execute("""
SELECT d.name, d.title FROM drafts d
JOIN ratings r ON d.name = r.draft_name
WHERE (d."group" = 'none' OR d."group" IS NULL)
AND r.categories LIKE ?
ORDER BY (r.novelty * 0.30 + r.relevance * 0.25 + r.maturity * 0.20
+ r.momentum * 0.15 + (6 - r.overlap) * 0.10) DESC
LIMIT 5
""", (f"%{cat}%",)).fetchall()
for row in rows:
console.print(f" - {row['name']}: {row['title'][:60]}")
console.print()
else:
console.print(" [green]All active categories have WG representation.[/]")
finally:
db.close()
@wg.command("targets")
def wg_targets():
"""Suggest best WGs for submitting new work in each category."""
cfg = _get_config()
db = Database(cfg)
try:
spread = db.category_wg_spread()
summaries = {s["wg"]: s for s in db.wg_summary()}
console.print("\n[bold]Recommended Submission Targets by Category[/]\n")
for s in spread:
cat = s["category"]
# Filter to real WGs (not 'none')
real_wgs = [w for w in s["wgs"] if w["wg"] != "none"]
if not real_wgs:
console.print(f" [cyan]{cat}[/]: [yellow]No active WG — individual submission[/]")
continue
best = real_wgs[0]
wg_info = summaries.get(best["wg"], {})
console.print(
f" [cyan]{cat}[/]: [bold green]{best['wg']}[/] "
f"({best['count']} drafts"
f"{', avg relevance ' + str(wg_info.get('avg_relevance', '?')) if wg_info else ''})"
)
if len(real_wgs) > 1:
alts = ", ".join(f"{w['wg']}({w['count']})" for w in real_wgs[1:3])
console.print(f" Also: {alts}")
console.print()
finally:
db.close()
# ── visualize ────────────────────────────────────────────────────────────
@main.group()
def viz():
    """Generate interactive visualizations (HTML/PNG)."""
    # Command group only; each chart type is a subcommand.
@viz.command("all")
def viz_all():
"""Generate all available visualizations."""
from .visualize import Visualizer
cfg = _get_config()
db = Database(cfg)
v = Visualizer(cfg, db)
try:
paths = v.generate_all()
console.print(f"\n[bold green]{len(paths)} visualizations[/] saved to {v.output_dir}/")
finally:
db.close()
@viz.command("landscape")
@click.option("--method", "-m", default="tsne", type=click.Choice(["umap", "tsne"]),
help="Dimensionality reduction method")
def viz_landscape(method: str):
"""2D scatter of draft embeddings colored by category."""
from .visualize import Visualizer
cfg = _get_config()
db = Database(cfg)
v = Visualizer(cfg, db)
try:
path = v.landscape_scatter(method=method)
console.print(f"Saved: [bold]{path}[/]")
finally:
db.close()
@viz.command("heatmap")
def viz_heatmap():
"""Clustered similarity heatmap (PNG)."""
from .visualize import Visualizer
cfg = _get_config()
db = Database(cfg)
v = Visualizer(cfg, db)
try:
path = v.similarity_heatmap()
console.print(f"Saved: [bold]{path}[/]")
finally:
db.close()
@viz.command("distributions")
def viz_distributions():
"""Rating dimension distributions by category (PNG)."""
from .visualize import Visualizer
cfg = _get_config()
db = Database(cfg)
v = Visualizer(cfg, db)
try:
path = v.score_distributions()
console.print(f"Saved: [bold]{path}[/]")
finally:
db.close()
@viz.command("timeline")
def viz_timeline():
"""Stacked area chart of monthly submissions."""
from .visualize import Visualizer
cfg = _get_config()
db = Database(cfg)
v = Visualizer(cfg, db)
try:
path = v.timeline_chart()
console.print(f"Saved: [bold]{path}[/]")
finally:
db.close()
@viz.command("bubble")
def viz_bubble():
"""Interactive bubble chart: novelty vs maturity."""
from .visualize import Visualizer
cfg = _get_config()
db = Database(cfg)
v = Visualizer(cfg, db)
try:
path = v.bubble_explorer()
console.print(f"Saved: [bold]{path}[/]")
finally:
db.close()
@viz.command("radar")
def viz_radar():
"""Radar chart of average category rating profiles."""
from .visualize import Visualizer
cfg = _get_config()
db = Database(cfg)
v = Visualizer(cfg, db)
try:
path = v.category_radar()
console.print(f"Saved: [bold]{path}[/]")
finally:
db.close()
@viz.command("network")
@click.option("--min-shared", "-n", default=2, help="Minimum shared drafts for an edge")
def viz_network(min_shared: int):
"""Interactive author collaboration network graph."""
from .visualize import Visualizer
cfg = _get_config()
db = Database(cfg)
v = Visualizer(cfg, db)
try:
path = v.author_network(min_shared=min_shared)
console.print(f"Saved: [bold]{path}[/]")
finally:
db.close()
@viz.command("treemap")
def viz_treemap():
    """Category treemap colored by average score."""
    from .visualize import Visualizer
    config = _get_config()
    database = Database(config)
    try:
        # Render the treemap and report the output path.
        saved_to = Visualizer(config, database).category_treemap()
        console.print(f"Saved: [bold]{saved_to}[/]")
    finally:
        database.close()
@viz.command("quality")
def viz_quality():
    """Score vs uniqueness scatter (quality vs redundancy)."""
    from .visualize import Visualizer
    config = _get_config()
    database = Database(config)
    try:
        # Render the scatter plot and report the output path.
        saved_to = Visualizer(config, database).score_vs_overlap()
        console.print(f"Saved: [bold]{saved_to}[/]")
    finally:
        database.close()
@viz.command("orgs")
def viz_orgs():
    """Organization contribution bar chart."""
    from .visualize import Visualizer
    config = _get_config()
    database = Database(config)
    try:
        # Render the org bar chart and report the output path.
        saved_to = Visualizer(config, database).org_contributions()
        console.print(f"Saved: [bold]{saved_to}[/]")
    finally:
        database.close()
@viz.command("ideas")
def viz_ideas():
    """Ideas frequency chart by type."""
    from .visualize import Visualizer
    config = _get_config()
    database = Database(config)
    try:
        # Render the ideas chart and report the output path.
        saved_to = Visualizer(config, database).ideas_chart()
        console.print(f"Saved: [bold]{saved_to}[/]")
    finally:
        database.close()
@viz.command("browser")
def viz_browser():
    """Interactive filterable draft browser (standalone HTML)."""
    from .visualize import Visualizer
    config = _get_config()
    database = Database(config)
    try:
        # Render the standalone HTML browser and report the output path.
        saved_to = Visualizer(config, database).draft_browser()
        console.print(f"Saved: [bold]{saved_to}[/]")
    finally:
        database.close()
# ── authors ─────────────────────────────────────────────────────────────
@main.command()
@click.argument("name", required=False)
@click.option("--fetch/--no-fetch", default=False, help="Fetch author data from Datatracker first")
@click.option("--limit", "-n", default=20, help="Number of top authors to show")
def authors(name: str | None, fetch: bool, limit: int):
    """Show authors for a draft, or top authors overall."""
    from .authors import AuthorNetwork
    config = _get_config()
    database = Database(config)
    net = AuthorNetwork(config, database)
    try:
        # Optionally refresh author metadata from the Datatracker first.
        if fetch:
            fetched = net.fetch_all_authors()
            console.print(f"Fetched authors for [bold green]{fetched}[/] drafts")
        if name is None:
            # No draft given: show the overall leaderboard.
            leaders = database.top_authors(limit=limit)
            if not leaders:
                console.print("[yellow]No author data. Run `ietf authors --fetch` first.[/]")
                return
            table = Table(title=f"Top {limit} Authors")
            table.add_column("#", justify="right", width=4)
            table.add_column("Author", style="cyan")
            table.add_column("Organization")
            table.add_column("Drafts", justify="right", width=6)
            for position, (author_name, org, n_drafts, _) in enumerate(leaders, 1):
                table.add_row(str(position), author_name, org, str(n_drafts))
            console.print(table)
        else:
            # Draft given: list its authors with affiliations.
            entries = database.get_authors_for_draft(name)
            if not entries:
                console.print(f"[yellow]No author data for {name}. Run `ietf authors --fetch` first.[/]")
                return
            console.print(f"\n[bold]Authors of {name}:[/]")
            for entry in entries:
                console.print(f"  - {entry.name} ({entry.affiliation or 'no affiliation'})")
    finally:
        database.close()
@main.command()
@click.option("--top", "-n", default=20, help="Top N to show")
def network(top: int):
    """Show author collaboration network."""
    config = _get_config()
    database = Database(config)
    try:
        console.print("\n[bold]Top Organizations[/]")
        org_rows = database.top_orgs(limit=top)
        if not org_rows:
            # Nothing to show until author data has been fetched.
            console.print("[yellow]No author data. Run `ietf authors --fetch` first.[/]")
        else:
            summary = Table()
            summary.add_column("#", justify="right", width=4)
            summary.add_column("Organization", style="cyan")
            summary.add_column("Authors", justify="right", width=8)
            summary.add_column("Drafts", justify="right", width=6)
            for position, (org_name, n_authors, n_drafts) in enumerate(org_rows, 1):
                summary.add_row(str(position), org_name, str(n_authors), str(n_drafts))
            console.print(summary)
            # Second section: organization pairs that co-author drafts.
            console.print("\n[bold]Cross-Org Collaboration[/]")
            pairings = database.cross_org_collaborations(limit=top)
            if pairings:
                collab = Table()
                collab.add_column("Org A", style="cyan")
                collab.add_column("Org B", style="cyan")
                collab.add_column("Shared Drafts", justify="right", width=8)
                for first_org, second_org, n_shared in pairings:
                    collab.add_row(first_org, second_org, str(n_shared))
                console.print(collab)
    finally:
        database.close()
# ── ideas ───────────────────────────────────────────────────────────────
@main.group(invoke_without_command=True)
@click.option("--name", default=None, help="Extract ideas from a specific draft")
@click.option("--all", "extract_all", is_flag=True, help="Extract ideas from all drafts")
@click.option("--limit", "-n", default=50, help="Max drafts to extract (with --all)")
@click.option("--batch", "-b", default=5, help="Drafts per API call (default 5, set 1 for individual)")
@click.option("--cheap/--quality", default=True, help="Use Haiku (cheap) vs Sonnet (quality)")
@click.option("--reextract", is_flag=True, help="Clear existing ideas and re-extract with current prompt")
@click.option("--draft", "reextract_draft", default=None, help="Specific draft to re-extract (with --reextract)")
@click.pass_context
def ideas(ctx, name: str | None, extract_all: bool, limit: int, batch: int, cheap: bool,
          reextract: bool, reextract_draft: str | None):
    """Extract, score, and filter technical ideas from drafts."""
    # Click group with invoke_without_command: when a subcommand
    # (`ideas score` / `ideas filter`) was invoked, defer entirely to it.
    if ctx.invoked_subcommand is not None:
        return
    from .analyzer import Analyzer
    cfg = _get_config()
    db = Database(cfg)
    analyzer = Analyzer(cfg, db)
    try:
        if reextract:
            # Clear existing ideas, then re-extract
            # (scoped to one draft when --draft is given, otherwise all).
            deleted = db.delete_ideas(draft_name=reextract_draft)
            if reextract_draft:
                # Single-draft re-extraction: print the fresh ideas inline.
                console.print(f"Cleared [bold]{deleted}[/] ideas for {reextract_draft}")
                idea_list = analyzer.extract_ideas(reextract_draft, use_cache=True)
                if idea_list:
                    console.print(f"Re-extracted [bold green]{len(idea_list)}[/] ideas:")
                    for idea in idea_list:
                        console.print(f"  [{idea.get('type', '?')}] [bold]{idea['title']}[/]")
                        console.print(f"    {idea['description']}\n")
                else:
                    console.print("[red]Re-extraction failed or no ideas found[/]")
            else:
                # Bulk re-extraction over all drafts with the current prompt.
                console.print(f"Cleared [bold]{deleted}[/] ideas from all drafts")
                count = analyzer.extract_all_ideas(limit=limit, batch_size=batch, cheap=cheap)
                console.print(f"Re-extracted ideas from [bold green]{count}[/] drafts")
        elif extract_all:
            # First-time bulk extraction (no clearing).
            count = analyzer.extract_all_ideas(limit=limit, batch_size=batch, cheap=cheap)
            console.print(f"Extracted ideas from [bold green]{count}[/] drafts")
        elif name:
            # Extract and display ideas for a single named draft.
            idea_list = analyzer.extract_ideas(name)
            if idea_list:
                console.print(f"\n[bold]Ideas from {name}:[/]\n")
                for idea in idea_list:
                    console.print(f"  [{idea.get('type', '?')}] [bold]{idea['title']}[/]")
                    console.print(f"    {idea['description']}\n")
            else:
                console.print("[red]Extraction failed or no ideas found[/]")
        else:
            # No options and no subcommand: show usage hint.
            console.print("Use --name DRAFT, --all, or a subcommand: ideas score / ideas filter")
    finally:
        db.close()
@ideas.command("score")
@click.option("--cheap/--quality", default=True, help="Use Haiku (cheap) vs Sonnet (quality)")
@click.option("--batch", "-b", default=20, help="Ideas per API call (default 20)")
def ideas_score(cheap: bool, batch: int):
    """Score ideas for novelty (1=generic, 5=genuinely novel)."""
    from .analyzer import Analyzer
    cfg = _get_config()
    db = Database(cfg)
    analyzer = Analyzer(cfg, db)
    try:
        # Ask the model to score unscored ideas; stats summarizes the run.
        stats = analyzer.score_idea_novelty(batch_size=batch, cheap=cheap)
        if stats["scored_count"] == 0:
            # Nothing was scored this run — skip the distribution display.
            return
        # Show distribution table
        dist = db.idea_score_distribution()
        table = Table(title="Novelty Score Distribution")
        table.add_column("Score", style="bold", justify="center")
        table.add_column("Label", style="dim")
        table.add_column("Count", justify="right")
        table.add_column("Bar", min_width=30)
        # Human-readable meaning of each 1-5 novelty score.
        labels = {
            1: "Generic building block",
            2: "Obvious extension",
            3: "Useful but expected",
            4: "Interesting contribution",
            5: "Genuinely novel",
        }
        # Scale ASCII bars relative to the most common score (max 30 chars).
        max_count = max(dist.values()) if dist else 1
        for score in range(1, 6):
            count = dist.get(score, 0)
            bar_len = int(30 * count / max_count) if max_count > 0 else 0
            table.add_row(
                str(score), labels[score], str(count),
                "[green]" + "#" * bar_len + "[/]"
            )
        total = sum(dist.values())
        # Ideas that still have no score (e.g. added after this run started).
        unscored = db.idea_count() - total
        console.print(table)
        console.print(f"\nTotal scored: [bold]{total}[/] | Unscored: {unscored} | Avg: [bold]{stats['avg_score']:.1f}[/]")
    finally:
        db.close()
@ideas.command("filter")
@click.option("--min-score", "-m", default=2, help="Remove ideas below this score (default 2)")
@click.option("--dry-run/--execute", default=True, help="Preview (default) or actually delete")
def ideas_filter(min_score: int, dry_run: bool):
    """Filter out low-novelty ideas by score threshold."""
    config = _get_config()
    database = Database(config)
    try:
        doomed = database.ideas_below_score(min_score)
        if not doomed:
            console.print(f"No ideas with novelty_score < {min_score}.")
            return
        # Preview table of what would be (or will be) removed.
        mode_label = "DRY RUN" if dry_run else "WILL DELETE"
        preview = Table(title=f"Ideas with novelty_score < {min_score} ({mode_label})")
        preview.add_column("Score", style="bold", justify="center")
        preview.add_column("Idea", style="cyan", max_width=40)
        preview.add_column("Draft", max_width=50)
        preview.add_column("Description", max_width=60)
        for row in doomed[:50]:  # cap the preview at 50 rows
            desc = row["description"]
            ellipsis = "..." if len(desc) > 60 else ""
            preview.add_row(
                str(row["novelty_score"]),
                row["title"],
                row["draft_title"],
                desc[:60] + ellipsis,
            )
        console.print(preview)
        overflow = len(doomed) - 50
        if overflow > 0:
            console.print(f"  ... and {overflow} more")
        console.print(f"\nTotal to remove: [bold red]{len(doomed)}[/] / {database.idea_count()} ideas")
        if dry_run:
            console.print("[dim]Use --execute to actually delete.[/]")
        else:
            removed = database.delete_low_score_ideas(min_score)
            console.print(f"[bold red]Deleted {removed} low-novelty ideas.[/]")
            console.print(f"Remaining ideas: [bold green]{database.idea_count()}[/]")
    finally:
        database.close()
# ── dedup-ideas ─────────────────────────────────────────────────────────
@main.command("dedup-ideas")
@click.option("--threshold", "-t", default=0.85, type=float,
              help="Cosine similarity threshold for merging (default 0.85)")
@click.option("--dry-run/--execute", default=True,
              help="Preview merges (default) vs actually delete duplicates")
@click.option("--draft", "draft_name", default=None,
              help="Limit to a single draft name")
def dedup_ideas(threshold: float, dry_run: bool, draft_name: str | None):
    """Deduplicate similar ideas within each draft using embedding similarity."""
    from .analyzer import Analyzer
    cfg = _get_config()
    db = Database(cfg)
    analyzer = Analyzer(cfg, db)
    try:
        # Banner makes the destructive/non-destructive mode unmissable.
        mode = "[bold yellow]DRY RUN[/]" if dry_run else "[bold red]EXECUTE[/]"
        console.print(f"\n{mode} — Deduplicating ideas (threshold={threshold})")
        if draft_name:
            console.print(f"Limiting to draft: [bold]{draft_name}[/]")
        console.print()
        # Analyzer returns a summary dict: total_before/merged_count/total_after
        # plus per-merge "examples" (draft, keep, drop, similarity).
        result = analyzer.dedup_ideas(
            threshold=threshold, dry_run=dry_run, draft_name=draft_name
        )
        if result["examples"]:
            table = Table(title="Merge Candidates" if dry_run else "Merged Ideas")
            table.add_column("Draft", style="dim", max_width=40)
            table.add_column("Keep", style="green")
            table.add_column("Drop", style="red")
            table.add_column("Similarity", justify="right")
            for ex in result["examples"]:
                table.add_row(
                    # Show only the last path segment of the draft, truncated.
                    ex["draft"].split("/")[-1][:40],
                    ex["keep"],
                    ex["drop"],
                    f"{ex['similarity']:.3f}",
                )
            console.print(table)
            console.print()
        # Phrase the summary according to whether anything was actually deleted.
        action = "Would remove" if dry_run else "Removed"
        console.print(
            f"Ideas before: [bold]{result['total_before']}[/] | "
            f"{action}: [bold]{result['merged_count']}[/] | "
            f"After: [bold]{result['total_after']}[/]"
        )
        if dry_run and result["merged_count"] > 0:
            console.print(
                "\n[dim]Run with --execute to apply these merges.[/]"
            )
    finally:
        db.close()
# ── gaps ────────────────────────────────────────────────────────────────
@main.command()
@click.option("--refresh", is_flag=True, help="Re-run gap analysis even if cached")
def gaps(refresh: bool):
    """Identify gaps in the current draft landscape using Claude.

    Uses cached gaps from the DB unless --refresh is given, prints each
    gap with its severity, and writes the markdown gaps report.
    """
    from .analyzer import Analyzer
    from .reports import Reporter
    cfg = _get_config()
    db = Database(cfg)
    analyzer = Analyzer(cfg, db)
    reporter = Reporter(cfg, db)
    try:
        existing = db.all_gaps()
        if existing and not refresh:
            console.print(f"[bold]{len(existing)} gaps[/] already identified (use --refresh to re-run)\n")
        else:
            gap_list = analyzer.gap_analysis()
            console.print(f"\nIdentified [bold green]{len(gap_list)}[/] gaps\n")
            existing = gap_list
        # Fix: the old code indexed existing[0] unconditionally, raising
        # IndexError when the analysis produced no gaps. Only print inline
        # when the entries are dicts (fresh analysis results).
        printable = existing if existing and isinstance(existing[0], dict) else []
        for i, gap in enumerate(printable, 1):
            sev = gap.get("severity", "medium").upper()
            console.print(f"  [bold]{i}. {gap['topic']}[/] [{sev}]")
            console.print(f"     {gap['description'][:100]}\n")
        path = reporter.gaps_report()
        console.print(f"Report saved: [bold]{path}[/]")
    finally:
        db.close()
# ── refs ────────────────────────────────────────────────────────────────
@main.command()
@click.argument("name", required=False)
@click.option("--extract/--no-extract", default=False, help="Extract refs from all drafts with text")
@click.option("--top", "-n", default=30, help="Number of top-referenced items to show")
@click.option("--type", "ref_type", default="rfc", type=click.Choice(["rfc", "draft", "bcp"]),
              help="Reference type to show top results for")
def refs(name: str | None, extract: bool, top: int, ref_type: str):
    """Parse and show cross-references (RFCs, drafts, BCPs) in draft texts.

    With --extract, parses refs out of every draft's stored full text first.
    With NAME, lists that draft's refs; otherwise shows the most-referenced
    items of the chosen --type.
    """
    # Fix: removed unused `import re` — the regex work lives in _extract_refs,
    # which does its own import.
    cfg = _get_config()
    db = Database(cfg)
    try:
        if extract:
            # Only process drafts whose refs were never extracted.
            missing = db.drafts_without_refs()
            if not missing:
                console.print("[green]All drafts with text already have refs extracted.[/]")
            else:
                console.print(f"Extracting refs from [bold]{len(missing)}[/] drafts...")
                extracted = 0
                for draft_name in missing:
                    draft = db.get_draft(draft_name)
                    if not draft or not draft.full_text:
                        continue  # no stored text to parse
                    found_refs = _extract_refs(draft.full_text, draft.name)
                    if found_refs:
                        db.insert_refs(draft_name, found_refs)
                        extracted += 1
                console.print(f"Extracted refs from [bold green]{extracted}[/] drafts")
        if name:
            # Show refs for a specific draft
            draft_refs = db.get_refs_for_draft(name)
            if not draft_refs:
                console.print(f"[yellow]No refs found for {name}. Run `ietf refs --extract` first.[/]")
                return
            table = Table(title=f"References in {name}")
            table.add_column("Type", style="dim", width=6)
            table.add_column("Reference", style="cyan")
            for rt, rid in sorted(draft_refs):
                table.add_row(rt.upper(), rid)
            console.print(table)
        else:
            # Show top-referenced items
            stats = db.ref_stats()
            if stats["total_refs"] == 0:
                console.print("[yellow]No refs extracted yet. Run `ietf refs --extract` first.[/]")
                return
            console.print(f"\n[bold]Reference Stats[/]: {stats['drafts_with_refs']} drafts, "
                          f"{stats['total_refs']} total refs "
                          f"({stats['rfc_refs']} RFC, {stats['draft_refs']} draft, {stats['bcp_refs']} BCP)\n")
            top_items = db.top_referenced(ref_type=ref_type, limit=top)
            table = Table(title=f"Top {len(top_items)} Most-Referenced {ref_type.upper()}s")
            table.add_column("#", justify="right", width=4)
            table.add_column("Reference", style="cyan", width=30)
            table.add_column("Count", justify="right", width=6)
            table.add_column("Referenced By", max_width=60)
            for rank, (rid, cnt, drafts) in enumerate(top_items, 1):
                # RFC refs are bare numbers; prefix them for readability.
                label = f"RFC {rid}" if ref_type == "rfc" else rid
                draft_list = ", ".join(d.replace("draft-", "")[:25] for d in drafts[:4])
                if len(drafts) > 4:
                    draft_list += f" +{len(drafts) - 4}"
                table.add_row(str(rank), label, str(cnt), draft_list)
            console.print(table)
    finally:
        db.close()
def _extract_refs(text: str, self_name: str) -> list[tuple[str, str]]:
    """Extract RFC, draft, and BCP references from draft full text.

    Returns a deduplicated list of (ref_type, identifier) tuples where
    ref_type is "rfc", "bcp", or "draft". Self-references (identifiers
    equal to *self_name* after version-stripping) are excluded.
    """
    import re
    refs: set[tuple[str, str]] = set()
    # RFC references: RFC 1234, RFC1234, [RFC1234], [RFC 1234].
    # Fix: the old pattern \d{4,} required 4+ digits and so missed classic
    # 3-digit RFCs (768/791/793). Also normalize zero-padded citation
    # anchors like [RFC0793] to the bare number.
    for m in re.finditer(r'\[?RFC\s*(\d{1,5})\]?', text, re.IGNORECASE):
        refs.add(("rfc", m.group(1).lstrip("0") or "0"))
    # BCP references: BCP 14, BCP14, [BCP14]
    for m in re.finditer(r'\[?BCP\s*(\d+)\]?', text, re.IGNORECASE):
        refs.add(("bcp", m.group(1)))
    # Draft references: draft-ietf-something-name
    for m in re.finditer(r'(draft-[\w][\w-]+[\w])', text, re.IGNORECASE):
        draft_ref = m.group(1).lower()
        # Strip a trailing two-digit revision (draft-foo-bar-03 -> draft-foo-bar).
        # Fix: exactly two digits — the old \d{2,} also chopped legitimate
        # numeric name segments such as a trailing "-2024".
        draft_ref = re.sub(r'-\d{2}$', '', draft_ref)
        # Don't reference self
        if draft_ref != self_name:
            refs.add(("draft", draft_ref))
    return list(refs)
# ── trends ─────────────────────────────────────────────────────────────
@main.command()
@click.option("--category", "-c", help="Filter to a specific category")
@click.option("--json-out", is_flag=True, help="Also output JSON for visualization")
def trends(category: str | None, json_out: bool):
    """Show category trend analysis — monthly breakdown with growth rates.

    Aggregates rated drafts into per-month, per-category counts, prints a
    growth summary and monthly breakdown, and optionally dumps the rows
    as JSON for visualization.
    """
    import json as json_mod
    from collections import defaultdict
    cfg = _get_config()
    db = Database(cfg)
    try:
        pairs = db.drafts_with_ratings(limit=500)
        all_drafts = db.list_drafts(limit=500, order_by="time ASC")
        if not pairs:
            console.print("[yellow]No rated drafts. Run `ietf analyze --all` first.[/]")
            return
        # Build rating lookup keyed by draft name.
        rating_map = {draft.name: rating for draft, rating in pairs}
        # Collect monthly counts per category (month = YYYY-MM prefix).
        monthly: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
        all_cats: set[str] = set()
        for d in all_drafts:
            month = d.time[:7] if d.time else "unknown"
            r = rating_map.get(d.name)
            if r:
                for c in r.categories:
                    # Case-insensitive category filter when -c was given.
                    if category and c.lower() != category.lower():
                        continue
                    monthly[month][c] += 1
                    all_cats.add(c)
        if not all_cats:
            console.print(f"[yellow]No data for category '{category}'[/]" if category
                          else "[yellow]No category data found.[/]")
            return
        months = sorted(m for m in monthly.keys() if m != "unknown")
        cats = sorted(all_cats)
        # Compute cumulative totals and month-over-month growth per category.
        rows_data = []
        cumulative: dict[str, int] = defaultdict(int)
        prev_count: dict[str, int] = defaultdict(int)
        for month in months:
            for cat in cats:
                count = monthly[month].get(cat, 0)
                cumulative[cat] += count
                growth = 0.0
                if prev_count[cat] > 0:
                    growth = ((count - prev_count[cat]) / prev_count[cat]) * 100
                rows_data.append({
                    "month": month,
                    "category": cat,
                    "count": count,
                    "cumulative": cumulative[cat],
                    "growth_rate": growth,
                })
                prev_count[cat] = count
        # Display summary table
        console.print(f"\n[bold]Category Trends[/] — {len(months)} months, {len(cats)} categories\n")
        # Show per-category totals plus recent momentum (last 3 months vs the
        # 3 months before that).
        table = Table(title="Category Growth Summary")
        table.add_column("Category", style="cyan")
        table.add_column("Total", justify="right", width=6)
        table.add_column("Last 3mo", justify="right", width=8)
        table.add_column("Prev 3mo", justify="right", width=8)
        table.add_column("Growth", justify="right", width=8)
        recent_months = months[-3:] if len(months) >= 3 else months
        prev_months = months[-6:-3] if len(months) >= 6 else []
        for cat in cats:
            total = cumulative[cat]
            recent = sum(monthly[m].get(cat, 0) for m in recent_months)
            prev = sum(monthly[m].get(cat, 0) for m in prev_months) if prev_months else 0
            if prev > 0:
                growth_str = f"{((recent - prev) / prev) * 100:+.0f}%"
            elif recent > 0:
                growth_str = "new"  # activity with no prior-window baseline
            else:
                growth_str = "-"
            table.add_row(cat, str(total), str(recent), str(prev) if prev_months else "-", growth_str)
        console.print(table)
        # Monthly detail: one row per month, one column per category.
        console.print(f"\n[bold]Monthly Breakdown[/]\n")
        detail_table = Table()
        detail_table.add_column("Month", style="dim", width=8)
        for cat in cats:
            detail_table.add_column(cat[:14], justify="right", width=max(6, len(cat[:14])))
        detail_table.add_column("Total", justify="right", width=6, style="bold")
        for month in months:
            row = [month]
            total = 0
            for cat in cats:
                c = monthly[month].get(cat, 0)
                total += c
                row.append(str(c) if c else "")
            row.append(str(total))
            detail_table.add_row(*row)
        console.print(detail_table)
        # Optional JSON output
        if json_out:
            out_path = Path(cfg.data_dir) / "reports" / "trends.json"
            # Fix: ensure the reports directory exists before writing —
            # write_text raises FileNotFoundError on a fresh data dir.
            out_path.parent.mkdir(parents=True, exist_ok=True)
            out_path.write_text(json_mod.dumps(rows_data, indent=2))
            console.print(f"\nJSON saved: [bold]{out_path}[/]")
    finally:
        db.close()
# ── status ──────────────────────────────────────────────────────────────
@main.command()
@click.option("--wg", "-w", help="Filter to a specific WG")
def status(wg: str | None):
    """Show WG adoption status — which drafts have institutional backing.

    Splits drafts into WG-adopted / IRTF / individual, breaks adoption down
    by working group, and compares scores and gap coverage between adopted
    and individual drafts.
    """
    # Fix: removed unused `import json as json_mod` — nothing here emits JSON.
    from collections import defaultdict
    cfg = _get_config()
    db = Database(cfg)
    try:
        all_status = db.draft_adoption_status()
        total = len(all_status)
        # Fix: the percentage math below divided by zero on an empty database.
        if total == 0:
            console.print("[yellow]No drafts in database. Run `ietf fetch` first.[/]")
            return
        adopted = [s for s in all_status if s["wg_adopted"]]
        individual = [s for s in all_status if not s["wg_adopted"]]
        irtf = [s for s in all_status if s["stream"] == "irtf"]
        console.print(f"\n[bold]Draft Adoption Status[/]: {total} total drafts\n")
        console.print(f"  WG-adopted (draft-ietf-*): [bold green]{len(adopted)}[/] ({len(adopted)/total*100:.1f}%)")
        console.print(f"  IRTF (draft-irtf-*): [bold blue]{len(irtf)}[/]")
        console.print(f"  Individual: [bold]{len(individual)}[/] ({len(individual)/total*100:.1f}%)\n")
        # WG breakdown: group adopted drafts by working-group name.
        wg_groups: dict[str, list[dict]] = defaultdict(list)
        for s in adopted:
            wg_groups[s["wg_name"]].append(s)
        if wg:
            # Show drafts for a specific WG
            wg_drafts = wg_groups.get(wg, [])
            if not wg_drafts:
                console.print(f"[yellow]No WG-adopted drafts for '{wg}'[/]")
                return
            table = Table(title=f"WG '{wg}' Drafts ({len(wg_drafts)})")
            table.add_column("Date", style="dim", width=10)
            table.add_column("Name", style="cyan")
            table.add_column("Title", max_width=50)
            for s in sorted(wg_drafts, key=lambda x: x["time"] or ""):
                table.add_row(s["time"][:10] if s["time"] else "", s["name"], s["title"][:50])
            console.print(table)
        else:
            # Show WG summary sorted by draft count, busiest first.
            table = Table(title=f"Working Groups with AI/Agent Drafts ({len(wg_groups)} WGs)")
            table.add_column("#", justify="right", width=4)
            table.add_column("WG", style="cyan", width=12)
            table.add_column("Drafts", justify="right", width=6)
            table.add_column("Draft Names", max_width=60)
            for rank, (wg_name, drafts) in enumerate(
                sorted(wg_groups.items(), key=lambda x: -len(x[1])), 1
            ):
                draft_list = ", ".join(d["name"].replace("draft-ietf-", "")[:30] for d in drafts[:4])
                if len(drafts) > 4:
                    draft_list += f" +{len(drafts) - 4}"
                table.add_row(str(rank), wg_name, str(len(drafts)), draft_list)
            console.print(table)
            # Score comparison: WG-adopted vs individual composite scores.
            pairs = db.drafts_with_ratings(limit=500)
            if pairs:
                adopted_names = {s["name"] for s in adopted}
                adopted_scores = [r.composite_score for d, r in pairs if d.name in adopted_names]
                individual_scores = [r.composite_score for d, r in pairs if d.name not in adopted_names]
                if adopted_scores and individual_scores:
                    console.print(f"\n[bold]Score Comparison[/]:")
                    avg_adopted = sum(adopted_scores) / len(adopted_scores)
                    avg_individual = sum(individual_scores) / len(individual_scores)
                    console.print(f"  WG-adopted avg score: [bold green]{avg_adopted:.2f}[/] ({len(adopted_scores)} rated)")
                    console.print(f"  Individual avg score: [bold]{avg_individual:.2f}[/] ({len(individual_scores)} rated)")
                # Check whether identified gap categories have WG-adopted work.
                gaps = db.all_gaps()
                if gaps:
                    gap_cats = {g["category"].lower() for g in gaps}
                    adopted_cats: set[str] = set()
                    for d, r in pairs:
                        if d.name in adopted_names:
                            for c in r.categories:
                                adopted_cats.add(c.lower())
                    covered = gap_cats & adopted_cats
                    uncovered = gap_cats - adopted_cats
                    console.print(f"\n[bold]Gap Coverage by WG-Adopted Work[/]:")
                    console.print(f"  Gap categories with WG backing: {len(covered)}")
                    if covered:
                        console.print(f"    {', '.join(sorted(covered))}")
                    console.print(f"  Gap categories without WG backing: {len(uncovered)}")
                    if uncovered:
                        console.print(f"    [yellow]{', '.join(sorted(uncovered))}[/]")
    finally:
        db.close()
# ── revisions ──────────────────────────────────────────────────────────
@main.command()
@click.option("--org", "-o", help="Filter to a specific organization")
@click.option("--top", "-n", default=20, help="Number of orgs to show")
def revisions(org: str | None, top: int):
    """Analyze draft revision velocity — who iterates vs fire-and-forget.

    Prints overall revision stats, then either a per-draft listing for one
    organization (--org) or a per-org summary table, and writes the
    markdown revisions report.
    """
    from collections import defaultdict
    from .orgs import normalize_org
    cfg = _get_config()
    db = Database(cfg)
    try:
        all_revs = db.revision_velocity()
        total = len(all_revs)
        # Fix: the percentage math below divided by zero on an empty database.
        if total == 0:
            console.print("[yellow]No drafts in database. Run `ietf fetch` first.[/]")
            return
        at_00 = sum(1 for r in all_revs if r["rev_int"] == 0)
        avg_rev = sum(r["rev_int"] for r in all_revs) / total
        console.print(f"\n[bold]Draft Revision Velocity[/]: {total} drafts\n")
        console.print(f"  Average revision: [bold]{avg_rev:.2f}[/]")
        console.print(f"  At -00 (first draft): [bold]{at_00}[/] ({at_00/total*100:.1f}%)")
        console.print(f"  Iterated (rev >= 01): [bold]{total - at_00}[/] ({(total-at_00)/total*100:.1f}%)")
        console.print(f"  Highly iterated (rev >= 05): [bold]{sum(1 for r in all_revs if r['rev_int'] >= 5)}[/]\n")
        # Get per-org stats using normalized org names
        aff_rows = db.conn.execute(
            "SELECT da.draft_name, a.affiliation FROM draft_authors da "
            "JOIN authors a ON da.person_id = a.person_id "
            "WHERE a.affiliation != ''"
        ).fetchall()
        # Map draft -> rev
        draft_rev = {r["name"]: r["rev_int"] for r in all_revs}
        # Group drafts by normalized org (deduped via set).
        org_drafts: dict[str, set[str]] = defaultdict(set)
        for row in aff_rows:
            norm = normalize_org(row["affiliation"])
            if norm:
                org_drafts[norm].add(row["draft_name"])
        if org:
            # Show drafts for a specific org, highest revision first.
            drafts = org_drafts.get(org, set())
            if not drafts:
                console.print(f"[yellow]No drafts for '{org}'[/]")
                return
            table = Table(title=f"'{org}' Drafts by Revision ({len(drafts)})")
            table.add_column("Rev", justify="right", width=4)
            table.add_column("Name", style="cyan", max_width=50)
            table.add_column("Title", max_width=40)
            draft_details = [(d, draft_rev.get(d, 0)) for d in drafts]
            for name, rev in sorted(draft_details, key=lambda x: -x[1]):
                title_row = next((r["title"] for r in all_revs if r["name"] == name), "")
                table.add_row(f"-{rev:02d}", name, title_row[:40])
            console.print(table)
        else:
            # Show org summary; require >= 3 drafts to keep stats meaningful.
            org_stats = []
            for org_name, drafts in org_drafts.items():
                if len(drafts) < 3:
                    continue
                revs = [draft_rev.get(d, 0) for d in drafts]
                n_00 = sum(1 for r in revs if r == 0)
                org_stats.append({
                    "org": org_name,
                    "drafts": len(drafts),
                    "avg_rev": sum(revs) / len(revs),
                    "at_00": n_00,
                    "pct_00": n_00 / len(drafts) * 100,
                    "max_rev": max(revs),
                })
            org_stats.sort(key=lambda x: -x["drafts"])
            table = Table(title=f"Revision Velocity by Organization (>= 3 drafts, top {top})")
            table.add_column("#", justify="right", width=4)
            table.add_column("Organization", style="cyan", width=28)
            table.add_column("Drafts", justify="right", width=6)
            table.add_column("Avg Rev", justify="right", width=8)
            table.add_column("At -00", justify="right", width=6)
            table.add_column("%-00", justify="right", width=6)
            table.add_column("Max", justify="right", width=4)
            for rank, s in enumerate(org_stats[:top], 1):
                table.add_row(
                    str(rank), s["org"][:28], str(s["drafts"]),
                    f"{s['avg_rev']:.2f}", str(s["at_00"]),
                    f"{s['pct_00']:.0f}%", str(s["max_rev"]),
                )
            console.print(table)
            # Highlight the fire-and-forget vs iterators narrative
            high_00 = [s for s in org_stats if s["pct_00"] >= 70 and s["drafts"] >= 5]
            iterators = [s for s in org_stats if s["avg_rev"] >= 3.0 and s["drafts"] >= 3]
            if high_00:
                console.print("\n[bold]Fire-and-Forget[/] (>= 70% at -00, >= 5 drafts):")
                for s in high_00:
                    console.print(f"  {s['org']}: {s['at_00']}/{s['drafts']} at -00 ({s['pct_00']:.0f}%)")
            if iterators:
                console.print("\n[bold]Active Iterators[/] (avg revision >= 3.0):")
                for s in iterators:
                    console.print(f"  {s['org']}: avg rev {s['avg_rev']:.1f}, max -{s['max_rev']:02d}")
        # Generate report
        from .reports import Reporter
        reporter = Reporter(cfg, db)
        path = reporter.revisions_report()
        console.print(f"\nReport saved: [bold]{path}[/]")
    finally:
        db.close()
# ── idea-overlap ────────────────────────────────────────────────────────
@main.command("idea-overlap")
@click.option("--threshold", "-t", default=0.75, help="Title similarity threshold (0-1)")
@click.option("--limit", "-n", default=50, help="Max results to show")
def idea_overlap(threshold: float, limit: int):
    """Find ideas that appear across multiple organizations."""
    from collections import defaultdict
    from difflib import SequenceMatcher
    from .orgs import normalize_org
    cfg = _get_config()
    db = Database(cfg)
    try:
        all_ideas = db.all_ideas()
        if not all_ideas:
            console.print("[yellow]No ideas extracted yet. Run `ietf ideas --all` first.[/]")
            return
        # Build draft -> org mapping (normalized; "Independent" excluded so
        # unaffiliated authors don't count as an organization).
        draft_orgs: dict[str, set[str]] = defaultdict(set)
        rows = db.conn.execute(
            """SELECT da.draft_name, a.affiliation
               FROM draft_authors da
               JOIN authors a ON da.person_id = a.person_id
               WHERE a.affiliation != ''"""
        ).fetchall()
        for r in rows:
            org = normalize_org(r["affiliation"])
            if org and org != "Independent":
                draft_orgs[r["draft_name"]].add(org)
        # Group similar ideas (same logic as ideas_report but tracking orgs).
        # Greedy first-match grouping: each idea joins the first existing
        # group whose canonical title is similar enough, else starts one.
        idea_groups: list[dict] = []
        for idea in all_ideas:
            title_lower = idea["title"].lower().strip()
            matched = False
            for group in idea_groups:
                ratio = SequenceMatcher(None, title_lower, group["canonical"]).ratio()
                if ratio >= threshold:
                    group["ideas"].append(idea)
                    group["drafts"].add(idea["draft_name"])
                    group["orgs"].update(draft_orgs.get(idea["draft_name"], set()))
                    matched = True
                    break
            if not matched:
                idea_groups.append({
                    "canonical": title_lower,
                    "title": idea["title"],
                    "ideas": [idea],
                    "drafts": {idea["draft_name"]},
                    "orgs": set(draft_orgs.get(idea["draft_name"], set())),
                })
        # Filter to cross-org ideas (2+ orgs), most widely shared first.
        cross_org = [g for g in idea_groups if len(g["orgs"]) >= 2]
        cross_org.sort(key=lambda g: (-len(g["orgs"]), -len(g["drafts"])))
        console.print(f"\n[bold]Cross-Organization Idea Overlap[/]")
        console.print(f"{len(all_ideas)} ideas, {len(idea_groups)} unique, "
                      f"[bold green]{len(cross_org)}[/] appear across 2+ orgs\n")
        if not cross_org:
            console.print("[yellow]No cross-org idea overlap found at this threshold.[/]")
            return
        table = Table(title=f"Ideas Shared Across Organizations (top {min(limit, len(cross_org))})")
        table.add_column("#", justify="right", width=4)
        table.add_column("Idea", style="bold", max_width=40)
        table.add_column("Orgs", justify="right", width=5)
        table.add_column("Drafts", justify="right", width=6)
        table.add_column("Organizations", max_width=50)
        for rank, g in enumerate(cross_org[:limit], 1):
            # Show at most 5 org names per row, with a "+N" overflow marker.
            org_list = ", ".join(sorted(g["orgs"])[:5])
            if len(g["orgs"]) > 5:
                org_list += f" +{len(g['orgs']) - 5}"
            table.add_row(
                str(rank), g["title"][:40], str(len(g["orgs"])),
                str(len(g["drafts"])), org_list,
            )
        console.print(table)
        # Also generate the report
        from .reports import Reporter
        reporter = Reporter(cfg, db)
        path = reporter.idea_overlap_report()
        console.print(f"\nReport saved: [bold]{path}[/]")
    finally:
        db.close()
# ── co-occurrence ──────────────────────────────────────────────────────
@main.command("co-occurrence")
def co_occurrence():
    """Category co-occurrence matrix — which categories appear together.

    Counts pairwise category co-assignments across rated drafts, prints the
    top pairs, highlights AI-safety pairings, and writes the markdown report.
    """
    from collections import defaultdict
    cfg = _get_config()
    db = Database(cfg)
    try:
        pairs = db.drafts_with_ratings(limit=500)
        total = len(pairs)
        # Fix: the percentage math below divided by zero with no rated drafts.
        if total == 0:
            console.print("[yellow]No rated drafts. Run `ietf analyze --all` first.[/]")
            return
        multi_cat = sum(1 for d, r in pairs if len(r.categories) > 1)
        console.print(f"\n[bold]Category Co-occurrence Analysis[/]: {total} drafts\n")
        console.print(f"  Multi-category drafts: [bold]{multi_cat}[/] ({multi_cat/total*100:.1f}%)\n")
        # Build counts: per-category totals and unordered pair co-occurrences.
        cat_counts: dict[str, int] = defaultdict(int)
        cooccur: dict[tuple[str, str], int] = defaultdict(int)
        for d, r in pairs:
            for c in r.categories:
                cat_counts[c] += 1
            for i, c1 in enumerate(r.categories):
                for c2 in r.categories[i + 1:]:
                    # Sort so (A, B) and (B, A) count as the same pair.
                    key = tuple(sorted([c1, c2]))
                    cooccur[key] += 1
        # Top co-occurrences
        table = Table(title="Top 15 Category Co-occurrences")
        table.add_column("#", justify="right", width=4)
        table.add_column("Category A", style="cyan", width=22)
        table.add_column("Category B", style="cyan", width=22)
        table.add_column("Count", justify="right", width=6)
        top_pairs = sorted(cooccur.items(), key=lambda x: -x[1])[:15]
        for rank, ((c1, c2), n) in enumerate(top_pairs, 1):
            table.add_row(str(rank), c1, c2, str(n))
        console.print(table)
        # AI safety isolation check: which categories pair with safety work.
        safety_cooccur = {k: v for k, v in cooccur.items() if "AI safety/alignment" in k}
        if safety_cooccur:
            console.print("\n[bold]AI Safety/Alignment Co-occurrences[/]:")
            for (c1, c2), n in sorted(safety_cooccur.items(), key=lambda x: -x[1]):
                other = c2 if c1 == "AI safety/alignment" else c1
                console.print(f"  {n:>3d} + {other}")
        # Generate report
        from .reports import Reporter
        reporter = Reporter(cfg, db)
        path = reporter.co_occurrence_report()
        console.print(f"\nReport saved: [bold]{path}[/]")
    finally:
        db.close()
# ── centrality ─────────────────────────────────────────────────────────
@main.command()
@click.option("--top", "-n", default=20, help="Number of results to show")
def centrality(top: int):
    """Author network centrality — bridge-builders and key connectors."""
    import networkx as nx
    from .orgs import normalize_org
    cfg = _get_config()
    db = Database(cfg)
    try:
        # Build the undirected co-authorship graph: one edge per author pair
        # that shares at least one draft, weighted by the shared-draft count.
        # The `p1 < p2` condition avoids self-pairs and duplicate (a,b)/(b,a) rows.
        rows = db.conn.execute(
            """SELECT da1.person_id as p1, da2.person_id as p2, COUNT(*) as shared
               FROM draft_authors da1
               JOIN draft_authors da2 ON da1.draft_name = da2.draft_name
                   AND da1.person_id < da2.person_id
               GROUP BY da1.person_id, da2.person_id"""
        ).fetchall()
        G = nx.Graph()
        for r in rows:
            G.add_edge(r[0], r[1], weight=r[2])
        persons = db.conn.execute(
            "SELECT person_id, name, affiliation FROM authors"
        ).fetchall()
        # person_id -> (name, normalized organization); empty org = unknown.
        person_info = {r[0]: (r[1], normalize_org(r[2])) for r in persons}
        console.print(f"\n[bold]Author Network Analysis[/]: {G.number_of_nodes()} authors, {G.number_of_edges()} co-authorship edges\n")
        # Normalized org names treated as the "Chinese bloc" for the
        # cross-divide analysis below.
        chinese_orgs = {
            "Huawei", "China Mobile", "China Telecom", "China Unicom",
            "ZTE Corporation", "Tsinghua University", "BUPT",
            "Pengcheng Laboratory", "CAICT", "AsiaInfo",
            "Zhongguancun Laboratory", "CNIC, CAS",
            "Tsinghua Shenzhen International Graduate School & Pengcheng Laboratory",
            "Huazhong University of Science and Technology",
        }

        def bloc_neighbors(pid) -> tuple[int, int]:
            """Count (chinese, western) neighbors of pid; unknown orgs skipped."""
            cn = west = 0
            for nb in G.neighbors(pid):
                org = person_info.get(nb, ("", ""))[1]
                if not org:
                    continue
                if org in chinese_orgs:
                    cn += 1
                else:
                    west += 1
            return cn, west

        # Classify each edge by whether both endpoints share an organization;
        # edges with an unknown org on either side are excluded entirely.
        cross_org = intra_org = cross_divide = 0
        for u, v in G.edges():
            _, org_u = person_info.get(u, ("?", ""))
            _, org_v = person_info.get(v, ("?", ""))
            if org_u and org_v:
                if org_u == org_v:
                    intra_org += 1
                else:
                    cross_org += 1
                    if (org_u in chinese_orgs) != (org_v in chinese_orgs):
                        cross_divide += 1
        total_edges = cross_org + intra_org
        # Guard: percentages are undefined (ZeroDivisionError) when no edge
        # has org data on both ends.
        if total_edges:
            console.print(f"  Intra-org edges: [bold]{intra_org}[/] ({intra_org/total_edges*100:.1f}%)")
            console.print(f"  Cross-org edges: [bold]{cross_org}[/] ({cross_org/total_edges*100:.1f}%)")
        console.print(f"  Cross Chinese-Western edges: [bold]{cross_divide}[/]")
        # nx.average_clustering raises ZeroDivisionError on an empty graph,
        # and max() raises on an empty component list — guard both.
        avg_clustering = nx.average_clustering(G) if G.number_of_nodes() else 0.0
        components = list(nx.connected_components(G))
        largest = len(max(components, key=len)) if components else 0
        console.print(f"  Clustering coefficient: [bold]{avg_clustering:.3f}[/]")
        console.print(f"  Connected components: [bold]{len(components)}[/], largest: {largest}\n")
        # Betweenness centrality: who sits on the most shortest paths.
        bc = nx.betweenness_centrality(G)
        table = Table(title=f"Top {top} Authors by Betweenness Centrality")
        table.add_column("#", justify="right", width=4)
        table.add_column("Author", style="bold", width=28)
        table.add_column("Organization", style="cyan", width=20)
        table.add_column("BC Score", justify="right", width=8)
        table.add_column("Degree", justify="right", width=6)
        table.add_column("CN/West", justify="center", width=8)
        top_bc = sorted(bc.items(), key=lambda x: -x[1])[:top]
        for rank, (pid, score) in enumerate(top_bc, 1):
            name, org = person_info.get(pid, ("?", "?"))
            degree = G.degree(pid)
            cn, west = bloc_neighbors(pid)
            table.add_row(str(rank), name[:28], org[:20], f"{score:.4f}", str(degree), f"{cn}/{west}")
        console.print(table)
        # Bridge-builders: authors with co-authors in both blocs, ranked by BC.
        bridges = []
        for pid in G.nodes():
            name, org = person_info.get(pid, ("?", ""))
            cn, west = bloc_neighbors(pid)
            if cn > 0 and west > 0:
                bridges.append((pid, name, org, bc.get(pid, 0), cn, west))
        bridges.sort(key=lambda x: -x[3])
        console.print(f"\n[bold]Cross-Divide Bridge-Builders[/] ({len(bridges)} people with neighbors in both blocs):\n")
        for pid, name, org, bc_score, cn, west in bridges[:10]:
            console.print(f"  [bold]{name}[/] ({org}): BC={bc_score:.4f}, CN neighbors={cn}, Western={west}")
        # Persist the full analysis as a report file.
        from .reports import Reporter
        reporter = Reporter(cfg, db)
        path = reporter.centrality_report()
        console.print(f"\nReport saved: [bold]{path}[/]")
    finally:
        db.close()
# ── draft-gen ───────────────────────────────────────────────────────────
@main.command("draft-gen")
@click.argument("gap_topic")
@click.option("--output", "-o", help="Output file path")
def draft_gen(gap_topic: str, output: str | None):
    """Generate an Internet-Draft addressing a landscape gap."""
    from .analyzer import Analyzer
    from .draftgen import DraftGenerator
    cfg = _get_config()
    db = Database(cfg)
    generator = DraftGenerator(cfg, db, Analyzer(cfg, db))
    try:
        # Default destination lives under the configured reports directory.
        if output:
            destination = output
        else:
            destination = str(Path(cfg.data_dir) / "reports" / "generated-draft.txt")
        console.print(f"Generating Internet-Draft on: [bold]{gap_topic}[/]")
        saved = generator.generate(gap_topic, output_path=destination)
        console.print(f"\nDraft saved: [bold green]{saved}[/]")
    finally:
        db.close()
# ── config ───────────────────────────────────────────────────────────────────
@main.command("config")
@click.option("--set", "set_key", nargs=2, help="Set a config key (e.g. --set claude_model claude-opus-4-20250514)")
@click.option("--show", is_flag=True, help="Show effective config with env var sources noted")
def config_cmd(set_key: tuple[str, str] | None, show: bool):
    """Show or modify configuration."""
    from dataclasses import asdict
    cfg = _get_config()
    if set_key:
        key, value = set_key
        if hasattr(cfg, key):
            # Coerce the CLI string to the type of the key's current value.
            current = getattr(cfg, key)
            if isinstance(current, bool):
                # Check bool BEFORE int/float: bool is a subclass of int, so
                # the int branch would run int("true") and raise ValueError.
                value = value.strip().lower() in ("1", "true", "yes", "on")
            elif isinstance(current, float):
                value = float(value)
            elif isinstance(current, int):
                value = int(value)
            elif isinstance(current, list):
                import json
                value = json.loads(value)
            setattr(cfg, key, value)
            cfg.save()
            console.print(f"Set [bold]{key}[/] = {value}")
        else:
            console.print(f"[red]Unknown config key: {key}[/]")
    else:
        # Showing is the default action (with or without --show): print every
        # key with its effective value, flagging env-var overrides.
        env_sources = cfg.env_sources()
        for key, val in asdict(cfg).items():
            source_note = ""
            if key in env_sources:
                source_note = f" [yellow](from ${env_sources[key]})[/]"
            console.print(f"  [bold]{key}:[/] {val}{source_note}")
        if env_sources:
            console.print(f"\n  [dim]({len(env_sources)} value(s) overridden by environment variables)[/]")
        # The API key is read from the environment, never from the config file.
        import os
        if os.environ.get("ANTHROPIC_API_KEY"):
            console.print("  [dim]ANTHROPIC_API_KEY is set in environment[/]")
        else:
            console.print("  [dim]ANTHROPIC_API_KEY is NOT set in environment[/]")
# ── pipeline ────────────────────────────────────────────────────────────────
@main.group()
def pipeline():
    """Gap-to-Draft generation pipeline."""
    # Click group container only — subcommands attach via @pipeline.command().
    pass
@pipeline.command("context")
@click.argument("gap_topic")
def pipeline_context(gap_topic: str):
    """Preview assembled context for a gap topic (dry run)."""
    from .pipeline import ContextBuilder
    cfg = _get_config()
    db = Database(cfg)
    try:
        # Assemble the same context bundle the generator would receive.
        bundle = ContextBuilder(cfg, db).build_context(gap_topic)
        console.print(f"\n[bold]Context for gap: {gap_topic}[/]\n")
        gap_info = bundle.get("gap")
        if gap_info:
            console.print(f"[cyan]Gap:[/] {gap_info.get('topic', '?')}")
            console.print(f"  {gap_info.get('description', '')[:200]}")
            console.print(f"  Severity: {gap_info.get('severity', '?')}")
        convergent = bundle.get("ideas", [])
        console.print(f"\n[cyan]Convergent ideas:[/] {len(convergent)}")
        for item in convergent[:10]:
            console.print(f"  - {item.get('title', '?')}: {item.get('description', '')[:80]}")
        foundations = bundle.get("rfc_foundations", [])
        console.print(f"\n[cyan]RFC foundations:[/] {len(foundations)}")
        for ref_id, count in foundations[:10]:
            console.print(f"  - RFC {ref_id} (cited by {count} drafts)")
        neighbors = bundle.get("similar_drafts", [])
        console.print(f"\n[cyan]Similar existing drafts:[/] {len(neighbors)}")
        for name, score in neighbors[:8]:
            console.print(f"  - {name} (similarity: {score:.3f})")
        best = bundle.get("top_rated", [])
        console.print(f"\n[cyan]Top-rated in category:[/] {len(best)}")
        wg_entries = bundle.get("wg_context", [])
        adopted = [entry for entry in wg_entries if entry.get("wg_adopted")]
        console.print(f"\n[cyan]WG context:[/] {len(adopted)} WG-adopted drafts")
        vision = bundle.get("ecosystem_vision", "")
        if vision:
            console.print(f"\n[cyan]Ecosystem vision:[/] {len(vision)} chars loaded")
    finally:
        db.close()
@pipeline.command("generate")
@click.argument("gap_topic")
@click.option("--cheap/--quality", default=False, help="Use Haiku (cheap) or Sonnet (quality)")
@click.option("--dry-run", is_flag=True, help="Show outline only, don't generate sections")
@click.option("--family", "family_name", default="", help="Family name for multi-draft generation")
def pipeline_generate(gap_topic: str, cheap: bool, dry_run: bool, family_name: str):
    """Generate a single draft from a gap topic."""
    # NOTE(review): the --family option is accepted but never used in this
    # body — it is not forwarded to generate_full(). Confirm whether it
    # should be passed through or the option removed.
    from .analyzer import Analyzer
    from .pipeline import PipelineGenerator, ContextBuilder
    cfg = _get_config()
    db = Database(cfg)
    analyzer = Analyzer(cfg, db)
    try:
        builder = ContextBuilder(cfg, db)
        generator = PipelineGenerator(cfg, db, analyzer)
        # Build context and show the outline preview before committing to
        # full (costly) section generation.
        ctx = builder.build_context(gap_topic)
        console.print(f"[bold]Generating draft for gap: {gap_topic}[/]")
        outline = generator.generate_outline(ctx, cheap=cheap)
        console.print(f"  Title: [cyan]{outline.get('title', '?')}[/]")
        console.print(f"  Sections: {len(outline.get('sections', []))}")
        console.print(f"  Target WG: {outline.get('target_wg', '?')}")
        if dry_run:
            import json
            console.print("\n[bold]Outline (dry run):[/]")
            console.print(json.dumps(outline, indent=2))
            return
        # generate_full is called with the topic, not the prebuilt ctx.
        result = generator.generate_full(gap_topic, cheap=cheap)
        console.print(f"\n[bold green]Draft generated![/]")
        console.print(f"  ID: {result.get('id', '?')}")
        console.print(f"  Draft name: {result.get('draft_name', '?')}")
        # Export text file
        output_dir = Path(cfg.data_dir) / "reports" / "generated-drafts"
        output_dir.mkdir(parents=True, exist_ok=True)
        draft_name = result.get("draft_name", "draft-unknown")
        out_path = output_dir / f"{draft_name}.txt"
        if result.get("full_text"):
            out_path.write_text(result["full_text"])
            console.print(f"  Saved: {out_path}")
    finally:
        db.close()
@pipeline.command("family")
@click.option("--name", "family_name", default="agent-ecosystem", help="Family name")
@click.option("--cheap/--quality", default=False, help="Use Haiku (cheap) or Sonnet (quality)")
def pipeline_family(family_name: str, cheap: bool):
    """Generate the full 5-draft ecosystem family."""
    from .analyzer import Analyzer
    from .pipeline import FamilyCoordinator
    cfg = _get_config()
    db = Database(cfg)
    analyzer = Analyzer(cfg, db)
    try:
        coordinator = FamilyCoordinator(cfg, db, analyzer)
        console.print(f"[bold]Generating draft family: {family_name}[/]\n")
        results = coordinator.generate_family(family_name=family_name, cheap=cheap)
        console.print(f"\n[bold green]Generated {len(results)} drafts![/]")
        # Export each generated draft as a plain-text I-D.
        output_dir = Path(cfg.data_dir) / "reports" / "generated-drafts"
        output_dir.mkdir(parents=True, exist_ok=True)
        for r in results:
            draft_name = r.get("draft_name", "draft-unknown")
            if r.get("full_text"):
                out_path = output_dir / f"{draft_name}.txt"
                # Explicit UTF-8 so output doesn't depend on the platform's
                # locale encoding (matches the top-level export command).
                out_path.write_text(r["full_text"], encoding="utf-8")
                # Print inside the guard: previously this referenced out_path
                # even when a result had no full_text (NameError/stale path).
                console.print(f"  [green]{r.get('family_role', '?')}[/] → {out_path}")
        # Family summary
        summary_path = output_dir / "family-summary.md"
        lines = [f"# Draft Family: {family_name}\n"]
        for r in results:
            lines.append(f"## {r.get('family_role', '?')}: {r.get('title', '?')}")
            lines.append(f"- Draft: `{r.get('draft_name', '?')}`")
            lines.append(f"- Gap: {r.get('gap_topic', '?')}")
            lines.append(f"- Sections: {len(r.get('sections', []))}")
            lines.append("")
        summary_path.write_text("\n".join(lines), encoding="utf-8")
        console.print(f"\n  Summary: {summary_path}")
        # Cross-draft consistency check over the whole family.
        consistency = coordinator.check_consistency(family_name)
        if consistency.get("issues"):
            console.print(f"\n[yellow]Consistency issues:[/]")
            for issue in consistency["issues"]:
                console.print(f"  - {issue}")
        else:
            console.print(f"\n[green]No consistency issues found[/]")
    finally:
        db.close()
@pipeline.command("quality")
@click.argument("draft_id", type=int)
def pipeline_quality(draft_id: int):
    """Run quality gates on a generated draft."""
    from .analyzer import Analyzer
    from .pipeline import QualityGates
    cfg = _get_config()
    db = Database(cfg)
    try:
        gates = QualityGates(cfg, db, Analyzer(cfg, db))
        console.print(f"[bold]Running quality gates on draft #{draft_id}[/]\n")
        # One PASS/FAIL line per gate, with an optional numeric score.
        outcomes = gates.run_all(draft_id)
        for gate_name, outcome in outcomes.items():
            verdict = "[green]PASS[/]" if outcome["passed"] else "[red]FAIL[/]"
            console.print(f"  {verdict} {gate_name}: {outcome.get('details', '')[:100]}")
            if "score" in outcome:
                console.print(f"       Score: {outcome['score']:.2f}")
    finally:
        db.close()
@pipeline.command("status")
def pipeline_status():
    """Show pipeline health: processing stages, generated drafts, and API cost."""
    cfg = _get_config()
    db = Database(cfg)
    try:
        # Pipeline health overview — stage counts are computed by listing
        # with a high limit, so they are capped at 10000.
        total = db.count_drafts()
        rated_count = len(db.drafts_with_ratings(limit=10000))
        unrated = len(db.unrated_drafts(limit=10000))
        unembedded = len(db.drafts_without_embeddings(limit=10000))
        embedded_count = total - unembedded
        no_ideas = len(db.drafts_without_ideas(limit=10000))
        ideas_count = total - no_ideas
        idea_total = db.idea_count()
        gap_count = len(db.all_gaps())
        input_tok, output_tok = db.total_tokens_used()
        # Cost estimate assumes $3/M input and $15/M output tokens —
        # NOTE(review): hardcoded rate; confirm it matches the configured model.
        est_cost = (input_tok * 3.0 / 1_000_000) + (output_tok * 15.0 / 1_000_000)
        # Last update
        snapshots = db.get_snapshots(limit=1)
        last_update = snapshots[0]["snapshot_at"][:19] if snapshots else "never"
        console.print("\n[bold]Pipeline Status[/]\n")
        console.print(f"  Total documents: [bold]{total}[/]")
        console.print(f"  Last update: {last_update}")
        console.print()
        # Stage table
        stage_table = Table(title="Processing Stages")
        stage_table.add_column("Stage", width=20)
        stage_table.add_column("Done", justify="right", width=8)
        stage_table.add_column("Missing", justify="right", width=8)
        stage_table.add_column("Progress", width=20)
        def bar(done: int, total_n: int) -> str:
            # 20-char progress bar; guard against division by zero when empty.
            pct = int(done / total_n * 100) if total_n > 0 else 0
            filled = pct // 5
            return f"[green]{'#' * filled}[/][dim]{'.' * (20 - filled)}[/] {pct}%"
        stage_table.add_row("Rated", str(rated_count), str(unrated), bar(rated_count, total))
        stage_table.add_row("Embedded", str(embedded_count), str(unembedded), bar(embedded_count, total))
        stage_table.add_row("Ideas extracted", str(ideas_count), str(no_ideas), bar(ideas_count, total))
        console.print(stage_table)
        console.print(f"\n  Total ideas: [bold]{idea_total}[/]")
        console.print(f"  Gaps identified: [bold]{gap_count}[/]")
        console.print(f"\n  API tokens: {input_tok:,} in + {output_tok:,} out")
        console.print(f"  Estimated cost: [bold]${est_cost:.2f}[/]")
        # Generated drafts (only shown when any exist).
        gen_drafts = db.get_generated_drafts()
        if gen_drafts:
            console.print()
            table = Table(title=f"Generated Drafts ({len(gen_drafts)})")
            table.add_column("ID", justify="right", width=4)
            table.add_column("Draft Name", style="cyan")
            table.add_column("Gap Topic")
            table.add_column("Family", width=15)
            table.add_column("Status", width=10)
            table.add_column("Quality", justify="right", width=7)
            table.add_column("Created", width=10)
            for d in gen_drafts:
                table.add_row(
                    str(d["id"]),
                    d["draft_name"],
                    d["gap_topic"][:30],
                    d.get("family_name", ""),
                    d.get("status", "?"),
                    f"{d.get('quality_score', 0):.1f}" if d.get("quality_score") else "-",
                    (d.get("created_at") or "")[:10],
                )
            console.print(table)
    finally:
        db.close()
@pipeline.command("export")
@click.argument("draft_id", type=int)
@click.option("--output", "-o", help="Output file path")
def pipeline_export(draft_id: int, output: str | None):
    """Export a generated draft as I-D text."""
    cfg = _get_config()
    db = Database(cfg)
    try:
        draft = db.get_generated_draft(draft_id)
        if not draft:
            console.print(f"[red]Draft #{draft_id} not found[/]")
            return
        text = draft.get("full_text", "")
        if not text:
            console.print(f"[red]Draft #{draft_id} has no generated text[/]")
            return
        if output:
            out_path = Path(output)
        else:
            # Default destination under the configured reports directory.
            output_dir = Path(cfg.data_dir) / "reports" / "generated-drafts"
            output_dir.mkdir(parents=True, exist_ok=True)
            out_path = output_dir / f"{draft['draft_name']}.txt"
        # Explicit UTF-8 so export doesn't depend on the platform's locale
        # encoding (consistent with the top-level `export` command).
        out_path.write_text(text, encoding="utf-8")
        console.print(f"Exported: [bold green]{out_path}[/]")
    finally:
        db.close()
# ── observatory ─────────────────────────────────────────────────────────────
@main.group()
def observatory():
    """Living Standards Observatory — monitor AI standards across bodies."""
    # Click group container only — subcommands attach via @observatory.command().
    pass
@observatory.command("update")
@click.option("--source", "-s", default=None, help="Comma-separated sources (e.g. ietf,w3c)")
@click.option("--full/--delta", default=False, help="Full refresh or delta only")
@click.option("--dry-run", is_flag=True, default=False, help="Show what would happen without making changes")
def observatory_update(source: str | None, full: bool, dry_run: bool):
    """Fetch, analyze, and update the observatory."""
    from .observatory import Observatory
    cfg = _get_config()
    db = Database(cfg)
    try:
        # A dry run gets an Observatory without an Analyzer attached.
        if dry_run:
            obs = Observatory(cfg, db)
        else:
            from .analyzer import Analyzer
            obs = Observatory(cfg, db, Analyzer(cfg, db))
        selected = source.split(",") if source else None
        mode = "full" if full else "delta"
        console.print(f"[bold]Observatory update[/] ({mode}{' [DRY RUN]' if dry_run else ''})")
        result = obs.update(sources=selected, full=full, dry_run=dry_run)
        if not dry_run:
            console.print(f"\n[bold green]Update complete![/]")
            console.print(f"  New docs: {result.get('new_docs', 0)}")
            console.print(f"  Analyzed: {result.get('analyzed', 0)}")
            console.print(f"  Embedded: {result.get('embedded', 0)}")
            console.print(f"  Ideas extracted: {result.get('ideas', 0)}")
            if result.get("gaps_changed"):
                console.print(f"  Gaps re-analyzed: yes")
            if result.get("errors"):
                console.print(f"\n  [yellow]Errors ({len(result['errors'])}):[/]")
                for err in result["errors"]:
                    console.print(f"    - {err}")
    finally:
        db.close()
@observatory.command("dashboard")
def observatory_dashboard():
    """Regenerate the static dashboard site."""
    from .dashboard import DashboardGenerator
    cfg = _get_config()
    db = Database(cfg)
    try:
        # Generate the site and tell the user how to open it locally.
        site_path = DashboardGenerator(cfg, db).generate()
        console.print(f"[bold green]Dashboard generated:[/] {site_path}")
        console.print(f"  Open: file://{site_path}/index.html")
    finally:
        db.close()
@observatory.command("status")
def observatory_status():
    """Show observatory status — doc counts, sources, last update."""
    from .observatory import Observatory
    cfg = _get_config()
    db = Database(cfg)
    try:
        # Single status snapshot from the Observatory facade.
        info = Observatory(cfg, db).status()
        console.print(f"\n[bold]Observatory Status[/]\n")
        console.print(f"  Total documents: [bold]{info.get('total_docs', 0)}[/]")
        console.print(f"  Unrated: {info.get('unrated', 0)}")
        console.print(f"  Unembedded: {info.get('unembedded', 0)}")
        console.print(f"  Gaps: {info.get('gaps', 0)}")
        per_source = info.get("sources", {})
        if per_source:
            console.print(f"\n  [bold]Sources:[/]")
            for source_name, doc_count in per_source.items():
                console.print(f"    {source_name}: {doc_count} docs")
        last = info.get("last_update")
        if last:
            console.print(f"\n  Last update: {last[:10]}")
        console.print(f"  Snapshots: {info.get('snapshots', 0)}")
    finally:
        db.close()
@observatory.command("snapshot")
def observatory_snapshot():
    """Record current state as a snapshot."""
    cfg = _get_config()
    db = Database(cfg)
    try:
        # Create the snapshot row, then attach current gap state to it.
        snapshot_id = db.create_snapshot()
        current_gaps = db.all_gaps()
        if current_gaps:
            db.record_gap_history(snapshot_id, current_gaps)
        console.print(f"[bold green]Snapshot #{snapshot_id} created[/] ({db.count_drafts()} docs, {len(current_gaps)} gaps)")
    finally:
        db.close()
@observatory.command("diff")
@click.option("--since", help="Show changes since this date (YYYY-MM-DD)")
def observatory_diff(since: str | None):
    """Show what changed since a date."""
    from .observatory import Observatory
    cfg = _get_config()
    db = Database(cfg)
    try:
        delta = Observatory(cfg, db).diff(since=since)
        console.print(f"\n[bold]Observatory Diff[/]")
        if since:
            console.print(f"  Since: {delta.get('since', since)}")
        fresh_docs = delta.get("new_docs", [])
        console.print(f"  New documents: {delta.get('new_doc_count', len(fresh_docs))}")
        history = delta.get("gap_changes", [])
        console.print(f"  Gap history entries: {len(history)}")
        if fresh_docs:
            console.print(f"\n  [bold]New documents:[/]")
            for doc in fresh_docs[:20]:
                # Rows may not be plain dicts; normalize before .get().
                record = doc if isinstance(doc, dict) else dict(doc)
                console.print(f"    [{record.get('source', '?')}] {record.get('name', '?')}: {record.get('title', '')[:60]}")
    finally:
        db.close()
# ── monitor ─────────────────────────────────────────────────────────────
@main.group()
def monitor():
    """Monitor IETF Datatracker for new AI/agent drafts."""
    # Click group container only — subcommands attach via @monitor.command().
    pass
@monitor.command("run")
@click.option("--analyze/--no-analyze", default=True, help="Analyze new drafts")
@click.option("--embed/--no-embed", default=True, help="Generate embeddings")
@click.option("--ideas/--no-ideas", default=True, help="Extract ideas")
def monitor_run(analyze: bool, embed: bool, ideas: bool):
    """Run one monitoring cycle: fetch -> analyze -> embed -> ideas."""
    from .analyzer import Analyzer
    from .embeddings import Embedder
    from .fetcher import Fetcher
    cfg = _get_config()
    db = Database(cfg)
    # Register the run up front so a crash still leaves a failed-run record
    # (see fail_monitor_run in the except handler below).
    run_id = db.start_monitor_run()
    stats = {
        "new_drafts_found": 0,
        "drafts_analyzed": 0,
        "drafts_embedded": 0,
        "ideas_extracted": 0,
    }
    try:
        console.print("[bold]Monitor run started[/]")
        # Determine since date from last successful run; fall back to the
        # configured default when no run has completed yet.
        last_run = db.get_last_successful_run()
        since = last_run["completed_at"][:10] if last_run and last_run.get("completed_at") else cfg.fetch_since
        console.print(f"  Fetching drafts since: [cyan]{since}[/]")
        # Fetch new drafts
        fetcher = Fetcher(cfg)
        try:
            # Snapshot the count first so the delta computed after the fetch
            # reflects only drafts added by this run.
            existing_count = db.count_drafts()
            drafts = fetcher.search_drafts(keywords=list(cfg.search_keywords), since=since)
            for draft in drafts:
                db.upsert_draft(draft)
            # Download text for any missing
            missing_text = db.drafts_without_text()
            if missing_text:
                console.print(f"  Downloading text for [bold]{len(missing_text)}[/] drafts...")
                texts = fetcher.download_texts(missing_text)
                for name, text in texts.items():
                    draft = db.get_draft(name)
                    if draft:
                        draft.full_text = text
                        db.upsert_draft(draft)
        finally:
            # Always release the fetcher's network resources.
            fetcher.close()
        new_count = db.count_drafts() - existing_count
        stats["new_drafts_found"] = max(new_count, 0)
        console.print(f"  New drafts found: [bold green]{stats['new_drafts_found']}[/]")
        # Analyze unrated drafts
        if analyze:
            unrated = db.unrated_drafts(limit=200)
            if unrated:
                console.print(f"  Analyzing [bold]{len(unrated)}[/] unrated drafts...")
                analyzer = Analyzer(cfg, db)
                count = analyzer.rate_all_unrated(limit=200)
                stats["drafts_analyzed"] = count
                console.print(f"  Analyzed: [bold green]{count}[/]")
        # Embed missing drafts
        if embed:
            missing_embed = db.drafts_without_embeddings(limit=500)
            if missing_embed:
                console.print(f"  Embedding [bold]{len(missing_embed)}[/] drafts...")
                embedder = Embedder(cfg, db)
                count = embedder.embed_all_missing()
                stats["drafts_embedded"] = count
                console.print(f"  Embedded: [bold green]{count}[/]")
        # Extract ideas
        if ideas:
            missing_ideas = db.drafts_without_ideas(limit=500)
            if missing_ideas:
                console.print(f"  Extracting ideas from [bold]{len(missing_ideas)}[/] drafts...")
                analyzer = Analyzer(cfg, db)
                count = analyzer.extract_all_ideas(limit=500, batch_size=5, cheap=True)
                stats["ideas_extracted"] = count
                console.print(f"  Ideas extracted from: [bold green]{count}[/] drafts")
        db.complete_monitor_run(run_id, stats)
        console.print("\n[bold green]Monitor run completed successfully[/]")
    except Exception as e:
        # Record the failure against this run, then re-raise so the CLI
        # exits non-zero.
        db.fail_monitor_run(run_id, str(e))
        console.print(f"\n[bold red]Monitor run failed:[/] {e}")
        raise
    finally:
        db.close()
@monitor.command("status")
def monitor_status():
    """Show monitoring status and recent runs."""
    cfg = _get_config()
    db = Database(cfg)
    try:
        history = db.get_monitor_runs(limit=20)
        latest = db.get_last_successful_run()
        # Backlog counts for each processing stage.
        unrated = len(db.unrated_drafts(limit=9999))
        unembedded = len(db.drafts_without_embeddings(limit=9999))
        no_ideas = len(db.drafts_without_ideas(limit=9999))
        console.print("\n[bold]Monitor Status[/]\n")
        if latest:
            console.print(f"  Last successful run: [green]{latest['completed_at']}[/]")
            console.print(f"  Duration: {latest['duration_seconds']:.1f}s")
            console.print(f"  New drafts: {latest['new_drafts_found']}")
        else:
            console.print("  [yellow]No successful runs yet[/]")
        console.print(f"\n[bold]Unprocessed[/]")
        console.print(f"  Unrated: [{'yellow' if unrated > 0 else 'green'}]{unrated}[/]")
        console.print(f"  Unembedded: [{'yellow' if unembedded > 0 else 'green'}]{unembedded}[/]")
        console.print(f"  No ideas: [{'yellow' if no_ideas > 0 else 'green'}]{no_ideas}[/]")
        if history:
            console.print(f"\n[bold]Recent Runs[/] ({len(history)} total)\n")
            runs_table = Table()
            runs_table.add_column("#", justify="right", width=4)
            runs_table.add_column("Started", width=20)
            runs_table.add_column("Duration", justify="right", width=8)
            runs_table.add_column("Status", width=10)
            runs_table.add_column("New", justify="right", width=5)
            runs_table.add_column("Analyzed", justify="right", width=8)
            runs_table.add_column("Embedded", justify="right", width=8)
            runs_table.add_column("Ideas", justify="right", width=6)
            style_map = {"completed": "green", "failed": "red", "running": "yellow"}
            for run in history:
                status_style = style_map.get(run["status"], "dim")
                runs_table.add_row(
                    str(run["id"]),
                    run["started_at"][:19] if run["started_at"] else "",
                    f"{run['duration_seconds']:.1f}s" if run["duration_seconds"] else "-",
                    f"[{status_style}]{run['status']}[/{status_style}]",
                    str(run["new_drafts_found"]),
                    str(run["drafts_analyzed"]),
                    str(run["drafts_embedded"]),
                    str(run["ideas_extracted"]),
                )
            console.print(runs_table)
    finally:
        db.close()
# ── export ──────────────────────────────────────────────────────────────────
@main.command()
@click.option("--type", "export_type", type=click.Choice(["drafts", "ideas", "gaps", "authors", "ratings"]),
              required=True, help="Type of data to export")
@click.option("--format", "fmt", type=click.Choice(["json", "csv"]), default="json", help="Output format")
@click.option("--output", "-o", "output_file", type=click.Path(), default=None,
              help="Output file (default: stdout)")
def export(export_type: str, fmt: str, output_file: str | None):
    """Export data as JSON or CSV."""
    import csv as csv_mod
    import io
    import json
    cfg = _get_config()
    db = Database(cfg)
    try:
        rows: list[dict] = []
        if export_type == "drafts":
            drafts = db.list_drafts(limit=10000, order_by="name ASC")
            for d in drafts:
                rating = db.get_rating(d.name)
                row = {
                    "name": d.name,
                    "title": d.title,
                    "rev": d.rev,
                    "date": d.date,
                    "pages": d.pages or 0,
                    "group": d.group or "",
                }
                # Rating columns only exist for drafts that have been analyzed.
                if rating:
                    row["score"] = round(rating.composite_score, 2)
                    row["novelty"] = rating.novelty
                    row["maturity"] = rating.maturity
                    row["overlap"] = rating.overlap
                    row["momentum"] = rating.momentum
                    row["relevance"] = rating.relevance
                    row["categories"] = json.dumps(rating.categories)
                    row["summary"] = rating.summary
                rows.append(row)
        elif export_type == "ideas":
            ideas = db.all_ideas()
            rows = ideas
        elif export_type == "gaps":
            gaps = db.all_gaps()
            rows = gaps
        elif export_type == "authors":
            top = db.top_authors(limit=10000)
            for name, aff, cnt, drafts_list in top:
                rows.append({
                    "name": name,
                    "affiliation": aff,
                    "draft_count": cnt,
                    "drafts": json.dumps(drafts_list),
                })
        elif export_type == "ratings":
            pairs = db.drafts_with_ratings(limit=10000)
            for draft, rating in pairs:
                rows.append({
                    "name": draft.name,
                    "title": draft.title,
                    "score": round(rating.composite_score, 2),
                    "novelty": rating.novelty,
                    "maturity": rating.maturity,
                    "overlap": rating.overlap,
                    "momentum": rating.momentum,
                    "relevance": rating.relevance,
                    "categories": json.dumps(rating.categories),
                    "summary": rating.summary,
                })
        if fmt == "json":
            text = json.dumps(rows, indent=2, ensure_ascii=False)
        else:
            # CSV
            if not rows:
                text = ""
            else:
                # Fieldnames must be the union over ALL rows, not rows[0]:
                # for "drafts", an unrated first row lacks the rating columns
                # and DictWriter would raise ValueError on later rated rows.
                # restval="" fills columns a given row doesn't have.
                fieldnames: list[str] = []
                for row in rows:
                    for key in row:
                        if key not in fieldnames:
                            fieldnames.append(key)
                si = io.StringIO()
                writer = csv_mod.DictWriter(si, fieldnames=fieldnames, restval="")
                writer.writeheader()
                for row in rows:
                    writer.writerow(row)
                text = si.getvalue()
        if output_file:
            Path(output_file).write_text(text, encoding="utf-8")
            console.print(f"Exported [bold green]{len(rows)}[/] {export_type} to [cyan]{output_file}[/] ({fmt})")
        else:
            click.echo(text)
    finally:
        db.close()