Coverage for filip/models/ngsi_v2/subscriptions.py: 96%

141 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-11-20 16:54 +0000

1""" 

2This module contains NGSIv2 models for context subscription in the context 

3broker. 

4""" 

5from typing import Any, List, Dict, Union, Optional 

6from datetime import datetime 

7from aenum import Enum 

8from pydantic import \ 

9 field_validator, model_validator, ConfigDict, BaseModel, \ 

10 conint, \ 

11 Field, \ 

12 Json 

13from .base import AttrsFormat, EntityPattern, Http, Status, Expression 

14from filip.utils.validators import ( 

15 validate_mqtt_url, 

16 validate_mqtt_topic 

17) 

18from filip.models.ngsi_v2.context import ContextEntity 

19from filip.models.ngsi_v2.base import ( 

20 EntityPattern, 

21 Expression, 

22 DataType 

23) 

24from filip.custom_types import AnyMqttUrl 

25import warnings 

26 

27# The pydantic models still have a .json() function, but this method is deprecated. 

28warnings.filterwarnings("ignore", category=UserWarning, 

29 message='Field name "json" shadows an attribute in parent "Http"') 

30warnings.filterwarnings("ignore", category=UserWarning, 

31 message='Field name "json" shadows an attribute in parent "Mqtt"') 

32 

33 

34class NgsiPayloadAttr(BaseModel): 

35 """ 

36 Model for NGSI V2 type payload in httpCustom/mqttCustom notifications. 

37 The difference between this model and the usual BaseValueAttribute model is that 

38 a metadata field is not allowed. 

39 In the absence of type/value in some attribute field, one should resort to partial 

40 representations ( as specified in the orion api manual), done by the BaseValueAttr. 

41 model. 

42 """ 

43 model_config = ConfigDict(extra="forbid") 

44 type: Union[DataType, str] = Field( 

45 default=DataType.TEXT, 

46 description="The attribute type represents the NGSI value type of the " 

47 "attribute value. Note that FIWARE NGSI has its own type " 

48 "system for attribute values, so NGSI value types are not " 

49 "the same as JSON types. Allowed characters " 

50 "are the ones in the plain ASCII set, except the following " 

51 "ones: control characters, whitespace, &, ?, / and #.", 

52 max_length=256, 

53 min_length=1, 

54 ) 

55 value: Optional[Any] = Field( 

56 default=None, title="Attribute value", description="the actual data" 

57 ) 

58 

59 

60class NgsiPayload(BaseModel): 

61 """ 

62 Model for NGSI V2 type payload in httpCustom/mqttCustom notifications. 

63 Differences between this model and the usual Context entity models include: 

64 - id and type are not mandatory 

65 - an attribute metadata field is not allowed 

66 """ 

67 model_config = ConfigDict( 

68 extra="allow", validate_default=True 

69 ) 

70 id: Optional[str] = Field( 

71 default=None, 

72 max_length=256, 

73 min_length=1, 

74 frozen=True 

75 ) 

76 type: Optional[Union[str, Enum]] = Field( 

77 default=None, 

78 max_length=256, 

79 min_length=1, 

80 frozen=True, 

81 ) 

82 

83 @model_validator(mode='after') 

84 def validate_notification_attrs(self): 

85 for v in self.model_dump(exclude={"id", "type"}).values(): 

86 assert isinstance(NgsiPayloadAttr.model_validate(v), NgsiPayloadAttr) 

87 return self 

88 

89 

90class Message(BaseModel): 

91 """ 

92 Model for a notification message, when sent to other NGSIv2-APIs 

93 """ 

94 subscriptionId: Optional[str] = Field( 

95 default=None, 

96 description="Id of the subscription the notification comes from", 

97 ) 

98 data: List[ContextEntity] = Field( 

99 description="is an array with the notification data itself which " 

100 "includes the entity and all concerned attributes. Each " 

101 "element in the array corresponds to a different entity. " 

102 "By default, the entities are represented in normalized " 

103 "mode. However, using the attrsFormat modifier, a " 

104 "simplified representation mode can be requested." 

105 ) 

106 

