Coverage for filip/models/ngsi_v2/subscriptions.py: 97%

145 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-05 11:07 +0000

1""" 

2This module contains NGSIv2 models for context subscription in the context 

3broker. 

4""" 

5 

6from typing import Any, List, Dict, Union, Optional 

7from datetime import datetime 

8from aenum import Enum 

9from pydantic import ( 

10 field_validator, 

11 model_validator, 

12 ConfigDict, 

13 BaseModel, 

14 conint, 

15 Field, 

16 Json, 

17) 

18from .base import AttrsFormat, EntityPattern, Http, Status, Expression 

19from filip.utils.validators import validate_mqtt_url, validate_mqtt_topic 

20from filip.models.ngsi_v2.context import ContextEntity 

21from filip.models.ngsi_v2.base import EntityPattern, Expression, DataType 

22from filip.custom_types import AnyMqttUrl 

23import warnings 

24 

25# The pydantic models still have a .json() function, but this method is deprecated. 

26warnings.filterwarnings( 

27 "ignore", 

28 category=UserWarning, 

29 message='Field name "json" shadows an attribute in parent "Http"', 

30) 

31warnings.filterwarnings( 

32 "ignore", 

33 category=UserWarning, 

34 message='Field name "json" shadows an attribute in parent "Mqtt"', 

35) 

36 

37 

38class NgsiPayloadAttr(BaseModel): 

39 """ 

40 Model for NGSI V2 type payload in httpCustom/mqttCustom notifications. 

41 The difference between this model and the usual BaseValueAttribute model is that 

42 a metadata field is not allowed. 

43 In the absence of type/value in some attribute field, one should resort to partial 

44 representations ( as specified in the orion api manual), done by the BaseValueAttr. 

45 model. 

46 """ 

47 

48 model_config = ConfigDict(extra="forbid") 

49 type: Union[DataType, str] = Field( 

50 default=DataType.TEXT, 

51 description="The attribute type represents the NGSI value type of the " 

52 "attribute value. Note that FIWARE NGSI has its own type " 

53 "system for attribute values, so NGSI value types are not " 

54 "the same as JSON types. Allowed characters " 

55 "are the ones in the plain ASCII set, except the following " 

56 "ones: control characters, whitespace, &, ?, / and #.", 

57 max_length=256, 

58 min_length=1, 

59 ) 

60 value: Optional[Any] = Field( 

61 default=None, title="Attribute value", description="the actual data" 

62 ) 

63 

64 

65class NgsiPayload(BaseModel): 

66 """ 

67 Model for NGSI V2 type payload in httpCustom/mqttCustom notifications. 

68 Differences between this model and the usual Context entity models include: 

69 - id and type are not mandatory 

70 - an attribute metadata field is not allowed 

71 """ 

72 

73 model_config = ConfigDict(extra="allow", validate_default=True) 

74 id: Optional[str] = Field(default=None, max_length=256, min_length=1, frozen=True) 

75 type: Optional[Union[str, Enum]] = Field( 

76 default=None, 

77 max_length=256, 

78 min_length=1, 

79 frozen=True, 

80 ) 

81 

82 @model_validator(mode="after") 

83 def validate_notification_attrs(self): 

84 for v in self.model_dump(exclude={"id", "type"}).values(): 

85 assert isinstance(NgsiPayloadAttr.model_validate(v), NgsiPayloadAttr) 

86 return self 

87 

88 

89class Message(BaseModel): 

90 """ 

91 Model for a notification message, when sent to other NGSIv2-APIs 

92 """ 

93 

94 subscriptionId: Optional[str] = Field( 

95 default=None, 

96 description="Id of the subscription the notification comes from", 

97 ) 

98 data: List[ContextEntity] = Field( 

99 description="is an array with the notification data itself which " 

100 "includes the entity and all concerned attributes. Each " 

101 "element in the array corresponds to a different entity. " 

102 "By default, the entities are represented in normalized " 

103 "mode. However, using the attrsFormat modifier, a " 

104 "simplified representation mode can be requested." 

105 ) 

106 

107 

108class HttpMethods(str, Enum): 

109 _init_ = "value __doc__" 

110 

111 POST = "POST", "Post Method" 

112 PUT = "PUT", "Put Method" 

113 PATCH = "PATCH", "Patch Method" 

114 

115 

116class HttpCustom(Http): 

