Coverage for filip/models/ngsi_ld/subscriptions.py: 95%
86 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-05 11:07 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-05 11:07 +0000
1from typing import List, Optional, Literal
2from pydantic import (
3 ConfigDict,
4 BaseModel,
5 Field,
6 HttpUrl,
7 AnyUrl,
8 field_validator,
9 model_validator,
10)
11import dateutil.parser
12from filip.models.ngsi_ld.base import GeoQuery, validate_ngsi_ld_query
class EntityInfo(BaseModel):
    """
    Description of the entities an NGSI-LD subscription refers to.

    NGSI-LD v1.3.1 specifies this data type as EntityInfo, while v1.6.1
    specifies it as a new data type, namely EntitySelector.
    """

    # Allow population by field name as well as by alias.
    model_config = ConfigDict(populate_by_name=True)

    # Optional exact identifier of the entity.
    id: Optional[str] = Field(
        default=None,
        description="Entity identifier (valid URI)",
    )
    # Optional pattern matched against entity identifiers.
    idPattern: Optional[str] = Field(
        default=None,
        description="Regular expression as per IEEE POSIX 1003.2™ [11]",
    )
    # Required entity type; the only mandatory member of this model.
    type: str = Field(
        ...,
        description=(
            "Fully Qualified Name of an Entity Type or the Entity Type Name "
            "as a short-hand string. See clause 4.6.2"
        ),
    )
class KeyValuePair(BaseModel):
    """
    Generic key/value entry; both members are required strings.
    """

    key: str = Field(...)
    value: str = Field(...)
class Endpoint(BaseModel):
    """
    Parameters required to define an endpoint for notifications.

    Besides the endpoint's URI, two generic {key, value} arrays may be given:

    receiverInfo
        Whatever extra information the broker shall convey to the receiver,
        either for the broker to successfully communicate with the receiver
        (e.g. Authorization material) or for the receiver to correctly
        interpret the received content (e.g. the Link URL to fetch an
        @context).

    notifierInfo
        The configuration that the broker needs to know in order to correctly
        set up the communication channel towards the receiver.

    Example of "receiverInfo"
        "receiverInfo": [
            {
                "key": "H1",
                "value": "123"
            },
            {
                "key": "H2",
                "value": "456"
            }
        ]

    Example of "notifierInfo"
        "notifierInfo": [
            {
                "key": "MQTT-Version",
                "value": "mqtt5.0"
            }
        ]
    """

    model_config = ConfigDict(populate_by_name=True)

    uri: AnyUrl = Field(description="Dereferenceable URI")
    accept: Optional[str] = Field(
        default=None,
        description=(
            "MIME type for the notification payload body "
            "(application/json, application/ld+json, application/geo+json)"
        ),
    )
    receiverInfo: Optional[List[KeyValuePair]] = Field(
        default=None,
        description=(
            "Generic {key, value} array to convey optional information to "
            "the receiver"
        ),
    )
    notifierInfo: Optional[List[KeyValuePair]] = Field(
        default=None,
        description="Generic {key, value} array to set up the communication channel",
    )

    @field_validator("uri")
    @classmethod
    def check_uri(cls, uri: AnyUrl):
        """
        Accept the URI only if its scheme is ``http`` or ``mqtt``.

        Raises:
            ValueError: for any other scheme.
        """
        if uri.scheme in ("http", "mqtt"):
            return uri
        raise ValueError("NGSI-LD currently only support http and mqtt")

    @field_validator("notifierInfo")
    @classmethod
    def check_notifier_info(cls, notifierInfo: List[KeyValuePair]):
        """
        Placeholder validator: currently returns ``notifierInfo`` unchanged.
        """
        # TODO add validation of notifierInfo for MQTT notification
        return notifierInfo
