# Python source, 486 lines, 18 KiB
from copy import deepcopy
|
|
from datetime import datetime
|
|
from decimal import Decimal as D
|
|
|
|
from core.exchanges.convert import (
|
|
annotate_trade_tp_sl_percent,
|
|
convert_trades,
|
|
sl_percent_to_price,
|
|
tp_percent_to_price,
|
|
)
|
|
from core.lib.notify import sendmsg
|
|
from core.trading import assetfilter, checks, market, risk
|
|
from core.trading.crossfilter import crossfilter
|
|
from core.trading.market import get_base_quote, get_trade_size_in_base
|
|
from core.util import logs
|
|
|
|
# Module-level logger, obtained from the project's logging helper under the name "ams".
log = logs.get_logger("ams")
|
|
|
|
|
|
class TradeClosed(Exception):
    """Signal that a per-trade check decided the trade must be closed.

    The per-trade checks raise it when their policy action is "close";
    ``run_checks`` catches it and stops checking that trade.
    """
|
|
|
|
|
|
class ActiveManagement(object):
|
|
def __init__(self, strategy):
|
|
self.strategy = strategy
|
|
self.policy = strategy.active_management_policy
|
|
|
|
self.trades = []
|
|
self.actions = {}
|
|
self.balance = None
|
|
self.balance_usd = None
|
|
|
|
def add_action(self, action, check_type, trade_id, **kwargs):
    """Queue ``action`` for ``trade_id`` and remember which check caused it.

    Entries are grouped by action name in ``self.actions``. Each entry is a
    dict with the keys ``id`` (the trade ID), ``check`` (the check type) and
    ``extra`` (the remaining keyword arguments, possibly empty).
    """
    entry = {"id": trade_id, "check": check_type, "extra": kwargs}
    self.actions.setdefault(action, []).append(entry)
|
|
|
|
def reduce_actions(self):
    """
    Drop every queued adjustment whose trade is also queued to be closed.
    """
    queued = self.actions
    if "close" not in queued or "adjust" not in queued:
        return

    closing_ids = [entry["id"] for entry in queued["close"]]
    if not closing_ids:
        # Nothing is being closed, so the adjustments stay as they are.
        return

    queued["adjust"] = [
        entry for entry in queued["adjust"] if entry["id"] not in closing_ids
    ]
|
|
|
|
def get_trades(self):
    """Return the open trades, asking the exchange client only while none are cached.

    An empty cached list counts as "not cached", so the client is asked again
    on the next call.
    """
    cached = self.trades
    if cached:
        return cached
    self.trades = self.strategy.account.client.get_all_open_trades()
    return self.trades
|
|
|
|
def get_balance(self, return_usd=False):
    """Return the account balance, fetching it from the client at most once.

    The plain and the USD balance are cached separately in ``self.balance``
    and ``self.balance_usd``.

    :param return_usd: when truthy, return the USD balance instead.
    """
    slot = "balance_usd" if return_usd else "balance"
    cached = getattr(self, slot)
    if cached is None:
        cached = self.strategy.account.client.get_balance(
            return_usd=bool(return_usd)
        )
        setattr(self, slot, cached)
    return cached
|
|
|
|
def close_trade(self, trade_id):
    """Ask the strategy's exchange client to close the trade with this ID."""
    client = self.strategy.account.client
    client.close_trade(trade_id)
|
|
|
|
def bulk_close_trades(self, trade_ids):
    """Close every trade in ``trade_ids``, one ``close_trade`` call per ID, in order."""
    close_one = self.close_trade
    for current_id in trade_ids:
        close_one(current_id)
