Coverage for agentlib_flexquant/generate_flex_agents.py: 90%

343 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2026-06-17 09:09 +0000

1"""Generate agents for flexibility quantification. 

2 

3This module provides the FlexAgentGenerator class that creates and configures 

4flexibility agents. 

5The agents created include the baseline, positive and negative flexibility agents, 

6the flexibility indicator and market agents. The agents are created based on the 

7flex config and the MPC config. 

8""" 

9import ast 

10import atexit 

11import inspect 

12import logging 

13import os 

14 

15import astor 

16import black 

17import json 

18import numpy as np 

19from copy import deepcopy 

20from pathlib import Path 

21from typing import Union 

22from pydantic import FilePath 

23from agentlib.core.agent import AgentConfig 

24from agentlib.core.datamodels import AgentVariable 

25from agentlib.core.errors import ConfigurationError 

26from agentlib.core.module import BaseModuleConfig 

27from agentlib.utils import custom_injection, load_config 

28from agentlib_mpc.data_structures.mpc_datamodels import MPCVariable 

29from agentlib_mpc.models.casadi_model import CasadiModelConfig 

30from agentlib_mpc.modules.mpc.mpc_full import MPCConfig 

31 

32from agentlib_mpc.optimization_backends.casadi_.basic import DirectCollocation 

33from agentlib_mpc.data_structures.casadi_utils import CasadiDiscretizationOptions 

34import agentlib_flexquant.data_structures.globals as glbs 

35import agentlib_flexquant.utils.config_management as cmng 

36from agentlib_flexquant.utils.config_management import ModuleHandler 

37from agentlib_flexquant.utils.parsing import SetupSystemModifier 

38from agentlib_flexquant.data_structures.flexquant import ( 

39 FlexibilityIndicatorConfig, 

40 FlexibilityMarketConfig, 

41 FlexQuantConfig, 

42) 

43from agentlib_flexquant.data_structures.mpcs import BaselineMPCData, BaseMPCData 

44from agentlib_flexquant.modules.flexibility_indicator import ( 

45 FlexibilityIndicatorModuleConfig, 

46) 

47from agentlib_flexquant.modules.flexibility_market import FlexibilityMarketModuleConfig 

48 

49 

50class FlexAgentGenerator: 

51 """Class for generating the flex agents 

52 

53 orig_mpc_module_config: the config for the original mpc, 

54 which has nothing to do with the flexibility quantification 

55 baseline_mpc_module_config: the config for the baseline mpc 

56 for flexibility quantification 

57 pos_flex_mpc_module_config: the config for the positive flexibility mpc 

58 for flexibility quantification 

59 neg_flex_mpc_module_config: the config for the negative flexibility mpc 

60 for flexibility quantification 

61 indicator_module_config: the config for the indicator for flexibility quantification 

62 market_module_config: the config for the market for flexibility quantification 

63 

64 """ 

65 

66 orig_mpc_module_config: MPCConfig 

67 baseline_mpc_module_config: MPCConfig 

68 pos_flex_mpc_module_config: MPCConfig 

69 neg_flex_mpc_module_config: MPCConfig 

70 indicator_module_config: FlexibilityIndicatorModuleConfig 

71 market_module_config: FlexibilityMarketModuleConfig 

72 

def __init__(
    self,
    flex_config: Union[str, FilePath, FlexQuantConfig],
    mpc_agent_config: Union[str, FilePath, AgentConfig],
):
    """Load the flex and MPC configs and derive all agent/module configs.

    Args:
        flex_config: Path to the flex config or an already loaded
            FlexQuantConfig. If a path (str or path-like object) is given,
            its base name is reused as the name of the dumped flex config,
            otherwise "flex_config.json" is used.
        mpc_agent_config: Path to the agent config of the original MPC or
            an already loaded AgentConfig.

    """
    self.logger = logging.getLogger(__name__)

    # The former check ``isinstance(flex_config, str or FilePath)`` only
    # tested for ``str`` (``str or FilePath`` evaluates to ``str``), so path
    # objects always ended up with the default file name.
    if isinstance(flex_config, (str, os.PathLike)):
        self.flex_config_file_name = os.path.basename(flex_config)
    else:
        # provide default name for json
        self.flex_config_file_name = "flex_config.json"
    # load configs
    self.flex_config = load_config.load_config(flex_config,
                                               config_type=FlexQuantConfig)

    # initialize module handler class and load custom plugins if specified
    self.module_handler = ModuleHandler(extra_plugins=self.flex_config.custom_plugins)

    # populate flex generator module_types
    self.flex_config.baseline_config_generator_data.module_types = (
        self.module_handler.baseline_module_type_dict)
    self.flex_config.shadow_mpc_config_generator_data.neg_flex.module_types = (
        self.module_handler.shadow_module_type_dict)
    self.flex_config.shadow_mpc_config_generator_data.pos_flex.module_types = (
        self.module_handler.shadow_module_type_dict)

    # original mpc agent
    self.orig_mpc_agent_config = load_config.load_config(
        mpc_agent_config, config_type=AgentConfig
    )
    # baseline agent
    self.baseline_mpc_agent_config = self.orig_mpc_agent_config.__deepcopy__()
    self.baseline_mpc_agent_config.id = (self.flex_config.
                                         baseline_config_generator_data.agent_id)
    # pos agent
    self.pos_flex_mpc_agent_config = self.orig_mpc_agent_config.__deepcopy__()
    self.pos_flex_mpc_agent_config.id = (self.flex_config.
                                         shadow_mpc_config_generator_data.
                                         pos_flex.agent_id)
    # neg agent
    self.neg_flex_mpc_agent_config = self.orig_mpc_agent_config.__deepcopy__()
    self.neg_flex_mpc_agent_config.id = (self.flex_config.
                                         shadow_mpc_config_generator_data.
                                         neg_flex.agent_id)

    # original mpc module
    self.orig_mpc_module_config = self.module_handler.get_module(
        config=self.orig_mpc_agent_config,
        module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
    )
    # baseline module
    self.baseline_mpc_module_config = self.module_handler.get_module(
        config=self.baseline_mpc_agent_config,
        module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
    )
    # convert agentlib_mpc's ModuleConfig to flexquant's ModuleConfig to include
    # additional fields not present in the original
    self.baseline_mpc_module_config = self.module_handler.get_flex_mpc_module_config(
        agent_config=self.baseline_mpc_agent_config,
        module_config=self.baseline_mpc_module_config,
        module_type=self.flex_config.baseline_config_generator_data.module_types[
            self.baseline_mpc_module_config.type
        ]
    )
    # pos module
    self.pos_flex_mpc_module_config = self.module_handler.get_module(
        config=self.pos_flex_mpc_agent_config,
        module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
    )
    # neg module
    self.neg_flex_mpc_module_config = self.module_handler.get_module(
        config=self.neg_flex_mpc_agent_config,
        module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
    )
    # load indicator config
    self.indicator_config = load_config.load_config(
        self.flex_config.indicator_config, config_type=FlexibilityIndicatorConfig
    )
    # load indicator module config
    self.indicator_agent_config = load_config.load_config(
        self.indicator_config.agent_config, config_type=AgentConfig
    )
    self.indicator_module_config = self.module_handler.get_module(
        config=self.indicator_agent_config, module_type=self.indicator_config.module_type
    )
    # load market config (optional, only if one is given in the flex config)
    if self.flex_config.market_config:
        self.market_config = load_config.load_config(
            self.flex_config.market_config, config_type=FlexibilityMarketConfig
        )

        # load market module config
        self.market_agent_config = load_config.load_config(
            self.market_config.agent_config, config_type=AgentConfig
        )

        self.market_module_config = self.module_handler.get_module(
            config=self.market_agent_config, module_type=self.market_config.module_type
        )

    self.run_config_validations()

