Coverage for agentlib_flexquant/generate_flex_agents.py: 86%

255 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-08-15 15:25 +0000

1import ast 

2import atexit 

3import inspect 

4import logging 

5import os 

6import astor 

7import black 

8import json 

9from copy import deepcopy 

10from pathlib import Path 

11from typing import List, Union 

12from pydantic import FilePath 

13from agentlib.core.agent import AgentConfig 

14from agentlib.core.datamodels import AgentVariable 

15from agentlib.core.errors import ConfigurationError 

16from agentlib.core.module import BaseModuleConfig 

17from agentlib.utils import custom_injection, load_config 

18from agentlib_mpc.data_structures.mpc_datamodels import MPCVariable 

19from agentlib_mpc.models.casadi_model import CasadiModelConfig 

20from agentlib_mpc.modules.mpc_full import MPCConfig 

21import agentlib_flexquant.data_structures.globals as glbs 

22import agentlib_flexquant.utils.config_management as cmng 

23from agentlib_flexquant.utils.parsing import SetupSystemModifier 

24from agentlib_flexquant.data_structures.flexquant import FlexibilityIndicatorConfig, FlexibilityMarketConfig, FlexQuantConfig 

25from agentlib_flexquant.data_structures.mpcs import BaselineMPCData, BaseMPCData 

26from agentlib_flexquant.modules.flexibility_indicator import FlexibilityIndicatorModuleConfig 

27from agentlib_flexquant.modules.flexibility_market import FlexibilityMarketModuleConfig 

28 

29 

30class FlexAgentGenerator: 

31 """Class for generating the flex agents 

32 

33 orig_mpc_module_config: the config for the original mpc, which has nothing to do with the flexibility quantification 

34 baseline_mpc_module_config: the config for the baseline mpc for flexibility quantification 

35 pos_flex_mpc_module_config: the config for the positive flexibility mpc for flexibility quantification 

36 neg_flex_mpc_module_config: the config for the negative flexibility mpc for flexibility quantification 

37 indicator_module_config: the config for the indicator for flexibility quantification 

38 market_module_config: the config for the market for flexibility quantification 

39 

40 """ 

41 

42 orig_mpc_module_config: MPCConfig 

43 baseline_mpc_module_config: MPCConfig 

44 pos_flex_mpc_module_config: MPCConfig 

45 neg_flex_mpc_module_config: MPCConfig 

46 indicator_module_config: FlexibilityIndicatorModuleConfig 

47 market_module_config: FlexibilityMarketModuleConfig 

48 

49 def __init__( 

50 self, 

51 flex_config: Union[str, FilePath, FlexQuantConfig], 

52 mpc_agent_config: Union[str, FilePath, AgentConfig], 

53 ): 

54 self.logger = logging.getLogger(__name__) 

55 

56 if isinstance(flex_config, str or FilePath): 

57 self.flex_config_file_name = os.path.basename(flex_config) 

58 else: 

59 # provide default name for json 

60 self.flex_config_file_name = "flex_config.json" 

61 # load configs 

62 self.flex_config = load_config.load_config( 

63 flex_config, config_type=FlexQuantConfig 

64 ) 

65 

66 # original mpc agent 

67 self.orig_mpc_agent_config = load_config.load_config( 

68 mpc_agent_config, config_type=AgentConfig 

69 ) 

70 # baseline agent 

71 self.baseline_mpc_agent_config = self.orig_mpc_agent_config.__deepcopy__() 

72 # pos agent 

73 self.pos_flex_mpc_agent_config = self.orig_mpc_agent_config.__deepcopy__() 

74 # neg agent 

75 self.neg_flex_mpc_agent_config = self.orig_mpc_agent_config.__deepcopy__() 

76 

77 # original mpc module 

78 self.orig_mpc_module_config = cmng.get_module( 

79 config=self.orig_mpc_agent_config, 

80 module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config), 

81 ) 

82 # baseline module 