117 """ 

118 Model for custom notification patterns sent via HTTP 

119 """ 

120 

121 headers: Optional[Dict[str, Union[str, Json]]] = Field( 

122 default=None, 

123 description="a key-map of HTTP headers that are included in " 

124 "notification messages.", 

125 ) 

126 qs: Optional[Dict[str, Union[str, Json]]] = Field( 

127 default=None, 

128 description="a key-map of URL query parameters that are included in " 

129 "notification messages.", 

130 ) 

131 method: str = Field( 

132 default=HttpMethods.POST, 

133 description="the method to use when sending the notification " 

134 "(default is POST). Only valid HTTP methods are allowed. " 

135 "On specifying an invalid HTTP method, a 400 Bad Request " 

136 "error is returned.", 

137 ) 

138 payload: Optional[str] = Field( 

139 default=None, 

140 description="the payload to be used in notifications. If omitted, the " 

141 'default payload (see "Notification Messages" sections) ' 

142 "is used.", 

143 ) 

144 json: Optional[Dict[str, Union[str, Json]]] = Field( 

145 default=None, 

146 description="get a json as notification. If omitted, the default" 

147 'payload (see "Notification Messages" sections) is used.', 

148 ) 

149 ngsi: Optional[NgsiPayload] = Field( 

150 default=None, 

151 description="get an NGSI-v2 normalized entity as notification.If omitted, " 

152 'the default payload (see "Notification Messages" sections) is used.', 

153 ) 

154 timeout: Optional[int] = Field( 

155 default=None, 

156 description="Maximum time (in milliseconds) the subscription waits for the " 

157 "response. The maximum value allowed for this parameter is 1800000 " 

158 "(30 minutes). If timeout is defined to 0 or omitted, then the value " 

159 "passed as -httpTimeout CLI parameter is used. See section in the " 

160 "'Command line options' for more details.", 

161 ) 

162 

163 @model_validator(mode="after") 

164 def validate_notification_payloads(self): 

165 fields = [self.payload, self.json, self.ngsi] 

166 filled_fields = [field for field in fields if field is not None] 

167 

168 if len(filled_fields) > 1: 

169 raise ValueError( 

170 "Only one of payload, json or ngsi fields accepted at the " 

171 "same time in httpCustom." 

172 ) 

173 

174 return self 

175 

176 

177class Mqtt(BaseModel): 

178 """ 

179 Model for notifications sent via MQTT 

180 https://fiware-orion.readthedocs.io/en/3.8.0/user/mqtt_notifications/index.html 

181 """ 

182 

183 url: Union[AnyMqttUrl, str] = Field( 

184 description="to specify the MQTT broker endpoint to use. URL must " 

185 "start with mqtt:// and never contains a path (i.e. it " 

186 "only includes host and port)" 

187 ) 

188 topic: str = Field( 

189 description="to specify the MQTT topic to use", 

190 ) 

191 valid_type = field_validator("topic")(validate_mqtt_topic) 

192 qos: Optional[int] = Field( 

193 default=0, 

194 description="to specify the MQTT QoS value to use in the " 

195 "notifications associated to the subscription (0, 1 or 2). " 

196 "This is an optional field, if omitted then QoS 0 is used.", 

197 ge=0, 

198 le=2, 

199 ) 

200 user: Optional[str] = Field(default=None, description="username if required") 

201 passwd: Optional[str] = Field(default=None, description="password if required") 

202 

203 @field_validator("url") 

204 @classmethod 

205 def check_url(cls, value): 

206 """ 

207 Check if url has a valid structure 

208 Args: 

209 value: url to validate 

210 Returns: 

211 validated url 

212 """ 

213 return validate_mqtt_url(url=value) 

214 

215 

216class MqttCustom(Mqtt): 

217 """ 

218 Model for custom notification patterns sent via MQTT 

219 https://fiware-orion.readthedocs.io/en/3.8.0/user/mqtt_notifications/index.html 

220 """ 

221 

222 payload: Optional[str] = Field( 

223 default=None, 

224 description="the payload to be used in notifications. If omitted, the " 

225 'default payload (see "Notification Messages" sections) ' 

226 "is used.", 

227 ) 

228 json: Optional[Dict[str, Any]] = Field( 

229 default=None, 

230 description="get a json as notification. If omitted, the default" 

231 'payload (see "Notification Messages" sections) is used.', 

232 ) 

