Coverage for agentlib_flexquant/modules/flexibility_market.py: 50%

138 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-10-20 14:09 +0000

1import os 

2from pathlib import Path 

3from typing import Optional, Union 

4 

5import agentlib 

6import numpy as np 

7import pandas as pd 

8import agentlib_flexquant.data_structures.globals as glbs 

9from agentlib.core.datamodels import AgentVariable 

10from pydantic import ConfigDict, Field, model_validator 

11 

12from agentlib_flexquant.data_structures.flex_offer import FlexOffer, OfferStatus 

13from agentlib_flexquant.data_structures.market import MarketSpecifications 

14 

15 

class FlexibilityMarketModuleConfig(agentlib.BaseModuleConfig):
    # Configuration of the flexibility market module: the variables it
    # exchanges with other modules, the market specification and the
    # settings for persisting the received offers.

    # Configuration keys that are not declared on this model are rejected.
    model_config = ConfigDict(extra="forbid")

    # The flexibility offer the market reacts to.
    inputs: list[AgentVariable] = [AgentVariable(name="FlexibilityOffer")]

    # Values published by the market once an offer has been accepted.
    outputs: list[AgentVariable] = [
        AgentVariable(
            name="_P_external",
            alias="_P_external",
            description="External Power IO",
        ),
        AgentVariable(
            name="rel_start",
            alias="rel_start",
            description="relative start time of the flexibility event",
        ),
        AgentVariable(
            name="rel_end",
            alias="rel_end",
            description="relative end time of the flexibility event",
        ),
        AgentVariable(
            name="in_provision",
            alias="in_provision",
            description="Set if the system is in provision",
            value=False,
        ),
    ]

    parameters: list[AgentVariable] = [
        AgentVariable(
            name=glbs.COLLOCATION_TIME_GRID,
            alias=glbs.COLLOCATION_TIME_GRID,
            description="Time grid of the mpc model output",
        ),
        AgentVariable(
            name=glbs.TIME_STEP,
            unit="s",
            description="Time step of the mpc",
        ),
    ]

    # Market type and its options; required, there is no default.
    market_specs: MarketSpecifications

    # Target csv file for the collected flex offers.
    results_file: Optional[Path] = Field(
        default=Path("flexibility_market.csv"),
        description="User specified results file name",
    )
    # Whether the collected offers are written to ``results_file``.
    save_results: Optional[bool] = Field(validate_default=True, default=True)

    shared_variable_fields: list[str] = ["outputs"]

    @model_validator(mode="after")
    def check_results_file_extension(self):
        """Ensure that a configured results file carries the '.csv' suffix.

        Returns:
            The validated config instance itself.

        Raises:
            ValueError: If ``results_file`` is set and its suffix is not '.csv'.
        """
        path = self.results_file
        # Nothing to check without a file; a '.csv' suffix is the valid case.
        if not path or path.suffix == ".csv":
            return self
        raise ValueError(
            f"Invalid file extension for 'results_file': '{self.results_file}'. "
            f"Expected a '.csv' file."
        )

68 

69 

