Coverage for agentlib_flexquant/data_structures/flex_results.py: 97%
207 statements
« prev ^ index » next coverage.py v7.4.4, created at 2026-03-26 09:43 +0000
1"""
2Module for generating and managing results dataframes for flexibility analysis.
3Results include baseline, positive and negative flexibility data,
4the indicator, market and simulator results/data.
5"""
6import copy
7import json
8from pathlib import Path
9from typing import Any, Dict, Optional, Type, Union
11import pandas as pd
12from pydantic import FilePath
13from agentlib.core.agent import AgentConfig
14from agentlib.modules.simulation.simulator import SimulatorConfig
15from agentlib.utils import load_config
16from agentlib_mpc.modules.mpc.mpc import BaseMPCConfig
17from agentlib_mpc.utils import TimeConversionTypes
18from agentlib_mpc.utils.analysis import load_mpc, load_mpc_stats, load_sim
20import agentlib_flexquant.utils.config_management as cmng
21from agentlib_flexquant.data_structures.flexquant import (
22 FlexQuantConfig,
23 FlexibilityMarketConfig,
24 FlexibilityIndicatorConfig
25)
26from agentlib_flexquant.data_structures.mpcs import (
27 BaselineMPCData,
28 NFMPCData,
29 PFMPCData,
30)
31from agentlib_flexquant.modules.flexibility_indicator import (
32 FlexibilityIndicatorModuleConfig,
33)
34from agentlib_flexquant.modules.flexibility_market import FlexibilityMarketModuleConfig
35from agentlib_flexquant.utils.data_handling import convert_timescale_of_index
def load_indicator(file_path: Union[str, FilePath]) -> pd.DataFrame:
    """Read the flexibility indicator results from a CSV file.

    The first row of the file is used as the column header and the first two
    columns together form the (two-level) index of the returned frame.

    Args:
        file_path: location of the indicator results file

    Returns:
        DataFrame holding the indicator results

    """
    index_columns = [0, 1]
    return pd.read_csv(file_path, header=0, index_col=index_columns)
def load_market(file_path: Union[str, FilePath]) -> pd.DataFrame:
    """Read the flexibility market results from a CSV file.

    The first row of the file is used as the column header and the first two
    columns together form the (two-level) index of the returned frame.

    Args:
        file_path: location of the market results file

    Returns:
        DataFrame holding the market results

    """
    index_columns = [0, 1]
    return pd.read_csv(file_path, header=0, index_col=index_columns)
