Coverage for agentlib/modules/communicator/clonemap.py: 0%
110 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
1"""
2This module implements a clonemap compatible communicator
3"""
5import json
6import logging
7import os
8from functools import cached_property
9from typing import Union, List
11from pydantic import Field, field_validator
13from agentlib.core import Agent, Environment
14from agentlib.core.datamodels import AgentVariable
15from agentlib.core.errors import OptionalDependencyError
16from agentlib.modules.communicator.communicator import (
17 Communicator,
18 SubscriptionCommunicatorConfig,
19)
20from agentlib.utils.validators import convert_to_list
22try:
23 import clonemapy.agent as clonemapyagent
24 import clonemapy.agency as clonemapyagency
25except ImportError as err:
26 raise OptionalDependencyError(
27 dependency_install="git+https://github.com/sogno-platform/clonemapy",
28 used_object="Module type 'clonemap'",
29 ) from err
32def set_and_get_cmap_config(agent_config: dict, cagent: clonemapyagent.Agent):
33 """
34 Manipulate the given agent_config to pass the cagent into the config.
35 Further, get the settings of log level and env_factor to
36 start the agent correctly.
38 Args:
39 agent_config dict: Agent configuation
40 cagent clonemapyagent.Agent: Clonemappy Agent
42 Returns:
43 agent_config: dict
44 Manipulated config
45 env_factor: float
46 Environment factor in config
47 log_level: str
48 Log-level of config
49 """
50 env_factor = 1
51 _default_lvl = os.environ.get("CLONEMAP_LOG_LEVEL", "ERROR")
52 log_level = _default_lvl
53 found_clonemap_module = False
54 module_types = []
55 for module in agent_config["modules"]:
56 _type = module["type"]
57 if isinstance(_type, dict):
58 module_types.append(_type["class_name"])
59 continue
60 module_types.append(_type)
61 if module["type"] == "clonemap":
62 module.update(
63 {
64 "cagent": cagent,
65 }
66 )
67 env_factor = module.get("env_factor", 1)
68 log_level = module.get("log_level", _default_lvl)
69 found_clonemap_module = True
70 if not found_clonemap_module:
71 from agentlib.core.errors import ConfigurationError
73 raise ConfigurationError(
74 "Each agents needs a clonemap communicator "
75 "module to be executed on clonemap. You passed the modules:"
76 f"{' ,'.join(module_types)}"
77 )
78 return agent_config, env_factor, log_level.upper()
81class CloneMAPClientConfig(SubscriptionCommunicatorConfig):
82 """
83 clonemap communicator settings
84 """
86 cagent: clonemapyagent.Agent = Field(
87 default=None, description="Agent object of CloneMAP"
88 )
89 subtopics: Union[List[str], str] = Field(
90 default=[], description="Topics to that the agent " "subscribes"
91 )
92 prefix: str = Field(default="/agentlib", description="Prefix for MQTT-Topic")
93 env_factor: float = Field(
94 default=1, description="Specify Environment Variable Factor"
95 )
97 # Add validator
98 check_subtopics = field_validator("subtopics")(convert_to_list)
101class CloneMAPClient(Communicator):
102 """
103 This communicator implements the communication between agents via clonemap.
104 """
106 config: CloneMAPClientConfig
108 def __init__(self, config: dict, agent: Agent):
109 Communicator.__init__(self=self, config=config, agent=agent)
110 self._subscribe()
111 behavior = self.config.cagent.new_mqtt_default_behavior(self._message_callback)
112 behavior.start()
113 behavior = self.config.cagent.new_custom_update_behavior(
114 self._config_update_callback
115 )
116 behavior.start()
118 @cached_property
119 def pubtopic(self):
120 """Generate the publication topic"""
121 return self.generate_topic(agent_id=self.agent.id, subscription=False)
123 def generate_topic(self, agent_id: str, subscription: bool = True):
124 """
125 Generate the topic with the given agent_id and
126 configs prefix
127 """
128 if subscription:
129 topic = "/".join([self.prefix, agent_id, "#"])
130 else:
131 topic = "/".join([self.prefix, agent_id])
132 topic.replace("//", "/")
133 return topic
135 @property
136 def prefix(self):
137 """Custom prefix for clonemap.
138 For MAS with id 0 and default config it's:
139 /mas_0/agentlib
140 """
141 return "/".join(
142 ["", f"mas_{self.config.cagent.masid}", self.config.prefix.strip("/")]
143 )
145 # The callback for when the client receives a CONNACK response from the server.
146 def _subscribe(self):
147 topics = set()
148 for subscription in self.config.subscriptions:
149 topics.add(self.generate_topic(agent_id=subscription))
150 topics.update(set(self.config.subtopics))
151 for topic in topics:
152 self.logger.debug("Subscribing to topic %s", topic)
153 self.config.cagent.mqtt.subscribe(topic=topic)
155 def _message_callback(self, msg):
156 variable = AgentVariable.from_json(msg.payload)
157 self.logger.debug(
158 "Received variable %s from %s", variable.alias, variable.source
159 )
160 self.agent.data_broker.send_variable(variable)
162 def _send(self, payload: AgentVariable):
163 self.logger.debug(
164 "Publishing variable %s over mqtt to %s", payload["alias"], self.pubtopic
165 )
166 self.config.cagent.mqtt.publish(
167 topic=self.pubtopic, payload=self.to_json(payload)
168 )
170 def _config_update_callback(self, new_config: str):
171 """Set the new agent config and thus update all modules -
172 including this module"""
173 self.logger.info("Updating agent config. Payload: %s", new_config)
174 new_config = json.loads(new_config)
175 new_config, _, _ = set_and_get_cmap_config(
176 agent_config=new_config, cagent=self.config.cagent
177 )
178 self.agent.config = new_config
181class CustomLogger(logging.Handler):
182 """
183 custom logger to route all logs to the cloneMAP logger module
184 """
186 def __init__(self, cagent: clonemapyagent.Agent):
187 logging.Handler.__init__(self)
188 self._cagent = cagent
190 def emit(self, record):
191 msg = record.name + " | " + self.format(record)
192 if record.levelname == "ERROR":
193 self._cagent.logger.new_log("error", msg, "")
194 elif record.levelname == "CRITICAL":
195 self._cagent.logger.new_log("error", msg, "")
196 elif record.levelname == "WARNING":
197 msg = record.name + " | WARNING: " + self.format(record)
198 self._cagent.logger.new_log("error", msg, "")
199 elif record.levelname == "DEBUG":
200 self._cagent.logger.new_log("debug", msg, "")
201 else:
202 self._cagent.logger.new_log("app", msg, "")
205class CloneMAPAgent(clonemapyagent.Agent):
206 """
207 cloneMAP Agent
208 """
210 def task(self):
211 """
212 Method task is executed by the agency for each agent in a separate process
213 """
214 # get agent config and inject self object
215 agent_config = json.loads(self.custom)
216 agent_config, env_factor, log_level = set_and_get_cmap_config(
217 agent_config=agent_config, cagent=self
218 )
220 cl = CustomLogger(self)
221 logger = logging.getLogger()
222 cl.setLevel(logging.DEBUG)
223 logger.addHandler(cl)
225 env = Environment(config={"rt": True, "factor": env_factor})
226 agent = Agent(env=env, config=agent_config)
227 env.run()
228 self.loop_forever()
231if __name__ == "__main__":
232 ag = clonemapyagency.Agency(CloneMAPAgent)