Coverage for agentlib_flexquant/data_structures/flex_results.py: 33%
172 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-15 15:25 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-15 15:25 +0000
1import copy
2import json
3import pandas as pd
4from typing import Union, Optional, Dict, Any, List, Type
5from pydantic import FilePath, BaseModel
6from pathlib import Path
7from agentlib.core.agent import AgentConfig
8from agentlib.utils import load_config
9from agentlib.modules.simulation.simulator import SimulatorConfig
10from agentlib_mpc.modules.mpc import BaseMPCConfig
11from agentlib_mpc.utils import TimeConversionTypes
12from agentlib_mpc.utils.analysis import load_sim, load_mpc, load_mpc_stats
13import agentlib_flexquant.utils.config_management as cmng
14from agentlib_flexquant.data_structures.flexquant import FlexQuantConfig, FlexibilityMarketConfig
15from agentlib_flexquant.data_structures.mpcs import BaselineMPCData, NFMPCData, PFMPCData
16from agentlib_flexquant.utils.data_handling import convert_timescale_of_index
17from agentlib_flexquant.modules.flexibility_indicator import FlexibilityIndicatorModuleConfig
18from agentlib_flexquant.modules.flexibility_market import FlexibilityMarketModuleConfig
def load_indicator(file_path: Union[str, FilePath]) -> pd.DataFrame:
    """Read the flexibility indicator results file into a DataFrame.

    Args:
        file_path: location of the indicator results file (CSV)

    Returns:
        DataFrame with the indicator results. The first row of the file is
        used as header and the first two columns form a two-level index.

    """
    indicator_results = pd.read_csv(file_path, index_col=[0, 1], header=0)
    return indicator_results
def load_market(file_path: Union[str, FilePath]) -> pd.DataFrame:
    """Read the flexibility market results file into a DataFrame.

    Args:
        file_path: location of the market results file (CSV)

    Returns:
        DataFrame with the market results. The first row of the file is
        used as header and the first two columns form a two-level index.

    """
    market_results = pd.read_csv(file_path, index_col=[0, 1], header=0)
    return market_results
