agentlib.modules.communicator package
Submodules
agentlib.modules.communicator.clonemap module
This module implements a cloneMAP-compatible communicator.
- class agentlib.modules.communicator.clonemap.CloneMAPAgent(info: AgentInfo, mas_name: str, mas_custom: str, msg_in: Queue, msg_out: Queue, log_out: Queue, ts_out: Queue)[source]
 Bases:
Agent
cloneMAP Agent
- class agentlib.modules.communicator.clonemap.CloneMAPClient(config: dict, agent: Agent)[source]
 Bases:
Communicator
This communicator implements the communication between agents via cloneMAP.
- generate_topic(agent_id: str, subscription: bool = True)[source]
 Generate the topic from the given agent_id and the config's prefix.
- property prefix
 Custom prefix for clonemap. For MAS with id 0 and default config it’s: /mas_0/agentlib
- property pubtopic
 Generate the publication topic
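As a rough illustration of how such a prefix and topic could be composed, the sketch below uses two hypothetical helpers, `build_prefix` and `build_topic`. Only the resulting prefix `/mas_0/agentlib` for MAS id 0 with the default config prefix is taken from the description above. The topic layout (prefix, then agent id, then an MQTT-style wildcard for subscriptions) is an assumption and may differ from what `generate_topic` actually returns.

```python
def build_prefix(mas_id: int, config_prefix: str = "/agentlib") -> str:
    """Hypothetical helper: combine the MAS id with the configured prefix."""
    return f"/mas_{mas_id}{config_prefix}"


def build_topic(prefix: str, agent_id: str, subscription: bool = True) -> str:
    """Hypothetical helper: assumed layout '<prefix>/<agent_id>[/#]'."""
    topic = f"{prefix}/{agent_id}"
    # Assumption: subscriptions use a multi-level wildcard, publications do not.
    return f"{topic}/#" if subscription else topic


prefix = build_prefix(0)
pub_topic = build_topic(prefix, "agent_1", subscription=False)
sub_topic = build_topic(prefix, "agent_2", subscription=True)
```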
- pydantic model agentlib.modules.communicator.clonemap.CloneMAPClientConfig[source]
 Bases:
SubscriptionCommunicatorConfig
cloneMAP communicator settings
- Config:
 arbitrary_types_allowed: bool = True
validate_assignment: bool = True
extra: str = forbid
frozen: bool = True
- Fields:
 - Validators:
 
- field cagent: Agent = None
 Agent object of CloneMAP
- field env_factor: float = 1
 Specify Environment Variable Factor
- field prefix: str = '/agentlib'
 Prefix for MQTT-Topic
- field subtopics: List[str] | str = []
 Topics to which the agent subscribes
- Validated by:
 
- model_post_init(context: Any, /) None
 This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Parameters:
 self – The BaseModel instance.
context – The context.
- class agentlib.modules.communicator.clonemap.CustomLogger(cagent: Agent)[source]
 Bases:
Handler
Custom logger to route all logs to the cloneMAP logger module
- agentlib.modules.communicator.clonemap.set_and_get_cmap_config(agent_config: dict, cagent: Agent)[source]
 Manipulate the given agent_config to pass the cagent into the config. Further, get the settings of log level and env_factor to start the agent correctly.
- Parameters:
 agent_config (dict) – Agent configuration
cagent (clonemapyagent.Agent) – Clonemappy Agent
- Returns:
 - agent_config: dict
 Manipulated config
- env_factor: float
 Environment factor in config
- log_level: str
 Log level in config
agentlib.modules.communicator.communicator module
This module contains the basic communicator modules.
- class agentlib.modules.communicator.communicator.CommunicationDict[source]
 Bases:
TypedDict
- alias: str
 
- source: str
 
- timestamp: float
 
- type: str
 
- value: Any
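The five keys listed above can be pictured with a stand-alone `TypedDict`. This is a minimal sketch that redeclares the type locally rather than importing it from agentlib. The helper `make_payload` and the example values are hypothetical and only illustrate the shape of a payload.

```python
import time
from typing import Any, TypedDict


class CommunicationDict(TypedDict):
    """Local re-declaration with the keys documented above."""
    alias: str
    source: str
    timestamp: float
    type: str
    value: Any


def make_payload(alias: str, source: str, value: Any, type_: str = "float") -> CommunicationDict:
    """Hypothetical helper that stamps the payload with the current time."""
    return CommunicationDict(
        alias=alias, source=source, timestamp=time.time(), type=type_, value=value
    )


payload = make_payload("room_temperature", "agent_1", 21.5)
```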
 
- class agentlib.modules.communicator.communicator.Communicator(*, config: dict, agent: Agent)[source]
 Bases:
BaseModule
Base class for all communicators
- process()[source]
 This abstract method must be implemented in order to sync the module with the other processes of the agent and the whole MAS.
- short_dict(variable: AgentVariable, parse_json: bool = True) CommunicationDict[source]
 Creates a short dict serialization of the Variable.
Only contains the attributes of the AgentVariable that are relevant for other modules or agents. For performance and privacy reasons, communicators should use this function.
- to_json(payload: CommunicationDict) bytes | str[source]
 Transforms the payload into json serialized form. Dynamically uses orjson if it is installed, and the builtin json otherwise.
Returns bytes or str depending on the library used, but this has not mattered with the communicators as of now.
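The fallback described above might look roughly like the following sketch. It is not the library's implementation: the function name mirrors `to_json`, but the `use_orjson` parameter and the module-level import guard are assumptions made for illustration. `orjson.dumps` returns `bytes`, while the builtin `json.dumps` returns `str`, and `json.loads` accepts both.

```python
import json
from typing import Union

try:
    import orjson  # optional dependency
except ImportError:
    orjson = None


def to_json(payload: dict, use_orjson: bool = False) -> Union[bytes, str]:
    """Serialize with orjson when requested and available, else with json."""
    if use_orjson and orjson is not None:
        return orjson.dumps(payload)  # bytes
    return json.dumps(payload)  # str


payload = {"alias": "room_temperature", "source": "agent_1", "value": 21.5}
encoded = to_json(payload, use_orjson=True)
decoded = json.loads(encoded)  # works for bytes and str alike
```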
- pydantic model agentlib.modules.communicator.communicator.CommunicatorConfig[source]
 Bases:
BaseModuleConfig
- Config:
 arbitrary_types_allowed: bool = True
validate_assignment: bool = True
extra: str = forbid
frozen: bool = True
- Fields:
 - Validators:
 
- field use_orjson: bool = False
 If true, the faster orjson library will be used for serialization and deserialization. Requires the optional dependency.
- model_post_init(context: Any, /) None
 This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Parameters:
 self – The BaseModel instance.
context – The context.
- class agentlib.modules.communicator.communicator.LocalCommunicator(config: dict, agent: Agent)[source]
 Bases:
Communicator
Base class for local communicators.
- pydantic model agentlib.modules.communicator.communicator.LocalCommunicatorConfig[source]
 Bases:
CommunicatorConfig
- Config:
 arbitrary_types_allowed: bool = True
validate_assignment: bool = True
extra: str = forbid
frozen: bool = True
- Fields:
 - Validators:
 
- field parse_json: bool = False
 
- field queue_size: int = 10000
 
- model_post_init(context: Any, /) None
 This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Parameters:
 self – The BaseModel instance.
context – The context.
- pydantic model agentlib.modules.communicator.communicator.SubscriptionCommunicatorConfig[source]
 Bases:
CommunicatorConfig
- Config:
 arbitrary_types_allowed: bool = True
validate_assignment: bool = True
extra: str = forbid
frozen: bool = True
- Fields:
 - Validators:
 
- field subscriptions: List[str] | str = []
 List of agent-id strings to subscribe to
- Validated by:
 
- model_post_init(context: Any, /) None
 This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Parameters:
 self – The BaseModel instance.
context – The context.
agentlib.modules.communicator.local module
- class agentlib.modules.communicator.local.LocalClient(config: dict, agent: Agent)[source]
 Bases:
LocalCommunicator
This communicator implements the communication between agents via a central process broker. Note: The broker is implemented as a singleton, which means that all agents must be in the same process!
- property subscriptions
 
- pydantic model agentlib.modules.communicator.local.LocalSubscriptionCommunicatorConfig[source]
 Bases:
LocalCommunicatorConfig, SubscriptionCommunicatorConfig
- Config:
 arbitrary_types_allowed: bool = True
validate_assignment: bool = True
extra: str = forbid
frozen: bool = True
- Fields:
 - Validators:
 
- model_post_init(context: Any, /) None
 This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Parameters:
 self – The BaseModel instance.
context – The context.
agentlib.modules.communicator.local_broadcast module
- class agentlib.modules.communicator.local_broadcast.LocalBroadcastClient(config: dict, agent: Agent)[source]
 Bases:
LocalCommunicator
This communicator implements the communication between agents via a central broadcast broker process. Note: The broker is implemented as a singleton. This means that all agents must be in the same process!
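To show why a singleton broker ties all agents to one process, here is a minimal sketch of the pattern. The class `BroadcastBroker` and its methods are hypothetical and are not agentlib's broker. The default queue size of 10000 is borrowed from the `queue_size` field of `LocalCommunicatorConfig`. Because the single instance lives in the memory of one Python process, a second process would create its own, separate broker.

```python
import queue
from typing import Dict


class BroadcastBroker:
    """Hypothetical in-process broker; one shared instance per process."""

    _instance = None

    def __new__(cls):
        # Create the instance once and hand out the same object afterwards.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.clients = {}
        return cls._instance

    def register(self, agent_id: str, maxsize: int = 10000) -> queue.Queue:
        inbox: queue.Queue = queue.Queue(maxsize=maxsize)
        self.clients[agent_id] = inbox
        return inbox

    def broadcast(self, sender_id: str, message: Dict) -> None:
        # Deliver to every registered client except the sender.
        for agent_id, inbox in self.clients.items():
            if agent_id != sender_id:
                inbox.put_nowait(message)


inbox_a = BroadcastBroker().register("agent_a")
inbox_b = BroadcastBroker().register("agent_b")
BroadcastBroker().broadcast("agent_a", {"alias": "x", "value": 1})
```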
agentlib.modules.communicator.local_multiprocessing module
- class agentlib.modules.communicator.local_multiprocessing.MultiProcessingBroadcastClient(config: dict, agent: Agent)[source]
 Bases:
Communicator
This communicator implements the communication between agents via a central process broker.
- pydantic model agentlib.modules.communicator.local_multiprocessing.MultiProcessingBroadcastClientConfig[source]
 Bases:
CommunicatorConfig
- Config:
 arbitrary_types_allowed: bool = True
validate_assignment: bool = True
extra: str = forbid
frozen: bool = True
- Fields:
 - Validators:
 
- field authkey: bytes = b'useTheAgentlib'
 Authorization key for the connection with the broker.
- field ipv4: IPv4Address = '127.0.0.1'
 IP Address for the communication server. Defaults to localhost.
- field port: int = 50000
 Port for setting up the connection with the broker.
- model_post_init(context: Any, /) None
 This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Parameters:
 self – The BaseModel instance.
context – The context.
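The three fields above (`authkey`, `ipv4`, `port`) correspond to what an authenticated socket connection needs. The sketch below assumes a transport similar to the standard library's `multiprocessing.connection`; whether agentlib's broker uses exactly this API is not stated here. It runs the listener in a thread and binds to port 0 so the operating system picks a free port, instead of the documented default of 50000.

```python
import threading
from multiprocessing.connection import Client, Listener

AUTHKEY = b"useTheAgentlib"  # default authkey from the config above

# Port 0 lets the OS choose a free port; the real default is 50000.
listener = Listener(("127.0.0.1", 0), authkey=AUTHKEY)
received = []


def serve_once() -> None:
    """Accept one authenticated connection and read a single message."""
    with listener.accept() as conn:
        received.append(conn.recv())


server = threading.Thread(target=serve_once)
server.start()

# The client must present the same authkey, otherwise the handshake fails.
with Client(listener.address, authkey=AUTHKEY) as conn:
    conn.send({"alias": "x", "value": 1})

server.join()
listener.close()
```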
agentlib.modules.communicator.mqtt module
- pydantic model agentlib.modules.communicator.mqtt.BaseMQTTClientConfig[source]
 Bases:
SubscriptionCommunicatorConfig
- Config:
 arbitrary_types_allowed: bool = True
validate_assignment: bool = True
extra: str = forbid
frozen: bool = True
- Fields:
 - Validators:
 
- field clean_start: bool = True
 True, False or MQTT_CLEAN_START_FIRST_ONLY. Sets the MQTT v5.0 clean_start flag always, never or on the first successful connect only, respectively. MQTT session data (such as outstanding messages and subscriptions) is cleared on successful connect when the clean_start flag is set.
- field client_id: str | None = None
 
- field connection_timeout: float = 10
 Number of seconds to wait for the initial connection before raising an error.
- field keepalive: int = 60
 Maximum period in seconds between communications with the broker. If no other messages are being exchanged, this controls the rate at which the client will send ping messages to the broker.
- field password: str | None = None
 
- field prefix: str = '/agentlib'
 Prefix for MQTT-Topic
- field qos: int = 0
 Quality of Service
- Constraints:
 ge = 0
le = 2
- field subtopics: List[str] | str = []
 Topics to which the agent subscribes
- Validated by:
 
- field tls_ca_certs: str | None = None
 Path to the Certificate Authority certificate files. If None, the Windows certificate will be used.
- field use_tls: bool | None = None
 Option to use TLS certificates
- field username: str | None = None
 
- model_post_init(context: Any, /) None
 This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Parameters:
 self – The BaseModel instance.
context – The context.
- class agentlib.modules.communicator.mqtt.BaseMqttClient(config: dict, agent: Agent)[source]
 Bases:
Communicator
- mqttc_type
 alias of Client
- property topics_size
 
- pydantic model agentlib.modules.communicator.mqtt.MQTTClientConfig[source]
 Bases:
BaseMQTTClientConfig
- Config:
 arbitrary_types_allowed: bool = True
validate_assignment: bool = True
extra: str = forbid
frozen: bool = True
- Fields:
 - Validators:
 
- field url: AnyUrl [Required]
 Host is the hostname or IP address of the remote broker.
- Validated by:
 
- model_post_init(context: Any, /) None
 This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Parameters:
 self – The BaseModel instance.
context – The context.
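A configuration built from the fields documented above might look like the following dict. The broker URL and the agent ids are placeholder values, and any additional keys a module config may need (such as a module type or id) are left out because they are not described in this section. The small `check_qos` function is a hypothetical stand-in for the `ge = 0` / `le = 2` constraint, which in the library is enforced by the pydantic model.

```python
mqtt_config = {
    "url": "mqtt://localhost:1883",           # placeholder broker address
    "prefix": "/agentlib",                    # documented default
    "qos": 1,                                 # must satisfy 0 <= qos <= 2
    "keepalive": 60,                          # documented default, in seconds
    "connection_timeout": 10,                 # documented default, in seconds
    "clean_start": True,                      # documented default
    "subscriptions": ["agent_1", "agent_2"],  # placeholder agent ids
}


def check_qos(qos: int) -> int:
    """Hypothetical stand-in for the ge=0 / le=2 field constraint."""
    if not 0 <= qos <= 2:
        raise ValueError(f"qos must be 0, 1 or 2, got {qos}")
    return qos


check_qos(mqtt_config["qos"])
```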
Module contents
This package contains all modules used to communicate messages.