Coverage for agentlib_flexquant/data_structures/flex_kpis.py: 91%
163 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-15 15:25 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-15 15:25 +0000
1import pydantic
2import numpy as np
3import pandas as pd
4from typing import Union, Optional
5from agentlib_mpc.utils import TimeConversionTypes, TIME_CONVERSION
6from agentlib_flexquant.data_structures.globals import FlexibilityDirections
7from agentlib_flexquant.utils.data_handling import strip_multi_index, fill_nans, MEAN
class KPI(pydantic.BaseModel):
    """Scalar flexibility indicator described by a name, a value, a unit and a direction."""

    # Every field defaults to None.
    name: str = pydantic.Field(default=None, description="Name of the flexibility KPI")
    value: Optional[float] = pydantic.Field(default=None, description="Value of the flexibility KPI")
    unit: str = pydantic.Field(default=None, description="Unit of the flexibility KPI")
    direction: Optional[FlexibilityDirections] = pydantic.Field(
        default=None, description="Direction of the shadow mpc / flexibility"
    )

    class Config:
        arbitrary_types_allowed = True

    def get_kpi_identifier(self):
        """Return the identifier of the KPI, built as ``<direction>_<name>``."""
        return f"{self.direction}_{self.name}"
class KPISeries(KPI):
    """KPI whose value is a series over time instead of a scalar.

    On top of the attributes of KPI, the time differences between consecutive
    index entries of the value are kept in ``dt`` so that the series can be
    integrated and averaged over time.
    """

    value: Union[pd.Series, None] = pydantic.Field(
        default=None,
        description="Value of the flexibility KPI",
    )
    dt: Union[pd.Series, None] = pydantic.Field(
        default=None,
        description="Time differences between the timestamps of the series in seconds",
    )

    def _get_dt(self) -> pd.Series:
        """Compute, store and return the time differences between the timestamps of the series.

        Each entry holds the distance from its own index label to the next one.
        The last entry has no successor and takes over the preceding difference.
        """
        dt = pd.Series(index=self.value.index, data=self.value.index).diff().shift(-1).ffill()
        self.dt = dt
        return dt

    def _ensure_dt(self) -> pd.Series:
        """Return time differences that match the current value, recomputing them when stale.

        ``dt`` is cached on the instance. If ``value`` has been reassigned with a
        different index since, multiplying the two would align on mismatched
        labels and silently produce NaN products, so ``dt`` is rebuilt.
        """
        stale = (
            self.dt is not None
            and self.value is not None
            and not self.dt.index.equals(self.value.index)
        )
        if self.dt is None or stale:
            self._get_dt()
        return self.dt

    def min(self) -> float:
        """Get the minimum of a KPISeries."""
        return self.value.min()

    def max(self) -> float:
        """Get the maximum of a KPISeries."""
        return self.value.max()

    def avg(self) -> float:
        """Calculate the average value of the KPISeries over time.

        Returns:
            The integral of the series divided by the sum of all time differences.

        """
        delta_t = self._ensure_dt().sum()
        return self.integrate() / delta_t

    def integrate(self, time_unit: TimeConversionTypes = "seconds") -> float:
        """Integrate the value of the KPISeries over time by summing up the product of values and the time difference.

        Args:
            time_unit: The time unit the integrated value should have; the products
                are divided by ``TIME_CONVERSION[time_unit]``.

        Returns:
            The integrated value of the KPISeries

        """
        dt = self._ensure_dt()
        products = self.value * dt / TIME_CONVERSION[time_unit]
        return products.sum()
