Coverage for agentlib/modules/communicator/clonemap.py: 0%

110 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-04-07 16:27 +0000

1""" 

2This module implements a clonemap compatible communicator 

3""" 

4 

5import json 

6import logging 

7import os 

8from functools import cached_property 

9from typing import Union, List 

10 

11from pydantic import Field, field_validator 

12 

13from agentlib.core import Agent, Environment 

14from agentlib.core.datamodels import AgentVariable 

15from agentlib.core.errors import OptionalDependencyError 

16from agentlib.modules.communicator.communicator import ( 

17 Communicator, 

18 SubscriptionCommunicatorConfig, 

19) 

20from agentlib.utils.validators import convert_to_list 

21 

22try: 

23 import clonemapy.agent as clonemapyagent 

24 import clonemapy.agency as clonemapyagency 

25except ImportError as err: 

26 raise OptionalDependencyError( 

27 dependency_install="git+https://github.com/sogno-platform/clonemapy", 

28 used_object="Module type 'clonemap'", 

29 ) from err 

30 

31 

32def set_and_get_cmap_config(agent_config: dict, cagent: clonemapyagent.Agent): 

33 """ 

34 Manipulate the given agent_config to pass the cagent into the config. 

35 Further, get the settings of log level and env_factor to 

36 start the agent correctly. 

37 

38 Args: 

39 agent_config dict: Agent configuation 

40 cagent clonemapyagent.Agent: Clonemappy Agent 

41 

42 Returns: 

43 agent_config: dict 

44 Manipulated config 

45 env_factor: float 

46 Environment factor in config 

47 log_level: str 

48 Log-level of config 

49 """ 

50 env_factor = 1 

51 _default_lvl = os.environ.get("CLONEMAP_LOG_LEVEL", "ERROR") 

52 log_level = _default_lvl 

53 found_clonemap_module = False 

54 module_types = [] 

55 for module in agent_config["modules"]: 

56 _type = module["type"] 

57 if isinstance(_type, dict): 

58 module_types.append(_type["class_name"]) 

59 continue 

60 module_types.append(_type) 

61 if module["type"] == "clonemap": 

62 module.update( 

63 { 

64 "cagent": cagent, 

65 } 

66 ) 

67 env_factor = module.get("env_factor", 1) 

68 log_level = module.get("log_level", _default_lvl) 

69 found_clonemap_module = True 

70 if not found_clonemap_module: 

71 from agentlib.core.errors import ConfigurationError 

72 

73 raise ConfigurationError( 

74 "Each agents needs a clonemap communicator " 

75 "module to be executed on clonemap. You passed the modules:" 

76 f"{' ,'.join(module_types)}" 

77 ) 

78 return agent_config, env_factor, log_level.upper() 

79 

80 

81class CloneMAPClientConfig(SubscriptionCommunicatorConfig): 

82 """ 

83 clonemap communicator settings 

84 """ 

85 

86 cagent: clonemapyagent.Agent = Field( 

87 default=None, description="Agent object of CloneMAP" 

88 ) 

89 subtopics: Union[List[str], str] = Field( 

90 default=[], description="Topics to that the agent " "subscribes" 

91 ) 

92 prefix: str = Field(default="/agentlib", description="Prefix for MQTT-Topic") 

93 env_factor: float = Field( 

94 default=1, description="Specify Environment Variable Factor" 

95 ) 

96 

97 # Add validator 

98 check_subtopics = field_validator("subtopics")(convert_to_list) 

99 

100 

101class CloneMAPClient(Communicator): 

102 """ 

103 This communicator implements the communication between agents via clonemap. 

104 """ 

105 

106 config: CloneMAPClientConfig 

107 

108 def __init__(self, config: dict, agent: Agent): 

109 Communicator.__init__(self=self, config=config, agent=agent) 

110 self._subscribe() 

111 behavior = self.config.cagent.new_mqtt_default_behavior(self._message_callback) 

112 behavior.start() 

113 behavior = self.config.cagent.new_custom_update_behavior( 

114 self._config_update_callback 

115 ) 

116 behavior.start() 

117 

118 @cached_property 

119 def pubtopic(self): 

120 """Generate the publication topic""" 

