Coverage for agentlib_flexquant/data_structures/flex_results.py: 33%
162 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-01 15:10 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-01 15:10 +0000
1import copy
2from typing import Union, Optional, Dict, Any, List, Type
4import agentlib
5from pydantic import FilePath, BaseModel
6from pathlib import Path
7import json
8import os
9import pandas as pd
11from agentlib.core.agent import AgentConfig
12from agentlib.core.module import BaseModuleConfig
13from agentlib.utils import load_config
14from agentlib_mpc.modules.mpc import BaseMPCConfig
15from agentlib.modules.simulation.simulator import SimulatorConfig
16from agentlib_flexquant.data_structures.flexquant import (
17 FlexQuantConfig,
18 FlexibilityIndicatorConfig,
19 FlexibilityMarketConfig,
20)
21from agentlib_flexquant.data_structures.mpcs import (
22 BaselineMPCData,
23 NFMPCData,
24 PFMPCData,
25)
26from agentlib_flexquant.utils.data_handling import convert_timescale_of_index
27from agentlib_mpc.utils import TimeConversionTypes
28from agentlib_mpc.utils.analysis import load_sim, load_mpc, load_mpc_stats
30from agentlib_flexquant.modules.flexibility_indicator import (
31 FlexibilityIndicatorModuleConfig,
32)
33from agentlib_flexquant.modules.flexibility_market import (
34 FlexibilityMarketModuleConfig,
35)
36import agentlib_flexquant.utils.config_management as cmng
def load_indicator(file_path: Union[str, FilePath]) -> pd.DataFrame:
    """Read the flexibility indicator results stored as CSV at ``file_path``.

    The first row of the file is used as the header and the first two
    columns together form the (two-level) index of the returned dataframe.
    """
    return pd.read_csv(file_path, index_col=[0, 1], header=0)
def load_market(file_path: Union[str, FilePath]) -> pd.DataFrame:
    """Read the flexibility market results stored as CSV at ``file_path``.

    The first row of the file is used as the header and the first two
    columns together form the (two-level) index of the returned dataframe.
    """
    return pd.read_csv(file_path, index_col=[0, 1], header=0)
