Coverage for agentlib_flexquant/modules/flexibility_market.py: 50%
138 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-10-20 14:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-10-20 14:09 +0000
1import os
2from pathlib import Path
3from typing import Optional, Union
5import agentlib
6import numpy as np
7import pandas as pd
8import agentlib_flexquant.data_structures.globals as glbs
9from agentlib.core.datamodels import AgentVariable
10from pydantic import ConfigDict, Field, model_validator
12from agentlib_flexquant.data_structures.flex_offer import FlexOffer, OfferStatus
13from agentlib_flexquant.data_structures.market import MarketSpecifications
16class FlexibilityMarketModuleConfig(agentlib.BaseModuleConfig):
18 model_config = ConfigDict(extra="forbid")
20 inputs: list[AgentVariable] = [AgentVariable(name="FlexibilityOffer")]
22 outputs: list[AgentVariable] = [
23 AgentVariable(
24 name="_P_external", alias="_P_external", description="External Power IO"
25 ),
26 AgentVariable(
27 name="rel_start",
28 alias="rel_start",
29 description="relative start time of the flexibility event",
30 ),
31 AgentVariable(
32 name="rel_end",
33 alias="rel_end",
34 description="relative end time of the flexibility event",
35 ),
36 AgentVariable(
37 name="in_provision",
38 alias="in_provision",
39 description="Set if the system is in provision",
40 value=False,
41 ),
42 ]
44 parameters: list[AgentVariable] = [
45 AgentVariable(name=glbs.COLLOCATION_TIME_GRID, alias=glbs.COLLOCATION_TIME_GRID,
46 description="Time grid of the mpc model output"),
47 AgentVariable(name=glbs.TIME_STEP, unit="s", description="Time step of the mpc")
48 ]
50 market_specs: MarketSpecifications
52 results_file: Optional[Path] = Field(
53 default=Path("flexibility_market.csv"),
54 description="User specified results file name",
55 )
56 save_results: Optional[bool] = Field(validate_default=True, default=True)
58 shared_variable_fields: list[str] = ["outputs"]
60 @model_validator(mode="after")
61 def check_results_file_extension(self):
62 if self.results_file and self.results_file.suffix != ".csv":
63 raise ValueError(
64 f"Invalid file extension for 'results_file': '{self.results_file}'. "
65 f"Expected a '.csv' file."
66 )
67 return self
70class FlexibilityMarketModule(agentlib.BaseModule):
71 """Class to emulate flexibility market. Receives flex offers and accepts these."""
73 config: FlexibilityMarketModuleConfig
75 # DataFrame for flex offer. Multiindex: (time_step, time).
76 # Columns: pos_price, neg_price, status
77 flex_offer_df: pd.DataFrame = None
78 # absolute end time of a flexibility event (now + relative end time of the flexibility
79 # event on the mpc horizon)
80 abs_flex_event_end: Union[int, float] = 0
82 def set_random_seed(self, random_seed: int):
83 """Set the random seed for reproducibility."""
84 self.random_generator = np.random.default_rng(seed=random_seed)
86 def get_results(self) -> Optional[pd.DataFrame]:
87 """Open results file of flexibility_indicators.py."""
88 results_file = self.config.results_file
89 try:
90 results = pd.read_csv(results_file, header=[0], index_col=[0, 1])
91 return results
92 except FileNotFoundError:
93 self.logger.error("Results file %s was not found.", results_file)
94 return None
96 def register_callbacks(self):
97 if self.config.market_specs.type == "custom":
98 callback_function = self.custom_flexibility_callback
99 elif self.config.market_specs.type == "single":
100 callback_function = self.single_flexibility_callback
101 elif self.config.market_specs.type == "random":
102 callback_function = self.random_flexibility_callback
103 self.set_random_seed(self.config.market_specs.options.random_seed)
104 else:
105 self.logger.error(
106 "No market type defined. Available market types are single, random "
107 "and custom. Code will proceed without market interaction."
108 )
109 callback_function = self.dummy_callback
111 self.agent.data_broker.register_callback(
112 name="FlexibilityOffer",
113 alias="FlexibilityOffer",
114 callback=callback_function,
115 )
117 self.flex_offer_df = None
118 self.cooldown_ticker = 0
120 def write_results(self, offer: FlexOffer):
121 """Save the flex offer results depending on the config."""
122 if self.flex_offer_df is None:
123 self.flex_offer_df = pd.DataFrame()
124 df = offer.as_dataframe()
125 index_first_level = [self.env.now] * len(df.index)
126 multi_index = pd.MultiIndex.from_tuples(zip(index_first_level, df.index))
127 self.flex_offer_df = pd.concat((self.flex_offer_df, df.set_index(multi_index)))
128 indices = pd.MultiIndex.from_tuples(
129 self.flex_offer_df.index, names=["time_step", "time"]
130 )
131 self.flex_offer_df.set_index(indices, inplace=True)
133 if self.config.save_results:
134 self.flex_offer_df.to_csv(self.config.results_file)
136 def random_flexibility_callback(self, inp: AgentVariable, name: str):
137 """When a flexibility offer is sent, this function is called.
139 The offer is accepted randomly. The factor self.offer_acceptance_rate determines
140 the random factor for offer acceptance.
141 self.pos_neg_rate is the random factor for the direction of the flexibility.
142 A higher rate means that more positive offers will be accepted.
144 Constraints:
145 cooldown: during $cooldown steps after a flexibility event no offer is accepted
146 minimum_average_flex: min amount of flexibility to be accepted,
147 to account for the model error
149 """
150 offer = inp.value
151 # check if there is a flexibility provision and the cooldown is finished
152 if not self.get("in_provision").value and self.cooldown_ticker == 0:
153 if (
154 self.random_generator.random()
155 < self.config.market_specs.options.offer_acceptance_rate
156 ):
157 profile = None
158 # if random value is below pos_neg_rate, positive offer is accepted.
159 # Otherwise, negative offer
160 if (
161 self.random_generator.random()
162 < self.config.market_specs.options.pos_neg_rate
163 ):
164 if (
165 np.average(offer.pos_diff_profile)
166 > self.config.market_specs.minimum_average_flex
167 ):
168 profile = offer.base_power_profile - offer.pos_diff_profile
169 offer.status = OfferStatus.ACCEPTED_POSITIVE.value
171 elif (
172 np.average(offer.neg_diff_profile)
173 > self.config.market_specs.minimum_average_flex
174 ):
175 profile = offer.base_power_profile + offer.neg_diff_profile
176 offer.status = OfferStatus.ACCEPTED_NEGATIVE.value
178 if profile is not None:
179 # reindex the profile to the mpc output time grid
180 flex_power_feedback_method = self.config.market_specs.accepted_offer_sample_points
181 if flex_power_feedback_method == glbs.COLLOCATION:
182 profile = profile.reindex(self.get(glbs.COLLOCATION_TIME_GRID).value)
183 elif flex_power_feedback_method == glbs.CONSTANT:
184 index_to_keep = ~np.isin(profile.index,
185 self.get(glbs.COLLOCATION_TIME_GRID).value)
186 profile = profile.get(index_to_keep)
187 helper_indices = [i - 1 for i in profile.index[1:]]
188 new_index = sorted(set(profile.index.tolist() + helper_indices))[:-1]
189 profile = profile.reindex(new_index).ffill()
190 profile = profile.dropna()
191 profile.index += self.env.time
192 self.set("_P_external", profile)
193 self.abs_flex_event_end = profile.index[-1]
194 self.set("in_provision", True)
195 self.cooldown_ticker = self.config.market_specs.cooldown
197 elif self.cooldown_ticker > 0:
198 self.cooldown_ticker -= 1
200 self.write_results(offer)
202 def single_flexibility_callback(self, inp: AgentVariable, name: str):
203 """Callback to activate a single, predefined flexibility offer."""
204 offer = inp.value
205 profile = None
206 t_sample = self.get(glbs.TIME_STEP).value
207 acceptance_time_lower = (
208 self.env.config.offset + self.config.market_specs.options.offer_acceptance_time
209 )
210 acceptance_time_upper = (
211 self.env.config.offset
212 + self.config.market_specs.options.offer_acceptance_time
213 + t_sample
214 )
215 if (
216 acceptance_time_lower <= self.env.now < acceptance_time_upper
217 and not self.get("in_provision").value
218 ):
219 if self.config.market_specs.options.direction == "positive":
220 if (
221 np.average(offer.pos_diff_profile)
222 > self.config.market_specs.minimum_average_flex
223 ):
224 profile = offer.base_power_profile - offer.pos_diff_profile
225 offer.status = OfferStatus.ACCEPTED_POSITIVE.value
227 elif (
228 np.average(offer.neg_diff_profile)
229 > self.config.market_specs.minimum_average_flex
230 ):
231 profile = offer.base_power_profile + offer.neg_diff_profile
232 offer.status = OfferStatus.ACCEPTED_NEGATIVE.value
234 if profile is not None:
235 # reindex the profile to the mpc output time grid
236 flex_power_feedback_method = self.config.market_specs.accepted_offer_sample_points
237 if flex_power_feedback_method == glbs.COLLOCATION:
238 profile = profile.reindex(self.get(glbs.COLLOCATION_TIME_GRID).value)
239 elif flex_power_feedback_method == glbs.CONSTANT:
240 index_to_keep = ~np.isin(profile.index,
241 self.get(glbs.COLLOCATION_TIME_GRID).value)
242 profile = profile.get(index_to_keep)
243 helper_indices = [i - 1 for i in profile.index[1:]]
244 new_index = sorted(set(profile.index.tolist() + helper_indices))[:-1]
245 profile = profile.reindex(new_index).ffill()
246 profile = profile.dropna()
247 profile.index += self.env.time
248 self.set("_P_external", profile)
249 self.abs_flex_event_end = profile.index[-1]
250 self.set("in_provision", True)
252 self.write_results(offer)
254 def custom_flexibility_callback(self, inp: AgentVariable, name: str):
255 """Placeholder for a custom flexibility callback."""
256 pass
258 def dummy_callback(self, inp: AgentVariable, name: str):
259 """Dummy function that is included, when market type is not specified."""
260 self.logger.warning("No market type provided. No market interaction.")
262 def cleanup_results(self):
263 """Remove the results if they already exist."""
264 results_file = self.config.results_file
265 if not results_file:
266 return
267 os.remove(results_file)
269 def process(self):
270 while True:
271 # End the provision at the appropriate time
272 if self.abs_flex_event_end <= self.env.time:
273 self.set("in_provision", False)
274 yield self.env.timeout(self.env.config.t_sample)