Coverage for agentlib/modules/simulation/simulator.py: 89%
314 statements
« prev ^ index » next coverage.py v7.4.4, created at 2026-01-20 15:11 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2026-01-20 15:11 +0000
1"""
2Module contains the Simulator, used to simulate any model.
3"""
5import os
6import warnings
7from dataclasses import dataclass, field
8from math import inf
9from pathlib import Path
10from typing import Union, Dict, List, Optional
12import numpy as np
13import pandas as pd
14from pydantic import field_validator, Field
15from pydantic_core.core_schema import FieldValidationInfo
17from agentlib.core import (
18 BaseModule,
19 BaseModuleConfig,
20 Agent,
21 Causality,
22 AgentVariable,
23 AgentVariables,
24 ModelVariable,
25 Model,
26)
27from agentlib.core.errors import OptionalDependencyError
28from agentlib.models import get_model_type, UNINSTALLED_MODEL_TYPES
29from agentlib.utils import custom_injection, create_time_samples
@dataclass
class SimulatorResults:
    """
    In-memory buffer for simulator results with optional CSV persistence.

    Every buffered row is laid out as ``[outputs..., inputs...]``, which is
    the column order created in :meth:`setup`. Cells that have not been
    filled yet hold ``None``.
    """

    # Configuration
    filename: Optional[str] = None  # target csv file; None keeps results in memory
    header_written: bool = False  # set once the first chunk was dumped to disk

    # Data buffers (time stamps and the matching rows)
    index: List[float] = field(default_factory=list)
    data: List[List[Optional[float]]] = field(default_factory=list)

    # State tracking
    _current_inputs: List[float] = field(default_factory=list)
    _current_outputs: List[float] = field(default_factory=list)
    _columns: Optional[pd.MultiIndex] = None  # stays None until setup() was called
    _input_count: int = 0
    _output_count: int = 0

    def setup(
        self,
        input_vars: List["ModelVariable"],
        output_vars: List["ModelVariable"],
    ):
        """
        Build the column header and remember the current input values.

        The result table has one row per time stamp and a three-level column
        header (causality, name, type). The variables passed as
        ``output_vars`` come first, followed by ``input_vars``:

        +---+-----------------+-----------------+
        |   |   output_vars   |   input_vars    |
        | t |  y1   |   y2    |  u1   |   u2    |
        +---+-----------------+-----------------+
        | 1 |  ...  |   ...   |  ...  |   ...   |
        | 2 |  ...  |   ...   |  ...  |   ...   |
        |...|  ...  |   ...   |  ...  |   ...   |

        Args:
            input_vars: Variables stored in the trailing columns. Their
                current ``value`` is used as the initial input buffer.
            output_vars: Variables stored in the leading columns.
        """
        variables = output_vars + input_vars
        self._input_count = len(input_vars)
        self._output_count = len(output_vars)

        # Initialize current inputs with current values
        self._current_inputs = [var.value for var in input_vars]

        self._columns = pd.MultiIndex.from_arrays(
            arrays=np.array(
                [
                    [_var.causality.name for _var in variables],
                    [_var.name for _var in variables],
                    [_var.type for _var in variables],
                ]
            ),
            sortorder=0,
            names=["causality", "name", "type"],
        )

    def update_inputs(self, values: List[float], time: float, capture_all_inputs: bool):
        """
        Store new input values and merge them into the buffered rows.

        If the newest buffered row belongs to ``time``, its input part is
        replaced by ``values`` and its output part is kept. Otherwise a new
        row ``[None, ..., In1, In2, ...]`` is appended, but only if
        ``capture_all_inputs`` is True. In every case ``values`` becomes the
        current input buffer.

        Args:
            values: Input values, ordered like the input variables of setup().
            time: Time stamp the inputs belong to.
            capture_all_inputs: Whether inputs arriving at a time stamp
                without an existing row get a row of their own.
        """
        self._current_inputs = values
        # Results can already hold the input (at t_sample_communication created by
        # the output writing) or the input time is new (created by an input callback)
        if not self.index or time != self.index[-1]:
            # For capture_all_inputs, append the inputs created by an input callback
            if capture_all_inputs:
                self.index.append(time)
                # The buffers are empty right after results were written to disk,
                # so a fresh row has to be created: [None, None, ..., In1, In2, ...]
                row = [None] * self._output_count + self._current_inputs
                self.data.append(row)
        else:
            # Keep the outputs of the newest row: [Out1, Out2, ..., In1, In2, ...]
            row = self.data[-1][:self._output_count] + self._current_inputs
            self.data[-1] = row

    def update_outputs(self, values: List[float], time: float):
        """
        Append a row holding the given outputs at ``time``.

        The input part of the row is filled with None; it is only replaced
        if update_inputs() is later called with the same time stamp.

        Args:
            values: Output values, ordered like the output variables of setup().
            time: Time stamp of the new row.
        """
        # Create row: [Out1, Out2, ..., None, None, ...]
        row = values + [None] * self._input_count
        self.index.append(time)
        self.data.append(row)

    def update_current_outputs(self, values: List[float]):
        """
        Remember the output values of an intermediate simulation step.

        The values are only stored on the instance; no row is created.
        """
        self._current_outputs = values

    def initialize_outputs(self, time):
        """
        Append the first row at ``time``: None for every output, followed by
        the current input buffer.
        """
        self.index.append(time)
        # Create row: [None, None, ..., In1, In2, ...]
        self.data.append([None] * self._output_count + self._current_inputs)

    def initialize_inputs(self, values: List[float]):
        """
        Replace the current input buffer with ``values`` without creating a row.
        """
        self._current_inputs = values

    def write_results(self):
        """
        Append the buffered rows to ``filename`` and clear the buffers.

        Does nothing if no filename is set or nothing is buffered. The csv
        header is only written if this instance has not written before and
        the file does not exist yet.
        """
        if not self.filename or not self.data:
            return

        df = pd.DataFrame(self.data, index=self.index, columns=self._columns)

        # Write header only once
        header = not self.header_written and not Path(self.filename).exists()
        df.to_csv(self.filename, mode="a", header=header)

        self.header_written = True

        # Clear buffers to keep the memory footprint low
        self.index.clear()
        self.data.clear()

    def df(self) -> pd.DataFrame:
        """Return the rows currently held in memory as a dataframe."""
        return pd.DataFrame(self.data, index=self.index, columns=self._columns)
