Skip to content

Table Sensitivities

This page is the low-level Python reference for the shock primitives applied to a single Table. For multi-scenario stress runs — where the same shock list applies to many scenarios with a single audit chain — declare the shocks inside a ScenarioRun instead; it stacks tables across scenarios for you.

Reach for Table.with_shock() when you're applying a single, ad-hoc shock to a single table — typically for interactive exploration or for one-off comparisons where you don't need the full scenario-run plumbing.

Single-Table Shock

import polars as pl
from gaspatchio_core import ActuarialFrame
from gaspatchio_core.assumptions import Table
from gaspatchio_core.scenarios.shocks import MultiplicativeShock

# Load your existing base table
mortality = Table(
    name="mortality",
    source="assumptions/mortality.parquet",
    dimensions={"age": "age", "duration": "duration"},
    value="qx"
)

# Create a shocked version (original unchanged)
stressed_mortality = mortality.with_shock(MultiplicativeShock(factor=1.2))

# Use in model
af = ActuarialFrame(pl.read_parquet("model_points.parquet"))
af.mort_rate = stressed_mortality.lookup(age=af.age, duration=af.duration)

with_shock() returns a new Table — the original is never modified. For multi-scenario runs prefer ScenarioRun(shocks={…}, base_tables={…}, …) — see Scenarios Overview.


Shock Classes

Gaspatchio provides three shock types:

MultiplicativeShock

Scales values by a factor. Use for percentage changes.

from gaspatchio_core.scenarios.shocks import MultiplicativeShock

# 20% increase (multiply by 1.2)
shock = MultiplicativeShock(factor=1.2, table="mortality")

# 10% decrease (multiply by 0.9)
shock = MultiplicativeShock(factor=0.9, table="lapse")

# Target specific column within table
shock = MultiplicativeShock(factor=1.2, table="mortality", column="qx")
Parameter Type Description
factor float Multiplicative factor (1.2 = +20%, 0.8 = -20%)
table str Target table name
column str Optional: specific column to shock

AdditiveShock

Adds a constant delta. Use for basis point shifts.

from gaspatchio_core.scenarios.shocks import AdditiveShock

# Add 50 basis points
shock = AdditiveShock(delta=0.005, table="discount_rates")

# Subtract 100 basis points
shock = AdditiveShock(delta=-0.01, table="discount_rates")
Parameter Type Description
delta float Additive constant (+0.01 = +100bps)
table str Target table name
column str Optional: specific column to shock

OverrideShock

Replaces all values with a constant. Use for extreme scenarios.

from gaspatchio_core.scenarios.shocks import OverrideShock

# Zero out all lapses
shock = OverrideShock(value=0.0, table="lapse")

# Flat 5% discount rate
shock = OverrideShock(value=0.05, table="discount_rates")
Parameter Type Description
value Any Constant replacement value
table str Target table name
column str Optional: specific column to shock

Applying Shocks to Tables

Single Shock

# `mortality` is the base Table loaded above; `shock` is any Shock instance.
base_table = mortality
shock = MultiplicativeShock(factor=1.2)

# Apply one shock
stressed = base_table.with_shock(shock)

# With custom name
stressed = base_table.with_shock(shock, name="mortality_stressed")

Chaining Multiple Shocks

Shocks apply sequentially:

from gaspatchio_core.scenarios.shocks import MultiplicativeShock, AdditiveShock

# First multiply by 2, then add 0.001
double_shocked = mortality.with_shock(
    MultiplicativeShock(factor=2.0)
).with_shock(
    AdditiveShock(delta=0.001)
)
# Original value 0.001 becomes: 0.001 * 2 + 0.001 = 0.003

Batch Creation with from_shocks()

Create multiple shocked tables at once:

from gaspatchio_core.assumptions import Table
from gaspatchio_core.scenarios.shocks import MultiplicativeShock

# `base_mortality` is your governed base table (the `mortality` Table above).
base_mortality = mortality

