Coverage for filip/utils/validators.py: 96%

101 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-02-19 11:48 +0000

1""" 

2Helper functions to avoid boilerplate code 

3""" 

4 

5import logging 

6import re 

7import warnings 

8from aenum import Enum 

9from typing import Dict, Any, List 

10from pydantic import AnyHttpUrl, validate_call 

11from pydantic_core import PydanticCustomError 

12from filip.custom_types import AnyMqttUrl 

13from pyjexl.jexl import JEXL 

14from pyjexl.parser import Transform 

15from pyjexl.exceptions import ParseError 

16 

17logger = logging.getLogger(name=__name__) 

18 

19 

class FiwareRegex(str, Enum):
    """
    Collection of regular expressions used to check whether the value of a
    Pydantic field can be used in the related Fiware field. The regexes here
    are primarily defined based on the identifiers syntax restriction:
    https://fiware-orion.readthedocs.io/en/stable/orion-api.html#identifiers-syntax-restrictions

    Every member is declared as a ``(pattern, description)`` tuple.
    """

    # aenum feature: each member's tuple is unpacked into the attributes
    # named here, i.e. the first item becomes ``value`` (the pattern) and
    # the second item becomes the member's ``__doc__`` (the description).
    _init_ = "value __doc__"
    # Identifiers syntax restriction
    standard = (
        r"(^((?![?&#/\"' ])[\x00-\x7F])*$)",
        "Prevents any string that contains at least one of the "
        "symbols: ? & # / ' \" or a whitespace",
    )
    # Same character restriction as ``standard``; additionally the leading
    # negative lookaheads reject the exact strings id, type and geo:json.
    string_protect = (
        r"(?!^id$)(?!^type$)(?!^geo:json$)(^((?![?&#/\"' ])[\x00-\x7F])*$)",
        "Prevents any string that contains at least one of "
        "the symbols: ? & # / ' \" or a whitespace. "
        "AND the strings: id, type, geo:json",
    )
    # The pattern itself also rejects the exact strings id, type and
    # geo:json as well as the space character.
    attribute_name = (
        r"(?!^id$)(?!^type$)(?!^geo:json$)(^((?![\"'<>()=; §&/#?])[\x00-\x7F])*$)",
        "Prevents any string that contains at least one of the "
        "symbols: ( ) < > \" ' = ; § & / # ?",
    )
    attribute_value = (
        r"(^((?![\"'<>()=;])[\x00-\x7F])*$)",
        "Prevents any string that contains at least one of the "
        "symbols: ( ) < > \" ' = ; ",
    )

51 

52 

@validate_call
def validate_http_url(url: AnyHttpUrl) -> str:
    """
    Return the given http(s) url as a string that ends with a slash.

    The argument is annotated as ``AnyHttpUrl`` and the function is wrapped
    in pydantic's ``validate_call``, so the url is validated against that
    type before the body runs.

    Args:
        url (AnyHttpUrl): the url for the host / port

    Returns:
        the url as string, with a trailing "/" appended if it was missing
    """
    if url:
        url = str(url)
    # guarantee exactly one representation: always end with a slash
    return url if url[-1] == "/" else f"{url}/"

70 

71 

@validate_call
def validate_mqtt_url(url: AnyMqttUrl) -> str:
    """
    Return the given mqtt url as a string.

    The argument is annotated as ``AnyMqttUrl`` and the function is wrapped
    in pydantic's ``validate_call``, so the url is validated against that
    type before the body runs.

    Args:
        url: the url for the target endpoint

    Returns:
        ``str(url)`` for a truthy url, otherwise the url unchanged
    """
    if not url:
        return url
    return str(url)

84 

85 

