Coverage for tutorials/ngsi_v2/e6_timeseries_data/e6_timeseries_data_solution.py: 0%
44 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-05 11:07 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-05 11:07 +0000
1"""
2# # Exercise 6: Time Series Data
4# We now want store our data in the historic data storage and visualize it
6# The input sections are marked with 'ToDo'
8# #### Steps to complete:
9# 1. Set up the missing parameters in the parameter section
10# 2. Create a quantumleap client that creates subscription that gets triggered
11# by the updates on your context entities
12# 3. Run the simulation
13# 4. Retrieve the data via QuantumLeap and visualize it
14"""
16# ## Import packages
17import json
18from pathlib import Path
19import time
20from typing import List
21from urllib.parse import urlparse
22from uuid import uuid4
23import pandas as pd
24import paho.mqtt.client as mqtt
25from pydantic import TypeAdapter
26import matplotlib.pyplot as plt
28# import from filip
29from filip.clients.ngsi_v2 import ContextBrokerClient, IoTAClient, QuantumLeapClient
30from filip.clients.mqtt import IoTAMQTTClient
31from filip.models.base import FiwareHeader
32from filip.models.ngsi_v2.context import NamedCommand
33from filip.models.ngsi_v2.subscriptions import (
34 Subscription,
35 Message,
36 Notification,
37 Subject,
38)
39from filip.models.ngsi_v2.iot import Device, PayloadProtocol, ServiceGroup
40from filip.utils.cleanup import clear_context_broker, clear_iot_agent, clear_quantumleap
42# import simulation model
43from tutorials.ngsi_v2.simulation_model import SimulationModel
45# ## Parameters
46# ToDo: Enter your context broker host and port, e.g. http://localhost:1026.
47CB_URL = "http://localhost:1026"
48# ToDo: Enter your IoT-Agent host and port, e.g. http://localhost:4041.
49IOTA_URL = "http://localhost:4041"
50# ToDo: Enter your IoT-Agent host and port, e.g. http://localhost:4041.
51QL_URL = "http://localhost:8668"
52# ToDo: Enter your mqtt broker url, e.g. mqtt://test.mosquitto.org:1883.
53MQTT_BROKER_URL_EXPOSED = "mqtt://localhost:1883"
54# ToDo: Enter your mqtt broker url, e.g. mqtt://mosquitto:1883.
55MQTT_BROKER_URL_INTERNAL = "mqtt://mosquitto:1883"
56# ToDo: If required, enter your username and password.
57MQTT_USER = ""
58MQTT_PW = ""
60# ToDo: Change the name of your service to something unique. If you run
61# on a shared instance this is very important in order to avoid user
62# collisions. You will use this service through the whole tutorial.
63# If you forget to change it, an error will be raised!
64# FIWARE-Service
65SERVICE = "filip_tutorial"
66# FIWARE-Service path
67SERVICE_PATH = "/"
69# ToDo: Change the APIKEY to something unique. This represents the "token"
70# for IoT devices to connect (send/receive data) with the platform. In the
71# context of MQTT, APIKEY is linked with the topic used for communication.
72APIKEY = "your_apikey"
73UNIQUE_ID = str(uuid4())
74TOPIC_CONTROLLER = f"fiware_workshop/{UNIQUE_ID}/controller"
75print(TOPIC_CONTROLLER)
77# path to read json-files from previous exercises
78READ_GROUPS_FILEPATH = Path("../e5_iot_thermal_zone_control_solution_groups.json")
79READ_DEVICES_FILEPATH = Path("../e5_iot_thermal_zone_control_solution_devices.json")
80READ_SUBSCRIPTIONS_FILEPATH = Path(
81 "../e5_iot_thermal_zone_control_solution_subscriptions.json"
82)
84# opening the files
85with open(READ_GROUPS_FILEPATH, "r") as groups_file, open(
86 READ_DEVICES_FILEPATH, "r"
87) as devices_file, open(READ_SUBSCRIPTIONS_FILEPATH, "r") as subscriptions_file:
88 json_groups = json.load(groups_file)
89 json_devices = json.load(devices_file)
90 json_subscriptions = json.load(subscriptions_file)
92# set parameters for the temperature simulation
93TEMPERATURE_MAX = 10 # maximal ambient temperature
94TEMPERATURE_MIN = -5 # minimal ambient temperature
95TEMPERATURE_ZONE_START = 20 # start value of the zone temperature
97T_SIM_START = 0 # simulation start time in seconds
98T_SIM_END = 24 * 60 * 60 # simulation end time in seconds
99COM_STEP = 60 * 60 * 0.25 # 15 min communication step in seconds
101# ## Main script
102if __name__ == "__main__":
103 # create a fiware header object
104 fiware_header = FiwareHeader(service=SERVICE, service_path=SERVICE_PATH)
105 # clear the state of your service and scope
106 clear_iot_agent(url=IOTA_URL, fiware_header=fiware_header)
107 clear_context_broker(url=CB_URL, fiware_header=fiware_header)
108 clear_quantumleap(url=QL_URL, fiware_header=fiware_header)
110 # instantiate simulation model
111 sim_model = SimulationModel(
112 t_start=T_SIM_START,
113 t_end=T_SIM_END,
114 temp_max=TEMPERATURE_MAX,
115 temp_min=TEMPERATURE_MIN,
116 temp_start=TEMPERATURE_ZONE_START,
117 )
119 # create clients and restore devices and groups from file
120 groups = TypeAdapter(List[ServiceGroup]).validate_python(json_groups)
121 devices = TypeAdapter(List[Device]).validate_python(json_devices)
122 sub = TypeAdapter(List[Subscription]).validate_python(json_subscriptions)[0]
123 sub.notification.mqtt.topic = TOPIC_CONTROLLER
124 sub.notification.mqtt.user = MQTT_USER
125 sub.notification.mqtt.passwd = MQTT_PW
126 cbc = ContextBrokerClient(url=CB_URL, fiware_header=fiware_header)
127 cbc.post_subscription(subscription=sub)
128 iotac = IoTAClient(url=IOTA_URL, fiware_header=fiware_header)
129 iotac.post_groups(service_groups=groups)
130 iotac.post_devices(devices=devices)
132 # get the group and device configurations from the server
133 group = iotac.get_group(resource="/iot/json", apikey=APIKEY)
134 weather_station = iotac.get_device(device_id="device:001")
135 zone_temperature_sensor = iotac.get_device(device_id="device:002")
136 heater = iotac.get_device(device_id="device:003")
138 # create a MQTTv5 client with paho-mqtt and the known groups and
139 # devices.
140 mqttc = IoTAMQTTClient(
141 protocol=mqtt.MQTTv5,
142 devices=[weather_station, zone_temperature_sensor, heater],
143 service_groups=[group],
144 )
145 # ToDo: Set user data if required.
146 mqttc.username_pw_set(username=MQTT_USER, password=MQTT_PW)
147 # Implement a callback function that gets triggered when the
148 # command is sent to the device. The incoming command should update the
149 # heater attribute of the simulation model
151 def on_command(client, obj, msg):
152 """
153 Callback for incoming commands
154 """
155 # Decode the message payload using the libraries builtin encoders
156 apikey, device_id, payload = client.get_encoder(
157 PayloadProtocol.IOTA_JSON
158 ).decode_message(msg=msg)
160 sim_model.heater_on = payload[heater.commands[0].name]
162 # Acknowledge the command. Here commands are usually single
163 # messages. The first key is equal to the commands name.
164 client.publish(
165 device_id=device_id, command_name=next(iter(payload)), payload=payload
166 )
168 # Add the command callback to your MQTTClient. This will get
169 # triggered for the specified device_id.
170 mqttc.add_command_callback(device_id=heater.device_id, callback=on_command)
172 # You need to implement a controller that controls the
173 # heater state with respect to the zone temperature. This will be
174 # implemented with asynchronous communication using MQTT-Subscriptions
175 def on_measurement(client, obj, msg):
176 """
177 Callback for measurement notifications
178 """
179 message = Message.model_validate_json(msg.payload)
180 updated_zone_temperature_sensor = message.data[0]
182 # retrieve the value of temperature attribute
183 temperature = updated_zone_temperature_sensor.temperature.value
185 update = True
186 if temperature <= 19:
187 state = 1
188 elif temperature >= 21:
189 state = 0
190 else:
191 update = False
192 # send the command to the heater entity
193 if update:
194 command = NamedCommand(name=heater.commands[0].name, value=state)
195 cbc.post_command(
196 entity_id=heater.entity_name,
197 entity_type=heater.entity_type,
198 command=command,
199 )
201 mqttc.message_callback_add(sub=TOPIC_CONTROLLER, callback=on_measurement)
203 # ToDo: Create http subscriptions that get triggered by updates of your
204 # device attributes. Note that you can only post the subscription
205 # to the context broker.
207 cbc.post_subscription(
208 subscription=Subscription(
209 subject=Subject(
210 **{
211 "entities": [
212 {
213 "id": weather_station.entity_name,
214 "type": weather_station.entity_type,
215 }
216 ]
217 }
218 ),
219 notification=Notification(
220 **{"http": {"url": "http://quantumleap:8668/v2/notify"}}
221 ),
222 throttling=0,
223 )
224 )
226 cbc.post_subscription(
227 subscription=Subscription(
228 subject=Subject(
229 **{
230 "entities": [
231 {
232 "id": zone_temperature_sensor.entity_name,
233 "type": zone_temperature_sensor.entity_type,
234 }
235 ]
236 }
237 ),
238 notification=Notification(
239 **{"http": {"url": "http://quantumleap:8668/v2/notify"}}
240 ),
241 throttling=0,
242 )
243 )
245 cbc.post_subscription(
246 subscription=Subscription(
247 subject=Subject(
248 **{"entities": [{"id": heater.entity_name, "type": heater.entity_type}]}
249 ),
250 notification=Notification(
251 **{"http": {"url": "http://quantumleap:8668/v2/notify"}}
252 ),
253 throttling=0,
254 )
255 )
257 # connect to the mqtt broker and subscribe to your topic
258 mqtt_url = urlparse(MQTT_BROKER_URL_EXPOSED)
259 mqttc.connect(
260 host=mqtt_url.hostname,
261 port=mqtt_url.port,
262 keepalive=60,
263 bind_address="",
264 bind_port=0,
265 clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY,
266 properties=None,
267 )
269 # subscribe to topics
270 # subscribe to all incoming command topics for the registered devices
271 mqttc.subscribe()
272 mqttc.subscribe(topic=TOPIC_CONTROLLER)
274 # create a non-blocking thread for mqtt communication
275 mqttc.loop_start()
277 # Create a loop that publishes a message every 0.3 seconds to the broker
278 # that holds the simulation time "sim_time" and the corresponding
279 # temperature "temperature". You may use the `object_id`
280 # or the attribute name as key in your payload.
281 for t_sim in range(
282 sim_model.t_start, sim_model.t_end + int(COM_STEP), int(COM_STEP)
283 ):
284 # publish the simulated ambient temperature
285 mqttc.publish(
286 device_id=weather_station.device_id,
287 payload={"temperature": sim_model.t_amb, "sim_time": sim_model.t_sim},
288 )
290 # publish the simulated zone temperature
291 mqttc.publish(
292 device_id=zone_temperature_sensor.device_id,
293 payload={"temperature": sim_model.t_zone, "sim_time": sim_model.t_sim},
294 )
296 # publish the 'sim_time' for the heater device
297 mqttc.publish(device_id=heater.device_id, payload={"sim_time": sim_model.t_sim})
299 time.sleep(0.3)
300 # simulation step for next loop
301 sim_model.do_step(int(t_sim + COM_STEP))
302 # wait for 0.3 seconds before publishing the next values
303 time.sleep(0.3)
305 # close the mqtt listening thread
306 mqttc.loop_stop()
307 # disconnect the mqtt device
308 mqttc.disconnect()
310 # wait until all data is available
311 time.sleep(10)
313 # ToDo: Create a quantumleap client.
314 qlc = QuantumLeapClient(url=QL_URL, fiware_header=fiware_header)
316 # ToDo: Retrieve the historic data from QuantumLeap, convert them to a
317 # pandas dataframe and plot them.
318 # retrieve the data for the weather station
319 history_weather_station = qlc.get_entity_by_id(
320 entity_id=weather_station.entity_name,
321 entity_type=weather_station.entity_type,
322 last_n=10000,
323 )
325 # convert to pandas dataframe and print it
326 history_weather_station = history_weather_station.to_pandas()
327 print(history_weather_station)
328 # drop unnecessary index levels
329 history_weather_station = history_weather_station.droplevel(
330 level=("entityId", "entityType"), axis=1
331 )
332 history_weather_station["sim_time"] = pd.to_numeric(
333 history_weather_station["sim_time"], downcast="float"
334 )
335 history_weather_station["temperature"] = pd.to_numeric(
336 history_weather_station["temperature"], downcast="float"
337 )
338 # ToDo: Plot the results.
339 fig, ax = plt.subplots()
340 ax.plot(
341 history_weather_station["sim_time"] / 60, history_weather_station["temperature"]
342 )
343 ax.title.set_text("Weather Station")
344 ax.set_xlabel("time in min")
345 ax.set_ylabel("ambient temperature in °C")
346 plt.show()
348 # ToDo: Retrieve the data for the zone temperature.
349 history_zone_temperature_sensor = qlc.get_entity_by_id(
350 entity_id=zone_temperature_sensor.entity_name,
351 entity_type=zone_temperature_sensor.entity_type,
352 last_n=10000,
353 )
355 # ToDo: Convert to pandas dataframe and print it.
356 history_zone_temperature_sensor = history_zone_temperature_sensor.to_pandas()
357 print(history_zone_temperature_sensor)
358 # ToDo: Drop unnecessary index levels.
359 history_zone_temperature_sensor = history_zone_temperature_sensor.droplevel(
360 level=("entityId", "entityType"), axis=1
361 )
362 history_zone_temperature_sensor["sim_time"] = pd.to_numeric(
363 history_zone_temperature_sensor["sim_time"], downcast="float"
364 )
365 history_zone_temperature_sensor["temperature"] = pd.to_numeric(
366 history_zone_temperature_sensor["temperature"], downcast="float"
367 )
368 # ToDo: Plot the results.
369 fig2, ax2 = plt.subplots()
370 ax2.plot(
371 history_zone_temperature_sensor["sim_time"] / 60,
372 history_zone_temperature_sensor["temperature"],
373 )
374 ax2.title.set_text("Zone Temperature Sensor")
375 ax2.set_xlabel("time in min")
376 ax2.set_ylabel("zone temperature in °C")
377 plt.show()
379 # ToDo: Retrieve the data for the heater.
380 history_heater = qlc.get_entity_by_id(
381 entity_id=heater.entity_name, entity_type=heater.entity_type, last_n=10000
382 )
384 # convert to pandas dataframe and print it
385 history_heater = history_heater.to_pandas()
386 history_heater = history_heater.replace(" ", 0)
387 print(history_heater)
389 # ToDo: Drop unnecessary index levels.
390 history_heater = history_heater.droplevel(level=("entityId", "entityType"), axis=1)
391 history_heater["sim_time"] = pd.to_numeric(
392 history_heater["sim_time"], downcast="float"
393 )
394 history_heater["heater_on_info"] = pd.to_numeric(
395 history_heater["heater_on_info"], downcast="float"
396 )
397 # ToDo: Plot the results.
398 fig3, ax3 = plt.subplots()
399 ax3.plot(history_heater["sim_time"] / 60, history_heater["heater_on_info"])
400 ax3.title.set_text("Heater")
401 ax3.set_xlabel("time in min")
402 ax3.set_ylabel("set point")
403 plt.show()
405 clear_iot_agent(url=IOTA_URL, fiware_header=fiware_header)
406 clear_context_broker(url=CB_URL, fiware_header=fiware_header)
407 clear_quantumleap(url=QL_URL, fiware_header=fiware_header)