pluto/handler/transactions.py

578 lines
23 KiB
Python

# Twisted/Klein imports
from twisted.logger import Logger
# Other library imports
from json import dumps
from random import choices
from string import ascii_uppercase
from elasticsearch import Elasticsearch
from datetime import datetime
# Project imports
from settings import settings
from db import r
from util import convert
class Transactions(object):
"""
Handler class for incoming Revolut transactions.
"""
def __init__(self):
    """
    Initialise the Transactions handler.
    Set the logger and create the Elasticsearch client used by write_to_es.
    """
    self.log = Logger("transactions")
    # Connection details are read from the project settings.
    es_url = f"https://{settings.ES.Host}:9200"
    es_credentials = (settings.ES.Username, settings.ES.Pass)
    es_fingerprint = "6b264fd2fd107d45652d8add1750a8a78f424542e13b056d0548173006260710"
    # NOTE(review): verify_certs is disabled while a CA bundle and a pinned
    # fingerprint are also supplied - confirm this combination is intended.
    self.es = Elasticsearch(
        es_url,
        verify_certs=False,
        basic_auth=es_credentials,
        ssl_assert_fingerprint=es_fingerprint,
        ca_certs="certs/ca.crt",
    )
# TODO: write tests then refactor, this is terribly complicated!
def transaction(self, data):
    """
    Store details of transaction and post notifications to IRC.
    Matches it up with data stored in Redis to attempt to reconcile with an Agora trade.

    Two kinds of payload are handled:

    * No "type" key in data["data"]: treated as a state update for a transaction
      previously stored under ``tx.<id>``. Funds are released only when the update
      moves a stored, pending, previously validated transaction to "completed".
    * "type" present: a new transaction. It is matched to a stored trade by
      reference (or, failing that, by exact currency and amount below a USD limit),
      checked for currency and amount, stored under ``tx.<id>`` and, if already
      completed, released immediately.

    Every failed check logs/notifies and returns None without releasing.

    :param data: details of transaction
    :type data: dict
    """
    event = data["event"]
    ts = data["timestamp"]
    inside = data["data"]
    txid = inside["id"]
    if "type" not in inside:
        # stored_trade here is actually TX
        stored_trade = r.hgetall(f"tx.{txid}")
        if not stored_trade:
            self.log.error("Could not find entry in DB for typeless transaction: {id}", id=txid)
            return
        # Debug output goes through the logger rather than a bare print to stdout
        self.log.debug("Stored transaction before conversion: {stored}", stored=stored_trade)
        stored_trade = convert(stored_trade)
        if "old_state" in inside:
            if "new_state" in inside:
                # We don't care unless we're being told a transaction is now completed
                if not inside["new_state"] == "completed":
                    return
                # We don't care unless the existing trade is pending
                if not stored_trade["state"] == "pending":
                    return
                # Check the old state is what we also think it is
                # NOTE(review): the validity check and release below are kept under this
                # guard (the stricter reading of the original flow) - confirm the nesting.
                if inside["old_state"] == stored_trade["state"]:
                    # Set the state to the new state
                    stored_trade["state"] = inside["new_state"]
                    # Store the updated state
                    r.hmset(f"tx.{txid}", stored_trade)
                    # Check it's all been previously validated
                    if "valid" not in stored_trade:
                        self.log.error("Valid not in stored trade for {txid}, aborting.", txid=txid)
                        return
                    if stored_trade["valid"] == "1":
                        # Make it invalid immediately, as we're going to release now
                        stored_trade["valid"] = "0"
                        r.hmset(f"tx.{txid}", stored_trade)
                        reference = self.tx_to_ref(stored_trade["trade_id"])
                        self.release_funds(stored_trade["trade_id"], reference)
                        self.notify.notify_complete_trade(stored_trade["amount"], stored_trade["currency"])
                        return
        # If type not in inside and we haven't hit any more returns
        return
    else:
        txtype = inside["type"]
        if txtype == "card_payment":
            self.log.info("Ignoring card payment: {id}", id=txid)
            return
    state = inside["state"]
    if "reference" in inside:
        reference = inside["reference"]
    else:
        reference = "not_given"
    # Only the first leg of the transaction is inspected
    leg = inside["legs"][0]
    if "counterparty" in leg:
        account_type = leg["counterparty"]["account_type"]
    else:
        account_type = "not_given"
    amount = leg["amount"]
    if amount <= 0:
        self.log.info("Ignoring transaction with negative/zero amount: {id}", id=txid)
        return
    currency = leg["currency"]
    description = leg["description"]
    to_store = {
        "event": event,
        "trade_id": "",
        "ts": ts,
        "txid": txid,
        "txtype": txtype,
        "state": state,
        "reference": reference,
        "account_type": account_type,
        "amount": amount,
        "currency": currency,
        "description": description,
        "valid": 0,  # All checks passed and we can release escrow?
    }
    self.log.info("Transaction processed: {formatted}", formatted=dumps(to_store, indent=2))
    self.irc.sendmsg(f"AUTO Incoming transaction: {amount}{currency} ({reference}) - {state} - {description}")
    # Partial reference implementation
    # Account for silly people not removing the default string
    # Split the reference into parts
    ref_split = reference.split(" ")
    # Get all existing references
    existing_refs = self.get_refs()
    # Get all parts of the given reference split that match the existing references
    stored_trade_reference = set(existing_refs).intersection(set(ref_split))
    if len(stored_trade_reference) > 1:
        self.log.error("Multiple references valid for TXID {txid}: {reference}", txid=txid, reference=reference)
        self.irc.sendmsg(f"Multiple references valid for TXID {txid}: {reference}")
        return
    stored_trade = False
    looked_up_without_reference = False
    # Amount/currency lookup implementation
    if not stored_trade_reference:
        # The reference is sender-supplied text: pass it as a format field instead of
        # interpolating it into the template, so braces in it cannot break formatting.
        self.log.info("No reference in DB refs for {reference}", reference=reference)
        self.irc.sendmsg(f"No reference in DB refs for {reference}")
        # Try checking just amount and currency, as some people (usually people buying small amounts)
        # are unable to put in a reference properly.
        self.log.info("Checking against amount and currency for TXID {txid}", txid=txid)
        self.irc.sendmsg(f"Checking against amount and currency for TXID {txid}")
        stored_trade = self.find_trade(txid, currency, amount)
        if not stored_trade:
            self.log.info(
                "Failed to get reference by amount and currency: {txid} {currency} {amount}",
                txid=txid,
                currency=currency,
                amount=amount,
            )
            self.irc.sendmsg(f"Failed to get reference by amount and currency: {txid} {currency} {amount}")
            return
        if currency == "USD":
            amount_usd = amount
        else:
            rates = self.agora.get_rates_all()
            amount_usd = amount / rates[currency]
        # Amount is reliable here as it is checked by find_trade, so no need for stored_trade["amount"]
        if float(amount_usd) > float(settings.Agora.AcceptableAltLookupUSD):
            self.log.info("Not checking against amount and currency as amount exceeds MAX")
            self.irc.sendmsg("Not checking against amount and currency as amount exceeds MAX")
            # Close here if the amount exceeds the allowable limit for no reference
            return
        # Note that we have looked it up without reference so we don't use +- below
        # This might be redundant given the amount checks in find_trade, but better safe than sorry!
        looked_up_without_reference = True
    if not stored_trade:
        # Exactly one reference matched; fetch the trade stored for it
        stored_trade = self.get_ref(stored_trade_reference.pop())
        if not stored_trade:
            self.log.info("No reference in DB for {reference}", reference=reference)
            self.irc.sendmsg(f"No reference in DB for {reference}")
            return
    amount = float(amount)
    stored_trade["amount"] = float(stored_trade["amount"])
    # Make sure it was sent in the expected currency
    if not stored_trade["currency"] == currency:
        self.log.info(
            "Currency mismatch, Agora: {currency_agora} / Revolut: {currency}",
            currency_agora=stored_trade["currency"],
            currency=currency,
        )
        self.irc.sendmsg(f"Currency mismatch, Agora: {stored_trade['currency']} / Revolut: {currency}")
        return
    # Make sure the expected amount was sent
    if not stored_trade["amount"] == amount:
        # No margin is allowed when the trade was matched without a reference
        if looked_up_without_reference:
            return
        # If the amount does not match exactly, get the min and max values for our given acceptable margins for trades
        min_amount, max_amount = self.agora.get_acceptable_margins(currency, amount)
        self.log.info(
            "Amount does not match exactly, trying with margins: min: {min_amount} / max: {max_amount}",
            min_amount=min_amount,
            max_amount=max_amount,
        )
        self.irc.sendmsg(f"Amount does not match exactly, trying with margins: min: {min_amount} / max: {max_amount}")
        if not min_amount < stored_trade["amount"] < max_amount:
            self.log.info(
                "Amount mismatch - not in margins: {amount} (min: {min_amount} / max: {max_amount}",
                amount=stored_trade["amount"],
                min_amount=min_amount,
                max_amount=max_amount,
            )
            self.irc.sendmsg(f"Amount mismatch - not in margins: {stored_trade['amount']} (min: {min_amount} / max: {max_amount}")
            return
    # Make sure the account type was Revolut, as these are completed instantly
    # if not account_type == "revolut":
    #     self.log.info("Account type is not Revolut: {account_type}", account_type=account_type)
    #     self.irc.sendmsg(f"Account type is not Revolut: {account_type}")
    #     return
    # We have made it this far without hitting any of the returns, so let's set valid = True
    # This will let us instantly release if the type is pending, and it is subsequently updated to completed with a callback.
    to_store["valid"] = 1
    # Store the trade ID so we can release it easily
    to_store["trade_id"] = stored_trade["id"]
    if not state == "completed":
        self.log.info("Storing incomplete trade: {id}", id=txid)
        r.hmset(f"tx.{txid}", to_store)
        # Don't proceed further if state is not "completed"
        return
    r.hmset(f"tx.{txid}", to_store)
    self.release_funds(stored_trade["id"], stored_trade["reference"])
    self.notify.notify_complete_trade(amount, currency)
