# Copyright 2025 Michael Anckaert
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Pre-flight validation system for ETL pipeline configurations.
This module provides comprehensive validation for multi-pipeline configurations,
including connectivity testing, resource conflict detection, and configuration
consistency checks.
"""
import logging
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass, field
from extral.config import Config, PipelineConfig, DatabaseConfig, FileConfig
from extral.exceptions import ValidationException, ConnectionException
from extral.connectors.database.mysql import MySQLConnector
from extral.connectors.database.postgresql import PostgreSQLConnector
from extral.connectors.file.csv_connector import CSVConnector
from extral.connectors.file.json_connector import JSONConnector
logger = logging.getLogger(__name__)
[docs]
@dataclass
class ValidationResult:
"""Result of a validation operation."""
is_valid: bool
error_message: Optional[str] = None
warnings: List[str] = field(default_factory=list)
details: Dict[str, Any] = field(default_factory=dict)
[docs]
@dataclass
class PipelineValidationResult:
"""Validation result for a single pipeline."""
pipeline_name: str
connectivity_result: ValidationResult
resource_result: ValidationResult
config_result: ValidationResult
overall_valid: bool = False
[docs]
def __post_init__(self):
"""Calculate overall validity after initialization."""
self.overall_valid = (
self.connectivity_result.is_valid and
self.resource_result.is_valid and
self.config_result.is_valid
)
[docs]
@dataclass
class ValidationReport:
"""Comprehensive validation report for all pipelines."""
overall_valid: bool = False
pipeline_results: Dict[str, PipelineValidationResult] = field(default_factory=dict)
global_conflicts: ValidationResult = field(default_factory=lambda: ValidationResult(is_valid=True))
summary: Dict[str, Any] = field(default_factory=dict)
[docs]
def __post_init__(self):
"""Calculate overall validity and summary after initialization."""
self._update_summary()
def _update_summary(self):
"""Update the summary and overall validity based on current state."""
if self.pipeline_results:
valid_pipelines = sum(1 for result in self.pipeline_results.values() if result.overall_valid)
total_pipelines = len(self.pipeline_results)
self.overall_valid = (
self.global_conflicts.is_valid and
valid_pipelines == total_pipelines and
total_pipelines > 0
)
self.summary = {
"total_pipelines": total_pipelines,
"valid_pipelines": valid_pipelines,
"failed_pipelines": total_pipelines - valid_pipelines,
"has_global_conflicts": not self.global_conflicts.is_valid,
}
else:
self.overall_valid = False
self.summary = {
"total_pipelines": 0,
"valid_pipelines": 0,
"failed_pipelines": 0,
"has_global_conflicts": not self.global_conflicts.is_valid,
}
[docs]
class PipelineValidator:
"""Comprehensive pipeline validation system."""
def __init__(self):
"""Initialize the validator."""
self.logger = logging.getLogger(__name__)
[docs]
def validate_configuration(self, config: Config) -> ValidationReport:
"""
Perform comprehensive validation of the entire configuration.
Args:
config: Configuration object to validate
Returns:
ValidationReport with detailed results
"""
report = ValidationReport()
try:
# Validate global resource conflicts first
report.global_conflicts = self._validate_global_resource_conflicts(config.pipelines)
# Validate each pipeline individually
for pipeline in config.pipelines:
pipeline_result = self._validate_pipeline(pipeline)
report.pipeline_results[pipeline.name] = pipeline_result
# Update the summary and overall validity after populating all results
report._update_summary()
logger.info(f"Configuration validation completed. Overall valid: {report.overall_valid}")
except Exception as e:
logger.error(f"Validation failed with unexpected error: {str(e)}")
report.overall_valid = False
report.global_conflicts = ValidationResult(
is_valid=False,
error_message=f"Validation system error: {str(e)}",
details={"exception_type": type(e).__name__}
)
return report
def _validate_pipeline(self, pipeline: PipelineConfig) -> PipelineValidationResult:
"""
Validate a single pipeline configuration.
Args:
pipeline: Pipeline configuration to validate
Returns:
PipelineValidationResult with validation details
"""
logger.debug(f"Validating pipeline: {pipeline.name}")
# Test connectivity
connectivity_result = self._test_pipeline_connectivity(pipeline)
# Validate resources
resource_result = self._validate_pipeline_resources(pipeline)
# Validate configuration consistency
config_result = self._validate_pipeline_configuration(pipeline)
return PipelineValidationResult(
pipeline_name=pipeline.name,
connectivity_result=connectivity_result,
resource_result=resource_result,
config_result=config_result
)
def _test_pipeline_connectivity(self, pipeline: PipelineConfig) -> ValidationResult:
"""
Test connectivity for all components in a pipeline.
Args:
pipeline: Pipeline to test
Returns:
ValidationResult with connectivity test results
"""
try:
warnings: List[str] = []
details: Dict[str, Any] = {}
# Test source connectivity
source_connector = self._create_connector(pipeline.source)
try:
source_connector.test_connection()
details["source_connectivity"] = "success"
logger.debug(f"Source connectivity test passed for pipeline: {pipeline.name}")
except ConnectionException as e:
details["source_connectivity"] = "failed"
details["source_error"] = str(e)
return ValidationResult(
is_valid=False,
error_message=f"Source connectivity failed: {str(e)}",
details=details
)
# Test destination connectivity
dest_connector = self._create_connector(pipeline.destination)
try:
dest_connector.test_connection()
details["destination_connectivity"] = "success"
logger.debug(f"Destination connectivity test passed for pipeline: {pipeline.name}")
except ConnectionException as e:
details["destination_connectivity"] = "failed"
details["destination_error"] = str(e)
return ValidationResult(
is_valid=False,
error_message=f"Destination connectivity failed: {str(e)}",
details=details
)
return ValidationResult(is_valid=True, warnings=warnings, details=details)
except Exception as e:
logger.error(f"Connectivity test failed for pipeline {pipeline.name}: {str(e)}")
return ValidationResult(
is_valid=False,
error_message=f"Connectivity test error: {str(e)}",
details={"exception_type": type(e).__name__}
)
def _validate_pipeline_resources(self, pipeline: PipelineConfig) -> ValidationResult:
"""
Validate pipeline resource configuration (tables, files).
Args:
pipeline: Pipeline to validate
Returns:
ValidationResult with resource validation results
"""
try:
warnings: List[str] = []
details: Dict[str, Any] = {}
# Validate source resources
if isinstance(pipeline.source, DatabaseConfig):
if not pipeline.source.tables:
return ValidationResult(
is_valid=False,
error_message="Database source has no tables configured",
details={"source_type": "database"}
)
# Check for duplicate table names
table_names = [table.name for table in pipeline.source.tables]
if len(table_names) != len(set(table_names)):
duplicates = [name for name in set(table_names) if table_names.count(name) > 1]
return ValidationResult(
is_valid=False,
error_message=f"Duplicate table names found: {duplicates}",
details={"source_type": "database", "duplicates": duplicates}
)
details["source_tables"] = len(table_names)
elif isinstance(pipeline.source, FileConfig):
if not pipeline.source.files:
return ValidationResult(
is_valid=False,
error_message="File source has no files configured",
details={"source_type": "file"}
)
# Check for duplicate file names/paths
file_paths = []
for file_item in pipeline.source.files:
if file_item.file_path:
file_paths.append(file_item.file_path)
elif file_item.http_path:
file_paths.append(file_item.http_path)
if len(file_paths) != len(set(file_paths)):
duplicates = [path for path in set(file_paths) if file_paths.count(path) > 1]
return ValidationResult(
is_valid=False,
error_message=f"Duplicate file paths found: {duplicates}",
details={"source_type": "file", "duplicates": duplicates}
)
details["source_files"] = len(file_paths)
return ValidationResult(is_valid=True, warnings=warnings, details=details)
except Exception as e:
logger.error(f"Resource validation failed for pipeline {pipeline.name}: {str(e)}")
return ValidationResult(
is_valid=False,
error_message=f"Resource validation error: {str(e)}",
details={"exception_type": type(e).__name__}
)
def _validate_pipeline_configuration(self, pipeline: PipelineConfig) -> ValidationResult:
"""
Validate pipeline configuration consistency.
Args:
pipeline: Pipeline to validate
Returns:
ValidationResult with configuration validation results
"""
try:
warnings: List[str] = []
details: Dict[str, Any] = {}
# Validate worker count
if pipeline.workers is not None and pipeline.workers <= 0:
return ValidationResult(
is_valid=False,
error_message=f"Invalid worker count: {pipeline.workers}. Must be positive.",
details={"workers": pipeline.workers}
)
# Validate source-specific configuration
if isinstance(pipeline.source, DatabaseConfig):
# Validate table configurations
for table in pipeline.source.tables:
if table.strategy.value == "merge" and not table.merge_key:
warnings.append(f"Table {table.name} uses merge strategy but no merge_key specified")
if table.batch_size is not None and table.batch_size <= 0:
return ValidationResult(
is_valid=False,
error_message=f"Invalid batch size for table {table.name}: {table.batch_size}",
details={"table": table.name, "batch_size": table.batch_size}
)
elif isinstance(pipeline.source, FileConfig):
# Validate file configurations
for file_item in pipeline.source.files:
if file_item.strategy.value == "merge" and not file_item.merge_key:
file_name = file_item.file_path or file_item.http_path or "unknown"
warnings.append(f"File {file_name} uses merge strategy but no merge_key specified")
if file_item.batch_size is not None and file_item.batch_size <= 0:
file_name = file_item.file_path or file_item.http_path or "unknown"
return ValidationResult(
is_valid=False,
error_message=f"Invalid batch size for file {file_name}: {file_item.batch_size}",
details={"file": file_name, "batch_size": file_item.batch_size}
)
return ValidationResult(is_valid=True, warnings=warnings, details=details)
except Exception as e:
logger.error(f"Configuration validation failed for pipeline {pipeline.name}: {str(e)}")
return ValidationResult(
is_valid=False,
error_message=f"Configuration validation error: {str(e)}",
details={"exception_type": type(e).__name__}
)
def _validate_global_resource_conflicts(self, pipelines: List[PipelineConfig]) -> ValidationResult:
"""
Check for resource conflicts between pipelines.
Args:
pipelines: List of all pipelines to check for conflicts
Returns:
ValidationResult indicating if there are global conflicts
"""
try:
warnings: List[str] = []
details: Dict[str, Any] = {}
# Track database resources (host:port:database:schema:table)
db_resources: Dict[str, List[str]] = {}
# Track file resources (file paths)
file_resources: Dict[str, List[str]] = {}
for pipeline in pipelines:
# Check database destination conflicts
if isinstance(pipeline.destination, DatabaseConfig):
dest_key = f"{pipeline.destination.host}:{pipeline.destination.port}:{pipeline.destination.database}"
schema = getattr(pipeline.destination, 'schema', 'public')
# For database sources, check table-level conflicts
if isinstance(pipeline.source, DatabaseConfig):
for table in pipeline.source.tables:
table_key = f"{dest_key}:{schema}:{table.name}"
if table_key not in db_resources:
db_resources[table_key] = []
db_resources[table_key].append(pipeline.name)
# For file sources, use the destination database as the resource key
else:
if dest_key not in db_resources:
db_resources[dest_key] = []
db_resources[dest_key].append(pipeline.name)
# Check file destination conflicts
elif isinstance(pipeline.destination, FileConfig):
# File destinations could potentially conflict
for file_item in pipeline.destination.files:
dest_path = None
if file_item.file_path:
dest_path = file_item.file_path
elif file_item.http_path:
dest_path = file_item.http_path
if dest_path:
if dest_path not in file_resources:
file_resources[dest_path] = []
file_resources[dest_path].append(pipeline.name)
# Check for conflicts
conflicts = []
# Database conflicts
for resource, pipelines_using in db_resources.items():
if len(pipelines_using) > 1:
conflicts.append(f"Database resource '{resource}' used by pipelines: {', '.join(pipelines_using)}")
# File conflicts
for resource, pipelines_using in file_resources.items():
if len(pipelines_using) > 1:
conflicts.append(f"File resource '{resource}' used by pipelines: {', '.join(pipelines_using)}")
if conflicts:
return ValidationResult(
is_valid=False,
error_message="Resource conflicts detected between pipelines",
details={"conflicts": conflicts}
)
# Add summary details
details["database_resources_checked"] = len(db_resources)
details["file_resources_checked"] = len(file_resources)
return ValidationResult(is_valid=True, warnings=warnings, details=details)
except Exception as e:
logger.error(f"Global resource conflict validation failed: {str(e)}")
return ValidationResult(
is_valid=False,
error_message=f"Global validation error: {str(e)}",
details={"exception_type": type(e).__name__}
)
def _create_connector(self, config: Union[DatabaseConfig, FileConfig]):
"""
Create appropriate connector instance for testing connectivity.
Args:
config: Database or file configuration
Returns:
Connector instance for testing
Raises:
ValidationException: If connector type is not supported
"""
if isinstance(config, DatabaseConfig):
if config.type.lower() == "mysql":
mysql_connector = MySQLConnector()
mysql_connector.connect(config)
return mysql_connector
elif config.type.lower() == "postgresql":
pg_connector = PostgreSQLConnector()
pg_connector.connect(config)
return pg_connector
else:
raise ValidationException(
f"Unsupported database type for connectivity testing: {config.type}",
operation="create_connector",
details={"database_type": config.type}
)
elif isinstance(config, FileConfig):
# For file connectors, we need to create a connector with a file item
# We'll use the first file item for testing purposes
if not config.files:
raise ValidationException(
"No files configured for file connector testing",
operation="create_connector"
)
file_item = config.files[0]
if config.type.lower() == "csv":
return CSVConnector(file_item)
elif config.type.lower() == "json":
return JSONConnector(file_item)
else:
raise ValidationException(
f"Unsupported file type for connectivity testing: {config.type}",
operation="create_connector",
details={"file_type": config.type}
)
else:
raise ValidationException(
f"Unsupported config type for connectivity testing: {type(config)}",
operation="create_connector",
details={"config_type": str(type(config))}
)