Coverage for examples/OneRoom_SimpleMPC/flex_output_data/created_flex_files/flex_agents.py: 100%
58 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-10-20 14:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-10-20 14:09 +0000
1import casadi as ca
2import pandas as pd
3from agentlib_mpc.models.casadi_model import (
4 CasadiModel,
5 CasadiInput,
6 CasadiState,
7 CasadiParameter,
8 CasadiOutput,
9 CasadiModelConfig,
10)
11from math import inf
class BaselineMPCModelConfig(CasadiModelConfig):
    """Variable declarations for the baseline MPC of the one-room example.

    Declares the inputs, states, parameters and outputs that
    ``BaselineMPCModel.setup_system`` refers to by name.
    """

    inputs: list[CasadiInput] = [
        # Control input
        CasadiInput(
            name="mDot",
            value=0.0225,
            unit="kg/s",
            description="Air mass flow into zone",
        ),
        # Disturbances
        CasadiInput(
            name="load", value=150, unit="W", description="Heat load into zone"
        ),
        CasadiInput(
            name="T_in", value=280.15, unit="K", description="Inflow air temperature"
        ),
        # Soft comfort band for the zone temperature
        CasadiInput(
            name="T_upper",
            value=294.15,
            unit="K",
            description="Upper boundary (soft) for T.",
        ),
        CasadiInput(
            name="T_lower",
            value=292.15,
            unit="K",
            # Fixed: this input is the lower bound of the comfort band; the
            # description used to be a copy of the T_upper text.
            description="Lower boundary (soft) for T.",
        ),
        # Inputs used by the objective while a flexibility offer is provided
        CasadiInput(
            name="_P_external",
            value=0,
            unit="W",
            type="pd.Series",
            description="External power profile to be provided",
        ),
        CasadiInput(
            name="in_provision",
            value=False,
            unit="-",
            type="bool",
            description="Flag signaling if the flexibility is in provision",
        ),
        CasadiInput(
            name="rel_start",
            value=0,
            unit="s",
            type="int",
            description="relative start time of the flexibility event",
        ),
        CasadiInput(
            name="rel_end",
            value=0,
            unit="s",
            type="int",
            description="relative end time of the flexibility event",
        ),
    ]
    states: list[CasadiState] = [
        CasadiState(
            name="T", value=293.15, unit="K", description="Temperature of zone"
        ),
        CasadiState(
            name="T_slack",
            value=0,
            unit="K",
            description="Slack variable of temperature of zone",
        ),
    ]
    parameters: list[CasadiParameter] = [
        CasadiParameter(
            name="cp",
            value=1000,
            unit="J/kg*K",
            description="thermal capacity of the air",
        ),
        CasadiParameter(
            name="C", value=100000, unit="J/K", description="thermal capacity of zone"
        ),
        # Objective weights of the standard (non-provision) cost
        CasadiParameter(
            name="s_T",
            value=1,
            unit="-",
            description="Weight for T in constraint function",
        ),
        CasadiParameter(
            name="r_mDot",
            value=1,
            unit="-",
            description="Weight for mDot in objective function",
        ),
        # Objective weights of the cost used during flexibility provision
        CasadiParameter(
            name="profile_deviation_weight",
            value=0,
            unit="-",
            description="Weight of soft constraint for deviation from accepted flexible profile",
        ),
        CasadiParameter(
            name="profile_comfort_weight",
            value=0,
            unit="-",
            description="Weight of soft constraint for discomfort",
        ),
    ]
    outputs: list[CasadiOutput] = [
        CasadiOutput(name="T_out", unit="K", description="Temperature of zone"),
        CasadiOutput(
            name="E_out", unit="kWh", description="Stored energy in the zone w.r.t. 0K"
        ),
        # NOTE(review): the model divides this quantity by 1000 before
        # assigning it, which suggests kW rather than W - confirm the unit.
        CasadiOutput(
            name="P_el", unit="W", description="The power input to the system"
        ),
    ]
