Coverage for agentlib_flexquant/data_structures/flex_results.py: 97%

207 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2026-03-26 09:43 +0000

1""" 

2Module for generating and managing results dataframes for flexibility analysis. 

3Results include baseline, positive and negative flexibility data, 

4the indicator, market and simulator results/data. 

5""" 

6import copy 

7import json 

8from pathlib import Path 

9from typing import Any, Dict, Optional, Type, Union 

10 

11import pandas as pd 

12from pydantic import FilePath 

13from agentlib.core.agent import AgentConfig 

14from agentlib.modules.simulation.simulator import SimulatorConfig 

15from agentlib.utils import load_config 

16from agentlib_mpc.modules.mpc.mpc import BaseMPCConfig 

17from agentlib_mpc.utils import TimeConversionTypes 

18from agentlib_mpc.utils.analysis import load_mpc, load_mpc_stats, load_sim 

19 

20import agentlib_flexquant.utils.config_management as cmng 

21from agentlib_flexquant.data_structures.flexquant import ( 

22 FlexQuantConfig, 

23 FlexibilityMarketConfig, 

24 FlexibilityIndicatorConfig 

25) 

26from agentlib_flexquant.data_structures.mpcs import ( 

27 BaselineMPCData, 

28 NFMPCData, 

29 PFMPCData, 

30) 

31from agentlib_flexquant.modules.flexibility_indicator import ( 

32 FlexibilityIndicatorModuleConfig, 

33) 

34from agentlib_flexquant.modules.flexibility_market import FlexibilityMarketModuleConfig 

35from agentlib_flexquant.utils.data_handling import convert_timescale_of_index 

36 

37 

38def load_indicator(file_path: Union[str, FilePath]) -> pd.DataFrame: 

39 """Load the flexibility indicator results from the given file path. 

40 

41 Args: 

42 file_path: the file path of the indicator results file 

43 

44 Returns: 

45 DataFrame containing the indicator results 

46 

47 """ 

48 df = pd.read_csv(file_path, header=0, index_col=[0, 1]) 

49 return df 

50 

51 

52def load_market(file_path: Union[str, FilePath]) -> pd.DataFrame: 

53 """Load the market results from the given file path. 

54 

55 Args: 

56 file_path: the file path of the market results file 

57 

58 Returns: 

59 DataFrame containing the market results 

60 

61 """ 

62 df = pd.read_csv(file_path, header=0, index_col=[0, 1]) 

63 return df 

64 

65 

66class Results: 

67 """ 

68 Loads the results for the baseline, positive and negative flexibility, 

69 the indicator, market and simulator results/data. Additionally the MPC stats 

70 are loaded. 

71 

72 Results can be loaded either from a user-specified custom base path or from the 

73 (default) base path specified in the flex config. 

74 

75 Loaded results are stored in pandas DataFrames which can be used for further 

76 processing, e.g. plotting and analysis. 

77 """ 

78 

79 # Configs: 

80 # Generator 

81 flex_config: FlexQuantConfig 

82 # Agents 

83 simulator_agent_config: Optional[AgentConfig] 

84 baseline_agent_config: AgentConfig 

85 pos_flex_agent_config: AgentConfig 

86 neg_flex_agent_config: AgentConfig 

87 indicator_agent_config: AgentConfig 

88 market_agent_config: AgentConfig 

89 # Modules 

90 simulator_module_config: SimulatorConfig 

91 baseline_module_config: BaseMPCConfig 

92 pos_flex_module_config: BaseMPCConfig 

93 neg_flex_module_config: BaseMPCConfig 

94 indicator_module_config: FlexibilityIndicatorModuleConfig 

95 market_module_config: FlexibilityMarketModuleConfig 

96 

97 # Dataframes 

98 df_simulation: pd.DataFrame 

99 df_baseline: pd.DataFrame 

100 df_pos_flex: pd.DataFrame 

101 df_neg_flex: pd.DataFrame 

102 df_indicator: pd.DataFrame 

103 df_market: pd.DataFrame 

104 

105 # Stats of the MPCs 

106 df_baseline_stats: pd.DataFrame 

107 df_pos_flex_stats: pd.DataFrame 

108 df_neg_flex_stats: pd.DataFrame 

109 

110 # time conversion 