107 

108class HttpMethods(str, Enum): 

109 _init_ = 'value __doc__' 

110 

111 POST = "POST", "Post Method" 

112 PUT = "PUT", "Put Method" 

113 PATCH = "PATCH", "Patch Method" 

114 

115 

116class HttpCustom(Http): 

117 """ 

118 Model for custom notification patterns sent via HTTP 

119 """ 

120 headers: Optional[Dict[str, Union[str, Json]]] = Field( 

121 default=None, 

122 description="a key-map of HTTP headers that are included in " 

123 "notification messages." 

124 ) 

125 qs: Optional[Dict[str, Union[str, Json]]] = Field( 

126 default=None, 

127 description="a key-map of URL query parameters that are included in " 

128 "notification messages." 

129 ) 

130 method: str = Field( 

131 default=HttpMethods.POST, 

132 description="the method to use when sending the notification " 

133 "(default is POST). Only valid HTTP methods are allowed. " 

134 "On specifying an invalid HTTP method, a 400 Bad Request " 

135 "error is returned." 

136 ) 

137 payload: Optional[str] = Field( 

138 default=None, 

139 description='the payload to be used in notifications. If omitted, the ' 

140 'default payload (see "Notification Messages" sections) ' 

141 'is used.' 

142 ) 

143 json: Optional[Dict[str, Union[str, Json]]] = Field( 

144 default=None, 

145 description='get a json as notification. If omitted, the default' 

146 'payload (see "Notification Messages" sections) is used.' 

147 ) 

148 ngsi: Optional[NgsiPayload] = Field( 

149 default=None, 

150 description='get an NGSI-v2 normalized entity as notification.If omitted, ' 

151 'the default payload (see "Notification Messages" sections) is used.' 

152 ) 

153 timeout: Optional[int] = Field( 

154 default=None, 

155 description="Maximum time (in milliseconds) the subscription waits for the " 

156 "response. The maximum value allowed for this parameter is 1800000 " 

157 "(30 minutes). If timeout is defined to 0 or omitted, then the value " 

158 "passed as -httpTimeout CLI parameter is used. See section in the " 

159 "'Command line options' for more details." 

160 ) 

161 

162 @model_validator(mode='after') 

163 def validate_notification_payloads(self): 

164 fields = [self.payload, self.json, self.ngsi] 

165 filled_fields = [field for field in fields if field is not None] 

166 

167 if len(filled_fields) > 1: 

168 raise ValueError("Only one of payload, json or ngsi fields accepted at the " 

169 "same time in httpCustom.") 

170 

171 return self 

172 

173 

174class Mqtt(BaseModel): 

175 """ 

176 Model for notifications sent via MQTT 

177 https://fiware-orion.readthedocs.io/en/3.8.0/user/mqtt_notifications/index.html 

178 """ 

179 url: Union[AnyMqttUrl, str] = Field( 

180 description='to specify the MQTT broker endpoint to use. URL must ' 

181 'start with mqtt:// and never contains a path (i.e. it ' 

182 'only includes host and port)') 

183 topic: str = Field( 

184 description='to specify the MQTT topic to use', 

185 ) 

186 valid_type = field_validator("topic")(validate_mqtt_topic) 

187 qos: Optional[int] = Field( 

188 default=0, 

189 description='to specify the MQTT QoS value to use in the ' 

190 'notifications associated to the subscription (0, 1 or 2). ' 

191 'This is an optional field, if omitted then QoS 0 is used.', 

192 ge=0, 

193 le=2) 

194 user: Optional[str] = Field( 

195 default=None, 

196 description="username if required" 

197 ) 

198 passwd: Optional[str] = Field( 

199 default=None, 

200 description="password if required" 

201 ) 

202 

203 @field_validator('url') 

204 @classmethod 

205 def check_url(cls, value): 

206 """ 

207 Check if url has a valid structure 

208 Args: 

209 value: url to validate 

210 Returns: 

211 validated url 

212 """ 

213 return validate_mqtt_url(url=value) 

214 

215 

216class MqttCustom(Mqtt): 

