Data intake
An Algomancy app reads data through an ETL pipeline. The quickstart wizard has already generated the folder structure, schemas, and ETL factory for us. In this section we review the generated files, then write the custom transformation and loading logic for the TSP model.
Review the generated schemas
Open src/data_handling/generated_schemas.py. The wizard scanned the three input files and created a Schema subclass for each one, with inferred column names and types:
Code
generated_schemas.py (as generated)
from typing import Dict
from algomancy_data import Schema, DataType, FileExtension
from algomancy_data.schema import SchemaType


class DcSchema(Schema):
    _FILENAME = "dc"
    _EXTENSION = FileExtension.XLSX
    _SCHEMA_TYPE = SchemaType.SINGLE

    ID = "ID"
    X = "x"
    Y = "y"

    def _defined_datatypes(self) -> Dict[str, DataType]:
        return {
            DcSchema.ID: DataType.STRING,
            DcSchema.X: DataType.INTEGER,
            DcSchema.Y: DataType.INTEGER,
        }


dc_schema = DcSchema()


class OtherlocationsSchema(Schema):
    _FILENAME = "otherlocations"
    _EXTENSION = FileExtension.XLSX
    _SCHEMA_TYPE = SchemaType.MULTI

    ID = "ID"
    X = "x"
    Y = "y"

    def _defined_datatypes(self) -> Dict[str, Dict[str, DataType]]:
        return {
            "customer": {
                OtherlocationsSchema.ID: DataType.STRING,
                OtherlocationsSchema.X: DataType.INTEGER,
                OtherlocationsSchema.Y: DataType.INTEGER,
            },
            "xdock": {
                OtherlocationsSchema.ID: DataType.STRING,
                OtherlocationsSchema.X: DataType.INTEGER,
                OtherlocationsSchema.Y: DataType.INTEGER,
            },
        }


otherlocations_schema = OtherlocationsSchema()


class StoresSchema(Schema):
    _FILENAME = "stores"
    _EXTENSION = FileExtension.CSV
    _SCHEMA_TYPE = SchemaType.SINGLE

    ID = "ID"
    X = "x"
    Y = "y"

    def _defined_datatypes(self) -> Dict[str, DataType]:
        return {
            StoresSchema.ID: DataType.STRING,
            StoresSchema.X: DataType.INTEGER,
            StoresSchema.Y: DataType.INTEGER,
        }


stores_schema = StoresSchema()


all_schemas = [
    dc_schema,
    otherlocations_schema,
    stores_schema,
]
Note
The fields _FILENAME, _EXTENSION, and _SCHEMA_TYPE are required — an exception is raised at construction if any are missing.
Important
A single Schema corresponds to a single file.
When a file contains more than one data table (e.g., multiple Excel sheets), set _SCHEMA_TYPE to MULTI and return a nested dictionary from _defined_datatypes, as done for OtherlocationsSchema above.
The outer-dictionary keys must match the table identifiers (e.g., the sheet names of an xlsx).
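The transformers later in this section look up the two sheets of this file under the names 'otherlocations.customer' and 'otherlocations.xdock'. The sketch below illustrates that naming in plain Python. It assumes that tables from a MULTI file are keyed as file name, dot, table identifier, which is inferred from those names rather than taken from the library; the helper table_keys is hypothetical and not part of algomancy_data.

```python
# Hypothetical helper, not part of algomancy_data.
# Assumption: tables from a MULTI file are exposed as "<filename>.<table id>",
# as the names 'otherlocations.customer' and 'otherlocations.xdock' suggest.

def table_keys(filename: str, datatypes: dict) -> list[str]:
    """Return the table names implied by a schema's datatype mapping."""
    # A nested mapping (all values are dicts) means one entry per sheet/table.
    is_multi = bool(datatypes) and all(isinstance(v, dict) for v in datatypes.values())
    if is_multi:
        return [f"{filename}.{table_id}" for table_id in datatypes]
    return [filename]


# Plain strings stand in for DataType members to keep the sketch self-contained.
single = {"ID": "STRING", "x": "INTEGER", "y": "INTEGER"}
multi = {"customer": dict(single), "xdock": dict(single)}

dc_tables = table_keys("dc", single)
other_tables = table_keys("otherlocations", multi)
print(dc_tables)
print(other_tables)
```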
Tip
Defining column names as class variables (e.g., ID = "ID") is not strictly necessary, but it makes the code more readable, prevents typos, and lets your IDE assist with autocompletion — especially when column names are long or appear in many places.
Review the generated ETL factory
Open src/data_handling/etl_factory.py. The wizard created a TSPETLFactory with extractors already configured for each input file:
Code
etl_factory.py (as generated)
from typing import Dict

import algomancy_data as de
from algomancy_data import File
from algomancy_data.extractor import (
    ExtractionSequence,
    CSVSingleExtractor,
    XLSXMultiExtractor,
    XLSXSingleExtractor,
)
from algomancy_data.transformer import TransformationSequence
from algomancy_utils import Logger

from src.data_handling.generated_schemas import all_schemas
from src.data_handling.generated_schemas import dc_schema, otherlocations_schema, stores_schema


class TSPETLFactory(de.ETLFactory):

    def __init__(self, schemas, logger: Logger = None):
        super().__init__(schemas, logger)

    def create_extraction_sequence(self, files: Dict[str, File]) -> ExtractionSequence:
        sequence = ExtractionSequence(logger=self.logger)

        # Extract dc
        sequence.add_extractor(
            XLSXSingleExtractor(
                file=files["dc"],
                schema=dc_schema,
                sheet_name="Sheet1",
                logger=self.logger,
            )
        )

        # Extract otherlocations
        sequence.add_extractor(
            XLSXMultiExtractor(
                file=files["otherlocations"],
                schema=otherlocations_schema,
                logger=self.logger,
            )
        )

        # Extract stores
        sequence.add_extractor(
            CSVSingleExtractor(
                file=files["stores"],
                schema=stores_schema,
                logger=self.logger,
                separator=",",
            )
        )

        return sequence

    def create_transformation_sequence(self) -> TransformationSequence:
        # TODO: Add transformers to process your data.
        return TransformationSequence(logger=self.logger)

    def create_validation_sequence(self) -> de.ValidationSequence:
        vs = de.ValidationSequence(logger=self.logger)
        vs.add_validator(de.ExtractionSuccessVerification())
        vs.add_validator(
            de.SchemaValidator(
                schemas=self.schemas,
                severity=de.ValidationSeverity.CRITICAL,
            )
        )
        return vs

    def create_loader(self) -> de.Loader:
        # TODO: Customize if you need a custom data container.
        return de.DataSourceLoader(self.logger)
An ETL factory has four responsibilities:
Extract — read the input files as configured by the schemas.
Validate — run validations on the extracted data.
Transform — reshape the extracted DataFrames into the form needed for loading.
Load — build the application data model from the transformed data.
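To make the division of labour between these four steps concrete, here is a toy pipeline in plain Python. It is only a sketch: the functions extract, validate, transform, and load are stand-ins written for this illustration, the sample rows are made up, and none of it reflects how Algomancy implements its factory internally.

```python
# Toy stand-ins for the four stages; none of these functions are Algomancy APIs.

def extract(files: dict[str, list[dict]]) -> dict[str, list[dict]]:
    # "Read" each input; here the rows are already in memory.
    return {name: list(rows) for name, rows in files.items()}


def validate(data: dict[str, list[dict]], required: set[str]) -> list[str]:
    # Collect one message per row that lacks a required column.
    messages = []
    for name, rows in data.items():
        for i, row in enumerate(rows):
            missing = required - set(row)
            if missing:
                messages.append(f"{name}[{i}]: missing {sorted(missing)}")
    return messages


def transform(data: dict[str, list[dict]]) -> dict[str, list[dict]]:
    # Reshape: merge all tables into one list of locations with lower-case keys.
    locations = []
    for rows in data.values():
        for row in rows:
            locations.append({"id": row["ID"], "x": row["x"], "y": row["y"]})
    return {"locations": locations}


def load(data: dict[str, list[dict]]) -> dict[str, tuple]:
    # Build the in-memory model: id -> (x, y).
    return {row["id"]: (row["x"], row["y"]) for row in data["locations"]}


# Made-up sample input.
files = {
    "dc": [{"ID": "DC1", "x": 0, "y": 0}],
    "stores": [{"ID": "S1", "x": 3, "y": 4}],
}

raw = extract(files)
messages = validate(raw, required={"ID", "x", "y"})
# Only transform and load when validation found nothing to report.
model = load(transform(raw)) if not messages else {}
print(messages, model)
```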
Extraction and validation are already complete. We now need to replace the placeholder create_transformation_sequence and create_loader with TSP-specific implementations.
At this point you can verify that extraction works:
Run main.py.
Open the dashboard at http://127.0.0.1:8050.
Go to the Data page and import the files from data/setup/.
Verify that all three files are loaded without errors.
Transform
We transform all input data into a single pandas DataFrame that lists the locations, then derive a routes DataFrame from it.
Create the directory src/data_handling/transformers/.
Create transform_create_location_df.py, which initialises an empty locations DataFrame:
Code
import pandas as pd
from algomancy_data import Transformer


class TransformCreateLocations(Transformer):
    def __init__(self, location_df_name: str, logger=None) -> None:
        super().__init__(name="Create location df transformer", logger=logger)
        self.location_df_name = location_df_name

    def transform(self, data: dict[str, pd.DataFrame]) -> None:
        # Create the table only if it does not exist yet.
        if data.get(self.location_df_name, None) is None:
            data[self.location_df_name] = pd.DataFrame(columns=['id', 'x', 'y'])
Create one transformer per input source that appends its rows to the locations DataFrame. Each follows the same pattern — rename columns and concatenate:
Code — customer transformer
# transform_customer_to_location.py
import pandas as pd
from algomancy_data import Transformer


class TransformCustomerToLocation(Transformer):
    def __init__(self, location_df_name: str, logger=None) -> None:
        super().__init__(name="Location Transformer", logger=logger)
        self.location_df_name = location_df_name
        self.df_name = 'otherlocations.customer'
        self.column_mapping = {'ID': 'id', 'x': 'x', 'y': 'y'}

    def transform(self, data: dict[str, pd.DataFrame]) -> None:
        data_df = data.get(self.df_name, None)
        data_df_locations = data.get(self.location_df_name, None)
        # Skip silently unless both the source and the locations table exist.
        if (data_df is not None) and (data_df_locations is not None):
            # Rename to the location columns, then align column order and dtypes.
            normalized = (
                data_df
                .rename(columns=self.column_mapping)
                .reindex(columns=data_df_locations.columns)
                .astype(data_df_locations.dtypes.to_dict())
            )
            data[self.location_df_name] = pd.concat(
                [data_df_locations, normalized], ignore_index=True
            )
Create TransformXDockToLocation, TransformDCToLocation, and TransformStoresToLocation in the same way,
changing df_name to 'otherlocations.xdock', 'dc', and 'stores' respectively:
Code — remaining source transformers
# transform_xdock_to_location.py
import pandas as pd
from algomancy_data import Transformer


class TransformXDockToLocation(Transformer):
    def __init__(self, location_df_name: str, logger=None) -> None:
        super().__init__(name="Location Transformer", logger=logger)
        self.location_df_name = location_df_name
        self.df_name = 'otherlocations.xdock'
        self.column_mapping = {'ID': 'id', 'x': 'x', 'y': 'y'}

    def transform(self, data: dict[str, pd.DataFrame]) -> None:
        data_df = data.get(self.df_name, None)
        data_df_locations = data.get(self.location_df_name, None)
        if (data_df is not None) and (data_df_locations is not None):
            normalized = (
                data_df.rename(columns=self.column_mapping)
                .reindex(columns=data_df_locations.columns)
                .astype(data_df_locations.dtypes.to_dict())
            )
            data[self.location_df_name] = pd.concat(
                [data_df_locations, normalized], ignore_index=True
            )


# transform_dc_to_location.py
import pandas as pd
from algomancy_data import Transformer


class TransformDCToLocation(Transformer):
    def __init__(self, location_df_name: str, logger=None) -> None:
        super().__init__(name="Location Transformer", logger=logger)
        self.location_df_name = location_df_name
        self.df_name = 'dc'
        self.column_mapping = {'ID': 'id', 'x': 'x', 'y': 'y'}

    def transform(self, data: dict[str, pd.DataFrame]) -> None:
        data_df = data.get(self.df_name, None)
        data_df_locations = data.get(self.location_df_name, None)
        if (data_df is not None) and (data_df_locations is not None):
            normalized = (
                data_df.rename(columns=self.column_mapping)
                .reindex(columns=data_df_locations.columns)
                .astype(data_df_locations.dtypes.to_dict())
            )
            data[self.location_df_name] = pd.concat(
                [data_df_locations, normalized], ignore_index=True
            )


# transform_stores_to_location.py
import pandas as pd
from algomancy_data import Transformer


class TransformStoresToLocation(Transformer):
    def __init__(self, location_df_name: str, logger=None) -> None:
        super().__init__(name="Location Transformer", logger=logger)
        self.location_df_name = location_df_name
        self.df_name = 'stores'
        self.column_mapping = {'ID': 'id', 'x': 'x', 'y': 'y'}

    def transform(self, data: dict[str, pd.DataFrame]) -> None:
        data_df = data.get(self.df_name, None)
        data_df_locations = data.get(self.location_df_name, None)
        if (data_df is not None) and (data_df_locations is not None):
            normalized = (
                data_df.rename(columns=self.column_mapping)
                .reindex(columns=data_df_locations.columns)
                .astype(data_df_locations.dtypes.to_dict())
            )
            data[self.location_df_name] = pd.concat(
                [data_df_locations, normalized], ignore_index=True
            )
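The four source transformers differ only in df_name, so the shared step can be read in isolation. The following plain-Python sketch mirrors "rename columns, then concatenate" with lists of dicts standing in for DataFrames. The helper append_as_locations and the sample rows are made up for illustration and are not part of the tutorial's code base.

```python
# Plain-Python illustration of "rename columns, then concatenate".
# Rows are dicts standing in for DataFrame rows; append_as_locations is a hypothetical helper.

COLUMN_MAPPING = {"ID": "id", "x": "x", "y": "y"}
LOCATION_COLUMNS = ["id", "x", "y"]


def append_as_locations(locations, source_rows, mapping=COLUMN_MAPPING):
    """Return locations plus source_rows, renamed and restricted to the location columns."""
    normalized = []
    for row in source_rows:
        renamed = {mapping.get(key, key): value for key, value in row.items()}
        # Like reindex(columns=...): keep only the target columns, None where absent.
        normalized.append({col: renamed.get(col) for col in LOCATION_COLUMNS})
    return locations + normalized


# Made-up sample rows, keyed by the table names used in this tutorial.
sources = {
    "otherlocations.customer": [{"ID": "C1", "x": 1, "y": 2}],
    "otherlocations.xdock": [{"ID": "X1", "x": 5, "y": 5, "note": "extra column"}],
    "stores": [{"ID": "S1", "x": 3, "y": 4}],
    "dc": [{"ID": "DC1", "x": 0, "y": 0}],
}

locations = []
for rows in sources.values():
    locations = append_as_locations(locations, rows)

print([loc["id"] for loc in locations])
```

Columns that are not part of the locations table, such as the made-up "note" column, are dropped during normalisation.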
Create transform_location_to_routes.py, which derives a routes DataFrame as the Cartesian product of all locations, with Euclidean distance as cost:
Code
import pandas as pd
from algomancy_data import Transformer


class TransformLocationToRoutes(Transformer):
    def __init__(self, location_df_name: str, routes_df_name: str, logger=None) -> None:
        super().__init__(name="Transform location to routes", logger=logger)
        self._location_df_name = location_df_name
        self._routes_df_name = routes_df_name

    def transform(self, data: dict[str, pd.DataFrame]) -> None:
        locations = data.get(self._location_df_name, None)
        if locations is None:
            # No locations table to derive routes from; leave the data untouched.
            return

        # Cartesian product of the locations table with itself
        routes = locations.merge(locations, how='cross', suffixes=('_from', '_to'))
        # Drop self-pairs; copy so the column assignments below act on an independent frame
        routes = routes[routes['id_from'] != routes['id_to']].copy()
        # Euclidean distance between the two endpoints
        routes['distance'] = routes.apply(
            lambda row: (
                (row['x_from'] - row['x_to']) ** 2 + (row['y_from'] - row['y_to']) ** 2
            ) ** 0.5,
            axis=1
        )
        # Cost equals distance here (factor 1.0)
        routes['cost'] = routes['distance'] * 1.0
        data[self._routes_df_name] = routes
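As a small worked example of the same computation without pandas, the sketch below builds every ordered pair of distinct locations and attaches the Euclidean distance. The three sample locations are made up; with n locations the result has n * (n - 1) routes.

```python
from itertools import permutations
from math import dist

# Made-up sample locations: id -> (x, y)
locations = {"DC1": (0, 0), "S1": (3, 4), "C1": (0, 4)}

# permutations(..., 2) yields every ordered pair of distinct ids,
# i.e. the cross product without self-pairs.
routes = {
    (a, b): dist(locations[a], locations[b])
    for a, b in permutations(locations, 2)
}

print(len(routes))             # 3 * 2 = 6 ordered pairs
print(routes[("DC1", "S1")])   # sqrt(3**2 + 4**2) = 5.0
```

Each direction is stored separately, so the route from DC1 to S1 and the route from S1 to DC1 are two entries with the same distance.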
Register all transformers in etl_factory.py by replacing the placeholder create_transformation_sequence:
Code
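# Imports to add at the top of etl_factory.py
from src.data_handling.transformers.transform_create_location_df import TransformCreateLocations
from src.data_handling.transformers.transform_customer_to_location import TransformCustomerToLocation
from src.data_handling.transformers.transform_xdock_to_location import TransformXDockToLocation
from src.data_handling.transformers.transform_stores_to_location import TransformStoresToLocation
from src.data_handling.transformers.transform_dc_to_location import TransformDCToLocation
from src.data_handling.transformers.transform_location_to_routes import TransformLocationToRoutes

# Replacement method inside TSPETLFactory
def create_transformation_sequence(self) -> TransformationSequence:
    sequence = TransformationSequence(logger=self.logger)
    location_df_name = 'transform_locations'
    routes_df_name = 'transform_routes'
    # The empty locations table must be created before any source is appended to it.
    sequence.add_transformer(TransformCreateLocations(location_df_name=location_df_name, logger=self.logger))
    sequence.add_transformer(TransformCustomerToLocation(location_df_name=location_df_name, logger=self.logger))
    sequence.add_transformer(TransformXDockToLocation(location_df_name=location_df_name, logger=self.logger))
    sequence.add_transformer(TransformStoresToLocation(location_df_name=location_df_name, logger=self.logger))
    sequence.add_transformer(TransformDCToLocation(location_df_name=location_df_name, logger=self.logger))
    # Routes are derived last, once all locations are present.
    sequence.add_transformer(TransformLocationToRoutes(
        location_df_name=location_df_name,
        routes_df_name=routes_df_name,
        logger=self.logger
    ))
    return sequence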
Run main.py, import the data, and verify that transform_locations appears as a combined table.
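If transform_locations turns up empty or missing, the registration order is a likely cause: the source transformers only append when the locations table already exists and otherwise do nothing, without raising an error. The toy functions below mimic that guard in plain Python. They are stand-ins for illustration, not Algomancy classes, and the sample row is made up.

```python
# Toy transformers that mimic the guards used in this tutorial; not Algomancy classes.

def create_locations(data: dict) -> None:
    if data.get("transform_locations") is None:
        data["transform_locations"] = []


def append_stores(data: dict) -> None:
    source = data.get("stores")
    target = data.get("transform_locations")
    # Same guard as the source transformers: do nothing unless both tables exist.
    if source is not None and target is not None:
        data["transform_locations"] = target + source


def run(sequence, data: dict) -> dict:
    # Apply each step in order to the same shared dictionary.
    for step in sequence:
        step(data)
    return data


stores = [{"id": "S1", "x": 3, "y": 4}]

right_order = run([create_locations, append_stores], {"stores": stores})
wrong_order = run([append_stores, create_locations], {"stores": stores})

print(len(right_order["transform_locations"]))  # 1: the store was appended
print(len(wrong_order["transform_locations"]))  # 0: the append was skipped silently
```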
Load
We build a domain-specific data model from the transformed DataFrames — a network of Location and Route objects managed by a NetworkManager.
Create the directory src/data_handling/data_model/.
Locations
We will use locations in the visualisation part of this tutorial. Create location.py:
Code
class Location:
    def __init__(self, id: str, x: float, y: float):
        self._id = id
        self._x = x
        self._y = y

    @property
    def id(self):
        return self._id

    @property
    def x(self):
        return self._x

    @property
    def y(self):
        return self._y
Routes
We will use routes in the optimisation part of this tutorial. Create route.py:
Code
class Route:
    def __init__(self, from_id: str, to_id: str, cost: float):
        # Identifier of the directed route, e.g. "DC1_S1"
        self.route_id = from_id + '_' + to_id
        self._from_id = from_id
        self._to_id = to_id
        self._cost = cost

    @property
    def cost(self):
        return self._cost

    @property
    def from_id(self):
        return self._from_id

    @property
    def to_id(self):
        return self._to_id
Network Manager
Create network_manager.py to manage the set of locations and routes:
Code
from typing import List

from src.data_handling.data_model.location import Location
from src.data_handling.data_model.route import Route


class NetworkManager:
    def __init__(self):
        self._locations: dict[str, Location] = {}
        self._routes: dict[tuple[str, str], Route] = {}
        self._reachable_locations_from_location: dict[str, List[Location]] = {}

    def add_location(self, location: Location):
        self._locations[location.id] = location

    def add_route(self, route: Route):
        from_location, to_location = self.get_route_locations(route)
        # Only register routes whose endpoints are both known locations.
        if from_location is not None and to_location is not None:
            self._routes[(from_location.id, to_location.id)] = route
            self._reachable_locations_from_location.setdefault(
                from_location.id, []
            ).append(to_location)

    def get_locations(self) -> list[Location]:
        return list(self._locations.values())

    def get_location(self, location_id: str) -> Location | None:
        # Return None for unknown ids so that callers can test for a missing location.
        return self._locations.get(location_id)

    def get_route_locations(self, route: Route) -> tuple[Location | None, Location | None]:
        return self.get_location(route.from_id), self.get_location(route.to_id)

    def get_routes(self) -> list[Route]:
        return list(self._routes.values())

    def get_route(self, from_location_id: str, to_location_id: str) -> Route:
        return self._routes[(from_location_id, to_location_id)]

    def get_reachable_locations(self, location_id: str) -> List[Location]:
        return self._reachable_locations_from_location[location_id]
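Keying routes by the pair (from_id, to_id) makes the cost of a leg a single dictionary lookup, which is the operation a TSP algorithm performs most often. The sketch below shows the idea with a plain dict standing in for the NetworkManager and its get_route method. The helper tour_cost and the cost values are made up for illustration.

```python
# Plain dict standing in for the NetworkManager: (from_id, to_id) -> cost.
# tour_cost is a hypothetical helper, not part of the tutorial's code.

def tour_cost(tour: list[str], route_costs: dict[tuple[str, str], float]) -> float:
    """Sum the cost of visiting the stops in order and returning to the start."""
    legs = zip(tour, tour[1:] + tour[:1])  # consecutive pairs, wrapping around
    return sum(route_costs[(a, b)] for a, b in legs)


# Made-up symmetric costs for three locations.
route_costs = {
    ("DC1", "S1"): 5.0, ("S1", "DC1"): 5.0,
    ("S1", "C1"): 3.0, ("C1", "S1"): 3.0,
    ("C1", "DC1"): 4.0, ("DC1", "C1"): 4.0,
}

total = tour_cost(["DC1", "S1", "C1"], route_costs)
print(total)  # 5.0 + 3.0 + 4.0 = 12.0
```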
Data Model
Create data_model.py as a DataSource subclass so we can attach domain objects to the loaded data:
Code
from datetime import datetime
from typing import List

import pandas as pd
from algomancy_data import DataSource, DataClassification, ValidationMessage

from src.data_handling.data_model.network_manager import NetworkManager


class DataModel(DataSource):
    def __init__(
        self,
        ds_type: DataClassification,
        name: str = None,
        tables: dict[str, pd.DataFrame] | None = None,
        validation_messages: List[ValidationMessage] = None,
        ds_id: str | None = None,
        creation_datetime: datetime | None = None,
    ):
        super().__init__(
            ds_type=ds_type,
            name=name,
            validation_messages=validation_messages,
            ds_id=ds_id,
            creation_datetime=creation_datetime,
        )
        if tables is not None:
            self.tables = tables
        # Set later by the loader through set_network_manager.
        self._network_manager: NetworkManager | None = None

    def set_network_manager(self, network_manager: NetworkManager):
        self._network_manager = network_manager

    @property
    def network_manager(self):
        return self._network_manager
Loader
Create the directory src/data_handling/loaders/ and add loader.py:
Code
from typing import List

import pandas as pd
from algomancy_data import Loader, ValidationMessage, DataClassification

from src.data_handling.data_model.data_model import DataModel
from src.data_handling.data_model.location import Location
from src.data_handling.data_model.network_manager import NetworkManager
from src.data_handling.data_model.route import Route


class DataModelLoader(Loader):
    def load(
        self,
        name: str,
        data: dict[str, pd.DataFrame],
        validation_messages: List[ValidationMessage],
        ds_type: DataClassification = DataClassification.MASTER_DATA,
    ) -> DataModel:
        datamodel = DataModel(
            tables=data,
            ds_type=ds_type,
            name=name,
            validation_messages=validation_messages,
        )
        # Order matters: the manager must exist before locations, and locations before routes.
        self.load_network_manager(dm=datamodel)
        self.load_locations(dm=datamodel)
        self.load_routes(dm=datamodel)
        return datamodel

    @staticmethod
    def load_network_manager(dm: DataModel):
        dm.set_network_manager(NetworkManager())

    @staticmethod
    def load_locations(dm: DataModel):
        data_locations = dm.get_table("transform_locations")
        nm = dm.network_manager
        for _, row in data_locations.iterrows():
            nm.add_location(Location(id=row["id"], x=row["x"], y=row["y"]))

    @staticmethod
    def load_routes(dm: DataModel):
        data_routes = dm.get_table("transform_routes")
        nm = dm.network_manager
        for _, row in data_routes.iterrows():
            route = Route(from_id=row["id_from"], to_id=row["id_to"], cost=row["cost"])
            from_location, to_location = nm.get_route_locations(route=route)
            # Skip routes that refer to a location that was not loaded.
            if from_location is None or to_location is None:
                continue
            nm.add_route(route=route)
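Register the loader in etl_factory.py by replacing DataSourceLoader with DataModelLoader:
# Import to add at the top of etl_factory.py
from src.data_handling.loaders.loader import DataModelLoader

# Replacement method inside TSPETLFactory
def create_loader(self) -> de.Loader:
    return DataModelLoader(self.logger)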
Also update main.py to use DataModel as the data object type:
data_object_type=DataModel,
Next step
The data is now loaded into Algomancy. The next step is to define the algorithm(s).