Coverage for agentlib_flexquant/data_structures/flex_kpis.py: 90%

208 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2026-03-26 09:43 +0000

1""" 

2Module for representing and calculating flexibility KPIs. It defines Pydantic models 

3for scalar and time-series KPIs, and provides methods to compute power, energy, 

4and cost metrics for positive and negative flexibility scenarios. 

5""" 

6from typing import Optional, Union 

7 

8import numpy as np 

9import pandas as pd 

10import pydantic 

11from agentlib_mpc.utils import TIME_CONVERSION, TimeConversionTypes 

12 

13from agentlib_flexquant.data_structures.globals import ( 

14 FlexibilityDirections, 

15 LINEAR, 

16 CONSTANT, 

17 INTEGRATION_METHOD, 

18) 

19from agentlib_flexquant.utils.data_handling import MEAN, fill_nans, strip_multi_index 

20 

21 

22class KPI(pydantic.BaseModel): 

23 """Class defining attributes of the indicator KPI.""" 

24 

25 name: str = pydantic.Field( 

26 default=None, 

27 description="Name of the flexibility KPI", 

28 ) 

29 value: Union[float, None] = pydantic.Field( 

30 default=None, 

31 description="Value of the flexibility KPI", 

32 ) 

33 unit: str = pydantic.Field( 

34 default=None, 

35 description="Unit of the flexibility KPI", 

36 ) 

37 direction: Union[FlexibilityDirections, None] = pydantic.Field( 

38 default=None, description="Direction of the shadow mpc / flexibility" 

39 ) 

40 

41 class Config: 

42 """Allow arbitrary (non-Pydantic) types such as pandas.Series or numpy.ndarray 

43 in model fields without requiring custom validators.""" 

44 

45 arbitrary_types_allowed = True 

46 

47 def get_kpi_identifier(self): 

48 """Get the identifier of the KPI composed of the direction of the flexibility 

49 and the KPI name.""" 

50 name = f"{self.direction}_{self.name}" 

51 return name 

52 

53 

54class KPISeries(KPI): 

55 """Class defining extra attributes of the indicator KPISeries in addition to KPI.""" 

56 

57 value: Union[pd.Series, None] = pydantic.Field( 

58 default=None, 

59 description="Value of the flexibility KPI", 

60 ) 

61 dt: Union[pd.Series, None] = pydantic.Field( 

62 default=None, 

63 description="Time differences between the timestamps of the series in seconds", 

64 ) 

65 integration_method: INTEGRATION_METHOD = pydantic.Field( 

66 default=LINEAR, description="Method set to integrate series variable" 

67 ) 

68 

69 def _get_dt(self) -> pd.Series: 

70 """Get the forward time differences between the timestamps of the series.""" 

71 # compute forward differences between consecutive timestamps 

72 dt = pd.Series(index=self.value.index, data=self.value.index).diff().shift(-1) 

73 # set the last value of dt to zero since there is no subsequent time step to compute a 

74 # difference with 

75 dt.iloc[-1] = 0 

76 self.dt = dt 

77 return dt 

78 

79 def min(self) -> float: 

80 """Get the minimum of a KPISeries.""" 

81 return self.value.min() 

82 

83 def max(self) -> float: 

84 """Get the maximum of a KPISeries.""" 

85 return self.value.max() 

86 

87 def avg(self) -> float: 

88 """Calculate the average value of the KPISeries over time.""" 

89 if self.dt is None: 

90 self._get_dt() 

91 delta_t = self.dt.sum() 

92 avg = self.integrate() / delta_t 

93 return avg 

94 

95 def integrate(self, time_unit: TimeConversionTypes = "seconds") -> float: 

96 """Integrate the value of the KPISeries over time by summing up 

97 the product of values and the time difference. 

98 

99 Args: 

100 time_unit: The time unit the integrated value should have 

101 

102 Returns: 

103 The integrated value of the KPISeries 

104 

105 """ 

106 if self.integration_method == LINEAR: 

107 # Linear integration: apply the trapezoidal rule, which assumes 

108 # the function changes linearly between sample points 

109 return np.trapezoid(self.value.values, 

110 self.value.index) / TIME_CONVERSION[time_unit] 

