Coverage for agentlib/utils/broker.py: 80%
25 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-07 16:27 +0000
1"""
2Module containing a basic Broker which
3may be inherited by specialized classes.
4"""
6import threading
class Singleton(type):
    """Metaclass that hands out a single shared instance per class.

    The first call to a class using this metaclass constructs the
    instance; every later call returns that same object and ignores
    the arguments it was given.
    """

    # Maps each class to the one instance created for it.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the cached instance of ``cls``, creating it on first use."""
        registry = cls._instances
        if cls in registry:
            return registry[cls]
        # First instantiation: build the object through the normal
        # ``type.__call__`` machinery and remember it.
        instance = super().__call__(*args, **kwargs)
        registry[cls] = instance
        return instance
class Broker(metaclass=Singleton):
    """Base broker that keeps track of a set of registered clients.

    The class is built with the ``Singleton`` metaclass, so instantiating
    it repeatedly yields the same object. Specialized brokers may inherit
    from it.
    """

    def __init__(self):
        # Held while the client set is being modified.
        self.lock = threading.Lock()
        # Registered clients; a set, so each client is stored at most once.
        self._clients = set()

    def register_client(self, client):
        """Add the given client to the set of clients.

        Adding a client that is already registered leaves the set unchanged.
        """
        with self.lock:
            self._clients.add(client)

    def delete_client(self, client):
        """Remove the given client from the set of clients.

        A client that is not registered is silently ignored.
        """
        with self.lock:
            # ``discard`` is the no-error form of ``remove``.
            self._clients.discard(client)

    def delete_all_clients(self):
        """Remove every client from the set of clients."""
        with self.lock:
            # Empties the existing set in place; the set object is kept.
            self._clients.clear()

    def __repr__(self):
        """Return the class name followed by the registered clients."""
        return f"{self.__class__.__name__} with registered agents: {self._clients}"