# Standard library imports
import asyncio
import logging
from datetime import datetime

# Third-party imports
import urllib3
from aiocoingecko import AsyncCoinGeckoAPISession
from django.conf import settings
from elasticsearch import AsyncElasticsearch
from forex_python.converter import CurrencyRates

# Project imports
from core.models import Aggregator, OperatorWallets, Platform
from core.util import logs

# TODO: secure ES traffic properly
urllib3.disable_warnings()

# Silence the chatty transport loggers.
tracer = logging.getLogger("opensearch")
tracer.setLevel(logging.CRITICAL)
tracer = logging.getLogger("elastic_transport.transport")
tracer.setLevel(logging.CRITICAL)

log = logs.get_logger("money")


class Money:
    """
    Generic class for handling money-related matters that aren't Revolut or
    Agora.
    """

    # Asset code as it appears in ads/trades -> CoinGecko coin id
    _COIN_IDS = {"XMR": "monero", "BTC": "bitcoin"}
    # Asset code -> key holding the crypto amount in a dashboard contact
    _AMOUNT_KEYS = {"XMR": "amount_xmr", "BTC": "amount_btc"}

    def __init__(self):
        """
        Initialise the Money object.
        Create the forex rate converter, the CoinGecko API session and the
        Elasticsearch client.
        """
        self.cr = CurrencyRates()
        self.cg = AsyncCoinGeckoAPISession()
        auth = (settings.ELASTICSEARCH_USERNAME, settings.ELASTICSEARCH_PASSWORD)
        # NOTE: certificate verification is disabled here, see the TODO at the
        # top of the module.
        client = AsyncElasticsearch(
            settings.ELASTICSEARCH_HOST, http_auth=auth, verify_certs=False
        )
        self.es = client

    async def check_all(self, user=None, link_group=None, nordigen=None, agora=None):
        """
        Compute the totals for all enabled aggregators and platforms that
        belong to the given user and/or link group.
        :param user: restrict the lookup to this user (optional)
        :param link_group: restrict the lookup to this link group (optional)
        :param nordigen: awaitable factory called with an aggregator
        :param agora: awaitable factory called with a platform
        :return: the dictionary produced by get_total with trades=True
        :rtype: dict
        :raises ValueError: if nordigen or agora is missing, or if neither
            user nor link_group is given
        """
        if not all([nordigen, agora]):
            raise ValueError("check_all requires both nordigen and agora")
        if not any([user, link_group]):
            raise ValueError("check_all requires a user or a link_group")

        # I hate circular dependencies
        self.nordigen = nordigen
        self.agora = agora

        cast = {}
        if user is not None:
            cast["user"] = user
        if link_group is not None:
            cast["link_group"] = link_group

        aggregators = Aggregator.objects.filter(enabled=True, **cast)
        platforms = Platform.objects.filter(enabled=True, **cast)

        total = await self.get_total(aggregators, platforms, trades=True)
        return total

    # def setup_loops(self):
    #     """
    #     Set up the LoopingCalls to get the balance so we have data in ES.
    #     """
    #     if settings.ES.Enabled == "1" or settings.Logstash.Enabled == "1":
    #         self.lc_es_checks = LoopingCall(self.run_checks_in_thread)
    #         delay = int(settings.ES.RefreshSec)
    #         self.lc_es_checks.start(delay)
    #         if settings.ES.Enabled == "1":
    #             self.agora.es = self.es
    #             self.lbtc.es = self.es

    async def write_to_es(self, msgtype, cast):
        """
        Tag a document with type, timestamp and owner, then index it in ES.
        The passed dictionary is modified in place.
        :param msgtype: value stored under the "xtype" key
        :param cast: dictionary to index
        """
        # NOTE(review): self.instance is not set anywhere in this class;
        # presumably it is assigned externally -- confirm before calling.
        cast["type"] = "money"
        cast["ts"] = datetime.now().isoformat()
        cast["xtype"] = msgtype
        cast["user_id"] = self.instance.user.id
        cast["platform_id"] = self.instance.id
        await self.es.index(index=settings.ELASTICSEARCH_INDEX, body=cast)

    async def lookup_rates(self, platform, ads, rates=None):
        """
        Lookup the rates for a list of public ads.
        Each ad gets its rate (ad price divided by the base price of the
        asset in the ad currency, rounded to 2 places) appended in place.
        :param platform: platform passed to the markets currency lookup
        :param ads: list of ads; index 2 is the price, 4 the asset code and
            5 the currency code
        :param rates: pre-fetched CoinGecko prices (optional)
        :return: the ads sorted by price, lowest first
        :rtype: list
        :raises ValueError: if an ad has an asset other than XMR or BTC
        """
        if not rates:
            # NOTE(review): self.markets is not set anywhere in this class;
            # presumably it is assigned externally -- confirm.
            rates = await self.cg.get_price(
                ids=["monero", "bitcoin"],
                vs_currencies=self.markets.get_all_currencies(platform),
            )

        # Set the price based on the asset
        for ad in ads:
            coin = self._COIN_IDS.get(ad[4])
            if coin is None:
                # Previously this fell through with an unbound (or stale)
                # coin name and priced the ad against the wrong asset.
                raise ValueError(f"Unsupported asset in ad: {ad[4]}")
            currency = ad[5]
            base_currency_price = rates[coin][currency.lower()]
            price = float(ad[2])
            rate = round(price / base_currency_price, 2)
            ad.append(rate)

        # Sort numerically: the price is read through float() above, so it
        # may arrive as a string, which would otherwise sort lexicographically.
        return sorted(ads, key=lambda ad: float(ad[2]))

    async def get_rates_all(self):
        """
        Get all rates that pair with USD.
        :return: dictionary of USD/XXX rates
        :rtype: dict
        """
        # The converter call is not awaited, it runs synchronously.
        rates = self.cr.get_rates("USD")
        return rates

    async def get_acceptable_margins(self, platform, currency, amount):
        """
        Get the minimum and maximum amounts we would accept a trade for.
        The tolerance is platform.accept_within_usd, applied in USD and
        converted back to the trade currency.
        :param platform: platform providing accept_within_usd
        :param currency: currency code
        :param amount: amount
        :return: (min, max)
        :rtype: tuple
        """
        if currency == "USD":
            # No conversion needed, so don't fetch the rates at all
            min_amount = amount - platform.accept_within_usd
            max_amount = amount + platform.accept_within_usd
            return (min_amount, max_amount)

        rates = await self.get_rates_all()
        amount_usd = amount / rates[currency]
        min_usd = amount_usd - platform.accept_within_usd
        max_usd = amount_usd + platform.accept_within_usd
        min_local = min_usd * rates[currency]
        max_local = max_usd * rates[currency]
        return (min_local, max_local)

    async def get_minmax(self, min_usd, max_usd, asset, currency):
        """
        Convert a USD minimum and maximum into the given currency.
        :param min_usd: minimum amount in USD
        :param max_usd: maximum amount in USD
        :param asset: unused
        :param currency: currency code
        :return: (min, max), or (None, None) if there is no rate for the
            currency
        :rtype: tuple
        """
        if currency == "USD":
            # No conversion needed, so don't fetch the rates at all
            return (min_usd, max_usd)

        rates = await self.get_rates_all()
        if currency not in rates:
            log.error(f"Can't create ad without rates: {currency}")
            return (None, None)

        min_amount = rates[currency] * min_usd
        max_amount = rates[currency] * max_usd
        return (min_amount, max_amount)

    async def to_usd(self, amount, currency):
        """
        Convert an amount in the given currency to USD.
        :param amount: amount
        :param currency: currency code
        :return: value in USD
        :rtype: float
        """
        if currency == "USD":
            return float(amount)
        rates = await self.get_rates_all()
        return float(amount) / rates[currency]

    async def multiple_to_usd(self, currency_map, rates=None):
        """
        Convert multiple currencies to USD while saving API calls.
        :param currency_map: dictionary of currency code -> amount
        :param rates: pre-fetched USD rates (optional)
        :return: summed value in USD
        """
        if not rates:
            rates = await self.get_rates_all()
        cumul = 0
        for currency, amount in currency_map.items():
            if currency == "USD":
                cumul += float(amount)
            else:
                cumul += float(amount) / rates[currency]
        return cumul

    async def get_total_usd(self):
        """
        Get total USD in all our accounts, bank and trading.
        The breakdown is also written to ES.
        :return: value in USD, or False if a wallet balance lookup failed
        :rtype: float
        """
        # NOTE(review): self.sinks is not set anywhere in this class, and
        # self.agora is used here as an object with an api attribute while
        # check_all stores a per-platform factory -- confirm before calling.
        total_sinks_usd = await self.sinks.get_total_usd()
        agora_wallet_xmr = await self.agora.api.wallet_balance_xmr()
        agora_wallet_btc = await self.agora.api.wallet_balance()
        # lbtc_wallet_btc = await self.lbtc.api.wallet_balance()
        if not agora_wallet_xmr["success"]:
            return False
        if not agora_wallet_btc["success"]:
            return False
        # if not lbtc_wallet_btc["success"]:
        #     return False
        if not agora_wallet_xmr["response"]:
            return False
        if not agora_wallet_btc["response"]:
            return False
        # if not lbtc_wallet_btc["response"]:
        #     return False

        total_xmr_agora = agora_wallet_xmr["response"]["data"]["total"]["balance"]
        total_btc_agora = agora_wallet_btc["response"]["data"]["total"]["balance"]
        # total_btc_lbtc = lbtc_wallet_btc["response"]["data"]["total"]["balance"]

        # Get the XMR -> USD exchange rate
        xmr_usd = await self.cg.get_price(ids="monero", vs_currencies=["USD"])

        # Get the BTC -> USD exchange rate
        btc_usd = await self.cg.get_price(ids="bitcoin", vs_currencies=["USD"])

        # Convert the Agora BTC total to USD
        total_usd_agora_btc = float(total_btc_agora) * btc_usd["bitcoin"]["usd"]

        # Convert the LBTC BTC total to USD
        # total_usd_lbtc_btc = float(total_btc_lbtc) * btc_usd["bitcoin"]["usd"]

        # Convert the Agora XMR total to USD
        total_usd_agora_xmr = float(total_xmr_agora) * xmr_usd["monero"]["usd"]

        # Add it all up
        total_usd_agora = total_usd_agora_xmr + total_usd_agora_btc
        # total_usd_lbtc = total_usd_lbtc_btc
        total_usd = total_usd_agora + total_sinks_usd  # total_usd_lbtc +

        cast_es = {
            "price_usd": total_usd,
            "total_usd_agora_xmr": total_usd_agora_xmr,
            "total_usd_agora_btc": total_usd_agora_btc,
            # "total_usd_lbtc_btc": total_usd_lbtc_btc,
            "total_xmr_agora": total_xmr_agora,
            "total_btc_agora": total_btc_agora,
            # "total_btc_lbtc": total_btc_lbtc,
            "xmr_usd": xmr_usd["monero"]["usd"],
            "btc_usd": btc_usd["bitcoin"]["usd"],
            "total_sinks_usd": total_sinks_usd,
            "total_usd_agora": total_usd_agora,
        }
        await self.write_to_es("get_total_usd", cast_es)
        return total_usd

    async def gather_total_map(self, aggregators, rates):
        """
        Gather the total USD of specified aggregators.
        :param aggregators: iterable of aggregators
        :param rates: USD rates used for the conversion
        :return: summed value in USD
        """
        total_run_tasks = [self.nordigen(x) for x in aggregators]
        total_run = await asyncio.gather(*total_run_tasks)

        total_map_tasks = [x.get_total_map() for x in total_run]
        total_map = await asyncio.gather(*total_map_tasks)

        to_usd_tasks = [self.multiple_to_usd(x, rates=rates) for x in total_map]
        total = await asyncio.gather(*to_usd_tasks)

        return sum(total)

    async def _gather_wallet_total(self, platforms, balance_method):
        """
        Sum the wallet balance returned by the named API method across the
        specified platforms.
        """
        clients = await asyncio.gather(*[self.agora(p) for p in platforms])
        responses = await asyncio.gather(
            *[getattr(client.api, balance_method)() for client in clients]
        )
        return sum(
            float(resp["response"]["data"]["total"]["balance"]) for resp in responses
        )

    async def gather_wallet_balance_xmr(self, platforms):
        """
        Gather the total XMR of the specified platforms.
        :param platforms: iterable of platforms
        :return: summed XMR balance
        """
        return await self._gather_wallet_total(platforms, "wallet_balance_xmr")

    async def gather_wallet_balance(self, platforms):
        """
        Gather the total BTC of the specified platforms.
        :param platforms: iterable of platforms
        :return: summed BTC balance
        """
        return await self._gather_wallet_total(platforms, "wallet_balance")

    def gather_base_usd(self, platforms):
        """
        Sum base_usd over the specified platforms.
        """
        return sum(platform.base_usd for platform in platforms)

    def gather_withdrawal_limit(self, platforms):
        """
        Sum withdrawal_trigger over the specified platforms.
        """
        return sum(platform.withdrawal_trigger for platform in platforms)

    # TODO: possibly refactor this into smaller functions which don't return as much
    # check if this is all really needed in the corresponding withdraw function
    async def get_total(self, aggregators, platforms, trades=False):
        """
        Get all the values corresponding to the amount of money we hold.
        :param aggregators: aggregators whose balances count as sinks
        :param platforms: platforms whose wallets and limits are summed
        :param trades: also value the open trades of the platforms
        :return: dictionary of totals, rates, limits and profit; with trades
            it also holds open_trade_value, total_with_trades,
            total_remaining and total_profit
        :rtype: dict
        """
        rates = await self.get_rates_all()
        total_sinks_usd = await self.gather_total_map(aggregators, rates=rates)
        agora_wallet_xmr = await self.gather_wallet_balance_xmr(platforms)
        agora_wallet_btc = await self.gather_wallet_balance(platforms)

        total_xmr_agora = agora_wallet_xmr
        total_btc_agora = agora_wallet_btc

        async with AsyncCoinGeckoAPISession() as cg:
            # Get the XMR -> USD exchange rate
            xmr_usd = await cg.get_price(ids="monero", vs_currencies="USD")

            # Get the BTC -> USD exchange rate
            btc_usd = await cg.get_price(ids="bitcoin", vs_currencies="USD")

        # Convert the Agora XMR total to USD
        total_usd_agora_xmr = float(total_xmr_agora) * xmr_usd["monero"]["usd"]

        # Convert the Agora BTC total to USD
        total_usd_agora_btc = float(total_btc_agora) * btc_usd["bitcoin"]["usd"]

        # Add it all up
        total_usd_agora = total_usd_agora_xmr + total_usd_agora_btc
        total_usd = total_usd_agora + total_sinks_usd

        # Get aggregate totals and withdrawal limits
        total_base_usd = self.gather_base_usd(platforms)
        total_withdrawal_limit = self.gather_withdrawal_limit(platforms)

        # Use those to calculate amount remaining
        withdraw_threshold = total_base_usd + total_withdrawal_limit
        remaining = withdraw_threshold - total_usd

        profit = total_usd - total_base_usd

        # Convert the total USD price to GBP and SEK
        price_sek = rates["SEK"] * total_usd
        price_usd = total_usd
        price_gbp = rates["GBP"] * total_usd

        # Get open trades value
        if trades:
            dashboards = await self.gather_dashboards(platforms)
            cumul_trades = 0
            for dash in dashboards:
                cumul_add = await self.open_trades_usd_parse_dash(dash, rates)
                if cumul_add is False:
                    # Adding False would count as 0 anyway, but say so
                    log.error("Could not value the open trades of a dashboard")
                    continue
                cumul_trades += cumul_add
            total_with_trades = total_usd + cumul_trades
            total_remaining = withdraw_threshold - total_with_trades
            total_profit = total_with_trades - total_base_usd

        # cast = (
        #     (
        #         price_sek,
        #         price_usd,
        #         price_gbp,
        #     ),  # Total prices in our 3 favourite currencies
        #     (
        #         total_xmr_usd,
        #         total_btc_usd,
        #     ),  # Total USD balance in only Agora
        #     (total_xmr, total_btc),
        # )  # Total XMR and BTC balance in Agora

        # TODO
        cast_es = {
            "price_sek": price_sek,
            "price_usd": price_usd,
            "price_gbp": price_gbp,
            "total_usd_agora_xmr": total_usd_agora_xmr,
            "total_usd_agora_btc": total_usd_agora_btc,
            # "total_usd_lbtc_btc": total_usd_lbtc_btc,
            "total_xmr_agora": total_xmr_agora,
            "total_btc_agora": total_btc_agora,
            # "total_btc_lbtc": total_btc_lbtc,
            "xmr_usd": xmr_usd["monero"]["usd"],
            "btc_usd": btc_usd["bitcoin"]["usd"],
            "total_sinks_usd": total_sinks_usd,
            "total_usd_agora": total_usd_agora,
            "total_usd": total_usd,
            "total_base_usd": total_base_usd,
            "total_withdrawal_limit": total_withdrawal_limit,
            "remaining": remaining,
            "profit": profit,
            "withdraw_threshold": withdraw_threshold,
        }
        if trades:
            cast_es["open_trade_value"] = cumul_trades
            cast_es["total_with_trades"] = total_with_trades
            cast_es["total_remaining"] = total_remaining
            cast_es["total_profit"] = total_profit

        # await self.write_to_es("get_total", cast_es)
        return cast_es

    async def open_trades_usd_parse_dash(self, dash, rates):
        """
        Get the USD value of the open selling trades in a dashboard, priced
        at the CoinGecko rate of the day each trade was created.
        :param dash: dictionary of contacts
        :param rates: unused
        :return: value in USD, or False if a historical price is unavailable
        :rtype: float
        """
        cumul_usd = 0
        # (asset, date) -> CoinGecko history, so each day is fetched once
        cache = {}
        async with AsyncCoinGeckoAPISession() as cg:
            for contact in dash.values():
                data = contact["data"]

                # Only selling trades are counted, so skip the others before
                # spending an API call on them.
                if not data["is_selling"]:
                    # Legacy handling, kept for reference:
                    # currency = data["currency"]
                    # rate = rates[currency]
                    # amount_usd = float(amount) / rate
                    # cumul_usd += amount_usd
                    continue

                asset = data["advertisement"]["asset"]
                if asset not in self._COIN_IDS:
                    # Previously this reused the values of the prior trade
                    # or failed on an unbound name.
                    log.error(f"Unsupported asset in open trade: {asset}")
                    continue

                # We need created at in order to look up the historical prices
                created_at = data["created_at"]

                # Reformat the date how CoinGecko likes
                # 2022-05-02T11:17:14+00:00
                if "+" in created_at:
                    stamp, _, offset = created_at.partition("+")
                    # str.replace returns a new string, so keep the result
                    offset = offset.replace(".", "").replace(":", "")
                    created_at = f"{stamp}+{offset}"
                    date_parsed = datetime.strptime(created_at, "%Y-%m-%dT%H:%M:%S%z")
                else:
                    date_parsed = datetime.strptime(
                        created_at, "%Y-%m-%dT%H:%M:%S.%fZ"
                    )
                date_formatted = date_parsed.strftime("%d-%m-%Y")

                # Get the historical rates for the right asset, extract the price
                cache_key = (asset, date_formatted)
                if cache_key in cache:
                    history = cache[cache_key]
                else:
                    history = await cg.get_coin_history_by_id(
                        coin_id=self._COIN_IDS[asset], date=date_formatted
                    )
                    if "market_data" not in history:
                        return False
                    cache[cache_key] = history
                crypto_usd = float(history["market_data"]["current_price"]["usd"])

                # Convert crypto to USD
                amount_crypto = data[self._AMOUNT_KEYS[asset]]
                cumul_usd += float(amount_crypto) * crypto_usd
        return cumul_usd

    async def gather_dashboards(self, platforms):
        """
        Fetch the dashboard of each of the specified platforms, in order.
        :param platforms: iterable of platforms
        :return: list of dashboards
        :rtype: list
        """
        dashboards = []
        for platform in platforms:
            run = await self.agora(platform)
            dash = await run.wrap_dashboard()
            dashboards.append(dash)
        return dashboards

    # async def get_open_trades_usd(self, rates):
    #     """
    #     Get total value of open trades in USD.
    #     :return: total trade value
    #     :rtype: float
    #     """
    #     dash_agora = await self.agora.wrap_dashboard()
    #     # dash_lbtc = self.lbtc.wrap_dashboard()
    #     # dash_lbtc = yield dash_lbtc
    #     if dash_agora is False:
    #         return False
    #     # if dash_lbtc is False:
    #     #     return False

    #     # rates = await self.get_rates_all()
    #     cumul_usd_agora = await self.open_trades_usd_parse_dash(
    #         "agora", dash_agora, rates
    #     )
    #     # cumul_usd_lbtc = await self.open_trades_usd_parse_dash("lbtc", dash_lbtc,
    #     # rates)
    #     cumul_usd = cumul_usd_agora  # + cumul_usd_lbtc

    #     cast_es = {
    #         "trades_usd": cumul_usd,
    #     }
    #     await self.write_to_es("get_open_trades_usd", cast_es)
    #     return cumul_usd

    async def get_total_remaining(self):
        """
        Check how much profit we need to make in order to withdraw, taking
        into account open trade value.
        :return: profit remaining in USD, or False if the total is
            unavailable
        :rtype: float
        """
        # NOTE(review): get_open_trades_usd only exists as commented-out code
        # in this file, so this call fails unless it is provided elsewhere.
        total_usd = await self.get_total_usd()
        if not total_usd:
            # Bail out before fetching the trade value, as
            # get_total_with_trades does
            return False
        total_trades_usd = await self.get_open_trades_usd()
        total_usd += total_trades_usd
        withdraw_threshold = float(settings.Money.BaseUSD) + float(
            settings.Money.WithdrawLimit
        )
        remaining = withdraw_threshold - total_usd
        cast_es = {
            "total_remaining_usd": remaining,
        }
        await self.write_to_es("get_total_remaining", cast_es)
        return remaining

    async def get_total_with_trades(self):
        """
        Get the total USD including the value of open trades.
        :return: value in USD, or False if the total is unavailable
        :rtype: float
        """
        # NOTE(review): get_open_trades_usd only exists as commented-out code
        # in this file, so this call fails unless it is provided elsewhere.
        total_usd = await self.get_total_usd()
        if not total_usd:
            return False
        total_trades_usd = await self.get_open_trades_usd()
        total_with_trades = total_usd + total_trades_usd
        cast_es = {
            "total_with_trades": total_with_trades,
        }
        await self.write_to_es("get_total_with_trades", cast_es)
        return total_with_trades

    def get_pay_list(self, linkgroup, requisitions, platforms, user, profit):
        """
        Split the profit between the operators, the platform owners and the
        requisition owners.
        The cut percentages come from the link group; the platform and
        requisition cuts are shared out in proportion to throughput.
        :param linkgroup: link group providing the cut percentages
        :param requisitions: iterable of requisitions
        :param platforms: iterable of platforms
        :param user: user whose operator wallets are paid
        :param profit: profit to distribute
        :return: dictionary of wallet -> list of (amount, reason)
        :rtype: dict
        """
        pay_list = {}  # Wallet: [(amount, reason), (amount, reason), ...]

        # Get the total throughput to weigh each owner against
        total_throughput_requisition = sum(
            requisition.throughput for requisition in requisitions
        )
        total_throughput_platform = sum(platform.throughput for platform in platforms)

        cut_platform = profit * (linkgroup.platform_owner_cut_percentage / 100)
        cut_req = profit * (linkgroup.requisition_owner_cut_percentage / 100)
        cut_operator = profit * (linkgroup.operator_cut_percentage / 100)

        # Add the operator payment
        operator_wallets = OperatorWallets.objects.filter(user=user).first()
        # first() gives None when the user has no operator wallets; treat
        # that, and an empty payee list, as nobody to pay, like the platform
        # and requisition sections below.
        operator_payees = (
            list(operator_wallets.payees.all()) if operator_wallets is not None else []
        )
        operator_length = len(operator_payees)
        if operator_length:
            payment_per_operator = cut_operator / operator_length
            detail = (
                f"Operator cut for 1 of {operator_length} operators, total "
                f"{cut_operator}"
            )
            for wallet in operator_payees:
                pay_list.setdefault(wallet, []).append((payment_per_operator, detail))

        # Add the platform payment
        for platform in platforms:
            # Get ratio of platform.throughput to the total platform throughput
            if total_throughput_platform == 0:
                ratio = 0
            else:
                ratio = platform.throughput / total_throughput_platform
            platform_payment = cut_platform * ratio
            payees = list(platform.payees.all())
            payees_length = len(payees)
            if payees_length == 0:
                payment_per_payee = 0
            else:
                payment_per_payee = platform_payment / payees_length
            detail = (
                f"Platform {platform} cut for 1 of {payees_length} payees, "
                f"total {cut_platform}"
            )
            for wallet in payees:
                pay_list.setdefault(wallet, []).append((payment_per_payee, detail))

        # Add the requisition payment
        for requisition in requisitions:
            # Get ratio of requisition.throughput to the requisition cut
            if total_throughput_requisition == 0:
                ratio = 0
            else:
                ratio = requisition.throughput / total_throughput_requisition
            req_payment = cut_req * ratio
            payees = list(requisition.payees.all())
            payees_length = len(payees)
            if payees_length == 0:
                payment_per_payee = 0
            else:
                payment_per_payee = req_payment / payees_length
            detail = (
                f"Requisition {requisition} cut for 1 of {payees_length} payees, "
                f"total {cut_req}"
            )
            for wallet in payees:
                pay_list.setdefault(wallet, []).append((payment_per_payee, detail))

        return pay_list

    def collapse_pay_list(self, pay_list):
        """
        Collapse the pay list into a single dict of wallet: amount.
        :param pay_list: dictionary of wallet -> list of (amount, reason)
        :return: dictionary of wallet -> summed amount
        :rtype: dict
        """
        return {
            wallet: sum(payment[0] for payment in payments)
            for wallet, payments in pay_list.items()
        }


money = Money()