import os
from pathlib import Path
from typing import Optional, Union
import agentlib
import numpy as np
import pandas as pd
import agentlib_flexquant.data_structures.globals as glbs
from agentlib.core.datamodels import AgentVariable
from pydantic import ConfigDict, Field, model_validator
from agentlib_flexquant.data_structures.flex_offer import FlexOffer, OfferStatus
from agentlib_flexquant.data_structures.market import MarketSpecifications
class FlexibilityMarketModuleConfig(agentlib.BaseModuleConfig):
    """Configuration of the flexibility market module.

    Declares the variables the market module exchanges (incoming flexibility
    offer, outgoing accepted power profile and event timing), the market
    specification and where/whether results are written.
    """

    # Unknown configuration keys raise a validation error instead of being
    # silently ignored.
    model_config = ConfigDict(extra="forbid")

    # Incoming variable carrying the flexibility offer.
    inputs: list[AgentVariable] = [AgentVariable(name="FlexibilityOffer")]

    # Variables written by the market module.
    outputs: list[AgentVariable] = [
        AgentVariable(
            name=glbs.ACCEPTED_POWER_VAR_NAME,
            alias=glbs.ACCEPTED_POWER_VAR_NAME,
            description="External Power IO",
        ),
        AgentVariable(
            name=glbs.RELATIVE_EVENT_START_TIME_VAR_NAME,
            alias=glbs.RELATIVE_EVENT_START_TIME_VAR_NAME,
            description="relative start time of the flexibility event",
        ),
        AgentVariable(
            name=glbs.RELATIVE_EVENT_END_TIME_VAR_NAME,
            alias=glbs.RELATIVE_EVENT_END_TIME_VAR_NAME,
            description="relative end time of the flexibility event",
        ),
        AgentVariable(
            name=glbs.PROVISION_VAR_NAME,
            alias=glbs.PROVISION_VAR_NAME,
            description="Set if the system is in provision",
            value=False,
        ),
    ]

    parameters: list[AgentVariable] = [
        AgentVariable(
            name=glbs.COLLOCATION_TIME_GRID,
            alias=glbs.COLLOCATION_TIME_GRID,
            description="Time grid of the mpc model output",
        ),
        AgentVariable(
            name=glbs.TIME_STEP, unit="s", description="Time step of the mpc"
        ),
    ]

    # Market behaviour (type, options, cooldown, minimum flexibility, ...).
    market_specs: MarketSpecifications

    results_file: Optional[Path] = Field(
        default=Path("flexibility_market.csv"),
        description="User specified results file name",
    )
    save_results: Optional[bool] = Field(validate_default=True, default=True)

    # NOTE(review): presumably tells agentlib which variable fields are shared
    # with other modules/agents - confirm against the agentlib documentation.
    shared_variable_fields: list[str] = ["outputs"]

    @model_validator(mode="after")
    def check_results_file_extension(self):
        """Ensure that a configured results file has the '.csv' extension.

        Returns:
            The validated config instance.

        Raises:
            ValueError: If ``results_file`` is set and its suffix is not '.csv'.
        """
        if self.results_file and self.results_file.suffix != ".csv":
            raise ValueError(
                f"Invalid file extension for 'results_file': '{self.results_file}'. "
                f"Expected a '.csv' file."
            )
        return self
