Files
ietf-draft-analyzer/src/ietf_analyzer/analyzer.py
Christian Nennemann e7527ad68e Fix remaining critical, high, and medium issues from 4-perspective review
Critical fixes:
- Fix rating clamp range 1-10 → 1-5 (actual scale)
- Add `ietf ideas convergence` command (SequenceMatcher at 0.75 threshold)
- Fix "628 cross-org ideas" → 130 (verified from current DB) across 8 files

Security fixes:
- Sanitize FTS5 query input (strip special chars + boolean operators)
- Add rate limiting (10 req/min/IP) on Claude-calling endpoints
- Change <path:name> → <string:name> on draft routes

Codebase fixes:
- Add Database context manager (__enter__/__exit__)
- Wire false_positive filtering into queries (exclude by default in web UI)
- Fix Post 3 arithmetic ("~300" → "~409" distinct proposals)

Content & licensing:
- Add MIT LICENSE file
- Add IPR/FRAND notes (BCP 79, RFC 8179) to Posts 03 and 07
- Qualify "4:1 safety ratio" with monthly variation in 6 remaining files
- Add "Data as of March 2026" freeze-date headers to all 10 blog posts
- Hedge causal language in Post 04

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 12:47:47 +01:00

939 lines
39 KiB
Python

"""Claude-based analysis — summarization, rating, categorization, overlap detection."""
from __future__ import annotations
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path
from dotenv import load_dotenv
# Load .env from project root (two levels up from this file, or cwd)
load_dotenv(Path(__file__).resolve().parent.parent.parent / ".env")
load_dotenv() # Also check cwd
import anthropic
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, MofNCompleteColumn
from .config import Config
from .db import Database
from .models import Draft, Rating
console = Console()
# Fixed category vocabulary offered to Claude in the rating prompts.
# Labels are deliberately short to keep prompt token counts down; the
# inline comments carry the longer human-readable gloss.
CATEGORIES_SHORT = [
    "A2A protocols",            # Agent-to-agent communication protocols
    "AI safety/alignment",      # AI safety / guardrails / alignment
    "ML traffic mgmt",          # ML-based traffic management / optimization
    "Autonomous netops",        # Autonomous network operations
    "Agent identity/auth",      # Identity / authentication for AI agents
    "Data formats/interop",     # Data formats / semantics for AI interop
    "Policy/governance",        # Policy / governance / ethical frameworks
    "Model serving/inference",  # AI model serving / inference protocols
    "Agent discovery/reg",      # Agent discovery / registration
    "Human-agent interaction",
    "Other AI/agent",
]
# ============================================================================
# METHODOLOGY NOTE — LLM-as-Judge Rating Approach
#
# Limitations of this rating system (see also data/reports/methodology.md):
#
# 1. ABSTRACT-ONLY: Ratings are generated from the draft's abstract (truncated
# to 2000 chars), not the full text. Maturity and overlap scores in
# particular may be unreliable when the abstract omits key details.
#
# 2. NO HUMAN CALIBRATION: No inter-rater reliability study has been performed.
# Claude is the sole judge; scores have not been validated against human
# expert ratings. Even a small calibration set (20-30 drafts) would
# substantially strengthen confidence in the ratings.
#
# 3. NO INTRA-RATER CONSISTENCY CHECK: The same draft is never re-rated to
# measure Claude's self-consistency. Prompt-hash caching means re-runs
# return cached results, so actual consistency is untested.
#
# 4. OVERLAP SCORE LIMITATION: The overlap dimension asks Claude whether a
# draft overlaps with other known work, but Claude rates each draft
# independently — it does not have access to the full corpus during rating.
# The overlap score reflects Claude's general knowledge, not corpus-specific
# similarity. Use embedding-based similarity for corpus-level overlap.
#
# 5. BATCH EFFECTS: Batch rating (BATCH_PROMPT) processes multiple drafts
# together. Position effects and comparison effects are uncontrolled.
# Abstracts are also truncated more aggressively (1500 chars vs 2000).
#
# 6. RELEVANCE INFLATION: The relevance distribution is right-skewed because
# keyword-matched drafts tend to score high on relevance by construction.
# The corpus likely contains 30-50 false positives from ambiguous keywords
# like "agent" (user agent), "autonomous" (autonomous systems), and
# "intelligent" (intelligent networking).
#
# INTERPRETATION: Scores should be treated as RELATIVE RANKINGS within this
# corpus, not as absolute quality measures. A score of 4.0 means "above
# average for this corpus," not "objectively high quality."
# ============================================================================
# Compact prompt — abstract only, saves ~10x tokens vs full-text
# Single-letter JSON keys ("n", "m", ...) keep Claude's output short and
# are expanded back to full field names by Analyzer._parse_rating().
RATE_PROMPT_COMPACT = """\
Rate this {doc_type}. JSON only.
{name} | {title} | {time} | {pages}pg
Abstract: {abstract}
Return JSON: {{"s":"2-3 sentence summary","n":<1-5>,"nn":"novelty note","m":<1-5>,"mn":"maturity note","o":<1-5>,"on":"overlap note","mo":<1-5>,"mon":"momentum note","r":<1-5>,"rn":"relevance note","c":["categories"]}}
Rating scale (use the FULL range 1-5, avoid clustering at 3-4):
- Novelty: 1=trivial/obvious extension, 2=incremental, 3=useful contribution, 4=notable originality, 5=genuinely novel approach
- Maturity: 1=problem statement only, 2=early sketch, 3=defined protocol/mechanism, 4=detailed spec with examples, 5=implementation-ready with test vectors
- Overlap: 1=unique approach, 2=minor similarities, 3=shares concepts with 1-2 drafts, 4=significant overlap, 5=near-duplicate of existing work
- Momentum: 1=inactive/abandoned, 2=single revision, 3=active development, 4=WG interest/adoption, 5=strong community momentum
- Relevance: 1=not about AI/agents (false positive), 2=tangentially related, 3=partially relevant, 4=directly relevant, 5=core AI agent topic
Categories: {categories}
JSON only, no fences."""

