Coverage for tutorials/ngsi_v2/e5_iot_thermal_zone_control/e5_iot_thermal_zone_control.py: 0%

43 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-05 11:07 +0000

1""" 

2# # Exercise 5: Virtual Thermal Zone with Control 

3 

4# Create a virtual IoT device that simulates a heater for your 

5# thermal zone. The heater can be turned on and off via a simple hysteresis 

6# controller. The devices from e4_iot_thermal_zone_sensors.py will be loaded 

7# from the stored *.json-files. 

8 

9# The input sections are marked with 'ToDo' 

10 

11# #### Steps to complete: 

12# 1. Set up the missing parameters in the parameter section 

13# 2. Retrieve the service group and device configurations of already existing 

14# devices from the IoT-Agent 

15# 3. Create a third device configuration for a heater holding a command 

16# for turning it `on` and `off` and post it to the server 

17# 4. Create an MQTT client using the filip.client.mqtt package and register 

18# your service group and your devices 

19# 5. Define a callback function that will be executed when the client 

20# receives a command. Decode the message and set the updated state in 

21# the simulation model. Afterwards, acknowledge the command using the api of the 

22# IoTAMQTTClient. 

23# 6. Add the callback for your heater device to the IoTAMQTTClient 

24# 7. Create an MQTT subscription for asynchronous communication that 

25# gets triggered when the temperature attribute changes. 

26# 8. Write a second callback that represents your controller. It should get 

27# triggered when the MQTTClient receives a notification message due to your 

28# subscription. Add the callback to your MQTTClient using the original 

29# paho-api (`message_callback_add`) 

30# 9. Run the simulation and plot the results 

31""" 

32 

33# ## Import packages 

34import json 

35from pathlib import Path 

36import time 

37from typing import List 

38from urllib.parse import urlparse 

39from uuid import uuid4 

40import paho.mqtt.client as mqtt 

41from pydantic import TypeAdapter 

42import matplotlib.pyplot as plt 

43 

44# import from filip 

45from filip.clients.ngsi_v2 import ContextBrokerClient, IoTAClient 

46from filip.clients.mqtt import IoTAMQTTClient 

47from filip.models.base import DataType, FiwareHeader 

48from filip.models.ngsi_v2.context import NamedCommand 

49from filip.models.ngsi_v2.subscriptions import Subscription, Message 

50from filip.models.ngsi_v2.iot import ( 

51 Device, 

52 DeviceAttribute, 

53 DeviceCommand, 

54 PayloadProtocol, 

55 ServiceGroup, 

56) 

57from filip.utils.cleanup import clear_context_broker, clear_iot_agent 

58 

59# import simulation model 

60from tutorials.ngsi_v2.simulation_model import SimulationModel 

61 

62# ## Parameters 

63# ToDo: Enter your context broker host and port, e.g. http://localhost:1026. 

64CB_URL = "http://localhost:1026" 

65# ToDo: Enter your IoT-Agent host and port, e.g. http://localhost:4041. 

66IOTA_URL = "http://localhost:4041" 

67# ToDo: Enter the mqtt broker url this script connects to (used for `mqttc.connect`), e.g. mqtt://test.mosquitto.org:1883. 

68MQTT_BROKER_URL_EXPOSED = "mqtt://localhost:1883" 

69# ToDo: Enter the mqtt broker url that is written into the subscription's mqtt notification, e.g. mqtt://mosquitto:1883. 

70MQTT_BROKER_URL_INTERNAL = "mqtt://mosquitto:1883" 

71# ToDo: If required, enter your username and password. 

72MQTT_USER = "" 

73MQTT_PW = "" 

74 

75# ToDo: Change the name of your service to something unique. If you run 

76# on a shared instance this is very important in order to avoid user 

77# collisions. You will use this service through the whole tutorial. 

78# If you forget to change it, an error will be raised! 

79# FIWARE-Service 

80SERVICE = "filip_tutorial" 

