Coverage for tutorials/ngsi_v2/e6_timeseries_data/e6_timeseries_data.py: 0%

44 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-05 11:07 +0000

1""" 

2# # Exercise 6: Time Series Data 

3 

4# We now want store our data in the historic data storage and visualize it 

5 

6# The input sections are marked with 'ToDo' 

7 

8# #### Steps to complete: 

9# 1. Set up the missing parameters in the parameter section 

10# 2. Create a quantumleap client that creates subscription that gets triggered 

11# by the updates on your context entities 

12# 3. Run the simulation 

13# 4. Retrieve the data via QuantumLeap and visualize it 

14""" 

15 

16# ## Import packages 

17import json 

18from pathlib import Path 

19import time 

20from typing import List 

21from urllib.parse import urlparse 

22from uuid import uuid4 

23import pandas as pd 

24import paho.mqtt.client as mqtt 

25from pydantic import TypeAdapter 

26import matplotlib.pyplot as plt 

27 

28# import from filip 

29from filip.clients.ngsi_v2 import ContextBrokerClient, IoTAClient, QuantumLeapClient 

30from filip.clients.mqtt import IoTAMQTTClient 

31from filip.models.base import FiwareHeader 

32from filip.models.ngsi_v2.context import NamedCommand 

33from filip.models.ngsi_v2.subscriptions import ( 

34 Subscription, 

35 Message, 

36 Subject, 

37 Notification, 

38) 

39from filip.models.ngsi_v2.iot import Device, PayloadProtocol, ServiceGroup 

40from filip.utils.cleanup import clear_context_broker, clear_iot_agent, clear_quantumleap 

41 

42# import simulation model 

43from tutorials.ngsi_v2.simulation_model import SimulationModel 

44 

45# ## Parameters 

46# ToDo: Enter your context broker host and port, e.g. http://localhost:1026. 

47CB_URL = "http://localhost:1026" 

48# ToDo: Enter your IoT-Agent host and port, e.g. http://localhost:4041. 

49IOTA_URL = "http://localhost:4041" 

50# ToDo: Enter your IoT-Agent host and port, e.g. http://localhost:4041. 

51QL_URL = "http://localhost:8668" 

52# ToDo: Enter your mqtt broker url, e.g. mqtt://test.mosquitto.org:1883. 

53MQTT_BROKER_URL_EXPOSED = "mqtt://localhost:1883" 

54# ToDo: Enter your mqtt broker url, e.g. mqtt://mosquitto:1883. 

55MQTT_BROKER_URL_INTERNAL = "mqtt://mosquitto:1883" 

56# ToDo: If required, enter your username and password. 

57MQTT_USER = "" 

58MQTT_PW = "" 

59 

60# ToDo: Change the name of your service to something unique. If you run 

61# on a shared instance this is very important in order to avoid user 

62# collisions. You will use this service through the whole tutorial. 

63# If you forget to change it, an error will be raised! 

64# FIWARE-Service 

65SERVICE = "filip_tutorial" 

66# FIWARE-Service path 

67SERVICE_PATH = "/" 

68 

69# ToDo: Change the APIKEY to something unique. This represents the "token" 

70# for IoT devices to connect (send/receive data) with the platform. In the 

71# context of MQTT, APIKEY is linked with the topic used for communication. 

72APIKEY = "your_apikey" 

73UNIQUE_ID = str(uuid4()) 

74TOPIC_CONTROLLER = f"fiware_workshop/{UNIQUE_ID}/controller" 

75print(TOPIC_CONTROLLER) 

76 

77# path to read json-files from previous exercises 

78READ_GROUPS_FILEPATH = Path("../e5_iot_thermal_zone_control_groups.json") 

79READ_DEVICES_FILEPATH = Path("../e5_iot_thermal_zone_control_devices.json") 

80READ_SUBSCRIPTIONS_FILEPATH = Path("../e5_iot_thermal_zone_control_subscriptions.json") 

81 

82# opening the files 

83with open(READ_GROUPS_FILEPATH, "r") as groups_file, open( 

84 READ_DEVICES_FILEPATH, "r" 

85) as devices_file, open(READ_SUBSCRIPTIONS_FILEPATH, "r") as subscriptions_file: 

