Coverage for agentlib_flexquant/generate_flex_agents.py: 92%

296 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-10-20 14:09 +0000

1"""Generate agents for flexibility quantification. 

2 

3This module provides the FlexAgentGenerator class that creates and configures flexibility agents. 

4The agents created include the baseline, positive and negative flexibility agents, 

5the flexibility indicator and market agents. The agents are created based on the flex config and 

6the MPC config. 

7""" 

8import ast 

9import atexit 

10import inspect 

11import json 

12import logging 

13import os 

14from copy import deepcopy 

15from pathlib import Path 

16from typing import Union 

17 

18import astor 

19import black 

20import json 

21import numpy as np 

22from copy import deepcopy 

23from pathlib import Path 

24from typing import List, Union 

25from pydantic import FilePath 

26from agentlib.core.agent import AgentConfig 

27from agentlib.core.datamodels import AgentVariable 

28from agentlib.core.errors import ConfigurationError 

29from agentlib.core.module import BaseModuleConfig 

30from agentlib.utils import custom_injection, load_config 

31from agentlib_mpc.data_structures.mpc_datamodels import MPCVariable 

32from agentlib_mpc.models.casadi_model import CasadiModelConfig 

33from agentlib_mpc.modules.mpc_full import MPCConfig 

34 

35from agentlib_mpc.optimization_backends.casadi_.basic import DirectCollocation 

36from agentlib_mpc.data_structures.casadi_utils import CasadiDiscretizationOptions 

37import agentlib_flexquant.data_structures.globals as glbs 

38import agentlib_flexquant.utils.config_management as cmng 

39from agentlib_flexquant.utils.parsing import SetupSystemModifier 

40from agentlib_flexquant.data_structures.flexquant import ( 

41 FlexibilityIndicatorConfig, 

42 FlexibilityMarketConfig, 

43 FlexQuantConfig, 

44) 

45from agentlib_flexquant.data_structures.mpcs import BaselineMPCData, BaseMPCData 

46from agentlib_flexquant.modules.flexibility_indicator import ( 

47 FlexibilityIndicatorModuleConfig, 

48) 

49from agentlib_flexquant.modules.flexibility_market import FlexibilityMarketModuleConfig 

50 

51 

52class FlexAgentGenerator: 

53 """Class for generating the flex agents 

54 

55 orig_mpc_module_config: the config for the original mpc, 

56 which has nothing to do with the flexibility quantification 

57 baseline_mpc_module_config: the config for the baseline mpc 

58 for flexibility quantification 

59 pos_flex_mpc_module_config: the config for the positive flexibility mpc 

60 for flexibility quantification 

61 neg_flex_mpc_module_config: the config for the negative flexibility mpc 

62 for flexibility quantification 

63 indicator_module_config: the config for the indicator for flexibility quantification 

64 market_module_config: the config for the market for flexibility quantification 

65 

66 """ 

67 

68 orig_mpc_module_config: MPCConfig 

69 baseline_mpc_module_config: MPCConfig 

70 pos_flex_mpc_module_config: MPCConfig 

71 neg_flex_mpc_module_config: MPCConfig 

72 indicator_module_config: FlexibilityIndicatorModuleConfig 

73 market_module_config: FlexibilityMarketModuleConfig 

74 

