Coverage for agentlib/utils/__init__.py: 91%
34 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-12-17 17:46 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-12-17 17:46 +0000
1"""
2Module containing all util
3functions for the agentlib.
5Most notably, the custom injection enabling
6dynamic loading of custom models and modules.
7"""
9import importlib.util
10import os
11import sys
12from pathlib import Path
14import numpy as np
16from .local_broadcast_broker import LocalBroadcastBroker
17from .local_broker import LocalBroker
18from .multi_processing_broker import MultiProcessingBroker
def custom_injection(config: dict, module_name: str = None):
    """
    Function to dynamically load new python files into
    the agentlib. Using this, users may use custom modules
    or custom models together with the existing agentlib objects.

    Args:
        config (dict): Config dict containing the following items:
            file (str): Filepath to a python file (.py)
            class_name (str): Name of the class to be imported
        module_name (str, optional): Name of the imported module
            in the sys.modules list. Carefully check if duplicate
            module keys raise unexpected behaviour. If so,
            use randomly generated strings or similar in classes
            calling this function. Default is None.
            In that case, the path is converted to a matching string.

    Returns:
        class (object): The class object specified by class_name

    Raises:
        AssertionError: If 'file' or 'class_name' is missing in config.
        TypeError: If the 'file' entry is neither a str nor a Path.
        FileNotFoundError: If the given file does not exist.
        ImportError: If the file can not be imported as a python module,
            or if the imported module does not contain class_name.
    """
    # NOTE: asserts are kept (instead of explicit raises) so that callers
    # relying on AssertionError for a malformed config keep working.
    assert "file" in config, (
        "For custom module injection, the config type dict has to "
        "contain a 'file'-key with an existing python file as value"
    )
    assert "class_name" in config, (
        "For custom module injection, the config type dict has to "
        "contain a 'class_name'-key with a string as value "
        "specifying the class to inject"
    )
    file = config.get("file")
    class_name = config.get("class_name")
    if not isinstance(file, (str, Path)):
        raise TypeError(f"Given file is not a string but {type(file)}")
    # Convert to Path object
    file = Path(file)
    # Check if file is a valid filepath
    if not os.path.isfile(file):
        raise FileNotFoundError(
            f"Given file '{str(file)}' was not found on your device."
        )

    # Build module_name if not given:
    if module_name is None:
        # Build a module_name to be imported based on the path.
        # The last parent (filesystem root or '.') has an empty name
        # and is therefore dropped.
        module_name = ".".join([p.name for p in file.parents][:-1] + [file.stem])

    # Custom file import
    try:
        # Re-use the module if the module_name is already present
        if module_name in sys.modules:
            custom_module = sys.modules[module_name]
        else:
            spec = importlib.util.spec_from_file_location(module_name, file)
            # spec is None if the file suffix is not a known python
            # source/bytecode/extension suffix.
            if spec is None or spec.loader is None:
                raise ImportError(
                    f"Could not create an import specification for '{file}'. "
                    "Make sure it is a python file (.py)."
                )
            custom_module = importlib.util.module_from_spec(spec)
            # Register before execution so that imports inside the custom
            # file referring to module_name can be resolved.
            sys.modules[module_name] = custom_module
            try:
                spec.loader.exec_module(custom_module)
            except BaseException:
                # Do not leave a partially initialised module behind,
                # otherwise later calls would silently re-use it.
                sys.modules.pop(module_name, None)
                raise
    except ImportError as err:
        raise ImportError(
            f"Could not inject given module '{class_name}' at '{file}' due to import "
            "error. Carefully check for circular imports and partially "
            "imported objects based on the following error message: "
            f"{err}"
        ) from err
    try:
        return custom_module.__dict__[class_name]
    except KeyError as err:
        raise ImportError(
            f"Given module '{custom_module}' does not "
            f"contain the specified class {class_name}"
        ) from err
def create_time_samples(dt, t_end):
    """
    Function to generate an array of time steps
    using the dt object.
    Note that, if dt is not a true divider of t_end,
    the output array is not equally sampled.

    Args:
        dt (float): Step-size
        t_end (float): end time

    Returns:
        np.ndarray: Array of time samples from 0 to t_end with step size dt,
            ensuring t_end is always included. If the range from 0 to
            t_end is empty for the given dt (e.g. t_end == 0), the array
            contains only t_end.
    """
    samples = np.arange(0, t_end, dt, dtype=float)
    # np.arange returns an empty array if t_end is 0 (or not reachable
    # with the sign of dt); guard the index access for that case.
    if samples.size > 0 and samples[-1] == t_end:
        return samples
    # np.arange excludes the end point, so add it explicitly.
    return np.append(samples, t_end)