Source code for agentlib.modules.controller.pid

from math import inf, isclose
from typing import Union

from pydantic import field_validator
from pydantic_core.core_schema import FieldValidationInfo

from agentlib.modules.controller import SISOController, SISOControllerConfig
from agentlib.core.datamodels import AgentVariable
from agentlib.core.errors import ConfigurationError


[docs]class PIDConfig(SISOControllerConfig): """ Pydantic data model for pid controller configuration parser """ setpoint: Union[AgentVariable, float] = AgentVariable( name="setpoint", description="Pid Setpoint", type="float" ) Kp: Union[AgentVariable, float] = AgentVariable( name="Kp", description="Proportional gain", type="float", value=1 ) Ti: Union[AgentVariable, float] = AgentVariable( name="Ti", description="Integration time in s", type="float", value=inf ) Td: Union[AgentVariable, float] = AgentVariable( name="Td", description="Derivative time in s", type="float", value=0, unit="seconds", )
[docs] @field_validator("Kp", "Ti", "Td", "setpoint", mode="before") @classmethod def convert_to_variable(cls, parameter, info: FieldValidationInfo): if isinstance(parameter, (float, int)): default = cls.default(info.field_name) parameter = default.copy(update={"value": parameter}) if isinstance(parameter, AgentVariable): value = parameter.value else: value = parameter.get("value") if value is None: raise ConfigurationError( f"PID needs a value for the variable '{parameter.name}'." ) return parameter
[docs]class PID(SISOController): """ A proportional–integral–derivative controller (PID controller or three-term controller) with anti-wind up. It continuously calculates an error value e(t) as the difference between a desired set point and a measured process variable and applies a correction based on proportional, integral, and derivative terms (denoted P, I, and D respectively) +--------------------+---------------+---------------+---------------------+ | Parameter | Kp | Ki=(Kp/Ti) | Kd=(Kp*Td) | +--------------------+---------------+---------------+---------------------+ | Rise time | Decrease | Decrease | Minor change | | Overshoot | Increase | Increase | Decrease | | Settling time | Small change | Increase | Decrease | | Steady-state error | Decrease | Eliminate | No effect in theory | | Stability | Degrade | Degrade | Improve if Kd small | +--------------------+---------------+---------------+---------------------+ Configs: setpoint (float): Set point of the controller Kp (float): proportional gain Ti (float): integration time in s Td (float): derivative time in s ub (float): high control limit lb (float): low control limit reverse(boolean): change of sign """ config: PIDConfig def __init__(self, *, config, agent): self.integral: float = 0 self.e_last: float = 0 self.last_time: float = 0 self.e_der_t: float = 0 super().__init__(config=config, agent=agent) @property def setpoint(self): """Get the current setpoint value from data_broker or config""" return self.get(self.config.setpoint.name).value @property def Kp(self): """Get the current Kp value from data_broker or config""" return self.get(self.config.Kp.name).value @property def Ti(self): """Get the current Ti value from data_broker or config""" return self.get(self.config.Ti.name).value @property def Td(self): """Get the current Td value from data_broker or config""" return self.get(self.config.Td.name).value
[docs] def loop_sim(self): self.last_time = self.env.time y = None while True: inp_var = yield y y = self.do_step(inp_var)
[docs] def do_step(self, inp_var: AgentVariable): u = inp_var.value curr_time = inp_var.timestamp t_sample = curr_time - self.last_time if t_sample <= 0: self.logger.error( "t_sample is smaller equal zero. %s" " Can't compute integral part.", t_sample, ) return # calculate control difference if self.reverse: e = u - self.setpoint else: e = self.setpoint - u # calculate integral if self.Ti != 0: self.integral += 1 / self.Ti * e * t_sample # calculate differential. # Assert that t_sample is numerically feasible to use in division if isclose(t_sample, 0.0, rel_tol=1e-12, abs_tol=0.0): self.logger.error( "Sample rate to high! t_sample: %s. " "Can't calculate differential part of PID", t_sample, ) self.e_der_t = 0 else: self.e_der_t = self.Td * (e - self.e_last) / t_sample # PID output y = self.Kp * (e + self.integral + self.e_der_t) # Limiter if y < self.lb: y = self.lb self.integral = y / self.Kp - e elif y > self.ub: y = self.ub self.integral = y / self.Kp - e self.e_last = e self.last_time = curr_time # Set the value for the next iteration return y