def __init__(
    self,
    flex_config: Union[str, FilePath, FlexQuantConfig],
    mpc_agent_config: Union[str, FilePath, AgentConfig],
):
    """Load every config that is needed to generate the flexibility agents.

    The original MPC agent config is loaded once and deep-copied three times, so that
    the baseline, the positive and the negative flexibility agent can be adapted
    independently of each other. Afterwards the indicator config and, if given,
    the market config are loaded and the combination of configs is validated.

    Args:
        flex_config: The flex config, either as config object or as path to a file.
        mpc_agent_config: The agent config of the original MPC, either as config
            object or as path to a file.

    """
    self.logger = logging.getLogger(__name__)

    # NOTE: the former check ``isinstance(flex_config, str or FilePath)`` only tested
    # for ``str``, because ``str or FilePath`` evaluates to ``str``. Path objects are
    # now recognised as well, so their file name is kept for the dumped flex config.
    if isinstance(flex_config, (str, os.PathLike)):
        self.flex_config_file_name = os.path.basename(flex_config)
    else:
        # provide default name for json
        self.flex_config_file_name = "flex_config.json"
    # load configs
    self.flex_config = load_config.load_config(flex_config, config_type=FlexQuantConfig)

    # original mpc agent
    self.orig_mpc_agent_config = load_config.load_config(
        mpc_agent_config, config_type=AgentConfig
    )
    # independent copies of the original agent for baseline, pos and neg flexibility
    self.baseline_mpc_agent_config = deepcopy(self.orig_mpc_agent_config)
    self.pos_flex_mpc_agent_config = deepcopy(self.orig_mpc_agent_config)
    self.neg_flex_mpc_agent_config = deepcopy(self.orig_mpc_agent_config)

    # the module type of the original mpc selects the mpc module in every agent copy
    orig_module_type = cmng.get_orig_module_type(self.orig_mpc_agent_config)

    # original mpc module
    self.orig_mpc_module_config = cmng.get_module(
        config=self.orig_mpc_agent_config,
        module_type=orig_module_type,
    )
    # baseline module
    self.baseline_mpc_module_config = cmng.get_module(
        config=self.baseline_mpc_agent_config,
        module_type=orig_module_type,
    )
    # convert agentlib_mpc's ModuleConfig to flexquant's ModuleConfig to include additional
    # fields not present in the original
    self.baseline_mpc_module_config = cmng.get_flex_mpc_module_config(
        agent_config=self.baseline_mpc_agent_config,
        module_config=self.baseline_mpc_module_config,
        module_type=self.flex_config.baseline_config_generator_data.module_types[
            self.baseline_mpc_module_config.type
        ],
    )
    # pos module
    self.pos_flex_mpc_module_config = cmng.get_module(
        config=self.pos_flex_mpc_agent_config,
        module_type=orig_module_type,
    )
    # neg module
    self.neg_flex_mpc_module_config = cmng.get_module(
        config=self.neg_flex_mpc_agent_config,
        module_type=orig_module_type,
    )
    # load indicator config
    self.indicator_config = load_config.load_config(
        self.flex_config.indicator_config, config_type=FlexibilityIndicatorConfig
    )
    # load indicator module config
    self.indicator_agent_config = load_config.load_config(
        self.indicator_config.agent_config, config_type=AgentConfig
    )
    self.indicator_module_config = cmng.get_module(
        config=self.indicator_agent_config, module_type=cmng.INDICATOR_CONFIG_TYPE
    )
    # load market config
    if self.flex_config.market_config:
        self.market_config = load_config.load_config(
            self.flex_config.market_config, config_type=FlexibilityMarketConfig
        )
        # load market module config
        self.market_agent_config = load_config.load_config(
            self.market_config.agent_config, config_type=AgentConfig
        )
        self.market_module_config = cmng.get_module(
            config=self.market_agent_config, module_type=cmng.MARKET_CONFIG_TYPE
        )
    else:
        # without a market no market time is reserved
        self.flex_config.market_time = 0

    self.run_config_validations()

157 

def generate_flex_agents(self) -> list[str]:
    """Create the agent configs and the python model file of the flexibility agents.

    The module configs are adapted first (baseline, positive flexibility, negative
    flexibility, indicator and, if configured, market). Afterwards every agent config
    is dumped to json, the python file with the flex models is generated and the
    flex config itself is written next to the created files.

    Returns:
        list of the full path for baseline mpc, pos_flex mpc, neg_flex mpc, indicator
        and market config

    """
    flex_cfg = self.flex_config
    shadow_data = flex_cfg.shadow_mpc_config_generator_data

    # adapt the three mpc modules to include the necessary communication variables
    mpc_jobs = (
        (
            self.baseline_mpc_module_config,
            flex_cfg.baseline_config_generator_data,
            self.baseline_mpc_agent_config,
        ),
        (self.pos_flex_mpc_module_config, shadow_data.pos_flex, self.pos_flex_mpc_agent_config),
        (self.neg_flex_mpc_module_config, shadow_data.neg_flex, self.neg_flex_mpc_agent_config),
    )
    adapted_mpcs = []
    for mpc_module, mpc_data, mpc_agent in mpc_jobs:
        adapted = self.adapt_mpc_module_config(
            module_config=mpc_module,
            mpc_dataclass=mpc_data,
            agent_id=mpc_agent.id,
        )
        adapted_mpcs.append((adapted, mpc_agent, mpc_data))

    indicator_module = self.adapt_indicator_config(module_config=self.indicator_module_config)
    if flex_cfg.market_config:
        market_module = self.adapt_market_config(module_config=self.market_module_config)

    # dump jsons of the agents including the adapted module configs
    for adapted, mpc_agent, mpc_data in adapted_mpcs:
        self.append_module_and_dump_agent(
            module=adapted,
            agent=mpc_agent,
            module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
            config_name=mpc_data.name_of_created_file,
        )
    self.append_module_and_dump_agent(
        module=indicator_module,
        agent=self.indicator_agent_config,
        module_type=cmng.INDICATOR_CONFIG_TYPE,
        config_name=self.indicator_config.name_of_created_file,
    )
    if flex_cfg.market_config:
        self.append_module_and_dump_agent(
            module=market_module,
            agent=self.market_agent_config,
            module_type=cmng.MARKET_CONFIG_TYPE,
            config_name=self.market_config.name_of_created_file,
        )

    # generate python files for the shadow mpcs
    self._generate_flex_model_definition()

    # save flex config to created flex files
    flex_config_path = os.path.join(flex_cfg.flex_files_directory, self.flex_config_file_name)
    with open(flex_config_path, "w", encoding="utf-8") as f:
        f.write(self.flex_config.model_dump_json(exclude_defaults=True))

    # register the exit function if the corresponding flag is set
    if self.flex_config.delete_files:
        atexit.register(lambda: self._delete_created_files())
    return self.get_config_file_paths()

