Coverage for agentlib_flexquant/modules/flexibility_market.py: 57%
118 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-15 15:25 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-15 15:25 +0000
1import os
2import numpy as np
3import pandas as pd
4from pathlib import Path
5from typing import List, Optional, Union
6from pydantic import model_validator, ConfigDict, Field
7import agentlib
8from agentlib.core.datamodels import AgentVariable
9from agentlib_flexquant.data_structures.flex_offer import OfferStatus, FlexOffer
10from agentlib_flexquant.data_structures.market import MarketSpecifications
class FlexibilityMarketModuleConfig(agentlib.BaseModuleConfig):
    """Configuration of the flexibility market module.

    Declares the variable carrying the incoming flexibility offer, the output
    variables written when an offer is accepted, the market specification and
    the location/toggle for persisting the received offers.
    """

    # Unknown keys in the configuration are rejected.
    model_config = ConfigDict(extra='forbid')

    inputs: List[AgentVariable] = [AgentVariable(name="FlexibilityOffer")]

    outputs: List[AgentVariable] = [
        AgentVariable(
            name="_P_external",
            alias="_P_external",
            description="External Power IO",
        ),
        AgentVariable(
            name="rel_start",
            alias="rel_start",
            description="relative start time of the flexibility event",
        ),
        AgentVariable(
            name="rel_end",
            alias="rel_end",
            description="relative end time of the flexibility event",
        ),
        AgentVariable(
            name="in_provision",
            alias="in_provision",
            description="Set if the system is in provision",
            value=False,
        ),
    ]

    market_specs: MarketSpecifications

    # Target file for the collected offers; must end in ".csv" (see validator).
    results_file: Optional[Path] = Field(
        description="User specified results file name",
        default=Path("flexibility_market.csv"),
    )
    # When true, the collected offers are written to ``results_file``.
    save_results: Optional[bool] = Field(default=True, validate_default=True)

    # NOTE(review): presumably tells agentlib which variable fields are shared
    # with other agents -- confirm against the agentlib documentation.
    shared_variable_fields: List[str] = ["outputs"]

    @model_validator(mode="after")
    def check_results_file_extension(self):
        """Ensure that a configured results file has the '.csv' extension.

        Returns:
            The validated config instance.

        Raises:
            ValueError: If ``results_file`` is set and its suffix is not '.csv'.
        """
        path = self.results_file
        # Nothing to check when no file is configured or the suffix is fine.
        if not path or path.suffix == ".csv":
            return self
        raise ValueError(
            f"Invalid file extension for 'results_file': '{path}'. "
            f"Expected a '.csv' file."
        )
