Coverage for agentlib_flexquant/modules/shadow_mpc.py: 68%

247 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-10-20 14:09 +0000

1""" 

2Defines shadow MPC and MINLP-MPC for positive/negative flexibility quantification. 

3""" 

4from typing import Dict, Union 

5 

6import os 

7import math 

8import numpy as np 

9import pandas as pd 

10from pydantic import Field 

11from typing import Dict, Union, Optional 

12from collections.abc import Iterable 

13from agentlib.core.datamodels import AgentVariable 

14from agentlib_mpc.modules import mpc_full, minlp_mpc 

15from agentlib_flexquant.utils.data_handling import fill_nans, MEAN 

16from agentlib_flexquant.data_structures.globals import ( 

17 full_trajectory_prefix, 

18 full_trajectory_suffix, 

19 STORED_ENERGY_ALIAS_NEG, 

20 STORED_ENERGY_ALIAS_POS, 

21) 

22 

23 

24class FlexibilityShadowMPCConfig(mpc_full.MPCConfig): 

25 

26 casadi_sim_time_step: int = Field( 

27 default=0, 

28 description="Time step for simulation with Casadi simulator. Value is read from " 

29 "FlexQuantConfig", 

30 ) 

31 power_variable_name: str = Field( 

32 default=None, description="Name of the power variable in the shadow mpc model." 

33 ) 

34 storage_variable_name: Optional[str] = Field( 

35 default=None, description="Name of the storage variable in the shadow mpc model." 

36 ) 

37 

38 

39class FlexibilityShadowMPC(mpc_full.MPC): 

40 """Shadow MPC for calculating positive/negative flexibility offers.""" 

41 

42 config: FlexibilityShadowMPCConfig 

43 

44 def __init__(self, *args, **kwargs): 

45 # create instance variable 

46 self._full_controls: Dict[str, Union[AgentVariable, None]] = {} 

47 # initialize flex_results with None 

48 self.flex_results = None 

49 super().__init__(*args, **kwargs) 

50 # set up necessary components if simulation is enabled 

51 if self.config.casadi_sim_time_step > 0: 

52 # generate a separate flex_model for integration to ensure the model used in MPC 

53 # optimization remains unaffected 

54 self.flex_model = type(self.model)(dt=self.config.casadi_sim_time_step) 

55 # generate the filename for the simulation results 

56 self.res_file_flex = self.config.optimization_backend["results_file"].replace( 

57 "mpc", "mpc_sim" 

58 ) 

59 # clear the casadi simulator result at the first time step if already exists 

60 try: 

61 os.remove(self.res_file_flex) 

62 except: 

63 pass 

64 

65 def set_output(self, solution): 

66 """Takes the solution from optimization backend and sends it to AgentVariables.""" 

67 # Output must be defined in the config as "type"="pd.Series" 

68 if not self.config.set_outputs: 

69 return 

70 self.logger.info("Sending optimal output values to data_broker.") 

71 df = solution.df 

72 self.sim_flex_model(solution) 

73 if self.flex_results is not None: 

74 for output in self.var_ref.outputs: 

75 if output not in [ 

76 self.config.power_variable_name, 

77 self.config.storage_variable_name, 

78 ]: 

79 series = df.variable[output] 

80 self.set(output, series) 

81 # send the power and storage variable value from simulation results 

82 upsampled_output_power = self.flex_results[self.config.power_variable_name] 

83 self.set(self.config.power_variable_name, upsampled_output_power) 

84 if self.config.storage_variable_name is not None: 

85 upsampled_output_storage = self.flex_results[self.config.storage_variable_name] 

86 self.set(self.config.storage_variable_name, upsampled_output_storage.dropna()) 

87 else: 

88 for output in self.var_ref.outputs: 

89 series = df.variable[output] 

90 self.set(output, series) 

91 

92 def sim_flex_model(self, solution): 

93 """simulate the flex model over the preditcion horizon and save results""" 

