424 lines
17 KiB
Python
424 lines
17 KiB
Python
from abc import ABC
|
|
|
|
import orjson
|
|
|
|
from core.clients.platforms.agora import AgoraClient
|
|
from core.lib import notify
|
|
from core.lib.money import money
|
|
from core.util import logs
|
|
|
|
log = logs.get_logger("aggregator")
|
|
|
|
|
|
class AggregatorClient(ABC):
|
|
def store_account_info(self, account_infos):
    """
    Normalise the given account information and persist it on the instance.

    For every account, a missing owner name is filled in from its requisition
    (when one is found) and the ``sort_code``, ``number`` and ``iban`` keys are
    moved into a nested ``account_number`` dict. The accounts are then stored
    per bank on ``self.instance.account_info``, the currency of every account
    is recorded on both the client and the instance, and the instance is saved.

    :param account_infos: lists of account dicts keyed by bank; modified in place
    :type account_infos: dict
    """
    number_fields = ("sort_code", "number", "iban")

    for bank_accounts in account_infos.values():
        for entry in bank_accounts:
            # Fall back to the requisition's owner when the bank gave none
            if entry["ownerName"] is None:
                requisition = self.instance.get_requisition(entry["requisition_id"])
                if requisition is not None:
                    entry["ownerName"] = requisition.owner_name

            # Gather the identifying numbers under a single nested key
            account_number = entry.setdefault("account_number", {})
            for key in number_fields:
                if key in entry:
                    account_number[key] = entry.pop(key)

    collected_currencies = [
        entry["currency"]
        for bank_accounts in account_infos.values()
        for entry in bank_accounts
    ]

    for bank, bank_accounts in account_infos.items():
        if not self.instance.account_info:
            self.instance.account_info = {}
        # Replace whatever was stored for this bank with a fresh list
        self.instance.account_info[bank] = list(bank_accounts)

    self.currencies = collected_currencies
    self.instance.currencies = collected_currencies
    self.instance.save()
|
|
|
|
async def process_transactions(self, account_id, transactions, req):
    """
    Store transactions that have not been seen before and process each one.

    Every transaction that ``self.instance.get_transaction`` does not know yet
    is cast to the internal field names, stored with
    ``self.instance.add_transaction``, announced to the instance's user and
    handed to :meth:`transaction`. Transactions that already exist are skipped.

    :param account_id: ID of the account the transactions belong to
    :param transactions: transaction dicts with the keys ``transaction_id``,
                         ``creditorName``, ``debtorName``, ``amount``,
                         ``currency`` and ``reference``
    :type transactions: list
    :param req: passed through to ``add_transaction``; must be truthy
    :return: False when there are no transactions or no ``req``, otherwise None
    """
    if not transactions or not req:
        return False

    platforms = self.instance.platforms
    for transaction in transactions:
        transaction_id = transaction["transaction_id"]
        if self.instance.get_transaction(account_id, transaction_id) is not None:
            # Transaction exists
            continue

        tx_cast = {
            "transaction_id": transaction_id,
            "recipient": transaction["creditorName"],
            "sender": transaction["debtorName"],
            "amount": transaction["amount"],
            "currency": transaction["currency"],
            "note": transaction["reference"],
        }
        tx_obj = self.instance.add_transaction(req, account_id, tx_cast)

        # orjson.dumps returns bytes; decode it so the notification shows the
        # JSON itself rather than a b'...' literal
        await notify.sendmsg(
            self.instance.user,
            f"New transaction: {orjson.dumps(tx_cast).decode()}",
            title="New transaction",
        )
        await self.transaction(platforms, tx_obj)
