Coverage for agentlib_flexquant/generate_flex_agents.py: 90%
339 statements
« prev ^ index » next coverage.py v7.4.4, created at 2026-03-26 09:43 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2026-03-26 09:43 +0000
1"""Generate agents for flexibility quantification.
3This module provides the FlexAgentGenerator class that creates and configures
4flexibility agents.
5The agents created include the baseline, positive and negative flexibility agents,
6the flexibility indicator and market agents. The agents are created based on the
7flex config and the MPC config.
8"""
9import ast
10import atexit
11import inspect
12import logging
13import os
15import astor
16import black
17import json
18import numpy as np
19from copy import deepcopy
20from pathlib import Path
21from typing import Union
22from pydantic import FilePath
23from agentlib.core.agent import AgentConfig
24from agentlib.core.datamodels import AgentVariable
25from agentlib.core.errors import ConfigurationError
26from agentlib.core.module import BaseModuleConfig
27from agentlib.utils import custom_injection, load_config
28from agentlib_mpc.data_structures.mpc_datamodels import MPCVariable
29from agentlib_mpc.models.casadi_model import CasadiModelConfig
30from agentlib_mpc.modules.mpc.mpc_full import MPCConfig
32from agentlib_mpc.optimization_backends.casadi_.basic import DirectCollocation
33from agentlib_mpc.data_structures.casadi_utils import CasadiDiscretizationOptions
34import agentlib_flexquant.data_structures.globals as glbs
35import agentlib_flexquant.utils.config_management as cmng
36from agentlib_flexquant.utils.parsing import SetupSystemModifier
37from agentlib_flexquant.data_structures.flexquant import (
38 FlexibilityIndicatorConfig,
39 FlexibilityMarketConfig,
40 FlexQuantConfig,
41)
42from agentlib_flexquant.data_structures.mpcs import BaselineMPCData, BaseMPCData
43from agentlib_flexquant.modules.flexibility_indicator import (
44 FlexibilityIndicatorModuleConfig,
45)
46from agentlib_flexquant.modules.flexibility_market import FlexibilityMarketModuleConfig
49class FlexAgentGenerator:
50 """Class for generating the flex agents
52 orig_mpc_module_config: the config for the original mpc,
53 which has nothing to do with the flexibility quantification
54 baseline_mpc_module_config: the config for the baseline mpc
55 for flexibility quantification
56 pos_flex_mpc_module_config: the config for the positive flexibility mpc
57 for flexibility quantification
58 neg_flex_mpc_module_config: the config for the negative flexibility mpc
59 for flexibility quantification
60 indicator_module_config: the config for the indicator for flexibility quantification
61 market_module_config: the config for the market for flexibility quantification
63 """
65 orig_mpc_module_config: MPCConfig
66 baseline_mpc_module_config: MPCConfig
67 pos_flex_mpc_module_config: MPCConfig
68 neg_flex_mpc_module_config: MPCConfig
69 indicator_module_config: FlexibilityIndicatorModuleConfig
70 market_module_config: FlexibilityMarketModuleConfig
def __init__(
    self,
    flex_config: Union[str, FilePath, FlexQuantConfig],
    mpc_agent_config: Union[str, FilePath, AgentConfig],
):
    """Load all configs needed to generate the flexibility agents.

    The original MPC agent config is copied three times (baseline, positive
    and negative flexibility); each copy receives the agent id defined in the
    flex config. The indicator config and, if set, the market config are
    loaded together with their agent and module configs. Finally the config
    validations are run.

    Args:
        flex_config: The flexibility quantification config, either as a path
            or as an already loaded ``FlexQuantConfig``.
        mpc_agent_config: The agent config of the original MPC, either as a
            path or as an already loaded ``AgentConfig``.

    """
    self.logger = logging.getLogger(__name__)

    # ``str or FilePath`` evaluates to plain ``str``, so path objects were
    # never recognised. Check against str and os.PathLike explicitly.
    if isinstance(flex_config, (str, os.PathLike)):
        self.flex_config_file_name = os.path.basename(flex_config)
    else:
        # provide default name for json
        self.flex_config_file_name = "flex_config.json"
    # load configs
    self.flex_config = load_config.load_config(
        flex_config, config_type=FlexQuantConfig
    )

    # original mpc agent
    self.orig_mpc_agent_config = load_config.load_config(
        mpc_agent_config, config_type=AgentConfig
    )
    shadow_data = self.flex_config.shadow_mpc_config_generator_data
    # baseline agent
    self.baseline_mpc_agent_config = self.orig_mpc_agent_config.__deepcopy__()
    self.baseline_mpc_agent_config.id = (
        self.flex_config.baseline_config_generator_data.agent_id
    )
    # pos agent
    self.pos_flex_mpc_agent_config = self.orig_mpc_agent_config.__deepcopy__()
    self.pos_flex_mpc_agent_config.id = shadow_data.pos_flex.agent_id
    # neg agent
    self.neg_flex_mpc_agent_config = self.orig_mpc_agent_config.__deepcopy__()
    self.neg_flex_mpc_agent_config.id = shadow_data.neg_flex.agent_id

    # all MPC modules are looked up by the module type of the original agent
    orig_module_type = cmng.get_orig_module_type(self.orig_mpc_agent_config)
    # original mpc module
    self.orig_mpc_module_config = cmng.get_module(
        config=self.orig_mpc_agent_config,
        module_type=orig_module_type,
    )
    # baseline module
    self.baseline_mpc_module_config = cmng.get_module(
        config=self.baseline_mpc_agent_config,
        module_type=orig_module_type,
    )
    # convert agentlib_mpc's ModuleConfig to flexquant's ModuleConfig to
    # include additional fields not present in the original
    self.baseline_mpc_module_config = cmng.get_flex_mpc_module_config(
        agent_config=self.baseline_mpc_agent_config,
        module_config=self.baseline_mpc_module_config,
        module_type=self.flex_config.baseline_config_generator_data.module_types[
            self.baseline_mpc_module_config.type
        ],
    )
    # pos module
    self.pos_flex_mpc_module_config = cmng.get_module(
        config=self.pos_flex_mpc_agent_config,
        module_type=orig_module_type,
    )
    # neg module
    self.neg_flex_mpc_module_config = cmng.get_module(
        config=self.neg_flex_mpc_agent_config,
        module_type=orig_module_type,
    )
    # load indicator config
    self.indicator_config = load_config.load_config(
        self.flex_config.indicator_config, config_type=FlexibilityIndicatorConfig
    )
    # load indicator module config
    self.indicator_agent_config = load_config.load_config(
        self.indicator_config.agent_config, config_type=AgentConfig
    )
    self.indicator_module_config = cmng.get_module(
        config=self.indicator_agent_config, module_type=cmng.INDICATOR_CONFIG_TYPE
    )
    # load market config
    if self.flex_config.market_config:
        self.market_config = load_config.load_config(
            self.flex_config.market_config, config_type=FlexibilityMarketConfig
        )
        # load market module config
        self.market_agent_config = load_config.load_config(
            self.market_config.agent_config, config_type=AgentConfig
        )
        self.market_module_config = cmng.get_module(
            config=self.market_agent_config, module_type=cmng.MARKET_CONFIG_TYPE
        )
    else:
        # without a market the market time is set to zero
        self.flex_config.market_time = 0

    self.run_config_validations()
