Coverage for filip/clients/ngsi_v2/client.py: 74%

72 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-03-10 13:43 +0000

1""" 

2Module for FIWARE api client 

3""" 

4 

5import logging 

6import json 

7import errno 

8from typing import Optional, Union, Dict 

9from pathlib import Path 

10from pydantic import BaseModel, AnyHttpUrl 

11from requests.auth import HTTPBasicAuth, HTTPDigestAuth 

12from requests import Session 

13from filip.clients.base_http_client import BaseHttpClient 

14from filip.config import settings 

15from filip.models.base import FiwareHeader 

16from filip.clients.ngsi_v2 import ContextBrokerClient, IoTAClient, QuantumLeapClient 

17 

18 

# Module-level logger shared by everything in this file; note that it is
# registered under the fixed name "client" rather than the module's __name__.
logger = logging.getLogger("client")

20 

21 

class HttpClientConfig(BaseModel):
    """
    Config class for http client

    Bundles the service URLs and the optional authentication settings that
    ``HttpClient`` reads when it builds its sub clients. Every field is
    optional; the URL fields default to the corresponding values taken from
    ``filip.config.settings``.
    """

    # URL passed to ContextBrokerClient; defaults to settings.CB_URL
    cb_url: Optional[AnyHttpUrl] = settings.CB_URL
    # URL passed to IoTAClient; defaults to settings.IOTA_URL
    iota_url: Optional[AnyHttpUrl] = settings.IOTA_URL
    # URL passed to QuantumLeapClient; defaults to settings.QL_URL
    ql_url: Optional[AnyHttpUrl] = settings.QL_URL
    # Optional authentication settings. HttpClient reads the keys "type"
    # (name of the auth mechanism) and "secret" (path to a secrets file).
    auth: Optional[Dict] = None

31 

32 