|
|
# transaction_ids = [x["transaction_id"] for x in transactions]
|
|
# new_key_name = f"new.transactions.{self.instance.id}.{self.name}.{account_id}"
|
|
# old_key_name = f"transactions.{self.instance.id}.{self.name}.{account_id}"
|
|
# # for transaction_id in transaction_ids:
|
|
# if not transaction_ids:
|
|
# return
|
|
# await db.r.sadd(new_key_name, *transaction_ids)
|
|
|
|
# difference = list(await db.r.sdiff(new_key_name, old_key_name))
|
|
|
|
# difference = db.convert(difference)
|
|
|
|
# new_transactions = [
|
|
# x for x in transactions if x["transaction_id"] in difference
|
|
# ]
|
|
|
|
# # Rename the new key to the old key so we can run the diff again
|
|
# await db.r.rename(new_key_name, old_key_name)
|
|
# for transaction in new_transactions:
|
|
# transaction["subclass"] = self.name
|
|
# # self.tx.transaction(transaction)
|
|
|
|
def valid_transaction(self, tx_obj):
    """
    Decide whether a transaction object should be processed.

    A transaction is rejected when its amount or currency is missing, or when
    its amount is zero or negative (the latter is logged).

    :param tx_obj: transaction object with ``transaction_id``, ``amount`` and
                   ``currency`` attributes
    :return: whether the transaction is valid
    :rtype: bool
    """
    txid = tx_obj.transaction_id

    missing_field = tx_obj.amount is None or tx_obj.currency is None
    if missing_field:
        return False

    if tx_obj.amount <= 0:
        log.info(f"Ignoring transaction with negative/zero amount: {txid}")
        return False

    return True
|
|
|
|
# def extract_reference(self, data):
|
|
# """
|
|
# Extract a reference from the transaction cast.
|
|
# :param data: a transaction cast
|
|
# :type data: dict
|
|
# :return: the extracted reference or not_set
|
|
# :rtype: str
|
|
# """
|
|
# if "reference" in data:
|
|
# return data["reference"]
|
|
# elif "meta" in data:
|
|
# if "provider_reference" in data["meta"]:
|
|
# return data["meta"]["provider_reference"]
|
|
# return "not_set"
|
|
|
|
# def extract_sender(self, data):
|
|
# """
|
|
# Extract a sender name from the transaction cast.
|
|
# :param data: a transaction cast
|
|
# :type data: dict
|
|
# :return: the sender name or not_set
|
|
# :rtype: str
|
|
# """
|
|
# if "debtorName" in data:
|
|
# return data["debtorName"]
|
|
# elif "meta" in data:
|
|
# if "debtor_account_name" in data["meta"]:
|
|
# return data["meta"]["debtor_account_name"]
|
|
# elif " " in data["reference"]:
|
|
# refsplit = data["reference"].split(" ")
|
|
# if not len(refsplit) == 2:
|
|
# log.error(f"Sender cannot be extracted: {data}")
|
|
# return "not_set"
|
|
# realname, part2 = data["reference"].split(" ")
|
|
# return realname
|
|
|
|
# return "not_set"
|
|
|
|
async def reference_partial_check(
    self, platform, reference, txid, currency, amount
):
    """
    Look for a known trade reference among the words of a transaction reference.

    The reference is split on spaces and every entry of ``platform.references``
    that equals one of the parts counts as a match. This tolerates senders who
    leave other text around the trade reference.

    :param platform: platform providing ``references``
    :param reference: the transaction reference to check, may be None
    :type reference: str
    :param txid: transaction ID, used only in the notification
    :param currency: transaction currency, used only in the notification
    :param amount: transaction amount, used only in the notification
    :return: the single matching reference, None if nothing matched, or False
             if several references matched (the user is notified)
    """
    if reference is None:
        # Without a reference there is nothing to split, so nothing can match
        return None

    ref_parts = reference.split(" ")
    matches = [known for known in platform.references if known in ref_parts]

    if not matches:
        return None

    if len(matches) > 1:
        message = (
            f"Multiple references valid for TXID {txid}: {reference} | "
            f"Currency: {currency} | Amount: {amount}"
        )
        title = "Error: multiple references valid"
        await notify.sendmsg(self.instance.user, message, title=title)
        return False

    return matches[0]
|
|
|
|
# TODO: pass platform here
|
|
async def can_alt_lookup(self, platform, amount, currency, reference):
    """
    Decide whether a transaction may be matched by amount and currency alone.

    The amount is converted to USD with ``money.to_usd`` and compared with the
    platform's ``no_reference_amount_check_max_usd``. When it is above that
    limit the user is notified and the lookup is refused.

    :param platform: platform providing ``no_reference_amount_check_max_usd``
    :param amount: transaction amount
    :param currency: transaction currency
    :param reference: transaction reference, used only in the notification
    :return: True if the lookup is allowed, False otherwise
    :rtype: bool
    """
    amount_usd = await money.to_usd(amount, currency)
    # Amount is reliable here as it is checked by find_trade,
    # so no need for the stored trade's amount
    if amount_usd > platform.no_reference_amount_check_max_usd:
        # The title must be an f-string, otherwise "{reference}" is sent verbatim
        title = f"Amount exceeds max for {reference}"
        message = (
            f"Amount exceeds max for {reference} | "
            f"Currency: {currency} | Amount: {amount}"
        )
        await notify.sendmsg(self.instance.user, message, title=title)
        return False
    return True
