Coverage for tutorials/ngsi_v2/e5_iot_thermal_zone_control/e5_iot_thermal_zone_control.py: 0%
43 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-05 11:07 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-05 11:07 +0000
1"""
2# # Exercise 5: Virtual Thermal Zone with Control
4# Create a virtual IoT device that simulates a heater for your
5# thermal zone. The heater can be turned on and off via a simple hysteresis
6# controller. The devices from e4_iot_thermal_zone_sensors.py will be loaded
7# from the stored *.json-files.
9# The input sections are marked with 'ToDo'
11# #### Steps to complete:
12# 1. Set up the missing parameters in the parameter section
13# 2. Retrieve the service group and device configurations of already existing
14# devices from the IoT-Agent
15# 3. Create a third device configuration for a heater holding a command
16# for turning it `on` and `off` and post it to the server
17# 4. Create an MQTT client using the filip.client.mqtt package and register
18# your service group and your devices
19# 4. Define a callback function that will be executed when the client
20# receives a command. Decode the message and set the update state in
21# simulation model. Afterwards, acknowledge the command using the api of the
22# IoTAMQTTClient.
23# 5. Add the callback for your heater device to the IoTAMQTTClient
24# 6. Create an MQTT subscription for asynchronous communication that
25# gets triggered when the temperature attribute changes.
26# 7. Write a second callback that represents your controller. It should get
27# triggered when the MQTTClient receives a notification message due to your
28# subscription. Add the callback to your MQTTClient using the original
29# paho-api (`message_callback_add`)
30# 8. Run the simulation and plot
31"""
33# ## Import packages
34import json
35from pathlib import Path
36import time
37from typing import List
38from urllib.parse import urlparse
39from uuid import uuid4
40import paho.mqtt.client as mqtt
41from pydantic import TypeAdapter
42import matplotlib.pyplot as plt
44# import from filip
45from filip.clients.ngsi_v2 import ContextBrokerClient, IoTAClient
46from filip.clients.mqtt import IoTAMQTTClient
47from filip.models.base import DataType, FiwareHeader
48from filip.models.ngsi_v2.context import NamedCommand
49from filip.models.ngsi_v2.subscriptions import Subscription, Message
50from filip.models.ngsi_v2.iot import (
51 Device,
52 DeviceAttribute,
53 DeviceCommand,
54 PayloadProtocol,
55 ServiceGroup,
56)
57from filip.utils.cleanup import clear_context_broker, clear_iot_agent
59# import simulation model
60from tutorials.ngsi_v2.simulation_model import SimulationModel
62# ## Parameters
63# ToDo: Enter your context broker host and port, e.g http://localhost:1026.
64CB_URL = "http://localhost:1026"
65# ToDo: Enter your IoT-Agent host and port, e.g http://localhost:4041.
66IOTA_URL = "http://localhost:4041"
67# ToDo: Enter your mqtt broker url, e.g mqtt://test.mosquitto.org:1883.
68MQTT_BROKER_URL_EXPOSED = "mqtt://localhost:1883"
69# ToDo: Enter your mqtt broker url, e.g mqtt://mosquitto:1883.
70MQTT_BROKER_URL_INTERNAL = "mqtt://mosquitto:1883"
71# ToDo: If required, enter your username and password.
72MQTT_USER = ""
73MQTT_PW = ""
75# ToDo: Change the name of your service to something unique. If you run
76# on a shared instance this is very important in order to avoid user
77# collisions. You will use this service through the whole tutorial.
78# If you forget to change it, an error will be raised!
79# FIWARE-Service
80SERVICE = "filip_tutorial"
81# FIWARE-Service path
82SERVICE_PATH = "/"
84# ToDo: Change the APIKEY to something unique. This represents the "token"
85# for IoT devices to connect (send/receive data) with the platform. In the
86# context of MQTT, APIKEY is linked with the topic used for communication.
87APIKEY = "your_apikey"
88UNIQUE_ID = str(uuid4())
89TOPIC_CONTROLLER = f"fiware_workshop/{UNIQUE_ID}/controller"
90print(TOPIC_CONTROLLER)
91# path to json-files to store entity data for follow-up exercises
92WRITE_GROUPS_FILEPATH = Path("../e5_iot_thermal_zone_control_groups.json")
93WRITE_DEVICES_FILEPATH = Path("../e5_iot_thermal_zone_control_devices.json")
94WRITE_SUBSCRIPTIONS_FILEPATH = Path("../e5_iot_thermal_zone_control_subscriptions.json")
95# path to read json-files from previous exercises
96READ_GROUPS_FILEPATH = Path("../e4_iot_thermal_zone_sensors_groups.json")
97READ_DEVICES_FILEPATH = Path("../e4_iot_thermal_zone_sensors_devices.json")
99# opening the files
100with open(READ_GROUPS_FILEPATH, "r") as groups_file, open(
101 READ_DEVICES_FILEPATH, "r"
102) as devices_file:
103 json_groups = json.load(groups_file)
104 json_devices = json.load(devices_file)
106# set parameters for the temperature simulation
107TEMPERATURE_MAX = 10 # maximal ambient temperature
108TEMPERATURE_MIN = -5 # minimal ambient temperature
109TEMPERATURE_ZONE_START = 20 # start value of the zone temperature
111T_SIM_START = 0 # simulation start time in seconds
112T_SIM_END = 24 * 60 * 60 # simulation end time in seconds
113COM_STEP = 60 * 60 * 0.25 # 15 min communication step in seconds
115# ## Main script
116if __name__ == "__main__":
117 # create a fiware header object
118 fiware_header = FiwareHeader(service=SERVICE, service_path=SERVICE_PATH)
119 # clear the state of your service and scope
120 clear_iot_agent(url=IOTA_URL, fiware_header=fiware_header)
121 clear_context_broker(url=CB_URL, fiware_header=fiware_header)
123 # instantiate simulation model
124 sim_model = SimulationModel(
125 t_start=T_SIM_START,
126 t_end=T_SIM_END,
127 temp_max=TEMPERATURE_MAX,
128 temp_min=TEMPERATURE_MIN,
129 temp_start=TEMPERATURE_ZONE_START,
130 )
132 # define lists to store historical data
133 history_weather_station = []
134 history_zone_temperature_sensor = []
135 history_heater = []
137 # create clients and also restore devices and groups from file
138 groups = TypeAdapter(List[ServiceGroup]).validate_python(json_groups)
139 devices = TypeAdapter(List[Device]).validate_python(json_devices)
140 cbc = ContextBrokerClient(url=CB_URL, fiware_header=fiware_header)
141 iotac = IoTAClient(url=IOTA_URL, fiware_header=fiware_header)
142 iotac.post_groups(service_groups=groups)
143 iotac.post_devices(devices=devices)
145 # ToDo: Get the device configurations from the server.
146 weather_station = iotac.get_device(device_id="device:001")
147 zone_temperature_sensor = ...
149 # ToDo: Get the service group configurations from the server.
150 group = iotac.get_group(resource="/iot/json", apikey=...)
152 # ToDo: Create an additional device holding a command attribute and
153 # post it to the IoT-Agent. It should be mapped to the `type` heater.
154 # create the sim_time attribute and add it during device creation
155 t_sim = DeviceAttribute(name="sim_time", object_id="t_sim", type="Number")
157 # ToDo: Create the command attribute of name `heater_on` (currently it is
158 # not possible to add metadata here).
159 cmd = DeviceCommand(name=..., type=...)
161 # ToDo: Create the device configuration and send it to the server.
162 heater = Device(...)
164 iotac.post_device(device=heater)
166 # ToDo: Check the entity that corresponds to your device.
167 heater_entity = cbc.get_entity(
168 entity_id=heater.entity_name, entity_type=heater.entity_type
169 )
170 print(
171 f"Your device entity before running the simulation: \n "
172 f"{heater_entity.model_dump_json(indent=2)}"
173 )
175 # create a MQTTv5 client with paho-mqtt and the known groups and devices.
176 mqttc = IoTAMQTTClient(
177 protocol=mqtt.MQTTv5,
178 devices=[weather_station, zone_temperature_sensor, heater],
179 service_groups=[group],
180 )
181 # set user data if required
182 mqttc.username_pw_set(username=MQTT_USER, password=MQTT_PW)
184 # ToDo: Implement a callback function that gets triggered when the
185 # command is sent to the device. The incoming command should update the
186 # heater attribute of the simulation model.
187 def on_command(client, obj, msg):
188 """
189 Callback for incoming commands
190 """
191 # decode the message payload using the libraries builtin encoders
192 apikey, device_id, payload = client.get_encoder(
193 PayloadProtocol.IOTA_JSON
194 ).decode_message(msg=msg)
195 # map the command value to the simulation
196 sim_model.heater_on = payload[cmd.name]
198 # ToDo: Acknowledge the command. In this case commands are usually single
199 # messages. The first key is equal to the commands name.
200 client.publish(device_id=device_id, command_name=..., payload=...)
202 # ToDo: Add the command callback to your MQTTClient. This will get
203 # triggered for the specified device_id.
204 mqttc.add_command_callback(device_id=..., callback=...)
206 # ToDO: Create an MQTT subscription for asynchronous communication that
207 # gets triggered when the temperature attribute changes.
208 subscription = {
209 "description": "Subscription to receive MQTT-Notifications about "
210 "urn:ngsi-ld:ThermalZone:001",
211 "subject": {
212 "entities": [{"id": ..., "type": ...}],
213 },
214 "notification": {
215 "mqtt": {
216 "url": MQTT_BROKER_URL_INTERNAL,
217 "topic": TOPIC_CONTROLLER,
218 "user": MQTT_USER,
219 "passwd": MQTT_PW,
220 }
221 },
222 "throttling": 0,
223 }
224 # generate Subscription object for validation and post it
225 subscription = Subscription(**subscription)
226 subscription_id = cbc.post_subscription(subscription=subscription)
228 # ToDo: You need to implement a controller that controls the
229 # heater state with respect to the zone temperature. This will be
230 # implemented with asynchronous communication using MQTT-Subscriptions.
231 def on_measurement(client, obj, msg):
232 """
233 Callback for measurement notifications
234 """
235 message = Message.model_validate_json(msg.payload)
236 updated_zone_temperature_sensor = message.data[0]
238 # ToDo: Retrieve the value of temperature attribute.
239 temperature = ...
241 update = True
242 if temperature <= 19:
243 state = 1
244 elif temperature >= 21:
245 state = 0
246 else:
247 update = False
249 # ToDo: Send the command to the heater entity.
250 if update:
251 command = NamedCommand(name=cmd.name, value=state)
252 cbc.post_command(...)
254 mqttc.message_callback_add(sub=TOPIC_CONTROLLER, callback=on_measurement)
256 # connect to the mqtt broker and subscribe to your topic
257 mqtt_url = urlparse(MQTT_BROKER_URL_EXPOSED)
258 mqttc.connect(
259 host=mqtt_url.hostname,
260 port=mqtt_url.port,
261 keepalive=60,
262 bind_address="",
263 bind_port=0,
264 clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY,
265 properties=None,
266 )
267 # subscribe to topics
268 # subscribe to all incoming command topics for the registered devices
269 mqttc.subscribe()
270 mqttc.subscribe(topic=TOPIC_CONTROLLER)
272 # create a non-blocking thread for mqtt communication
273 mqttc.loop_start()
275 # ToDo: Create a loop that publishes a message every 100 milliseconds
276 # to the broker that holds the simulation time "sim_time" and the
277 # corresponding temperature "temperature". You may use the `object_id`
278 # or the attribute name as key in your payload.
279 for t_sim in range(
280 sim_model.t_start, sim_model.t_end + int(COM_STEP), int(COM_STEP)
281 ):
282 # publish the simulated ambient temperature
283 mqttc.publish(
284 device_id=weather_station.device_id,
285 payload={"temperature": sim_model.t_amb, "sim_time": sim_model.t_sim},
286 )
288 # publish the simulated zone temperature
289 mqttc.publish(
290 device_id=zone_temperature_sensor.device_id,
291 payload={"temperature": sim_model.t_zone, "sim_time": sim_model.t_sim},
292 )
294 # publish the 'sim_time' for the heater device
295 mqttc.publish(device_id=heater.device_id, payload={"sim_time": sim_model.t_sim})
297 time.sleep(0.1)
298 # simulation step for next loop
299 sim_model.do_step(int(t_sim + COM_STEP))
300 # wait for 0.1 second before publishing the next values
301 time.sleep(0.1)
303 # get corresponding entities and write values to history
304 weather_station_entity = cbc.get_entity(
305 entity_id=weather_station.entity_name,
306 entity_type=weather_station.entity_type,
307 )
308 # append the data to the local history
309 history_weather_station.append(
310 {
311 "sim_time": weather_station_entity.sim_time.value,
312 "temperature": weather_station_entity.temperature.value,
313 }
314 )
316 # get zone temperature sensor and write values to history
317 zone_temperature_sensor_entity = cbc.get_entity(
318 entity_id=zone_temperature_sensor.entity_name,
319 entity_type=zone_temperature_sensor.entity_type,
320 )
321 history_zone_temperature_sensor.append(
322 {
323 "sim_time": zone_temperature_sensor_entity.sim_time.value,
324 "temperature": zone_temperature_sensor_entity.temperature.value,
325 }
326 )
328 # get zone temperature sensor and write values to history
329 heater_entity = cbc.get_entity(
330 entity_id=heater.entity_name, entity_type=heater.entity_type
331 )
332 history_heater.append(
333 {
334 "sim_time": heater_entity.sim_time.value,
335 "on_off": heater_entity.heater_on_info.value,
336 }
337 )
339 # close the mqtt listening thread
340 mqttc.loop_stop()
341 # disconnect the mqtt device
342 mqttc.disconnect()
344 print(
345 cbc.get_entity(
346 entity_id=heater.entity_name, entity_type=heater.entity_type
347 ).model_dump_json(indent=2)
348 )
350 # plot results
351 fig, ax = plt.subplots()
352 t_simulation = [item["sim_time"] / 60 for item in history_weather_station]
353 temperature = [item["temperature"] for item in history_weather_station]
354 ax.plot(t_simulation, temperature)
355 ax.title.set_text("Weather Station")
356 ax.set_xlabel("time in min")
357 ax.set_ylabel("ambient temperature in °C")
358 plt.show()
360 fig2, ax2 = plt.subplots()
361 t_simulation = [item["sim_time"] / 60 for item in history_zone_temperature_sensor]
362 temperature = [item["temperature"] for item in history_zone_temperature_sensor]
363 ax2.plot(t_simulation, temperature)
364 ax2.title.set_text("Zone Temperature Sensor")
365 ax2.set_xlabel("time in min")
366 ax2.set_ylabel("zone temperature in °C")
367 plt.show()
369 fig3, ax3 = plt.subplots()
370 t_simulation = [item["sim_time"] / 60 for item in history_heater]
371 on_off = [item["on_off"] for item in history_heater]
372 ax3.plot(t_simulation, on_off)
373 ax3.title.set_text("Heater")
374 ax3.set_xlabel("time in min")
375 ax3.set_ylabel("on/off")
376 plt.show()
378 # write devices and groups to file and clear server state
379 assert (
380 WRITE_DEVICES_FILEPATH.suffix == ".json"
381 ), f"Wrong file extension! {WRITE_DEVICES_FILEPATH.suffix}"
382 WRITE_DEVICES_FILEPATH.touch(exist_ok=True)
383 with WRITE_DEVICES_FILEPATH.open("w", encoding="utf-8") as f:
384 devices = [item.model_dump() for item in iotac.get_device_list()]
385 json.dump(devices, f, ensure_ascii=False, indent=2)
387 assert (
388 WRITE_GROUPS_FILEPATH.suffix == ".json"
389 ), f"Wrong file extension! {WRITE_GROUPS_FILEPATH.suffix}"
390 WRITE_GROUPS_FILEPATH.touch(exist_ok=True)
391 with WRITE_GROUPS_FILEPATH.open("w", encoding="utf-8") as f:
392 groups = [item.model_dump() for item in iotac.get_group_list()]
393 json.dump(groups, f, ensure_ascii=False, indent=2)
395 assert (
396 WRITE_SUBSCRIPTIONS_FILEPATH.suffix == ".json"
397 ), f"Wrong file extension! {WRITE_SUBSCRIPTIONS_FILEPATH.suffix}"
398 WRITE_SUBSCRIPTIONS_FILEPATH.touch(exist_ok=True)
399 with WRITE_SUBSCRIPTIONS_FILEPATH.open("w", encoding="utf-8") as f:
400 subs = [item.model_dump() for item in cbc.get_subscription_list()]
401 json.dump(subs, f, ensure_ascii=False, indent=2)
403 clear_iot_agent(url=IOTA_URL, fiware_header=fiware_header)
404 clear_context_broker(url=CB_URL, fiware_header=fiware_header)