def validate_escape_character_free(value: Any) -> Any:
    """
    Reject values whose string parts start or end with ' or ".

    Complex data structures are broken down recursively into their
    fundamental parts:

    - every element of a list is examined
    - of a dictionary every value is examined; keys are skipped, as they
      are ok for Fiware
    - anything else is converted with ``str`` and its first and last
      character are checked

    Args:
        value: the value (or list of values) to check

    Returns:
        The list of examined top-level values: ``value`` itself if it is a
        list, otherwise ``value`` wrapped in a one-element list.

    Raises:
        ValueError: if a string part starts or ends with ' or "
    """
    candidates = value if isinstance(value, List) else [value]

    for candidate in candidates:
        if isinstance(candidate, Dict):
            # only the values are checked, keys may contain ' or "
            for nested in candidate.values():
                validate_escape_character_free(nested)
            continue

        if isinstance(candidate, List):
            for nested in candidate:
                validate_escape_character_free(nested)
            continue

        # a non-string leaf is checked through its string representation
        text = str(candidate)
        if text.endswith('"') or text.startswith('"'):
            raise ValueError(f"The value {text} contains " f'the forbidden char "')
        if text.endswith("'") or text.startswith("'"):
            raise ValueError(f"The value {text} contains " f"the forbidden char '")

    return candidates

125 

126 

def match_regex(value: str, pattern: str):
    """
    Check that ``value`` matches ``pattern`` from the start of the string.

    Args:
        value: the string to check
        pattern: the regular expression, applied with ``re.match``

    Returns:
        ``value`` unchanged if it matches

    Raises:
        PydanticCustomError: of type ``string_pattern_mismatch`` if
            ``value`` does not match
    """
    if re.match(pattern, value):
        return value

    error_type = "string_pattern_mismatch"
    context = {
        "pattern": pattern,
        "error_type": error_type,
        "value": value,
    }
    raise PydanticCustomError(
        error_type,
        "String should match pattern '{pattern}', [type='{error_type}', input_value='{value}']",
        context,
    )

140 

141 

def ignore_none_input(func):
    """
    Decorator that lets ``None`` bypass a single-argument validator.

    Args:
        func: callable taking exactly one positional argument

    Returns:
        A wrapper that returns ``None`` untouched and otherwise forwards the
        argument to ``func``. The wrapper keeps the name and docstring of
        ``func``.
    """
    from functools import wraps

    # wraps() preserves __name__/__doc__ of the decorated validator so that
    # introspection and error output still refer to the real function
    @wraps(func)
    def wrapper(arg):
        if arg is None:
            return arg
        return func(arg)

    return wrapper

149 

150 

def validate_fiware_standard_regex(vale: str):
    """
    Check ``vale`` against the ``FiwareRegex.standard`` pattern.

    Args:
        vale: the string to check

    Returns:
        the result of ``match_regex`` for ``vale`` and that pattern
    """
    pattern = FiwareRegex.standard.value
    return match_regex(value=vale, pattern=pattern)

153 

154 

def validate_fiware_string_protect_regex(vale: str):
    """
    Check ``vale`` against the ``FiwareRegex.string_protect`` pattern.

    Args:
        vale: the string to check

    Returns:
        the result of ``match_regex`` for ``vale`` and that pattern
    """
    pattern = FiwareRegex.string_protect.value
    return match_regex(value=vale, pattern=pattern)

157 

158 

def validate_fiware_attribute_value_regex(vale: str):
    """
    Check ``vale`` against the ``FiwareRegex.attribute_value`` pattern.

    Args:
        vale: the string to check

    Returns:
        the result of ``match_regex`` for ``vale`` and that pattern
    """
    pattern = FiwareRegex.attribute_value.value
    return match_regex(value=vale, pattern=pattern)

161 

162 

def validate_fiware_attribute_name_regex(vale: str):
    """
    Check ``vale`` against the ``FiwareRegex.attribute_name`` pattern.

    Args:
        vale: the string to check

    Returns:
        the result of ``match_regex`` for ``vale`` and that pattern
    """
    pattern = FiwareRegex.attribute_name.value
    return match_regex(value=vale, pattern=pattern)

165 

166 

