Coverage for agentlib_flexquant/generate_flex_agents.py: 90%

339 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2026-03-26 09:43 +0000

1"""Generate agents for flexibility quantification. 

2 

3This module provides the FlexAgentGenerator class that creates and configures 

4flexibility agents. 

5The agents created include the baseline, positive and negative flexibility agents, 

6the flexibility indicator and market agents. The agents are created based on the 

7flex config and the MPC config. 

8""" 

9import ast 

10import atexit 

11import inspect 

12import logging 

13import os 

14 

15import astor 

16import black 

17import json 

18import numpy as np 

19from copy import deepcopy 

20from pathlib import Path 

21from typing import Union 

22from pydantic import FilePath 

23from agentlib.core.agent import AgentConfig 

24from agentlib.core.datamodels import AgentVariable 

25from agentlib.core.errors import ConfigurationError 

26from agentlib.core.module import BaseModuleConfig 

27from agentlib.utils import custom_injection, load_config 

28from agentlib_mpc.data_structures.mpc_datamodels import MPCVariable 

29from agentlib_mpc.models.casadi_model import CasadiModelConfig 

30from agentlib_mpc.modules.mpc.mpc_full import MPCConfig 

31 

32from agentlib_mpc.optimization_backends.casadi_.basic import DirectCollocation 

33from agentlib_mpc.data_structures.casadi_utils import CasadiDiscretizationOptions 

34import agentlib_flexquant.data_structures.globals as glbs 

35import agentlib_flexquant.utils.config_management as cmng 

36from agentlib_flexquant.utils.parsing import SetupSystemModifier 

37from agentlib_flexquant.data_structures.flexquant import ( 

38 FlexibilityIndicatorConfig, 

39 FlexibilityMarketConfig, 

40 FlexQuantConfig, 

41) 

42from agentlib_flexquant.data_structures.mpcs import BaselineMPCData, BaseMPCData 

43from agentlib_flexquant.modules.flexibility_indicator import ( 

44 FlexibilityIndicatorModuleConfig, 

45) 

46from agentlib_flexquant.modules.flexibility_market import FlexibilityMarketModuleConfig 

47 

48 

class FlexAgentGenerator:
    """Class for generating the flex agents.

    Attributes:
        orig_mpc_module_config: the config for the original mpc,
            which has nothing to do with the flexibility quantification
        baseline_mpc_module_config: the config for the baseline mpc
            for flexibility quantification
        pos_flex_mpc_module_config: the config for the positive flexibility mpc
            for flexibility quantification
        neg_flex_mpc_module_config: the config for the negative flexibility mpc
            for flexibility quantification
        indicator_module_config: the config for the indicator for flexibility
            quantification
        market_module_config: the config for the market for flexibility
            quantification

    """

    orig_mpc_module_config: MPCConfig
    baseline_mpc_module_config: MPCConfig
    pos_flex_mpc_module_config: MPCConfig
    neg_flex_mpc_module_config: MPCConfig
    indicator_module_config: FlexibilityIndicatorModuleConfig
    market_module_config: FlexibilityMarketModuleConfig

71 

def __init__(
    self,
    flex_config: Union[str, FilePath, FlexQuantConfig],
    mpc_agent_config: Union[str, FilePath, AgentConfig],
):
    """Load the flex and MPC configs and derive all agent and module configs.

    The original MPC agent config is copied three times (baseline, positive
    and negative flexibility), each copy receiving the agent id given in the
    flex config. Afterwards the indicator config and, if configured, the
    market config are loaded. Finally ``run_config_validations`` is called.

    Args:
        flex_config: The flexibility quantification config, either as a
            FlexQuantConfig or as a path to a config file. It is loaded via
            ``load_config`` with ``FlexQuantConfig`` as config type.
        mpc_agent_config: The agent config of the original MPC, either as
            an AgentConfig or as a path to a config file.

    """
    self.logger = logging.getLogger(__name__)

    # ``isinstance(x, str or FilePath)`` only ever tested for ``str`` because
    # ``str or FilePath`` evaluates to ``str``. Checking against ``Path``
    # makes path objects keep the name of the given file as well.
    if isinstance(flex_config, (str, Path)):
        self.flex_config_file_name = os.path.basename(flex_config)
    else:
        # provide default name for json
        self.flex_config_file_name = "flex_config.json"
    # load configs
    self.flex_config = load_config.load_config(
        flex_config, config_type=FlexQuantConfig
    )

    # original mpc agent
    self.orig_mpc_agent_config = load_config.load_config(
        mpc_agent_config, config_type=AgentConfig
    )
    baseline_data = self.flex_config.baseline_config_generator_data
    shadow_data = self.flex_config.shadow_mpc_config_generator_data
    # baseline agent
    self.baseline_mpc_agent_config = deepcopy(self.orig_mpc_agent_config)
    self.baseline_mpc_agent_config.id = baseline_data.agent_id
    # pos agent
    self.pos_flex_mpc_agent_config = deepcopy(self.orig_mpc_agent_config)
    self.pos_flex_mpc_agent_config.id = shadow_data.pos_flex.agent_id
    # neg agent
    self.neg_flex_mpc_agent_config = deepcopy(self.orig_mpc_agent_config)
    self.neg_flex_mpc_agent_config.id = shadow_data.neg_flex.agent_id

    # the module type of the original mpc is used to find the mpc module in
    # every copied agent config
    orig_module_type = cmng.get_orig_module_type(self.orig_mpc_agent_config)

    # original mpc module
    self.orig_mpc_module_config = cmng.get_module(
        config=self.orig_mpc_agent_config,
        module_type=orig_module_type,
    )
    # baseline module
    self.baseline_mpc_module_config = cmng.get_module(
        config=self.baseline_mpc_agent_config,
        module_type=orig_module_type,
    )
    # convert agentlib_mpc's ModuleConfig to flexquant's ModuleConfig to
    # include additional fields not present in the original
    self.baseline_mpc_module_config = cmng.get_flex_mpc_module_config(
        agent_config=self.baseline_mpc_agent_config,
        module_config=self.baseline_mpc_module_config,
        module_type=baseline_data.module_types[
            self.baseline_mpc_module_config.type
        ],
    )
    # pos module
    self.pos_flex_mpc_module_config = cmng.get_module(
        config=self.pos_flex_mpc_agent_config,
        module_type=orig_module_type,
    )
    # neg module
    self.neg_flex_mpc_module_config = cmng.get_module(
        config=self.neg_flex_mpc_agent_config,
        module_type=orig_module_type,
    )
    # load indicator config
    self.indicator_config = load_config.load_config(
        self.flex_config.indicator_config, config_type=FlexibilityIndicatorConfig
    )
    # load indicator module config
    self.indicator_agent_config = load_config.load_config(
        self.indicator_config.agent_config, config_type=AgentConfig
    )
    self.indicator_module_config = cmng.get_module(
        config=self.indicator_agent_config, module_type=cmng.INDICATOR_CONFIG_TYPE
    )
    # load market config
    if self.flex_config.market_config:
        self.market_config = load_config.load_config(
            self.flex_config.market_config, config_type=FlexibilityMarketConfig
        )
        # load market module config
        self.market_agent_config = load_config.load_config(
            self.market_config.agent_config, config_type=AgentConfig
        )
        self.market_module_config = cmng.get_module(
            config=self.market_agent_config, module_type=cmng.MARKET_CONFIG_TYPE
        )
    else:
        # without a market the market time is set to zero
        self.flex_config.market_time = 0

    self.run_config_validations()

