Coverage for agentlib_flexquant/modules/flexibility_market.py: 57%

119 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-08-01 15:10 +0000

1import os 

2from pathlib import Path 

3from typing import List, Optional, Union 

4 

5import agentlib 

6import numpy as np 

7import pandas as pd 

8import pydantic 

9from agentlib.core.errors import ConfigurationError 

10from pydantic import model_validator 

11 

12from agentlib_flexquant.data_structures.flex_offer import OfferStatus 

13from agentlib_flexquant.data_structures.market import ( 

14 MarketSpecifications 

15) 

16 

17 

class FlexibilityMarketModuleConfig(agentlib.BaseModuleConfig):
    """Configuration of the flexibility market module.

    Declares the module's input and output variables, the market
    specification and the settings of the results file.
    """

    # Configuration keys that are not declared on this model are rejected.
    model_config = pydantic.ConfigDict(extra="forbid")

    inputs: List[agentlib.AgentVariable] = [
        agentlib.AgentVariable(name="FlexibilityOffer"),
    ]
    outputs: List[agentlib.AgentVariable] = [
        agentlib.AgentVariable(
            name="_P_external",
            alias="_P_external",
            description="External Power IO",
        ),
        agentlib.AgentVariable(
            name="rel_start",
            alias="rel_start",
            description="relative start time of the flexibility event",
        ),
        agentlib.AgentVariable(
            name="rel_end",
            alias="rel_end",
            description="relative end time of the flexibility event",
        ),
        agentlib.AgentVariable(
            name="in_provision",
            alias="in_provision",
            description="Set if the system is in provision",
            value=False,
        ),
    ]

    market_specs: MarketSpecifications

    results_file: Optional[Path] = pydantic.Field(
        default=Path("flexibility_market.csv"),
        description="User specified results file name",
    )
    save_results: Optional[bool] = pydantic.Field(
        default=True,
        validate_default=True,
    )

    shared_variable_fields: List[str] = ["outputs"]

    @model_validator(mode="after")
    def check_results_file_extension(self):
        """Ensure that a configured results file has the '.csv' suffix.

        Returns:
            The validated config instance.

        Raises:
            ValueError: If ``results_file`` is set and its suffix is not '.csv'.
        """
        # No results file configured: nothing to validate.
        if not self.results_file:
            return self
        if self.results_file.suffix == ".csv":
            return self
        raise ValueError(
            f"Invalid file extension for 'results_file': '{self.results_file}'. "
            f"Expected a '.csv' file."
        )

67 

68 

