Coverage for tutorials/ngsi_v2/e5_iot_thermal_zone_control/e5_iot_thermal_zone_control_solution.py: 0%

43 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-05 11:07 +0000

1""" 

2# # Exercise 5: Virtual Thermal Zone with Control 

3 

4# Create a virtual IoT device that simulates a heater for your 

5# thermal zone. The heater can be turned on and off via a simple hysteresis 

6# controller. The devices from e4_iot_thermal_zone_sensors.py will be loaded 

7# from the stored *.json-files. 

8 

9# The input sections are marked with 'ToDo' 

10 

11# #### Steps to complete: 

12# 1. Set up the missing parameters in the parameter section 

13# 2. Retrieve the service group and device configurations of already existing 

14# devices from the IoT-Agent 

15# 3. Create a third device configuration for a heater holding a command 

16# for turning it `on` and `off` and post it to the server 

17# 4. Create an MQTT client using the filip.client.mqtt package and register 

18# your service group and your devices 

19# 5. Define a callback function that will be executed when the client 

20# receives a command. Decode the message and set the updated state in the 

21# simulation model. Afterwards, acknowledge the command using the api of the 

22# IoTAMQTTClient. 

23# 6. Add the callback for your heater device to the IoTAMQTTClient 

24# 7. Create an MQTT subscription for asynchronous communication that 

25# gets triggered when the temperature attribute changes. 

26# 8. Write a second callback that represents your controller. It should get 

27# triggered when the MQTTClient receives a notification message due to your 

28# subscription. Add the callback to your MQTTClient using the original 

29# paho-api (`message_callback_add`) 

30# 9. Run the simulation and plot 

31""" 

32 

33# ## Import packages 

34import json 

35from pathlib import Path 

36import time 

37from typing import List 

38from urllib.parse import urlparse 

39from uuid import uuid4 

40import paho.mqtt.client as mqtt 

41from pydantic import TypeAdapter 

42import matplotlib.pyplot as plt 

43 

44# import from filip 

45from filip.clients.ngsi_v2 import ContextBrokerClient, IoTAClient 

46from filip.clients.mqtt import IoTAMQTTClient 

47from filip.models.base import DataType, FiwareHeader 

48from filip.models.ngsi_v2.context import NamedCommand 

49from filip.models.ngsi_v2.subscriptions import Subscription, Message 

50from filip.models.ngsi_v2.iot import ( 

51 Device, 

52 DeviceAttribute, 

53 DeviceCommand, 

54 PayloadProtocol, 

55 ServiceGroup, 

56) 

57from filip.utils.cleanup import clear_context_broker, clear_iot_agent 

58 

59# import simulation model 

60from tutorials.ngsi_v2.simulation_model import SimulationModel 

61 

62# ## Parameters 

63# ToDo: Enter your context broker host and port, e.g http://localhost:1026. 

64CB_URL = "http://localhost:1026" 

65# ToDo: Enter your IoT-Agent host and port, e.g http://localhost:4041. 

66IOTA_URL = "http://localhost:4041" 

67# ToDo: Enter the exposed MQTT broker URL that this script connects to, e.g. mqtt://test.mosquitto.org:1883. 

68MQTT_BROKER_URL_EXPOSED = "mqtt://localhost:1883" 

69# ToDo: Enter the internal MQTT broker URL that is written into the subscription's notification settings, e.g. mqtt://mosquitto:1883. 

70MQTT_BROKER_URL_INTERNAL = "mqtt://mosquitto:1883" 

71# ToDo: If required, enter your username and password. 

72MQTT_USER = "" 

73MQTT_PW = "" 

74 

75# ToDo: Change the name of your service to something unique. If you run 

76# on a shared instance this is very important in order to avoid user 

77# collisions. You will use this service through the whole tutorial. 

78# If you forget to change it, an error will be raised! 

79# FIWARE-Service 

80SERVICE = "filip_tutorial" 

81# FIWARE-Service path 

82SERVICE_PATH = "/" 

83 

84# ToDo: Change the APIKEY to something unique. This represents the "token" 

