Coverage for agentlib_flexquant/modules/flexibility_market.py: 50%

138 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2026-03-26 09:43 +0000

1import os 

2from pathlib import Path 

3from typing import Optional, Union 

4 

5import agentlib 

6import numpy as np 

7import pandas as pd 

8import agentlib_flexquant.data_structures.globals as glbs 

9from agentlib.core.datamodels import AgentVariable 

10from pydantic import ConfigDict, Field, model_validator 

11 

12from agentlib_flexquant.data_structures.flex_offer import FlexOffer, OfferStatus 

13from agentlib_flexquant.data_structures.market import MarketSpecifications 

14 

15 

16class FlexibilityMarketModuleConfig(agentlib.BaseModuleConfig): 

17 

18 model_config = ConfigDict(extra="forbid") 

19 

20 inputs: list[AgentVariable] = [AgentVariable(name="FlexibilityOffer")] 

21 

22 outputs: list[AgentVariable] = [ 

23 AgentVariable( 

24 name=glbs.ACCEPTED_POWER_VAR_NAME, alias=glbs.ACCEPTED_POWER_VAR_NAME, 

25 description="External Power IO" 

26 ), 

27 AgentVariable( 

28 name=glbs.RELATIVE_EVENT_START_TIME_VAR_NAME, 

29 alias=glbs.RELATIVE_EVENT_START_TIME_VAR_NAME, 

30 description="relative start time of the flexibility event", 

31 ), 

32 AgentVariable( 

33 name=glbs.RELATIVE_EVENT_END_TIME_VAR_NAME, 

34 alias=glbs.RELATIVE_EVENT_END_TIME_VAR_NAME, 

35 description="relative end time of the flexibility event", 

36 ), 

37 AgentVariable( 

38 name=glbs.PROVISION_VAR_NAME, 

39 alias=glbs.PROVISION_VAR_NAME, 

40 description="Set if the system is in provision", 

41 value=False, 

42 ), 

43 ] 

44 

45 parameters: list[AgentVariable] = [ 

46 AgentVariable(name=glbs.COLLOCATION_TIME_GRID, alias=glbs.COLLOCATION_TIME_GRID, 

47 description="Time grid of the mpc model output"), 

48 AgentVariable(name=glbs.TIME_STEP, unit="s", description="Time step of the mpc") 

49 ] 

50 

51 market_specs: MarketSpecifications 

52 

53 results_file: Optional[Path] = Field( 

54 default=Path("flexibility_market.csv"), 

55 description="User specified results file name", 

56 ) 

57 save_results: Optional[bool] = Field(validate_default=True, default=True) 

58 

59 shared_variable_fields: list[str] = ["outputs"] 

60 

61 @model_validator(mode="after") 

62 def check_results_file_extension(self): 

63 if self.results_file and self.results_file.suffix != ".csv": 

64 raise ValueError( 

65 f"Invalid file extension for 'results_file': '{self.results_file}'. " 

66 f"Expected a '.csv' file." 

67 ) 

68 return self 

69 

70 

71class FlexibilityMarketModule(agentlib.BaseModule): 

72 """Class to emulate flexibility market. Receives flex offers and accepts these.""" 

73 

74 config: FlexibilityMarketModuleConfig 

75 

76 # DataFrame for flex offer. Multiindex: (time_step, time). 

77 # Columns: pos_price, neg_price, status 

78 flex_offer_df: pd.DataFrame = None 

79 # absolute end time of a flexibility event (now + relative end time 

80 # of the flexibility event on the mpc horizon) 

81 abs_flex_event_end: Union[int, float] = 0 

82 

83 def set_random_seed(self, random_seed: int): 

84 """Set the random seed for reproducibility.""" 

85 self.random_generator = np.random.default_rng(seed=random_seed) 

86 

87 def get_results(self) -> Optional[pd.DataFrame]: 

88 """Open results file of flexibility_indicators.py.""" 

89 results_file = self.config.results_file 

90 try: 

91 results = pd.read_csv(results_file, header=[0], index_col=[0, 1]) 

92 return results 

93 except FileNotFoundError: 

94 self.logger.error("Results file %s was not found.", results_file) 

95 return None 