83 self.baseline_mpc_module_config = cmng.get_module( 

84 config=self.baseline_mpc_agent_config, 

85 module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config), 

86 ) 

87 # pos module 

88 self.pos_flex_mpc_module_config = cmng.get_module( 

89 config=self.pos_flex_mpc_agent_config, 

90 module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config), 

91 ) 

92 # neg module 

93 self.neg_flex_mpc_module_config = cmng.get_module( 

94 config=self.neg_flex_mpc_agent_config, 

95 module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config), 

96 ) 

97 # load indicator config 

98 self.indicator_config = load_config.load_config( 

99 self.flex_config.indicator_config, config_type=FlexibilityIndicatorConfig 

100 ) 

101 # load indicator module config 

102 self.indicator_agent_config = load_config.load_config( 

103 self.indicator_config.agent_config, config_type=AgentConfig 

104 ) 

105 self.indicator_module_config = cmng.get_module( 

106 config=self.indicator_agent_config, module_type=cmng.INDICATOR_CONFIG_TYPE 

107 ) 

108 # load market config 

109 if self.flex_config.market_config: 

110 self.market_config = load_config.load_config( 

111 self.flex_config.market_config, config_type=FlexibilityMarketConfig 

112 ) 

113 # load market module config 

114 self.market_agent_config = load_config.load_config( 

115 self.market_config.agent_config, config_type=AgentConfig 

116 ) 

117 self.market_module_config = cmng.get_module( 

118 config=self.market_agent_config, module_type=cmng.MARKET_CONFIG_TYPE 

119 ) 

120 else: 

121 self.flex_config.market_time = 0 

122 

123 self.run_config_validations() 

124 

125 def generate_flex_agents(self) -> list[str]: 

126 """Generate the configs and the python module for the flexibility agents. 

127 

128 Returns: 

129 list of the full path for baseline mpc, pos_flex mpc, neg_flex mpc, indicator and market config 

130 

131 """ 

132 # adapt modules to include necessary communication variables 

133 baseline_mpc_config = self.adapt_mpc_module_config( 

134 module_config=self.baseline_mpc_module_config, 

135 mpc_dataclass=self.flex_config.baseline_config_generator_data, 

136 ) 

137 pf_mpc_config = self.adapt_mpc_module_config( 

138 module_config=self.pos_flex_mpc_module_config, 

139 mpc_dataclass=self.flex_config.shadow_mpc_config_generator_data.pos_flex, 

140 ) 

141 nf_mpc_config = self.adapt_mpc_module_config( 

142 module_config=self.neg_flex_mpc_module_config, 

143 mpc_dataclass=self.flex_config.shadow_mpc_config_generator_data.neg_flex, 

144 ) 

145 indicator_module_config = self.adapt_indicator_config( 

146 module_config=self.indicator_module_config 

147 ) 

148 if self.flex_config.market_config: 

149 market_module_config = self.adapt_market_config( 

150 module_config=self.market_module_config 

151 ) 

152 

153 # dump jsons of the agents including the adapted module configs 

154 self.append_module_and_dump_agent( 

155 module=baseline_mpc_config, 

156 agent=self.baseline_mpc_agent_config, 

157 module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config), 

158 config_name=self.flex_config.baseline_config_generator_data.name_of_created_file, 

159 ) 

160 self.append_module_and_dump_agent( 

161 module=pf_mpc_config, 

162 agent=self.pos_flex_mpc_agent_config, 

163 module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config), 

164 config_name=self.flex_config.shadow_mpc_config_generator_data.pos_flex.name_of_created_file, 

165 ) 

166 self.append_module_and_dump_agent( 

167 module=nf_mpc_config, 

168 agent=self.neg_flex_mpc_agent_config, 

169 module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config), 

170 config_name=self.flex_config.shadow_mpc_config_generator_data.neg_flex.name_of_created_file, 

171 ) 

