Coverage for agentlib_flexquant/data_structures/flex_kpis.py: 90%

211 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2026-06-17 09:09 +0000

1""" 

2Module for representing and calculating flexibility KPIs. It defines Pydantic models 

3for scalar and time-series KPIs, and provides methods to compute power, energy, 

4and cost metrics for positive and negative flexibility scenarios. 

5""" 

6from typing import Optional, Union 

7 

8import numpy as np 

9import pandas as pd 

10import pydantic 

11from agentlib_mpc.utils import TIME_CONVERSION, TimeConversionTypes 

12 

13from agentlib_flexquant.data_structures.globals import ( 

14 FlexibilityDirections, 

15 LINEAR, 

16 CONSTANT, 

17 INTEGRATION_METHOD, 

18) 

19from agentlib_flexquant.utils.data_handling import MEAN, fill_nans, strip_multi_index 

20 

21 

class KPI(pydantic.BaseModel):
    """Scalar flexibility KPI made of a name, a value, a unit and a direction.

    Every field defaults to ``None`` so that a KPI can be declared up front and
    receive its value once it has been calculated.
    """

    # ``name`` and ``unit`` default to None, hence they are annotated as Optional
    name: Optional[str] = pydantic.Field(
        default=None,
        description="Name of the flexibility KPI",
    )
    value: Union[float, None] = pydantic.Field(
        default=None,
        description="Value of the flexibility KPI",
    )
    unit: Optional[str] = pydantic.Field(
        default=None,
        description="Unit of the flexibility KPI",
    )
    direction: Union[FlexibilityDirections, None] = pydantic.Field(
        default=None, description="Direction of the shadow mpc / flexibility"
    )

    class Config:
        """Allow arbitrary (non-Pydantic) types such as pandas.Series or numpy.ndarray
        in model fields without requiring custom validators."""

        arbitrary_types_allowed = True

    def get_kpi_identifier(self) -> str:
        """Get the identifier of the KPI.

        Returns:
            The string ``"<direction>_<name>"`` built from the direction of the
            flexibility and the KPI name.

        """
        return f"{self.direction}_{self.name}"

52 

53 

class KPISeries(KPI):
    """Time-series flexibility KPI.

    Extends KPI with a ``pandas.Series`` value, the forward time differences of
    the index of that series and the method used to integrate it over time.
    """

    value: Union[pd.Series, None] = pydantic.Field(
        default=None,
        description="Value of the flexibility KPI",
    )
    dt: Union[pd.Series, None] = pydantic.Field(
        default=None,
        description="Time differences between the timestamps of the series in seconds",
    )
    integration_method: INTEGRATION_METHOD = pydantic.Field(
        default=LINEAR, description="Method set to integrate series variable"
    )

    def _get_dt(self) -> pd.Series:
        """Get the forward time differences between the timestamps of the series.

        The result is also stored in ``self.dt``.

        Returns:
            Series with the index of ``value`` holding, for each timestamp, the
            distance to the next timestamp. The last entry is zero.

        """
        # compute forward differences between consecutive timestamps
        dt = pd.Series(index=self.value.index, data=self.value.index).diff().shift(-1)
        # set the last value of dt to zero since there is no subsequent time step to compute a
        # difference with (an empty series has no last value to set)
        if not dt.empty:
            dt.iloc[-1] = 0
        self.dt = dt
        return dt

    def min(self) -> float:
        """Get the minimum of a KPISeries."""
        return self.value.min()

    def max(self) -> float:
        """Get the maximum of a KPISeries."""
        return self.value.max()

    def avg(self) -> float:
        """Calculate the average value of the KPISeries over time.

        Returns:
            The integral of the series (``integrate()`` with its default time
            unit) divided by the total time span covered by the index.

        """
        # Always derive dt from the current index: a dt cached for an earlier value
        # (e.g. carried over by a deep copy) would no longer match the samples
        # that integrate() works on.
        delta_t = self._get_dt().sum()
        return self.integrate() / delta_t

    def integrate(self, time_unit: TimeConversionTypes = "seconds") -> float:
        """Integrate the value of the KPISeries over time.

        Args:
            time_unit: The time unit the integrated value should have

        Returns:
            The integrated value of the KPISeries

        Raises:
            ValueError: If the integration method is neither linear nor constant.

        """
        if self.integration_method == LINEAR:
            # Linear integration: apply the trapezoidal rule, which assumes
            # the function changes linearly between sample points.
            # np.trapezoid only exists since NumPy 2.0, older releases offer np.trapz.
            trapezoid = getattr(np, "trapezoid", None) or np.trapz
            integral = trapezoid(self.value.values, self.value.index)
        elif self.integration_method == CONSTANT:
            # Constant integration: use a step-wise constant approach by
            # holding the value constant over each interval
            integral = np.sum(self.value.values[:-1] * self._get_dt().iloc[:-1])
        else:
            # fail loudly instead of silently returning None
            raise ValueError(
                f"Unknown integration method: {self.integration_method}"
            )
        return integral / TIME_CONVERSION[time_unit]

