Coverage for examples/OneRoom_SimpleMPC/flex_output_data/created_flex_files/flex_agents.py: 100%
59 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-09-19 15:08 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-09-19 15:08 +0000
1import casadi as ca
2import pandas as pd
3from agentlib_mpc.models.casadi_model import (
4 CasadiModel,
5 CasadiInput,
6 CasadiState,
7 CasadiParameter,
8 CasadiOutput,
9 CasadiModelConfig,
10)
11from math import inf
class BaselineMPCModelConfig(CasadiModelConfig):
    """Variable declarations for the baseline MPC model of the one-room example.

    Lists the inputs, states, parameters and outputs that
    ``BaselineMPCModel.setup_system`` refers to by name.
    """

    inputs: list[CasadiInput] = [
        CasadiInput(
            name="mDot",
            value=0.0225,
            unit="kg/s",
            description="Air mass flow into zone",
        ),
        CasadiInput(
            name="load", value=150, unit="W", description="Heat load into zone"
        ),
        CasadiInput(
            name="T_in", value=280.15, unit="K", description="Inflow air temperature"
        ),
        # T_upper / T_lower form the comfort band; the model softens both
        # bounds with the T_slack state.
        CasadiInput(
            name="T_upper",
            value=294.15,
            unit="K",
            description="Upper boundary (soft) for T.",
        ),
        CasadiInput(
            name="T_lower",
            value=292.15,
            unit="K",
            # The model uses this variable as the lower bound of T + T_slack,
            # so the description must say "Lower", not "Upper".
            description="Lower boundary (soft) for T.",
        ),
        # The three inputs below, together with _P_external, are only read by
        # the objective switching logic of BaselineMPCModel.setup_system.
        CasadiInput(
            name="_P_external",
            value=0,
            unit="W",
            description="External power profile to be provided",
        ),
        CasadiInput(
            name="in_provision",
            value=False,
            unit="-",
            description="Flag signaling if the flexibility is in provision",
        ),
        CasadiInput(
            name="rel_start",
            value=0,
            unit="s",
            description="relative start time of the flexibility event",
        ),
        CasadiInput(
            name="rel_end",
            value=0,
            unit="s",
            description="relative end time of the flexibility event",
        ),
    ]
    states: list[CasadiState] = [
        CasadiState(
            name="T", value=293.15, unit="K", description="Temperature of zone"
        ),
        CasadiState(
            name="T_slack",
            value=0,
            unit="K",
            description="Slack variable of temperature of zone",
        ),
    ]
    parameters: list[CasadiParameter] = [
        CasadiParameter(
            name="cp",
            value=1000,
            unit="J/kg*K",
            description="thermal capacity of the air",
        ),
        CasadiParameter(
            name="C", value=100000, unit="J/K", description="thermal capacity of zone"
        ),
        CasadiParameter(
            name="s_T",
            value=1,
            unit="-",
            description="Weight for T in constraint function",
        ),
        CasadiParameter(
            name="r_mDot",
            value=1,
            unit="-",
            description="Weight for mDot in objective function",
        ),
        # Both profile weights default to 0, so the profile-tracking objective
        # evaluates to zero unless they are overridden.
        CasadiParameter(
            name="profile_deviation_weight",
            value=0,
            unit="-",
            description="Weight of soft constraint for deviation from accepted flexible profile",
        ),
        CasadiParameter(
            name="profile_comfort_weight",
            value=0,
            unit="-",
            description="Weight of soft constraint for discomfort",
        ),
    ]
    outputs: list[CasadiOutput] = [
        CasadiOutput(name="T_out", unit="K", description="Temperature of zone"),
        CasadiOutput(
            name="E_out", unit="kWh", description="Stored energy in the zone w.r.t. 0K"
        ),
        # NOTE(review): the model divides the P_el expression by 1000 while the
        # unit declared here is "W" -- confirm which one is intended.
        CasadiOutput(
            name="P_el", unit="W", description="The power input to the system"
        ),
        # NOTE(review): the model assigns mDot (declared in kg/s) to this
        # output, yet the unit declared here is "W" -- confirm.
        CasadiOutput(
            name="_mDot_full",
            unit="W",
            type="pd.Series",
            value=pd.Series([0]),
            description="full control output",
        ),
    ]
