Coverage for filip/models/ngsi_v2/units.py: 87%

121 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-11-20 16:54 +0000

1""" 

2Implementation of UN/CEFACT units 

3 

4We create the data set of UNECE units from here. 

5"https://github.com/datasets/unece-units-of-measure" 

6It downloads the data and stores it in external resources if not 

7already present. For additional information on UNECE and the current state of 

8tables visit this website: 

9https://unece.org/trade/cefact/UNLOCODE-Download 

10https://unece.org/trade/uncefact/cl-recommendations 

11""" 

12import json 

13import logging 

14import pandas as pd 

15from functools import lru_cache 

16from rapidfuzz import process 

17from typing import Any, Dict, List, Optional, Union 

18from typing_extensions import Literal 

19from pydantic import field_validator, model_validator, ConfigDict, BaseModel, Field 

20from filip.models.base import NgsiVersion, DataType 

21from filip.utils.data import load_datapackage 

22 

23 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

25 

26 

@lru_cache()
def load_units() -> pd.DataFrame:
    """
    Fetch the UN/CEFACT units-of-measure table and drop deprecated rows.

    The data package is obtained through ``load_datapackage``. The result is
    memoised by ``lru_cache``, so repeated calls reuse the same data frame.

    Returns:
        Data frame with all units whose ``Status`` is neither 'x' nor 'd'
        (compared case-insensitively)
    """
    package = load_datapackage(
        url="https://github.com/datasets/unece-units-of-measure",
        package_name="unece-units")
    table = package["units_of_measure"]

    # remove deprecated entries: compute the case-folded status once and
    # keep only the rows that carry neither of the two flags
    status = table.Status.str.casefold()
    keep = (status != 'x') & (status != 'd')
    return table.loc[keep]

43 

44 

class UnitCode(BaseModel):
    """
    The unit of measurement given using the UN/CEFACT Common Code (3 characters)
    or a URL. Other codes than the UN/CEFACT Common Code may be used with a
    prefix followed by a colon.
    https://schema.org/unitCode

    Note:
        Currently we only support the UN/CEFACT Common Codes
    """
    type: DataType = Field(default=DataType.TEXT,
                           # const=True,
                           description="Data type")
    value: str = Field(...,
                       title="Code of unit ",
                       description="UN/CEFACT Common Code (3 characters)",
                       min_length=2,
                       max_length=3)

    @field_validator('value')
    @classmethod
    def validate_code(cls, value):
        """
        Check that the code exists exactly once in the loaded unit table.

        The comparison is done against the upper-cased input, but the value
        is returned exactly as it was given.

        Args:
            value (str): Unit code to check

        Returns:
            The unchanged input value

        Raises:
            KeyError: if the code is not found exactly once in the table
        """
        units = load_units()
        if len(units.loc[units.CommonCode == value.upper()]) == 1:
            return value
        # The message is interpolated here; passing a '%s' template plus the
        # value as separate arguments (logging style) would leave the
        # placeholder unformatted in the exception.
        # NOTE(review): KeyError is kept for backward compatibility. Pydantic
        # only converts ValueError/AssertionError into a ValidationError, so
        # this exception presumably reaches the caller as a plain KeyError.
        raise KeyError(f"Code does not exist or is deprecated! '{value}'")

71 

72 

class UnitText(BaseModel):
    """
    A string or text indicating the unit of measurement. Useful if you cannot
    provide a standard unit code for unitCode.
    https://schema.org/unitText

    Note:
        We use the names of units of measurements from UN/CEFACT for validation
    """
    type: DataType = Field(
        default=DataType.TEXT,
        description="Data type")
    value: str = Field(
        ...,
        title="Name of unit of measurement",
        description="Verbose name of a unit using British "
                    "spelling in singular form, "
                    "e.g. 'newton second per metre'")

    @field_validator('value')
    @classmethod
    def validate_text(cls, value):
        """
        Accept the value if it matches a unit name case-insensitively.

        Args:
            value (str): Unit name to check

        Returns:
            The unchanged input value

        Raises:
            ValueError: if no unit name matches; the message lists up to
                five fuzzy-matched suggestions
        """
        units = load_units()
        folded = value.casefold()

        is_known = units.Name.str.casefold() == folded
        if is_known.any():
            return value

        # no exact match: collect close names to help the user
        matches = process.extract(
            query=folded,
            choices=units.Name.tolist(),
            score_cutoff=50,
            limit=5)
        suggestions = [match[0] for match in matches]
        raise ValueError(f"Invalid 'name' for unit! '{value}' \n "
                         f"Did you mean one of the following? \n "
                         f"{suggestions}")

