Coverage for filip/models/ngsi_v2/iot.py: 88%

183 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-11-20 16:54 +0000

1""" 

2Module contains models for accessing and interacting with FIWARE's IoT-Agents. 

3""" 

4from __future__ import annotations 

5import logging 

6import itertools 

7import warnings 

8from enum import Enum 

9from typing import Any, Dict, Optional, List, Union 

10import pytz 

11from pydantic import field_validator, model_validator, ConfigDict, BaseModel, Field, AnyHttpUrl 

12from filip.models.base import NgsiVersion, DataType 

13from filip.models.ngsi_v2.base import \ 

14 BaseAttribute, \ 

15 BaseValueAttribute, \ 

16 BaseNameAttribute 

17from filip.utils.validators import (validate_fiware_datatype_string_protect, 

18 validate_fiware_datatype_standard, 

19 validate_jexl_expression, 

20 validate_expression_language) 

21 

# Module-level logger. It is named after this module (instead of being the
# root logger) so that applications can filter or configure the log output of
# these models separately; records still propagate to the root logger.
logger = logging.getLogger(__name__)

23 

24 

class ExpressionLanguage(str, Enum):
    """
    Selectable expression languages.

    The members also derive from ``str``, so each one compares equal to its
    raw string value.
    """
    LEGACY = "legacy"
    JEXL = "jexl"

31 

32 

class PayloadProtocol(str, Enum):
    """
    Selectable payload protocols.

    The members also derive from ``str``, so each one compares equal to its
    raw string value.
    """
    IOTA_JSON = "IoTA-JSON"
    IOTA_UL = "PDI-IoTA-UltraLight"
    LORAWAN = "LoRaWAN"

40 

41 

class TransportProtocol(str, Enum):
    """
    Selectable transport protocols.

    The members also derive from ``str``, so each one compares equal to its
    raw string value.
    """
    MQTT = "MQTT"
    AMQP = "AMQP"
    HTTP = "HTTP"

49 

50 

class IoTABaseAttribute(BaseAttribute, BaseNameAttribute):
    """
    Base model for device attributes.

    Adds the optional ``expression``, ``entity_name``, ``entity_type`` and
    ``reverse`` settings on top of the inherited attribute fields.
    """

    expression: Optional[str] = Field(
        description="indicates that the value of the target attribute will "
                    "not be the plain value or the measurement, but an "
                    "expression based on a combination of the reported values. "
                    "See the Expression Language definition for details "
                    "(https://iotagent-node-lib.readthedocs.io/en/latest/"
                    "api.html#expression-language-support)",
        default=None,
    )
    entity_name: Optional[str] = Field(
        description="entity_name: the presence of this attribute indicates "
                    "that the value will not be stored in the original device "
                    "entity but in a new entity with an ID given by this "
                    "attribute. The type of this additional entity can be "
                    "configured with the entity_type attribute. If no type is "
                    "configured, the device entity type is used instead. "
                    "Entity names can be defined as expressions, using the "
                    "Expression Language definition "
                    "(https://iotagent-node-lib.readthedocs.io/en/latest/"
                    "api.html#expression-language-support). Allowed characters are"
                    " the ones in the plain ASCII set, except the following "
                    "ones: control characters, whitespace, &, ?, / and #.",
        default=None,
        min_length=1,
        max_length=256,
    )
    entity_type: Optional[str] = Field(
        description="configures the type of an alternative entity. "
                    "Allowed characters "
                    "are the ones in the plain ASCII set, except the following "
                    "ones: control characters, whitespace, &, ?, / and #.",
        default=None,
        min_length=1,
        max_length=256,
    )
    reverse: Optional[str] = Field(
        description="add bidirectionality expressions to the attribute. See "
                    "the bidirectionality transformation plugin in the "
                    "Data Mapping Plugins section for details. "
                    "(https://iotagent-node-lib.readthedocs.io/en/latest/api/"
                    "index.html#data-mapping-plugins)",
        default=None,
    )

    # Both optional entity references are checked by the same FIWARE
    # datatype validator.
    valid_entity_name = field_validator("entity_name")(validate_fiware_datatype_standard)
    valid_entity_type = field_validator("entity_type")(validate_fiware_datatype_standard)

    def __eq__(self, other):
        """
        Compare this attribute with another object.

        Args:
            other: object to compare with

        Returns:
            For ``BaseAttribute`` instances the result of comparing the two
            names; for anything else the result of comparing this model's
            dumped dictionary with ``other``.
        """
        if not isinstance(other, BaseAttribute):
            return self.model_dump() == other
        return self.name == other.name