def release_funds(self, trade_id, reference):
    """
    Release the funds for a trade and report the outcome to IRC.
    Also posts a thank-you message to the trade's contact chat.
    :param trade_id: trade ID to release
    :param reference: reference of the trade, used only for logging
    :type trade_id: string
    :type reference: string
    """
    self.log.info(
        "All checks passed, releasing funds for {trade_id} {reference}",
        trade_id=trade_id,
        reference=reference,
    )
    self.irc.sendmsg(f"All checks passed, releasing funds for {trade_id} / {reference}")
    release_result = self.agora.release_funds(trade_id)
    self.agora.agora.contact_message_post(trade_id, "Thanks! Releasing now :)")
    # Relay both the short and the detailed message from the release response
    summary = release_result["message"]
    detail = release_result["response"]["data"]["message"]
    self.irc.sendmsg(f"{summary} - {detail}")
def new_trade(self, asset, trade_id, buyer, currency, amount, amount_crypto, provider):
    """
    Called when we have a new trade in Agora.
    Store details in Redis, generate a reference and optionally let the customer know the reference.
    If a reference is already stored for the trade ID, nothing is stored or sent and the
    existing reference is returned instead.
    :return: the reference associated with the trade
    :rtype: string
    """
    # The candidate reference is always generated first: <asset>-<5 uppercase letters>
    suffix = "".join(choices(ascii_uppercase, k=5))
    reference = f"{asset}-{suffix}"
    existing_ref = r.get(f"trade.{trade_id}.reference")
    if existing_ref:
        return convert(existing_ref)
    # Trade ID -> reference lookup key
    r.set(f"trade.{trade_id}.reference", reference)
    trade_info = {
        "id": trade_id,
        "asset": asset,
        "buyer": buyer,
        "currency": currency,
        "amount": amount,
        "amount_crypto": amount_crypto,
        "reference": reference,
        "provider": provider,
    }
    self.log.info("Storing trade information: {info}", info=str(trade_info))
    # Reference -> trade details hash
    r.hmset(f"trade.{reference}", trade_info)
    self.irc.sendmsg(f"Generated reference for {trade_id}: {reference}")
    self.notify.notify_new_trade(amount, currency)
    # Only message the customer when sending is switched on in the settings
    if settings.Agora.Send == "1":
        self.agora.agora.contact_message_post(trade_id, f"Hi! When sending the payment please use reference code: {reference}")
    return reference