94 

95 # return if sim_time_step is not a positive integer and system is in provision 

96 if not (self.config.casadi_sim_time_step > 0 and not self.get("in_provision").value): 

97 return 

98 

99 # read the defined simulation time step 

100 sim_time_step = self.config.casadi_sim_time_step 

101 mpc_time_step = self.config.time_step 

102 

103 # set the horizon length and the number of simulation steps 

104 total_horizon_time = int(self.config.prediction_horizon * self.config.time_step) 

105 n_simulation_steps = math.ceil(total_horizon_time / sim_time_step) 

106 

107 # read the current optimization result 

108 result_df = solution.df 

109 

110 # initialize the flex sim results Dataframe 

111 self._initialize_flex_results( 

112 n_simulation_steps, total_horizon_time, sim_time_step, result_df 

113 ) 

114 

115 # Update model parameters and initial states 

116 self._update_model_parameters() 

117 self._update_initial_states(result_df) 

118 

119 # Run simulation 

120 self._run_simulation( 

121 n_simulation_steps, sim_time_step, mpc_time_step, result_df, total_horizon_time 

122 ) 

123 

124 # set index of flex results to the same as mpc result 

125 store_results_df = self.flex_results.copy(deep=True) 

126 store_results_df.index = self.flex_results.index.tolist() 

127 

128 # save results 

129 if not os.path.exists(self.res_file_flex): 

130 store_results_df.to_csv(self.res_file_flex) 

131 else: 

132 store_results_df.to_csv(self.res_file_flex, mode="a", header=False) 

133 

134 # set the flex results format same as mpc result while updating Agentvariable 

135 self.flex_results.index = self.flex_results.index.get_level_values(1) 

136 

137 def register_callbacks(self): 

138 for control_var in self.config.controls: 

139 self.agent.data_broker.register_callback( 

140 name=f"{control_var.name+full_trajectory_suffix}", 

141 alias=f"{control_var.name+full_trajectory_suffix}", 

142 callback=self.calc_flex_callback, 

143 ) 

144 for input_var in self.config.inputs: 

145 adapted_name = input_var.name.replace(full_trajectory_suffix, "") 

146 if adapted_name in [control_var.name for control_var in self.config.controls]: 

147 self._full_controls[input_var.name] = input_var 

148 

149 super().register_callbacks() 

150 

151 def calc_flex_callback(self, inp: AgentVariable, name: str): 

152 """Set the control trajectories before calculating the flexibility offer. 

153 

154 self.model should account for flexibility in its cost function. 

155 

156 """ 

157 # during provision dont calculate flex 

158 if self.get("in_provision").value: 

159 return 

160 

161 # do not trigger callback on self set variables 

162 if self.agent.config.id == inp.source.agent_id: 

163 return 

164 

165 # get the value of the input 

166 vals = inp.value 

167 if vals.isna().any(): 

168 vals = fill_nans(series=vals, method=MEAN) 

169 # add time shift env.now to the mpc prediction index if it starts at t=0 

170 if vals.index[0] == 0: 

171 vals.index += self.env.time 

172 # update value in the mapping dictionary 

173 self._full_controls[name].value = vals 

174 # make sure all controls are set 

175 if all(x.value is not None for x in self._full_controls.values()): 

176 self.do_step() 

177 for _, control_var in self._full_controls.items(): 

178 control_var.value = None 

179 

180 def process(self): 

181 # the shadow mpc should only be run after the results of the baseline are sent 

182 yield self.env.event() 

183 

184 def _initialize_flex_results( 

185 self, n_simulation_steps, horizon_length, sim_time_step, result_df 

186 ): 

187 """Initialize the flex results dataframe with the correct dimension and index and fill with 

188 existing results from optimization""" 

189 

190 # create MultiIndex for collocation points 