172 

def generate_flex_agents(self) -> list[str]:
    """Create the agent configs and the python module of the flex agents.

    The module configs are adapted first, then every agent config is dumped
    to json, the model file for the shadow MPCs is generated and finally the
    updated flex config is written.

    Returns:
        list of the full paths of the baseline mpc, pos_flex mpc, neg_flex mpc,
        indicator and (if configured) market config

    """
    baseline_data = self.flex_config.baseline_config_generator_data
    shadow_data = self.flex_config.shadow_mpc_config_generator_data

    # adapt modules to include necessary communication variables
    adapted_baseline = self.adapt_mpc_module_config(
        module_config=self.baseline_mpc_module_config,
        mpc_dataclass=baseline_data,
        agent_id=baseline_data.agent_id,
    )
    adapted_pos_flex = self.adapt_mpc_module_config(
        module_config=self.pos_flex_mpc_module_config,
        mpc_dataclass=shadow_data.pos_flex,
        agent_id=shadow_data.pos_flex.agent_id,
    )
    adapted_neg_flex = self.adapt_mpc_module_config(
        module_config=self.neg_flex_mpc_module_config,
        mpc_dataclass=shadow_data.neg_flex,
        agent_id=shadow_data.neg_flex.agent_id,
    )
    adapted_indicator = self.adapt_indicator_module_config(
        module_config=self.indicator_module_config
    )
    adapted_market = None
    if self.flex_config.market_config:
        adapted_market = self.adapt_market_module_config(
            module_config=self.market_module_config
        )

    # dump jsons of the three MPC agents including the adapted module configs
    mpc_dumps = (
        (adapted_baseline, self.baseline_mpc_agent_config, baseline_data),
        (adapted_pos_flex, self.pos_flex_mpc_agent_config, shadow_data.pos_flex),
        (adapted_neg_flex, self.neg_flex_mpc_agent_config, shadow_data.neg_flex),
    )
    for adapted_module, agent_config, generator_data in mpc_dumps:
        self.append_module_and_dump_agent(
            module=adapted_module,
            agent=agent_config,
            module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
            config_name=generator_data.name_of_created_file,
        )

    # dump indicator and, if configured, market agent
    self.append_module_and_dump_agent(
        module=adapted_indicator,
        agent=self.indicator_agent_config,
        module_type=cmng.INDICATOR_CONFIG_TYPE,
        config_name=self.indicator_config.name_of_created_file,
    )
    if self.flex_config.market_config:
        self.append_module_and_dump_agent(
            module=adapted_market,
            agent=self.market_agent_config,
            module_type=self.market_config.module_type,
            config_name=self.market_config.name_of_created_file,
        )

    # generate python files for the shadow mpcs
    self._generate_flex_model_definition()

    # add new paths to flex config and dump it
    self.adapt_and_dump_flex_config()

    # clean up the created files at interpreter exit if requested
    if self.flex_config.delete_files:
        atexit.register(lambda: self._delete_created_files())

    return self.get_config_file_paths()

251 