def generate_flex_agents(self) -> list[str]:
    """Create the agent configs and the python module of the flexibility agents.

    The module configs are adapted first, then every agent config is written
    to disk, the model file for the MPCs is generated and the updated flex
    config is dumped. If ``delete_files`` is set, the created files are
    removed again at interpreter exit.

    Returns:
        list of the full path for baseline mpc, pos_flex mpc, neg_flex mpc,
        indicator and market config

    """
    baseline_data = self.flex_config.baseline_config_generator_data
    pos_data = self.flex_config.shadow_mpc_config_generator_data.pos_flex
    neg_data = self.flex_config.shadow_mpc_config_generator_data.neg_flex

    # step 1: adapt modules to include necessary communication variables
    adapted_baseline = self.adapt_mpc_module_config(
        module_config=self.baseline_mpc_module_config,
        mpc_dataclass=baseline_data,
        agent_id=baseline_data.agent_id,
    )
    adapted_pos = self.adapt_mpc_module_config(
        module_config=self.pos_flex_mpc_module_config,
        mpc_dataclass=pos_data,
        agent_id=pos_data.agent_id,
    )
    adapted_neg = self.adapt_mpc_module_config(
        module_config=self.neg_flex_mpc_module_config,
        mpc_dataclass=neg_data,
        agent_id=neg_data.agent_id,
    )
    adapted_indicator = self.adapt_indicator_module_config(
        module_config=self.indicator_module_config
    )
    if self.flex_config.market_config:
        adapted_market = self.adapt_market_module_config(
            module_config=self.market_module_config
        )

    # step 2: dump jsons of the agents including the adapted module configs
    mpc_module_type = cmng.get_orig_module_type(self.orig_mpc_agent_config)
    dump_jobs = [
        (
            adapted_baseline,
            self.baseline_mpc_agent_config,
            mpc_module_type,
            baseline_data.name_of_created_file,
        ),
        (
            adapted_pos,
            self.pos_flex_mpc_agent_config,
            mpc_module_type,
            pos_data.name_of_created_file,
        ),
        (
            adapted_neg,
            self.neg_flex_mpc_agent_config,
            mpc_module_type,
            neg_data.name_of_created_file,
        ),
        (
            adapted_indicator,
            self.indicator_agent_config,
            cmng.INDICATOR_CONFIG_TYPE,
            self.indicator_config.name_of_created_file,
        ),
    ]
    if self.flex_config.market_config:
        dump_jobs.append(
            (
                adapted_market,
                self.market_agent_config,
                cmng.MARKET_CONFIG_TYPE,
                self.market_config.name_of_created_file,
            )
        )
    for job_module, job_agent, job_type, job_file in dump_jobs:
        self.append_module_and_dump_agent(
            module=job_module,
            agent=job_agent,
            module_type=job_type,
            config_name=job_file,
        )

    # step 3: generate python files for the shadow mpcs
    self._generate_flex_model_definition()

    # step 4: add new paths to flex config and dump it
    self.adapt_and_dump_flex_config()

    # step 5: register the exit function if the corresponding flag is set
    if self.flex_config.delete_files:

        def _cleanup_at_exit():
            self._delete_created_files()

        atexit.register(_cleanup_at_exit)

    return self.get_config_file_paths()
