Source code for agentlib_flexquant.data_structures.flex_kpis

from typing import Union, Optional

import numpy
import pydantic
import numpy as np
import pandas as pd

from agentlib_mpc.utils import TimeConversionTypes, TIME_CONVERSION
from agentlib_flexquant.data_structures.globals import FlexibilityDirections
from agentlib_flexquant.utils.data_handling import strip_multi_index, fill_nans, MEAN


class KPI(pydantic.BaseModel):
    """
    A single scalar flexibility indicator.

    Bundles the name of the indicator, its numeric value, the unit the
    value is expressed in and the flexibility direction it belongs to.
    All fields default to ``None`` until they are set.
    """

    name: str = pydantic.Field(
        default=None,
        description="Name of the flexibility KPI",
    )
    value: Optional[float] = pydantic.Field(
        default=None,
        description="Value of the flexibility KPI",
    )
    unit: str = pydantic.Field(
        default=None,
        description="Unit of the flexibility KPI",
    )
    direction: Optional[FlexibilityDirections] = pydantic.Field(
        default=None,
        description="Direction of the shadow mpc / flexibility"
    )

    class Config:
        # Subclasses store pandas objects, which pydantic cannot validate natively.
        arbitrary_types_allowed = True

    def get_kpi_identifier(self):
        """
        Build the identifier of the KPI.

        Returns:
            The direction and the name of the KPI joined by an underscore.
        """
        return f"{self.direction}_{self.name}"
class KPISeries(KPI):
    """
    KPI whose value is a time series instead of a scalar.

    The index of ``value`` serves as the time axis. The spacing between
    consecutive index entries is stored in ``dt`` and is used as the weight
    of each sample when integrating or averaging the series.
    """

    value: Union[pd.Series, None] = pydantic.Field(
        default=None,
        description="Value of the flexibility KPI",
    )
    dt: Union[pd.Series, None] = pydantic.Field(
        default=None,
        description="Time differences between the timestamps of the series in seconds",
    )

    def _get_dt(self) -> pd.Series:
        """
        Get the time differences between the timestamps of the series.

        Each entry holds the distance to the following index entry. The last
        entry has no successor and reuses the preceding difference.

        Returns:
            The time differences, indexed like ``value``. The result is also
            stored in ``dt``.
        """
        index = self.value.index
        dt = pd.Series(index=index, data=index).diff().shift(-1).ffill()
        self.dt = dt
        return dt

    def _ensure_dt(self) -> pd.Series:
        """
        Return time differences that match the current ``value``.

        ``dt`` is cached after the first computation. If ``value`` has been
        replaced by a series with a different index in the meantime, the cached
        differences would be aligned on mismatching labels and silently
        produce NaN terms, so they are recomputed in that case.
        """
        if self.dt is None or not self.dt.index.equals(self.value.index):
            self._get_dt()
        return self.dt

    def min(self) -> float:
        """Return the smallest value of the series."""
        return self.value.min()

    def max(self) -> float:
        """Return the largest value of the series."""
        return self.value.max()

    def avg(self) -> float:
        """
        Calculate the average value of the KPI over time.

        Returns:
            The integral of the series divided by the sum of all time
            differences.
        """
        delta_t = self._ensure_dt().sum()
        return self.integrate() / delta_t

    def integrate(self, time_unit: TimeConversionTypes = "seconds") -> float:
        """
        Integrate the value of the KPI over time by summing up the product of
        values and the time difference.

        Args:
            time_unit: Key into ``TIME_CONVERSION``; every product is divided
                by the looked-up factor (presumably the number of seconds per
                unit - confirm against agentlib_mpc).

        Returns:
            The sum of the weighted values.
        """
        dt = self._ensure_dt()
        products = self.value * dt / TIME_CONVERSION[time_unit]
        return products.sum()