163 

def generate_flex_agents(self) -> list[str]:
    """Create the agent configs and the python module of the flexibility agents.

    The module configs are adapted first, then every agent config is dumped
    to a json file, the model file for the shadow mpcs is generated and the
    updated flex config is written. If ``delete_files`` is set in the flex
    config, the deletion of the created files is registered to run at exit.

    Returns:
        list of the full path for baseline mpc, pos_flex mpc, neg_flex mpc,
        indicator and market config

    """
    baseline_data = self.flex_config.baseline_config_generator_data
    shadow_data = self.flex_config.shadow_mpc_config_generator_data
    with_market = bool(self.flex_config.market_config)

    # step 1: extend the module configs by the communication variables
    adapted_baseline = self.adapt_mpc_module_config(
        module_config=self.baseline_mpc_module_config,
        mpc_dataclass=baseline_data,
        agent_id=baseline_data.agent_id,
    )
    adapted_pos = self.adapt_mpc_module_config(
        module_config=self.pos_flex_mpc_module_config,
        mpc_dataclass=shadow_data.pos_flex,
        agent_id=shadow_data.pos_flex.agent_id,
    )
    adapted_neg = self.adapt_mpc_module_config(
        module_config=self.neg_flex_mpc_module_config,
        mpc_dataclass=shadow_data.neg_flex,
        agent_id=shadow_data.neg_flex.agent_id,
    )
    adapted_indicator = self.adapt_indicator_module_config(
        module_config=self.indicator_module_config
    )
    adapted_market = None
    if with_market:
        adapted_market = self.adapt_market_module_config(
            module_config=self.market_module_config
        )

    # step 2: write one json per agent, containing the adapted module config
    mpc_jobs = (
        (adapted_baseline, self.baseline_mpc_agent_config,
         baseline_data.name_of_created_file),
        (adapted_pos, self.pos_flex_mpc_agent_config,
         shadow_data.pos_flex.name_of_created_file),
        (adapted_neg, self.neg_flex_mpc_agent_config,
         shadow_data.neg_flex.name_of_created_file),
    )
    for adapted_module, agent_config, file_name in mpc_jobs:
        self.append_module_and_dump_agent(
            module=adapted_module,
            agent=agent_config,
            module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
            config_name=file_name,
        )
    self.append_module_and_dump_agent(
        module=adapted_indicator,
        agent=self.indicator_agent_config,
        module_type=cmng.INDICATOR_CONFIG_TYPE,
        config_name=self.indicator_config.name_of_created_file,
    )
    if with_market:
        self.append_module_and_dump_agent(
            module=adapted_market,
            agent=self.market_agent_config,
            module_type=cmng.MARKET_CONFIG_TYPE,
            config_name=self.market_config.name_of_created_file,
        )

    # step 3: python file with the model definitions of the shadow mpcs
    self._generate_flex_model_definition()

    # step 4: point the flex config to the created files and dump it
    self.adapt_and_dump_flex_config()

    # step 5: optional clean-up when the interpreter exits
    if self.flex_config.delete_files:
        atexit.register(lambda: self._delete_created_files())

    return self.get_config_file_paths()

242 

