Coverage for agentlib_flexquant/data_structures/flex_results.py: 33%

172 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-08-15 15:25 +0000

1import copy 

2import json 

3import pandas as pd 

4from typing import Union, Optional, Dict, Any, List, Type 

5from pydantic import FilePath, BaseModel 

6from pathlib import Path 

7from agentlib.core.agent import AgentConfig 

8from agentlib.utils import load_config 

9from agentlib.modules.simulation.simulator import SimulatorConfig 

10from agentlib_mpc.modules.mpc import BaseMPCConfig 

11from agentlib_mpc.utils import TimeConversionTypes 

12from agentlib_mpc.utils.analysis import load_sim, load_mpc, load_mpc_stats 

13import agentlib_flexquant.utils.config_management as cmng 

14from agentlib_flexquant.data_structures.flexquant import FlexQuantConfig, FlexibilityMarketConfig 

15from agentlib_flexquant.data_structures.mpcs import BaselineMPCData, NFMPCData, PFMPCData 

16from agentlib_flexquant.utils.data_handling import convert_timescale_of_index 

17from agentlib_flexquant.modules.flexibility_indicator import FlexibilityIndicatorModuleConfig 

18from agentlib_flexquant.modules.flexibility_market import FlexibilityMarketModuleConfig 

19 

20 

21def load_indicator(file_path: Union[str, FilePath]) -> pd.DataFrame: 

22 """Load the flexibility indicator results from the given file path. 

23 

24 Args: 

25 file_path: the file path of the indicator results file 

26 

27 Returns: 

28 DataFrame containing the indicator results 

29 

30 """ 

31 df = pd.read_csv(file_path, header=0, index_col=[0, 1]) 

32 return df 

33 

34 

35def load_market(file_path: Union[str, FilePath]) -> pd.DataFrame: 

36 """Load the market results from the given file path. 

37 

38 Args: 

39 file_path: the file path of the market results file 

40 

41 Returns: 

42 DataFrame containing the market results 

43 

44 """ 

45 df = pd.read_csv(file_path, header=0, index_col=[0, 1]) 

46 return df 

47 

48 

49class Results: 

50 # Configs: 

51 # Generator 

52 generator_config: FlexQuantConfig 

53 # Agents 

54 simulator_agent_config: AgentConfig 

55 baseline_agent_config: AgentConfig 

56 pos_flex_agent_config: AgentConfig 

57 neg_flex_agent_config: AgentConfig 

58 indicator_agent_config: AgentConfig 

59 market_agent_config: AgentConfig 

60 # Modules 

61 simulator_module_config: SimulatorConfig 

62 baseline_module_config: BaseMPCConfig 

63 pos_flex_module_config: BaseMPCConfig 

64 neg_flex_module_config: BaseMPCConfig 

65 indicator_module_config: FlexibilityIndicatorModuleConfig 

66 market_module_config: FlexibilityMarketModuleConfig 

67 

68 # Dataframes 

69 df_simulation: pd.DataFrame 

70 df_baseline: pd.DataFrame 

71 df_pos_flex: pd.DataFrame 

72 df_neg_flex: pd.DataFrame 

73 df_indicator: pd.DataFrame 

74 df_market: pd.DataFrame 

75 

76 # Stats of the MPCs 

77 df_baseline_stats: pd.DataFrame 

78 df_pos_flex_stats: pd.DataFrame 

79 df_neg_flex_stats: pd.DataFrame 

80 

81 # time conversion 

82 current_timescale_of_data: TimeConversionTypes = "seconds" 

83 

84 def __init__( 

85 self, 

86 flex_config: Optional[Union[str, FilePath, dict]], 

87 simulator_agent_config: Optional[Union[str, FilePath, dict]], 

88 generated_flex_files_base_path: Optional[Union[str, FilePath]] = None, 

89 results: Optional[Union[str, FilePath, dict[str, dict[str, pd.DataFrame]], "Results"]] = None, 

90 to_timescale: TimeConversionTypes = "seconds", 

91 ): 