class FlexibilityMarketModule(agentlib.BaseModule):
    """Emulate a flexibility market that receives flexibility offers and accepts them.

    Depending on ``config.market_specs.type`` one of the callbacks of this
    class is registered for the ``FlexibilityOffer`` variable. When an offer
    is accepted, the resulting power profile is set as ``_P_external`` and
    ``in_provision`` is set to True. ``process`` sets ``in_provision`` back to
    False once the environment time has passed the end of the accepted
    profile. The random and single callbacks record every received offer in
    ``self.df`` and, if enabled, save it to the results file.
    """

    config: FlexibilityMarketModuleConfig

    # Offers recorded so far, indexed by (time_step, time); None until the
    # first offer has been written.
    df: Optional[pd.DataFrame] = None
    # Last index value of the most recently accepted profile.
    end: Union[int, float] = 0

    def set_random_seed(self, random_seed):
        """Create the random generator used by the random market.

        Args:
            random_seed: Seed passed to ``numpy.random.default_rng``; a fixed
                seed makes the sequence of draws reproducible.
        """
        self.random_generator = np.random.default_rng(seed=random_seed)

    def get_results(self) -> Optional[pd.DataFrame]:
        """Load the results of this module from ``config.results_file``.

        Returns:
            The content of the results file with its first two columns as
            index, or None (after logging an error) if the file was not found.
        """
        results_file = self.config.results_file
        try:
            return pd.read_csv(results_file, header=[0], index_col=[0, 1])
        except FileNotFoundError:
            self.logger.error("Results file %s was not found.", results_file)
            return None

    def register_callbacks(self):
        """Register the callback matching the configured market type.

        The market types "custom", "single" and "random" select the
        corresponding callback; any other type logs an error and registers
        ``dummy_callback``. Also resets the collected results and the
        cooldown counter.
        """
        market_type = self.config.market_specs.type
        if market_type == "custom":
            callback_function = self.custom_flexibility_callback
        elif market_type == "single":
            callback_function = self.single_flexibility_callback
        elif market_type == "random":
            callback_function = self.random_flexibility_callback
            self.set_random_seed(self.config.market_specs.options.random_seed)
        else:
            self.logger.error("No market type defined. Available market types are single, random "
                              "and custom. Code will proceed without market interaction.")
            callback_function = self.dummy_callback

        self.agent.data_broker.register_callback(
            name="FlexibilityOffer", alias="FlexibilityOffer",
            callback=callback_function
        )

        self.df = None
        self.cooldown_ticker = 0

    def write_results(self, offer):
        """Append an offer to the collected results and optionally save them.

        Args:
            offer: Flexibility offer; its ``as_dataframe()`` result is stored
                under the current environment time.
        """
        df = offer.as_dataframe()
        # Tag every row of the offer with the time at which it was handled.
        # Passing the names here avoids rebuilding the complete index of all
        # collected offers on every call.
        stamped_index = pd.MultiIndex.from_tuples(
            list(zip([self.env.now] * len(df.index), df.index)),
            names=["time_step", "time"],
        )
        stamped = df.set_index(stamped_index)
        if self.df is None or self.df.empty:
            # First offer: no need to concatenate onto an empty frame.
            self.df = stamped
        else:
            self.df = pd.concat((self.df, stamped))

        # Without a target file, to_csv would only build and return a string
        # that is thrown away, so saving is skipped in that case.
        if self.config.save_results and self.config.results_file is not None:
            self.df.to_csv(self.config.results_file)

    def _select_offer_profile(self, offer, positive):
        """Return the power profile of the requested direction if it is large enough.

        The offer is marked as accepted (positive or negative) when the average
        of the respective difference profile exceeds
        ``config.market_specs.minimum_average_flex``.

        Args:
            offer: Received flexibility offer.
            positive: True to consider the positive flexibility, False for the
                negative flexibility.

        Returns:
            The profile to provide, or None if the offer is not accepted.
        """
        minimum_average_flex = self.config.market_specs.minimum_average_flex
        profile = None
        if positive:
            if np.average(offer.pos_diff_profile) > minimum_average_flex:
                profile = offer.base_power_profile - offer.pos_diff_profile
                offer.status = OfferStatus.accepted_positive.value
        elif np.average(offer.neg_diff_profile) > minimum_average_flex:
            profile = offer.base_power_profile + offer.neg_diff_profile
            offer.status = OfferStatus.accepted_negative.value
        return profile

    def _begin_provision(self, profile):
        """Publish an accepted profile and mark the system as in provision.

        Args:
            profile: Accepted power profile as returned by
                ``_select_offer_profile``.
        """
        profile = profile.dropna()
        # Shift the index by the current environment time (presumably the
        # offer index is relative to the time of the offer - confirm).
        profile.index += self.env.time
        self.set("_P_external", profile)
        self.end = profile.index[-1]
        self.set("in_provision", True)

    def random_flexibility_callback(self, inp, name):
        """Accept received flexibility offers at random.

        An offer is considered with the probability
        ``config.market_specs.options.offer_acceptance_rate``.
        ``config.market_specs.options.pos_neg_rate`` is the probability that
        the positive flexibility is chosen, so a higher rate means that more
        positive offers are accepted.

        Constraints:
            cooldown: after an offer has been accepted, no further offer is
                accepted until ``config.market_specs.cooldown`` offers have
                been received
            minimum_average_flex: min amount of flexibility to be accepted,
                to account for the model error

        Args:
            inp: Variable whose ``value`` is the received flexibility offer.
            name: Unused here; part of the callback signature.
        """
        offer = inp.value
        options = self.config.market_specs.options
        # Only consider the offer if no flexibility is being provided and the
        # cooldown is finished.
        if not self.get("in_provision").value and self.cooldown_ticker == 0:
            if self.random_generator.random() < options.offer_acceptance_rate:
                # If the random value is below pos_neg_rate, the positive
                # offer is considered. Otherwise, the negative offer.
                positive = self.random_generator.random() < options.pos_neg_rate
                profile = self._select_offer_profile(offer, positive)
                if profile is not None:
                    self._begin_provision(profile)
                    self.cooldown_ticker = self.config.market_specs.cooldown
        elif self.cooldown_ticker > 0:
            # NOTE(review): this branch also runs while a provision is active,
            # so the cooldown is counted from the acceptance of an offer and
            # not from the end of the event - confirm that this is intended.
            self.cooldown_ticker -= 1

        self.write_results(offer)

    def single_flexibility_callback(self, inp, name):
        """Accept a single, predefined flexibility offer.

        The offer is only considered if the current time lies within one
        sample interval after ``config.market_specs.options.start_time``
        (shifted by the environment offset) and no flexibility is being
        provided. ``config.market_specs.options.direction`` selects the
        positive flexibility if it is "positive", otherwise the negative one.

        Args:
            inp: Variable whose ``value`` is the received flexibility offer.
            name: Unused here; part of the callback signature.
        """
        offer = inp.value
        options = self.config.market_specs.options
        # Sample interval of the offer, taken from its first two index values.
        t_sample = offer.base_power_profile.index[1] - offer.base_power_profile.index[0]
        acceptance_time_lower = self.env.config.offset + options.start_time
        acceptance_time_upper = acceptance_time_lower + t_sample
        in_acceptance_window = acceptance_time_lower <= self.env.now < acceptance_time_upper
        if in_acceptance_window and not self.get("in_provision").value:
            profile = self._select_offer_profile(offer, options.direction == "positive")
            if profile is not None:
                self._begin_provision(profile)

        self.write_results(offer)

    def custom_flexibility_callback(self, inp, name):
        """Placeholder for a custom flexibility callback"""

    def dummy_callback(self, inp, name):
        """Dummy function, that is included, when market type is not specified"""
        self.logger.warning("No market type provided. No market interaction.")

    def cleanup_results(self):
        """Delete the results file, if one is configured and exists."""
        results_file = self.config.results_file
        if not results_file:
            return
        try:
            os.remove(results_file)
        except FileNotFoundError:
            # The file is only created once an offer has been saved, so a
            # missing file is not an error during cleanup.
            self.logger.debug("Results file %s does not exist, nothing to clean up.", results_file)

    def process(self):
        """Reset ``in_provision`` once the accepted profile has ended.

        Checks every ``env.config.t_sample`` whether the environment time has
        passed ``self.end``.
        """
        while True:
            # End the provision at the appropriate time
            if self.end < self.env.time:
                self.set("in_provision", False)
            yield self.env.timeout(self.env.config.t_sample)