236 

def append_module_and_dump_agent(
    self,
    module: BaseModuleConfig,
    agent: AgentConfig,
    module_type: str,
    config_name: str,
):
    """Write the given module config into the agent config and dump the agent as json.

    Every module of the agent whose type maps to the same module class as
    ``module_type`` is replaced by the given module. The json file is created in the
    flex files directory and named after ``config_name``. If the agent has no modules,
    nothing is written and an error is logged.

    Args:
        module: The module config to be appended.
        agent: The agent config to be updated.
        module_type: The type of the module
        config_name: The name of the json file for module config (e.g. baseline.json)

    """
    # if module is not from the baseline, set a new agent id, based on module id.
    # Compare by value: an identity check (``is not``) on type strings only works by
    # accident of string interning and is always true for equal but distinct objects.
    if module.type != self.baseline_mpc_module_config.type:
        agent.id = module.module_id
    # get the module as a dict without default values
    module_dict = cmng.to_dict_and_remove_unnecessary_fields(module=module)
    # write given module to agent config
    for i, agent_module in enumerate(agent.modules):
        # classes are singletons, so the identity comparison is intended here
        if cmng.MODULE_TYPE_DICT[module_type] is cmng.MODULE_TYPE_DICT[agent_module["type"]]:
            agent.modules[i] = module_dict

    if not agent.modules:
        # use the logger of the generator instead of the root logger
        self.logger.error("Provided agent config does not contain any modules.")
        return

    # dump agent config
    config_path = os.path.join(self.flex_config.flex_files_directory, config_name)
    if self.flex_config.overwrite_files:
        try:
            Path(config_path).unlink()
        except OSError:
            # best effort: the file usually does not exist yet and is rewritten below
            pass
    with open(config_path, "w+", encoding="utf-8") as f:
        f.write(agent.model_dump_json(exclude_defaults=True))

282 

def get_config_file_paths(self) -> list[str]:
    """Return the paths of the created agent config files.

    The order is baseline mpc, positive flexibility mpc, negative flexibility mpc and
    indicator. The market config is appended last, if a market config is set.
    """
    flex_cfg = self.flex_config
    shadow_data = flex_cfg.shadow_mpc_config_generator_data
    file_names = [
        flex_cfg.baseline_config_generator_data.name_of_created_file,
        shadow_data.pos_flex.name_of_created_file,
        shadow_data.neg_flex.name_of_created_file,
        self.indicator_config.name_of_created_file,
    ]
    # the market agent is optional
    if flex_cfg.market_config:
        file_names.append(self.market_config.name_of_created_file)
    return [os.path.join(flex_cfg.flex_files_directory, name) for name in file_names]

311 

def _delete_created_files(self):
    """Delete the created flex files and their directory; meant to run at exit.

    Removed are the created agent configs, the dumped flex config and the generated
    python file with the flex mpc models. Files that are already gone are skipped, so
    the function can safely run more than once. If the directory cannot be removed,
    e.g. because it still holds other files, it is kept and a warning is logged.
    """
    flex_dir = self.flex_config.flex_files_directory
    to_be_deleted = self.get_config_file_paths()
    to_be_deleted.append(os.path.join(flex_dir, self.flex_config_file_name))
    # the generated model file is written to the same directory; as long as it exists
    # the directory is not empty and cannot be removed
    to_be_deleted.append(
        os.path.join(
            flex_dir,
            self.flex_config.baseline_config_generator_data.created_flex_mpcs_file,
        )
    )
    # delete files
    for file in to_be_deleted:
        Path(file).unlink(missing_ok=True)
    # also delete folder
    try:
        Path(flex_dir).rmdir()
    except OSError as err:
        self.logger.warning("Could not remove directory %s: %s", flex_dir, err)

