Coverage for agentlib_flexquant/data_structures/flex_results.py: 97%
211 statements
« prev ^ index » next coverage.py v7.4.4, created at 2026-06-17 09:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2026-06-17 09:09 +0000
1"""
2Module for generating and managing results dataframes for flexibility analysis.
3Results include baseline, positive and negative flexibility data,
4the indicator, market and simulator results/data.
5"""
6import copy
7import json
8from pathlib import Path
9from typing import Any, Dict, Optional, Type, Union
11import pandas as pd
12from pydantic import FilePath
13from agentlib.core.agent import AgentConfig
14from agentlib.modules.simulation.simulator import SimulatorConfig
15from agentlib.utils import load_config
16from agentlib_mpc.modules.mpc.mpc import BaseMPCConfig
17from agentlib_mpc.utils import TimeConversionTypes
18from agentlib_mpc.utils.analysis import load_mpc, load_mpc_stats, load_sim
20import agentlib_flexquant.utils.config_management as cmng
21from agentlib_flexquant.utils.config_management import ModuleHandler
22from agentlib_flexquant.data_structures.flexquant import (
23 FlexQuantConfig,
24 FlexibilityMarketConfig,
25 FlexibilityIndicatorConfig
26)
27from agentlib_flexquant.data_structures.mpcs import (
28 BaselineMPCData,
29 NFMPCData,
30 PFMPCData,
31)
32from agentlib_flexquant.modules.flexibility_indicator import (
33 FlexibilityIndicatorModuleConfig,
34)
35from agentlib_flexquant.modules.flexibility_market import FlexibilityMarketModuleConfig
36from agentlib_flexquant.utils.data_handling import convert_timescale_of_index
def load_indicator(file_path: Union[str, FilePath]) -> pd.DataFrame:
    """Read a flexibility indicator results file into a DataFrame.

    Args:
        file_path: location of the indicator results file (CSV)

    Returns:
        DataFrame parsed from the file; the first row is used as header and
        the first two columns form the (two-level) index

    """
    return pd.read_csv(file_path, header=0, index_col=[0, 1])
def load_market(file_path: Union[str, FilePath]) -> pd.DataFrame:
    """Read a flexibility market results file into a DataFrame.

    Args:
        file_path: location of the market results file (CSV)

    Returns:
        DataFrame parsed from the file; the first row is used as header and
        the first two columns form the (two-level) index

    """
    return pd.read_csv(file_path, header=0, index_col=[0, 1])
