import os
import numpy as np
import pandas as pd
from pathlib import Path
from typing import List, Optional, Union
from pydantic import model_validator, ConfigDict, Field
import agentlib
from agentlib.core.datamodels import AgentVariable
from agentlib_flexquant.data_structures.flex_offer import OfferStatus, FlexOffer
from agentlib_flexquant.data_structures.market import MarketSpecifications
class FlexibilityMarketModuleConfig(agentlib.BaseModuleConfig):
    """Configuration of the flexibility market module.

    Declares the variables exchanged with other modules, the market
    specification and where/whether the received flex offers are stored.
    Unknown configuration keys are rejected (``extra='forbid'``).
    """

    model_config = ConfigDict(extra='forbid')

    # Incoming flexibility offer the market reacts to.
    inputs: list[AgentVariable] = [AgentVariable(name="FlexibilityOffer")]

    # Variables published by the market once an offer has been accepted.
    outputs: list[AgentVariable] = [
        AgentVariable(
            name="_P_external",
            alias="_P_external",
            description="External Power IO",
        ),
        AgentVariable(
            name="rel_start",
            alias="rel_start",
            description="relative start time of the flexibility event",
        ),
        AgentVariable(
            name="rel_end",
            alias="rel_end",
            description="relative end time of the flexibility event",
        ),
        AgentVariable(
            name="in_provision",
            alias="in_provision",
            description="Set if the system is in provision",
            value=False,
        ),
    ]

    market_specs: MarketSpecifications

    results_file: Optional[Path] = Field(
        default=Path("flexibility_market.csv"),
        description="User specified results file name",
    )
    save_results: Optional[bool] = Field(default=True, validate_default=True)

    shared_variable_fields: list[str] = ["outputs"]

    @model_validator(mode="after")
    def check_results_file_extension(self):
        """Ensure that a configured results file has the ``.csv`` suffix.

        Returns:
            The validated config instance.

        Raises:
            ValueError: If ``results_file`` is set and its suffix is not
                ``.csv``.
        """
        path = self.results_file
        # Nothing to check when no file is configured or the suffix matches.
        if not path or path.suffix == ".csv":
            return self
        message = (
            f"Invalid file extension for 'results_file': '{self.results_file}'. "
            f"Expected a '.csv' file."
        )
        raise ValueError(message)
