Skip to content

Pipeline API

The pipeline module provides the main entry points for RWA calculation. See pipeline.py for the full implementation.

Module: rwa_calc.engine.pipeline

create_pipeline

Factory function to create a pipeline with default components:

Create a pipeline orchestrator with default components.

Parameters:

Name Type Description Default
data_path str | Path | None

Path to data directory (creates ParquetLoader)

None
loader LoaderProtocol | None

Pre-configured loader (overrides data_path)

None

Returns:

Type Description
PipelineOrchestrator

PipelineOrchestrator ready for use

Usage

With data path (uses ParquetLoader)

pipeline = create_pipeline(data_path="/path/to/data")

With custom loader

pipeline = create_pipeline(loader=CSVLoader("/path/to/data"))

Without a loader (call run_with_data() with pre-loaded data; run() raises ValueError when no loader is configured)

pipeline = create_pipeline()

PipelineOrchestrator

The main pipeline class (note: the class is PipelineOrchestrator, not RWAPipeline):

rwa_calc.engine.pipeline.PipelineOrchestrator

Orchestrate the complete RWA calculation pipeline.

Implements PipelineProtocol over the Phase 4 fold orchestrator: the stage sequence is the literal engine/registry.PIPELINE_STAGES list, folded by engine/orchestrator.run_stages. This facade owns the run lifecycle (run_id, edge capture, FX-rate sync, error merge, audit persistence) and the per-run component wiring.

Components can be injected for testing or customisation; defaults are built fresh every run from the effective config (never cached across runs — a framework switch on a reused orchestrator gets framework-fresh components).

Usage

orchestrator = PipelineOrchestrator(loader=ParquetLoader(base_path))
result = orchestrator.run(config)

run(config)

Execute the complete RWA calculation pipeline.

Requires a loader to be configured.

Parameters:

Name Type Description Default
config CalculationConfig

Calculation configuration

required

Returns:

Type Description
AggregatedResultBundle

AggregatedResultBundle with all results and audit trail

Raises:

Type Description
ValueError

If no loader is configured

run_with_data(data, config, *, rulepack=None)

Execute pipeline with pre-loaded data.

Folds a PipelineContext through the literal stage registry; all approach-specific calculators run sequentially on one unified LazyFrame, avoiding plan tree duplication and mid-pipeline materialisation.

Parameters:

Name Type Description Default
data RawDataBundle

Pre-loaded raw data bundle

required
config CalculationConfig

Calculation configuration

required
rulepack RulepackV0 | None

Pre-resolved rulepack used verbatim instead of RulepackV0.from_config(config) — for amendment overlays and tests that substitute a custom resolved pack (e.g. an overridden floor entry). The EUR/GBP FX-sync still runs on config; an injected pack is not re-derived from the synced config.

None

Returns:

Type Description
AggregatedResultBundle

AggregatedResultBundle with all results and audit trail

Pipeline Implementation (pipeline.py)
from rwa_calc.rulebook.audit import serialize_pack

if TYPE_CHECKING:
    import polars as pl

    from rwa_calc.contracts.bundles import AggregatedResultBundle, RawDataBundle
    from rwa_calc.contracts.config import CalculationConfig
    from rwa_calc.contracts.protocols import (
        ClassifierProtocol,
        CRMProcessorProtocol,
        EquityCalculatorProtocol,
        HierarchyResolverProtocol,
        IRBCalculatorProtocol,
        LoaderProtocol,
        OutputAggregatorProtocol,
        RealEstateSplitterProtocol,
        SACalculatorProtocol,
        SecuritisationAllocatorProtocol,
        SlottingCalculatorProtocol,
    )

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Public API of this module: the names exposed by a star-import.
__all__ = [
    "PipelineError",
    "PipelineOrchestrator",
    "create_pipeline",
    "create_test_pipeline",
]


# =============================================================================
# Pipeline Orchestrator Facade
# =============================================================================