@ignore_none_input
def validate_mqtt_topic(topic: str):
    """
    Check an MQTT topic string; ``None`` is passed through unchecked.

    The pattern only admits characters in the range ``\\x00-\\x7F`` and
    rejects the characters ' " # + and ,

    Args:
        topic: the topic to check

    Returns:
        the result of ``match_regex`` for ``topic`` and the topic pattern
    """
    topic_pattern = r"^((?![\'\"#+,])[\x00-\x7F])*$"
    return match_regex(value=topic, pattern=topic_pattern)

170 

171 

@ignore_none_input
def validate_fiware_datatype_standard(_type):
    """
    Validate a datatype given either as ``DataType`` member or as string.

    ``None`` is passed through unchecked by the decorator.

    Args:
        _type: ``DataType`` instance or string

    Returns:
        ``_type`` itself for a ``DataType`` instance, otherwise the result
        of ``validate_fiware_standard_regex`` for the string

    Raises:
        TypeError: if ``_type`` is neither a ``DataType`` nor a string
    """
    # imported inside the function, as in the original code
    from filip.models.base import DataType

    # DataType is tested first so that its members are returned as they are
    if isinstance(_type, DataType):
        return _type
    if not isinstance(_type, str):
        raise TypeError(f"Invalid type {type(_type)}")
    return validate_fiware_standard_regex(_type)

182 

183 

@ignore_none_input
def validate_fiware_datatype_string_protect(_type):
    """
    Validate a datatype given either as ``DataType`` member or as string,
    using the string-protect pattern for strings.

    ``None`` is passed through unchecked by the decorator.

    Args:
        _type: ``DataType`` instance or string

    Returns:
        ``_type`` itself for a ``DataType`` instance, otherwise the result
        of ``validate_fiware_string_protect_regex`` for the string

    Raises:
        TypeError: if ``_type`` is neither a ``DataType`` nor a string
    """
    # imported inside the function, as in the original code
    from filip.models.base import DataType

    # DataType is tested first so that its members are returned as they are
    if isinstance(_type, DataType):
        return _type
    if not isinstance(_type, str):
        raise TypeError(f"Invalid type {type(_type)}")
    return validate_fiware_string_protect_regex(_type)

194 

195 

@ignore_none_input
def validate_fiware_service_path(service_path):
    """
    Check a Fiware service path; ``None`` is passed through unchecked.

    The pattern accepts a sequence of segments, each being "/" followed by
    word characters or the literal "/#", optionally followed by further
    such segments that are each introduced by a comma.

    Args:
        service_path: the service path to check

    Returns:
        the result of ``match_regex`` for ``service_path`` and the pattern
    """
    service_path_pattern = r"^((\/\w*)|(\/\#))*(\,((\/\w*)|(\/\#)))*$"
    return match_regex(value=service_path, pattern=service_path_pattern)

199 

200 

@ignore_none_input
def validate_fiware_service(service):
    """
    Check a Fiware service name; ``None`` is passed through unchecked.

    The pattern has no leading ``^``; it is still anchored at the start of
    the string because ``match_regex`` applies it with ``re.match``.

    Args:
        service: the service name to check

    Returns:
        the result of ``match_regex`` for ``service`` and the pattern
    """
    # NOTE(review): "$" also matches just before a trailing newline, so a
    # name ending in "\n" passes this check - confirm that this is intended.
    service_pattern = r"\w*$"
    return match_regex(value=service, pattern=service_pattern)

204 

205 