107 

108 

class DeviceAttribute(IoTABaseAttribute):
    """
    Active device attribute.

    Extends the base attribute with the optional ``object_id``, i.e. the
    attribute name as it is reported by the device.
    """
    object_id: Optional[str] = Field(
        description="name of the attribute as coming from the device.",
        default=None,
    )

117 

118 

class LazyDeviceAttribute(BaseNameAttribute):
    """
    Lazy device attribute.

    Carries a ``type`` (defaulting to ``DataType.TEXT``) in addition to the
    inherited name.
    """
    type: Union[DataType, str] = Field(
        description="The attribute type represents the NGSI value type of the "
                    "attribute value. Note that FIWARE NGSI has its own type "
                    "system for attribute values, so NGSI value types are not "
                    "the same as JSON types. Allowed characters "
                    "are the ones in the plain ASCII set, except the following "
                    "ones: control characters, whitespace, &, ?, / and #.",
        default=DataType.TEXT,
        min_length=1,
        max_length=256,
    )

    # The type string is checked by the string-protect datatype validator.
    valid_type = field_validator("type")(validate_fiware_datatype_string_protect)

135 

136 

class DeviceCommand(BaseModel):
    """
    Device command.

    A command consists of a required ``name`` and a ``type`` that defaults to
    ``DataType.COMMAND``.
    """
    name: str = Field(
        description="ID of the attribute in the target entity in the "
                    "Context Broker. Allowed characters "
                    "are the ones in the plain ASCII set, except the following "
                    "ones: control characters, whitespace, &, ?, / and #.",
        min_length=1,
        max_length=256,
    )
    type: Union[DataType, str] = Field(
        default=DataType.COMMAND,
        description="name of the type of the attribute in the target entity. ",
    )

    # The command name is checked by the string-protect datatype validator.
    valid_name = field_validator("name")(validate_fiware_datatype_string_protect)

154 

155 

class StaticDeviceAttribute(IoTABaseAttribute, BaseValueAttribute):
    """
    Static device attribute.

    Declares no fields of its own; everything is inherited from
    ``IoTABaseAttribute`` and ``BaseValueAttribute``.
    """

161 

162 

