Coverage for agentlib_flexquant/modules/flexibility_indicator.py: 88%
192 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-15 15:25 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-15 15:25 +0000
1import logging
2import os
3import numpy as np
4import pandas as pd
5from pathlib import Path
6from typing import List, Optional
7from pydantic import BaseModel, ConfigDict, Field, model_validator
8import agentlib
9import agentlib_flexquant.data_structures.globals as glbs
10from agentlib_flexquant.data_structures.flex_kpis import FlexibilityData, FlexibilityKPIs
11from agentlib_flexquant.data_structures.flex_offer import FlexOffer
14class InputsForCorrectFlexCosts(BaseModel):
15 enable_energy_costs_correction: bool = Field(
16 name="enable_energy_costs_correction",
17 description="Variable determining whether to correct the costs of the flexible energy"
18 "Define the variable for stored electrical energy in the base MPC model and config as output if the correction of costs is enabled",
19 default=False
20 )
22 absolute_power_deviation_tolerance: float = Field(
23 name="absolute_power_deviation_tolerance",
24 default=0.1,
25 description="Absolute tolerance in kW within which no warning is thrown"
26 )
28 stored_energy_variable: str = Field(
29 name="stored_energy_variable",
30 default="E_stored",
31 description="Name of the variable representing the stored electrical energy in the baseline config"
32 )
35class InputsForCalculateFlexCosts(BaseModel):
36 use_constant_electricity_price: bool = Field(
37 default=False,
38 description="Use constant electricity price"
39 )
40 calculate_flex_costs: bool = Field(
41 default=True,
42 description="Calculate the flexibility cost"
43 )
44 const_electricity_price: float = Field(
45 default=np.nan,
46 description="constant electricity price in ct/kWh"
47 )
49 @model_validator(mode="after")
50 def validate_constant_price(cls, model):
51 if model.use_constant_electricity_price and np.isnan(model.const_electricity_price):
52 raise Exception(f'Constant electricity price must have a valid value in float if it is to be used for calculation. '
53 f'Received "use_constant_electricity_price": true, "const_electricity_price": {model.const_electricity_price}. '
54 f'Please specify them correctly in "calculate_costs" field in flex config.')
55 return model
58# Pos and neg kpis to get the right names for plotting
59kpis_pos = FlexibilityKPIs(direction="positive")
60kpis_neg = FlexibilityKPIs(direction="negative")
63class FlexibilityIndicatorModuleConfig(agentlib.BaseModuleConfig):
65 model_config = ConfigDict(
66 extra='forbid'
67 )
69 inputs: List[agentlib.AgentVariable] = [
70 agentlib.AgentVariable(name=glbs.POWER_ALIAS_BASE, unit="W", type="pd.Series",
71 description="The power input to the system"),
72 agentlib.AgentVariable(name=glbs.POWER_ALIAS_NEG, unit="W", type="pd.Series",
73 description="The power input to the system"),
74 agentlib.AgentVariable(name=glbs.POWER_ALIAS_POS, unit="W", type="pd.Series",
75 description="The power input to the system"),
76 agentlib.AgentVariable(name=glbs.STORED_ENERGY_ALIAS_BASE, unit="kWh", type="pd.Series",
77 description="Energy stored in the system w.r.t. 0K"),
78 agentlib.AgentVariable(name=glbs.STORED_ENERGY_ALIAS_NEG, unit="kWh", type="pd.Series",
79 description="Energy stored in the system w.r.t. 0K"),
80 agentlib.AgentVariable(name=glbs.STORED_ENERGY_ALIAS_POS, unit="kWh", type="pd.Series",
81 description="Energy stored in the system w.r.t. 0K")
82 ]
84 outputs: List[agentlib.AgentVariable] = [
85 # Flexibility offer
86 agentlib.AgentVariable(name=glbs.FlexibilityOffer, type="FlexOffer"),
88 # Power KPIs
89 agentlib.AgentVariable(
90 name=kpis_neg.power_flex_full.get_kpi_identifier(), unit='W', type="pd.Series",
91 description="Negative power flexibility"
92 ),
93 agentlib.AgentVariable(
94 name=kpis_pos.power_flex_full.get_kpi_identifier(), unit='W', type="pd.Series",
95 description="Positive power flexibility"
96 ),
97 agentlib.AgentVariable(
98 name=kpis_neg.power_flex_offer.get_kpi_identifier(), unit='W', type="pd.Series",
99 description="Negative power flexibility"
100 ),
101 agentlib.AgentVariable(
102 name=kpis_pos.power_flex_offer.get_kpi_identifier(), unit='W', type="pd.Series",
103 description="Positive power flexibility"
104 ),
105 agentlib.AgentVariable(
106 name=kpis_neg.power_flex_offer_min.get_kpi_identifier(), unit='W', type="float",
107 description="Minimum of negative power flexibility"
108 ),
109 agentlib.AgentVariable(
110 name=kpis_pos.power_flex_offer_min.get_kpi_identifier(), unit='W', type="float",
111 description="Minimum of positive power flexibility"
112 ),
113 agentlib.AgentVariable(
114 name=kpis_neg.power_flex_offer_max.get_kpi_identifier(), unit='W', type="float",
115 description="Maximum of negative power flexibility"
116 ),
117 agentlib.AgentVariable(
118 name=kpis_pos.power_flex_offer_max.get_kpi_identifier(), unit='W', type="float",
119 description="Maximum of positive power flexibility"
120 ),
121 agentlib.AgentVariable(
122 name=kpis_neg.power_flex_offer_avg.get_kpi_identifier(), unit='W', type="float",
123 description="Average of negative power flexibility"
124 ),
125 agentlib.AgentVariable(
126 name=kpis_pos.power_flex_offer_avg.get_kpi_identifier(), unit='W', type="float",
127 description="Average of positive power flexibility"
128 ),
129 agentlib.AgentVariable(
130 name=kpis_neg.power_flex_within_boundary.get_kpi_identifier(), unit='-', type="bool",
131 description="Variable indicating whether the baseline power and flex power align at the horizon end"
132 ),
133 agentlib.AgentVariable(
134 name=kpis_pos.power_flex_within_boundary.get_kpi_identifier(), unit='-', type="bool",
135 description="Variable indicating whether the baseline power and flex power align at the horizon end"
136 ),
138 # Energy KPIs
139 agentlib.AgentVariable(
140 name=kpis_neg.energy_flex.get_kpi_identifier(), unit='kWh', type="float",
141 description="Negative energy flexibility"
142 ),
143 agentlib.AgentVariable(
144 name=kpis_pos.energy_flex.get_kpi_identifier(), unit='kWh', type="float",
145 description="Positive energy flexibility"
146 ),
148 # Costs KPIs
149 agentlib.AgentVariable(
150 name=kpis_neg.costs.get_kpi_identifier(), unit="ct", type="float",
151 description="Saved costs due to baseline"
152 ),
153 agentlib.AgentVariable(
154 name=kpis_pos.costs.get_kpi_identifier(), unit="ct", type="float",
155 description="Saved costs due to baseline"
156 ),
157 agentlib.AgentVariable(
158 name=kpis_neg.corrected_costs.get_kpi_identifier(), unit="ct", type="float",
159 description="Corrected saved costs due to baseline"
160 ),
161 agentlib.AgentVariable(
162 name=kpis_pos.corrected_costs.get_kpi_identifier(), unit="ct", type="float",
163 description="Corrected saved costs due to baseline"
164 ),
165 agentlib.AgentVariable(
166 name=kpis_neg.costs_rel.get_kpi_identifier(), unit='ct/kWh', type="float",
167 description="Saved costs due to baseline"
168 ),
169 agentlib.AgentVariable(
170 name=kpis_pos.costs_rel.get_kpi_identifier(), unit='ct/kWh', type="float",
171 description="Saved costs due to baseline"
172 ),
173 agentlib.AgentVariable(
174 name=kpis_neg.corrected_costs_rel.get_kpi_identifier(), unit='ct/kWh', type="float",
175 description="Corrected saved costs per energy due to baseline"
176 ),
177 agentlib.AgentVariable(
178 name=kpis_pos.corrected_costs_rel.get_kpi_identifier(), unit='ct/kWh', type="float",
179 description="Corrected saved costs per energy due to baseline"
180 )
181 ]
183 parameters: List[agentlib.AgentVariable] = [
184 agentlib.AgentVariable(name=glbs.PREP_TIME, unit="s",
185 description="Preparation time"),
186 agentlib.AgentVariable(name=glbs.MARKET_TIME, unit="s",
187 description="Market time"),
188 agentlib.AgentVariable(name=glbs.FLEX_EVENT_DURATION, unit="s",
189 description="time to switch objective"),
190 agentlib.AgentVariable(name=glbs.TIME_STEP, unit="s",
191 description="timestep of the mpc solution"),
192 agentlib.AgentVariable(name=glbs.PREDICTION_HORIZON, unit="-",
193 description="prediction horizon of the mpc solution")
194 ]
196 results_file: Optional[Path] = Field(
197 default=Path("flexibility_indicator.csv"),
198 description="User specified results file name"
199 )
200 save_results: Optional[bool] = Field(
201 validate_default=True,
202 default=True
203 )
204 price_variable: str = Field(
205 default="c_pel",
206 description="Name of the price variable sent by a predictor",
207 )
208 power_unit: str = Field(
209 default="kW",
210 description="Unit of the power variable"
211 )
212 shared_variable_fields: List[str] = ["outputs"]
214 correct_costs: InputsForCorrectFlexCosts = InputsForCorrectFlexCosts()
215 calculate_costs: InputsForCalculateFlexCosts = InputsForCalculateFlexCosts()
217 @model_validator(mode="after")
218 def check_results_file_extension(self):
219 if self.results_file and self.results_file.suffix != ".csv":
220 raise ValueError(
221 f"Invalid file extension for 'results_file': '{self.results_file}'. "
222 f"Expected a '.csv' file."
223 )
224 return self
227class FlexibilityIndicatorModule(agentlib.BaseModule):
228 config: FlexibilityIndicatorModuleConfig
229 data: FlexibilityData
231 def __init__(self, *args, **kwargs):
232 super().__init__(*args, **kwargs)
233 self.var_list = []
234 for variable in self.variables:
235 if variable.name in [glbs.FlexibilityOffer]:
236 continue
237 self.var_list.append(variable.name)
238 self.time = []
239 self.in_provision = False
240 self.offer_count = 0
241 self.data = FlexibilityData(
242 prep_time=self.get(glbs.PREP_TIME).value,
243 market_time=self.get(glbs.MARKET_TIME).value,
244 flex_event_duration=self.get(glbs.FLEX_EVENT_DURATION).value,
245 time_step=self.get(glbs.TIME_STEP).value,
246 prediction_horizon=self.get(glbs.PREDICTION_HORIZON).value
247 )
248 self.df = pd.DataFrame(columns=pd.Series(self.var_list))
250 def register_callbacks(self):
251 inputs = self.config.inputs
252 for var in inputs:
253 self.agent.data_broker.register_callback(
254 name=var.name, alias=var.name, callback=self.callback
255 )
256 self.agent.data_broker.register_callback(
257 name="in_provision", alias="in_provision", callback=self.callback
258 )
260 def process(self):
261 yield self.env.event()
263 def callback(self, inp, name):
264 if name == "in_provision":
265 self.in_provision = inp.value
266 if self.in_provision:
267 self._set_inputs_to_none()
269 if not self.in_provision:
270 if name == glbs.POWER_ALIAS_BASE:
271 self.data.power_profile_base = self.data.format_mpc_inputs(inp.value)
272 elif name == glbs.POWER_ALIAS_NEG:
273 self.data.power_profile_flex_neg = self.data.format_mpc_inputs(inp.value)
274 elif name == glbs.POWER_ALIAS_POS:
275 self.data.power_profile_flex_pos = self.data.format_mpc_inputs(inp.value)
276 elif name == glbs.STORED_ENERGY_ALIAS_BASE:
277 self.data.stored_energy_profile_base = self.data.format_mpc_inputs(inp.value)
278 elif name == glbs.STORED_ENERGY_ALIAS_NEG:
279 self.data.stored_energy_profile_flex_neg = self.data.format_mpc_inputs(inp.value)
280 elif name == glbs.STORED_ENERGY_ALIAS_POS:
281 self.data.stored_energy_profile_flex_pos = self.data.format_mpc_inputs(inp.value)
282 elif name == self.config.price_variable:
283 if not self.config.calculate_costs.use_constant_electricity_price:
284 # price comes from predictor, so no stripping needed
285 self.data.electricity_price_series = self.data.format_predictor_inputs(inp.value)
287 # set the constant electricity price series if given
288 if self.config.calculate_costs.use_constant_electricity_price and self.data.electricity_price_series is None:
289 # get the index for the electricity price series
290 n = self.get(glbs.PREDICTION_HORIZON).value
291 ts = self.get(glbs.TIME_STEP).value
292 grid = np.arange(0, n * ts, ts)
293 # fill the electricity_price_series with values
294 electricity_price_series = pd.Series([self.config.calculate_costs.const_electricity_price for i in grid], index=grid)
295 self.data.electricity_price_series = self.data.format_predictor_inputs(electricity_price_series)
297 necessary_input_for_calc_flex = [self.data.power_profile_base,
298 self.data.power_profile_flex_neg,
299 self.data.power_profile_flex_pos]
301 if self.config.calculate_costs.calculate_flex_costs:
302 necessary_input_for_calc_flex.append(self.data.electricity_price_series)
304 if self.config.correct_costs.enable_energy_costs_correction:
305 necessary_input_for_calc_flex.extend(
306 [self.data.stored_energy_profile_base,
307 self.data.stored_energy_profile_flex_neg,
308 self.data.stored_energy_profile_flex_pos])
310 if all(var is not None for var in necessary_input_for_calc_flex):
312 # check the power profile end deviation
313 if not self.config.correct_costs.enable_energy_costs_correction:
314 self.check_power_end_deviation(tol=self.config.correct_costs.absolute_power_deviation_tolerance)
316 # Calculate the flexibility, send the offer, write and save the results
317 self.calc_and_send_offer()
319 # set the values to None to reset the callback
320 self._set_inputs_to_none()
322 def get_results(self) -> Optional[pd.DataFrame]:
323 """Open results file of flexibility_indicator.py."""
324 results_file = self.config.results_file
325 try:
326 results = pd.read_csv(results_file, header=[0], index_col=[0, 1])
327 return results
328 except FileNotFoundError:
329 self.logger.error("Results file %s was not found.", results_file)
330 return None
332 def write_results(self, df: pd.DataFrame, ts: float, n: int) -> pd.DataFrame:
333 """Write every data of variables in self.var_list in an DataFrame.
335 DataFrame will be updated every time step
337 Args:
338 df: DataFrame which is initialised as an empty DataFrame with columns according to self.var_list
339 ts: time step
340 n: number of time steps during prediction horizon
342 Returns:
343 DataFrame with results of every variable in self.var_list
345 """
346 results = []
347 now = self.env.now
348 for name in self.var_list:
349 # Use the power variables averaged for each timestep, not the collocation values
350 if name == glbs.POWER_ALIAS_BASE:
351 values = self.data.power_profile_base
352 elif name == glbs.POWER_ALIAS_NEG:
353 values = self.data.power_profile_flex_neg
354 elif name == glbs.POWER_ALIAS_POS:
355 values = self.data.power_profile_flex_pos
356 elif name == glbs.STORED_ENERGY_ALIAS_BASE:
357 values = self.data.stored_energy_profile_base
358 elif name == glbs.STORED_ENERGY_ALIAS_NEG:
359 values = self.data.stored_energy_profile_flex_neg
360 elif name == glbs.STORED_ENERGY_ALIAS_POS:
361 values = self.data.stored_energy_profile_flex_pos
362 elif name == self.config.price_variable:
363 values = self.data.electricity_price_series
364 else:
365 values = self.get(name).value
367 if isinstance(values, pd.Series):
368 traj = values.reindex(np.arange(0, n * ts, ts))
369 else:
370 traj = pd.Series(values).reindex(np.arange(0, n * ts, ts))
371 results.append(traj)
373 if not now % ts:
374 self.time.append(now)
375 new_df = pd.DataFrame(results).T
376 new_df.columns = self.var_list
377 # Rename time_step variable column
378 new_df.rename(columns={glbs.TIME_STEP: f"{glbs.TIME_STEP}_mpc"}, inplace=True)
379 new_df.index.direction = "time"
380 new_df[glbs.TIME_STEP] = now
381 new_df.set_index([glbs.TIME_STEP, new_df.index], inplace=True)
382 df = pd.concat([df, new_df])
383 # set the indices once again as concat cant handle indices properly
384 indices = pd.MultiIndex.from_tuples(df.index, names=[glbs.TIME_STEP, "time"])
385 df.set_index(indices, inplace=True)
386 # Drop column time_step and keep it as an index only
387 if glbs.TIME_STEP in df.columns:
388 df.drop(columns=[glbs.TIME_STEP], inplace=True)
390 return df
392 def cleanup_results(self):
393 """Remove the existing result files."""
394 results_file = self.config.results_file
395 if not results_file:
396 return
397 os.remove(results_file)
399 def calc_and_send_offer(self):
400 """Calculate the flexibility KPIs for current predictions, send the flex offer and set the outputs, write and save the results."""
401 # Calculate the flexibility KPIs for current predictions
402 self.data.calculate(enable_energy_costs_correction=self.config.correct_costs.enable_energy_costs_correction, calculate_flex_cost=self.config.calculate_costs.calculate_flex_costs)
404 # Send flex offer
405 self.send_flex_offer(
406 name=glbs.FlexibilityOffer,
407 base_power_profile=self.data.power_profile_base,
408 pos_diff_profile=self.data.kpis_pos.power_flex_offer.value,
409 pos_price=self.data.kpis_pos.costs.value,
410 neg_diff_profile=self.data.kpis_neg.power_flex_offer.value,
411 neg_price=self.data.kpis_neg.costs.value,
412 )
414 # set outputs
415 for kpi in self.data.get_kpis().values():
416 if kpi.get_kpi_identifier() not in [kpis_pos.power_flex_within_boundary.get_kpi_identifier(), kpis_neg.power_flex_within_boundary.get_kpi_identifier()]:
417 for output in self.config.outputs:
418 if output.name == kpi.get_kpi_identifier():
419 self.set(output.name, kpi.value)
421 # write results
422 self.df = self.write_results(
423 df=self.df,
424 ts=self.get(glbs.TIME_STEP).value,
425 n=self.get(glbs.PREDICTION_HORIZON).value
426 )
428 # save results
429 if self.config.save_results:
430 self.df.to_csv(self.config.results_file)
432 def send_flex_offer(
433 self, name: str,
434 base_power_profile: pd.Series,
435 pos_diff_profile: pd.Series, pos_price: float,
436 neg_diff_profile: pd.Series, neg_price: float,
437 timestamp: float = None
438 ):
439 """Send a flex offer as an agent Variable.
441 The first offer is dismissed, since the different MPCs need one time step to fully initialize.
443 Args:
444 name: name of the agent variable
445 base_power_profile: time series of power from baseline mpc
446 pos_diff_profile: power profile for the positive difference (base-pos) in flexibility event time grid
447 pos_price: price for positive flexibility
448 neg_diff_profile: power profile for the negative difference (neg-base) in flexibility event time grid
449 neg_price: price for negative flexibility
450 timestamp: the time offer was generated
452 """
453 if self.offer_count > 0:
454 var = self._variables_dict[name]
455 var.value = FlexOffer(
456 base_power_profile=base_power_profile,
457 pos_diff_profile=pos_diff_profile, pos_price=pos_price,
458 neg_diff_profile=neg_diff_profile, neg_price=neg_price,
459 )
460 if timestamp is None:
461 timestamp = self.env.time
462 var.timestamp = timestamp
463 self.agent.data_broker.send_variable(
464 variable=var.copy(update={"source": self.source}),
465 copy=False,
466 )
467 self.offer_count += 1
469 def _set_inputs_to_none(self):
470 self.data.power_profile_base = None
471 self.data.power_profile_flex_neg = None
472 self.data.power_profile_flex_pos = None
473 self.data.electricity_price_series = None
474 self.data.stored_energy_profile_base = None
475 self.data.stored_energy_profile_flex_neg = None
476 self.data.stored_energy_profile_flex_pos = None
478 def check_power_end_deviation(self, tol: float):
479 """Calculate the deviation of the final value of the power profiles and warn the user if it exceeds the tolerance."""
480 logger = logging.getLogger(__name__)
481 dev_pos = np.mean(self.data.power_profile_flex_pos.values[-4:] - self.data.power_profile_base.values[-4:])
482 dev_neg = np.mean(self.data.power_profile_flex_neg.values[-4:] - self.data.power_profile_base.values[-4:])
483 if abs(dev_pos) > tol:
484 logger.warning(f"There is an average deviation of {dev_pos:.6f} kW between the final values of "
485 f"power profiles of positive shadow MPC and the baseline. "
486 f"Correction of energy costs might be necessary.")
487 self.set(kpis_pos.power_flex_within_boundary.get_kpi_identifier(), False)
488 else:
489 self.set(kpis_pos.power_flex_within_boundary.get_kpi_identifier(), True)
490 if abs(dev_neg) > tol:
491 logger.warning(f"There is an average deviation of {dev_pos:.6f} kW between the final values of "
492 f"power profiles of negative shadow MPC and the baseline. "
493 f"Correction of energy costs might be necessary.")
494 self.set(kpis_neg.power_flex_within_boundary.get_kpi_identifier(), False)
495 else:
496 self.set(kpis_neg.power_flex_within_boundary.get_kpi_identifier(), True)