92 # Already a Results instance — copy over its data 

93 if isinstance(results, Results): 

94 self.__dict__ = copy.deepcopy(results).__dict__ 

95 return 

96 

97 # Load flex config 

98 self._load_flex_config(flex_config, generated_flex_files_base_path) 

99 # Get filenames of configs to load agents and modules 

100 self._get_config_filenames() 

101 # Load configs for mpc, indicator, market 

102 self._load_agent_module_configs() 

103 # Load sim configs if present 

104 if simulator_agent_config: 

105 self._load_simulator_config(simulator_agent_config) 

106 # Load results and get a dict for generating dataframes 

107 results_dict, results_path = self._load_results(results) 

108 # Get dataframes for mpc, sim, flex indicator results 

109 self._load_results_dataframes(results_dict) 

110 # Get dataframes for mpc stats 

111 self._load_stats_dataframes(results_path) 

112 # Convert the time in the dataframes to the desired timescale 

113 self.convert_timescale_of_dataframe_index(to_timescale=to_timescale) 

114 

115 def _load_flex_config(self, flex_config: Optional[Union[str, FilePath, dict]], custom_base_path: Optional[Union[str, FilePath]]): 

116 """Load the flex config and optionally override the base directory path. 

117 

118 If a custom base path is provided, it overwrites the "flex_base_directory_path" 

119 in the given config. This is useful when the generated flex files are saved 

120 to a custom directory instead of the default (current working directory). 

121 

122 Args: 

123 flex_config: The config for flexibility quantification. 

124 custom_base_path: The custom directory for saving the generated flex files defined by user. 

125 

126 """ 

127 if custom_base_path is not None: 

128 if isinstance(flex_config, (str, Path)): 

129 with open(flex_config, "r") as f: 

130 flex_config = json.load(f) 

131 flex_config["flex_base_directory_path"] = str(custom_base_path) 

132 

133 self.generator_config = load_config.load_config( 

134 config=flex_config, config_type=FlexQuantConfig) 

135 

136 def _get_config_filenames(self): 

137 """Get filenames of configs to load agents and modules.""" 

138 self.config_filename_baseline = BaselineMPCData.model_validate( 

139 self.generator_config.baseline_config_generator_data 

140 ).name_of_created_file 

141 self.config_filename_pos_flex = PFMPCData.model_validate( 

142 self.generator_config.shadow_mpc_config_generator_data.pos_flex 

143 ).name_of_created_file 

144 self.config_filename_neg_flex = NFMPCData.model_validate( 

145 self.generator_config.shadow_mpc_config_generator_data.neg_flex 

146 ).name_of_created_file 

147 self.config_filename_indicator = self.generator_config.indicator_config.name_of_created_file 

148 

149 if self.generator_config.market_config: 

150 market_config_raw = self.generator_config.market_config 

151 if isinstance(market_config_raw, (str, Path)): 

152 market_config = FlexibilityMarketConfig.model_validate_json( 

153 Path(market_config_raw).read_text() 

154 ) 

155 else: 

156 market_config = FlexibilityMarketConfig.model_validate(market_config_raw) 

157 self.config_filename_market = market_config.name_of_created_file 

158 

159 def _load_agent_module_configs(self): 

160 """Load agent and module configs.""" 

161 for file_path in Path(self.generator_config.flex_files_directory).rglob("*.json"): 

162 if file_path.name in self.config_filename_baseline: 

163 self.baseline_agent_config = load_config.load_config( 

164 config=file_path, config_type=AgentConfig 

165 ) 

166 self.baseline_module_config = cmng.get_module( 

167 config=self.baseline_agent_config, 

168 module_type=cmng.BASELINEMPC_CONFIG_TYPE, 

169 ) 

170 

171 elif file_path.name in self.config_filename_pos_flex: 

172 self.pos_flex_agent_config = load_config.load_config( 

173 config=file_path, config_type=AgentConfig 

174 ) 