class FlexibilityKPIs(pydantic.BaseModel):
    """Set of flexibility KPIs belonging to one flexibility direction."""

    # Direction
    direction: FlexibilityDirections = pydantic.Field(default=None, description="Direction of the shadow mpc")

    # Power / energy KPIs
    power_flex_full: KPISeries = pydantic.Field(
        default=KPISeries(name="power_flex_full", unit="kW"),
        description="Power flexibility",
    )
    power_flex_offer: KPISeries = pydantic.Field(
        default=KPISeries(name="power_flex_offer", unit="kW"),
        description="Power flexibility",
    )
    power_flex_offer_max: KPI = pydantic.Field(
        default=KPI(name="power_flex_offer_max", unit="kW"),
        description="Maximum power flexibility",
    )
    power_flex_offer_min: KPI = pydantic.Field(
        default=KPI(name="power_flex_offer_min", unit="kW"),
        description="Minimum power flexibility",
    )
    power_flex_offer_avg: KPI = pydantic.Field(
        default=KPI(name="power_flex_offer_avg", unit="kW"),
        description="Average power flexibility",
    )
    energy_flex: KPI = pydantic.Field(
        default=KPI(name="energy_flex", unit="kWh"),
        description="Energy flexibility equals the integral of the power flexibility",
    )
    power_flex_within_boundary: KPI = pydantic.Field(
        default=KPI(name="power_flex_within_boundary", unit="-"),
        description="Variable indicating whether the baseline power and flex power align at the horizon end",
    )

    # Costs KPIs
    electricity_costs_series: KPISeries = pydantic.Field(
        default=KPISeries(name="electricity_costs_series", unit="ct/h"),
        description="Costs of flexibility",
    )
    costs: KPI = pydantic.Field(
        default=KPI(name="costs", unit="ct"),
        description="Costs of flexibility",
    )
    corrected_costs: KPI = pydantic.Field(
        default=KPI(name="corrected_costs", unit="ct"),
        description="Corrected costs of flexibility considering the stored energy in the system",
    )
    costs_rel: KPI = pydantic.Field(
        default=KPI(name="costs_rel", unit="ct/kWh"),
        description="Costs of flexibility per energy",
    )
    corrected_costs_rel: KPI = pydantic.Field(
        default=KPI(name="corrected_costs_rel", unit="ct/kWh"),
        description="Corrected costs of flexibility per energy",
    )

    def __init__(self, direction: FlexibilityDirections, **data):
        """Build the KPI set and hand the direction down to every contained KPI.

        Args:
            direction: direction of the flexibility these KPIs describe
            **data: further field values forwarded to the pydantic constructor

        """
        super().__init__(**data)
        self.direction = direction
        # Tag every KPI found among the instance attributes with the direction.
        for attribute in vars(self).values():
            if not isinstance(attribute, KPI):
                continue
            attribute.direction = self.direction

    def calculate(
            self,
            power_profile_base: pd.Series,
            power_profile_shadow: pd.Series,
            electricity_price_series: pd.Series,
            mpc_time_grid: np.ndarray,
            flex_offer_time_grid: np.ndarray,
            stored_energy_base: pd.Series,
            stored_energy_shadow: pd.Series,
            enable_energy_costs_correction: bool,
            calculate_flex_cost: bool
    ):
        """Compute all KPIs from the power profiles and the electricity price profile.

        Args:
            power_profile_base: power profile from baseline mpc
            power_profile_shadow: power profile from shadow mpc
            electricity_price_series: time series of electricity prices
            mpc_time_grid: time grid over the MPC horizon with intervals of time_step
                (accepted but not used inside this method)
            flex_offer_time_grid: time grid over which the flexibility offer is calculated,
                used to slice the power flexibility profile
            stored_energy_base: time series of stored energy from baseline mpc
            stored_energy_shadow: time series of stored energy from shadow mpc
            enable_energy_costs_correction: whether the energy costs should be corrected
            calculate_flex_cost: whether the cost of the flexibility should be calculated

        """
        # Power / energy KPIs
        self._calculate_power_flex(
            power_profile_base=power_profile_base,
            power_profile_shadow=power_profile_shadow,
            flex_offer_time_grid=flex_offer_time_grid,
        )
        self._calculate_power_flex_stats()
        self._calculate_energy_flex()

        # Costs KPIs: the correction compares the stored energy at the last sample of both profiles
        stored_energy_diff = (
            stored_energy_shadow.values[-1] - stored_energy_base.values[-1]
            if enable_energy_costs_correction
            else 0
        )

        if not calculate_flex_cost:
            return
        self._calculate_costs(electricity_price_signal=electricity_price_series, stored_energy_diff=stored_energy_diff)
        self._calculate_costs_rel()

    def _calculate_power_flex(self, power_profile_base: pd.Series, power_profile_shadow: pd.Series,
                              flex_offer_time_grid: np.ndarray, relative_error_acceptance: float = 0.01):
        """Derive the power flexibility from the baseline and the shadow power profile.

        Args:
            power_profile_base: power profile from the baseline mpc
            power_profile_shadow: power profile from the shadow mpc
            flex_offer_time_grid: time grid over which the flexibility offer is calculated
            relative_error_acceptance: flexibility values whose magnitude relative to the baseline
                is below this threshold are set to zero

        Raises:
            ValueError: if the profile indices differ or the direction is neither
                "positive" nor "negative".

        """
        if not power_profile_shadow.index.equals(power_profile_base.index):
            raise ValueError(f"Indices of power profiles do not match.\n"
                             f"Baseline: {power_profile_base.index}\n"
                             f"Shadow: {power_profile_shadow.index}")

        # Positive flexibility is baseline minus shadow, negative flexibility the opposite
        direction = self.direction
        if direction == "positive":
            flexibility = power_profile_base - power_profile_shadow
        elif direction == "negative":
            flexibility = power_profile_shadow - power_profile_base
        else:
            raise ValueError(f"Direction of KPIs not properly defined: {self.direction}")

        # Suppress deviations that are small relative to the baseline
        negligible = flexibility.div(power_profile_base).abs() < relative_error_acceptance
        flexibility.loc[negligible] = 0

        # Store the full profile and the part inside the offer window
        offer_start, offer_end = flex_offer_time_grid[0], flex_offer_time_grid[-1]
        self.power_flex_full.value = flexibility
        self.power_flex_offer.value = flexibility.loc[offer_start:offer_end]

    def _calculate_power_flex_stats(self):
        """Determine maximum, minimum and average of the power flexibility offer.

        Raises:
            ValueError: if the power flexibility of the offer has not been set.

        """
        offer = self.power_flex_offer
        if offer.value is None:
            raise ValueError("Power flexibility value is empty.")

        # Evaluate all three statistics before any of them is stored
        maximum, minimum, average = offer.max(), offer.min(), offer.avg()

        self.power_flex_offer_max.value = maximum
        self.power_flex_offer_min.value = minimum
        self.power_flex_offer_avg.value = average

    def _calculate_energy_flex(self):
        """Integrate the power flexibility of the offer window (in hours) to get the energy flexibility.

        Raises:
            ValueError: if the power flexibility of the offer has not been set.

        """
        offer = self.power_flex_offer
        if offer.value is None:
            raise ValueError("Power flexibility value of the offer is empty.")

        self.energy_flex.value = offer.integrate(time_unit="hours")

    def _calculate_costs(self, electricity_price_signal: pd.Series, stored_energy_diff: float):
        """Compute the costs of the flexibility event and their stored-energy corrected variant.

        Args:
            electricity_price_signal: time series of the electricity price signal
            stored_energy_diff: the difference of the stored energy between baseline and shadow mpc

        """
        # Cost rate profile: price multiplied by the full power flexibility
        cost_series = self.electricity_costs_series
        cost_series.value = electricity_price_signal * self.power_flex_full.value

        # Absolute value of the cost rate integrated over hours
        total_costs = abs(cost_series.integrate(time_unit="hours"))

        # Value the stored energy difference with the mean price and subtract it
        total_costs_corrected = total_costs - stored_energy_diff * np.mean(electricity_price_signal)

        self.costs.value = total_costs
        self.corrected_costs.value = total_costs_corrected

    def _calculate_costs_rel(self):
        """Relate the costs and the corrected costs to the energy flexibility.

        Both relative costs are set to 0 if the energy flexibility is 0.
        """
        energy = self.energy_flex.value
        if energy == 0:
            relative = 0
            relative_corrected = 0
        else:
            relative = self.costs.value / energy
            relative_corrected = self.corrected_costs.value / energy

        self.costs_rel.value = relative
        self.corrected_costs_rel.value = relative_corrected

    def get_kpi_dict(self, identifier: bool = False) -> dict[str, KPI]:
        """Collect the KPIs of this set in a dictionary.

        Args:
            identifier: If True, the keys are the identifiers of the KPIs, otherwise the name of the KPI.

        Returns:
            A dictionary mapping the chosen KPI keys to the KPI objects.

        """
        return {
            (entry.get_kpi_identifier() if identifier else entry.name): entry
            for entry in vars(self).values()
            if isinstance(entry, KPI)
        }

    def get_name_dict(self) -> dict[str, str]:
        """Map KPI names to KPI identifiers.

        Returns:
            Dictionary of the kpis with names as keys and the identifiers as values.

        """
        return {
            kpi_name: kpi.get_kpi_identifier()
            for kpi_name, kpi in self.get_kpi_dict(identifier=False).items()
        }
