Data intake

An Algomancy app reads data through an ETL pipeline. The quickstart wizard has already generated the folder structure, schemas, and ETL factory for us. In this section we review the generated files, then write the custom transformation and loading logic for the TSP model.

Review the generated schemas

Open src/data_handling/generated_schemas.py. The wizard scanned the three input files and created a Schema subclass for each one, with inferred column names and types:

Code
generated_schemas.py (as generated)
 1from typing import Dict
 2from algomancy_data import Schema, DataType, FileExtension
 3from algomancy_data.schema import SchemaType
 4
 5
 6class DcSchema(Schema):
 7    _FILENAME = "dc"
 8    _EXTENSION = FileExtension.XLSX
 9    _SCHEMA_TYPE = SchemaType.SINGLE
10
11    ID = "ID"
12    X = "x"
13    Y = "y"
14
15    def _defined_datatypes(self) -> Dict[str, DataType]:
16        return {
17            DcSchema.ID: DataType.STRING,
18            DcSchema.X: DataType.INTEGER,
19            DcSchema.Y: DataType.INTEGER,
20        }
21
22
23dc_schema = DcSchema()
24
25
26class OtherlocationsSchema(Schema):
27    _FILENAME = "otherlocations"
28    _EXTENSION = FileExtension.XLSX
29    _SCHEMA_TYPE = SchemaType.MULTI
30
31    ID = "ID"
32    X = "x"
33    Y = "y"
34
35    def _defined_datatypes(self) -> Dict[str, Dict[str, DataType]]:
36        return {
37            "customer": {
38                OtherlocationsSchema.ID: DataType.STRING,
39                OtherlocationsSchema.X: DataType.INTEGER,
40                OtherlocationsSchema.Y: DataType.INTEGER,
41            },
42            "xdock": {
43                OtherlocationsSchema.ID: DataType.STRING,
44                OtherlocationsSchema.X: DataType.INTEGER,
45                OtherlocationsSchema.Y: DataType.INTEGER,
46            },
47        }
48
49
50otherlocations_schema = OtherlocationsSchema()
51
52
53class StoresSchema(Schema):
54    _FILENAME = "stores"
55    _EXTENSION = FileExtension.CSV
56    _SCHEMA_TYPE = SchemaType.SINGLE
57
58    ID = "ID"
59    X = "x"
60    Y = "y"
61
62    def _defined_datatypes(self) -> Dict[str, DataType]:
63        return {
64            StoresSchema.ID: DataType.STRING,
65            StoresSchema.X: DataType.INTEGER,
66            StoresSchema.Y: DataType.INTEGER,
67        }
68
69
70stores_schema = StoresSchema()
71
72
73all_schemas = [
74    dc_schema,
75    otherlocations_schema,
76    stores_schema,
77]

Note

The fields _FILENAME, _EXTENSION, and _SCHEMA_TYPE are required — an exception is raised at construction if any are missing.

Important

A single Schema corresponds to a single file. When a file contains more than one data table (e.g., multiple Excel sheets), set _SCHEMA_TYPE to MULTI and return a nested dictionary from _defined_datatypes, as done for OtherlocationsSchema above. The outer-dictionary keys must match the table identifiers (e.g., the sheet names of an xlsx).

Tip

Defining column names as class variables (e.g., ID = "ID") is not strictly necessary, but it makes the code more readable, prevents typos, and lets your IDE assist with autocompletion — especially when column names are long or appear in many places.

Review the generated ETL factory

Open src/data_handling/etl_factory.py. The wizard created a TSPETLFactory with extractors already configured for each input file:

Code
etl_factory.py (as generated)
 1from typing import Dict
 2
 3import algomancy_data as de
 4from algomancy_data import File
 5from algomancy_data.extractor import (
 6    ExtractionSequence,
 7    CSVSingleExtractor,
 8    XLSXMultiExtractor,
 9    XLSXSingleExtractor,
10)
11from algomancy_data.transformer import TransformationSequence
12from algomancy_utils import Logger
13
14from src.data_handling.generated_schemas import all_schemas
15from src.data_handling.generated_schemas import dc_schema, otherlocations_schema, stores_schema
16
17
18class TSPETLFactory(de.ETLFactory):
19
20    def __init__(self, schemas, logger: Logger = None):
21        super().__init__(schemas, logger)
22
23    def create_extraction_sequence(self, files: Dict[str, File]) -> ExtractionSequence:
24        sequence = ExtractionSequence(logger=self.logger)
25
26        # Extract dc
27        sequence.add_extractor(
28            XLSXSingleExtractor(
29                file=files["dc"],
30                schema=dc_schema,
31                sheet_name="Sheet1",
32                logger=self.logger,
33            )
34        )
35
36        # Extract otherlocations
37        sequence.add_extractor(
38            XLSXMultiExtractor(
39                file=files["otherlocations"],
40                schema=otherlocations_schema,
41                logger=self.logger,
42            )
43        )
44
45        # Extract stores
46        sequence.add_extractor(
47            CSVSingleExtractor(
48                file=files["stores"],
49                schema=stores_schema,
50                logger=self.logger,
51                separator=",",
52            )
53        )
54
55        return sequence
56
57    def create_transformation_sequence(self) -> TransformationSequence:
58        # TODO: Add transformers to process your data.
59        return TransformationSequence(logger=self.logger)
60
61    def create_validation_sequence(self) -> de.ValidationSequence:
62        vs = de.ValidationSequence(logger=self.logger)
63        vs.add_validator(de.ExtractionSuccessVerification())
64        vs.add_validator(
65            de.SchemaValidator(
66                schemas=self.schemas,
67                severity=de.ValidationSeverity.CRITICAL,
68            )
69        )
70        return vs
71
72    def create_loader(self) -> de.Loader:
73        # TODO: Customize if you need a custom data container.
74        return de.DataSourceLoader(self.logger)

