# Copyright 2025 Michael Anckaert
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Optional
from extral import __version__
from extral.config import Config, ConnectorConfig, TableConfig, FileItemConfig, DatabaseConfig
from extral.extract import extract_table
from extral.load import load_data
from extral.state import state
from extral.error_tracking import ErrorTracker
from extral.validation import PipelineValidator, format_validation_report
import argparse
logger = logging.getLogger(__name__)
DEFAULT_WORKER_COUNT = 4
[docs]
def process_table(
source_config: ConnectorConfig,
destination_config: ConnectorConfig,
dataset_config: TableConfig | FileItemConfig,
pipeline_name: str,
error_tracker: ErrorTracker,
) -> bool:
"""Process a single table/dataset. Returns True if successful, False otherwise."""
start_time = time.time()
try:
logger.info(f"Processing dataset: {dataset_config.name}")
# Extract phase
try:
file_path, schema_path = extract_table(
source_config, dataset_config, pipeline_name
)
if file_path is None or schema_path is None:
logger.info(
f"Skipping dataset load for '{dataset_config.name}' as there is no data extracted."
)
return True
except Exception as e:
duration = time.time() - start_time
error_tracker.track_error(
pipeline=pipeline_name,
dataset=dataset_config.name,
operation="extract",
exception=e,
duration_seconds=duration,
include_stack_trace=True,
)
raise
# Load phase
try:
load_data(
destination_config,
dataset_config,
file_path,
schema_path,
pipeline_name,
)
except Exception as e:
duration = time.time() - start_time
error_tracker.track_error(
pipeline=pipeline_name,
dataset=dataset_config.name,
operation="load",
exception=e,
duration_seconds=duration,
include_stack_trace=True,
)
raise
return True
except Exception as e:
logger.error(f"Error processing dataset '{dataset_config.name}': {e}")
return False
def _setup_logging(args: argparse.Namespace):
config = Config.read_config(args.config)
logging_config = config.logging
if logging_config.level == "debug":
level = logging.DEBUG
elif logging_config.level == "info":
level = logging.INFO
elif logging_config.level == "warning":
level = logging.WARNING
elif logging_config.level == "error":
level = logging.ERROR
elif logging_config.level == "critical":
level = logging.CRITICAL
else:
logger.warning(
f"Unknown logging level '{logging_config.level}', defaulting to INFO."
)
level = logging.INFO
logging.basicConfig(
level=level,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
[docs]
def main():
parser = argparse.ArgumentParser(
description=f"Extract and Load Data Tool (v{__version__})"
)
parser.add_argument(
"-c",
"--config",
type=str,
default="config.yaml",
help="Path to the configuration file. Defaults to 'config.yaml'.",
)
parser.add_argument(
"-v",
"--version",
action="version",
version=f"%(prog)s {__version__}",
help="Show the version of the tool.",
)
parser.add_argument(
"--continue-on-error",
action="store_true",
help="Continue processing even if some datasets fail.",
)
parser.add_argument(
"--skip-datasets",
type=str,
nargs="+",
help="Skip specified datasets during processing.",
)
parser.add_argument(
"--validate-only",
action="store_true",
help="Only validate the configuration without executing pipelines.",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Perform validation and show execution plan without running pipelines.",
)
args = parser.parse_args()
_setup_logging(args)
logger.debug(f"Parsed arguments: {args} ")
config_file_path = args.config
run(
config_file_path,
args.continue_on_error,
args.skip_datasets or [],
args.validate_only,
args.dry_run
)
[docs]
def run(
config_file_path: str,
continue_on_error: bool = False,
skip_datasets: Optional[list[str]] = None,
validate_only: bool = False,
dry_run: bool = False,
):
if skip_datasets is None:
skip_datasets = []
state.load_state()
config = Config.read_config(config_file_path)
# Perform pre-flight validation
logger.info("Performing pre-flight validation...")
validator = PipelineValidator()
validation_report = validator.validate_configuration(config)
# Print validation report
print(format_validation_report(validation_report))
# Handle validation-only mode
if validate_only:
logger.info("Validation complete. Exiting (--validate-only mode).")
sys.exit(0 if validation_report.overall_valid else 1)
# Check if validation failed
if not validation_report.overall_valid:
logger.error("Configuration validation failed. Cannot proceed with execution.")
logger.error("Use --validate-only for detailed validation report.")
sys.exit(1)
# Handle dry-run mode
if dry_run:
logger.info("=== DRY RUN MODE ===")
logger.info("Validation passed. Execution plan:")
for pipeline in config.pipelines:
logger.info(f"Pipeline: {pipeline.name}")
if isinstance(pipeline.source, DatabaseConfig):
logger.info(f" Source: {pipeline.source.type} ({len(pipeline.source.tables)} tables)")
for table in pipeline.source.tables:
logger.info(f" - Table: {table.name} (strategy: {table.strategy.value})")
else: # FileConfig
logger.info(f" Source: {pipeline.source.type} ({len(pipeline.source.files)} files)")
for file_item in pipeline.source.files:
file_name = file_item.file_path or file_item.http_path or "unknown"
logger.info(f" - File: {file_name} (strategy: {file_item.strategy.value})")
logger.info(f" Destination: {pipeline.destination.type}")
logger.info(f" Workers: {pipeline.workers or DEFAULT_WORKER_COUNT}")
logger.info("Dry run complete. Exiting (--dry-run mode).")
sys.exit(0)
if not config.pipelines:
logger.error("No pipelines specified in the configuration.")
sys.exit(1)
# Initialize error tracker
error_tracker = ErrorTracker()
# Log configuration options
if continue_on_error:
logger.info("Running in continue-on-error mode")
if skip_datasets:
logger.info(f"Skipping datasets: {', '.join(skip_datasets)}")
# Track overall statistics
total_pipelines = len(config.pipelines)
successful_pipelines = 0
total_datasets = 0
successful_datasets = 0
# Process pipelines sequentially
for pipeline in config.pipelines:
logger.info(f"Processing pipeline: {pipeline.name}")
pipeline_start = time.time()
pipeline_success = True
# Get worker count (pipeline-specific or global default)
worker_count = (
pipeline.workers or config.processing.workers or DEFAULT_WORKER_COUNT
)
# Get tables/datasets from the source configuration
datasets: list[TableConfig | FileItemConfig] = []
if hasattr(pipeline.source, "tables"):
datasets = getattr(pipeline.source, "tables", [])
elif hasattr(pipeline.source, "files"):
datasets = getattr(pipeline.source, "files", [])
if not datasets:
logger.error(
f"No datasets (tables or files) found in pipeline '{pipeline.name}'"
)
error_tracker.track_error(
pipeline=pipeline.name,
dataset="N/A",
operation="pipeline_setup",
exception=Exception("No datasets found in pipeline configuration"),
duration_seconds=time.time() - pipeline_start,
)
continue
logger.info(
f"Found {len(datasets)} datasets to process in pipeline '{pipeline.name}'"
)
total_datasets += len(datasets)
# Track datasets for this pipeline
pipeline_dataset_success = 0
# Filter out skipped datasets
datasets_to_process = []
for dataset in datasets:
if dataset.name in skip_datasets:
logger.info(f"Skipping dataset '{dataset.name}' as requested")
else:
datasets_to_process.append(dataset)
if not datasets_to_process:
logger.info(f"All datasets in pipeline '{pipeline.name}' were skipped")
continue
# Process datasets in parallel within the pipeline
with ThreadPoolExecutor(max_workers=worker_count) as executor:
futures = {
executor.submit(
process_table,
pipeline.source,
pipeline.destination,
dataset,
pipeline.name,
error_tracker,
): dataset
for dataset in datasets_to_process
}
for future in as_completed(futures):
dataset = futures[future]
try:
success = future.result()
if success:
pipeline_dataset_success += 1
successful_datasets += 1
logger.info(
f"Completed processing dataset '{dataset.name}' in pipeline '{pipeline.name}'"
)
else:
pipeline_success = False
except Exception as e:
pipeline_success = False
logger.error(
f"Error processing dataset '{dataset.name}' in pipeline '{pipeline.name}': {e}"
)
if not continue_on_error:
logger.error(
"Stopping execution due to error (use --continue-on-error to proceed)"
)
# Finalize report and exit
error_tracker.finalize_report(
total_pipelines=total_pipelines,
successful_pipelines=successful_pipelines,
total_datasets=total_datasets,
successful_datasets=successful_datasets,
)
logger.info("\n" + error_tracker.report.get_summary())
if error_tracker.report.errors:
error_report_path = Path("extral_error_report.json")
error_tracker.report.save_to_file(error_report_path)
logger.info(f"Error report saved to: {error_report_path}")
sys.exit(1)
if pipeline_success and pipeline_dataset_success == len(datasets_to_process):
successful_pipelines += 1
logger.info(f"Successfully completed pipeline: {pipeline.name}")
else:
logger.warning(
f"Pipeline '{pipeline.name}' completed with errors. "
f"Successful datasets: {pipeline_dataset_success}/{len(datasets_to_process)}"
)
# Finalize error report
error_tracker.finalize_report(
total_pipelines=total_pipelines,
successful_pipelines=successful_pipelines,
total_datasets=total_datasets,
successful_datasets=successful_datasets,
)
# Display error summary
logger.info("\n" + error_tracker.report.get_summary())
# Save error report if there were errors
if error_tracker.report.errors:
error_report_path = Path("extral_error_report.json")
error_tracker.report.save_to_file(error_report_path)
logger.info(f"Error report saved to: {error_report_path}")
# Store state
state.store_state()
# Exit with error code if there were failures
if (
error_tracker.report.failed_pipelines > 0
or error_tracker.report.failed_datasets > 0
):
sys.exit(1)
if __name__ == "__main__":
main()