326 

def adapt_mpc_module_config(
    self, module_config: MPCConfig, mpc_dataclass: BaseMPCData, agent_id: str
) -> MPCConfig:
    """Build the flexquant mpc module config from an agentlib_mpc module config.

    Things adapted among others are:
    - the type of the module and the file/class of the model used by the backend
    - the path of the results file
    - names of the control variables for the shadow mpcs
    - reduce communicated variables of shadow mpcs to outputs
    - add the power variable to the outputs
    - add parameters for the activation and quantification of flexibility

    Args:
        module_config: The module config to be adapted
        mpc_dataclass: The dataclass corresponding to the type of the MPC module.
            It contains all the extra data necessary for flexibility quantification,
            which will be used to update the module_config.
        agent_id: agent_id for creating the flexquant mpc module config

    Returns:
        The adapted module config

    """
    flex_cfg = self.flex_config
    baseline_data = flex_cfg.baseline_config_generator_data
    correct_costs = self.indicator_module_config.correct_costs

    # unfreeze the incoming config and switch it to the new MPC type
    module_config.model_config["frozen"] = False
    module_config.type = mpc_dataclass.module_types[
        cmng.get_orig_module_type(self.orig_mpc_agent_config)
    ]

    # re-create the config as the corresponding flexquant config class, which knows
    # the additional fields
    flex_fields = module_config.model_dump()
    flex_fields["casadi_sim_time_step"] = flex_cfg.casadi_sim_time_step
    flex_fields["power_variable_name"] = baseline_data.power_variable
    flex_fields["storage_variable_name"] = correct_costs.stored_energy_variable
    flex_config_class = cmng.MODULE_TYPE_DICT[module_config.type]
    flex_module = flex_config_class(**flex_fields, _agent_id=agent_id)

    # the new config has to be editable as well
    flex_module.model_config["frozen"] = False
    flex_module.module_id = mpc_dataclass.module_id

    # update the value of weights that are already parameters, append the others
    known_parameters = {parameter.name: parameter for parameter in flex_module.parameters}
    for weight in mpc_dataclass.weights:
        if weight.name not in known_parameters:
            flex_module.parameters.append(weight)
        else:
            known_parameters[weight.name].value = weight.value

    # set new id (needed for plotting)
    flex_module.module_id = mpc_dataclass.module_id

    # point the optimization backend to the created mpc file and class
    flex_module.optimization_backend["model"]["type"] = {
        "file": os.path.join(flex_cfg.flex_files_directory, mpc_dataclass.created_flex_mpcs_file),
        "class_name": mpc_dataclass.class_name,
    }
    # keep the name of the results file, but swap the ".csv" ending for the suffix of
    # this mpc and place the file in the results directory
    original_results_name = Path(flex_module.optimization_backend["results_file"]).name
    result_filename = original_results_name.replace(".csv", mpc_dataclass.results_suffix)
    flex_module.optimization_backend["results_file"] = str(
        flex_cfg.results_directory / result_filename
    )
    # change cia backend to custom backend of flexquant
    if flex_module.optimization_backend["type"] == "casadi_cia":
        flex_module.optimization_backend["type"] = "casadi_cia_cons"
        flex_module.optimization_backend["market_time"] = flex_cfg.market_time

    # continuous controls always exist, binary controls only for some mpc types
    control_groups = [flex_module.controls]
    if hasattr(flex_module, "binary_controls"):
        control_groups.append(flex_module.binary_controls)

    if isinstance(mpc_dataclass, BaselineMPCData):
        # the baseline publishes the full control trajectories as AgentVariables
        for group in control_groups:
            for control in group:
                trajectory_name = control.name + glbs.full_trajectory_suffix
                flex_module.full_controls.append(
                    AgentVariable(name=trajectory_name, alias=trajectory_name, shared=True)
                )
    else:
        # the shadow mpcs receive the full control trajectories of the baseline as inputs
        for group in control_groups:
            for control in group:
                flex_module.inputs.append(
                    MPCVariable(
                        name=control.name + glbs.full_trajectory_suffix,
                        value=None,
                        type="pd.Series",
                    )
                )
                # change the alias of control variable in shadow mpc to prevent it from
                # triggering the wrong callback
                control.alias = control.name + glbs.shadow_suffix
        # only communicate outputs for the shadow mpcs
        flex_module.shared_variable_fields = ["outputs"]

    flex_module.set_outputs = True

    # lookup of the outputs that exist at this point
    outputs_by_name = {output.name: output for output in flex_module.outputs}
    power_variable = baseline_data.power_variable
    if power_variable not in outputs_by_name:
        flex_module.outputs.append(
            MPCVariable(name=power_variable, alias=mpc_dataclass.power_alias)
        )
    else:
        outputs_by_name[power_variable].alias = mpc_dataclass.power_alias
    # change alias for stored energy variable
    if correct_costs.enable_energy_costs_correction:
        stored_energy_output = outputs_by_name[correct_costs.stored_energy_variable]
        stored_energy_output.alias = mpc_dataclass.stored_energy_alias

    # add extra inputs needed for activation of flex
    flex_module.inputs.extend(mpc_dataclass.config_inputs_appendix)
    # CONFIG_PARAMETERS_APPENDIX only includes dummy values
    # overwrite dummy values with values from flex config and append it to module config;
    # a value of the baseline generator data takes precedence over the flex config
    for var in mpc_dataclass.config_parameters_appendix:
        for value_source in (self.flex_config, self.flex_config.baseline_config_generator_data):
            if var.name in value_source.model_fields:
                var.value = getattr(value_source, var.name)
    flex_module.parameters.extend(mpc_dataclass.config_parameters_appendix)

    # freeze the config again
    flex_module.model_config["frozen"] = True

    return flex_module

