Coverage for agentlib/modules/controller/pid.py: 0%
75 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
1from math import inf, isclose
2from typing import Union
4from pydantic import field_validator
5from pydantic_core.core_schema import FieldValidationInfo
7from agentlib.modules.controller import SISOController, SISOControllerConfig
8from agentlib.core.datamodels import AgentVariable
9from agentlib.core.errors import ConfigurationError
12class PIDConfig(SISOControllerConfig):
13 """
14 Pydantic data model for pid controller configuration parser
15 """
17 setpoint: Union[AgentVariable, float] = AgentVariable(
18 name="setpoint", description="Pid Setpoint", type="float"
19 )
20 Kp: Union[AgentVariable, float] = AgentVariable(
21 name="Kp", description="Proportional gain", type="float", value=1
22 )
23 Ti: Union[AgentVariable, float] = AgentVariable(
24 name="Ti", description="Integration time in s", type="float", value=inf
25 )
26 Td: Union[AgentVariable, float] = AgentVariable(
27 name="Td",
28 description="Derivative time in s",
29 type="float",
30 value=0,
31 unit="seconds",
32 )
34 @field_validator("Kp", "Ti", "Td", "setpoint", mode="before")
35 @classmethod
36 def convert_to_variable(cls, parameter, info: FieldValidationInfo):
37 if isinstance(parameter, (float, int)):
38 default = cls.default(info.field_name)
39 parameter = default.copy(update={"value": parameter})
40 if isinstance(parameter, AgentVariable):
41 value = parameter.value
42 else:
43 value = parameter.get("value")
44 if value is None:
45 raise ConfigurationError(
46 f"PID needs a value for the variable '{parameter.name}'."
47 )
48 return parameter
51class PID(SISOController):
52 """
53 A proportional–integral–derivative controller (PID
54 controller or three-term controller) with anti-wind up.
55 It continuously calculates an error value e(t) as the difference between a
56 desired set point and a measured process variable and applies a correction
57 based on proportional, integral, and derivative terms
58 (denoted P, I, and D respectively)
59 +--------------------+---------------+---------------+---------------------+
60 | Parameter | Kp | Ki=(Kp/Ti) | Kd=(Kp*Td) |
61 +--------------------+---------------+---------------+---------------------+
62 | Rise time | Decrease | Decrease | Minor change |
63 | Overshoot | Increase | Increase | Decrease |
64 | Settling time | Small change | Increase | Decrease |
65 | Steady-state error | Decrease | Eliminate | No effect in theory |
66 | Stability | Degrade | Degrade | Improve if Kd small |
67 +--------------------+---------------+---------------+---------------------+
69 Configs:
70 setpoint (float): Set point of the controller
71 Kp (float): proportional gain
72 Ti (float): integration time in s
73 Td (float): derivative time in s
74 ub (float): high control limit
75 lb (float): low control limit
76 reverse(boolean): change of sign
78 """
80 config: PIDConfig
82 def __init__(self, *, config, agent):
83 self.integral: float = 0
84 self.e_last: float = 0
85 self.last_time: float = 0
86 self.e_der_t: float = 0
87 super().__init__(config=config, agent=agent)
89 @property
90 def setpoint(self):
91 """Get the current setpoint value from data_broker or config"""
92 return self.get(self.config.setpoint.name).value
94 @property
95 def Kp(self):
96 """Get the current Kp value from data_broker or config"""
97 return self.get(self.config.Kp.name).value
99 @property
100 def Ti(self):
101 """Get the current Ti value from data_broker or config"""
102 return self.get(self.config.Ti.name).value
104 @property
105 def Td(self):
106 """Get the current Td value from data_broker or config"""
107 return self.get(self.config.Td.name).value
109 def loop_sim(self):
110 self.last_time = self.env.time
111 y = None
112 while True:
113 inp_var = yield y
114 y = self.do_step(inp_var)
116 def do_step(self, inp_var: AgentVariable):
117 u = inp_var.value
118 curr_time = inp_var.timestamp
120 t_sample = curr_time - self.last_time
121 if t_sample <= 0:
122 self.logger.error(
123 "t_sample is smaller equal zero. %s" " Can't compute integral part.",
124 t_sample,
125 )
126 return
128 # calculate control difference
129 if self.reverse:
130 e = u - self.setpoint
131 else:
132 e = self.setpoint - u
134 # calculate integral
135 if self.Ti != 0:
136 self.integral += 1 / self.Ti * e * t_sample
138 # calculate differential.
139 # Assert that t_sample is numerically feasible to use in division
140 if isclose(t_sample, 0.0, rel_tol=1e-12, abs_tol=0.0):
141 self.logger.error(
142 "Sample rate to high! t_sample: %s. "
143 "Can't calculate differential part of PID",
144 t_sample,
145 )
146 self.e_der_t = 0
147 else:
148 self.e_der_t = self.Td * (e - self.e_last) / t_sample
150 # PID output
151 y = self.Kp * (e + self.integral + self.e_der_t)
153 # Limiter
154 if y < self.lb:
155 y = self.lb
156 self.integral = y / self.Kp - e
157 elif y > self.ub:
158 y = self.ub
159 self.integral = y / self.Kp - e
161 self.e_last = e
162 self.last_time = curr_time # Set the value for the next iteration
163 return y