
Pipeline Architecture

The RWA calculator processes exposures through a well-defined pipeline with discrete stages. This document details each stage and how they interact.

Pipeline Overview

flowchart TD
    subgraph Input
        A[Raw Data Files]
    end

    subgraph Stage1[Stage 1: Data Loading]
        B[Loader]
        B1[RawDataBundle]
    end

    subgraph Stage2[Stage 2: Hierarchy Resolution]
        C[Hierarchy Resolver]
        C1[ResolvedHierarchyBundle]
    end

    subgraph Stage3[Stage 3: Classification]
        D[Classifier]
        D1[ClassifiedExposuresBundle]
    end

    subgraph Stage4[Stage 4: CRM Processing]
        E[CRM Processor]
        E1[CRMAdjustedBundle]
    end

    subgraph Stage5[Stage 5: Split-Once + Parallel Calculate]
        M["re_split_exit edge<br/>(eager sealed frame)"]
        U["SA calculate_unified<br/>(Basel 3.1 floor only)"]
        SP[Split once by approach]
        F["SA branch<br/>calculate_branch()"]
        G["IRB branch<br/>calculate_branch()"]
        H["Slotting branch<br/>calculate_branch()"]
        CA["materialise_sealed_branches<br/>(parallel branch collection)"]
    end

    subgraph Equity[Equity — Separate Path]
        L[Equity Calculator]
        L1[EquityResultBundle]
    end

    subgraph Stage6[Stage 6: Aggregation]
        I["_aggregate_results()"]
        I1[AggregatedResultBundle]
    end

    A --> B --> B1
    B1 --> C --> C1
    C1 --> D --> D1
    D1 --> E --> E1
    E1 --> M --> U --> SP
    SP --> F & G & H
    F & G & H --> CA
    E1 --> L --> L1
    CA & L1 --> I --> I1

Fold over a literal stage registry

The pipeline is a fold over a literal stage registry:

  • engine/registry.py is the single ordered, literal StageSpec list (PIPELINE_STAGES) — no conditionals. Stages: securitisation allocator, hierarchy resolver, SA-CCR, classifier, CRM processor, RE splitter, calculators, equity calculator, aggregator.
  • engine/orchestrator.py::run_stages is a pure fold: it threads one immutable PipelineContext (a typed ArtifactKey[T] map defined in contracts/context.py) through the registered stages.
  • Each stage under engine/stages/ is a run(ctx, rulepack, run_config) -> PipelineContext adapter. Stages exchange eager sealed frames (materialise_edge at every stage exit; producer-sealed edge contracts in contracts/edges.py).
  • engine/pipeline.py remains the run-lifecycle facade (run_id, edge capture, FX-rate sync, error merge, audit persistence).

rulepack is the frozen ResolvedRulepack built once per run from (regime_id, reporting_date).
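The fold described above can be sketched in a few lines of plain Python. This is an illustration under stated assumptions, not the project's code: `Ctx`, `with_artifact`, and the two toy stages are hypothetical stand-ins for PipelineContext, its artifact map, and the real stage adapters.

```python
from dataclasses import dataclass, field
from functools import reduce
from types import MappingProxyType
from typing import Any, Callable, Mapping


@dataclass(frozen=True)
class Ctx:
    """Hypothetical stand-in for PipelineContext: an immutable artifact map."""

    artifacts: Mapping[str, Any] = field(default_factory=lambda: MappingProxyType({}))

    def with_artifact(self, key: str, value: Any) -> "Ctx":
        # Return a new context; the existing one is never mutated.
        return Ctx(MappingProxyType({**self.artifacts, key: value}))


Stage = Callable[[Ctx, Any, Any], Ctx]


def load(ctx: Ctx, rulepack: Any, run_config: Any) -> Ctx:
    # Toy stage: publishes an artifact for later stages.
    return ctx.with_artifact("exposures", [100.0, 250.0])


def total(ctx: Ctx, rulepack: Any, run_config: Any) -> Ctx:
    # Toy stage: consumes an earlier artifact and publishes a new one.
    return ctx.with_artifact("total", sum(ctx.artifacts["exposures"]))


# A literal, ordered list with no conditionals, mirroring the idea of PIPELINE_STAGES.
STAGES: tuple[Stage, ...] = (load, total)


def run_stages(ctx: Ctx, rulepack: Any, run_config: Any) -> Ctx:
    # Pure fold: each stage receives the previous context and returns a new one.
    return reduce(lambda acc, stage: stage(acc, rulepack, run_config), STAGES, ctx)


final_ctx = run_stages(Ctx(), rulepack=None, run_config=None)
```

Because every stage returns a fresh context, reordering or inserting a stage only means editing the literal list; no stage needs to know about its neighbours.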

from rwa_calc.engine.pipeline import create_pipeline

# Create pipeline with default components
pipeline = create_pipeline()

# Run with configuration
result = pipeline.run(config)

# Or with pre-loaded data
result = pipeline.run_with_data(raw_data, config)

For full API documentation, see Pipeline API Reference.

Stage 1: Data Loading

Purpose

Load raw data from Parquet/CSV files into LazyFrames.

Input

File paths to data files.

Output

RawDataBundle containing:

  • counterparties: Counterparty master data
  • facilities: Credit facility data
  • loans: Individual loan/draw data
  • contingents: Off-balance sheet items
  • collateral: Collateral information
  • guarantees: Guarantee data
  • provisions: Provision allocations
  • ratings: External and internal ratings
  • org_mapping: Organization hierarchy
  • lending_mapping: Retail lending groups
  • model_permissions: Per-model IRB approach permissions (optional)

Implementation

from pathlib import Path

import polars as pl


class ParquetLoader:
    def load(self, path: Path) -> RawDataBundle:
        # scan_parquet is lazy: nothing is read until a frame is collected
        return RawDataBundle(
            counterparties=pl.scan_parquet(path / "counterparties.parquet"),
            facilities=pl.scan_parquet(path / "facilities.parquet"),
            loans=pl.scan_parquet(path / "loans.parquet"),
            # ... other data sources
        )

Validation

  • Schema validation against defined schemas
  • Required field checks
  • Data type validation
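As a rough illustration of the required-field and data-type checks listed above, the sketch below compares an actual column-to-dtype mapping against an expected one. The function name, the column names, and the dtype strings are hypothetical; in a Polars pipeline the actual mapping could be derived from the frame's schema (for example via `LazyFrame.collect_schema()`).

```python
def validate_schema(actual: dict[str, str], expected: dict[str, str]) -> list[str]:
    """Return a list of human-readable problems; an empty list means the schema passes."""
    problems = []
    for column, dtype in expected.items():
        if column not in actual:
            # Required field check
            problems.append(f"missing required column: {column}")
        elif actual[column] != dtype:
            # Data type check
            problems.append(f"{column}: expected {dtype}, got {actual[column]}")
    return problems


# Hypothetical expected schema for the loans table.
LOANS_SCHEMA = {"loan_reference": "String", "drawn_amount": "Float64"}

issues = validate_schema(
    {"loan_reference": "String", "drawn_amount": "Int64"},
    LOANS_SCHEMA,
)
```

Returning a list rather than raising keeps the check compatible with the non-blocking error accumulation used elsewhere in the pipeline.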

Stage 2: Hierarchy Resolution

Purpose

Resolve counterparty and facility hierarchies, inherit ratings, unify exposures, and prepare enriched data for classification.

Input

RawDataBundle

Output

ResolvedHierarchyBundle with:

  • exposures: Unified LazyFrame (loans + contingents + facility undrawn)
  • counterparty_lookup: Enriched counterparties with hierarchy metadata and inherited ratings
  • collateral, guarantees, provisions: FX-converted CRM data
  • lending_group_totals: Aggregated group exposure for retail threshold testing
  • model_permissions: Per-model IRB approach permissions (passed through from RawDataBundle)
  • hierarchy_errors: Accumulated non-blocking errors

Processing Steps

Step 1: Build counterparty hierarchy lookup
  ├── _build_ultimate_parent_lazy()      → traverse org_mappings (up to 10 levels)
  ├── _build_rating_inheritance_lazy()   → inherit ratings (own → parent → unrated)
  └── _enrich_counterparties_with_hierarchy() → add hierarchy metadata

Step 2: Unify exposures
  ├── Standardise loans         → exposure_type = "loan"
  ├── Standardise contingents   → exposure_type = "contingent"
  ├── _build_facility_root_lookup()       → traverse facility-to-facility hierarchies
  ├── _calculate_facility_undrawn()       → limit - sum(descendant drawn amounts)
  └── pl.concat(all exposure types)       → single unified LazyFrame

Step 2a: FX conversion          → convert all monetary values to base currency
Step 2b: Add collateral LTV     → direct → facility → counterparty priority

Step 3: Residential property coverage
  └── Separate residential vs all-property collateral for threshold exclusion

Step 4: Lending group totals
  └── Aggregate by group, exclude residential from retail threshold (CRR Art. 123(c))

Step 5: Add lending group totals to exposures

Counterparty Hierarchy

The hierarchy resolution uses iterative Polars LazyFrame joins for performance. See engine/stages/hierarchy/graph.py (_build_ultimate_parent_lazy and the facility-root traversal) for the full implementation.

Build ultimate parent mapping using eager graph traversal.

Collects the small edge data eagerly, resolves the full graph via dict traversal, and returns the result as a LazyFrame for downstream joins.

Returns LazyFrame with columns:

  • counterparty_reference: The entity
  • ultimate_parent_reference: Its deepest reachable parent (the true root, or the parent at max_depth if the chain was truncated)
  • hierarchy_depth: Number of levels traversed
  • truncated: True iff the chain was cut off at max_depth; consumed by build_counterparty_lookup to synthesise HIE003 WARNINGs and stripped before the LazyFrame is exposed on CounterpartyLookup.
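A minimal sketch of the dict traversal this docstring describes, assuming the edges arrive as a child-to-parent mapping. `resolve_ultimate_parents` and its tuple return shape are illustrative only; they are not the signature used in graph.py.

```python
def resolve_ultimate_parents(
    parent_of: dict[str, str], max_depth: int = 10
) -> dict[str, tuple[str, int, bool]]:
    """Map each child to (ultimate_parent, hierarchy_depth, truncated)."""
    resolved = {}
    for child in parent_of:
        current, depth = child, 0
        # Walk upwards until a root is reached or the depth cap is hit.
        while current in parent_of and depth < max_depth:
            current = parent_of[current]
            depth += 1
        # If the entity we stopped at still has a parent, the chain was cut off.
        truncated = current in parent_of
        resolved[child] = (current, depth, truncated)
    return resolved


edges = {"C": "B", "B": "A"}  # C -> B -> A, where A is the root
full = resolve_ultimate_parents(edges)
capped = resolve_ultimate_parents(edges, max_depth=1)
```

The depth cap also guarantees termination if the input accidentally contains a cycle; such rows simply come back flagged as truncated.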

Rating Inheritance

Ratings are inherited from parent entities when not directly available. See engine/stages/hierarchy/ratings.py (build_rating_inheritance_lazy).

The inheritance priority is:

  1. Entity's own rating
  2. Ultimate parent's rating
  3. Mark as unrated

Facility Hierarchy

Facilities can form multi-level hierarchies (e.g., master facility → sub-facilities). The resolver traverses these using the same iterative join pattern as counterparty hierarchies. See engine/stages/hierarchy/graph.py (build_facility_root_lookup) and facility_undrawn.py.

Key behaviour:

  • Facility-to-facility edges identified from facility_mappings where child_type = "facility"
  • Traverses up to 10 levels to find the root facility for each sub-facility
  • Drawn amounts from all descendant loans are aggregated to the root facility
  • Sub-facilities are excluded from producing their own undrawn exposure records
  • Only root/standalone facilities with undrawn_amount > 0 generate facility_undrawn exposures
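The behaviour above can be sketched with plain dictionaries. This is a simplified illustration, not the code in facility_undrawn.py: the function name, argument shapes, and example references are all hypothetical.

```python
from collections import defaultdict


def facility_undrawn(
    limits: dict[str, float],        # facility -> limit
    parent_of: dict[str, str],       # sub-facility -> parent facility
    loan_facility: dict[str, str],   # loan -> facility it is drawn under
    drawn: dict[str, float],         # loan -> drawn amount
    max_depth: int = 10,
) -> dict[str, float]:
    def root(facility: str) -> str:
        # Walk up the facility-to-facility edges, at most max_depth levels.
        for _ in range(max_depth):
            if facility not in parent_of:
                break
            facility = parent_of[facility]
        return facility

    # Aggregate every descendant loan's drawn amount to its root facility.
    drawn_by_root: dict[str, float] = defaultdict(float)
    for loan, facility in loan_facility.items():
        drawn_by_root[root(facility)] += drawn[loan]

    undrawn = {}
    for facility, limit in limits.items():
        if facility in parent_of:
            continue  # sub-facilities never emit their own undrawn record
        amount = limit - drawn_by_root[facility]
        if amount > 0:
            undrawn[facility] = amount
    return undrawn


result = facility_undrawn(
    limits={"MASTER": 1000.0, "SUB1": 600.0, "SUB2": 400.0, "SOLO": 200.0},
    parent_of={"SUB1": "MASTER", "SUB2": "MASTER"},
    loan_facility={"L1": "SUB1", "L2": "SUB2", "L3": "SOLO"},
    drawn={"L1": 300.0, "L2": 150.0, "L3": 200.0},
)
```

In this example the master facility carries the undrawn amount for both sub-facilities, while the fully drawn standalone facility produces no record.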

Stage 3: Classification

Purpose

Assign exposure classes and determine calculation approach.

Input

ResolvedHierarchyBundle

Output

ClassifiedExposuresBundle with:

  • exposure_class: Regulatory exposure class
  • approach: SA, F-IRB, A-IRB, or Slotting
  • Grouped exposures by approach

Classification Logic

The classifier assigns exposure classes and calculation approaches. See engine/stages/classify/classifier.py for the core classification logic (with attributes.py, subtypes.py, permissions.py, approach.py, audit.py).

rwa_calc.engine.stages.classify.classifier.ExposureClassifier

Classify exposures by exposure class and approach.

Implements ClassifierProtocol for:

  • Mapping counterparty types to exposure classes
  • Checking SME criteria (turnover thresholds)
  • Checking retail criteria (aggregate exposure thresholds)
  • Determining IRB eligibility based on permissions
  • Identifying specialised lending for slotting
  • Splitting exposures by calculation approach

All operations use Polars LazyFrames for deferred execution. The classifier batches expressions into 4 .with_columns() calls to keep the query plan shallow (5 nodes instead of 21).

classify(data, config, *, pack=None)

Classify exposures and split by approach.

Parameters:

Name    Type                     Description                                     Default
data    ResolvedHierarchyBundle  Hierarchy-resolved data from HierarchyResolver  required
config  CalculationConfig        Calculation configuration                       required

Returns:

Type                       Description
ClassifiedExposuresBundle  ClassifiedExposuresBundle with exposures split by approach

Classification Priority Order

  1. Central Govt / Central Bank: Government entities, central banks
  2. RGLA: Regional governments and local authorities
  3. PSE: Public sector entities
  4. MDB: Multilateral development banks
  5. Institution: Banks, regulated financial institutions, CCPs
  6. Retail: Individuals, small businesses meeting retail criteria
  7. Corporate: Non-financial corporates
  8. Specialised Lending: Project finance, object finance, etc.

Model Permissions Resolution

After exposure class assignment, the classifier resolves per-model IRB approach permissions via _resolve_model_permissions(). This optional step uses the model_permissions table (loaded from model_permissions.parquet) to determine which IRB approaches each internal rating model is approved for, scoped by exposure class, geography, and book code.

Resolution logic:

  1. Left-join exposures to model_permissions on model_id
  2. A permission row is valid when: exposure_class matches, geography passes (country_codes is null or cp_country_code is in the list), and the book code is not excluded
  3. Per-exposure boolean flags are computed: model_airb_permitted, model_firb_permitted, model_slotting_permitted
  4. These flags feed _assign_approach(), which applies Art. 147A hard constraints before model-level permissions

Exposures without a model_id receive all permission flags as False and fall back to SA. When the model_permissions table is absent, the org-wide IRBPermissions configuration applies instead.
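The resolution rules can be sketched per exposure in plain Python. The `Permission` fields `approach` and `excluded_book_codes`, the exposure key `book_code`, and the approach labels are assumptions made for this example; only the flag names and the validity conditions come from the text above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Permission:
    model_id: str
    exposure_class: str
    approach: str                                  # "airb", "firb" or "slotting" (assumed labels)
    country_codes: Optional[frozenset] = None      # None means no geographic restriction
    excluded_book_codes: frozenset = frozenset()   # hypothetical field name


def resolve_flags(exposure: dict, permissions: list[Permission]) -> dict[str, bool]:
    flags = {
        "model_airb_permitted": False,
        "model_firb_permitted": False,
        "model_slotting_permitted": False,
    }
    model_id = exposure.get("model_id")
    if model_id is None:
        return flags  # no model: every flag stays False, so the exposure falls back to SA
    for perm in permissions:
        valid = (
            perm.model_id == model_id
            and perm.exposure_class == exposure["exposure_class"]
            and (perm.country_codes is None
                 or exposure["cp_country_code"] in perm.country_codes)
            and exposure["book_code"] not in perm.excluded_book_codes
        )
        if valid:
            flags[f"model_{perm.approach}_permitted"] = True
    return flags


perms = [
    Permission("M1", "corporate", "airb", country_codes=frozenset({"GB"})),
    Permission("M1", "corporate", "firb"),
]
gb = resolve_flags(
    {"model_id": "M1", "exposure_class": "corporate",
     "cp_country_code": "GB", "book_code": "BANKING"}, perms)
fr = resolve_flags(
    {"model_id": "M1", "exposure_class": "corporate",
     "cp_country_code": "FR", "book_code": "BANKING"}, perms)
```

Here the French exposure fails the geography test for the A-IRB row but still picks up the unrestricted F-IRB permission.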

Art. 147A Hard Constraints Override Model Permissions

Regulatory restrictions always take priority over model permissions. For example, institutions are limited to F-IRB (Art. 147A(1)(c)) even if a model has A-IRB approval, and equity exposures are SA-only (Art. 147A(1)(a)) regardless of any model permission. See the Model Permissions Specification for the full restriction table and precedence rules.

See engine/stages/classify/approach.py for assign_approach (Art. 147A hard constraints) and permissions.py for resolve_model_permissions.
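To make the precedence concrete, here is a deliberately small sketch covering only the two restrictions named above (equity and institutions). The function signature, the class labels, and the A-IRB-before-F-IRB preference are assumptions for illustration; the real assign_approach handles the full restriction table, including slotting.

```python
def assign_approach_sketch(exposure_class: str, *, airb: bool, firb: bool) -> str:
    """Apply hard constraints first, then fall through to model permissions."""
    # Hard constraint: equity is SA-only regardless of any model permission.
    if exposure_class == "equity":
        return "SA"
    # Hard constraint: institutions are capped at F-IRB, so A-IRB approval is ignored.
    # (In this sketch an A-IRB-only permission does not imply F-IRB.)
    if exposure_class == "institution":
        airb = False
    # Model-level permissions apply only after the constraints above.
    if airb:
        return "A-IRB"
    if firb:
        return "F-IRB"
    return "SA"
```

For example, `assign_approach_sketch("institution", airb=True, firb=True)` resolves to F-IRB, whereas the same flags on a corporate exposure resolve to A-IRB.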

Stage 4: CRM Processing

Purpose

Apply credit risk mitigation: collateral, guarantees, provisions.

Input

ClassifiedExposuresBundle plus CRM data

Output

CRMAdjustedBundle with:

  • Adjusted EAD values
  • Applied haircuts
  • Substituted risk weights (guarantees)
  • Provision adjustments

Processing Order

Provisions are resolved before CCF application so that the nominal amount is adjusted before credit conversion. The full CRM pipeline order is:

  1. Resolve provisions — drawn-first deduction (SA only); IRB tracks but does not deduct
  2. Apply CCFs — uses nominal_after_provision for off-balance sheet conversion
  3. Initialize EAD waterfall — set ead_pre_crm from drawn + interest + CCF contribution. Also sets ead_for_crm = on_bs_for_ead + nominal_after_provision (CCF=100% basis per CRR Art. 223(4) / PS1/26 Art. 223(4)) and effective_ccf (used to recouple the actual CCF in the SA post-collateral EAD per Art. 228(1))
  4. Apply collateral — collateral is netted against ead_for_crm (full nominal) for FCCM and FCM purposes; under SA the resulting E* is then multiplied by effective_ccf to give ead_after_collateral; under FIRB / Slotting the LGD* formula uses ead_for_crm as E and ead_gross (post-CCF) flows through unchanged as the EAD basis
  5. Apply guarantees (substitution approach, cross-approach CCF substitution)
  6. Finalize EAD — floor at zero; provisions already baked into ead_pre_crm
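Steps 1 and 2 can be illustrated with a small arithmetic sketch for the SA case. The function name and the example amounts are hypothetical; the logic follows the drawn-first rule, with any remainder reducing the nominal before the CCF is applied.

```python
def resolve_provision_sa(
    drawn: float, nominal: float, provision: float
) -> tuple[float, float]:
    """Return (drawn_after_provision, nominal_after_provision) for an SA exposure."""
    # The provision is deducted from the drawn balance first...
    from_drawn = min(drawn, provision)
    # ...and only what is left over reduces the off-balance-sheet nominal.
    from_nominal = min(nominal, provision - from_drawn)
    return drawn - from_drawn, nominal - from_nominal


drawn_after, nominal_after = resolve_provision_sa(drawn=20.0, nominal=50.0, provision=30.0)

# Step 2 then applies the CCF to the provision-adjusted nominal.
ccf = 0.5  # illustrative CCF only
ccf_contribution = nominal_after * ccf
```

With these numbers the drawn balance is fully absorbed, the remaining 10 of provision reduces the nominal to 40, and the CCF contribution is 20 rather than the 25 that a post-CCF deduction would start from.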

Why two EAD bases? CRR Art. 223(4) and PS1/26 Art. 223(4) both require off-balance-sheet items to be valued at 100% of nominal when computing the exposure value E used for CRM, overriding the regulatory CCF. The actual CCF re-couples afterwards: under SA per Art. 228(1) the CCF is applied to E*, while under FIRB the actual CCF stays in EAD but is absent from the LGD* ratio. The pipeline therefore carries ead_gross (post-CCF, the actual EAD that feeds RWA) and ead_for_crm (the CCF=100% basis used to net collateral). For pure on-balance-sheet rows the two are equal.
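A worked example may help here. The sketch below follows the SA path only and assumes collateral values are already haircut-adjusted. Treating effective_ccf as the ratio ead_gross / ead_for_crm is an assumption made for this illustration; it is consistent with the two bases being equal for on-balance-sheet rows, but the engine's exact definition may differ.

```python
def sa_ead_after_collateral(
    on_bs: float, nominal_after_provision: float, ccf: float, collateral_value: float
) -> dict[str, float]:
    # Actual EAD basis: the regulatory CCF applies to the off-balance-sheet part.
    ead_gross = on_bs + nominal_after_provision * ccf
    # CRM basis: off-balance-sheet part valued at 100% of nominal.
    ead_for_crm = on_bs + nominal_after_provision
    # ASSUMPTION: effective CCF expressed as the ratio of the two bases.
    effective_ccf = ead_gross / ead_for_crm if ead_for_crm else 1.0
    # Collateral is netted against the full-nominal basis, floored at zero...
    e_star = max(0.0, ead_for_crm - collateral_value)
    # ...and the actual CCF is re-coupled afterwards.
    return {
        "ead_gross": ead_gross,
        "ead_for_crm": ead_for_crm,
        "ead_after_collateral": e_star * effective_ccf,
    }


# Undrawn commitment of 100 with a 40% CCF, secured by 30 of collateral.
off_bs = sa_ead_after_collateral(
    on_bs=0.0, nominal_after_provision=100.0, ccf=0.4, collateral_value=30.0
)
```

Netting the 30 of collateral against the post-CCF amount of 40 would leave 10; netting against the full nominal of 100 and then re-applying the CCF leaves roughly 28, which is why the pipeline has to carry both bases.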

class CRMProcessor:
    def get_crm_unified_bundle(
        self,
        data: ClassifiedExposuresBundle,
        config: CalculationConfig,
    ) -> CRMAdjustedBundle:
        """Apply CRM in correct order (Art. 111(1)(a)-(b) compliant).

        The single CRM entry point: returns the unified frame; the pipeline
        splits by approach once, just before the calculators."""

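        # exposures, provisions, collateral, guarantees and counterparty_lookup
        # come from the `data` bundle; that unpacking is omitted from this excerpt.

        # Step 1: Resolve provisions (before CCF)
        #   SA: drawn-first deduction, remainder reduces nominal
        #   IRB/Slotting: tracked but not deducted from EAD
        after_provisions = self._resolve_provisions(exposures, provisions, config)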

        # Step 2: Apply CCFs (uses nominal_after_provision)
        after_ccf = self._apply_ccf(after_provisions, config)

        # Step 3: Initialize EAD waterfall — includes collect barrier
        #   (flattens deep lazy plan to prevent 3× re-evaluation downstream)
        after_init = self._initialize_ead(after_ccf)

        # Step 4: Collateral (3 lookup collects: direct/facility/counterparty)
        after_collateral = self._apply_collateral(after_init, collateral, config)

        # Step 5: Guarantees (cross-approach CCF substitution)
        after_guarantees = self._apply_guarantees(
            after_collateral, guarantees, counterparty_lookup, config
        )

        # Step 6: Finalize (no provision subtraction — already in ead_pre_crm)
        return self._finalize_ead(after_guarantees)

Stage 5: RWA Calculation

Purpose

Calculate RWA using appropriate approach for each exposure.

The frame arrives as an eager sealed re_split_exit edge before the calculators fork the plan three ways; the three per-approach branches are collected through materialise_sealed_branches (engine/stages/calc.py). See Collect Barriers & Stage Edges for the edge model that replaced the old ad-hoc materialise barrier.
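The split-once, calculate-in-parallel shape can be sketched without Polars. Everything here is a toy: `split_once`, `toy_calculate_branch`, and the row layout are hypothetical, and a thread pool merely stands in for whatever mechanism materialise_sealed_branches actually uses to collect branches.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def split_once(rows: list[dict]) -> dict[str, list[dict]]:
    """Partition rows by approach in a single pass over the data."""
    branches: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        branches[row["approach"]].append(row)
    return dict(branches)


def toy_calculate_branch(rows: list[dict]) -> float:
    # Placeholder for a per-approach calculate_branch(): RWA = EAD x risk weight.
    return sum(row["ead"] * row["risk_weight"] for row in rows)


rows = [
    {"approach": "SA", "ead": 100.0, "risk_weight": 0.5},
    {"approach": "IRB", "ead": 200.0, "risk_weight": 0.75},
    {"approach": "SLOTTING", "ead": 80.0, "risk_weight": 1.0},
    {"approach": "SA", "ead": 40.0, "risk_weight": 1.0},
]

branches = split_once(rows)
with ThreadPoolExecutor() as pool:
    # Each branch only sees its own pre-filtered rows, so branches are independent.
    futures = {name: pool.submit(toy_calculate_branch, branch)
               for name, branch in branches.items()}
    rwa_by_approach = {name: future.result() for name, future in futures.items()}
```

Splitting before the fork means the shared upstream work is done once, and each calculator can assume its input contains only rows for its own approach.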

SA Calculator

class SACalculator:
    def calculate_branch(
        self,
        exposures: pl.LazyFrame,        # pre-filtered SA-only rows
        config: CalculationConfig,
        *,
        errors: list[CalculationError] | None = None,
    ) -> pl.LazyFrame:
        """Calculate SA RWA on the SA branch."""
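        # `pack` is the run's resolved rulepack; how it reaches this method is
        # omitted from this excerpt.
        return (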
            exposures.pipe(apply_risk_weights, config, pack=pack)
            .pipe(apply_fcsm_rw_substitution, config)
            .pipe(apply_life_insurance_rw_mapping)
            .pipe(apply_guarantee_substitution, config, pack=pack)
            .pipe(apply_currency_mismatch_multiplier, config, pack=pack)
            .pipe(apply_due_diligence_override, config, errors=errors, pack=pack)
            .pipe(calculate_rwa)
            .pipe(apply_supporting_factors, config, errors=errors, pack=pack)  # CRR only
        )  # + approach_applied / rwa_final for the aggregator
        # transforms live in engine/sa/risk_weights.py, rw_adjustments.py, factors_output.py

IRB Calculator

class IRBCalculator:
    def calculate_branch(
        self,
        exposures: pl.LazyFrame,        # pre-filtered IRB-only rows
        config: CalculationConfig,
        *,
        errors: list[CalculationError] | None = None,
    ) -> pl.LazyFrame:
        """Calculate IRB RWA."""
        result = (
            exposures
            .with_columns(
                # Apply PD floor — read from the resolved rulepack, not config
                pd_floored=pl.max_horizontal(
                    pl.col("pd"),
                    pl.lit(pack.formula("pd_floors")["minimum"])
                )
            )
            .with_columns(
                # Calculate correlation
                correlation=self._calculate_correlation(
                    pl.col("exposure_class"),
                    pl.col("pd_floored"),
                    pl.col("turnover")
                ),
                # Calculate maturity adjustment
                ma=self._calculate_maturity_adjustment(
                    pl.col("pd_floored"),
                    pl.col("effective_maturity")
                )
            )
            .with_columns(
                # Calculate K
                k=self._calculate_k(
                    pl.col("pd_floored"),
                    pl.col("lgd"),
                    pl.col("correlation")
                )
            )
            .with_columns(
                # Calculate RWA — scaling factor read from the resolved rulepack
                rwa=pl.col("k") * 12.5 * pl.col("ead") * pl.col("ma") *
                    pack.scalar("scaling_factor")
            )
        )

        return result  # + approach_applied / rwa_final for the aggregator
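For readers who want to see what the helper calls stand for, the sketch below writes out the standard Basel corporate IRB formulas as scalar Python functions. It illustrates the mathematics only and is not the project's implementation: the function names are hypothetical, the SME size adjustment to correlation (the turnover input above) is omitted, the PD is assumed to be floored already, and the scaling factor is passed in rather than read from a rulepack.

```python
from math import exp, log, sqrt
from statistics import NormalDist

_N = NormalDist()  # standard normal distribution


def correlation_corporate(pd: float) -> float:
    """Asset correlation R, interpolating between 24% (low PD) and 12% (high PD)."""
    w = (1 - exp(-50 * pd)) / (1 - exp(-50))
    return 0.12 * w + 0.24 * (1 - w)


def maturity_adjustment(pd: float, m: float) -> float:
    """Maturity adjustment for effective maturity m (years)."""
    b = (0.11852 - 0.05478 * log(pd)) ** 2
    return (1 + (m - 2.5) * b) / (1 - 1.5 * b)


def capital_k(pd: float, lgd: float, r: float) -> float:
    """Capital requirement K before the maturity adjustment."""
    stressed_pd = _N.cdf(
        (_N.inv_cdf(pd) + sqrt(r) * _N.inv_cdf(0.999)) / sqrt(1 - r)
    )
    return lgd * (stressed_pd - pd)


def irb_rwa(pd: float, lgd: float, ead: float, m: float,
            scaling_factor: float = 1.0) -> float:
    # Same composition as the excerpt above: K x 12.5 x EAD x MA x scaling factor.
    r = correlation_corporate(pd)
    return capital_k(pd, lgd, r) * 12.5 * ead * maturity_adjustment(pd, m) * scaling_factor


example_rwa = irb_rwa(pd=0.01, lgd=0.45, ead=100.0, m=2.5)
```

Keeping the maturity adjustment outside K, as both this sketch and the excerpt do, is algebraically equivalent to the more common presentation in which K already includes it.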

Slotting Calculator

class SlottingCalculator:
    def calculate_branch(
        self,
        exposures: pl.LazyFrame,        # pre-filtered slotting-only rows
        config: CalculationConfig,
        *,
        errors: list[CalculationError] | None = None,
    ) -> pl.LazyFrame:
        """Calculate Slotting RWA."""
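        # `pack` is the run's resolved rulepack; how it reaches this method is
        # omitted from this excerpt.
        return (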
            exposures.pipe(prepare_columns, config)
            .pipe(apply_slotting_weights, config, pack=pack)   # Art. 153(5) tables
            .pipe(calculate_rwa)
            .pipe(apply_el_rates, config, pack=pack)
            .pipe(compute_el_shortfall_excess, errors=errors)
        )  # + supporting factors (CRR) and aggregator columns
        # transforms live in engine/slotting/transforms.py

Stage 6: Aggregation

Purpose

Combine results, apply output floor, produce final output.

Input

The three collected branch frames (SA / IRB / slotting) plus the equity result bundle

Output

AggregatedResultBundle with: - Total RWA - RWA by approach - RWA by exposure class - Floor impact (Basel 3.1) - Detailed breakdown

Output Floor (Basel 3.1)

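import polars as pl


def apply_output_floor(
    irb_rwa: pl.LazyFrame,
    sa_rwa: pl.LazyFrame,
    floor_percentage: float,
) -> pl.LazyFrame:
    """Floor each IRB RWA at floor_percentage of its SA equivalent."""
    # After the join, the SA frame's rwa column is exposed as rwa_sa.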
    return (
        irb_rwa
        .join(sa_rwa, on="exposure_reference", suffix="_sa")
        .with_columns(
            floor=pl.col("rwa_sa") * floor_percentage,
            rwa_floored=pl.max_horizontal(
                pl.col("rwa"),
                pl.col("rwa_sa") * floor_percentage
            )
        )
    )

Error Handling

Each stage accumulates errors:

from dataclasses import dataclass, field

import polars as pl


@dataclass
class LazyFrameResult:
    frame: pl.LazyFrame
    errors: list[CalculationError] = field(default_factory=list)

# Pipeline accumulates across stages
all_errors: list[CalculationError] = []
all_errors.extend(loader_result.errors)
all_errors.extend(hierarchy_result.errors)
# ... etc.
