# Source code for filip.utils.cleanup

"""
Functions to clean up a tenant within a fiware based platform.
"""
import warnings
from functools import wraps

from pydantic import AnyHttpUrl, AnyUrl
from requests import RequestException
from typing import Callable, List, Union
from filip.models import FiwareHeader
from filip.clients.ngsi_v2 import \
    ContextBrokerClient, \
    IoTAClient, \
    QuantumLeapClient


def clear_context_broker(url: str = None,
                         fiware_header: FiwareHeader = None,
                         clear_registrations: bool = False,
                         cb_client: ContextBrokerClient = None):
    """
    Remove all entities and subscriptions (and optionally all
    registrations) of a tenant from the context broker.

    For a TLS connection pass a ready-made ``cb_client`` that was created
    with a Session object holding the certificate and private key.

    Note:
        Always clear the devices first because the IoT-Agent will otherwise
        throw errors if it cannot find its registration anymore.

    Args:
        url: Url of the context broker service. Only used when no
            ``cb_client`` is given.
        fiware_header: Header of the tenant. Only used when no ``cb_client``
            is given.
        clear_registrations: Determines whether registrations should be
            deleted. If registrations are deleted while devices with
            commands still exist, these devices become unreachable. Only
            set to true once such devices are cleared.
        cb_client: Client to use instead of creating one from ``url``;
            enables TLS communication if created with a Session object,
            only needed for self-signed certificates.

    Returns:
        None

    Raises:
        AssertionError: If neither ``url`` nor ``cb_client`` is given, or if
            subscriptions/registrations are still listed after deletion.
    """
    assert url or cb_client, "Either url or client object must be given"

    # A caller-supplied client always wins over the url
    broker = cb_client if cb_client is not None else ContextBrokerClient(
        url=url, fiware_header=fiware_header)

    # Entities are removed in a single call
    existing_entities = broker.get_entity_list()
    broker.delete_entities(entities=existing_entities)

    # Subscriptions are removed one by one and the result is verified
    for subscription in broker.get_subscription_list():
        broker.delete_subscription(subscription_id=subscription.id)
    assert len(broker.get_subscription_list()) == 0

    # Registrations are only touched on explicit request
    if not clear_registrations:
        return
    for registration in broker.get_registration_list():
        broker.delete_registration(registration_id=registration.id)
    assert len(broker.get_registration_list()) == 0
def clear_iot_agent(url: Union[str, AnyHttpUrl] = None,
                    fiware_header: FiwareHeader = None,
                    iota_client: IoTAClient = None):
    """
    Remove all devices and device groups of a tenant from an IoT-Agent.

    For a TLS connection pass a ready-made ``iota_client`` that was created
    with a Session object holding the certificate and private key.

    Args:
        url: Url of the iot agent service. Only used when no
            ``iota_client`` is given.
        fiware_header: Header of the tenant. Only used when no
            ``iota_client`` is given.
        iota_client: Client to use instead of creating one from ``url``;
            enables TLS communication if created with a Session object,
            only needed for self-signed certificates.

    Returns:
        None

    Raises:
        AssertionError: If neither ``url`` nor ``iota_client`` is given, or
            if devices/groups are still listed after deletion.
    """
    assert url or iota_client, "Either url or client object must be given"

    # A caller-supplied client always wins over the url
    agent = iota_client if iota_client is not None else IoTAClient(
        url=url, fiware_header=fiware_header)

    # Devices go first, then verify that none is left
    for dev in agent.get_device_list():
        agent.delete_device(device_id=dev.device_id)
    assert len(agent.get_device_list()) == 0

    # Afterwards the device groups, identified by resource and apikey
    for grp in agent.get_group_list():
        agent.delete_group(resource=grp.resource, apikey=grp.apikey)
    assert len(agent.get_group_list()) == 0
def clear_quantumleap(url: str = None,
                      fiware_header: FiwareHeader = None,
                      ql_client: QuantumLeapClient = None):
    """
    Function deletes all data for a given fiware header. To use TLS
    connection you need to provide the ql_client parameter as an argument
    with the Session object including the certificate and private key.

    Args:
        url: Url of the quantumleap service. Only used when no ``ql_client``
            is given.
        fiware_header: Header of the tenant. Only used when no ``ql_client``
            is given.
        ql_client: Client to use instead of creating one from ``url``;
            enables TLS communication if created with a Session object,
            only needed for self-signed certificates.

    Returns:
        None

    Raises:
        AssertionError: If neither ``url`` nor ``ql_client`` is given.
        RequestException: Any request error raised while listing the
            entities that is not the "empty database" 404 answer, and any
            error raised while deleting an entity.
    """
    def handle_empty_db_exception(err: RequestException) -> None:
        """
        When the database is empty for request quantumleap returns a 404
        error with a error message. This will be handled here evaluating
        the empty database error as 'OK'. Every other error is re-raised
        unchanged.

        Args:
            err: exception raised while requesting the entities
        """
        response = err.response
        # Connection errors, timeouts etc. carry no response at all; they
        # must surface as they are instead of failing on ``None``.
        if response is None or response.status_code != 404:
            raise err
        try:
            payload = response.json()
        except ValueError:
            # A 404 without a JSON body is not the "empty database" answer
            payload = None
        if not isinstance(payload, dict) \
                or payload.get('error', None) != 'Not Found':
            raise err

    assert url or ql_client, "Either url or client object must be given"

    # create client
    if ql_client is None:
        client = QuantumLeapClient(url=url, fiware_header=fiware_header)
    else:
        client = ql_client

    # clear data; an empty database leaves the list empty
    entities = []
    try:
        entities = client.get_entities()
    except RequestException as err:
        handle_empty_db_exception(err)

    # will be executed for all found entities
    for entity in entities:
        client.delete_entity(entity_id=entity.entityId,
                             entity_type=entity.entityType)
