# Source code for extral.config

# Copyright 2025 Michael Anckaert
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from enum import Enum
from typing import Any, Dict, Optional, Union
from dataclasses import dataclass, field

import yaml

logger = logging.getLogger(__name__)


class LoadStrategy(Enum):
    """Load strategies that a configuration may select.

    Each member's value is the lowercase string used to look the member up,
    e.g. ``LoadStrategy("append")``.
    """

    APPEND = "append"
    REPLACE = "replace"
    MERGE = "merge"
class ReplaceMethod(Enum):
    """Replace methods that a configuration may select.

    Each member's value is the lowercase string used to look the member up,
    e.g. ``ReplaceMethod("truncate")``.
    """

    TRUNCATE = "truncate"
    RECREATE = "recreate"
@dataclass
class ExtractConfig:
    """Settings for a data extraction operation.

    Every field is optional and defaults to ``None``.
    """

    extract_type: Optional[str] = None
    incremental_field: Optional[str] = None
    last_value: Optional[Union[str, int]] = None
    batch_size: Optional[int] = None

    def to_dict(self) -> dict[str, Optional[Union[str, int]]]:
        """Return the settings as a plain dictionary.

        Provided for backward compatibility with dict-based callers.

        Returns:
            A dictionary with the keys ``extract_type``, ``incremental_field``,
            ``last_value`` and ``batch_size``, in that order.
        """
        exported_keys = (
            "extract_type",
            "incremental_field",
            "last_value",
            "batch_size",
        )
        return {key: getattr(self, key) for key in exported_keys}
@dataclass
class LoadConfig:
    """Settings for a data loading operation.

    With no arguments the strategy is ``LoadStrategy.REPLACE`` and the replace
    method is ``ReplaceMethod.RECREATE``; ``merge_key`` and ``batch_size`` are
    unset (``None``).
    """

    strategy: LoadStrategy = LoadStrategy.REPLACE
    replace_method: ReplaceMethod = ReplaceMethod.RECREATE
    merge_key: Optional[str] = None
    batch_size: Optional[int] = None
@dataclass
class LoggingConfig:
    """Logging settings; ``level`` defaults to ``"INFO"``."""

    level: str = "INFO"

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "LoggingConfig":
        """Build a LoggingConfig from a dictionary.

        Args:
            data: Dictionary that may contain a ``"level"`` entry.

        Returns:
            A new instance whose ``level`` is ``data["level"]`` when present,
            otherwise ``"INFO"``.
        """
        configured_level = data.get("level", "INFO")
        return cls(level=configured_level)
@dataclass
class ProcessingConfig:
    """Processing settings; ``workers`` defaults to 4."""

    workers: int = 4

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ProcessingConfig":
        """Build a ProcessingConfig from a dictionary.

        Args:
            data: Dictionary that may contain a ``"workers"`` entry.

        Returns:
            A new instance whose ``workers`` is ``data["workers"]`` when
            present, otherwise 4.
        """
        worker_count = data.get("workers", 4)
        return cls(workers=worker_count)
@dataclass
class DatabaseConfig:
    """Connection settings for a database connector.

    ``tables`` holds the parsed table configurations and is empty when the
    source dictionary lists none.
    """

    type: str
    host: str
    port: int
    user: str
    password: str
    database: str
    schema: Optional[str] = None
    charset: str = "utf8mb4"
    tables: list["TableConfig"] = field(default_factory=list)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "DatabaseConfig":
        """Build a DatabaseConfig from a dictionary.

        Args:
            data: Dictionary with the required keys ``type``, ``host``,
                ``user``, ``password`` and ``database``, and the optional keys
                ``port``, ``schema``, ``charset`` and ``tables``.

        Returns:
            A new instance. When ``port`` is absent it falls back to 3306 for
            type ``"mysql"`` and to 5432 for every other type.

        Raises:
            KeyError: If a required key is missing.
        """
        # A "tables:" key with no entries is loaded from YAML as None; treat
        # it like an absent key instead of failing while iterating over None.
        raw_tables = data.get("tables") or []
        tables = [TableConfig.from_dict(table_data) for table_data in raw_tables]

        db_type = data["type"]
        fallback_port = 3306 if db_type == "mysql" else 5432

        return cls(
            type=db_type,
            host=data["host"],
            port=data.get("port", fallback_port),
            user=data["user"],
            password=data["password"],
            database=data["database"],
            schema=data.get("schema"),
            charset=data.get("charset", "utf8mb4"),
            tables=tables,
        )