|
|
|
|
def bulk_notify(self, action, action_cast_list):
    """Send the strategy's user one message describing every entry queued for ``action``.

    :param action: action name, used in the message body and title.
    :param action_cast_list: queued entries; ``id``, ``check`` and ``extra``
        are read from each.
    """
    chunks = []
    for entry in action_cast_list:
        chunks.append(f"ACTION: '{action}' on trade ID '{entry['id']}'\n")
        chunks.append(f"VIOLATION: '{entry['check']}'\n")
        details = entry["extra"]
        if details:
            rendered = ", ".join(
                f"{name}: {value}" for name, value in details.items()
            )
            chunks.append(f"EXTRA: {rendered}\n")
        # Separator line between entries.
        chunks.append("=========\n")

    sendmsg(self.strategy.user, "".join(chunks), title=f"AMS: {action}")
|
|
|
|
def adjust_position_size(self, trade_id, new_size):
    """Shrink an open trade to ``new_size`` by closing the surplus units.

    The trade is looked up by ``id`` in ``self.trades``. Its ``currentUnits``
    value is taken as the current size and its ``symbol`` is passed on to the
    client. If no such trade is found, an error is logged and nothing is
    closed.

    :param trade_id: ID of the trade to shrink.
    :param new_size: target size; must be strictly below the current size.
    :raises AssertionError: if ``new_size`` is not below the current size.
    """
    match = next((t for t in self.trades if t["id"] == trade_id), None)
    if match is None:
        log.error(f"Could not find trade ID {trade_id} in active management")
        return

    old_size = D(match["currentUnits"])
    symbol = match["symbol"]

    # Reduce only. Raised explicitly instead of using ``assert`` so the guard
    # still runs under ``python -O``, which strips assert statements; the
    # exception type is unchanged.
    # NOTE(review): sizes are compared as signed values - confirm that
    # currentUnits is positive for short trades.
    if not old_size > new_size:
        raise AssertionError(
            f"Trade ID {trade_id}: new size {new_size} is not below "
            f"current size {old_size}"
        )
    difference = old_size - new_size

    # Close the difference
    self.strategy.account.client.close_trade(trade_id, difference, symbol)
|
|
|
|
def adjust_protection(self, trade_id, new_protection):
    """Placeholder for adjusting a trade's protection; currently does nothing."""
    return None  # TODO
|
|
|
|
def bulk_adjust(self, action_cast_list):
    """Apply each queued "adjust" entry with the handler matching its check.

    "position_size" entries resize the trade to ``extra["size"]``;
    "protection" entries pass the whole ``extra`` dict to
    ``adjust_protection``. Entries with any other check are ignored, and
    entries without an ``extra`` key are logged and skipped.
    """
    for entry in action_cast_list:
        trade_id = entry["id"]
        check = entry["check"]
        if "extra" not in entry:
            log.error(f"Adjust action missing extra data: {entry}")
            continue
        extra = entry["extra"]

        if check == "protection":
            self.adjust_protection(trade_id, extra)
        elif check == "position_size":
            self.adjust_position_size(trade_id, extra["size"])
|
|
|
|
def run_actions(self):
    """Notify about, then carry out, every queued action group.

    Groups named "none" are skipped entirely. Every other group is reported
    through ``bulk_notify``; "close" groups are then closed and "adjust"
    groups adjusted. Other action names are only reported.
    """
    for name, entries in self.actions.items():
        if name == "none":
            continue

        self.bulk_notify(name, entries)

        if name == "adjust":
            self.bulk_adjust(entries)
        elif name == "close":
            self.bulk_close_trades([entry["id"] for entry in entries])
|
|
|
|
def handle_violation(self, check_type, action, trade_id, **kwargs):
    """Queue ``action`` for a failed check, unless the action is "none".

    :param check_type: name of the check that failed.
    :param action: policy action configured for that check.
    :param trade_id: ID of the affected trade.
    :param kwargs: extra data stored with the queued action.
    """
    if action != "none":
        self.add_action(action, check_type, trade_id, **kwargs)
|
|
|
|
def check_trading_time(self, trade):
    """Report a "trading_time" violation when the trade's open time fails the check.

    The trade's ``open_time`` string is parsed and handed to
    ``checks.within_trading_times`` together with the strategy.

    :raises TradeClosed: if the check fails and the policy action is "close".
    """
    opened_at = datetime.strptime(trade["open_time"], "%Y-%m-%dT%H:%M:%S.%fZ")
    if checks.within_trading_times(self.strategy, opened_at):
        return

    policy_action = self.policy.when_trading_time_violated
    self.handle_violation("trading_time", policy_action, trade["id"])
    if policy_action == "close":
        raise TradeClosed