# Batch prompt — rate multiple drafts in one call
# Used by rate_batch(); the per-object "name" field lets results be matched
# back to drafts even if the model returns them out of order.
BATCH_PROMPT = """\
Rate each document below. Return a JSON array with one object per draft, in order.
{drafts_block}
Per-draft JSON: {{"name":"draft-name","s":"2-3 sentence summary","n":<1-5>,"nn":"novelty note","m":<1-5>,"mn":"maturity note","o":<1-5>,"on":"overlap with known drafts","mo":<1-5>,"mon":"momentum note","r":<1-5>,"rn":"relevance note","c":["categories"]}}
Scale: 1=very low..5=very high. Overlap: 1=unique,5=heavy overlap.
Categories: {categories}
Return ONLY a JSON array, no fences."""
# Free-form comparison prompt used by compare_drafts() — no JSON contract;
# the raw text response is returned to the caller.
COMPARE_PROMPT = """\
Compare these documents — overlaps, unique ideas, complementary vs competing vs redundant.
{drafts_section}
Be specific about concrete mechanisms and design choices."""

# Single-draft idea extraction — gets the abstract plus a full-text excerpt
# (first 3000 chars when available; see extract_ideas()).
EXTRACT_IDEAS_PROMPT = """\
Extract discrete technical ideas and mechanisms from this {doc_type}.
Return a JSON array. Each element: {{"title":"short name","description":"1-2 sentences","type":"mechanism|protocol|pattern|requirement|architecture|extension"}}
{name} | {title} | {pages}pg
Abstract: {abstract}
{text_excerpt}
Return 1-4 ideas. Extract only TOP-LEVEL novel contributions. Do NOT list sub-features, optimizations, variants, or extensions as separate ideas. If a draft defines one protocol with multiple features, that is ONE idea, not several. Each idea must be independently novel — could it be its own draft? If not, merge it with the parent idea. Only include CONCRETE, NOVEL technical contributions — not restatements of the abstract or general goals. If the draft has no substantive technical ideas (e.g. it is a problem statement, administrative document, or off-topic), return an empty array [].
JSON array only, no fences."""

# Batched idea extraction — abstract-only; response is a JSON object keyed
# by document name (see extract_ideas_batch()).
BATCH_IDEAS_PROMPT = """\
Extract ideas from each document below. Return a JSON object mapping document name -> array of ideas.
Per idea: {{"title":"short name","description":"1 sentence","type":"mechanism|protocol|pattern|requirement|architecture|extension"}}
{drafts_block}
1-4 ideas per draft. Extract only TOP-LEVEL novel contributions. Do NOT list sub-features, optimizations, variants, or extensions as separate ideas. If a draft defines one protocol with multiple features, that is ONE idea, not several. Each idea must be independently novel — could it be its own draft? If not, merge it with the parent idea. Only include CONCRETE, NOVEL technical contributions. If a draft has no substantive ideas, map it to an empty array. Do not pad with restatements of the abstract.
Return ONLY a JSON object like {{"draft-name":[...], ...}}, no fences."""
# ============================================================================
# GAP ANALYSIS METHODOLOGY NOTE
#
# This is a SINGLE-SHOT LLM analysis: Claude receives compressed statistics
# about the landscape (category counts, top ideas, overlap summary) and
# generates gaps in one pass. Limitations:
#
# 1. No systematic coverage analysis against a reference taxonomy. A rigorous
# approach would compare the corpus against an explicit reference architecture
# (e.g., NIST AI RMF, FIPA agent platform model, or a custom agent ecosystem
# reference model) to identify gaps systematically rather than relying on
# Claude's general knowledge.
#
# 2. The overlap_summary fed to the prompt is category-level only — it does not
# tell Claude which specific technical areas overlap within categories.
#
# 3. Evidence quality varies: some gaps cite specific data ("only N drafts"),
# others are based on Claude's inference about what is missing.
#
# 4. Gap severity is assigned by Claude in a single pass without defined
# thresholds (what makes "critical" vs "high" is implicit).
#
# Strengthening options: ground against a reference architecture, run multiple
# independent gap analyses and intersect results, have domain experts validate.
# ============================================================================
# Landscape-level gap identification; filled and sent by gap_analysis().
GAP_ANALYSIS_PROMPT = """\
You are analyzing the landscape of {total} IETF Internet-Drafts related to AI agents and autonomous systems.
## Categories and Draft Counts
{category_summary}
## Most Common Technical Ideas
{top_ideas}
## Known Overlap Clusters (groups of highly similar drafts)
{overlap_summary}
Identify 8-15 GAPS — areas, problems, or technical challenges NOT adequately addressed by existing drafts.
Return a JSON array:
[{{"topic":"short topic name","description":"2-3 sentence description","category":"closest category or new","severity":"critical|high|medium|low","evidence":"what suggests this gap matters"}}]
Focus on:
1. Problems mentioned but not solved
2. Missing infrastructure pieces
3. Security/privacy/safety issues not addressed
4. Interoperability gaps between competing proposals
5. Real-world deployment concerns ignored
JSON array only, no fences."""