class FlexibilityMarketModule(agentlib.BaseModule):
    """Class to emulate flexibility market. Receives flex offers and accepts these."""

    config: FlexibilityMarketModuleConfig

    # DataFrame for flex offer. Multiindex: (time_step, time).
    # Columns: pos_price, neg_price, status
    flex_offer_df: Optional[pd.DataFrame] = None
    # absolute end time of a flexibility event (now + relative end time
    # of the flexibility event on the mpc horizon)
    abs_flex_event_end: Union[int, float] = 0

    def set_random_seed(self, random_seed: int):
        """Set the random seed for reproducibility."""
        self.random_generator = np.random.default_rng(seed=random_seed)

    def get_results(self) -> Optional[pd.DataFrame]:
        """Read the configured results file of this module.

        Returns:
            The results with the first two CSV columns as a MultiIndex, or
            None (after logging an error) if the file does not exist.
        """
        results_file = self.config.results_file
        try:
            results = pd.read_csv(results_file, header=[0], index_col=[0, 1])
            return results
        except FileNotFoundError:
            self.logger.error("Results file %s was not found.", results_file)
            return None

    def register_callbacks(self):
        """Register the offer callback that matches the configured market type.

        Unknown market types are logged as an error and fall back to a callback
        without market interaction. Also resets the collected results and the
        cooldown counter.
        """
        if self.config.market_specs.type == "custom":
            callback_function = self.custom_flexibility_callback
        elif self.config.market_specs.type == "single":
            callback_function = self.single_flexibility_callback
        elif self.config.market_specs.type == "random":
            callback_function = self.random_flexibility_callback
            # Only the random market needs a seeded generator.
            self.set_random_seed(self.config.market_specs.options.random_seed)
        else:
            self.logger.error(
                "No market type defined. Available market types are single, random "
                "and custom. Code will proceed without market interaction."
            )
            callback_function = self.dummy_callback

        self.agent.data_broker.register_callback(
            name="FlexibilityOffer",
            alias="FlexibilityOffer",
            callback=callback_function,
        )

        self.flex_offer_df = None
        self.cooldown_ticker = 0

    def write_results(self, offer: FlexOffer):
        """Save the flex offer results depending on the config.

        The offer is appended to ``flex_offer_df`` under a two-level index of
        (current environment time, index of the offer dataframe). If
        ``save_results`` is set, the complete frame is written to the results
        file on every call.
        """
        if self.flex_offer_df is None:
            self.flex_offer_df = pd.DataFrame()

        df = offer.as_dataframe()
        # Prefix every row of the offer with the time at which it was received.
        index_first_level = [self.env.now] * len(df.index)
        multi_index = pd.MultiIndex.from_tuples(zip(index_first_level, df.index))
        self.flex_offer_df = pd.concat((self.flex_offer_df, df.set_index(multi_index)))

        # Rebuild the index so that the level names are set on the combined frame.
        indices = pd.MultiIndex.from_tuples(
            self.flex_offer_df.index, names=["time_step", "time"]
        )
        self.flex_offer_df.set_index(indices, inplace=True)

        if self.config.save_results:
            self.flex_offer_df.to_csv(self.config.results_file)

    def _select_profile(self, offer: FlexOffer, positive: bool):
        """Return the power profile of the accepted direction, or None.

        An offer direction is only accepted if the average of its difference
        profile exceeds ``market_specs.minimum_average_flex``. On acceptance
        the offer status is updated accordingly.
        """
        minimum_flex = self.config.market_specs.minimum_average_flex
        profile = None
        # NOTE(review): the negative branch is taken to be the alternative to
        # the direction test (not to the minimum-flex test), i.e. a rejected
        # positive offer does not fall through to the negative one - confirm.
        if positive:
            if np.average(offer.pos_diff_profile) > minimum_flex:
                profile = offer.base_power_profile - offer.pos_diff_profile
                offer.status = OfferStatus.ACCEPTED_POSITIVE.value
        elif np.average(offer.neg_diff_profile) > minimum_flex:
            profile = offer.base_power_profile + offer.neg_diff_profile
            offer.status = OfferStatus.ACCEPTED_NEGATIVE.value
        return profile

    def _activate_profile(self, profile):
        """Resample an accepted profile, publish it and start the provision.

        Args:
            profile: accepted power profile indexed by time relative to now
                (presumably a pandas Series - confirm against FlexOffer).
        """
        # reindex the profile to the mpc output time grid
        flex_power_feedback_method = (
            self.config.market_specs.accepted_offer_sample_points
        )
        if flex_power_feedback_method == glbs.COLLOCATION:
            profile = profile.reindex(self.get(glbs.COLLOCATION_TIME_GRID).value)
        elif flex_power_feedback_method == glbs.CONSTANT:
            # Drop the entries that lie on the collocation grid ...
            index_to_keep = ~np.isin(
                profile.index, self.get(glbs.COLLOCATION_TIME_GRID).value
            )
            profile = profile.get(index_to_keep)
            # ... and insert a helper point at (index - 1) before every
            # remaining point except the first; forward filling these yields
            # a piecewise constant profile.
            helper_indices = [i - 1 for i in profile.index[1:]]
            new_index = sorted(set(profile.index.tolist() + helper_indices))[:-1]
            profile = profile.reindex(new_index).ffill()
        # NOTE(review): dropna is applied for both sampling methods - confirm.
        profile = profile.dropna()

        # Shift from relative horizon time to absolute environment time.
        profile.index += self.env.time
        self.set(glbs.ACCEPTED_POWER_VAR_NAME, profile)
        self.abs_flex_event_end = profile.index[-1]
        self.set(glbs.PROVISION_VAR_NAME, True)

    def random_flexibility_callback(self, inp: AgentVariable, name: str):
        """When a flexibility offer is sent, this function is called.

        The offer is accepted randomly. ``market_specs.options.offer_acceptance_rate``
        determines the random factor for offer acceptance.
        ``market_specs.options.pos_neg_rate`` is the random factor for the direction
        of the flexibility. A higher rate means that more positive offers will be
        accepted.

        Constraints:
            cooldown: during $cooldown steps after a flexibility event no offer is
                accepted
            minimum_average_flex: min amount of flexibility to be accepted,
                to account for the model error
        """
        offer = inp.value
        options = self.config.market_specs.options

        # check if there is a flexibility provision and the cooldown is finished
        if not self.get(glbs.PROVISION_VAR_NAME).value and self.cooldown_ticker == 0:
            if self.random_generator.random() < options.offer_acceptance_rate:
                # if random value is below pos_neg_rate, positive offer is accepted.
                # Otherwise, negative offer
                positive = self.random_generator.random() < options.pos_neg_rate
                profile = self._select_profile(offer, positive)
                if profile is not None:
                    self._activate_profile(profile)
                    self.cooldown_ticker = self.config.market_specs.cooldown
        elif self.cooldown_ticker > 0:
            # One received offer counts as one cooldown step.
            self.cooldown_ticker -= 1

        self.write_results(offer)

    def single_flexibility_callback(self, inp: AgentVariable, name: str):
        """Callback to activate a single, predefined flexibility offer.

        The offer is accepted only if the current time lies within one mpc time
        step after the configured acceptance time (shifted by the environment
        offset) and no provision is currently active.
        """
        offer = inp.value
        options = self.config.market_specs.options
        t_sample = self.get(glbs.TIME_STEP).value

        acceptance_time_lower = self.env.config.offset + options.offer_acceptance_time
        acceptance_time_upper = acceptance_time_lower + t_sample

        if (
            acceptance_time_lower <= self.env.now < acceptance_time_upper
            and not self.get(glbs.PROVISION_VAR_NAME).value
        ):
            profile = self._select_profile(offer, options.direction == "positive")
            if profile is not None:
                self._activate_profile(profile)

        self.write_results(offer)

    def custom_flexibility_callback(self, inp: AgentVariable, name: str):
        """Placeholder for a custom flexibility callback."""

    def dummy_callback(self, inp: AgentVariable, name: str):
        """Dummy function that is included, when market type is not specified."""
        self.logger.warning("No market type provided. No market interaction.")

    def cleanup_results(self):
        """Remove the results if they already exist."""
        results_file = self.config.results_file
        if not results_file:
            return
        try:
            os.remove(results_file)
        except FileNotFoundError:
            # No results were written (e.g. save_results is False); there is
            # nothing to clean up.
            pass

    def process(self):
        """Periodically end the provision once the flexibility event is over."""
        while True:
            # End the provision at the appropriate time
            if self.abs_flex_event_end <= self.env.time:
                self.set(glbs.PROVISION_VAR_NAME, False)
            yield self.env.timeout(self.env.config.t_sample)