# Source code for extral.connectors.file.json_connector

"""JSON file connector implementation."""

import json
import logging
from pathlib import Path
from typing import Any, Dict, Generator, List

from extral.config import ExtractConfig, FileItemConfig, LoadConfig, LoadStrategy
from extral.connectors.file.base import FileConnector
from extral.connectors.file.utils import get_file_handle
from extral.database import DatabaseRecord

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


class JSONConnector(FileConnector):
    """
    JSON file connector for reading and writing JSON files.

    Supports both JSON array format and JSON Lines format.
    Supports both local files and HTTP/HTTPS URLs for extraction.
    """

    # Batch size used when the extract configuration does not provide one.
    _DEFAULT_BATCH_SIZE = 50000
    # Maximum number of records inspected by infer_schema().
    _SCHEMA_SAMPLE_SIZE = 100

    def __init__(self, config: FileItemConfig):
        """
        Initialize JSON connector.

        Args:
            config: FileItemConfig with JSON-specific options
        """
        super().__init__(config)
        # JSON format: "array" (default) or "jsonl" (JSON Lines).
        # Any value other than "jsonl" is handled as the array format.
        self.json_format = self.config.options.get("format", "array")

    def extract_data(
        self,
        dataset_name: str,
        extract_config: ExtractConfig,
    ) -> Generator[list[DatabaseRecord], None, None]:
        """
        Extract data from JSON file.

        Args:
            dataset_name: File path or identifier
            extract_config: Extraction configuration; ``batch_size`` falls
                back to 50000 when unset or zero

        Yields:
            Batches of database records, each holding at most ``batch_size``
            records

        Raises:
            json.JSONDecodeError: If the file (or a JSON Lines line) is not
                valid JSON
            ValueError: If the array format is selected but the document's
                top-level value is not a list
        """
        path = self.get_effective_path(dataset_name)
        batch_size = extract_config.batch_size or self._DEFAULT_BATCH_SIZE

        logger.info(
            f"Extracting data from JSON file: {path} (format: {self.json_format})"
        )

        # NOTE(review): get_file_handle is assumed to yield a local path that
        # open() accepts (that is how the value is used here) - confirm
        # against extral.connectors.file.utils.
        with get_file_handle(path, "r") as file_path:
            # The file is opened in this generator (not in a helper) so it is
            # closed deterministically if the consumer stops iterating early.
            with open(file_path, "r", encoding="utf-8") as f:
                if self.json_format == "jsonl":
                    # JSON Lines format - one JSON object per line, parsed lazily
                    records = self._iter_jsonl_records(f)
                    source_label = "JSON Lines"
                else:
                    # Array format - entire file is a JSON array
                    records = self._load_json_array(f)
                    source_label = "JSON array"

                batch: list[DatabaseRecord] = []
                row_count = 0
                for record in records:
                    # Convert to DatabaseRecord format (all values as strings)
                    batch.append(self._convert_to_database_record(record))
                    row_count += 1

                    if len(batch) >= batch_size:
                        logger.debug(f"Yielding batch of {len(batch)} records")
                        yield batch
                        batch = []

                # Yield remaining records
                if batch:
                    logger.debug(f"Yielding final batch of {len(batch)} records")
                    yield batch

                logger.info(f"Extracted {row_count} records from {source_label} file")

    def load_data(
        self,
        dataset_name: str,
        data: list[DatabaseRecord],
        load_config: LoadConfig,
    ) -> None:
        """
        Load data to JSON file.

        Args:
            dataset_name: Target file path
            data: Records to write
            load_config: Load configuration (only REPLACE strategy supported)

        Raises:
            ValueError: If the connector is configured with an HTTP path, or
                if the load strategy is not REPLACE
        """
        if self.config.http_path:
            raise ValueError("Cannot write to HTTP/HTTPS URLs")

        path = self.get_effective_path(dataset_name)
        strategy = load_config.strategy

        if strategy != LoadStrategy.REPLACE:
            raise ValueError(
                f"JSON connector only supports REPLACE strategy, got {strategy}"
            )

        logger.info(
            f"Loading {len(data)} records to JSON file: {path} (format: {self.json_format})"
        )

        file_path = Path(path)

        if self.json_format == "jsonl":
            # JSON Lines format: one serialized record per line
            with open(file_path, "w", encoding="utf-8") as f:
                f.writelines(
                    json.dumps(self._convert_from_database_record(record)) + "\n"
                    for record in data
                )
        else:
            # Array format: convert everything before opening the file so a
            # conversion failure cannot leave a truncated target behind.
            records = [self._convert_from_database_record(record) for record in data]
            with open(file_path, "w", encoding="utf-8") as f:
                json.dump(records, f, indent=2)

        logger.info(f"Successfully wrote {len(data)} records to JSON file")

    def infer_schema(self, dataset_name: str) -> Dict[str, Dict[str, Any]]:
        """
        Infer schema from JSON file by sampling records.

        Up to the first 100 records are inspected. A column's type comes from
        the first non-null value seen for it (``VARCHAR(255)`` if every
        sampled value is null), and a column is nullable as soon as any
        sampled value for it is null.

        Args:
            dataset_name: File path

        Returns:
            Schema dictionary mapping column name to a dict with ``"type"``
            (SQL type name) and ``"nullable"`` (bool). Empty when the array
            format is selected and the document is not a list.

        Raises:
            json.JSONDecodeError: If the sampled content is not valid JSON
        """
        path = self.get_effective_path(dataset_name)
        sample_limit = self._SCHEMA_SAMPLE_SIZE
        sample_records: List[Dict[str, Any]] = []

        with get_file_handle(path, "r") as file_path:
            with open(file_path, "r", encoding="utf-8") as f:
                if self.json_format == "jsonl":
                    # Count parsed records, not raw lines, so blank lines do
                    # not shrink the sample.
                    for record in self._iter_jsonl_records(f):
                        sample_records.append(record)
                        if len(sample_records) >= sample_limit:
                            break
                else:
                    # The whole array has to be parsed; only a prefix is kept.
                    data = json.load(f)
                    if isinstance(data, list):
                        sample_records = data[:sample_limit]

        schema: Dict[str, Dict[str, Any]] = {}
        # Columns whose only values so far were null: their type is still the
        # placeholder from _infer_type(None) and may be refined later.
        untyped: set[str] = set()

        for record in sample_records:
            for key, value in record.items():
                if key not in schema:
                    schema[key] = {
                        "type": self._infer_type(value),
                        "nullable": value is None,
                    }
                    if value is None:
                        untyped.add(key)
                elif value is None:
                    schema[key]["nullable"] = True
                elif key in untyped:
                    # First real value for a column first seen as null.
                    schema[key]["type"] = self._infer_type(value)
                    untyped.discard(key)

        return schema

    @staticmethod
    def _iter_jsonl_records(lines: Any) -> Generator[Any, None, None]:
        """Yield the parsed value of each non-blank line of an iterable of text lines.

        A malformed line is logged and the ``json.JSONDecodeError`` re-raised.
        """
        for line in lines:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError as e:
                logger.error(f"Error parsing JSON line: {e}")
                raise
            yield record

    @staticmethod
    def _load_json_array(f: Any) -> list[Any]:
        """Parse an open text file as one JSON document and require a top-level list.

        Raises:
            json.JSONDecodeError: If the content is not valid JSON (logged first)
            ValueError: If the top-level value is not a list
        """
        try:
            data = json.load(f)
        except json.JSONDecodeError as e:
            logger.error(f"Error parsing JSON file: {e}")
            raise

        if not isinstance(data, list):
            raise ValueError(f"Expected JSON array, got {type(data).__name__}")
        return data

    def _convert_to_database_record(self, record: Dict[str, Any]) -> DatabaseRecord:
        """Convert JSON record to DatabaseRecord format (all values as strings).

        ``None`` stays ``None``; dicts and lists are serialized with
        ``json.dumps``; every other value goes through ``str()``.
        """
        db_record: DatabaseRecord = {}
        for key, value in record.items():
            if value is None:
                db_record[key] = None
            elif isinstance(value, (dict, list)):
                # Convert complex types to JSON strings
                db_record[key] = json.dumps(value)
            else:
                # Convert primitive types to strings
                db_record[key] = str(value)
        return db_record

    def _convert_from_database_record(self, record: DatabaseRecord) -> Dict[str, Any]:
        """Convert DatabaseRecord back to JSON-compatible format.

        A value is replaced by its parsed form only when it parses as a JSON
        object or array; anything else (including text that parses as a JSON
        number, boolean or string) is kept exactly as stored.
        """
        json_record: Dict[str, Any] = {}
        for key, value in record.items():
            if value is None:
                json_record[key] = None
                continue
            try:
                parsed = json.loads(value)
            except (json.JSONDecodeError, TypeError):
                # Not valid JSON text (or not text at all): keep unchanged
                json_record[key] = value
            else:
                json_record[key] = parsed if isinstance(parsed, (dict, list)) else value
        return json_record

    def _infer_type(self, value: Any) -> str:
        """Infer SQL type from Python value.

        Returns one of ``BOOLEAN``, ``INTEGER``, ``BIGINT``,
        ``DOUBLE PRECISION``, ``VARCHAR(255)``, ``TEXT`` or ``JSON``;
        ``None`` and unrecognized types map to ``VARCHAR(255)``.
        """
        if value is None:
            return "VARCHAR(255)"
        # bool must be tested before int: bool is a subclass of int.
        if isinstance(value, bool):
            return "BOOLEAN"
        if isinstance(value, int):
            # Magnitudes above the signed 32-bit maximum need BIGINT
            return "BIGINT" if abs(value) > 2147483647 else "INTEGER"
        if isinstance(value, float):
            return "DOUBLE PRECISION"
        if isinstance(value, str):
            return "TEXT" if len(value) > 255 else "VARCHAR(255)"
        if isinstance(value, (dict, list)):
            return "JSON"
        return "VARCHAR(255)"