def find_trade(self, txid, currency, amount):
    """
    Get a trade reference that matches the given currency and amount.
    Only works if there is one result.
    References whose trade details cannot be loaded are skipped.
    :param txid: Revolut transaction ID
    :param currency: currency
    :param amount: amount
    :type txid: string
    :type currency: string
    :type amount: int
    :return: matching trade object or False
    :rtype: dict or bool
    """
    target_amount = float(amount)
    matching_refs = []
    # TODO: use get_ref_map in this function instead of calling get_ref multiple times
    for ref in self.get_refs():
        stored_trade = self.get_ref(ref)
        if not stored_trade:
            # get_ref returns False when no details are stored for the reference
            continue
        if stored_trade["currency"] == currency and float(stored_trade["amount"]) == target_amount:
            matching_refs.append(stored_trade)
    if not matching_refs:
        self.log.info("Find trade returned no results for TXID {txid}", txid=txid)
        return False
    if len(matching_refs) > 1:
        # An ambiguous match is never resolved by guessing
        self.log.error("Find trade returned multiple results for TXID {txid}: {matching_refs}", txid=txid, matching_refs=matching_refs)
        return False
    return matching_refs[0]
def get_refs(self):
    """
    Get the references of all trades.
    Reads the value of every key matching trade.*.reference.
    :return: list of trade references
    :rtype: list
    """
    reference_keys = r.keys("trade.*.reference")
    return convert([r.get(reference_key) for reference_key in reference_keys])
