Coverage for filip/models/ngsi_v2/units.py: 87%

121 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-02-19 11:48 +0000

1""" 

2Implementation of UN/CEFACT units 

3 

4We create the data set of UNECE units from here.

5"https://github.com/datasets/unece-units-of-measure" 

6It downloads the data and stores it in external resources if not 

7already present. For additional information on UNECE and the current state of

8tables visit this website: 

9https://unece.org/trade/cefact/UNLOCODE-Download 

10https://unece.org/trade/uncefact/cl-recommendations 

11""" 

12 

13import json 

14import logging 

15import pandas as pd 

16from functools import lru_cache 

17from rapidfuzz import process 

18from typing import Any, Dict, List, Optional, Union 

19from typing_extensions import Literal 

20from pydantic import field_validator, model_validator, ConfigDict, BaseModel, Field 

21from filip.models.base import NgsiVersion, DataType 

22from filip.utils.data import load_datapackage 

23 

24 

25logger = logging.getLogger(name=__name__) 

26 

27 

@lru_cache()
def load_units() -> pd.DataFrame:
    """
    Fetch the UNECE units-of-measure data package and drop retired entries.

    The package is requested through ``load_datapackage``. Because the
    function is wrapped in ``lru_cache``, the body only runs on the first
    call; later calls receive the very same object again.

    Returns:
        Table of units whose ``Status`` is neither "x" nor "d"
        (compared case-insensitively)
    """
    package = load_datapackage(
        url="https://github.com/datasets/unece-units-of-measure",
        package_name="unece-units",
    )
    table = package["units_of_measure"]

    # entries flagged "x" or "d" in the Status column are treated as
    # deprecated and filtered out
    status = table.Status.str.casefold()
    still_valid = status.ne("x") & status.ne("d")
    return table.loc[still_valid]

45 

46 

class UnitCode(BaseModel):
    """
    The unit of measurement given using the UN/CEFACT Common Code (3 characters)
    or a URL. Other codes than the UN/CEFACT Common Code may be used with a
    prefix followed by a colon.
    https://schema.org/unitCode

    Note:
        Currently we only support the UN/CEFACT Common Codes
    """

    type: DataType = Field(
        default=DataType.TEXT,
        description="Data type",
    )
    value: str = Field(
        ...,
        title="Code of unit ",
        description="UN/CEFACT Common Code (3 characters)",
        min_length=2,
        max_length=3,
    )

    @field_validator("value")
    @classmethod
    def validate_code(cls, value):
        """
        Check that the code exists exactly once in the loaded unit table.

        The lookup is done with the upper-cased code, but the value is
        returned exactly as it was given (it is not normalised).

        Args:
            value (str): Candidate unit code

        Returns:
            str: The unchanged input value

        Raises:
            KeyError: If the code is not found exactly once in the table
        """
        units = load_units()
        if len(units.loc[units.CommonCode == value.upper()]) == 1:
            return value
        # NOTE(review): KeyError is kept for backward compatibility. Pydantic
        # only converts ValueError/AssertionError into a ValidationError, so
        # presumably this KeyError reaches the caller unwrapped - confirm
        # whether that is intended.
        raise KeyError(f"Code does not exist or is deprecated! '{value}'")

78 

79 

class UnitText(BaseModel):
    """
    Free-text name of a unit of measurement, for cases where no standard
    unit code can be given for unitCode.
    https://schema.org/unitText

    Note:
        The text is validated against the unit names of the UN/CEFACT data
    """

    type: DataType = Field(
        default=DataType.TEXT,
        description="Data type",
    )
    value: str = Field(
        ...,
        title="Name of unit of measurement",
        description="Verbose name of a unit using British "
        "spelling in singular form, "
        "e.g. 'newton second per metre'",
    )

    @field_validator("value")
    @classmethod
    def validate_text(cls, value):
        """
        Accept the text if it equals a known unit name, ignoring case.

        Args:
            value (str): Candidate unit name

        Returns:
            str: The unchanged input value

        Raises:
            ValueError: If no unit name matches; the message lists up to
                five fuzzy-matched name suggestions
        """
        table = load_units()
        wanted = value.casefold()

        same_name = table.Name.str.casefold() == wanted
        hits = table.loc[same_name]
        if len(hits) > 0:
            return value

        # no exact match: offer the closest known names instead
        candidates = process.extract(
            query=wanted, choices=table.Name.tolist(), score_cutoff=50, limit=5
        )
        suggestions = [candidate[0] for candidate in candidates]
        raise ValueError(
            f"Invalid 'name' for unit! '{value}' \n "
            f"Did you mean one of the following? \n "
            f"{suggestions}"
        )

