Coverage for agentlib_flexquant/modules/baseline_mpc.py: 69%
264 statements
« prev ^ index » next coverage.py v7.4.4, created at 2026-03-26 09:43 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2026-03-26 09:43 +0000
1"""
2Defines MPC and MINLP-MPC for baseline flexibility quantification.
3"""
4import os
5import math
6import numpy as np
7import pandas as pd
8from typing import Optional, Dict
9from pydantic import Field
10from collections.abc import Iterable
11import agentlib_flexquant.data_structures.globals as glbs
12from agentlib import AgentVariable
13from agentlib_mpc.modules.mpc import mpc_full, minlp_mpc
14from agentlib_mpc.data_structures.mpc_datamodels import Results, InitStatus
15from agentlib_flexquant.data_structures.globals import (full_trajectory_suffix,
16 base_vars_to_communicate_suffix)
19class FlexibilityBaselineMPCConfig(mpc_full.MPCConfig):
20 # define an AgentVariable list for the full control trajectory
21 full_controls: list[AgentVariable] = Field(default=[])
23 # define an AgentVariable list for the variables to communicate to the shadow MPCs
24 vars_to_communicate: list[AgentVariable] = Field(default=[])
26 casadi_sim_time_step: int = Field(
27 default=0,
28 description="Time step for simulation with Casadi"
29 "simulator. Value is read from "
30 "FlexQuantConfig",
31 )
32 power_variable_name: str = Field(
33 default=None, description="Name of the power variable in the "
34 "baseline mpc model."
35 )
36 storage_variable_name: Optional[str] = Field(
37 default=None, description="Name of the storage variable in the "
38 "baseline mpc model."
39 )
42class FlexibilityBaselineMPC(mpc_full.MPC):
43 """MPC for baseline flexibility quantification."""
45 config: FlexibilityBaselineMPCConfig
47 def __init__(self, config, agent):
48 super().__init__(config, agent)
49 # initialize a control mapping dictionary which maps the names of the
50 # incoming AgentVariables (with suffix) to the names without suffix
51 self._controls_name_mapping: Dict[str, str] = {}
52 self._vars_to_com_name_mapping: Dict[str, str] = {}
54 for full_control in self.config.full_controls:
55 # fill the mapping dictionary
56 self._controls_name_mapping[full_control.name] = full_control.name.replace(
57 full_trajectory_suffix, ""
58 )
60 for vars_to_com in self.config.vars_to_communicate:
61 # fill the mapping dictionary
62 self._vars_to_com_name_mapping[vars_to_com.name] = vars_to_com.name.replace(
63 base_vars_to_communicate_suffix, ""
64 )
66 # initialize flex_results with None
67 self.flex_results = None
68 # set up necessary components if simulation is enabled
69 if self.config.casadi_sim_time_step > 0:
70 # generate a separate flex_model for integration to ensure
71 # the model used in MPC optimization remains unaffected
72 self.flex_model = type(self.model)(dt=self.config.casadi_sim_time_step)
73 # generate the filename for the simulation results
74 self.res_file_flex = self.config.optimization_backend["results_file"].replace(
75 "_base", "_sim_base"
76 )
77 # clear the casadi simulator result at the first time step if already exists
78 try:
79 os.remove(self.res_file_flex)
80 except FileNotFoundError:
81 pass
83 def do_step(self):
84 """
85 Performs an MPC step.
86 """
87 if not self.init_status == InitStatus.ready:
88 self.logger.warning("Skipping step, optimization_backend is not ready.")
89 return
91 self.pre_computation_hook()
93 # get new values from data_broker
94 updated_vars = self.collect_variables_for_optimization()
96 # solve optimization problem with up-to-date values from data_broker
97 result = self.optimization_backend.solve(self.env.time, updated_vars)
99 # Set variables in data_broker
100 self.set_actuation(result)
101 # Set variables, so that shadow MPCs are initialized with the
102 # same values as the Baseline
103 self.set_vars_for_shadow(result)
104 self.set_output(result)
105 self._remove_old_values_from_history()
107 def pre_computation_hook(self):
108 """Calculate relative start and end times for flexibility provision.
110 When in provision mode, computes the relative timing for flexibility
111 events based on the external power profile timestamps and current
112 environment time.
113 """
114 if self.get(glbs.PROVISION_VAR_NAME).value:
115 self.set(glbs.RELATIVE_EVENT_START_TIME_VAR_NAME,
116 self.get(glbs.ACCEPTED_POWER_VAR_NAME).value.index[0] -
117 self.env.time)
118 # the provision profile gives a value for the start of a time step.
119 # For the end of the flex interval add time step!
120 self.set(glbs.RELATIVE_EVENT_END_TIME_VAR_NAME,
121 self.get(glbs.ACCEPTED_POWER_VAR_NAME).value.index[-1] -
122 self.env.time)
124 def set_vars_for_shadow(self, solution):
125 """Sets the variables of the Baseline MPC needed by the shadow MPCs
126 with a predefined suffix.
127 This essentially sends the same inputs and states the Baseline used
128 for optimization to the Shadow MPC, ensuring synchronisation.
129 """
130 for vars_to_com in self.config.vars_to_communicate:
131 vars_name = self._vars_to_com_name_mapping[vars_to_com.name]
132 if vars_name in solution.df.variable:
133 vars_value = solution.df.variable[vars_name]
134 else: # parameter
135 vars_value = solution.df.parameter[vars_name]
136 self.set(vars_to_com.name, vars_value)
138 def set_actuation(self, solution: Results):
139 super().set_actuation(solution)
140 for full_control in self.config.full_controls:
141 # get the corresponding control name
142 control = self._controls_name_mapping[full_control.name]
143 # set value to full_control
144 self.set(full_control.name, solution.df.variable[control].ffill())
146 def set_output(self, solution):
147 """Takes the solution from optimization backend and sends it
148 to AgentVariables.
150 """
151 # Output must be defined in the config as "type"="pd.Series"
152 if not self.config.set_outputs:
153 return
154 self.logger.info("Sending optimal output values to data_broker.")
156 # simulate with the casadi simulator
157 self.sim_flex_model(solution)
159 # extract solution DataFrama
160 df = solution.df
162 # send the outputs
163 if self.flex_results is not None:
164 for output in self.var_ref.outputs:
165 if output not in [
166 self.config.power_variable_name,
167 self.config.storage_variable_name,
168 ]:
169 series = df.variable[output]
170 self.set(output, series)
171 # send the power and storage variable value from simulation results
172 upsampled_output_power = self.flex_results[self.config.power_variable_name]
173 self.set(self.config.power_variable_name, upsampled_output_power)
174 if self.config.storage_variable_name is not None:
175 upsampled_output_storage = (
176 self.flex_results)[self.config.storage_variable_name]
177 self.set(self.config.storage_variable_name,
178 upsampled_output_storage.dropna())
179 else:
180 for output in self.var_ref.outputs:
181 series = df.variable[output]
182 self.set(output, series)
184 def sim_flex_model(self, solution):
185 """simulate the flex model over the preditcion horizon and save results
187 """
189 # return if sim_time_step is not a positive integer and system is in provision
190 if not (self.config.casadi_sim_time_step > 0 and not
191 self.get(glbs.PROVISION_VAR_NAME).value):
192 return
194 # read the defined simulation time step
195 sim_time_step = self.config.casadi_sim_time_step
196 mpc_time_step = self.config.time_step
198 # set the horizon length and the number of simulation steps
199 total_horizon_time = int(self.config.prediction_horizon * self.config.time_step)
200 n_simulation_steps = math.ceil(total_horizon_time / sim_time_step)
202 # read the current optimization result
203 result_df = solution.df
205 # initialize the flex sim results Dataframe
206 self._initialize_flex_results(
207 n_simulation_steps, total_horizon_time, sim_time_step, result_df
208 )
210 # Update model parameters and initial states
211 self._update_model_parameters()
212 self._update_initial_states(result_df)
214 # Run simulation
215 self._run_simulation(
216 n_simulation_steps, sim_time_step, mpc_time_step, result_df,
217 total_horizon_time
218 )
220 # set index of flex results to the same as mpc result
221 store_results_df = self.flex_results.copy(deep=True)
222 store_results_df.index = self.flex_results.index.tolist()
224 # save results
225 if not os.path.exists(self.res_file_flex):
226 store_results_df.to_csv(self.res_file_flex)
227 else:
228 store_results_df.to_csv(self.res_file_flex, mode="a", header=False)
230 # set the flex results format same as mpc result while updating Agentvariable
231 self.flex_results.index = self.flex_results.index.get_level_values(1)
233 def _initialize_flex_results(
234 self, n_simulation_steps, horizon_length, sim_time_step, result_df
235 ):
236 """Initialize the flex results dataframe with the correct dimension and index
237 and fill with existing results from optimization
239 """
241 # create MultiIndex for collocation points
242 index_coll = pd.MultiIndex.from_arrays(
243 [[self.env.now] * len(result_df.index), result_df.index],
244 names=["time_step", "time"]
245 # Match the names with multi_index but note they're reversed
246 )
247 # create Multiindex for full simulation sample times
248 index_full_sample = pd.MultiIndex.from_tuples(
249 zip(
250 [self.env.now] * (n_simulation_steps + 1),
251 range(0, horizon_length + sim_time_step, sim_time_step),
252 ),
253 names=["time_step", "time"],
254 )
255 # merge indexes
256 new_index = index_coll.union(index_full_sample).sort_values()
257 # initialize the flex results with correct dimension
258 self.flex_results = pd.DataFrame(np.nan, index=new_index,
259 columns=self.var_ref.outputs)
261 # Get the optimization outputs and create a series for fixed
262 # optimization outputs with the correct MultiIndex format
263 opti_outputs = result_df.variable[self.config.power_variable_name]
264 fixed_opti_output = pd.Series(
265 opti_outputs.values,
266 index=index_coll,
267 )
268 # fill the output value at the time step where it already exists
269 # in optimization output
270 for idx in fixed_opti_output.index:
271 if idx in self.flex_results.index:
272 self.flex_results.loc[idx, self.config.power_variable_name] = (
273 fixed_opti_output)[idx]
275 def _update_model_parameters(self):
276 """update the value of module parameters with value from config,
277 since creating a model just reads the value in the model class but
278 not the config
280 """
282 for par in self.config.parameters:
283 self.flex_model.set(par.name, par.value)
285 def _update_initial_states(self, result_df):
286 """set the initial value of states"""
288 # get state values from the mpc optimization result
289 state_values = result_df.variable[self.var_ref.states]
290 # update state values with last measurement
291 for state, value in zip(self.var_ref.states, state_values.iloc[0]):
292 self.flex_model.set(state, value)
294 def _run_simulation(
295 self, n_simulation_steps, sim_time_step, mpc_time_step, result_df,
296 total_horizon_time
297 ):
298 """simulate with flex model over the prediction horizon
300 """
302 # get control and input values from the mpc optimization result
303 control_values = result_df.variable[self.var_ref.controls].dropna()
304 input_values = result_df.parameter[self.var_ref.inputs].dropna()
306 # Get the simulation time step index
307 sim_time_index = np.arange(0, (n_simulation_steps + 1) * sim_time_step,
308 sim_time_step)
310 # Reindex the controls and inputs to sim_time_index
311 control_values_full = control_values.copy().reindex(sim_time_index,
312 method="ffill")
313 input_values_full = input_values.copy().reindex(sim_time_index,
314 method="nearest")
316 for i in range(0, n_simulation_steps):
317 current_sim_time = i * sim_time_step
319 # Apply control and input values from the appropriate MPC step
320 for control, value in zip(
321 self.var_ref.controls, control_values_full.loc[current_sim_time]
322 ):
323 self.flex_model.set(control, value)
325 for input_var, value in zip(
326 self.var_ref.inputs, input_values_full.loc[current_sim_time]
327 ):
328 # change the type of iterable input, since casadi model
329 # can't deal with iterable
330 if issubclass(eval(self.flex_model.get(input_var).type), Iterable):
331 self.flex_model.get(input_var).type = type(value).__name__
332 self.flex_model.set(input_var, value)
334 # do integration
335 # reduce the simulation time step so that the total horizon time
336 # will not be exceeded
337 if current_sim_time + sim_time_step <= total_horizon_time:
338 t_sample = sim_time_step
339 else:
340 t_sample = total_horizon_time - current_sim_time
341 self.flex_model.do_step(t_start=0, t_sample=t_sample)
343 # save output
344 for output in self.var_ref.outputs:
345 self.flex_results.loc[
346 (self.env.now, current_sim_time + t_sample), output
347 ] = self.flex_model.get_output(output).value
350class FlexibilityBaselineMINLPMPCConfig(minlp_mpc.MINLPMPCConfig):
351 # define an AgentVariable list for the full control trajectory
352 full_controls: list[AgentVariable] = Field(default=[])
354 # define an AgentVariable list for the variables to communicate to the shadow MPCs
355 vars_to_communicate: list[AgentVariable] = Field(default=[])
357 casadi_sim_time_step: int = Field(
358 default=0,
359 description="Time step for simulation with Casadi simulator. "
360 "Value is read from FlexQuantConfig",
361 )
362 power_variable_name: str = Field(
363 default=None, description="Name of the power variable in the "
364 "baseline mpc model."
365 )
366 storage_variable_name: Optional[str] = Field(
367 default=None, description="Name of the storage variable in the "
368 "baseline mpc model."
369 )
372class FlexibilityBaselineMINLPMPC(minlp_mpc.MINLPMPC):
373 """MINLP-MPC for baseline flexibility quantification with mixed-integer
374 optimization.
376 """
378 config: FlexibilityBaselineMINLPMPCConfig
380 def __init__(self, config, agent):
381 super().__init__(config, agent)
382 # initialize a control mapping dictionary which maps the names of the
383 # incoming AgentVariables (with suffix) to the names without suffix
384 self._controls_name_mapping: Dict[str, str] = {}
385 self._vars_to_com_name_mapping: Dict[str, str] = {}
387 for full_control in self.config.full_controls:
388 # fill the mapping dictionary
389 self._controls_name_mapping[full_control.name] = full_control.name.replace(
390 full_trajectory_suffix, ""
391 )
393 for vars_to_com in self.config.vars_to_communicate:
394 # fill the mapping dictionary
395 self._vars_to_com_name_mapping[vars_to_com.name] = vars_to_com.name.replace(
396 base_vars_to_communicate_suffix, ""
397 )
399 # initialize flex_results with None
400 self.flex_results = None
401 # set up necessary components if simulation is enabled
402 if self.config.casadi_sim_time_step > 0:
403 # generate a separate flex_model for integration to ensure
404 # the model used in MPC optimization remains unaffected
405 self.flex_model = type(self.model)(dt=self.config.casadi_sim_time_step)
406 # generate the filename for the simulation results
407 self.res_file_flex = self.config.optimization_backend[
408 "results_file"].replace("_base", "_sim_base")
409 # clear the casadi simulator result at the first time step if already exists
410 try:
411 os.remove(self.res_file_flex)
412 except FileNotFoundError:
413 pass
415 def do_step(self):
416 """
417 Performs an MPC step.
418 """
419 if not self.init_status == InitStatus.ready:
420 self.logger.warning("Skipping step, optimization_backend is not ready.")
421 return
423 self.pre_computation_hook()
425 # get new values from data_broker
426 updated_vars = self.collect_variables_for_optimization()
428 # solve optimization problem with up-to-date values from data_broker
429 result = self.optimization_backend.solve(self.env.time, updated_vars)
431 # Set variables in data_broker
432 self.set_actuation(result)
433 # Set variables, so that shadow MPCs are initialized
434 # with the same values as the Baseline
435 self.set_vars_for_shadow(result)
436 self.set_output(result)
438 def pre_computation_hook(self):
439 """Calculate relative start and end times for flexibility provision.
441 When in provision mode, computes the relative timing for flexibility
442 events based on the external power profile timestamps and current
443 environment time.
444 """
445 if self.get(glbs.PROVISION_VAR_NAME).value:
446 timestep = (self.get(glbs.ACCEPTED_POWER_VAR_NAME).value.index[1] -
447 self.get(glbs.ACCEPTED_POWER_VAR_NAME).value.index[0])
448 self.set(glbs.RELATIVE_EVENT_START_TIME_VAR_NAME,
449 self.get(glbs.ACCEPTED_POWER_VAR_NAME).value.index[0] -
450 self.env.time)
451 # the provision profile gives a value for the start of a time step.
452 # For the end of the flex interval add time step!
453 self.set(glbs.RELATIVE_EVENT_END_TIME_VAR_NAME,
454 self.get(glbs.ACCEPTED_POWER_VAR_NAME).value.index[-1] -
455 self.env.time + timestep,)
457 def set_vars_for_shadow(self, solution):
458 """Sets the variables of the Baseline MPC needed by the shadow MPCs
459 with a predefined suffix.
460 This essentially sends the same inputs and states the Baseline used
461 for optimization to the Shadow MPC, ensuring synchronisation.
462 """
463 for vars_to_com in self.config.vars_to_communicate:
464 vars_name = self._vars_to_com_name_mapping[vars_to_com.name]
465 if vars_name in solution.df.variable:
466 vars_value = solution.df.variable[vars_name]
467 else: # parameter
468 vars_value = solution.df.parameter[vars_name]
469 self.set(vars_to_com.name, vars_value)
471 def set_actuation(self, solution: Results):
472 super().set_actuation(solution)
473 for full_control in self.config.full_controls:
474 # get the corresponding control name
475 control = self._controls_name_mapping[full_control.name]
476 # set value to full_control
477 self.set(full_control.name, solution.df.variable[control].ffill())
479 def set_output(self, solution):
480 """Takes the solution from optimization backend and sends it
481 to AgentVariables.
483 """
484 # Output must be defined in the config as "type"="pd.Series"
485 if not self.config.set_outputs:
486 return
487 self.logger.info("Sending optimal output values to data_broker.")
489 # simulate with the casadi simulator
490 self.sim_flex_model(solution)
492 df = solution.df
493 if self.flex_results is not None:
494 for output in self.var_ref.outputs:
495 if output not in [
496 self.config.power_variable_name,
497 self.config.storage_variable_name,
498 ]:
499 series = df.variable[output]
500 self.set(output, series)
501 # send the power and storage variable value from simulation results
502 upsampled_output_power = (
503 self.flex_results)[self.config.power_variable_name]
504 self.set(self.config.power_variable_name, upsampled_output_power)
505 if self.config.storage_variable_name is not None:
506 upsampled_output_storage = (
507 self.flex_results)[self.config.storage_variable_name]
508 self.set(self.config.storage_variable_name,
509 upsampled_output_storage.dropna())
510 else:
511 for output in self.var_ref.outputs:
512 series = df.variable[output]
513 self.set(output, series)
515 def sim_flex_model(self, solution):
516 """simulate the flex model over the preditcion horizon and save results
518 """
520 # return if sim_time_step is not a positive integer and system is in provision
521 if not (self.config.casadi_sim_time_step > 0 and not
522 self.get(glbs.PROVISION_VAR_NAME).value):
523 return
525 # read the defined simulation time step
526 sim_time_step = self.config.casadi_sim_time_step
527 mpc_time_step = self.config.time_step
529 # set the horizon length and the number of simulation steps
530 total_horizon_time = int(self.config.prediction_horizon * self.config.time_step)
531 n_simulation_steps = math.ceil(total_horizon_time / sim_time_step)
533 # read the current optimization result
534 result_df = solution.df
536 # initialize the flex sim results Dataframe
537 self._initialize_flex_results(
538 n_simulation_steps, total_horizon_time, sim_time_step, result_df
539 )
541 # Update model parameters and initial states
542 self._update_model_parameters()
543 self._update_initial_states(result_df)
545 # Run simulation
546 self._run_simulation(
547 n_simulation_steps, sim_time_step, mpc_time_step, result_df,
548 total_horizon_time
549 )
551 # set index of flex results to the same as mpc result
552 store_results_df = self.flex_results.copy(deep=True)
553 store_results_df.index = self.flex_results.index.tolist()
555 # save results
556 if not os.path.exists(self.res_file_flex):
557 store_results_df.to_csv(self.res_file_flex)
558 else:
559 store_results_df.to_csv(self.res_file_flex, mode="a", header=False)
561 # set the flex results format same as mpc result while updating Agentvariable
562 self.flex_results.index = self.flex_results.index.get_level_values(1)
564 def _initialize_flex_results(
565 self, n_simulation_steps, horizon_length, sim_time_step, result_df
566 ):
567 """Initialize the flex results dataframe with the correct dimension
568 and index and fill with existing results from optimization
570 """
572 # create MultiIndex for collocation points
573 index_coll = pd.MultiIndex.from_arrays(
574 [[self.env.now] * len(result_df.index), result_df.index],
575 names=["time_step", "time"]
576 # Match the names with multi_index but note they're reversed
577 )
578 # create Multiindex for full simulation sample times
579 index_full_sample = pd.MultiIndex.from_tuples(
580 zip(
581 [self.env.now] * (n_simulation_steps + 1),
582 range(0, horizon_length + sim_time_step, sim_time_step),
583 ),
584 names=["time_step", "time"],
585 )
586 # merge indexes
587 new_index = index_coll.union(index_full_sample).sort_values()
588 # initialize the flex results with correct dimension
589 self.flex_results = pd.DataFrame(np.nan, index=new_index,
590 columns=self.var_ref.outputs)
592 # Get the optimization outputs and create a series for fixed
593 # optimization outputs with the correct MultiIndex format
594 opti_outputs = result_df.variable[self.config.power_variable_name]
595 fixed_opti_output = pd.Series(
596 opti_outputs.values,
597 index=index_coll,
598 )
599 # fill the output value at the time step where it already exists
600 # in optimization output
601 for idx in fixed_opti_output.index:
602 if idx in self.flex_results.index:
603 self.flex_results.loc[idx, self.config.power_variable_name] = (
604 fixed_opti_output)[idx]
606 def _update_model_parameters(self):
607 """update the value of module parameters with value from config,
608 since creating a model just reads the value in the model class but
609 not the config
611 """
613 for par in self.config.parameters:
614 self.flex_model.set(par.name, par.value)
616 def _update_initial_states(self, result_df):
617 """set the initial value of states"""
619 # get state values from the mpc optimization result
620 state_values = result_df.variable[self.var_ref.states]
621 # update state values with last measurement
622 for state, value in zip(self.var_ref.states, state_values.iloc[0]):
623 self.flex_model.set(state, value)
625 def _run_simulation(
626 self, n_simulation_steps, sim_time_step, mpc_time_step, result_df,
627 total_horizon_time
628 ):
629 """simulate with flex model over the prediction horizon
631 """
633 # get control and input values from the mpc optimization result
634 control_values = result_df.variable[
635 [*self.var_ref.controls, *self.var_ref.binary_controls]
636 ].dropna()
637 input_values = result_df.parameter[self.var_ref.inputs].dropna()
639 # Get the simulation time step index
640 sim_time_index = np.arange(0, (n_simulation_steps + 1) * sim_time_step,
641 sim_time_step)
643 # Reindex the controls and inputs to sim_time_index
644 control_values_full = control_values.copy().reindex(sim_time_index,
645 method="ffill")
646 input_values_full = input_values.copy().reindex(sim_time_index,
647 method="nearest")
649 for i in range(0, n_simulation_steps):
650 current_sim_time = i * sim_time_step
652 # Apply control and input values from the appropriate MPC step
653 for control, value in zip(
654 self.var_ref.controls,
655 control_values_full.loc[current_sim_time, self.var_ref.controls],
656 ):
657 self.flex_model.set(control, value)
659 for binary_control, value in zip(
660 self.var_ref.binary_controls,
661 control_values_full.loc[current_sim_time, self.var_ref.binary_controls],
662 ):
663 self.flex_model.set(binary_control, value)
665 for input_var, value in zip(
666 self.var_ref.inputs, input_values_full.loc[current_sim_time]
667 ):
668 # change the type of iterable input, since casadi model can't
669 # deal with iterable
670 if issubclass(eval(self.flex_model.get(input_var).type), Iterable):
671 self.flex_model.get(input_var).type = type(value).__name__
672 self.flex_model.set(input_var, value)
674 # do integration
675 # reduce the simulation time step so that the total horizon time
676 # will not be exceeded
677 if current_sim_time + sim_time_step <= total_horizon_time:
678 t_sample = sim_time_step
679 else:
680 t_sample = total_horizon_time - current_sim_time
681 self.flex_model.do_step(t_start=0, t_sample=t_sample)
683 # save output
684 for output in self.var_ref.outputs:
685 self.flex_results.loc[
686 (self.env.now, current_sim_time + t_sample), output
687 ] = self.flex_model.get_output(output).value