class BaselineMPCModel(CasadiModel):
    """One-room zone model used by the baseline MPC."""

    config: BaselineMPCModelConfig

    def setup_system(self):
        """Define dynamics, outputs, constraints and return the stage cost.

        Returns:
            A CasADi expression. While ``in_provision`` is set and the time
            lies in ``[rel_start, rel_end)``, it is the profile-tracking cost;
            otherwise it is the standard cost on mass flow and temperature
            slack.
        """
        # Capacity flow of the supply air, shared by the ODE and the power.
        capacity_flow = self.cp * self.mDot

        # Zone temperature dynamics: supply-air exchange plus internal load.
        self.T.ode = (
            capacity_flow / self.C * (self.T_in - self.T) + self.load / self.C
        )

        # Algebraic outputs.
        # NOTE(review): the division by 1000 suggests kW, while the config
        # declares P_el with unit "W" - confirm which one is intended.
        self.P_el.alg = capacity_flow * (self.T - self.T_in) / 1000
        self.T_out.alg = self.T
        self.E_out.alg = -self.T * self.C / (3600 * 1000)

        # Comfort band softened by the non-negative slack state.
        lower_comfort = (self.T_lower, self.T + self.T_slack, inf)
        upper_comfort = (-inf, self.T - self.T_slack, self.T_upper)
        slack_nonnegative = (0, self.T_slack, inf)
        self.constraints = [lower_comfort, upper_comfort, slack_nonnegative]

        # Standard cost: control effort plus squared comfort violation.
        standard_cost = self.r_mDot * self.mDot + self.s_T * self.T_slack**2

        # Provision cost: squared deviation from the external power profile
        # plus weighted squared comfort violation.
        provision_cost = (
            self.profile_deviation_weight * (self.P_el - self._P_external) ** 2
            + self.profile_comfort_weight * self.T_slack**2
        )

        # Build the nested selection from the innermost branch outwards.
        from_event_start = ca.if_else(
            self.time >= self.rel_end.sym, standard_cost, provision_cost
        )
        while_in_provision = ca.if_else(
            self.time < self.rel_start.sym, standard_cost, from_event_start
        )
        return ca.if_else(self.in_provision.sym, while_in_provision, standard_cost)
class PosFlexModelConfig(CasadiModelConfig):
    """Variable declarations for the positive-flexibility MPC model.

    Declares the inputs, states, parameters and outputs that
    ``PosFlexModel.setup_system`` refers to by name.
    """

    inputs: list[CasadiInput] = [
        # Control input
        CasadiInput(
            name="mDot",
            value=0.0225,
            unit="kg/s",
            description="Air mass flow into zone",
        ),
        # Disturbances
        CasadiInput(
            name="load", value=150, unit="W", description="Heat load into zone"
        ),
        CasadiInput(
            name="T_in", value=280.15, unit="K", description="Inflow air temperature"
        ),
        # Soft comfort band for the zone temperature
        CasadiInput(
            name="T_upper",
            value=294.15,
            unit="K",
            description="Upper boundary (soft) for T.",
        ),
        CasadiInput(
            name="T_lower",
            value=292.15,
            unit="K",
            # Fixed: this input is the lower bound of the comfort band; the
            # description used to be a copy of the T_upper text.
            description="Lower boundary (soft) for T.",
        ),
        # Used by the model as both bounds on mDot while time < market_time.
        CasadiInput(
            name="mDot_full",
            value=None,
            unit="Not defined",
            type="pd.Series",
            description="full control trajectory output of baseline mpc",
        ),
        CasadiInput(
            name="in_provision",
            value=False,
            unit="-",
            type="bool",
            description="provision flag",
        ),
    ]
    states: list[CasadiState] = [
        CasadiState(
            name="T", value=293.15, unit="K", description="Temperature of zone"
        ),
        CasadiState(
            name="T_slack",
            value=0,
            unit="K",
            description="Slack variable of temperature of zone",
        ),
    ]
    parameters: list[CasadiParameter] = [
        CasadiParameter(
            name="cp",
            value=1000,
            unit="J/kg*K",
            description="thermal capacity of the air",
        ),
        CasadiParameter(
            name="C", value=100000, unit="J/K", description="thermal capacity of zone"
        ),
        CasadiParameter(
            name="s_T",
            value=1,
            unit="-",
            description="Weight for T in constraint function",
        ),
        CasadiParameter(
            name="r_mDot",
            value=1,
            unit="-",
            description="Weight for mDot in objective function",
        ),
        # The three time parameters below share one description text. In the
        # model the flexibility objective is active for
        # prep_time + market_time <= time
        #     < prep_time + flex_event_duration + market_time.
        CasadiParameter(
            name="prep_time", value=0, unit="s", description="time to switch objective"
        ),
        CasadiParameter(
            name="flex_event_duration",
            value=0,
            unit="s",
            description="time to switch objective",
        ),
        CasadiParameter(
            name="market_time",
            value=0,
            unit="s",
            description="time to switch objective",
        ),
        CasadiParameter(
            name="s_P",
            value=10,
            unit="-",
            description="Weight for P in objective function",
        ),
    ]
    outputs: list[CasadiOutput] = [
        CasadiOutput(name="T_out", unit="K", description="Temperature of zone"),
        CasadiOutput(
            name="E_out", unit="kWh", description="Stored energy in the zone w.r.t. 0K"
        ),
        # NOTE(review): the model divides this quantity by 1000 before
        # assigning it, which suggests kW rather than W - confirm the unit.
        CasadiOutput(
            name="P_el", unit="W", description="The power input to the system"
        ),
    ]
