Coverage for agentlib_flexquant/data_structures/flex_kpis.py: 91%
191 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-10-20 14:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-10-20 14:09 +0000
1"""
2Module for representing and calculating flexibility KPIs. It defines Pydantic models
3for scalar and time-series KPIs, and provides methods to compute power, energy,
4and cost metrics for positive and negative flexibility scenarios.
5"""
6from typing import Optional, Union
8import numpy as np
9import pandas as pd
10import pydantic
11from agentlib_mpc.utils import TIME_CONVERSION, TimeConversionTypes
13from agentlib_flexquant.data_structures.globals import (
14 FlexibilityDirections,
15 LINEAR,
16 CONSTANT,
17 INTEGRATION_METHOD,
18)
19from agentlib_flexquant.utils.data_handling import MEAN, fill_nans, strip_multi_index
class KPI(pydantic.BaseModel):
    """Scalar flexibility indicator made up of a name, a value, a unit and a direction."""

    name: str = pydantic.Field(default=None, description="Name of the flexibility KPI")
    value: Optional[float] = pydantic.Field(
        default=None, description="Value of the flexibility KPI"
    )
    unit: str = pydantic.Field(default=None, description="Unit of the flexibility KPI")
    direction: Optional[FlexibilityDirections] = pydantic.Field(
        default=None,
        description="Direction of the shadow mpc / flexibility",
    )

    class Config:
        """Model configuration.

        Arbitrary (non-Pydantic) field types, e.g. pandas.Series or numpy.ndarray, are
        accepted without custom validators.
        """

        arbitrary_types_allowed = True

    def get_kpi_identifier(self):
        """Return the KPI identifier, built as ``<direction>_<name>``."""
        return f"{self.direction}_{self.name}"
class KPISeries(KPI):
    """Time-series flexibility KPI.

    Extends :class:`KPI` with a ``pandas.Series`` value, the forward time differences of
    the series index and the method used to integrate the series over its index.
    """

    value: Union[pd.Series, None] = pydantic.Field(
        default=None,
        description="Value of the flexibility KPI",
    )
    dt: Union[pd.Series, None] = pydantic.Field(
        default=None,
        description="Time differences between the timestamps of the series in seconds",
    )
    integration_method: INTEGRATION_METHOD = pydantic.Field(
        default=LINEAR, description="Method set to integrate series variable"
    )

    def _get_dt(self) -> pd.Series:
        """Compute the forward differences of the index of ``value`` and cache them in ``dt``.

        Returns:
            Series on the index of ``value``; entry ``i`` holds ``index[i + 1] - index[i]``
            and the last entry is zero.

        """
        index = self.value.index
        # compute forward differences between consecutive timestamps
        dt = pd.Series(index=index, data=index).diff().shift(-1)
        # the last sample has no successor, so its forward difference is defined as zero
        dt.iloc[-1] = 0
        self.dt = dt
        return dt

    def min(self) -> float:
        """Get the minimum of a KPISeries."""
        return self.value.min()

    def max(self) -> float:
        """Get the maximum of a KPISeries."""
        return self.value.max()

    def avg(self) -> float:
        """Calculate the time-weighted average of the KPISeries.

        The integral of the series is divided by the total time span, i.e. the sum of
        the forward time differences.

        Returns:
            The average value of the series over its index.

        """
        # A cached dt is only valid for the index it was computed on; recompute it if the
        # value has been replaced by a series with a different index in the meantime.
        if self.dt is None or not self.dt.index.equals(self.value.index):
            self._get_dt()
        delta_t = self.dt.sum()
        return self.integrate() / delta_t

    def integrate(self, time_unit: TimeConversionTypes = "seconds") -> float:
        """Integrate the value of the KPISeries over its index.

        Args:
            time_unit: The time unit the integrated value should have. The raw integral
                is divided by ``TIME_CONVERSION[time_unit]``.

        Returns:
            The integrated value of the KPISeries.

        Raises:
            ValueError: If ``integration_method`` is neither LINEAR nor CONSTANT.

        """
        if self.integration_method == LINEAR:
            # Linear integration: apply the trapezoidal rule, which assumes the function
            # changes linearly between sample points. ``np.trapz`` is deprecated since
            # NumPy 2.0 in favour of ``np.trapezoid``; use whichever is available.
            trapezoid = getattr(np, "trapezoid", None) or np.trapz
            return trapezoid(self.value.values, self.value.index) / TIME_CONVERSION[time_unit]
        if self.integration_method == CONSTANT:
            # Constant integration: hold each value constant over the interval up to the
            # next sample; the last sample spans no interval and is therefore excluded.
            return (
                np.sum(self.value.values[:-1] * self._get_dt().iloc[:-1])
                / TIME_CONVERSION[time_unit]
            )
        # Fail loudly instead of implicitly returning None for an unknown method
        raise ValueError(f"Unsupported integration method: {self.integration_method}")
