Coverage for agentlib_flexquant/data_structures/flex_results.py: 33%

162 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-08-01 15:10 +0000

1import copy 

2from typing import Union, Optional, Dict, Any, List, Type 

3 

4import agentlib 

5from pydantic import FilePath, BaseModel 

6from pathlib import Path 

7import json 

8import os 

9import pandas as pd 

10 

11from agentlib.core.agent import AgentConfig 

12from agentlib.core.module import BaseModuleConfig 

13from agentlib.utils import load_config 

14from agentlib_mpc.modules.mpc import BaseMPCConfig 

15from agentlib.modules.simulation.simulator import SimulatorConfig 

16from agentlib_flexquant.data_structures.flexquant import ( 

17 FlexQuantConfig, 

18 FlexibilityIndicatorConfig, 

19 FlexibilityMarketConfig, 

20) 

21from agentlib_flexquant.data_structures.mpcs import ( 

22 BaselineMPCData, 

23 NFMPCData, 

24 PFMPCData, 

25) 

26from agentlib_flexquant.utils.data_handling import convert_timescale_of_index 

27from agentlib_mpc.utils import TimeConversionTypes 

28from agentlib_mpc.utils.analysis import load_sim, load_mpc, load_mpc_stats 

29 

30from agentlib_flexquant.modules.flexibility_indicator import ( 

31 FlexibilityIndicatorModuleConfig, 

32) 

33from agentlib_flexquant.modules.flexibility_market import ( 

34 FlexibilityMarketModuleConfig, 

35) 

36import agentlib_flexquant.utils.config_management as cmng 

37 

38 

39def load_indicator(file_path: Union[str, FilePath]) -> pd.DataFrame: 

40 """ 

41 Load the flexibility indicator results from the given file path 

42 """ 

43 df = pd.read_csv(file_path, header=0, index_col=[0, 1]) 

44 return df 

45 

46 

47def load_market(file_path: Union[str, FilePath]) -> pd.DataFrame: 

48 """ 

49 Load the market results from the given file path 

50 """ 

51 df = pd.read_csv(file_path, header=0, index_col=[0, 1]) 

52 return df 

53 

54 

55class Results: 

56 # Configs: 

57 # Generator 

58 generator_config: FlexQuantConfig 

59 # Agents 

60 simulator_agent_config: AgentConfig 

61 baseline_agent_config: AgentConfig 

62 pos_flex_agent_config: AgentConfig 

63 neg_flex_agent_config: AgentConfig 

64 indicator_agent_config: AgentConfig 

65 market_agent_config: AgentConfig 

66 # Modules 

67 simulator_module_config: SimulatorConfig 

68 baseline_module_config: BaseMPCConfig 

69 pos_flex_module_config: BaseMPCConfig 

70 neg_flex_module_config: BaseMPCConfig 

71 indicator_module_config: FlexibilityIndicatorModuleConfig 

72 market_module_config: FlexibilityMarketModuleConfig 

73 

74 # Dataframes 

75 df_simulation: pd.DataFrame 

76 df_baseline: pd.DataFrame 

77 df_pos_flex: pd.DataFrame 

78 df_neg_flex: pd.DataFrame 

79 df_indicator: pd.DataFrame 

80 df_market: pd.DataFrame 

81 

82 # Stats of the MPCs 

83 df_baseline_stats: pd.DataFrame 

84 df_pos_flex_stats: pd.DataFrame 

85 df_neg_flex_stats: pd.DataFrame 

86 

87 # time conversion 

88 current_timescale_of_data: TimeConversionTypes = "seconds" 

89 

90 def __init__( 

91 self, 

92 flex_config: Optional[Union[str, FilePath, dict]], 

93 simulator_agent_config: Optional[Union[str, FilePath, dict]], 

94 generated_flex_files_base_path: Optional[Union[str, FilePath]] = None, 

95 results: Optional[Union[str, FilePath, dict[str, dict[str, pd.DataFrame]], "Results"]] = None, 

96 to_timescale: TimeConversionTypes = "seconds", 

97 ): 

98 if isinstance(results, Results): 

99 # Already a Results instance — copy over its data 

100 self.__dict__ = copy.deepcopy(results).__dict__ 

101 return 

102 # if generated flex files are saved at a custom base directory and path is provided, 

103 # update and overwrite the path "flex_base_directory_path" in flex_config 

104 # By default: current working directory is used as base 

105 if generated_flex_files_base_path is not None: 

106 if isinstance(flex_config, (str, Path)): 

107 with open(flex_config, "r") as f: 

108 flex_config = json.load(f) 