def get_ref_map(self):
    """
    Get all trade references together with the trade they belong to.
    The key of each entry is the middle segment of a trade.<id>.reference key.
    :return: dict of references keyed by trade ID
    :rtype: dict
    """
    reference_keys = r.keys("trade.*.reference")
    ref_map = {convert(key).split(".")[1]: r.get(key) for key in reference_keys}
    return convert(ref_map)
def get_ref(self, reference):
    """
    Get the trade information for a reference.
    :param reference: trade reference
    :type reference: string
    :return: dict of trade information, or False if nothing is stored for the reference
    :rtype: dict or bool
    """
    trade_info = convert(r.hgetall(f"trade.{reference}"))
    return trade_info if trade_info else False
def del_ref(self, reference):
    """
    Delete a given reference from the Redis database.
    Removes the trade details stored for the reference and, when the trade ID
    can be resolved, the trade ID -> reference lookup key as well.
    :param reference: trade reference to delete
    :type reference: string
    """
    # Resolve the trade ID before the details hash is deleted
    tx = self.ref_to_tx(reference)
    r.delete(f"trade.{reference}")
    # ref_to_tx returns False for an unknown reference; don't build a key from it
    if tx is not False:
        r.delete(f"trade.{tx}.reference")
def cleanup(self, references):
    """
    Reconcile the internal reference database with a given list of references.
    Every stored reference missing from the list is archived by renaming both of
    its keys with an "archive." prefix.
    :param references: list of references to reconcile against
    :type references: list
    """
    for tx, reference in self.get_ref_map().items():
        if reference in references:
            # Still listed, keep it
            continue
        self.log.info("Archiving trade reference: {reference} / TX: {tx}", reference=reference, tx=tx)
        r.rename(f"trade.{tx}.reference", f"archive.trade.{tx}.reference")
        r.rename(f"trade.{reference}", f"archive.trade.{reference}")
def tx_to_ref(self, tx):
    """
    Convert a trade ID to a reference.
    Scans the stored trades for the one whose "id" equals the given trade ID.
    :param tx: trade ID
    :type tx: string
    :return: reference, or None if no stored trade has that ID
    :rtype: string
    """
    for candidate in self.get_refs():
        trade_info = convert(r.hgetall(f"trade.{candidate}"))
        # References without stored details are skipped
        if trade_info and trade_info["id"] == tx:
            return candidate
    return None
