Coverage for tutorials/ngsi_v2/e5_iot_thermal_zone_control/e5_iot_thermal_zone_control_solution.py: 0%
43 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-05 11:07 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-05 11:07 +0000
1"""
2# # Exercise 5: Virtual Thermal Zone with Control
4# Create a virtual IoT device that simulates a heater for your
5# thermal zone. The heater can be turned on and off via a simple hysteresis
6# controller. The devices from e4_iot_thermal_zone_sensors.py will be loaded
7# from the stored *.json-files.
9# The input sections are marked with 'ToDo'
11# #### Steps to complete:
12# 1. Set up the missing parameters in the parameter section
13# 2. Retrieve the service group and device configurations of already existing
14# devices from the IoT-Agent
15# 3. Create a third device configuration for a heater holding a command
16# for turning it `on` and `off` and post it to the server
17# 4. Create an MQTT client using the filip.client.mqtt package and register
18# your service group and your devices
19# 4. Define a callback function that will be executed when the client
20# receives a command. Decode the message and set the update state in
21# simulation model. Afterwards, acknowledge the command using the api of the
22# IoTAMQTTClient.
23# 5. Add the callback for your heater device to the IoTAMQTTClient
24# 6. Create an MQTT subscription for asynchronous communication that
25# gets triggered when the temperature attribute changes.
26# 7. Write a second callback that represents your controller. It should get
27# triggered when the MQTTClient receives a notification message due to your
28# subscription. Add the callback to your MQTTClient using the original
29# paho-api (`message_callback_add`)
30# 8. Run the simulation and plot
31"""
33# ## Import packages
34import json
35from pathlib import Path
36import time
37from typing import List
38from urllib.parse import urlparse
39from uuid import uuid4
40import paho.mqtt.client as mqtt
41from pydantic import TypeAdapter
42import matplotlib.pyplot as plt
44# import from filip
45from filip.clients.ngsi_v2 import ContextBrokerClient, IoTAClient
46from filip.clients.mqtt import IoTAMQTTClient
47from filip.models.base import DataType, FiwareHeader
48from filip.models.ngsi_v2.context import NamedCommand
49from filip.models.ngsi_v2.subscriptions import Subscription, Message
50from filip.models.ngsi_v2.iot import (
51 Device,
52 DeviceAttribute,
53 DeviceCommand,
54 PayloadProtocol,
55 ServiceGroup,
56)
57from filip.utils.cleanup import clear_context_broker, clear_iot_agent
59# import simulation model
60from tutorials.ngsi_v2.simulation_model import SimulationModel
62# ## Parameters
63# ToDo: Enter your context broker host and port, e.g http://localhost:1026.
64CB_URL = "http://localhost:1026"
65# ToDo: Enter your IoT-Agent host and port, e.g http://localhost:4041.
66IOTA_URL = "http://localhost:4041"
67# ToDo: Enter your mqtt broker url, e.g mqtt://test.mosquitto.org:1883.
68MQTT_BROKER_URL_EXPOSED = "mqtt://localhost:1883"
69# ToDo: Enter your mqtt broker url, e.g mqtt://mosquitto:1883.
70MQTT_BROKER_URL_INTERNAL = "mqtt://mosquitto:1883"
71# ToDo: If required, enter your username and password.
72MQTT_USER = ""
73MQTT_PW = ""
75# ToDo: Change the name of your service to something unique. If you run
76# on a shared instance this is very important in order to avoid user
77# collisions. You will use this service through the whole tutorial.
78# If you forget to change it, an error will be raised!
79# FIWARE-Service
80SERVICE = "filip_tutorial"
81# FIWARE-Service path
82SERVICE_PATH = "/"
84# ToDo: Change the APIKEY to something unique. This represents the "token"
85# for IoT devices to connect (send/receive data) with the platform. In the
86# context of MQTT, APIKEY is linked with the topic used for communication.
87APIKEY = "your_apikey"
88UNIQUE_ID = str(uuid4())
89TOPIC_CONTROLLER = f"fiware_workshop/{UNIQUE_ID}/controller"
90print(TOPIC_CONTROLLER)
91# path to json-files to store entity data for follow-up exercises
92WRITE_GROUPS_FILEPATH = Path("../e5_iot_thermal_zone_control_solution_groups.json")
93WRITE_DEVICES_FILEPATH = Path("../e5_iot_thermal_zone_control_solution_devices.json")
94WRITE_SUBSCRIPTIONS_FILEPATH = Path(
95 "../e5_iot_thermal_zone_control_solution_subscriptions.json"
96)
97# path to read json-files from previous exercises
98READ_GROUPS_FILEPATH = Path("../e4_iot_thermal_zone_sensors_solution_groups.json")
99READ_DEVICES_FILEPATH = Path("../e4_iot_thermal_zone_sensors_solution_devices.json")
101# opening the files
102with open(READ_GROUPS_FILEPATH, "r") as groups_file, open(
103 READ_DEVICES_FILEPATH, "r"
104) as devices_file:
105 json_groups = json.load(groups_file)
106 json_devices = json.load(devices_file)
108# set parameters for the temperature simulation
109TEMPERATURE_MAX = 10 # maximal ambient temperature
110TEMPERATURE_MIN = -5 # minimal ambient temperature
111TEMPERATURE_ZONE_START = 20 # start value of the zone temperature
113T_SIM_START = 0 # simulation start time in seconds
114T_SIM_END = 24 * 60 * 60 # simulation end time in seconds
115COM_STEP = 60 * 60 * 0.25 # 15 min communication step in seconds
117# ## Main script
118if __name__ == "__main__":
119 # create a fiware header object
120 fiware_header = FiwareHeader(service=SERVICE, service_path=SERVICE_PATH)
121 # clear the state of your service and scope
122 clear_iot_agent(url=IOTA_URL, fiware_header=fiware_header)
123 clear_context_broker(url=CB_URL, fiware_header=fiware_header)
125 # instantiate simulation model
126 sim_model = SimulationModel(
127 t_start=T_SIM_START,
128 t_end=T_SIM_END,
129 temp_max=TEMPERATURE_MAX,
130 temp_min=TEMPERATURE_MIN,
131 temp_start=TEMPERATURE_ZONE_START,
132 )
134 # define lists to store historical data
135 history_weather_station = []
136 history_zone_temperature_sensor = []
137 history_heater = []
139 # create clients and also restore devices and groups from file
140 groups = TypeAdapter(List[ServiceGroup]).validate_python(json_groups)
141 devices = TypeAdapter(List[Device]).validate_python(json_devices)
142 cbc = ContextBrokerClient(url=CB_URL, fiware_header=fiware_header)
143 iotac = IoTAClient(url=IOTA_URL, fiware_header=fiware_header)
144 iotac.post_groups(service_groups=groups)
145 iotac.post_devices(devices=devices)
147 # ToDo: Get the device configurations from the server.
148 weather_station = iotac.get_device(device_id="device:001")
149 zone_temperature_sensor = iotac.get_device(device_id="device:002")
151 # ToDo: Get the service group configurations from the server.
152 group = iotac.get_group(resource="/iot/json", apikey=APIKEY)
154 # ToDo: Create an additional device holding a command attribute and
155 # post it to the IoT-Agent. It should be mapped to the `type` heater.
156 # create the sim_time attribute and add it during device creation
157 t_sim = DeviceAttribute(name="sim_time", object_id="t_sim", type="Number")
159 # ToDo: Create the command attribute of name `heater_on` (currently it is
160 # not possible to add metadata here).
161 cmd = DeviceCommand(name="heater_on", type=DataType.BOOLEAN)
163 # ToDo: Create the device configuration and send it to the server.
164 heater = Device(
165 device_id="device:003",
166 entity_name="urn:ngsi-ld:Heater:001",
167 entity_type="Heater",
168 apikey=APIKEY,
169 attributes=[t_sim],
170 commands=[cmd],
171 transport="MQTT",
172 protocol="IoTA-JSON",
173 )
175 iotac.post_device(device=heater)
177 # ToDo: Check the entity that corresponds to your device.
178 heater_entity = cbc.get_entity(
179 entity_id=heater.entity_name, entity_type=heater.entity_type
180 )
181 print(
182 f"Your device entity before running the simulation: \n "
183 f"{heater_entity.model_dump_json(indent=2)}"
184 )
186 # create a MQTTv5 client with paho-mqtt and the known groups and devices.
187 mqttc = IoTAMQTTClient(
188 protocol=mqtt.MQTTv5,
189 devices=[weather_station, zone_temperature_sensor, heater],
190 service_groups=[group],
191 )
192 # set user data if required
193 mqttc.username_pw_set(username=MQTT_USER, password=MQTT_PW)
195 # ToDo: Implement a callback function that gets triggered when the
196 # command is sent to the device. The incoming command should update the
197 # heater attribute of the simulation model.
198 def on_command(client, obj, msg):
199 """
200 Callback for incoming commands
201 """
202 # decode the message payload using the libraries builtin encoders
203 apikey, device_id, payload = client.get_encoder(
204 PayloadProtocol.IOTA_JSON
205 ).decode_message(msg=msg)
206 # map the command value to the simulation
207 sim_model.heater_on = payload[cmd.name]
209 # ToDo: Acknowledge the command. In this case commands are usually single
210 # messages. The first key is equal to the commands name.
211 client.publish(
212 device_id=device_id, command_name=next(iter(payload)), payload=payload
213 )
215 # ToDo: Add the command callback to your MQTTClient. This will get
216 # triggered for the specified device_id.
217 mqttc.add_command_callback(device_id=heater.device_id, callback=on_command)
219 # ToDO: Create an MQTT subscription for asynchronous communication that
220 # gets triggered when the temperature attribute changes.
221 subscription = {
222 "description": "Subscription to receive MQTT-Notifications about "
223 "urn:ngsi-ld:ThermalZone:001",
224 "subject": {
225 "entities": [
226 {
227 "id": zone_temperature_sensor.entity_name,
228 "type": zone_temperature_sensor.entity_type,
229 }
230 ],
231 },
232 "notification": {
233 "mqtt": {
234 "url": MQTT_BROKER_URL_INTERNAL,
235 "topic": TOPIC_CONTROLLER,
236 "user": MQTT_USER,
237 "passwd": MQTT_PW,
238 }
239 },
240 "throttling": 0,
241 }
242 # generate Subscription object for validation and post it
243 subscription = Subscription(**subscription)
244 subscription_id = cbc.post_subscription(subscription=subscription)
246 # ToDo: You need to implement a controller that controls the
247 # heater state with respect to the zone temperature. This will be
248 # implemented with asynchronous communication using MQTT-Subscriptions.
249 def on_measurement(client, obj, msg):
250 """
251 Callback for measurement notifications
252 """
253 message = Message.model_validate_json(msg.payload)
254 updated_zone_temperature_sensor = message.data[0]
256 # ToDo: Retrieve the value of temperature attribute.
257 temperature = updated_zone_temperature_sensor.temperature.value
259 update = True
260 if temperature <= 19:
261 state = 1
262 elif temperature >= 21:
263 state = 0
264 else:
265 update = False
267 # ToDo: Send the command to the heater entity.
268 if update:
269 command = NamedCommand(name=cmd.name, value=state)
270 cbc.post_command(
271 entity_id=heater.entity_name,
272 entity_type=heater.entity_type,
273 command=command,
274 )
276 mqttc.message_callback_add(sub=TOPIC_CONTROLLER, callback=on_measurement)
278 # connect to the mqtt broker and subscribe to your topic
279 mqtt_url = urlparse(MQTT_BROKER_URL_EXPOSED)
280 mqttc.connect(
281 host=mqtt_url.hostname,
282 port=mqtt_url.port,
283 keepalive=60,
284 bind_address="",
285 bind_port=0,
286 clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY,
287 properties=None,
288 )
289 # subscribe to topics
290 # subscribe to all incoming command topics for the registered devices
291 mqttc.subscribe()
292 mqttc.subscribe(topic=TOPIC_CONTROLLER)
294 # create a non-blocking thread for mqtt communication
295 mqttc.loop_start()
297 # ToDo: Create a loop that publishes a message every 100 milliseconds
298 # to the broker that holds the simulation time "sim_time" and the
299 # corresponding temperature "temperature". You may use the `object_id`
300 # or the attribute name as key in your payload.
301 for t_sim in range(
302 sim_model.t_start, sim_model.t_end + int(COM_STEP), int(COM_STEP)
303 ):
304 # publish the simulated ambient temperature
305 mqttc.publish(
306 device_id=weather_station.device_id,
307 payload={"temperature": sim_model.t_amb, "sim_time": sim_model.t_sim},
308 )
310 # publish the simulated zone temperature
311 mqttc.publish(
312 device_id=zone_temperature_sensor.device_id,
313 payload={"temperature": sim_model.t_zone, "sim_time": sim_model.t_sim},
314 )
316 # publish the 'sim_time' for the heater device
317 mqttc.publish(device_id=heater.device_id, payload={"sim_time": sim_model.t_sim})
319 time.sleep(0.1)
320 # simulation step for next loop
321 sim_model.do_step(int(t_sim + COM_STEP))
322 # wait for 0.1 second before publishing the next values
323 time.sleep(0.1)
325 # get corresponding entities and write values to history
326 weather_station_entity = cbc.get_entity(
327 entity_id=weather_station.entity_name,
328 entity_type=weather_station.entity_type,
329 )
330 # append the data to the local history
331 history_weather_station.append(
332 {
333 "sim_time": weather_station_entity.sim_time.value,
334 "temperature": weather_station_entity.temperature.value,
335 }
336 )
338 # get zone temperature sensor and write values to history
339 zone_temperature_sensor_entity = cbc.get_entity(
340 entity_id=zone_temperature_sensor.entity_name,
341 entity_type=zone_temperature_sensor.entity_type,
342 )
343 history_zone_temperature_sensor.append(
344 {
345 "sim_time": zone_temperature_sensor_entity.sim_time.value,
346 "temperature": zone_temperature_sensor_entity.temperature.value,
347 }
348 )
350 # get zone temperature sensor and write values to history
351 heater_entity = cbc.get_entity(
352 entity_id=heater.entity_name, entity_type=heater.entity_type
353 )
354 history_heater.append(
355 {
356 "sim_time": heater_entity.sim_time.value,
357 "on_off": heater_entity.heater_on_info.value,
358 }
359 )
361 # close the mqtt listening thread
362 mqttc.loop_stop()
363 # disconnect the mqtt device
364 mqttc.disconnect()
366 print(
367 cbc.get_entity(
368 entity_id=heater.entity_name, entity_type=heater.entity_type
369 ).model_dump_json(indent=2)
370 )
372 # plot results
373 fig, ax = plt.subplots()
374 t_simulation = [item["sim_time"] / 60 for item in history_weather_station]
375 temperature = [item["temperature"] for item in history_weather_station]
376 ax.plot(t_simulation, temperature)
377 ax.title.set_text("Weather Station")
378 ax.set_xlabel("time in min")
379 ax.set_ylabel("ambient temperature in °C")
380 plt.show()
382 fig2, ax2 = plt.subplots()
383 t_simulation = [item["sim_time"] / 60 for item in history_zone_temperature_sensor]
384 temperature = [item["temperature"] for item in history_zone_temperature_sensor]
385 ax2.plot(t_simulation, temperature)
386 ax2.title.set_text("Zone Temperature Sensor")
387 ax2.set_xlabel("time in min")
388 ax2.set_ylabel("zone temperature in °C")
389 plt.show()
391 fig3, ax3 = plt.subplots()
392 t_simulation = [item["sim_time"] / 60 for item in history_heater]
393 on_off = [item["on_off"] for item in history_heater]
394 ax3.plot(t_simulation, on_off)
395 ax3.title.set_text("Heater")
396 ax3.set_xlabel("time in min")
397 ax3.set_ylabel("on/off")
398 plt.show()
400 # write devices and groups to file and clear server state
401 assert (
402 WRITE_DEVICES_FILEPATH.suffix == ".json"
403 ), f"Wrong file extension! {WRITE_DEVICES_FILEPATH.suffix}"
404 WRITE_DEVICES_FILEPATH.touch(exist_ok=True)
405 with WRITE_DEVICES_FILEPATH.open("w", encoding="utf-8") as f:
406 devices = [item.model_dump() for item in iotac.get_device_list()]
407 json.dump(devices, f, ensure_ascii=False, indent=2)
409 assert (
410 WRITE_GROUPS_FILEPATH.suffix == ".json"
411 ), f"Wrong file extension! {WRITE_GROUPS_FILEPATH.suffix}"
412 WRITE_GROUPS_FILEPATH.touch(exist_ok=True)
413 with WRITE_GROUPS_FILEPATH.open("w", encoding="utf-8") as f:
414 groups = [item.model_dump() for item in iotac.get_group_list()]
415 json.dump(groups, f, ensure_ascii=False, indent=2)
417 assert (
418 WRITE_SUBSCRIPTIONS_FILEPATH.suffix == ".json"
419 ), f"Wrong file extension! {WRITE_SUBSCRIPTIONS_FILEPATH.suffix}"
420 WRITE_SUBSCRIPTIONS_FILEPATH.touch(exist_ok=True)
421 with WRITE_SUBSCRIPTIONS_FILEPATH.open("w", encoding="utf-8") as f:
422 subs = [item.model_dump() for item in cbc.get_subscription_list()]
423 json.dump(subs, f, ensure_ascii=False, indent=2)
425 clear_iot_agent(url=IOTA_URL, fiware_header=fiware_header)
426 clear_context_broker(url=CB_URL, fiware_header=fiware_header)