109 flex_config["flex_base_directory_path"] = str(generated_flex_files_base_path) 

110 # load configs of agents and modules 

111 # Generator config 

112 self.generator_config = load_config.load_config( 

113 config=flex_config, config_type=FlexQuantConfig 

114 ) 

115 

116 # get names of the config files 

117 config_filename_baseline = BaselineMPCData.model_validate( 

118 self.generator_config.baseline_config_generator_data 

119 ).name_of_created_file 

120 config_filename_pos_flex = PFMPCData.model_validate( 

121 self.generator_config.shadow_mpc_config_generator_data.pos_flex 

122 ).name_of_created_file 

123 config_filename_neg_flex = NFMPCData.model_validate( 

124 self.generator_config.shadow_mpc_config_generator_data.neg_flex 

125 ).name_of_created_file 

126 config_filename_indicator = self.generator_config.indicator_config.name_of_created_file 

127 if self.generator_config.market_config: 

128 if self.generator_config.market_config is str or Path: 

129 config_filename_market = FlexibilityMarketConfig.parse_file( 

130 self.generator_config.market_config 

131 ).name_of_created_file 

132 else: 

133 config_filename_market = FlexibilityMarketConfig.model_validate( 

134 self.generator_config.market_config 

135 ).name_of_created_file 

136 

137 # load the agent and module configs 

138 if simulator_agent_config: 

139 # check config type: with results path adaptation -> dict; without -> str/Path 

140 if isinstance(simulator_agent_config, (str, Path)): 

141 with open(simulator_agent_config, "r") as f: 

142 sim_config = json.load(f) 

143 elif isinstance(simulator_agent_config, dict): 

144 sim_config = simulator_agent_config 

145 sim_module_config = next( 

146 (module for module in sim_config["modules"] if module["type"] == "simulator"), 

147 None 

148 ) 

149 # instantiate and validate sim agent config 

150 self.simulator_agent_config = AgentConfig.model_validate(sim_config) 

151 # instantiate sim module config by skipping validation for result_filename  

152 # to prevent file deletion 

153 self.simulator_module_config = self.create_instance_with_skipped_validation( 

154 model_class=SimulatorConfig, 

155 config=sim_module_config, 

156 skip_fields=["result_filename"] 

157 ) 

158 

159 for file_path in Path(self.generator_config.flex_files_directory).rglob("*.json"): 

160 if file_path.name in config_filename_baseline: 

161 self.baseline_agent_config = load_config.load_config( 

162 config=file_path, config_type=AgentConfig 

163 ) 

164 self.baseline_module_config = cmng.get_module( 

165 config=self.baseline_agent_config, 

166 module_type=cmng.BASELINEMPC_CONFIG_TYPE, 

167 ) 

168 

169 elif file_path.name in config_filename_pos_flex: 

170 self.pos_flex_agent_config = load_config.load_config( 

171 config=file_path, config_type=AgentConfig 

172 ) 

173 self.pos_flex_module_config = cmng.get_module( 

174 config=self.pos_flex_agent_config, 

175 module_type=cmng.SHADOWMPC_CONFIG_TYPE, 

176 ) 

177 

178 elif file_path.name in config_filename_neg_flex: 

179 self.neg_flex_agent_config = load_config.load_config( 

180 config=file_path, config_type=AgentConfig 

181 ) 

182 self.neg_flex_module_config = cmng.get_module( 

183 config=self.neg_flex_agent_config, 

184 module_type=cmng.SHADOWMPC_CONFIG_TYPE, 

185 ) 

186 

187 elif file_path.name in config_filename_indicator: 

188 self.indicator_agent_config = load_config.load_config( 

189 config=file_path, config_type=AgentConfig 

190 ) 

191 self.indicator_module_config = cmng.get_module( 

192 config=self.indicator_agent_config, 

193 module_type=cmng.INDICATOR_CONFIG_TYPE, 

194 ) 

195 

196 elif ( 

197 self.generator_config.market_config 

198 and file_path.name in config_filename_market 

199 ): 

200 self.market_agent_config = load_config.load_config( 

201 config=file_path, config_type=AgentConfig 

202 ) 

203 self.market_module_config = cmng.get_module( 

204 config=self.market_agent_config, module_type=cmng.MARKET_CONFIG_TYPE 

205 ) 

206 

207 # load results 

208 if results is None: 

209 results_path = self.generator_config.results_directory 

210 results = self._load_results(res_path=results_path) 

211 if isinstance(results, (str, Path)): 

212 results_path = results 

213 results = self._load_results(res_path=results_path) 

