Coverage for tutorials/ngsi_v2/e6_timeseries_data/e6_timeseries_data.py: 0%
44 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-05 11:07 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-05 11:07 +0000
1"""
2# # Exercise 6: Time Series Data
4# We now want to store our data in the historic data storage and visualize it.
6# The input sections are marked with 'ToDo'
8# #### Steps to complete:
9# 1. Set up the missing parameters in the parameter section
10# 2. Create a QuantumLeap client and the subscriptions that get triggered
11# by the updates on your context entities
12# 3. Run the simulation
13# 4. Retrieve the data via QuantumLeap and visualize it
14"""
16# ## Import packages
17import json
18from pathlib import Path
19import time
20from typing import List
21from urllib.parse import urlparse
22from uuid import uuid4
23import pandas as pd
24import paho.mqtt.client as mqtt
25from pydantic import TypeAdapter
26import matplotlib.pyplot as plt
28# import from filip
29from filip.clients.ngsi_v2 import ContextBrokerClient, IoTAClient, QuantumLeapClient
30from filip.clients.mqtt import IoTAMQTTClient
31from filip.models.base import FiwareHeader
32from filip.models.ngsi_v2.context import NamedCommand
33from filip.models.ngsi_v2.subscriptions import (
34 Subscription,
35 Message,
36 Subject,
37 Notification,
38)
39from filip.models.ngsi_v2.iot import Device, PayloadProtocol, ServiceGroup
40from filip.utils.cleanup import clear_context_broker, clear_iot_agent, clear_quantumleap
42# import simulation model
43from tutorials.ngsi_v2.simulation_model import SimulationModel
45# ## Parameters
46# ToDo: Enter your context broker host and port, e.g. http://localhost:1026.
47CB_URL = "http://localhost:1026"
48# ToDo: Enter your IoT-Agent host and port, e.g. http://localhost:4041.
49IOTA_URL = "http://localhost:4041"
50# ToDo: Enter your QuantumLeap host and port, e.g. http://localhost:8668.
51QL_URL = "http://localhost:8668"
52# ToDo: Enter your mqtt broker url, e.g. mqtt://test.mosquitto.org:1883.
53MQTT_BROKER_URL_EXPOSED = "mqtt://localhost:1883"
54# ToDo: Enter your mqtt broker url, e.g. mqtt://mosquitto:1883.
55MQTT_BROKER_URL_INTERNAL = "mqtt://mosquitto:1883"
56# ToDo: If required, enter your username and password.
57MQTT_USER = ""
58MQTT_PW = ""
60# ToDo: Change the name of your service to something unique. If you run
61# on a shared instance this is very important in order to avoid user
62# collisions. You will use this service through the whole tutorial.
63# If you forget to change it, an error will be raised!
64# FIWARE-Service
65SERVICE = "filip_tutorial"
66# FIWARE-Service path
67SERVICE_PATH = "/"
69# ToDo: Change the APIKEY to something unique. This represents the "token"
70# for IoT devices to connect (send/receive data) with the platform. In the
71# context of MQTT, APIKEY is linked with the topic used for communication.
72APIKEY = "your_apikey"
73UNIQUE_ID = str(uuid4())
74TOPIC_CONTROLLER = f"fiware_workshop/{UNIQUE_ID}/controller"
75print(TOPIC_CONTROLLER)
# Paths of the JSON files written by the previous exercise. They are relative
# paths, so they are resolved against the current working directory.
READ_GROUPS_FILEPATH = Path("../e5_iot_thermal_zone_control_groups.json")
READ_DEVICES_FILEPATH = Path("../e5_iot_thermal_zone_control_devices.json")
READ_SUBSCRIPTIONS_FILEPATH = Path("../e5_iot_thermal_zone_control_subscriptions.json")

# Load the stored service groups, devices and subscriptions. The encoding is
# given explicitly so that reading does not depend on the platform's default
# locale encoding.
with open(READ_GROUPS_FILEPATH, "r", encoding="utf-8") as groups_file, open(
    READ_DEVICES_FILEPATH, "r", encoding="utf-8"
) as devices_file, open(
    READ_SUBSCRIPTIONS_FILEPATH, "r", encoding="utf-8"
) as subscriptions_file:
    json_groups = json.load(groups_file)
    json_devices = json.load(devices_file)
    json_subscriptions = json.load(subscriptions_file)
