Source code for agentlib_flexquant.modules.flexibility_market

import os
import numpy as np
import pandas as pd
from pathlib import Path
from typing import List, Optional, Union
from pydantic import model_validator, ConfigDict, Field
import agentlib
from agentlib.core.datamodels import AgentVariable
from agentlib_flexquant.data_structures.flex_offer import OfferStatus, FlexOffer
from agentlib_flexquant.data_structures.market import MarketSpecifications


class FlexibilityMarketModuleConfig(agentlib.BaseModuleConfig):
    """Configuration of the flexibility market module.

    Declares the variables the module exchanges with the data broker, the
    market specification, and where (and whether) received flex offers are
    written to disk.
    """

    # Configuration keys that are not declared on this model are rejected.
    model_config = ConfigDict(extra="forbid")

    # Flex offers are received under this variable name.
    inputs: list[AgentVariable] = [
        AgentVariable(name="FlexibilityOffer"),
    ]

    # Variables set by the market module when an offer is accepted.
    outputs: list[AgentVariable] = [
        AgentVariable(
            name="_P_external",
            alias="_P_external",
            description="External Power IO",
        ),
        AgentVariable(
            name="rel_start",
            alias="rel_start",
            description="relative start time of the flexibility event",
        ),
        AgentVariable(
            name="rel_end",
            alias="rel_end",
            description="relative end time of the flexibility event",
        ),
        AgentVariable(
            name="in_provision",
            alias="in_provision",
            description="Set if the system is in provision",
            value=False,
        ),
    ]

    # Market type, options, cooldown and minimum flexibility.
    market_specs: MarketSpecifications

    # CSV file the collected offers are written to; must end in ".csv".
    results_file: Optional[Path] = Field(
        default=Path("flexibility_market.csv"),
        description="User specified results file name",
    )
    # Switch for writing the collected offers to ``results_file``.
    save_results: Optional[bool] = Field(
        default=True,
        validate_default=True,
    )

    # NOTE(review): presumably tells agentlib which variable lists are shared
    # with other agents -- confirm against the agentlib documentation.
    shared_variable_fields: list[str] = ["outputs"]

    @model_validator(mode="after")
    def check_results_file_extension(self):
        """Ensure that a configured results file has the ``.csv`` extension.

        Returns:
            The config instance itself, unchanged.

        Raises:
            ValueError: If ``results_file`` is set and its suffix is not
                ``.csv``.
        """
        # Nothing to check without a file; a ".csv" suffix is the valid case.
        if not self.results_file or self.results_file.suffix == ".csv":
            return self
        raise ValueError(
            f"Invalid file extension for 'results_file': '{self.results_file}'. "
            f"Expected a '.csv' file."
        )
