Skip to content

Scenario Run

A ScenarioRun is the plan you hand to model risk — shocks, base tables, aggregations, and an optional master seed bundled into a single object you can serialise to YAML, run, and reproduce byte-for-byte from the saved recipe alone.

Three things you get for the cost of building one:

  • A SHA over the recipe. Two equivalent plans hash to the same source_sha; changing any shock parameter, table content, or aggregator changes it.
  • An opt-in audit sidecar. A JSON file beside the run output that records what produced the numbers — the SHA, the canonical recipe, the library versions, and every aggregator output.
  • A bit-exact reload. Save the plan to YAML, hand both to a different machine, reload, run on the same inputs — every scalar aggregator matches the original to the bit.

You'll reach for ScenarioRun when the run needs to be reproducible months later. For one-shot exploration the raw for_each_scenario loop is fine — you can promote to a ScenarioRun when the analysis is settled.


Build a plan

A Solvency II SCR plan, sized small but shaped like a real one — three stress shocks against mortality and lapse tables, plus a mass-lapse override.

import polars as pl
from gaspatchio_core.assumptions import Table
from gaspatchio_core.frame import ActuarialFrame
from gaspatchio_core.scenarios import (
    CTE, ArgMax, MultiplicativeShock, OverrideShock, ScenarioRun, Sum,
)


def policies() -> ActuarialFrame:
    return ActuarialFrame({
        "policy_id": list(range(1, 11)),
        "age":       [30, 31, 32, 33, 34, 30, 31, 32, 33, 34],
        "sum_insured": [100_000.0] * 10,
    })


mortality = Table(
    name="mortality",
    source=pl.DataFrame({
        "age":  [30, 31, 32, 33, 34],
        "rate": [0.001, 0.0012, 0.0015, 0.0018, 0.0022],
    }),
    dimensions={"age": "age"},
    value="rate",
)

lapse = Table(
    name="lapse",
    source=pl.DataFrame({
        "age":  [30, 31, 32, 33, 34],
        "rate": [0.06, 0.05, 0.05, 0.04, 0.04],
    }),
    dimensions={"age": "age"},
    value="rate",
)


def bel_model(af, *, tables, drivers=None):
    """Expected claims = sum_insured × mortality × (1 - lapse)."""
    m  = tables["mortality"]
    lp = tables["lapse"]
    qx = m.lookup(scenario_id=af["scenario_id"], age=af["age"])
    px = lp.lookup(scenario_id=af["scenario_id"], age=af["age"])
    return af.with_columns(
        (af["sum_insured"] * qx * (1.0 - px)).alias("bel")
    )


plan = ScenarioRun(
    shocks={
        "BASE":         [],
        "MORTALITY_UP": [MultiplicativeShock(factor=1.5, table="mortality")],
        "LAPSE_UP":     [MultiplicativeShock(factor=1.5, table="lapse")],
        "MASS_LAPSE":   [OverrideShock(value=0.4, table="lapse")],
    },
    base_tables={"mortality": mortality, "lapse": lapse},
    aggregations=(
        Sum("bel").alias("bel"),
        CTE("bel", level=0.005, direction="upper").alias("scr"),
        ArgMax("bel").alias("worst_scenario"),
    ),
    master_seed=42,
)

print(plan.source_sha())
# sha256:48d9d126e...

A few things to notice. The model function takes a tables dict keyed by table name — the loop hands you the shocked version, stacked across scenarios. Your model code calls Table.lookup with both scenario_id and the table's dimension columns; the loop has already done the stacking.

The aggregations tuple mixes a scalar Sum, a sketch-backed CTE, and an ArgMax that returns a scenario name. Each carries its own column (Sum("bel")) and its own alias (.alias("bel")).

The master_seed participates in the SHA — two otherwise identical plans with different seeds have different source_sha values.


Run with an audit sidecar

plan.run(...) accepts an audit argument. Pass a Path to write the sidecar to that exact location; pass True to write to a default location under ./gaspatchio_audit/<run_id>.audit.json; leave it False (the default) for no sidecar.

from pathlib import Path

result = plan.run(
    policies(),
    bel_model,
    batch_size=1,
    audit=Path("./run.audit.json"),
)

print(result.aggregations["bel"])             # 6030.0
print(result.aggregations["scr"])             # 2198.20
print(result.aggregations["worst_scenario"])  # 'MORTALITY_UP'
print(result.audit_path)                      # PosixPath('run.audit.json')

result.aggregations is keyed by your aliases — scalars come back as floats, partitioned aggregators (.over(...)) come back as pl.DataFrame. result.audit_path is the file the loop wrote, or None if you didn't ask for one.


Read the audit sidecar

The sidecar is plain JSON. The reader is a one-liner.

from gaspatchio_core.scenarios._audit import read_audit

audit = read_audit(Path("./run.audit.json"))

print(sorted(audit.keys()))
# ['aggregator_outputs',
#  'input_data_fingerprint',
#  'plan_canonical_form',
#  'run_metadata',
#  'schema_version',
#  'source_sha']

print(audit["schema_version"])              # '1.0'
print(audit["source_sha"] == plan.source_sha())  # True
print(audit["aggregator_outputs"])
# {'bel': 6030.0, 'scr': 2198.20..., 'worst_scenario': 'MORTALITY_UP'}

print(sorted(audit["run_metadata"].keys()))
# ['batch_size', 'batch_size_resolution', 'ddsketch_version',
#  'library_version', 'master_seed', 'n_scenarios',
#  'polars_version', 'python_version', 'wall_time_s']

source_sha ties the sidecar to the plan; plan_canonical_form is the full recipe (shocks, base table fingerprints, aggregations); run_metadata records the library versions you ran against. Partitioned aggregator outputs are coerced to a list of row dictionaries so the file stays portable.