172 self.append_module_and_dump_agent( 

173 module=indicator_module_config, 

174 agent=self.indicator_agent_config, 

175 module_type=cmng.INDICATOR_CONFIG_TYPE, 

176 config_name=self.indicator_config.name_of_created_file, 

177 ) 

178 if self.flex_config.market_config: 

179 self.append_module_and_dump_agent( 

180 module=market_module_config, 

181 agent=self.market_agent_config, 

182 module_type=cmng.MARKET_CONFIG_TYPE, 

183 config_name=self.market_config.name_of_created_file, 

184 ) 

185 

186 # generate python files for the shadow mpcs 

187 self._generate_flex_model_definition() 

188 

189 # save flex config to created flex files 

190 with open(os.path.join(self.flex_config.flex_files_directory, self.flex_config_file_name), "w") as f: 

191 config_json = self.flex_config.model_dump_json(exclude_defaults=True) 

192 f.write(config_json) 

193 

194 # register the exit function if the corresponding flag is set 

195 if self.flex_config.delete_files: 

196 atexit.register(lambda: self._delete_created_files()) 

197 return self.get_config_file_paths() 

198 

199 def append_module_and_dump_agent( 

200 self, 

201 module: BaseModuleConfig, 

202 agent: AgentConfig, 

203 module_type: str, 

204 config_name: str, 

205 ): 

206 """Append the given module config to the given agent config and dumps the agent config to a json file. 

207 

208 The json file is named based on the config_name. 

209 

210 Args: 

211 module: The module config to be appended. 

212 agent: The agent config to be updated. 

213 module_type: The type of the module 

214 config_name: The name of the json file for module config (e.g. baseline.json) 

215 

216 """ 

217 # if module is not from the baseline, set a new agent id, based on module id 

218 if module.type is not self.baseline_mpc_module_config.type: 

219 agent.id = module.module_id 

220 # get the module as a dict without default values 

221 module_dict = cmng.to_dict_and_remove_unnecessary_fields(module=module) 

222 # write given module to agent config 

223 for i, agent_module in enumerate(agent.modules): 

224 if ( 

225 cmng.MODULE_TYPE_DICT[module_type] 

226 is cmng.MODULE_TYPE_DICT[agent_module["type"]] 

227 ): 

228 agent.modules[i] = module_dict 

229 

230 # dump agent config 

231 if agent.modules: 

232 if self.flex_config.overwrite_files: 

233 try: 

234 Path( 

235 os.path.join(self.flex_config.flex_files_directory, config_name) 

236 ).unlink() 

237 except OSError: 

238 pass 

239 with open( 

240 os.path.join(self.flex_config.flex_files_directory, config_name), "w+" 

241 ) as f: 

242 module_json = agent.model_dump_json(exclude_defaults=True) 

243 f.write(module_json) 

244 else: 

245 logging.error("Provided agent config does not contain any modules.") 

246 

247 def get_config_file_paths(self) -> List[str]: 

248 """Return a list of paths with the created config files.""" 

249 paths = [ 

250 os.path.join( 

251 self.flex_config.flex_files_directory, 

252 self.flex_config.baseline_config_generator_data.name_of_created_file, 

253 ), 

254 os.path.join( 

255 self.flex_config.flex_files_directory, 

256 self.flex_config.shadow_mpc_config_generator_data.pos_flex.name_of_created_file, 

257 ), 

258 os.path.join( 

259 self.flex_config.flex_files_directory, 

260 self.flex_config.shadow_mpc_config_generator_data.neg_flex.name_of_created_file, 

261 ), 

262 os.path.join( 

263 self.flex_config.flex_files_directory, 

264 self.indicator_config.name_of_created_file, 

265 ), 

266 ] 

267 if self.flex_config.market_config: 

268 paths.append( 

269 os.path.join( 

270 self.flex_config.flex_files_directory, 

271 self.market_config.name_of_created_file, 

272 ) 

273 ) 

274 return paths 

275 

276 def _delete_created_files(self): 

277 """Function to run at exit if the files are to be deleted.""" 

278 to_be_deleted = self.get_config_file_paths() 

