Coverage for agentlib/modules/utils/agent_logger.py: 88% (99 statements)

1"""This module contains a custom Module to log 

2all variables inside an agent's data_broker.""" 

3 

4import collections 

5import json 

6import logging 

7import os 

8import time 

9from ast import literal_eval 

10from pathlib import Path 

11from typing import Union, Optional 

12 

13import pandas as pd 

14from pydantic import field_validator, Field 

15from pydantic_core.core_schema import FieldValidationInfo 

16 

17from agentlib import AgentVariable 

18from agentlib.core import BaseModule, Agent, BaseModuleConfig 

19 

20logger = logging.getLogger(__name__) 

21 

22 

class AgentLoggerConfig(BaseModuleConfig):
    """Define parameters for the AgentLogger."""

    t_sample: Union[float, int] = Field(
        title="t_sample",
        default=300,
        description="The log is saved every t_sample seconds.",
    )
    values_only: bool = Field(
        title="values_only",
        default=True,
        description="If True, only the values are logged. Else, all "
        "fields in the AgentVariable are logged.",
    )
    clean_up: bool = Field(
        title="clean_up",
        default=True,
        description="If True, the file is deleted once load_log is called.",
    )
    overwrite_log: bool = Field(
        title="Overwrite file",
        default=False,
        description="If True, an existing log with the same name is deleted "
        "before a new log is written.",
    )
    filename: Optional[str] = Field(
        title="filename",
        default=None,
        description="The filename where the log is stored. If None, "
        "'agent_logs/{agent_id}.jsonl' is used.",
    )

    @field_validator("filename")
    @classmethod
    def check_existence_of_file(cls, filename, info: FieldValidationInfo):
        """Checks whether the file already exists."""
        # pylint: disable=no-self-argument,no-self-use

        # Skip the check for None, as it is replaced in __init__
        if filename is None:
            return filename

        file_path = Path(filename)
        if file_path.exists():
            # Remove the result file, so a new one can be created
            if info.data["overwrite_log"]:
                file_path.unlink()
                return filename
            raise FileExistsError(
                f"Given filename at {filename} "
                f"already exists. We won't overwrite it automatically. "
                f"You can use the keyword 'overwrite_log' to "
                f"activate automatic overwrite."
            )
        # Create the parent directory in case it does not exist
        file_path.parent.mkdir(parents=True, exist_ok=True)
        return filename
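
For illustration, a module configuration validated by AgentLoggerConfig might look as follows. This is a minimal sketch: "module_id" and "type" are standard BaseModuleConfig fields in agentlib, and the "type" value shown is an assumption, not taken from this file.

# Hypothetical AgentLogger module config (sketch, not part of this module):
logger_config = {
    "module_id": "logger",                    # standard BaseModuleConfig field
    "type": "agent_logger",                   # assumed registry name for this module
    "t_sample": 60,                           # save the log every 60 seconds
    "values_only": True,                      # log only values, not full AgentVariables
    "overwrite_log": True,                    # delete an existing log with the same name
    "filename": "agent_logs/my_agent.jsonl",
}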

class AgentLogger(BaseModule):
    """
    A custom logger for Agents to write variables
    which are updated in the data_broker into a file.
    """

    config: AgentLoggerConfig

    def __init__(self, *, config: dict, agent: Agent):
        super().__init__(config=config, agent=agent)

        # If filename is None, create a default one using the agent ID
        if self.config.filename is None:
            logs_dir = Path("agent_logs")
            logs_dir.mkdir(exist_ok=True)
            self._filename = str(logs_dir / f"{self.agent.id}.jsonl")

            # Handle an existing file based on the overwrite_log setting
            if Path(self._filename).exists() and not self.config.overwrite_log:
                # Generate a unique filename by appending a timestamp
                timestamp = int(time.time())
                self._filename = str(logs_dir / f"{self.agent.id}_{timestamp}.jsonl")
        else:
            self._filename = self.config.filename

        self._variables_to_log = {}
        if not self.env.config.rt and self.config.t_sample < 60:
            self.logger.warning(
                "Sampling time of agent_logger %s is very low (%s s). "
                "This can hinder performance.",
                self.id,
                self.config.t_sample,
            )

    @property
    def filename(self):
        """Return the filename where to log."""
        return self._filename

    def process(self):
        """Calls the logger every t_sample seconds."""
        while True:
            self._log()
            yield self.env.timeout(self.config.t_sample)
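
process() is a SimPy-style generator: the environment schedules it once, and each yield of env.timeout(t) suspends it for t seconds of simulated (or real) time before the next write. A self-contained sketch of the same pattern in plain simpy (the function and values here are illustrative, not part of this module):

import simpy

def periodic_log(env, t_sample):
    """Mimics AgentLogger.process: act, then sleep for t_sample."""
    while True:
        print(f"log written at t={env.now}")
        yield env.timeout(t_sample)

env = simpy.Environment()
env.process(periodic_log(env, t_sample=300))
env.run(until=900)  # logs at t=0, 300, 600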

    def register_callbacks(self):
        """Callbacks trigger the log_cache function."""
        callback = (
            self._callback_values if self.config.values_only else self._callback_full
        )
        self.agent.data_broker.register_callback(
            alias=None, source=None, callback=callback
        )

    def _callback_values(self, variable: AgentVariable):
        """Save variable values to log later."""
        if not isinstance(variable.value, (float, int, str)):
            return
        current_time = self._variables_to_log.setdefault(str(self.env.time), {})
        # We merge the alias and source tuple into a string so we can .json it
        current_time[str((variable.alias, str(variable.source)))] = variable.value

    def _callback_full(self, variable: AgentVariable):
        """Save the full variable to log later."""
        current_time = self._variables_to_log.setdefault(str(self.env.time), {})
        current_time[str((variable.alias, str(variable.source)))] = variable.dict()

    def _log(self):
        """Writes the values currently held in memory to file."""
        _variables_to_log = self._variables_to_log
        self._variables_to_log = {}
        with open(self.filename, "a") as file:
            json.dump(_variables_to_log, file)
            file.write("\n")
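
Because _log appends one json.dump per call, the file is a JSON-lines log: each line holds every time stamp collected since the previous write, mapping the stringified (alias, source) tuple to the value. A minimal round-trip sketch with hypothetical aliases and values:

import json
from ast import literal_eval

# One appended line, as _log would produce it (values are made up):
line = '{"0.0": {"(\'T_room\', \'None\')": 21.5}, "300.0": {"(\'T_room\', \'None\')": 21.7}}'
for time_stamp, values in json.loads(line).items():
    for key, value in values.items():
        alias, source = literal_eval(key)  # undo the str((alias, source)) merge
        print(float(time_stamp), alias, source, value)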

    @classmethod
    def load_from_file(
        cls, filename: str, values_only: bool = True, merge_sources: bool = True
    ) -> pd.DataFrame:
        """Loads the log file and consolidates it as a pandas DataFrame.

        Args:
            filename: The file to load.
            values_only: If True, loads a file that only has values saved (default True).
            merge_sources: When there are variables with the same alias from multiple
                sources, they are saved in different columns. For backwards
                compatibility, they are merged into a single column by default. If you
                pass False, they are kept separate, resulting in a multi-indexed
                column index in the returned DataFrame.
        """
        chunks = []
        with open(filename, "r") as file:
            for data_line in file.readlines():
                chunks.append(json.loads(data_line))
        full_dict = collections.ChainMap(*chunks)
        df = pd.DataFrame.from_dict(full_dict, orient="index")
        df.index = df.index.astype(float)
        columns = (literal_eval(column) for column in df.columns)
        df.columns = pd.MultiIndex.from_tuples(columns)

        if not values_only:

            def _load_agent_variable(var):
                try:
                    return AgentVariable.validate_data(var)
                except TypeError:
                    pass

            df = df.applymap(_load_agent_variable)

        if merge_sources:
            df = df.droplevel(1, axis=1)
            df = df.loc[:, ~df.columns.duplicated(keep="first")]
        return df.sort_index()
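
A typical offline analysis with the merged (default) column layout might look like this sketch; the file path and the "T_room" alias are hypothetical:

df = AgentLogger.load_from_file("agent_logs/my_agent.jsonl", values_only=True)
room_temperature = df["T_room"].dropna()  # one column per alias after merging sources
print(room_temperature.head())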

    def get_results(self) -> pd.DataFrame:
        """Load the logger's own file."""
        return self.load_from_file(
            filename=self.filename, values_only=self.config.values_only
        )

    def cleanup_results(self):
        """Deletes the log if configured to do so."""
        if self.config.clean_up:
            try:
                os.remove(self.filename)
            except OSError:
                self.logger.error(
                    "Could not delete filename %s. Please delete it yourself.",
                    self.filename,
                )

    def terminate(self):
        # When terminating, we log one last time, since otherwise the data
        # since the last log interval would be lost
        self._log()
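
End to end, the intended lifecycle is: the environment drives process(), terminate() flushes the last in-memory chunk, and the results are read back afterwards. A post-run sketch, assuming an agent that ran with this module under the module_id "logger"; get_module is assumed here as the accessor on Agent:

results_logger = agent.get_module("logger")  # assumed accessor; id from the config
results = results_logger.get_results()       # DataFrame indexed by time
results_logger.cleanup_results()             # deletes the file if clean_up is True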