Coverage for agentlib_flexquant/modules/flexibility_indicator.py: 88%

193 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-08-01 15:10 +0000

1import logging 

2import os 

3from pathlib import Path 

4from typing import List, Optional 

5 

6import agentlib 

7import numpy as np 

8import pandas as pd 

9from agentlib.core.errors import ConfigurationError 

10from pydantic import BaseModel, ConfigDict, Field, model_validator 

11 

12import agentlib_flexquant.data_structures.globals as glbs 

13from agentlib_flexquant.data_structures.flex_kpis import ( 

14 FlexibilityData, 

15 FlexibilityKPIs 

16) 

17from agentlib_flexquant.data_structures.flex_offer import FlexOffer 

18 

19 

20class InputsForCorrectFlexCosts(BaseModel): 

21 enable_energy_costs_correction: bool = Field( 

22 name="enable_energy_costs_correction", 

23 description="Variable determining whether to correct the costs of the flexible energy" 

24 "Define the variable for stored electrical energy in the base MPC model and config as output if the correction of costs is enabled", 

25 default=False 

26 ) 

27 

28 absolute_power_deviation_tolerance: float = Field( 

29 name="absolute_power_deviation_tolerance", 

30 default=0.1, 

31 description="Absolute tolerance in kW within which no warning is thrown" 

32 ) 

33 

34 stored_energy_variable: str = Field( 

35 name="stored_energy_variable", 

36 default="E_stored", 

37 description="Name of the variable representing the stored electrical energy in the baseline config" 

38 ) 

39 

40class InputsForCalculateFlexCosts(BaseModel): 

41 use_constant_electricity_price: bool = Field( 

42 default=False, 

43 description="Use constant electricity price" 

44 ) 

45 calculate_flex_costs: bool = Field( 

46 default=True, 

47 description="Calculate the flexibility cost" 

48 ) 

49 const_electricity_price: float = Field( 

50 default=np.nan, 

51 description="constant electricity price in ct/kWh" 

52 ) 

53 

54 @model_validator(mode="after") 

55 def validate_constant_price(cls, model): 

56 if model.use_constant_electricity_price and np.isnan(model.const_electricity_price): 

57 raise Exception(f'Constant electricity price must have a valid value in float if it is to be used for calculation. ' 

58 f'Received "use_constant_electricity_price": true, "const_electricity_price": {model.const_electricity_price}. ' 

59 f'Please specify them correctly in "calculate_costs" field in flex config.') 

60 return model 

61 

62 

63# Pos and neg kpis to get the right names for plotting 

64kpis_pos = FlexibilityKPIs(direction="positive") 

65kpis_neg = FlexibilityKPIs(direction="negative") 

66 

67 

68class FlexibilityIndicatorModuleConfig(agentlib.BaseModuleConfig): 

69 

70 model_config = ConfigDict( 

71 extra='forbid' 

72 ) 

73 

74 inputs: List[agentlib.AgentVariable] = [ 

75 agentlib.AgentVariable(name=glbs.POWER_ALIAS_BASE, unit="W", type="pd.Series", 

76 description="The power input to the system"), 

77 agentlib.AgentVariable(name=glbs.POWER_ALIAS_NEG, unit="W", type="pd.Series", 

78 description="The power input to the system"), 

79 agentlib.AgentVariable(name=glbs.POWER_ALIAS_POS, unit="W", type="pd.Series", 

80 description="The power input to the system"), 

81 agentlib.AgentVariable(name=glbs.STORED_ENERGY_ALIAS_BASE, unit="kWh", type="pd.Series", 

82 description="Energy stored in the system w.r.t. 0K"), 

83 agentlib.AgentVariable(name=glbs.STORED_ENERGY_ALIAS_NEG, unit="kWh", type="pd.Series", 

84 description="Energy stored in the system w.r.t. 0K"), 

85 agentlib.AgentVariable(name=glbs.STORED_ENERGY_ALIAS_POS, unit="kWh", type="pd.Series", 

86 description="Energy stored in the system w.r.t. 0K") 

87 ] 