122 

123 

class Unit(BaseModel):
    """
    Model for a unit definition

    A unit is identified by its name and/or its code. The remaining fields
    are filled from the loaded UN/CEFACT unit table during validation.
    """

    model_config = ConfigDict(extra="ignore", populate_by_name=True)
    _ngsi_version: Literal[NgsiVersion.v2] = NgsiVersion.v2
    name: Optional[Union[str, UnitText]] = Field(
        alias="unitText",
        default=None,
        description="A string or text indicating the unit of measurement",
    )
    code: Optional[Union[str, UnitCode]] = Field(
        alias="unitCode",
        default=None,
        description="The unit of measurement given using the UN/CEFACT "
        "Common Code (3 characters)",
    )
    description: Optional[str] = Field(
        default=None,
        alias="unitDescription",
        description="Verbose description of unit",
        max_length=350,
    )
    symbol: Optional[str] = Field(
        default=None,
        alias="unitSymbol",
        description="The symbol used to represent the unit of measure as "
        "in ISO 31 / 80000.",
    )
    conversion_factor: Optional[str] = Field(
        default=None,
        alias="unitConversionFactor",
        description="The value used to convert units to the equivalent SI "
        "unit when applicable.",
    )

    @model_validator(mode="before")
    @classmethod
    def check_consistency(cls, values):
        """
        Validate and auto complete unit data based on the UN/CEFACT data

        The unit is looked up by exact match of code and/or name. Both the
        field names (``name``, ``code``) and their aliases (``unitText``,
        ``unitCode``) are accepted as input keys; the field name wins if
        both are given. The input dict itself is not modified.

        Args:
            values (dict): Values of all data fields

        Returns:
            values (dict): Validated data, with code, name, symbol and
                conversion factor taken from the unit table. The
                description is only filled in if none was given.

        Raises:
            ValueError: If code, name or their combination is not found
            AssertionError: If neither name nor code is provided
        """
        # A "before" validator receives the raw input, which is not
        # guaranteed to be a dict. Leave anything else to pydantic's own
        # validation instead of failing on `.get`.
        if not isinstance(values, dict):
            return values

        units = load_units()
        # work on a shallow copy so the caller's dict stays untouched
        values = dict(values)

        name = values.get("name") or values.get("unitText")
        code = values.get("code") or values.get("unitCode")

        # unwrap model instances and their dict representation to plain str
        if isinstance(name, dict):
            name = UnitText.model_validate(name)
        if isinstance(name, UnitText):
            name = name.value
        if isinstance(code, dict):
            code = code.get("value")
        if isinstance(code, UnitCode):
            code = code.value

        if code and name:
            idx = units.index[((units.CommonCode == code) & (units.Name == name))]
            if idx.empty:
                raise ValueError(
                    f"Invalid combination of 'code' and 'name': {code}, {name}"
                )
        elif code:
            idx = units.index[(units.CommonCode == code)]
            if idx.empty:
                raise ValueError(f"Invalid 'code': {code}")
        elif name:
            idx = units.index[(units.Name == name)]
            if idx.empty:
                suggestions = [
                    match[0]
                    for match in process.extract(
                        query=name.casefold(),
                        choices=units.Name.tolist(),
                        score_cutoff=50,
                        limit=5,
                    )
                ]

                raise ValueError(
                    f"Invalid 'name' for unit! '{name}' \n "
                    f"Did you mean one of the following? \n "
                    f"{suggestions}"
                )
        else:
            raise AssertionError("'name' or 'code' must be provided!")

        row = idx[0]
        # The validated values are stored under the field names below; drop
        # the alias spellings so the raw input cannot shadow them.
        values.pop("unitText", None)
        values.pop("unitCode", None)
        values["code"] = UnitCode(value=units.CommonCode[row]).value
        values["name"] = UnitText(value=units.Name[row]).value
        values["symbol"] = units.Symbol[row]
        values["conversion_factor"] = units.ConversionFactor[row]
        if not (values.get("description") or values.get("unitDescription")):
            values["description"] = units.Description[row]
        return values

219 

220 