@dataclass
class FileItemConfig:
    """Settings for one file item.

    Exactly one of ``file_path`` and ``http_path`` must be set; this is
    enforced when the instance is created.
    """

    name: str  # Logical name for the file (like table name)
    format: str  # "csv", "json"
    file_path: Optional[str] = None
    http_path: Optional[str] = None
    options: Dict[str, Any] = field(default_factory=dict)
    strategy: LoadStrategy = LoadStrategy.REPLACE
    merge_key: Optional[str] = None
    batch_size: Optional[int] = None

    def __post_init__(self):
        """Check that exactly one location (file or HTTP) was supplied.

        Raises:
            ValueError: If neither or both of ``file_path`` and ``http_path``
                are set.
        """
        has_file = bool(self.file_path)
        has_http = bool(self.http_path)
        if not (has_file or has_http):
            raise ValueError("Either file_path or http_path must be provided")
        if has_file and has_http:
            raise ValueError("Cannot specify both file_path and http_path")

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "FileItemConfig":
        """Build a FileItemConfig from a dictionary.

        Args:
            data: Dictionary with the required key ``name`` and the optional
                keys ``format``, ``file_path``, ``http_path``, ``options``,
                ``strategy``, ``merge_key`` and ``batch_size``.

        Returns:
            A new instance. ``format`` defaults to ``"csv"``, ``strategy`` to
            ``"replace"`` and ``options`` to an empty dictionary.

        Raises:
            ValueError: If ``strategy`` is not a valid LoadStrategy value, or
                if the location check in ``__post_init__`` fails.
            KeyError: If ``name`` is missing.
        """
        # Resolve the strategy first so an invalid value is reported before
        # any other key is read.
        load_strategy = LoadStrategy(data.get("strategy", "replace"))
        file_format = data.get("format", "csv")  # Default to CSV if not specified
        return cls(
            name=data["name"],
            format=file_format,
            file_path=data.get("file_path"),
            http_path=data.get("http_path"),
            options=data.get("options", {}),
            strategy=load_strategy,
            merge_key=data.get("merge_key"),
            batch_size=data.get("batch_size"),
        )
@dataclass
class FileConfig:
    """Settings for a file connector: a type tag plus its file items."""

    type: str  # "file"
    files: list[FileItemConfig] = field(default_factory=list)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "FileConfig":
        """Build a FileConfig from a dictionary.

        Args:
            data: Dictionary with the required keys ``files`` (a sequence of
                file item dictionaries) and ``type``.

        Returns:
            A new instance with each entry of ``files`` parsed by
            ``FileItemConfig.from_dict``.

        Raises:
            ValueError: If the ``files`` key is absent.
            KeyError: If the ``type`` key is absent.
        """
        if "files" not in data:
            raise ValueError("'files' configuration is required for file sources")

        parsed_items = list(map(FileItemConfig.from_dict, data["files"]))
        return cls(type=data["type"], files=parsed_items)
@dataclass
class IncrementalConfig:
    """Settings for incremental extraction.

    ``field`` and ``type`` are required; ``initial_value`` is optional.
    """

    field: str
    type: str
    initial_value: Optional[str] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "IncrementalConfig":
        """Build an IncrementalConfig from a dictionary.

        Args:
            data: Dictionary with the required keys ``field`` and ``type`` and
                the optional key ``initial_value``.

        Returns:
            A new instance; ``initial_value`` is ``None`` when absent.

        Raises:
            KeyError: If ``field`` or ``type`` is missing.
        """
        field_name = data["field"]
        field_type = data["type"]
        start_value = data.get("initial_value")
        return cls(field=field_name, type=field_type, initial_value=start_value)