88 

89 outputs: List[agentlib.AgentVariable] = [ 

90 # Flexibility offer 

91 agentlib.AgentVariable(name=glbs.FlexibilityOffer, type="FlexOffer"), 

92 

93 # Power KPIs 

94 agentlib.AgentVariable( 

95 name=kpis_neg.power_flex_full.get_kpi_identifier(), unit='W', type="pd.Series", 

96 description="Negative power flexibility" 

97 ), 

98 agentlib.AgentVariable( 

99 name=kpis_pos.power_flex_full.get_kpi_identifier(), unit='W', type="pd.Series", 

100 description="Positive power flexibility" 

101 ), 

102 agentlib.AgentVariable( 

103 name=kpis_neg.power_flex_offer.get_kpi_identifier(), unit='W', type="pd.Series", 

104 description="Negative power flexibility" 

105 ), 

106 agentlib.AgentVariable( 

107 name=kpis_pos.power_flex_offer.get_kpi_identifier(), unit='W', type="pd.Series", 

108 description="Positive power flexibility" 

109 ), 

110 agentlib.AgentVariable( 

111 name=kpis_neg.power_flex_offer_min.get_kpi_identifier(), unit='W', type="float", 

112 description="Minimum of negative power flexibility" 

113 ), 

114 agentlib.AgentVariable( 

115 name=kpis_pos.power_flex_offer_min.get_kpi_identifier(), unit='W', type="float", 

116 description="Minimum of positive power flexibility" 

117 ), 

118 agentlib.AgentVariable( 

119 name=kpis_neg.power_flex_offer_max.get_kpi_identifier(), unit='W', type="float", 

120 description="Maximum of negative power flexibility" 

121 ), 

122 agentlib.AgentVariable( 

123 name=kpis_pos.power_flex_offer_max.get_kpi_identifier(), unit='W', type="float", 

124 description="Maximum of positive power flexibility" 

125 ), 

126 agentlib.AgentVariable( 

127 name=kpis_neg.power_flex_offer_avg.get_kpi_identifier(), unit='W', type="float", 

128 description="Average of negative power flexibility" 

129 ), 

130 agentlib.AgentVariable( 

131 name=kpis_pos.power_flex_offer_avg.get_kpi_identifier(), unit='W', type="float", 

132 description="Average of positive power flexibility" 

133 ), 

134 agentlib.AgentVariable( 

135 name=kpis_neg.power_flex_within_boundary.get_kpi_identifier(), unit='-', type="bool", 

136 description="Variable indicating whether the baseline power and flex power align at the horizon end" 

137 ), 

138 agentlib.AgentVariable( 

139 name=kpis_pos.power_flex_within_boundary.get_kpi_identifier(), unit='-', type="bool", 

140 description="Variable indicating whether the baseline power and flex power align at the horizon end" 

141 ), 

142 

143 # Energy KPIs 

144 agentlib.AgentVariable( 

145 name=kpis_neg.energy_flex.get_kpi_identifier(), unit='kWh', type="float", 

146 description="Negative energy flexibility" 

147 ), 

148 agentlib.AgentVariable( 

149 name=kpis_pos.energy_flex.get_kpi_identifier(), unit='kWh', type="float", 

150 description="Positive energy flexibility" 

151 ), 

152 

153 # Costs KPIs 

154 agentlib.AgentVariable( 

155 name=kpis_neg.costs.get_kpi_identifier(), unit="ct", type="float", 

156 description="Saved costs due to baseline" 

157 ), 

158 agentlib.AgentVariable( 

159 name=kpis_pos.costs.get_kpi_identifier(), unit="ct", type="float", 

160 description="Saved costs due to baseline" 

161 ), 

162 agentlib.AgentVariable( 

163 name=kpis_neg.corrected_costs.get_kpi_identifier(), unit="ct", type="float", 

164 description="Corrected saved costs due to baseline" 

165 ), 

166 agentlib.AgentVariable( 

167 name=kpis_pos.corrected_costs.get_kpi_identifier(), unit="ct", type="float", 

168 description="Corrected saved costs due to baseline" 

169 ), 

170 agentlib.AgentVariable( 

171 name=kpis_neg.costs_rel.get_kpi_identifier(), unit='ct/kWh', type="float", 

172 description="Saved costs due to baseline" 

173 ), 

174 agentlib.AgentVariable( 

175 name=kpis_pos.costs_rel.get_kpi_identifier(), unit='ct/kWh', type="float", 

176 description="Saved costs due to baseline" 

177 ), 

178 agentlib.AgentVariable( 

179 name=kpis_neg.corrected_costs_rel.get_kpi_identifier(), unit='ct/kWh', type="float", 

180 description="Corrected saved costs per energy due to baseline" 

181 ), 

182 agentlib.AgentVariable( 

183 name=kpis_pos.corrected_costs_rel.get_kpi_identifier(), unit='ct/kWh', type="float", 

184 description="Corrected saved costs per energy due to baseline" 

185 ) 

186 ] 

