Coverage for filip/models/ngsi_v2/units.py: 87%
121 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-20 16:54 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-20 16:54 +0000
1"""
2Implementation of UN/CEFACT units
4We create the data set of UNECE units from here:
5"https://github.com/datasets/unece-units-of-measure"
6It downloads the data and stores it in external resources if not
7already present. For additional information on UNECE and the current state of
8tables visit this website:
9https://unece.org/trade/cefact/UNLOCODE-Download
10https://unece.org/trade/uncefact/cl-recommendations
11"""
12import json
13import logging
14import pandas as pd
15from functools import lru_cache
16from rapidfuzz import process
17from typing import Any, Dict, List, Optional, Union
18from typing_extensions import Literal
19from pydantic import field_validator, model_validator, ConfigDict, BaseModel, Field
20from filip.models.base import NgsiVersion, DataType
21from filip.utils.data import load_datapackage
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@lru_cache()
def load_units() -> pd.DataFrame:
    """
    Provide the table of UNECE units of measure without retired entries.

    The table is read through ``load_datapackage`` from the
    ``unece-units`` package. Because of ``lru_cache`` the result of the
    first call is reused by every later call.

    Returns:
        The ``units_of_measure`` table restricted to the rows whose
        ``Status`` is neither 'x' nor 'd' (compared case-insensitively).
    """
    package = load_datapackage(
        url="https://github.com/datasets/unece-units-of-measure",
        package_name="unece-units")
    table = package["units_of_measure"]

    # remove deprecated entries
    status = table.Status.str.casefold()
    is_retired = (status == 'x') | (status == 'd')
    return table.loc[~is_retired]
class UnitCode(BaseModel):
    """
    The unit of measurement given using the UN/CEFACT Common Code (3 characters)
    or a URL. Other codes than the UN/CEFACT Common Code may be used with a
    prefix followed by a colon.
    https://schema.org/unitCode

    Note:
        Currently we only support the UN/CEFACT Common Codes
    """
    type: DataType = Field(default=DataType.TEXT,
                           # const=True,
                           description="Data type")
    value: str = Field(...,
                       title="Code of unit ",
                       description="UN/CEFACT Common Code (3 characters)",
                       min_length=2,
                       max_length=3)

    @field_validator('value')
    @classmethod
    def validate_code(cls, value):
        """
        Check that the code occurs exactly once in the loaded units table.

        The comparison is done on the upper-cased code; the value itself is
        returned unchanged.

        Args:
            value (str): Candidate unit code

        Returns:
            str: The unchanged input value

        Raises:
            KeyError: If the code is not found exactly once in the table
        """
        units = load_units()
        if len(units.loc[units.CommonCode == value.upper()]) == 1:
            return value
        # The exception type is kept as KeyError for backward compatibility.
        # The message is now interpolated; previously the '%s' placeholder
        # was passed through literally because exceptions do not format
        # their arguments.
        raise KeyError(f"Code does not exist or is deprecated! '{value}'")
class UnitText(BaseModel):
    """
    Free-text name of a unit of measurement, for cases where no standard
    unit code can be given for unitCode.
    https://schema.org/unitText

    Note:
        The text is validated against the unit names of the UN/CEFACT table
    """
    type: DataType = Field(
        default=DataType.TEXT,
        # const=True,
        description="Data type",
    )
    value: str = Field(
        ...,
        title="Name of unit of measurement",
        description="Verbose name of a unit using British "
                    "spelling in singular form, "
                    "e.g. 'newton second per metre'",
    )

    @field_validator('value')
    @classmethod
    def validate_text(cls, value):
        """
        Accept the text if it equals a known unit name, ignoring case.

        Args:
            value (str): Candidate unit name

        Returns:
            str: The unchanged input value

        Raises:
            ValueError: If no unit name matches; the message lists up to
                five fuzzy-matched suggestions
        """
        units = load_units()
        wanted = value.casefold()

        name_matches = units.Name.str.casefold() == wanted
        if name_matches.any():
            return value

        # No match: collect close names to help the caller
        ranked = process.extract(
            query=wanted,
            choices=units.Name.tolist(),
            score_cutoff=50,
            limit=5)
        suggestions = [candidate[0] for candidate in ranked]
        raise ValueError(f"Invalid 'name' for unit! '{value}' \n "
                         f"Did you mean one of the following? \n "
                         f"{suggestions}")