191 index_coll = pd.MultiIndex.from_arrays( 

192 [[self.env.now] * len(result_df.index), result_df.index], 

193 names=["time_step", "time"] 

194 # Match the names with multi_index but note they're reversed 

195 ) 

196 # create Multiindex for full simulation sample times 

197 index_full_sample = pd.MultiIndex.from_tuples( 

198 zip( 

199 [self.env.now] * (n_simulation_steps + 1), 

200 range(0, horizon_length + sim_time_step, sim_time_step), 

201 ), 

202 names=["time_step", "time"], 

203 ) 

204 # merge indexes 

205 new_index = index_coll.union(index_full_sample).sort_values() 

206 # initialize the flex results with correct dimension 

207 self.flex_results = pd.DataFrame(np.nan, index=new_index, columns=self.var_ref.outputs) 

208 

209 # Get the optimization outputs and create a series for fixed optimization outputs with the 

210 # correct MultiIndex format 

211 opti_outputs = result_df.variable[self.config.power_variable_name] 

212 fixed_opti_output = pd.Series( 

213 opti_outputs.values, 

214 index=index_coll, 

215 ) 

216 # fill the output value at the time step where it already exists in optimization output 

217 for idx in fixed_opti_output.index: 

218 if idx in self.flex_results.index: 

219 self.flex_results.loc[idx, self.config.power_variable_name] = fixed_opti_output[idx] 

220 

221 def _update_model_parameters(self): 

222 """update the value of module parameters with value from config, 

223 since creating a model just reads the value in the model class but not the config 

224 """ 

225 

226 for par in self.config.parameters: 

227 self.flex_model.set(par.name, par.value) 

228 

229 def _update_initial_states(self, result_df): 

230 """set the initial value of states""" 

231 

232 # get state values from the mpc optimization result 

233 state_values = result_df.variable[self.var_ref.states] 

234 # update state values with last measurement 

235 for state, value in zip(self.var_ref.states, state_values.iloc[0]): 

236 self.flex_model.set(state, value) 

237 

238 def _run_simulation( 

239 self, n_simulation_steps, sim_time_step, mpc_time_step, result_df, total_horizon_time 

240 ): 

241 """simulate with flex model over the prediction horizon""" 

242 

243 # get control and input values from the mpc optimization result 

244 control_values = result_df.variable[self.var_ref.controls].dropna() 

245 input_values = result_df.parameter[self.var_ref.inputs].dropna() 

246 

247 # Get the simulation time step index 

248 sim_time_index = np.arange(0, (n_simulation_steps + 1) * sim_time_step, sim_time_step) 

249 

250 # Reindex the controls and inputs to sim_time_index 

251 control_values_full = control_values.copy().reindex(sim_time_index, method="ffill") 

252 input_values_full = input_values.copy().reindex(sim_time_index, method="nearest") 

253 

254 for i in range(0, n_simulation_steps): 

255 current_sim_time = i * sim_time_step 

256 

257 # Apply control and input values from the appropriate MPC step 

258 for control, value in zip( 

259 self.var_ref.controls, control_values_full.loc[current_sim_time] 

260 ): 

261 self.flex_model.set(control, value) 

262 

263 for input_var, value in zip( 

264 self.var_ref.inputs, input_values_full.loc[current_sim_time] 

265 ): 

266 # change the type of iterable input, since casadi model can't deal with iterable 

267 if issubclass(eval(self.flex_model.get(input_var).type), Iterable): 

268 self.flex_model.get(input_var).type = type(value).__name__ 

269 self.flex_model.set(input_var, value) 

270 

271 # do integration 

272 # reduce the simulation time step so that the total horizon time will not be exceeded 

273 if current_sim_time + sim_time_step <= total_horizon_time: 

274 t_sample = sim_time_step 

275 else: 

276 t_sample = total_horizon_time - current_sim_time 

277 self.flex_model.do_step(t_start=0, t_sample=t_sample) 

278 

279 # save output 

280 for output in self.var_ref.outputs: 