class BaselineMPCModel(CasadiModel):
    """Baseline one-room model whose cost can switch to profile tracking."""

    config: BaselineMPCModelConfig

    def setup_system(self):
        """Set the model equations and constraints and return the objective.

        Returns:
            A CasADi expression. It equals the standard cost (weighted mDot
            plus weighted squared slack) unless ``in_provision`` is set and
            ``rel_start <= time < rel_end``; in that window it is the
            profile-tracking cost instead.
        """
        # --- dynamics of the zone temperature ---
        air_term = self.cp * self.mDot / self.C * (self.T_in - self.T)
        load_term = self.load / self.C
        self.T.ode = air_term + load_term

        # --- algebraic outputs ---
        joule_per_kwh = 3600 * 1000
        self.P_el.alg = self.cp * self.mDot * (self.T - self.T_in) / 1000
        self.T_out.alg = self.T
        self.E_out.alg = -self.T * self.C / joule_per_kwh
        self._mDot_full.alg = self.mDot

        # --- constraints as (lower, expression, upper) triples ---
        lower_comfort = (self.T_lower, self.T + self.T_slack, inf)
        upper_comfort = (-inf, self.T - self.T_slack, self.T_upper)
        slack_nonnegative = (0, self.T_slack, inf)
        self.constraints = [lower_comfort, upper_comfort, slack_nonnegative]

        # --- cost terms ---
        standard_cost = sum(
            (self.r_mDot * self.mDot, self.s_T * self.T_slack**2)
        )
        tracking_cost = sum(
            (
                self.profile_deviation_weight
                * (self.P_el - self._P_external) ** 2,
                self.T_slack**2 * self.profile_comfort_weight,
            )
        )

        # Build the selection from the inside out: first the check against the
        # event end, then the check against the event start, and finally the
        # provision flag.
        cost_from_start_on = ca.if_else(
            self.time >= self.rel_end.sym, standard_cost, tracking_cost
        )
        cost_in_provision = ca.if_else(
            self.time < self.rel_start.sym, standard_cost, cost_from_start_on
        )
        return ca.if_else(self.in_provision.sym, cost_in_provision, standard_cost)
class PosFlexModelConfig(CasadiModelConfig):
    """Variable declarations for the positive-flexibility MPC model.

    Lists the inputs, states, parameters and outputs that
    ``PosFlexModel.setup_system`` refers to by name.
    """

    inputs: list[CasadiInput] = [
        CasadiInput(
            name="mDot",
            value=0.0225,
            unit="kg/s",
            description="Air mass flow into zone",
        ),
        CasadiInput(
            name="load", value=150, unit="W", description="Heat load into zone"
        ),
        CasadiInput(
            name="T_in", value=280.15, unit="K", description="Inflow air temperature"
        ),
        # T_upper / T_lower form the comfort band; the model softens both
        # bounds with the T_slack state.
        CasadiInput(
            name="T_upper",
            value=294.15,
            unit="K",
            description="Upper boundary (soft) for T.",
        ),
        CasadiInput(
            name="T_lower",
            value=292.15,
            unit="K",
            # The model uses this variable as the lower bound of T + T_slack,
            # so the description must say "Lower", not "Upper".
            description="Lower boundary (soft) for T.",
        ),
        # PosFlexModel.setup_system uses this value as both the lower and the
        # upper bound of mDot while time < market_time.
        # NOTE(review): the unit is declared as "W" although the value bounds
        # mDot (declared in kg/s) -- confirm.
        CasadiInput(
            name="_mDot_full", value=pd.Series([0]), unit="W", description="pd.Series"
        ),
        # Declared here but not referenced by PosFlexModel.setup_system.
        CasadiInput(
            name="in_provision", value=False, unit="-", description="provision flag"
        ),
    ]
    states: list[CasadiState] = [
        CasadiState(
            name="T", value=293.15, unit="K", description="Temperature of zone"
        ),
        CasadiState(
            name="T_slack",
            value=0,
            unit="K",
            description="Slack variable of temperature of zone",
        ),
    ]
    parameters: list[CasadiParameter] = [
        CasadiParameter(
            name="cp",
            value=1000,
            unit="J/kg*K",
            description="thermal capacity of the air",
        ),
        CasadiParameter(
            name="C", value=100000, unit="J/K", description="thermal capacity of zone"
        ),
        CasadiParameter(
            name="s_T",
            value=1,
            unit="-",
            description="Weight for T in constraint function",
        ),
        CasadiParameter(
            name="r_mDot",
            value=1,
            unit="-",
            description="Weight for mDot in objective function",
        ),
        # The three time parameters share one description. In
        # PosFlexModel.setup_system the flexibility objective is active for
        # market_time + prep_time <= time
        #     < market_time + prep_time + flex_event_duration.
        CasadiParameter(
            name="prep_time", value=0, unit="s", description="time to switch objective"
        ),
        CasadiParameter(
            name="flex_event_duration",
            value=0,
            unit="s",
            description="time to switch objective",
        ),
        CasadiParameter(
            name="market_time",
            value=0,
            unit="s",
            description="time to switch objective",
        ),
        CasadiParameter(
            name="s_P",
            value=10,
            unit="-",
            description="Weight for P in objective function",
        ),
    ]
    outputs: list[CasadiOutput] = [
        CasadiOutput(name="T_out", unit="K", description="Temperature of zone"),
        CasadiOutput(
            name="E_out", unit="kWh", description="Stored energy in the zone w.r.t. 0K"
        ),
        # NOTE(review): the model divides the P_el expression by 1000 while the
        # unit declared here is "W" -- confirm which one is intended.
        CasadiOutput(
            name="P_el", unit="W", description="The power input to the system"
        ),
    ]
