Coverage for filip/utils/validators.py: 96%

92 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-11-20 16:54 +0000

1""" 

2Helper functions to prohibit boiler plate code 

3""" 

4import logging 

5import re 

6import warnings 

7from aenum import Enum 

8from typing import Dict, Any, List 

9from pydantic import AnyHttpUrl, validate_call 

10from pydantic_core import PydanticCustomError 

11from filip.custom_types import AnyMqttUrl 

12from pyjexl.jexl import JEXL 

13from pyjexl.parser import Transform 

14from pyjexl.exceptions import ParseError 

15 

16logger = logging.getLogger(name=__name__) 

17 

18 

19class FiwareRegex(str, Enum): 

20 """ 

21 Collection of Regex expression used to check if the value of a Pydantic 

22 field, can be used in the related Fiware field. 

23 """ 

24 _init_ = 'value __doc__' 

25 

26 standard = r"(^((?![?&#/\"' ])[\x00-\x7F])*$)", \ 

27 "Prevents any string that contains at least one of the " \ 

28 "symbols: ? & # / ' \" or a whitespace" 

29 string_protect = r"(?!^id$)(?!^type$)(?!^geo:location$)" \ 

30 r"(^((?![?&#/\"' ])[\x00-\x7F])*$)",\ 

31 "Prevents any string that contains at least one of " \ 

32 "the symbols: ? & # / ' \" or a whitespace." \ 

33 "AND the strings: id, type, geo:location" 

34 

35 

36@validate_call 

37def validate_http_url(url: AnyHttpUrl) -> str: 

38 """ 

39 Function checks whether the host has "http" added in case of http as 

40 protocol. 

41 

42 Args: 

43 url (AnyHttpUrl): the url for the host / port 

44 

45 Returns: 

46 validated url 

47 """ 

48 return str(url) if url else url 

49 

50 

51@validate_call 

52def validate_mqtt_url(url: AnyMqttUrl) -> str: 

53 """ 

54 Function that checks whether a url is valid mqtt endpoint 

55 

56 Args: 

57 url: the url for the target endpoint 

58 

59 Returns: 

60 validated url 

61 """ 

62 return str(url) if url else url 

63 

64 

65def validate_escape_character_free(value: Any) -> Any: 

66 """ 

67 Function that checks whether a value contains a string part that starts 

68 or end with ' or ". 

69 the function iterates to break down each complex data-structure to its 

70 fundamental string parts. 

71 Each value of a list is examined 

72 Of dictionaries each value is examined, keys are skipped, as they are ok 

73 for Fiware 

74 

75 Args: 

76 value: the string to check 

77 

78 Returns: 

79 validated string 

80 """ 

81 

82 if not isinstance(value, List): 

83 values = [value] 

84 else: 

85 values = value 

86 

87 for value in values: 

88 if isinstance(value, Dict): 

89 for key, dict_value in value.items(): 

90 validate_escape_character_free(dict_value) 

91 # it seems Fiware has no problem if the keys contain ' or " 

92 # validate_escape_character_free(key) 

93 elif isinstance(value, List): 

94 for inner_list in value: 

95 validate_escape_character_free(inner_list) 

96 else: 

97 # if a value here is not a string, it will also not contain ' or " 

98 value = str(value) 

99 if '"' == value[-1:] or '"' == value[0:1]: 

100 raise ValueError(f"The value {value} contains " 

101 f"the forbidden char \"") 

102 if "'" == value[-1:] or "'" == value[0:1]: 

103 raise ValueError(f"The value {value} contains " 

104 f"the forbidden char '") 

105 return values 

106 

107 

108def match_regex(value: str, pattern: str): 

109 regex = re.compile(pattern) 

110 if not regex.match(value): 

111 raise PydanticCustomError( 

112 'string_pattern_mismatch', 

113 "String should match pattern '{pattern}'", 

114 {'pattern': pattern}, 

115 ) 

116 return value 

117 

118 

119def ignore_none_input(func): 

120 def wrapper(arg): 

121 if arg is None: 

122 return arg 

123 return func(arg) 

124 return wrapper 

125 

126 

127def validate_fiware_standard_regex(vale: str): 

128 return match_regex(vale, FiwareRegex.standard.value) 

129 

130 

131def validate_fiware_string_protect_regex(vale: str): 

132 return match_regex(vale, FiwareRegex.string_protect.value) 

133 

134 

135@ignore_none_input 

136def validate_mqtt_topic(topic: str): 

137 return match_regex(topic, r'^((?![\'\"#+,])[\x00-\x7F])*$') 

