Coverage for agentlib/utils/broker.py: 80%

25 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-04-07 16:27 +0000

1""" 

2Module containing a basic Broker which 

3may be inherited by specialized classes. 

4""" 

5 

6import threading 

7 

8 

class Singleton(type):
    """Metaclass that hands out a single shared instance per class.

    The first call to a class using this metaclass constructs the instance
    and stores it in ``_instances``; every later call returns the stored
    object and ignores the arguments it was given.
    """

    # Maps each class using this metaclass to its one instance.
    _instances = {}
    # Serializes instance creation across threads. Re-entrant so that an
    # ``__init__`` which itself instantiates another Singleton-based class
    # does not deadlock.
    _lock = threading.RLock()

    def __call__(cls, *args, **kwargs):
        """Return the instance of ``cls``, creating it on first use.

        Args:
            *args: Positional arguments forwarded to the constructor on the
                first call only.
            **kwargs: Keyword arguments forwarded to the constructor on the
                first call only.

        Returns:
            The single stored instance of ``cls``.
        """
        # Fast path: once the instance exists no locking is needed.
        if cls not in cls._instances:
            with cls._lock:
                # Re-check under the lock: another thread may have created
                # the instance between the test above and acquiring it.
                if cls not in cls._instances:
                    cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]

18 

19 

class Broker(metaclass=Singleton):
    """Base broker that keeps track of a set of registered clients.

    The ``Singleton`` metaclass makes every call to the class return the
    same instance. All modifications of the client set are made while
    holding ``self.lock``.
    """

    def __init__(self):
        # Guards every modification of ``_clients``.
        self.lock = threading.Lock()
        # A set, so registering the same client twice keeps a single entry.
        self._clients = set()

    def register_client(self, client):
        """Add the given client to the set of clients.

        Args:
            client: Object to register; it must be hashable because it is
                stored in a set.
        """
        with self.lock:
            self._clients.add(client)

    def delete_client(self, client):
        """Remove the given client from the set of clients.

        Removing a client that is not registered is a no-op.

        Args:
            client: Object to remove.
        """
        with self.lock:
            # discard() ignores a missing element, so no KeyError handling
            # is needed.
            self._clients.discard(client)

    def delete_all_clients(self):
        """Remove every client from the set of clients."""
        with self.lock:
            self._clients.clear()

    def __repr__(self):
        """Return the class name followed by the registered clients.

        The client set is read without taking ``self.lock``.
        """
        return f"{self.__class__.__name__} with registered agents: {self._clients}"