Coverage for agentlib_flexquant/generate_flex_agents.py: 92%
296 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-10-20 14:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-10-20 14:09 +0000
1"""Generate agents for flexibility quantification.
3This module provides the FlexAgentGenerator class that creates and configures flexibility agents.
4The agents created include the baseline, positive and negative flexibility agents,
5the flexibility indicator and market agents. The agents are created based on the flex config and
6the MPC config.
7"""
8import ast
9import atexit
10import inspect
11import json
12import logging
13import os
14from copy import deepcopy
15from pathlib import Path
16from typing import Union
18import astor
19import black
20import json
21import numpy as np
22from copy import deepcopy
23from pathlib import Path
24from typing import List, Union
25from pydantic import FilePath
26from agentlib.core.agent import AgentConfig
27from agentlib.core.datamodels import AgentVariable
28from agentlib.core.errors import ConfigurationError
29from agentlib.core.module import BaseModuleConfig
30from agentlib.utils import custom_injection, load_config
31from agentlib_mpc.data_structures.mpc_datamodels import MPCVariable
32from agentlib_mpc.models.casadi_model import CasadiModelConfig
33from agentlib_mpc.modules.mpc_full import MPCConfig
35from agentlib_mpc.optimization_backends.casadi_.basic import DirectCollocation
36from agentlib_mpc.data_structures.casadi_utils import CasadiDiscretizationOptions
37import agentlib_flexquant.data_structures.globals as glbs
38import agentlib_flexquant.utils.config_management as cmng
39from agentlib_flexquant.utils.parsing import SetupSystemModifier
40from agentlib_flexquant.data_structures.flexquant import (
41 FlexibilityIndicatorConfig,
42 FlexibilityMarketConfig,
43 FlexQuantConfig,
44)
45from agentlib_flexquant.data_structures.mpcs import BaselineMPCData, BaseMPCData
46from agentlib_flexquant.modules.flexibility_indicator import (
47 FlexibilityIndicatorModuleConfig,
48)
49from agentlib_flexquant.modules.flexibility_market import FlexibilityMarketModuleConfig
52class FlexAgentGenerator:
53 """Class for generating the flex agents
55 orig_mpc_module_config: the config for the original mpc,
56 which has nothing to do with the flexibility quantification
57 baseline_mpc_module_config: the config for the baseline mpc
58 for flexibility quantification
59 pos_flex_mpc_module_config: the config for the positive flexibility mpc
60 for flexibility quantification
61 neg_flex_mpc_module_config: the config for the negative flexibility mpc
62 for flexibility quantification
63 indicator_module_config: the config for the indicator for flexibility quantification
64 market_module_config: the config for the market for flexibility quantification
66 """
68 orig_mpc_module_config: MPCConfig
69 baseline_mpc_module_config: MPCConfig
70 pos_flex_mpc_module_config: MPCConfig
71 neg_flex_mpc_module_config: MPCConfig
72 indicator_module_config: FlexibilityIndicatorModuleConfig
73 market_module_config: FlexibilityMarketModuleConfig
75 def __init__(
76 self,
77 flex_config: Union[str, FilePath, FlexQuantConfig],
78 mpc_agent_config: Union[str, FilePath, AgentConfig],
79 ):
80 self.logger = logging.getLogger(__name__)
82 if isinstance(flex_config, str or FilePath):
83 self.flex_config_file_name = os.path.basename(flex_config)
84 else:
85 # provide default name for json
86 self.flex_config_file_name = "flex_config.json"
87 # load configs
88 self.flex_config = load_config.load_config(flex_config, config_type=FlexQuantConfig)
90 # original mpc agent
91 self.orig_mpc_agent_config = load_config.load_config(
92 mpc_agent_config, config_type=AgentConfig
93 )
94 # baseline agent
95 self.baseline_mpc_agent_config = self.orig_mpc_agent_config.__deepcopy__()
96 # pos agent
97 self.pos_flex_mpc_agent_config = self.orig_mpc_agent_config.__deepcopy__()
98 # neg agent
99 self.neg_flex_mpc_agent_config = self.orig_mpc_agent_config.__deepcopy__()
101 # original mpc module
102 self.orig_mpc_module_config = cmng.get_module(
103 config=self.orig_mpc_agent_config,
104 module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
105 )
106 # baseline module
107 self.baseline_mpc_module_config = cmng.get_module(
108 config=self.baseline_mpc_agent_config,
109 module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
110 )
111 # convert agentlib_mpc’s ModuleConfig to flexquant’s ModuleConfig to include additional
112 # fields not present in the original
113 self.baseline_mpc_module_config = cmng.get_flex_mpc_module_config(
114 agent_config=self.baseline_mpc_agent_config,
115 module_config=self.baseline_mpc_module_config,
116 module_type=self.flex_config.baseline_config_generator_data.module_types[
117 self.baseline_mpc_module_config.type
118 ],
119 )
120 # pos module
121 self.pos_flex_mpc_module_config = cmng.get_module(
122 config=self.pos_flex_mpc_agent_config,
123 module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
124 )
125 # neg module
126 self.neg_flex_mpc_module_config = cmng.get_module(
127 config=self.neg_flex_mpc_agent_config,
128 module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
129 )
130 # load indicator config
131 self.indicator_config = load_config.load_config(
132 self.flex_config.indicator_config, config_type=FlexibilityIndicatorConfig
133 )
134 # load indicator module config
135 self.indicator_agent_config = load_config.load_config(
136 self.indicator_config.agent_config, config_type=AgentConfig
137 )
138 self.indicator_module_config = cmng.get_module(
139 config=self.indicator_agent_config, module_type=cmng.INDICATOR_CONFIG_TYPE
140 )
141 # load market config
142 if self.flex_config.market_config:
143 self.market_config = load_config.load_config(
144 self.flex_config.market_config, config_type=FlexibilityMarketConfig
145 )
146 # load market module config
147 self.market_agent_config = load_config.load_config(
148 self.market_config.agent_config, config_type=AgentConfig
149 )
150 self.market_module_config = cmng.get_module(
151 config=self.market_agent_config, module_type=cmng.MARKET_CONFIG_TYPE
152 )
153 else:
154 self.flex_config.market_time = 0
156 self.run_config_validations()
158 def generate_flex_agents(self) -> list[str]:
159 """Generate the configs and the python module for the flexibility agents.
161 Returns:
162 list of the full path for baseline mpc, pos_flex mpc, neg_flex mpc, indicator
163 and market config
165 """
166 # adapt modules to include necessary communication variables
167 baseline_mpc_config = self.adapt_mpc_module_config(
168 module_config=self.baseline_mpc_module_config,
169 mpc_dataclass=self.flex_config.baseline_config_generator_data,
170 agent_id=self.baseline_mpc_agent_config.id,
171 )
172 pf_mpc_config = self.adapt_mpc_module_config(
173 module_config=self.pos_flex_mpc_module_config,
174 mpc_dataclass=self.flex_config.shadow_mpc_config_generator_data.pos_flex,
175 agent_id=self.pos_flex_mpc_agent_config.id,
176 )
177 nf_mpc_config = self.adapt_mpc_module_config(
178 module_config=self.neg_flex_mpc_module_config,
179 mpc_dataclass=self.flex_config.shadow_mpc_config_generator_data.neg_flex,
180 agent_id=self.neg_flex_mpc_agent_config.id,
181 )
182 indicator_module_config = self.adapt_indicator_config(
183 module_config=self.indicator_module_config
184 )
185 if self.flex_config.market_config:
186 market_module_config = self.adapt_market_config(module_config=self.market_module_config)
188 # dump jsons of the agents including the adapted module configs
189 self.append_module_and_dump_agent(
190 module=baseline_mpc_config,
191 agent=self.baseline_mpc_agent_config,
192 module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
193 config_name=self.flex_config.baseline_config_generator_data.name_of_created_file,
194 )
195 self.append_module_and_dump_agent(
196 module=pf_mpc_config,
197 agent=self.pos_flex_mpc_agent_config,
198 module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
199 config_name=self.flex_config.shadow_mpc_config_generator_data.pos_flex.name_of_created_file,
200 )
201 self.append_module_and_dump_agent(
202 module=nf_mpc_config,
203 agent=self.neg_flex_mpc_agent_config,
204 module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
205 config_name=self.flex_config.shadow_mpc_config_generator_data.neg_flex.name_of_created_file,
206 )
207 self.append_module_and_dump_agent(
208 module=indicator_module_config,
209 agent=self.indicator_agent_config,
210 module_type=cmng.INDICATOR_CONFIG_TYPE,
211 config_name=self.indicator_config.name_of_created_file,
212 )
213 if self.flex_config.market_config:
214 self.append_module_and_dump_agent(
215 module=market_module_config,
216 agent=self.market_agent_config,
217 module_type=cmng.MARKET_CONFIG_TYPE,
218 config_name=self.market_config.name_of_created_file,
219 )
220 # generate python files for the shadow mpcs
221 self._generate_flex_model_definition()
223 # save flex config to created flex files
224 with open(
225 os.path.join(self.flex_config.flex_files_directory, self.flex_config_file_name),
226 "w",
227 encoding="utf-8",
228 ) as f:
229 config_json = self.flex_config.model_dump_json(exclude_defaults=True)
230 f.write(config_json)
232 # register the exit function if the corresponding flag is set
233 if self.flex_config.delete_files:
234 atexit.register(lambda: self._delete_created_files())
235 return self.get_config_file_paths()
237 def append_module_and_dump_agent(
238 self,
239 module: BaseModuleConfig,
240 agent: AgentConfig,
241 module_type: str,
242 config_name: str,
243 ):
244 """Append the given module config to the given agent config and
245 dumps the agent config to a json file.
247 The json file is named based on the config_name.
249 Args:
250 module: The module config to be appended.
251 agent: The agent config to be updated.
252 module_type: The type of the module
253 config_name: The name of the json file for module config (e.g. baseline.json)
255 """
256 # if module is not from the baseline, set a new agent id, based on module id
257 if module.type is not self.baseline_mpc_module_config.type:
258 agent.id = module.module_id
259 # get the module as a dict without default values
260 module_dict = cmng.to_dict_and_remove_unnecessary_fields(module=module)
261 # write given module to agent config
262 for i, agent_module in enumerate(agent.modules):
263 if cmng.MODULE_TYPE_DICT[module_type] is cmng.MODULE_TYPE_DICT[agent_module["type"]]:
264 agent.modules[i] = module_dict
266 # dump agent config
267 if agent.modules:
268 if self.flex_config.overwrite_files:
269 try:
270 Path(os.path.join(self.flex_config.flex_files_directory, config_name)).unlink()
271 except OSError:
272 pass
273 with open(
274 os.path.join(self.flex_config.flex_files_directory, config_name),
275 "w+",
276 encoding="utf-8",
277 ) as f:
278 module_json = agent.model_dump_json(exclude_defaults=True)
279 f.write(module_json)
280 else:
281 logging.error("Provided agent config does not contain any modules.")
283 def get_config_file_paths(self) -> list[str]:
284 """Return a list of paths with the created config files."""
285 paths = [
286 os.path.join(
287 self.flex_config.flex_files_directory,
288 self.flex_config.baseline_config_generator_data.name_of_created_file,
289 ),
290 os.path.join(
291 self.flex_config.flex_files_directory,
292 self.flex_config.shadow_mpc_config_generator_data.pos_flex.name_of_created_file,
293 ),
294 os.path.join(
295 self.flex_config.flex_files_directory,
296 self.flex_config.shadow_mpc_config_generator_data.neg_flex.name_of_created_file,
297 ),
298 os.path.join(
299 self.flex_config.flex_files_directory,
300 self.indicator_config.name_of_created_file,
301 ),
302 ]
303 if self.flex_config.market_config:
304 paths.append(
305 os.path.join(
306 self.flex_config.flex_files_directory,
307 self.market_config.name_of_created_file,
308 )
309 )
310 return paths
312 def _delete_created_files(self):
313 """Function to run at exit if the files are to be deleted."""
314 to_be_deleted = self.get_config_file_paths()
315 to_be_deleted.append(
316 os.path.join(
317 self.flex_config.flex_files_directory,
318 self.flex_config_file_name,
319 )
320 )
321 # delete files
322 for file in to_be_deleted:
323 Path(file).unlink()
324 # also delete folder
325 Path(self.flex_config.flex_files_directory).rmdir()
327 def adapt_mpc_module_config(
328 self, module_config: MPCConfig, mpc_dataclass: BaseMPCData, agent_id: str
329 ) -> MPCConfig:
330 """Adapt the mpc module config for automated flexibility quantification.
332 Things adapted among others are:
333 - the file name/path of the mpc config file
334 - names of the control variables for the shadow mpcs
335 - reduce communicated variables of shadow mpcs to outputs
336 - add the power variable to the outputs
337 - add parameters for the activation and quantification of flexibility
339 Args:
340 module_config: The module config to be adapted
341 mpc_dataclass: The dataclass corresponding to the type of the MPC module.
342 It contains all the extra data necessary for flexibility quantification,
343 which will be used to update the module_config.
344 agent_id: agent_id for creating the flexquant mpc module config
346 Returns:
347 The adapted module config
349 """
350 # allow the module config to be changed
351 module_config.model_config["frozen"] = False
353 # set new MPC type
354 module_config.type = mpc_dataclass.module_types[
355 cmng.get_orig_module_type(self.orig_mpc_agent_config)
356 ]
358 # set the MPC config type from the MPCConfig in agentlib_mpc to the corresponding one in
359 # flexquant and add additional fields
360 module_config_flex_dict = module_config.model_dump()
361 module_config_flex_dict["casadi_sim_time_step"] = self.flex_config.casadi_sim_time_step
362 module_config_flex_dict[
363 "power_variable_name"
364 ] = self.flex_config.baseline_config_generator_data.power_variable
365 module_config_flex_dict[
366 "storage_variable_name"
367 ] = self.indicator_module_config.correct_costs.stored_energy_variable
368 module_config_flex = cmng.MODULE_TYPE_DICT[module_config.type](
369 **module_config_flex_dict, _agent_id=agent_id
370 )
372 # allow the module config to be changed
373 module_config_flex.model_config["frozen"] = False
375 module_config_flex.module_id = mpc_dataclass.module_id
377 # append the new weights as parameter to the MPC or update its value
378 parameter_dict = {parameter.name: parameter for parameter in module_config_flex.parameters}
379 for weight in mpc_dataclass.weights:
380 if weight.name in parameter_dict:
381 parameter_dict[weight.name].value = weight.value
382 else:
383 module_config_flex.parameters.append(weight)
385 # set new id (needed for plotting)
386 module_config_flex.module_id = mpc_dataclass.module_id
387 # update optimization backend to use the created mpc files and classes
388 module_config_flex.optimization_backend["model"]["type"] = {
389 "file": os.path.join(
390 self.flex_config.flex_files_directory,
391 mpc_dataclass.created_flex_mpcs_file,
392 ),
393 "class_name": mpc_dataclass.class_name,
394 }
395 # extract filename from results file and update it with suffix and parent directory
396 result_filename = Path(
397 module_config_flex.optimization_backend["results_file"]
398 ).name.replace(".csv", mpc_dataclass.results_suffix)
399 full_path = self.flex_config.results_directory / result_filename
400 module_config_flex.optimization_backend["results_file"] = str(full_path)
401 # change cia backend to custom backend of flexquant
402 if module_config_flex.optimization_backend["type"] == "casadi_cia":
403 module_config_flex.optimization_backend["type"] = "casadi_cia_cons"
404 module_config_flex.optimization_backend["market_time"] = self.flex_config.market_time
406 # add the full control trajectory output from the baseline as input for the shadow mpcs
407 if not isinstance(mpc_dataclass, BaselineMPCData):
408 for control in module_config_flex.controls:
409 module_config_flex.inputs.append(
410 MPCVariable(
411 name=control.name + glbs.full_trajectory_suffix,
412 value=None,
413 type="pd.Series",
414 )
415 )
416 # change the alias of control variable in shadow mpc to prevent it from triggering
417 # the wrong callback
418 control.alias = control.name + glbs.shadow_suffix
419 # also include binary controls
420 if hasattr(module_config_flex, "binary_controls"):
421 for control in module_config_flex.binary_controls:
422 module_config_flex.inputs.append(
423 MPCVariable(
424 name=control.name + glbs.full_trajectory_suffix,
425 value=None,
426 type="pd.Series",
427 )
428 )
429 # change the alias of control variable in shadow mpc to prevent it from
430 # triggering the wrong callback
431 control.alias = control.name + glbs.shadow_suffix
432 # only communicate outputs for the shadow mpcs
433 module_config_flex.shared_variable_fields = ["outputs"]
434 else:
435 # add full_controls trajectory as AgentVariable to the config of Baseline mpc
436 for control in module_config_flex.controls:
437 module_config_flex.full_controls.append(
438 AgentVariable(
439 name=control.name + glbs.full_trajectory_suffix,
440 alias=control.name + glbs.full_trajectory_suffix,
441 shared=True,
442 )
443 )
444 if hasattr(module_config_flex, "binary_controls"):
445 for binary_controls in module_config_flex.binary_controls:
446 module_config_flex.full_controls.append(
447 AgentVariable(
448 name=binary_controls.name + glbs.full_trajectory_suffix,
449 alias=binary_controls.name + glbs.full_trajectory_suffix,
450 shared=True,
451 )
452 )
453 module_config_flex.set_outputs = True
454 # add outputs for the power variables, for easier handling create a lookup dict
455 output_dict = {output.name: output for output in module_config_flex.outputs}
456 if self.flex_config.baseline_config_generator_data.power_variable in output_dict:
457 output_dict[
458 self.flex_config.baseline_config_generator_data.power_variable
459 ].alias = mpc_dataclass.power_alias
460 else:
461 module_config_flex.outputs.append(
462 MPCVariable(
463 name=self.flex_config.baseline_config_generator_data.power_variable,
464 alias=mpc_dataclass.power_alias,
465 )
466 )
467 # add or change alias for stored energy variable
468 if self.indicator_module_config.correct_costs.enable_energy_costs_correction:
469 output_dict[
470 self.indicator_module_config.correct_costs.stored_energy_variable
471 ].alias = mpc_dataclass.stored_energy_alias
473 # add extra inputs needed for activation of flex
474 module_config_flex.inputs.extend(mpc_dataclass.config_inputs_appendix)
475 # CONFIG_PARAMETERS_APPENDIX only includes dummy values
476 # overwrite dummy values with values from flex config and append it to module config
477 for var in mpc_dataclass.config_parameters_appendix:
478 if var.name in self.flex_config.model_fields:
479 var.value = getattr(self.flex_config, var.name)
480 if var.name in self.flex_config.baseline_config_generator_data.model_fields:
481 var.value = getattr(self.flex_config.baseline_config_generator_data, var.name)
482 module_config_flex.parameters.extend(mpc_dataclass.config_parameters_appendix)
484 # freeze the config again
485 module_config_flex.model_config["frozen"] = True
487 return module_config_flex
489 def adapt_indicator_config(
490 self, module_config: FlexibilityIndicatorModuleConfig
491 ) -> FlexibilityIndicatorModuleConfig:
492 """Adapt the indicator module config for automated flexibility quantification."""
493 # append user-defined price var to indicator module config
494 module_config.inputs.append(
495 AgentVariable(
496 name=module_config.price_variable,
497 unit="ct/kWh",
498 type="pd.Series",
499 description="electricity price",
500 )
501 )
502 # allow the module config to be changed
503 module_config.model_config["frozen"] = False
504 for parameter in module_config.parameters:
505 if parameter.name == glbs.PREP_TIME:
506 parameter.value = self.flex_config.prep_time
507 if parameter.name == glbs.MARKET_TIME:
508 parameter.value = self.flex_config.market_time
509 if parameter.name == glbs.FLEX_EVENT_DURATION:
510 parameter.value = self.flex_config.flex_event_duration
511 if parameter.name == glbs.TIME_STEP:
512 parameter.value = self.baseline_mpc_module_config.time_step
513 if parameter.name == glbs.PREDICTION_HORIZON:
514 parameter.value = self.baseline_mpc_module_config.prediction_horizon
515 if parameter.name == glbs.COLLOCATION_TIME_GRID:
516 discretization_options = self.baseline_mpc_module_config.optimization_backend[
517 "discretization_options"
518 ]
519 parameter.value = self.get_collocation_time_grid(
520 discretization_options=discretization_options
521 )
522 # set power unit
523 module_config.power_unit = self.flex_config.baseline_config_generator_data.power_unit
524 module_config.results_file = (
525 self.flex_config.results_directory / module_config.results_file.name
526 )
527 module_config.model_config["frozen"] = True
528 return module_config
530 def adapt_market_config(
531 self, module_config: FlexibilityMarketModuleConfig
532 ) -> FlexibilityMarketModuleConfig:
533 """Adapt the market module config for automated flexibility quantification."""
534 # allow the module config to be changed
535 module_config.model_config["frozen"] = False
536 for field in module_config.__fields__:
537 if field in self.market_module_config.__fields__.keys():
538 module_config.__setattr__(field, getattr(self.market_module_config, field))
539 module_config.results_file = (
540 self.flex_config.results_directory / module_config.results_file.name
541 )
542 for parameter in module_config.parameters:
543 if parameter.name == glbs.COLLOCATION_TIME_GRID:
544 discretization_options = self.baseline_mpc_module_config.optimization_backend[
545 "discretization_options"
546 ]
547 parameter.value = self.get_collocation_time_grid(
548 discretization_options=discretization_options
549 )
550 if parameter.name == glbs.TIME_STEP:
551 parameter.value = self.baseline_mpc_module_config.time_step
552 module_config.model_config["frozen"] = True
553 return module_config
555 def get_collocation_time_grid(self, discretization_options: dict):
556 """Get the mpc output collocation grid over the horizon"""
557 # get the mpc time grid configuration
558 time_step = self.baseline_mpc_module_config.time_step
559 prediction_horizon = self.baseline_mpc_module_config.prediction_horizon
560 # get the collocation configuration
561 collocation_method = discretization_options["collocation_method"]
562 collocation_order = discretization_options["collocation_order"]
563 # get the collocation points
564 options = CasadiDiscretizationOptions(
565 collocation_order=collocation_order, collocation_method=collocation_method
566 )
567 collocation_points = DirectCollocation(options=options)._collocation_polynomial().root
568 # compute the mpc output collocation grid
569 discretization_points = np.arange(0, time_step * prediction_horizon, time_step)
570 collocation_time_grid = (
571 discretization_points[:, None] + collocation_points * time_step
572 ).ravel()
573 collocation_time_grid = collocation_time_grid[
574 ~np.isin(collocation_time_grid, discretization_points)
575 ]
576 collocation_time_grid = collocation_time_grid.tolist()
577 return collocation_time_grid
579 def _generate_flex_model_definition(self):
580 """Generate a python module for negative and positive flexibility agents
581 from the Baseline MPC model."""
582 output_file = os.path.join(
583 self.flex_config.flex_files_directory,
584 self.flex_config.baseline_config_generator_data.created_flex_mpcs_file,
585 )
586 opt_backend = self.orig_mpc_module_config.optimization_backend["model"]["type"]
588 # Extract the config class of the casadi model to check cost functions
589 config_class = inspect.get_annotations(custom_injection(opt_backend))["config"]
590 config_instance = config_class()
591 self.check_variables_in_casadi_config(
592 config_instance,
593 self.flex_config.shadow_mpc_config_generator_data.neg_flex.flex_cost_function,
594 )
595 self.check_variables_in_casadi_config(
596 config_instance,
597 self.flex_config.shadow_mpc_config_generator_data.pos_flex.flex_cost_function,
598 )
600 # parse mpc python file
601 with open(opt_backend["file"], "r", encoding="utf-8") as f:
602 source = f.read()
603 tree = ast.parse(source)
605 # create modifiers for python file
606 modifier_base = SetupSystemModifier(
607 mpc_data=self.flex_config.baseline_config_generator_data,
608 controls=self.baseline_mpc_module_config.controls,
609 binary_controls=self.baseline_mpc_module_config.binary_controls
610 if hasattr(self.baseline_mpc_module_config, "binary_controls")
611 else None,
612 )
613 modifier_pos = SetupSystemModifier(
614 mpc_data=self.flex_config.shadow_mpc_config_generator_data.pos_flex,
615 controls=self.pos_flex_mpc_module_config.controls,
616 binary_controls=self.pos_flex_mpc_module_config.binary_controls
617 if hasattr(self.pos_flex_mpc_module_config, "binary_controls")
618 else None,
619 )
620 modifier_neg = SetupSystemModifier(
621 mpc_data=self.flex_config.shadow_mpc_config_generator_data.neg_flex,
622 controls=self.neg_flex_mpc_module_config.controls,
623 binary_controls=self.neg_flex_mpc_module_config.binary_controls
624 if hasattr(self.neg_flex_mpc_module_config, "binary_controls")
625 else None,
626 )
627 # run the modification
628 modified_tree_base = modifier_base.visit(deepcopy(tree))
629 modified_tree_pos = modifier_pos.visit(deepcopy(tree))
630 modified_tree_neg = modifier_neg.visit(deepcopy(tree))
631 # combine modifications to one file
632 modified_tree = ast.Module(body=[], type_ignores=[])
633 modified_tree.body.extend(
634 modified_tree_base.body + modified_tree_pos.body + modified_tree_neg.body
635 )
636 modified_source = astor.to_source(modified_tree)
637 # Use black to format the generated code
638 formatted_code = black.format_str(modified_source, mode=black.FileMode())
640 if self.flex_config.overwrite_files:
641 try:
642 Path(
643 os.path.join(
644 self.flex_config.flex_files_directory,
645 self.flex_config.baseline_config_generator_data.created_flex_mpcs_file,
646 )
647 ).unlink()
648 except OSError:
649 pass
651 with open(output_file, "w", encoding="utf-8") as f:
652 f.write(formatted_code)
654 def check_variables_in_casadi_config(self, config: CasadiModelConfig, expr: str):
655 """Check if all variables in the expression are defined in the config.
657 Args:
658 config: casadi model config.
659 expr: The expression to check.
661 Raises:
662 ValueError: If any variable in the expression is not defined in the config.
664 """
665 variables_in_config = set(config.get_variable_names())
666 variables_in_cost_function = set(ast.walk(ast.parse(expr)))
667 variables_in_cost_function = {
668 node.attr for node in variables_in_cost_function if isinstance(node, ast.Attribute)
669 }
670 variables_newly_created = set(
671 weight.name for weight in self.flex_config.shadow_mpc_config_generator_data.weights
672 )
673 unknown_vars = variables_in_cost_function - variables_in_config - variables_newly_created
674 if unknown_vars:
675 raise ValueError(f"Unknown variables in new cost function: {unknown_vars}")
677 def run_config_validations(self):
678 """Function to validate integrity of user-supplied flex config.
680 Since the validation depends on interactions between multiple configurations, it is
681 performed within this function rather than using Pydantic’s built-in validators for
682 individual configurations.
684 The following checks are performed:
685 1. Ensures the specified power variable exists in the MPC model outputs.
686 2. Ensures the specified comfort variable exists in the MPC model states.
687 3. Validates that the stored energy variable exists in MPC outputs if
688 energy cost correction is enabled.
689 4. Verifies the supported collocation method is used; otherwise,
690 switches to 'legendre' and raises a warning.
691 5. Ensures that the sum of prep time, market time, and flex event duration
692 does not exceed the prediction horizon.
693 6. Ensures market time equals the MPC model time step if market config is present.
694 7. Ensures that all flex time values are multiples of the MPC model time step.
695 8. Checks for mismatches between time-related parameters in the flex/MPC and
696 indicator configs and issues warnings
697 when discrepancies exist, using the flex/MPC config values as the source of truth.
699 """
700 # check if the power variable exists in the mpc config
701 power_var = self.flex_config.baseline_config_generator_data.power_variable
702 if power_var not in [output.name for output in self.baseline_mpc_module_config.outputs]:
703 raise ConfigurationError(
704 f"Given power variable {power_var} is not defined "
705 f"as output in baseline mpc config."
706 )
708 # check if the comfort variable exists in the mpc slack variables
709 if self.flex_config.baseline_config_generator_data.comfort_variable:
710 file_path = self.baseline_mpc_module_config.optimization_backend["model"]["type"][
711 "file"
712 ]
713 class_name = self.baseline_mpc_module_config.optimization_backend["model"]["type"][
714 "class_name"
715 ]
716 # Get the class
717 dynamic_class = cmng.get_class_from_file(file_path, class_name)
718 if self.flex_config.baseline_config_generator_data.comfort_variable not in [
719 state.name for state in dynamic_class().states
720 ]:
721 raise ConfigurationError(
722 f"Given comfort variable "
723 f"{self.flex_config.baseline_config_generator_data.comfort_variable} "
724 f"is not defined as state in baseline mpc config."
725 )
727 # check if the energy storage variable exists in the mpc config
728 if self.indicator_module_config.correct_costs.enable_energy_costs_correction:
729 if self.indicator_module_config.correct_costs.stored_energy_variable not in [
730 output.name for output in self.baseline_mpc_module_config.outputs
731 ]:
732 raise ConfigurationError(
733 f"The stored energy variable "
734 f"{self.indicator_module_config.correct_costs.stored_energy_variable} "
735 f"is not defined in baseline mpc config. "
736 f"It must be defined in the base MPC model and config as output "
737 f"if the correction of costs is enabled."
738 )
740 # raise warning if unsupported collocation method is used and change to supported method
741 if (
742 "collocation_method"
743 not in self.baseline_mpc_module_config.optimization_backend["discretization_options"]
744 ):
745 raise ConfigurationError(
746 "Please use collocation as discretization method and define the collocation_method "
747 "in the mpc config"
748 )
749 else:
750 collocation_method = self.baseline_mpc_module_config.optimization_backend[
751 "discretization_options"
752 ]["collocation_method"]
753 if collocation_method != "legendre":
754 self.logger.warning(
755 "Collocation method %s is not supported. Switching to method legendre.",
756 collocation_method,
757 )
758 self.baseline_mpc_module_config.optimization_backend["discretization_options"][
759 "collocation_method"
760 ] = "legendre"
761 self.pos_flex_mpc_module_config.optimization_backend["discretization_options"][
762 "collocation_method"
763 ] = "legendre"
764 self.neg_flex_mpc_module_config.optimization_backend["discretization_options"][
765 "collocation_method"
766 ] = "legendre"
768 # time data validations
769 flex_times = {
770 glbs.PREP_TIME: self.flex_config.prep_time,
771 glbs.MARKET_TIME: self.flex_config.market_time,
772 glbs.FLEX_EVENT_DURATION: self.flex_config.flex_event_duration,
773 }
774 mpc_times = {
775 glbs.TIME_STEP: self.baseline_mpc_module_config.time_step,
776 glbs.PREDICTION_HORIZON: self.baseline_mpc_module_config.prediction_horizon,
777 }
778 # total time length check (prep+market+flex_event)
779 if sum(flex_times.values()) > mpc_times["time_step"] * mpc_times["prediction_horizon"]:
780 raise ConfigurationError(
781 "Market time + prep time + flex event duration "
782 "can not exceed the prediction horizon."
783 )
784 # market time val check
785 if self.flex_config.market_config:
786 if flex_times["market_time"] % mpc_times["time_step"] != 0:
787 raise ConfigurationError(
788 "Market time must be an integer multiple of the time step."
789 )
790 # check for divisibility of flex_times by time_step
791 for name, value in flex_times.items():
792 if value % mpc_times["time_step"] != 0:
793 raise ConfigurationError(
794 f"{name} is not a multiple of the time step. Please redefine."
795 )
796 # raise warning if parameter value in flex indicator module config differs from
797 # value in flex config/ baseline mpc module config
798 for parameter in self.indicator_module_config.parameters:
799 if parameter.value is not None:
800 if parameter.name in flex_times:
801 flex_value = flex_times[parameter.name]
802 if parameter.value != flex_value:
803 self.logger.warning(
804 "Value mismatch for %s in flex config (field) "
805 "and indicator module config (parameter). "
806 "Flex config value will be used.",
807 parameter.name,
808 )
809 elif parameter.name in mpc_times:
810 mpc_value = mpc_times[parameter.name]
811 if parameter.value != mpc_value:
812 self.logger.warning(
813 "Value mismatch for %s in baseline MPC module "
814 "config (field) and indicator module config (parameter). "
815 "Baseline MPC module config value will be used.",
816 parameter.name,
817 )
819 def adapt_sim_results_path(self, simulator_agent_config: Union[str, Path]) -> dict:
820 """
821 Optional helper function to adapt file path for simulator results in sim config,
822 so that sim results land in the same results directory as flex results.
824 Args:
825 simulator_agent_config: Path to the simulator agent config JSON file.
827 Returns:
828 The updated simulator config dictionary with the modified result file path.
830 Raises:
831 FileNotFoundError: If the specified config file does not exist.
833 """
834 # open config and extract sim module
835 with open(simulator_agent_config, "r", encoding="utf-8") as f:
836 sim_config = json.load(f)
837 sim_module_config = next(
838 (module for module in sim_config["modules"] if module["type"] == "simulator"),
839 None,
840 )
841 # convert filename string to path and extract the name
842 sim_file_name = Path(sim_module_config["result_filename"]).name
843 # set results path so that sim results lands in same directory as flex result CSVs
844 sim_module_config["result_filename"] = str(
845 self.flex_config.results_directory / sim_file_name
846 )
847 return sim_config