def append_module_and_dump_agent(
    self,
    module: BaseModuleConfig,
    agent: AgentConfig,
    module_type: str,
    config_name: str,
):
    """Insert the module config into the agent config and dump it to json.

    Every module of the agent whose "type" equals module_type is replaced by
    the given module config. The agent config is then written to
    ``flex_files_directory / config_name``. If the agent has no modules,
    an error is logged and nothing is written.

    Args:
        module: The module config to be appended.
        agent: The agent config to be updated.
        module_type: The type of the module
        config_name: The name of the json file for module config (e.g. baseline.json)

    """
    # get the module as a dict without default values
    module_dict = cmng.to_dict_and_remove_unnecessary_fields(module=module)
    # write given module to agent config
    for i, agent_module in enumerate(agent.modules):
        if module_type == agent_module["type"]:
            agent.modules[i] = module_dict

    if not agent.modules:
        # use the instance logger (not the root logger) like the rest of the class
        self.logger.error("Provided agent config does not contain any modules.")
        return

    target = os.path.join(self.flex_config.flex_files_directory, config_name)
    if self.flex_config.overwrite_files:
        try:
            # a missing file is the normal case on the first run
            Path(target).unlink(missing_ok=True)
        except OSError as err:
            # stay best-effort, but do not hide the problem
            self.logger.warning(
                "Could not remove existing file %s: %s", target, err
            )
    # dump agent config
    with open(target, "w+", encoding="utf-8") as f:
        module_json = agent.model_dump_json(exclude_defaults=True)
        f.write(module_json)

295 

def get_config_file_paths(self) -> list[str]:
    """Return the paths of the created agent config files.

    The order is baseline, positive flex, negative flex, indicator and,
    only if a market config is set, market. All paths are located in
    ``flex_files_directory``.
    """
    shadow_data = self.flex_config.shadow_mpc_config_generator_data
    file_names = [
        self.flex_config.baseline_config_generator_data.name_of_created_file,
        shadow_data.pos_flex.name_of_created_file,
        shadow_data.neg_flex.name_of_created_file,
        self.indicator_config.name_of_created_file,
    ]
    if self.flex_config.market_config:
        file_names.append(self.market_config.name_of_created_file)
    directory = self.flex_config.flex_files_directory
    return [os.path.join(directory, file_name) for file_name in file_names]

327 

def _delete_created_files(self):
    """Delete the created config files and their folder (run at exit).

    The cleanup is best-effort: files that no longer exist are skipped so
    that the remaining files are still removed, and a folder that cannot be
    removed (e.g. because it still contains other files) only produces a
    warning instead of an exception during interpreter shutdown.
    """
    directory = self.flex_config.flex_files_directory
    to_be_deleted = self.get_config_file_paths()
    # the dumped flex config lives in the same folder
    to_be_deleted.append(os.path.join(directory, self.flex_config_file_name))
    # delete files
    for file in to_be_deleted:
        Path(file).unlink(missing_ok=True)
    # also delete folder
    try:
        Path(directory).rmdir()
    except OSError as err:
        self.logger.warning(
            "Could not remove directory %s: %s", directory, err
        )

342 