class PipelineOrchestrator:
    """
    Orchestrate the complete RWA calculation pipeline.

    Implements PipelineProtocol over the Phase 4 fold orchestrator: the
    stage sequence is the literal ``engine/registry.PIPELINE_STAGES`` list,
    folded by ``engine/orchestrator.run_stages``. This facade owns the run
    lifecycle (run_id, edge capture, FX-rate sync, error merge, audit
    persistence) and the per-run component wiring.

    Components can be injected for testing or customisation; defaults are
    built fresh every run from the effective config (never cached across
    runs — a framework switch on a reused orchestrator gets framework-fresh
    components).

    Usage:
        orchestrator = PipelineOrchestrator(loader=ParquetLoader(base_path))
        result = orchestrator.run(config)
    """

    def __init__(
        self,
        loader: LoaderProtocol | None = None,
        securitisation_allocator: SecuritisationAllocatorProtocol | None = None,
        hierarchy_resolver: HierarchyResolverProtocol | None = None,
        classifier: ClassifierProtocol | None = None,
        crm_processor: CRMProcessorProtocol | None = None,
        re_splitter: RealEstateSplitterProtocol | None = None,
        sa_calculator: SACalculatorProtocol | None = None,
        irb_calculator: IRBCalculatorProtocol | None = None,
        slotting_calculator: SlottingCalculatorProtocol | None = None,
        equity_calculator: EquityCalculatorProtocol | None = None,
        output_aggregator: OutputAggregatorProtocol | None = None,
    ) -> None:
        """
        Initialize the pipeline facade.

        Args:
            loader: Data loader (optional - required for run())
            securitisation_allocator: Securitisation pool allocator
            hierarchy_resolver: Hierarchy resolver
            classifier: Exposure classifier
            crm_processor: CRM processor
            re_splitter: Real estate loan-splitter (CRR Art. 125/126,
                B3.1 Art. 124F/H)
            sa_calculator: SA calculator
            irb_calculator: IRB calculator
            slotting_calculator: Slotting calculator
            equity_calculator: Equity calculator
            output_aggregator: Output aggregator
        """
        self._loader = loader
        self._securitisation_allocator = securitisation_allocator
        self._hierarchy_resolver = hierarchy_resolver
        self._classifier = classifier
        self._crm_processor = crm_processor
        self._re_splitter = re_splitter
        self._sa_calculator = sa_calculator
        self._irb_calculator = irb_calculator
        self._slotting_calculator = slotting_calculator
        self._equity_calculator = equity_calculator
        self._output_aggregator = output_aggregator

    # =========================================================================
    # Public API

Usage Examples

Basic Usage

from datetime import date

from rwa_calc.engine.pipeline import create_pipeline
from rwa_calc.contracts.config import CalculationConfig

# Create a pipeline with a loader. run() raises ValueError when no loader
# is configured, so pass data_path (or loader=...) to the factory.
pipeline = create_pipeline(data_path="/path/to/data")

# Configure for CRR
config = CalculationConfig.crr(
    reporting_date=date(2026, 12, 31),
)

# Run calculation (loads the data through the configured loader)
result = pipeline.run(config)

# Access results
print(f"Total RWA: {result.total_rwa:,.0f}")
print(f"SA RWA: {result.sa_rwa:,.0f}")
print(f"IRB RWA: {result.irb_rwa:,.0f}")

With Custom Data Path

from pathlib import Path

from rwa_calc.engine.loader import ParquetLoader

# Load data from a custom path. The loader takes its data directory at
# construction, matching ParquetLoader(base_path) in the class docstring.
# NOTE(review): assumes load() takes no arguments - confirm against LoaderProtocol.
loader = ParquetLoader(Path("/path/to/data"))
raw_data = loader.load()

# Run with pre-loaded data (no loader needs to be configured on the pipeline)
result = pipeline.run_with_data(raw_data, config)

Framework Comparison

# Run under both frameworks
# (pipeline.run() requires a pipeline created with a loader or data_path)
config_crr = CalculationConfig.crr(date(2026, 12, 31))
config_b31 = CalculationConfig.basel_3_1(date(2027, 1, 1))

result_crr = pipeline.run(config_crr)
result_b31 = pipeline.run(config_b31)

# Compare: relative change in total RWA, as a percentage of the CRR figure.
# The division assumes result_crr.total_rwa is non-zero.
impact = (result_b31.total_rwa / result_crr.total_rwa - 1) * 100
print(f"Basel 3.1 impact: {impact:+.1f}%")

Error Handling

# run() raises ValueError if the pipeline has no loader configured; the
# checks below inspect the errors and warnings reported on the returned result.
result = pipeline.run(config)

# Check for errors
if result.has_errors:
    print("Calculation completed with errors:")
    for error in result.errors:
        print(f"  - {error.exposure_id}: {error.message}")

# Check for warnings
if result.has_warnings:
    print("Warnings:")
    for warning in result.warnings:
        print(f"  - {warning.message}")