from alpaca.common.exceptions import APIError
from alpaca.trading.client import TradingClient
from alpaca.trading.enums import OrderSide, TimeInForce
from alpaca.trading.requests import (
    GetAssetsRequest,
    LimitOrderRequest,
    MarketOrderRequest,
)

from core.exchanges import BaseExchange
from core.lib.schemas import alpaca_s

# Schema classes keyed by the call name passed to ``self.call``.
# NOTE(review): how BaseExchange consumes this mapping is not visible in this
# file -- presumably it validates/normalises the raw client response.
ALPACA_SCHEMA_MAPPING = {
    "get_account": alpaca_s.GetAccount,
    "get_all_assets": alpaca_s.GetAllAssets,
    "get_all_positions": alpaca_s.GetAllPositions,
    "get_open_position": alpaca_s.GetOpenPosition,
}


class AlpacaExchange(BaseExchange):
    """Exchange adapter backed by the alpaca-py ``TradingClient``.

    Most methods return a ``(success, payload)`` tuple where ``payload`` is
    the result on success and an error value otherwise. ``get_market_value``
    is the exception: it returns a float, or ``False`` on API error.
    """

    def set_schema(self):
        """Attach the Alpaca call-name -> schema mapping to this instance."""
        self.schema = ALPACA_SCHEMA_MAPPING

    def connect(self):
        """Create the trading client from the credentials on ``self.account``.

        ``raw_data=True`` is passed so responses can be indexed like dicts,
        which is how the rest of this class reads them.
        """
        self.client = TradingClient(
            self.account.api_key,
            self.account.api_secret,
            paper=self.account.sandbox,
            raw_data=True,
        )

    def get_account(self):
        """Return the result of the ``get_account`` call unchanged."""
        return self.call("get_account")

    def get_supported_assets(self):
        """Return ``(True, symbols)`` for active crypto assets.

        On failure the ``(success, error)`` pair from ``self.call`` is
        returned as-is. Entries without a ``"symbol"`` key are skipped.
        """
        request = GetAssetsRequest(status="active", asset_class="crypto")
        success, assets = self.call("get_all_assets", filter=request)
        if not success:
            return (success, assets)

        asset_list = [
            asset["symbol"] for asset in assets["itemlist"] if "symbol" in asset
        ]
        print("Supported symbols", asset_list)
        return (True, asset_list)

    def get_balance(self):
        """Return ``(True, equity)`` with the account equity as a float.

        Returns ``(False, "Invalid balance")`` when the equity value cannot
        be converted to a float, and the failed ``self.call`` result when the
        account lookup itself fails.
        """
        success, account_info = self.call("get_account")
        if not success:
            return (success, account_info)

        equity = account_info["equity"]
        try:
            balance = float(equity)
        except (TypeError, ValueError):
            # TypeError covers a missing (None) equity, which float() rejects
            # with a different exception than a malformed string.
            return (False, "Invalid balance")
        return (True, balance)

    def get_market_value(self, symbol):  # TODO: pydantic
        """Return the market value of the open position in ``symbol``.

        Returns ``False`` (after logging) when the API call fails.
        """
        try:
            # alpaca-py's TradingClient exposes ``get_open_position``; it has
            # no ``get_position`` method.
            position = self.client.get_open_position(symbol)
        except APIError as e:
            self.log.error(f"Could not get market value for {symbol}: {e}")
            return False
        return float(position["market_value"])

    def post_trade(self, trade):  # TODO: pydantic
        """Submit ``trade`` as a market or limit order.

        Returns ``(True, order)`` on success, after recording the response,
        order ids and ``"posted"`` status on ``trade`` and saving it.
        Returns ``(False, reason)`` when no amount is given, when a limit
        order has no price, or when the API rejects the order (in which case
        ``trade.status`` is set to ``"error"`` and the trade is saved).

        Raises:
            ValueError: if ``trade.direction`` is not ``"buy"``/``"sell"`` or
                ``trade.type`` is not ``"market"``/``"limit"``.
        """
        # the trade is not placed yet
        if trade.direction == "buy":
            direction = OrderSide.BUY
        elif trade.direction == "sell":
            direction = OrderSide.SELL
        else:
            raise ValueError("Unknown direction")

        cast = {
            "symbol": trade.symbol,
            "side": direction,
            "time_in_force": TimeInForce.IOC,
        }
        if trade.amount is not None:
            cast["qty"] = trade.amount
        if trade.amount_usd is not None:
            cast["notional"] = trade.amount_usd
        if not trade.amount and not trade.amount_usd:
            return (False, "No amount specified")

        if trade.take_profit:
            cast["take_profit"] = {"limit_price": trade.take_profit}
        if trade.stop_loss:
            # The stop's limit price is placed 0.5% below the stop price.
            stop_limit_price = trade.stop_loss - (trade.stop_loss * 0.005)
            cast["stop_loss"] = {
                "stop_price": trade.stop_loss,
                "limit_price": stop_limit_price,
            }

        if trade.type == "market":
            request_cls = MarketOrderRequest
        elif trade.type == "limit":
            if not trade.price:
                return (False, "Limit order with no price")
            cast["limit_price"] = trade.price
            request_cls = LimitOrderRequest
        else:
            raise ValueError("Unknown trade type")

        # Built outside the try so only the network call is guarded.
        order_data = request_cls(**cast)
        try:
            order = self.client.submit_order(order_data=order_data)
        except APIError as e:
            # trade.type is exactly "market" or "limit" at this point.
            self.log.error(f"Error placing {trade.type} order: {e}")
            trade.status = "error"
            trade.save()
            return (False, e)

        trade.response = order
        trade.status = "posted"
        trade.order_id = order["id"]
        trade.client_order_id = order["client_order_id"]
        trade.save()
        return (True, order)

    def get_trade(self, trade_id):
        """Not implemented; returns ``None``."""
        pass

    def update_trade(self, trade):
        """Not implemented; returns ``None``."""
        pass

    def cancel_trade(self, trade_id):
        """Not implemented; returns ``None``."""
        pass

    def get_position_info(self, symbol):
        """Return ``(success, position)`` for the open position in ``symbol``."""
        success, position = self.call("get_open_position", symbol)
        return (success, position)

    def get_all_positions(self):
        """Return ``(True, positions)`` for every open position.

        Each position is tagged with this account's name and id, and its
        ``unrealized_pl`` is converted to a float. On failure the
        ``(success, error)`` pair from ``self.call`` is returned as-is.
        """
        items = []
        success, response = self.call("get_all_positions")
        if not success:
            return (success, response)

        for item in response["itemlist"]:
            item["account"] = self.account.name
            item["account_id"] = self.account.id
            item["unrealized_pl"] = float(item["unrealized_pl"])
            items.append(item)
        return (True, items)