175 self.pos_flex_module_config = cmng.get_module( 

176 config=self.pos_flex_agent_config, 

177 module_type=cmng.SHADOWMPC_CONFIG_TYPE, 

178 ) 

179 

180 elif file_path.name in self.config_filename_neg_flex: 

181 self.neg_flex_agent_config = load_config.load_config( 

182 config=file_path, config_type=AgentConfig 

183 ) 

184 self.neg_flex_module_config = cmng.get_module( 

185 config=self.neg_flex_agent_config, 

186 module_type=cmng.SHADOWMPC_CONFIG_TYPE, 

187 ) 

188 

189 elif file_path.name in self.config_filename_indicator: 

190 self.indicator_agent_config = load_config.load_config( 

191 config=file_path, config_type=AgentConfig 

192 ) 

193 self.indicator_module_config = cmng.get_module( 

194 config=self.indicator_agent_config, 

195 module_type=cmng.INDICATOR_CONFIG_TYPE, 

196 ) 

197 

198 elif ( 

199 self.generator_config.market_config 

200 and file_path.name in self.config_filename_market 

201 ): 

202 self.market_agent_config = load_config.load_config( 

203 config=file_path, config_type=AgentConfig 

204 ) 

205 self.market_module_config = cmng.get_module( 

206 config=self.market_agent_config, 

207 module_type=cmng.MARKET_CONFIG_TYPE 

208 ) 

209 

210 def _load_simulator_config(self, simulator_agent_config): 

211 """Load simulator agent and module config separately. 

212 

213 Separate loading is required to skip pydantic validation for specific field(s). 

214 

215 """ 

216 # check config type: with results path adaptation -> dict; without -> str/Path 

217 if isinstance(simulator_agent_config, (str, Path)): 

218 with open(simulator_agent_config, "r") as f: 

219 sim_config = json.load(f) 

220 elif isinstance(simulator_agent_config, dict): 

221 sim_config = simulator_agent_config 

222 sim_module_config = next( 

223 (module for module in sim_config["modules"] if module["type"] == "simulator"), 

224 None 

225 ) 

226 # instantiate and validate sim agent config 

227 self.simulator_agent_config = AgentConfig.model_validate(sim_config) 

228 # instantiate sim module config by skipping validation for result_filename 

229 # to prevent file deletion 

230 self.simulator_module_config = self.create_instance_with_skipped_validation( 

231 model_class=SimulatorConfig, 

232 config=sim_module_config, 

233 skip_fields=["result_filename"] 

234 ) 

235 

236 def _load_results( 

237 self, results: Union[str, Path, dict] 

238 ) -> dict[str, dict[str, pd.DataFrame]]: 

239 """Load dict with results for mpc, indicator, market and sim from specified results path.""" 

240 # load results 

241 if results is None: 

242 res_path = self.generator_config.results_directory 

243 elif isinstance(results, (str, Path)): 

244 res_path = results 

245 elif isinstance(results, dict): 

246 res_path = self.generator_config.results_directory 

247 else: 

248 raise ValueError("results must be a path or dict") 

249 

250 res = { 

251 self.baseline_agent_config.id: { 

252 self.baseline_module_config.module_id: load_mpc( 

253 Path( 

254 res_path, 

255 Path( 

256 self.baseline_module_config.optimization_backend[ 

257 "results_file" 

258 ] 

259 ).name, 

260 ) 

261 ) 

262 }, 

263 self.pos_flex_agent_config.id: { 

264 self.pos_flex_module_config.module_id: load_mpc( 

265 Path( 

266 res_path, 

267 Path( 

268 self.pos_flex_module_config.optimization_backend[ 

269 "results_file" 

270 ] 

271 ).name, 

272 ) 

273 ) 

274 }, 

275 self.neg_flex_agent_config.id: { 

276 self.neg_flex_module_config.module_id: load_mpc( 

277 Path( 

278 res_path, 

279 Path( 

280 self.neg_flex_module_config.optimization_backend[ 

281 "results_file" 

282 ] 

283 ).name, 

284 ) 

285 ) 

286 }, 

287 self.indicator_agent_config.id: { 

288 self.indicator_module_config.module_id: load_indicator( 

289 Path( 

290 res_path, 

291 Path(self.indicator_module_config.results_file).name, 

292 ) 

293 ) 

294 } 

295 } 

