Source code for agentlib_flexquant.data_structures.flex_kpis

import pydantic
import numpy as np
import pandas as pd
from typing import Union, Optional
from agentlib_mpc.utils import TimeConversionTypes, TIME_CONVERSION
from agentlib_flexquant.data_structures.globals import FlexibilityDirections
from agentlib_flexquant.utils.data_handling import strip_multi_index, fill_nans, MEAN


class KPI(pydantic.BaseModel):
    """Scalar flexibility indicator consisting of a name, value, unit and direction."""

    name: str = pydantic.Field(
        default=None,
        description="Name of the flexibility KPI",
    )
    value: Optional[float] = pydantic.Field(
        default=None,
        description="Value of the flexibility KPI",
    )
    unit: str = pydantic.Field(
        default=None,
        description="Unit of the flexibility KPI",
    )
    direction: Optional[FlexibilityDirections] = pydantic.Field(
        default=None,
        description="Direction of the shadow mpc / flexibility",
    )

    class Config:
        # Subclasses store pandas objects, which pydantic cannot validate natively.
        arbitrary_types_allowed = True

    def get_kpi_identifier(self):
        """Return the KPI identifier in the form ``"<direction>_<name>"``."""
        return f"{self.direction}_{self.name}"
class KPISeries(KPI):
    """KPI whose value is a series indexed by time instead of a scalar.

    The time differences between consecutive index entries (``dt``) are derived
    lazily from the index of ``value`` and cached. The cache is refreshed
    whenever ``value`` carries a different index than the cached ``dt``, so
    reassigning ``value`` never leads to statistics based on stale intervals.
    """

    value: Union[pd.Series, None] = pydantic.Field(
        default=None,
        description="Value of the flexibility KPI",
    )
    dt: Union[pd.Series, None] = pydantic.Field(
        default=None,
        description="Time differences between the timestamps of the series in seconds",
    )

    def _get_dt(self) -> pd.Series:
        """Compute, cache and return the time differences of the value index.

        Each entry is the distance from a timestamp to the following one. The
        last timestamp has no successor, so it reuses the preceding interval.

        Returns:
            Series of time differences sharing the index of ``value``.
        """
        timestamps = pd.Series(index=self.value.index, data=self.value.index)
        dt = timestamps.diff().shift(-1).ffill()
        self.dt = dt
        return dt

    def _current_dt(self) -> pd.Series:
        """Return time differences that are guaranteed to match the value index.

        The cached ``dt`` is reused only if its index equals the index of
        ``value``; otherwise it is recomputed. Without this check a reassigned
        ``value`` would be multiplied with intervals of a previous series.
        """
        if self.dt is None or not self.dt.index.equals(self.value.index):
            self._get_dt()
        return self.dt

    def min(self) -> float:
        """Get the minimum of a KPISeries."""
        return self.value.min()

    def max(self) -> float:
        """Get the maximum of a KPISeries."""
        return self.value.max()

    def avg(self) -> float:
        """Calculate the time-weighted average of the KPISeries.

        Returns:
            The integral of the series divided by the summed time differences.
        """
        delta_t = self._current_dt().sum()
        return self.integrate() / delta_t

    def integrate(self, time_unit: TimeConversionTypes = "seconds") -> float:
        """Integrate the KPISeries over time.

        The integral is the sum of each value multiplied by the time difference
        to the next timestamp.

        Args:
            time_unit: The time unit the integrated value should have.

        Returns:
            The integrated value of the KPISeries.
        """
        dt = self._current_dt()
        products = self.value * dt / TIME_CONVERSION[time_unit]
        return products.sum()
