pluto/core/lib/money.py

632 lines
24 KiB
Python

# Standard library and third-party imports
import asyncio
import logging
from datetime import datetime
import urllib3
from aiocoingecko import AsyncCoinGeckoAPISession
from django.conf import settings
from elasticsearch import AsyncElasticsearch
from forex_python.converter import CurrencyRates
# Project imports
from core.models import Aggregator, OperatorWallets, Platform
from core.util import logs
# TODO: secure ES traffic properly
# Silence urllib3's warnings for the whole process.
# NOTE(review): presumably this hides the insecure-request warnings caused by
# verify_certs=False on the Elasticsearch client built in Money.__init__ --
# confirm.
urllib3.disable_warnings()
# Only let CRITICAL records through from the "opensearch" logger.
tracer = logging.getLogger("opensearch")
tracer.setLevel(logging.CRITICAL)
# Same for the Elasticsearch transport logger. This rebinds ``tracer``, so the
# name ends up referring to this second logger only.
tracer = logging.getLogger("elastic_transport.transport")
tracer.setLevel(logging.CRITICAL)
# Module-wide logger obtained from the project's logging helper.
log = logs.get_logger("money")
class Money(object):
"""
Generic class for handling money-related matters that aren't Revolut or Agora.
"""
def __init__(self):
    """
    Create the clients this object talks to: a forex-python rates
    converter, a CoinGecko API session and an async Elasticsearch client.
    """
    self.cr = CurrencyRates()
    self.cg = AsyncCoinGeckoAPISession()
    credentials = (
        settings.ELASTICSEARCH_USERNAME,
        settings.ELASTICSEARCH_PASSWORD,
    )
    # Certificate verification is switched off for the ES connection.
    self.es = AsyncElasticsearch(
        settings.ELASTICSEARCH_HOST,
        http_auth=credentials,
        verify_certs=False,
    )
async def check_all(self, user=None, link_group=None, nordigen=None, agora=None):
    """
    Run the balance checks for a user and/or link group and write the
    result to ES.

    :param user: restrict the enabled aggregators/platforms to this user
    :param link_group: restrict them to this link group
    :param nordigen: callable awaited with an aggregator to obtain its client
    :param agora: callable awaited with a platform to obtain its client
    :return: the totals produced by ``get_total`` (with open trades)
    :rtype: dict
    :raises ValueError: if ``nordigen`` or ``agora`` is missing, or if
                        neither ``user`` nor ``link_group`` is given
    """
    if not all([nordigen, agora]):
        raise ValueError(
            "check_all requires both the 'nordigen' and 'agora' client factories"
        )
    if not any([user, link_group]):
        raise ValueError("check_all requires at least one of 'user' or 'link_group'")
    # I hate circular dependencies
    self.nordigen = nordigen
    self.agora = agora
    # Only filter on the selectors that were actually supplied.
    filters = {}
    if user is not None:
        filters["user"] = user
    if link_group is not None:
        filters["link_group"] = link_group
    aggregators = Aggregator.objects.filter(enabled=True, **filters)
    platforms = Platform.objects.filter(enabled=True, **filters)
    return await self.get_total(aggregators, platforms, trades=True)
# def setup_loops(self):
# """
# Set up the LoopingCalls to get the balance so we have data in ES.
# """
# if settings.ES.Enabled == "1" or settings.Logstash.Enabled == "1":
# self.lc_es_checks = LoopingCall(self.run_checks_in_thread)
# delay = int(settings.ES.RefreshSec)
# self.lc_es_checks.start(delay)
# if settings.ES.Enabled == "1":
# self.agora.es = self.es
# self.lbtc.es = self.es
async def write_to_es(self, msgtype, cast):
    """
    Stamp ``cast`` as a money record and index it into Elasticsearch.

    ``cast`` is modified in place: ``type``, ``ts`` (current local time in
    ISO format) and ``xtype`` are added before indexing. A RuntimeError
    from the client is logged as a warning and not re-raised.

    :param msgtype: value stored under ``xtype``
    :param cast: document to index
    """
    cast.update(
        {
            "type": "money",
            "ts": datetime.now().isoformat(),
            "xtype": msgtype,
        }
    )
    # cast["user_id"] = self.instance.user.id
    # cast["platform_id"] = self.instance.id
    try:
        await self.es.index(index=settings.ELASTICSEARCH_INDEX, body=cast)
    except RuntimeError:
        log.warning("Could not write to ES")