def append_module_and_dump_agent(
    self,
    module: BaseModuleConfig,
    agent: AgentConfig,
    module_type: str,
    config_name: str,
):
    """Append the given module config to the given agent config and
    dump the agent config to a json file.

    Every module of the agent whose type maps to the same entry of
    ``cmng.MODULE_TYPE_DICT`` as ``module_type`` is replaced by the given
    module. The json file is named based on the config_name and written to
    the ``flex_files_directory`` of the flex config. If the agent has no
    modules, an error is logged and nothing is written.

    Args:
        module: The module config to be appended.
        agent: The agent config to be updated.
        module_type: The type of the module
        config_name: The name of the json file for module config
            (e.g. baseline.json)

    """
    # get the module as a dict without default values
    module_dict = cmng.to_dict_and_remove_unnecessary_fields(module=module)
    # write given module to agent config
    for i, agent_module in enumerate(agent.modules):
        if (cmng.MODULE_TYPE_DICT[module_type]
                is cmng.MODULE_TYPE_DICT[agent_module["type"]]):
            agent.modules[i] = module_dict

    if not agent.modules:
        # use the logger of the instance, like the rest of the class
        self.logger.error("Provided agent config does not contain any modules.")
        return

    # Serialise before touching the file system, so that a failing dump
    # cannot leave a deleted or truncated config file behind.
    agent_json = agent.model_dump_json(exclude_defaults=True)

    target_file = os.path.join(self.flex_config.flex_files_directory, config_name)
    if self.flex_config.overwrite_files:
        try:
            Path(target_file).unlink()
        except OSError:
            # best effort: the file may simply not exist yet
            pass
    with open(target_file, "w", encoding="utf-8") as f:
        f.write(agent_json)

287 

def get_config_file_paths(self) -> list[str]:
    """Return the paths of the created agent config files.

    The order is baseline mpc, positive flexibility mpc, negative flexibility
    mpc, indicator and, only if a market config is set, market. Every file
    name is joined with the ``flex_files_directory`` of the flex config.
    """
    directory = self.flex_config.flex_files_directory
    shadow_data = self.flex_config.shadow_mpc_config_generator_data
    file_names = [
        self.flex_config.baseline_config_generator_data.name_of_created_file,
        shadow_data.pos_flex.name_of_created_file,
        shadow_data.neg_flex.name_of_created_file,
        self.indicator_config.name_of_created_file,
    ]
    if self.flex_config.market_config:
        file_names.append(self.market_config.name_of_created_file)
    return [os.path.join(directory, file_name) for file_name in file_names]

319 

def _delete_created_files(self):
    """Function to run at exit if the files are to be deleted.

    Deletes the created agent config files and the dumped flex config and
    afterwards removes the ``flex_files_directory`` itself.

    Raises:
        OSError: If the directory cannot be removed, e.g. because it still
            contains other files.
    """
    directory = self.flex_config.flex_files_directory
    to_be_deleted = self.get_config_file_paths()
    to_be_deleted.append(os.path.join(directory, self.flex_config_file_name))
    # delete files
    for file in to_be_deleted:
        # A file that is already gone must not abort the remaining clean-up,
        # otherwise the other files and the folder would be left behind.
        Path(file).unlink(missing_ok=True)
    # also delete folder
    Path(directory).rmdir()

334 

