# Twisted/Klein imports from twisted.internet.task import LoopingCall from twisted.internet.threads import deferToThread # Other library imports from json import dumps from random import choices from string import ascii_uppercase from elasticsearch import Elasticsearch from datetime import datetime import urllib3 import logging # Project imports from settings import settings from db import r import util # TODO: secure ES traffic properly urllib3.disable_warnings() tracer = logging.getLogger("elasticsearch") tracer.setLevel(logging.CRITICAL) tracer = logging.getLogger("elastic_transport.transport") tracer.setLevel(logging.CRITICAL) class Transactions(util.Base): """ Handler class for incoming Revolut transactions. """ def __init__(self): """ Initialise the Transaction object. Set the logger. """ super().__init__() if settings.ES.Enabled == "1": self.es = Elasticsearch( f"https://{settings.ES.Host}:9200", verify_certs=False, basic_auth=(settings.ES.Username, settings.ES.Pass), # ssl_assert_fingerprint=("6b264fd2fd107d45652d8add1750a8a78f424542e13b056d0548173006260710"), ca_certs="certs/ca.crt", ) def run_checks_in_thread(self): """ Run all the balance checks that output into ES in another thread. """ deferToThread(self.get_total) deferToThread(self.get_remaining) deferToThread(self.money.get_profit) deferToThread(self.money.get_profit, True) deferToThread(self.get_open_trades_usd) deferToThread(self.get_total_remaining) deferToThread(self.get_total_with_trades) def setup_loops(self): """ Set up the LoopingCalls to get the balance so we have data in ES. """ if settings.ES.Enabled == "1": self.lc_es_checks = LoopingCall(self.run_checks_in_thread) delay = int(settings.ES.RefreshSec) self.lc_es_checks.start(delay) self.agora.es = self.es # TODO: write tests then refactor, this is terribly complicated! def transaction(self, data): """ Store details of transaction and post notifications to IRC. Matches it up with data stored in Redis to attempt to reconcile with an Agora trade. :param data: details of transaction :type data: dict """ event = data["event"] ts = data["timestamp"] if "data" not in data: return inside = data["data"] txid = inside["id"] if "type" not in inside: # stored_trade here is actually TX stored_trade = r.hgetall(f"tx.{txid}") if not stored_trade: self.log.error(f"Could not find entry in DB for typeless transaction: {txid}") return print("BEFORE CONVERT STORED TRADE", stored_trade) stored_trade = util.convert(stored_trade) if "old_state" in inside: if "new_state" in inside: # We don't care unless we're being told a transaction is now completed if not inside["new_state"] == "completed": return # We don't care unless the existing trade is pending if not stored_trade["state"] == "pending": return # Check the old state is what we also think it is if inside["old_state"] == stored_trade["state"]: # Set the state to the new state stored_trade["state"] = inside["new_state"] # Store the updated state r.hmset(f"tx.{txid}", stored_trade) # Check it's all been previously validated if "valid" not in stored_trade: self.log.error(f"Valid not in stored trade for {txid}, aborting.") return if stored_trade["valid"] == "1": # Make it invalid immediately, as we're going to release now stored_trade["valid"] = "0" r.hmset(f"tx.{txid}", stored_trade) reference = self.tx_to_ref(stored_trade["trade_id"]) self.release_funds(stored_trade["trade_id"], reference) self.ux.notify.notify_complete_trade(stored_trade["amount"], stored_trade["currency"]) return # If type not in inside and we haven't hit any more returns return else: txtype = inside["type"] if txtype == "card_payment": self.log.info(f"Ignoring card payment: {txid}") return state = inside["state"] if "reference" in inside: reference = inside["reference"] else: reference = "not_given" leg = inside["legs"][0] if "counterparty" in leg: account_type = leg["counterparty"]["account_type"] else: account_type = "not_given" amount = leg["amount"] if amount <= 0: self.log.info(f"Ignoring transaction with negative/zero amount: {txid}") return currency = leg["currency"] description = leg["description"] to_store = { "event": event, "trade_id": "", "ts": ts, "txid": txid, "txtype": txtype, "state": state, "reference": reference, "account_type": account_type, "amount": amount, "currency": currency, "description": description, "valid": 0, # All checks passed and we can release escrow? } self.log.info(f"Transaction processed: {dumps(to_store, indent=2)}") self.irc.sendmsg(f"AUTO Incoming transaction: {amount}{currency} ({reference}) - {state} - {description}") # Partial reference implementation # Account for silly people not removing the default string # Split the reference into parts ref_split = reference.split(" ") # Get all existing references existing_refs = self.get_refs() # Get all parts of the given reference split that match the existing references stored_trade_reference = set(existing_refs).intersection(set(ref_split)) if len(stored_trade_reference) > 1: self.log.error(f"Multiple references valid for TXID {txid}: {reference}") self.irc.sendmsg(f"Multiple references valid for TXID {txid}: {reference}") return stored_trade = False looked_up_without_reference = False # Amount/currency lookup implementation if not stored_trade_reference: self.log.info(f"No reference in DB refs for {reference}") self.irc.sendmsg(f"No reference in DB refs for {reference}") # Try checking just amount and currency, as some people (usually people buying small amounts) # are unable to put in a reference properly. self.log.info(f"Checking against amount and currency for TXID {txid}") self.irc.sendmsg(f"Checking against amount and currency for TXID {txid}") stored_trade = self.find_trade(txid, currency, amount) if not stored_trade: self.log.info(f"Failed to get reference by amount and currency: {txid} {currency} {amount}") self.irc.sendmsg(f"Failed to get reference by amount and currency: {txid} {currency} {amount}") return if currency == "USD": amount_usd = amount else: rates = self.money.get_rates_all() amount_usd = amount / rates[currency] # Amount is reliable here as it is checked by find_trade, so no need for stored_trade["amount"] if float(amount_usd) > float(settings.Agora.AcceptableAltLookupUSD): self.log.info("Not checking against amount and currency as amount exceeds MAX") self.irc.sendmsg(f"Not checking against amount and currency as amount exceeds MAX") # Close here if the amount exceeds the allowable limit for no reference return # Note that we have looked it up without reference so we don't use +- below # This might be redundant given the amount checks in find_trade, but better safe than sorry! looked_up_without_reference = True if not stored_trade: stored_trade = self.get_ref(stored_trade_reference.pop()) if not stored_trade: self.log.info(f"No reference in DB for {reference}") self.irc.sendmsg(f"No reference in DB for {reference}") return amount = float(amount) stored_trade["amount"] = float(stored_trade["amount"]) # Make sure it was sent in the expected currency if not stored_trade["currency"] == currency: self.log.info(f"Currency mismatch, Agora: {stored_trade['currency']} / Sink: {currency}") self.irc.sendmsg(f"Currency mismatch, Agora: {stored_trade['currency']} / Sink: {currency}") return # Make sure the expected amount was sent if not stored_trade["amount"] == amount: if looked_up_without_reference: return # If the amount does not match exactly, get the min and max values for our given acceptable margins for trades min_amount, max_amount = self.money.get_acceptable_margins(currency, stored_trade["amount"]) self.log.info(f"Amount does not match exactly, trying with margins: min: {min_amount} / max: {max_amount}") self.irc.sendmsg(f"Amount does not match exactly, trying with margins: min: {min_amount} / max: {max_amount}") if not min_amount < amount < max_amount: self.log.info("Amount mismatch - not in margins: {stored_trade['amount']} (min: {min_amount} / max: {max_amount}") self.irc.sendmsg(f"Amount mismatch - not in margins: {stored_trade['amount']} (min: {min_amount} / max: {max_amount}") return # We have made it this far without hitting any of the returns, so let's set valid = True # This will let us instantly release if the type is pending, and it is subsequently updated to completed with a callback. to_store["valid"] = 1 # Store the trade ID so we can release it easily to_store["trade_id"] = stored_trade["id"] if not state == "completed": self.log.info(f"Storing incomplete trade: {txid}") r.hmset(f"tx.{txid}", to_store) # Don't procees further if state is not "completed" return r.hmset(f"tx.{txid}", to_store) self.release_funds(stored_trade["id"], stored_trade["reference"]) self.ux.notify.notify_complete_trade(amount, currency) def release_funds(self, trade_id, reference): self.log.info(f"All checks passed, releasing funds for {trade_id} {reference}") self.irc.sendmsg(f"All checks passed, releasing funds for {trade_id} / {reference}") rtrn = self.agora.release_funds(trade_id) self.agora.agora.contact_message_post(trade_id, "Thanks! Releasing now :)") # Parse the escrow release response message = rtrn["message"] message_long = rtrn["response"]["data"]["message"] self.irc.sendmsg(f"{message} - {message_long}") def new_trade(self, asset, trade_id, buyer, currency, amount, amount_crypto, provider): """ Called when we have a new trade in Agora. Store details in Redis, generate a reference and optionally let the customer know the reference. """ reference = "".join(choices(ascii_uppercase, k=5)) reference = f"{asset}-{reference}" existing_ref = r.get(f"trade.{trade_id}.reference") if not existing_ref: r.set(f"trade.{trade_id}.reference", reference) to_store = { "id": trade_id, "asset": asset, "buyer": buyer, "currency": currency, "amount": amount, "amount_crypto": amount_crypto, "reference": reference, "provider": provider, } self.log.info(f"Storing trade information: {str(to_store)}") r.hmset(f"trade.{reference}", to_store) self.irc.sendmsg(f"Generated reference for {trade_id}: {reference}") self.ux.notify.notify_new_trade(amount, currency) if settings.Agora.Send == "1": self.agora.agora.contact_message_post(trade_id, f"Hi! When sending the payment please use reference code: {reference}") if existing_ref: return util.convert(existing_ref) else: return reference def find_trade(self, txid, currency, amount): """ Get a trade reference that matches the given currency and amount. Only works if there is one result. :param txid: Sink transaction ID :param currency: currency :param amount: amount :type txid: string :type currency: string :type amount: int :return: matching trade object or False :rtype: dict or bool """ refs = self.get_refs() matching_refs = [] # TODO: use get_ref_map in this function instead of calling get_ref multiple times for ref in refs: stored_trade = self.get_ref(ref) if stored_trade["currency"] == currency and float(stored_trade["amount"]) == float(amount): matching_refs.append(stored_trade) if len(matching_refs) != 1: self.log.error(f"Find trade returned multiple results for TXID {txid}: {matching_refs}") return False return matching_refs[0] def get_refs(self): """ Get all reference IDs for trades. :return: list of trade IDs :rtype: list """ references = [] ref_keys = r.keys("trade.*.reference") for key in ref_keys: references.append(r.get(key)) return util.convert(references) def get_ref_map(self): """ Get all reference IDs for trades. :return: dict of references keyed by TXID :rtype: dict """ references = {} ref_keys = r.keys("trade.*.reference") for key in ref_keys: tx = util.convert(key).split(".")[1] references[tx] = r.get(key) return util.convert(references) def get_ref(self, reference): """ Get the trade information for a reference. :param reference: trade reference :type reference: string :return: dict of trade information :rtype: dict """ ref_data = r.hgetall(f"trade.{reference}") ref_data = util.convert(ref_data) if not ref_data: return False return ref_data def del_ref(self, reference): """ Delete a given reference from the Redis database. :param reference: trade reference to delete :type reference: string """ tx = self.ref_to_tx(reference) r.delete(f"trade.{reference}") r.delete(f"trade.{tx}.reference") def cleanup(self, references): """ Reconcile the internal reference database with a given list of references. Delete all internal references not present in the list and clean up artifacts. :param references: list of references to reconcile against :type references: list """ for tx, reference in self.get_ref_map().items(): if reference not in references: self.log.info(f"Archiving trade reference: {reference} / TX: {tx}") r.rename(f"trade.{tx}.reference", f"archive.trade.{tx}.reference") r.rename(f"trade.{reference}", f"archive.trade.{reference}") def tx_to_ref(self, tx): """ Convert a trade ID to a reference. :param tx: trade ID :type tx: string :return: reference :rtype: string """ refs = self.get_refs() for reference in refs: ref_data = util.convert(r.hgetall(f"trade.{reference}")) if not ref_data: continue if ref_data["id"] == tx: return reference def ref_to_tx(self, reference): """ Convert a reference to a trade ID. :param reference: trade reference :type reference: string :return: trade ID :rtype: string """ ref_data = util.convert(r.hgetall(f"trade.{reference}")) if not ref_data: return False return ref_data["id"] def get_total_usd(self): """ Get total USD in all our accounts, bank and trading. :return: value in USD :rtype float: """ # TODO: get Sink totals agora_wallet_xmr = self.agora.agora.wallet_balance_xmr() if not agora_wallet_xmr["success"]: return False agora_wallet_btc = self.agora.agora.wallet_balance() if not agora_wallet_btc["success"]: return False total_xmr_agora = agora_wallet_xmr["response"]["data"]["total"]["balance"] total_btc_agora = agora_wallet_btc["response"]["data"]["total"]["balance"] # Get the XMR -> USD exchange rate xmr_usd = self.agora.cg.get_price(ids="monero", vs_currencies=["USD"]) # Get the BTC -> USD exchange rate btc_usd = self.agora.cg.get_price(ids="bitcoin", vs_currencies=["USD"]) # Convert the Agora BTC total to USD total_usd_agora_btc = float(total_btc_agora) * btc_usd["bitcoin"]["usd"] # Convert the Agora XMR total to USD total_usd_agora_xmr = float(total_xmr_agora) * xmr_usd["monero"]["usd"] # Add it all up total_usd_agora = total_usd_agora_xmr + total_usd_agora_btc # total_usd = total_usd_agora + total_usd_revolut # TODO: add sinks value here total_usd = total_usd_agora cast_es = { "price_usd": total_usd, "total_usd_agora_xmr": total_usd_agora_xmr, "total_usd_agora_btc": total_usd_agora_btc, "total_xmr_agora": total_xmr_agora, "total_btc_agora": total_btc_agora, "xmr_usd": xmr_usd["monero"]["usd"], "btc_usd": btc_usd["bitcoin"]["usd"], # "total_usd_sinks": total_usd_sinks, "total_usd_agora": total_usd_agora, } self.write_to_es("get_total_usd", cast_es) return total_usd # TODO: possibly refactor this into smaller functions which don't return as much stuff # check if this is all really needed in the corresponding withdraw function def get_total(self): """ Get all the values corresponding to the amount of money we hold. :return: ((total SEK, total USD, total GBP), (total XMR USD, total BTC USD), (total XMR, total BTC)) :rtype: tuple(tuple(float, float, float), tuple(float, float), tuple(float, float)) """ # TODO: get sinks value here # total_usd_revolut = self.revolut.get_total_usd() agora_wallet_xmr = self.agora.agora.wallet_balance_xmr() if not agora_wallet_xmr["success"]: self.log.error("Could not get Agora XMR wallet total.") return False agora_wallet_btc = self.agora.agora.wallet_balance() if not agora_wallet_btc["success"]: self.log.error("Could not get Agora BTC wallet total.") return False total_xmr_agora = agora_wallet_xmr["response"]["data"]["total"]["balance"] total_btc_agora = agora_wallet_btc["response"]["data"]["total"]["balance"] # Get the XMR -> USD exchange rate xmr_usd = self.agora.cg.get_price(ids="monero", vs_currencies=["USD"]) # Get the BTC -> USD exchange rate btc_usd = self.agora.cg.get_price(ids="bitcoin", vs_currencies=["USD"]) # Convert the Agora XMR total to USD total_usd_agora_xmr = float(total_xmr_agora) * xmr_usd["monero"]["usd"] # Convert the Agora BTC total to USD total_usd_agora_btc = float(total_btc_agora) * btc_usd["bitcoin"]["usd"] # Add it all up total_usd_agora = total_usd_agora_xmr + total_usd_agora_btc # total_usd = total_usd_agora + total_usd_revolut # TODO: add sinks value here total_usd = total_usd_agora # Convert the total USD price to GBP and SEK rates = self.money.get_rates_all() price_sek = rates["SEK"] * total_usd price_usd = total_usd price_gbp = rates["GBP"] * total_usd cast = ( (price_sek, price_usd, price_gbp), # Total prices in our 3 favourite currencies (total_usd_agora_xmr, total_usd_agora_btc), # Total USD balance in only Agora (total_xmr_agora, total_btc_agora), ) # Total XMR and BTC balance in Agora cast_es = { "price_sek": price_sek, "price_usd": price_usd, "price_gbp": price_gbp, "total_usd_agora_xmr": total_usd_agora_xmr, "total_usd_agora_btc": total_usd_agora_btc, "total_xmr_agora": total_xmr_agora, "total_btc_agora": total_btc_agora, "xmr_usd": xmr_usd["monero"]["usd"], "btc_usd": btc_usd["bitcoin"]["usd"], # "total_usd_revolut": total_usd_revolut, "total_usd_agora": total_usd_agora, } self.write_to_es("get_total", cast_es) return cast def write_to_es(self, msgtype, cast): if settings.ES.Enabled == "1": cast["type"] = msgtype cast["ts"] = str(datetime.now().isoformat()) cast["xtype"] = "tx" self.es.index(index=settings.ES.Index, document=cast) def get_remaining(self): """ Check how much profit we need to make in order to withdraw. :return: profit remaining in USD :rtype: float """ total_usd = self.get_total_usd() if not total_usd: return False withdraw_threshold = float(settings.Money.BaseUSD) + float(settings.Money.WithdrawLimit) remaining = withdraw_threshold - total_usd cast_es = { "remaining_usd": remaining, } self.write_to_es("get_remaining", cast_es) return remaining def get_open_trades_usd(self): """ Get total value of open trades in USD. :return: total trade value :rtype: float """ dash = self.agora.wrap_dashboard() if dash is False: return False rates = self.money.get_rates_all() cumul_usd = 0 for contact_id, contact in dash.items(): # We need created at in order to look up the historical prices created_at = contact["data"]["created_at"] # Reformat the date how CoinGecko likes date_parsed = datetime.strptime(created_at, "%Y-%m-%dT%H:%M:%S.%fZ") date_formatted = date_parsed.strftime("%d-%m-%Y") # Get the historical rates for the right asset, extract the price asset = contact["data"]["advertisement"]["asset"] if asset == "XMR": amount_crypto = contact["data"]["amount_xmr"] history = self.agora.cg.get_coin_history_by_id(id="monero", date=date_formatted) crypto_usd = float(history["market_data"]["current_price"]["usd"]) elif asset == "BTC": amount_crypto = contact["data"]["amount_btc"] history = self.agora.cg.get_coin_history_by_id(id="bitcoin", date=date_formatted) crypto_usd = float(history["market_data"]["current_price"]["usd"]) # Convert crypto to fiat amount = float(amount_crypto) * crypto_usd currency = contact["data"]["currency"] if not contact["data"]["is_selling"]: continue if currency == "USD": cumul_usd += float(amount) else: rate = rates[currency] amount_usd = float(amount) / rate cumul_usd += amount_usd cast_es = { "trades_usd": cumul_usd, } self.write_to_es("get_open_trades_usd", cast_es) return cumul_usd def get_total_remaining(self): """ Check how much profit we need to make in order to withdraw, taking into account open trade value. :return: profit remaining in USD :rtype: float """ total_usd = self.get_total_usd() total_trades_usd = self.get_open_trades_usd() if not total_usd: return False total_usd += total_trades_usd withdraw_threshold = float(settings.Money.BaseUSD) + float(settings.Money.WithdrawLimit) remaining = withdraw_threshold - total_usd cast_es = { "total_remaining_usd": remaining, } self.write_to_es("get_total_remaining", cast_es) return remaining def get_total_with_trades(self): total_usd = self.get_total_usd() if not total_usd: return False total_trades_usd = self.get_open_trades_usd() total_with_trades = total_usd + total_trades_usd cast_es = { "total_with_trades": total_with_trades, } self.write_to_es("get_total_with_trades", cast_es) return total_with_trades