Coverage for agentlib/modules/utils/agent_logger.py: 92%

108 statements  


1"""This module contains a custom Module to log 

2all variables inside an agent's data_broker.""" 

3 

4import collections 

5import json 

6import logging 

7import os 

8import time 

9from ast import literal_eval 

10from pathlib import Path 

11from typing import Union, Optional 

12 

13import pandas as pd 

14from pydantic import field_validator, Field 

15from pydantic_core.core_schema import FieldValidationInfo 

16 

17from agentlib import AgentVariable 

18from agentlib.core import BaseModule, Agent, BaseModuleConfig 

19 

20logger = logging.getLogger(__name__) 

21 

22 


class AgentLoggerConfig(BaseModuleConfig):
    """Define parameters for the AgentLogger"""

    t_sample: Union[float, int] = Field(
        title="t_sample",
        default=300,
        description="The log is saved every t_sample seconds.",
    )
    values_only: bool = Field(
        title="values_only",
        default=True,
        description="If True, only the values are logged. Else, all "
        "fields in the AgentVariable are logged.",
    )
    clean_up: bool = Field(
        title="clean_up",
        default=True,
        description="If True, the log file is deleted once cleanup_results is called.",
    )
    overwrite_log: bool = Field(
        title="Overwrite file",
        default=False,
        description="If True, old logs are automatically deleted when a new log "
        "with the same name is written.",
    )
    filename: Optional[str] = Field(
        title="filename",
        default=None,
        description="The filename where the log is stored. "
        "If None, 'agent_logs/{agent_id}.jsonl' is used.",
    )

    @field_validator("filename")
    @classmethod
    def check_existence_of_file(cls, filename, info: FieldValidationInfo):
        """Checks whether the file already exists."""
        # pylint: disable=no-self-argument,no-self-use

        # Skip check for None, as it will be replaced in __init__
        if filename is None:
            return filename

        file_path = Path(filename)
        if file_path.exists():
            # Remove the old result file so a new one can be created
            if info.data["overwrite_log"]:
                file_path.unlink()
                return filename
            raise FileExistsError(
                f"Given filename at {filename} "
                f"already exists. We won't overwrite it automatically. "
                f"You can use the keyword 'overwrite_log' to "
                f"activate automatic overwrite."
            )
        # Create the parent directory in case it does not exist
        file_path.parent.mkdir(parents=True, exist_ok=True)
        return filename
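
# A configuration sketch for this module (hypothetical values; the exact
# "type" string and the required BaseModuleConfig keys depend on how the
# module is registered in your agentlib version):
#
#     logger_cfg = {
#         "module_id": "logger",
#         "type": "AgentLogger",
#         "t_sample": 600,
#         "filename": "agent_logs/my_agent_log.jsonl",
#         "overwrite_log": True,
#     }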


class AgentLogger(BaseModule):
    """
    A custom logger for Agents to write variables
    which are updated in data_broker into a file.
    """

    config: AgentLoggerConfig

    def __init__(self, *, config: dict, agent: Agent):
        super().__init__(config=config, agent=agent)

        # If filename is None, create a custom one using the agent ID
        if self.config.filename is None:
            # Use agent ID to create a default filename
            logs_dir = Path("agent_logs")
            logs_dir.mkdir(exist_ok=True)
            self._filename = str(logs_dir / f"{self.agent.id}.jsonl")

            # Handle the file-exists case based on the overwrite_log setting
            if Path(self._filename).exists() and not self.config.overwrite_log:
                # Generate a unique filename by appending a timestamp
                timestamp = int(time.time())
                self._filename = str(logs_dir / f"{self.agent.id}_{timestamp}.jsonl")
        else:
            self._filename = self.config.filename

        self._variables_to_log = {}
        if not self.env.config.rt and self.config.t_sample < 60:
            self.logger.warning(
                "Sampling time of agent_logger %s is very low (%s). "
                "This can hinder performance.",
                self.id,
                self.config.t_sample,
            )

    @property
    def filename(self):
        """Return the filename where to log."""
        return self._filename

    def process(self):
        """Calls the logger every t_sample seconds."""
        # Generator driven by the environment: flush the cached
        # variables, then wait for the next sampling interval.
        while True:
            self._log()
            yield self.env.timeout(self.config.t_sample)

    def register_callbacks(self):
        """Register a callback on the data_broker that caches variables for logging."""
        callback = (
            self._callback_values if self.config.values_only else self._callback_full
        )
        # alias=None and source=None register the callback for every variable
        self.agent.data_broker.register_callback(
            alias=None, source=None, callback=callback
        )

    def _callback_values(self, variable: AgentVariable):
        """Save variable values to log later."""
        if not isinstance(variable.value, (float, int, str)):
            if isinstance(variable.value, pd.Series):
                # Series are not JSON-serializable here, skip them
                return
            else:
                try:
                    variable.value = float(variable.value)
                except (TypeError, ValueError):
                    # Skip values that cannot be coerced to a float
                    return
        current_time = self._variables_to_log.setdefault(str(self.env.time), {})
        # we merge alias and source tuple into a string so we can .json it
        current_time[str((variable.alias, str(variable.source)))] = variable.value

    def _callback_full(self, variable: AgentVariable):
        """Save full variable to log later."""
        current_time = self._variables_to_log.setdefault(str(self.env.time), {})
        current_time[str((variable.alias, str(variable.source)))] = variable.dict()

    def _log(self):
        """Writes the values currently held in memory to file."""
        _variables_to_log = self._variables_to_log
        self._variables_to_log = {}
        with open(self.filename, "a") as file:
            json.dump(_variables_to_log, file)
            file.write("\n")
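
    # Each _log call appends one JSON object per line (JSON Lines). A line
    # might look like this (alias and source strings are illustrative):
    #
    #     {"10.0": {"('room_temp', 'my_agent')": 293.15}}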

    @classmethod
    def load_from_file(
        cls, filename: str, values_only: bool = True, merge_sources: bool = True
    ) -> pd.DataFrame:
        """Loads the log file and consolidates it as a pandas DataFrame.

        Args:
            filename: The file to load
            values_only: If True, loads a file that only has values saved (default True)
            merge_sources: When there are variables with the same alias from multiple
                sources, they are saved in different columns. For backwards
                compatibility, they are merged into a single column by default.
                If you pass False, they are kept separate, resulting in a
                multi-indexed column index.
        """
        data = []
        with open(filename, "r", encoding="utf-8") as file:
            for line in file:
                if not line.strip():
                    continue
                chunk = json.loads(line)
                for timestamp, values in chunk.items():
                    row = {"time": float(timestamp)}
                    row.update(values)
                    data.append(row)

        if not data:
            return pd.DataFrame()

        df = pd.DataFrame(data).set_index("time")
        # Column names are stringified (alias, source) tuples; parse them back
        df.columns = pd.MultiIndex.from_tuples(literal_eval(c) for c in df.columns)

        if not values_only:

            def _load_agent_variable(var):
                try:
                    return AgentVariable.validate_data(var)
                except TypeError:
                    # Cells that cannot be parsed (e.g. missing samples) stay None
                    pass

            df = df.applymap(_load_agent_variable)

        if merge_sources:
            df = df.droplevel(1, axis=1)
            df = df.loc[:, ~df.columns.duplicated(keep="first")]
        return df.sort_index()
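
    # Usage sketch (filename and alias are illustrative):
    #
    #     df = AgentLogger.load_from_file("agent_logs/my_agent.jsonl")
    #     df["room_temp"].plot()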

    def get_results(self) -> pd.DataFrame:
        """Load the log from this module's own filename."""
        return self.load_from_file(
            filename=self.filename, values_only=self.config.values_only
        )

    def cleanup_results(self):
        """Deletes the log file if clean_up is set."""
        if self.config.clean_up:
            try:
                os.remove(self.filename)
            except OSError:
                self.logger.error(
                    "Could not delete file %s. Please delete it yourself.",
                    self.filename,
                )

    def terminate(self):
        # When terminating, log one last time; otherwise the data cached
        # since the last log interval would be lost.
        self._log()
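
# End-to-end sketch (hypothetical agent config; how agents and environments
# are constructed depends on your agentlib version, so treat this as an
# outline rather than a runnable recipe):
#
#     agent_config = {
#         "id": "my_agent",
#         "modules": [
#             {"module_id": "logger", "type": "AgentLogger", "t_sample": 60},
#             # ... the modules producing the variables to be logged ...
#         ],
#     }
#     # After the simulation has run, consolidate the log:
#     results = AgentLogger.load_from_file("agent_logs/my_agent.jsonl")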