Coverage for filip/utils/validators.py: 96%
101 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-02-19 11:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-02-19 11:48 +0000
1"""
2Helper functions to prohibit boiler plate code
3"""
5import logging
6import re
7import warnings
8from aenum import Enum
9from typing import Dict, Any, List
10from pydantic import AnyHttpUrl, validate_call
11from pydantic_core import PydanticCustomError
12from filip.custom_types import AnyMqttUrl
13from pyjexl.jexl import JEXL
14from pyjexl.parser import Transform
15from pyjexl.exceptions import ParseError
17logger = logging.getLogger(name=__name__)
20class FiwareRegex(str, Enum):
21 """
22 Collection of Regex expression used to check if the value of a Pydantic
23 field, can be used in the related Fiware field. The regexes here are primarily
24 defined based on the identifiers syntax restriction:
25 https://fiware-orion.readthedocs.io/en/stable/orion-api.html#identifiers-syntax-restrictions
26 """
28 _init_ = "value __doc__"
29 # Identifiers syntax restriction
30 standard = (
31 r"(^((?![?&#/\"' ])[\x00-\x7F])*$)",
32 "Prevents any string that contains at least one of the "
33 "symbols: ? & # / ' \" or a whitespace",
34 )
35 string_protect = (
36 r"(?!^id$)(?!^type$)(?!^geo:json$)(^((?![?&#/\"' ])[\x00-\x7F])*$)",
37 "Prevents any string that contains at least one of "
38 "the symbols: ? & # / ' \" or a whitespace."
39 "AND the strings: id, type, geo:json",
40 )
41 attribute_name = (
42 r"(?!^id$)(?!^type$)(?!^geo:json$)(^((?![\"'<>()=; §&/#?])[\x00-\x7F])*$)",
43 "Prevents any string that contains at least one of the "
44 "symbols: ( ) < > \" ' = ; § & / # ?",
45 )
46 attribute_value = (
47 r"(^((?![\"'<>()=;])[\x00-\x7F])*$)",
48 "Prevents any string that contains at least one of the "
49 "symbols: ( ) < > \" ' = ; ",
50 )
53@validate_call
54def validate_http_url(url: AnyHttpUrl) -> str:
55 """
56 Function checks whether the host has "http" added in case of http as
57 protocol.
59 Args:
60 url (AnyHttpUrl): the url for the host / port
62 Returns:
63 validated url
64 """
65 url = str(url) if url else url
66 if url[-1] != "/":
67 # add trailing slash
68 url = f"{url}/"
69 return url
72@validate_call
73def validate_mqtt_url(url: AnyMqttUrl) -> str:
74 """
75 Function that checks whether a url is valid mqtt endpoint
77 Args:
78 url: the url for the target endpoint
80 Returns:
81 validated url
82 """
83 return str(url) if url else url
86def validate_escape_character_free(value: Any) -> Any:
87 """
88 Function that checks whether a value contains a string part that starts
89 or end with ' or ".
90 the function iterates to break down each complex data-structure to its
91 fundamental string parts.
92 Each value of a list is examined
93 Of dictionaries each value is examined, keys are skipped, as they are ok
94 for Fiware
96 Args:
97 value: the string to check
99 Returns:
100 validated string
101 """
103 if not isinstance(value, List):
104 values = [value]
105 else:
106 values = value
108 for value in values:
109 if isinstance(value, Dict):
110 for key, dict_value in value.items():
111 validate_escape_character_free(dict_value)
112 # it seems Fiware has no problem if the keys contain ' or "
113 # validate_escape_character_free(key)
114 elif isinstance(value, List):
115 for inner_list in value:
116 validate_escape_character_free(inner_list)
117 else:
118 # if a value here is not a string, it will also not contain ' or "
119 value = str(value)
120 if '"' == value[-1:] or '"' == value[0:1]:
121 raise ValueError(f"The value {value} contains " f'the forbidden char "')
122 if "'" == value[-1:] or "'" == value[0:1]:
123 raise ValueError(f"The value {value} contains " f"the forbidden char '")
124 return values
127def match_regex(value: str, pattern: str):
128 regex = re.compile(pattern)
129 if not regex.match(value):
130 raise PydanticCustomError(
131 "string_pattern_mismatch",
132 "String should match pattern '{pattern}', [type='{error_type}', input_value='{value}']",
133 {
134 "pattern": pattern,
135 "error_type": "string_pattern_mismatch",
136 "value": value,
137 },
138 )
139 return value
142def ignore_none_input(func):
143 def wrapper(arg):
144 if arg is None:
145 return arg
146 return func(arg)
148 return wrapper
151def validate_fiware_standard_regex(vale: str):
152 return match_regex(vale, FiwareRegex.standard.value)
155def validate_fiware_string_protect_regex(vale: str):
156 return match_regex(vale, FiwareRegex.string_protect.value)
159def validate_fiware_attribute_value_regex(vale: str):
160 return match_regex(vale, FiwareRegex.attribute_value.value)
163def validate_fiware_attribute_name_regex(vale: str):
164 return match_regex(vale, FiwareRegex.attribute_name.value)
167@ignore_none_input
168def validate_mqtt_topic(topic: str):
169 return match_regex(topic, r"^((?![\'\"#+,])[\x00-\x7F])*$")
172@ignore_none_input
173def validate_fiware_datatype_standard(_type):
174 from filip.models.base import DataType
176 if isinstance(_type, DataType):
177 return _type
178 elif isinstance(_type, str):
179 return validate_fiware_standard_regex(_type)
180 else:
181 raise TypeError(f"Invalid type {type(_type)}")
184@ignore_none_input
185def validate_fiware_datatype_string_protect(_type):
186 from filip.models.base import DataType
188 if isinstance(_type, DataType):
189 return _type
190 elif isinstance(_type, str):
191 return validate_fiware_string_protect_regex(_type)
192 else:
193 raise TypeError(f"Invalid type {type(_type)}")
196@ignore_none_input
197def validate_fiware_service_path(service_path):
198 return match_regex(service_path, r"^((\/\w*)|(\/\#))*(\,((\/\w*)|(\/\#)))*$")
201@ignore_none_input
202def validate_fiware_service(service):
203 return match_regex(service, r"\w*$")
206jexl_transformation_functions = {
207 "jsonparse": "(str) => JSON.parse(str)",
208 "jsonstringify": "(obj) => JSON.stringify(obj)",
209 "indexOf": "(val, char) => String(val).indexOf(char)",
210 "length": "(val) => String(val).length",
211 "trim": "(val) => String(val).trim()",
212 "substr": "(val, int1, int2) => String(val).substr(int1, int2)",
213 "addreduce": "(arr) => arr.reduce((i, v) => i + v)",
214 "lengtharray": "(arr) => len(arr)",
215 "typeof": "(val) => typeof val",
216 "isarray": "(arr) => Array.isArray(arr)",
217 "isnan": "(val) => isNaN(val)",
218 "parseint": "(val) => parseInt(val)",
219 "parsefloat": "(val) => parseFloat(val)",
220 "toisodate": "(val) => new Date(val).toISOString()",
221 "timeoffset": "(isostr) => new Date(isostr).getTimezoneOffset()",
222 "tostring": "(val) => str(val)",
223 "urlencode": "(val) => encodeURI(val)",
224 "urldecode": "(val) => decodeURI(val)",
225 "replacestr": "(str, from, to) => str.replace(from, to)",
226 "replaceregexp": "(str, reg, to) => str.replace(reg, to)",
227 "replaceallstr": "(str, from, to) => str.replace(from, to)",
228 "replaceallregexp": "(str, reg, to) => str.replace(reg, to)",
229 "split": "(str, ch) => str.split(ch)",
230 "mapper": "(val, values, choices) => choices[values.index(val)]",
231 "thmapper": "(val, values, choices) => choices[next((i for i, v in enumerate(values) if val <= v), None)]",
232 "bitwisemask": "(i, mask, op, shf) => ((int(i) & mask) if op == '&' else ((int(i) | mask) if op == '|' else ((int(i) ^ mask) if op == '^' else int(i))) >> shf)",
233 "slice": "(arr, init, end) => arr[init:end]",
234 "addset": "(arr, x) => list(set(arr).add(x))",
235 "removeset": "(arr, x) => list(set(arr).remove(x))",
236 "touppercase": "(val) => str(val).upper()",
237 "tolowercase": "(val) => str(val).lower()",
238}
241def validate_jexl_expression(expression, attribute_name, device_id):
242 try:
243 jexl_expression = JEXL().parse(expression)
244 if isinstance(jexl_expression, Transform):
245 if jexl_expression.name not in jexl_transformation_functions.keys():
246 warnings.warn(f"{jexl_expression.name} might not supported")
247 except ParseError:
248 msg = f"Invalid JEXL expression '{expression}' inside the attribute '{attribute_name}' of Device '{device_id}'."
249 if "|" in expression:
250 msg += " If the expression contains the transform operator '|' you need to remove the spaces around it."
251 raise ParseError(msg)
252 return expression
255def validate_expression_language(cls, expressionLanguage):
256 if expressionLanguage == "legacy":
257 warnings.warn(
258 f"Using 'LEGACY' expression language inside {cls.__name__} is "
259 f"deprecated. Use 'JEXL' instead."
260 )
261 elif expressionLanguage is None:
262 expressionLanguage = "jexl"
263 return expressionLanguage