def ref_to_tx(self, reference):
    """
    Convert a reference to a trade ID.
    :param reference: trade reference
    :type reference: string
    :return: trade ID, or False if nothing is stored for the reference
    :rtype: string or bool
    """
    trade_info = convert(r.hgetall(f"trade.{reference}"))
    return trade_info["id"] if trade_info else False
def get_total_usd(self):
    """
    Get total USD in all our accounts, bank and trading.
    Adds the Revolut USD total to the Agora XMR and BTC wallet balances converted to USD.
    :return: value in USD, or False if any balance lookup fails
    :rtype: float or bool
    """
    revolut_usd = self.revolut.get_total_usd()
    if revolut_usd is False:
        return False
    xmr_wallet = self.agora.agora.wallet_balance_xmr()
    if not xmr_wallet["success"]:
        return False
    btc_wallet = self.agora.agora.wallet_balance()
    if not btc_wallet["success"]:
        return False
    xmr_balance = xmr_wallet["response"]["data"]["total"]["balance"]
    btc_balance = btc_wallet["response"]["data"]["total"]["balance"]
    # Fetch the XMR -> USD and BTC -> USD exchange rates
    xmr_price = self.agora.cg.get_price(ids="monero", vs_currencies=["USD"])
    btc_price = self.agora.cg.get_price(ids="bitcoin", vs_currencies=["USD"])
    # Value each wallet in USD
    xmr_value_usd = float(xmr_balance) * xmr_price["monero"]["usd"]
    btc_value_usd = float(btc_balance) * btc_price["bitcoin"]["usd"]
    # Trading total first, then add the bank total
    agora_usd = xmr_value_usd + btc_value_usd
    return agora_usd + revolut_usd
# TODO: possibly refactor this into smaller functions which don't return as much stuff
# check if this is all really needed in the corresponding withdraw function
def get_total(self):
    """
    Get all the values corresponding to the amount of money we hold.
    The result is also written to Elasticsearch via write_to_es with type "total".
    :return: ((total SEK, total USD, total GBP), (total XMR USD, total BTC USD), (total XMR, total BTC)),
        or False if any balance lookup fails
    :rtype: tuple(tuple(float, float, float), tuple(float, float), tuple(float, float)) or bool
    """
    revolut_usd = self.revolut.get_total_usd()
    if revolut_usd is False:
        self.log.error("Could not get USD total.")
        return False
    xmr_wallet = self.agora.agora.wallet_balance_xmr()
    if not xmr_wallet["success"]:
        self.log.error("Could not get Agora XMR wallet total.")
        return False
    btc_wallet = self.agora.agora.wallet_balance()
    if not btc_wallet["success"]:
        self.log.error("Could not get Agora BTC wallet total.")
        return False
    # Balances are passed through exactly as the wallet API returned them
    xmr_balance = xmr_wallet["response"]["data"]["total"]["balance"]
    btc_balance = btc_wallet["response"]["data"]["total"]["balance"]
    # Fetch the XMR -> USD and BTC -> USD exchange rates
    xmr_price = self.agora.cg.get_price(ids="monero", vs_currencies=["USD"])
    btc_price = self.agora.cg.get_price(ids="bitcoin", vs_currencies=["USD"])
    # Value each wallet in USD
    xmr_value_usd = float(xmr_balance) * xmr_price["monero"]["usd"]
    btc_value_usd = float(btc_balance) * btc_price["bitcoin"]["usd"]
    # Trading total first, then add the bank total
    agora_usd = xmr_value_usd + btc_value_usd
    grand_total_usd = agora_usd + revolut_usd
    # Express the USD total in SEK and GBP as well
    rates = self.agora.get_rates_all()
    total_sek = rates["SEK"] * grand_total_usd
    total_gbp = rates["GBP"] * grand_total_usd
    cast = (
        (total_sek, grand_total_usd, total_gbp),  # Total prices in our 3 favourite currencies
        (xmr_value_usd, btc_value_usd),  # Total USD balance in only Agora
        (xmr_balance, btc_balance),  # Total XMR and BTC balance in Agora
    )
    self.write_to_es("total", cast)
    return cast
