Coverage for agentlib_flexquant/modules/flexibility_indicator.py: 88%
193 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-01 15:10 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-08-01 15:10 +0000
1import logging
2import os
3from pathlib import Path
4from typing import List, Optional
6import agentlib
7import numpy as np
8import pandas as pd
9from agentlib.core.errors import ConfigurationError
10from pydantic import BaseModel, ConfigDict, Field, model_validator
12import agentlib_flexquant.data_structures.globals as glbs
13from agentlib_flexquant.data_structures.flex_kpis import (
14 FlexibilityData,
15 FlexibilityKPIs
16)
17from agentlib_flexquant.data_structures.flex_offer import FlexOffer
class InputsForCorrectFlexCosts(BaseModel):
    """Options controlling the correction of flexibility costs.

    The flexibility indicator module reads ``enable_energy_costs_correction``
    to decide whether the stored energy profiles are required inputs, and
    passes ``absolute_power_deviation_tolerance`` to its end-of-horizon power
    deviation check when the correction is disabled.
    """

    # NOTE(review): ``name=`` is not a documented pydantic ``Field`` argument;
    # it is kept unchanged here so the generated schema stays the same.
    enable_energy_costs_correction: bool = Field(
        name="enable_energy_costs_correction",
        # The two sentences used to be concatenated without a separator
        # ("...flexible energyDefine the variable...").
        description=(
            "Variable determining whether to correct the costs of the flexible energy. "
            "Define the variable for stored electrical energy in the base MPC model "
            "and config as output if the correction of costs is enabled"
        ),
        default=False
    )

    absolute_power_deviation_tolerance: float = Field(
        name="absolute_power_deviation_tolerance",
        default=0.1,
        description="Absolute tolerance in kW within which no warning is thrown"
    )

    # NOTE(review): not read anywhere in this module; presumably consumed by
    # the code that generates the MPC configs - confirm.
    stored_energy_variable: str = Field(
        name="stored_energy_variable",
        default="E_stored",
        description="Name of the variable representing the stored electrical energy in the baseline config"
    )
class InputsForCalculateFlexCosts(BaseModel):
    """Options controlling the calculation of flexibility costs.

    ``calculate_flex_costs`` switches the cost calculation on or off.
    ``use_constant_electricity_price`` selects a constant price
    (``const_electricity_price``) instead of a price series received from a
    predictor.
    """

    use_constant_electricity_price: bool = Field(
        default=False,
        description="Use constant electricity price"
    )
    calculate_flex_costs: bool = Field(
        default=True,
        description="Calculate the flexibility cost"
    )
    # NaN marks "no constant price configured"; see the validator below.
    const_electricity_price: float = Field(
        default=np.nan,
        description="constant electricity price in ct/kWh"
    )

    @model_validator(mode="after")
    def validate_constant_price(self):
        """Ensure a usable constant price is given when it is requested.

        Returns:
            The validated model instance.

        Raises:
            ValueError: If ``use_constant_electricity_price`` is true while
                ``const_electricity_price`` is still NaN. Pydantic reports
                this as a validation error of the model.
        """
        if self.use_constant_electricity_price and np.isnan(self.const_electricity_price):
            raise ValueError(
                'Constant electricity price must have a valid value in float if it is to be used for calculation. '
                f'Received "use_constant_electricity_price": true, "const_electricity_price": {self.const_electricity_price}. '
                'Please specify them correctly in "calculate_costs" field in flex config.'
            )
        return self
