Coverage for filip/utils/cleanup.py: 93%

86 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-02-19 11:48 +0000

1""" 

2Functions to clean up a tenant within a fiware based platform. 

3""" 

4 

5import warnings 

6from functools import wraps 

7 

8from pydantic import AnyHttpUrl, AnyUrl 

9from requests import RequestException 

10from typing import Callable, List, Union 

11from filip.models import FiwareHeader, FiwareLDHeader 

12from filip.clients.ngsi_v2 import ContextBrokerClient, IoTAClient, QuantumLeapClient 

13from filip.clients.ngsi_ld.cb import ContextBrokerLDClient 

14from filip.models.ngsi_ld.context import ActionTypeLD 

15import logging 

16 

# Module-wide logger named after this module; its level is pinned to DEBUG
# so that every record emitted by the clean-up helpers passes the logger.
logger = logging.getLogger(name=__name__)
logger.setLevel(level=logging.DEBUG)

19 

20 

def clear_context_broker_ld(
    url: str = None,
    fiware_ld_header: FiwareLDHeader = None,
    cb_ld_client: ContextBrokerLDClient = None,
):
    """
    Delete all entities and subscriptions of a tenant in an NGSI-LD
    context broker.

    Either ``url`` or ``cb_ld_client`` must be given. If a client object is
    passed it is used as-is; otherwise a new ``ContextBrokerLDClient`` is
    created from ``url`` and ``fiware_ld_header``.

    Args:
        url: Url of the context broker LD
        fiware_ld_header: header of the NGSI-LD tenant
        cb_ld_client: NGSI-LD context broker client object

    Returns:
        None

    Raises:
        AssertionError: if neither ``url`` nor ``cb_ld_client`` is given
        RequestException: re-raised (after a warning is logged) when
            deleting entities or subscriptions fails
    """
    assert url or cb_ld_client, "Either url or client object must be given"
    # create client
    if cb_ld_client is None:
        client = ContextBrokerLDClient(url=url, fiware_header=fiware_ld_header)
    else:
        client = cb_ld_client

    # clean entities iteratively: fetch pages of up to 100 entities and
    # batch-delete each page until the broker reports nothing left
    try:
        while True:
            entity_list = client.get_entity_list(limit=100)
            if not entity_list:
                break
            client.entity_batch_operation(
                action_type=ActionTypeLD.DELETE, entities=entity_list
            )
    except RequestException:
        logger.warning("Could not clean entities completely")
        raise

    # clean subscriptions
    # NOTE: this must go through `client`, not `cb_ld_client`; the latter is
    # None whenever the caller only supplied `url`.
    try:
        for sub in client.get_subscription_list():
            client.delete_subscription(sub.id)
    except RequestException:
        logger.warning("Could not clean subscriptions completely")
        raise

64 

65 

def clear_context_broker(
    url: str = None,
    fiware_header: FiwareHeader = None,
    clear_registrations: bool = False,
    cb_client: ContextBrokerClient = None,
):
    """
    Remove all entities and subscriptions (and, on request, all
    registrations) that belong to the given fiware header.

    For a TLS connection pass a ready-made ``cb_client`` that was created
    with a Session object holding the certificate and private key.

    Note:
        Always clear the devices first because the IoT-Agent will otherwise
        throw errors if it cannot find its registration anymore.

    Args:
        url: Url of the context broker service
        fiware_header: header of the tenant
        clear_registrations: Determines whether registrations should be
            deleted. If registrations are deleted while devices with
            commands still exist, these devices become unreachable.
            Only set to true once such devices are cleared.
        cb_client: enables TLS communication if created with Session object,
            only needed for self-signed certificates

    Returns:
        None
    """
    assert url or cb_client, "Either url or client object must be given"

    # Prefer the caller's client; build a fresh one only when none was given
    broker = (
        cb_client
        if cb_client is not None
        else ContextBrokerClient(url=url, fiware_header=fiware_header)
    )

    # Registrations are removed only when explicitly requested
    if clear_registrations:
        for registration in broker.get_registration_list():
            broker.delete_registration(registration_id=registration.id)
        assert len(broker.get_registration_list()) == 0

    # Entities: delete everything the broker currently lists
    broker.delete_entities(entities=broker.get_entity_list())

    # Subscriptions: delete one by one, then verify none is left
    for subscription in broker.get_subscription_list():
        broker.delete_subscription(subscription_id=subscription.id)
    assert len(broker.get_subscription_list()) == 0

