Data Validation Guide¶
This guide explains how data validation works in the RWA calculator, the complete set of validation functions, and how to troubleshoot data issues.
Source of truth: All validation utilities are in
src/rwa_calc/contracts/validation.py. Valid value constraints are defined in COLUMN_VALUE_CONSTRAINTS in src/rwa_calc/data/schemas.py.
Overview¶
The RWA calculator validates input data at multiple stages:
- Load-time validation — Schema checks when data is loaded
- Pipeline boundary validation — Checks at each processing stage
- Business rule validation — Domain-specific constraints (PD/LGD ranges, risk type codes)
- Column value validation — Categorical values against allowed sets
Validation is performed without materialising data where possible, using Polars LazyFrame
schema inspection for efficiency. Only column value validation requires .collect().
Schema Validation Functions¶
validate_schema()¶
Validates a LazyFrame's schema against an expected schema dictionary without materialising data.
from rwa_calc.contracts.validation import validate_schema
from rwa_calc.data.schemas import FACILITY_SCHEMA
import polars as pl
facilities = pl.scan_parquet("data/exposures/facilities.parquet")
errors = validate_schema(
    lf=facilities,
    expected_schema=FACILITY_SCHEMA,
    context="facilities",
    strict=False,  # Set True to flag unexpected extra columns
)
if errors:
    for error in errors:
        print(f" - {error}")
Parameters:
| Parameter | Type | Description |
|---|---|---|
| lf | pl.LazyFrame | LazyFrame to validate |
| expected_schema | dict[str, pl.DataType] | Expected column names and types |
| context | str | Label for error messages (e.g., "facilities") |
| strict | bool | If True, flags unexpected extra columns |
Returns: list[str] — plain string error messages (empty if valid).
validate_required_columns()¶
Checks that specific columns are present (without type checking).
from rwa_calc.contracts.validation import validate_required_columns
missing = validate_required_columns(
    lf=counterparties,
    required_columns=["counterparty_reference", "entity_type", "country_code"],
    context="counterparties",
)
Returns: list[str] — missing-column error messages.
validate_schema_to_errors()¶
Same logic as validate_schema() but returns structured CalculationError objects for
integration with the pipeline error accumulation pattern.
from rwa_calc.contracts.validation import validate_schema_to_errors
from rwa_calc.data.schemas import LOAN_SCHEMA
errors = validate_schema_to_errors(
    lf=loans,
    expected_schema=LOAN_SCHEMA,
    context="loans",
)
for error in errors:
    print(f"Code: {error.code}, Field: {error.field_name}")
    print(f"Expected: {error.expected_value}, Actual: {error.actual_value}")
Returns: list[CalculationError] — with category SCHEMA_VALIDATION, severity ERROR.
Bundle Validation Functions¶
These functions validate entire pipeline bundles at stage boundaries, checking that expected columns exist after each transformation.
validate_raw_data_bundle()¶
Validates all LazyFrames in a RawDataBundle against expected schemas.
from rwa_calc.contracts.validation import validate_raw_data_bundle
errors = validate_raw_data_bundle(bundle, schemas)
Validates up to 11 named frames: facilities, loans, contingents, counterparties,
collateral, guarantees, provisions, ratings, facility_mappings, org_mappings,
lending_mappings.
Returns: list[CalculationError]
validate_resolved_hierarchy_bundle()¶
Validates that hierarchy columns exist in a ResolvedHierarchyBundle.exposures LazyFrame.
from rwa_calc.contracts.validation import validate_resolved_hierarchy_bundle
hierarchy_columns = [
    "counterparty_has_parent", "parent_counterparty_reference",
    "ultimate_parent_reference", "counterparty_hierarchy_depth",
    "rating_inherited", "rating_source_counterparty",
]
errors = validate_resolved_hierarchy_bundle(bundle, hierarchy_columns)
Parameters:
| Parameter | Type | Description |
|---|---|---|
| bundle | ResolvedHierarchyBundle | Bundle to validate |
| expected_columns | list[str] | Hierarchy columns to check for |
Returns: list[CalculationError]
validate_classified_bundle()¶
Validates classification columns across all_exposures, sa_exposures, and
irb_exposures in a ClassifiedExposuresBundle.
from rwa_calc.contracts.validation import validate_classified_bundle
classification_columns = [
    "exposure_class", "approach_applied", "cqs", "pd", "is_sme",
]
errors = validate_classified_bundle(bundle, classification_columns)
Returns: list[CalculationError]
validate_crm_adjusted_bundle()¶
Validates CRM-related columns across exposures, sa_exposures, and irb_exposures
in a CRMAdjustedBundle.
from rwa_calc.contracts.validation import validate_crm_adjusted_bundle
crm_columns = [
    "ccf_applied", "gross_ead", "final_ead",
    "collateral_adjusted_value", "ead_after_collateral",
]
errors = validate_crm_adjusted_bundle(bundle, crm_columns)
Returns: list[CalculationError]
Business Rule Validators¶
These functions add boolean validation flag columns to LazyFrames without materialising data.
The flag columns follow the naming convention _valid_{column_name}.
validate_non_negative_amounts()¶
Adds validation flag columns for non-negative amount checks.
from rwa_calc.contracts.validation import validate_non_negative_amounts
validated = validate_non_negative_amounts(
    lf=loans,
    amount_columns=["drawn_amount", "limit"],
    context="loans",
)
# Adds _valid_drawn_amount and _valid_limit boolean columns
Returns: pl.LazyFrame — with added _valid_{col} flag columns.
validate_pd_range()¶
Validates that PD values are in [0, 1].
from rwa_calc.contracts.validation import validate_pd_range
validated = validate_pd_range(lf=ratings, pd_column="pd", min_pd=0.0, max_pd=1.0)
valid_ratings = validated.filter(pl.col("_valid_pd"))
Returns: pl.LazyFrame — with _valid_pd column.
validate_lgd_range()¶
Validates that LGD values are in [0, 1.25]. The upper bound exceeds 1.0 because LGD can legitimately exceed 100% in certain Basel scenarios.
from rwa_calc.contracts.validation import validate_lgd_range
validated = validate_lgd_range(lf=exposures, lgd_column="lgd", min_lgd=0.0, max_lgd=1.25)
Returns: pl.LazyFrame — with _valid_lgd column.
validate_ccf_modelled()¶
Validates that modelled CCF values are in [0.0, 1.5]. Null values are treated as valid since the field is optional. The 150% cap accommodates Retail IRB CCFs that can exceed 100% due to additional drawdown behaviour during stress.
from rwa_calc.contracts.validation import validate_ccf_modelled
validated = validate_ccf_modelled(lf=facilities, column="ccf_modelled")
# Adds _valid_ccf_modelled boolean column
Returns: pl.LazyFrame — with _valid_ccf_modelled column.
Risk-type validation lives in the data layer
Input risk_type values are validated by the bundle-level value validation
below (COLUMN_VALUE_CONSTRAINTS in data/schemas.py defines
VALID_RISK_TYPES_INPUT and RISK_TYPE_SYNONYMS), and short codes are
normalised inside the CCF lookup (engine/ccf.py::_normalize_risk_type,
using RISK_TYPE_SYNONYMS from data/schemas.py). The former
standalone validate_risk_type() / normalize_risk_type() helpers were
dead code and have been removed.
Column Value Validation¶
These functions check actual data values against allowed sets. They are the only
validation functions that materialise data (call .collect()).
validate_column_values()¶
Validates that all non-null values in a column belong to a set of allowed values. Performs case-insensitive comparison. Groups invalid values by distinct value with counts.
from rwa_calc.contracts.validation import validate_column_values
from rwa_calc.data.schemas import VALID_ENTITY_TYPES
errors = validate_column_values(
    lf=counterparties,
    column="entity_type",
    valid_values=VALID_ENTITY_TYPES,
    context="counterparties",
)
for error in errors:
    print(f"Invalid value '{error.actual_value}' found {error.message}")
Returns: list[CalculationError] — with code ERROR_INVALID_COLUMN_VALUE,
severity WARNING, category DATA_QUALITY.
validate_bundle_values()¶
Validates all categorical column values across an entire RawDataBundle in one call.
Uses the COLUMN_VALUE_CONSTRAINTS registry from data/schemas.py by default.
from rwa_calc.contracts.validation import validate_bundle_values
# Using default constraints from COLUMN_VALUE_CONSTRAINTS
errors = validate_bundle_values(bundle)
# Or with custom constraints
custom_constraints = {
    "counterparties": {"entity_type": {"corporate", "institution"}},
}
errors = validate_bundle_values(bundle, constraints=custom_constraints)
The function validates these tables (when present in the bundle):
| Table | Validated Columns |
|---|---|
| facilities | seniority |
| loans | seniority |
| contingents | seniority, bs_type |
| counterparties | entity_type |
| collateral | collateral_type, property_type, issuer_type, valuation_type, beneficiary_type |
| provisions | provision_type, beneficiary_type |
| ratings | rating_type |
| specialised_lending | sl_type, slotting_category |
| equity_exposures | equity_type |
| guarantees | beneficiary_type |
| facility_mappings | child_type |
Performance: Internally uses _validate_table_columns_batched() which checks
multiple columns per table in a single .collect() call.
Returns: list[CalculationError]
Type Compatibility¶
The validator allows some type flexibility:
| Expected Type | Allowed Actual Types |
|---|---|
| Int64 | Int8, Int16, Int32, Int64 |
| Float64 | Float32, Float64 |
| String | Utf8, String |
This means if your file has Int32 but the schema expects Int64, validation will pass.
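The table can be read as a simple lookup. The sketch below encodes it by dtype name; the names COMPATIBLE_TYPES and is_compatible are hypothetical, and the rule that unlisted types must match exactly is an assumption rather than documented behaviour.

```python
# Hypothetical compatibility table keyed by dtype name, mirroring the table above.
COMPATIBLE_TYPES: dict[str, set[str]] = {
    "Int64": {"Int8", "Int16", "Int32", "Int64"},
    "Float64": {"Float32", "Float64"},
    "String": {"Utf8", "String"},
}


def is_compatible(expected: str, actual: str) -> bool:
    """Return True if `actual` is acceptable where `expected` is required."""
    # Assumption: types without an entry in the table must match exactly.
    return actual in COMPATIBLE_TYPES.get(expected, {expected})


print(is_compatible("Int64", "Int32"))
print(is_compatible("Int64", "Float64"))
```

Note that the flexibility runs in one direction only: a narrower integer satisfies Int64, but a Float64 column does not satisfy an Int64 schema entry.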
Validation in the Pipeline¶
The pipeline validates data at stage boundaries:
Load → [validate_raw_data_bundle] → Hierarchy → [validate_resolved_hierarchy_bundle]
→ Classify → [validate_classified_bundle] → CRM → [validate_crm_adjusted_bundle] → ...
If validation fails, the pipeline:
- Accumulates errors — Does not fail immediately
- Continues where possible — Processes valid records
- Reports all issues — Returns complete error list in the result bundle
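The accumulation behaviour can be sketched in pure Python. This is an illustration of the pattern only: PipelineResult, run_with_accumulation, and the two sample stages are hypothetical and do not correspond to the real bundle or stage classes.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional

Record = dict[str, float]
Check = Callable[[Record], Optional[str]]  # returns an error message, or None if valid


@dataclass
class PipelineResult:
    """Hypothetical result bundle: surviving records plus every error seen."""

    records: list[Record]
    errors: list[str] = field(default_factory=list)


def run_with_accumulation(
    records: list[Record], stages: list[tuple[str, Check]]
) -> PipelineResult:
    errors: list[str] = []
    for stage_name, check in stages:
        survivors = []
        for record in records:
            problem = check(record)
            if problem is None:
                survivors.append(record)  # valid records continue to the next stage
            else:
                errors.append(f"{stage_name}: {problem}")  # record the issue, keep going
        records = survivors
    return PipelineResult(records=records, errors=errors)


stages = [
    ("load", lambda r: None if "pd" in r else "missing pd"),
    ("classify", lambda r: None if 0.0 <= r["pd"] <= 1.0 else f"pd out of range: {r['pd']}"),
]
result = run_with_accumulation([{"pd": 0.02}, {"lgd": 0.45}, {"pd": 1.4}], stages)
print(result.records)
print(result.errors)
```

The caller receives both the processed records and the full list of issues in one object, so a single run surfaces every problem instead of stopping at the first.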
Common Validation Issues¶
1. Missing Column¶
Fix: Add the missing column with a default value:
2. Type Mismatch¶
Fix: Cast the column to the correct type:
3. Invalid Categorical Values¶
Fix: Map invalid values to valid ones:
counterparties = counterparties.with_columns(
    pl.col("entity_type").str.to_lowercase().replace({"corp": "corporate"})
)
4. Date Format Issues¶
Fix: Parse dates from strings:
5. Invalid PD/LGD Values¶
Fix: Clip values to valid ranges:
Debugging Tips¶
Inspect Schema Before Validation¶
import polars as pl
lf = pl.scan_parquet("data/facilities.parquet")
print("Actual schema:")
for name, dtype in lf.collect_schema().items():
    print(f" {name}: {dtype}")
Compare Expected vs Actual¶
from rwa_calc.data.schemas import FACILITY_SCHEMA
expected_cols = set(FACILITY_SCHEMA.keys())
actual_cols = set(lf.collect_schema().names())
print(f"Missing columns: {expected_cols - actual_cols}")
print(f"Extra columns: {actual_cols - expected_cols}")
Check Value Distributions¶
pd_stats = ratings.select([
    pl.col("pd").min().alias("min"),
    pl.col("pd").max().alias("max"),
    pl.col("pd").mean().alias("mean"),
    pl.col("pd").null_count().alias("nulls"),
]).collect()
print(pd_stats)
Next Steps¶
- Input Schemas — Complete schema definitions
- Data Flow — How data moves through the pipeline
- Error Handling — Error types and handling