Coverage for filip/models/ngsi_ld/subscriptions.py: 95%

86 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-05 11:07 +0000

1from typing import List, Optional, Literal 

2from pydantic import ( 

3 ConfigDict, 

4 BaseModel, 

5 Field, 

6 HttpUrl, 

7 AnyUrl, 

8 field_validator, 

9 model_validator, 

10) 

11import dateutil.parser 

12from filip.models.ngsi_ld.base import GeoQuery, validate_ngsi_ld_query 

13 

14 

class EntityInfo(BaseModel):
    """
    Selector for the entities a subscription refers to.

    NGSI-LD v1.3.1 specifies this data type as EntityInfo, whereas v1.6.1
    specifies it as a new data type named EntitySelector.
    """

    model_config = ConfigDict(populate_by_name=True)

    # Optional: identifier of one specific entity.
    id: Optional[str] = Field(
        default=None,
        description="Entity identifier (valid URI)",
    )
    # Optional: pattern for the entity identifier.
    idPattern: Optional[str] = Field(
        default=None,
        description="Regular expression as per IEEE POSIX 1003.2™ [11]",
    )
    # Required: this is the only member without a default value.
    type: str = Field(
        description=(
            "Fully Qualified Name of an Entity Type or the Entity Type Name as a "
            "short-hand string. See clause 4.6.2"
        ),
    )

30 

31 

class KeyValuePair(BaseModel):
    """
    A single generic ``{key, value}`` entry; both members are required strings.
    """

    # Name of the entry.
    key: str
    # Content associated with ``key``.
    value: str

35 

36 

class Endpoint(BaseModel):
    """
    Parameters required to define an endpoint for notifications.

    Besides the endpoint's URI, two generic {key, value} arrays can be given:

    - receiverInfo carries, in a generalized form, whatever extra information
      the broker shall convey to the receiver so that the broker can
      successfully communicate with the receiver (e.g. Authorization
      material), or so that the receiver can correctly interpret the received
      content (e.g. the Link URL to fetch an @context).
    - notifierInfo carries the configuration the broker needs to know in
      order to correctly set up the communication channel towards the
      receiver.

    Example of "receiverInfo"
        "receiverInfo": [
            {
                "key": "H1",
                "value": "123"
            },
            {
                "key": "H2",
                "value": "456"
            }
        ]

    Example of "notifierInfo"
        "notifierInfo": [
            {
                "key": "MQTT-Version",
                "value": "mqtt5.0"
            }
        ]
    """

    model_config = ConfigDict(populate_by_name=True)

    # Required target of the notifications; its scheme is checked in check_uri.
    uri: AnyUrl = Field(description="Dereferenceable URI")
    accept: Optional[str] = Field(
        default=None,
        description=(
            "MIME type for the notification payload body "
            "(application/json, application/ld+json, "
            "application/geo+json)"
        ),
    )
    receiverInfo: Optional[List[KeyValuePair]] = Field(
        default=None,
        description=(
            "Generic {key, value} array to convey optional information "
            "to the receiver"
        ),
    )
    notifierInfo: Optional[List[KeyValuePair]] = Field(
        default=None,
        description="Generic {key, value} array to set up the communication channel",
    )

    @field_validator("uri")
    @classmethod
    def check_uri(cls, uri: AnyUrl):
        """
        Accept the URI only when its scheme is ``http`` or ``mqtt``.

        Raises:
            ValueError: for any other scheme.
        """
        # NOTE(review): "https" and "mqtts" are rejected by this whitelist —
        # confirm that this restriction is intended.
        supported_schemes = ("http", "mqtt")
        if uri.scheme in supported_schemes:
            return uri
        raise ValueError("NGSI-LD currently only support http and mqtt")

    @field_validator("notifierInfo")
    @classmethod
    def check_notifier_info(cls, notifierInfo: List[KeyValuePair]):
        """Placeholder validator: currently hands the value back unchanged."""
        # TODO add validation of notifierInfo for MQTT notification
        return notifierInfo

103 

104 

