Table Sensitivities¶
This page is the low-level Python reference for the shock primitives applied to a single Table. For multi-scenario stress runs — where the same shock list applies to many scenarios with a single audit chain — declare the shocks inside a ScenarioRun instead; it stacks tables across scenarios for you.
Reach for Table.with_shock() when you're applying a single, ad-hoc shock to a single table — typically for interactive exploration or for one-off comparisons where you don't need the full scenario-run plumbing.
Single-Table Shock¶
import polars as pl
from gaspatchio_core import ActuarialFrame
from gaspatchio_core.assumptions import Table
from gaspatchio_core.scenarios.shocks import MultiplicativeShock
# Load your existing base table
mortality = Table(
    name="mortality",
    source="assumptions/mortality.parquet",
    dimensions={"age": "age", "duration": "duration"},
    value="qx",
)
# Create a shocked version (original unchanged)
stressed_mortality = mortality.with_shock(MultiplicativeShock(factor=1.2))
# Use in model
af = ActuarialFrame(pl.read_parquet("model_points.parquet"))
af.mort_rate = stressed_mortality.lookup(age=af.age, duration=af.duration)
with_shock() returns a new Table — the original is never modified. For multi-scenario runs prefer ScenarioRun(shocks={…}, base_tables={…}, …) — see Scenarios Overview.
Shock Classes¶
Gaspatchio provides three shock types:
MultiplicativeShock¶
Scales values by a factor. Use for percentage changes.
from gaspatchio_core.scenarios.shocks import MultiplicativeShock
# 20% increase (multiply by 1.2)
shock = MultiplicativeShock(factor=1.2, table="mortality")
# 10% decrease (multiply by 0.9)
shock = MultiplicativeShock(factor=0.9, table="lapse")
# Target specific column within table
shock = MultiplicativeShock(factor=1.2, table="mortality", column="qx")
| Parameter | Type | Description |
|---|---|---|
| `factor` | `float` | Multiplicative factor (1.2 = +20%, 0.8 = -20%) |
| `table` | `str` | Target table name |
| `column` | `str` | Optional: specific column to shock |
AdditiveShock¶
Adds a constant delta. Use for basis point shifts.
from gaspatchio_core.scenarios.shocks import AdditiveShock
# Add 50 basis points
shock = AdditiveShock(delta=0.005, table="discount_rates")
# Subtract 100 basis points
shock = AdditiveShock(delta=-0.01, table="discount_rates")
| Parameter | Type | Description |
|---|---|---|
| `delta` | `float` | Additive constant (+0.01 = +100bps) |
| `table` | `str` | Target table name |
| `column` | `str` | Optional: specific column to shock |
OverrideShock¶
Replaces all values with a constant. Use for extreme scenarios.
from gaspatchio_core.scenarios.shocks import OverrideShock
# Zero out all lapses
shock = OverrideShock(value=0.0, table="lapse")
# Flat 5% discount rate
shock = OverrideShock(value=0.05, table="discount_rates")
| Parameter | Type | Description |
|---|---|---|
| `value` | `Any` | Constant replacement value |
| `table` | `str` | Target table name |
| `column` | `str` | Optional: specific column to shock |
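To make the arithmetic of the three shock types concrete, here is a minimal standalone sketch in plain Python. The `apply_multiplicative`, `apply_additive`, and `apply_override` helpers are hypothetical stand-ins written for illustration; they are not gaspatchio APIs and say nothing about how the library implements shocks internally.

```python
# Illustrative stand-ins only: these helpers mimic the arithmetic described
# above and are NOT part of gaspatchio_core.

def apply_multiplicative(values, factor):
    """Scale every value by `factor` (1.2 means +20%)."""
    return [v * factor for v in values]


def apply_additive(values, delta):
    """Shift every value by `delta` (0.005 means +50 bps)."""
    return [v + delta for v in values]


def apply_override(values, value):
    """Replace every value with the same constant."""
    return [value for _ in values]


base_rates = [0.02, 0.03, 0.04]  # made-up rates for illustration

scaled = apply_multiplicative(base_rates, 1.2)
shifted = apply_additive(base_rates, 0.005)
flat = apply_override(base_rates, 0.05)

# Round for display so floating-point noise does not clutter the output.
print([round(v, 6) for v in scaled])
print([round(v, 6) for v in shifted])
print(flat)
```

Note that the input list is never mutated, which mirrors the "original unchanged" behaviour described for `with_shock()`.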
Applying Shocks to Tables¶
Single Shock¶
# `mortality` is the base Table loaded above; `shock` is any Shock instance.
base_table = mortality
shock = MultiplicativeShock(factor=1.2)
# Apply one shock
stressed = base_table.with_shock(shock)
# With custom name
stressed = base_table.with_shock(shock, name="mortality_stressed")
Chaining Multiple Shocks¶
Shocks apply sequentially:
from gaspatchio_core.scenarios.shocks import MultiplicativeShock, AdditiveShock
# First multiply by 2, then add 0.001
double_shocked = mortality.with_shock(
    MultiplicativeShock(factor=2.0)
).with_shock(
    AdditiveShock(delta=0.001)
)
# Original value 0.001 becomes: 0.001 * 2 + 0.001 = 0.003
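Because shocks apply sequentially, the order of a multiplicative and an additive shock changes the result. The plain-Python arithmetic below (no gaspatchio objects involved) contrasts the order used in the chain above with the reverse order.

```python
# Plain arithmetic only; this does not call gaspatchio_core.
base_qx = 0.001

multiply_then_add = base_qx * 2.0 + 0.001    # the order used in the chain above
add_then_multiply = (base_qx + 0.001) * 2.0  # the reverse order

print(round(multiply_then_add, 6))  # 0.003
print(round(add_then_multiply, 6))  # 0.004
```

When a stress definition combines a scaling and a shift, it is worth writing down which one is meant to apply first.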
Batch Creation with from_shocks()¶
Create multiple shocked tables at once:
from gaspatchio_core.assumptions import Table
from gaspatchio_core.scenarios.shocks import MultiplicativeShock
# `base_mortality` is your governed base table (the `mortality` Table above).
base_mortality = mortality
shock_specs = {
    "BASE": [],  # Empty = no shocks
    "UP_20": [MultiplicativeShock(factor=1.2)],
    "DOWN_20": [MultiplicativeShock(factor=0.8)],
}
# Returns dict of Tables
tables = Table.from_shocks(base_mortality, shock_specs, value_column="qx")
# tables["BASE"], tables["UP_20"], tables["DOWN_20"]
Config Parsing Functions¶
parse_scenario_config()¶
Converts JSON config to shock objects:
from gaspatchio_core.scenarios import parse_scenario_config
config = [
    {"id": "BASE"},
    {"id": "MORT_UP_20", "shocks": [{"table": "mortality", "multiply": 1.2}]},
    {"id": "RATES_DOWN", "shocks": [{"table": "discount_rates", "add": -0.005}]},
]
scenarios = parse_scenario_config(config)
# Returns: dict[str, list[Shock]]
# {
# "BASE": [],
# "MORT_UP_20": [MultiplicativeShock(factor=1.2, table="mortality")],
# "RATES_DOWN": [AdditiveShock(delta=-0.005, table="discount_rates")],
# }
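Since the config is JSON-compatible data, it can be kept in a file or string and loaded with the standard library before parsing. The sketch below covers only the loading step; the inline JSON string stands in for a hypothetical `scenarios.json` file that you would maintain yourself.

```python
import json

# Inline JSON standing in for a hypothetical scenarios.json file.
raw = """
[
  {"id": "BASE"},
  {"id": "MORT_UP_20", "shocks": [{"table": "mortality", "multiply": 1.2}]},
  {"id": "RATES_DOWN", "shocks": [{"table": "discount_rates", "add": -0.005}]}
]
"""

config = json.loads(raw)

# Summarise which tables each scenario touches before parsing it further.
for entry in config:
    targets = [shock["table"] for shock in entry.get("shocks", [])]
    print(entry["id"], targets)
```

The loaded `config` has the same shape as the Python literal above, so it can be handed to `parse_scenario_config(config)` unchanged.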
Plan-level audit trail¶
For an audit-ready description of a scenario set, build a ScenarioRun and call .describe():
from gaspatchio_core.scenarios import ScenarioRun, Sum
# The shocks reference the `mortality` and `discount_rates` tables, so the plan
# needs the matching base tables to hash and describe them.
base_tables = {
    "mortality": Table(
        name="mortality",
        source="assumptions/mortality.parquet",
        dimensions={"age": "age", "duration": "duration"},
        value="qx",
    ),
    "discount_rates": Table(
        name="discount_rates",
        source="assumptions/discount_rates.parquet",
        dimensions={"year": "year"},
        value="rate",
    ),
}
plan = ScenarioRun(
    shocks=scenarios,
    base_tables=base_tables,
    aggregations=(Sum("bel").alias("bel"),),
)
print(plan.describe())
The same plan emits a content hash (plan.source_sha()) and, when run with audit=True, a JSON sidecar suitable for handoff to model risk.
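This page does not describe how `source_sha()` is computed, so the following is only a sketch of the general idea behind a content hash: serialise the plan description canonically, then hash the bytes. The `plan_fingerprint` helper and the dictionary layout are assumptions made for illustration, not the library's scheme.

```python
import hashlib
import json


def plan_fingerprint(plan_description):
    """Hash a JSON-serialisable plan description (hypothetical helper)."""
    # Sorting keys makes the serialisation independent of dict ordering.
    canonical = json.dumps(plan_description, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


plan_a = {"MORT_UP_20": [{"table": "mortality", "multiply": 1.2}], "BASE": []}
plan_b = {"BASE": [], "MORT_UP_20": [{"multiply": 1.2, "table": "mortality"}]}
plan_c = {"BASE": [], "MORT_UP_20": [{"table": "mortality", "multiply": 1.25}]}

print(plan_fingerprint(plan_a) == plan_fingerprint(plan_b))  # True: same content
print(plan_fingerprint(plan_a) == plan_fingerprint(plan_c))  # False: factor changed
```

The property that matters for an audit chain is the one shown here: identical content gives an identical hash, and any change to a shock parameter gives a different one.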
Sensitivity Analysis Helper¶
For parameter sweeps, build the scenario-to-shocks mapping directly: typically a BASE entry plus shocks generated in a loop or comprehension. Example:
from gaspatchio_core.scenarios.shocks import MultiplicativeShock, AdditiveShock
# Mortality sensitivity sweep
mortality_sweep = {
    "BASE": [],
    **{
        f"mortality_{v}": [MultiplicativeShock(factor=v, table="mortality")]
        for v in (0.8, 0.9, 1.0, 1.1, 1.2)
    },
}
# Interest rate parallel shifts
rate_sweep = {
    "BASE": [],
    **{
        f"rates_{v:+}": [AdditiveShock(delta=v, table="discount_rates")]
        for v in (-0.01, -0.005, 0.0, 0.005, 0.01)
    },
}
Then build one ScenarioRun per sweep, or pass the dict directly to for_each_scenario if you don't need governance.
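The scenario IDs in these sweeps come from f-string formatting, so it helps to see exactly which keys are generated. This snippet reproduces only the key-naming part of the comprehensions above, without creating any shock objects.

```python
# Reproduce only the scenario-ID naming from the sweeps above.
mortality_keys = [f"mortality_{v}" for v in (0.8, 0.9, 1.0, 1.1, 1.2)]
rate_keys = [f"rates_{v:+}" for v in (-0.01, -0.005, 0.0, 0.005, 0.01)]

print(mortality_keys)
# ['mortality_0.8', 'mortality_0.9', 'mortality_1.0', 'mortality_1.1', 'mortality_1.2']
print(rate_keys)
# ['rates_-0.01', 'rates_-0.005', 'rates_+0.0', 'rates_+0.005', 'rates_+0.01']
```

The `mortality_1.0` and `rates_+0.0` entries carry a factor of 1.0 and a delta of 0.0, which leave values unchanged. Keep them as a sanity check against BASE, or drop them from the sweep to save a scenario.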
Complete Workflow Example¶
Step 1: Load Base Tables¶
import polars as pl
from gaspatchio_core.assumptions import Table
base_tables = {
    "mortality": Table(
        name="mortality",
        source=pl.read_parquet("assumptions/mortality.parquet"),
        dimensions={"age": "age", "duration": "duration"},
        value="qx",
    ),
    "lapse": Table(
        name="lapse",
        source=pl.read_parquet("assumptions/lapse.parquet"),
        dimensions={"duration": "duration"},
        value="rate",
    ),
    "discount_rates": Table(
        name="discount_rates",
        source=pl.read_parquet("assumptions/discount_rates.parquet"),
        dimensions={"year": "year"},
        value="rate",
    ),
}
Step 2: Parse Scenario Config¶
from gaspatchio_core.scenarios import parse_scenario_config
config = [
    {"id": "BASE"},
    {"id": "ADVERSE", "shocks": [
        {"table": "mortality", "multiply": 1.2},
        {"table": "lapse", "multiply": 0.8},
        {"table": "discount_rates", "add": -0.01},
    ]},
]
scenarios = parse_scenario_config(config)
Step 3: Run Model for Each Scenario¶
from gaspatchio_core import ActuarialFrame
def run_scenario(scenario_id, shocks, base_tables, model_points):
    """Run model with shocked tables for one scenario."""
    # Create shocked tables for this scenario
    tables = base_tables.copy()
    for shock in shocks:
        if shock.table in tables:
            tables[shock.table] = tables[shock.table].with_shock(shock)
    # Run model with shocked tables
    af = ActuarialFrame(model_points)
    af.scenario_id = scenario_id
    # Lookups use shocked tables
    af.mort_rate = tables["mortality"].lookup(age=af.age, duration=af.duration)
    af.lapse_rate = tables["lapse"].lookup(duration=af.duration)
    af.disc_rate = tables["discount_rates"].lookup(year=af.year)
    # ... rest of model calculations
    return af.collect()
# Run all scenarios
model_points = pl.read_parquet("model_points.parquet")
results = []
for scenario_id, shocks in scenarios.items():
    result = run_scenario(scenario_id, shocks, base_tables, model_points)
    results.append(result)
# Combine results
all_results = pl.concat(results)
Batch Processing¶
Memory-efficient processing of many scenarios is handled by the for_each_scenario loop, which batches scenarios automatically and merges aggregator state across batches.
from gaspatchio_core import ActuarialFrame
from gaspatchio_core.scenarios import Sum, for_each_scenario
def run_model(af, *, tables=None, drivers=None):
    """Per-scenario projection: here, net cash flow is simply the premium."""
    return af.with_columns(af["premium"].alias("pv_net_cf"))
scenario_ids = list(range(1, 1_001))  # one thousand scenarios
# `model_points` is the DataFrame loaded in the workflow example above.
result = for_each_scenario(
    ActuarialFrame(model_points),
    scenarios=scenario_ids,
    model_fn=run_model,
    aggregations=(Sum("pv_net_cf").alias("total_pv"),),
    batch_size="auto",  # the loop sizes batches to fit memory
)
print(result.aggregations["total_pv"])
The aggregator output is bit-equivalent across batch sizes — pick batch_size to fit your memory budget without changing the answer. See Performance at Scale for the memory tradeoff details.
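This page does not say how the library keeps a sum identical across batch sizes, and a naive floating-point sum generally would not be. As a sketch of one way such a guarantee can be obtained, the example below carries an exact running total across batches and rounds to a float only once at the end. The `batched` and `total_in_batches` helpers are hypothetical and unrelated to `for_each_scenario` internals.

```python
from fractions import Fraction


def batched(items, size):
    """Yield consecutive chunks of `items` with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def total_in_batches(values, batch_size):
    """Sum `values` batch by batch, merging exact partial sums."""
    state = Fraction(0)  # aggregator state carried across batches
    for batch in batched(values, batch_size):
        # Fraction(float) is exact, so no rounding happens while merging.
        state += sum((Fraction(v) for v in batch), Fraction(0))
    return float(state)  # rounded once, at the very end


# Made-up per-scenario values for illustration.
per_scenario_pv = [0.1 * k for k in range(1, 1_001)]

totals = {size: total_in_batches(per_scenario_pv, size) for size in (1, 7, 100, 1_000)}
print(len(set(totals.values())))  # 1: every batch size gives the same float
```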
Best Practices¶
1. Include BASE¶
A BASE scenario gives the unshocked baseline that other scenarios compare against:
config = [
    {"id": "BASE"},  # Listed first by convention
    {"id": "STRESS_1", "shocks": [...]},
]
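A small pre-flight check can enforce this convention before a config is parsed. The `check_scenario_ids` helper below is a hypothetical sketch, not a gaspatchio function; it only inspects the `id` fields of a config list shaped like the one above.

```python
def check_scenario_ids(config):
    """Return a list of problems found in a scenario config (hypothetical helper)."""
    ids = [entry["id"] for entry in config]
    problems = []
    if "BASE" not in ids:
        problems.append("missing BASE scenario")
    duplicates = sorted({i for i in ids if ids.count(i) > 1})
    if duplicates:
        problems.append(f"duplicate ids: {duplicates}")
    return problems


# Illustrative configs: one well-formed, one with two problems.
good = [
    {"id": "BASE"},
    {"id": "STRESS_1", "shocks": [{"table": "lapse", "multiply": 0.8}]},
]
bad = [{"id": "STRESS_1"}, {"id": "STRESS_1"}]

print(check_scenario_ids(good))  # []
print(check_scenario_ids(bad))   # ['missing BASE scenario', "duplicate ids: ['STRESS_1']"]
```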
2. Use Consistent Table Names¶
Standardize table names across your models:
| Table Name | Description |
|---|---|
| `mortality` | Death rates |
| `lapse` | Withdrawal rates |
| `discount_rates` | Interest/discount rates |
| `expense` | Per-policy expenses |
| `inflation` | Expense inflation |
3. Validate Before Running¶
Parse the config and inspect the plan before execution:
from gaspatchio_core.scenarios import ScenarioRun, Sum, parse_scenario_config
# `config` and `base_tables` are defined as in the workflow example above.
scenarios = parse_scenario_config(config)
plan = ScenarioRun(
    shocks=scenarios,
    base_tables=base_tables,
    aggregations=(Sum("bel").alias("bel"),),
)
print(plan.describe()) # Review the shocks and aggregations
print(plan.source_sha()) # Content hash for the audit chain
4. Store Configs for Reproducibility¶
Save the plan as YAML alongside results:
from pathlib import Path
def model_fn(af, *, tables=None, drivers=None):
    """Expected claims = premium × shocked mortality for this scenario."""
    qx = tables["mortality"].lookup(
        scenario_id=af["scenario_id"],
        age=af["age"],
        duration=af["duration"],
    )
    return af.with_columns((af["premium"] * qx).alias("bel"))
# `plan` is the ScenarioRun built in the previous step.
plan.to_yaml(Path("scenario_plan.yaml"))
# When you run with audit=True, a JSON sidecar lands beside the outputs
# carrying source_sha, library versions, and every aggregator output.
# `af` is your ActuarialFrame of model points, e.g. ActuarialFrame(model_points).
result = plan.run(af, model_fn, batch_size=1, audit=True)
print(result.audit_path)
See Also¶
- What-If Analysis - Declarative config format and examples
- Scenarios Overview - High-level scenario concepts
- Performance - Optimizing large scenario runs