# Per-idea novelty scoring used by score_idea_novelty(); the response maps
# idea ID -> integer score 1-5.
SCORE_NOVELTY_PROMPT = """\
Rate each idea's novelty/originality on a 1-5 scale.
1 = Generic building block anyone would include (e.g. "Agent Gateway", "Certificate Authority")
2 = Obvious extension of existing work, minimal originality
3 = Useful and relevant but expected given the problem space
4 = Interesting contribution with some original thinking
5 = Genuinely novel mechanism, protocol, or architectural insight
Ideas to score:
{ideas_block}
Return ONLY a JSON object mapping idea ID to score, like {{"123": 3, "456": 1, ...}}.
No fences, no explanation."""
def _prompt_hash(text: str) -> str:
return hashlib.sha256(text.encode()).hexdigest()[:16]
def _doc_type_label(source: str) -> str:
"""Return a human-readable document type based on source."""
labels = {
"ietf": "IETF draft",
"w3c": "W3C specification",
}
return labels.get(source, f"{source} document")
class Analyzer:
def __init__(self, config: Config | None = None, db: Database | None = None):
    """Create an analyzer bound to a config, database, and Anthropic client.

    Args:
        config: Project configuration; loaded from disk when omitted.
        db: Database handle; opened from the config when omitted.

    Raises:
        SystemExit: If the Anthropic client cannot be constructed
            (typically a missing ANTHROPIC_API_KEY).
    """
    self.config = config or Config.load()
    self.db = db or Database(self.config)
    try:
        # Anthropic() reads ANTHROPIC_API_KEY from the environment; any
        # construction failure is treated as "no usable key".
        self.client = anthropic.Anthropic()
    except Exception:
        console.print(
            "[red bold]No Anthropic API key found.[/]\n"
            "Set ANTHROPIC_API_KEY environment variable or run:\n"
            " export ANTHROPIC_API_KEY=sk-ant-..."
        )
        raise SystemExit(1)
@staticmethod
def _clamp_rating(value, default: int = 3, lo: int = 1, hi: int = 5) -> int:
"""Clamp a rating value to [lo, hi] integers."""
try:
return max(lo, min(hi, int(value)))
except (ValueError, TypeError):
return default
def _parse_rating(self, draft_name: str, data: dict) -> Rating:
    """Build a Rating record from compact (or long-form) JSON keys."""

    def field(short: str, long: str, fallback):
        # The prompts request the short keys; long-form keys are also
        # accepted so either spelling in a response parses.
        return data.get(short, data.get(long, fallback))

    return Rating(
        draft_name=draft_name,
        novelty=self._clamp_rating(field("n", "novelty", 3)),
        maturity=self._clamp_rating(field("m", "maturity", 3)),
        overlap=self._clamp_rating(field("o", "overlap", 3)),
        momentum=self._clamp_rating(field("mo", "momentum", 3)),
        relevance=self._clamp_rating(field("r", "relevance", 3)),
        summary=field("s", "summary", ""),
        novelty_note=field("nn", "novelty_note", ""),
        maturity_note=field("mn", "maturity_note", ""),
        overlap_note=field("on", "overlap_note", ""),
        momentum_note=field("mon", "momentum_note", ""),
        relevance_note=field("rn", "relevance_note", ""),
        categories=field("c", "categories", []),
        rated_at=datetime.now(timezone.utc).isoformat(),
    )
def _call_claude(self, prompt: str, max_tokens: int = 512, cheap: bool = False) -> tuple[str, int, int]:
    """Call Claude and return (text, input_tokens, output_tokens).

    Args:
        prompt: Full user-role prompt text.
        max_tokens: Response token cap passed to the API.
        cheap: If True, use claude_model_cheap (Haiku) for lower cost.

    Raises:
        anthropic.APIError: On API failures.
        IndexError: If the response carries no content blocks (callers
            such as rate_draft catch this alongside parse errors).
    """
    model = self.config.claude_model_cheap if cheap else self.config.claude_model
    resp = self.client.messages.create(
        model=model,
        max_tokens=max_tokens,
        messages=[{"role": "user", "content": prompt}],
    )
    # Single user message in, single text block expected back.
    text = resp.content[0].text.strip()
    return text, resp.usage.input_tokens, resp.usage.output_tokens