class Results:
    """Container for all result data of a flexibility quantification run.

    Loads the results for the baseline, positive and negative flexibility,
    the indicator, market and simulator results/data. Additionally the MPC stats
    are loaded.

    Results can be loaded either from a user-specified custom base path or from the
    (default) base path specified in the flex config.

    Loaded results are stored in pandas DataFrames which can be used for further
    processing, e.g. plotting and analysis.
    """

    # Configs:
    # Generator
    flex_config: FlexQuantConfig
    # Agents
    simulator_agent_config: Optional[AgentConfig]
    baseline_agent_config: AgentConfig
    pos_flex_agent_config: AgentConfig
    neg_flex_agent_config: AgentConfig
    indicator_agent_config: AgentConfig
    market_agent_config: AgentConfig
    # Modules (the simulator module config is None if no simulator is given)
    simulator_module_config: Optional[SimulatorConfig]
    baseline_module_config: BaseMPCConfig
    pos_flex_module_config: BaseMPCConfig
    neg_flex_module_config: BaseMPCConfig
    indicator_module_config: FlexibilityIndicatorModuleConfig
    market_module_config: FlexibilityMarketModuleConfig

    # Dataframes (simulation / market are None if the respective part is absent)
    df_simulation: Optional[pd.DataFrame]
    df_baseline: pd.DataFrame
    df_pos_flex: pd.DataFrame
    df_neg_flex: pd.DataFrame
    df_indicator: pd.DataFrame
    df_market: Optional[pd.DataFrame]

    # Stats of the MPCs
    df_baseline_stats: pd.DataFrame
    df_pos_flex_stats: pd.DataFrame
    df_neg_flex_stats: pd.DataFrame

    # time conversion
    current_timescale_of_data: TimeConversionTypes = "seconds"

    def __init__(
        self,
        flex_config: Optional[Union[str, FilePath, dict]],
        simulator_agent_config: Optional[Union[str, FilePath, dict]],
        generated_flex_files_base_path: Optional[Union[str, FilePath]] = None,
        results: Optional[
            Union[str, FilePath, dict[str, dict[str, pd.DataFrame]], "Results"]
        ] = None,
        to_timescale: TimeConversionTypes = "seconds",
    ):
        """Load all configs and result files and build the result dataframes.

        Args:
            flex_config: The config for flexibility quantification
                (file path or dict).
            simulator_agent_config: The simulator agent config (file path or dict).
                If empty/None, no simulator data is loaded.
            generated_flex_files_base_path: Optional custom base directory of the
                generated flex files; overrides the one in the flex config.
            results: Either a path to the results directory, a results dict,
                an existing Results instance (which is copied) or None to use
                the results directory of the flex config.
            to_timescale: The timescale the dataframe indices are converted to.

        """
        # Already a Results instance — copy over its data.
        # NOTE(review): to_timescale is not applied in this branch; the copy
        # keeps the timescale of the given instance.
        if isinstance(results, Results):
            self.__dict__ = copy.deepcopy(results).__dict__
            return

        # Load flex config
        self._load_flex_config(flex_config, generated_flex_files_base_path)
        # Get filenames of configs to load agents and modules
        self._get_config_filenames()
        # Load configs for mpc, indicator, market
        self._load_agent_module_configs()
        # Load sim configs if present
        self._load_simulator_config(simulator_agent_config)
        # Load results and get a dict for generating dataframes
        results_dict, results_path = self._load_results(results)
        # Get dataframes for mpc, sim, flex indicator results
        self._load_results_dataframes(results_dict)
        # Get dataframes for mpc stats
        self._load_stats_dataframes(results_path)
        # Convert the time in the dataframes to the desired timescale
        self.convert_timescale_of_dataframe_index(to_timescale=to_timescale)

        # Clear unpicklable model reference to enable multiprocessing
        self._clear_unpicklable_references()

    def _clear_unpicklable_references(self):
        """Remove references to objects that cannot be pickled.

        This enables the Results object to be used with multiprocessing.
        The model field contains CDLL references that cannot be serialized.
        """
        sim_module_config = getattr(self, "simulator_module_config", None)
        if sim_module_config is not None and hasattr(sim_module_config, "model"):
            # bypass pydantic's attribute handling to set the field directly
            object.__setattr__(sim_module_config, "model", None)

    def _load_flex_config(
        self,
        flex_config: Optional[Union[str, FilePath, dict]],
        custom_base_path: Optional[Union[str, FilePath]],
    ):
        """Load the flex config and optionally override the base directory path.

        If a custom base path is provided, it overwrites the "flex_base_directory_path"
        in the given config. This is useful when the generated flex files are saved
        to a custom directory instead of the default (current working directory).

        Args:
            flex_config: The config for flexibility quantification.
            custom_base_path: The custom directory for saving the generated flex files
                defined by user.

        """
        if custom_base_path is not None:
            if isinstance(flex_config, (str, Path)):
                with open(flex_config, "r", encoding="utf-8") as f:
                    flex_config = json.load(f)
            else:
                # Work on a copy so the dict passed in by the caller is not
                # modified as a side effect.
                flex_config = dict(flex_config)
            flex_config["flex_base_directory_path"] = str(custom_base_path)

        self.flex_config = load_config.load_config(
            config=flex_config, config_type=FlexQuantConfig
        )

    def _get_config_filenames(self):
        """Get filenames of configs to load agents and modules."""
        shadow_mpc_data = self.flex_config.shadow_mpc_config_generator_data
        self.config_filename_baseline = BaselineMPCData.model_validate(
            self.flex_config.baseline_config_generator_data
        ).name_of_created_file
        self.config_filename_pos_flex = PFMPCData.model_validate(
            shadow_mpc_data.pos_flex
        ).name_of_created_file
        self.config_filename_neg_flex = NFMPCData.model_validate(
            shadow_mpc_data.neg_flex
        ).name_of_created_file

        self.config_filename_indicator = FlexibilityIndicatorConfig.model_validate(
            self.flex_config.indicator_config
        ).name_of_created_file

        market_config = self.flex_config.market_config
        if market_config:
            # A tuple of types is used (instead of typing.Union) because
            # isinstance() rejects typing.Union before Python 3.10.
            if isinstance(market_config, (str, Path)):
                self.config_filename_market = load_config.load_config(
                    config=market_config, config_type=FlexibilityMarketConfig
                ).name_of_created_file
            else:  # is dict
                self.config_filename_market = FlexibilityMarketConfig.model_validate(
                    market_config
                ).name_of_created_file

    def _load_mpc_agent_and_module(
        self, file_path: Path
    ) -> tuple[AgentConfig, BaseMPCConfig]:
        """Load an MPC agent config from file together with its MPC module config."""
        agent_config = load_config.load_config(
            config=file_path, config_type=AgentConfig
        )
        module_config = cmng.get_module(
            config=agent_config,
            module_type=self._get_flexquant_mpc_module_type(agent_config),
        )
        return agent_config, module_config

    def _load_agent_module_configs(self):
        """Load agent and module configs.

        All json files below the flex files directory are searched (recursively)
        for the generated config files. A warning is issued for every expected
        config file that was not found.
        """

        def is_config_file(file_path: Path, config_filename: str) -> bool:
            # Compare the file names for equality. A substring test would also
            # accept unrelated files, e.g. "flex.json" for "pos_flex.json".
            return file_path.name == Path(config_filename).name

        market_enabled = bool(self.flex_config.market_config)
        files_found = []
        for file_path in Path(self.flex_config.flex_files_directory).rglob("*.json"):
            if is_config_file(file_path, self.config_filename_baseline):
                (
                    self.baseline_agent_config,
                    self.baseline_module_config,
                ) = self._load_mpc_agent_and_module(file_path)
                files_found.append(self.config_filename_baseline)

            elif is_config_file(file_path, self.config_filename_pos_flex):
                (
                    self.pos_flex_agent_config,
                    self.pos_flex_module_config,
                ) = self._load_mpc_agent_and_module(file_path)
                files_found.append(self.config_filename_pos_flex)

            elif is_config_file(file_path, self.config_filename_neg_flex):
                (
                    self.neg_flex_agent_config,
                    self.neg_flex_module_config,
                ) = self._load_mpc_agent_and_module(file_path)
                files_found.append(self.config_filename_neg_flex)

            elif is_config_file(file_path, self.config_filename_indicator):
                self.indicator_agent_config = load_config.load_config(
                    config=file_path, config_type=AgentConfig
                )
                self.indicator_module_config = cmng.get_module(
                    config=self.indicator_agent_config,
                    module_type=cmng.INDICATOR_CONFIG_TYPE,
                )
                files_found.append(self.config_filename_indicator)

            elif market_enabled and is_config_file(
                file_path, self.config_filename_market
            ):
                self.market_agent_config = load_config.load_config(
                    config=file_path, config_type=AgentConfig
                )
                self.market_module_config = cmng.get_module(
                    config=self.market_agent_config,
                    module_type=cmng.MARKET_CONFIG_TYPE,
                )
                files_found.append(self.config_filename_market)

        files_needed = [
            self.config_filename_baseline,
            self.config_filename_pos_flex,
            self.config_filename_neg_flex,
            self.config_filename_indicator,
        ]
        if market_enabled:
            files_needed.append(self.config_filename_market)
        # sorted for a deterministic warning message
        difference = sorted(set(files_needed) - set(files_found))
        if difference:
            import warnings

            warnings.warn(
                f"The files {difference} have not been found in the "
                f"given Path. This will most likely cause problems "
                f"later on. Please check the filenames."
            )

    def _load_simulator_config(self, simulator_agent_config):
        """Load simulator agent and module config separately.

        Separate loading is required to skip pydantic validation for specific field(s).

        Args:
            simulator_agent_config: The simulator agent config as file path or
                dict. If empty/None, both simulator configs are set to None.

        Raises:
            ValueError: If the agent config contains no module of type "simulator".

        """
        # check config type: with results path adaptation -> dict; without -> str/Path
        if not simulator_agent_config:
            self.simulator_agent_config = None
            self.simulator_module_config = None
            return
        if isinstance(simulator_agent_config, (str, Path)):
            with open(simulator_agent_config, "r", encoding="utf-8") as f:
                sim_config = json.load(f)
        else:  # is dict
            sim_config = simulator_agent_config
        sim_module_config = next(
            (
                module
                for module in sim_config["modules"]
                if module["type"] == "simulator"
            ),
            None,
        )
        # instantiate and validate sim agent config
        self.simulator_agent_config = AgentConfig.model_validate(sim_config)
        if sim_module_config is None:
            # Fail with a clear message instead of an AttributeError on None
            # further down.
            raise ValueError(
                "The simulator agent config does not contain a module of type "
                "'simulator'."
            )
        # instantiate sim module config by skipping validation for result_filename
        # to prevent file deletion, if overwrite_result_file in sim config is true
        self.simulator_module_config = (
            self.create_simulator_config_with_skipped_validation(
                sim_config_class=SimulatorConfig,
                sim_config=sim_module_config,
                skip_fields=["result_filename"],
            )
        )

    def _get_flexquant_mpc_module_type(self, agent_config: AgentConfig) -> str:
        """Get the mpc module type from agent_config.

        The module type is defined in agentlib_flexquant.

        Args:
            agent_config: the AgentConfig containing the mpc module

        Returns:
            The type of the mpc module

        Raises:
            ModuleNotFoundError: If none of the modules of the agent has one of
                the mpc module types of agentlib_flexquant.

        """
        flexquant_mpc_types = [
            cmng.BASELINEMPC_CONFIG_TYPE,
            cmng.BASELINEMINLPMPC_CONFIG_TYPE,
            cmng.SHADOWMPC_CONFIG_TYPE,
            cmng.SHADOWMINLPMPC_CONFIG_TYPE,
        ]
        for module in agent_config.modules:
            if module['type'] in flexquant_mpc_types:
                return module['type']

        raise ModuleNotFoundError(f'There is no matching mpc module type in '
                                  f'Agentlib_FlexQuant for modules in agent '
                                  f'{agent_config.id}.')

    def _resolve_sim_results_path(
        self, sim_result_filename: str, results_path: Union[str, Path]
    ) -> Path:
        """Resolve simulator results path with fallback strategy.

        Tries multiple strategies to locate the simulator results file:
        1. Use the path from the simulator config as given, if a file exists
           there (absolute path, or relative path from the current directory)
        2. Use filename only and look in results directory
           (handles both relative paths and just filenames)

        Args:
            sim_result_filename: The result filename from simulator config
            results_path: The results directory path

        Returns:
            Path object pointing to the simulator results file

        Raises:
            FileNotFoundError: If file cannot be found using any strategy
        """
        sim_results_path = Path(sim_result_filename)

        # Strategy 1: the configured path exists as-is (absolute or relative)
        if sim_results_path.exists():
            return sim_results_path

        # Strategy 2: look for the bare file name inside the results directory
        # (fallback for helper function usage)
        candidate_in_results_dir = Path(results_path) / sim_results_path.name
        if candidate_in_results_dir.exists():
            return candidate_in_results_dir

        # If none of the strategies worked, raise an error
        raise FileNotFoundError("Could not locate simulator results file.")

    @staticmethod
    def _mpc_results_file(
        module_config: BaseMPCConfig, results_path: Union[str, Path]
    ) -> Path:
        """Build the path of an MPC's results file inside the results directory.

        Only the file name of the configured "results_file" is used.
        """
        results_file_name = Path(
            module_config.optimization_backend["results_file"]
        ).name
        return Path(results_path, results_file_name)

    def _load_results(
        self, results: Optional[Union[str, Path, dict]]
    ) -> tuple[dict[str, dict[str, pd.DataFrame]], Union[str, Path]]:
        """Load dict with results for mpc, indicator, market and sim
        from specified results path.

        Args:
            results: Path of the results directory, a dict or None. For None
                and for a dict the results directory of the flex config is used.

        Returns:
            A tuple of the results dict ({agent id: {module id: DataFrame}})
            and the results directory the data was loaded from.

        Raises:
            ValueError: If results is neither None, a path nor a dict.

        """
        if results is None:
            res_path = self.flex_config.results_directory
        elif isinstance(results, (str, Path)):
            res_path = results
        elif isinstance(results, dict):
            # NOTE(review): the content of a passed dict is not used; the data
            # is read from the results directory of the flex config.
            res_path = self.flex_config.results_directory
        else:
            raise ValueError("results must be a path or dict")

        res = {
            self.baseline_agent_config.id: {
                self.baseline_module_config.module_id: load_mpc(
                    self._mpc_results_file(self.baseline_module_config, res_path)
                )
            },
            self.pos_flex_agent_config.id: {
                self.pos_flex_module_config.module_id: load_mpc(
                    self._mpc_results_file(self.pos_flex_module_config, res_path)
                )
            },
            self.neg_flex_agent_config.id: {
                self.neg_flex_module_config.module_id: load_mpc(
                    self._mpc_results_file(self.neg_flex_module_config, res_path)
                )
            },
            self.indicator_agent_config.id: {
                self.indicator_module_config.module_id: load_indicator(
                    Path(
                        res_path,
                        Path(self.indicator_module_config.results_file).name,
                    )
                )
            },
        }
        if self.simulator_agent_config:
            resolved_sim_results_path = self._resolve_sim_results_path(
                self.simulator_module_config.result_filename, res_path
            )
            print(f"Sim results extracted from: {resolved_sim_results_path}")
            res[self.simulator_agent_config.id] = {
                self.simulator_module_config.module_id: load_sim(
                    resolved_sim_results_path,
                )
            }
        if self.flex_config.market_config:
            res[self.market_agent_config.id] = {
                self.market_module_config.module_id: load_market(
                    Path(
                        res_path,
                        Path(self.market_module_config.results_file).name,
                    )
                )
            }
        return res, res_path

    def _load_results_dataframes(self, results_dict: dict):
        """Load results dataframes for mpc, indicator, market and sim.

        Args:
            results_dict: The results as {agent id: {module id: DataFrame}}.

        """

        def frame_of(agent_config, module_config) -> pd.DataFrame:
            return results_dict[agent_config.id][module_config.module_id]

        if self.simulator_agent_config:
            self.df_simulation = frame_of(
                self.simulator_agent_config, self.simulator_module_config
            )
        else:
            # same convention as for the market dataframe below
            self.df_simulation = None
        self.df_baseline = frame_of(
            self.baseline_agent_config, self.baseline_module_config
        )
        self.df_pos_flex = frame_of(
            self.pos_flex_agent_config, self.pos_flex_module_config
        )
        self.df_neg_flex = frame_of(
            self.neg_flex_agent_config, self.neg_flex_module_config
        )
        self.df_indicator = frame_of(
            self.indicator_agent_config, self.indicator_module_config
        )
        if self.flex_config.market_config:
            self.df_market = frame_of(
                self.market_agent_config, self.market_module_config
            )
        else:
            self.df_market = None

    def _load_stats_dataframes(self, results_path):
        """Load dataframes for mpc stats.

        Args:
            results_path: The results directory containing the MPC results files.

        """
        self.df_baseline_stats = load_mpc_stats(
            self._mpc_results_file(self.baseline_module_config, results_path)
        )
        self.df_pos_flex_stats = load_mpc_stats(
            self._mpc_results_file(self.pos_flex_module_config, results_path)
        )
        self.df_neg_flex_stats = load_mpc_stats(
            self._mpc_results_file(self.neg_flex_module_config, results_path)
        )

    def convert_timescale_of_dataframe_index(self, to_timescale: TimeConversionTypes):
        """Convert the time in the dataframes to the desired timescale

        Args:
            to_timescale: The timescale to convert the data to

        """
        dataframes = [
            self.df_baseline,
            self.df_baseline_stats,
            self.df_pos_flex,
            self.df_pos_flex_stats,
            self.df_neg_flex,
            self.df_neg_flex_stats,
            self.df_indicator,
        ]
        # market and simulation data only exist if the respective part is used
        if self.flex_config.market_config:
            dataframes.append(self.df_market)
        if self.simulator_agent_config:
            dataframes.append(self.df_simulation)

        for df in dataframes:
            convert_timescale_of_index(
                df=df, from_unit=self.current_timescale_of_data, to_unit=to_timescale
            )

        # Update current unit
        self.current_timescale_of_data = to_timescale

    def get_intersection_mpcs_sim(self) -> dict[str, dict[str, str]]:
        """Get the intersection of the MPCs and the simulator variables.

        The simulator is only considered if a simulator config was loaded.

        Returns:
            dictionary with the following structure:
            Key: variable alias (from baseline)
            Value: {module id: variable name}

        """
        module_configs = [
            self.baseline_module_config,
            self.pos_flex_module_config,
            self.neg_flex_module_config,
        ]
        sim_module_config = getattr(self, "simulator_module_config", None)
        if sim_module_config is not None:
            # keep the simulator first, results without simulator skip it
            module_configs.insert(0, sim_module_config)

        # Collect the variables once per module instead of once per alias
        variables_per_module = [
            (config.module_id, list(config.get_variables()))
            for config in module_configs
        ]

        # States, controls and power variable
        aliases = [variable.alias for variable in self.baseline_module_config.states]
        aliases += [
            variable.alias for variable in self.baseline_module_config.controls
        ]
        aliases.append(self.flex_config.baseline_config_generator_data.power_variable)

        id_alias_name_dict = {}
        for alias in aliases:
            names_by_module_id = {}
            for module_id, variables in variables_per_module:
                for var in variables:
                    if alias in (var.alias, var.name):
                        names_by_module_id[module_id] = var.name
            id_alias_name_dict[alias] = names_by_module_id

        return id_alias_name_dict

    def create_simulator_config_with_skipped_validation(
        self,
        sim_config_class: Type[SimulatorConfig],
        sim_config: Dict[str, Any],
        skip_fields: Optional[list[str]] = None,
    ) -> SimulatorConfig:
        """Create a Pydantic model instance while skipping validation for
        specified fields.

        This function allows partial validation of a model's config dictionary
        by validating all fields except those listed in `skip_fields`.
        Skipped fields are set on the instance after construction without
        triggering their validators.

        Args:
            sim_config_class: The Pydantic model class to instantiate.
            sim_config: The input configuration dictionary.
            skip_fields: A list of field names to exclude from validation.
                        These fields will be manually set after instantiation.

        Returns:
            SimulatorConfig: An instance of the model_class with validated and
            skipped fields assigned.

        """
        if skip_fields is None:
            skip_fields = []
        # Separate data into validated and skipped fields
        validated_fields = {}
        skipped_fields = {}
        for field, value in sim_config.items():
            if field in skip_fields:
                skipped_fields[field] = value
            else:
                validated_fields[field] = value
        # Create instance with validation for non-skipped fields
        if validated_fields:
            instance = sim_config_class(
                **validated_fields, _agent_id=self.simulator_agent_config.id
            )
        else:
            instance = sim_config_class.model_construct()
        # Add skipped fields without validation
        for field, value in skipped_fields.items():
            # bypass pydantic immutability to directly set attribute value
            object.__setattr__(instance, field, value)
        # Store metadata about bypassed fields for deepcopy compatibility
        object.__setattr__(instance, "_bypassed_fields", skip_fields)
        object.__setattr__(instance, "_original_config", sim_config)
        return instance

    def __deepcopy__(self, memo: Dict[int, Any]) -> "Results":
        """Custom deepcopy implementation that handles Pydantic models with bypassed
        validation.

        Needed, if a Results object should be copied with copy.deepcopy, without
        deleting the simulator results due to its pydantic validators.

        Args:
            memo: The memo dict of the running deepcopy operation.

        Returns:
            The deep copy of this Results object.

        """
        # Create a new instance of the same class
        new_instance = self.__class__.__new__(self.__class__)
        # Add to memo immediately to prevent circular reference issues
        memo[id(self)] = new_instance
        for key, value in self.__dict__.items():
            if key == "simulator_module_config" and hasattr(
                value, "_original_config"
            ):
                # Reconstruct the specific problematic object instead of deep copying
                new_value = self.create_simulator_config_with_skipped_validation(
                    sim_config_class=value.__class__,
                    sim_config=copy.deepcopy(value._original_config, memo),
                    skip_fields=list(getattr(value, "_bypassed_fields", [])),
                )
                # The reconstruction validates the config again. If the model
                # reference was cleared on the original (to keep it picklable),
                # keep it cleared on the copy as well.
                if hasattr(value, "model") and value.model is None:
                    object.__setattr__(new_value, "model", None)
                setattr(new_instance, key, new_value)
            else:
                # Everything else should deepcopy normally
                setattr(new_instance, key, copy.deepcopy(value, memo))
        return new_instance