85# for IoT devices to connect (send/receive data) with the platform. In the 

86# context of MQTT, APIKEY is linked with the topic used for communication. 

87APIKEY = "your_apikey" 

88UNIQUE_ID = str(uuid4()) 

89TOPIC_CONTROLLER = f"fiware_workshop/{UNIQUE_ID}/controller" 

90print(TOPIC_CONTROLLER) 

91# path to json-files to store entity data for follow-up exercises 

92WRITE_GROUPS_FILEPATH = Path("../e5_iot_thermal_zone_control_solution_groups.json") 

93WRITE_DEVICES_FILEPATH = Path("../e5_iot_thermal_zone_control_solution_devices.json") 

94WRITE_SUBSCRIPTIONS_FILEPATH = Path( 

95 "../e5_iot_thermal_zone_control_solution_subscriptions.json" 

96) 

97# path to read json-files from previous exercises 

98READ_GROUPS_FILEPATH = Path("../e4_iot_thermal_zone_sensors_solution_groups.json") 

99READ_DEVICES_FILEPATH = Path("../e4_iot_thermal_zone_sensors_solution_devices.json") 

100 

101# opening the files 

102with open(READ_GROUPS_FILEPATH, "r") as groups_file, open( 

103 READ_DEVICES_FILEPATH, "r" 

104) as devices_file: 

105 json_groups = json.load(groups_file) 

106 json_devices = json.load(devices_file) 

107 

108# set parameters for the temperature simulation 

109TEMPERATURE_MAX = 10 # maximal ambient temperature 

110TEMPERATURE_MIN = -5 # minimal ambient temperature 

111TEMPERATURE_ZONE_START = 20 # start value of the zone temperature 

112 

113T_SIM_START = 0 # simulation start time in seconds 

114T_SIM_END = 24 * 60 * 60 # simulation end time in seconds 

115COM_STEP = 60 * 60 * 0.25 # 15 min communication step in seconds 

116 

117# ## Main script 

118if __name__ == "__main__": 

119 # create a fiware header object 

120 fiware_header = FiwareHeader(service=SERVICE, service_path=SERVICE_PATH) 

121 # clear the state of your service and scope 

122 clear_iot_agent(url=IOTA_URL, fiware_header=fiware_header) 

123 clear_context_broker(url=CB_URL, fiware_header=fiware_header) 

124 

125 # instantiate simulation model 

126 sim_model = SimulationModel( 

127 t_start=T_SIM_START, 

128 t_end=T_SIM_END, 

129 temp_max=TEMPERATURE_MAX, 

130 temp_min=TEMPERATURE_MIN, 

131 temp_start=TEMPERATURE_ZONE_START, 

132 ) 

133 

134 # define lists to store historical data 

135 history_weather_station = [] 

136 history_zone_temperature_sensor = [] 

137 history_heater = [] 

138 

139 # create clients and also restore devices and groups from file 

140 groups = TypeAdapter(List[ServiceGroup]).validate_python(json_groups) 

141 devices = TypeAdapter(List[Device]).validate_python(json_devices) 

142 cbc = ContextBrokerClient(url=CB_URL, fiware_header=fiware_header) 

143 iotac = IoTAClient(url=IOTA_URL, fiware_header=fiware_header) 

144 iotac.post_groups(service_groups=groups) 

145 iotac.post_devices(devices=devices) 

146 

147 # ToDo: Get the device configurations from the server. 

148 weather_station = iotac.get_device(device_id="device:001") 

149 zone_temperature_sensor = iotac.get_device(device_id="device:002") 

150 

151 # ToDo: Get the service group configurations from the server. 

152 group = iotac.get_group(resource="/iot/json", apikey=APIKEY) 

153 

154 # ToDo: Create an additional device holding a command attribute and 

155 # post it to the IoT-Agent. It should be mapped to the `type` heater. 

156 # create the sim_time attribute and add it during device creation 

157 t_sim = DeviceAttribute(name="sim_time", object_id="t_sim", type="Number") 