233 ngsi: Optional[NgsiPayload] = Field( 

234 default=None, 

235 description="get an NGSI-v2 normalized entity as notification.If omitted, " 

236 'the default payload (see "Notification Messages" sections) is used.', 

237 ) 

238 

239 @model_validator(mode="after") 

240 def validate_payload_type(self): 

241 assert ( 

242 len( 

243 [ 

244 v 

245 for k, v in self.model_dump().items() 

246 if ((v is not None) and (k in ["payload", "ngsi", "json"])) 

247 ] 

248 ) 

249 <= 1 

250 ) 

251 return self 

252 

253 

254class Notification(BaseModel): 

255 """ 

256 If the notification attributes are left empty, all attributes will be 

257 included in the notifications. Otherwise, only the specified ones will 

258 be included. 

259 """ 

260 

261 model_config = ConfigDict(validate_assignment=True) 

262 timesSent: Optional[Any] = Field( 

263 default=None, 

264 description="Not editable, only present in GET operations. " 

265 "Number of notifications sent due to this subscription.", 

266 ) 

267 http: Optional[Http] = Field( 

268 default=None, 

269 description="It is used to convey parameters for notifications " 

270 "delivered through the HTTP protocol. Cannot be used " 

271 'together with "httpCustom, mqtt, mqttCustom"', 

272 ) 

273 httpCustom: Optional[HttpCustom] = Field( 

274 default=None, 

275 description="It is used to convey parameters for notifications " 

276 "delivered through the HTTP protocol. Cannot be used " 

277 'together with "http"', 

278 ) 

279 mqtt: Optional[Mqtt] = Field( 

280 default=None, 

281 description="It is used to convey parameters for notifications " 

282 "delivered through the MQTT protocol. Cannot be used " 

283 'together with "http, httpCustom, mqttCustom"', 

284 ) 

285 mqttCustom: Optional[MqttCustom] = Field( 

286 default=None, 

287 description="It is used to convey parameters for notifications " 

288 "delivered through the MQTT protocol. Cannot be used " 

289 'together with "http, httpCustom, mqtt"', 

290 ) 

291 attrs: Optional[List[str]] = Field( 

292 default=None, 

293 description="List of attributes to be included in notification " 

294 "messages. It also defines the order in which attributes " 

295 "must appear in notifications when attrsFormat value is " 

296 'used (see "Notification Messages" section). An empty list ' 

297 "means that all attributes are to be included in " 

298 'notifications. See "Filtering out attributes and ' 

299 'metadata" section for more detail.', 

300 ) 

301 exceptAttrs: Optional[List[str]] = Field( 

302 default=None, 

303 description="List of attributes to be excluded from the notification " 

304 "message, i.e. a notification message includes all entity " 

305 "attributes except the ones listed in this field.", 

306 ) 

307 attrsFormat: Optional[AttrsFormat] = Field( 

308 default=AttrsFormat.NORMALIZED, 

309 description="specifies how the entities are represented in " 

310 "notifications. Accepted values are normalized (default), " 

311 "keyValues or values. If attrsFormat takes any value " 

312 "different than those, an error is raised. See detail in " 

313 '"Notification Messages" section.', 

314 ) 

315 metadata: Optional[Any] = Field( 

316 default=None, 

317 description="List of metadata to be included in notification messages. " 

318 'See "Filtering out attributes and metadata" section for ' 

319 "more detail.", 

320 ) 

321 onlyChangedAttrs: Optional[bool] = Field( 

322 default=False, 

323 description="Only supported by Orion Context Broker!" 

324 "If set to true then notifications associated to the " 

325 "subscription include only attributes that changed in the " 

326 "triggering update request, in combination with the attrs " 

327 "or exceptAttrs field. For instance, if attrs is " 

328 "[A=1, B=2, C=3] and A=0 is updated. In case " 

329 "onlyChangedAttrs=false, CB notifies [A=0, B=2, C=3]." 

330 "In case onlyChangedAttrs=true, CB notifies " 

331 "[A=0, B=null, C=null]. This ", 

332 ) 

333 lastSuccess: Optional[Any] = Field( 

334 default=None, 

335 description="Not editable, only present in GET operations. " 

336 "A Timestamp in ISO8601 format for last successful " 

337 "notification. Not present if subscription has never " 

338 "had a successful notification.", 

339 ) 