214 elif isinstance(results, dict): 

215 results_path = self.generator_config.results_directory 

216 else: 

217 raise ValueError("results must be a path or dict") 

218 

219 # Get result dataframes 

220 if simulator_agent_config: 

221 self.df_simulation = results[self.simulator_agent_config.id][ 

222 self.simulator_module_config.module_id 

223 ] 

224 self.df_baseline = results[self.baseline_agent_config.id][ 

225 self.baseline_module_config.module_id 

226 ] 

227 self.df_pos_flex = results[self.pos_flex_agent_config.id][ 

228 self.pos_flex_module_config.module_id 

229 ] 

230 self.df_neg_flex = results[self.neg_flex_agent_config.id][ 

231 self.neg_flex_module_config.module_id 

232 ] 

233 self.df_indicator = results[self.indicator_agent_config.id][ 

234 self.indicator_module_config.module_id 

235 ] 

236 if self.generator_config.market_config: 

237 self.df_market = results[self.market_agent_config.id][ 

238 self.market_module_config.module_id 

239 ] 

240 else: 

241 self.df_market = None 

242 

243 # Load the statistics 

244 self.df_baseline_stats = load_mpc_stats( 

245 Path( 

246 results_path, 

247 Path( 

248 self.baseline_module_config.optimization_backend["results_file"] 

249 ).name, 

250 ) 

251 ) 

252 self.df_pos_flex_stats = load_mpc_stats( 

253 Path( 

254 results_path, 

255 Path( 

256 self.pos_flex_module_config.optimization_backend["results_file"] 

257 ).name, 

258 ) 

259 ) 

260 self.df_neg_flex_stats = load_mpc_stats( 

261 Path( 

262 results_path, 

263 Path( 

264 self.neg_flex_module_config.optimization_backend["results_file"] 

265 ).name, 

266 ) 

267 ) 

268 

269 # Convert the time in the dataframes to the desired timescale 

270 self.convert_timescale_of_dataframe_index(to_timescale=to_timescale) 

271 

272 def _load_results( 

273 self, res_path: Union[str, Path] 

274 ) -> dict[str, dict[str, pd.DataFrame]]: 

275 res = { 

276 self.baseline_agent_config.id: { 

277 self.baseline_module_config.module_id: load_mpc( 

278 Path( 

279 res_path, 

280 Path( 

281 self.baseline_module_config.optimization_backend[ 

282 "results_file" 

283 ] 

284 ).name, 

285 ) 

286 ) 

287 }, 

288 self.pos_flex_agent_config.id: { 

289 self.pos_flex_module_config.module_id: load_mpc( 

290 Path( 

291 res_path, 

292 Path( 

293 self.pos_flex_module_config.optimization_backend[ 

294 "results_file" 

295 ] 

296 ).name, 

297 ) 

298 ) 

299 }, 

300 self.neg_flex_agent_config.id: { 

301 self.neg_flex_module_config.module_id: load_mpc( 

302 Path( 

303 res_path, 

304 Path( 

305 self.neg_flex_module_config.optimization_backend[ 

306 "results_file" 

307 ] 

308 ).name, 

309 ) 

310 ) 

311 }, 

312 self.indicator_agent_config.id: { 

313 self.indicator_module_config.module_id: load_indicator( 

314 Path( 

315 res_path, 

316 Path(self.indicator_module_config.results_file).name, 

317 ) 

318 ) 

319 } 

320 } 

321 if self.simulator_agent_config: 

322 res[self.simulator_agent_config.id] = { 

323 self.simulator_module_config.module_id: load_sim( 

324 Path( 

325 res_path, 

326 Path(self.simulator_module_config.result_filename).name, 

327 ) 

328 ) 

329 } 

330 if self.generator_config.market_config: 

331 res[self.market_agent_config.id] = { 

332 self.market_module_config.module_id: load_market( 

333 Path( 

334 res_path, 

335 Path(self.market_module_config.results_file).name, 

336 ) 

337 ) 

338 } 

339 return res 

340 

341 def convert_timescale_of_dataframe_index(self, to_timescale: TimeConversionTypes): 

342 """Convert the time in the dataframes to the desired timescale 

343 

344 Keyword arguments: 

345 timescale -- The timescale to convert the data to 

346 """ 

347 # Convert the time in the dataframes 

348 for df in ([ 

349 self.df_baseline, 

350 self.df_baseline_stats, 

351 self.df_pos_flex, 

352 self.df_pos_flex_stats, 

353 self.df_neg_flex, 

354 self.df_neg_flex_stats, 

355 self.df_indicator, 

356 ] + ([self.df_market] if self.generator_config.market_config else []) + 

357 ([self.df_simulation] if self.simulator_agent_config else [])): 

