"""Context builder — assembles rich context for draft generation from DB queries.""" from __future__ import annotations import json from pathlib import Path import numpy as np from rich.console import Console from ..config import Config from ..db import Database console = Console() def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float: dot = np.dot(a, b) norm = np.linalg.norm(a) * np.linalg.norm(b) if norm == 0: return 0.0 return float(dot / norm) class ContextBuilder: def __init__(self, config: Config, db: Database): self.config = config self.db = db def build_context(self, gap_topic: str) -> dict: """Assemble full context for a gap topic. All DB queries, zero Claude calls.""" gap = self._find_gap(gap_topic) if not gap: console.print(f"[yellow]No gap found matching '{gap_topic}', using topic as-is[/]") gap = { "id": 0, "topic": gap_topic, "description": gap_topic, "category": "", "evidence": "", "severity": "medium", } ideas = self._convergent_ideas(gap) rfcs = self._rfc_foundations(gap.get("category", "")) similar = self._similar_drafts(gap["description"]) top_rated = self._top_rated_in_category(gap.get("category", "")) wg_context = self._wg_context() ecosystem = self._ecosystem_vision() siblings = self._sibling_context(gap_topic) return { "gap": gap, "convergent_ideas": ideas, "rfc_foundations": rfcs, "similar_drafts": similar, "top_rated": top_rated, "wg_context": wg_context, "ecosystem_vision": ecosystem, "sibling_context": siblings, } def _find_gap(self, topic: str) -> dict | None: """Find a gap by topic string (fuzzy match).""" gaps = self.db.all_gaps() topic_lower = topic.lower() # Exact match first for g in gaps: if g["topic"].lower() == topic_lower: return g # Substring match for g in gaps: if topic_lower in g["topic"].lower() or topic_lower in g["description"].lower(): return g # Word overlap match topic_words = set(topic_lower.split()) best = None best_score = 0 for g in gaps: gap_words = set(g["topic"].lower().split()) | set(g["description"].lower().split()) overlap = len(topic_words & gap_words) if overlap > best_score: best_score = overlap best = g return best if best_score >= 2 else None def _convergent_ideas(self, gap: dict, limit: int = 20) -> list[dict]: """Find ideas that converge on this gap topic via keyword matching.""" all_ideas = self.db.all_ideas() if not all_ideas: return [] # Build search terms from gap topic + description search_text = (gap["topic"] + " " + gap["description"]).lower() search_words = set(search_text.split()) # Remove common words stop_words = {"the", "a", "an", "and", "or", "in", "of", "for", "to", "is", "are", "that", "this", "with", "not", "by", "on", "at", "from", "as", "be", "it", "no", "but", "has", "have", "do", "does"} search_words -= stop_words scored = [] for idea in all_ideas: idea_text = (idea["title"] + " " + idea["description"]).lower() idea_words = set(idea_text.split()) overlap = len(search_words & idea_words) if overlap >= 1: scored.append((overlap, idea)) scored.sort(key=lambda x: x[0], reverse=True) return [item for _, item in scored[:limit]] def _rfc_foundations(self, category: str, limit: int = 10) -> list[tuple[str, int]]: """Get most-referenced RFCs, optionally filtered by category.""" top_refs = self.db.top_referenced(ref_type="rfc", limit=limit * 2) if not category: return [(ref_id, count) for ref_id, count, _ in top_refs[:limit]] # Filter to RFCs referenced by drafts in this category category_lower = category.lower() pairs = self.db.drafts_with_ratings(limit=500) category_drafts = set() for draft, rating in pairs: for cat in rating.categories: if category_lower in cat.lower(): category_drafts.add(draft.name) if not category_drafts: return [(ref_id, count) for ref_id, count, _ in top_refs[:limit]] filtered = [] for ref_id, count, draft_names in top_refs: cat_count = sum(1 for d in draft_names if d in category_drafts) if cat_count > 0: filtered.append((ref_id, cat_count)) filtered.sort(key=lambda x: x[1], reverse=True) return filtered[:limit] def _similar_drafts(self, gap_desc: str, limit: int = 8) -> list[tuple[str, float]]: """Find semantically similar existing drafts via embeddings.""" all_embeddings = self.db.all_embeddings() if not all_embeddings: return [] # Try to embed the gap description via Ollama try: import ollama as ollama_lib client = ollama_lib.Client(host=self.config.ollama_url) resp = client.embed( model=self.config.ollama_embed_model, input=gap_desc[:8000], ) gap_vec = np.array(resp["embeddings"][0], dtype=np.float32) except Exception as e: console.print(f"[yellow]Ollama embedding failed, skipping similarity: {e}[/]") return [] similarities = [] for name, vec in all_embeddings.items(): sim = _cosine_similarity(gap_vec, vec) similarities.append((name, sim)) similarities.sort(key=lambda x: x[1], reverse=True) return similarities[:limit] def _top_rated_in_category(self, category: str, limit: int = 5) -> list[tuple]: """Get top-rated drafts in a category.""" pairs = self.db.drafts_with_ratings(limit=500) if not category: return [ (draft.name, draft.title, rating.composite_score) for draft, rating in pairs[:limit] ] category_lower = category.lower() matching = [] for draft, rating in pairs: for cat in rating.categories: if category_lower in cat.lower(): matching.append((draft.name, draft.title, rating.composite_score)) break return matching[:limit] def _wg_context(self) -> str: """Summarize WG adoption status.""" adoption = self.db.draft_adoption_status() wg_counts: dict[str, int] = {} adopted_count = 0 for d in adoption: if d["wg_adopted"]: adopted_count += 1 wg = d["wg_name"] wg_counts[wg] = wg_counts.get(wg, 0) + 1 total = len(adoption) if not wg_counts: return f"{total} drafts, none WG-adopted yet." top_wgs = sorted(wg_counts.items(), key=lambda x: x[1], reverse=True)[:5] wg_lines = ", ".join(f"{wg} ({n})" for wg, n in top_wgs) return f"{total} drafts, {adopted_count} WG-adopted. Top WGs: {wg_lines}" def _ecosystem_vision(self) -> str: """Load ecosystem vision document if it exists.""" vision_path = Path(self.config.data_dir) / "reports" / "holistic-agent-ecosystem-draft-outlines.md" if not vision_path.exists(): return "(No ecosystem vision document found)" text = vision_path.read_text() # Return the pitch section (compact) rather than the full document if "## 8. One-Page Pitch" in text: pitch = text.split("## 8. One-Page Pitch")[1].strip() return pitch[:2000] # Fallback: return the vision summary if "## 1. Vision Summary" in text: parts = text.split("## 1. Vision Summary")[1] if "## 2." in parts: parts = parts.split("## 2.")[0] return parts.strip()[:2000] return text[:2000] def _sibling_context(self, gap_topic: str) -> list[dict]: """Get outlines of sibling drafts from the same family.""" # Check all family drafts families = self.db.get_generated_drafts() if not families: return [] # Find which family this gap_topic belongs to topic_lower = gap_topic.lower() family_name = "" for gd in families: if topic_lower in gd.get("gap_topic", "").lower(): family_name = gd.get("family_name", "") break if not family_name: return [] siblings = self.db.get_family_drafts(family_name) result = [] for s in siblings: if s.get("gap_topic", "").lower() == topic_lower: continue # Skip self outline = {} if s.get("outline_json"): try: outline = json.loads(s["outline_json"]) if isinstance(s["outline_json"], str) else s["outline_json"] except (json.JSONDecodeError, TypeError): pass result.append({ "role": s.get("family_role", ""), "title": s.get("title", ""), "abstract": s.get("abstract", ""), "outline": outline, }) return result