from typing import Union, Optional
import numpy
import pydantic
import numpy as np
import pandas as pd
from agentlib_mpc.utils import TimeConversionTypes, TIME_CONVERSION
from agentlib_flexquant.data_structures.globals import FlexibilityDirections
from agentlib_flexquant.utils.data_handling import strip_multi_index, fill_nans, MEAN
class KPI(pydantic.BaseModel):
    """Single flexibility indicator: a name, a value, a unit and a direction.

    Every field defaults to ``None``, so an indicator can be declared first
    and filled in once its value has been calculated.
    """

    name: str = pydantic.Field(None, description="Name of the flexibility KPI")
    value: Optional[float] = pydantic.Field(None, description="Value of the flexibility KPI")
    unit: str = pydantic.Field(None, description="Unit of the flexibility KPI")
    direction: Optional[FlexibilityDirections] = pydantic.Field(
        None, description="Direction of the shadow mpc / flexibility"
    )

    class Config:
        # Lets subclasses declare fields of non-pydantic types (e.g. pandas objects).
        arbitrary_types_allowed = True

    def get_kpi_identifier(self):
        """Return the KPI name prefixed by its direction: ``"<direction>_<name>"``."""
        return f"{self.direction}_{self.name}"
class KPISeries(KPI):
    """KPI whose value is a ``pd.Series`` indexed by time instead of a scalar.

    Besides the series itself the model keeps ``dt``, the width of the time
    step that starts at every timestamp. ``dt`` is derived lazily from the
    index of ``value`` and re-derived whenever ``value`` has been replaced by
    a series with a different index.
    """

    value: Union[pd.Series, None] = pydantic.Field(
        default=None,
        description="Value of the flexibility KPI",
    )
    dt: Union[pd.Series, None] = pydantic.Field(
        default=None,
        description="Time differences between the timestamps of the series in seconds",
    )

    def _get_dt(self) -> pd.Series:
        """Compute, store and return the step widths of the current ``value`` index.

        Each timestamp is assigned the distance to its successor; the last
        timestamp has no successor and reuses the previous step width
        (forward fill). A series with a single timestamp yields NaN.
        """
        dt = pd.Series(index=self.value.index, data=self.value.index).diff().shift(-1).ffill()
        self.dt = dt
        return dt

    def _current_dt(self) -> pd.Series:
        """Return step widths that match the index of the current ``value``.

        A cached ``dt`` is only reused while its index equals the index of
        ``value``. Otherwise ``value * dt`` would align on mismatching labels
        and the resulting NaNs would silently drop out of the sums.
        """
        if self.dt is None or not self.dt.index.equals(self.value.index):
            return self._get_dt()
        return self.dt

    def min(self) -> float:
        """Return the smallest value of the series."""
        return self.value.min()

    def max(self) -> float:
        """Return the largest value of the series."""
        return self.value.max()

    def avg(self) -> float:
        """Return the time-weighted average: the integral divided by the total duration."""
        delta_t = self._current_dt().sum()
        avg = self.integrate() / delta_t
        return avg

    def integrate(self, time_unit: TimeConversionTypes = "seconds") -> float:
        """Integrate the series over time as the sum of ``value * dt``.

        Args:
            time_unit: Key into ``TIME_CONVERSION``; every product is divided
                by the corresponding factor (presumably the number of seconds
                per unit - confirm against ``agentlib_mpc.utils``).

        Returns:
            Sum of all ``value * dt / TIME_CONVERSION[time_unit]`` products.
        """
        dt = self._current_dt()
        products = self.value * dt / TIME_CONVERSION[time_unit]
        integral = products.sum()
        return integral
