Source code for algomancy_data.loader
"""Loader interfaces for materializing data objects.
This module defines the abstract ``Loader`` contract and a concrete
``DataSourceLoader`` implementation that turns a mapping of pandas
DataFrames into a ``DataSource`` while preserving validation messages.
"""
from abc import ABC, abstractmethod
from typing import List, Dict
import pandas as pd
from .validator import ValidationMessage
from .datasource import DataClassification, DataSource, BASEDATASOURCE
[docs]
class Loader(ABC):
    """Base contract for components that load transformed data.

    Concrete subclasses must provide :meth:`load`; the class itself cannot
    be instantiated because ``load`` is abstract.
    """

    def __init__(self, logger) -> None:
        """Keep a reference to the supplied logger.

        Args:
            logger: Logger object stored on the instance as ``self.logger``.
        """
        self.logger = logger

    @abstractmethod
    def load(
        self,
        name: str,
        data: Dict[str, pd.DataFrame],
        validation_messages: List[ValidationMessage],
        ds_type: DataClassification,  # TODO: remove this argument? ETL'd data should always be master.
    ) -> BASEDATASOURCE:
        """Build the destination object from already-transformed data.

        Args:
            name: Logical name of the dataset/destination.
            data: Table name mapped to the pandas DataFrame holding its rows.
            validation_messages: Messages gathered during validation.
            ds_type: Classification to assign to the destination data.

        Returns:
            BASEDATASOURCE: The destination object holding the data.

        Raises:
            NotImplementedError: Always, when the base implementation is
                invoked directly (e.g. through ``super().load(...)``).
        """
        raise NotImplementedError
[docs]
class DataSourceLoader(Loader):
    """Concrete ``Loader`` that assembles a ``DataSource`` from DataFrames."""

    def load(
        self,
        name: str,
        data: dict[str, pd.DataFrame],
        validation_messages: List[ValidationMessage],
        ds_type: DataClassification = DataClassification.MASTER_DATA,
    ) -> DataSource:
        """Create a ``DataSource`` and register every table in ``data`` on it.

        Args:
            name: Name given to the new data source.
            data: Table name mapped to the DataFrame to register under it.
            validation_messages: Validation messages handed to the data
                source at construction.
            ds_type: Classification for the data source; defaults to
                ``DataClassification.MASTER_DATA``.

        Returns:
            DataSource: The data source after all tables have been added.
        """
        source = DataSource(
            name=name,
            validation_messages=validation_messages,
            ds_type=ds_type,
        )

        # Logging is optional: a falsy logger (e.g. None) is skipped.
        logger = self.logger
        if logger:
            logger.log("Loading data into DataSource")

        # Tables are added in the mapping's iteration order.
        for table_name, frame in data.items():
            source.add_table(table_name, frame)

        return source