Coverage for agentlib_flexquant/data_structures/flex_kpis.py: 92%
168 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-01 15:10 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-01 15:10 +0000
1from typing import Union, Optional
3import numpy
4import pydantic
5import numpy as np
6import pandas as pd
8from agentlib_mpc.utils import TimeConversionTypes, TIME_CONVERSION
9from agentlib_flexquant.data_structures.globals import FlexibilityDirections
10from agentlib_flexquant.utils.data_handling import strip_multi_index, fill_nans, MEAN
class KPI(pydantic.BaseModel):
    """Scalar flexibility indicator consisting of a name, a value, a unit and a direction."""

    name: str = pydantic.Field(
        default=None,
        description="Name of the flexibility KPI",
    )
    value: Optional[float] = pydantic.Field(
        default=None,
        description="Value of the flexibility KPI",
    )
    unit: str = pydantic.Field(
        default=None,
        description="Unit of the flexibility KPI",
    )
    direction: Optional[FlexibilityDirections] = pydantic.Field(
        default=None,
        description="Direction of the shadow mpc / flexibility"
    )

    class Config:
        # Permit field types pydantic has no built-in validator for
        # (the KPISeries subclass stores pandas Series in its fields).
        arbitrary_types_allowed = True

    def get_kpi_identifier(self):
        """Return the identifier of the KPI in the form ``<direction>_<name>``."""
        return f"{self.direction}_{self.name}"
class KPISeries(KPI):
    """
    KPI whose value is a series instead of a scalar.

    The index of ``value`` is used as the time axis: ``dt`` holds the distance between
    consecutive index entries and is derived from that index whenever it is needed.
    """

    value: Union[pd.Series, None] = pydantic.Field(
        default=None,
        description="Value of the flexibility KPI",
    )
    dt: Union[pd.Series, None] = pydantic.Field(
        default=None,
        description="Time differences between the timestamps of the series in seconds",
    )

    def _get_dt(self) -> pd.Series:
        """
        Get the time differences between the timestamps of the series.

        Every sample is assigned the distance to the following timestamp. The last sample
        has no successor and therefore reuses the distance of the sample before it.
        The result is stored in ``self.dt`` and returned.
        """
        timestamps = self.value.index
        dt = pd.Series(index=timestamps, data=timestamps).diff().shift(-1).ffill()
        self.dt = dt
        return dt

    def _ensure_dt(self) -> pd.Series:
        """
        Return time differences that belong to the current ``value``.

        ``value`` may be reassigned after ``dt`` has been computed. A ``dt`` with a different
        index would be aligned label-wise in the multiplication of ``integrate`` and yield
        NaN products, so ``dt`` is recomputed whenever it is missing or its index no longer
        equals the index of ``value``.
        """
        if self.dt is None or not self.dt.index.equals(self.value.index):
            return self._get_dt()
        return self.dt

    def min(self) -> float:
        """Return the smallest value of the series."""
        return self.value.min()

    def max(self) -> float:
        """Return the largest value of the series."""
        return self.value.max()

    def avg(self) -> float:
        """
        Calculate the average value of the KPI over time.

        The average is time-weighted: the integral of the series divided by the sum of
        the time differences.
        """
        delta_t = self._ensure_dt().sum()
        return self.integrate() / delta_t

    def integrate(self, time_unit: TimeConversionTypes = "seconds") -> float:
        """
        Integrate the value of the KPI over time by summing up the product of values and the time difference.

        Args:
            time_unit: Key into TIME_CONVERSION; the time differences are divided by the
                corresponding factor before summation.

        Returns:
            Sum over all samples of ``value * dt / TIME_CONVERSION[time_unit]``.
        """
        dt = self._ensure_dt()
        products = self.value * dt / TIME_CONVERSION[time_unit]
        return products.sum()