An ETL factory has four responsibilities:

  1. Extract — read the input files as configured by the schemas.

  2. Validate — run validations on the extracted data.

  3. Transform — reshape the extracted DataFrames into the form needed for loading.

  4. Load — build the application data model from the transformed data.

Extraction and validation are already complete. We now need to replace the placeholder create_transformation_sequence and create_loader with TSP-specific implementations.

At this point you can verify that extraction works:

  1. Run main.py.

  2. Open the dashboard at http://127.0.0.1:8050.

  3. Go to the Data page and import the files from data/setup/.

  4. Verify that all three files are loaded without errors.

Transform

We transform all input data into a single pandas DataFrame that lists the locations, then derive a routes DataFrame from it.

  1. Create the directory src/data_handling/transformers/.

  2. Create transform_create_location_df.py — initialise an empty locations DataFrame:

Code
import pandas as pd
from algomancy_data import Transformer


class TransformCreateLocations(Transformer):
    def __init__(self, location_df_name: str, logger=None) -> None:
        super().__init__(name="Create location df transformer", logger=logger)
        self.location_df_name = location_df_name

    def transform(self, data: dict[str, pd.DataFrame]) -> None:
        if data.get(self.location_df_name, None) is None:
            data[self.location_df_name] = pd.DataFrame(columns=['id', 'x', 'y'])
  1. Create one transformer per input source that appends its rows to the locations DataFrame. Each follows the same pattern — rename columns and concatenate:

Code — customer transformer
# transform_customer_to_location.py
import pandas as pd
from algomancy_data import Transformer


class TransformCustomerToLocation(Transformer):
    def __init__(self, location_df_name: str, logger=None) -> None:
        super().__init__(name="Location Transformer", logger=logger)
        self.location_df_name = location_df_name
        self.df_name = 'otherlocations.customer'
        self.column_mapping = {'ID': 'id', 'x': 'x', 'y': 'y'}

    def transform(self, data: dict[str, pd.DataFrame]) -> None:
        data_df = data.get(self.df_name, None)
        data_df_locations = data.get(self.location_df_name, None)

        if (data_df is not None) and (data_df_locations is not None):
            normalized = (
                data_df
                .rename(columns=self.column_mapping)
                .reindex(columns=data_df_locations.columns)
                .astype(data_df_locations.dtypes.to_dict())
            )
            data[self.location_df_name] = pd.concat(
                [data_df_locations, normalized], ignore_index=True
            )

Create TransformXDockToLocation, TransformDCToLocation, and TransformStoresToLocation in the same way, changing df_name to 'otherlocations.xdock', 'dc', and 'stores' respectively:

Code — remaining source transformers
# transform_xdock_to_location.py
class TransformXDockToLocation(Transformer):
    def __init__(self, location_df_name: str, logger=None) -> None:
        super().__init__(name="Location Transformer", logger=logger)
        self.location_df_name = location_df_name
        self.df_name = 'otherlocations.xdock'
        self.column_mapping = {'ID': 'id', 'x': 'x', 'y': 'y'}

    def transform(self, data: dict[str, pd.DataFrame]) -> None:
        data_df = data.get(self.df_name, None)
        data_df_locations = data.get(self.location_df_name, None)
        if (data_df is not None) and (data_df_locations is not None):
            normalized = (
                data_df.rename(columns=self.column_mapping)
                .reindex(columns=data_df_locations.columns)
                .astype(data_df_locations.dtypes.to_dict())
            )
            data[self.location_df_name] = pd.concat(
                [data_df_locations, normalized], ignore_index=True
            )