def write_to_es(self, msgtype, cast):
    """
    Index a set of totals into Elasticsearch.
    The nested tuple is flattened into named fields and stamped with the current local time.
    :param msgtype: value stored in the "type" field of the document
    :param cast: totals in the layout returned by get_total
    :type msgtype: string
    :type cast: tuple
    """
    prices, agora_usd, agora_balances = cast[0], cast[1], cast[2]
    document = {
        "type": msgtype,
        "price_sek": prices[0],
        "price_usd": prices[1],
        "price_gbp": prices[2],
        "total_usd_agora_xmr": agora_usd[0],
        "total_usd_agora_btc": agora_usd[1],
        "total_xmr_agora": agora_balances[0],
        "total_btc_agora": agora_balances[1],
        # isoformat() already yields a string
        "ts": datetime.now().isoformat(),
    }
    self.es.index(index=settings.ES.Index, document=document)
def get_remaining(self):
    """
    Check how much profit we need to make in order to withdraw.
    The withdraw threshold is the configured base amount plus the withdraw limit.
    :return: profit remaining in USD, or False if the total could not be fetched
    :rtype: float or bool
    """
    total_usd = self.get_total_usd()
    # get_total_usd signals failure with False; a genuine total of 0 is still a valid number
    if total_usd is False:
        return False
    withdraw_threshold = float(settings.Money.BaseUSD) + float(settings.Money.WithdrawLimit)
    return withdraw_threshold - total_usd
# TODO: move to money
def get_profit(self, trades=False):
    """
    Check how much total profit we have made.
    Profit is the total USD held minus the configured base amount.
    :param trades: whether to count the value of open trades towards the total
    :type trades: bool
    :return: profit in USD, or False if a required total could not be fetched
    :rtype: float or bool
    """
    total_usd = self.get_total_usd()
    # get_total_usd signals failure with False; a genuine total of 0 is still a valid number
    if total_usd is False:
        return False
    if trades:
        trades_usd = self.get_open_trades_usd()
        # Don't silently count a failed dashboard lookup as zero open trades
        if trades_usd is False:
            return False
        total_usd += trades_usd
    return total_usd - float(settings.Money.BaseUSD)
def get_open_trades_usd(self):
    """
    Get total value of open trades in USD.
    Only contacts flagged as "is_selling" on the dashboard are counted.
    :return: total trade value, or False if the dashboard could not be fetched
    :rtype: float or bool
    """
    dash = self.agora.wrap_dashboard()
    if dash is False:
        return False
    rates = self.agora.get_rates_all()
    cumul_usd = 0
    for contact in dash.values():
        contact_data = contact["data"]
        if not contact_data["is_selling"]:
            continue
        # Convert in both branches so string amounts are handled consistently
        amount = float(contact_data["amount"])
        currency = contact_data["currency"]
        if currency == "USD":
            cumul_usd += amount
        else:
            # Rates are divided into the amount to get USD
            cumul_usd += amount / rates[currency]
    return cumul_usd
def get_total_remaining(self):
    """
    Check how much profit we need to make in order to withdraw, taking into account open trade value.
    :return: profit remaining in USD, or False if a required total could not be fetched
    :rtype: float or bool
    """
    total_usd = self.get_total_usd()
    # get_total_usd signals failure with False; a genuine total of 0 is still a valid number
    if total_usd is False:
        return False
    total_trades_usd = self.get_open_trades_usd()
    # Don't silently count a failed dashboard lookup as zero open trades
    if total_trades_usd is False:
        return False
    total_usd += total_trades_usd
    withdraw_threshold = float(settings.Money.BaseUSD) + float(settings.Money.WithdrawLimit)
    return withdraw_threshold - total_usd
def get_total_with_trades(self):
    """
    Get total USD in all our accounts plus the value of open trades.
    :return: value in USD, or False if either total could not be fetched
    :rtype: float or bool
    """
    total_usd = self.get_total_usd()
    # get_total_usd signals failure with False; a genuine total of 0 is still a valid number
    if total_usd is False:
        return False
    total_trades_usd = self.get_open_trades_usd()
    # Don't silently count a failed dashboard lookup as zero open trades
    if total_trades_usd is False:
        return False
    return total_usd + total_trades_usd