96 

97 def register_callbacks(self): 

98 if self.config.market_specs.type == "custom": 

99 callback_function = self.custom_flexibility_callback 

100 elif self.config.market_specs.type == "single": 

101 callback_function = self.single_flexibility_callback 

102 elif self.config.market_specs.type == "random": 

103 callback_function = self.random_flexibility_callback 

104 self.set_random_seed(self.config.market_specs.options.random_seed) 

105 else: 

106 self.logger.error( 

107 "No market type defined. Available market types are single, random " 

108 "and custom. Code will proceed without market interaction." 

109 ) 

110 callback_function = self.dummy_callback 

111 

112 self.agent.data_broker.register_callback( 

113 name="FlexibilityOffer", 

114 alias="FlexibilityOffer", 

115 callback=callback_function, 

116 ) 

117 

118 self.flex_offer_df = None 

119 self.cooldown_ticker = 0 

120 

121 def write_results(self, offer: FlexOffer): 

122 """Save the flex offer results depending on the config.""" 

123 if self.flex_offer_df is None: 

124 self.flex_offer_df = pd.DataFrame() 

125 df = offer.as_dataframe() 

126 index_first_level = [self.env.now] * len(df.index) 

127 multi_index = pd.MultiIndex.from_tuples(zip(index_first_level, df.index)) 

128 self.flex_offer_df = pd.concat((self.flex_offer_df, df.set_index(multi_index))) 

129 indices = pd.MultiIndex.from_tuples( 

130 self.flex_offer_df.index, names=["time_step", "time"] 

131 ) 

132 self.flex_offer_df.set_index(indices, inplace=True) 

133 

134 if self.config.save_results: 

135 self.flex_offer_df.to_csv(self.config.results_file) 

136 

137 def random_flexibility_callback(self, inp: AgentVariable, name: str): 

138 """When a flexibility offer is sent, this function is called. 

139 

140 The offer is accepted randomly. The factor self.offer_acceptance_rate determines 

141 the random factor for offer acceptance. 

142 self.pos_neg_rate is the random factor for the direction of the flexibility. 

143 A higher rate means that more positive offers will be accepted. 

144 

145 Constraints: 

146 cooldown: during $cooldown steps after a flexibility event no offer is 

147 accepted 

148 minimum_average_flex: min amount of flexibility to be accepted, 

149 to account for the model error 

150 

151 """ 

152 offer = inp.value 

153 # check if there is a flexibility provision and the cooldown is finished 

154 if not self.get(glbs.PROVISION_VAR_NAME).value and self.cooldown_ticker == 0: 

155 if ( 

156 self.random_generator.random() 

157 < self.config.market_specs.options.offer_acceptance_rate 

158 ): 

159 profile = None 

160 # if random value is below pos_neg_rate, positive offer is accepted. 

161 # Otherwise, negative offer 

162 if ( 

163 self.random_generator.random() 

164 < self.config.market_specs.options.pos_neg_rate 

165 ): 

166 if ( 

167 np.average(offer.pos_diff_profile) 

168 > self.config.market_specs.minimum_average_flex 

169 ): 

170 profile = offer.base_power_profile - offer.pos_diff_profile 

171 offer.status = OfferStatus.ACCEPTED_POSITIVE.value 

172 

173 elif ( 

174 np.average(offer.neg_diff_profile) 

175 > self.config.market_specs.minimum_average_flex 

176 ): 

177 profile = offer.base_power_profile + offer.neg_diff_profile 

178 offer.status = OfferStatus.ACCEPTED_NEGATIVE.value 

179 

180 if profile is not None: 

181 # reindex the profile to the mpc output time grid 

182 flex_power_feedback_method = ( 

183 self.config.market_specs.accepted_offer_sample_points) 

184 if flex_power_feedback_method == glbs.COLLOCATION: 

185 profile = profile.reindex( 

186 self.get(glbs.COLLOCATION_TIME_GRID).value) 

187 elif flex_power_feedback_method == glbs.CONSTANT: 

188 index_to_keep = ~np.isin( 

189 profile.index, self.get(glbs.COLLOCATION_TIME_GRID).value) 

190 profile = profile.get(index_to_keep) 

