Coverage for agentlib_flexquant/modules/flexibility_indicator.py: 88%

192 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-08-15 15:25 +0000

1import logging 

2import os 

3import numpy as np 

4import pandas as pd 

5from pathlib import Path 

6from typing import List, Optional 

7from pydantic import BaseModel, ConfigDict, Field, model_validator 

8import agentlib 

9import agentlib_flexquant.data_structures.globals as glbs 

10from agentlib_flexquant.data_structures.flex_kpis import FlexibilityData, FlexibilityKPIs 

11from agentlib_flexquant.data_structures.flex_offer import FlexOffer 

12 

13 

class InputsForCorrectFlexCosts(BaseModel):
    """Settings for the correction of the flexibility costs.

    Attributes:
        enable_energy_costs_correction: Switch that enables the correction of
            the costs of the flexible energy.
        absolute_power_deviation_tolerance: Absolute tolerance in kW below
            which no warning is issued.
        stored_energy_variable: Name of the variable that represents the
            stored electrical energy in the baseline config.
    """

    # NOTE(review): `name` is not a documented pydantic `Field` argument;
    # presumably it is kept as extra metadata -- confirm it is still needed.
    enable_energy_costs_correction: bool = Field(
        name="enable_energy_costs_correction",
        # The two sentences were previously concatenated without a separator
        # ("...flexible energyDefine the variable...").
        description=(
            "Variable determining whether to correct the costs of the flexible energy. "
            "Define the variable for stored electrical energy in the base MPC model "
            "and config as output if the correction of costs is enabled"
        ),
        default=False,
    )

    absolute_power_deviation_tolerance: float = Field(
        name="absolute_power_deviation_tolerance",
        default=0.1,
        description="Absolute tolerance in kW within which no warning is thrown",
    )

    stored_energy_variable: str = Field(
        name="stored_energy_variable",
        default="E_stored",
        description="Name of the variable representing the stored electrical energy in the baseline config",
    )

33 

34 

class InputsForCalculateFlexCosts(BaseModel):
    """Settings for the calculation of the flexibility costs.

    Attributes:
        use_constant_electricity_price: Use ``const_electricity_price``
            instead of a price signal.
        calculate_flex_costs: Whether the flexibility costs are calculated.
        const_electricity_price: Constant electricity price in ct/kWh.
            Defaults to NaN, which is rejected by the validator when
            ``use_constant_electricity_price`` is set.
    """

    use_constant_electricity_price: bool = Field(
        default=False,
        description="Use constant electricity price",
    )
    calculate_flex_costs: bool = Field(
        default=True,
        description="Calculate the flexibility cost",
    )
    const_electricity_price: float = Field(
        default=np.nan,
        description="constant electricity price in ct/kWh",
    )

    @model_validator(mode="after")
    def validate_constant_price(self):
        """Ensure a usable constant price is given whenever it is requested.

        Returns:
            The validated model instance.

        Raises:
            ValueError: If ``use_constant_electricity_price`` is set while
                ``const_electricity_price`` is NaN. pydantic reports a
                ``ValueError`` raised in a validator as a ``ValidationError``.
        """
        if self.use_constant_electricity_price and np.isnan(self.const_electricity_price):
            raise ValueError(
                'Constant electricity price must have a valid value in float if it is to be used for calculation. '
                f'Received "use_constant_electricity_price": true, "const_electricity_price": {self.const_electricity_price}. '
                'Please specify them correctly in "calculate_costs" field in flex config.'
            )
        return self

56 

57 

# Module-level KPI containers, one per flexibility direction. They are used in
# this module to look up the KPI identifiers (e.g. as names of the output
# variables), so that the names match the ones used for plotting.
kpis_pos, kpis_neg = (
    FlexibilityKPIs(direction=direction) for direction in ("positive", "negative")
)

61 

62 