shock_specs = {
    "BASE": [],  # Empty = no shocks
    "UP_20": [MultiplicativeShock(factor=1.2)],
    "DOWN_20": [MultiplicativeShock(factor=0.8)],
}

# Returns dict of Tables
tables = Table.from_shocks(base_mortality, shock_specs, value_column="qx")
# tables["BASE"], tables["UP_20"], tables["DOWN_20"]

Config Parsing Functions

parse_scenario_config()

Converts JSON config to shock objects:

from gaspatchio_core.scenarios import parse_scenario_config

config = [
    {"id": "BASE"},
    {"id": "MORT_UP_20", "shocks": [{"table": "mortality", "multiply": 1.2}]},
    {"id": "RATES_DOWN", "shocks": [{"table": "discount_rates", "add": -0.005}]},
]

scenarios = parse_scenario_config(config)
# Returns: dict[str, list[Shock]]
# {
#     "BASE": [],
#     "MORT_UP_20": [MultiplicativeShock(factor=1.2, table="mortality")],
#     "RATES_DOWN": [AdditiveShock(delta=-0.005, table="discount_rates")],
# }

Plan-level audit trail

For an audit-ready description of a scenario set, build a ScenarioRun and call .describe():

from gaspatchio_core.scenarios import ScenarioRun, Sum

# The shocks reference the `mortality` and `discount_rates` tables, so the plan
# needs the matching base tables to hash and describe them.
base_tables = {
    "mortality": Table(
        name="mortality",
        source="assumptions/mortality.parquet",
        dimensions={"age": "age", "duration": "duration"},
        value="qx",
    ),
    "discount_rates": Table(
        name="discount_rates",
        source="assumptions/discount_rates.parquet",
        dimensions={"year": "year"},
        value="rate",
    ),
}

plan = ScenarioRun(
    shocks=scenarios,
    base_tables=base_tables,
    aggregations=(Sum("bel").alias("bel"),),
)
print(plan.describe())

The same plan emits a content hash (plan.source_sha()) and, when run with audit=True, a JSON sidecar suitable for handoff to model risk.


Sensitivity Analysis Helper

For parameter sweeps, build a list of ScenarioRun configurations directly — typically a base plan plus shocks generated in a loop. Example:

from gaspatchio_core.scenarios import MultiplicativeShock, AdditiveShock

# Mortality sensitivity sweep
mortality_sweep = {
    "BASE": [],
    **{
        f"mortality_{v}": [MultiplicativeShock(factor=v, table="mortality")]
        for v in (0.8, 0.9, 1.0, 1.1, 1.2)
    },
}

# Interest rate parallel shifts
rate_sweep = {
    "BASE": [],
    **{
        f"rates_{v:+}": [AdditiveShock(delta=v, table="discount_rates")]
        for v in (-0.01, -0.005, 0.0, 0.005, 0.01)
    },
}

Then build one ScenarioRun per sweep, or pass the dict directly to for_each_scenario if you don't need governance.


Complete Workflow Example

Step 1: Load Base Tables

import polars as pl
from gaspatchio_core.assumptions import Table

base_tables = {
    "mortality": Table(
        name="mortality",
        source=pl.read_parquet("assumptions/mortality.parquet"),
        dimensions={"age": "age", "duration": "duration"},
        value="qx"
    ),
    "lapse": Table(
        name="lapse",
        source=pl.read_parquet("assumptions/lapse.parquet"),
        dimensions={"duration": "duration"},
        value="rate"
    ),
    "discount_rates": Table(
        name="discount_rates",
        source=pl.read_parquet("assumptions/discount_rates.parquet"),
        dimensions={"year": "year"},
        value="rate"
    ),
}

Step 2: Parse Scenario Config

from gaspatchio_core.scenarios import parse_scenario_config

