Coverage for filip/clients/ngsi_v2/client.py: 74%
72 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-03-10 13:43 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-03-10 13:43 +0000
1"""
2Module for FIWARE api client
3"""
5import logging
6import json
7import errno
8from typing import Optional, Union, Dict
9from pathlib import Path
10from pydantic import BaseModel, AnyHttpUrl
11from requests.auth import HTTPBasicAuth, HTTPDigestAuth
12from requests import Session
13from filip.clients.base_http_client import BaseHttpClient
14from filip.config import settings
15from filip.models.base import FiwareHeader
16from filip.clients.ngsi_v2 import ContextBrokerClient, IoTAClient, QuantumLeapClient
# Module-level logger shared by everything in this file; it is registered
# under the fixed name "client" rather than the module's dotted path.
logger = logging.getLogger("client")
class HttpClientConfig(BaseModel):
    """
    Config class for http client

    All fields are optional. The three service urls default to the
    corresponding values of ``filip.config.settings``.
    """

    # Url handed to the ContextBrokerClient sub client
    cb_url: Optional[AnyHttpUrl] = settings.CB_URL
    # Url handed to the IoTAClient sub client
    iota_url: Optional[AnyHttpUrl] = settings.IOTA_URL
    # Url handed to the QuantumLeapClient sub client
    ql_url: Optional[AnyHttpUrl] = settings.QL_URL
    # Optional authentication settings. HttpClient reads the keys "type"
    # (checked against "basicauth" / "digestauth") and "secret" (path of the
    # secrets file holding the credentials).
    auth: Optional[Dict] = None
