Coverage for filip/utils/cleanup.py: 93%
86 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-02-19 11:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-02-19 11:48 +0000
1"""
2Functions to clean up a tenant within a fiware based platform.
3"""
5import warnings
6from functools import wraps
8from pydantic import AnyHttpUrl, AnyUrl
9from requests import RequestException
10from typing import Callable, List, Union
11from filip.models import FiwareHeader, FiwareLDHeader
12from filip.clients.ngsi_v2 import ContextBrokerClient, IoTAClient, QuantumLeapClient
13from filip.clients.ngsi_ld.cb import ContextBrokerLDClient
14from filip.models.ngsi_ld.context import ActionTypeLD
15import logging
17logger = logging.getLogger(__name__)
18logger.setLevel(logging.DEBUG)
21def clear_context_broker_ld(
22 url: str = None,
23 fiware_ld_header: FiwareLDHeader = None,
24 cb_ld_client: ContextBrokerLDClient = None,
25):
26 """
27 Function deletes all entities and subscriptions for a tenant in an LD context broker.
29 Args:
30 url: Url of the context broker LD
31 fiware_ld_header: header of the NGSI-LD tenant
32 cb_ld_client: NGSI-LD context broker client object
34 Returns:
36 """
37 assert url or cb_ld_client, "Either url or client object must be given"
38 # create client
39 if cb_ld_client is None:
40 client = ContextBrokerLDClient(url=url, fiware_header=fiware_ld_header)
41 else:
42 client = cb_ld_client
43 # clean entities iteratively
44 try:
45 entity_list = True
46 while entity_list:
47 entity_list = client.get_entity_list(limit=100)
48 if entity_list:
49 client.entity_batch_operation(
50 action_type=ActionTypeLD.DELETE, entities=entity_list
51 )
52 except RequestException as e:
53 logger.warning("Could not clean entities completely")
54 raise
56 # clean subscriptions
57 try:
58 sub_list = cb_ld_client.get_subscription_list()
59 for sub in sub_list:
60 cb_ld_client.delete_subscription(sub.id)
61 except RequestException as e:
62 logger.warning("Could not clean subscriptions completely")
63 raise
66def clear_context_broker(
67 url: str = None,
68 fiware_header: FiwareHeader = None,
69 clear_registrations: bool = False,
70 cb_client: ContextBrokerClient = None,
71):
72 """
73 Function deletes all entities, registrations and subscriptions for a
74 given fiware header. To use TLS connection you need to provide the cb_client parameter
75 as an argument with the Session object including the certificate and private key.
77 Note:
78 Always clear the devices first because the IoT-Agent will otherwise
79 through errors if it cannot find its registration anymore.
81 Args:
82 url: Url of the context broker service
83 fiware_header: header of the tenant
84 cb_client: enables TLS communication if created with Session object, only needed
85 for self-signed certificates
86 clear_registrations: Determines whether registrations should be deleted.
87 If registrations are deleted while devices with commands
88 still exist, these devices become unreachable.
89 Only set to true once such devices are cleared.
90 Returns:
91 None
92 """
93 assert url or cb_client, "Either url or client object must be given"
94 # create client
95 if cb_client is None:
96 client = ContextBrokerClient(url=url, fiware_header=fiware_header)
97 else:
98 client = cb_client
100 # clear registrations
101 if clear_registrations:
102 for reg in client.get_registration_list():
103 client.delete_registration(registration_id=reg.id)
104 assert len(client.get_registration_list()) == 0
106 # clean entities
107 client.delete_entities(entities=client.get_entity_list())
109 # clear subscriptions
110 for sub in client.get_subscription_list():
111 client.delete_subscription(subscription_id=sub.id)
112 assert len(client.get_subscription_list()) == 0
115def clear_iot_agent(
116 url: Union[str, AnyHttpUrl] = None,
117 fiware_header: FiwareHeader = None,
118 iota_client: IoTAClient = None,
119):
120 """
121 Function deletes all device groups and devices for a
122 given fiware header. To use TLS connection you need to provide the iota_client parameter
123 as an argument with the Session object including the certificate and private key.
125 Args:
126 url: Url of the iot agent service
127 fiware_header: header of the tenant
128 iota_client: enables TLS communication if created with Session object, only needed for self-signed certificates
130 Returns:
131 None
132 """
133 assert url or iota_client, "Either url or client object must be given"
134 # create client
135 if iota_client is None:
136 client = IoTAClient(url=url, fiware_header=fiware_header)
137 else:
138 client = iota_client
140 # clear registrations
141 for device in client.get_device_list():
142 client.delete_device(device_id=device.device_id)
143 assert len(client.get_device_list()) == 0
145 # clear groups
146 for group in client.get_group_list():
147 client.delete_group(resource=group.resource, apikey=group.apikey)
148 assert len(client.get_group_list()) == 0
151def clear_quantumleap(
152 url: str = None,
153 fiware_header: FiwareHeader = None,
154 ql_client: QuantumLeapClient = None,
155):
156 """
157 Function deletes all data for a given fiware header. To use TLS connection you need to provide the ql_client parameter
158 as an argument with the Session object including the certificate and private key.
159 Args:
160 url: Url of the quantumleap service
161 fiware_header: header of the tenant
162 ql_client: enables TLS communication if created with Session object, only needed for self-signed certificates
164 Returns:
165 None
166 """
168 def handle_emtpy_db_exception(err: RequestException) -> None:
169 """
170 When the database is empty for request quantumleap returns a 404
171 error with a error message. This will be handled here
172 evaluating the empty database error as 'OK'
174 Args:
175 err: exception raised by delete function
176 """
177 if (
178 err.response.status_code == 404
179 and err.response.json().get("error", None) == "Not Found"
180 ):
181 pass
182 else:
183 raise
185 assert url or ql_client, "Either url or client object must be given"
186 # create client
187 if ql_client is None:
188 client = QuantumLeapClient(url=url, fiware_header=fiware_header)
189 else:
190 client = ql_client
192 # clear data
193 entities = []
194 try:
195 entities = client.get_entities()
196 except RequestException as err:
197 handle_emtpy_db_exception(err)
199 # will be executed for all found entities
200 for entity in entities:
201 client.delete_entity(entity_id=entity.entityId, entity_type=entity.entityType)
204def clear_all(
205 *,
206 fiware_header: FiwareHeader = None,
207 cb_url: str = None,
208 iota_url: Union[str, List[str]] = None,
209 ql_url: str = None,
210 cb_client: ContextBrokerClient = None,
211 iota_client: IoTAClient = None,
212 ql_client: QuantumLeapClient = None
213):
214 """
215 Clears all services that a url is provided for.
216 If cb_url is provided, the registration will also be deleted.
218 Args:
219 fiware_header:
220 cb_url: url of the context broker service
221 iota_url: url of the IoT-Agent service
222 ql_url: url of the QuantumLeap service
223 cb_client: enables TLS communication if created with Session object, only needed
224 for self-signed certificates
225 iota_client: enables TLS communication if created with Session object, only needed
226 for self-signed certificates
227 ql_client: enables TLS communication if created with Session object, only needed
228 for self-signed certificates
230 Returns:
231 None
232 """
233 if iota_url is not None or iota_client is not None:
234 if iota_url is None:
235 # loop client
236 if isinstance(iota_client, IoTAClient):
237 iota_client = [iota_client]
238 for client in iota_client:
239 clear_iot_agent(fiware_header=fiware_header, iota_client=client)
240 else:
241 if isinstance(iota_url, (str, AnyUrl)):
242 iota_url = [iota_url]
243 for url in iota_url:
244 clear_iot_agent(url=url, fiware_header=fiware_header)
246 if cb_url is not None or cb_client is not None:
247 clear_context_broker(
248 url=cb_url,
249 fiware_header=fiware_header,
250 cb_client=cb_client,
251 clear_registrations=True,
252 )
254 if ql_url is not None or ql_client is not None:
255 clear_quantumleap(url=ql_url, fiware_header=fiware_header, ql_client=ql_client)
258def clean_test(
259 *,
260 fiware_service: str,
261 fiware_servicepath: str,
262 cb_url: str = None,
263 iota_url: Union[str, List[str]] = None,
264 ql_url: str = None,
265 cb_client: ContextBrokerClient = None,
266 iota_client: IoTAClient = None,
267 ql_client: QuantumLeapClient = None
268) -> Callable:
269 """
270 Decorator to clean up the server before and after the test
272 Note:
273 This does not substitute a proper TearDown method, because a failing
274 test will not execute the clean up after the error. Since this would
275 mean an unnecessary error handling. We actually want a test to fail
276 with proper messages.
278 Args:
279 fiware_service: tenant
280 fiware_servicepath: tenant path
281 cb_url: url of context broker service
282 iota_url: url of IoT-Agent service
283 ql_url: url of quantumleap service
284 cb_client: enables TLS communication if created with Session object, only needed for self-signed certificates
285 iota_client: enables TLS communication if created with Session object, only needed for self-signed certificates
286 ql_client: enables TLS communication if created with Session object, only needed for self-signed certificates
288 Returns:
289 Decorator for clean tests
290 """
291 fiware_header = FiwareHeader(
292 service=fiware_service, service_path=fiware_servicepath
293 )
294 clear_all(
295 fiware_header=fiware_header,
296 cb_url=cb_url,
297 iota_url=iota_url,
298 ql_url=ql_url,
299 cb_client=cb_client,
300 iota_client=iota_client,
301 ql_client=ql_client,
302 )
304 # Inner decorator function
305 def decorator(func):
306 # Wrapper function for the decorated function
307 @wraps(func)
308 def wrapper(*args, **kwargs):
309 return func(*args, **kwargs)
311 return wrapper
313 clear_all(
314 fiware_header=fiware_header,
315 cb_url=cb_url,
316 iota_url=iota_url,
317 ql_url=ql_url,
318 cb_client=cb_client,
319 iota_client=iota_client,
320 ql_client=ql_client,
321 )
323 return decorator