187 

188 parameters: List[agentlib.AgentVariable] = [ 

189 agentlib.AgentVariable(name=glbs.PREP_TIME, unit="s", 

190 description="Preparation time"), 

191 agentlib.AgentVariable(name=glbs.MARKET_TIME, unit="s", 

192 description="Market time"), 

193 agentlib.AgentVariable(name=glbs.FLEX_EVENT_DURATION, unit="s", 

194 description="time to switch objective"), 

195 agentlib.AgentVariable(name=glbs.TIME_STEP, unit="s", 

196 description="timestep of the mpc solution"), 

197 agentlib.AgentVariable(name=glbs.PREDICTION_HORIZON, unit="-", 

198 description="prediction horizon of the mpc solution") 

199 ] 

200 

201 results_file: Optional[Path] = Field( 

202 default=Path("flexibility_indicator.csv"), 

203 description="User specified results file name" 

204 ) 

205 save_results: Optional[bool] = Field( 

206 validate_default=True, 

207 default=True 

208 ) 

209 price_variable: str = Field( 

210 default="c_pel", 

211 description="Name of the price variable sent by a predictor", 

212 ) 

213 power_unit: str = Field( 

214 default="kW", 

215 description="Unit of the power variable" 

216 ) 

217 shared_variable_fields: List[str] = ["outputs"] 

218 

219 correct_costs: InputsForCorrectFlexCosts = InputsForCorrectFlexCosts() 

220 calculate_costs: InputsForCalculateFlexCosts = InputsForCalculateFlexCosts() 

221 

222 @model_validator(mode="after") 

223 def check_results_file_extension(self): 

224 if self.results_file and self.results_file.suffix != ".csv": 

225 raise ValueError( 

226 f"Invalid file extension for 'results_file': '{self.results_file}'. " 

227 f"Expected a '.csv' file." 

228 ) 

229 return self 

230 

231class FlexibilityIndicatorModule(agentlib.BaseModule): 

232 config: FlexibilityIndicatorModuleConfig 

233 

234 data: FlexibilityData 

235 

236 def __init__(self, *args, **kwargs): 

237 super().__init__(*args, **kwargs) 

238 self.var_list = [] 

239 for variable in self.variables: 

240 if variable.name in [glbs.FlexibilityOffer]: 

241 continue 

242 self.var_list.append(variable.name) 

243 self.time = [] 

244 self.in_provision = False 

245 self.offer_count = 0 

246 self.data = FlexibilityData( 

247 prep_time=self.get(glbs.PREP_TIME).value, 

248 market_time=self.get(glbs.MARKET_TIME).value, 

249 flex_event_duration=self.get(glbs.FLEX_EVENT_DURATION).value, 

250 time_step=self.get(glbs.TIME_STEP).value, 

251 prediction_horizon=self.get(glbs.PREDICTION_HORIZON).value 

252 ) 

