Coverage for agentlib_flexquant/modules/baseline_mpc.py: 65%

224 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-10-20 14:09 +0000

1""" 

2Defines MPC and MINLP-MPC for baseline flexibility quantification. 

3""" 

4from agentlib_mpc.modules import minlp_mpc, mpc_full 

5import os 

6import math 

7import numpy as np 

8import pandas as pd 

9from typing import Optional, Dict 

10from pydantic import Field 

11from collections.abc import Iterable 

12import agentlib_flexquant.data_structures.globals as glbs 

13from agentlib import AgentVariable 

14from agentlib_mpc.modules import mpc_full, minlp_mpc 

15from agentlib_mpc.data_structures.mpc_datamodels import Results 

16from agentlib_flexquant.data_structures.globals import full_trajectory_suffix 

17 

18 

19class FlexibilityBaselineMPCConfig(mpc_full.MPCConfig): 

20 # define an AgentVariable list for the full control trajectory, since use MPCVariable output 

21 # affects the optimization result 

22 full_controls: list[AgentVariable] = Field(default=[]) 

23 

24 casadi_sim_time_step: int = Field( 

25 default=0, 

26 description="Time step for simulation with Casadi" 

27 "simulator. Value is read from " 

28 "FlexQuantConfig", 

29 ) 

30 power_variable_name: str = Field( 

31 default=None, description="Name of the power variable in the " "baseline mpc model." 

32 ) 

33 storage_variable_name: Optional[str] = Field( 

34 default=None, description="Name of the storage v" "ariable in the baseline " "mpc model." 

35 ) 

36 

37 

38class FlexibilityBaselineMPC(mpc_full.MPC): 

39 """MPC for baseline flexibility quantification.""" 

40 

41 config: FlexibilityBaselineMPCConfig 

42 

43 def __init__(self, config, agent): 

44 super().__init__(config, agent) 

45 # initialize a control mapping dictionary which maps the full control names to the control 

46 # names 

47 self._controls_name_mapping: Dict[str, str] = {} 

48 

49 for full_control in self.config.full_controls: 

50 # add full_control to the variables dictionary, so that the set function can be applied 

51 # to it 

52 self._variables_dict[full_control.name] = full_control 

53 # fill the mapping dictionary 

54 self._controls_name_mapping[full_control.name] = full_control.name.replace( 

55 full_trajectory_suffix, "" 

56 ) 

57 

58 # initialize flex_results with None 

59 self.flex_results = None 

60 # set up necessary components if simulation is enabled 

61 if self.config.casadi_sim_time_step > 0: 

62 # generate a separate flex_model for integration to ensure 

63 # the model used in MPC optimization remains unaffected 

64 self.flex_model = type(self.model)(dt=self.config.casadi_sim_time_step) 

65 # generate the filename for the simulation results 

66 self.res_file_flex = self.config.optimization_backend["results_file"].replace( 

67 "mpc", "mpc_sim" 

68 ) 

69 # clear the casadi simulator result at the first time step if already exists 

70 try: 

71 os.remove(self.res_file_flex) 

72 except: 

73 pass 

74 

75 def pre_computation_hook(self): 

76 """Calculate relative start and end times for flexibility provision. 

77 

78 When in provision mode, computes the relative timing for flexibility 

79 events based on the external power profile timestamps and current 

80 environment time. 

81 """ 

82 if self.get("in_provision").value: 

83 self.set("rel_start", self.get("_P_external").value.index[0] - self.env.time) 

84 # the provision profile gives a value for the start of a time step. 

85 # For the end of the flex interval add time step! 

86 self.set("rel_end", self.get("_P_external").value.index[-1] - self.env.time) 

87 

88 def set_actuation(self, solution: Results): 

89 super().set_actuation(solution) 

90 for full_control in self.config.full_controls: 

91 # get the corresponding control name 

92 control = self._controls_name_mapping[full_control.name] 

93 # set value to full_control 

94 self.set(full_control.name, solution.df.variable[control].ffill()) 

95 

96 def set_output(self, solution): 

97 """Takes the solution from optimization backend and sends it to AgentVariables.""" 

98 # Output must be defined in the config as "type"="pd.Series" 

99 if not self.config.set_outputs: 

100 return 

101 self.logger.info("Sending optimal output values to data_broker.") 

102 

103 # simulate with the casadi simulator 

104 self.sim_flex_model(solution) 

105 

106 # extract solution DataFrama 

107 df = solution.df 

108 

109 # send the outputs 

110 if self.flex_results is not None: 

111 for output in self.var_ref.outputs: 

