fisk/core/lib/market.py

641 lines
22 KiB
Python

from datetime import datetime
from decimal import Decimal as D
from core.exchanges import GenericAPIError
from core.lib.notify import sendmsg
from core.models import Account, Strategy, Trade
from core.util import logs
log = logs.get_logger(__name__)
def check_existing_position(
func: str,
position: dict,
open_side: str,
open_symbol: str,
open_units: str,
new_side: str,
new_symbol: str,
trade_side_opposite: str,
):
# Check if we already have a position for the symbol
if open_symbol == new_symbol:
# If the live side is the inverse of what we want to do,
# we can't open a position
if open_side == trade_side_opposite:
# If there is a position open, we can't open a new one in the opposite
# direction
if open_units != "0":
# If we have a short on GBP/AUD, we can only place more shorts on
# GBP/AUD.
if func == "entry":
log.debug(
f"Refusing to open new {new_side} position on {new_symbol} due "
f"to {open_side} position on {open_symbol}"
)
return {
"action": "rejected",
"positions": position,
}
elif func == "exit":
log.debug(
(
f"Found {open_units} units of "
f"{open_symbol} on side {trade_side_opposite}"
)
)
# Pass back opposing side so we can close it
return {
"action": "close",
"side": trade_side_opposite,
"positions": position,
}
return False
def check_conflicting_position(
func: str,
position: dict,
open_base: str,
open_quote: str,
open_side: str,
open_symbol: str,
open_units: str,
new_base: str,
new_quote: str,
new_side: str,
new_symbol: str,
trade_side_opposite: str,
):
if open_base == new_quote or open_quote == new_base:
# If we have a long on GBP/AUD, we can only place shorts on XAU/GBP.
if open_side != trade_side_opposite:
if open_units != "0":
# Only do this for entries
if func == "entry":
log.debug(
f"Refusing to open {new_side} position on {new_symbol} due to "
f"{open_side} position on {open_symbol}"
)
return {
"action": "rejected",
"positions": position,
}
def crossfilter(account, new_symbol, new_direction, func):
"""
Determine if we are betting against ourselves.
Checks open positions for the account, rejecting the trade if there is one
with an opposite direction to this one.
:param account: Account object
:param symbol: Symbol
:param direction: Direction of the trade
:param func: Whether we are checking entries or exits
:return: dict of action and opposing position, or False
"""
try:
# Only get the data we need
if func == "entry":
all_positions = account.client.get_all_positions()
else:
all_positions = [account.client.get_position_info(new_symbol)]
except GenericAPIError as e:
if "No position exists for the specified instrument" in str(e):
log.debug("No position exists for this symbol")
return False
else:
log.error(f"Error getting position info: {e}")
return None
if new_direction == "buy":
opposing_side = "short"
new_side = "long"
elif new_direction == "sell":
opposing_side = "long"
new_side = "short"
quotes = []
new_base, new_quote = new_symbol.split("_")
for position in all_positions:
# For Forex, get a list of all the quotes.
# This is to prevent betting against ourselves.
# Consider we have a long position open, EUR/USD, and we want to open a
# long position on USD/JPY. If the first goes up, the second one will go
# down just as much. We won't make any money.
if "_" in position["symbol"]:
open_base, open_quote = position["symbol"].split("_")
quotes.append(open_quote)
open_symbol = position["symbol"]
open_side = position["side"]
open_base, open_quote = open_symbol.split("_")
# Check if we already have a position
existing_position_check = check_existing_position(
func=func,
position=position,
open_side=open_side,
open_symbol=open_symbol,
open_units=position["units"],
new_side=new_side,
new_symbol=new_symbol,
trade_side_opposite=opposing_side,
)
if existing_position_check:
return existing_position_check
# Check if we are betting against ourselves
conflicting_position_check = check_conflicting_position(
func=func,
position=position,
open_base=open_base,
open_quote=open_quote,
open_side=open_side,
open_symbol=open_symbol,
open_units=position["units"],
new_base=new_base,
new_quote=new_quote,
new_side=new_side,
new_symbol=new_symbol,
trade_side_opposite=opposing_side,
)
if conflicting_position_check:
return conflicting_position_check
return False
def get_pair(account, base, quote, invert=False):
"""
Get the pair for the given account and currencies.
:param account: Account object
:param base: Base currency
:param quote: Quote currency
:param invert: Invert the pair
:return: currency symbol, e.g. BTC_USD, BTC/USD, etc.
"""
# Currently we only have two exchanges with different pair separators
if account.exchange == "alpaca":
separator = "/"
elif account.exchange == "oanda":
separator = "_"
# Flip the pair if needed
if invert:
symbol = f"{quote.upper()}{separator}{base.upper()}"
else:
symbol = f"{base.upper()}{separator}{quote.upper()}"
# Check it exists
if symbol not in account.supported_symbols:
return False
return symbol
def to_currency(direction, account, amount, from_currency, to_currency):
"""
Convert an amount from one currency to another.
:param direction: Direction of the trade
:param account: Account object
:param amount: Amount to convert
:param from_currency: Currency to convert from
:param to_currency: Currency to convert to
:return: Converted amount
"""
inverted = False
# This is needed because OANDA has different values for bid and ask
if direction == "buy":
price_index = "bids"
elif direction == "sell":
price_index = "asks"
symbol = get_pair(account, from_currency, to_currency)
if not symbol:
symbol = get_pair(account, from_currency, to_currency, invert=True)
inverted = True
# Bit of a hack but it works
if not symbol:
log.error(f"Could not find symbol for {from_currency} -> {to_currency}")
raise Exception("Could not find symbol")
try:
prices = account.client.get_currencies([symbol])
except GenericAPIError as e:
log.error(f"Error getting currencies and inverted currencies: {e}")
return None
price = D(prices["prices"][0][price_index][0]["price"])
# If we had to flip base and quote, we need to use the reciprocal of the price
if inverted:
price = D(1.0) / price
# Convert the amount to the destination currency
converted = D(amount) * price
return converted
def get_price(account, direction, symbol):
"""
Get the price for a given symbol.
:param account: Account object
:param direction: direction of the trade
:param symbol: symbol
:return: price of bid for buys, price of ask for sells
"""
if direction == "buy":
price_index = "bids"
elif direction == "sell":
price_index = "asks"
try:
prices = account.client.get_currencies([symbol])
except GenericAPIError as e:
log.error(f"Error getting currencies: {e}")
return None
price = D(prices["prices"][0][price_index][0]["price"])
return price
def get_trade_size_in_base(direction, account, strategy, cash_balance, base):
"""
Get the trade size in the base currency.
:param direction: Direction of the trade
:param account: Account object
:param strategy: Strategy object
:param cash_balance: Cash balance in the Account's base currency
:param base: Base currency
:return: Trade size in the base currency
"""
# Convert the trade size in percent to a ratio
trade_size_as_ratio = D(strategy.trade_size_percent) / D(100)
log.debug(f"Trade size as ratio: {trade_size_as_ratio}")
# Multiply with cash balance to get the trade size in the account's
# base currency
amount_fiat = D(trade_size_as_ratio) * D(cash_balance)
log.debug(f"Trade size: {amount_fiat}")
# Convert the trade size to the base currency
if account.currency.lower() == base.lower():
trade_size_in_base = amount_fiat
else:
trade_size_in_base = to_currency(
direction, account, amount_fiat, account.currency, base
)
log.debug(f"Trade size in base: {trade_size_in_base}")
return trade_size_in_base
def get_tp(direction, take_profit_percent, price):
"""
Get the take profit price.
:param direction: Direction of the trade
:param strategy: Strategy object
:param price: Entry price
"""
# Convert to ratio
take_profit_as_ratio = D(take_profit_percent) / D(100)
log.debug(f"Take profit as ratio: {take_profit_as_ratio}")
take_profit_var = D(price) * D(take_profit_as_ratio)
log.debug(f"Take profit var: {take_profit_var}")
if direction == "buy":
take_profit = D(price) + D(take_profit_var)
elif direction == "sell":
take_profit = D(price) - D(take_profit_var)
log.debug(f"Take profit: {take_profit}")
return take_profit
def get_sl(direction, stop_loss_percent, price, return_var=False):
"""
Get the stop loss price.
Also used for trailing stop loss.
:param direction: Direction of the trade
:param strategy: Strategy object
:param price: Entry price
"""
# Convert to ratio
stop_loss_as_ratio = D(stop_loss_percent) / D(100)
log.debug(f"Stop loss as ratio: {stop_loss_as_ratio}")
stop_loss_var = D(price) * D(stop_loss_as_ratio)
log.debug(f"Stop loss var: {stop_loss_var}")
if return_var:
return stop_loss_var
if direction == "buy":
stop_loss = D(price) - D(stop_loss_var)
elif direction == "sell":
stop_loss = D(price) + D(stop_loss_var)
log.debug(f"Stop loss: {stop_loss}")
return stop_loss
def get_tp_sl(direction, strategy, price):
"""
Get the take profit and stop loss prices.
:param direction: Direction of the trade
:param strategy: Strategy object
:param price: Price of the trade
:return: Take profit and stop loss prices
"""
take_profit = get_tp(direction, strategy.take_profit_percent, price)
stop_loss = get_sl(direction, strategy.stop_loss_percent, price)
cast = {"tp": take_profit, "sl": stop_loss}
# Look up the TSL if required by the strategy
if strategy.trailing_stop_loss_percent:
trailing_stop_loss = get_sl(
direction, strategy.trailing_stop_loss_percent, price, return_var=True
)
cast["tsl"] = trailing_stop_loss
return cast
def get_price_bound(direction, strategy, price, current_price):
"""
Get the price bound for a given price using the slippage from the strategy.
* Check that the price of the callback is within the callback price deviation of the
current price
* Calculate the price bounds such that the maximum slippage should be within the
price slippage relative to the current price.
Note that the maximum actual slippage may be as high as the sum of these two values.
:param direction: Direction of the trade
:param strategy: Strategy object
:param price: Price of the trade
:param current_price: current price from the exchange
:return: Price bound
"""
# Convert the callback price deviation to a ratio
callback_price_deviation_as_ratio = D(
strategy.callback_price_deviation_percent
) / D(100)
log.debug(f"Callback price deviation as ratio: {callback_price_deviation_as_ratio}")
maximum_price_deviation = D(current_price) * D(callback_price_deviation_as_ratio)
# Ensure the current price is within price_slippage_as_ratio of the callback price
if abs(current_price - price) <= maximum_price_deviation:
log.debug("Current price is within price deviation of callback price")
else:
log.error("Current price is not within price deviation of callback price")
log.debug(f"Difference: {abs(current_price - price)}")
return None
# Convert the maximum price slippage to a ratio
price_slippage_as_ratio = D(strategy.price_slippage_percent) / D(100)
log.debug(f"Maximum price slippage as ratio: {price_slippage_as_ratio}")
# Calculate the price bound by multiplying with the price
# The price bound is the worst price we are willing to pay for the trade
price_slippage = D(current_price) * D(price_slippage_as_ratio)
log.debug(f"Maximum deviation from callback price: {price_slippage}")
current_price_slippage = D(current_price) * D(price_slippage_as_ratio)
log.debug(f"Maximum deviation from current price: {current_price_slippage}")
# Price bound is the worst price we are willing to pay for the trade
# For buys, a higher price is worse
if direction == "buy":
price_bound = D(current_price) + D(price_slippage)
# For sells, a lower price is worse
elif direction == "sell":
price_bound = D(current_price) - D(price_slippage)
log.debug(f"Price bound: {price_bound}")
return price_bound
def execute_strategy(callback, strategy, func):
"""
Execute a strategy.
:param callback: Callback object
:param strategy: Strategy object
"""
# Only check times for entries. We can always exit trades and set trends.
if func == "entry":
# Check if we can trade now!
now_utc = datetime.utcnow()
trading_times = strategy.trading_times.all()
if not trading_times:
log.error("No trading times set for strategy")
return
matches = [x.within_range(now_utc) for x in trading_times]
if not any(matches):
log.debug("Not within trading time range")
return
# Instruments supported by the account
if not strategy.account.instruments:
strategy.account.update_info()
# Refresh account object
strategy.account = Account.objects.get(id=strategy.account.id)
instruments = strategy.account.instruments
if not instruments:
log.error("No instruments found")
return
# Shorten some hook, strategy and callback vars for convenience
user = strategy.user
account = strategy.account
hook = callback.hook
signal = callback.signal
base = callback.base
quote = callback.quote
direction = signal.direction
# Don't be silly
if callback.exchange != account.exchange:
log.error("Market exchange differs from account exchange.")
return
# Get the pair we are trading
symbol = get_pair(account, base, quote)
if not symbol:
log.error(f"Symbol not supported by account: {symbol}")
return
# Extract the information for the symbol
instrument = strategy.account.client.extract_instrument(instruments, symbol)
if not instrument:
log.error(f"Symbol not found: {symbol}")
return
# Get the required precision
try:
trade_precision = instrument["tradeUnitsPrecision"]
display_precision = instrument["displayPrecision"]
except KeyError:
log.error(f"Precision not found for {symbol}")
return
# Round the received price to the display precision
price = round(D(callback.price), display_precision)
log.debug(f"Extracted price of quote: {price}")
current_price = get_price(account, direction, symbol)
log.debug(f"Callback price: {price}")
log.debug(f"Current price: {current_price}")
# Calculate price bound and round to the display precision
price_bound = get_price_bound(direction, strategy, price, current_price)
if not price_bound:
return
price_bound = round(price_bound, display_precision)
# Callback now verified
if func == "exit":
check_exit = crossfilter(account, symbol, direction, func)
if check_exit is None:
return
if not check_exit:
log.debug("Exit conditions not met.")
return
if check_exit["action"] == "close":
log.debug(f"Closing position on exit signal: {symbol}")
side = check_exit["side"]
response = account.client.close_position(side, symbol)
log.debug(f"Close position response: {response}")
return
# Set the trend
elif func == "trend":
if strategy.trends is None:
strategy.trends = {}
strategy.trends[symbol] = direction
strategy.save()
log.debug(f"Set trend for {symbol}: {direction}")
return
# Check if we are trading against the trend
if strategy.trend_signals.exists():
if strategy.trends is None:
log.debug("Refusing to trade with no trend signals received")
return
if symbol not in strategy.trends:
log.debug("Refusing to trade asset without established trend")
return
else:
if strategy.trends[symbol] != direction:
log.debug("Refusing to trade against the trend")
return
else:
log.debug(f"Trend check passed for {symbol} - {direction}")
type = strategy.order_type
# Get the account's balance in the native account currency
cash_balance = strategy.account.client.get_balance()
log.debug(f"Cash balance: {cash_balance}")
# Convert the trade size, which is currently in the account's base currency,
# to the base currency of the pair we are trading
trade_size_in_base = get_trade_size_in_base(
direction, account, strategy, cash_balance, base
)
# Calculate TP/SL/TSL
protection = get_tp_sl(direction, strategy, current_price)
stop_loss = protection["sl"]
take_profit = protection["tp"]
trailing_stop_loss = None
if "tsl" in protection:
trailing_stop_loss = protection["tsl"]
# Create object, note that the amount is rounded to the trade precision
amount_rounded = float(round(trade_size_in_base, trade_precision))
new_trade = Trade.objects.create(
user=user,
account=account,
hook=hook,
signal=signal,
symbol=symbol,
type=type,
time_in_force=strategy.time_in_force,
# amount_fiat=amount_fiat,
amount=amount_rounded,
# price=price_bound,
price=price_bound,
stop_loss=float(round(stop_loss, display_precision)),
take_profit=float(round(take_profit, display_precision)),
direction=direction,
)
# Add TSL if applicable
if trailing_stop_loss:
new_trade.trailing_stop_loss = float(
round(trailing_stop_loss, display_precision)
)
new_trade.save()
# Run the crossfilter to ensure we don't trade the same pair in opposite directions
filtered = crossfilter(account, symbol, direction, func)
# TP/SL calculation and get_trade_size_in_base are wasted here, but it's important
# to record the decision in the Trade object. We can only get it after we do those.
# It shows what would be done.
if filtered:
log.debug(f"Trade filtered. Action: {filtered['action']}")
if filtered["action"] == "rejected":
new_trade.status = "rejected"
new_trade.save()
else:
info = new_trade.post()
log.debug(f"Posted trade: {info}")
# Send notification with limited number of fields
wanted_fields = ["requestID", "type", "symbol", "units", "reason"]
sendmsg(
user,
", ".join([str(v) for k, v in info.items() if k in wanted_fields]),
title=f"{direction} {amount_rounded} on {symbol}",
)
def process_callback(callback):
log.info(f"Received callback for {callback.hook} - {callback.signal}")
# Scan for trend
log.debug("Scanning for trend strategies...")
strategies = Strategy.objects.filter(trend_signals=callback.signal, enabled=True)
log.debug(f"Matched strategies: {strategies}")
for strategy in strategies:
log.debug(f"Executing strategy {strategy}")
if callback.hook.user != strategy.user:
log.error("Ownership differs between callback and strategy.")
continue
execute_strategy(callback, strategy, func="trend")
# Scan for entry
log.debug("Scanning for entry strategies...")
strategies = Strategy.objects.filter(entry_signals=callback.signal, enabled=True)
log.debug(f"Matched strategies: {strategies}")
for strategy in strategies:
log.debug(f"Executing strategy {strategy}")
if callback.hook.user != strategy.user:
log.error("Ownership differs between callback and strategy.")
continue
execute_strategy(callback, strategy, func="entry")
# Scan for exit
log.debug("Scanning for exit strategies...")
strategies = Strategy.objects.filter(exit_signals=callback.signal, enabled=True)
log.debug(f"Matched strategies: {strategies}")
for strategy in strategies:
log.debug(f"Executing strategy {strategy}")
if callback.hook.user != strategy.user:
log.error("Ownership differs between callback and strategy.")
continue
execute_strategy(callback, strategy, func="exit")