def append_module_and_dump_agent(
    self,
    module: BaseModuleConfig,
    agent: AgentConfig,
    module_type: str,
    config_name: str,
):
    """Append the given module config to the given agent config and
    dump the agent config to a json file.

    Every module of the agent whose type maps to the same entry of
    ``cmng.MODULE_TYPE_DICT`` as ``module_type`` is replaced by the given
    module. The json file is named based on the config_name and written to
    the flex files directory. If the agent has no modules, nothing is written
    and an error is logged.

    Args:
        module: The module config to be appended.
        agent: The agent config to be updated.
        module_type: The type of the module
        config_name: The name of the json file for module config (e.g. baseline.json)

    """
    # get the module as a dict without default values
    module_dict = cmng.to_dict_and_remove_unnecessary_fields(module=module)
    # write given module to agent config
    for i, agent_module in enumerate(agent.modules):
        if (
            cmng.MODULE_TYPE_DICT[module_type]
            is cmng.MODULE_TYPE_DICT[agent_module["type"]]
        ):
            agent.modules[i] = module_dict

    if not agent.modules:
        # use the instance logger like the other methods of this class
        self.logger.error("Provided agent config does not contain any modules.")
        return

    # dump agent config
    config_path = os.path.join(self.flex_config.flex_files_directory, config_name)
    if self.flex_config.overwrite_files:
        # best effort: a file that cannot be removed is truncated by the
        # write below anyway
        try:
            Path(config_path).unlink()
        except OSError:
            pass
    with open(config_path, "w+", encoding="utf-8") as f:
        module_json = agent.model_dump_json(exclude_defaults=True)
        f.write(module_json)
def get_config_file_paths(self) -> list[str]:
    """Collect the locations of the agent config files created by the generator.

    Returns:
        Paths inside the flex files directory in the order baseline, positive
        flexibility, negative flexibility and indicator, followed by the
        market config if a market config is set.

    """
    flex_cfg = self.flex_config
    shadow_data = flex_cfg.shadow_mpc_config_generator_data
    file_names = [
        flex_cfg.baseline_config_generator_data.name_of_created_file,
        shadow_data.pos_flex.name_of_created_file,
        shadow_data.neg_flex.name_of_created_file,
        self.indicator_config.name_of_created_file,
    ]
    # the market agent is optional
    if flex_cfg.market_config:
        file_names.append(self.market_config.name_of_created_file)
    return [
        os.path.join(flex_cfg.flex_files_directory, file_name)
        for file_name in file_names
    ]
def _delete_created_files(self):
    """Function to run at exit if the files are to be deleted.

    Removes the created agent config files and the dumped flex config from
    the flex files directory and afterwards the directory itself. Files that
    do not exist are skipped so that the remaining files are still removed.
    If the directory cannot be removed (e.g. because it still contains other
    files), a warning is logged instead of raising during interpreter exit.

    """
    directory = self.flex_config.flex_files_directory
    to_be_deleted = self.get_config_file_paths()
    to_be_deleted.append(os.path.join(directory, self.flex_config_file_name))
    # delete files; a missing file must not abort the cleanup of the others
    for file in to_be_deleted:
        Path(file).unlink(missing_ok=True)
    # also delete folder
    try:
        Path(directory).rmdir()
    except OSError as err:
        self.logger.warning(
            "Could not remove directory %s: %s", directory, err
        )
