"""
Functions to clean up a tenant within a fiware based platform.
"""
import warnings
from functools import wraps
from pydantic import AnyHttpUrl, AnyUrl
from requests import RequestException
from typing import Callable, List, Union
from filip.models import FiwareHeader, FiwareLDHeader
from filip.clients.ngsi_v2 import ContextBrokerClient, IoTAClient, QuantumLeapClient
from filip.clients.ngsi_ld.cb import ContextBrokerLDClient
from filip.models.ngsi_ld.context import ActionTypeLD
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
[docs]def clear_context_broker_ld(
url: str = None,
fiware_ld_header: FiwareLDHeader = None,
cb_ld_client: ContextBrokerLDClient = None,
):
"""
Function deletes all entities and subscriptions for a tenant in an LD context broker.
Args:
url: Url of the context broker LD
fiware_ld_header: header of the NGSI-LD tenant
cb_ld_client: NGSI-LD context broker client object
Returns:
"""
assert url or cb_ld_client, "Either url or client object must be given"
# create client
if cb_ld_client is None:
client = ContextBrokerLDClient(url=url, fiware_header=fiware_ld_header)
else:
client = cb_ld_client
# clean entities iteratively
try:
entity_list = True
while entity_list:
entity_list = client.get_entity_list(limit=100)
if entity_list:
client.entity_batch_operation(
action_type=ActionTypeLD.DELETE, entities=entity_list
)
except RequestException as e:
logger.warning("Could not clean entities completely")
raise
# clean subscriptions
try:
sub_list = cb_ld_client.get_subscription_list()
for sub in sub_list:
cb_ld_client.delete_subscription(sub.id)
except RequestException as e:
logger.warning("Could not clean subscriptions completely")
raise
[docs]def clear_context_broker(
url: str = None,
fiware_header: FiwareHeader = None,
clear_registrations: bool = False,
cb_client: ContextBrokerClient = None,
):
"""
Function deletes all entities, registrations and subscriptions for a
given fiware header. To use TLS connection you need to provide the cb_client parameter
as an argument with the Session object including the certificate and private key.
Note:
Always clear the devices first because the IoT-Agent will otherwise
through errors if it cannot find its registration anymore.
Args:
url: Url of the context broker service
fiware_header: header of the tenant
cb_client: enables TLS communication if created with Session object, only needed
for self-signed certificates
clear_registrations: Determines whether registrations should be deleted.
If registrations are deleted while devices with commands
still exist, these devices become unreachable.
Only set to true once such devices are cleared.
Returns:
None
"""
assert url or cb_client, "Either url or client object must be given"
# create client
if cb_client is None:
client = ContextBrokerClient(url=url, fiware_header=fiware_header)
else:
client = cb_client
# clear registrations
if clear_registrations:
for reg in client.get_registration_list():
client.delete_registration(registration_id=reg.id)
assert len(client.get_registration_list()) == 0
# clean entities
client.delete_entities(entities=client.get_entity_list())
# clear subscriptions
for sub in client.get_subscription_list():
client.delete_subscription(subscription_id=sub.id)
assert len(client.get_subscription_list()) == 0
[docs]def clear_iot_agent(
url: Union[str, AnyHttpUrl] = None,
fiware_header: FiwareHeader = None,
iota_client: IoTAClient = None,
):
"""
Function deletes all device groups and devices for a
given fiware header. To use TLS connection you need to provide the iota_client parameter
as an argument with the Session object including the certificate and private key.
Args:
url: Url of the iot agent service
fiware_header: header of the tenant
iota_client: enables TLS communication if created with Session object, only needed for self-signed certificates
Returns:
None
"""
assert url or iota_client, "Either url or client object must be given"
# create client
if iota_client is None:
client = IoTAClient(url=url, fiware_header=fiware_header)
else:
client = iota_client
# clear registrations
for device in client.get_device_list():
client.delete_device(device_id=device.device_id)
assert len(client.get_device_list()) == 0
# clear groups
for group in client.get_group_list():
client.delete_group(resource=group.resource, apikey=group.apikey)
assert len(client.get_group_list()) == 0
[docs]def clear_quantumleap(
url: str = None,
fiware_header: FiwareHeader = None,
ql_client: QuantumLeapClient = None,
):
"""
Function deletes all data for a given fiware header. To use TLS connection you need to provide the ql_client parameter
as an argument with the Session object including the certificate and private key.
Args:
url: Url of the quantumleap service
fiware_header: header of the tenant
ql_client: enables TLS communication if created with Session object, only needed for self-signed certificates
Returns:
None
"""
def handle_emtpy_db_exception(err: RequestException) -> None:
"""
When the database is empty for request quantumleap returns a 404
error with a error message. This will be handled here
evaluating the empty database error as 'OK'
Args:
err: exception raised by delete function
"""
if (
err.response.status_code == 404
and err.response.json().get("error", None) == "Not Found"
):
pass
else:
raise
assert url or ql_client, "Either url or client object must be given"
# create client
if ql_client is None:
client = QuantumLeapClient(url=url, fiware_header=fiware_header)
else:
client = ql_client
# clear data
entities = []
try:
entities = client.get_entities()
except RequestException as err:
handle_emtpy_db_exception(err)
# will be executed for all found entities
for entity in entities:
client.delete_entity(entity_id=entity.entityId, entity_type=entity.entityType)
[docs]def clear_all(
*,
fiware_header: FiwareHeader = None,
cb_url: str = None,
iota_url: Union[str, List[str]] = None,
ql_url: str = None,
cb_client: ContextBrokerClient = None,
iota_client: IoTAClient = None,
ql_client: QuantumLeapClient = None
):
"""
Clears all services that a url is provided for.
If cb_url is provided, the registration will also be deleted.
Args:
fiware_header:
cb_url: url of the context broker service
iota_url: url of the IoT-Agent service
ql_url: url of the QuantumLeap service
cb_client: enables TLS communication if created with Session object, only needed
for self-signed certificates
iota_client: enables TLS communication if created with Session object, only needed
for self-signed certificates
ql_client: enables TLS communication if created with Session object, only needed
for self-signed certificates
Returns:
None
"""
if iota_url is not None or iota_client is not None:
if iota_url is None:
# loop client
if isinstance(iota_client, IoTAClient):
iota_client = [iota_client]
for client in iota_client:
clear_iot_agent(fiware_header=fiware_header, iota_client=client)
else:
if isinstance(iota_url, (str, AnyUrl)):
iota_url = [iota_url]
for url in iota_url:
clear_iot_agent(url=url, fiware_header=fiware_header)
if cb_url is not None or cb_client is not None:
clear_context_broker(
url=cb_url,
fiware_header=fiware_header,
cb_client=cb_client,
clear_registrations=True,
)
if ql_url is not None or ql_client is not None:
clear_quantumleap(url=ql_url, fiware_header=fiware_header, ql_client=ql_client)
[docs]def clean_test(
*,
fiware_service: str,
fiware_servicepath: str,
cb_url: str = None,
iota_url: Union[str, List[str]] = None,
ql_url: str = None,
cb_client: ContextBrokerClient = None,
iota_client: IoTAClient = None,
ql_client: QuantumLeapClient = None
) -> Callable:
"""
Decorator to clean up the server before and after the test
Note:
This does not substitute a proper TearDown method, because a failing
test will not execute the clean up after the error. Since this would
mean an unnecessary error handling. We actually want a test to fail
with proper messages.
Args:
fiware_service: tenant
fiware_servicepath: tenant path
cb_url: url of context broker service
iota_url: url of IoT-Agent service
ql_url: url of quantumleap service
cb_client: enables TLS communication if created with Session object, only needed for self-signed certificates
iota_client: enables TLS communication if created with Session object, only needed for self-signed certificates
ql_client: enables TLS communication if created with Session object, only needed for self-signed certificates
Returns:
Decorator for clean tests
"""
fiware_header = FiwareHeader(
service=fiware_service, service_path=fiware_servicepath
)
clear_all(
fiware_header=fiware_header,
cb_url=cb_url,
iota_url=iota_url,
ql_url=ql_url,
cb_client=cb_client,
iota_client=iota_client,
ql_client=ql_client,
)
# Inner decorator function
def decorator(func):
# Wrapper function for the decorated function
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
clear_all(
fiware_header=fiware_header,
cb_url=cb_url,
iota_url=iota_url,
ql_url=ql_url,
cb_client=cb_client,
iota_client=iota_client,
ql_client=ql_client,
)
return decorator