488 

def adapt_indicator_config(
    self, module_config: FlexibilityIndicatorModuleConfig
) -> FlexibilityIndicatorModuleConfig:
    """Adapt the indicator module config for automated flexibility quantification.

    The price variable is added as input, the time related parameters are set from
    the flex config and the baseline mpc config, and the power unit and the location
    of the results file are updated.

    Args:
        module_config: The indicator module config to be adapted.

    Returns:
        The same, adapted module config

    """
    # append user-defined price var to indicator module config
    price_input = AgentVariable(
        name=module_config.price_variable,
        unit="ct/kWh",
        type="pd.Series",
        description="electricity price",
    )
    module_config.inputs.append(price_input)

    # allow the module config to be changed
    module_config.model_config["frozen"] = False

    baseline = self.baseline_mpc_module_config
    # value source per parameter name; callables keep the evaluation lazy, so the
    # collocation grid is only computed if the parameter is present
    value_sources = {
        glbs.PREP_TIME: lambda: self.flex_config.prep_time,
        glbs.MARKET_TIME: lambda: self.flex_config.market_time,
        glbs.FLEX_EVENT_DURATION: lambda: self.flex_config.flex_event_duration,
        glbs.TIME_STEP: lambda: baseline.time_step,
        glbs.PREDICTION_HORIZON: lambda: baseline.prediction_horizon,
        glbs.COLLOCATION_TIME_GRID: lambda: self.get_collocation_time_grid(
            discretization_options=baseline.optimization_backend["discretization_options"]
        ),
    }
    for parameter in module_config.parameters:
        value_source = value_sources.get(parameter.name)
        if value_source is not None:
            parameter.value = value_source()

    # set power unit
    module_config.power_unit = self.flex_config.baseline_config_generator_data.power_unit
    # keep the file name, but store the results in the results directory
    results_name = module_config.results_file.name
    module_config.results_file = self.flex_config.results_directory / results_name

    module_config.model_config["frozen"] = True
    return module_config

529 

def adapt_market_config(
    self, module_config: FlexibilityMarketModuleConfig
) -> FlexibilityMarketModuleConfig:
    """Adapt the market module config for automated flexibility quantification.

    The fields shared with the loaded market module config are copied over, the
    results file is moved to the results directory and the time step and collocation
    time grid parameters are set from the baseline mpc config.

    Args:
        module_config: The market module config to be adapted.

    Returns:
        The same, adapted module config

    """
    # allow the module config to be changed
    module_config.model_config["frozen"] = False
    # ``model_fields`` is read from the classes: ``__fields__`` is the deprecated
    # pydantic v1 spelling, while the rest of this file uses the v2 api
    loaded_fields = type(self.market_module_config).model_fields
    for field in type(module_config).model_fields:
        if field in loaded_fields:
            setattr(module_config, field, getattr(self.market_module_config, field))
    # keep the file name, but store the results in the results directory
    module_config.results_file = (
        self.flex_config.results_directory / module_config.results_file.name
    )
    for parameter in module_config.parameters:
        if parameter.name == glbs.COLLOCATION_TIME_GRID:
            discretization_options = self.baseline_mpc_module_config.optimization_backend[
                "discretization_options"
            ]
            parameter.value = self.get_collocation_time_grid(
                discretization_options=discretization_options
            )
        if parameter.name == glbs.TIME_STEP:
            parameter.value = self.baseline_mpc_module_config.time_step
    module_config.model_config["frozen"] = True
    return module_config

