# Standard library imports
import asyncio
import logging
from datetime import datetime

# Third-party imports
import urllib3
from aiocoingecko import AsyncCoinGeckoAPISession
from django.conf import settings
from elasticsearch import AsyncElasticsearch
from forex_python.converter import CurrencyRates

# Project imports
from core.models import Aggregator, Platform

# TODO: secure ES traffic properly
urllib3.disable_warnings()

# Only let CRITICAL records through from the search-client transport loggers.
tracer = logging.getLogger("opensearch")
tracer.setLevel(logging.CRITICAL)
tracer = logging.getLogger("elastic_transport.transport")
tracer.setLevel(logging.CRITICAL)


class Money(object):
    """
    Generic class for handling money-related matters that aren't Revolut or
    Agora.

    NOTE(review): several methods read attributes that are never assigned in
    this module (``self.markets``, ``self.sinks``, ``self.instance``), and
    ``get_acceptable_margins`` uses a module-level ``util`` that is not
    imported here. Presumably these are attached/imported elsewhere; confirm
    before relying on those methods.
    """

    # Asset ticker -> (CoinGecko coin id, contact field holding the amount).
    _ASSETS = {
        "XMR": ("monero", "amount_xmr"),
        "BTC": ("bitcoin", "amount_btc"),
    }

    def __init__(self):
        """
        Initialise the Money object.
        Set the logger.
        Initialise the forex converter, the CoinGecko API session and the
        Elasticsearch client.
        """
        # get_minmax() and the debug output below rely on this logger.
        self.log = logging.getLogger(__name__)
        self.cr = CurrencyRates()
        # NOTE(review): this session is used without ``async with`` in most
        # methods, while get_total() opens its own session with
        # ``async with`` - confirm which usage the library expects.
        self.cg = AsyncCoinGeckoAPISession()
        auth = (settings.ELASTICSEARCH_USERNAME, settings.ELASTICSEARCH_PASSWORD)
        self.es = AsyncElasticsearch(
            settings.ELASTICSEARCH_HOST, http_auth=auth, verify_certs=False
        )

    async def check_all(self, user=None, nordigen=None, agora=None):
        """
        Run all the enabled balance checks concurrently with asyncio.gather
        and return their results keyed by check name.

        :param user: user whose enabled aggregators and platforms are checked
        :param nordigen: awaited as ``nordigen(aggregator)`` by the gather
            helpers to obtain a per-aggregator client
        :param agora: awaited as ``agora(platform)`` by the gather helpers to
            obtain a per-platform client
        :return: mapping of check name to check result
        :rtype: dict
        :raises ValueError: if any of the three arguments is missing or falsy
        """
        if not all([user, nordigen, agora]):
            raise ValueError("check_all requires user, nordigen and agora")

        # I hate circular dependencies
        self.nordigen = nordigen
        self.agora = agora

        # NOTE(review): these querysets are iterated later inside coroutines;
        # Django normally rejects synchronous ORM access from a running event
        # loop - confirm how this coroutine is driven.
        aggregators = Aggregator.objects.filter(user=user, enabled=True)
        platforms = Platform.objects.filter(user=user, enabled=True)

        # ``order`` and ``tasks`` are zipped together below, so they must stay
        # aligned entry for entry.
        order = [
            "total",
            # "remaining",
            # "profit",
            # "profit_with_trades",
            # "open_trades",
            # "total_remaining",
            # "total_with_trades",
        ]
        tasks = [
            self.get_total(aggregators, platforms),
            # self.get_remaining(),
            # self.get_profit(),
            # self.get_profit(True),
            # self.get_open_trades_usd(),
            # self.get_total_remaining(),
            # self.get_total_with_trades(),
        ]
        task_output = await asyncio.gather(*tasks)
        self.log.debug("TASK OUTPUT %s", task_output)
        results = dict(zip(order, task_output))
        self.log.debug("RESULTS %s", results)
        return results

    async def write_to_es(self, msgtype, cast):
        """
        Tag a document with bookkeeping fields and index it into ES.

        The ``cast`` dictionary is modified in place before being indexed.

        :param msgtype: stored in the document as ``xtype``
        :param cast: document body to index
        """
        cast["type"] = "money"
        cast["ts"] = datetime.now().isoformat()
        cast["xtype"] = msgtype
        cast["user_id"] = self.instance.user.id
        cast["platform_id"] = self.instance.id
        await self.es.index(index=settings.ELASTICSEARCH_INDEX, body=cast)

    async def lookup_rates(self, platform, ads, rates=None):
        """
        Lookup the rates for a list of public ads.

        Each ad is a list; index 2 is read as the price, index 4 as the asset
        ticker and index 5 as the currency code. The computed rate is appended
        to every ad in place.

        :param platform: passed to ``self.markets.get_all_currencies`` when
            the rates have to be fetched
        :param ads: list of ads to annotate
        :param rates: optional pre-fetched CoinGecko prices
        :return: the ads sorted by their price field
        :rtype: list
        :raises ValueError: if an ad carries an asset other than XMR or BTC
        """
        if not rates:
            rates = await self.cg.get_price(
                ids=["monero", "bitcoin"],
                vs_currencies=self.markets.get_all_currencies(platform),
            )
        # Set the price based on the asset
        for ad in ads:
            asset = ad[4]
            if asset not in self._ASSETS:
                # Previously an unknown asset reused the coin of the previous
                # ad (or crashed on an unbound name); fail explicitly instead.
                raise ValueError(f"Unsupported asset: {asset}")
            coin = self._ASSETS[asset][0]
            currency = ad[5]
            base_currency_price = rates[coin][currency.lower()]
            price = float(ad[2])
            ad.append(round(price / base_currency_price, 2))
        # TODO: sort?
        return sorted(ads, key=lambda x: x[2])

    async def get_rates_all(self):
        """
        Get all rates that pair with USD.

        NOTE(review): ``CurrencyRates.get_rates`` is called synchronously
        here; presumably it performs blocking network I/O - confirm whether
        that is acceptable on the event loop.

        :return: dictionary of USD/XXX rates
        :rtype: dict
        """
        return self.cr.get_rates("USD")

    async def get_acceptable_margins(self, platform, currency, amount):
        """
        Get the minimum and maximum amounts we would accept a trade for.

        NOTE(review): ``util`` is not imported in this module, so this method
        fails with a NameError unless it is provided elsewhere.

        :param platform: platform whose settings supply the USD margin
        :param currency: currency code
        :param amount: amount
        :return: (min, max)
        :rtype: tuple
        """
        sets = util.get_settings(platform)
        margin = float(sets.AcceptableUSDMargin)
        if currency == "USD":
            # No conversion needed, so skip the rates lookup entirely.
            return (amount - margin, amount + margin)
        rates = await self.get_rates_all()
        rate = rates[currency]
        amount_usd = amount / rate
        return ((amount_usd - margin) * rate, (amount_usd + margin) * rate)

    async def get_minmax(self, min_usd, max_usd, asset, currency):
        """
        Convert a USD minimum/maximum pair into the given currency.

        :param min_usd: minimum amount in USD
        :param max_usd: maximum amount in USD
        :param asset: unused by this method
        :param currency: currency code to convert into
        :return: (min, max) in ``currency``, or None if no rate is available
        :rtype: tuple or None
        """
        if currency == "USD":
            # No conversion needed, so skip the rates lookup entirely.
            return (min_usd, max_usd)
        rates = await self.get_rates_all()
        if currency not in rates:
            self.log.error(f"Can't create ad without rates: {currency}")
            return
        return (rates[currency] * min_usd, rates[currency] * max_usd)

    async def to_usd(self, amount, currency):
        """
        Convert an amount in the given currency to USD.

        :param amount: amount in ``currency``
        :param currency: currency code
        :return: amount in USD
        :rtype: float
        """
        if currency == "USD":
            return float(amount)
        rates = await self.get_rates_all()
        return float(amount) / rates[currency]

    async def multiple_to_usd(self, currency_map):
        """
        Convert multiple currencies to USD while saving API calls.

        :param currency_map: mapping of currency code to amount
        :return: sum of all amounts converted to USD
        """
        rates = await self.get_rates_all()
        cumul = 0
        for currency, amount in currency_map.items():
            if currency == "USD":
                cumul += float(amount)
            else:
                cumul += float(amount) / rates[currency]
        return cumul

    async def get_profit(self, trades=False):
        """
        Check how much total profit we have made.

        :param trades: also count the value of open trades
        :return: profit in USD, or False if a lookup failed
        :rtype: float
        """
        total_usd = await self.get_total_usd()
        if not total_usd:
            return False
        if trades:
            trades_usd = await self.get_open_trades_usd()
            # False signals a failed lookup; 0 is a valid "no open trades".
            if trades_usd is False:
                return False
            total_usd += trades_usd

        profit = total_usd - float(settings.Money.BaseUSD)
        key = "profit_trades_usd" if trades else "profit_usd"
        await self.write_to_es("get_profit", {key: profit})
        return profit

    async def get_total_usd(self):
        """
        Get total USD in all our accounts, bank and trading.

        :return: value in USD, or False if a wallet lookup failed
        :rtype: float
        """
        total_sinks_usd = await self.sinks.get_total_usd()
        agora_wallet_xmr = await self.agora.api.wallet_balance_xmr()
        agora_wallet_btc = await self.agora.api.wallet_balance()
        # The LBTC wallet lookups are currently disabled.

        wallets = (agora_wallet_xmr, agora_wallet_btc)
        if not all(wallet["success"] for wallet in wallets):
            return False
        if not all(wallet["response"] for wallet in wallets):
            return False

        total_xmr_agora = agora_wallet_xmr["response"]["data"]["total"]["balance"]
        total_btc_agora = agora_wallet_btc["response"]["data"]["total"]["balance"]

        # Get the XMR -> USD and BTC -> USD exchange rates
        xmr_usd = await self.cg.get_price(ids="monero", vs_currencies=["USD"])
        btc_usd = await self.cg.get_price(ids="bitcoin", vs_currencies=["USD"])

        # Convert the Agora crypto totals to USD
        total_usd_agora_btc = float(total_btc_agora) * btc_usd["bitcoin"]["usd"]
        total_usd_agora_xmr = float(total_xmr_agora) * xmr_usd["monero"]["usd"]

        # Add it all up
        total_usd_agora = total_usd_agora_xmr + total_usd_agora_btc
        total_usd = total_usd_agora + total_sinks_usd

        cast_es = {
            "price_usd": total_usd,
            "total_usd_agora_xmr": total_usd_agora_xmr,
            "total_usd_agora_btc": total_usd_agora_btc,
            "total_xmr_agora": total_xmr_agora,
            "total_btc_agora": total_btc_agora,
            "xmr_usd": xmr_usd["monero"]["usd"],
            "btc_usd": btc_usd["bitcoin"]["usd"],
            "total_sinks_usd": total_sinks_usd,
            "total_usd_agora": total_usd_agora,
        }
        await self.write_to_es("get_total_usd", cast_es)
        return total_usd

    async def gather_total_map(self, aggregators):
        """
        Sum the USD value of the balances reported by every aggregator.

        :param aggregators: iterable of aggregators to query
        :return: combined total in USD
        """
        total = 0
        for aggregator in aggregators:
            run = await self.nordigen(aggregator)
            total_map = await run.get_total_map()
            total_usd = await self.multiple_to_usd(total_map)
            self.log.debug("gather_total_map total_usd %s", total_usd)
            total += total_usd
        return total

    async def gather_wallet_balance_xmr(self, platforms):
        """
        Sum the XMR wallet balance over every platform.

        :param platforms: iterable of platforms to query
        :return: combined XMR balance
        """
        # TODO: check success
        total = 0
        for platform in platforms:
            run = await self.agora(platform)
            wallet = await run.api.wallet_balance_xmr()
            xmr = float(wallet["response"]["data"]["total"]["balance"])
            self.log.debug("gather wallet xmr %s", xmr)
            total += xmr
        return total

    async def gather_wallet_balance(self, platforms):
        """
        Sum the BTC wallet balance over every platform.

        :param platforms: iterable of platforms to query
        :return: combined BTC balance
        """
        # TODO: check success
        total = 0
        for platform in platforms:
            run = await self.agora(platform)
            wallet = await run.api.wallet_balance()
            total += float(wallet["response"]["data"]["total"]["balance"])
        return total

    # TODO: possibly refactor this into smaller functions which don't return as much
    # check if this is all really needed in the corresponding withdraw function
    async def get_total(self, aggregators, platforms):
        """
        Get all the values corresponding to the amount of money we hold.

        :param aggregators: aggregators whose balances are summed in USD
        :param platforms: platforms whose XMR and BTC wallets are summed
        :return: dictionary with the totals in SEK, USD and GBP, the per-asset
            crypto and USD totals, and the exchange rates used
        :rtype: dict
        """
        total_sinks_usd = await self.gather_total_map(aggregators)
        total_xmr_agora = await self.gather_wallet_balance_xmr(platforms)
        total_btc_agora = await self.gather_wallet_balance(platforms)
        # The LBTC wallet lookups are currently disabled.

        async with AsyncCoinGeckoAPISession() as cg:
            # Get the XMR -> USD exchange rate
            xmr_usd = await cg.get_price(ids="monero", vs_currencies="USD")
            # Get the BTC -> USD exchange rate
            btc_usd = await cg.get_price(ids="bitcoin", vs_currencies="USD")

        # Convert the Agora crypto totals to USD
        total_usd_agora_xmr = float(total_xmr_agora) * xmr_usd["monero"]["usd"]
        total_usd_agora_btc = float(total_btc_agora) * btc_usd["bitcoin"]["usd"]

        # Add it all up
        total_usd_agora = total_usd_agora_xmr + total_usd_agora_btc
        total_usd = total_usd_agora + total_sinks_usd

        # Convert the total USD price to GBP and SEK
        rates = await self.get_rates_all()

        cast_es = {
            "price_sek": rates["SEK"] * total_usd,
            "price_usd": total_usd,
            "price_gbp": rates["GBP"] * total_usd,
            "total_usd_agora_xmr": total_usd_agora_xmr,
            "total_usd_agora_btc": total_usd_agora_btc,
            "total_xmr_agora": total_xmr_agora,
            "total_btc_agora": total_btc_agora,
            "xmr_usd": xmr_usd["monero"]["usd"],
            "btc_usd": btc_usd["bitcoin"]["usd"],
            "total_sinks_usd": total_sinks_usd,
            "total_usd_agora": total_usd_agora,
        }
        # await self.write_to_es("get_total", cast_es)
        return cast_es

    @staticmethod
    def _withdraw_threshold():
        """Return the USD total above which a withdrawal is due."""
        return float(settings.Money.BaseUSD) + float(settings.Money.WithdrawLimit)

    async def get_remaining(self):
        """
        Check how much profit we need to make in order to withdraw.

        :return: profit remaining in USD, or False if the total lookup failed
        :rtype: float
        """
        total_usd = await self.get_total_usd()
        if not total_usd:
            return False

        remaining = self._withdraw_threshold() - total_usd
        await self.write_to_es("get_remaining", {"remaining_usd": remaining})
        return remaining

    @staticmethod
    def _parse_created_at(created_at):
        """Parse a contact's ``created_at`` timestamp into a datetime."""
        if "+" in created_at:
            # e.g. 2022-05-02T11:17:14+00:00 - strip the separators from the
            # UTC offset so that %z accepts it on every Python 3 version.
            # str.replace returns a new string, so the result must be kept.
            date_split = created_at.split("+")
            date_split[1] = date_split[1].replace(".", "").replace(":", "")
            return datetime.strptime("+".join(date_split), "%Y-%m-%dT%H:%M:%S%z")
        return datetime.strptime(created_at, "%Y-%m-%dT%H:%M:%S.%fZ")

    async def open_trades_usd_parse_dash(self, platform, dash, rates):
        """
        Add up the USD value of the selling contacts in a dashboard.

        Contacts where ``is_selling`` is falsy are skipped before any price
        lookup is made.

        :param platform: "agora" or "lbtc"
        :param dash: mapping of contact ID to contact
        :param rates: USD/XXX rates as returned by get_rates_all
        :return: cumulative value in USD, or False if CoinGecko returned no
            market data for a contact
        :raises ValueError: on an unknown platform or asset
        """
        cumul_usd = 0
        for contact in dash.values():
            data = contact["data"]
            if not data["is_selling"]:
                continue

            # We need created at in order to look up the historical prices,
            # formatted how CoinGecko likes.
            date_formatted = self._parse_created_at(data["created_at"]).strftime(
                "%d-%m-%Y"
            )

            # Get the historical rates for the right asset, extract the price
            if platform == "agora":
                asset = data["advertisement"]["asset"]
            elif platform == "lbtc":
                asset = "BTC"
            else:
                raise ValueError(f"Unsupported platform: {platform}")
            if asset not in self._ASSETS:
                raise ValueError(f"Unsupported asset: {asset}")

            coin_id, amount_key = self._ASSETS[asset]
            amount_crypto = data[amount_key]
            history = await self.cg.get_coin_history_by_id(
                id=coin_id, date=date_formatted
            )
            if "market_data" not in history:
                return False
            crypto_usd = float(history["market_data"]["current_price"]["usd"])

            # Convert crypto to fiat
            amount = float(amount_crypto) * crypto_usd
            currency = data["currency"]
            if currency == "USD":
                cumul_usd += amount
            else:
                # NOTE(review): ``amount`` is derived from a USD price, yet it
                # is divided by the fiat rate again here - confirm this is
                # the intended conversion.
                cumul_usd += amount / rates[currency]
        return cumul_usd

    async def get_open_trades_usd(self):
        """
        Get total value of open trades in USD.

        :return: total trade value, or False if a lookup failed
        :rtype: float
        """
        dash_agora = await self.agora.wrap_dashboard()
        if dash_agora is False:
            return False
        # The LBTC dashboard lookup is currently disabled.

        rates = await self.get_rates_all()
        cumul_usd = await self.open_trades_usd_parse_dash("agora", dash_agora, rates)
        # False signals missing market data; 0 is a valid "no open trades".
        if cumul_usd is False:
            return False

        await self.write_to_es("get_open_trades_usd", {"trades_usd": cumul_usd})
        return cumul_usd

    async def get_total_remaining(self):
        """
        Check how much profit we need to make in order to withdraw, taking
        into account open trade value.

        :return: profit remaining in USD, or False if a lookup failed
        :rtype: float
        """
        total_usd = await self.get_total_usd()
        if not total_usd:
            return False
        total_trades_usd = await self.get_open_trades_usd()
        if total_trades_usd is False:
            return False

        remaining = self._withdraw_threshold() - (total_usd + total_trades_usd)
        await self.write_to_es(
            "get_total_remaining", {"total_remaining_usd": remaining}
        )
        return remaining

    async def get_total_with_trades(self):
        """
        Get the total USD we hold plus the value of open trades.

        :return: combined value in USD, or False if a lookup failed
        :rtype: float
        """
        total_usd = await self.get_total_usd()
        if not total_usd:
            return False
        total_trades_usd = await self.get_open_trades_usd()
        if total_trades_usd is False:
            return False

        total_with_trades = total_usd + total_trades_usd
        await self.write_to_es(
            "get_total_with_trades", {"total_with_trades": total_with_trades}
        )
        return total_with_trades


money = Money()