class FlexibilityMarketModule(agentlib.BaseModule):
    """Class to emulate flexibility market. Receives flex offers and accepts these.

    Depending on ``config.market_specs.type`` one of the callbacks below is
    registered for the ``FlexibilityOffer`` variable. When an offer is
    accepted, the resulting power profile is published as ``_P_external`` and
    ``in_provision`` is set; ``process`` resets ``in_provision`` once the
    event has ended. Every offer handled by the random or single callback is
    appended to ``flex_offer_df`` and optionally written to the results file.
    """

    config: FlexibilityMarketModuleConfig

    # DataFrame for flex offer. Multiindex: (time_step, time).
    # Columns: pos_price, neg_price, status
    flex_offer_df: pd.DataFrame = None
    # absolute end time of a flexibility event
    # (now + relative end time of the flexibility event on the mpc horizon)
    abs_flex_event_end: Union[int, float] = 0
    # Number of upcoming offers for which no acceptance is attempted; set to
    # market_specs.cooldown on acceptance, reset in register_callbacks.
    cooldown_ticker: int = 0
    # Random number generator; only created for the "random" market type.
    random_generator: Optional[np.random.Generator] = None

    def set_random_seed(self, random_seed: int):
        """Set the random seed for reproducibility.

        Args:
            random_seed: Seed passed to ``numpy.random.default_rng``.
        """
        self.random_generator = np.random.default_rng(seed=random_seed)

    def get_results(self) -> Optional[pd.DataFrame]:
        """Load the flex offers stored in the configured results file.

        Returns:
            The content of ``config.results_file`` with the first two CSV
            columns used as index (``write_results`` stores them as
            ``time_step`` and ``time``), or None if the file does not exist.
        """
        results_file = self.config.results_file
        try:
            results = pd.read_csv(results_file, header=[0], index_col=[0, 1])
            return results
        except FileNotFoundError:
            self.logger.error("Results file %s was not found.", results_file)
            return None

    def register_callbacks(self):
        """Register the offer callback matching the configured market type.

        Unknown market types are logged as an error and fall back to
        ``dummy_callback``, so the module keeps running without market
        interaction. The stored offers and the cooldown counter are reset.
        """
        market_type = self.config.market_specs.type
        if market_type == "custom":
            callback_function = self.custom_flexibility_callback
        elif market_type == "single":
            callback_function = self.single_flexibility_callback
        elif market_type == "random":
            callback_function = self.random_flexibility_callback
            self.set_random_seed(self.config.market_specs.options.random_seed)
        else:
            self.logger.error("No market type defined. Available market types are single, random "
                              "and custom. Code will proceed without market interaction.")
            callback_function = self.dummy_callback

        # Reset the state first so it exists before any callback can fire.
        self.flex_offer_df = None
        self.cooldown_ticker = 0

        self.agent.data_broker.register_callback(
            name="FlexibilityOffer",
            alias="FlexibilityOffer",
            callback=callback_function,
        )

    def write_results(self, offer: FlexOffer):
        """Save the flex offer results depending on the config.

        The offer's dataframe is indexed by (time_step, time), where time_step
        is the current environment time, and appended to ``flex_offer_df``.
        If ``config.save_results`` is set, the complete frame is written to
        ``config.results_file``.

        Args:
            offer: The flex offer to store.
        """
        df = offer.as_dataframe()
        # Name the levels on construction, so the accumulated index does not
        # have to be rebuilt from tuples on every call. from_arrays also
        # handles an offer without rows.
        multi_index = pd.MultiIndex.from_arrays(
            [[self.env.now] * len(df.index), df.index],
            names=["time_step", "time"],
        )
        indexed_offer = df.set_index(multi_index)

        # Concatenating onto an empty frame is deprecated in pandas, so the
        # first non-empty offer simply becomes the stored frame.
        if self.flex_offer_df is None or self.flex_offer_df.empty:
            self.flex_offer_df = indexed_offer
        else:
            self.flex_offer_df = pd.concat((self.flex_offer_df, indexed_offer))

        if self.config.save_results:
            self.flex_offer_df.to_csv(self.config.results_file)

    def _accepted_power_profile(self, offer: FlexOffer, positive: bool):
        """Return the power profile of the requested direction, if large enough.

        The offer status is set to accepted when the average flexibility of
        the requested direction exceeds ``market_specs.minimum_average_flex``.

        Args:
            offer: The received flex offer.
            positive: True to evaluate the positive offer, False for negative.

        Returns:
            The resulting power profile, or None if the offer is not accepted.
        """
        minimum_average_flex = self.config.market_specs.minimum_average_flex
        profile = None
        if positive:
            if np.average(offer.pos_diff_profile) > minimum_average_flex:
                profile = offer.base_power_profile - offer.pos_diff_profile
                offer.status = OfferStatus.accepted_positive.value
        elif np.average(offer.neg_diff_profile) > minimum_average_flex:
            profile = offer.base_power_profile + offer.neg_diff_profile
            offer.status = OfferStatus.accepted_negative.value
        return profile

    def _start_flex_provision(self, profile):
        """Publish the accepted profile and mark the system as in provision.

        Args:
            profile: Power profile with an index relative to the current
                time; entries that are NaN are dropped.
        """
        profile = profile.dropna()
        # Shift the relative index to absolute environment time.
        profile.index += self.env.time
        self.set("_P_external", profile)
        self.abs_flex_event_end = profile.index[-1]
        self.set("in_provision", True)

    def random_flexibility_callback(self, inp: AgentVariable, name: str):
        """When a flexibility offer is sent, this function is called.

        The offer is accepted randomly. The factor
        market_specs.options.offer_acceptance_rate determines the random
        factor for offer acceptance. market_specs.options.pos_neg_rate is the
        random factor for the direction of the flexibility. A higher rate
        means that more positive offers will be accepted.

        Constraints:
            cooldown: during $cooldown steps after a flexibility event no
                offer is accepted
            minimum_average_flex: min amount of flexibility to be accepted,
                to account for the model error

        The cooldown counter is decremented once per received offer whenever
        no acceptance is attempted, i.e. it also counts down while a
        provision is still running.

        Args:
            inp: Variable whose value is the received flex offer.
            name: Name passed by the data broker (unused).
        """
        offer = inp.value
        options = self.config.market_specs.options
        # check if there is a flexibility provision and the cooldown is finished
        if not self.get("in_provision").value and self.cooldown_ticker == 0:
            # The generator is queried first for acceptance and, only if the
            # offer is accepted, a second time for the direction.
            if self.random_generator.random() < options.offer_acceptance_rate:
                # if random value is below pos_neg_rate, positive offer is
                # accepted. Otherwise, negative offer
                positive = self.random_generator.random() < options.pos_neg_rate
                profile = self._accepted_power_profile(offer, positive)
                if profile is not None:
                    self._start_flex_provision(profile)
                    self.cooldown_ticker = self.config.market_specs.cooldown
        elif self.cooldown_ticker > 0:
            self.cooldown_ticker -= 1

        self.write_results(offer)

    def single_flexibility_callback(self, inp: AgentVariable, name: str):
        """Callback to activate a single, predefined flexibility offer.

        The offer is accepted if it arrives within one sample interval after
        ``env.config.offset + market_specs.options.start_time`` and no
        provision is running. ``market_specs.options.direction`` selects the
        positive offer if it equals "positive", otherwise the negative one.

        Args:
            inp: Variable whose value is the received flex offer.
            name: Name passed by the data broker (unused).
        """
        offer = inp.value
        options = self.config.market_specs.options
        # Sample interval taken from the first two entries of the offer.
        profile_index = offer.base_power_profile.index
        t_sample = profile_index[1] - profile_index[0]
        acceptance_time_lower = self.env.config.offset + options.start_time
        acceptance_time_upper = acceptance_time_lower + t_sample

        in_acceptance_window = acceptance_time_lower <= self.env.now < acceptance_time_upper
        if in_acceptance_window and not self.get("in_provision").value:
            profile = self._accepted_power_profile(
                offer, positive=options.direction == "positive"
            )
            if profile is not None:
                self._start_flex_provision(profile)

        self.write_results(offer)

    def custom_flexibility_callback(self, inp: AgentVariable, name: str):
        """Placeholder for a custom flexibility callback."""
        pass

    def dummy_callback(self, inp: AgentVariable, name: str):
        """Dummy function that is included, when market type is not specified."""
        self.logger.warning("No market type provided. No market interaction.")

    def cleanup_results(self):
        """Remove the results if they already exist.

        Does nothing if no results file is configured or the file is absent.
        """
        results_file = self.config.results_file
        if not results_file:
            return
        try:
            os.remove(results_file)
        except FileNotFoundError:
            # No results were written, so there is nothing to clean up.
            pass

    def process(self):
        """End a running flexibility provision once its end time has passed.

        Checks once per ``env.config.t_sample`` whether the current time lies
        beyond ``abs_flex_event_end`` and, if so, sets ``in_provision`` to
        False.
        """
        while True:
            # End the provision at the appropriate time
            if self.abs_flex_event_end < self.env.time:
                self.set("in_provision", False)
            yield self.env.timeout(self.env.config.t_sample)