class NotificationParams(BaseModel):
    """
    NGSI-LD Notification model.

    Holds the parameters that convey the details of a notification, as
    described in NGSI-LD Spec section 5.2.14.
    """

    model_config = ConfigDict(populate_by_name=True)

    attributes: Optional[List[str]] = Field(
        default=None,
        description=(
            "Entity Attribute Names (Properties or Relationships) to be included "
            "in the notification payload body. If undefined, it will mean all Attributes"
        ),
    )
    format: Optional[str] = Field(
        default="normalized",
        description=(
            "Conveys the representation format of the entities delivered at "
            "notification time. By default, it will be in normalized format"
        ),
    )
    # The endpoint is the only member that has to be supplied.
    endpoint: Endpoint = Field(..., description="Notification endpoint details")
    # Restricted to exactly two values: "ok" or "failed".
    status: Literal["ok", "failed"] = Field(
        default="ok",
        description=(
            "Status of the Notification. It shall be 'ok' if the last attempt "
            "to notify the subscriber succeeded. It shall be 'failed' if the last"
            " attempt to notify the subscriber failed"
        ),
    )

    # Additional members (described below as provided by the system)
    timesSent: Optional[int] = Field(
        default=None,
        description=(
            "Number of times that the notification was sent. Provided by the "
            "system when querying the details of a subscription"
        ),
    )
    lastNotification: Optional[str] = Field(
        default=None,
        description=(
            "Timestamp corresponding to the instant when the last notification "
            "was sent. Provided by the system when querying the details of a subscription"
        ),
    )
    lastFailure: Optional[str] = Field(
        default=None,
        description=(
            "Timestamp corresponding to the instant when the last notification"
            " resulting in failure was sent. Provided by the system when querying the details of a subscription"
        ),
    )
    lastSuccess: Optional[str] = Field(
        default=None,
        description=(
            "Timestamp corresponding to the instant when the last successful "
            "notification was sent. Provided by the system when querying the details of a subscription"
        ),
    )

152 

153 

class TemporalQuery(BaseModel):
    """
    Temporal query according to NGSI-LD Spec section 5.2.21

    timerel:
        Temporal relationship, one of "before", "after" and "between".
            "before": before the time specified by timeAt.
            "after": after the time specified by timeAt.
            "between": after the time specified by timeAt and before the time
                specified by endTimeAt
    timeAt:
        A DateTime string following ISO 8601, e.g. 2007-12-24T18:21Z
    endTimeAt (optional):
        A DateTime string following ISO 8601, e.g. 2007-12-24T18:21Z
        Only required when timerel="between"
    timeproperty: str
        Name of the Property that contains the temporal data that will be
        used to resolve the temporal query. If not specified, the default is
        "observedAt"

    Raises (at validation time):
        ValueError: if timeAt or endTimeAt is a non-empty string that cannot
            be parsed as ISO 8601, or if timerel is "between" while endTimeAt
            is missing.
    """

    model_config = ConfigDict(populate_by_name=True)
    timerel: Literal["before", "after", "between"] = Field(
        ...,
        description="String representing the temporal relationship as defined by clause "
        "4.11 (Allowed values: 'before', 'after', and 'between') ",
    )
    timeAt: str = Field(
        ...,
        description="String representing the timeAt parameter as defined by clause "
        "4.11. It shall be a DateTime ",
    )
    endTimeAt: Optional[str] = Field(
        default=None,
        description="String representing the endTimeAt parameter as defined by clause "
        "4.11. It shall be a DateTime. Cardinality shall be 1 if timerel is "
        "equal to 'between' ",
    )
    # The model itself keeps None when nothing is given; the description only
    # documents the default that applies when the member is absent.
    timeproperty: Optional[str] = Field(
        default=None,
        description="String representing a Property name. The name of the Property that "
        "contains the temporal data that will be used to resolve the "
        "temporal query. If not specified, the default is 'observedAt'",
    )

    # NOTE(review): the method name is a leftover; it validates timestamps,
    # not URIs. It is kept unchanged so the class attributes stay compatible.
    @field_validator("timeAt", "endTimeAt")
    @classmethod
    def check_uri(cls, v: str):
        """
        Check that a given timestamp can be parsed as ISO 8601.

        Falsy values (None or an empty string) are passed through unchecked.

        Raises:
            ValueError: if the string is not a valid ISO 8601 timestamp.
        """
        if not v:
            return v
        try:
            # Only the parseability matters; the parsed datetime is discarded
            # and the original string is what the model stores.
            dateutil.parser.isoparse(v)
        except ValueError as err:
            # The validator serves both fields, so the message names both.
            raise ValueError(
                "timeAt and endTimeAt must be in ISO 8601 format"
            ) from err
        return v

    # when timerel=between, endTimeAt must be specified
    # NOTE(review): the method name is a leftover and unrelated to passwords;
    # it is kept unchanged so the class attributes stay compatible.
    @model_validator(mode="after")
    def check_passwords_match(self) -> "TemporalQuery":
        """
        Enforce that endTimeAt is present whenever timerel is "between".

        Raises:
            ValueError: if timerel is "between" and endTimeAt is None.
        """
        if self.timerel == "between" and self.endTimeAt is None:
            raise ValueError('When timerel="between", endTimeAt must be specified')
        return self

