ETL-Factory

ETL pipeline composition and abstract factory.

This module defines ETLPipeline, which orchestrates the Extract-Validate-Transform-Load steps, and ETLFactory, which builds the pipeline components for a concrete dataset configuration.

ETLPipeline.run() returns an ETLResult describing the outcome of the job. Data-quality failures (validation, missing/malformed inputs) are reported via status='failed' rather than as exceptions. Programmer errors (unexpected KeyError/AttributeError/TypeError etc. from user-supplied components) still propagate so that real defects are not masked.

class algomancy_data.etl.ETLResult(status, datasource=None, validation_result=None, raised=None)[source]

Bases: object

Structured outcome of an ETLPipeline.run() invocation.

status: Literal['success', 'failed']

'success' if the run completed and validation passed; 'failed' if a data-quality issue was detected.

datasource: BASEDATASOURCE | None = None

Loaded destination object (None on failure).

validation_result: ValidationResult | None = None

Messages and counts from the validation step. Always present, even when extraction never produced data.

raised: Exception | None = None

Original exception when a recognised data-quality exception was caught and converted to a failure. None otherwise. Programmer errors are not captured here — they propagate from run() unchanged.

property is_success: bool
property is_failure: bool
property messages: List[ValidationMessage]

Convenience accessor for validation_result.messages.
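A minimal sketch of how a caller might branch on a result. ETLResultSketch and ValidationResultSketch are hypothetical stand-ins that mirror the documented field names; they are not the library's classes. The bodies of is_success, is_failure and messages are assumptions inferred from the descriptions above, as is the summarise helper.

```python
from dataclasses import dataclass, field
from typing import List, Literal, Optional


@dataclass
class ValidationResultSketch:
    # Hypothetical stand-in: only a `messages` attribute is assumed here.
    messages: List[str] = field(default_factory=list)


@dataclass
class ETLResultSketch:
    # Field names follow the documented ETLResult signature.
    status: Literal["success", "failed"]
    datasource: Optional[object] = None
    validation_result: Optional[ValidationResultSketch] = None
    raised: Optional[Exception] = None

    @property
    def is_success(self) -> bool:
        return self.status == "success"

    @property
    def is_failure(self) -> bool:
        return self.status == "failed"

    @property
    def messages(self) -> List[str]:
        # Assumed behaviour: an empty list when no validation result exists.
        if self.validation_result is None:
            return []
        return self.validation_result.messages


def summarise(result: ETLResultSketch) -> str:
    """Turn a result into a one-line summary suitable for a log line."""
    if result.is_success:
        return "loaded"
    cause = f" (caused by {type(result.raised).__name__})" if result.raised else ""
    return f"failed with {len(result.messages)} message(s){cause}"
```

The point of the pattern is that the caller inspects status (or is_success) rather than wrapping run() in a try/except for data-quality problems.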

class algomancy_data.etl.ETLPipeline(destination_name, extraction_sequence, validation_sequence, transformation_sequence, loader, logger)[source]

Bases: object

Coordinates a single end-to-end ETL job.

run()[source]

Execute the ETL job and return an ETLResult.

Orchestrates Extraction → Validation → Transformation → Load.

Returns:

status='success' with a loaded datasource when the job completes and validation passes; status='failed' (with messages on validation_result) when a data-quality issue is detected.

Return type:

ETLResult

Raises:

Exception – Programmer errors (KeyError, AttributeError, TypeError and anything else not classified as an expected data-quality failure) propagate so that real defects are not masked. Use validators for data-quality checks instead.
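The split between reported failures and propagated errors can be sketched as follows. This is an illustration of the idea only, not the library's implementation: DataQualityError, run_step and the two example steps are hypothetical names, and the documentation does not state which exception types are classified as data-quality failures.

```python
from typing import Callable, Dict, Optional


class DataQualityError(Exception):
    """Hypothetical marker type for expected data-quality failures."""


def run_step(step: Callable[[], object]) -> Dict[str, Optional[object]]:
    """Run one step, converting only recognised data-quality errors to a failure."""
    try:
        value = step()
    except DataQualityError as exc:
        # Expected failure: report it in the result instead of raising.
        return {"status": "failed", "value": None, "raised": exc}
    # Any other exception (KeyError, TypeError, ...) is not caught above,
    # so it propagates to the caller unchanged.
    return {"status": "success", "value": value, "raised": None}


def malformed_input() -> object:
    # Simulates an input problem that validators would normally report.
    raise DataQualityError("input file has no header row")


def buggy_component() -> object:
    # Simulates a programmer error inside a user-supplied component.
    return {}["missing"]
```

Calling run_step(malformed_input) yields a failed result that carries the original exception, while run_step(buggy_component) raises KeyError.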

exception algomancy_data.etl.ETLConstructionError(message)[source]

Bases: Exception

Raised when the ETL pipeline cannot be constructed.

class algomancy_data.etl.ETLFactory(schemas, logger=None)[source]

Bases: ABC

Factory that constructs ETL sequences and loader.

Provides sensible defaults so that subclasses override only the pieces they need to customise. Code written against the previous fully abstract contract keeps working: all four create_* methods remain overridable and are simply no longer decorated with @abstractmethod.
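The override-only-what-you-need pattern can be sketched with a small stand-in. FactorySketch and UppercaseFactory are hypothetical, and the string placeholders stand in for real transformer and loader objects. Only two of the four hooks are shown.

```python
from abc import ABC
from typing import Dict, List


class FactorySketch(ABC):
    """Hypothetical stand-in: hooks with defaults instead of @abstractmethod."""

    def create_transformation_sequence(self) -> List[str]:
        # Default hook: a single no-op step.
        return ["noop"]

    def create_loader(self) -> str:
        # Default hook: a placeholder for the default loader.
        return "default-loader"

    def build(self) -> Dict[str, object]:
        # Assembly always goes through the hooks, so overrides take effect.
        return {
            "transformers": self.create_transformation_sequence(),
            "loader": self.create_loader(),
        }


class UppercaseFactory(FactorySketch):
    # Override only the transformation hook; the loader default is inherited.
    def create_transformation_sequence(self) -> List[str]:
        return ["uppercase-names"]
```

Because no method in the sketch is abstract, a subclass that overrides nothing is still complete, and a subclass that overrides every hook behaves as it would have under a fully abstract base.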

property schemas_dct: Dict[str, Schema]

Return a mapping from file name to its schema.

get_schema(file_name)[source]

Return schema(s) for the given file name based on configuration.

Parameters:

file_name (str) – Logical file name as defined in a schema.

Returns:

Schema or mapping of sub-name to Schema depending on the configuration type (single or multi).

Raises:

ETLConstructionError – If no configuration exists or it is invalid.

Return type:

Dict[str, Schema] | Schema

create_extraction_sequence(files=None)[source]

Default extractor wiring keyed off the registry.

For each File in files, looks up the matching schema by name and selects an extractor class via get_extractor_class, keyed on the (extension, schema_type) pair. Override only when you need non-default extractor parameters (e.g. a CSV separator).

Raises:

ETLConstructionError – If no extractor is registered for a schema’s (extension, schema_type) pair.
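A registry lookup of this kind might look like the sketch below. Everything here is illustrative: REGISTRY, lookup_extractor, the extractor classes and the key values ("csv", "single", and so on) are assumptions, and ConstructionError merely stands in for ETLConstructionError. The real get_extractor_class signature is not shown in this reference.

```python
from typing import Dict, Tuple, Type


class ConstructionError(Exception):
    """Stand-in for ETLConstructionError."""


class CsvSingleExtractor:
    """Hypothetical extractor for single-table CSV files."""


class XlsxMultiExtractor:
    """Hypothetical extractor for multi-sheet XLSX files."""


# Hypothetical registry keyed by (extension, schema_type).
REGISTRY: Dict[Tuple[str, str], Type] = {
    ("csv", "single"): CsvSingleExtractor,
    ("xlsx", "multi"): XlsxMultiExtractor,
}


def lookup_extractor(extension: str, schema_type: str) -> Type:
    """Return the registered extractor class or raise a construction error."""
    key = (extension.lower(), schema_type)
    try:
        return REGISTRY[key]
    except KeyError:
        # Fail at construction time, before any data is read.
        raise ConstructionError(
            f"no extractor registered for ({extension!r}, {schema_type!r})"
        ) from None
```

Raising during construction means a missing registration surfaces when the pipeline is built rather than midway through a run.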

create_validation_sequence()[source]

Default validation sequence using the new built-in validators.

Includes (in order): RequiredColumnsValidator, SchemaValidator, and PrimaryKeyValidator. The PK validator skips schemas with no declared primary key internally (and decomposes MULTI schemas into per-group synthetic SINGLE schemas via _schema_table_map), so it is safe to append unconditionally. Subclasses can override to add domain-specific validators.
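The reason the primary-key check can be appended unconditionally is that it does nothing when no key is declared. A minimal sketch of that behaviour, assuming rows are plain dictionaries; duplicate_keys is a hypothetical helper, not the PrimaryKeyValidator API, and the MULTI-schema decomposition is not modelled.

```python
from typing import Dict, List, Optional, Sequence, Tuple

Row = Dict[str, object]


def duplicate_keys(
    rows: Sequence[Row], primary_key: Optional[Sequence[str]]
) -> List[Tuple[object, ...]]:
    """Return each primary-key value that occurs more than once."""
    if not primary_key:
        # No declared primary key: nothing to check, so report no problems.
        return []
    seen = set()
    duplicates: List[Tuple[object, ...]] = []
    for row in rows:
        key = tuple(row[column] for column in primary_key)
        if key in seen and key not in duplicates:
            duplicates.append(key)
        seen.add(key)
    return duplicates
```

A composite key is handled the same way: the tuple simply has one element per key column.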

create_transformation_sequence()[source]

Default to a no-op transformation step. Override to customise.

create_loader()[source]

Default loader materialises a DataSource.

build_pipeline(dataset_name, files, logger=None)[source]

Assemble and return an ETLPipeline instance.

Parameters:
  • dataset_name (str) – Destination dataset name.

  • files (Dict[str, File]) – Mapping of logical file names to File objects.

  • logger (Logger | None) – Logger for the pipeline; falls back to self.logger.

Returns:

ETLPipeline ready to run.

Return type:

ETLPipeline

class algomancy_data.etl.SimpleETLFactory(schemas, transformers=None, loader=None, logger=None)[source]

Bases: ETLFactory

Concrete factory for the common zero-subclass case.

Lets users build a working pipeline by simply pointing schemas at files. Custom transformers and/or a custom loader can be supplied without subclassing; everything else is inherited from ETLFactory’s defaults.

Parameters:
  • schemas (List[Type[Schema]]) – List of Schema classes describing the input files.

  • transformers (Optional[List[Transformer]]) – Optional ordered list of Transformer instances. Defaults to [NoopTransformer()].

  • loader (Optional[Loader]) – Optional custom Loader. Defaults to DataSourceLoader.

  • logger (Optional[Logger]) – Optional logger forwarded to extractors/validators.
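Supplying transformers through the constructor rather than by subclassing can be sketched as below. SimpleFactorySketch, noop, upper_name, add_suffix and apply_in_order are hypothetical names; transformers are modelled as plain functions on a dictionary, which is an assumption and not the library's Transformer interface.

```python
from typing import Callable, Dict, List, Optional

Record = Dict[str, object]
Transformer = Callable[[Record], Record]


def noop(record: Record) -> Record:
    # Stand-in for a no-op transformer: returns its input unchanged.
    return record


def upper_name(record: Record) -> Record:
    return {**record, "name": str(record["name"]).upper()}


def add_suffix(record: Record) -> Record:
    return {**record, "name": f"{record['name']}_x"}


class SimpleFactorySketch:
    def __init__(self, transformers: Optional[List[Transformer]] = None) -> None:
        # Fall back to a single no-op step when nothing is supplied.
        self._transformers = list(transformers) if transformers is not None else [noop]

    def create_transformation_sequence(self) -> List[Transformer]:
        # Return a copy so callers cannot mutate the factory's own list.
        return list(self._transformers)


def apply_in_order(record: Record, transformers: List[Transformer]) -> Record:
    """Apply each transformer to the output of the previous one."""
    for transform in transformers:
        record = transform(record)
    return record
```

The list order matters: with [upper_name, add_suffix] the suffix is appended after upper-casing, so it stays lower-case.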

create_transformation_sequence()[source]

Return the transformers supplied at construction; defaults to a no-op transformation step when none were given.

create_loader()[source]

Return the loader supplied at construction; defaults to a loader that materialises a DataSource.