You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

92 lines
3.0 KiB
Python

from oandapyV20 import API
from oandapyV20.endpoints import accounts, orders, positions, trades
from pydantic import ValidationError
from core.exchanges import BaseExchange
from core.lib.schemas import oanda_s
OANDA_SCHEMA_MAPPING = {"OpenPositions": oanda_s.OpenPositions}
class OANDAExchange(BaseExchange):
def call(self, method, request):
self.client.request(request)
response = request.response
if isinstance(response, list):
response = {"itemlist": response}
if method not in self.schema:
self.log.error(f"Method cannot be validated: {method}")
self.log.debug(f"Response: {response}")
return (False, f"Method cannot be validated: {method}")
try:
# Return a dict of the validated response
response_valid = self.schema[method](**response).dict()
# Convert the response to a format that we can use
response_converted = self.convert_spec(response_valid, method)
return (True, response_converted)
except ValidationError as e:
self.log.error(f"Could not validate response: {e}")
return (False, e)
def set_schema(self):
self.schema = OANDA_SCHEMA_MAPPING
def connect(self):
self.client = API(access_token=self.account.api_secret)
self.account_id = self.account.api_key
def get_account(self):
r = accounts.AccountDetails(self.account_id)
self.client.request(r)
return r.response
def get_supported_assets(self):
return False
def get_balance(self):
raise NotImplementedError
def get_market_value(self, symbol):
raise NotImplementedError
def post_trade(self, trade):
raise NotImplementedError
r = orders.OrderCreate(accountID, data=data)
self.client.request(r)
return r.response
def get_trade(self, trade_id):
r = accounts.TradeDetails(accountID=self.account_id, tradeID=trade_id)
self.client.request(r)
return r.response
def update_trade(self, trade):
raise NotImplementedError
r = orders.OrderReplace(
accountID=self.account_id, orderID=trade.order_id, data=data
)
self.client.request(r)
return r.response
def cancel_trade(self, trade_id):
raise NotImplementedError
def get_position_info(self, symbol):
r = positions.PositionDetails(self.account_id, symbol)
self.client.request(r)
return r.response
def get_all_positions(self):
items = []
r = positions.OpenPositions(accountID=self.account_id)
success, response = self.call("OpenPositions", r)
if not success:
return (success, response)
print("Positions", response)
for item in response["itemlist"]:
item["account_id"] = self.account.id
item["unrealized_pl"] = float(item["unrealized_pl"])
items.append(item)
return (True, items)