def clear_all(*,
              fiware_header: FiwareHeader = None,
              cb_url: str = None,
              iota_url: Union[str, List[str]] = None,
              ql_url: str = None,
              cb_client: ContextBrokerClient = None,
              iota_client: IoTAClient = None,
              ql_client: QuantumLeapClient = None):
    """
    Clear every service for which a url or a client is provided.

    The IoT-Agent(s) are cleared first, then the context broker, then
    QuantumLeap. For the IoT-Agent, ``iota_url`` takes precedence: when it
    is given, ``iota_client`` is not used.

    Args:
        fiware_header: Header of the tenant, forwarded to every clean up
            function
        cb_url: url of the context broker service
        iota_url: url of the IoT-Agent service, or a list of such urls
        ql_url: url of the QuantumLeap service
        cb_client: enables TLS communication if created with Session object,
            only needed for self-signed certificates
        iota_client: enables TLS communication if created with Session
            object, only needed for self-signed certificates. An iterable
            of clients is accepted as well.
        ql_client: enables TLS communication if created with Session object,
            only needed for self-signed certificates

    Returns:
        None
    """
    # IoT-Agents: urls win over clients; single values are wrapped in a list
    if iota_url is not None:
        if isinstance(iota_url, (str, AnyUrl)):
            agent_urls = [iota_url]
        else:
            agent_urls = iota_url
        for agent_url in agent_urls:
            clear_iot_agent(url=agent_url, fiware_header=fiware_header)
    elif iota_client is not None:
        if isinstance(iota_client, IoTAClient):
            agent_clients = [iota_client]
        else:
            agent_clients = iota_client
        for agent_client in agent_clients:
            clear_iot_agent(fiware_header=fiware_header,
                            iota_client=agent_client)

    # Context broker
    if not (cb_url is None and cb_client is None):
        clear_context_broker(url=cb_url,
                             fiware_header=fiware_header,
                             cb_client=cb_client)

    # QuantumLeap
    if not (ql_url is None and ql_client is None):
        clear_quantumleap(url=ql_url,
                          fiware_header=fiware_header,
                          ql_client=ql_client)
def clean_test(*,
               fiware_service: str,
               fiware_servicepath: str,
               cb_url: str = None,
               iota_url: Union[str, List[str]] = None,
               ql_url: str = None,
               cb_client: ContextBrokerClient = None,
               iota_client: IoTAClient = None,
               ql_client: QuantumLeapClient = None) -> Callable:
    """
    Decorator to clean up the server before and after the test

    The clean up runs every time the decorated function is called: once
    right before it and once right after it returned. Nothing is cleaned
    while the decorator itself is being created.

    Note:
        This does not substitute a proper TearDown method, because a failing
        test will not execute the clean up after the error. Since this would
        mean an unnecessary error handling. We actually want a test to fail
        with proper messages.

    Args:
        fiware_service: tenant
        fiware_servicepath: tenant path
        cb_url: url of context broker service
        iota_url: url of IoT-Agent service
        ql_url: url of quantumleap service
        cb_client: enables TLS communication if created with Session object,
            only needed for self-signed certificates
        iota_client: enables TLS communication if created with Session
            object, only needed for self-signed certificates
        ql_client: enables TLS communication if created with Session object,
            only needed for self-signed certificates

    Returns:
        Decorator for clean tests
    """
    # Built once, so an invalid tenant is reported when the decorator is
    # created and not only when the test runs
    fiware_header = FiwareHeader(service=fiware_service,
                                 service_path=fiware_servicepath)

    def cleanup() -> None:
        """Clear all configured services for the tenant."""
        clear_all(fiware_header=fiware_header,
                  cb_url=cb_url,
                  iota_url=iota_url,
                  ql_url=ql_url,
                  cb_client=cb_client,
                  iota_client=iota_client,
                  ql_client=ql_client)

    # Inner decorator function
    def decorator(func):
        # Wrapper function for the decorated function
        @wraps(func)
        def wrapper(*args, **kwargs):
            cleanup()
            result = func(*args, **kwargs)
            # Deliberately no try/finally: a failing test must propagate
            # its own error and skips this second clean up (see Note)
            cleanup()
            return result
        return wrapper

    return decorator