118 

119 

class FlexibilityKPIs(pydantic.BaseModel):
    """Collection of the flexibility KPIs for one flexibility direction.

    Holds the power, energy and cost KPIs and provides the methods to calculate
    them from the power profiles of the baseline and the shadow mpc.
    """

    # Direction
    direction: FlexibilityDirections = pydantic.Field(
        default=None, description="Direction of the shadow mpc"
    )

    # Power / energy KPIs
    power_flex_full: KPISeries = pydantic.Field(
        default=KPISeries(name="power_flex_full", unit="kW", integration_method=LINEAR),
        description="Power flexibility",
    )
    power_flex_offer: KPISeries = pydantic.Field(
        default=KPISeries(name="power_flex_offer", unit="kW", integration_method=LINEAR),
        description="Power flexibility",
    )
    power_flex_offer_max: KPI = pydantic.Field(
        default=KPI(name="power_flex_offer_max", unit="kW"),
        description="Maximum power flexibility",
    )
    power_flex_offer_min: KPI = pydantic.Field(
        default=KPI(name="power_flex_offer_min", unit="kW"),
        description="Minimum power flexibility",
    )
    power_flex_offer_avg: KPI = pydantic.Field(
        default=KPI(name="power_flex_offer_avg", unit="kW"),
        description="Average power flexibility",
    )
    energy_flex: KPI = pydantic.Field(
        default=KPI(name="energy_flex", unit="kWh"),
        description="Energy flexibility equals the integral of the power flexibility",
    )
    power_flex_within_boundary: KPI = pydantic.Field(
        default=KPI(name="power_flex_within_boundary", unit="-"),
        description=(
            "Variable indicating whether the baseline power and flex power "
            "align at the horizon end"
        ),
    )

    # Costs KPIs
    electricity_costs_series: KPISeries = pydantic.Field(
        default=KPISeries(name="electricity_costs_series", unit="ct/h", integration_method=LINEAR),
        description="Difference in electricity costs between shadow and baseline mpc over full prediction horizon",
    )
    costs: KPI = pydantic.Field(
        default=KPI(name="costs", unit="ct"),
        description="Costs of flexibility",
    )
    corrected_costs: KPI = pydantic.Field(
        default=KPI(name="corrected_costs", unit="ct"),
        description="Corrected costs of flexibility considering the stored "
        "energy in the system",
    )
    costs_rel: KPI = pydantic.Field(
        default=KPI(name="costs_rel", unit="ct/kWh"),
        description="Costs of flexibility per energy",
    )
    corrected_costs_rel: KPI = pydantic.Field(
        default=KPI(name="corrected_costs_rel", unit="ct/kWh"),
        description="Corrected costs of flexibility per energy",
    )

    def __init__(self, direction: FlexibilityDirections, **data):
        """Create the KPI collection and pass the direction on to every contained KPI.

        Args:
            direction: direction of the flexibility the KPIs belong to
            **data: field values forwarded to the pydantic model

        """
        super().__init__(**data)
        self.direction = direction
        for kpi in vars(self).values():
            if isinstance(kpi, KPI):
                kpi.direction = self.direction

    def calculate(
        self,
        power_profile_base: pd.Series,
        power_profile_shadow: pd.Series,
        electricity_price_series: pd.Series,
        feed_in_price_series: pd.Series,
        mpc_time_grid: np.ndarray,
        flex_offer_time_grid: np.ndarray,
        stored_energy_base: pd.Series,
        stored_energy_shadow: pd.Series,
        eta_thermal_base: pd.Series,
        enable_energy_costs_correction: bool,
        calculate_flex_cost: bool,
        integration_method: INTEGRATION_METHOD,
        collocation_time_grid: Optional[list] = None,
    ):
        """Calculate the KPIs based on the power and electricity price input profiles.

        Args:
            power_profile_base: power profile from baseline mpc
            power_profile_shadow: power profile from shadow mpc
            electricity_price_series: time series of electricity prices
            feed_in_price_series: time series of electricity feed-in prices
            mpc_time_grid: the MPC time grid over the horizon
            flex_offer_time_grid: time grid over which the flexibility offer is calculated,
                for indexing of the power flexibility profiles
            stored_energy_base: time series of stored energy from baseline mpc
            stored_energy_shadow: time series of stored energy from shadow mpc
            eta_thermal_base: time series of efficiency of thermal generation unit of baseline mpc
            enable_energy_costs_correction: whether the energy costs should be corrected
            calculate_flex_cost: whether the cost of the flexibility should be calculated
            integration_method: method used for integration of KPISeries e.g. linear, constant
            collocation_time_grid: Time grid of the mpc output with collocation discretization.
                None means that no collocation points have to be removed.

        """
        # Power / energy KPIs
        self._calculate_power_flex(
            power_profile_base=power_profile_base,
            power_profile_shadow=power_profile_shadow,
            flex_offer_time_grid=flex_offer_time_grid,
            integration_method=integration_method,
        )
        self._calculate_power_flex_stats(
            mpc_time_grid=mpc_time_grid, collocation_time_grid=collocation_time_grid
        )
        self._calculate_energy_flex(
            mpc_time_grid=mpc_time_grid, collocation_time_grid=collocation_time_grid
        )

        # Costs KPIs
        if enable_energy_costs_correction:
            # the correction uses the difference of the stored energy at the last time step
            stored_energy_diff = stored_energy_shadow.values[-1] - stored_energy_base.values[-1]
            if eta_thermal_base is None:
                eta_thermal_base_avg = 1
            else:
                eta_thermal_base_avg = eta_thermal_base.mean()
        else:
            stored_energy_diff = 0
            eta_thermal_base_avg = 1

        if calculate_flex_cost:
            self._calculate_costs(
                electricity_price_signal=electricity_price_series,
                feed_in_price_signal=feed_in_price_series,
                power_profile_base=power_profile_base,
                power_profile_shadow=power_profile_shadow,
                stored_energy_diff=stored_energy_diff,
                eta_thermal_base_avg=eta_thermal_base_avg,
                integration_method=integration_method,
                mpc_time_grid=mpc_time_grid,
                collocation_time_grid=collocation_time_grid,
            )
            self._calculate_costs_rel()

    def _calculate_power_flex(
        self,
        power_profile_base: pd.Series,
        power_profile_shadow: pd.Series,
        flex_offer_time_grid: np.ndarray,
        integration_method: INTEGRATION_METHOD,
        relative_error_acceptance: float = 0.01,
    ):
        """Calculate the power flexibility based on the base and flexibility power profiles.

        Args:
            power_profile_base: power profile from the baseline mpc
            power_profile_shadow: power profile from the shadow mpc
            flex_offer_time_grid: time grid over which the flexibility offer is calculated
            integration_method: method used for integration of KPISeries e.g. linear, constant
            relative_error_acceptance: threshold for the relative error between the baseline
                and shadow mpc to set the power flexibility to zero

        Raises:
            ValueError: If the indices of the two power profiles differ or the
                direction is neither "positive" nor "negative".

        """
        if not power_profile_shadow.index.equals(power_profile_base.index):
            raise ValueError(
                f"Indices of power profiles do not match.\n"
                f"Baseline: {power_profile_base.index}\n"
                f"Shadow: {power_profile_shadow.index}"
            )

        # Calculate power flexibility trajectory
        if self.direction == "positive":
            power_flex = power_profile_base - power_profile_shadow
        elif self.direction == "negative":
            power_flex = power_profile_shadow - power_profile_base
        else:
            raise ValueError(
                f"Direction of KPIs not properly defined: {self.direction}")

        # Set values to zero if the difference is small
        relative_difference = (power_flex / power_profile_base).abs()
        power_flex.loc[relative_difference < relative_error_acceptance] = 0

        # Set values
        self.power_flex_full.value = power_flex
        self.power_flex_offer.value = power_flex.loc[
            flex_offer_time_grid[0] : flex_offer_time_grid[-1]
        ]

        # Set integration method
        self.power_flex_full.integration_method = integration_method
        self.power_flex_offer.integration_method = integration_method

    def _drop_collocation_points(
        self, values: pd.Series, collocation_time_grid: Optional[list]
    ) -> pd.Series:
        """Remove the collocation points from a series, ignoring points that are not present.

        ``pandas.Series.drop`` raises a ValueError when it is called with ``None``
        as labels, so a missing collocation grid is handled explicitly.

        Args:
            values: the series to remove the collocation points from
            collocation_time_grid: labels of the collocation points or None

        Returns:
            The series without the collocation points; the unchanged series if
            no collocation grid is given.

        """
        if collocation_time_grid is None:
            return values
        return values.drop(collocation_time_grid, errors="ignore")

    def _calculate_power_flex_stats(
        self, mpc_time_grid: np.ndarray, collocation_time_grid: Optional[list] = None
    ):
        """Calculate the characteristic values of the power flexibility for the offer.

        Args:
            mpc_time_grid: the MPC time grid over the horizon
            collocation_time_grid: Time grid of the mpc output with collocation discretization

        Raises:
            ValueError: If the power flexibility of the offer has not been calculated yet.

        """
        if self.power_flex_offer.value is None:
            raise ValueError("Power flexibility value is empty.")

        # Calculate characteristic values
        # max and min of power flex offer; the last sample of the offer window and the
        # collocation points are left out
        power_flex_offer = self._drop_collocation_points(
            self.power_flex_offer.value.iloc[:-1], collocation_time_grid
        )
        power_flex_offer_max = power_flex_offer.max()
        power_flex_offer_min = power_flex_offer.min()
        # Average of the power flex offer
        # Work on a copy prepared for integration so that the original value is kept
        power_flex_offer_integration = self._get_series_for_integration(
            series=self.power_flex_offer, mpc_time_grid=mpc_time_grid
        )
        power_flex_offer_integration.value = self._drop_collocation_points(
            power_flex_offer_integration.value, collocation_time_grid
        )
        power_flex_offer_avg = power_flex_offer_integration.avg()

        # Set values
        self.power_flex_offer_max.value = power_flex_offer_max
        self.power_flex_offer_min.value = power_flex_offer_min
        self.power_flex_offer_avg.value = power_flex_offer_avg

    def _get_series_for_integration(
        self, series: KPISeries, mpc_time_grid: np.ndarray
    ) -> KPISeries:
        """Return a deep copy of the KPISeries that is prepared for integration.

        When the integration method is constant, the value of the copy is sampled
        on the MPC time grid (grid points missing in the value are dropped).
        Otherwise, the copy keeps the original value.

        Args:
            series: the KPISeries to get value from
            mpc_time_grid: the MPC time grid over the horizon

        Returns:
            A copy of the series, the passed series itself is not modified.

        """
        series_copy = series.model_copy(deep=True)
        if series_copy.integration_method == CONSTANT:
            series_copy.value = series_copy.value.reindex(mpc_time_grid).dropna()
        return series_copy

    def _calculate_energy_flex(
        self, mpc_time_grid, collocation_time_grid: Optional[list] = None
    ):
        """Calculate the energy flexibility by integrating the power flexibility
        of the offer window.

        Args:
            mpc_time_grid: the MPC time grid over the horizon
            collocation_time_grid: Time grid of the mpc output with collocation discretization

        Raises:
            ValueError: If the power flexibility of the offer has not been calculated yet.

        """
        if self.power_flex_offer.value is None:
            raise ValueError("Power flexibility value of the offer is empty.")

        # Calculate flexibility
        # Work on a copy prepared for integration so that the original value is kept
        power_flex_offer_integration = self._get_series_for_integration(
            series=self.power_flex_offer, mpc_time_grid=mpc_time_grid
        )
        power_flex_offer_integration.value = self._drop_collocation_points(
            power_flex_offer_integration.value, collocation_time_grid
        )
        energy_flex = power_flex_offer_integration.integrate(time_unit="hours")

        # Set value
        self.energy_flex.value = energy_flex

    def _calculate_costs(
        self,
        electricity_price_signal: pd.Series,
        feed_in_price_signal: pd.Series,
        power_profile_base: pd.Series,
        power_profile_shadow: pd.Series,
        stored_energy_diff: float,
        eta_thermal_base_avg: float,
        integration_method: INTEGRATION_METHOD,
        mpc_time_grid: np.ndarray,
        collocation_time_grid: Optional[list] = None,
    ):
        """Calculate the costs of the flexibility event based on the electricity costs profile,
        the power flexibility profile and difference of stored energy.

        Args:
            electricity_price_signal: time series of the electricity price signal
            feed_in_price_signal: time series of the feed-in price signal
            power_profile_base: baseline power profile used to select tariff by sign
            power_profile_shadow: shadow mpc power profile used to select tariff by sign
            stored_energy_diff: the difference of the stored energy between baseline and shadow mpc
            eta_thermal_base_avg: average efficiency of thermal generation unit
            integration_method: the integration method used to integrate KPISeries
            mpc_time_grid: the MPC time grid over the horizon
            collocation_time_grid: Time grid of the mpc output with collocation discretization

        Raises:
            ValueError: If the indices of the two power profiles differ.

        """
        if not power_profile_shadow.index.equals(power_profile_base.index):
            raise ValueError(
                f"Indices of power profiles do not match.\n"
                f"Baseline: {power_profile_base.index}\n"
                f"Shadow: {power_profile_shadow.index}"
            )

        # Set integration method
        self.power_flex_full.integration_method = integration_method
        self.electricity_costs_series.integration_method = integration_method

        # if there is no feed-in tariff provided, the electricity price signal is used
        # for both consumption and feed-in
        if feed_in_price_signal is None:
            feed_in_price_signal = electricity_price_signal

        # Select tariff based on the sign of each profile: electricity price where the
        # power is positive, feed-in price otherwise
        effective_price_base = electricity_price_signal.where(
            power_profile_base > 0,
            feed_in_price_signal,
        )
        effective_price_shadow = electricity_price_signal.where(
            power_profile_shadow > 0,
            feed_in_price_signal,
        )

        cost_profile_base = power_profile_base * effective_price_base
        cost_profile_shadow = power_profile_shadow * effective_price_shadow

        # Get the series for integration before calculating
        power_flex_full_integration = self._get_series_for_integration(
            series=self.power_flex_full, mpc_time_grid=mpc_time_grid
        )
        power_flex_full_integration.value = self._drop_collocation_points(
            power_flex_full_integration.value, collocation_time_grid
        )

        # Difference in costs between shadow and baseline mpc, evaluated on the time
        # stamps used for integration and set to zero where the power flexibility is zero
        delta_cost = cost_profile_shadow - cost_profile_base
        delta_cost = delta_cost.reindex(power_flex_full_integration.value.index)
        delta_cost = delta_cost.where(power_flex_full_integration.value != 0, 0)
        self.electricity_costs_series.value = delta_cost.dropna()

        # Calculate the costs
        costs = self.electricity_costs_series.integrate(time_unit="hours")

        # correct the costs by the value of the stored energy difference
        corrected_costs = costs - stored_energy_diff * np.mean(electricity_price_signal) / eta_thermal_base_avg

        self.costs.value = costs
        self.corrected_costs.value = corrected_costs

    def _calculate_costs_rel(self):
        """Calculate the relative costs of the flexibility event per energy flexibility.

        Both relative costs are set to zero if the energy flexibility is zero.
        """
        if self.energy_flex.value == 0:
            costs_rel = 0
            corrected_costs_rel = 0
        else:
            costs_rel = self.costs.value / self.energy_flex.value
            corrected_costs_rel = self.corrected_costs.value / self.energy_flex.value

        # Set value
        self.costs_rel.value = costs_rel
        self.corrected_costs_rel.value = corrected_costs_rel

    def get_kpi_dict(self, identifier: bool = False) -> dict[str, KPI]:
        """Get the KPIs as a dictionary with names or identifier as keys.

        Args:
            identifier: If True, the keys are the identifiers of the KPIs,
                otherwise the name of the KPI.

        Returns:
            A dictionary mapping desired KPI keys to KPI.

        """
        return {
            (kpi.get_kpi_identifier() if identifier else kpi.name): kpi
            for kpi in vars(self).values()
            if isinstance(kpi, KPI)
        }

    def get_name_dict(self) -> dict[str, str]:
        """Get KPIs mapping.

        Returns:
            Dictionary of the kpis with names as keys and the identifiers as values.

        """
        return {
            name: kpi.get_kpi_identifier()
            for name, kpi in self.get_kpi_dict(identifier=False).items()
        }

