Source code for filip.utils.validators

"""
Helper functions to avoid boilerplate code
"""
import logging
import re
import warnings
from aenum import Enum
from typing import Dict, Any, List
from pydantic import AnyHttpUrl, validate_call
from pydantic_core import PydanticCustomError
from filip.custom_types import AnyMqttUrl
from pyjexl.jexl import JEXL
from pyjexl.parser import Transform
from pyjexl.exceptions import ParseError

logger = logging.getLogger(name=__name__)


class FiwareRegex(str, Enum):
    """
    Collection of regular expressions used to check whether the value of a
    Pydantic field can be used in the related Fiware field.

    Each member is declared as a ``(pattern, description)`` tuple. The
    ``_init_`` declaration names the attributes these two elements are
    bound to (``value`` and ``__doc__``), following the aenum convention.
    """
    _init_ = 'value __doc__'

    # ASCII-only strings without ? & # / " ' or a space character
    standard = (
        r"(^((?![?&#/\"' ])[\x00-\x7F])*$)",
        "Prevents any string that contains at least one of the "
        "symbols: ? & # / ' \" or a whitespace",
    )
    # Same character rule as ``standard``; additionally rejects the exact
    # strings "id", "type" and "geo:location"
    string_protect = (
        r"(?!^id$)(?!^type$)(?!^geo:location$)"
        r"(^((?![?&#/\"' ])[\x00-\x7F])*$)",
        "Prevents any string that contains at least one of "
        "the symbols: ? & # / ' \" or a whitespace."
        "AND the strings: id, type, geo:location",
    )
@validate_call
def validate_http_url(url: AnyHttpUrl) -> str:
    """
    Validate an HTTP url and hand it back as a plain string.

    The ``validate_call`` decorator checks the argument against the
    ``AnyHttpUrl`` annotation before the body runs.

    Args:
        url (AnyHttpUrl): the url for the host / port

    Returns:
        ``str(url)`` when ``url`` is truthy, otherwise ``url`` unchanged
    """
    if url:
        return str(url)
    return url
@validate_call
def validate_mqtt_url(url: AnyMqttUrl) -> str:
    """
    Validate an MQTT endpoint url and hand it back as a plain string.

    The ``validate_call`` decorator checks the argument against the
    ``AnyMqttUrl`` annotation before the body runs.

    Args:
        url: the url for the target endpoint

    Returns:
        ``str(url)`` when ``url`` is truthy, otherwise ``url`` unchanged
    """
    if url:
        return str(url)
    return url
def validate_escape_character_free(value: Any) -> Any:
    """
    Check that no string part of ``value`` starts or ends with ' or ".

    Nested data structures are walked recursively down to their leaves:
    every element of a list is examined, and for dictionaries every value
    is examined while the keys are skipped. Leaves that are not strings
    are converted with ``str()`` before the check.

    Args:
        value: the value (string, list, dict or any other object) to check

    Returns:
        ``value`` itself if it is a list, otherwise a one-element list
        containing ``value``

    Raises:
        ValueError: if a leaf starts or ends with a single or double quote
    """
    # A non-list input is wrapped so the loop below handles both cases
    values = value if isinstance(value, List) else [value]

    for item in values:
        if isinstance(item, Dict):
            # it seems Fiware has no problem if the keys contain ' or ",
            # so only the dictionary values are validated
            for nested in item.values():
                validate_escape_character_free(nested)
            continue

        if isinstance(item, List):
            for nested in item:
                validate_escape_character_free(nested)
            continue

        # if an item here is not a string, it will also not contain ' or "
        text = str(item)
        if text.endswith('"') or text.startswith('"'):
            raise ValueError(f"The value {text} contains "
                             f"the forbidden char \"")
        if text.endswith("'") or text.startswith("'"):
            raise ValueError(f"The value {text} contains "
                             f"the forbidden char '")
    return values
def match_regex(value: str, pattern: str):
    """
    Check ``value`` against a regular expression.

    The pattern is applied with ``re.match``, i.e. it has to match starting
    at the beginning of ``value``.

    Args:
        value: the string to check
        pattern: the regular expression the string has to match

    Returns:
        ``value`` unchanged if it matches

    Raises:
        PydanticCustomError: of type ``string_pattern_mismatch`` if
            ``value`` does not match ``pattern``
    """
    if re.match(pattern, value) is None:
        raise PydanticCustomError(
            'string_pattern_mismatch',
            "String should match pattern '{pattern}'",
            {'pattern': pattern},
        )
    return value