def adapt_mpc_module_config(
    self, module_config: MPCConfig, mpc_dataclass: BaseMPCData, agent_id: str
) -> MPCConfig:
    """Adapt the mpc module config for automated flexibility quantification.

    Things adapted among others are:
    - the file name/path of the mpc config file
    - names of the control variables for the shadow mpcs
    - reduce communicated variables of shadow mpcs to outputs
    - add the power variable to the outputs
    - add parameters for the activation and quantification of flexibility

    The passed ``module_config`` gets its ``type`` overwritten; all further
    changes are applied to a new config object built from its dump. Entries
    of ``mpc_dataclass.config_inputs_appendix`` whose name already exists in
    the inputs are removed from that list in place.

    Args:
        module_config: The module config to be adapted
        mpc_dataclass: The dataclass corresponding to the type of the MPC module.
                       It contains all the extra data necessary for flexibility
                       quantification, which will be used to update the
                       module_config.
        agent_id: agent_id for creating the FlexQuant mpc module config

    Returns:
        The adapted module config

    """
    # allow the module config to be changed
    module_config.model_config["frozen"] = False

    # set new MPC type
    module_config.type = mpc_dataclass.module_types[
        cmng.get_orig_module_type(self.orig_mpc_agent_config)
    ]

    # set the MPC config type from the MPCConfig in agentlib_mpc to the
    # corresponding one in flexquant and add additional fields
    module_config_flex_dict = module_config.model_dump()
    module_config_flex_dict["casadi_sim_time_step"] = (
        self.flex_config.casadi_sim_time_step)
    module_config_flex_dict["power_variable_name"] = (
        self.flex_config.baseline_config_generator_data.power_variable)
    module_config_flex_dict["storage_variable_name"] = (
        self.indicator_module_config.correct_costs.stored_energy_variable)
    module_config_flex = cmng.MODULE_TYPE_DICT[module_config.type](
        **module_config_flex_dict, _agent_id=agent_id
    )

    # HOTFIX due to AgentLib-MPC bug. Needs to be adapted after Objectives
    # in AgentLib-MPC are fixed.
    if module_config_flex.r_del_u is None:
        module_config_flex = module_config_flex.model_copy(update={"r_del_u": {}})

    # allow the module config to be changed
    module_config_flex.model_config["frozen"] = False

    module_config_flex.module_id = mpc_dataclass.module_id

    # set new id (needed for plotting)
    # NOTE(review): repeats the assignment directly above
    module_config_flex.module_id = mpc_dataclass.module_id
    # update optimization backend to use the created mpc files and classes
    module_config_flex.optimization_backend["model"]["type"] = {
        "file": os.path.join(
            self.flex_config.flex_files_directory,
            mpc_dataclass.created_flex_mpcs_file,
        ),
        "class_name": mpc_dataclass.class_name,
    }
    # extract filename from results file and update it with
    # suffix and parent directory
    result_filename = Path(
        module_config_flex.optimization_backend["results_file"]
    ).name.replace(".csv", mpc_dataclass.results_suffix)
    full_path = self.flex_config.results_directory / result_filename
    module_config_flex.optimization_backend["results_file"] = str(full_path)
    # change cia backend to custom backend of flexquant
    if module_config_flex.optimization_backend["type"] == "casadi_cia":
        module_config_flex.optimization_backend["type"] = \
            "agentlib_flexquant.casadi_cia_cons"
    # the custom cia backend additionally receives the market time
    if (module_config_flex.optimization_backend["type"] ==
            "agentlib_flexquant.casadi_cia_cons"):
        module_config_flex.optimization_backend["market_time"] = (
            self.flex_config.market_time)

    # add the full control trajectory output from the baseline as input for the
    # shadow mpcs, they are directly included in the optimization problem
    if not isinstance(mpc_dataclass, BaselineMPCData):
        for control in module_config_flex.controls:
            module_config_flex.inputs.append(
                MPCVariable(
                    name=control.name + glbs.full_trajectory_suffix,
                    value=None,
                    type="pd.Series",
                )
            )
            # add full control names to shadow MPC config for inputs tracking
            module_config_flex.full_control_names.append(
                control.name + glbs.full_trajectory_suffix)
            # change the alias of control variable in shadow mpc to
            # prevent it from triggering the wrong callback
            control.alias = control.name + glbs.shadow_suffix
        # also include binary controls
        if hasattr(module_config_flex, "binary_controls"):
            for control in module_config_flex.binary_controls:
                module_config_flex.inputs.append(
                    MPCVariable(
                        name=control.name + glbs.full_trajectory_suffix,
                        value=None,
                        type="pd.Series",
                    )
                )
                # add full control names to shadow MPC config for inputs tracking
                module_config_flex.full_control_names.append(
                    control.name + glbs.full_trajectory_suffix)
                # change the alias of control variable in shadow mpc to
                # prevent it from triggering the wrong callback
                control.alias = control.name + glbs.shadow_suffix
        # only communicate outputs for the shadow mpcs
        module_config_flex.shared_variable_fields = ["outputs"]

        # In addition to creating the full control variables, the inputs
        # and states of the Baseline are communicated to the Shadow MPC
        # to ensure synchronisation. Therefore, all inputs and states of
        # the Baseline are added to the Shadow MPCs with an alias
        baseline_names = {inp.name for inp in
                          self.baseline_mpc_module_config.inputs}
        for i, input in enumerate(module_config_flex.inputs):
            if input.name in baseline_names:
                module_config_flex.inputs[i].alias = (
                    input.name + glbs.base_vars_to_communicate_suffix)

        # add Baseline input names to shadow MPC config for inputs tracking
        # if Baseline variable is also set in config_inputs_appendix this is
        # due to overwriting the alias, so variable should not be added here
        appendix_names = {inp.name for inp in mpc_dataclass.config_inputs_appendix}
        module_config_flex.baseline_input_names = [
            input.name + glbs.base_vars_to_communicate_suffix for input in
            self.baseline_mpc_module_config.inputs
            if input.name not in appendix_names]

        # add custom input names for the shadow MPC to track. Here, the
        # communication suffix is not added, as the user is free to define
        # custom inputs as desired.
        # Exclude in_provision, as this is not regularly set and would prevent
        # the do_step of the shadow MPC.
        module_config_flex.custom_input_names.extend([
            {"name": input.name, "alias": input.alias}
            for input in mpc_dataclass.config_inputs_appendix
            if input.name not in [glbs.PROVISION_VAR_NAME]
        ])

        # NOTE(review): states are matched by object comparison here, while
        # the inputs above are matched by name
        for i, state in enumerate(module_config_flex.states):
            if state in self.baseline_mpc_module_config.states:
                module_config_flex.states[i].alias = (
                    state.name + glbs.base_vars_to_communicate_suffix)
        # add Baseline state names to shadow MPC config for inputs tracking
        module_config_flex.baseline_state_names = [
            state.name + glbs.base_vars_to_communicate_suffix for state in
            self.baseline_mpc_module_config.states]
        module_config_flex.baseline_agent_id = (
            self.flex_config.baseline_config_generator_data.agent_id)

    else:
        # all the variables here are added to the custom MPCConfig of
        # FlexQuant to avoid them being added to the optimization problem
        # add full_controls trajectory as AgentVariable to the config of
        # Baseline mpc
        for control in module_config_flex.controls:
            module_config_flex.full_controls.append(
                AgentVariable(
                    name=control.name + glbs.full_trajectory_suffix,
                    alias=control.name + glbs.full_trajectory_suffix,
                    shared=True,
                )
            )
        if hasattr(module_config_flex, "binary_controls"):
            for binary_controls in module_config_flex.binary_controls:
                module_config_flex.full_controls.append(
                    AgentVariable(
                        name=binary_controls.name + glbs.full_trajectory_suffix,
                        alias=binary_controls.name + glbs.full_trajectory_suffix,
                        shared=True,
                    )
                )
        # add full controls to custom cia backend to constrain
        # during market time
        if (module_config_flex.optimization_backend["type"] ==
                "agentlib_flexquant.casadi_cia_cons"):
            # maps every full control name to None
            module_config_flex.optimization_backend["full_controls_dict"] = (
                dict(zip([var.name for var in module_config_flex.full_controls],
                         [None] * len(module_config_flex.full_controls))
                ))
        # add input and states copy variables which send the Baseline inputs
        # to the shadow MPC
        for input in module_config_flex.inputs:
            module_config_flex.vars_to_communicate.append(
                AgentVariable(
                    name=input.name + glbs.base_vars_to_communicate_suffix,
                    alias=input.name + glbs.base_vars_to_communicate_suffix,
                    shared=True,
                )
            )
        for state in module_config_flex.states:
            module_config_flex.vars_to_communicate.append(
                AgentVariable(
                    name=state.name + glbs.base_vars_to_communicate_suffix,
                    alias=state.name + glbs.base_vars_to_communicate_suffix,
                    shared=True,
                )
            )

    module_config_flex.set_outputs = True
    # add outputs for the power variables, for easier handling create a lookup dict
    output_dict = {output.name: output for output in module_config_flex.outputs}
    if self.flex_config.baseline_config_generator_data.power_variable in output_dict:
        output_dict[
            self.flex_config.baseline_config_generator_data.power_variable
        ].alias = mpc_dataclass.power_alias
    else:
        module_config_flex.outputs.append(
            MPCVariable(
                name=self.flex_config.baseline_config_generator_data.power_variable,
                alias=mpc_dataclass.power_alias,
            )
        )
    # change alias for stored energy variable
    # NOTE(review): unlike the power variable there is no fallback; a stored
    # energy variable missing from the outputs raises a KeyError here -
    # presumably this is validated elsewhere, confirm
    if self.indicator_module_config.correct_costs.enable_energy_costs_correction:
        output_dict[
            self.indicator_module_config.correct_costs.stored_energy_variable
        ].alias = mpc_dataclass.stored_energy_alias

    # add extra inputs needed for activation of flex or custom cost functions
    existing_input_names = {inp.name: idx for idx, inp in
                            enumerate(module_config_flex.inputs)}
    # iterate over a copy, since entries are removed from the list in the loop
    for appendix_inp in mpc_dataclass.config_inputs_appendix.copy():
        # If variable already exists in the config
        if appendix_inp.name in existing_input_names:
            self.logger.warning(f"The given variable {appendix_inp.name} in the "
                                f"config_inputs_appendix already exists in the MPC "
                                f"model. I am updating the alias of the existing "
                                f"variable to {appendix_inp.alias} (provided by you). "
                                f"However, this can still cause issues down the line "
                                f"if the alias is not chosen wisely.")
            # Update only the alias of the existing input
            idx = existing_input_names[appendix_inp.name]
            existing_inp = module_config_flex.inputs[idx]
            # rebuild the variable with the same class and the new alias
            inp_dict = existing_inp.dict()
            inp_dict["alias"] = appendix_inp.alias
            module_config_flex.inputs[idx] = type(existing_inp)(**inp_dict)
            # Remove variable from appendix list to avoid creation during parsing
            mpc_dataclass.config_inputs_appendix.remove(appendix_inp)
        else:
            # Add the new input
            module_config_flex.inputs.append(appendix_inp)

    # add extra parameters needed for activation of flex or custom weights
    for var in mpc_dataclass.config_parameters_appendix:
        # a value from the baseline generator data overrides one from the
        # flex config, because it is assigned last
        if var.name in self.flex_config.model_fields:
            var.value = getattr(self.flex_config, var.name)
        if var.name in self.flex_config.baseline_config_generator_data.model_fields:
            var.value = getattr(self.flex_config.baseline_config_generator_data,
                                var.name)
    module_config_flex.parameters.extend(mpc_dataclass.config_parameters_appendix)

    # freeze the config again
    module_config_flex.model_config["frozen"] = True

    return module_config_flex