def adapt_mpc_module_config(
    self, module_config: MPCConfig, mpc_dataclass: BaseMPCData, agent_id: str
) -> MPCConfig:
    """Adapt the mpc module config for automated flexibility quantification.

    Things adapted among others are:
    - the file name/path of the mpc config file
    - names of the control variables for the shadow mpcs
    - reduce communicated variables of shadow mpcs to outputs
    - add the power variable to the outputs
    - add parameters for the activation and quantification of flexibility

    Note that the arguments are modified in place: the type of
    module_config is overwritten, entries of
    mpc_dataclass.config_inputs_appendix whose name already exists among
    the module inputs are removed from that list, and the values of
    mpc_dataclass.config_parameters_appendix may be overwritten with values
    from the flex config.

    Args:
        module_config: The module config to be adapted
        mpc_dataclass: The dataclass corresponding to the type of the MPC module.
            It contains all the extra data necessary for flexibility
            quantification, which will be used to update the
            module_config.
        agent_id: agent_id for creating the FlexQuant mpc module config

    Returns:
        The adapted module config (a new FlexQuant config object built from
        the dumped module_config)

    """
    # allow the module config to be changed
    # NOTE(review): only module_config_flex is frozen again at the end; the
    # "frozen" flag of module_config itself is not restored here.
    module_config.model_config["frozen"] = False

    # set new MPC type
    module_config.type = mpc_dataclass.module_types[
        cmng.get_orig_module_type(self.orig_mpc_agent_config)
    ]

    # set the MPC config type from the MPCConfig in agentlib_mpc to the
    # corresponding one in flexquant and add additional fields
    module_config_flex_dict = module_config.model_dump()
    module_config_flex_dict["casadi_sim_time_step"] = (
        self.flex_config.casadi_sim_time_step)
    module_config_flex_dict["power_variable_name"] = (
        self.flex_config.baseline_config_generator_data.power_variable)
    module_config_flex_dict["storage_variable_name"] = (
        self.indicator_module_config.correct_costs.stored_energy_variable)
    module_config_flex = self.module_handler.module_type_dict[module_config.type](
        **module_config_flex_dict, _agent_id=agent_id
    )

    # HOTFIX due to AgentLib-MPC bug. Needs to be adapted after Objectives
    # in AgentLib-MPC are fixed.
    if module_config_flex.r_del_u is None:
        module_config_flex = module_config_flex.model_copy(update={"r_del_u": {}})

    # allow the module config to be changed
    module_config_flex.model_config["frozen"] = False

    # NOTE(review): the module_id is assigned twice with the same value;
    # the first assignment looks redundant.
    module_config_flex.module_id = mpc_dataclass.module_id

    # set new id (needed for plotting)
    module_config_flex.module_id = mpc_dataclass.module_id
    # update optimization backend to use the created mpc files and classes
    module_config_flex.optimization_backend["model"]["type"] = {
        "file": os.path.join(
            self.flex_config.flex_files_directory,
            mpc_dataclass.created_flex_mpcs_file,
        ),
        "class_name": mpc_dataclass.class_name,
    }
    # extract filename from results file and update it with
    # suffix and parent directory
    result_filename = Path(
        module_config_flex.optimization_backend["results_file"]
    ).name.replace(".csv", mpc_dataclass.results_suffix)
    full_path = self.flex_config.results_directory / result_filename
    module_config_flex.optimization_backend["results_file"] = str(full_path)
    # change cia backend to custom backend of flexquant
    if module_config_flex.optimization_backend["type"] == "casadi_cia":
        module_config_flex.optimization_backend["type"] = \
            "agentlib_flexquant.casadi_cia_cons"
    # the custom cia backend additionally receives the market time
    if (module_config_flex.optimization_backend["type"] ==
            "agentlib_flexquant.casadi_cia_cons"):
        module_config_flex.optimization_backend["market_time"] = (
            self.flex_config.market_time)

    # add the full control trajectory output from the baseline as input for the
    # shadow mpcs, they are directly included in the optimization problem
    if not isinstance(mpc_dataclass, BaselineMPCData):
        for control in module_config_flex.controls:
            module_config_flex.inputs.append(
                MPCVariable(
                    name=control.name + glbs.full_trajectory_suffix,
                    value=None,
                    type="pd.Series",
                )
            )
            # add full control names to shadow MPC config for inputs tracking
            module_config_flex.full_control_names.append(
                control.name + glbs.full_trajectory_suffix)
            # change the alias of control variable in shadow mpc to
            # prevent it from triggering the wrong callback
            control.alias = control.name + glbs.shadow_suffix
        # also include binary controls
        if hasattr(module_config_flex, "binary_controls"):
            for control in module_config_flex.binary_controls:
                module_config_flex.inputs.append(
                    MPCVariable(
                        name=control.name + glbs.full_trajectory_suffix,
                        value=None,
                        type="pd.Series",
                    )
                )
                # add full control names to shadow MPC config for inputs tracking
                module_config_flex.full_control_names.append(
                    control.name + glbs.full_trajectory_suffix)
                # change the alias of control variable in shadow mpc to
                # prevent it from triggering the wrong callback
                control.alias = control.name + glbs.shadow_suffix
        # only communicate outputs for the shadow mpcs
        module_config_flex.shared_variable_fields = ["outputs"]

        # In addition to creating the full control variables, the inputs
        # and states of the Baseline are communicated to the Shadow MPC
        # to ensure synchronisation. Therefore, all inputs and states of
        # the Baseline are added to the Shadow MPCs with an alias
        baseline_names = {inp.name for inp in
                          self.baseline_mpc_module_config.inputs}
        for i, input in enumerate(module_config_flex.inputs):
            if input.name in baseline_names:
                module_config_flex.inputs[i].alias = (
                    input.name + glbs.base_vars_to_communicate_suffix)

        # add Baseline input names to shadow MPC config for inputs tracking
        # if Baseline variable is also set in config_inputs_appendix this is
        # due to overwriting the alias, so variable should not be added here
        appendix_names = {inp.name for inp in mpc_dataclass.config_inputs_appendix}
        module_config_flex.baseline_input_names = [
            input.name + glbs.base_vars_to_communicate_suffix for input in
            self.baseline_mpc_module_config.inputs
            if input.name not in appendix_names]

        # add custom input names for the shadow MPC to track. Here, the
        # communication suffix is not added, as the user is free to define
        # custom inputs as desired.
        # Exclude in_provision, as this is not regularly set and would prevent
        # the do_step of the shadow MPC.
        module_config_flex.custom_input_names.extend([
            {"name": input.name, "alias": input.alias}
            for input in mpc_dataclass.config_inputs_appendix
            if input.name not in [glbs.PROVISION_VAR_NAME]
        ])

        # NOTE(review): states are matched by object equality here, while
        # inputs above are matched by name - confirm this is intended.
        for i, state in enumerate(module_config_flex.states):
            if state in self.baseline_mpc_module_config.states:
                module_config_flex.states[i].alias = (
                    state.name + glbs.base_vars_to_communicate_suffix)
        # add Baseline state names to shadow MPC config for inputs tracking
        module_config_flex.baseline_state_names = [
            state.name + glbs.base_vars_to_communicate_suffix for state in
            self.baseline_mpc_module_config.states]
        module_config_flex.baseline_agent_id = (
            self.flex_config.baseline_config_generator_data.agent_id)

    else:
        # all the variables here are added to the custom MPCConfig of
        # FlexQuant to avoid them being added to the optimization problem
        # add full_controls trajectory as AgentVariable to the config of
        # Baseline mpc
        for control in module_config_flex.controls:
            module_config_flex.full_controls.append(
                AgentVariable(
                    name=control.name + glbs.full_trajectory_suffix,
                    alias=control.name + glbs.full_trajectory_suffix,
                    shared=True,
                )
            )
        if hasattr(module_config_flex, "binary_controls"):
            for binary_controls in module_config_flex.binary_controls:
                module_config_flex.full_controls.append(
                    AgentVariable(
                        name=binary_controls.name + glbs.full_trajectory_suffix,
                        alias=binary_controls.name + glbs.full_trajectory_suffix,
                        shared=True,
                    )
                )
        # add full controls to custom cia backend to constrain
        # during market time
        if (module_config_flex.optimization_backend["type"] ==
                "agentlib_flexquant.casadi_cia_cons"):
            # maps every full control name to None
            module_config_flex.optimization_backend["full_controls_dict"] = (
                dict(zip([var.name for var in module_config_flex.full_controls],
                         [None] * len(module_config_flex.full_controls))
                     ))
        # add input and states copy variables which send the Baseline inputs
        # to the shadow MPC
        for input in module_config_flex.inputs:
            module_config_flex.vars_to_communicate.append(
                AgentVariable(
                    name=input.name + glbs.base_vars_to_communicate_suffix,
                    alias=input.name + glbs.base_vars_to_communicate_suffix,
                    shared=True,
                )
            )
        for state in module_config_flex.states:
            module_config_flex.vars_to_communicate.append(
                AgentVariable(
                    name=state.name + glbs.base_vars_to_communicate_suffix,
                    alias=state.name + glbs.base_vars_to_communicate_suffix,
                    shared=True,
                )
            )

    module_config_flex.set_outputs = True
    # add outputs for the power variables, for easier handling create a lookup dict
    output_dict = {output.name: output for output in module_config_flex.outputs}
    if self.flex_config.baseline_config_generator_data.power_variable in output_dict:
        output_dict[
            self.flex_config.baseline_config_generator_data.power_variable
        ].alias = mpc_dataclass.power_alias
    else:
        module_config_flex.outputs.append(
            MPCVariable(
                name=self.flex_config.baseline_config_generator_data.power_variable,
                alias=mpc_dataclass.power_alias,
            )
        )
    # add or change alias for stored energy variable
    # (raises a KeyError if the stored energy variable is not among the outputs)
    if self.indicator_module_config.correct_costs.enable_energy_costs_correction:
        output_dict[
            self.indicator_module_config.correct_costs.stored_energy_variable
        ].alias = mpc_dataclass.stored_energy_alias

    # add extra inputs needed for activation of flex or custom cost functions
    existing_input_names = {inp.name: idx for idx, inp in
                            enumerate(module_config_flex.inputs)}
    # iterate over a copy, because entries are removed from the list in the loop
    for appendix_inp in mpc_dataclass.config_inputs_appendix.copy():
        # If variable already exists in the config
        if appendix_inp.name in existing_input_names:
            self.logger.warning(f"The given variable {appendix_inp.name} in the "
                                f"config_inputs_appendix already exists in the MPC "
                                f"model. I am updating the alias of the existing "
                                f"variable to {appendix_inp.alias} (provided by you). "
                                f"However, this can still cause issues down the line "
                                f"if the alias is not chosen wisely.")
            # Update only the alias of the existing input
            idx = existing_input_names[appendix_inp.name]
            existing_inp = module_config_flex.inputs[idx]
            inp_dict = existing_inp.dict()
            inp_dict["alias"] = appendix_inp.alias
            # rebuild the variable with the same class and the new alias
            module_config_flex.inputs[idx] = type(existing_inp)(**inp_dict)
            # Remove variable from appendix list to avoid creation during parsing
            mpc_dataclass.config_inputs_appendix.remove(appendix_inp)
        else:
            # Add the new input
            module_config_flex.inputs.append(appendix_inp)

    # add extra parameters needed for activation of flex or custom weights
    for var in mpc_dataclass.config_parameters_appendix:
        if var.name in self.flex_config.model_fields:
            var.value = getattr(self.flex_config, var.name)
        # a field of the baseline generator data with the same name takes
        # precedence, as it is assigned last
        if var.name in self.flex_config.baseline_config_generator_data.model_fields:
            var.value = getattr(self.flex_config.baseline_config_generator_data,
                                var.name)
    module_config_flex.parameters.extend(mpc_dataclass.config_parameters_appendix)

    # freeze the config again
    module_config_flex.model_config["frozen"] = True

    return module_config_flex