279 to_be_deleted.append( 

280 os.path.join( 

281 self.flex_config.flex_files_directory, 

282 self.flex_config_file_name, 

283 )) 

284 # delete files 

285 for file in to_be_deleted: 

286 Path(file).unlink() 

287 # also delete folder 

288 Path(self.flex_config.flex_files_directory).rmdir() 

289 

290 def adapt_mpc_module_config( 

291 self, module_config: MPCConfig, mpc_dataclass: BaseMPCData 

292 ) -> MPCConfig: 

293 """Adapt the mpc module config for automated flexibility quantification. 

294 

295 Things adapted among others are: 

296 - the file name/path of the mpc config file 

297 - names of the control variables for the shadow mpcs 

298 - reduce communicated variables of shadow mpcs to outputs 

299 - add the power variable to the outputs 

300 - add parameters for the activation and quantification of flexibility 

301 

302 Args: 

303 module_config: The module config to be adapted 

304 mpc_dataclass: The dataclass corresponding to the type of the MPC module. 

305 It contains all the extra data necessary for flexibility quantification, which will be used to update the module_config. 

306 

307 Returns: 

308 The adapted module config 

309 

310 """ 

311 # allow the module config to be changed 

312 module_config.model_config["frozen"] = False 

313 

314 module_config.module_id = mpc_dataclass.module_id 

315 

316 # append the new weights as parameter to the MPC or update its value 

317 parameter_dict = { 

318 parameter.name: parameter for parameter in module_config.parameters 

319 } 

320 for weight in mpc_dataclass.weights: 

321 if weight.name in parameter_dict: 

322 parameter_dict[weight.name].value = weight.value 

323 else: 

324 module_config.parameters.append(weight) 

325 

326 # set new MPC type 

327 module_config.type = mpc_dataclass.module_types[ 

328 cmng.get_orig_module_type(self.orig_mpc_agent_config) 

329 ] 

330 # set new id (needed for plotting) 

331 module_config.module_id = mpc_dataclass.module_id 

332 # update optimization backend to use the created mpc files and classes 

333 module_config.optimization_backend["model"]["type"] = { 

334 "file": os.path.join( 

335 self.flex_config.flex_files_directory, 

336 mpc_dataclass.created_flex_mpcs_file, 

337 ), 

338 "class_name": mpc_dataclass.class_name, 

339 } 

340 # extract filename from results file and update it with suffix and parent directory 

341 result_filename = Path( 

342 module_config.optimization_backend["results_file"] 

343 ).name.replace(".csv", mpc_dataclass.results_suffix) 

344 full_path = ( 

345 self.flex_config.results_directory 

346 / result_filename 

347 ) 

348 module_config.optimization_backend["results_file"] = str(full_path) 

349 # change cia backend to custom backend of flexquant 

350 if module_config.optimization_backend["type"] == "casadi_cia": 

351 module_config.optimization_backend["type"] = "casadi_cia_cons" 

352 module_config.optimization_backend["market_time"] = ( 

353 self.flex_config.market_time 

354 ) 

355 

356 # add the control signal of the baseline to outputs (used during market time) 

357 # and as inputs for the shadow mpcs 

358 if type(mpc_dataclass) is not BaselineMPCData: 

359 for control in module_config.controls: 

360 module_config.inputs.append( 

361 MPCVariable( 

362 name=glbs.full_trajectory_prefix 

363 + control.name 

364 + glbs.full_trajectory_suffix, 

365 value=control.value, 

366 ) 

367 ) 

368 # also include binary controls 

369 if hasattr(module_config, "binary_controls"): 

370 for control in module_config.binary_controls: 

371 module_config.inputs.append( 

372 MPCVariable( 

373 name=glbs.full_trajectory_prefix 

374 + control.name 

375 + glbs.full_trajectory_suffix, 

376 value=control.value, 

377 ) 

378 ) 

379 

380 # only communicate outputs for the shadow mpcs 