async def lookup_rates(self, platform, ads, rates=None):
    """
    Lookup the rates for a list of public ads.

    Each ad is a list read positionally: index 2 is the price, index 4 the
    asset code ("XMR" or "BTC") and index 5 the currency code. The price
    divided by the asset's price in that currency, rounded to two places,
    is appended to every ad in place.

    NOTE(review): ``self.markets`` is not assigned anywhere in the visible
    part of this class; it is only touched when ``rates`` is not supplied.

    :param platform: passed to ``self.markets.get_all_currencies``
    :param ads: list of ad lists (mutated)
    :param rates: optional pre-fetched prices, ``{coin: {currency: price}}``
    :return: the ads ordered by numeric price, ascending
    :rtype: list
    :raises ValueError: if an ad's asset is neither XMR nor BTC
    """
    if not rates:
        rates = await self.cg.get_price(
            ids=["monero", "bitcoin"],
            vs_currencies=self.markets.get_all_currencies(platform),
        )
    coin_ids = {"XMR": "monero", "BTC": "bitcoin"}  # No s in "bitcoin"
    for ad in ads:
        asset = ad[4]
        if asset not in coin_ids:
            # Fail loudly rather than pricing this ad with the coin picked
            # for the previous one.
            raise ValueError(f"Unsupported asset in ad: {asset!r}")
        currency = ad[5]
        base_currency_price = rates[coin_ids[asset]][currency.lower()]
        rate = round(float(ad[2]) / base_currency_price, 2)
        ad.append(rate)
    # Prices may arrive as strings, so compare them numerically.
    return sorted(ads, key=lambda x: float(x[2]))
