"""
Module for FIWARE api client
"""
import logging
import json
import errno
from typing import Optional, Union, Dict
from pathlib import Path
from pydantic import BaseModel, AnyHttpUrl
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
from requests import Session
from filip.clients.base_http_client import BaseHttpClient
from filip.config import settings
from filip.models.base import FiwareHeader
from filip.clients.ngsi_v2 import \
ContextBrokerClient, \
IoTAClient, \
QuantumLeapClient
logger = logging.getLogger('client')
[docs]class HttpClientConfig(BaseModel):
"""
Config class for http client
"""
cb_url: Optional[AnyHttpUrl] = settings.CB_URL
iota_url: Optional[AnyHttpUrl] = settings.IOTA_URL
ql_url: Optional[AnyHttpUrl] = settings.QL_URL
auth: Optional[Dict] = None
[docs]class HttpClient(BaseHttpClient):
"""
Master client. This client contains all implemented sub clients based on
the principal of composition. Hence, each sub client is accessible from
this client, but they share a general config and if provided a session.
"""
def __init__(self,
config: Union[str, Path, HttpClientConfig, Dict] = None,
session: Session = None,
fiware_header: FiwareHeader = None,
**kwargs):
"""
Constructor for master client
Args:
config (Union[str, Path, Dict]): Configuration object
session (request.Session): Session object
fiware_header (FiwareHeader): Fiware header
**kwargs: Optional arguments that ``request`` takes.
"""
if config:
self.config = config
else:
self.config = HttpClientConfig()
super().__init__(session=session,
fiware_header=fiware_header,
**kwargs)
# initialize sub clients
self.cb = ContextBrokerClient(url=self.config.cb_url,
session=self.session,
fiware_header=self.fiware_headers,
**self.kwargs)
self.iota = IoTAClient(url=self.config.iota_url,
session=self.session,
fiware_header=self.fiware_headers,
**self.kwargs)
self.timeseries = QuantumLeapClient(url=self.config.ql_url,
session=self.session,
fiware_header=self.fiware_headers,
**self.kwargs)
# from here on deprecated?
auth_types = {'basicauth': self.__http_basic_auth,
'digestauth': self.__http_digest_auth}
# 'oauth2': self.__oauth2}
if self.config.auth:
assert self.config.auth['type'].lower() in auth_types.keys()
self.__get_secrets_file(path=self.config.auth['secret'])
auth_types[self.config.auth['type']]()
self.__secrets = {"username": None,
"password": None,
"client_id": None,
"client_secret": None}
@property
def config(self):
"""Return current config"""
return self._config
@config.setter
def config(self, config: HttpClientConfig):
"""Set a new config"""
if isinstance(config, HttpClientConfig):
self._config = config
elif isinstance(config, (str, Path)):
with open(config) as f:
config_json = f.read()
self._config = HttpClientConfig.model_validate_json(config_json)
else:
self._config = HttpClientConfig.model_validate(config)
@property
def cert(self):
"""Return session certificate"""
return self.session.cert
@property
def secrets(self):
"""Returns secrets"""
return self.__secrets
@secrets.setter
def secrets(self, data: dict):
"""Set new secrets"""
self.__secrets.update(data)
@secrets.deleter
def secrets(self):
"""Delete secrets"""
self.__secrets = {}
def __get_secrets_file(self, path=None):
"""
Reads credentials form secret file the path variable is pointing to.
Args:
path: location of secrets-file
Returns:
None
"""
try:
with open(path, 'r') as filename:
logger.info("Reading credentials from: %s", path)
self.__secrets.update(json.load(filename))
except IOError as err:
if err.errno == errno.ENOENT:
logger.error("%s - does not exist", path)
elif err.errno == errno.EACCES:
logger.error("%s - cannot be read", path)
else:
logger.error("%s - some other error", path)
def __http_basic_auth(self):
"""
Initiates a client using the basic authorization mechanism provided by
the requests package. The documentation of the package is located here:
https://requests.readthedocs.io/en/master/user/authentication/
The credentials must be provided via secret-file.
"""
try:
self.session = Session()
self.session.auth = HTTPBasicAuth(self.__secrets['username'],
self.__secrets['password'])
except KeyError:
pass
def __http_digest_auth(self):
"""
Initiates a client using the digest authorization mechanism provided by
the requests package. The documentation of the package is located here:
https://requests.readthedocs.io/en/master/user/authentication/
The credentials must be provided via secret-file.
"""
try:
self.session = Session()
self.session.auth = HTTPDigestAuth(self.__secrets['username'],
self.__secrets['password'])
except KeyError:
pass
# def __oauth2(self):
# """
# Initiates a oauthclient according to the workflows defined by OAuth2.0.
# We use requests-oauthlib for this implementation. The documentation
# of the package is located here:
# https://requests-oauthlib.readthedocs.io/en/latest/index.html
# The information for workflow selection must be provided via
# filip-config. The credentials must be provided via secrets-file.
# :return: None
# """
# oauth2clients = {'authorization_code': None,
# 'implicit': MobileApplicationClient,
# 'resource_owner_password_credentials':
# LegacyApplicationClient,
# 'client_credentials': BackendApplicationClient, }
# try:
# workflow = self.config['auth']['workflow']
# except KeyError:
# logger.warning(f"No workflow for OAuth2 defined! Default "
# f"workflow will used: Authorization Code Grant."
# f"Other oauth2-workflows available are: "
# f"{oauth2clients.keys()}")
# workflow = 'authorization_code_grant'
#
# oauthclient = oauth2clients[workflow](client_id=self.__secrets[
# 'client_id'])
# self.session = OAuth2Session(client_id=None,
# client=oauthclient,
# auto_refresh_url=self.__secrets[
# 'token_url'],
# auto_refresh_kwargs={
# self.__secrets['client_id'],
# self.__secrets['client_secret']})
#
# self.__token = self.session.fetch_token(
# token_url=self.__secrets['token_url'],
# username=self.__secrets['username'],
# password=self.__secrets['password'],
# client_id=self.__secrets['client_id'],
# client_secret=self.__secrets['client_secret'])
def __token_saver(self, token):
self.__token = token