class FlexibilityKPIs(pydantic.BaseModel):
    """
    Class defining the indicator KPIs.

    Holds all power, energy and cost indicators of one flexibility direction
    and provides the methods to compute them from the power profiles of the
    baseline and the shadow mpc.
    """

    # Direction
    direction: FlexibilityDirections = pydantic.Field(
        default=None,
        description="Direction of the shadow mpc"
    )

    # Power / energy KPIs
    power_flex_full: KPISeries = pydantic.Field(
        default=KPISeries(
            name="power_flex_full",
            unit="kW"
        ),
        description="Power flexibility",
    )
    power_flex_offer: KPISeries = pydantic.Field(
        default=KPISeries(
            name="power_flex_offer",
            unit="kW"
        ),
        description="Power flexibility",
    )
    power_flex_offer_max: KPI = pydantic.Field(
        default=KPI(
            name="power_flex_offer_max",
            unit="kW"
        ),
        description="Maximum power flexibility",
    )
    power_flex_offer_min: KPI = pydantic.Field(
        default=KPI(
            name="power_flex_offer_min",
            unit="kW"
        ),
        description="Minimum power flexibility",
    )
    power_flex_offer_avg: KPI = pydantic.Field(
        default=KPI(
            name="power_flex_offer_avg",
            unit="kW"
        ),
        description="Average power flexibility",
    )
    energy_flex: KPI = pydantic.Field(
        default=KPI(
            name="energy_flex",
            unit="kWh"
        ),
        description="Energy flexibility equals the integral of the power flexibility",
    )
    power_flex_within_boundary: KPI = pydantic.Field(
        default=KPI(
            name="power_flex_within_boundary",
            unit="-"
        ),
        description="Variable indicating whether the baseline power and flex power align at the horizon end",
    )

    # Costs KPIs
    electricity_costs_series: KPISeries = pydantic.Field(
        default=KPISeries(
            name="electricity_costs_series",
            unit="ct/h"
        ),
        description="Costs of flexibility",
    )
    costs: KPI = pydantic.Field(
        default=KPI(
            name="costs",
            unit="ct"
        ),
        description="Costs of flexibility",
    )
    corrected_costs: KPI = pydantic.Field(
        default=KPI(
            name="corrected_costs",
            unit="ct"
        ),
        description="Corrected costs of flexibility considering the stored energy in the system",
    )
    costs_rel: KPI = pydantic.Field(
        default=KPI(
            name="costs_rel",
            unit="ct/kWh"
        ),
        description="Costs of flexibility per energy",
    )
    corrected_costs_rel: KPI = pydantic.Field(
        default=KPI(
            name="corrected_costs_rel",
            unit="ct/kWh"
        ),
        description="Corrected costs of flexibility per energy",
    )

    def __init__(self, direction: FlexibilityDirections, **data):
        """
        Create the KPI collection and stamp every contained KPI with the direction.

        Args:
            direction: Flexibility direction of this collection.
            **data: Field values forwarded to the pydantic constructor.
        """
        super().__init__(**data)
        self.direction = direction
        # Propagate the direction so that each KPI can build its own identifier.
        for kpi in vars(self).values():
            if isinstance(kpi, KPI):
                kpi.direction = self.direction

    def calculate(
            self,
            power_profile_base: pd.Series,
            power_profile_shadow: pd.Series,
            electricity_price_series: pd.Series,
            mpc_time_grid: np.ndarray,
            flex_offer_time_grid: np.ndarray,
            stored_energy_base: pd.Series,
            stored_energy_shadow: pd.Series,
            enable_energy_costs_correction: bool,
            calculate_flex_cost: bool
    ):
        """
        Calculate the KPIs based on the power and electricity input profiles.
        Time grids needed for indexing of the power flexibility profiles.

        Args:
            power_profile_base: Power profile of the baseline mpc.
            power_profile_shadow: Power profile of the shadow mpc.
            electricity_price_series: Electricity price profile.
            mpc_time_grid: Time grid of the mpcs (accepted, but not used here).
            flex_offer_time_grid: Time grid of the flexibility offer; its first
                and last entry bound the offer window.
            stored_energy_base: Stored energy profile of the baseline mpc.
            stored_energy_shadow: Stored energy profile of the shadow mpc.
            enable_energy_costs_correction: If True, the difference of the final
                stored energy between shadow and baseline corrects the costs.
            calculate_flex_cost: If True, the cost KPIs are calculated.
        """
        # Power / energy KPIs
        self._calculate_power_flex(
            power_profile_base=power_profile_base,
            power_profile_shadow=power_profile_shadow,
            flex_offer_time_grid=flex_offer_time_grid,
        )
        self._calculate_power_flex_stats()
        self._calculate_energy_flex()

        # Costs KPIs
        if enable_energy_costs_correction:
            stored_energy_diff = stored_energy_shadow.values[-1] - stored_energy_base.values[-1]
        else:
            stored_energy_diff = 0

        if calculate_flex_cost:
            self._calculate_costs(
                electricity_price_signal=electricity_price_series,
                stored_energy_diff=stored_energy_diff,
            )
            self._calculate_costs_rel()

    def _calculate_power_flex(
            self,
            power_profile_base: pd.Series,
            power_profile_shadow: pd.Series,
            flex_offer_time_grid: np.ndarray,
            relative_error_acceptance: float = 0.01
    ) -> pd.Series:
        """
        Calculate the power flexibility based on the base and flexibility power profiles.

        Args:
            power_profile_base: Power profile of the baseline mpc.
            power_profile_shadow: Power profile of the shadow mpc.
            flex_offer_time_grid: Time grid of the flexibility offer.
            relative_error_acceptance: threshold for the relative error between the
                baseline and shadow mpc to set the power flexibility to zero

        Returns:
            The power flexibility over the full index of the profiles.

        Raises:
            ValueError: If the indices of the profiles differ or the direction is
                neither "positive" nor "negative".
        """
        if not power_profile_shadow.index.equals(power_profile_base.index):
            raise ValueError(f"Indices of power profiles do not match.\n"
                             f"Baseline: {power_profile_base.index}\n"
                             f"Shadow: {power_profile_shadow.index}")

        # Calculate flexibility
        if self.direction == "positive":
            power_flex = power_profile_base - power_profile_shadow
        elif self.direction == "negative":
            power_flex = power_profile_shadow - power_profile_base
        else:
            raise ValueError(f"Direction of KPIs not properly defined: {self.direction}")

        # Set values to zero if the difference is small
        relative_difference = (power_flex / power_profile_base).abs()
        power_flex.loc[relative_difference < relative_error_acceptance] = 0

        # Set values; the label based slice includes both ends of the offer window
        self.power_flex_full.value = power_flex
        self.power_flex_offer.value = power_flex.loc[flex_offer_time_grid[0]:flex_offer_time_grid[-1]]
        return power_flex

    def _calculate_power_flex_stats(self) -> tuple[float, float, float]:
        """
        Calculate the characteristic values of the power flexibility for the offer.

        Returns:
            Maximum, minimum and average power flexibility of the offer window.

        Raises:
            ValueError: If the power flexibility of the offer has not been set.
        """
        if self.power_flex_offer.value is None:
            raise ValueError("Power flexibility value is empty.")

        # Calculate characteristic values
        power_flex_offer_max = self.power_flex_offer.max()
        power_flex_offer_min = self.power_flex_offer.min()
        power_flex_offer_avg = self.power_flex_offer.avg()

        # Set values
        self.power_flex_offer_max.value = power_flex_offer_max
        self.power_flex_offer_min.value = power_flex_offer_min
        self.power_flex_offer_avg.value = power_flex_offer_avg
        return power_flex_offer_max, power_flex_offer_min, power_flex_offer_avg

    def _calculate_energy_flex(self) -> float:
        """
        Calculate the energy flexibility by integrating the power flexibility of the offer window.

        Returns:
            The energy flexibility.

        Raises:
            ValueError: If the power flexibility of the offer has not been set.
        """
        if self.power_flex_offer.value is None:
            raise ValueError("Power flexibility value of the offer is empty.")

        # Calculate flexibility
        energy_flex = self.power_flex_offer.integrate(time_unit="hours")

        # Set value
        self.energy_flex.value = energy_flex
        return energy_flex

    def _calculate_costs(self, electricity_price_signal: pd.Series, stored_energy_diff: float) -> tuple[float, float]:
        """
        Calculate the costs of the flexibility event based on the electricity costs profile
        and the power flexibility profile.

        Args:
            electricity_price_signal: Electricity price profile.
            stored_energy_diff: Difference of the stored energy between shadow and
                baseline; it is valued with the mean electricity price.

        Returns:
            The costs and the corrected costs of the flexibility event.
        """
        # Calculate series
        self.electricity_costs_series.value = electricity_price_signal * self.power_flex_full.value

        # Calculate scalar
        costs = abs(self.electricity_costs_series.integrate(time_unit="hours"))

        # correct the costs
        corrected_costs = costs - stored_energy_diff * np.mean(electricity_price_signal)

        self.costs.value = costs
        self.corrected_costs.value = corrected_costs
        return costs, corrected_costs

    def _calculate_costs_rel(self) -> tuple[float, float]:
        """
        Calculate the relative costs of the flexibility event per energy flexibility.

        Returns:
            The relative costs and the relative corrected costs. Both are zero if
            the energy flexibility is zero.
        """
        if self.energy_flex.value == 0:
            # Avoid a division by zero when no energy is shifted
            costs_rel = 0
            corrected_costs_rel = 0
        else:
            costs_rel = self.costs.value / self.energy_flex.value
            corrected_costs_rel = self.corrected_costs.value / self.energy_flex.value

        # Set value
        self.costs_rel.value = costs_rel
        self.corrected_costs_rel.value = corrected_costs_rel
        return costs_rel, corrected_costs_rel

    def get_kpi_dict(self, identifier: bool = False) -> dict[str, KPI]:
        """
        Get the KPIs as a dictionary with names or identifier as keys.

        Args:
            identifier: If True, the keys are the identifiers of the KPIs,
                otherwise the name of the kpi.

        Returns:
            Dictionary of all attributes of this object which are KPIs.
        """
        kpi_dict = {}
        for kpi in vars(self).values():
            if not isinstance(kpi, KPI):
                continue
            key = kpi.get_kpi_identifier() if identifier else kpi.name
            kpi_dict[key] = kpi
        return kpi_dict

    def get_name_dict(self) -> dict[str, str]:
        """
        Returns:
            Dictionary of the kpis with names as keys and the identifiers as values.
        """
        return {
            name: kpi.get_kpi_identifier()
            for name, kpi in self.get_kpi_dict(identifier=False).items()
        }
