Coverage for agentlib_flexquant/data_structures/flex_kpis.py: 91%

163 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-08-15 15:25 +0000

1import pydantic 

2import numpy as np 

3import pandas as pd 

4from typing import Union, Optional 

5from agentlib_mpc.utils import TimeConversionTypes, TIME_CONVERSION 

6from agentlib_flexquant.data_structures.globals import FlexibilityDirections 

7from agentlib_flexquant.utils.data_handling import strip_multi_index, fill_nans, MEAN 

8 

9 

class KPI(pydantic.BaseModel):
    """Scalar flexibility indicator described by a name, a value, a unit and a direction."""

    name: str = pydantic.Field(
        default=None,
        description="Name of the flexibility KPI",
    )
    value: Optional[float] = pydantic.Field(
        default=None,
        description="Value of the flexibility KPI",
    )
    unit: str = pydantic.Field(
        default=None,
        description="Unit of the flexibility KPI",
    )
    direction: Optional[FlexibilityDirections] = pydantic.Field(
        default=None,
        description="Direction of the shadow mpc / flexibility"
    )

    class Config:
        # Allow field types pydantic has no built-in validator for.
        arbitrary_types_allowed = True

    def get_kpi_identifier(self) -> str:
        """Build the identifier of the KPI.

        Returns:
            The direction and the name of the KPI joined by an underscore.

        """
        return f"{self.direction}_{self.name}"

36 

37 

class KPISeries(KPI):
    """KPI whose value is a pandas Series, with helpers to reduce it to scalars.

    The sampling intervals ``dt`` are derived from the index of ``value`` and
    cached. The cache is refreshed whenever its index no longer matches the
    index of ``value``, so reassigning ``value`` cannot lead to an integration
    with outdated intervals.
    """
    value: Union[pd.Series, None] = pydantic.Field(
        default=None,
        description="Value of the flexibility KPI",
    )
    dt: Union[pd.Series, None] = pydantic.Field(
        default=None,
        description="Time differences between the timestamps of the series in seconds",
    )

    def _get_dt(self) -> pd.Series:
        """Compute and cache the differences between consecutive index values.

        Each entry holds the distance to the next index value. The last entry
        has no successor and is forward-filled from the previous interval.

        Returns:
            Series of interval lengths sharing the index of ``value``.

        """
        dt = pd.Series(index=self.value.index, data=self.value.index).diff().shift(-1).ffill()
        self.dt = dt
        return dt

    def _current_dt(self) -> pd.Series:
        """Return intervals matching the current ``value``, recomputing a missing or stale cache."""
        # A dt with a different index would be label-aligned against value,
        # producing NaN products that sum() silently drops.
        if self.dt is None or not self.dt.index.equals(self.value.index):
            self._get_dt()
        return self.dt

    def min(self) -> float:
        """Get the minimum of a KPISeries."""
        return self.value.min()

    def max(self) -> float:
        """Get the maximum of a KPISeries."""
        return self.value.max()

    def avg(self) -> float:
        """Calculate the time-weighted average value of the KPISeries.

        Returns:
            The integral of the series divided by the sum of all intervals.

        """
        delta_t = self._current_dt().sum()
        return self.integrate() / delta_t

    def integrate(self, time_unit: TimeConversionTypes = "seconds") -> float:
        """Integrate the value of the KPISeries over time by summing up the product of values and the time difference.

        Args:
            time_unit: The time unit the integrated value should have; used as key into TIME_CONVERSION

        Returns:
            The integrated value of the KPISeries

        """
        products = self.value * self._current_dt() / TIME_CONVERSION[time_unit]
        return products.sum()

86 

87 

