Coverage for examples/OneRoom_CIA/flex_output_data/created_flex_files/flex_agents.py: 100%

68 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-10-20 14:09 +0000

1import casadi as ca 

2import pandas as pd 

3import logging 

4import os 

5from pathlib import Path 

6import casadi as ca 

7from agentlib_mpc.models.casadi_model import ( 

8 CasadiModel, 

9 CasadiInput, 

10 CasadiState, 

11 CasadiParameter, 

12 CasadiOutput, 

13 CasadiModelConfig, 

14) 

15from agentlib.utils.multi_agent_system import LocalMASAgency 

16 

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# NOTE(review): `ub` is not referenced by any model visible in this file;
# presumably an upper temperature bound in K -- confirm before relying on it.
ub = 295.15
# Multiplied with `cooler_on` in the model constraints to cap `cooling_power`
# (same unit as `cooling_power`, i.e. W).
COOLING = 1000

20 

21 

class BaselineMPCModelConfig(CasadiModelConfig):
    """Variable declarations for the baseline MPC model of the one-room example.

    Declares the inputs, states, parameters and outputs that
    ``BaselineMPCModel.setup_system`` combines into dynamics, constraints
    and the objective. Names, values, units and bounds are unchanged; only
    descriptions that contradicted the variable's unit or usage were
    corrected.
    """

    inputs: list[CasadiInput] = [
        CasadiInput(
            name="cooling_power",
            value=400,
            unit="W",
            # The unit is W and the ODE subtracts this from the heat load,
            # so it is a power, not a mass flow.
            description="Cooling power removed from zone",
        ),
        CasadiInput(
            name="cooler_on",
            value=1,
            unit="-",
            description="On / off signal of the cooler.",
            lb=0,
            ub=1,
        ),
        CasadiInput(
            name="load", value=150, unit="W", description="Heat load into zone"
        ),
        # Not referenced by BaselineMPCModel.setup_system.
        CasadiInput(
            name="T_in", value=290.15, unit="K", description="Inflow air temperature"
        ),
        CasadiInput(
            name="T_upper",
            value=294.15,
            unit="K",
            description="Upper boundary (soft) for T.",
        ),
        # Reference power tracked while a flexibility event is in provision.
        CasadiInput(
            name="_P_external",
            value=0,
            unit="W",
            type="pd.Series",
            description="External power profile to be provided",
        ),
        CasadiInput(
            name="in_provision",
            value=False,
            unit="-",
            type="bool",
            description="Flag signaling if the flexibility is in provision",
        ),
        CasadiInput(
            name="rel_start",
            value=0,
            unit="s",
            type="int",
            description="relative start time of the flexibility event",
        ),
        CasadiInput(
            name="rel_end",
            value=0,
            unit="s",
            type="int",
            description="relative end time of the flexibility event",
        ),
    ]
    states: list[CasadiState] = [
        CasadiState(
            name="T", value=293.15, unit="K", description="Temperature of zone"
        ),
        CasadiState(
            name="T_slack",
            value=0,
            unit="K",
            description="Slack variable of temperature of zone",
        ),
    ]
    parameters: list[CasadiParameter] = [
        # Not referenced by BaselineMPCModel.setup_system.
        CasadiParameter(
            name="cp",
            value=1000,
            unit="J/kg*K",
            description="thermal capacity of the air",
        ),
        CasadiParameter(
            name="C", value=100000, unit="J/K", description="thermal capacity of zone"
        ),
        CasadiParameter(
            name="s_T",
            value=1,
            unit="-",
            # Multiplies T_slack**2 in the objective, not in a constraint.
            description="Weight for squared temperature slack in objective function",
        ),
        CasadiParameter(
            name="r_cooling",
            value=1 / 5,
            unit="-",
            # Multiplies cooling_power in the objective.
            description="Weight for cooling power in objective function",
        ),
        CasadiParameter(
            name="cooler_mod_limit",
            value=200,
            unit="W",
            description="Cooling power cannot modulate below this value",
        ),
        CasadiParameter(
            name="profile_deviation_weight",
            value=0,
            unit="-",
            description="Weight of soft constraint for deviation from accepted flexible profile",
        ),
    ]
    outputs: list[CasadiOutput] = [
        CasadiOutput(name="T_out", unit="K", description="Temperature of zone"),
        CasadiOutput(name="P_el", unit="W", description="Electrical power"),
    ]