class Results:
    """Container for all result data of a flexibility quantification run.

    Loads the results for the baseline, positive and negative flexibility,
    the indicator, market and simulator results/data. Additionally the MPC stats
    are loaded.

    Results can be loaded either from a user-specified custom base path or from the
    (default) base path specified in the flex config.

    Loaded results are stored in pandas DataFrames which can be used for further
    processing, e.g. plotting and analysis.
    """

    # Configs:
    # Generator
    flex_config: FlexQuantConfig
    # Agents
    simulator_agent_config: Optional[AgentConfig]
    baseline_agent_config: AgentConfig
    pos_flex_agent_config: AgentConfig
    neg_flex_agent_config: AgentConfig
    indicator_agent_config: AgentConfig
    market_agent_config: AgentConfig
    # Modules
    simulator_module_config: SimulatorConfig
    baseline_module_config: BaseMPCConfig
    pos_flex_module_config: BaseMPCConfig
    neg_flex_module_config: BaseMPCConfig
    indicator_module_config: FlexibilityIndicatorModuleConfig
    market_module_config: FlexibilityMarketModuleConfig

    # Dataframes
    df_simulation: pd.DataFrame
    df_baseline: pd.DataFrame
    df_pos_flex: pd.DataFrame
    df_neg_flex: pd.DataFrame
    df_indicator: pd.DataFrame
    df_market: pd.DataFrame

    # Stats of the MPCs
    df_baseline_stats: pd.DataFrame
    df_pos_flex_stats: pd.DataFrame
    df_neg_flex_stats: pd.DataFrame

    # time conversion
    current_timescale_of_data: TimeConversionTypes = "seconds"

    def __init__(
        self,
        flex_config: Optional[Union[str, FilePath, dict]],
        simulator_agent_config: Optional[Union[str, FilePath, dict]],
        generated_flex_files_base_path: Optional[Union[str, FilePath]] = None,
        results: Optional[
            Union[str, FilePath, dict[str, dict[str, pd.DataFrame]], "Results"]
        ] = None,
        to_timescale: TimeConversionTypes = "seconds",
    ):
        """Load configs, results and MPC stats and convert their time index.

        Args:
            flex_config: The config for flexibility quantification (path or dict).
            simulator_agent_config: The simulator agent config (path or dict).
                If falsy, no simulator config/results are loaded.
            generated_flex_files_base_path: Optional custom base directory that
                overrides "flex_base_directory_path" of the flex config.
            results: Either a directory to load the result files from, or an
                existing Results instance whose data is deep-copied. For None
                (and for a dict) the results directory of the flex config is used.
            to_timescale: The timescale the dataframe indices are converted to.

        """
        # Already a Results instance — copy over its data
        # NOTE(review): the deep copy rebuilds the simulator module config from
        # its original dict, which may re-create the `model` reference that
        # _clear_unpicklable_references removed — confirm whether the copy
        # should be cleared as well.
        if isinstance(results, Results):
            self.__dict__ = copy.deepcopy(results).__dict__
            return

        # Load flex config
        self._load_flex_config(flex_config, generated_flex_files_base_path)
        # Get filenames of configs to load agents and modules
        self._get_config_filenames()
        # Load configs for mpc, indicator, market
        self._load_agent_module_configs()
        # Load sim configs if present
        self._load_simulator_config(simulator_agent_config)
        # Load results and get a dict for generating dataframes
        results_dict, results_path = self._load_results(results)
        # Get dataframes for mpc, sim, flex indicator results
        self._load_results_dataframes(results_dict)
        # Get dataframes for mpc stats
        self._load_stats_dataframes(results_path)
        # Convert the time in the dataframes to the desired timescale
        self.convert_timescale_of_dataframe_index(to_timescale=to_timescale)

        # Clear unpicklable model reference to enable multiprocessing
        self._clear_unpicklable_references()

    def _clear_unpicklable_references(self):
        """Remove references to objects that cannot be pickled.

        This enables the Results object to be used with multiprocessing.
        The model field contains CDLL references that cannot be serialized.
        """
        sim_module_config = getattr(self, "simulator_module_config", None)
        if sim_module_config is not None and hasattr(sim_module_config, "model"):
            # bypass pydantic's attribute handling to set the value directly
            object.__setattr__(sim_module_config, "model", None)

    def _load_flex_config(
        self,
        flex_config: Optional[Union[str, FilePath, dict]],
        custom_base_path: Optional[Union[str, FilePath]],
    ):
        """Load the flex config and optionally override the base directory path.

        If a custom base path is provided, it overwrites the "flex_base_directory_path"
        in the given config. This is useful when the generated flex files are saved
        to a custom directory instead of the default (current working directory).

        Args:
            flex_config: The config for flexibility quantification.
            custom_base_path: The custom directory for saving the generated flex files
                defined by user.

        """
        if custom_base_path is not None:
            if isinstance(flex_config, (str, Path)):
                with open(flex_config, "r", encoding="utf-8") as f:
                    flex_config = json.load(f)
            # Work on a copy so that a dict handed in by the caller is not mutated
            flex_config = dict(flex_config)
            flex_config["flex_base_directory_path"] = str(custom_base_path)

        self.flex_config = load_config.load_config(
            config=flex_config, config_type=FlexQuantConfig
        )

        # A market config given as path is replaced by the loaded config object
        if isinstance(self.flex_config.market_config, Path):
            self.flex_config.market_config = load_config.load_config(
                self.flex_config.market_config, config_type=FlexibilityMarketConfig
            )

    def _get_config_filenames(self):
        """Get filenames of configs to load agents and modules."""
        self.config_filename_baseline = BaselineMPCData.model_validate(
            self.flex_config.baseline_config_generator_data
        ).name_of_created_file
        self.config_filename_pos_flex = PFMPCData.model_validate(
            self.flex_config.shadow_mpc_config_generator_data.pos_flex
        ).name_of_created_file
        self.config_filename_neg_flex = NFMPCData.model_validate(
            self.flex_config.shadow_mpc_config_generator_data.neg_flex
        ).name_of_created_file

        self.config_filename_indicator = FlexibilityIndicatorConfig.model_validate(
            self.flex_config.indicator_config
        ).name_of_created_file

        # The market is optional; its filename is only set if a config exists
        if self.flex_config.market_config:
            # tuple form works on all Python versions (Union in isinstance
            # requires 3.10+) and matches the checks elsewhere in this class
            if isinstance(self.flex_config.market_config, (str, Path)):
                self.config_filename_market = load_config.load_config(
                    config=self.flex_config.market_config,
                    config_type=FlexibilityMarketConfig,
                ).name_of_created_file
            else:  # is dict
                self.config_filename_market = FlexibilityMarketConfig.model_validate(
                    self.flex_config.market_config
                ).name_of_created_file

    def _load_agent_module_configs(self):
        """Load agent and module configs.

        Searches the flex files directory recursively for JSON files whose file
        name equals one of the expected config filenames and loads the agent
        config and the corresponding module config from it. A warning is issued
        for every expected file that has not been found.
        """
        market_enabled = bool(self.flex_config.market_config)
        # Compare exact file names; a substring test (`name in filename`) would
        # also accept unrelated files such as "e.json" for "baseline.json"
        baseline_name = Path(self.config_filename_baseline).name
        pos_flex_name = Path(self.config_filename_pos_flex).name
        neg_flex_name = Path(self.config_filename_neg_flex).name
        indicator_name = Path(self.config_filename_indicator).name
        market_name = (
            Path(self.config_filename_market).name if market_enabled else None
        )

        # NOTE(review): created once instead of per file — assumes the handler
        # holds no per-file state; confirm against ModuleHandler
        module_handler = ModuleHandler(extra_plugins=self.flex_config.custom_plugins)

        files_found = []
        for file_path in Path(self.flex_config.flex_files_directory).rglob(
            "*.json"
        ):
            file_name = file_path.name
            if file_name == baseline_name:
                self.baseline_agent_config = load_config.load_config(
                    config=file_path, config_type=AgentConfig
                )
                self.baseline_module_config = module_handler.get_module(
                    config=self.baseline_agent_config,
                    module_type=self._get_flexquant_mpc_module_type(
                        self.baseline_agent_config
                    ),
                )
                files_found.append(self.config_filename_baseline)

            elif file_name == pos_flex_name:
                self.pos_flex_agent_config = load_config.load_config(
                    config=file_path, config_type=AgentConfig
                )
                self.pos_flex_module_config = module_handler.get_module(
                    config=self.pos_flex_agent_config,
                    module_type=self._get_flexquant_mpc_module_type(
                        self.pos_flex_agent_config
                    ),
                )
                files_found.append(self.config_filename_pos_flex)

            elif file_name == neg_flex_name:
                self.neg_flex_agent_config = load_config.load_config(
                    config=file_path, config_type=AgentConfig
                )
                self.neg_flex_module_config = module_handler.get_module(
                    config=self.neg_flex_agent_config,
                    module_type=self._get_flexquant_mpc_module_type(
                        self.neg_flex_agent_config
                    ),
                )
                files_found.append(self.config_filename_neg_flex)

            elif file_name == indicator_name:
                self.indicator_agent_config = load_config.load_config(
                    config=file_path, config_type=AgentConfig
                )
                self.indicator_module_config = module_handler.get_module(
                    config=self.indicator_agent_config,
                    module_type=self.flex_config.indicator_config.module_type,
                )
                files_found.append(self.config_filename_indicator)

            elif market_enabled and file_name == market_name:
                self.market_agent_config = load_config.load_config(
                    config=file_path, config_type=AgentConfig
                )
                self.market_module_config = module_handler.get_module(
                    config=self.market_agent_config,
                    module_type=self.flex_config.market_config.module_type,
                )
                files_found.append(self.config_filename_market)

        files_needed = [
            self.config_filename_baseline,
            self.config_filename_pos_flex,
            self.config_filename_neg_flex,
            self.config_filename_indicator,
        ]
        if market_enabled:
            files_needed.append(self.config_filename_market)
        # de-duplicated, in the order the files are expected
        difference = [
            name for name in dict.fromkeys(files_needed) if name not in files_found
        ]
        if difference:
            import warnings
            warnings.warn(f"The files {difference} have not been found in the "
                          f"given Path. This will most likely cause problems "
                          f"later on. Please check the filenames.")

    def _load_simulator_config(self, simulator_agent_config):
        """Load simulator agent and module config separately.

        Separate loading is required to skip pydantic validation for specific field(s).

        Args:
            simulator_agent_config: path to the simulator agent config, the
                config as dict, or a falsy value if no simulator is used.

        Raises:
            ValueError: If the config contains no module of type "simulator".

        """
        # check config type: with results path adaptation -> dict; without -> str/Path
        if not simulator_agent_config:
            self.simulator_agent_config = None
            return
        if isinstance(simulator_agent_config, (str, Path)):
            with open(simulator_agent_config, "r", encoding="utf-8") as f:
                sim_config = json.load(f)
        else:  # is dict
            sim_config = simulator_agent_config
        sim_module_config = next(
            (
                module
                for module in sim_config["modules"]
                if module["type"] == "simulator"
            ),
            None,
        )
        # instantiate and validate sim agent config
        self.simulator_agent_config = AgentConfig.model_validate(sim_config)
        if sim_module_config is None:
            # fail with a clear message instead of an AttributeError on None
            raise ValueError(
                "The simulator agent config contains no module of type "
                "'simulator'."
            )
        # instantiate sim module config by skipping validation for result_filename
        # to prevent file deletion, if overwrite_result_file in sim config is true
        self.simulator_module_config = (
            self.create_simulator_config_with_skipped_validation(
                sim_config_class=SimulatorConfig,
                sim_config=sim_module_config,
                skip_fields=["result_filename"],
            )
        )

    def _get_flexquant_mpc_module_type(self, agent_config: AgentConfig) -> str:
        """Get the mpc module type from agent_config.

        The module type is defined in agentlib_flexquant.

        Args:
            agent_config: the AgentConfig containing the mpc module

        Returns:
            The type of the mpc module

        Raises:
            ModuleNotFoundError: If none of the agent's modules has one of the
                FlexQuant mpc module types.

        """
        flexquant_mpc_types = (
            cmng.BASELINEMPC_CONFIG_TYPE,
            cmng.BASELINEMINLPMPC_CONFIG_TYPE,
            cmng.SHADOWMPC_CONFIG_TYPE,
            cmng.SHADOWMINLPMPC_CONFIG_TYPE,
        )
        for module in agent_config.modules:
            if module['type'] in flexquant_mpc_types:
                return module['type']

        raise ModuleNotFoundError(f'There is no matching mpc module type in '
                                  f'Agentlib_FlexQuant for modules in agent '
                                  f'{agent_config.id}.')

    @staticmethod
    def _file_in_results_dir(
        results_dir: Union[str, Path], file_name: Union[str, Path]
    ) -> Path:
        """Return the path of a results file inside the given results directory.

        Only the file name of `file_name` is used; any directory part is dropped.
        """
        return Path(results_dir, Path(file_name).name)

    def _resolve_sim_results_path(
        self, sim_result_filename: str, results_path: Union[str, Path]
    ) -> Path:
        """
        Resolve simulator results path with fallback strategy.

        Tries multiple strategies to locate the simulator results file:
        1. Use the path as given if the file exists there (absolute path, or
           relative path resolved from the current directory)
        2. Use filename only and look in results directory
           (handles both relative paths and just filenames)

        Args:
            sim_result_filename: The result filename from simulator config
            results_path: The results directory path

        Returns:
            Path object pointing to the simulator results file

        Raises:
            FileNotFoundError: If file cannot be found using any strategy
        """
        sim_results_path = Path(sim_result_filename)

        # Strategy 1: the path exists as given (absolute or relative to cwd)
        if sim_results_path.exists():
            return sim_results_path

        # Strategy 2: Try in results directory (handles both relative paths
        # and just filenames) (fallback for helper function usage)
        results_dir_path = Path(results_path) / sim_results_path.name
        if results_dir_path.exists():
            return results_dir_path

        # If none of the strategies worked, raise an error naming what was tried
        raise FileNotFoundError(
            "Could not locate simulator results file. "
            f"Tried '{sim_results_path}' and '{results_dir_path}'."
        )

    def _load_results(
        self, results: Optional[Union[str, Path, dict]]
    ) -> tuple[dict[str, dict[str, pd.DataFrame]], Union[str, Path]]:
        """Load dict with results for mpc, indicator, market and sim
        from specified results path.

        Args:
            results: directory to load the result files from; for None or a
                dict the results directory of the flex config is used.

        Returns:
            Tuple of the results dict ({agent id: {module id: DataFrame}}) and
            the directory the results were loaded from.

        Raises:
            ValueError: If results is neither None, a path nor a dict.

        """
        # load results
        if results is None:
            res_path = self.flex_config.results_directory
        elif isinstance(results, (str, Path)):
            res_path = results
        elif isinstance(results, dict):
            # NOTE(review): the content of a passed dict is not used; results
            # are still read from the configured directory — confirm intent
            res_path = self.flex_config.results_directory
        else:
            raise ValueError("results must be a path or dict")

        res = {
            self.baseline_agent_config.id: {
                self.baseline_module_config.module_id: load_mpc(
                    self._file_in_results_dir(
                        res_path,
                        self.baseline_module_config.optimization_backend[
                            "results_file"
                        ],
                    )
                )
            },
            self.pos_flex_agent_config.id: {
                self.pos_flex_module_config.module_id: load_mpc(
                    self._file_in_results_dir(
                        res_path,
                        self.pos_flex_module_config.optimization_backend[
                            "results_file"
                        ],
                    )
                )
            },
            self.neg_flex_agent_config.id: {
                self.neg_flex_module_config.module_id: load_mpc(
                    self._file_in_results_dir(
                        res_path,
                        self.neg_flex_module_config.optimization_backend[
                            "results_file"
                        ],
                    )
                )
            },
            self.indicator_agent_config.id: {
                self.indicator_module_config.module_id: load_indicator(
                    self._file_in_results_dir(
                        res_path, self.indicator_module_config.results_file
                    )
                )
            },
        }
        if self.simulator_agent_config:
            resolved_sim_results_path = self._resolve_sim_results_path(
                self.simulator_module_config.result_filename, res_path
            )
            print(f"Sim results extracted from: {resolved_sim_results_path}")
            res[self.simulator_agent_config.id] = {
                self.simulator_module_config.module_id: load_sim(
                    resolved_sim_results_path,
                )
            }
        if self.flex_config.market_config:
            res[self.market_agent_config.id] = {
                self.market_module_config.module_id: load_market(
                    self._file_in_results_dir(
                        res_path, self.market_module_config.results_file
                    )
                )
            }
        return res, res_path

    def _load_results_dataframes(self, results_dict: dict):
        """Load results dataframes for mpc, indicator, market and sim.

        Args:
            results_dict: results in the form {agent id: {module id: DataFrame}}

        """
        # df_simulation is only set if a simulator config has been loaded
        if self.simulator_agent_config:
            self.df_simulation = results_dict[self.simulator_agent_config.id][
                self.simulator_module_config.module_id
            ]
        self.df_baseline = results_dict[self.baseline_agent_config.id][
            self.baseline_module_config.module_id
        ]
        self.df_pos_flex = results_dict[self.pos_flex_agent_config.id][
            self.pos_flex_module_config.module_id
        ]
        self.df_neg_flex = results_dict[self.neg_flex_agent_config.id][
            self.neg_flex_module_config.module_id
        ]
        self.df_indicator = results_dict[self.indicator_agent_config.id][
            self.indicator_module_config.module_id
        ]
        if self.flex_config.market_config:
            self.df_market = results_dict[self.market_agent_config.id][
                self.market_module_config.module_id
            ]
        else:
            self.df_market = None

    def _load_stats_dataframes(self, results_path):
        """Load dataframes for mpc stats.

        Args:
            results_path: directory containing the MPC results files

        """
        self.df_baseline_stats = load_mpc_stats(
            self._file_in_results_dir(
                results_path,
                self.baseline_module_config.optimization_backend["results_file"],
            )
        )
        self.df_pos_flex_stats = load_mpc_stats(
            self._file_in_results_dir(
                results_path,
                self.pos_flex_module_config.optimization_backend["results_file"],
            )
        )
        self.df_neg_flex_stats = load_mpc_stats(
            self._file_in_results_dir(
                results_path,
                self.neg_flex_module_config.optimization_backend["results_file"],
            )
        )

    def convert_timescale_of_dataframe_index(self, to_timescale: TimeConversionTypes):
        """Convert the time in the dataframes to the desired timescale

        Args:
            to_timescale: The timescale to convert the data to

        """
        # market and simulation dataframes only exist if they are configured
        for df in (
            [
                self.df_baseline,
                self.df_baseline_stats,
                self.df_pos_flex,
                self.df_pos_flex_stats,
                self.df_neg_flex,
                self.df_neg_flex_stats,
                self.df_indicator,
            ]
            + ([self.df_market] if self.flex_config.market_config else [])
            + ([self.df_simulation] if self.simulator_agent_config else [])
        ):
            convert_timescale_of_index(
                df=df, from_unit=self.current_timescale_of_data, to_unit=to_timescale
            )

        # Update current unit
        self.current_timescale_of_data = to_timescale

    def get_intersection_mpcs_sim(self) -> dict[str, dict[str, str]]:
        """Get the intersection of the MPCs and the simulator variables.

        The simulator is only considered if a simulator config has been loaded.

        Returns:
            dictionary with the following structure:
            Key: variable alias (from baseline)
            Value: {module id: variable name}

        """
        module_configs = []
        # simulator_module_config is only set if a simulator config was given
        if self.simulator_agent_config:
            module_configs.append(self.simulator_module_config)
        module_configs += [
            self.baseline_module_config,
            self.pos_flex_module_config,
            self.neg_flex_module_config,
        ]

        id_alias_name_dict = {}

        def get_id_alias_name_dict_element(alias: str):
            # alias as key, {module id: variable name} as value
            id_alias_name_dict[alias] = {}
            for config in module_configs:
                for var in config.get_variables():
                    if alias in (var.alias, var.name):
                        id_alias_name_dict[alias][config.module_id] = var.name

        # States, controls and power variable
        for variables in [
            self.baseline_module_config.states,
            self.baseline_module_config.controls,
        ]:
            for variable in variables:
                get_id_alias_name_dict_element(variable.alias)
        get_id_alias_name_dict_element(
            self.flex_config.baseline_config_generator_data.power_variable
        )

        return id_alias_name_dict

    def create_simulator_config_with_skipped_validation(
        self,
        sim_config_class: Type[SimulatorConfig],
        sim_config: Dict[str, Any],
        skip_fields: Optional[list[str]] = None,
    ) -> SimulatorConfig:
        """Create a Pydantic model instance while skipping validation for
        specified fields.

        This function allows partial validation of a model's config dictionary
        by validating all fields except those listed in `skip_fields`.
        Skipped fields are set on the instance after construction without
        triggering their validators.

        Args:
            sim_config_class: The Pydantic model class to instantiate.
            sim_config: The input configuration dictionary.
            skip_fields: A list of field names to exclude from validation.
                        These fields will be manually set after instantiation.

        Returns:
            SimulatorConfig: An instance of the model_class with validated and
            skipped fields assigned.

        """
        if skip_fields is None:
            skip_fields = []
        # Separate data into validated and skipped fields
        validated_fields = {
            field: value
            for field, value in sim_config.items()
            if field not in skip_fields
        }
        skipped_fields = {
            field: value
            for field, value in sim_config.items()
            if field in skip_fields
        }
        # Create instance with validation for non-skipped fields
        if validated_fields:
            instance = sim_config_class(
                **validated_fields, _agent_id=self.simulator_agent_config.id
            )
        else:
            instance = sim_config_class.model_construct()
        # Add skipped fields without validation
        for field, value in skipped_fields.items():
            # bypass pydantic immutability to directly set attribute value
            object.__setattr__(instance, field, value)
        # Store metadata about bypassed fields for deepcopy compatibility
        object.__setattr__(instance, "_bypassed_fields", skip_fields)
        object.__setattr__(instance, "_original_config", sim_config)
        return instance

    def __deepcopy__(self, memo: Dict[int, Any]) -> "Results":
        """Custom deepcopy implementation that handles Pydantic models with bypassed
        validation.

        Needed, if a Results object should be copied with copy.deepcopy, without
        deleting the simulator results due to its pydantic validators.

        Args:
            memo: the memo dictionary of the running deepcopy operation

        Returns:
            A deep copy of this Results object

        """
        # Create a new instance of the same class
        new_instance = self.__class__.__new__(self.__class__)
        # Add to memo immediately to prevent circular reference issues
        memo[id(self)] = new_instance
        for key, value in self.__dict__.items():
            if key in ["simulator_module_config"] and hasattr(
                value, "_original_config"
            ):
                # Reconstruct the specific problematic object instead of deep copying
                new_value = self.create_simulator_config_with_skipped_validation(
                    sim_config_class=value.__class__,
                    sim_config=copy.deepcopy(value._original_config, memo),
                    skip_fields=getattr(value, "_bypassed_fields", []),
                )
                setattr(new_instance, key, new_value)
            else:
                # Everything else should deepcopy normally
                setattr(new_instance, key, copy.deepcopy(value, memo))
        return new_instance