|
|
|
|
def check_trends(self, trade):
    """Report a "trends" violation when the trade fails ``checks.within_trends``.

    The check receives the strategy plus the trade's ``symbol`` and
    ``direction``.

    :raises TradeClosed: if the check fails and the policy action is "close".
    """
    if checks.within_trends(self.strategy, trade["symbol"], trade["direction"]):
        return

    policy_action = self.policy.when_trends_violated
    self.handle_violation("trends", policy_action, trade["id"])
    if policy_action == "close":
        raise TradeClosed
|
|
|
|
def check_position_size(self, trade):
    """
    Check the position size is within the allowed deviation.

    The trade's ``amount`` may exceed the expected trade size by at most 5%.
    Trades below the expected size are not reported.

    WARNING: This uses the current balance, not the balance at the time of the
    trade.
    WARNING: This uses the current symbol prices, not those at the time of the
    trade.
    This should normally be run every 5 seconds, so this is fine.

    :param trade: trade dict; ``id``, ``direction``, ``symbol`` and ``amount``
        are read. With the "adjust" policy its ``amount`` and ``units`` are
        overwritten with the expected size.
    :raises TradeClosed: if the size is too large and the policy action is
        "close".
    """
    # TODO: add the trade value to the balance
    # Need to determine which prices to use
    balance = self.get_balance()
    direction = trade["direction"]
    symbol = trade["symbol"]
    # TODO:
    base, _quote = get_base_quote(self.strategy.account.exchange, symbol)
    expected_trade_size = get_trade_size_in_base(
        direction, self.strategy.account, self.strategy, balance, base
    )

    # Built from a string so the tolerance is exactly 5%; D(0.05) would carry
    # the binary floating-point representation error of the float literal.
    deviation = D("0.05")  # 5%
    actual_trade_size = D(trade["amount"])
    # Ensure the trade size not above the expected trade size by more than 5%
    max_trade_size = expected_trade_size + (deviation * expected_trade_size)
    within_max_trade_size = actual_trade_size <= max_trade_size

    if not within_max_trade_size:
        self.handle_violation(
            "position_size",
            self.policy.when_position_size_violated,
            trade["id"],
            size=expected_trade_size,
        )
        if self.policy.when_position_size_violated == "close":
            raise TradeClosed
        elif self.policy.when_position_size_violated == "adjust":
            # Later checks in the same run see the trade at its adjusted size.
            trade["amount"] = expected_trade_size
            trade["units"] = expected_trade_size