112 if output not in [ 

113 self.config.power_variable_name, 

114 self.config.storage_variable_name, 

115 ]: 

116 series = df.variable[output] 

117 self.set(output, series) 

118 # send the power and storage variable value from simulation results 

119 upsampled_output_power = self.flex_results[self.config.power_variable_name] 

120 self.set(self.config.power_variable_name, upsampled_output_power) 

121 if self.config.storage_variable_name is not None: 

122 upsampled_output_storage = self.flex_results[self.config.storage_variable_name] 

123 self.set(self.config.storage_variable_name, upsampled_output_storage.dropna()) 

124 else: 

125 for output in self.var_ref.outputs: 

126 series = df.variable[output] 

127 self.set(output, series) 

128 

129 def sim_flex_model(self, solution): 

130 """simulate the flex model over the preditcion horizon and save results""" 

131 

132 # return if sim_time_step is not a positive integer and system is in provision 

133 if not (self.config.casadi_sim_time_step > 0 and not self.get("in_provision").value): 

134 return 

135 

136 # read the defined simulation time step 

137 sim_time_step = self.config.casadi_sim_time_step 

138 mpc_time_step = self.config.time_step 

139 

140 # set the horizon length and the number of simulation steps 

141 total_horizon_time = int(self.config.prediction_horizon * self.config.time_step) 

142 n_simulation_steps = math.ceil(total_horizon_time / sim_time_step) 

143 

144 # read the current optimization result 

145 result_df = solution.df 

146 

147 # initialize the flex sim results Dataframe 

148 self._initialize_flex_results( 

149 n_simulation_steps, total_horizon_time, sim_time_step, result_df 

150 ) 

151 

152 # Update model parameters and initial states 

153 self._update_model_parameters() 

154 self._update_initial_states(result_df) 

155 

156 # Run simulation 

157 self._run_simulation( 

158 n_simulation_steps, sim_time_step, mpc_time_step, result_df, total_horizon_time 

159 ) 

160 

161 # set index of flex results to the same as mpc result 

162 store_results_df = self.flex_results.copy(deep=True) 

163 store_results_df.index = self.flex_results.index.tolist() 

164 

165 # save results 

166 if not os.path.exists(self.res_file_flex): 

167 store_results_df.to_csv(self.res_file_flex) 

168 else: 

169 store_results_df.to_csv(self.res_file_flex, mode="a", header=False) 

170 

171 # set the flex results format same as mpc result while updating Agentvariable 

172 self.flex_results.index = self.flex_results.index.get_level_values(1) 

173 

174 def _initialize_flex_results( 

175 self, n_simulation_steps, horizon_length, sim_time_step, result_df 

176 ): 

177 """Initialize the flex results dataframe with the correct dimension and index 

178 and fill with existing results from optimization 

179 """ 

180 

181 # create MultiIndex for collocation points 

182 index_coll = pd.MultiIndex.from_arrays( 

183 [[self.env.now] * len(result_df.index), result_df.index], 

184 names=["time_step", "time"] 

185 # Match the names with multi_index but note they're reversed 

186 ) 

187 # create Multiindex for full simulation sample times 

188 index_full_sample = pd.MultiIndex.from_tuples( 

189 zip( 

190 [self.env.now] * (n_simulation_steps + 1), 

191 range(0, horizon_length + sim_time_step, sim_time_step), 

192 ), 

193 names=["time_step", "time"], 

194 ) 

195 # merge indexes 

196 new_index = index_coll.union(index_full_sample).sort_values() 

197 # initialize the flex results with correct dimension 

198 self.flex_results = pd.DataFrame(np.nan, index=new_index, columns=self.var_ref.outputs) 

199 

200 # Get the optimization outputs and create a series for fixed optimization outputs with the 

201 # correct MultiIndex format 

202 opti_outputs = result_df.variable[self.config.power_variable_name] 

203 fixed_opti_output = pd.Series( 

204 opti_outputs.values, 

205 index=index_coll, 

206 ) 

207 # fill the output value at the time step where it already exists in optimization output 

208 for idx in fixed_opti_output.index: 

209 if idx in self.flex_results.index: 

210 self.flex_results.loc[idx, self.config.power_variable_name] = fixed_opti_output[idx] 

211 

212 def _update_model_parameters(self): 

213 """update the value of module parameters with value from config, 

214 since creating a model just reads the value in the model class but not the config 

215 """ 

216 

217 for par in self.config.parameters: 

218 self.flex_model.set(par.name, par.value) 

219 

220 def _update_initial_states(self, result_df): 

