from abc import ABC, abstractmethod

import aiohttp
import orjson
from glom import glom
from pydantic.error_wrappers import ValidationError

from core.lib import schemas
from core.util import logs

# Return error if the schema for the message type is not found
STRICT_VALIDATION = False

# Raise exception if the conversion schema is not found
STRICT_CONVERSION = False

# TODO: Set them to True when all message types are implemented

log = logs.get_logger("clients")


class NoSchema(Exception):
    """
    Raised when:
    - The schema for the message type is not found
    - The conversion schema is not found
    - There is no schema library for the client
    """


class NoSuchMethod(Exception):
    """
    Client library has no such method.
    """


class GenericAPIError(Exception):
    """
    Generic API error.
    """


def is_camel_case(s):
    """
    Return True if the string mixes upper and lower case and has no underscore.
    """
    return s != s.lower() and s != s.upper() and "_" not in s


def snake_to_camel(word):
    """
    Convert a snake_case name to CamelCase, e.g. ``account_summary`` becomes
    ``AccountSummary``. Names that are already camel case are returned as-is.
    """
    if is_camel_case(word):
        return word
    # An empty segment (leading, trailing or doubled underscore) is kept as "_"
    return "".join(x.capitalize() or "_" for x in word.split("_"))


DEFAULT_HEADERS = {
    "accept": "application/json",
    "Content-Type": "application/json",
}


class BaseClient(ABC):
    """
    Abstract base class for asynchronous API clients.

    Instances are created by awaiting the class, e.g.
    ``client = await SomeClient(instance)``: ``__new__`` is a coroutine which
    allocates the object, awaits ``__init__`` (which in turn awaits
    ``connect``) and returns the initialised instance.

    ``call_method`` reads ``self.url``, which is not set in this class
    (presumably provided by the subclass - confirm against implementations).
    """

    # Bearer token sent in the Authorization header when it is not None
    token = None

    async def __new__(cls, *a, **kw):
        # Returning a coroutine instead of an instance stops Python from
        # calling __init__ implicitly, so it is awaited explicitly here.
        instance = super().__new__(cls)
        await instance.__init__(*a, **kw)
        return instance

    async def __init__(self, instance):
        """
        Initialise the client.
        :param instance: the database object, e.g. Aggregator
        """
        name = self.__class__.__name__
        # e.g. a class named FooClient gets the name "foo"
        self.name = name.replace("Client", "").lower()
        self.instance = instance
        self.client = None
        await self.connect()

    @abstractmethod
    async def connect(self):
        """
        Connect the client. Awaited at the end of initialisation.
        """
        pass

    @property
    def schema(self):
        """
        Get the schema library for the client.
        The library is the attribute of ``schemas`` named ``<client name>_s``.
        :raises NoSchema: If there is no schema library for the client
        """
        library_name = f"{self.name}_s"
        # Does the schemas library have a library for this client name?
        try:
            return getattr(schemas, library_name)
        except AttributeError:
            log.error(f"No schema library for {self.name}")
            # NoSchema (rather than a bare Exception) so that call() can fall
            # back to the unvalidated response, as it does for a missing schema
            raise NoSchema(f"No schema library for client {self.name}") from None

    def get_schema(self, method, convert=False):
        """
        Look up the schema for a method in the client's schema library.
        :param method: the method name, or an object whose class name is used
        :param convert: if True, look up the conversion schema, which is named
                        ``<Method>Schema``
        :raises NoSchema: If the schema or the schema library is not found
        """
        if isinstance(method, str):
            to_camel = snake_to_camel(method)
        else:
            to_camel = snake_to_camel(method.__class__.__name__)
        if convert:
            to_camel = f"{to_camel}Schema"

        library = self.schema
        try:
            return getattr(library, to_camel)
        except AttributeError:
            raise NoSchema(f"Could not get schema: {to_camel}") from None

    async def call_method(self, method, *args, **kwargs):
        """
        Call a method with aiohttp.
        :param method: the path component appended to ``self.url``
        :param append_slash: keyword, append a trailing slash (default True)
        :param http_method: keyword, name of the aiohttp session method to
                            use (default "get")
        :param data: keyword, if present it is serialised with orjson and
                     sent as the request body
        :return: the decoded JSON response
        """
        if kwargs.get("append_slash", True):
            path = f"{self.url}/{method}/"
        else:
            path = f"{self.url}/{method}"

        http_method = kwargs.get("http_method", "get")

        # Copy the defaults: adding the Authorization header to the shared
        # module-level dict would leak this client's token into every other
        # request made afterwards.
        headers = dict(DEFAULT_HEADERS)

        # Use the token if it's set
        if self.token is not None:
            headers["Authorization"] = f"Bearer {self.token}"

        cast = {"headers": headers}
        if "data" in kwargs:
            cast["data"] = orjson.dumps(kwargs["data"])

        # Use the method to send a HTTP request
        async with aiohttp.ClientSession() as session:
            session_method = getattr(session, http_method)
            async with session_method(path, **cast) as response:
                response_json = await response.json()
                return response_json

    def convert_spec(self, response, method):
        """
        Convert an API response to the requested spec.
        :raises NoSchema: If the conversion schema is not found
        """
        schema = self.get_schema(method, convert=True)

        # Use glom to convert the response to the schema
        converted = glom(response, schema)
        return converted

    def validate_response(self, response, method):
        """
        Validate an API response against the schema for the method.
        :return: a dict of the validated response
        :raises NoSchema: If the schema for the method is not found
        :raises GenericAPIError: If the response fails validation
        """
        schema = self.get_schema(method)

        # Return a dict of the validated response
        try:
            response_valid = schema(**response).dict()
        except ValidationError as e:
            log.error(f"Error validating {method} response: {response}")
            log.error(f"Errors: {e}")
            raise GenericAPIError("Error validating response") from e
        return response_valid

    def method_filter(self, method):
        """
        Return a new method.
        The base implementation returns the method unchanged.
        """
        return method

    async def call(self, method, *args, **kwargs):
        """
        Call the exchange API and validate the response.
        If the validation or conversion schema is not found, the error is
        logged and the response is passed through that step unchanged.
        :param schema: keyword, schema name to use instead of the method name
        :raises GenericAPIError: If the response cannot be validated
        """
        # NOTE: errors raised by call_method are not caught here and
        # propagate to the caller.
        response = await self.call_method(method, *args, **kwargs)

        if "schema" in kwargs:
            method = kwargs["schema"]
        else:
            method = self.method_filter(method)

        try:
            response_valid = self.validate_response(response, method)
        except NoSchema as e:
            log.error(f"{e} - {response}")
            response_valid = response

        # Convert the response to a format that we can use
        try:
            response_converted = self.convert_spec(response_valid, method)
        except NoSchema as e:
            log.error(f"{e} - {response}")
            response_converted = response_valid

        return response_converted