Transformer

Transformation primitives for ETL pipelines.

Defines the abstract Transformer contract and a few simple concrete transformers, as well as a TransformationSequence to compose multiple transformers into a single pipeline step.

class algomancy_data.transformer.Transformer(name='Abstract Transformer', logger=None)

Bases: ABC

Base class for a transformation step operating on tabular data.

Subclasses implement transform and can mutate the provided mapping of DataFrames in-place or return a new mapping where applicable.

messages

ValidationMessages produced by this transformer during its most recent transform invocation. The ETL pipeline collects these from each transformer in the sequence and folds them into the run’s ValidationResult so they surface via ETLResult.messages.

abstractmethod transform(data)

Apply the transformation to the provided data.

Parameters:

data (dict[str, DataFrame]) – Mapping from table name to pandas DataFrame. Implementations may mutate this mapping in place or create/replace entries.
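The contract above can be illustrated with a minimal sketch. The base class here is a hypothetical stand-in written only to mirror the documented shape (a `name`, a `logger`, a `messages` list, and an abstract `transform(data)`); it is not the library's source, and `UppercaseNames` is an invented example subclass. Plain strings stand in for the library's ValidationMessage objects.

```python
from abc import ABC, abstractmethod

import pandas as pd


class Transformer(ABC):
    """Hypothetical stand-in that mirrors the documented contract; not the library's source."""

    def __init__(self, name: str = "Abstract Transformer", logger=None):
        self.name = name
        self.logger = logger
        self.messages: list = []  # messages from the most recent transform() call

    @abstractmethod
    def transform(self, data: dict[str, pd.DataFrame]) -> None:
        """Mutate `data` in place (or replace entries in it)."""


class UppercaseNames(Transformer):
    """Hypothetical subclass: upper-cases the 'name' column of the 'customers' table."""

    def __init__(self, logger=None):
        super().__init__(name="Uppercase names", logger=logger)

    def transform(self, data: dict[str, pd.DataFrame]) -> None:
        self.messages = []  # keep only this invocation's messages
        if "customers" not in data:
            # The real library emits ValidationMessage objects; a plain string stands in here.
            self.messages.append("table 'customers' not found")
            return
        df = data["customers"]
        data["customers"] = df.assign(name=df["name"].str.upper())  # replace the entry


tables = {"customers": pd.DataFrame({"name": ["ada", "bob"]})}
step = UppercaseNames()
step.transform(tables)
print(tables["customers"]["name"].tolist())  # ['ADA', 'BOB']
```

Resetting `messages` at the start of each call keeps the attribute scoped to the most recent invocation, matching the documented semantics of `messages`.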

algomancy_data.transformer.fill_empty(data)

Forward-fill missing values along each row, carrying the last valid value across the columns.

Parameters:

data (DataFrame) – DataFrame to fill.

Returns:

DataFrame with values forward-filled along axis=1.

Return type:

DataFrame
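A minimal sketch, assuming fill_empty simply delegates to pandas' `DataFrame.ffill(axis=1)` (the documented "forward-filled along axis=1" points that way, but the library's source is not shown here). Note that a leading NaN in a row has nothing to its left to copy, so it stays missing.

```python
import numpy as np
import pandas as pd


def fill_empty(data: pd.DataFrame) -> pd.DataFrame:
    # Within each row, carry the last non-missing value rightward into later NaN cells.
    return data.ffill(axis=1)


df = pd.DataFrame({"a": [1.0, np.nan], "b": [np.nan, 2.0], "c": [np.nan, np.nan]})
print(fill_empty(df))
# row 0 becomes 1.0, 1.0, 1.0; row 1 keeps NaN in 'a' (no value to its left), then 2.0, 2.0
```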

algomancy_data.transformer.drop_empty(data)

Drop rows containing any NA values.

Parameters:

data (DataFrame) – Input DataFrame.

Returns:

DataFrame without rows containing NA values.

Return type:

DataFrame
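A minimal sketch, assuming drop_empty is equivalent to pandas' `DataFrame.dropna(how="any")`, which matches the documented "any NA values" behaviour. The surviving rows keep their original index labels.

```python
import numpy as np
import pandas as pd


def drop_empty(data: pd.DataFrame) -> pd.DataFrame:
    # how="any" (pandas' default): a single NA anywhere in a row drops the whole row.
    return data.dropna(how="any")


df = pd.DataFrame({"id": [1, 2, 3], "qty": [5.0, np.nan, 7.0]})
print(drop_empty(df))  # keeps the rows with id 1 and 3 (index labels 0 and 2)
```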

class algomancy_data.transformer.NoopTransformer(logger=None)

Bases: Transformer

Transformer that returns the input data unchanged.

transform(data)

Apply the transformation to the provided data.

Parameters:

data (dict[str, DataFrame]) – Mapping from table name to pandas DataFrame. Implementations may mutate this mapping in place or create/replace entries.

class algomancy_data.transformer.CleanTransformer(logger=None)

Bases: Transformer

Basic cleanup: drop NA rows and normalize column names to lowercase.

transform(data)

Apply the transformation to the provided data.

Parameters:

data (dict[str, DataFrame]) – Mapping from table name to pandas DataFrame. Implementations may mutate this mapping in place or create/replace entries.
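The cleanup described for CleanTransformer can be sketched as a plain function over the table mapping. `clean_tables` is a hypothetical name, and the sketch assumes every table gets the same treatment (drop rows with any NA, lowercase the column labels) and that column labels are strings.

```python
import numpy as np
import pandas as pd


def clean_tables(data: dict[str, pd.DataFrame]) -> None:
    """Hypothetical sketch of CleanTransformer.transform; replaces each entry of `data`."""
    for key in list(data):
        # Drop rows with any NA value, then lowercase every column label.
        data[key] = data[key].dropna().rename(columns=str.lower)


tables = {"orders": pd.DataFrame({"OrderID": [1, 2], "Qty": [3.0, np.nan]})}
clean_tables(tables)
print(tables["orders"])  # one row (OrderID 1) with columns 'orderid' and 'qty'
```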

class algomancy_data.transformer.JoinTransformer(left, right, on, output, logger=None)

Bases: Transformer

Join two input tables and write the result to a new table key.

left

Name of the left table to join.

right

Name of the right table to join.

on

Column name to join on.

output

Key under which the merged table is stored.

transform(data)

Apply the transformation to the provided data.

Parameters:

data (dict[str, DataFrame]) – Mapping from table name to pandas DataFrame. Implementations may mutate this mapping in place or create/replace entries.
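A minimal sketch of the join step using `pandas.merge`. The join type is an assumption: the documentation above does not say which one JoinTransformer uses, so this sketch uses an inner join (pandas' default). `join_tables` is a hypothetical helper whose arguments mirror the documented left, right, on and output attributes.

```python
import pandas as pd


def join_tables(data: dict, left: str, right: str, on: str, output: str) -> None:
    """Hypothetical sketch of JoinTransformer.transform (inner join assumed)."""
    # The input tables are left untouched; the merged result goes under a new key.
    data[output] = pd.merge(data[left], data[right], on=on, how="inner")


tables = {
    "orders": pd.DataFrame({"customer_id": [1, 2, 3], "qty": [5, 1, 2]}),
    "customers": pd.DataFrame({"customer_id": [1, 2], "name": ["Ada", "Bob"]}),
}
join_tables(tables, left="orders", right="customers", on="customer_id", output="orders_enriched")
print(tables["orders_enriched"])  # customer 3 has no match, so an inner join drops that order
```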

class algomancy_data.transformer.CascadeDropTransformer(schemas=None, extra_relations=None, snapshot=None, name='Cascade drop transformer', logger=None)

Bases: Transformer

Drop rows whose declared foreign-key relations are unsatisfied.

Reads relations from the supplied schemas (the default source of truth) and optionally merges extra_relations on top. Iterates to a fixpoint, applying the following rules on each pass:

  1. Orphan-child drop (always on): drop child rows whose FK tuple does not occur among the parent’s referenced column values.

  2. Required-child parent drop (only for relations with parent_requires_child=True): drop parent rows whose PK does not appear in any child’s FK column.

Aggregated ValidationMessages are emitted with severity ValidationSeverity.ERROR, one per (table, rule, relation), each carrying the dropped row count.

Parameters:
  • schemas (Sequence[Type[Schema]] | None) – Schemas whose Column.foreign_key declarations supply the default relation set.

  • extra_relations (Sequence[Relation] | None) – Additional or overriding relations; when one matches a schema-derived relation on (child_table, child_cols), the override wins.

  • snapshot (CascadeSnapshot | None) – Optional paired CascadeSnapshot transformer, used for partial-loss detection (see CascadeSnapshot).

  • name (str) – Override the transformer’s display name.

  • logger – Optional logger.
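The two fixpoint rules above can be sketched over plain pandas tables. Everything here is hypothetical: the `Relation` record (only child_table, child_cols and parent_requires_child appear in the documentation; the parent_* fields are assumed), the rule labels, and the return value, which is a dict of dropped row counts keyed by (table, rule, relation) standing in for the aggregated ValidationMessages. The partial-loss rule that needs a snapshot is not modelled.

```python
from dataclasses import dataclass

import numpy as np
import pandas as pd


@dataclass(frozen=True)
class Relation:
    """Hypothetical relation record; the parent_* field names are assumptions."""
    child_table: str
    child_cols: tuple
    parent_table: str
    parent_cols: tuple
    parent_requires_child: bool = False


def _key_tuples(df: pd.DataFrame, cols: tuple) -> list:
    # One tuple per row so composite keys compare as a unit.
    return list(zip(*(df[c] for c in cols)))


def cascade_drop(data: dict, relations: list) -> dict:
    """Mutate `data` until a full pass drops nothing; return {(table, rule, relation): rows dropped}."""
    dropped: dict = {}

    def drop_unless(table, rule, rel, keep) -> bool:
        n = int((~keep).sum())
        if n:
            data[table] = data[table].loc[keep]
            dropped[(table, rule, rel)] = dropped.get((table, rule, rel), 0) + n
        return n > 0

    changed = True
    while changed:  # fixpoint: one drop can orphan rows checked earlier in the pass
        changed = False
        for rel in relations:
            # Rule 1 (always on): drop children whose FK tuple has no matching parent key.
            parent_keys = set(_key_tuples(data[rel.parent_table], rel.parent_cols))
            child_keys = _key_tuples(data[rel.child_table], rel.child_cols)
            keep = np.array([k in parent_keys for k in child_keys], dtype=bool)
            changed |= drop_unless(rel.child_table, "orphan_child", rel, keep)
            # Rule 2 (opt-in): drop parents that no child row references.
            if rel.parent_requires_child:
                referenced = set(_key_tuples(data[rel.child_table], rel.child_cols))
                own_keys = _key_tuples(data[rel.parent_table], rel.parent_cols)
                keep = np.array([k in referenced for k in own_keys], dtype=bool)
                changed |= drop_unless(rel.parent_table, "required_child", rel, keep)
    return dropped


r_lines = Relation("lines", ("order_id",), "orders", ("order_id",), parent_requires_child=True)
r_orders = Relation("orders", ("customer_id",), "customers", ("customer_id",))
tables = {
    "customers": pd.DataFrame({"customer_id": [1, 2]}),
    "orders": pd.DataFrame({"order_id": [10, 11, 12], "customer_id": [1, 2, 9]}),
    "lines": pd.DataFrame({"order_id": [10, 12, 12]}),
}
counts = cascade_drop(tables, [r_lines, r_orders])
```

In this example order 11 is dropped because it has no lines, order 12 because customer 9 does not exist, and only on the second pass do the two lines for order 12 become orphans and get dropped. That second-pass cascade is why iterating to a fixpoint matters. Customer 2 survives because its relation does not set parent_requires_child.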

transform(data)

Apply the transformation to the provided data.

Parameters:

data (dict[str, DataFrame]) – Mapping from table name to pandas DataFrame. Implementations may mutate this mapping in place or create/replace entries.

class algomancy_data.transformer.CascadeSnapshot(schemas=None, extra_relations=None, logger=None)

Bases: Transformer

Captures referenced-child counts for partial-loss cascade detection.

A read-only transformer that, for every relation flagged track_partial_loss=True, records the number of referencing children per parent row. Paired with CascadeDropTransformer (passed via its snapshot= argument) to enable the partial-loss drop rule.

Place this transformer before any drop-capable transformer so it captures the pre-cleanup baseline.

Parameters:
  • schemas (Sequence[Type[Schema]] | None) – Schemas whose foreign-key declarations supply the relation set. Only relations with track_partial_loss=True are tracked.

  • extra_relations (Sequence[Relation] | None) – Additional or override relations.

  • logger – Optional logger.
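The counting step can be sketched as follows, under stated assumptions: the `Relation` record and its parent_* fields are hypothetical, the result is a plain dict keyed by relation (so `snap.get(rel)` plays the role of counts_for and yields None for untracked relations), and parents that no child references are recorded with a count of 0. Whether the library records those zero entries is not stated above.

```python
from collections import Counter
from dataclasses import dataclass

import pandas as pd


@dataclass(frozen=True)
class Relation:
    """Hypothetical relation record; the parent_* field names are assumptions."""
    child_table: str
    child_cols: tuple
    parent_table: str
    parent_cols: tuple
    track_partial_loss: bool = False


def snapshot_counts(data: dict, relations: list) -> dict:
    """Read-only: return {relation: {parent_key_tuple: child_count}} for tracked relations."""
    counts = {}
    for rel in relations:
        if not rel.track_partial_loss:
            continue  # untracked relations get no entry, so counts.get(rel) is None
        child, parent = data[rel.child_table], data[rel.parent_table]
        refs = Counter(zip(*(child[c] for c in rel.child_cols)))
        # One entry per parent key; unreferenced parents are recorded with a count of 0.
        counts[rel] = {key: refs.get(key, 0) for key in zip(*(parent[c] for c in rel.parent_cols))}
    return counts


tracked = Relation("lines", ("order_id",), "orders", ("order_id",), track_partial_loss=True)
untracked = Relation("orders", ("customer_id",), "customers", ("customer_id",))
tables = {
    "customers": pd.DataFrame({"customer_id": [1]}),
    "orders": pd.DataFrame({"order_id": [10, 11], "customer_id": [1, 1]}),
    "lines": pd.DataFrame({"order_id": [10, 10, 12]}),
}
snap = snapshot_counts(tables, [tracked, untracked])
print(snap[tracked])  # order 10 -> 2, order 11 -> 0; the orphan line for order 12 is not counted
print(snap.get(untracked))  # None
```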

transform(data)

Apply the transformation to the provided data.

Parameters:

data (dict[str, DataFrame]) – Mapping from table name to pandas DataFrame. Implementations may mutate this mapping in place or create/replace entries.

counts_for(relation)

Return the captured {parent_key_tuple: count} mapping for the relation, or None if no counts were captured for it.

class algomancy_data.transformer.OptionalColumnGuard(schemas, logger=None)

Bases: Transformer

Materialise missing optional columns using each Column.default.

Injects missing optional columns into the corresponding DataFrame in-place, using Column.default and coercing to the declared dtype. Downstream code can then assume the full schema is present.

_schemas

Schemas whose optional columns may be injected.
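The guard can be sketched with a hypothetical `ColumnSpec` standing in for the library's Column (name, dtype, default, optional flag) and a plain dict from table name to column specs standing in for the Schema classes. Only optional columns that are actually missing are added; existing columns and missing required columns are left alone.

```python
from dataclasses import dataclass
from typing import Any

import pandas as pd


@dataclass(frozen=True)
class ColumnSpec:
    """Hypothetical stand-in for a schema Column (name, dtype, default, optional)."""
    name: str
    dtype: str
    default: Any = None
    optional: bool = False


def guard_optional_columns(data: dict, schemas: dict) -> None:
    """Add each missing optional column in place, filled with its default and cast to its dtype."""
    for table, columns in schemas.items():
        df = data.get(table)
        if df is None:
            continue  # table not loaded; nothing to patch
        for col in columns:
            if col.optional and col.name not in df.columns:
                df[col.name] = pd.Series(col.default, index=df.index).astype(col.dtype)


schemas = {
    "orders": [
        ColumnSpec("order_id", "int64"),
        ColumnSpec("priority", "int64", default=0, optional=True),
        ColumnSpec("discount", "float64", default=0.0, optional=True),
    ]
}
tables = {"orders": pd.DataFrame({"order_id": [1, 2], "discount": [0.1, 0.2]})}
guard_optional_columns(tables, schemas)
print(tables["orders"].dtypes)  # 'priority' added as int64 zeros; existing 'discount' untouched
```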

transform(data)

Apply the transformation to the provided data.

Parameters:

data (dict[str, DataFrame]) – Mapping from table name to pandas DataFrame. Implementations may mutate this mapping in place or create/replace entries.

class algomancy_data.transformer.TransformationSequence(transformers=None, logger=None)

Bases: object

A sequence of transformers executed in order.

add_transformer(transformer)

Append a single transformer to the sequence.

add_transformers(transformers)

Append multiple transformers to the sequence.

run_transformation(data)

Run all transformers sequentially on a deep copy of data, so the caller’s mapping and DataFrames are left unchanged.

Parameters:

data (dict[str, DataFrame]) – Mapping of tables to DataFrames.

Returns:

Transformed copy of the input mapping.

Return type:

dict[str, pd.DataFrame]

collect_messages()

Aggregate the ValidationMessages produced by all transformers.

Returns messages produced during the most recent run_transformation() invocation, in transformer order.
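The sequence behaviour documented above can be sketched as a small class. This is a hypothetical reconstruction, not the library's source. One assumption is loud: if a step's transform returns a mapping, the sketch adopts it as the new working copy; otherwise the step is assumed to have mutated the copy in place, which follows the base-class note that subclasses may do either. `AddTotal` is an invented example step.

```python
import copy

import pandas as pd


class TransformationSequence:
    """Hypothetical sketch of the documented behaviour; not the library's source."""

    def __init__(self, transformers=None, logger=None):
        self.transformers = list(transformers or [])
        self.logger = logger

    def add_transformer(self, transformer):
        self.transformers.append(transformer)

    def add_transformers(self, transformers):
        self.transformers.extend(transformers)

    def run_transformation(self, data: dict) -> dict:
        result = copy.deepcopy(data)  # the caller's mapping and DataFrames stay untouched
        for transformer in self.transformers:
            returned = transformer.transform(result)
            if returned is not None:  # assumption: a returned mapping replaces the working copy
                result = returned
        return result

    def collect_messages(self) -> list:
        # Flatten each transformer's messages from its most recent run, in sequence order.
        return [msg for t in self.transformers for msg in t.messages]


class AddTotal:
    """Hypothetical step: adds a 'total' column to 'orders' in place."""

    def __init__(self):
        self.messages = []

    def transform(self, data):
        self.messages = ["added total"]
        orders = data["orders"]
        orders["total"] = orders["qty"] * orders["price"]


seq = TransformationSequence()
seq.add_transformer(AddTotal())
raw = {"orders": pd.DataFrame({"qty": [2, 3], "price": [1.5, 2.0]})}
out = seq.run_transformation(raw)
print(out["orders"]["total"].tolist())  # [3.0, 6.0]
print("total" in raw["orders"].columns)  # False: the input was deep-copied
```

Deep-copying once up front lets every step mutate freely without defensive copies of its own, at the cost of one full copy of the input per run.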