from abc import ABC, abstractmethod

from alpaca.common.exceptions import APIError
from glom import glom
from oandapyV20.exceptions import V20Error
from pydantic.error_wrappers import ValidationError

from core.lib import schemas
from core.util import logs

# Return error if the schema for the message type is not found
STRICT_VALIDATION = False

# Raise exception if the conversion schema is not found
STRICT_CONVERSION = False

# TODO: Set them to True when all message types are implemented

log = logs.get_logger("exchanges")


class NoSchema(Exception):
    """
    Raised when:
    - The schema for the message type is not found
    - The conversion schema is not found
    - There is no schema library for the exchange
    """

    pass


class NoSuchMethod(Exception):
    """
    Exchange library has no such method.
    """

    pass


class GenericAPIError(Exception):
    """
    Generic API error.
    """

    pass


class ExchangeError(Exception):
    """
    Exchange error.
    """

    pass


def is_camel_case(s: str) -> bool:
    """
    Return True if the string mixes upper and lower case and has no underscore.
    """
    return s != s.lower() and s != s.upper() and "_" not in s


def snake_to_camel(word: str) -> str:
    """
    Convert a snake_case name to CamelCase.

    Names that already look camel-cased (see is_camel_case) are returned
    unchanged. Each underscore-separated segment is passed through
    str.capitalize(), which also lowercases the rest of the segment.
    """
    if is_camel_case(word):
        return word
    # An empty segment (leading, trailing or doubled underscore) is kept as "_"
    return "".join(x.capitalize() or "_" for x in word.split("_"))


class BaseExchange(ABC):
    """
    Abstract base class for exchange integrations.

    Subclasses implement connect() and the exchange-specific operations
    declared as abstract methods below.
    """

    def __init__(self, account):
        """
        Store the account and connect to the exchange.

        The exchange name is derived from the class name by removing
        "Exchange" and lowercasing the rest. connect() is called immediately.
        """
        name = self.__class__.__name__
        self.name = name.replace("Exchange", "").lower()
        self.account = account
        self.client = None

        self.connect()

    @abstractmethod
    def connect(self):
        pass

    @property
    def schema(self):
        """
        Get the schema library for the exchange.

        :raises NoSchema: If there is no schema library for the exchange
        """
        # Does the schemas library have a library for this exchange name?
        library_name = f"{self.name}_s"
        if not hasattr(schemas, library_name):
            log.error(f"No schema library for {self.name}")
            # NoSchema (not a bare Exception) so that call() can fall back
            # to the unvalidated response, as it does for a missing schema.
            raise NoSchema(f"No schema library for exchange {self.name}")
        return getattr(schemas, library_name)

    def get_schema(self, method, convert=False):
        """
        Look up the schema for a method in the exchange schema library.

        :param method: Method name, or an object whose class name is used
        :param convert: If True, look up the conversion schema, which is
            named after the method with a "Schema" suffix
        :raises NoSchema: If the schema, or the schema library, is not found
        """
        if isinstance(method, str):
            to_camel = snake_to_camel(method)
        else:
            to_camel = snake_to_camel(method.__class__.__name__)
        if convert:
            to_camel = f"{to_camel}Schema"

        # Resolve the property once; it may raise NoSchema itself
        library = self.schema
        if not hasattr(library, to_camel):
            raise NoSchema(f"Could not get schema: {to_camel}")
        return getattr(library, to_camel)

    def call_method(self, method, *args, **kwargs):
        """
        Call a method on the exchange client and return its response.

        A list response is wrapped as {"itemlist": response}.

        :raises NoSuchMethod: If the client has no attribute with that name
        """
        if not hasattr(self.client, method):
            raise NoSuchMethod(f"Exchange {self.name} has no method: {method}")

        response = getattr(self.client, method)(*args, **kwargs)
        if isinstance(response, list):
            response = {"itemlist": response}
        return response

    def convert_spec(self, response, method):
        """
        Convert an API response to the requested spec.
        :raises NoSchema: If the conversion schema is not found
        """
        schema = self.get_schema(method, convert=True)

        # Use glom to convert the response to the schema
        converted = glom(response, schema)
        return converted

    def validate_response(self, response, method):
        """
        Validate a response against the schema for the method.

        :return: The validated response as a dict
        :raises NoSchema: If the schema for the method is not found
        :raises GenericAPIError: If the response fails validation
        """
        schema = self.get_schema(method)
        # Return a dict of the validated response
        try:
            response_valid = schema(**response).dict()
        except ValidationError as e:
            log.error(f"Error validating {method} response: {response}")
            log.error(f"Errors: {e}")
            raise GenericAPIError("Error validating response") from e
        return response_valid

    def call(self, method, *args, **kwargs):
        """
        Call the exchange API, then validate and convert the response.

        A missing validation schema leaves the response unvalidated, and a
        missing conversion schema leaves it unconverted; both are logged.

        :raises GenericAPIError: If the API call fails or the response
            cannot be validated
        :raises NoSuchMethod: If the client has no such method
        """
        try:
            response = self.call_method(method, *args, **kwargs)
        except (APIError, V20Error) as e:
            log.error(f"Error calling method {method}: {e}")
            raise GenericAPIError(e) from e

        try:
            response_valid = self.validate_response(response, method)
        except NoSchema as e:
            log.error(f"{e} - {response}")
            response_valid = response

        # Convert the response to a format that we can use
        try:
            response_converted = self.convert_spec(response_valid, method)
        except NoSchema as e:
            log.error(f"{e} - {response}")
            response_converted = response_valid

        return response_converted

    @abstractmethod
    def get_account(self):
        pass

    def extract_instrument(self, instruments, instrument):
        """
        Return the first entry in instruments["itemlist"] whose "name"
        equals instrument, or None if there is no match.
        """
        for x in instruments["itemlist"]:
            if x["name"] == instrument:
                return x
        return None

    # Exchange-specific operations. Their contracts are defined by the
    # concrete exchange classes, not here.

    @abstractmethod
    def get_currencies(self, symbols):
        pass

    @abstractmethod
    def get_instruments(self):
        pass

    @abstractmethod
    def get_supported_assets(self):
        pass

    @abstractmethod
    def get_balance(self):
        pass

    @abstractmethod
    def get_market_value(self, symbol):
        pass

    @abstractmethod
    def post_trade(self, trade):
        pass

    @abstractmethod
    def close_trade(self, trade_id):
        pass

    @abstractmethod
    def get_trade(self, trade_id):
        pass

    @abstractmethod
    def update_trade(self, trade):
        pass

    @abstractmethod
    def cancel_trade(self, trade_id):
        pass

    @abstractmethod
    def get_position_info(self, symbol):
        pass

    @abstractmethod
    def get_all_positions(self):
        pass

    @abstractmethod
    def get_all_open_trades(self):
        pass

    @abstractmethod
    def close_position(self, side, symbol):
        pass

    @abstractmethod
    def close_all_positions(self):
        pass