# transform_dc_to_location.py
class TransformDCToLocation(Transformer):
    def __init__(self, location_df_name: str, logger=None) -> None:
        super().__init__(name="Location Transformer", logger=logger)
        self.location_df_name = location_df_name
        self.df_name = 'dc'
        self.column_mapping = {'ID': 'id', 'x': 'x', 'y': 'y'}

    def transform(self, data: dict[str, pd.DataFrame]) -> None:
        data_df = data.get(self.df_name, None)
        data_df_locations = data.get(self.location_df_name, None)
        if (data_df is not None) and (data_df_locations is not None):
            normalized = (
                data_df.rename(columns=self.column_mapping)
                .reindex(columns=data_df_locations.columns)
                .astype(data_df_locations.dtypes.to_dict())
            )
            data[self.location_df_name] = pd.concat(
                [data_df_locations, normalized], ignore_index=True
            )


# transform_stores_to_location.py
class TransformStoresToLocation(Transformer):
    def __init__(self, location_df_name: str, logger=None) -> None:
        super().__init__(name="Location Transformer", logger=logger)
        self.location_df_name = location_df_name
        self.df_name = 'stores'
        self.column_mapping = {'ID': 'id', 'x': 'x', 'y': 'y'}

    def transform(self, data: dict[str, pd.DataFrame]) -> None:
        data_df = data.get(self.df_name, None)
        data_df_locations = data.get(self.location_df_name, None)
        if (data_df is not None) and (data_df_locations is not None):
            normalized = (
                data_df.rename(columns=self.column_mapping)
                .reindex(columns=data_df_locations.columns)
                .astype(data_df_locations.dtypes.to_dict())
            )
            data[self.location_df_name] = pd.concat(
                [data_df_locations, normalized], ignore_index=True
            )
  1. Create transform_location_to_routes.py — derive a routes DataFrame as the Cartesian product of all locations, with Euclidean distance as cost:

Code
import pandas as pd
from algomancy_data import Transformer


class TransformLocationToRoutes(Transformer):
    def __init__(self, location_df_name: str, routes_df_name: str, logger=None) -> None:
        super().__init__(name="Transform location to routes", logger=logger)
        self._location_df_name = location_df_name
        self._routes_df_name = routes_df_name

    def transform(self, data: dict[str, pd.DataFrame]) -> None:
        locations = data.get(self._location_df_name, None)

        # Cartesian product with itself
        routes = locations.merge(locations, how='cross', suffixes=('_from', '_to'))
        routes = routes[routes['id_from'] != routes['id_to']]

        routes['distance'] = routes.apply(
            lambda row: (
                (row['x_from'] - row['x_to']) ** 2 + (row['y_from'] - row['y_to']) ** 2
            ) ** 0.5,
            axis=1
        )
        routes['cost'] = routes['distance'] * 1.0

        data[self._routes_df_name] = routes
  1. Register all transformers in etl_factory.py by replacing the placeholder create_transformation_sequence:

Code
def create_transformation_sequence(self) -> TransformationSequence:
    sequence = TransformationSequence()
    location_df_name = 'transform_locations'
    routes_df_name = 'transform_routes'
    sequence.add_transformer(TransformCreateLocations(location_df_name=location_df_name, logger=self.logger))
    sequence.add_transformer(TransformCustomerToLocation(location_df_name=location_df_name, logger=self.logger))
    sequence.add_transformer(TransformXDockToLocation(location_df_name=location_df_name, logger=self.logger))
    sequence.add_transformer(TransformStoresToLocation(location_df_name=location_df_name, logger=self.logger))
    sequence.add_transformer(TransformDCToLocation(location_df_name=location_df_name, logger=self.logger))
    sequence.add_transformer(TransformLocationToRoutes(
        location_df_name=location_df_name,
        routes_df_name=routes_df_name,
        logger=self.logger
    ))
    return sequence
  1. Run main.py, import the data, and verify that transform_locations appears as a combined table.

Load

We build a domain-specific data model from the transformed DataFrames — a network of Location and Route objects managed by a NetworkManager.

Create the directory src/data_handling/data_model/.

Locations

We will use locations in the visualisation part of this tutorial. Create location.py:

Code
class Location:
    def __init__(self, id: str, x: float, y: float):
        self._id = id
        self._x = x
        self._y = y

    @property
    def id(self):
        return self._id

    @property
    def x(self):
        return self._x

    @property
    def y(self):
        return self._y

Routes

We will use routes in the optimisation part of this tutorial. Create route.py:

Code
from data_handling.data_model.location import Location


class Route:
    def __init__(self, from_id: str, to_id: str, cost: float):
        self.route_id = from_id + '_' + to_id
        self._from_id = from_id
        self._to_id = to_id
        self._cost = cost

    @property
    def cost(self):
        return self._cost

    @property
    def from_id(self):
        return self._from_id

    @property
    def to_id(self):
        return self._to_id