608 

def adapt_indicator_module_config(
    self, module_config: FlexibilityIndicatorModuleConfig
) -> FlexibilityIndicatorModuleConfig:
    """Adapt the indicator module config for automated flexibility
    quantification.

    The two user-defined price variables are appended to the inputs, the
    time related parameters are filled from the flex config and the baseline
    MPC config, and the power unit and results file are set.

    Args:
        module_config: The indicator module config, which is modified in place.

    Returns:
        The same, adapted module config.

    """
    # append user-defined price vars to indicator module config
    price_inputs = (
        ("price_variable", "electricity price"),
        ("price_variable_feed_in", "electricity feed-in price"),
    )
    for attribute, description in price_inputs:
        module_config.inputs.append(
            AgentVariable(
                name=getattr(module_config, attribute),
                unit="ct/kWh",
                type="pd.Series",
                description=description,
            )
        )

    # values are resolved lazily, so the collocation grid is only computed
    # if the corresponding parameter is present
    value_sources = (
        (glbs.PREP_TIME, lambda: self.flex_config.prep_time),
        (glbs.MARKET_TIME, lambda: self.flex_config.market_time),
        (glbs.FLEX_EVENT_DURATION, lambda: self.flex_config.flex_event_duration),
        (glbs.TIME_STEP, lambda: self.baseline_mpc_module_config.time_step),
        (
            glbs.PREDICTION_HORIZON,
            lambda: self.baseline_mpc_module_config.prediction_horizon,
        ),
        (
            glbs.COLLOCATION_TIME_GRID,
            lambda: self.get_collocation_time_grid(
                discretization_options=(
                    self.baseline_mpc_module_config.optimization_backend[
                        "discretization_options"
                    ]
                )
            ),
        ),
    )

    # allow the module config to be changed
    module_config.model_config["frozen"] = False
    for parameter in module_config.parameters:
        for parameter_name, resolve_value in value_sources:
            if parameter.name == parameter_name:
                parameter.value = resolve_value()
    # set power unit
    module_config.power_unit = (
        self.flex_config.baseline_config_generator_data.power_unit)
    # keep the file name, but move the results into the results directory
    results_name = module_config.results_file.name
    module_config.results_file = self.flex_config.results_directory / results_name
    # freeze the config again
    module_config.model_config["frozen"] = True
    return module_config

