v0.2.0: visualizations, interactive browser, arXiv paper, gap analysis

New features:
- 12 interactive visualizations (ietf viz): t-SNE landscape, similarity
  heatmap, score distributions, timeline, bubble explorer, radar charts,
  author network graph, category treemap, quality vs overlap, org bar chart,
  ideas chart, and interactive draft browser
- Interactive draft browser (browser.html): filterable by category, keyword,
  score sliders with sortable table and expandable detail rows
- arXiv paper (paper/main.tex): 13-page manuscript with all findings
- Gap analysis: 12 identified under-addressed areas
- Author network: collaboration graph, org contributions, cross-org analysis
- Draft generation from gaps (ietf draft-gen)
- Auto-load .env for API keys (python-dotenv)

New modules: visualize.py, authors.py, draftgen.py
New reports: timeline, overlap-matrix, authors, gaps
New deps: plotly, matplotlib, seaborn, scipy, scikit-learn, networkx, python-dotenv

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-28 13:37:55 +01:00
parent f44f9265bd
commit be9cf9c5d9
32 changed files with 4447 additions and 4 deletions

View File

@@ -5,6 +5,12 @@ from __future__ import annotations
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path
from dotenv import load_dotenv
# Load .env from project root (two levels up from this file, or cwd)
load_dotenv(Path(__file__).resolve().parent.parent.parent / ".env")
load_dotenv() # Also check cwd
import anthropic
from rich.console import Console
@@ -62,6 +68,53 @@ Compare these IETF drafts — overlaps, unique ideas, complementary vs competing
Be specific about concrete mechanisms and design choices."""
# Prompt for single-draft idea extraction. Filled in via str.format(), so
# literal JSON braces in the template are doubled ({{ }}).
EXTRACT_IDEAS_PROMPT = """\
Extract discrete technical ideas and mechanisms from this IETF draft.
Return a JSON array. Each element: {{"title":"short name","description":"1-2 sentences","type":"mechanism|protocol|pattern|requirement|architecture|extension"}}
{name} | {title} | {pages}pg
Abstract: {abstract}
{text_excerpt}
Return 3-8 ideas. Focus on CONCRETE technical contributions, not general statements.
JSON array only, no fences."""

# Prompt for batched extraction: one API call covers several drafts and the
# model is asked for a single JSON object mapping draft name -> ideas array.
BATCH_IDEAS_PROMPT = """\
Extract ideas from each IETF draft below. Return a JSON object mapping draft name -> array of ideas.
Per idea: {{"title":"short name","description":"1 sentence","type":"mechanism|protocol|pattern|requirement|architecture|extension"}}
{drafts_block}
3-8 ideas per draft. CONCRETE technical contributions only.
Return ONLY a JSON object like {{"draft-name":[...], ...}}, no fences."""

# Landscape-level prompt used by Analyzer.gap_analysis(); placeholders are
# filled with compressed category/idea/overlap summaries built from the DB.
GAP_ANALYSIS_PROMPT = """\
You are analyzing the landscape of {total} IETF Internet-Drafts related to AI agents and autonomous systems.
## Categories and Draft Counts
{category_summary}
## Most Common Technical Ideas
{top_ideas}
## Known Overlap Clusters (groups of highly similar drafts)
{overlap_summary}
Identify 8-15 GAPS — areas, problems, or technical challenges NOT adequately addressed by existing drafts.
Return a JSON array:
[{{"topic":"short topic name","description":"2-3 sentence description","category":"closest category or new","severity":"critical|high|medium|low","evidence":"what suggests this gap matters"}}]
Focus on:
1. Problems mentioned but not solved
2. Missing infrastructure pieces
3. Security/privacy/safety issues not addressed
4. Interoperability gaps between competing proposals
5. Real-world deployment concerns ignored
JSON array only, no fences."""
def _prompt_hash(text: str) -> str:
return hashlib.sha256(text.encode()).hexdigest()[:16]
@@ -100,10 +153,15 @@ class Analyzer:
rated_at=datetime.now(timezone.utc).isoformat(),
)
def _call_claude(self, prompt: str, max_tokens: int = 512) -> tuple[str, int, int]:
"""Call Claude and return (text, input_tokens, output_tokens)."""
def _call_claude(self, prompt: str, max_tokens: int = 512, cheap: bool = False) -> tuple[str, int, int]:
"""Call Claude and return (text, input_tokens, output_tokens).
Args:
cheap: If True, use claude_model_cheap (Haiku) for lower cost.
"""
model = self.config.claude_model_cheap if cheap else self.config.claude_model
resp = self.client.messages.create(
model=self.config.claude_model,
model=model,
max_tokens=max_tokens,
messages=[{"role": "user", "content": prompt}],
)
@@ -252,6 +310,232 @@ class Analyzer:
)
return count
def extract_ideas(self, draft_name: str, use_cache: bool = True) -> list[dict] | None:
    """Extract technical ideas from a single draft.

    Args:
        draft_name: Datatracker name of the draft to analyze.
        use_cache: If True, reuse a previously cached Claude response for
            the identical prompt instead of calling the API again.

    Returns:
        A list of idea dicts (title/description/type), or None when the
        draft is unknown or the API call / JSON parse fails.
    """
    draft = self.db.get_draft(draft_name)
    if draft is None:
        console.print(f"[red]Draft not found: {draft_name}[/]")
        return None
    # Cap the body excerpt so the prompt stays small; the abstract is
    # capped separately below.
    text_excerpt = ""
    if draft.full_text:
        text_excerpt = draft.full_text[:3000]
    prompt = EXTRACT_IDEAS_PROMPT.format(
        name=draft.name, title=draft.title,
        pages=draft.pages or "?",
        abstract=draft.abstract[:2000],
        text_excerpt=text_excerpt,
    )
    # Cache key covers the full rendered prompt, so any change to the draft
    # text or the template invalidates the cached response.
    phash = _prompt_hash("ideas-" + prompt)
    if use_cache:
        cached = self.db.get_cached_response(draft_name, phash)
        if cached:
            try:
                ideas = json.loads(cached)
                if isinstance(ideas, list):
                    # Re-insert so the ideas table reflects the cache hit.
                    self.db.insert_ideas(draft_name, ideas)
                    return ideas
            except (json.JSONDecodeError, KeyError):
                pass  # Corrupt cache entry: fall through to a fresh API call.
    try:
        text, in_tok, out_tok = self._call_claude(prompt, max_tokens=1024)
        text = self._extract_json(text)
        ideas = json.loads(text)
        if not isinstance(ideas, list):
            ideas = [ideas]  # Tolerate a single-object response.
        self.db.cache_response(
            draft_name, phash, self.config.claude_model,
            prompt, text, in_tok, out_tok,
        )
        self.db.insert_ideas(draft_name, ideas)
        return ideas
    except (json.JSONDecodeError, anthropic.APIError) as e:
        console.print(f"[red]Failed ideas for {draft_name}: {e}[/]")
        return None
def extract_ideas_batch(self, draft_names: list[str], cheap: bool = True) -> int:
    """Extract ideas from multiple drafts in a single API call.

    Uses batching to share prompt overhead — ~5x fewer API calls,
    ~3x fewer tokens than individual extraction.

    Args:
        draft_names: Draft names to process; names not in the DB are skipped.
        cheap: If True, route the call to the cheaper model (see _call_claude).

    Returns:
        Number of drafts for which ideas were stored (0 on total failure).
    """
    drafts = []
    for name in draft_names:
        d = self.db.get_draft(name)
        if d:
            drafts.append(d)
    if not drafts:
        return 0
    # Build compact batch block — abstract only (no full text for batch)
    drafts_block = ""
    for d in drafts:
        drafts_block += f"\n---\n{d.name} | {d.title}\nAbstract: {d.abstract[:800]}\n"
    prompt = BATCH_IDEAS_PROMPT.format(drafts_block=drafts_block)
    phash = _prompt_hash(prompt)
    try:
        # Budget ~400 output tokens per draft in the batch.
        text, in_tok, out_tok = self._call_claude(
            prompt, max_tokens=400 * len(drafts), cheap=cheap
        )
        text = self._extract_json(text)
        results = json.loads(text)
        if not isinstance(results, dict):
            # Fallback: if it returned a list, try to match by order
            if isinstance(results, list) and len(results) == len(drafts):
                results = {d.name: r for d, r in zip(drafts, results)}
            else:
                return 0
        count = 0
        for d in drafts:
            ideas = results.get(d.name, [])
            if ideas:
                if not isinstance(ideas, list):
                    ideas = [ideas]
                # Cache one synthetic entry per draft; the batch's token
                # counts are split evenly across drafts (integer division,
                # so a small remainder is dropped).
                self.db.cache_response(
                    d.name, _prompt_hash(f"batch-ideas-{phash}-{d.name}"),
                    self.config.claude_model_cheap if cheap else self.config.claude_model,
                    f"batch-ideas[{d.name}]", json.dumps(ideas),
                    in_tok // len(drafts), out_tok // len(drafts),
                )
                self.db.insert_ideas(d.name, ideas)
                count += 1
        return count
    except (json.JSONDecodeError, anthropic.APIError) as e:
        console.print(f"[red]Batch ideas failed: {e}[/]")
        return 0
def extract_all_ideas(self, limit: int = 300, batch_size: int = 5, cheap: bool = True) -> int:
    """Extract ideas from all drafts that don't have them yet.

    Args:
        limit: Maximum number of drafts to process in this run.
        batch_size: Number of drafts per API call (default 5).
            Set to 1 to use individual calls with full text.
        cheap: Use Haiku model for ~10x lower cost (default True).

    Returns:
        Number of drafts whose ideas were successfully extracted.
    """
    missing = self.db.drafts_without_ideas(limit=limit)
    if not missing:
        console.print("All drafts already have extracted ideas.")
        return 0
    model_label = "Haiku" if cheap else "Sonnet"
    if batch_size > 1:
        console.print(
            f"Extracting ideas from [bold]{len(missing)}[/] drafts "
            f"(batches of {batch_size}, {model_label})..."
        )
    else:
        console.print(f"Extracting ideas from [bold]{len(missing)}[/] drafts ({model_label})...")
    count = 0
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        console=console,
    ) as progress:
        task = progress.add_task("Extracting ideas...", total=len(missing))
        if batch_size > 1:
            # Batched path: one API call per chunk of `batch_size` drafts.
            for i in range(0, len(missing), batch_size):
                batch = missing[i:i + batch_size]
                # Short progress label: last hyphen-token of each name.
                names = ", ".join(n.split("-")[-1][:10] for n in batch)
                progress.update(task, description=f"Batch: {names}")
                n = self.extract_ideas_batch(batch, cheap=cheap)
                count += n
                progress.advance(task, advance=len(batch))
        else:
            # Individual path: full-text extraction, one call per draft.
            for name in missing:
                progress.update(task, description=f"Ideas: {name.split('-')[-1][:15]}")
                result = self.extract_ideas(name)
                if result:
                    count += 1
                progress.advance(task)
    in_tok, out_tok = self.db.total_tokens_used()
    console.print(
        f"Extracted ideas from [bold green]{count}[/] drafts "
        f"({self.db.idea_count()} total ideas) "
        f"| Tokens: {in_tok:,} in + {out_tok:,} out"
    )
    return count
def gap_analysis(self) -> list[dict]:
    """Analyze the full landscape and identify gaps.

    Builds a compressed summary (category counts, most common ideas, a
    heuristic overlap summary), asks Claude for under-addressed areas,
    and stores the parsed gaps in the database.

    Returns:
        List of gap dicts (topic/description/category/severity/evidence);
        empty list on API or parse failure.
    """
    # Build compressed landscape summary
    pairs = self.db.drafts_with_ratings(limit=500)
    total = self.db.count_drafts()
    # Category summary
    from collections import defaultdict
    cat_counts: dict[str, int] = defaultdict(int)
    for _, rating in pairs:
        for c in rating.categories:
            cat_counts[c] += 1
    category_summary = "\n".join(f"- {c}: {n} drafts" for c, n in
                                 sorted(cat_counts.items(), key=lambda x: x[1], reverse=True))
    # Top ideas (if available)
    all_ideas = self.db.all_ideas()
    idea_freq: dict[str, int] = defaultdict(int)
    for idea in all_ideas:
        idea_freq[idea["title"]] += 1
    top_ideas_list = sorted(idea_freq.items(), key=lambda x: x[1], reverse=True)[:20]
    if top_ideas_list:
        top_ideas = "\n".join(f"- {title} ({count} drafts)" for title, count in top_ideas_list)
    else:
        top_ideas = "(No idea extraction data available yet)"
    # Overlap summary — use clusters report if it exists
    # NOTE(review): this is a heuristic proxy built from the top categories,
    # not actual cluster data — confirm whether the clusters report should
    # feed in here instead.
    overlap_summary = "Multiple clusters of near-duplicate drafts exist, particularly in:\n"
    for c, n in sorted(cat_counts.items(), key=lambda x: x[1], reverse=True)[:5]:
        overlap_summary += f"- {c} ({n} drafts, high internal overlap)\n"
    prompt = GAP_ANALYSIS_PROMPT.format(
        total=total,
        category_summary=category_summary,
        top_ideas=top_ideas,
        overlap_summary=overlap_summary,
    )
    phash = _prompt_hash(prompt)
    # Check cache — responses are stored under the synthetic "_landscape_"
    # key since this analysis is not tied to a single draft.
    cached = self.db.get_cached_response("_landscape_", phash)
    if cached:
        try:
            gaps = json.loads(cached)
            if isinstance(gaps, list):
                self.db.insert_gaps(gaps)
                return gaps
        except (json.JSONDecodeError, KeyError):
            pass  # Corrupt cache entry: fall through to a fresh call.
    try:
        text, in_tok, out_tok = self._call_claude(prompt, max_tokens=4096)
        text = self._extract_json(text)
        gaps = json.loads(text)
        if not isinstance(gaps, list):
            gaps = [gaps]  # Tolerate a single-object response.
        self.db.cache_response(
            "_landscape_", phash, self.config.claude_model,
            prompt, text, in_tok, out_tok,
        )
        self.db.insert_gaps(gaps)
        return gaps
    except (json.JSONDecodeError, anthropic.APIError) as e:
        console.print(f"[red]Gap analysis failed: {e}[/]")
        return []
def compare_drafts(self, draft_names: list[str]) -> str:
"""Compare multiple drafts and return analysis text."""
parts = []

View File

@@ -0,0 +1,137 @@
"""Author network — fetch authors from Datatracker, build collaboration graph."""
from __future__ import annotations
import time as time_mod
from datetime import datetime, timezone
import httpx
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, MofNCompleteColumn
from .config import Config
from .db import Database
from .models import Author
API_BASE = "https://datatracker.ietf.org/api/v1"
console = Console()
class AuthorNetwork:
    """Fetch draft authors from the IETF Datatracker REST API and persist
    them via the local Database.

    Fetching is deliberately throttled: each HTTP call is followed by a
    `config.fetch_delay` sleep to be polite to the Datatracker.
    """

    def __init__(self, config: Config | None = None, db: Database | None = None):
        # Collaborators are injectable so the CLI can share one Database.
        self.config = config or Config.load()
        self.db = db or Database(self.config)
        self.client = httpx.Client(timeout=30, follow_redirects=True)
        # In-session memo of person_id -> Author to avoid re-fetching the
        # same person across multiple drafts.
        self._person_cache: dict[int, Author] = {}

    def close(self) -> None:
        """Release the underlying HTTP connection pool."""
        self.client.close()

    def _extract_person_id(self, person_uri: str) -> int | None:
        """Extract person_id from a URI like /api/v1/person/person/12345/.

        Returns None for empty or malformed URIs instead of raising.
        """
        if not person_uri:
            return None
        parts = person_uri.strip("/").split("/")
        try:
            return int(parts[-1])
        except (ValueError, IndexError):
            return None

    def fetch_person(self, person_id: int) -> Author | None:
        """Fetch a person's details from Datatracker.

        Returns the cached Author when available; on HTTP failure logs a
        dim message and returns None (best-effort, no exception).
        """
        if person_id in self._person_cache:
            return self._person_cache[person_id]
        try:
            resp = self.client.get(
                f"{API_BASE}/person/person/{person_id}/",
                params={"format": "json"},
            )
            resp.raise_for_status()
            data = resp.json()
            author = Author(
                person_id=person_id,
                name=data.get("name", ""),
                ascii_name=data.get("ascii", ""),
                affiliation="",  # Will be set from documentauthor
                resource_uri=data.get("resource_uri", ""),
                fetched_at=datetime.now(timezone.utc).isoformat(),
            )
            self._person_cache[person_id] = author
            # Politeness delay after each real HTTP fetch.
            time_mod.sleep(self.config.fetch_delay)
            return author
        except httpx.HTTPError as e:
            console.print(f"[dim]Could not fetch person {person_id}: {e}[/]")
            return None

    def fetch_authors_for_draft(self, draft_name: str) -> list[tuple[Author, int, str]]:
        """Fetch authors for a single draft. Returns [(Author, order, affiliation)]."""
        try:
            resp = self.client.get(
                f"{API_BASE}/doc/documentauthor/",
                params={"document__name": draft_name, "format": "json", "limit": 50},
            )
            resp.raise_for_status()
            data = resp.json()
        except httpx.HTTPError as e:
            console.print(f"[dim]Could not fetch authors for {draft_name}: {e}[/]")
            return []
        results: list[tuple[Author, int, str]] = []
        for obj in data.get("objects", []):
            person_uri = obj.get("person", "")
            person_id = self._extract_person_id(person_uri)
            if person_id is None:
                continue
            affiliation = obj.get("affiliation", "")
            order = obj.get("order", 1)
            author = self.fetch_person(person_id)
            if author is None:
                continue
            # Use the affiliation from the document author record
            author_with_aff = Author(
                person_id=author.person_id,
                name=author.name,
                ascii_name=author.ascii_name,
                affiliation=affiliation or author.affiliation,
                resource_uri=author.resource_uri,
                fetched_at=author.fetched_at,
            )
            results.append((author_with_aff, order, affiliation))
            # NOTE(review): fetch_person() already sleeps after an uncached
            # HTTP call, so this adds a second delay per author and also
            # sleeps on cache hits — possibly intentional throttling of the
            # documentauthor loop; confirm before removing.
            time_mod.sleep(self.config.fetch_delay)
        return results

    def fetch_all_authors(self, limit: int = 500) -> int:
        """Fetch authors for all drafts missing author data.

        Returns the number of drafts that yielded at least one author row.
        """
        missing = self.db.drafts_without_authors(limit=limit)
        if not missing:
            console.print("All drafts already have author data.")
            return 0
        count = 0
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            console=console,
        ) as progress:
            task = progress.add_task("Fetching authors...", total=len(missing))
            for draft_name in missing:
                progress.update(task, description=f"Authors: {draft_name.split('-')[-1][:15]}")
                authors = self.fetch_authors_for_draft(draft_name)
                for author, order, affiliation in authors:
                    self.db.upsert_author(author)
                    self.db.upsert_draft_author(draft_name, author.person_id, order, affiliation)
                if authors:
                    # Count only drafts that actually produced author rows.
                    count += 1
                progress.advance(task)
        console.print(f"Fetched authors for [bold green]{count}[/] drafts "
                      f"({self.db.author_count()} unique authors)")
        return count

View File

@@ -2,6 +2,8 @@
from __future__ import annotations
from pathlib import Path
import click
from rich.console import Console
from rich.table import Table
@@ -372,6 +374,435 @@ def digest(days: int):
db.close()
@report.command()
def timeline():
    """Timeline of draft submissions by month and category."""
    from .reports import Reporter
    settings = _get_config()
    database = Database(settings)
    try:
        # Delegate the actual report generation to the Reporter.
        out = Reporter(settings, database).timeline()
        console.print(f"Report saved: [bold]{out}[/]")
    finally:
        database.close()
@report.command("overlap-matrix")
def overlap_matrix():
    """Full pairwise overlap matrix report."""
    from .embeddings import Embedder
    from .reports import Reporter
    cfg = _get_config()
    db = Database(cfg)
    embedder = Embedder(cfg, db)
    reporter = Reporter(cfg, db)
    try:
        # FIX: the matrix size was hard-coded as "260x260" in the status
        # message; derive it from the actual draft count so the message
        # stays truthful as the corpus grows.
        n = db.count_drafts()
        console.print(f"Computing {n}x{n} similarity matrix...")
        path = reporter.overlap_matrix(embedder)
        console.print(f"Report saved: [bold]{path}[/]")
    finally:
        db.close()
@report.command("authors")
def authors_report():
    """Author and organization network report."""
    from .reports import Reporter
    settings = _get_config()
    database = Database(settings)
    try:
        out = Reporter(settings, database).authors_report()
        console.print(f"Report saved: [bold]{out}[/]")
    finally:
        database.close()
@report.command("ideas")
def ideas_report():
    """Report on extracted technical ideas."""
    from .reports import Reporter
    settings = _get_config()
    database = Database(settings)
    try:
        out = Reporter(settings, database).ideas_report()
        console.print(f"Report saved: [bold]{out}[/]")
    finally:
        database.close()
# ── visualize ────────────────────────────────────────────────────────────
# Parent click group — each chart below registers itself as a subcommand.
@main.group()
def viz():
    """Generate interactive visualizations (HTML/PNG)."""
    pass
@viz.command("all")
def viz_all():
    """Generate all available visualizations."""
    from .visualize import Visualizer
    settings = _get_config()
    database = Database(settings)
    vis = Visualizer(settings, database)
    try:
        charts = vis.generate_all()
        console.print(f"\n[bold green]{len(charts)} visualizations[/] saved to {vis.output_dir}/")
    finally:
        database.close()
@viz.command("landscape")
@click.option("--method", "-m", default="tsne", type=click.Choice(["umap", "tsne"]),
              help="Dimensionality reduction method")
def viz_landscape(method: str):
    """2D scatter of draft embeddings colored by category."""
    from .visualize import Visualizer
    settings = _get_config()
    database = Database(settings)
    vis = Visualizer(settings, database)
    try:
        out = vis.landscape_scatter(method=method)
        console.print(f"Saved: [bold]{out}[/]")
    finally:
        database.close()
@viz.command("heatmap")
def viz_heatmap():
    """Clustered similarity heatmap (PNG)."""
    from .visualize import Visualizer
    settings = _get_config()
    database = Database(settings)
    vis = Visualizer(settings, database)
    try:
        out = vis.similarity_heatmap()
        console.print(f"Saved: [bold]{out}[/]")
    finally:
        database.close()
@viz.command("distributions")
def viz_distributions():
    """Rating dimension distributions by category (PNG)."""
    from .visualize import Visualizer
    settings = _get_config()
    database = Database(settings)
    vis = Visualizer(settings, database)
    try:
        out = vis.score_distributions()
        console.print(f"Saved: [bold]{out}[/]")
    finally:
        database.close()
@viz.command("timeline")
def viz_timeline():
    """Stacked area chart of monthly submissions."""
    from .visualize import Visualizer
    settings = _get_config()
    database = Database(settings)
    vis = Visualizer(settings, database)
    try:
        out = vis.timeline_chart()
        console.print(f"Saved: [bold]{out}[/]")
    finally:
        database.close()
@viz.command("bubble")
def viz_bubble():
    """Interactive bubble chart: novelty vs maturity."""
    from .visualize import Visualizer
    settings = _get_config()
    database = Database(settings)
    vis = Visualizer(settings, database)
    try:
        out = vis.bubble_explorer()
        console.print(f"Saved: [bold]{out}[/]")
    finally:
        database.close()
@viz.command("radar")
def viz_radar():
    """Radar chart of average category rating profiles."""
    from .visualize import Visualizer
    settings = _get_config()
    database = Database(settings)
    vis = Visualizer(settings, database)
    try:
        out = vis.category_radar()
        console.print(f"Saved: [bold]{out}[/]")
    finally:
        database.close()
@viz.command("network")
@click.option("--min-shared", "-n", default=2, help="Minimum shared drafts for an edge")
def viz_network(min_shared: int):
    """Interactive author collaboration network graph."""
    from .visualize import Visualizer
    settings = _get_config()
    database = Database(settings)
    vis = Visualizer(settings, database)
    try:
        out = vis.author_network(min_shared=min_shared)
        console.print(f"Saved: [bold]{out}[/]")
    finally:
        database.close()
@viz.command("treemap")
def viz_treemap():
    """Category treemap colored by average score."""
    from .visualize import Visualizer
    settings = _get_config()
    database = Database(settings)
    vis = Visualizer(settings, database)
    try:
        out = vis.category_treemap()
        console.print(f"Saved: [bold]{out}[/]")
    finally:
        database.close()
@viz.command("quality")
def viz_quality():
    """Score vs uniqueness scatter (quality vs redundancy)."""
    from .visualize import Visualizer
    settings = _get_config()
    database = Database(settings)
    vis = Visualizer(settings, database)
    try:
        out = vis.score_vs_overlap()
        console.print(f"Saved: [bold]{out}[/]")
    finally:
        database.close()
@viz.command("orgs")
def viz_orgs():
    """Organization contribution bar chart."""
    from .visualize import Visualizer
    settings = _get_config()
    database = Database(settings)
    vis = Visualizer(settings, database)
    try:
        out = vis.org_contributions()
        console.print(f"Saved: [bold]{out}[/]")
    finally:
        database.close()
@viz.command("ideas")
def viz_ideas():
    """Ideas frequency chart by type."""
    from .visualize import Visualizer
    settings = _get_config()
    database = Database(settings)
    vis = Visualizer(settings, database)
    try:
        out = vis.ideas_chart()
        console.print(f"Saved: [bold]{out}[/]")
    finally:
        database.close()
@viz.command("browser")
def viz_browser():
    """Interactive filterable draft browser (standalone HTML)."""
    from .visualize import Visualizer
    settings = _get_config()
    database = Database(settings)
    vis = Visualizer(settings, database)
    try:
        out = vis.draft_browser()
        console.print(f"Saved: [bold]{out}[/]")
    finally:
        database.close()
# ── authors ─────────────────────────────────────────────────────────────
@main.command()
@click.argument("name", required=False)
@click.option("--fetch/--no-fetch", default=False, help="Fetch author data from Datatracker first")
@click.option("--limit", "-n", default=20, help="Number of top authors to show")
def authors(name: str | None, fetch: bool, limit: int):
    """Show authors for a draft, or top authors overall."""
    from .authors import AuthorNetwork
    cfg = _get_config()
    db = Database(cfg)
    network = AuthorNetwork(cfg, db)
    try:
        if fetch:
            # Optional pre-step: populate the author tables before display.
            count = network.fetch_all_authors()
            console.print(f"Fetched authors for [bold green]{count}[/] drafts")
        if name:
            # Per-draft view: list each author with affiliation.
            draft_authors = db.get_authors_for_draft(name)
            if not draft_authors:
                console.print(f"[yellow]No author data for {name}. Run `ietf authors --fetch` first.[/]")
                return
            console.print(f"\n[bold]Authors of {name}:[/]")
            for a in draft_authors:
                console.print(f" - {a.name} ({a.affiliation or 'no affiliation'})")
        else:
            # Aggregate view: top authors by draft count.
            top = db.top_authors(limit=limit)
            if not top:
                console.print("[yellow]No author data. Run `ietf authors --fetch` first.[/]")
                return
            table = Table(title=f"Top {limit} Authors")
            table.add_column("#", justify="right", width=4)
            table.add_column("Author", style="cyan")
            table.add_column("Organization")
            table.add_column("Drafts", justify="right", width=6)
            # top_authors rows: (name, affiliation, draft_count, draft_names);
            # the draft-name list is not shown in the table.
            for rank, (aname, aff, cnt, _) in enumerate(top, 1):
                table.add_row(str(rank), aname, aff, str(cnt))
            console.print(table)
    finally:
        db.close()
@main.command()
@click.option("--top", "-n", default=20, help="Top N to show")
def network(top: int):
    """Show author collaboration network."""
    cfg = _get_config()
    db = Database(cfg)
    try:
        # Section 1: organizations ranked by draft contributions.
        console.print("\n[bold]Top Organizations[/]")
        orgs = db.top_orgs(limit=top)
        if orgs:
            table = Table()
            table.add_column("#", justify="right", width=4)
            table.add_column("Organization", style="cyan")
            table.add_column("Authors", justify="right", width=8)
            table.add_column("Drafts", justify="right", width=6)
            for rank, (org, auth_cnt, draft_cnt) in enumerate(orgs, 1):
                table.add_row(str(rank), org, str(auth_cnt), str(draft_cnt))
            console.print(table)
        # Section 2: pairs of distinct organizations sharing drafts.
        console.print("\n[bold]Cross-Org Collaboration[/]")
        cross = db.cross_org_collaborations(limit=top)
        if cross:
            table = Table()
            table.add_column("Org A", style="cyan")
            table.add_column("Org B", style="cyan")
            table.add_column("Shared Drafts", justify="right", width=8)
            for org_a, org_b, shared in cross:
                table.add_row(org_a, org_b, str(shared))
            console.print(table)
        else:
            console.print("[yellow]No author data. Run `ietf authors --fetch` first.[/]")
    finally:
        db.close()
# ── ideas ───────────────────────────────────────────────────────────────
@main.command()
@click.argument("name", required=False)
@click.option("--all", "extract_all", is_flag=True, help="Extract ideas from all drafts")
@click.option("--limit", "-n", default=50, help="Max drafts to extract (with --all)")
@click.option("--batch", "-b", default=5, help="Drafts per API call (default 5, set 1 for individual)")
@click.option("--cheap/--quality", default=True, help="Use Haiku (cheap) vs Sonnet (quality)")
def ideas(name: str | None, extract_all: bool, limit: int, batch: int, cheap: bool):
    """Extract technical ideas from drafts using Claude."""
    from .analyzer import Analyzer
    cfg = _get_config()
    db = Database(cfg)
    analyzer = Analyzer(cfg, db)
    try:
        if extract_all:
            # Bulk mode: batched extraction across drafts missing ideas.
            count = analyzer.extract_all_ideas(limit=limit, batch_size=batch, cheap=cheap)
            console.print(f"Extracted ideas from [bold green]{count}[/] drafts")
        elif name:
            # Single-draft mode: extract and print each idea inline.
            idea_list = analyzer.extract_ideas(name)
            if idea_list:
                console.print(f"\n[bold]Ideas from {name}:[/]\n")
                for idea in idea_list:
                    console.print(f" [{idea.get('type', '?')}] [bold]{idea['title']}[/]")
                    console.print(f" {idea['description']}\n")
            else:
                console.print("[red]Extraction failed or no ideas found[/]")
        else:
            console.print("Provide a draft name or use --all")
    finally:
        db.close()
# ── gaps ────────────────────────────────────────────────────────────────
@main.command()
@click.option("--refresh", is_flag=True, help="Re-run gap analysis even if cached")
def gaps(refresh: bool):
    """Identify gaps in the current draft landscape using Claude."""
    from .analyzer import Analyzer
    from .reports import Reporter
    cfg = _get_config()
    db = Database(cfg)
    analyzer = Analyzer(cfg, db)
    reporter = Reporter(cfg, db)
    try:
        existing = db.all_gaps()
        if existing and not refresh:
            console.print(f"[bold]{len(existing)} gaps[/] already identified (use --refresh to re-run)\n")
        else:
            gap_list = analyzer.gap_analysis()
            console.print(f"\nIdentified [bold green]{len(gap_list)}[/] gaps\n")
            existing = gap_list
        # FIX: the original evaluated existing[0] unconditionally, which
        # raised IndexError when no gaps were stored and the analysis
        # returned an empty list. Skip non-dict entries per item instead.
        for i, gap in enumerate(existing, 1):
            if not isinstance(gap, dict):
                continue
            sev = gap.get("severity", "medium").upper()
            console.print(f" [bold]{i}. {gap['topic']}[/] [{sev}]")
            console.print(f" {gap['description'][:100]}\n")
        path = reporter.gaps_report()
        console.print(f"Report saved: [bold]{path}[/]")
    finally:
        db.close()
# ── draft-gen ───────────────────────────────────────────────────────────
@main.command("draft-gen")
@click.argument("gap_topic")
@click.option("--output", "-o", help="Output file path")
def draft_gen(gap_topic: str, output: str | None):
    """Generate an Internet-Draft addressing a landscape gap."""
    from .draftgen import DraftGenerator
    from .analyzer import Analyzer
    settings = _get_config()
    database = Database(settings)
    generator = DraftGenerator(settings, database, Analyzer(settings, database))
    try:
        # Default output location lives under the configured data directory.
        destination = output or str(Path(settings.data_dir) / "reports" / "generated-draft.txt")
        console.print(f"Generating Internet-Draft on: [bold]{gap_topic}[/]")
        result = generator.generate(gap_topic, output_path=destination)
        console.print(f"\nDraft saved: [bold green]{result}[/]")
    finally:
        database.close()
# ── config ───────────────────────────────────────────────────────────────────

View File

@@ -26,6 +26,7 @@ class Config:
ollama_url: str = "http://localhost:11434"
ollama_embed_model: str = "nomic-embed-text"
claude_model: str = "claude-sonnet-4-20250514"
claude_model_cheap: str = "claude-haiku-4-5-20251001"
search_keywords: list[str] = field(default_factory=lambda: list(DEFAULT_KEYWORDS))
# Only fetch drafts newer than this (ISO date string)
fetch_since: str = "2024-01-01"

View File

@@ -10,7 +10,7 @@ from pathlib import Path
import numpy as np
from .config import Config
from .models import Draft, Rating
from .models import Author, Draft, Rating
SCHEMA = """
CREATE TABLE IF NOT EXISTS drafts (
@@ -76,6 +76,47 @@ CREATE VIRTUAL TABLE IF NOT EXISTS drafts_fts USING fts5(
content_rowid='rowid'
);
-- Authors (fetched from Datatracker)
CREATE TABLE IF NOT EXISTS authors (
person_id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
ascii_name TEXT,
affiliation TEXT DEFAULT '',
resource_uri TEXT,
fetched_at TEXT
);
CREATE TABLE IF NOT EXISTS draft_authors (
draft_name TEXT NOT NULL REFERENCES drafts(name),
person_id INTEGER NOT NULL REFERENCES authors(person_id),
author_order INTEGER DEFAULT 1,
affiliation TEXT DEFAULT '',
PRIMARY KEY (draft_name, person_id)
);
-- Extracted ideas
CREATE TABLE IF NOT EXISTS ideas (
id INTEGER PRIMARY KEY AUTOINCREMENT,
draft_name TEXT NOT NULL REFERENCES drafts(name),
title TEXT NOT NULL,
description TEXT NOT NULL,
idea_type TEXT DEFAULT '',
extracted_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_ideas_draft ON ideas(draft_name);
-- Gap analysis results
CREATE TABLE IF NOT EXISTS gaps (
id INTEGER PRIMARY KEY AUTOINCREMENT,
topic TEXT NOT NULL,
description TEXT NOT NULL,
category TEXT DEFAULT '',
evidence TEXT DEFAULT '',
severity TEXT DEFAULT 'medium',
analyzed_at TEXT
);
-- Triggers to keep FTS index in sync
CREATE TRIGGER IF NOT EXISTS drafts_ai AFTER INSERT ON drafts BEGIN
INSERT INTO drafts_fts(rowid, name, title, abstract, full_text)
@@ -341,6 +382,189 @@ class Database:
).fetchone()
return (row[0], row[1])
# --- Authors ---
def upsert_author(self, author: Author) -> None:
    """Insert *author*, or refresh its row if the person_id already exists."""
    row = (
        author.person_id,
        author.name,
        author.ascii_name,
        author.affiliation,
        author.resource_uri,
        author.fetched_at,
    )
    self.conn.execute(
        """INSERT INTO authors (person_id, name, ascii_name, affiliation, resource_uri, fetched_at)
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT(person_id) DO UPDATE SET
        name=excluded.name, ascii_name=excluded.ascii_name,
        affiliation=excluded.affiliation, resource_uri=excluded.resource_uri,
        fetched_at=excluded.fetched_at
        """,
        row,
    )
    self.conn.commit()
def upsert_draft_author(
    self, draft_name: str, person_id: int, order: int = 1, affiliation: str = ""
) -> None:
    """Link a draft to an author, updating order/affiliation on conflict."""
    sql = """INSERT INTO draft_authors (draft_name, person_id, author_order, affiliation)
    VALUES (?, ?, ?, ?)
    ON CONFLICT(draft_name, person_id) DO UPDATE SET
    author_order=excluded.author_order, affiliation=excluded.affiliation
    """
    self.conn.execute(sql, (draft_name, person_id, order, affiliation))
    self.conn.commit()
def get_authors_for_draft(self, draft_name: str) -> list[Author]:
    """Return Author objects for *draft_name*, ordered by author_order.

    FIX: the original called row.get("col", ""), but sqlite3.Row does not
    provide a .get() method (every other query in this class indexes rows
    with r["col"]), so any draft that had authors raised AttributeError.
    Use indexing with NULL coalescing instead.
    NOTE(review): assumes conn.row_factory is sqlite3.Row, consistent with
    the r["col"] access used throughout this class — confirm.
    """
    rows = self.conn.execute(
        """SELECT a.* FROM authors a
        JOIN draft_authors da ON a.person_id = da.person_id
        WHERE da.draft_name = ?
        ORDER BY da.author_order""",
        (draft_name,),
    ).fetchall()
    return [Author(
        person_id=r["person_id"], name=r["name"],
        ascii_name=r["ascii_name"] or "",
        affiliation=r["affiliation"] or "",
        resource_uri=r["resource_uri"] or "",
        fetched_at=r["fetched_at"],
    ) for r in rows]
def drafts_without_authors(self, limit: int = 500) -> list[str]:
    """Names of drafts that have no rows in draft_authors yet."""
    query = """SELECT d.name FROM drafts d
    LEFT JOIN draft_authors da ON d.name = da.draft_name
    WHERE da.draft_name IS NULL
    LIMIT ?"""
    return [row["name"] for row in self.conn.execute(query, (limit,)).fetchall()]
def author_count(self) -> int:
    """Total number of author rows in the database."""
    (total,) = self.conn.execute("SELECT COUNT(*) FROM authors").fetchone()
    return total
def top_authors(self, limit: int = 20) -> list[tuple[str, str, int, list[str]]]:
    """Return (name, affiliation, draft_count, [draft_names]), busiest first."""
    query = """SELECT a.name, a.affiliation, COUNT(da.draft_name) as cnt,
    GROUP_CONCAT(da.draft_name, '||') as drafts
    FROM authors a
    JOIN draft_authors da ON a.person_id = da.person_id
    GROUP BY a.person_id
    ORDER BY cnt DESC
    LIMIT ?"""
    results: list[tuple[str, str, int, list[str]]] = []
    for r in self.conn.execute(query, (limit,)).fetchall():
        # '||' is the join separator used above; NULL means no drafts.
        names = r["drafts"].split("||") if r["drafts"] else []
        results.append((r["name"], r["affiliation"], r["cnt"], names))
    return results
def top_orgs(self, limit: int = 20) -> list[tuple[str, int, int]]:
    """Return (org, author_count, draft_count) ordered by draft count."""
    query = """SELECT da.affiliation as org,
    COUNT(DISTINCT da.person_id) as authors,
    COUNT(DISTINCT da.draft_name) as drafts
    FROM draft_authors da
    WHERE da.affiliation != ''
    GROUP BY da.affiliation
    ORDER BY drafts DESC
    LIMIT ?"""
    cursor = self.conn.execute(query, (limit,))
    return [(r["org"], r["authors"], r["drafts"]) for r in cursor.fetchall()]
def coauthor_pairs(self) -> list[tuple[str, str, int]]:
    """Return (author_a, author_b, shared_drafts) for all co-author pairs."""
    # person_id < person_id keeps each unordered pair exactly once.
    query = """SELECT a1.name as a, a2.name as b, COUNT(*) as shared
    FROM draft_authors da1
    JOIN draft_authors da2 ON da1.draft_name = da2.draft_name AND da1.person_id < da2.person_id
    JOIN authors a1 ON da1.person_id = a1.person_id
    JOIN authors a2 ON da2.person_id = a2.person_id
    GROUP BY da1.person_id, da2.person_id
    ORDER BY shared DESC"""
    return [(r["a"], r["b"], r["shared"]) for r in self.conn.execute(query).fetchall()]
def cross_org_collaborations(self, limit: int = 20) -> list[tuple[str, str, int]]:
    """Return (org_a, org_b, shared_drafts) for cross-org collaboration.

    Fix: the previous query grouped directly on (da1.affiliation,
    da2.affiliation), so the same organization pair could be split
    across two rows — e.g. (ACME, Beta) and (Beta, ACME) — depending on
    which author happened to have the smaller person_id, under-counting
    their shared drafts.  The pair is now normalized with SQLite's
    scalar MIN()/MAX() functions so each pair aggregates into a single
    row, with org_a sorting lexicographically before org_b.
    """
    rows = self.conn.execute(
        """SELECT MIN(da1.affiliation, da2.affiliation) as org_a,
           MAX(da1.affiliation, da2.affiliation) as org_b,
           COUNT(DISTINCT da1.draft_name) as shared
           FROM draft_authors da1
           JOIN draft_authors da2 ON da1.draft_name = da2.draft_name
           AND da1.person_id < da2.person_id
           WHERE da1.affiliation != '' AND da2.affiliation != ''
           AND da1.affiliation != da2.affiliation
           GROUP BY org_a, org_b
           ORDER BY shared DESC
           LIMIT ?""",
        (limit,),
    ).fetchall()
    return [(r["org_a"], r["org_b"], r["shared"]) for r in rows]
# --- Ideas ---
def insert_ideas(self, draft_name: str, ideas: list[dict]) -> None:
    """Replace the stored ideas for *draft_name* with *ideas*.

    Each idea dict must provide "title" and "description"; "type" is
    optional and defaults to "".  Deleting first makes re-extraction
    idempotent.
    """
    # Clear existing ideas for this draft first
    self.conn.execute("DELETE FROM ideas WHERE draft_name = ?", (draft_name,))
    now = datetime.now(timezone.utc).isoformat()
    # executemany batches the inserts instead of looping per-row in Python.
    self.conn.executemany(
        """INSERT INTO ideas (draft_name, title, description, idea_type, extracted_at)
        VALUES (?, ?, ?, ?, ?)""",
        [
            (draft_name, idea["title"], idea["description"],
             idea.get("type", ""), now)
            for idea in ideas
        ],
    )
    self.conn.commit()
def get_ideas_for_draft(self, draft_name: str) -> list[dict]:
    """Return the stored ideas for one draft as plain dicts."""
    cursor = self.conn.execute(
        "SELECT * FROM ideas WHERE draft_name = ?", (draft_name,)
    )
    result = []
    for row in cursor:
        result.append({
            "title": row["title"],
            "description": row["description"],
            "type": row["idea_type"],
            "draft_name": row["draft_name"],
        })
    return result
def drafts_without_ideas(self, limit: int = 500) -> list[str]:
    """Return names of drafts with no extracted ideas yet (up to *limit*)."""
    # Same anti-join shape as drafts_without_authors, against ideas.
    sql = (
        "SELECT d.name FROM drafts d"
        " LEFT JOIN ideas i ON d.name = i.draft_name"
        " WHERE i.draft_name IS NULL"
        " LIMIT ?"
    )
    return [row["name"] for row in self.conn.execute(sql, (limit,))]
def all_ideas(self) -> list[dict]:
    """Return every stored idea as a plain dict, ordered by draft name."""
    cursor = self.conn.execute("SELECT * FROM ideas ORDER BY draft_name")
    return [
        {
            "title": row["title"],
            "description": row["description"],
            "type": row["idea_type"],
            "draft_name": row["draft_name"],
        }
        for row in cursor
    ]
def idea_count(self) -> int:
    """Total number of extracted ideas across all drafts."""
    (count,) = self.conn.execute("SELECT COUNT(*) FROM ideas").fetchone()
    return count
# --- Gaps ---
def insert_gaps(self, gaps: list[dict]) -> None:
    """Replace the stored gap analysis with *gaps*.

    Each gap dict must provide "topic" and "description"; "category",
    "evidence" and "severity" (default "medium") are optional.
    """
    self.conn.execute("DELETE FROM gaps")  # Replace old analysis
    now = datetime.now(timezone.utc).isoformat()
    # executemany batches the inserts instead of looping per-row in Python.
    self.conn.executemany(
        """INSERT INTO gaps (topic, description, category, evidence, severity, analyzed_at)
        VALUES (?, ?, ?, ?, ?, ?)""",
        [
            (g["topic"], g["description"], g.get("category", ""),
             g.get("evidence", ""), g.get("severity", "medium"), now)
            for g in gaps
        ],
    )
    self.conn.commit()
def all_gaps(self) -> list[dict]:
    """Return every stored gap as a plain dict, ordered by row id."""
    result = []
    for row in self.conn.execute("SELECT * FROM gaps ORDER BY id"):
        result.append({
            "id": row["id"],
            "topic": row["topic"],
            "description": row["description"],
            "category": row["category"],
            "evidence": row["evidence"],
            "severity": row["severity"],
        })
    return result
# --- Helpers ---
@staticmethod

View File

@@ -0,0 +1,235 @@
"""Internet-Draft generation from gap analysis."""
from __future__ import annotations
import json
import textwrap
from datetime import datetime, timezone, timedelta
from pathlib import Path
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, MofNCompleteColumn
from .config import Config
from .db import Database
# Shared rich console for progress/status output in this module.
console = Console()
# Pipeline step 1 prompt: ask the model for a JSON outline of the new
# draft, grounded in the gap description and a sample of related drafts.
OUTLINE_PROMPT = """\
You are writing an IETF Internet-Draft to address this gap in the AI/agent standardization landscape:
Gap: {gap_topic}
{gap_context}
Related existing drafts (for context, not to duplicate):
{related_drafts}
Generate a detailed outline for an Internet-Draft.
Return JSON: {{"title":"full draft title","abstract":"150-250 word abstract","sections":[{{"title":"section title","summary":"2-3 sentence summary of content"}}],"target_wg":"suggested IETF working group","intended_status":"informational|standards-track|experimental"}}
Include standard sections: Introduction, Terminology, Problem Statement, then 2-4 technical sections, Security Considerations, IANA Considerations.
JSON only, no fences."""
# Pipeline step 2 prompt: write one section at a time; the full outline
# is included so independently generated sections stay consistent.
SECTION_PROMPT = """\
Write the following section of an Internet-Draft titled "{draft_title}".
Abstract: {abstract}
Full outline:
{outline_text}
Write section {section_num}: {section_title}
Summary: {section_summary}
Follow IETF Internet-Draft conventions:
- Formal, precise technical language
- Use RFC 2119 keywords (MUST, SHOULD, MAY) where appropriate
- Reference existing RFCs and drafts where relevant
- 3-6 paragraphs per section
Write the section content only (no section number or title). Plain text."""
class DraftGenerator:
    """Generate a complete Internet-Draft from a gap-analysis topic.

    Three-step pipeline: (1) ask the model for a JSON outline, (2) write
    each section individually, (3) assemble the pieces into plain-text
    Internet-Draft format and write the file to disk.
    """

    def __init__(self, config: Config, db: Database, analyzer):
        self.config = config
        self.db = db
        # The analyzer supplies the Claude plumbing (_call_claude /
        # _extract_json) so this class stays free of API details.
        self.analyzer = analyzer

    def generate_outline(self, gap_topic: str) -> dict:
        """Generate draft outline from a gap topic.

        Returns the parsed outline dict (title, abstract, sections,
        target_wg, intended_status).  Raises json.JSONDecodeError when
        the model reply is not valid JSON.
        """
        # Find the first stored gap whose topic or description mentions
        # the requested topic; it supplies extra prompt context.
        gap_context = ""
        gaps = self.db.all_gaps()
        for g in gaps:
            if gap_topic.lower() in g["topic"].lower() or gap_topic.lower() in g["description"].lower():
                gap_context = f"Description: {g['description']}\nEvidence: {g['evidence']}"
                break
        # Get a sample of related drafts (matched by category) for context
        pairs = self.db.drafts_with_ratings(limit=500)
        related = []
        for draft, rating in pairs:
            if any(gap_topic.lower() in cat.lower() for cat in rating.categories):
                related.append(f"- {draft.name}: {rating.summary[:80]}")
                if len(related) >= 8:
                    break
        if not related:
            # Fallback: use top-rated drafts
            for draft, rating in pairs[:5]:
                related.append(f"- {draft.name}: {rating.summary[:80]}")
        prompt = OUTLINE_PROMPT.format(
            gap_topic=gap_topic,
            gap_context=gap_context or "(No detailed gap analysis available)",
            related_drafts="\n".join(related),
        )
        text, _, _ = self.analyzer._call_claude(prompt, max_tokens=2048)
        text = self.analyzer._extract_json(text)
        return json.loads(text)

    def generate_section(self, outline: dict, section_idx: int) -> str:
        """Generate the body text of one section (0-based *section_idx*)."""
        sections = outline["sections"]
        section = sections[section_idx]
        # The whole outline goes into the prompt so each independently
        # generated section stays consistent with its neighbours.
        outline_text = "\n".join(
            f"{i+1}. {s['title']}: {s['summary']}"
            for i, s in enumerate(sections)
        )
        prompt = SECTION_PROMPT.format(
            draft_title=outline["title"],
            abstract=outline["abstract"],
            outline_text=outline_text,
            section_num=section_idx + 1,
            section_title=section["title"],
            section_summary=section["summary"],
        )
        text, _, _ = self.analyzer._call_claude(prompt, max_tokens=2048)
        return text

    def _wrap_text(self, text: str, indent: int = 3, width: int = 69) -> str:
        """Wrap text to Internet-Draft conventions (72-column lines).

        Paragraphs (separated by blank lines) are whitespace-normalized
        and wrapped to *width* columns with an *indent*-space prefix
        (3 + 69 = 72, the RFC right margin).
        """
        prefix = " " * indent
        paragraphs = text.strip().split("\n\n")
        wrapped = []
        for para in paragraphs:
            para = " ".join(para.split())  # Normalize whitespace
            lines = textwrap.wrap(para, width=width, initial_indent=prefix,
                                  subsequent_indent=prefix)
            wrapped.append("\n".join(lines))
        return "\n\n".join(wrapped)

    def assemble_draft(self, outline: dict, sections: list[str]) -> str:
        """Assemble generated sections into Internet-Draft text format."""
        now = datetime.now(timezone.utc)
        expires = now + timedelta(days=185)  # standard I-D lifetime (~6 months)
        date_str = now.strftime("%B %Y")
        exp_str = expires.strftime("%B %d, %Y")
        title = outline["title"]
        abstract = outline["abstract"]
        status = outline.get("intended_status", "Informational")
        # Build a draft-name slug from the first words of the title;
        # words containing punctuation are dropped by isalnum().
        words = title.lower().split()
        slug = "-".join(w for w in words[:4] if w.isalnum())
        draft_name = f"draft-ai-{slug}-00"
        lines = []
        # Header: left/right aligned to the 72-column I-D page width.
        # (Was an f-string with no placeholders and hand-counted spaces.)
        lines.append("Internet-Draft" + "AI/Agent WG".rjust(58))
        lines.append(f"Intended status: {status:<44s}{date_str}")
        lines.append(f"Expires: {exp_str}")
        lines.append("")
        lines.append("")
        # Title block, actually centered on the 72-column page
        # (the previous fixed indent only approximated centering).
        lines.append(title.center(72).rstrip())
        lines.append(draft_name.center(72).rstrip())
        lines.append("")
        # Abstract
        lines.append("Abstract")
        lines.append("")
        lines.append(self._wrap_text(abstract))
        lines.append("")
        # Status of This Memo
        lines.append("Status of This Memo")
        lines.append("")
        lines.append(self._wrap_text(
            "This Internet-Draft is submitted in full conformance with the "
            "provisions of BCP 78 and BCP 79."
        ))
        lines.append("")
        lines.append(self._wrap_text(
            f"This document is intended to have {status} status. "
            "Distribution of this memo is unlimited."
        ))
        lines.append("")
        # Table of Contents (page numbers are placeholders, not computed)
        lines.append("Table of Contents")
        lines.append("")
        for i, section in enumerate(outline["sections"], 1):
            dots = "." * max(0, 60 - len(section["title"]))
            lines.append(f"   {i}. {section['title']} {dots} {i + 2}")
        lines.append("")
        # Numbered body sections
        for i, (section_info, section_text) in enumerate(
            zip(outline["sections"], sections), 1
        ):
            lines.append(f"{i}. {section_info['title']}")
            lines.append("")
            lines.append(self._wrap_text(section_text))
            lines.append("")
        # Author's Address
        lines.append("Author's Address")
        lines.append("")
        lines.append("   Generated by IETF Draft Analyzer")
        lines.append(f"   {now.strftime('%Y-%m-%d')}")
        lines.append("")
        return "\n".join(lines)

    def generate(self, gap_topic: str, output_path: str | None = None) -> str:
        """Full pipeline: outline -> sections -> assemble -> write file.

        Returns the path of the written draft file as a string.
        """
        console.print("[bold]Step 1/3:[/] Generating outline...")
        outline = self.generate_outline(gap_topic)
        console.print(f"  Title: [cyan]{outline['title']}[/]")
        console.print(f"  Sections: {len(outline['sections'])}")
        console.print(f"  Target WG: {outline.get('target_wg', '?')}")
        console.print("\n[bold]Step 2/3:[/] Generating sections...")
        sections = []
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            console=console,
        ) as progress:
            task = progress.add_task("Writing...", total=len(outline["sections"]))
            for i, s in enumerate(outline["sections"]):
                progress.update(task, description=f"Section: {s['title'][:30]}")
                text = self.generate_section(outline, i)
                sections.append(text)
                progress.advance(task)
        console.print("\n[bold]Step 3/3:[/] Assembling draft...")
        draft_text = self.assemble_draft(outline, sections)
        out = Path(output_path) if output_path else Path(self.config.data_dir) / "reports" / "generated-draft.txt"
        out.parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding: model output may contain non-ASCII (em dashes
        # etc.); the platform-default encoding could fail on Windows.
        out.write_text(draft_text, encoding="utf-8")
        return str(out)

View File

@@ -6,6 +6,16 @@ from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class Author:
    """One draft-author record (presumably fetched from the IETF Datatracker
    person API — confirm against the fetcher)."""
    person_id: int  # numeric person identifier; joins against draft_authors
    name: str  # display name
    ascii_name: str = ""  # ASCII form of the name, when provided
    affiliation: str = ""  # organization string; may be empty
    resource_uri: str = ""  # API resource URI for this person
    fetched_at: str | None = None  # ISO timestamp of when the record was fetched
@dataclass
class Draft:
name: str # e.g. "draft-zheng-dispatch-agent-identity-management"

View File

@@ -2,6 +2,8 @@
from __future__ import annotations
import json
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
@@ -175,3 +177,427 @@ class Reporter:
path = self.output_dir / "digest.md"
path.write_text(report)
return str(path)
def timeline(self) -> str:
    """Generate a timeline report of draft submissions by month and category.

    Buckets drafts by their "YYYY-MM" submission month, renders an ASCII
    bar chart, a per-category markdown table, and an early-vs-late-half
    trend summary.  Writes <output_dir>/timeline.md and returns its path.
    """
    pairs = self.db.drafts_with_ratings(limit=500)
    all_drafts = self.db.list_drafts(limit=500, order_by="time ASC")
    total = len(all_drafts)
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    # Group drafts by month
    by_month: dict[str, list[Draft]] = defaultdict(list)
    for d in all_drafts:
        month = d.time[:7] if d.time else "unknown"  # "YYYY-MM" prefix
        by_month[month].append(d)
    months = sorted(by_month.keys())
    # Build rating lookup by draft name
    rating_map: dict[str, Rating] = {}
    for draft, rating in pairs:
        rating_map[draft.name] = rating
    # Collect all categories
    all_cats: set[str] = set()
    for _, r in pairs:
        for c in r.categories:
            all_cats.add(c)
    cats = sorted(all_cats)
    # Category counts per month
    cat_by_month: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
    for d in all_drafts:
        month = d.time[:7] if d.time else "unknown"
        r = rating_map.get(d.name)
        if r:
            for c in r.categories:
                cat_by_month[month][c] += 1
    # NOTE(review): a separator between {now} and {total} appears lost in
    # transit (sibling reports show an em dash) — confirm against original.
    lines = [
        "# IETF AI/Agent Drafts Timeline",
        f"*Generated {now}{total} drafts across {len(months)} months*\n",
        "## Monthly Submission Volume\n",
        "```",
    ]
    max_count = max(len(ds) for ds in by_month.values()) if by_month else 1
    for month in months:
        count = len(by_month[month])
        bar_len = int(count / max_count * 40) if max_count else 0  # scale bar to 40 cols
        bar = "#" * bar_len
        lines.append(f"{month} | {bar:<40s} {count:>3}")
    lines.append("```\n")
    # Category breakdown table
    lines.append("## Category Breakdown by Month\n")
    header = "| Month |" + " | ".join(f" {c[:12]:>12}" for c in cats) + " | Total |"
    sep = "|---------|" + " | ".join("-" * 13 for _ in cats) + " | -----:|"
    lines.append(header)
    lines.append(sep)
    for month in months:
        counts = [str(cat_by_month[month].get(c, 0)).rjust(13) for c in cats]
        total_m = len(by_month[month])
        lines.append(f"| {month} |" + " | ".join(counts) + f" | {total_m:>5} |")
    # Trends section: compare the first half of the months to the second.
    lines.append("\n## Trends\n")
    if len(months) >= 4:
        mid = len(months) // 2
        early_months = months[:mid]
        late_months = months[mid:]
        early_cat: dict[str, int] = defaultdict(int)
        late_cat: dict[str, int] = defaultdict(int)
        for m in early_months:
            for c in cats:
                early_cat[c] += cat_by_month[m].get(c, 0)
        for m in late_months:
            for c in cats:
                late_cat[c] += cat_by_month[m].get(c, 0)
        growth = []
        for c in cats:
            e = early_cat[c]
            l = late_cat[c]
            if e > 0:
                pct = ((l - e) / e) * 100
                growth.append((c, pct, e, l))
            elif l > 0:
                # Category appearing only in the late half: flagged "new".
                growth.append((c, float('inf'), e, l))
        growth.sort(key=lambda x: x[1], reverse=True)
        for c, pct, e, l in growth[:5]:
            if pct == float('inf'):
                lines.append(f"- **{c}**: new (0 → {l} drafts)")
            else:
                # NOTE(review): a separator between {e} and {l} (arrow?)
                # also looks dropped — confirm against original.
                lines.append(f"- **{c}**: {pct:+.0f}% ({e}{l} drafts, early vs late half)")
    else:
        lines.append("Not enough months for trend analysis.")
    report = "\n".join(lines)
    path = self.output_dir / "timeline.md"
    path.write_text(report)
    return str(path)
def overlap_matrix(self, embedder) -> str:
    """Generate overlap matrix report from pairwise similarity.

    *embedder* must expose similarity_matrix() -> (names, matrix), where
    matrix supports numpy-style matrix[i, j] tuple indexing.  Writes
    <output_dir>/overlap-matrix.md and returns its path.
    """
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    names, matrix = embedder.similarity_matrix()
    n = len(names)
    # Build rating lookup
    pairs_data = self.db.drafts_with_ratings(limit=500)
    rating_map: dict[str, Rating] = {}
    draft_map: dict[str, Draft] = {}
    for draft, rating in pairs_data:
        rating_map[draft.name] = rating
        draft_map[draft.name] = draft
    # Top similar pairs (above 0.80, excluding self); upper triangle only
    # so each pair is considered once.
    sim_pairs: list[tuple[float, str, str]] = []
    for i in range(n):
        for j in range(i + 1, n):
            if matrix[i, j] >= 0.80:
                sim_pairs.append((float(matrix[i, j]), names[i], names[j]))
    sim_pairs.sort(reverse=True)
    # NOTE(review): a separator between {now} and {n} appears lost in
    # transit — confirm against the original file.
    lines = [
        "# Overlap Matrix Report",
        f"*Generated {now}{n}x{n} pairwise similarities*\n",
        f"## Top {min(50, len(sim_pairs))} Most Similar Pairs\n",
        "| Rank | Similarity | Draft A | Draft B |",
        "|-----:|-----------:|---------|---------|",
    ]
    for rank, (sim, a, b) in enumerate(sim_pairs[:50], 1):
        a_link = f"[{a}](https://datatracker.ietf.org/doc/{a}/)"
        b_link = f"[{b}](https://datatracker.ietf.org/doc/{b}/)"
        lines.append(f"| {rank} | {sim:.3f} | {a_link} | {b_link} |")
    # Per-category internal overlap: map each category to the matrix
    # indices of its member drafts.
    cat_drafts: dict[str, list[int]] = defaultdict(list)
    name_idx = {name: i for i, name in enumerate(names)}
    for name in names:
        r = rating_map.get(name)
        if r:
            for c in r.categories:
                if name in name_idx:
                    cat_drafts[c].append(name_idx[name])
    lines.extend([
        "\n## Per-Category Internal Overlap\n",
        "| Category | Drafts | Avg Pairwise Sim | Most Similar Pair |",
        "|----------|-------:|-----------------:|-------------------|",
    ])
    for cat in sorted(cat_drafts.keys()):
        indices = cat_drafts[cat]
        if len(indices) < 2:
            # A single draft has no pairs to compare.
            lines.append(f"| {cat} | {len(indices)} | — | — |")
            continue
        sims = []
        best_sim, best_a, best_b = 0.0, "", ""
        for ii in range(len(indices)):
            for jj in range(ii + 1, len(indices)):
                s = float(matrix[indices[ii], indices[jj]])
                sims.append(s)
                if s > best_sim:
                    best_sim = s
                    best_a = names[indices[ii]]
                    best_b = names[indices[jj]]
        avg = sum(sims) / len(sims) if sims else 0
        short_a = best_a.replace("draft-", "")[:25]
        short_b = best_b.replace("draft-", "")[:25]
        lines.append(f"| {cat} | {len(indices)} | {avg:.3f} | {short_a} / {short_b} ({best_sim:.3f}) |")
    # Category cross-overlap matrix (upper triangle; symmetric)
    cat_names = sorted(cat_drafts.keys())
    if len(cat_names) > 1:
        lines.extend([
            "\n## Category Cross-Overlap\n",
            "Average similarity between drafts in different categories.\n",
            "| |" + " | ".join(c[:10] for c in cat_names) + " |",
            "|-|" + " | ".join("---:" for _ in cat_names) + " |",
        ])
        for ci, c1 in enumerate(cat_names):
            row = f"| **{c1[:12]}** |"
            for cj, c2 in enumerate(cat_names):
                if ci > cj:
                    # Lower triangle left blank — values mirror the upper.
                    row += " |"
                    continue
                idx1, idx2 = cat_drafts[c1], cat_drafts[c2]
                sims = []
                for i1 in idx1:
                    for i2 in idx2:
                        if i1 != i2:
                            sims.append(float(matrix[i1, i2]))
                avg = sum(sims) / len(sims) if sims else 0
                row += f" {avg:.2f} |"
            lines.append(row)
    # Most unique drafts: those whose best match is still below 0.70.
    lines.append("\n## Most Unique Drafts (max similarity < 0.70)\n")
    unique_drafts = []
    for i, name in enumerate(names):
        max_sim = 0.0
        best_match = ""
        for j in range(n):
            if i != j and matrix[i, j] > max_sim:
                max_sim = float(matrix[i, j])
                best_match = names[j]
        if max_sim < 0.70:
            d = draft_map.get(name)
            title = d.title[:60] if d else ""
            unique_drafts.append((name, max_sim, best_match, title))
    unique_drafts.sort(key=lambda x: x[1])
    if unique_drafts:
        for name, ms, bm, title in unique_drafts[:20]:
            lines.append(f"- **{name}** (max sim: {ms:.3f} with {bm}) — {title}")
    else:
        lines.append("No drafts with max similarity below 0.70.")
    report = "\n".join(lines)
    path = self.output_dir / "overlap-matrix.md"
    path.write_text(report)
    return str(path)
def authors_report(self) -> str:
    """Generate author/organization network report.

    Tables: top authors, top organizations, strongest co-author pairs
    and cross-org collaborations.  Writes <output_dir>/authors.md and
    returns its path.
    """
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    author_count = self.db.author_count()
    total_drafts = self.db.count_drafts()
    # Build rating lookup for category info
    pairs_data = self.db.drafts_with_ratings(limit=500)
    rating_map: dict[str, Rating] = {}
    for draft, rating in pairs_data:
        rating_map[draft.name] = rating
    # NOTE(review): a separator between {now} and {author_count} appears
    # lost in transit — confirm against the original file.
    lines = [
        "# Author & Organization Network",
        f"*Generated {now}{author_count} unique authors across {total_drafts} drafts*\n",
    ]
    # Top authors
    top = self.db.top_authors(limit=30)
    lines.extend([
        "## Top Authors by Draft Count\n",
        "| # | Author | Organization | Drafts | Categories |",
        "|--:|--------|-------------|-------:|------------|",
    ])
    for rank, (name, aff, cnt, draft_names) in enumerate(top, 1):
        # Union of categories across this author's drafts; show first 3.
        cats: set[str] = set()
        for dn in draft_names:
            r = rating_map.get(dn)
            if r:
                cats.update(r.categories)
        cat_str = ", ".join(sorted(cats)[:3])
        lines.append(f"| {rank} | {name} | {aff} | {cnt} | {cat_str} |")
    # Top orgs
    orgs = self.db.top_orgs(limit=20)
    lines.extend([
        "\n## Top Organizations\n",
        "| # | Organization | Authors | Drafts |",
        "|--:|-------------|--------:|-------:|",
    ])
    for rank, (org, authors, drafts) in enumerate(orgs, 1):
        lines.append(f"| {rank} | {org} | {authors} | {drafts} |")
    # Co-author pairs
    coauthors = self.db.coauthor_pairs()
    if coauthors:
        lines.extend([
            "\n## Strongest Collaboration Pairs\n",
            "| Author A | Author B | Shared Drafts |",
            "|----------|----------|-----:|",
        ])
        for a, b, shared in coauthors[:20]:
            lines.append(f"| {a} | {b} | {shared} |")
    # Cross-org
    cross = self.db.cross_org_collaborations(limit=15)
    if cross:
        lines.extend([
            "\n## Cross-Organization Collaboration\n",
            "| Org A | Org B | Shared Drafts |",
            "|-------|-------|-----:|",
        ])
        for org_a, org_b, shared in cross:
            lines.append(f"| {org_a} | {org_b} | {shared} |")
    report = "\n".join(lines)
    path = self.output_dir / "authors.md"
    path.write_text(report)
    return str(path)
def ideas_report(self) -> str:
    """Generate report on extracted ideas across all drafts.

    Fuzzy-groups idea titles (SequenceMatcher ratio >= 0.75) to surface
    ideas shared across drafts, then lists common / two-draft / unique
    ideas and a per-type tally.  Writes <output_dir>/ideas.md and
    returns its path.
    """
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    all_ideas = self.db.all_ideas()
    # Build rating lookup for category info
    pairs_data = self.db.drafts_with_ratings(limit=500)
    rating_map: dict[str, Rating] = {}
    for draft, rating in pairs_data:
        rating_map[draft.name] = rating
    # Group ideas by normalized title for frequency analysis.
    # Greedy O(n^2) clustering: each idea joins the first group whose
    # canonical title is similar enough, else it starts a new group.
    from difflib import SequenceMatcher
    idea_groups: list[dict] = []  # [{canonical, ideas: [idea], drafts: set}]
    for idea in all_ideas:
        title_lower = idea["title"].lower().strip()
        matched = False
        for group in idea_groups:
            ratio = SequenceMatcher(None, title_lower, group["canonical"]).ratio()
            if ratio >= 0.75:
                group["ideas"].append(idea)
                group["drafts"].add(idea["draft_name"])
                matched = True
                break
        if not matched:
            idea_groups.append({
                "canonical": title_lower,
                "title": idea["title"],
                "ideas": [idea],
                "drafts": {idea["draft_name"]},
            })
    idea_groups.sort(key=lambda g: len(g["drafts"]), reverse=True)
    drafts_with_ideas = len(set(i["draft_name"] for i in all_ideas))
    # NOTE(review): a separator between {now} and the idea count appears
    # lost in transit — confirm against the original file.
    lines = [
        "# Technical Ideas Extracted from IETF AI/Agent Drafts",
        f"*Generated {now}{len(all_ideas)} ideas from {drafts_with_ideas} drafts*\n",
    ]
    # Most common ideas (3+ drafts)
    common = [g for g in idea_groups if len(g["drafts"]) >= 3]
    if common:
        lines.extend([
            "## Most Common Ideas (appearing in 3+ drafts)\n",
            "| Idea | Appearances | Drafts |",
            "|------|------------:|--------|",
        ])
        for g in common:
            draft_list = ", ".join(sorted(g["drafts"])[:5])
            if len(g["drafts"]) > 5:
                draft_list += f" +{len(g['drafts'])-5} more"
            lines.append(f"| {g['title']} | {len(g['drafts'])} | {draft_list} |")
    # Ideas appearing in 2 drafts
    two = [g for g in idea_groups if len(g["drafts"]) == 2]
    if two:
        lines.append(f"\n## Ideas Appearing in 2 Drafts ({len(two)} ideas)\n")
        for g in two[:30]:
            draft_list = ", ".join(sorted(g["drafts"]))
            lines.append(f"- **{g['title']}** — {draft_list}")
    # Unique ideas (only 1 draft) - just count and top examples
    unique = [g for g in idea_groups if len(g["drafts"]) == 1]
    lines.append(f"\n## Unique Ideas ({len(unique)} ideas appearing in only 1 draft)\n")
    for g in unique[:20]:
        idea = g["ideas"][0]
        lines.append(f"- **{g['title']}** ({idea['draft_name']}) — {idea['description'][:100]}")
    if len(unique) > 20:
        lines.append(f"\n*...and {len(unique) - 20} more unique ideas*")
    # By type
    by_type: dict[str, int] = defaultdict(int)
    for idea in all_ideas:
        by_type[idea.get("type", "other")] += 1
    if by_type:
        lines.extend(["\n## Ideas by Type\n"])
        for t, count in sorted(by_type.items(), key=lambda x: x[1], reverse=True):
            lines.append(f"- **{t or 'untyped'}**: {count}")
    report = "\n".join(lines)
    path = self.output_dir / "ideas.md"
    path.write_text(report)
    return str(path)
def gaps_report(self) -> str:
    """Generate gap analysis report.

    Lists stored gaps ordered by severity (critical first, unknown
    severities last), then a severity tally.  Writes
    <output_dir>/gaps.md and returns its path.
    """
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    gaps = self.db.all_gaps()
    total_drafts = self.db.count_drafts()
    lines = [
        "# Gap Analysis: IETF AI/Agent Draft Landscape",
        f"*Generated {now} — analyzing {total_drafts} drafts*\n",
    ]
    # Group by severity
    severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
    gaps.sort(key=lambda g: severity_order.get(g["severity"], 4))  # unknown last
    for i, gap in enumerate(gaps, 1):
        sev = gap["severity"].upper()
        # NOTE(review): markdown hard line breaks need two trailing
        # spaces; only one shows here — confirm against the original.
        lines.extend([
            f"### {i}. {gap['topic']}",
            f"**Severity:** {sev} ",
            f"**Category:** {gap['category'] or 'cross-cutting'} ",
            f"**Description:** {gap['description']} ",
            f"**Evidence:** {gap['evidence']}\n",
        ])
    # Summary
    by_sev: dict[str, int] = defaultdict(int)
    for g in gaps:
        by_sev[g["severity"]] += 1
    lines.append("## Summary by Severity\n")
    for sev in ["critical", "high", "medium", "low"]:
        if by_sev[sev]:
            lines.append(f"- **{sev.title()}:** {by_sev[sev]} gaps")
    report = "\n".join(lines)
    path = self.output_dir / "gaps.md"
    path.write_text(report)
    return str(path)

File diff suppressed because it is too large Load Diff