IETF Draft Analyzer v0.1.0 — track, categorize, and rate AI/agent drafts

Python CLI tool that fetches AI/agent-related Internet-Drafts from the IETF
Datatracker, rates them using Claude, generates embeddings via Ollama for
similarity/clustering, and produces markdown reports.

Features:
- Fetch drafts by keyword from the Datatracker API, with full-text download
- Batch analysis with Claude (token-optimized, responses cached in SQLite)
- Embedding-based similarity search and overlap cluster detection
- Reports: overview, landscape by category, overlap clusters, weekly digest
- SQLite with FTS5 for full-text search across 260 tracked drafts
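
For the FTS5 search in the last feature above, a minimal sketch of how such a
layer can be wired up (table and column names are illustrative, not taken from
this commit; assumes a SQLite build with FTS5 enabled):

    import sqlite3

    conn = sqlite3.connect("drafts.db")
    conn.execute(
        "CREATE VIRTUAL TABLE IF NOT EXISTS draft_fts "
        "USING fts5(name, title, abstract, body)"
    )
    conn.execute(
        "INSERT INTO draft_fts VALUES (?, ?, ?, ?)",
        ("draft-example-agent-auth", "Example", "An abstract.", "Full text."),
    )
    # FTS5 MATCH: terms are ANDed by default; quotes give phrase queries
    for (name,) in conn.execute(
        "SELECT name FROM draft_fts WHERE draft_fts MATCH ?", ("agent oauth",)
    ):
        print(name)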

Initial analysis of the 260 drafts reveals OAuth agent auth (13 drafts) and
agent gateway/collaboration (10 drafts) as the most crowded clusters, while
AI safety/alignment is underserved despite having the highest quality scores.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
commit 6771a4c235 (2026-02-28 00:36:45 +01:00)
17 changed files with 2823 additions and 0 deletions


@@ -0,0 +1,204 @@
"""Datatracker API client — search, fetch metadata, download full text."""
from __future__ import annotations
import time as time_mod
from datetime import datetime, timezone
import httpx
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, MofNCompleteColumn
from .config import Config
from .models import Draft
API_BASE = "https://datatracker.ietf.org/api/v1"
TEXT_BASE = "https://www.ietf.org/archive/id"
SEARCH_FIELDS = ("name__contains", "abstract__contains")
console = Console()
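
# Example of the kind of search request this module issues (illustrative
# keyword and date):
#   GET https://datatracker.ietf.org/api/v1/doc/document/?format=json
#       &name__contains=agent&time__gte=2024-01-01&type__slug=draft
#       &limit=100&offset=0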

class Fetcher:
    def __init__(self, config: Config | None = None):
        self.config = config or Config.load()
        self.client = httpx.Client(timeout=30, follow_redirects=True)
        self._group_cache: dict[str, str] = {}

    def close(self) -> None:
        self.client.close()
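
    # Minimal usage sketch (illustrative; with no arguments, search_drafts()
    # falls back to the keywords and date window from Config):
    #     fetcher = Fetcher()
    #     drafts = fetcher.search_drafts()
    #     texts = fetcher.download_texts(drafts)
    #     fetcher.close()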

    # --- Search & fetch metadata ---

    def search_drafts(
        self,
        keywords: list[str] | None = None,
        since: str | None = None,
        limit_per_keyword: int = 200,
    ) -> list[Draft]:
        """Search for drafts matching keywords. Deduplicates by name."""
        keywords = keywords or self.config.search_keywords
        since = since or self.config.fetch_since
        seen: dict[str, Draft] = {}
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            console=console,
        ) as progress:
            # Search both name and abstract for each keyword
            searches = [(kw, field) for kw in keywords for field in SEARCH_FIELDS]
            task = progress.add_task("Searching Datatracker...", total=len(searches))
            for kw, search_field in searches:
                progress.update(task, description=f"Searching {search_field.split('__')[0]}: {kw}")
                drafts = self._paginated_search(search_field, kw, since, limit_per_keyword)
                for d in drafts:
                    if d.name not in seen:
                        seen[d.name] = d
                progress.advance(task)
        console.print(f"Found [bold green]{len(seen)}[/] unique drafts")
        return list(seen.values())

    def _paginated_search(
        self,
        search_field: str,
        keyword: str,
        since: str,
        max_results: int,
    ) -> list[Draft]:
        results: list[Draft] = []
        offset = 0
        page_size = 100
        while offset < max_results:
            params = {
                "format": "json",
                search_field: keyword,
                "time__gte": since,
                "type__slug": "draft",
                "limit": min(page_size, max_results - offset),
                "offset": offset,
            }
            try:
                resp = self.client.get(f"{API_BASE}/doc/document/", params=params)
                resp.raise_for_status()
            except httpx.HTTPError as e:
                console.print(f"[red]API error: {e}[/]")
                break
            data = resp.json()
            objects = data.get("objects", [])
            if not objects:
                break
            for obj in objects:
                results.append(self._api_obj_to_draft(obj))
            offset += len(objects)
            if not data.get("meta", {}).get("next"):
                break
            time_mod.sleep(self.config.fetch_delay)
        return results
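
    # Shape of the Datatracker list response the pagination loop above relies
    # on (illustrative, trimmed):
    #     {"meta": {"limit": 100, "offset": 0, "next": "/api/v1/doc/document/?..."},
    #      "objects": [{"name": "draft-...", "rev": "02", "title": "...", ...}]}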

    def fetch_draft(self, name: str) -> Draft | None:
        """Fetch a single draft by name."""
        try:
            resp = self.client.get(
                f"{API_BASE}/doc/document/{name}/", params={"format": "json"}
            )
            resp.raise_for_status()
            return self._api_obj_to_draft(resp.json())
        except httpx.HTTPError as e:
            console.print(f"[red]Error fetching {name}: {e}[/]")
            return None

    # --- Full text ---

    def download_full_text(self, draft: Draft) -> str | None:
        """Download the plain text of a draft."""
        url = draft.text_url
        try:
            resp = self.client.get(url)
            resp.raise_for_status()
            return resp.text
        except httpx.HTTPError:
            # Try without revision if it fails
            try:
                alt_url = f"{TEXT_BASE}/{draft.name}.txt"
                resp = self.client.get(alt_url)
                resp.raise_for_status()
                return resp.text
            except httpx.HTTPError as e:
                console.print(f"[dim]Could not download text for {draft.name}: {e}[/]")
                return None
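
    # Expected URL shapes; the revisioned form is an assumption about
    # Draft.text_url (defined in .models), inferred from the fallback above:
    #     https://www.ietf.org/archive/id/<name>-<rev>.txt   (primary)
    #     https://www.ietf.org/archive/id/<name>.txt         (fallback)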

    def download_texts(self, drafts: list[Draft]) -> dict[str, str]:
        """Download full text for multiple drafts. Returns {name: text}."""
        results: dict[str, str] = {}
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            console=console,
        ) as progress:
            task = progress.add_task("Downloading draft texts...", total=len(drafts))
            for draft in drafts:
                text = self.download_full_text(draft)
                if text:
                    results[draft.name] = text
                progress.advance(task)
                time_mod.sleep(self.config.fetch_delay)
        console.print(f"Downloaded [bold green]{len(results)}[/] / {len(drafts)} texts")
        return results

    # --- Group resolution ---

    def resolve_group(self, group_uri: str) -> str:
        """Resolve a group API URI to a group acronym/name."""
        if not group_uri:
            return ""
        if group_uri in self._group_cache:
            return self._group_cache[group_uri]
        try:
            resp = self.client.get(
                f"https://datatracker.ietf.org{group_uri}", params={"format": "json"}
            )
            resp.raise_for_status()
            data = resp.json()
            name = data.get("acronym", data.get("name", ""))
            self._group_cache[group_uri] = name
            time_mod.sleep(self.config.fetch_delay)
            return name
        except httpx.HTTPError:
            return ""

    # --- Helpers ---

    def _api_obj_to_draft(self, obj: dict) -> Draft:
        return Draft(
            name=obj.get("name", ""),
            rev=obj.get("rev", "00"),
            title=obj.get("title", ""),
            abstract=(obj.get("abstract") or "").strip(),  # abstract may be null
            time=obj.get("time", ""),
            dt_id=obj.get("id"),
            pages=obj.get("pages"),
            words=obj.get("words"),
            group=None,  # Resolved lazily via resolve_group()
            group_uri=obj.get("group", ""),
            expires=obj.get("expires"),
            ad=obj.get("ad"),
            shepherd=obj.get("shepherd"),
            states=[s for s in (obj.get("states") or []) if isinstance(s, str)],
            fetched_at=datetime.now(timezone.utc).isoformat(),
        )