# Module-level KPI containers for the positive and negative direction.
# They are only used to look up the KPI identifiers (via get_kpi_identifier())
# that name the output variables of the module config below and the
# within-boundary KPIs set by the module; the original comment states these
# names are also the ones used for plotting.
kpis_pos = FlexibilityKPIs(direction="positive")
kpis_neg = FlexibilityKPIs(direction="negative")
class FlexibilityIndicatorModuleConfig(agentlib.BaseModuleConfig):
    """Configuration of the flexibility indicator module.

    Declares the input profiles received from the MPCs, the KPI outputs and
    the flexibility offer, the timing parameters, and the options for writing
    results and calculating/correcting costs.
    """

    # Reject unknown keys in the module config.
    model_config = ConfigDict(
        extra='forbid'
    )

    # Power and stored-energy profiles of the baseline, negative and positive
    # MPC. The module registers one data broker callback per entry.
    # NOTE(review): the three power inputs (and the three energy inputs) share
    # the same description text, and the declared power unit is "W" while the
    # deviation warning of the module reports kW - confirm the intended unit.
    inputs: List[agentlib.AgentVariable] = [
        agentlib.AgentVariable(name=glbs.POWER_ALIAS_BASE, unit="W", type="pd.Series",
                               description="The power input to the system"),
        agentlib.AgentVariable(name=glbs.POWER_ALIAS_NEG, unit="W", type="pd.Series",
                               description="The power input to the system"),
        agentlib.AgentVariable(name=glbs.POWER_ALIAS_POS, unit="W", type="pd.Series",
                               description="The power input to the system"),
        agentlib.AgentVariable(name=glbs.STORED_ENERGY_ALIAS_BASE, unit="kWh", type="pd.Series",
                               description="Energy stored in the system w.r.t. 0K"),
        agentlib.AgentVariable(name=glbs.STORED_ENERGY_ALIAS_NEG, unit="kWh", type="pd.Series",
                               description="Energy stored in the system w.r.t. 0K"),
        agentlib.AgentVariable(name=glbs.STORED_ENERGY_ALIAS_POS, unit="kWh", type="pd.Series",
                               description="Energy stored in the system w.r.t. 0K")
    ]

    # One output per KPI, named by the KPI identifier so the module can match
    # calculated KPIs to outputs by name.
    outputs: List[agentlib.AgentVariable] = [
        # Flexibility offer
        agentlib.AgentVariable(name=glbs.FlexibilityOffer, type="FlexOffer"),

        # Power KPIs
        agentlib.AgentVariable(
            name=kpis_neg.power_flex_full.get_kpi_identifier(), unit='W', type="pd.Series",
            description="Negative power flexibility"
        ),
        agentlib.AgentVariable(
            name=kpis_pos.power_flex_full.get_kpi_identifier(), unit='W', type="pd.Series",
            description="Positive power flexibility"
        ),
        agentlib.AgentVariable(
            name=kpis_neg.power_flex_offer.get_kpi_identifier(), unit='W', type="pd.Series",
            description="Negative power flexibility"
        ),
        agentlib.AgentVariable(
            name=kpis_pos.power_flex_offer.get_kpi_identifier(), unit='W', type="pd.Series",
            description="Positive power flexibility"
        ),
        agentlib.AgentVariable(
            name=kpis_neg.power_flex_offer_min.get_kpi_identifier(), unit='W', type="float",
            description="Minimum of negative power flexibility"
        ),
        agentlib.AgentVariable(
            name=kpis_pos.power_flex_offer_min.get_kpi_identifier(), unit='W', type="float",
            description="Minimum of positive power flexibility"
        ),
        agentlib.AgentVariable(
            name=kpis_neg.power_flex_offer_max.get_kpi_identifier(), unit='W', type="float",
            description="Maximum of negative power flexibility"
        ),
        agentlib.AgentVariable(
            name=kpis_pos.power_flex_offer_max.get_kpi_identifier(), unit='W', type="float",
            description="Maximum of positive power flexibility"
        ),
        agentlib.AgentVariable(
            name=kpis_neg.power_flex_offer_avg.get_kpi_identifier(), unit='W', type="float",
            description="Average of negative power flexibility"
        ),
        agentlib.AgentVariable(
            name=kpis_pos.power_flex_offer_avg.get_kpi_identifier(), unit='W', type="float",
            description="Average of positive power flexibility"
        ),
        # The two within-boundary flags are set by the module's
        # end-of-horizon deviation check, not by the generic KPI loop.
        agentlib.AgentVariable(
            name=kpis_neg.power_flex_within_boundary.get_kpi_identifier(), unit='-', type="bool",
            description="Variable indicating whether the baseline power and flex power align at the horizon end"
        ),
        agentlib.AgentVariable(
            name=kpis_pos.power_flex_within_boundary.get_kpi_identifier(), unit='-', type="bool",
            description="Variable indicating whether the baseline power and flex power align at the horizon end"
        ),

        # Energy KPIs
        agentlib.AgentVariable(
            name=kpis_neg.energy_flex.get_kpi_identifier(), unit='kWh', type="float",
            description="Negative energy flexibility"
        ),
        agentlib.AgentVariable(
            name=kpis_pos.energy_flex.get_kpi_identifier(), unit='kWh', type="float",
            description="Positive energy flexibility"
        ),

        # Costs KPIs
        # NOTE(review): absolute and relative cost KPIs share the description
        # "Saved costs due to baseline" - confirm whether that is intended.
        agentlib.AgentVariable(
            name=kpis_neg.costs.get_kpi_identifier(), unit="ct", type="float",
            description="Saved costs due to baseline"
        ),
        agentlib.AgentVariable(
            name=kpis_pos.costs.get_kpi_identifier(), unit="ct", type="float",
            description="Saved costs due to baseline"
        ),
        agentlib.AgentVariable(
            name=kpis_neg.corrected_costs.get_kpi_identifier(), unit="ct", type="float",
            description="Corrected saved costs due to baseline"
        ),
        agentlib.AgentVariable(
            name=kpis_pos.corrected_costs.get_kpi_identifier(), unit="ct", type="float",
            description="Corrected saved costs due to baseline"
        ),
        agentlib.AgentVariable(
            name=kpis_neg.costs_rel.get_kpi_identifier(), unit='ct/kWh', type="float",
            description="Saved costs due to baseline"
        ),
        agentlib.AgentVariable(
            name=kpis_pos.costs_rel.get_kpi_identifier(), unit='ct/kWh', type="float",
            description="Saved costs due to baseline"
        ),
        agentlib.AgentVariable(
            name=kpis_neg.corrected_costs_rel.get_kpi_identifier(), unit='ct/kWh', type="float",
            description="Corrected saved costs per energy due to baseline"
        ),
        agentlib.AgentVariable(
            name=kpis_pos.corrected_costs_rel.get_kpi_identifier(), unit='ct/kWh', type="float",
            description="Corrected saved costs per energy due to baseline"
        )
    ]

    # Timing parameters; the module reads their values when it is
    # initialised and when it builds the time grid of the results.
    parameters: List[agentlib.AgentVariable] = [
        agentlib.AgentVariable(name=glbs.PREP_TIME, unit="s",
                               description="Preparation time"),
        agentlib.AgentVariable(name=glbs.MARKET_TIME, unit="s",
                               description="Market time"),
        agentlib.AgentVariable(name=glbs.FLEX_EVENT_DURATION, unit="s",
                               description="time to switch objective"),
        agentlib.AgentVariable(name=glbs.TIME_STEP, unit="s",
                               description="timestep of the mpc solution"),
        agentlib.AgentVariable(name=glbs.PREDICTION_HORIZON, unit="-",
                               description="prediction horizon of the mpc solution")
    ]

    # Target CSV file of the results; must end in ".csv" (validated below).
    results_file: Optional[Path] = Field(
        default=Path("flexibility_indicator.csv"),
        description="User specified results file name"
    )
    # When true, the module writes its results to results_file after every
    # calculated offer.
    save_results: Optional[bool] = Field(
        validate_default=True,
        default=True
    )
    # Callback name under which the module expects the price series.
    price_variable: str = Field(
        default="c_pel",
        description="Name of the price variable sent by a predictor",
    )
    # NOTE(review): not read in this module; presumably used by FlexibilityData
    # or the plotting code - confirm.
    power_unit: str = Field(
        default="kW",
        description="Unit of the power variable"
    )
    shared_variable_fields: List[str] = ["outputs"]

    # Nested option groups for cost correction and cost calculation.
    correct_costs: InputsForCorrectFlexCosts = InputsForCorrectFlexCosts()
    calculate_costs: InputsForCalculateFlexCosts = InputsForCalculateFlexCosts()

    @model_validator(mode="after")
    def check_results_file_extension(self):
        """Reject a configured results file that does not end in ``.csv``.

        Raises:
            ValueError: If ``results_file`` is set and its suffix is not
                ``.csv``.
        """
        if self.results_file and self.results_file.suffix != ".csv":
            raise ValueError(
                f"Invalid file extension for 'results_file': '{self.results_file}'. "
                f"Expected a '.csv' file."
            )
        return self