class FlexibilityKPIs(pydantic.BaseModel):
    """Collection of all flexibility KPIs for a single flexibility direction."""

    # --- direction -----------------------------------------------------------
    direction: FlexibilityDirections = pydantic.Field(
        default=None,
        description="Direction of the shadow mpc",
    )

    # --- power and energy ----------------------------------------------------
    power_flex_full: KPISeries = pydantic.Field(
        default=KPISeries(name="power_flex_full", unit="kW"),
        description="Power flexibility",
    )
    power_flex_offer: KPISeries = pydantic.Field(
        default=KPISeries(name="power_flex_offer", unit="kW"),
        description="Power flexibility",
    )
    power_flex_offer_max: KPI = pydantic.Field(
        default=KPI(name="power_flex_offer_max", unit="kW"),
        description="Maximum power flexibility",
    )
    power_flex_offer_min: KPI = pydantic.Field(
        default=KPI(name="power_flex_offer_min", unit="kW"),
        description="Minimum power flexibility",
    )
    power_flex_offer_avg: KPI = pydantic.Field(
        default=KPI(name="power_flex_offer_avg", unit="kW"),
        description="Average power flexibility",
    )
    energy_flex: KPI = pydantic.Field(
        default=KPI(name="energy_flex", unit="kWh"),
        description="Energy flexibility equals the integral of the power flexibility",
    )
    power_flex_within_boundary: KPI = pydantic.Field(
        default=KPI(name="power_flex_within_boundary", unit="-"),
        description="Variable indicating whether the baseline power and flex power align at the horizon end",
    )

    # --- costs ---------------------------------------------------------------
    electricity_costs_series: KPISeries = pydantic.Field(
        default=KPISeries(name="electricity_costs_series", unit="ct/h"),
        description="Costs of flexibility",
    )
    costs: KPI = pydantic.Field(
        default=KPI(name="costs", unit="ct"),
        description="Costs of flexibility",
    )
    corrected_costs: KPI = pydantic.Field(
        default=KPI(name="corrected_costs", unit="ct"),
        description="Corrected costs of flexibility considering the stored energy in the system",
    )
    costs_rel: KPI = pydantic.Field(
        default=KPI(name="costs_rel", unit="ct/kWh"),
        description="Costs of flexibility per energy",
    )
    corrected_costs_rel: KPI = pydantic.Field(
        default=KPI(name="corrected_costs_rel", unit="ct/kWh"),
        description="Corrected costs of flexibility per energy",
    )

    def __init__(self, direction: FlexibilityDirections, **data):
        """Create the KPI set and stamp the direction onto every contained KPI."""
        super().__init__(**data)
        self.direction = direction
        for attribute in vars(self).values():
            if not isinstance(attribute, KPI):
                continue
            attribute.direction = self.direction

    def calculate(
            self,
            power_profile_base: pd.Series,
            power_profile_shadow: pd.Series,
            electricity_price_series: pd.Series,
            mpc_time_grid: np.ndarray,
            flex_offer_time_grid: np.ndarray,
            stored_energy_base: pd.Series,
            stored_energy_shadow: pd.Series,
            enable_energy_costs_correction: bool,
            calculate_flex_cost: bool
    ):
        """Compute all KPIs from the power, stored-energy and price profiles.

        Args:
            power_profile_base: power profile from baseline mpc
            power_profile_shadow: power profile from shadow mpc
            electricity_price_series: time series of electricity prices
            mpc_time_grid: time grid over the MPC horizon with intervals of time_step
                (accepted for interface compatibility, not read by this method)
            flex_offer_time_grid: time grid over which the flexibility offer is
                calculated, for indexing of the power flexibility profiles
            stored_energy_base: time series of stored energy from baseline mpc
            stored_energy_shadow: time series of stored energy from shadow mpc
            enable_energy_costs_correction: whether the energy costs should be corrected
            calculate_flex_cost: whether the cost of the flexibility should be calculated
        """
        # Power and energy indicators
        self._calculate_power_flex(
            power_profile_base=power_profile_base,
            power_profile_shadow=power_profile_shadow,
            flex_offer_time_grid=flex_offer_time_grid,
        )
        self._calculate_power_flex_stats()
        self._calculate_energy_flex()

        # Cost indicators: the correction compares the stored energy at the
        # final sample of both profiles.
        stored_energy_diff = (
            stored_energy_shadow.values[-1] - stored_energy_base.values[-1]
            if enable_energy_costs_correction
            else 0
        )

        if calculate_flex_cost:
            self._calculate_costs(
                electricity_price_signal=electricity_price_series,
                stored_energy_diff=stored_energy_diff,
            )
            self._calculate_costs_rel()

    def _calculate_power_flex(self, power_profile_base: pd.Series, power_profile_shadow: pd.Series,
                              flex_offer_time_grid: np.ndarray, relative_error_acceptance: float = 0.01):
        """Derive the power flexibility from the baseline and shadow power profiles.

        Args:
            power_profile_base: power profile from the baseline mpc
            power_profile_shadow: power profile from the shadow mpc
            flex_offer_time_grid: time grid over which the flexibility offer is calculated
            relative_error_acceptance: threshold for the relative error between the
                baseline and shadow mpc to set the power flexibility to zero

        Raises:
            ValueError: if the profile indices differ or the direction is neither
                "positive" nor "negative".
        """
        if not power_profile_shadow.index.equals(power_profile_base.index):
            raise ValueError(f"Indices of power profiles do not match.\n"
                             f"Baseline: {power_profile_base.index}\n"
                             f"Shadow: {power_profile_shadow.index}")

        # The sign convention depends on the direction of the flexibility.
        if self.direction == "positive":
            flex = power_profile_base - power_profile_shadow
        elif self.direction == "negative":
            flex = power_profile_shadow - power_profile_base
        else:
            raise ValueError(f"Direction of KPIs not properly defined: {self.direction}")

        # Deviations that are small relative to the baseline are set to zero.
        negligible = (flex / power_profile_base).abs() < relative_error_acceptance
        flex.loc[negligible] = 0

        offer_start, offer_end = flex_offer_time_grid[0], flex_offer_time_grid[-1]
        self.power_flex_full.value = flex
        self.power_flex_offer.value = flex.loc[offer_start:offer_end]

    def _calculate_power_flex_stats(self):
        """Store maximum, minimum and average of the offered power flexibility.

        Raises:
            ValueError: if the offered power flexibility has not been set yet.
        """
        offer = self.power_flex_offer
        if offer.value is None:
            raise ValueError("Power flexibility value is empty.")

        # Evaluate all statistics before storing any of them.
        maximum, minimum, average = offer.max(), offer.min(), offer.avg()

        self.power_flex_offer_max.value = maximum
        self.power_flex_offer_min.value = minimum
        self.power_flex_offer_avg.value = average

    def _calculate_energy_flex(self):
        """Store the energy flexibility as the hourly integral of the offered power flexibility.

        Raises:
            ValueError: if the offered power flexibility has not been set yet.
        """
        if self.power_flex_offer.value is None:
            raise ValueError("Power flexibility value of the offer is empty.")

        self.energy_flex.value = self.power_flex_offer.integrate(time_unit="hours")

    def _calculate_costs(self, electricity_price_signal: pd.Series, stored_energy_diff: float):
        """Compute the absolute and the corrected costs of the flexibility event.

        Args:
            electricity_price_signal: time series of the electricity price signal
            stored_energy_diff: the difference of the stored energy between
                baseline and shadow mpc
        """
        # Cost rate over time: price multiplied by the full power flexibility.
        self.electricity_costs_series.value = electricity_price_signal * self.power_flex_full.value

        total_costs = abs(self.electricity_costs_series.integrate(time_unit="hours"))
        # The stored-energy difference is valued at the mean electricity price.
        energy_credit = stored_energy_diff * np.mean(electricity_price_signal)
        corrected = total_costs - energy_credit

        self.costs.value = total_costs
        self.corrected_costs.value = corrected

    def _calculate_costs_rel(self):
        """Compute the (corrected) costs per unit of energy flexibility.

        Both relative costs are set to zero when the energy flexibility is zero.
        """
        energy = self.energy_flex.value
        if energy == 0:
            relative, relative_corrected = 0, 0
        else:
            relative = self.costs.value / energy
            relative_corrected = self.corrected_costs.value / energy

        self.costs_rel.value = relative
        self.corrected_costs_rel.value = relative_corrected

    def get_kpi_dict(self, identifier: bool = False) -> dict[str, KPI]:
        """Collect all KPIs of this object in a dictionary.

        Args:
            identifier: If True, the keys are the identifiers of the KPIs,
                otherwise the name of the KPI.

        Returns:
            A dictionary mapping desired KPI keys to KPI.
        """
        kpis = (attribute for attribute in vars(self).values() if isinstance(attribute, KPI))
        if identifier:
            return {kpi.get_kpi_identifier(): kpi for kpi in kpis}
        return {kpi.name: kpi for kpi in kpis}

    def get_name_dict(self) -> dict[str, str]:
        """Map every KPI name to its identifier.

        Returns:
            Dictionary of the kpis with names as keys and the identifiers as values.
        """
        return {
            name: kpi.get_kpi_identifier()
            for name, kpi in self.get_kpi_dict(identifier=False).items()
        }
