Coverage for agentlib/modules/utils/agent_logger.py: 92%
108 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-10-30 13:39 +0000
1"""This module contains a custom Module to log
2all variables inside an agent's data_broker."""
4import collections
5import json
6import logging
7import os
8import time
9from ast import literal_eval
10from pathlib import Path
11from typing import Union, Optional
13import pandas as pd
14from pydantic import field_validator, Field
15from pydantic_core.core_schema import FieldValidationInfo
17from agentlib import AgentVariable
18from agentlib.core import BaseModule, Agent, BaseModuleConfig
20logger = logging.getLogger(__name__)
class AgentLoggerConfig(BaseModuleConfig):
    """Define parameters for the AgentLogger."""

    t_sample: Union[float, int] = Field(
        title="t_sample",
        default=300,
        description="The log is saved every other t_sample seconds.",
    )
    values_only: bool = Field(
        title="values_only",
        default=True,
        description="If True, only the values are logged. Else, all"
        "fields in the AgentVariable are logged.",
    )
    clean_up: bool = Field(
        title="clean_up",
        default=True,
        description="If True, file is deleted once load_log is called.",
    )
    overwrite_log: bool = Field(
        title="Overwrite file",
        default=False,
        description="If true, old logs are auto deleted when a new log should be written with that name.",
    )
    filename: Optional[str] = Field(
        title="filename",
        default=None,
        # Description kept in sync with AgentLogger.__init__, which creates
        # 'agent_logs/{agent_id}.jsonl' when no filename is given.
        description="The filename where the log is stored. If None, will use 'agent_logs/{agent_id}.jsonl'",
    )

    @field_validator("filename")
    @classmethod
    def check_existence_of_file(cls, filename, info: FieldValidationInfo):
        """Checks whether the file already exists.

        If the file exists and ``overwrite_log`` is set, the old file is
        deleted; otherwise a ``FileExistsError`` is raised. For new files,
        the parent directory is created if needed.
        """
        # pylint: disable=no-self-argument,no-self-use

        # Skip check for None, as it will be replaced in __init__
        if filename is None:
            return filename

        file_path = Path(filename)
        if file_path.exists():
            # remove result file, so a new one can be created
            if info.data["overwrite_log"]:
                file_path.unlink()
                return filename
            # Include the actual path in the error so the user knows
            # which file blocks the logger.
            raise FileExistsError(
                f"Given filename at {file_path} "
                f"already exists. We won't overwrite it automatically. "
                f"You can use the key word 'overwrite_log' to "
                f"activate automatic overwrite."
            )
        # Create path in case it does not exist
        file_path.parent.mkdir(parents=True, exist_ok=True)
        return filename
80class AgentLogger(BaseModule):
81 """
82 A custom logger for Agents to write variables
83 which are updated in data_broker into a file.
84 """
86 config: AgentLoggerConfig
88 def __init__(self, *, config: dict, agent: Agent):
89 super().__init__(config=config, agent=agent)
91 # If filename is None, create a custom one using the agent ID
92 if self.config.filename is None:
93 # Use agent ID to create a default filename
94 logs_dir = Path("agent_logs")
95 logs_dir.mkdir(exist_ok=True)
96 self._filename = str(logs_dir / f"{self.agent.id}.jsonl")
98 # Handle file exists case based on overwrite_log setting
99 if Path(self._filename).exists() and not self.config.overwrite_log:
100 # Generate a unique filename by appending a timestamp
101 timestamp = int(time.time())
102 self._filename = str(logs_dir / f"{self.agent.id}_{timestamp}.jsonl")
103 else:
104 self._filename = self.config.filename
106 self._variables_to_log = {}
107 if not self.env.config.rt and self.config.t_sample < 60:
108 self.logger.warning(
109 "Sampling time of agent_logger %s is very low %s. This can hinder "
110 "performance.",
111 self.id,
112 self.config.t_sample,
113 )
115 @property
116 def filename(self):
117 """Return the filename where to log."""
118 return self._filename
120 def process(self):
121 """Calls the logger every other t_sample
122 is used."""
123 while True:
124 self._log()
125 yield self.env.timeout(self.config.t_sample)
127 def register_callbacks(self):
128 """Callbacks trigger the log_cache function"""
129 callback = (
130 self._callback_values if self.config.values_only else self._callback_full
131 )
132 self.agent.data_broker.register_callback(
133 alias=None, source=None, callback=callback
134 )
136 def _callback_values(self, variable: AgentVariable):
137 """Save variable values to log later."""
138 if not isinstance(variable.value, (float, int, str)):
139 if isinstance(variable.value, pd.Series):
140 return
141 else:
142 try:
143 variable.value = float(variable.value)
144 except (TypeError, ValueError):
145 return
146 current_time = self._variables_to_log.setdefault(str(self.env.time), {})
147 # we merge alias and source tuple into a string so we can .json it
148 current_time[str((variable.alias, str(variable.source)))] = variable.value
150 def _callback_full(self, variable: AgentVariable):
151 """Save full variable to log later."""
152 current_time = self._variables_to_log.setdefault(str(self.env.time), {})
153 current_time[str((variable.alias, str(variable.source)))] = variable.dict()
155 def _log(self):
156 """Writes the currently in memory saved values to file"""
157 _variables_to_log = self._variables_to_log
158 self._variables_to_log = {}
159 with open(self.filename, "a") as file:
160 json.dump(_variables_to_log, file)
161 file.write("\n")
163 @classmethod
164 def load_from_file(
165 cls, filename: str, values_only: bool = True, merge_sources: bool = True
166 ) -> pd.DataFrame:
167 """Loads the log file and consolidates it as a pandas DataFrame.
169 Args:
170 filename: The file to load
171 values_only: If true, loads a file that only has values saved (default True)
172 merge_sources: When there are variables with the same alias from multiple
173 sources, they are saved in different columns. For backwards
174 compatibility, they are merged into a single column. However, if you
175 specify False for this parameter, you can view them separately,
176 resulting in a multi-indexed return column index
178 """
179 data = []
180 with open(filename, "r", encoding="utf-8") as file:
181 for line in file:
182 if not line.strip():
183 continue
184 chunk = json.loads(line)
185 for timestamp, values in chunk.items():
186 row = {"time": float(timestamp)}
187 row.update(values)
188 data.append(row)
190 if not data:
191 return pd.DataFrame()
193 df = pd.DataFrame(data).set_index("time")
194 df.columns = pd.MultiIndex.from_tuples(
195 literal_eval(c) for c in df.columns
196 )
198 if not values_only:
200 def _load_agent_variable(var):
201 try:
202 return AgentVariable.validate_data(var)
203 except TypeError:
204 pass
206 df = df.applymap(_load_agent_variable)
208 if merge_sources:
209 df = df.droplevel(1, axis=1)
210 df = df.loc[:, ~df.columns.duplicated(keep="first")]
211 return df.sort_index()
213 def get_results(self) -> pd.DataFrame:
214 """Load the own filename"""
215 return self.load_from_file(
216 filename=self.filename, values_only=self.config.values_only
217 )
219 def cleanup_results(self):
220 """Deletes the log if wanted."""
221 if self.config.clean_up:
222 try:
223 os.remove(self.filename)
224 except OSError:
225 self.logger.error(
226 "Could not delete filename %s. Please delete it yourself.",
227 self.filename,
228 )
230 def terminate(self):
231 # when terminating, we log one last time, since otherwise the data since the
232 # last log interval is lost
233 self._log()