Coverage for agentlib_flexquant/modules/flexibility_market.py: 50%
138 statements
« prev ^ index » next coverage.py v7.4.4, created at 2026-03-26 09:43 +0000
1import os
2from pathlib import Path
3from typing import Optional, Union
5import agentlib
6import numpy as np
7import pandas as pd
8import agentlib_flexquant.data_structures.globals as glbs
9from agentlib.core.datamodels import AgentVariable
10from pydantic import ConfigDict, Field, model_validator
12from agentlib_flexquant.data_structures.flex_offer import FlexOffer, OfferStatus
13from agentlib_flexquant.data_structures.market import MarketSpecifications
class FlexibilityMarketModuleConfig(agentlib.BaseModuleConfig):
    # Configuration of the flexibility market module.
    # Keys that are not declared below are rejected during validation.
    model_config = ConfigDict(extra="forbid")

    # The module listens for flexibility offers.
    inputs: list[AgentVariable] = [AgentVariable(name="FlexibilityOffer")]

    # Variables published once an offer has been accepted.
    outputs: list[AgentVariable] = [
        AgentVariable(
            name=glbs.ACCEPTED_POWER_VAR_NAME,
            alias=glbs.ACCEPTED_POWER_VAR_NAME,
            description="External Power IO",
        ),
        AgentVariable(
            name=glbs.RELATIVE_EVENT_START_TIME_VAR_NAME,
            alias=glbs.RELATIVE_EVENT_START_TIME_VAR_NAME,
            description="relative start time of the flexibility event",
        ),
        AgentVariable(
            name=glbs.RELATIVE_EVENT_END_TIME_VAR_NAME,
            alias=glbs.RELATIVE_EVENT_END_TIME_VAR_NAME,
            description="relative end time of the flexibility event",
        ),
        AgentVariable(
            name=glbs.PROVISION_VAR_NAME,
            alias=glbs.PROVISION_VAR_NAME,
            description="Set if the system is in provision",
            value=False,
        ),
    ]

    parameters: list[AgentVariable] = [
        AgentVariable(
            name=glbs.COLLOCATION_TIME_GRID,
            alias=glbs.COLLOCATION_TIME_GRID,
            description="Time grid of the mpc model output",
        ),
        AgentVariable(
            name=glbs.TIME_STEP,
            unit="s",
            description="Time step of the mpc",
        ),
    ]

    # Market behaviour (type, options, cooldown, minimum flexibility, ...).
    market_specs: MarketSpecifications

    results_file: Optional[Path] = Field(
        default=Path("flexibility_market.csv"),
        description="User specified results file name",
    )
    save_results: Optional[bool] = Field(validate_default=True, default=True)

    shared_variable_fields: list[str] = ["outputs"]

    @model_validator(mode="after")
    def check_results_file_extension(self):
        # Only a configured results file is checked, and it must be a csv file.
        path = self.results_file
        if not path or path.suffix == ".csv":
            return self
        raise ValueError(
            f"Invalid file extension for 'results_file': '{self.results_file}'. "
            f"Expected a '.csv' file."
        )