class PosFlexModel(CasadiModel):
    """One-room zone model used by the positive-flexibility MPC."""

    config: PosFlexModelConfig

    def setup_system(self):
        """Define dynamics, outputs, constraints and return the stage cost.

        Returns:
            A CasADi expression. It equals the flexibility cost (temperature
            slack plus ``s_P * P_el``) for
            ``prep_time + market_time <= time <
            prep_time + flex_event_duration + market_time``, and the standard
            cost on mass flow and temperature slack otherwise.
        """
        # While time < market_time, both mDot bounds collapse onto mDot_full;
        # afterwards the variable's own bounds apply.
        in_market_phase = self.time < self.market_time.sym
        mass_flow_min = ca.if_else(in_market_phase, self.mDot_full.sym, self.mDot.lb)
        mass_flow_max = ca.if_else(in_market_phase, self.mDot_full.sym, self.mDot.ub)

        # Capacity flow of the supply air, shared by the ODE and the power.
        capacity_flow = self.cp * self.mDot

        # Zone temperature dynamics: supply-air exchange plus internal load.
        self.T.ode = (
            capacity_flow / self.C * (self.T_in - self.T) + self.load / self.C
        )

        # Algebraic outputs.
        # NOTE(review): the division by 1000 suggests kW, while the config
        # declares P_el with unit "W" - confirm which one is intended.
        self.P_el.alg = capacity_flow * (self.T - self.T_in) / 1000
        self.T_out.alg = self.T
        self.E_out.alg = -self.T * self.C / (3600 * 1000)

        # Soft comfort band, non-negative slack and the mass flow bounds.
        self.constraints = [
            (self.T_lower, self.T + self.T_slack, inf),
            (-inf, self.T - self.T_slack, self.T_upper),
            (0, self.T_slack, inf),
            (mass_flow_min, self.mDot, mass_flow_max),
        ]

        # Standard cost: control effort plus squared comfort violation.
        standard_cost = self.r_mDot * self.mDot + self.s_T * self.T_slack**2
        # Flexibility cost: squared comfort violation plus weighted power.
        flex_cost = self.s_T * self.T_slack**2 + self.s_P * self.P_el

        # Time thresholds between which the flexibility cost is active.
        flex_begin = self.prep_time.sym + self.market_time.sym
        flex_finish = (
            self.prep_time.sym + self.flex_event_duration.sym + self.market_time.sym
        )

        from_flex_begin = ca.if_else(self.time < flex_finish, flex_cost, standard_cost)
        return ca.if_else(self.time < flex_begin, standard_cost, from_flex_begin)
