API Reference
This section provides detailed documentation of Extral’s Python API.
Configuration Module
- class extral.config.LoadStrategy(*values)
Bases: Enum
Enumeration of supported load strategies.
- APPEND = 'append'
- REPLACE = 'replace'
- MERGE = 'merge'
- class extral.config.ReplaceMethod(*values)
Bases: Enum
Enumeration of supported replace methods.
- TRUNCATE = 'truncate'
- RECREATE = 'recreate'
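Both enumerations carry string values, so a configuration string such as "merge" can be turned into a member with the standard `Enum` value lookup. The sketch below re-declares the two enums locally (same members as listed above) so it runs without Extral installed. It assumes, but does not confirm, that Extral maps configuration strings this way; `parse_strategy` and its case-insensitive matching are hypothetical.

```python
from enum import Enum


class LoadStrategy(Enum):
    """Local mirror of extral.config.LoadStrategy (members as documented)."""
    APPEND = "append"
    REPLACE = "replace"
    MERGE = "merge"


class ReplaceMethod(Enum):
    """Local mirror of extral.config.ReplaceMethod (members as documented)."""
    TRUNCATE = "truncate"
    RECREATE = "recreate"


def parse_strategy(raw: str) -> LoadStrategy:
    """Hypothetical helper: map a config string (case-insensitive) to a member."""
    try:
        # Enum(value) looks a member up by its value, not by its name.
        return LoadStrategy(raw.strip().lower())
    except ValueError:
        allowed = ", ".join(m.value for m in LoadStrategy)
        raise ValueError(f"unknown load strategy {raw!r}; expected one of: {allowed}") from None


print(parse_strategy("Merge"))          # LoadStrategy.MERGE
print(ReplaceMethod("truncate").name)   # TRUNCATE
```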
- class extral.config.ExtractConfig(extract_type: str | None = None, incremental_field: str | None = None, last_value: str | int | None = None, batch_size: int | None = None)
Bases: object
Configuration for data extraction operations.
- class extral.config.LoadConfig(strategy: LoadStrategy = LoadStrategy.REPLACE, replace_method: ReplaceMethod = ReplaceMethod.RECREATE, merge_key: str | None = None, batch_size: int | None = None)
Bases: object
Configuration for data loading operations.
- strategy: LoadStrategy = LoadStrategy.REPLACE
- replace_method: ReplaceMethod = ReplaceMethod.RECREATE
- class extral.config.LoggingConfig(level: str = 'INFO')
Bases: object
Configuration for logging.
- class extral.config.ProcessingConfig(workers: int = 4)
Bases: object
Configuration for processing.
- class extral.config.DatabaseConfig(type: str, host: str, port: int, user: str, password: str, database: str, schema: str | None = None, charset: str = 'utf8mb4', tables: list[TableConfig] = <factory>)
Bases: object
Configuration for database connections.
- tables: list[TableConfig]
- class extral.config.FileItemConfig(name: str, format: str, file_path: str | None = None, http_path: str | None = None, options: Dict[str, Any] = <factory>, strategy: LoadStrategy = LoadStrategy.REPLACE, merge_key: str | None = None, batch_size: int | None = None)
Bases: object
Configuration for a single file item.
- strategy: LoadStrategy = LoadStrategy.REPLACE
- class extral.config.FileConfig(type: str, files: list[FileItemConfig] = <factory>)
Bases: object
Configuration for file connections.
- files: list[FileItemConfig]
- class extral.config.IncrementalConfig(field: str, type: str, initial_value: str | None = None)
Bases: object
Configuration for incremental extraction.
- class extral.config.ReplaceConfig(how: ReplaceMethod = ReplaceMethod.RECREATE)
Bases: object
Configuration for the REPLACE strategy.
- how: ReplaceMethod = ReplaceMethod.RECREATE
- class extral.config.TableConfig(name: str, strategy: LoadStrategy = LoadStrategy.REPLACE, merge_key: str | None = None, batch_size: int | None = None, incremental: IncrementalConfig | None = None, replace: ReplaceConfig | None = None)
Bases: object
Configuration for table processing.
- strategy: LoadStrategy = LoadStrategy.REPLACE
- incremental: IncrementalConfig | None = None
- replace: ReplaceConfig | None = None
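TableConfig and LoadConfig overlap: a table's strategy, merge_key, batch_size and optional replace.how all have counterparts in LoadConfig. The sketch below shows one plausible way to derive a LoadConfig from a TableConfig. The helper `to_load_config` is hypothetical (it is not part of the documented API), the dataclasses are local mirrors of a subset of the documented fields, and the rule that MERGE needs a merge key is an assumption rather than documented Extral behaviour. When `replace` is unset, the sketch falls back to LoadConfig's documented default, RECREATE.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class LoadStrategy(Enum):
    APPEND = "append"
    REPLACE = "replace"
    MERGE = "merge"


class ReplaceMethod(Enum):
    TRUNCATE = "truncate"
    RECREATE = "recreate"


@dataclass
class ReplaceConfig:
    how: ReplaceMethod = ReplaceMethod.RECREATE


@dataclass
class TableConfig:
    # Subset of the documented fields; `incremental` is omitted for brevity.
    name: str
    strategy: LoadStrategy = LoadStrategy.REPLACE
    merge_key: Optional[str] = None
    batch_size: Optional[int] = None
    replace: Optional[ReplaceConfig] = None


@dataclass
class LoadConfig:
    strategy: LoadStrategy = LoadStrategy.REPLACE
    replace_method: ReplaceMethod = ReplaceMethod.RECREATE
    merge_key: Optional[str] = None
    batch_size: Optional[int] = None


def to_load_config(table: TableConfig) -> LoadConfig:
    """Hypothetical helper: derive a LoadConfig from a TableConfig."""
    if table.strategy is LoadStrategy.MERGE and not table.merge_key:
        # Assumed rule: MERGE cannot match existing rows without a key.
        raise ValueError(f"table {table.name!r}: MERGE requires merge_key")
    # Fall back to LoadConfig's documented default when no replace block is set.
    method = table.replace.how if table.replace else ReplaceMethod.RECREATE
    return LoadConfig(
        strategy=table.strategy,
        replace_method=method,
        merge_key=table.merge_key,
        batch_size=table.batch_size,
    )


cfg = to_load_config(TableConfig("orders", replace=ReplaceConfig(ReplaceMethod.TRUNCATE)))
print(cfg.strategy, cfg.replace_method)  # LoadStrategy.REPLACE ReplaceMethod.TRUNCATE
```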
- class extral.config.PipelineConfig(name: str, source: DatabaseConfig | FileConfig, destination: DatabaseConfig | FileConfig, workers: int | None = None)
Bases: object
Configuration for a single pipeline.
- source: DatabaseConfig | FileConfig
- destination: DatabaseConfig | FileConfig
- class extral.config.Config(logging: LoggingConfig = <factory>, processing: ProcessingConfig = <factory>, pipelines: list[PipelineConfig] = <factory>)
Bases: object
Main configuration object.
- logging: LoggingConfig
- processing: ProcessingConfig
- pipelines: list[PipelineConfig]
Main Module
- extral.main.process_table(source_config: DatabaseConfig | FileConfig, destination_config: DatabaseConfig | FileConfig, dataset_config: TableConfig | FileItemConfig, pipeline_name: str, error_tracker: ErrorTracker) → bool
Process a single table or dataset. Returns True if successful, False otherwise.
Connectors
Connector Base
Generic connector interface module. Defines the abstract interface for all data connectors in the ETL system.
- class extral.connectors.connector.Connector
Bases: ABC
Abstract base class for all data connectors.
This interface defines the core ETL operations that all connectors must implement:
- extract_data: Extract data from any source (database, files, APIs, etc.)
- load_data: Load data into any destination with configurable strategies
- test_connection: Verify connectivity to the configured source or destination
The interface is deliberately generic and does not assume any specific storage type, so it can back databases, files, APIs, or any other data sources and destinations.
- abstractmethod extract_data(dataset_name: str, extract_config: ExtractConfig) → Generator[list[dict[str, str | None]], None, None]
Extract data from the source dataset.
- Parameters:
dataset_name – Name/identifier of the dataset to extract from (could be a table name, file path, API endpoint, etc.)
extract_config – Configuration for extraction (incremental, batch size, etc.)
- Yields:
Batches of records as dictionaries
- abstractmethod load_data(dataset_name: str, data: list[dict[str, str | None]], load_config: LoadConfig) → None
Load data into the destination dataset using the specified strategy.
- Parameters:
dataset_name – Name/identifier of the target dataset (could be a table name, file path, API endpoint, etc.)
data – List of records to load
load_config – Configuration specifying the load strategy
- Strategies:
APPEND: Add new records without modifying existing data
REPLACE: Replace all data (using the truncate or recreate method)
MERGE: Update existing records and insert new ones based on the merge key
- abstractmethod test_connection() → bool
Test connectivity to the data source/destination.
This method should verify that the connector can successfully connect to its configured data source or destination without performing any actual data operations.
- Returns:
True if connection is successful, False otherwise
- Return type:
bool
- Raises:
ConnectionException – If the connection fails; the exception carries details about the failure
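To make the contract concrete, the sketch below implements the three abstract methods for a toy in-memory store. It re-declares minimal local versions of Connector, ExtractConfig, LoadConfig and LoadStrategy, carrying only the fields it needs, so it runs without Extral installed. `InMemoryConnector` is hypothetical, and its MERGE branch is a plain upsert keyed on `merge_key`, an assumed reading of "update existing records and insert new ones".

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Generator, List, Optional

Record = Dict[str, Optional[str]]


class LoadStrategy(Enum):
    APPEND = "append"
    REPLACE = "replace"
    MERGE = "merge"


@dataclass
class ExtractConfig:
    batch_size: Optional[int] = None


@dataclass
class LoadConfig:
    strategy: LoadStrategy = LoadStrategy.REPLACE
    merge_key: Optional[str] = None


class Connector(ABC):
    """Local mirror of the documented abstract interface."""

    @abstractmethod
    def extract_data(self, dataset_name: str, extract_config: ExtractConfig) -> Generator[List[Record], None, None]: ...

    @abstractmethod
    def load_data(self, dataset_name: str, data: List[Record], load_config: LoadConfig) -> None: ...

    @abstractmethod
    def test_connection(self) -> bool: ...


class InMemoryConnector(Connector):
    """Hypothetical connector that keeps each dataset as a list of dicts."""

    def __init__(self) -> None:
        self.datasets: Dict[str, List[Record]] = {}

    def extract_data(self, dataset_name: str, extract_config: ExtractConfig) -> Generator[List[Record], None, None]:
        rows = self.datasets.get(dataset_name, [])
        size = extract_config.batch_size or len(rows) or 1  # no batch size: one batch
        for start in range(0, len(rows), size):
            yield rows[start:start + size]

    def load_data(self, dataset_name: str, data: List[Record], load_config: LoadConfig) -> None:
        existing = self.datasets.setdefault(dataset_name, [])
        if load_config.strategy is LoadStrategy.APPEND:
            existing.extend(data)
        elif load_config.strategy is LoadStrategy.REPLACE:
            self.datasets[dataset_name] = list(data)
        else:  # MERGE: update rows with a matching key, append the rest
            key = load_config.merge_key
            if key is None:
                raise ValueError("MERGE requires a merge_key")
            index = {row[key]: i for i, row in enumerate(existing)}
            for row in data:
                if row[key] in index:
                    existing[index[row[key]]] = row
                else:
                    index[row[key]] = len(existing)
                    existing.append(row)

    def test_connection(self) -> bool:
        return True  # nothing external to reach


conn = InMemoryConnector()
conn.load_data("users", [{"id": "1", "name": "Ada"}], LoadConfig(LoadStrategy.APPEND))
conn.load_data("users", [{"id": "1", "name": "Ada L."}, {"id": "2", "name": None}],
               LoadConfig(LoadStrategy.MERGE, merge_key="id"))
print(list(conn.extract_data("users", ExtractConfig(batch_size=1))))
# [[{'id': '1', 'name': 'Ada L.'}], [{'id': '2', 'name': None}]]
```

Because extract_data is a generator, a caller can stream batches from one connector straight into another connector's load_data without holding the whole dataset in memory.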
Database Connectors
Generic Database Connector
Generic database connector interface. Provides database-specific operations that extend the generic Connector interface.
- class extral.connectors.database.generic.DatabaseConnector
Bases: Connector
Generic database connector that extends the base Connector interface with database-specific operations.
This class provides database-specific functionality such as connection management, table operations, and schema handling, while implementing the generic ETL operations.
- abstractmethod connect(config: DatabaseConfig) → None
Establish a connection to the database.
- Parameters:
config – Database configuration containing connection parameters
- abstractmethod is_table_exists(table_name: str) → bool
Check whether a table exists in the database.
- Parameters:
table_name – Name of the table to check
- Returns:
True if the table exists, False otherwise
- abstractmethod extract_schema_for_table(table_name: str) → Tuple[Dict[str, Any], ...]
Extract schema information for a table.
- Parameters:
table_name – Name of the table
- Returns:
Tuple of dictionaries containing column information
- abstractmethod create_table(table_name: str, schema: TargetDatabaseSchema) → None
Create a table with the specified schema.
- Parameters:
table_name – Name of the table to create
schema – Schema definition for the table
- abstractmethod truncate_table(table_name: str) → None
Truncate a table (remove all data but keep its structure).
- Parameters:
table_name – Name of the table to truncate
- extract_data(dataset_name: str, extract_config: ExtractConfig) → Generator[list[dict[str, str | None]], None, None]
Extract data from a database table.
- Parameters:
dataset_name – Name of the table to extract from
extract_config – Configuration for extraction
- Yields:
Batches of database records
- load_data(dataset_name: str, data: list[dict[str, str | None]], load_config: LoadConfig) → None
Load data into a database table using the specified strategy.
- Parameters:
dataset_name – Name of the target table
data – List of records to load
load_config – Configuration specifying the load strategy
- get_table_schema(table_name: str) → Dict[str, Any]
Get schema information for a table.
- Parameters:
table_name – Name of the table
- Returns:
Dictionary containing schema information
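For database targets, the three load strategies map naturally onto SQL. The sketch below uses Python's built-in sqlite3 purely as a stand-in: Extral's MySQL and PostgreSQL connectors will issue their own dialect-specific statements (MySQL, for instance, uses `INSERT ... ON DUPLICATE KEY UPDATE` rather than `ON CONFLICT`). The function `load_rows` is hypothetical, strategies are passed as the plain string values of LoadStrategy and ReplaceMethod, SQLite has no TRUNCATE so `DELETE FROM` plays that role, and MERGE relies on SQLite's upsert syntax (SQLite 3.24 or later) plus a unique constraint on the merge key.

```python
import sqlite3
from typing import Dict, List, Optional


def load_rows(conn: sqlite3.Connection, table: str, rows: List[Dict[str, Optional[str]]],
              strategy: str, replace_method: str = "recreate",
              merge_key: Optional[str] = None) -> None:
    """Hypothetical loader applying append/replace/merge to an SQLite table.

    Identifiers are interpolated directly, so table and column names must be trusted.
    """
    if not rows:
        return
    cols = list(rows[0])
    col_list = ", ".join(f'"{c}"' for c in cols)
    placeholders = ", ".join("?" for _ in cols)

    if strategy == "replace":
        if replace_method == "truncate":
            conn.execute(f'DELETE FROM "{table}"')  # keep the structure, drop the data
        else:  # recreate: drop the table and rebuild it from the incoming columns
            conn.execute(f'DROP TABLE IF EXISTS "{table}"')
            key_clause = f', PRIMARY KEY ("{merge_key}")' if merge_key else ""
            conn.execute(f'CREATE TABLE "{table}" ({col_list}{key_clause})')

    sql = f'INSERT INTO "{table}" ({col_list}) VALUES ({placeholders})'
    if strategy == "merge":
        updates = ", ".join(f'"{c}" = excluded."{c}"' for c in cols if c != merge_key)
        sql += f' ON CONFLICT ("{merge_key}") ' + (f"DO UPDATE SET {updates}" if updates else "DO NOTHING")
    conn.executemany(sql, [tuple(r[c] for c in cols) for r in rows])
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY, name TEXT)")
load_rows(conn, "users", [{"id": "1", "name": "Ada"}], "append")
load_rows(conn, "users", [{"id": "1", "name": "Ada L."}, {"id": "2", "name": "Alan"}],
          "merge", merge_key="id")
print(conn.execute("SELECT id, name FROM users ORDER BY id").fetchall())
# [('1', 'Ada L.'), ('2', 'Alan')]
```

The difference between the two REPLACE methods shows up here too: truncating preserves the existing table definition (types, indexes, constraints), while recreating rebuilds it, which is why the recreate branch needs enough information to define the table again.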
MySQL Connector
MySQL database connector. Implementation of the generic DatabaseConnector interface for MySQL/MariaDB databases.
- class extral.connectors.database.mysql.MySQLConnector
Bases: DatabaseConnector
MySQL implementation of the generic DatabaseConnector interface.
This connector handles MySQL-specific operations while implementing both the generic Connector interface and the database-specific operations.
- connect(config: DatabaseConfig) → None
Establish a connection to the MySQL database.
- test_connection() → bool
Test connectivity to the MySQL database.
- Returns:
True if connection is successful, False otherwise
- Return type:
bool
- Raises:
ConnectionException – If the connection fails; the exception carries details about the failure
- extract_schema_for_table(table_name: str) → Tuple[Dict[str, Any], ...]
Extract schema information for a table.
- create_table(table_name: str, schema: TargetDatabaseSchema) → None
Create a table with the specified schema.
PostgreSQL Connector
PostgreSQL database connector. Implementation of the generic DatabaseConnector interface for PostgreSQL databases.
- class extral.connectors.database.postgresql.PostgreSQLConnector
Bases: DatabaseConnector
PostgreSQL implementation of the generic DatabaseConnector interface.
This connector handles PostgreSQL-specific operations while implementing both the generic Connector interface and the database-specific operations.
- connect(config: DatabaseConfig) → None
Establish a connection to the PostgreSQL database.
- test_connection() → bool
Test connectivity to the PostgreSQL database.
- Returns:
True if connection is successful, False otherwise
- Return type:
bool
- Raises:
ConnectionException – If the connection fails; the exception carries details about the failure
- extract_schema_for_table(table_name: str) → Tuple[Dict[str, Any], ...]
Extract the schema for a given table in the PostgreSQL database.
- create_table(table_name: str, schema: TargetDatabaseSchema) → None
Create a table in the PostgreSQL database based on the provided schema.
File Connectors
File Base Connector
Base file connector implementation.
- class extral.connectors.file.base.FileConnector(config: FileItemConfig)
Bases: Connector
Abstract base class for file-based connectors.
This class extends the generic Connector interface to provide file-specific functionality for ETL operations.
- abstractmethod extract_data(dataset_name: str, extract_config: ExtractConfig) → Generator[list[dict[str, str | None]], None, None]
Extract data from the file source.
- Parameters:
dataset_name – Name of the dataset (file path or identifier)
extract_config – Configuration for extraction
- Yields:
Batches of records as dictionaries
- abstractmethod load_data(dataset_name: str, data: list[dict[str, str | None]], load_config: LoadConfig) → None
Load data into the file destination.
- Parameters:
dataset_name – Name of the target dataset (file path)
data – List of records to load
load_config – Configuration specifying the load strategy
- get_effective_path(dataset_name: str | None = None) → str
Get the effective file path, handling both local and HTTP paths.
- Parameters:
dataset_name – Optional dataset name to use as the file path
- Returns:
The resolved file path or URL
- test_connection() → bool
Test accessibility of the file source/destination.
This method verifies that the file path or HTTP URL is accessible for reading (for extraction) or writing (for loading).
- Returns:
True if the file/path is accessible, False otherwise
- Return type:
bool
- Raises:
ConnectionException – If file access fails; the exception carries details about the failure
CSV Connector
CSV file connector implementation.
- class extral.connectors.file.csv_connector.CSVConnector(config: FileItemConfig)
Bases: FileConnector
CSV file connector for reading and writing CSV files.
Supports both local files and HTTP/HTTPS URLs for extraction.
- extract_data(dataset_name: str, extract_config: ExtractConfig) → Generator[list[dict[str, str | None]], None, None]
Extract data from a CSV file.
- Parameters:
dataset_name – File path or identifier
extract_config – Extraction configuration
- Yields:
Batches of records as dictionaries
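Yielding batches keeps memory bounded on large files. The sketch below shows one way to turn a CSV stream into batches of `dict[str, str | None]` with only the standard library. The name `iter_csv_batches`, the fallback batch size of 1000, and forwarding extra keyword arguments (for example `delimiter`, as FileItemConfig.options might carry) to `csv.DictReader` are assumptions, not Extral's internals. `csv.DictReader` fills missing trailing fields with `None` (its `restval` default), which is one way a `None` value can appear in a record.

```python
import csv
import io
from itertools import islice
from typing import Dict, Generator, List, Optional, TextIO

Record = Dict[str, Optional[str]]


def iter_csv_batches(stream: TextIO, batch_size: Optional[int] = None,
                     **reader_options) -> Generator[List[Record], None, None]:
    """Yield lists of row dicts, at most batch_size rows each."""
    size = batch_size or 1000  # assumed default when no batch size is configured
    reader = csv.DictReader(stream, **reader_options)  # header row gives the keys
    while True:
        batch = list(islice(reader, size))
        if not batch:
            break
        yield batch


data = io.StringIO("id,name\n1,Ada\n2,Alan\n3\n")
for batch in iter_csv_batches(data, batch_size=2):
    print(batch)
# [{'id': '1', 'name': 'Ada'}, {'id': '2', 'name': 'Alan'}]
# [{'id': '3', 'name': None}]
```

For a local file, open it with `open(path, newline="", encoding="utf-8")` before passing it in, as the csv module documentation recommends.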
JSON Connector
JSON file connector implementation.
- class extral.connectors.file.json_connector.JSONConnector(config: FileItemConfig)
Bases: FileConnector
JSON file connector for reading and writing JSON files.
Supports both the JSON array and JSON Lines formats, and both local files and HTTP/HTTPS URLs for extraction.
- extract_data(dataset_name: str, extract_config: ExtractConfig) → Generator[list[dict[str, str | None]], None, None]
Extract data from a JSON file.
- Parameters:
dataset_name – File path or identifier
extract_config – Extraction configuration
- Yields:
Batches of records as dictionaries
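The two supported layouts delimit records differently: a JSON array is a single document (`[{...}, {...}]`), while JSON Lines holds one object per line. The sketch below detects the layout from the first non-whitespace character and yields batches either way. The detection heuristic and the names `iter_json_batches` and `_iter_records` are assumptions; Extral may instead choose the format from FileItemConfig.options. Values are left as parsed by `json` (numbers stay numbers), whereas the documented record type is `dict[str, str | None]`, so a real connector may also stringify them. Note that the array branch parses the whole document in memory, while the JSON Lines branch streams line by line.

```python
import io
import json
from itertools import chain, islice
from typing import Any, Dict, Generator, Iterator, List, TextIO

Record = Dict[str, Any]


def _iter_records(stream: TextIO) -> Iterator[Record]:
    """Yield records from either a JSON array or a JSON Lines stream."""
    first = stream.read(1)
    while first and first.isspace():  # skip leading whitespace
        first = stream.read(1)
    if not first:
        return  # empty input: no records
    if first == "[":
        # JSON array: the whole document has to be parsed at once.
        yield from json.loads(first + stream.read())
    else:
        # JSON Lines: one object per line; blank lines are ignored.
        for line in chain([first + stream.readline()], stream):
            if line.strip():
                yield json.loads(line)


def iter_json_batches(stream: TextIO, batch_size: int = 1000) -> Generator[List[Record], None, None]:
    """Group records into lists of at most batch_size items."""
    records = _iter_records(stream)
    while batch := list(islice(records, batch_size)):
        yield batch


print(list(iter_json_batches(io.StringIO('[{"id": 1}, {"id": 2}, {"id": 3}]'), batch_size=2)))
# [[{'id': 1}, {'id': 2}], [{'id': 3}]]
print(list(iter_json_batches(io.StringIO('{"id": 1}\n\n{"id": 2}\n'))))
# [[{'id': 1}, {'id': 2}]]
```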
File Utilities
Utility functions for file connectors.
- extral.connectors.file.utils.get_file_handle(path: str, mode: str = 'r') → Generator[Path | _TemporaryFileWrapper, None, None]
Get a file handle for either a local file or an HTTP/HTTPS URL.
For HTTP/HTTPS URLs, downloads the file to a temporary location. For local files, yields the path directly.
- Parameters:
path – Local file path or HTTP/HTTPS URL
mode – File open mode (only used for local files)
- Yields:
Path object for local files or temporary file for URLs
- Raises:
HTTPError – If the HTTP download fails
URLError – If the URL is invalid or a network error occurs
FileNotFoundError – If the local file does not exist
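The Generator return type suggests this function is used as a context manager, so a downloaded temporary file can be cleaned up once the caller is done with it. Below is a minimal sketch of that pattern built on `contextlib.contextmanager`, `urllib.request` and `tempfile`. It is not Extral's implementation: the name `open_source` is hypothetical, it always yields a `Path` (the documented function may yield the temporary-file wrapper instead), and it ignores `mode`, whose exact use is not documented. Reopening a `NamedTemporaryFile` by name while it is still open works on POSIX systems but not on Windows.

```python
import shutil
import tempfile
import urllib.parse
import urllib.request
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator


@contextmanager
def open_source(path: str) -> Iterator[Path]:
    """Hypothetical helper: yield a local Path, downloading HTTP(S) URLs first."""
    if path.startswith(("http://", "https://")):
        # Keep the URL's extension so suffix-based logic still works downstream.
        suffix = Path(urllib.parse.urlparse(path).path).suffix
        # urlopen raises HTTPError/URLError on failure, like the documented function.
        with urllib.request.urlopen(path) as response, \
                tempfile.NamedTemporaryFile(suffix=suffix) as tmp:
            shutil.copyfileobj(response, tmp)  # stream to disk, not into memory
            tmp.flush()
            yield Path(tmp.name)  # the temp file is deleted when this block exits
    else:
        local = Path(path)
        if not local.exists():
            raise FileNotFoundError(f"No such file: {path}")
        yield local


# Usage: the caller does not need to know whether the data came over HTTP.
# with open_source("https://example.com/users.csv") as src:
#     print(src.read_text())
```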
Core Modules
Extract Module
- extral.extract.extract_schema_from_source(source_config: DatabaseConfig | FileConfig, table_name: str) → TargetDatabaseSchema | None
Determine the schema for a given table.
- extral.extract.extract_table(source_config: DatabaseConfig | FileConfig, dataset_config: TableConfig | FileItemConfig, pipeline_name: str) → Tuple[str | None, str | None]
Load Module
- extral.load.load_data(destination_config: DatabaseConfig | FileConfig, dataset_config: TableConfig | FileItemConfig, file_path: str, schema_path: str, pipeline_name: str | None = None)
Schema Module
- exception extral.schema.SchemaCreateException
Bases: Exception
Custom exception for schema creation errors.
State Module
- class extral.state.State
Bases: object
- get_dataset_state(pipeline_name: str, dataset_id: str) → DatasetState | None
Get the state for a specific dataset in a specific pipeline.
- Parameters:
pipeline_name – Name of the pipeline
dataset_id – Identifier for the dataset (table name or file name/logical name)
- set_dataset_state(pipeline_name: str, dataset_id: str, dataset_state: DatasetState) → None
Set the state for a specific dataset in a specific pipeline.
- Parameters:
pipeline_name – Name of the pipeline
dataset_id – Identifier for the dataset (table name or file name/logical name)
dataset_state – State data for the dataset
- get_pipeline_state(pipeline_name: str) → PipelineState | None
Get the complete state for a specific pipeline.
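State is keyed first by pipeline and then by dataset, so the same table name can track independent progress in different pipelines. The sketch below models that two-level layout with plain dicts and uses it to remember an incremental high-water mark between runs. `InMemoryState`, `rows_after`, the `last_value` key and the dict-shaped DatasetState are all assumptions: the fields of Extral's DatasetState and PipelineState, and where State persists them, are not documented here. The sketch also filters rows in memory, whereas a database connector would more likely push the comparison into its query.

```python
from typing import Any, Dict, List, Optional

DatasetState = Dict[str, Any]            # assumed shape, e.g. {"last_value": 3}
PipelineState = Dict[str, DatasetState]  # dataset_id -> DatasetState


class InMemoryState:
    """Hypothetical stand-in with the same three methods as extral.state.State."""

    def __init__(self) -> None:
        self._pipelines: Dict[str, PipelineState] = {}

    def get_dataset_state(self, pipeline_name: str, dataset_id: str) -> Optional[DatasetState]:
        return self._pipelines.get(pipeline_name, {}).get(dataset_id)

    def set_dataset_state(self, pipeline_name: str, dataset_id: str,
                          dataset_state: DatasetState) -> None:
        self._pipelines.setdefault(pipeline_name, {})[dataset_id] = dataset_state

    def get_pipeline_state(self, pipeline_name: str) -> Optional[PipelineState]:
        return self._pipelines.get(pipeline_name)


def rows_after(rows: List[Dict[str, Any]], field: str, state: InMemoryState,
               pipeline: str, dataset: str) -> List[Dict[str, Any]]:
    """Keep rows whose `field` exceeds the stored high-water mark, then advance it."""
    previous = (state.get_dataset_state(pipeline, dataset) or {}).get("last_value")
    fresh = [r for r in rows if previous is None or r[field] > previous]
    if fresh:
        state.set_dataset_state(pipeline, dataset, {"last_value": max(r[field] for r in fresh)})
    return fresh


state = InMemoryState()
rows = [{"id": 1}, {"id": 2}]
print(rows_after(rows, "id", state, "nightly", "orders"))                 # [{'id': 1}, {'id': 2}]
print(rows_after(rows + [{"id": 3}], "id", state, "nightly", "orders"))   # [{'id': 3}]
print(state.get_pipeline_state("nightly"))                               # {'orders': {'last_value': 3}}
```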
Store Module
Validation Module
Pre-flight validation system for ETL pipeline configurations.
This module provides comprehensive validation for multi-pipeline configurations, including connectivity testing, resource conflict detection, and configuration consistency checks.
- class extral.validation.ValidationResult(is_valid: bool, error_message: str | None = None, warnings: List[str] = <factory>, details: Dict[str, Any] = <factory>)
Bases: object
Result of a validation operation.
- class extral.validation.PipelineValidationResult(pipeline_name: str, connectivity_result: ValidationResult, resource_result: ValidationResult, config_result: ValidationResult, overall_valid: bool = False)
Bases: object
Validation result for a single pipeline.
- connectivity_result: ValidationResult
- resource_result: ValidationResult
- config_result: ValidationResult
- class extral.validation.ValidationReport(overall_valid: bool = False, pipeline_results: Dict[str, PipelineValidationResult] = <factory>, global_conflicts: ValidationResult = <factory>, summary: Dict[str, Any] = <factory>)
Bases: object
Comprehensive validation report for all pipelines.
- pipeline_results: Dict[str, PipelineValidationResult]
- global_conflicts: ValidationResult
- class extral.validation.PipelineValidator
Bases: object
Comprehensive pipeline validation system.
- validate_configuration(config: Config) → ValidationReport
Perform comprehensive validation of the entire configuration.
- Parameters:
config – Configuration object to validate
- Returns:
ValidationReport with detailed results
- extral.validation.format_validation_report(report: ValidationReport) → str
Format a validation report for human-readable output.
- Parameters:
report – Validation report to format
- Returns:
Formatted string representation of the report
Utility Modules
Database Module
Database type translator module. Translates data types between MySQL and PostgreSQL databases.
- class extral.database.DatabaseTypeTranslator
Bases: object
Translates database column types between MySQL and PostgreSQL.
- translate(type_str: str, from_db: str, to_db: str) → str
Translate a type string from one database to another.
- Parameters:
type_str – The type string to translate (e.g., 'varchar(255)', 'int unsigned')
from_db – Source database ('mysql' or 'postgresql')
to_db – Target database ('mysql' or 'postgresql')
- Returns:
Translated type string
- extral.database.translate_type(type_str: str, from_db: str, to_db: str) → str
Translate a database type from one system to another.
- Parameters:
type_str – The type string to translate
from_db – Source database ('mysql' or 'postgresql')
to_db – Target database ('mysql' or 'postgresql')
- Returns:
Translated type string
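Type translation is essentially normalization followed by a table lookup. The sketch below illustrates that approach for the MySQL to PostgreSQL direction with a deliberately small mapping. The entries are common equivalences chosen for illustration, not Extral's actual translation table, and `mysql_to_postgres` is a hypothetical name. Unsigned integers are widened to the next larger signed type because PostgreSQL has no unsigned integer types, and MySQL display widths such as `int(11)` are dropped because they do not affect storage.

```python
import re

# Illustrative MySQL -> PostgreSQL equivalences (not Extral's real table).
_EXACT = {
    "tinyint(1)": "boolean",  # MySQL's conventional BOOL representation
    "tinyint": "smallint",
    "tinyint unsigned": "smallint",
    "smallint unsigned": "integer",
    "mediumint": "integer",
    "mediumint unsigned": "integer",
    "int": "integer",
    "int unsigned": "bigint",
    "bigint unsigned": "numeric(20)",
    "datetime": "timestamp",
    "double": "double precision",
    "mediumtext": "text",
    "longtext": "text",
    "blob": "bytea",
    "longblob": "bytea",
}


def mysql_to_postgres(type_str: str) -> str:
    """Translate a MySQL column type to a PostgreSQL one (sketch)."""
    t = re.sub(r"\s+", " ", type_str.strip().lower())
    if t in _EXACT:
        return _EXACT[t]
    # Drop integer display widths such as int(11) and retry the lookup.
    base = re.sub(r"^(tinyint|smallint|mediumint|int|integer|bigint)\(\d+\)", r"\1", t)
    if base in _EXACT:
        return _EXACT[base]
    # Types valid in both systems, e.g. varchar(255) or decimal(10,2), pass through.
    return base


print(mysql_to_postgres("INT(11) UNSIGNED"))  # bigint
print(mysql_to_postgres("varchar(255)"))      # varchar(255)
print(mysql_to_postgres("tinyint(1)"))        # boolean
```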
Encoder Module
- class extral.encoder.CustomEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)
Bases: JSONEncoder
- default(o: Any)
Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this:
```python
def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return super().default(o)
```
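The `default` docstring above is the one CustomEncoder inherits from `json.JSONEncoder`, so it does not say which types CustomEncoder actually handles. In ETL code the usual candidates are values that database drivers return but `json` cannot serialize, such as `datetime`, `date`, `Decimal` and `bytes`. The sketch below shows such an override. The class name `ExampleEncoder` and the chosen conversions (ISO 8601 strings, `Decimal` as a string to avoid float rounding, `bytes` as hex) are assumptions, not a description of CustomEncoder.

```python
import datetime
import decimal
import json
from typing import Any


class ExampleEncoder(json.JSONEncoder):
    """Hypothetical encoder for values commonly returned by database drivers."""

    def default(self, o: Any) -> Any:
        if isinstance(o, (datetime.datetime, datetime.date, datetime.time)):
            return o.isoformat()
        if isinstance(o, decimal.Decimal):
            return str(o)  # keep the exact digits instead of converting to float
        if isinstance(o, (bytes, bytearray)):
            return o.hex()
        # Anything else: let the base class raise TypeError.
        return super().default(o)


row = {"id": 1, "price": decimal.Decimal("9.99"), "created": datetime.datetime(2024, 1, 2, 3, 4, 5)}
print(json.dumps(row, cls=ExampleEncoder))
# {"id": 1, "price": "9.99", "created": "2024-01-02T03:04:05"}
```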
Error Tracking Module
Error tracking and reporting for the Extral ETL tool.
- class extral.error_tracking.ErrorDetails(pipeline: str, dataset: str, operation: str, error_type: str, error_message: str, timestamp: str = <factory>, duration_seconds: float | None = None, records_processed: int | None = None, retry_count: int = 0, stack_trace: str | None = None)
Bases: object
Details of a single error occurrence.
- class extral.error_tracking.ErrorReport(start_time: str, end_time: str | None = None, total_pipelines: int = 0, successful_pipelines: int = 0, failed_pipelines: int = 0, total_datasets: int = 0, successful_datasets: int = 0, failed_datasets: int = 0, errors: List[ErrorDetails] = <factory>)
Bases: object
Summary report of all errors during ETL execution.
- errors: List[ErrorDetails]
- add_error(error: ErrorDetails)
Add an error to the report.
- class extral.error_tracking.ErrorTracker
Bases: object
Tracks errors during ETL execution.
- track_error(pipeline: str, dataset: str, operation: str, exception: Exception, duration_seconds: float | None = None, records_processed: int | None = None, retry_count: int = 0, include_stack_trace: bool = False)
Track an error occurrence.
- finalize_report(total_pipelines: int, successful_pipelines: int, total_datasets: int, successful_datasets: int)
Finalize the error report with summary statistics.
- has_errors_for_dataset(pipeline: str, dataset: str) → bool
Check whether a dataset has any errors.
- get_errors_for_pipeline(pipeline: str) → List[ErrorDetails]
Get all errors for a specific pipeline.
Exceptions Module
Custom exceptions for the Extral ETL tool.
- exception extral.exceptions.ExtralException(message: str, pipeline: str | None = None, dataset: str | None = None, operation: str | None = None, details: Dict[str, Any] | None = None)
Bases: Exception
Base exception for all Extral-specific errors.
- exception extral.exceptions.ExtractException(message: str, pipeline: str | None = None, dataset: str | None = None, operation: str | None = None, details: Dict[str, Any] | None = None)
Bases: ExtralException
Raised when data extraction fails.
- exception extral.exceptions.LoadException(message: str, pipeline: str | None = None, dataset: str | None = None, operation: str | None = None, details: Dict[str, Any] | None = None)
Bases: ExtralException
Raised when data loading fails.
- exception extral.exceptions.ConnectionException(message: str, pipeline: str | None = None, dataset: str | None = None, operation: str | None = None, details: Dict[str, Any] | None = None)
Bases: ExtralException
Raised when connecting to a data source or destination fails (for example, a database connection or file access).
- exception extral.exceptions.ConfigurationException(message: str, pipeline: str | None = None, dataset: str | None = None, operation: str | None = None, details: Dict[str, Any] | None = None)
Bases: ExtralException
Raised when configuration is invalid or missing.
- exception extral.exceptions.StateException(message: str, pipeline: str | None = None, dataset: str | None = None, operation: str | None = None, details: Dict[str, Any] | None = None)
Bases: ExtralException
Raised when state management operations fail.