Coverage for agentlib_flexquant/data_structures/flex_results.py: 31%

190 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-10-20 14:09 +0000

1""" 

2Module for generating and managing results dataframes for flexibility analysis. 

3Results include baseline, positive and negative flexibility data, 

4the indicator, market and simulator results/data. 

5""" 

6import copy 

7import json 

8from pathlib import Path 

9from typing import Any, Dict, Optional, Type, Union 

10 

11import pandas as pd 

12from pydantic import BaseModel, FilePath 

13from agentlib.core.agent import AgentConfig 

14from agentlib.modules.simulation.simulator import SimulatorConfig 

15from agentlib.utils import load_config 

16from agentlib_mpc.modules.mpc import BaseMPCConfig 

17from agentlib_mpc.utils import TimeConversionTypes 

18from agentlib_mpc.utils.analysis import load_mpc, load_mpc_stats, load_sim 

19 

20import agentlib_flexquant.utils.config_management as cmng 

21from agentlib_flexquant.data_structures.flexquant import ( 

22 FlexQuantConfig, 

23 FlexibilityMarketConfig, 

24) 

25from agentlib_flexquant.data_structures.mpcs import ( 

26 BaselineMPCData, 

27 NFMPCData, 

28 PFMPCData, 

29) 

30from agentlib_flexquant.modules.flexibility_indicator import ( 

31 FlexibilityIndicatorModuleConfig, 

32) 

33from agentlib_flexquant.modules.flexibility_market import FlexibilityMarketModuleConfig 

34from agentlib_flexquant.utils.data_handling import convert_timescale_of_index 

35 

36 

def load_indicator(file_path: Union[str, FilePath]) -> pd.DataFrame:
    """Read the flexibility indicator results file into a DataFrame.

    The first row of the file is used as the column header and the first
    two columns together form the (two-level) index of the result.

    Args:
        file_path: location of the indicator results CSV file

    Returns:
        DataFrame holding the indicator results

    """
    return pd.read_csv(file_path, index_col=[0, 1], header=0)

49 

50 

def load_market(file_path: Union[str, FilePath]) -> pd.DataFrame:
    """Read the flexibility market results file into a DataFrame.

    The first row of the file is used as the column header and the first
    two columns together form the (two-level) index of the result.

    Args:
        file_path: location of the market results CSV file

    Returns:
        DataFrame holding the market results

    """
    return pd.read_csv(file_path, index_col=[0, 1], header=0)

63 

64 