507 

508 

class FlexibilityData(pydantic.BaseModel):
    """Class containing the data for the calculation of the flexibility.

    Stores the time grids, the power / stored energy / price profiles and the
    KPI collections for the positive and the negative flexibility.
    """

    # Time parameters
    mpc_time_grid: np.ndarray = pydantic.Field(
        default=None,
        description="Time grid of the mpcs",
    )
    flex_offer_time_grid: np.ndarray = pydantic.Field(
        default=None,
        description="Time grid of the flexibility offer",
    )
    switch_time: Optional[float] = pydantic.Field(
        default=None,
        description="Time of the switch between the preparation and the market time",
    )

    # Profiles
    power_profile_base: pd.Series = pydantic.Field(
        default=None,
        description="Base power profile",
    )
    power_profile_flex_neg: pd.Series = pydantic.Field(
        default=None,
        description="Power profile of the negative flexibility",
    )
    power_profile_flex_pos: pd.Series = pydantic.Field(
        default=None,
        description="Power profile of the positive flexibility",
    )
    stored_energy_profile_base: pd.Series = pydantic.Field(
        default=None,
        description="Base profile of the stored electrical energy",
    )
    stored_energy_profile_flex_neg: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the stored electrical energy for negative flexibility",
    )
    stored_energy_profile_flex_pos: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the stored elctrical energy for positive flexibility",
    )
    eta_thermal_base: pd.Series = pydantic.Field(
        default=None,
        description="Efficiency of the thermal generation unit e.g. COP of heatpump",
    )
    electricity_price_series: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the electricity price",
    )
    feed_in_price_series: pd.Series = pydantic.Field(
        default=None,
        description="Profile of the electricity feed-in price",
    )

    # KPIs
    kpis_pos: FlexibilityKPIs = pydantic.Field(
        default=FlexibilityKPIs(direction="positive"),
        description="KPIs for positive flexibility",
    )
    kpis_neg: FlexibilityKPIs = pydantic.Field(
        default=FlexibilityKPIs(direction="negative"),
        description="KPIs for negative flexibility",
    )

    class Config:
        """Allow arbitrary (non-Pydantic) types such as pandas.Series or numpy.ndarray
        in model fields without requiring custom validators."""

        arbitrary_types_allowed = True

    def __init__(
        self,
        prep_time: int,
        market_time: int,
        flex_event_duration: int,
        time_step: int,
        prediction_horizon: int,
        **data,
    ):
        """Create the data container and derive the time grids.

        Args:
            prep_time: preparation time, added to the market time to get the switch time
            market_time: market time, added to the preparation time to get the switch time
            flex_event_duration: duration of the flexibility event, i.e. the length of
                the flexibility offer time grid
            time_step: step size of both time grids
            prediction_horizon: number of time steps of the mpc time grid
            **data: field values forwarded to the pydantic model

        """
        super().__init__(**data)
        self.switch_time = prep_time + market_time
        # both grids include their end point, hence the additional time_step for np.arange
        self.flex_offer_time_grid = np.arange(
            self.switch_time, self.switch_time + flex_event_duration + time_step, time_step
        )
        self.mpc_time_grid = np.arange(0, prediction_horizon * time_step + time_step, time_step)
        self._common_time_grid = None  # Initialize common time grid

    def unify_inputs(self, series: pd.Series, mpc=True) -> pd.Series:
        """Format the input of the mpc to unify the data.

        The series is extended so that it has an entry at every point of the mpc
        time grid. Entries that are missing afterwards are filled: for mpc series
        via ``fill_nans`` with the ``MEAN`` method, for other series by forward
        filling. The passed series is not modified.

        Args:
            series: Input series from a mpc.
            mpc: Whether the series is an mpc output. If False (e.g. for price
                signals), the index is shifted to start at zero and missing values
                are forward filled.

        Returns:
            Formatted series.

        Raises:
            ValueError: If the series still contains NaN values (ignoring index
                labels below 1) after filling.

        """
        if not mpc:
            # shift the index on a copy so that the series of the caller stays untouched
            series = series.copy()
            series.index = series.index - series.index[0]

        # Ensure series has values at mpc_time_grid points
        mpc_points_in_series = np.isin(self.mpc_time_grid, series.index)
        if not all(mpc_points_in_series):
            # Create a temp series with all mpc points
            temp_series = pd.Series(index=pd.Index(self.mpc_time_grid), dtype=series.dtype)
            # Merge with original series
            merged_series = pd.concat([series, temp_series])
            # Remove duplicates keeping the original values
            merged_series = merged_series[~merged_series.index.duplicated(keep="first")]
            # Sort by index
            series = merged_series.sort_index()
            # Fill NaNs
            if mpc:
                # only fill NaN if there is NaN except for the first value
                if series.loc[1:].isna().any():
                    series = fill_nans(series=series, method=MEAN)

        if not mpc:
            series = series.ffill()  # price signals are typically steps

        # Check for NaNs
        if series.loc[1:].isna().any():
            raise ValueError(
                f"The mpc time grid is not compatible with the mpc input "
                f"provided for kpi calculation, "
                f"which leads to NaN values in the series.\n"
                f"MPC time grid:{self.mpc_time_grid}\n"
                f"Series index:{series.index} \n"
                f"Check time steps of the mpcs as well as casadi simulator "
                f"step sizes."
            )
        return series

    def calculate(
        self,
        enable_energy_costs_correction: bool,
        calculate_flex_cost: bool,
        integration_method: INTEGRATION_METHOD,
        collocation_time_grid: Optional[list] = None,
    ) -> tuple[FlexibilityKPIs, FlexibilityKPIs]:
        """Calculate the KPIs for the positive and negative flexibility.

        Args:
            enable_energy_costs_correction: whether the energy costs should be corrected
            calculate_flex_cost: whether the cost of the flexibility should be calculated
            integration_method: method used for integration of KPISeries e.g. linear, constant
            collocation_time_grid: Time grid of the mpc output with collocation discretization

        Returns:
            The KPIs of the positive and of the negative flexibility.

        """
        # inputs that are identical for both flexibility directions
        shared_inputs = dict(
            power_profile_base=self.power_profile_base,
            electricity_price_series=self.electricity_price_series,
            feed_in_price_series=self.feed_in_price_series,
            mpc_time_grid=self.mpc_time_grid,
            flex_offer_time_grid=self.flex_offer_time_grid,
            stored_energy_base=self.stored_energy_profile_base,
            eta_thermal_base=self.eta_thermal_base,
            enable_energy_costs_correction=enable_energy_costs_correction,
            calculate_flex_cost=calculate_flex_cost,
            integration_method=integration_method,
            collocation_time_grid=collocation_time_grid,
        )
        self.kpis_pos.calculate(
            power_profile_shadow=self.power_profile_flex_pos,
            stored_energy_shadow=self.stored_energy_profile_flex_pos,
            **shared_inputs,
        )
        self.kpis_neg.calculate(
            power_profile_shadow=self.power_profile_flex_neg,
            stored_energy_shadow=self.stored_energy_profile_flex_neg,
            **shared_inputs,
        )
        self.reset_time_grid()
        return self.kpis_pos, self.kpis_neg

    def get_kpis(self) -> dict[str, KPI]:
        """Return combined KPIs from positive and negative flexibility scenarios.

        The keys are the KPI identifiers, i.e. they contain the direction.
        """
        return self.kpis_pos.get_kpi_dict(identifier=True) | self.kpis_neg.get_kpi_dict(
            identifier=True
        )

    def reset_time_grid(self):
        """
        Reset the common time grid.
        This should be called between different flexibility calculations.
        """
        self._common_time_grid = None

    def update_profile(self, name: str, value: pd.Series, mpc: bool) -> None:
        """Update a specific profile for calculation with a new value.

        Args:
            name: name of the attribute to set
            value: the new profile; it is formatted with ``unify_inputs`` unless it is None
            mpc: passed on to ``unify_inputs``

        """
        if value is not None:
            value = self.unify_inputs(series=value, mpc=mpc)
        setattr(self, name, value)

711 value = self.unify_inputs(series=value, mpc=mpc) 

712 setattr(self, name, value)