|
|
|
|
def find_trade(self, platform, txid, currency, amount):
    """
    Get the stored trade that matches the given currency and amount.

    Only succeeds if exactly one trade matches; zero or several matches are
    logged as an error.

    :param platform: platform providing ``references`` and
                     ``get_trade_by_reference``
    :param txid: transaction ID, used only in the log message
    :type txid: str
    :param currency: currency the trade must have
    :type currency: str
    :param amount: fiat amount the trade must have
    :return: the matching trade object, or False
    """
    matching_trades = []
    # TODO: use get_ref_map in this function instead of calling get_ref multiple
    # times
    for ref in platform.references:
        stored_trade = platform.get_trade_by_reference(ref)
        if not stored_trade:
            # A reference without a stored trade cannot match anything
            continue
        if stored_trade.currency == currency and stored_trade.amount_fiat == amount:
            matching_trades.append(stored_trade)

    if len(matching_trades) != 1:
        log.error(
            f"Find trade returned {len(matching_trades)} results (expected 1) "
            f"for TXID {txid}: {matching_trades}"
        )
        return False
    return matching_trades[0]
|
|
|
|
async def amount_currency_lookup(self, platform, amount, currency, txid, ref):
    """
    Look a trade up by amount and currency when no reference matched.

    The user is notified that the fallback is being attempted. The lookup is
    refused when :meth:`can_alt_lookup` disallows it; otherwise
    :meth:`find_trade` is asked for the single matching trade.

    :param platform: platform to search
    :param amount: transaction amount
    :param currency: transaction currency
    :param txid: transaction ID
    :param ref: transaction reference, passed to :meth:`can_alt_lookup`
    :return: the matching trade, False if the lookup is not allowed, or None
             if no single trade matched
    """
    title = f"Checking against amount and currency for TXID {txid}"
    message = (
        f"Checking against amount and currency for TXID {txid} | "
        f"Currency: {currency} | Amount: {amount}"
    )
    await notify.sendmsg(self.instance.user, message, title=title)

    if not await self.can_alt_lookup(platform, amount, currency, ref):
        return False

    stored_trade = self.find_trade(platform, txid, currency, amount)
    if not stored_trade:
        title = f"Failed to get reference by amount and currency: {txid}"
        message = (
            f"Failed to get reference by amount and currency: {txid} | "
            f"Currency: {currency} | Amount: {amount}"
        )
        await notify.sendmsg(self.instance.user, message, title=title)
        return None
    return stored_trade
|
|
|
|
async def normal_lookup(
    self, platform, stored_trade_reference, reference, currency, amount
):
    """
    Fetch the stored trade for a reference found in the transaction.

    :param platform: platform providing ``get_trade_by_reference``
    :param stored_trade_reference: the trade reference to fetch
    :param reference: full transaction reference, used only in the notification
    :param currency: transaction currency, used only in the notification
    :param amount: transaction amount, used only in the notification
    :return: the stored trade, or False if there is none (the user is notified)
    """
    stored_trade = platform.get_trade_by_reference(stored_trade_reference)
    if stored_trade:
        return stored_trade

    title = f"No reference in DB for {reference}"
    message = (
        f"No reference in DB for {reference} | "
        f"Currency: {currency} | Amount: {amount}"
    )
    await notify.sendmsg(self.instance.user, message, title=title)
    return False
|
|
|
|
async def currency_check(self, currency, stored_trade):
    """
    Check that the transaction was sent in the currency the trade expects.

    On a mismatch the user is notified with both currencies.

    :param currency: currency of the incoming transaction
    :param stored_trade: trade object with a ``currency`` attribute
    :return: True if the currencies are equal, False otherwise
    :rtype: bool
    """
    if stored_trade.currency == currency:
        return True

    mismatch_text = (
        f"Currency mismatch, Agora: {stored_trade.currency} / Sink: {currency}"
    )
    await notify.sendmsg(
        self.instance.user, mismatch_text, title="Currency mismatch"
    )
    return False