381 module_config.shared_variable_fields = ["outputs"] 

382 else: 

383 for control in module_config.controls: 

384 module_config.outputs.append( 

385 MPCVariable( 

386 name=glbs.full_trajectory_prefix 

387 + control.name 

388 + glbs.full_trajectory_suffix, 

389 value=control.value, 

390 ) 

391 ) 

392 # also include binary controls 

393 if hasattr(module_config, "binary_controls"): 

394 for control in module_config.binary_controls: 

395 module_config.outputs.append( 

396 MPCVariable( 

397 name=glbs.full_trajectory_prefix 

398 + control.name 

399 + glbs.full_trajectory_suffix, 

400 value=control.value, 

401 ) 

402 ) 

403 module_config.set_outputs = True 

404 # add outputs for the power variables, for easier handling create a lookup dict 

405 output_dict = {output.name: output for output in module_config.outputs} 

406 if ( 

407 self.flex_config.baseline_config_generator_data.power_variable 

408 in output_dict 

409 ): 

410 output_dict[ 

411 self.flex_config.baseline_config_generator_data.power_variable 

412 ].alias = mpc_dataclass.power_alias 

413 else: 

414 module_config.outputs.append( 

415 MPCVariable( 

416 name=self.flex_config.baseline_config_generator_data.power_variable, 

417 alias=mpc_dataclass.power_alias, 

418 ) 

419 ) 

420 # add or change alias for stored energy variable 

421 if self.indicator_module_config.correct_costs.enable_energy_costs_correction: 

422 output_dict[ 

423 self.indicator_module_config.correct_costs.stored_energy_variable 

424 ].alias = mpc_dataclass.stored_energy_alias 

425 

426 # add extra inputs needed for activation of flex 

427 module_config.inputs.extend(mpc_dataclass.config_inputs_appendix) 

428 # CONFIG_PARAMETERS_APPENDIX only includes dummy values 

429 # overwrite dummy values with values from flex config and append it to module config 

430 for var in mpc_dataclass.config_parameters_appendix: 

431 if var.name in self.flex_config.model_fields: 

432 var.value = getattr(self.flex_config, var.name) 

433 if var.name in self.flex_config.baseline_config_generator_data.model_fields: 

434 var.value = getattr(self.flex_config.baseline_config_generator_data, var.name) 

435 module_config.parameters.extend(mpc_dataclass.config_parameters_appendix) 

436 

437 # freeze the config again 

438 module_config.model_config["frozen"] = True 

439 

440 return module_config 

441 

442 def adapt_indicator_config( 

443 self, module_config: FlexibilityIndicatorModuleConfig 

444 ) -> FlexibilityIndicatorModuleConfig: 

445 """Adapt the indicator module config for automated flexibility quantification.""" 

446 # append user-defined price var to indicator module config 

447 module_config.inputs.append( 

448 AgentVariable( 

449 name=module_config.price_variable, 

450 unit="ct/kWh", 

451 type="pd.Series", 

452 description="electricity price" 

453 ) 

454 ) 

455 # allow the module config to be changed 

456 module_config.model_config["frozen"] = False 

457 for parameter in module_config.parameters: 

458 if parameter.name == glbs.PREP_TIME: 

459 parameter.value = self.flex_config.prep_time 

460 if parameter.name == glbs.MARKET_TIME: 

461 parameter.value = self.flex_config.market_time 

462 if parameter.name == glbs.FLEX_EVENT_DURATION: 

463 parameter.value = self.flex_config.flex_event_duration 

464 if parameter.name == "time_step": 

465 parameter.value = self.baseline_mpc_module_config.time_step 

466 if parameter.name == "prediction_horizon": 

467 parameter.value = self.baseline_mpc_module_config.prediction_horizon 

468 

469 # set power unit 

470 module_config.power_unit = ( 

471 self.flex_config.baseline_config_generator_data.power_unit 

472 ) 

473 module_config.results_file = ( 

474 self.flex_config.results_directory 

475 / module_config.results_file.name 

476 ) 