|
|
|
|
def check_protection(self, trade):
    """Check the trade's stop loss, take profit and trailing stop percentages.

    Each value configured in ``self.strategy.order_settings`` (0 means "not
    configured") must be present on the trade and within 5% of the configured
    value. For every value that is missing or out of range, the matching
    price is calculated and reported as one "protection" violation.

    :param trade: trade dict; ``id`` and the ``*_percent`` keys are read, and
        on a violation also ``side``, ``current_price``, ``amount`` and ``pl``.
        With the "adjust" policy the dict is updated with the calculated
        prices and re-annotated.
    :raises TradeClosed: if there is a violation and the policy action is
        "close".
    """
    # Built from a string so the tolerance is exactly 5%; D(0.05) would carry
    # the binary floating-point representation error of the float literal.
    deviation = D("0.05")  # 5%

    # fmt: off
    matches = {
        "stop_loss_percent": self.strategy.order_settings.stop_loss_percent,
        "take_profit_percent": self.strategy.order_settings.take_profit_percent,
        "trailing_stop_percent":
            self.strategy.order_settings.trailing_stop_loss_percent,
    }

    violations = {}

    for key, expected in matches.items():
        if expected == 0:
            continue
        if key in trade:
            actual = D(trade[key])
            expected = D(expected)
            min_val = expected - (deviation * expected)
            max_val = expected + (deviation * expected)
            within_deviation = min_val <= actual <= max_val
        else:
            # Protection that is configured but absent counts as a violation.
            within_deviation = False

        if not within_deviation:
            if key == "take_profit_percent":
                tp_price = tp_percent_to_price(
                    expected,
                    trade["side"],
                    trade["current_price"],
                    trade["amount"],
                    trade["pl"],
                )
                violations["take_profit_price"] = tp_price
            elif key == "stop_loss_percent":
                sl_price = sl_percent_to_price(
                    expected,
                    trade["side"],
                    trade["current_price"],
                    trade["amount"],
                    trade["pl"],
                )
                violations["stop_loss_price"] = sl_price
            # This branch used to test "trailing_stop_loss_percent", which is
            # not a key of ``matches``, so it could never run.
            # NOTE(review): confirm that trades expose the trailing stop as
            # "trailing_stop_percent"; if they do not, the lookup key in
            # ``matches`` needs renaming as well.
            elif key == "trailing_stop_percent":
                tsl_price = sl_percent_to_price(
                    expected,
                    trade["side"],
                    trade["current_price"],
                    trade["amount"],
                    trade["pl"],
                )
                violations["trailing_stop_loss_price"] = tsl_price

    if violations:
        self.handle_violation(
            "protection",
            self.policy.when_protection_violated,
            trade["id"],
            **violations
        )
        if self.policy.when_protection_violated == "close":
            raise TradeClosed
        elif self.policy.when_protection_violated == "adjust":
            # Refresh the derived fields so later checks see the new prices.
            trade.update(violations)
            annotate_trade_tp_sl_percent(trade)
            market.convert_trades_to_usd(self.strategy.account, [trade])
|
|
|
|
def check_asset_groups(self, trade):
    """Report an "asset_group" violation when the asset filter does not allow the trade.

    Nothing is checked when the strategy has no asset group. Otherwise the
    trade's symbol is split into base and quote and passed, with the trade's
    ``side``, to ``assetfilter.get_allowed``.

    :raises TradeClosed: if the trade is not allowed and the policy action is
        "close".
    """
    group = self.strategy.asset_group
    if group is None:
        return

    base, quote = get_base_quote(self.strategy.account.exchange, trade["symbol"])
    if assetfilter.get_allowed(group, base, quote, trade["side"]):
        return

    policy_action = self.policy.when_asset_groups_violated
    self.handle_violation("asset_group", policy_action, trade["id"])
    if policy_action == "close":
        raise TradeClosed
|
|
|
|
def get_sorted_trades_copy(self, trades, reverse=True):
    """Return a deep copy of ``trades`` ordered by their ``open_time``.

    :param trades: list of trade dicts with an ``open_time`` string in the
        form ``%Y-%m-%dT%H:%M:%S.%fZ``. The input list is left untouched.
    :param reverse: newest first when true (the default), oldest first
        otherwise.
    """

    def opened_at(trade):
        return datetime.strptime(trade["open_time"], "%Y-%m-%dT%H:%M:%S.%fZ")

    ordered = deepcopy(trades)
    ordered.sort(key=opened_at, reverse=reverse)
    return ordered
