Coverage for filip/utils/validators.py: 96%
92 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-20 16:54 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-20 16:54 +0000
1"""
2Helper functions to prohibit boiler plate code
3"""
4import logging
5import re
6import warnings
7from aenum import Enum
8from typing import Dict, Any, List
9from pydantic import AnyHttpUrl, validate_call
10from pydantic_core import PydanticCustomError
11from filip.custom_types import AnyMqttUrl
12from pyjexl.jexl import JEXL
13from pyjexl.parser import Transform
14from pyjexl.exceptions import ParseError
16logger = logging.getLogger(name=__name__)
19class FiwareRegex(str, Enum):
20 """
21 Collection of Regex expression used to check if the value of a Pydantic
22 field, can be used in the related Fiware field.
23 """
24 _init_ = 'value __doc__'
26 standard = r"(^((?![?&#/\"' ])[\x00-\x7F])*$)", \
27 "Prevents any string that contains at least one of the " \
28 "symbols: ? & # / ' \" or a whitespace"
29 string_protect = r"(?!^id$)(?!^type$)(?!^geo:location$)" \
30 r"(^((?![?&#/\"' ])[\x00-\x7F])*$)",\
31 "Prevents any string that contains at least one of " \
32 "the symbols: ? & # / ' \" or a whitespace." \
33 "AND the strings: id, type, geo:location"
36@validate_call
37def validate_http_url(url: AnyHttpUrl) -> str:
38 """
39 Function checks whether the host has "http" added in case of http as
40 protocol.
42 Args:
43 url (AnyHttpUrl): the url for the host / port
45 Returns:
46 validated url
47 """
48 return str(url) if url else url
51@validate_call
52def validate_mqtt_url(url: AnyMqttUrl) -> str:
53 """
54 Function that checks whether a url is valid mqtt endpoint
56 Args:
57 url: the url for the target endpoint
59 Returns:
60 validated url
61 """
62 return str(url) if url else url
65def validate_escape_character_free(value: Any) -> Any:
66 """
67 Function that checks whether a value contains a string part that starts
68 or end with ' or ".
69 the function iterates to break down each complex data-structure to its
70 fundamental string parts.
71 Each value of a list is examined
72 Of dictionaries each value is examined, keys are skipped, as they are ok
73 for Fiware
75 Args:
76 value: the string to check
78 Returns:
79 validated string
80 """
82 if not isinstance(value, List):
83 values = [value]
84 else:
85 values = value
87 for value in values:
88 if isinstance(value, Dict):
89 for key, dict_value in value.items():
90 validate_escape_character_free(dict_value)
91 # it seems Fiware has no problem if the keys contain ' or "
92 # validate_escape_character_free(key)
93 elif isinstance(value, List):
94 for inner_list in value:
95 validate_escape_character_free(inner_list)
96 else:
97 # if a value here is not a string, it will also not contain ' or "
98 value = str(value)
99 if '"' == value[-1:] or '"' == value[0:1]:
100 raise ValueError(f"The value {value} contains "
101 f"the forbidden char \"")
102 if "'" == value[-1:] or "'" == value[0:1]:
103 raise ValueError(f"The value {value} contains "
104 f"the forbidden char '")
105 return values
108def match_regex(value: str, pattern: str):
109 regex = re.compile(pattern)
110 if not regex.match(value):
111 raise PydanticCustomError(
112 'string_pattern_mismatch',
113 "String should match pattern '{pattern}'",
114 {'pattern': pattern},
115 )
116 return value
119def ignore_none_input(func):
120 def wrapper(arg):
121 if arg is None:
122 return arg
123 return func(arg)
124 return wrapper
127def validate_fiware_standard_regex(vale: str):
128 return match_regex(vale, FiwareRegex.standard.value)
131def validate_fiware_string_protect_regex(vale: str):
132 return match_regex(vale, FiwareRegex.string_protect.value)
135@ignore_none_input
136def validate_mqtt_topic(topic: str):
137 return match_regex(topic, r'^((?![\'\"#+,])[\x00-\x7F])*$')
140@ignore_none_input
141def validate_fiware_datatype_standard(_type):
142 from filip.models.base import DataType
143 if isinstance(_type, DataType):
144 return _type
145 elif isinstance(_type, str):
146 return validate_fiware_standard_regex(_type)
147 else:
148 raise TypeError(f"Invalid type {type(_type)}")
151@ignore_none_input
152def validate_fiware_datatype_string_protect(_type):
153 from filip.models.base import DataType
154 if isinstance(_type, DataType):
155 return _type
156 elif isinstance(_type, str):
157 return validate_fiware_string_protect_regex(_type)
158 else:
159 raise TypeError(f"Invalid type {type(_type)}")
162@ignore_none_input
163def validate_fiware_service_path(service_path):
164 return match_regex(service_path,
165 r'^((\/\w*)|(\/\#))*(\,((\/\w*)|(\/\#)))*$')
168@ignore_none_input
169def validate_fiware_service(service):
170 return match_regex(service,
171 r"\w*$")
174jexl_transformation_functions = {
175 "jsonparse": "(str) => JSON.parse(str)",
176 "jsonstringify": "(obj) => JSON.stringify(obj)",
177 "indexOf": "(val, char) => String(val).indexOf(char)",
178 "length": "(val) => String(val).length",
179 "trim": "(val) => String(val).trim()",
180 "substr": "(val, int1, int2) => String(val).substr(int1, int2)",
181 "addreduce": "(arr) => arr.reduce((i, v) => i + v)",
182 "lengtharray": "(arr) => len(arr)",
183 "typeof": "(val) => typeof val",
184 "isarray": "(arr) => Array.isArray(arr)",
185 "isnan": "(val) => isNaN(val)",
186 "parseint": "(val) => parseInt(val)",
187 "parsefloat": "(val) => parseFloat(val)",
188 "toisodate": "(val) => new Date(val).toISOString()",
189 "timeoffset": "(isostr) => new Date(isostr).getTimezoneOffset()",
190 "tostring": "(val) => str(val)",
191 "urlencode": "(val) => encodeURI(val)",
192 "urldecode": "(val) => decodeURI(val)",
193 "replacestr": "(str, from, to) => str.replace(from, to)",
194 "replaceregexp": "(str, reg, to) => str.replace(reg, to)",
195 "replaceallstr": "(str, from, to) => str.replace(from, to)",
196 "replaceallregexp": "(str, reg, to) => str.replace(reg, to)",
197 "split": "(str, ch) => str.split(ch)",
198 "mapper": "(val, values, choices) => choices[values.index(val)]",
199 "thmapper": "(val, values, choices) => choices[next((i for i, v in enumerate(values) if val <= v), None)]",
200 "bitwisemask": "(i, mask, op, shf) => ((int(i) & mask) if op == '&' else ((int(i) | mask) if op == '|' else ((int(i) ^ mask) if op == '^' else int(i))) >> shf)",
201 "slice": "(arr, init, end) => arr[init:end]",
202 "addset": "(arr, x) => list(set(arr).add(x))",
203 "removeset": "(arr, x) => list(set(arr).remove(x))",
204 "touppercase": "(val) => str(val).upper()",
205 "tolowercase": "(val) => str(val).lower()"
206}
209def validate_jexl_expression(expression, attribute_name, device_id):
210 try:
211 jexl_expression = JEXL().parse(expression)
212 if isinstance(jexl_expression, Transform):
213 if jexl_expression.name not in jexl_transformation_functions.keys():
214 warnings.warn(f"{jexl_expression.name} might not supported")
215 except ParseError:
216 msg = f"Invalid JEXL expression '{expression}' inside the attribute '{attribute_name}' of Device '{device_id}'."
217 if '|' in expression:
218 msg += " If the expression contains the transform operator '|' you need to remove the spaces around it."
219 raise ParseError(msg)
220 return expression
223def validate_expression_language(cls, expressionLanguage):
224 if expressionLanguage == "legacy":
225 warnings.warn(f"Using 'LEGACY' expression language inside {cls.__name__} is "
226 f"deprecated. Use 'JEXL' instead.")
227 elif expressionLanguage is None:
228 expressionLanguage = "jexl"
229 return expressionLanguage