121 return self.generate_topic(agent_id=self.agent.id, subscription=False) 

122 

123 def generate_topic(self, agent_id: str, subscription: bool = True): 

124 """ 

125 Generate the topic with the given agent_id and 

126 configs prefix 

127 """ 

128 if subscription: 

129 topic = "/".join([self.prefix, agent_id, "#"]) 

130 else: 

131 topic = "/".join([self.prefix, agent_id]) 

132 topic.replace("//", "/") 

133 return topic 

134 

135 @property 

136 def prefix(self): 

137 """Custom prefix for clonemap. 

138 For MAS with id 0 and default config it's: 

139 /mas_0/agentlib 

140 """ 

141 return "/".join( 

142 ["", f"mas_{self.config.cagent.masid}", self.config.prefix.strip("/")] 

143 ) 

144 

145 # The callback for when the client receives a CONNACK response from the server. 

146 def _subscribe(self): 

147 topics = set() 

148 for subscription in self.config.subscriptions: 

149 topics.add(self.generate_topic(agent_id=subscription)) 

150 topics.update(set(self.config.subtopics)) 

151 for topic in topics: 

152 self.logger.debug("Subscribing to topic %s", topic) 

153 self.config.cagent.mqtt.subscribe(topic=topic) 

154 

155 def _message_callback(self, msg): 

156 variable = AgentVariable.from_json(msg.payload) 

157 self.logger.debug( 

158 "Received variable %s from %s", variable.alias, variable.source 

159 ) 

160 self.agent.data_broker.send_variable(variable) 

161 

162 def _send(self, payload: AgentVariable): 

163 self.logger.debug( 

164 "Publishing variable %s over mqtt to %s", payload["alias"], self.pubtopic 

165 ) 

166 self.config.cagent.mqtt.publish( 

167 topic=self.pubtopic, payload=self.to_json(payload) 

168 ) 

169 

170 def _config_update_callback(self, new_config: str): 

171 """Set the new agent config and thus update all modules - 

172 including this module""" 

173 self.logger.info("Updating agent config. Payload: %s", new_config) 

174 new_config = json.loads(new_config) 

175 new_config, _, _ = set_and_get_cmap_config( 

176 agent_config=new_config, cagent=self.config.cagent 

177 ) 

178 self.agent.config = new_config 

179 

180 

181class CustomLogger(logging.Handler): 

182 """ 

183 custom logger to route all logs to the cloneMAP logger module 

184 """ 

185 

186 def __init__(self, cagent: clonemapyagent.Agent): 

187 logging.Handler.__init__(self) 

188 self._cagent = cagent 

189 

190 def emit(self, record): 

191 msg = record.name + " | " + self.format(record) 

192 if record.levelname == "ERROR": 

193 self._cagent.logger.new_log("error", msg, "") 

194 elif record.levelname == "CRITICAL": 

195 self._cagent.logger.new_log("error", msg, "") 

196 elif record.levelname == "WARNING": 

197 msg = record.name + " | WARNING: " + self.format(record) 

198 self._cagent.logger.new_log("error", msg, "") 

199 elif record.levelname == "DEBUG": 

200 self._cagent.logger.new_log("debug", msg, "") 

201 else: 

202 self._cagent.logger.new_log("app", msg, "") 

203 

204 

205class CloneMAPAgent(clonemapyagent.Agent): 

206 """ 

207 cloneMAP Agent 

208 """ 

209 

210 def task(self): 

211 """ 

212 Method task is executed by the agency for each agent in a separate process 

213 """ 

214 # get agent config and inject self object 

215 agent_config = json.loads(self.custom) 

216 agent_config, env_factor, log_level = set_and_get_cmap_config( 

217 agent_config=agent_config, cagent=self 

218 ) 

219 

220 cl = CustomLogger(self) 

221 logger = logging.getLogger() 

222 cl.setLevel(logging.DEBUG) 

223 logger.addHandler(cl) 

224 

225 env = Environment(config={"rt": True, "factor": env_factor}) 

226 agent = Agent(env=env, config=agent_config) 

227 env.run() 

228 self.loop_forever() 

229 

230 

231if __name__ == "__main__": 

232 ag = clonemapyagency.Agency(CloneMAPAgent)