Pipeline API
The pipeline module provides the main entry points for RWA calculation. See pipeline.py for the full implementation.
Module: `rwa_calc.engine.pipeline`
create_pipeline
Factory function that creates a `PipelineOrchestrator` with default components.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data_path` | `str \| Path \| None` | Path to data directory (creates a `ParquetLoader`) | `None` |
| `loader` | `LoaderProtocol \| None` | Pre-configured loader (overrides `data_path`) | `None` |
Returns:

| Type | Description |
|---|---|
| `PipelineOrchestrator` | `PipelineOrchestrator` ready for use |
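The precedence in the parameter table (an explicit `loader` wins over `data_path`) can be sketched as follows. This is a hypothetical illustration, not the library's code: `Loader`, `FakeParquetLoader` and `resolve_loader` are stand-ins invented here, and the sketch assumes that passing neither argument leaves the orchestrator without a loader.

```python
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Protocol


class Loader(Protocol):
    """Hypothetical stand-in for LoaderProtocol (only the shape matters here)."""

    def load(self) -> dict: ...


@dataclass
class FakeParquetLoader:
    """Hypothetical stand-in for ParquetLoader; it only records its base path."""

    base_path: Path

    def load(self) -> dict:
        return {"base_path": str(self.base_path)}


def resolve_loader(
    data_path: str | Path | None = None,
    loader: Loader | None = None,
) -> Loader | None:
    """Pick the loader the way the parameter table describes."""
    if loader is not None:
        # An explicit, pre-configured loader always wins.
        return loader
    if data_path is not None:
        # Otherwise the data directory is wrapped in a (stand-in) Parquet loader.
        return FakeParquetLoader(Path(data_path))
    # Neither given: assume no loader, which run() rejects with ValueError.
    return None
```

For example, `resolve_loader(data_path="/data", loader=custom)` returns `custom` untouched, and `data_path` is ignored.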
PipelineOrchestrator
The main pipeline class. Note that it is named `PipelineOrchestrator`, not `RWAPipeline`:
`rwa_calc.engine.pipeline.PipelineOrchestrator`
Orchestrate the complete RWA calculation pipeline.
Implements `PipelineProtocol` over the Phase 4 fold orchestrator: the stage sequence is the literal `engine/registry.PIPELINE_STAGES` list, folded by `engine/orchestrator.run_stages`. This facade owns the run lifecycle (run_id, edge capture, FX-rate sync, error merge, audit persistence) and the per-run component wiring.
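To make the "fold" concrete: the stage order is plain data, and a context is threaded through it so that each stage's output becomes the next stage's input. The sketch below uses `functools.reduce`; `Context`, the two stages, `STAGES` and `fold_stages` are hypothetical stand-ins, not `PipelineContext`, `PIPELINE_STAGES` or `run_stages` themselves, and the choice of a frozen (immutable) context is an assumption of the sketch.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from functools import reduce
from typing import Callable


@dataclass(frozen=True)
class Context:
    """Hypothetical, minimal stand-in for PipelineContext."""

    exposures: tuple[float, ...]
    notes: tuple[str, ...] = ()


# A stage is any function from context to context.
Stage = Callable[[Context], Context]


def scale_exposures(ctx: Context) -> Context:
    """Hypothetical stage: doubles every exposure, returning a new context."""
    return replace(
        ctx,
        exposures=tuple(e * 2 for e in ctx.exposures),
        notes=ctx.notes + ("scaled",),
    )


def drop_zero(ctx: Context) -> Context:
    """Hypothetical stage: removes zero exposures, returning a new context."""
    return replace(
        ctx,
        exposures=tuple(e for e in ctx.exposures if e != 0),
        notes=ctx.notes + ("filtered",),
    )


# The stage order is a literal list, echoing the idea behind PIPELINE_STAGES.
STAGES: list[Stage] = [scale_exposures, drop_zero]


def fold_stages(ctx: Context, stages: list[Stage]) -> Context:
    """Left fold: feed the output of each stage into the next, in list order."""
    return reduce(lambda acc, stage: stage(acc), stages, ctx)
```

For example, `fold_stages(Context(exposures=(1.0, 0.0, 3.0)), STAGES).exposures` is `(2.0, 6.0)`. Keeping the sequence as a list means reordering or inserting a stage is a data change rather than a control-flow change.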
Components can be injected for testing or customisation. Defaults are built fresh on every run from the effective config and are never cached across runs, so a reused orchestrator that switches framework gets components built for the new framework.
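A minimal sketch of the "fresh defaults per run" rule, reduced to a single component for brevity. `Config`, `DefaultCalculator` and `Orchestrator` are hypothetical stand-ins; the only point illustrated is that the default is constructed inside `run()` from the config passed in, instead of being cached on the instance.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Config:
    """Hypothetical stand-in for CalculationConfig."""

    framework: str


class DefaultCalculator:
    """Hypothetical default component whose behaviour depends on the framework."""

    def __init__(self, config: Config) -> None:
        self.framework = config.framework


class Orchestrator:
    """Hypothetical facade: injected components win; defaults are rebuilt each run."""

    def __init__(self, calculator: DefaultCalculator | None = None) -> None:
        self._calculator = calculator  # None means "build a default on each run"

    def run(self, config: Config) -> str:
        # Never store the default on self: building it here means a reused
        # orchestrator picks up a framework switch in the next config.
        if self._calculator is not None:
            calculator = self._calculator
        else:
            calculator = DefaultCalculator(config)
        return calculator.framework
```

In this sketch an injected component is stored and reused as-is, so it does not follow a framework switch; whether the real orchestrator treats injected components the same way is an assumption.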
Usage:

```python
orchestrator = PipelineOrchestrator(loader=ParquetLoader(base_path))
result = orchestrator.run(config)
```
run(config)
Execute the complete RWA calculation pipeline.
Requires a loader to be configured.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `CalculationConfig` | Calculation configuration | required |

Returns:

| Type | Description |
|---|---|
| `AggregatedResultBundle` | `AggregatedResultBundle` with all results and audit trail |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If no loader is configured |
run_with_data(data, config, *, rulepack=None)
Execute pipeline with pre-loaded data.
Folds a PipelineContext through the literal stage registry; all approach-specific calculators run sequentially on one unified LazyFrame, avoiding plan tree duplication and mid-pipeline materialisation.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `RawDataBundle` | Pre-loaded raw data bundle | required |
| `config` | `CalculationConfig` | Calculation configuration | required |
| `rulepack` | `RulepackV0 \| None` | Pre-resolved rulepack used verbatim instead of … | `None` |

Returns:

| Type | Description |
|---|---|
| `AggregatedResultBundle` | `AggregatedResultBundle` with all results and audit trail |
Pipeline Implementation (`pipeline.py`, excerpt)

```python
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from rwa_calc.rulebook.audit import serialize_pack

if TYPE_CHECKING:
    import polars as pl

    from rwa_calc.contracts.bundles import AggregatedResultBundle, RawDataBundle
    from rwa_calc.contracts.config import CalculationConfig
    from rwa_calc.contracts.protocols import (
        ClassifierProtocol,
        CRMProcessorProtocol,
        EquityCalculatorProtocol,
        HierarchyResolverProtocol,
        IRBCalculatorProtocol,
        LoaderProtocol,
        OutputAggregatorProtocol,
        RealEstateSplitterProtocol,
        SACalculatorProtocol,
        SecuritisationAllocatorProtocol,
        SlottingCalculatorProtocol,
    )

logger = logging.getLogger(__name__)

__all__ = [
    "PipelineError",
    "PipelineOrchestrator",
    "create_pipeline",
    "create_test_pipeline",
]

# =============================================================================
# Pipeline Orchestrator Facade
# =============================================================================


class PipelineOrchestrator:
    """
    Orchestrate the complete RWA calculation pipeline.

    Implements PipelineProtocol over the Phase 4 fold orchestrator: the
    stage sequence is the literal ``engine/registry.PIPELINE_STAGES`` list,
    folded by ``engine/orchestrator.run_stages``. This facade owns the run
    lifecycle (run_id, edge capture, FX-rate sync, error merge, audit
    persistence) and the per-run component wiring.

    Components can be injected for testing or customisation; defaults are
    built fresh every run from the effective config (never cached across
    runs, so a framework switch on a reused orchestrator gets
    framework-fresh components).

    Usage:
        orchestrator = PipelineOrchestrator(loader=ParquetLoader(base_path))
        result = orchestrator.run(config)
    """

    def __init__(
        self,
        loader: LoaderProtocol | None = None,
        securitisation_allocator: SecuritisationAllocatorProtocol | None = None,
        hierarchy_resolver: HierarchyResolverProtocol | None = None,
        classifier: ClassifierProtocol | None = None,
        crm_processor: CRMProcessorProtocol | None = None,
        re_splitter: RealEstateSplitterProtocol | None = None,
        sa_calculator: SACalculatorProtocol | None = None,
        irb_calculator: IRBCalculatorProtocol | None = None,
        slotting_calculator: SlottingCalculatorProtocol | None = None,
        equity_calculator: EquityCalculatorProtocol | None = None,
        output_aggregator: OutputAggregatorProtocol | None = None,
    ) -> None:
        """
        Initialize the pipeline facade.

        Args:
            loader: Data loader (optional - required for run())
            securitisation_allocator: Securitisation pool allocator
            hierarchy_resolver: Hierarchy resolver
            classifier: Exposure classifier
            crm_processor: CRM processor
            re_splitter: Real estate loan-splitter (CRR Art. 125/126,
                B3.1 Art. 124F/H)
            sa_calculator: SA calculator
            irb_calculator: IRB calculator
            slotting_calculator: Slotting calculator
            equity_calculator: Equity calculator
            output_aggregator: Output aggregator
        """
        # Store injected components; None means "build a default per run".
        self._loader = loader
        self._securitisation_allocator = securitisation_allocator
        self._hierarchy_resolver = hierarchy_resolver
        self._classifier = classifier
        self._crm_processor = crm_processor
        self._re_splitter = re_splitter
        self._sa_calculator = sa_calculator
        self._irb_calculator = irb_calculator
        self._slotting_calculator = slotting_calculator
        self._equity_calculator = equity_calculator
        self._output_aggregator = output_aggregator

    # =========================================================================
    # Public API
    # ... (remaining methods not shown in this excerpt)
```
Usage Examples
Basic Usage

```python
from datetime import date
from pathlib import Path

from rwa_calc.contracts.config import CalculationConfig
from rwa_calc.engine.pipeline import create_pipeline

# Create pipeline; run() requires a loader, so point it at a data directory
pipeline = create_pipeline(data_path=Path("/path/to/data"))

# Configure for CRR
config = CalculationConfig.crr(
    reporting_date=date(2026, 12, 31),
)

# Run calculation
result = pipeline.run(config)

# Access results
print(f"Total RWA: {result.total_rwa:,.0f}")
print(f"SA RWA: {result.sa_rwa:,.0f}")
print(f"IRB RWA: {result.irb_rwa:,.0f}")
```
With Custom Data Path

```python
from pathlib import Path

from rwa_calc.engine.loader import ParquetLoader

# Load data up front from a custom path
# (`pipeline` and `config` as in Basic Usage)
loader = ParquetLoader(Path("/path/to/data"))
raw_data = loader.load()

# Run with pre-loaded data
result = pipeline.run_with_data(raw_data, config)
```
Framework Comparison

```python
# Run under both frameworks, reusing `pipeline` from Basic Usage
# (default components are rebuilt per run from each config)
config_crr = CalculationConfig.crr(date(2026, 12, 31))
config_b31 = CalculationConfig.basel_3_1(date(2027, 1, 1))

result_crr = pipeline.run(config_crr)
result_b31 = pipeline.run(config_b31)

# Compare
impact = (result_b31.total_rwa / result_crr.total_rwa - 1) * 100
print(f"Basel 3.1 impact: {impact:+.1f}%")
```
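The impact figure is a plain percentage change. A small helper (hypothetical, not part of `rwa_calc`) makes the formula reusable and guards against a zero CRR denominator, which the inline expression above would turn into a `ZeroDivisionError`.

```python
def rwa_impact_pct(rwa_crr: float, rwa_b31: float) -> float:
    """Percentage change in RWA from CRR to Basel 3.1: (B31 / CRR - 1) * 100."""
    if rwa_crr == 0:
        # The inline expression would raise ZeroDivisionError here; fail clearly instead.
        raise ValueError("CRR RWA is zero, so the percentage impact is undefined")
    return (rwa_b31 / rwa_crr - 1) * 100


# Worked example: 100 under CRR and 125 under Basel 3.1 is a 25% increase.
print(f"Basel 3.1 impact: {rwa_impact_pct(100.0, 125.0):+.1f}%")  # Basel 3.1 impact: +25.0%
```

With real results the call would be `rwa_impact_pct(result_crr.total_rwa, result_b31.total_rwa)`.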
Error Handling

```python
result = pipeline.run(config)

# Check for errors
if result.has_errors:
    print("Calculation completed with errors:")
    for error in result.errors:
        print(f" - {error.exposure_id}: {error.message}")

# Check for warnings
if result.has_warnings:
    print("Warnings:")
    for warning in result.warnings:
        print(f" - {warning.message}")
```
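When many exposures fail for the same reason, a per-message count can be easier to scan than the raw list. The sketch assumes only that each error exposes the `exposure_id` and `message` attributes read in the example above; `CalcError`, `summarise_errors` and the sample messages are hypothetical.

```python
from __future__ import annotations

from collections import Counter
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class CalcError:
    """Hypothetical stand-in with the two attributes read in the example above."""

    exposure_id: str
    message: str


def summarise_errors(errors: Iterable[CalcError]) -> list[tuple[str, int]]:
    """Count errors by message, most frequent first (ties keep first-seen order)."""
    return Counter(error.message for error in errors).most_common()


# Illustrative input only; real errors would come from result.errors.
sample = [
    CalcError("EXP-1", "missing PD"),
    CalcError("EXP-2", "missing PD"),
    CalcError("EXP-3", "negative EAD"),
]
for message, count in summarise_errors(sample):
    print(f"{count:>4}  {message}")
```

Because the function only reads `.message`, it should accept `result.errors` directly, assuming that attribute is an iterable of such objects.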