111 if self.integration_method == CONSTANT: 

112 # Constant integration: use a step-wise constant approach by 

113 # holding the value constant over each interval 

114 return ( 

115 np.sum(self.value.values[:-1] * self._get_dt().iloc[:-1]) 

116 / TIME_CONVERSION[time_unit] 

117 ) 

118 

119 

120class FlexibilityKPIs(pydantic.BaseModel): 

121 """Class defining the indicator KPIs.""" 

122 

123 # Direction 

124 direction: FlexibilityDirections = pydantic.Field( 

125 default=None, description="Direction of the shadow mpc" 

126 ) 

127 

128 # Power / energy KPIs 

129 power_flex_full: KPISeries = pydantic.Field( 

130 default=KPISeries(name="power_flex_full", unit="kW", integration_method=LINEAR), 

131 description="Power flexibility", 

132 ) 

133 power_flex_offer: KPISeries = pydantic.Field( 

134 default=KPISeries(name="power_flex_offer", unit="kW", integration_method=LINEAR), 

135 description="Power flexibility", 

136 ) 

137 power_flex_offer_max: KPI = pydantic.Field( 

138 default=KPI(name="power_flex_offer_max", unit="kW"), 

139 description="Maximum power flexibility", 

140 ) 

141 power_flex_offer_min: KPI = pydantic.Field( 

142 default=KPI(name="power_flex_offer_min", unit="kW"), 

143 description="Minimum power flexibility", 

144 ) 

145 power_flex_offer_avg: KPI = pydantic.Field( 

146 default=KPI(name="power_flex_offer_avg", unit="kW"), 

147 description="Average power flexibility", 

148 ) 

149 energy_flex: KPI = pydantic.Field( 

150 default=KPI(name="energy_flex", unit="kWh"), 

151 description="Energy flexibility equals the integral of the power flexibility", 

152 ) 

153 power_flex_within_boundary: KPI = pydantic.Field( 

154 default=KPI(name="power_flex_within_boundary", unit="-"), 

155 description=( 

156 "Variable indicating whether the baseline power and flex power " 

157 "align at the horizon end" 

158 ), 

159 ) 

160 

161 # Costs KPIs 

162 electricity_costs_series: KPISeries = pydantic.Field( 

163 default=KPISeries(name="electricity_costs_series", unit="ct/h", integration_method=LINEAR), 

164 description="Difference in electricity costs between shadow and baseline mpc over full prediction horizon", 

165 ) 

166 costs: KPI = pydantic.Field( 

167 default=KPI(name="costs", unit="ct"), 

168 description="Costs of flexibility", 

169 ) 

170 corrected_costs: KPI = pydantic.Field( 

171 default=KPI(name="corrected_costs", unit="ct"), 

172 description="Corrected costs of flexibility considering the stored " 

173 "energy in the system", 

174 ) 

175 costs_rel: KPI = pydantic.Field( 

176 default=KPI(name="costs_rel", unit="ct/kWh"), 

177 description="Costs of flexibility per energy", 

178 ) 

179 corrected_costs_rel: KPI = pydantic.Field( 

180 default=KPI(name="corrected_costs_rel", unit="ct/kWh"), 

181 description="Corrected costs of flexibility per energy", 

182 ) 

183 

184 def __init__(self, direction: FlexibilityDirections, **data): 

185 super().__init__(**data) 

186 self.direction = direction 

187 for kpi in vars(self).values(): 

188 if isinstance(kpi, KPI): 

189 kpi.direction = self.direction 

190 

191 def calculate( 

192 self, 

193 power_profile_base: pd.Series, 

194 power_profile_shadow: pd.Series, 

195 electricity_price_series: pd.Series, 

196 feed_in_price_series: pd.Series, 

197 mpc_time_grid: np.ndarray, 

198 flex_offer_time_grid: np.ndarray, 

199 stored_energy_base: pd.Series, 

200 stored_energy_shadow: pd.Series, 

201 enable_energy_costs_correction: bool, 

202 calculate_flex_cost: bool, 

203 integration_method: INTEGRATION_METHOD, 

204 collocation_time_grid: list = None, 

205 ): 