@dataclass
class ReplaceConfig:
    """Settings for the replace strategy; ``how`` defaults to RECREATE."""

    how: ReplaceMethod = ReplaceMethod.RECREATE

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ReplaceConfig":
        """Build a ReplaceConfig from a dictionary.

        Args:
            data: Dictionary that may contain a ``"how"`` entry holding a
                ReplaceMethod value string.

        Returns:
            A new instance; ``how`` falls back to ``"recreate"`` when absent.

        Raises:
            ValueError: If ``how`` is not a valid ReplaceMethod value.
        """
        method = ReplaceMethod(data.get("how", "recreate"))
        return cls(how=method)
@dataclass
class TableConfig:
    """Settings for processing one table.

    Only ``name`` is required; ``strategy`` defaults to
    ``LoadStrategy.REPLACE`` and the remaining fields default to ``None``.
    """

    name: str
    strategy: LoadStrategy = LoadStrategy.REPLACE
    merge_key: Optional[str] = None
    batch_size: Optional[int] = None
    incremental: Optional[IncrementalConfig] = None
    replace: Optional[ReplaceConfig] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "TableConfig":
        """Build a TableConfig from a dictionary.

        Args:
            data: Dictionary with the required key ``name`` and the optional
                keys ``strategy``, ``merge_key``, ``batch_size``,
                ``incremental`` and ``replace``.

        Returns:
            A new instance. The ``incremental`` and ``replace`` sections are
            parsed only when present and non-empty; otherwise the matching
            attribute is ``None``.

        Raises:
            ValueError: If ``strategy`` is not a valid LoadStrategy value.
            KeyError: If ``name`` is missing.
        """
        load_strategy = LoadStrategy(data.get("strategy", "replace"))

        incremental_section = data.get("incremental")
        if incremental_section:
            incremental_cfg = IncrementalConfig.from_dict(incremental_section)
        else:
            incremental_cfg = None

        replace_section = data.get("replace")
        if replace_section:
            replace_cfg = ReplaceConfig.from_dict(replace_section)
        else:
            replace_cfg = None

        return cls(
            name=data["name"],
            strategy=load_strategy,
            merge_key=data.get("merge_key"),
            batch_size=data.get("batch_size"),
            incremental=incremental_cfg,
            replace=replace_cfg,
        )
# Type alias for a connector configuration: either a database connector or a
# file connector.
ConnectorConfig = Union[DatabaseConfig, FileConfig]
@dataclass
class PipelineConfig:
    """Configuration for a single pipeline: a named source/destination pair."""

    name: str
    source: ConnectorConfig
    destination: ConnectorConfig
    workers: Optional[int] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "PipelineConfig":
        """Build a PipelineConfig from a dictionary.

        Args:
            data: Dictionary with the required sections ``source`` and
                ``destination`` and the optional keys ``name`` (defaults to
                ``"default"``) and ``workers``.

        Returns:
            A new instance with both connector sections parsed.

        Raises:
            ValueError: If ``source`` or ``destination`` is missing, empty or
                not a dictionary.
            KeyError: If a connector section lacks a key required by its
                connector type.
        """
        name = data.get("name", "default")
        source_config = cls._parse_connector(data.get("source"), name, "source")
        destination_config = cls._parse_connector(
            data.get("destination"), name, "destination"
        )
        return cls(
            name=name,
            source=source_config,
            destination=destination_config,
            workers=data.get("workers"),
        )

    @staticmethod
    def _parse_connector(
        connector_data: Any, pipeline_name: str, role: str
    ) -> ConnectorConfig:
        """Parse one connector section, dispatching on its ``type`` entry.

        Args:
            connector_data: Raw section value taken from the pipeline dictionary.
            pipeline_name: Pipeline name, used only in the error message.
            role: ``"source"`` or ``"destination"``, used only in the error
                message.

        Returns:
            A FileConfig when ``type`` is ``"file"``, ``"csv"`` or ``"json"``;
            otherwise a DatabaseConfig.

        Raises:
            ValueError: If the section is missing, empty or not a dictionary.
        """
        # Fail with a clear message here; otherwise a missing or null section
        # surfaces later as an opaque KeyError('type') or AttributeError.
        if not isinstance(connector_data, dict) or not connector_data:
            raise ValueError(
                f"Pipeline '{pipeline_name}' must have a {role} configuration"
            )

        if connector_data.get("type", "") in ("file", "csv", "json"):
            return FileConfig.from_dict(connector_data)
        return DatabaseConfig.from_dict(connector_data)