253 self.df = pd.DataFrame(columns=pd.Series(self.var_list)) 

254 

255 def register_callbacks(self): 

256 inputs = self.config.inputs 

257 for var in inputs: 

258 self.agent.data_broker.register_callback( 

259 name=var.name, alias=var.name, callback=self.callback 

260 ) 

261 self.agent.data_broker.register_callback( 

262 name="in_provision", alias="in_provision", callback=self.callback 

263 ) 

264 

265 def process(self): 

266 yield self.env.event() 

267 

268 def callback(self, inp, name): 

269 if name == "in_provision": 

270 self.in_provision = inp.value 

271 if self.in_provision: 

272 self._set_inputs_to_none() 

273 

274 if not self.in_provision: 

275 if name == glbs.POWER_ALIAS_BASE: 

276 self.data.power_profile_base = self.data.format_mpc_inputs(inp.value) 

277 elif name == glbs.POWER_ALIAS_NEG: 

278 self.data.power_profile_flex_neg = self.data.format_mpc_inputs(inp.value) 

279 elif name == glbs.POWER_ALIAS_POS: 

280 self.data.power_profile_flex_pos = self.data.format_mpc_inputs(inp.value) 

281 elif name == glbs.STORED_ENERGY_ALIAS_BASE: 

282 self.data.stored_energy_profile_base = self.data.format_mpc_inputs(inp.value) 

283 elif name == glbs.STORED_ENERGY_ALIAS_NEG: 

284 self.data.stored_energy_profile_flex_neg = self.data.format_mpc_inputs(inp.value) 

285 elif name == glbs.STORED_ENERGY_ALIAS_POS: 

286 self.data.stored_energy_profile_flex_pos = self.data.format_mpc_inputs(inp.value) 

287 elif name == self.config.price_variable: 

288 if not self.config.calculate_costs.use_constant_electricity_price: 

289 # price comes from predictor, so no stripping needed 

290 self.data.electricity_price_series = self.data.format_predictor_inputs(inp.value) 

291 

292 # set the constant electricity price series if given 

293 if self.config.calculate_costs.use_constant_electricity_price and self.data.electricity_price_series is None: 

294 # get the index for the electricity price series 

295 n = self.get(glbs.PREDICTION_HORIZON).value 

296 ts = self.get(glbs.TIME_STEP).value 

297 grid = np.arange(0, n * ts, ts) 

298 # fill the electricity_price_series with values 

299 electricity_price_series = pd.Series([self.config.calculate_costs.const_electricity_price for i in grid], index=grid) 

300 self.data.electricity_price_series = self.data.format_predictor_inputs(electricity_price_series) 

301 

302 

303 necessary_input_for_calc_flex = [self.data.power_profile_base, 

304 self.data.power_profile_flex_neg, 

305 self.data.power_profile_flex_pos] 

306 

307 if self.config.calculate_costs.calculate_flex_costs: 

308 necessary_input_for_calc_flex.append(self.data.electricity_price_series) 

309 

310 if self.config.correct_costs.enable_energy_costs_correction: 

311 necessary_input_for_calc_flex.extend( 

312 [self.data.stored_energy_profile_base, 

313 self.data.stored_energy_profile_flex_neg, 

314 self.data.stored_energy_profile_flex_pos]) 

315 

316 if all(var is not None for var in necessary_input_for_calc_flex): 

317 

318 # check the power profile end deviation 

319 if not self.config.correct_costs.enable_energy_costs_correction: 

320 self.check_power_end_deviation(tol=self.config.correct_costs.absolute_power_deviation_tolerance) 

321 

322 # Calculate the flexibility, send the offer, write and save the results 

323 self.calc_and_send_offer() 

324 

325 # set the values to None to reset the callback 

326 self._set_inputs_to_none() 

327 

328 def get_results(self) -> Optional[pd.DataFrame]: 