class HttpClient(BaseHttpClient):
    """
    Master client. This client contains all implemented sub clients based on
    the principle of composition. Hence, each sub client is accessible from
    this client, but they share a general config and, if provided, a session.

    Attributes:
        cb: ContextBrokerClient created with ``config.cb_url``
        iota: IoTAClient created with ``config.iota_url``
        timeseries: QuantumLeapClient created with ``config.ql_url``
    """

    def __init__(
        self,
        config: Union[str, Path, HttpClientConfig, Dict] = None,
        session: Session = None,
        fiware_header: FiwareHeader = None,
        **kwargs
    ):
        """
        Constructor for master client

        Args:
            config (Union[str, Path, HttpClientConfig, Dict]): Configuration
                object, a dictionary with the configuration values, or the
                path of a json file containing them. If omitted, the
                defaults of ``HttpClientConfig`` are used.
            session (request.Session): Session object
            fiware_header (FiwareHeader): Fiware header
            **kwargs: Optional arguments that ``request`` takes.

        Raises:
            ValueError: If ``config.auth`` names an unsupported auth type.
            KeyError: If ``config.auth`` lacks the "type" or "secret" key.
        """
        # The secrets store has to exist before the auth handling below
        # reads the secrets file into it.
        self.__secrets = {
            "username": None,
            "password": None,
            "client_id": None,
            "client_secret": None,
        }

        if config:
            self.config = config
        else:
            self.config = HttpClientConfig()

        super().__init__(session=session, fiware_header=fiware_header, **kwargs)

        # from here on deprecated?
        auth_types = {
            "basicauth": self.__http_basic_auth,
            "digestauth": self.__http_digest_auth,
        }
        # 'oauth2': self.__oauth2}

        # Authentication is configured *before* the sub clients are created
        # so that the session handed to them already carries the credentials.
        if self.config.auth:
            # Normalise once; the same key is used for the check and the
            # lookup, so e.g. "BasicAuth" is accepted consistently.
            auth_type = self.config.auth["type"].lower()
            if auth_type not in auth_types:
                raise ValueError(
                    f"Unsupported auth type '{self.config.auth['type']}'. "
                    f"Supported types: {', '.join(auth_types)}"
                )
            self.__get_secrets_file(path=self.config.auth["secret"])
            auth_types[auth_type]()

        # initialize sub clients
        self.cb = ContextBrokerClient(
            url=self.config.cb_url,
            session=self.session,
            fiware_header=self.fiware_headers,
            **self.kwargs
        )

        self.iota = IoTAClient(
            url=self.config.iota_url,
            session=self.session,
            fiware_header=self.fiware_headers,
            **self.kwargs
        )

        self.timeseries = QuantumLeapClient(
            url=self.config.ql_url,
            session=self.session,
            fiware_header=self.fiware_headers,
            **self.kwargs
        )

    @property
    def config(self) -> HttpClientConfig:
        """Return current config"""
        return self._config

    @config.setter
    def config(self, config: Union[str, Path, HttpClientConfig, Dict]):
        """
        Set a new config

        Args:
            config: Either a ready ``HttpClientConfig``, the path of a json
                file holding the configuration, or any object that
                ``HttpClientConfig.model_validate`` accepts (e.g. a dict).
        """
        if isinstance(config, HttpClientConfig):
            self._config = config
        elif isinstance(config, (str, Path)):
            # Strings and paths are treated as the location of a json file
            config_json = Path(config).read_text(encoding="utf-8")
            self._config = HttpClientConfig.model_validate_json(config_json)
        else:
            self._config = HttpClientConfig.model_validate(config)

    @property
    def cert(self):
        """Return session certificate"""
        return self.session.cert

    @property
    def secrets(self) -> dict:
        """Returns secrets"""
        return self.__secrets

    @secrets.setter
    def secrets(self, data: dict):
        """Set new secrets (merged into the already stored ones)"""
        self.__secrets.update(data)

    @secrets.deleter
    def secrets(self):
        """Delete secrets"""
        self.__secrets = {}

    def __get_secrets_file(self, path=None):
        """
        Reads credentials from the secrets file the path variable is pointing
        to and merges them into the stored secrets. Problems opening or
        reading the file are logged and otherwise ignored.

        Args:
            path: location of secrets-file
        Returns:
            None
        """
        try:
            with open(path, "r", encoding="utf-8") as secrets_file:
                logger.info("Reading credentials from: %s", path)
                self.__secrets.update(json.load(secrets_file))

        except OSError as err:
            if err.errno == errno.ENOENT:
                logger.error("%s - does not exist", path)
            elif err.errno == errno.EACCES:
                logger.error("%s - cannot be read", path)
            else:
                logger.error("%s - some other error", path)

    def __apply_auth(self, auth_cls):
        """
        Attach an auth handler built from the stored credentials to the
        session. An already existing session is reused; otherwise a new one
        is created.

        Args:
            auth_cls: Auth class from ``requests.auth`` that is instantiated
                with ``(username, password)``
        Returns:
            None
        """
        username = self.__secrets.get("username")
        password = self.__secrets.get("password")
        if username is None or password is None:
            # Best effort: without credentials (e.g. unreadable secrets
            # file) the client keeps working unauthenticated.
            logger.warning(
                "No username/password available - "
                "session authentication is not configured"
            )
            return

        if getattr(self, "session", None) is None:
            self.session = Session()
        self.session.auth = auth_cls(username, password)

    def __http_basic_auth(self):
        """
        Initiates a client using the basic authorization mechanism provided by
        the requests package. The documentation of the package is located here:
        https://requests.readthedocs.io/en/master/user/authentication/
        The credentials must be provided via secret-file.
        """
        self.__apply_auth(HTTPBasicAuth)

    def __http_digest_auth(self):
        """
        Initiates a client using the digest authorization mechanism provided by
        the requests package. The documentation of the package is located here:
        https://requests.readthedocs.io/en/master/user/authentication/
        The credentials must be provided via secret-file.
        """
        self.__apply_auth(HTTPDigestAuth)

    # NOTE: disabled draft of an OAuth2 handler, kept for reference only.
    # def __oauth2(self):
    #     """
    #     Initiates a oauthclient according to the workflows defined by OAuth2.0.
    #     We use requests-oauthlib for this implementation. The documentation
    #     of the package is located here:
    #     https://requests-oauthlib.readthedocs.io/en/latest/index.html
    #     The information for workflow selection must be provided via
    #     filip-config. The credentials must be provided via secrets-file.
    #     :return: None
    #     """
    #     oauth2clients = {'authorization_code': None,
    #                      'implicit': MobileApplicationClient,
    #                      'resource_owner_password_credentials':
    #                          LegacyApplicationClient,
    #                      'client_credentials': BackendApplicationClient, }
    #     try:
    #         workflow = self.config['auth']['workflow']
    #     except KeyError:
    #         logger.warning(f"No workflow for OAuth2 defined! Default "
    #                        f"workflow will used: Authorization Code Grant."
    #                        f"Other oauth2-workflows available are: "
    #                        f"{oauth2clients.keys()}")
    #         workflow = 'authorization_code_grant'
    #
    #     oauthclient = oauth2clients[workflow](client_id=self.__secrets[
    #         'client_id'])
    #     self.session = OAuth2Session(client_id=None,
    #                                  client=oauthclient,
    #                                  auto_refresh_url=self.__secrets[
    #                                      'token_url'],
    #                                  auto_refresh_kwargs={
    #                                      self.__secrets['client_id'],
    #                                      self.__secrets['client_secret']})
    #
    #     self.__token = self.session.fetch_token(
    #         token_url=self.__secrets['token_url'],
    #         username=self.__secrets['username'],
    #         password=self.__secrets['password'],
    #         client_id=self.__secrets['client_id'],
    #         client_secret=self.__secrets['client_secret'])

    def __token_saver(self, token):
        """Store the given token on the client."""
        self.__token = token