129 

130 

class BaselineMPCModel(CasadiModel):
    """Baseline MPC model of the cooled zone with an optional tracking objective."""

    config: BaselineMPCModelConfig

    def setup_system(self):
        """Define dynamics, outputs, constraints and return the objective.

        Returns:
            A CasADi expression. While ``in_provision`` is set and
            ``rel_start <= time < rel_end`` it is the weighted squared
            deviation of ``P_el`` from ``_P_external``; in every other case
            it is the standard objective (weighted cooling power plus
            weighted squared temperature slack).
        """
        # Zone temperature changes with the net heat flow over the capacity.
        self.T.ode = (self.load - self.cooling_power) / self.C

        # Outputs mirror the state and the cooling power one-to-one.
        self.T_out.alg = self.T
        self.P_el.alg = self.cooling_power

        # Cooling power relative to its on/off dependent limits.
        excess_over_max = self.cooling_power - self.cooler_on * COOLING
        margin_over_min = self.cooling_power - self.cooler_on * self.cooler_mod_limit
        softened_temperature = self.T + self.T_slack
        self.constraints = [
            (-ca.inf, excess_over_max, 0),
            (0, margin_over_min, ca.inf),
            (0, softened_temperature, self.T_upper),
        ]

        # Standard objective: cooling effort plus comfort violation.
        obj_std = self.r_cooling * self.cooling_power + self.s_T * self.T_slack**2

        # Objective used inside the flexibility event window.
        obj_tracking = (
            self.profile_deviation_weight * (self.P_el - self._P_external) ** 2
        )

        before_event = self.time < self.rel_start.sym
        after_event = self.time >= self.rel_end.sym

        # Same nesting as a chain of guards: before -> std, after -> std,
        # otherwise tracking.
        obj_after_start = ca.if_else(after_event, obj_std, obj_tracking)
        obj_in_provision = ca.if_else(before_event, obj_std, obj_after_start)
        return ca.if_else(self.in_provision.sym, obj_in_provision, obj_std)

165 

166 

# These three assignments repeat the values already assigned earlier in this
# module.
logger = logging.getLogger(__name__)
# NOTE(review): `ub` is not referenced by any model visible in this file.
ub = 295.15
# Multiplied with `cooler_on` in the model constraints to cap `cooling_power`.
COOLING = 1000

170 

171 

