Coverage for examples/OneRoom_SimpleMPC/flex_output_data/created_flex_files/flex_agents.py: 100%
61 statements
« prev ^ index » next coverage.py v7.4.4, created at 2026-06-17 09:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2026-06-17 09:09 +0000
1import casadi as ca
2import pandas as pd
3from agentlib_mpc.models.casadi_model import (
4 CasadiModel,
5 CasadiInput,
6 CasadiState,
7 CasadiParameter,
8 CasadiOutput,
9 CasadiModelConfig,
10)
11from math import inf
14class BaselineMPCModelConfig(CasadiModelConfig):
15 inputs: list[CasadiInput] = [
16 CasadiInput(
17 name="mDot",
18 value=0.0225,
19 unit="kg/s",
20 description="Air mass flow into zone",
21 ),
22 CasadiInput(
23 name="load", value=150, unit="W", description="Heat load into zone"
24 ),
25 CasadiInput(
26 name="T_in", value=280.15, unit="K", description="Inflow air temperature"
27 ),
28 CasadiInput(
29 name="T_upper",
30 value=294.15,
31 unit="K",
32 description="Upper boundary (soft) for T.",
33 ),
34 CasadiInput(
35 name="T_lower",
36 value=292.15,
37 unit="K",
38 description="Upper boundary (soft) for T.",
39 ),
40 CasadiInput(
41 name="_P_external",
42 value=0,
43 unit="W",
44 type="pd.Series",
45 description="External power profile to be provided",
46 ),
47 CasadiInput(
48 name="in_provision",
49 value=False,
50 unit="-",
51 type="bool",
52 description="Flag signaling if the flexibility is in provision",
53 ),
54 CasadiInput(
55 name="rel_start",
56 value=0,
57 unit="s",
58 type="int",
59 description="relative start time of the flexibility event",
60 ),
61 CasadiInput(
62 name="rel_end",
63 value=0,
64 unit="s",
65 type="int",
66 description="relative end time of the flexibility event",
67 ),
68 ]
69 states: list[CasadiState] = [
70 CasadiState(
71 name="T", value=293.15, unit="K", description="Temperature of zone"
72 ),
73 CasadiState(
74 name="T_slack",
75 value=0,
76 unit="K",
77 description="Slack variable of temperature of zone",
78 ),
79 ]
80 parameters: list[CasadiParameter] = [
81 CasadiParameter(
82 name="cp",
83 value=1000,
84 unit="J/kg*K",
85 description="thermal capacity of the air",
86 ),
87 CasadiParameter(
88 name="C", value=100000, unit="J/K", description="thermal capacity of zone"
89 ),
90 CasadiParameter(
91 name="s_T",
92 value=1,
93 unit="-",
94 description="Weight for T in constraint function",
95 ),
96 CasadiParameter(
97 name="r_mDot",
98 value=1,
99 unit="-",
100 description="Weight for mDot in objective function",
101 ),
102 CasadiParameter(
103 name="profile_deviation_weight",
104 value=0,
105 unit="-",
106 description="Weight of soft constraint for deviation from accepted flexible profile",
107 ),
108 CasadiParameter(
109 name="profile_comfort_weight",
110 value=0,
111 unit="-",
112 description="Weight of soft constraint for discomfort",
113 ),
114 ]
115 outputs: list[CasadiOutput] = [
116 CasadiOutput(name="T_out", unit="K", description="Temperature of zone"),
117 CasadiOutput(
118 name="E_out", unit="kWh", description="Stored energy in the zone w.r.t. 0K"
119 ),
120 CasadiOutput(
121 name="eta_heater",
122 unit="-",
123 value=1,
124 description="Efficiency of electrical heater",
125 ),
126 CasadiOutput(
127 name="P_el", unit="W", description="The power input to the system"
128 ),
129 ]
132class BaselineMPCModel(CasadiModel):
133 config: BaselineMPCModelConfig
135 def setup_system(self):
136 self.T.ode = (
137 self.cp * self.mDot / self.C * (self.T_in - self.T) + self.load / self.C
138 )
139 self.P_el.alg = (
140 self.cp * self.mDot * (self.T - self.T_in) / 1000 / self.eta_heater
141 )
142 self.eta_heater.alg = 1
143 self.T_out.alg = self.T
144 self.E_out.alg = -self.T * self.C / (3600 * 1000)
145 self.constraints = [
146 (self.T_lower, self.T + self.T_slack, inf),
147 (-inf, self.T - self.T_slack, self.T_upper),
148 (0, self.T_slack, inf),
149 ]
150 objective = sum([self.r_mDot * self.mDot, self.s_T * self.T_slack**2])
151 obj_std = objective
152 return ca.if_else(
153 self.in_provision.sym,
154 ca.if_else(
155 self.time < self.rel_start.sym,
156 obj_std,
157 ca.if_else(
158 self.time >= self.rel_end.sym,
159 obj_std,
160 sum(
161 [
162 self.profile_deviation_weight
163 * (self.P_el - self._P_external) ** 2,
164 self.T_slack**2 * self.profile_comfort_weight,
165 ]
166 ),
167 ),
168 ),
169 obj_std,
170 )
173class PosFlexModelConfig(CasadiModelConfig):
174 inputs: list[CasadiInput] = [
175 CasadiInput(
176 name="mDot",
177 value=0.0225,
178 unit="kg/s",
179 description="Air mass flow into zone",
180 ),
181 CasadiInput(
182 name="load", value=150, unit="W", description="Heat load into zone"
183 ),
184 CasadiInput(
185 name="T_in", value=280.15, unit="K", description="Inflow air temperature"
186 ),
187 CasadiInput(
188 name="T_upper",
189 value=294.15,
190 unit="K",
191 description="Upper boundary (soft) for T.",
192 ),
193 CasadiInput(
194 name="T_lower",
195 value=292.15,
196 unit="K",
197 description="Upper boundary (soft) for T.",
198 ),
199 CasadiInput(
200 name="mDot_full",
201 value=None,
202 unit="Not defined",
203 type="pd.Series",
204 description="full control trajectory output of baseline mpc",
205 ),
206 CasadiInput(
207 name="in_provision",
208 value=False,
209 unit="Not defined",
210 type="bool",
211 description="Flag indicating whether flexibility should be provisioned",
212 ),
213 ]
214 states: list[CasadiState] = [
215 CasadiState(
216 name="T", value=293.15, unit="K", description="Temperature of zone"
217 ),
218 CasadiState(
219 name="T_slack",
220 value=0,
221 unit="K",
222 description="Slack variable of temperature of zone",
223 ),
224 ]
225 parameters: list[CasadiParameter] = [
226 CasadiParameter(
227 name="cp",
228 value=1000,
229 unit="J/kg*K",
230 description="thermal capacity of the air",
231 ),
232 CasadiParameter(
233 name="C", value=100000, unit="J/K", description="thermal capacity of zone"
234 ),
235 CasadiParameter(
236 name="s_T",
237 value=1,
238 unit="-",
239 description="Weight for T in constraint function",
240 ),
241 CasadiParameter(
242 name="r_mDot",
243 value=1,
244 unit="-",
245 description="Weight for mDot in objective function",
246 ),
247 CasadiParameter(
248 name="prep_time",
249 value=900,
250 unit="s",
251 description="Preparation time before switching objective",
252 ),
253 CasadiParameter(
254 name="flex_event_duration",
255 value=7200,
256 unit="s",
257 description="Duration of the flexibility event",
258 ),
259 CasadiParameter(
260 name="market_time",
261 value=900,
262 unit="s",
263 description="Market time associated with the objective switch",
264 ),
265 CasadiParameter(
266 name="s_P", value=10, unit="Not defined", description="Not defined"
267 ),
268 ]
269 outputs: list[CasadiOutput] = [
270 CasadiOutput(name="T_out", unit="K", description="Temperature of zone"),
271 CasadiOutput(
272 name="E_out", unit="kWh", description="Stored energy in the zone w.r.t. 0K"
273 ),
274 CasadiOutput(
275 name="eta_heater",
276 unit="-",
277 value=1,
278 description="Efficiency of electrical heater",
279 ),
280 CasadiOutput(
281 name="P_el", unit="W", description="The power input to the system"
282 ),
283 ]
286class PosFlexModel(CasadiModel):
287 config: PosFlexModelConfig
289 def setup_system(self):
290 mDot_lower = ca.if_else(
291 self.time < self.market_time.sym, self.mDot_full.sym, self.mDot.lb
292 )
293 mDot_upper = ca.if_else(
294 self.time < self.market_time.sym, self.mDot_full.sym, self.mDot.ub
295 )
296 self.T.ode = (
297 self.cp * self.mDot / self.C * (self.T_in - self.T) + self.load / self.C
298 )
299 self.P_el.alg = (
300 self.cp * self.mDot * (self.T - self.T_in) / 1000 / self.eta_heater
301 )
302 self.eta_heater.alg = 1
303 self.T_out.alg = self.T
304 self.E_out.alg = -self.T * self.C / (3600 * 1000)
305 self.constraints = [
306 (self.T_lower, self.T + self.T_slack, inf),
307 (-inf, self.T - self.T_slack, self.T_upper),
308 (0, self.T_slack, inf),
309 (mDot_lower, self.mDot, mDot_upper),
310 ]
311 objective = sum([self.r_mDot * self.mDot, self.s_T * self.T_slack**2])
312 obj_std = objective + self.P_el**2 * 0
313 obj_flex = sum([self.s_T * self.T_slack**2, self.s_P * self.P_el])
314 return ca.if_else(
315 self.time < self.prep_time.sym + self.market_time.sym,
316 obj_std,
317 ca.if_else(
318 self.time
319 < self.prep_time.sym
320 + self.flex_event_duration.sym
321 + self.market_time.sym,
322 obj_flex,
323 obj_std,
324 ),
325 )
328class NegFlexModelConfig(CasadiModelConfig):
329 inputs: list[CasadiInput] = [
330 CasadiInput(
331 name="mDot",
332 value=0.0225,
333 unit="kg/s",
334 description="Air mass flow into zone",
335 ),
336 CasadiInput(
337 name="load", value=150, unit="W", description="Heat load into zone"
338 ),
339 CasadiInput(
340 name="T_in", value=280.15, unit="K", description="Inflow air temperature"
341 ),
342 CasadiInput(
343 name="T_upper",
344 value=294.15,
345 unit="K",
346 description="Upper boundary (soft) for T.",
347 ),
348 CasadiInput(
349 name="T_lower",
350 value=292.15,
351 unit="K",
352 description="Upper boundary (soft) for T.",
353 ),
354 CasadiInput(
355 name="mDot_full",
356 value=None,
357 unit="Not defined",
358 type="pd.Series",
359 description="full control trajectory output of baseline mpc",
360 ),
361 CasadiInput(
362 name="in_provision",
363 value=False,
364 unit="Not defined",
365 type="bool",
366 description="Flag indicating whether flexibility should be provisioned",
367 ),
368 ]
369 states: list[CasadiState] = [
370 CasadiState(
371 name="T", value=293.15, unit="K", description="Temperature of zone"
372 ),
373 CasadiState(
374 name="T_slack",
375 value=0,
376 unit="K",
377 description="Slack variable of temperature of zone",
378 ),
379 ]
380 parameters: list[CasadiParameter] = [
381 CasadiParameter(
382 name="cp",
383 value=1000,
384 unit="J/kg*K",
385 description="thermal capacity of the air",
386 ),
387 CasadiParameter(
388 name="C", value=100000, unit="J/K", description="thermal capacity of zone"
389 ),
390 CasadiParameter(
391 name="s_T",
392 value=1,
393 unit="-",
394 description="Weight for T in constraint function",
395 ),
396 CasadiParameter(
397 name="r_mDot",
398 value=1,
399 unit="-",
400 description="Weight for mDot in objective function",
401 ),
402 CasadiParameter(
403 name="prep_time",
404 value=900,
405 unit="s",
406 description="Preparation time before switching objective",
407 ),
408 CasadiParameter(
409 name="flex_event_duration",
410 value=7200,
411 unit="s",
412 description="Duration of the flexibility event",
413 ),
414 CasadiParameter(
415 name="market_time",
416 value=900,
417 unit="s",
418 description="Market time associated with the objective switch",
419 ),
420 CasadiParameter(
421 name="s_P", value=10, unit="Not defined", description="Not defined"
422 ),
423 ]
424 outputs: list[CasadiOutput] = [
425 CasadiOutput(name="T_out", unit="K", description="Temperature of zone"),
426 CasadiOutput(
427 name="E_out", unit="kWh", description="Stored energy in the zone w.r.t. 0K"
428 ),
429 CasadiOutput(
430 name="eta_heater",
431 unit="-",
432 value=1,
433 description="Efficiency of electrical heater",
434 ),
435 CasadiOutput(
436 name="P_el", unit="W", description="The power input to the system"
437 ),
438 ]
441class NegFlexModel(CasadiModel):
442 config: NegFlexModelConfig
444 def setup_system(self):
445 mDot_lower = ca.if_else(
446 self.time < self.market_time.sym, self.mDot_full.sym, self.mDot.lb
447 )
448 mDot_upper = ca.if_else(
449 self.time < self.market_time.sym, self.mDot_full.sym, self.mDot.ub
450 )
451 self.T.ode = (
452 self.cp * self.mDot / self.C * (self.T_in - self.T) + self.load / self.C
453 )
454 self.P_el.alg = (
455 self.cp * self.mDot * (self.T - self.T_in) / 1000 / self.eta_heater
456 )
457 self.eta_heater.alg = 1
458 self.T_out.alg = self.T
459 self.E_out.alg = -self.T * self.C / (3600 * 1000)
460 self.constraints = [
461 (self.T_lower, self.T + self.T_slack, inf),
462 (-inf, self.T - self.T_slack, self.T_upper),
463 (0, self.T_slack, inf),
464 (mDot_lower, self.mDot, mDot_upper),
465 ]
466 objective = sum([self.r_mDot * self.mDot, self.s_T * self.T_slack**2])
467 obj_std = objective
468 obj_flex = sum([self.s_T * self.T_slack**2, -self.s_P * self.P_el])
469 return ca.if_else(
470 self.time < self.prep_time.sym + self.market_time.sym,
471 obj_std,
472 ca.if_else(
473 self.time
474 < self.prep_time.sym
475 + self.flex_event_duration.sym
476 + self.market_time.sym,
477 obj_flex,
478 obj_std,
479 ),
480 )