86 json_groups = json.load(groups_file) 

87 json_devices = json.load(devices_file) 

88 json_subscriptions = json.load(subscriptions_file) 

89 

90# set parameters for the temperature simulation 

91TEMPERATURE_MAX = 10 # maximal ambient temperature 

92TEMPERATURE_MIN = -5 # minimal ambient temperature 

93TEMPERATURE_ZONE_START = 20 # start value of the zone temperature 

94 

95T_SIM_START = 0 # simulation start time in seconds 

96T_SIM_END = 24 * 60 * 60 # simulation end time in seconds 

97COM_STEP = 60 * 60 * 0.25 # 15 min communication step in seconds 

98 

99# ## Main script 

100if __name__ == "__main__": 

101 # create a fiware header object 

102 fiware_header = FiwareHeader(service=SERVICE, service_path=SERVICE_PATH) 

103 # clear the state of your service and scope 

104 clear_iot_agent(url=IOTA_URL, fiware_header=fiware_header) 

105 clear_context_broker(url=CB_URL, fiware_header=fiware_header) 

106 clear_quantumleap(url=QL_URL, fiware_header=fiware_header) 

107 

108 # instantiate simulation model 

109 sim_model = SimulationModel( 

110 t_start=T_SIM_START, 

111 t_end=T_SIM_END, 

112 temp_max=TEMPERATURE_MAX, 

113 temp_min=TEMPERATURE_MIN, 

114 temp_start=TEMPERATURE_ZONE_START, 

115 ) 

116 

117 # create clients and restore devices and groups from file 

118 groups = TypeAdapter(List[ServiceGroup]).validate_python(json_groups) 

119 devices = TypeAdapter(List[Device]).validate_python(json_devices) 

120 sub = TypeAdapter(List[Subscription]).validate_python(json_subscriptions)[0] 

121 sub.notification.mqtt.topic = TOPIC_CONTROLLER 

122 sub.notification.mqtt.user = MQTT_USER 

123 sub.notification.mqtt.passwd = MQTT_PW 

124 cbc = ContextBrokerClient(url=CB_URL, fiware_header=fiware_header) 

125 cbc.post_subscription(subscription=sub) 

126 iotac = IoTAClient(url=IOTA_URL, fiware_header=fiware_header) 

127 iotac.post_groups(service_groups=groups) 

128 iotac.post_devices(devices=devices) 

129 

130 # get the group and device configurations from the server 

131 group = iotac.get_group(resource="/iot/json", apikey=APIKEY) 

132 weather_station = iotac.get_device(device_id="device:001") 

133 zone_temperature_sensor = iotac.get_device(device_id="device:002") 

134 heater = iotac.get_device(device_id="device:003") 

135 

136 # create a MQTTv5 client with paho-mqtt and the known groups and 

137 # devices. 

138 mqttc = IoTAMQTTClient( 

139 protocol=mqtt.MQTTv5, 

140 devices=[weather_station, zone_temperature_sensor, heater], 

141 service_groups=[group], 

142 ) 

143 # ToDo: Set user data if required. 

144 mqttc.username_pw_set(username=MQTT_USER, password=MQTT_PW) 

145 # Implement a callback function that gets triggered when the 

146 # command is sent to the device. The incoming command should update the 

147 # heater attribute of the simulation model 

148 

149 def on_command(client, obj, msg): 

150 """ 

151 Callback for incoming commands 

152 """ 

153 # Decode the message payload using the libraries builtin encoders 

154 apikey, device_id, payload = client.get_encoder( 

155 PayloadProtocol.IOTA_JSON 

156 ).decode_message(msg=msg) 

157 

158 sim_model.heater_on = payload[heater.commands[0].name] 

159 

160 # Acknowledge the command. Here commands are usually single 

161 # messages. The first key is equal to the commands name. 

162 client.publish( 

163 device_id=device_id, command_name=next(iter(payload)), payload=payload 

164 ) 

165 

166 # Add the command callback to your MQTTClient. This will get 