217 """ 

218 Model for custom notification patterns sent via MQTT 

219 https://fiware-orion.readthedocs.io/en/3.8.0/user/mqtt_notifications/index.html 

220 """ 

221 payload: Optional[str] = Field( 

222 default=None, 

223 description='the payload to be used in notifications. If omitted, the ' 

224 'default payload (see "Notification Messages" sections) ' 

225 'is used.' 

226 ) 

227 json: Optional[Dict[str, Any]] = Field( 

228 default=None, 

229 description='get a json as notification. If omitted, the default' 

230 'payload (see "Notification Messages" sections) is used.' 

231 ) 

232 ngsi: Optional[NgsiPayload] = Field( 

233 default=None, 

234 description='get an NGSI-v2 normalized entity as notification.If omitted, ' 

235 'the default payload (see "Notification Messages" sections) is used.' 

236 ) 

237 

238 @model_validator(mode='after') 

239 def validate_payload_type(self): 

240 assert len([v for k, v in self.model_dump().items() 

241 if ((v is not None) and (k in ['payload', 'ngsi', 'json']))]) <= 1 

242 return self 

243 

244 

245class Notification(BaseModel): 

246 """ 

247 If the notification attributes are left empty, all attributes will be 

248 included in the notifications. Otherwise, only the specified ones will 

249 be included. 

250 """ 

251 model_config = ConfigDict(validate_assignment=True) 

252 timesSent: Optional[Any] = Field( 

253 default=None, 

254 description="Not editable, only present in GET operations. " 

255 "Number of notifications sent due to this subscription." 

256 ) 

257 http: Optional[Http] = Field( 

258 default=None, 

259 description='It is used to convey parameters for notifications ' 

260 'delivered through the HTTP protocol. Cannot be used ' 

261 'together with "httpCustom, mqtt, mqttCustom"' 

262 ) 

263 httpCustom: Optional[HttpCustom] = Field( 

264 default=None, 

265 description='It is used to convey parameters for notifications ' 

266 'delivered through the HTTP protocol. Cannot be used ' 

267 'together with "http"' 

268 ) 

269 mqtt: Optional[Mqtt] = Field( 

270 default=None, 

271 description='It is used to convey parameters for notifications ' 

272 'delivered through the MQTT protocol. Cannot be used ' 

273 'together with "http, httpCustom, mqttCustom"' 

274 ) 

275 mqttCustom: Optional[MqttCustom] = Field( 

276 default=None, 

277 description='It is used to convey parameters for notifications ' 

278 'delivered through the MQTT protocol. Cannot be used ' 

279 'together with "http, httpCustom, mqtt"' 

280 ) 

281 attrs: Optional[List[str]] = Field( 

282 default=None, 

283 description='List of attributes to be included in notification ' 

284 'messages. It also defines the order in which attributes ' 

285 'must appear in notifications when attrsFormat value is ' 

286 'used (see "Notification Messages" section). An empty list ' 

287 'means that all attributes are to be included in ' 

288 'notifications. See "Filtering out attributes and ' 

289 'metadata" section for more detail.' 

290 ) 

291 exceptAttrs: Optional[List[str]] = Field( 

292 default=None, 

293 description='List of attributes to be excluded from the notification ' 

294 'message, i.e. a notification message includes all entity ' 

295 'attributes except the ones listed in this field.' 

296 ) 

297 attrsFormat: Optional[AttrsFormat] = Field( 

298 default=AttrsFormat.NORMALIZED, 

299 description='specifies how the entities are represented in ' 

300 'notifications. Accepted values are normalized (default), ' 

301 'keyValues or values. If attrsFormat takes any value ' 

302 'different than those, an error is raised. See detail in ' 

303 '"Notification Messages" section.' 

304 ) 

305 metadata: Optional[Any] = Field( 

306 default=None, 

307 description='List of metadata to be included in notification messages. ' 

308 'See "Filtering out attributes and metadata" section for ' 

309 'more detail.' 

310 ) 

