Performance at Scale

Gaspatchio is designed to handle scenario sets efficiently, from small (3 scenarios) to large (10,000+ stochastic runs).

Streaming Engine (Built-in)

Gaspatchio automatically sets Polars' streaming engine as the default via pl.Config.set_engine_affinity("streaming") on import. This provides:

  • Bounded memory: Process data in batches, not all at once
  • Often faster: Better cache utilization and pushdown optimization
  • No code changes: Works transparently with collect() and sink_parquet()
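
The affinity only sets a default; a specific engine can still be requested per query at collect() time. A minimal sketch in plain Polars (not Gaspatchio-specific):

import polars as pl

lf = pl.scan_parquet("model_points.parquet")

df_streamed = lf.collect()                      # streaming engine (the default set on import)
df_in_memory = lf.collect(engine="in-memory")   # override for this one query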

For large scenario sets:

# Use lazy mode for streaming execution
af = ActuarialFrame(pl.scan_parquet("model_points.parquet"))  # Lazy
af = with_scenarios(af, scenario_ids)  # Still lazy

result = run_projection(af, ...)  # Build lazy query plan

# Option A: Collect with streaming (bounded memory)
df = result.collect()  # Uses streaming engine automatically

# Option B: Write directly to disk without full materialization
result.sink_parquet("results/scenario_output.parquet")

# Option C: Partition by scenario for efficient queries
result.sink_parquet(
    "results/by_scenario/",
    partition_by=["scenario_id"]
)
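
With Option C, later analyses that touch only a few scenarios can skip the other partitions entirely. A sketch assuming integer scenario IDs and hive-style scenario_id=... output directories:

# Read back a single scenario; hive partitioning lets Polars skip the other files
one_scenario = (
    pl.scan_parquet("results/by_scenario/**/*.parquet", hive_partitioning=True)
    .filter(pl.col("scenario_id") == 42)
    .collect()
)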

Two-Stage Aggregation Pattern

For risk metric calculation, aggregate to scenario-level first:

# Stage 1: Streaming aggregation to scenario totals (small table)
scenario_totals = (
    result
    .group_by("scenario_id")
    .agg([
        pl.col("pv_net_cf").sum().alias("total_pv"),
        pl.col("pv_claims").sum().alias("total_claims"),
    ])
    .collect()  # Streaming engine handles this efficiently
)

# Stage 2: Risk metrics on the small aggregated table
scenario_totals.write_parquet("results/scenario_totals.parquet")

# Sort from worst (largest reserve) to best
reserves = scenario_totals.sort("total_pv", descending=True)["total_pv"]
cte_98 = reserves.head(int(0.02 * len(reserves))).mean()  # mean of the worst 2% of scenarios
var_99 = reserves[int(0.01 * len(reserves))]  # 99th percentile: only 1% of scenarios are worse
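
Equivalently, the same metrics can be computed with Polars expressions instead of manual index arithmetic; a sketch using the column names above:

metrics = scenario_totals.select(
    pl.col("total_pv").quantile(0.99, interpolation="lower").alias("var_99"),
    pl.col("total_pv")
        .filter(pl.col("total_pv") >= pl.col("total_pv").quantile(0.98))
        .mean()
        .alias("cte_98"),  # mean of the worst 2% of scenarios
)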

Why this works:

Data              Size
Full results      O(policies x scenarios x periods)
Scenario totals   O(scenarios)

By aggregating early, you collapse the full results table (100M+ rows once policies, scenarios, and projection periods are multiplied out) to one row per scenario (10K rows) before the final risk analysis.

Scenario ID Encoding Matters

For large scenario counts, ID encoding significantly impacts performance:

# Option A: Integer IDs (best performance)
scenario_ids = list(range(1, 10001))  # UInt32
af = with_scenarios(af, scenario_ids)

# Option B: Categorical strings (readable + fast)
scenario_ids = ["BASE", "UP", "DOWN"]
af = with_scenarios(af, scenario_ids, categorical=True)
# Stored as dictionary-encoded integers internally

Memory impact (10K scenarios, 100M rows):

ID Type                 Memory    Join Speed
String ("SCEN_00001")   ~1.2 GB   Slower
Categorical             ~400 MB   Fast
UInt32                  ~400 MB   Fastest

For stochastic runs with thousands of scenarios, use integers. For named scenarios (BASE/UP/DOWN), use categorical.
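
You can measure the encoding impact yourself with plain Polars (a rough sketch; the row count and ID formatting here are illustrative, not Gaspatchio outputs):

import polars as pl

n_rows, n_scenarios = 10_000_000, 10_000

# Build the same scenario_id column in three encodings
ints = (pl.int_range(0, n_rows, eager=True) % n_scenarios).cast(pl.UInt32)
df_int = pl.DataFrame({"scenario_id": ints})
df_str = df_int.select(pl.format("SCEN_{}", pl.col("scenario_id")).alias("scenario_id"))
df_cat = df_str.select(pl.col("scenario_id").cast(pl.Categorical))

for name, df in [("UInt32", df_int), ("String", df_str), ("Categorical", df_cat)]:
    print(f"{name}: {df.estimated_size('mb'):.0f} MB")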

Batching Patterns

For large scenario runs, there are two batching patterns:

Pattern 1: Batch-Aggregate (Memory Optimal)

Aggregate within each batch, combine aggregates at the end. Best when you know the aggregation upfront.

from gaspatchio_core.scenarios import batch_scenarios

scenario_ids = list(range(1, 10001))
batch_results = []

for batch_ids in batch_scenarios(scenario_ids, batch_size=1000):
    af = ActuarialFrame(pl.scan_parquet("model_points.parquet"))
    af = with_scenarios(af, batch_ids)
    result = run_projection(af, ...).collect()

    # Aggregate this batch (reduces memory before next batch)
    batch_agg = result.group_by("scenario_id").agg([
        pl.col("pv_net_cf").sum().alias("total_pv")
    ])
    batch_results.append(batch_agg)

# Combine small aggregates
all_scenarios = pl.concat(batch_results)

Pattern 2: Sink-then-Stream (Flexible)

Write full results to parquet, then stream-aggregate later. Best when you want flexible post-hoc analysis.

from gaspatchio_core.scenarios import batch_scenarios
from pathlib import Path

scenario_ids = list(range(1, 10001))
output_dir = Path("results/scenario_batches")
output_dir.mkdir(parents=True, exist_ok=True)

# Phase 1: Sink full results to parquet
for batch_num, batch_ids in enumerate(batch_scenarios(scenario_ids, batch_size=10)):
    af = ActuarialFrame(pl.scan_parquet("model_points.parquet"))
    af = with_scenarios(af, batch_ids)
    result = run_projection(af, ...).collect()

    result.write_parquet(output_dir / f"batch_{batch_num:04d}.parquet")
    del result  # Free memory before next batch

# Phase 2: Stream-aggregate from files (any aggregation you want!)
totals = (
    pl.scan_parquet(output_dir / "*.parquet")
    .group_by("scenario_id")
    .agg(pl.col("pv_net_cf").sum())
    .collect(engine="streaming")  # Works! No cum_prod in aggregation
)

# Run different aggregations without re-running the model
means = pl.scan_parquet(output_dir / "*.parquet").group_by("scenario_id").agg(
    pl.col("pv_net_cf").mean()
).collect(engine="streaming")

Why sink-then-stream works: The model execution has cumulative operations (cum_prod, previous_period) that prevent streaming. But the aggregation phase has no such operations, so Polars can stream through the parquet files efficiently.
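
To see the difference, compare the shape of the two kinds of queries (a sketch; discount_rate is an illustrative column, not part of the output schema above):

lf = pl.scan_parquet("results/scenario_batches/*.parquet")

# Model-phase style: a running product over time carries state across the whole
# window, which is what blocks streaming during projection.
cumulative = lf.with_columns(
    (1.0 / (1.0 + pl.col("discount_rate"))).cum_prod().over("scenario_id").alias("discount_factor")
)

# Aggregation-phase style: sums decompose across batches, so this streams.
totals = lf.group_by("scenario_id").agg(pl.col("pv_net_cf").sum())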

Choosing a Pattern

Pattern            Peak Memory               Time Overhead   Flexibility
Batch-aggregate    Lowest (~10x reduction)   ~2x slower      Must choose aggregation upfront
Sink-then-stream   Bounded per batch         ~5% slower      Any aggregation later

Benchmarks (1k policies x 100 scenarios = 100k rows):

Approach                     Peak Memory   Time
Unbatched                    5.6 GiB       ~114s
Batch-aggregate (10 x 10)    636 MiB       ~223s
Sink-then-stream (10 x 10)   1.2 GiB       ~118s

At scale (10k policies x 100 scenarios = 1M rows):

Approach           Peak Memory           Time      Disk
Unbatched          ~57 GiB (estimated)   ~19 min   -
Sink-then-stream   5.9 GiB               ~20 min   ~19 GB

The sink-then-stream pattern scales linearly with almost no time overhead compared to unbatched execution, while keeping memory bounded.