167 # triggered for the specified device_id. 

168 mqttc.add_command_callback(device_id=heater.device_id, callback=on_command) 

169 

170 # You need to implement a controller that controls the 

171 # heater state with respect to the zone temperature. This will be 

172 # implemented with asynchronous communication using MQTT-Subscriptions 

173 def on_measurement(client, obj, msg): 

174 """ 

175 Callback for measurement notifications 

176 """ 

177 message = Message.model_validate_json(msg.payload) 

178 updated_zone_temperature_sensor = message.data[0] 

179 

180 # retrieve the value of temperature attribute 

181 temperature = updated_zone_temperature_sensor.temperature.value 

182 

183 update = True 

184 if temperature <= 19: 

185 state = 1 

186 elif temperature >= 21: 

187 state = 0 

188 else: 

189 update = False 

190 # send the command to the heater entity 

191 if update: 

192 command = NamedCommand(name=heater.commands[0].name, value=state) 

193 cbc.post_command( 

194 entity_id=heater.entity_name, 

195 entity_type=heater.entity_type, 

196 command=command, 

197 ) 

198 

199 mqttc.message_callback_add(sub=TOPIC_CONTROLLER, callback=on_measurement) 

200 

201 # ToDo: Create http subscriptions that get triggered by updates of your 

202 # device attributes. Note that you can only post the subscription 

203 # to the context broker. 

204 # Subscription for weather station 

205 cbc.post_subscription( 

206 subscription=Subscription( 

207 subject=Subject( 

208 **{ 

209 "entities": [ 

210 { 

211 "id": weather_station.entity_name, 

212 "type": weather_station.entity_type, 

213 } 

214 ] 

215 } 

216 ), 

217 notification=Notification( 

218 **{"http": {"url": "http://quantumleap:8668/v2/notify"}} 

219 ), 

220 throttling=0, 

221 ) 

222 ) 

223 

224 # Subscription for zone temperature sensor 

225 cbc.post_subscription(subscription=Subscription(...)) 

226 

227 # Subscription for heater 

228 cbc.post_subscription(subscription=Subscription(...)) 

229 

230 # connect to the mqtt broker and subscribe to your topic 

231 mqtt_url = urlparse(MQTT_BROKER_URL_EXPOSED) 

232 mqttc.connect( 

233 host=mqtt_url.hostname, 

234 port=mqtt_url.port, 

235 keepalive=60, 

236 bind_address="", 

237 bind_port=0, 

238 clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY, 

239 properties=None, 

240 ) 

241 

242 # subscribe to topics 

243 # subscribe to all incoming command topics for the registered devices 

244 mqttc.subscribe() 

245 mqttc.subscribe(topic=TOPIC_CONTROLLER) 

246 

247 # create a non-blocking thread for mqtt communication 

248 mqttc.loop_start() 

249 

250 # Create a loop that publishes a message every 0.3 seconds to the broker 

251 # that holds the simulation time "sim_time" and the corresponding 

252 # temperature "temperature". You may use the `object_id` 

253 # or the attribute name as key in your payload. 

254 for t_sim in range( 

255 sim_model.t_start, sim_model.t_end + int(COM_STEP), int(COM_STEP) 

256 ): 

257 # publish the simulated ambient temperature 

258 mqttc.publish( 

259 device_id=weather_station.device_id, 

260 payload={"temperature": sim_model.t_amb, "sim_time": sim_model.t_sim}, 

261 ) 

262 

263 # publish the simulated zone temperature 

264 mqttc.publish( 

265 device_id=zone_temperature_sensor.device_id, 

266 payload={"temperature": sim_model.t_zone, "sim_time": sim_model.t_sim}, 

267 ) 

268 

269 # publish the 'sim_time' for the heater device 

270 mqttc.publish(device_id=heater.device_id, payload={"sim_time": sim_model.t_sim}) 

271 

272 time.sleep(0.3) 

273 # simulation step for next loop 

274 sim_model.do_step(int(t_sim + COM_STEP)) 

275 # wait for 0.3 seconds before publishing the next values 

276 time.sleep(0.3) 

277 