# Known JEXL transformation names mapped to a snippet string describing each
# transformation. Within this module only the keys are consulted (membership
# test in validate_jexl_expression); the snippets are plain strings.
jexl_transformation_functions = {
    # --- JSON ---
    "jsonparse": "(str) => JSON.parse(str)",
    "jsonstringify": "(obj) => JSON.stringify(obj)",
    # --- basic string helpers ---
    "indexOf": "(val, char) => String(val).indexOf(char)",
    "length": "(val) => String(val).length",
    "trim": "(val) => String(val).trim()",
    "substr": "(val, int1, int2) => String(val).substr(int1, int2)",
    # --- arrays / type inspection ---
    "addreduce": "(arr) => arr.reduce((i, v) => i + v)",
    "lengtharray": "(arr) => len(arr)",
    "typeof": "(val) => typeof val",
    "isarray": "(arr) => Array.isArray(arr)",
    "isnan": "(val) => isNaN(val)",
    # --- conversions ---
    "parseint": "(val) => parseInt(val)",
    "parsefloat": "(val) => parseFloat(val)",
    "toisodate": "(val) => new Date(val).toISOString()",
    "timeoffset": "(isostr) => new Date(isostr).getTimezoneOffset()",
    "tostring": "(val) => str(val)",
    "urlencode": "(val) => encodeURI(val)",
    "urldecode": "(val) => decodeURI(val)",
    # --- replace / split ---
    "replacestr": "(str, from, to) => str.replace(from, to)",
    "replaceregexp": "(str, reg, to) => str.replace(reg, to)",
    "replaceallstr": "(str, from, to) => str.replace(from, to)",
    "replaceallregexp": "(str, reg, to) => str.replace(reg, to)",
    "split": "(str, ch) => str.split(ch)",
    # --- mapping / bit operations ---
    "mapper": "(val, values, choices) => choices[values.index(val)]",
    "thmapper": (
        "(val, values, choices) => "
        "choices[next((i for i, v in enumerate(values) if val <= v), None)]"
    ),
    "bitwisemask": (
        "(i, mask, op, shf) => "
        "((int(i) & mask) if op == '&' else "
        "((int(i) | mask) if op == '|' else "
        "((int(i) ^ mask) if op == '^' else int(i))) >> shf)"
    ),
    # --- slicing / set-like operations ---
    "slice": "(arr, init, end) => arr[init:end]",
    "addset": "(arr, x) => list(set(arr).add(x))",
    "removeset": "(arr, x) => list(set(arr).remove(x))",
    # --- case conversion ---
    "touppercase": "(val) => str(val).upper()",
    "tolowercase": "(val) => str(val).lower()",
}

239 

240 

def validate_jexl_expression(expression, attribute_name, device_id):
    """
    Check that ``expression`` can be parsed as a JEXL expression.

    If the parsed expression is a ``Transform`` whose name is not a key of
    ``jexl_transformation_functions``, a warning is issued; the expression
    is still accepted.

    Args:
        expression: the JEXL expression to check
        attribute_name: name of the attribute holding the expression, only
            used in the error message
        device_id: id of the device holding the attribute, only used in the
            error message

    Returns:
        ``expression`` unchanged

    Raises:
        ParseError: if the expression cannot be parsed. The message names
            the attribute and the device; the original parser error is
            attached as the cause.
    """
    # keep the try body limited to the one call that can raise ParseError
    try:
        jexl_expression = JEXL().parse(expression)
    except ParseError as err:
        msg = f"Invalid JEXL expression '{expression}' inside the attribute '{attribute_name}' of Device '{device_id}'."
        if "|" in expression:
            msg += " If the expression contains the transform operator '|' you need to remove the spaces around it."
        raise ParseError(msg) from err

    if isinstance(jexl_expression, Transform):
        if jexl_expression.name not in jexl_transformation_functions:
            warnings.warn(f"{jexl_expression.name} might not be supported")
    return expression

253 

254 

def validate_expression_language(cls, expressionLanguage):
    """
    Normalize the expression language setting of a model.

    Args:
        cls: the class the value belongs to; only its ``__name__`` is used,
            in the deprecation warning
        expressionLanguage: the configured expression language or ``None``

    Returns:
        ``"jexl"`` if ``expressionLanguage`` is ``None``, otherwise
        ``expressionLanguage`` unchanged. For ``"legacy"`` a warning is
        issued before the value is returned.
    """
    if expressionLanguage is None:
        # fall back to jexl when nothing is configured
        return "jexl"

    if expressionLanguage == "legacy":
        deprecation_note = (
            f"Using 'LEGACY' expression language inside {cls.__name__} is "
            f"deprecated. Use 'JEXL' instead."
        )
        warnings.warn(deprecation_note)

    return expressionLanguage