class FlexibilityMarketModule(agentlib.BaseModule):
    """Class to emulate flexibility market. Receives flex offers and accepts these."""

    config: FlexibilityMarketModuleConfig

    # DataFrame for flex offer. Multiindex: (time_step, time).
    # Columns: pos_price, neg_price, status
    flex_offer_df: pd.DataFrame = None
    # absolute end time of a flexibility event (now + relative end time of the
    # flexibility event on the mpc horizon)
    abs_flex_event_end: Union[int, float] = 0

    def set_random_seed(self, random_seed: int):
        """Set the random seed for reproducibility.

        Args:
            random_seed: Seed for the numpy random generator that is used by
                the random market callback.
        """
        self.random_generator = np.random.default_rng(seed=random_seed)

    def get_results(self) -> Optional[pd.DataFrame]:
        """Load the flex offers stored in the configured results file.

        Returns:
            The csv content with the first two columns as row index, or None
            if no results file is configured or the file does not exist.
        """
        results_file = self.config.results_file
        if not results_file:
            # results_file is Optional; read_csv cannot handle None.
            self.logger.error("No results file is configured.")
            return None
        try:
            return pd.read_csv(results_file, header=[0], index_col=[0, 1])
        except FileNotFoundError:
            self.logger.error("Results file %s was not found.", results_file)
            return None

    def register_callbacks(self):
        """Register the callback for "FlexibilityOffer" matching the market type.

        Also resets the collected offers and the cooldown counter.
        """
        market_type = self.config.market_specs.type
        if market_type == "custom":
            callback_function = self.custom_flexibility_callback
        elif market_type == "single":
            callback_function = self.single_flexibility_callback
        elif market_type == "random":
            callback_function = self.random_flexibility_callback
            # only the random market draws random numbers
            self.set_random_seed(self.config.market_specs.options.random_seed)
        else:
            self.logger.error(
                "No market type defined. Available market types are single, random "
                "and custom. Code will proceed without market interaction."
            )
            callback_function = self.dummy_callback

        self.agent.data_broker.register_callback(
            name="FlexibilityOffer",
            alias="FlexibilityOffer",
            callback=callback_function,
        )

        self.flex_offer_df = None
        self.cooldown_ticker = 0

    def write_results(self, offer: FlexOffer):
        """Save the flex offer results depending on the config.

        The offer is appended to ``flex_offer_df`` under the current
        environment time; if ``save_results`` is set, the whole frame is
        written to the results file.

        Args:
            offer: The flex offer to store.
        """
        if self.flex_offer_df is None:
            self.flex_offer_df = pd.DataFrame()
        df = offer.as_dataframe()
        # first index level: time of this callback, second level: offer index
        index_first_level = [self.env.now] * len(df.index)
        multi_index = pd.MultiIndex.from_tuples(zip(index_first_level, df.index))
        self.flex_offer_df = pd.concat((self.flex_offer_df, df.set_index(multi_index)))
        # rebuild the index after the concat so that the level names are set
        indices = pd.MultiIndex.from_tuples(
            self.flex_offer_df.index, names=["time_step", "time"]
        )
        self.flex_offer_df.set_index(indices, inplace=True)

        if self.config.save_results:
            self.flex_offer_df.to_csv(self.config.results_file)

    def _select_profile(self, offer: FlexOffer, positive: bool):
        """Return the power profile of the accepted offer direction, if any.

        Only the requested direction is considered; a positive offer that is
        too small does not fall back to the negative one. On acceptance the
        status of the offer is updated.

        Args:
            offer: The received flex offer.
            positive: True to consider the positive, False the negative offer.

        Returns:
            The power profile to provide, or None if the average flexibility
            does not exceed ``market_specs.minimum_average_flex``.
        """
        minimum_flex = self.config.market_specs.minimum_average_flex
        if positive:
            if np.average(offer.pos_diff_profile) > minimum_flex:
                profile = offer.base_power_profile - offer.pos_diff_profile
                offer.status = OfferStatus.ACCEPTED_POSITIVE.value
                return profile
            return None
        if np.average(offer.neg_diff_profile) > minimum_flex:
            profile = offer.base_power_profile + offer.neg_diff_profile
            offer.status = OfferStatus.ACCEPTED_NEGATIVE.value
            return profile
        return None

    def _start_provision(self, profile):
        """Publish an accepted power profile and flag the start of the provision.

        Args:
            profile: Power profile indexed by time relative to now
                (presumably a pandas Series - verify against FlexOffer).
        """
        # reindex the profile to the mpc output time grid
        flex_power_feedback_method = self.config.market_specs.accepted_offer_sample_points
        if flex_power_feedback_method == glbs.COLLOCATION:
            profile = profile.reindex(self.get(glbs.COLLOCATION_TIME_GRID).value)
        elif flex_power_feedback_method == glbs.CONSTANT:
            # keep only the points that are not on the collocation grid
            index_to_keep = ~np.isin(
                profile.index, self.get(glbs.COLLOCATION_TIME_GRID).value
            )
            profile = profile.get(index_to_keep)
            # add a point one unit before every kept point (except the first),
            # drop the last point and forward fill, so that each value is held
            # until just before the next kept point
            helper_indices = [i - 1 for i in profile.index[1:]]
            new_index = sorted(set(profile.index.tolist() + helper_indices))[:-1]
            profile = profile.reindex(new_index).ffill()
        profile = profile.dropna()
        # shift from relative to absolute time
        profile.index += self.env.time
        self.set("_P_external", profile)
        self.abs_flex_event_end = profile.index[-1]
        self.set("in_provision", True)

    def random_flexibility_callback(self, inp: AgentVariable, name: str):
        """When a flexibility offer is sent, this function is called.

        The offer is accepted randomly. The option offer_acceptance_rate of the
        market specifications determines the random factor for offer acceptance.
        The option pos_neg_rate is the random factor for the direction of the
        flexibility. A higher rate means that more positive offers will be accepted.

        Constraints:
            cooldown: the cooldown counter is set when an offer is accepted and
                decremented on each later callback; no offer is accepted until
                it is back at zero
            minimum_average_flex: min amount of flexibility to be accepted,
                to account for the model error

        Args:
            inp: Variable whose value is the received flex offer.
            name: Not used by this callback.
        """
        offer = inp.value
        options = self.config.market_specs.options
        # check if there is a flexibility provision and the cooldown is finished
        if not self.get("in_provision").value and self.cooldown_ticker == 0:
            if self.random_generator.random() < options.offer_acceptance_rate:
                # if random value is below pos_neg_rate, positive offer is
                # accepted. Otherwise, negative offer
                positive = self.random_generator.random() < options.pos_neg_rate
                profile = self._select_profile(offer, positive)
                if profile is not None:
                    self._start_provision(profile)
                    self.cooldown_ticker = self.config.market_specs.cooldown
        elif self.cooldown_ticker > 0:
            self.cooldown_ticker -= 1

        self.write_results(offer)

    def single_flexibility_callback(self, inp: AgentVariable, name: str):
        """Callback to activate a single, predefined flexibility offer.

        The offer is accepted if the current time lies within one time step
        after the configured acceptance time (shifted by the environment
        offset) and the system is not already in provision.

        Args:
            inp: Variable whose value is the received flex offer.
            name: Not used by this callback.
        """
        offer = inp.value
        options = self.config.market_specs.options
        t_sample = self.get(glbs.TIME_STEP).value
        acceptance_time_lower = self.env.config.offset + options.offer_acceptance_time
        acceptance_time_upper = acceptance_time_lower + t_sample
        if (
            acceptance_time_lower <= self.env.now < acceptance_time_upper
            and not self.get("in_provision").value
        ):
            profile = self._select_profile(
                offer, positive=options.direction == "positive"
            )
            if profile is not None:
                self._start_provision(profile)

        self.write_results(offer)

    def custom_flexibility_callback(self, inp: AgentVariable, name: str):
        """Placeholder for a custom flexibility callback."""
        pass

    def dummy_callback(self, inp: AgentVariable, name: str):
        """Dummy function that is included, when market type is not specified."""
        self.logger.warning("No market type provided. No market interaction.")

    def cleanup_results(self):
        """Remove the results if they already exist."""
        results_file = self.config.results_file
        if not results_file:
            return
        try:
            os.remove(results_file)
        except FileNotFoundError:
            # no results were written yet, so there is nothing to remove
            pass

    def process(self):
        """Reset "in_provision" once the end of the flexibility event is reached.

        Runs endlessly and re-checks after every ``env.config.t_sample``.
        """
        while True:
            # End the provision at the appropriate time
            if self.abs_flex_event_end <= self.env.time:
                self.set("in_provision", False)
            yield self.env.timeout(self.env.config.t_sample)