554 

def get_collocation_time_grid(self, discretization_options: dict):
    """Get the mpc output collocation grid over the horizon.

    Args:
        discretization_options: Discretization options of the optimization backend,
            read for the keys "collocation_method" and "collocation_order".

    Returns:
        List of the collocation times over the prediction horizon of the baseline
        mpc, without the times that coincide with the start of a time step.

    """
    # time grid configuration of the baseline mpc
    baseline = self.baseline_mpc_module_config
    step = baseline.time_step
    horizon = baseline.prediction_horizon

    # collocation points for the configured method and order
    method = discretization_options["collocation_method"]
    order = discretization_options["collocation_order"]
    options = CasadiDiscretizationOptions(collocation_order=order, collocation_method=method)
    roots = DirectCollocation(options=options)._collocation_polynomial().root

    # shift the scaled collocation points to the start of every time step
    interval_starts = np.arange(0, step * horizon, step)
    grid = (interval_starts[:, None] + roots * step).ravel()

    # drop the points that lie on the discretization points themselves
    on_interval_start = np.isin(grid, interval_starts)
    return grid[~on_interval_start].tolist()

578 

def _generate_flex_model_definition(self):
    """Generate the python file with the models of the flexibility agents.

    The file of the original MPC model is parsed and modified once for the baseline,
    once for the positive and once for the negative flexibility mpc. The three results
    are combined, formatted with black and written to one file in the flex files
    directory.
    """
    flex_cfg = self.flex_config
    baseline_data = flex_cfg.baseline_config_generator_data
    shadow_data = flex_cfg.shadow_mpc_config_generator_data
    output_file = os.path.join(
        flex_cfg.flex_files_directory, baseline_data.created_flex_mpcs_file
    )
    opt_backend = self.orig_mpc_module_config.optimization_backend["model"]["type"]

    # Extract the config class of the casadi model to check cost functions
    config_class = inspect.get_annotations(custom_injection(opt_backend))["config"]
    config_instance = config_class()
    for shadow in (shadow_data.neg_flex, shadow_data.pos_flex):
        self.check_variables_in_casadi_config(config_instance, shadow.flex_cost_function)

    # parse mpc python file
    with open(opt_backend["file"], "r", encoding="utf-8") as f:
        tree = ast.parse(f.read())

    # create one modifier per mpc (baseline, pos flex, neg flex)
    modifier_inputs = (
        (baseline_data, self.baseline_mpc_module_config),
        (shadow_data.pos_flex, self.pos_flex_mpc_module_config),
        (shadow_data.neg_flex, self.neg_flex_mpc_module_config),
    )
    modifiers = [
        SetupSystemModifier(
            mpc_data=mpc_data,
            controls=mpc_module.controls,
            # binary controls only exist for some mpc types
            binary_controls=getattr(mpc_module, "binary_controls", None),
        )
        for mpc_data, mpc_module in modifier_inputs
    ]
    # run every modification on its own copy of the parsed file
    modified_trees = [modifier.visit(deepcopy(tree)) for modifier in modifiers]

    # combine modifications to one file
    combined_body = []
    for modified in modified_trees:
        combined_body.extend(modified.body)
    modified_tree = ast.Module(body=combined_body, type_ignores=[])
    modified_source = astor.to_source(modified_tree)
    # Use black to format the generated code
    formatted_code = black.format_str(modified_source, mode=black.FileMode())

    if flex_cfg.overwrite_files:
        try:
            Path(output_file).unlink()
        except OSError:
            pass

    with open(output_file, "w", encoding="utf-8") as f:
        f.write(formatted_code)

653 