class FlexibilityKPIs(pydantic.BaseModel):
    """Collection of all indicator KPIs for one flexibility direction."""

    # Direction
    direction: FlexibilityDirections = pydantic.Field(
        default=None, description="Direction of the shadow mpc"
    )

    # Power / energy KPIs
    power_flex_full: KPISeries = pydantic.Field(
        default=KPISeries(name="power_flex_full", unit="kW"),
        description="Power flexibility",
    )
    power_flex_offer: KPISeries = pydantic.Field(
        default=KPISeries(name="power_flex_offer", unit="kW"),
        description="Power flexibility",
    )
    power_flex_offer_max: KPI = pydantic.Field(
        default=KPI(name="power_flex_offer_max", unit="kW"),
        description="Maximum power flexibility",
    )
    power_flex_offer_min: KPI = pydantic.Field(
        default=KPI(name="power_flex_offer_min", unit="kW"),
        description="Minimum power flexibility",
    )
    power_flex_offer_avg: KPI = pydantic.Field(
        default=KPI(name="power_flex_offer_avg", unit="kW"),
        description="Average power flexibility",
    )
    energy_flex: KPI = pydantic.Field(
        default=KPI(name="energy_flex", unit="kWh"),
        description="Energy flexibility equals the integral of the power flexibility",
    )
    power_flex_within_boundary: KPI = pydantic.Field(
        default=KPI(name="power_flex_within_boundary", unit="-"),
        description="Variable indicating whether the baseline power and flex power align at the horizon end",
    )

    # Costs KPIs
    electricity_costs_series: KPISeries = pydantic.Field(
        default=KPISeries(name="electricity_costs_series", unit="ct/h"),
        description="Costs of flexibility",
    )
    costs: KPI = pydantic.Field(
        default=KPI(name="costs", unit="ct"),
        description="Costs of flexibility",
    )
    corrected_costs: KPI = pydantic.Field(
        default=KPI(name="corrected_costs", unit="ct"),
        description="Corrected costs of flexibility considering the stored energy in the system",
    )
    costs_rel: KPI = pydantic.Field(
        default=KPI(name="costs_rel", unit="ct/kWh"),
        description="Costs of flexibility per energy",
    )
    corrected_costs_rel: KPI = pydantic.Field(
        default=KPI(name="corrected_costs_rel", unit="ct/kWh"),
        description="Corrected costs of flexibility per energy",
    )

    def __init__(self, direction: FlexibilityDirections, **data):
        """Create the KPI collection and stamp the direction onto every contained KPI.

        Args:
            direction: direction of the flexibility these KPIs describe
            **data: further field values forwarded to the pydantic constructor

        """
        super().__init__(**data)
        self.direction = direction
        contained_kpis = [attr for attr in vars(self).values() if isinstance(attr, KPI)]
        for contained in contained_kpis:
            contained.direction = self.direction

    def calculate(
            self,
            power_profile_base: pd.Series,
            power_profile_shadow: pd.Series,
            electricity_price_series: pd.Series,
            mpc_time_grid: np.ndarray,
            flex_offer_time_grid: np.ndarray,
            stored_energy_base: pd.Series,
            stored_energy_shadow: pd.Series,
            enable_energy_costs_correction: bool,
            calculate_flex_cost: bool
    ):
        """Compute all KPIs from the power, stored energy and price profiles.

        Args:
            power_profile_base: power profile from baseline mpc
            power_profile_shadow: power profile from shadow mpc
            electricity_price_series: time series of electricity prices
            mpc_time_grid: time grid over the MPC horizon (not used by this method)
            flex_offer_time_grid: time grid of the flexibility offer; its first and last entry bound the offer window
            stored_energy_base: time series of stored energy from baseline mpc
            stored_energy_shadow: time series of stored energy from shadow mpc
            enable_energy_costs_correction: whether the costs are corrected by the final difference in stored energy
            calculate_flex_cost: whether the cost KPIs are calculated at all

        """
        # Power / energy KPIs
        self._calculate_power_flex(
            power_profile_base=power_profile_base,
            power_profile_shadow=power_profile_shadow,
            flex_offer_time_grid=flex_offer_time_grid,
        )
        self._calculate_power_flex_stats()
        self._calculate_energy_flex()

        # Costs KPIs: the correction uses the last sample of both stored energy profiles
        stored_energy_diff = (
            stored_energy_shadow.values[-1] - stored_energy_base.values[-1]
            if enable_energy_costs_correction
            else 0
        )

        if calculate_flex_cost:
            self._calculate_costs(
                electricity_price_signal=electricity_price_series,
                stored_energy_diff=stored_energy_diff,
            )
            self._calculate_costs_rel()

    def _calculate_power_flex(self, power_profile_base: pd.Series, power_profile_shadow: pd.Series,
                              flex_offer_time_grid: np.ndarray, relative_error_acceptance: float = 0.01):
        """Derive the power flexibility from the baseline and the shadow power profile.

        Args:
            power_profile_base: power profile from the baseline mpc
            power_profile_shadow: power profile from the shadow mpc
            flex_offer_time_grid: time grid over which the flexibility offer is calculated
            relative_error_acceptance: flexibility values whose magnitude relative to the baseline is below this threshold are set to zero

        Raises:
            ValueError: if the profile indices differ or the direction is neither "positive" nor "negative"

        """
        indices_match = power_profile_shadow.index.equals(power_profile_base.index)
        if not indices_match:
            raise ValueError(f"Indices of power profiles do not match.\n"
                             f"Baseline: {power_profile_base.index}\n"
                             f"Shadow: {power_profile_shadow.index}")

        # Positive flexibility is baseline minus shadow, negative the other way round
        if self.direction == "positive":
            minuend, subtrahend = power_profile_base, power_profile_shadow
        elif self.direction == "negative":
            minuend, subtrahend = power_profile_shadow, power_profile_base
        else:
            raise ValueError(f"Direction of KPIs not properly defined: {self.direction}")
        power_flex = minuend - subtrahend

        # Suppress deviations that are small relative to the baseline
        negligible = (power_flex / power_profile_base).abs() < relative_error_acceptance
        power_flex.loc[negligible] = 0

        # Store the full profile and the part inside the offer window (label slice)
        self.power_flex_full.value = power_flex
        self.power_flex_offer.value = power_flex.loc[flex_offer_time_grid[0]:flex_offer_time_grid[-1]]

    def _calculate_power_flex_stats(self):
        """Compute maximum, minimum and average of the offered power flexibility.

        Raises:
            ValueError: if the power flexibility of the offer has not been set

        """
        offer = self.power_flex_offer
        if offer.value is None:
            raise ValueError("Power flexibility value is empty.")

        # Evaluate all statistics before storing any of them
        highest, lowest, mean = offer.max(), offer.min(), offer.avg()

        self.power_flex_offer_max.value = highest
        self.power_flex_offer_min.value = lowest
        self.power_flex_offer_avg.value = mean

    def _calculate_energy_flex(self):
        """Integrate the offered power flexibility over time (in hours) to get the energy flexibility.

        Raises:
            ValueError: if the power flexibility of the offer has not been set

        """
        offer = self.power_flex_offer
        if offer.value is None:
            raise ValueError("Power flexibility value of the offer is empty.")

        self.energy_flex.value = offer.integrate(time_unit="hours")

    def _calculate_costs(self, electricity_price_signal: pd.Series, stored_energy_diff: float):
        """Compute the costs and the corrected costs of the flexibility event.

        Args:
            electricity_price_signal: time series of the electricity price signal
            stored_energy_diff: the difference of the stored energy between baseline and shadow mpc

        """
        # Cost rate series: price times full power flexibility
        self.electricity_costs_series.value = electricity_price_signal * self.power_flex_full.value

        # Scalar costs are the absolute value of the integrated cost rate
        total_costs = abs(self.electricity_costs_series.integrate(time_unit="hours"))

        # Value the stored energy difference with the mean price
        energy_credit = stored_energy_diff * np.mean(electricity_price_signal)
        total_corrected = total_costs - energy_credit

        self.costs.value = total_costs
        self.corrected_costs.value = total_corrected

    def _calculate_costs_rel(self):
        """Relate the costs and the corrected costs to the energy flexibility.

        Both relative costs are set to zero when the energy flexibility is zero.
        """
        energy = self.energy_flex.value
        relative, relative_corrected = (
            (0, 0)
            if energy == 0
            else (self.costs.value / energy, self.corrected_costs.value / energy)
        )

        self.costs_rel.value = relative
        self.corrected_costs_rel.value = relative_corrected

    def get_kpi_dict(self, identifier: bool = False) -> dict[str, KPI]:
        """Collect all KPI attributes of this object in a dictionary.

        Args:
            identifier: If True, the keys are the identifiers of the KPIs, otherwise the name of the KPI.

        Returns:
            A dictionary mapping desired KPI keys to KPI.

        """
        contained_kpis = [attr for attr in vars(self).values() if isinstance(attr, KPI)]
        if identifier:
            return {entry.get_kpi_identifier(): entry for entry in contained_kpis}
        return {entry.name: entry for entry in contained_kpis}

    def get_name_dict(self) -> dict[str, str]:
        """Map KPI names to KPI identifiers.

        Returns:
            Dictionary of the kpis with names as keys and the identifiers as values.

        """
        by_name = self.get_kpi_dict(identifier=False)
        return {name: entry.get_kpi_identifier() for name, entry in by_name.items()}

