Coverage for agentlib/modules/utils/agent_logger.py: 96% (89 statements) — coverage.py v7.4.4, created at 2025-04-07 16:27 +0000

1"""This module contains a custom Module to log 

2all variables inside an agent's data_broker.""" 

3 

4import collections 

5import json 

6import logging 

7import os 

8from ast import literal_eval 

9from typing import Union 

10 

11import pandas as pd 

12from pydantic import field_validator, Field 

13from pydantic_core.core_schema import FieldValidationInfo 

14 

15from agentlib import AgentVariable 

16from agentlib.core import BaseModule, Agent, BaseModuleConfig 

17 

18logger = logging.getLogger(__name__) 

19 

20 

class AgentLoggerConfig(BaseModuleConfig):
    """Define parameters for the AgentLogger"""

    t_sample: Union[float, int] = Field(
        title="t_sample",
        default=300,
        description="The log is saved every other t_sample seconds.",
    )
    values_only: bool = Field(
        title="values_only",
        default=True,
        description="If True, only the values are logged. Else, all"
        "fields in the AgentVariable are logged.",
    )
    clean_up: bool = Field(
        title="clean_up",
        default=True,
        description="If True, file is deleted once load_log is called.",
    )
    overwrite_log: bool = Field(
        title="Overwrite file",
        default=False,
        description="If true, old logs are auto deleted when a new log should be written with that name."
    )
    filename: str = Field(
        title="filename",
        description="The filename where the log is stored.",
    )

    @field_validator("filename")
    @classmethod
    def check_existence_of_file(cls, filename, info: FieldValidationInfo):
        """Checks whether the file already exists.

        If the file exists and ``overwrite_log`` is set, the old file is
        removed so a fresh log can be written; otherwise a FileExistsError
        is raised. The parent directory is created if it does not exist.

        Raises:
            FileExistsError: If the file exists and overwrite_log is False.
        """
        # pylint: disable=no-self-argument,no-self-use
        if os.path.isfile(filename):
            # remove result file, so a new one can be created
            if info.data["overwrite_log"]:
                os.remove(filename)
                return filename
            # Fix: interpolate the actual filename instead of the
            # "(unknown)" placeholder so the error is actionable.
            raise FileExistsError(
                f"Given filename at {filename} "
                f"already exists. We won't overwrite it automatically. "
                f"You can use the key word 'overwrite_log' to "
                f"activate automatic overwrite."
            )
        # Create path in case it does not exist
        fpath = os.path.dirname(filename)
        if fpath:
            os.makedirs(fpath, exist_ok=True)
        return filename

71 

72 

class AgentLogger(BaseModule):
    """
    A custom logger for Agents to write variables
    which are updated in data_broker into a file.
    """

    config: AgentLoggerConfig

    def __init__(self, *, config: dict, agent: Agent):
        """Overwrite init to enable a custom default filename
        which uses the agent_id."""
        super().__init__(config=config, agent=agent)
        self._filename = self.config.filename
        # Maps str(env.time) -> {str((alias, source)): value or variable dict}
        self._variables_to_log = {}
        if not self.env.config.rt and self.config.t_sample < 60:
            self.logger.warning(
                "Sampling time of agent_logger %s is very low %s. This can hinder "
                "performance.",
                self.id,
                self.config.t_sample,
            )

    @property
    def filename(self):
        """Return the filename where to log."""
        return self._filename

    def process(self):
        """Calls the logger every other t_sample
        is used."""
        while True:
            self._log()
            yield self.env.timeout(self.config.t_sample)

    def register_callbacks(self):
        """Callbacks trigger the log_cache function"""
        # alias=None, source=None registers for every variable on the broker
        callback = (
            self._callback_values if self.config.values_only else self._callback_full
        )
        self.agent.data_broker.register_callback(
            alias=None, source=None, callback=callback
        )

    def _callback_values(self, variable: AgentVariable):
        """Save variable values to log later.

        Non-scalar values (lists, dicts, objects) are skipped so the
        resulting JSON stays flat and small.
        """
        if not isinstance(variable.value, (float, int, str)):
            return
        current_time = self._variables_to_log.setdefault(str(self.env.time), {})
        # we merge alias and source tuple into a string so we can .json it
        current_time[str((variable.alias, str(variable.source)))] = variable.value

    def _callback_full(self, variable: AgentVariable):
        """Save full variable to log later."""
        current_time = self._variables_to_log.setdefault(str(self.env.time), {})
        current_time[str((variable.alias, str(variable.source)))] = variable.dict()

    def _log(self):
        """Writes the currently in memory saved values to file.

        Each call appends one JSON object (one line) holding everything
        collected since the previous call, then resets the in-memory cache.
        """
        _variables_to_log = self._variables_to_log
        self._variables_to_log = {}
        with open(self.filename, "a") as file:
            json.dump(_variables_to_log, file)
            file.write("\n")

    @classmethod
    def load_from_file(
        cls, filename: str, values_only: bool = True, merge_sources: bool = True
    ) -> pd.DataFrame:
        """Loads the log file and consolidates it as a pandas DataFrame.

        Args:
            filename: The file to load
            values_only: If true, loads a file that only has values saved (default True)
            merge_sources: When there are variables with the same alias from multiple
                sources, they are saved in different columns. For backwards
                compatibility, they are merged into a single column. However, if you
                specify False for this parameter, you can view them separately,
                resulting in a multi-indexed return column index

        """
        chunks = []
        with open(filename, "r") as file:
            for data_line in file:
                # Skip blank lines so a stray empty line cannot crash json.loads
                if data_line.strip():
                    chunks.append(json.loads(data_line))
        # ChainMap: on duplicate timestamps the EARLIER chunk wins (first
        # mapping takes precedence), matching the original behavior.
        full_dict = collections.ChainMap(*chunks)
        df = pd.DataFrame.from_dict(full_dict, orient="index")
        df.index = df.index.astype(float)
        if df.columns.empty:
            # Empty log: MultiIndex.from_tuples cannot handle an empty
            # column set, so return the (empty) frame directly.
            return df.sort_index()
        columns = (literal_eval(column) for column in df.columns)
        df.columns = pd.MultiIndex.from_tuples(columns)

        if not values_only:

            def _load_agent_variable(var):
                """Re-create an AgentVariable from its dict; None if invalid."""
                try:
                    return AgentVariable.validate_data(var)
                except TypeError:
                    pass

            # DataFrame.applymap is deprecated since pandas 2.1 in favor of
            # DataFrame.map; fall back to applymap on older pandas versions.
            element_wise = getattr(df, "map", df.applymap)
            df = element_wise(_load_agent_variable)

        if merge_sources:
            df = df.droplevel(1, axis=1)
            # Keep the first column for each alias, drop duplicates
            df = df.loc[:, ~df.columns.duplicated(keep="first")]
        return df.sort_index()

    def get_results(self) -> pd.DataFrame:
        """Load the own filename"""
        return self.load_from_file(
            filename=self.filename, values_only=self.config.values_only
        )

    def cleanup_results(self):
        """Deletes the log if wanted."""
        if self.config.clean_up:
            try:
                os.remove(self.filename)
            except OSError:
                # Best-effort cleanup: log instead of raising so shutdown
                # of the agent is never blocked by a locked/missing file.
                self.logger.error(
                    "Could not delete filename %s. Please delete it yourself.",
                    self.filename,
                )

    def terminate(self):
        # when terminating, we log one last time, since otherwise the data since the
        # last log interval is lost
        self._log()