class Units:
    """
    Convenience accessor for the data set of UNECE units from here.
    "https://github.com/datasets/unece-units-of-measure"

    Units can be retrieved by name or code via attribute access
    (``units.newton_second_per_metre``), item access (``units["C58"]``)
    or ``get``.
    """

    # evaluated once when the class body is executed; shared by all instances
    units = load_units()

    def __getattr__(self, item):
        """
        Return unit as attribute by name or code.

        Notes:
            Underscores will be substituted with whitespaces

        Args:
            item (str): name or code of the unit, with underscores in place
                of whitespaces

        Returns:
            Unit

        Raises:
            AttributeError: For names starting with an underscore
            ValueError: If no unit matches the given name or code
        """
        # Names with a leading underscore are protocol/private probes (e.g.
        # from hasattr, copy or pickle), which expect AttributeError. They
        # would be looked up with a leading whitespace, which presumably
        # never matches a unit - verify against the data set.
        if item.startswith("_"):
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{item}'"
            )
        item = item.casefold().replace("_", " ")
        return self.__getitem__(item)

    @property
    def quantities(self):
        """
        Get list of units ordered by measured quantities

        Returns:
            list of units ordered by measured quantities

        Raises:
            NotImplementedError: Always, see message
        """
        raise NotImplementedError(
            "The used dataset does currently not "
            "contain the information about quantity"
        )

    def __getitem__(self, item: str) -> Unit:
        """
        Get unit by name or code

        Codes are compared upper-cased, names are compared ignoring case.

        Args:
            item (str): name or code

        Returns:
            Unit

        Raises:
            ValueError: If no unit matches; the message lists up to five
                fuzzy-matched name suggestions
        """
        idx = self.units.index[
            (
                (self.units.CommonCode == item.upper())
                | (self.units.Name.str.casefold() == item.casefold())
            )
        ]
        if idx.empty:
            names = self.units.Name.tolist()
            suggestions = [
                match[0]
                for match in process.extract(
                    query=item.casefold(), choices=names, score_cutoff=50, limit=5
                )
            ]
            raise ValueError(
                f"Invalid 'name' for unit! '{item}' \n "
                f"Did you mean one of the following? \n "
                f"{suggestions}"
            )

        return Unit(code=self.units.CommonCode[idx[0]])

    @classmethod
    def keys(cls, by_code: bool = False) -> List[str]:
        """
        Returns list of all unit names or codes

        Args:
            by_code (bool): if 'True' the keys will contain the unit codes
                instead of their names.

        Returns:
            List[str] containing the names or the codes
        """
        if by_code:
            return cls.units.CommonCode.to_list()
        return cls.units.Name.to_list()

    @property
    def names(self) -> List[str]:
        """
        Returns list of all unit names

        Returns:
            List[str] containing the names
        """
        return self.keys()

    @property
    def codes(self) -> List[str]:
        """
        Returns list of all unit codes

        Returns:
            List[str] containing the codes
        """
        return self.keys(by_code=True)

    def values(self) -> List[Unit]:
        """
        Get list of all units

        Returns:
            List[Unit] containing all units
        """

        return [Unit(code=code) for code in self.units.CommonCode]

    def get(self, item: str, default: Any = None):
        """
        Get unit by name or by code

        Args:
            item (str): name or code of unit
            default (Any): Default value to return if unit does not exist.

        Returns:
            Unit, or ``default`` if no unit matches
        """
        try:
            return self.__getitem__(item)
        # __getitem__ reports an unknown unit with ValueError; KeyError is
        # still caught for backward compatibility
        except (KeyError, ValueError):
            return default

347 

348 

def validate_unit_data(data: Dict) -> Dict:
    """
    Validate a metadata dictionary that describes a unit.

    The model is chosen by the case-insensitive ``name`` entry: ``unit``,
    ``unitText`` or ``unitCode``. The dictionary is updated in place and
    also returned.

    Args:
        data (Dict): Dictionary containing the metadata of an object

    Returns:
        Validated dictionary of metadata

    Raises:
        ValueError: If ``name`` does not match any of the unit models
    """
    kind = data.get("name", "").casefold()

    if kind == "unit":
        # a full unit: its "value" entry is replaced by a Unit model
        data["type"] = "Unit"
        data["value"] = Unit.model_validate(data["value"])
        return data

    flat_models = {"unittext": UnitText, "unitcode": UnitCode}
    if kind in flat_models:
        # text/code: the validated fields are merged back into the dict
        validated = flat_models[kind].model_validate(data)
        data.update(validated.model_dump())
        return data

    raise ValueError(f"Invalid unit data found: \n " f"{json.dumps(data, indent=2)}")