Coverage for examples/OneRoom_SimpleMPC/flex_output_data/created_flex_files/flex_agents.py: 100%
60 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-01 15:10 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-01 15:10 +0000
1import casadi as ca
2import pandas as pd
3from agentlib_mpc.models.casadi_model import (
4 CasadiModel,
5 CasadiInput,
6 CasadiState,
7 CasadiParameter,
8 CasadiOutput,
9 CasadiModelConfig,
10)
11from typing import List
12from math import inf
class BaselineMPCModelConfig(CasadiModelConfig):
    """Variable declarations for the baseline MPC model of the zone.

    Lists the inputs, states, parameters and outputs that
    ``BaselineMPCModel.setup_system`` refers to by name.  Besides the
    thermal quantities it declares the variables used to switch the
    objective while a flexibility offer is in provision
    (``in_provision``, ``rel_start``, ``rel_end``, ``_P_external`` and
    the two ``profile_*_weight`` parameters).
    """

    inputs: List[CasadiInput] = [
        # --- control and disturbance inputs ---
        CasadiInput(
            name="mDot",
            value=0.0225,
            unit="kg/s",
            description="Air mass flow into zone",
        ),
        CasadiInput(
            name="load", value=150, unit="W", description="Heat load into zone"
        ),
        CasadiInput(
            name="T_in", value=280.15, unit="K", description="Inflow air temperature"
        ),
        # --- soft comfort band, relaxed through the T_slack state ---
        CasadiInput(
            name="T_upper",
            value=294.15,
            unit="K",
            description="Upper boundary (soft) for T.",
        ),
        CasadiInput(
            name="T_lower",
            value=292.15,
            unit="K",
            # Fixed copy-paste error: this is the lower bound, not the upper.
            description="Lower boundary (soft) for T.",
        ),
        # --- flexibility provision: profile to follow and its time window ---
        CasadiInput(
            name="_P_external",
            value=0,
            unit="W",
            description="External power profile to be provided",
        ),
        CasadiInput(
            name="in_provision",
            value=False,
            unit="-",
            description="Flag signaling if the flexibility is in provision",
        ),
        CasadiInput(
            name="rel_start",
            value=0,
            unit="s",
            description="relative start time of the flexibility event",
        ),
        CasadiInput(
            name="rel_end",
            value=0,
            unit="s",
            description="relative end time of the flexibility event",
        ),
    ]
    states: List[CasadiState] = [
        CasadiState(
            name="T", value=293.15, unit="K", description="Temperature of zone"
        ),
        # Slack that setup_system adds to / subtracts from T in the
        # temperature constraints and penalizes quadratically.
        CasadiState(
            name="T_slack",
            value=0,
            unit="K",
            description="Slack variable of temperature of zone",
        ),
    ]
    parameters: List[CasadiParameter] = [
        CasadiParameter(
            name="cp",
            value=1000,
            unit="J/kg*K",
            description="thermal capacity of the air",
        ),
        CasadiParameter(
            name="C", value=100000, unit="J/K", description="thermal capacity of zone"
        ),
        # Objective weights of the standard cost.
        CasadiParameter(
            name="s_T",
            value=1,
            unit="-",
            description="Weight for T in constraint function",
        ),
        CasadiParameter(
            name="r_mDot",
            value=1,
            unit="-",
            description="Weight for mDot in objective function",
        ),
        # Objective weights used only while following an accepted profile.
        CasadiParameter(
            name="profile_deviation_weight",
            value=0,
            unit="-",
            description="Weight of soft constraint for deviation from accepted flexible profile",
        ),
        CasadiParameter(
            name="profile_comfort_weight",
            value=0,
            unit="-",
            description="Weight of soft constraint for discomfort",
        ),
    ]
    outputs: List[CasadiOutput] = [
        CasadiOutput(name="T_out", unit="K", description="Temperature of zone"),
        CasadiOutput(
            name="E_out", unit="kWh", description="Stored energy in the zone w.r.t. 0K"
        ),
        # NOTE(review): declared in W, but setup_system divides the
        # expression assigned to P_el by 1000 - confirm the intended unit.
        CasadiOutput(
            name="P_el", unit="W", description="The power input to the system"
        ),
        # NOTE(review): setup_system assigns mDot to this output, yet the
        # unit is declared as W - confirm whether kg/s was intended.
        CasadiOutput(
            name="_mDot_full",
            unit="W",
            type="pd.Series",
            value=pd.Series([0]),
            description="full control output",
        ),
    ]