class FlexibilityKPIs(pydantic.BaseModel):
    """Collection of all indicator KPIs for one flexibility direction.

    The KPI fields are created with their name and unit only; ``calculate``
    fills in the values. The direction passed to the constructor is copied
    onto every contained KPI.
    """

    # Direction
    direction: FlexibilityDirections = pydantic.Field(
        default=None,
        description="Direction of the shadow mpc"
    )

    # Power / energy KPIs
    power_flex_full: KPISeries = pydantic.Field(
        default=KPISeries(
            name="power_flex_full",
            unit="kW"
        ),
        description="Power flexibility",
    )
    power_flex_offer: KPISeries = pydantic.Field(
        default=KPISeries(
            name="power_flex_offer",
            unit="kW"
        ),
        description="Power flexibility",
    )
    power_flex_offer_max: KPI = pydantic.Field(
        default=KPI(
            name="power_flex_offer_max",
            unit="kW"
        ),
        description="Maximum power flexibility",
    )
    power_flex_offer_min: KPI = pydantic.Field(
        default=KPI(
            name="power_flex_offer_min",
            unit="kW"
        ),
        description="Minimum power flexibility",
    )
    power_flex_offer_avg: KPI = pydantic.Field(
        default=KPI(
            name="power_flex_offer_avg",
            unit="kW"
        ),
        description="Average power flexibility",
    )
    energy_flex: KPI = pydantic.Field(
        default=KPI(
            name="energy_flex",
            unit="kWh"
        ),
        description="Energy flexibility equals the integral of the power flexibility",
    )
    power_flex_within_boundary: KPI = pydantic.Field(
        default=KPI(
            name="power_flex_within_boundary",
            unit="-"
        ),
        description="Variable indicating whether the baseline power and flex power align at the horizon end",
    )

    # Costs KPIs
    electricity_costs_series: KPISeries = pydantic.Field(
        default=KPISeries(
            name="electricity_costs_series",
            unit="ct/h"
        ),
        description="Costs of flexibility",
    )
    costs: KPI = pydantic.Field(
        default=KPI(
            name="costs",
            unit="ct"
        ),
        description="Costs of flexibility",
    )
    corrected_costs: KPI = pydantic.Field(
        default=KPI(
            name="corrected_costs",
            unit="ct"
        ),
        description="Corrected costs of flexibility considering the stored energy in the system",
    )
    costs_rel: KPI = pydantic.Field(
        default=KPI(
            name="costs_rel",
            unit="ct/kWh"
        ),
        description="Costs of flexibility per energy",
    )
    corrected_costs_rel: KPI = pydantic.Field(
        default=KPI(
            name="corrected_costs_rel",
            unit="ct/kWh"
        ),
        description="Corrected costs of flexibility per energy",
    )

    def __init__(self, direction: FlexibilityDirections, **data):
        """Create the KPI set and stamp ``direction`` onto every contained KPI.

        Args:
            direction: Direction of the flexibility; ``_calculate_power_flex``
                accepts ``"positive"`` and ``"negative"``.
            **data: Further field values forwarded to the pydantic constructor.
        """
        super().__init__(**data)
        self.direction = direction
        for kpi in vars(self).values():
            if isinstance(kpi, KPI):
                kpi.direction = self.direction

    def calculate(
            self,
            power_profile_base: pd.Series,
            power_profile_shadow: pd.Series,
            electricity_price_series: pd.Series,
            mpc_time_grid: np.ndarray,
            flex_offer_time_grid: np.ndarray,
            stored_energy_base: pd.Series,
            stored_energy_shadow: pd.Series,
            enable_energy_costs_correction: bool,
            calculate_flex_cost: bool
    ):
        """Calculate all KPIs from the power, stored-energy and price profiles.

        Args:
            power_profile_base: Power profile of the baseline mpc.
            power_profile_shadow: Power profile of the shadow mpc; must share
                the index of ``power_profile_base``.
            electricity_price_series: Electricity price profile.
            mpc_time_grid: Accepted for interface compatibility; not used here.
            flex_offer_time_grid: Time grid of the offer; its first and last
                entries delimit the offer window.
            stored_energy_base: Stored energy profile of the baseline mpc.
            stored_energy_shadow: Stored energy profile of the shadow mpc.
            enable_energy_costs_correction: If True, the difference of the last
                stored-energy values (shadow - base) enters the corrected costs;
                otherwise the difference is taken as 0.
            calculate_flex_cost: If True, the absolute and relative cost KPIs
                are calculated as well.
        """
        # Power / energy KPIs
        self._calculate_power_flex(
            power_profile_base=power_profile_base,
            power_profile_shadow=power_profile_shadow,
            flex_offer_time_grid=flex_offer_time_grid,
        )
        self._calculate_power_flex_stats()
        self._calculate_energy_flex()

        # Costs KPIs
        if enable_energy_costs_correction:
            stored_energy_diff = stored_energy_shadow.values[-1] - stored_energy_base.values[-1]
        else:
            stored_energy_diff = 0

        if calculate_flex_cost:
            # The relative costs divide the absolute costs, so both belong to the same guard.
            self._calculate_costs(
                electricity_price_signal=electricity_price_series,
                stored_energy_diff=stored_energy_diff,
            )
            self._calculate_costs_rel()

    def _calculate_power_flex(self, power_profile_base: pd.Series, power_profile_shadow: pd.Series,
                              flex_offer_time_grid: np.ndarray,
                              relative_error_acceptance: float = 0.01) -> pd.Series:
        """Calculate the power flexibility from the baseline and shadow power profiles.

        The flexibility is ``base - shadow`` for the positive and
        ``shadow - base`` for the negative direction. The full profile is
        stored in ``power_flex_full``; the part between the first and the last
        entry of ``flex_offer_time_grid`` is stored in ``power_flex_offer``.

        Args:
            power_profile_base: Power profile of the baseline mpc.
            power_profile_shadow: Power profile of the shadow mpc.
            flex_offer_time_grid: Time grid of the flexibility offer.
            relative_error_acceptance: Threshold for the relative difference
                between baseline and shadow mpc below which the power
                flexibility is set to zero.

        Returns:
            The power flexibility over the full profile.

        Raises:
            ValueError: If the profile indices differ or the direction is
                neither ``"positive"`` nor ``"negative"``.
        """
        if not power_profile_shadow.index.equals(power_profile_base.index):
            raise ValueError(f"Indices of power profiles do not match.\n"
                             f"Baseline: {power_profile_base.index}\n"
                             f"Shadow: {power_profile_shadow.index}")

        # Calculate flexibility
        if self.direction == "positive":
            power_flex = power_profile_base - power_profile_shadow
        elif self.direction == "negative":
            power_flex = power_profile_shadow - power_profile_base
        else:
            raise ValueError(f"Direction of KPIs not properly defined: {self.direction}")

        # Set values to zero if the difference is small.
        # Where the baseline is 0 the ratio is inf or NaN; neither compares
        # below the threshold, so those entries are left untouched.
        relative_difference = (power_flex / power_profile_base).abs()
        power_flex.loc[relative_difference < relative_error_acceptance] = 0

        # Set values (label-based slice, both ends included)
        self.power_flex_full.value = power_flex
        self.power_flex_offer.value = power_flex.loc[flex_offer_time_grid[0]:flex_offer_time_grid[-1]]
        return power_flex

    def _calculate_power_flex_stats(self) -> tuple[float, float, float]:
        """Calculate maximum, minimum and average of the offered power flexibility.

        Returns:
            Maximum, minimum and time-weighted average of ``power_flex_offer``.

        Raises:
            ValueError: If ``power_flex_offer`` has no value yet.
        """
        if self.power_flex_offer.value is None:
            raise ValueError("Power flexibility value is empty.")

        # Calculate characteristic values
        power_flex_offer_max = self.power_flex_offer.max()
        power_flex_offer_min = self.power_flex_offer.min()
        power_flex_offer_avg = self.power_flex_offer.avg()

        # Set values
        self.power_flex_offer_max.value = power_flex_offer_max
        self.power_flex_offer_min.value = power_flex_offer_min
        self.power_flex_offer_avg.value = power_flex_offer_avg
        return power_flex_offer_max, power_flex_offer_min, power_flex_offer_avg

    def _calculate_energy_flex(self) -> float:
        """Calculate the energy flexibility by integrating the offered power flexibility in hours.

        Returns:
            The energy flexibility, also stored in ``energy_flex``.

        Raises:
            ValueError: If ``power_flex_offer`` has no value yet.
        """
        if self.power_flex_offer.value is None:
            raise ValueError("Power flexibility value of the offer is empty.")

        # Calculate flexibility
        energy_flex = self.power_flex_offer.integrate(time_unit="hours")

        # Set value
        self.energy_flex.value = energy_flex
        return energy_flex

    def _calculate_costs(self, electricity_price_signal: pd.Series, stored_energy_diff: float) -> tuple[float, float]:
        """Calculate the costs of the flexibility event.

        The cost series is the element-wise product of the price and the full
        power flexibility profile. Its integral in hours, taken as an absolute
        value, gives the costs. The corrected costs subtract
        ``stored_energy_diff`` valued at the mean electricity price.

        Args:
            electricity_price_signal: Electricity price profile.
            stored_energy_diff: Stored energy difference between shadow and
                baseline mpc that enters the correction.

        Returns:
            Costs and corrected costs, also stored in ``costs`` and
            ``corrected_costs``.
        """
        # Calculate series
        self.electricity_costs_series.value = electricity_price_signal * self.power_flex_full.value

        # Calculate scalar
        costs = abs(self.electricity_costs_series.integrate(time_unit="hours"))

        # correct the costs
        corrected_costs = costs - stored_energy_diff * np.mean(electricity_price_signal)

        self.costs.value = costs
        self.corrected_costs.value = corrected_costs
        return costs, corrected_costs

    def _calculate_costs_rel(self) -> tuple[float, float]:
        """Calculate the costs and corrected costs per unit of energy flexibility.

        Both relative costs are set to 0 when the energy flexibility is 0, so
        no division by zero takes place.

        Returns:
            Relative costs and relative corrected costs, also stored in
            ``costs_rel`` and ``corrected_costs_rel``.
        """
        if self.energy_flex.value == 0:
            costs_rel = 0
            corrected_costs_rel = 0
        else:
            costs_rel = self.costs.value / self.energy_flex.value
            corrected_costs_rel = self.corrected_costs.value / self.energy_flex.value

        # Set value
        self.costs_rel.value = costs_rel
        self.corrected_costs_rel.value = corrected_costs_rel
        return costs_rel, corrected_costs_rel

    def get_kpi_dict(self, identifier: bool = False) -> dict[str, KPI]:
        """Get the KPIs as a dictionary with names or identifiers as keys.

        Args:
            identifier: If True, the keys are the identifiers of the KPIs,
                otherwise the names of the KPIs.

        Returns:
            Dictionary mapping the chosen key to the KPI object.
        """
        return {
            (kpi.get_kpi_identifier() if identifier else kpi.name): kpi
            for kpi in vars(self).values()
            if isinstance(kpi, KPI)
        }

    def get_name_dict(self) -> dict[str, str]:
        """Map every KPI name to its identifier.

        Returns:
            Dictionary of the KPIs with names as keys and identifiers as values.
        """
        return {
            name: kpi.get_kpi_identifier()
            for name, kpi in self.get_kpi_dict(identifier=False).items()
        }
