Source code for extral.error_tracking

"""Error tracking and reporting for the Extral ETL tool."""

import json
import logging
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field, asdict


logger = logging.getLogger(__name__)


[docs] @dataclass class ErrorDetails: """Details of a single error occurrence.""" pipeline: str dataset: str operation: str error_type: str error_message: str timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) duration_seconds: Optional[float] = None records_processed: Optional[int] = None retry_count: int = 0 stack_trace: Optional[str] = None
[docs] @dataclass class ErrorReport: """Summary report of all errors during ETL execution.""" start_time: str end_time: Optional[str] = None total_pipelines: int = 0 successful_pipelines: int = 0 failed_pipelines: int = 0 total_datasets: int = 0 successful_datasets: int = 0 failed_datasets: int = 0 errors: List[ErrorDetails] = field(default_factory=list)
[docs] def add_error(self, error: ErrorDetails): """Add an error to the report.""" self.errors.append(error)
[docs] def to_dict(self) -> Dict[str, Any]: """Convert report to dictionary for serialization.""" return asdict(self)
[docs] def save_to_file(self, filepath: Path): """Save error report to JSON file.""" with open(filepath, "w") as f: json.dump(self.to_dict(), f, indent=2)
[docs] def get_summary(self) -> str: """Get a human-readable summary of the error report.""" lines = [ "ETL Execution Summary", "=" * 50, f"Start Time: {self.start_time}", f"End Time: {self.end_time or 'In Progress'}", "", "Pipeline Summary:", f" Total: {self.total_pipelines}", f" Successful: {self.successful_pipelines}", f" Failed: {self.failed_pipelines}", "", "Dataset Summary:", f" Total: {self.total_datasets}", f" Successful: {self.successful_datasets}", f" Failed: {self.failed_datasets}", "", f"Total Errors: {len(self.errors)}", ] if self.errors: lines.extend( [ "", "Error Details:", "-" * 50, ] ) # Group errors by pipeline errors_by_pipeline: Dict[str, List[ErrorDetails]] = {} for error in self.errors: if error.pipeline not in errors_by_pipeline: errors_by_pipeline[error.pipeline] = [] errors_by_pipeline[error.pipeline].append(error) for pipeline, pipeline_errors in errors_by_pipeline.items(): lines.append(f"\nPipeline: {pipeline}") for error in pipeline_errors: lines.extend( [ f" Dataset: {error.dataset}", f" Operation: {error.operation}", f" Error Type: {error.error_type}", f" Message: {error.error_message}", f" Timestamp: {error.timestamp}", ] ) if error.retry_count > 0: lines.append(f" Retries: {error.retry_count}") if error.duration_seconds is not None: lines.append(f" Duration: {error.duration_seconds:.2f}s") if error.records_processed is not None: lines.append( f" Records Processed: {error.records_processed}" ) lines.append("") return "\n".join(lines)
[docs] class ErrorTracker: """Tracks errors during ETL execution.""" def __init__(self): self.report = ErrorReport(start_time=datetime.now().isoformat()) self.pipeline_errors: Dict[str, List[ErrorDetails]] = {} self.dataset_errors: Dict[str, List[ErrorDetails]] = {}
[docs] def track_error( self, pipeline: str, dataset: str, operation: str, exception: Exception, duration_seconds: Optional[float] = None, records_processed: Optional[int] = None, retry_count: int = 0, include_stack_trace: bool = False, ): """Track an error occurrence.""" import traceback error = ErrorDetails( pipeline=pipeline, dataset=dataset, operation=operation, error_type=type(exception).__name__, error_message=str(exception), duration_seconds=duration_seconds, records_processed=records_processed, retry_count=retry_count, stack_trace=traceback.format_exc() if include_stack_trace else None, ) self.report.add_error(error) # Track by pipeline if pipeline not in self.pipeline_errors: self.pipeline_errors[pipeline] = [] self.pipeline_errors[pipeline].append(error) # Track by dataset dataset_key = f"{pipeline}::{dataset}" if dataset_key not in self.dataset_errors: self.dataset_errors[dataset_key] = [] self.dataset_errors[dataset_key].append(error) logger.error( f"Error tracked - Pipeline: {pipeline}, Dataset: {dataset}, " f"Operation: {operation}, Type: {error.error_type}, Message: {error.error_message}" )
[docs] def finalize_report( self, total_pipelines: int, successful_pipelines: int, total_datasets: int, successful_datasets: int, ): """Finalize the error report with summary statistics.""" self.report.end_time = datetime.now().isoformat() self.report.total_pipelines = total_pipelines self.report.successful_pipelines = successful_pipelines self.report.failed_pipelines = total_pipelines - successful_pipelines self.report.total_datasets = total_datasets self.report.successful_datasets = successful_datasets self.report.failed_datasets = total_datasets - successful_datasets
[docs] def has_errors_for_pipeline(self, pipeline: str) -> bool: """Check if a pipeline has any errors.""" return pipeline in self.pipeline_errors
[docs] def has_errors_for_dataset(self, pipeline: str, dataset: str) -> bool: """Check if a dataset has any errors.""" dataset_key = f"{pipeline}::{dataset}" return dataset_key in self.dataset_errors
[docs] def get_errors_for_pipeline(self, pipeline: str) -> List[ErrorDetails]: """Get all errors for a specific pipeline.""" return self.pipeline_errors.get(pipeline, [])
[docs] def get_errors_for_dataset(self, pipeline: str, dataset: str) -> List[ErrorDetails]: """Get all errors for a specific dataset.""" dataset_key = f"{pipeline}::{dataset}" return self.dataset_errors.get(dataset_key, [])