def adapt_indicator_module_config(
    self, module_config: FlexibilityIndicatorModuleConfig
) -> FlexibilityIndicatorModuleConfig:
    """Prepare the indicator module config for automated flexibility
    quantification.

    Two price inputs are appended, the timing parameters are filled from the
    flex config and the baseline MPC config, and the power unit and results
    file location are set.

    Args:
        module_config: The indicator module config to be adapted

    Returns:
        The adapted indicator module config

    """
    # append user-defined price variables to indicator module config
    price_inputs = (
        (module_config.price_variable, "electricity price"),
        (module_config.price_variable_feed_in, "electricity feed-in price"),
    )
    for price_name, price_description in price_inputs:
        module_config.inputs.append(
            AgentVariable(
                name=price_name,
                unit="ct/kWh",
                type="pd.Series",
                description=price_description,
            )
        )

    flex_cfg = self.flex_config
    baseline_cfg = self.baseline_mpc_module_config

    # the config is frozen by default, so unlock it for the updates below
    module_config.model_config["frozen"] = False
    for param in module_config.parameters:
        if param.name == glbs.PREP_TIME:
            param.value = flex_cfg.prep_time
        if param.name == glbs.MARKET_TIME:
            param.value = flex_cfg.market_time
        if param.name == glbs.FLEX_EVENT_DURATION:
            param.value = flex_cfg.flex_event_duration
        if param.name == glbs.TIME_STEP:
            param.value = baseline_cfg.time_step
        if param.name == glbs.PREDICTION_HORIZON:
            param.value = baseline_cfg.prediction_horizon
        if param.name == glbs.COLLOCATION_TIME_GRID:
            param.value = self.get_collocation_time_grid(
                discretization_options=baseline_cfg.optimization_backend[
                    "discretization_options"
                ]
            )

    # take over the power unit of the baseline
    module_config.power_unit = flex_cfg.baseline_config_generator_data.power_unit
    # move the results file into the results directory
    results_name = module_config.results_file.name
    module_config.results_file = flex_cfg.results_directory / results_name

    module_config.model_config["frozen"] = True
    return module_config