class FlexibilityKPIs(pydantic.BaseModel):
    """Collection of the flexibility KPIs for one flexibility direction.

    The power, energy and cost KPIs are stored as fields and filled by :meth:`calculate`
    from the power profiles of the baseline and the shadow mpc.
    """

    # Direction
    direction: FlexibilityDirections = pydantic.Field(
        default=None, description="Direction of the shadow mpc"
    )

    # Power / energy KPIs
    power_flex_full: KPISeries = pydantic.Field(
        default=KPISeries(name="power_flex_full", unit="kW", integration_method=LINEAR),
        description="Power flexibility",
    )
    power_flex_offer: KPISeries = pydantic.Field(
        default=KPISeries(name="power_flex_offer", unit="kW", integration_method=LINEAR),
        description="Power flexibility",
    )
    power_flex_offer_max: KPI = pydantic.Field(
        default=KPI(name="power_flex_offer_max", unit="kW"),
        description="Maximum power flexibility",
    )
    power_flex_offer_min: KPI = pydantic.Field(
        default=KPI(name="power_flex_offer_min", unit="kW"),
        description="Minimum power flexibility",
    )
    power_flex_offer_avg: KPI = pydantic.Field(
        default=KPI(name="power_flex_offer_avg", unit="kW"),
        description="Average power flexibility",
    )
    energy_flex: KPI = pydantic.Field(
        default=KPI(name="energy_flex", unit="kWh"),
        description="Energy flexibility equals the integral of the power flexibility",
    )
    power_flex_within_boundary: KPI = pydantic.Field(
        default=KPI(name="power_flex_within_boundary", unit="-"),
        description=(
            "Variable indicating whether the baseline power and flex power "
            "align at the horizon end"
        ),
    )

    # Costs KPIs
    electricity_costs_series: KPISeries = pydantic.Field(
        default=KPISeries(name="electricity_costs_series", unit="ct/h", integration_method=LINEAR),
        description="Costs of flexibility",
    )
    costs: KPI = pydantic.Field(
        default=KPI(name="costs", unit="ct"),
        description="Costs of flexibility",
    )
    corrected_costs: KPI = pydantic.Field(
        default=KPI(name="corrected_costs", unit="ct"),
        description="Corrected costs of flexibility considering the stored energy in the system",
    )
    costs_rel: KPI = pydantic.Field(
        default=KPI(name="costs_rel", unit="ct/kWh"),
        description="Costs of flexibility per energy",
    )
    corrected_costs_rel: KPI = pydantic.Field(
        default=KPI(name="corrected_costs_rel", unit="ct/kWh"),
        description="Corrected costs of flexibility per energy",
    )

    def __init__(self, direction: FlexibilityDirections, **data):
        """Create the KPI collection and stamp ``direction`` onto every contained KPI.

        Args:
            direction: direction of the flexibility these KPIs describe
            **data: field values forwarded to the pydantic model

        """
        super().__init__(**data)
        self.direction = direction
        for kpi in vars(self).values():
            if isinstance(kpi, KPI):
                kpi.direction = self.direction

    def calculate(
        self,
        power_profile_base: pd.Series,
        power_profile_shadow: pd.Series,
        electricity_price_series: pd.Series,
        mpc_time_grid: np.ndarray,
        flex_offer_time_grid: np.ndarray,
        stored_energy_base: pd.Series,
        stored_energy_shadow: pd.Series,
        enable_energy_costs_correction: bool,
        calculate_flex_cost: bool,
        integration_method: INTEGRATION_METHOD,
        collocation_time_grid: list = None,
    ):
        """Calculate the KPIs based on the power and electricity price input profiles.

        Args:
            power_profile_base: power profile from baseline mpc
            power_profile_shadow: power profile from shadow mpc
            electricity_price_series: time series of electricity prices
            mpc_time_grid: the MPC time grid over the horizon
            flex_offer_time_grid: time grid over which the flexibility offer is calculated,
                for indexing of the power flexibility profiles
            stored_energy_base: time series of stored energy from baseline mpc
            stored_energy_shadow: time series of stored energy from shadow mpc
            enable_energy_costs_correction: whether the energy costs should be corrected
            calculate_flex_cost: whether the cost of the flexibility should be calculated
            integration_method: method used for integration of KPISeries e.g. linear, constant
            collocation_time_grid: Time grid of the mpc output with collocation discretization;
                None means that no collocation points have to be removed

        """
        # Power / energy KPIs
        self._calculate_power_flex(
            power_profile_base=power_profile_base,
            power_profile_shadow=power_profile_shadow,
            flex_offer_time_grid=flex_offer_time_grid,
            integration_method=integration_method,
        )
        self._calculate_power_flex_stats(
            mpc_time_grid=mpc_time_grid, collocation_time_grid=collocation_time_grid
        )
        self._calculate_energy_flex(
            mpc_time_grid=mpc_time_grid, collocation_time_grid=collocation_time_grid
        )

        # Costs KPIs
        if enable_energy_costs_correction:
            # only the stored energy at the end of the horizon enters the correction
            stored_energy_diff = stored_energy_shadow.values[-1] - stored_energy_base.values[-1]
        else:
            stored_energy_diff = 0

        if calculate_flex_cost:
            self._calculate_costs(
                electricity_price_signal=electricity_price_series,
                stored_energy_diff=stored_energy_diff,
                integration_method=integration_method,
                mpc_time_grid=mpc_time_grid,
                collocation_time_grid=collocation_time_grid,
            )
            self._calculate_costs_rel()

    def _calculate_power_flex(
        self,
        power_profile_base: pd.Series,
        power_profile_shadow: pd.Series,
        flex_offer_time_grid: np.ndarray,
        integration_method: INTEGRATION_METHOD,
        relative_error_acceptance: float = 0.01,
    ):
        """Calculate the power flexibility based on the base and flexibility power profiles.

        Args:
            power_profile_base: power profile from the baseline mpc
            power_profile_shadow: power profile from the shadow mpc
            flex_offer_time_grid: time grid over which the flexibility offer is calculated
            integration_method: method used for integration of KPISeries e.g. linear, constant
            relative_error_acceptance: threshold for the relative error between the baseline
                and shadow mpc to set the power flexibility to zero

        Raises:
            ValueError: If the indices of both profiles differ or the direction is neither
                "positive" nor "negative".

        """
        if not power_profile_shadow.index.equals(power_profile_base.index):
            raise ValueError(
                f"Indices of power profiles do not match.\n"
                f"Baseline: {power_profile_base.index}\n"
                f"Shadow: {power_profile_shadow.index}"
            )

        # Calculate flexibility
        if self.direction == "positive":
            power_flex = power_profile_base - power_profile_shadow
        elif self.direction == "negative":
            power_flex = power_profile_shadow - power_profile_base
        else:
            raise ValueError(f"Direction of KPIs not properly defined: {self.direction}")

        # Set values to zero if the difference is small
        relative_difference = (power_flex / power_profile_base).abs()
        power_flex.loc[relative_difference < relative_error_acceptance] = 0
        # Set the first value of power_flex to zero, since it comes from the measurement/simulator
        # and is the same for baseline and shadow mpcs.
        # For quantification of flexibility, only power difference is of interest.
        power_flex.iloc[0] = 0

        # Set values
        self.power_flex_full.value = power_flex
        self.power_flex_offer.value = power_flex.loc[
            flex_offer_time_grid[0] : flex_offer_time_grid[-1]
        ]

        # Set integration method
        self.power_flex_full.integration_method = integration_method
        self.power_flex_offer.integration_method = integration_method

    def _drop_collocation_points(
        self, series: pd.Series, collocation_time_grid: Optional[list] = None
    ) -> pd.Series:
        """Return ``series`` without the labels listed in ``collocation_time_grid``.

        ``Series.drop(None, errors="ignore")`` raises a ValueError, so a missing grid is
        mapped to an empty label list, which drops nothing.
        """
        labels = [] if collocation_time_grid is None else collocation_time_grid
        return series.drop(labels, errors="ignore")

    def _prepare_for_integration(
        self,
        series: KPISeries,
        mpc_time_grid: np.ndarray,
        collocation_time_grid: Optional[list] = None,
    ) -> KPISeries:
        """Return a deep copy of ``series`` whose value is ready for integration.

        The value of the copy is sampled via ``_get_series_for_integration`` and stripped of
        the collocation points; ``series`` itself keeps its original value.
        """
        prepared = series.model_copy(deep=True)
        prepared.value = self._drop_collocation_points(
            self._get_series_for_integration(series=prepared, mpc_time_grid=mpc_time_grid),
            collocation_time_grid,
        )
        return prepared

    def _calculate_power_flex_stats(
        self, mpc_time_grid: np.ndarray, collocation_time_grid: list = None
    ):
        """Calculate the characteristic values of the power flexibility for the offer.

        Args:
            mpc_time_grid: the MPC time grid over the horizon
            collocation_time_grid: Time grid of the mpc output with collocation discretization

        Raises:
            ValueError: If the power flexibility of the offer has not been calculated yet.

        """
        if self.power_flex_offer.value is None:
            raise ValueError("Power flexibility value is empty.")

        # max and min of power flex offer: the last sample of the offer window is excluded
        # and collocation points are removed
        power_flex_offer = self._drop_collocation_points(
            self.power_flex_offer.value.iloc[:-1], collocation_time_grid
        )
        power_flex_offer_max = power_flex_offer.max()
        power_flex_offer_min = power_flex_offer.min()

        # Average of the power flex offer, calculated on a copy so that the stored offer
        # series keeps its original value
        power_flex_offer_avg = self._prepare_for_integration(
            self.power_flex_offer, mpc_time_grid, collocation_time_grid
        ).avg()

        # Set values
        self.power_flex_offer_max.value = power_flex_offer_max
        self.power_flex_offer_min.value = power_flex_offer_min
        self.power_flex_offer_avg.value = power_flex_offer_avg

    def _get_series_for_integration(
        self, series: KPISeries, mpc_time_grid: np.ndarray
    ) -> pd.Series:
        """Return the KPISeries value sampled on the MPC time grid when the integration method is
        constant.

        Otherwise, the original value is returned.

        Args:
            series: the KPISeries to get value from
            mpc_time_grid: the MPC time grid over the horizon

        """
        if series.integration_method == CONSTANT:
            # grid points without a sample become NaN through reindex and are discarded
            return series.value.reindex(mpc_time_grid).dropna()
        return series.value

    def _calculate_energy_flex(self, mpc_time_grid, collocation_time_grid: list = None):
        """Calculate the energy flexibility by integrating the power flexibility
        of the offer window.

        Args:
            mpc_time_grid: the MPC time grid over the horizon
            collocation_time_grid: Time grid of the mpc output with collocation discretization

        Raises:
            ValueError: If the power flexibility of the offer has not been calculated yet.

        """
        if self.power_flex_offer.value is None:
            raise ValueError("Power flexibility value of the offer is empty.")

        # Integrate a copy so that the stored offer series keeps its original value
        energy_flex = self._prepare_for_integration(
            self.power_flex_offer, mpc_time_grid, collocation_time_grid
        ).integrate(time_unit="hours")

        # Set value
        self.energy_flex.value = energy_flex

    def _calculate_costs(
        self,
        electricity_price_signal: pd.Series,
        stored_energy_diff: float,
        integration_method: INTEGRATION_METHOD,
        mpc_time_grid: np.ndarray,
        collocation_time_grid: list = None,
    ):
        """Calculate the costs of the flexibility event based on the electricity costs profile,
        the power flexibility profile and difference of stored energy.

        Args:
            electricity_price_signal: time series of the electricity price signal
            stored_energy_diff: the difference of the stored energy between baseline and shadow mpc
            integration_method: the integration method used to integrate KPISeries
            mpc_time_grid: the MPC time grid over the horizon
            collocation_time_grid: Time grid of the mpc output with collocation discretization

        """
        # Set integration method
        self.power_flex_full.integration_method = integration_method
        self.electricity_costs_series.integration_method = integration_method

        # Costs series: price times the power flexibility prepared for integration; time
        # stamps present in only one of the two series yield NaN and are dropped
        power_flex_full_integration = self._prepare_for_integration(
            self.power_flex_full, mpc_time_grid, collocation_time_grid
        )
        self.electricity_costs_series.value = (
            electricity_price_signal * power_flex_full_integration.value
        ).dropna()

        # Calculate the costs
        costs = abs(self.electricity_costs_series.integrate(time_unit="hours"))

        # correct the costs by the stored energy difference valued at the mean price
        corrected_costs = costs - stored_energy_diff * np.mean(electricity_price_signal)

        self.costs.value = costs
        self.corrected_costs.value = corrected_costs

    def _calculate_costs_rel(self):
        """Calculate the relative costs of the flexibility event per energy flexibility.

        Both relative costs are set to zero when the energy flexibility is zero.
        """
        if self.energy_flex.value == 0:
            costs_rel = 0
            corrected_costs_rel = 0
        else:
            costs_rel = self.costs.value / self.energy_flex.value
            corrected_costs_rel = self.corrected_costs.value / self.energy_flex.value

        # Set value
        self.costs_rel.value = costs_rel
        self.corrected_costs_rel.value = corrected_costs_rel

    def get_kpi_dict(self, identifier: bool = False) -> dict[str, KPI]:
        """Get the KPIs as a dictionary with names or identifier as keys.

        Args:
            identifier: If True, the keys are the identifiers of the KPIs,
                otherwise the name of the KPI.

        Returns:
            A dictionary mapping desired KPI keys to KPI.

        """
        kpis = [kpi for kpi in vars(self).values() if isinstance(kpi, KPI)]
        if identifier:
            return {kpi.get_kpi_identifier(): kpi for kpi in kpis}
        return {kpi.name: kpi for kpi in kpis}

    def get_name_dict(self) -> dict[str, str]:
        """Get KPIs mapping.

        Returns:
            Dictionary of the kpis with names as keys and the identifiers as values.

        """
        return {
            name: kpi.get_kpi_identifier()
            for name, kpi in self.get_kpi_dict(identifier=False).items()
        }