281 self.flex_results.loc[ 

282 (self.env.now, current_sim_time + t_sample), output 

283 ] = self.flex_model.get_output(output).value 

284 

285 

286class FlexibilityShadowMINLPMPCConfig(minlp_mpc.MINLPMPCConfig): 

287 

288 casadi_sim_time_step: int = Field( 

289 default=0, 

290 description="Time step for simulation with Casadi simulator. Value is read from " 

291 "FlexQuantConfig", 

292 ) 

293 power_variable_name: str = Field( 

294 default=None, description="Name of the power variable in the shadow mpc model." 

295 ) 

296 storage_variable_name: Optional[str] = Field( 

297 default=None, description="Name of the storage variable in the shadow mpc model." 

298 ) 

299 

300 

301class FlexibilityShadowMINLPMPC(minlp_mpc.MINLPMPC): 

302 """Shadow MINLP-MPC for calculating positive/negatives flexibility offers.""" 

303 

304 config: FlexibilityShadowMINLPMPCConfig 

305 

306 def __init__(self, *args, **kwargs): 

307 # create instance variable 

308 self._full_controls: Dict[str, Union[AgentVariable, None]] = {} 

309 # initialize flex_results with None 

310 self.flex_results = None 

311 super().__init__(*args, **kwargs) 

312 # set up necessary components if simulation is enabled 

313 if self.config.casadi_sim_time_step > 0: 

314 # generate a separate flex_model for integration to ensure the model used in MPC 

315 # optimization remains unaffected 

316 self.flex_model = type(self.model)(dt=self.config.casadi_sim_time_step) 

317 # generate the filename for the simulation results 

318 self.res_file_flex = self.config.optimization_backend["results_file"].replace( 

319 "mpc", "mpc_sim" 

320 ) 

321 # clear the casadi simulator result at the first time step if already exists 

322 try: 

323 os.remove(self.res_file_flex) 

324 except: 

325 pass 

326 

327 def register_callbacks(self): 

328 for control_var in self.config.controls + self.config.binary_controls: 

329 self.agent.data_broker.register_callback( 

330 name=f"{control_var.name}{full_trajectory_suffix}", 

331 alias=f"{control_var.name}{full_trajectory_suffix}", 

332 callback=self.calc_flex_callback, 

333 ) 

334 for input_var in self.config.inputs: 

335 adapted_name = input_var.name.replace(full_trajectory_suffix, "") 

336 if adapted_name in [ 

337 control_var.name 

338 for control_var in self.config.controls + self.config.binary_controls 

339 ]: 

340 self._full_controls[input_var.name] = input_var 

341 

342 super().register_callbacks() 

343 

344 def calc_flex_callback(self, inp: AgentVariable, name: str): 

345 """Set the control trajectories before calculating the flexibility offer. 

346 

347 self.model should account for flexibility in its cost function 

348 

349 """ 

350 # during provision dont calculate flex 

351 if self.get("in_provision").value: 

352 return 

353 

354 # do not trigger callback on self set variables 

355 if self.agent.config.id == inp.source.agent_id: 

356 return 

357 

358 # get the value of the input 

359 vals = inp.value 

360 if vals.isna().any(): 

361 vals = fill_nans(series=vals, method=MEAN) 

362 # add time shift env.now to the mpc prediction index if it starts at t=0 

363 if vals.index[0] == 0: 

364 vals.index += self.env.time 

365 # update value in the mapping dictionary 

366 self._full_controls[name].value = vals 

367 # update the value of the variable in the model if we want to limit the binary control in 

368 # the market time during optimization 

369 self.model.set(name, vals) 

370 # make sure all controls are set 

371 if all(x.value is not None for x in self._full_controls.values()): 

372 self.do_step() 

373 for _, control_var in self._full_controls.items(): 

374 control_var.value = None 

375 

376 def process(self): 

377 # the shadow mpc should only be run after the results of the baseline are sent 

378 yield self.env.event() 

379 