class NegFlexModelConfig(CasadiModelConfig):
    """Variable declarations for the negative-flexibility MPC model.

    Declares the inputs, states, parameters and outputs that
    ``NegFlexModel.setup_system`` refers to by name.
    """

    inputs: list[CasadiInput] = [
        # Control input
        CasadiInput(
            name="mDot",
            value=0.0225,
            unit="kg/s",
            description="Air mass flow into zone",
        ),
        # Disturbances
        CasadiInput(
            name="load", value=150, unit="W", description="Heat load into zone"
        ),
        CasadiInput(
            name="T_in", value=280.15, unit="K", description="Inflow air temperature"
        ),
        # Soft comfort band for the zone temperature
        CasadiInput(
            name="T_upper",
            value=294.15,
            unit="K",
            description="Upper boundary (soft) for T.",
        ),
        CasadiInput(
            name="T_lower",
            value=292.15,
            unit="K",
            # Fixed: this input is the lower bound of the comfort band; the
            # description used to be a copy of the T_upper text.
            description="Lower boundary (soft) for T.",
        ),
        # Used by the model as both bounds on mDot while time < market_time.
        CasadiInput(
            name="mDot_full",
            value=None,
            unit="Not defined",
            type="pd.Series",
            description="full control trajectory output of baseline mpc",
        ),
        CasadiInput(
            name="in_provision",
            value=False,
            unit="-",
            type="bool",
            description="provision flag",
        ),
    ]
    states: list[CasadiState] = [
        CasadiState(
            name="T", value=293.15, unit="K", description="Temperature of zone"
        ),
        CasadiState(
            name="T_slack",
            value=0,
            unit="K",
            description="Slack variable of temperature of zone",
        ),
    ]
    parameters: list[CasadiParameter] = [
        CasadiParameter(
            name="cp",
            value=1000,
            unit="J/kg*K",
            description="thermal capacity of the air",
        ),
        CasadiParameter(
            name="C", value=100000, unit="J/K", description="thermal capacity of zone"
        ),
        CasadiParameter(
            name="s_T",
            value=1,
            unit="-",
            description="Weight for T in constraint function",
        ),
        CasadiParameter(
            name="r_mDot",
            value=1,
            unit="-",
            description="Weight for mDot in objective function",
        ),
        # The three time parameters below share one description text. In the
        # model the flexibility objective is active for
        # prep_time + market_time <= time
        #     < prep_time + flex_event_duration + market_time.
        CasadiParameter(
            name="prep_time", value=0, unit="s", description="time to switch objective"
        ),
        CasadiParameter(
            name="flex_event_duration",
            value=0,
            unit="s",
            description="time to switch objective",
        ),
        CasadiParameter(
            name="market_time",
            value=0,
            unit="s",
            description="time to switch objective",
        ),
        CasadiParameter(
            name="s_P",
            value=10,
            unit="-",
            description="Weight for P in objective function",
        ),
    ]
    outputs: list[CasadiOutput] = [
        CasadiOutput(name="T_out", unit="K", description="Temperature of zone"),
        CasadiOutput(
            name="E_out", unit="kWh", description="Stored energy in the zone w.r.t. 0K"
        ),
        # NOTE(review): the model divides this quantity by 1000 before
        # assigning it, which suggests kW rather than W - confirm the unit.
        CasadiOutput(
            name="P_el", unit="W", description="The power input to the system"
        ),
    ]
class NegFlexModel(CasadiModel):
    """One-room zone model used by the negative-flexibility MPC."""

    config: NegFlexModelConfig

    def setup_system(self):
        """Define dynamics, outputs, constraints and return the stage cost.

        Returns:
            A CasADi expression. It equals the flexibility cost (temperature
            slack minus ``s_P * P_el``) for
            ``prep_time + market_time <= time <
            prep_time + flex_event_duration + market_time``, and the standard
            cost on mass flow and temperature slack otherwise.
        """
        # While time < market_time, both mDot bounds collapse onto mDot_full;
        # afterwards the variable's own bounds apply.
        in_market_phase = self.time < self.market_time.sym
        mass_flow_min = ca.if_else(in_market_phase, self.mDot_full.sym, self.mDot.lb)
        mass_flow_max = ca.if_else(in_market_phase, self.mDot_full.sym, self.mDot.ub)

        # Capacity flow of the supply air, shared by the ODE and the power.
        capacity_flow = self.cp * self.mDot

        # Zone temperature dynamics: supply-air exchange plus internal load.
        self.T.ode = (
            capacity_flow / self.C * (self.T_in - self.T) + self.load / self.C
        )

        # Algebraic outputs.
        # NOTE(review): the division by 1000 suggests kW, while the config
        # declares P_el with unit "W" - confirm which one is intended.
        self.P_el.alg = capacity_flow * (self.T - self.T_in) / 1000
        self.T_out.alg = self.T
        self.E_out.alg = -self.T * self.C / (3600 * 1000)

        # Soft comfort band, non-negative slack and the mass flow bounds.
        self.constraints = [
            (self.T_lower, self.T + self.T_slack, inf),
            (-inf, self.T - self.T_slack, self.T_upper),
            (0, self.T_slack, inf),
            (mass_flow_min, self.mDot, mass_flow_max),
        ]

        # Standard cost: control effort plus squared comfort violation.
        standard_cost = self.r_mDot * self.mDot + self.s_T * self.T_slack**2
        # Flexibility cost: the power term enters with a negative weight.
        flex_cost = self.s_T * self.T_slack**2 + -self.s_P * self.P_el

        # Time thresholds between which the flexibility cost is active.
        flex_begin = self.prep_time.sym + self.market_time.sym
        flex_finish = (
            self.prep_time.sym + self.flex_event_duration.sym + self.market_time.sym
        )

        from_flex_begin = ca.if_else(self.time < flex_finish, flex_cost, standard_cost)
        return ca.if_else(self.time < flex_begin, standard_cost, from_flex_begin)