class FlexibilityKPIs(pydantic.BaseModel):
    """
    Class defining the indicator KPIs.

    Holds the power, energy and cost KPIs of a single flexibility direction together
    with the methods that calculate them from power and electricity price profiles.
    """
    # Direction
    direction: FlexibilityDirections = pydantic.Field(
        default=None,
        description="Direction of the shadow mpc"
    )

    # Power / energy KPIs
    power_flex_full: KPISeries = pydantic.Field(
        default=KPISeries(
            name="power_flex_full",
            unit="kW"
        ),
        description="Power flexibility",
    )
    power_flex_offer: KPISeries = pydantic.Field(
        default=KPISeries(
            name="power_flex_offer",
            unit="kW"
        ),
        description="Power flexibility",
    )
    power_flex_offer_max: KPI = pydantic.Field(
        default=KPI(
            name="power_flex_offer_max",
            unit="kW"
        ),
        description="Maximum power flexibility",
    )
    power_flex_offer_min: KPI = pydantic.Field(
        default=KPI(
            name="power_flex_offer_min",
            unit="kW"
        ),
        description="Minimum power flexibility",
    )
    power_flex_offer_avg: KPI = pydantic.Field(
        default=KPI(
            name="power_flex_offer_avg",
            unit="kW"
        ),
        description="Average power flexibility",
    )
    energy_flex: KPI = pydantic.Field(
        default=KPI(
            name="energy_flex",
            unit="kWh"
        ),
        description="Energy flexibility equals the integral of the power flexibility",
    )
    power_flex_within_boundary: KPI = pydantic.Field(
        default=KPI(
            name="power_flex_within_boundary",
            unit="-"
        ),
        description="Variable indicating whether the baseline power and flex power align at the horizon end",
    )

    # Costs KPIs
    electricity_costs_series: KPISeries = pydantic.Field(
        default=KPISeries(
            name="electricity_costs_series",
            unit="ct/h"
        ),
        description="Costs of flexibility",
    )
    costs: KPI = pydantic.Field(
        default=KPI(
            name="costs",
            unit="ct"
        ),
        description="Costs of flexibility",
    )
    corrected_costs: KPI = pydantic.Field(
        default=KPI(
            name="corrected_costs",
            unit="ct"
        ),
        description="Corrected costs of flexibility considering the stored energy in the system",
    )
    costs_rel: KPI = pydantic.Field(
        default=KPI(
            name="costs_rel",
            unit="ct/kWh"
        ),
        description="Costs of flexibility per energy",
    )
    corrected_costs_rel: KPI = pydantic.Field(
        default=KPI(
            name="corrected_costs_rel",
            unit="ct/kWh"
        ),
        description="Corrected costs of flexibility per energy",
    )

    def __init__(self, direction: FlexibilityDirections, **data):
        """
        Create the KPI set and propagate the direction to every contained KPI.

        Args:
            direction: Flexibility direction shared by all KPIs of this instance.
            **data: Further field values forwarded to the pydantic constructor.
        """
        super().__init__(**data)
        self.direction = direction
        for kpi in vars(self).values():
            if isinstance(kpi, KPI):
                kpi.direction = self.direction

    def calculate(
            self,
            power_profile_base: pd.Series,
            power_profile_shadow: pd.Series,
            electricity_price_series: pd.Series,
            mpc_time_grid: np.ndarray,
            flex_offer_time_grid: np.ndarray,
            stored_energy_base: pd.Series,
            stored_energy_shadow: pd.Series,
            enable_energy_costs_correction: bool,
            calculate_flex_cost: bool
    ) -> None:
        """
        Calculate the KPIs based on the power and electricity input profiles.
        Time grids needed for indexing of the power flexibility profiles.

        Args:
            power_profile_base: Power profile of the baseline.
            power_profile_shadow: Power profile of the shadow mpc; must share the index of the baseline.
            electricity_price_series: Electricity price profile used for the cost KPIs.
            mpc_time_grid: Time grid of the mpcs (accepted for interface compatibility, not read here).
            flex_offer_time_grid: Time grid of the offer; its first and last entry bound the offer window.
            stored_energy_base: Stored energy profile of the baseline.
            stored_energy_shadow: Stored energy profile of the shadow mpc.
            enable_energy_costs_correction: If True, the costs are corrected by the difference of the
                last stored energy values of shadow and baseline.
            calculate_flex_cost: If False, the cost KPIs are not calculated at all.
        """
        # Power / energy KPIs
        self._calculate_power_flex(
            power_profile_base=power_profile_base,
            power_profile_shadow=power_profile_shadow,
            flex_offer_time_grid=flex_offer_time_grid,
        )
        self._calculate_power_flex_stats()
        self._calculate_energy_flex()

        # Costs KPIs
        if not calculate_flex_cost:
            return

        # Only touch the stored energy profiles when the costs are actually requested,
        # so that missing profiles cannot break a pure power / energy calculation.
        if enable_energy_costs_correction:
            stored_energy_diff = stored_energy_shadow.values[-1] - stored_energy_base.values[-1]
        else:
            stored_energy_diff = 0

        self._calculate_costs(electricity_price_signal=electricity_price_series, stored_energy_diff=stored_energy_diff)
        self._calculate_costs_rel()

    def _calculate_power_flex(self, power_profile_base: pd.Series, power_profile_shadow: pd.Series,
                              flex_offer_time_grid: np.ndarray,
                              relative_error_acceptance: float = 0.01) -> pd.Series:
        """
        Calculate the power flexibility based on the base and flexibility power profiles.

        Args:
            power_profile_base: Power profile of the baseline.
            power_profile_shadow: Power profile of the shadow mpc.
            flex_offer_time_grid: Time grid of the offer; its first and last entry are used as
                (label-based) bounds of the offer window.
            relative_error_acceptance: threshold for the relative error between the baseline and shadow mpc to set the power flexibility to zero

        Returns:
            Power flexibility over the full profile.

        Raises:
            ValueError: If the indices of the profiles differ or the direction is neither
                "positive" nor "negative".
        """
        if not power_profile_shadow.index.equals(power_profile_base.index):
            raise ValueError(f"Indices of power profiles do not match.\n"
                             f"Baseline: {power_profile_base.index}\n"
                             f"Shadow: {power_profile_shadow.index}")

        # Calculate flexibility
        if self.direction == "positive":
            power_flex = power_profile_base - power_profile_shadow
        elif self.direction == "negative":
            power_flex = power_profile_shadow - power_profile_base
        else:
            raise ValueError(f"Direction of KPIs not properly defined: {self.direction}")

        # Set values to zero if the difference is small
        relative_difference = (power_flex / power_profile_base).abs()
        power_flex.loc[relative_difference < relative_error_acceptance] = 0

        # Set values
        self.power_flex_full.value = power_flex
        self.power_flex_offer.value = power_flex.loc[flex_offer_time_grid[0]:flex_offer_time_grid[-1]]
        return power_flex

    def _calculate_power_flex_stats(self) -> tuple[float, float, float]:
        """
        Calculate the characteristic values of the power flexibility for the offer.

        Returns:
            Maximum, minimum and average of the power flexibility in the offer window.

        Raises:
            ValueError: If the power flexibility of the offer has not been set.
        """
        if self.power_flex_offer.value is None:
            raise ValueError("Power flexibility value is empty.")

        # Calculate characteristic values
        power_flex_offer_max = self.power_flex_offer.max()
        power_flex_offer_min = self.power_flex_offer.min()
        power_flex_offer_avg = self.power_flex_offer.avg()

        # Set values
        self.power_flex_offer_max.value = power_flex_offer_max
        self.power_flex_offer_min.value = power_flex_offer_min
        self.power_flex_offer_avg.value = power_flex_offer_avg
        return power_flex_offer_max, power_flex_offer_min, power_flex_offer_avg

    def _calculate_energy_flex(self) -> float:
        """
        Calculate the energy flexibility by integrating the power flexibility of the offer window.

        Returns:
            Integral of the offered power flexibility with the time unit "hours".

        Raises:
            ValueError: If the power flexibility of the offer has not been set.
        """
        if self.power_flex_offer.value is None:
            raise ValueError("Power flexibility value of the offer is empty.")

        # Calculate flexibility
        energy_flex = self.power_flex_offer.integrate(time_unit="hours")

        # Set value
        self.energy_flex.value = energy_flex
        return energy_flex

    def _calculate_costs(self, electricity_price_signal: pd.Series, stored_energy_diff: float) -> None:
        """
        Calculate the costs of the flexibility event based on the electricity costs profile and the power flexibility profile.

        The results are written to ``electricity_costs_series``, ``costs`` and ``corrected_costs``.

        Args:
            electricity_price_signal: Electricity price profile.
            stored_energy_diff: Difference of stored energy between shadow and baseline; it is
                valued with the mean electricity price and subtracted from the costs.
        """
        # Calculate series
        self.electricity_costs_series.value = electricity_price_signal * self.power_flex_full.value

        # Calculate scalar
        costs = abs(self.electricity_costs_series.integrate(time_unit="hours"))

        # correct the costs
        corrected_costs = costs - stored_energy_diff * np.mean(electricity_price_signal)

        self.costs.value = costs
        self.corrected_costs.value = corrected_costs

    def _calculate_costs_rel(self) -> None:
        """
        Calculate the relative costs of the flexibility event per energy flexibility.

        Both relative costs are set to zero if the energy flexibility is zero, which
        avoids a division by zero.
        """
        if self.energy_flex.value == 0:
            costs_rel = 0
            corrected_costs_rel = 0
        else:
            costs_rel = self.costs.value / self.energy_flex.value
            corrected_costs_rel = self.corrected_costs.value / self.energy_flex.value

        # Set value
        self.costs_rel.value = costs_rel
        self.corrected_costs_rel.value = corrected_costs_rel

    def get_kpi_dict(self, identifier: bool = False) -> dict[str, KPI]:
        """
        Get the KPIs as a dictionary with names or identifier as keys.

        Args:
            identifier: If True, the keys are the identifiers of the KPIs, otherwise the name of the kpi.

        Returns:
            Dictionary containing every attribute of this instance that is a KPI.
        """
        kpis = [kpi for kpi in vars(self).values() if isinstance(kpi, KPI)]
        if identifier:
            return {kpi.get_kpi_identifier(): kpi for kpi in kpis}
        return {kpi.name: kpi for kpi in kpis}

    def get_name_dict(self) -> dict[str, str]:
        """
        Returns:
            Dictionary of the kpis with names as keys and the identifiers as values.
        """
        return {
            name: kpi.get_kpi_identifier()
            for name, kpi in self.get_kpi_dict(identifier=False).items()
        }