class PosFlexModelConfig(CasadiModelConfig):
    """Variable declarations for the positive-flexibility MPC model.

    Declares the inputs, states, parameters and outputs used by
    ``PosFlexModel.setup_system``. Names, values, units and bounds are
    unchanged; descriptions that contradicted the variable's unit or usage,
    or that were identical for different timing parameters, were corrected.
    """

    inputs: list[CasadiInput] = [
        CasadiInput(
            name="cooling_power",
            value=400,
            unit="W",
            # The unit is W and the ODE subtracts this from the heat load,
            # so it is a power, not a mass flow.
            description="Cooling power removed from zone",
        ),
        CasadiInput(
            name="cooler_on",
            value=1,
            unit="-",
            description="On / off signal of the cooler.",
            lb=0,
            ub=1,
        ),
        CasadiInput(
            name="load", value=150, unit="W", description="Heat load into zone"
        ),
        # Not referenced by PosFlexModel.setup_system.
        CasadiInput(
            name="T_in", value=290.15, unit="K", description="Inflow air temperature"
        ),
        CasadiInput(
            name="T_upper",
            value=294.15,
            unit="K",
            description="Upper boundary (soft) for T.",
        ),
        # Used as both bounds of cooling_power while time < market_time.
        CasadiInput(
            name="cooling_power_full",
            value=None,
            unit="Not defined",
            type="pd.Series",
            description="full control trajectory output of baseline mpc",
        ),
        # Not referenced by PosFlexModel.setup_system.
        CasadiInput(
            name="cooler_on_full",
            value=None,
            unit="Not defined",
            type="pd.Series",
            description="full control trajectory output of baseline mpc",
        ),
        # Not referenced by PosFlexModel.setup_system.
        CasadiInput(
            name="in_provision",
            value=False,
            unit="-",
            type="bool",
            description="provision flag",
        ),
    ]
    states: list[CasadiState] = [
        CasadiState(
            name="T", value=293.15, unit="K", description="Temperature of zone"
        ),
        CasadiState(
            name="T_slack",
            value=0,
            unit="K",
            description="Slack variable of temperature of zone",
        ),
    ]
    parameters: list[CasadiParameter] = [
        # Not referenced by PosFlexModel.setup_system.
        CasadiParameter(
            name="cp",
            value=1000,
            unit="J/kg*K",
            description="thermal capacity of the air",
        ),
        CasadiParameter(
            name="C", value=100000, unit="J/K", description="thermal capacity of zone"
        ),
        CasadiParameter(
            name="s_T",
            value=1,
            unit="-",
            # Multiplies T_slack**2 in the objective, not in a constraint.
            description="Weight for squared temperature slack in objective function",
        ),
        CasadiParameter(
            name="r_cooling",
            value=1 / 5,
            unit="-",
            # Multiplies cooling_power in the standard objective.
            description="Weight for cooling power in objective function",
        ),
        CasadiParameter(
            name="cooler_mod_limit",
            value=200,
            unit="W",
            description="Cooling power cannot modulate below this value",
        ),
        # The flexibility objective is active for
        # market_time + prep_time <= time
        #     < market_time + prep_time + flex_event_duration.
        CasadiParameter(
            name="prep_time",
            value=0,
            unit="s",
            description="Time between market_time and start of the flexibility objective",
        ),
        CasadiParameter(
            name="flex_event_duration",
            value=0,
            unit="s",
            description="Duration for which the flexibility objective is active",
        ),
        CasadiParameter(
            name="market_time",
            value=0,
            unit="s",
            description="Time until which cooling_power is fixed to the baseline trajectory",
        ),
        CasadiParameter(
            name="s_P",
            value=10,
            unit="-",
            description="Weight for P in objective function",
        ),
    ]
    outputs: list[CasadiOutput] = [
        CasadiOutput(name="T_out", unit="K", description="Temperature of zone"),
        CasadiOutput(name="P_el", unit="W", description="Electrical power"),
    ]

287 

288 