221 """set the initial value of states""" 

222 

223 # get state values from the mpc optimization result 

224 state_values = result_df.variable[self.var_ref.states] 

225 # update state values with last measurement 

226 for state, value in zip(self.var_ref.states, state_values.iloc[0]): 

227 self.flex_model.set(state, value) 

228 

229 def _run_simulation( 

230 self, n_simulation_steps, sim_time_step, mpc_time_step, result_df, total_horizon_time 

231 ): 

232 """simulate with flex model over the prediction horizon""" 

233 

234 # get control and input values from the mpc optimization result 

235 control_values = result_df.variable[self.var_ref.controls].dropna() 

236 input_values = result_df.parameter[self.var_ref.inputs].dropna() 

237 

238 # Get the simulation time step index 

239 sim_time_index = np.arange(0, (n_simulation_steps + 1) * sim_time_step, sim_time_step) 

240 

241 # Reindex the controls and inputs to sim_time_index 

242 control_values_full = control_values.copy().reindex(sim_time_index, method="ffill") 

243 input_values_full = input_values.copy().reindex(sim_time_index, method="nearest") 

244 

245 for i in range(0, n_simulation_steps): 

246 current_sim_time = i * sim_time_step 

247 

248 # Apply control and input values from the appropriate MPC step 

249 for control, value in zip( 

250 self.var_ref.controls, control_values_full.loc[current_sim_time] 

251 ): 

252 self.flex_model.set(control, value) 

253 

254 for input_var, value in zip( 

255 self.var_ref.inputs, input_values_full.loc[current_sim_time] 

256 ): 

257 # change the type of iterable input, since casadi model can't deal with iterable 

258 if issubclass(eval(self.flex_model.get(input_var).type), Iterable): 

259 self.flex_model.get(input_var).type = type(value).__name__ 

260 self.flex_model.set(input_var, value) 

261 

262 # do integration 

263 # reduce the simulation time step so that the total horizon time will not be exceeded 

264 if current_sim_time + sim_time_step <= total_horizon_time: 

265 t_sample = sim_time_step 

266 else: 

267 t_sample = total_horizon_time - current_sim_time 

268 self.flex_model.do_step(t_start=0, t_sample=t_sample) 

269 

270 # save output 

271 for output in self.var_ref.outputs: 

272 self.flex_results.loc[ 

273 (self.env.now, current_sim_time + t_sample), output 

274 ] = self.flex_model.get_output(output).value 

275 

276 

277class FlexibilityBaselineMINLPMPCConfig(minlp_mpc.MINLPMPCConfig): 

278 

279 # define an AgentVariable list for the full control trajectory, since use MPCVariable output 

280 # affects the optimization result 

281 full_controls: list[AgentVariable] = Field(default=[]) 

282 

283 casadi_sim_time_step: int = Field( 

284 default=0, 

285 description="Time step for simulation with Casadi simulator. Value is read from " 

286 "FlexQuantConfig", 

287 ) 

288 power_variable_name: str = Field( 

289 default=None, description="Name of the power variable in the baseline mpc model." 

290 ) 

291 storage_variable_name: Optional[str] = Field( 

292 default=None, description="Name of the storage variable in the baseline mpc model." 

293 ) 

294 

295 

296class FlexibilityBaselineMINLPMPC(minlp_mpc.MINLPMPC): 

297 """MINLP-MPC for baseline flexibility quantification with mixed-integer optimization.""" 

298 

299 config: FlexibilityBaselineMINLPMPCConfig 

300 

301 def __init__(self, config, agent): 

302 super().__init__(config, agent) 

303 # initialize a control mapping dictionary which maps the full control names to the control 

304 # names 

305 self._controls_name_mapping: Dict[str, str] = {} 

306 

307 for full_control in self.config.full_controls: 

308 # add full_control to the variables dictionary, so that the set function can be applied 

309 # to it 

310 self._variables_dict[full_control.name] = full_control 

311 # fill the mapping dictionary 

312 self._controls_name_mapping[full_control.name] = full_control.name.replace( 

313 full_trajectory_suffix, "" 

314 ) 

315 

316 # initialize flex_results with None 

317 self.flex_results = None 

318 # set up necessary components if simulation is enabled 

319 if self.config.casadi_sim_time_step > 0: 

320 # generate a separate flex_model for integration to ensure the model used in MPC 

321 # optimization remains unaffected 

322 self.flex_model = type(self.model)(dt=self.config.casadi_sim_time_step) 

323 # generate the filename for the simulation results 

