from copy import deepcopy
from datetime import datetime
from decimal import Decimal as D

from core.exchanges.convert import (
    annotate_trade_tp_sl_percent,
    convert_trades,
    sl_percent_to_price,
    tp_percent_to_price,
)
from core.lib.notify import sendmsg
from core.trading import assetfilter, checks, market, risk
from core.trading.crossfilter import crossfilter
from core.trading.market import get_base_quote, get_trade_size_in_base
from core.util import logs

log = logs.get_logger("ams")

# Format of the "open_time" field on trade dicts, as parsed by this module.
_OPEN_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


class TradeClosed(Exception):
    """
    Raised by a per-trade check when the violated policy is "close".
    run_checks() catches it and stops checking that trade.
    """

    pass


class ActiveManagement(object):
    """
    Check the open trades of a strategy's account against the strategy's
    active management policy, queue an action for every violation, then
    notify the user and execute the queued actions.
    """

    def __init__(self, strategy):
        """
        :param strategy: the strategy to manage; must provide
            `active_management_policy` and `account`.
        """
        self.strategy = strategy
        self.policy = strategy.active_management_policy

        # Cache of the open trades fetched from the account client
        self.trades = []
        # Queued actions: action name -> list of {"id", "check", "extra"}
        self.actions = {}

        # Cached balances, fetched lazily by get_balance()
        self.balance = None
        self.balance_usd = None

    def add_action(self, action, check_type, trade_id, **kwargs):
        """
        Queue an action for a trade.
        :param action: name of the action, used as the key in self.actions
        :param check_type: name of the check that was violated
        :param trade_id: ID of the trade the action applies to
        :param kwargs: extra data stored under the "extra" key
        """
        self.actions.setdefault(action, []).append(
            {"id": trade_id, "check": check_type, "extra": kwargs}
        )

    def reduce_actions(self):
        """
        If a trade is in the close actions, remove it from adjust.
        """
        if "close" not in self.actions or "adjust" not in self.actions:
            return
        closing_ids = [close_action["id"] for close_action in self.actions["close"]]
        self.actions["adjust"] = [
            action
            for action in self.actions["adjust"]
            if action["id"] not in closing_ids
        ]

    def get_trades(self):
        """
        Return the open trades, fetching them from the account client if
        the cached list is empty.
        """
        if not self.trades:
            self.trades = self.strategy.account.client.get_all_open_trades()
        return self.trades

    def get_balance(self, return_usd=False):
        """
        Return the account balance, fetching it from the account client on
        first use and caching it afterwards.
        :param return_usd: passed through to the client's get_balance();
            the two variants are cached separately
        """
        if return_usd:
            if self.balance_usd is None:
                self.balance_usd = self.strategy.account.client.get_balance(
                    return_usd=True
                )
            return self.balance_usd

        if self.balance is None:
            self.balance = self.strategy.account.client.get_balance(
                return_usd=False
            )
        return self.balance

    def close_trade(self, trade_id):
        """
        Close a single trade through the account client.
        """
        self.strategy.account.client.close_trade(trade_id)

    def bulk_close_trades(self, trade_ids):
        """
        Close every trade in trade_ids, one at a time.
        """
        for trade_id in trade_ids:
            self.close_trade(trade_id)

    def bulk_notify(self, action, action_cast_list):
        """
        Send the strategy's user one message describing every queued item
        for the given action.
        :param action: name of the action being taken
        :param action_cast_list: list of items as stored by add_action()
        """
        parts = []
        for action_cast in action_cast_list:
            parts.append(f"ACTION: '{action}' on trade ID '{action_cast['id']}'\n")
            parts.append(f"VIOLATION: '{action_cast['check']}'\n")
            if action_cast["extra"]:
                extra = ", ".join(
                    f"{k}: {v}" for k, v in action_cast["extra"].items()
                )
                parts.append(f"EXTRA: {extra}\n")
            parts.append("=========\n")
        msg = "".join(parts)

        sendmsg(self.strategy.user, msg, title=f"AMS: {action}")

    def adjust_position_size(self, trade_id, new_size):
        """
        Not implemented yet; currently does nothing.
        """
        pass  # TODO

    def adjust_protection(self, trade_id, new_protection):
        """
        Not implemented yet; currently does nothing.
        """
        pass  # TODO

    def bulk_adjust(self, action_cast_list):
        """
        Dispatch every queued adjust item to the matching adjust method,
        based on the check that produced it.
        :param action_cast_list: list of items as stored by add_action()
        """
        for item in action_cast_list:
            trade_id = item["id"]
            check = item["check"]
            if "extra" not in item:
                log.error(f"Adjust action missing extra data: {item}")
                continue
            extra = item["extra"]

            if check == "position_size":
                new_size = extra["size"]
                self.adjust_position_size(trade_id, new_size)
            elif check == "protection":
                self.adjust_protection(trade_id, extra)

    def run_actions(self):
        """
        Notify the user about each queued action group, then carry out the
        "close" and "adjust" groups. Any other action is notify-only.
        """
        for action, action_cast_list in self.actions.items():
            if action == "none":
                continue
            self.bulk_notify(action, action_cast_list)
            if action == "close":
                trade_ids = [action_cast["id"] for action_cast in action_cast_list]
                self.bulk_close_trades(trade_ids)
            elif action == "adjust":
                self.bulk_adjust(action_cast_list)

    def handle_violation(self, check_type, action, trade_id, **kwargs):
        """
        Queue the policy's action for a violated check, unless that action
        is "none".
        """
        if action == "none":
            return
        self.add_action(action, check_type, trade_id, **kwargs)

    def check_trading_time(self, trade):
        """
        Check the trade's open time against the strategy's trading times.
        :raises TradeClosed: if the check fails and the policy is "close"
        """
        open_ts = trade["open_time"]
        open_ts_as_date = datetime.strptime(open_ts, _OPEN_TIME_FORMAT)
        trading_time_pass = checks.within_trading_times(self.strategy, open_ts_as_date)
        if not trading_time_pass:
            self.handle_violation(
                "trading_time", self.policy.when_trading_time_violated, trade["id"]
            )
            if self.policy.when_trading_time_violated == "close":
                raise TradeClosed

    def check_trends(self, trade):
        """
        Check the trade's symbol and direction against the strategy's trends.
        :raises TradeClosed: if the check fails and the policy is "close"
        """
        direction = trade["direction"]
        symbol = trade["symbol"]
        trends_pass = checks.within_trends(self.strategy, symbol, direction)
        if not trends_pass:
            self.handle_violation(
                "trends", self.policy.when_trends_violated, trade["id"]
            )
            if self.policy.when_trends_violated == "close":
                raise TradeClosed

    def check_position_size(self, trade):
        """
        Check the position size is within the allowed deviation.
        WARNING: This uses the current balance, not the balance at the time of
        the trade.
        WARNING: This uses the current symbol prices, not those at the time of
        the trade.
        This should normally be run every 5 seconds, so this is fine.

        Only an oversized trade is a violation; there is no lower bound.
        Under the "adjust" policy the trade dict's "amount" and "units" are
        overwritten with the expected size so later checks see that value.
        :raises TradeClosed: if the check fails and the policy is "close"
        """
        # TODO: add the trade value to the balance
        # Need to determine which prices to use
        balance = self.get_balance()
        direction = trade["direction"]
        symbol = trade["symbol"]
        base, _quote = get_base_quote(self.strategy.account.exchange, symbol)
        expected_trade_size = get_trade_size_in_base(
            direction, self.strategy.account, self.strategy, balance, base
        )

        # Built from a string: D(0.05) would carry the binary float error
        deviation = D("0.05")  # 5%
        actual_trade_size = D(trade["amount"])

        # Ensure the trade size not above the expected trade size by more than 5%
        max_trade_size = expected_trade_size + (deviation * expected_trade_size)
        within_max_trade_size = actual_trade_size <= max_trade_size

        if not within_max_trade_size:
            self.handle_violation(
                "position_size",
                self.policy.when_position_size_violated,
                trade["id"],
                size=expected_trade_size,
            )
            if self.policy.when_position_size_violated == "close":
                raise TradeClosed
            elif self.policy.when_position_size_violated == "adjust":
                trade["amount"] = expected_trade_size
                trade["units"] = expected_trade_size

    def check_protection(self, trade):
        """
        Check the trade's stop loss, take profit and trailing stop percentages
        are within 5% of the values in the strategy's order settings.
        A setting of 0 is skipped; a setting missing from the trade counts
        as a violation.

        Under the "adjust" policy the computed prices are written into the
        trade dict, which is then re-annotated and re-converted to USD.
        :raises TradeClosed: if the check fails and the policy is "close"
        """
        # Built from a string: D(0.05) would carry the binary float error
        deviation = D("0.05")  # 5%

        # fmt: off
        matches = {
            "stop_loss_percent": self.strategy.order_settings.stop_loss_percent,
            "take_profit_percent": self.strategy.order_settings.take_profit_percent,
            "trailing_stop_percent": self.strategy.order_settings.trailing_stop_loss_percent,
        }

        # Trade key -> (key to report the violation under, percent-to-price converter)
        # The keys here must be the same as the keys of `matches`.
        price_converters = {
            "take_profit_percent": ("take_profit_price", tp_percent_to_price),
            "stop_loss_percent": ("stop_loss_price", sl_percent_to_price),
            "trailing_stop_percent": ("trailing_stop_loss_price", sl_percent_to_price),
        }

        violations = {}
        for key, expected in matches.items():
            if expected == 0:
                continue
            if key in trade:
                actual = D(trade[key])
                expected = D(expected)
                min_val = expected - (deviation * expected)
                max_val = expected + (deviation * expected)
                within_deviation = min_val <= actual <= max_val
            else:
                within_deviation = False

            if within_deviation:
                continue

            violation_key, to_price = price_converters[key]
            violations[violation_key] = to_price(
                expected,
                trade["side"],
                trade["current_price"],
                trade["amount"],
                trade["pl"],
            )

        if violations:
            self.handle_violation(
                "protection",
                self.policy.when_protection_violated,
                trade["id"],
                **violations
            )
            if self.policy.when_protection_violated == "close":
                raise TradeClosed
            elif self.policy.when_protection_violated == "adjust":
                trade.update(violations)
                annotate_trade_tp_sl_percent(trade)
                market.convert_trades_to_usd(self.strategy.account, [trade])

    def check_asset_groups(self, trade):
        """
        Check the trade against the strategy's asset group, if one is set.
        :raises TradeClosed: if the check fails and the policy is "close"
        """
        if self.strategy.asset_group is None:
            return

        base, quote = get_base_quote(
            self.strategy.account.exchange, trade["symbol"]
        )
        allowed = assetfilter.get_allowed(
            self.strategy.asset_group, base, quote, trade["side"]
        )
        if not allowed:
            self.handle_violation(
                "asset_group", self.policy.when_asset_groups_violated, trade["id"]
            )
            if self.policy.when_asset_groups_violated == "close":
                raise TradeClosed

    def get_sorted_trades_copy(self, trades, reverse=True):
        """
        Return a deep copy of trades sorted by open time.
        :param trades: list of trade dicts with an "open_time" string
        :param reverse: True for newest first (default), False for oldest first
        """
        trades_copy = deepcopy(trades)
        trades_copy.sort(
            key=lambda x: datetime.strptime(x["open_time"], _OPEN_TIME_FORMAT),
            reverse=reverse,
        )
        return trades_copy

    def check_crossfilter(self, trades):
        """
        Run the crossfilter on every trade as if it were being opened now,
        with the remaining trades as the other positions in the account.
        Rejected trades get a violation; under the "close" policy they are
        also removed from `trades` in place.
        :raises Exception: if the loop has not settled after 10000 passes
        """
        close_trades = []

        iterations = 0
        finished = []
        # Recursively run crossfilter on the newest-first list until we have no more
        # conflicts
        # NOTE(review): a trade that passes is appended to `finished` again on
        # every pass, so this only terminates cleanly when each trade either
        # passes or is rejected on the first pass -- confirm the intent.
        length_before = len(trades)
        while len(finished) != length_before:
            iterations += 1
            if iterations > 10000:
                raise Exception("Too many iterations")
            # For each trade
            # We need reverse because we are removing items from the list
            # This works in our favour because the list is sorted the wrong
            # way around in run_checks()
            for trade in reversed(trades):
                # Abort if we've already checked this trade
                if trade in close_trades:
                    continue
                # Calculate trades excluding this one
                # Also remove if we have already checked this
                others = [
                    t
                    for t in trades
                    if t["id"] != trade["id"] and t not in close_trades
                ]
                symbol = trade["symbol"]
                direction = trade["direction"]
                func = "entry"

                # Check if this trade is filtered, pretending we are opening it
                # And passing the remaining trades as the other trades in the account
                filtered = crossfilter(
                    self.strategy.account, symbol, direction, func, all_positions=others
                )
                if not filtered:
                    # This trade is fine, add it to finished
                    finished.append(trade)
                    continue
                if filtered["action"] == "rejected":
                    # It's rejected, add it to the close trades list
                    # And don't check it again
                    finished.append(trade)
                    close_trades.append(trade)

                    # Remove it from the trades list
                    if self.policy.when_crossfilter_violated == "close":
                        trades.remove(trade)

        for trade in close_trades:
            self.handle_violation(
                "crossfilter", self.policy.when_crossfilter_violated, trade["id"]
            )

    def check_max_open_trades(self, trades):
        """
        Check the number of open trades against the risk model, if one is set.
        Every trade past the first `max_open_trades` entries of `trades` gets
        a violation; under the "close" policy it is also removed from
        `trades` in place.
        """
        if self.strategy.risk_model is None:
            return
        max_open_pass = risk.check_max_open_trades(self.strategy.risk_model, trades)
        if not max_open_pass:
            # The slice is a new list, so removing from `trades` below is safe
            # fmt: off
            trades_over_limit = trades[self.strategy.risk_model.max_open_trades:]
            for trade in trades_over_limit:
                self.handle_violation(
                    "max_open_trades",
                    self.policy.when_max_open_trades_violated,
                    trade["id"],
                )
                if self.policy.when_max_open_trades_violated == "close":
                    trades.remove(trade)

    def check_max_open_trades_per_symbol(self, trades):
        """
        Check the number of open trades per symbol against the risk model,
        if one is set. For each symbol reported by the risk check, every trade
        past the first `max_open_trades_per_symbol` gets a violation; under
        the "close" policy it is also removed from `trades` in place.
        """
        if self.strategy.risk_model is None:
            return
        symbols_over_limit = list(
            risk.check_max_open_trades_per_symbol(
                self.strategy.risk_model, trades, return_symbols=True
            )
        )
        if not symbols_over_limit:
            return

        trades_over_limit = []
        for symbol in symbols_over_limit:
            symbol_trades = [x for x in trades if x["symbol"] == symbol]
            # fmt: off
            trades_over_limit.extend(
                symbol_trades[self.strategy.risk_model.max_open_trades_per_symbol:]
            )

        # NOTE(review): this reuses the when_max_open_trades_violated policy
        # rather than a per-symbol one -- confirm that is intended.
        for trade in trades_over_limit:
            self.handle_violation(
                "max_open_trades_per_symbol",
                self.policy.when_max_open_trades_violated,
                trade["id"],
            )
            if self.policy.when_max_open_trades_violated == "close":
                trades.remove(trade)

    def check_max_loss(self, trades):
        """
        Check the current balance against the account's initial balance using
        the risk model, if one is set. A failure queues a single violation
        with a trade ID of None; under the "close" policy `trades` is emptied
        in place.
        """
        if self.strategy.risk_model is None:
            return
        check_passed = risk.check_max_loss(
            self.strategy.risk_model,
            self.strategy.account.initial_balance,
            self.get_balance(),
        )
        if not check_passed:
            self.handle_violation(
                "max_loss", self.policy.when_max_loss_violated, None  # Close all trades
            )
            if self.policy.when_max_loss_violated == "close":
                # Empty the list in one go: removing items while iterating
                # over the same list skips every other trade
                trades.clear()

    def check_max_risk(self, trades):
        """
        Check the total risk of the trades against the risk model, if one is
        set. The newest trade is dropped from a working copy until the risk
        check passes; every dropped trade gets a violation and, under the
        "close" policy, is also removed from `trades` in place.
        :raises Exception: if the check has not passed after 10000 passes
        """
        if self.strategy.risk_model is None:
            return
        close_trades = []

        # Oldest first, so the newest trade is always the last element
        trades_copy = self.get_sorted_trades_copy(trades, reverse=False)

        iterations = 0
        finished = False
        while not finished:
            iterations += 1
            if iterations > 10000:
                raise Exception("Too many iterations")
            check_passed = risk.check_max_risk(
                self.strategy.risk_model,
                self.get_balance(return_usd=True),
                trades_copy,
            )
            if check_passed:
                finished = True
            elif not trades_copy:
                # Nothing left to drop; stop instead of indexing an empty list
                finished = True
            else:
                # Add the newest trade to close_trades and remove it from trades_copy
                close_trades.append(trades_copy.pop())

        for trade in close_trades:
            self.handle_violation(
                "max_risk", self.policy.when_max_risk_violated, trade["id"]
            )
            if self.policy.when_max_risk_violated == "close":
                # close_trades holds deep copies; remove() matches by equality
                trades.remove(trade)

    def run_checks(self):
        """
        Run every check against the account's open trades and queue the
        resulting actions. Per-trade checks run first; a trade whose check
        raises TradeClosed is dropped before the account-wide checks run.
        """
        converted_trades = convert_trades(self.get_trades())
        trades_copy = self.get_sorted_trades_copy(converted_trades, reverse=False)
        market.convert_trades_to_usd(self.strategy.account, trades_copy)
        # Iterating in reverse keeps removal of the current item safe
        for trade in reversed(trades_copy):
            try:
                self.check_trading_time(trade)
                self.check_trends(trade)
                self.check_position_size(trade)
                self.check_protection(trade)
                self.check_asset_groups(trade)
            except TradeClosed:
                # Trade was closed, don't check it again
                trades_copy.remove(trade)
                continue

        self.check_crossfilter(trades_copy)
        self.check_max_open_trades(trades_copy)
        self.check_max_open_trades_per_symbol(trades_copy)
        self.check_max_loss(trades_copy)
        self.check_max_risk(trades_copy)

    def execute_actions(self):
        """
        Carry out the queued actions, if any: drop adjust actions for trades
        that are also being closed, then notify and execute.
        """
        if not self.actions:
            return
        self.reduce_actions()
        self.run_actions()