Coverage for agentlib/modules/utils/agent_logger.py: 88%
99 statements
coverage.py v7.4.4, created at 2025-04-30 13:00 +0000
1"""This module contains a custom Module to log
2all variables inside an agent's data_broker."""
4import collections
5import json
6import logging
7import os
8import time
9from ast import literal_eval
10from pathlib import Path
11from typing import Union, Optional
13import pandas as pd
14from pydantic import field_validator, Field
15from pydantic_core.core_schema import FieldValidationInfo
17from agentlib import AgentVariable
18from agentlib.core import BaseModule, Agent, BaseModuleConfig
20logger = logging.getLogger(__name__)


class AgentLoggerConfig(BaseModuleConfig):
    """Define parameters for the AgentLogger"""

    t_sample: Union[float, int] = Field(
        title="t_sample",
        default=300,
        description="The log is saved every t_sample seconds.",
    )
    values_only: bool = Field(
        title="values_only",
        default=True,
        description="If True, only the values are logged. Else, all "
        "fields in the AgentVariable are logged.",
    )
    clean_up: bool = Field(
        title="clean_up",
        default=True,
        description="If True, the file is deleted once load_log is called.",
    )
    overwrite_log: bool = Field(
        title="Overwrite file",
        default=False,
        description="If True, old logs are automatically deleted when a new "
        "log is written under the same name.",
    )
    filename: Optional[str] = Field(
        title="filename",
        default=None,
        description="The filename where the log is stored. If None, defaults "
        "to 'agent_logs/{agent_id}.jsonl'.",
    )

    @field_validator("filename")
    @classmethod
    def check_existence_of_file(cls, filename, info: FieldValidationInfo):
        """Checks whether the file already exists."""
        # pylint: disable=no-self-argument,no-self-use

        # Skip the check for None, as it is replaced in __init__
        if filename is None:
            return filename

        file_path = Path(filename)
        if file_path.exists():
            # remove the result file, so a new one can be created
            if info.data["overwrite_log"]:
                file_path.unlink()
                return filename
            raise FileExistsError(
                f"Given filename at {filename} "
                f"already exists. We won't overwrite it automatically. "
                f"You can use the keyword 'overwrite_log' to "
                f"activate automatic overwrite."
            )
        # Create the parent directory in case it does not exist
        file_path.parent.mkdir(parents=True, exist_ok=True)
        return filename
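

# A minimal sketch of how the options above might appear in an agent's JSON
# configuration. The surrounding keys ("module_id", "type") and their values
# are assumptions about how BaseModuleConfig is embedded, not taken from this
# file:
#
#   {
#       "module_id": "logger",
#       "type": "AgentLogger",
#       "t_sample": 60,
#       "values_only": true,
#       "overwrite_log": true
#   }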


class AgentLogger(BaseModule):
    """
    A custom logger for Agents to write variables
    which are updated in data_broker into a file.
    """

    config: AgentLoggerConfig

    def __init__(self, *, config: dict, agent: Agent):
        super().__init__(config=config, agent=agent)
        # If filename is None, create a default one using the agent ID
        if self.config.filename is None:
            logs_dir = Path("agent_logs")
            logs_dir.mkdir(exist_ok=True)
            self._filename = str(logs_dir / f"{self.agent.id}.jsonl")
            # Handle an existing file based on the overwrite_log setting
            if Path(self._filename).exists() and not self.config.overwrite_log:
                # Generate a unique filename by appending a timestamp
                timestamp = int(time.time())
                self._filename = str(logs_dir / f"{self.agent.id}_{timestamp}.jsonl")
        else:
            self._filename = self.config.filename
        self._variables_to_log = {}
        if not self.env.config.rt and self.config.t_sample < 60:
            self.logger.warning(
                "Sampling time of agent_logger %s is very low (%s s). "
                "This can hinder performance.",
                self.id,
                self.config.t_sample,
            )
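
    # Fallback naming sketch: for an agent with id "sim_agent" and an already
    # existing log without overwrite_log, __init__ above picks a name like
    # "agent_logs/sim_agent_1714482000.jsonl" (the timestamp value is
    # hypothetical).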

    @property
    def filename(self):
        """Return the filename where to log."""
        return self._filename

    def process(self):
        """Calls _log every t_sample seconds."""
        while True:
            self._log()
            yield self.env.timeout(self.config.t_sample)

    def register_callbacks(self):
        """Registers the caching callback for all variable updates
        (alias=None and source=None match every variable)."""
        callback = (
            self._callback_values if self.config.values_only else self._callback_full
        )
        self.agent.data_broker.register_callback(
            alias=None, source=None, callback=callback
        )

    def _callback_values(self, variable: AgentVariable):
        """Save variable values to log later."""
        if not isinstance(variable.value, (float, int, str)):
            return
        current_time = self._variables_to_log.setdefault(str(self.env.time), {})
        # merge alias and source into a single string so it can serve as a JSON key
        current_time[str((variable.alias, str(variable.source)))] = variable.value
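
    # Cache layout sketch with hypothetical alias/source values: after a
    # callback at env.time == 42.0, the cache would hold
    #   self._variables_to_log["42.0"]["('room_temp', 'sim_agent')"] == 20.5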

    def _callback_full(self, variable: AgentVariable):
        """Save the full variable to log later."""
        current_time = self._variables_to_log.setdefault(str(self.env.time), {})
        current_time[str((variable.alias, str(variable.source)))] = variable.dict()

    def _log(self):
        """Writes the values currently held in memory to the file."""
        _variables_to_log = self._variables_to_log
        self._variables_to_log = {}
        with open(self.filename, "a") as file:
            json.dump(_variables_to_log, file)
            file.write("\n")
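
    # Each call appends one JSON object per line (JSON Lines). A sketch of a
    # single line, assuming values_only=True and hypothetical variables:
    #   {"0.0": {"('room_temp', 'sim_agent')": 20.5, "('set_point', 'sim_agent')": 21.0}}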

    @classmethod
    def load_from_file(
        cls, filename: str, values_only: bool = True, merge_sources: bool = True
    ) -> pd.DataFrame:
        """Loads the log file and consolidates it as a pandas DataFrame.

        Args:
            filename: The file to load.
            values_only: If True, loads a file that only has values saved (default True).
            merge_sources: Variables with the same alias but different sources are
                saved in separate columns. By default, they are merged into a single
                column for backwards compatibility. Pass False to keep them separate,
                which results in a multi-indexed column index in the returned
                DataFrame.
        """
        chunks = []
        with open(filename, "r") as file:
            for data_line in file.readlines():
                chunks.append(json.loads(data_line))
        full_dict = collections.ChainMap(*chunks)
        df = pd.DataFrame.from_dict(full_dict, orient="index")
        df.index = df.index.astype(float)
        columns = (literal_eval(column) for column in df.columns)
        df.columns = pd.MultiIndex.from_tuples(columns)

        if not values_only:

            def _load_agent_variable(var):
                try:
                    return AgentVariable.validate_data(var)
                except TypeError:
                    pass

            df = df.applymap(_load_agent_variable)

        if merge_sources:
            df = df.droplevel(1, axis=1)
            df = df.loc[:, ~df.columns.duplicated(keep="first")]
        return df.sort_index()
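
    # Usage sketch (hypothetical file path; assumes the log was written with
    # values_only=True and merge_sources left at its default):
    #   df = AgentLogger.load_from_file("agent_logs/sim_agent.jsonl")
    #   series = df["room_temp"]  # columns are aliases, index is time (float)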

    def get_results(self) -> pd.DataFrame:
        """Load the log from this module's own filename."""
        return self.load_from_file(
            filename=self.filename, values_only=self.config.values_only
        )

    def cleanup_results(self):
        """Deletes the log file if clean_up is configured."""
        if self.config.clean_up:
            try:
                os.remove(self.filename)
            except OSError:
                self.logger.error(
                    "Could not delete filename %s. Please delete it yourself.",
                    self.filename,
                )

    def terminate(self):
        # log one last time on termination, since otherwise the data gathered
        # since the last log interval would be lost
        self._log()