
Aggregating at Scale

A reserving or capital run rarely needs every policy's full cashflow array kept in memory. What the valuation report wants is the portfolio shape: total net cashflow each month, the mean reserve, the tail of the loss distribution. Yet the obvious way to get there — project the whole book, concatenate every output, then aggregate — holds millions of per-period arrays in memory at once and falls over with an out-of-memory error long before the numbers land.
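A quick back-of-envelope sketch shows the scale of that failure mode. Every size below is an illustrative assumption and does not come from a real run: 1,000,000 policies, a 1,200-month horizon, and 8-byte float64 values. The estimate covers one output column only, before any copies are made.

```python
# Back-of-envelope memory for one per-period float64 output column.
# All sizes are illustrative assumptions, not measurements.
BYTES_PER_VALUE = 8          # one float64 cashflow value
N_POLICIES = 1_000_000       # hypothetical book size
N_PERIODS = 1_200            # hypothetical horizon: 100 years of months
BATCH = 10_000               # hypothetical batch size for a folded run


def column_bytes(n_policies: int, n_periods: int) -> int:
    """Bytes needed to hold one per-period column for n_policies at once."""
    return n_policies * n_periods * BYTES_PER_VALUE


whole_book = column_bytes(N_POLICIES, N_PERIODS)
one_batch = column_bytes(BATCH, N_PERIODS)
print(f"whole book: {whole_book / 1e9:.1f} GB per column")  # 9.6 GB
print(f"one batch:  {one_batch / 1e6:.1f} MB per column")   # 96.0 MB
```

Each extra output column multiplies the whole-book figure, and so does each intermediate copy made during concatenation. The per-batch figure stays fixed however large the book grows.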

run_aggregated inverts that. It runs your projection over the portfolio in memory-safe batches and folds each batch down to per-period aggregates as it goes, so the peak memory you pay is one batch of output — not the whole book. You hand it the same model_fn you'd run on a single frame, the model points, and the aggregates you want; you get portfolio and per-period summaries over millions of policies, plus the telemetry to prove what the run cost.
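Here is a minimal, library-free sketch of why folding bounds peak memory. It projects a toy book in plain Python in two ways. The first materialises every policy's cashflows and then sums them. The second folds per-period sums batch by batch. It then uses the standard library's tracemalloc to compare the peak allocation of each approach. The projection function and the sizes are hypothetical stand-ins, and the sketch does not show run_aggregated's own batching.

```python
import tracemalloc

N_POLICIES = 2_000   # hypothetical toy book
N_PERIODS = 120      # ten years of monthly periods
BATCH = 100          # hypothetical batch size


def project_policy(i: int) -> list[float]:
    """Hypothetical projection: a flat premium less a claim cost that grows each month."""
    premium, claim = 100.0 + i % 7, 1.0 + i % 3
    return [premium - claim * (t + 1) for t in range(N_PERIODS)]


def concat_then_sum() -> list[float]:
    # Materialise every policy's cashflow array first, then aggregate.
    everything = [project_policy(i) for i in range(N_POLICIES)]
    return [sum(row[t] for row in everything) for t in range(N_PERIODS)]


def fold_in_batches() -> list[float]:
    # Keep only running per-period totals; each batch becomes garbage once replaced.
    totals = [0.0] * N_PERIODS
    for start in range(0, N_POLICIES, BATCH):
        batch = [project_policy(i) for i in range(start, min(start + BATCH, N_POLICIES))]
        for row in batch:
            for t, value in enumerate(row):
                totals[t] += value
    return totals


def traced(fn) -> tuple[list[float], int]:
    """Run fn under tracemalloc and return its result with the peak bytes allocated."""
    tracemalloc.start()
    result = fn()
    _, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return result, peak


naive, naive_peak = traced(concat_then_sum)
folded, folded_peak = traced(fold_in_batches)
print(f"concat peak: {naive_peak / 1e6:.1f} MB, fold peak: {folded_peak / 1e6:.1f} MB")
```

Both functions return the same totals. The folded peak comes out well below the concatenated one, and the gap widens as the book grows while the batch size stays fixed.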

You reach for run_aggregated when the deliverable is a portfolio summary — total cashflow by month, mean reserve, a tail quantile — rather than a per-policy output file, and the book is large enough that holding every projection at once is the thing standing between you and an answer.


A portfolio fold

A small term-life book, projected three months forward. Net cashflow each month is premium less the running claim cost. The aggregates are the portfolio total and mean per month, a per-month upper tail (the 95th percentile) of net cashflow, and a portfolio total of pv as a single scalar. To keep the example small, pv is simply each policy's premium, standing in for a discounted present value.

import polars as pl
from gaspatchio_core import ActuarialFrame
from gaspatchio_core.scenarios import PeriodSum, PeriodMean, PeriodQuantile, Sum, run_aggregated


def project(af: ActuarialFrame) -> ActuarialFrame:
    """Term-life net cashflow: premium less the running claim cost, three months out."""
    months = 3
    af.net_cf = pl.concat_list(
        [pl.col("premium") - pl.col("claim_cost") * (t + 1) for t in range(months)]
    )
    # Stand-in for a present value: the undiscounted premium, one scalar per policy.
    af.pv = pl.col("premium")
    return af


model_points = pl.DataFrame({
    "policy_id": [1, 2, 3, 4],
    "premium": [1200.0, 2400.0, 600.0, 1800.0],
    "claim_cost": [50.0, 120.0, 20.0, 90.0],
})

result = run_aggregated(
    project,
    model_points,
    [
        PeriodSum("net_cf").alias("total_net_cf"),
        PeriodMean("net_cf").alias("mean_net_cf"),
        PeriodQuantile("net_cf", levels=(0.95,)).alias("p95_net_cf"),
        Sum("pv").alias("total_pv"),
    ],
    batch_size=2,
)

print(result.total_net_cf)   # [5720. 5440. 5160.]  — portfolio net cashflow per month
print(result.mean_net_cf)    # [1430. 1360. 1290.]  — mean per policy per month
print(result.total_pv)       # 6000.0               — portfolio total, one scalar
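The printed totals follow directly from the model. Write P_i for policy i's premium and C_i for its claim cost, with months counted from t = 0. Then net cashflow and its portfolio aggregates are:

```latex
\begin{aligned}
\mathrm{net\_cf}_{i,t} &= P_i - C_i\,(t+1), \qquad t = 0, 1, 2 \\
\mathrm{total\_net\_cf}_t &= \sum_{i=1}^{4} \mathrm{net\_cf}_{i,t}
  = \sum_{i} P_i - (t+1) \sum_{i} C_i = 6000 - 280\,(t+1) \\
\mathrm{mean\_net\_cf}_t &= \tfrac{1}{4}\,\mathrm{total\_net\_cf}_t = 1500 - 70\,(t+1) \\
\mathrm{total\_pv} &= \sum_{i} P_i = 6000
\end{aligned}
```

Substituting t = 0, 1, 2 gives 5720, 5440, 5160 for the total and 1430, 1360, 1290 for the mean.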

The portfolio is sliced into batches of two policies. Each batch runs project, its output is folded into the running aggregates, and the batch is released before the next one starts. The four-policy total therefore comes back without ever holding all four projections at once. On a real book the policy count grows, but peak memory stays tied to the batch size rather than to the number of policies.
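You can replay that fold by hand. This plain-Python sketch does not call gaspatchio_core. It slices the same four model points into batches of two, projects each batch, and adds the batch into running per-month sums and a policy count. It divides for the mean only at the end. The running-state layout is an assumption about how such a fold can work and does not describe run_aggregated's internals.

```python
MONTHS = 3
BATCH_SIZE = 2
premium = [1200.0, 2400.0, 600.0, 1800.0]
claim_cost = [50.0, 120.0, 20.0, 90.0]


def project_batch(prem: list[float], claims: list[float]) -> list[list[float]]:
    """Net cashflow per policy per month: premium less the running claim cost."""
    return [[p - c * (t + 1) for t in range(MONTHS)] for p, c in zip(prem, claims)]


running_sum = [0.0] * MONTHS   # mergeable state behind a per-period sum and mean
running_count = 0              # policies folded so far

for start in range(0, len(premium), BATCH_SIZE):
    stop = start + BATCH_SIZE
    batch = project_batch(premium[start:stop], claim_cost[start:stop])
    for row in batch:
        for t, value in enumerate(row):
            running_sum[t] += value
    running_count += len(batch)
    # The previous batch becomes unreachable once `batch` is rebound.

total_net_cf = running_sum
mean_net_cf = [s / running_count for s in running_sum]
print(total_net_cf)  # [5720.0, 5440.0, 5160.0]
print(mean_net_cf)   # [1430.0, 1360.0, 1290.0]
```

Sums and counts merge by simple addition. That is why the mean can be folded batch by batch without keeping any rows.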

Inside model_fn (project in this example), build the projection columns with native pl.col(...) expressions and assign them onto the frame as attributes. The column name you pass to an aggregator, as in PeriodSum("net_cf"), refers to a column your model produces.


Per-period arrays vs. portfolio scalars

The aggregators come in two shapes, and the shape decides what the result attribute holds.

The Period* family folds across policies within each period, so each one returns an array indexed by projection month:

print(result.total_net_cf)   # ndarray, one entry per month: [5720. 5440. 5160.]
print(result.mean_net_cf)    # ndarray: [1430. 1360. 1290.]

PeriodQuantile returns a dict keyed by the levels you asked for, with each value a per-period array. One aggregator therefore gives you every requested quantile at every month:

print(result.p95_net_cf)     # {0.95: array([...per month...])}
print(result.p95_net_cf[0.95])

The scalar family folds across both policies and periods to a single number — a portfolio total, the natural shape for a present value:

print(result.total_pv)       # 6000.0

Each aggregator surfaces on the result under the name you gave it with .alias(...). The alias is mandatory — it is the attribute you read the answer back from.
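This minimal sketch shows the three result shapes and how an alias becomes an attribute. It uses the example's four policies and the standard library only, with plain lists standing in for ndarrays. Several pieces are illustrative assumptions: the types.SimpleNamespace container, the nearest-rank quantile rule and the helper names. gaspatchio_core's own result object and quantile estimator may differ.

```python
import math
from types import SimpleNamespace

# Per-policy net cashflow for months 0..2, and the per-policy pv column, from the example.
net_cf = [
    [1150.0, 1100.0, 1050.0],
    [2280.0, 2160.0, 2040.0],
    [580.0, 560.0, 540.0],
    [1710.0, 1620.0, 1530.0],
]
pv = [1200.0, 2400.0, 600.0, 1800.0]
months = range(len(net_cf[0]))


def month(t: int) -> list[float]:
    """All policies' values for month t: one cross-section of the book."""
    return [row[t] for row in net_cf]


def nearest_rank(values: list[float], level: float) -> float:
    """Exact nearest-rank quantile; an assumed convention, not the library's estimator."""
    ordered = sorted(values)
    return ordered[max(math.ceil(level * len(ordered)) - 1, 0)]


aggregates = {
    "total_net_cf": [sum(month(t)) for t in months],                       # per-period array
    "p95_net_cf": {0.95: [nearest_rank(month(t), 0.95) for t in months]},  # dict keyed by level
    "total_pv": sum(pv),                                                   # one scalar
}
result = SimpleNamespace(**aggregates)  # each alias becomes an attribute name

print(result.total_net_cf)      # [5720.0, 5440.0, 5160.0]
print(result.p95_net_cf[0.95])  # [2280.0, 2160.0, 2040.0]
print(result.total_pv)          # 6000.0
```

With only four policies, the nearest-rank 95th percentile is simply each month's largest value.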


Tail risk, folded in one pass

The valuation tail, the conditional tail expectation (CTE) beyond a quantile, follows the same fold pattern. PeriodCTE carries a mergeable sketch through the batches, so the tail estimate is built incrementally without re-reading the book:

from gaspatchio_core.scenarios import PeriodCTE

tail = run_aggregated(
    project,
    model_points,
    [
        PeriodSum("net_cf").alias("total_net_cf"),
        PeriodCTE("net_cf", level=0.95, direction="lower").alias("cte95_net_cf"),
    ],
    batch_size=2,
)

print(tail.total_net_cf)     # [5720. 5440. 5160.]
print(tail.cte95_net_cf)     # per-month lower-tail conditional expectation

direction="lower" reads the adverse tail of net cashflow. For each month separately, it averages the worst outcomes in that month's distribution across the book. Because the sketch merges across batches, the tail you get from a hundred batches is the tail you'd get from one, to the sketch's stated accuracy.
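Here is an exact alternative that shows what a lower-tail CTE measures and why a mergeable state matters. It works when the number of policies is known up front. From each batch it keeps only the k lowest values per month, merges batches with heapq.nsmallest, and averages the survivors. This is a swapped-in technique, not the sketch PeriodCTE uses. It also assumes a particular convention: a lower CTE at level 0.95 averages the worst 5% of outcomes, rounded up to at least one. The 40-policy book is hypothetical.

```python
import heapq
import math

LEVEL = 0.95
N_POLICIES, N_MONTHS, BATCH_SIZE = 40, 3, 8   # hypothetical toy book

# Hypothetical net cashflow: a permuted ladder of values that drifts down each month.
book = [[100.0 * ((i * 7) % N_POLICIES) - 10.0 * t for t in range(N_MONTHS)]
        for i in range(N_POLICIES)]


def tail_count(n_total: int, level: float) -> int:
    """Assumed convention: average the worst (1 - level) share, at least one value.

    round() absorbs float noise: (1 - 0.95) * 40 lands just above 2.0.
    """
    return max(1, math.ceil(round((1.0 - level) * n_total, 9)))


def batch_state(batch: list[list[float]], k: int) -> list[list[float]]:
    """Per month, the k lowest values in this batch: a bounded, mergeable state."""
    return [heapq.nsmallest(k, (row[t] for row in batch)) for t in range(len(batch[0]))]


def merge(a: list[list[float]], b: list[list[float]], k: int) -> list[list[float]]:
    """Two states merge by keeping the k lowest of their union, month by month."""
    return [heapq.nsmallest(k, xs + ys) for xs, ys in zip(a, b)]


k = tail_count(N_POLICIES, LEVEL)   # 2 of 40 policies
state = None
for start in range(0, N_POLICIES, BATCH_SIZE):
    current = batch_state(book[start:start + BATCH_SIZE], k)
    state = current if state is None else merge(state, current, k)

cte_lower = [sum(xs) / len(xs) for xs in state]
# Exact reference computed from the full book, sorted month by month.
reference = [sum(sorted(row[t] for row in book)[:k]) / k for t in range(N_MONTHS)]
print(cte_lower)  # [50.0, 40.0, 30.0]
```

This exact state grows with k, which is the book size times (1 - level). A fixed-size streaming sketch avoids that growth, and the cost is the sketch's stated accuracy.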


Read the telemetry

Every result carries what the run cost, so the memory claim is auditable rather than asserted:

print(result.n_policies)     # 4     — policies folded
print(result.n_periods)      # 3     — projection months
print(result.batch_size)     # 2     — policies per batch
print(result.wall_time_s)    # seconds of wall time
print(result.peak_rss_mb)    # peak resident memory, MB

peak_rss_mb is the number that justifies the batch fold. It is the high-water mark of the whole run, and it tracks one batch of output rather than the full book. Quote it alongside n_policies and you have shown that a book of that size was summarised inside a bounded memory budget.
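You may want to cross-check those figures against your own instrumentation. On Unix the standard library reports comparable numbers: time.perf_counter for wall time and resource.getrusage for the process's peak resident set size. This sketch shows one way to measure. It is not how run_aggregated computes its telemetry. Note that ru_maxrss is reported in kilobytes on Linux but in bytes on macOS. It is also a process-lifetime high-water mark, so measure in a fresh process if it should reflect a single run.

```python
import resource
import sys
import time


def peak_rss_mb() -> float:
    """Peak resident set size of this process so far, in MiB (Unix only)."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # Linux reports kilobytes; macOS reports bytes.
    return peak / (1024 * 1024) if sys.platform == "darwin" else peak / 1024


def timed(fn, *args):
    """Run fn(*args); return (result, wall seconds, process peak RSS in MiB afterwards)."""
    start = time.perf_counter()
    out = fn(*args)
    wall = time.perf_counter() - start
    return out, wall, peak_rss_mb()


total, wall_s, rss_mb = timed(sum, range(1_000_000))
print(f"total={total}  wall={wall_s:.3f}s  peak_rss={rss_mb:.1f} MiB")
```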


Letting the batch size find itself

The example pins batch_size=2 so the fold is visible. In a production run, leave it at the default:

aggregations = [PeriodSum("net_cf").alias("total_net_cf"), Sum("pv").alias("total_pv")]
result = run_aggregated(project, model_points, aggregations)  # batch_size defaults to "auto"

With batch_size="auto", the run sizes batches to a memory budget. It picks the largest batch whose predicted peak fits within the available memory, which is read from the container's cgroup limit where one is set. You declare the aggregates you want, and result.batch_size reports the batch size the run settled on.
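Here is a sketch of the idea behind memory-budget sizing. It reads a container memory limit from the cgroup v2 file /sys/fs/cgroup/memory.max, falling back to the cgroup v1 memory.limit_in_bytes. It then picks the largest batch whose predicted peak fits. Several parts are hypothetical assumptions: the linear cost model (a fixed base plus a per-policy cost), the headroom, the fallback budget and every number. This page does not document the prediction run_aggregated actually makes.

```python
import math
from pathlib import Path
from typing import Optional

# cgroup v2 first, then the cgroup v1 location.
CGROUP_LIMIT_FILES = (
    Path("/sys/fs/cgroup/memory.max"),
    Path("/sys/fs/cgroup/memory/memory.limit_in_bytes"),
)


def cgroup_limit_mb() -> Optional[float]:
    """Container memory limit in MiB, or None if no limit can be read."""
    for path in CGROUP_LIMIT_FILES:
        try:
            raw = path.read_text().strip()
        except OSError:
            continue  # file absent or unreadable: try the next location
        return int(raw) / (1024 * 1024) if raw.isdigit() else None  # v2 "max" = unlimited
    return None


def choose_batch_size(n_policies: int, budget_mb: float, base_mb: float,
                      per_policy_mb: float, headroom_mb: float = 0.0) -> int:
    """Largest batch whose predicted peak, base_mb + per_policy_mb * batch, fits the budget.

    The linear cost model is a hypothetical assumption for illustration.
    """
    spare_mb = budget_mb - headroom_mb - base_mb
    fits = math.floor(spare_mb / per_policy_mb) if spare_mb > 0 else 0
    return max(1, min(n_policies, fits))  # never zero: always make progress


limit = cgroup_limit_mb()
budget = limit if limit is not None else 4096.0  # hypothetical budget when no limit is set
print(choose_batch_size(1_000_000, budget, base_mb=300.0, per_policy_mb=0.25, headroom_mb=512.0))
```

Clamping to n_policies stops a generous budget from producing a batch larger than the book. Clamping to 1 guarantees progress when the budget is tight.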


Partitioning by segment

By default the fold collapses the whole book into one figure per period. To get a figure per segment instead — by region, by product, by any model-point column — partition the aggregator with .over(...):

by_region = run_aggregated(
    project,
    model_points.with_columns(pl.Series("region", ["A", "A", "B", "B"])),
    [PeriodSum("net_cf").alias("net_cf_by_region").over("region")],
    batch_size=2,
)

print(by_region.net_cf_by_region)   # one row per (region, period)

The result carries one row per partition value per period, still computed inside the same memory budget. Each batch is folded into its segments' running totals as the run goes, rather than holding each segment's projections apart. .over(...) takes one or more keys.
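You can sketch the partitioned fold the same way. The running state becomes a dictionary from partition value to per-month sums, and each batch adds its rows into the entry for their own region. This plain-Python sketch uses the example's four policies and regions. It uses a batch size of three, so region B straddles two batches. The (region, period, value) row layout is an assumption, not the exact shape run_aggregated returns.

```python
from collections import defaultdict

MONTHS, BATCH_SIZE = 3, 3
policies = [  # (region, premium, claim_cost) from the example
    ("A", 1200.0, 50.0),
    ("A", 2400.0, 120.0),
    ("B", 600.0, 20.0),
    ("B", 1800.0, 90.0),
]

# State: region -> running net cashflow per month.
sums: dict[str, list[float]] = defaultdict(lambda: [0.0] * MONTHS)

for start in range(0, len(policies), BATCH_SIZE):
    # Project each policy in the batch and fold it straight into its region's totals.
    for region, premium, claim in policies[start:start + BATCH_SIZE]:
        for t in range(MONTHS):
            sums[region][t] += premium - claim * (t + 1)

rows = [(region, t, sums[region][t]) for region in sorted(sums) for t in range(MONTHS)]
for row in rows:
    print(row)  # ('A', 0, 3430.0) ... ('B', 2, 2070.0)
```

Adding the regions back together month by month recovers the unpartitioned totals [5720.0, 5440.0, 5160.0]. This gives a cheap consistency check on a partitioned run.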

One aggregator cannot be partitioned. PeriodQuantile does not support .over(...), because a per-partition streaming quantile would need a separate sketch. For a result per scenario, each with its own shocked assumptions, reach for for_each_scenario.


When to reach for run_aggregated

| You want... | Use |
|---|---|
| Per-policy output for every model point, too large for memory | run_to_parquet |
| Portfolio / per-period summaries over a large book | run_aggregated with Period* + scalar aggregators |
| Those summaries split by a model-point segment | run_aggregated with .over(...) |
| A result per scenario, each with its own shocked assumptions | for_each_scenario |
| Proof of the memory the run cost | result.peak_rss_mb alongside result.n_policies |