81# FIWARE-Service path 

82SERVICE_PATH = "/" 

83 

84# ToDo: Change the APIKEY to something unique. This represents the "token" 

85# for IoT devices to connect (send/receive data) with the platform. In the 

86# context of MQTT, APIKEY is linked with the topic used for communication. 

87APIKEY = "your_apikey" 

88UNIQUE_ID = str(uuid4())  # random id that makes the controller topic below unique per run 

89TOPIC_CONTROLLER = f"fiware_workshop/{UNIQUE_ID}/controller" 

90print(TOPIC_CONTROLLER) 

91# paths to the json-files that store the groups, devices and subscriptions for follow-up exercises 

92WRITE_GROUPS_FILEPATH = Path("../e5_iot_thermal_zone_control_groups.json") 

93WRITE_DEVICES_FILEPATH = Path("../e5_iot_thermal_zone_control_devices.json") 

94WRITE_SUBSCRIPTIONS_FILEPATH = Path("../e5_iot_thermal_zone_control_subscriptions.json") 

95# paths of the json-files to read from the previous exercise 

96READ_GROUPS_FILEPATH = Path("../e4_iot_thermal_zone_sensors_groups.json") 

97READ_DEVICES_FILEPATH = Path("../e4_iot_thermal_zone_sensors_devices.json") 

98 

99# load the stored service groups and devices from the json-files 

100with open(READ_GROUPS_FILEPATH, "r") as groups_file, open( 

101 READ_DEVICES_FILEPATH, "r" 

102) as devices_file: 

103 json_groups = json.load(groups_file) 

104 json_devices = json.load(devices_file) 

105 

106# set parameters for the temperature simulation 

107TEMPERATURE_MAX = 10 # maximal ambient temperature 

108TEMPERATURE_MIN = -5 # minimal ambient temperature 

109TEMPERATURE_ZONE_START = 20 # start value of the zone temperature 

110 

111T_SIM_START = 0 # simulation start time in seconds 

112T_SIM_END = 24 * 60 * 60 # simulation end time in seconds 

113COM_STEP = 60 * 60 * 0.25 # 15 min communication step in seconds 

114 

115# ## Main script 

116if __name__ == "__main__": 

117 # create a fiware header object 

118 fiware_header = FiwareHeader(service=SERVICE, service_path=SERVICE_PATH) 

119 # clear the state of your service and scope 

120 clear_iot_agent(url=IOTA_URL, fiware_header=fiware_header) 

121 clear_context_broker(url=CB_URL, fiware_header=fiware_header) 

122 

123 # instantiate simulation model 

124 sim_model = SimulationModel( 

125 t_start=T_SIM_START, 

126 t_end=T_SIM_END, 

127 temp_max=TEMPERATURE_MAX, 

128 temp_min=TEMPERATURE_MIN, 

129 temp_start=TEMPERATURE_ZONE_START, 

130 ) 

131 

132 # define lists to store historical data 

133 history_weather_station = [] 

134 history_zone_temperature_sensor = [] 

135 history_heater = [] 

136 

137 # restore the service groups and devices from file, create the clients and post the restored configurations to the IoT-Agent 

138 groups = TypeAdapter(List[ServiceGroup]).validate_python(json_groups) 

139 devices = TypeAdapter(List[Device]).validate_python(json_devices) 

140 cbc = ContextBrokerClient(url=CB_URL, fiware_header=fiware_header) 

141 iotac = IoTAClient(url=IOTA_URL, fiware_header=fiware_header) 

142 iotac.post_groups(service_groups=groups) 

143 iotac.post_devices(devices=devices) 

144 

145 # ToDo: Get the device configurations from the server. 

146 weather_station = iotac.get_device(device_id="device:001") 

147 zone_temperature_sensor = ... 

148 

149 # ToDo: Get the service group configurations from the server. 

150 group = iotac.get_group(resource="/iot/json", apikey=...) 

151 

152 # ToDo: Create an additional device holding a command attribute and 