90# set parameters for the temperature simulation
91TEMPERATURE_MAX = 10 # maximal ambient temperature
92TEMPERATURE_MIN = -5 # minimal ambient temperature
93TEMPERATURE_ZONE_START = 20 # start value of the zone temperature
95T_SIM_START = 0 # simulation start time in seconds
96T_SIM_END = 24 * 60 * 60 # simulation end time in seconds
97COM_STEP = 60 * 60 * 0.25 # 15 min communication step in seconds
99# ## Main script
100if __name__ == "__main__":
# Bundle the FIWARE service and service path into one header object
fiware_header = FiwareHeader(service=SERVICE, service_path=SERVICE_PATH)

# Reset the state of this service and scope: first the IoT agent, then the
# context broker and finally QuantumLeap
for clear_component, component_url in (
    (clear_iot_agent, IOTA_URL),
    (clear_context_broker, CB_URL),
    (clear_quantumleap, QL_URL),
):
    clear_component(url=component_url, fiware_header=fiware_header)

# Set up the simulation model with the configured time span and temperatures
simulation_settings = {
    "t_start": T_SIM_START,
    "t_end": T_SIM_END,
    "temp_max": TEMPERATURE_MAX,
    "temp_min": TEMPERATURE_MIN,
    "temp_start": TEMPERATURE_ZONE_START,
}
sim_model = SimulationModel(**simulation_settings)
# Rebuild the service groups, the devices and the first stored subscription
# from the JSON data loaded above
groups = TypeAdapter(List[ServiceGroup]).validate_python(json_groups)
devices = TypeAdapter(List[Device]).validate_python(json_devices)
stored_subscriptions = TypeAdapter(List[Subscription]).validate_python(
    json_subscriptions
)
sub = stored_subscriptions[0]

# Redirect the MQTT notification of the subscription to the controller topic
# and apply the configured credentials
mqtt_notification = sub.notification.mqtt
mqtt_notification.topic = TOPIC_CONTROLLER
mqtt_notification.user = MQTT_USER
mqtt_notification.passwd = MQTT_PW

# Post the subscription to the context broker
cbc = ContextBrokerClient(url=CB_URL, fiware_header=fiware_header)
cbc.post_subscription(subscription=sub)

# Post the groups and devices to the IoT agent
iotac = IoTAClient(url=IOTA_URL, fiware_header=fiware_header)
iotac.post_groups(service_groups=groups)
iotac.post_devices(devices=devices)

# Read the group and device configurations back from the server
group = iotac.get_group(resource="/iot/json", apikey=APIKEY)
weather_station, zone_temperature_sensor, heater = (
    iotac.get_device(device_id=device_id)
    for device_id in ("device:001", "device:002", "device:003")
)
# Build an MQTTv5 client (paho-mqtt protocol constant) that is aware of the
# retrieved service group and the three devices
known_devices = [weather_station, zone_temperature_sensor, heater]
mqttc = IoTAMQTTClient(
    protocol=mqtt.MQTTv5,
    devices=known_devices,
    service_groups=[group],
)
# ToDo: Set user data if required.
mqttc.username_pw_set(username=MQTT_USER, password=MQTT_PW)
# Callback that is triggered when a command is sent to the device. The
# incoming command updates the heater attribute of the simulation model.
def on_command(client, obj, msg):
    """
    Callback for incoming commands.

    Decodes the message with the client's IoTA-JSON encoder, writes the value
    stored under the heater's first command name to ``sim_model.heater_on``
    and acknowledges the command by publishing the payload back.

    Args:
        client: MQTT client that received the message; it is used for
            decoding and for publishing the acknowledgement
        obj: not used by this callback
        msg: the incoming MQTT message
    """
    # Decode the message payload using the library's builtin encoders
    encoder = client.get_encoder(PayloadProtocol.IOTA_JSON)
    _, device_id, payload = encoder.decode_message(msg=msg)

    # Apply the commanded value to the simulation model
    heater_command_name = heater.commands[0].name
    sim_model.heater_on = payload[heater_command_name]

    # Acknowledge the command. Here commands are usually single messages,
    # so the first key of the payload is taken as the command's name.
    acknowledged_command = next(iter(payload))
    client.publish(
        device_id=device_id,
        command_name=acknowledged_command,
        payload=payload,
    )