class FlexibilityIndicatorModuleConfig(agentlib.BaseModuleConfig):
    """Configuration of the flexibility indicator module.

    Declares the power and stored-energy inputs, the flexibility offer and
    KPI outputs, the timing parameters, and the options for saving results
    and for calculating / correcting the flexibility costs.
    """

    # Unknown config keys are rejected instead of being silently ignored.
    model_config = ConfigDict(extra="forbid")

    # NOTE(review): the three power inputs (and the three stored energy
    # inputs) share one description each -- confirm whether the descriptions
    # should distinguish baseline, negative and positive profiles.
    inputs: List[agentlib.AgentVariable] = [
        agentlib.AgentVariable(
            name=glbs.POWER_ALIAS_BASE,
            unit="W",
            type="pd.Series",
            description="The power input to the system",
        ),
        agentlib.AgentVariable(
            name=glbs.POWER_ALIAS_NEG,
            unit="W",
            type="pd.Series",
            description="The power input to the system",
        ),
        agentlib.AgentVariable(
            name=glbs.POWER_ALIAS_POS,
            unit="W",
            type="pd.Series",
            description="The power input to the system",
        ),
        agentlib.AgentVariable(
            name=glbs.STORED_ENERGY_ALIAS_BASE,
            unit="kWh",
            type="pd.Series",
            description="Energy stored in the system w.r.t. 0K",
        ),
        agentlib.AgentVariable(
            name=glbs.STORED_ENERGY_ALIAS_NEG,
            unit="kWh",
            type="pd.Series",
            description="Energy stored in the system w.r.t. 0K",
        ),
        agentlib.AgentVariable(
            name=glbs.STORED_ENERGY_ALIAS_POS,
            unit="kWh",
            type="pd.Series",
            description="Energy stored in the system w.r.t. 0K",
        ),
    ]

    outputs: List[agentlib.AgentVariable] = [
        # Flexibility offer
        agentlib.AgentVariable(name=glbs.FlexibilityOffer, type="FlexOffer"),

        # Power KPIs
        # NOTE(review): `power_flex_full` and `power_flex_offer` carry the
        # same description -- confirm whether they should differ.
        agentlib.AgentVariable(
            name=kpis_neg.power_flex_full.get_kpi_identifier(),
            unit="W",
            type="pd.Series",
            description="Negative power flexibility",
        ),
        agentlib.AgentVariable(
            name=kpis_pos.power_flex_full.get_kpi_identifier(),
            unit="W",
            type="pd.Series",
            description="Positive power flexibility",
        ),
        agentlib.AgentVariable(
            name=kpis_neg.power_flex_offer.get_kpi_identifier(),
            unit="W",
            type="pd.Series",
            description="Negative power flexibility",
        ),
        agentlib.AgentVariable(
            name=kpis_pos.power_flex_offer.get_kpi_identifier(),
            unit="W",
            type="pd.Series",
            description="Positive power flexibility",
        ),
        agentlib.AgentVariable(
            name=kpis_neg.power_flex_offer_min.get_kpi_identifier(),
            unit="W",
            type="float",
            description="Minimum of negative power flexibility",
        ),
        agentlib.AgentVariable(
            name=kpis_pos.power_flex_offer_min.get_kpi_identifier(),
            unit="W",
            type="float",
            description="Minimum of positive power flexibility",
        ),
        agentlib.AgentVariable(
            name=kpis_neg.power_flex_offer_max.get_kpi_identifier(),
            unit="W",
            type="float",
            description="Maximum of negative power flexibility",
        ),
        agentlib.AgentVariable(
            name=kpis_pos.power_flex_offer_max.get_kpi_identifier(),
            unit="W",
            type="float",
            description="Maximum of positive power flexibility",
        ),
        agentlib.AgentVariable(
            name=kpis_neg.power_flex_offer_avg.get_kpi_identifier(),
            unit="W",
            type="float",
            description="Average of negative power flexibility",
        ),
        agentlib.AgentVariable(
            name=kpis_pos.power_flex_offer_avg.get_kpi_identifier(),
            unit="W",
            type="float",
            description="Average of positive power flexibility",
        ),
        agentlib.AgentVariable(
            name=kpis_neg.power_flex_within_boundary.get_kpi_identifier(),
            unit="-",
            type="bool",
            description="Variable indicating whether the baseline power and flex power align at the horizon end",
        ),
        agentlib.AgentVariable(
            name=kpis_pos.power_flex_within_boundary.get_kpi_identifier(),
            unit="-",
            type="bool",
            description="Variable indicating whether the baseline power and flex power align at the horizon end",
        ),

        # Energy KPIs
        agentlib.AgentVariable(
            name=kpis_neg.energy_flex.get_kpi_identifier(),
            unit="kWh",
            type="float",
            description="Negative energy flexibility",
        ),
        agentlib.AgentVariable(
            name=kpis_pos.energy_flex.get_kpi_identifier(),
            unit="kWh",
            type="float",
            description="Positive energy flexibility",
        ),

        # Costs KPIs
        agentlib.AgentVariable(
            name=kpis_neg.costs.get_kpi_identifier(),
            unit="ct",
            type="float",
            description="Saved costs due to baseline",
        ),
        agentlib.AgentVariable(
            name=kpis_pos.costs.get_kpi_identifier(),
            unit="ct",
            type="float",
            description="Saved costs due to baseline",
        ),
        agentlib.AgentVariable(
            name=kpis_neg.corrected_costs.get_kpi_identifier(),
            unit="ct",
            type="float",
            description="Corrected saved costs due to baseline",
        ),
        agentlib.AgentVariable(
            name=kpis_pos.corrected_costs.get_kpi_identifier(),
            unit="ct",
            type="float",
            description="Corrected saved costs due to baseline",
        ),
        agentlib.AgentVariable(
            name=kpis_neg.costs_rel.get_kpi_identifier(),
            unit="ct/kWh",
            type="float",
            description="Saved costs due to baseline",
        ),
        agentlib.AgentVariable(
            name=kpis_pos.costs_rel.get_kpi_identifier(),
            unit="ct/kWh",
            type="float",
            description="Saved costs due to baseline",
        ),
        agentlib.AgentVariable(
            name=kpis_neg.corrected_costs_rel.get_kpi_identifier(),
            unit="ct/kWh",
            type="float",
            description="Corrected saved costs per energy due to baseline",
        ),
        agentlib.AgentVariable(
            name=kpis_pos.corrected_costs_rel.get_kpi_identifier(),
            unit="ct/kWh",
            type="float",
            description="Corrected saved costs per energy due to baseline",
        ),
    ]

    parameters: List[agentlib.AgentVariable] = [
        agentlib.AgentVariable(
            name=glbs.PREP_TIME,
            unit="s",
            description="Preparation time",
        ),
        agentlib.AgentVariable(
            name=glbs.MARKET_TIME,
            unit="s",
            description="Market time",
        ),
        agentlib.AgentVariable(
            name=glbs.FLEX_EVENT_DURATION,
            unit="s",
            description="time to switch objective",
        ),
        agentlib.AgentVariable(
            name=glbs.TIME_STEP,
            unit="s",
            description="timestep of the mpc solution",
        ),
        agentlib.AgentVariable(
            name=glbs.PREDICTION_HORIZON,
            unit="-",
            description="prediction horizon of the mpc solution",
        ),
    ]

    results_file: Optional[Path] = Field(
        default=Path("flexibility_indicator.csv"),
        description="User specified results file name",
    )
    save_results: Optional[bool] = Field(
        default=True,
        validate_default=True,
    )
    price_variable: str = Field(
        default="c_pel",
        description="Name of the price variable sent by a predictor",
    )
    # NOTE(review): the default is "kW" while the power inputs above are
    # declared with unit "W" -- confirm which unit is intended.
    power_unit: str = Field(
        default="kW",
        description="Unit of the power variable",
    )
    shared_variable_fields: List[str] = ["outputs"]

    correct_costs: InputsForCorrectFlexCosts = InputsForCorrectFlexCosts()
    calculate_costs: InputsForCalculateFlexCosts = InputsForCalculateFlexCosts()

    @model_validator(mode="after")
    def check_results_file_extension(self):
        """Reject a ``results_file`` that does not end in ``.csv``.

        Returns:
            The validated config instance.

        Raises:
            ValueError: If ``results_file`` is set and its suffix is not
                ``.csv``.
        """
        results_file = self.results_file
        if not results_file or results_file.suffix == ".csv":
            return self
        raise ValueError(
            f"Invalid file extension for 'results_file': '{results_file}'. "
            f"Expected a '.csv' file."
        )