def adapt_market_module_config(
    self, module_config: FlexibilityMarketModuleConfig
) -> FlexibilityMarketModuleConfig:
    """Adapt the market module config for automated flexibility quantification.

    All fields which also exist on the loaded market module config are copied
    from it, the results file is moved into the results directory and the
    collocation time grid and time step parameters are taken from the
    baseline MPC config.

    Args:
        module_config: The market module config to be adapted

    Returns:
        The adapted market module config

    """
    # allow the module config to be changed
    module_config.model_config["frozen"] = False

    # read the field definitions from the classes via the pydantic v2 API
    # instead of the deprecated ``__fields__`` alias
    source_config = self.market_module_config
    source_fields = type(source_config).model_fields
    for field in type(module_config).model_fields:
        if field in source_fields:
            setattr(module_config, field, getattr(source_config, field))

    module_config.results_file = (
        self.flex_config.results_directory / module_config.results_file.name
    )
    for parameter in module_config.parameters:
        if parameter.name == glbs.COLLOCATION_TIME_GRID:
            dis_op = self.baseline_mpc_module_config.optimization_backend[
                "discretization_options"
            ]
            parameter.value = self.get_collocation_time_grid(
                discretization_options=dis_op
            )
        if parameter.name == glbs.TIME_STEP:
            parameter.value = self.baseline_mpc_module_config.time_step

    # freeze the config again
    module_config.model_config["frozen"] = True
    return module_config
def adapt_and_dump_flex_config(self):
    """Point the flex config to the created agent configs and write it to disk.

    The market entry (only if one is set) and the indicator entry of
    ``self.flex_config`` are replaced by the loaded ``self.market_config`` and
    ``self.indicator_config``. Their ``agent_config`` attribute is set to the
    path of the respective created agent config file inside
    ``flex_files_directory``.

    Afterwards the flex config is serialized without default values and
    written as JSON to ``flex_files_directory / flex_config_file_name``.
    """

    def _in_flex_dir(file_name):
        # location of a created file inside the flex files directory
        return os.path.join(self.flex_config.flex_files_directory, file_name)

    # the market is optional, the indicator is always present
    if self.flex_config.market_config:
        self.flex_config.market_config = self.market_config
        self.flex_config.market_config.agent_config = _in_flex_dir(
            self.market_config.name_of_created_file
        )
    self.flex_config.indicator_config = self.indicator_config
    self.flex_config.indicator_config.agent_config = _in_flex_dir(
        self.indicator_config.name_of_created_file
    )

    # save flex config next to the created flex files
    dump_path = _in_flex_dir(self.flex_config_file_name)
    with open(dump_path, "w", encoding="utf-8") as json_file:
        json_file.write(self.flex_config.model_dump_json(exclude_defaults=True))
def get_collocation_time_grid(self, discretization_options: dict):
    """Compute the collocation time grid of the mpc output over the horizon.

    Args:
        discretization_options: Mapping with the keys ``collocation_method``
            and ``collocation_order``.

    Returns:
        List of the collocation time points; points that coincide with the
        start of a discretization interval are left out.

    """
    # mpc time grid configuration
    baseline_cfg = self.baseline_mpc_module_config
    time_step = baseline_cfg.time_step
    prediction_horizon = baseline_cfg.prediction_horizon

    # collocation configuration
    method = discretization_options["collocation_method"]
    order = discretization_options["collocation_order"]
    options = CasadiDiscretizationOptions(
        collocation_order=order, collocation_method=method
    )
    # roots of the collocation polynomial
    backend = DirectCollocation(options=options)
    collocation_points = backend._collocation_polynomial().root

    # one row per interval start, one column per collocation point
    interval_starts = np.arange(0, time_step * prediction_horizon, time_step)
    grid = (interval_starts[:, None] + collocation_points * time_step).ravel()

    # keep only the points that are not an interval start themselves
    not_an_interval_start = np.isin(grid, interval_starts, invert=True)
    return grid[not_an_interval_start].tolist()