661 

def adapt_market_module_config(
    self, module_config: FlexibilityMarketModuleConfig
) -> FlexibilityMarketModuleConfig:
    """Adapt the market module config for automated flexibility quantification.

    All fields that also exist on ``self.market_module_config`` are copied
    from it, the results file is moved into the results directory and the
    collocation time grid and time step parameters are filled from the
    baseline MPC config.

    Args:
        module_config: The market module config, which is modified in place.

    Returns:
        The same, adapted module config.

    """
    # allow the module config to be changed
    module_config.model_config["frozen"] = False
    # ``model_fields`` is read from the classes; it replaces the deprecated
    # pydantic v1 alias ``__fields__`` and matches the v2 API used elsewhere
    # in this class.
    source_fields = type(self.market_module_config).model_fields
    for field in type(module_config).model_fields:
        if field in source_fields:
            setattr(module_config, field, getattr(self.market_module_config, field))
    # keep the file name, but move the results into the results directory
    module_config.results_file = (
        self.flex_config.results_directory / module_config.results_file.name
    )
    for parameter in module_config.parameters:
        if parameter.name == glbs.COLLOCATION_TIME_GRID:
            dis_op = self.baseline_mpc_module_config.optimization_backend[
                "discretization_options"
            ]
            parameter.value = self.get_collocation_time_grid(
                discretization_options=dis_op
            )
        if parameter.name == glbs.TIME_STEP:
            parameter.value = self.baseline_mpc_module_config.time_step
    # freeze the config again
    module_config.model_config["frozen"] = True
    return module_config

687 

def adapt_and_dump_flex_config(self):
    """Point the flex config to the created agent configs and dump it.

    The market (only if one is configured) and indicator entries of
    ``self.flex_config`` are replaced by ``self.market_config`` and
    ``self.indicator_config``. Their ``agent_config`` attributes are set to
    the paths of the created agent config files inside
    ``flex_files_directory``.

    Afterwards the flex config is serialized without default values and
    written as JSON to ``flex_files_directory / flex_config_file_name``.
    """
    flex_config = self.flex_config

    # store market with file path of created agent config
    if flex_config.market_config:
        flex_config.market_config = self.market_config
        market_agent_path = os.path.join(
            flex_config.flex_files_directory,
            self.market_config.name_of_created_file,
        )
        flex_config.market_config.agent_config = market_agent_path

    # store indicator with file path of created agent config
    flex_config.indicator_config = self.indicator_config
    indicator_agent_path = os.path.join(
        flex_config.flex_files_directory,
        self.indicator_config.name_of_created_file,
    )
    flex_config.indicator_config.agent_config = indicator_agent_path

    # save flex config to created flex files
    dump_path = os.path.join(
        flex_config.flex_files_directory, self.flex_config_file_name
    )
    with open(dump_path, "w", encoding="utf-8") as json_file:
        json_file.write(flex_config.model_dump_json(exclude_defaults=True))

721 

def get_collocation_time_grid(self, discretization_options: dict):
    """Get the mpc output collocation grid over the horizon.

    Args:
        discretization_options: Must contain the keys "collocation_method"
            and "collocation_order".

    Returns:
        List of the collocation time points over the prediction horizon of
        the baseline MPC, without the points that coincide with the start of
        a discretization interval.

    """
    # mpc time grid configuration of the baseline
    baseline_config = self.baseline_mpc_module_config
    step = baseline_config.time_step
    horizon = baseline_config.prediction_horizon
    # collocation configuration
    method = discretization_options["collocation_method"]
    order = discretization_options["collocation_order"]
    # collocation points as given by the collocation polynomial
    backend_options = CasadiDiscretizationOptions(
        collocation_order=order, collocation_method=method
    )
    collocation = DirectCollocation(options=backend_options)
    roots = collocation._collocation_polynomial().root
    # start time of every discretization interval
    interval_starts = np.arange(0, step * horizon, step)
    # shift the scaled collocation points into every interval
    grid = (interval_starts[:, None] + roots * step).ravel()
    # drop points which coincide with an interval start
    on_interval_start = np.isin(grid, interval_starts)
    return grid[~on_interval_start].tolist()

746 