async def get_rates_all(self):
    """
    Get all rates that pair with USD.

    ``CurrencyRates.get_rates`` is a plain synchronous call, so it is run
    in the event loop's default executor to keep other coroutines running
    while the lookup is in flight.

    :return: dictionary of USD/XXX rates
    :rtype: dict
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self.cr.get_rates, "USD")
# TODO: pass platform
async def get_acceptable_margins(self, platform, currency, amount):
    """
    Get the minimum and maximum amounts we would accept a trade for.

    The tolerance is ``platform.accept_within_usd`` either side of the
    amount, applied in USD and converted back to ``currency``.

    :param platform: object providing ``accept_within_usd``
    :param currency: currency code
    :param amount: amount in ``currency``
    :return: (min, max) in ``currency``
    :rtype: tuple
    :raises KeyError: if there is no USD rate for ``currency``
    """
    tolerance_usd = platform.accept_within_usd
    if currency == "USD":
        # No conversion needed, so skip the rates lookup altogether.
        return (amount - tolerance_usd, amount + tolerance_usd)
    rates = await self.get_rates_all()
    rate = rates[currency]
    amount_usd = amount / rate
    min_local = (amount_usd - tolerance_usd) * rate
    max_local = (amount_usd + tolerance_usd) * rate
    return (min_local, max_local)
async def get_minmax(self, min_usd, max_usd, asset, currency):
    """
    Convert a USD minimum/maximum pair into ``currency``.

    :param min_usd: lower bound in USD
    :param max_usd: upper bound in USD
    :param asset: not used by this method
    :param currency: target currency code
    :return: (min, max) in ``currency``, or (None, None) when there is no
             rate for it
    :rtype: tuple
    """
    if currency == "USD":
        # Already in USD: nothing to convert and no rates lookup needed.
        return (min_usd, max_usd)
    rates = await self.get_rates_all()
    if currency not in rates:
        log.error(f"Can't create ad without rates: {currency}")
        return (None, None)
    rate = rates[currency]
    return (rate * min_usd, rate * max_usd)
async def to_usd(self, amount, currency):
    """
    Convert ``amount`` of ``currency`` into USD.

    :param amount: value accepted by ``float()``
    :param currency: currency code
    :return: the amount in USD
    :rtype: float
    """
    # USD needs no conversion, so the rates are not fetched for it.
    if currency == "USD":
        return float(amount)
    usd_rates = await self.get_rates_all()
    return float(amount) / usd_rates[currency]
async def multiple_to_usd(self, currency_map, rates=None):
    """
    Convert multiple currencies to USD while saving API calls.

    :param currency_map: mapping of currency code -> amount
    :param rates: optional USD rates; fetched once when not supplied
    :return: the summed USD value (0 for an empty map)
    """
    if not rates:
        rates = await self.get_rates_all()
    total_usd = 0
    for code, amount in currency_map.items():
        value = float(amount)
        if code != "USD":
            value = value / rates[code]
        total_usd += value
    return total_usd
async def get_total_usd(self):
    """
    Get total USD in all our accounts, bank and trading, and write the
    breakdown to ES.

    NOTE(review): ``self.sinks`` is not assigned in the visible part of
    this class, and ``self.agora`` is used here as an object with an
    ``api`` attribute whereas ``check_all`` stores a callable under that
    name -- confirm this method is still reachable.

    :return: value in USD, or False when either wallet lookup reports no
             success or carries no response
    :rtype: float or bool
    """
    sinks_usd = await self.sinks.get_total_usd()
    xmr_wallet = await self.agora.api.wallet_balance_xmr()
    btc_wallet = await self.agora.api.wallet_balance()
    # lbtc_wallet_btc = await self.lbtc.api.wallet_balance()
    # Both lookups must report success before either payload is trusted.
    for field in ("success", "response"):
        if not (xmr_wallet[field] and btc_wallet[field]):
            return False
    xmr_balance = xmr_wallet["response"]["data"]["total"]["balance"]
    btc_balance = btc_wallet["response"]["data"]["total"]["balance"]
    # Current XMR -> USD and BTC -> USD prices
    xmr_quote = await self.cg.get_price(ids="monero", vs_currencies=["USD"])
    btc_quote = await self.cg.get_price(ids="bitcoin", vs_currencies=["USD"])
    # Value each Agora wallet in USD
    btc_value_usd = float(btc_balance) * btc_quote["bitcoin"]["usd"]
    xmr_value_usd = float(xmr_balance) * xmr_quote["monero"]["usd"]
    # Add it all up
    agora_usd = xmr_value_usd + btc_value_usd
    grand_total = agora_usd + sinks_usd
    await self.write_to_es(
        "get_total_usd",
        {
            "price_usd": grand_total,
            "total_usd_agora_xmr": xmr_value_usd,
            "total_usd_agora_btc": btc_value_usd,
            "total_xmr_agora": xmr_balance,
            "total_btc_agora": btc_balance,
            "xmr_usd": xmr_quote["monero"]["usd"],
            "btc_usd": btc_quote["bitcoin"]["usd"],
            "total_sinks_usd": sinks_usd,
            "total_usd_agora": agora_usd,
        },
    )
    return grand_total
async def gather_total_map(self, aggregators, rates):
    """
    Gather the total USD of specified aggregators.

    Runs in three concurrent stages: build a client per aggregator via
    ``self.nordigen``, fetch each client's total map, then convert every
    map to USD with ``multiple_to_usd``.

    :param aggregators: iterable of aggregators
    :param rates: USD rates handed through to ``multiple_to_usd``
    :return: sum of the per-aggregator USD totals
    """
    clients = await asyncio.gather(*(self.nordigen(agg) for agg in aggregators))
    balance_maps = await asyncio.gather(
        *(client.get_total_map() for client in clients)
    )
    usd_values = await asyncio.gather(
        *(self.multiple_to_usd(entry, rates=rates) for entry in balance_maps)
    )
    return sum(usd_values)
async def gather_wallet_balance_xmr(self, platforms):
    """
    Gather the total XMR of the specified platforms.

    A client is built per platform via ``self.agora`` and the XMR wallet
    balance requests are then issued concurrently.

    :param platforms: iterable of platforms
    :return: sum of the reported total balances
    """
    clients = await asyncio.gather(*(self.agora(entry) for entry in platforms))
    replies = await asyncio.gather(
        *(client.api.wallet_balance_xmr() for client in clients)
    )
    balances = [
        float(reply["response"]["data"]["total"]["balance"]) for reply in replies
    ]
    return sum(balances)
async def gather_wallet_balance(self, platforms):
    """
    Gather the total BTC of the specified platforms.

    A client is built per platform via ``self.agora`` and the BTC wallet
    balance requests are then issued concurrently.

    :param platforms: iterable of platforms
    :return: sum of the reported total balances
    """
    clients = await asyncio.gather(*(self.agora(entry) for entry in platforms))
    replies = await asyncio.gather(
        *(client.api.wallet_balance() for client in clients)
    )
    balances = [
        float(reply["response"]["data"]["total"]["balance"]) for reply in replies
    ]
    return sum(balances)
def gather_base_usd(self, platforms):
    """
    Add up ``base_usd`` across the given platforms.

    :param platforms: iterable of objects with a ``base_usd`` attribute
    :return: the combined base USD (0 when there are no platforms)
    """
    base_total = 0
    for entry in platforms:
        base_total = base_total + entry.base_usd
    return base_total
def gather_withdrawal_limit(self, platforms):
    """
    Add up ``withdrawal_trigger`` across the given platforms.

    :param platforms: iterable of objects with a ``withdrawal_trigger``
                      attribute
    :return: the combined withdrawal trigger (0 when there are no platforms)
    """
    limit_total = 0
    for entry in platforms:
        limit_total = limit_total + entry.withdrawal_trigger
    return limit_total
# TODO: possibly refactor this into smaller functions which don't return as much
# check if this is all really needed in the corresponding withdraw function
async def get_total(self, aggregators, platforms, trades=False):
    """
    Get all the values corresponding to the amount of money we hold,
    write them to ES and return them.

    :param aggregators: aggregators whose balances make up the sinks total
    :param platforms: platforms whose Agora wallets, ``base_usd`` and
                      ``withdrawal_trigger`` are included
    :param trades: when true, also value the open trades and add the
                   ``open_trade_value`` and ``total_*`` keys
    :return: the breakdown that was written to ES
    :rtype: dict
    """
    rates = await self.get_rates_all()
    sinks_usd = await self.gather_total_map(aggregators, rates=rates)
    xmr_held = await self.gather_wallet_balance_xmr(platforms)
    btc_held = await self.gather_wallet_balance(platforms)

    # Current XMR -> USD and BTC -> USD prices
    async with AsyncCoinGeckoAPISession() as cg:
        xmr_quote = await cg.get_price(ids="monero", vs_currencies="USD")
        btc_quote = await cg.get_price(ids="bitcoin", vs_currencies="USD")

    # Value the Agora wallets in USD and add the sinks on top
    xmr_value_usd = float(xmr_held) * xmr_quote["monero"]["usd"]
    btc_value_usd = float(btc_held) * btc_quote["bitcoin"]["usd"]
    agora_usd = xmr_value_usd + btc_value_usd
    holdings_usd = agora_usd + sinks_usd

    # Aggregate base amounts and withdrawal limits, then what is left to earn
    base_usd = self.gather_base_usd(platforms)
    withdrawal_limit = self.gather_withdrawal_limit(platforms)
    threshold = base_usd + withdrawal_limit
    remaining = threshold - holdings_usd
    profit = holdings_usd - base_usd
    profit_in_xmr = profit / xmr_quote["monero"]["usd"]

    # The USD total expressed in SEK and GBP
    price_sek = rates["SEK"] * holdings_usd
    price_gbp = rates["GBP"] * holdings_usd

    trade_fields = {}
    if trades:
        open_trade_value = 0
        for dash in await self.gather_dashboards(platforms):
            open_trade_value += await self.open_trades_usd_parse_dash(dash, rates)
        with_trades = holdings_usd + open_trade_value
        trade_profit = with_trades - base_usd
        trade_fields = {
            "open_trade_value": open_trade_value,
            "total_with_trades": with_trades,
            "total_remaining": threshold - with_trades,
            "total_profit": trade_profit,
            "total_profit_in_xmr": trade_profit / xmr_quote["monero"]["usd"],
        }

    cast_es = {
        "price_sek": price_sek,
        "price_usd": holdings_usd,
        "price_gbp": price_gbp,
        "total_usd_agora_xmr": xmr_value_usd,
        "total_usd_agora_btc": btc_value_usd,
        "total_xmr_agora": xmr_held,
        "total_btc_agora": btc_held,
        "xmr_usd": xmr_quote["monero"]["usd"],
        "btc_usd": btc_quote["bitcoin"]["usd"],
        "total_sinks_usd": sinks_usd,
        "total_usd_agora": agora_usd,
        "total_usd": holdings_usd,
        "total_base_usd": base_usd,
        "total_withdrawal_limit": withdrawal_limit,
        "remaining": remaining,
        "profit": profit,
        "profit_in_xmr": profit_in_xmr,
        "withdraw_threshold": threshold,
    }
    # Trade keys, when present, go after the base keys.
    cast_es.update(trade_fields)
    await self.write_to_es("get_total", cast_es)
    return cast_es
async def open_trades_usd_parse_dash(self, dash, rates):
    """
    Value the open trades of one dashboard in USD.

    Only contacts flagged ``is_selling`` are counted. Each one is valued
    at the CoinGecko historical price of its asset on the day the contact
    was created; lookups are cached per (asset, day) for this call.

    :param dash: mapping of contact id -> contact; each contact's ``data``
                 provides ``created_at``, ``is_selling``,
                 ``advertisement.asset`` and ``amount_xmr``/``amount_btc``
    :param rates: not used by this method
    :return: total USD value, or False if a needed history lookup comes
             back without ``market_data``
    :rtype: float or bool
    """
    # asset code -> (CoinGecko coin id, contact field with the crypto amount)
    assets = {
        "XMR": ("monero", "amount_xmr"),
        "BTC": ("bitcoin", "amount_btc"),
    }
    cumul_usd = 0
    cache = {}
    async with AsyncCoinGeckoAPISession() as cg:
        for contact in dash.values():
            data = contact["data"]
            # Check this first so contacts we skip cost no API call.
            if not data["is_selling"]:
                continue
            asset = data["advertisement"]["asset"]
            if asset not in assets:
                # Never price a contact with values left over from the
                # previous iteration.
                log.warning(f"Skipping open trade with unsupported asset: {asset}")
                continue
            coin_id, amount_field = assets[asset]
            # We need created at in order to look up the historical prices
            created_at = data["created_at"]
            if "+" in created_at:
                # 2022-05-02T11:17:14+00:00 -> 2022-05-02T11:17:14+0000
                stamp, offset = created_at.split("+", 1)
                offset = offset.replace(".", "").replace(":", "")
                date_parsed = datetime.strptime(
                    f"{stamp}+{offset}", "%Y-%m-%dT%H:%M:%S%z"
                )
            else:
                date_parsed = datetime.strptime(created_at, "%Y-%m-%dT%H:%M:%S.%fZ")
            # Reformat the date how CoinGecko likes
            date_formatted = date_parsed.strftime("%d-%m-%Y")
            cache_key = (asset, date_formatted)
            history = cache.get(cache_key)
            if history is None:
                history = await cg.get_coin_history_by_id(
                    coin_id=coin_id, date=date_formatted
                )
                if "market_data" not in history:
                    return False
                cache[cache_key] = history
            crypto_usd = float(history["market_data"]["current_price"]["usd"])
            # Convert crypto to USD
            cumul_usd += float(data[amount_field]) * crypto_usd
    return cumul_usd
async def gather_dashboards(self, platforms):
    """
    Fetch the dashboard of every platform, one platform at a time.

    :param platforms: iterable of platforms
    :return: the dashboards, in the same order as ``platforms``
    :rtype: list
    """
    return [
        await (await self.agora(entry)).wrap_dashboard() for entry in platforms
    ]
# async def get_open_trades_usd(self, rates):
# """
# Get total value of open trades in USD.
# :return: total trade value
# :rtype: float
# """
# dash_agora = await self.agora.wrap_dashboard()
# # dash_lbtc = self.lbtc.wrap_dashboard()
# # dash_lbtc = yield dash_lbtc
# if dash_agora is False:
# return False
# # if dash_lbtc is False:
# # return False
# # rates = await self.get_rates_all()
# cumul_usd_agora = await self.open_trades_usd_parse_dash(
# "agora", dash_agora, rates
# )
# # cumul_usd_lbtc = await self.open_trades_usd_parse_dash("lbtc", dash_lbtc,
# # rates)
# cumul_usd = cumul_usd_agora # + cumul_usd_lbtc
# cast_es = {
# "trades_usd": cumul_usd,
# }
# await self.write_to_es("get_open_trades_usd", cast_es)
# return cumul_usd
async def get_total_remaining(self):
    """
    Check how much profit we need to make in order to withdraw, taking into account
    open trade value.

    NOTE(review): ``get_open_trades_usd`` only appears as commented-out
    code in the visible part of this file -- confirm it is provided
    elsewhere before relying on this method.

    :return: profit remaining in USD, or False if the total is unavailable
    :rtype: float or bool
    """
    total_usd = await self.get_total_usd()
    # Bail out before the open-trades lookup when the total is unavailable.
    if not total_usd:
        return False
    total_trades_usd = await self.get_open_trades_usd()
    total_usd += total_trades_usd
    withdraw_threshold = float(settings.Money.BaseUSD) + float(
        settings.Money.WithdrawLimit
    )
    remaining = withdraw_threshold - total_usd
    cast_es = {
        "total_remaining_usd": remaining,
    }
    await self.write_to_es("get_total_remaining", cast_es)
    return remaining