225 

226 

class FlexibilityIndicatorModule(agentlib.BaseModule):
    """Module that calculates flexibility KPIs and sends flexibility offers.

    The module collects the power profiles (and, if the cost correction is
    enabled, the stored energy profiles) of the baseline and of the positive
    and negative flexibility runs via data broker callbacks. As soon as all
    necessary inputs are available, the KPIs are calculated, a flex offer is
    sent, the outputs are set and the results are written.
    """

    config: FlexibilityIndicatorModuleConfig
    data: FlexibilityData

    # Maps the alias of each MPC input to the attribute of ``self.data``
    # that stores the corresponding formatted profile.
    _PROFILE_ATTRIBUTES = {
        glbs.POWER_ALIAS_BASE: "power_profile_base",
        glbs.POWER_ALIAS_NEG: "power_profile_flex_neg",
        glbs.POWER_ALIAS_POS: "power_profile_flex_pos",
        glbs.STORED_ENERGY_ALIAS_BASE: "stored_energy_profile_base",
        glbs.STORED_ENERGY_ALIAS_NEG: "stored_energy_profile_flex_neg",
        glbs.STORED_ENERGY_ALIAS_POS: "stored_energy_profile_flex_pos",
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Every variable except the flex offer becomes a column of the results.
        self.var_list = [
            variable.name
            for variable in self.variables
            if variable.name != glbs.FlexibilityOffer
        ]
        self.time = []
        self.in_provision = False
        self.offer_count = 0
        self.data = FlexibilityData(
            prep_time=self.get(glbs.PREP_TIME).value,
            market_time=self.get(glbs.MARKET_TIME).value,
            flex_event_duration=self.get(glbs.FLEX_EVENT_DURATION).value,
            time_step=self.get(glbs.TIME_STEP).value,
            prediction_horizon=self.get(glbs.PREDICTION_HORIZON).value,
        )
        self.df = pd.DataFrame(columns=pd.Series(self.var_list))

    def register_callbacks(self):
        """Register ``self.callback`` for all inputs and for "in_provision"."""
        for var in self.config.inputs:
            self.agent.data_broker.register_callback(
                name=var.name, alias=var.name, callback=self.callback
            )
        self.agent.data_broker.register_callback(
            name="in_provision", alias="in_provision", callback=self.callback
        )

    def process(self):
        """Yield a single event; all work of the module is done in callbacks."""
        yield self.env.event()

    def callback(self, inp, name):
        """Store an incoming input and calculate the flexibility once complete.

        Args:
            inp: received variable; only its ``value`` is read
            name: name under which the callback was triggered

        """
        if name == "in_provision":
            self.in_provision = inp.value
            if self.in_provision:
                self._set_inputs_to_none()

        # While in provision, incoming inputs are ignored.
        if self.in_provision:
            return

        self._store_input(inp, name)
        self._fill_constant_price_series()

        if any(var is None for var in self._necessary_inputs()):
            return

        # check the power profile end deviation
        if not self.config.correct_costs.enable_energy_costs_correction:
            self.check_power_end_deviation(
                tol=self.config.correct_costs.absolute_power_deviation_tolerance
            )

        # Calculate the flexibility, send the offer, write and save the results
        self.calc_and_send_offer()

        # set the values to None to reset the callback
        self._set_inputs_to_none()

    def _store_input(self, inp, name):
        """Format the received value and store it on ``self.data``."""
        attribute = self._PROFILE_ATTRIBUTES.get(name)
        if attribute is not None:
            setattr(self.data, attribute, self.data.format_mpc_inputs(inp.value))
        elif name == self.config.price_variable:
            if not self.config.calculate_costs.use_constant_electricity_price:
                # price comes from predictor, so no stripping needed
                self.data.electricity_price_series = self.data.format_predictor_inputs(inp.value)

    def _fill_constant_price_series(self):
        """Set the constant electricity price series if it is configured and missing."""
        calculate_costs = self.config.calculate_costs
        if not calculate_costs.use_constant_electricity_price:
            return
        if self.data.electricity_price_series is not None:
            return
        # get the index for the electricity price series
        n = self.get(glbs.PREDICTION_HORIZON).value
        ts = self.get(glbs.TIME_STEP).value
        grid = np.arange(0, n * ts, ts)
        # fill the electricity_price_series with the constant value
        electricity_price_series = pd.Series(calculate_costs.const_electricity_price, index=grid)
        self.data.electricity_price_series = self.data.format_predictor_inputs(electricity_price_series)

    def _necessary_inputs(self) -> list:
        """Return the inputs that must be present before the flexibility is calculated."""
        necessary = [
            self.data.power_profile_base,
            self.data.power_profile_flex_neg,
            self.data.power_profile_flex_pos,
        ]
        if self.config.calculate_costs.calculate_flex_costs:
            necessary.append(self.data.electricity_price_series)
        if self.config.correct_costs.enable_energy_costs_correction:
            necessary.extend(
                [
                    self.data.stored_energy_profile_base,
                    self.data.stored_energy_profile_flex_neg,
                    self.data.stored_energy_profile_flex_pos,
                ]
            )
        return necessary

    def get_results(self) -> Optional[pd.DataFrame]:
        """Open results file of flexibility_indicator.py.

        Returns:
            The results read from ``config.results_file``, or None (after
            logging an error) if the file does not exist.

        """
        results_file = self.config.results_file
        try:
            results = pd.read_csv(results_file, header=[0], index_col=[0, 1])
            return results
        except FileNotFoundError:
            self.logger.error("Results file %s was not found.", results_file)
            return None

    def write_results(self, df: pd.DataFrame, ts: float, n: int) -> pd.DataFrame:
        """Write every data of variables in self.var_list in an DataFrame.

        DataFrame will be updated every time step

        Args:
            df: DataFrame which is initialised as an empty DataFrame with columns according to self.var_list
            ts: time step
            n: number of time steps during prediction horizon

        Returns:
            DataFrame with results of every variable in self.var_list

        """
        now = self.env.now
        time_grid = np.arange(0, n * ts, ts)
        results = []
        for name in self.var_list:
            values = self._get_result_values(name)
            if not isinstance(values, pd.Series):
                values = pd.Series(values)
            results.append(values.reindex(time_grid))

        if not now % ts:
            self.time.append(now)
            new_df = pd.DataFrame(results).T
            new_df.columns = self.var_list
            # Rename time_step variable column
            new_df.rename(columns={glbs.TIME_STEP: f"{glbs.TIME_STEP}_mpc"}, inplace=True)
            # Previously an unused `direction` attribute was set here; the
            # index level is named "time" (see the MultiIndex names below).
            new_df.index.name = "time"
            new_df[glbs.TIME_STEP] = now
            new_df.set_index([glbs.TIME_STEP, new_df.index], inplace=True)
            df = pd.concat([df, new_df])
            # set the indices once again as concat cant handle indices properly
            indices = pd.MultiIndex.from_tuples(df.index, names=[glbs.TIME_STEP, "time"])
            df.set_index(indices, inplace=True)
            # Drop column time_step and keep it as an index only
            if glbs.TIME_STEP in df.columns:
                df.drop(columns=[glbs.TIME_STEP], inplace=True)

        return df

    def _get_result_values(self, name: str):
        """Return the values that are written to the results for a variable."""
        # Use the power variables averaged for each timestep, not the collocation values
        attribute = self._PROFILE_ATTRIBUTES.get(name)
        if attribute is not None:
            return getattr(self.data, attribute)
        if name == self.config.price_variable:
            return self.data.electricity_price_series
        return self.get(name).value

    def cleanup_results(self):
        """Remove the existing result files.

        A results file that does not exist (e.g. because no results were
        written yet) is not treated as an error.
        """
        results_file = self.config.results_file
        if not results_file:
            return
        try:
            os.remove(results_file)
        except FileNotFoundError:
            self.logger.info("Results file %s does not exist, nothing to clean up.", results_file)

    def calc_and_send_offer(self):
        """Calculate the flexibility KPIs for current predictions, send the flex offer and set the outputs, write and save the results."""
        # Calculate the flexibility KPIs for current predictions
        self.data.calculate(
            enable_energy_costs_correction=self.config.correct_costs.enable_energy_costs_correction,
            calculate_flex_cost=self.config.calculate_costs.calculate_flex_costs,
        )

        # Send flex offer
        self.send_flex_offer(
            name=glbs.FlexibilityOffer,
            base_power_profile=self.data.power_profile_base,
            pos_diff_profile=self.data.kpis_pos.power_flex_offer.value,
            pos_price=self.data.kpis_pos.costs.value,
            neg_diff_profile=self.data.kpis_neg.power_flex_offer.value,
            neg_price=self.data.kpis_neg.costs.value,
        )

        # set outputs; the within-boundary flags are set in
        # check_power_end_deviation and are therefore skipped here
        boundary_identifiers = {
            kpis_pos.power_flex_within_boundary.get_kpi_identifier(),
            kpis_neg.power_flex_within_boundary.get_kpi_identifier(),
        }
        output_names = {output.name for output in self.config.outputs}
        for kpi in self.data.get_kpis().values():
            identifier = kpi.get_kpi_identifier()
            if identifier in boundary_identifiers or identifier not in output_names:
                continue
            self.set(identifier, kpi.value)

        # write results
        self.df = self.write_results(
            df=self.df,
            ts=self.get(glbs.TIME_STEP).value,
            n=self.get(glbs.PREDICTION_HORIZON).value,
        )

        # save results
        if self.config.save_results:
            self.df.to_csv(self.config.results_file)

    def send_flex_offer(
        self,
        name: str,
        base_power_profile: pd.Series,
        pos_diff_profile: pd.Series,
        pos_price: float,
        neg_diff_profile: pd.Series,
        neg_price: float,
        timestamp: Optional[float] = None,
    ):
        """Send a flex offer as an agent Variable.

        The first offer is dismissed, since the different MPCs need one time step to fully initialize.

        Args:
            name: name of the agent variable
            base_power_profile: time series of power from baseline mpc
            pos_diff_profile: power profile for the positive difference (base-pos) in flexibility event time grid
            pos_price: price for positive flexibility
            neg_diff_profile: power profile for the negative difference (neg-base) in flexibility event time grid
            neg_price: price for negative flexibility
            timestamp: the time offer was generated; defaults to the current environment time

        """
        if self.offer_count > 0:
            var = self._variables_dict[name]
            var.value = FlexOffer(
                base_power_profile=base_power_profile,
                pos_diff_profile=pos_diff_profile,
                pos_price=pos_price,
                neg_diff_profile=neg_diff_profile,
                neg_price=neg_price,
            )
            if timestamp is None:
                timestamp = self.env.time
            var.timestamp = timestamp
            self.agent.data_broker.send_variable(
                variable=var.copy(update={"source": self.source}),
                copy=False,
            )
        # Counted even when dismissed, so that only the very first offer is skipped.
        self.offer_count += 1

    def _set_inputs_to_none(self):
        """Reset all collected profiles and the electricity price series."""
        for attribute in self._PROFILE_ATTRIBUTES.values():
            setattr(self.data, attribute, None)
        self.data.electricity_price_series = None

    def check_power_end_deviation(self, tol: float):
        """Calculate the deviation of the final value of the power profiles and warn the user if it exceeds the tolerance.

        The deviation is the mean difference between the last four values of
        a flexibility power profile and of the baseline power profile. For
        each direction the matching ``power_flex_within_boundary`` output is
        set to False if the absolute deviation exceeds ``tol``, else to True.

        Args:
            tol: absolute tolerance for the deviation

        """
        logger = logging.getLogger(__name__)
        base_end = self.data.power_profile_base.values[-4:]
        dev_pos = np.mean(self.data.power_profile_flex_pos.values[-4:] - base_end)
        dev_neg = np.mean(self.data.power_profile_flex_neg.values[-4:] - base_end)

        # The warning for the negative direction previously reported dev_pos;
        # each direction now reports its own deviation.
        checks = (
            ("positive", dev_pos, kpis_pos),
            ("negative", dev_neg, kpis_neg),
        )
        for direction, deviation, kpis in checks:
            exceeds_tolerance = abs(deviation) > tol
            if exceeds_tolerance:
                logger.warning(
                    f"There is an average deviation of {deviation:.6f} kW between the final values of "
                    f"power profiles of {direction} shadow MPC and the baseline. "
                    f"Correction of energy costs might be necessary."
                )
            self.set(
                kpis.power_flex_within_boundary.get_kpi_identifier(),
                not exceeds_tolerance,
            )