380 def set_output(self, solution): 

381 """Takes the solution from optimization backend and sends it to AgentVariables.""" 

382 # Output must be defined in the config as "type"="pd.Series" 

383 if not self.config.set_outputs: 

384 return 

385 self.logger.info("Sending optimal output values to data_broker.") 

386 

387 # simulate with the casadi simulator 

388 self.sim_flex_model(solution) 

389 

390 df = solution.df 

391 if self.flex_results is not None: 

392 for output in self.var_ref.outputs: 

393 if output not in [ 

394 self.config.power_variable_name, 

395 self.config.storage_variable_name, 

396 ]: 

397 series = df.variable[output] 

398 self.set(output, series) 

399 # send the power and storage variable value from simulation results 

400 upsampled_output_power = self.flex_results[self.config.power_variable_name] 

401 self.set(self.config.power_variable_name, upsampled_output_power) 

402 if self.config.storage_variable_name is not None: 

403 upsampled_output_storage = self.flex_results[self.config.storage_variable_name] 

404 self.set(self.config.storage_variable_name, upsampled_output_storage.dropna()) 

405 else: 

406 for output in self.var_ref.outputs: 

407 series = df.variable[output] 

408 self.set(output, series) 

409 

410 def sim_flex_model(self, solution): 

411 """simulate the flex model over the preditcion horizon and save results""" 

412 

413 # return if sim_time_step is not a positive integer and system is in provision 

414 if not (self.config.casadi_sim_time_step > 0 and not self.get("in_provision").value): 

415 return 

416 

417 # read the defined simulation time step 

418 sim_time_step = self.config.casadi_sim_time_step 

419 mpc_time_step = self.config.time_step 

420 

421 # set the horizon length and the number of simulation steps 

422 total_horizon_time = int(self.config.prediction_horizon * self.config.time_step) 

423 n_simulation_steps = math.ceil(total_horizon_time / sim_time_step) 

424 

425 # read the current optimization result 

426 result_df = solution.df 

427 

428 # initialize the flex sim results Dataframe 

429 self._initialize_flex_results( 

430 n_simulation_steps, total_horizon_time, sim_time_step, result_df 

431 ) 

432 

433 # Update model parameters and initial states 

434 self._update_model_parameters() 

435 self._update_initial_states(result_df) 

436 

437 # Run simulation 

438 self._run_simulation( 

439 n_simulation_steps, sim_time_step, mpc_time_step, result_df, total_horizon_time 

440 ) 

441 

442 # set index of flex results to the same as mpc result 

443 store_results_df = self.flex_results.copy(deep=True) 

444 store_results_df.index = self.flex_results.index.tolist() 

445 

446 # save results 

447 if not os.path.exists(self.res_file_flex): 

448 store_results_df.to_csv(self.res_file_flex) 

449 else: 

450 store_results_df.to_csv(self.res_file_flex, mode="a", header=False) 

451 

452 # set the flex results format same as mpc result while updating Agentvariable 

453 self.flex_results.index = self.flex_results.index.get_level_values(1) 

454 

455 def _initialize_flex_results( 

456 self, n_simulation_steps, horizon_length, sim_time_step, result_df 

457 ): 

458 """Initialize the flex results dataframe with the correct dimension and index and fill with 

459 existing results from optimization""" 

460 

461 # create MultiIndex for collocation points 

462 index_coll = pd.MultiIndex.from_arrays( 

463 [[self.env.now] * len(result_df.index), result_df.index], 

464 names=["time_step", "time"] 

465 # Match the names with multi_index but note they're reversed 

466 ) 

467 # create Multiindex for full simulation sample times 

468 index_full_sample = pd.MultiIndex.from_tuples( 

469 zip( 

470 [self.env.now] * (n_simulation_steps + 1), 

471 range(0, horizon_length + sim_time_step, sim_time_step), 

472 ), 

473 names=["time_step", "time"], 

474 ) 