158 

159 # ToDo: Create the command attribute of name `heater_on` (currently it is 

160 # not possible to add metadata here). 

161 cmd = DeviceCommand(name="heater_on", type=DataType.BOOLEAN) 

162 

163 # ToDo: Create the device configuration and send it to the server. 

164 heater = Device( 

165 device_id="device:003", 

166 entity_name="urn:ngsi-ld:Heater:001", 

167 entity_type="Heater", 

168 apikey=APIKEY, 

169 attributes=[t_sim], 

170 commands=[cmd], 

171 transport="MQTT", 

172 protocol="IoTA-JSON", 

173 ) 

174 

175 iotac.post_device(device=heater) 

176 

177 # ToDo: Check the entity that corresponds to your device. 

178 heater_entity = cbc.get_entity( 

179 entity_id=heater.entity_name, entity_type=heater.entity_type 

180 ) 

181 print( 

182 f"Your device entity before running the simulation: \n " 

183 f"{heater_entity.model_dump_json(indent=2)}" 

184 ) 

185 

186 # create a MQTTv5 client with paho-mqtt and the known groups and devices. 

187 mqttc = IoTAMQTTClient( 

188 protocol=mqtt.MQTTv5, 

189 devices=[weather_station, zone_temperature_sensor, heater], 

190 service_groups=[group], 

191 ) 

192 # set user data if required 

193 mqttc.username_pw_set(username=MQTT_USER, password=MQTT_PW) 

194 

195 # ToDo: Implement a callback function that gets triggered when the 

196 # command is sent to the device. The incoming command should update the 

197 # heater attribute of the simulation model. 

def on_command(client, obj, msg):
    """
    Handle a command message received for the heater device.

    The message is decoded with the client's IoTA-JSON encoder, the value
    stored under the command name is written to the simulation model, and
    the received payload is published back as the acknowledgement.

    Args:
        client: MQTT client that received the message; it is used both for
            decoding the message and for publishing the acknowledgement.
        obj: Second positional callback argument (not used here).
        msg: The received MQTT message.
    """
    # decode the message payload using the library's builtin encoders;
    # the decoder yields (apikey, device_id, payload), the apikey is not needed
    encoder = client.get_encoder(PayloadProtocol.IOTA_JSON)
    _, device_id, payload = encoder.decode_message(msg=msg)

    # map the command value to the simulation
    sim_model.heater_on = payload[cmd.name]

    # ToDo: Acknowledge the command. In this case commands are usually single
    #  messages. The first key is equal to the command's name.
    command_name = next(iter(payload))
    client.publish(device_id=device_id, command_name=command_name, payload=payload)

214 

215 # ToDo: Add the command callback to your MQTTClient. This will get 

216 # triggered for the specified device_id. 

217 mqttc.add_command_callback(device_id=heater.device_id, callback=on_command) 

218 

219 # ToDo: Create an MQTT subscription for asynchronous communication that 

220 # gets triggered when the temperature attribute changes. 

221 subscription = { 

222 "description": "Subscription to receive MQTT-Notifications about " 

223 "urn:ngsi-ld:ThermalZone:001", 

224 "subject": { 

225 "entities": [ 

226 { 

227 "id": zone_temperature_sensor.entity_name, 

228 "type": zone_temperature_sensor.entity_type, 

229 } 

230 ], 

231 }, 

232 "notification": { 

233 "mqtt": { 

234 "url": MQTT_BROKER_URL_INTERNAL, 

235 "topic": TOPIC_CONTROLLER, 

236 "user": MQTT_USER, 

237 "passwd": MQTT_PW, 

238 } 

239 }, 

240 "throttling": 0, 

241 } 

242 # generate Subscription object for validation and post it 

243 subscription = Subscription(**subscription) 

244 subscription_id = cbc.post_subscription(subscription=subscription) 

245 

246 # ToDo: You need to implement a controller that controls the 

247 # heater state with respect to the zone temperature. This will be 

248 # implemented with asynchronous communication using MQTT-Subscriptions. 