def _generate_flex_model_definition(self):
    """Generate the python module with the baseline and shadow MPC models.

    The model file referenced by the original MPC config is parsed once and
    rewritten three times with ``SetupSystemModifier`` (baseline, positive
    and negative flexibility). The three results are concatenated, formatted
    with black and written to
    ``flex_files_directory / created_flex_mpcs_file``.

    Before the file is generated, the flexibility cost functions of both
    shadow MPCs are checked against the variables of the casadi model config
    (see ``check_variables_in_casadi_config``).
    """
    output_file = os.path.join(
        self.flex_config.flex_files_directory,
        self.flex_config.baseline_config_generator_data.created_flex_mpcs_file,
    )
    opt_backend = self.orig_mpc_module_config.optimization_backend["model"]["type"]

    # Extract the config class of the casadi model to check cost functions
    config_class = inspect.get_annotations(custom_injection(opt_backend))["config"]
    # Get custom model fields provided by the user. Build a filtered copy
    # instead of popping "type" in place: the dict belongs to the baseline
    # MPC module config and is read again elsewhere (e.g. validations).
    baseline_model = self.baseline_mpc_module_config.optimization_backend["model"]
    model_fields = {
        key: value for key, value in baseline_model.items() if key != "type"
    }
    config_instance = config_class(**model_fields)

    shadow_data = self.flex_config.shadow_mpc_config_generator_data
    # The " + " is just there to simplify the validation, it does not affect
    # the generated code
    for shadow_mpc_type, mpc_data in (
        ("neg_flex", shadow_data.neg_flex),
        ("pos_flex", shadow_data.pos_flex),
    ):
        cost_expression = mpc_data.flex_cost_function
        if mpc_data.flex_cost_function_appendix:
            cost_expression = (
                cost_expression + " + " + mpc_data.flex_cost_function_appendix
            )
        self.check_variables_in_casadi_config(
            config_instance, cost_expression, shadow_mpc_type=shadow_mpc_type
        )

    # parse mpc python file
    with open(opt_backend["file"], "r", encoding="utf-8") as f:
        source = f.read()
    tree = ast.parse(source)

    # create modifiers for python file (order defines the order in the output)
    modifiers = [
        SetupSystemModifier(
            mpc_data=mpc_data,
            controls=module_config.controls,
            # not every MPC module config defines binary controls
            binary_controls=getattr(module_config, "binary_controls", None),
        )
        for mpc_data, module_config in (
            (
                self.flex_config.baseline_config_generator_data,
                self.baseline_mpc_module_config,
            ),
            (shadow_data.pos_flex, self.pos_flex_mpc_module_config),
            (shadow_data.neg_flex, self.neg_flex_mpc_module_config),
        )
    ]
    # run the modifications, each on its own copy of the parsed tree, and
    # combine them to one module
    modified_tree = ast.Module(body=[], type_ignores=[])
    for modifier in modifiers:
        modified_tree.body.extend(modifier.visit(deepcopy(tree)).body)
    modified_source = astor.to_source(modified_tree)
    # Use black to format the generated code
    formatted_code = black.format_str(modified_source, mode=black.FileMode())

    if self.flex_config.overwrite_files:
        # best effort removal of a previously generated file; it is fine if
        # the file does not exist yet
        try:
            Path(output_file).unlink()
        except OSError:
            pass

    with open(output_file, "w", encoding="utf-8") as f:
        f.write(formatted_code)

834 

def check_variables_in_casadi_config(self, config: CasadiModelConfig, expr: str,
                                     shadow_mpc_type: str):
    """Warn about variables of a cost expression that are not defined.

    Every attribute access found in ``expr`` (e.g. ``T`` in ``self.T``) is
    treated as a variable name. A name counts as known if it is returned by
    ``config.get_variable_names()`` or if it is the name of a parameter or
    input listed in the appendix of the selected shadow MPC data.

    No exception is raised for unknown names; they are only reported through
    ``self.logger.warning``.

    Args:
        config: casadi model config.
        expr: The expression to check.
        shadow_mpc_type: ``"pos_flex"`` selects the positive flexibility data;
            any other value selects the negative flexibility data.

    Raises:
        SyntaxError: If ``expr`` cannot be parsed as python code.

    """
    variables_in_config = set(config.get_variable_names())
    # names of all attribute accesses in the expression
    variables_in_cost_function = {
        node.attr
        for node in ast.walk(ast.parse(expr))
        if isinstance(node, ast.Attribute)
    }
    shadow_data = self.flex_config.shadow_mpc_config_generator_data
    flex_config_data = (
        shadow_data.pos_flex if shadow_mpc_type == "pos_flex"
        else shadow_data.neg_flex
    )
    # variables that are added to the shadow mpc via the config appendices
    variables_newly_created = {
        par.name for par in flex_config_data.config_parameters_appendix
    } | {
        inp.name for inp in flex_config_data.config_inputs_appendix
    }

    unknown_vars = (variables_in_cost_function - variables_in_config -
                    variables_newly_created)
    if unknown_vars:
        self.logger.warning(f"Unknown variables in new cost function: "
                            f"{unknown_vars}. This might cause problems with "
                            f"the optimization backend.")

867 