324 self.res_file_flex = self.config.optimization_backend["results_file"].replace( 

325 "mpc", "mpc_sim" 

326 ) 

327 # clear the casadi simulator result at the first time step if already exists 

328 try: 

329 os.remove(self.res_file_flex) 

330 except: 

331 pass 

332 

333 def pre_computation_hook(self): 

334 """Calculate relative start and end times for flexibility provision. 

335 

336 When in provision mode, computes the relative timing for flexibility 

337 events based on the external power profile timestamps and current 

338 environment time. 

339 """ 

340 if self.get("in_provision").value: 

341 timestep = ( 

342 self.get("_P_external").value.index[1] - self.get("_P_external").value.index[0] 

343 ) 

344 self.set("rel_start", self.get("_P_external").value.index[0] - self.env.time) 

345 # the provision profile gives a value for the start of a time step. 

346 # For the end of the flex interval add time step! 

347 self.set( 

348 "rel_end", 

349 self.get("_P_external").value.index[-1] - self.env.time + timestep, 

350 ) 

351 

352 def set_actuation(self, solution: Results): 

353 super().set_actuation(solution) 

354 for full_control in self.config.full_controls: 

355 # get the corresponding control name 

356 control = self._controls_name_mapping[full_control.name] 

357 # set value to full_control 

358 self.set(full_control.name, solution.df.variable[control].ffill()) 

359 

360 def set_output(self, solution): 

361 """Takes the solution from optimization backend and sends it to AgentVariables.""" 

362 # Output must be defined in the config as "type"="pd.Series" 

363 if not self.config.set_outputs: 

364 return 

365 self.logger.info("Sending optimal output values to data_broker.") 

366 

367 # simulate with the casadi simulator 

368 self.sim_flex_model(solution) 

369 

370 df = solution.df 

371 if self.flex_results is not None: 

372 for output in self.var_ref.outputs: 

373 if output not in [ 

374 self.config.power_variable_name, 

375 self.config.storage_variable_name, 

376 ]: 

377 series = df.variable[output] 

378 self.set(output, series) 

379 # send the power and storage variable value from simulation results 

380 upsampled_output_power = self.flex_results[self.config.power_variable_name] 

381 self.set(self.config.power_variable_name, upsampled_output_power) 

382 if self.config.storage_variable_name is not None: 

383 upsampled_output_storage = self.flex_results[self.config.storage_variable_name] 

384 self.set(self.config.storage_variable_name, upsampled_output_storage.dropna()) 

385 else: 

386 for output in self.var_ref.outputs: 

387 series = df.variable[output] 

388 self.set(output, series) 

389 

390 def sim_flex_model(self, solution): 

391 """simulate the flex model over the preditcion horizon and save results""" 

392 

393 # return if sim_time_step is not a positive integer and system is in provision 

394 if not (self.config.casadi_sim_time_step > 0 and not self.get("in_provision").value): 

395 return 

396 

397 # read the defined simulation time step 

398 sim_time_step = self.config.casadi_sim_time_step 

399 mpc_time_step = self.config.time_step 

400 

401 # set the horizon length and the number of simulation steps 

402 total_horizon_time = int(self.config.prediction_horizon * self.config.time_step) 

403 n_simulation_steps = math.ceil(total_horizon_time / sim_time_step) 

404 

405 # read the current optimization result 

406 result_df = solution.df 

407 

408 # initialize the flex sim results Dataframe 

409 self._initialize_flex_results( 

410 n_simulation_steps, total_horizon_time, sim_time_step, result_df 

411 ) 

412 

413 # Update model parameters and initial states 

414 self._update_model_parameters() 

415 self._update_initial_states(result_df) 

416 

417 # Run simulation 

418 self._run_simulation( 

419 n_simulation_steps, sim_time_step, mpc_time_step, result_df, total_horizon_time 

420 ) 

421 

422 # set index of flex results to the same as mpc result 

423 store_results_df = self.flex_results.copy(deep=True) 

424 store_results_df.index = self.flex_results.index.tolist() 

425 

426 # save results 

427 if not os.path.exists(self.res_file_flex): 

428 store_results_df.to_csv(self.res_file_flex) 

429 else: 

430 store_results_df.to_csv(self.res_file_flex, mode="a", header=False) 

431 

432 # set the flex results format same as mpc result while updating Agentvariable 

433 self.flex_results.index = self.flex_results.index.get_level_values(1) 

434 

435 def _initialize_flex_results( 

436 self, n_simulation_steps, horizon_length, sim_time_step, result_df 

437 ): 

438 """Initialize the flex results dataframe with the correct dimension and index and fill 

439 with existing results from optimization""" 