class FlexibilityData(pydantic.BaseModel):
    """
    Class containing the data for the calculation of the flexibility.

    Stores the time grids, the power, stored energy and price profiles and the
    KPI collections of both flexibility directions.
    """

    # Time parameters
    mpc_time_grid: np.ndarray = pydantic.Field(
        default=None,
        description="Time grid of the mpcs",
    )
    flex_offer_time_grid: np.ndarray = pydantic.Field(
        default=None,
        description="Time grid of the flexibility offer",
    )
    switch_time: Optional[float] = pydantic.Field(
        default=None,
        description="Time of the switch between the preparation and the market time",
    )

    # Profiles
    power_profile_base: pd.Series = pydantic.Field(
        default=None,
        description="Base power profile",
    )
    power_profile_flex_neg: pd.Series = pydantic.Field(
        default=None,
        description="Power profile of the negative flexibility",
    )
    power_profile_flex_pos: pd.Series = pydantic.Field(
        default=None,
        description="Power profile of the positive flexibility",
    )
    stored_energy_profile_base: pd.Series = pydantic.Field(
        default=None,
        description="Base profile of the stored electrical energy",
    )
    stored_energy_profile_flex_neg: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the stored electrical energy for negative flexibility",
    )
    stored_energy_profile_flex_pos: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the stored elctrical energy for positive flexibility",
    )
    electricity_price_series: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the electricity price",
    )

    # KPIs
    kpis_pos: FlexibilityKPIs = pydantic.Field(
        default=FlexibilityKPIs(direction="positive"),
        description="KPIs for positive flexibility",
    )
    kpis_neg: FlexibilityKPIs = pydantic.Field(
        default=FlexibilityKPIs(direction="negative"),
        description="KPIs for negative flexibility",
    )

    class Config:
        # numpy arrays and pandas series are not natively supported by pydantic
        arbitrary_types_allowed = True

    def __init__(self, prep_time: int, market_time: int, flex_event_duration: int,
                 time_step: int, prediction_horizon: int, **data):
        """
        Create the data container and derive the time grids.

        Args:
            prep_time: Duration of the preparation time.
            market_time: Duration of the market time.
            flex_event_duration: Duration of the flexibility event.
            time_step: Step width of both time grids.
            prediction_horizon: Number of steps of the mpc time grid.
            **data: Field values forwarded to the pydantic constructor.
        """
        super().__init__(**data)
        # The flexibility offer starts once preparation and market time have passed
        self.switch_time = prep_time + market_time
        self.flex_offer_time_grid = np.arange(
            self.switch_time, self.switch_time + flex_event_duration, time_step
        )
        self.mpc_time_grid = np.arange(0, prediction_horizon * time_step, time_step)

    def format_predictor_inputs(self, series: pd.Series) -> pd.Series:
        """
        Format the input of the predictor to unify the data.

        The index is shifted so that it starts at zero and the series is then
        aligned with the mpc time grid. The series passed in is left untouched.

        Args:
            series: Input series from a predictor.

        Returns:
            Formatted series.

        Raises:
            ValueError: If the formatted series contains NaN values.
        """
        # Work on a copy, assigning to the index would alter the caller's series
        shifted = series.copy()
        shifted.index = series.index - series.index[0]
        formatted = shifted.reindex(self.mpc_time_grid)
        if formatted.isna().any():
            # Report the index before reindexing, afterwards it always equals the grid
            raise ValueError(f"The mpc time grid is not compatible with the predictor "
                             f"input, which leads to NaN values in the series.\n"
                             f"MPC time grid:{self.mpc_time_grid}\n"
                             f"Series index:{shifted.index}")
        return formatted

    def format_mpc_inputs(self, series: pd.Series) -> pd.Series:
        """
        Format the input of the mpc to unify the data.

        The multi index is stripped, NaN values are filled with the ``MEAN``
        method and the series is aligned with the mpc time grid.

        Args:
            series: Input series from a mpc.

        Returns:
            Formatted series.

        Raises:
            ValueError: If the formatted series contains NaN values.
        """
        series = strip_multi_index(series)
        if series.isna().any():
            series = fill_nans(series=series, method=MEAN)
        # Keep the index for the error message, reindexing replaces it by the grid
        source_index = series.index
        series = series.reindex(self.mpc_time_grid)
        if series.isna().any():
            raise ValueError(f"The mpc time grid is not compatible with the mpc input, "
                             f"which leads to NaN values in the series.\n"
                             f"MPC time grid:{self.mpc_time_grid}\n"
                             f"Series index:{source_index}")
        return series

    def calculate(self, enable_energy_costs_correction: bool,
                  calculate_flex_cost: bool) -> tuple[FlexibilityKPIs, FlexibilityKPIs]:
        """
        Calculate the KPIs for the positive and negative flexibility.

        Args:
            enable_energy_costs_correction: Forwarded to the KPI calculation of
                both directions.
            calculate_flex_cost: Forwarded to the KPI calculation of both
                directions.

        Returns:
            positive KPIs, negative KPIs
        """
        # Inputs which are identical for both directions
        shared = dict(
            power_profile_base=self.power_profile_base,
            electricity_price_series=self.electricity_price_series,
            mpc_time_grid=self.mpc_time_grid,
            flex_offer_time_grid=self.flex_offer_time_grid,
            stored_energy_base=self.stored_energy_profile_base,
            enable_energy_costs_correction=enable_energy_costs_correction,
            calculate_flex_cost=calculate_flex_cost,
        )
        self.kpis_pos.calculate(
            power_profile_shadow=self.power_profile_flex_pos,
            stored_energy_shadow=self.stored_energy_profile_flex_pos,
            **shared,
        )
        self.kpis_neg.calculate(
            power_profile_shadow=self.power_profile_flex_neg,
            stored_energy_shadow=self.stored_energy_profile_flex_neg,
            **shared,
        )
        return self.kpis_pos, self.kpis_neg

    def get_kpis(self) -> dict[str, KPI]:
        """
        Collect the KPIs of both directions.

        Returns:
            Dictionary with the KPI identifiers as keys and the KPIs as values.
        """
        kpis_dict = self.kpis_pos.get_kpi_dict(identifier=True) | self.kpis_neg.get_kpi_dict(identifier=True)
        return kpis_dict