Coverage for tutorials/ngsi_v2/e6_timeseries_data/e6_timeseries_data_solution.py: 0%

44 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-05 11:07 +0000

1""" 

2# # Exercise 6: Time Series Data 

3 

4# We now want to store our data in the historic data storage and visualize it

5 

6# The input sections are marked with 'ToDo' 

7 

8# #### Steps to complete: 

9# 1. Set up the missing parameters in the parameter section 

10# 2. Create subscriptions in the context broker that notify QuantumLeap about

11# the updates on your context entities, and create a QuantumLeap client

12# 3. Run the simulation 

13# 4. Retrieve the data via QuantumLeap and visualize it 

14""" 

15 

16# ## Import packages 

17import json 

18from pathlib import Path 

19import time 

20from typing import List 

21from urllib.parse import urlparse 

22from uuid import uuid4 

23import pandas as pd 

24import paho.mqtt.client as mqtt 

25from pydantic import TypeAdapter 

26import matplotlib.pyplot as plt 

27 

28# import from filip 

29from filip.clients.ngsi_v2 import ContextBrokerClient, IoTAClient, QuantumLeapClient 

30from filip.clients.mqtt import IoTAMQTTClient 

31from filip.models.base import FiwareHeader 

32from filip.models.ngsi_v2.context import NamedCommand 

33from filip.models.ngsi_v2.subscriptions import ( 

34 Subscription, 

35 Message, 

36 Notification, 

37 Subject, 

38) 

39from filip.models.ngsi_v2.iot import Device, PayloadProtocol, ServiceGroup 

40from filip.utils.cleanup import clear_context_broker, clear_iot_agent, clear_quantumleap 

41 

42# import simulation model 

43from tutorials.ngsi_v2.simulation_model import SimulationModel 

44 

45# ## Parameters 

# ToDo: Enter your context broker host and port, e.g. http://localhost:1026.
CB_URL = "http://localhost:1026"
# ToDo: Enter your IoT-Agent host and port, e.g. http://localhost:4041.
IOTA_URL = "http://localhost:4041"
# ToDo: Enter your QuantumLeap host and port, e.g. http://localhost:8668.
QL_URL = "http://localhost:8668"
# ToDo: Enter your mqtt broker url, e.g. mqtt://test.mosquitto.org:1883.
# This is the url the script itself connects to.
MQTT_BROKER_URL_EXPOSED = "mqtt://localhost:1883"
# ToDo: Enter your mqtt broker url, e.g. mqtt://mosquitto:1883.
# NOTE(review): presumably the broker address as seen from inside the
# platform network; it is not referenced in the visible part of this script.
MQTT_BROKER_URL_INTERNAL = "mqtt://mosquitto:1883"
# ToDo: If required, enter your username and password.
MQTT_USER = ""
MQTT_PW = ""

# ToDo: Change the name of your service to something unique. If you run
# on a shared instance this is very important in order to avoid user
# collisions. You will use this service through the whole tutorial.
# If you forget to change it, an error will be raised!
# FIWARE-Service
SERVICE = "filip_tutorial"
# FIWARE-Service path
SERVICE_PATH = "/"

# ToDo: Change the APIKEY to something unique. This represents the "token"
# for IoT devices to connect (send/receive data) with the platform. In the
# context of MQTT, APIKEY is linked with the topic used for communication.
APIKEY = "your_apikey"
# A fresh random id per run keeps the controller topic unique.
UNIQUE_ID = str(uuid4())
TOPIC_CONTROLLER = f"fiware_workshop/{UNIQUE_ID}/controller"
print(TOPIC_CONTROLLER)

76 

# Locations of the json-files written by the previous exercise
READ_GROUPS_FILEPATH = Path("../e5_iot_thermal_zone_control_solution_groups.json")
READ_DEVICES_FILEPATH = Path("../e5_iot_thermal_zone_control_solution_devices.json")
READ_SUBSCRIPTIONS_FILEPATH = Path(
    "../e5_iot_thermal_zone_control_solution_subscriptions.json"
)

# Open all three files first, then parse them in the order groups,
# devices, subscriptions.
with open(READ_GROUPS_FILEPATH, "r") as groups_file:
    with open(READ_DEVICES_FILEPATH, "r") as devices_file:
        with open(READ_SUBSCRIPTIONS_FILEPATH, "r") as subscriptions_file:
            json_groups = json.load(groups_file)
            json_devices = json.load(devices_file)
            json_subscriptions = json.load(subscriptions_file)