class Results:
    """
    Loads the results for the baseline, positive and negative flexibility,
    the indicator, market and simulator results/data. Additionally the MPC stats are loaded.

    Results can be loaded either from a user-specified custom base path or from the
    (default) base path specified in the flex config.

    Loaded results are stored in pandas DataFrames which can be used for further processing,
    e.g. plotting and analysis.

    The simulator and the market are optional: when no simulator config is passed,
    ``simulator_agent_config``, ``simulator_module_config`` and ``df_simulation`` are
    ``None``; when the flex config has no market config, ``df_market`` is ``None``.
    """

    # Configs:
    # Generator
    generator_config: FlexQuantConfig
    # Agents
    simulator_agent_config: Optional[AgentConfig]
    baseline_agent_config: AgentConfig
    pos_flex_agent_config: AgentConfig
    neg_flex_agent_config: AgentConfig
    indicator_agent_config: AgentConfig
    market_agent_config: AgentConfig
    # Modules
    simulator_module_config: Optional[SimulatorConfig]
    baseline_module_config: BaseMPCConfig
    pos_flex_module_config: BaseMPCConfig
    neg_flex_module_config: BaseMPCConfig
    indicator_module_config: FlexibilityIndicatorModuleConfig
    market_module_config: FlexibilityMarketModuleConfig

    # Dataframes
    df_simulation: Optional[pd.DataFrame]
    df_baseline: pd.DataFrame
    df_pos_flex: pd.DataFrame
    df_neg_flex: pd.DataFrame
    df_indicator: pd.DataFrame
    df_market: Optional[pd.DataFrame]

    # Stats of the MPCs
    df_baseline_stats: pd.DataFrame
    df_pos_flex_stats: pd.DataFrame
    df_neg_flex_stats: pd.DataFrame

    # time conversion
    current_timescale_of_data: TimeConversionTypes = "seconds"

    def __init__(
        self,
        flex_config: Optional[Union[str, FilePath, dict]],
        simulator_agent_config: Optional[Union[str, FilePath, dict]],
        generated_flex_files_base_path: Optional[Union[str, FilePath]] = None,
        results: Optional[
            Union[str, FilePath, dict[str, dict[str, pd.DataFrame]], "Results"]
        ] = None,
        to_timescale: TimeConversionTypes = "seconds",
    ):
        """Load all configs and result dataframes of a flexibility quantification run.

        Args:
            flex_config: The flex config, as file path or dict.
            simulator_agent_config: The simulator agent config, as file path or dict.
                If falsy, no simulator data is loaded.
            generated_flex_files_base_path: Optional custom base directory which
                overrides "flex_base_directory_path" of the flex config.
            results: Path of the results directory, or an existing ``Results``
                instance to copy. If None, the results directory of the flex
                config is used.
            to_timescale: The timescale the dataframe indices are converted to.

        """
        # Already a Results instance — copy over its data
        if isinstance(results, Results):
            self.__dict__ = copy.deepcopy(results).__dict__
            return

        # The class-level annotations do not create attributes. Initialise the
        # optional simulator members so the truthiness checks further down work
        # when no simulator config is given.
        self.simulator_agent_config = None
        self.simulator_module_config = None
        self.df_simulation = None

        # Load flex config
        self._load_flex_config(flex_config, generated_flex_files_base_path)
        # Get filenames of configs to load agents and modules
        self._get_config_filenames()
        # Load configs for mpc, indicator, market
        self._load_agent_module_configs()
        # Load sim configs if present
        if simulator_agent_config:
            self._load_simulator_config(simulator_agent_config)
        # Load results and get a dict for generating dataframes
        results_dict, results_path = self._load_results(results)
        # Get dataframes for mpc, sim, flex indicator results
        self._load_results_dataframes(results_dict)
        # Get dataframes for mpc stats
        self._load_stats_dataframes(results_path)
        # Convert the time in the dataframes to the desired timescale
        self.convert_timescale_of_dataframe_index(to_timescale=to_timescale)

    def _load_flex_config(
        self,
        flex_config: Optional[Union[str, FilePath, dict]],
        custom_base_path: Optional[Union[str, FilePath]],
    ):
        """Load the flex config and optionally override the base directory path.

        If a custom base path is provided, it overwrites the "flex_base_directory_path"
        in the given config. This is useful when the generated flex files are saved
        to a custom directory instead of the default (current working directory).

        Args:
            flex_config: The config for flexibility quantification.
            custom_base_path: The custom directory for saving the generated flex files
                defined by user.

        """
        if custom_base_path is not None:
            if isinstance(flex_config, (str, Path)):
                with open(flex_config, "r", encoding="utf-8") as f:
                    flex_config = json.load(f)
            else:
                # Work on a shallow copy so the caller's dict is not modified
                flex_config = copy.copy(flex_config)
            flex_config["flex_base_directory_path"] = str(custom_base_path)

        self.generator_config = load_config.load_config(
            config=flex_config, config_type=FlexQuantConfig
        )

    def _get_config_filenames(self):
        """Get filenames of configs to load agents and modules."""
        self.config_filename_baseline = BaselineMPCData.model_validate(
            self.generator_config.baseline_config_generator_data
        ).name_of_created_file
        self.config_filename_pos_flex = PFMPCData.model_validate(
            self.generator_config.shadow_mpc_config_generator_data.pos_flex
        ).name_of_created_file
        self.config_filename_neg_flex = NFMPCData.model_validate(
            self.generator_config.shadow_mpc_config_generator_data.neg_flex
        ).name_of_created_file
        self.config_filename_indicator = (
            self.generator_config.indicator_config.name_of_created_file
        )

        # The market is optional; keep the attribute defined in either case
        self.config_filename_market = None
        market_config_raw = self.generator_config.market_config
        if market_config_raw:
            if isinstance(market_config_raw, (str, Path)):
                # Market config given as path to a json file
                market_config = FlexibilityMarketConfig.model_validate_json(
                    Path(market_config_raw).read_text(encoding="utf-8")
                )
            else:
                market_config = FlexibilityMarketConfig.model_validate(
                    market_config_raw
                )
            self.config_filename_market = market_config.name_of_created_file

    @staticmethod
    def _is_config_file(file_path: Path, configured_name: Optional[str]) -> bool:
        """Check whether the file name of file_path equals the configured file name."""
        if not configured_name:
            return False
        # Compare whole file names. A substring test would e.g. let "flex.json"
        # match a configured "pos_flex.json".
        return file_path.name == Path(configured_name).name

    def _load_mpc_agent_and_module(self, file_path: Path):
        """Load the agent config at file_path and extract its flexquant mpc module."""
        agent_config = load_config.load_config(
            config=file_path, config_type=AgentConfig
        )
        module_config = cmng.get_module(
            config=agent_config,
            module_type=self._get_flexquant_mpc_module_type(agent_config),
        )
        return agent_config, module_config

    def _load_agent_module_configs(self):
        """Load agent and module configs.

        All json files below the flex files directory are searched recursively.
        Each file is assigned to at most one role; the roles are checked in the
        order baseline, positive flex, negative flex, indicator, market.
        """
        for file_path in Path(self.generator_config.flex_files_directory).rglob(
            "*.json"
        ):
            if self._is_config_file(file_path, self.config_filename_baseline):
                (
                    self.baseline_agent_config,
                    self.baseline_module_config,
                ) = self._load_mpc_agent_and_module(file_path)

            elif self._is_config_file(file_path, self.config_filename_pos_flex):
                (
                    self.pos_flex_agent_config,
                    self.pos_flex_module_config,
                ) = self._load_mpc_agent_and_module(file_path)

            elif self._is_config_file(file_path, self.config_filename_neg_flex):
                (
                    self.neg_flex_agent_config,
                    self.neg_flex_module_config,
                ) = self._load_mpc_agent_and_module(file_path)

            elif self._is_config_file(file_path, self.config_filename_indicator):
                self.indicator_agent_config = load_config.load_config(
                    config=file_path, config_type=AgentConfig
                )
                self.indicator_module_config = cmng.get_module(
                    config=self.indicator_agent_config,
                    module_type=cmng.INDICATOR_CONFIG_TYPE,
                )

            elif self.generator_config.market_config and self._is_config_file(
                file_path, self.config_filename_market
            ):
                self.market_agent_config = load_config.load_config(
                    config=file_path, config_type=AgentConfig
                )
                self.market_module_config = cmng.get_module(
                    config=self.market_agent_config, module_type=cmng.MARKET_CONFIG_TYPE
                )

    def _load_simulator_config(self, simulator_agent_config):
        """Load simulator agent and module config separately.

        Separate loading is required to skip pydantic validation for specific field(s).

        Args:
            simulator_agent_config: The simulator agent config, as file path or dict.

        Raises:
            TypeError: If simulator_agent_config is neither a path nor a dict.
            ValueError: If the agent config contains no module of type "simulator".

        """
        # check config type: with results path adaptation -> dict; without -> str/Path
        if isinstance(simulator_agent_config, (str, Path)):
            with open(simulator_agent_config, "r", encoding="utf-8") as f:
                sim_config = json.load(f)
        elif isinstance(simulator_agent_config, dict):
            sim_config = simulator_agent_config
        else:
            raise TypeError(
                "simulator_agent_config must be a file path or a dict, "
                f"got {type(simulator_agent_config).__name__}."
            )

        # first module of type "simulator" (None if there is none)
        sim_module_config = next(
            (
                module
                for module in sim_config["modules"]
                if module["type"] == "simulator"
            ),
            None,
        )
        if sim_module_config is None:
            raise ValueError(
                "The simulator agent config contains no module of type 'simulator'."
            )

        # instantiate and validate sim agent config
        # (must happen first: the module config creation below reads its id)
        self.simulator_agent_config = AgentConfig.model_validate(sim_config)
        # instantiate sim module config by skipping validation for result_filename
        # to prevent file deletion
        self.simulator_module_config = self.create_instance_with_skipped_validation(
            model_class=SimulatorConfig,
            config=sim_module_config,
            skip_fields=["result_filename"],
        )

    def _get_flexquant_mpc_module_type(self, agent_config: AgentConfig) -> str:
        """Get the mpc module type from agent_config.

        The module type is defined in agentlib_flexquant.

        Args:
            agent_config: the AgentConfig containing the mpc module

        Returns:
            The type of the first module which is one of the flexquant mpc types

        Raises:
            ModuleNotFoundError: If no module of the agent has a flexquant mpc type

        """
        flexquant_mpc_types = (
            cmng.BASELINEMPC_CONFIG_TYPE,
            cmng.BASELINEMINLPMPC_CONFIG_TYPE,
            cmng.SHADOWMPC_CONFIG_TYPE,
            cmng.SHADOWMINLPMPC_CONFIG_TYPE,
        )
        for module in agent_config.modules:
            if module['type'] in flexquant_mpc_types:
                return module['type']

        raise ModuleNotFoundError(f'There is no matching mpc module type in Agentlib_FlexQuant for '
                                  f'modules in agent {agent_config.id}.')

    def _resolve_sim_results_path(
        self, sim_result_filename: str, results_path: Union[str, Path]
    ) -> Path:
        """
        Resolve simulator results path with fallback strategy.

        Tries multiple strategies to locate the simulator results file:
        1. Use the configured path as-is if a file exists there
           (absolute path, or relative path from the current directory)
        2. Use filename only and look in results directory
           (handles both relative paths and just filenames)

        Args:
            sim_result_filename: The result filename from simulator config
            results_path: The results directory path

        Returns:
            Path object pointing to the simulator results file

        Raises:
            FileNotFoundError: If file cannot be found using any strategy
        """
        sim_results_path = Path(sim_result_filename)

        # Strategy 1: the configured path exists (absolute or relative to cwd)
        if sim_results_path.exists():
            return sim_results_path

        # Strategy 2: look for the bare file name in the results directory
        # (fallback for helper function usage)
        results_dir_path = Path(results_path) / sim_results_path.name
        if results_dir_path.exists():
            return results_dir_path

        # If none of the strategies worked, raise an error
        raise FileNotFoundError(
            "Could not locate simulator results file. "
            f"Tried '{sim_results_path}' and '{results_dir_path}'."
        )

    @staticmethod
    def _mpc_results_file(
        module_config: BaseMPCConfig, results_path: Union[str, Path]
    ) -> Path:
        """Build the path of an mpc results file inside the results directory.

        Only the file name of the configured "results_file" is used; its
        directory part is replaced by results_path.
        """
        configured_file = module_config.optimization_backend["results_file"]
        return Path(results_path, Path(configured_file).name)

    def _load_results(
        self, results: Optional[Union[str, Path, dict]]
    ) -> tuple[dict[str, dict[str, pd.DataFrame]], Union[str, Path]]:
        """Load dict with results for mpc, indicator, market and sim
        from specified results path.

        Args:
            results: Path of the results directory. If None or a dict, the
                results directory of the flex config is used.

        Returns:
            Tuple of the results dict ({agent id: {module id: DataFrame}})
            and the results directory the data was loaded from.

        Raises:
            ValueError: If results is neither None, a path nor a dict.

        """
        # load results
        if results is None:
            res_path = self.generator_config.results_directory
        elif isinstance(results, (str, Path)):
            res_path = results
        elif isinstance(results, dict):
            # NOTE(review): the content of the dict is not used here, the results
            # are still read from the results directory — confirm this is intended
            res_path = self.generator_config.results_directory
        else:
            raise ValueError("results must be a path or dict")

        res = {
            self.baseline_agent_config.id: {
                self.baseline_module_config.module_id: load_mpc(
                    self._mpc_results_file(self.baseline_module_config, res_path)
                )
            },
            self.pos_flex_agent_config.id: {
                self.pos_flex_module_config.module_id: load_mpc(
                    self._mpc_results_file(self.pos_flex_module_config, res_path)
                )
            },
            self.neg_flex_agent_config.id: {
                self.neg_flex_module_config.module_id: load_mpc(
                    self._mpc_results_file(self.neg_flex_module_config, res_path)
                )
            },
            self.indicator_agent_config.id: {
                self.indicator_module_config.module_id: load_indicator(
                    Path(
                        res_path,
                        Path(self.indicator_module_config.results_file).name,
                    )
                )
            },
        }
        if self.simulator_agent_config:
            resolved_sim_results_path = self._resolve_sim_results_path(
                self.simulator_module_config.result_filename, res_path
            )
            print(f"Sim results extracted from: {resolved_sim_results_path}")
            res[self.simulator_agent_config.id] = {
                self.simulator_module_config.module_id: load_sim(
                    resolved_sim_results_path,
                )
            }
        if self.generator_config.market_config:
            res[self.market_agent_config.id] = {
                self.market_module_config.module_id: load_market(
                    Path(
                        res_path,
                        Path(self.market_module_config.results_file).name,
                    )
                )
            }
        return res, res_path

    def _load_results_dataframes(self, results_dict: dict):
        """Load results dataframes for mpc, indicator, market and sim.

        Args:
            results_dict: Results in the form {agent id: {module id: DataFrame}}

        """
        if self.simulator_agent_config:
            self.df_simulation = results_dict[self.simulator_agent_config.id][
                self.simulator_module_config.module_id
            ]
        else:
            self.df_simulation = None
        self.df_baseline = results_dict[self.baseline_agent_config.id][
            self.baseline_module_config.module_id
        ]
        self.df_pos_flex = results_dict[self.pos_flex_agent_config.id][
            self.pos_flex_module_config.module_id
        ]
        self.df_neg_flex = results_dict[self.neg_flex_agent_config.id][
            self.neg_flex_module_config.module_id
        ]
        self.df_indicator = results_dict[self.indicator_agent_config.id][
            self.indicator_module_config.module_id
        ]
        if self.generator_config.market_config:
            self.df_market = results_dict[self.market_agent_config.id][
                self.market_module_config.module_id
            ]
        else:
            self.df_market = None

    def _load_stats_dataframes(self, results_path):
        """Load dataframes for mpc stats.

        Args:
            results_path: The results directory containing the mpc results files

        """
        self.df_baseline_stats = load_mpc_stats(
            self._mpc_results_file(self.baseline_module_config, results_path)
        )
        self.df_pos_flex_stats = load_mpc_stats(
            self._mpc_results_file(self.pos_flex_module_config, results_path)
        )
        self.df_neg_flex_stats = load_mpc_stats(
            self._mpc_results_file(self.neg_flex_module_config, results_path)
        )

    def convert_timescale_of_dataframe_index(self, to_timescale: TimeConversionTypes):
        """Convert the time in the dataframes to the desired timescale

        Args:
            to_timescale: The timescale to convert the data to

        """
        dataframes = [
            self.df_baseline,
            self.df_baseline_stats,
            self.df_pos_flex,
            self.df_pos_flex_stats,
            self.df_neg_flex,
            self.df_neg_flex_stats,
            self.df_indicator,
        ]
        # market and simulation data only exist if they were configured
        if self.generator_config.market_config:
            dataframes.append(self.df_market)
        if self.simulator_agent_config:
            dataframes.append(self.df_simulation)

        for df in dataframes:
            convert_timescale_of_index(
                df=df, from_unit=self.current_timescale_of_data, to_unit=to_timescale
            )

        # Update current unit
        self.current_timescale_of_data = to_timescale

    def get_intersection_mpcs_sim(self) -> dict[str, dict[str, str]]:
        """Get the intersection of the MPCs and the simulator variables.

        The aliases of the baseline states and controls plus the power variable
        are looked up in the simulator (if loaded), baseline, positive flex and
        negative flex module configs. A variable matches if its alias or its
        name equals the looked up alias.

        Returns:
            dictionary with the following structure: Key: variable alias (from baseline)
            Value: {module id: variable name}

        """
        module_configs = [
            config
            for config in (
                getattr(self, "simulator_module_config", None),
                self.baseline_module_config,
                self.pos_flex_module_config,
                self.neg_flex_module_config,
            )
            # the simulator is optional and None if it was not loaded
            if config is not None
        ]

        # States, controls and power variable
        aliases = [
            variable.alias
            for variables in (
                self.baseline_module_config.states,
                self.baseline_module_config.controls,
            )
            for variable in variables
        ]
        aliases.append(
            self.generator_config.baseline_config_generator_data.power_variable
        )

        id_alias_name_dict = {}
        for alias in aliases:
            # alias as key, {module id: variable name} as value
            id_alias_name_dict[alias] = {}
            for config in module_configs:
                for var in config.get_variables():
                    if alias in (var.alias, var.name):
                        id_alias_name_dict[alias][config.module_id] = var.name

        return id_alias_name_dict

    def create_instance_with_skipped_validation(
        self,
        model_class: Type[BaseModel],
        config: Dict[str, Any],
        skip_fields: Optional[list[str]] = None,
    ) -> BaseModel:
        """Create a Pydantic model instance while skipping validation for specified fields.

        This function allows partial validation of a model's config dictionary by validating
        all fields except those listed in `skip_fields`. Skipped fields are set on the instance
        after construction without triggering their validators.

        The instance is created with the id of the simulator agent config as
        `_agent_id`, so `self.simulator_agent_config` has to be set beforehand.

        Args:
            model_class: The Pydantic model class to instantiate.
            config: The input configuration dictionary.
            skip_fields: A list of field names to exclude from validation.
                        These fields will be manually set after instantiation.

        Returns:
            BaseModel: An instance of the model_class with validated and skipped fields assigned.

        """
        if skip_fields is None:
            skip_fields = []
        # Separate data into validated and skipped fields
        validated_fields = {
            field: value for field, value in config.items() if field not in skip_fields
        }
        skipped_fields = {
            field: value for field, value in config.items() if field in skip_fields
        }
        # Create instance with validation for non-skipped fields
        if validated_fields:
            instance = model_class(
                **validated_fields, _agent_id=self.simulator_agent_config.id
            )
        else:
            instance = model_class.model_construct()
        # Add skipped fields without validation
        for field, value in skipped_fields.items():
            # bypass pydantic immutability to directly set attribute value
            object.__setattr__(instance, field, value)
        # Store metadata about bypassed fields for deepcopy compatibility
        object.__setattr__(instance, "_bypassed_fields", skip_fields)
        object.__setattr__(instance, "_original_config", config)
        return instance

    def __deepcopy__(self, memo: Dict[int, Any]) -> "Results":
        """Custom deepcopy implementation that handles Pydantic models with bypassed validation.

        The simulator module config is rebuilt from its stored original config
        instead of being deep-copied; all other attributes are deep-copied.
        """
        # Create a new instance of the same class
        new_instance = self.__class__.__new__(self.__class__)
        # Add to memo immediately to prevent circular reference issues
        memo[id(self)] = new_instance
        for key, value in self.__dict__.items():
            if key in ["simulator_module_config"] and hasattr(
                value, "_original_config"
            ):
                # Reconstruct the specific problematic object instead of deepcopying
                new_value = self.create_instance_with_skipped_validation(
                    model_class=value.__class__,
                    config=copy.deepcopy(value._original_config, memo),
                    skip_fields=getattr(value, "_bypassed_fields", []),
                )
                setattr(new_instance, key, new_value)
            else:
                # Everything else should deepcopy normally
                setattr(new_instance, key, copy.deepcopy(value, memo))
        return new_instance