config = [
    {"id": "BASE"},
    {"id": "ADVERSE", "shocks": [
        {"table": "mortality", "multiply": 1.2},
        {"table": "lapse", "multiply": 0.8},
        {"table": "discount_rates", "add": -0.01}
    ]},
]

scenarios = parse_scenario_config(config)

Step 3: Run Model for Each Scenario

from gaspatchio_core import ActuarialFrame

def run_scenario(scenario_id, shocks, base_tables, model_points):
    """Run model with shocked tables for one scenario."""

    # Create shocked tables for this scenario
    tables = base_tables.copy()
    for shock in shocks:
        if shock.table in tables:
            tables[shock.table] = tables[shock.table].with_shock(shock)

    # Run model with shocked tables
    af = ActuarialFrame(model_points)
    af.scenario_id = scenario_id

    # Lookups use shocked tables
    af.mort_rate = tables["mortality"].lookup(age=af.age, duration=af.duration)
    af.lapse_rate = tables["lapse"].lookup(duration=af.duration)
    af.disc_rate = tables["discount_rates"].lookup(year=af.year)

    # ... rest of model calculations

    return af.collect()

# Run all scenarios
model_points = pl.read_parquet("model_points.parquet")
results = []

for scenario_id, shocks in scenarios.items():
    result = run_scenario(scenario_id, shocks, base_tables, model_points)
    results.append(result)

# Combine results
all_results = pl.concat(results)

Batch Processing

Memory-efficient processing of many scenarios is handled by the for_each_scenario loop, which batches scenarios automatically and merges aggregator state across batches.

from gaspatchio_core import ActuarialFrame
from gaspatchio_core.scenarios import Sum, for_each_scenario


def run_model(af, *, tables=None, drivers=None):
    """Per-scenario projection — here, net cash flow is simply the premium."""
    return af.with_columns(af["premium"].alias("pv_net_cf"))


scenario_ids = list(range(1, 1_001))  # one thousand scenarios

result = for_each_scenario(
    ActuarialFrame(model_points),
    scenarios=scenario_ids,
    model_fn=run_model,
    aggregations=(Sum("pv_net_cf").alias("total_pv"),),
    batch_size="auto",  # the loop sizes batches to fit memory
)
print(result.aggregations["total_pv"])

The aggregator output is bit-equivalent across batch sizes — pick batch_size to fit your memory budget without changing the answer. See Performance at Scale for the memory tradeoff details.


Best Practices

1. Include BASE

A BASE scenario gives the unshocked baseline that other scenarios compare against:

# docs-skip
config = [
    {"id": "BASE"},  # Listed first by convention
    {"id": "STRESS_1", "shocks": [...]},
]

2. Use Consistent Table Names

Standardize table names across your models:

Table Name Description
mortality Death rates
lapse Withdrawal rates
discount_rates Interest/discount rates
expense Per-policy expenses
inflation Expense inflation

3. Validate Before Running

Parse the config and inspect the plan before execution:

from gaspatchio_core.scenarios import ScenarioRun, Sum

scenarios = parse_scenario_config(config)
plan = ScenarioRun(
    shocks=scenarios,
    base_tables=base_tables,
    aggregations=(Sum("bel").alias("bel"),),
)
print(plan.describe())  # Review the shocks and aggregations
print(plan.source_sha())  # Content hash for the audit chain

4. Store Configs for Reproducibility

Save the plan as YAML alongside results:

from pathlib import Path


def model_fn(af, *, tables=None, drivers=None):
    """Expected claims = premium × shocked mortality for this scenario."""
    qx = tables["mortality"].lookup(
        scenario_id=af["scenario_id"],
        age=af["age"],
        duration=af["duration"],
    )
    return af.with_columns((af["premium"] * qx).alias("bel"))


plan.to_yaml(Path("scenario_plan.yaml"))

# When you run with audit=True, a JSON sidecar lands beside the outputs
# carrying source_sha, library versions, and every aggregator output.
result = plan.run(af, model_fn, batch_size=1, audit=True)
print(result.audit_path)

See Also