296 if self.simulator_agent_config: 

297 res[self.simulator_agent_config.id] = { 

298 self.simulator_module_config.module_id: load_sim( 

299 Path(self.simulator_module_config.result_filename) 

300 ) 

301 } 

302 if self.generator_config.market_config: 

303 res[self.market_agent_config.id] = { 

304 self.market_module_config.module_id: load_market( 

305 Path( 

306 res_path, 

307 Path(self.market_module_config.results_file).name, 

308 ) 

309 ) 

310 } 

311 return res, res_path 

312 

313 def _load_results_dataframes(self, results_dict: dict): 

314 """Load results dataframes for mpc, indicator, market and sim.""" 

315 if self.simulator_agent_config: 

316 self.df_simulation = results_dict[self.simulator_agent_config.id][ 

317 self.simulator_module_config.module_id 

318 ] 

319 self.df_baseline = results_dict[self.baseline_agent_config.id][ 

320 self.baseline_module_config.module_id 

321 ] 

322 self.df_pos_flex = results_dict[self.pos_flex_agent_config.id][ 

323 self.pos_flex_module_config.module_id 

324 ] 

325 self.df_neg_flex = results_dict[self.neg_flex_agent_config.id][ 

326 self.neg_flex_module_config.module_id 

327 ] 

328 self.df_indicator = results_dict[self.indicator_agent_config.id][ 

329 self.indicator_module_config.module_id 

330 ] 

331 if self.generator_config.market_config: 

332 self.df_market = results_dict[self.market_agent_config.id][ 

333 self.market_module_config.module_id 

334 ] 

335 else: 

336 self.df_market = None 

337 

338 def _load_stats_dataframes(self, results_path): 

339 """Load dataframes for mpc stats.""" 

340 self.df_baseline_stats = load_mpc_stats( 

341 Path( 

342 results_path, 

343 Path( 

344 self.baseline_module_config.optimization_backend["results_file"] 

345 ).name, 

346 ) 

347 ) 

348 self.df_pos_flex_stats = load_mpc_stats( 

349 Path( 

350 results_path, 

351 Path( 

352 self.pos_flex_module_config.optimization_backend["results_file"] 

353 ).name, 

354 ) 

355 ) 

356 self.df_neg_flex_stats = load_mpc_stats( 

357 Path( 

358 results_path, 

359 Path( 

360 self.neg_flex_module_config.optimization_backend["results_file"] 

361 ).name, 

362 ) 

363 ) 

364 

365 def convert_timescale_of_dataframe_index(self, to_timescale: TimeConversionTypes): 

366 """Convert the time in the dataframes to the desired timescale 

367 

368 Args: 

369 to_timescale: The timescale to convert the data to 

370 

371 """ 

372 for df in ([ 

373 self.df_baseline, 

374 self.df_baseline_stats, 

375 self.df_pos_flex, 

376 self.df_pos_flex_stats, 

377 self.df_neg_flex, 

378 self.df_neg_flex_stats, 

379 self.df_indicator, 

380 ] + ([self.df_market] if self.generator_config.market_config else []) + 

381 ([self.df_simulation] if self.simulator_agent_config else [])): 

382 convert_timescale_of_index( 

383 df=df, from_unit=self.current_timescale_of_data, to_unit=to_timescale 

384 ) 

385 

386 # Update current unit 

387 self.current_timescale_of_data = to_timescale 

388 

389 def get_intersection_mpcs_sim(self) -> dict[str, dict[str, str]]: 

390 """Get the intersection of the MPCs and the simulator variables. 

391 

392 Returns: 

393 dictionary with the following structure: Key: variable alias (from baseline) 

394 Value: {module id: variable name} 

395 

396 """ 