153 # post it to the IoT-Agent. It should be mapped to the `type` heater. 

154 # create the sim_time attribute and add it during device creation (note: the name `t_sim` is rebound later by the simulation loop) 

155 t_sim = DeviceAttribute(name="sim_time", object_id="t_sim", type="Number") 

156 

157 # ToDo: Create the command attribute of name `heater_on` (currently it is 

158 # not possible to add metadata here). 

159 cmd = DeviceCommand(name=..., type=...) 

160 

161 # ToDo: Create the device configuration and send it to the server. 

162 heater = Device(...) 

163 

164 iotac.post_device(device=heater) 

165 

166 # ToDo: Check the entity that corresponds to your device. 

167 heater_entity = cbc.get_entity( 

168 entity_id=heater.entity_name, entity_type=heater.entity_type 

169 ) 

170 print( 

171 f"Your device entity before running the simulation: \n " 

172 f"{heater_entity.model_dump_json(indent=2)}" 

173 ) 

174 

175 # create an MQTTv5 client with paho-mqtt and the known groups and devices. 

176 mqttc = IoTAMQTTClient( 

177 protocol=mqtt.MQTTv5, 

178 devices=[weather_station, zone_temperature_sensor, heater], 

179 service_groups=[group], 

180 ) 

181 # set username and password if required 

182 mqttc.username_pw_set(username=MQTT_USER, password=MQTT_PW) 

183 

184 # ToDo: Implement a callback function that gets triggered when the 

185 # command is sent to the device. The incoming command should update the 

186 # heater attribute of the simulation model. 

187 def on_command(client, obj, msg): 

188 """ 

189 Callback for incoming commands 

190 """ 

191 # decode the message payload using the library's built-in encoders (`apikey` is unpacked but not used below) 

192 apikey, device_id, payload = client.get_encoder( 

193 PayloadProtocol.IOTA_JSON 

194 ).decode_message(msg=msg) 

195 # map the command value to the simulation; the payload is indexed by the command's name 

196 sim_model.heater_on = payload[cmd.name] 

197 

198 # ToDo: Acknowledge the command. In this case commands are usually single 

199 # messages. The first key is equal to the command's name. 

200 client.publish(device_id=device_id, command_name=..., payload=...) 

201 

202 # ToDo: Add the command callback to your MQTTClient. This will get 

203 # triggered for the specified device_id. 

204 mqttc.add_command_callback(device_id=..., callback=...) 

205 

206 # ToDo: Create an MQTT subscription for asynchronous communication that 

207 # gets triggered when the temperature attribute changes. NOTE(review): the dict below sets no `condition` - confirm whether an attribute filter is required. 

208 subscription = { 

209 "description": "Subscription to receive MQTT-Notifications about " 

210 "urn:ngsi-ld:ThermalZone:001", 

211 "subject": { 

212 "entities": [{"id": ..., "type": ...}], 

213 }, 

214 "notification": { 

215 "mqtt": { 

216 "url": MQTT_BROKER_URL_INTERNAL, 

217 "topic": TOPIC_CONTROLLER, 

218 "user": MQTT_USER, 

219 "passwd": MQTT_PW, 

220 } 

221 }, 

222 "throttling": 0, 

223 } 

224 # validate the dict by building a Subscription object and post it to the context broker 

225 subscription = Subscription(**subscription) 

226 subscription_id = cbc.post_subscription(subscription=subscription)  # the returned id is not used further in this script 

227 

228 # ToDo: You need to implement a controller that controls the 

229 # heater state with respect to the zone temperature. This will be 

230 # implemented with asynchronous communication using MQTT-Subscriptions. 

231 def on_measurement(client, obj, msg): 

232 """ 

233 Callback for measurement notifications 

234 """ 

235 message = Message.model_validate_json(msg.payload)  # parse the notification payload 

236 updated_zone_temperature_sensor = message.data[0]  # first item of the notification data 

237 

