You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

228 lines
5.7 KiB
Python

from abc import ABC, abstractmethod
from alpaca.common.exceptions import APIError
from glom import glom
from oandapyV20.exceptions import V20Error
from core.lib import schemas
from core.util import logs
# Return error if the schema for the message type is not found
STRICT_VALIDATION = False
# Raise exception if the conversion schema is not found
STRICT_CONVERSION = False
# TODO: Set them to True when all message types are implemented
log = logs.get_logger("exchanges")
class NoSchema(Exception):
"""
Raised when:
- The schema for the message type is not found
- The conversion schema is not found
- There is no schema library for the exchange
"""
pass
class NoSuchMethod(Exception):
"""
Exchange library has no such method.
"""
pass
class GenericAPIError(Exception):
"""
Generic API error.
"""
pass
class ExchangeError(Exception):
"""
Exchange error.
"""
pass
def is_camel_case(s):
return s != s.lower() and s != s.upper() and "_" not in s
def snake_to_camel(word):
if is_camel_case(word):
return word
return "".join(x.capitalize() or "_" for x in word.split("_"))
class BaseExchange(ABC):
def __init__(self, account):
name = self.__class__.__name__
self.name = name.replace("Exchange", "").lower()
self.account = account
self.client = None
self.connect()
@abstractmethod
def connect(self):
pass
@property
def schema(self):
"""
Get the schema library for the exchange.
"""
# Does the schemas library have a library for this exchange name?
if hasattr(schemas, f"{self.name}_s"):
schema_instance = getattr(schemas, f"{self.name}_s")
else:
log.error(f"No schema library for {self.name}")
raise Exception(f"No schema library for exchange {self.name}")
return schema_instance
def get_schema(self, method, convert=False):
if isinstance(method, str):
to_camel = snake_to_camel(method)
else:
to_camel = snake_to_camel(method.__class__.__name__)
if convert:
to_camel = f"{to_camel}Schema"
# if hasattr(self.schema, method):
# schema = getattr(self.schema, method)
if hasattr(self.schema, to_camel):
schema = getattr(self.schema, to_camel)
else:
raise NoSchema(f"Could not get schema: {to_camel}")
return schema
def call_method(self, method, *args, **kwargs):
"""
Get a method from the exchange library.
"""
if hasattr(self.client, method):
response = getattr(self.client, method)(*args, **kwargs)
if isinstance(response, list):
response = {"itemlist": response}
return response
else:
raise NoSuchMethod
def convert_spec(self, response, method):
"""
Convert an API response to the requested spec.
:raises NoSchema: If the conversion schema is not found
"""
schema = self.get_schema(method, convert=True)
# Use glom to convert the response to the schema
converted = glom(response, schema)
return converted
def validate_response(self, response, method):
schema = self.get_schema(method)
# Return a dict of the validated response
response_valid = schema(**response).dict()
return response_valid
def call(self, method, *args, **kwargs):
"""
Call the exchange API and validate the response
:raises NoSchema: If the method is not in the schema mapping
:raises ValidationError: If the response cannot be validated
"""
try:
response = self.call_method(method, *args, **kwargs)
except (APIError, V20Error) as e:
log.error(f"Error calling method {method}: {e}")
raise GenericAPIError(e)
try:
response_valid = self.validate_response(response, method)
except NoSchema as e:
log.error(f"{e} - {response}")
response_valid = response
# Convert the response to a format that we can use
try:
response_converted = self.convert_spec(response_valid, method)
except NoSchema as e:
log.error(f"{e} - {response}")
response_converted = response_valid
# return (True, response_converted)
return response_converted
# except Exception as e:
# log.error(f"Error calling method: {e}")
# raise GenericAPIError(e)
@abstractmethod
def get_account(self):
pass
def extract_instrument(self, instruments, instrument):
for x in instruments["itemlist"]:
if x["name"] == instrument:
return x
return None
@abstractmethod
def get_currencies(self, symbols):
pass
@abstractmethod
def get_instruments(self):
pass
@abstractmethod
def get_supported_assets(self):
pass
@abstractmethod
def get_balance(self):
pass
@abstractmethod
def get_market_value(self, symbol):
pass
@abstractmethod
def post_trade(self, trade):
pass
@abstractmethod
def get_trade(self, trade_id):
pass
@abstractmethod
def update_trade(self, trade):
pass
@abstractmethod
def cancel_trade(self, trade_id):
pass
@abstractmethod
def get_position_info(self, symbol):
pass
@abstractmethod
def get_all_positions(self):
pass
@abstractmethod
def close_position(self, side, symbol):
pass
@abstractmethod
def close_all_positions(self):
pass