Coverage for filip/clients/ngsi_v2/client.py: 74%

72 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-11-20 16:54 +0000

1""" 

2Module for FIWARE api client 

3""" 

4import logging 

5import json 

6import errno 

7from typing import Optional, Union, Dict 

8from pathlib import Path 

9from pydantic import BaseModel, AnyHttpUrl 

10from requests.auth import HTTPBasicAuth, HTTPDigestAuth 

11from requests import Session 

12from filip.clients.base_http_client import BaseHttpClient 

13from filip.config import settings 

14from filip.models.base import FiwareHeader 

15from filip.clients.ngsi_v2 import \ 

16 ContextBrokerClient, \ 

17 IoTAClient, \ 

18 QuantumLeapClient 

19 

20 

# Module-level logger; the fixed name 'client' is the key under which handlers
# and levels for this module are looked up in the logging hierarchy.
logger = logging.getLogger(name='client')

22 

23 

class HttpClientConfig(BaseModel):
    """
    Configuration model for the master http client.

    Holds one url per sub client plus an optional authentication section.
    The url defaults are taken from ``filip.config.settings``.
    """
    # url handed to the ContextBrokerClient; defaults to settings.CB_URL
    cb_url: Union[AnyHttpUrl, None] = settings.CB_URL
    # url handed to the IoTAClient; defaults to settings.IOTA_URL
    iota_url: Union[AnyHttpUrl, None] = settings.IOTA_URL
    # url handed to the QuantumLeapClient; defaults to settings.QL_URL
    ql_url: Union[AnyHttpUrl, None] = settings.QL_URL
    # optional authentication section; HttpClient reads the keys 'type' and
    # 'secret' from it when it is set
    auth: Union[Dict, None] = None

32 

33 