class PosFlexModel(CasadiModel):
    """One-room model whose flexibility objective adds ``s_P * P_el``."""

    config: PosFlexModelConfig

    def setup_system(self):
        """Set the model equations and constraints and return the objective.

        Returns:
            A CasADi expression. It equals the flexibility cost while
            ``prep_time + market_time <= time
            < prep_time + flex_event_duration + market_time`` and the
            standard cost otherwise.
        """
        # Before market_time both bounds of mDot are _mDot_full, which pins the
        # mass flow to that value; afterwards mDot's own lb/ub apply.
        before_market = self.time < self.market_time.sym
        flow_min = ca.if_else(before_market, self._mDot_full.sym, self.mDot.lb)
        flow_max = ca.if_else(before_market, self._mDot_full.sym, self.mDot.ub)

        # --- dynamics of the zone temperature ---
        air_term = self.cp * self.mDot / self.C * (self.T_in - self.T)
        load_term = self.load / self.C
        self.T.ode = air_term + load_term

        # --- algebraic outputs ---
        joule_per_kwh = 3600 * 1000
        self.P_el.alg = self.cp * self.mDot * (self.T - self.T_in) / 1000
        self.T_out.alg = self.T
        self.E_out.alg = -self.T * self.C / joule_per_kwh

        # --- constraints as (lower, expression, upper) triples ---
        lower_comfort = (self.T_lower, self.T + self.T_slack, inf)
        upper_comfort = (-inf, self.T - self.T_slack, self.T_upper)
        slack_nonnegative = (0, self.T_slack, inf)
        flow_limits = (flow_min, self.mDot, flow_max)
        self.constraints = [
            lower_comfort,
            upper_comfort,
            slack_nonnegative,
            flow_limits,
        ]

        # --- cost terms ---
        standard_cost = sum(
            (self.r_mDot * self.mDot, self.s_T * self.T_slack**2)
        )
        flex_cost = sum((self.s_T * self.T_slack**2, self.s_P * self.P_el))

        # --- time thresholds of the flexibility window ---
        window_start = self.prep_time.sym + self.market_time.sym
        window_end = (
            self.prep_time.sym
            + self.flex_event_duration.sym
            + self.market_time.sym
        )

        cost_from_start_on = ca.if_else(
            self.time < window_end, flex_cost, standard_cost
        )
        return ca.if_else(
            self.time < window_start, standard_cost, cost_from_start_on
        )