238 # ToDo: Retrieve the value of the temperature attribute. 

239 temperature = ... 

240 

241 update = True 

242 if temperature <= 19:  # at or below 19: request heater state 1 

243 state = 1 

244 elif temperature >= 21:  # at or above 21: request heater state 0 

245 state = 0 

246 else:  # in between: keep the current state, no command is sent 

247 update = False 

248 

249 # ToDo: Send the command to the heater entity. 

250 if update: 

251 command = NamedCommand(name=cmd.name, value=state) 

252 cbc.post_command(...) 

253 

254 mqttc.message_callback_add(sub=TOPIC_CONTROLLER, callback=on_measurement)  # route messages on the controller topic to `on_measurement` 

255 

256 # connect to the mqtt broker and subscribe to your topic 

257 mqtt_url = urlparse(MQTT_BROKER_URL_EXPOSED) 

258 mqttc.connect( 

259 host=mqtt_url.hostname, 

260 port=mqtt_url.port, 

261 keepalive=60, 

262 bind_address="", 

263 bind_port=0, 

264 clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY, 

265 properties=None, 

266 ) 

267 # subscribe to topics 

268 # subscribe to all incoming command topics for the registered devices 

269 mqttc.subscribe() 

270 mqttc.subscribe(topic=TOPIC_CONTROLLER)  # additionally subscribe to the controller topic 

271 

272 # create a non-blocking thread for mqtt communication 

273 mqttc.loop_start() 

274 

275 # ToDo: Create a loop that publishes a message every 100 milliseconds 

276 # to the broker that holds the simulation time "sim_time" and the 

277 # corresponding temperature "temperature". You may use the `object_id` 

278 # or the attribute name as key in your payload. 

279 for t_sim in range(  # NOTE: rebinds `t_sim`, which held the DeviceAttribute created above 

280 sim_model.t_start, sim_model.t_end + int(COM_STEP), int(COM_STEP) 

281 ): 

282 # publish the simulated ambient temperature 

283 mqttc.publish( 

284 device_id=weather_station.device_id, 

285 payload={"temperature": sim_model.t_amb, "sim_time": sim_model.t_sim}, 

286 ) 

287 

288 # publish the simulated zone temperature 

289 mqttc.publish( 

290 device_id=zone_temperature_sensor.device_id, 

291 payload={"temperature": sim_model.t_zone, "sim_time": sim_model.t_sim}, 

292 ) 

293 

294 # publish the 'sim_time' for the heater device 

295 mqttc.publish(device_id=heater.device_id, payload={"sim_time": sim_model.t_sim}) 

296 

297 time.sleep(0.1)  # first of two 0.1 s pauses per loop iteration 

298 # simulation step for next loop 

299 sim_model.do_step(int(t_sim + COM_STEP)) 

300 # wait another 0.1 seconds before reading back the entities and publishing the next values 

301 time.sleep(0.1) 

302 

303 # get corresponding entities and write values to history 

304 weather_station_entity = cbc.get_entity( 

305 entity_id=weather_station.entity_name, 

306 entity_type=weather_station.entity_type, 

307 ) 

308 # append the data to the local history 

309 history_weather_station.append( 

310 { 

311 "sim_time": weather_station_entity.sim_time.value, 

312 "temperature": weather_station_entity.temperature.value, 

313 } 

314 ) 

315 

316 # get zone temperature sensor and write values to history 

317 zone_temperature_sensor_entity = cbc.get_entity( 

318 entity_id=zone_temperature_sensor.entity_name, 

319 entity_type=zone_temperature_sensor.entity_type, 

320 ) 

321 history_zone_temperature_sensor.append( 

322 { 

323 "sim_time": zone_temperature_sensor_entity.sim_time.value, 

324 "temperature": zone_temperature_sensor_entity.temperature.value, 

325 } 

326 ) 

327 

328 # get heater entity and write values to history 