191 helper_indices = [i - 1 for i in profile.index[1:]] 

192 new_index = sorted(set(profile.index.tolist() + 

193 helper_indices))[:-1] 

194 profile = profile.reindex(new_index).ffill() 

195 profile = profile.dropna() 

196 profile.index += self.env.time 

197 self.set(glbs.ACCEPTED_POWER_VAR_NAME, profile) 

198 self.abs_flex_event_end = profile.index[-1] 

199 self.set(glbs.PROVISION_VAR_NAME, True) 

200 self.cooldown_ticker = self.config.market_specs.cooldown 

201 

202 elif self.cooldown_ticker > 0: 

203 self.cooldown_ticker -= 1 

204 

205 self.write_results(offer) 

206 

207 def single_flexibility_callback(self, inp: AgentVariable, name: str): 

208 """Callback to activate a single, predefined flexibility offer.""" 

209 offer = inp.value 

210 profile = None 

211 t_sample = self.get(glbs.TIME_STEP).value 

212 acceptance_time_lower = ( 

213 self.env.config.offset + 

214 self.config.market_specs.options.offer_acceptance_time 

215 ) 

216 acceptance_time_upper = ( 

217 self.env.config.offset 

218 + self.config.market_specs.options.offer_acceptance_time 

219 + t_sample 

220 ) 

221 if ( 

222 acceptance_time_lower <= self.env.now < acceptance_time_upper 

223 and not self.get(glbs.PROVISION_VAR_NAME).value 

224 ): 

225 if self.config.market_specs.options.direction == "positive": 

226 if ( 

227 np.average(offer.pos_diff_profile) 

228 > self.config.market_specs.minimum_average_flex 

229 ): 

230 profile = offer.base_power_profile - offer.pos_diff_profile 

231 offer.status = OfferStatus.ACCEPTED_POSITIVE.value 

232 

233 elif ( 

234 np.average(offer.neg_diff_profile) 

235 > self.config.market_specs.minimum_average_flex 

236 ): 

237 profile = offer.base_power_profile + offer.neg_diff_profile 

238 offer.status = OfferStatus.ACCEPTED_NEGATIVE.value 

239 

240 if profile is not None: 

241 # reindex the profile to the mpc output time grid 

242 flex_power_feedback_method = ( 

243 self.config.market_specs.accepted_offer_sample_points) 

244 if flex_power_feedback_method == glbs.COLLOCATION: 

245 profile = profile.reindex(self.get( 

246 glbs.COLLOCATION_TIME_GRID).value) 

247 elif flex_power_feedback_method == glbs.CONSTANT: 

248 index_to_keep = ~np.isin(profile.index, 

249 self.get(glbs.COLLOCATION_TIME_GRID).value) 

250 profile = profile.get(index_to_keep) 

251 helper_indices = [i - 1 for i in profile.index[1:]] 

252 new_index = sorted(set(profile.index.tolist() + 

253 helper_indices))[:-1] 

254 profile = profile.reindex(new_index).ffill() 

255 profile = profile.dropna() 

256 profile.index += self.env.time 

257 self.set(glbs.ACCEPTED_POWER_VAR_NAME, profile) 

258 self.abs_flex_event_end = profile.index[-1] 

259 self.set(glbs.PROVISION_VAR_NAME, True) 

260 

261 self.write_results(offer) 

262 

263 def custom_flexibility_callback(self, inp: AgentVariable, name: str): 

264 """Placeholder for a custom flexibility callback.""" 

265 pass 

266 

267 def dummy_callback(self, inp: AgentVariable, name: str): 

268 """Dummy function that is included, when market type is not specified.""" 

269 self.logger.warning("No market type provided. No market interaction.") 

270 

271 def cleanup_results(self): 

272 """Remove the results if they already exist.""" 

273 results_file = self.config.results_file 

274 if not results_file: 

275 return 

276 os.remove(results_file) 

277 

278 def process(self): 

279 while True: 

280 # End the provision at the appropriate time 

281 if self.abs_flex_event_end <= self.env.time: 

282 self.set(glbs.PROVISION_VAR_NAME, False) 

283 yield self.env.timeout(self.env.config.t_sample)