class FlexibilityMarketModule(agentlib.BaseModule):
    """Class to emulate flexibility market. Receives flex offers and accepts these.

    Depending on ``config.market_specs.type`` one of the callbacks
    (``single``, ``random`` or ``custom``) is registered for the
    "FlexibilityOffer" variable. When an offer is accepted, the resulting
    power profile is published as "_P_external" and "in_provision" is set
    until the end of the flexibility event.
    """

    config: FlexibilityMarketModuleConfig

    # DataFrame for flex offer. Multiindex: (time_step, time).
    # Columns: pos_price, neg_price, status
    flex_offer_df: Optional[pd.DataFrame] = None
    # absolute end time of a flexibility event
    # (now + relative end time of the flexibility event on the mpc horizon)
    abs_flex_event_end: Union[int, float] = 0
    # number of upcoming offers that are still skipped by the random market
    # after an accepted offer; (re)set in register_callbacks
    cooldown_ticker: int = 0

    def set_random_seed(self, random_seed: int):
        """Set the random seed for reproducibility."""
        self.random_generator = np.random.default_rng(seed=random_seed)

    def get_results(self) -> Optional[pd.DataFrame]:
        """Load the flex offer results written by this module.

        Returns:
            The content of ``config.results_file`` with the first two columns
            used as (multi-)index, or None if the file does not exist.
        """
        results_file = self.config.results_file
        try:
            return pd.read_csv(results_file, header=[0], index_col=[0, 1])
        except FileNotFoundError:
            self.logger.error("Results file %s was not found.", results_file)
            return None

    def register_callbacks(self):
        """Register the offer callback matching the configured market type.

        Also resets the collected offers and the cooldown counter.
        """
        market_type = self.config.market_specs.type
        if market_type == "custom":
            callback_function = self.custom_flexibility_callback
        elif market_type == "single":
            callback_function = self.single_flexibility_callback
        elif market_type == "random":
            callback_function = self.random_flexibility_callback
            self.set_random_seed(self.config.market_specs.options.random_seed)
        else:
            self.logger.error("No market type defined. Available market types are single, random "
                              "and custom. Code will proceed without market interaction.")
            callback_function = self.dummy_callback

        self.agent.data_broker.register_callback(
            name="FlexibilityOffer", alias="FlexibilityOffer",
            callback=callback_function
        )

        self.flex_offer_df = None
        self.cooldown_ticker = 0

    def write_results(self, offer: FlexOffer):
        """Save the flex offer results depending on the config.

        The offer is appended to ``flex_offer_df`` under the index
        (current environment time, offer time). If ``config.save_results``
        is set, the collected offers are written to ``config.results_file``.
        """
        df = offer.as_dataframe()
        # Prefix every row of the offer with the time step it was received at.
        multi_index = pd.MultiIndex.from_tuples(
            list(zip([self.env.now] * len(df.index), df.index)),
            names=["time_step", "time"],
        )
        df = df.set_index(multi_index)

        # Only concatenate when there are earlier offers; this avoids
        # concatenating onto an empty placeholder frame.
        if self.flex_offer_df is None:
            self.flex_offer_df = df
        else:
            self.flex_offer_df = pd.concat((self.flex_offer_df, df))

        if self.config.save_results:
            self.flex_offer_df.to_csv(self.config.results_file)

    def _select_profile(self, offer: FlexOffer, positive: bool):
        """Pick the power profile of the offer for the requested direction.

        The offer is only accepted if the average of the corresponding
        difference profile exceeds ``market_specs.minimum_average_flex``.
        On acceptance the offer status is updated.

        Returns:
            The accepted power profile, or None if the offer is not accepted.
        """
        minimum = self.config.market_specs.minimum_average_flex
        if positive:
            if np.average(offer.pos_diff_profile) > minimum:
                profile = offer.base_power_profile - offer.pos_diff_profile
                offer.status = OfferStatus.accepted_positive.value
                return profile
        elif np.average(offer.neg_diff_profile) > minimum:
            profile = offer.base_power_profile + offer.neg_diff_profile
            offer.status = OfferStatus.accepted_negative.value
            return profile
        return None

    def _start_provision(self, profile):
        """Publish an accepted profile and mark the system as in provision."""
        profile = profile.dropna()
        # Shift the profile index by the current environment time.
        profile.index = profile.index + self.env.time
        self.set("_P_external", profile)
        self.abs_flex_event_end = profile.index[-1]
        self.set("in_provision", True)

    def random_flexibility_callback(self, inp: AgentVariable, name: str):
        """When a flexibility offer is sent, this function is called.

        The offer is accepted randomly. The option offer_acceptance_rate
        determines the random factor for offer acceptance. The option
        pos_neg_rate is the random factor for the direction of the
        flexibility. A higher rate means that more positive offers will be
        accepted.

        Constraints:
            cooldown: during $cooldown steps after a flexibility event no
                offer is accepted
            minimum_average_flex: min amount of flexibility to be accepted,
                to account for the model error
        """
        offer = inp.value
        options = self.config.market_specs.options
        # check if there is a flexibility provision and the cooldown is finished
        if not self.get("in_provision").value and self.cooldown_ticker == 0:
            if self.random_generator.random() < options.offer_acceptance_rate:
                # if random value is below pos_neg_rate, positive offer is
                # accepted. Otherwise, negative offer
                positive = self.random_generator.random() < options.pos_neg_rate
                profile = self._select_profile(offer, positive)
                if profile is not None:
                    self._start_provision(profile)
                    self.cooldown_ticker = self.config.market_specs.cooldown
        elif self.cooldown_ticker > 0:
            self.cooldown_ticker -= 1

        self.write_results(offer)

    def single_flexibility_callback(self, inp: AgentVariable, name: str):
        """Callback to activate a single, predefined flexibility offer.

        The offer is accepted if it arrives within one sample interval after
        the configured start time (plus the environment offset) and no
        provision is currently active.
        """
        offer = inp.value
        options = self.config.market_specs.options
        time_index = offer.base_power_profile.index
        t_sample = time_index[1] - time_index[0]
        acceptance_time_lower = self.env.config.offset + options.start_time
        acceptance_time_upper = acceptance_time_lower + t_sample

        if (acceptance_time_lower <= self.env.now < acceptance_time_upper
                and not self.get("in_provision").value):
            profile = self._select_profile(offer, options.direction == "positive")
            if profile is not None:
                self._start_provision(profile)

        self.write_results(offer)

    def custom_flexibility_callback(self, inp: AgentVariable, name: str):
        """Placeholder for a custom flexibility callback."""
        pass

    def dummy_callback(self, inp: AgentVariable, name: str):
        """Dummy function that is included, when market type is not specified."""
        self.logger.warning("No market type provided. No market interaction.")

    def cleanup_results(self):
        """Remove the results if they already exist."""
        results_file = self.config.results_file
        if not results_file:
            return
        try:
            os.remove(results_file)
        except FileNotFoundError:
            # Nothing has been written yet, so there is nothing to clean up.
            pass

    def process(self):
        """Reset "in_provision" once the current flexibility event has ended."""
        while True:
            # End the provision at the appropriate time
            if self.abs_flex_event_end < self.env.time:
                self.set("in_provision", False)
            yield self.env.timeout(self.env.config.t_sample)