class BaselineMPCModel(CasadiModel):
    """Zone model whose cost can switch to tracking an external profile."""

    config: BaselineMPCModelConfig

    def setup_system(self):
        """Define dynamics, outputs and constraints; return the cost term.

        Returns:
            A CasADi expression.  It equals the standard cost
            (``r_mDot * mDot + s_T * T_slack**2``) unless ``in_provision``
            is set and ``rel_start <= time < rel_end``; in that window it
            equals the profile-tracking cost instead.
        """
        # Product shared by the temperature ODE and the power output.
        capacity_flow = self.cp * self.mDot

        # Zone temperature: air exchange term plus the heat load.
        self.T.ode = capacity_flow / self.C * (self.T_in - self.T) + self.load / self.C

        # Algebraic outputs.
        self.P_el.alg = capacity_flow * (self.T - self.T_in) / 1000
        self.T_out.alg = self.T
        self.E_out.alg = -self.T * self.C / (3600 * 1000)

        # Temperature band softened by T_slack; the slack itself is >= 0.
        soft_lower_bound = (self.T_lower, self.T + self.T_slack, inf)
        soft_upper_bound = (-inf, self.T - self.T_slack, self.T_upper)
        slack_non_negative = (0, self.T_slack, inf)
        self.constraints = [soft_lower_bound, soft_upper_bound, slack_non_negative]

        standard_cost = sum([self.r_mDot * self.mDot, self.s_T * self.T_slack**2])

        # Expose the control under its "_full" output name.
        self._mDot_full.alg = self.mDot

        # Cost applied while the accepted profile has to be followed.
        tracking_cost = sum(
            [
                self.profile_deviation_weight * (self.P_el - self._P_external) ** 2,
                self.T_slack**2 * self.profile_comfort_weight,
            ]
        )

        # Built inside-out: event end check, then event start check, then
        # the provision flag.
        cost_from_start = ca.if_else(
            self.time >= self.rel_end.sym, standard_cost, tracking_cost
        )
        cost_in_provision = ca.if_else(
            self.time < self.rel_start.sym, standard_cost, cost_from_start
        )
        return ca.if_else(self.in_provision.sym, cost_in_provision, standard_cost)