def ignore_none_input(func):
    """
    Decorator that lets ``None`` bypass a single-argument function.

    Args:
        func: callable taking exactly one positional argument

    Returns:
        A wrapper that returns ``None`` directly when called with ``None``
        and otherwise returns ``func(arg)``. The wrapper keeps the name,
        docstring and other metadata of ``func``.
    """
    from functools import wraps

    # wraps() preserves __name__/__doc__/__wrapped__ so that decorated
    # validators stay identifiable in tracebacks and generated docs
    @wraps(func)
    def wrapper(arg):
        if arg is None:
            return arg
        return func(arg)

    return wrapper
def validate_fiware_standard_regex(vale: str):
    """
    Check a string against the ``FiwareRegex.standard`` pattern.

    Args:
        vale: the string to check

    Returns:
        the result of ``match_regex``, i.e. ``vale`` if it matches
    """
    standard_pattern = FiwareRegex.standard.value
    return match_regex(vale, standard_pattern)
def validate_fiware_string_protect_regex(vale: str):
    """
    Check a string against the ``FiwareRegex.string_protect`` pattern.

    Args:
        vale: the string to check

    Returns:
        the result of ``match_regex``, i.e. ``vale`` if it matches
    """
    protect_pattern = FiwareRegex.string_protect.value
    return match_regex(vale, protect_pattern)
@ignore_none_input
def validate_mqtt_topic(topic: str):
    """
    Check that an MQTT topic consists only of allowed characters.

    The pattern accepts ASCII characters only and rejects any topic
    containing one of the characters ' " # + or a comma. ``None`` is
    passed through unchecked by the ``ignore_none_input`` decorator.

    Args:
        topic: the topic string to check

    Returns:
        the result of ``match_regex``, i.e. ``topic`` if it matches
    """
    topic_pattern = r'^((?![\'\"#+,])[\x00-\x7F])*$'
    return match_regex(topic, topic_pattern)
@ignore_none_input
def validate_fiware_datatype_standard(_type):
    """
    Validate a data type given either as ``DataType`` member or as string.

    ``None`` is passed through unchecked by the ``ignore_none_input``
    decorator.

    Args:
        _type: ``DataType`` instance or string to validate

    Returns:
        ``_type`` unchanged if it is a ``DataType``; for other strings the
        result of ``validate_fiware_standard_regex``

    Raises:
        TypeError: if ``_type`` is neither a ``DataType`` nor a string
    """
    # NOTE(review): imported inside the function, presumably to avoid a
    # circular import with filip.models.base - confirm
    from filip.models.base import DataType

    # The DataType check must come before the generic str check
    if isinstance(_type, DataType):
        return _type
    if isinstance(_type, str):
        return validate_fiware_standard_regex(_type)
    raise TypeError(f"Invalid type {type(_type)}")
@ignore_none_input
def validate_fiware_datatype_string_protect(_type):
    """
    Validate a data type given either as ``DataType`` member or as string,
    using the stricter string-protect pattern for strings.

    ``None`` is passed through unchecked by the ``ignore_none_input``
    decorator.

    Args:
        _type: ``DataType`` instance or string to validate

    Returns:
        ``_type`` unchanged if it is a ``DataType``; for other strings the
        result of ``validate_fiware_string_protect_regex``

    Raises:
        TypeError: if ``_type`` is neither a ``DataType`` nor a string
    """
    # NOTE(review): imported inside the function, presumably to avoid a
    # circular import with filip.models.base - confirm
    from filip.models.base import DataType

    # The DataType check must come before the generic str check
    if isinstance(_type, DataType):
        return _type
    if isinstance(_type, str):
        return validate_fiware_string_protect_regex(_type)
    raise TypeError(f"Invalid type {type(_type)}")
@ignore_none_input
def validate_fiware_service_path(service_path):
    """
    Check the format of a Fiware service path.

    The pattern accepts any number of segments of the form ``/<word
    characters>`` or ``/#``, optionally followed by further such segments
    that are each introduced by a comma. ``None`` is passed through
    unchecked by the ``ignore_none_input`` decorator.

    Args:
        service_path: the service path string to check

    Returns:
        the result of ``match_regex``, i.e. ``service_path`` if it matches
    """
    path_pattern = r'^((\/\w*)|(\/\#))*(\,((\/\w*)|(\/\#)))*$'
    return match_regex(service_path, path_pattern)
@ignore_none_input
def validate_fiware_service(service):
    """
    Check that a Fiware service name consists of word characters only.

    ``None`` is passed through unchecked by the ``ignore_none_input``
    decorator.

    Args:
        service: the service name to check

    Returns:
        the result of ``match_regex``, i.e. ``service`` if it matches
    """
    service_pattern = r"\w*$"
    return match_regex(service, service_pattern)
