Coverage for filip/models/ngsi_ld/subscriptions.py: 99%

80 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-02-19 11:48 +0000

1from typing import List, Optional, Literal 

2from pydantic import ( 

3 ConfigDict, 

4 BaseModel, 

5 Field, 

6 HttpUrl, 

7 AnyUrl, 

8 field_validator, 

9 model_validator, 

10) 

11import dateutil.parser 

12from filip.models.ngsi_ld.base import GeoQuery, validate_ngsi_ld_query 

13 

14 

15class EntityInfo(BaseModel): 

16 """ 

17 In v1.3.1 it is specified as EntityInfo 

18 In v1.6.1 it is specified in a new data type, namely EntitySelector 

19 """ 

20 

21 id: Optional[HttpUrl] = Field( 

22 default=None, description="Entity identifier (valid URI)" 

23 ) 

24 idPattern: Optional[str] = Field( 

25 default=None, description="Regular expression as per IEEE POSIX 1003.2™ [11]" 

26 ) 

27 type: str = Field( 

28 description="Fully Qualified Name of an Entity Type or the Entity Type Name as a " 

29 "short-hand string. See clause 4.6.2" 

30 ) 

31 model_config = ConfigDict(populate_by_name=True) 

32 

33 

34class KeyValuePair(BaseModel): 

35 key: str 

36 value: str 

37 

38 

39class Endpoint(BaseModel): 

40 """ 

41 This datatype represents the parameters that are required in order to define 

42 an endpoint for notifications. This can include the endpoint's URI, a 

43 generic{key, value} array, named receiverInfo, which contains, in a 

44 generalized form, whatever extra information the broker shall convey to the 

45 receiver in order for the broker to successfully communicate with 

46 receiver (e.g Authorization material), or for the receiver to correctly 

47 interpret the received content (e.g. the Link URL to fetch an @context). 

48 

49 Additionally, it can include another generic{key, value} array, named 

50 notifierInfo, which contains the configuration that the broker needs to 

51 know in order to correctly set up the communication channel towards the 

52 receiver 

53 

54 Example of "receiverInfo" 

55 "receiverInfo": [ 

56 { 

57 "key": "H1", 

58 "value": "123" 

59 }, 

60 { 

61 "key": "H2", 

62 "value": "456" 

63 } 

64 ] 

65 

66 Example of "notifierInfo" 

67 "notifierInfo": [ 

68 { 

69 "key": "MQTT-Version", 

70 "value": "mqtt5.0" 

71 } 

72 ] 

73 """ 

74 

75 uri: AnyUrl = Field(description="Dereferenceable URI") 

76 accept: Optional[str] = Field( 

77 default=None, 

78 description="MIME type for the notification payload body " 

79 "(application/json, application/ld+json, " 

80 "application/geo+json)", 

81 ) 

82 receiverInfo: Optional[List[KeyValuePair]] = Field( 

83 default=None, 

84 description="Generic {key, value} array to convey optional information " 

85 "to the receiver", 

86 ) 

87 notifierInfo: Optional[List[KeyValuePair]] = Field( 

88 default=None, 

89 description="Generic {key, value} array to set up the communication " "channel", 

90 ) 

91 model_config = ConfigDict(populate_by_name=True) 

92 

93 @field_validator("uri") 

94 @classmethod 

95 def check_uri(cls, uri: AnyUrl): 

96 if uri.scheme not in ("http", "mqtt"): 

97 raise ValueError("NGSI-LD currently only support http and mqtt") 

98 return uri 

99 

100 @field_validator("notifierInfo") 

101 @classmethod 

102 def check_notifier_info(cls, notifierInfo: List[KeyValuePair]): 

103 # TODO add validation of notifierInfo for MQTT notification 

104 return notifierInfo 

105 

106 

107class NotificationParams(BaseModel): 

108 """ 

109 NGSI-LD Notification model. It contains the parameters that allow to 

110 convey the details of a notification, as described in NGSI-LD Spec section 5.2.14 

111 """ 

112 

113 attributes: Optional[List[str]] = Field( 

114 default=None, 

115 description="Entity Attribute Names (Properties or Relationships) to be included " 

116 "in the notification payload body. If undefined, it will mean all Attributes", 

117 ) 

118 format: Optional[str] = Field( 

119 default="normalized", 

120 description="Conveys the representation format of the entities delivered at " 

121 "notification time. By default, it will be in normalized format", 

122 ) 

123 endpoint: Endpoint = Field(..., description="Notification endpoint details") 

124 # status can either be "ok" or "failed" 

125 status: Literal["ok", "failed"] = Field( 

126 default="ok", 

127 description="Status of the Notification. It shall be 'ok' if the last attempt " 

128 "to notify the subscriber succeeded. It shall be 'failed' if the last" 

129 " attempt to notify the subscriber failed", 

130 ) 

131 

132 # Additional members 

133 timesSent: Optional[int] = Field( 

134 default=None, 

135 description="Number of times that the notification was sent. Provided by the " 

136 "system when querying the details of a subscription", 

137 ) 

138 lastNotification: Optional[str] = Field( 

139 default=None, 

140 description="Timestamp corresponding to the instant when the last notification " 

141 "was sent. Provided by the system when querying the details of a subscription", 

142 ) 