311 onlyChangedAttrs: Optional[bool] = Field( 

312 default=False, 

313 description='Only supported by Orion Context Broker!' 

314 'If set to true then notifications associated to the ' 

315 'subscription include only attributes that changed in the ' 

316 'triggering update request, in combination with the attrs ' 

317 'or exceptAttrs field. For instance, if attrs is ' 

318 '[A=1, B=2, C=3] and A=0 is updated. In case ' 

319 'onlyChangedAttrs=false, CB notifies [A=0, B=2, C=3].' 

320 'In case onlyChangedAttrs=true, CB notifies ' 

321 '[A=0, B=null, C=null]. This ' 

322 ) 

323 covered: Optional[bool] = Field( 

324 default=False, 

325 description="A flag to decide whether to include not existing attribute in " 

326 "notifications. It can be useful for those notification endpoints " 

327 "that are not flexible enough for a variable set of attributes and " 

328 "needs always the same set of incoming attributes in every received" 

329 " notification " 

330 "https://fiware-orion.readthedocs.io/en/master/orion-api.html#covered-subscriptions" 

331 ) 

332 

333 @model_validator(mode='after') 

334 def validate_http(self): 

335 if self.httpCustom is not None: 

336 assert self.http is None 

337 return self 

338 

339 @model_validator(mode='after') 

340 def validate_attr(self): 

341 if self.exceptAttrs is not None: 

342 assert self.attrs is None 

343 return self 

344 

345 @model_validator(mode='after') 

346 def validate_endpoints(self): 

347 if self.http is not None: 

348 assert all((v is None for k, v in self.model_dump().items() if k in [ 

349 'httpCustom', 'mqtt', 'mqttCustom'])) 

350 elif self.httpCustom is not None: 

351 assert all((v is None for k, v in self.model_dump().items() if k in [ 

352 'http', 'mqtt', 'mqttCustom'])) 

353 elif self.mqtt is not None: 

354 assert all((v is None for k, v in self.model_dump().items() if k in [ 

355 'http', 'httpCustom', 'mqttCustom'])) 

356 else: 

357 assert all((v is None for k, v in self.model_dump().items() if k in [ 

358 'http', 'httpCustom', 'mqtt'])) 

359 return self 

360 

361 @model_validator(mode='after') 

362 def validate_covered_attrs(self): 

363 if self.covered is True: 

364 if isinstance(self.attrs, list) and len(self.attrs) > 0: 

365 return self 

366 else: 

367 raise ValueError('Covered notification need an explicit list of attrs.') 

368 return self 

369 

370 

371class Response(Notification): 

372 """ 

373 Server response model for notifications 

374 """ 

375 timesSent: int = Field( 

376 description='(not editable, only present in GET operations): ' 

377 'Number of notifications sent due to this subscription.' 

378 ) 

379 lastNotification: datetime = Field( 

380 description='(not editable, only present in GET operations): ' 

381 'Last notification timestamp in ISO8601 format.' 

382 ) 

383 lastFailure: Optional[datetime] = Field( 

384 default=None, 

385 description='(not editable, only present in GET operations): ' 

386 'Last failure timestamp in ISO8601 format. Not present if ' 

387 'subscription has never had a problem with notifications.' 

388 ) 

389 lastSuccess: Optional[datetime] = Field( 

390 default=None, 

391 description='(not editable, only present in GET operations): ' 

392 'Timestamp in ISO8601 format for last successful ' 

393 'notification. Not present if subscription has never ' 

394 'had a successful notification.' 

395 ) 

396 

397 

398class Condition(BaseModel): 

399 """ 

400 Notification rules are as follow: 

401 If attrs and expression are used, a notification is sent whenever one of 

402 the attributes in the attrs list changes and at the same time expression 

403 matches. 

404 If attrs is used and expression is not used, a notification is sent 

405 whenever any of the attributes in the attrs list changes. 

406 If attrs is not used and expression is used, a notification is sent 

407 whenever any of the attributes of the entity changes and at the same time 

408 expression matches. 

409 If neither attrs nor expression are used, a notification is sent whenever 

410 any of the attributes of the entity changes. 

411 alterationTypes: for more information about this field, see 

412 https://github.com/telefonicaid/fiware-orion/blob/3.8.0/doc/manuals/orion-api.md#subscriptions-based-in-alteration-type 

413 

414 """ 

