"""Module containing all utilfunctions for the agentlib.Most notably, the custom injection enablingdynamic loading of custom models and modules."""importimportlib.utilimportosimportsysfrompathlibimportPathimportnumpyasnpfrom.local_broadcast_brokerimportLocalBroadcastBrokerfrom.local_brokerimportLocalBrokerfrom.multi_processing_brokerimportMultiProcessingBroker
def custom_injection(config: dict, module_name: str = None):
    """
    Dynamically load a class from a python file into the agentlib.

    Using this, users may use custom modules or custom models together
    with the existing agentlib objects.

    Args:
        config (dict): Config dict containing the following items:
            file (str): Filepath to a python file (.py)
            class_name (str): Name of the class to be imported
        module_name (str, optional): Name of the imported module
            in the sys.modules list. Carefully check if duplicate
            module keys raise unexpected behaviour. If so,
            use randomly generated strings or similar in classes
            calling this function. Default is None.
            In that case, the path is converted to a matching string.

    Returns:
        class (object): The class object specified by class_name

    Raises:
        AssertionError: If 'file' or 'class_name' is missing in config.
        TypeError: If the given file is neither a str nor a Path.
        FileNotFoundError: If the given file does not exist.
        ImportError: If the file can not be imported or does not
            contain an object named class_name.
    """
    # NOTE: the asserts are kept (instead of raising another exception type)
    # so that callers keep seeing AssertionError for incomplete configs.
    assert "file" in config, (
        "For custom module injection, the config type dict has to "
        "contain a 'file'-key with an existing python file as value"
    )
    assert "class_name" in config, (
        "For custom module injection, the config type dict has to "
        "contain a 'class_name'-key with a string as value "
        "specifying the class to inject"
    )
    file = config.get("file")
    class_name = config.get("class_name")
    if not isinstance(file, (str, Path)):
        raise TypeError(f"Given file is not a string but {type(file)}")
    # Convert to Path object
    file = Path(file)
    # Check if file is a valid filepath
    if not os.path.isfile(file):
        raise FileNotFoundError(
            f"Given file '{str(file)}' was not found on your device."
        )

    # Build module_name if not given:
    if module_name is None:
        # Build a unique module_name to be imported based on the path
        module_name = ".".join([p.name for p in file.parents][:-1] + [file.stem])

    # Custom file import
    try:
        # Re-use the module if the module_name is already present
        if module_name in sys.modules:
            custom_module = sys.modules[module_name]
        else:
            spec = importlib.util.spec_from_file_location(module_name, file)
            # No spec/loader is returned for files the import system does
            # not recognize (e.g. an unknown suffix).
            if spec is None or spec.loader is None:
                raise ImportError(
                    f"No import loader available for file '{file}'."
                )
            custom_module = importlib.util.module_from_spec(spec)
            # Register before executing so that the module can be found
            # under its name while its own code is running.
            sys.modules[module_name] = custom_module
            try:
                spec.loader.exec_module(custom_module)
            except BaseException:
                # Do not leave a partially initialized module behind;
                # otherwise the next call would silently re-use it.
                sys.modules.pop(module_name, None)
                raise
    except ImportError as err:
        raise ImportError(
            f"Could not inject given module '{class_name}' at '{file}' due to import "
            "error. Carefully check for circular imports and partially "
            "imported objects based on the following error message: "
            f"{err}"
        ) from err

    try:
        return custom_module.__dict__[class_name]
    except KeyError as err:
        raise ImportError(
            f"Given module '{custom_module}' does not "
            f"contain the specified class {class_name}"
        ) from err
def create_time_samples(dt: float, t_end: float) -> np.ndarray:
    """
    Generate an array of time steps from 0 to t_end using the step size dt.

    Note that, if dt is not a true divider of t_end,
    the output array is not equally sampled.

    Args:
        dt (float): Step-size
        t_end (float): end time

    Returns:
        np.ndarray: Array of time samples from 0 to t_end with step size dt,
        ensuring t_end is always included. If np.arange yields no samples
        (e.g. for t_end == 0), the array only contains t_end.
    """
    samples = np.arange(0, t_end, dt, dtype=float)
    # np.arange excludes the stop value, but float rounding can still make
    # the last sample equal to t_end; only append t_end if it is missing.
    # The size check avoids an IndexError on an empty range (t_end == 0).
    if samples.size > 0 and samples[-1] == t_end:
        return samples
    return np.append(samples, t_end)