class FlexibilityData(pydantic.BaseModel):
    """
    Class containing the data for the calculation of the flexibility.

    Stores the time grids, the power / stored energy / price profiles and the KPI sets
    of both flexibility directions, and offers helpers that bring incoming series onto
    the mpc time grid.
    """
    # Time parameters
    mpc_time_grid: np.ndarray = pydantic.Field(
        default=None,
        description="Time grid of the mpcs",
    )
    flex_offer_time_grid: np.ndarray = pydantic.Field(
        default=None,
        description="Time grid of the flexibility offer",
    )
    switch_time: Optional[float] = pydantic.Field(
        default=None,
        description="Time of the switch between the preparation and the market time",
    )

    # Profiles
    power_profile_base: pd.Series = pydantic.Field(
        default=None,
        description="Base power profile",
    )
    power_profile_flex_neg: pd.Series = pydantic.Field(
        default=None,
        description="Power profile of the negative flexibility",
    )
    power_profile_flex_pos: pd.Series = pydantic.Field(
        default=None,
        description="Power profile of the positive flexibility",
    )
    stored_energy_profile_base: pd.Series = pydantic.Field(
        default=None,
        description="Base profile of the stored electrical energy",
    )
    stored_energy_profile_flex_neg: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the stored electrical energy for negative flexibility",
    )
    stored_energy_profile_flex_pos: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the stored elctrical energy for positive flexibility",
    )
    electricity_price_series: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the electricity price",
    )

    # KPIs
    kpis_pos: FlexibilityKPIs = pydantic.Field(
        default=FlexibilityKPIs(direction="positive"),
        description="KPIs for positive flexibility",
    )
    kpis_neg: FlexibilityKPIs = pydantic.Field(
        default=FlexibilityKPIs(direction="negative"),
        description="KPIs for negative flexibility",
    )

    class Config:
        # numpy arrays and pandas Series are used as field types
        arbitrary_types_allowed = True

    def __init__(self, prep_time: int, market_time: int, flex_event_duration: int,
                 time_step: int, prediction_horizon: int, **data):
        """
        Create the data container and derive the time parameters.

        Args:
            prep_time: Preparation time; added to market_time to obtain the switch time.
            market_time: Market time; added to prep_time to obtain the switch time.
            flex_event_duration: Length of the offer window that starts at the switch time.
            time_step: Step width of both time grids.
            prediction_horizon: Number of steps of the mpc time grid.
            **data: Further field values forwarded to the pydantic constructor.
        """
        super().__init__(**data)
        self.switch_time = prep_time + market_time
        self.flex_offer_time_grid = np.arange(self.switch_time, self.switch_time + flex_event_duration, time_step)
        self.mpc_time_grid = np.arange(0, prediction_horizon * time_step, time_step)

    def format_predictor_inputs(self, series: pd.Series) -> pd.Series:
        """
        Format the input of the predictor to unify the data.

        The index is shifted so that it starts at zero and the series is then reindexed
        to the mpc time grid. The passed series itself is left untouched.

        Args:
            series: Input series from a predictor.

        Returns:
            Formatted series.

        Raises:
            ValueError: If reindexing to the mpc time grid produces NaN values.
        """
        # Work on a copy: assigning to the index would otherwise alter the caller's object
        shifted = series.copy()
        shifted.index = shifted.index - shifted.index[0]
        formatted = shifted.reindex(self.mpc_time_grid)
        if formatted.isna().any():
            # Report the index that failed to match, not the one produced by the reindexing
            raise ValueError(f"The mpc time grid is not compatible with the predictor "
                             f"input, which leads to NaN values in the series.\n"
                             f"MPC time grid:{self.mpc_time_grid}\n"
                             f"Series index:{shifted.index}")
        return formatted

    def format_mpc_inputs(self, series: pd.Series) -> pd.Series:
        """
        Format the input of the mpc to unify the data.

        The series is passed through strip_multi_index, NaN values are handed to
        fill_nans with the MEAN method, and the result is reindexed to the mpc time grid.

        Args:
            series: Input series from a mpc.

        Returns:
            Formatted series.

        Raises:
            ValueError: If reindexing to the mpc time grid produces NaN values.
        """
        stripped = strip_multi_index(series)
        if stripped.isna().any():
            stripped = fill_nans(series=stripped, method=MEAN)
        formatted = stripped.reindex(self.mpc_time_grid)
        if formatted.isna().any():
            # Report the index that failed to match, not the one produced by the reindexing
            raise ValueError(f"The mpc time grid is not compatible with the mpc input, "
                             f"which leads to NaN values in the series.\n"
                             f"MPC time grid:{self.mpc_time_grid}\n"
                             f"Series index:{stripped.index}")
        return formatted

    def calculate(self, enable_energy_costs_correction: bool, calculate_flex_cost: bool) -> tuple[FlexibilityKPIs, FlexibilityKPIs]:
        """
        Calculate the KPIs for the positive and negative flexibility.

        Args:
            enable_energy_costs_correction: Forwarded to the KPI calculation of both directions.
            calculate_flex_cost: Forwarded to the KPI calculation of both directions.

        Returns:
            positive KPIs, negative KPIs
        """
        # Inputs that are identical for both directions
        shared_inputs = dict(
            power_profile_base=self.power_profile_base,
            electricity_price_series=self.electricity_price_series,
            mpc_time_grid=self.mpc_time_grid,
            flex_offer_time_grid=self.flex_offer_time_grid,
            stored_energy_base=self.stored_energy_profile_base,
            enable_energy_costs_correction=enable_energy_costs_correction,
            calculate_flex_cost=calculate_flex_cost,
        )
        self.kpis_pos.calculate(
            power_profile_shadow=self.power_profile_flex_pos,
            stored_energy_shadow=self.stored_energy_profile_flex_pos,
            **shared_inputs,
        )
        self.kpis_neg.calculate(
            power_profile_shadow=self.power_profile_flex_neg,
            stored_energy_shadow=self.stored_energy_profile_flex_neg,
            **shared_inputs,
        )
        return self.kpis_pos, self.kpis_neg

    def get_kpis(self) -> dict[str, KPI]:
        """Return the KPIs of both directions in one dictionary keyed by their identifiers."""
        return self.kpis_pos.get_kpi_dict(identifier=True) | self.kpis_neg.get_kpi_dict(identifier=True)