Coverage for agentlib/modules/controller/controller.py: 64%
64 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
1"""
2This modules defines re-use able controller modules,
3such as the standard Controller and the SISOController
4"""
6import abc
7import logging
8from math import inf
9from typing import Generator
11from pydantic import field_validator, Field
12from pydantic_core.core_schema import FieldValidationInfo
14from agentlib.core import BaseModule, Agent, BaseModuleConfig
15from agentlib.core.datamodels import AgentVariable
17logger = logging.getLogger(__name__)
20class Controller(BaseModule):
21 """
22 Base class for all controller tasks within an agent
23 """
25 def __init__(self, *, config: dict, agent: Agent):
26 super().__init__(config=config, agent=agent)
27 self.step = self.loop_sim()
29 @property
30 def step(self) -> Generator:
31 """Return the generator for the do_step function"""
32 return self._step
34 @step.setter
35 def step(self, step: Generator):
36 """Set the generator for the do_step function"""
37 self._step = step
39 def process(self):
40 """ "Only called on run() to initialize the step."""
41 # pylint: disable=stop-iteration-return
42 next(self.step)
43 yield self.env.event()
45 def loop_sim(self):
46 """Loop over the do_step function"""
47 raise NotImplementedError("Needs to be implemented by derived modules")
49 @abc.abstractmethod
50 def do_step(self, inp_var: AgentVariable):
51 """Controller step function. Needs to be a generator function,
52 thus using yield instead of return"""
53 raise NotImplementedError("Needs to be implemented by derived modules")
56class SISOControllerConfig(BaseModuleConfig):
57 """Check all inputs of a SISO-Contoller
59 Parameters used in all SISO Controllers.
60 ub: Upper bound of controller output
61 lb: Lower bound of controller output
62 reverser: Change of sign.
63 """
65 input: AgentVariable = AgentVariable(name="u", type="float", value=0)
66 output: AgentVariable = AgentVariable(name="y", type="float", value=0)
67 ub: float = Field(title="Upper bound", default=inf)
68 lb: float = Field(title="Lower bound", default=-inf)
69 reverse: bool = Field(title="Change of sign", default=False)
71 @field_validator("lb")
72 @classmethod
73 def check_bounds(cls, lb, info: FieldValidationInfo):
74 """Check if upper and lower bound values are correct"""
75 assert info.data["ub"] > lb, "Upper limit must be greater than lower limit"
76 return lb
78 @field_validator("output", "input")
79 @classmethod
80 def check_value_type(cls, var):
81 if var.value is None:
82 var.value = 0.0
83 return var
86class SISOController(Controller):
87 """
88 Base class for all controller having one single input and one single output
89 """
91 config: SISOControllerConfig
93 @property
94 def ub(self):
95 """The ub value"""
96 return self.config.ub
98 @property
99 def lb(self):
100 """The lb value"""
101 return self.config.lb
103 @property
104 def reverse(self):
105 """The reverse value"""
106 return self.config.reverse
108 def register_callbacks(self):
109 """A SISO controller has only one input and only reacts to this input."""
110 inp = self.get(self.config.input.name)
111 self.agent.data_broker.register_callback(
112 alias=inp.alias,
113 source=inp.source,
114 callback=self._siso_callback,
115 name=inp.name,
116 )
118 def _siso_callback(self, inp: AgentVariable, name: str):
119 self.logger.debug("Received input %s=%s", name, inp.value)
120 out_val = self.step.send(inp)
121 if out_val is None:
122 self.logger.error("Output value is None. Won't send it.")
123 else:
124 out_name = self.config.output.name
125 self.logger.debug("Sending output %s=%s", out_name, out_val)
126 self.set(name=out_name, value=out_val)