class PosFlexModelConfig(CasadiModelConfig):
    """Variable declarations for the positive-flexibility shadow model.

    Lists the inputs, states, parameters and outputs that
    ``PosFlexModel.setup_system`` refers to by name.  In addition to the
    thermal quantities it declares ``_mDot_full`` and the timing
    parameters (``market_time``, ``prep_time``, ``flex_event_duration``)
    together with the power weight ``s_P``.
    """

    inputs: List[CasadiInput] = [
        # --- control and disturbance inputs ---
        CasadiInput(
            name="mDot",
            value=0.0225,
            unit="kg/s",
            description="Air mass flow into zone",
        ),
        CasadiInput(
            name="load", value=150, unit="W", description="Heat load into zone"
        ),
        CasadiInput(
            name="T_in", value=280.15, unit="K", description="Inflow air temperature"
        ),
        # --- soft comfort band, relaxed through the T_slack state ---
        CasadiInput(
            name="T_upper",
            value=294.15,
            unit="K",
            description="Upper boundary (soft) for T.",
        ),
        CasadiInput(
            name="T_lower",
            value=292.15,
            unit="K",
            # Fixed copy-paste error: this is the lower bound, not the upper.
            description="Lower boundary (soft) for T.",
        ),
        # Used by setup_system as both bounds of mDot before market_time.
        # NOTE(review): unit is declared as W although it bounds mDot
        # (kg/s) - confirm.
        CasadiInput(
            name="_mDot_full", value=pd.Series([0]), unit="W", description="pd.Series"
        ),
        # NOTE(review): declared here but not read in PosFlexModel.setup_system.
        CasadiInput(
            name="in_provision", value=False, unit="-", description="provision flag"
        ),
    ]
    states: List[CasadiState] = [
        CasadiState(
            name="T", value=293.15, unit="K", description="Temperature of zone"
        ),
        # Slack that setup_system adds to / subtracts from T in the
        # temperature constraints and penalizes quadratically.
        CasadiState(
            name="T_slack",
            value=0,
            unit="K",
            description="Slack variable of temperature of zone",
        ),
    ]
    parameters: List[CasadiParameter] = [
        CasadiParameter(
            name="cp",
            value=1000,
            unit="J/kg*K",
            description="thermal capacity of the air",
        ),
        CasadiParameter(
            name="C", value=100000, unit="J/K", description="thermal capacity of zone"
        ),
        # Objective weights of the standard cost.
        CasadiParameter(
            name="s_T",
            value=1,
            unit="-",
            description="Weight for T in constraint function",
        ),
        CasadiParameter(
            name="r_mDot",
            value=1,
            unit="-",
            description="Weight for mDot in objective function",
        ),
        # Timing parameters: setup_system applies the flexibility cost for
        # market_time + prep_time <= time
        #     < market_time + prep_time + flex_event_duration.
        CasadiParameter(
            name="prep_time", value=0, unit="s", description="time to switch objective"
        ),
        CasadiParameter(
            name="flex_event_duration",
            value=0,
            unit="s",
            description="time to switch objective",
        ),
        CasadiParameter(
            name="market_time",
            value=0,
            unit="s",
            description="time to switch objective",
        ),
        # Weight on P_el inside the flexibility cost.
        CasadiParameter(
            name="s_P",
            value=10,
            unit="-",
            description="Weight for P in objective function",
        ),
    ]
    outputs: List[CasadiOutput] = [
        CasadiOutput(name="T_out", unit="K", description="Temperature of zone"),
        CasadiOutput(
            name="E_out", unit="kWh", description="Stored energy in the zone w.r.t. 0K"
        ),
        # NOTE(review): declared in W, but setup_system divides the
        # expression assigned to P_el by 1000 - confirm the intended unit.
        CasadiOutput(
            name="P_el", unit="W", description="The power input to the system"
        ),
    ]
