import os
from pathlib import Path
from typing import List, Optional, Union
import agentlib
import numpy as np
import pandas as pd
import pydantic
from agentlib.core.errors import ConfigurationError
from pydantic import model_validator
from agentlib_flexquant.data_structures.flex_offer import OfferStatus
from agentlib_flexquant.data_structures.market import (
MarketSpecifications
)
[docs]
class FlexibilityMarketModuleConfig(agentlib.BaseModuleConfig):
# parameters: List[agentlib.AgentVariable] = [
# ]
model_config = pydantic.ConfigDict(
extra='forbid'
)
inputs: List[agentlib.AgentVariable] = [
agentlib.AgentVariable(name="FlexibilityOffer")
]
outputs: List[agentlib.AgentVariable] = [
agentlib.AgentVariable(
name="_P_external", alias="_P_external",
description="External Power IO"
),
agentlib.AgentVariable(
name="rel_start", alias="rel_start",
description="relative start time of the flexibility event"
),
agentlib.AgentVariable(
name="rel_end", alias="rel_end",
description="relative end time of the flexibility event"
),
agentlib.AgentVariable(
name="in_provision", alias="in_provision",
description="Set if the system is in provision", value=False
)
]
market_specs: MarketSpecifications
results_file: Optional[Path] = pydantic.Field(
default=Path("flexibility_market.csv"),
description="User specified results file name"
)
save_results: Optional[bool] = pydantic.Field(
validate_default=True,
default=True
)
shared_variable_fields: List[str] = ["outputs"]
[docs]
@model_validator(mode="after")
def check_results_file_extension(self):
if self.results_file and self.results_file.suffix != ".csv":
raise ValueError(
f"Invalid file extension for 'results_file': '{self.results_file}'. "
f"Expected a '.csv' file."
)
return self
[docs]
class FlexibilityMarketModule(agentlib.BaseModule):
"""Class to emulate flexibility market. Receives flex offers and accepts these.
"""
config: FlexibilityMarketModuleConfig
df: pd.DataFrame = None
end: Union[int, float] = 0
[docs]
def set_random_seed(self, random_seed):
"""set the random seed for reproducability"""
self.random_generator = np.random.default_rng(seed=random_seed)
[docs]
def get_results(self) -> Optional[pd.DataFrame]:
"""
Opens results file of flexibilityindicators.py
results_file defined in __init__
"""
results_file = self.config.results_file
try:
results = pd.read_csv(results_file, header=[0], index_col=[0, 1])
return results
except FileNotFoundError:
self.logger.error("Results file %s was not found.", results_file)
return None
[docs]
def register_callbacks(self):
if self.config.market_specs.type == "custom":
callback_function = self.custom_flexibility_callback
elif self.config.market_specs.type == "single":
callback_function = self.single_flexibility_callback
elif self.config.market_specs.type == "random":
callback_function = self.random_flexibility_callback
self.set_random_seed(self.config.market_specs.options.random_seed)
else:
self.logger.error("No market type defined. Available market types are single, random "
"and custom. Code will proceed without market interaction.")
callback_function = self.dummy_callback
self.agent.data_broker.register_callback(
name="FlexibilityOffer", alias="FlexibilityOffer",
callback=callback_function
)
self.df = None
self.cooldown_ticker = 0
[docs]
def write_results(self, offer):
if self.df is None:
self.df = pd.DataFrame()
df = offer.as_dataframe()
index_first_level = [self.env.now] * len(df.index)
multi_index = pd.MultiIndex.from_tuples(zip(index_first_level, df.index))
self.df = pd.concat((self.df, df.set_index(multi_index)))
indices = pd.MultiIndex.from_tuples(self.df.index, names=["time_step", "time"])
self.df.set_index(indices, inplace=True)
if self.config.save_results:
self.df.to_csv(self.config.results_file)
[docs]
def random_flexibility_callback(self, inp, name):
"""
When a flexibility offer is sent this function is called.
The offer is accepted randomly. The factor self.offer_acceptance_rate determines the
random factor for offer acceptance. self.pos_neg_rate is the random factor for
the direction of the flexibility. A higher rate means that more positive offers will be accepted.
Constraints:
cooldown: during $cooldown steps after a flexibility event no offer is accepted
minimum_average_flex: min amount of flexibility to be accepted, to account for the model error
"""
offer = inp.value
# check if there is a flexibility provision and the cooldown is finished
if not self.get("in_provision").value and self.cooldown_ticker == 0:
if self.random_generator.random() < self.config.market_specs.options.offer_acceptance_rate:
profile = None
# if random value is below pos_neg_rate, positive offer is accepted.
# Otherwise, negative offer
if self.random_generator.random() < self.config.market_specs.options.pos_neg_rate:
if np.average(offer.pos_diff_profile) > self.config.market_specs.minimum_average_flex:
profile = offer.base_power_profile - offer.pos_diff_profile
offer.status = OfferStatus.accepted_positive.value
elif np.average(offer.neg_diff_profile) > self.config.market_specs.minimum_average_flex:
profile = offer.base_power_profile + offer.neg_diff_profile
offer.status = OfferStatus.accepted_negative.value
if profile is not None:
profile = profile.dropna()
profile.index += self.env.time
self.set("_P_external", profile)
self.end = profile.index[-1]
self.set("in_provision", True)
self.cooldown_ticker = self.config.market_specs.cooldown
elif self.cooldown_ticker > 0:
self.cooldown_ticker -= 1
self.write_results(offer)
[docs]
def single_flexibility_callback(self, inp, name):
"""Callback to activate a single, predefined flexibility offer.
"""
offer = inp.value
profile = None
t_sample = offer.base_power_profile.index[1]-offer.base_power_profile.index[0]
acceptance_time_lower = self.env.config.offset + self.config.market_specs.options.start_time
acceptance_time_upper = self.env.config.offset + self.config.market_specs.options.start_time + t_sample
if acceptance_time_lower <= self.env.now < acceptance_time_upper and not self.get("in_provision").value:
if self.config.market_specs.options.direction == "positive":
if np.average(offer.pos_diff_profile) > self.config.market_specs.minimum_average_flex:
profile = offer.base_power_profile - offer.pos_diff_profile
offer.status = OfferStatus.accepted_positive.value
elif np.average(offer.neg_diff_profile) > self.config.market_specs.minimum_average_flex:
profile = offer.base_power_profile + offer.neg_diff_profile
offer.status = OfferStatus.accepted_negative.value
if profile is not None:
profile = profile.dropna()
profile.index += self.env.time
self.set("_P_external", profile)
self.end = profile.index[-1]
self.set("in_provision", True)
self.write_results(offer)
[docs]
def custom_flexibility_callback(self, inp, name):
"""Placeholder for a custom flexibility callback"""
pass
[docs]
def dummy_callback(self, inp, name):
"""Dummy function, that is included, when market type is not specified"""
self.logger.warning("No market type provided. No market interaction.")
[docs]
def cleanup_results(self):
results_file = self.config.results_file
if not results_file:
return
os.remove(results_file)
[docs]
def process(self):
while True:
# End the provision at the appropriate time
if self.end < self.env.time:
self.set("in_provision", False)
yield self.env.timeout(self.env.config.t_sample)