Coverage for agentlib_flexquant/modules/shadow_mpc.py: 68%
247 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-10-20 14:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-10-20 14:09 +0000
1"""
2Defines shadow MPC and MINLP-MPC for positive/negative flexibility quantification.
3"""
4from typing import Dict, Union
6import os
7import math
8import numpy as np
9import pandas as pd
10from pydantic import Field
11from typing import Dict, Union, Optional
12from collections.abc import Iterable
13from agentlib.core.datamodels import AgentVariable
14from agentlib_mpc.modules import mpc_full, minlp_mpc
15from agentlib_flexquant.utils.data_handling import fill_nans, MEAN
16from agentlib_flexquant.data_structures.globals import (
17 full_trajectory_prefix,
18 full_trajectory_suffix,
19 STORED_ENERGY_ALIAS_NEG,
20 STORED_ENERGY_ALIAS_POS,
21)
24class FlexibilityShadowMPCConfig(mpc_full.MPCConfig):
26 casadi_sim_time_step: int = Field(
27 default=0,
28 description="Time step for simulation with Casadi simulator. Value is read from "
29 "FlexQuantConfig",
30 )
31 power_variable_name: str = Field(
32 default=None, description="Name of the power variable in the shadow mpc model."
33 )
34 storage_variable_name: Optional[str] = Field(
35 default=None, description="Name of the storage variable in the shadow mpc model."
36 )
39class FlexibilityShadowMPC(mpc_full.MPC):
40 """Shadow MPC for calculating positive/negative flexibility offers."""
42 config: FlexibilityShadowMPCConfig
44 def __init__(self, *args, **kwargs):
45 # create instance variable
46 self._full_controls: Dict[str, Union[AgentVariable, None]] = {}
47 # initialize flex_results with None
48 self.flex_results = None
49 super().__init__(*args, **kwargs)
50 # set up necessary components if simulation is enabled
51 if self.config.casadi_sim_time_step > 0:
52 # generate a separate flex_model for integration to ensure the model used in MPC
53 # optimization remains unaffected
54 self.flex_model = type(self.model)(dt=self.config.casadi_sim_time_step)
55 # generate the filename for the simulation results
56 self.res_file_flex = self.config.optimization_backend["results_file"].replace(
57 "mpc", "mpc_sim"
58 )
59 # clear the casadi simulator result at the first time step if already exists
60 try:
61 os.remove(self.res_file_flex)
62 except:
63 pass
65 def set_output(self, solution):
66 """Takes the solution from optimization backend and sends it to AgentVariables."""
67 # Output must be defined in the config as "type"="pd.Series"
68 if not self.config.set_outputs:
69 return
70 self.logger.info("Sending optimal output values to data_broker.")
71 df = solution.df
72 self.sim_flex_model(solution)
73 if self.flex_results is not None:
74 for output in self.var_ref.outputs:
75 if output not in [
76 self.config.power_variable_name,
77 self.config.storage_variable_name,
78 ]:
79 series = df.variable[output]
80 self.set(output, series)
81 # send the power and storage variable value from simulation results
82 upsampled_output_power = self.flex_results[self.config.power_variable_name]
83 self.set(self.config.power_variable_name, upsampled_output_power)
84 if self.config.storage_variable_name is not None:
85 upsampled_output_storage = self.flex_results[self.config.storage_variable_name]
86 self.set(self.config.storage_variable_name, upsampled_output_storage.dropna())
87 else:
88 for output in self.var_ref.outputs:
89 series = df.variable[output]
90 self.set(output, series)
92 def sim_flex_model(self, solution):
93 """simulate the flex model over the preditcion horizon and save results"""
95 # return if sim_time_step is not a positive integer and system is in provision
96 if not (self.config.casadi_sim_time_step > 0 and not self.get("in_provision").value):
97 return
99 # read the defined simulation time step
100 sim_time_step = self.config.casadi_sim_time_step
101 mpc_time_step = self.config.time_step
103 # set the horizon length and the number of simulation steps
104 total_horizon_time = int(self.config.prediction_horizon * self.config.time_step)
105 n_simulation_steps = math.ceil(total_horizon_time / sim_time_step)
107 # read the current optimization result
108 result_df = solution.df
110 # initialize the flex sim results Dataframe
111 self._initialize_flex_results(
112 n_simulation_steps, total_horizon_time, sim_time_step, result_df
113 )
115 # Update model parameters and initial states
116 self._update_model_parameters()
117 self._update_initial_states(result_df)
119 # Run simulation
120 self._run_simulation(
121 n_simulation_steps, sim_time_step, mpc_time_step, result_df, total_horizon_time
122 )
124 # set index of flex results to the same as mpc result
125 store_results_df = self.flex_results.copy(deep=True)
126 store_results_df.index = self.flex_results.index.tolist()
128 # save results
129 if not os.path.exists(self.res_file_flex):
130 store_results_df.to_csv(self.res_file_flex)
131 else:
132 store_results_df.to_csv(self.res_file_flex, mode="a", header=False)
134 # set the flex results format same as mpc result while updating Agentvariable
135 self.flex_results.index = self.flex_results.index.get_level_values(1)
137 def register_callbacks(self):
138 for control_var in self.config.controls:
139 self.agent.data_broker.register_callback(
140 name=f"{control_var.name+full_trajectory_suffix}",
141 alias=f"{control_var.name+full_trajectory_suffix}",
142 callback=self.calc_flex_callback,
143 )
144 for input_var in self.config.inputs:
145 adapted_name = input_var.name.replace(full_trajectory_suffix, "")
146 if adapted_name in [control_var.name for control_var in self.config.controls]:
147 self._full_controls[input_var.name] = input_var
149 super().register_callbacks()
151 def calc_flex_callback(self, inp: AgentVariable, name: str):
152 """Set the control trajectories before calculating the flexibility offer.
154 self.model should account for flexibility in its cost function.
156 """
157 # during provision dont calculate flex
158 if self.get("in_provision").value:
159 return
161 # do not trigger callback on self set variables
162 if self.agent.config.id == inp.source.agent_id:
163 return
165 # get the value of the input
166 vals = inp.value
167 if vals.isna().any():
168 vals = fill_nans(series=vals, method=MEAN)
169 # add time shift env.now to the mpc prediction index if it starts at t=0
170 if vals.index[0] == 0:
171 vals.index += self.env.time
172 # update value in the mapping dictionary
173 self._full_controls[name].value = vals
174 # make sure all controls are set
175 if all(x.value is not None for x in self._full_controls.values()):
176 self.do_step()
177 for _, control_var in self._full_controls.items():
178 control_var.value = None
180 def process(self):
181 # the shadow mpc should only be run after the results of the baseline are sent
182 yield self.env.event()
184 def _initialize_flex_results(
185 self, n_simulation_steps, horizon_length, sim_time_step, result_df
186 ):
187 """Initialize the flex results dataframe with the correct dimension and index and fill with
188 existing results from optimization"""
190 # create MultiIndex for collocation points
191 index_coll = pd.MultiIndex.from_arrays(
192 [[self.env.now] * len(result_df.index), result_df.index],
193 names=["time_step", "time"]
194 # Match the names with multi_index but note they're reversed
195 )
196 # create Multiindex for full simulation sample times
197 index_full_sample = pd.MultiIndex.from_tuples(
198 zip(
199 [self.env.now] * (n_simulation_steps + 1),
200 range(0, horizon_length + sim_time_step, sim_time_step),
201 ),
202 names=["time_step", "time"],
203 )
204 # merge indexes
205 new_index = index_coll.union(index_full_sample).sort_values()
206 # initialize the flex results with correct dimension
207 self.flex_results = pd.DataFrame(np.nan, index=new_index, columns=self.var_ref.outputs)
209 # Get the optimization outputs and create a series for fixed optimization outputs with the
210 # correct MultiIndex format
211 opti_outputs = result_df.variable[self.config.power_variable_name]
212 fixed_opti_output = pd.Series(
213 opti_outputs.values,
214 index=index_coll,
215 )
216 # fill the output value at the time step where it already exists in optimization output
217 for idx in fixed_opti_output.index:
218 if idx in self.flex_results.index:
219 self.flex_results.loc[idx, self.config.power_variable_name] = fixed_opti_output[idx]
221 def _update_model_parameters(self):
222 """update the value of module parameters with value from config,
223 since creating a model just reads the value in the model class but not the config
224 """
226 for par in self.config.parameters:
227 self.flex_model.set(par.name, par.value)
229 def _update_initial_states(self, result_df):
230 """set the initial value of states"""
232 # get state values from the mpc optimization result
233 state_values = result_df.variable[self.var_ref.states]
234 # update state values with last measurement
235 for state, value in zip(self.var_ref.states, state_values.iloc[0]):
236 self.flex_model.set(state, value)
238 def _run_simulation(
239 self, n_simulation_steps, sim_time_step, mpc_time_step, result_df, total_horizon_time
240 ):
241 """simulate with flex model over the prediction horizon"""
243 # get control and input values from the mpc optimization result
244 control_values = result_df.variable[self.var_ref.controls].dropna()
245 input_values = result_df.parameter[self.var_ref.inputs].dropna()
247 # Get the simulation time step index
248 sim_time_index = np.arange(0, (n_simulation_steps + 1) * sim_time_step, sim_time_step)
250 # Reindex the controls and inputs to sim_time_index
251 control_values_full = control_values.copy().reindex(sim_time_index, method="ffill")
252 input_values_full = input_values.copy().reindex(sim_time_index, method="nearest")
254 for i in range(0, n_simulation_steps):
255 current_sim_time = i * sim_time_step
257 # Apply control and input values from the appropriate MPC step
258 for control, value in zip(
259 self.var_ref.controls, control_values_full.loc[current_sim_time]
260 ):
261 self.flex_model.set(control, value)
263 for input_var, value in zip(
264 self.var_ref.inputs, input_values_full.loc[current_sim_time]
265 ):
266 # change the type of iterable input, since casadi model can't deal with iterable
267 if issubclass(eval(self.flex_model.get(input_var).type), Iterable):
268 self.flex_model.get(input_var).type = type(value).__name__
269 self.flex_model.set(input_var, value)
271 # do integration
272 # reduce the simulation time step so that the total horizon time will not be exceeded
273 if current_sim_time + sim_time_step <= total_horizon_time:
274 t_sample = sim_time_step
275 else:
276 t_sample = total_horizon_time - current_sim_time
277 self.flex_model.do_step(t_start=0, t_sample=t_sample)
279 # save output
280 for output in self.var_ref.outputs:
281 self.flex_results.loc[
282 (self.env.now, current_sim_time + t_sample), output
283 ] = self.flex_model.get_output(output).value
286class FlexibilityShadowMINLPMPCConfig(minlp_mpc.MINLPMPCConfig):
288 casadi_sim_time_step: int = Field(
289 default=0,
290 description="Time step for simulation with Casadi simulator. Value is read from "
291 "FlexQuantConfig",
292 )
293 power_variable_name: str = Field(
294 default=None, description="Name of the power variable in the shadow mpc model."
295 )
296 storage_variable_name: Optional[str] = Field(
297 default=None, description="Name of the storage variable in the shadow mpc model."
298 )
301class FlexibilityShadowMINLPMPC(minlp_mpc.MINLPMPC):
302 """Shadow MINLP-MPC for calculating positive/negatives flexibility offers."""
304 config: FlexibilityShadowMINLPMPCConfig
306 def __init__(self, *args, **kwargs):
307 # create instance variable
308 self._full_controls: Dict[str, Union[AgentVariable, None]] = {}
309 # initialize flex_results with None
310 self.flex_results = None
311 super().__init__(*args, **kwargs)
312 # set up necessary components if simulation is enabled
313 if self.config.casadi_sim_time_step > 0:
314 # generate a separate flex_model for integration to ensure the model used in MPC
315 # optimization remains unaffected
316 self.flex_model = type(self.model)(dt=self.config.casadi_sim_time_step)
317 # generate the filename for the simulation results
318 self.res_file_flex = self.config.optimization_backend["results_file"].replace(
319 "mpc", "mpc_sim"
320 )
321 # clear the casadi simulator result at the first time step if already exists
322 try:
323 os.remove(self.res_file_flex)
324 except:
325 pass
327 def register_callbacks(self):
328 for control_var in self.config.controls + self.config.binary_controls:
329 self.agent.data_broker.register_callback(
330 name=f"{control_var.name}{full_trajectory_suffix}",
331 alias=f"{control_var.name}{full_trajectory_suffix}",
332 callback=self.calc_flex_callback,
333 )
334 for input_var in self.config.inputs:
335 adapted_name = input_var.name.replace(full_trajectory_suffix, "")
336 if adapted_name in [
337 control_var.name
338 for control_var in self.config.controls + self.config.binary_controls
339 ]:
340 self._full_controls[input_var.name] = input_var
342 super().register_callbacks()
344 def calc_flex_callback(self, inp: AgentVariable, name: str):
345 """Set the control trajectories before calculating the flexibility offer.
347 self.model should account for flexibility in its cost function
349 """
350 # during provision dont calculate flex
351 if self.get("in_provision").value:
352 return
354 # do not trigger callback on self set variables
355 if self.agent.config.id == inp.source.agent_id:
356 return
358 # get the value of the input
359 vals = inp.value
360 if vals.isna().any():
361 vals = fill_nans(series=vals, method=MEAN)
362 # add time shift env.now to the mpc prediction index if it starts at t=0
363 if vals.index[0] == 0:
364 vals.index += self.env.time
365 # update value in the mapping dictionary
366 self._full_controls[name].value = vals
367 # update the value of the variable in the model if we want to limit the binary control in
368 # the market time during optimization
369 self.model.set(name, vals)
370 # make sure all controls are set
371 if all(x.value is not None for x in self._full_controls.values()):
372 self.do_step()
373 for _, control_var in self._full_controls.items():
374 control_var.value = None
376 def process(self):
377 # the shadow mpc should only be run after the results of the baseline are sent
378 yield self.env.event()
380 def set_output(self, solution):
381 """Takes the solution from optimization backend and sends it to AgentVariables."""
382 # Output must be defined in the config as "type"="pd.Series"
383 if not self.config.set_outputs:
384 return
385 self.logger.info("Sending optimal output values to data_broker.")
387 # simulate with the casadi simulator
388 self.sim_flex_model(solution)
390 df = solution.df
391 if self.flex_results is not None:
392 for output in self.var_ref.outputs:
393 if output not in [
394 self.config.power_variable_name,
395 self.config.storage_variable_name,
396 ]:
397 series = df.variable[output]
398 self.set(output, series)
399 # send the power and storage variable value from simulation results
400 upsampled_output_power = self.flex_results[self.config.power_variable_name]
401 self.set(self.config.power_variable_name, upsampled_output_power)
402 if self.config.storage_variable_name is not None:
403 upsampled_output_storage = self.flex_results[self.config.storage_variable_name]
404 self.set(self.config.storage_variable_name, upsampled_output_storage.dropna())
405 else:
406 for output in self.var_ref.outputs:
407 series = df.variable[output]
408 self.set(output, series)
410 def sim_flex_model(self, solution):
411 """simulate the flex model over the preditcion horizon and save results"""
413 # return if sim_time_step is not a positive integer and system is in provision
414 if not (self.config.casadi_sim_time_step > 0 and not self.get("in_provision").value):
415 return
417 # read the defined simulation time step
418 sim_time_step = self.config.casadi_sim_time_step
419 mpc_time_step = self.config.time_step
421 # set the horizon length and the number of simulation steps
422 total_horizon_time = int(self.config.prediction_horizon * self.config.time_step)
423 n_simulation_steps = math.ceil(total_horizon_time / sim_time_step)
425 # read the current optimization result
426 result_df = solution.df
428 # initialize the flex sim results Dataframe
429 self._initialize_flex_results(
430 n_simulation_steps, total_horizon_time, sim_time_step, result_df
431 )
433 # Update model parameters and initial states
434 self._update_model_parameters()
435 self._update_initial_states(result_df)
437 # Run simulation
438 self._run_simulation(
439 n_simulation_steps, sim_time_step, mpc_time_step, result_df, total_horizon_time
440 )
442 # set index of flex results to the same as mpc result
443 store_results_df = self.flex_results.copy(deep=True)
444 store_results_df.index = self.flex_results.index.tolist()
446 # save results
447 if not os.path.exists(self.res_file_flex):
448 store_results_df.to_csv(self.res_file_flex)
449 else:
450 store_results_df.to_csv(self.res_file_flex, mode="a", header=False)
452 # set the flex results format same as mpc result while updating Agentvariable
453 self.flex_results.index = self.flex_results.index.get_level_values(1)
455 def _initialize_flex_results(
456 self, n_simulation_steps, horizon_length, sim_time_step, result_df
457 ):
458 """Initialize the flex results dataframe with the correct dimension and index and fill with
459 existing results from optimization"""
461 # create MultiIndex for collocation points
462 index_coll = pd.MultiIndex.from_arrays(
463 [[self.env.now] * len(result_df.index), result_df.index],
464 names=["time_step", "time"]
465 # Match the names with multi_index but note they're reversed
466 )
467 # create Multiindex for full simulation sample times
468 index_full_sample = pd.MultiIndex.from_tuples(
469 zip(
470 [self.env.now] * (n_simulation_steps + 1),
471 range(0, horizon_length + sim_time_step, sim_time_step),
472 ),
473 names=["time_step", "time"],
474 )
475 # merge indexes
476 new_index = index_coll.union(index_full_sample).sort_values()
477 # initialize the flex results with correct dimension
478 self.flex_results = pd.DataFrame(np.nan, index=new_index, columns=self.var_ref.outputs)
480 # Get the optimization outputs and create a series for fixed optimization outputs with the
481 # correct MultiIndex format
482 opti_outputs = result_df.variable[self.config.power_variable_name]
483 fixed_opti_output = pd.Series(
484 opti_outputs.values,
485 index=index_coll,
486 )
487 # fill the output value at the time step where it already exists in optimization output
488 for idx in fixed_opti_output.index:
489 if idx in self.flex_results.index:
490 self.flex_results.loc[idx, self.config.power_variable_name] = fixed_opti_output[idx]
492 def _update_model_parameters(self):
493 """update the value of module parameters with value from config,
494 since creating a model just reads the value in the model class but not the config
495 """
497 for par in self.config.parameters:
498 self.flex_model.set(par.name, par.value)
500 def _update_initial_states(self, result_df):
501 """set the initial value of states"""
503 # get state values from the mpc optimization result
504 state_values = result_df.variable[self.var_ref.states]
505 # update state values with last measurement
506 for state, value in zip(self.var_ref.states, state_values.iloc[0]):
507 self.flex_model.set(state, value)
509 def _run_simulation(
510 self, n_simulation_steps, sim_time_step, mpc_time_step, result_df, total_horizon_time
511 ):
512 """simulate with flex model over the prediction horizon"""
514 # get control and input values from the mpc optimization result
515 control_values = result_df.variable[
516 [*self.var_ref.controls, *self.var_ref.binary_controls]
517 ].dropna()
518 input_values = result_df.parameter[self.var_ref.inputs].dropna()
520 # Get the simulation time step index
521 sim_time_index = np.arange(0, (n_simulation_steps + 1) * sim_time_step, sim_time_step)
523 # Reindex the controls and inputs to sim_time_index
524 control_values_full = control_values.copy().reindex(sim_time_index, method="ffill")
525 input_values_full = input_values.copy().reindex(sim_time_index, method="nearest")
527 for i in range(0, n_simulation_steps):
528 current_sim_time = i * sim_time_step
530 # Apply control and input values from the appropriate MPC step
531 for control, value in zip(
532 self.var_ref.controls,
533 control_values_full.loc[current_sim_time, self.var_ref.controls],
534 ):
535 self.flex_model.set(control, value)
537 for binary_control, value in zip(
538 self.var_ref.binary_controls,
539 control_values_full.loc[current_sim_time, self.var_ref.binary_controls],
540 ):
541 self.flex_model.set(binary_control, value)
543 for input_var, value in zip(
544 self.var_ref.inputs, input_values_full.loc[current_sim_time]
545 ):
546 # change the type of iterable input, since casadi model can't deal with iterable
547 if issubclass(eval(self.flex_model.get(input_var).type), Iterable):
548 self.flex_model.get(input_var).type = type(value).__name__
549 self.flex_model.set(input_var, value)
551 # do integration
552 # reduce the simulation time step so that the total horizon time will not be exceeded
553 if current_sim_time + sim_time_step <= total_horizon_time:
554 t_sample = sim_time_step
555 else:
556 t_sample = total_horizon_time - current_sim_time
557 self.flex_model.do_step(t_start=0, t_sample=t_sample)
559 # save output
560 for output in self.var_ref.outputs:
561 self.flex_results.loc[
562 (self.env.now, current_sim_time + t_sample), output
563 ] = self.flex_model.get_output(output).value