class HttpClient(BaseHttpClient):
    """
    Master client. This client contains all implemented sub clients based on
    the principle of composition. Hence, each sub client is accessible from
    this client, but they share a general config and, if provided, a session.

    Attributes:
        cb: ``ContextBrokerClient`` created with ``config.cb_url``
        iota: ``IoTAClient`` created with ``config.iota_url``
        timeseries: ``QuantumLeapClient`` created with ``config.ql_url``
    """

    def __init__(
        self,
        config: Union[str, Path, HttpClientConfig, Dict] = None,
        session: Session = None,
        fiware_header: FiwareHeader = None,
        **kwargs
    ):
        """
        Constructor for master client

        Args:
            config (Union[str, Path, HttpClientConfig, Dict]): Configuration
                object, a path to a JSON config file, or a dict that can be
                validated into an ``HttpClientConfig``. A falsy value
                selects the default ``HttpClientConfig()``.
            session (request.Session): Session object
            fiware_header (FiwareHeader): Fiware header
            **kwargs: Optional arguments that ``request`` takes.

        Raises:
            AssertionError: If ``config.auth["type"]`` is not one of the
                supported authentication types.
            KeyError: If ``config.auth`` is set but lacks the ``"type"`` or
                ``"secret"`` key.
        """
        # The secrets store has to exist before any auth handling runs,
        # because reading the secrets file and building the auth object
        # both access it.
        self.__secrets = {
            "username": None,
            "password": None,
            "client_id": None,
            "client_secret": None,
        }

        if config:
            self.config = config
        else:
            self.config = HttpClientConfig()

        super().__init__(session=session, fiware_header=fiware_header, **kwargs)

        # initialize sub clients
        self.cb = ContextBrokerClient(
            url=self.config.cb_url,
            session=self.session,
            fiware_header=self.fiware_headers,
            **self.kwargs
        )

        self.iota = IoTAClient(
            url=self.config.iota_url,
            session=self.session,
            fiware_header=self.fiware_headers,
            **self.kwargs
        )

        self.timeseries = QuantumLeapClient(
            url=self.config.ql_url,
            session=self.session,
            fiware_header=self.fiware_headers,
            **self.kwargs
        )

        # from here on deprecated?
        auth_types = {
            "basicauth": self.__http_basic_auth,
            "digestauth": self.__http_digest_auth,
        }
        # 'oauth2': self.__oauth2}

        if self.config.auth:
            # Normalise once so validation and dispatch use the same key.
            auth_type = self.config.auth["type"].lower()
            if auth_type not in auth_types:
                # Explicit raise instead of ``assert`` so the check is not
                # stripped under ``python -O``; the exception type is kept
                # for backward compatibility.
                raise AssertionError(
                    f"Unsupported auth type {auth_type!r}; "
                    f"expected one of {sorted(auth_types)}"
                )
            self.__get_secrets_file(path=self.config.auth["secret"])
            # NOTE(review): the auth helpers bind a new Session to
            # ``self.session`` after the sub clients above were already
            # created with the previous one, so the sub clients presumably
            # do not use the authenticated session - confirm against
            # BaseHttpClient before relying on this path.
            auth_types[auth_type]()

    @property
    def config(self):
        """Return current config"""
        return self._config

    @config.setter
    def config(self, config: Union[str, Path, HttpClientConfig, Dict]):
        """
        Set a new config

        Args:
            config: Either a ready ``HttpClientConfig``, a path (``str`` or
                ``Path``) to a JSON file, or any other object that
                ``HttpClientConfig.model_validate`` accepts.
        """
        if isinstance(config, HttpClientConfig):
            self._config = config
        elif isinstance(config, (str, Path)):
            # Strings and paths are treated as the location of a JSON file.
            with open(config) as config_file:
                config_json = config_file.read()
            self._config = HttpClientConfig.model_validate_json(config_json)
        else:
            self._config = HttpClientConfig.model_validate(config)

    @property
    def cert(self):
        """Return session certificate"""
        return self.session.cert

    @property
    def secrets(self):
        """Returns secrets"""
        return self.__secrets

    @secrets.setter
    def secrets(self, data: dict):
        """Merge ``data`` into the stored secrets (existing keys are kept)"""
        self.__secrets.update(data)

    @secrets.deleter
    def secrets(self):
        """Delete secrets by replacing the store with an empty dict"""
        self.__secrets = {}

    def __get_secrets_file(self, path=None):
        """
        Reads credentials from the secrets file the path variable is
        pointing to and merges them into the stored secrets.

        Errors while opening or reading the file are logged and not
        re-raised; errors while parsing its JSON content propagate.

        Args:
            path: location of secrets-file
        Returns:
            None
        """
        try:
            with open(path, "r") as secrets_file:
                logger.info("Reading credentials from: %s", path)
                self.__secrets.update(json.load(secrets_file))

        except OSError as err:
            if err.errno == errno.ENOENT:
                logger.error("%s - does not exist", path)
            elif err.errno == errno.EACCES:
                logger.error("%s - cannot be read", path)
            else:
                logger.error("%s - some other error", path)

    def __apply_auth(self, auth_cls):
        """
        Bind a fresh session and attach ``auth_cls(username, password)``.

        If a credential key is missing from the secrets (possible after the
        secrets were deleted), a warning is logged and the new session is
        left without authentication.

        Args:
            auth_cls: Callable taking username and password and returning an
                auth object for ``Session.auth``.
        Returns:
            None
        """
        self.session = Session()
        try:
            username = self.__secrets["username"]
            password = self.__secrets["password"]
        except KeyError as err:
            logger.warning(
                "Credential %s missing in secrets - session is left "
                "without authentication",
                err,
            )
            return
        self.session.auth = auth_cls(username, password)

    def __http_basic_auth(self):
        """
        Initiates a client using the basic authorization mechanism provided by
        the requests package. The documentation of the package is located here:
        https://requests.readthedocs.io/en/master/user/authentication/
        The credentials must be provided via secret-file.
        """
        self.__apply_auth(HTTPBasicAuth)

    def __http_digest_auth(self):
        """
        Initiates a client using the digest authorization mechanism provided by
        the requests package. The documentation of the package is located here:
        https://requests.readthedocs.io/en/master/user/authentication/
        The credentials must be provided via secret-file.
        """
        self.__apply_auth(HTTPDigestAuth)

    # def __oauth2(self):
    #    """
    #    Initiates a oauthclient according to the workflows defined by OAuth2.0.
    #    We use requests-oauthlib for this implementation. The documentation
    #    of the package is located here:
    #    https://requests-oauthlib.readthedocs.io/en/latest/index.html
    #    The information for workflow selection must be provided via
    #    filip-config. The credentials must be provided via secrets-file.
    #    :return: None
    #    """
    #    oauth2clients = {'authorization_code': None,
    #                     'implicit': MobileApplicationClient,
    #                     'resource_owner_password_credentials':
    #                         LegacyApplicationClient,
    #                     'client_credentials': BackendApplicationClient, }
    #    try:
    #        workflow = self.config['auth']['workflow']
    #    except KeyError:
    #        logger.warning(f"No workflow for OAuth2 defined! Default "
    #                       f"workflow will used: Authorization Code Grant."
    #                       f"Other oauth2-workflows available are: "
    #                       f"{oauth2clients.keys()}")
    #        workflow = 'authorization_code_grant'
    #
    #    oauthclient = oauth2clients[workflow](client_id=self.__secrets[
    #        'client_id'])
    #    self.session = OAuth2Session(client_id=None,
    #                                 client=oauthclient,
    #                                 auto_refresh_url=self.__secrets[
    #                                     'token_url'],
    #                                 auto_refresh_kwargs={
    #                                     self.__secrets['client_id'],
    #                                     self.__secrets['client_secret']})
    #
    #    self.__token = self.session.fetch_token(
    #        token_url=self.__secrets['token_url'],
    #        username=self.__secrets['username'],
    #        password=self.__secrets['password'],
    #        client_id=self.__secrets['client_id'],
    #        client_secret=self.__secrets['client_secret'])

    def __token_saver(self, token):
        """Store ``token`` on the instance (only referenced by the disabled
        OAuth2 code above)."""
        self.__token = token