API Reference

This section provides detailed documentation of Extral’s Python API.

Configuration Module

class extral.config.LoadStrategy(*values)[source]

Bases: Enum

Enumeration of supported load strategies.

APPEND = 'append'
REPLACE = 'replace'
MERGE = 'merge'
class extral.config.ReplaceMethod(*values)[source]

Bases: Enum

Enumeration of supported replace methods.

TRUNCATE = 'truncate'
RECREATE = 'recreate'
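
The two enumerations above map short string values to named members. The following is a minimal sketch of how such values can be parsed from configuration text. The enum classes are local stand-ins that copy the documented members so the example runs without Extral installed, and parse_strategy is a hypothetical helper, not part of the Extral API.

```python
from enum import Enum


class LoadStrategy(Enum):
    """Local stand-in with the members documented for extral.config.LoadStrategy."""
    APPEND = "append"
    REPLACE = "replace"
    MERGE = "merge"


class ReplaceMethod(Enum):
    """Local stand-in with the members documented for extral.config.ReplaceMethod."""
    TRUNCATE = "truncate"
    RECREATE = "recreate"


def parse_strategy(raw: str) -> LoadStrategy:
    """Hypothetical helper: normalise a config string, then look it up by value."""
    try:
        return LoadStrategy(raw.strip().lower())
    except ValueError:
        allowed = ", ".join(member.value for member in LoadStrategy)
        raise ValueError(
            f"unknown load strategy {raw!r}; expected one of: {allowed}"
        ) from None


strategy = parse_strategy(" Merge ")   # lookup by value
method = ReplaceMethod["TRUNCATE"]     # lookup by member name
```

Looking members up by value keeps configuration files free of Python identifiers, while lookup by name is convenient inside code.
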
class extral.config.ExtractConfig(extract_type: str | None = None, incremental_field: str | None = None, last_value: str | int | None = None, batch_size: int | None = None)[source]

Bases: object

Configuration for data extraction operations.

extract_type: str | None = None
incremental_field: str | None = None
last_value: str | int | None = None
batch_size: int | None = None
to_dict() dict[str, str | int | None][source]

Convert to dictionary format for backward compatibility.
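
As a rough illustration of what to_dict() might return, the sketch below rebuilds ExtractConfig as a local dataclass with the documented fields and converts it with dataclasses.asdict. The real implementation may differ, and the value "incremental" for extract_type is an assumed example value.

```python
from dataclasses import asdict, dataclass
from typing import Dict, Optional, Union


@dataclass
class ExtractConfig:
    """Local stand-in carrying the fields documented for extral.config.ExtractConfig."""
    extract_type: Optional[str] = None
    incremental_field: Optional[str] = None
    last_value: Union[str, int, None] = None
    batch_size: Optional[int] = None

    def to_dict(self) -> Dict[str, Union[str, int, None]]:
        # Assumption: a flat field-name -> value mapping is what callers expect.
        return asdict(self)


config = ExtractConfig(
    extract_type="incremental",      # assumed example value
    incremental_field="updated_at",
    last_value=42,
    batch_size=1000,
)
as_dict = config.to_dict()
```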

class extral.config.LoadConfig(strategy: LoadStrategy = LoadStrategy.REPLACE, replace_method: ReplaceMethod = ReplaceMethod.RECREATE, merge_key: str | None = None, batch_size: int | None = None)[source]

Bases: object

Configuration for data loading operations.

strategy: LoadStrategy = 'replace'
replace_method: ReplaceMethod = 'recreate'
merge_key: str | None = None
batch_size: int | None = None
class extral.config.LoggingConfig(level: str = 'INFO')[source]

Bases: object

Configuration for logging.

level: str = 'INFO'
classmethod from_dict(data: Dict[str, Any]) LoggingConfig[source]

Create LoggingConfig from dictionary.

class extral.config.ProcessingConfig(workers: int = 4)[source]

Bases: object

Configuration for processing.

workers: int = 4
classmethod from_dict(data: Dict[str, Any]) ProcessingConfig[source]

Create ProcessingConfig from dictionary.
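
The from_dict classmethods on the configuration classes follow a common dataclass pattern. The sketch below shows that pattern for the two smallest classes. It assumes that dictionary keys match the field names and that missing keys fall back to the documented defaults; neither assumption is confirmed by this reference.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class LoggingConfig:
    """Local stand-in for extral.config.LoggingConfig."""
    level: str = "INFO"

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "LoggingConfig":
        # Assumption: the key is named after the field and is optional.
        return cls(level=str(data.get("level", "INFO")))


@dataclass
class ProcessingConfig:
    """Local stand-in for extral.config.ProcessingConfig."""
    workers: int = 4

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ProcessingConfig":
        # Assumption: the key is named after the field and is optional.
        return cls(workers=int(data.get("workers", 4)))


logging_config = LoggingConfig.from_dict({"level": "DEBUG"})
processing_config = ProcessingConfig.from_dict({})
```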

class extral.config.DatabaseConfig(type: str, host: str, port: int, user: str, password: str, database: str, schema: str | None = None, charset: str = 'utf8mb4', tables: list[~extral.config.TableConfig] = <factory>)[source]

Bases: object

Configuration for database connections.

type: str
host: str
port: int
user: str
password: str
database: str
schema: str | None = None
charset: str = 'utf8mb4'
tables: list[TableConfig]
classmethod from_dict(data: Dict[str, Any]) DatabaseConfig[source]

Create DatabaseConfig from dictionary.

class extral.config.FileItemConfig(name: str, format: str, file_path: str | None = None, http_path: str | None = None, options: ~typing.Dict[str, ~typing.Any] = <factory>, strategy: ~extral.config.LoadStrategy = LoadStrategy.REPLACE, merge_key: str | None = None, batch_size: int | None = None)[source]

Bases: object

Configuration for a single file item.

name: str
format: str
file_path: str | None = None
http_path: str | None = None
options: Dict[str, Any]
strategy: LoadStrategy = 'replace'
merge_key: str | None = None
batch_size: int | None = None
__post_init__()[source]

Validate configuration after initialization.

classmethod from_dict(data: Dict[str, Any]) FileItemConfig[source]

Create FileItemConfig from dictionary.

class extral.config.FileConfig(type: str, files: list[~extral.config.FileItemConfig] = <factory>)[source]

Bases: object

Configuration for file connections.

type: str
files: list[FileItemConfig]
classmethod from_dict(data: Dict[str, Any]) FileConfig[source]

Create FileConfig from dictionary.

class extral.config.IncrementalConfig(field: str, type: str, initial_value: str | None = None)[source]

Bases: object

Configuration for incremental extraction.

field: str
type: str
initial_value: str | None = None
classmethod from_dict(data: Dict[str, Any]) IncrementalConfig[source]

Create IncrementalConfig from dictionary.

class extral.config.ReplaceConfig(how: ReplaceMethod = ReplaceMethod.RECREATE)[source]

Bases: object

Configuration for replace strategy.

how: ReplaceMethod = 'recreate'
classmethod from_dict(data: Dict[str, Any]) ReplaceConfig[source]

Create ReplaceConfig from dictionary.

class extral.config.TableConfig(name: str, strategy: LoadStrategy = LoadStrategy.REPLACE, merge_key: str | None = None, batch_size: int | None = None, incremental: IncrementalConfig | None = None, replace: ReplaceConfig | None = None)[source]

Bases: object

Configuration for table processing.

name: str
strategy: LoadStrategy = 'replace'
merge_key: str | None = None
batch_size: int | None = None
incremental: IncrementalConfig | None = None
replace: ReplaceConfig | None = None
classmethod from_dict(data: Dict[str, Any]) TableConfig[source]

Create TableConfig from dictionary.

class extral.config.PipelineConfig(name: str, source: DatabaseConfig | FileConfig, destination: DatabaseConfig | FileConfig, workers: int | None = None)[source]

Bases: object

Configuration for a single pipeline.

name: str
source: DatabaseConfig | FileConfig
destination: DatabaseConfig | FileConfig
workers: int | None = None
classmethod from_dict(data: Dict[str, Any]) PipelineConfig[source]

Create PipelineConfig from dictionary.

class extral.config.Config(logging: ~extral.config.LoggingConfig = <factory>, processing: ~extral.config.ProcessingConfig = <factory>, pipelines: list[~extral.config.PipelineConfig] = <factory>)[source]

Bases: object

Main configuration object.

logging: LoggingConfig
processing: ProcessingConfig
pipelines: list[PipelineConfig]
__post_init__()[source]

Validate configuration after initialization.

classmethod from_dict(data: Dict[str, Any]) Config[source]

Create Config from dictionary.

classmethod read_config(path: str) Config[source]

Read configuration from YAML file.

Main Module

extral.main.process_table(source_config: DatabaseConfig | FileConfig, destination_config: DatabaseConfig | FileConfig, dataset_config: TableConfig | FileItemConfig, pipeline_name: str, error_tracker: ErrorTracker) bool[source]

Process a single table/dataset. Returns True if successful, False otherwise.

extral.main.main()[source]
extral.main.run(config_file_path: str, continue_on_error: bool = False, skip_datasets: list[str] | None = None, validate_only: bool = False, dry_run: bool = False)[source]

Connectors

Connector Base

Generic Connector Interface Module. Defines the abstract interface for all data connectors in the ETL system.

class extral.connectors.connector.Connector[source]

Bases: ABC

Abstract base class for all data connectors.

This interface defines the core ETL operations that all connectors must implement:
  • extract_data: Extract data from any source (database, files, APIs, etc.)

  • load_data: Load data into any destination with configurable strategies

The interface is designed to be generic and not assume any specific storage type, allowing for databases, files, APIs, or any other data sources/destinations.

abstractmethod extract_data(dataset_name: str, extract_config: ExtractConfig) Generator[list[dict[str, str | None]], None, None][source]

Extract data from the source dataset.

Parameters:
  • dataset_name – Name/identifier of the dataset to extract from (could be table name, file path, API endpoint, etc.)

  • extract_config – Configuration for extraction (incremental, batch size, etc.)

Yields:

Batches of records, each record represented as a dictionary

abstractmethod load_data(dataset_name: str, data: list[dict[str, str | None]], load_config: LoadConfig) None[source]

Load data into the destination dataset using the specified strategy.

Parameters:
  • dataset_name – Name/identifier of the target dataset (could be table name, file path, API endpoint, etc.)

  • data – List of records to load

  • load_config – Configuration specifying the load strategy

Strategies:
  • APPEND: Add new records without modifying existing data

  • REPLACE: Replace all data (with truncate or recreate method)

  • MERGE: Update existing records and insert new ones based on merge key

abstractmethod test_connection() bool[source]

Test connectivity to the data source/destination.

This method should verify that the connector can successfully connect to its configured data source or destination without performing any actual data operations.

Returns:

True if connection is successful, False otherwise

Return type:

bool

Raises:

ConnectionException – If connection fails with details about the failure
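
To make the Connector contract above concrete, here is a minimal in-memory sketch of the three abstract methods and the three load strategies. Every class in it is a local stand-in defined for the example: InMemoryConnector is hypothetical, the config classes carry only the fields the example needs, and the merge behaviour (last record wins per key) is an assumption rather than Extral's documented semantics.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Generator, List, Optional

Record = Dict[str, Optional[str]]


class LoadStrategy(Enum):
    APPEND = "append"
    REPLACE = "replace"
    MERGE = "merge"


@dataclass
class ExtractConfig:
    batch_size: Optional[int] = None


@dataclass
class LoadConfig:
    strategy: LoadStrategy = LoadStrategy.REPLACE
    merge_key: Optional[str] = None


class Connector(ABC):
    """Stand-in for the abstract interface described above."""

    @abstractmethod
    def extract_data(self, dataset_name: str, extract_config: ExtractConfig
                     ) -> Generator[List[Record], None, None]: ...

    @abstractmethod
    def load_data(self, dataset_name: str, data: List[Record],
                  load_config: LoadConfig) -> None: ...

    @abstractmethod
    def test_connection(self) -> bool: ...


class InMemoryConnector(Connector):
    """Hypothetical connector that keeps each dataset in a Python list."""

    def __init__(self) -> None:
        self.datasets: Dict[str, List[Record]] = {}

    def extract_data(self, dataset_name, extract_config):
        rows = self.datasets.get(dataset_name, [])
        size = extract_config.batch_size or len(rows) or 1
        for start in range(0, len(rows), size):
            yield rows[start:start + size]

    def load_data(self, dataset_name, data, load_config):
        existing = self.datasets.setdefault(dataset_name, [])
        if load_config.strategy is LoadStrategy.APPEND:
            existing.extend(data)
        elif load_config.strategy is LoadStrategy.REPLACE:
            self.datasets[dataset_name] = list(data)
        else:  # MERGE: update rows that share the merge key, insert the rest
            key = load_config.merge_key
            if key is None:
                raise ValueError("MERGE requires a merge_key")
            merged = {row[key]: row for row in existing}
            merged.update({row[key]: row for row in data})
            self.datasets[dataset_name] = list(merged.values())

    def test_connection(self) -> bool:
        return True  # nothing external to reach


conn = InMemoryConnector()
conn.load_data("users", [{"id": "1", "name": "Ada"}, {"id": "2", "name": "Bob"}],
               LoadConfig())
conn.load_data("users", [{"id": "2", "name": "Grace"}, {"id": "3", "name": "Linus"}],
               LoadConfig(LoadStrategy.MERGE, merge_key="id"))
batches = list(conn.extract_data("users", ExtractConfig(batch_size=2)))
```

Yielding batches from a generator keeps memory use bounded by batch_size, which is presumably why the interface returns a Generator rather than a full list.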

Database Connectors

Generic Database Connector

Generic Database Connector Interface. Provides database-specific operations that extend the generic Connector interface.

class extral.connectors.database.generic.DatabaseConnector[source]

Bases: Connector

Generic database connector that extends the base Connector interface with database-specific operations.

This class provides the database-specific functionality like connection management, table operations, and schema handling while implementing the generic ETL operations.

abstractmethod connect(config: DatabaseConfig) None[source]

Establish connection to the database.

Parameters:

config – Database configuration containing connection parameters

abstractmethod disconnect() None[source]

Close the database connection.

abstractmethod is_table_exists(table_name: str) bool[source]

Check if a table exists in the database.

Parameters:

table_name – Name of the table to check

Returns:

True if table exists, False otherwise

abstractmethod extract_schema_for_table(table_name: str) Tuple[Dict[str, Any], ...][source]

Extract schema information for a table.

Parameters:

table_name – Name of the table

Returns:

Tuple of dictionaries containing column information

abstractmethod create_table(table_name: str, schema: TargetDatabaseSchema) None[source]

Create a table with the specified schema.

Parameters:
  • table_name – Name of the table to create

  • schema – Schema definition for the table

abstractmethod truncate_table(table_name: str) None[source]

Truncate a table (remove all data but keep structure).

Parameters:

table_name – Name of the table to truncate

extract_data(dataset_name: str, extract_config: ExtractConfig) Generator[list[dict[str, str | None]], None, None][source]

Extract data from a database table.

Parameters:
  • dataset_name – Name of the table to extract from

  • extract_config – Configuration for extraction

Yields:

Batches of database records

load_data(dataset_name: str, data: list[dict[str, str | None]], load_config: LoadConfig) None[source]

Load data into a database table using the specified strategy.

Parameters:
  • dataset_name – Name of the target table

  • data – List of records to load

  • load_config – Configuration specifying the load strategy

get_table_schema(table_name: str) Dict[str, Any][source]

Get schema information for a table.

Parameters:

table_name – Name of the table

Returns:

Dictionary containing schema information

dataset_exists(dataset_name: str) bool[source]

Check if a dataset (table) exists - implements generic Connector interface.

Parameters:

dataset_name – Name of the dataset/table to check

Returns:

True if dataset exists, False otherwise

get_dataset_schema(dataset_name: str) Dict[str, Any][source]

Get schema information for a dataset (table) - implements generic Connector interface.

Parameters:

dataset_name – Name of the dataset/table

Returns:

Dictionary containing schema information

MySQL Connector

MySQL Database Connector. Implementation of the generic DatabaseConnector interface for MySQL/MariaDB databases.

class extral.connectors.database.mysql.MySQLConnector[source]

Bases: DatabaseConnector

MySQL implementation of the generic DatabaseConnector interface.

This connector handles MySQL-specific operations while implementing both the generic Connector interface and database-specific operations.

connect(config: DatabaseConfig) None[source]

Establish connection to MySQL database.

disconnect() None[source]

Close the database connection.

test_connection() bool[source]

Test connectivity to the MySQL database.

Returns:

True if connection is successful, False otherwise

Return type:

bool

Raises:

ConnectionException – If connection fails with details about the failure

is_table_exists(table_name: str) bool[source]

Check if a table exists in the database.

extract_schema_for_table(table_name: str) Tuple[Dict[str, Any], ...][source]

Extract schema information for a table.

create_table(table_name: str, schema: TargetDatabaseSchema) None[source]

Create a table with the specified schema.

truncate_table(table_name: str) None[source]

Truncate a table in the MySQL database.

extract_data(dataset_name: str, extract_config: ExtractConfig) Generator[list[dict[str, str | None]], None, None][source]

Extract data from a MySQL table.

PostgreSQL Connector

PostgreSQL Database Connector. Implementation of the generic DatabaseConnector interface for PostgreSQL databases.

class extral.connectors.database.postgresql.PostgreSQLConnector[source]

Bases: DatabaseConnector

PostgreSQL implementation of the generic DatabaseConnector interface.

This connector handles PostgreSQL-specific operations while implementing both the generic Connector interface and database-specific operations.

connect(config: DatabaseConfig) None[source]

Establish connection to PostgreSQL database.

disconnect() None[source]

Close the database connection.

test_connection() bool[source]

Test connectivity to the PostgreSQL database.

Returns:

True if connection is successful, False otherwise

Return type:

bool

Raises:

ConnectionException – If connection fails with details about the failure

is_table_exists(table_name: str) bool[source]

Check if a table exists in the database.

extract_schema_for_table(table_name: str) Tuple[Dict[str, Any], ...][source]

Extract the schema for a given table in the PostgreSQL database.

create_table(table_name: str, schema: TargetDatabaseSchema) None[source]

Create a table in the PostgreSQL database based on the provided schema.

truncate_table(table_name: str) None[source]

Truncate a table in the PostgreSQL database.

extract_data(dataset_name: str, extract_config: ExtractConfig) Generator[list[dict[str, str | None]], None, None][source]

Extract data from a PostgreSQL table.

File Connectors

File Base Connector

Base file connector implementation.

class extral.connectors.file.base.FileConnector(config: FileItemConfig)[source]

Bases: Connector

Abstract base class for file-based connectors.

This class extends the generic Connector interface to provide file-specific functionality for ETL operations.

abstractmethod extract_data(dataset_name: str, extract_config: ExtractConfig) Generator[list[dict[str, str | None]], None, None][source]

Extract data from the file source.

Parameters:
  • dataset_name – Name of the dataset (file path or identifier)

  • extract_config – Configuration for extraction

Yields:

Batches of records read from the file

abstractmethod load_data(dataset_name: str, data: list[dict[str, str | None]], load_config: LoadConfig) None[source]

Load data to the file destination.

Parameters:
  • dataset_name – Name of the target dataset (file path)

  • data – List of records to load

  • load_config – Configuration specifying the load strategy

get_effective_path(dataset_name: str | None = None) str[source]

Get the effective file path, handling both local and HTTP paths.

Parameters:

dataset_name – Optional dataset name to use as file path

Returns:

The resolved file path or URL

is_http_path(path: str) bool[source]

Check if the given path is an HTTP/HTTPS URL.

test_connection() bool[source]

Test accessibility to the file source/destination.

This method verifies that the file path or HTTP URL is accessible for reading (for extraction) or writing (for loading).

Returns:

True if file/path is accessible, False otherwise

Return type:

bool

Raises:

ConnectionException – If file access fails with details

abstractmethod infer_schema(dataset_name: str) Dict[str, Dict[str, Any]][source]

Infer schema from the file.

Parameters:

dataset_name – Name of the dataset (file path)

Returns:

Schema dictionary with column names and types
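
The HTTP/HTTPS check performed by is_http_path above can be approximated with the standard library. This is a sketch under the assumption that only the URL scheme matters; the function name is_http_path_sketch is hypothetical and the real method may apply different rules.

```python
from urllib.parse import urlparse


def is_http_path_sketch(path: str) -> bool:
    """Return True when the path carries an http or https scheme."""
    # urlparse lowercases the scheme, so "HTTPS://..." is handled as well.
    return urlparse(path).scheme in ("http", "https")


remote = is_http_path_sketch("https://example.com/data/users.csv")
local = is_http_path_sketch("data/users.csv")
```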

CSV Connector

CSV file connector implementation.

class extral.connectors.file.csv_connector.CSVConnector(config: FileItemConfig)[source]

Bases: FileConnector

CSV file connector for reading and writing CSV files.

Supports both local files and HTTP/HTTPS URLs for extraction.

extract_data(dataset_name: str, extract_config: ExtractConfig) Generator[list[dict[str, str | None]], None, None][source]

Extract data from CSV file.

Parameters:
  • dataset_name – File path or identifier

  • extract_config – Extraction configuration

Yields:

Batches of records read from the CSV file

load_data(dataset_name: str, data: list[dict[str, str | None]], load_config: LoadConfig) None[source]

Load data to CSV file.

Parameters:
  • dataset_name – Target file path

  • data – Records to write

  • load_config – Load configuration (strategy: REPLACE or APPEND)

infer_schema(dataset_name: str) Dict[str, Dict[str, Any]][source]

Infer schema from CSV file.

For CSV files, all columns are treated as VARCHAR since CSV doesn’t have type information.

Parameters:

dataset_name – File path

Returns:

Schema dictionary with column names and types
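
Because every CSV column is treated as VARCHAR, schema inference reduces to reading the header row. The sketch below illustrates that idea on in-memory text. The function name and the shape of each column entry (a "type" key only) are assumptions made for the example, not the connector's actual output format.

```python
import csv
import io
from typing import Any, Dict


def infer_csv_schema_sketch(text: str) -> Dict[str, Dict[str, Any]]:
    """Map each header column to a VARCHAR type description."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader, [])  # an empty file yields an empty schema
    # Assumed entry shape: {"type": "VARCHAR"}.
    return {column: {"type": "VARCHAR"} for column in header}


schema = infer_csv_schema_sketch("id,name,signup_date\n1,Ada,2024-01-02\n")
```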

JSON Connector

JSON file connector implementation.

class extral.connectors.file.json_connector.JSONConnector(config: FileItemConfig)[source]

Bases: FileConnector

JSON file connector for reading and writing JSON files.

Supports both JSON array format and JSON Lines format. Supports both local files and HTTP/HTTPS URLs for extraction.

extract_data(dataset_name: str, extract_config: ExtractConfig) Generator[list[dict[str, str | None]], None, None][source]

Extract data from JSON file.

Parameters:
  • dataset_name – File path or identifier

  • extract_config – Extraction configuration

Yields:

Batches of records read from the JSON file

load_data(dataset_name: str, data: list[dict[str, str | None]], load_config: LoadConfig) None[source]

Load data to JSON file.

Parameters:
  • dataset_name – Target file path

  • data – Records to write

  • load_config – Load configuration (only REPLACE strategy supported)

infer_schema(dataset_name: str) Dict[str, Dict[str, Any]][source]

Infer schema from JSON file by sampling records.

Parameters:

dataset_name – File path

Returns:

Schema dictionary with column names and types
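
The connector above accepts both a JSON array and JSON Lines. One simple way to tell the two apart is to look at the first non-whitespace character, as sketched below. How JSONConnector actually detects the format is not stated in this reference, so treat the detection rule and the function name as assumptions.

```python
import json
from typing import Any, Dict, List


def read_json_records_sketch(text: str) -> List[Dict[str, Any]]:
    """Parse either a JSON array of objects or one JSON object per line."""
    stripped = text.lstrip()
    if stripped.startswith("["):
        # JSON array format: the whole document is a single list.
        return json.loads(stripped)
    # JSON Lines format: each non-empty line is its own document.
    return [json.loads(line) for line in stripped.splitlines() if line.strip()]


from_array = read_json_records_sketch('[{"id": 1}, {"id": 2}]')
from_lines = read_json_records_sketch('{"id": 1}\n{"id": 2}\n')
```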

File Utilities

Utility functions for file connectors.

extral.connectors.file.utils.get_file_handle(path: str, mode: str = 'r') Generator[Path | _TemporaryFileWrapper, None, None][source]

Get a file handle for either local file or HTTP/HTTPS URL.

For HTTP/HTTPS URLs, downloads the file to a temporary location. For local files, returns the path directly.

Parameters:
  • path – Local file path or HTTP/HTTPS URL

  • mode – File open mode (only used for local files)

Yields:

Path object for local files or temporary file for URLs

Raises:
  • HTTPError – If HTTP download fails

  • URLError – If the URL is invalid or a network error occurs

  • FileNotFoundError – If local file doesn’t exist

extral.connectors.file.utils.estimate_file_size(path: str) int | None[source]

Estimate the size of a file from local path or HTTP headers.

Parameters:

path – Local file path or HTTP/HTTPS URL

Returns:

File size in bytes, or None if it cannot be determined

Core Modules

Extract Module

extral.extract.extract_schema_from_source(source_config: DatabaseConfig | FileConfig, table_name: str) TargetDatabaseSchema | None[source]

Determine the schema for a given table.

extral.extract.extract_table(source_config: DatabaseConfig | FileConfig, dataset_config: TableConfig | FileItemConfig, pipeline_name: str) Tuple[str | None, str | None][source]

Load Module

extral.load.load_data(destination_config: DatabaseConfig | FileConfig, dataset_config: TableConfig | FileItemConfig, file_path: str, schema_path: str, pipeline_name: str | None = None)[source]

Schema Module

exception extral.schema.SchemaCreateException[source]

Bases: Exception

Custom exception for schema creation errors.

class extral.schema.DatabaseSchema

Bases: TypedDict

schema_source: str
schema: dict[str, dict[str, str]]
class extral.schema.TargetDatabaseSchema

Bases: TypedDict

schema_source: str
schema: dict[str, dict[str, str | bool]]
extral.schema.infer_schema(data: Iterable[dict[str, str | None]]) TargetDatabaseSchema[source]

Infer database schema based on the provided data.

State Module

class extral.state.DatasetState

Bases: TypedDict

incremental: dict[str, str | int | None]
class extral.state.PipelineState

Bases: TypedDict

datasets: dict[str, DatasetState]
class extral.state.State[source]

Bases: object

get_dataset_state(pipeline_name: str, dataset_id: str) DatasetState | None[source]

Get the state for a specific dataset in a specific pipeline.

Parameters:
  • pipeline_name – Name of the pipeline

  • dataset_id – Identifier for the dataset (table name or file name/logical name)

set_dataset_state(pipeline_name: str, dataset_id: str, dataset_state: DatasetState) None[source]

Set the state for a specific dataset in a specific pipeline.

Parameters:
  • pipeline_name – Name of the pipeline

  • dataset_id – Identifier for the dataset (table name or file name/logical name)

  • dataset_state – State data for the dataset

get_pipeline_state(pipeline_name: str) PipelineState | None[source]

Get the complete state for a specific pipeline.

list_pipelines() list[str][source]

List all pipeline names that have state.

list_datasets(pipeline_name: str) list[str][source]

List all dataset IDs for a specific pipeline.

store_state()[source]

Store the current state to a JSON file.

load_state()[source]
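
The state types above nest as pipeline, then dataset, then incremental values. The sketch below mirrors that nesting with a small in-memory class and serialises it to JSON. StateSketch is a hypothetical stand-in, the top-level layout of the stored file is an assumption, and so is the "last_value" key inside the incremental mapping.

```python
import json
from typing import Dict, List, Optional, TypedDict, Union


class DatasetState(TypedDict):
    incremental: Dict[str, Union[str, int, None]]


class PipelineState(TypedDict):
    datasets: Dict[str, DatasetState]


class StateSketch:
    """Hypothetical in-memory state store keyed by pipeline name."""

    def __init__(self) -> None:
        self.pipelines: Dict[str, PipelineState] = {}

    def set_dataset_state(self, pipeline_name: str, dataset_id: str,
                          dataset_state: DatasetState) -> None:
        pipeline = self.pipelines.setdefault(pipeline_name, {"datasets": {}})
        pipeline["datasets"][dataset_id] = dataset_state

    def get_dataset_state(self, pipeline_name: str,
                          dataset_id: str) -> Optional[DatasetState]:
        pipeline = self.pipelines.get(pipeline_name)
        if pipeline is None:
            return None
        return pipeline["datasets"].get(dataset_id)

    def list_datasets(self, pipeline_name: str) -> List[str]:
        pipeline = self.pipelines.get(pipeline_name)
        return list(pipeline["datasets"]) if pipeline else []

    def to_json(self) -> str:
        # Assumed file layout: one top-level key per pipeline.
        return json.dumps(self.pipelines, sort_keys=True)


state = StateSketch()
state.set_dataset_state("sales", "orders", {"incremental": {"last_value": 42}})
serialised = state.to_json()
```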

Store Module

extral.store.compress_data(data: bytes) bytes[source]

Compress data using gzip.

extral.store.decompress_data(data: bytes) bytes[source]

Decompress data using gzip.

extral.store.store_data(data: bytes, file_path: str) None[source]

Store compressed data to a file.
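
The three store functions above describe a gzip round trip. A minimal sketch with the standard gzip module follows. The function names are local stand-ins, and it is an assumption that the caller compresses the data before handing it to the store step.

```python
import gzip
import tempfile
from pathlib import Path


def compress(data: bytes) -> bytes:
    """Compress bytes with gzip."""
    return gzip.compress(data)


def decompress(data: bytes) -> bytes:
    """Reverse compress()."""
    return gzip.decompress(data)


def store(data: bytes, file_path: str) -> None:
    """Write already-compressed bytes to disk (assumed division of labour)."""
    target = Path(file_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(data)


payload = b'[{"id": "1", "name": "Ada"}]'
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "users.json.gz"
    store(compress(payload), str(path))
    restored = decompress(path.read_bytes())
```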

Validation Module

Pre-flight validation system for ETL pipeline configurations.

This module provides comprehensive validation for multi-pipeline configurations, including connectivity testing, resource conflict detection, and configuration consistency checks.

class extral.validation.ValidationResult(is_valid: bool, error_message: str | None = None, warnings: ~typing.List[str] = <factory>, details: ~typing.Dict[str, ~typing.Any] = <factory>)[source]

Bases: object

Result of a validation operation.

is_valid: bool
error_message: str | None = None
warnings: List[str]
details: Dict[str, Any]
class extral.validation.PipelineValidationResult(pipeline_name: str, connectivity_result: ValidationResult, resource_result: ValidationResult, config_result: ValidationResult, overall_valid: bool = False)[source]

Bases: object

Validation result for a single pipeline.

pipeline_name: str
connectivity_result: ValidationResult
resource_result: ValidationResult
config_result: ValidationResult
overall_valid: bool = False
__post_init__()[source]

Calculate overall validity after initialization.
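
One plausible reading of "calculate overall validity" is that a pipeline is valid only when all three of its checks pass. The sketch below implements that reading with local stand-in dataclasses; the actual rule used by Extral is not spelled out in this reference.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class ValidationResult:
    is_valid: bool
    error_message: Optional[str] = None
    warnings: List[str] = field(default_factory=list)
    details: Dict[str, Any] = field(default_factory=dict)


@dataclass
class PipelineValidationResult:
    pipeline_name: str
    connectivity_result: ValidationResult
    resource_result: ValidationResult
    config_result: ValidationResult
    overall_valid: bool = False

    def __post_init__(self) -> None:
        # Assumption: the pipeline is valid only if every individual check passed.
        self.overall_valid = all(
            result.is_valid
            for result in (self.connectivity_result,
                           self.resource_result,
                           self.config_result)
        )


ok = ValidationResult(is_valid=True)
failed = ValidationResult(is_valid=False, error_message="port unreachable")
healthy = PipelineValidationResult("sales", ok, ok, ok)
broken = PipelineValidationResult("sales", failed, ok, ok)
```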

class extral.validation.ValidationReport(overall_valid: bool = False, pipeline_results: ~typing.Dict[str, ~extral.validation.PipelineValidationResult] = <factory>, global_conflicts: ~extral.validation.ValidationResult = <factory>, summary: ~typing.Dict[str, ~typing.Any] = <factory>)[source]

Bases: object

Comprehensive validation report for all pipelines.

overall_valid: bool = False
pipeline_results: Dict[str, PipelineValidationResult]
global_conflicts: ValidationResult
summary: Dict[str, Any]
__post_init__()[source]

Calculate overall validity and summary after initialization.

class extral.validation.PipelineValidator[source]

Bases: object

Comprehensive pipeline validation system.

validate_configuration(config: Config) ValidationReport[source]

Perform comprehensive validation of the entire configuration.

Parameters:

config – Configuration object to validate

Returns:

ValidationReport with detailed results

extral.validation.format_validation_report(report: ValidationReport) str[source]

Format a validation report for human-readable output.

Parameters:

report – Validation report to format

Returns:

Formatted string representation of the report

Utility Modules

Database Module

Database Type Translator Module. Translates data types between MySQL and PostgreSQL databases.

class extral.database.DatabaseTypeTranslator[source]

Bases: object

Translates database column types between MySQL and PostgreSQL

translate(type_str: str, from_db: str, to_db: str) str[source]

Translate a type string from one database to another

Parameters:
  • type_str – The type string to translate (e.g., ‘varchar(255)’, ‘int unsigned’)

  • from_db – Source database (‘mysql’ or ‘postgresql’)

  • to_db – Target database (‘mysql’ or ‘postgresql’)

Returns:

Translated type string

get_supported_types(database: str) Dict[str, str][source]

Get all supported types for a database

extral.database.translate_type(type_str: str, from_db: str, to_db: str) str[source]

Translate a database type from one system to another

Parameters:
  • type_str – The type string to translate

  • from_db – Source database (‘mysql’ or ‘postgresql’)

  • to_db – Target database (‘mysql’ or ‘postgresql’)

Returns:

Translated type string

extral.database.mysql_to_postgresql(type_str: str) str[source]

Convert MySQL type to PostgreSQL

extral.database.postgresql_to_mysql(type_str: str) str[source]

Convert PostgreSQL type to MySQL
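
To give a feel for what type translation involves, the sketch below converts a handful of MySQL type strings to PostgreSQL equivalents. The mapping table is a small illustrative subset chosen for the example and is not Extral's actual table; the function name is hypothetical.

```python
import re

# Illustrative subset only; a real translator covers many more types.
MYSQL_TO_POSTGRESQL = {
    "tinyint": "smallint",
    "int": "integer",
    "bigint": "bigint",
    "double": "double precision",
    "datetime": "timestamp",
    "longtext": "text",
    "varchar": "varchar",
}

TYPE_PATTERN = re.compile(r"\s*([a-z]+)\s*(\([^)]*\))?\s*(unsigned)?\s*")


def mysql_to_postgresql_sketch(type_str: str) -> str:
    """Translate one MySQL column type using the illustrative table above."""
    match = TYPE_PATTERN.fullmatch(type_str.lower())
    if match is None or match.group(1) not in MYSQL_TO_POSTGRESQL:
        raise ValueError(f"unsupported MySQL type: {type_str!r}")
    base, args, unsigned = match.groups()
    target = MYSQL_TO_POSTGRESQL[base]
    if base == "int" and unsigned:
        # An unsigned 32-bit value can exceed PostgreSQL's signed integer range.
        target = "bigint"
    if base == "varchar" and args:
        target += args  # keep the declared length, e.g. (255)
    return target


translated = [mysql_to_postgresql_sketch(t)
              for t in ("varchar(255)", "int unsigned", "INT(11)", "datetime")]
```

Display widths such as the 11 in INT(11) are dropped because PostgreSQL integer types take no such argument.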

Encoder Module

class extral.encoder.CustomEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Bases: JSONEncoder

default(o: Any)[source]

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return super().default(o)
exception extral.encoder.EncodingException[source]

Bases: Exception

Custom exception for encoding errors.

extral.encoder.encode_data(data: list[dict[str, str | None]]) bytes[source]

Encode data to JSON bytes using a custom encoder for datetime and Decimal types.
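
A custom encoder for datetime and Decimal values typically overrides JSONEncoder.default. The sketch below shows one way to do it. SketchEncoder and encode_records are stand-in names, and the chosen representations (ISO 8601 strings for dates, plain strings for Decimal) are assumptions about what the real encoder emits.

```python
import json
from datetime import date, datetime
from decimal import Decimal
from typing import Any, Dict, List


class SketchEncoder(json.JSONEncoder):
    """Serialise datetime/date as ISO 8601 text and Decimal as a string."""

    def default(self, o: Any) -> Any:
        if isinstance(o, (datetime, date)):
            return o.isoformat()
        if isinstance(o, Decimal):
            return str(o)  # a string avoids float rounding
        # Let the base class raise TypeError for anything else.
        return super().default(o)


def encode_records(records: List[Dict[str, Any]]) -> bytes:
    """Encode records to UTF-8 JSON bytes using the encoder above."""
    return json.dumps(records, cls=SketchEncoder).encode("utf-8")


encoded = encode_records([
    {"id": 1, "price": Decimal("9.90"), "created": datetime(2024, 1, 2, 3, 4, 5)}
])
```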

Error Tracking Module

Error tracking and reporting for the Extral ETL tool.

class extral.error_tracking.ErrorDetails(pipeline: str, dataset: str, operation: str, error_type: str, error_message: str, timestamp: str = <factory>, duration_seconds: float | None = None, records_processed: int | None = None, retry_count: int = 0, stack_trace: str | None = None)[source]

Bases: object

Details of a single error occurrence.

pipeline: str
dataset: str
operation: str
error_type: str
error_message: str
timestamp: str
duration_seconds: float | None = None
records_processed: int | None = None
retry_count: int = 0
stack_trace: str | None = None
class extral.error_tracking.ErrorReport(start_time: str, end_time: str | None = None, total_pipelines: int = 0, successful_pipelines: int = 0, failed_pipelines: int = 0, total_datasets: int = 0, successful_datasets: int = 0, failed_datasets: int = 0, errors: ~typing.List[~extral.error_tracking.ErrorDetails] = <factory>)[source]

Bases: object

Summary report of all errors during ETL execution.

start_time: str
end_time: str | None = None
total_pipelines: int = 0
successful_pipelines: int = 0
failed_pipelines: int = 0
total_datasets: int = 0
successful_datasets: int = 0
failed_datasets: int = 0
errors: List[ErrorDetails]
add_error(error: ErrorDetails)[source]

Add an error to the report.

to_dict() Dict[str, Any][source]

Convert report to dictionary for serialization.

save_to_file(filepath: Path)[source]

Save error report to JSON file.

get_summary() str[source]

Get a human-readable summary of the error report.

class extral.error_tracking.ErrorTracker[source]

Bases: object

Tracks errors during ETL execution.

track_error(pipeline: str, dataset: str, operation: str, exception: Exception, duration_seconds: float | None = None, records_processed: int | None = None, retry_count: int = 0, include_stack_trace: bool = False)[source]

Track an error occurrence.

finalize_report(total_pipelines: int, successful_pipelines: int, total_datasets: int, successful_datasets: int)[source]

Finalize the error report with summary statistics.

has_errors_for_pipeline(pipeline: str) bool[source]

Check if a pipeline has any errors.

has_errors_for_dataset(pipeline: str, dataset: str) bool[source]

Check if a dataset has any errors.

get_errors_for_pipeline(pipeline: str) List[ErrorDetails][source]

Get all errors for a specific pipeline.

get_errors_for_dataset(pipeline: str, dataset: str) List[ErrorDetails][source]

Get all errors for a specific dataset.

Exceptions Module

Custom exceptions for the Extral ETL tool.

exception extral.exceptions.ExtralException(message: str, pipeline: str | None = None, dataset: str | None = None, operation: str | None = None, details: Dict[str, Any] | None = None)[source]

Bases: Exception

Base exception for all Extral-specific errors.

exception extral.exceptions.ExtractException(message: str, pipeline: str | None = None, dataset: str | None = None, operation: str | None = None, details: Dict[str, Any] | None = None)[source]

Bases: ExtralException

Raised when data extraction fails.

exception extral.exceptions.LoadException(message: str, pipeline: str | None = None, dataset: str | None = None, operation: str | None = None, details: Dict[str, Any] | None = None)[source]

Bases: ExtralException

Raised when data loading fails.

exception extral.exceptions.ConnectionException(message: str, pipeline: str | None = None, dataset: str | None = None, operation: str | None = None, details: Dict[str, Any] | None = None)[source]

Bases: ExtralException

Raised when a connection to a data source or destination fails.

exception extral.exceptions.ConfigurationException(message: str, pipeline: str | None = None, dataset: str | None = None, operation: str | None = None, details: Dict[str, Any] | None = None)[source]

Bases: ExtralException

Raised when configuration is invalid or missing.

exception extral.exceptions.StateException(message: str, pipeline: str | None = None, dataset: str | None = None, operation: str | None = None, details: Dict[str, Any] | None = None)[source]

Bases: ExtralException

Raised when state management operations fail.

exception extral.exceptions.ValidationException(message: str, pipeline: str | None = None, dataset: str | None = None, operation: str | None = None, details: Dict[str, Any] | None = None)[source]

Bases: ExtralException

Raised when validation checks fail.

exception extral.exceptions.RetryableException(message: str, max_retries: int = 3, **kwargs: Any)[source]

Bases: ExtralException

Base class for exceptions that can be retried.
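
The max_retries parameter suggests that callers may loop until the limit is reached. The sketch below shows one such loop. The exception classes are local stand-ins with the documented constructor signatures, storing max_retries as an attribute is an assumption, and call_with_retries is a hypothetical helper rather than an Extral function.

```python
import time
from typing import Any, Callable, Dict, Optional


class ExtralException(Exception):
    """Stand-in with the constructor signature documented above."""

    def __init__(self, message: str, pipeline: Optional[str] = None,
                 dataset: Optional[str] = None, operation: Optional[str] = None,
                 details: Optional[Dict[str, Any]] = None) -> None:
        super().__init__(message)
        self.pipeline = pipeline
        self.dataset = dataset
        self.operation = operation
        self.details = details or {}


class RetryableException(ExtralException):
    def __init__(self, message: str, max_retries: int = 3, **kwargs: Any) -> None:
        super().__init__(message, **kwargs)
        self.max_retries = max_retries  # assumed to be kept as an attribute


def call_with_retries(operation: Callable[[], Any], delay_seconds: float = 0.0) -> Any:
    """Re-run operation while it raises RetryableException, up to its limit."""
    attempt = 0
    while True:
        try:
            return operation()
        except RetryableException as exc:
            if attempt >= exc.max_retries:
                raise  # retries exhausted, surface the last error
            attempt += 1
            time.sleep(delay_seconds)


calls = {"count": 0}


def flaky() -> str:
    """Fail twice with a retryable error, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RetryableException("temporary failure", pipeline="sales")
    return "loaded"


result = call_with_retries(flaky)
```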