Coverage for examples/OneRoom_SimpleMPC/flex_output_data/created_flex_files/flex_agents.py: 100%

61 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2026-06-17 09:09 +0000

1import casadi as ca 

2import pandas as pd 

3from agentlib_mpc.models.casadi_model import ( 

4 CasadiModel, 

5 CasadiInput, 

6 CasadiState, 

7 CasadiParameter, 

8 CasadiOutput, 

9 CasadiModelConfig, 

10) 

11from math import inf 

12 

13 

14class BaselineMPCModelConfig(CasadiModelConfig): 

15 inputs: list[CasadiInput] = [ 

16 CasadiInput( 

17 name="mDot", 

18 value=0.0225, 

19 unit="kg/s", 

20 description="Air mass flow into zone", 

21 ), 

22 CasadiInput( 

23 name="load", value=150, unit="W", description="Heat load into zone" 

24 ), 

25 CasadiInput( 

26 name="T_in", value=280.15, unit="K", description="Inflow air temperature" 

27 ), 

28 CasadiInput( 

29 name="T_upper", 

30 value=294.15, 

31 unit="K", 

32 description="Upper boundary (soft) for T.", 

33 ), 

34 CasadiInput( 

35 name="T_lower", 

36 value=292.15, 

37 unit="K", 

38 description="Upper boundary (soft) for T.", 

39 ), 

40 CasadiInput( 

41 name="_P_external", 

42 value=0, 

43 unit="W", 

44 type="pd.Series", 

45 description="External power profile to be provided", 

46 ), 

47 CasadiInput( 

48 name="in_provision", 

49 value=False, 

50 unit="-", 

51 type="bool", 

52 description="Flag signaling if the flexibility is in provision", 

53 ), 

54 CasadiInput( 

55 name="rel_start", 

56 value=0, 

57 unit="s", 

58 type="int", 

59 description="relative start time of the flexibility event", 

60 ), 

61 CasadiInput( 

62 name="rel_end", 

63 value=0, 

64 unit="s", 

65 type="int", 

66 description="relative end time of the flexibility event", 

67 ), 

68 ] 

69 states: list[CasadiState] = [ 

70 CasadiState( 

71 name="T", value=293.15, unit="K", description="Temperature of zone" 

72 ), 

73 CasadiState( 

74 name="T_slack", 

75 value=0, 

76 unit="K", 

77 description="Slack variable of temperature of zone", 

78 ), 

79 ] 

80 parameters: list[CasadiParameter] = [ 

81 CasadiParameter( 

82 name="cp", 

83 value=1000, 

84 unit="J/kg*K", 

85 description="thermal capacity of the air", 

86 ), 

87 CasadiParameter( 

88 name="C", value=100000, unit="J/K", description="thermal capacity of zone" 

89 ), 

90 CasadiParameter( 

91 name="s_T", 

92 value=1, 

93 unit="-", 

94 description="Weight for T in constraint function", 

95 ), 

96 CasadiParameter( 

97 name="r_mDot", 

98 value=1, 

99 unit="-", 

100 description="Weight for mDot in objective function", 

101 ), 

102 CasadiParameter( 

103 name="profile_deviation_weight", 

104 value=0, 

105 unit="-", 

106 description="Weight of soft constraint for deviation from accepted flexible profile", 

107 ), 

108 CasadiParameter( 

109 name="profile_comfort_weight", 

110 value=0, 

111 unit="-", 

112 description="Weight of soft constraint for discomfort", 

113 ), 

114 ] 

115 outputs: list[CasadiOutput] = [ 

116 CasadiOutput(name="T_out", unit="K", description="Temperature of zone"), 

117 CasadiOutput( 

118 name="E_out", unit="kWh", description="Stored energy in the zone w.r.t. 0K" 

119 ), 

120 CasadiOutput( 

121 name="eta_heater", 

122 unit="-", 

123 value=1, 

124 description="Efficiency of electrical heater", 

125 ), 

126 CasadiOutput( 

127 name="P_el", unit="W", description="The power input to the system" 

128 ), 

129 ] 

