Coverage for agentlib/modules/utils/agent_logger.py: 96%
89 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
1"""This module contains a custom Module to log
2all variables inside an agent's data_broker."""
4import collections
5import json
6import logging
7import os
8from ast import literal_eval
9from typing import Union
11import pandas as pd
12from pydantic import field_validator, Field
13from pydantic_core.core_schema import FieldValidationInfo
15from agentlib import AgentVariable
16from agentlib.core import BaseModule, Agent, BaseModuleConfig
18logger = logging.getLogger(__name__)
class AgentLoggerConfig(BaseModuleConfig):
    """Define parameters for the AgentLogger.

    ``overwrite_log`` has to stay declared before ``filename``: the
    ``filename`` validator reads the already validated ``overwrite_log``
    value from ``info.data``.
    """

    t_sample: Union[float, int] = Field(
        title="t_sample",
        default=300,
        description="The log is saved every other t_sample seconds.",
    )
    values_only: bool = Field(
        title="values_only",
        default=True,
        # The trailing space matters: adjacent literals are concatenated
        # as-is, and without it the text read "allfields".
        description="If True, only the values are logged. Else, all "
        "fields in the AgentVariable are logged.",
    )
    # NOTE(review): the description mentions ``load_log``; in this file the
    # deletion is done by ``AgentLogger.cleanup_results`` - confirm wording.
    clean_up: bool = Field(
        title="clean_up",
        default=True,
        description="If True, file is deleted once load_log is called.",
    )
    overwrite_log: bool = Field(
        title="Overwrite file",
        default=False,
        description="If true, old logs are auto deleted when a new log should be written with that name."
    )
    filename: str = Field(
        title="filename",
        description="The filename where the log is stored.",
    )

    @field_validator("filename")
    @classmethod
    def check_existence_of_file(cls, filename, info: FieldValidationInfo):
        """Make sure the log can be written to ``filename``.

        If the file already exists it is removed when ``overwrite_log`` is
        set; otherwise the configuration is rejected. If the file does not
        exist, its parent directory is created when necessary.

        Args:
            filename: The configured path of the log file.
            info: Validation context; ``info.data`` holds the fields that
                were validated before ``filename``.

        Returns:
            The unchanged ``filename``.

        Raises:
            FileExistsError: If the file exists and ``overwrite_log`` is not
                enabled.
        """
        # pylint: disable=no-self-argument,no-self-use
        if os.path.isfile(filename):
            # ``overwrite_log`` is missing from ``info.data`` if its own
            # validation failed. Fall back to the safe choice (do not delete)
            # instead of failing with an unrelated KeyError.
            if not info.data.get("overwrite_log", False):
                raise FileExistsError(
                    f"Given filename at {filename} "
                    f"already exists. We won't overwrite it automatically. "
                    f"You can use the key word 'overwrite_log' to "
                    f"activate automatic overwrite."
                )
            # remove result file, so a new one can be created
            os.remove(filename)
            return filename
        # Create path in case it does not exist
        fpath = os.path.dirname(filename)
        if fpath:
            os.makedirs(fpath, exist_ok=True)
        return filename