440 

441 # create MultiIndex for collocation points 

442 index_coll = pd.MultiIndex.from_arrays( 

443 [[self.env.now] * len(result_df.index), result_df.index], 

444 names=["time_step", "time"] 

445 # Match the names with multi_index but note they're reversed 

446 ) 

447 # create Multiindex for full simulation sample times 

448 index_full_sample = pd.MultiIndex.from_tuples( 

449 zip( 

450 [self.env.now] * (n_simulation_steps + 1), 

451 range(0, horizon_length + sim_time_step, sim_time_step), 

452 ), 

453 names=["time_step", "time"], 

454 ) 

455 # merge indexes 

456 new_index = index_coll.union(index_full_sample).sort_values() 

457 # initialize the flex results with correct dimension 

458 self.flex_results = pd.DataFrame(np.nan, index=new_index, columns=self.var_ref.outputs) 

459 

460 # Get the optimization outputs and create a series for fixed optimization outputs with 

461 # the correct MultiIndex format 

462 opti_outputs = result_df.variable[self.config.power_variable_name] 

463 fixed_opti_output = pd.Series( 

464 opti_outputs.values, 

465 index=index_coll, 

466 ) 

467 # fill the output value at the time step where it already exists in optimization output 

468 for idx in fixed_opti_output.index: 

469 if idx in self.flex_results.index: 

470 self.flex_results.loc[idx, self.config.power_variable_name] = fixed_opti_output[idx] 

471 

472 def _update_model_parameters(self): 

473 """update the value of module parameters with value from config, 

474 since creating a model just reads the value in the model class but not the config 

475 """ 

476 

477 for par in self.config.parameters: 

478 self.flex_model.set(par.name, par.value) 

479 

480 def _update_initial_states(self, result_df): 

481 """set the initial value of states""" 

482 

483 # get state values from the mpc optimization result 

484 state_values = result_df.variable[self.var_ref.states] 

485 # update state values with last measurement 

486 for state, value in zip(self.var_ref.states, state_values.iloc[0]): 

487 self.flex_model.set(state, value) 

488 

489 def _run_simulation( 

490 self, n_simulation_steps, sim_time_step, mpc_time_step, result_df, total_horizon_time 

491 ): 

492 """simulate with flex model over the prediction horizon""" 

493 

494 # get control and input values from the mpc optimization result 

495 control_values = result_df.variable[ 

496 [*self.var_ref.controls, *self.var_ref.binary_controls] 

497 ].dropna() 

498 input_values = result_df.parameter[self.var_ref.inputs].dropna() 

499 

500 # Get the simulation time step index 

501 sim_time_index = np.arange(0, (n_simulation_steps + 1) * sim_time_step, sim_time_step) 

502 

503 # Reindex the controls and inputs to sim_time_index 

504 control_values_full = control_values.copy().reindex(sim_time_index, method="ffill") 

505 input_values_full = input_values.copy().reindex(sim_time_index, method="nearest") 

506 

507 for i in range(0, n_simulation_steps): 

508 current_sim_time = i * sim_time_step 

509 

510 # Apply control and input values from the appropriate MPC step 

511 for control, value in zip( 

512 self.var_ref.controls, 

513 control_values_full.loc[current_sim_time, self.var_ref.controls], 

514 ): 

515 self.flex_model.set(control, value) 

516 

517 for binary_control, value in zip( 

518 self.var_ref.binary_controls, 

519 control_values_full.loc[current_sim_time, self.var_ref.binary_controls], 

520 ): 

521 self.flex_model.set(binary_control, value) 

522 

523 for input_var, value in zip( 

524 self.var_ref.inputs, input_values_full.loc[current_sim_time] 

525 ): 

526 # change the type of iterable input, since casadi model can't deal with iterable 

527 if issubclass(eval(self.flex_model.get(input_var).type), Iterable): 

528 self.flex_model.get(input_var).type = type(value).__name__ 

529 self.flex_model.set(input_var, value) 

530 

531 # do integration 

532 # reduce the simulation time step so that the total horizon time will not be exceeded 

533 if current_sim_time + sim_time_step <= total_horizon_time: 

534 t_sample = sim_time_step 

535 else: 

536 t_sample = total_horizon_time - current_sim_time 

537 self.flex_model.do_step(t_start=0, t_sample=t_sample) 

538 

539 # save output 

540 for output in self.var_ref.outputs: 

541 self.flex_results.loc[ 

542 (self.env.now, current_sim_time + t_sample), output 

543 ] = self.flex_model.get_output(output).value