# Aggregators
When you run a plan across many scenarios, you need numbers that summarise the run — total claims, the worst case, capital at the 99.5% tail, the spread across runs. The aggregator layer turns each scenario's projection into one of those figures, and combines figures across scenarios in a way that's mergeable, partitionable, and reproducible byte-for-byte.
Aggregators compute as the run progresses. Each aggregator keeps a small running summary as scenarios stream past: `Sum` carries a running total, `Max` carries the largest value so far, `Mean` carries (total, count). The alternative, collecting every per-scenario value into one array and summarising at the end, is what you'd reach for in a spreadsheet, but it doesn't scale to millions of scenarios. Streaming aggregation keeps peak memory bounded regardless of run length, which is what lets the same aggregator definitions run three deterministic scenarios or ten thousand stochastic ones unchanged.
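A minimal plain-Python sketch of the running-summary idea follows. The classes `RunningSum`, `RunningMax` and `RunningMean` and their `update`/`merge`/`result` methods are hypothetical illustrations, not gaspatchio's internals. Each holds constant-size state, and two partial states can be merged, which is what lets independent batches be combined afterwards.

```python
from dataclasses import dataclass


@dataclass
class RunningSum:
    total: float = 0.0

    def update(self, x: float) -> None:
        self.total += x

    def merge(self, other: "RunningSum") -> "RunningSum":
        return RunningSum(self.total + other.total)

    def result(self) -> float:
        return self.total


@dataclass
class RunningMax:
    best: float = float("-inf")

    def update(self, x: float) -> None:
        self.best = max(self.best, x)

    def merge(self, other: "RunningMax") -> "RunningMax":
        return RunningMax(max(self.best, other.best))

    def result(self) -> float:
        return self.best


@dataclass
class RunningMean:
    total: float = 0.0
    count: int = 0

    def update(self, x: float) -> None:
        self.total += x
        self.count += 1

    def merge(self, other: "RunningMean") -> "RunningMean":
        return RunningMean(self.total + other.total, self.count + other.count)

    def result(self) -> float:
        return self.total / self.count


# Per-scenario values arrive one at a time; only the small state is kept.
per_scenario_totals = [2200.0, 2640.0, 3300.0, 4400.0]
aggs = {"sum": RunningSum(), "max": RunningMax(), "mean": RunningMean()}
for value in per_scenario_totals:
    for agg in aggs.values():
        agg.update(value)

print({name: agg.result() for name, agg in aggs.items()})
# {'sum': 12540.0, 'max': 4400.0, 'mean': 3135.0}

# Two batches processed separately, then merged, give the same mean.
left, right = RunningMean(), RunningMean()
for v in per_scenario_totals[:2]:
    left.update(v)
for v in per_scenario_totals[2:]:
    right.update(v)
print(left.merge(right).result())  # 3135.0
```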
Every aggregator carries:
- a column to read from your model's per-scenario output, and
- a within-scenario reduction (`sum` by default; see Within-scenario reductions below).
Combine aggregators into a tuple, pass them to `for_each_scenario` or `ScenarioRun`, and read each result back by its `.alias()` name.
## Compute totals
The most common scenario question: what's the total across every scenario, and how many scenarios contributed?
```python
import polars as pl
from gaspatchio_core.frame import ActuarialFrame
from gaspatchio_core.scenarios import Sum, Count, for_each_scenario


def policies() -> ActuarialFrame:
    return ActuarialFrame({
        "policy_id": [1, 2, 3, 4, 5, 6, 7, 8],
        "age": [30, 31, 32, 33, 30, 31, 32, 33],
        "premium": [100.0, 150.0, 200.0, 250.0, 300.0, 350.0, 400.0, 450.0],
    })


def claim_model(af, *, tables=None, drivers=None):
    return af.with_columns(af["premium"].alias("claim"))


result = for_each_scenario(
    policies(),
    scenarios=["BASE", "STRESS_A", "STRESS_B"],
    model_fn=claim_model,
    aggregations=(
        Sum("claim").alias("total_claims"),
        Count("claim").alias("scenario_count"),
    ),
)
print(result.aggregations["total_claims"])  # 6600.0
print(result.aggregations["scenario_count"])  # 3
```
Each policy contributes its premium as a claim. Within each scenario the claims sum to 2,200; summed across three scenarios, that gives 6,600. `Count` is the number of scenarios that contributed at least one value: 3.
Two patterns to notice. The column belongs to the aggregator itself (`Sum("claim")`), not to a separate config. The alias is the key you read the result by (`result.aggregations["total_claims"]`).
## Find the worst case
When you're stressing assumptions, "which scenario produced the largest loss" is just as important as the size of the loss. ArgMax answers it — it returns the scenario_id, not the value.
```python
import polars as pl
from gaspatchio_core.scenarios import Max, Min, ArgMax, for_each_scenario


def stressed_model(af, *, tables=None, drivers=None):
    # Scale each claim by a scenario-specific stress factor.
    return af.with_columns(
        pl.when(pl.col("scenario_id") == "BASE").then(pl.col("premium") * 1.0)
        .when(pl.col("scenario_id") == "STRESS_LOW").then(pl.col("premium") * 1.2)
        .when(pl.col("scenario_id") == "STRESS_MED").then(pl.col("premium") * 1.5)
        .otherwise(pl.col("premium") * 2.0)
        .alias("claim"),
    )


result = for_each_scenario(
    policies(),  # defined in the totals example above
    scenarios=["BASE", "STRESS_LOW", "STRESS_MED", "STRESS_HIGH"],
    model_fn=stressed_model,
    aggregations=(
        Max("claim").alias("worst_case_claim"),
        Min("claim").alias("best_case_claim"),
        ArgMax("claim").alias("worst_scenario"),
    ),
)
print(result.aggregations["worst_case_claim"])  # 4400.0
print(result.aggregations["best_case_claim"])  # 2200.0
print(result.aggregations["worst_scenario"])  # STRESS_HIGH
```
ArgMin is the mirror — the scenario_id of the lowest-claim scenario. On a tie, the lexicographically smallest scenario_id wins, so the result is reproducible.
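A plain-Python sketch of a streaming arg-extreme with that tie-break: the state is just the best (scenario_id, value) pair seen so far. The helper `arg_extreme` is hypothetical, and applying the same lexicographic tie-break to `ArgMax` is an assumption; the point is that the winner does not depend on the order in which scenarios arrive.

```python
def arg_extreme(pairs, *, largest=True):
    """Return the scenario_id with the largest (or smallest) value.

    Ties go to the lexicographically smallest scenario_id.
    """
    best_sid, best_val = None, None
    for sid, val in pairs:
        if best_sid is None:
            better = True
        elif val != best_val:
            better = val > best_val if largest else val < best_val
        else:
            better = sid < best_sid  # tie-break on scenario_id
        if better:
            best_sid, best_val = sid, val
    return best_sid


totals = {
    "BASE": 2200.0,
    "STRESS_LOW": 2640.0,
    "STRESS_MED": 3300.0,
    "STRESS_HIGH": 4400.0,
}
print(arg_extreme(totals.items()))                 # STRESS_HIGH
print(arg_extreme(totals.items(), largest=False))  # BASE

# A tie resolves the same way whatever the arrival order.
tied = [("S2", 10.0), ("S1", 10.0)]
print(arg_extreme(tied), arg_extreme(reversed(tied)))  # S1 S1
```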
## Capital at the tail
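Solvency II calibrates the SCR to a 99.5% Value-at-Risk, which the 0.995 level of the `Quantile` aggregator below gives you. The 99.5% tail conditional expectation (CTE, also called TVaR) is the more conservative measure that averages beyond that point. The `CTE` aggregator computes it directly: it averages every value above the 99.5th percentile across scenarios.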
```python
import polars as pl
from gaspatchio_core.scenarios import CTE, Quantile, Median, QuantileRank, for_each_scenario

scenarios = [f"STOCH_{i:03d}" for i in range(200)]


def stochastic_model(af, *, tables=None, drivers=None):
    # Scenario STOCH_i scales every claim by 1 + 3 * i / 200 (1.0 up to 3.985).
    sid = af["scenario_id"]
    factor = sid.str.slice(-3).cast(pl.Float64) / 200.0 * 3.0 + 1.0
    return af.with_columns((af["premium"] * factor).alias("claim"))


result = for_each_scenario(
    policies(),
    scenarios=scenarios,
    model_fn=stochastic_model,
    aggregations=(
        CTE("claim", level=0.005, direction="upper").alias("cte_995"),
        Quantile("claim", levels=(0.50, 0.95, 0.995)).alias("quantiles"),
        Median("claim").alias("median_claim"),
        QuantileRank("claim", at=5000.0).alias("rank_at_5k"),
    ),
)
print(result.aggregations["cte_995"])  # 8750.02
print(result.aggregations["quantiles"])  # {0.5: 5467.08, 0.95: 8437.99, 0.995: 8733.47}
print(result.aggregations["median_claim"])  # 5467.08
print(result.aggregations["rank_at_5k"])  # 0.4264432...
```
The sign convention is worth pinning down. For a positive-is-loss column (the actuarial convention), the 99.5% CTE is `CTE(level=0.005, direction="upper")`: the average of values above the `1 - level` quantile. If your column is positive-is-profit, use `direction="lower"`.
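To make `level` and `direction` concrete, here is an exact, sort-based sketch in plain Python. The helper `empirical_cte` is hypothetical; it assumes a lower-rank quantile and a tail that includes the quantile value itself, and gaspatchio's sketch-backed CTE may treat that boundary differently. In the stochastic example each scenario's total is 2,200 × (1 + 3i/200) = 2,200 + 33i, and on those 200 values the upper tail gives 8,750.5, close to the 8750.02 printed above.

```python
import math


def empirical_cte(values, level, direction="upper"):
    """Exact tail average over the worst `level` share of values.

    upper: mean of values at or above the (1 - level) quantile (positive-is-loss).
    lower: mean of values at or below the `level` quantile (positive-is-profit).
    """
    xs = sorted(values)
    n = len(xs)
    if direction == "upper":
        threshold = xs[math.floor((1 - level) * (n - 1))]
        tail = [x for x in xs if x >= threshold]
    elif direction == "lower":
        threshold = xs[math.floor(level * (n - 1))]
        tail = [x for x in xs if x <= threshold]
    else:
        raise ValueError("direction must be 'upper' or 'lower'")
    return sum(tail) / len(tail)


# Per-scenario totals of the stochastic example: 2200 + 33 * i.
totals = [2200.0 + 33.0 * i for i in range(200)]
print(empirical_cte(totals, 0.005, "upper"))  # 8750.5
print(empirical_cte(totals, 0.005, "lower"))  # 2200.0
```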
Quantile(column, levels=(...)) returns a dict keyed by level so you can read several quantiles in one pass. Median is shorthand for Quantile(column, levels=(0.5,)) returning a bare float. QuantileRank(column, at=value) is the inverse — what fraction of scenarios sit below that value.
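For small runs you can cross-check the sketch-backed figures against exact, sort-based versions. The helpers below are hypothetical, and both conventions are assumptions: a lower-rank quantile (the sorted value at index `floor(q * (n - 1))`) and a rank that counts values strictly below `at`. On the 200 stochastic totals (2,200 + 33i) they land close to, but not exactly on, the DDSketch results above.

```python
import math


def exact_quantiles(values, levels):
    xs = sorted(values)
    n = len(xs)
    # Lower-rank convention: the sorted value at index floor(q * (n - 1)).
    return {q: xs[math.floor(q * (n - 1))] for q in levels}


def exact_quantile_rank(values, at):
    # Share of values strictly below `at`.
    return sum(1 for x in values if x < at) / len(values)


totals = [2200.0 + 33.0 * i for i in range(200)]
print(exact_quantiles(totals, (0.50, 0.95, 0.995)))
# {0.5: 5467.0, 0.95: 8437.0, 0.995: 8734.0}
print(exact_quantile_rank(totals, 5000.0))  # 0.425
```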
These four are sketch-backed (DDSketch). The output is mergeable across batches and bit-stable across processes given the same data. See Sketch precision and memory at the end of the page for the tradeoff knobs.
## Spread of outcomes
For run-to-run dispersion, use `Mean`, `Variance`, and `Std`. They use Welford's algorithm with Chan's parallel merge, so the result is order-independent and bit-stable across batch sizes.
```python
from gaspatchio_core.scenarios import Mean, Std, Variance

result = for_each_scenario(
    policies(),
    scenarios=scenarios,
    model_fn=stochastic_model,
    aggregations=(
        Mean("claim").alias("mean_claim"),
        Std("claim").alias("std_claim"),
        Variance("claim").alias("var_claim"),
    ),
)
print(result.aggregations["mean_claim"])  # 5483.5
print(result.aggregations["std_claim"])  # 1910.01
print(result.aggregations["var_claim"])  # 3648150.0
```
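A plain-Python sketch of the two pieces: Welford's per-value update and Chan et al.'s formula for merging two partial states. The `Moments` class is a hypothetical illustration, not gaspatchio's implementation. The variance uses the sample (n - 1) denominator, which matches the 3648150.0 above. This toy merge agrees with the sequential result only up to floating-point rounding; the bit-stability described above is a property of the library, not of this sketch.

```python
import math
from dataclasses import dataclass


@dataclass
class Moments:
    n: int = 0
    mean: float = 0.0
    m2: float = 0.0  # sum of squared deviations from the running mean

    def update(self, x: float) -> None:
        # Welford: fold one new value into mean and m2.
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)

    def merge(self, other: "Moments") -> "Moments":
        # Chan et al.: combine two partial states without revisiting the data.
        if self.n == 0:
            return Moments(other.n, other.mean, other.m2)
        if other.n == 0:
            return Moments(self.n, self.mean, self.m2)
        n = self.n + other.n
        delta = other.mean - self.mean
        mean = self.mean + delta * other.n / n
        m2 = self.m2 + other.m2 + delta * delta * self.n * other.n / n
        return Moments(n, mean, m2)

    def variance(self) -> float:
        return self.m2 / (self.n - 1)  # sample variance (ddof=1)


totals = [2200.0 + 33.0 * i for i in range(200)]

sequential = Moments()
for x in totals:
    sequential.update(x)

# Same data in three uneven batches, each summarised, then merged.
merged = Moments()
for batch in (totals[:50], totals[50:170], totals[170:]):
    part = Moments()
    for x in batch:
        part.update(x)
    merged = merged.merge(part)

print(round(sequential.mean, 6), round(sequential.variance(), 3),
      round(math.sqrt(sequential.variance()), 2))  # 5483.5 3648150.0 1910.01
```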
## Modifiers
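Every aggregator supports three modifiers. `.alias(...)` and `.over(...)` chain onto an aggregator, with `.alias(...)` first, as in `Sum("claim").alias("by_lob").over("lob")`. `.of(...)` works differently: call it on the aggregator class in place of a column (`Sum.of(expr)`), then chain `.alias(...)` after it.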
### `.alias(name)`: required, names the output
```python
Sum("claim").alias("total_claims")
```
The alias is the key on result.aggregations. You must call .alias() on every aggregator passed to for_each_scenario or ScenarioRun — there is no implicit default. Aliases must be unique within a run.
### `.over(by)`: partitioned outputs
A partitioned aggregator returns a pl.DataFrame instead of a scalar, with one row per partition value.
```python
def lob_model(af, *, tables=None, drivers=None):
    # Even policy_ids are home business, odd ones motor.
    return af.with_columns(
        pl.when(pl.col("policy_id") % 2 == 0).then(pl.lit("home"))
        .otherwise(pl.lit("motor"))
        .alias("lob"),
        af["premium"].alias("claim"),
    )


result = for_each_scenario(
    policies(),
    scenarios=["BASE", "STRESS"],
    model_fn=lob_model,
    aggregations=(
        Sum("claim").alias("total_claims"),
        Sum("claim").alias("by_lob").over("lob"),
    ),
)
print(result.aggregations["total_claims"])
# 4400.0
print(result.aggregations["by_lob"].sort("lob"))
# shape: (2, 2)
# ┌───────┬────────┐
# │ lob   ┆ by_lob │
# │ ---   ┆ ---    │
# │ str   ┆ f64    │
# ╞═══════╪════════╡
# │ home  ┆ 2400.0 │
# │ motor ┆ 2000.0 │
# └───────┴────────┘
```
The partition column must exist on the per-scenario projection your model_fn returns. Multi-key partitioning uses a tuple: .over(("lob", "peril")) produces a DataFrame keyed by both columns.
`ArgMax("claim").alias("worst_by_lob").over("lob")` returns the worst scenario per LOB. That is a different question from `ArgMax("claim").alias("worst_scenario")`, which returns the worst scenario overall.
### `.of(pl.Expr)`: replace the within-scenario reduction
By default Sum("claim") sums the claim column within each scenario, then sums those scenario-level totals across scenarios. .of(pl.Expr) overrides the per-scenario reduction with a raw polars expression:
```python
import polars as pl
from gaspatchio_core.scenarios import Mean, Sum

# Per scenario: sum(claim × weight); then sum across scenarios.
weighted = Sum.of((pl.col("claim") * pl.col("weight")).sum()).alias("weighted_total")

# Per scenario: take the max claim; then take the mean across scenarios.
avg_worst = Mean.of(pl.col("claim").max()).alias("avg_worst_per_scenario")
```
`.of()` is bound to the polars backend: its argument is a `pl.Expr`. It does not survive a YAML round-trip, so reconstruct it in code when you reload a plan. For most analytic work, `Sum("claim")` plus a `model_fn` that derives the column you want is the cleaner pattern.
## Within-scenario reductions
A second axis lives on every aggregator: how to reduce each scenario to one value before aggregating across scenarios. The default is `"sum"`; the other named choices are `"mean"`, `"max"`, `"min"`, `"count"`, `"first"`, and `"last"`.
```python
# Each scenario reduces to its max claim; then summed across scenarios.
Sum("claim", within="max").alias("sum_of_per_scenario_maxes")
```
Sum("claim") and Sum("claim", within="sum") are equivalent — that's the default.
## Sketch precision and memory
Quantile, Median, CTE, and QuantileRank are backed by DDSketch — a relative-error data structure that's exactly mergeable across batches and across processes.
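To show why DDSketch merges exactly, here is a toy version for positive values only, following the published DDSketch construction: logarithmic buckets with ratio γ = (1 + α) / (1 - α), where α is the relative accuracy. `MiniDDSketch` is a hypothetical illustration, not gaspatchio's implementation. Merging just adds bucket counts, so the merged sketch is identical whatever the batch split, and each quantile estimate is within relative error α of the true value in its bucket.

```python
import math
from collections import Counter


class MiniDDSketch:
    """Toy DDSketch for positive values (illustration only)."""

    def __init__(self, relative_accuracy: float = 1e-4):
        self.gamma = (1 + relative_accuracy) / (1 - relative_accuracy)
        self.log_gamma = math.log(self.gamma)
        self.buckets = Counter()  # bucket index -> count
        self.count = 0

    def add(self, x: float) -> None:
        if x <= 0:
            raise ValueError("this toy sketch only handles positive values")
        # Bucket i holds values in (gamma**(i - 1), gamma**i].
        self.buckets[math.ceil(math.log(x) / self.log_gamma)] += 1
        self.count += 1

    def merge(self, other: "MiniDDSketch") -> None:
        # Adding integer counts is exact, associative and commutative.
        if other.gamma != self.gamma:
            raise ValueError("can only merge sketches with the same accuracy")
        self.buckets.update(other.buckets)
        self.count += other.count

    def quantile(self, q: float) -> float:
        rank = q * (self.count - 1)
        seen = 0
        for i in sorted(self.buckets):
            seen += self.buckets[i]
            if seen > rank:
                # Relative midpoint of the bucket: within alpha of its contents.
                return 2 * self.gamma**i / (self.gamma + 1)
        raise ValueError("empty sketch")


totals = [2200.0 + 33.0 * i for i in range(200)]
whole = MiniDDSketch()
for x in totals:
    whole.add(x)

left, right = MiniDDSketch(), MiniDDSketch()
for x in totals[:73]:
    left.add(x)
for x in totals[73:]:
    right.add(x)
left.merge(right)

print(left.buckets == whole.buckets)  # True
print(whole.quantile(0.5))  # an estimate within 0.01% of 5467.0
```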
The precision/memory knob is `relative_accuracy`, which defaults to `1e-4`. DDSketch guarantees that each quantile estimate is within that relative error of the true value, so `1e-4` means 0.01%, or 1 basis point:

| Setting | Per-sketch memory | Tail precision at 99.5% |
|---|---|---|
| `relative_accuracy=1e-4` (default) | ~1.2 MB | ~1 bp |
| `relative_accuracy=1e-3` | ~125 KB | ~10 bp |
| `relative_accuracy=1e-5` | ~12 MB | ~0.1 bp |

For SCR work, 1 bp at the 99.5% tail is well inside actuarial tolerance; you'd lose more than that to model assumption uncertainty. If you're aggregating many partitioned sketches (`.over(("region", "peril"))` with hundreds of partitions), loosen to `relative_accuracy=1e-3` to keep total memory bounded.
Two partition aggregators that share the same column don't share storage — each has its own sketch.
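A back-of-envelope way to see why memory scales with 1/α: covering values from `lo` to `hi` needs about log(hi / lo) / log(γ) DDSketch buckets, and log(γ) is roughly 2α. The helper below counts buckets only; the bytes per bucket, and so the memory figures in the table, depend on gaspatchio's implementation and are not derived here. The value range and the 300-partition figure are hypothetical.

```python
import math


def bucket_count(lo: float, hi: float, relative_accuracy: float) -> int:
    """Upper bound on DDSketch buckets spanning [lo, hi] (positive values)."""
    gamma = (1 + relative_accuracy) / (1 - relative_accuracy)
    return math.ceil(math.log(hi / lo) / math.log(gamma)) + 1


# Every partition of an .over(...) aggregator has its own sketch, so the
# total grows with the number of partitions (300 is a hypothetical count).
for alpha in (1e-3, 1e-4, 1e-5):
    per_sketch = bucket_count(1.0, 1e9, alpha)
    print(alpha, per_sketch, per_sketch * 300)
```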
## Next
The aggregators on this page are the building blocks. The next page composes them into a Scenario Run — a typed plan that captures shocks, base tables, and aggregations together, with a source_sha you can hand to model risk.
If you need an aggregator that's not built in (Skewness, Sharpe ratio, expected shortfall with custom weighting), see Writing a custom aggregator.