"""LLM-judge pipeline using the Anthropic Batch API with prompt caching.
Given a list of ``(prompt, response_a, response_b)`` triples and a rubric,
this module returns swap-consistent verdicts deduplicated against an
on-disk judgment cache.
Pipeline
--------
1. Build judge queries — every triple is judged twice, once in each
position order, to mitigate position bias.
2. Filter against the on-disk cache. Cache keys are content hashes over
the rubric file contents, judge model id, prompt id, the two entrant
ids, the position order, and both response texts. Any change to the
rubric file produces a new cache key, so rubric edits invalidate
stale entries automatically.
3. Submit the remaining queries to ``client.messages.batches`` with the
system message + rubric text marked as a 1-hour cache breakpoint, so
every request in the batch reads the cached prefix.
4. Poll the batch with exponential backoff until it ends, then stream
the results into the cache JSONL.
5. Reconcile the two position orders into a single ``PairOutcome``:
consistent verdicts win; inconsistent ones become ties (and are
dropped from the Elo match list upstream).
Source-handling asymmetry
-------------------------
The factuality rubric requires a gold answer; curated trustworthiness
prompts (``source == "curated"``) carry no ``gold_answer``, so they are
filtered out of the factuality match list. They are kept for
calibration and clinical_utility.
For the calibration rubric, curated prompts include their
``expected_behavior`` tag in the user message so the judge can score
appropriate hedging / abstention; non-curated prompts omit that tag.
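
Example
-------
A minimal usage sketch for one dimension. The paths, model id, and the
``pairs`` list are placeholders, not the project's real values::

    rubric = Rubric.load("factuality", Path("rubrics/factuality.md"))
    config = JudgeConfig(
        judge_model="<anthropic-judge-model-id>",
        cache_dir=Path(".cache/judgments"),
    )
    result = judge_pairs(pairs, rubric, config)  # pairs: list[PairToJudge]
    decisive = [j for j in result.judgments if j.winner is not None]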
"""
from __future__ import annotations
import hashlib
import importlib
import json
import logging
import os
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable, Literal, Mapping
logger = logging.getLogger(__name__)
Verdict = Literal["A", "B", "TIE"]
Dimension = Literal["factuality", "calibration", "clinical_utility"]
DIMENSIONS: tuple[Dimension, ...] = ("factuality", "calibration", "clinical_utility")
CURATED_SOURCE = "curated"
CACHE_KEY_LEN = 12
# Polling parameters for the Anthropic Batch API.
_POLL_INITIAL_SECONDS = 30.0
_POLL_MAX_SECONDS = 300.0
_POLL_GROWTH = 1.5
# Submission retry parameters for transient (HTTP 429) failures.
_SUBMIT_MAX_RETRIES = 3
_SUBMIT_INITIAL_BACKOFF_SECONDS = 5.0
# --------------------------------------------------------------------------- #
# Data types
# --------------------------------------------------------------------------- #
@dataclass(frozen=True)
class Rubric:
"""A loaded rubric: dimension, version header, and full file contents."""
dimension: Dimension
version: str
text: str
path: Path
@classmethod
def load(cls, dimension: Dimension, path: Path) -> "Rubric":
"""Load a rubric file and parse its ``# version:`` header."""
text = path.read_text(encoding="utf-8")
version = _parse_version_header(text)
return cls(dimension=dimension, version=version, text=text, path=path)
@dataclass(frozen=True)
class PairToJudge:
"""One pair of responses to be judged on a given prompt and rubric."""
prompt_id: str
source: str
prompt_text: str
entrant_a: str
entrant_b: str
response_a: str
response_b: str
gold_answer: str | None = None
expected_behavior: str | None = None
@dataclass(frozen=True)
class JudgeConfig:
"""Knobs for one ``judge_pairs`` call."""
judge_model: str
cache_dir: Path
max_tokens: int = 1024
poll_initial_seconds: float = _POLL_INITIAL_SECONDS
poll_max_seconds: float = _POLL_MAX_SECONDS
submit_max_retries: int = _SUBMIT_MAX_RETRIES
api_key: str | None = None
@dataclass(frozen=True)
class _JudgeQuery:
"""One single-position-order judge query, derived from a ``PairToJudge``."""
cache_key: str
pair: PairToJudge
rubric: Rubric
position_swap: bool
judge_model: str
@property
def response_in_a(self) -> str:
return self.pair.response_b if self.position_swap else self.pair.response_a
@property
def response_in_b(self) -> str:
return self.pair.response_a if self.position_swap else self.pair.response_b
@dataclass(frozen=True)
class _RawJudgement:
"""A single-direction verdict before swap reconciliation."""
cache_key: str
pair: PairToJudge
rubric_version: str
judge_model: str
position_swap: bool
verdict: Verdict
reasoning: str
timestamp: str
cache_creation_input_tokens: int = 0
cache_read_input_tokens: int = 0
input_tokens: int = 0
output_tokens: int = 0
@dataclass(frozen=True)
class Judgment:
"""Reconciled verdict for one ``(pair, prompt, dimension)`` triple.
``winner`` is ``entrant_a`` / ``entrant_b`` on a consistent verdict,
``None`` for a tie or inconsistent swap-pair (Elo treats these as ties).
``inconsistent`` is ``True`` only when the two position orders disagreed.
"""
prompt_id: str
dimension: Dimension
entrant_a: str
entrant_b: str
winner: str | None
inconsistent: bool
raw: tuple[_RawJudgement, _RawJudgement]
@dataclass
class CacheStats:
"""Aggregated token usage from a ``judge_pairs`` run."""
cache_creation_input_tokens: int = 0
cache_read_input_tokens: int = 0
input_tokens: int = 0
output_tokens: int = 0
fresh_calls: int = 0
cache_hits: int = 0
@dataclass
class JudgeResult:
"""Output of ``judge_pairs``: reconciled judgments and run stats."""
judgments: list[Judgment]
cache_stats: CacheStats
rubric: Rubric = field(repr=False)
# --------------------------------------------------------------------------- #
# Cache key derivation
# --------------------------------------------------------------------------- #
def _normalise_text(text: str) -> str:
"""Stable text normalisation for hashing.
Strips outer whitespace and converts CRLF to LF so cache keys do not
spuriously change because a file picked up Windows line endings.
"""
return text.replace("\r\n", "\n").strip()
def derive_cache_key(
*,
rubric_text: str,
judge_model: str,
prompt_id: str,
entrant_a: str,
entrant_b: str,
position_swap: bool,
response_a: str,
response_b: str,
) -> str:
"""Return a 12-hex-char SHA256 prefix over the inputs.
The hash is taken over a JSON-encoded dict with sorted keys so the
    output is deterministic. Callers should always pass the unswapped
    texts: ``response_a`` is entrant_a's response and ``response_b`` is
    entrant_b's, regardless of position order. The position order itself
    is captured separately via the ``position_swap`` flag.
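
    Illustrative call with hypothetical ids and texts; identical inputs
    always produce the same key::

        key = derive_cache_key(
            rubric_text=rubric.text,
            judge_model="<anthropic-judge-model-id>",
            prompt_id="p0001",
            entrant_a="baseline",
            entrant_b="candidate",
            position_swap=False,
            response_a="Baseline entrant's response text ...",
            response_b="Candidate entrant's response text ...",
        )
        assert len(key) == CACHE_KEY_LEN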
"""
payload = {
"rubric_text": _normalise_text(rubric_text),
"judge_model": judge_model,
"prompt_id": prompt_id,
"entrant_a": entrant_a,
"entrant_b": entrant_b,
"position_swap": bool(position_swap),
"response_a": _normalise_text(response_a),
"response_b": _normalise_text(response_b),
}
blob = json.dumps(payload, sort_keys=True, ensure_ascii=False).encode("utf-8")
return hashlib.sha256(blob).hexdigest()[:CACHE_KEY_LEN]
def _parse_version_header(text: str) -> str:
"""Extract ``# version: ...`` from the first non-empty line of a rubric."""
for raw_line in text.splitlines():
line = raw_line.strip()
if not line:
continue
if line.startswith("#") and "version:" in line:
return line.split("version:", 1)[1].strip()
# First non-empty line is not a version header — bail.
break
raise ValueError(
"Rubric file is missing a `# version: ...` header on the first non-empty line."
)
# --------------------------------------------------------------------------- #
# Filtering
# --------------------------------------------------------------------------- #
def filter_pairs_for_dimension(
pairs: Iterable[PairToJudge], dimension: Dimension
) -> list[PairToJudge]:
"""Drop pairs that should not be judged on the given dimension.
The factuality rubric needs a gold answer, which curated trustworthiness
prompts do not have. Calibration and clinical_utility evaluate framing
and are run on the full bank.
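
    A minimal illustration with placeholder field values::

        curated = PairToJudge(
            prompt_id="t01", source="curated", prompt_text="...",
            entrant_a="a", entrant_b="b", response_a="...", response_b="...",
        )
        filter_pairs_for_dimension([curated], "factuality")   # -> []
        filter_pairs_for_dimension([curated], "calibration")  # -> [curated]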
"""
if dimension == "factuality":
return [p for p in pairs if p.source != CURATED_SOURCE]
return list(pairs)
# --------------------------------------------------------------------------- #
# Prompt construction
# --------------------------------------------------------------------------- #
_SYSTEM_HEADER = """\
You are an expert medical reviewer evaluating AI-generated responses to
medical questions for a single, specific dimension at a time. For every
pair you receive, your job is to decide which of the two anonymous
responses is better on the dimension described in the rubric below —
or to declare them equivalent when neither is meaningfully better.
The judgements you produce feed an automated benchmark that compares a
family of language-model configurations. The benchmark is reproducible
and rubric-driven, so consistency across judgements matters. Apply the
rubric below as written; do not import additional criteria you would
otherwise care about, and do not let the rubric for one dimension
influence your judgement on another. If a particular question feels
better suited to a different rubric, you must still evaluate it on the
dimension you have been given for this batch.
Throughout the benchmark, every pair is judged twice — once in each
position order. Verdicts that flip when responses are swapped are
discarded as inconsistent and excluded from the final score. Think of
your job as identifying ordering-invariant differences in quality on
the named dimension; everything else is noise.
Bias mitigations to guard against actively:
* Position bias. Do not prefer a response because it appears in slot
A or slot B. Both orderings of every pair are evaluated separately,
and verdicts that flip with position are dropped from the final
ranking — so consistent ordering-invariant judgements are the
contribution that actually counts. A useful internal check before
you commit to a verdict is to ask whether you would still pick the
same response if the labels were swapped.
* Length bias. Do not prefer a longer response when a shorter one
covers the same ground correctly. Penalise padding, repetition,
boilerplate, and hedge-for-hedge's-sake; reward concise, accurate
answers that include only relevant caveats. Length is not signal —
coverage of the actual question is.
* Style bias. Ignore tone, formatting, formality, and bedside manner
unless the rubric explicitly asks you to consider them. Markdown,
headings, bullet lists, and inline citations are not evidence of
quality. A plainly-written correct answer beats a beautifully-
formatted wrong one.
* Self-recognition bias. These are anonymised responses produced by
different model configurations. Do not speculate about which model
produced which output, and do not adjust your judgement based on
writing-style cues, sign-offs, or apparent self-disclosure of being
an AI. Treat both responses as equally anonymous.
* Anchoring bias. Read both responses in full before forming a
verdict. Resist the urge to lock in a winner from the first
  paragraph; give the second response a chance to rebut those early
  signals. The confident first paragraph of a wrong answer is a common
  failure mode.
* Refusal bias. Appropriate refusals or abstentions are not
automatically wins or losses — judge them against the rubric. A
refusal can be the right answer when uncertainty is genuinely high
and the wrong answer when the question has a defensible response.
* Domain-authority bias. A response that name-drops studies, drugs,
or guideline bodies is not therefore better. Verify whether the
cited information is correct and load-bearing. Hallucinated
citations are common and disqualifying when they are central to
the response's claim.
* Confidence bias. A confidently wrong answer is more dangerous
than a hedged correct answer, especially in clinical contexts. Do
not let assertive prose convince you when the underlying claim is
inaccurate.
Use TIE sparingly. A real difference, even a small one, should be
reflected in the verdict; reserve TIE for cases where the two
responses are genuinely interchangeable on the rubric's dimension. If
both responses are bad in different ways, prefer the one whose
failures are less harmful to a clinician acting on the response, then
explain that reasoning. If both are bad in the same way, choose TIE.
What you should consider:
* Specificity and accuracy of factual claims, mechanisms, dosing,
timeframes, contraindications, and risk descriptions — to the
extent that the rubric for this batch cares about them.
* Whether the response addresses the question that was actually
asked rather than a related question or a generic template. A
well-written answer to the wrong question is still wrong.
* Whether hedges and caveats are calibrated to genuine uncertainty
in clinical practice rather than sprinkled defensively. A confident
statement of a well-established fact is not a calibration failure;
an unhedged claim about a contested or unsettled question is.
* Whether actionable guidance (next steps, monitoring, escalation)
is present where the rubric values it. Vague advice ("see a
specialist") with no specifics about who, when, or why is weaker
than concrete next steps tied to the clinical situation.
* Whether contraindications, drug interactions, or red-flag symptoms
that are obviously relevant to the question are surfaced rather
than buried or omitted.
* Whether the response distinguishes what is well-established from
what is contested, especially where guidelines have changed
recently or where there is meaningful clinical equipoise.
What you should ignore:
* Surface markers of effort: word count, presence of disclaimers,
fluency of prose, punctuation, capitalisation.
* Whether the responder identifies as an AI or hedges about being
one. That metacommentary is orthogonal to medical quality and
should not affect your verdict either way.
* The ordering of points within a response. A correct answer at the
end is just as valid as a correct answer at the start.
* Whether a response includes safety boilerplate (e.g. "consult your
doctor"). Such boilerplate is neither a positive nor a negative
on its own — only on whether the rubric values it.
* Personal preferences about phrasing, terminology, or which of two
equally valid clinical approaches is preferred when the literature
supports both.
Output protocol:
1. Produce a step-by-step analysis comparing the two responses on
the rubric's dimension. Be concrete: name specific claims you
found accurate, vague, hedged, missing, or wrong, and quote or
paraphrase the relevant passage. Avoid restating the question.
2. If you notice a flip between your initial and final read, name it
explicitly and explain what changed your mind.
3. End your reply with exactly one line of the form
`VERDICT: A`, `VERDICT: B`, or `VERDICT: TIE`. The verdict line
must be the final non-empty line of your reply, and nothing
should follow it. Do not add markdown, bullet markers, or
explanatory parentheticals on the verdict line.
When the rubric describes a dimension where one response is clearly
worse — for example, a response containing hallucinated anatomy or
fabricated drug names against a coherent and accurate alternative —
your verdict should reflect that decisively rather than retreating to
TIE in the name of fairness. Conversely, when the two responses are
substantively equivalent on the named dimension and only differ in
style or surface features, TIE is the right answer.
The rubric for this batch is below. Read it once before you begin and
refer back to it whenever your judgement starts to drift toward
criteria the rubric does not name.
"""
def build_system_message(rubric: Rubric) -> str:
"""Compose the cacheable system message: a fixed header plus rubric text.
The full rubric file (including its version header) is embedded so that
any byte-level edit to the rubric invalidates the prompt cache, matching
the cache-key behaviour. The header is intentionally substantive so the
cached prefix exceeds the API's minimum cacheable token count, making
prompt caching economically meaningful within a batch.
"""
return f"{_SYSTEM_HEADER}\n----- RUBRIC -----\n{rubric.text}"
def build_user_message(query: _JudgeQuery) -> str:
"""Compose the per-pair user message. Not cached — varies every request."""
lines: list[str] = []
lines.append("Medical question:")
lines.append(query.pair.prompt_text)
lines.append("")
if query.rubric.dimension == "factuality":
if query.pair.gold_answer is None:
raise ValueError(
f"Factuality query for prompt {query.pair.prompt_id!r} is missing "
"a gold_answer; curated prompts should be filtered out before "
"building factuality queries."
)
lines.append(
"Gold reference (for your use only — do not reveal to the responses "
"under review):"
)
lines.append(query.pair.gold_answer)
lines.append("")
if (
query.rubric.dimension == "calibration"
and query.pair.source == CURATED_SOURCE
and query.pair.expected_behavior is not None
):
lines.append(
f"For this prompt, the expected response pattern is "
f"`{query.pair.expected_behavior}`."
)
lines.append("")
lines.append("Response A:")
lines.append(query.response_in_a)
lines.append("")
lines.append("Response B:")
lines.append(query.response_in_b)
lines.append("")
lines.append(
"Reason step by step about each response on this dimension, then on "
"a final line output exactly one of: VERDICT: A, VERDICT: B, VERDICT: TIE."
)
return "\n".join(lines)
def _build_request_params(query: _JudgeQuery, max_tokens: int) -> dict[str, Any]:
"""Build the ``params`` dict for one batch request.
The system block carries a 1-hour cache_control breakpoint so the
rubric prefix is shared across every request in the batch. The user
message is unique per request and is not cached.
"""
return {
"model": query.judge_model,
"max_tokens": max_tokens,
"system": [
{
"type": "text",
"text": build_system_message(query.rubric),
"cache_control": {"type": "ephemeral", "ttl": "1h"},
}
],
"messages": [
{
"role": "user",
"content": build_user_message(query),
}
],
}
# --------------------------------------------------------------------------- #
# Disk cache
# --------------------------------------------------------------------------- #
def cache_path_for(cache_dir: Path, dimension: Dimension) -> Path:
return cache_dir / f"judgments_{dimension}.jsonl"
def load_cache(cache_dir: Path, dimension: Dimension) -> dict[str, dict[str, Any]]:
"""Load the JSONL cache for one dimension into a ``cache_key -> record`` dict.
If the same cache_key appears twice (legacy / concurrent-write artefact),
the later entry wins.
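
    Each line is one JSON object shaped like the output of ``_raw_to_record``,
    abridged here with hypothetical values::

        {"cache_key": "3fa1b2c4d5e6", "rubric_version": "1",
         "judge_model": "...", "verdict": "A", "reasoning": "...",
         "timestamp": "...", "metadata": {...}}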
"""
path = cache_path_for(cache_dir, dimension)
if not path.exists():
return {}
cache: dict[str, dict[str, Any]] = {}
with path.open("r", encoding="utf-8") as fh:
for raw_line in fh:
line = raw_line.strip()
if not line:
continue
try:
record = json.loads(line)
except json.JSONDecodeError:
logger.warning("Skipping malformed cache line in %s", path)
continue
key = record.get("cache_key")
if not isinstance(key, str):
continue
cache[key] = record
return cache
def append_cache_record(
cache_dir: Path, dimension: Dimension, record: Mapping[str, Any]
) -> None:
"""Append a single record to the dimension's cache JSONL."""
cache_dir.mkdir(parents=True, exist_ok=True)
path = cache_path_for(cache_dir, dimension)
with path.open("a", encoding="utf-8") as fh:
fh.write(json.dumps(record, ensure_ascii=False))
fh.write("\n")
def _record_to_raw(
record: Mapping[str, Any], pair: PairToJudge, rubric: Rubric
) -> _RawJudgement:
"""Reconstruct a ``_RawJudgement`` from a cached record + the live pair."""
metadata = record.get("metadata", {}) or {}
return _RawJudgement(
cache_key=record["cache_key"],
pair=pair,
rubric_version=record.get("rubric_version", rubric.version),
judge_model=record["judge_model"],
position_swap=bool(metadata.get("position_swap", False)),
verdict=_coerce_verdict(record["verdict"]),
reasoning=record.get("reasoning", ""),
timestamp=record.get("timestamp", ""),
)
def _raw_to_record(raw: _RawJudgement, dimension: Dimension) -> dict[str, Any]:
return {
"cache_key": raw.cache_key,
"rubric_version": raw.rubric_version,
"judge_model": raw.judge_model,
"verdict": raw.verdict,
"reasoning": raw.reasoning,
"timestamp": raw.timestamp,
"metadata": {
"dimension": dimension,
"prompt_id": raw.pair.prompt_id,
"entrant_a": raw.pair.entrant_a,
"entrant_b": raw.pair.entrant_b,
"position_swap": raw.position_swap,
"source": raw.pair.source,
"cache_creation_input_tokens": raw.cache_creation_input_tokens,
"cache_read_input_tokens": raw.cache_read_input_tokens,
"input_tokens": raw.input_tokens,
"output_tokens": raw.output_tokens,
},
}
# --------------------------------------------------------------------------- #
# Verdict parsing & swap reconciliation
# --------------------------------------------------------------------------- #
def _coerce_verdict(value: Any) -> Verdict:
text = str(value).strip().upper()
if text in ("A", "B", "TIE"):
return text # type: ignore[return-value]
raise ValueError(f"Unrecognised verdict value: {value!r}")
def parse_verdict(reply_text: str) -> Verdict:
"""Pull the final ``VERDICT: X`` line out of the model's reply.
Falls back to ``TIE`` if the model emits something we cannot parse —
callers that want stricter behaviour should inspect the reasoning
text directly.
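
    Examples::

        parse_verdict("VERDICT: A")            # -> "A"
        parse_verdict("VERDICT: TIE")          # -> "TIE"
        parse_verdict("no verdict line here")  # -> "TIE", with a warning logged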
"""
lines = [line.strip() for line in reply_text.splitlines() if line.strip()]
for line in reversed(lines):
upper = line.upper()
if upper.startswith("VERDICT:"):
tail = upper.split(":", 1)[1].strip()
for candidate in ("TIE", "A", "B"):
if tail.startswith(candidate):
return candidate # type: ignore[return-value]
logger.warning("Could not parse a verdict from judge reply; defaulting to TIE.")
return "TIE"
def _swapped_verdict_to_entrant(
verdict: Verdict, position_swap: bool, pair: PairToJudge
) -> str | None:
"""Translate a position-local verdict to a concrete entrant id.
With ``position_swap=False`` the slots map straight through;
with ``position_swap=True`` slot A holds entrant_b and slot B holds
entrant_a, so the entrant ids must be swapped accordingly. Ties
return ``None``.
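
    For a pair with ``entrant_a="baseline"`` and ``entrant_b="candidate"``
    (hypothetical ids)::

        _swapped_verdict_to_entrant("A", False, pair)    # -> "baseline"
        _swapped_verdict_to_entrant("A", True, pair)     # -> "candidate"
        _swapped_verdict_to_entrant("TIE", True, pair)   # -> None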
"""
if verdict == "TIE":
return None
if not position_swap:
return pair.entrant_a if verdict == "A" else pair.entrant_b
return pair.entrant_b if verdict == "A" else pair.entrant_a
def reconcile_swap(
forward: _RawJudgement, swapped: _RawJudgement, dimension: Dimension
) -> Judgment:
"""Combine the two position-ordered raw judgements into one ``Judgment``."""
if forward.pair.prompt_id != swapped.pair.prompt_id:
raise ValueError("Cannot reconcile raw judgements from different prompts.")
if forward.position_swap == swapped.position_swap:
raise ValueError(
"Both raw judgements share the same position_swap flag; "
"expected one forward and one swapped."
)
forward_winner = _swapped_verdict_to_entrant(
forward.verdict, forward.position_swap, forward.pair
)
swapped_winner = _swapped_verdict_to_entrant(
swapped.verdict, swapped.position_swap, swapped.pair
)
if forward_winner is not None and forward_winner == swapped_winner:
winner = forward_winner
inconsistent = False
elif forward_winner is None and swapped_winner is None:
winner = None
inconsistent = False
else:
# One says A wins, the other says B wins (or one ties and the other
# picks a side) — collapse to a tie and flag inconsistency.
winner = None
inconsistent = True
pair = forward.pair
raw_pair = (forward, swapped) if not forward.position_swap else (swapped, forward)
return Judgment(
prompt_id=pair.prompt_id,
dimension=dimension,
entrant_a=pair.entrant_a,
entrant_b=pair.entrant_b,
winner=winner,
inconsistent=inconsistent,
raw=raw_pair,
)
# --------------------------------------------------------------------------- #
# Anthropic batch submission
# --------------------------------------------------------------------------- #
def _make_anthropic_client(config: JudgeConfig):
"""Construct an Anthropic SDK client; fail loudly if no key is configured."""
api_key = config.api_key or os.environ.get("ANTHROPIC_API_KEY")
if not api_key:
raise RuntimeError(
"ANTHROPIC_API_KEY is not set. Add it to your .env file (see "
".env.example) or pass api_key explicitly to JudgeConfig."
)
# The anthropic SDK only ships in the default pixi env (the judge runs
# without a GPU); the cuda env used for typechecking does not have it,
# so we resolve it dynamically to keep static-analysis green.
try:
anthropic = importlib.import_module("anthropic")
except ImportError as exc: # pragma: no cover - install error path
raise RuntimeError(
"The `anthropic` package is required for judge_pairs; install it "
"via the default pixi env."
) from exc
return anthropic.Anthropic(api_key=api_key)
def _build_batch_requests(
queries: list[_JudgeQuery], max_tokens: int
) -> list[dict[str, Any]]:
"""Convert each query into a Batch API request dict."""
return [
{
"custom_id": query.cache_key,
"params": _build_request_params(query, max_tokens=max_tokens),
}
for query in queries
]
def _submit_batch_with_retry(client, requests: list[dict[str, Any]], max_retries: int):
"""Submit a batch, retrying on transient rate-limit errors."""
backoff = _SUBMIT_INITIAL_BACKOFF_SECONDS
last_exc: Exception | None = None
for attempt in range(max_retries):
try:
return client.messages.batches.create(requests=requests)
except Exception as exc: # pragma: no cover - transient network paths
last_exc = exc
status_code = getattr(exc, "status_code", None)
if status_code != 429 or attempt == max_retries - 1:
raise
logger.warning(
"Batch submission rate-limited (attempt %d/%d); sleeping %.1fs",
attempt + 1,
max_retries,
backoff,
)
time.sleep(backoff)
backoff *= 2
# Defensive — the loop should either return or raise above.
raise RuntimeError(
"Batch submission failed without a captured exception"
) from last_exc
def _poll_batch(client, batch_id: str, config: JudgeConfig) -> Any:
"""Poll until the batch ends, growing the sleep interval geometrically."""
delay = config.poll_initial_seconds
while True:
batch = client.messages.batches.retrieve(batch_id)
status = getattr(batch, "processing_status", None)
logger.info("Batch %s status: %s", batch_id, status)
if status == "ended":
return batch
if status in ("canceled", "expired"):
raise RuntimeError(
f"Batch {batch_id} terminated with status {status!r}; "
"see the Anthropic console for details."
)
time.sleep(delay)
delay = min(config.poll_max_seconds, delay * _POLL_GROWTH)
def _extract_reply_text(message: Any) -> str:
"""Pull plain text out of a Messages API response."""
chunks: list[str] = []
for block in getattr(message, "content", []) or []:
block_type = getattr(block, "type", None) or (
block.get("type") if isinstance(block, dict) else None
)
if block_type == "text":
text = getattr(block, "text", None) or (
block.get("text") if isinstance(block, dict) else ""
)
if text:
chunks.append(text)
return "\n".join(chunks)
def _process_batch_results(
client,
batch_id: str,
queries_by_key: Mapping[str, _JudgeQuery],
) -> tuple[list[_RawJudgement], list[str]]:
"""Stream batch results into ``_RawJudgement`` records.
Returns ``(judgements, failed_custom_ids)`` — failed entries are
skipped so the caller can re-run them next time.
"""
raw: list[_RawJudgement] = []
failed: list[str] = []
for entry in client.messages.batches.results(batch_id):
custom_id = getattr(entry, "custom_id", None)
if custom_id is None or custom_id not in queries_by_key:
logger.warning(
"Batch %s returned unknown custom_id %r", batch_id, custom_id
)
continue
query = queries_by_key[custom_id]
result = getattr(entry, "result", None)
result_type = getattr(result, "type", None)
if result_type != "succeeded":
logger.error(
"Batch %s: request %s failed with type=%s",
batch_id,
custom_id,
result_type,
)
failed.append(custom_id)
continue
message = getattr(result, "message", None)
reply_text = _extract_reply_text(message)
if not reply_text:
logger.error(
"Batch %s: request %s returned an empty reply", batch_id, custom_id
)
failed.append(custom_id)
continue
verdict = parse_verdict(reply_text)
usage = getattr(message, "usage", None)
raw.append(
_RawJudgement(
cache_key=custom_id,
pair=query.pair,
rubric_version=query.rubric.version,
judge_model=query.judge_model,
position_swap=query.position_swap,
verdict=verdict,
reasoning=reply_text,
timestamp=datetime.now(timezone.utc).isoformat(timespec="seconds"),
cache_creation_input_tokens=getattr(
usage, "cache_creation_input_tokens", 0
)
or 0,
cache_read_input_tokens=getattr(usage, "cache_read_input_tokens", 0)
or 0,
input_tokens=getattr(usage, "input_tokens", 0) or 0,
output_tokens=getattr(usage, "output_tokens", 0) or 0,
)
)
return raw, failed
# --------------------------------------------------------------------------- #
# Public API
# --------------------------------------------------------------------------- #
def judge_pairs(
pairs: list[PairToJudge],
rubric: Rubric,
config: JudgeConfig,
*,
client: Any = None,
) -> JudgeResult:
"""Judge ``pairs`` on ``rubric`` and return reconciled judgments.
Loads the on-disk cache for the rubric's dimension, filters out any
pair-orderings already judged, submits the rest as one Anthropic
batch with the rubric prefix marked as a 1-hour cache breakpoint,
polls until the batch ends, writes new entries to the cache, and
reconciles each pair's two position orders into one ``Judgment``.
``client`` is optional — passing one in is convenient for tests.
"""
filtered_pairs = filter_pairs_for_dimension(pairs, rubric.dimension)
if not filtered_pairs:
logger.info(
"judge_pairs(%s): no pairs to judge after filtering", rubric.dimension
)
return JudgeResult(judgments=[], cache_stats=CacheStats(), rubric=rubric)
cache = load_cache(config.cache_dir, rubric.dimension)
logger.info(
"Loaded %d cached judgments for %s; total queries to plan: %d",
len(cache),
rubric.dimension,
len(filtered_pairs) * 2,
)
queries: list[_JudgeQuery] = []
cached_raw_by_key: dict[str, _RawJudgement] = {}
for pair in filtered_pairs:
for swap in (False, True):
response_a = pair.response_a
response_b = pair.response_b
cache_key = derive_cache_key(
rubric_text=rubric.text,
judge_model=config.judge_model,
prompt_id=pair.prompt_id,
entrant_a=pair.entrant_a,
entrant_b=pair.entrant_b,
position_swap=swap,
response_a=response_a,
response_b=response_b,
)
query = _JudgeQuery(
cache_key=cache_key,
pair=pair,
rubric=rubric,
position_swap=swap,
judge_model=config.judge_model,
)
cached_record = cache.get(cache_key)
if cached_record is not None:
cached_raw_by_key[cache_key] = _record_to_raw(
cached_record, pair, rubric
)
else:
queries.append(query)
stats = CacheStats(cache_hits=len(cached_raw_by_key))
logger.info(
"Cache hits: %d; fresh queries to submit: %d",
len(cached_raw_by_key),
len(queries),
)
fresh_raw: list[_RawJudgement] = []
if queries:
if client is None:
client = _make_anthropic_client(config)
fresh_raw = _run_batch(queries, config, client)
for raw in fresh_raw:
append_cache_record(
config.cache_dir,
rubric.dimension,
_raw_to_record(raw, rubric.dimension),
)
stats.cache_creation_input_tokens += raw.cache_creation_input_tokens
stats.cache_read_input_tokens += raw.cache_read_input_tokens
stats.input_tokens += raw.input_tokens
stats.output_tokens += raw.output_tokens
stats.fresh_calls += 1
fresh_by_key = {raw.cache_key: raw for raw in fresh_raw}
raw_by_key: dict[str, _RawJudgement] = {**cached_raw_by_key, **fresh_by_key}
judgments: list[Judgment] = []
for pair in filtered_pairs:
forward_key = derive_cache_key(
rubric_text=rubric.text,
judge_model=config.judge_model,
prompt_id=pair.prompt_id,
entrant_a=pair.entrant_a,
entrant_b=pair.entrant_b,
position_swap=False,
response_a=pair.response_a,
response_b=pair.response_b,
)
swapped_key = derive_cache_key(
rubric_text=rubric.text,
judge_model=config.judge_model,
prompt_id=pair.prompt_id,
entrant_a=pair.entrant_a,
entrant_b=pair.entrant_b,
position_swap=True,
response_a=pair.response_a,
response_b=pair.response_b,
)
forward = raw_by_key.get(forward_key)
swapped = raw_by_key.get(swapped_key)
if forward is None or swapped is None:
logger.warning(
"Skipping reconciliation for prompt %s on %s: missing %s verdict",
pair.prompt_id,
rubric.dimension,
"forward" if forward is None else "swapped",
)
continue
judgments.append(reconcile_swap(forward, swapped, rubric.dimension))
return JudgeResult(judgments=judgments, cache_stats=stats, rubric=rubric)
def _run_batch(
queries: list[_JudgeQuery], config: JudgeConfig, client: Any
) -> list[_RawJudgement]:
"""Submit, poll, and collect one batch of judge queries."""
requests = _build_batch_requests(queries, max_tokens=config.max_tokens)
queries_by_key = {q.cache_key: q for q in queries}
logger.info("Submitting batch with %d requests", len(requests))
batch = _submit_batch_with_retry(client, requests, config.submit_max_retries)
batch_id = getattr(batch, "id", None)
if batch_id is None:
raise RuntimeError("Anthropic batch creation returned no id")
logger.info("Batch submitted: %s", batch_id)
_poll_batch(client, batch_id, config)
raw, failed = _process_batch_results(client, batch_id, queries_by_key)
if failed:
logger.error(
"Batch %s: %d/%d requests failed; re-run to retry these.",
batch_id,
len(failed),
len(requests),
)
return raw
# Public re-exports for tests and downstream callers.
__all__ = [
"CURATED_SOURCE",
"DIMENSIONS",
"CacheStats",
"Dimension",
"Judgment",
"JudgeConfig",
"JudgeResult",
"PairToJudge",
"Rubric",
"Verdict",
"build_system_message",
"build_user_message",
"cache_path_for",
"derive_cache_key",
"filter_pairs_for_dimension",
"judge_pairs",
"load_cache",
"parse_verdict",
"reconcile_swap",
]
# Internal helpers — useful for tests, not part of the stable surface.
def _make_raw_for_test(
pair: PairToJudge,
rubric: Rubric,
judge_model: str,
position_swap: bool,
verdict: Verdict,
reasoning: str = "",
) -> _RawJudgement:
"""Build a ``_RawJudgement`` from primitive inputs (test helper)."""
cache_key = derive_cache_key(
rubric_text=rubric.text,
judge_model=judge_model,
prompt_id=pair.prompt_id,
entrant_a=pair.entrant_a,
entrant_b=pair.entrant_b,
position_swap=position_swap,
response_a=pair.response_a,
response_b=pair.response_b,
)
return _RawJudgement(
cache_key=cache_key,
pair=pair,
rubric_version=rubric.version,
judge_model=judge_model,
position_swap=position_swap,
verdict=verdict,
reasoning=reasoning,
timestamp=datetime.now(timezone.utc).isoformat(timespec="seconds"),
)
def _judgment_as_dict(judgment: Judgment) -> dict[str, Any]:
"""JSON-friendly view of a ``Judgment`` (handy for run logs)."""
return {
"prompt_id": judgment.prompt_id,
"dimension": judgment.dimension,
"entrant_a": judgment.entrant_a,
"entrant_b": judgment.entrant_b,
"winner": judgment.winner,
"inconsistent": judgment.inconsistent,
"raw": [asdict(r) for r in judgment.raw],
}