def adapt_mpc_module_config(
    self, module_config: MPCConfig, mpc_dataclass: BaseMPCData, agent_id: str
) -> MPCConfig:
    """Adapt the mpc module config for automated flexibility quantification.

    Things adapted among others are:
    - the file name/path of the mpc config file
    - names of the control variables for the shadow mpcs
    - reduce communicated variables of shadow mpcs to outputs
    - add the power variable to the outputs
    - add parameters for the activation and quantification of flexibility

    Args:
        module_config: The module config to be adapted
        mpc_dataclass: The dataclass corresponding to the type of the MPC module.
            It contains all the extra data necessary for flexibility
            quantification, which will be used to update the
            module_config. Entries of its ``config_inputs_appendix`` that
            already exist as inputs are removed from that list.
        agent_id: agent_id for creating the FlexQuant mpc module config

    Returns:
        The adapted module config

    """
    baseline_data = self.flex_config.baseline_config_generator_data
    cost_correction = self.indicator_module_config.correct_costs
    traj_suffix = glbs.full_trajectory_suffix
    comm_suffix = glbs.base_vars_to_communicate_suffix

    # allow the module config to be changed
    module_config.model_config["frozen"] = False

    # set new MPC type
    module_config.type = mpc_dataclass.module_types[
        cmng.get_orig_module_type(self.orig_mpc_agent_config)
    ]

    # set the MPC config type from the MPCConfig in agentlib_mpc to the
    # corresponding one in flexquant and add additional fields
    module_config_flex_dict = module_config.model_dump()
    module_config_flex_dict["casadi_sim_time_step"] = (
        self.flex_config.casadi_sim_time_step)
    module_config_flex_dict["power_variable_name"] = baseline_data.power_variable
    module_config_flex_dict["storage_variable_name"] = (
        cost_correction.stored_energy_variable)
    module_config_flex = cmng.MODULE_TYPE_DICT[module_config.type](
        **module_config_flex_dict, _agent_id=agent_id
    )

    # HOTFIX due to AgentLib-MPC bug. Needs to be adapted after Objectives
    # in AgentLib-MPC are fixed.
    if module_config_flex.r_del_u is None:
        module_config_flex = module_config_flex.model_copy(update={"r_del_u": {}})

    # allow the module config to be changed
    module_config_flex.model_config["frozen"] = False

    # set new id (needed for plotting)
    module_config_flex.module_id = mpc_dataclass.module_id

    backend = module_config_flex.optimization_backend
    # update optimization backend to use the created mpc files and classes
    backend["model"]["type"] = {
        "file": os.path.join(
            self.flex_config.flex_files_directory,
            mpc_dataclass.created_flex_mpcs_file,
        ),
        "class_name": mpc_dataclass.class_name,
    }
    # extract filename from results file and update it with
    # suffix and parent directory
    result_filename = Path(backend["results_file"]).name.replace(
        ".csv", mpc_dataclass.results_suffix
    )
    full_path = self.flex_config.results_directory / result_filename
    backend["results_file"] = str(full_path)
    # change cia backend to custom backend of flexquant
    if backend["type"] == "casadi_cia":
        backend["type"] = "agentlib_flexquant.casadi_cia_cons"
    if backend["type"] == "agentlib_flexquant.casadi_cia_cons":
        backend["market_time"] = self.flex_config.market_time

    # continuous controls first, then (if the config has them) binary controls
    all_controls = list(module_config_flex.controls)
    if hasattr(module_config_flex, "binary_controls"):
        all_controls.extend(module_config_flex.binary_controls)

    if not isinstance(mpc_dataclass, BaselineMPCData):
        # add the full control trajectory output from the baseline as input for
        # the shadow mpcs, they are directly included in the optimization problem
        for control in all_controls:
            module_config_flex.inputs.append(
                MPCVariable(
                    name=control.name + traj_suffix,
                    value=None,
                    type="pd.Series",
                )
            )
            # add full control names to shadow MPC config for inputs tracking
            module_config_flex.full_control_names.append(
                control.name + traj_suffix)
            # change the alias of control variable in shadow mpc to
            # prevent it from triggering the wrong callback
            control.alias = control.name + glbs.shadow_suffix
        # only communicate outputs for the shadow mpcs
        module_config_flex.shared_variable_fields = ["outputs"]

        # In addition to creating the full control variables, the inputs
        # and states of the Baseline are communicated to the Shadow MPC
        # to ensure synchronisation. Therefore, all inputs and states of
        # the Baseline are added to the Shadow MPCs with an alias
        baseline_names = {inp.name for inp in
                          self.baseline_mpc_module_config.inputs}
        for shadow_input in module_config_flex.inputs:
            if shadow_input.name in baseline_names:
                shadow_input.alias = shadow_input.name + comm_suffix

        # add Baseline input names to shadow MPC config for inputs tracking
        # if Baseline variable is also set in config_inputs_appendix this is
        # due to overwriting the alias, so variable should not be added here
        appendix_names = {inp.name for inp in mpc_dataclass.config_inputs_appendix}
        module_config_flex.baseline_input_names = [
            inp.name + comm_suffix
            for inp in self.baseline_mpc_module_config.inputs
            if inp.name not in appendix_names
        ]

        # add custom input names for the shadow MPC to track. Here, the
        # communication suffix is not added, as the user is free to define
        # custom inputs as desired.
        # Exclude in_provision, as this is not regularly set and would prevent
        # the do_step of the shadow MPC.
        module_config_flex.custom_input_names.extend([
            {"name": inp.name, "alias": inp.alias}
            for inp in mpc_dataclass.config_inputs_appendix
            if inp.name != glbs.PROVISION_VAR_NAME
        ])

        # NOTE(review): states are matched by object equality here, whereas
        # the inputs above are matched by name - confirm this is intended.
        for state in module_config_flex.states:
            if state in self.baseline_mpc_module_config.states:
                state.alias = state.name + comm_suffix
        # add Baseline state names to shadow MPC config for inputs tracking
        module_config_flex.baseline_state_names = [
            state.name + comm_suffix
            for state in self.baseline_mpc_module_config.states
        ]
        module_config_flex.baseline_agent_id = baseline_data.agent_id

    else:
        # all the variables here are added to the custom MPCConfig of
        # FlexQuant to avoid them being added to the optimization problem
        # add full_controls trajectory as AgentVariable to the config of
        # Baseline mpc
        for control in all_controls:
            module_config_flex.full_controls.append(
                AgentVariable(
                    name=control.name + traj_suffix,
                    alias=control.name + traj_suffix,
                    shared=True,
                )
            )
        # add full controls to custom cia backend to constrain
        # during market time
        if backend["type"] == "agentlib_flexquant.casadi_cia_cons":
            backend["full_controls_dict"] = {
                var.name: None for var in module_config_flex.full_controls
            }
        # add input and states copy variables which send the Baseline inputs
        # to the shadow MPC
        for baseline_input in module_config_flex.inputs:
            module_config_flex.vars_to_communicate.append(
                AgentVariable(
                    name=baseline_input.name + comm_suffix,
                    alias=baseline_input.name + comm_suffix,
                    shared=True,
                )
            )
        for state in module_config_flex.states:
            module_config_flex.vars_to_communicate.append(
                AgentVariable(
                    name=state.name + comm_suffix,
                    alias=state.name + comm_suffix,
                    shared=True,
                )
            )

    module_config_flex.set_outputs = True
    # add outputs for the power variables, for easier handling create a lookup dict
    output_dict = {output.name: output for output in module_config_flex.outputs}
    if baseline_data.power_variable in output_dict:
        output_dict[baseline_data.power_variable].alias = mpc_dataclass.power_alias
    else:
        module_config_flex.outputs.append(
            MPCVariable(
                name=baseline_data.power_variable,
                alias=mpc_dataclass.power_alias,
            )
        )
    # change alias for stored energy variable
    # NOTE(review): raises KeyError if the stored energy variable is not one
    # of the outputs - presumably this is validated beforehand; confirm.
    if cost_correction.enable_energy_costs_correction:
        output_dict[
            cost_correction.stored_energy_variable
        ].alias = mpc_dataclass.stored_energy_alias

    # add extra inputs needed for activation of flex or custom cost functions
    existing_input_names = {inp.name: idx for idx, inp in
                            enumerate(module_config_flex.inputs)}
    # iterate over a copy, because entries are removed from the list below
    for appendix_inp in mpc_dataclass.config_inputs_appendix.copy():
        # If variable already exists in the config
        if appendix_inp.name in existing_input_names:
            self.logger.warning(f"The given variable {appendix_inp.name} in the "
                                f"config_inputs_appendix already exists in the MPC "
                                f"model. I am updating the alias of the existing "
                                f"variable to {appendix_inp.alias} (provided by you). "
                                f"However, this can still cause issues down the line "
                                f"if the alias is not chosen wisely.")
            # Update only the alias of the existing input
            idx = existing_input_names[appendix_inp.name]
            existing_inp = module_config_flex.inputs[idx]
            inp_dict = existing_inp.dict()
            inp_dict["alias"] = appendix_inp.alias
            module_config_flex.inputs[idx] = type(existing_inp)(**inp_dict)
            # Remove variable from appendix list to avoid creation during parsing
            mpc_dataclass.config_inputs_appendix.remove(appendix_inp)
        else:
            # Add the new input
            module_config_flex.inputs.append(appendix_inp)

    # add extra parameters needed for activation of flex or custom weights
    # (``model_fields`` is read from the class; instance access is deprecated
    # in recent pydantic versions)
    flex_config_fields = type(self.flex_config).model_fields
    baseline_data_fields = type(baseline_data).model_fields
    for var in mpc_dataclass.config_parameters_appendix:
        if var.name in flex_config_fields:
            var.value = getattr(self.flex_config, var.name)
        if var.name in baseline_data_fields:
            var.value = getattr(baseline_data, var.name)
    module_config_flex.parameters.extend(mpc_dataclass.config_parameters_appendix)

    # freeze the config again
    module_config_flex.model_config["frozen"] = True

    return module_config_flex