@dataclass
class Config:
    """Main configuration object.

    Holds the logging settings, the processing settings and the list of
    pipelines. The pipelines are validated whenever an instance is created.
    """

    logging: LoggingConfig = field(default_factory=LoggingConfig)
    processing: ProcessingConfig = field(default_factory=ProcessingConfig)
    pipelines: list[PipelineConfig] = field(default_factory=list)

    def __post_init__(self):
        """Validate the pipelines after initialization.

        Raises:
            ValueError: If the pipelines fail ``_validate_pipelines``.
        """
        self._validate_pipelines(self.pipelines)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Config":
        """Build a Config from a dictionary.

        Args:
            data: Dictionary with the required key ``pipelines`` and the
                optional sections ``logging`` and ``processing``.

        Returns:
            A new, validated instance.

        Raises:
            ValueError: If ``data`` is not a dictionary, the ``pipelines`` key
                is absent, or pipeline validation fails.
        """
        if not isinstance(data, dict):
            raise ValueError("Configuration must be a mapping of settings")

        # A section key with no content is loaded from YAML as None; "or {}"
        # makes it behave like an absent section.
        logging_config = LoggingConfig.from_dict(data.get("logging") or {})
        processing_config = ProcessingConfig.from_dict(data.get("processing") or {})

        if "pipelines" not in data:
            raise ValueError("'pipelines' configuration is required")

        # "pipelines:" with no entries yields None; treat it as an empty list
        # so validation reports that at least one pipeline is needed.
        pipelines = [
            PipelineConfig.from_dict(pipeline_data)
            for pipeline_data in data["pipelines"] or []
        ]

        # No explicit validation call here: constructing the instance runs
        # __post_init__, which validates the pipelines.
        return cls(
            logging=logging_config, processing=processing_config, pipelines=pipelines
        )

    @classmethod
    def _validate_pipelines(cls, pipelines: list[PipelineConfig]) -> None:
        """Validate a list of pipeline configurations.

        Args:
            pipelines: Pipelines to check.

        Raises:
            ValueError: If the list is empty, two pipelines share a name, a
                pipeline lacks a source or destination, or a source has no
                tables (database) or no files (file).
        """
        if not pipelines:
            raise ValueError("At least one pipeline must be configured")

        # Check for duplicate pipeline names
        names = [pipeline.name for pipeline in pipelines]
        if len(names) != len(set(names)):
            raise ValueError("Pipeline names must be unique")

        # Validate each pipeline has valid source and destination
        for pipeline in pipelines:
            if not pipeline.source:
                raise ValueError(
                    f"Pipeline '{pipeline.name}' must have a source configuration"
                )
            if not pipeline.destination:
                raise ValueError(
                    f"Pipeline '{pipeline.name}' must have a destination configuration"
                )

            # Validate source has tables/files
            if isinstance(pipeline.source, DatabaseConfig):
                if not pipeline.source.tables:
                    raise ValueError(
                        f"Database source in pipeline '{pipeline.name}' must have at least one table"
                    )
            elif isinstance(pipeline.source, FileConfig):
                if not pipeline.source.files:
                    raise ValueError(
                        f"File source in pipeline '{pipeline.name}' must have at least one file"
                    )

    @classmethod
    def read_config(cls, path: str) -> "Config":
        """Read configuration from a YAML file.

        Args:
            path: Path of the YAML file to load.

        Returns:
            The Config built from the file's contents.

        Raises:
            ValueError: If the file is empty or its contents are invalid.
            OSError: If the file cannot be opened.
        """
        # Explicit UTF-8 keeps parsing independent of the platform's default
        # text encoding.
        with open(path, "r", encoding="utf-8") as file:
            data = yaml.safe_load(file)

        # safe_load returns None for an empty document.
        if data is None:
            raise ValueError(f"Configuration file '{path}' is empty")
        return cls.from_dict(data)