# Register the command callback on the MQTT client for the heater's device id
mqttc.add_command_callback(device_id=heater.device_id, callback=on_command)
# Controller for the heater state with respect to the zone temperature. It
# works asynchronously on notifications received via MQTT subscriptions.
def on_measurement(client, obj, msg):
    """
    Callback for measurement notifications.

    Parses the notification, reads the temperature value of its first data
    entry and switches the heater on (1) at 19 or below and off (0) at 21 or
    above. For any other value no command is sent.

    Args:
        client: not used by this callback
        obj: not used by this callback
        msg: MQTT message whose payload holds the notification as JSON
    """
    message = Message.model_validate_json(msg.payload)
    zone_sensor_update = message.data[0]

    # retrieve the value of the temperature attribute
    temperature = zone_sensor_update.temperature.value

    if temperature <= 19:
        state = 1
    elif temperature >= 21:
        state = 0
    else:
        # neither threshold reached: leave the heater as it is
        return

    # send the command to the heater entity
    command = NamedCommand(name=heater.commands[0].name, value=state)
    cbc.post_command(
        entity_id=heater.entity_name,
        entity_type=heater.entity_type,
        command=command,
    )


# Route messages arriving on the controller topic to the controller callback
mqttc.message_callback_add(sub=TOPIC_CONTROLLER, callback=on_measurement)
# ToDo: Create http subscriptions that get triggered by updates of your
#  device attributes. Note that you can only post the subscription
#  to the context broker.

# Subscription for weather station: updates of the weather station entity
# are sent to the QuantumLeap notify endpoint via http
weather_station_subject = Subject(
    entities=[
        {
            "id": weather_station.entity_name,
            "type": weather_station.entity_type,
        }
    ]
)
quantumleap_notification = Notification(
    http={"url": "http://quantumleap:8668/v2/notify"}
)
cbc.post_subscription(
    subscription=Subscription(
        subject=weather_station_subject,
        notification=quantumleap_notification,
        throttling=0,
    )
)

# Subscription for zone temperature sensor (placeholder, to be filled in)
cbc.post_subscription(subscription=Subscription(...))

# Subscription for heater (placeholder, to be filled in)
cbc.post_subscription(subscription=Subscription(...))
# Connect to the exposed MQTT broker; host and port are taken from its URL
mqtt_url = urlparse(MQTT_BROKER_URL_EXPOSED)
connection_settings = {
    "host": mqtt_url.hostname,
    "port": mqtt_url.port,
    "keepalive": 60,
    "bind_address": "",
    "bind_port": 0,
    "clean_start": mqtt.MQTT_CLEAN_START_FIRST_ONLY,
    "properties": None,
}
mqttc.connect(**connection_settings)

# Subscribe to all incoming command topics for the registered devices ...
mqttc.subscribe()
# ... and to the controller topic
mqttc.subscribe(topic=TOPIC_CONTROLLER)

# Handle the MQTT communication in a non-blocking background thread
mqttc.loop_start()
# Simulation loop: for every communication step the current values of the
# simulation model are published to the broker, holding the simulation time
# "sim_time" and the corresponding temperature "temperature". You may use
# the `object_id` or the attribute name as key in your payload. Each
# iteration pauses twice for 0.3 seconds, once before and once after the
# simulation step.
com_step = int(COM_STEP)
for t_sim in range(sim_model.t_start, sim_model.t_end + com_step, com_step):
    # simulated ambient temperature of the weather station
    ambient_payload = {"temperature": sim_model.t_amb, "sim_time": sim_model.t_sim}
    mqttc.publish(device_id=weather_station.device_id, payload=ambient_payload)

    # simulated zone temperature
    zone_payload = {"temperature": sim_model.t_zone, "sim_time": sim_model.t_sim}
    mqttc.publish(device_id=zone_temperature_sensor.device_id, payload=zone_payload)

    # the heater device only reports the 'sim_time'
    heater_payload = {"sim_time": sim_model.t_sim}
    mqttc.publish(device_id=heater.device_id, payload=heater_payload)

    time.sleep(0.3)
    # advance the simulation for the next loop
    sim_model.do_step(int(t_sim + COM_STEP))
    # wait for 0.3 seconds before publishing the next values
    time.sleep(0.3)