def _extract_json(self, text: str) -> str:
"""Strip markdown fences if present."""
text = text.strip()
if text.startswith("```"):
text = text.split("\n", 1)[1]
if text.rstrip().endswith("```"):
text = text.rstrip()[:-3]
return text.strip()
def rate_draft(self, draft_name: str, use_cache: bool = True) -> Rating | None:
    """Analyze and rate a single draft.

    Builds the compact abstract-only prompt, consults the prompt-hash
    response cache, and falls back to a live Claude call. On success the
    rating is upserted and the draft's category list is synced from it.

    Args:
        draft_name: Primary key of the draft in the database.
        use_cache: If True, reuse a cached response for an identical prompt.

    Returns:
        The stored Rating, or None if the draft is unknown or the
        call/parse failed.
    """
    draft = self.db.get_draft(draft_name)
    if draft is None:
        console.print(f"[red]Draft not found: {draft_name}[/]")
        return None
    prompt = RATE_PROMPT_COMPACT.format(
        doc_type=_doc_type_label(draft.source),
        name=draft.name, title=draft.title, time=draft.date,
        pages=draft.pages or "?",
        abstract=draft.abstract[:2000],  # abstract-only; see methodology note
        categories=", ".join(CATEGORIES_SHORT),
    )
    phash = _prompt_hash(prompt)
    # Check cache
    if use_cache:
        cached = self.db.get_cached_response(draft_name, phash)
        if cached:
            try:
                data = json.loads(cached)
                rating = self._parse_rating(draft_name, data)
                self.db.upsert_rating(rating)
                # Keep the draft's category list in sync with its rating.
                draft.categories = rating.categories
                self.db.upsert_draft(draft)
                return rating
            except (json.JSONDecodeError, KeyError):
                pass  # Re-analyze if cache is corrupt
    try:
        text, in_tok, out_tok = self._call_claude(prompt, max_tokens=512)
        text = self._extract_json(text)
        data = json.loads(text)
        # Cache the raw response
        self.db.cache_response(
            draft_name, phash, self.config.claude_model,
            prompt, text, in_tok, out_tok,
        )
    except (json.JSONDecodeError, anthropic.APIError, IndexError, KeyError) as e:
        console.print(f"[red]Failed {draft_name}: {e}[/]")
        return None
    rating = self._parse_rating(draft_name, data)
    self.db.upsert_rating(rating)
    draft.categories = rating.categories
    self.db.upsert_draft(draft)
    return rating
def rate_batch(self, drafts: list[Draft], batch_size: int = 5) -> int:
    """Rate multiple drafts in batched API calls to save tokens.

    Args:
        drafts: Drafts to rate (already loaded from the database).
        batch_size: Number of drafts packed into each prompt.

    Returns:
        Number of ratings stored. On a batch-level parse/API failure the
        whole batch is retried draft-by-draft via rate_draft().
    """
    count = 0
    for i in range(0, len(drafts), batch_size):
        batch = drafts[i:i + batch_size]
        # Build batch prompt — abstracts truncated harder here (1500 chars)
        # than in the single-draft prompt (2000) to keep batches compact.
        drafts_block = ""
        for d in batch:
            drafts_block += f"\n---\n{d.name} | {d.title} | {d.date} | {d.pages or '?'}pg\nAbstract: {d.abstract[:1500]}\n"
        prompt = BATCH_PROMPT.format(
            drafts_block=drafts_block,
            categories=", ".join(CATEGORIES_SHORT),
        )
        phash = _prompt_hash(prompt)
        try:
            text, in_tok, out_tok = self._call_claude(
                prompt, max_tokens=400 * len(batch)
            )
            text = self._extract_json(text)
            results = json.loads(text)
            if not isinstance(results, list):
                results = [results]
            for j, data in enumerate(results):
                # Prefer the model-reported name; fall back to position.
                draft_name = data.get("name", batch[j].name if j < len(batch) else None)
                if not draft_name:
                    continue
                # Cache each result individually; the batch's token counts
                # are split evenly across results.
                self.db.cache_response(
                    draft_name, _prompt_hash(f"batch-{phash}-{draft_name}"),
                    self.config.claude_model, f"batch[{i}]", json.dumps(data),
                    in_tok // len(results), out_tok // len(results),
                )
                rating = self._parse_rating(draft_name, data)
                self.db.upsert_rating(rating)
                draft = self.db.get_draft(draft_name)
                if draft:
                    draft.categories = rating.categories
                    self.db.upsert_draft(draft)
                count += 1
        except (json.JSONDecodeError, anthropic.APIError) as e:
            console.print(f"[red]Batch {i//batch_size+1} failed: {e}[/]")
            # Fallback: rate individually
            for d in batch:
                r = self.rate_draft(d.name)
                if r:
                    count += 1
    return count
def rate_all_unrated(self, limit: int = 300, batch_size: int = 5) -> int:
    """Rate all drafts that haven't been rated yet, using batching.

    Args:
        limit: Maximum number of unrated drafts to process in this run.
        batch_size: Drafts per Claude call (forwarded to rate_batch).

    Returns:
        Number of drafts successfully rated.
    """
    unrated = self.db.unrated_drafts(limit=limit)
    if not unrated:
        console.print("All drafts already rated.")
        return 0
    console.print(f"Rating [bold]{len(unrated)}[/] drafts in batches of {batch_size}...")
    count = 0
    failures: list[tuple[str, str]] = []
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        console=console,
    ) as progress:
        task = progress.add_task("Analyzing...", total=len(unrated))
        for i in range(0, len(unrated), batch_size):
            batch = unrated[i:i + batch_size]
            # Short name fragments keep the progress line readable.
            names = ", ".join(d.name.split("-")[-1][:12] for d in batch)
            progress.update(task, description=f"Batch: {names}")
            try:
                n = self.rate_batch(batch, batch_size=batch_size)
                count += n
            except Exception as e:
                # rate_batch already falls back per-draft; anything that
                # still escapes is recorded against every draft in the batch.
                batch_names = [d.name for d in batch]
                for bn in batch_names:
                    failures.append((bn, str(e)))
                console.print(f"[red]Batch failed: {e}[/]")
            progress.advance(task, advance=len(batch))
    in_tok, out_tok = self.db.total_tokens_used()
    total_attempted = len(unrated)
    console.print(
        f"Rated [bold green]{count}[/] drafts "
        f"| Total tokens used: {in_tok:,} in + {out_tok:,} out"
    )
    if failures:
        console.print(
            f"[yellow]Processed {count}/{total_attempted} drafts, "
            f"{len(failures)} failure(s):[/]"
        )
        for name, err in failures[:20]:
            console.print(f" [red]{name}[/]: {err}")
    return count
def extract_ideas(self, draft_name: str, use_cache: bool = True) -> list[dict] | None:
    """Extract technical ideas from a single draft.

    Uses the first 3000 chars of full text when available (the batched
    path is abstract-only). Results are cached by prompt hash and stored
    via db.insert_ideas.

    Args:
        draft_name: Primary key of the draft.
        use_cache: If True, reuse a cached response for an identical prompt.

    Returns:
        The list of idea dicts, or None if the draft is unknown or the
        call/parse failed.
    """
    draft = self.db.get_draft(draft_name)
    if draft is None:
        console.print(f"[red]Draft not found: {draft_name}[/]")
        return None
    text_excerpt = ""
    if draft.full_text:
        text_excerpt = draft.full_text[:3000]
    prompt = EXTRACT_IDEAS_PROMPT.format(
        doc_type=_doc_type_label(draft.source),
        name=draft.name, title=draft.title,
        pages=draft.pages or "?",
        abstract=draft.abstract[:2000],
        text_excerpt=text_excerpt,
    )
    # "ideas-" prefix keeps idea-extraction cache entries distinct from
    # rating cache entries for the same draft.
    phash = _prompt_hash("ideas-" + prompt)
    if use_cache:
        cached = self.db.get_cached_response(draft_name, phash)
        if cached:
            try:
                ideas = json.loads(cached)
                if isinstance(ideas, list):
                    self.db.insert_ideas(draft_name, ideas)
                    return ideas
            except (json.JSONDecodeError, KeyError):
                pass  # Corrupt cache entry — re-extract below.
    try:
        text, in_tok, out_tok = self._call_claude(prompt, max_tokens=1024)
        text = self._extract_json(text)
        ideas = json.loads(text)
        if not isinstance(ideas, list):
            ideas = [ideas]
        self.db.cache_response(
            draft_name, phash, self.config.claude_model,
            prompt, text, in_tok, out_tok,
        )
        self.db.insert_ideas(draft_name, ideas)
        return ideas
    except (json.JSONDecodeError, anthropic.APIError) as e:
        console.print(f"[red]Failed ideas for {draft_name}: {e}[/]")
        return None
def extract_ideas_batch(self, draft_names: list[str], cheap: bool = True) -> int:
    """Extract ideas from multiple drafts in a single API call.

    Uses batching to share prompt overhead — ~5x fewer API calls,
    ~3x fewer tokens than individual extraction.

    Args:
        draft_names: Names of drafts to process; unknown names are skipped.
        cheap: If True, use the cheap (Haiku) model.

    Returns:
        Number of drafts that yielded at least one idea (0 on failure).
    """
    drafts = []
    for name in draft_names:
        d = self.db.get_draft(name)
        if d:
            drafts.append(d)
    if not drafts:
        return 0
    # Build compact batch block — abstract only (no full text for batch)
    drafts_block = ""
    for d in drafts:
        drafts_block += f"\n---\n{d.name} | {d.title}\nAbstract: {d.abstract[:800]}\n"
    prompt = BATCH_IDEAS_PROMPT.format(drafts_block=drafts_block)
    phash = _prompt_hash(prompt)
    try:
        text, in_tok, out_tok = self._call_claude(
            prompt, max_tokens=400 * len(drafts), cheap=cheap
        )
        text = self._extract_json(text)
        results = json.loads(text)
        if not isinstance(results, dict):
            # Fallback: if it returned a list, try to match by order
            if isinstance(results, list) and len(results) == len(drafts):
                results = {d.name: r for d, r in zip(drafts, results)}
            else:
                return 0
        count = 0
        for d in drafts:
            ideas = results.get(d.name, [])
            if not isinstance(ideas, list):
                ideas = [ideas] if ideas else []
            # Cache per draft; token counts are split evenly across drafts.
            self.db.cache_response(
                d.name, _prompt_hash(f"batch-ideas-{phash}-{d.name}"),
                self.config.claude_model_cheap if cheap else self.config.claude_model,
                f"batch-ideas[{d.name}]", json.dumps(ideas),
                in_tok // len(drafts), out_tok // len(drafts),
            )
            self.db.insert_ideas(d.name, ideas)
            if ideas:
                count += 1
        return count
    except (json.JSONDecodeError, anthropic.APIError) as e:
        console.print(f"[red]Batch ideas failed: {e}[/]")
        return 0