111 current_timescale_of_data: TimeConversionTypes = "seconds" 

112 

113 def __init__( 

114 self, 

115 flex_config: Optional[Union[str, FilePath, dict]], 

116 simulator_agent_config: Optional[Union[str, FilePath, dict]], 

117 generated_flex_files_base_path: Optional[Union[str, FilePath]] = None, 

118 results: Optional[ 

119 Union[str, FilePath, dict[str, dict[str, pd.DataFrame]], "Results"] 

120 ] = None, 

121 to_timescale: TimeConversionTypes = "seconds", 

122 ): 

123 # Already a Results instance — copy over its data 

124 if isinstance(results, Results): 

125 self.__dict__ = copy.deepcopy(results).__dict__ 

126 return 

127 

128 # Load flex config 

129 self._load_flex_config(flex_config, generated_flex_files_base_path) 

130 # Get filenames of configs to load agents and modules 

131 self._get_config_filenames() 

132 # Load configs for mpc, indicator, market 

133 self._load_agent_module_configs() 

134 # Load sim configs if present 

135 self._load_simulator_config(simulator_agent_config) 

136 # Load results and get a dict for generating dataframes 

137 results_dict, results_path = self._load_results(results) 

138 # Get dataframes for mpc, sim, flex indicator results 

139 self._load_results_dataframes(results_dict) 

140 # Get dataframes for mpc stats 

141 self._load_stats_dataframes(results_path) 

142 # Convert the time in the dataframes to the desired timescale 

143 self.convert_timescale_of_dataframe_index(to_timescale=to_timescale) 

144 

145 # Clear unpicklable model reference to enable multiprocessing 

146 self._clear_unpicklable_references() 

147 

148 def _clear_unpicklable_references(self): 

149 """Remove references to objects that cannot be pickled. 

150 

151 This enables the Results object to be used with multiprocessing. 

152 The model field contains CDLL references that cannot be serialized. 

153 """ 

154 if (hasattr(self, 'simulator_module_config') and 

155 self.simulator_module_config is not None): 

156 if hasattr(self.simulator_module_config, 'model'): 

157 object.__setattr__(self.simulator_module_config, 'model', None) 

158 

159 def _load_flex_config( 

160 self, 

161 flex_config: Optional[Union[str, FilePath, dict]], 

162 custom_base_path: Optional[Union[str, FilePath]], 

163 ): 

164 """Load the flex config and optionally override the base directory path. 

165 

166 If a custom base path is provided, it overwrites the "flex_base_directory_path" 

167 in the given config. This is useful when the generated flex files are saved 

168 to a custom directory instead of the default (current working directory). 

169 

170 Args: 

171 flex_config: The config for flexibility quantification. 

172 custom_base_path: The custom directory for saving the generated flex files 

173 defined by user. 

174 

175 """ 

176 if custom_base_path is not None: 

177 if isinstance(flex_config, (str, Path)): 

178 with open(flex_config, "r", encoding="utf-8") as f: 

179 flex_config = json.load(f) 

180 flex_config["flex_base_directory_path"] = str(custom_base_path) 

181 

182 self.flex_config = load_config.load_config( 

183 config=flex_config, config_type=FlexQuantConfig 

184 ) 

185 

186 def _get_config_filenames(self): 

187 """Get filenames of configs to load agents and modules.""" 

188 self.config_filename_baseline = BaselineMPCData.model_validate( 

189 self.flex_config.baseline_config_generator_data 

190 ).name_of_created_file 

191 self.config_filename_pos_flex = PFMPCData.model_validate( 

192 self.flex_config.shadow_mpc_config_generator_data.pos_flex 

193 ).name_of_created_file 

194 self.config_filename_neg_flex = NFMPCData.model_validate( 

195 self.flex_config.shadow_mpc_config_generator_data.neg_flex 

196 ).name_of_created_file 

197 

198 self.config_filename_indicator = FlexibilityIndicatorConfig.model_validate( 

199 self.flex_config.indicator_config).name_of_created_file 

200 

201 if self.flex_config.market_config: 

202 if isinstance(self.flex_config.market_config, Union[str, Path]): 

203 self.config_filename_market = load_config.load_config( 

204 config=self.flex_config.market_config, 

205 config_type=FlexibilityMarketConfig 

206 ).name_of_created_file 