130 

131 

132class BaselineMPCModel(CasadiModel): 

133 config: BaselineMPCModelConfig 

134 

135 def setup_system(self): 

136 self.T.ode = ( 

137 self.cp * self.mDot / self.C * (self.T_in - self.T) + self.load / self.C 

138 ) 

139 self.P_el.alg = ( 

140 self.cp * self.mDot * (self.T - self.T_in) / 1000 / self.eta_heater 

141 ) 

142 self.eta_heater.alg = 1 

143 self.T_out.alg = self.T 

144 self.E_out.alg = -self.T * self.C / (3600 * 1000) 

145 self.constraints = [ 

146 (self.T_lower, self.T + self.T_slack, inf), 

147 (-inf, self.T - self.T_slack, self.T_upper), 

148 (0, self.T_slack, inf), 

149 ] 

150 objective = sum([self.r_mDot * self.mDot, self.s_T * self.T_slack**2]) 

151 obj_std = objective 

152 return ca.if_else( 

153 self.in_provision.sym, 

154 ca.if_else( 

155 self.time < self.rel_start.sym, 

156 obj_std, 

157 ca.if_else( 

158 self.time >= self.rel_end.sym, 

159 obj_std, 

160 sum( 

161 [ 

162 self.profile_deviation_weight 

163 * (self.P_el - self._P_external) ** 2, 

164 self.T_slack**2 * self.profile_comfort_weight, 

165 ] 

166 ), 

167 ), 

168 ), 

169 obj_std, 

170 ) 

171 

172 

173class PosFlexModelConfig(CasadiModelConfig): 

174 inputs: list[CasadiInput] = [ 

175 CasadiInput( 

176 name="mDot", 

177 value=0.0225, 

178 unit="kg/s", 

179 description="Air mass flow into zone", 

180 ), 

181 CasadiInput( 

182 name="load", value=150, unit="W", description="Heat load into zone" 

183 ), 

184 CasadiInput( 

185 name="T_in", value=280.15, unit="K", description="Inflow air temperature" 

186 ), 

187 CasadiInput( 

188 name="T_upper", 

189 value=294.15, 

190 unit="K", 

191 description="Upper boundary (soft) for T.", 

192 ), 

193 CasadiInput( 

194 name="T_lower", 

195 value=292.15, 

196 unit="K", 

197 description="Upper boundary (soft) for T.", 

198 ), 

199 CasadiInput( 

200 name="mDot_full", 

201 value=None, 

202 unit="Not defined", 

203 type="pd.Series", 

204 description="full control trajectory output of baseline mpc", 

205 ), 

206 CasadiInput( 

207 name="in_provision", 

208 value=False, 

209 unit="Not defined", 

210 type="bool", 

211 description="Flag indicating whether flexibility should be provisioned", 

212 ), 

213 ] 

214 states: list[CasadiState] = [ 

215 CasadiState( 

216 name="T", value=293.15, unit="K", description="Temperature of zone" 

217 ), 

218 CasadiState( 

219 name="T_slack", 

220 value=0, 

221 unit="K", 

222 description="Slack variable of temperature of zone", 

223 ), 

224 ] 

225 parameters: list[CasadiParameter] = [ 

226 CasadiParameter( 

227 name="cp", 

228 value=1000, 

229 unit="J/kg*K", 

230 description="thermal capacity of the air", 

231 ), 

232 CasadiParameter( 

233 name="C", value=100000, unit="J/K", description="thermal capacity of zone" 

234 ), 

235 CasadiParameter( 

236 name="s_T", 

237 value=1, 

238 unit="-", 

239 description="Weight for T in constraint function", 

240 ), 

241 CasadiParameter( 

242 name="r_mDot", 

243 value=1, 

244 unit="-", 

245 description="Weight for mDot in objective function", 

246 ), 

247 CasadiParameter( 

248 name="prep_time", 

249 value=900, 

250 unit="s", 

251 description="Preparation time before switching objective", 

252 ), 

253 CasadiParameter( 

254 name="flex_event_duration", 

255 value=7200, 

256 unit="s", 

257 description="Duration of the flexibility event", 

258 ), 

259 CasadiParameter( 

260 name="market_time", 

261 value=900, 

262 unit="s", 

263 description="Market time associated with the objective switch", 

264 ), 

265 CasadiParameter( 

266 name="s_P", value=10, unit="Not defined", description="Not defined" 

267 ), 

268 ] 

269 outputs: list[CasadiOutput] = [ 

270 CasadiOutput(name="T_out", unit="K", description="Temperature of zone"), 

271 CasadiOutput( 

272 name="E_out", unit="kWh", description="Stored energy in the zone w.r.t. 0K" 

273 ), 

274 CasadiOutput( 

275 name="eta_heater", 

276 unit="-", 

277 value=1, 

278 description="Efficiency of electrical heater", 

279 ), 

280 CasadiOutput( 

281 name="P_el", unit="W", description="The power input to the system" 

282 ), 

283 ] 

284 

285 

286class PosFlexModel(CasadiModel): 

287 config: PosFlexModelConfig 

288 

289 def setup_system(self): 

290 mDot_lower = ca.if_else( 

291 self.time < self.market_time.sym, self.mDot_full.sym, self.mDot.lb 

292 ) 

293 mDot_upper = ca.if_else( 

294 self.time < self.market_time.sym, self.mDot_full.sym, self.mDot.ub 

295 ) 

296 self.T.ode = ( 

297 self.cp * self.mDot / self.C * (self.T_in - self.T) + self.load / self.C 

298 ) 

299 self.P_el.alg = ( 

300 self.cp * self.mDot * (self.T - self.T_in) / 1000 / self.eta_heater 

301 ) 

302 self.eta_heater.alg = 1 

303 self.T_out.alg = self.T 

304 self.E_out.alg = -self.T * self.C / (3600 * 1000) 

305 self.constraints = [ 

306 (self.T_lower, self.T + self.T_slack, inf), 

307 (-inf, self.T - self.T_slack, self.T_upper), 

308 (0, self.T_slack, inf), 

309 (mDot_lower, self.mDot, mDot_upper), 

310 ] 

311 objective = sum([self.r_mDot * self.mDot, self.s_T * self.T_slack**2]) 

312 obj_std = objective + self.P_el**2 * 0 

313 obj_flex = sum([self.s_T * self.T_slack**2, self.s_P * self.P_el]) 

314 return ca.if_else( 

315 self.time < self.prep_time.sym + self.market_time.sym, 

316 obj_std, 

317 ca.if_else( 

318 self.time 

319 < self.prep_time.sym 

320 + self.flex_event_duration.sym 

321 + self.market_time.sym, 

322 obj_flex, 

323 obj_std, 

324 ), 

325 ) 

326 

327 

328class NegFlexModelConfig(CasadiModelConfig): 

329 inputs: list[CasadiInput] = [ 

330 CasadiInput( 

331 name="mDot", 

332 value=0.0225, 

333 unit="kg/s", 

334 description="Air mass flow into zone", 

335 ), 

336 CasadiInput( 

337 name="load", value=150, unit="W", description="Heat load into zone" 

338 ), 

339 CasadiInput( 

340 name="T_in", value=280.15, unit="K", description="Inflow air temperature" 

341 ), 

342 CasadiInput( 

343 name="T_upper", 

344 value=294.15, 

345 unit="K", 

346 description="Upper boundary (soft) for T.", 

347 ), 

348 CasadiInput( 

349 name="T_lower", 

350 value=292.15, 

351 unit="K", 

352 description="Upper boundary (soft) for T.", 

353 ), 

354 CasadiInput( 

355 name="mDot_full", 

356 value=None, 

357 unit="Not defined", 

358 type="pd.Series", 

359 description="full control trajectory output of baseline mpc", 

360 ), 

361 CasadiInput( 

362 name="in_provision", 

363 value=False, 

364 unit="Not defined", 

365 type="bool", 

366 description="Flag indicating whether flexibility should be provisioned", 

367 ), 

368 ] 

369 states: list[CasadiState] = [ 

370 CasadiState( 

371 name="T", value=293.15, unit="K", description="Temperature of zone" 

372 ), 

373 CasadiState( 

374 name="T_slack", 

375 value=0, 

376 unit="K", 

377 description="Slack variable of temperature of zone", 

378 ), 

379 ] 

380 parameters: list[CasadiParameter] = [ 

381 CasadiParameter( 

382 name="cp", 

383 value=1000, 

384 unit="J/kg*K", 

385 description="thermal capacity of the air", 

386 ), 

387 CasadiParameter( 

388 name="C", value=100000, unit="J/K", description="thermal capacity of zone" 

389 ), 

390 CasadiParameter( 

391 name="s_T", 

392 value=1, 

393 unit="-", 

394 description="Weight for T in constraint function", 

395 ), 

396 CasadiParameter( 

397 name="r_mDot", 

398 value=1, 

399 unit="-", 

400 description="Weight for mDot in objective function", 

401 ), 

402 CasadiParameter( 

403 name="prep_time", 

404 value=900, 

405 unit="s", 

406 description="Preparation time before switching objective", 

407 ), 

408 CasadiParameter( 

409 name="flex_event_duration", 

410 value=7200, 

411 unit="s", 

412 description="Duration of the flexibility event", 

413 ), 

414 CasadiParameter( 

415 name="market_time", 

416 value=900, 

417 unit="s", 

418 description="Market time associated with the objective switch", 

419 ), 

420 CasadiParameter( 

421 name="s_P", value=10, unit="Not defined", description="Not defined" 

422 ), 

423 ] 

424 outputs: list[CasadiOutput] = [ 

425 CasadiOutput(name="T_out", unit="K", description="Temperature of zone"), 

426 CasadiOutput( 

427 name="E_out", unit="kWh", description="Stored energy in the zone w.r.t. 0K" 

428 ), 

429 CasadiOutput( 

430 name="eta_heater", 

431 unit="-", 

432 value=1, 

433 description="Efficiency of electrical heater", 

434 ), 

435 CasadiOutput( 

436 name="P_el", unit="W", description="The power input to the system" 

437 ), 

438 ] 

439 

440 

441class NegFlexModel(CasadiModel): 

442 config: NegFlexModelConfig 

443 

444 def setup_system(self): 

445 mDot_lower = ca.if_else( 

446 self.time < self.market_time.sym, self.mDot_full.sym, self.mDot.lb 

447 ) 

448 mDot_upper = ca.if_else( 

449 self.time < self.market_time.sym, self.mDot_full.sym, self.mDot.ub 

450 ) 

451 self.T.ode = ( 

452 self.cp * self.mDot / self.C * (self.T_in - self.T) + self.load / self.C 

453 ) 

454 self.P_el.alg = ( 

455 self.cp * self.mDot * (self.T - self.T_in) / 1000 / self.eta_heater 

456 ) 

457 self.eta_heater.alg = 1 

458 self.T_out.alg = self.T 

459 self.E_out.alg = -self.T * self.C / (3600 * 1000) 

460 self.constraints = [ 

461 (self.T_lower, self.T + self.T_slack, inf), 

462 (-inf, self.T - self.T_slack, self.T_upper), 

463 (0, self.T_slack, inf), 

464 (mDot_lower, self.mDot, mDot_upper), 

465 ] 

466 objective = sum([self.r_mDot * self.mDot, self.s_T * self.T_slack**2]) 

467 obj_std = objective 

468 obj_flex = sum([self.s_T * self.T_slack**2, -self.s_P * self.P_el]) 

469 return ca.if_else( 

470 self.time < self.prep_time.sym + self.market_time.sym, 

471 obj_std, 

472 ca.if_else( 

473 self.time 

474 < self.prep_time.sym 

475 + self.flex_event_duration.sym 

476 + self.market_time.sym, 

477 obj_flex, 

478 obj_std, 

479 ), 

480 )