class Unit(BaseModel):
    """
    Model for a unit definition
    """
    model_config = ConfigDict(extra='ignore', populate_by_name=True)
    _ngsi_version: Literal[NgsiVersion.v2] = NgsiVersion.v2
    name: Optional[Union[str, UnitText]] = Field(
        alias="unitText",
        default=None,
        description="A string or text indicating the unit of measurement")
    code: Optional[Union[str, UnitCode]] = Field(
        alias="unitCode",
        default=None,
        description="The unit of measurement given using the UN/CEFACT "
                    "Common Code (3 characters)")
    description: Optional[str] = Field(
        default=None,
        alias="unitDescription",
        description="Verbose description of unit",
        max_length=350)
    symbol: Optional[str] = Field(
        default=None,
        alias="unitSymbol",
        description="The symbol used to represent the unit of measure as "
                    "in ISO 31 / 80000.")
    conversion_factor: Optional[str] = Field(
        default=None,
        alias="unitConversionFactor",
        description="The value used to convert units to the equivalent SI "
                    "unit when applicable.")

    @model_validator(mode="before")
    @classmethod
    def check_consistency(cls, values):
        """
        Validate and auto complete unit data based on the UN/CEFACT data

        The unit is looked up in the loaded units table by exact match on
        'code' and/or 'name'. On success 'code', 'name', 'symbol' and
        'conversion_factor' are taken from the table; 'description' is only
        filled in when the input did not provide one.

        Args:
            values (dict): Values of all data fields, keyed by field name
                or by field alias

        Returns:
            values (dict): Validated and completed data (a new dict; the
                input mapping is left untouched)

        Raises:
            ValueError: If the given code, name, or their combination is
                not found in the table
            AssertionError: If neither 'name' nor 'code' is provided
        """
        # A "before" validator receives the raw input, which is not
        # necessarily a dict. Leave anything else to the regular
        # model validation.
        if not isinstance(values, dict):
            return values

        # Work on a copy so the caller's mapping is not modified
        values = dict(values)

        # The fields may be populated by name or by alias. Normalise alias
        # keys to field names so the lookup and the auto-completion below
        # always operate on the same keys.
        for field_name, field_info in cls.model_fields.items():
            alias = field_info.alias
            if alias and alias in values and field_name not in values:
                values[field_name] = values.pop(alias)

        units = load_units()
        name = values.get("name")
        code = values.get("code")

        # Both fields may arrive as plain strings, as raw dicts or as model
        # instances; reduce them to plain strings.
        if isinstance(name, dict):
            name = UnitText.model_validate(name)
        if isinstance(code, dict):
            try:
                code = UnitCode.model_validate(code)
            except KeyError as err:
                # UnitCode signals unknown codes with KeyError; re-raise as
                # ValueError like the other lookup failures of this method.
                raise ValueError(f"Invalid 'code': {code}") from err
        if isinstance(code, UnitCode):
            code = code.value
        if isinstance(name, UnitText):
            name = name.value

        if code and name:
            idx = units.index[((units.CommonCode == code) &
                               (units.Name == name))]
            if idx.empty:
                raise ValueError(
                    f"Invalid combination of 'code' and 'name': "
                    f"{code}, {name}")
        elif code:
            idx = units.index[(units.CommonCode == code)]
            if idx.empty:
                raise ValueError(f"Invalid 'code': {code}")
        elif name:
            idx = units.index[(units.Name == name)]
            if idx.empty:
                names = units.Name.tolist()
                suggestions = [match[0] for match in process.extract(
                    query=name.casefold(),
                    choices=names,
                    score_cutoff=50,
                    limit=5)]

                raise ValueError(f"Invalid 'name' for unit! '{name}' \n "
                                 f"Did you mean one of the following? \n "
                                 f"{suggestions}")
        else:
            raise AssertionError("'name' or 'code' must be provided!")

        # Complete the data from the first matching table row
        row = idx[0]
        values["code"] = UnitCode(value=units.CommonCode[row]).value
        values["name"] = UnitText(value=units.Name[row]).value
        values["symbol"] = units.Symbol[row]
        values["conversion_factor"] = units.ConversionFactor[row]
        if not values.get("description"):
            values["description"] = units.Description[row]
        return values