206 """Calculate the KPIs based on the power and electricity price input profiles. 

207 

208 Args: 

209 power_profile_base: power profile from baseline mpc 

210 power_profile_shadow: power profile from shadow mpc 

211 electricity_price_series: time series of electricity prices 

212 feed_in_price_series: time series of electricity feed-in prices 

213 flex_offer_time_grid: time grid over which the flexibility offer is calculated, 

214 for indexing of the power flexibility profiles 

215 stored_energy_base: time series of stored energy from baseline mpc 

216 stored_energy_shadow: time series of stored energy from shadow mpc 

217 enable_energy_costs_correction: whether the energy costs should be corrected 

218 calculate_flex_cost: whether the cost of the flexibility should be calculated 

219 integration_method: method used for integration of KPISeries e.g. linear, constant 

220 collocation_time_grid: Time grid of the mpc output with collocation discretization 

221 

222 

223 """ 

224 # Power / energy KPIs 

225 self._calculate_power_flex( 

226 power_profile_base=power_profile_base, 

227 power_profile_shadow=power_profile_shadow, 

228 flex_offer_time_grid=flex_offer_time_grid, 

229 integration_method=integration_method, 

230 ) 

231 self._calculate_power_flex_stats( 

232 mpc_time_grid=mpc_time_grid, collocation_time_grid=collocation_time_grid 

233 ) 

234 self._calculate_energy_flex( 

235 mpc_time_grid=mpc_time_grid, collocation_time_grid=collocation_time_grid 

236 ) 

237 

238 # Costs KPIs 

239 if enable_energy_costs_correction: 

240 stored_energy_diff = stored_energy_shadow.values[-1] - stored_energy_base.values[-1] 

241 else: 

242 stored_energy_diff = 0 

243 

244 if calculate_flex_cost: 

245 self._calculate_costs( 

246 electricity_price_signal=electricity_price_series, 

247 feed_in_price_signal=feed_in_price_series, 

248 power_profile_base=power_profile_base, 

249 power_profile_shadow=power_profile_shadow, 

250 stored_energy_diff=stored_energy_diff, 

251 integration_method=integration_method, 

252 mpc_time_grid=mpc_time_grid, 

253 collocation_time_grid=collocation_time_grid, 

254 ) 

255 self._calculate_costs_rel() 

256 

257 def _calculate_power_flex( 

258 self, 

259 power_profile_base: pd.Series, 

260 power_profile_shadow: pd.Series, 

261 flex_offer_time_grid: np.ndarray, 

262 integration_method: INTEGRATION_METHOD, 

263 relative_error_acceptance: float = 0.01, 

264 ): 

265 """Calculate the power flexibility based on the base and flexibility power profiles. 

266 

267 Args: 

268 power_profile_base: power profile from the baseline mpc 

269 power_profile_shadow: power profile from the shadow mpc 

270 flex_offer_time_grid: time grid over which the flexibility offer is calculated 

271 integration_method: method used for integration of KPISeries e.g. linear, constant 

272 relative_error_acceptance: threshold for the relative error between the baseline 

273 and shadow mpc to set the power flexibility to zero 

274 

275 """ 

276 if not power_profile_shadow.index.equals(power_profile_base.index): 

277 raise ValueError( 

278 f"Indices of power profiles do not match.\n" 

279 f"Baseline: {power_profile_base.index}\n" 

280 f"Shadow: {power_profile_shadow.index}" 

281 ) 

282 

283 # Calculate power flexibility trajectory 

284 if self.direction == "positive": 

285 power_flex = power_profile_base - power_profile_shadow 

286 elif self.direction == "negative": 

287 power_flex = power_profile_shadow - power_profile_base 

288 else: 

289 raise ValueError( 

290 f"Direction of KPIs not properly defined: {self.direction}") 

291 

292 # Set values to zero if the difference is small 

293 relative_difference = (power_flex / power_profile_base).abs() 

294 power_flex.loc[relative_difference < relative_error_acceptance] = 0 

295 # Set the first value of power_flex to zero, since it comes from the measurement/simulator 

296 # and is the same for baseline and shadow mpcs. 

297 # For quantification of flexibility, only power difference is of interest. 

298 power_flex.iloc[0] = 0 

299 

300 # Set values 

301 self.power_flex_full.value = power_flex 

