Source code for olmo_tap.final_evals.elo.run_tournament

"""Tournament orchestrator: response cache → judges → Elo → artifacts.

Wires together the existing per-entrant response cache, the
LLM-judge batch pipeline, and the local permutation-averaged Elo
engine. Two run modes are supported:

  * ``pilot``: 50-prompt stratified subset judged by Sonnet, no
    prompt-cache pre-warming. Used as a cheap sanity gate before the
    headline run.
  * ``headline``: the full 143-prompt bank judged by Opus, with a
    1-query pre-warm batch per rubric so Anthropic's prompt cache is
    populated before the bulk batch goes out.

Output directory layout::

    runs/<mode>_<timestamp>/
    ├── manifest.json         # mode, model, prompt count, seeds, timestamps
    ├── matches/matches.jsonl # one record per (dimension, pair)
    ├── verdicts/{factuality,calibration,clinical_utility}.jsonl
    ├── elo_results.json      # mean ± SEM per entrant per dimension
    ├── elo_per_perm.npz      # full (n_perms × entrants) traces
    └── pairwise_winrates.csv # per-pair win/loss/tie counts

Usage::

    pixi run -e default python -m olmo_tap.final_evals.elo.run_tournament \\
        --config olmo_tap/final_evals/elo/configs/tournament1.yaml \\
        --mode pilot
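
Config sketch (illustrative; the keys mirror what ``main`` reads, the values
are placeholders; see ``configs/tournament1.yaml`` for the real settings)::

    prompts:
      bank_path: ...
    entrants:
      - entrant_id: ...
    caches:
      responses_dir: ...
      judgments_dir: ...
    rubrics:
      factuality: {path: ...}
      calibration: {path: ...}
      clinical_utility: {path: ...}
    judge:
      pilot_model: ...
      headline_model: ...
    elo:
      n_perms: ...
      shuffle_seed: ...
      initial_rating: ...
      default_k: ...
      k_factor_sweep: [...]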
"""

from __future__ import annotations

import argparse
import csv
import json
import logging
from collections import defaultdict
from dataclasses import asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable, Literal

import numpy as np
import yaml
from dotenv import load_dotenv

from olmo_tap.final_evals.elo.elo_engine import (
    DEFAULT_INITIAL_RATING,
    DEFAULT_K_SWEEP,
    EloResult,
    Match,
    compute_elo_permutation,
    k_factor_sweep,
)
from olmo_tap.final_evals.elo.judge import (
    DIMENSIONS,
    Dimension,
    JudgeConfig,
    Judgment,
    PairToJudge,
    Rubric,
    _judgment_as_dict,
    judge_pairs,
)
from olmo_tap.final_evals.elo.match_builder import (
    DEFAULT_PILOT_SEED,
    build_match_list,
    select_pilot_subset,
)
from olmo_tap.final_evals.elo.types import (
    GeneratedResponse,
    load_prompt_bank,
)


logger = logging.getLogger(__name__)

Mode = Literal["pilot", "headline"]
MODES: tuple[Mode, ...] = ("pilot", "headline")


def _now_stamp() -> str:
    return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")


def _now_iso() -> str:
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


# --------------------------------------------------------------------------- #
# CLI
# --------------------------------------------------------------------------- #


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description=(
            "Run a configuration-level Elo tournament against the cached "
            "responses produced by the generate pipeline."
        ),
    )
    parser.add_argument(
        "--config",
        type=Path,
        required=True,
        help="Path to a tournament YAML config (see configs/tournament1.yaml).",
    )
    parser.add_argument(
        "--mode",
        type=str,
        choices=MODES,
        required=True,
        help=(
            "`pilot` runs Sonnet against the 50-prompt subset; `headline` "
            "runs Opus against the full bank with rubric-cache pre-warming."
        ),
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=None,
        help=(
            "Directory for run artifacts. Defaults to "
            "`olmo_tap/final_evals/elo/runs/<mode>_<timestamp>`."
        ),
    )
    parser.add_argument(
        "--judge-model",
        type=str,
        default=None,
        help="Override the judge model id selected by --mode.",
    )
    parser.add_argument(
        "--n-perms",
        type=int,
        default=None,
        help="Override `elo.n_perms` from the config.",
    )
    parser.add_argument(
        "--limit-pairs",
        type=int,
        default=None,
        help=(
            "Cap the per-dimension pair list at the first N entries. "
            "Diagnostic aid only — leave unset for normal runs."
        ),
    )
    parser.add_argument(
        "--skip-judging",
        action="store_true",
        help=(
            "Skip the judge step entirely; useful for re-running the Elo "
            "math from already-populated verdict caches."
        ),
    )
    return parser.parse_args(argv)


# --------------------------------------------------------------------------- #
# Config + cache loading
# --------------------------------------------------------------------------- #


def load_config(path: Path) -> dict[str, Any]:
    with path.open("r", encoding="utf-8") as fh:
        return yaml.safe_load(fh)


def load_response_cache(
    cache_dir: Path, entrants: Iterable[str]
) -> dict[tuple[str, str], GeneratedResponse]:
    """Load every per-entrant ``responses_<entrant>.jsonl`` into a flat dict.

    Missing per-entrant files are reported and skipped; downstream
    :func:`match_builder.build_match_list` decides whether to abort by
    counting how many ``(entrant, prompt)`` cells are present.
    """
    cache: dict[tuple[str, str], GeneratedResponse] = {}
    for eid in entrants:
        path = cache_dir / f"responses_{eid}.jsonl"
        if not path.exists():
            logger.error("Missing response cache for entrant %s at %s", eid, path)
            continue
        with path.open("r", encoding="utf-8") as fh:
            for raw_line in fh:
                line = raw_line.strip()
                if not line:
                    continue
                record = json.loads(line)
                cache[(eid, record["prompt_id"])] = GeneratedResponse(**record)
    return cache


# --------------------------------------------------------------------------- #
# On-disk artifact writers
# --------------------------------------------------------------------------- #


def write_manifest(
    out_dir: Path,
    *,
    mode: Mode,
    config_path: Path,
    judge_model: str,
    entrants: list[str],
    bank_size: int,
    selected_size: int,
    n_perms: int,
    seed: int,
    pilot_seed: int,
    started_at: str,
    finished_at: str | None,
) -> None:
    payload = {
        "mode": mode,
        "config_path": str(config_path),
        "judge_model": judge_model,
        "entrants": entrants,
        "bank_size": bank_size,
        "selected_prompt_count": selected_size,
        "n_perms": n_perms,
        "shuffle_seed": seed,
        "pilot_seed": pilot_seed,
        "started_at": started_at,
        "finished_at": finished_at,
    }
    (out_dir / "manifest.json").write_text(
        json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8"
    )


def write_matches(
    out_dir: Path, matches_by_dim: dict[Dimension, list[PairToJudge]]
) -> None:
    matches_dir = out_dir / "matches"
    matches_dir.mkdir(parents=True, exist_ok=True)
    path = matches_dir / "matches.jsonl"
    with path.open("w", encoding="utf-8") as fh:
        for dimension, pairs in matches_by_dim.items():
            for pair in pairs:
                row = {"dimension": dimension, **asdict(pair)}
                fh.write(json.dumps(row, ensure_ascii=False))
                fh.write("\n")


def write_verdicts(
    out_dir: Path, dimension: Dimension, judgments: list[Judgment]
) -> None:
    verdicts_dir = out_dir / "verdicts"
    verdicts_dir.mkdir(parents=True, exist_ok=True)
    path = verdicts_dir / f"{dimension}.jsonl"
    with path.open("w", encoding="utf-8") as fh:
        for j in judgments:
            fh.write(json.dumps(_judgment_as_dict(j), ensure_ascii=False))
            fh.write("\n")


# --------------------------------------------------------------------------- #
# Reconciliation: Judgment → Elo Match
# --------------------------------------------------------------------------- #


def judgments_to_matches(judgments: Iterable[Judgment]) -> list[Match]:
    """Drop ties / inconsistent verdicts; return ``(a, b, winner)`` triples."""
    out: list[Match] = []
    for j in judgments:
        if j.winner is None:
            continue
        out.append((j.entrant_a, j.entrant_b, j.winner))
    return out


def pairwise_winrates(
    judgments: Iterable[Judgment],
) -> dict[tuple[str, str], dict[str, int]]:
    """Aggregate per-pair counts.

    ``(a, b)`` keys are sorted lexicographically so ``(A, B)`` and ``(B, A)``
    collapse onto one row.
    """
    counts: dict[tuple[str, str], dict[str, int]] = defaultdict(
        lambda: {"a_wins": 0, "b_wins": 0, "ties": 0, "inconsistent": 0}
    )
    for j in judgments:
        a, b = sorted((j.entrant_a, j.entrant_b))
        cell = counts[(a, b)]
        if j.inconsistent:
            cell["inconsistent"] += 1
            cell["ties"] += 1
            continue
        if j.winner is None:
            cell["ties"] += 1
        elif j.winner == a:
            cell["a_wins"] += 1
        else:
            cell["b_wins"] += 1
    return counts


def write_pairwise_winrates_csv(
    out_dir: Path,
    by_dim: dict[Dimension, dict[tuple[str, str], dict[str, int]]],
) -> None:
    path = out_dir / "pairwise_winrates.csv"
    with path.open("w", encoding="utf-8", newline="") as fh:
        writer = csv.writer(fh)
        writer.writerow(
            [
                "dimension",
                "entrant_a",
                "entrant_b",
                "a_wins",
                "b_wins",
                "ties",
                "inconsistent",
            ]
        )
        for dimension, pairs in by_dim.items():
            for (a, b), cell in sorted(pairs.items()):
                writer.writerow(
                    [
                        dimension,
                        a,
                        b,
                        cell["a_wins"],
                        cell["b_wins"],
                        cell["ties"],
                        cell["inconsistent"],
                    ]
                )


# --------------------------------------------------------------------------- #
# Elo result serialisation
# --------------------------------------------------------------------------- #


def _elo_result_to_dict(result: EloResult) -> dict[str, float | str]:
    return {
        "entrant_id": result.entrant_id,
        "mean": result.mean,
        "sem": result.sem,
        "ci95_low": result.ci95_low,
        "ci95_high": result.ci95_high,
    }


def write_elo_results(
    out_dir: Path,
    *,
    primary: dict[Dimension, dict[str, EloResult]],
    sweep: dict[Dimension, dict[float, dict[str, EloResult]]],
    n_matches: dict[Dimension, int],
    default_k: float,
) -> None:
    payload: dict[str, Any] = {
        "default_k": default_k,
        "primary": {
            dim: {eid: _elo_result_to_dict(r) for eid, r in res.items()}
            for dim, res in primary.items()
        },
        "k_sensitivity": {
            dim: {
                str(k): {eid: _elo_result_to_dict(r) for eid, r in inner.items()}
                for k, inner in by_k.items()
            }
            for dim, by_k in sweep.items()
        },
        "n_decisive_matches": dict(n_matches),
    }
    (out_dir / "elo_results.json").write_text(
        json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8"
    )


def write_elo_per_perm(
    out_dir: Path, primary: dict[Dimension, dict[str, EloResult]]
) -> None:
    """Persist the full ``(n_perms × entrants)`` rating array per dimension.

    Saved as a single ``.npz`` archive with two keys per dimension:
    ``<dim>__data`` (float64, shape ``(n_perms, n_entrants)``) and
    ``<dim>__entrants`` (string array of column ids).
    """
    arrays: dict[str, np.ndarray] = {}
    for dimension, results in primary.items():
        if not results:
            continue
        eids = sorted(results.keys())
        stacked = np.stack([results[eid].per_perm_ratings for eid in eids], axis=1)
        arrays[f"{dimension}__data"] = stacked
        arrays[f"{dimension}__entrants"] = np.array(eids)
    if arrays:
        np.savez(out_dir / "elo_per_perm.npz", **arrays)
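

# --------------------------------------------------------------------------- #
# Stdout summary
# --------------------------------------------------------------------------- #

# ``main`` calls ``print_summary_table``, but no body for it appears in this
# listing. The helper below is a minimal, assumed reconstruction (rank, mean
# ± SEM per entrant, sorted by mean) and may differ from the original.


def print_summary_table(
    primary: dict[Dimension, dict[str, EloResult]],
    n_matches: dict[Dimension, int],
) -> None:
    """Print a per-dimension Elo ranking to stdout (minimal sketch)."""
    for dimension, results in primary.items():
        print(f"\n=== {dimension}: {n_matches.get(dimension, 0)} decisive matches ===")
        if not results:
            print("  (no ratings)")
            continue
        ranked = sorted(results.values(), key=lambda r: r.mean, reverse=True)
        for rank, r in enumerate(ranked, start=1):
            print(f"  {rank:>2}. {r.entrant_id:<40} {r.mean:7.1f} ± {r.sem:5.1f}")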


# --------------------------------------------------------------------------- #
# Pre-warming (headline mode only)
# --------------------------------------------------------------------------- #


def _prewarm_rubric_cache(
    pairs: list[PairToJudge],
    rubric: Rubric,
    config: JudgeConfig,
) -> None:
    """Submit a tiny pre-warm batch so Anthropic's prompt cache is populated
    with the rubric prefix before the bulk batch goes out.

    Used only in headline mode — the pilot is fast enough that the wait on a
    pre-warm batch outweighs the cache hit savings.
    """
    if not pairs:
        return
    logger.info("Pre-warming prompt cache for %s with 1-pair batch", rubric.dimension)
    judge_pairs(pairs[:1], rubric, config)


# --------------------------------------------------------------------------- #
# Main flow
# --------------------------------------------------------------------------- #


def main(argv: list[str] | None = None) -> None:
    args = parse_args(argv)
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )
    # Pick up ANTHROPIC_API_KEY from .env so the judge module finds it via
    # os.environ. Mirrors the convention in olmo_tap.constants.
    load_dotenv()

    config = load_config(args.config)
    mode: Mode = args.mode
    judge_cfg = config["judge"]
    elo_cfg = config["elo"]
    rubrics_cfg = config["rubrics"]
    caches_cfg = config["caches"]

    judge_model = args.judge_model or (
        judge_cfg["pilot_model"] if mode == "pilot" else judge_cfg["headline_model"]
    )
    n_perms = args.n_perms if args.n_perms is not None else int(elo_cfg["n_perms"])
    shuffle_seed = int(elo_cfg.get("shuffle_seed", 0))
    initial_rating = float(elo_cfg.get("initial_rating", DEFAULT_INITIAL_RATING))
    default_k = float(elo_cfg.get("default_k", 16))
    k_sweep = tuple(elo_cfg.get("k_factor_sweep", DEFAULT_K_SWEEP))

    runs_default = Path("olmo_tap/final_evals/elo/runs")
    if args.output_dir is None:
        out_dir = runs_default / f"{mode}_{_now_stamp()}"
    else:
        out_dir = args.output_dir
    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / "matches").mkdir(parents=True, exist_ok=True)
    (out_dir / "verdicts").mkdir(parents=True, exist_ok=True)

    started_at = _now_iso()
    logger.info("Run start: mode=%s out=%s judge=%s", mode, out_dir, judge_model)

    bank_path = Path(config["prompts"]["bank_path"])
    full_bank = load_prompt_bank(bank_path)
    bank_size = len(full_bank)

    entrants = [e["entrant_id"] for e in config["entrants"]]
    response_cache = load_response_cache(Path(caches_cfg["responses_dir"]), entrants)
    expected = len(entrants) * len(full_bank)
    if len(response_cache) < expected:
        logger.warning(
            "Response cache covers %d/%d (entrant, prompt) cells. "
            "Missing rows will be dropped from the match list.",
            len(response_cache),
            expected,
        )

    if mode == "pilot":
        bank = select_pilot_subset(full_bank)
        logger.info("Pilot subset: %d prompts", len(bank))
    else:
        bank = full_bank

    matches_by_dim = build_match_list(
        bank, response_cache, entrants, dimensions=DIMENSIONS
    )
    if args.limit_pairs is not None:
        matches_by_dim = {
            d: pairs[: args.limit_pairs] for d, pairs in matches_by_dim.items()
        }
    write_matches(out_dir, matches_by_dim)
    for dim, pairs in matches_by_dim.items():
        logger.info(
            "Dimension %s: %d unordered pairs (≈ %d judge queries)",
            dim,
            len(pairs),
            2 * len(pairs),
        )

    # Initial manifest — finished_at gets filled in at the end.
    write_manifest(
        out_dir,
        mode=mode,
        config_path=args.config,
        judge_model=judge_model,
        entrants=entrants,
        bank_size=bank_size,
        selected_size=len(bank),
        n_perms=n_perms,
        seed=shuffle_seed,
        pilot_seed=DEFAULT_PILOT_SEED,
        started_at=started_at,
        finished_at=None,
    )

    # ---- Judging ----
    judge_cache_dir = Path(caches_cfg["judgments_dir"])
    judge_cache_dir.mkdir(parents=True, exist_ok=True)
    j_config = JudgeConfig(judge_model=judge_model, cache_dir=judge_cache_dir)

    judgments_by_dim: dict[Dimension, list[Judgment]] = {}
    if args.skip_judging:
        logger.warning("--skip-judging set; no judging performed this run.")
        for dimension in DIMENSIONS:
            judgments_by_dim[dimension] = []
    else:
        for dimension in DIMENSIONS:
            pairs = matches_by_dim.get(dimension, [])
            if not pairs:
                judgments_by_dim[dimension] = []
                continue
            rubric = Rubric.load(dimension, Path(rubrics_cfg[dimension]["path"]))
            if mode == "headline":
                _prewarm_rubric_cache(pairs, rubric, j_config)
            logger.info(
                "Judging %d pairs on %s with %s", len(pairs), dimension, judge_model
            )
            result = judge_pairs(pairs, rubric, j_config)
            judgments_by_dim[dimension] = result.judgments
            write_verdicts(out_dir, dimension, result.judgments)
            stats = result.cache_stats
            logger.info(
                "Dimension %s done: cache_hits=%d fresh=%d "
                "cache_creation=%d cache_read=%d input=%d output=%d",
                dimension,
                stats.cache_hits,
                stats.fresh_calls,
                stats.cache_creation_input_tokens,
                stats.cache_read_input_tokens,
                stats.input_tokens,
                stats.output_tokens,
            )

    # ---- Elo ----
    primary: dict[Dimension, dict[str, EloResult]] = {}
    sweep: dict[Dimension, dict[float, dict[str, EloResult]]] = {}
    n_matches: dict[Dimension, int] = {}
    pairwise_by_dim: dict[Dimension, dict[tuple[str, str], dict[str, int]]] = {}

    for dimension in DIMENSIONS:
        judgments = judgments_by_dim.get(dimension, [])
        elo_matches = judgments_to_matches(judgments)
        n_matches[dimension] = len(elo_matches)
        pairwise_by_dim[dimension] = pairwise_winrates(judgments)
        if not elo_matches:
            logger.warning(
                "Dimension %s: no decisive matches; skipping Elo.", dimension
            )
            primary[dimension] = {}
            sweep[dimension] = {}
            continue
        primary[dimension] = compute_elo_permutation(
            elo_matches,
            k=default_k,
            initial_rating=initial_rating,
            n_perms=n_perms,
            seed=shuffle_seed,
        )
        sweep[dimension] = k_factor_sweep(
            elo_matches,
            k_values=tuple(int(k) for k in k_sweep),
            initial_rating=initial_rating,
            n_perms=n_perms,
            seed=shuffle_seed,
        )

    write_elo_results(
        out_dir,
        primary=primary,
        sweep=sweep,
        n_matches=n_matches,
        default_k=default_k,
    )
    write_elo_per_perm(out_dir, primary)
    write_pairwise_winrates_csv(out_dir, pairwise_by_dim)
    print_summary_table(primary, n_matches)

    write_manifest(
        out_dir,
        mode=mode,
        config_path=args.config,
        judge_model=judge_model,
        entrants=entrants,
        bank_size=bank_size,
        selected_size=len(bank),
        n_perms=n_perms,
        seed=shuffle_seed,
        pilot_seed=DEFAULT_PILOT_SEED,
        started_at=started_at,
        finished_at=_now_iso(),
    )
    logger.info("Run complete: %s", out_dir)


if __name__ == "__main__":
    main()