"""Generate agents for flexibility quantification.
This module provides the FlexAgentGenerator class that creates and configures
flexibility agents.
The agents created include the baseline, positive and negative flexibility agents,
the flexibility indicator and market agents. The agents are created based on the
flex config and the MPC config.
"""
import ast
import atexit
import inspect
import logging
import os
import astor
import black
import json
import numpy as np
from copy import deepcopy
from pathlib import Path
from typing import Union
from pydantic import FilePath
from agentlib.core.agent import AgentConfig
from agentlib.core.datamodels import AgentVariable
from agentlib.core.errors import ConfigurationError
from agentlib.core.module import BaseModuleConfig
from agentlib.utils import custom_injection, load_config
from agentlib_mpc.data_structures.mpc_datamodels import MPCVariable
from agentlib_mpc.models.casadi_model import CasadiModelConfig
from agentlib_mpc.modules.mpc.mpc_full import MPCConfig
from agentlib_mpc.optimization_backends.casadi_.basic import DirectCollocation
from agentlib_mpc.data_structures.casadi_utils import CasadiDiscretizationOptions
import agentlib_flexquant.data_structures.globals as glbs
import agentlib_flexquant.utils.config_management as cmng
from agentlib_flexquant.utils.parsing import SetupSystemModifier
from agentlib_flexquant.data_structures.flexquant import (
FlexibilityIndicatorConfig,
FlexibilityMarketConfig,
FlexQuantConfig,
)
from agentlib_flexquant.data_structures.mpcs import BaselineMPCData, BaseMPCData
from agentlib_flexquant.modules.flexibility_indicator import (
FlexibilityIndicatorModuleConfig,
)
from agentlib_flexquant.modules.flexibility_market import FlexibilityMarketModuleConfig
[docs]class FlexAgentGenerator:
"""Class for generating the flex agents
orig_mpc_module_config: the config for the original mpc,
which has nothing to do with the flexibility quantification
baseline_mpc_module_config: the config for the baseline mpc
for flexibility quantification
pos_flex_mpc_module_config: the config for the positive flexibility mpc
for flexibility quantification
neg_flex_mpc_module_config: the config for the negative flexibility mpc
for flexibility quantification
indicator_module_config: the config for the indicator for flexibility quantification
market_module_config: the config for the market for flexibility quantification
"""
orig_mpc_module_config: MPCConfig
baseline_mpc_module_config: MPCConfig
pos_flex_mpc_module_config: MPCConfig
neg_flex_mpc_module_config: MPCConfig
indicator_module_config: FlexibilityIndicatorModuleConfig
market_module_config: FlexibilityMarketModuleConfig
def __init__(
self,
flex_config: Union[str, FilePath, FlexQuantConfig],
mpc_agent_config: Union[str, FilePath, AgentConfig],
):
self.logger = logging.getLogger(__name__)
if isinstance(flex_config, str or FilePath):
self.flex_config_file_name = os.path.basename(flex_config)
else:
# provide default name for json
self.flex_config_file_name = "flex_config.json"
# load configs
self.flex_config = load_config.load_config(flex_config,
config_type=FlexQuantConfig)
# original mpc agent
self.orig_mpc_agent_config = load_config.load_config(
mpc_agent_config, config_type=AgentConfig
)
# baseline agent
self.baseline_mpc_agent_config = self.orig_mpc_agent_config.__deepcopy__()
self.baseline_mpc_agent_config.id = (self.flex_config.
baseline_config_generator_data.agent_id)
# pos agent
self.pos_flex_mpc_agent_config = self.orig_mpc_agent_config.__deepcopy__()
self.pos_flex_mpc_agent_config.id = (self.flex_config.
shadow_mpc_config_generator_data.
pos_flex.agent_id)
# neg agent
self.neg_flex_mpc_agent_config = self.orig_mpc_agent_config.__deepcopy__()
self.neg_flex_mpc_agent_config.id = (self.flex_config.
shadow_mpc_config_generator_data.
neg_flex.agent_id)
# original mpc module
self.orig_mpc_module_config = cmng.get_module(
config=self.orig_mpc_agent_config,
module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
)
# baseline module
self.baseline_mpc_module_config = cmng.get_module(
config=self.baseline_mpc_agent_config,
module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
)
# convert agentlib_mpc’s ModuleConfig to flexquant’s ModuleConfig to include additional
# fields not present in the original
self.baseline_mpc_module_config = cmng.get_flex_mpc_module_config(
agent_config=self.baseline_mpc_agent_config,
module_config=self.baseline_mpc_module_config,
module_type=self.flex_config.baseline_config_generator_data.module_types[
self.baseline_mpc_module_config.type
]
)
# pos module
self.pos_flex_mpc_module_config = cmng.get_module(
config=self.pos_flex_mpc_agent_config,
module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
)
# neg module
self.neg_flex_mpc_module_config = cmng.get_module(
config=self.neg_flex_mpc_agent_config,
module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
)
# load indicator config
self.indicator_config = load_config.load_config(
self.flex_config.indicator_config, config_type=FlexibilityIndicatorConfig
)
# load indicator module config
self.indicator_agent_config = load_config.load_config(
self.indicator_config.agent_config, config_type=AgentConfig
)
self.indicator_module_config = cmng.get_module(
config=self.indicator_agent_config, module_type=cmng.INDICATOR_CONFIG_TYPE
)
# load market config
if self.flex_config.market_config:
self.market_config = load_config.load_config(
self.flex_config.market_config, config_type=FlexibilityMarketConfig
)
# load market module config
self.market_agent_config = load_config.load_config(
self.market_config.agent_config, config_type=AgentConfig
)
self.market_module_config = cmng.get_module(
config=self.market_agent_config, module_type=cmng.MARKET_CONFIG_TYPE
)
else:
self.flex_config.market_time = 0
self.run_config_validations()
[docs] def generate_flex_agents(self) -> list[str]:
"""Generate the configs and the python module for the flexibility agents.
Returns:
list of the full path for baseline mpc, pos_flex mpc, neg_flex mpc, indicator
and market config
"""
# adapt modules to include necessary communication variables
baseline_mpc_config = self.adapt_mpc_module_config(
module_config=self.baseline_mpc_module_config,
mpc_dataclass=self.flex_config.baseline_config_generator_data,
agent_id=self.flex_config.baseline_config_generator_data.agent_id,
)
pf_mpc_config = self.adapt_mpc_module_config(
module_config=self.pos_flex_mpc_module_config,
mpc_dataclass=self.flex_config.shadow_mpc_config_generator_data.pos_flex,
agent_id=self.flex_config.shadow_mpc_config_generator_data.pos_flex.agent_id,
)
nf_mpc_config = self.adapt_mpc_module_config(
module_config=self.neg_flex_mpc_module_config,
mpc_dataclass=self.flex_config.shadow_mpc_config_generator_data.neg_flex,
agent_id=self.flex_config.shadow_mpc_config_generator_data.neg_flex.agent_id,
)
indicator_module_config = self.adapt_indicator_module_config(
module_config=self.indicator_module_config
)
if self.flex_config.market_config:
market_module_config = self.adapt_market_module_config(
module_config=self.market_module_config
)
# dump jsons of the agents including the adapted module configs
self.append_module_and_dump_agent(
module=baseline_mpc_config,
agent=self.baseline_mpc_agent_config,
module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
config_name=self.flex_config.baseline_config_generator_data.
name_of_created_file,
)
self.append_module_and_dump_agent(
module=pf_mpc_config,
agent=self.pos_flex_mpc_agent_config,
module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
config_name=self.flex_config.shadow_mpc_config_generator_data.
pos_flex.name_of_created_file,
)
self.append_module_and_dump_agent(
module=nf_mpc_config,
agent=self.neg_flex_mpc_agent_config,
module_type=cmng.get_orig_module_type(self.orig_mpc_agent_config),
config_name=self.flex_config.shadow_mpc_config_generator_data.
neg_flex.name_of_created_file,
)
self.append_module_and_dump_agent(
module=indicator_module_config,
agent=self.indicator_agent_config,
module_type=cmng.INDICATOR_CONFIG_TYPE,
config_name=self.indicator_config.name_of_created_file,
)
if self.flex_config.market_config:
self.append_module_and_dump_agent(
module=market_module_config,
agent=self.market_agent_config,
module_type=cmng.MARKET_CONFIG_TYPE,
config_name=self.market_config.name_of_created_file,
)
# generate python files for the shadow mpcs
self._generate_flex_model_definition()
# add new paths to flex config and dump it
self.adapt_and_dump_flex_config()
# register the exit function if the corresponding flag is set
if self.flex_config.delete_files:
atexit.register(lambda: self._delete_created_files())
return self.get_config_file_paths()
[docs] def append_module_and_dump_agent(
self,
module: BaseModuleConfig,
agent: AgentConfig,
module_type: str,
config_name: str,
):
"""Append the given module config to the given agent config and
dumps the agent config to a json file.
The json file is named based on the config_name.
Args:
module: The module config to be appended.
agent: The agent config to be updated.
module_type: The type of the module
config_name: The name of the json file for module config (e.g. baseline.json)
"""
# get the module as a dict without default values
module_dict = cmng.to_dict_and_remove_unnecessary_fields(module=module)
# write given module to agent config
for i, agent_module in enumerate(agent.modules):
if cmng.MODULE_TYPE_DICT[module_type] is cmng.MODULE_TYPE_DICT[
agent_module["type"]]:
agent.modules[i] = module_dict
# dump agent config
if agent.modules:
if self.flex_config.overwrite_files:
try:
Path(os.path.join(self.flex_config.flex_files_directory,
config_name)).unlink()
except OSError:
pass
with open(
os.path.join(self.flex_config.flex_files_directory, config_name),
"w+",
encoding="utf-8",
) as f:
module_json = agent.model_dump_json(exclude_defaults=True)
f.write(module_json)
else:
logging.error("Provided agent config does not contain any modules.")
[docs] def get_config_file_paths(self) -> list[str]:
"""Return a list of paths with the created config files."""
paths = [
os.path.join(
self.flex_config.flex_files_directory,
self.flex_config.baseline_config_generator_data.
name_of_created_file,
),
os.path.join(
self.flex_config.flex_files_directory,
self.flex_config.shadow_mpc_config_generator_data.pos_flex.
name_of_created_file,
),
os.path.join(
self.flex_config.flex_files_directory,
self.flex_config.shadow_mpc_config_generator_data.neg_flex.
name_of_created_file,
),
os.path.join(
self.flex_config.flex_files_directory,
self.indicator_config.name_of_created_file,
),
]
if self.flex_config.market_config:
paths.append(
os.path.join(
self.flex_config.flex_files_directory,
self.market_config.name_of_created_file,
)
)
return paths
def _delete_created_files(self):
"""Function to run at exit if the files are to be deleted."""
to_be_deleted = self.get_config_file_paths()
to_be_deleted.append(
os.path.join(
self.flex_config.flex_files_directory,
self.flex_config_file_name,
)
)
# delete files
for file in to_be_deleted:
Path(file).unlink()
# also delete folder
Path(self.flex_config.flex_files_directory).rmdir()
[docs] def adapt_mpc_module_config(
self, module_config: MPCConfig, mpc_dataclass: BaseMPCData, agent_id: str
) -> MPCConfig:
"""Adapt the mpc module config for automated flexibility quantification.
Things adapted among others are:
- the file name/path of the mpc config file
- names of the control variables for the shadow mpcs
- reduce communicated variables of shadow mpcs to outputs
- add the power variable to the outputs
- add parameters for the activation and quantification of flexibility
Args:
module_config: The module config to be adapted
mpc_dataclass: The dataclass corresponding to the type of the MPC module.
It contains all the extra data necessary for flexibility
quantification, which will be used to update the
module_config.
agent_id: agent_id for creating the FlexQuant mpc module config
Returns:
The adapted module config
"""
# allow the module config to be changed
module_config.model_config["frozen"] = False
# set new MPC type
module_config.type = mpc_dataclass.module_types[
cmng.get_orig_module_type(self.orig_mpc_agent_config)
]
# set the MPC config type from the MPCConfig in agentlib_mpc to the
# corresponding one in flexquant and add additional fields
module_config_flex_dict = module_config.model_dump()
module_config_flex_dict["casadi_sim_time_step"] = (
self.flex_config.casadi_sim_time_step)
module_config_flex_dict["power_variable_name"] = (
self.flex_config.baseline_config_generator_data.power_variable)
module_config_flex_dict["storage_variable_name"] = (
self.indicator_module_config.correct_costs.stored_energy_variable)
module_config_flex = cmng.MODULE_TYPE_DICT[module_config.type](
**module_config_flex_dict, _agent_id=agent_id
)
# HOTFIX due to AgentLib-MPC bug. Needs to be adapted after Objectives
# in AgentLib-MPC are fixed.
if module_config_flex.r_del_u is None:
module_config_flex = module_config_flex.model_copy(update={"r_del_u": {}})
# allow the module config to be changed
module_config_flex.model_config["frozen"] = False
module_config_flex.module_id = mpc_dataclass.module_id
# set new id (needed for plotting)
module_config_flex.module_id = mpc_dataclass.module_id
# update optimization backend to use the created mpc files and classes
module_config_flex.optimization_backend["model"]["type"] = {
"file": os.path.join(
self.flex_config.flex_files_directory,
mpc_dataclass.created_flex_mpcs_file,
),
"class_name": mpc_dataclass.class_name,
}
# extract filename from results file and update it with
# suffix and parent directory
result_filename = Path(
module_config_flex.optimization_backend["results_file"]
).name.replace(".csv", mpc_dataclass.results_suffix)
full_path = self.flex_config.results_directory / result_filename
module_config_flex.optimization_backend["results_file"] = str(full_path)
# change cia backend to custom backend of flexquant
if module_config_flex.optimization_backend["type"] == "casadi_cia":
module_config_flex.optimization_backend["type"] = \
"agentlib_flexquant.casadi_cia_cons"
if (module_config_flex.optimization_backend["type"] ==
"agentlib_flexquant.casadi_cia_cons"):
module_config_flex.optimization_backend["market_time"] = (
self.flex_config.market_time)
# add the full control trajectory output from the baseline as input for the
# shadow mpcs, they are directly included in the optimization problem
if not isinstance(mpc_dataclass, BaselineMPCData):
for control in module_config_flex.controls:
module_config_flex.inputs.append(
MPCVariable(
name=control.name + glbs.full_trajectory_suffix,
value=None,
type="pd.Series",
)
)
# add full control names to shadow MPC config for inputs tracking
module_config_flex.full_control_names.append(
control.name + glbs.full_trajectory_suffix)
# change the alias of control variable in shadow mpc to
# prevent it from triggering the wrong callback
control.alias = control.name + glbs.shadow_suffix
# also include binary controls
if hasattr(module_config_flex, "binary_controls"):
for control in module_config_flex.binary_controls:
module_config_flex.inputs.append(
MPCVariable(
name=control.name + glbs.full_trajectory_suffix,
value=None,
type="pd.Series",
)
)
# add full control names to shadow MPC config for inputs tracking
module_config_flex.full_control_names.append(
control.name + glbs.full_trajectory_suffix)
# change the alias of control variable in shadow mpc to
# prevent it from triggering the wrong callback
control.alias = control.name + glbs.shadow_suffix
# only communicate outputs for the shadow mpcs
module_config_flex.shared_variable_fields = ["outputs"]
# In addition to creating the full control variables, the inputs
# and states of the Baseline are communicated to the Shadow MPC
# to ensure synchronisation. Therefore, all inputs and states of
# the Baseline are added to the Shadow MPCs with an alias
baseline_names = {inp.name for inp in
self.baseline_mpc_module_config.inputs}
for i, input in enumerate(module_config_flex.inputs):
if input.name in baseline_names:
module_config_flex.inputs[i].alias = (
input.name + glbs.base_vars_to_communicate_suffix)
# add Baseline input names to shadow MPC config for inputs tracking
# if Baseline variable is also set in config_inputs_appendix this is
# due to overwriting the alias, so variable should not be added here
appendix_names = {inp.name for inp in mpc_dataclass.config_inputs_appendix}
module_config_flex.baseline_input_names = [
input.name + glbs.base_vars_to_communicate_suffix for input in
self.baseline_mpc_module_config.inputs
if input.name not in appendix_names]
# add custom input names for the shadow MPC to track. Here, the
# communication suffix is not added, as the user is free to define
# custom inputs as desired.
# Exclude in_provision, as this is not regularly set and would prevent
# the do_step of the shadow MPC.
module_config_flex.custom_input_names.extend([
{"name": input.name, "alias": input.alias}
for input in mpc_dataclass.config_inputs_appendix
if input.name not in [glbs.PROVISION_VAR_NAME]
])
for i, state in enumerate(module_config_flex.states):
if state in self.baseline_mpc_module_config.states:
module_config_flex.states[i].alias = (
state.name + glbs.base_vars_to_communicate_suffix)
# add Baseline state names to shadow MPC config for inputs tracking
module_config_flex.baseline_state_names = [
state.name + glbs.base_vars_to_communicate_suffix for state in
self.baseline_mpc_module_config.states]
module_config_flex.baseline_agent_id = (
self.flex_config.baseline_config_generator_data.agent_id)
else:
# all the variables here are added to the custom MPCConfig of
# FlexQuant to avoid them being added to the optimization problem
# add full_controls trajectory as AgentVariable to the config of
# Baseline mpc
for control in module_config_flex.controls:
module_config_flex.full_controls.append(
AgentVariable(
name=control.name + glbs.full_trajectory_suffix,
alias=control.name + glbs.full_trajectory_suffix,
shared=True,
)
)
if hasattr(module_config_flex, "binary_controls"):
for binary_controls in module_config_flex.binary_controls:
module_config_flex.full_controls.append(
AgentVariable(
name=binary_controls.name + glbs.full_trajectory_suffix,
alias=binary_controls.name + glbs.full_trajectory_suffix,
shared=True,
)
)
# add full controls to custom cia backend to constrain
# during market time
if (module_config_flex.optimization_backend["type"] ==
"agentlib_flexquant.casadi_cia_cons"):
module_config_flex.optimization_backend["full_controls_dict"] = (
dict(zip([var.name for var in module_config_flex.full_controls],
[None] * len(module_config_flex.full_controls))
))
# add input and states copy variables which send the Baseline inputs
# to the shadow MPC
for input in module_config_flex.inputs:
module_config_flex.vars_to_communicate.append(
AgentVariable(
name=input.name + glbs.base_vars_to_communicate_suffix,
alias=input.name + glbs.base_vars_to_communicate_suffix,
shared=True,
)
)
for state in module_config_flex.states:
module_config_flex.vars_to_communicate.append(
AgentVariable(
name=state.name + glbs.base_vars_to_communicate_suffix,
alias=state.name + glbs.base_vars_to_communicate_suffix,
shared=True,
)
)
module_config_flex.set_outputs = True
# add outputs for the power variables, for easier handling create a lookup dict
output_dict = {output.name: output for output in module_config_flex.outputs}
if self.flex_config.baseline_config_generator_data.power_variable in output_dict:
output_dict[
self.flex_config.baseline_config_generator_data.power_variable
].alias = mpc_dataclass.power_alias
else:
module_config_flex.outputs.append(
MPCVariable(
name=self.flex_config.baseline_config_generator_data.power_variable,
alias=mpc_dataclass.power_alias,
)
)
# add or change alias for stored energy variable
if self.indicator_module_config.correct_costs.enable_energy_costs_correction:
output_dict[
self.indicator_module_config.correct_costs.stored_energy_variable
].alias = mpc_dataclass.stored_energy_alias
# add extra inputs needed for activation of flex or custom cost functions
existing_input_names = {inp.name: idx for idx, inp in
enumerate(module_config_flex.inputs)}
for appendix_inp in mpc_dataclass.config_inputs_appendix.copy():
# If variable already exists in the config
if appendix_inp.name in existing_input_names:
self.logger.warning(f"The given variable {appendix_inp.name} in the "
f"config_inputs_appendix already exists in the MPC "
f"model. I am updating the alias of the existing "
f"variable to {appendix_inp.alias} (provided by you). "
f"However, this can still cause issues down the line "
f"if the alias is not chosen wisely.")
# Update only the alias of the existing input
idx = existing_input_names[appendix_inp.name]
existing_inp = module_config_flex.inputs[idx]
inp_dict = existing_inp.dict()
inp_dict["alias"] = appendix_inp.alias
module_config_flex.inputs[idx] = type(existing_inp)(**inp_dict)
# Remove variable from appendix list to avoid creation during parsing
mpc_dataclass.config_inputs_appendix.remove(appendix_inp)
else:
# Add the new input
module_config_flex.inputs.append(appendix_inp)
# add extra parameters needed for activation of flex or custom weights
for var in mpc_dataclass.config_parameters_appendix:
if var.name in self.flex_config.model_fields:
var.value = getattr(self.flex_config, var.name)
if var.name in self.flex_config.baseline_config_generator_data.model_fields:
var.value = getattr(self.flex_config.baseline_config_generator_data,
var.name)
module_config_flex.parameters.extend(mpc_dataclass.config_parameters_appendix)
# freeze the config again
module_config_flex.model_config["frozen"] = True
return module_config_flex
[docs] def adapt_indicator_module_config(
self, module_config: FlexibilityIndicatorModuleConfig
) -> FlexibilityIndicatorModuleConfig:
"""Adapt the indicator module config for automated flexibility
quantification.
"""
# append user-defined price var to indicator module config
module_config.inputs.append(
AgentVariable(
name=module_config.price_variable,
unit="ct/kWh",
type="pd.Series",
description="electricity price",
)
)
module_config.inputs.append(
AgentVariable(
name=module_config.price_variable_feed_in,
unit="ct/kWh",
type="pd.Series",
description="electricity feed-in price",
)
)
# allow the module config to be changed
module_config.model_config["frozen"] = False
for parameter in module_config.parameters:
if parameter.name == glbs.PREP_TIME:
parameter.value = self.flex_config.prep_time
if parameter.name == glbs.MARKET_TIME:
parameter.value = self.flex_config.market_time
if parameter.name == glbs.FLEX_EVENT_DURATION:
parameter.value = self.flex_config.flex_event_duration
if parameter.name == glbs.TIME_STEP:
parameter.value = self.baseline_mpc_module_config.time_step
if parameter.name == glbs.PREDICTION_HORIZON:
parameter.value = self.baseline_mpc_module_config.prediction_horizon
if parameter.name == glbs.COLLOCATION_TIME_GRID:
dis_op = self.baseline_mpc_module_config.optimization_backend[
"discretization_options"
]
parameter.value = self.get_collocation_time_grid(
discretization_options=dis_op
)
# set power unit
module_config.power_unit = (
self.flex_config.baseline_config_generator_data.power_unit)
module_config.results_file = (
self.flex_config.results_directory / module_config.results_file.name
)
module_config.model_config["frozen"] = True
return module_config
[docs] def adapt_market_module_config(
self, module_config: FlexibilityMarketModuleConfig
) -> FlexibilityMarketModuleConfig:
"""Adapt the market module config for automated flexibility quantification."""
# allow the module config to be changed
module_config.model_config["frozen"] = False
for field in module_config.__fields__:
if field in self.market_module_config.__fields__.keys():
module_config.__setattr__(field, getattr(self.market_module_config,
field))
module_config.results_file = (
self.flex_config.results_directory / module_config.results_file.name
)
for parameter in module_config.parameters:
if parameter.name == glbs.COLLOCATION_TIME_GRID:
dis_op = self.baseline_mpc_module_config.optimization_backend[
"discretization_options"
]
parameter.value = self.get_collocation_time_grid(
discretization_options=dis_op
)
if parameter.name == glbs.TIME_STEP:
parameter.value = self.baseline_mpc_module_config.time_step
module_config.model_config["frozen"] = True
return module_config
[docs] def adapt_and_dump_flex_config(self):
"""Update flex_config to reference the newly generated market/indicator agent configs and
dump the updated flex configuration to disk.
This method replaces the market and indicator configuration entries in ``self.flex_config``
with the internally created ``self.market_config`` and ``self.indicator_config``. If a
market configuration is present, its ``agent_config`` attribute is updated to the path of
the newly created market agent config file under ``flex_files_directory``. Likewise, the
indicator configuration's ``agent_config`` attribute is set to the path of the newly
created indicator agent config file. These paths correspond to the new locations of the
market or indicator config files when they were originally provided to the
``FlexAgentGenerator`` as file paths.
After updating these paths, the complete ``flex_config`` is serialized (excluding default
values) and written as JSON to ``flex_files_directory / flex_config_file_name`` so that
subsequent runs can use the resolved configuration directly.
"""
# store market and indicator with file path of created agent config
if self.flex_config.market_config:
self.flex_config.market_config = self.market_config
self.flex_config.market_config.agent_config = os.path.join(
self.flex_config.flex_files_directory,
self.market_config.name_of_created_file)
self.flex_config.indicator_config = self.indicator_config
self.flex_config.indicator_config.agent_config = os.path.join(
self.flex_config.flex_files_directory,
self.indicator_config.name_of_created_file)
# save flex config to created flex files
with open(os.path.join(self.flex_config.flex_files_directory,
self.flex_config_file_name),
"w", encoding="utf-8", ) as f:
config_json = self.flex_config.model_dump_json(exclude_defaults=True)
f.write(config_json)
[docs] def get_collocation_time_grid(self, discretization_options: dict):
"""Get the mpc output collocation grid over the horizon"""
# get the mpc time grid configuration
time_step = self.baseline_mpc_module_config.time_step
prediction_horizon = self.baseline_mpc_module_config.prediction_horizon
# get the collocation configuration
collocation_method = discretization_options["collocation_method"]
collocation_order = discretization_options["collocation_order"]
# get the collocation points
options = CasadiDiscretizationOptions(
collocation_order=collocation_order, collocation_method=collocation_method
)
collocation_points = DirectCollocation(options=
options)._collocation_polynomial().root
# compute the mpc output collocation grid
discretization_points = np.arange(0, time_step * prediction_horizon, time_step)
collocation_time_grid = (
discretization_points[:, None] + collocation_points * time_step
).ravel()
collocation_time_grid = collocation_time_grid[
~np.isin(collocation_time_grid, discretization_points)
]
collocation_time_grid = collocation_time_grid.tolist()
return collocation_time_grid
def _generate_flex_model_definition(self):
"""Generate a python module for negative and positive flexibility agents
from the Baseline MPC model."""
output_file = os.path.join(
self.flex_config.flex_files_directory,
self.flex_config.baseline_config_generator_data.created_flex_mpcs_file,
)
opt_backend = self.orig_mpc_module_config.optimization_backend["model"]["type"]
# Extract the config class of the casadi model to check cost functions
config_class = inspect.get_annotations(custom_injection(opt_backend))["config"]
# Get custom module fields provided by the user and add them
model_fields = self.baseline_mpc_module_config.optimization_backend["model"]
_ = model_fields.pop("type")
config_instance = config_class(**model_fields)
# The " + " is just there to simplify the validation, it does not affect
# the generated code
self.check_variables_in_casadi_config(
config_instance,
self.flex_config.shadow_mpc_config_generator_data.neg_flex.flex_cost_function +
(
" + " + self.flex_config.shadow_mpc_config_generator_data.neg_flex.flex_cost_function_appendix
if self.flex_config.shadow_mpc_config_generator_data.neg_flex.flex_cost_function_appendix else ""),
shadow_mpc_type="neg_flex"
)
self.check_variables_in_casadi_config(
config_instance,
self.flex_config.shadow_mpc_config_generator_data.pos_flex.flex_cost_function +
(
" + " + self.flex_config.shadow_mpc_config_generator_data.pos_flex.flex_cost_function_appendix
if self.flex_config.shadow_mpc_config_generator_data.pos_flex.flex_cost_function_appendix else ""),
shadow_mpc_type="pos_flex"
)
# parse mpc python file
with open(opt_backend["file"], "r", encoding="utf-8") as f:
source = f.read()
tree = ast.parse(source)
# create modifiers for python file
modifier_base = SetupSystemModifier(
mpc_data=self.flex_config.baseline_config_generator_data,
controls=self.baseline_mpc_module_config.controls,
binary_controls=self.baseline_mpc_module_config.binary_controls
if hasattr(self.baseline_mpc_module_config, "binary_controls")
else None,
)
modifier_pos = SetupSystemModifier(
mpc_data=self.flex_config.shadow_mpc_config_generator_data.pos_flex,
controls=self.pos_flex_mpc_module_config.controls,
binary_controls=self.pos_flex_mpc_module_config.binary_controls
if hasattr(self.pos_flex_mpc_module_config, "binary_controls")
else None,
)
modifier_neg = SetupSystemModifier(
mpc_data=self.flex_config.shadow_mpc_config_generator_data.neg_flex,
controls=self.neg_flex_mpc_module_config.controls,
binary_controls=self.neg_flex_mpc_module_config.binary_controls
if hasattr(self.neg_flex_mpc_module_config, "binary_controls")
else None,
)
# run the modification
modified_tree_base = modifier_base.visit(deepcopy(tree))
modified_tree_pos = modifier_pos.visit(deepcopy(tree))
modified_tree_neg = modifier_neg.visit(deepcopy(tree))
# combine modifications to one file
modified_tree = ast.Module(body=[], type_ignores=[])
modified_tree.body.extend(
modified_tree_base.body + modified_tree_pos.body + modified_tree_neg.body
)
modified_source = astor.to_source(modified_tree)
# Use black to format the generated code
formatted_code = black.format_str(modified_source, mode=black.FileMode())
if self.flex_config.overwrite_files:
try:
Path(
os.path.join(
self.flex_config.flex_files_directory,
self.flex_config.baseline_config_generator_data.created_flex_mpcs_file,
)
).unlink()
except OSError:
pass
with open(output_file, "w", encoding="utf-8") as f:
f.write(formatted_code)
[docs] def check_variables_in_casadi_config(self, config: CasadiModelConfig, expr: str,
shadow_mpc_type: str):
"""Check if all variables in the expression are defined in the config.
Args:
config: casadi model config.
expr: The expression to check.
Raises:
ValueError: If any variable in the expression is not defined in the config.
"""
variables_in_config = set(config.get_variable_names())
variables_in_cost_function = set(ast.walk(ast.parse(expr)))
variables_in_cost_function = {
node.attr for node in variables_in_cost_function
if isinstance(node, ast.Attribute)
}
flex_config_data = (self.flex_config.shadow_mpc_config_generator_data.pos_flex
if shadow_mpc_type == "pos_flex"
else self.flex_config.shadow_mpc_config_generator_data.neg_flex)
variables_newly_created = set(
[par.name for par in flex_config_data.config_parameters_appendix] +
[inp.name for inp in flex_config_data.config_inputs_appendix]
)
unknown_vars = (variables_in_cost_function - variables_in_config -
variables_newly_created)
if unknown_vars:
self.logger.warning(f"Unknown variables in new cost function: "
f"{unknown_vars}. This might cause problems with "
f"the optimization backend.")
[docs] def run_config_validations(self):
"""Function to validate integrity of user-supplied flex config.
Since the validation depends on interactions between multiple configurations,
it is performed within this function rather than using Pydantic’s built-in
validators for individual configurations.
The following checks are performed:
1. Ensures the specified power variable exists in the MPC model outputs.
2. Ensures the specified comfort variable exists in the MPC model states.
3. Validates that the stored energy variable exists in MPC outputs if
energy cost correction is enabled.
4. Verifies the supported collocation method is used; otherwise,
switches to 'legendre' and raises a warning.
5. Ensures that the sum of prep time, market time, and flex event duration
does not exceed the prediction horizon.
6. Ensures market time equals the MPC model time step if market config is
present.
7. Ensures that all flex time values are multiples of the MPC model time step.
8. Checks for mismatches between time-related parameters in the flex/MPC and
indicator configs and issues warnings when discrepancies exist, using the
flex/MPC config values as the source of truth.
"""
# check if the power variable exists in the mpc config
power_var = self.flex_config.baseline_config_generator_data.power_variable
if power_var not in [output.name for output in
self.baseline_mpc_module_config.outputs]:
raise ConfigurationError(
f"Given power variable {power_var} is not defined "
f"as output in baseline mpc config."
)
# check if the comfort variable exists in the mpc slack variables
mod_type = self.baseline_mpc_module_config.optimization_backend["model"]["type"]
if self.flex_config.baseline_config_generator_data.comfort_variable:
file_path = mod_type["file"]
class_name = mod_type["class_name"]
# Get the class
dynamic_class = cmng.get_class_from_file(file_path, class_name)
if self.flex_config.baseline_config_generator_data.comfort_variable not in [
state.name for state in dynamic_class().states
]:
raise ConfigurationError(
f"Given comfort variable "
f"{self.flex_config.baseline_config_generator_data.comfort_variable} "
f"is not defined as state in baseline mpc config."
)
# check if the energy storage variable exists in the mpc config
if self.indicator_module_config.correct_costs.enable_energy_costs_correction:
if self.indicator_module_config.correct_costs.stored_energy_variable not in [
output.name for output in self.baseline_mpc_module_config.outputs
]:
raise ConfigurationError(
f"The stored energy variable "
f"{self.indicator_module_config.correct_costs.stored_energy_variable} "
f"is not defined in baseline mpc config. "
f"It must be defined in the base MPC model and config as output "
f"if the correction of costs is enabled."
)
# raise warning if unsupported collocation method is used and change
# to supported method
if (
"collocation_method"
not in self.baseline_mpc_module_config.optimization_backend[
"discretization_options"]
):
raise ConfigurationError(
"Please use collocation as discretization method and define the "
"collocation_method in the mpc config"
)
else:
collocation_method = self.baseline_mpc_module_config.optimization_backend[
"discretization_options"
]["collocation_method"]
if collocation_method != "legendre":
self.logger.warning(
"Collocation method %s is not supported. Switching to "
"method legendre.",
collocation_method,
)
self.baseline_mpc_module_config.optimization_backend[
"discretization_options"][
"collocation_method"
] = "legendre"
self.pos_flex_mpc_module_config.optimization_backend[
"discretization_options"][
"collocation_method"
] = "legendre"
self.neg_flex_mpc_module_config.optimization_backend[
"discretization_options"][
"collocation_method"
] = "legendre"
# time data validations
flex_times = {
glbs.PREP_TIME: self.flex_config.prep_time,
glbs.MARKET_TIME: self.flex_config.market_time,
glbs.FLEX_EVENT_DURATION: self.flex_config.flex_event_duration,
}
mpc_times = {
glbs.TIME_STEP: self.baseline_mpc_module_config.time_step,
glbs.PREDICTION_HORIZON: self.baseline_mpc_module_config.prediction_horizon,
}
# total time length check (prep+market+flex_event)
if (sum(flex_times.values()) > mpc_times["time_step"] *
mpc_times["prediction_horizon"]):
raise ConfigurationError(
"Market time + prep time + flex event duration "
"can not exceed the prediction horizon."
)
# market time val check
if self.flex_config.market_config:
if flex_times["market_time"] % mpc_times["time_step"] != 0:
raise ConfigurationError(
"Market time must be an integer multiple of the time step."
)
# check for divisibility of flex_times by time_step
for name, value in flex_times.items():
if value % mpc_times["time_step"] != 0:
raise ConfigurationError(
f"{name} is not a multiple of the time step. Please redefine."
)
# raise warning if parameter value in flex indicator module config differs from
# value in flex config/ baseline mpc module config
for parameter in self.indicator_module_config.parameters:
if parameter.value is not None:
if parameter.name in flex_times:
flex_value = flex_times[parameter.name]
if parameter.value != flex_value:
self.logger.warning(
"Value mismatch for %s in flex config (field) "
"and indicator module config (parameter). "
"Flex config value will be used.",
parameter.name,
)
elif parameter.name in mpc_times:
mpc_value = mpc_times[parameter.name]
if parameter.value != mpc_value:
self.logger.warning(
"Value mismatch for %s in baseline MPC module "
"config (field) and indicator module config (parameter). "
"Baseline MPC module config value will be used.",
parameter.name,
)
[docs] def adapt_sim_results_path(self, simulator_agent_config: Union[str, Path],
save_name_suffix: str = "") -> Union[str, Path]:
"""
Optional helper function to adapt file path for simulator results in sim config,
so that sim results land in the same results directory as flex results.
Args:
simulator_agent_config: Path to the simulator agent config JSON file.
save_name_suffix: Suffix added to the newly created sim_config file.
Returns:
The updated simulator config dictionary with the modified result file path.
Raises:
FileNotFoundError: If the specified config file does not exist.
"""
simulator_agent_config = Path(simulator_agent_config)
# open config and extract sim module
with open(simulator_agent_config, "r", encoding="utf-8") as f:
sim_config = json.load(f)
sim_module_config = next(
(module for module in sim_config["modules"] if
module["type"] == "simulator"), None)
# convert filename string to path and extract the name
sim_file_name = Path(sim_module_config["result_filename"]).name
# set results path so that sim results lands in same directory
# as flex result CSVs
sim_module_config["result_filename"] = str(
self.flex_config.results_directory / sim_file_name
)
try:
with open(Path(str(simulator_agent_config.parent) + "\\" +
str(simulator_agent_config.stem) + save_name_suffix + ".json"),
"w", encoding="utf-8") as f:
json.dump(sim_config, f, indent=4)
return Path(str(simulator_agent_config.parent) + "\\" +
str(simulator_agent_config.stem) + save_name_suffix + ".json")
except Exception as e:
raise Exception(f"Could not adapt and create a new simulation config "
f"due to: {e}. "
f"Please check {simulator_agent_config} and "
f"'{save_name_suffix}'")