Skip to content

Orchestrator

Core orchestration logic for the multi-agent grading pipeline.

Pipeline stages
  1. Validate the input summary.
  2. Parallel Score: three clinical roles score independently via asyncio.gather().
  3. Validate Scorecards: retry any role whose output fails validation (missing dims, out-of-range scores, etc.).
  4. Disagreement Map: compute per-dimension cross-role score gaps.
  5. Conditional Adjudication (LLM mode only): if any gap ≥ threshold, an adjudicator LLM refines disputed dimensions.
  6. Aggregate: compute per-role weighted overalls and cross-role mean.
  7. Return structured result dict.

build_disagreement_map(scorecards_by_role_id, gap_threshold)

Build a per-dimension disagreement map across the three roles.

For each dimension, computes the score gap (max - min) and flags dimensions where the gap meets or exceeds gap_threshold.

Returns:

Type Description
Dict[str, Dict[str, Any]]

Dict mapping dimension_id → {role_scores, score_gap, flag}.

Source code in src/grading_pipeline/orchestrator.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def build_disagreement_map(
    scorecards_by_role_id: Dict[str, Dict[str, Any]], gap_threshold: float
) -> Dict[str, Dict[str, Any]]:
    """Build a per-dimension disagreement map across the three roles.

    For every rubric dimension, collects each role's score, computes the
    spread (max - min), and flags the dimension when the spread meets or
    exceeds ``gap_threshold``.

    Returns:
        Dict mapping dimension_id → {role_scores, score_gap, flag}.
    """
    result: Dict[str, Dict[str, Any]] = {}

    for dimension_id in DIMENSION_IDS:
        # Collect this dimension's score from every canonical role,
        # keyed by the role's human-readable name.
        scores_by_role_name: Dict[str, float] = {}
        for role_id in CANONICAL_ROLE_IDS:
            raw_score = scorecards_by_role_id[role_id]["scores"][dimension_id]
            scores_by_role_name[ROLE_NAME_BY_ID[role_id]] = float(raw_score)

        all_scores = scores_by_role_name.values()
        spread = max(all_scores) - min(all_scores)
        result[dimension_id] = {
            "role_scores": scores_by_role_name,
            "score_gap": round(spread, 4),
            "flag": spread >= gap_threshold,
        }

    return result

calibrate_weights(w_prior, delta_w=None)

Combine prior weights with an optional delta adjustment and normalize.

Note: delta_w is currently a no-op stub (always zeros). It exists as a hook for future weight calibration logic.

Source code in src/grading_pipeline/orchestrator.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def calibrate_weights(
    w_prior: Dict[str, float], delta_w: Dict[str, float] | None = None
) -> Dict[str, float]:
    """Combine prior weights with an optional delta adjustment and normalize.

    Note: ``delta_w`` is currently a no-op stub (always zeros).  It exists
    as a hook for future weight calibration logic.
    """
    # An absent delta is equivalent to an all-zero delta: missing keys
    # fall back to 0.0 in the lookup below.
    adjustments = delta_w if delta_w is not None else {}

    combined: Dict[str, float] = {}
    for dim in DIMENSION_IDS:
        combined[dim] = float(w_prior.get(dim, 0.0)) + float(adjustments.get(dim, 0.0))

    return _normalize_weights(combined)

run_pipeline(summary, mode, output_format, *, rubric, roles, model='gpt-4o-mini', temperature=0.2, gap_threshold=0.5, max_retries=2, role_scorer=None, adjudicator=None) async

Run the full grading pipeline end-to-end.

Parameters:

Name Type Description Default
summary str

Raw clinical summary text.

required
mode str

"llm" or "heuristic".

required
output_format str

"human" or "json" (passed through to meta).

required
rubric Rubric

Loaded rubric with dimension definitions.

required
roles List[RoleProfile]

List of 3 RoleProfile instances.

required
model str

OpenAI model ID (LLM mode only).

'gpt-4o-mini'
temperature float

Sampling temperature (LLM mode only).

0.2
gap_threshold float

Min score gap to trigger adjudication.

0.5
max_retries int

