Coverage for agentlib_flexquant/modules/flexibility_market.py: 57%

118 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-08-15 15:25 +0000

1import os 

2import numpy as np 

3import pandas as pd 

4from pathlib import Path 

5from typing import List, Optional, Union 

6from pydantic import model_validator, ConfigDict, Field 

7import agentlib 

8from agentlib.core.datamodels import AgentVariable 

9from agentlib_flexquant.data_structures.flex_offer import OfferStatus, FlexOffer 

10from agentlib_flexquant.data_structures.market import MarketSpecifications 

11 

12 

13class FlexibilityMarketModuleConfig(agentlib.BaseModuleConfig): 

14 

15 model_config = ConfigDict( 

16 extra='forbid' 

17 ) 

18 

19 inputs: List[AgentVariable] = [ 

20 AgentVariable(name="FlexibilityOffer") 

21 ] 

22 

23 outputs: List[AgentVariable] = [ 

24 AgentVariable( 

25 name="_P_external", alias="_P_external", 

26 description="External Power IO" 

27 ), 

28 AgentVariable( 

29 name="rel_start", alias="rel_start", 

30 description="relative start time of the flexibility event" 

31 ), 

32 AgentVariable( 

33 name="rel_end", alias="rel_end", 

34 description="relative end time of the flexibility event" 

35 ), 

36 AgentVariable( 

37 name="in_provision", alias="in_provision", 

38 description="Set if the system is in provision", value=False 

39 ) 

40 ] 

41 

42 market_specs: MarketSpecifications 

43 

44 results_file: Optional[Path] = Field( 

45 default=Path("flexibility_market.csv"), 

46 description="User specified results file name" 

47 ) 

48 save_results: Optional[bool] = Field( 

49 validate_default=True, 

50 default=True 

51 ) 

52 

53 shared_variable_fields: List[str] = ["outputs"] 

54 

55 @model_validator(mode="after") 

56 def check_results_file_extension(self): 

57 if self.results_file and self.results_file.suffix != ".csv": 

58 raise ValueError( 

59 f"Invalid file extension for 'results_file': '{self.results_file}'. " 

60 f"Expected a '.csv' file." 

61 ) 

62 return self 

63 

64 

65class FlexibilityMarketModule(agentlib.BaseModule): 

66 """Class to emulate flexibility market. Receives flex offers and accepts these.""" 

67 config: FlexibilityMarketModuleConfig 

68 

69 # DataFrame for flex offer. Multiindex: (time_step, time). Columns: pos_price, neg_price, status 

70 flex_offer_df: pd.DataFrame = None 

71 # absolute end time of a flexibility event (now + relative end time of the flexibility event on the mpc horizon) 

72 abs_flex_event_end: Union[int, float] = 0 

73 

74 def set_random_seed(self, random_seed: int): 

75 """Set the random seed for reproducibility.""" 

76 self.random_generator = np.random.default_rng(seed=random_seed) 

77 

78 def get_results(self) -> Optional[pd.DataFrame]: 

79 """Open results file of flexibility_indicators.py.""" 

80 results_file = self.config.results_file 

81 try: 

82 results = pd.read_csv(results_file, header=[0], index_col=[0, 1]) 

83 return results 

84 except FileNotFoundError: 

85 self.logger.error("Results file %s was not found.", results_file) 

86 return None 

87 

88 def register_callbacks(self): 

89 if self.config.market_specs.type == "custom": 

90 callback_function = self.custom_flexibility_callback 

91 elif self.config.market_specs.type == "single": 

92 callback_function = self.single_flexibility_callback 

93 elif self.config.market_specs.type == "random": 

94 callback_function = self.random_flexibility_callback 

95 self.set_random_seed(self.config.market_specs.options.random_seed) 

96 else: 

97 self.logger.error("No market type defined. Available market types are single, random " 

98 "and custom. Code will proceed without market interaction.") 

99 callback_function = self.dummy_callback 

100 

101 self.agent.data_broker.register_callback( 

102 name="FlexibilityOffer", alias="FlexibilityOffer", 

103 callback=callback_function 

104 ) 

105 

106 self.flex_offer_df = None 

107 self.cooldown_ticker = 0 

108 

109 def write_results(self, offer: FlexOffer): 

110 """Save the flex offer results depending on the config.""" 

111 if self.flex_offer_df is None: 

112 self.flex_offer_df = pd.DataFrame() 

113 df = offer.as_dataframe() 

114 index_first_level = [self.env.now] * len(df.index) 

115 multi_index = pd.MultiIndex.from_tuples(zip(index_first_level, df.index)) 