207 else: # is dict 

208 self.config_filename_market = FlexibilityMarketConfig.model_validate( 

209 self.flex_config.market_config).name_of_created_file 

210 

211 def _load_agent_module_configs(self): 

212 """Load agent and module configs.""" 

213 files_found = [] 

214 for file_path in Path(self.flex_config.flex_files_directory).rglob( 

215 "*.json" 

216 ): 

217 if file_path.name in self.config_filename_baseline: 

218 self.baseline_agent_config = load_config.load_config( 

219 config=file_path, config_type=AgentConfig 

220 ) 

221 self.baseline_module_config = cmng.get_module( 

222 config=self.baseline_agent_config, 

223 module_type= 

224 self._get_flexquant_mpc_module_type(self.baseline_agent_config), 

225 ) 

226 files_found.append(self.config_filename_baseline) 

227 

228 elif file_path.name in self.config_filename_pos_flex: 

229 self.pos_flex_agent_config = load_config.load_config( 

230 config=file_path, config_type=AgentConfig 

231 ) 

232 self.pos_flex_module_config = cmng.get_module( 

233 config=self.pos_flex_agent_config, 

234 module_type= 

235 self._get_flexquant_mpc_module_type(self.pos_flex_agent_config), 

236 ) 

237 files_found.append(self.config_filename_pos_flex) 

238 

239 elif file_path.name in self.config_filename_neg_flex: 

240 self.neg_flex_agent_config = load_config.load_config( 

241 config=file_path, config_type=AgentConfig 

242 ) 

243 self.neg_flex_module_config = cmng.get_module( 

244 config=self.neg_flex_agent_config, 

245 module_type= 

246 self._get_flexquant_mpc_module_type(self.neg_flex_agent_config), 

247 ) 

248 files_found.append(self.config_filename_neg_flex) 

249 

250 elif file_path.name in self.config_filename_indicator: 

251 self.indicator_agent_config = load_config.load_config( 

252 config=file_path, config_type=AgentConfig 

253 ) 

254 self.indicator_module_config = cmng.get_module( 

255 config=self.indicator_agent_config, 

256 module_type=cmng.INDICATOR_CONFIG_TYPE, 

257 ) 

258 files_found.append(self.config_filename_indicator) 

259 

260 elif ( 

261 self.flex_config.market_config 

262 and file_path.name in self.config_filename_market 

263 ): 

264 self.market_agent_config = load_config.load_config( 

265 config=file_path, config_type=AgentConfig 

266 ) 

267 self.market_module_config = cmng.get_module( 

268 config=self.market_agent_config, module_type=cmng.MARKET_CONFIG_TYPE 

269 ) 

270 files_found.append(self.config_filename_market) 

271 files_needed = [self.config_filename_baseline, 

272 self.config_filename_pos_flex, self.config_filename_neg_flex, 

273 self.config_filename_indicator] 

274 if self.flex_config.market_config: 

275 files_needed.append(self.config_filename_market) 

276 difference = list(set(files_needed) - set(files_found)) 

277 if difference: 

278 import warnings 

279 warnings.warn(f"The files {difference} have not been found in the " 

280 f"given Path. This will most likely cause problems " 

281 f"later on. Please check the filenames.") 

282 

283 def _load_simulator_config(self, simulator_agent_config): 

284 """Load simulator agent and module config separately. 

285 

286 Separate loading is required to skip pydantic validation for specific field(s). 

287 

288 """ 

289 # check config type: with results path adaptation -> dict; without -> str/Path 

290 if not simulator_agent_config: 

291 self.simulator_agent_config = None 

292 return 

293 if isinstance(simulator_agent_config, (str, Path)): 

294 with open(simulator_agent_config, "r", encoding="utf-8") as f: 

295 sim_config = json.load(f) 

296 else: # is dict 

297 sim_config = simulator_agent_config 

298 sim_module_config = next( 

299 ( 

300 module 

301 for module in sim_config["modules"] 

302 if module["type"] == "simulator" 

303 ), 

304 None, 

305 ) 

306 # instantiate and validate sim agent config 

307 self.simulator_agent_config = AgentConfig.model_validate(sim_config) 

308 # instantiate sim module config by skipping validation for result_filename 