class ServiceGroup(BaseModel):
    """
    Model for device service group.
    https://iotagent-node-lib.readthedocs.io/en/latest/api/index.html#service-group-api

    Only ``resource`` and ``apikey`` are required; every other field has a
    default.
    """
    service: Optional[str] = Field(
        default=None,
        description="ServiceGroup of the devices of this type"
    )
    subservice: Optional[str] = Field(
        default=None,
        description="Subservice of the devices of this type.",
        pattern="^/"
    )
    resource: str = Field(
        description="string representing the Southbound resource that will be "
                    "used to assign a type to a device (e.g.: pathname in the "
                    "southbound port)."
    )
    # Single quotes are used for the middle fragment so that the literal ""
    # stays part of the text instead of closing and reopening the string.
    apikey: str = Field(
        description="API Key string. It is a key used for devices belonging "
                    'to this service_group. If "", service_group does not use '
                    "apikey, but it must be specified."
    )
    timestamp: Optional[bool] = Field(
        default=None,
        description="Optional flag about whether or not to add the TimeInstant "
                    "attribute to the device entity created, as well as a "
                    "TimeInstant metadata to each attribute, with the current "
                    "timestamp. With NGSI-LD, the Standard observedAt "
                    "property-of-a-property is created instead."
    )
    entity_type: Optional[str] = Field(
        default=None,
        description="name of the Entity type to assign to the group. "
                    "Allowed characters "
                    "are the ones in the plain ASCII set, except the following "
                    "ones: control characters, whitespace, &, ?, / and #.",
        max_length=256,
        min_length=1,
    )
    valid_entity_type = field_validator("entity_type")(validate_fiware_datatype_standard)
    trust: Optional[str] = Field(
        default=None,
        description="trust token to use for secured access to the "
                    "Context Broker for this type of devices (optional; only "
                    "needed for secured scenarios)."
    )
    cbHost: Optional[AnyHttpUrl] = Field(
        default=None,
        description="Context Broker connection information. This options can "
                    "be used to override the global ones for specific types of "
                    "devices."
    )

    @field_validator('cbHost')
    @classmethod
    def validate_cbHost(cls, value):
        """
        Convert the validated cbHost into a plain string.

        Args:
            value: validated cbHost value

        Returns:
            ``str(value)`` for truthy values; falsy values (e.g. None) are
            returned unchanged.
        """
        return str(value) if value else value

    lazy: Optional[List[LazyDeviceAttribute]] = Field(
        default=[],
        description="list of common lazy attributes of the device. For each "
                    "attribute, its name and type must be provided."
    )
    commands: Optional[List[DeviceCommand]] = Field(
        default=[],
        description="list of common commands attributes of the device. For each "
                    "attribute, its name and type must be provided, additional "
                    "metadata is optional"
    )
    attributes: Optional[List[DeviceAttribute]] = Field(
        default=[],
        description="list of common active attributes of the device. For "
                    "each attribute, its name and type must be provided, "
                    "additional metadata is optional."
    )
    static_attributes: Optional[List[StaticDeviceAttribute]] = Field(
        default=[],
        description="this attributes will be added to all the entities of this "
                    "group 'as is', additional metadata is optional."
    )
    internal_attributes: Optional[List[Dict[str, Any]]] = Field(
        default=[],
        description="optional section with free format, to allow specific "
                    "IoT Agents to store information along with the devices "
                    "in the Device Registry."
    )
    expressionLanguage: Optional[ExpressionLanguage] = Field(
        default=ExpressionLanguage.JEXL,
        description="optional boolean value, to set expression language used "
                    "to compute expressions, possible values are: "
                    "legacy or jexl, but legacy is deprecated. If it is set None, jexl is used."
    )
    valid_expressionLanguage = field_validator("expressionLanguage")(validate_expression_language)
    explicitAttrs: Optional[bool] = Field(
        default=False,
        description="optional boolean value, to support selective ignore "
                    "of measures so that IOTA does not progress. If not "
                    "specified default is false."
    )
    autoprovision: Optional[bool] = Field(
        default=True,
        description="optional boolean: If false, autoprovisioned devices "
                    "(i.e. devices that are not created with an explicit "
                    "provision operation but when the first measure arrives) "
                    "are not allowed in this group. "
                    "Default (in the case of omitting the field) is true."
    )
    ngsiVersion: Optional[NgsiVersion] = Field(
        default="v2",
        description="optional string value used in mixed mode to switch between"
                    " NGSI-v2 and NGSI-LD payloads. Possible values are: "
                    "v2 or ld. The default is v2. When not running in mixed "
                    "mode, this field is ignored.")
    defaultEntityNameConjunction: Optional[str] = Field(
        default=None,
        description="optional string value to set default conjunction string "
                    "used to compose a default entity_name when is not "
                    "provided at device provisioning time."
    )

287 

288 

class DeviceSettings(BaseModel):
    """
    Settings block of an IoT device.

    All fields are optional. Assignments to an existing instance are
    validated (``validate_assignment=True``).
    """
    model_config = ConfigDict(validate_assignment=True)

    timezone: Optional[str] = Field(
        description="Time zone of the sensor if it has any",
        default='Europe/London',
    )
    timestamp: Optional[bool] = Field(
        description="Optional flag about whether or not to add the TimeInstant "
                    "attribute to the device entity created, as well as a "
                    "TimeInstant metadata to each attribute, with the current "
                    "timestamp. With NGSI-LD, the Standard observedAt "
                    "property-of-a-property is created instead.",
        default=None,
    )
    apikey: Optional[str] = Field(
        description="Optional Apikey key string to use instead of group apikey",
        default=None,
    )
    endpoint: Optional[AnyHttpUrl] = Field(
        description="Endpoint where the device is going to receive commands, "
                    "if any.",
        default=None,
    )
    protocol: Optional[Union[PayloadProtocol, str]] = Field(
        description="Name of the device protocol, for its use with an "
                    "IoT Manager.",
        default=None,
    )
    transport: Optional[Union[TransportProtocol, str]] = Field(
        description="Name of the device transport protocol, for the IoT Agents "
                    "with multiple transport protocols.",
        default=None,
    )
    expressionLanguage: Optional[ExpressionLanguage] = Field(
        description="optional boolean value, to set expression language used "
                    "to compute expressions, possible values are: "
                    "legacy or jexl, but legacy is deprecated. If it is set None, jexl is used.",
        default=ExpressionLanguage.JEXL,
    )
    explicitAttrs: Optional[bool] = Field(
        description="optional boolean value, to support selective ignore "
                    "of measures so that IOTA does not progress. If not "
                    "specified default is false.",
        default=False,
    )

    # The expression language is checked by the shared validator helper.
    valid_expressionLanguage = field_validator("expressionLanguage")(validate_expression_language)