class NegFlexModelConfig(CasadiModelConfig):
    """Variable declarations for the negative-flexibility MPC model.

    Lists the inputs, states, parameters and outputs that
    ``NegFlexModel.setup_system`` refers to by name.
    """

    inputs: list[CasadiInput] = [
        CasadiInput(
            name="mDot",
            value=0.0225,
            unit="kg/s",
            description="Air mass flow into zone",
        ),
        CasadiInput(
            name="load", value=150, unit="W", description="Heat load into zone"
        ),
        CasadiInput(
            name="T_in", value=280.15, unit="K", description="Inflow air temperature"
        ),
        # T_upper / T_lower form the comfort band; the model softens both
        # bounds with the T_slack state.
        CasadiInput(
            name="T_upper",
            value=294.15,
            unit="K",
            description="Upper boundary (soft) for T.",
        ),
        CasadiInput(
            name="T_lower",
            value=292.15,
            unit="K",
            # The model uses this variable as the lower bound of T + T_slack,
            # so the description must say "Lower", not "Upper".
            description="Lower boundary (soft) for T.",
        ),
        # NegFlexModel.setup_system uses this value as both the lower and the
        # upper bound of mDot while time < market_time.
        # NOTE(review): the unit is declared as "W" although the value bounds
        # mDot (declared in kg/s) -- confirm.
        CasadiInput(
            name="_mDot_full", value=pd.Series([0]), unit="W", description="pd.Series"
        ),
        # Declared here but not referenced by NegFlexModel.setup_system.
        CasadiInput(
            name="in_provision", value=False, unit="-", description="provision flag"
        ),
    ]
    states: list[CasadiState] = [
        CasadiState(
            name="T", value=293.15, unit="K", description="Temperature of zone"
        ),
        CasadiState(
            name="T_slack",
            value=0,
            unit="K",
            description="Slack variable of temperature of zone",
        ),
    ]
    parameters: list[CasadiParameter] = [
        CasadiParameter(
            name="cp",
            value=1000,
            unit="J/kg*K",
            description="thermal capacity of the air",
        ),
        CasadiParameter(
            name="C", value=100000, unit="J/K", description="thermal capacity of zone"
        ),
        CasadiParameter(
            name="s_T",
            value=1,
            unit="-",
            description="Weight for T in constraint function",
        ),
        CasadiParameter(
            name="r_mDot",
            value=1,
            unit="-",
            description="Weight for mDot in objective function",
        ),
        # The three time parameters share one description. In
        # NegFlexModel.setup_system the flexibility objective is active for
        # market_time + prep_time <= time
        #     < market_time + prep_time + flex_event_duration.
        CasadiParameter(
            name="prep_time", value=0, unit="s", description="time to switch objective"
        ),
        CasadiParameter(
            name="flex_event_duration",
            value=0,
            unit="s",
            description="time to switch objective",
        ),
        CasadiParameter(
            name="market_time",
            value=0,
            unit="s",
            description="time to switch objective",
        ),
        CasadiParameter(
            name="s_P",
            value=10,
            unit="-",
            description="Weight for P in objective function",
        ),
    ]
    outputs: list[CasadiOutput] = [
        CasadiOutput(name="T_out", unit="K", description="Temperature of zone"),
        CasadiOutput(
            name="E_out", unit="kWh", description="Stored energy in the zone w.r.t. 0K"
        ),
        # NOTE(review): the model divides the P_el expression by 1000 while the
        # unit declared here is "W" -- confirm which one is intended.
        CasadiOutput(
            name="P_el", unit="W", description="The power input to the system"
        ),
    ]
class NegFlexModel(CasadiModel):
    """One-room model whose flexibility objective adds ``-s_P * P_el``."""

    config: NegFlexModelConfig

    def setup_system(self):
        """Set the model equations and constraints and return the objective.

        Returns:
            A CasADi expression. It equals the flexibility cost while
            ``prep_time + market_time <= time
            < prep_time + flex_event_duration + market_time`` and the
            standard cost otherwise.
        """
        # Before market_time both bounds of mDot are _mDot_full, which pins the
        # mass flow to that value; afterwards mDot's own lb/ub apply.
        before_market = self.time < self.market_time.sym
        flow_min = ca.if_else(before_market, self._mDot_full.sym, self.mDot.lb)
        flow_max = ca.if_else(before_market, self._mDot_full.sym, self.mDot.ub)

        # --- dynamics of the zone temperature ---
        air_term = self.cp * self.mDot / self.C * (self.T_in - self.T)
        load_term = self.load / self.C
        self.T.ode = air_term + load_term

        # --- algebraic outputs ---
        joule_per_kwh = 3600 * 1000
        self.P_el.alg = self.cp * self.mDot * (self.T - self.T_in) / 1000
        self.T_out.alg = self.T
        self.E_out.alg = -self.T * self.C / joule_per_kwh

        # --- constraints as (lower, expression, upper) triples ---
        lower_comfort = (self.T_lower, self.T + self.T_slack, inf)
        upper_comfort = (-inf, self.T - self.T_slack, self.T_upper)
        slack_nonnegative = (0, self.T_slack, inf)
        flow_limits = (flow_min, self.mDot, flow_max)
        self.constraints = [
            lower_comfort,
            upper_comfort,
            slack_nonnegative,
            flow_limits,
        ]

        # --- cost terms ---
        standard_cost = sum(
            (self.r_mDot * self.mDot, self.s_T * self.T_slack**2)
        )
        # The negated power term is the only difference from PosFlexModel.
        flex_cost = sum((self.s_T * self.T_slack**2, -self.s_P * self.P_el))

        # --- time thresholds of the flexibility window ---
        window_start = self.prep_time.sym + self.market_time.sym
        window_end = (
            self.prep_time.sym
            + self.flex_event_duration.sym
            + self.market_time.sym
        )

        cost_from_start_on = ca.if_else(
            self.time < window_end, flex_cost, standard_cost
        )
        return ca.if_else(
            self.time < window_start, standard_cost, cost_from_start_on
        )