def on_measurement(client, obj, msg):
    """
    Controller callback for measurement notifications.

    Parses the notification payload, reads the temperature of the first
    entity in the notification data and switches the heater with a simple
    hysteresis rule: on (1) at 19 or below, off (0) at 21 or above. For
    temperatures in between, no command is sent.

    Args:
        client: MQTT client that received the notification (not used here).
        obj: Second positional callback argument (not used here).
        msg: The received MQTT message holding the notification as JSON.
    """
    notification = Message.model_validate_json(msg.payload)

    # ToDo: Retrieve the value of temperature attribute.
    temperature = notification.data[0].temperature.value

    # hysteresis: only the two outer bands trigger a command
    if temperature <= 19:
        state = 1
    elif temperature >= 21:
        state = 0
    else:
        # inside the dead band the heater keeps its current state
        return

    # ToDo: Send the command to the heater entity.
    command = NamedCommand(name=cmd.name, value=state)
    cbc.post_command(
        entity_id=heater.entity_name,
        entity_type=heater.entity_type,
        command=command,
    )

275 

276 mqttc.message_callback_add(sub=TOPIC_CONTROLLER, callback=on_measurement) 

277 

278 # connect to the mqtt broker and subscribe to your topic 

279 mqtt_url = urlparse(MQTT_BROKER_URL_EXPOSED) 

280 mqttc.connect( 

281 host=mqtt_url.hostname, 

282 port=mqtt_url.port, 

283 keepalive=60, 

284 bind_address="", 

285 bind_port=0, 

286 clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY, 

287 properties=None, 

288 ) 

289 # subscribe to topics 

290 # subscribe to all incoming command topics for the registered devices 

291 mqttc.subscribe() 

292 mqttc.subscribe(topic=TOPIC_CONTROLLER) 

293 

294 # create a non-blocking thread for mqtt communication 

295 mqttc.loop_start() 

296 

297 # ToDo: Create a loop that publishes a message every 100 milliseconds 

298 # to the broker that holds the simulation time "sim_time" and the 

299 # corresponding temperature "temperature". You may use the `object_id` 

300 # or the attribute name as key in your payload. 

301 for t_sim in range( 

302 sim_model.t_start, sim_model.t_end + int(COM_STEP), int(COM_STEP) 

303 ): 

304 # publish the simulated ambient temperature 

305 mqttc.publish( 

306 device_id=weather_station.device_id, 

307 payload={"temperature": sim_model.t_amb, "sim_time": sim_model.t_sim}, 

308 ) 

309 

310 # publish the simulated zone temperature 

311 mqttc.publish( 

312 device_id=zone_temperature_sensor.device_id, 

313 payload={"temperature": sim_model.t_zone, "sim_time": sim_model.t_sim}, 

314 ) 

315 

316 # publish the 'sim_time' for the heater device 

317 mqttc.publish(device_id=heater.device_id, payload={"sim_time": sim_model.t_sim}) 

318 

319 time.sleep(0.1) 

320 # simulation step for next loop 

321 sim_model.do_step(int(t_sim + COM_STEP)) 

322 # wait for 0.1 second before publishing the next values 

323 time.sleep(0.1) 

324 

325 # get corresponding entities and write values to history 

326 weather_station_entity = cbc.get_entity( 

327 entity_id=weather_station.entity_name, 

328 entity_type=weather_station.entity_type, 

329 ) 

330 # append the data to the local history 

331 history_weather_station.append( 

332 { 

333 "sim_time": weather_station_entity.sim_time.value, 

334 "temperature": weather_station_entity.temperature.value, 

335 } 

336 ) 

337 

338 # get zone temperature sensor and write values to history 

339 zone_temperature_sensor_entity = cbc.get_entity( 

340 entity_id=zone_temperature_sensor.entity_name, 

341 entity_type=zone_temperature_sensor.entity_type, 

342 ) 

343 history_zone_temperature_sensor.append( 

344 { 

345 "sim_time": zone_temperature_sensor_entity.sim_time.value, 

346 "temperature": zone_temperature_sensor_entity.temperature.value, 

347 } 

348 ) 

