Source code for agentlib.modules.controller.controller

"""
This modules defines re-use able controller modules,
such as the standard Controller and the SISOController
"""

import abc
import logging
from math import inf
from typing import Generator

from pydantic import field_validator, Field
from pydantic_core.core_schema import FieldValidationInfo

from agentlib.core import BaseModule, Agent, BaseModuleConfig
from agentlib.core.datamodels import AgentVariable

logger = logging.getLogger(__name__)


[docs]class Controller(BaseModule): """ Base class for all controller tasks within an agent """ def __init__(self, *, config: dict, agent: Agent): super().__init__(config=config, agent=agent) self.step = self.loop_sim() @property def step(self) -> Generator: """Return the generator for the do_step function""" return self._step @step.setter def step(self, step: Generator): """Set the generator for the do_step function""" self._step = step
[docs] def process(self): """ "Only called on run() to initialize the step.""" # pylint: disable=stop-iteration-return next(self.step) yield self.env.event()
[docs] def loop_sim(self): """Loop over the do_step function""" raise NotImplementedError("Needs to be implemented by derived modules")
[docs] @abc.abstractmethod def do_step(self, inp_var: AgentVariable): """Controller step function. Needs to be a generator function, thus using yield instead of return""" raise NotImplementedError("Needs to be implemented by derived modules")
[docs]class SISOControllerConfig(BaseModuleConfig): """Check all inputs of a SISO-Contoller Parameters used in all SISO Controllers. ub: Upper bound of controller output lb: Lower bound of controller output reverser: Change of sign. """ input: AgentVariable = AgentVariable(name="u", type="float", value=0) output: AgentVariable = AgentVariable(name="y", type="float", value=0) ub: float = Field(title="Upper bound", default=inf) lb: float = Field(title="Lower bound", default=-inf) reverse: bool = Field(title="Change of sign", default=False)
[docs] @field_validator("lb") @classmethod def check_bounds(cls, lb, info: FieldValidationInfo): """Check if upper and lower bound values are correct""" assert info.data["ub"] > lb, "Upper limit must be greater than lower limit" return lb
[docs] @field_validator("output", "input") @classmethod def check_value_type(cls, var): if var.value is None: var.value = 0.0 return var
[docs]class SISOController(Controller): """ Base class for all controller having one single input and one single output """ config: SISOControllerConfig @property def ub(self): """The ub value""" return self.config.ub @property def lb(self): """The lb value""" return self.config.lb @property def reverse(self): """The reverse value""" return self.config.reverse
[docs] def register_callbacks(self): """A SISO controller has only one input and only reacts to this input.""" inp = self.get(self.config.input.name) self.agent.data_broker.register_callback( alias=inp.alias, source=inp.source, callback=self._siso_callback, name=inp.name, )
def _siso_callback(self, inp: AgentVariable, name: str): self.logger.debug("Received input %s=%s", name, inp.value) out_val = self.step.send(inp) if out_val is None: self.logger.error("Output value is None. Won't send it.") else: out_name = self.config.output.name self.logger.debug("Sending output %s=%s", out_name, out_val) self.set(name=out_name, value=out_val)