from glom import glom from pydantic import ValidationError from core.lib import schemas from core.util import logs # Return error if the schema for the message type is not found STRICT_VALIDATION = False # Raise exception if the conversion schema is not found STRICT_CONVERSTION = False # TODO: Set them to True when all message types are implemented class BaseExchange(object): def __init__(self, account): name = self.__class__.__name__ self.name = name.replace("Exchange", "").lower() self.account = account self.log = logs.get_logger(self.name) self.client = None self.set_schema() self.connect() def set_schema(self): raise NotImplementedError def connect(self): raise NotImplementedError def convert_spec(self, response, msg_type): # Does the schemas library have a library for this exchange name? if hasattr(schemas, f"{self.name}_s"): schema_instance = getattr(schemas, f"{self.name}_s") else: raise Exception(f"No schema for {self.name} in schema mapping") # Does the message type have a conversion spec for this message type? if hasattr(schema_instance, f"{msg_type}_schema"): schema = getattr(schema_instance, f"{msg_type}_schema") else: # Let us know so we can implement it, but don't do anything with it self.log.error(f"No schema for message: {msg_type} - {response}") if STRICT_CONVERSION: raise Exception(f"No schema for {msg_type} in schema mapping") return response # Use glom to convert the response to the schema converted = glom(response, schema) print(f"[{self.name}] Converted of {msg_type}: {converted}") return converted def call(self, method, *args, **kwargs) -> (bool, dict): if hasattr(self.client, method): try: response = getattr(self.client, method)(*args, **kwargs) if isinstance(response, list): response = {"itemlist": response} if method not in self.schema: self.log.error(f"Method cannot be validated: {method}") self.log.debug(f"Response: {response}") if STRICT_VALIDATION: return (False, f"Method cannot be validated: {method}") return (True, response) # Return a dict of the validated response response_valid = self.schema[method](**response).dict() # Convert the response to a format that we can use response_converted = self.convert_spec(response_valid, method) return (True, response_converted) except ValidationError as e: self.log.error(f"Could not validate response: {e}") return (False, e) except Exception as e: self.log.error(f"Error calling {method}: {e}") return (False, e) else: return (False, "No such method") def get_account(self): raise NotImplementedError def get_supported_assets(self): raise NotImplementedError def get_balance(self): raise NotImplementedError def get_market_value(self, symbol): raise NotImplementedError def post_trade(self, trade): raise NotImplementedError def get_trade(self, trade_id): raise NotImplementedError def update_trade(self, trade): raise NotImplementedError def cancel_trade(self, trade_id): raise NotImplementedError def get_position_info(self, symbol): raise NotImplementedError def get_all_positions(self): raise NotImplementedError