class FlexibilityData(pydantic.BaseModel):
    """Class containing the data for the calculation of the flexibility.

    It holds the time grids, the power / stored energy / price profiles and one
    :class:`FlexibilityKPIs` collection per flexibility direction.
    """

    # Time parameters
    mpc_time_grid: np.ndarray = pydantic.Field(
        default=None,
        description="Time grid of the mpcs",
    )
    flex_offer_time_grid: np.ndarray = pydantic.Field(
        default=None,
        description="Time grid of the flexibility offer",
    )
    switch_time: Optional[float] = pydantic.Field(
        default=None,
        description="Time of the switch between the preparation and the market time",
    )

    # Profiles
    power_profile_base: pd.Series = pydantic.Field(
        default=None,
        description="Base power profile",
    )
    power_profile_flex_neg: pd.Series = pydantic.Field(
        default=None,
        description="Power profile of the negative flexibility",
    )
    power_profile_flex_pos: pd.Series = pydantic.Field(
        default=None,
        description="Power profile of the positive flexibility",
    )
    stored_energy_profile_base: pd.Series = pydantic.Field(
        default=None,
        description="Base profile of the stored electrical energy",
    )
    stored_energy_profile_flex_neg: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the stored electrical energy for negative flexibility",
    )
    stored_energy_profile_flex_pos: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the stored elctrical energy for positive flexibility",
    )
    electricity_price_series: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the electricity price",
    )

    # KPIs
    kpis_pos: FlexibilityKPIs = pydantic.Field(
        default=FlexibilityKPIs(direction="positive"),
        description="KPIs for positive flexibility",
    )
    kpis_neg: FlexibilityKPIs = pydantic.Field(
        default=FlexibilityKPIs(direction="negative"),
        description="KPIs for negative flexibility",
    )

    class Config:
        """Allow arbitrary (non-Pydantic) types such as pandas.Series or numpy.ndarray
        in model fields without requiring custom validators."""

        arbitrary_types_allowed = True

    def __init__(
        self,
        prep_time: int,
        market_time: int,
        flex_event_duration: int,
        time_step: int,
        prediction_horizon: int,
        **data,
    ):
        """Create the data container and derive the time grids.

        All time arguments are presumably given in seconds -- TODO confirm against callers.

        Args:
            prep_time: preparation time; together with ``market_time`` it defines the
                switch time
            market_time: market time; together with ``prep_time`` it defines the switch time
            flex_event_duration: duration of the flexibility event, i.e. the length of the
                flexibility offer time grid starting at the switch time
            time_step: step size of both time grids
            prediction_horizon: number of time steps of the mpc time grid
            **data: field values forwarded to the pydantic model

        """
        super().__init__(**data)
        self.switch_time = prep_time + market_time
        # both grids include their end point, hence the additional time_step in the stop value
        self.flex_offer_time_grid = np.arange(
            self.switch_time, self.switch_time + flex_event_duration + time_step, time_step
        )
        self.mpc_time_grid = np.arange(0, prediction_horizon * time_step + time_step, time_step)
        self._common_time_grid = None  # Initialize common time grid

    def unify_inputs(self, series: pd.Series, mpc=True) -> pd.Series:
        """Format the input of the mpc to unify the data.

        The returned series has a value at every point of the mpc time grid. Note that
        the passed series may be modified in place (index shift, first value).

        Args:
            series: Input series from a mpc.
            mpc: True if the series stems from a mpc. If False (e.g. a price signal), the
                index is shifted to start at zero and gaps are forward-filled.

        Returns:
            Formatted series.

        Raises:
            ValueError: If the series still contains NaN values after the first sample.

        """
        if not mpc:
            # shift the index so that it starts at zero
            series.index = series.index - series.index[0]

        # Ensure series has values at mpc_time_grid points
        mpc_points_in_series = np.isin(self.mpc_time_grid, series.index)
        if not mpc_points_in_series.all():
            # Create a temp series with all mpc points
            temp_series = pd.Series(index=pd.Index(self.mpc_time_grid), dtype=series.dtype)
            # Merge with original series
            merged_series = pd.concat([series, temp_series])
            # Remove duplicates keeping the original values
            merged_series = merged_series[~merged_series.index.duplicated(keep="first")]
            # Sort by index
            series = merged_series.sort_index()

        # Fill NaNs
        if mpc:
            # only fill NaN if there is NaN except for the first value; the first sample
            # is skipped by position so that the check does not depend on the index labels
            if series.iloc[1:].isna().any():
                series = fill_nans(series=series, method=MEAN)
            # ensure the first value is nan, since it is calculated with the state from the
            # controlled system and thus the same for baseline and shadow mpcs
            series.iloc[0] = np.nan
        else:
            series = series.ffill()  # price signals are typically steps

        # Check for NaNs (the first sample is allowed to be NaN, see above)
        if series.iloc[1:].isna().any():
            raise ValueError(
                f"The mpc time grid is not compatible with the mpc input "
                f"provided for kpi calculation, "
                f"which leads to NaN values in the series.\n"
                f"MPC time grid:{self.mpc_time_grid}\n"
                f"Series index:{series.index} \n"
                f"Check time steps of the mpcs as well as casadi simulator "
                f"step sizes."
            )
        return series

    def calculate(
        self,
        enable_energy_costs_correction: bool,
        calculate_flex_cost: bool,
        integration_method: INTEGRATION_METHOD,
        collocation_time_grid: list = None,
    ):
        """Calculate the KPIs for the positive and negative flexibility.

        Args:
            enable_energy_costs_correction: whether the energy costs should be corrected
            calculate_flex_cost: whether the cost of the flexibility should be calculated
            integration_method: method used for integration of KPISeries e.g. linear, constant
            collocation_time_grid: Time grid of the mpc output with collocation discretization

        Returns:
            Tuple of the KPIs for the positive and the negative flexibility.

        """
        # positive flexibility first, then negative; both share the baseline profiles
        scenarios = (
            (self.kpis_pos, self.power_profile_flex_pos, self.stored_energy_profile_flex_pos),
            (self.kpis_neg, self.power_profile_flex_neg, self.stored_energy_profile_flex_neg),
        )
        for kpis, power_profile_shadow, stored_energy_shadow in scenarios:
            kpis.calculate(
                power_profile_base=self.power_profile_base,
                power_profile_shadow=power_profile_shadow,
                electricity_price_series=self.electricity_price_series,
                mpc_time_grid=self.mpc_time_grid,
                flex_offer_time_grid=self.flex_offer_time_grid,
                stored_energy_base=self.stored_energy_profile_base,
                stored_energy_shadow=stored_energy_shadow,
                enable_energy_costs_correction=enable_energy_costs_correction,
                calculate_flex_cost=calculate_flex_cost,
                integration_method=integration_method,
                collocation_time_grid=collocation_time_grid,
            )
        self.reset_time_grid()
        return self.kpis_pos, self.kpis_neg

    def get_kpis(self) -> dict[str, KPI]:
        """Return combined KPIs from positive and negative flexibility scenarios."""
        kpis_dict = self.kpis_pos.get_kpi_dict(identifier=True) | self.kpis_neg.get_kpi_dict(
            identifier=True
        )
        return kpis_dict

    def reset_time_grid(self):
        """
        Reset the common time grid.
        This should be called between different flexibility calculations.
        """
        self._common_time_grid = None