309 # to prevent file deletion, if overwrite_result_file in sim config is true 

310 self.simulator_module_config = ( 

311 self.create_simulator_config_with_skipped_validation( 

312 sim_config_class=SimulatorConfig, 

313 sim_config=sim_module_config, 

314 skip_fields=["result_filename"], 

315 )) 

316 

317 def _get_flexquant_mpc_module_type(self, agent_config: AgentConfig) -> str: 

318 """Get the mpc module type from agent_config. 

319 

320 The module type is defined in agentlib_flexquant. 

321 

322 Args: 

323 agent_config: the AgentConfig containing the mpc module 

324 

325 Returns: 

326 The type of the mpc module 

327 

328 """ 

329 for module in agent_config.modules: 

330 if module['type'] in [cmng.BASELINEMPC_CONFIG_TYPE, 

331 cmng.BASELINEMINLPMPC_CONFIG_TYPE, 

332 cmng.SHADOWMPC_CONFIG_TYPE, 

333 cmng.SHADOWMINLPMPC_CONFIG_TYPE]: 

334 return module['type'] 

335 

336 raise ModuleNotFoundError(f'There is no matching mpc module type in ' 

337 f'Agentlib_FlexQuant for modules in agent ' 

338 f'{agent_config.id}.') 

339 

340 def _resolve_sim_results_path( 

341 self, sim_result_filename: str, results_path: Union[str, Path] 

342 ) -> Path: 

343 """ 

344 Resolve simulator results path with fallback strategy. 

345 

346 Tries multiple strategies to locate the simulator results file: 

347 1. Use absolute path if file exists there 

348 2. Use relative path as-is from current directory 

349 3. Use filename only and look in results directory 

350 (handles both relative paths and just filenames) 

351 

352 Args: 

353 sim_result_filename: The result filename from simulator config 

354 results_path: The results directory path 

355 

356 Returns: 

357 Path object pointing to the simulator results file 

358 

359 Raises: 

360 FileNotFoundError: If file cannot be found using any strategy 

361 """ 

362 sim_results_path = Path(sim_result_filename) 

363 results_path = Path(results_path) 

364 

365 # Strategy 1: If it's an absolute path and exists, use it 

366 if sim_results_path.is_absolute() and sim_results_path.exists(): 

367 return sim_results_path 

368 

369 # Strategy 2: If it's a relative path, try it as-is from current directory 

370 if not sim_results_path.is_absolute() and sim_results_path.exists(): 

371 return sim_results_path 

372 

373 # Strategy 3: Try in results directory (handles both relative paths 

374 # and just filenames) (fallback for helper function usage) 

375 results_dir_path = results_path / sim_results_path.name 

376 if results_dir_path.exists(): 

377 return results_dir_path 

378 

379 # If none of the strategies worked, raise an error 

380 raise FileNotFoundError("Could not locate simulator results file.") 

381 

382 def _load_results( 

383 self, results: Union[str, Path, dict] 

384 ) -> [dict[str, dict[str, pd.DataFrame]], Union[str, Path]]: 

385 """Load dict with results for mpc, indicator, market and sim 

386 from specified results path.""" 

387 # load results 

388 if results is None: 

389 res_path = self.flex_config.results_directory 

390 elif isinstance(results, (str, Path)): 

391 res_path = results 

392 elif isinstance(results, dict): 

393 res_path = self.flex_config.results_directory 

394 else: 

395 raise ValueError("results must be a path or dict") 

396 

397 res = { 

398 self.baseline_agent_config.id: { 

399 self.baseline_module_config.module_id: load_mpc( 

400 Path( 

401 res_path, 

402 Path( 

403 self.baseline_module_config.optimization_backend[ 

404 "results_file" 

405 ] 

406 ).name, 

407 ) 

408 ) 

409 }, 

410 self.pos_flex_agent_config.id: { 

411 self.pos_flex_module_config.module_id: load_mpc( 

412 Path( 

413 res_path, 

414 Path( 

415 self.pos_flex_module_config.optimization_backend[ 

416 "results_file" 

417 ] 

418 ).name, 

419 ) 

420 ) 

421 }, 

422 self.neg_flex_agent_config.id: { 

423 self.neg_flex_module_config.module_id: load_mpc( 

424 Path( 

425 res_path, 

426 Path( 

427 self.neg_flex_module_config.optimization_backend[ 

428 "results_file" 

429 ] 

430 ).name, 

431 ) 

432 ) 

433 }, 

434 self.indicator_agent_config.id: { 

435 self.indicator_module_config.module_id: load_indicator( 

436 Path( 

437 res_path, 

438 Path(self.indicator_module_config.results_file).name, 

439 ) 

440 ) 

441 }, 

442 } 