class Units:
    """
    Class for easy access to the data set of UNECE units from here.
    "https://github.com/datasets/unece-units-of-measure"
    """
    # Table of units, loaded once when the class is defined
    units = load_units()

    def __getattr__(self, item):
        """
        Return unit as attribute by name or code.

        Notes:
            Underscores will be substituted with whitespaces

        Args:
            item (str): name or code of the unit

        Returns:
            Unit
        """
        item = item.casefold().replace('_', ' ')
        return self.__getitem__(item)

    @property
    def quantities(self):
        """
        Get list of units ordered by measured quantities

        Raises:
            NotImplementedError: Always, the data set does not contain
                the quantity information
        """
        raise NotImplementedError("The used dataset does currently not "
                                  "contain the information about quantity")

    def __getitem__(self, item: str) -> Unit:
        """
        Get unit by name or code

        The code is compared upper-cased, the name is compared
        case-insensitively. The first matching row is used.

        Args:
            item (str): name or code

        Returns:
            Unit

        Raises:
            ValueError: If neither a code nor a name matches; the message
                lists up to five fuzzy-matched name suggestions
        """
        idx = self.units.index[((self.units.CommonCode == item.upper()) |
                                (self.units.Name.str.casefold() ==
                                 item.casefold()))]
        if idx.empty:
            names = self.units.Name.tolist()
            # 'match' instead of 'item' so the parameter is not shadowed
            suggestions = [match[0] for match in process.extract(
                query=item.casefold(),
                choices=names,
                score_cutoff=50,
                limit=5)]
            raise ValueError(f"Invalid 'name' for unit! '{item}' \n "
                             f"Did you mean one of the following? \n "
                             f"{suggestions}")

        return Unit(code=self.units.CommonCode[idx[0]])

    @classmethod
    def keys(cls, by_code: bool = False) -> List[str]:
        """
        Returns list of all unit names or codes

        Args:
            by_code (bool): if 'True' the keys will contain the unit codes
                instead of their names.

        Returns:
            List[str] containing the names or the codes
        """
        if by_code:
            return cls.units.CommonCode.to_list()
        return cls.units.Name.to_list()

    @property
    def names(self) -> List[str]:
        """
        Returns list of all unit names

        Returns:
            List[str] containing the names
        """
        return self.keys()

    @property
    def codes(self) -> List[str]:
        """
        Returns list of all unit codes

        Returns:
            List[str] containing the codes
        """
        return self.keys(by_code=True)

    def values(self) -> List[Unit]:
        """
        Get list of all units

        Returns:
            List[Unit] containing all units
        """
        return [Unit(code=code) for code in self.units.CommonCode]

    def get(self, item: str, default: Any = None):
        """
        Get unit by name or by code

        Args:
            item (str): name or code of unit
            default (Any): Default value to return if unit does not exist.

        Returns:
            Unit, or ``default`` if the lookup fails
        """
        try:
            return self.__getitem__(item)
        except (KeyError, ValueError):
            # __getitem__ reports unknown units with ValueError; catching
            # only KeyError meant the default was never returned.
            return default
def validate_unit_data(data: Dict) -> Dict:
    """
    Validate a metadata object that describes a unit.

    The model is chosen by the object's ``name`` (case-insensitive):
    'unit', 'unitText' or 'unitCode'. For 'unit' the ``type`` is set to
    'Unit' and ``value`` is replaced by the validated model. For the other
    two the whole object is validated and updated with the model's dump.

    Args:
        data (Dict): Dictionary containing the metadata of an object

    Returns:
        Validated dictionary of metadata (the same object that was passed)

    Raises:
        ValueError: If ``name`` does not match any of the unit models
    """
    _unit_models = {'unit': Unit,
                    "unitText": UnitText,
                    "unitCode": UnitCode}
    by_folded_name = {label.casefold(): model
                      for label, model in _unit_models.items()}

    folded_name = data.get("name", "").casefold()
    model = by_folded_name.get(folded_name)
    if model is None:
        raise ValueError(f"Invalid unit data found: \n "
                         f"{json.dumps(data, indent=2)}")

    if folded_name == 'unit':
        data["type"] = 'Unit'
        data["value"] = model.model_validate(data["value"])
        return data

    data.update(model.model_validate(data).model_dump())
    return data