|
|
|
|
def check_crossfilter(self, trades):
    """
    Run every trade through ``crossfilter`` as if it were being opened now and
    report a "crossfilter" violation for each trade that is rejected.

    Each trade is checked against the other trades that have not already been
    rejected. ``trades`` is modified in place: a rejected trade is removed
    from it when ``self.policy.when_crossfilter_violated`` is "close".

    :param trades: list of trade dicts; ``id``, ``symbol`` and ``direction``
        are read from each.
    :raises Exception: if the outer loop runs more than 10000 times.
    """
    # Trades rejected by crossfilter; reported as violations at the end.
    close_trades = []

    # trades_copy = self.get_sorted_trades_copy(trades)

    iterations = 0
    # Trades that have received a verdict (accepted or rejected).
    finished = []
    # Recursively run crossfilter on the newest-first list until we have no more
    # conflicts
    # NOTE(review): a truthy crossfilter result whose action is not "rejected"
    # leaves the trade out of ``finished``, so another pass runs and accepted
    # trades are appended to ``finished`` a second time. The exit condition
    # below then depends on those duplicates. Confirm crossfilter only returns
    # a falsy value or a "rejected" action when func is "entry".
    length_before = len(trades)
    while not len(finished) == length_before:
        iterations += 1
        if iterations > 10000:
            raise Exception("Too many iterations")
        # For each trade
        # We need reverse because we are removing items from the list
        # This works in our favour because the list is sorted the wrong
        # way around in run_checks()
        for trade in reversed(trades):
            # Abort if we've already checked this trade
            if trade in close_trades:
                continue
            # Calculate trades excluding this one
            # Also remove if we have already checked this
            others = [
                t
                for t in trades
                if t["id"] != trade["id"] and t not in close_trades
            ]
            symbol = trade["symbol"]
            direction = trade["direction"]
            func = "entry"

            # Check if this trade is filtered, pretending we are opening it
            # And passing the remaining trades as the other trades in the account
            filtered = crossfilter(
                self.strategy.account, symbol, direction, func, all_positions=others
            )
            if not filtered:
                # This trade is fine, add it to finished
                finished.append(trade)
                continue
            if filtered["action"] == "rejected":
                # It's rejected, add it to the close trades list
                # And don't check it again
                finished.append(trade)
                close_trades.append(trade)

                # Remove it from the trades list
                if self.policy.when_crossfilter_violated == "close":
                    trades.remove(trade)
    if not close_trades:
        return

    # Queue one violation per rejected trade.
    if close_trades:
        for trade in close_trades:
            self.handle_violation(
                "crossfilter", self.policy.when_crossfilter_violated, trade["id"]
            )
|
|
|
|
def check_max_open_trades(self, trades):
    """Report a "max_open_trades" violation for each trade beyond the risk model's limit.

    Nothing is checked when the strategy has no risk model. When
    ``risk.check_max_open_trades`` fails, every trade positioned after the
    first ``max_open_trades`` entries of ``trades`` is reported, and removed
    from ``trades`` in place when the policy action is "close".
    """
    model = self.strategy.risk_model
    if model is None:
        return
    if risk.check_max_open_trades(model, trades):
        return

    policy_action = self.policy.when_max_open_trades_violated
    # The slice is a copy, so removing from ``trades`` inside the loop is safe.
    for surplus in trades[model.max_open_trades:]:
        self.handle_violation("max_open_trades", policy_action, surplus["id"])
        if policy_action == "close":
            trades.remove(surplus)
|
|
|
|
def check_max_open_trades_per_symbol(self, trades):
    """Report a "max_open_trades_per_symbol" violation for each surplus trade of a symbol.

    Nothing is checked when the strategy has no risk model.
    ``risk.check_max_open_trades_per_symbol`` is asked for the offending
    symbols; for each of them, the trades after the first
    ``max_open_trades_per_symbol`` are reported. The policy action is
    ``when_max_open_trades_violated``, the same one used for the account-wide
    limit; with "close" the reported trades are removed from ``trades``.
    """
    model = self.strategy.risk_model
    if model is None:
        return

    offending_symbols = list(
        risk.check_max_open_trades_per_symbol(model, trades, return_symbols=True)
    )
    if not offending_symbols:
        return

    limit = model.max_open_trades_per_symbol
    surplus = []
    for symbol in offending_symbols:
        same_symbol = [t for t in trades if t["symbol"] == symbol]
        surplus.extend(same_symbol[limit:])

    policy_action = self.policy.when_max_open_trades_violated
    for trade in surplus:
        self.handle_violation("max_open_trades_per_symbol", policy_action, trade["id"])
        if policy_action == "close":
            trades.remove(trade)