116 self.flex_offer_df = pd.concat((self.flex_offer_df, df.set_index(multi_index))) 

117 indices = pd.MultiIndex.from_tuples(self.flex_offer_df.index, names=["time_step", "time"]) 

118 self.flex_offer_df.set_index(indices, inplace=True) 

119 

120 if self.config.save_results: 

121 self.flex_offer_df.to_csv(self.config.results_file) 

122 

123 def random_flexibility_callback(self, inp: AgentVariable, name: str): 

124 """When a flexibility offer is sent, this function is called. 

125 

126 The offer is accepted randomly. The factor self.offer_acceptance_rate determines the random factor for offer acceptance. 

127 self.pos_neg_rate is the random factor for the direction of the flexibility. 

128 A higher rate means that more positive offers will be accepted. 

129  

130 Constraints: 

131 cooldown: during $cooldown steps after a flexibility event no offer is accepted 

132 minimum_average_flex: min amount of flexibility to be accepted, to account for the model error 

133 

134 """ 

135 offer = inp.value 

136 # check if there is a flexibility provision and the cooldown is finished 

137 if not self.get("in_provision").value and self.cooldown_ticker == 0: 

138 if self.random_generator.random() < self.config.market_specs.options.offer_acceptance_rate: 

139 profile = None 

140 # if random value is below pos_neg_rate, positive offer is accepted. 

141 # Otherwise, negative offer 

142 if self.random_generator.random() < self.config.market_specs.options.pos_neg_rate: 

143 if np.average(offer.pos_diff_profile) > self.config.market_specs.minimum_average_flex: 

144 profile = offer.base_power_profile - offer.pos_diff_profile 

145 offer.status = OfferStatus.accepted_positive.value 

146 

147 elif np.average(offer.neg_diff_profile) > self.config.market_specs.minimum_average_flex: 

148 profile = offer.base_power_profile + offer.neg_diff_profile 

149 offer.status = OfferStatus.accepted_negative.value 

150 

151 if profile is not None: 

152 profile = profile.dropna() 

153 profile.index += self.env.time 

154 self.set("_P_external", profile) 

155 self.abs_flex_event_end = profile.index[-1] 

156 self.set("in_provision", True) 

157 self.cooldown_ticker = self.config.market_specs.cooldown 

158 

159 elif self.cooldown_ticker > 0: 

160 self.cooldown_ticker -= 1 

161 

162 self.write_results(offer) 

163 

164 def single_flexibility_callback(self, inp: AgentVariable, name: str): 

165 """Callback to activate a single, predefined flexibility offer.""" 

166 offer = inp.value 

167 profile = None 

168 t_sample = offer.base_power_profile.index[1]-offer.base_power_profile.index[0] 

169 acceptance_time_lower = self.env.config.offset + self.config.market_specs.options.start_time 

170 acceptance_time_upper = self.env.config.offset + self.config.market_specs.options.start_time + t_sample 

171 if acceptance_time_lower <= self.env.now < acceptance_time_upper and not self.get("in_provision").value: 

172 if self.config.market_specs.options.direction == "positive": 

173 if np.average(offer.pos_diff_profile) > self.config.market_specs.minimum_average_flex: 

174 profile = offer.base_power_profile - offer.pos_diff_profile 

175 offer.status = OfferStatus.accepted_positive.value 

176 

177 elif np.average(offer.neg_diff_profile) > self.config.market_specs.minimum_average_flex: 

178 profile = offer.base_power_profile + offer.neg_diff_profile 

179 offer.status = OfferStatus.accepted_negative.value 

180 

181 if profile is not None: 

182 profile = profile.dropna() 

183 profile.index += self.env.time 

184 self.set("_P_external", profile) 

185 self.abs_flex_event_end = profile.index[-1] 

186 self.set("in_provision", True) 

187 

188 self.write_results(offer) 

189 

190 def custom_flexibility_callback(self, inp: AgentVariable, name: str): 

191 """Placeholder for a custom flexibility callback.""" 

192 pass 

193 

194 def dummy_callback(self, inp: AgentVariable, name: str): 

195 """Dummy function that is included, when market type is not specified.""" 

196 self.logger.warning("No market type provided. No market interaction.") 

197 

198 def cleanup_results(self): 

199 """Remove the results if they already exist.""" 

200 results_file = self.config.results_file 

201 if not results_file: 

202 return 

203 os.remove(results_file) 

204 

205 def process(self): 

206 while True: 

207 # End the provision at the appropriate time 

208 if self.abs_flex_event_end < self.env.time: 

209 self.set("in_provision", False) 

210 yield self.env.timeout(self.env.config.t_sample)