def _generate_flex_model_definition(self):
    """Generate a python module for negative and positive flexibility agents
    from the Baseline MPC model.

    The cost functions of the shadow MPCs are checked against the config of
    the casadi model first. Then the model file of the original MPC is
    parsed, modified once for the baseline, the positive and the negative
    flexibility MPC, and the three results are written into one formatted
    python file in the flex files directory.

    """
    baseline_data = self.flex_config.baseline_config_generator_data
    shadow_data = self.flex_config.shadow_mpc_config_generator_data
    output_file = os.path.join(
        self.flex_config.flex_files_directory,
        baseline_data.created_flex_mpcs_file,
    )
    opt_backend = self.orig_mpc_module_config.optimization_backend["model"]["type"]

    # Extract the config class of the casadi model to check cost functions
    config_class = inspect.get_annotations(custom_injection(opt_backend))["config"]
    # Get custom module fields provided by the user and add them. Build a new
    # dict without the "type" entry instead of popping it, so the stored
    # baseline config is not changed and the method can be called again.
    model_fields = {
        key: value
        for key, value in self.baseline_mpc_module_config.optimization_backend[
            "model"
        ].items()
        if key != "type"
    }
    config_instance = config_class(**model_fields)

    # The " + " is just there to simplify the validation, it does not affect
    # the generated code
    for shadow_mpc_type, shadow_mpc_data in (
        ("neg_flex", shadow_data.neg_flex),
        ("pos_flex", shadow_data.pos_flex),
    ):
        expr = shadow_mpc_data.flex_cost_function
        if shadow_mpc_data.flex_cost_function_appendix:
            expr = expr + " + " + shadow_mpc_data.flex_cost_function_appendix
        self.check_variables_in_casadi_config(
            config_instance, expr, shadow_mpc_type=shadow_mpc_type
        )

    # parse mpc python file
    with open(opt_backend["file"], "r", encoding="utf-8") as f:
        source = f.read()
    tree = ast.parse(source)

    # create modifiers for python file (baseline, pos_flex, neg_flex)
    modifiers = [
        SetupSystemModifier(
            mpc_data=mpc_data,
            controls=mpc_module_config.controls,
            # not every MPC config type has binary controls
            binary_controls=getattr(mpc_module_config, "binary_controls", None),
        )
        for mpc_data, mpc_module_config in (
            (baseline_data, self.baseline_mpc_module_config),
            (shadow_data.pos_flex, self.pos_flex_mpc_module_config),
            (shadow_data.neg_flex, self.neg_flex_mpc_module_config),
        )
    ]
    # run the modification, each on its own copy of the parsed tree, and
    # combine the modifications to one file
    modified_tree = ast.Module(body=[], type_ignores=[])
    for modifier in modifiers:
        modified_tree.body.extend(modifier.visit(deepcopy(tree)).body)
    modified_source = astor.to_source(modified_tree)
    # Use black to format the generated code
    formatted_code = black.format_str(modified_source, mode=black.FileMode())

    if self.flex_config.overwrite_files:
        # best effort: the write below truncates an existing file anyway
        try:
            Path(output_file).unlink()
        except OSError:
            pass

    with open(output_file, "w", encoding="utf-8") as f:
        f.write(formatted_code)
def check_variables_in_casadi_config(self, config: CasadiModelConfig, expr: str,
                                     shadow_mpc_type: str):
    """Warn about cost function variables that are unknown to the model.

    Every attribute access in the expression (e.g. ``self.P_el``) is treated as
    a variable reference. A reference counts as known if it is a variable of the
    casadi model config or if it is added through the parameter/input appendix
    of the selected shadow MPC.

    Args:
        config: casadi model config.
        expr: The expression to check.
        shadow_mpc_type: "pos_flex" selects the positive flexibility data, any
            other value selects the negative flexibility data.

    Note:
        Unknown variables are only reported with a logger warning; no exception
        is raised for them.

    """
    known_names = set(config.get_variable_names())

    # collect the attribute names referenced anywhere in the expression
    referenced_names = set()
    for node in ast.walk(ast.parse(expr)):
        if isinstance(node, ast.Attribute):
            referenced_names.add(node.attr)

    if shadow_mpc_type == "pos_flex":
        flex_config_data = self.flex_config.shadow_mpc_config_generator_data.pos_flex
    else:
        flex_config_data = self.flex_config.shadow_mpc_config_generator_data.neg_flex

    # variables that will be created for the shadow MPC and are therefore
    # not part of the original model config
    appendix_names = {par.name for par in flex_config_data.config_parameters_appendix}
    appendix_names |= {inp.name for inp in flex_config_data.config_inputs_appendix}

    unknown_vars = referenced_names - known_names - appendix_names
    if not unknown_vars:
        return
    self.logger.warning(
        f"Unknown variables in new cost function: {unknown_vars}. "
        f"This might cause problems with the optimization backend."
    )