class Results:
    """Bundle the configs and result dataframes of one flexibility run.

    On construction the generator config, the generated agent/module configs
    (baseline, positive/negative flexibility, indicator and optionally market)
    and, if given, the simulator config are loaded. Afterwards the result
    dataframes and the MPC statistics are read and their time index is
    converted to the requested timescale.
    """

    # Configs:
    # Generator
    generator_config: FlexQuantConfig
    # Agents (simulator and market are optional and may be absent / None)
    simulator_agent_config: Optional[AgentConfig]
    baseline_agent_config: AgentConfig
    pos_flex_agent_config: AgentConfig
    neg_flex_agent_config: AgentConfig
    indicator_agent_config: AgentConfig
    market_agent_config: AgentConfig
    # Modules
    simulator_module_config: Optional[SimulatorConfig]
    baseline_module_config: BaseMPCConfig
    pos_flex_module_config: BaseMPCConfig
    neg_flex_module_config: BaseMPCConfig
    indicator_module_config: FlexibilityIndicatorModuleConfig
    market_module_config: FlexibilityMarketModuleConfig

    # Dataframes
    df_simulation: Optional[pd.DataFrame]
    df_baseline: pd.DataFrame
    df_pos_flex: pd.DataFrame
    df_neg_flex: pd.DataFrame
    df_indicator: pd.DataFrame
    df_market: Optional[pd.DataFrame]

    # Stats of the MPCs
    df_baseline_stats: pd.DataFrame
    df_pos_flex_stats: pd.DataFrame
    df_neg_flex_stats: pd.DataFrame

    # time conversion
    current_timescale_of_data: TimeConversionTypes = "seconds"

    def __init__(
        self,
        flex_config: Optional[Union[str, FilePath, dict]],
        simulator_agent_config: Optional[Union[str, FilePath, dict]],
        generated_flex_files_base_path: Optional[Union[str, FilePath]] = None,
        results: Optional[
            Union[str, FilePath, dict[str, dict[str, pd.DataFrame]], "Results"]
        ] = None,
        to_timescale: TimeConversionTypes = "seconds",
    ):
        """Load all configs and results of a flexibility run.

        Args:
            flex_config: The generator (FlexQuant) config as path or dict.
            simulator_agent_config: The simulator agent config as path or
                dict. A falsy value means that no simulator is used; the
                simulator configs and ``df_simulation`` are then ``None``.
            generated_flex_files_base_path: Optional base directory of the
                generated flex files. If given, it overwrites the key
                ``"flex_base_directory_path"`` of ``flex_config``.
            results: Either a path to the results directory, an already
                loaded ``{agent id: {module id: dataframe}}`` dict, another
                ``Results`` instance (which is deep-copied) or ``None`` to
                load from the results directory of the generator config.
            to_timescale: Timescale the dataframe indices are converted to.
                Not applied when ``results`` is a ``Results`` instance.

        Raises:
            TypeError: If ``simulator_agent_config`` is neither path nor dict.
            ValueError: If the simulator config contains no simulator module
                or if ``results`` has an unsupported type.
        """
        if isinstance(results, Results):
            # Already a Results instance — copy over its data
            self.__dict__ = copy.deepcopy(results).__dict__
            return

        # If the generated flex files are saved at a custom base directory,
        # overwrite "flex_base_directory_path" in the flex config.
        # By default the current working directory is used as base.
        if generated_flex_files_base_path is not None:
            if isinstance(flex_config, (str, Path)):
                with open(flex_config, "r") as f:
                    flex_config = json.load(f)
            else:
                # shallow copy so the dict passed by the caller is not mutated
                flex_config = dict(flex_config)
            flex_config["flex_base_directory_path"] = str(
                generated_flex_files_base_path
            )

        # Generator config
        self.generator_config = load_config.load_config(
            config=flex_config, config_type=FlexQuantConfig
        )

        self._load_simulator_configs(simulator_agent_config)
        self._load_generated_configs()

        # load results
        if results is None:
            results_path = self.generator_config.results_directory
            results = self._load_results(res_path=results_path)
        elif isinstance(results, (str, Path)):
            results_path = results
            results = self._load_results(res_path=results_path)
        elif isinstance(results, dict):
            results_path = self.generator_config.results_directory
        else:
            raise ValueError("results must be a path or dict")

        # Get result dataframes
        if self.simulator_agent_config is not None:
            self.df_simulation = results[self.simulator_agent_config.id][
                self.simulator_module_config.module_id
            ]
        else:
            self.df_simulation = None
        self.df_baseline = results[self.baseline_agent_config.id][
            self.baseline_module_config.module_id
        ]
        self.df_pos_flex = results[self.pos_flex_agent_config.id][
            self.pos_flex_module_config.module_id
        ]
        self.df_neg_flex = results[self.neg_flex_agent_config.id][
            self.neg_flex_module_config.module_id
        ]
        self.df_indicator = results[self.indicator_agent_config.id][
            self.indicator_module_config.module_id
        ]
        if self.generator_config.market_config:
            self.df_market = results[self.market_agent_config.id][
                self.market_module_config.module_id
            ]
        else:
            self.df_market = None

        # Load the statistics
        self.df_baseline_stats = load_mpc_stats(
            self._result_file(
                results_path,
                self.baseline_module_config.optimization_backend["results_file"],
            )
        )
        self.df_pos_flex_stats = load_mpc_stats(
            self._result_file(
                results_path,
                self.pos_flex_module_config.optimization_backend["results_file"],
            )
        )
        self.df_neg_flex_stats = load_mpc_stats(
            self._result_file(
                results_path,
                self.neg_flex_module_config.optimization_backend["results_file"],
            )
        )

        # Convert the time in the dataframes to the desired timescale
        self.convert_timescale_of_dataframe_index(to_timescale=to_timescale)

    @staticmethod
    def _result_file(res_path: Union[str, Path], file_name: Union[str, Path]) -> Path:
        """Return ``res_path`` joined with only the last component of ``file_name``."""
        return Path(res_path, Path(file_name).name)

    def _load_simulator_configs(
        self, simulator_agent_config: Optional[Union[str, FilePath, dict]]
    ) -> None:
        """Set the simulator agent and module config, or ``None`` without simulator."""
        # Always define both attributes so that later checks do not fail with
        # an AttributeError when no simulator is used.
        self.simulator_agent_config = None
        self.simulator_module_config = None
        if not simulator_agent_config:
            return

        # check config type: with results path adaptation -> dict; without -> str/Path
        if isinstance(simulator_agent_config, (str, Path)):
            with open(simulator_agent_config, "r") as f:
                sim_config = json.load(f)
        elif isinstance(simulator_agent_config, dict):
            sim_config = simulator_agent_config
        else:
            raise TypeError("simulator_agent_config must be a path or dict")

        sim_module_config = next(
            (module for module in sim_config["modules"] if module["type"] == "simulator"),
            None,
        )
        if sim_module_config is None:
            raise ValueError(
                "simulator_agent_config contains no module of type 'simulator'"
            )
        # instantiate and validate sim agent config
        self.simulator_agent_config = AgentConfig.model_validate(sim_config)
        # instantiate sim module config by skipping validation for result_filename
        # to prevent file deletion
        self.simulator_module_config = self.create_instance_with_skipped_validation(
            model_class=SimulatorConfig,
            config=sim_module_config,
            skip_fields=["result_filename"],
        )

    def _load_generated_configs(self) -> None:
        """Load the agent and module configs found in the flex files directory."""
        # (name of the created file, attribute prefix, module type); the order
        # defines the priority if two entries share the same file name
        expected = [
            (
                BaselineMPCData.model_validate(
                    self.generator_config.baseline_config_generator_data
                ).name_of_created_file,
                "baseline",
                cmng.BASELINEMPC_CONFIG_TYPE,
            ),
            (
                PFMPCData.model_validate(
                    self.generator_config.shadow_mpc_config_generator_data.pos_flex
                ).name_of_created_file,
                "pos_flex",
                cmng.SHADOWMPC_CONFIG_TYPE,
            ),
            (
                NFMPCData.model_validate(
                    self.generator_config.shadow_mpc_config_generator_data.neg_flex
                ).name_of_created_file,
                "neg_flex",
                cmng.SHADOWMPC_CONFIG_TYPE,
            ),
            (
                self.generator_config.indicator_config.name_of_created_file,
                "indicator",
                cmng.INDICATOR_CONFIG_TYPE,
            ),
        ]
        market_config = self.generator_config.market_config
        if market_config:
            # isinstance is required here: "x is str or Path" is always truthy
            if isinstance(market_config, (str, Path)):
                market = FlexibilityMarketConfig.parse_file(market_config)
            else:
                market = FlexibilityMarketConfig.model_validate(market_config)
            expected.append(
                (market.name_of_created_file, "market", cmng.MARKET_CONFIG_TYPE)
            )
        # Compare plain file names exactly instead of testing for substrings,
        # so unrelated json files are not mistaken for a generated config.
        expected = [
            (Path(name).name, prefix, module_type)
            for name, prefix, module_type in expected
        ]

        flex_files_directory = Path(self.generator_config.flex_files_directory)
        for file_path in flex_files_directory.rglob("*.json"):
            for expected_name, prefix, module_type in expected:
                if file_path.name != expected_name:
                    continue
                agent_config = load_config.load_config(
                    config=file_path, config_type=AgentConfig
                )
                setattr(self, f"{prefix}_agent_config", agent_config)
                setattr(
                    self,
                    f"{prefix}_module_config",
                    cmng.get_module(config=agent_config, module_type=module_type),
                )
                break

    def _load_results(
        self, res_path: Union[str, Path]
    ) -> dict[str, dict[str, pd.DataFrame]]:
        """Load the result files of all configured modules from ``res_path``.

        Only the file name of each configured results file is used; it is
        looked up directly inside ``res_path``.

        Returns:
            A nested dict ``{agent id: {module id: dataframe}}``.
        """
        res = {}
        for agent_config, module_config in (
            (self.baseline_agent_config, self.baseline_module_config),
            (self.pos_flex_agent_config, self.pos_flex_module_config),
            (self.neg_flex_agent_config, self.neg_flex_module_config),
        ):
            res[agent_config.id] = {
                module_config.module_id: load_mpc(
                    self._result_file(
                        res_path, module_config.optimization_backend["results_file"]
                    )
                )
            }
        res[self.indicator_agent_config.id] = {
            self.indicator_module_config.module_id: load_indicator(
                self._result_file(res_path, self.indicator_module_config.results_file)
            )
        }
        if self.simulator_agent_config is not None:
            res[self.simulator_agent_config.id] = {
                self.simulator_module_config.module_id: load_sim(
                    self._result_file(
                        res_path, self.simulator_module_config.result_filename
                    )
                )
            }
        if self.generator_config.market_config:
            res[self.market_agent_config.id] = {
                self.market_module_config.module_id: load_market(
                    self._result_file(res_path, self.market_module_config.results_file)
                )
            }
        return res

    def convert_timescale_of_dataframe_index(self, to_timescale: TimeConversionTypes):
        """Convert the time in the dataframes to the desired timescale

        Keyword arguments:
        to_timescale -- The timescale to convert the data to
        """
        dataframes = [
            self.df_baseline,
            self.df_baseline_stats,
            self.df_pos_flex,
            self.df_pos_flex_stats,
            self.df_neg_flex,
            self.df_neg_flex_stats,
            self.df_indicator,
        ]
        if self.generator_config.market_config:
            dataframes.append(self.df_market)
        if self.simulator_agent_config is not None:
            dataframes.append(self.df_simulation)

        # The return value is not used, so convert_timescale_of_index is
        # presumably working in place on each dataframe — TODO confirm.
        for df in dataframes:
            convert_timescale_of_index(
                df=df, from_unit=self.current_timescale_of_data, to_unit=to_timescale
            )

        # Update current unit
        self.current_timescale_of_data = to_timescale

    def get_intersection_mpcs_sim(self) -> dict[str, dict[str, str]]:
        """
        Get the intersection of the MPCs and the simulator variables.
        returns a dictionary with the following structure:
        Key: variable alias (from baseline)
        Value: {module id: variable name}

        The simulator is left out if no simulator config was loaded.
        """
        module_configs = [
            config
            for config in (
                self.simulator_module_config,
                self.baseline_module_config,
                self.pos_flex_module_config,
                self.neg_flex_module_config,
            )
            if config is not None
        ]

        # States, controls and power variable
        aliases = [
            variable.alias
            for variables in (
                self.baseline_module_config.states,
                self.baseline_module_config.controls,
            )
            for variable in variables
        ]
        aliases.append(
            self.generator_config.baseline_config_generator_data.power_variable
        )

        id_alias_name_dict = {}
        for alias in aliases:
            # module id as key, variable name as value
            names_by_module = {}
            for config in module_configs:
                for var in config.get_variables():
                    if var.alias == alias or var.name == alias:
                        names_by_module[config.module_id] = var.name
            id_alias_name_dict[alias] = names_by_module
        return id_alias_name_dict

    def create_instance_with_skipped_validation(
        self,
        model_class: Type[BaseModel],
        config: Dict[str, Any],
        skip_fields: Optional[List[str]] = None,
    ) -> BaseModel:
        """
        Create a Pydantic model instance while skipping validation for specified fields.

        This function allows partial validation of a model's config dictionary by validating
        all fields except those listed in `skip_fields`. Skipped fields are set on the instance
        after construction without triggering their validators.

        Args:
            model_class (Type[BaseModel]): The Pydantic model class to instantiate.
            config (Dict[str, Any]): The input configuration dictionary.
            skip_fields (Optional[List[str]]): A list of field names to exclude from validation.
                These fields will be manually set after instantiation.

        Returns:
            BaseModel: An instance of the model_class with validated and skipped fields assigned.
        """
        if skip_fields is None:
            skip_fields = []
        # Separate data into validated and skipped fields
        validated_fields = {
            field: value for field, value in config.items() if field not in skip_fields
        }
        skipped_fields = {
            field: value for field, value in config.items() if field in skip_fields
        }
        # Create instance with validation for non-skipped fields
        if validated_fields:
            instance = model_class(
                **validated_fields,
                _agent_id=self.simulator_agent_config.id,
            )
        else:
            instance = model_class.model_construct()
        # Add skipped fields without validation
        for field, value in skipped_fields.items():
            # bypass pydantic immutability to directly set attribute value
            object.__setattr__(instance, field, value)
        # Store metadata about bypassed fields for deepcopy compatibility
        object.__setattr__(instance, "_bypassed_fields", skip_fields)
        object.__setattr__(instance, "_original_config", config)
        return instance

    def __deepcopy__(self, memo: Dict[int, Any]) -> "Results":
        """
        Custom deepcopy implementation that handles Pydantic models with bypassed validation.
        """
        # Create a new instance of the same class
        new_instance = self.__class__.__new__(self.__class__)
        # Add to memo immediately to prevent circular reference issues
        memo[id(self)] = new_instance
        for key, value in self.__dict__.items():
            if key == "simulator_module_config" and hasattr(value, "_original_config"):
                # Reconstruct the specific problematic object instead of deepcopying
                new_value = self.create_instance_with_skipped_validation(
                    model_class=value.__class__,
                    config=copy.deepcopy(value._original_config, memo),
                    skip_fields=getattr(value, "_bypassed_fields", []),
                )
                setattr(new_instance, key, new_value)
            else:
                # Everything else should deepcopy normally
                setattr(new_instance, key, copy.deepcopy(value, memo))
        return new_instance