302 self.power_flex_offer.value = power_flex.loc[ 

303 flex_offer_time_grid[0] : flex_offer_time_grid[-1] 

304 ] 

305 

306 # Set integration method 

307 self.power_flex_full.integration_method = integration_method 

308 self.power_flex_offer.integration_method = integration_method 

309 

310 def _calculate_power_flex_stats( 

311 self, mpc_time_grid: np.array, collocation_time_grid: list = None 

312 ): 

313 """Calculate the characteristic values of the power flexibility for the offer.""" 

314 if self.power_flex_offer.value is None: 

315 raise ValueError("Power flexibility value is empty.") 

316 

317 # Calculate characteristic values 

318 # max and min of power flex offer 

319 power_flex_offer = self.power_flex_offer.value.iloc[:-1].drop( 

320 collocation_time_grid, errors="ignore" 

321 ) 

322 power_flex_offer_max = power_flex_offer.max() 

323 power_flex_offer_min = power_flex_offer.min() 

324 # Average of the power flex offer 

325 # Get the series for integration before calculating average 

326 power_flex_offer_integration = self._get_series_for_integration( 

327 series=self.power_flex_offer, mpc_time_grid=mpc_time_grid 

328 ) 

329 power_flex_offer_integration.value = power_flex_offer_integration.value.drop( 

330 collocation_time_grid, errors="ignore" 

331 ) 

332 # Calculate the average and stores the original value 

333 power_flex_offer_avg = power_flex_offer_integration.avg() 

334 

335 # Set values 

336 self.power_flex_offer_max.value = power_flex_offer_max 

337 self.power_flex_offer_min.value = power_flex_offer_min 

338 self.power_flex_offer_avg.value = power_flex_offer_avg 

339 

340 def _get_series_for_integration( 

341 self, series: KPISeries, mpc_time_grid: np.ndarray 

342 ) -> KPISeries: 

343 """Return the KPISeries value sampled on the MPC time grid when the integration 

344 method is constant. 

345 

346 Otherwise, the original value is returned. 

347 

348 Args: 

349 series: the KPISeries to get value from 

350 mpc_time_grid: the MPC time grid over the horizon 

351 

352 """ 

353 if series.integration_method == CONSTANT: 

354 series = series.__deepcopy__() 

355 series.value = series.value.reindex(mpc_time_grid).dropna() 

356 return series 

357 else: 

358 return series.__deepcopy__() 

359 

360 def _calculate_energy_flex(self, mpc_time_grid, collocation_time_grid: list = None): 

361 """Calculate the energy flexibility by integrating the power flexibility 

362 of the offer window.""" 

363 if self.power_flex_offer.value is None: 

364 raise ValueError("Power flexibility value of the offer is empty.") 

365 

366 # Calculate flexibility 

367 # Get the series for integration before calculating average 

368 power_flex_offer_integration = self._get_series_for_integration( 

369 series=self.power_flex_offer, mpc_time_grid=mpc_time_grid 

370 ) 

371 power_flex_offer_integration.value = power_flex_offer_integration.value.drop( 

372 collocation_time_grid, errors="ignore" 

373 ) 

374 # Calculate the energy flex and stores the original value 

375 energy_flex = power_flex_offer_integration.integrate(time_unit="hours") 

376 

377 # Set value 

378 self.energy_flex.value = energy_flex 

379 

380 def _calculate_costs( 

381 self, 

382 electricity_price_signal: pd.Series, 

383 feed_in_price_signal: pd.Series, 

384 power_profile_base: pd.Series, 

385 power_profile_shadow: pd.Series, 

386 stored_energy_diff: float, 

387 integration_method: INTEGRATION_METHOD, 

388 mpc_time_grid: np.ndarray, 

389 collocation_time_grid: list = None, 

390 ): 

391 """Calculate the costs of the flexibility event based on the electricity costs profile, 

392 the power flexibility profile and difference of stored energy. 

393 

394 Args: 

395 electricity_price_signal: time series of the electricity price signal 

396 feed_in_price_signal: time series of the feed-in price signal 

397 power_profile_base: baseline power profile used to select tariff by sign 

398 power_profile_shadow: shadow mpc power profile used to select tariff by sign 

399 stored_energy_diff: the difference of the stored energy between baseline and shadow mpc 

400 integration_method: the integration method used to integrate KPISeries 

401 mpc_time_grid: the MPC time grid over the horizon 

402 collocation_time_grid: Time grid of the mpc output with collocation discretization 

403 

404 

405 """ 