477 module_config.model_config["frozen"] = True 

478 return module_config 

479 

480 def adapt_market_config( 

481 self, module_config: FlexibilityMarketModuleConfig 

482 ) -> FlexibilityMarketModuleConfig: 

483 """Adapt the market module config for automated flexibility quantification.""" 

484 # allow the module config to be changed 

485 module_config.model_config["frozen"] = False 

486 for field in module_config.__fields__: 

487 if field in self.market_module_config.__fields__.keys(): 

488 module_config.__setattr__( 

489 field, getattr(self.market_module_config, field) 

490 ) 

491 module_config.results_file = ( 

492 self.flex_config.results_directory 

493 / module_config.results_file.name 

494 ) 

495 module_config.model_config["frozen"] = True 

496 return module_config 

497 

498 def _generate_flex_model_definition(self): 

499 """Generate a python module for negative and positive flexibility agents from the Baseline MPC model.""" 

500 output_file = os.path.join( 

501 self.flex_config.flex_files_directory, 

502 self.flex_config.baseline_config_generator_data.created_flex_mpcs_file, 

503 ) 

504 opt_backend = self.orig_mpc_module_config.optimization_backend["model"]["type"] 

505 

506 # Extract the config class of the casadi model to check cost functions 

507 config_class = inspect.get_annotations(custom_injection(opt_backend))["config"] 

508 config_instance = config_class() 

509 self.check_variables_in_casadi_config( 

510 config_instance, 

511 self.flex_config.shadow_mpc_config_generator_data.neg_flex.flex_cost_function, 

512 ) 

513 self.check_variables_in_casadi_config( 

514 config_instance, 

515 self.flex_config.shadow_mpc_config_generator_data.pos_flex.flex_cost_function, 

516 ) 

517 

518 # parse mpc python file 

519 with open(opt_backend["file"], "r") as f: 

520 source = f.read() 

521 tree = ast.parse(source) 

522 

523 # create modifiers for python file 

524 modifier_base = SetupSystemModifier( 

525 mpc_data=self.flex_config.baseline_config_generator_data, 

526 controls=self.baseline_mpc_module_config.controls, 

527 binary_controls=self.baseline_mpc_module_config.binary_controls if hasattr(self.baseline_mpc_module_config, "binary_controls") else None, 

528 ) 

529 modifier_pos = SetupSystemModifier( 

530 mpc_data=self.flex_config.shadow_mpc_config_generator_data.pos_flex, 

531 controls=self.pos_flex_mpc_module_config.controls, 

532 binary_controls=self.pos_flex_mpc_module_config.binary_controls if hasattr(self.pos_flex_mpc_module_config, "binary_controls") else None, 

533 ) 

534 modifier_neg = SetupSystemModifier( 

535 mpc_data=self.flex_config.shadow_mpc_config_generator_data.neg_flex, 

536 controls=self.neg_flex_mpc_module_config.controls, 

537 binary_controls=self.neg_flex_mpc_module_config.binary_controls if hasattr(self.neg_flex_mpc_module_config, "binary_controls") else None, 

538 ) 

539 # run the modification 

540 modified_tree_base = modifier_base.visit(deepcopy(tree)) 

541 modified_tree_pos = modifier_pos.visit(deepcopy(tree)) 

542 modified_tree_neg = modifier_neg.visit(deepcopy(tree)) 

543 # combine modifications to one file 

544 modified_tree = ast.Module(body=[], type_ignores=[]) 

545 modified_tree.body.extend( 

546 modified_tree_base.body + modified_tree_pos.body + modified_tree_neg.body 

547 ) 

548 modified_source = astor.to_source(modified_tree) 

549 # Use black to format the generated code 

550 formatted_code = black.format_str(modified_source, mode=black.FileMode()) 

551 

552 if self.flex_config.overwrite_files: 

553 try: 