91 

# Parameters for the temperature simulation
# maximal ambient temperature
TEMPERATURE_MAX = 10
# minimal ambient temperature
TEMPERATURE_MIN = -5
# start value of the zone temperature
TEMPERATURE_ZONE_START = 20

# simulation start time in seconds
T_SIM_START = 0
# simulation end time in seconds (one day)
T_SIM_END = 24 * 60 * 60
# 15 min communication step in seconds (a float because of the 0.25 factor)
COM_STEP = 60 * 60 * 0.25

100 

101# ## Main script 

if __name__ == "__main__":
    # The fiware header holds the service and service path and is handed
    # to every client and cleanup call below.
    fiware_header = FiwareHeader(service=SERVICE, service_path=SERVICE_PATH)

    # Clear the state of your service and scope before starting.
    clear_iot_agent(url=IOTA_URL, fiware_header=fiware_header)
    clear_context_broker(url=CB_URL, fiware_header=fiware_header)
    clear_quantumleap(url=QL_URL, fiware_header=fiware_header)

    # Instantiate the simulation model from the module-level parameters.
    sim_model = SimulationModel(
        t_start=T_SIM_START,
        t_end=T_SIM_END,
        temp_max=TEMPERATURE_MAX,
        temp_min=TEMPERATURE_MIN,
        temp_start=TEMPERATURE_ZONE_START,
    )

    # Restore groups, devices and the first subscription from the loaded
    # json data.
    groups = TypeAdapter(List[ServiceGroup]).validate_python(json_groups)
    devices = TypeAdapter(List[Device]).validate_python(json_devices)
    sub = TypeAdapter(List[Subscription]).validate_python(json_subscriptions)[0]

    # Point the restored mqtt notification to this run's controller topic
    # and credentials.
    mqtt_notification = sub.notification.mqtt
    mqtt_notification.topic = TOPIC_CONTROLLER
    mqtt_notification.user = MQTT_USER
    mqtt_notification.passwd = MQTT_PW

    # Create the clients and post the restored configuration.
    cbc = ContextBrokerClient(url=CB_URL, fiware_header=fiware_header)
    cbc.post_subscription(subscription=sub)
    iotac = IoTAClient(url=IOTA_URL, fiware_header=fiware_header)
    iotac.post_groups(service_groups=groups)
    iotac.post_devices(devices=devices)

    # Read the group and device configurations back from the server.
    group = iotac.get_group(resource="/iot/json", apikey=APIKEY)
    weather_station = iotac.get_device(device_id="device:001")
    zone_temperature_sensor = iotac.get_device(device_id="device:002")
    heater = iotac.get_device(device_id="device:003")

137 

138 # create a MQTTv5 client with paho-mqtt and the known groups and 

139 # devices. 

140 mqttc = IoTAMQTTClient( 

141 protocol=mqtt.MQTTv5, 

142 devices=[weather_station, zone_temperature_sensor, heater], 

143 service_groups=[group], 

144 ) 

145 # ToDo: Set user data if required. 

146 mqttc.username_pw_set(username=MQTT_USER, password=MQTT_PW) 

147 # Implement a callback function that gets triggered when the 

148 # command is sent to the device. The incoming command should update the 

149 # heater attribute of the simulation model 

150 

151 def on_command(client, obj, msg): 

152 """ 

153 Callback for incoming commands 

154 """ 

155 # Decode the message payload using the libraries builtin encoders 

156 apikey, device_id, payload = client.get_encoder( 

157 PayloadProtocol.IOTA_JSON 

158 ).decode_message(msg=msg) 

159 

160 sim_model.heater_on = payload[heater.commands[0].name] 

161 

162 # Acknowledge the command. Here commands are usually single 

163 # messages. The first key is equal to the commands name. 

164 client.publish( 

165 device_id=device_id, command_name=next(iter(payload)), payload=payload 

166 ) 

167 

168 # Add the command callback to your MQTTClient. This will get 

169 # triggered for the specified device_id. 

170 mqttc.add_command_callback(device_id=heater.device_id, callback=on_command) 

171 

172 # You need to implement a controller that controls the 

173 # heater state with respect to the zone temperature. This will be 

174 # implemented with asynchronous communication using MQTT-Subscriptions 

175 def on_measurement(client, obj, msg): 

176 """ 

177 Callback for measurement notifications 

178 """ 

179 message = Message.model_validate_json(msg.payload) 

180 updated_zone_temperature_sensor = message.data[0] 

