Source code for algomancy_scenario.scenariomanager

from typing import Dict, List, Optional, TypeVar, Type

from algomancy_data import (
    ETLFactory,
    StatefulDataManager,
    StatelessDataManager,
    BASEDATASOURCE,
    Schema,
)

from algomancy_utils.logger import Logger
from .basealgorithm import ALGORITHM
from algomancy_utils.baseparameterset import BASE_PARAMS_BOUND

from .core_configuration import CoreConfig
from .keyperformanceindicator import BASE_KPI
from .scenario import Scenario
from .scenarioregistry import ScenarioRegistry
from .scenariofactory import ScenarioFactory
from .scenarioprocessor import ScenarioProcessor


[docs] class ScenarioManager: """ Facade that coordinates data management, scenario creation/registry, and processing. """ E = TypeVar("E", bound=ETLFactory)
[docs] @classmethod def from_config(cls, cfg) -> "ScenarioManager": """Build from either a CoreConfig (or subclass like ApiConfiguration) or any wrapper exposing a `.core` CoreConfig (e.g. AppConfig). """ if isinstance(cfg, CoreConfig): core = cfg else: core = getattr(cfg, "core", None) if not isinstance(core, CoreConfig): raise TypeError( "from_config expects a CoreConfig (or subclass) or an object " f"with a `.core` CoreConfig attribute; got {type(cfg).__name__}" ) return cls( etl_factory=core.etl_factory, kpis=core.kpis, algorithms=core.algorithms, schemas=core.schemas, data_object_type=core.data_object_type, data_folder=core.data_path, has_persistent_state=core.has_persistent_state, save_type=core.save_type, autocreate=core.autocreate, default_algo_name=core.default_algo, default_param_values=core.default_algo_params_values, autorun=core.autorun, )
def __init__( self, etl_factory: type[E], kpis: Dict[str, Type[BASE_KPI]], algorithms: Dict[str, Type[ALGORITHM]], schemas: List[Schema], data_object_type: type[BASEDATASOURCE], # for extensions of datasource data_folder: str = None, logger: Logger = None, scenario_save_location: str = "scenarios.json", has_persistent_state: bool = False, save_type: str = "json", # adjusts the format autocreate: bool = False, default_algo_name: str = None, default_param_values: Dict[str, any] = None, autorun: bool = False, data_manager=None, scenario_repository=None, ) -> None: self.logger = logger if logger else Logger() self.scenario_save_location = scenario_save_location self._has_persistent_state = has_persistent_state self._auto_create_scenario = autocreate self._default_algo_name = default_algo_name self._default_param_values = default_param_values assert save_type in ["json"], "Save type must be parquet or json." self._save_type = save_type # Components — prefer injected implementations over auto-constructed ones if data_manager is not None: self._dm = data_manager elif self._has_persistent_state: assert data_folder, ( "Data folder must be specified if data manager has state." ) self._dm = StatefulDataManager( etl_factory=etl_factory, schemas=schemas, data_folder=data_folder, save_type=save_type, data_object_type=data_object_type, logger=self.logger, ) else: self._dm = StatelessDataManager( etl_factory=etl_factory, schemas=schemas, save_type=save_type, logger=self.logger, data_object_type=data_object_type, ) self._registry = ( scenario_repository if scenario_repository is not None else ScenarioRegistry(logger=self.logger) ) self._factory = ScenarioFactory( kpis=kpis, algorithms=algorithms, data_manager=self._dm, logger=self.logger, ) # Wire the post-run callback so DB-backed repositories can persist run results _on_processed = None if hasattr(self._registry, "persist_run"): _on_processed = self._registry.persist_run self._processor = ScenarioProcessor( logger=self.logger, on_processed=_on_processed ) self.toggle_autorun(autorun) # Keep inputs for accessors self._schemas = schemas # Load initial data and scenario history try: self._dm.startup() if hasattr(self._registry, "startup"): self._registry.startup() if self._auto_create_scenario: self.auto_create_scenarios(self._dm.get_data_keys()) except Exception as e: if self.logger: self.logger.error("Error loading initial data.") self.logger.log_traceback(e) if self.logger: self.logger.log("ScenarioManager initialized.") @property def has_persistent_state(self): return self._has_persistent_state # Accessors @property def save_type(self): return self._save_type @property def schemas(self): return self._schemas @property def available_algorithms(self): return self._factory.available_algorithms @property def available_kpis(self): return self._factory.available_kpis @property def auto_run_scenarios(self): return self._processor.auto_run_scenarios @property def currently_processing(self) -> Optional[Scenario]: return self._processor.currently_processing
[docs] def get_algorithm_parameters(self, key) -> BASE_PARAMS_BOUND: return self._factory.algorithms.get(key).initialize_parameters()
# Data operations (delegated)
[docs] def get_data_keys(self) -> List[str]: return self._dm.get_data_keys()
[docs] def get_data(self, data_key): return self._dm.get_data(data_key)
[docs] def set_data(self, data_key, data): self._dm.set_data(data_key, data)
[docs] def derive_data(self, derive_from_key: str, new_data_key: str) -> None: self._dm.derive_data(derive_from_key, new_data_key) if self._auto_create_scenario: self.auto_create_scenarios([new_data_key])
[docs] def delete_data( self, data_key: str, prevent_masterdata_removal: bool = False ) -> None: # prevent delete if used by scenarios assert data_key not in self._registry.used_datasets(), ( "Cannot delete data used in scenarios." ) self._dm.delete_data(data_key, prevent_masterdata_removal)
[docs] def store_data(self, dataset_name: str, data): if isinstance(self._dm, StatefulDataManager): self._dm.store_data(dataset_name, data) else: if self.logger: self.logger.error( "Store data is not supported for stateless data manager. " ) pass
[docs] def toggle_autorun(self, value: bool = None) -> None: if value is None: self._processor.auto_run_scenarios = not self._processor.auto_run_scenarios else: self._processor.auto_run_scenarios = value if self.logger: self.logger.log( f"Auto-run scenarios set to {self._processor.auto_run_scenarios}" )
# Processing operations (delegated)
[docs] def process_scenario_async(self, scenario): self._processor.enqueue(scenario)
[docs] def wait_for_processing(self): self._processor.wait_for_processing()
[docs] def shutdown_processing(self): self._processor.shutdown()
# Scenario creation/registry
[docs] def get_associated_parameters( self, algo_name: str, dataset_key: Optional[str] = None ): return self._factory.get_associated_parameters(algo_name, dataset_key)
[docs] def get_data_parameters(self, dataset_key: str) -> BASE_PARAMS_BOUND: return self.get_data(dataset_key).initialize_data_parameters()
[docs] def create_scenario( self, tag: str, dataset_key: str = "Master data", algo_name: str = "", algo_params=None, data_params=None, ) -> Scenario: if self._registry.has_tag(tag): if self.logger: self.logger.log( f"Scenario with tag '{tag}' already exists. Skipping creation." ) raise ValueError(f"A scenario with tag '{tag}' already exists.") scenario = self._factory.create( tag=tag, dataset_key=dataset_key, algo_name=algo_name, algo_params=algo_params, data_params=data_params, ) self._registry.add(scenario) if self._processor.auto_run_scenarios: self._processor.enqueue(scenario) return scenario
[docs] def get_by_id(self, scenario_id: str) -> Optional[Scenario]: return self._registry.get_by_id(scenario_id)
[docs] def get_by_tag(self, tag: str) -> Optional[Scenario]: return self._registry.get_by_tag(tag)
[docs] def delete_scenario(self, scenario_id: str) -> bool: return self._registry.delete(scenario_id)
[docs] def list_scenarios(self) -> List[Scenario]: return self._registry.list()
[docs] def list_ids(self): return self._registry.list_ids()
[docs] def list_tags(self) -> List[str]: """ Returns a list of all registered scenario tag strings for the current session, by delegating to the underlying ScenarioRegistry """ return self._registry.list_tags()
[docs] def toggle_autocreate( self, value: bool = None, default_algo_name: str = "" ) -> None: if value is None: self._auto_create_scenario = not self._auto_create_scenario self._default_algo_name = ( default_algo_name if self._auto_create_scenario else None ) else: self._auto_create_scenario = value self._default_algo_name = ( default_algo_name if self._auto_create_scenario else None ) if self.logger: self.logger.log( f"Auto-create scenarios set to {self._auto_create_scenario}" )
[docs] def add_datasource_from_json(self, json_string): # Create data source from JSON datasource = self._dm.data_object_type.from_json(json_string) # Add data source to datamanager self._dm.add_data_source(datasource) # create scenario if auto-create is enabled if self._auto_create_scenario: self.auto_create_scenarios([datasource.name])
[docs] def etl_data(self, files, dataset_name: str): """Run ETL for ``dataset_name``; returns the underlying ETLResult. Auto-creates a scenario only on successful loads. """ result = self._dm.etl_data(files, dataset_name) # create scenario if auto-create is enabled and ETL succeeded if result.is_success and self._auto_create_scenario: self.auto_create_scenarios([dataset_name]) return result
[docs] def auto_create_scenarios(self, keys: List[str] = None): for key in keys: self.create_scenario( tag=f"{key} [auto]", dataset_key=key, algo_name=self._default_algo_name, algo_params=self._default_param_values, )
[docs] def get_data_as_json(self, key: str) -> str: return self._dm.get_data(key).to_json()
[docs] def store_data_as_json(self, set_name): if isinstance(self._dm, StatefulDataManager): self._dm.store_data_source_as_json(set_name) else: raise AttributeError( "Stateless data manager does not support internal serialization." )
[docs] def debug_load_data(self, dataset_name: str) -> None: if isinstance(self._dm, StatefulDataManager): self._dm.load_data_from_dir(dataset_name) elif isinstance(self._dm, StatelessDataManager): raise NotImplementedError( "Todo: implement loading for stateless data manager." ) else: raise Exception("Data manager not initialized.")
[docs] def debug_create_and_run_scenario( self, scenario_tag: str, dataset_key: str, algo_name: str, algo_params: Dict[str, any], ) -> Scenario: """ Creates and runs a scenario for debugging purposes. The method uses a factory to create a scenario instance, registers it, enqueues it for processing, and waits for the processing to complete. Returns the fully processed scenario. Parameters: scenario_tag (str): A unique identifier for the scenario being created and run. dataset_key (str): The key for the dataset to be used in the scenario. algo_name (str): The name of the algorithm to be applied in the scenario. algo_params (Dict): Additional parameters for the algorithm. Returns: Scenario: The fully processed scenario created and executed within this method. """ scenario = self._factory.create( tag=scenario_tag, dataset_key=dataset_key, algo_name=algo_name, algo_params=algo_params, ) self._registry.add(scenario) self._processor.enqueue(scenario) self.wait_for_processing() return scenario
[docs] def debug_etl_data(self, dataset_name: str) -> None: """ Debugging utility to run ETL on a directory as if loaded on startup. """ # Retrieve files from directory if isinstance(self._dm, StatefulDataManager): self._dm.load_data_from_dir(dataset_name) else: raise AttributeError( "Stateless data manager does not support internal ETL." )
[docs] def debug_load_serialized_data(self, file_name: str): """ Debugging utility to upload a file as if loaded on startup. """ if isinstance(self._dm, StatefulDataManager): self._dm.load_data_from_file(file_name) else: raise AttributeError( "Stateless data manager does not support internal deserialization." )
[docs] def debug_import_data(self, directory: str) -> None: """ Debugging utility to import data from a directory. """ raise NotImplementedError("todo: write import data method")
[docs] def debug_upload_data(self, file_name: str) -> None: """ Debugging utility to upload data from a file. """ raise NotImplementedError("todo: write upload data method")