# Known JEXL transformation names mapped to a textual description of each
# transformation. In the code visible here only the keys are consulted (by
# validate_jexl_expression, to warn about unknown transforms); the value
# strings are not evaluated.
jexl_transformation_functions = {
    "jsonparse": "(str) => JSON.parse(str)",
    "jsonstringify": "(obj) => JSON.stringify(obj)",
    "indexOf": "(val, char) => String(val).indexOf(char)",
    "length": "(val) => String(val).length",
    "trim": "(val) => String(val).trim()",
    "substr": "(val, int1, int2) => String(val).substr(int1, int2)",
    "addreduce": "(arr) => arr.reduce((i, v) => i + v)",
    "lengtharray": "(arr) => len(arr)",
    "typeof": "(val) => typeof val",
    "isarray": "(arr) => Array.isArray(arr)",
    "isnan": "(val) => isNaN(val)",
    "parseint": "(val) => parseInt(val)",
    "parsefloat": "(val) => parseFloat(val)",
    "toisodate": "(val) => new Date(val).toISOString()",
    "timeoffset": "(isostr) => new Date(isostr).getTimezoneOffset()",
    "tostring": "(val) => str(val)",
    "urlencode": "(val) => encodeURI(val)",
    "urldecode": "(val) => decodeURI(val)",
    "replacestr": "(str, from, to) => str.replace(from, to)",
    "replaceregexp": "(str, reg, to) => str.replace(reg, to)",
    "replaceallstr": "(str, from, to) => str.replace(from, to)",
    "replaceallregexp": "(str, reg, to) => str.replace(reg, to)",
    "split": "(str, ch) => str.split(ch)",
    "mapper": "(val, values, choices) => choices[values.index(val)]",
    "thmapper": "(val, values, choices) => choices[next((i for i, v in enumerate(values) if val <= v), None)]",
    "bitwisemask": "(i, mask, op, shf) => ((int(i) & mask) if op == '&' else ((int(i) | mask) if op == '|' else ((int(i) ^ mask) if op == '^' else int(i))) >> shf)",
    "slice": "(arr, init, end) => arr[init:end]",
    "addset": "(arr, x) => list(set(arr).add(x))",
    "removeset": "(arr, x) => list(set(arr).remove(x))",
    "touppercase": "(val) => str(val).upper()",
    "tolowercase": "(val) => str(val).lower()"
}
def validate_jexl_expression(expression, attribute_name, device_id):
    """
    Check that a JEXL expression can be parsed.

    If the parsed expression is a transform whose name is not listed in
    ``jexl_transformation_functions``, a warning is issued but the
    expression is still accepted.

    Args:
        expression: the JEXL expression string to check
        attribute_name: name of the attribute holding the expression; only
            used in the error message
        device_id: id of the device the attribute belongs to; only used in
            the error message

    Returns:
        ``expression`` unchanged if it could be parsed

    Raises:
        ParseError: if the expression cannot be parsed. The message names
            the attribute and device, and the original parser error is
            attached as the cause.
    """
    try:
        # Only the parse call is guarded, since it is the step that raises
        jexl_expression = JEXL().parse(expression)
    except ParseError as err:
        msg = (f"Invalid JEXL expression '{expression}' inside the attribute "
               f"'{attribute_name}' of Device '{device_id}'.")
        if '|' in expression:
            msg += (" If the expression contains the transform operator '|' "
                    "you need to remove the spaces around it.")
        # Chain explicitly so the original parser error is kept as the cause
        raise ParseError(msg) from err
    else:
        if isinstance(jexl_expression, Transform):
            if jexl_expression.name not in jexl_transformation_functions:
                warnings.warn(f"{jexl_expression.name} might not supported")
    return expression
def validate_expression_language(cls, expressionLanguage):
    """
    Normalise the expression language setting of a model.

    Args:
        cls: the class the setting belongs to; its ``__name__`` is used in
            the deprecation warning
        expressionLanguage: the configured expression language or ``None``

    Returns:
        ``"jexl"`` if ``expressionLanguage`` is ``None``, otherwise
        ``expressionLanguage`` unchanged. For ``"legacy"`` a warning is
        issued before the value is returned.
    """
    if expressionLanguage is None:
        # No language configured: fall back to JEXL
        return "jexl"

    if expressionLanguage == "legacy":
        message = (f"Using 'LEGACY' expression language inside {cls.__name__} is "
                   f"deprecated. Use 'JEXL' instead.")
        warnings.warn(message)
    return expressionLanguage