Coverage for agentlib_flexquant/modules/baseline_mpc.py: 65%
224 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-10-20 14:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-10-20 14:09 +0000
1"""
2Defines MPC and MINLP-MPC for baseline flexibility quantification.
3"""
4from agentlib_mpc.modules import minlp_mpc, mpc_full
5import os
6import math
7import numpy as np
8import pandas as pd
9from typing import Optional, Dict
10from pydantic import Field
11from collections.abc import Iterable
12import agentlib_flexquant.data_structures.globals as glbs
13from agentlib import AgentVariable
14from agentlib_mpc.modules import mpc_full, minlp_mpc
15from agentlib_mpc.data_structures.mpc_datamodels import Results
16from agentlib_flexquant.data_structures.globals import full_trajectory_suffix
19class FlexibilityBaselineMPCConfig(mpc_full.MPCConfig):
20 # define an AgentVariable list for the full control trajectory, since use MPCVariable output
21 # affects the optimization result
22 full_controls: list[AgentVariable] = Field(default=[])
24 casadi_sim_time_step: int = Field(
25 default=0,
26 description="Time step for simulation with Casadi"
27 "simulator. Value is read from "
28 "FlexQuantConfig",
29 )
30 power_variable_name: str = Field(
31 default=None, description="Name of the power variable in the " "baseline mpc model."
32 )
33 storage_variable_name: Optional[str] = Field(
34 default=None, description="Name of the storage v" "ariable in the baseline " "mpc model."
35 )
38class FlexibilityBaselineMPC(mpc_full.MPC):
39 """MPC for baseline flexibility quantification."""
41 config: FlexibilityBaselineMPCConfig
43 def __init__(self, config, agent):
44 super().__init__(config, agent)
45 # initialize a control mapping dictionary which maps the full control names to the control
46 # names
47 self._controls_name_mapping: Dict[str, str] = {}
49 for full_control in self.config.full_controls:
50 # add full_control to the variables dictionary, so that the set function can be applied
51 # to it
52 self._variables_dict[full_control.name] = full_control
53 # fill the mapping dictionary
54 self._controls_name_mapping[full_control.name] = full_control.name.replace(
55 full_trajectory_suffix, ""
56 )
58 # initialize flex_results with None
59 self.flex_results = None
60 # set up necessary components if simulation is enabled
61 if self.config.casadi_sim_time_step > 0:
62 # generate a separate flex_model for integration to ensure
63 # the model used in MPC optimization remains unaffected
64 self.flex_model = type(self.model)(dt=self.config.casadi_sim_time_step)
65 # generate the filename for the simulation results
66 self.res_file_flex = self.config.optimization_backend["results_file"].replace(
67 "mpc", "mpc_sim"
68 )
69 # clear the casadi simulator result at the first time step if already exists
70 try:
71 os.remove(self.res_file_flex)
72 except:
73 pass
75 def pre_computation_hook(self):
76 """Calculate relative start and end times for flexibility provision.
78 When in provision mode, computes the relative timing for flexibility
79 events based on the external power profile timestamps and current
80 environment time.
81 """
82 if self.get("in_provision").value:
83 self.set("rel_start", self.get("_P_external").value.index[0] - self.env.time)
84 # the provision profile gives a value for the start of a time step.
85 # For the end of the flex interval add time step!
86 self.set("rel_end", self.get("_P_external").value.index[-1] - self.env.time)
88 def set_actuation(self, solution: Results):
89 super().set_actuation(solution)
90 for full_control in self.config.full_controls:
91 # get the corresponding control name
92 control = self._controls_name_mapping[full_control.name]
93 # set value to full_control
94 self.set(full_control.name, solution.df.variable[control].ffill())
96 def set_output(self, solution):
97 """Takes the solution from optimization backend and sends it to AgentVariables."""
98 # Output must be defined in the config as "type"="pd.Series"
99 if not self.config.set_outputs:
100 return
101 self.logger.info("Sending optimal output values to data_broker.")
103 # simulate with the casadi simulator
104 self.sim_flex_model(solution)
106 # extract solution DataFrama
107 df = solution.df
109 # send the outputs
110 if self.flex_results is not None:
111 for output in self.var_ref.outputs:
112 if output not in [
113 self.config.power_variable_name,
114 self.config.storage_variable_name,
115 ]:
116 series = df.variable[output]
117 self.set(output, series)
118 # send the power and storage variable value from simulation results
119 upsampled_output_power = self.flex_results[self.config.power_variable_name]
120 self.set(self.config.power_variable_name, upsampled_output_power)
121 if self.config.storage_variable_name is not None:
122 upsampled_output_storage = self.flex_results[self.config.storage_variable_name]
123 self.set(self.config.storage_variable_name, upsampled_output_storage.dropna())
124 else:
125 for output in self.var_ref.outputs:
126 series = df.variable[output]
127 self.set(output, series)
129 def sim_flex_model(self, solution):
130 """simulate the flex model over the preditcion horizon and save results"""
132 # return if sim_time_step is not a positive integer and system is in provision
133 if not (self.config.casadi_sim_time_step > 0 and not self.get("in_provision").value):
134 return
136 # read the defined simulation time step
137 sim_time_step = self.config.casadi_sim_time_step
138 mpc_time_step = self.config.time_step
140 # set the horizon length and the number of simulation steps
141 total_horizon_time = int(self.config.prediction_horizon * self.config.time_step)
142 n_simulation_steps = math.ceil(total_horizon_time / sim_time_step)
144 # read the current optimization result
145 result_df = solution.df
147 # initialize the flex sim results Dataframe
148 self._initialize_flex_results(
149 n_simulation_steps, total_horizon_time, sim_time_step, result_df
150 )
152 # Update model parameters and initial states
153 self._update_model_parameters()
154 self._update_initial_states(result_df)
156 # Run simulation
157 self._run_simulation(
158 n_simulation_steps, sim_time_step, mpc_time_step, result_df, total_horizon_time
159 )
161 # set index of flex results to the same as mpc result
162 store_results_df = self.flex_results.copy(deep=True)
163 store_results_df.index = self.flex_results.index.tolist()
165 # save results
166 if not os.path.exists(self.res_file_flex):
167 store_results_df.to_csv(self.res_file_flex)
168 else:
169 store_results_df.to_csv(self.res_file_flex, mode="a", header=False)
171 # set the flex results format same as mpc result while updating Agentvariable
172 self.flex_results.index = self.flex_results.index.get_level_values(1)
174 def _initialize_flex_results(
175 self, n_simulation_steps, horizon_length, sim_time_step, result_df
176 ):
177 """Initialize the flex results dataframe with the correct dimension and index
178 and fill with existing results from optimization
179 """
181 # create MultiIndex for collocation points
182 index_coll = pd.MultiIndex.from_arrays(
183 [[self.env.now] * len(result_df.index), result_df.index],
184 names=["time_step", "time"]
185 # Match the names with multi_index but note they're reversed
186 )
187 # create Multiindex for full simulation sample times
188 index_full_sample = pd.MultiIndex.from_tuples(
189 zip(
190 [self.env.now] * (n_simulation_steps + 1),
191 range(0, horizon_length + sim_time_step, sim_time_step),
192 ),
193 names=["time_step", "time"],
194 )
195 # merge indexes
196 new_index = index_coll.union(index_full_sample).sort_values()
197 # initialize the flex results with correct dimension
198 self.flex_results = pd.DataFrame(np.nan, index=new_index, columns=self.var_ref.outputs)
200 # Get the optimization outputs and create a series for fixed optimization outputs with the
201 # correct MultiIndex format
202 opti_outputs = result_df.variable[self.config.power_variable_name]
203 fixed_opti_output = pd.Series(
204 opti_outputs.values,
205 index=index_coll,
206 )
207 # fill the output value at the time step where it already exists in optimization output
208 for idx in fixed_opti_output.index:
209 if idx in self.flex_results.index:
210 self.flex_results.loc[idx, self.config.power_variable_name] = fixed_opti_output[idx]
212 def _update_model_parameters(self):
213 """update the value of module parameters with value from config,
214 since creating a model just reads the value in the model class but not the config
215 """
217 for par in self.config.parameters:
218 self.flex_model.set(par.name, par.value)
220 def _update_initial_states(self, result_df):
221 """set the initial value of states"""
223 # get state values from the mpc optimization result
224 state_values = result_df.variable[self.var_ref.states]
225 # update state values with last measurement
226 for state, value in zip(self.var_ref.states, state_values.iloc[0]):
227 self.flex_model.set(state, value)
229 def _run_simulation(
230 self, n_simulation_steps, sim_time_step, mpc_time_step, result_df, total_horizon_time
231 ):
232 """simulate with flex model over the prediction horizon"""
234 # get control and input values from the mpc optimization result
235 control_values = result_df.variable[self.var_ref.controls].dropna()
236 input_values = result_df.parameter[self.var_ref.inputs].dropna()
238 # Get the simulation time step index
239 sim_time_index = np.arange(0, (n_simulation_steps + 1) * sim_time_step, sim_time_step)
241 # Reindex the controls and inputs to sim_time_index
242 control_values_full = control_values.copy().reindex(sim_time_index, method="ffill")
243 input_values_full = input_values.copy().reindex(sim_time_index, method="nearest")
245 for i in range(0, n_simulation_steps):
246 current_sim_time = i * sim_time_step
248 # Apply control and input values from the appropriate MPC step
249 for control, value in zip(
250 self.var_ref.controls, control_values_full.loc[current_sim_time]
251 ):
252 self.flex_model.set(control, value)
254 for input_var, value in zip(
255 self.var_ref.inputs, input_values_full.loc[current_sim_time]
256 ):
257 # change the type of iterable input, since casadi model can't deal with iterable
258 if issubclass(eval(self.flex_model.get(input_var).type), Iterable):
259 self.flex_model.get(input_var).type = type(value).__name__
260 self.flex_model.set(input_var, value)
262 # do integration
263 # reduce the simulation time step so that the total horizon time will not be exceeded
264 if current_sim_time + sim_time_step <= total_horizon_time:
265 t_sample = sim_time_step
266 else:
267 t_sample = total_horizon_time - current_sim_time
268 self.flex_model.do_step(t_start=0, t_sample=t_sample)
270 # save output
271 for output in self.var_ref.outputs:
272 self.flex_results.loc[
273 (self.env.now, current_sim_time + t_sample), output
274 ] = self.flex_model.get_output(output).value
277class FlexibilityBaselineMINLPMPCConfig(minlp_mpc.MINLPMPCConfig):
279 # define an AgentVariable list for the full control trajectory, since use MPCVariable output
280 # affects the optimization result
281 full_controls: list[AgentVariable] = Field(default=[])
283 casadi_sim_time_step: int = Field(
284 default=0,
285 description="Time step for simulation with Casadi simulator. Value is read from "
286 "FlexQuantConfig",
287 )
288 power_variable_name: str = Field(
289 default=None, description="Name of the power variable in the baseline mpc model."
290 )
291 storage_variable_name: Optional[str] = Field(
292 default=None, description="Name of the storage variable in the baseline mpc model."
293 )
296class FlexibilityBaselineMINLPMPC(minlp_mpc.MINLPMPC):
297 """MINLP-MPC for baseline flexibility quantification with mixed-integer optimization."""
299 config: FlexibilityBaselineMINLPMPCConfig
301 def __init__(self, config, agent):
302 super().__init__(config, agent)
303 # initialize a control mapping dictionary which maps the full control names to the control
304 # names
305 self._controls_name_mapping: Dict[str, str] = {}
307 for full_control in self.config.full_controls:
308 # add full_control to the variables dictionary, so that the set function can be applied
309 # to it
310 self._variables_dict[full_control.name] = full_control
311 # fill the mapping dictionary
312 self._controls_name_mapping[full_control.name] = full_control.name.replace(
313 full_trajectory_suffix, ""
314 )
316 # initialize flex_results with None
317 self.flex_results = None
318 # set up necessary components if simulation is enabled
319 if self.config.casadi_sim_time_step > 0:
320 # generate a separate flex_model for integration to ensure the model used in MPC
321 # optimization remains unaffected
322 self.flex_model = type(self.model)(dt=self.config.casadi_sim_time_step)
323 # generate the filename for the simulation results
324 self.res_file_flex = self.config.optimization_backend["results_file"].replace(
325 "mpc", "mpc_sim"
326 )
327 # clear the casadi simulator result at the first time step if already exists
328 try:
329 os.remove(self.res_file_flex)
330 except:
331 pass
333 def pre_computation_hook(self):
334 """Calculate relative start and end times for flexibility provision.
336 When in provision mode, computes the relative timing for flexibility
337 events based on the external power profile timestamps and current
338 environment time.
339 """
340 if self.get("in_provision").value:
341 timestep = (
342 self.get("_P_external").value.index[1] - self.get("_P_external").value.index[0]
343 )
344 self.set("rel_start", self.get("_P_external").value.index[0] - self.env.time)
345 # the provision profile gives a value for the start of a time step.
346 # For the end of the flex interval add time step!
347 self.set(
348 "rel_end",
349 self.get("_P_external").value.index[-1] - self.env.time + timestep,
350 )
352 def set_actuation(self, solution: Results):
353 super().set_actuation(solution)
354 for full_control in self.config.full_controls:
355 # get the corresponding control name
356 control = self._controls_name_mapping[full_control.name]
357 # set value to full_control
358 self.set(full_control.name, solution.df.variable[control].ffill())
360 def set_output(self, solution):
361 """Takes the solution from optimization backend and sends it to AgentVariables."""
362 # Output must be defined in the config as "type"="pd.Series"
363 if not self.config.set_outputs:
364 return
365 self.logger.info("Sending optimal output values to data_broker.")
367 # simulate with the casadi simulator
368 self.sim_flex_model(solution)
370 df = solution.df
371 if self.flex_results is not None:
372 for output in self.var_ref.outputs:
373 if output not in [
374 self.config.power_variable_name,
375 self.config.storage_variable_name,
376 ]:
377 series = df.variable[output]
378 self.set(output, series)
379 # send the power and storage variable value from simulation results
380 upsampled_output_power = self.flex_results[self.config.power_variable_name]
381 self.set(self.config.power_variable_name, upsampled_output_power)
382 if self.config.storage_variable_name is not None:
383 upsampled_output_storage = self.flex_results[self.config.storage_variable_name]
384 self.set(self.config.storage_variable_name, upsampled_output_storage.dropna())
385 else:
386 for output in self.var_ref.outputs:
387 series = df.variable[output]
388 self.set(output, series)
390 def sim_flex_model(self, solution):
391 """simulate the flex model over the preditcion horizon and save results"""
393 # return if sim_time_step is not a positive integer and system is in provision
394 if not (self.config.casadi_sim_time_step > 0 and not self.get("in_provision").value):
395 return
397 # read the defined simulation time step
398 sim_time_step = self.config.casadi_sim_time_step
399 mpc_time_step = self.config.time_step
401 # set the horizon length and the number of simulation steps
402 total_horizon_time = int(self.config.prediction_horizon * self.config.time_step)
403 n_simulation_steps = math.ceil(total_horizon_time / sim_time_step)
405 # read the current optimization result
406 result_df = solution.df
408 # initialize the flex sim results Dataframe
409 self._initialize_flex_results(
410 n_simulation_steps, total_horizon_time, sim_time_step, result_df
411 )
413 # Update model parameters and initial states
414 self._update_model_parameters()
415 self._update_initial_states(result_df)
417 # Run simulation
418 self._run_simulation(
419 n_simulation_steps, sim_time_step, mpc_time_step, result_df, total_horizon_time
420 )
422 # set index of flex results to the same as mpc result
423 store_results_df = self.flex_results.copy(deep=True)
424 store_results_df.index = self.flex_results.index.tolist()
426 # save results
427 if not os.path.exists(self.res_file_flex):
428 store_results_df.to_csv(self.res_file_flex)
429 else:
430 store_results_df.to_csv(self.res_file_flex, mode="a", header=False)
432 # set the flex results format same as mpc result while updating Agentvariable
433 self.flex_results.index = self.flex_results.index.get_level_values(1)
435 def _initialize_flex_results(
436 self, n_simulation_steps, horizon_length, sim_time_step, result_df
437 ):
438 """Initialize the flex results dataframe with the correct dimension and index and fill
439 with existing results from optimization"""
441 # create MultiIndex for collocation points
442 index_coll = pd.MultiIndex.from_arrays(
443 [[self.env.now] * len(result_df.index), result_df.index],
444 names=["time_step", "time"]
445 # Match the names with multi_index but note they're reversed
446 )
447 # create Multiindex for full simulation sample times
448 index_full_sample = pd.MultiIndex.from_tuples(
449 zip(
450 [self.env.now] * (n_simulation_steps + 1),
451 range(0, horizon_length + sim_time_step, sim_time_step),
452 ),
453 names=["time_step", "time"],
454 )
455 # merge indexes
456 new_index = index_coll.union(index_full_sample).sort_values()
457 # initialize the flex results with correct dimension
458 self.flex_results = pd.DataFrame(np.nan, index=new_index, columns=self.var_ref.outputs)
460 # Get the optimization outputs and create a series for fixed optimization outputs with
461 # the correct MultiIndex format
462 opti_outputs = result_df.variable[self.config.power_variable_name]
463 fixed_opti_output = pd.Series(
464 opti_outputs.values,
465 index=index_coll,
466 )
467 # fill the output value at the time step where it already exists in optimization output
468 for idx in fixed_opti_output.index:
469 if idx in self.flex_results.index:
470 self.flex_results.loc[idx, self.config.power_variable_name] = fixed_opti_output[idx]
472 def _update_model_parameters(self):
473 """update the value of module parameters with value from config,
474 since creating a model just reads the value in the model class but not the config
475 """
477 for par in self.config.parameters:
478 self.flex_model.set(par.name, par.value)
480 def _update_initial_states(self, result_df):
481 """set the initial value of states"""
483 # get state values from the mpc optimization result
484 state_values = result_df.variable[self.var_ref.states]
485 # update state values with last measurement
486 for state, value in zip(self.var_ref.states, state_values.iloc[0]):
487 self.flex_model.set(state, value)
489 def _run_simulation(
490 self, n_simulation_steps, sim_time_step, mpc_time_step, result_df, total_horizon_time
491 ):
492 """simulate with flex model over the prediction horizon"""
494 # get control and input values from the mpc optimization result
495 control_values = result_df.variable[
496 [*self.var_ref.controls, *self.var_ref.binary_controls]
497 ].dropna()
498 input_values = result_df.parameter[self.var_ref.inputs].dropna()
500 # Get the simulation time step index
501 sim_time_index = np.arange(0, (n_simulation_steps + 1) * sim_time_step, sim_time_step)
503 # Reindex the controls and inputs to sim_time_index
504 control_values_full = control_values.copy().reindex(sim_time_index, method="ffill")
505 input_values_full = input_values.copy().reindex(sim_time_index, method="nearest")
507 for i in range(0, n_simulation_steps):
508 current_sim_time = i * sim_time_step
510 # Apply control and input values from the appropriate MPC step
511 for control, value in zip(
512 self.var_ref.controls,
513 control_values_full.loc[current_sim_time, self.var_ref.controls],
514 ):
515 self.flex_model.set(control, value)
517 for binary_control, value in zip(
518 self.var_ref.binary_controls,
519 control_values_full.loc[current_sim_time, self.var_ref.binary_controls],
520 ):
521 self.flex_model.set(binary_control, value)
523 for input_var, value in zip(
524 self.var_ref.inputs, input_values_full.loc[current_sim_time]
525 ):
526 # change the type of iterable input, since casadi model can't deal with iterable
527 if issubclass(eval(self.flex_model.get(input_var).type), Iterable):
528 self.flex_model.get(input_var).type = type(value).__name__
529 self.flex_model.set(input_var, value)
531 # do integration
532 # reduce the simulation time step so that the total horizon time will not be exceeded
533 if current_sim_time + sim_time_step <= total_horizon_time:
534 t_sample = sim_time_step
535 else:
536 t_sample = total_horizon_time - current_sim_time
537 self.flex_model.do_step(t_start=0, t_sample=t_sample)
539 # save output
540 for output in self.var_ref.outputs:
541 self.flex_results.loc[
542 (self.env.now, current_sim_time + t_sample), output
543 ] = self.flex_model.get_output(output).value