class FlexibilityData(pydantic.BaseModel):
    """Container for the profiles and time grids used to calculate the flexibility.

    The time grids and the switch time are derived in ``__init__`` from the
    timing parameters; the profiles are expected to be assigned afterwards.
    """

    # Time parameters
    mpc_time_grid: np.ndarray = pydantic.Field(
        default=None,
        description="Time grid of the mpcs",
    )
    flex_offer_time_grid: np.ndarray = pydantic.Field(
        default=None,
        description="Time grid of the flexibility offer",
    )
    switch_time: Optional[float] = pydantic.Field(
        default=None,
        description="Time of the switch between the preparation and the market time",
    )

    # Profiles
    power_profile_base: pd.Series = pydantic.Field(
        default=None,
        description="Base power profile",
    )
    power_profile_flex_neg: pd.Series = pydantic.Field(
        default=None,
        description="Power profile of the negative flexibility",
    )
    power_profile_flex_pos: pd.Series = pydantic.Field(
        default=None,
        description="Power profile of the positive flexibility",
    )
    stored_energy_profile_base: pd.Series = pydantic.Field(
        default=None,
        description="Base profile of the stored electrical energy",
    )
    stored_energy_profile_flex_neg: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the stored electrical energy for negative flexibility",
    )
    stored_energy_profile_flex_pos: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the stored elctrical energy for positive flexibility",
    )
    electricity_price_series: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the electricity price",
    )

    # KPIs
    kpis_pos: FlexibilityKPIs = pydantic.Field(
        default=FlexibilityKPIs(direction="positive"),
        description="KPIs for positive flexibility",
    )
    kpis_neg: FlexibilityKPIs = pydantic.Field(
        default=FlexibilityKPIs(direction="negative"),
        description="KPIs for negative flexibility",
    )

    class Config:
        # numpy arrays and pandas series are stored without pydantic validation.
        arbitrary_types_allowed = True

    def __init__(self, prep_time: int, market_time: int, flex_event_duration: int,
                 time_step: int, prediction_horizon: int, **data):
        """Initialise the model and derive the switch time and both time grids.

        Args:
            prep_time: duration of the preparation phase
            market_time: duration of the market phase
            flex_event_duration: duration of the flexibility event
            time_step: spacing of both time grids
            prediction_horizon: number of steps of the mpc time grid
            **data: further field values forwarded to pydantic
        """
        super().__init__(**data)
        self.switch_time = prep_time + market_time
        # The offer starts at the switch time and spans the flexibility event.
        self.flex_offer_time_grid = np.arange(
            self.switch_time, self.switch_time + flex_event_duration, time_step
        )
        self.mpc_time_grid = np.arange(0, prediction_horizon * time_step, time_step)

    def format_predictor_inputs(self, series: pd.Series) -> pd.Series:
        """Format the input of the predictor to unify the data.

        The index is shifted so that it starts at zero and the series is then
        reindexed to the mpc time grid. The passed series is left untouched.

        Args:
            series: Input series from a predictor.

        Returns:
            Formatted series.

        Raises:
            ValueError: if reindexing to the mpc time grid produces NaN values.
        """
        # Work on a copy so the caller's series keeps its original index.
        shifted = series.copy()
        shifted.index = series.index - series.index[0]
        formatted = shifted.reindex(self.mpc_time_grid)
        if formatted.isna().any():
            # Report the index that was compared against the grid; the
            # reindexed series always carries the grid itself.
            raise ValueError(f"The mpc time grid is not compatible with the predictor "
                             f"input, which leads to NaN values in the series.\n"
                             f"MPC time grid:{self.mpc_time_grid}\n"
                             f"Series index:{shifted.index}")
        return formatted

    def format_mpc_inputs(self, series: pd.Series) -> pd.Series:
        """Format the input of the mpc to unify the data.

        The multi index is stripped, NaN values are filled with the ``MEAN``
        method and the series is reindexed to the mpc time grid.

        Args:
            series: Input series from a mpc.

        Returns:
            Formatted series.

        Raises:
            ValueError: if reindexing to the mpc time grid produces NaN values.
        """
        series = strip_multi_index(series)
        if series.isna().any():
            series = fill_nans(series=series, method=MEAN)
        formatted = series.reindex(self.mpc_time_grid)
        if formatted.isna().any():
            # Report the index before reindexing; afterwards it equals the grid.
            raise ValueError(f"The mpc time grid is not compatible with the mpc input, "
                             f"which leads to NaN values in the series.\n"
                             f"MPC time grid:{self.mpc_time_grid}\n"
                             f"Series index:{series.index}")
        return formatted

    def calculate(self, enable_energy_costs_correction: bool, calculate_flex_cost: bool):
        """Calculate the KPIs for the positive and negative flexibility.

        Args:
            enable_energy_costs_correction: whether the energy costs should be corrected
            calculate_flex_cost: whether the cost of the flexibility should be calculated
        """
        self.kpis_pos.calculate(
            power_profile_base=self.power_profile_base,
            power_profile_shadow=self.power_profile_flex_pos,
            electricity_price_series=self.electricity_price_series,
            mpc_time_grid=self.mpc_time_grid,
            flex_offer_time_grid=self.flex_offer_time_grid,
            stored_energy_base=self.stored_energy_profile_base,
            stored_energy_shadow=self.stored_energy_profile_flex_pos,
            enable_energy_costs_correction=enable_energy_costs_correction,
            calculate_flex_cost=calculate_flex_cost
        )
        self.kpis_neg.calculate(
            power_profile_base=self.power_profile_base,
            power_profile_shadow=self.power_profile_flex_neg,
            electricity_price_series=self.electricity_price_series,
            mpc_time_grid=self.mpc_time_grid,
            flex_offer_time_grid=self.flex_offer_time_grid,
            stored_energy_base=self.stored_energy_profile_base,
            stored_energy_shadow=self.stored_energy_profile_flex_neg,
            enable_energy_costs_correction=enable_energy_costs_correction,
            calculate_flex_cost=calculate_flex_cost
        )

    def get_kpis(self) -> dict[str, KPI]:
        """Return the positive and negative KPIs in one dictionary keyed by identifier."""
        kpis_dict = self.kpis_pos.get_kpi_dict(identifier=True) | self.kpis_neg.get_kpi_dict(identifier=True)
        return kpis_dict