|
|
|
|
def check_max_loss(self, trades):
    """Report a "max_loss" violation when the account fails the risk model's loss check.

    Nothing is checked when the strategy has no risk model. The check compares
    the account's ``initial_balance`` with the current balance through
    ``risk.check_max_loss``. The violation is queued with a trade ID of
    ``None``, standing for "all trades".

    :param trades: list of trades still under management; emptied in place
        when the check fails and the policy action is "close".
    """
    if self.strategy.risk_model is None:
        return
    check_passed = risk.check_max_loss(
        self.strategy.risk_model,
        self.strategy.account.initial_balance,
        self.get_balance(),
    )
    if not check_passed:
        # NOTE(review): the queued entry carries id None - confirm the code
        # that executes "close" actions treats that as "close everything".
        self.handle_violation(
            "max_loss", self.policy.when_max_loss_violated, None  # Close all trades
        )
        if self.policy.when_max_loss_violated == "close":
            # Every trade is going away. Removing items one by one while
            # iterating the same list skips every other element, so empty it
            # in a single step instead.
            trades.clear()
|
|
|
|
def check_max_risk(self, trades):
    """Report a "max_risk" violation for the newest trades until the risk check passes.

    Nothing is checked when the strategy has no risk model. The check runs on
    an oldest-first deep copy of ``trades``; while ``risk.check_max_risk``
    fails against the USD balance, the newest remaining trade is set aside.
    Each trade set aside is reported, and removed from ``trades`` in place
    when the policy action is "close".

    :raises Exception: if the loop runs more than 10000 times.
    """
    if self.strategy.risk_model is None:
        return
    close_trades = []

    trades_copy = self.get_sorted_trades_copy(trades, reverse=False)

    iterations = 0
    finished = False
    while not finished:
        iterations += 1
        if iterations > 10000:
            raise Exception("Too many iterations")

        check_passed = risk.check_max_risk(
            self.strategy.risk_model,
            self.get_balance(return_usd=True),
            trades_copy,
        )
        if check_passed or not trades_copy:
            # Also stop once nothing is left to set aside: indexing the empty
            # list below would otherwise raise IndexError.
            finished = True
        else:
            # Add the newest trade to close_trades and remove it from trades_copy
            close_trades.append(trades_copy[-1])
            trades_copy = trades_copy[:-1]

    for trade in close_trades:
        self.handle_violation(
            "max_risk", self.policy.when_max_risk_violated, trade["id"]
        )
        if self.policy.when_max_risk_violated == "close":
            # ``trade`` is a deep copy; list.remove matches it by equality.
            trades.remove(trade)
|
|
|
|
def run_checks(self):
    """Run every per-trade check, then every account-wide check, on the open trades.

    The open trades are converted, deep-copied oldest first and passed
    through ``market.convert_trades_to_usd``. A trade whose per-trade check
    raises ``TradeClosed`` is dropped from the working list, so the
    account-wide checks no longer see it.
    """
    open_trades = self.get_sorted_trades_copy(
        convert_trades(self.get_trades()), reverse=False
    )
    market.convert_trades_to_usd(self.strategy.account, open_trades)

    per_trade_checks = (
        self.check_trading_time,
        self.check_trends,
        self.check_position_size,
        self.check_protection,
        self.check_asset_groups,
    )
    # Walking backwards keeps the iteration valid while trades are removed.
    for trade in reversed(open_trades):
        try:
            for check in per_trade_checks:
                check(trade)
        except TradeClosed:
            # Trade was closed, don't check it again
            open_trades.remove(trade)

    account_checks = (
        self.check_crossfilter,
        self.check_max_open_trades,
        self.check_max_open_trades_per_symbol,
        self.check_max_loss,
        self.check_max_risk,
    )
    for check in account_checks:
        check(open_trades)
|
|
|
|
def execute_actions(self):
    """Reduce and then run the queued actions; do nothing when none are queued."""
    if self.actions:
        self.reduce_actions()
        self.run_actions()
|