class FlexibilityMarketModule(agentlib.BaseModule):
    """Class to emulate flexibility market. Receives flex offers and accepts these.

    The acceptance strategy is selected by ``config.market_specs.type``:
    "random" accepts offers at random, "single" accepts one offer at a
    predefined time and "custom" is a hook for user-defined behaviour. Every
    offer handled by the random or single strategy is appended to
    ``flex_offer_df`` and, if configured, written to the results file.
    """

    config: FlexibilityMarketModuleConfig

    # DataFrame for flex offer. Multiindex: (time_step, time). Columns: pos_price, neg_price, status
    flex_offer_df: Optional[pd.DataFrame] = None
    # absolute end time of a flexibility event (now + relative end time of the
    # flexibility event on the mpc horizon)
    abs_flex_event_end: Union[int, float] = 0

    def set_random_seed(self, random_seed: int):
        """Set the random seed for reproducibility."""
        self.random_generator = np.random.default_rng(seed=random_seed)

    def get_results(self) -> Optional[pd.DataFrame]:
        """Load the flex offers stored in the configured results file.

        Returns:
            The CSV content with its first two columns as index, or None if
            no results file is configured or the file does not exist.
        """
        results_file = self.config.results_file
        if not results_file:
            # results_file is Optional; reading None would raise in pandas.
            self.logger.warning("No results file configured.")
            return None
        try:
            return pd.read_csv(results_file, header=[0], index_col=[0, 1])
        except FileNotFoundError:
            self.logger.error("Results file %s was not found.", results_file)
            return None

    def register_callbacks(self):
        """Register the offer callback matching the configured market type.

        Also resets the stored offers and the cooldown counter. For an unknown
        market type an error is logged and a callback without market
        interaction is registered.
        """
        market_type = self.config.market_specs.type
        if market_type == "custom":
            callback_function = self.custom_flexibility_callback
        elif market_type == "single":
            callback_function = self.single_flexibility_callback
        elif market_type == "random":
            callback_function = self.random_flexibility_callback
            self.set_random_seed(self.config.market_specs.options.random_seed)
        else:
            self.logger.error("No market type defined. Available market types are single, random "
                              "and custom. Code will proceed without market interaction.")
            callback_function = self.dummy_callback

        self.agent.data_broker.register_callback(
            name="FlexibilityOffer", alias="FlexibilityOffer",
            callback=callback_function
        )

        self.flex_offer_df = None
        self.cooldown_ticker = 0

    def write_results(self, offer: FlexOffer):
        """Append the offer to ``flex_offer_df`` and save it depending on the config.

        The offer's rows are indexed by (current environment time, original
        offer index) under the level names ("time_step", "time").
        """
        df = offer.as_dataframe()
        # Build the named two-level index for the new rows only, instead of
        # re-deriving the index of the whole accumulated frame on every call.
        multi_index = pd.MultiIndex.from_arrays(
            [[self.env.now] * len(df.index), df.index],
            names=["time_step", "time"],
        )
        df = df.set_index(multi_index)
        if self.flex_offer_df is None:
            self.flex_offer_df = df
        else:
            self.flex_offer_df = pd.concat((self.flex_offer_df, df))

        # Only write when a target file exists; to_csv(None) would merely
        # render the whole frame to a discarded string.
        if self.config.save_results and self.config.results_file:
            self.flex_offer_df.to_csv(self.config.results_file)

    def _select_profile(self, offer: FlexOffer, positive: bool):
        """Return the power profile for one direction, or None if it is too small.

        The offer is marked as accepted for that direction only when the
        average of its difference profile exceeds
        ``market_specs.minimum_average_flex``.
        """
        minimum = self.config.market_specs.minimum_average_flex
        if positive:
            if np.average(offer.pos_diff_profile) > minimum:
                offer.status = OfferStatus.accepted_positive.value
                return offer.base_power_profile - offer.pos_diff_profile
        elif np.average(offer.neg_diff_profile) > minimum:
            offer.status = OfferStatus.accepted_negative.value
            return offer.base_power_profile + offer.neg_diff_profile
        return None

    def _start_provision(self, profile):
        """Publish the accepted profile and flag the start of the provision.

        ``profile`` is presumably a pandas Series with a relative time index
        (TODO confirm); it is shifted to absolute time before being sent.
        """
        profile = profile.dropna()
        profile.index += self.env.time
        self.set("_P_external", profile)
        # process() ends the provision once this time has passed
        self.abs_flex_event_end = profile.index[-1]
        self.set("in_provision", True)

    def random_flexibility_callback(self, inp: AgentVariable, name: str):
        """When a flexibility offer is sent, this function is called.

        The offer is accepted randomly. ``market_specs.options.offer_acceptance_rate``
        determines the random factor for offer acceptance.
        ``market_specs.options.pos_neg_rate`` is the random factor for the
        direction of the flexibility. A higher rate means that more positive
        offers will be accepted.

        Constraints:
            cooldown: during $cooldown steps after a flexibility event no offer is accepted
            minimum_average_flex: min amount of flexibility to be accepted,
                to account for the model error

        Args:
            inp: Variable whose value is the received flex offer.
            name: Name passed by the data broker; unused.
        """
        offer = inp.value
        options = self.config.market_specs.options
        # check if there is a flexibility provision and the cooldown is finished
        if not self.get("in_provision").value and self.cooldown_ticker == 0:
            if self.random_generator.random() < options.offer_acceptance_rate:
                # if random value is below pos_neg_rate, positive offer is accepted.
                # Otherwise, negative offer
                positive = self.random_generator.random() < options.pos_neg_rate
                profile = self._select_profile(offer, positive)
                if profile is not None:
                    self._start_provision(profile)
                    self.cooldown_ticker = self.config.market_specs.cooldown
        elif self.cooldown_ticker > 0:
            # counted down on every received offer, also while in provision
            self.cooldown_ticker -= 1

        self.write_results(offer)

    def single_flexibility_callback(self, inp: AgentVariable, name: str):
        """Callback to activate a single, predefined flexibility offer.

        The offer is accepted if the current time lies within one sample
        interval after ``offset + market_specs.options.start_time`` and no
        provision is running. The direction "positive" selects the positive
        offer, any other value the negative one.

        Args:
            inp: Variable whose value is the received flex offer.
            name: Name passed by the data broker; unused.
        """
        offer = inp.value
        options = self.config.market_specs.options
        # sample interval of the offer, taken from its first two time stamps
        t_sample = offer.base_power_profile.index[1] - offer.base_power_profile.index[0]
        acceptance_time_lower = self.env.config.offset + options.start_time
        acceptance_time_upper = acceptance_time_lower + t_sample
        in_window = acceptance_time_lower <= self.env.now < acceptance_time_upper
        if in_window and not self.get("in_provision").value:
            profile = self._select_profile(offer, options.direction == "positive")
            if profile is not None:
                self._start_provision(profile)

        self.write_results(offer)

    def custom_flexibility_callback(self, inp: AgentVariable, name: str):
        """Placeholder for a custom flexibility callback."""

    def dummy_callback(self, inp: AgentVariable, name: str):
        """Dummy function that is included, when market type is not specified."""
        self.logger.warning("No market type provided. No market interaction.")

    def cleanup_results(self):
        """Remove the results if they already exist."""
        results_file = self.config.results_file
        if not results_file:
            return
        try:
            os.remove(results_file)
        except FileNotFoundError:
            # No results were written yet, so there is nothing to clean up.
            pass

    def process(self):
        """Reset ``in_provision`` once the flexibility event has ended.

        Runs forever, checking once per environment sample interval.
        """
        while True:
            # End the provision at the appropriate time
            if self.abs_flex_event_end < self.env.time:
                self.set("in_provision", False)
            yield self.env.timeout(self.env.config.t_sample)