class FlexibilityIndicatorModule(agentlib.BaseModule):
    """Calculate flexibility KPIs from MPC predictions and publish flex offers.

    The module collects the power profiles of the baseline, negative and
    positive MPC (plus, depending on the config, the electricity price and the
    stored energy profiles) through data broker callbacks. As soon as every
    required profile is available it calculates the KPIs, sends a
    ``FlexOffer``, sets the KPI outputs, appends the results to ``self.df``
    and optionally saves them to the configured CSV file.
    """

    config: FlexibilityIndicatorModuleConfig

    data: FlexibilityData

    def __init__(self, *args, **kwargs):
        """Set up the result bookkeeping and the ``FlexibilityData`` container."""
        super().__init__(*args, **kwargs)
        # Every variable except the offer itself becomes a results column.
        self.var_list = [
            variable.name
            for variable in self.variables
            if variable.name != glbs.FlexibilityOffer
        ]
        self.time = []
        self.in_provision = False
        self.offer_count = 0
        self.data = FlexibilityData(
            prep_time=self.get(glbs.PREP_TIME).value,
            market_time=self.get(glbs.MARKET_TIME).value,
            flex_event_duration=self.get(glbs.FLEX_EVENT_DURATION).value,
            time_step=self.get(glbs.TIME_STEP).value,
            prediction_horizon=self.get(glbs.PREDICTION_HORIZON).value
        )
        self.df = pd.DataFrame(columns=pd.Series(self.var_list))

    def register_callbacks(self):
        """Register ``callback`` for every config input and for ``in_provision``."""
        for var in self.config.inputs:
            self.agent.data_broker.register_callback(
                name=var.name, alias=var.name, callback=self.callback
            )
        self.agent.data_broker.register_callback(
            name="in_provision", alias="in_provision", callback=self.callback
        )

    def process(self):
        """Yield a single event; all work of this module happens in callbacks."""
        yield self.env.event()

    def callback(self, inp, name):
        """Store an incoming profile and trigger the calculation when complete.

        Args:
            inp: The received agent variable; only ``inp.value`` is used.
            name: Name under which the callback was registered. It selects
                the profile of ``self.data`` that is updated.
        """
        if name == "in_provision":
            self.in_provision = inp.value
            if self.in_provision:
                # Discard the collected profiles while in provision.
                self._set_inputs_to_none()

        if self.in_provision:
            return

        if name == glbs.POWER_ALIAS_BASE:
            self.data.power_profile_base = self.data.format_mpc_inputs(inp.value)
        elif name == glbs.POWER_ALIAS_NEG:
            self.data.power_profile_flex_neg = self.data.format_mpc_inputs(inp.value)
        elif name == glbs.POWER_ALIAS_POS:
            self.data.power_profile_flex_pos = self.data.format_mpc_inputs(inp.value)
        elif name == glbs.STORED_ENERGY_ALIAS_BASE:
            self.data.stored_energy_profile_base = self.data.format_mpc_inputs(inp.value)
        elif name == glbs.STORED_ENERGY_ALIAS_NEG:
            self.data.stored_energy_profile_flex_neg = self.data.format_mpc_inputs(inp.value)
        elif name == glbs.STORED_ENERGY_ALIAS_POS:
            self.data.stored_energy_profile_flex_pos = self.data.format_mpc_inputs(inp.value)
        elif name == self.config.price_variable:
            if not self.config.calculate_costs.use_constant_electricity_price:
                # price comes from predictor, so no stripping needed
                self.data.electricity_price_series = self.data.format_predictor_inputs(inp.value)

        # set the constant electricity price series if given
        calc_cfg = self.config.calculate_costs
        if calc_cfg.use_constant_electricity_price and self.data.electricity_price_series is None:
            # get the index for the electricity price series
            n = self.get(glbs.PREDICTION_HORIZON).value
            ts = self.get(glbs.TIME_STEP).value
            grid = np.arange(0, n * ts, ts)
            # fill the electricity_price_series with values
            electricity_price_series = pd.Series(
                [calc_cfg.const_electricity_price] * len(grid), index=grid
            )
            self.data.electricity_price_series = self.data.format_predictor_inputs(electricity_price_series)

        # Profiles that must be present before the KPIs can be calculated.
        necessary_input_for_calc_flex = [
            self.data.power_profile_base,
            self.data.power_profile_flex_neg,
            self.data.power_profile_flex_pos,
        ]
        if calc_cfg.calculate_flex_costs:
            necessary_input_for_calc_flex.append(self.data.electricity_price_series)
        if self.config.correct_costs.enable_energy_costs_correction:
            necessary_input_for_calc_flex.extend([
                self.data.stored_energy_profile_base,
                self.data.stored_energy_profile_flex_neg,
                self.data.stored_energy_profile_flex_pos,
            ])

        if any(var is None for var in necessary_input_for_calc_flex):
            return

        # check the power profile end deviation
        if not self.config.correct_costs.enable_energy_costs_correction:
            self.check_power_end_deviation(tol=self.config.correct_costs.absolute_power_deviation_tolerance)

        # Calculate the flexibility, send the offer, write and save the results
        self.calc_and_send_offer()

        # set the values to None to reset the callback
        self._set_inputs_to_none()

    def get_results(self) -> Optional[pd.DataFrame]:
        """Read the results CSV configured in ``config.results_file``.

        Returns:
            The results with the first two columns as index, or ``None`` if
            the file does not exist (an error is logged in that case).
        """
        results_file = self.config.results_file
        try:
            results = pd.read_csv(results_file, header=[0], index_col=[0, 1])
            return results
        except FileNotFoundError:
            self.logger.error("Results file %s was not found.", results_file)
            return None

    def write_results(self, df, ts, n):
        """Append the current values of all variables in ``self.var_list`` to ``df``.

        Nothing is appended unless the current environment time is a multiple
        of ``ts``.

        Args:
            df: DataFrame collecting the results; initialised as an empty
                DataFrame with columns according to ``self.var_list``.
            ts: time step
            n: number of time steps during prediction horizon

        Returns:
            DataFrame with the results of every variable in ``self.var_list``,
            indexed by (``glbs.TIME_STEP``, "time").
        """
        now = self.env.now
        if now % ts:
            # Only write results on the time grid of the MPC.
            return df

        grid = np.arange(0, n * ts, ts)
        results = []
        for name in self.var_list:
            # Use the power variables averaged for each timestep, not the collocation values
            if name == glbs.POWER_ALIAS_BASE:
                values = self.data.power_profile_base
            elif name == glbs.POWER_ALIAS_NEG:
                values = self.data.power_profile_flex_neg
            elif name == glbs.POWER_ALIAS_POS:
                values = self.data.power_profile_flex_pos
            elif name == glbs.STORED_ENERGY_ALIAS_BASE:
                values = self.data.stored_energy_profile_base
            elif name == glbs.STORED_ENERGY_ALIAS_NEG:
                values = self.data.stored_energy_profile_flex_neg
            elif name == glbs.STORED_ENERGY_ALIAS_POS:
                values = self.data.stored_energy_profile_flex_pos
            elif name == self.config.price_variable:
                values = self.data.electricity_price_series
            else:
                values = self.get(name).value

            series = values if isinstance(values, pd.Series) else pd.Series(values)
            results.append(series.reindex(grid))

        self.time.append(now)
        new_df = pd.DataFrame(results).T
        new_df.columns = self.var_list
        # Rename time_step variable column
        new_df.rename(columns={glbs.TIME_STEP: f"{glbs.TIME_STEP}_mpc"}, inplace=True)
        # Name the index level; the original assigned a meaningless
        # ``direction`` attribute here.
        new_df.index.name = "time"
        new_df[glbs.TIME_STEP] = now
        new_df.set_index([glbs.TIME_STEP, new_df.index], inplace=True)
        df = pd.concat([df, new_df])
        # set the indices once again as concat cant handle indices properly
        indices = pd.MultiIndex.from_tuples(df.index, names=[glbs.TIME_STEP, "time"])
        df.set_index(indices, inplace=True)
        # Drop column time_step and keep it as an index only
        if glbs.TIME_STEP in df.columns:
            df.drop(columns=[glbs.TIME_STEP], inplace=True)

        return df

    def cleanup_results(self):
        """Delete the configured results file, if one is configured and exists."""
        results_file = self.config.results_file
        if not results_file:
            return
        try:
            os.remove(results_file)
        except FileNotFoundError:
            # No results were written (e.g. saving disabled or no offer was
            # calculated yet), so there is nothing to clean up.
            self.logger.debug("Results file %s does not exist, nothing to remove.", results_file)

    def calc_and_send_offer(self):
        """
        Calculate the flexibility KPIs for current predictions, send the flex offer and set the outputs, write and save the results.
        """
        # Calculate the flexibility KPIs for current predictions
        self.data.calculate(
            enable_energy_costs_correction=self.config.correct_costs.enable_energy_costs_correction,
            calculate_flex_cost=self.config.calculate_costs.calculate_flex_costs,
        )

        # Send flex offer
        self.send_flex_offer(
            name=glbs.FlexibilityOffer,
            base_power_profile=self.data.power_profile_base,
            pos_diff_profile=self.data.kpis_pos.power_flex_offer.value,
            pos_price=self.data.kpis_pos.costs.value,
            neg_diff_profile=self.data.kpis_neg.power_flex_offer.value,
            neg_price=self.data.kpis_neg.costs.value,
        )

        # set outputs; the within-boundary KPIs are skipped here because they
        # are set by check_power_end_deviation
        skipped = (
            kpis_pos.power_flex_within_boundary.get_kpi_identifier(),
            kpis_neg.power_flex_within_boundary.get_kpi_identifier(),
        )
        output_names = {output.name for output in self.config.outputs}
        for kpi in self.data.get_kpis().values():
            identifier = kpi.get_kpi_identifier()
            if identifier in skipped:
                continue
            if identifier in output_names:
                self.set(identifier, kpi.value)

        # write results
        self.df = self.write_results(
            df=self.df,
            ts=self.get(glbs.TIME_STEP).value,
            n=self.get(glbs.PREDICTION_HORIZON).value
        )

        # save results
        if self.config.save_results:
            self.df.to_csv(self.config.results_file)

    def send_flex_offer(
            self, name,
            base_power_profile: pd.Series,
            pos_diff_profile: pd.Series, pos_price: float,
            neg_diff_profile: pd.Series, neg_price: float,
            timestamp: Optional[float] = None
    ):
        """
        Send a flex offer as an agent Variable. The first offer is dismissed,
        since the different MPCs need one time step to fully initialize.

        Args:
            name: name of the agent variable
            base_power_profile: power profile of the baseline
            pos_diff_profile: power flexibility profile of the positive offer
            pos_price: price of the positive offer
            neg_diff_profile: power flexibility profile of the negative offer
            neg_price: price of the negative offer
            timestamp: the time offer was generated; defaults to the current
                environment time
        """
        if self.offer_count > 0:
            var = self._variables_dict[name]
            var.value = FlexOffer(
                base_power_profile=base_power_profile,
                pos_diff_profile=pos_diff_profile, pos_price=pos_price,
                neg_diff_profile=neg_diff_profile, neg_price=neg_price,
            )
            if timestamp is None:
                timestamp = self.env.time
            var.timestamp = timestamp
            self.agent.data_broker.send_variable(
                variable=var.copy(update={"source": self.source}),
                copy=False,
            )
        # Count every call, including the dismissed first one.
        self.offer_count += 1

    def _set_inputs_to_none(self):
        """Reset all collected input profiles so a new set can be gathered."""
        self.data.power_profile_base = None
        self.data.power_profile_flex_neg = None
        self.data.power_profile_flex_pos = None
        self.data.electricity_price_series = None
        self.data.stored_energy_profile_base = None
        self.data.stored_energy_profile_flex_neg = None
        self.data.stored_energy_profile_flex_pos = None

    def check_power_end_deviation(self, tol: float):
        """
        Calculate the deviation of the final values of the power profiles and warn the user if it exceeds the tolerance.

        The mean difference between each shadow MPC profile and the baseline
        over the last four values is compared against ``tol``, and the
        corresponding within-boundary KPI output is set to the result.

        Args:
            tol: absolute tolerance for the mean deviation
        """
        logger = logging.getLogger(__name__)
        base_end = self.data.power_profile_base.values[-4:]
        dev_pos = np.mean(self.data.power_profile_flex_pos.values[-4:] - base_end)
        dev_neg = np.mean(self.data.power_profile_flex_neg.values[-4:] - base_end)
        if abs(dev_pos) > tol:
            logger.warning(f"There is an average deviation of {dev_pos:.6f} kW between the final values of power profiles of positive shadow MPC and the baseline. Correction of energy costs might be necessary.")
            self.set(kpis_pos.power_flex_within_boundary.get_kpi_identifier(), False)
        else:
            self.set(kpis_pos.power_flex_within_boundary.get_kpi_identifier(), True)
        if abs(dev_neg) > tol:
            # Report the negative deviation (the original message printed dev_pos).
            logger.warning(f"There is an average deviation of {dev_neg:.6f} kW between the final values of power profiles of negative shadow MPC and the baseline. Correction of energy costs might be necessary.")
            self.set(kpis_neg.power_flex_within_boundary.get_kpi_identifier(), False)
        else:
            self.set(kpis_neg.power_flex_within_boundary.get_kpi_identifier(), True)