554 Path( 

555 os.path.join( 

556 self.flex_config.flex_files_directory, 

557 self.flex_config.baseline_config_generator_data.created_flex_mpcs_file, 

558 ) 

559 ).unlink() 

560 except OSError: 

561 pass 

562 

563 with open(output_file, "w") as f: 

564 f.write(formatted_code) 

565 

566 def check_variables_in_casadi_config(self, config: CasadiModelConfig, expr: str): 

567 """Check if all variables in the expression are defined in the config. 

568 

569 Args: 

570 config: casadi model config. 

571 expr: The expression to check. 

572 

573 Raises: 

574 ValueError: If any variable in the expression is not defined in the config. 

575 

576 """ 

577 variables_in_config = set(config.get_variable_names()) 

578 variables_in_cost_function = set(ast.walk(ast.parse(expr))) 

579 variables_in_cost_function = { 

580 node.attr 

581 for node in variables_in_cost_function 

582 if isinstance(node, ast.Attribute) 

583 } 

584 variables_newly_created = set( 

585 weight.name 

586 for weight in self.flex_config.shadow_mpc_config_generator_data.weights 

587 ) 

588 unknown_vars = ( 

589 variables_in_cost_function - variables_in_config - variables_newly_created 

590 ) 

591 if unknown_vars: 

592 raise ValueError(f"Unknown variables in new cost function: {unknown_vars}") 

593 

594 def run_config_validations(self): 

595 """Function to validate integrity of user-supplied flex config. 

596 

597 The following checks are performed: 

598 1. Ensures the specified power variable exists in the MPC model outputs. 

599 2. Ensures the specified comfort variable exists in the MPC model states. 

600 3. Validates that the stored energy variable exists in MPC outputs if energy cost correction is enabled. 

601 4. Verifies the supported collocation method is used; otherwise, switches to 'legendre' and raises a warning. 

602 5. Ensures that the sum of prep time, market time, and flex event duration does not exceed the prediction horizon. 

603 6. Ensures market time equals the MPC model time step if market config is present. 

604 7. Ensures that all flex time values are multiples of the MPC model time step. 

605 8. Checks for mismatches between time-related parameters in the flex/MPC and indicator configs and issues warnings 

606 when discrepancies exist, using the flex/MPC config values as the source of truth. 

607 

608 """ 

609 # check if the power variable exists in the mpc config 

610 if self.flex_config.baseline_config_generator_data.power_variable not in [ 

611 output.name for output in self.baseline_mpc_module_config.outputs 

612 ]: 

613 raise ConfigurationError( 

614 f"Given power variable {self.flex_config.baseline_config_generator_data.power_variable} is not defined as output in baseline mpc config." 

615 ) 

616 

617 # check if the comfort variable exists in the mpc slack variables 

618 if self.flex_config.baseline_config_generator_data.comfort_variable: 

619 file_path = self.baseline_mpc_module_config.optimization_backend["model"]["type"]["file"] 

620 class_name = self.baseline_mpc_module_config.optimization_backend["model"]["type"]["class_name"] 

621 # Get the class 

622 dynamic_class = cmng.get_class_from_file(file_path, class_name) 

623 if self.flex_config.baseline_config_generator_data.comfort_variable not in [ 

624 state.name for state in dynamic_class().states 

625 ]: 

626 raise ConfigurationError( 

627 f"Given comfort variable {self.flex_config.baseline_config_generator_data.comfort_variable} is not defined as state in baseline mpc config." 

628 ) 

629 

630 # check if the energy storage variable exists in the mpc config 

631 if self.indicator_module_config.correct_costs.enable_energy_costs_correction: 

632 if self.indicator_module_config.correct_costs.stored_energy_variable not in [ 

633 output.name for output in self.baseline_mpc_module_config.outputs 

634 ]: 

635 raise ConfigurationError( 

636 f"The stored energy variable {self.indicator_module_config.correct_costs.stored_energy_variable} is not defined in baseline mpc config. " 

637 f"It must be defined in the base MPC model and config as output if the correction of costs is enabled." 

638 ) 

