# Data Flow
This document describes how data flows through the RWA calculator, including schemas, transformations, and validation points.
## Data Flow Overview
```mermaid
flowchart TD
    subgraph External
        A[Parquet Files]
        B[CSV Files]
    end
    subgraph Loading
        C[Schema Validation]
        D[RawDataBundle]
    end
    subgraph Processing
        E[Hierarchy Resolution]
        F[Classification]
        G[CRM Application]
    end
    subgraph Calculation
        H[SA/IRB/Slotting/Equity]
    end
    subgraph Output
        I[Aggregation]
        J[Results]
    end
    A & B --> C --> D
    D --> E --> F --> G --> H --> I --> J
```
## Input Data
### Required Files

| File | Description | Required |
|---|---|---|
| `counterparties.parquet` | Counterparty master data | Yes |
| `facilities.parquet` | Credit facilities | Yes |
| `loans.parquet` | Individual loans/draws | Yes |
### Optional Files

| File | Description |
|---|---|
| `contingents.parquet` | Off-balance sheet items |
| `collateral.parquet` | Collateral details |
| `guarantees.parquet` | Guarantee information |
| `provisions.parquet` | Provision allocations |
| `ratings.parquet` | Credit ratings |
| `org_mapping.parquet` | Organization hierarchy |
| `lending_mapping.parquet` | Retail lending groups |
| `model_permissions.parquet` | Per-model IRB approach permissions (overrides org-wide `IRBPermissions`) |
## Schema Definitions
Schemas are defined in `data/schemas.py`. Key columns use a `_reference` suffix (e.g. `counterparty_reference`, `facility_reference`) rather than `_id`.
### Counterparty Schema
```python
import polars as pl

COUNTERPARTY_SCHEMA = {
    "counterparty_reference": pl.String,  # Unique identifier
    "counterparty_name": pl.String,  # Legal name
    "entity_type": pl.String,  # Single source of truth: sovereign, institution, corporate, etc.
    "country_code": pl.String,  # ISO country code
    "annual_revenue": pl.Float64,  # For SME classification (EUR 50m threshold)
    "total_assets": pl.Float64,  # For LFSE threshold (EUR 70bn CRR / GBP 79bn B31)
    "default_status": pl.Boolean,  # Default indicator
    "sector_code": pl.String,  # Based on SIC
    "apply_fi_scalar": pl.Boolean,  # User flag: True = apply 1.25x FI correlation scalar
    "is_managed_as_retail": pl.Boolean,  # SME managed on pooled retail basis - 75% RW
    "scra_grade": pl.String,  # SCRA grade for unrated institutions: A, B, C
    "is_investment_grade": pl.Boolean,  # Publicly traded + investment grade → 65% SA RW
}
```
### Facility Schema
```python
import polars as pl

FACILITY_SCHEMA = {
    "facility_reference": pl.String,  # Unique identifier
    "product_type": pl.String,  # Product classification
    "book_code": pl.String,  # Book identifier
    "counterparty_reference": pl.String,  # Link to counterparty
    "value_date": pl.Date,  # Facility start
    "maturity_date": pl.Date,  # Final maturity
    "currency": pl.String,  # ISO currency code
    "limit": pl.Float64,  # Total commitment
    "committed": pl.Boolean,  # Committed flag
    "lgd": pl.Float64,  # A-IRB modelled LGD
    "beel": pl.Float64,  # Best estimate expected loss
    "is_revolving": pl.Boolean,  # Revolving facility flag
    "seniority": pl.String,  # senior/subordinated - affects F-IRB LGD
    "risk_type": pl.String,  # FR/MR/MLR/LR - determines CCF
    "ccf_modelled": pl.Float64,  # A-IRB modelled CCF (0.0-1.5)
    "is_short_term_trade_lc": pl.Boolean,  # 20% CCF under F-IRB (Art. 166(9))
    "is_buy_to_let": pl.Boolean,  # BTL - excluded from SME supporting factor
    "is_qrre_transactor": pl.Boolean,  # QRRE transactor flag (CRE30.55)
}
```
### Loan Schema
```python
import polars as pl

LOAN_SCHEMA = {
    "loan_reference": pl.String,  # Unique identifier
    "product_type": pl.String,  # Product classification
    "book_code": pl.String,  # Book identifier
    "counterparty_reference": pl.String,  # Link to counterparty
    "value_date": pl.Date,  # Loan start
    "maturity_date": pl.Date,  # Final maturity
    "currency": pl.String,  # ISO currency code
    "drawn_amount": pl.Float64,  # Outstanding principal
    "interest": pl.Float64,  # Accrued interest (adds to EAD)
    "lgd": pl.Float64,  # A-IRB modelled LGD
    "beel": pl.Float64,  # Best estimate expected loss
    "seniority": pl.String,  # senior/subordinated
    "is_buy_to_let": pl.Boolean,  # BTL property lending
    "netting_agreement_reference": pl.String,  # CRR Art. 195/219 netting set (drives netting)
}
```
### Rating Schema
```python
import polars as pl

RATINGS_SCHEMA = {
    "rating_reference": pl.String,  # Unique identifier
    "counterparty_reference": pl.String,  # Link to counterparty
    "rating_type": pl.String,  # internal/external
    "rating_agency": pl.String,  # internal, S&P, Moodys, Fitch, DBRS, etc.
    "rating_value": pl.String,  # AAA, AA+, Aa1, etc.
    "cqs": pl.Int8,  # Credit Quality Step 1-6
    "pd": pl.Float64,  # Probability of Default (for internal ratings)
    "rating_date": pl.Date,  # Rating as-of date
    "is_solicited": pl.Boolean,  # Whether rating was solicited
    "model_id": pl.String,  # IRB model identifier
}
```
### Collateral Schema
```python
import polars as pl

COLLATERAL_SCHEMA = {
    "collateral_reference": pl.String,  # Unique identifier
    "collateral_type": pl.String,  # cash, gold, equity, bond, real_estate, etc.
    "currency": pl.String,  # ISO currency code
    "maturity_date": pl.Date,  # Collateral maturity
    "market_value": pl.Float64,  # Current market value
    "nominal_value": pl.Float64,  # Nominal value
    "pledge_percentage": pl.Float64,  # Fraction of beneficiary EAD (0.5 = 50%)
    "beneficiary_type": pl.String,  # counterparty/loan/facility/contingent
    "beneficiary_reference": pl.String,  # Reference to linked entity
    "issuer_cqs": pl.Int8,  # CQS of issuer (1-6) for haircut lookup
    "issuer_type": pl.String,  # sovereign/pse/corporate/securitisation
    "residual_maturity_years": pl.Float64,  # For haircut bands
    "property_type": pl.String,  # residential/commercial (RE collateral)
    "property_ltv": pl.Float64,  # Loan-to-value ratio
    "is_income_producing": pl.Boolean,  # Material income dependence
    "is_adc": pl.Boolean,  # Acquisition/Development/Construction
    "is_eligible_financial_collateral": pl.Boolean,  # SA eligibility (CRR Art 197)
    "is_eligible_irb_collateral": pl.Boolean,  # IRB eligibility (CRR Art 199)
    "valuation_date": pl.Date,  # Date of last valuation
    "valuation_type": pl.String,  # market, indexed, independent
    "is_presold": pl.Boolean,  # ADC pre-sold to qualifying buyer
}
```
## Data Transformations
### Stage 1: Loading
Input: Raw files
Output: RawDataBundle
Transformations:
- Load files as LazyFrames
- Validate against schemas
- Convert data types
- Add metadata columns
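```python
from datetime import datetime
from pathlib import Path

import polars as pl

path = Path("data")  # input directory (illustrative)

# Example transformation: scan lazily and tag rows with load metadata
counterparties = (
    pl.scan_parquet(path / "counterparties.parquet")
    .with_columns(
        _load_timestamp=pl.lit(datetime.now()),
        _source_file=pl.lit("counterparties.parquet"),
    )
)
```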
### Stage 2: Hierarchy Resolution
Input: RawDataBundle
Output: ResolvedHierarchyBundle
See the engine/stages/hierarchy/ package (resolver.py, graph.py, ratings.py, …) for implementation. (engine/hierarchy.py is now a back-compat import shim.)
Transformations:
- Resolve parent-child relationships
- Calculate aggregate exposures
- Inherit ratings
- Resolve lending groups
```python
# Hierarchy resolution adds columns
resolved = (
    exposures
    .with_columns(
        ultimate_parent_id=...,
        group_total_exposure=...,
        inherited_rating=...,
        inherited_cqs=...,
    )
)
```
See engine/stages/hierarchy/resolver.py for the HierarchyResolver recipe and its delegators (graph.py, ratings.py, facility_undrawn.py, unify.py, enrich.py).
### Stage 3: Classification
Input: ResolvedHierarchyBundle
Output: ClassifiedExposuresBundle
See the engine/stages/classify/ package (classifier.py, attributes.py, subtypes.py, permissions.py, approach.py, audit.py) for implementation. (engine/classifier.py is now a back-compat import shim.)
Transformations:
- Assign exposure class
- Resolve per-model IRB permissions (model_permissions join on model_id, producing model_airb_permitted, model_firb_permitted, model_slotting_permitted flags)
- Determine calculation approach (Art. 147A hard constraints override model permissions)
- Calculate CCFs
- Calculate EAD
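```python
import polars as pl

# Classification adds columns.
# Expressions inside one with_columns call cannot see each other's outputs,
# so ccf is created first and ead is derived in a second call.
classified = (
    resolved
    .with_columns(
        exposure_class=...,
        approach_type=...,
        ccf=...,
    )
    .with_columns(
        ead=pl.col("drawn_amount") + pl.col("undrawn_amount") * pl.col("ccf"),
    )
)
```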
See engine/stages/classify/classifier.py for the ExposureClassifier recipe (with attributes.py, subtypes.py, permissions.py, approach.py, audit.py).
### Stage 4: CRM Processing
Input: ClassifiedExposuresBundle
Output: CRMAdjustedBundle
See crm/processor.py for implementation.
Transformations (Art. 111(1)(a)-(b) compliant order):
- Resolve provisions: drawn-first deduction (SA), tracking only (IRB/Slotting)
- Apply CCFs: uses `nominal_after_provision` for off-balance sheet conversion
- Initialize EAD: set `ead_pre_crm` from drawn + interest + CCF contribution
- Apply collateral: haircuts, overcollateralisation, multi-level allocation
- Apply guarantees: substitution, cross-approach CCF substitution
- Finalize EAD: floor at zero, no further provision subtraction
```python
# CRM processing adds columns
crm_adjusted = (
    classified
    .with_columns(
        # Provision resolution (before CCF)
        provision_allocated=...,
        provision_on_drawn=...,
        provision_on_nominal=...,
        nominal_after_provision=...,
        provision_deducted=...,
        # Collateral
        collateral_value=...,
        collateral_haircut=...,
        # Guarantees
        guaranteed_amount=...,
        guarantor_rw=...,
        # Final
        net_ead=...,
    )
)
```
CRM Processing (`processor.py`), excerpt:

```python
# ... (excerpt from crm/processor.py; it starts partway through a helper's signature)
) -> pl.Expr:
    """
    Build a Boolean column expression that falls back to literal False if absent.
    When ``aggregate`` is ``"max"`` (used in group_by aggregations to flag a group
    if any row carries the bit), apply the aggregation; otherwise return the
    raw fill_null(False) expression.
    """
    if not available:
        return pl.lit(False).alias(alias)
    base = pl.col(col_name).fill_null(False)
    if aggregate == "max":
        base = base.max()
    return base.alias(alias)


def _build_direct_lookup(
    exposures: pl.LazyFrame,
    exposure_ccy_col: str,
    *,
    has_floor_col: bool,
    has_sft_col: bool,
) -> pl.LazyFrame:
    """Build the direct (per-exposure) lookup frame via the allocation kernel."""
    return direct_level_lookup(
        exposures,
        key="exposure_reference",
        out_key="_ben_ref_direct",
        values=[
            pl.col("ead_for_crm").alias("_ead_direct"),
            pl.col(exposure_ccy_col).alias("_currency_direct"),
            pl.col("maturity_date").alias("_maturity_direct"),
            _optional_bool_col("has_one_day_maturity_floor", "_floor_direct", has_floor_col),
            _optional_bool_col("is_sft", "_sft_direct", has_sft_col),
        ],
    )


def _build_facility_lookup(
    exposures: pl.LazyFrame,
    exposure_ccy_col: str,
    pool_expr: pl.Expr,
    *,
    has_parent_col: bool,
    has_floor_col: bool,
    has_sft_col: bool,
) -> pl.LazyFrame:
    """Build the facility-aggregated lookup frame, keyed by each ancestor facility.
    For nested facility hierarchies (collateral pledged at a grandparent
    facility) the EAD / currency / maturity aggregates for a facility ``F`` cover
    ALL descendant exposures of ``F``, not only exposures whose immediate
    ``parent_facility_reference`` is ``F``. Subtree membership comes from the
    ``ancestor_facilities`` list column (parent + all ancestors up to root, incl.
    self) built by the HierarchyResolver; when absent it falls back to the
    1-element ``[parent_facility_reference]`` list, identical to the legacy
    single-level behaviour. This makes a ``pledge_percentage`` pledged at any
    ancestor facility resolve against the full subtree EAD.
    """
    if not has_parent_col:
        return pl.LazyFrame(
            schema={
                "_ben_ref_facility": pl.String,
                "_ead_facility": pl.Float64,
                "_ead_facility_airb": pl.Float64,
                "_ead_facility_non_airb": pl.Float64,
                "_currency_facility": pl.String,
                "_maturity_facility": pl.Date,
                "_floor_facility": pl.Boolean,
                "_sft_facility": pl.Boolean,
            }
        )
    facility_agg = [
        pl.col("ead_for_crm").sum().alias("_ead_facility"),
        # ... (excerpt truncated here)
```
### Stage 5: RWA Calculation
Input: CRMAdjustedBundle
Output: Result bundles
See sa/calculator.py and irb/formulas.py for implementations.
Transformations:
- Look up risk weights (SA)
- Calculate K formula (IRB)
- Apply maturity adjustment
- Calculate RWA
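The SA `supporting_factor` is tiered under CRR: the SME supporting factor (Art. 501, as amended by CRR2) applies 0.7619 to the part of the total amount owed up to EUR 2.5m and 0.85 to the excess; it does not apply under Basel 3.1. A sketch of the blended factor for one SME, assuming `total_owed` is the amount the calculator uses for the threshold test (an assumption; the function name is hypothetical).

```python
SME_THRESHOLD_EUR = 2_500_000.0
FACTOR_UP_TO_THRESHOLD = 0.7619
FACTOR_ABOVE_THRESHOLD = 0.85


def sme_supporting_factor(total_owed: float) -> float:
    """Blended CRR Art. 501 factor for an SME's total amount owed (EUR)."""
    if total_owed <= 0:
        return 1.0  # nothing owed: leave RWA unscaled (design choice for the sketch)
    below = min(total_owed, SME_THRESHOLD_EUR)
    above = max(total_owed - SME_THRESHOLD_EUR, 0.0)
    return (below * FACTOR_UP_TO_THRESHOLD + above * FACTOR_ABOVE_THRESHOLD) / total_owed


print(sme_supporting_factor(2_000_000))  # fully below the threshold
print(sme_supporting_factor(5_000_000))  # half at 0.7619, half at 0.85
```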
```python
import polars as pl

# SA calculation: risk weight and supporting factor must exist before rwa uses them
sa_result = (
    sa_exposures
    .with_columns(
        risk_weight=...,
        supporting_factor=...,
    )
    .with_columns(
        rwa=pl.col("ead") * pl.col("risk_weight") * pl.col("supporting_factor"),
    )
)

# IRB calculation
# `scaling` is the CRR 1.06 scaling factor (removed under Basel 3.1)
irb_result = (
    irb_exposures
    .with_columns(
        pd_floored=...,
        lgd_floored=...,
        correlation=...,
        k=...,
        maturity_adjustment=...,
    )
    .with_columns(
        rwa=pl.col("k") * 12.5 * pl.col("ead") * pl.col("maturity_adjustment") * scaling,
        expected_loss=pl.col("pd") * pl.col("lgd") * pl.col("ead"),
    )
)
```
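For reference, the corporate IRB capital requirement in CRR Art. 153(1) combines the asset correlation, the Vasicek conditional default probability and the maturity adjustment. A standalone sketch using only the standard library, with the maturity adjustment folded into K as in the regulation, and without the SME correlation adjustment, the FI scalar or any floors; the function names are hypothetical.

```python
import math
from statistics import NormalDist

N = NormalDist()  # standard normal: N.cdf is N(x), N.inv_cdf is G(p)


def corporate_k(pd: float, lgd: float, maturity: float) -> float:
    """Capital requirement K per unit of EAD (CRR Art. 153(1), corporates)."""
    weight = (1 - math.exp(-50 * pd)) / (1 - math.exp(-50))
    r = 0.12 * weight + 0.24 * (1 - weight)  # asset correlation
    b = (0.11852 - 0.05478 * math.log(pd)) ** 2  # maturity adjustment slope
    # Default probability conditional on a 99.9th percentile systematic shock
    conditional_pd = N.cdf(
        (N.inv_cdf(pd) + math.sqrt(r) * N.inv_cdf(0.999)) / math.sqrt(1 - r)
    )
    maturity_adjustment = (1 + (maturity - 2.5) * b) / (1 - 1.5 * b)
    return (lgd * conditional_pd - pd * lgd) * maturity_adjustment


def irb_rwa(ead: float, k: float, scaling: float = 1.06) -> float:
    """RWA = K x 12.5 x EAD x scaling (1.06 under CRR)."""
    return k * 12.5 * ead * scaling


k = corporate_k(pd=0.01, lgd=0.45, maturity=2.5)
print(round(k * 12.5, 4))
```

With PD 1%, LGD 45% and M = 2.5 the risk weight K × 12.5 comes out at roughly 92.3% before the 1.06 scaling factor.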
IRB Formula Application (`formulas.py`), excerpt:

```python
# ... (excerpt from irb/formulas.py; it starts partway through a dict literal)
        "subordinated": rows[("unsecured", "subordinated", False)],
        "covered_bond": rows[("covered_bond", "senior", False)],
        "financial_collateral": rows[("financial_collateral", "senior", False)],
        "receivables": rows[("receivables", "senior", False)],
        "residential_re": rows[("residential_re", "senior", False)],
        "commercial_re": rows[("commercial_re", "senior", False)],
        "other_physical": rows[("other_physical", "senior", False)],
        "purchased_receivables_senior": rows[("purchased_receivables", "senior", False)],
        "purchased_receivables_subordinated": rows[
            ("purchased_receivables", "subordinated", False)
        ],
        "dilution_risk": rows[("purchased_receivables", "dilution_risk", False)],
    }
    fse = rows.get(("unsecured", "senior", True))
    if fse is not None and fse != values["unsecured_senior"]:
        values["unsecured_senior_fse"] = fse
    for key, collateral_type in (
        ("financial_collateral_subordinated", "financial_collateral"),
        ("receivables_subordinated", "receivables"),
        ("residential_re_subordinated", "residential_re"),
        ("commercial_re_subordinated", "commercial_re"),
        ("other_physical_subordinated", "other_physical"),
    ):
        subordinated = rows.get((collateral_type, "subordinated", False))
        if subordinated is not None:
            values[key] = subordinated
    return values


# =============================================================================
# PD AND LGD FLOOR EXPRESSION HELPERS
# =============================================================================


@cites("CRR Art. 163")
@cites("PS1/26, paragraph 163")
def _pd_floor_expression(
    config: CalculationConfig,
    *,
    has_transactor_col: bool = True,
    exposure_class_col: str = "exposure_class",
    transactor_col: str = "is_qrre_transactor",
    pack: ResolvedRulepack | None = None,
) -> pl.Expr:
    """
    Build Polars expression for per-exposure-class PD floor.
    Under CRR (Art. 163): Uniform 0.03% floor for all exposure classes.
    Under Basel 3.1 (CRE30.55): Differentiated floors:
    - Corporate/SME: 0.05%
    - Retail mortgage: 0.10% (Art. 163(1)(b))
    - QRRE transactors: 0.05%, revolvers: 0.10% (Art. 163(1)(c))
    - Retail other: 0.05%
    Args:
        config: Calculation configuration
        has_transactor_col: Whether the LazyFrame has the transactor column.
            When True (pipeline path), uses per-row transactor/revolver distinction.
            When False (isolated expressions), defaults to conservative revolver floor.
        exposure_class_col: Name of the column to read the exposure class from.
            Defaults to ``exposure_class`` (the borrower's class). For guarantor
            PD substitution (CRR Art. 161(3) / B31 CRE22.70-85, Art. 160(4)),
            pass ``guarantor_exposure_class`` so the floor reads the guarantor's
            own class: the guaranteed portion is treated as a direct exposure
            to the guarantor, so the guarantor's class floor governs.
        transactor_col: Name of the QRRE transactor flag column. For guarantor
            PD floors this is normally not relevant (guarantors are typically
            not QRRE), but the parameter is exposed for symmetry with
            ``exposure_class_col``.
    Returns a Polars expression evaluating to the per-row PD floor value.
    """
    resolved_pack = pack if pack is not None else RulepackV0.from_config(config).pack
    floors = formula_float_map(resolved_pack.formula("pd_floors"))
    # Optimisation: if all floors are the same (CRR case), return a scalar
    all_values = set(floors.values())
    if len(all_values) == 1:
        return pl.lit(all_values.pop())
    # Basel 3.1: differentiated floors by exposure class
    exp_class = pl.col(exposure_class_col).cast(pl.String).fill_null("CORPORATE").str.to_uppercase()
    # QRRE transactor/revolver distinction (CRE30.55):
    # Transactors (repay in full each period) get 0.05% floor;
    # revolvers (carry balance) get 0.10% floor.
    if has_transactor_col:
        qrre_floor = (
            pl.when(pl.col(transactor_col).fill_null(False))
            .then(pl.lit(floors["retail_qrre_transactor"]))
            .otherwise(pl.lit(floors["retail_qrre_revolver"]))
        )
    else:
        # Conservative default: revolver floor (0.10% under Basel 3.1)
        qrre_floor = pl.lit(floors["retail_qrre_revolver"])
    sovereign_value = ExposureClass.CENTRAL_GOVT_CENTRAL_BANK.value.upper()
    institution_value = ExposureClass.INSTITUTION.value.upper()
    return (
        pl.when(exp_class.str.contains("QRRE"))
        .then(qrre_floor)
        .when(exp_class.str.contains("MORTGAGE") | exp_class.str.contains("RESIDENTIAL"))
        .then(pl.lit(floors["retail_mortgage"]))
        .when(exp_class.str.contains("RETAIL"))
        # ... (excerpt truncated here)
```
### Stage 6: Aggregation
Input: Result bundles
Output: AggregatedResultBundle
See the engine/aggregator/ package (aggregator.py, with _floor.py for the output-floor logic, _summaries.py, _el_summary.py, …) for implementation.
Transformations:
- Combine results from all approaches
- Apply output floor (Basel 3.1)
- Calculate totals and breakdowns
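```python
import polars as pl

# Aggregation: diagonal concat tolerates approach-specific columns
aggregated = (
    pl.concat([sa_result, irb_result, slotting_result], how="diagonal")
    .with_columns(
        # Basel 3.1 output floor (`framework` and `floor` come from the config).
        # Per-row view for illustration; the regulation floors total RWA.
        rwa_floored=pl.when(pl.lit(framework == "BASEL_3_1"))
        .then(pl.max_horizontal(pl.col("rwa"), pl.col("sa_equivalent_rwa") * floor))
        .otherwise(pl.col("rwa"))
    )
)
```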
See engine/aggregator/aggregator.py for the combine logic and engine/aggregator/_floor.py for the output-floor application.
## Data Validation

### Schema Validation
```python
import polars as pl

# ValidationError is the project's error type (carries `field` and `message`).


def validate_schema(
    df: pl.LazyFrame,
    expected_schema: dict[str, pl.DataType],
) -> list[ValidationError]:
    """Validate a LazyFrame against an expected schema."""
    errors = []
    # Resolve the schema once; LazyFrame.schema/.columns re-resolve on every access
    schema = df.collect_schema()
    for column, expected_type in expected_schema.items():
        if column not in schema:
            errors.append(ValidationError(
                field=column,
                message=f"Missing required column: {column}",
            ))
        elif schema[column] != expected_type:
            errors.append(ValidationError(
                field=column,
                message=f"Type mismatch: expected {expected_type}, got {schema[column]}",
            ))
    return errors
```
### Business Rule Validation
```python
def validate_exposure(exposure: dict) -> list[ValidationError]:
    """Validate exposure against business rules."""
    errors = []
    # EAD must be positive
    if exposure["ead"] <= 0:
        errors.append(ValidationError(
            field="ead",
            message="EAD must be positive",
        ))
    # PD must be in valid range (checked only when a PD is present, e.g. IRB)
    pd_value = exposure.get("pd")
    if pd_value is not None and not (0 <= pd_value <= 1):
        errors.append(ValidationError(
            field="pd",
            message="PD must be between 0 and 1",
        ))
    return errors
```
## Output Data

### Result Schema
```python
import polars as pl

CALCULATION_OUTPUT_SCHEMA = {
    "exposure_reference": pl.String,
    "counterparty_reference": pl.String,
    "exposure_class": pl.String,
    "approach_applied": pl.String,
    "final_ead": pl.Float64,
    "sa_final_risk_weight": pl.Float64,
    "irb_risk_weight": pl.Float64,
    "final_rwa": pl.Float64,
    "irb_expected_loss": pl.Float64,  # IRB only
    "rwa_before_floor": pl.Float64,  # Basel 3.1
    "floor_impact": pl.Float64,  # Basel 3.1
}
```
### Export Formats
```python
# Export to various formats
result.to_parquet("results.parquet")
result.to_csv("results.csv")
result.to_json("results.json")

# Get as DataFrame
df = result.to_dataframe()
```
## Data Lineage
Each calculator produces an audit trail LazyFrame alongside results. The audit trail captures the full calculation reasoning per exposure, including classification decisions, CRM adjustments, and formula parameters. Access audit data via the result bundles:
```python
# Access audit trails from result bundles
result = pipeline.run_with_data(raw_data, config)

# SA audit
sa_audit = result.sa_result.calculation_audit

# IRB audit
irb_audit = result.irb_result.calculation_audit

# Materialize results with full detail
df = result.to_dataframe()
```
## Next Steps
- Component Overview - Individual components
- API Reference - API documentation
- Data Model - Complete schema reference