443 if self.simulator_agent_config: 

444 resolved_sim_results_path = self._resolve_sim_results_path( 

445 self.simulator_module_config.result_filename, res_path 

446 ) 

447 print(f"Sim results extracted from: {resolved_sim_results_path}") 

448 res[self.simulator_agent_config.id] = { 

449 self.simulator_module_config.module_id: load_sim( 

450 resolved_sim_results_path, 

451 ) 

452 } 

453 if self.flex_config.market_config: 

454 res[self.market_agent_config.id] = { 

455 self.market_module_config.module_id: load_market( 

456 Path( 

457 res_path, 

458 Path(self.market_module_config.results_file).name, 

459 ) 

460 ) 

461 } 

462 return res, res_path 

463 

464 def _load_results_dataframes(self, results_dict: dict): 

465 """Load results dataframes for mpc, indicator, market and sim.""" 

466 if self.simulator_agent_config: 

467 self.df_simulation = results_dict[self.simulator_agent_config.id][ 

468 self.simulator_module_config.module_id 

469 ] 

470 self.df_baseline = results_dict[self.baseline_agent_config.id][ 

471 self.baseline_module_config.module_id 

472 ] 

473 self.df_pos_flex = results_dict[self.pos_flex_agent_config.id][ 

474 self.pos_flex_module_config.module_id 

475 ] 

476 self.df_neg_flex = results_dict[self.neg_flex_agent_config.id][ 

477 self.neg_flex_module_config.module_id 

478 ] 

479 self.df_indicator = results_dict[self.indicator_agent_config.id][ 

480 self.indicator_module_config.module_id 

481 ] 

482 if self.flex_config.market_config: 

483 self.df_market = results_dict[self.market_agent_config.id][ 

484 self.market_module_config.module_id 

485 ] 

486 else: 

487 self.df_market = None 

488 

489 def _load_stats_dataframes(self, results_path): 

490 """Load dataframes for mpc stats.""" 

491 self.df_baseline_stats = load_mpc_stats( 

492 Path( 

493 results_path, 

494 Path( 

495 self.baseline_module_config.optimization_backend["results_file"] 

496 ).name, 

497 ) 

498 ) 

499 self.df_pos_flex_stats = load_mpc_stats( 

500 Path( 

501 results_path, 

502 Path( 

503 self.pos_flex_module_config.optimization_backend["results_file"] 

504 ).name, 

505 ) 

506 ) 

507 self.df_neg_flex_stats = load_mpc_stats( 

508 Path( 

509 results_path, 

510 Path( 

511 self.neg_flex_module_config.optimization_backend["results_file"] 

512 ).name, 

513 ) 

514 ) 

515 

516 def convert_timescale_of_dataframe_index(self, to_timescale: TimeConversionTypes): 

517 """Convert the time in the dataframes to the desired timescale 

518 

519 Args: 

520 to_timescale: The timescale to convert the data to 

521 

522 """ 

523 for df in ( 

524 [ 

525 self.df_baseline, 

526 self.df_baseline_stats, 

527 self.df_pos_flex, 

528 self.df_pos_flex_stats, 

529 self.df_neg_flex, 

530 self.df_neg_flex_stats, 

531 self.df_indicator, 

532 ] 

533 + ([self.df_market] if self.flex_config.market_config else []) 

534 + ([self.df_simulation] if self.simulator_agent_config else []) 

535 ): 

536 convert_timescale_of_index( 

537 df=df, from_unit=self.current_timescale_of_data, to_unit=to_timescale 

538 ) 

539 

540 # Update current unit 

541 self.current_timescale_of_data = to_timescale 

542 

543 def get_intersection_mpcs_sim(self) -> dict[str, dict[str, str]]: 

544 """Get the intersection of the MPCs and the simulator variables. 

545 

546 Returns: 

547 dictionary with the following structure: 

548 Key: variable alias (from baseline) 

549 Value: {module id: variable name} 

550 

551 """ 