218 

219 

class SubscriptionLD(BaseModel):
    """
    Context Subscription model according to NGSI-LD Spec section 5.2.12

    Only ``notification`` has to be supplied; every other member has a
    default. Two members are validated beyond their type: ``q`` is passed
    through ``validate_ngsi_ld_query`` and ``throttling`` must be greater
    than 0 when it is set.
    """

    model_config = ConfigDict(populate_by_name=True)

    id: Optional[str] = Field(
        default=None, description="Subscription identifier (JSON-LD @id)"
    )
    type: str = Field(default="Subscription", description="JSON-LD @type")
    subscriptionName: Optional[str] = Field(
        default=None, description="A (short) name given to this Subscription"
    )
    description: Optional[str] = Field(
        default=None, description="Subscription description"
    )
    entities: Optional[List[EntityInfo]] = Field(
        default=None, description="Entities subscribed"
    )
    watchedAttributes: Optional[List[str]] = Field(
        default=None, description="Watched Attributes (Properties or Relationships)"
    )
    notificationTrigger: Optional[List[str]] = Field(
        default=None, description="Notification triggers"
    )
    timeInterval: Optional[int] = Field(
        default=None, description="Time interval in seconds"
    )
    q: Optional[str] = Field(
        default=None,
        description="Query met by subscribed entities to trigger the notification",
    )
    geoQ: Optional[GeoQuery] = Field(
        default=None,
        description="Geoquery met by subscribed entities to trigger the notification",
    )
    csf: Optional[str] = Field(default=None, description="Context source filter")
    isActive: bool = Field(
        default=True,
        description="Indicates if the Subscription is under operation (True) or paused (False)",
    )
    notification: NotificationParams = Field(..., description="Notification details")
    expiresAt: Optional[str] = Field(
        default=None, description="Expiration date for the subscription"
    )
    throttling: Optional[int] = Field(
        default=None,
        # A separating space is needed between the two sentences; without it
        # the concatenated literals read "notifications.It must be ...".
        description="Minimal period of time in seconds between two consecutive notifications. "
        "It must be greater than 0 if set. If not set, it means no throttling is used.",
    )
    temporalQ: Optional[TemporalQuery] = Field(
        default=None, description="Temporal Query"
    )
    lang: Optional[str] = Field(
        default=None, description="Language filter applied to the query"
    )

    @field_validator("q")
    @classmethod
    def check_q(cls, v: str):
        """
        Validate the query string by delegating to ``validate_ngsi_ld_query``.

        Returns whatever the helper returns.
        """
        # NOTE(review): the helper is defined in filip.models.ngsi_ld.base;
        # how it treats None is not visible here — confirm.
        return validate_ngsi_ld_query(v)

    @field_validator("throttling")
    @classmethod
    def check_throttling(cls, throttling):
        """
        Validate the throttling value. It must be greater than 0 if provided.

        Raises:
            ValueError: if throttling is not None and is less than or equal
                to 0.
        """
        if throttling is not None and throttling <= 0:
            raise ValueError("Throttling must be greater than 0")
        return throttling