"""Rating-related data access functions.""" from __future__ import annotations import json import re from collections import Counter, defaultdict from ietf_analyzer.db import Database def get_rating_distributions(db: Database) -> dict: """Return arrays for each rating dimension, suitable for Plotly.""" pairs = db.drafts_with_ratings(limit=1000) dims = { "novelty": [], "maturity": [], "overlap": [], "momentum": [], "relevance": [], "scores": [], "categories": [], "names": [], "sources": [], } for draft, rating in pairs: dims["novelty"].append(rating.novelty) dims["maturity"].append(rating.maturity) dims["overlap"].append(rating.overlap) dims["momentum"].append(rating.momentum) dims["relevance"].append(rating.relevance) dims["scores"].append(round(rating.composite_score, 2)) dims["categories"].append(rating.categories[0] if rating.categories else "Other") dims["names"].append(draft.name) dims["sources"].append(getattr(draft, "source", "ietf") or "ietf") return dims def get_category_radar_data(db: Database) -> dict: """Return average rating profiles per category for radar chart.""" pairs = db.drafts_with_ratings(limit=1000) cat_ratings: dict[str, list] = defaultdict(list) for _, r in pairs: for c in r.categories: cat_ratings[c].append(r) top_cats = sorted(cat_ratings.keys(), key=lambda c: len(cat_ratings[c]), reverse=True)[:8] result = {} for cat in top_cats: ratings = cat_ratings[cat] n = len(ratings) result[cat] = { "count": n, "novelty": round(sum(r.novelty for r in ratings) / n, 2), "maturity": round(sum(r.maturity for r in ratings) / n, 2), "relevance": round(sum(r.relevance for r in ratings) / n, 2), "momentum": round(sum(r.momentum for r in ratings) / n, 2), "low_overlap": round(sum(6 - r.overlap for r in ratings) / n, 2), } return result def get_score_histogram(db: Database) -> list[float]: """Return list of composite scores for histogram.""" pairs = db.drafts_with_ratings(limit=1000) return [round(r.composite_score, 2) for _, r in pairs] def 
# Words too generic to be informative when profiling false-positive text.
# Hoisted to a module constant so it is built once, not per call.
_STOP_WORDS = frozenset({
    "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
    "of", "with", "by", "from", "is", "it", "that", "this", "are", "was",
    "be", "as", "can", "may", "will", "not", "has", "have", "been", "which",
    "their", "its", "also", "such", "these", "would", "should", "could",
    "more", "other", "than", "into", "about", "between", "over", "after",
    "all", "one", "two", "new", "they", "we", "our", "each", "some", "any",
    "there", "what", "when", "how", "where", "who", "does", "do", "did",
    "no", "if", "so", "up", "out", "only", "used", "using", "use", "based",
    "through", "both", "well", "within", "must", "while", "had", "were",
})

# Lowercase alphabetic tokens of 3+ characters; compiled once at import time.
_WORD_RE = re.compile(r'[a-z]{3,}')

# The five rating dimensions, in the order the result dicts expose them.
_DIM_KEYS = ("novelty", "maturity", "overlap", "momentum", "relevance")


def _append_dims(dims: dict[str, list], row) -> None:
    """Append the five rating-dimension values of *row* onto *dims*."""
    for key in _DIM_KEYS:
        dims[key].append(row[key])


def get_false_positive_profile(db: Database) -> dict:
    """Profile drafts flagged as false positives.

    Compares the rating-dimension values and category mix of drafts marked
    as false positives against the rest of the rated corpus, and extracts
    the most common content terms from FP titles and abstracts.

    Args:
        db: Database handle. Rows are expected to be mapping-like
            (e.g. ``sqlite3.Row``) with the five rating columns plus
            ``name``, ``title``, ``source``, ``summary``, ``abstract``
            and a JSON-encoded ``r_categories`` column.

    Returns:
        Dict with FP counts and percentages, the FP draft list, category
        and source counters, per-dimension arrays for FP vs non-FP drafts,
        and the 30 most common FP content terms.
    """
    fp_rows = db.false_positive_drafts_raw()
    nonfp_rows = db.non_false_positive_ratings_raw()
    total_rated = db.rated_count()
    total_drafts = db.count_drafts(include_false_positives=True)

    # Build the FP list plus category/source/dimension aggregates.
    fp_list = []
    fp_categories: Counter = Counter()
    fp_sources: Counter = Counter()
    fp_dims: dict[str, list] = {key: [] for key in _DIM_KEYS}
    for row in fp_rows:
        cats = json.loads(row["r_categories"]) if row["r_categories"] else []
        src = row["source"] or "ietf"  # missing source defaults to IETF
        fp_list.append({
            "name": row["name"],
            "title": row["title"],
            "source": src,
            "categories": cats,
            "relevance": row["relevance"],
            "novelty": row["novelty"],
            "maturity": row["maturity"],
            "overlap": row["overlap"],
            "momentum": row["momentum"],
            "summary": row["summary"] or "",
        })
        for cat in cats:
            fp_categories[cat] += 1
        fp_sources[src] += 1
        _append_dims(fp_dims, row)

    # Non-FP dimensions and categories for side-by-side comparison.
    nonfp_dims: dict[str, list] = {key: [] for key in _DIM_KEYS}
    nonfp_categories: Counter = Counter()
    for row in nonfp_rows:
        _append_dims(nonfp_dims, row)
        cats = json.loads(row["r_categories"]) if row["r_categories"] else []
        for cat in cats:
            nonfp_categories[cat] += 1

    # Most common non-stop-word terms across FP abstracts + titles.
    word_counter: Counter = Counter()
    for row in fp_rows:
        text = (row["abstract"] or "").lower() + " " + (row["title"] or "").lower()
        word_counter.update(
            w for w in _WORD_RE.findall(text) if w not in _STOP_WORDS
        )
    top_terms = word_counter.most_common(30)

    n_fp = len(fp_list)
    return {
        "count": n_fp,
        "total_rated": total_rated,
        "total_drafts": total_drafts,
        "pct_of_total": round(100 * n_fp / total_drafts, 1) if total_drafts else 0,
        "pct_of_rated": round(100 * n_fp / total_rated, 1) if total_rated else 0,
        "fp_list": fp_list,
        "fp_categories": dict(fp_categories.most_common()),
        "fp_sources": dict(fp_sources.most_common()),
        "fp_dims": fp_dims,
        "nonfp_dims": nonfp_dims,
        "top_terms": top_terms,
        "nonfp_categories": dict(nonfp_categories.most_common(20)),
    }