def extract_all_ideas(self, limit: int = 300, batch_size: int = 5, cheap: bool = True) -> int:
    """Extract ideas from all drafts that don't have them yet.

    Args:
        limit: Maximum number of drafts to process in this run.
        batch_size: Number of drafts per API call (default 5).
            Set to 1 to use individual calls with full text.
        cheap: Use Haiku model for ~10x lower cost (default True).

    Returns:
        Number of drafts that yielded at least one idea.
    """
    missing = self.db.drafts_without_ideas(limit=limit)
    if not missing:
        console.print("All drafts already have extracted ideas.")
        return 0
    model_label = "Haiku" if cheap else "Sonnet"
    if batch_size > 1:
        console.print(
            f"Extracting ideas from [bold]{len(missing)}[/] drafts "
            f"(batches of {batch_size}, {model_label})..."
        )
    else:
        console.print(f"Extracting ideas from [bold]{len(missing)}[/] drafts ({model_label})...")
    count = 0
    failures: list[tuple[str, str]] = []
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        console=console,
    ) as progress:
        task = progress.add_task("Extracting ideas...", total=len(missing))
        if batch_size > 1:
            # Batched path: abstract-only prompts, one call per batch.
            for i in range(0, len(missing), batch_size):
                batch = missing[i:i + batch_size]
                names = ", ".join(n.split("-")[-1][:10] for n in batch)
                progress.update(task, description=f"Batch: {names}")
                try:
                    n = self.extract_ideas_batch(batch, cheap=cheap)
                    count += n
                except Exception as e:
                    for bn in batch:
                        failures.append((bn, str(e)))
                    console.print(f"[red]Batch failed: {e}[/]")
                progress.advance(task, advance=len(batch))
        else:
            # Individual path: one call per draft, full-text excerpt included.
            for name in missing:
                progress.update(task, description=f"Ideas: {name.split('-')[-1][:15]}")
                try:
                    result = self.extract_ideas(name)
                    if result:
                        count += 1
                except Exception as e:
                    failures.append((name, str(e)))
                    console.print(f"[red]Failed {name}: {e}[/]")
                progress.advance(task)
    total_attempted = len(missing)
    in_tok, out_tok = self.db.total_tokens_used()
    console.print(
        f"Extracted ideas from [bold green]{count}[/] drafts "
        f"({self.db.idea_count()} total ideas) "
        f"| Tokens: {in_tok:,} in + {out_tok:,} out"
    )
    if failures:
        console.print(
            f"[yellow]Processed {count}/{total_attempted} drafts, "
            f"{len(failures)} failure(s):[/]"
        )
        for name, err in failures[:20]:
            console.print(f" [red]{name}[/]: {err}")
    return count
def gap_analysis(self) -> list[dict]:
    """Analyze the full landscape and identify gaps.

    Compresses the corpus into category counts, top idea titles, and a
    coarse overlap summary, then asks Claude for 8-15 gaps in a single
    pass (see the single-shot methodology note above).

    Returns:
        List of gap dicts (topic/description/category/severity/evidence),
        or [] on failure.
    """
    # Build compressed landscape summary
    pairs = self.db.drafts_with_ratings(limit=500)
    total = self.db.count_drafts()
    # Category summary
    from collections import defaultdict
    cat_counts: dict[str, int] = defaultdict(int)
    for _, rating in pairs:
        for c in rating.categories:
            cat_counts[c] += 1
    category_summary = "\n".join(f"- {c}: {n} drafts" for c, n in
                                 sorted(cat_counts.items(), key=lambda x: x[1], reverse=True))
    # Top ideas (if available)
    all_ideas = self.db.all_ideas()
    idea_freq: dict[str, int] = defaultdict(int)
    for idea in all_ideas:
        idea_freq[idea["title"]] += 1
    top_ideas_list = sorted(idea_freq.items(), key=lambda x: x[1], reverse=True)[:20]
    if top_ideas_list:
        top_ideas = "\n".join(f"- {title} ({count} drafts)" for title, count in top_ideas_list)
    else:
        top_ideas = "(No idea extraction data available yet)"
    # Overlap summary — use clusters report if it exists
    # NOTE(review): this summary is category-level only; it asserts "high
    # internal overlap" for the biggest categories rather than measuring it.
    overlap_summary = "Multiple clusters of near-duplicate drafts exist, particularly in:\n"
    for c, n in sorted(cat_counts.items(), key=lambda x: x[1], reverse=True)[:5]:
        overlap_summary += f"- {c} ({n} drafts, high internal overlap)\n"
    prompt = GAP_ANALYSIS_PROMPT.format(
        total=total,
        category_summary=category_summary,
        top_ideas=top_ideas,
        overlap_summary=overlap_summary,
    )
    phash = _prompt_hash(prompt)
    # Check cache — "_landscape_" is the sentinel name for corpus-level calls.
    cached = self.db.get_cached_response("_landscape_", phash)
    if cached:
        try:
            gaps = json.loads(cached)
            if isinstance(gaps, list):
                self.db.insert_gaps(gaps)
                return gaps
        except (json.JSONDecodeError, KeyError):
            pass  # Corrupt cache entry — regenerate below.
    try:
        text, in_tok, out_tok = self._call_claude(prompt, max_tokens=4096)
        text = self._extract_json(text)
        gaps = json.loads(text)
        if not isinstance(gaps, list):
            gaps = [gaps]
        self.db.cache_response(
            "_landscape_", phash, self.config.claude_model,
            prompt, text, in_tok, out_tok,
        )
        self.db.insert_gaps(gaps)
        return gaps
    except (json.JSONDecodeError, anthropic.APIError) as e:
        console.print(f"[red]Gap analysis failed: {e}[/]")
        return []