class PosFlexModel(CasadiModel):
    """MPC model that penalises electrical power during the flexibility window."""

    config: PosFlexModelConfig

    def setup_system(self):
        """Define dynamics, outputs, constraints and return the objective.

        Returns:
            A CasADi expression that equals the standard objective outside
            the flexibility window and the flexibility objective
            (weighted squared temperature slack plus ``s_P * P_el``) inside
            it. The window is
            ``prep_time + market_time <= time
            < prep_time + flex_event_duration + market_time``.
        """
        # Before market_time the cooling power is pinned to the baseline
        # trajectory by using it as both bounds; afterwards the variable's
        # own bounds apply.
        follow_baseline = self.time < self.market_time.sym
        cooling_power_lower = ca.if_else(
            follow_baseline, self.cooling_power_full.sym, self.cooling_power.lb
        )
        cooling_power_upper = ca.if_else(
            follow_baseline, self.cooling_power_full.sym, self.cooling_power.ub
        )

        # Zone temperature changes with the net heat flow over the capacity.
        self.T.ode = (self.load - self.cooling_power) / self.C

        # Outputs mirror the state and the cooling power one-to-one.
        self.T_out.alg = self.T
        self.P_el.alg = self.cooling_power

        # Cooling power relative to its on/off dependent limits.
        excess_over_max = self.cooling_power - self.cooler_on * COOLING
        margin_over_min = self.cooling_power - self.cooler_on * self.cooler_mod_limit
        self.constraints = [
            (-ca.inf, excess_over_max, 0),
            (0, margin_over_min, ca.inf),
            (0, self.T + self.T_slack, self.T_upper),
            (cooling_power_lower, self.cooling_power, cooling_power_upper),
        ]

        comfort_penalty = self.s_T * self.T_slack**2
        obj_std = self.r_cooling * self.cooling_power + comfort_penalty
        obj_flex = comfort_penalty + self.s_P * self.P_el

        # Boundaries of the window in which the flexibility objective applies.
        window_start = self.prep_time.sym + self.market_time.sym
        window_end = (
            self.prep_time.sym + self.flex_event_duration.sym + self.market_time.sym
        )

        obj_from_window_start = ca.if_else(self.time < window_end, obj_flex, obj_std)
        return ca.if_else(self.time < window_start, obj_std, obj_from_window_start)

329 

330 

# These three assignments repeat the values already assigned earlier in this
# module.
logger = logging.getLogger(__name__)
# NOTE(review): `ub` is not referenced by any model visible in this file.
ub = 295.15
# Multiplied with `cooler_on` in the model constraints to cap `cooling_power`.
COOLING = 1000

334 

335 

class NegFlexModelConfig(CasadiModelConfig):
    """Variable declarations for the negative-flexibility MPC model.

    Declares the inputs, states, parameters and outputs used by
    ``NegFlexModel.setup_system``. Names, values, units and bounds are
    unchanged; descriptions that contradicted the variable's unit or usage,
    or that were identical for different timing parameters, were corrected.
    """

    inputs: list[CasadiInput] = [
        CasadiInput(
            name="cooling_power",
            value=400,
            unit="W",
            # The unit is W and the ODE subtracts this from the heat load,
            # so it is a power, not a mass flow.
            description="Cooling power removed from zone",
        ),
        CasadiInput(
            name="cooler_on",
            value=1,
            unit="-",
            description="On / off signal of the cooler.",
            lb=0,
            ub=1,
        ),
        CasadiInput(
            name="load", value=150, unit="W", description="Heat load into zone"
        ),
        # Not referenced by NegFlexModel.setup_system.
        CasadiInput(
            name="T_in", value=290.15, unit="K", description="Inflow air temperature"
        ),
        CasadiInput(
            name="T_upper",
            value=294.15,
            unit="K",
            description="Upper boundary (soft) for T.",
        ),
        # Used as both bounds of cooling_power while time < market_time.
        CasadiInput(
            name="cooling_power_full",
            value=None,
            unit="Not defined",
            type="pd.Series",
            description="full control trajectory output of baseline mpc",
        ),
        # Not referenced by NegFlexModel.setup_system.
        CasadiInput(
            name="cooler_on_full",
            value=None,
            unit="Not defined",
            type="pd.Series",
            description="full control trajectory output of baseline mpc",
        ),
        # Not referenced by NegFlexModel.setup_system.
        CasadiInput(
            name="in_provision",
            value=False,
            unit="-",
            type="bool",
            description="provision flag",
        ),
    ]
    states: list[CasadiState] = [
        CasadiState(
            name="T", value=293.15, unit="K", description="Temperature of zone"
        ),
        CasadiState(
            name="T_slack",
            value=0,
            unit="K",
            description="Slack variable of temperature of zone",
        ),
    ]
    parameters: list[CasadiParameter] = [
        # Not referenced by NegFlexModel.setup_system.
        CasadiParameter(
            name="cp",
            value=1000,
            unit="J/kg*K",
            description="thermal capacity of the air",
        ),
        CasadiParameter(
            name="C", value=100000, unit="J/K", description="thermal capacity of zone"
        ),
        CasadiParameter(
            name="s_T",
            value=1,
            unit="-",
            # Multiplies T_slack**2 in the objective, not in a constraint.
            description="Weight for squared temperature slack in objective function",
        ),
        CasadiParameter(
            name="r_cooling",
            value=1 / 5,
            unit="-",
            # Multiplies cooling_power in the standard objective.
            description="Weight for cooling power in objective function",
        ),
        CasadiParameter(
            name="cooler_mod_limit",
            value=200,
            unit="W",
            description="Cooling power cannot modulate below this value",
        ),
        # The flexibility objective is active for
        # market_time + prep_time <= time
        #     < market_time + prep_time + flex_event_duration.
        CasadiParameter(
            name="prep_time",
            value=0,
            unit="s",
            description="Time between market_time and start of the flexibility objective",
        ),
        CasadiParameter(
            name="flex_event_duration",
            value=0,
            unit="s",
            description="Duration for which the flexibility objective is active",
        ),
        CasadiParameter(
            name="market_time",
            value=0,
            unit="s",
            description="Time until which cooling_power is fixed to the baseline trajectory",
        ),
        CasadiParameter(
            name="s_P",
            value=10,
            unit="-",
            description="Weight for P in objective function",
        ),
    ]
    outputs: list[CasadiOutput] = [
        CasadiOutput(name="T_out", unit="K", description="Temperature of zone"),
        CasadiOutput(name="P_el", unit="W", description="Electrical power"),
    ]

