Coverage for filip/models/ngsi_ld/subscriptions.py: 99%
80 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-02-19 11:48 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-02-19 11:48 +0000
1from typing import List, Optional, Literal
2from pydantic import (
3 ConfigDict,
4 BaseModel,
5 Field,
6 HttpUrl,
7 AnyUrl,
8 field_validator,
9 model_validator,
10)
11import dateutil.parser
12from filip.models.ngsi_ld.base import GeoQuery, validate_ngsi_ld_query
class EntityInfo(BaseModel):
    """
    Describes the entities a subscription refers to.

    NGSI-LD v1.3.1 specifies this data type as EntityInfo; in v1.6.1 it is
    specified as a new data type named EntitySelector.
    """

    # Allow population by field name as well as by alias.
    model_config = ConfigDict(populate_by_name=True)

    # NOTE(review): HttpUrl presumably rejects URN-style identifiers such as
    # "urn:ngsi-ld:..." - confirm that this restriction is intended.
    id: Optional[HttpUrl] = Field(
        description="Entity identifier (valid URI)",
        default=None,
    )
    idPattern: Optional[str] = Field(
        description="Regular expression as per IEEE POSIX 1003.2™ [11]",
        default=None,
    )
    # The only mandatory member of the selector.
    type: str = Field(
        description=(
            "Fully Qualified Name of an Entity Type or the Entity Type Name "
            "as a short-hand string. See clause 4.6.2"
        )
    )
class KeyValuePair(BaseModel):
    """Generic key/value entry whose key and value are both strings."""

    # Name of the entry.
    key: str
    # Content associated with the key.
    value: str
class Endpoint(BaseModel):
    """
    Parameters required to define an endpoint for notifications.

    Besides the endpoint's URI, the model can carry two generic
    {key, value} arrays:

    * receiverInfo: whatever extra information the broker shall convey to
      the receiver, either so that the broker can successfully communicate
      with the receiver (e.g. Authorization material) or so that the
      receiver can correctly interpret the received content (e.g. the Link
      URL to fetch an @context).
    * notifierInfo: the configuration the broker needs to know in order to
      correctly set up the communication channel towards the receiver.

    Example of "receiverInfo"
        "receiverInfo": [
            {
                "key": "H1",
                "value": "123"
            },
            {
                "key": "H2",
                "value": "456"
            }
        ]

    Example of "notifierInfo"
        "notifierInfo": [
            {
                "key": "MQTT-Version",
                "value": "mqtt5.0"
            }
        ]
    """

    # Allow population by field name as well as by alias.
    model_config = ConfigDict(populate_by_name=True)

    uri: AnyUrl = Field(description="Dereferenceable URI")
    accept: Optional[str] = Field(
        description=(
            "MIME type for the notification payload body (application/json, "
            "application/ld+json, application/geo+json)"
        ),
        default=None,
    )
    receiverInfo: Optional[List[KeyValuePair]] = Field(
        description=(
            "Generic {key, value} array to convey optional information to the "
            "receiver"
        ),
        default=None,
    )
    notifierInfo: Optional[List[KeyValuePair]] = Field(
        description="Generic {key, value} array to set up the communication channel",
        default=None,
    )

    @field_validator("uri")
    @classmethod
    def check_uri(cls, uri: AnyUrl):
        """
        Accept the URI only if its scheme is "http" or "mqtt".

        Raises:
            ValueError: if the URI uses any other scheme.
        """
        supported_schemes = ("http", "mqtt")
        if uri.scheme in supported_schemes:
            return uri
        raise ValueError("NGSI-LD currently only support http and mqtt")

    @field_validator("notifierInfo")
    @classmethod
    def check_notifier_info(cls, notifierInfo: List[KeyValuePair]):
        """Pass notifierInfo through unchanged; no checks are applied yet."""
        # TODO add validation of notifierInfo for MQTT notification
        return notifierInfo
class NotificationParams(BaseModel):
    """
    NGSI-LD Notification model.

    Holds the parameters that convey the details of a notification, as
    described in NGSI-LD Spec section 5.2.14.
    """

    # Allow population by field name as well as by alias.
    model_config = ConfigDict(populate_by_name=True)

    attributes: Optional[List[str]] = Field(
        description=(
            "Entity Attribute Names (Properties or Relationships) to be "
            "included in the notification payload body. If undefined, it will "
            "mean all Attributes"
        ),
        default=None,
    )
    format: Optional[str] = Field(
        description=(
            "Conveys the representation format of the entities delivered "
            "at notification time. By default, it will be in normalized format"
        ),
        default="normalized",
    )
    endpoint: Endpoint = Field(..., description="Notification endpoint details")
    # Only the two literal values "ok" and "failed" are accepted.
    status: Literal["ok", "failed"] = Field(
        description=(
            "Status of the Notification. It shall be 'ok' if the last attempt to "
            "notify the subscriber succeeded. It shall be 'failed' if the last "
            "attempt to notify the subscriber failed"
        ),
        default="ok",
    )

    # Additional members; per their descriptions they are provided by the
    # system when the details of a subscription are queried.
    timesSent: Optional[int] = Field(
        description=(
            "Number of times that the notification was sent. Provided by the system "
            "when querying the details of a subscription"
        ),
        default=None,
    )
    lastNotification: Optional[str] = Field(
        description=(
            "Timestamp corresponding to the instant when the last notification was "
            "sent. Provided by the system when querying the details of a "
            "subscription"
        ),
        default=None,
    )
    lastFailure: Optional[str] = Field(
        description=(
            "Timestamp corresponding to the instant when the last notification "
            "resulting in failure was sent. Provided by the system when querying "
            "the details of a subscription"
        ),
        default=None,
    )
    lastSuccess: Optional[str] = Field(
        description=(
            "Timestamp corresponding to the instant when the last successful "
            "notification was sent. Provided by the system when querying the "
            "details of a subscription"
        ),
        default=None,
    )