329 heater_entity = cbc.get_entity( 

330 entity_id=heater.entity_name, entity_type=heater.entity_type 

331 ) 

332 history_heater.append( 

333 { 

334 "sim_time": heater_entity.sim_time.value, 

335 "on_off": heater_entity.heater_on_info.value, 

336 } 

337 ) 

338 

339 # close the mqtt listening thread 

340 mqttc.loop_stop() 

341 # disconnect the mqtt client 

342 mqttc.disconnect() 

343 

344 print(  # print the heater entity as stored in the context broker after the simulation 

345 cbc.get_entity( 

346 entity_id=heater.entity_name, entity_type=heater.entity_type 

347 ).model_dump_json(indent=2) 

348 ) 

349 

350 # plot results 

351 fig, ax = plt.subplots() 

352 t_simulation = [item["sim_time"] / 60 for item in history_weather_station]  # seconds -> minutes 

353 temperature = [item["temperature"] for item in history_weather_station] 

354 ax.plot(t_simulation, temperature) 

355 ax.title.set_text("Weather Station") 

356 ax.set_xlabel("time in min") 

357 ax.set_ylabel("ambient temperature in °C") 

358 plt.show() 

359 

360 fig2, ax2 = plt.subplots() 

361 t_simulation = [item["sim_time"] / 60 for item in history_zone_temperature_sensor] 

362 temperature = [item["temperature"] for item in history_zone_temperature_sensor] 

363 ax2.plot(t_simulation, temperature) 

364 ax2.title.set_text("Zone Temperature Sensor") 

365 ax2.set_xlabel("time in min") 

366 ax2.set_ylabel("zone temperature in °C") 

367 plt.show() 

368 

369 fig3, ax3 = plt.subplots() 

370 t_simulation = [item["sim_time"] / 60 for item in history_heater] 

371 on_off = [item["on_off"] for item in history_heater] 

372 ax3.plot(t_simulation, on_off) 

373 ax3.title.set_text("Heater") 

374 ax3.set_xlabel("time in min") 

375 ax3.set_ylabel("on/off") 

376 plt.show() 

377 

378 # write devices, groups and subscriptions to file and clear server state 

379 assert ( 

380 WRITE_DEVICES_FILEPATH.suffix == ".json" 

381 ), f"Wrong file extension! {WRITE_DEVICES_FILEPATH.suffix}" 

382 WRITE_DEVICES_FILEPATH.touch(exist_ok=True) 

383 with WRITE_DEVICES_FILEPATH.open("w", encoding="utf-8") as f: 

384 devices = [item.model_dump() for item in iotac.get_device_list()] 

385 json.dump(devices, f, ensure_ascii=False, indent=2) 

386 

387 assert ( 

388 WRITE_GROUPS_FILEPATH.suffix == ".json" 

389 ), f"Wrong file extension! {WRITE_GROUPS_FILEPATH.suffix}" 

390 WRITE_GROUPS_FILEPATH.touch(exist_ok=True) 

391 with WRITE_GROUPS_FILEPATH.open("w", encoding="utf-8") as f: 

392 groups = [item.model_dump() for item in iotac.get_group_list()] 

393 json.dump(groups, f, ensure_ascii=False, indent=2) 

394 

395 assert ( 

396 WRITE_SUBSCRIPTIONS_FILEPATH.suffix == ".json" 

397 ), f"Wrong file extension! {WRITE_SUBSCRIPTIONS_FILEPATH.suffix}" 

398 WRITE_SUBSCRIPTIONS_FILEPATH.touch(exist_ok=True) 

399 with WRITE_SUBSCRIPTIONS_FILEPATH.open("w", encoding="utf-8") as f: 

400 subs = [item.model_dump() for item in cbc.get_subscription_list()] 

401 json.dump(subs, f, ensure_ascii=False, indent=2) 

402 

403 clear_iot_agent(url=IOTA_URL, fiware_header=fiware_header) 

404 clear_context_broker(url=CB_URL, fiware_header=fiware_header)