338 

339 

class Device(DeviceSettings):
    """
    Model for iot devices.
    https://iotagent-node-lib.readthedocs.io/en/latest/api/index.html#device-api

    Besides the fields, the model offers helpers to get, add, update and
    delete the attributes and commands of the device. Defaults and
    assignments are validated (``validate_default`` / ``validate_assignment``).
    """
    model_config = ConfigDict(validate_default=True, validate_assignment=True)
    device_id: str = Field(
        description="Device ID that will be used to identify the device"
    )
    service: Optional[str] = Field(
        default=None,
        description="Name of the service the device belongs to "
                    "(will be used in the fiware-service header).",
        max_length=50
    )
    service_path: Optional[str] = Field(
        default="/",
        description="Name of the subservice the device belongs to "
                    "(used in the fiware-servicepath header).",
        max_length=51,
        pattern="^/"
    )
    entity_name: str = Field(
        description="Name of the entity representing the device in "
                    "the Context Broker Allowed characters "
                    "are the ones in the plain ASCII set, except the following "
                    "ones: control characters, whitespace, &, ?, / and #.",
        max_length=256,
        min_length=1,
    )
    valid_entity_name = field_validator("entity_name")(validate_fiware_datatype_standard)
    entity_type: str = Field(
        description="Type of the entity in the Context Broker. "
                    "Allowed characters "
                    "are the ones in the plain ASCII set, except the following "
                    "ones: control characters, whitespace, &, ?, / and #.",
        max_length=256,
        min_length=1,
    )
    valid_entity_type = field_validator("entity_type")(validate_fiware_datatype_standard)
    lazy: List[LazyDeviceAttribute] = Field(
        default=[],
        description="List of lazy attributes of the device"
    )
    commands: List[DeviceCommand] = Field(
        default=[],
        description="List of commands of the device"
    )
    attributes: List[DeviceAttribute] = Field(
        default=[],
        description="List of active attributes of the device"
    )
    static_attributes: Optional[List[StaticDeviceAttribute]] = Field(
        default=[],
        description="List of static attributes to append to the entity. All the"
                    " updateContext requests to the CB will have this set of "
                    "attributes appended."
    )
    internal_attributes: Optional[List[Dict[str, Any]]] = Field(
        default=[],
        description="List of internal attributes with free format for specific "
                    "IoT Agent configuration"
    )
    ngsiVersion: NgsiVersion = Field(
        default=NgsiVersion.v2,
        description="optional string value used in mixed mode to switch between"
                    " NGSI-v2 and NGSI-LD payloads. Possible values are: "
                    "v2 or ld. The default is v2. When not running in "
                    "mixed mode, this field is ignored.")

    @field_validator('timezone')
    @classmethod
    def validate_timezone(cls, value):
        """
        Validate that the timezone is known to pytz.

        Args:
            value: timezone name to check

        Returns:
            The unchanged timezone name.

        Raises:
            ValueError: if the name is not listed in ``pytz.all_timezones``
        """
        # An explicit raise is used instead of ``assert`` because assert
        # statements are removed when Python runs with -O.
        if value not in pytz.all_timezones:
            raise ValueError(f"Unknown timezone: {value!r}")
        return value

    @model_validator(mode='after')
    def validate_device_attributes_expression(self):
        """
        Validates device attributes expressions based on the expression
        language (JEXL or Legacy, where Legacy is deprecated).

        Args:
            self: The Device instance.

        Returns:
            The Device instance after validation.
        """
        if self.expressionLanguage == ExpressionLanguage.JEXL:
            for attribute in self.attributes:
                if attribute.expression:
                    validate_jexl_expression(attribute.expression, attribute.name, self.device_id)
        elif self.expressionLanguage == ExpressionLanguage.LEGACY:
            warnings.warn(f"No validation for legacy expression language of Device {self.device_id}.")

        return self

    @model_validator(mode='after')
    def validate_duplicated_device_attributes(self):
        """
        Check whether the device has identical attributes.

        Args:
            self: The Device instance.

        Returns:
            The Device instance after validation.

        Raises:
            ValueError: if two attributes have identical dumped content
        """
        # Every attribute is dumped only once and compared with the dumps of
        # the attributes that precede it.
        seen = []
        for attr in self.attributes:
            dumped = attr.model_dump()
            if dumped in seen:
                raise ValueError(f"Duplicated attributes found: {attr.name}")
            seen.append(dumped)
        return self

    @model_validator(mode='after')
    def validate_device_attributes_name_object_id(self):
        """
        Validate the device regarding the behavior with devices attributes.
        According to https://iotagent-node-lib.readthedocs.io/en/latest/api.html and
        based on our best practice, following rules are checked
            - name is required, but not necessarily unique
            - object_id is not required, if given must be unique, i.e. not equal to any
                existing object_id and name

        Args:
            self: The Device instance.

        Returns:
            The Device instance after validation.

        Raises:
            ValueError: if an object_id equals the object_id or the name of
                another attribute
        """
        for i, attr in enumerate(self.attributes):
            for other_attr in self.attributes[:i] + self.attributes[i + 1:]:
                if attr.object_id and other_attr.object_id and \
                        attr.object_id == other_attr.object_id:
                    raise ValueError(f"object_id {attr.object_id} is not unique")
                if attr.object_id and attr.object_id == other_attr.name:
                    raise ValueError(f"object_id {attr.object_id} is not unique")
        return self

    @staticmethod
    def _attribute_field_name(attribute) -> str:
        """
        Map an attribute object to the name of the device field storing it.

        Args:
            attribute: attribute or command object

        Returns:
            One of 'attributes', 'lazy', 'static_attributes' or 'commands'.

        Raises:
            ValueError: if the object is none of the supported model types
        """
        supported = (
            (DeviceAttribute, 'attributes'),
            (LazyDeviceAttribute, 'lazy'),
            (StaticDeviceAttribute, 'static_attributes'),
            (DeviceCommand, 'commands'),
        )
        for attribute_type, field_name in supported:
            if isinstance(attribute, attribute_type):
                return field_name
        raise ValueError(
            f"Unsupported attribute type: {type(attribute).__name__}")

    def get_attribute(self, attribute_name: str) -> Union[DeviceAttribute,
                                                          LazyDeviceAttribute,
                                                          StaticDeviceAttribute,
                                                          DeviceCommand]:
        """
        Return the first attribute or command with the given name.

        The search order is active attributes, lazy attributes, static
        attributes and commands. ``internal_attributes`` are free-format
        dictionaries without a ``name`` attribute and are therefore not
        searched.

        Args:
            attribute_name: name of the attribute to look up

        Returns:
            The first matching attribute or command.

        Raises:
            KeyError: if no attribute with this name exists
        """
        # static_attributes is Optional, so an explicit None must be
        # treated like an empty list.
        candidates = itertools.chain(self.attributes,
                                     self.lazy,
                                     self.static_attributes or [],
                                     self.commands)
        for attribute in candidates:
            if attribute.name == attribute_name:
                return attribute
        msg = f"Device: {self.device_id}: Could not " \
              f"find attribute with name {attribute_name}"
        logger.error(msg)
        raise KeyError(msg)

    def add_attribute(self,
                      attribute: Union[DeviceAttribute,
                                       LazyDeviceAttribute,
                                       StaticDeviceAttribute,
                                       DeviceCommand],
                      update: bool = False) -> None:
        """
        Add an attribute or command to the device.

        Active attributes count as already existing when their dumped content
        (ignoring None values) is identical to a stored one; all other kinds
        use the ``==`` comparison of their model.

        Args:
            attribute: attribute or command to add
            update (bool): If 'True' and the attribute already exists, the
                stored attribute is updated instead of raising

        Returns:
            None

        Raises:
            ValueError: if the attribute type is not supported, if the
                attribute already exists and ``update`` is False, or if the
                resulting device fails validation
        """
        field_name = self._attribute_field_name(attribute)
        existing = getattr(self, field_name) or []

        if isinstance(attribute, DeviceAttribute):
            dumped = attribute.model_dump(exclude_none=True)
            already_exists = any(
                dumped == attr.model_dump(exclude_none=True)
                for attr in existing)
        else:
            already_exists = attribute in existing

        if not already_exists:
            # Assign a new list instead of appending in place: the
            # assignment triggers the model validation, and the stored list
            # is not modified before that validation runs.
            setattr(self, field_name, [*existing, attribute])
            return

        if update:
            logger.warning("Device: %s: Attribute already "
                           "exists. Will update: \n %s",
                           self.device_id, attribute.model_dump_json(indent=2))
            self.update_attribute(attribute, append=False)
            return

        logger.error("Device: %s: Attribute already "
                     "exists: \n %s", self.device_id,
                     attribute.model_dump_json(indent=2))
        raise ValueError(
            f"Device: {self.device_id}: Attribute already exists")

    def update_attribute(self,
                         attribute: Union[DeviceAttribute,
                                          LazyDeviceAttribute,
                                          StaticDeviceAttribute,
                                          DeviceCommand],
                         append: bool = False) -> None:
        """
        Updates existing device attribute

        The stored attribute is located with ``list.index``, i.e. with the
        ``==`` comparison of the attribute model, and replaced by the given
        one.

        Args:
            attribute: Attribute to add to device configuration
            append (bool): Adds attribute if not exist

        Returns:
            None

        Raises:
            KeyError: if the attribute does not exist and ``append`` is False
            ValueError: if the attribute type is not supported or the
                resulting device fails validation
        """
        field_name = self._attribute_field_name(attribute)
        updated = list(getattr(self, field_name) or [])
        try:
            idx = updated.index(attribute)
        except ValueError:
            if append:
                logger.warning("Device: %s: Could not find "
                               "attribute: \n %s",
                               self.device_id, attribute.model_dump_json(indent=2))
                self.add_attribute(attribute=attribute)
                return
            msg = f"Device: {self.device_id}: Could not find "\
                  f"attribute: \n {attribute.model_dump_json(indent=2)}"
            raise KeyError(msg)

        # Replace the stored entry. Updating the dictionary returned by
        # model_dump() would only change a temporary copy.
        updated[idx] = attribute
        setattr(self, field_name, updated)

    def delete_attribute(self, attribute: Union[DeviceAttribute,
                                                LazyDeviceAttribute,
                                                StaticDeviceAttribute,
                                                DeviceCommand]):
        """
        Deletes attribute from device

        Args:
            attribute: attribute or command to remove

        Returns:
            None

        Raises:
            ValueError: if the attribute type is not supported or the
                attribute is not part of the device
        """
        field_name = self._attribute_field_name(attribute)
        try:
            stored = getattr(self, field_name)
            if stored is None:
                raise ValueError(
                    f"Device: {self.device_id}: No {field_name} configured")
            stored.remove(attribute)
        except ValueError:
            logger.warning("Device: %s: Could not delete "
                           "attribute: \n %s",
                           self.device_id, attribute.model_dump_json(indent=2))
            raise

        logger.info("Device: %s: Attribute deleted! \n %s",
                    self.device_id, attribute.model_dump_json(indent=2))

    def get_command(self, command_name: str):
        """
        Short for self.get_attribute

        Args:
            command_name (str): name to look up

        Returns:
            The result of ``get_attribute`` for this name.
        """
        return self.get_attribute(attribute_name=command_name)

    def add_command(self, command: DeviceCommand, update: bool = False):
        """
        Short for self.add_attribute

        Args:
            command (DeviceCommand): command to add
            update (bool): Update command if it already exists

        Returns:
            None
        """
        self.add_attribute(attribute=command, update=update)

    def update_command(self, command: DeviceCommand, append: bool = False):
        """
        Short for self.update_attribute

        Args:
            command: command to update
            append: add the command if it does not exist

        Returns:
            None
        """
        self.update_attribute(attribute=command, append=append)

    def delete_command(self, command: DeviceCommand):
        """
        Short for self.delete_attribute

        Args:
            command: command to delete

        Returns:
            None
        """
        self.delete_attribute(attribute=command)