def run_config_validations(self):
    """Validate the integrity of the user-supplied flex config.

    Since the validation depends on interactions between multiple configurations,
    it is performed within this function rather than using Pydantic's built-in
    validators for individual configurations.

    The following checks are performed:
    1. Ensures the specified power variable exists in the MPC model outputs.
    2. Ensures the specified comfort variable (if given) exists in the states
       of the MPC model class.
    3. Validates that the stored energy variable exists in MPC outputs if
       energy cost correction is enabled.
    4. Ensures a collocation method is defined. If it is not 'legendre', a
       warning is logged and the baseline, positive and negative MPC configs
       are switched to 'legendre'.
    5. Ensures that the sum of prep time, market time, and flex event duration
       does not exceed the prediction horizon.
    6. Ensures market time is an integer multiple of the MPC model time step
       if a market config is present.
    7. Ensures that all flex time values are multiples of the MPC model time step.
    8. Checks for mismatches between time-related parameters in the flex/MPC and
       indicator configs and issues warnings when discrepancies exist, using the
       flex/MPC config values as the source of truth.

    Raises:
        ConfigurationError: If one of the checks 1-7 fails.

    """
    baseline_config = self.baseline_mpc_module_config
    baseline_data = self.flex_config.baseline_config_generator_data
    output_names = {output.name for output in baseline_config.outputs}

    # check if the power variable exists in the mpc config
    power_var = baseline_data.power_variable
    if power_var not in output_names:
        raise ConfigurationError(
            f"Given power variable {power_var} is not defined "
            f"as output in baseline mpc config."
        )

    # check if the comfort variable exists in the states of the mpc model
    mod_type = baseline_config.optimization_backend["model"]["type"]
    comfort_var = baseline_data.comfort_variable
    if comfort_var:
        # load the model class from its file and instantiate it to read
        # the states it defines
        dynamic_class = cmng.get_class_from_file(
            mod_type["file"], mod_type["class_name"]
        )
        state_names = [state.name for state in dynamic_class().states]
        if comfort_var not in state_names:
            raise ConfigurationError(
                f"Given comfort variable "
                f"{comfort_var} "
                f"is not defined as state in baseline mpc config."
            )

    # check if the energy storage variable exists in the mpc config
    correct_costs = self.indicator_module_config.correct_costs
    if (
        correct_costs.enable_energy_costs_correction
        and correct_costs.stored_energy_variable not in output_names
    ):
        raise ConfigurationError(
            f"The stored energy variable "
            f"{correct_costs.stored_energy_variable} "
            f"is not defined in baseline mpc config. "
            f"It must be defined in the base MPC model and config as output "
            f"if the correction of costs is enabled."
        )

    # raise warning if unsupported collocation method is used and change
    # to supported method. A missing "discretization_options" entry is treated
    # like a missing collocation method, so the user gets the configuration
    # error below instead of a bare KeyError.
    discretization_options = baseline_config.optimization_backend.get(
        "discretization_options", {}
    )
    if "collocation_method" not in discretization_options:
        raise ConfigurationError(
            "Please use collocation as discretization method and define the "
            "collocation_method in the mpc config"
        )
    collocation_method = discretization_options["collocation_method"]
    if collocation_method != "legendre":
        self.logger.warning(
            "Collocation method %s is not supported. Switching to "
            "method legendre.",
            collocation_method,
        )
        for module_config in (
            self.baseline_mpc_module_config,
            self.pos_flex_mpc_module_config,
            self.neg_flex_mpc_module_config,
        ):
            module_config.optimization_backend["discretization_options"][
                "collocation_method"
            ] = "legendre"

    # time data validations
    time_step = baseline_config.time_step
    prediction_horizon = baseline_config.prediction_horizon
    market_time = self.flex_config.market_time
    flex_times = {
        glbs.PREP_TIME: self.flex_config.prep_time,
        glbs.MARKET_TIME: market_time,
        glbs.FLEX_EVENT_DURATION: self.flex_config.flex_event_duration,
    }
    mpc_times = {
        glbs.TIME_STEP: time_step,
        glbs.PREDICTION_HORIZON: prediction_horizon,
    }
    # total time length check (prep+market+flex_event)
    if sum(flex_times.values()) > time_step * prediction_horizon:
        raise ConfigurationError(
            "Market time + prep time + flex event duration "
            "can not exceed the prediction horizon."
        )
    # market time val check
    if self.flex_config.market_config and market_time % time_step != 0:
        raise ConfigurationError(
            "Market time must be an integer multiple of the time step."
        )
    # check for divisibility of flex_times by time_step
    for name, value in flex_times.items():
        if value % time_step != 0:
            raise ConfigurationError(
                f"{name} is not a multiple of the time step. Please redefine."
            )
    # raise warning if parameter value in flex indicator module config differs from
    # value in flex config/ baseline mpc module config
    for parameter in self.indicator_module_config.parameters:
        if parameter.value is None:
            continue
        if parameter.name in flex_times:
            if parameter.value != flex_times[parameter.name]:
                self.logger.warning(
                    "Value mismatch for %s in flex config (field) "
                    "and indicator module config (parameter). "
                    "Flex config value will be used.",
                    parameter.name,
                )
        elif parameter.name in mpc_times:
            if parameter.value != mpc_times[parameter.name]:
                self.logger.warning(
                    "Value mismatch for %s in baseline MPC module "
                    "config (field) and indicator module config (parameter). "
                    "Baseline MPC module config value will be used.",
                    parameter.name,
                )
def adapt_sim_results_path(self, simulator_agent_config: Union[str, Path],
                           save_name_suffix: str = "") -> Union[str, Path]:
    """Redirect the simulator results file into the flex results directory.

    Optional helper function to adapt file path for simulator results in sim
    config, so that sim results land in the same results directory as flex
    results. The adapted config is written next to the original config file as
    ``<original stem><save_name_suffix>.json``; with an empty suffix the
    original file is overwritten.

    Args:
        simulator_agent_config: Path to the simulator agent config JSON file.
        save_name_suffix: Suffix added to the newly created sim_config file.

    Returns:
        Path to the newly written simulator config JSON file.

    Raises:
        FileNotFoundError: If the specified config file does not exist.
        ValueError: If the config contains no module of type "simulator".
        Exception: If the adapted config could not be written.

    """
    simulator_agent_config = Path(simulator_agent_config)
    # open config and extract sim module
    with open(simulator_agent_config, "r", encoding="utf-8") as f:
        sim_config = json.load(f)
    sim_module_config = next(
        (
            module
            for module in sim_config["modules"]
            if module["type"] == "simulator"
        ),
        None,
    )
    if sim_module_config is None:
        raise ValueError(
            f"No module of type 'simulator' found in {simulator_agent_config}."
        )
    # keep only the file name of the configured results file ...
    sim_file_name = Path(sim_module_config["result_filename"]).name
    # ... and place it in the same directory as the flex result CSVs
    sim_module_config["result_filename"] = str(
        self.flex_config.results_directory / sim_file_name
    )
    # join with pathlib instead of a hard-coded "\\", which is only a path
    # separator on Windows
    new_config_path = simulator_agent_config.parent / (
        f"{simulator_agent_config.stem}{save_name_suffix}.json"
    )
    try:
        with open(new_config_path, "w", encoding="utf-8") as f:
            json.dump(sim_config, f, indent=4)
    except Exception as e:
        raise Exception(f"Could not adapt and create a new simulation config "
                        f"due to: {e}. "
                        f"Please check {simulator_agent_config} and "
                        f"'{save_name_suffix}'") from e
    return new_config_path