class PosFlexModel(CasadiModel):
    """Zone model that adds ``s_P * P_el`` to the cost during the flex window."""

    config: PosFlexModelConfig

    def setup_system(self):
        """Define dynamics, outputs and constraints; return the cost term.

        Before ``market_time`` both bounds of ``mDot`` are set to
        ``_mDot_full``; afterwards the bounds declared on ``mDot`` apply.

        Returns:
            A CasADi expression.  It equals
            ``s_T * T_slack**2 + s_P * P_el`` while
            ``prep_time + market_time <= time
            < prep_time + flex_event_duration + market_time`` and the
            standard cost ``r_mDot * mDot + s_T * T_slack**2`` otherwise.
        """
        # Time-dependent bounds on the control input.
        before_market = self.time < self.market_time.sym
        mdot_min = ca.if_else(before_market, self._mDot_full.sym, self.mDot.lb)
        mdot_max = ca.if_else(before_market, self._mDot_full.sym, self.mDot.ub)

        # Product shared by the temperature ODE and the power output.
        capacity_flow = self.cp * self.mDot

        # Zone temperature: air exchange term plus the heat load.
        self.T.ode = capacity_flow / self.C * (self.T_in - self.T) + self.load / self.C

        # Algebraic outputs.
        self.P_el.alg = capacity_flow * (self.T - self.T_in) / 1000
        self.T_out.alg = self.T
        self.E_out.alg = -self.T * self.C / (3600 * 1000)

        # Temperature band softened by T_slack, slack >= 0, and the
        # control bounds computed above.
        soft_lower_bound = (self.T_lower, self.T + self.T_slack, inf)
        soft_upper_bound = (-inf, self.T - self.T_slack, self.T_upper)
        slack_non_negative = (0, self.T_slack, inf)
        control_bounds = (mdot_min, self.mDot, mdot_max)
        self.constraints = [
            soft_lower_bound,
            soft_upper_bound,
            slack_non_negative,
            control_bounds,
        ]

        standard_cost = sum([self.r_mDot * self.mDot, self.s_T * self.T_slack**2])
        flex_cost = sum([self.s_T * self.T_slack**2, self.s_P * self.P_el])

        # Boundaries of the window in which the flexibility cost applies.
        window_start = self.prep_time.sym + self.market_time.sym
        window_end = (
            self.prep_time.sym + self.flex_event_duration.sym + self.market_time.sym
        )

        cost_from_window_start = ca.if_else(
            self.time < window_end, flex_cost, standard_cost
        )
        return ca.if_else(
            self.time < window_start, standard_cost, cost_from_window_start
        )
class NegFlexModelConfig(CasadiModelConfig):
    """Variable declarations for the negative-flexibility shadow model.

    Lists the inputs, states, parameters and outputs that
    ``NegFlexModel.setup_system`` refers to by name.  In addition to the
    thermal quantities it declares ``_mDot_full`` and the timing
    parameters (``market_time``, ``prep_time``, ``flex_event_duration``)
    together with the power weight ``s_P``.
    """

    inputs: List[CasadiInput] = [
        # --- control and disturbance inputs ---
        CasadiInput(
            name="mDot",
            value=0.0225,
            unit="kg/s",
            description="Air mass flow into zone",
        ),
        CasadiInput(
            name="load", value=150, unit="W", description="Heat load into zone"
        ),
        CasadiInput(
            name="T_in", value=280.15, unit="K", description="Inflow air temperature"
        ),
        # --- soft comfort band, relaxed through the T_slack state ---
        CasadiInput(
            name="T_upper",
            value=294.15,
            unit="K",
            description="Upper boundary (soft) for T.",
        ),
        CasadiInput(
            name="T_lower",
            value=292.15,
            unit="K",
            # Fixed copy-paste error: this is the lower bound, not the upper.
            description="Lower boundary (soft) for T.",
        ),
        # Used by setup_system as both bounds of mDot before market_time.
        # NOTE(review): unit is declared as W although it bounds mDot
        # (kg/s) - confirm.
        CasadiInput(
            name="_mDot_full", value=pd.Series([0]), unit="W", description="pd.Series"
        ),
        # NOTE(review): declared here but not read in NegFlexModel.setup_system.
        CasadiInput(
            name="in_provision", value=False, unit="-", description="provision flag"
        ),
    ]
    states: List[CasadiState] = [
        CasadiState(
            name="T", value=293.15, unit="K", description="Temperature of zone"
        ),
        # Slack that setup_system adds to / subtracts from T in the
        # temperature constraints and penalizes quadratically.
        CasadiState(
            name="T_slack",
            value=0,
            unit="K",
            description="Slack variable of temperature of zone",
        ),
    ]
    parameters: List[CasadiParameter] = [
        CasadiParameter(
            name="cp",
            value=1000,
            unit="J/kg*K",
            description="thermal capacity of the air",
        ),
        CasadiParameter(
            name="C", value=100000, unit="J/K", description="thermal capacity of zone"
        ),
        # Objective weights of the standard cost.
        CasadiParameter(
            name="s_T",
            value=1,
            unit="-",
            description="Weight for T in constraint function",
        ),
        CasadiParameter(
            name="r_mDot",
            value=1,
            unit="-",
            description="Weight for mDot in objective function",
        ),
        # Timing parameters: setup_system applies the flexibility cost for
        # market_time + prep_time <= time
        #     < market_time + prep_time + flex_event_duration.
        CasadiParameter(
            name="prep_time", value=0, unit="s", description="time to switch objective"
        ),
        CasadiParameter(
            name="flex_event_duration",
            value=0,
            unit="s",
            description="time to switch objective",
        ),
        CasadiParameter(
            name="market_time",
            value=0,
            unit="s",
            description="time to switch objective",
        ),
        # Weight on P_el inside the flexibility cost (applied with a
        # negative sign in NegFlexModel.setup_system).
        CasadiParameter(
            name="s_P",
            value=10,
            unit="-",
            description="Weight for P in objective function",
        ),
    ]
    outputs: List[CasadiOutput] = [
        CasadiOutput(name="T_out", unit="K", description="Temperature of zone"),
        CasadiOutput(
            name="E_out", unit="kWh", description="Stored energy in the zone w.r.t. 0K"
        ),
        # NOTE(review): declared in W, but setup_system divides the
        # expression assigned to P_el by 1000 - confirm the intended unit.
        CasadiOutput(
            name="P_el", unit="W", description="The power input to the system"
        ),
    ]