406 if not power_profile_shadow.index.equals(power_profile_base.index): 

407 raise ValueError( 

408 f"Indices of power profiles do not match.\n" 

409 f"Baseline: {power_profile_base.index}\n" 

410 f"Shadow: {power_profile_shadow.index}" 

411 ) 

412 

413 # Set integration method 

414 self.power_flex_full.integration_method = integration_method 

415 self.electricity_costs_series.integration_method = integration_method 

416 

417 # if there is no feed-in tariff provided, the electricity price signal is used for both consumption and feed-in 

418 if feed_in_price_signal is None: 

419 feed_in_price_signal = electricity_price_signal 

420 

421 # Select tariff based on the sign of each profile 

422 effective_price_base = electricity_price_signal.where( 

423 power_profile_base > 0, 

424 feed_in_price_signal, 

425 ) 

426 effective_price_shadow = electricity_price_signal.where( 

427 power_profile_shadow > 0, 

428 feed_in_price_signal, 

429 ) 

430 

431 cost_profile_base = power_profile_base * effective_price_base 

432 cost_profile_shadow = power_profile_shadow * effective_price_shadow 

433 

434 # Get the series for integration before calculating 

435 power_flex_full_integration = self._get_series_for_integration( 

436 series=self.power_flex_full, mpc_time_grid=mpc_time_grid 

437 ) 

438 power_flex_full_integration.value = power_flex_full_integration.value.drop( 

439 collocation_time_grid, errors="ignore" 

440 ) 

441 

442 # Difference in costs between shadow and baseline mpc 

443 delta_cost = cost_profile_shadow - cost_profile_base 

444 delta_cost = delta_cost.reindex(power_flex_full_integration.value.index) 

445 delta_cost = delta_cost.where(power_flex_full_integration.value != 0, 0) 

446 self.electricity_costs_series.value = delta_cost.dropna() 

447 

448 # Calculate the costs and stores the original value 

449 costs = self.electricity_costs_series.integrate(time_unit="hours") 

450 

451 # correct the costs 

452 corrected_costs = costs - stored_energy_diff * np.mean(electricity_price_signal) 

453 

454 self.costs.value = costs 

455 self.corrected_costs.value = corrected_costs 

456 

457 def _calculate_costs_rel(self): 

458 """Calculate the relative costs of the flexibility event per energy flexibility.""" 

459 if self.energy_flex.value == 0: 

460 costs_rel = 0 

461 corrected_costs_rel = 0 

462 else: 

463 costs_rel = self.costs.value / self.energy_flex.value 

464 corrected_costs_rel = self.corrected_costs.value / self.energy_flex.value 

465 

466 # Set value 

467 self.costs_rel.value = costs_rel 

468 self.corrected_costs_rel.value = corrected_costs_rel 

469 

470 def get_kpi_dict(self, identifier: bool = False) -> dict[str, KPI]: 

471 """Get the KPIs as a dictionary with names or identifier as keys. 

472 

473 Args: 

474 identifier: If True, the keys are the identifiers of the KPIs, 

475 otherwise the name of the KPI. 

476 

477 Returns: 

478 A dictionary mapping desired KPI keys to KPI. 

479 

480 """ 

481 kpi_dict = {} 

482 for kpi in vars(self).values(): 

483 if isinstance(kpi, KPI): 

484 if identifier: 

485 kpi_dict[kpi.get_kpi_identifier()] = kpi 

486 else: 

487 kpi_dict[kpi.name] = kpi 

488 return kpi_dict 

489 

490 def get_name_dict(self) -> dict[str, str]: 

491 """Get KPIs mapping. 

492 

493 Returns: 

494 Dictionary of the kpis with names as keys and the identifiers as values. 

495 

496 """ 

497 name_dict = {} 

498 for name, kpi in self.get_kpi_dict(identifier=False).items(): 

499 name_dict[name] = kpi.get_kpi_identifier() 