600 

def adapt_indicator_module_config(
    self, module_config: FlexibilityIndicatorModuleConfig
) -> FlexibilityIndicatorModuleConfig:
    """Adapt the indicator module config for automated flexibility
    quantification.

    The two price variables are appended to the inputs, the time related
    parameters are filled from the flex config and the baseline mpc module
    config, and the power unit and the results file location are set.

    Args:
        module_config: The indicator module config; it is modified in place.

    Returns:
        The adapted module config

    """
    # append user-defined price vars to indicator module config
    price_inputs = (
        (module_config.price_variable, "electricity price"),
        (module_config.price_variable_feed_in, "electricity feed-in price"),
    )
    for variable_name, description in price_inputs:
        module_config.inputs.append(
            AgentVariable(
                name=variable_name,
                unit="ct/kWh",
                type="pd.Series",
                description=description,
            )
        )

    baseline_module = self.baseline_mpc_module_config
    # parameter name -> callable providing its value; evaluated lazily so the
    # collocation grid is only computed when such a parameter exists
    value_providers = {
        glbs.PREP_TIME: lambda: self.flex_config.prep_time,
        glbs.MARKET_TIME: lambda: self.flex_config.market_time,
        glbs.FLEX_EVENT_DURATION: lambda: self.flex_config.flex_event_duration,
        glbs.TIME_STEP: lambda: baseline_module.time_step,
        glbs.PREDICTION_HORIZON: lambda: baseline_module.prediction_horizon,
        glbs.COLLOCATION_TIME_GRID: lambda: self.get_collocation_time_grid(
            discretization_options=baseline_module.optimization_backend[
                "discretization_options"
            ]
        ),
    }

    # allow the module config to be changed
    module_config.model_config["frozen"] = False
    for parameter in module_config.parameters:
        provider = value_providers.get(parameter.name)
        if provider is not None:
            parameter.value = provider()
    # set power unit
    module_config.power_unit = (
        self.flex_config.baseline_config_generator_data.power_unit)
    # keep the file name, but store the results in the results directory
    results_name = module_config.results_file.name
    module_config.results_file = self.flex_config.results_directory / results_name
    module_config.model_config["frozen"] = True
    return module_config

653 

