Coverage for agentlib_flexquant/data_structures/flex_kpis.py: 92%

168 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-08-01 15:10 +0000

1from typing import Union, Optional 

2 

3import numpy 

4import pydantic 

5import numpy as np 

6import pandas as pd 

7 

8from agentlib_mpc.utils import TimeConversionTypes, TIME_CONVERSION 

9from agentlib_flexquant.data_structures.globals import FlexibilityDirections 

10from agentlib_flexquant.utils.data_handling import strip_multi_index, fill_nans, MEAN 

11 

12 

class KPI(pydantic.BaseModel):
    """Scalar flexibility indicator consisting of a name, a value, a unit and a direction."""

    # NOTE: `name` and `unit` are annotated as `str` but default to None until they are set.
    name: str = pydantic.Field(default=None, description="Name of the flexibility KPI")
    value: Union[float, None] = pydantic.Field(default=None, description="Value of the flexibility KPI")
    unit: str = pydantic.Field(default=None, description="Unit of the flexibility KPI")
    direction: Union[FlexibilityDirections, None] = pydantic.Field(
        default=None, description="Direction of the shadow mpc / flexibility"
    )

    class Config:
        # Permit field types pydantic has no built-in validator for.
        arbitrary_types_allowed = True

    def get_kpi_identifier(self):
        """Return the identifier of the KPI, formatted as ``<direction>_<name>``."""
        return f"{self.direction}_{self.name}"

39 

40 

class KPISeries(KPI):
    """Flexibility KPI whose value is a series instead of a scalar.

    The index of ``value`` is used as the time axis: ``dt`` holds the
    differences between consecutive index entries and is used as the weight
    when integrating or averaging the series.
    """

    value: Union[pd.Series, None] = pydantic.Field(
        default=None,
        description="Value of the flexibility KPI",
    )
    dt: Union[pd.Series, None] = pydantic.Field(
        default=None,
        description="Time differences between the timestamps of the series in seconds",
    )

    def _get_dt(self) -> pd.Series:
        """Compute the time differences from the index of ``value`` and store them in ``dt``.

        Each entry holds the distance to the *next* index entry; the last
        entry has no successor and therefore reuses the previous difference.

        Returns:
            The freshly computed time differences, indexed like ``value``.
        """
        index = self.value.index
        dt = pd.Series(index=index, data=index).diff().shift(-1).ffill()
        self.dt = dt
        return dt

    def _ensure_dt(self) -> pd.Series:
        """Return time differences that match the current ``value``.

        ``value`` can be reassigned after ``dt`` was computed. A cached ``dt``
        with a different index would be misaligned with the values, so it is
        recomputed whenever the indices differ.
        """
        if self.dt is None or not self.dt.index.equals(self.value.index):
            self._get_dt()
        return self.dt

    def min(self) -> float:
        """Return the smallest entry of the series."""
        return self.value.min()

    def max(self) -> float:
        """Return the largest entry of the series."""
        return self.value.max()

    def avg(self) -> float:
        """Calculate the time-weighted average of the KPI.

        The integral of the series is divided by the summed time differences.
        If the series spans no time (e.g. a single sample), the division has
        no meaningful result.
        """
        delta_t = self._ensure_dt().sum()
        return self.integrate() / delta_t

    def integrate(self, time_unit: TimeConversionTypes = "seconds") -> float:
        """Integrate the KPI over time.

        The integral is the sum of the products of each value and its time
        difference.

        Args:
            time_unit: Key into ``TIME_CONVERSION``; the time differences are
                divided by the corresponding factor before summing.

        Returns:
            The integral of the series.
        """
        dt = self._ensure_dt()
        products = self.value * dt / TIME_CONVERSION[time_unit]
        return products.sum()

84 

85 

