Coverage for filip/clients/ngsi_v2/client.py: 74%
72 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-20 16:54 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-20 16:54 +0000
1"""
2Module for FIWARE api client
3"""
4import logging
5import json
6import errno
7from typing import Optional, Union, Dict
8from pathlib import Path
9from pydantic import BaseModel, AnyHttpUrl
10from requests.auth import HTTPBasicAuth, HTTPDigestAuth
11from requests import Session
12from filip.clients.base_http_client import BaseHttpClient
13from filip.config import settings
14from filip.models.base import FiwareHeader
15from filip.clients.ngsi_v2 import \
16 ContextBrokerClient, \
17 IoTAClient, \
18 QuantumLeapClient
21logger = logging.getLogger('client')
24class HttpClientConfig(BaseModel):
25 """
26 Config class for http client
27 """
28 cb_url: Optional[AnyHttpUrl] = settings.CB_URL
29 iota_url: Optional[AnyHttpUrl] = settings.IOTA_URL
30 ql_url: Optional[AnyHttpUrl] = settings.QL_URL
31 auth: Optional[Dict] = None
34class HttpClient(BaseHttpClient):
35 """
36 Master client. This client contains all implemented sub clients based on
37 the principal of composition. Hence, each sub client is accessible from
38 this client, but they share a general config and if provided a session.
39 """
40 def __init__(self,
41 config: Union[str, Path, HttpClientConfig, Dict] = None,
42 session: Session = None,
43 fiware_header: FiwareHeader = None,
44 **kwargs):
45 """
46 Constructor for master client
47 Args:
48 config (Union[str, Path, Dict]): Configuration object
49 session (request.Session): Session object
50 fiware_header (FiwareHeader): Fiware header
51 **kwargs: Optional arguments that ``request`` takes.
52 """
53 if config:
54 self.config = config
55 else:
56 self.config = HttpClientConfig()
58 super().__init__(session=session,
59 fiware_header=fiware_header,
60 **kwargs)
62 # initialize sub clients
63 self.cb = ContextBrokerClient(url=self.config.cb_url,
64 session=self.session,
65 fiware_header=self.fiware_headers,
66 **self.kwargs)
68 self.iota = IoTAClient(url=self.config.iota_url,
69 session=self.session,
70 fiware_header=self.fiware_headers,
71 **self.kwargs)
73 self.timeseries = QuantumLeapClient(url=self.config.ql_url,
74 session=self.session,
75 fiware_header=self.fiware_headers,
76 **self.kwargs)
78 # from here on deprecated?
79 auth_types = {'basicauth': self.__http_basic_auth,
80 'digestauth': self.__http_digest_auth}
81 # 'oauth2': self.__oauth2}
83 if self.config.auth:
84 assert self.config.auth['type'].lower() in auth_types.keys()
85 self.__get_secrets_file(path=self.config.auth['secret'])
86 auth_types[self.config.auth['type']]()
88 self.__secrets = {"username": None,
89 "password": None,
90 "client_id": None,
91 "client_secret": None}
93 @property
94 def config(self):
95 """Return current config"""
96 return self._config
98 @config.setter
99 def config(self, config: HttpClientConfig):
100 """Set a new config"""
101 if isinstance(config, HttpClientConfig):
102 self._config = config
103 elif isinstance(config, (str, Path)):
104 with open(config) as f:
105 config_json = f.read()
106 self._config = HttpClientConfig.model_validate_json(config_json)
107 else:
108 self._config = HttpClientConfig.model_validate(config)
110 @property
111 def cert(self):
112 """Return session certificate"""
113 return self.session.cert
115 @property
116 def secrets(self):
117 """Returns secrets"""
118 return self.__secrets
120 @secrets.setter
121 def secrets(self, data: dict):
122 """Set new secrets"""
123 self.__secrets.update(data)
125 @secrets.deleter
126 def secrets(self):
127 """Delete secrets"""
128 self.__secrets = {}
130 def __get_secrets_file(self, path=None):
131 """
132 Reads credentials form secret file the path variable is pointing to.
134 Args:
135 path: location of secrets-file
136 Returns:
137 None
138 """
139 try:
140 with open(path, 'r') as filename:
141 logger.info("Reading credentials from: %s", path)
142 self.__secrets.update(json.load(filename))
144 except IOError as err:
145 if err.errno == errno.ENOENT:
146 logger.error("%s - does not exist", path)
147 elif err.errno == errno.EACCES:
148 logger.error("%s - cannot be read", path)
149 else:
150 logger.error("%s - some other error", path)
152 def __http_basic_auth(self):
153 """
154 Initiates a client using the basic authorization mechanism provided by
155 the requests package. The documentation of the package is located here:
156 https://requests.readthedocs.io/en/master/user/authentication/
157 The credentials must be provided via secret-file.
158 """
159 try:
160 self.session = Session()
161 self.session.auth = HTTPBasicAuth(self.__secrets['username'],
162 self.__secrets['password'])
163 except KeyError:
164 pass
166 def __http_digest_auth(self):
167 """
168 Initiates a client using the digest authorization mechanism provided by
169 the requests package. The documentation of the package is located here:
170 https://requests.readthedocs.io/en/master/user/authentication/
171 The credentials must be provided via secret-file.
172 """
173 try:
174 self.session = Session()
175 self.session.auth = HTTPDigestAuth(self.__secrets['username'],
176 self.__secrets['password'])
177 except KeyError:
178 pass
180 # def __oauth2(self):
181 # """
182 # Initiates a oauthclient according to the workflows defined by OAuth2.0.
183 # We use requests-oauthlib for this implementation. The documentation
184 # of the package is located here:
185 # https://requests-oauthlib.readthedocs.io/en/latest/index.html
186 # The information for workflow selection must be provided via
187 # filip-config. The credentials must be provided via secrets-file.
188 # :return: None
189 # """
190 # oauth2clients = {'authorization_code': None,
191 # 'implicit': MobileApplicationClient,
192 # 'resource_owner_password_credentials':
193 # LegacyApplicationClient,
194 # 'client_credentials': BackendApplicationClient, }
195 # try:
196 # workflow = self.config['auth']['workflow']
197 # except KeyError:
198 # logger.warning(f"No workflow for OAuth2 defined! Default "
199 # f"workflow will used: Authorization Code Grant."
200 # f"Other oauth2-workflows available are: "
201 # f"{oauth2clients.keys()}")
202 # workflow = 'authorization_code_grant'
203#
204 # oauthclient = oauth2clients[workflow](client_id=self.__secrets[
205 # 'client_id'])
206 # self.session = OAuth2Session(client_id=None,
207 # client=oauthclient,
208 # auto_refresh_url=self.__secrets[
209 # 'token_url'],
210 # auto_refresh_kwargs={
211 # self.__secrets['client_id'],
212 # self.__secrets['client_secret']})
213#
214 # self.__token = self.session.fetch_token(
215 # token_url=self.__secrets['token_url'],
216 # username=self.__secrets['username'],
217 # password=self.__secrets['password'],
218 # client_id=self.__secrets['client_id'],
219 # client_secret=self.__secrets['client_secret'])
221 def __token_saver(self, token):
222 self.__token = token