def check_variables_in_casadi_config(self, config: CasadiModelConfig, expr: str):
    """Check if all variables in the expression are defined in the config.

    Every attribute access in the expression (e.g. ``self.P_el``) counts as a used
    variable. A variable is known if it is a variable of the config or one of the
    weights of the shadow mpc generator data.

    Args:
        config: casadi model config.
        expr: The expression to check.

    Raises:
        ValueError: If any variable in the expression is not defined in the config.

    """
    known_names = set(config.get_variable_names())
    # names of all attributes that are accessed anywhere in the expression
    used_names = {
        node.attr for node in ast.walk(ast.parse(expr)) if isinstance(node, ast.Attribute)
    }
    weight_names = {
        weight.name for weight in self.flex_config.shadow_mpc_config_generator_data.weights
    }
    unknown_vars = used_names - known_names - weight_names
    if unknown_vars:
        raise ValueError(f"Unknown variables in new cost function: {unknown_vars}")

676 

677 def run_config_validations(self): 

678 """Function to validate integrity of user-supplied flex config. 

679 

680 Since the validation depends on interactions between multiple configurations, it is 

681 performed within this function rather than using Pydantic’s built-in validators for 

682 individual configurations. 

683 

684 The following checks are performed: 

685 1. Ensures the specified power variable exists in the MPC model outputs. 

686 2. Ensures the specified comfort variable exists in the MPC model states. 

687 3. Validates that the stored energy variable exists in MPC outputs if 

688 energy cost correction is enabled. 

689 4. Verifies the supported collocation method is used; otherwise, 

690 switches to 'legendre' and raises a warning. 

691 5. Ensures that the sum of prep time, market time, and flex event duration 

692 does not exceed the prediction horizon. 

693 6. Ensures market time equals the MPC model time step if market config is present. 

694 7. Ensures that all flex time values are multiples of the MPC model time step. 

695 8. Checks for mismatches between time-related parameters in the flex/MPC and 

696 indicator configs and issues warnings 

697 when discrepancies exist, using the flex/MPC config values as the source of truth. 

698 

699 """ 

700 # check if the power variable exists in the mpc config 

701 power_var = self.flex_config.baseline_config_generator_data.power_variable 

702 if power_var not in [output.name for output in self.baseline_mpc_module_config.outputs]: 

703 raise ConfigurationError( 

704 f"Given power variable {power_var} is not defined " 

705 f"as output in baseline mpc config." 

706 ) 

707 

708 # check if the comfort variable exists in the mpc slack variables 

709 if self.flex_config.baseline_config_generator_data.comfort_variable: 

710 file_path = self.baseline_mpc_module_config.optimization_backend["model"]["type"][ 

711 "file" 

712 ] 

713 class_name = self.baseline_mpc_module_config.optimization_backend["model"]["type"][ 

714 "class_name" 

715 ] 

716 # Get the class 

717 dynamic_class = cmng.get_class_from_file(file_path, class_name) 

718 if self.flex_config.baseline_config_generator_data.comfort_variable not in [ 

719 state.name for state in dynamic_class().states 

720 ]: 

721 raise ConfigurationError( 

722 f"Given comfort variable " 

723 f"{self.flex_config.baseline_config_generator_data.comfort_variable} " 

724 f"is not defined as state in baseline mpc config." 

725 ) 

726 

727 # check if the energy storage variable exists in the mpc config 

728 if self.indicator_module_config.correct_costs.enable_energy_costs_correction: 

729 if self.indicator_module_config.correct_costs.stored_energy_variable not in [ 

730 output.name for output in self.baseline_mpc_module_config.outputs 

731 ]: 

732 raise ConfigurationError( 

733 f"The stored energy variable " 

734 f"{self.indicator_module_config.correct_costs.stored_energy_variable} " 

735 f"is not defined in baseline mpc config. " 

736 f"It must be defined in the base MPC model and config as output " 

737 f"if the correction of costs is enabled." 

738 ) 

739 

740 # raise warning if unsupported collocation method is used and change to supported method 

741 if ( 

742 "collocation_method" 

743 not in self.baseline_mpc_module_config.optimization_backend["discretization_options"] 

744 ): 

745 raise ConfigurationError( 

746 "Please use collocation as discretization method and define the collocation_method " 

747 "in the mpc config" 

748 ) 

749 else: 

750 collocation_method = self.baseline_mpc_module_config.optimization_backend[ 

751 "discretization_options" 

752 ]["collocation_method"] 

753 if collocation_method != "legendre": 

754 self.logger.warning( 

755 "Collocation method %s is not supported. Switching to method legendre.", 

756 collocation_method, 

757 ) 

758 self.baseline_mpc_module_config.optimization_backend["discretization_options"][ 

759 "collocation_method" 

760 ] = "legendre" 