def adapt_market_module_config(
    self, module_config: FlexibilityMarketModuleConfig
) -> FlexibilityMarketModuleConfig:
    """Adapt the market module config for automated flexibility quantification.

    Fields shared with ``self.market_module_config`` are copied from it, the
    results file is moved to the results directory of the flex config and the
    collocation time grid and time step parameters are filled from the
    baseline mpc module config.

    Args:
        module_config: The market module config; it is modified in place.

    Returns:
        The adapted module config

    """
    # allow the module config to be changed
    module_config.model_config["frozen"] = False
    # ``model_fields`` replaces the deprecated pydantic v1 ``__fields__`` and
    # is read from the class, as instance access is deprecated as well
    source_config = self.market_module_config
    source_fields = type(source_config).model_fields
    for field in type(module_config).model_fields:
        if field in source_fields:
            setattr(module_config, field, getattr(source_config, field))
    module_config.results_file = (
        self.flex_config.results_directory / module_config.results_file.name
    )
    for parameter in module_config.parameters:
        if parameter.name == glbs.COLLOCATION_TIME_GRID:
            dis_op = self.baseline_mpc_module_config.optimization_backend[
                "discretization_options"
            ]
            parameter.value = self.get_collocation_time_grid(
                discretization_options=dis_op
            )
        if parameter.name == glbs.TIME_STEP:
            parameter.value = self.baseline_mpc_module_config.time_step
    module_config.model_config["frozen"] = True
    return module_config

679 

def adapt_and_dump_flex_config(self):
    """Point the flex config to the created agent configs and dump it.

    The market entry (only if one is configured) and the indicator entry of
    ``self.flex_config`` are replaced by ``self.market_config`` and
    ``self.indicator_config``. Their ``agent_config`` attribute is set to the
    path of the respective created agent config file inside
    ``flex_files_directory``.

    Afterwards the flex config is serialized without default values and
    written as JSON to ``flex_files_directory / flex_config_file_name``.
    """
    flex_config = self.flex_config
    directory = flex_config.flex_files_directory

    # store market and indicator with file path of created agent config
    if flex_config.market_config:
        flex_config.market_config = self.market_config
        market_agent_file = os.path.join(
            directory, self.market_config.name_of_created_file
        )
        flex_config.market_config.agent_config = market_agent_file
    flex_config.indicator_config = self.indicator_config
    indicator_agent_file = os.path.join(
        directory, self.indicator_config.name_of_created_file
    )
    flex_config.indicator_config.agent_config = indicator_agent_file

    # save flex config to created flex files
    flex_config_file = os.path.join(directory, self.flex_config_file_name)
    with open(flex_config_file, "w", encoding="utf-8") as f:
        f.write(flex_config.model_dump_json(exclude_defaults=True))

713 

def get_collocation_time_grid(self, discretization_options: dict):
    """Get the mpc output collocation grid over the horizon.

    Args:
        discretization_options: Mapping providing the entries
            "collocation_method" and "collocation_order".

    Returns:
        List with the time of every collocation point over the prediction
        horizon of the baseline mpc; values that coincide with the start of
        a time step are left out.
    """
    # time grid configuration of the baseline mpc
    baseline_module = self.baseline_mpc_module_config
    step = baseline_module.time_step
    horizon = baseline_module.prediction_horizon

    # collocation configuration
    method = discretization_options["collocation_method"]
    order = discretization_options["collocation_order"]
    casadi_options = CasadiDiscretizationOptions(
        collocation_order=order, collocation_method=method
    )
    collocation = DirectCollocation(options=casadi_options)
    roots = collocation._collocation_polynomial().root

    # offset the scaled roots by the start time of every time step
    step_starts = np.arange(0, step * horizon, step)
    grid = (step_starts[:, None] + roots * step).ravel()
    # drop the values that are equal to a start of a time step
    is_step_start = np.isin(grid, step_starts)
    return grid[~is_step_start].tolist()

738 

def _generate_flex_model_definition(self):
    """Generate a python module for negative and positive flexibility agents
    from the Baseline MPC model.

    The model file of the original MPC is parsed once. A modified copy of
    its syntax tree is created for the baseline, the positive flexibility
    and the negative flexibility MPC. The three results are combined into
    one module, formatted with black and written to
    ``created_flex_mpcs_file`` inside the flex files directory.

    Before the code is generated, the flexibility cost functions of both
    shadow MPCs (including their optional appendix) are checked against the
    variables of the casadi model config.
    """
    baseline_data = self.flex_config.baseline_config_generator_data
    shadow_data = self.flex_config.shadow_mpc_config_generator_data
    output_file = os.path.join(
        self.flex_config.flex_files_directory,
        baseline_data.created_flex_mpcs_file,
    )
    opt_backend = self.orig_mpc_module_config.optimization_backend["model"]["type"]

    # Extract the config class of the casadi model to check cost functions
    config_class = inspect.get_annotations(custom_injection(opt_backend))["config"]
    # Get custom model fields provided by the user. The fields are copied
    # without "type" instead of popping the key: popping would strip the
    # model type from the baseline MPC module config itself and make a
    # second call of this method fail with a KeyError.
    model_fields = {
        key: value
        for key, value in
        self.baseline_mpc_module_config.optimization_backend["model"].items()
        if key != "type"
    }
    config_instance = config_class(**model_fields)

    for shadow_mpc_type in ("neg_flex", "pos_flex"):
        flex_data = getattr(shadow_data, shadow_mpc_type)
        cost_expression = flex_data.flex_cost_function
        if flex_data.flex_cost_function_appendix:
            # The " + " is just there to simplify the validation, it does
            # not affect the generated code
            cost_expression += " + " + flex_data.flex_cost_function_appendix
        self.check_variables_in_casadi_config(
            config_instance, cost_expression, shadow_mpc_type=shadow_mpc_type
        )

    # parse mpc python file
    with open(opt_backend["file"], "r", encoding="utf-8") as f:
        tree = ast.parse(f.read())

    # create modifiers for python file (order: baseline, positive, negative)
    modifiers = [
        SetupSystemModifier(
            mpc_data=mpc_data,
            controls=module_config.controls,
            # not every module config defines binary controls
            binary_controls=getattr(module_config, "binary_controls", None),
        )
        for mpc_data, module_config in (
            (baseline_data, self.baseline_mpc_module_config),
            (shadow_data.pos_flex, self.pos_flex_mpc_module_config),
            (shadow_data.neg_flex, self.neg_flex_mpc_module_config),
        )
    ]

    # run the modification on independent copies of the tree and combine
    # the results to one file
    modified_tree = ast.Module(body=[], type_ignores=[])
    for modifier in modifiers:
        modified_tree.body.extend(modifier.visit(deepcopy(tree)).body)
    modified_source = astor.to_source(modified_tree)
    # Use black to format the generated code
    formatted_code = black.format_str(modified_source, mode=black.FileMode())

    if self.flex_config.overwrite_files:
        # best effort removal of a previously generated file; a missing
        # file is fine because it is (re)written right below
        try:
            Path(output_file).unlink()
        except OSError:
            pass

    with open(output_file, "w", encoding="utf-8") as f:
        f.write(formatted_code)