class HttpClient(BaseHttpClient):
    """
    Master client. This client contains all implemented sub clients based on
    the principle of composition. Hence, each sub client is accessible from
    this client, but they share a general config and, if provided, a session.

    Attributes:
        cb: ``ContextBrokerClient`` created with ``config.cb_url``
        iota: ``IoTAClient`` created with ``config.iota_url``
        timeseries: ``QuantumLeapClient`` created with ``config.ql_url``
    """
    def __init__(self,
                 config: Union[str, Path, HttpClientConfig, Dict] = None,
                 session: Session = None,
                 fiware_header: FiwareHeader = None,
                 **kwargs):
        """
        Constructor for master client

        Args:
            config (Union[str, Path, HttpClientConfig, Dict]): Configuration
                object, a path to a json file holding the configuration, or
                a dict with the configuration values. If omitted, a default
                ``HttpClientConfig`` is used.
            session (request.Session): Session object
            fiware_header (FiwareHeader): Fiware header
            **kwargs: Optional arguments that ``request`` takes.

        Raises:
            ValueError: If ``config.auth['type']`` is not a supported
                authentication type.
            KeyError: If ``config.auth`` is set but lacks the key 'type' or
                'secret'.
        """
        if config:
            self.config = config
        else:
            self.config = HttpClientConfig()

        super().__init__(session=session,
                         fiware_header=fiware_header,
                         **kwargs)

        # The secrets store has to exist before any of the authentication
        # helpers below run, because they read from and write to it.
        self.__secrets = {"username": None,
                          "password": None,
                          "client_id": None,
                          "client_secret": None}

        # from here on deprecated?
        auth_types = {'basicauth': self.__http_basic_auth,
                      'digestauth': self.__http_digest_auth}
        # 'oauth2': self.__oauth2}

        # Authentication is configured before the sub clients are created so
        # that they receive the session that carries the credentials.
        if self.config.auth:
            # Normalise once and use the same key for check and dispatch.
            auth_type = str(self.config.auth['type']).lower()
            if auth_type not in auth_types:
                raise ValueError(
                    f"Unsupported authentication type "
                    f"'{self.config.auth['type']}'. Supported types are: "
                    f"{sorted(auth_types)}")
            self.__get_secrets_file(path=self.config.auth['secret'])
            auth_types[auth_type]()

        # initialize sub clients
        self.cb = ContextBrokerClient(url=self.config.cb_url,
                                      session=self.session,
                                      fiware_header=self.fiware_headers,
                                      **self.kwargs)

        self.iota = IoTAClient(url=self.config.iota_url,
                               session=self.session,
                               fiware_header=self.fiware_headers,
                               **self.kwargs)

        self.timeseries = QuantumLeapClient(url=self.config.ql_url,
                                            session=self.session,
                                            fiware_header=self.fiware_headers,
                                            **self.kwargs)

    @property
    def config(self):
        """Return current config"""
        return self._config

    @config.setter
    def config(self, config: Union[str, Path, HttpClientConfig, Dict]):
        """
        Set a new config

        Args:
            config: Either a ready ``HttpClientConfig``, a path (``str`` or
                ``Path``) to a json file that is validated into one, or any
                other object accepted by ``HttpClientConfig.model_validate``.
        """
        if isinstance(config, HttpClientConfig):
            self._config = config
        elif isinstance(config, (str, Path)):
            # A string is always interpreted as a file path, never as json.
            with open(config) as f:
                config_json = f.read()
            self._config = HttpClientConfig.model_validate_json(config_json)
        else:
            self._config = HttpClientConfig.model_validate(config)

    @property
    def cert(self):
        """Return session certificate"""
        return self.session.cert

    @property
    def secrets(self):
        """Returns secrets"""
        return self.__secrets

    @secrets.setter
    def secrets(self, data: dict):
        """Set new secrets. Existing entries are updated, not replaced."""
        self.__secrets.update(data)

    @secrets.deleter
    def secrets(self):
        """Delete secrets by resetting the store to an empty dict"""
        self.__secrets = {}

    def __get_secrets_file(self, path=None):
        """
        Reads credentials from the secrets file the path variable is
        pointing to and merges them into the secrets store.

        Failing to open or read the file is logged and otherwise ignored
        (best effort).

        Args:
            path: location of secrets-file
        Returns:
            None
        """
        try:
            with open(path, 'r') as file:
                logger.info("Reading credentials from: %s", path)
                self.__secrets.update(json.load(file))

        except OSError as err:
            if err.errno == errno.ENOENT:
                logger.error("%s - does not exist", path)
            elif err.errno == errno.EACCES:
                logger.error("%s - cannot be read", path)
            else:
                logger.error("%s - some other error", path)

    def __apply_auth(self, auth_cls):
        """
        Attach an auth handler built from the stored username and password
        to the session of this client.

        An already existing session is reused so that a session handed in by
        the caller is not discarded; otherwise a new one is created.

        Args:
            auth_cls: Callable taking (username, password) and returning an
                auth object accepted by ``requests.Session.auth``
        Returns:
            None
        """
        if getattr(self, 'session', None) is None:
            self.session = Session()
        try:
            self.session.auth = auth_cls(self.__secrets['username'],
                                         self.__secrets['password'])
        except KeyError as err:
            # Only reachable when the secrets store was emptied; keep the
            # best-effort behaviour but make the omission visible.
            logger.warning("Credential %s is missing - session is left "
                           "without authentication", err)

    def __http_basic_auth(self):
        """
        Initiates a client using the basic authorization mechanism provided by
        the requests package. The documentation of the package is located here:
        https://requests.readthedocs.io/en/master/user/authentication/
        The credentials must be provided via secret-file.
        """
        self.__apply_auth(HTTPBasicAuth)

    def __http_digest_auth(self):
        """
        Initiates a client using the digest authorization mechanism provided by
        the requests package. The documentation of the package is located here:
        https://requests.readthedocs.io/en/master/user/authentication/
        The credentials must be provided via secret-file.
        """
        self.__apply_auth(HTTPDigestAuth)

    # def __oauth2(self):
    #    """
    #    Initiates a oauthclient according to the workflows defined by OAuth2.0.
    #    We use requests-oauthlib for this implementation. The documentation
    #    of the package is located here:
    #    https://requests-oauthlib.readthedocs.io/en/latest/index.html
    #    The information for workflow selection must be provided via
    #    filip-config. The credentials must be provided via secrets-file.
    #    :return: None
    #    """
    #    oauth2clients = {'authorization_code': None,
    #                     'implicit': MobileApplicationClient,
    #                     'resource_owner_password_credentials':
    #                         LegacyApplicationClient,
    #                     'client_credentials': BackendApplicationClient, }
    #    try:
    #        workflow = self.config['auth']['workflow']
    #    except KeyError:
    #        logger.warning(f"No workflow for OAuth2 defined! Default "
    #                       f"workflow will used: Authorization Code Grant."
    #                       f"Other oauth2-workflows available are: "
    #                       f"{oauth2clients.keys()}")
    #        workflow = 'authorization_code_grant'
    #
    #    oauthclient = oauth2clients[workflow](client_id=self.__secrets[
    #        'client_id'])
    #    self.session = OAuth2Session(client_id=None,
    #                                 client=oauthclient,
    #                                 auto_refresh_url=self.__secrets[
    #                                     'token_url'],
    #                                 auto_refresh_kwargs={
    #                                     self.__secrets['client_id'],
    #                                     self.__secrets['client_secret']})
    #
    #    self.__token = self.session.fetch_token(
    #        token_url=self.__secrets['token_url'],
    #        username=self.__secrets['username'],
    #        password=self.__secrets['password'],
    #        client_id=self.__secrets['client_id'],
    #        client_secret=self.__secrets['client_secret'])

    def __token_saver(self, token):
        """
        Store the given token on the client.

        NOTE(review): nothing in this class calls this method; presumably
        meant as a token-update callback for the disabled OAuth2 flow above.

        Args:
            token: token object to keep
        Returns:
            None
        """
        self.__token = token