Coverage for agentlib_flexquant/data_structures/flex_results.py: 31%
190 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-10-20 14:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-10-20 14:09 +0000
1"""
2Module for generating and managing results dataframes for flexibility analysis.
3Results include baseline, positive and negative flexibility data,
4the indicator, market and simulator results/data.
5"""
6import copy
7import json
8from pathlib import Path
9from typing import Any, Dict, Optional, Type, Union
11import pandas as pd
12from pydantic import BaseModel, FilePath
13from agentlib.core.agent import AgentConfig
14from agentlib.modules.simulation.simulator import SimulatorConfig
15from agentlib.utils import load_config
16from agentlib_mpc.modules.mpc import BaseMPCConfig
17from agentlib_mpc.utils import TimeConversionTypes
18from agentlib_mpc.utils.analysis import load_mpc, load_mpc_stats, load_sim
20import agentlib_flexquant.utils.config_management as cmng
21from agentlib_flexquant.data_structures.flexquant import (
22 FlexQuantConfig,
23 FlexibilityMarketConfig,
24)
25from agentlib_flexquant.data_structures.mpcs import (
26 BaselineMPCData,
27 NFMPCData,
28 PFMPCData,
29)
30from agentlib_flexquant.modules.flexibility_indicator import (
31 FlexibilityIndicatorModuleConfig,
32)
33from agentlib_flexquant.modules.flexibility_market import FlexibilityMarketModuleConfig
34from agentlib_flexquant.utils.data_handling import convert_timescale_of_index
37def load_indicator(file_path: Union[str, FilePath]) -> pd.DataFrame:
38 """Load the flexibility indicator results from the given file path.
40 Args:
41 file_path: the file path of the indicator results file
43 Returns:
44 DataFrame containing the indicator results
46 """
47 df = pd.read_csv(file_path, header=0, index_col=[0, 1])
48 return df
51def load_market(file_path: Union[str, FilePath]) -> pd.DataFrame:
52 """Load the market results from the given file path.
54 Args:
55 file_path: the file path of the market results file
57 Returns:
58 DataFrame containing the market results
60 """
61 df = pd.read_csv(file_path, header=0, index_col=[0, 1])
62 return df
65class Results:
66 """
67 Loads the results for the baseline, positive and negative flexibility,
68 the indicator, market and simulator results/data. Additionally the MPC stats are loaded.
70 Results can be loaded either from a user-specified custom base path or from the
71 (default) base path specified in the flex config.
73 Loaded results are stored in pandas DataFrames which can be used for further processing,
74 e.g. plotting and analysis.
75 """
77 # Configs:
78 # Generator
79 generator_config: FlexQuantConfig
80 # Agents
81 simulator_agent_config: AgentConfig
82 baseline_agent_config: AgentConfig
83 pos_flex_agent_config: AgentConfig
84 neg_flex_agent_config: AgentConfig
85 indicator_agent_config: AgentConfig
86 market_agent_config: AgentConfig
87 # Modules
88 simulator_module_config: SimulatorConfig
89 baseline_module_config: BaseMPCConfig
90 pos_flex_module_config: BaseMPCConfig
91 neg_flex_module_config: BaseMPCConfig
92 indicator_module_config: FlexibilityIndicatorModuleConfig
93 market_module_config: FlexibilityMarketModuleConfig
95 # Dataframes
96 df_simulation: pd.DataFrame
97 df_baseline: pd.DataFrame
98 df_pos_flex: pd.DataFrame
99 df_neg_flex: pd.DataFrame
100 df_indicator: pd.DataFrame
101 df_market: pd.DataFrame
103 # Stats of the MPCs
104 df_baseline_stats: pd.DataFrame
105 df_pos_flex_stats: pd.DataFrame
106 df_neg_flex_stats: pd.DataFrame
108 # time conversion
109 current_timescale_of_data: TimeConversionTypes = "seconds"
111 def __init__(
112 self,
113 flex_config: Optional[Union[str, FilePath, dict]],
114 simulator_agent_config: Optional[Union[str, FilePath, dict]],
115 generated_flex_files_base_path: Optional[Union[str, FilePath]] = None,
116 results: Optional[
117 Union[str, FilePath, dict[str, dict[str, pd.DataFrame]], "Results"]
118 ] = None,
119 to_timescale: TimeConversionTypes = "seconds",
120 ):
121 # Already a Results instance — copy over its data
122 if isinstance(results, Results):
123 self.__dict__ = copy.deepcopy(results).__dict__
124 return
126 # Load flex config
127 self._load_flex_config(flex_config, generated_flex_files_base_path)
128 # Get filenames of configs to load agents and modules
129 self._get_config_filenames()
130 # Load configs for mpc, indicator, market
131 self._load_agent_module_configs()
132 # Load sim configs if present
133 if simulator_agent_config:
134 self._load_simulator_config(simulator_agent_config)
135 # Load results and get a dict for generating dataframes
136 results_dict, results_path = self._load_results(results)
137 # Get dataframes for mpc, sim, flex indicator results
138 self._load_results_dataframes(results_dict)
139 # Get dataframes for mpc stats
140 self._load_stats_dataframes(results_path)
141 # Convert the time in the dataframes to the desired timescale
142 self.convert_timescale_of_dataframe_index(to_timescale=to_timescale)
144 def _load_flex_config(
145 self,
146 flex_config: Optional[Union[str, FilePath, dict]],
147 custom_base_path: Optional[Union[str, FilePath]],
148 ):
149 """Load the flex config and optionally override the base directory path.
151 If a custom base path is provided, it overwrites the "flex_base_directory_path"
152 in the given config. This is useful when the generated flex files are saved
153 to a custom directory instead of the default (current working directory).
155 Args:
156 flex_config: The config for flexibility quantification.
157 custom_base_path: The custom directory for saving the generated flex files
158 defined by user.
160 """
161 if custom_base_path is not None:
162 if isinstance(flex_config, (str, Path)):
163 with open(flex_config, "r", encoding="utf-8") as f:
164 flex_config = json.load(f)
165 flex_config["flex_base_directory_path"] = str(custom_base_path)
167 self.generator_config = load_config.load_config(
168 config=flex_config, config_type=FlexQuantConfig
169 )
171 def _get_config_filenames(self):
172 """Get filenames of configs to load agents and modules."""
173 self.config_filename_baseline = BaselineMPCData.model_validate(
174 self.generator_config.baseline_config_generator_data
175 ).name_of_created_file
176 self.config_filename_pos_flex = PFMPCData.model_validate(
177 self.generator_config.shadow_mpc_config_generator_data.pos_flex
178 ).name_of_created_file
179 self.config_filename_neg_flex = NFMPCData.model_validate(
180 self.generator_config.shadow_mpc_config_generator_data.neg_flex
181 ).name_of_created_file
182 self.config_filename_indicator = (
183 self.generator_config.indicator_config.name_of_created_file
184 )
186 if self.generator_config.market_config:
187 market_config_raw = self.generator_config.market_config
188 if isinstance(market_config_raw, (str, Path)):
189 market_config = FlexibilityMarketConfig.model_validate_json(
190 Path(market_config_raw).read_text(encoding="utf-8")
191 )
192 else:
193 market_config = FlexibilityMarketConfig.model_validate(
194 market_config_raw
195 )
196 self.config_filename_market = market_config.name_of_created_file
198 def _load_agent_module_configs(self):
199 """Load agent and module configs."""
200 for file_path in Path(self.generator_config.flex_files_directory).rglob(
201 "*.json"
202 ):
203 if file_path.name in self.config_filename_baseline:
204 self.baseline_agent_config = load_config.load_config(
205 config=file_path, config_type=AgentConfig
206 )
207 self.baseline_module_config = cmng.get_module(
208 config=self.baseline_agent_config,
209 module_type=self._get_flexquant_mpc_module_type(self.baseline_agent_config),
210 )
212 elif file_path.name in self.config_filename_pos_flex:
213 self.pos_flex_agent_config = load_config.load_config(
214 config=file_path, config_type=AgentConfig
215 )
216 self.pos_flex_module_config = cmng.get_module(
217 config=self.pos_flex_agent_config,
218 module_type=self._get_flexquant_mpc_module_type(self.pos_flex_agent_config),
219 )
221 elif file_path.name in self.config_filename_neg_flex:
222 self.neg_flex_agent_config = load_config.load_config(
223 config=file_path, config_type=AgentConfig
224 )
225 self.neg_flex_module_config = cmng.get_module(
226 config=self.neg_flex_agent_config,
227 module_type=self._get_flexquant_mpc_module_type(self.neg_flex_agent_config),
228 )
230 elif file_path.name in self.config_filename_indicator:
231 self.indicator_agent_config = load_config.load_config(
232 config=file_path, config_type=AgentConfig
233 )
234 self.indicator_module_config = cmng.get_module(
235 config=self.indicator_agent_config,
236 module_type=cmng.INDICATOR_CONFIG_TYPE,
237 )
239 elif (
240 self.generator_config.market_config
241 and file_path.name in self.config_filename_market
242 ):
243 self.market_agent_config = load_config.load_config(
244 config=file_path, config_type=AgentConfig
245 )
246 self.market_module_config = cmng.get_module(
247 config=self.market_agent_config, module_type=cmng.MARKET_CONFIG_TYPE
248 )
250 def _load_simulator_config(self, simulator_agent_config):
251 """Load simulator agent and module config separately.
253 Separate loading is required to skip pydantic validation for specific field(s).
255 """
256 # check config type: with results path adaptation -> dict; without -> str/Path
257 if isinstance(simulator_agent_config, (str, Path)):
258 with open(simulator_agent_config, "r", encoding="utf-8") as f:
259 sim_config = json.load(f)
260 elif isinstance(simulator_agent_config, dict):
261 sim_config = simulator_agent_config
262 sim_module_config = next(
263 (
264 module
265 for module in sim_config["modules"]
266 if module["type"] == "simulator"
267 ),
268 None,
269 )
270 # instantiate and validate sim agent config
271 self.simulator_agent_config = AgentConfig.model_validate(sim_config)
272 # instantiate sim module config by skipping validation for result_filename
273 # to prevent file deletion
274 self.simulator_module_config = self.create_instance_with_skipped_validation(
275 model_class=SimulatorConfig,
276 config=sim_module_config,
277 skip_fields=["result_filename"],
278 )
280 def _get_flexquant_mpc_module_type(self, agent_config: AgentConfig) -> str:
281 """Get the mpc module type from agent_config.
283 The module type is defined in agentlib_flexquant.
285 Args:
286 agent_config: the AgentConfig containing the mpc module
288 Returns:
289 The type of the mpc module
291 """
292 for module in agent_config.modules:
293 if module['type'] in [cmng.BASELINEMPC_CONFIG_TYPE, cmng.BASELINEMINLPMPC_CONFIG_TYPE,
294 cmng.SHADOWMPC_CONFIG_TYPE, cmng.SHADOWMINLPMPC_CONFIG_TYPE]:
295 return module['type']
297 raise ModuleNotFoundError(f'There is no matching mpc module type in Agentlib_FlexQuant for '
298 f'modules in agent {agent_config.id}.')
300 def _resolve_sim_results_path(
301 self, sim_result_filename: str, results_path: Union[str, Path]
302 ) -> Path:
303 """
304 Resolve simulator results path with fallback strategy.
306 Tries multiple strategies to locate the simulator results file:
307 1. Use absolute path if file exists there
308 2. Use relative path as-is from current directory
309 3. Use filename only and look in results directory
310 (handles both relative paths and just filenames)
312 Args:
313 sim_result_filename: The result filename from simulator config
314 results_path: The results directory path
316 Returns:
317 Path object pointing to the simulator results file
319 Raises:
320 FileNotFoundError: If file cannot be found using any strategy
321 """
322 sim_results_path = Path(sim_result_filename)
323 results_path = Path(results_path)
325 # Strategy 1: If it's an absolute path and exists, use it
326 if sim_results_path.is_absolute() and sim_results_path.exists():
327 return sim_results_path
329 # Strategy 2: If it's a relative path, try it as-is from current directory
330 if not sim_results_path.is_absolute() and sim_results_path.exists():
331 return sim_results_path
333 # Strategy 3: Try in results directory (handles both relative paths and just filenames)
334 # (fallback for helper function usage)
335 results_dir_path = results_path / sim_results_path.name
336 if results_dir_path.exists():
337 return results_dir_path
339 # If none of the strategies worked, raise an error
340 raise FileNotFoundError("Could not locate simulator results file.")
342 def _load_results(
343 self, results: Union[str, Path, dict]
344 ) -> dict[str, dict[str, pd.DataFrame]]:
345 """Load dict with results for mpc, indicator, market and sim
346 from specified results path."""
347 # load results
348 if results is None:
349 res_path = self.generator_config.results_directory
350 elif isinstance(results, (str, Path)):
351 res_path = results
352 elif isinstance(results, dict):
353 res_path = self.generator_config.results_directory
354 else:
355 raise ValueError("results must be a path or dict")
357 res = {
358 self.baseline_agent_config.id: {
359 self.baseline_module_config.module_id: load_mpc(
360 Path(
361 res_path,
362 Path(
363 self.baseline_module_config.optimization_backend[
364 "results_file"
365 ]
366 ).name,
367 )
368 )
369 },
370 self.pos_flex_agent_config.id: {
371 self.pos_flex_module_config.module_id: load_mpc(
372 Path(
373 res_path,
374 Path(
375 self.pos_flex_module_config.optimization_backend[
376 "results_file"
377 ]
378 ).name,
379 )
380 )
381 },
382 self.neg_flex_agent_config.id: {
383 self.neg_flex_module_config.module_id: load_mpc(
384 Path(
385 res_path,
386 Path(
387 self.neg_flex_module_config.optimization_backend[
388 "results_file"
389 ]
390 ).name,
391 )
392 )
393 },
394 self.indicator_agent_config.id: {
395 self.indicator_module_config.module_id: load_indicator(
396 Path(
397 res_path,
398 Path(self.indicator_module_config.results_file).name,
399 )
400 )
401 },
402 }
403 if self.simulator_agent_config:
404 resolved_sim_results_path = self._resolve_sim_results_path(
405 self.simulator_module_config.result_filename, res_path
406 )
407 print(f"Sim results extracted from: {resolved_sim_results_path}")
408 res[self.simulator_agent_config.id] = {
409 self.simulator_module_config.module_id: load_sim(
410 resolved_sim_results_path,
411 )
412 }
413 if self.generator_config.market_config:
414 res[self.market_agent_config.id] = {
415 self.market_module_config.module_id: load_market(
416 Path(
417 res_path,
418 Path(self.market_module_config.results_file).name,
419 )
420 )
421 }
422 return res, res_path
424 def _load_results_dataframes(self, results_dict: dict):
425 """Load results dataframes for mpc, indicator, market and sim."""
426 if self.simulator_agent_config:
427 self.df_simulation = results_dict[self.simulator_agent_config.id][
428 self.simulator_module_config.module_id
429 ]
430 self.df_baseline = results_dict[self.baseline_agent_config.id][
431 self.baseline_module_config.module_id
432 ]
433 self.df_pos_flex = results_dict[self.pos_flex_agent_config.id][
434 self.pos_flex_module_config.module_id
435 ]
436 self.df_neg_flex = results_dict[self.neg_flex_agent_config.id][
437 self.neg_flex_module_config.module_id
438 ]
439 self.df_indicator = results_dict[self.indicator_agent_config.id][
440 self.indicator_module_config.module_id
441 ]
442 if self.generator_config.market_config:
443 self.df_market = results_dict[self.market_agent_config.id][
444 self.market_module_config.module_id
445 ]
446 else:
447 self.df_market = None
449 def _load_stats_dataframes(self, results_path):
450 """Load dataframes for mpc stats."""
451 self.df_baseline_stats = load_mpc_stats(
452 Path(
453 results_path,
454 Path(
455 self.baseline_module_config.optimization_backend["results_file"]
456 ).name,
457 )
458 )
459 self.df_pos_flex_stats = load_mpc_stats(
460 Path(
461 results_path,
462 Path(
463 self.pos_flex_module_config.optimization_backend["results_file"]
464 ).name,
465 )
466 )
467 self.df_neg_flex_stats = load_mpc_stats(
468 Path(
469 results_path,
470 Path(
471 self.neg_flex_module_config.optimization_backend["results_file"]
472 ).name,
473 )
474 )
476 def convert_timescale_of_dataframe_index(self, to_timescale: TimeConversionTypes):
477 """Convert the time in the dataframes to the desired timescale
479 Args:
480 to_timescale: The timescale to convert the data to
482 """
483 for df in (
484 [
485 self.df_baseline,
486 self.df_baseline_stats,
487 self.df_pos_flex,
488 self.df_pos_flex_stats,
489 self.df_neg_flex,
490 self.df_neg_flex_stats,
491 self.df_indicator,
492 ]
493 + ([self.df_market] if self.generator_config.market_config else [])
494 + ([self.df_simulation] if self.simulator_agent_config else [])
495 ):
496 convert_timescale_of_index(
497 df=df, from_unit=self.current_timescale_of_data, to_unit=to_timescale
498 )
500 # Update current unit
501 self.current_timescale_of_data = to_timescale
503 def get_intersection_mpcs_sim(self) -> dict[str, dict[str, str]]:
504 """Get the intersection of the MPCs and the simulator variables.
506 Returns:
507 dictionary with the following structure: Key: variable alias (from baseline)
508 Value: {module id: variable name}
510 """
511 id_alias_name_dict = {}
513 def get_id_alias_name_dict_element(alias: str):
514 # id as key, {id: name} as value
515 id_alias_name_dict[alias] = {}
516 for config in [
517 self.simulator_module_config,
518 self.baseline_module_config,
519 self.pos_flex_module_config,
520 self.neg_flex_module_config,
521 ]:
522 for var in config.get_variables():
523 if alias in (var.alias, var.name):
524 id_alias_name_dict[alias][config.module_id] = var.name
526 # States, controls and power variable
527 for variables in [
528 self.baseline_module_config.states,
529 self.baseline_module_config.controls,
530 ]:
531 for variable in variables:
532 get_id_alias_name_dict_element(variable.alias)
533 get_id_alias_name_dict_element(
534 self.generator_config.baseline_config_generator_data.power_variable
535 )
537 return id_alias_name_dict
539 def create_instance_with_skipped_validation(
540 self,
541 model_class: Type[BaseModel],
542 config: Dict[str, Any],
543 skip_fields: Optional[list[str]] = None,
544 ) -> BaseModel:
545 """Create a Pydantic model instance while skipping validation for specified fields.
547 This function allows partial validation of a model's config dictionary by validating
548 all fields except those listed in `skip_fields`. Skipped fields are set on the instance
549 after construction without triggering their validators.
551 Args:
552 model_class: The Pydantic model class to instantiate.
553 config: The input configuration dictionary.
554 skip_fields: A list of field names to exclude from validation.
555 These fields will be manually set after instantiation.
557 Returns:
558 BaseModel: An instance of the model_class with validated and skipped fields assigned.
560 """
561 if skip_fields is None:
562 skip_fields = []
563 # Separate data into validated and skipped fields
564 validated_fields = {
565 field: value for field, value in config.items() if field not in skip_fields
566 }
567 skipped_fields = {
568 field: value for field, value in config.items() if field in skip_fields
569 }
570 # Create instance with validation for non-skipped fields
571 if validated_fields:
572 instance = model_class(
573 **validated_fields, _agent_id=self.simulator_agent_config.id
574 )
575 else:
576 instance = model_class.model_construct()
577 # Add skipped fields without validation
578 for field, value in skipped_fields.items():
579 # bypass pydantic immutability to directly set attribute value
580 object.__setattr__(instance, field, value)
581 # Store metadata about bypassed fields for deepcopy compatibility
582 object.__setattr__(instance, "_bypassed_fields", skip_fields)
583 object.__setattr__(instance, "_original_config", config)
584 return instance
586 def __deepcopy__(self, memo: Dict[int, Any]) -> "Results":
587 """Custom deepcopy implementation that handles Pydantic models with bypassed validation."""
588 # Create a new instance of the same class
589 new_instance = self.__class__.__new__(self.__class__)
590 # Add to memo immediately to prevent circular reference issues
591 memo[id(self)] = new_instance
592 for key, value in self.__dict__.items():
593 if key in ["simulator_module_config"] and hasattr(
594 value, "_original_config"
595 ):
596 # Reconstruct the specific problematic object instead of deepcopying
597 new_value = self.create_instance_with_skipped_validation(
598 model_class=value.__class__,
599 config=copy.deepcopy(value._original_config, memo),
600 skip_fields=getattr(value, "_bypassed_fields", []),
601 )
602 setattr(new_instance, key, new_value)
603 else:
604 # Everything else should deepcopy normally
605 setattr(new_instance, key, copy.deepcopy(value, memo))
606 return new_instance