async def get_total_with_trades(self):
    """
    Get the USD total including the value of open trades, and write it
    to ES.

    NOTE(review): ``get_open_trades_usd`` only appears as commented-out
    code in the visible part of this file -- confirm it is provided
    elsewhere.

    :return: total USD plus open trade value, or False if the total is
             unavailable
    :rtype: float or bool
    """
    holdings_usd = await self.get_total_usd()
    if not holdings_usd:
        return False
    combined = holdings_usd + await self.get_open_trades_usd()
    await self.write_to_es("get_total_with_trades", {"total_with_trades": combined})
    return combined
def get_pay_list(self, linkgroup, requisitions, platforms, user, profit):
    """
    Split ``profit`` between operators, platform owners and requisition
    owners.

    The three cuts are the link group's ``operator_cut_percentage``,
    ``platform_owner_cut_percentage`` and
    ``requisition_owner_cut_percentage`` of the profit. The operator cut
    is divided equally between the payees of the user's OperatorWallets.
    The platform and requisition cuts are divided between platforms (or
    requisitions) in proportion to their throughput, then equally between
    each one's payees.

    :param linkgroup: object providing the three cut percentages
    :param requisitions: iterable of requisitions (``throughput``, ``payees``)
    :param platforms: iterable of platforms (``throughput``, ``payees``)
    :param user: user whose OperatorWallets receive the operator cut
    :param profit: amount to distribute
    :return: mapping of wallet -> list of (amount, reason) tuples
    :rtype: dict
    """
    pay_list = {}  # Wallet: [(amount, reason), (amount, reason), ...]

    def add_shares(label, owners, cut):
        # Split ``cut`` between owners by throughput, then between payees.
        total_throughput = 0
        for owner in owners:
            total_throughput += owner.throughput
        for owner in owners:
            if total_throughput == 0:
                ratio = 0
            else:
                ratio = owner.throughput / total_throughput
            owner_payment = cut * ratio
            payees = list(owner.payees.all())
            payees_length = len(payees)
            if payees_length == 0:
                payment_per_payee = 0
            else:
                payment_per_payee = owner_payment / payees_length
            detail = (
                f"{label} {owner} cut for 1 of {payees_length} payees, "
                f"total {cut}"
            )
            for wallet in payees:
                pay_list.setdefault(wallet, []).append((payment_per_payee, detail))

    cut_platform = profit * (linkgroup.platform_owner_cut_percentage / 100)
    cut_req = profit * (linkgroup.requisition_owner_cut_percentage / 100)
    cut_operator = profit * (linkgroup.operator_cut_percentage / 100)

    # Add the operator payment
    operator_wallets = OperatorWallets.objects.filter(user=user).first()
    # No OperatorWallets row, or one without payees, means nobody to pay;
    # skip instead of failing on None or dividing by zero.
    operators = []
    if operator_wallets is not None:
        operators = list(operator_wallets.payees.all())
    operator_length = len(operators)
    if operator_length:
        payment_per_operator = cut_operator / operator_length
        detail = (
            f"Operator cut for 1 of {operator_length} operators, total "
            f"{cut_operator}"
        )
        for wallet in operators:
            pay_list.setdefault(wallet, []).append((payment_per_operator, detail))

    # Add the platform payment
    add_shares("Platform", platforms, cut_platform)
    # Add the requisition payment
    add_shares("Requisition", requisitions, cut_req)
    return pay_list
def collapse_pay_list(self, pay_list):
    """
    Reduce a pay list to one amount per wallet.

    :param pay_list: mapping of wallet -> list of (amount, reason) tuples
    :return: mapping of wallet -> sum of that wallet's amounts
    :rtype: dict
    """
    return {
        wallet: sum(amount for amount, *_ in entries)
        for wallet, entries in pay_list.items()
    }
# Module-level Money instance, created when this module is imported.
money = Money()