761 self.pos_flex_mpc_module_config.optimization_backend["discretization_options"][ 

762 "collocation_method" 

763 ] = "legendre" 

764 self.neg_flex_mpc_module_config.optimization_backend["discretization_options"][ 

765 "collocation_method" 

766 ] = "legendre" 

767 

768 # time data validations 

769 flex_times = { 

770 glbs.PREP_TIME: self.flex_config.prep_time, 

771 glbs.MARKET_TIME: self.flex_config.market_time, 

772 glbs.FLEX_EVENT_DURATION: self.flex_config.flex_event_duration, 

773 } 

774 mpc_times = { 

775 glbs.TIME_STEP: self.baseline_mpc_module_config.time_step, 

776 glbs.PREDICTION_HORIZON: self.baseline_mpc_module_config.prediction_horizon, 

777 } 

778 # total time length check (prep+market+flex_event) 

779 if sum(flex_times.values()) > mpc_times["time_step"] * mpc_times["prediction_horizon"]: 

780 raise ConfigurationError( 

781 "Market time + prep time + flex event duration " 

782 "can not exceed the prediction horizon." 

783 ) 

784 # market time val check 

785 if self.flex_config.market_config: 

786 if flex_times["market_time"] % mpc_times["time_step"] != 0: 

787 raise ConfigurationError( 

788 "Market time must be an integer multiple of the time step." 

789 ) 

790 # check for divisibility of flex_times by time_step 

791 for name, value in flex_times.items(): 

792 if value % mpc_times["time_step"] != 0: 

793 raise ConfigurationError( 

794 f"{name} is not a multiple of the time step. Please redefine." 

795 ) 

796 # raise warning if parameter value in flex indicator module config differs from 

797 # value in flex config/ baseline mpc module config 

798 for parameter in self.indicator_module_config.parameters: 

799 if parameter.value is not None: 

800 if parameter.name in flex_times: 

801 flex_value = flex_times[parameter.name] 

802 if parameter.value != flex_value: 

803 self.logger.warning( 

804 "Value mismatch for %s in flex config (field) " 

805 "and indicator module config (parameter). " 

806 "Flex config value will be used.", 

807 parameter.name, 

808 ) 

809 elif parameter.name in mpc_times: 

810 mpc_value = mpc_times[parameter.name] 

811 if parameter.value != mpc_value: 

812 self.logger.warning( 

813 "Value mismatch for %s in baseline MPC module " 

814 "config (field) and indicator module config (parameter). " 

815 "Baseline MPC module config value will be used.", 

816 parameter.name, 

817 ) 

818 

def adapt_sim_results_path(self, simulator_agent_config: Union[str, Path]) -> dict:
    """Redirect the simulator result file into the flex results directory.

    Optional helper. Loads the simulator agent config from a JSON file, picks
    the first module whose ``type`` equals ``"simulator"`` and rewrites its
    ``result_filename`` so that only the file name is kept and it is placed
    inside ``self.flex_config.results_directory``. The file on disk is not
    modified; only the loaded dictionary is changed and returned.

    Args:
        simulator_agent_config: Path to the simulator agent config JSON file.

    Returns:
        The loaded simulator config dictionary with the modified result file path.

    Raises:
        FileNotFoundError: If the specified config file does not exist.
        ValueError: If the config contains no module of type ``"simulator"``
            (malformed JSON also surfaces as a ``ValueError`` subclass from
            ``json.load``).
        KeyError: If the config has no ``"modules"`` entry or the simulator
            module has no ``"result_filename"`` entry.

    """
    # open config and extract sim module
    with open(simulator_agent_config, "r", encoding="utf-8") as f:
        sim_config = json.load(f)

    # Entries that are not dicts or carry no "type" key are skipped instead of
    # crashing the search; only an exact "simulator" type string matches.
    sim_module_config = next(
        (
            module
            for module in sim_config["modules"]
            if isinstance(module, dict) and module.get("type") == "simulator"
        ),
        None,
    )
    if sim_module_config is None:
        # Fail with an actionable message rather than an opaque
        # "'NoneType' object is not subscriptable" further down.
        raise ValueError(
            f"No module of type 'simulator' found in simulator agent config "
            f"'{simulator_agent_config}'."
        )

    # convert filename string to path and extract the name
    sim_file_name = Path(sim_module_config["result_filename"]).name
    # set results path so that sim results land in same directory as flex result CSVs
    sim_module_config["result_filename"] = str(
        self.flex_config.results_directory / sim_file_name
    )
    return sim_config