|
|
|
|
async def alt_amount_check(self, platform, amount, currency, stored_trade):
    """
    Check whether an inexact amount still falls within the acceptable margins.

    The margins come from ``money.get_acceptable_margins`` for the trade's fiat
    amount. The amount must lie strictly between them; the boundaries
    themselves are rejected. The user is notified of the attempt and of a
    failure.

    :param platform: platform passed to ``money.get_acceptable_margins``
    :param amount: amount of the incoming transaction
    :param currency: currency of the incoming transaction
    :param stored_trade: trade object with an ``amount_fiat`` attribute
    :return: True if the amount is within the margins, False otherwise
    :rtype: bool
    """
    # If the amount does not match exactly, get the min and max values for our
    # given acceptable margins for trades
    min_amount, max_amount = await money.get_acceptable_margins(
        platform, currency, stored_trade.amount_fiat
    )

    # The log line and the notification carry the same text
    message = (
        f"Amount does not match exactly, trying with margins: min: {min_amount}"
        f" / max: {max_amount}"
    )
    log.info(message)
    title = "Amount does not match exactly"
    await notify.sendmsg(self.instance.user, message, title=title)

    if not min_amount < amount < max_amount:
        title = "Amount mismatch - not in margins"
        message = (
            f"Amount mismatch - not in margins: {stored_trade.amount_fiat} "
            f"(min: {min_amount} / max: {max_amount})"
        )
        await notify.sendmsg(self.instance.user, message, title=title)
        return False
    return True
|
|
|
|
async def transaction(self, platforms, tx_obj):
    """
    Try to reconcile a transaction with a stored trade on each platform.

    The transaction is validated, logged and announced to the instance's user.
    For every platform the trade is looked up by reference, or by amount and
    currency when no reference matched. The trade's currency and amount are
    then checked, and on success the trade is released through ``AgoraClient``.

    :param platforms: platforms whose trades should be checked
    :param tx_obj: transaction object with ``transaction_id``, ``amount``,
                   ``currency`` and ``note`` attributes
    :return: False if the transaction is not valid, otherwise None
    """
    if not self.valid_transaction(tx_obj):
        return False

    txid = tx_obj.transaction_id
    amount = tx_obj.amount
    currency = tx_obj.currency
    reference = tx_obj.note

    log.info(f"Transaction processed: {tx_obj}")
    await notify.sendmsg(
        self.instance.user,
        f"Transaction: {txid} {amount}{currency}: {reference}",
        title="Incoming transaction",
    )

    for platform in platforms:
        stored_trade_reference = await self.reference_partial_check(
            platform, reference, txid, currency, amount
        )
        # False means several references matched; None only means none did
        if stored_trade_reference is False:
            continue

        looked_up_without_reference = False
        if stored_trade_reference:
            # Normal implementation for when we have a reference
            stored_trade = await self.normal_lookup(
                platform, stored_trade_reference, reference, currency, amount
            )
        else:
            # Amount/currency lookup implementation for when we have no reference
            stored_trade = await self.amount_currency_lookup(
                platform, amount, currency, txid, reference
            )
            # Note that we have looked it up without reference so we don't
            # use the margins below. This might be redundant given the checks
            # in find_trade, but better safe than sorry!
            looked_up_without_reference = True

        # Both lookups return a falsy value when they find no trade; without
        # this guard the checks below would fail on a non-trade value
        if not stored_trade:
            continue

        # Make sure it was sent in the expected currency
        if not await self.currency_check(currency, stored_trade):
            continue

        # Make sure the expected amount was sent
        if stored_trade.amount_fiat != amount:
            if looked_up_without_reference:
                continue
            if not await self.alt_amount_check(
                platform, amount, currency, stored_trade
            ):
                continue

        # TODO: check the sender - we don't do anything with this yet
        instance = await AgoraClient(platform)
        released = await instance.release_map_trade(stored_trade, tx_obj)
        if released:
            title = "Trade complete"
            message = f"Trade complete: {amount}{currency}"
            await notify.sendmsg(self.instance.user, message, title=title)
|