class FlexibilityMarketModule(agentlib.BaseModule):
    """Class to emulate flexibility market. Receives flex offers and accepts these.

    Depending on ``config.market_specs.type`` one of the callbacks below is
    registered for the ``FlexibilityOffer`` variable. When an offer is accepted,
    the resulting power profile is published and the provision flag is set; the
    flag is reset by :meth:`process` once the event has ended.
    """

    config: FlexibilityMarketModuleConfig

    # DataFrame for flex offer. Multiindex: (time_step, time).
    # Columns: pos_price, neg_price, status
    flex_offer_df: pd.DataFrame = None
    # absolute end time of a flexibility event (now + relative end time
    # of the flexibility event on the mpc horizon)
    abs_flex_event_end: Union[int, float] = 0

    def set_random_seed(self, random_seed: int):
        """Set the random seed for reproducibility."""
        self.random_generator = np.random.default_rng(seed=random_seed)

    def get_results(self) -> Optional[pd.DataFrame]:
        """Read the results file written by this module.

        Returns:
            The stored flex offers with a two-level index, or None if no
            results file is configured or the file does not exist.
        """
        results_file = self.config.results_file
        if not results_file:
            # results_file is optional; without it there is nothing to read
            self.logger.error("No results file is configured.")
            return None
        try:
            return pd.read_csv(results_file, header=[0], index_col=[0, 1])
        except FileNotFoundError:
            self.logger.error("Results file %s was not found.", results_file)
            return None

    def register_callbacks(self):
        """Register the offer callback matching the configured market type."""
        market_type = self.config.market_specs.type
        if market_type == "custom":
            callback_function = self.custom_flexibility_callback
        elif market_type == "single":
            callback_function = self.single_flexibility_callback
        elif market_type == "random":
            callback_function = self.random_flexibility_callback
            self.set_random_seed(self.config.market_specs.options.random_seed)
        else:
            self.logger.error(
                "No market type defined. Available market types are single, random "
                "and custom. Code will proceed without market interaction."
            )
            callback_function = self.dummy_callback

        self.agent.data_broker.register_callback(
            name="FlexibilityOffer",
            alias="FlexibilityOffer",
            callback=callback_function,
        )

        self.flex_offer_df = None
        self.cooldown_ticker = 0

    def write_results(self, offer: FlexOffer):
        """Save the flex offer results depending on the config.

        The offer is appended to ``flex_offer_df`` under the current
        environment time and, if ``save_results`` is set, the whole frame is
        written to the results file.
        """
        if self.flex_offer_df is None:
            self.flex_offer_df = pd.DataFrame()
        df = offer.as_dataframe()
        index_first_level = [self.env.now] * len(df.index)
        multi_index = pd.MultiIndex.from_tuples(zip(index_first_level, df.index))
        self.flex_offer_df = pd.concat((self.flex_offer_df, df.set_index(multi_index)))
        # rebuild the index so that it is a named MultiIndex after concatenation
        indices = pd.MultiIndex.from_tuples(
            self.flex_offer_df.index, names=["time_step", "time"]
        )
        self.flex_offer_df.set_index(indices, inplace=True)

        if self.config.save_results:
            self.flex_offer_df.to_csv(self.config.results_file)

    def _select_profile(self, offer: FlexOffer, positive: bool):
        """Return the accepted power profile for one direction, or None.

        The offer is only accepted if the average of the respective difference
        profile exceeds ``market_specs.minimum_average_flex``. On acceptance
        the offer status is updated.
        """
        minimum_average_flex = self.config.market_specs.minimum_average_flex
        profile = None
        if positive:
            if np.average(offer.pos_diff_profile) > minimum_average_flex:
                profile = offer.base_power_profile - offer.pos_diff_profile
                offer.status = OfferStatus.ACCEPTED_POSITIVE.value
        elif np.average(offer.neg_diff_profile) > minimum_average_flex:
            profile = offer.base_power_profile + offer.neg_diff_profile
            offer.status = OfferStatus.ACCEPTED_NEGATIVE.value
        return profile

    def _activate_profile(self, profile):
        """Resample an accepted profile, publish it and start the provision.

        ``profile`` is presumably a pandas Series indexed by relative time
        (TODO confirm against FlexOffer).
        """
        # reindex the profile to the mpc output time grid
        flex_power_feedback_method = (
            self.config.market_specs.accepted_offer_sample_points
        )
        if flex_power_feedback_method == glbs.COLLOCATION:
            profile = profile.reindex(self.get(glbs.COLLOCATION_TIME_GRID).value)
        elif flex_power_feedback_method == glbs.CONSTANT:
            # drop the collocation points; direct indexing (instead of
            # ``.get``) lets lookup errors surface instead of yielding None
            index_to_keep = ~np.isin(
                profile.index, self.get(glbs.COLLOCATION_TIME_GRID).value
            )
            profile = profile[index_to_keep]
            # add a helper point right before each remaining point and
            # forward-fill, so the value is held until just before the next one
            helper_indices = [i - 1 for i in profile.index[1:]]
            new_index = sorted(set(profile.index.tolist() + helper_indices))[:-1]
            profile = profile.reindex(new_index).ffill()
        profile = profile.dropna()
        # shift from relative time to absolute environment time
        profile.index += self.env.time
        self.set(glbs.ACCEPTED_POWER_VAR_NAME, profile)
        self.abs_flex_event_end = profile.index[-1]
        self.set(glbs.PROVISION_VAR_NAME, True)

    def random_flexibility_callback(self, inp: AgentVariable, name: str):
        """When a flexibility offer is sent, this function is called.

        The offer is accepted randomly. ``market_specs.options.offer_acceptance_rate``
        determines the random factor for offer acceptance.
        ``market_specs.options.pos_neg_rate`` is the random factor for the
        direction of the flexibility. A higher rate means that more positive
        offers will be accepted.

        Constraints:
            cooldown: while the cooldown ticker is above zero no offer is
                accepted
            minimum_average_flex: min amount of flexibility to be accepted,
                to account for the model error

        """
        offer = inp.value
        options = self.config.market_specs.options
        # check if there is a flexibility provision and the cooldown is finished
        if not self.get(glbs.PROVISION_VAR_NAME).value and self.cooldown_ticker == 0:
            if self.random_generator.random() < options.offer_acceptance_rate:
                # if random value is below pos_neg_rate, positive offer is
                # accepted. Otherwise, negative offer
                positive = self.random_generator.random() < options.pos_neg_rate
                profile = self._select_profile(offer, positive)
                if profile is not None:
                    self._activate_profile(profile)
                    self.cooldown_ticker = self.config.market_specs.cooldown

        elif self.cooldown_ticker > 0:
            self.cooldown_ticker -= 1

        self.write_results(offer)

    def single_flexibility_callback(self, inp: AgentVariable, name: str):
        """Callback to activate a single, predefined flexibility offer.

        The offer is accepted if the current time lies within one time step
        after the configured acceptance time and no provision is active.
        """
        offer = inp.value
        options = self.config.market_specs.options
        t_sample = self.get(glbs.TIME_STEP).value
        acceptance_time_lower = self.env.config.offset + options.offer_acceptance_time
        acceptance_time_upper = acceptance_time_lower + t_sample
        if (
            acceptance_time_lower <= self.env.now < acceptance_time_upper
            and not self.get(glbs.PROVISION_VAR_NAME).value
        ):
            profile = self._select_profile(offer, options.direction == "positive")
            if profile is not None:
                self._activate_profile(profile)

        self.write_results(offer)

    def custom_flexibility_callback(self, inp: AgentVariable, name: str):
        """Placeholder for a custom flexibility callback."""
        pass

    def dummy_callback(self, inp: AgentVariable, name: str):
        """Dummy function that is included, when market type is not specified."""
        self.logger.warning("No market type provided. No market interaction.")

    def cleanup_results(self):
        """Remove the results if they already exist."""
        results_file = self.config.results_file
        if not results_file:
            return
        try:
            os.remove(results_file)
        except FileNotFoundError:
            # nothing was written yet, so there is nothing to clean up
            pass

    def process(self):
        """Reset the provision flag once the flexibility event has ended."""
        while True:
            # End the provision at the appropriate time
            if self.abs_flex_event_end <= self.env.time:
                self.set(glbs.PROVISION_VAR_NAME, False)
            yield self.env.timeout(self.env.config.t_sample)