def read_simulator_results(file: str):
    """
    Load a simulator result csv file into a dataframe.

    The first three rows of the file are parsed as a three-level column
    header and the first column is used as the index.
    """
    header_rows = [0, 1, 2]
    results = pd.read_csv(file, header=header_rows, index_col=0)
    return results
class SimulatorConfig(BaseModuleConfig):
    """
    Pydantic data model for the simulator configuration.

    The declaration order of the fields matters: a ``field_validator`` only
    finds fields in ``info.data`` that are declared above the validated field
    and that passed their own validation.
    """

    parameters: AgentVariables = []
    inputs: AgentVariables = []
    outputs: AgentVariables = []
    states: AgentVariables = []
    shared_variable_fields: List[str] = ["outputs"]

    t_start: Union[float, int] = Field(
        title="t_start", default=0.0, ge=0, description="Simulation start time"
    )
    t_stop: Union[float, int] = Field(
        title="t_stop", default=inf, ge=0, description="Simulation stop time"
    )
    t_sample: Union[float, int] = Field(
        title="t_sample", default=1, ge=0, description="Deprecated option."
    )
    t_sample_communication: Union[float, int] = Field(
        title="t_sample",
        default=1,
        validate_default=True,
        ge=0,
        description="Sample time of a full simulation step relevant for communication, including:"
        "- Perform simulation with t_sample_simulation"
        "- Update model results and send output values to other Agents or Modules."
    )
    t_sample_simulation: Union[float, int] = Field(
        title="t_sample_simulation",
        default=1,
        validate_default=True,
        ge=0,
        description="Sample time of the simulation itself. "
        "The inputs of the models may be updated every other t_sample_simulation, "
        "as long as the model supports this. Used to override dt of the model."
    )
    model: Dict
    # Model results
    save_results: bool = Field(
        title="save_results",
        default=False,
        description="If True, results are created and stored",
    )
    overwrite_result_file: bool = Field(
        title="overwrite_result",
        default=False,
        description="If True, and the result file already exists, the file is overwritten.",
    )
    result_filename: Optional[str] = Field(
        title="result_filename",
        default=None,
        description="If not None, results are stored in that filename."
        "Needs to be a .csv file",
    )
    result_sep: str = Field(
        title="result_sep",
        default=",",
        description="Separator in the .csv file. Only relevant if "
        "result_filename is passed",
    )
    result_causalities: List[Causality] = Field(
        title="result_causalities",
        default=[Causality.input, Causality.output],
        description="List of causalities to store. Default stores "
        "only inputs and outputs",
    )
    capture_all_inputs: bool = Field(
        title="capture_all_inputs",
        default=False,
        description="If True, results are stored immediately when "
        "inputs change, even during simulation steps.",
    )
    write_results_delay: Optional[float] = Field(
        title="Write Results Delay",
        default=None,
        description="Sampling interval for which the results are written to disc in seconds.",
        validate_default=True,
        gt=0,
    )
    update_inputs_on_callback: bool = Field(
        title="update_inputs_on_callback",
        default=True,
        description="Deprecated! Will be removed in future versions."
        "If True, model inputs are updated if they are updated in data_broker."
        "Else, the model inputs are updated before each simulation.",
    )
    measurement_uncertainty: Union[Dict[str, float], float] = Field(
        title="measurement_uncertainty",
        default=0,
        description="Either pass a float and add the percentage uncertainty "
        "to all measurements from the model."
        "Or pass a Dict and specify the model variable name as key"
        "and the associated uncertainty as a float",
    )
    validate_incoming_values: Optional[bool] = Field(
        default=False,  # we overwrite the default True in base, to be more efficient
        title="Validate Incoming Values",
        description="If true, the validator of the AgentVariable value is called when "
        "receiving a new value from the DataBroker. In the simulator, this "
        "is False by default, as we expect to receive a lot of measurements"
        " and want to be efficient.",
    )

    @field_validator("result_filename")
    @classmethod
    def check_nonexisting_csv(cls, result_filename, info: FieldValidationInfo):
        """
        Check that result_filename is a .csv file which can be created.

        Returns None if save_results is disabled, as the file is never used.
        An existing file is removed if overwrite_result_file is set, otherwise
        a FileExistsError is raised. Missing parent directories are created.

        Raises:
            TypeError: If the filename does not end with ``.csv``.
            FileExistsError: If the file exists and must not be overwritten.
        """
        if not info.data.get("save_results", False):
            # No need to check as filename will never be used anyways
            return None
        if result_filename is None:
            return result_filename
        if not result_filename.endswith(".csv"):
            raise TypeError(
                f"Given result_filename ends with "
                f'{result_filename.split(".")[-1]} '
                f"but should be a .csv file"
            )
        if os.path.isfile(result_filename):
            # remove result file, so a new one can be created
            if info.data.get("overwrite_result_file", False):
                os.remove(result_filename)
                return result_filename
            raise FileExistsError(
                f"Given result_filename at {result_filename} "
                f"already exists. We won't overwrite it automatically. "
                f"You can use the key word 'overwrite_result_file' to "
                f"activate automatic overwrite."
            )
        # Create path in case it does not exist
        fpath = os.path.dirname(result_filename)
        if fpath:
            os.makedirs(fpath, exist_ok=True)
        return result_filename

    @field_validator("t_stop")
    @classmethod
    def check_t_stop(cls, t_stop, info: FieldValidationInfo):
        """Check that the stop time is greater than the start time."""
        t_start = info.data.get("t_start")
        # t_start is missing if its own validation failed; that error is
        # reported by pydantic already, so do not fail on the comparison here.
        if t_start is not None and not t_stop > t_start:
            raise ValueError("t_stop must be greater than t_start")
        return t_stop

    @field_validator("t_sample_communication", "t_sample_simulation")
    @classmethod
    def check_t_sample(cls, t_sample, info: FieldValidationInfo):
        """
        Apply the legacy t_sample option and check the sample time against
        the simulated time span.

        If the deprecated t_sample differs from its default of 1, it replaces
        t_sample_communication, and t_sample_simulation is set to 1.
        """
        t_start = info.data.get("t_start")
        t_stop = info.data.get("t_stop")
        t_sample_old = info.data.get("t_sample")

        # Handle legacy t_sample logic
        if t_sample_old is not None and t_sample_old != 1:
            if info.field_name == "t_sample_simulation":
                t_sample = 1
            else:
                t_sample = t_sample_old
        # Skip the span check if t_start or t_stop failed their own validation
        if t_start is not None and t_stop is not None:
            if not t_start + t_sample <= t_stop:
                raise ValueError("t_stop-t_start must be greater than t_sample")
        return t_sample

    @field_validator("t_sample_simulation")
    @classmethod
    def check_t_comm_against_sim(cls, t_sample_simulation,
                                 info: FieldValidationInfo):
        """
        Warn if the simulation sample time exceeds the communication sample time.

        The check is attached to t_sample_simulation because that field is
        declared after t_sample_communication: only then are both values
        available, the other one through ``info.data``.
        """
        t_sample_communication = info.data.get("t_sample_communication")
        if t_sample_communication is not None:
            if t_sample_simulation > t_sample_communication:
                warnings.warn(
                    f"{t_sample_communication=} is smaller than {t_sample_simulation=}",
                    category=UserWarning
                )
        return t_sample_simulation

    @field_validator("update_inputs_on_callback")
    @classmethod
    def deprecate_update_inputs_on_callback(cls, update_inputs_on_callback,
                                            info: FieldValidationInfo):
        """Warn that update_inputs_on_callback is deprecated and force it to True."""
        warnings.warn(
            "update_inputs_on_callback is deprecated, remove it from your config. "
            "Will use update_inputs_on_callback=True",
            category=DeprecationWarning
        )
        return True

    @field_validator("t_sample")
    @classmethod
    def deprecate_t_sample(cls, t_sample, info: FieldValidationInfo):
        """Deprecates the t_sample field in favor of t_sample_communication
        and t_sample_simulation."""
        warnings.warn(
            "t_sample is deprecated, use t_sample_communication for storing outputs "
            "and t_sample_simulation for the actual simulation step. "
            "Will use the given t_sample for t_sample_communication and "
            "t_sample_simulation=1 s, the `model.dt` default.",
        )
        return t_sample

    @field_validator("write_results_delay")
    @classmethod
    def set_default_t_sample(cls, write_results_delay, info: FieldValidationInfo):
        """
        Default write_results_delay to five communication steps and reject
        values smaller than t_sample_communication.
        """
        t_comm = info.data.get("t_sample_communication", 1)

        if write_results_delay is None:
            # Default to writing every 5 communication steps to balance I/O
            return t_comm * 5

        if write_results_delay < t_comm:
            raise ValueError("write_results_delay should be >= t_sample_communication")
        return write_results_delay

    @field_validator("model")
    @classmethod
    def check_model(cls, model, info: FieldValidationInfo):
        """
        Turn the model configuration into a Model instance.

        The key ``type`` selects the model class: a dict is resolved through
        custom_injection, a string through get_model_type. If the config has
        no ``dt``, t_sample_simulation is used; a differing ``dt`` is kept
        and a warning is issued.

        Raises:
            KeyError: If the config has no key ``type``.
            OptionalDependencyError: If the model type is known but its
                dependency is not installed.
            ValueError: If no Model instance could be created.
        """
        parameters = info.data.get("parameters")
        inputs = info.data.get("inputs")
        outputs = info.data.get("outputs")
        states = info.data.get("states")
        dt = info.data.get("t_sample_simulation")
        # Work on a shallow copy: keys are added and popped below and the
        # mapping handed in must stay usable for the caller.
        model = dict(model)
        if "dt" in model and dt != model["dt"]:
            warnings.warn(
                f"Given model {model['dt']=} differs from {dt=} of simulator. "
                f"Using models dt, consider switching to t_sample_simulation."
            )
        else:
            model["dt"] = dt
        if "type" not in model:
            raise KeyError(
                "Given model config does not contain key 'type' (type of the model)."
            )
        _type = model.pop("type")
        if isinstance(_type, dict):
            custom_cls = custom_injection(config=_type)
            model = custom_cls(**model)
        elif isinstance(_type, str):
            if _type in UNINSTALLED_MODEL_TYPES:
                raise OptionalDependencyError(
                    dependency_name=_type,
                    dependency_install=UNINSTALLED_MODEL_TYPES[_type],
                    used_object=f"model {_type}",
                )
            model = get_model_type(_type)(
                **model,
                parameters=convert_agent_vars_to_list_of_dicts(parameters),
                inputs=convert_agent_vars_to_list_of_dicts(inputs),
                outputs=convert_agent_vars_to_list_of_dicts(outputs),
                states=convert_agent_vars_to_list_of_dicts(states),
            )
        # Check if model was correctly initialized
        if not isinstance(model, Model):
            raise ValueError(
                f"Model config did not result in a Model instance, got {type(model)}"
            )
        return model