397 id_alias_name_dict = {} 

398 

399 def get_id_alias_name_dict_element(alias: str): 

400 # id as key, {id: name} as value 

401 id_alias_name_dict[alias] = {} 

402 for config in [ 

403 self.simulator_module_config, 

404 self.baseline_module_config, 

405 self.pos_flex_module_config, 

406 self.neg_flex_module_config, 

407 ]: 

408 for var in config.get_variables(): 

409 if var.alias == alias or var.name == alias: 

410 id_alias_name_dict[alias][config.module_id] = var.name 

411 

412 # States, controls and power variable 

413 for variables in [ 

414 self.baseline_module_config.states, 

415 self.baseline_module_config.controls, 

416 ]: 

417 for variable in variables: 

418 get_id_alias_name_dict_element(variable.alias) 

419 get_id_alias_name_dict_element( 

420 self.generator_config.baseline_config_generator_data.power_variable 

421 ) 

422 

423 return id_alias_name_dict 

424 

425 def create_instance_with_skipped_validation( 

426 self, 

427 model_class: Type[BaseModel], 

428 config: Dict[str, Any], 

429 skip_fields: Optional[List[str]] = None 

430 ) -> BaseModel: 

431 """Create a Pydantic model instance while skipping validation for specified fields. 

432 

433 This function allows partial validation of a model's config dictionary by validating 

434 all fields except those listed in `skip_fields`. Skipped fields are set on the instance 

435 after construction without triggering their validators. 

436 

437 Args: 

438 model_class (Type[BaseModel]): The Pydantic model class to instantiate. 

439 config (Dict[str, Any]): The input configuration dictionary. 

440 skip_fields (Optional[List[str]]): A list of field names to exclude from validation. 

441 These fields will be manually set after instantiation. 

442 

443 Returns: 

444 BaseModel: An instance of the model_class with validated and skipped fields assigned. 

445 

446 """ 

447 if skip_fields is None: 

448 skip_fields = [] 

449 # Separate data into validated and skipped fields 

450 validated_fields = {field: value for field, value in config.items() if field not in skip_fields} 

451 skipped_fields = {field: value for field, value in config.items() if field in skip_fields} 

452 # Create instance with validation for non-skipped fields 

453 if validated_fields: 

454 instance = model_class( 

455 **validated_fields, 

456 _agent_id=self.simulator_agent_config.id 

457 ) 

458 else: 

459 instance = model_class.model_construct() 

460 # Add skipped fields without validation 

461 for field, value in skipped_fields.items(): 

462 # bypass pydantic immutability to directly set attribute value 

463 object.__setattr__(instance, field, value) 

464 # Store metadata about bypassed fields for deepcopy compatibility 

465 object.__setattr__(instance, '_bypassed_fields', skip_fields) 

466 object.__setattr__(instance, '_original_config', config) 

467 return instance 

468 

469 def __deepcopy__(self, memo: Dict[int, Any]) -> "Results": 

470 """Custom deepcopy implementation that handles Pydantic models with bypassed validation.""" 

471 # Create a new instance of the same class 

472 new_instance = self.__class__.__new__(self.__class__) 

473 # Add to memo immediately to prevent circular reference issues 

474 memo[id(self)] = new_instance 

475 for key, value in self.__dict__.items(): 

476 if key in ['simulator_module_config'] and hasattr(value, '_original_config'): 

477 # Reconstruct the specific problematic object instead of deepcopying 

478 new_value = self.create_instance_with_skipped_validation( 

479 model_class=value.__class__, 

480 config=copy.deepcopy(value._original_config, memo), 

481 skip_fields=getattr(value, '_bypassed_fields', []) 

482 ) 

483 setattr(new_instance, key, new_value) 

484 else: 

485 # Everything else should deepcopy normally 

486 setattr(new_instance, key, copy.deepcopy(value, memo)) 

487 return new_instance