Coverage for agentlib/modules/controller/pid.py: 0%

75 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-04-07 16:27 +0000

1from math import inf, isclose 

2from typing import Union 

3 

4from pydantic import field_validator 

5from pydantic_core.core_schema import FieldValidationInfo 

6 

7from agentlib.modules.controller import SISOController, SISOControllerConfig 

8from agentlib.core.datamodels import AgentVariable 

9from agentlib.core.errors import ConfigurationError 

10 

11 

class PIDConfig(SISOControllerConfig):
    """
    Pydantic data model for pid controller configuration parser

    The fields ``setpoint``, ``Kp``, ``Ti`` and ``Td`` are declared as
    ``Union[AgentVariable, float]``. Every explicitly supplied value passes
    through :meth:`convert_to_variable` before regular validation.
    """

    # Target value the controller drives the process variable towards.
    # The default carries no value.
    setpoint: Union[AgentVariable, float] = AgentVariable(
        name="setpoint", description="Pid Setpoint", type="float"
    )
    # Proportional gain; defaults to 1.
    Kp: Union[AgentVariable, float] = AgentVariable(
        name="Kp", description="Proportional gain", type="float", value=1
    )
    # Integration time in seconds; the default ``inf`` makes the integral
    # increment ``1 / Ti * e * t_sample`` evaluate to zero.
    Ti: Union[AgentVariable, float] = AgentVariable(
        name="Ti", description="Integration time in s", type="float", value=inf
    )
    # Derivative time in seconds; the default 0 makes the derivative term zero.
    Td: Union[AgentVariable, float] = AgentVariable(
        name="Td",
        description="Derivative time in s",
        type="float",
        value=0,
        unit="seconds",
    )

    @field_validator("Kp", "Ti", "Td", "setpoint", mode="before")
    @classmethod
    def convert_to_variable(cls, parameter, info: FieldValidationInfo):
        """Normalize a PID parameter and make sure it carries a value.

        Args:
            parameter: The raw input for the field. A plain ``float``/``int``
                is wrapped into a copy of the field's default variable (as
                returned by ``cls.default``) with ``value`` set to the number.
                Anything that is not an ``AgentVariable`` afterwards is
                accessed through ``.get`` (presumably a dict-like variable
                definition; verify against callers).
            info: Validation info providing the name of the validated field.

        Returns:
            The (possibly converted) parameter.

        Raises:
            ConfigurationError: If the parameter has no value.
        """
        if isinstance(parameter, (float, int)):
            default = cls.default(info.field_name)
            parameter = default.copy(update={"value": parameter})

        if isinstance(parameter, AgentVariable):
            value = parameter.value
            name = parameter.name
        else:
            value = parameter.get("value")
            # A mapping has no ``.name`` attribute; fall back to the field
            # name so the intended ConfigurationError is raised instead of
            # an AttributeError while building the message.
            name = parameter.get("name", info.field_name)

        if value is None:
            raise ConfigurationError(
                f"PID needs a value for the variable '{name}'."
            )
        return parameter

49 

50 

class PID(SISOController):
    """
    A proportional–integral–derivative controller (PID
    controller or three-term controller) with anti-wind up.
    It continuously calculates an error value e(t) as the difference between a
    desired set point and a measured process variable and applies a correction
    based on proportional, integral, and derivative terms
    (denoted P, I, and D respectively)

    +--------------------+---------------+---------------+---------------------+
    | Parameter          | Kp            | Ki=(Kp/Ti)    | Kd=(Kp*Td)          |
    +--------------------+---------------+---------------+---------------------+
    | Rise time          | Decrease      | Decrease      | Minor change        |
    | Overshoot          | Increase      | Increase      | Decrease            |
    | Settling time      | Small change  | Increase      | Decrease            |
    | Steady-state error | Decrease      | Eliminate     | No effect in theory |
    | Stability          | Degrade       | Degrade       | Improve if Kd small |
    +--------------------+---------------+---------------+---------------------+

    Configs:
        setpoint (float): Set point of the controller
        Kp (float): proportional gain
        Ti (float): integration time in s
        Td (float): derivative time in s
        ub (float): high control limit
        lb (float): low control limit
        reverse(boolean): change of sign

    """

    config: PIDConfig

    def __init__(self, *, config, agent):
        # Controller state is initialised before the base class constructor
        # runs, so it already exists during base class setup.
        self.integral: float = 0  # accumulated integral term (without Kp)
        self.e_last: float = 0  # control error of the previous step
        self.last_time: float = 0  # timestamp of the previous step
        self.e_der_t: float = 0  # derivative term of the latest step (without Kp)
        super().__init__(config=config, agent=agent)

    @property
    def setpoint(self):
        """Get the current setpoint value from data_broker or config"""
        return self.get(self.config.setpoint.name).value

    @property
    def Kp(self):
        """Get the current Kp value from data_broker or config"""
        return self.get(self.config.Kp.name).value

    @property
    def Ti(self):
        """Get the current Ti value from data_broker or config"""
        return self.get(self.config.Ti.name).value

    @property
    def Td(self):
        """Get the current Td value from data_broker or config"""
        return self.get(self.config.Td.name).value

    def loop_sim(self):
        """Generator driving the controller.

        Stores the current environment time as the reference for the first
        sample interval, then repeatedly receives an input variable via
        ``send`` and yields the result of :meth:`do_step` for it. The first
        yielded value is ``None``.
        """
        self.last_time = self.env.time
        y = None
        while True:
            inp_var = yield y
            y = self.do_step(inp_var)

    def do_step(self, inp_var: AgentVariable):
        """Compute one PID step for the given input variable.

        Args:
            inp_var: Measured process variable; its ``value`` and
                ``timestamp`` are used.

        Returns:
            The controller output limited to ``[lb, ub]``, or ``None`` if the
            time since the previous step is not positive (an error is logged
            and the controller state is left unchanged).
        """
        u = inp_var.value
        curr_time = inp_var.timestamp

        t_sample = curr_time - self.last_time
        if t_sample <= 0:
            self.logger.error(
                "t_sample is smaller equal zero. %s" " Can't compute integral part.",
                t_sample,
            )
            return None

        # Read the tunable parameters once so the whole step works on a
        # consistent set of values.
        Kp = self.Kp
        Ti = self.Ti

        # calculate control difference
        if self.reverse:
            e = u - self.setpoint
        else:
            e = self.setpoint - u

        # calculate integral
        if Ti != 0:
            self.integral += 1 / Ti * e * t_sample

        # calculate differential.
        # Assert that t_sample is numerically feasible to use in division.
        # An absolute tolerance is required here: with only a relative
        # tolerance nothing but exactly 0.0 is "close" to 0.0, and that case
        # has already been rejected above.
        if isclose(t_sample, 0.0, abs_tol=1e-12):
            self.logger.error(
                "Sample rate to high! t_sample: %s. "
                "Can't calculate differential part of PID",
                t_sample,
            )
            self.e_der_t = 0
        else:
            self.e_der_t = self.Td * (e - self.e_last) / t_sample

        # PID output
        y = Kp * (e + self.integral + self.e_der_t)

        # Limiter with anti-windup: when the output saturates, the integral
        # is reset so that Kp * (e + integral) equals the active bound.
        if y < self.lb:
            y = self.lb
            saturated = True
        elif y > self.ub:
            y = self.ub
            saturated = True
        else:
            saturated = False
        # With Kp == 0 the back-calculation is undefined (division by zero);
        # the integral is left untouched in that case.
        if saturated and Kp != 0:
            self.integral = y / Kp - e

        self.e_last = e
        self.last_time = curr_time  # Set the value for the next iteration
        return y