class Results:
    """Container for the configs and result dataframes of a flexibility run.

    On construction the flex config is loaded, the generated agent and module
    configs are looked up in the flex files directory, and the result files of
    the baseline MPC, the positive/negative flexibility MPCs, the indicator
    and, if configured, the market and the simulator are read into dataframes.
    """

    # Configs:
    # Generator
    generator_config: FlexQuantConfig
    # Agents (the simulator entries are None when no simulator config is given)
    simulator_agent_config: Optional[AgentConfig]
    baseline_agent_config: AgentConfig
    pos_flex_agent_config: AgentConfig
    neg_flex_agent_config: AgentConfig
    indicator_agent_config: AgentConfig
    market_agent_config: AgentConfig
    # Modules
    simulator_module_config: Optional[SimulatorConfig]
    baseline_module_config: BaseMPCConfig
    pos_flex_module_config: BaseMPCConfig
    neg_flex_module_config: BaseMPCConfig
    indicator_module_config: FlexibilityIndicatorModuleConfig
    market_module_config: FlexibilityMarketModuleConfig

    # Dataframes
    df_simulation: Optional[pd.DataFrame]
    df_baseline: pd.DataFrame
    df_pos_flex: pd.DataFrame
    df_neg_flex: pd.DataFrame
    df_indicator: pd.DataFrame
    df_market: Optional[pd.DataFrame]

    # Stats of the MPCs
    df_baseline_stats: pd.DataFrame
    df_pos_flex_stats: pd.DataFrame
    df_neg_flex_stats: pd.DataFrame

    # time conversion
    current_timescale_of_data: TimeConversionTypes = "seconds"

    def __init__(
        self,
        flex_config: Optional[Union[str, FilePath, dict]],
        simulator_agent_config: Optional[Union[str, FilePath, dict]],
        generated_flex_files_base_path: Optional[Union[str, FilePath]] = None,
        results: Optional[
            Union[str, FilePath, dict[str, dict[str, pd.DataFrame]], "Results"]
        ] = None,
        to_timescale: TimeConversionTypes = "seconds",
    ):
        """Load all configs and results of a flexibility quantification run.

        Args:
            flex_config: The flex config as a file path or dict.
            simulator_agent_config: The simulator agent config as a file path
                or dict. If falsy, no simulator config or results are loaded.
            generated_flex_files_base_path: Optional directory that overrides
                "flex_base_directory_path" of the flex config.
            results: A results directory, a results dict, or an existing
                Results instance. If it is a Results instance, its data is
                deep-copied and all other arguments are ignored.
            to_timescale: The timescale the dataframe indices are converted to.

        """
        # Already a Results instance — copy over its data
        if isinstance(results, Results):
            self.__dict__ = copy.deepcopy(results).__dict__
            return

        # The class-level annotations do not create attributes. Without these
        # defaults every later `if self.simulator_agent_config:` check would
        # raise AttributeError when no simulator config is passed.
        self.simulator_agent_config = None
        self.simulator_module_config = None

        # Load flex config
        self._load_flex_config(flex_config, generated_flex_files_base_path)
        # Get filenames of configs to load agents and modules
        self._get_config_filenames()
        # Load configs for mpc, indicator, market
        self._load_agent_module_configs()
        # Load sim configs if present
        if simulator_agent_config:
            self._load_simulator_config(simulator_agent_config)
        # Load results and get a dict for generating dataframes
        results_dict, results_path = self._load_results(results)
        # Get dataframes for mpc, sim, flex indicator results
        self._load_results_dataframes(results_dict)
        # Get dataframes for mpc stats
        self._load_stats_dataframes(results_path)
        # Convert the time in the dataframes to the desired timescale
        self.convert_timescale_of_dataframe_index(to_timescale=to_timescale)

    @staticmethod
    def _is_config_file(file_path: Path, configured_name: Any) -> bool:
        """Check whether file_path is the config file stored under configured_name.

        For a string or path only the base names are compared, so a file whose
        name merely is a substring of the configured name does not match.
        """
        if isinstance(configured_name, (str, Path)):
            return file_path.name == Path(configured_name).name
        # Any other container keeps the plain membership test.
        return file_path.name in configured_name

    @staticmethod
    def _results_file_path(
        directory: Union[str, Path], configured_file: Union[str, Path]
    ) -> Path:
        """Return the path of the file named like configured_file inside directory."""
        return Path(directory, Path(configured_file).name)

    def _load_flex_config(
        self,
        flex_config: Optional[Union[str, FilePath, dict]],
        custom_base_path: Optional[Union[str, FilePath]],
    ):
        """Load the flex config and optionally override the base directory path.

        If a custom base path is provided, it overwrites the "flex_base_directory_path"
        in the given config. This is useful when the generated flex files are saved
        to a custom directory instead of the default (current working directory).

        Args:
            flex_config: The config for flexibility quantification.
            custom_base_path: The custom directory for saving the generated flex
                files defined by user.

        """
        if custom_base_path is not None:
            if isinstance(flex_config, (str, Path)):
                with open(flex_config, "r") as f:
                    flex_config = json.load(f)
            elif isinstance(flex_config, dict):
                # Work on a shallow copy so the caller's dict is not modified.
                flex_config = dict(flex_config)
            flex_config["flex_base_directory_path"] = str(custom_base_path)

        self.generator_config = load_config.load_config(
            config=flex_config, config_type=FlexQuantConfig
        )

    def _get_config_filenames(self):
        """Get filenames of configs to load agents and modules."""
        generator = self.generator_config
        self.config_filename_baseline = BaselineMPCData.model_validate(
            generator.baseline_config_generator_data
        ).name_of_created_file
        self.config_filename_pos_flex = PFMPCData.model_validate(
            generator.shadow_mpc_config_generator_data.pos_flex
        ).name_of_created_file
        self.config_filename_neg_flex = NFMPCData.model_validate(
            generator.shadow_mpc_config_generator_data.neg_flex
        ).name_of_created_file
        self.config_filename_indicator = generator.indicator_config.name_of_created_file

        # The market is optional; its config is either a path to a json file
        # or already a config object/dict.
        if generator.market_config:
            market_config_raw = generator.market_config
            if isinstance(market_config_raw, (str, Path)):
                market_config = FlexibilityMarketConfig.model_validate_json(
                    Path(market_config_raw).read_text()
                )
            else:
                market_config = FlexibilityMarketConfig.model_validate(market_config_raw)
            self.config_filename_market = market_config.name_of_created_file

    def _load_agent_module_configs(self):
        """Load agent and module configs.

        All json files below the flex files directory are scanned; a file is
        assigned to an agent when its name equals the configured file name.
        """
        has_market = bool(self.generator_config.market_config)
        flex_files_directory = Path(self.generator_config.flex_files_directory)

        for file_path in flex_files_directory.rglob("*.json"):
            if self._is_config_file(file_path, self.config_filename_baseline):
                self.baseline_agent_config = load_config.load_config(
                    config=file_path, config_type=AgentConfig
                )
                self.baseline_module_config = cmng.get_module(
                    config=self.baseline_agent_config,
                    module_type=cmng.BASELINEMPC_CONFIG_TYPE,
                )

            elif self._is_config_file(file_path, self.config_filename_pos_flex):
                self.pos_flex_agent_config = load_config.load_config(
                    config=file_path, config_type=AgentConfig
                )
                self.pos_flex_module_config = cmng.get_module(
                    config=self.pos_flex_agent_config,
                    module_type=cmng.SHADOWMPC_CONFIG_TYPE,
                )

            elif self._is_config_file(file_path, self.config_filename_neg_flex):
                self.neg_flex_agent_config = load_config.load_config(
                    config=file_path, config_type=AgentConfig
                )
                self.neg_flex_module_config = cmng.get_module(
                    config=self.neg_flex_agent_config,
                    module_type=cmng.SHADOWMPC_CONFIG_TYPE,
                )

            elif self._is_config_file(file_path, self.config_filename_indicator):
                self.indicator_agent_config = load_config.load_config(
                    config=file_path, config_type=AgentConfig
                )
                self.indicator_module_config = cmng.get_module(
                    config=self.indicator_agent_config,
                    module_type=cmng.INDICATOR_CONFIG_TYPE,
                )

            elif has_market and self._is_config_file(
                file_path, self.config_filename_market
            ):
                self.market_agent_config = load_config.load_config(
                    config=file_path, config_type=AgentConfig
                )
                self.market_module_config = cmng.get_module(
                    config=self.market_agent_config,
                    module_type=cmng.MARKET_CONFIG_TYPE,
                )

    def _load_simulator_config(self, simulator_agent_config):
        """Load simulator agent and module config separately.

        Separate loading is required to skip pydantic validation for specific field(s).

        Args:
            simulator_agent_config: The simulator agent config as a path to a
                json file or as a dict.

        Raises:
            TypeError: If the config is neither a path nor a dict.
            ValueError: If the config contains no module of type "simulator".

        """
        # check config type: with results path adaptation -> dict; without -> str/Path
        if isinstance(simulator_agent_config, (str, Path)):
            with open(simulator_agent_config, "r") as f:
                sim_config = json.load(f)
        elif isinstance(simulator_agent_config, dict):
            sim_config = simulator_agent_config
        else:
            raise TypeError(
                "simulator_agent_config must be a path or dict, got "
                f"{type(simulator_agent_config).__name__}"
            )

        sim_module_config = next(
            (module for module in sim_config["modules"] if module["type"] == "simulator"),
            None,
        )
        if sim_module_config is None:
            raise ValueError(
                'simulator_agent_config contains no module of type "simulator"'
            )

        # instantiate and validate sim agent config
        self.simulator_agent_config = AgentConfig.model_validate(sim_config)
        # instantiate sim module config by skipping validation for result_filename
        # to prevent file deletion
        self.simulator_module_config = self.create_instance_with_skipped_validation(
            model_class=SimulatorConfig,
            config=sim_module_config,
            skip_fields=["result_filename"],
        )

    def _load_results(
        self, results: Optional[Union[str, Path, dict]]
    ) -> tuple[dict[str, dict[str, pd.DataFrame]], Union[str, Path]]:
        """Load dict with results for mpc, indicator, market and sim from specified results path.

        Args:
            results: A results directory, a results dict, or None.

        Returns:
            A tuple of the results dict ({agent id: {module id: dataframe}})
            and the directory the result files were read from.

        Raises:
            ValueError: If results is neither None, a path nor a dict.

        """
        if results is None or isinstance(results, dict):
            # NOTE(review): a passed dict is not used as data; the files are
            # read from the configured results directory — confirm intended.
            res_path = self.generator_config.results_directory
        elif isinstance(results, (str, Path)):
            res_path = results
        else:
            raise ValueError("results must be a path or dict")

        res = {
            self.baseline_agent_config.id: {
                self.baseline_module_config.module_id: load_mpc(
                    self._results_file_path(
                        res_path,
                        self.baseline_module_config.optimization_backend["results_file"],
                    )
                )
            },
            self.pos_flex_agent_config.id: {
                self.pos_flex_module_config.module_id: load_mpc(
                    self._results_file_path(
                        res_path,
                        self.pos_flex_module_config.optimization_backend["results_file"],
                    )
                )
            },
            self.neg_flex_agent_config.id: {
                self.neg_flex_module_config.module_id: load_mpc(
                    self._results_file_path(
                        res_path,
                        self.neg_flex_module_config.optimization_backend["results_file"],
                    )
                )
            },
            self.indicator_agent_config.id: {
                self.indicator_module_config.module_id: load_indicator(
                    self._results_file_path(
                        res_path, self.indicator_module_config.results_file
                    )
                )
            },
        }
        if self.simulator_agent_config:
            # The simulator results are read from the path stored in its own
            # config, not from res_path.
            res[self.simulator_agent_config.id] = {
                self.simulator_module_config.module_id: load_sim(
                    Path(self.simulator_module_config.result_filename)
                )
            }
        if self.generator_config.market_config:
            res[self.market_agent_config.id] = {
                self.market_module_config.module_id: load_market(
                    self._results_file_path(
                        res_path, self.market_module_config.results_file
                    )
                )
            }
        return res, res_path

    def _load_results_dataframes(self, results_dict: dict):
        """Load results dataframes for mpc, indicator, market and sim.

        Args:
            results_dict: Results in the form {agent id: {module id: dataframe}}.

        """
        if self.simulator_agent_config:
            self.df_simulation = results_dict[self.simulator_agent_config.id][
                self.simulator_module_config.module_id
            ]
        else:
            self.df_simulation = None
        self.df_baseline = results_dict[self.baseline_agent_config.id][
            self.baseline_module_config.module_id
        ]
        self.df_pos_flex = results_dict[self.pos_flex_agent_config.id][
            self.pos_flex_module_config.module_id
        ]
        self.df_neg_flex = results_dict[self.neg_flex_agent_config.id][
            self.neg_flex_module_config.module_id
        ]
        self.df_indicator = results_dict[self.indicator_agent_config.id][
            self.indicator_module_config.module_id
        ]
        if self.generator_config.market_config:
            self.df_market = results_dict[self.market_agent_config.id][
                self.market_module_config.module_id
            ]
        else:
            self.df_market = None

    def _load_stats_dataframes(self, results_path):
        """Load dataframes for mpc stats.

        Args:
            results_path: The directory containing the MPC result files.

        """
        self.df_baseline_stats = load_mpc_stats(
            self._results_file_path(
                results_path,
                self.baseline_module_config.optimization_backend["results_file"],
            )
        )
        self.df_pos_flex_stats = load_mpc_stats(
            self._results_file_path(
                results_path,
                self.pos_flex_module_config.optimization_backend["results_file"],
            )
        )
        self.df_neg_flex_stats = load_mpc_stats(
            self._results_file_path(
                results_path,
                self.neg_flex_module_config.optimization_backend["results_file"],
            )
        )

    def convert_timescale_of_dataframe_index(self, to_timescale: TimeConversionTypes):
        """Convert the time in the dataframes to the desired timescale

        Args:
            to_timescale: The timescale to convert the data to

        """
        dataframes = [
            self.df_baseline,
            self.df_baseline_stats,
            self.df_pos_flex,
            self.df_pos_flex_stats,
            self.df_neg_flex,
            self.df_neg_flex_stats,
            self.df_indicator,
        ]
        # Market and simulation results only exist if they were configured
        if self.generator_config.market_config:
            dataframes.append(self.df_market)
        if self.simulator_agent_config:
            dataframes.append(self.df_simulation)

        for df in dataframes:
            convert_timescale_of_index(
                df=df, from_unit=self.current_timescale_of_data, to_unit=to_timescale
            )

        # Update current unit
        self.current_timescale_of_data = to_timescale

    def get_intersection_mpcs_sim(self) -> dict[str, dict[str, str]]:
        """Get the intersection of the MPCs and the simulator variables.

        The aliases of the baseline states and controls and the power variable
        are looked up in the simulator (if loaded) and the three MPC configs.

        Returns:
            dictionary with the following structure: Key: variable alias (from baseline)
            Value: {module id: variable name}

        """
        id_alias_name_dict = {}
        # The simulator is optional, so skip it when it was not loaded
        module_configs = [
            config
            for config in (
                getattr(self, "simulator_module_config", None),
                self.baseline_module_config,
                self.pos_flex_module_config,
                self.neg_flex_module_config,
            )
            if config is not None
        ]

        def get_id_alias_name_dict_element(alias: str):
            # id as key, {id: name} as value
            id_alias_name_dict[alias] = {}
            for config in module_configs:
                for var in config.get_variables():
                    if var.alias == alias or var.name == alias:
                        id_alias_name_dict[alias][config.module_id] = var.name

        # States, controls and power variable
        for variables in [
            self.baseline_module_config.states,
            self.baseline_module_config.controls,
        ]:
            for variable in variables:
                get_id_alias_name_dict_element(variable.alias)
        get_id_alias_name_dict_element(
            self.generator_config.baseline_config_generator_data.power_variable
        )

        return id_alias_name_dict

    def create_instance_with_skipped_validation(
        self,
        model_class: Type[BaseModel],
        config: Dict[str, Any],
        skip_fields: Optional[List[str]] = None,
    ) -> BaseModel:
        """Create a Pydantic model instance while skipping validation for specified fields.

        This function allows partial validation of a model's config dictionary by validating
        all fields except those listed in `skip_fields`. Skipped fields are set on the instance
        after construction without triggering their validators.

        Args:
            model_class (Type[BaseModel]): The Pydantic model class to instantiate.
            config (Dict[str, Any]): The input configuration dictionary.
            skip_fields (Optional[List[str]]): A list of field names to exclude from validation.
                These fields will be manually set after instantiation.

        Returns:
            BaseModel: An instance of the model_class with validated and skipped fields assigned.

        """
        if skip_fields is None:
            skip_fields = []
        # Separate data into validated and skipped fields
        validated_fields = {}
        skipped_fields = {}
        for field, value in config.items():
            if field in skip_fields:
                skipped_fields[field] = value
            else:
                validated_fields[field] = value
        # Create instance with validation for non-skipped fields
        if validated_fields:
            instance = model_class(
                **validated_fields,
                _agent_id=self.simulator_agent_config.id,
            )
        else:
            instance = model_class.model_construct()
        # Add skipped fields without validation
        for field, value in skipped_fields.items():
            # bypass pydantic immutability to directly set attribute value
            object.__setattr__(instance, field, value)
        # Store metadata about bypassed fields for deepcopy compatibility
        object.__setattr__(instance, "_bypassed_fields", skip_fields)
        object.__setattr__(instance, "_original_config", config)
        return instance

    def __deepcopy__(self, memo: Dict[int, Any]) -> "Results":
        """Custom deepcopy implementation that handles Pydantic models with bypassed validation."""
        # Create a new instance of the same class
        new_instance = self.__class__.__new__(self.__class__)
        # Add to memo immediately to prevent circular reference issues
        memo[id(self)] = new_instance
        for key, value in self.__dict__.items():
            if key == "simulator_module_config" and hasattr(value, "_original_config"):
                # Reconstruct the specific problematic object instead of deepcopying
                new_value = self.create_instance_with_skipped_validation(
                    model_class=value.__class__,
                    config=copy.deepcopy(value._original_config, memo),
                    skip_fields=getattr(value, "_bypassed_fields", []),
                )
            else:
                # Everything else should deepcopy normally
                new_value = copy.deepcopy(value, memo)
            setattr(new_instance, key, new_value)
        return new_instance