826 

def check_variables_in_casadi_config(self, config: CasadiModelConfig, expr: str,
                                     shadow_mpc_type: str):
    """Warn about variables of a cost function that the model does not know.

    Every attribute access found in the parsed expression is treated as a
    variable name. A name counts as known if it is returned by
    ``config.get_variable_names()`` or if it belongs to the parameters or
    inputs appended in the flex config of the selected shadow mpc.

    Args:
        config: casadi model config.
        expr: The expression to check.
        shadow_mpc_type: "pos_flex" selects the positive flexibility data of
            the flex config, any other value selects the negative one.

    Note:
        Unknown variables are only reported with a logger warning, this
        check itself does not raise for them.

    """
    known_names = set(config.get_variable_names())

    # collect the attribute names used anywhere in the expression
    used_names = set()
    for node in ast.walk(ast.parse(expr)):
        if isinstance(node, ast.Attribute):
            used_names.add(node.attr)

    shadow_data = self.flex_config.shadow_mpc_config_generator_data
    if shadow_mpc_type == "pos_flex":
        flex_config_data = shadow_data.pos_flex
    else:
        flex_config_data = shadow_data.neg_flex

    # names that are added to the model through the flex config
    appended_names = {par.name for par in flex_config_data.config_parameters_appendix}
    appended_names.update(inp.name for inp in flex_config_data.config_inputs_appendix)

    unknown_names = used_names - known_names - appended_names
    if not unknown_names:
        return
    self.logger.warning(f"Unknown variables in new cost function: "
                        f"{unknown_names}. This might cause problems with "
                        f"the optimization backend.")

859 