358 convert_timescale_of_index( 

359 df=df, from_unit=self.current_timescale_of_data, to_unit=to_timescale 

360 ) 

361 

362 # Update current unit 

363 self.current_timescale_of_data = to_timescale 

364 

365 def get_intersection_mpcs_sim(self) -> dict[str, dict[str, str]]: 

366 """ 

367 Get the intersection of the MPCs and the simulator variables. 

368 returns a dictionary with the following structure: 

369 Key: variable alias (from baseline) 

370 Value: {module id: variable name} 

371 """ 

372 id_alias_name_dict = {} 

373 

374 def get_id_alias_name_dict_element(alias: str): 

375 # id as key, {id: name} as value 

376 id_alias_name_dict[alias] = {} 

377 for config in [ 

378 self.simulator_module_config, 

379 self.baseline_module_config, 

380 self.pos_flex_module_config, 

381 self.neg_flex_module_config, 

382 ]: 

383 for var in config.get_variables(): 

384 if var.alias == alias or var.name == alias: 

385 id_alias_name_dict[alias][config.module_id] = var.name 

386 

387 # States, controls and power variable 

388 for variables in [ 

389 self.baseline_module_config.states, 

390 self.baseline_module_config.controls, 

391 ]: 

392 for variable in variables: 

393 get_id_alias_name_dict_element(variable.alias) 

394 get_id_alias_name_dict_element( 

395 self.generator_config.baseline_config_generator_data.power_variable 

396 ) 

397 

398 return id_alias_name_dict 

399 

400 def create_instance_with_skipped_validation( 

401 self, 

402 model_class: Type[BaseModel], 

403 config: Dict[str, Any], 

404 skip_fields: Optional[List[str]] = None 

405 ) -> BaseModel: 

406 """ 

407 Create a Pydantic model instance while skipping validation for specified fields. 

408 

409 This function allows partial validation of a model's config dictionary by validating  

410 all fields except those listed in `skip_fields`. Skipped fields are set on the instance  

411 after construction without triggering their validators. 

412 

413 Args: 

414 model_class (Type[BaseModel]): The Pydantic model class to instantiate. 

415 config (Dict[str, Any]): The input configuration dictionary. 

416 skip_fields (Optional[List[str]]): A list of field names to exclude from validation.  

417 These fields will be manually set after instantiation. 

418 

419 Returns: 

420 BaseModel: An instance of the model_class with validated and skipped fields assigned. 

421 """ 

422 if skip_fields is None: 

423 skip_fields = [] 

424 # Separate data into validated and skipped fields 

425 validated_fields = {field: value for field, value in config.items() if field not in skip_fields} 

426 skipped_fields = {field: value for field, value in config.items() if field in skip_fields} 

427 # Create instance with validation for non-skipped fields 

428 if validated_fields: 

429 instance = model_class( 

430 **validated_fields, 

431 _agent_id=self.simulator_agent_config.id 

432 ) 

433 else: 

434 instance = model_class.model_construct() 

435 # Add skipped fields without validation 

436 for field, value in skipped_fields.items(): 

437 # bypass pydantic immutability to directly set attribute value 

438 object.__setattr__(instance, field, value) 

439 # Store metadata about bypassed fields for deepcopy compatibility 

440 object.__setattr__(instance, '_bypassed_fields', skip_fields) 

441 object.__setattr__(instance, '_original_config', config) 

442 return instance 

443 

444 def __deepcopy__(self, memo: Dict[int, Any]) -> "Results": 

445 """ 

446 Custom deepcopy implementation that handles Pydantic models with bypassed validation. 

447 """ 

448 # Create a new instance of the same class 

449 new_instance = self.__class__.__new__(self.__class__) 

450 # Add to memo immediately to prevent circular reference issues 

451 memo[id(self)] = new_instance 

452 for key, value in self.__dict__.items(): 

453 if key in ['simulator_module_config'] and hasattr(value, '_original_config'): 

454 # Reconstruct the specific problematic object instead of deepcopying 

455 new_value = self.create_instance_with_skipped_validation( 

456 model_class=value.__class__, 

457 config=copy.deepcopy(value._original_config, memo), 

458 skip_fields=getattr(value, '_bypassed_fields', []) 

459 ) 

460 setattr(new_instance, key, new_value) 

461 else: 

462 # Everything else should deepcopy normally 

463 setattr(new_instance, key, copy.deepcopy(value, memo)) 

464 return new_instance