353 

354 

class FlexibilityData(pydantic.BaseModel):
    """Container for the time grids, profiles and KPIs used to calculate the flexibility."""
    # Time parameters
    mpc_time_grid: np.ndarray = pydantic.Field(
        default=None,
        description="Time grid of the mpcs",
    )
    flex_offer_time_grid: np.ndarray = pydantic.Field(
        default=None,
        description="Time grid of the flexibility offer",
    )
    switch_time: Optional[float] = pydantic.Field(
        default=None,
        description="Time of the switch between the preparation and the market time",
    )

    # Profiles
    power_profile_base: pd.Series = pydantic.Field(
        default=None,
        description="Base power profile",
    )
    power_profile_flex_neg: pd.Series = pydantic.Field(
        default=None,
        description="Power profile of the negative flexibility",
    )
    power_profile_flex_pos: pd.Series = pydantic.Field(
        default=None,
        description="Power profile of the positive flexibility",
    )
    stored_energy_profile_base: pd.Series = pydantic.Field(
        default=None,
        description="Base profile of the stored electrical energy",
    )
    stored_energy_profile_flex_neg: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the stored electrical energy for negative flexibility",
    )
    stored_energy_profile_flex_pos: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the stored elctrical energy for positive flexibility",
    )
    electricity_price_series: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the electricity price",
    )

    # KPIs
    kpis_pos: FlexibilityKPIs = pydantic.Field(
        default=FlexibilityKPIs(direction="positive"),
        description="KPIs for positive flexibility",
    )
    kpis_neg: FlexibilityKPIs = pydantic.Field(
        default=FlexibilityKPIs(direction="negative"),
        description="KPIs for negative flexibility",
    )

    class Config:
        # Needed for the numpy array and pandas Series fields.
        arbitrary_types_allowed = True

    def __init__(self, prep_time: int, market_time: int, flex_event_duration: int,
                 time_step: int, prediction_horizon: int, **data):
        """Create the data container and derive the time grids.

        All time arguments are assumed to share one time unit - TODO confirm against caller.

        Args:
            prep_time: preparation time; added to market_time to get the switch time
            market_time: market time; added to prep_time to get the switch time
            flex_event_duration: length of the flexibility offer window starting at the switch time
            time_step: spacing of both time grids
            prediction_horizon: number of steps of the mpc time grid
            **data: further field values forwarded to the pydantic constructor

        """
        super().__init__(**data)
        self.switch_time = prep_time + market_time
        # np.arange excludes the stop value, so both grids end one step before it.
        self.flex_offer_time_grid = np.arange(self.switch_time, self.switch_time + flex_event_duration, time_step)
        self.mpc_time_grid = np.arange(0, prediction_horizon * time_step, time_step)

    def format_predictor_inputs(self, series: pd.Series) -> pd.Series:
        """Format the input of the predictor to unify the data.

        The index is shifted to start at zero and the series is reindexed to
        the mpc time grid. The passed series is left unmodified.

        Args:
            series: Input series from a predictor.

        Returns:
            Formatted series.

        Raises:
            ValueError: if reindexing to the mpc time grid leaves NaN values.

        """
        # Work on a copy: assigning to the index of the argument would change
        # the caller's object as a side effect.
        series = series.copy()
        series.index = series.index - series.index[0]
        series = series.reindex(self.mpc_time_grid)
        if series.isna().any():
            raise ValueError(f"The mpc time grid is not compatible with the predictor "
                             f"input, which leads to NaN values in the series.\n"
                             f"MPC time grid:{self.mpc_time_grid}\n"
                             f"Series index:{series.index}")
        return series

    def format_mpc_inputs(self, series: pd.Series) -> pd.Series:
        """Format the input of the mpc to unify the data.

        The multi index is stripped, NaN values are filled with the MEAN
        method and the series is reindexed to the mpc time grid.

        Args:
            series: Input series from a mpc.

        Returns:
            Formatted series.

        Raises:
            ValueError: if reindexing to the mpc time grid leaves NaN values.

        """
        series = strip_multi_index(series)
        if series.isna().any():
            series = fill_nans(series=series, method=MEAN)
        series = series.reindex(self.mpc_time_grid)
        if series.isna().any():
            raise ValueError(f"The mpc time grid is not compatible with the mpc input, "
                             f"which leads to NaN values in the series.\n"
                             f"MPC time grid:{self.mpc_time_grid}\n"
                             f"Series index:{series.index}")
        return series

    def calculate(self, enable_energy_costs_correction: bool, calculate_flex_cost: bool):
        """Calculate the KPIs for the positive and negative flexibility.

        Args:
            enable_energy_costs_correction: whether the energy costs should be corrected
            calculate_flex_cost: whether the cost of the flexibility should be calculated

        """
        # Positive first, then negative; both share baseline, prices and grids.
        per_direction = (
            (self.kpis_pos, self.power_profile_flex_pos, self.stored_energy_profile_flex_pos),
            (self.kpis_neg, self.power_profile_flex_neg, self.stored_energy_profile_flex_neg),
        )
        for kpis, power_profile_shadow, stored_energy_shadow in per_direction:
            kpis.calculate(
                power_profile_base=self.power_profile_base,
                power_profile_shadow=power_profile_shadow,
                electricity_price_series=self.electricity_price_series,
                mpc_time_grid=self.mpc_time_grid,
                flex_offer_time_grid=self.flex_offer_time_grid,
                stored_energy_base=self.stored_energy_profile_base,
                stored_energy_shadow=stored_energy_shadow,
                enable_energy_costs_correction=enable_energy_costs_correction,
                calculate_flex_cost=calculate_flex_cost
            )

    def get_kpis(self) -> dict[str, KPI]:
        """Get the KPIs of both directions in one dictionary.

        Returns:
            Dictionary with the KPI identifiers as keys and the KPIs of the
            positive and the negative flexibility as values.

        """
        kpis_dict = self.kpis_pos.get_kpi_dict(identifier=True) | self.kpis_neg.get_kpi_dict(identifier=True)
        return kpis_dict