class FlexibilityKPIs(pydantic.BaseModel):
    """
    Class defining the indicator KPIs of one flexibility direction.

    Every KPI field is created with its name and unit; the direction passed
    to the constructor is copied onto each of them.
    """
    # Direction
    direction: FlexibilityDirections = pydantic.Field(
        default=None,
        description="Direction of the shadow mpc"
    )

    # Power / energy KPIs
    power_flex_full: KPISeries = pydantic.Field(
        default=KPISeries(
            name="power_flex_full",
            unit="kW"
        ),
        description="Power flexibility",
    )
    power_flex_offer: KPISeries = pydantic.Field(
        default=KPISeries(
            name="power_flex_offer",
            unit="kW"
        ),
        description="Power flexibility",
    )
    power_flex_offer_max: KPI = pydantic.Field(
        default=KPI(
            name="power_flex_offer_max",
            unit="kW"
        ),
        description="Maximum power flexibility",
    )
    power_flex_offer_min: KPI = pydantic.Field(
        default=KPI(
            name="power_flex_offer_min",
            unit="kW"
        ),
        description="Minimum power flexibility",
    )
    power_flex_offer_avg: KPI = pydantic.Field(
        default=KPI(
            name="power_flex_offer_avg",
            unit="kW"
        ),
        description="Average power flexibility",
    )
    energy_flex: KPI = pydantic.Field(
        default=KPI(
            name="energy_flex",
            unit="kWh"
        ),
        description="Energy flexibility equals the integral of the power flexibility",
    )
    power_flex_within_boundary: KPI = pydantic.Field(
        default=KPI(
            name="power_flex_within_boundary",
            unit="-"
        ),
        description="Variable indicating whether the baseline power and flex power align at the horizon end",
    )

    # Costs KPIs
    electricity_costs_series: KPISeries = pydantic.Field(
        default=KPISeries(
            name="electricity_costs_series",
            unit="ct/h"
        ),
        description="Costs of flexibility",
    )
    costs: KPI = pydantic.Field(
        default=KPI(
            name="costs",
            unit="ct"
        ),
        description="Costs of flexibility",
    )
    corrected_costs: KPI = pydantic.Field(
        default=KPI(
            name="corrected_costs",
            unit="ct"
        ),
        description="Corrected costs of flexibility considering the stored energy in the system",
    )
    costs_rel: KPI = pydantic.Field(
        default=KPI(
            name="costs_rel",
            unit="ct/kWh"
        ),
        description="Costs of flexibility per energy",
    )
    corrected_costs_rel: KPI = pydantic.Field(
        default=KPI(
            name="corrected_costs_rel",
            unit="ct/kWh"
        ),
        description="Corrected costs of flexibility per energy",
    )

    def __init__(self, direction: FlexibilityDirections, **data):
        """Create the KPI collection and tag every contained KPI with ``direction``.

        Args:
            direction: Flexibility direction of this KPI collection.
            **data: Further field values forwarded to the pydantic constructor.
        """
        super().__init__(**data)
        self.direction = direction
        for kpi in vars(self).values():
            if isinstance(kpi, KPI):
                kpi.direction = self.direction

    def calculate(
            self,
            power_profile_base: pd.Series,
            power_profile_shadow: pd.Series,
            electricity_price_series: pd.Series,
            mpc_time_grid: np.ndarray,
            flex_offer_time_grid: np.ndarray,
            stored_energy_base: pd.Series,
            stored_energy_shadow: pd.Series,
            enable_energy_costs_correction: bool,
            calculate_flex_cost: bool
    ):
        """
        Calculate the KPIs based on the power and electricity input profiles.
        Time grids needed for indexing of the power flexibility profiles.

        Args:
            power_profile_base: Power profile of the baseline.
            power_profile_shadow: Power profile of the shadow mpc.
            electricity_price_series: Electricity price profile.
            mpc_time_grid: Time grid of the mpcs (currently not used by this method).
            flex_offer_time_grid: Time grid of the flexibility offer.
            stored_energy_base: Stored energy profile of the baseline.
            stored_energy_shadow: Stored energy profile of the shadow mpc.
            enable_energy_costs_correction: If True, the difference of the last
                stored energy values (shadow minus base) is used to correct the costs.
            calculate_flex_cost: If True, the cost KPIs are calculated as well.
        """
        # Power / energy KPIs
        self._calculate_power_flex(
            power_profile_base=power_profile_base,
            power_profile_shadow=power_profile_shadow,
            flex_offer_time_grid=flex_offer_time_grid,
        )
        self._calculate_power_flex_stats()
        self._calculate_energy_flex()

        # Costs KPIs
        if enable_energy_costs_correction:
            stored_energy_diff = stored_energy_shadow.values[-1] - stored_energy_base.values[-1]
        else:
            stored_energy_diff = 0

        if calculate_flex_cost:
            self._calculate_costs(
                electricity_price_signal=electricity_price_series,
                stored_energy_diff=stored_energy_diff,
            )
            self._calculate_costs_rel()

    def _calculate_power_flex(self, power_profile_base: pd.Series, power_profile_shadow: pd.Series,
                              flex_offer_time_grid: np.ndarray,
                              relative_error_acceptance: float = 0.01) -> pd.Series:
        """
        Calculate the power flexibility based on the base and flexibility power profiles.

        The full profile is stored in ``power_flex_full``; the part between the
        first and the last entry of ``flex_offer_time_grid`` is stored in
        ``power_flex_offer``.

        Args:
            power_profile_base: Power profile of the baseline.
            power_profile_shadow: Power profile of the shadow mpc.
            flex_offer_time_grid: Time grid of the flexibility offer.
            relative_error_acceptance: threshold for the relative error between the baseline and shadow mpc to set the power flexibility to zero

        Returns:
            The power flexibility over the full index of the profiles.

        Raises:
            ValueError: If the indices of the profiles differ or the direction
                is neither "positive" nor "negative".
        """
        if not power_profile_shadow.index.equals(power_profile_base.index):
            raise ValueError(f"Indices of power profiles do not match.\n"
                             f"Baseline: {power_profile_base.index}\n"
                             f"Shadow: {power_profile_shadow.index}")

        # Calculate flexibility
        if self.direction == "positive":
            power_flex = power_profile_base - power_profile_shadow
        elif self.direction == "negative":
            power_flex = power_profile_shadow - power_profile_base
        else:
            raise ValueError(f"Direction of KPIs not properly defined: {self.direction}")

        # Set values to zero if the difference is small
        relative_difference = (power_flex / power_profile_base).abs()
        power_flex.loc[relative_difference < relative_error_acceptance] = 0

        # Set values
        self.power_flex_full.value = power_flex
        self.power_flex_offer.value = power_flex.loc[flex_offer_time_grid[0]:flex_offer_time_grid[-1]]
        return power_flex

    def _calculate_power_flex_stats(self) -> tuple[float, float, float]:
        """
        Calculate the characteristic values of the power flexibility for the offer.

        Returns:
            Maximum, minimum and average of the offered power flexibility.

        Raises:
            ValueError: If the power flexibility of the offer has not been set.
        """
        if self.power_flex_offer.value is None:
            raise ValueError("Power flexibility value is empty.")

        # Calculate characteristic values
        power_flex_offer_max = self.power_flex_offer.max()
        power_flex_offer_min = self.power_flex_offer.min()
        power_flex_offer_avg = self.power_flex_offer.avg()

        # Set values
        self.power_flex_offer_max.value = power_flex_offer_max
        self.power_flex_offer_min.value = power_flex_offer_min
        self.power_flex_offer_avg.value = power_flex_offer_avg
        return power_flex_offer_max, power_flex_offer_min, power_flex_offer_avg

    def _calculate_energy_flex(self) -> float:
        """
        Calculate the energy flexibility by integrating the power flexibility of the offer window.

        Returns:
            The energy flexibility, integrated with the time unit "hours".

        Raises:
            ValueError: If the power flexibility of the offer has not been set.
        """
        if self.power_flex_offer.value is None:
            raise ValueError("Power flexibility value of the offer is empty.")

        # Calculate flexibility
        energy_flex = self.power_flex_offer.integrate(time_unit="hours")

        # Set value
        self.energy_flex.value = energy_flex
        return energy_flex

    def _calculate_costs(self, electricity_price_signal: pd.Series, stored_energy_diff: float) -> tuple[float, float]:
        """
        Calculate the costs of the flexibility event based on the electricity costs profile and the power flexibility profile.

        Args:
            electricity_price_signal: Electricity price profile.
            stored_energy_diff: Difference of the stored energy between shadow
                and base; it is valued with the mean electricity price and
                subtracted from the costs.

        Returns:
            The costs and the corrected costs.
        """
        # Calculate series
        self.electricity_costs_series.value = electricity_price_signal * self.power_flex_full.value

        # Calculate scalar
        costs = abs(self.electricity_costs_series.integrate(time_unit="hours"))

        # correct the costs
        corrected_costs = costs - stored_energy_diff * np.mean(electricity_price_signal)

        self.costs.value = costs
        self.corrected_costs.value = corrected_costs
        return costs, corrected_costs

    def _calculate_costs_rel(self) -> tuple[float, float]:
        """
        Calculate the relative costs of the flexibility event per energy flexibility.

        Both relative costs are set to zero if the energy flexibility is zero.

        Returns:
            The relative costs and the relative corrected costs.
        """
        if self.energy_flex.value == 0:
            costs_rel = 0
            corrected_costs_rel = 0
        else:
            costs_rel = self.costs.value / self.energy_flex.value
            corrected_costs_rel = self.corrected_costs.value / self.energy_flex.value

        # Set value
        self.costs_rel.value = costs_rel
        self.corrected_costs_rel.value = corrected_costs_rel
        return costs_rel, corrected_costs_rel

    def get_kpi_dict(self, identifier: bool = False) -> dict[str, KPI]:
        """
        Get the KPIs as a dictionary with names or identifier as keys.

        Args:
            identifier: If True, the keys are the identifiers of the KPIs, otherwise the name of the kpi.

        Returns:
            Dictionary mapping the chosen key to the KPI object.
        """
        kpi_dict = {}
        for kpi in vars(self).values():
            if not isinstance(kpi, KPI):
                continue
            key = kpi.get_kpi_identifier() if identifier else kpi.name
            kpi_dict[key] = kpi
        return kpi_dict

    def get_name_dict(self) -> dict[str, str]:
        """
        Returns:
            Dictionary of the kpis with names as keys and the identifiers as values.
        """
        return {
            name: kpi.get_kpi_identifier()
            for name, kpi in self.get_kpi_dict(identifier=False).items()
        }