329 """ 

330 Opens results file of flexibility_indicator.py 

331 results_file defined in __init__ 

332 """ 

333 results_file = self.config.results_file 

334 try: 

335 results = pd.read_csv(results_file, header=[0], index_col=[0, 1]) 

336 return results 

337 except FileNotFoundError: 

338 self.logger.error("Results file %s was not found.", results_file) 

339 return None 

340 

341 def write_results(self, df, ts, n): 

342 """ 

343 Write every data of variables in self.var_list in an DataFrame 

344 DataFrame will be updated every time step 

345 

346 Args: 

347 df: DataFrame which is initialised as an empty DataFrame with columns according to self.var_list 

348 ts: time step 

349 n: number of time steps during prediction horizon 

350 Returns: 

351 DataFrame with results of every variable in self.var_list 

352 """ 

353 results = [] 

354 now = self.env.now 

355 for name in self.var_list: 

356 # Use the power variables averaged for each timestep, not the collocation values 

357 if name == glbs.POWER_ALIAS_BASE: 

358 values = self.data.power_profile_base 

359 elif name == glbs.POWER_ALIAS_NEG: 

360 values = self.data.power_profile_flex_neg 

361 elif name == glbs.POWER_ALIAS_POS: 

362 values = self.data.power_profile_flex_pos 

363 elif name == glbs.STORED_ENERGY_ALIAS_BASE: 

364 values = self.data.stored_energy_profile_base 

365 elif name == glbs.STORED_ENERGY_ALIAS_NEG: 

366 values = self.data.stored_energy_profile_flex_neg 

367 elif name == glbs.STORED_ENERGY_ALIAS_POS: 

368 values = self.data.stored_energy_profile_flex_pos 

369 elif name == self.config.price_variable: 

370 values = self.data.electricity_price_series 

371 else: 

372 values = self.get(name).value 

373 

374 if isinstance(values, pd.Series): 

375 traj = values.reindex(np.arange(0, n * ts, ts)) 

376 else: 

377 traj = pd.Series(values).reindex(np.arange(0, n * ts, ts)) 

378 results.append(traj) 

379 

380 if not now % ts: 

381 self.time.append(now) 

382 new_df = pd.DataFrame(results).T 

383 new_df.columns = self.var_list 

384 # Rename time_step variable column 

385 new_df.rename(columns={glbs.TIME_STEP: f"{glbs.TIME_STEP}_mpc"}, inplace=True) 

386 new_df.index.direction = "time" 

387 new_df[glbs.TIME_STEP] = now 

388 new_df.set_index([glbs.TIME_STEP, new_df.index], inplace=True) 

389 df = pd.concat([df, new_df]) 

390 # set the indices once again as concat cant handle indices properly 

391 indices = pd.MultiIndex.from_tuples(df.index, names=[glbs.TIME_STEP, "time"]) 

392 df.set_index(indices, inplace=True) 

393 # Drop column time_step and keep it as an index only 

394 if glbs.TIME_STEP in df.columns: 

395 df.drop(columns=[glbs.TIME_STEP], inplace=True) 

396 

397 return df 

398 

399 def cleanup_results(self): 

400 results_file = self.config.results_file 

401 if not results_file: 

402 return 

403 os.remove(results_file) 

404 

405 def calc_and_send_offer(self): 

406 """ 

407 Calculate the flexibility KPIs for current predictions, send the flex offer and set the outputs, write and save the results. 

408 """ 

409 # Calculate the flexibility KPIs for current predictions 

410 self.data.calculate(enable_energy_costs_correction=self.config.correct_costs.enable_energy_costs_correction, calculate_flex_cost=self.config.calculate_costs.calculate_flex_costs) 

411 

412 # Send flex offer 

413 self.send_flex_offer( 

414 name=glbs.FlexibilityOffer, 

415 base_power_profile=self.data.power_profile_base, 

416 pos_diff_profile=self.data.kpis_pos.power_flex_offer.value, 

417 pos_price=self.data.kpis_pos.costs.value, 

418 neg_diff_profile=self.data.kpis_neg.power_flex_offer.value, 

419 neg_price=self.data.kpis_neg.costs.value, 

420 ) 

