Coverage for agentlib/modules/controller/controller.py: 64%

64 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-04-07 16:27 +0000

1""" 

2This modules defines re-use able controller modules, 

3such as the standard Controller and the SISOController 

4""" 

5 

6import abc 

7import logging 

8from math import inf 

9from typing import Generator 

10 

11from pydantic import field_validator, Field 

12from pydantic_core.core_schema import FieldValidationInfo 

13 

14from agentlib.core import BaseModule, Agent, BaseModuleConfig 

15from agentlib.core.datamodels import AgentVariable 

16 

17logger = logging.getLogger(__name__) 

18 

19 

20class Controller(BaseModule): 

21 """ 

22 Base class for all controller tasks within an agent 

23 """ 

24 

25 def __init__(self, *, config: dict, agent: Agent): 

26 super().__init__(config=config, agent=agent) 

27 self.step = self.loop_sim() 

28 

29 @property 

30 def step(self) -> Generator: 

31 """Return the generator for the do_step function""" 

32 return self._step 

33 

34 @step.setter 

35 def step(self, step: Generator): 

36 """Set the generator for the do_step function""" 

37 self._step = step 

38 

39 def process(self): 

40 """ "Only called on run() to initialize the step.""" 

41 # pylint: disable=stop-iteration-return 

42 next(self.step) 

43 yield self.env.event() 

44 

45 def loop_sim(self): 

46 """Loop over the do_step function""" 

47 raise NotImplementedError("Needs to be implemented by derived modules") 

48 

49 @abc.abstractmethod 

50 def do_step(self, inp_var: AgentVariable): 

51 """Controller step function. Needs to be a generator function, 

52 thus using yield instead of return""" 

53 raise NotImplementedError("Needs to be implemented by derived modules") 

54 

55 

56class SISOControllerConfig(BaseModuleConfig): 

57 """Check all inputs of a SISO-Contoller 

58 

59 Parameters used in all SISO Controllers. 

60 ub: Upper bound of controller output 

61 lb: Lower bound of controller output 

62 reverser: Change of sign. 

63 """ 

64 

65 input: AgentVariable = AgentVariable(name="u", type="float", value=0) 

66 output: AgentVariable = AgentVariable(name="y", type="float", value=0) 

67 ub: float = Field(title="Upper bound", default=inf) 

68 lb: float = Field(title="Lower bound", default=-inf) 

69 reverse: bool = Field(title="Change of sign", default=False) 

70 

71 @field_validator("lb") 

72 @classmethod 

73 def check_bounds(cls, lb, info: FieldValidationInfo): 

74 """Check if upper and lower bound values are correct""" 

75 assert info.data["ub"] > lb, "Upper limit must be greater than lower limit" 

76 return lb 

77 

78 @field_validator("output", "input") 

79 @classmethod 

80 def check_value_type(cls, var): 

81 if var.value is None: 

82 var.value = 0.0 

83 return var 

84 

85 

86class SISOController(Controller): 

87 """ 

88 Base class for all controller having one single input and one single output 

89 """ 

90 

91 config: SISOControllerConfig 

92 

93 @property 

94 def ub(self): 

95 """The ub value""" 

96 return self.config.ub 

97 

98 @property 

99 def lb(self): 

100 """The lb value""" 

101 return self.config.lb 

102 

103 @property 

104 def reverse(self): 

105 """The reverse value""" 

106 return self.config.reverse 

107 

108 def register_callbacks(self): 

109 """A SISO controller has only one input and only reacts to this input.""" 

110 inp = self.get(self.config.input.name) 

111 self.agent.data_broker.register_callback( 

112 alias=inp.alias, 

113 source=inp.source, 

114 callback=self._siso_callback, 

115 name=inp.name, 

116 ) 

117 

118 def _siso_callback(self, inp: AgentVariable, name: str): 

119 self.logger.debug("Received input %s=%s", name, inp.value) 

120 out_val = self.step.send(inp) 

121 if out_val is None: 

122 self.logger.error("Output value is None. Won't send it.") 

123 else: 

124 out_name = self.config.output.name 

125 self.logger.debug("Sending output %s=%s", out_name, out_val) 

126 self.set(name=out_name, value=out_val)