138 

139 

140@ignore_none_input 

141def validate_fiware_datatype_standard(_type): 

142 from filip.models.base import DataType 

143 if isinstance(_type, DataType): 

144 return _type 

145 elif isinstance(_type, str): 

146 return validate_fiware_standard_regex(_type) 

147 else: 

148 raise TypeError(f"Invalid type {type(_type)}") 

149 

150 

151@ignore_none_input 

152def validate_fiware_datatype_string_protect(_type): 

153 from filip.models.base import DataType 

154 if isinstance(_type, DataType): 

155 return _type 

156 elif isinstance(_type, str): 

157 return validate_fiware_string_protect_regex(_type) 

158 else: 

159 raise TypeError(f"Invalid type {type(_type)}") 

160 

161 

162@ignore_none_input 

163def validate_fiware_service_path(service_path): 

164 return match_regex(service_path, 

165 r'^((\/\w*)|(\/\#))*(\,((\/\w*)|(\/\#)))*$') 

166 

167 

168@ignore_none_input 

169def validate_fiware_service(service): 

170 return match_regex(service, 

171 r"\w*$") 

172 

173 

174jexl_transformation_functions = { 

175 "jsonparse": "(str) => JSON.parse(str)", 

176 "jsonstringify": "(obj) => JSON.stringify(obj)", 

177 "indexOf": "(val, char) => String(val).indexOf(char)", 

178 "length": "(val) => String(val).length", 

179 "trim": "(val) => String(val).trim()", 

180 "substr": "(val, int1, int2) => String(val).substr(int1, int2)", 

181 "addreduce": "(arr) => arr.reduce((i, v) => i + v)", 

182 "lengtharray": "(arr) => len(arr)", 

183 "typeof": "(val) => typeof val", 

184 "isarray": "(arr) => Array.isArray(arr)", 

185 "isnan": "(val) => isNaN(val)", 

186 "parseint": "(val) => parseInt(val)", 

187 "parsefloat": "(val) => parseFloat(val)", 

188 "toisodate": "(val) => new Date(val).toISOString()", 

189 "timeoffset": "(isostr) => new Date(isostr).getTimezoneOffset()", 

190 "tostring": "(val) => str(val)", 

191 "urlencode": "(val) => encodeURI(val)", 

192 "urldecode": "(val) => decodeURI(val)", 

193 "replacestr": "(str, from, to) => str.replace(from, to)", 

194 "replaceregexp": "(str, reg, to) => str.replace(reg, to)", 

195 "replaceallstr": "(str, from, to) => str.replace(from, to)", 

196 "replaceallregexp": "(str, reg, to) => str.replace(reg, to)", 

197 "split": "(str, ch) => str.split(ch)", 

198 "mapper": "(val, values, choices) => choices[values.index(val)]", 

199 "thmapper": "(val, values, choices) => choices[next((i for i, v in enumerate(values) if val <= v), None)]", 

200 "bitwisemask": "(i, mask, op, shf) => ((int(i) & mask) if op == '&' else ((int(i) | mask) if op == '|' else ((int(i) ^ mask) if op == '^' else int(i))) >> shf)", 

201 "slice": "(arr, init, end) => arr[init:end]", 

202 "addset": "(arr, x) => list(set(arr).add(x))", 

203 "removeset": "(arr, x) => list(set(arr).remove(x))", 

204 "touppercase": "(val) => str(val).upper()", 

205 "tolowercase": "(val) => str(val).lower()" 

206} 

207 

208 

209def validate_jexl_expression(expression, attribute_name, device_id): 

210 try: 

211 jexl_expression = JEXL().parse(expression) 

212 if isinstance(jexl_expression, Transform): 

213 if jexl_expression.name not in jexl_transformation_functions.keys(): 

214 warnings.warn(f"{jexl_expression.name} might not supported") 

215 except ParseError: 

216 msg = f"Invalid JEXL expression '{expression}' inside the attribute '{attribute_name}' of Device '{device_id}'." 

217 if '|' in expression: 

218 msg += " If the expression contains the transform operator '|' you need to remove the spaces around it." 

219 raise ParseError(msg) 

220 return expression 

221 

222 

223def validate_expression_language(cls, expressionLanguage): 

224 if expressionLanguage == "legacy": 

225 warnings.warn(f"Using 'LEGACY' expression language inside {cls.__name__} is " 

226 f"deprecated. Use 'JEXL' instead.") 

227 elif expressionLanguage is None: 

228 expressionLanguage = "jexl" 

229 return expressionLanguage