421 

422 # set outputs 

423 for kpi in self.data.get_kpis().values(): 

424 if kpi.get_kpi_identifier() not in [kpis_pos.power_flex_within_boundary.get_kpi_identifier(), kpis_neg.power_flex_within_boundary.get_kpi_identifier()]: 

425 for output in self.config.outputs: 

426 if output.name == kpi.get_kpi_identifier(): 

427 self.set(output.name, kpi.value) 

428 

429 # write results 

430 self.df = self.write_results( 

431 df=self.df, 

432 ts=self.get(glbs.TIME_STEP).value, 

433 n=self.get(glbs.PREDICTION_HORIZON).value 

434 ) 

435 

436 # save results 

437 if self.config.save_results: 

438 self.df.to_csv(self.config.results_file) 

439 

440 def send_flex_offer( 

441 self, name, 

442 base_power_profile: pd.Series, 

443 pos_diff_profile: pd.Series, pos_price: float, 

444 neg_diff_profile: pd.Series, neg_price: float, 

445 timestamp: float = None 

446 ): 

447 """ 

448 Send a flex offer as an agent Variable. The first offer is dismissed, 

449 since the different MPCs need one time step to fully initialize. 

450 

451 Inputs: 

452 

453 name: name of the agent variable 

454 indicator_data: the indicator data object 

455 timestamp: the time offer was generated 

456 

457 """ 

458 if self.offer_count > 0: 

459 var = self._variables_dict[name] 

460 var.value = FlexOffer( 

461 base_power_profile=base_power_profile, 

462 pos_diff_profile=pos_diff_profile, pos_price=pos_price, 

463 neg_diff_profile=neg_diff_profile, neg_price=neg_price, 

464 ) 

465 if timestamp is None: 

466 timestamp = self.env.time 

467 var.timestamp = timestamp 

468 self.agent.data_broker.send_variable( 

469 variable=var.copy(update={"source": self.source}), 

470 copy=False, 

471 ) 

472 self.offer_count += 1 

473 

474 def _set_inputs_to_none(self): 

475 self.data.power_profile_base = None 

476 self.data.power_profile_flex_neg = None 

477 self.data.power_profile_flex_pos = None 

478 self.data.electricity_price_series = None 

479 self.data.stored_energy_profile_base = None 

480 self.data.stored_energy_profile_flex_neg = None 

481 self.data.stored_energy_profile_flex_pos = None 

482 

483 def check_power_end_deviation(self, tol: float): 

484 """ 

485 calculates the deviation of the final value of the power profiles and warn the user if it exceeds the tolerance 

486 """ 

487 logger = logging.getLogger(__name__) 

488 dev_pos = np.mean(self.data.power_profile_flex_pos.values[-4:] - self.data.power_profile_base.values[-4:]) 

489 dev_neg = np.mean(self.data.power_profile_flex_neg.values[-4:] - self.data.power_profile_base.values[-4:]) 

490 if abs(dev_pos) > tol: 

491 logger.warning(f"There is an average deviation of {dev_pos:.6f} kW between the final values of power profiles of positive shadow MPC and the baseline. Correction of energy costs might be necessary.") 

492 self.set(kpis_pos.power_flex_within_boundary.get_kpi_identifier(), False) 

493 else: 

494 self.set(kpis_pos.power_flex_within_boundary.get_kpi_identifier(), True) 

495 if abs(dev_neg) > tol: 

496 logger.warning(f"There is an average deviation of {dev_pos:.6f} kW between the final values of power profiles of negative shadow MPC and the baseline. Correction of energy costs might be necessary.") 

497 self.set(kpis_neg.power_flex_within_boundary.get_kpi_identifier(), False) 

498 else: 

499 self.set(kpis_neg.power_flex_within_boundary.get_kpi_identifier(), True) 

500