349 

350 # get the heater entity and write values to history 

351 heater_entity = cbc.get_entity( 

352 entity_id=heater.entity_name, entity_type=heater.entity_type 

353 ) 

354 history_heater.append( 

355 { 

356 "sim_time": heater_entity.sim_time.value, 

357 "on_off": heater_entity.heater_on_info.value, 

358 } 

359 ) 

360 

361 # close the mqtt listening thread 

362 mqttc.loop_stop() 

363 # disconnect the mqtt device 

364 mqttc.disconnect() 

365 

366 print( 

367 cbc.get_entity( 

368 entity_id=heater.entity_name, entity_type=heater.entity_type 

369 ).model_dump_json(indent=2) 

370 ) 

371 

372 # plot results 

373 fig, ax = plt.subplots() 

374 t_simulation = [item["sim_time"] / 60 for item in history_weather_station] 

375 temperature = [item["temperature"] for item in history_weather_station] 

376 ax.plot(t_simulation, temperature) 

377 ax.title.set_text("Weather Station") 

378 ax.set_xlabel("time in min") 

379 ax.set_ylabel("ambient temperature in °C") 

380 plt.show() 

381 

382 fig2, ax2 = plt.subplots() 

383 t_simulation = [item["sim_time"] / 60 for item in history_zone_temperature_sensor] 

384 temperature = [item["temperature"] for item in history_zone_temperature_sensor] 

385 ax2.plot(t_simulation, temperature) 

386 ax2.title.set_text("Zone Temperature Sensor") 

387 ax2.set_xlabel("time in min") 

388 ax2.set_ylabel("zone temperature in °C") 

389 plt.show() 

390 

391 fig3, ax3 = plt.subplots() 

392 t_simulation = [item["sim_time"] / 60 for item in history_heater] 

393 on_off = [item["on_off"] for item in history_heater] 

394 ax3.plot(t_simulation, on_off) 

395 ax3.title.set_text("Heater") 

396 ax3.set_xlabel("time in min") 

397 ax3.set_ylabel("on/off") 

398 plt.show() 

399 

400 # write devices and groups to file and clear server state 

401 assert ( 

402 WRITE_DEVICES_FILEPATH.suffix == ".json" 

403 ), f"Wrong file extension! {WRITE_DEVICES_FILEPATH.suffix}" 

404 WRITE_DEVICES_FILEPATH.touch(exist_ok=True) 

405 with WRITE_DEVICES_FILEPATH.open("w", encoding="utf-8") as f: 

406 devices = [item.model_dump() for item in iotac.get_device_list()] 

407 json.dump(devices, f, ensure_ascii=False, indent=2) 

408 

409 assert ( 

410 WRITE_GROUPS_FILEPATH.suffix == ".json" 

411 ), f"Wrong file extension! {WRITE_GROUPS_FILEPATH.suffix}" 

412 WRITE_GROUPS_FILEPATH.touch(exist_ok=True) 

413 with WRITE_GROUPS_FILEPATH.open("w", encoding="utf-8") as f: 

414 groups = [item.model_dump() for item in iotac.get_group_list()] 

415 json.dump(groups, f, ensure_ascii=False, indent=2) 

416 

417 assert ( 

418 WRITE_SUBSCRIPTIONS_FILEPATH.suffix == ".json" 

419 ), f"Wrong file extension! {WRITE_SUBSCRIPTIONS_FILEPATH.suffix}" 

420 WRITE_SUBSCRIPTIONS_FILEPATH.touch(exist_ok=True) 

421 with WRITE_SUBSCRIPTIONS_FILEPATH.open("w", encoding="utf-8") as f: 

422 subs = [item.model_dump() for item in cbc.get_subscription_list()] 

423 json.dump(subs, f, ensure_ascii=False, indent=2) 

424 

425 clear_iot_agent(url=IOTA_URL, fiware_header=fiware_header) 

426 clear_context_broker(url=CB_URL, fiware_header=fiware_header)