278 # close the mqtt listening thread 

279 mqttc.loop_stop() 

280 # disconnect the mqtt device 

281 mqttc.disconnect() 

282 

283 # wait until all data is available 

284 time.sleep(10) 

285 

286 # Todo: Create a quantumleap client. 

287 qlc = QuantumLeapClient(...) 

288 

289 # ToDo: Retrieve the historic data from QuantumLeap, convert them to a 

290 # pandas dataframe and plot them. 

291 # retrieve the data for the weather station 

292 history_weather_station = qlc.get_entity_by_id( 

293 entity_id=weather_station.entity_name, 

294 entity_type=weather_station.entity_type, 

295 last_n=10000, 

296 ) 

297 

298 # convert to pandas dataframe and print it 

299 history_weather_station = history_weather_station.to_pandas() 

300 print(history_weather_station) 

301 # drop unnecessary index levels 

302 history_weather_station = history_weather_station.droplevel( 

303 level=("entityId", "entityType"), axis=1 

304 ) 

305 history_weather_station["sim_time"] = pd.to_numeric( 

306 history_weather_station["sim_time"], downcast="float" 

307 ) 

308 history_weather_station["temperature"] = pd.to_numeric( 

309 history_weather_station["temperature"], downcast="float" 

310 ) 

311 # ToDo: Plot the results. 

312 fig, ax = plt.subplots() 

313 ax.plot( 

314 history_weather_station["sim_time"] / 60, history_weather_station["temperature"] 

315 ) 

316 ax.title.set_text("Weather Station") 

317 ax.set_xlabel("time in min") 

318 ax.set_ylabel("ambient temperature in °C") 

319 plt.show() 

320 

321 # ToDo: Retrieve the data for the zone temperature. 

322 history_zone_temperature_sensor = ... 

323 

324 # ToDo: Convert to pandas dataframe and print it. 

325 history_zone_temperature_sensor = ... 

326 # ToDo: Drop unnecessary index levels. 

327 history_zone_temperature_sensor = ... 

328 history_zone_temperature_sensor["sim_time"] = pd.to_numeric( 

329 history_zone_temperature_sensor["sim_time"], downcast="float" 

330 ) 

331 history_zone_temperature_sensor["temperature"] = pd.to_numeric( 

332 history_zone_temperature_sensor["temperature"], downcast="float" 

333 ) 

334 # ToDo: Plot the results. 

335 fig2, ax2 = plt.subplots() 

336 ax2.plot( 

337 history_zone_temperature_sensor["sim_time"] / 60, 

338 history_zone_temperature_sensor["temperature"], 

339 ) 

340 ax2.title.set_text("Zone Temperature Sensor") 

341 ax2.set_xlabel("time in min") 

342 ax2.set_ylabel("zone temperature in °C") 

343 plt.show() 

344 

345 # ToDo: Retrieve the data for the heater. 

346 history_heater = ... 

347 

348 # convert to pandas dataframe and print it 

349 history_heater = history_heater.to_pandas() 

350 history_heater = history_heater.replace(" ", 0) 

351 print(history_heater) 

352 

353 # ToDo: Drop unnecessary index levels. 

354 history_heater = history_heater.droplevel(level=("entityId", "entityType"), axis=1) 

355 history_heater["sim_time"] = pd.to_numeric( 

356 history_heater["sim_time"], downcast="float" 

357 ) 

358 history_heater["heater_on_info"] = pd.to_numeric( 

359 history_heater["heater_on_info"], downcast="float" 

360 ) 

361 # ToDo: Plot the results. 

362 fig3, ax3 = plt.subplots() 

363 ax3.plot(history_heater["sim_time"] / 60, history_heater["heater_on_info"]) 

364 ax3.title.set_text("Heater") 

365 ax3.set_xlabel("time in min") 

366 ax3.set_ylabel("set point") 

367 plt.show() 

368 

369 clear_iot_agent(url=IOTA_URL, fiware_header=fiware_header) 

370 clear_context_broker(url=CB_URL, fiware_header=fiware_header) 

371 clear_quantumleap(url=QL_URL, fiware_header=fiware_header)