class TemporalQuery(BaseModel):
    """
    Temporal query according to NGSI-LD Spec section 5.2.21

    timerel:
        Temporal relationship, one of "before", "after" and "between".
        "before": before the time specified by timeAt.
        "after": after the time specified by timeAt.
        "between": after the time specified by timeAt and before the time
            specified by endTimeAt
    timeAt:
        A DateTime object following ISO 8601, e.g. 2007-12-24T18:21Z
    endTimeAt (optional):
        A DateTime object following ISO 8601, e.g. 2007-12-24T18:21Z
        Only required when timerel="between"
    timeproperty: str
        Representing a Propertyname of the Property that contains the temporal
        data that will be used to resolve the temporal query. If not specified,
        the default is "observedAt"
    """

    model_config = ConfigDict(populate_by_name=True)
    timerel: Literal["before", "after", "between"] = Field(
        ...,
        description="String representing the temporal relationship as defined by clause "
        "4.11 (Allowed values: 'before', 'after', and 'between') ",
    )
    timeAt: str = Field(
        ...,
        description="String representing the timeAt parameter as defined by clause "
        "4.11. It shall be a DateTime ",
    )
    endTimeAt: Optional[str] = Field(
        default=None,
        description="String representing the endTimeAt parameter as defined by clause "
        "4.11. It shall be a DateTime. Cardinality shall be 1 if timerel is "
        "equal to 'between' ",
    )
    timeproperty: Optional[str] = Field(
        default=None,
        description="String representing a Property name. The name of the Property that "
        "contains the temporal data that will be used to resolve the "
        "temporal query. If not specified, the default is 'observedAt'",
    )

    # NOTE: despite its name this validator checks timestamps, not URIs; the
    # name is kept unchanged so existing references to it keep working.
    @field_validator("timeAt", "endTimeAt")
    @classmethod
    def check_uri(cls, v: str):
        """
        Ensure that timeAt / endTimeAt can be parsed as ISO 8601.

        Falsy values (None or an empty string) are returned untouched.

        Raises:
            ValueError: if the value cannot be parsed by
                dateutil.parser.isoparse.
        """
        if not v:
            return v
        try:
            dateutil.parser.isoparse(v)
        except ValueError as err:
            # The validator serves both fields, so the message names both.
            raise ValueError(
                "timeAt/endTimeAt must be in ISO 8601 format"
            ) from err
        return v

    # NOTE: despite its name this validator has nothing to do with passwords;
    # the name is kept unchanged so existing references to it keep working.
    @model_validator(mode="after")
    def check_passwords_match(self) -> "TemporalQuery":
        """
        Require endTimeAt whenever timerel is "between".

        An empty endTimeAt is treated like a missing one, consistent with
        the field validator, which skips falsy values.

        Raises:
            ValueError: if timerel is "between" and endTimeAt is missing or
                empty.
        """
        if self.timerel == "between" and not self.endTimeAt:
            raise ValueError('When timerel="between", endTimeAt must be specified')
        return self
class SubscriptionLD(BaseModel):
    """
    Context Subscription model according to NGSI-LD Spec section 5.2.12
    """

    # Allow population by field name as well as by alias.
    model_config = ConfigDict(populate_by_name=True)

    id: Optional[str] = Field(
        description="Subscription identifier (JSON-LD @id)",
        default=None,
    )
    type: str = Field(description="JSON-LD @type", default="Subscription")
    subscriptionName: Optional[str] = Field(
        description="A (short) name given to this Subscription",
        default=None,
    )
    description: Optional[str] = Field(
        description="Subscription description",
        default=None,
    )
    entities: Optional[List[EntityInfo]] = Field(
        description="Entities subscribed",
        default=None,
    )
    watchedAttributes: Optional[List[str]] = Field(
        description="Watched Attributes (Properties or Relationships)",
        default=None,
    )
    notificationTrigger: Optional[List[str]] = Field(
        description="Notification triggers",
        default=None,
    )
    timeInterval: Optional[int] = Field(
        description="Time interval in seconds",
        default=None,
    )
    # Checked by the check_q validator defined at the end of the class.
    q: Optional[str] = Field(
        description="Query met by subscribed entities to trigger the notification",
        default=None,
    )
    geoQ: Optional[GeoQuery] = Field(
        description="Geoquery met by subscribed entities to trigger the notification",
        default=None,
    )
    csf: Optional[str] = Field(description="Context source filter", default=None)
    isActive: bool = Field(
        description=(
            "Indicates if the Subscription is under operation (True) or "
            "paused (False)"
        ),
        default=True,
    )
    # The only member without a default value.
    notification: NotificationParams = Field(..., description="Notification details")
    expiresAt: Optional[str] = Field(
        description="Expiration date for the subscription",
        default=None,
    )
    throttling: Optional[int] = Field(
        description=(
            "Minimal period of time in seconds between two consecutive "
            "notifications"
        ),
        default=None,
    )
    temporalQ: Optional[TemporalQuery] = Field(
        description="Temporal Query",
        default=None,
    )
    lang: Optional[str] = Field(
        description="Language filter applied to the query",
        default=None,
    )

    @field_validator("q")
    @classmethod
    def check_q(cls, v: str):
        """Hand the query string to validate_ngsi_ld_query and return its result."""
        checked_query = validate_ngsi_ld_query(v)
        return checked_query