181 

182 # retrieve the value of temperature attribute 

183 temperature = updated_zone_temperature_sensor.temperature.value 

184 

185 update = True 

186 if temperature <= 19: 

187 state = 1 

188 elif temperature >= 21: 

189 state = 0 

190 else: 

191 update = False 

192 # send the command to the heater entity 

193 if update: 

194 command = NamedCommand(name=heater.commands[0].name, value=state) 

195 cbc.post_command( 

196 entity_id=heater.entity_name, 

197 entity_type=heater.entity_type, 

198 command=command, 

199 ) 

200 

201 mqttc.message_callback_add(sub=TOPIC_CONTROLLER, callback=on_measurement) 

202 

203 # ToDo: Create http subscriptions that get triggered by updates of your 

204 # device attributes. Note that you can only post the subscription 

205 # to the context broker. 

206 

207 cbc.post_subscription( 

208 subscription=Subscription( 

209 subject=Subject( 

210 **{ 

211 "entities": [ 

212 { 

213 "id": weather_station.entity_name, 

214 "type": weather_station.entity_type, 

215 } 

216 ] 

217 } 

218 ), 

219 notification=Notification( 

220 **{"http": {"url": "http://quantumleap:8668/v2/notify"}} 

221 ), 

222 throttling=0, 

223 ) 

224 ) 

225 

226 cbc.post_subscription( 

227 subscription=Subscription( 

228 subject=Subject( 

229 **{ 

230 "entities": [ 

231 { 

232 "id": zone_temperature_sensor.entity_name, 

233 "type": zone_temperature_sensor.entity_type, 

234 } 

235 ] 

236 } 

237 ), 

238 notification=Notification( 

239 **{"http": {"url": "http://quantumleap:8668/v2/notify"}} 

240 ), 

241 throttling=0, 

242 ) 

243 ) 

244 

245 cbc.post_subscription( 

246 subscription=Subscription( 

247 subject=Subject( 

248 **{"entities": [{"id": heater.entity_name, "type": heater.entity_type}]} 

249 ), 

250 notification=Notification( 

251 **{"http": {"url": "http://quantumleap:8668/v2/notify"}} 

252 ), 

253 throttling=0, 

254 ) 

255 ) 

256 

257 # connect to the mqtt broker and subscribe to your topic 

258 mqtt_url = urlparse(MQTT_BROKER_URL_EXPOSED) 

259 mqttc.connect( 

260 host=mqtt_url.hostname, 

261 port=mqtt_url.port, 

262 keepalive=60, 

263 bind_address="", 

264 bind_port=0, 

265 clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY, 

266 properties=None, 

267 ) 

268 

269 # subscribe to topics 

270 # subscribe to all incoming command topics for the registered devices 

271 mqttc.subscribe() 

272 mqttc.subscribe(topic=TOPIC_CONTROLLER) 

273 

274 # create a non-blocking thread for mqtt communication 

275 mqttc.loop_start() 

276 

277 # Create a loop that publishes a message every 0.3 seconds to the broker 

278 # that holds the simulation time "sim_time" and the corresponding 

279 # temperature "temperature". You may use the `object_id` 

280 # or the attribute name as key in your payload. 

281 for t_sim in range( 

282 sim_model.t_start, sim_model.t_end + int(COM_STEP), int(COM_STEP) 

283 ): 

284 # publish the simulated ambient temperature 

285 mqttc.publish( 

286 device_id=weather_station.device_id, 

287 payload={"temperature": sim_model.t_amb, "sim_time": sim_model.t_sim}, 

288 ) 

289 

290 # publish the simulated zone temperature 

291 mqttc.publish( 

292 device_id=zone_temperature_sensor.device_id, 

293 payload={"temperature": sim_model.t_zone, "sim_time": sim_model.t_sim}, 

294 ) 

295 

296 # publish the 'sim_time' for the heater device 

297 mqttc.publish(device_id=heater.device_id, payload={"sim_time": sim_model.t_sim}) 

298 

299 time.sleep(0.3) 

300 # simulation step for next loop 

301 sim_model.do_step(int(t_sim + COM_STEP)) 

302 # wait for 0.3 seconds before publishing the next values 

303 time.sleep(0.3) 

304 

305 # close the mqtt listening thread 

306 mqttc.loop_stop() 

307 # disconnect the mqtt device 

308 mqttc.disconnect() 

309 

310 # wait until all data is available 

311 time.sleep(10) 

312 