def run_config_validations(self):
    """Function to validate integrity of user-supplied flex config.

    Since the validation depends on interactions between multiple configurations,
    it is performed within this function rather than using Pydantic's built-in
    validators for individual configurations.

    The following checks are performed:
    1. Ensures the specified power variable exists in the MPC config outputs.
    2. Ensures the specified comfort variable (if given) exists in the states
       of the MPC model class.
    3. Validates that the stored energy variable exists in MPC outputs if
       energy cost correction is enabled.
    4. Ensures a collocation method is configured. If it is not 'legendre',
       a warning is logged and the baseline and both shadow MPC configs are
       switched to 'legendre'.
    5. Ensures that the sum of prep time, market time, and flex event duration
       does not exceed time step times prediction horizon.
    6. Ensures market time is an integer multiple of the MPC time step if a
       market config is present.
    7. Ensures that all flex time values are multiples of the MPC time step.
    8. Checks for mismatches between time-related parameters in the flex/MPC and
       indicator configs and logs warnings when discrepancies exist, naming the
       flex/MPC config values as the ones that will be used.

    Raises:
        ConfigurationError: If one of the checks 1-7 fails.

    """
    baseline_config = self.baseline_mpc_module_config
    baseline_data = self.flex_config.baseline_config_generator_data
    output_names = {output.name for output in baseline_config.outputs}

    # check if the power variable exists in the mpc config
    power_var = baseline_data.power_variable
    if power_var not in output_names:
        raise ConfigurationError(
            f"Given power variable {power_var} is not defined "
            f"as output in baseline mpc config."
        )

    # check if the comfort variable exists in the states of the mpc model
    mod_type = baseline_config.optimization_backend["model"]["type"]
    comfort_var = baseline_data.comfort_variable
    if comfort_var:
        # Get the model class and instantiate it to read its states
        dynamic_class = cmng.get_class_from_file(
            mod_type["file"], mod_type["class_name"]
        )
        if comfort_var not in [state.name for state in dynamic_class().states]:
            raise ConfigurationError(
                f"Given comfort variable "
                f"{comfort_var} "
                f"is not defined as state in baseline mpc config."
            )

    # check if the energy storage variable exists in the mpc config
    correct_costs = self.indicator_module_config.correct_costs
    if correct_costs.enable_energy_costs_correction:
        if correct_costs.stored_energy_variable not in output_names:
            raise ConfigurationError(
                f"The stored energy variable "
                f"{correct_costs.stored_energy_variable} "
                f"is not defined in baseline mpc config. "
                f"It must be defined in the base MPC model and config as output "
                f"if the correction of costs is enabled."
            )

    # raise warning if unsupported collocation method is used and change
    # to supported method; a missing "discretization_options" entry is
    # treated like a missing collocation method
    discretization_options = (
        baseline_config.optimization_backend.get("discretization_options") or {}
    )
    if "collocation_method" not in discretization_options:
        raise ConfigurationError(
            "Please use collocation as discretization method and define the "
            "collocation_method in the mpc config"
        )
    collocation_method = discretization_options["collocation_method"]
    if collocation_method != "legendre":
        self.logger.warning(
            "Collocation method %s is not supported. Switching to "
            "method legendre.",
            collocation_method,
        )
        for module_config in (
            baseline_config,
            self.pos_flex_mpc_module_config,
            self.neg_flex_mpc_module_config,
        ):
            module_config.optimization_backend["discretization_options"][
                "collocation_method"
            ] = "legendre"

    # time data validations
    flex_times = {
        glbs.PREP_TIME: self.flex_config.prep_time,
        glbs.MARKET_TIME: self.flex_config.market_time,
        glbs.FLEX_EVENT_DURATION: self.flex_config.flex_event_duration,
    }
    mpc_times = {
        glbs.TIME_STEP: baseline_config.time_step,
        glbs.PREDICTION_HORIZON: baseline_config.prediction_horizon,
    }
    # read the values back with the same constants that key the dicts
    time_step = mpc_times[glbs.TIME_STEP]
    prediction_horizon = mpc_times[glbs.PREDICTION_HORIZON]

    # total time length check (prep+market+flex_event)
    if sum(flex_times.values()) > time_step * prediction_horizon:
        raise ConfigurationError(
            "Market time + prep time + flex event duration "
            "can not exceed the prediction horizon."
        )
    # market time val check
    if self.flex_config.market_config:
        if flex_times[glbs.MARKET_TIME] % time_step != 0:
            raise ConfigurationError(
                "Market time must be an integer multiple of the time step."
            )
    # check for divisibility of flex_times by time_step
    for name, value in flex_times.items():
        if value % time_step != 0:
            raise ConfigurationError(
                f"{name} is not a multiple of the time step. Please redefine."
            )
    # raise warning if parameter value in flex indicator module config differs from
    # value in flex config/ baseline mpc module config
    for parameter in self.indicator_module_config.parameters:
        if parameter.value is None:
            continue
        if parameter.name in flex_times:
            if parameter.value != flex_times[parameter.name]:
                self.logger.warning(
                    "Value mismatch for %s in flex config (field) "
                    "and indicator module config (parameter). "
                    "Flex config value will be used.",
                    parameter.name,
                )
        elif parameter.name in mpc_times:
            if parameter.value != mpc_times[parameter.name]:
                self.logger.warning(
                    "Value mismatch for %s in baseline MPC module "
                    "config (field) and indicator module config (parameter). "
                    "Baseline MPC module config value will be used.",
                    parameter.name,
                )

1015 

def adapt_sim_results_path(self, simulator_agent_config: Union[str, Path],
                           save_name_suffix: str = "") -> Union[str, Path]:
    """
    Optional helper function to adapt file path for simulator results in sim config,
    so that sim results land in the same results directory as flex results.

    The adapted config is written next to the original config file as
    ``<original stem><save_name_suffix>.json``. With an empty suffix and a
    ``.json`` input file this overwrites the original file.

    Args:
        simulator_agent_config: Path to the simulator agent config JSON file.
        save_name_suffix: Suffix added to the newly created sim_config file.

    Returns:
        Path of the written simulator config file.

    Raises:
        FileNotFoundError: If the specified config file does not exist.
        ConfigurationError: If the config contains no module of type
            ``"simulator"``.
        Exception: If the adapted config could not be written.

    """
    simulator_agent_config = Path(simulator_agent_config)
    # open config and extract sim module
    with open(simulator_agent_config, "r", encoding="utf-8") as f:
        sim_config = json.load(f)
    sim_module_config = next(
        (module for module in sim_config["modules"]
         if module["type"] == "simulator"), None)
    if sim_module_config is None:
        raise ConfigurationError(
            f"No module of type 'simulator' found in {simulator_agent_config}."
        )
    # convert filename string to path and extract the name
    sim_file_name = Path(sim_module_config["result_filename"]).name
    # set results path so that sim results lands in same directory
    # as flex result CSVs
    sim_module_config["result_filename"] = str(
        self.flex_config.results_directory / sim_file_name
    )
    # build the target path with pathlib so the separator fits the platform
    new_config_path = simulator_agent_config.parent / (
        simulator_agent_config.stem + save_name_suffix + ".json"
    )
    try:
        with open(new_config_path, "w", encoding="utf-8") as f:
            json.dump(sim_config, f, indent=4)
    except Exception as e:
        # keep the original exception attached as cause for debugging
        raise Exception(f"Could not adapt and create a new simulation config "
                        f"due to: {e}. "
                        f"Please check {simulator_agent_config} and "
                        f"'{save_name_suffix}'") from e
    return new_config_path

1058 f"'{save_name_suffix}'")