def compare_drafts(self, draft_names: list[str], use_cache: bool = True) -> dict:
    """Compare multiple drafts and return structured comparison.

    Returns dict with keys: text, drafts (list of names that were compared),
    or a dict with key 'error' on failure.
    """
    valid_names: list[str] = []
    sections: list[str] = []
    for name in draft_names:
        draft = self.db.get_draft(name)
        if draft is None:
            console.print(f"[yellow]Skipping unknown draft: {name}[/]")
        else:
            valid_names.append(name)
            sections.append(f"### {draft.title}\n**{name}**\n{draft.abstract}")

    if len(sections) < 2:
        return {"error": "Need at least 2 valid drafts to compare.", "drafts": valid_names}

    prompt = COMPARE_PROMPT.format(drafts_section="\n\n---\n\n".join(sections))
    phash = _prompt_hash(prompt)
    # Cache key is order-independent so any permutation of the same set hits.
    cache_key = "_compare_" + "_".join(sorted(valid_names))

    if use_cache:
        cached = self.db.get_cached_response(cache_key, phash)
        if cached:
            return {"text": cached, "drafts": valid_names}

    try:
        text, in_tok, out_tok = self._call_claude(prompt, max_tokens=2048)
    except anthropic.APIError as e:
        return {"error": f"API error: {e}", "drafts": valid_names}

    # Cache the result for future identical comparisons.
    self.db.cache_response(
        cache_key, phash, self.config.claude_model,
        prompt, text, in_tok, out_tok,
    )
    return {"text": text, "drafts": valid_names}
def dedup_ideas(self, threshold: float = 0.85, dry_run: bool = True,
                draft_name: str | None = None) -> dict:
    """Deduplicate ideas within each draft using embedding similarity.

    For each draft, computes pairwise cosine similarity of idea embeddings
    (via the configured Ollama embedding model). Ideas above the threshold
    are merged (keeping the one with the longer description).

    Args:
        threshold: Cosine similarity threshold for merging (default 0.85).
        dry_run: If True, report what would be merged without deleting.
        draft_name: If provided, only dedup ideas for this draft.

    Returns:
        Dict with keys: total_before, total_after, merged_count, examples.
        In dry-run mode the counts describe what *would* happen.
    """
    # Local imports: numpy and ollama are only needed by this command.
    import numpy as np
    import ollama as ollama_lib
    client = ollama_lib.Client(host=self.config.ollama_url)
    # Get list of drafts to process
    if draft_name:
        draft_names = [draft_name]
    else:
        rows = self.db.conn.execute(
            "SELECT DISTINCT draft_name FROM ideas ORDER BY draft_name"
        ).fetchall()
        draft_names = [r["draft_name"] for r in rows]
    total_before = 0
    merged_count = 0
    examples = []
    ids_to_delete = []
    for dname in draft_names:
        ideas = self.db.get_ideas_for_draft(dname)
        if len(ideas) < 2:
            # Nothing to pair up; still counted toward the total.
            total_before += len(ideas)
            continue
        total_before += len(ideas)
        # Embed each idea: "title: description"
        texts = [f"{idea['title']}: {idea['description']}" for idea in ideas]
        try:
            resp = client.embed(
                model=self.config.ollama_embed_model, input=texts
            )
            vectors = [
                np.array(v, dtype=np.float32)
                for v in resp["embeddings"]
            ]
        except Exception as e:
            # Best-effort: skip this draft if embedding fails.
            console.print(f"[red]Failed to embed ideas for {dname}: {e}[/]")
            continue
        # Track which ideas are already marked for deletion in this draft
        deleted_in_draft = set()
        # Compare all pairs within this draft
        for i in range(len(ideas)):
            if ideas[i]["id"] in deleted_in_draft:
                continue
            for j in range(i + 1, len(ideas)):
                if ideas[j]["id"] in deleted_in_draft:
                    continue
                # Cosine similarity
                dot = np.dot(vectors[i], vectors[j])
                norm = np.linalg.norm(vectors[i]) * np.linalg.norm(vectors[j])
                sim = float(dot / norm) if norm > 0 else 0.0
                if sim >= threshold:
                    # Keep the idea with the longer description
                    keep = ideas[i] if len(ideas[i]["description"]) >= len(ideas[j]["description"]) else ideas[j]
                    drop = ideas[j] if keep is ideas[i] else ideas[i]
                    ids_to_delete.append(drop["id"])
                    deleted_in_draft.add(drop["id"])
                    merged_count += 1
                    # Keep a small sample of merges for reporting.
                    if len(examples) < 20:
                        examples.append({
                            "draft": dname,
                            "keep": keep["title"],
                            "drop": drop["title"],
                            "similarity": round(sim, 3),
                        })
    if not dry_run:
        for idea_id in ids_to_delete:
            self.db.delete_idea(idea_id)
    total_after = total_before - merged_count
    return {
        "total_before": total_before,
        "total_after": total_after,
        "merged_count": merged_count,
        "examples": examples,
    }