class NotificationParams(BaseModel):
    """
    NGSI-LD Notification model.

    Holds the parameters that convey the details of a notification, as
    described in NGSI-LD Spec section 5.2.14. Only ``endpoint`` is required;
    every other member has a default.
    """

    model_config = ConfigDict(populate_by_name=True)

    # --- Members describing what is sent and where ---
    attributes: Optional[List[str]] = Field(
        default=None,
        description=(
            "Entity Attribute Names (Properties or Relationships) to be "
            "included in the notification payload body. If undefined, it "
            "will mean all Attributes"
        ),
    )
    format: Optional[str] = Field(
        default="normalized",
        description=(
            "Conveys the representation format of the entities delivered at "
            "notification time. By default, it will be in normalized format"
        ),
    )
    endpoint: Endpoint = Field(..., description="Notification endpoint details")
    # Restricted to the two literal values "ok" and "failed".
    status: Literal["ok", "failed"] = Field(
        default="ok",
        description=(
            "Status of the Notification. It shall be 'ok' if the last "
            "attempt to notify the subscriber succeeded. It shall be "
            "'failed' if the last attempt to notify the subscriber failed"
        ),
    )

    # --- Additional members ---
    timesSent: Optional[int] = Field(
        default=None,
        description=(
            "Number of times that the notification was sent. Provided by "
            "the system when querying the details of a subscription"
        ),
    )
    lastNotification: Optional[str] = Field(
        default=None,
        description=(
            "Timestamp corresponding to the instant when the last "
            "notification was sent. Provided by the system when querying "
            "the details of a subscription"
        ),
    )
    lastFailure: Optional[str] = Field(
        default=None,
        description=(
            "Timestamp corresponding to the instant when the last "
            "notification resulting in failure was sent. Provided by the "
            "system when querying the details of a subscription"
        ),
    )
    lastSuccess: Optional[str] = Field(
        default=None,
        description=(
            "Timestamp corresponding to the instant when the last "
            "successful notification was sent. Provided by the system when "
            "querying the details of a subscription"
        ),
    )
class TemporalQuery(BaseModel):
    """
    Temporal query according to NGSI-LD Spec section 5.2.21

    timerel:
        Temporal relationship, one of "before", "after" and "between".
            "before": before the time specified by timeAt.
            "after": after the time specified by timeAt.
            "between": after the time specified by timeAt and before the time
                specified by endTimeAt
    timeAt:
        A DateTime string following ISO 8601, e.g. 2007-12-24T18:21Z
    endTimeAt (optional):
        A DateTime string following ISO 8601, e.g. 2007-12-24T18:21Z
        Only required when timerel="between"
    timeproperty (optional):
        Name of the Property that contains the temporal data that will be
        used to resolve the temporal query. If not specified, the default is
        "observedAt"

    Raises (as a pydantic validation error):
        ValueError: if timeAt or endTimeAt is not parseable as ISO 8601, or
            if timerel is "between" and endTimeAt is missing.
    """

    model_config = ConfigDict(populate_by_name=True)
    timerel: Literal["before", "after", "between"] = Field(
        ...,
        description="String representing the temporal relationship as defined by clause "
        "4.11 (Allowed values: 'before', 'after', and 'between') ",
    )
    timeAt: str = Field(
        ...,
        description="String representing the timeAt parameter as defined by clause "
        "4.11. It shall be a DateTime ",
    )
    endTimeAt: Optional[str] = Field(
        default=None,
        description="String representing the endTimeAt parameter as defined by clause "
        "4.11. It shall be a DateTime. Cardinality shall be 1 if timerel is "
        "equal to 'between' ",
    )
    timeproperty: Optional[str] = Field(
        default=None,
        description="String representing a Property name. The name of the Property that "
        "contains the temporal data that will be used to resolve the "
        "temporal query. If not specified, the default is 'observedAt'",
    )

    @field_validator("timeAt", "endTimeAt")
    @classmethod
    def check_datetime_format(cls, v: Optional[str]) -> Optional[str]:
        """
        Ensure that a given timestamp can be parsed as an ISO 8601 DateTime.

        Args:
            v: value passed for timeAt or endTimeAt.

        Returns:
            The unchanged value.

        Raises:
            ValueError: if the value is a string that is not valid ISO 8601.
        """
        # Only an explicit None (allowed for the optional endTimeAt) skips the
        # check. An empty string is not a DateTime and must not slip through,
        # which a plain truthiness test would allow.
        if v is None:
            return v
        try:
            dateutil.parser.isoparse(v)
        except ValueError as err:
            # The validator serves both fields, so the message names both.
            raise ValueError(
                "timeAt and endTimeAt must be in ISO 8601 format"
            ) from err
        return v

    @model_validator(mode="after")
    def check_end_time_for_between(self) -> "TemporalQuery":
        """
        Enforce that endTimeAt is given whenever timerel is "between".

        Raises:
            ValueError: if timerel is "between" and endTimeAt is None.
        """
        if self.timerel == "between" and self.endTimeAt is None:
            raise ValueError('When timerel="between", endTimeAt must be specified')
        return self