class AgentLogger(BaseModule):
    """
    A custom logger for Agents to write variables
    which are updated in data_broker into a file.

    Values are buffered in memory, keyed by simulation time, and appended to
    the log file as one JSON object per line.
    """

    config: AgentLoggerConfig

    def __init__(self, *, config: dict, agent: Agent):
        """Set up the module and the in-memory buffer of values to log.

        Args:
            config: Configuration of the module, see ``AgentLoggerConfig``.
            agent: The agent this module belongs to.
        """
        super().__init__(config=config, agent=agent)
        self._filename = self.config.filename
        # Buffer layout: {str(time): {str((alias, source)): value-or-dict}}
        self._variables_to_log = {}
        if not self.env.config.rt and self.config.t_sample < 60:
            self.logger.warning(
                "Sampling time of agent_logger %s is very low %s. This can hinder "
                "performance.",
                self.id,
                self.config.t_sample,
            )

    @property
    def filename(self):
        """Return the filename where to log."""
        return self._filename

    def process(self):
        """Write the buffered values to file every ``t_sample`` seconds."""
        while True:
            self._log()
            yield self.env.timeout(self.config.t_sample)

    def register_callbacks(self):
        """Register the buffering callback for all variables.

        Depending on ``values_only`` either only the value or the full
        variable is stored. ``alias=None`` and ``source=None`` are passed so
        that no filter on alias or source is given.
        """
        callback = (
            self._callback_values if self.config.values_only else self._callback_full
        )
        self.agent.data_broker.register_callback(
            alias=None, source=None, callback=callback
        )

    def _callback_values(self, variable: AgentVariable):
        """Save variable values to log later.

        Values that are not float, int or str are skipped.
        """
        if not isinstance(variable.value, (float, int, str)):
            return
        current_time = self._variables_to_log.setdefault(str(self.env.time), {})
        # we merge alias and source tuple into a string so we can .json it
        current_time[str((variable.alias, str(variable.source)))] = variable.value

    def _callback_full(self, variable: AgentVariable):
        """Save full variable to log later."""
        current_time = self._variables_to_log.setdefault(str(self.env.time), {})
        current_time[str((variable.alias, str(variable.source)))] = variable.dict()

    def _log(self):
        """Writes the currently in memory saved values to file.

        The buffer is swapped for a fresh one before writing, and the old
        content is appended as a single JSON line.
        """
        _variables_to_log = self._variables_to_log
        self._variables_to_log = {}
        with open(self.filename, "a") as file:
            json.dump(_variables_to_log, file)
            file.write("\n")

    @classmethod
    def load_from_file(
        cls, filename: str, values_only: bool = True, merge_sources: bool = True
    ) -> pd.DataFrame:
        """Loads the log file and consolidates it as a pandas DataFrame.

        Args:
            filename: The file to load
            values_only: If true, loads a file that only has values saved (default True)
            merge_sources: When there are variables with the same alias from multiple
                sources, they are saved in different columns. For backwards
                compatibility, they are merged into a single column. However, if you
                specify False for this parameter, you can view them separately,
                resulting in a multi-indexed return column index

        Returns:
            A DataFrame indexed by time (float) and sorted by it. A log
            without any entries yields an empty DataFrame.
        """
        # The same time stamp can show up in several lines of the file: the
        # buffer is flushed at time t and variables may still change at t
        # afterwards. The entries of such lines are merged per time stamp;
        # a plain key lookup across chunks would only return the first one.
        # If a variable occurs more than once for a time stamp, the value
        # written last is kept.
        merged = {}
        with open(filename, "r") as file:
            for data_line in file:
                if not data_line.strip():
                    continue
                for timestamp, entries in json.loads(data_line).items():
                    merged.setdefault(timestamp, {}).update(entries)

        df = pd.DataFrame.from_dict(merged, orient="index")
        df.index = df.index.astype(float)
        if len(df.columns) > 0:
            # column labels are the stringified (alias, source) tuples
            df.columns = pd.MultiIndex.from_tuples(
                [literal_eval(column) for column in df.columns]
            )
        else:
            # from_tuples cannot infer the number of levels from nothing, so
            # build the empty two-level (alias, source) index explicitly.
            df.columns = pd.MultiIndex.from_arrays([[], []])

        if not values_only:

            def _load_agent_variable(var):
                # NOTE(review): presumably cells without an entry (NaN) are
                # what raises TypeError here; they end up as None - confirm.
                try:
                    return AgentVariable.validate_data(var)
                except TypeError:
                    return None

            # DataFrame.applymap is deprecated since pandas 2.1 in favour of
            # DataFrame.map; look it up on the class so that a column which
            # happens to be called "map" cannot shadow the method.
            if hasattr(pd.DataFrame, "map"):
                df = df.map(_load_agent_variable)
            else:
                df = df.applymap(_load_agent_variable)

        if merge_sources:
            df = df.droplevel(1, axis=1)
            df = df.loc[:, ~df.columns.duplicated(keep="first")]
        return df.sort_index()

    def get_results(self) -> pd.DataFrame:
        """Load the log file of this module as a DataFrame."""
        return self.load_from_file(
            filename=self.filename, values_only=self.config.values_only
        )

    def cleanup_results(self):
        """Deletes the log if wanted.

        A failing deletion is reported through the module logger and not
        raised.
        """
        if self.config.clean_up:
            try:
                os.remove(self.filename)
            except OSError:
                self.logger.error(
                    "Could not delete filename %s. Please delete it yourself.",
                    self.filename,
                )

    def terminate(self):
        """Flush the buffer one last time."""
        # when terminating, we log one last time, since otherwise the data since the
        # last log interval is lost
        self._log()