451 

452 

class NegFlexModel(CasadiModel):
    """MPC model that rewards electrical power during the flexibility window."""

    config: NegFlexModelConfig

    def setup_system(self):
        """Define dynamics, outputs, constraints and return the objective.

        Returns:
            A CasADi expression that equals the standard objective outside
            the flexibility window and the flexibility objective
            (weighted squared temperature slack minus ``s_P * P_el``) inside
            it. The window is
            ``prep_time + market_time <= time
            < prep_time + flex_event_duration + market_time``.
        """
        # Before market_time the cooling power is pinned to the baseline
        # trajectory by using it as both bounds; afterwards the variable's
        # own bounds apply.
        follow_baseline = self.time < self.market_time.sym
        cooling_power_lower = ca.if_else(
            follow_baseline, self.cooling_power_full.sym, self.cooling_power.lb
        )
        cooling_power_upper = ca.if_else(
            follow_baseline, self.cooling_power_full.sym, self.cooling_power.ub
        )

        # Zone temperature changes with the net heat flow over the capacity.
        self.T.ode = (self.load - self.cooling_power) / self.C

        # Outputs mirror the state and the cooling power one-to-one.
        self.T_out.alg = self.T
        self.P_el.alg = self.cooling_power

        # Cooling power relative to its on/off dependent limits.
        excess_over_max = self.cooling_power - self.cooler_on * COOLING
        margin_over_min = self.cooling_power - self.cooler_on * self.cooler_mod_limit
        self.constraints = [
            (-ca.inf, excess_over_max, 0),
            (0, margin_over_min, ca.inf),
            (0, self.T + self.T_slack, self.T_upper),
            (cooling_power_lower, self.cooling_power, cooling_power_upper),
        ]

        comfort_penalty = self.s_T * self.T_slack**2
        obj_std = self.r_cooling * self.cooling_power + comfort_penalty
        # The negative sign makes higher power lower the objective value.
        obj_flex = comfort_penalty + -self.s_P * self.P_el

        # Boundaries of the window in which the flexibility objective applies.
        window_start = self.prep_time.sym + self.market_time.sym
        window_end = (
            self.prep_time.sym + self.flex_event_duration.sym + self.market_time.sym
        )

        obj_from_window_start = ca.if_else(self.time < window_end, obj_flex, obj_std)
        return ca.if_else(self.time < window_start, obj_std, obj_from_window_start)