Pipeline Architecture
The RWA calculator processes exposures through a well-defined pipeline with discrete stages. This document details each stage and how they interact.
Pipeline Overview
flowchart TD
subgraph Input
A[Raw Data Files]
end
subgraph Stage1[Stage 1: Data Loading]
B[Loader]
B1[RawDataBundle]
end
subgraph Stage2[Stage 2: Hierarchy Resolution]
C[Hierarchy Resolver]
C1[ResolvedHierarchyBundle]
end
subgraph Stage3[Stage 3: Classification]
D[Classifier]
D1[ClassifiedExposuresBundle]
end
subgraph Stage4[Stage 4: CRM Processing]
E[CRM Processor]
E1[CRMAdjustedBundle]
end
subgraph Stage5[Stage 5: Split-Once + Parallel Calculate]
M["re_split_exit edge<br/>(eager sealed frame)"]
U["SA calculate_unified<br/>(Basel 3.1 floor only)"]
SP[Split once by approach]
F["SA branch<br/>calculate_branch()"]
G["IRB branch<br/>calculate_branch()"]
H["Slotting branch<br/>calculate_branch()"]
CA["materialise_sealed_branches<br/>(parallel branch collection)"]
end
subgraph Equity[Equity — Separate Path]
L[Equity Calculator]
L1[EquityResultBundle]
end
subgraph Stage6[Stage 6: Aggregation]
I["_aggregate_results()"]
I1[AggregatedResultBundle]
end
A --> B --> B1
B1 --> C --> C1
C1 --> D --> D1
D1 --> E --> E1
E1 --> M --> U --> SP
SP --> F & G & H
F & G & H --> CA
E1 --> L --> L1
CA & L1 --> I --> I1
Fold over a literal stage registry
The pipeline is a fold over a literal stage registry:
- engine/registry.py is the single ordered, literal StageSpec list (PIPELINE_STAGES), with no conditionals. Stages: securitisation allocator, hierarchy resolver, SA-CCR, classifier, CRM processor, RE splitter, calculators, equity calculator, aggregator.
- engine/orchestrator.py::run_stages is a pure fold: it threads one immutable PipelineContext (a typed ArtifactKey[T] map defined in contracts/context.py) through the registered stages.
- Each stage under engine/stages/ is a run(ctx, rulepack, run_config) -> PipelineContext adapter. Stages exchange eager sealed frames (materialise_edge at every stage exit; producer-sealed edge contracts in contracts/edges.py).
- engine/pipeline.py remains the run-lifecycle facade (run_id, edge capture, FX-rate sync, error merge, audit persistence).

rulepack is the frozen ResolvedRulepack built once per run from (regime_id, reporting_date).
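The fold over a literal registry can be sketched in a few lines of plain Python. This is a minimal illustration, not the code in engine/orchestrator.py. The StageSpec fields, the put/get helpers on PipelineContext, and the toy stages are assumptions made for the example.

```python
from dataclasses import dataclass, field
from functools import reduce
from types import MappingProxyType
from typing import Any, Callable, Generic, Mapping, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class ArtifactKey(Generic[T]):
    """Typed key; the type parameter documents what the slot holds."""
    name: str


@dataclass(frozen=True)
class PipelineContext:
    """Immutable artifact map: put() returns a new context and never mutates."""
    artifacts: Mapping[ArtifactKey[Any], Any] = field(
        default_factory=lambda: MappingProxyType({})
    )

    def put(self, key: ArtifactKey[T], value: T) -> "PipelineContext":
        return PipelineContext(MappingProxyType({**self.artifacts, key: value}))

    def get(self, key: ArtifactKey[T]) -> T:
        return self.artifacts[key]


# Hypothetical stage signature mirroring run(ctx, rulepack, run_config) -> ctx
StageFn = Callable[[PipelineContext, Any, Any], PipelineContext]


@dataclass(frozen=True)
class StageSpec:
    name: str
    run: StageFn


def run_stages(ctx: PipelineContext, stages, rulepack, run_config) -> PipelineContext:
    """Pure fold: thread one context through the stages in registry order."""
    return reduce(lambda acc, stage: stage.run(acc, rulepack, run_config), stages, ctx)


# Toy registry: a literal, ordered tuple with no conditionals
RAW: ArtifactKey[list[float]] = ArtifactKey("raw")
SCALED: ArtifactKey[list[float]] = ArtifactKey("scaled")
TOTAL: ArtifactKey[float] = ArtifactKey("total")

TOY_STAGES = (
    StageSpec("load", lambda c, rp, cfg: c.put(RAW, [1.0, 2.0, 3.0])),
    StageSpec("scale", lambda c, rp, cfg: c.put(SCALED, [x * rp["factor"] for x in c.get(RAW)])),
    StageSpec("sum", lambda c, rp, cfg: c.put(TOTAL, sum(c.get(SCALED)))),
)
```

Running `run_stages(PipelineContext(), TOY_STAGES, {"factor": 2.0}, None)` yields a context whose TOTAL is 12.0, while the starting context stays empty. Because every stage returns a fresh context, reordering the registry is the only way to change the pipeline's shape.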
from rwa_calc.engine.pipeline import create_pipeline
# Create pipeline with default components
pipeline = create_pipeline()
# Run with configuration
result = pipeline.run(config)
# Or with pre-loaded data
result = pipeline.run_with_data(raw_data, config)
For full API documentation, see Pipeline API Reference.
Stage 1: Data Loading
Purpose
Load raw data from Parquet/CSV files into LazyFrames.
Input
File paths to data files.
Output
RawDataBundle containing:
- counterparties: Counterparty master data
- facilities: Credit facility data
- loans: Individual loan/draw data
- contingents: Off-balance sheet items
- collateral: Collateral information
- guarantees: Guarantee data
- provisions: Provision allocations
- ratings: External and internal ratings
- org_mapping: Organization hierarchy
- lending_mapping: Retail lending groups
- model_permissions: Per-model IRB approach permissions (optional)
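Implementation
from pathlib import Path
import polars as pl
# RawDataBundle is the project's raw-data bundle type (its import is elided here)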
class ParquetLoader:
def load(self, path: Path) -> RawDataBundle:
return RawDataBundle(
counterparties=pl.scan_parquet(path / "counterparties.parquet"),
facilities=pl.scan_parquet(path / "facilities.parquet"),
loans=pl.scan_parquet(path / "loans.parquet"),
# ... other data sources
)
Validation
- Schema validation against defined schemas
- Required field checks
- Data type validation
Stage 2: Hierarchy Resolution
Purpose
Resolve counterparty and facility hierarchies, inherit ratings, unify exposures, and prepare enriched data for classification.
Input
RawDataBundle
Output
ResolvedHierarchyBundle with:
- exposures: Unified LazyFrame (loans + contingents + facility undrawn)
- counterparty_lookup: Enriched counterparties with hierarchy metadata and inherited ratings
- collateral, guarantees, provisions: FX-converted CRM data
- lending_group_totals: Aggregated group exposure for retail threshold testing
- model_permissions: Per-model IRB approach permissions (passed through from RawDataBundle)
- hierarchy_errors: Accumulated non-blocking errors
Processing Steps
Step 1: Build counterparty hierarchy lookup
├── _build_ultimate_parent_lazy() → traverse org_mappings (up to 10 levels)
├── _build_rating_inheritance_lazy() → inherit ratings (own → parent → unrated)
└── _enrich_counterparties_with_hierarchy() → add hierarchy metadata
Step 2: Unify exposures
├── Standardise loans → exposure_type = "loan"
├── Standardise contingents → exposure_type = "contingent"
├── _build_facility_root_lookup() → traverse facility-to-facility hierarchies
├── _calculate_facility_undrawn() → limit - sum(descendant drawn amounts)
└── pl.concat(all exposure types) → single unified LazyFrame
Step 2a: FX conversion → convert all monetary values to base currency
Step 2b: Add collateral LTV → direct → facility → counterparty priority
Step 3: Residential property coverage
└── Separate residential vs all-property collateral for threshold exclusion
Step 4: Lending group totals
└── Aggregate by group, exclude residential from retail threshold (CRR Art. 123(c))
Step 5: Add lending group totals to exposures
Counterparty Hierarchy
The ultimate-parent mapping is built by _build_ultimate_parent_lazy using eager graph traversal: it collects the small edge data eagerly, resolves the full graph via dict traversal, and returns the result as a LazyFrame for downstream joins. See engine/stages/hierarchy/graph.py (_build_ultimate_parent_lazy and the facility-root traversal) for the full implementation.
The returned LazyFrame has the columns:
- counterparty_reference: The entity
- ultimate_parent_reference: Its deepest reachable parent (the true root, or the parent at max_depth if the chain was truncated)
- hierarchy_depth: Number of levels traversed
- truncated: True iff the chain was cut off at max_depth; consumed by build_counterparty_lookup to synthesise HIE003 WARNINGs and stripped before the LazyFrame is exposed on CounterpartyLookup.
Rating Inheritance
Ratings are inherited from parent entities when not directly available. See engine/stages/hierarchy/ratings.py (build_rating_inheritance_lazy).
The inheritance priority is:
1. Entity's own rating
2. Ultimate parent's rating
3. Mark as unrated
Facility Hierarchy
Facilities can form multi-level hierarchies (e.g., master facility → sub-facilities). The resolver traverses these with the same graph-traversal approach used for counterparty hierarchies. See engine/stages/hierarchy/graph.py (build_facility_root_lookup) and facility_undrawn.py.
Key behaviour:
- Facility-to-facility edges identified from facility_mappings where child_type = "facility"
- Traverses up to 10 levels to find the root facility for each sub-facility
- Drawn amounts from all descendant loans are aggregated to the root facility
- Sub-facilities are excluded from producing their own undrawn exposure records
- Only root/standalone facilities with undrawn_amount > 0 generate facility_undrawn exposures
Stage 3: Classification
Purpose
Assign exposure classes and determine calculation approach.
Input
ResolvedHierarchyBundle
Output
ClassifiedExposuresBundle with:
- exposure_class: Regulatory exposure class
- approach: SA, F-IRB, A-IRB, or Slotting
- Grouped exposures by approach
Classification Logic
The classifier assigns exposure classes and calculation approaches. See engine/stages/classify/classifier.py for the core classification logic (with attributes.py, subtypes.py, permissions.py, approach.py, audit.py).
rwa_calc.engine.stages.classify.classifier.ExposureClassifier
Classify exposures by exposure class and approach.
Implements ClassifierProtocol for:
- Mapping counterparty types to exposure classes
- Checking SME criteria (turnover thresholds)
- Checking retail criteria (aggregate exposure thresholds)
- Determining IRB eligibility based on permissions
- Identifying specialised lending for slotting
- Splitting exposures by calculation approach
All operations use Polars LazyFrames for deferred execution. The classifier batches expressions into 4 .with_columns() calls to keep the query plan shallow (5 nodes instead of 21).
classify(data, config, *, pack=None)
Classify exposures and split by approach.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | ResolvedHierarchyBundle | Hierarchy-resolved data from HierarchyResolver | required |
| config | CalculationConfig | Calculation configuration | required |
Returns:
| Type | Description |
|---|---|
| ClassifiedExposuresBundle | ClassifiedExposuresBundle with exposures split by approach |
Classification Priority Order
- Central Govt / Central Bank: Government entities, central banks
- RGLA: Regional governments and local authorities
- PSE: Public sector entities
- MDB: Multilateral development banks
- Institution: Banks, regulated financial institutions, CCPs
- Retail: Individuals, small businesses meeting retail criteria
- Corporate: Non-financial corporates
- Specialised Lending: Project finance, object finance, etc.
Model Permissions Resolution
After exposure class assignment, the classifier resolves per-model IRB approach permissions via _resolve_model_permissions(). This optional step uses the model_permissions table (loaded from model_permissions.parquet) to determine which IRB approaches each internal rating model is approved for, scoped by exposure class, geography, and book code.
Resolution logic:
- Left-join exposures to model_permissions on model_id
- A permission row is valid when: exposure_class matches, geography passes (country_codes is null or cp_country_code is in the list), and the book code is not excluded
- Per-exposure boolean flags are computed: model_airb_permitted, model_firb_permitted, model_slotting_permitted
- These flags feed _assign_approach(), which applies Art. 147A hard constraints before model-level permissions
Exposures without a model_id receive all permission flags as False and fall back to SA. When the model_permissions table is absent, the org-wide IRBPermissions configuration applies instead.
Art. 147A Hard Constraints Override Model Permissions
Regulatory restrictions always take priority over model permissions. For example, institutions are limited to F-IRB (Art. 147A(1)(c)) even if a model has A-IRB approval, and equity exposures are SA-only (Art. 147A(1)(a)) regardless of any model permission. See the Model Permissions Specification for the full restriction table and precedence rules.
See engine/stages/classify/approach.py for assign_approach (Art. 147A hard constraints) and permissions.py for resolve_model_permissions.
Stage 4: CRM Processing
Purpose
Apply credit risk mitigation: collateral, guarantees, provisions.
Input
ClassifiedExposuresBundle plus CRM data
Output
CRMAdjustedBundle with:
- Adjusted EAD values
- Applied haircuts
- Substituted risk weights (guarantees)
- Provision adjustments
Processing Order
Provisions are resolved before CCF application so that the nominal amount is adjusted before credit conversion. The full CRM pipeline order is:
- Resolve provisions: drawn-first deduction (SA only); IRB tracks but does not deduct
- Apply CCFs: uses nominal_after_provision for off-balance sheet conversion
- Initialize EAD waterfall: set ead_pre_crm from drawn + interest + CCF contribution. Also sets ead_for_crm = on_bs_for_ead + nominal_after_provision (CCF=100% basis per CRR Art. 223(4) / PS1/26 Art. 223(4)) and effective_ccf (used to recouple the actual CCF in the SA post-collateral EAD per Art. 228(1))
- Apply collateral: collateral is netted against ead_for_crm (full nominal) for FCCM and FCM purposes; under SA the resulting E* is then multiplied by effective_ccf to give ead_after_collateral; under FIRB / Slotting the LGD* formula uses ead_for_crm as E and ead_gross (post-CCF) flows through unchanged as the EAD basis
- Apply guarantees (substitution approach, cross-approach CCF substitution)
- Finalize EAD: floor at zero; provisions already baked into ead_pre_crm
Why two EAD bases? CRR Art. 223(4) and PS1/26 Art. 223(4) both require off-balance-sheet items to be valued at 100% of nominal when computing the exposure value E used for CRM, overriding the regulatory CCF. The actual CCF re-couples afterwards: under SA per Art. 228(1) the CCF is applied to E*, while under FIRB the actual CCF stays in EAD but is absent from the LGD* ratio. The pipeline therefore carries ead_gross (post-CCF, the actual EAD that feeds RWA) and ead_for_crm (the CCF=100% basis used to net collateral). For pure on-balance-sheet rows the two are equal.
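A worked SA case makes the two bases concrete. The sketch covers only a pure off-balance-sheet row and treats all haircuts as already applied (so E* is simply E minus adjusted collateral). It also assumes effective_ccf equals the regulatory CCF for such a row. The function and dict keys are illustrative.

```python
def sa_off_balance_sheet_ead(nominal: float, ccf: float, collateral_value: float) -> dict[str, float]:
    """Pure off-balance-sheet row under SA; haircuts assumed already applied."""
    ead_gross = nominal * ccf            # post-CCF EAD, what RWA would use without CRM
    ead_for_crm = nominal                # E valued at 100% of nominal (Art. 223(4))
    e_star = max(0.0, ead_for_crm - collateral_value)  # collateral netted on the CCF=100% basis
    effective_ccf = ccf                  # assumption: equals the CCF for a pure off-BS row
    ead_after_collateral = e_star * effective_ccf      # CCF re-coupled afterwards (Art. 228(1))
    return {
        "ead_gross": ead_gross,
        "ead_for_crm": ead_for_crm,
        "e_star": e_star,
        "ead_after_collateral": ead_after_collateral,
    }
```

With a nominal of 1000, a 50% CCF and 400 of adjusted collateral, ead_gross is 500, E* is 600 and ead_after_collateral is 300. Netting the same 400 against the post-CCF 500 instead would leave only 100, so applying collateral on the wrong basis overstates its benefit.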
class CRMProcessor:
def get_crm_unified_bundle(
self,
data: ClassifiedExposuresBundle,
config: CalculationConfig,
) -> CRMAdjustedBundle:
"""Apply CRM in correct order (Art. 111(1)(a)-(b) compliant).
The single CRM entry point: returns the unified frame; the pipeline
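splits by approach once, just before the calculators."""
# exposures, provisions, collateral, guarantees and counterparty_lookup are
# drawn from `data` and the run's CRM inputs (unpacking elided in this excerpt)
# Step 1: Resolve provisions (before CCF)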
# SA: drawn-first deduction, remainder reduces nominal
# IRB/Slotting: tracked but not deducted from EAD
after_provisions = self._resolve_provisions(exposures, provisions, config)
# Step 2: Apply CCFs (uses nominal_after_provision)
after_ccf = self._apply_ccf(after_provisions, config)
# Step 3: Initialize EAD waterfall — includes collect barrier
# (flattens deep lazy plan to prevent 3× re-evaluation downstream)
after_init = self._initialize_ead(after_ccf)
# Step 4: Collateral (3 lookup collects: direct/facility/counterparty)
after_collateral = self._apply_collateral(after_init, collateral, config)
# Step 5: Guarantees (cross-approach CCF substitution)
after_guarantees = self._apply_guarantees(
after_collateral, guarantees, counterparty_lookup, config
)
# Step 6: Finalize (no provision subtraction — already in ead_pre_crm)
return self._finalize_ead(after_guarantees)
Stage 5: RWA Calculation
Purpose
Calculate RWA using the appropriate approach for each exposure.
The frame arrives as an eager sealed re_split_exit edge before the calculators fork the plan three ways; the three per-approach branches are collected through materialise_sealed_branches (engine/stages/calc.py). See Collect Barriers & Stage Edges for the edge model that replaced the old ad-hoc materialise barrier.
In the calculator excerpts below, pack is the run's frozen ResolvedRulepack (the IRB excerpt reads its PD floor and scaling factor from it); how it reaches each calculator is elided.
SA Calculator
class SACalculator:
def calculate_branch(
self,
exposures: pl.LazyFrame, # pre-filtered SA-only rows
config: CalculationConfig,
*,
errors: list[CalculationError] | None = None,
) -> pl.LazyFrame:
"""Calculate SA RWA on the SA branch."""
return (
exposures.pipe(apply_risk_weights, config, pack=pack)
.pipe(apply_fcsm_rw_substitution, config)
.pipe(apply_life_insurance_rw_mapping)
.pipe(apply_guarantee_substitution, config, pack=pack)
.pipe(apply_currency_mismatch_multiplier, config, pack=pack)
.pipe(apply_due_diligence_override, config, errors=errors, pack=pack)
.pipe(calculate_rwa)
.pipe(apply_supporting_factors, config, errors=errors, pack=pack) # CRR only
) # + approach_applied / rwa_final for the aggregator
# transforms live in engine/sa/risk_weights.py, rw_adjustments.py, factors_output.py
IRB Calculator
class IRBCalculator:
def calculate_branch(
self,
exposures: pl.LazyFrame, # pre-filtered IRB-only rows
config: CalculationConfig,
*,
errors: list[CalculationError] | None = None,
) -> pl.LazyFrame:
"""Calculate IRB RWA."""
result = (
exposures
.with_columns(
# Apply PD floor — read from the resolved rulepack, not config
pd_floored=pl.max_horizontal(
pl.col("pd"),
pl.lit(pack.formula("pd_floors")["minimum"])
)
)
.with_columns(
# Calculate correlation
correlation=self._calculate_correlation(
pl.col("exposure_class"),
pl.col("pd_floored"),
pl.col("turnover")
),
# Calculate maturity adjustment
ma=self._calculate_maturity_adjustment(
pl.col("pd_floored"),
pl.col("effective_maturity")
)
)
.with_columns(
# Calculate K
k=self._calculate_k(
pl.col("pd_floored"),
pl.col("lgd"),
pl.col("correlation")
)
)
.with_columns(
# Calculate RWA — scaling factor read from the resolved rulepack
rwa=pl.col("k") * 12.5 * pl.col("ead") * pl.col("ma") *
pack.scalar("scaling_factor")
)
)
return result # + approach_applied / rwa_final for the aggregator
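The helper methods in the excerpt correspond to the standard IRB risk-weight formula. The scalar sketch below covers the corporate correlation only (no SME size adjustment or financial-institution multiplier). K excludes the maturity adjustment because the excerpt multiplies by ma separately. The function names are hypothetical, and the PD floor and scaling factor are parameters rather than values read from a rulepack.

```python
import math
from statistics import NormalDist

_N = NormalDist()  # standard normal: cdf is N(x), inv_cdf is G(p)


def corporate_correlation(pd: float) -> float:
    """Asset correlation R for corporates, without SME or FI adjustments."""
    w = (1 - math.exp(-50 * pd)) / (1 - math.exp(-50))
    return 0.12 * w + 0.24 * (1 - w)


def maturity_adjustment(pd: float, m: float) -> float:
    """(1 + (M - 2.5) b) / (1 - 1.5 b) with b = (0.11852 - 0.05478 ln PD)^2."""
    b = (0.11852 - 0.05478 * math.log(pd)) ** 2
    return (1 + (m - 2.5) * b) / (1 - 1.5 * b)


def capital_k(pd: float, lgd: float, r: float) -> float:
    """K before maturity adjustment: LGD * [N((G(PD) + sqrt(R) G(0.999)) / sqrt(1 - R)) - PD]."""
    conditional_pd = _N.cdf(
        (_N.inv_cdf(pd) + math.sqrt(r) * _N.inv_cdf(0.999)) / math.sqrt(1 - r)
    )
    return lgd * (conditional_pd - pd)


def irb_rwa(pd: float, lgd: float, ead: float, m: float, pd_floor: float, scaling_factor: float) -> float:
    pd_floored = max(pd, pd_floor)
    r = corporate_correlation(pd_floored)
    k = capital_k(pd_floored, lgd, r)
    ma = maturity_adjustment(pd_floored, m)
    # Same shape as the excerpt: k * 12.5 * ead * ma * scaling_factor
    return k * 12.5 * ead * ma * scaling_factor
```

As a sanity check, a corporate exposure with PD 1%, LGD 45% and M = 2.5 comes out at roughly 92% of EAD before any scaling factor, matching the commonly quoted IRB risk weight for those inputs.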
Slotting Calculator
class SlottingCalculator:
def calculate_branch(
self,
exposures: pl.LazyFrame, # pre-filtered slotting-only rows
config: CalculationConfig,
*,
errors: list[CalculationError] | None = None,
) -> pl.LazyFrame:
"""Calculate Slotting RWA."""
return (
exposures.pipe(prepare_columns, config)
.pipe(apply_slotting_weights, config, pack=pack) # Art. 153(5) tables
.pipe(calculate_rwa)
.pipe(apply_el_rates, config, pack=pack)
.pipe(compute_el_shortfall_excess, errors=errors)
) # + supporting factors (CRR) and aggregator columns
# transforms live in engine/slotting/transforms.py
Stage 6: Aggregation
Purpose
Combine results, apply output floor, produce final output.
Input
The three collected branch frames (SA / IRB / slotting) plus the equity result bundle
Output
AggregatedResultBundle with:
- Total RWA
- RWA by approach
- RWA by exposure class
- Floor impact (Basel 3.1)
- Detailed breakdown
Output Floor (Basel 3.1)
import polars as pl
def apply_output_floor(
irb_rwa: pl.LazyFrame,
sa_rwa: pl.LazyFrame,
floor_percentage: float
) -> pl.LazyFrame:
"""Apply output floor to IRB results."""
return (
irb_rwa
.join(sa_rwa, on="exposure_reference", suffix="_sa")
.with_columns(
floor=pl.col("rwa_sa") * floor_percentage,
rwa_floored=pl.max_horizontal(
pl.col("rwa"),
pl.col("rwa_sa") * floor_percentage
)
)
)
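Error Handling
Each stage accumulates errors:
from dataclasses import dataclass, field
import polars as pl
# CalculationError is the project's error type (its import is elided here)
@dataclass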
class LazyFrameResult:
frame: pl.LazyFrame
errors: list[CalculationError] = field(default_factory=list)
# Pipeline accumulates across stages
all_errors = []
all_errors.extend(loader_result.errors)
all_errors.extend(hierarchy_result.errors)
# ... etc.
Next Steps
- Data Flow - Detailed data flow documentation
- Component Overview - Individual component details
- API Reference - Pipeline API documentation