class Simulator(BaseModule):
    """
    The Simulator is the interface between simulation models
    and further other implementations. It contains all interface functions for
    interacting with the standard model class.
    """

    config: SimulatorConfig

    def __init__(self, *, config: dict, agent: Agent):
        """
        Set up the model, the result buffer and the input callbacks.

        Args:
            config: Configuration of the module, see SimulatorConfig.
            agent: Agent this module belongs to.
        """
        super().__init__(config=config, agent=agent)

        self._model = None
        self.model = self.config.model
        self._inputs_changed_since_last_results_saving = False

        # Caching variables for performance (avoid list comprehensions in loop)
        self._input_vars = self._get_result_input_variables()
        self._output_vars = self._get_result_output_variables()

        # Initialize Result Handler
        self._result = SimulatorResults(filename=self.config.result_filename)
        if self.config.save_results:
            self._result.setup(input_vars=self._input_vars,
                               output_vars=self._output_vars)

        # Initialize local time trackers
        self._last_write_time = 0.0
        self._last_communication_time = self.env.time

        self._register_input_callbacks()
        self.logger.info("%s initialized!", self.__class__.__name__)

    def terminate(self):
        """Terminate the model"""
        self.model.terminate()
        super().terminate()

    @property
    def model(self) -> Model:
        """
        Getter for current simulation model

        Returns:
            agentlib.core.model.Model: Current simulation model
        """
        return self._model

    @model.setter
    def model(self, model: Model):
        """
        Setter for current simulation model.
        Also initializes it, starting at config.t_start plus the offset of
        the environment config.

        Args:
            model (agentlib.core.model.Model): model to set as current simulation model

        Raises:
            TypeError: If the given object is not a Model instance.
        """
        if not isinstance(model, Model):
            self.logger.error(
                "You forgot to pass a valid model to the simulator module!"
            )
            raise TypeError(
                f"Given model is of type {type(model)} "
                f"but should be an instance of Model or a valid subclass"
            )
        self._model = model
        # Read the offset once, so the warning and the initialization below
        # are guaranteed to use the same value.
        offset = self.env.config.offset
        if self.config.t_start and offset:
            self.logger.warning(
                "config.t_start and env.offset are both non-zero. "
                "This may cause unexpected behavior. Ensure that this "
                "is intended and you know what you are doing."
            )
        self.model.initialize(
            t_start=self.config.t_start + offset,
            t_stop=self.config.t_stop,
        )
        self.logger.info("Model successfully loaded model: %s", self.model.name)

    def run(self, until=None):
        """
        Runs the simulator in stand-alone mode if needed
        Attention: If the environment is connected to another environment
        all scheduled process will be started in this environment.

        Args:
            until: Passed on to ``env.run``. If None, the environment runs
                for ``t_stop - t_start``.
        """
        if until is None:
            self.env.run(until=self.config.t_stop - self.config.t_start)
        else:
            self.env.run(until=until)

    def register_callbacks(self):
        """Intentionally empty: the input callbacks are registered by
        _register_input_callbacks during __init__."""
        pass

    def _register_input_callbacks(self):
        """
        Register data_broker callbacks for all configured inputs and
        parameters that exist in the model, and push configured values
        to the model.
        """
        # Possible inputs are Inputs and parameters.
        # Outputs and states are always the result of the model
        # "Complicated" double for-loop to avoid boilerplate code
        for _type, model_var_names, ag_vars, callback in zip(
            ["input", "parameter"],
            [self.model.get_input_names(), self.model.get_parameter_names()],
            [self.config.inputs, self.config.parameters],
            [self._callback_update_model_input, self._callback_update_model_parameter],
        ):
            for var in ag_vars:
                if var.name in model_var_names:
                    self.logger.info(
                        "Registered callback for model %s %s ", _type, var.name
                    )
                    self.agent.data_broker.register_callback(
                        alias=var.alias,
                        source=var.source,
                        callback=callback,
                        name=var.name,
                    )
                # Case for variable overwriting
                if var.value is not None:
                    self.logger.debug(
                        "Updating model %s %s=%s", _type, var.name, var.value
                    )
                    self.model.set(name=var.name, value=var.value)
                    self._inputs_changed_since_last_results_saving = True

    def _callback_update_model_input(self, inp: AgentVariable, name: str):
        """Set given model input value to the model"""
        self.logger.debug("Updating model input %s=%s", name, inp.value)
        self.model.set_input_value(name=name, value=inp.value)
        self._inputs_changed_since_last_results_saving = True

    def _callback_update_model_parameter(self, par: AgentVariable, name: str):
        """Set given model parameter value to the model"""
        self.logger.debug("Updating model parameter %s=%s", name, par.value)
        self.model.set_parameter_value(name=name, value=par.value)
        self._inputs_changed_since_last_results_saving = True

    def process(self):
        """
        Main simulation loop.
        Handles simulation stepping, result logging, and synchronization.

        Each pass of the outer loop covers one communication step, split
        into sub-steps by create_time_samples. After every sub-step the
        generator yields an environment timeout of the sub-step length;
        after the last sub-step the module variables are updated.
        """
        # 1. Log Initial State (t=0)
        if self.config.save_results:
            # Ensure the result buffer has the correct initial inputs
            in_values = [var.value for var in self._input_vars]
            self._result.initialize_inputs(in_values)
            self._result.initialize_outputs(self.env.time)
            # Prevent false positive "input change" log at t=0 due to initialization callbacks
            self._inputs_changed_since_last_results_saving = False
        while True:
            # Determine the time points for the next communication step
            t_samples = create_time_samples(
                t_end=self.config.t_sample_communication,
                dt=self.config.t_sample_simulation
            )

            # Iterate through simulation sub-steps
            for i in range(len(t_samples) - 1):
                dt_sim = float(t_samples[i + 1] - t_samples[i])

                # 2. Check for Input Changes (Pre-Step)
                # If inputs changed since the last step (or during the yield), we log them now.
                # This ensures the new inputs are recorded at the current timestamp,
                # separate from the outputs of the *previous* step (which were logged at
                # the end of the last loop).
                if self._inputs_changed_since_last_results_saving:
                    if self.config.save_results:
                        # Create row: [t=Current, Out=NaN, In=New]
                        self._log_inputs(self.env.time,
                                         capture_all_inputs=self.config.capture_all_inputs)
                    self._inputs_changed_since_last_results_saving = False

                # 3. Perform Simulation Step
                self.model.do_step(
                    t_start=self.config.t_start + self.env.now,
                    t_sample=dt_sim
                )

                # 4. Store intermediate outputs
                if self.config.save_results:
                    out_values = [var.value for var in self._output_vars]
                    self._result.update_current_outputs(out_values)

                # 5. Write results
                if self.config.save_results:
                    # Since simulation has been performed, the model and its results are
                    # already a time step ahead
                    # NOTE(review): the look-ahead uses t_sample_simulation although the
                    # step just performed has length dt_sim; both differ if
                    # t_sample_simulation does not divide t_sample_communication - confirm
                    # that this is intended.
                    current_time = self.env.time + self.config.t_sample_simulation
                    if ((current_time - self._last_communication_time) >=
                            self.config.t_sample_communication):
                        # Update time tracker for communication, rounded down to a
                        # multiple of t_sample_communication
                        self._last_communication_time = ((current_time //
                                                          self.config.t_sample_communication) *
                                                         self.config.t_sample_communication)
                        # Check if we need to write to disk, do this before storing
                        # outputs, to initialize the new row after dumping the results
                        self._check_and_write_to_disk(self.env.time +
                                                      self.config.t_sample_simulation)

                        # Log the outputs resulting from the step we just finished.
                        # These will be paired with the inputs active for the next simulation step.
                        self._log_outputs(self._last_communication_time)

                # 6. Wait for the environment
                yield self.env.timeout(dt_sim)

            # 7. End of Communication Step (Post-Step)
            # Communicate
            self.update_module_vars()

    def _log_inputs(self, time: float, capture_all_inputs: bool):
        """
        Update the result object with current inputs.
        If capture_all_inputs is True, a row is added immediately.
        """
        values = [var.value for var in self._input_vars]
        self._result.update_inputs(values, time, capture_all_inputs=capture_all_inputs)

    def _log_outputs(self, time: float):
        """
        Add a result row at ``time`` holding the current output values.
        """
        values = [var.value for var in self._output_vars]
        self._result.update_outputs(values, time)

    def _check_and_write_to_disk(self, time):
        """Check if write delay has passed and dump to disk."""
        if not self.config.result_filename:
            return

        # Inputs are written in the next time step, therefore results
        # are behind actual env time
        current_result_time = time - self.config.t_sample_communication
        # NOTE(review): the comparison uses the shifted result time while the
        # tracker below stores the unshifted time - confirm this is intended.
        if (current_result_time - self._last_write_time) >= self.config.write_results_delay:
            self._result.write_results()
            self._last_write_time = time

    def update_module_vars(self):
        """
        Method to write current model output and states
        values to the module outputs and states.

        Raises:
            KeyError: If a configured state or output is not part of the model.
        """
        # pylint: disable=logging-fstring-interpolation
        for _type, model_get, agent_vars in zip(
            ["state", "output"],
            [self.model.get_state, self.model.get_output],
            [self.config.states, self.config.outputs],
        ):
            for var in agent_vars:
                mo_var = model_get(var.name)
                if mo_var is None:
                    raise KeyError(f"Given variable {var.name} not found in model.")
                value = self._get_uncertain_value(model_variable=mo_var)
                self.logger.debug("Updating %s %s=%s", _type, var.name, value)
                self.set(name=var.name, value=value)

    def _get_uncertain_value(self, model_variable: ModelVariable) -> float:
        """
        Get the value of the variable scaled by a random factor.

        The factor is drawn uniformly from ``[1 - bias, 1 + bias]``. The bias
        is config.measurement_uncertainty, or, if that is a dict, the entry
        for the variable name (0 if the name is missing).
        """
        if isinstance(self.config.measurement_uncertainty, dict):
            bias = self.config.measurement_uncertainty.get(model_variable.name, 0)
        else:
            bias = self.config.measurement_uncertainty
        return model_variable.value * (1 + np.random.uniform(-bias, bias))

    def get_results(self) -> Optional[pd.DataFrame]:
        """
        Return the current results.

        If a result file is configured, buffered rows are written to it first
        and the whole file is read back. The causality and type levels of the
        column header are dropped, so columns are named by variable name.

        Returns:
            pd.DataFrame: The results DataFrame, or None if save_results
            is disabled.
        """
        if not self.config.save_results:
            return None
        file = self.config.result_filename
        if file:
            self._result.write_results()
            df = read_simulator_results(file)
        else:
            df = self._result.df()
        df = df.droplevel(level=2, axis=1).droplevel(level=0, axis=1)
        return df

    def cleanup_results(self):
        """Delete the result file, if results are saved to a file."""
        if not self.config.save_results or not self.config.result_filename:
            return
        try:
            os.remove(self.config.result_filename)
        except FileNotFoundError:
            # Nothing was written to disk yet, so there is nothing to clean up.
            pass

    def _get_result_input_variables(self) -> List[ModelVariable]:
        """Gets all input variables to be saved in the results based on
        self.result_causalities. Input variables are added to the results at the time
        index before an interval, i.e. parameters and inputs.

        Each group is added at most once, as duplicates would produce
        duplicated result columns."""
        _variables = []
        inputs_added = False
        parameters_added = False
        for causality in self.config.result_causalities:
            if causality == Causality.input:
                if not inputs_added:
                    _variables.extend(self.model.inputs)
                    inputs_added = True
            elif causality in [Causality.parameter, Causality.calculatedParameter]:
                # Both causalities map to the same model.parameters list
                if not parameters_added:
                    _variables.extend(self.model.parameters)
                    parameters_added = True
        return _variables

    def _get_result_output_variables(self) -> List[ModelVariable]:
        """Gets all output variables to be saved in the results based on
        self.result_causalities. Output variables are added to the results at the time
        index after an interval, i.e. locals and outputs.

        Each group is added at most once, as duplicates would produce
        duplicated result columns."""
        _variables = []
        outputs_added = False
        states_added = False
        for causality in self.config.result_causalities:
            if causality == Causality.output:
                if not outputs_added:
                    _variables.extend(self.model.outputs)
                    outputs_added = True
            elif causality == Causality.local:
                if not states_added:
                    _variables.extend(self.model.states)
                    states_added = True
        return _variables
def convert_agent_vars_to_list_of_dicts(var: AgentVariables) -> List[Dict]:
    """
    Convert AgentVariables into the dictionaries used to create ModelVariables.

    Every variable is dumped with its ``dict`` method, leaving out the fields
    "source", "alias", "shared" and "rdf_class".
    """
    model_var_dicts = []
    for single_var in var:
        dumped = single_var.dict(
            exclude={"source", "alias", "shared", "rdf_class"}
        )
        model_var_dicts.append(dumped)
    return model_var_dicts