113 

114 

def clear_iot_agent(
    url: Union[str, AnyHttpUrl] = None,
    fiware_header: FiwareHeader = None,
    iota_client: IoTAClient = None,
):
    """
    Remove all devices and all device groups that belong to the given
    fiware header.

    For a TLS connection pass a ready-made ``iota_client`` that was created
    with a Session object holding the certificate and private key.

    Args:
        url: Url of the iot agent service
        fiware_header: header of the tenant
        iota_client: enables TLS communication if created with Session
            object, only needed for self-signed certificates

    Returns:
        None
    """
    assert url or iota_client, "Either url or client object must be given"

    # Prefer the caller's client; build a fresh one only when none was given
    agent = (
        iota_client
        if iota_client is not None
        else IoTAClient(url=url, fiware_header=fiware_header)
    )

    # Devices are deleted first, then it is verified that none is left
    for registered_device in agent.get_device_list():
        agent.delete_device(device_id=registered_device.device_id)
    assert len(agent.get_device_list()) == 0

    # Device groups are identified by their resource / apikey pair
    for device_group in agent.get_group_list():
        agent.delete_group(
            resource=device_group.resource, apikey=device_group.apikey
        )
    assert len(agent.get_group_list()) == 0

149 

150 

def clear_quantumleap(
    url: str = None,
    fiware_header: FiwareHeader = None,
    ql_client: QuantumLeapClient = None,
):
    """
    Function deletes all data for a given fiware header. To use TLS
    connection you need to provide the ql_client parameter as an argument
    with the Session object including the certificate and private key.

    Args:
        url: Url of the quantumleap service
        fiware_header: header of the tenant
        ql_client: enables TLS communication if created with Session object,
            only needed for self-signed certificates

    Returns:
        None

    Raises:
        AssertionError: if neither ``url`` nor ``ql_client`` is given
        RequestException: re-raised when listing the entities fails for any
            reason other than the "empty database" 404 answer
    """

    def is_empty_db_error(err: RequestException) -> bool:
        """
        When the database is empty for a request, quantumleap returns a 404
        error with an error message. That answer is evaluated as 'OK'.

        Args:
            err: exception raised while listing the entities

        Returns:
            True if ``err`` is the empty-database answer, False otherwise
        """
        response = getattr(err, "response", None)
        # Connection errors and timeouts carry no response object at all;
        # they must surface as the original exception, not AttributeError.
        if response is None or response.status_code != 404:
            return False
        try:
            payload = response.json()
        except ValueError:
            # A 404 whose body is not JSON is not the empty-database answer
            return False
        return isinstance(payload, dict) and payload.get("error") == "Not Found"

    assert url or ql_client, "Either url or client object must be given"
    # create client
    if ql_client is None:
        client = QuantumLeapClient(url=url, fiware_header=fiware_header)
    else:
        client = ql_client

    # clear data
    try:
        entities = client.get_entities()
    except RequestException as err:
        if not is_empty_db_error(err):
            # bare raise keeps the original exception and traceback intact
            raise
        # empty database: nothing to delete
        entities = []

    # will be executed for all found entities
    for entity in entities:
        client.delete_entity(
            entity_id=entity.entityId, entity_type=entity.entityType
        )

202 

203 