107 

108 

class Unit(BaseModel):
    """
    Model for a unit definition

    Before field validation, the given name and/or code are looked up in the
    unit table returned by ``load_units`` and the remaining fields are
    completed from the matching entry.
    """
    model_config = ConfigDict(extra='ignore', populate_by_name=True)
    _ngsi_version: Literal[NgsiVersion.v2] = NgsiVersion.v2
    name: Optional[Union[str, UnitText]] = Field(
        alias="unitText",
        default=None,
        description="A string or text indicating the unit of measurement")
    code: Optional[Union[str, UnitCode]] = Field(
        alias="unitCode",
        default=None,
        description="The unit of measurement given using the UN/CEFACT "
                    "Common Code (3 characters)")
    description: Optional[str] = Field(
        default=None,
        alias="unitDescription",
        description="Verbose description of unit",
        max_length=350)
    symbol: Optional[str] = Field(
        default=None,
        alias="unitSymbol",
        description="The symbol used to represent the unit of measure as "
                    "in ISO 31 / 80000.")
    conversion_factor: Optional[str] = Field(
        default=None,
        alias="unitConversionFactor",
        description="The value used to convert units to the equivalent SI "
                    "unit when applicable.")

    @model_validator(mode="before")
    @classmethod
    def check_consistency(cls, values):
        """
        Validate and auto complete unit data based on the UN/CEFACT data

        The entry is looked up by the keys "name" and "code". Both may be
        given as plain strings, as dicts, or as ``UnitText`` / ``UnitCode``
        instances. The passed dict is updated in place.

        Args:
            values (dict): Values of all data fields

        Returns:
            values (dict): Validated data

        Raises:
            ValueError: if the code, the name, or their combination does not
                match an entry of the unit table
            AssertionError: if neither name nor code is provided
        """
        units = load_units()
        name = values.get("name")
        code = values.get("code")

        if isinstance(name, dict):
            name = UnitText.model_validate(name)
        if isinstance(code, dict):
            # Mirror the dict handling of 'name'. Only the raw value is
            # extracted so that an unknown code is reported by the lookup
            # below as a ValueError, like any other invalid code.
            code = code.get("value")
        if isinstance(code, UnitCode):
            code = code.value
        if isinstance(name, UnitText):
            name = name.value

        if code and name:
            idx = units.index[((units.CommonCode == code) &
                               (units.Name == name))]
            if idx.empty:
                raise ValueError(
                    f"Invalid combination of 'code' and 'name': "
                    f"'{code}', '{name}'")
        elif code:
            idx = units.index[(units.CommonCode == code)]
            if idx.empty:
                raise ValueError(f"Invalid 'code': '{code}'")
        elif name:
            idx = units.index[(units.Name == name)]
            if idx.empty:
                names = units.Name.tolist()
                suggestions = [match[0] for match in process.extract(
                    query=name.casefold(),
                    choices=names,
                    score_cutoff=50,
                    limit=5)]

                raise ValueError(f"Invalid 'name' for unit! '{name}' \n "
                                 f"Did you mean one of the following? \n "
                                 f"{suggestions}")
        else:
            raise AssertionError("'name' or 'code' must be provided!")

        # complete the data from the first matching table entry
        first = idx[0]
        values["code"] = UnitCode(value=units.CommonCode[first]).value
        values["name"] = UnitText(value=units.Name[first]).value
        values["symbol"] = units.Symbol[first]
        values["conversion_factor"] = units.ConversionFactor[first]
        # a description supplied by the caller takes precedence
        if not values.get("description"):
            values["description"] = units.Description[first]
        return values

195 

196 

