Performance at Scale¶
Gaspatchio is designed to handle scenario sets efficiently, from small (3 scenarios) to large (10,000+ stochastic runs).
Streaming Engine (Built-in)¶
Gaspatchio automatically sets Polars' streaming engine as the default via `pl.Config.set_engine_affinity("streaming")` on import. This provides:
- Bounded memory: Process data in batches, not all at once
- Often faster: Better cache utilization and pushdown optimization
- No code changes: Works transparently with `collect()` and `sink_parquet()` (a per-query override is sketched below)
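If a specific query ever needs the in-memory engine, the default can be overridden per call rather than globally. A minimal sketch, assuming current Polars engine names:

```python
import polars as pl

lf = pl.scan_parquet("model_points.parquet")

# Per-query override: force the in-memory engine for one collect,
# leaving the streaming default in place for everything else
df = lf.collect(engine="in-memory")

# Or switch the process-wide default back explicitly
pl.Config.set_engine_affinity("in-memory")
```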
For large scenario sets:
```python
import polars as pl

# Use lazy mode for streaming execution
af = ActuarialFrame(pl.scan_parquet("model_points.parquet"))  # Lazy
af = with_scenarios(af, scenario_ids)  # Still lazy
result = run_projection(af, ...)  # Build lazy query plan

# Option A: Collect with streaming (bounded memory)
df = result.collect()  # Uses streaming engine automatically

# Option B: Write directly to disk without full materialization
result.sink_parquet("results/scenario_output.parquet")

# Option C: Partition by scenario for efficient queries
result.sink_parquet(
    "results/by_scenario/",
    partition_by=["scenario_id"],
)
```
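Option C pairs naturally with hive-style reads. A minimal sketch, assuming Polars' `hive_partitioning` support and the output layout written above:

```python
# Scan the partitioned output back: scenario_id is recovered from the
# directory names, and filtering on it prunes files instead of scanning all
lf = pl.scan_parquet(
    "results/by_scenario/**/*.parquet",
    hive_partitioning=True,
)
one_scenario = lf.filter(pl.col("scenario_id") == 42).collect()
```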
Two-Stage Aggregation Pattern¶
For risk metric calculation, aggregate to the scenario level first:
```python
# Stage 1: Streaming aggregation to scenario totals (small table)
scenario_totals = (
    result
    .group_by("scenario_id")
    .agg([
        pl.col("pv_net_cf").sum().alias("total_pv"),
        pl.col("pv_claims").sum().alias("total_claims"),
    ])
    .collect()  # Streaming engine handles this efficiently
)

# Stage 2: Risk metrics on the small aggregated table
scenario_totals.write_parquet("results/scenario_totals.parquet")
reserves = scenario_totals.sort("total_pv", descending=True)["total_pv"]
cte_98 = reserves.head(int(0.02 * len(reserves))).mean()  # mean of the worst 2%
var_99 = reserves[int(0.01 * len(reserves))]  # 99th percentile: with a descending sort, only 1% of scenarios sit above this index
```
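If you would rather not index into a sorted series, the same metrics can be computed with Polars' quantile helpers. A minimal sketch, reusing `scenario_totals` from above:

```python
pv = scenario_totals["total_pv"]
var_99 = pv.quantile(0.99)                          # 99th-percentile reserve
cte_98 = pv.filter(pv >= pv.quantile(0.98)).mean()  # mean of the worst 2%
```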
Why this works:
| Data | Size |
|---|---|
| Full results | O(policies x scenarios x periods) |
| Scenario totals | O(scenarios) |
By aggregating early, you reduce the working set from 100M+ rows of full results to one row per scenario (10K rows for 10K scenarios) before the final analysis.
Scenario ID Encoding Matters¶
For large scenario counts, ID encoding significantly impacts performance:
```python
# Option A: Integer IDs (best performance)
scenario_ids = list(range(1, 10001))  # UInt32
af = with_scenarios(af, scenario_ids)

# Option B: Categorical strings (readable + fast)
scenario_ids = ["BASE", "UP", "DOWN"]
af = with_scenarios(af, scenario_ids, categorical=True)
# Stored as dictionary-encoded integers internally
```
Memory impact (10K scenarios, 100M rows):
| ID Type | Memory | Join Speed |
|---|---|---|
| String (`"SCEN_00001"`) | ~1.2 GB | Slower |
| Categorical | ~400 MB | Fast |
| UInt32 | ~400 MB | Fastest |
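These figures will vary by machine and Polars version, but you can sanity-check the ordering yourself. A rough sketch (the 10M-row size and column names are arbitrary):

```python
import polars as pl

# Build 10M rows cycling through 10k scenario IDs in each encoding
df = pl.select(
    sid=pl.int_range(0, 10_000_000, dtype=pl.UInt32) % 10_000
).with_columns(
    as_string=pl.format("SCEN_{}", pl.col("sid").cast(pl.String).str.zfill(5)),
)
df = df.with_columns(as_cat=pl.col("as_string").cast(pl.Categorical))

# Compare in-memory footprints of the three encodings
for name in ["as_string", "as_cat", "sid"]:
    print(f"{name}: {df.get_column(name).estimated_size('mb'):.0f} MB")
```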
For stochastic runs with thousands of scenarios, use integers. For named scenarios (BASE/UP/DOWN), use categorical.
Batching Patterns¶
For large scenario runs, there are two batching patterns:
Pattern 1: Batch-Aggregate (Memory Optimal)¶
Aggregate within each batch, combine aggregates at the end. Best when you know the aggregation upfront.
```python
from gaspatchio_core.scenarios import batch_scenarios

scenario_ids = list(range(1, 10001))
batch_results = []

for batch_ids in batch_scenarios(scenario_ids, batch_size=1000):
    af = ActuarialFrame(pl.scan_parquet("model_points.parquet"))
    af = with_scenarios(af, batch_ids)
    result = run_projection(af, ...).collect()

    # Aggregate this batch (reduces memory before next batch)
    batch_agg = result.group_by("scenario_id").agg([
        pl.col("pv_net_cf").sum().alias("total_pv")
    ])
    batch_results.append(batch_agg)

# Combine small aggregates
all_scenarios = pl.concat(batch_results)
```
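One design note: `pl.concat` is sufficient here only because each scenario appears in exactly one batch, so the per-batch totals are already final. If a grouping key could span batches, fold the partial aggregates with a second `group_by`; a minimal sketch:

```python
# Sums of sums are still sums, so re-aggregating the combined
# partial aggregates yields correct per-group totals
all_scenarios = (
    pl.concat(batch_results)
    .group_by("scenario_id")
    .agg(pl.col("total_pv").sum())
)
```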
Pattern 2: Sink-then-Stream (Flexible)¶
Write full results to parquet, then stream-aggregate later. Best when you want flexible post-hoc analysis.
```python
from pathlib import Path

from gaspatchio_core.scenarios import batch_scenarios

scenario_ids = list(range(1, 10001))
output_dir = Path("results/scenario_batches")
output_dir.mkdir(parents=True, exist_ok=True)

# Phase 1: Sink full results to parquet
for batch_num, batch_ids in enumerate(batch_scenarios(scenario_ids, batch_size=10)):
    af = ActuarialFrame(pl.scan_parquet("model_points.parquet"))
    af = with_scenarios(af, batch_ids)
    result = run_projection(af, ...).collect()
    result.write_parquet(output_dir / f"batch_{batch_num:04d}.parquet")
    del result  # Free memory before next batch

# Phase 2: Stream-aggregate from files (any aggregation you want!)
totals = (
    pl.scan_parquet(output_dir / "*.parquet")
    .group_by("scenario_id")
    .agg(pl.col("pv_net_cf").sum())
    .collect(engine="streaming")  # Works! No cum_prod in the aggregation
)

# Run different aggregations without re-running the model
means = (
    pl.scan_parquet(output_dir / "*.parquet")
    .group_by("scenario_id")
    .agg(pl.col("pv_net_cf").mean())
    .collect(engine="streaming")
)
```
Why sink-then-stream works: The model execution has cumulative operations (`cum_prod`, `previous_period`) that prevent streaming. But the aggregation phase has no such operations, so Polars can stream through the parquet files efficiently.
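To make the distinction concrete, here is a hedged illustration of which half streams, assuming the batch files written above (the `discount_factor` column is hypothetical):

```python
import polars as pl

lf = pl.scan_parquet("results/scenario_batches/*.parquet")

# Streams: per-group sums only need running state per scenario_id,
# so each batch of rows can be folded in and discarded
totals = lf.group_by("scenario_id").agg(pl.col("pv_net_cf").sum()).collect(
    engine="streaming"
)

# Does not stream well: a cumulative product depends on every prior row
# in order, so the engine must hold the whole ordered series
accumulated = lf.with_columns(
    pl.col("discount_factor").cum_prod().over("scenario_id")
)
```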
Choosing a Pattern¶
| Pattern | Peak Memory | Time Overhead | Flexibility |
|---|---|---|---|
| Batch-aggregate | Lowest (~10x reduction) | ~2x slower | Must choose aggregation upfront |
| Sink-then-stream | Bounded per-batch | ~5% slower | Any aggregation later |
Benchmarks (1k policies x 100 scenarios = 100k rows):
| Approach | Peak Memory | Time |
|---|---|---|
| Unbatched | 5.6 GiB | ~114s |
| Batch-aggregate (10 x 10) | 636 MiB | ~223s |
| Sink-then-stream (10 x 10) | 1.2 GiB | ~118s |
At scale (10k policies x 100 scenarios = 1M rows):
| Approach | Peak Memory | Time | Disk |
|---|---|---|---|
| Unbatched | ~57 GiB (estimated) | ~19 min | - |
| Sink-then-stream | 5.9 GiB | ~20 min | ~19 GB |
The sink-then-stream pattern scales linearly with almost no time overhead compared to unbatched execution, while keeping memory bounded.