# Stop the MQTT listening thread and disconnect the MQTT client
mqttc.loop_stop()
mqttc.disconnect()
# Give the platform 10 seconds so that all data is available
time.sleep(10)

# ToDo: Create a quantumleap client.
qlc = QuantumLeapClient(...)

# ToDo: Retrieve the historic data from QuantumLeap, convert them to a
#  pandas dataframe and plot them.
# Request the last 10000 entries of the weather station entity
weather_station_query = {
    "entity_id": weather_station.entity_name,
    "entity_type": weather_station.entity_type,
    "last_n": 10000,
}
history_weather_station = qlc.get_entity_by_id(**weather_station_query)

# Convert to a pandas dataframe and print it
history_weather_station = history_weather_station.to_pandas()
print(history_weather_station)

# Drop the index levels that are not needed and make the columns numeric
history_weather_station = history_weather_station.droplevel(
    level=("entityId", "entityType"), axis=1
)
for column in ("sim_time", "temperature"):
    history_weather_station[column] = pd.to_numeric(
        history_weather_station[column], downcast="float"
    )

# ToDo: Plot the results.
# The simulation time is divided by 60 to match the axis label in minutes
fig, ax = plt.subplots()
weather_station_minutes = history_weather_station["sim_time"] / 60
ax.plot(weather_station_minutes, history_weather_station["temperature"])
ax.title.set_text("Weather Station")
ax.set_xlabel("time in min")
ax.set_ylabel("ambient temperature in °C")
plt.show()
# ToDo: Retrieve the data for the zone temperature.
history_zone_temperature_sensor = ...

# ToDo: Convert to pandas dataframe and print it.
history_zone_temperature_sensor = ...

# ToDo: Drop unnecessary index levels.
history_zone_temperature_sensor = ...

# Make the simulation time and temperature columns numeric
for column in ("sim_time", "temperature"):
    history_zone_temperature_sensor[column] = pd.to_numeric(
        history_zone_temperature_sensor[column], downcast="float"
    )

# ToDo: Plot the results.
# The simulation time is divided by 60 to match the axis label in minutes
fig2, ax2 = plt.subplots()
zone_minutes = history_zone_temperature_sensor["sim_time"] / 60
ax2.plot(zone_minutes, history_zone_temperature_sensor["temperature"])
ax2.title.set_text("Zone Temperature Sensor")
ax2.set_xlabel("time in min")
ax2.set_ylabel("zone temperature in °C")
plt.show()
# ToDo: Retrieve the data for the heater.
history_heater = ...

# Convert to a pandas dataframe, replace " " entries by 0 and print it
history_heater = history_heater.to_pandas()
history_heater = history_heater.replace(" ", 0)
print(history_heater)

# ToDo: Drop unnecessary index levels.
history_heater = history_heater.droplevel(level=("entityId", "entityType"), axis=1)

# Make the simulation time and heater state columns numeric
for column in ("sim_time", "heater_on_info"):
    history_heater[column] = pd.to_numeric(history_heater[column], downcast="float")

# ToDo: Plot the results.
# The simulation time is divided by 60 to match the axis label in minutes
fig3, ax3 = plt.subplots()
heater_minutes = history_heater["sim_time"] / 60
ax3.plot(heater_minutes, history_heater["heater_on_info"])
ax3.title.set_text("Heater")
ax3.set_xlabel("time in min")
ax3.set_ylabel("set point")
plt.show()
# Final cleanup of the service and scope: IoT agent, context broker and
# QuantumLeap, in that order
for clear_component, component_url in (
    (clear_iot_agent, IOTA_URL),
    (clear_context_broker, CB_URL),
    (clear_quantumleap, QL_URL),
):
    clear_component(url=component_url, fiware_header=fiware_header)