Add DB indexes and extract shared query methods to Database class

Add missing indexes on ratings(false_positive), drafts(source), and
draft_authors(person_id) for faster filtering. Extract 12 shared query
methods (false_positive_drafts_raw, non_false_positive_ratings_raw,
false_positive_names, rated_count, gap_count, search_gaps, search_authors,
draft_affiliation_pairs, all_persons_info, category_counts,
draft_author_count_map, source_counts) to eliminate duplicated SQL across
cli.py, data.py, and reports.py.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-09 03:33:18 +01:00
parent 4710668419
commit c066b04d74
4 changed files with 141 additions and 168 deletions

View File

@@ -135,6 +135,7 @@ CREATE TABLE IF NOT EXISTS draft_refs (
);
CREATE INDEX IF NOT EXISTS idx_draft_refs_ref ON draft_refs(ref_type, ref_id);
CREATE INDEX IF NOT EXISTS idx_draft_authors_person ON draft_authors(person_id);
-- Generated drafts from gap-to-draft pipeline
CREATE TABLE IF NOT EXISTS generated_drafts (
@@ -303,6 +304,10 @@ class Database:
if "novelty_score" not in idea_cols:
self._conn.execute("ALTER TABLE ideas ADD COLUMN novelty_score INTEGER")
# Create indexes on columns that may have been added via migration
self._conn.execute("CREATE INDEX IF NOT EXISTS idx_ratings_false_positive ON ratings(false_positive)")
self._conn.execute("CREATE INDEX IF NOT EXISTS idx_drafts_source ON drafts(source)")
self._conn.commit()
def close(self) -> None:
@@ -927,6 +932,107 @@ class Database:
"category": r["category"], "evidence": r["evidence"],
"severity": r["severity"]} for r in rows]
def gap_count(self) -> int:
return self.conn.execute("SELECT COUNT(*) FROM gaps").fetchone()[0]
def search_gaps(self, query: str, limit: int = 50) -> list[dict]:
"""Search gaps by topic or description (LIKE match)."""
like = f"%{query}%"
rows = self.conn.execute(
"""SELECT id, topic, description, category, severity FROM gaps
WHERE topic LIKE ? OR description LIKE ?
ORDER BY id LIMIT ?""",
(like, like, limit),
).fetchall()
return [{"id": r["id"], "topic": r["topic"],
"description": (r["description"] or "")[:200],
"category": r["category"], "severity": r["severity"]}
for r in rows]
# --- Shared query helpers ---
def rated_count(self) -> int:
"""Return total number of rated drafts (including false positives)."""
return self.conn.execute("SELECT COUNT(*) FROM ratings").fetchone()[0]
def false_positive_drafts_raw(self) -> list[sqlite3.Row]:
"""Return raw rows of drafts flagged as false positives, joined with ratings."""
return self.conn.execute(
"""SELECT d.*, r.novelty, r.maturity, r.overlap, r.momentum, r.relevance,
r.summary, r.categories as r_categories, r.false_positive
FROM drafts d
JOIN ratings r ON d.name = r.draft_name
WHERE r.false_positive = 1
ORDER BY d.name"""
).fetchall()
def non_false_positive_ratings_raw(self) -> list[sqlite3.Row]:
"""Return raw rating rows for non-false-positive drafts."""
return self.conn.execute(
"""SELECT r.novelty, r.maturity, r.overlap, r.momentum, r.relevance,
r.categories as r_categories
FROM ratings r
WHERE COALESCE(r.false_positive, 0) = 0"""
).fetchall()
def false_positive_names(self) -> set[str]:
"""Return set of draft names flagged as false positives."""
return {r[0] for r in self.conn.execute(
"SELECT draft_name FROM ratings WHERE false_positive = 1").fetchall()}
def draft_affiliation_pairs(self) -> list[tuple[str, str]]:
"""Return (draft_name, affiliation) for all draft_authors with affiliation."""
rows = self.conn.execute(
"SELECT da.draft_name, a.affiliation FROM draft_authors da "
"JOIN authors a ON da.person_id = a.person_id "
"WHERE a.affiliation != ''"
).fetchall()
return [(r[0], r[1]) for r in rows]
def all_persons_info(self) -> list[tuple[int, str, str]]:
"""Return (person_id, name, affiliation) for all authors."""
rows = self.conn.execute(
"SELECT person_id, name, affiliation FROM authors"
).fetchall()
return [(r[0], r[1], r[2]) for r in rows]
def search_authors(self, query: str, limit: int = 50) -> list[dict]:
"""Search authors by name or affiliation (LIKE match)."""
like = f"%{query}%"
rows = self.conn.execute(
"""SELECT person_id, name, affiliation FROM authors
WHERE name LIKE ? OR affiliation LIKE ?
ORDER BY name LIMIT ?""",
(like, like, limit),
).fetchall()
return [{"person_id": r["person_id"], "name": r["name"],
"affiliation": r["affiliation"] or ""}
for r in rows]
def category_counts(self) -> dict[str, int]:
"""Return {category: draft_count} from rated non-FP drafts."""
from collections import Counter
pairs = self.drafts_with_ratings(limit=2000)
counts: Counter = Counter()
for _, rating in pairs:
for cat in rating.categories:
counts[cat] += 1
return dict(counts.most_common())
def draft_author_count_map(self) -> dict[str, int]:
"""Return {draft_name: author_count} for all drafts."""
rows = self.conn.execute(
"SELECT draft_name, COUNT(*) as cnt FROM draft_authors GROUP BY draft_name"
).fetchall()
return {r[0]: r[1] for r in rows}
def source_counts(self) -> list[tuple[str, int]]:
"""Return [(source, count)] ordered by count desc."""
rows = self.conn.execute(
"SELECT source, COUNT(*) as cnt FROM drafts GROUP BY source ORDER BY cnt DESC"
).fetchall()
return [(r[0], r[1]) for r in rows]
# --- Proposals ---
def all_proposals(self) -> list[dict]: