Coverage for examples/OneRoom_SimpleMPC/flex_output_data/created_flex_files/flex_agents.py: 100%

58 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-10-20 14:09 +0000

1import casadi as ca 

2import pandas as pd 

3from agentlib_mpc.models.casadi_model import ( 

4 CasadiModel, 

5 CasadiInput, 

6 CasadiState, 

7 CasadiParameter, 

8 CasadiOutput, 

9 CasadiModelConfig, 

10) 

11from math import inf 

12 

13 

14class BaselineMPCModelConfig(CasadiModelConfig): 

15 inputs: list[CasadiInput] = [ 

16 CasadiInput( 

17 name="mDot", 

18 value=0.0225, 

19 unit="kg/s", 

20 description="Air mass flow into zone", 

21 ), 

22 CasadiInput( 

23 name="load", value=150, unit="W", description="Heat load into zone" 

24 ), 

25 CasadiInput( 

26 name="T_in", value=280.15, unit="K", description="Inflow air temperature" 

27 ), 

28 CasadiInput( 

29 name="T_upper", 

30 value=294.15, 

31 unit="K", 

32 description="Upper boundary (soft) for T.", 

33 ), 

34 CasadiInput( 

35 name="T_lower", 

36 value=292.15, 

37 unit="K", 

38 description="Upper boundary (soft) for T.", 

39 ), 

40 CasadiInput( 

41 name="_P_external", 

42 value=0, 

43 unit="W", 

44 type="pd.Series", 

45 description="External power profile to be provided", 

46 ), 

47 CasadiInput( 

48 name="in_provision", 

49 value=False, 

50 unit="-", 

51 type="bool", 

52 description="Flag signaling if the flexibility is in provision", 

53 ), 

54 CasadiInput( 

55 name="rel_start", 

56 value=0, 

57 unit="s", 

58 type="int", 

59 description="relative start time of the flexibility event", 

60 ), 

61 CasadiInput( 

62 name="rel_end", 

63 value=0, 

64 unit="s", 

65 type="int", 

66 description="relative end time of the flexibility event", 

67 ), 

68 ] 

69 states: list[CasadiState] = [ 

70 CasadiState( 

71 name="T", value=293.15, unit="K", description="Temperature of zone" 

72 ), 

73 CasadiState( 

74 name="T_slack", 

75 value=0, 

76 unit="K", 

77 description="Slack variable of temperature of zone", 

78 ), 

79 ] 

80 parameters: list[CasadiParameter] = [ 

81 CasadiParameter( 

82 name="cp", 

83 value=1000, 

84 unit="J/kg*K", 

85 description="thermal capacity of the air", 

86 ), 

87 CasadiParameter( 

88 name="C", value=100000, unit="J/K", description="thermal capacity of zone" 

89 ), 

90 CasadiParameter( 

91 name="s_T", 

92 value=1, 

93 unit="-", 

94 description="Weight for T in constraint function", 

95 ), 

96 CasadiParameter( 

97 name="r_mDot", 

98 value=1, 

99 unit="-", 

100 description="Weight for mDot in objective function", 

101 ), 

102 CasadiParameter( 

103 name="profile_deviation_weight", 

104 value=0, 

105 unit="-", 

106 description="Weight of soft constraint for deviation from accepted flexible profile", 

107 ), 

108 CasadiParameter( 

109 name="profile_comfort_weight", 

110 value=0, 

111 unit="-", 

112 description="Weight of soft constraint for discomfort", 

113 ), 

114 ] 

115 outputs: list[CasadiOutput] = [ 

116 CasadiOutput(name="T_out", unit="K", description="Temperature of zone"), 

117 CasadiOutput( 

118 name="E_out", unit="kWh", description="Stored energy in the zone w.r.t. 0K" 

119 ), 

120 CasadiOutput( 

121 name="P_el", unit="W", description="The power input to the system" 

122 ), 

123 ] 

124 

125 

126class BaselineMPCModel(CasadiModel): 

127 config: BaselineMPCModelConfig 

128 

129 def setup_system(self): 

130 self.T.ode = ( 

131 self.cp * self.mDot / self.C * (self.T_in - self.T) + self.load / self.C 

132 ) 

133 self.P_el.alg = self.cp * self.mDot * (self.T - self.T_in) / 1000 

134 self.T_out.alg = self.T 

135 self.E_out.alg = -self.T * self.C / (3600 * 1000) 

136 self.constraints = [ 

137 (self.T_lower, self.T + self.T_slack, inf), 

138 (-inf, self.T - self.T_slack, self.T_upper), 

139 (0, self.T_slack, inf), 

140 ] 

141 objective = sum([self.r_mDot * self.mDot, self.s_T * self.T_slack**2]) 

142 obj_std = objective 

143 return ca.if_else( 

144 self.in_provision.sym, 

145 ca.if_else( 

146 self.time < self.rel_start.sym, 

147 obj_std, 

148 ca.if_else( 

149 self.time >= self.rel_end.sym, 

150 obj_std, 

151 sum( 

152 [ 

153 self.profile_deviation_weight 

154 * (self.P_el - self._P_external) ** 2, 

155 self.T_slack**2 * self.profile_comfort_weight, 

156 ] 

157 ), 

158 ), 

159 ), 

160 obj_std, 

161 ) 

162 

163 

164class PosFlexModelConfig(CasadiModelConfig): 

165 inputs: list[CasadiInput] = [ 

166 CasadiInput( 

167 name="mDot", 

168 value=0.0225, 

169 unit="kg/s", 

170 description="Air mass flow into zone", 

171 ), 

172 CasadiInput( 

173 name="load", value=150, unit="W", description="Heat load into zone" 

174 ), 

175 CasadiInput( 

176 name="T_in", value=280.15, unit="K", description="Inflow air temperature" 

177 ), 

178 CasadiInput( 

179 name="T_upper", 

180 value=294.15, 

181 unit="K", 

182 description="Upper boundary (soft) for T.", 

183 ), 

184 CasadiInput( 

185 name="T_lower", 

186 value=292.15, 

187 unit="K", 

188 description="Upper boundary (soft) for T.", 

189 ), 

190 CasadiInput( 

191 name="mDot_full", 

192 value=None, 

193 unit="Not defined", 

194 type="pd.Series", 

195 description="full control trajectory output of baseline mpc", 

196 ), 

197 CasadiInput( 

198 name="in_provision", 

199 value=False, 

200 unit="-", 

201 type="bool", 

202 description="provision flag", 

203 ), 

204 ] 

205 states: list[CasadiState] = [ 

206 CasadiState( 

207 name="T", value=293.15, unit="K", description="Temperature of zone" 

208 ), 

209 CasadiState( 

210 name="T_slack", 

211 value=0, 

212 unit="K", 

213 description="Slack variable of temperature of zone", 

214 ), 

215 ] 

216 parameters: list[CasadiParameter] = [ 

217 CasadiParameter( 

218 name="cp", 

219 value=1000, 

220 unit="J/kg*K", 

221 description="thermal capacity of the air", 

222 ), 

223 CasadiParameter( 

224 name="C", value=100000, unit="J/K", description="thermal capacity of zone" 

225 ), 

226 CasadiParameter( 

227 name="s_T", 

228 value=1, 

229 unit="-", 

230 description="Weight for T in constraint function", 

231 ), 

232 CasadiParameter( 

233 name="r_mDot", 

234 value=1, 

235 unit="-", 

236 description="Weight for mDot in objective function", 

237 ), 

238 CasadiParameter( 

239 name="prep_time", value=0, unit="s", description="time to switch objective" 

240 ), 

241 CasadiParameter( 

242 name="flex_event_duration", 

243 value=0, 

244 unit="s", 

245 description="time to switch objective", 

246 ), 

247 CasadiParameter( 

248 name="market_time", 

249 value=0, 

250 unit="s", 

251 description="time to switch objective", 

252 ), 

253 CasadiParameter( 

254 name="s_P", 

255 value=10, 

256 unit="-", 

257 description="Weight for P in objective function", 

258 ), 

259 ] 

260 outputs: list[CasadiOutput] = [ 

261 CasadiOutput(name="T_out", unit="K", description="Temperature of zone"), 

262 CasadiOutput( 

263 name="E_out", unit="kWh", description="Stored energy in the zone w.r.t. 0K" 

264 ), 

265 CasadiOutput( 

266 name="P_el", unit="W", description="The power input to the system" 

267 ), 

268 ] 

269 

270 

271class PosFlexModel(CasadiModel): 

272 config: PosFlexModelConfig 

273 

274 def setup_system(self): 

275 mDot_lower = ca.if_else( 

276 self.time < self.market_time.sym, self.mDot_full.sym, self.mDot.lb 

277 ) 

278 mDot_upper = ca.if_else( 

279 self.time < self.market_time.sym, self.mDot_full.sym, self.mDot.ub 

280 ) 

281 self.T.ode = ( 

282 self.cp * self.mDot / self.C * (self.T_in - self.T) + self.load / self.C 

283 ) 

284 self.P_el.alg = self.cp * self.mDot * (self.T - self.T_in) / 1000 

285 self.T_out.alg = self.T 

286 self.E_out.alg = -self.T * self.C / (3600 * 1000) 

287 self.constraints = [ 

288 (self.T_lower, self.T + self.T_slack, inf), 

289 (-inf, self.T - self.T_slack, self.T_upper), 

290 (0, self.T_slack, inf), 

291 (mDot_lower, self.mDot, mDot_upper), 

292 ] 

293 objective = sum([self.r_mDot * self.mDot, self.s_T * self.T_slack**2]) 

294 obj_std = objective 

295 obj_flex = sum([self.s_T * self.T_slack**2, self.s_P * self.P_el]) 

296 return ca.if_else( 

297 self.time < self.prep_time.sym + self.market_time.sym, 

298 obj_std, 

299 ca.if_else( 

300 self.time 

301 < self.prep_time.sym 

302 + self.flex_event_duration.sym 

303 + self.market_time.sym, 

304 obj_flex, 

305 obj_std, 

306 ), 

307 ) 

308 

309 

310class NegFlexModelConfig(CasadiModelConfig): 

311 inputs: list[CasadiInput] = [ 

312 CasadiInput( 

313 name="mDot", 

314 value=0.0225, 

315 unit="kg/s", 

316 description="Air mass flow into zone", 

317 ), 

318 CasadiInput( 

319 name="load", value=150, unit="W", description="Heat load into zone" 

320 ), 

321 CasadiInput( 

322 name="T_in", value=280.15, unit="K", description="Inflow air temperature" 

323 ), 

324 CasadiInput( 

325 name="T_upper", 

326 value=294.15, 

327 unit="K", 

328 description="Upper boundary (soft) for T.", 

329 ), 

330 CasadiInput( 

331 name="T_lower", 

332 value=292.15, 

333 unit="K", 

334 description="Upper boundary (soft) for T.", 

335 ), 

336 CasadiInput( 

337 name="mDot_full", 

338 value=None, 

339 unit="Not defined", 

340 type="pd.Series", 

341 description="full control trajectory output of baseline mpc", 

342 ), 

343 CasadiInput( 

344 name="in_provision", 

345 value=False, 

346 unit="-", 

347 type="bool", 

348 description="provision flag", 

349 ), 

350 ] 

351 states: list[CasadiState] = [ 

352 CasadiState( 

353 name="T", value=293.15, unit="K", description="Temperature of zone" 

354 ), 

355 CasadiState( 

356 name="T_slack", 

357 value=0, 

358 unit="K", 

359 description="Slack variable of temperature of zone", 

360 ), 

361 ] 

362 parameters: list[CasadiParameter] = [ 

363 CasadiParameter( 

364 name="cp", 

365 value=1000, 

366 unit="J/kg*K", 

367 description="thermal capacity of the air", 

368 ), 

369 CasadiParameter( 

370 name="C", value=100000, unit="J/K", description="thermal capacity of zone" 

371 ), 

372 CasadiParameter( 

373 name="s_T", 

374 value=1, 

375 unit="-", 

376 description="Weight for T in constraint function", 

377 ), 

378 CasadiParameter( 

379 name="r_mDot", 

380 value=1, 

381 unit="-", 

382 description="Weight for mDot in objective function", 

383 ), 

384 CasadiParameter( 

385 name="prep_time", value=0, unit="s", description="time to switch objective" 

386 ), 

387 CasadiParameter( 

388 name="flex_event_duration", 

389 value=0, 

390 unit="s", 

391 description="time to switch objective", 

392 ), 

393 CasadiParameter( 

394 name="market_time", 

395 value=0, 

396 unit="s", 

397 description="time to switch objective", 

398 ), 

399 CasadiParameter( 

400 name="s_P", 

401 value=10, 

402 unit="-", 

403 description="Weight for P in objective function", 

404 ), 

405 ] 

406 outputs: list[CasadiOutput] = [ 

407 CasadiOutput(name="T_out", unit="K", description="Temperature of zone"), 

408 CasadiOutput( 

409 name="E_out", unit="kWh", description="Stored energy in the zone w.r.t. 0K" 

410 ), 

411 CasadiOutput( 

412 name="P_el", unit="W", description="The power input to the system" 

413 ), 

414 ] 

415 

416 

417class NegFlexModel(CasadiModel): 

418 config: NegFlexModelConfig 

419 

420 def setup_system(self): 

421 mDot_lower = ca.if_else( 

422 self.time < self.market_time.sym, self.mDot_full.sym, self.mDot.lb 

423 ) 

424 mDot_upper = ca.if_else( 

425 self.time < self.market_time.sym, self.mDot_full.sym, self.mDot.ub 

426 ) 

427 self.T.ode = ( 

428 self.cp * self.mDot / self.C * (self.T_in - self.T) + self.load / self.C 

429 ) 

430 self.P_el.alg = self.cp * self.mDot * (self.T - self.T_in) / 1000 

431 self.T_out.alg = self.T 

432 self.E_out.alg = -self.T * self.C / (3600 * 1000) 

433 self.constraints = [ 

434 (self.T_lower, self.T + self.T_slack, inf), 

435 (-inf, self.T - self.T_slack, self.T_upper), 

436 (0, self.T_slack, inf), 

437 (mDot_lower, self.mDot, mDot_upper), 

438 ] 

439 objective = sum([self.r_mDot * self.mDot, self.s_T * self.T_slack**2]) 

440 obj_std = objective 

441 obj_flex = sum([self.s_T * self.T_slack**2, -self.s_P * self.P_el]) 

442 return ca.if_else( 

443 self.time < self.prep_time.sym + self.market_time.sym, 

444 obj_std, 

445 ca.if_else( 

446 self.time 

447 < self.prep_time.sym 

448 + self.flex_event_duration.sym 

449 + self.market_time.sym, 

450 obj_flex, 

451 obj_std, 

452 ), 

453 )