552 id_alias_name_dict = {} 

553 

554 def get_id_alias_name_dict_element(alias: str): 

555 # id as key, {id: name} as value 

556 id_alias_name_dict[alias] = {} 

557 for config in [ 

558 self.simulator_module_config, 

559 self.baseline_module_config, 

560 self.pos_flex_module_config, 

561 self.neg_flex_module_config, 

562 ]: 

563 for var in config.get_variables(): 

564 if alias in (var.alias, var.name): 

565 id_alias_name_dict[alias][config.module_id] = var.name 

566 

567 # States, controls and power variable 

568 for variables in [ 

569 self.baseline_module_config.states, 

570 self.baseline_module_config.controls, 

571 ]: 

572 for variable in variables: 

573 get_id_alias_name_dict_element(variable.alias) 

574 get_id_alias_name_dict_element( 

575 self.flex_config.baseline_config_generator_data.power_variable 

576 ) 

577 

578 return id_alias_name_dict 

579 

580 def create_simulator_config_with_skipped_validation( 

581 self, 

582 sim_config_class: Type[SimulatorConfig], 

583 sim_config: Dict[str, Any], 

584 skip_fields: Optional[list[str]] = None, 

585 ) -> SimulatorConfig: 

586 """Create a Pydantic model instance while skipping validation for 

587 specified fields. 

588 

589 This function allows partial validation of a model's config dictionary 

590 by validating all fields except those listed in `skip_fields`. 

591 Skipped fields are set on the instance after construction without 

592 triggering their validators. 

593 

594 Args: 

595 sim_config_class: The Pydantic model class to instantiate. 

596 sim_config: The input configuration dictionary. 

597 skip_fields: A list of field names to exclude from validation. 

598 These fields will be manually set after instantiation. 

599 

600 Returns: 

601 SimulatorConfig: An instance of the model_class with validated and 

602 skipped fields assigned. 

603 

604 """ 

605 if skip_fields is None: 

606 skip_fields = [] 

607 # Separate data into validated and skipped fields 

608 validated_fields = { 

609 field: value for field, value in sim_config.items() if 

610 field not in skip_fields 

611 } 

612 skipped_fields = { 

613 field: value for field, value in sim_config.items() if 

614 field in skip_fields 

615 } 

616 # Create instance with validation for non-skipped fields 

617 if validated_fields: 

618 instance = sim_config_class( 

619 **validated_fields, _agent_id=self.simulator_agent_config.id 

620 ) 

621 else: 

622 instance = sim_config_class.model_construct() 

623 # Add skipped fields without validation 

624 for field, value in skipped_fields.items(): 

625 # bypass pydantic immutability to directly set attribute value 

626 object.__setattr__(instance, field, value) 

627 # Store metadata about bypassed fields for deepcopy compatibility 

628 object.__setattr__(instance, "_bypassed_fields", skip_fields) 

629 object.__setattr__(instance, "_original_config", sim_config) 

630 return instance 

631 

632 def __deepcopy__(self, memo: Dict[int, Any]) -> "Results": 

633 """Custom deepcopy implementation that handles Pydantic models with bypassed 

634 validation. 

635 Needed, if a Results object should be copied with copy.deepcopy, without 

636 deleting the simulator results due to its pydantic validators. 

637 """ 

638 # Create a new instance of the same class 

639 new_instance = self.__class__.__new__(self.__class__) 

640 # Add to memo immediately to prevent circular reference issues 

641 memo[id(self)] = new_instance 

642 for key, value in self.__dict__.items(): 

643 if key in ["simulator_module_config"] and hasattr( 

644 value, "_original_config" 

645 ): 

646 # Reconstruct the specific problematic object instead of deep copying 

647 new_value = self.create_simulator_config_with_skipped_validation( 

648 sim_config_class=value.__class__, 

649 sim_config=copy.deepcopy(value._original_config, memo), 

650 skip_fields=getattr(value, "_bypassed_fields", []), 

651 ) 

652 setattr(new_instance, key, new_value) 

653 else: 

654 # Everything else should deepcopy normally 

655 setattr(new_instance, key, copy.deepcopy(value, memo)) 

656 return new_instance