639 

640 # raise warning if unsupported collocation method is used and change to supported method 

641 if self.baseline_mpc_module_config.optimization_backend["discretization_options"]["collocation_method"] != "legendre": 

642 self.logger.warning(f'Collocation method {self.baseline_mpc_module_config.optimization_backend["discretization_options"]["collocation_method"]} is not supported. ' 

643 f'Switching to method legendre.') 

644 self.baseline_mpc_module_config.optimization_backend["discretization_options"]["collocation_method"] = "legendre" 

645 self.pos_flex_mpc_module_config.optimization_backend["discretization_options"]["collocation_method"] = "legendre" 

646 self.neg_flex_mpc_module_config.optimization_backend["discretization_options"]["collocation_method"] = "legendre" 

647 

648 #time data validations 

649 flex_times = { 

650 glbs.PREP_TIME: self.flex_config.prep_time, 

651 glbs.MARKET_TIME: self.flex_config.market_time, 

652 glbs.FLEX_EVENT_DURATION: self.flex_config.flex_event_duration 

653 } 

654 mpc_times = { 

655 glbs.TIME_STEP: self.baseline_mpc_module_config.time_step, 

656 glbs.PREDICTION_HORIZON: self.baseline_mpc_module_config.prediction_horizon 

657 } 

658 # total time length check (prep+market+flex_event) 

659 if sum(flex_times.values()) > mpc_times["time_step"] * mpc_times["prediction_horizon"]: 

660 raise ConfigurationError(f'Market time + prep time + flex event duration can not exceed the prediction horizon.') 

661 # market time val check 

662 if self.flex_config.market_config: 

663 if flex_times["market_time"] != mpc_times["time_step"]: 

664 raise ConfigurationError(f'Market time must be equal to the time step.') 

665 # check for divisibility of flex_times by time_step  

666 for name, value in flex_times.items(): 

667 if value % mpc_times["time_step"] != 0: 

668 raise ConfigurationError(f'{name} is not a multiple of the time step. Please redefine.') 

669 # raise warning if parameter value in flex indicator module config differs from value in flex config/ baseline mpc module config 

670 for parameter in self.indicator_module_config.parameters: 

671 if parameter.value is not None: 

672 if parameter.name in flex_times: 

673 flex_value = flex_times[parameter.name] 

674 if parameter.value != flex_value: 

675 self.logger.warning(f'Value mismatch for {parameter.name} in flex config (field) and indicator module config (parameter). ' 

676 f'Flex config value will be used.') 

677 elif parameter.name in mpc_times: 

678 mpc_value = mpc_times[parameter.name] 

679 if parameter.value != mpc_value: 

680 self.logger.warning(f'Value mismatch for {parameter.name} in baseline MPC module config (field) and indicator module config (parameter). ' 

681 f'Baseline MPC module config value will be used.') 

682 

683 def adapt_sim_results_path(self, simulator_agent_config: Union[str, Path]) -> dict: 

684 """Optional helper function to adapt file path for simulator results in sim config, 

685 so that sim results land in the same results directory as flex results. 

686 

687 Args: 

688 simulator_agent_config: Path to the simulator agent config JSON file. 

689 

690 Returns: 

691 The updated simulator config dictionary with the modified result file path. 

692  

693 Raises: 

694 FileNotFoundError: If the specified config file does not exist. 

695 

696 """ 

697 # open config and extract sim module 

698 with open(simulator_agent_config, "r") as f: 

699 sim_config = json.load(f) 

700 sim_module_config = next( 

701 (module for module in sim_config["modules"] if module["type"] == "simulator"), 

702 None 

703 ) 

704 # convert filename string to path and extract the name 

705 sim_file_name = Path(sim_module_config["result_filename"]).name 

706 # set results path so that sim results lands in same directory as flex result CSVs 

707 sim_module_config["result_filename"] = str(self.flex_config.results_directory / sim_file_name) 

708 return sim_config