def score_idea_novelty(self, batch_size: int = 20, cheap: bool = True) -> dict:
    """Score all unscored ideas for novelty (1-5) using Claude.

    Fix vs previous revision: the cached-response branch stored scores
    without clamping, while the fresh-response branch clamped to 1-5.
    Cached raw responses are unvalidated text, so an out-of-range cached
    score could leak into the database. Both paths now clamp identically,
    and a TypeError from a null cached value falls through to re-scoring
    instead of propagating.

    Args:
        batch_size: Number of ideas per API call (default 20).
        cheap: Use Haiku model for lower cost (default True).

    Returns:
        Dict with keys: scored_count, avg_score, distribution.
    """
    unscored = self.db.ideas_with_drafts(unscored_only=True)
    if not unscored:
        console.print("All ideas already scored.")
        return {"scored_count": 0, "avg_score": 0.0, "distribution": {}}
    model_label = "Haiku" if cheap else "Sonnet"
    console.print(
        f"Scoring [bold]{len(unscored)}[/] ideas for novelty "
        f"(batches of {batch_size}, {model_label})..."
    )
    scored_count = 0
    all_scores: list[int] = []
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        console=console,
    ) as progress:
        task = progress.add_task("Scoring novelty...", total=len(unscored))
        for i in range(0, len(unscored), batch_size):
            batch = unscored[i:i + batch_size]
            progress.update(task, description=f"Batch {i // batch_size + 1}")
            # Build ideas block for prompt
            ideas_block = ""
            for idea in batch:
                ideas_block += (
                    f"\n---\nID: {idea['id']}\n"
                    f"Draft: {idea['draft_title']}\n"
                    f"Idea: {idea['title']}\n"
                    f"Description: {idea['description']}\n"
                )
            prompt = SCORE_NOVELTY_PROMPT.format(ideas_block=ideas_block)
            phash = _prompt_hash(prompt)
            # Check cache. The cached text is the raw model response, so
            # scores must be clamped here exactly as in the fresh path.
            cached = self.db.get_cached_response("_novelty_score_", phash)
            if cached:
                try:
                    scores = json.loads(cached)
                    if isinstance(scores, dict):
                        batch_scores = {
                            int(k): max(1, min(5, int(v)))
                            for k, v in scores.items()
                        }
                        self.db.update_idea_scores_bulk(batch_scores)
                        scored_count += len(batch_scores)
                        all_scores.extend(batch_scores.values())
                        progress.advance(task, advance=len(batch))
                        continue
                except (json.JSONDecodeError, KeyError, ValueError, TypeError):
                    pass  # Corrupt cache entry — fall through and re-score.
            try:
                text, in_tok, out_tok = self._call_claude(
                    prompt, max_tokens=50 * len(batch), cheap=cheap
                )
                text = self._extract_json(text)
                scores = json.loads(text)
                if not isinstance(scores, dict):
                    console.print(f"[red]Batch {i // batch_size + 1}: unexpected response format[/]")
                    progress.advance(task, advance=len(batch))
                    continue
                # Cache the raw response
                self.db.cache_response(
                    "_novelty_score_", phash,
                    self.config.claude_model_cheap if cheap else self.config.claude_model,
                    prompt, text, in_tok, out_tok,
                )
                # Parse and store scores; malformed entries are skipped.
                batch_scores = {}
                for k, v in scores.items():
                    try:
                        idea_id = int(k)
                        score = max(1, min(5, int(v)))
                        batch_scores[idea_id] = score
                    except (ValueError, TypeError):
                        continue
                self.db.update_idea_scores_bulk(batch_scores)
                scored_count += len(batch_scores)
                all_scores.extend(batch_scores.values())
            except (json.JSONDecodeError, anthropic.APIError) as e:
                console.print(f"[red]Batch {i // batch_size + 1} failed: {e}[/]")
            progress.advance(task, advance=len(batch))
    # Build distribution
    distribution: dict[int, int] = {}
    for s in all_scores:
        distribution[s] = distribution.get(s, 0) + 1
    avg = sum(all_scores) / len(all_scores) if all_scores else 0.0
    in_tok, out_tok = self.db.total_tokens_used()
    console.print(
        f"Scored [bold green]{scored_count}[/] ideas "
        f"(avg: {avg:.1f}) | Tokens: {in_tok:,} in + {out_tok:,} out"
    )
    return {"scored_count": scored_count, "avg_score": round(avg, 2), "distribution": distribution}