342 

343 

class FlexibilityData(pydantic.BaseModel):
    """
    Class containing the data for the calculation of the flexibility.

    The time grids are derived in the constructor; the profiles are expected
    to be assigned before ``calculate`` is called.
    """
    # Time parameters
    mpc_time_grid: np.ndarray = pydantic.Field(
        default=None,
        description="Time grid of the mpcs",
    )
    flex_offer_time_grid: np.ndarray = pydantic.Field(
        default=None,
        description="Time grid of the flexibility offer",
    )
    switch_time: Optional[float] = pydantic.Field(
        default=None,
        description="Time of the switch between the preparation and the market time",
    )

    # Profiles
    power_profile_base: pd.Series = pydantic.Field(
        default=None,
        description="Base power profile",
    )
    power_profile_flex_neg: pd.Series = pydantic.Field(
        default=None,
        description="Power profile of the negative flexibility",
    )
    power_profile_flex_pos: pd.Series = pydantic.Field(
        default=None,
        description="Power profile of the positive flexibility",
    )
    stored_energy_profile_base: pd.Series = pydantic.Field(
        default=None,
        description="Base profile of the stored electrical energy",
    )
    stored_energy_profile_flex_neg: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the stored electrical energy for negative flexibility",
    )
    stored_energy_profile_flex_pos: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the stored elctrical energy for positive flexibility",
    )
    electricity_price_series: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the electricity price",
    )

    # KPIs
    kpis_pos: FlexibilityKPIs = pydantic.Field(
        default=FlexibilityKPIs(direction="positive"),
        description="KPIs for positive flexibility",
    )
    kpis_neg: FlexibilityKPIs = pydantic.Field(
        default=FlexibilityKPIs(direction="negative"),
        description="KPIs for negative flexibility",
    )

    class Config:
        # Permit field types pydantic has no built-in validator for (numpy / pandas objects).
        arbitrary_types_allowed = True

    def __init__(self, prep_time: int, market_time: int, flex_event_duration: int,
                 time_step: int, prediction_horizon: int, **data):
        """Create the data container and derive the switch time and the time grids.

        Args:
            prep_time: Preparation time; summed with ``market_time`` to get the switch time.
            market_time: Market time; summed with ``prep_time`` to get the switch time.
            flex_event_duration: Length of the flexibility offer window starting at the switch time.
            time_step: Step size of both time grids.
            prediction_horizon: Number of steps of the mpc time grid.
            **data: Further field values forwarded to the pydantic constructor.
        """
        super().__init__(**data)
        self.switch_time = prep_time + market_time
        self.flex_offer_time_grid = np.arange(self.switch_time, self.switch_time + flex_event_duration, time_step)
        self.mpc_time_grid = np.arange(0, prediction_horizon * time_step, time_step)

    def format_predictor_inputs(self, series: pd.Series) -> pd.Series:
        """
        Format the input of the predictor to unify the data.

        The index is shifted so that it starts at zero and the series is then
        reindexed to the mpc time grid. The passed series is left unchanged.

        Args:
            series: Input series from a predictor.

        Returns:
            Formatted series.

        Raises:
            ValueError: If reindexing to the mpc time grid produces NaN values.
        """
        # Work on a copy so that the index of the caller's series is not shifted in place.
        series = series.copy()
        series.index = series.index - series.index[0]
        # Remember the index before reindexing; afterwards it equals the mpc time grid.
        shifted_index = series.index
        series = series.reindex(self.mpc_time_grid)
        if series.isna().any():
            raise ValueError(f"The mpc time grid is not compatible with the predictor "
                             f"input, which leads to NaN values in the series.\n"
                             f"MPC time grid:{self.mpc_time_grid}\n"
                             f"Series index:{shifted_index}")
        return series

    def format_mpc_inputs(self, series: pd.Series) -> pd.Series:
        """
        Format the input of the mpc to unify the data.

        The multi index is stripped, NaN values are filled with the ``MEAN``
        method and the series is reindexed to the mpc time grid.

        Args:
            series: Input series from a mpc.

        Returns:
            Formatted series.

        Raises:
            ValueError: If reindexing to the mpc time grid produces NaN values.
        """
        series = strip_multi_index(series)
        if series.isna().any():
            series = fill_nans(series=series, method=MEAN)
        # Remember the index before reindexing; afterwards it equals the mpc time grid.
        stripped_index = series.index
        series = series.reindex(self.mpc_time_grid)
        if series.isna().any():
            raise ValueError(f"The mpc time grid is not compatible with the mpc input, "
                             f"which leads to NaN values in the series.\n"
                             f"MPC time grid:{self.mpc_time_grid}\n"
                             f"Series index:{stripped_index}")
        return series

    def calculate(self, enable_energy_costs_correction: bool,
                  calculate_flex_cost: bool) -> tuple[FlexibilityKPIs, FlexibilityKPIs]:
        """
        Calculate the KPIs for the positive and negative flexibility.

        Both directions share the base profiles, the price series and the time
        grids; only the shadow profiles differ.

        Args:
            enable_energy_costs_correction: Forwarded to ``FlexibilityKPIs.calculate``.
            calculate_flex_cost: Forwarded to ``FlexibilityKPIs.calculate``.

        Returns:
            positive KPIs, negative KPIs
        """
        shared_inputs = dict(
            power_profile_base=self.power_profile_base,
            electricity_price_series=self.electricity_price_series,
            mpc_time_grid=self.mpc_time_grid,
            flex_offer_time_grid=self.flex_offer_time_grid,
            stored_energy_base=self.stored_energy_profile_base,
            enable_energy_costs_correction=enable_energy_costs_correction,
            calculate_flex_cost=calculate_flex_cost,
        )
        self.kpis_pos.calculate(
            power_profile_shadow=self.power_profile_flex_pos,
            stored_energy_shadow=self.stored_energy_profile_flex_pos,
            **shared_inputs,
        )
        self.kpis_neg.calculate(
            power_profile_shadow=self.power_profile_flex_neg,
            stored_energy_shadow=self.stored_energy_profile_flex_neg,
            **shared_inputs,
        )
        return self.kpis_pos, self.kpis_neg

    def get_kpis(self) -> dict[str, KPI]:
        """Return the KPIs of both directions in one dictionary keyed by their identifiers."""
        kpis_dict = self.kpis_pos.get_kpi_dict(identifier=True) | self.kpis_neg.get_kpi_dict(identifier=True)
        return kpis_dict