class Units:
    """
    Class for easy accessing the data set of UNECE units from here.
    "https://github.com/datasets/unece-units-of-measure"
    """
    # unit table shared by all instances; loaded when the class is defined
    units = load_units()

    def __getattr__(self, item):
        """
        Return unit as attribute by name or code.

        Notes:
            Underscores will be substituted with whitespaces
        Args:
            item (str): name or code of the unit, written as an attribute

        Returns:
            Unit

        Raises:
            AttributeError: for special ("dunder") attribute names
            ValueError: if no unit matches the given name or code
        """
        # Protocol lookups such as hasattr() or
        # getattr(obj, "__deepcopy__", None) rely on AttributeError for
        # missing special attributes; they are never unit names.
        if item.startswith("__") and item.endswith("__"):
            raise AttributeError(item)
        item = item.casefold().replace('_', ' ')
        return self.__getitem__(item)

    @property
    def quantities(self):
        """
        Get list of units ordered by measured quantities

        Raises:
            NotImplementedError: always, because the data set does not
                contain the information about quantity
        """
        raise NotImplementedError("The used dataset does currently not "
                                  "contain the information about quantity")

    def __getitem__(self, item: str) -> Unit:
        """
        Get unit by name or code

        The code is compared upper-cased, the name case-insensitively.

        Args:
            item (str): name or code

        Returns:
            Unit

        Raises:
            ValueError: if no unit matches; the message lists up to five
                fuzzy-matched name suggestions
        """
        idx = self.units.index[((self.units.CommonCode == item.upper()) |
                                (self.units.Name.str.casefold() == item.casefold()))]
        if idx.empty:
            names = self.units.Name.tolist()
            suggestions = [match[0] for match in process.extract(
                query=item.casefold(),
                choices=names,
                score_cutoff=50,
                limit=5)]
            raise ValueError(f"Invalid 'name' for unit! '{item}' \n "
                             f"Did you mean one of the following? \n "
                             f"{suggestions}")

        return Unit(code=self.units.CommonCode[idx[0]])

    @classmethod
    def keys(cls, by_code: bool = False) -> List[str]:
        """
        Returns list of all unit names or codes

        Args:
            by_code (bool): if 'True' the keys will contain the unit codes
                instead of their names.

        Returns:
            List[str] containing the names or the codes
        """
        if by_code:
            return cls.units.CommonCode.to_list()
        return cls.units.Name.to_list()

    @property
    def names(self) -> List[str]:
        """
        Returns list of all unit names

        Returns:
            List[str] containing the names
        """
        return self.keys()

    @property
    def codes(self) -> List[str]:
        """
        Returns list of all unit codes

        Returns:
            List[str] containing the codes
        """
        return self.keys(by_code=True)

    def values(self) -> List[Unit]:
        """
        Get list of all units

        Returns:
            List[Unit] containing all units
        """
        return [Unit(code=code) for code in self.units.CommonCode]

    def get(self, item: str, default: Any = None):
        """
        Get unit by name or by code

        Args:
            item (str): name or code of unit
            default (Any): Default value to return if unit does not exist.
        Returns:
            Unit, or ``default`` if no unit matches
        """
        try:
            return self.__getitem__(item)
        except (KeyError, ValueError):
            # __getitem__ reports an unknown unit as ValueError; catching
            # only KeyError would never return the default.
            return default

313 

314 

def validate_unit_data(data: Dict) -> Dict:
    """
    Validate a unit-related metadata object with the matching model.

    The model is chosen by comparing ``data["name"]`` case-insensitively with
    'unit', 'unitText' and 'unitCode'. For 'unit' the type is set to 'Unit'
    and the value is replaced by the validated model. For the other two the
    dictionary is updated with the dumped fields of the validated model.
    The passed dictionary is modified in place.

    Args:
        data (Dict): Dictionary containing the metadata of an object

    Returns:
        Validated dictionary of metadata

    Raises:
        ValueError: if the name does not match any of the unit models
    """
    _unit_models = {'unit': Unit,
                    "unitText": UnitText,
                    "unitCode": UnitCode}
    # case-insensitive lookup table instead of looping over the models
    models_by_key = {label.casefold(): model
                     for label, model in _unit_models.items()}

    key = data.get("name", "").casefold()
    model = models_by_key.get(key)
    if model is None:
        raise ValueError(f"Invalid unit data found: \n "
                         f"{json.dumps(data, indent=2)}")

    if key == 'unit':
        data["type"] = 'Unit'
        data["value"] = model.model_validate(data["value"])
    else:
        data.update(model.model_validate(data).model_dump())
    return data