class FlexibilityData(pydantic.BaseModel):
    """Class containing the data for the calculation of the flexibility.

    Holds the time grids, the power / stored energy / price profiles and one
    FlexibilityKPIs set per flexibility direction.
    """
    # Time parameters
    mpc_time_grid: np.ndarray = pydantic.Field(
        default=None,
        description="Time grid of the mpcs",
    )
    flex_offer_time_grid: np.ndarray = pydantic.Field(
        default=None,
        description="Time grid of the flexibility offer",
    )
    switch_time: Optional[float] = pydantic.Field(
        default=None,
        description="Time of the switch between the preparation and the market time",
    )

    # Profiles
    power_profile_base: pd.Series = pydantic.Field(
        default=None,
        description="Base power profile",
    )
    power_profile_flex_neg: pd.Series = pydantic.Field(
        default=None,
        description="Power profile of the negative flexibility",
    )
    power_profile_flex_pos: pd.Series = pydantic.Field(
        default=None,
        description="Power profile of the positive flexibility",
    )
    stored_energy_profile_base: pd.Series = pydantic.Field(
        default=None,
        description="Base profile of the stored electrical energy",
    )
    stored_energy_profile_flex_neg: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the stored electrical energy for negative flexibility",
    )
    stored_energy_profile_flex_pos: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the stored elctrical energy for positive flexibility",
    )
    electricity_price_series: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the electricity price",
    )

    # KPIs
    kpis_pos: FlexibilityKPIs = pydantic.Field(
        default=FlexibilityKPIs(direction="positive"),
        description="KPIs for positive flexibility",
    )
    kpis_neg: FlexibilityKPIs = pydantic.Field(
        default=FlexibilityKPIs(direction="negative"),
        description="KPIs for negative flexibility",
    )

    class Config:
        arbitrary_types_allowed = True

    def __init__(self, prep_time: int, market_time: int, flex_event_duration: int,
                 time_step: int, prediction_horizon: int, **data):
        """Create the data container and derive the time grids.

        Args:
            prep_time: preparation time; added to market_time to get the switch time
            market_time: market time; added to prep_time to get the switch time
            flex_event_duration: length of the flexibility offer grid, starting at the switch time
            time_step: spacing of both time grids
            prediction_horizon: number of steps of the mpc time grid
            **data: further field values forwarded to the pydantic constructor

        """
        super().__init__(**data)
        self.switch_time = prep_time + market_time
        # Both grids are half-open: the end value itself is not part of the grid
        self.flex_offer_time_grid = np.arange(self.switch_time, self.switch_time + flex_event_duration, time_step)
        self.mpc_time_grid = np.arange(0, prediction_horizon * time_step, time_step)

    def format_predictor_inputs(self, series: pd.Series) -> pd.Series:
        """Format the input of the predictor to unify the data.

        The index is shifted so that it starts at zero and the series is then
        reindexed to the mpc time grid. The passed series itself is left untouched.

        Args:
            series: Input series from a predictor.

        Returns:
            Formatted series.

        Raises:
            ValueError: if the series contains NaN values after reindexing to the mpc time grid.

        """
        # Work on a copy; assigning to the index of the argument would shift the caller's object
        shifted = series.copy()
        shifted.index = series.index - series.index[0]
        formatted = shifted.reindex(self.mpc_time_grid)
        if formatted.isna().any():
            # Report the index before reindexing, otherwise the message would show the grid twice
            raise ValueError(f"The mpc time grid is not compatible with the predictor "
                             f"input, which leads to NaN values in the series.\n"
                             f"MPC time grid:{self.mpc_time_grid}\n"
                             f"Series index:{shifted.index}")
        return formatted

    def format_mpc_inputs(self, series: pd.Series) -> pd.Series:
        """Format the input of the mpc to unify the data.

        The multi index is stripped, NaN values are filled with the MEAN method
        and the series is reindexed to the mpc time grid.

        Args:
            series: Input series from a mpc.

        Returns:
            Formatted series.

        Raises:
            ValueError: if the series contains NaN values after reindexing to the mpc time grid.

        """
        series = strip_multi_index(series)
        if series.isna().any():
            series = fill_nans(series=series, method=MEAN)
        formatted = series.reindex(self.mpc_time_grid)
        if formatted.isna().any():
            # Report the index before reindexing, otherwise the message would show the grid twice
            raise ValueError(f"The mpc time grid is not compatible with the mpc input, "
                             f"which leads to NaN values in the series.\n"
                             f"MPC time grid:{self.mpc_time_grid}\n"
                             f"Series index:{series.index}")
        return formatted

    def calculate(self, enable_energy_costs_correction: bool, calculate_flex_cost: bool):
        """Calculate the KPIs for the positive and negative flexibility.

        Args:
            enable_energy_costs_correction: whether the energy costs should be corrected
            calculate_flex_cost: whether the cost of the flexibility should be calculated

        """
        # Positive direction first, then negative; both share the baseline profiles and grids
        per_direction = (
            (self.kpis_pos, self.power_profile_flex_pos, self.stored_energy_profile_flex_pos),
            (self.kpis_neg, self.power_profile_flex_neg, self.stored_energy_profile_flex_neg),
        )
        for kpis, power_profile_shadow, stored_energy_shadow in per_direction:
            kpis.calculate(
                power_profile_base=self.power_profile_base,
                power_profile_shadow=power_profile_shadow,
                electricity_price_series=self.electricity_price_series,
                mpc_time_grid=self.mpc_time_grid,
                flex_offer_time_grid=self.flex_offer_time_grid,
                stored_energy_base=self.stored_energy_profile_base,
                stored_energy_shadow=stored_energy_shadow,
                enable_energy_costs_correction=enable_energy_costs_correction,
                calculate_flex_cost=calculate_flex_cost
            )

    def get_kpis(self) -> dict[str, KPI]:
        """Get the KPIs of both directions in one dictionary.

        Returns:
            Dictionary with the KPI identifiers as keys and the KPIs as values.

        """
        kpis_dict = self.kpis_pos.get_kpi_dict(identifier=True) | self.kpis_neg.get_kpi_dict(identifier=True)
        return kpis_dict