313 # ToDo: Create a quantumleap client. 

314 qlc = QuantumLeapClient(url=QL_URL, fiware_header=fiware_header) 

315 

316 # ToDo: Retrieve the historic data from QuantumLeap, convert them to a 

317 # pandas dataframe and plot them. 

318 # retrieve the data for the weather station 

319 history_weather_station = qlc.get_entity_by_id( 

320 entity_id=weather_station.entity_name, 

321 entity_type=weather_station.entity_type, 

322 last_n=10000, 

323 ) 

324 

325 # convert to pandas dataframe and print it 

326 history_weather_station = history_weather_station.to_pandas() 

327 print(history_weather_station) 

328 # drop unnecessary index levels 

329 history_weather_station = history_weather_station.droplevel( 

330 level=("entityId", "entityType"), axis=1 

331 ) 

332 history_weather_station["sim_time"] = pd.to_numeric( 

333 history_weather_station["sim_time"], downcast="float" 

334 ) 

335 history_weather_station["temperature"] = pd.to_numeric( 

336 history_weather_station["temperature"], downcast="float" 

337 ) 

338 # ToDo: Plot the results. 

339 fig, ax = plt.subplots() 

340 ax.plot( 

341 history_weather_station["sim_time"] / 60, history_weather_station["temperature"] 

342 ) 

343 ax.title.set_text("Weather Station") 

344 ax.set_xlabel("time in min") 

345 ax.set_ylabel("ambient temperature in °C") 

346 plt.show() 

347 

348 # ToDo: Retrieve the data for the zone temperature. 

349 history_zone_temperature_sensor = qlc.get_entity_by_id( 

350 entity_id=zone_temperature_sensor.entity_name, 

351 entity_type=zone_temperature_sensor.entity_type, 

352 last_n=10000, 

353 ) 

354 

355 # ToDo: Convert to pandas dataframe and print it. 

356 history_zone_temperature_sensor = history_zone_temperature_sensor.to_pandas() 

357 print(history_zone_temperature_sensor) 

358 # ToDo: Drop unnecessary index levels. 

359 history_zone_temperature_sensor = history_zone_temperature_sensor.droplevel( 

360 level=("entityId", "entityType"), axis=1 

361 ) 

362 history_zone_temperature_sensor["sim_time"] = pd.to_numeric( 

363 history_zone_temperature_sensor["sim_time"], downcast="float" 

364 ) 

365 history_zone_temperature_sensor["temperature"] = pd.to_numeric( 

366 history_zone_temperature_sensor["temperature"], downcast="float" 

367 ) 

368 # ToDo: Plot the results. 

369 fig2, ax2 = plt.subplots() 

370 ax2.plot( 

371 history_zone_temperature_sensor["sim_time"] / 60, 

372 history_zone_temperature_sensor["temperature"], 

373 ) 

374 ax2.title.set_text("Zone Temperature Sensor") 

375 ax2.set_xlabel("time in min") 

376 ax2.set_ylabel("zone temperature in °C") 

377 plt.show() 

378 

379 # ToDo: Retrieve the data for the heater. 

380 history_heater = qlc.get_entity_by_id( 

381 entity_id=heater.entity_name, entity_type=heater.entity_type, last_n=10000 

382 ) 

383 

384 # convert to pandas dataframe and print it 

385 history_heater = history_heater.to_pandas() 

386 history_heater = history_heater.replace(" ", 0) 

387 print(history_heater) 

388 

389 # ToDo: Drop unnecessary index levels. 

390 history_heater = history_heater.droplevel(level=("entityId", "entityType"), axis=1) 

391 history_heater["sim_time"] = pd.to_numeric( 

392 history_heater["sim_time"], downcast="float" 

393 ) 

394 history_heater["heater_on_info"] = pd.to_numeric( 

395 history_heater["heater_on_info"], downcast="float" 

396 ) 

397 # ToDo: Plot the results. 

398 fig3, ax3 = plt.subplots() 

399 ax3.plot(history_heater["sim_time"] / 60, history_heater["heater_on_info"]) 

400 ax3.title.set_text("Heater") 

401 ax3.set_xlabel("time in min") 

402 ax3.set_ylabel("set point") 

403 plt.show() 

404 

405 clear_iot_agent(url=IOTA_URL, fiware_header=fiware_header) 

406 clear_context_broker(url=CB_URL, fiware_header=fiware_header) 

407 clear_quantumleap(url=QL_URL, fiware_header=fiware_header)