500 return name_dict 

501 

502 

503class FlexibilityData(pydantic.BaseModel): 

504 """Class containing the data for the calculation of the flexibility.""" 

505 

506 # Time parameters 

507 mpc_time_grid: np.ndarray = pydantic.Field( 

508 default=None, 

509 description="Time grid of the mpcs", 

510 ) 

511 flex_offer_time_grid: np.ndarray = pydantic.Field( 

512 default=None, 

513 description="Time grid of the flexibility offer", 

514 ) 

515 switch_time: Optional[float] = pydantic.Field( 

516 default=None, 

517 description="Time of the switch between the preparation and the market time", 

518 ) 

519 

520 # Profiles 

521 power_profile_base: pd.Series = pydantic.Field( 

522 default=None, 

523 description="Base power profile", 

524 ) 

525 power_profile_flex_neg: pd.Series = pydantic.Field( 

526 default=None, 

527 description="Power profile of the negative flexibility", 

528 ) 

529 power_profile_flex_pos: pd.Series = pydantic.Field( 

530 default=None, 

531 description="Power profile of the positive flexibility", 

532 ) 

533 stored_energy_profile_base: pd.Series = pydantic.Field( 

534 default=None, 

535 description="Base profile of the stored electrical energy", 

536 ) 

537 stored_energy_profile_flex_neg: pd.Series = pydantic.Field( 

538 default=None, 

539 description="Profile of the stored electrical energy for negative flexibility", 

540 ) 

541 stored_energy_profile_flex_pos: pd.Series = pydantic.Field( 

542 default=None, 

543 description="Profile of the stored elctrical energy for positive flexibility", 

544 ) 

545 electricity_price_series: pd.Series = pydantic.Field( 

546 default=None, 

547 description="Profile of the electricity price", 

548 ) 

549 feed_in_price_series: pd.Series = pydantic.Field( 

550 default=None, 

551 description="Profile of the electricity feed-in price", 

552 ) 

553 

554 # KPIs 

555 kpis_pos: FlexibilityKPIs = pydantic.Field( 

556 default=FlexibilityKPIs(direction="positive"), 

557 description="KPIs for positive flexibility", 

558 ) 

559 kpis_neg: FlexibilityKPIs = pydantic.Field( 

560 default=FlexibilityKPIs(direction="negative"), 

561 description="KPIs for negative flexibility", 

562 ) 

563 

564 class Config: 

565 """Allow arbitrary (non-Pydantic) types such as pandas.Series or numpy.ndarray 

566 in model fields without requiring custom validators.""" 

567 

568 arbitrary_types_allowed = True 

569 

570 def __init__( 

571 self, 

572 prep_time: int, 

573 market_time: int, 

574 flex_event_duration: int, 

575 time_step: int, 

576 prediction_horizon: int, 

577 **data, 

578 ): 

579 super().__init__(**data) 

580 self.switch_time = prep_time + market_time 

581 self.flex_offer_time_grid = np.arange( 

582 self.switch_time, self.switch_time + flex_event_duration + time_step, time_step 

583 ) 

584 self.mpc_time_grid = np.arange(0, prediction_horizon * time_step + time_step, time_step) 

585 self._common_time_grid = None # Initialize common time grid 

586 

587 def unify_inputs(self, series: pd.Series, mpc=True) -> pd.Series: 

588 """Format the input of the mpc to unify the data. 

589 

590 Args: 

591 series: Input series from a mpc. 

592 

593 Returns: 

594 Formatted series. 

595 

596 """ 

597 if mpc: 

598 series = series 

599 else: 

600 series.index = series.index - series.index[0] 

601 

602 # Ensure series has values at mpc_time_grid points 

603 mpc_points_in_series = np.isin(self.mpc_time_grid, series.index) 

604 if not all(mpc_points_in_series): 

605 # Create a temp series with all mpc points 

606 temp_series = pd.Series(index=pd.Index(self.mpc_time_grid), dtype=series.dtype) 

607 # Merge with original series 

608 merged_series = pd.concat([series, temp_series]) 

609 # Remove duplicates keeping the original values 

610 merged_series = merged_series[~merged_series.index.duplicated(keep="first")] 