def clear_all(
    *,
    fiware_header: FiwareHeader = None,
    cb_url: str = None,
    iota_url: Union[str, List[str]] = None,
    ql_url: str = None,
    cb_client: ContextBrokerClient = None,
    iota_client: IoTAClient = None,
    ql_client: QuantumLeapClient = None
):
    """
    Clear every service for which a url or a client object is provided.

    The IoT-Agent(s) are cleared first, then the context broker (including
    its registrations), then QuantumLeap. If ``iota_url`` is given it takes
    precedence and ``iota_client`` is not used.

    Args:
        fiware_header: header of the tenant, forwarded to each clean-up call
        cb_url: url of the context broker service
        iota_url: url of the IoT-Agent service, or several of them
        ql_url: url of the QuantumLeap service
        cb_client: enables TLS communication if created with Session object,
            only needed for self-signed certificates
        iota_client: enables TLS communication if created with Session
            object, only needed for self-signed certificates; a single
            client or an iterable of clients
        ql_client: enables TLS communication if created with Session object,
            only needed for self-signed certificates

    Returns:
        None
    """
    # --- IoT-Agent(s): urls win over client objects -----------------------
    if iota_url is not None:
        agent_urls = (
            [iota_url] if isinstance(iota_url, (str, AnyUrl)) else iota_url
        )
        for agent_url in agent_urls:
            clear_iot_agent(url=agent_url, fiware_header=fiware_header)
    elif iota_client is not None:
        agents = (
            [iota_client] if isinstance(iota_client, IoTAClient) else iota_client
        )
        for agent in agents:
            clear_iot_agent(fiware_header=fiware_header, iota_client=agent)

    # --- Context broker: registrations are always removed here ------------
    broker_requested = cb_url is not None or cb_client is not None
    if broker_requested:
        clear_context_broker(
            url=cb_url,
            fiware_header=fiware_header,
            cb_client=cb_client,
            clear_registrations=True,
        )

    # --- QuantumLeap -------------------------------------------------------
    quantumleap_requested = ql_url is not None or ql_client is not None
    if quantumleap_requested:
        clear_quantumleap(
            url=ql_url, fiware_header=fiware_header, ql_client=ql_client
        )

256 

257 

def clean_test(
    *,
    fiware_service: str,
    fiware_servicepath: str,
    cb_url: str = None,
    iota_url: Union[str, List[str]] = None,
    ql_url: str = None,
    cb_client: ContextBrokerClient = None,
    iota_client: IoTAClient = None,
    ql_client: QuantumLeapClient = None
) -> Callable:
    """
    Decorator to clean up the server before and after the test

    The clean-up is executed each time the decorated function is called:
    once right before it runs and once right after it has returned. Nothing
    is contacted when the decorator itself is created or applied.

    Note:
        This does not substitute a proper TearDown method, because a failing
        test will not execute the clean up after the error. Since this would
        mean an unnecessary error handling. We actually want a test to fail
        with proper messages.

    Args:
        fiware_service: tenant
        fiware_servicepath: tenant path
        cb_url: url of context broker service
        iota_url: url of IoT-Agent service
        ql_url: url of quantumleap service
        cb_client: enables TLS communication if created with Session object,
            only needed for self-signed certificates
        iota_client: enables TLS communication if created with Session
            object, only needed for self-signed certificates
        ql_client: enables TLS communication if created with Session object,
            only needed for self-signed certificates

    Returns:
        Decorator for clean tests
    """
    # Built once, up front, so the header arguments are processed when the
    # decorator is created rather than on the first test call.
    fiware_header = FiwareHeader(
        service=fiware_service, service_path=fiware_servicepath
    )

    def clean_up() -> None:
        """Clear all configured services for the tenant."""
        clear_all(
            fiware_header=fiware_header,
            cb_url=cb_url,
            iota_url=iota_url,
            ql_url=ql_url,
            cb_client=cb_client,
            iota_client=iota_client,
            ql_client=ql_client,
        )

    # Inner decorator function
    def decorator(func):
        # Wrapper function for the decorated function
        @wraps(func)
        def wrapper(*args, **kwargs):
            clean_up()
            # Deliberately no try/finally: if the test raises, the second
            # clean-up is skipped and the error propagates unchanged.
            result = func(*args, **kwargs)
            clean_up()
            return result

        return wrapper

    return decorator