Split cli.py (3,438 LOC) into modular command packages

Move 98 CLI commands from monolithic cli.py into organized modules:
- commands/common.py: shared utilities (console, pass_cfg_db, _get_config)
- commands/fetch.py: fetch, search, list, show, annotate, classify, authors, network
- commands/analysis.py: analyze, ask, compare, embed, ideas, gaps, refs, trends, etc.
- commands/reports.py: report group, viz group, wg group, export
- commands/admin.py: config, pipeline, observatory, monitor, auto-heal
- commands/proposals.py: draft-gen, intake

cli.py is now a slim 30-line entry point that registers all modules.
All command names, options, and behavior preserved exactly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-09 03:40:57 +01:00
parent 3fb17100d7
commit b10572c628
8 changed files with 3551 additions and 3406 deletions

View File

@@ -0,0 +1,913 @@
"""Config, pipeline, observatory, monitor, and auto-heal commands."""
from __future__ import annotations
from pathlib import Path
import click
from rich.table import Table
from .common import console, pass_cfg_db, _get_config
from ..config import Config
from ..db import Database
def register(main):
"""Register all admin commands with the main CLI group."""
main.add_command(config_cmd)
main.add_command(pipeline)
main.add_command(observatory)
main.add_command(monitor)
main.add_command(auto)
# ── config ───────────────────────────────────────────────────────────────────
@click.command("config")
@click.option("--set", "set_key", nargs=2, help="Set a config key (e.g. --set claude_model claude-opus-4-20250514)")
@click.option("--show", is_flag=True, help="Show effective config with env var sources noted")
def config_cmd(set_key: tuple[str, str] | None, show: bool):
"""Show or modify configuration."""
from dataclasses import asdict
cfg = _get_config()
if set_key:
key, value = set_key
if hasattr(cfg, key):
# Coerce types
current = getattr(cfg, key)
if isinstance(current, float):
value = float(value)
elif isinstance(current, int):
value = int(value)
elif isinstance(current, list):
import json
value = json.loads(value)
setattr(cfg, key, value)
cfg.save()
console.print(f"Set [bold]{key}[/] = {value}")
else:
console.print(f"[red]Unknown config key: {key}[/]")
else:
from dataclasses import asdict
env_sources = cfg.env_sources()
for key, val in asdict(cfg).items():
source_note = ""
if key in env_sources:
source_note = f" [yellow](from ${env_sources[key]})[/]"
console.print(f" [bold]{key}:[/] {val}{source_note}")
if env_sources:
console.print(f"\n [dim]({len(env_sources)} value(s) overridden by environment variables)[/]")
# Note about ANTHROPIC_API_KEY
import os
if os.environ.get("ANTHROPIC_API_KEY"):
console.print(" [dim]ANTHROPIC_API_KEY is set in environment[/]")
else:
console.print(" [dim]ANTHROPIC_API_KEY is NOT set in environment[/]")
# ── pipeline ────────────────────────────────────────────────────────────────
@click.group()
def pipeline():
"""Gap-to-Draft generation pipeline."""
pass
@pipeline.command("context")
@click.argument("gap_topic")
def pipeline_context(gap_topic: str):
"""Preview assembled context for a gap topic (dry run)."""
from ..pipeline import ContextBuilder
cfg = _get_config()
db = Database(cfg)
try:
builder = ContextBuilder(cfg, db)
ctx = builder.build_context(gap_topic)
console.print(f"\n[bold]Context for gap: {gap_topic}[/]\n")
gap = ctx.get("gap")
if gap:
console.print(f"[cyan]Gap:[/] {gap.get('topic', '?')}")
console.print(f" {gap.get('description', '')[:200]}")
console.print(f" Severity: {gap.get('severity', '?')}")
ideas = ctx.get("ideas", [])
console.print(f"\n[cyan]Convergent ideas:[/] {len(ideas)}")
for idea in ideas[:10]:
console.print(f" - {idea.get('title', '?')}: {idea.get('description', '')[:80]}")
rfcs = ctx.get("rfc_foundations", [])
console.print(f"\n[cyan]RFC foundations:[/] {len(rfcs)}")
for ref_id, count in rfcs[:10]:
console.print(f" - RFC {ref_id} (cited by {count} drafts)")
similar = ctx.get("similar_drafts", [])
console.print(f"\n[cyan]Similar existing drafts:[/] {len(similar)}")
for name, score in similar[:8]:
console.print(f" - {name} (similarity: {score:.3f})")
top_rated = ctx.get("top_rated", [])
console.print(f"\n[cyan]Top-rated in category:[/] {len(top_rated)}")
wg_ctx = ctx.get("wg_context", [])
adopted = [w for w in wg_ctx if w.get("wg_adopted")]
console.print(f"\n[cyan]WG context:[/] {len(adopted)} WG-adopted drafts")
vision = ctx.get("ecosystem_vision", "")
if vision:
console.print(f"\n[cyan]Ecosystem vision:[/] {len(vision)} chars loaded")
finally:
db.close()
@pipeline.command("generate")
@click.argument("gap_topic")
@click.option("--cheap/--quality", default=False, help="Use Haiku (cheap) or Sonnet (quality)")
@click.option("--dry-run", is_flag=True, help="Show outline only, don't generate sections")
@click.option("--family", "family_name", default="", help="Family name for multi-draft generation")
def pipeline_generate(gap_topic: str, cheap: bool, dry_run: bool, family_name: str):
"""Generate a single draft from a gap topic."""
from ..analyzer import Analyzer
from ..pipeline import PipelineGenerator, ContextBuilder
cfg = _get_config()
db = Database(cfg)
analyzer = Analyzer(cfg, db)
try:
builder = ContextBuilder(cfg, db)
generator = PipelineGenerator(cfg, db, analyzer)
ctx = builder.build_context(gap_topic)
console.print(f"[bold]Generating draft for gap: {gap_topic}[/]")
outline = generator.generate_outline(ctx, cheap=cheap)
console.print(f" Title: [cyan]{outline.get('title', '?')}[/]")
console.print(f" Sections: {len(outline.get('sections', []))}")
console.print(f" Target WG: {outline.get('target_wg', '?')}")
if dry_run:
import json
console.print("\n[bold]Outline (dry run):[/]")
console.print(json.dumps(outline, indent=2))
return
result = generator.generate_full(gap_topic, cheap=cheap)
console.print(f"\n[bold green]Draft generated![/]")
console.print(f" ID: {result.get('id', '?')}")
console.print(f" Draft name: {result.get('draft_name', '?')}")
# Export text file
output_dir = Path(cfg.data_dir) / "reports" / "generated-drafts"
output_dir.mkdir(parents=True, exist_ok=True)
draft_name = result.get("draft_name", "draft-unknown")
out_path = output_dir / f"{draft_name}.txt"
if result.get("full_text"):
out_path.write_text(result["full_text"])
console.print(f" Saved: {out_path}")
finally:
db.close()
@pipeline.command("family")
@click.option("--name", "family_name", default="agent-ecosystem", help="Family name")
@click.option("--cheap/--quality", default=False, help="Use Haiku (cheap) or Sonnet (quality)")
def pipeline_family(family_name: str, cheap: bool):
"""Generate the full 5-draft ecosystem family."""
from ..analyzer import Analyzer
from ..pipeline import FamilyCoordinator
cfg = _get_config()
db = Database(cfg)
analyzer = Analyzer(cfg, db)
try:
coordinator = FamilyCoordinator(cfg, db, analyzer)
console.print(f"[bold]Generating draft family: {family_name}[/]\n")
results = coordinator.generate_family(family_name=family_name, cheap=cheap)
console.print(f"\n[bold green]Generated {len(results)} drafts![/]")
# Export all
output_dir = Path(cfg.data_dir) / "reports" / "generated-drafts"
output_dir.mkdir(parents=True, exist_ok=True)
for r in results:
draft_name = r.get("draft_name", "draft-unknown")
if r.get("full_text"):
out_path = output_dir / f"{draft_name}.txt"
out_path.write_text(r["full_text"])
console.print(f" [green]{r.get('family_role', '?')}[/] -> {out_path}")
# Family summary
summary_path = output_dir / "family-summary.md"
lines = [f"# Draft Family: {family_name}\n"]
for r in results:
lines.append(f"## {r.get('family_role', '?')}: {r.get('title', '?')}")
lines.append(f"- Draft: `{r.get('draft_name', '?')}`")
lines.append(f"- Gap: {r.get('gap_topic', '?')}")
lines.append(f"- Sections: {len(r.get('sections', []))}")
lines.append("")
summary_path.write_text("\n".join(lines))
console.print(f"\n Summary: {summary_path}")
# Consistency check
consistency = coordinator.check_consistency(family_name)
if consistency.get("issues"):
console.print(f"\n[yellow]Consistency issues:[/]")
for issue in consistency["issues"]:
console.print(f" - {issue}")
else:
console.print(f"\n[green]No consistency issues found[/]")
finally:
db.close()
@pipeline.command("quality")
@click.argument("draft_id", type=int)
def pipeline_quality(draft_id: int):
"""Run quality gates on a generated draft."""
from ..analyzer import Analyzer
from ..pipeline import QualityGates
cfg = _get_config()
db = Database(cfg)
analyzer = Analyzer(cfg, db)
try:
gates = QualityGates(cfg, db, analyzer)
console.print(f"[bold]Running quality gates on draft #{draft_id}[/]\n")
results = gates.run_all(draft_id)
for gate_name, result in results.items():
status = "[green]PASS[/]" if result["passed"] else "[red]FAIL[/]"
console.print(f" {status} {gate_name}: {result.get('details', '')[:100]}")
if "score" in result:
console.print(f" Score: {result['score']:.2f}")
finally:
db.close()
@pipeline.command("status")
def pipeline_status():
"""Show pipeline health: processing stages, generated drafts, and API cost."""
cfg = _get_config()
db = Database(cfg)
try:
# Pipeline health overview
total = db.count_drafts()
rated_count = len(db.drafts_with_ratings(limit=10000))
unrated = len(db.unrated_drafts(limit=10000))
unembedded = len(db.drafts_without_embeddings(limit=10000))
embedded_count = total - unembedded
no_ideas = len(db.drafts_without_ideas(limit=10000))
ideas_count = total - no_ideas
idea_total = db.idea_count()
gap_count = len(db.all_gaps())
input_tok, output_tok = db.total_tokens_used()
est_cost = (input_tok * 3.0 / 1_000_000) + (output_tok * 15.0 / 1_000_000)
# Last update
snapshots = db.get_snapshots(limit=1)
last_update = snapshots[0]["snapshot_at"][:19] if snapshots else "never"
console.print("\n[bold]Pipeline Status[/]\n")
console.print(f" Total documents: [bold]{total}[/]")
console.print(f" Last update: {last_update}")
console.print()
# Stage table
stage_table = Table(title="Processing Stages")
stage_table.add_column("Stage", width=20)
stage_table.add_column("Done", justify="right", width=8)
stage_table.add_column("Missing", justify="right", width=8)
stage_table.add_column("Progress", width=20)
def bar(done, total_n):
pct = int(done / total_n * 100) if total_n > 0 else 0
filled = pct // 5
return f"[green]{'#' * filled}[/][dim]{'.' * (20 - filled)}[/] {pct}%"
stage_table.add_row("Rated", str(rated_count), str(unrated), bar(rated_count, total))
stage_table.add_row("Embedded", str(embedded_count), str(unembedded), bar(embedded_count, total))
stage_table.add_row("Ideas extracted", str(ideas_count), str(no_ideas), bar(ideas_count, total))
console.print(stage_table)
console.print(f"\n Total ideas: [bold]{idea_total}[/]")
console.print(f" Gaps identified: [bold]{gap_count}[/]")
console.print(f"\n API tokens: {input_tok:,} in + {output_tok:,} out")
console.print(f" Estimated cost: [bold]${est_cost:.2f}[/]")
# Generated drafts
gen_drafts = db.get_generated_drafts()
if gen_drafts:
console.print()
table = Table(title=f"Generated Drafts ({len(gen_drafts)})")
table.add_column("ID", justify="right", width=4)
table.add_column("Draft Name", style="cyan")
table.add_column("Gap Topic")
table.add_column("Family", width=15)
table.add_column("Status", width=10)
table.add_column("Quality", justify="right", width=7)
table.add_column("Created", width=10)
for d in gen_drafts:
table.add_row(
str(d["id"]),
d["draft_name"],
d["gap_topic"][:30],
d.get("family_name", ""),
d.get("status", "?"),
f"{d.get('quality_score', 0):.1f}" if d.get("quality_score") else "-",
(d.get("created_at") or "")[:10],
)
console.print(table)
finally:
db.close()
@pipeline.command("export")
@click.argument("draft_id", type=int)
@click.option("--output", "-o", help="Output file path")
def pipeline_export(draft_id: int, output: str | None):
"""Export a generated draft as I-D text."""
cfg = _get_config()
db = Database(cfg)
try:
draft = db.get_generated_draft(draft_id)
if not draft:
console.print(f"[red]Draft #{draft_id} not found[/]")
return
text = draft.get("full_text", "")
if not text:
console.print(f"[red]Draft #{draft_id} has no generated text[/]")
return
if output:
out_path = Path(output)
else:
output_dir = Path(cfg.data_dir) / "reports" / "generated-drafts"
output_dir.mkdir(parents=True, exist_ok=True)
out_path = output_dir / f"{draft['draft_name']}.txt"
out_path.write_text(text)
console.print(f"Exported: [bold green]{out_path}[/]")
finally:
db.close()
# ── observatory ─────────────────────────────────────────────────────────────
@click.group()
def observatory():
"""Living Standards Observatory — monitor AI standards across bodies."""
pass
@observatory.command("update")
@click.option("--source", "-s", default=None, help="Comma-separated sources (e.g. ietf,w3c)")
@click.option("--full/--delta", default=False, help="Full refresh or delta only")
@click.option("--dry-run", is_flag=True, default=False, help="Show what would happen without making changes")
def observatory_update(source: str | None, full: bool, dry_run: bool):
"""Fetch, analyze, and update the observatory."""
from ..observatory import Observatory
cfg = _get_config()
db = Database(cfg)
try:
if dry_run:
obs = Observatory(cfg, db)
else:
from ..analyzer import Analyzer
analyzer = Analyzer(cfg, db)
obs = Observatory(cfg, db, analyzer)
sources = source.split(",") if source else None
mode = "full" if full else "delta"
console.print(f"[bold]Observatory update[/] ({mode}{' [DRY RUN]' if dry_run else ''})")
result = obs.update(sources=sources, full=full, dry_run=dry_run)
if not dry_run:
console.print(f"\n[bold green]Update complete![/]")
console.print(f" New docs: {result.get('new_docs', 0)}")
console.print(f" Analyzed: {result.get('analyzed', 0)}")
console.print(f" Embedded: {result.get('embedded', 0)}")
console.print(f" Ideas extracted: {result.get('ideas', 0)}")
if result.get("gaps_changed"):
console.print(f" Gaps re-analyzed: yes")
if result.get("errors"):
console.print(f"\n [yellow]Errors ({len(result['errors'])}):[/]")
for err in result["errors"]:
console.print(f" - {err}")
finally:
db.close()
@observatory.command("dashboard")
def observatory_dashboard():
"""Regenerate the static dashboard site."""
from ..dashboard import DashboardGenerator
cfg = _get_config()
db = Database(cfg)
try:
gen = DashboardGenerator(cfg, db)
path = gen.generate()
console.print(f"[bold green]Dashboard generated:[/] {path}")
console.print(f" Open: file://{path}/index.html")
finally:
db.close()
@observatory.command("status")
def observatory_status():
"""Show observatory status — doc counts, sources, last update."""
from ..observatory import Observatory
cfg = _get_config()
db = Database(cfg)
try:
obs = Observatory(cfg, db)
status = obs.status()
console.print(f"\n[bold]Observatory Status[/]\n")
console.print(f" Total documents: [bold]{status.get('total_docs', 0)}[/]")
console.print(f" Unrated: {status.get('unrated', 0)}")
console.print(f" Unembedded: {status.get('unembedded', 0)}")
console.print(f" Gaps: {status.get('gaps', 0)}")
sources = status.get("sources", {})
if sources:
console.print(f"\n [bold]Sources:[/]")
for name, count in sources.items():
console.print(f" {name}: {count} docs")
last_update = status.get("last_update")
if last_update:
console.print(f"\n Last update: {last_update[:10]}")
console.print(f" Snapshots: {status.get('snapshots', 0)}")
finally:
db.close()
@observatory.command("snapshot")
def observatory_snapshot():
"""Record current state as a snapshot."""
cfg = _get_config()
db = Database(cfg)
try:
snap_id = db.create_snapshot()
gaps = db.all_gaps()
if gaps:
db.record_gap_history(snap_id, gaps)
console.print(f"[bold green]Snapshot #{snap_id} created[/] ({db.count_drafts()} docs, {len(gaps)} gaps)")
finally:
db.close()
@observatory.command("diff")
@click.option("--since", help="Show changes since this date (YYYY-MM-DD)")
def observatory_diff(since: str | None):
"""Show what changed since a date."""
from ..observatory import Observatory
cfg = _get_config()
db = Database(cfg)
try:
obs = Observatory(cfg, db)
result = obs.diff(since=since)
console.print(f"\n[bold]Observatory Diff[/]")
if since:
console.print(f" Since: {result.get('since', since)}")
new_docs = result.get("new_docs", [])
console.print(f" New documents: {result.get('new_doc_count', len(new_docs))}")
gap_changes = result.get("gap_changes", [])
console.print(f" Gap history entries: {len(gap_changes)}")
if new_docs:
console.print(f"\n [bold]New documents:[/]")
for doc in new_docs[:20]:
d = dict(doc) if not isinstance(doc, dict) else doc
console.print(f" [{d.get('source', '?')}] {d.get('name', '?')}: {d.get('title', '')[:60]}")
finally:
db.close()
# ── monitor ─────────────────────────────────────────────────────────────
@click.group()
def monitor():
"""Monitor IETF Datatracker for new AI/agent drafts."""
pass
@monitor.command("run")
@click.option("--analyze/--no-analyze", default=True, help="Analyze new drafts")
@click.option("--embed/--no-embed", default=True, help="Generate embeddings")
@click.option("--ideas/--no-ideas", default=True, help="Extract ideas")
def monitor_run(analyze, embed, ideas):
"""Run one monitoring cycle: fetch -> analyze -> embed -> ideas."""
from ..analyzer import Analyzer
from ..embeddings import Embedder
from ..fetcher import Fetcher
cfg = _get_config()
db = Database(cfg)
run_id = db.start_monitor_run()
stats = {
"new_drafts_found": 0,
"drafts_analyzed": 0,
"drafts_embedded": 0,
"ideas_extracted": 0,
}
try:
console.print("[bold]Monitor run started[/]")
# Determine since date from last successful run
last_run = db.get_last_successful_run()
since = last_run["completed_at"][:10] if last_run and last_run.get("completed_at") else cfg.fetch_since
console.print(f" Fetching drafts since: [cyan]{since}[/]")
# Fetch new drafts
fetcher = Fetcher(cfg)
try:
existing_count = db.count_drafts()
drafts = fetcher.search_drafts(keywords=list(cfg.search_keywords), since=since)
for draft in drafts:
db.upsert_draft(draft)
# Download text for any missing
missing_text = db.drafts_without_text()
if missing_text:
console.print(f" Downloading text for [bold]{len(missing_text)}[/] drafts...")
texts = fetcher.download_texts(missing_text)
for name, text in texts.items():
draft = db.get_draft(name)
if draft:
draft.full_text = text
db.upsert_draft(draft)
finally:
fetcher.close()
new_count = db.count_drafts() - existing_count
stats["new_drafts_found"] = max(new_count, 0)
console.print(f" New drafts found: [bold green]{stats['new_drafts_found']}[/]")
# Analyze unrated drafts
if analyze:
unrated = db.unrated_drafts(limit=200)
if unrated:
console.print(f" Analyzing [bold]{len(unrated)}[/] unrated drafts...")
analyzer = Analyzer(cfg, db)
count = analyzer.rate_all_unrated(limit=200)
stats["drafts_analyzed"] = count
console.print(f" Analyzed: [bold green]{count}[/]")
# Embed missing drafts
if embed:
missing_embed = db.drafts_without_embeddings(limit=500)
if missing_embed:
console.print(f" Embedding [bold]{len(missing_embed)}[/] drafts...")
embedder = Embedder(cfg, db)
count = embedder.embed_all_missing()
stats["drafts_embedded"] = count
console.print(f" Embedded: [bold green]{count}[/]")
# Extract ideas
if ideas:
missing_ideas = db.drafts_without_ideas(limit=500)
if missing_ideas:
console.print(f" Extracting ideas from [bold]{len(missing_ideas)}[/] drafts...")
analyzer = Analyzer(cfg, db)
count = analyzer.extract_all_ideas(limit=500, batch_size=5, cheap=True)
stats["ideas_extracted"] = count
console.print(f" Ideas extracted from: [bold green]{count}[/] drafts")
db.complete_monitor_run(run_id, stats)
console.print("\n[bold green]Monitor run completed successfully[/]")
except Exception as e:
db.fail_monitor_run(run_id, str(e))
console.print(f"\n[bold red]Monitor run failed:[/] {e}")
raise
finally:
db.close()
@monitor.command("status")
def monitor_status():
"""Show monitoring status and recent runs."""
cfg = _get_config()
db = Database(cfg)
try:
runs = db.get_monitor_runs(limit=20)
last = db.get_last_successful_run()
# Unprocessed counts
unrated = len(db.unrated_drafts(limit=9999))
unembedded = len(db.drafts_without_embeddings(limit=9999))
no_ideas = len(db.drafts_without_ideas(limit=9999))
console.print("\n[bold]Monitor Status[/]\n")
if last:
console.print(f" Last successful run: [green]{last['completed_at']}[/]")
console.print(f" Duration: {last['duration_seconds']:.1f}s")
console.print(f" New drafts: {last['new_drafts_found']}")
else:
console.print(" [yellow]No successful runs yet[/]")
console.print(f"\n[bold]Unprocessed[/]")
console.print(f" Unrated: [{'yellow' if unrated > 0 else 'green'}]{unrated}[/]")
console.print(f" Unembedded: [{'yellow' if unembedded > 0 else 'green'}]{unembedded}[/]")
console.print(f" No ideas: [{'yellow' if no_ideas > 0 else 'green'}]{no_ideas}[/]")
if runs:
console.print(f"\n[bold]Recent Runs[/] ({len(runs)} total)\n")
table = Table()
table.add_column("#", justify="right", width=4)
table.add_column("Started", width=20)
table.add_column("Duration", justify="right", width=8)
table.add_column("Status", width=10)
table.add_column("New", justify="right", width=5)
table.add_column("Analyzed", justify="right", width=8)
table.add_column("Embedded", justify="right", width=8)
table.add_column("Ideas", justify="right", width=6)
for r in runs:
status_style = {"completed": "green", "failed": "red", "running": "yellow"}.get(r["status"], "dim")
table.add_row(
str(r["id"]),
r["started_at"][:19] if r["started_at"] else "",
f"{r['duration_seconds']:.1f}s" if r["duration_seconds"] else "-",
f"[{status_style}]{r['status']}[/{status_style}]",
str(r["new_drafts_found"]),
str(r["drafts_analyzed"]),
str(r["drafts_embedded"]),
str(r["ideas_extracted"]),
)
console.print(table)
finally:
db.close()
# ── auto ─────────────────────────────────────────────────────────────────────
@click.command("auto")
@click.option("--cost-limit", default=2.0, help="Auto-approve operations under this USD amount (default: $2)")
@click.option("--yes", "-y", is_flag=True, help="Skip all confirmation prompts")
@click.option("--dry-run", is_flag=True, help="Show what would be done without doing it")
@click.option("--source", "-s", default=None, help="Limit to specific source (ietf,w3c,etsi,iso,itu)")
def auto(cost_limit: float, yes: bool, dry_run: bool, source: str | None):
"""Auto-heal: fetch, analyze, embed, extract ideas, and update gaps.
Automatically processes all unrated, unembedded, and idea-less drafts
across all sources. Uses cheap models (Haiku) for bulk operations.
Operations estimated above --cost-limit require confirmation.
Examples:
ietf auto # run full pipeline, auto-approve under $2
ietf auto --dry-run # show plan without executing
ietf auto -s iso # only process ISO drafts
ietf auto --cost-limit 5 # raise approval threshold to $5
ietf auto -y # skip all prompts (for cron)
"""
cfg = Config.load()
db = Database(cfg)
try:
_auto_heal(cfg, db, cost_limit=cost_limit, yes=yes, dry_run=dry_run, source_filter=source)
finally:
db.close()
def _estimate_cost(n_drafts: int, operation: str) -> float:
"""Estimate USD cost for an operation. Conservative estimates."""
costs = {
"analyze_cheap": n_drafts * 0.0005,
"analyze_quality": n_drafts * 0.005,
"ideas_cheap": n_drafts * 0.001,
"ideas_quality": n_drafts * 0.008,
"gaps": 0.05,
"embed": 0.0,
"authors": 0.0,
"fetch": 0.0,
}
return costs.get(operation, 0.0)
def _auto_heal(cfg, db, cost_limit: float, yes: bool, dry_run: bool, source_filter: str | None):
"""Run the full auto-heal pipeline."""
import time as _time
from rich.panel import Panel
steps: list[dict] = []
total_cost = 0.0
# ── Step 1: Fetch new drafts from all sources ──
sources = [source_filter] if source_filter else cfg.observatory_sources
steps.append({
"name": f"Fetch new drafts from {', '.join(sources)}",
"sources": sources,
"cost": 0.0,
"action": "fetch",
})
# ── Step 2: Analyze unrated drafts ──
unrated = db.unrated_drafts(limit=10000)
if source_filter:
unrated = [d for d in unrated if (d.source or "ietf") == source_filter]
n_unrated = len(unrated)
analyze_cost = _estimate_cost(n_unrated, "analyze_cheap")
steps.append({
"name": f"Analyze {n_unrated} unrated drafts (Haiku)",
"count": n_unrated,
"cost": analyze_cost,
"action": "analyze",
})
total_cost += analyze_cost
# ── Step 3: Fetch authors ──
missing_authors = db.conn.execute(
"SELECT COUNT(*) FROM drafts WHERE name NOT IN (SELECT DISTINCT draft_name FROM draft_authors)"
).fetchone()[0]
steps.append({
"name": f"Fetch authors for {missing_authors} drafts",
"count": missing_authors,
"cost": 0.0,
"action": "authors",
})
# ── Step 4: Embed missing drafts ──
missing_embed = db.drafts_without_embeddings(limit=10000)
if source_filter:
source_names = {row[0] for row in db.conn.execute(
"SELECT name FROM drafts WHERE source = ?", (source_filter,)
).fetchall()}
missing_embed = [n for n in missing_embed if n in source_names]
n_embed = len(missing_embed)
steps.append({
"name": f"Embed {n_embed} drafts (Ollama, free)",
"count": n_embed,
"cost": 0.0,
"action": "embed",
})
# ── Step 5: Extract ideas ──
missing_ideas = db.drafts_without_ideas(limit=10000)
if source_filter:
if not source_names:
source_names = {row[0] for row in db.conn.execute(
"SELECT name FROM drafts WHERE source = ?", (source_filter,)
).fetchall()}
missing_ideas = [n for n in missing_ideas if n in source_names]
n_ideas = len(missing_ideas)
ideas_cost = _estimate_cost(n_ideas, "ideas_cheap")
steps.append({
"name": f"Extract ideas from {n_ideas} drafts (Haiku)",
"count": n_ideas,
"cost": ideas_cost,
"action": "ideas",
})
total_cost += ideas_cost
# ── Step 6: Refresh gaps ──
gap_cost = _estimate_cost(0, "gaps")
steps.append({
"name": "Refresh gap analysis",
"cost": gap_cost,
"action": "gaps",
})
total_cost += gap_cost
# ── Show plan ──
plan_lines = []
for s in steps:
count = s.get("count", 1)
if count == 0:
plan_lines.append(f" [dim]SKIP[/] {s['name']}")
else:
cost_str = f" [yellow]~${s['cost']:.2f}[/]" if s["cost"] > 0 else ""
plan_lines.append(f" [green]RUN[/] {s['name']}{cost_str}")
auto_approved = total_cost <= cost_limit
plan_lines.append(f"\n [bold]Estimated total cost: ${total_cost:.2f}[/]")
if auto_approved:
plan_lines.append(f" [green]Auto-approved (under ${cost_limit:.2f} limit)[/]")
else:
plan_lines.append(f" [yellow]Requires approval (over ${cost_limit:.2f} limit)[/]")
console.print(Panel("\n".join(plan_lines), title="Auto-Heal Plan"))
if dry_run:
console.print("[bold yellow]DRY RUN[/] — no changes made.")
return
# ── Approval ──
if not auto_approved and not yes:
if not click.confirm(f"Estimated cost ${total_cost:.2f} exceeds ${cost_limit:.2f} limit. Proceed?"):
console.print("[yellow]Aborted.[/]")
return
# ── Execute ──
start = _time.time()
for step in steps:
action = step["action"]
count = step.get("count", 0)
if action == "fetch":
console.print(f"\n[bold cyan]>>> Fetching from {step['sources']}...[/]")
from ..sources import get_fetcher
from ..observatory import _doc_to_draft
for src_name in step["sources"]:
try:
fetcher = get_fetcher(src_name, cfg)
before = db.count_drafts()
results = fetcher.search(keywords=cfg.search_keywords)
for doc in results:
db.upsert_draft(_doc_to_draft(doc))
after = db.count_drafts()
new = after - before
console.print(f" [{src_name}] +{new} new drafts")
fetcher.close()
except Exception as e:
console.print(f" [{src_name}] [red]Error: {e}[/]")
elif action == "analyze" and count > 0:
console.print(f"\n[bold cyan]>>> Analyzing {count} drafts (Haiku)...[/]")
from ..analyzer import Analyzer
analyzer = Analyzer(cfg, db)
orig_model = cfg.claude_model
cfg.claude_model = cfg.claude_model_cheap
try:
done = analyzer.rate_all_unrated(limit=count)
console.print(f" Analyzed [bold green]{done}[/] drafts")
finally:
cfg.claude_model = orig_model
elif action == "authors" and count > 0:
console.print(f"\n[bold cyan]>>> Fetching authors for {count} drafts...[/]")
from ..authors import AuthorNetwork
author_net = AuthorNetwork(cfg, db)
done = author_net.fetch_all_authors()
console.print(f" Fetched authors for [bold green]{done}[/] drafts")
elif action == "embed" and count > 0:
console.print(f"\n[bold cyan]>>> Embedding {count} drafts (Ollama)...[/]")
from ..embeddings import Embedder
with Embedder(cfg, db) as embedder:
done = embedder.embed_all_missing()
console.print(f" Embedded [bold green]{done}[/] drafts")
elif action == "ideas" and count > 0:
console.print(f"\n[bold cyan]>>> Extracting ideas from {count} drafts (Haiku)...[/]")
from ..analyzer import Analyzer
analyzer = Analyzer(cfg, db)
done = analyzer.extract_all_ideas(limit=count, batch_size=5, cheap=True)
console.print(f" Extracted ideas from [bold green]{done}[/] drafts")
elif action == "gaps":
console.print(f"\n[bold cyan]>>> Refreshing gap analysis...[/]")
from ..analyzer import Analyzer
analyzer = Analyzer(cfg, db)
gaps = analyzer.gap_analysis()
if gaps:
console.print(f" Found [bold green]{len(gaps)}[/] gaps")
elapsed = _time.time() - start
console.print(f"\n[bold green]Auto-heal complete![/] ({elapsed:.1f}s, ~${total_cost:.2f})")
# Show final counts
total = db.count_drafts()
rated = db.conn.execute("SELECT COUNT(*) FROM ratings").fetchone()[0]
embedded = db.conn.execute("SELECT COUNT(*) FROM embeddings").fetchone()[0]
idea_count = db.conn.execute("SELECT COUNT(*) FROM ideas").fetchone()[0]
gap_count = db.conn.execute("SELECT COUNT(*) FROM gaps").fetchone()[0]
console.print(f" Drafts: {total} | Rated: {rated} | Embedded: {embedded} | Ideas: {idea_count} | Gaps: {gap_count}")
by_source = db.conn.execute(
"SELECT source, COUNT(*) FROM drafts GROUP BY source ORDER BY COUNT(*) DESC"
).fetchall()
source_str = " | ".join(f"{s}: {c}" for s, c in by_source)
console.print(f" Sources: {source_str}")