Max re-scoring attempts per role on validation failure.

2
role_scorer Callable[[str, RoleProfile, Rubric], AgentScore] | None

Optional override for the per-role scoring function (useful for testing without API calls).

None
adjudicator Callable[..., Dict[str, Dict[str, Dict[str, Any]]]] | None

Optional override for the adjudication function.

None

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]

Dict with keys: per_role_scorecards, disagreement_map, adjudication_ran, overall_across_roles, meta.

Source code in src/grading_pipeline/orchestrator.py
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
async def run_pipeline(
    summary: str,
    mode: str,
    output_format: str,
    *,
    rubric: Rubric,
    roles: List[RoleProfile],
    model: str = "gpt-4o-mini",
    temperature: float = 0.2,
    gap_threshold: float = 0.5,
    max_retries: int = 2,
    role_scorer: Callable[[str, RoleProfile, Rubric], AgentScore] | None = None,
    adjudicator: Callable[..., Dict[str, Dict[str, Dict[str, Any]]]] | None = None,
) -> Dict[str, Any]:
    """Run the full grading pipeline end-to-end.

    Stages: validate input → parallel per-role scoring → validation/retry →
    disagreement map → conditional adjudication (LLM mode only) →
    re-validation → aggregation → structured result dict.

    Args:
        summary: Raw clinical summary text.
        mode: ``"llm"`` or ``"heuristic"``.
        output_format: ``"human"`` or ``"json"`` (passed through to meta).
        rubric: Loaded rubric with dimension definitions.
        roles: List of 3 ``RoleProfile`` instances.
        model: OpenAI model ID (LLM mode only).
        temperature: Sampling temperature (LLM mode only).
        gap_threshold: Min score gap to trigger adjudication.
        max_retries: Max re-scoring attempts per role on validation failure.
        role_scorer: Optional override for the per-role scoring function
            (useful for testing without API calls).
        adjudicator: Optional override for the adjudication function.

    Returns:
        Dict with keys: ``per_role_scorecards``, ``disagreement_map``,
        ``adjudication_ran``, ``overall_across_roles``, ``meta``.

    Raises:
        ValueError: If ``mode`` is unsupported or a canonical role config
            is missing from ``roles``.
        RuntimeError: If a role's scorecard still fails validation after
            ``max_retries`` re-scoring attempts (before or after
            adjudication).
    """
    if mode not in {"llm", "heuristic"}:
        raise ValueError(f"Unsupported mode: {mode}")

    # Stage 1: validate/normalize the raw summary before any scoring.
    checked_summary = validate_summary_text(summary)

    # Every canonical role must be present; fail fast before spending
    # any scoring effort (or API calls).
    roles_by_id = {role.id: role for role in roles}
    missing_roles = [role_id for role_id in CANONICAL_ROLE_IDS if role_id not in roles_by_id]
    if missing_roles:
        raise ValueError(f"Missing required role configs: {missing_roles}")

    # Synchronous single-role scoring, dispatched by override > mode.
    # The injected role_scorer takes precedence so tests can bypass
    # both the LLM and heuristic paths entirely.
    def score_once(input_summary: str, role: RoleProfile, input_rubric: Rubric) -> AgentScore:
        if role_scorer is not None:
            return role_scorer(input_summary, role, input_rubric)
        if mode == "llm":
            return score_summary_llm(
                input_summary,
                role,
                input_rubric,
                model=model,
                temperature=temperature,
            )
        return score_summary_heuristic(input_summary, role, input_rubric)

    # The scorers are blocking; offload each to a worker thread so
    # asyncio.gather below can run the three roles concurrently.
    async def run_role(role: RoleProfile) -> AgentScore:
        return await asyncio.to_thread(score_once, checked_summary, role, rubric)

    # Stage 2: score all three roles in parallel. gather preserves input
    # order, so results align positionally with CANONICAL_ROLE_IDS.
    initial_agents = await asyncio.gather(
        *(run_role(roles_by_id[role_id]) for role_id in CANONICAL_ROLE_IDS)
    )

    scorecards_by_role_id = {
        role_id: _agent_to_scorecard(agent, roles_by_id[role_id])
        for role_id, agent in zip(CANONICAL_ROLE_IDS, initial_agents)
    }

    # Stage 3: validate each role's scorecard; re-score a failing role
    # (whole scorecard replaced) up to max_retries times.
    for role_id in CANONICAL_ROLE_IDS:
        retries = 0
        while True:
            errors = _validate_scorecard(scorecards_by_role_id[role_id])
            if not errors:
                break

            if retries >= max_retries:
                raise RuntimeError(
                    f"Validation failed for {role_id} after {max_retries} retries: {errors}"
                )

            retries += 1
            repaired = await run_role(roles_by_id[role_id])
            scorecards_by_role_id[role_id] = _agent_to_scorecard(
                repaired, roles_by_id[role_id]
            )

    # Stage 4: compute per-dimension cross-role gaps and collect the
    # dimensions whose gap crosses the adjudication threshold.
    initial_disagreement_map = build_disagreement_map(scorecards_by_role_id, gap_threshold)
    disputed_dims = [
        dim for dim, item in initial_disagreement_map.items() if item["flag"]
    ]

    # Stage 5: adjudication runs only in LLM mode and only when at least
    # one dimension is disputed.
    adjudication_ran = False
    if mode == "llm" and disputed_dims:
        adjudication_ran = True

        # NOTE(review): the adjudicator model is hard-coded to "gpt-4o"
        # rather than reusing the `model` parameter — presumably a
        # deliberately stronger model for tie-breaking; confirm intent.
        if adjudicator is not None:
            updates = adjudicator(
                summary=checked_summary,
                rubric=rubric,
                scorecards_by_role_id=scorecards_by_role_id,
                disputed_dims=disputed_dims,
                model="gpt-4o",
            )
        else:
            # Default adjudicator is blocking; run it off the event loop.
            updates = await asyncio.to_thread(
                _default_adjudicator,
                summary=checked_summary,
                rubric=rubric,
                scorecards_by_role_id=scorecards_by_role_id,
                disputed_dims=disputed_dims,
                model="gpt-4o",
            )

        # Merge adjudicated values into the scorecards (disputed dims only).
        _apply_adjudication_updates(scorecards_by_role_id, updates, disputed_dims)

        # Re-validate after the merge.  Unlike the Stage 3 loop, a failed
        # re-score here only overwrites the DISPUTED fields of the existing
        # scorecard, preserving the non-disputed (already valid) values.
        for role_id in CANONICAL_ROLE_IDS:
            retries = 0
            while True:
                errors = _validate_scorecard(scorecards_by_role_id[role_id])
                if not errors:
                    break

                if retries >= max_retries:
                    raise RuntimeError(
                        "Validation failed after adjudication for "
                        f"{role_id} after {max_retries} retries: {errors}"
                    )

                retries += 1
                repaired = await run_role(roles_by_id[role_id])
                repaired_scorecard = _agent_to_scorecard(repaired, roles_by_id[role_id])
                _repair_disputed_fields(
                    scorecards_by_role_id[role_id],
                    repaired_scorecard,
                    disputed_dims,
                )

    # Stage 6: recompute the disagreement map on the (possibly adjudicated)
    # scorecards and aggregate per-role overalls into a cross-role figure.
    disagreement_map = build_disagreement_map(scorecards_by_role_id, gap_threshold)
    overall_across_roles = _aggregate_role_overalls(scorecards_by_role_id, roles_by_id)

    # Emit scorecards in canonical role order, not dict-insertion order.
    per_role_scorecards = [
        scorecards_by_role_id[role_id] for role_id in CANONICAL_ROLE_IDS
    ]

    # Stage 7: structured result; meta echoes the run configuration.
    return {
        "per_role_scorecards": per_role_scorecards,
        "disagreement_map": disagreement_map,
        "adjudication_ran": adjudication_ran,
        "overall_across_roles": overall_across_roles,
        "meta": {
            "version": "orchestrator_v1",
            "mode": mode,
            "output_format": output_format,
            "gap_threshold": gap_threshold,
            "max_retries": max_retries,
        },
    }