340 lastSuccessCode: Optional[Any] = Field( 

341 default=None, 

342 description="Not editable, only present in GET operations. " 

343 "HTTP status code of the last successful notification.", 

344 ) 

345 lastFailure: Optional[Any] = Field( 

346 default=None, 

347 description="Not editable, only present in GET operations" 

348 "Last failure timestamp in ISO8601 format. Not present if " 

349 "subscription has never had a problem with notifications.", 

350 ) 

351 lastFailureReason: Optional[Any] = Field( 

352 default=None, 

353 description="Not editable, only present in GET operations. " 

354 "A string with the reason for the last failure. ", 

355 ) 

356 covered: Optional[bool] = Field( 

357 default=False, 

358 description="A flag to decide whether to include not existing attribute in " 

359 "notifications. It can be useful for those notification endpoints " 

360 "that are not flexible enough for a variable set of attributes and " 

361 "needs always the same set of incoming attributes in every received" 

362 " notification " 

363 "https://fiware-orion.readthedocs.io/en/master/orion-api.html#covered-subscriptions", 

364 ) 

365 

366 @model_validator(mode="after") 

367 def validate_http(self): 

368 if self.httpCustom is not None: 

369 assert self.http is None 

370 return self 

371 

372 @model_validator(mode="after") 

373 def validate_attr(self): 

374 if self.exceptAttrs is not None: 

375 assert self.attrs is None 

376 return self 

377 

378 @model_validator(mode="after") 

379 def validate_endpoints(self): 

380 if self.http is not None: 

381 assert all( 

382 ( 

383 v is None 

384 for k, v in self.model_dump().items() 

385 if k in ["httpCustom", "mqtt", "mqttCustom"] 

386 ) 

387 ) 

388 elif self.httpCustom is not None: 

389 assert all( 

390 ( 

391 v is None 

392 for k, v in self.model_dump().items() 

393 if k in ["http", "mqtt", "mqttCustom"] 

394 ) 

395 ) 

396 elif self.mqtt is not None: 

397 assert all( 

398 ( 

399 v is None 

400 for k, v in self.model_dump().items() 

401 if k in ["http", "httpCustom", "mqttCustom"] 

402 ) 

403 ) 

404 else: 

405 assert all( 

406 ( 

407 v is None 

408 for k, v in self.model_dump().items() 

409 if k in ["http", "httpCustom", "mqtt"] 

410 ) 

411 ) 

412 return self 

413 

414 @model_validator(mode="after") 

415 def validate_covered_attrs(self): 

416 if self.covered is True: 

417 if isinstance(self.attrs, list) and len(self.attrs) > 0: 

418 return self 

419 else: 

420 raise ValueError("Covered notification need an explicit list of attrs.") 

421 return self 

422 

423 

424class Response(Notification): 

425 """ 

426 Server response model for notifications 

427 """ 

428 

429 timesSent: int = Field( 

430 description="(not editable, only present in GET operations): " 

431 "Number of notifications sent due to this subscription." 

432 ) 

433 lastNotification: datetime = Field( 

434 description="(not editable, only present in GET operations): " 

435 "Last notification timestamp in ISO8601 format." 

436 ) 

437 lastFailure: Optional[datetime] = Field( 

438 default=None, 

439 description="(not editable, only present in GET operations): " 

440 "Last failure timestamp in ISO8601 format. Not present if " 

441 "subscription has never had a problem with notifications.", 

442 ) 

443 lastSuccess: Optional[datetime] = Field( 

444 default=None, 

445 description="(not editable, only present in GET operations): " 

446 "Timestamp in ISO8601 format for last successful " 

447 "notification. Not present if subscription has never " 

448 "had a successful notification.", 

449 ) 

450 

451 

452class Condition(BaseModel): 

453 """ 

454 Notification rules are as follow: 

455 If attrs and expression are used, a notification is sent whenever one of 

456 the attributes in the attrs list changes and at the same time expression 

457 matches. 

458 If attrs is used and expression is not used, a notification is sent 

459 whenever any of the attributes in the attrs list changes. 

460 If attrs is not used and expression is used, a notification is sent 

461 whenever any of the attributes of the entity changes and at the same time 

462 expression matches. 

463 If neither attrs nor expression are used, a notification is sent whenever 

464 any of the attributes of the entity changes. 

465 alterationTypes: for more information about this field, see 

466 https://github.com/telefonicaid/fiware-orion/blob/3.8.0/doc/manuals/orion-api.md#subscriptions-based-in-alteration-type 

467 

468 """ 