415 attrs: Optional[Union[str, List[str]]] = Field( 

416 default=None, 

417 description='array of attribute names' 

418 ) 

419 expression: Optional[Union[str, Expression]] = Field( 

420 default=None, 

421 description='an expression composed of q, mq, georel, geometry and ' 

422 'coords (see "List entities" operation above about this ' 

423 'field).' 

424 ) 

425 alterationTypes: Optional[List[str]] = Field( 

426 default=None, 

427 description='list of alteration types triggering the subscription' 

428 ) 

429 

430 @field_validator('attrs') 

431 def check_attrs(cls, v): 

432 if isinstance(v, list): 

433 return v 

434 elif isinstance(v, str): 

435 return [v] 

436 else: 

437 raise TypeError() 

438 

439 @field_validator('alterationTypes') 

440 def check_alteration_types(cls, v): 

441 allowed_types = {"entityCreate", "entityDelete", "entityUpdate", "entityChange"} 

442 

443 if v is None: 

444 return None 

445 elif isinstance(v, list): 

446 for item in v: 

447 if item not in allowed_types: 

448 raise ValueError(f'{item} is not a valid alterationType' 

449 f' allowed values are {allowed_types}') 

450 return v 

451 else: 

452 raise ValueError('alterationTypes must be a list of strings') 

453 

454 

455class Subject(BaseModel): 

456 """ 

457 Model for subscription subject 

458 """ 

459 entities: List[EntityPattern] = Field( 

460 description="A list of objects, each one composed of by an Entity " 

461 "Object:" 

462 ) 

463 condition: Optional[Condition] = Field( 

464 default=None, 

465 ) 

466 

467 

468class Subscription(BaseModel): 

469 """ 

470 Subscription payload validations 

471 https://fiware-orion.readthedocs.io/en/master/user/ngsiv2_implementation_notes/index.html#subscription-payload-validations 

472 """ 

473 model_config = ConfigDict(validate_assignment=True) 

474 

475 id: Optional[str] = Field( 

476 default=None, 

477 description="Subscription unique identifier. Automatically created at " 

478 "creation time." 

479 ) 

480 description: Optional[str] = Field( 

481 default=None, 

482 description="A free text used by the client to describe the " 

483 "subscription." 

484 ) 

485 status: Optional[Status] = Field( 

486 default=Status.ACTIVE, 

487 description="Either active (for active subscriptions) or inactive " 

488 "(for inactive subscriptions). If this field is not " 

489 "provided at subscription creation time, new subscriptions " 

490 "are created with the active status, which can be changed" 

491 " by clients afterwards. For expired subscriptions, this " 

492 "attribute is set to expired (no matter if the client " 

493 "updates it to active/inactive). Also, for subscriptions " 

494 "experiencing problems with notifications, the status is " 

495 "set to failed. As soon as the notifications start working " 

496 "again, the status is changed back to active." 

497 ) 

498 subject: Subject = Field( 

499 description="An object that describes the subject of the subscription.", 

500 examples=[{ 

501 'entities': [{'idPattern': '.*', 'type': 'Room'}], 

502 'condition': { 

503 'attrs': ['temperature'], 

504 'expression': {'q': 'temperature>40'}, 

505 }, 

506 }], 

507 ) 

508 notification: Notification = Field( 

509 description="An object that describes the notification to send when " 

510 "the subscription is triggered.", 

511 examples=[{ 

512 'http': {'url': 'http://localhost:1234'}, 

513 'attrs': ['temperature', 'humidity'], 

514 }], 

515 ) 

516 expires: Optional[datetime] = Field( 

517 default=None, 

518 description="Subscription expiration date in ISO8601 format. " 

519 "Permanent subscriptions must omit this field." 

520 ) 

521 

522 throttling: Optional[conint(strict=True, ge=0, )] = Field( 

523 default=None, 

524 strict=True, 

525 description="Minimal period of time in seconds which " 

526 "must elapse between two consecutive notifications. " 

527 "It is optional." 

528 )