class SubscriptionLD(BaseModel):
    """
    Context Subscription model according to NGSI-LD Spec section 5.2.12

    Only ``notification`` is required; all other members carry defaults.
    The validators are grouped after the field declarations.
    """

    model_config = ConfigDict(populate_by_name=True)

    # --- Identification ---
    id: Optional[str] = Field(
        default=None,
        description="Subscription identifier (JSON-LD @id)",
    )
    type: str = Field(default="Subscription", description="JSON-LD @type")
    subscriptionName: Optional[str] = Field(
        default=None,
        description="A (short) name given to this Subscription",
    )
    description: Optional[str] = Field(
        default=None,
        description="Subscription description",
    )

    # --- What is subscribed to and what triggers a notification ---
    entities: Optional[List[EntityInfo]] = Field(
        default=None,
        description="Entities subscribed",
    )
    watchedAttributes: Optional[List[str]] = Field(
        default=None,
        description="Watched Attributes (Properties or Relationships)",
    )
    notificationTrigger: Optional[List[str]] = Field(
        default=None,
        description="Notification triggers",
    )
    timeInterval: Optional[int] = Field(
        default=None,
        description="Time interval in seconds",
    )
    q: Optional[str] = Field(
        default=None,
        description="Query met by subscribed entities to trigger the notification",
    )
    geoQ: Optional[GeoQuery] = Field(
        default=None,
        description="Geoquery met by subscribed entities to trigger the notification",
    )
    csf: Optional[str] = Field(default=None, description="Context source filter")

    # --- Operation and delivery ---
    isActive: bool = Field(
        default=True,
        description=(
            "Indicates if the Subscription is under operation (True) or "
            "paused (False)"
        ),
    )
    notification: NotificationParams = Field(..., description="Notification details")
    expiresAt: Optional[str] = Field(
        default=None,
        description="Expiration date for the subscription",
    )
    throttling: Optional[int] = Field(
        default=None,
        description=(
            "Minimal period of time in seconds between two consecutive notifications."
            "It must be greater than 0 if set. If not set, it means no throttling is used."
        ),
    )
    temporalQ: Optional[TemporalQuery] = Field(
        default=None,
        description="Temporal Query",
    )
    lang: Optional[str] = Field(
        default=None,
        description="Language filter applied to the query",
    )

    @field_validator("q")
    @classmethod
    def check_q(cls, v: str):
        """
        Hand the query string to ``validate_ngsi_ld_query`` and return its result.
        """
        checked = validate_ngsi_ld_query(v)
        return checked

    @field_validator("throttling")
    @classmethod
    def check_throttling(cls, throttling):
        """
        Validate the throttling value. It must be greater than 0 if provided.
        """
        # Nothing to check when no throttling was given.
        if throttling is None:
            return throttling
        if throttling <= 0:
            raise ValueError("Throttling must be greater than 0")
        return throttling