475 # merge indexes 

476 new_index = index_coll.union(index_full_sample).sort_values() 

477 # initialize the flex results with correct dimension 

478 self.flex_results = pd.DataFrame(np.nan, index=new_index, columns=self.var_ref.outputs) 

479 

480 # Get the optimization outputs and create a series for fixed optimization outputs with the 

481 # correct MultiIndex format 

482 opti_outputs = result_df.variable[self.config.power_variable_name] 

483 fixed_opti_output = pd.Series( 

484 opti_outputs.values, 

485 index=index_coll, 

486 ) 

487 # fill the output value at the time step where it already exists in optimization output 

488 for idx in fixed_opti_output.index: 

489 if idx in self.flex_results.index: 

490 self.flex_results.loc[idx, self.config.power_variable_name] = fixed_opti_output[idx] 

491 

492 def _update_model_parameters(self): 

493 """update the value of module parameters with value from config, 

494 since creating a model just reads the value in the model class but not the config 

495 """ 

496 

497 for par in self.config.parameters: 

498 self.flex_model.set(par.name, par.value) 

499 

500 def _update_initial_states(self, result_df): 

501 """set the initial value of states""" 

502 

503 # get state values from the mpc optimization result 

504 state_values = result_df.variable[self.var_ref.states] 

505 # update state values with last measurement 

506 for state, value in zip(self.var_ref.states, state_values.iloc[0]): 

507 self.flex_model.set(state, value) 

508 

509 def _run_simulation( 

510 self, n_simulation_steps, sim_time_step, mpc_time_step, result_df, total_horizon_time 

511 ): 

512 """simulate with flex model over the prediction horizon""" 

513 

514 # get control and input values from the mpc optimization result 

515 control_values = result_df.variable[ 

516 [*self.var_ref.controls, *self.var_ref.binary_controls] 

517 ].dropna() 

518 input_values = result_df.parameter[self.var_ref.inputs].dropna() 

519 

520 # Get the simulation time step index 

521 sim_time_index = np.arange(0, (n_simulation_steps + 1) * sim_time_step, sim_time_step) 

522 

523 # Reindex the controls and inputs to sim_time_index 

524 control_values_full = control_values.copy().reindex(sim_time_index, method="ffill") 

525 input_values_full = input_values.copy().reindex(sim_time_index, method="nearest") 

526 

527 for i in range(0, n_simulation_steps): 

528 current_sim_time = i * sim_time_step 

529 

530 # Apply control and input values from the appropriate MPC step 

531 for control, value in zip( 

532 self.var_ref.controls, 

533 control_values_full.loc[current_sim_time, self.var_ref.controls], 

534 ): 

535 self.flex_model.set(control, value) 

536 

537 for binary_control, value in zip( 

538 self.var_ref.binary_controls, 

539 control_values_full.loc[current_sim_time, self.var_ref.binary_controls], 

540 ): 

541 self.flex_model.set(binary_control, value) 

542 

543 for input_var, value in zip( 

544 self.var_ref.inputs, input_values_full.loc[current_sim_time] 

545 ): 

546 # change the type of iterable input, since casadi model can't deal with iterable 

547 if issubclass(eval(self.flex_model.get(input_var).type), Iterable): 

548 self.flex_model.get(input_var).type = type(value).__name__ 

549 self.flex_model.set(input_var, value) 

550 

551 # do integration 

552 # reduce the simulation time step so that the total horizon time will not be exceeded 

553 if current_sim_time + sim_time_step <= total_horizon_time: 

554 t_sample = sim_time_step 

555 else: 

556 t_sample = total_horizon_time - current_sim_time 

557 self.flex_model.do_step(t_start=0, t_sample=t_sample) 

558 

559 # save output 

560 for output in self.var_ref.outputs: 

561 self.flex_results.loc[ 

562 (self.env.now, current_sim_time + t_sample), output 

563 ] = self.flex_model.get_output(output).value