class NegFlexModel(CasadiModel):
    """Zone model that subtracts ``s_P * P_el`` from the cost in the flex window."""

    config: NegFlexModelConfig

    def setup_system(self):
        """Define dynamics, outputs and constraints; return the cost term.

        Before ``market_time`` both bounds of ``mDot`` are set to
        ``_mDot_full``; afterwards the bounds declared on ``mDot`` apply.

        Returns:
            A CasADi expression.  It equals
            ``s_T * T_slack**2 - s_P * P_el`` while
            ``prep_time + market_time <= time
            < prep_time + flex_event_duration + market_time`` and the
            standard cost ``r_mDot * mDot + s_T * T_slack**2`` otherwise.
        """
        # Time-dependent bounds on the control input.
        before_market = self.time < self.market_time.sym
        mdot_min = ca.if_else(before_market, self._mDot_full.sym, self.mDot.lb)
        mdot_max = ca.if_else(before_market, self._mDot_full.sym, self.mDot.ub)

        # Product shared by the temperature ODE and the power output.
        capacity_flow = self.cp * self.mDot

        # Zone temperature: air exchange term plus the heat load.
        self.T.ode = capacity_flow / self.C * (self.T_in - self.T) + self.load / self.C

        # Algebraic outputs.
        self.P_el.alg = capacity_flow * (self.T - self.T_in) / 1000
        self.T_out.alg = self.T
        self.E_out.alg = -self.T * self.C / (3600 * 1000)

        # Temperature band softened by T_slack, slack >= 0, and the
        # control bounds computed above.
        soft_lower_bound = (self.T_lower, self.T + self.T_slack, inf)
        soft_upper_bound = (-inf, self.T - self.T_slack, self.T_upper)
        slack_non_negative = (0, self.T_slack, inf)
        control_bounds = (mdot_min, self.mDot, mdot_max)
        self.constraints = [
            soft_lower_bound,
            soft_upper_bound,
            slack_non_negative,
            control_bounds,
        ]

        standard_cost = sum([self.r_mDot * self.mDot, self.s_T * self.T_slack**2])
        # The power term enters with a negative sign in this model.
        flex_cost = sum([self.s_T * self.T_slack**2, -self.s_P * self.P_el])

        # Boundaries of the window in which the flexibility cost applies.
        window_start = self.prep_time.sym + self.market_time.sym
        window_end = (
            self.prep_time.sym + self.flex_event_duration.sym + self.market_time.sym
        )

        cost_from_window_start = ca.if_else(
            self.time < window_end, flex_cost, standard_cost
        )
        return ca.if_else(
            self.time < window_start, standard_cost, cost_from_window_start
        )