def run_config_validations(self):
    """Function to validate integrity of user-supplied flex config.

    Since the validation depends on interactions between multiple configurations,
    it is performed within this function rather than using Pydantic's built-in
    validators for individual configurations.

    The following checks are performed:
    1. Ensures the specified power variable exists in the MPC model outputs.
    2. Ensures the specified comfort variable (if one is given) exists in the
       states of the MPC model class.
    3. Validates that the stored energy variable exists in MPC outputs if
       energy cost correction is enabled.
    4. Requires a collocation method in the discretization options. If it is
       not 'legendre', the baseline and both shadow MPC configs are switched
       to 'legendre' and a warning is logged.
    5. Ensures that the sum of prep time, market time, and flex event duration
       does not exceed time step * prediction horizon.
    6. Ensures market time is an integer multiple of the MPC model time step
       if market config is present.
    7. Ensures that all flex time values are multiples of the MPC model time step.
    8. Checks for mismatches between time-related parameters in the flex/MPC and
       indicator configs and issues warnings when discrepancies exist, using the
       flex/MPC config values as the source of truth.

    Raises:
        ConfigurationError: If one of the checks 1 to 7 fails.

    """
    baseline_data = self.flex_config.baseline_config_generator_data
    baseline_mpc = self.baseline_mpc_module_config
    output_names = [output.name for output in baseline_mpc.outputs]

    # check if the power variable exists in the mpc config
    power_var = baseline_data.power_variable
    if power_var not in output_names:
        raise ConfigurationError(
            f"Given power variable {power_var} is not defined "
            f"as output in baseline mpc config."
        )

    # check if the comfort variable exists in the states of the mpc model
    comfort_var = baseline_data.comfort_variable
    if comfort_var:
        mod_type = baseline_mpc.optimization_backend["model"]["type"]
        # Get the model class and instantiate it to read its states
        dynamic_class = cmng.get_class_from_file(
            mod_type["file"], mod_type["class_name"]
        )
        if comfort_var not in [state.name for state in dynamic_class().states]:
            raise ConfigurationError(
                f"Given comfort variable "
                f"{comfort_var} "
                f"is not defined as state in baseline mpc config."
            )

    # check if the energy storage variable exists in the mpc config
    correct_costs = self.indicator_module_config.correct_costs
    if correct_costs.enable_energy_costs_correction:
        if correct_costs.stored_energy_variable not in output_names:
            raise ConfigurationError(
                f"The stored energy variable "
                f"{correct_costs.stored_energy_variable} "
                f"is not defined in baseline mpc config. "
                f"It must be defined in the base MPC model and config as output "
                f"if the correction of costs is enabled."
            )

    # a collocation method has to be configured; a missing
    # "discretization_options" entry is reported the same way as a missing
    # "collocation_method" instead of surfacing as a bare KeyError
    try:
        collocation_method = baseline_mpc.optimization_backend[
            "discretization_options"
        ]["collocation_method"]
    except KeyError as err:
        raise ConfigurationError(
            "Please use collocation as discretization method and define the "
            "collocation_method in the mpc config"
        ) from err
    # raise warning if unsupported collocation method is used and change
    # to supported method
    if collocation_method != "legendre":
        self.logger.warning(
            "Collocation method %s is not supported. Switching to "
            "method legendre.",
            collocation_method,
        )
        for module_config in (
            baseline_mpc,
            self.pos_flex_mpc_module_config,
            self.neg_flex_mpc_module_config,
        ):
            module_config.optimization_backend["discretization_options"][
                "collocation_method"
            ] = "legendre"

    # time data validations
    time_step = baseline_mpc.time_step
    prediction_horizon = baseline_mpc.prediction_horizon
    flex_times = {
        glbs.PREP_TIME: self.flex_config.prep_time,
        glbs.MARKET_TIME: self.flex_config.market_time,
        glbs.FLEX_EVENT_DURATION: self.flex_config.flex_event_duration,
    }
    mpc_times = {
        glbs.TIME_STEP: time_step,
        glbs.PREDICTION_HORIZON: prediction_horizon,
    }
    # total time length check (prep+market+flex_event); the values are used
    # directly so the check does not depend on the literal key names
    if sum(flex_times.values()) > time_step * prediction_horizon:
        raise ConfigurationError(
            "Market time + prep time + flex event duration "
            "can not exceed the prediction horizon."
        )
    # market time val check
    if self.flex_config.market_config:
        if self.flex_config.market_time % time_step != 0:
            raise ConfigurationError(
                "Market time must be an integer multiple of the time step."
            )
    # check for divisibility of flex_times by time_step
    for name, value in flex_times.items():
        if value % time_step != 0:
            raise ConfigurationError(
                f"{name} is not a multiple of the time step. Please redefine."
            )

    # raise warning if parameter value in flex indicator module config differs from
    # value in flex config/ baseline mpc module config
    reference_sources = (
        (
            flex_times,
            "Value mismatch for %s in flex config (field) "
            "and indicator module config (parameter). "
            "Flex config value will be used.",
        ),
        (
            mpc_times,
            "Value mismatch for %s in baseline MPC module "
            "config (field) and indicator module config (parameter). "
            "Baseline MPC module config value will be used.",
        ),
    )
    for parameter in self.indicator_module_config.parameters:
        if parameter.value is None:
            continue
        for reference_values, message in reference_sources:
            if parameter.name not in reference_values:
                continue
            if parameter.value != reference_values[parameter.name]:
                self.logger.warning(message, parameter.name)
            # the first source that knows the name decides
            break

1007 

def adapt_sim_results_path(self, simulator_agent_config: Union[str, Path],
                           save_name_suffix: str = "") -> Union[str, Path]:
    """
    Optional helper function to adapt file path for simulator results in sim config,
    so that sim results land in the same results directory as flex results.

    The adapted config is written next to the given config file as
    ``<stem><save_name_suffix>.json``. With an empty suffix and a ``.json``
    input this is the input file itself, which is then overwritten.

    Args:
        simulator_agent_config: Path to the simulator agent config JSON file.
        save_name_suffix: Suffix added to the newly created sim_config file.

    Returns:
        Path of the written simulator config file.

    Raises:
        FileNotFoundError: If the specified config file does not exist.
        ValueError: If the config contains no module of type "simulator".
        Exception: If the adapted config could not be written.

    """
    simulator_agent_config = Path(simulator_agent_config)
    # open config and extract sim module
    with open(simulator_agent_config, "r", encoding="utf-8") as f:
        sim_config = json.load(f)
    sim_module_config = next(
        (
            module
            for module in sim_config["modules"]
            # entries that are not dicts cannot be inspected here
            if isinstance(module, dict) and module.get("type") == "simulator"
        ),
        None,
    )
    if sim_module_config is None:
        raise ValueError(
            f"No module of type 'simulator' found in {simulator_agent_config}."
        )
    # convert filename string to path and extract the name
    sim_file_name = Path(sim_module_config["result_filename"]).name
    # set results path so that sim results lands in same directory
    # as flex result CSVs
    sim_module_config["result_filename"] = str(
        self.flex_config.results_directory / sim_file_name
    )
    # build the target with pathlib so the separator fits the platform
    # (a hard-coded backslash only yields the intended file on Windows)
    new_config_path = simulator_agent_config.parent / (
        f"{simulator_agent_config.stem}{save_name_suffix}.json"
    )
    try:
        with open(new_config_path, "w", encoding="utf-8") as f:
            json.dump(sim_config, f, indent=4)
    except Exception as e:
        # keep the original error attached as cause
        raise Exception(f"Could not adapt and create a new simulation config "
                        f"due to: {e}. "
                        f"Please check {simulator_agent_config} and "
                        f"'{save_name_suffix}'") from e
    return new_config_path