469 

470 attrs: Optional[Union[str, List[str]]] = Field( 

471 default=None, description="array of attribute names" 

472 ) 

473 expression: Optional[Union[str, Expression]] = Field( 

474 default=None, 

475 description="an expression composed of q, mq, georel, geometry and " 

476 'coords (see "List entities" operation above about this ' 

477 "field).", 

478 ) 

479 alterationTypes: Optional[List[str]] = Field( 

480 default=None, description="list of alteration types triggering the subscription" 

481 ) 

482 

483 @field_validator("attrs") 

484 def check_attrs(cls, v): 

485 if isinstance(v, list): 

486 return v 

487 elif isinstance(v, str): 

488 return [v] 

489 else: 

490 raise TypeError() 

491 

492 @field_validator("alterationTypes") 

493 def check_alteration_types(cls, v): 

494 allowed_types = {"entityCreate", "entityDelete", "entityUpdate", "entityChange"} 

495 

496 if v is None: 

497 return None 

498 elif isinstance(v, list): 

499 for item in v: 

500 if item not in allowed_types: 

501 raise ValueError( 

502 f"{item} is not a valid alterationType" 

503 f" allowed values are {allowed_types}" 

504 ) 

505 return v 

506 else: 

507 raise ValueError("alterationTypes must be a list of strings") 

508 

509 

510class Subject(BaseModel): 

511 """ 

512 Model for subscription subject 

513 """ 

514 

515 entities: List[EntityPattern] = Field( 

516 description="A list of objects, each one composed of by an Entity " "Object:" 

517 ) 

518 condition: Optional[Condition] = Field( 

519 default=None, 

520 ) 

521 

522 

523class Subscription(BaseModel): 

524 """ 

525 Subscription payload validations 

526 https://fiware-orion.readthedocs.io/en/master/user/ngsiv2_implementation_notes/index.html#subscription-payload-validations 

527 """ 

528 

529 model_config = ConfigDict(validate_assignment=True) 

530 

531 id: Optional[str] = Field( 

532 default=None, 

533 description="Subscription unique identifier. Automatically created at " 

534 "creation time.", 

535 ) 

536 description: Optional[str] = Field( 

537 default=None, 

538 description="A free text used by the client to describe the " "subscription.", 

539 ) 

540 status: Optional[Status] = Field( 

541 default=Status.ACTIVE, 

542 description="Either active (for active subscriptions) or inactive " 

543 "(for inactive subscriptions). If this field is not " 

544 "provided at subscription creation time, new subscriptions " 

545 "are created with the active status, which can be changed" 

546 " by clients afterwards. For expired subscriptions, this " 

547 "attribute is set to expired (no matter if the client " 

548 "updates it to active/inactive). Also, for subscriptions " 

549 "experiencing problems with notifications, the status is " 

550 "set to failed. As soon as the notifications start working " 

551 "again, the status is changed back to active.", 

552 ) 

553 subject: Subject = Field( 

554 description="An object that describes the subject of the subscription.", 

555 json_schema_extra={ 

556 "example": { 

557 "entities": [{"idPattern": ".*", "type": "Room"}], 

558 "condition": { 

559 "attrs": ["temperature"], 

560 "expression": {"q": "temperature>40"}, 

561 }, 

562 } 

563 }, 

564 ) 

565 notification: Notification = Field( 

566 description="An object that describes the notification to send when " 

567 "the subscription is triggered.", 

568 json_schema_extra={ 

569 "example": { 

570 "http": {"url": "http://localhost:1234"}, 

571 "attrs": ["temperature", "humidity"], 

572 } 

573 }, 

574 ) 

575 expires: Optional[datetime] = Field( 

576 default=None, 

577 description="Subscription expiration date in ISO8601 format. " 

578 "Permanent subscriptions must omit this field.", 

579 ) 

580 

581 throttling: Optional[ 

582 conint( 

583 strict=True, 

584 ge=0, 

585 ) 

586 ] = Field( 

587 default=None, 

588 strict=True, 

589 description="Minimal period of time in seconds which " 

590 "must elapse between two consecutive notifications. " 

591 "It is optional.", 

592 )