Save and reload from YAML

plan.to_yaml(path) writes the recipe — shocks, aggregations, master seed — but not the base table data (tables travel separately). ScenarioRun.from_yaml(path, base_tables=...) reconstructs the plan; you provide the tables yourself.

plan.to_yaml(Path("./plan.yaml"))

reloaded = ScenarioRun.from_yaml(
    Path("./plan.yaml"),
    base_tables={"mortality": mortality, "lapse": lapse},
)

print(reloaded.source_sha() == plan.source_sha())  # True

reloaded_result = reloaded.run(policies(), bel_model, batch_size=1)
print(reloaded_result.aggregations["bel"] == result.aggregations["bel"])  # True
print(reloaded_result.aggregations["scr"] == result.aggregations["scr"])  # True

The SHA survives because the canonical form is order-independent — insertion order on the shocks dict and the aggregations tuple doesn't affect the hash, only content does. The aggregator values reproduce bit-exactly because the same inputs produce the same sketch state, the same Welford accumulator state, and the same fold order.

If you load a plan that was written in the pre-aggregator format (a dict rather than a list under aggregations:), from_yaml raises a pointed ValueError directing you at the migration path.

Cross-process reproduction — the governance journey

The point of to_yaml / from_yaml isn't single-process round-tripping. It's the model-risk workflow: an actuary builds a plan, runs it, sends the YAML + the JSON audit sidecar to model risk; model risk reloads the plan in a different process — different shell, different machine, different week — and re-runs against the same model points + the same base tables. The plan SHA matches; the aggregator values match byte-for-byte. The audit chain pins exactly which run produced which numbers.

The recipe ships only the shock list, the aggregator list, the master seed, and a content fingerprint for each base table. The base tables themselves travel separately (parquet files in a governed assumptions folder, lifelib output, anything you can re-hand). On the consumer side:

# In the reviewer's shell — `plan.yaml` arrived alongside the model
# release notes; assumption tables live in a versioned assumptions folder.

base_tables = {
    "mortality": Table(name="mortality", source="assumptions/q4_2026/mortality.parquet", ...),
    "lapse":     Table(name="lapse",     source="assumptions/q4_2026/lapse.parquet",     ...),
}

reloaded = ScenarioRun.from_yaml("plan.yaml", base_tables=base_tables)

# Verify the plan SHA matches what the release notes pinned. If this fails,
# something changed — either the YAML or one of the base tables.
assert reloaded.source_sha() == "sha256:48d9d126..."

# Run against the same model points the original run used (parquet stamped
# alongside the release).
mp = pl.read_parquet("model_points_q4_2026.parquet")
result = reloaded.run(ActuarialFrame(mp), model_fn, batch_size=1)

# The aggregator values are bit-for-bit identical to the original run.
assert result.aggregations["bel"] == "<<the value the release notes quoted>>"

Three things make this work:

  1. Plan SHA is content-only. Insertion order on shocks doesn't affect it; dict permutations and aggregator order shuffles produce the same hash.
  2. Base table fingerprints are recorded in the canonical form. The reloaded plan checks that the supplied tables produce the same fingerprints the original plan recorded. A swapped parquet (even with the same name) breaks the SHA.
  3. Aggregator state is reproducibility-stable. Welford accumulators (Mean, Std, Variance) merge order-independently; DDSketch quantile sketches (Quantile, CTE, Median, QuantileRank) are mergeable in any order with bit-exact equivalence; trivial reductions (Sum, Count, Min, Max) are by construction order-independent.

If the recipe drifts between the producer and the consumer side — a different master_seed, a different shock list, a different base-table fingerprint — source_sha() changes and the assertion fires. The failure is loud, structured, and tells you exactly which input shifted.

Limitation — non-flat shocks

Today, to_yaml/from_yaml round-trips the flat shock composables (MultiplicativeShock, AdditiveShock, OverrideShock). The conditional/composing shocks (FilteredShock, TimeConditionalShock, PipelineShock) do not yet have YAML serialisation. For non-flat plans, governance leans on the plan SHA + the JSON audit sidecar (both still work) rather than the YAML recipe — and the producing code lives in version control alongside the release.


master_seed and stochastic kernels

If your model uses random draws (e.g., numpy.random.default_rng(...)), set master_seed and read drivers["rng_seed"] inside model_fn:

def stochastic_bel(af, *, tables, drivers):
    rng = numpy.random.default_rng(drivers["rng_seed"])
    # use rng inside the kernel
    ...

The loop derives a per-scenario seed via sha256(f"gsp-100|{master_seed}|{scenario_id}"), so the same (master_seed, scenario_id) pair always produces the same draws — bit-stable across machines and Python versions.

master_seed is wired to model_fn only at batch_size=1. With batch_size > 1 the loop raises a clear ValueError rather than silently dropping the seed. The same constraint applies to the drivers-dict scenario shape.


Composers

ScenarioRun is immutable. Three helpers return new plans:

seeded   = plan.with_master_seed(42)
stressed = plan.with_extra_shocks({"INTEREST_DOWN": [...]})
broader  = plan.with_extra_aggregations((Sum("bel").alias("by_lob").over("lob"),))

Each returns a new plan with a new source_sha. The original is unchanged.


When to reach for ScenarioRun

You want... Use
Quick exploration; one-shot stress for_each_scenario(...) raw
A run you'll reproduce in 6 months ScenarioRun + audit=True
A plan you'll hand to model risk or a regulator ScenarioRun + to_yaml + audit=Path(...)
A run that needs to match across machines ScenarioRun(..., master_seed=...) + audit

Next

You've now seen the full pre-built surface — 14 aggregators, the typed plan, the audit chain. If you need an aggregator that's not in the box, the next page walks through writing a custom aggregator.