611 # Sort by index 

612 series = merged_series.sort_index() 

613 # Fill NaNs 

614 if mpc: 

615 # only fill NaN if there is NaN except for the first value 

616 if any(np.isnan(series.loc[1:])): 

617 series = fill_nans(series=series, method=MEAN) 

618 # ensure the first value is nan, since it is calculated with the state from the 

619 # controlled system and thus the same for baseline and shadow mpcs 

620 series.iloc[0] = np.nan 

621 

622 if not mpc: 

623 series = series.ffill() # price signals are typically steps 

624 

625 # Check for NaNs 

626 if any(series.loc[1:].isna()): 

627 raise ValueError( 

628 f"The mpc time grid is not compatible with the mpc input " 

629 f"provided for kpi calculation, " 

630 f"which leads to NaN values in the series.\n" 

631 f"MPC time grid:{self.mpc_time_grid}\n" 

632 f"Series index:{series.index} \n" 

633 f"Check time steps of the mpcs as well as casadi simulator " 

634 f"step sizes." 

635 ) 

636 return series 

637 

638 def calculate( 

639 self, 

640 enable_energy_costs_correction: bool, 

641 calculate_flex_cost: bool, 

642 integration_method: INTEGRATION_METHOD, 

643 collocation_time_grid: list = None, 

644 ): 

645 """Calculate the KPIs for the positive and negative flexibility. 

646 

647 Args: 

648 enable_energy_costs_correction: whether the energy costs should be corrected 

649 calculate_flex_cost: whether the cost of the flexibility should be calculated 

650 integration_method: method used for integration of KPISeries e.g. linear, constant 

651 collocation_time_grid: Time grid of the mpc output with collocation discretization 

652 

653 """ 

654 self.kpis_pos.calculate( 

655 power_profile_base=self.power_profile_base, 

656 power_profile_shadow=self.power_profile_flex_pos, 

657 electricity_price_series=self.electricity_price_series, 

658 feed_in_price_series=self.feed_in_price_series, 

659 mpc_time_grid=self.mpc_time_grid, 

660 flex_offer_time_grid=self.flex_offer_time_grid, 

661 stored_energy_base=self.stored_energy_profile_base, 

662 stored_energy_shadow=self.stored_energy_profile_flex_pos, 

663 enable_energy_costs_correction=enable_energy_costs_correction, 

664 calculate_flex_cost=calculate_flex_cost, 

665 integration_method=integration_method, 

666 collocation_time_grid=collocation_time_grid, 

667 ) 

668 self.kpis_neg.calculate( 

669 power_profile_base=self.power_profile_base, 

670 power_profile_shadow=self.power_profile_flex_neg, 

671 electricity_price_series=self.electricity_price_series, 

672 feed_in_price_series=self.feed_in_price_series, 

673 mpc_time_grid=self.mpc_time_grid, 

674 flex_offer_time_grid=self.flex_offer_time_grid, 

675 stored_energy_base=self.stored_energy_profile_base, 

676 stored_energy_shadow=self.stored_energy_profile_flex_neg, 

677 enable_energy_costs_correction=enable_energy_costs_correction, 

678 calculate_flex_cost=calculate_flex_cost, 

679 integration_method=integration_method, 

680 collocation_time_grid=collocation_time_grid, 

681 ) 

682 self.reset_time_grid() 

683 return self.kpis_pos, self.kpis_neg 

684 

685 def get_kpis(self) -> dict[str, KPI]: 

686 """Return combined KPIs from positive and negative flexibility scenarios.""" 

687 kpis_dict = self.kpis_pos.get_kpi_dict(identifier=True) | self.kpis_neg.get_kpi_dict( 

688 identifier=True 

689 ) 

690 return kpis_dict 

691 

692 def reset_time_grid(self): 

693 """ 

694 Reset the common time grid. 

695 This should be called between different flexibility calculations. 

696 """ 

697 self._common_time_grid = None 

698 

699 def update_profile(self, name: str, value: pd.Series, mpc:bool) -> None: 

700 """Update a specific profile for calculation with a new value.""" 

701 if value is not None: 

702 value = self.unify_inputs(series=value, mpc= mpc) 

703 setattr(self, name, value) 

704 

705