Network Manager

Create network_manager.py to manage the set of locations and routes:

Code
from typing import List
from src.data_handling.data_model.location import Location
from src.data_handling.data_model.route import Route


class NetworkManager:
    def __init__(self):
        self._locations: dict[str, Location] = {}
        self._routes: dict[tuple[str, str], Route] = {}
        self._reachable_locations_from_location: dict[str, List[Location]] = {}

    def add_location(self, location: Location):
        self._locations[location.id] = location

    def add_route(self, route: Route):
        from_location, to_location = self.get_route_locations(route)
        if from_location is not None and to_location is not None:
            self._routes[(from_location.id, to_location.id)] = route
            if self._reachable_locations_from_location.get(from_location.id, None) is None:
                self._reachable_locations_from_location[from_location.id] = [to_location]
            else:
                self._reachable_locations_from_location[from_location.id] += [to_location]

    def get_locations(self) -> list[Location]:
        return list(self._locations.values())

    def get_location(self, location_id: str) -> Location:
        return self._locations[location_id]

    def get_route_locations(self, route: Route) -> tuple[Location, Location]:
        return self.get_location(route.from_id), self.get_location(route.to_id)

    def get_routes(self) -> list[Route]:
        return list(self._routes.values())

    def get_route(self, from_location_id: str, to_location_id: str) -> Route:
        return self._routes[(from_location_id, to_location_id)]

    def get_reachable_locations(self, location_id: str) -> List[Location]:
        return self._reachable_locations_from_location[location_id]

Data Model

Create data_model.py as a DataSource subclass so we can attach domain objects to the loaded data:

Code
from datetime import datetime
from typing import List

import pandas as pd
from algomancy_data import DataSource, DataClassification, ValidationMessage
from data_handling.data_model.network_manager import NetworkManager


class DataModel(DataSource):
    def __init__(
            self,
            ds_type: DataClassification,
            name: str = None,
            tables: dict[str, pd.DataFrame] | None = None,
            validation_messages: List[ValidationMessage] = None,
            ds_id: str | None = None,
            creation_datetime: datetime | None = None,
    ):
        super().__init__(
            ds_type=ds_type,
            name=name,
            validation_messages=validation_messages,
            ds_id=ds_id,
            creation_datetime=creation_datetime,
        )

        if tables is not None:
            self.tables = tables

        self._network_manager: NetworkManager | None = None

    def set_network_manager(self, network_manager: NetworkManager):
        self._network_manager = network_manager

    @property
    def network_manager(self):
        return self._network_manager

Loader

Create the directory src/data_handling/loaders/ and add loader.py:

Code
from typing import List
from algomancy_data import Loader, ValidationMessage, DataClassification
import pandas as pd
from data_handling.data_model.data_model import DataModel
from data_handling.data_model.location import Location
from data_handling.data_model.network_manager import NetworkManager
from data_handling.data_model.route import Route


class DataModelLoader(Loader):
    def load(
        self,
        name: str,
        data: dict[str, pd.DataFrame],
        validation_messages: List[ValidationMessage],
        ds_type: DataClassification = DataClassification.MASTER_DATA,
    ) -> DataModel:
        datamodel = DataModel(
            tables=data,
            ds_type=ds_type,
            name=name,
            validation_messages=validation_messages,
        )
        self.load_network_manager(dm=datamodel)
        self.load_locations(dm=datamodel)
        self.load_routes(dm=datamodel)
        return datamodel

    @staticmethod
    def load_network_manager(dm: DataModel):
        dm.set_network_manager(NetworkManager())

    @staticmethod
    def load_locations(dm: DataModel):
        data_locations = dm.get_table("transform_locations")
        nm = dm.network_manager
        for _, row in data_locations.iterrows():
            nm.add_location(Location(id=row["id"], x=row["x"], y=row["y"]))

    @staticmethod
    def load_routes(dm: DataModel):
        data_routes = dm.get_table("transform_routes")
        nm = dm.network_manager
        for _, row in data_routes.iterrows():
            route = Route(from_id=row["id_from"], to_id=row["id_to"], cost=row["cost"])
            from_location, to_location = nm.get_route_locations(route=route)
            if from_location is None or to_location is None:
                continue
            nm.add_route(route=route)

Register the loader in etl_factory.py by replacing DataSourceLoader with DataModelLoader:

def create_loader(self) -> Loader:
    return DataModelLoader(self.logger)

Also update main.py to use DataModel as the data object type:

data_object_type=DataModel,

Next step

All right. The information is loaded in Algomancy. Now it is time to define the algorithm(s).