class FlexibilityData(pydantic.BaseModel):
    """Container for the data needed to calculate the flexibility.

    Holds the time grids, the power, stored-energy and price profiles, and
    one ``FlexibilityKPIs`` set per direction.
    """

    # Time parameters
    mpc_time_grid: np.ndarray = pydantic.Field(
        default=None,
        description="Time grid of the mpcs",
    )
    flex_offer_time_grid: np.ndarray = pydantic.Field(
        default=None,
        description="Time grid of the flexibility offer",
    )
    switch_time: Optional[float] = pydantic.Field(
        default=None,
        description="Time of the switch between the preparation and the market time",
    )

    # Profiles
    power_profile_base: pd.Series = pydantic.Field(
        default=None,
        description="Base power profile",
    )
    power_profile_flex_neg: pd.Series = pydantic.Field(
        default=None,
        description="Power profile of the negative flexibility",
    )
    power_profile_flex_pos: pd.Series = pydantic.Field(
        default=None,
        description="Power profile of the positive flexibility",
    )
    stored_energy_profile_base: pd.Series = pydantic.Field(
        default=None,
        description="Base profile of the stored electrical energy",
    )
    stored_energy_profile_flex_neg: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the stored electrical energy for negative flexibility",
    )
    stored_energy_profile_flex_pos: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the stored elctrical energy for positive flexibility",
    )
    electricity_price_series: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the electricity price",
    )

    # KPIs
    kpis_pos: FlexibilityKPIs = pydantic.Field(
        default=FlexibilityKPIs(direction="positive"),
        description="KPIs for positive flexibility",
    )
    kpis_neg: FlexibilityKPIs = pydantic.Field(
        default=FlexibilityKPIs(direction="negative"),
        description="KPIs for negative flexibility",
    )

    class Config:
        # Required for the np.ndarray and pd.Series fields.
        arbitrary_types_allowed = True

    def __init__(self, prep_time: int, market_time: int, flex_event_duration: int,
                 time_step: int, prediction_horizon: int, **data):
        """Create the container and derive the time grids.

        Args:
            prep_time: Preparation time; added to ``market_time`` to obtain
                ``switch_time``.
            market_time: Market time; added to ``prep_time`` to obtain
                ``switch_time``.
            flex_event_duration: Length of the offer window, which starts at
                ``switch_time``.
            time_step: Spacing of both time grids.
            prediction_horizon: Number of entries of ``mpc_time_grid``.
            **data: Further field values forwarded to the pydantic constructor.
        """
        super().__init__(**data)
        self.switch_time = prep_time + market_time
        # np.arange excludes the stop value, so the offer grid ends one step
        # before switch_time + flex_event_duration.
        self.flex_offer_time_grid = np.arange(
            self.switch_time, self.switch_time + flex_event_duration, time_step
        )
        self.mpc_time_grid = np.arange(0, prediction_horizon * time_step, time_step)

    def calculate(self, enable_energy_costs_correction: bool,
                  calculate_flex_cost: bool) -> tuple[FlexibilityKPIs, FlexibilityKPIs]:
        """Calculate the KPIs for the positive and the negative flexibility.

        Args:
            enable_energy_costs_correction: Forwarded to ``FlexibilityKPIs.calculate``.
            calculate_flex_cost: Forwarded to ``FlexibilityKPIs.calculate``.

        Returns:
            positive KPIs, negative KPIs
        """
        # Both directions share the baseline, price and grids; only the shadow
        # profiles differ. Positive is evaluated before negative.
        per_direction = (
            (self.kpis_pos, self.power_profile_flex_pos, self.stored_energy_profile_flex_pos),
            (self.kpis_neg, self.power_profile_flex_neg, self.stored_energy_profile_flex_neg),
        )
        for kpis, power_profile_shadow, stored_energy_shadow in per_direction:
            kpis.calculate(
                power_profile_base=self.power_profile_base,
                power_profile_shadow=power_profile_shadow,
                electricity_price_series=self.electricity_price_series,
                mpc_time_grid=self.mpc_time_grid,
                flex_offer_time_grid=self.flex_offer_time_grid,
                stored_energy_base=self.stored_energy_profile_base,
                stored_energy_shadow=stored_energy_shadow,
                enable_energy_costs_correction=enable_energy_costs_correction,
                calculate_flex_cost=calculate_flex_cost
            )
        return self.kpis_pos, self.kpis_neg

    def get_kpis(self) -> dict[str, KPI]:
        """Return the KPIs of both directions in one dictionary keyed by KPI identifier."""
        kpis_dict = self.kpis_pos.get_kpi_dict(identifier=True) | self.kpis_neg.get_kpi_dict(identifier=True)
        return kpis_dict