143 lastFailure: Optional[str] = Field( 

144 default=None, 

145 description="Timestamp corresponding to the instant when the last notification" 

146 " resulting in failure was sent. Provided by the system when querying the details of a subscription", 

147 ) 

148 lastSuccess: Optional[str] = Field( 

149 default=None, 

150 description="Timestamp corresponding to the instant when the last successful " 

151 "notification was sent. Provided by the system when querying the details of a subscription", 

152 ) 

153 model_config = ConfigDict(populate_by_name=True) 

154 

155 

156class TemporalQuery(BaseModel): 

157 """ 

158 Temporal query according to NGSI-LD Spec section 5.2.21 

159 

160 timerel: 

161 Temporal relationship, one of "before", "after" and "between". 

162 "before": before the time specified by timeAt. 

163 "after": after the time specified by timeAt. 

164 "between": after the time specified by timeAt and before the time specified by 

165 endtimeAt 

166 timeAt: 

167 A DateTime object following ISO 8061, e.g. 2007-12-24T18:21Z 

168 endTimeAt (optional): 

169 A DateTime object following ISO 8061, e.g. 2007-12-24T18:21Z 

170 Only required when timerel="between" 

171 timeproperty: str 

172 Representing a Propertyname of the Property that contains the temporal data that 

173 will be used to resolve the temporal query. If not specified, the default is 

174 "observedAt" 

175 

176 """ 

177 

178 model_config = ConfigDict(populate_by_name=True) 

179 timerel: Literal["before", "after", "between"] = Field( 

180 ..., 

181 description="String representing the temporal relationship as defined by clause " 

182 "4.11 (Allowed values: 'before', 'after', and 'between') ", 

183 ) 

184 timeAt: str = Field( 

185 ..., 

186 description="String representing the timeAt parameter as defined by clause " 

187 "4.11. It shall be a DateTime ", 

188 ) 

189 endTimeAt: Optional[str] = Field( 

190 default=None, 

191 description="String representing the endTimeAt parameter as defined by clause " 

192 "4.11. It shall be a DateTime. Cardinality shall be 1 if timerel is " 

193 "equal to 'between' ", 

194 ) 

195 timeproperty: Optional[str] = Field( 

196 default=None, 

197 description="String representing a Property name. The name of the Property that " 

198 "contains the temporal data that will be used to resolve the " 

199 "temporal query. If not specified, ", 

200 ) 

201 

202 @field_validator("timeAt", "endTimeAt") 

203 @classmethod 

204 def check_uri(cls, v: str): 

205 if not v: 

206 return v 

207 else: 

208 try: 

209 dateutil.parser.isoparse(v) 

210 except ValueError: 

211 raise ValueError("timeAt must be in ISO8061 format") 

212 return v 

213 

214 # when timerel=between, endTimeAt must be specified 

215 @model_validator(mode="after") 

216 def check_passwords_match(self) -> "TemporalQuery": 

217 if self.timerel == "between" and self.endTimeAt is None: 

218 raise ValueError('When timerel="between", endTimeAt must be specified') 

219 return self 

220 

221 

222class SubscriptionLD(BaseModel): 

223 """ 

224 Context Subscription model according to NGSI-LD Spec section 5.2.12 

225 """ 

226 

227 id: Optional[str] = Field( 

228 default=None, description="Subscription identifier (JSON-LD @id)" 

229 ) 

230 type: str = Field(default="Subscription", description="JSON-LD @type") 

231 subscriptionName: Optional[str] = Field( 

232 default=None, description="A (short) name given to this Subscription" 

233 ) 

234 description: Optional[str] = Field( 

235 default=None, description="Subscription description" 

236 ) 

237 entities: Optional[List[EntityInfo]] = Field( 

238 default=None, description="Entities subscribed" 

239 ) 

240 watchedAttributes: Optional[List[str]] = Field( 

241 default=None, description="Watched Attributes (Properties or Relationships)" 

242 ) 

243 notificationTrigger: Optional[List[str]] = Field( 

244 default=None, description="Notification triggers" 

245 ) 

246 timeInterval: Optional[int] = Field( 

247 default=None, description="Time interval in seconds" 

248 ) 

249 q: Optional[str] = Field( 

250 default=None, 

251 description="Query met by subscribed entities to trigger the notification", 

252 ) 

253 

254 @field_validator("q") 

255 @classmethod 

256 def check_q(cls, v: str): 

257 return validate_ngsi_ld_query(v) 

258 

259 geoQ: Optional[GeoQuery] = Field( 

260 default=None, 

261 description="Geoquery met by subscribed entities to trigger the notification", 

262 ) 

263 csf: Optional[str] = Field(default=None, description="Context source filter") 

264 isActive: bool = Field( 

265 default=True, 

266 description="Indicates if the Subscription is under operation (True) or paused (False)", 

267 ) 

268 notification: NotificationParams = Field(..., description="Notification details") 

269 expiresAt: Optional[str] = Field( 

270 default=None, description="Expiration date for the subscription" 

271 ) 

272 throttling: Optional[int] = Field( 

273 default=None, 

274 description="Minimal period of time in seconds between two consecutive notifications", 

275 ) 

276 temporalQ: Optional[TemporalQuery] = Field( 

277 default=None, description="Temporal Query" 

278 ) 

279 lang: Optional[str] = Field( 

280 default=None, description="Language filter applied to the query" 

281 ) 

282 model_config = ConfigDict(populate_by_name=True)