from typing import Any, Dict, List, Optional, Type, TypeVar

from algomancy_data import (
    BASEDATASOURCE,
    ETLFactory,
    Schema,
    StatefulDataManager,
    StatelessDataManager,
)
from algomancy_utils.baseparameterset import BASE_PARAMS_BOUND
from algomancy_utils.logger import Logger

from .basealgorithm import ALGORITHM
from .core_configuration import CoreConfig
from .keyperformanceindicator import BASE_KPI
from .scenario import Scenario
from .scenariofactory import ScenarioFactory
from .scenarioprocessor import ScenarioProcessor
from .scenarioregistry import ScenarioRegistry
class ScenarioManager:
    """
    Facade that coordinates data management, scenario creation/registry, and processing.

    The manager owns four collaborators and mostly delegates to them:

    * a data manager (``StatefulDataManager`` / ``StatelessDataManager`` or an
      injected implementation) for dataset storage and ETL,
    * a scenario registry (``ScenarioRegistry`` or an injected repository),
    * a ``ScenarioFactory`` that builds ``Scenario`` objects,
    * a ``ScenarioProcessor`` that runs enqueued scenarios.
    """

    E = TypeVar("E", bound=ETLFactory)

    # Save formats accepted by ``__init__``; the data managers receive the
    # chosen value unchanged.
    _SUPPORTED_SAVE_TYPES = ("json",)

    @classmethod
    def from_config(cls, cfg) -> "ScenarioManager":
        """Build from either a CoreConfig (or subclass like ApiConfiguration)
        or any wrapper exposing a `.core` CoreConfig (e.g. AppConfig).

        Raises:
            TypeError: if ``cfg`` is neither a ``CoreConfig`` nor an object
                whose ``core`` attribute is one.
        """
        if isinstance(cfg, CoreConfig):
            core = cfg
        else:
            core = getattr(cfg, "core", None)
            if not isinstance(core, CoreConfig):
                raise TypeError(
                    "from_config expects a CoreConfig (or subclass) or an object "
                    f"with a `.core` CoreConfig attribute; got {type(cfg).__name__}"
                )
        return cls(
            etl_factory=core.etl_factory,
            kpis=core.kpis,
            algorithms=core.algorithms,
            schemas=core.schemas,
            data_object_type=core.data_object_type,
            data_folder=core.data_path,
            has_persistent_state=core.has_persistent_state,
            save_type=core.save_type,
            autocreate=core.autocreate,
            default_algo_name=core.default_algo,
            default_param_values=core.default_algo_params_values,
            autorun=core.autorun,
        )

    def __init__(
        self,
        etl_factory: type[E],
        kpis: Dict[str, Type[BASE_KPI]],
        algorithms: Dict[str, Type[ALGORITHM]],
        schemas: List[Schema],
        data_object_type: type[BASEDATASOURCE],  # for extensions of datasource
        data_folder: Optional[str] = None,
        logger: Optional[Logger] = None,
        scenario_save_location: str = "scenarios.json",
        has_persistent_state: bool = False,
        save_type: str = "json",  # adjusts the format
        autocreate: bool = False,
        default_algo_name: Optional[str] = None,
        default_param_values: Optional[Dict[str, Any]] = None,
        autorun: bool = False,
        data_manager=None,
        scenario_repository=None,
    ) -> None:
        """Wire up the collaborators and load initial data.

        Args:
            etl_factory: ETL factory class handed to the data manager.
            kpis: KPI classes by name, handed to the scenario factory.
            algorithms: Algorithm classes by name, handed to the scenario factory.
            schemas: Schemas handed to the data manager; also exposed via ``schemas``.
            data_object_type: Data source class handed to the data manager.
            data_folder: Folder for the stateful data manager; required when
                ``has_persistent_state`` is true and no ``data_manager`` is injected.
            logger: Logger to use; a new ``Logger()`` is created when omitted.
            scenario_save_location: Stored on the instance as-is.
            has_persistent_state: Selects the stateful data manager when no
                ``data_manager`` is injected.
            save_type: Save format; only ``"json"`` is accepted.
            autocreate: When true, a scenario is auto-created for each dataset
                key available after startup and for datasets added later.
            default_algo_name: Algorithm name used for auto-created scenarios.
            default_param_values: Algorithm parameters used for auto-created scenarios.
            autorun: Initial value of the processor's auto-run flag.
            data_manager: Optional pre-built data manager; takes precedence over
                auto-construction.
            scenario_repository: Optional pre-built registry/repository; takes
                precedence over ``ScenarioRegistry``.

        Raises:
            AssertionError: if ``save_type`` is unsupported, or if persistent
                state is requested without ``data_folder``.
        """
        self.logger = logger if logger else Logger()
        self.scenario_save_location = scenario_save_location
        self._has_persistent_state = has_persistent_state
        self._auto_create_scenario = autocreate
        self._default_algo_name = default_algo_name
        self._default_param_values = default_param_values

        # Raised explicitly instead of via ``assert`` so the validation is not
        # stripped under ``python -O``; AssertionError is kept so existing
        # callers that catch it keep working.
        if save_type not in self._SUPPORTED_SAVE_TYPES:
            raise AssertionError(
                f"Unsupported save type {save_type!r}; "
                f"supported: {', '.join(self._SUPPORTED_SAVE_TYPES)}."
            )
        self._save_type = save_type

        # Components — prefer injected implementations over auto-constructed ones
        if data_manager is not None:
            self._dm = data_manager
        elif self._has_persistent_state:
            if not data_folder:
                raise AssertionError(
                    "Data folder must be specified if data manager has state."
                )
            self._dm = StatefulDataManager(
                etl_factory=etl_factory,
                schemas=schemas,
                data_folder=data_folder,
                save_type=save_type,
                data_object_type=data_object_type,
                logger=self.logger,
            )
        else:
            self._dm = StatelessDataManager(
                etl_factory=etl_factory,
                schemas=schemas,
                save_type=save_type,
                logger=self.logger,
                data_object_type=data_object_type,
            )

        self._registry = (
            scenario_repository
            if scenario_repository is not None
            else ScenarioRegistry(logger=self.logger)
        )
        self._factory = ScenarioFactory(
            kpis=kpis,
            algorithms=algorithms,
            data_manager=self._dm,
            logger=self.logger,
        )

        # Wire the post-run callback so DB-backed repositories can persist run results
        _on_processed = None
        if hasattr(self._registry, "persist_run"):
            _on_processed = self._registry.persist_run
        self._processor = ScenarioProcessor(
            logger=self.logger, on_processed=_on_processed
        )
        self.toggle_autorun(autorun)

        # Keep inputs for accessors
        self._schemas = schemas

        # Load initial data and scenario history. Failures here are logged and
        # swallowed on purpose: the manager is still returned to the caller.
        try:
            self._dm.startup()
            if hasattr(self._registry, "startup"):
                self._registry.startup()
            if self._auto_create_scenario:
                self.auto_create_scenarios(self._dm.get_data_keys())
        except Exception as e:
            if self.logger:
                self.logger.error("Error loading initial data.")
                self.logger.log_traceback(e)

        if self.logger:
            self.logger.log("ScenarioManager initialized.")

    @property
    def has_persistent_state(self):
        """Value of ``has_persistent_state`` passed at construction."""
        return self._has_persistent_state

    # Accessors
    @property
    def save_type(self):
        """Save format passed at construction."""
        return self._save_type

    @property
    def schemas(self):
        """Schemas passed at construction."""
        return self._schemas

    @property
    def available_algorithms(self):
        """Delegates to ``ScenarioFactory.available_algorithms``."""
        return self._factory.available_algorithms

    @property
    def available_kpis(self):
        """Delegates to ``ScenarioFactory.available_kpis``."""
        return self._factory.available_kpis

    @property
    def auto_run_scenarios(self):
        """Current auto-run flag of the scenario processor."""
        return self._processor.auto_run_scenarios

    @property
    def currently_processing(self) -> Optional[Scenario]:
        """Delegates to ``ScenarioProcessor.currently_processing``."""
        return self._processor.currently_processing

    def get_algorithm_parameters(self, key) -> BASE_PARAMS_BOUND:
        """Return ``initialize_parameters()`` of the algorithm registered under ``key``.

        NOTE(review): the lookup uses ``.get(key)``, so an unknown key
        presumably surfaces as an ``AttributeError`` on ``None`` — confirm
        against ``ScenarioFactory.algorithms``.
        """
        return self._factory.algorithms.get(key).initialize_parameters()

    # Data operations (delegated)
    def get_data_keys(self) -> List[str]:
        """Return the dataset keys known to the data manager."""
        return self._dm.get_data_keys()

    def get_data(self, data_key):
        """Return the dataset stored under ``data_key`` in the data manager."""
        return self._dm.get_data(data_key)

    def set_data(self, data_key, data):
        """Store ``data`` under ``data_key`` in the data manager."""
        self._dm.set_data(data_key, data)

    def derive_data(self, derive_from_key: str, new_data_key: str) -> None:
        """Derive ``new_data_key`` from ``derive_from_key`` via the data manager.

        When auto-create is enabled, a scenario is created for the new dataset.
        """
        self._dm.derive_data(derive_from_key, new_data_key)
        if self._auto_create_scenario:
            self.auto_create_scenarios([new_data_key])

    def delete_data(
        self, data_key: str, prevent_masterdata_removal: bool = False
    ) -> None:
        """Delete a dataset unless a registered scenario still uses it.

        Raises:
            AssertionError: if ``data_key`` is among the registry's used datasets.
        """
        # prevent delete if used by scenarios. Raised explicitly (not via
        # ``assert``) so the guard still holds under ``python -O``.
        if data_key in self._registry.used_datasets():
            raise AssertionError("Cannot delete data used in scenarios.")
        self._dm.delete_data(data_key, prevent_masterdata_removal)

    def store_data(self, dataset_name: str, data):
        """Persist ``data`` via the stateful data manager.

        With any other data manager, an error is logged and nothing is stored.
        """
        if isinstance(self._dm, StatefulDataManager):
            self._dm.store_data(dataset_name, data)
        else:
            if self.logger:
                self.logger.error(
                    "Store data is not supported for stateless data manager. "
                )

    def toggle_autorun(self, value: Optional[bool] = None) -> None:
        """Set the processor's auto-run flag, or flip it when ``value`` is None."""
        if value is None:
            self._processor.auto_run_scenarios = not self._processor.auto_run_scenarios
        else:
            self._processor.auto_run_scenarios = value
        if self.logger:
            self.logger.log(
                f"Auto-run scenarios set to {self._processor.auto_run_scenarios}"
            )

    # Processing operations (delegated)
    def process_scenario_async(self, scenario):
        """Enqueue ``scenario`` on the scenario processor."""
        self._processor.enqueue(scenario)

    def wait_for_processing(self):
        """Delegate to ``ScenarioProcessor.wait_for_processing``."""
        self._processor.wait_for_processing()

    def shutdown_processing(self):
        """Delegate to ``ScenarioProcessor.shutdown``."""
        self._processor.shutdown()

    # Scenario creation/registry
    def get_associated_parameters(
        self, algo_name: str, dataset_key: Optional[str] = None
    ):
        """Delegate to ``ScenarioFactory.get_associated_parameters``."""
        return self._factory.get_associated_parameters(algo_name, dataset_key)

    def get_data_parameters(self, dataset_key: str) -> BASE_PARAMS_BOUND:
        """Return ``initialize_data_parameters()`` of the dataset under ``dataset_key``."""
        return self.get_data(dataset_key).initialize_data_parameters()

    def create_scenario(
        self,
        tag: str,
        dataset_key: str = "Master data",
        algo_name: str = "",
        algo_params=None,
        data_params=None,
    ) -> Scenario:
        """Create a scenario, register it and enqueue it when auto-run is on.

        Raises:
            ValueError: if the registry already holds a scenario with ``tag``.
        """
        if self._registry.has_tag(tag):
            if self.logger:
                self.logger.log(
                    f"Scenario with tag '{tag}' already exists. Skipping creation."
                )
            raise ValueError(f"A scenario with tag '{tag}' already exists.")

        scenario = self._factory.create(
            tag=tag,
            dataset_key=dataset_key,
            algo_name=algo_name,
            algo_params=algo_params,
            data_params=data_params,
        )
        self._registry.add(scenario)
        if self._processor.auto_run_scenarios:
            self._processor.enqueue(scenario)
        return scenario

    def get_by_id(self, scenario_id: str) -> Optional[Scenario]:
        """Look up a scenario by id in the registry."""
        return self._registry.get_by_id(scenario_id)

    def get_by_tag(self, tag: str) -> Optional[Scenario]:
        """Look up a scenario by tag in the registry."""
        return self._registry.get_by_tag(tag)

    def delete_scenario(self, scenario_id: str) -> bool:
        """Delete a scenario from the registry; returns the registry's result."""
        return self._registry.delete(scenario_id)

    def list_scenarios(self) -> List[Scenario]:
        """Return the registry's scenario list."""
        return self._registry.list()

    def list_ids(self):
        """Return the registry's scenario ids."""
        return self._registry.list_ids()

    def toggle_autocreate(
        self, value: Optional[bool] = None, default_algo_name: str = ""
    ) -> None:
        """Set the auto-create flag, or flip it when ``value`` is None.

        The default algorithm name is replaced by ``default_algo_name`` when
        auto-create ends up enabled, and reset to ``None`` otherwise.
        """
        if value is None:
            value = not self._auto_create_scenario
        self._auto_create_scenario = value
        self._default_algo_name = (
            default_algo_name if self._auto_create_scenario else None
        )
        if self.logger:
            self.logger.log(
                f"Auto-create scenarios set to {self._auto_create_scenario}"
            )

    def add_datasource_from_json(self, json_string):
        """Build a data source from JSON and add it to the data manager.

        When auto-create is enabled, a scenario is created for the new source.
        """
        # Create data source from JSON
        datasource = self._dm.data_object_type.from_json(json_string)
        # Add data source to datamanager
        self._dm.add_data_source(datasource)
        # create scenario if auto-create is enabled
        if self._auto_create_scenario:
            self.auto_create_scenarios([datasource.name])

    def etl_data(self, files, dataset_name: str):
        """Run ETL for ``dataset_name``; returns the underlying ETLResult.

        Auto-creates a scenario only on successful loads.
        """
        result = self._dm.etl_data(files, dataset_name)
        # create scenario if auto-create is enabled and ETL succeeded
        if result.is_success and self._auto_create_scenario:
            self.auto_create_scenarios([dataset_name])
        return result

    def auto_create_scenarios(self, keys: Optional[List[str]] = None):
        """Create one ``"<key> [auto]"`` scenario per dataset key.

        Uses the default algorithm name and parameters held by this manager.
        ``keys=None`` (the default) is treated as "no keys" and does nothing.

        Raises:
            ValueError: propagated from ``create_scenario`` when an auto tag
                already exists.
        """
        # ``or ()`` guards the declared default, which is not iterable.
        for key in keys or ():
            self.create_scenario(
                tag=f"{key} [auto]",
                dataset_key=key,
                algo_name=self._default_algo_name,
                algo_params=self._default_param_values,
            )

    def get_data_as_json(self, key: str) -> str:
        """Return ``to_json()`` of the dataset stored under ``key``."""
        return self._dm.get_data(key).to_json()

    def store_data_as_json(self, set_name):
        """Serialize a dataset via the stateful data manager.

        Raises:
            AttributeError: if the data manager is not a ``StatefulDataManager``.
        """
        if isinstance(self._dm, StatefulDataManager):
            self._dm.store_data_source_as_json(set_name)
        else:
            raise AttributeError(
                "Stateless data manager does not support internal serialization."
            )

    def debug_load_data(self, dataset_name: str) -> None:
        """Debugging utility: load ``dataset_name`` from its directory.

        Raises:
            NotImplementedError: for a ``StatelessDataManager``.
            Exception: for any other data manager type.
        """
        if isinstance(self._dm, StatefulDataManager):
            self._dm.load_data_from_dir(dataset_name)
        elif isinstance(self._dm, StatelessDataManager):
            raise NotImplementedError(
                "Todo: implement loading for stateless data manager."
            )
        else:
            raise Exception("Data manager not initialized.")

    def debug_create_and_run_scenario(
        self,
        scenario_tag: str,
        dataset_key: str,
        algo_name: str,
        algo_params: Dict[str, Any],
    ) -> Scenario:
        """
        Creates and runs a scenario for debugging purposes. The method uses a factory to create a
        scenario instance, registers it, enqueues it for processing, and waits for the processing to
        complete. Returns the fully processed scenario.

        Unlike ``create_scenario``, this does not check for an existing tag
        first and always enqueues, regardless of the auto-run flag.

        Parameters:
            scenario_tag (str): A unique identifier for the scenario being created and run.
            dataset_key (str): The key for the dataset to be used in the scenario.
            algo_name (str): The name of the algorithm to be applied in the scenario.
            algo_params (Dict): Additional parameters for the algorithm.

        Returns:
            Scenario: The fully processed scenario created and executed within this method.
        """
        scenario = self._factory.create(
            tag=scenario_tag,
            dataset_key=dataset_key,
            algo_name=algo_name,
            algo_params=algo_params,
        )
        self._registry.add(scenario)
        self._processor.enqueue(scenario)
        self.wait_for_processing()
        return scenario

    def debug_etl_data(self, dataset_name: str) -> None:
        """
        Debugging utility to run ETL on a directory as if loaded on startup.

        Raises:
            AttributeError: if the data manager is not a ``StatefulDataManager``.
        """
        # Retrieve files from directory
        if isinstance(self._dm, StatefulDataManager):
            self._dm.load_data_from_dir(dataset_name)
        else:
            raise AttributeError(
                "Stateless data manager does not support internal ETL."
            )

    def debug_load_serialized_data(self, file_name: str):
        """
        Debugging utility to upload a file as if loaded on startup.

        Raises:
            AttributeError: if the data manager is not a ``StatefulDataManager``.
        """
        if isinstance(self._dm, StatefulDataManager):
            self._dm.load_data_from_file(file_name)
        else:
            raise AttributeError(
                "Stateless data manager does not support internal deserialization."
            )

    def debug_import_data(self, directory: str) -> None:
        """
        Debugging utility to import data from a directory.

        Not implemented yet; always raises ``NotImplementedError``.
        """
        raise NotImplementedError("todo: write import data method")

    def debug_upload_data(self, file_name: str) -> None:
        """
        Debugging utility to upload data from a file.

        Not implemented yet; always raises ``NotImplementedError``.
        """
        raise NotImplementedError("todo: write upload data method")