Extract, Transform, Load¶
The ETL engine orchestrates the movement of data from source files into a
DataSource. It uses small composable Sequences for the extraction,
validation, and transformation stages and returns a structured
ETLResult you can inspect.
Important
Versions 0.6+ introduce a number of API changes covered by the
migration guide. At a glance: schemas now use Column
instances, validators emit structured messages, ETLPipeline.run()
returns an ETLResult instead of raising, and SimpleETLFactory covers
the common zero-subclass case.
Pipeline components¶
ETLFactory.build_pipeline(name, files)
        │
        ▼
ExtractionSequence ─▶ ValidationSequence ─▶ TransformationSequence ─▶ Loader
        │                     │                       │
        └────── dtype coercion ─────── ValidationMessage[] ──────── DataSource
| Component | Role |
|---|---|
| Schema | Declares columns, dtypes, primary keys, and the source file metadata. |
| Extractor | Reads one file (or one sheet) and applies dtype coercion. |
| Validator | Emits structured ValidationMessages. |
| Transformer | Reshapes the validated data into the final layout. |
| Loader | Materialises the data as a DataSource. |
| ETLFactory | Composes the four sequences for one dataset. |
| ETLPipeline | Runs the composed pipeline and returns an ETLResult. |
Quickstart with SimpleETLFactory¶
For most projects you no longer need to subclass ETLFactory. Point
SimpleETLFactory at your schemas and it will pick the right extractors
from the registry and assemble sensible default validators
(RequiredColumnsValidator, SchemaValidator, and
PrimaryKeyValidator when a primary key is declared).
from algomancy_data import (
    Column,
    DataType,
    FileExtension,
    Schema,
    SimpleETLFactory,
)
from algomancy_data.schema import SchemaType


class OrdersSchema(Schema):
    _FILENAME = "orders"
    _EXTENSION = FileExtension.CSV
    _SCHEMA_TYPE = SchemaType.SINGLE

    ID = Column(name="id", dtype=DataType.STRING, primary_key=True)
    QTY = Column(name="qty", dtype=DataType.INTEGER)
    DISCOUNT = Column(name="discount", dtype=DataType.FLOAT, optional=True, default=0.0)


factory = SimpleETLFactory([OrdersSchema])
result = factory.build_pipeline("orders_2026", files={"orders": orders_file}).run()

if result.is_success:
    use(result.datasource)
else:
    for message in result.messages:
        print(message)
Schemas¶
Schemas are declared with Column instances as class attributes. Each
Column carries its name, dtype, and optional metadata
(optional, primary_key, default, nullable, unique,
description):
from algomancy_data import Column, DataType, FileExtension, Schema
from algomancy_data.schema import SchemaType


class OrdersSchema(Schema):
    _FILENAME = "orders"
    _EXTENSION = FileExtension.CSV
    _SCHEMA_TYPE = SchemaType.SINGLE

    ID = Column(name="id", dtype=DataType.STRING, primary_key=True)
    QTY = Column(name="qty", dtype=DataType.INTEGER)
    DATE = Column(name="order_date", dtype=DataType.DATETIME)
Use OrdersSchema.columns(), required_columns(), optional_columns(),
and primary_key() to introspect the schema. The legacy
_DATATYPES = {...} form still works but emits a DeprecationWarning.
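To make the introspection helpers more concrete, the following is a minimal stand-alone sketch of how columns declared as class attributes could be discovered. It does not import algomancy_data: SketchColumn, SketchSchema, and OrdersSketch are hypothetical stand-ins, and the library's own Schema may well be implemented differently.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class SketchColumn:
    """Hypothetical stand-in for Column; not the library class."""
    name: str
    dtype: str
    optional: bool = False
    primary_key: bool = False
    default: Any = None


class SketchSchema:
    """Hypothetical base class that collects SketchColumn class attributes."""

    @classmethod
    def columns(cls) -> list[SketchColumn]:
        # Walk the MRO from base to subclass so inherited columns come first;
        # a redefinition of the same attribute name replaces the earlier one.
        found: dict[str, SketchColumn] = {}
        for klass in reversed(cls.__mro__):
            for attr, value in vars(klass).items():
                if isinstance(value, SketchColumn):
                    found[attr] = value
        return list(found.values())

    @classmethod
    def required_columns(cls) -> list[str]:
        return [c.name for c in cls.columns() if not c.optional]

    @classmethod
    def optional_columns(cls) -> list[str]:
        return [c.name for c in cls.columns() if c.optional]

    @classmethod
    def primary_key(cls) -> list[str]:
        return [c.name for c in cls.columns() if c.primary_key]


class OrdersSketch(SketchSchema):
    ID = SketchColumn("id", "string", primary_key=True)
    QTY = SketchColumn("qty", "integer")
    DISCOUNT = SketchColumn("discount", "float", optional=True, default=0.0)


print(OrdersSketch.required_columns())
print(OrdersSketch.optional_columns())
print(OrdersSketch.primary_key())
```

Because class bodies preserve definition order, the helpers return columns in the order they were declared.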
Extractors¶
The framework ships extractors for CSV, JSON, and XLSX (single and
multi-sheet). ETLFactory.create_extraction_sequence() picks the right
one for each schema based on a (FileExtension, SchemaType) lookup in
the extractor registry.
| Extractor | Description |
|---|---|
| CSVSingleExtractor | Single table from a CSV. |
| | Single table from a JSON file. |
| | Multiple related tables from a nested JSON file. |
| | Single sheet from an XLSX file. |
| | Several sheets from an XLSX file. |
| | A pre-built |
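The (FileExtension, SchemaType) lookup described above can be pictured as a dictionary keyed by a tuple. The sketch below is a generic illustration of that registry pattern in plain Python; Ext, Kind, REGISTRY, register, and pick_extractor are hypothetical names, and the factories only return labels instead of reading files.

```python
from enum import Enum


class Ext(Enum):          # hypothetical stand-in for FileExtension
    CSV = "csv"
    JSON = "json"
    XLSX = "xlsx"


class Kind(Enum):         # hypothetical stand-in for SchemaType
    SINGLE = "single"
    MULTI = "multi"


# Registry: (extension, schema type) -> extractor factory.
REGISTRY = {}


def register(ext: Ext, kind: Kind):
    """Decorator that records a factory under its (ext, kind) key."""
    def decorator(factory):
        REGISTRY[(ext, kind)] = factory
        return factory
    return decorator


@register(Ext.CSV, Kind.SINGLE)
def csv_single(path: str) -> str:
    return f"csv-single:{path}"


@register(Ext.XLSX, Kind.MULTI)
def xlsx_multi(path: str) -> str:
    return f"xlsx-multi:{path}"


def pick_extractor(ext: Ext, kind: Kind):
    """Look up the factory, failing loudly for unsupported combinations."""
    try:
        return REGISTRY[(ext, kind)]
    except KeyError:
        raise LookupError(
            f"no extractor registered for {ext.name}/{kind.name}"
        ) from None


print(pick_extractor(Ext.CSV, Kind.SINGLE)("orders.csv"))
```

Keying on the pair rather than on the extension alone is what lets one file type map to different extractors for single-table and multi-table schemas.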
You only need to override create_extraction_sequence() if you need
custom parameters (e.g. a non-default CSV separator):
from algomancy_data import (
    CSVSingleExtractor,
    ETLFactory,
    ExtractionSequence,
)


class MyETLFactory(ETLFactory):
    def create_extraction_sequence(self, files):
        seq = ExtractionSequence(logger=self.logger)
        seq.add_extractor(
            CSVSingleExtractor(
                file=files["orders"],
                schema=self.get_schema("orders"),
                separator=",",
            )
        )
        return seq
Validators¶
Validators emit ValidationMessages with structured location fields
(table, column, row, code). The ValidationSequence aggregates
them and returns a ValidationResult whose is_valid is governed by a
configurable halt_on severity (default CRITICAL).
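As an illustration of how a halt_on threshold can decide validity, here is a small self-contained sketch. Sev and Msg are hypothetical stand-ins for ValidationSeverity and ValidationMessage; the INFO and WARNING levels and the numeric ordering are assumptions made for the example, since only ERROR and CRITICAL are named in this guide.

```python
from dataclasses import dataclass
from enum import IntEnum


class Sev(IntEnum):       # hypothetical stand-in for ValidationSeverity
    INFO = 10             # assumed level
    WARNING = 20          # assumed level
    ERROR = 30
    CRITICAL = 40


@dataclass(frozen=True)
class Msg:                # hypothetical stand-in for ValidationMessage
    severity: Sev
    text: str


def is_valid(messages: list[Msg], halt_on: Sev = Sev.CRITICAL) -> bool:
    """Valid when no message reaches the halt_on severity."""
    return all(m.severity < halt_on for m in messages)


messages = [
    Msg(Sev.WARNING, "unexpected column"),
    Msg(Sev.ERROR, "negative qty"),
]

print(is_valid(messages))                      # default threshold: CRITICAL
print(is_valid(messages, halt_on=Sev.ERROR))   # stricter threshold
```

Under these assumptions the same message list passes with the default threshold and fails once halt_on is lowered to ERROR.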
Built-in validators:
| Validator | Purpose |
|---|---|
| RequiredColumnsValidator | Every required column is present per schema. |
| SchemaValidator | dtype + unexpected-column check per schema. |
| PrimaryKeyValidator | Uniqueness + non-null over each declared primary key. |
| | Per-column unique / null checks. |
| ForeignKeyValidator | Cross-table referential integrity. |
| | Each extracted DataFrame is non-empty. |
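To show what a uniqueness plus non-null check over a key amounts to, the sketch below works on plain lists of dictionaries. It is not the library's validator: primary_key_problems is a hypothetical helper, and a real implementation would operate on the extracted DataFrames.

```python
from collections import Counter


def primary_key_problems(rows: list[dict], key_cols: list[str]) -> dict:
    """Return row indices with NULL key parts and row indices with duplicated keys."""
    keys = [tuple(row.get(col) for col in key_cols) for row in rows]
    null_rows = [i for i, key in enumerate(keys) if any(v is None for v in key)]
    # Only complete keys take part in the duplicate count.
    counts = Counter(key for key in keys if all(v is not None for v in key))
    dup_rows = [i for i, key in enumerate(keys) if counts.get(key, 0) > 1]
    return {"null": null_rows, "duplicate": dup_rows}


rows = [{"id": "a"}, {"id": "b"}, {"id": "a"}, {"id": None}]
problems = primary_key_problems(rows, ["id"])
print(problems)
```

Reporting row indices rather than a single pass/fail flag mirrors the structured row field that validation messages carry.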
The defaults from ETLFactory.create_validation_sequence() already
cover the most common needs; add your own with:
from algomancy_data import (
    ETLFactory,
    ForeignKeyValidator,
    ValidationSeverity,
)


class MyETLFactory(ETLFactory):
    def create_validation_sequence(self):
        seq = super().create_validation_sequence()
        seq.add_validator(
            ForeignKeyValidator("order_lines", "product_id", "products", "id")
        )
        seq.halt_on = ValidationSeverity.ERROR
        return seq
Writing a custom validator¶
from algomancy_data import Validator, ValidationSeverity


class NonNegativeQuantityValidator(Validator):
    def __init__(self, table: str, column: str = "qty") -> None:
        super().__init__()
        self.table = table
        self.column = column

    def validate(self, data):
        df = data[self.table]
        negatives = df.index[df[self.column] < 0].tolist()
        for row in negatives:
            self.add_message(
                ValidationSeverity.ERROR,
                f"Negative quantity in {self.table}.{self.column}",
                table=self.table,
                column=self.column,
                row=int(row),
                code="NEGATIVE_QTY",
            )
        return self.messages
Transformers¶
Transformers run after validation succeeds and reshape the data into the form expected downstream. The framework provides:
| Transformer | Purpose |
|---|---|
| | Pass-through; no changes. |
| | dropna + lowercase columns. |
| | Joins tables together. |
| | Injects missing optional columns using |
Subclass Transformer for project-specific reshaping.
Important
Programmer errors raised from inside a transformer (KeyError,
AttributeError, TypeError …) propagate from ETLPipeline.run() — they
are not converted to ETLResult(status='failed') because they indicate
real defects rather than data-quality issues.
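The split between reported failures and propagated defects can be sketched in a few lines of plain Python. Everything here is hypothetical: DataQualityError, SketchResult, and run are stand-ins, and which exception types the library actually treats as data-quality failures is not stated in this guide.

```python
from dataclasses import dataclass
from typing import Callable, Optional


class DataQualityError(Exception):
    """Hypothetical marker for problems in the input data."""


@dataclass
class SketchResult:       # hypothetical stand-in for ETLResult
    status: str
    raised: Optional[Exception] = None


def run(step: Callable[[dict], dict], data: dict) -> SketchResult:
    try:
        step(data)
    except DataQualityError as exc:
        # Expected failure mode: report it instead of raising.
        return SketchResult(status="failed", raised=exc)
    # KeyError, TypeError, ... are deliberately not caught and propagate.
    return SketchResult(status="success")


def bad_data_step(data: dict) -> dict:
    raise DataQualityError("orders file is empty")


def buggy_step(data: dict) -> dict:
    return {"total": data["missing_column"]}   # KeyError: a code defect


def propagates(step: Callable[[dict], dict]) -> bool:
    """Demo helper: True when run() lets a KeyError escape."""
    try:
        run(step, {})
    except KeyError:
        return True
    return False


print(run(bad_data_step, {}).status)
print(propagates(buggy_step))
```

Catching only the narrow, expected exception type keeps genuine bugs loud, which is the design choice the note above describes.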
Relational cascade cleanup¶
Real-world input data is often incomplete: an order may reference a
product that wasn’t in the latest export, a parent record may have lost
all its children mid-pipeline, and so on. The CascadeDropTransformer
declaratively cleans up such inconsistencies by walking foreign-key
relations declared on your schemas and dropping rows whose references
are unsatisfied. Drops surface as aggregated ValidationMessages with
severity ValidationSeverity.ERROR, so they are visible but non-halting by default.
Declaring relations on Column¶
Foreign-key relations live on the child column. Two opt-in flags control parent-side cascade behavior:
| Field | Purpose |
|---|---|
| foreign_key | Declares the FK reference. |
| parent_requires_child | Drop the parent when it has zero referencing children on this relation. |
| | Enable partial-loss detection (drop the parent when it loses some of its children mid-pipeline). Requires a paired CascadeSnapshot. |
from algomancy_data import Column, DataType, FileExtension, Schema
from algomancy_data.schema import SchemaType


class ProductSchema(Schema):
    _FILENAME = "product"
    _EXTENSION = FileExtension.CSV
    _SCHEMA_TYPE = SchemaType.SINGLE

    ID = Column(name="id", dtype=DataType.STRING, primary_key=True)


class OrderSchema(Schema):
    _FILENAME = "order"
    _EXTENSION = FileExtension.CSV
    _SCHEMA_TYPE = SchemaType.SINGLE

    ID = Column(name="id", dtype=DataType.STRING, primary_key=True)
    PRODUCT_ID = Column(
        name="product_id",
        dtype=DataType.STRING,
        foreign_key=("product", "id"),
        parent_requires_child=True,
    )
The three drop rules¶
CascadeDropTransformer.transform(data) iterates to fixpoint, applying
on each pass:
| Rule | Code | Trigger |
|---|---|---|
| Orphan-child drop | CASCADE_ORPHAN_DROP | Child row’s FK is not in the parent’s referenced column set. Always on. |
| Required-child parent drop | | Parent row has zero referencing children. Opt-in via parent_requires_child. |
| Partial-loss parent drop | | Parent row lost some (but not all) of its children vs. a baseline snapshot. Opt-in via |
NULL values in an FK column are treated as “no reference” and never
trigger an orphan drop. Combine with nullable=False on the column if
you want NULLs to be rejected by MissingValueValidator instead.
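The fixpoint iteration can be illustrated without the library. The sketch below covers only the orphan-child and required-child rules (partial-loss needs a baseline and is left out) and works on lists of dictionaries; Rel and cascade are hypothetical names, not the CascadeDropTransformer implementation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Rel:                # hypothetical stand-in for a declared relation
    child: str
    child_col: str
    parent: str
    parent_col: str
    parent_requires_child: bool = False


def cascade(tables: dict[str, list[dict]], rels: list[Rel]) -> dict[str, list[dict]]:
    """Apply orphan-child and required-child drops until nothing changes."""
    tables = {name: list(rows) for name, rows in tables.items()}  # keep input intact
    changed = True
    while changed:
        changed = False
        for rel in rels:
            parent_keys = {r[rel.parent_col] for r in tables[rel.parent]}
            # Rule 1: drop children whose non-NULL FK has no matching parent.
            kept = [
                r for r in tables[rel.child]
                if r[rel.child_col] is None or r[rel.child_col] in parent_keys
            ]
            if len(kept) != len(tables[rel.child]):
                tables[rel.child] = kept
                changed = True
            if rel.parent_requires_child:
                # Rule 2: drop parents that no remaining child references.
                referenced = {r[rel.child_col] for r in tables[rel.child]}
                kept = [r for r in tables[rel.parent] if r[rel.parent_col] in referenced]
                if len(kept) != len(tables[rel.parent]):
                    tables[rel.parent] = kept
                    changed = True
    return tables


tables = {
    "customers": [{"id": "c1"}],
    "orders": [{"id": "o1", "customer_id": "c1"}, {"id": "o2", "customer_id": "c9"}],
    "products": [{"id": "p1"}, {"id": "p2"}],
    "lines": [
        {"id": "l1", "order_id": "o1", "product_id": "p1"},
        {"id": "l2", "order_id": "o2", "product_id": "p2"},
    ],
}
rels = [
    Rel("lines", "order_id", "orders", "id"),
    Rel("orders", "customer_id", "customers", "id"),
    Rel("lines", "product_id", "products", "id", parent_requires_child=True),
]
result = cascade(tables, rels)
print({name: [r["id"] for r in rows] for name, rows in result.items()})
```

In this data the first pass only removes order o2 (its customer is missing); line l2 and then product p2 fall out on the second pass, which is why a single pass is not enough.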
Wiring it into the pipeline¶
CascadeDropTransformer is a regular Transformer — add it to the
transformation sequence:
from algomancy_data import (
    CascadeDropTransformer,
    SimpleETLFactory,
)

factory = SimpleETLFactory(
    schemas=[ProductSchema, OrderSchema],
    transformers=[CascadeDropTransformer(schemas=[ProductSchema, OrderSchema])],
)
result = factory.build_pipeline("orders_2026", files).run()

for message in result.messages:
    if message.code and message.code.startswith("CASCADE_"):
        print(message)
        # e.g. "ERROR: 3 row(s) dropped from 'order' by CASCADE_ORPHAN_DROP …"
Partial-loss with CascadeSnapshot¶
Partial-loss detection compares the current child-counts to a baseline
captured before any drop-capable transformer runs. Place a
CascadeSnapshot early in the transformation sequence and pass it to
the cascade transformer:
from algomancy_data import CascadeDropTransformer, CascadeSnapshot, SimpleETLFactory

snapshot = CascadeSnapshot(schemas=[ProductSchema, OrderSchema])

factory = SimpleETLFactory(
    schemas=[ProductSchema, OrderSchema],
    transformers=[
        snapshot,
        # ... any user transformers that may drop rows go here ...
        CascadeDropTransformer(
            schemas=[ProductSchema, OrderSchema],
            snapshot=snapshot,
        ),
    ],
)
The snapshot is a no-op on data — it only records baselines — so it is safe to leave in place even when no other transformer drops rows.
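The baseline comparison behind partial-loss detection can be sketched with a Counter of child rows per parent key. This is an illustration of the idea only; child_counts and partial_loss are hypothetical helpers and do not reflect how CascadeSnapshot stores its baselines.

```python
from collections import Counter


def child_counts(children: list[dict], fk: str) -> Counter:
    """Count child rows per referenced parent key, ignoring NULL references."""
    return Counter(row[fk] for row in children if row[fk] is not None)


def partial_loss(baseline: Counter, current: Counter) -> set:
    """Parent keys that kept at least one child but fewer than at baseline."""
    return {key for key, before in baseline.items() if 0 < current[key] < before}


orders = [
    {"id": "o1", "product_id": "p1"},
    {"id": "o2", "product_id": "p1"},
    {"id": "o3", "product_id": "p2"},
    {"id": "o4", "product_id": "p3"},
]
baseline = child_counts(orders, "product_id")   # taken before any drops

# A later step drops o2 (p1 loses one of two) and o4 (p3 loses its only child).
remaining = [o for o in orders if o["id"] not in {"o2", "o4"}]
current = child_counts(remaining, "product_id")

lost = partial_loss(baseline, current)
print(lost)
```

Here p1 is flagged because it went from two children to one, while p3 lost all of its children and therefore falls under the required-child rule instead.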
Overriding schema-derived relations¶
If you need to add or override a relation without changing the schema,
pass extra_relations= to the transformer. Overrides match on
(child_table, child_cols) and replace any schema-derived entry with
the same key:
from algomancy_data import CascadeDropTransformer, Relation

CascadeDropTransformer(
    schemas=[ProductSchema, OrderSchema],
    extra_relations=[
        Relation("order", ("product_id",), "product", ("id",),
                 parent_requires_child=False),  # turn the schema flag off
    ],
)
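The override rule, matching on (child_table, child_cols) and replacing the schema-derived entry, behaves like a dictionary merge where later entries win. A minimal sketch, assuming a hypothetical Rel dataclass and merge_relations helper in place of the library's Relation handling:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Rel:                # hypothetical stand-in for Relation
    child_table: str
    child_cols: tuple
    parent_table: str
    parent_cols: tuple
    parent_requires_child: bool = False


def merge_relations(derived: list[Rel], extra: list[Rel]) -> list[Rel]:
    """Later entries win when they share the (child_table, child_cols) key."""
    merged: dict[tuple, Rel] = {}
    for rel in [*derived, *extra]:
        merged[(rel.child_table, rel.child_cols)] = rel
    return list(merged.values())


derived = [
    Rel("order", ("product_id",), "product", ("id",), parent_requires_child=True),
]
extra = [
    Rel("order", ("product_id",), "product", ("id",), parent_requires_child=False),
    Rel("order", ("customer_id",), "customer", ("id",)),
]
relations = merge_relations(derived, extra)
print(relations)
```

The first extra entry replaces the schema-derived relation in place, and the second is appended as a new relation.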
Auto-wiring ForeignKeyValidator from the same declarations¶
ForeignKeyValidator.from_schemas([...]) returns one validator per
relation, derived from the same Column.foreign_key declarations. Use
it together with CascadeDropTransformer so that violations are
reported (validator) and cleaned (transformer) without duplicating the
relation list:
from algomancy_data import ForeignKeyValidator
validators = ForeignKeyValidator.from_schemas([ProductSchema, OrderSchema])
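Deriving one entry per relation from column declarations can be pictured as a flat scan over every column that carries a foreign key. The sketch below uses a hypothetical Col dataclass and a plain dictionary of tables; it illustrates the idea and is not the from_schemas implementation.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Col:                # hypothetical stand-in for Column
    name: str
    foreign_key: Optional[tuple] = None   # (parent_table, parent_column)


SCHEMAS = {
    "product": [Col("id")],
    "order": [Col("id"), Col("product_id", foreign_key=("product", "id"))],
}


def derive_relations(schemas: dict) -> list[tuple]:
    """One (child_table, child_col, parent_table, parent_col) per declared FK."""
    return [
        (table, col.name, *col.foreign_key)
        for table, cols in schemas.items()
        for col in cols
        if col.foreign_key is not None
    ]


print(derive_relations(SCHEMAS))
```

Keeping the relation on the child column means the validator and the cascade transformer can both read the same single declaration.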
Loader¶
DataSourceLoader is the default and produces a DataSource whose
validation_messages carry the messages collected during validation.
Subclass Loader if you need a custom destination type.
ETLPipeline.run() and ETLResult¶
ETLPipeline.run() never raises on data-quality problems. Instead it
returns an ETLResult:
@dataclass
class ETLResult:
    status: Literal["success", "failed"]
    datasource: BaseDataSource | None
    validation_result: ValidationResult | None
    raised: Exception | None
Convenience accessors: result.is_success, result.is_failure,
result.messages, and result.validation_result.as_dataframe().
Putting it all together¶
from algomancy_data import (
    Column,
    DataType,
    FileExtension,
    ForeignKeyValidator,
    Schema,
    SimpleETLFactory,
)
from algomancy_data.schema import SchemaType


class OrdersSchema(Schema):
    _FILENAME = "orders"
    _EXTENSION = FileExtension.CSV
    _SCHEMA_TYPE = SchemaType.SINGLE

    ID = Column(name="id", dtype=DataType.STRING, primary_key=True)
    PRODUCT_ID = Column(name="product_id", dtype=DataType.STRING)


class ProductsSchema(Schema):
    _FILENAME = "products"
    _EXTENSION = FileExtension.CSV
    _SCHEMA_TYPE = SchemaType.SINGLE

    ID = Column(name="id", dtype=DataType.STRING, primary_key=True)


class OrdersFactory(SimpleETLFactory):
    def create_validation_sequence(self):
        seq = super().create_validation_sequence()
        seq.add_validator(
            ForeignKeyValidator("orders", "product_id", "products", "id")
        )
        return seq


factory = OrdersFactory([OrdersSchema, ProductsSchema])
result = factory.build_pipeline("Q1", files=files).run()