diff --git a/app/urls.py b/app/urls.py index f8da157..19b8c14 100644 --- a/app/urls.py +++ b/app/urls.py @@ -116,4 +116,10 @@ urlpatterns = [ banks.BanksBalances.as_view(), name="balances", ), + # Transactions + path( + "banks//transactions///", + banks.BanksTransactions.as_view(), + name="transactions", + ), ] + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT) diff --git a/core/__init__.py b/core/__init__.py index 3463c6f..329b27e 100644 --- a/core/__init__.py +++ b/core/__init__.py @@ -2,7 +2,6 @@ import os # import stripe os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true" -# from redis import StrictRedis # r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0) diff --git a/core/clients/aggregator.py b/core/clients/aggregator.py index 082aa31..ed50518 100644 --- a/core/clients/aggregator.py +++ b/core/clients/aggregator.py @@ -1,5 +1,7 @@ from abc import ABC +from core.lib.db import convert, r + class AggregatorClient(ABC): def store_account_info(self, account_infos): @@ -40,3 +42,28 @@ class AggregatorClient(ABC): self.instance.currencies = currencies self.instance.save() + + async def process_transactions(self, account_id, transactions): + if not transactions: + return False + transaction_ids = [x["transaction_id"] for x in transactions] + new_key_name = f"new.transactions.{self.instance.id}.{self.name}.{account_id}" + old_key_name = f"transactions.{self.instance.id}.{self.name}.{account_id}" + # for transaction_id in transaction_ids: + if not transaction_ids: + return + r.sadd(new_key_name, *transaction_ids) + + difference = list(r.sdiff(new_key_name, old_key_name)) + + difference = convert(difference) + + new_transactions = [ + x for x in transactions if x["transaction_id"] in difference + ] + + # Rename the new key to the old key so we can run the diff again + r.rename(new_key_name, old_key_name) + for transaction in new_transactions: + transaction["subclass"] = self.name + # self.tx.transaction(transaction) diff --git a/core/clients/aggregators/nordigen.py b/core/clients/aggregators/nordigen.py index f1a1a86..4013300 100644 --- a/core/clients/aggregators/nordigen.py +++ b/core/clients/aggregators/nordigen.py @@ -1,7 +1,9 @@ from datetime import timedelta +from hashlib import sha256 from django.conf import settings from django.utils import timezone +from orjson import dumps from core.clients.aggregator import AggregatorClient from core.clients.base import BaseClient @@ -125,7 +127,7 @@ class NordigenClient(BaseClient, AggregatorClient): async def get_account(self, account_id): """ Get details of an account. - :param requisition: requisition ID""" + :param account_id: account ID""" path = f"accounts/{account_id}/details" response = await self.call(path, schema="AccountDetails") @@ -145,6 +147,7 @@ class NordigenClient(BaseClient, AggregatorClient): parsed["recipient"] = "TODO" # Let's add the account ID so we can reference it later parsed["account_id"] = account_id + parsed["aggregator_id"] = str(self.instance.id) return parsed async def get_all_account_info(self, requisition=None, store=False): @@ -243,3 +246,61 @@ class NordigenClient(BaseClient, AggregatorClient): else: totals[currency] = amount return totals + + def normalise_transactions(self, transactions): + for transaction in transactions: + # Rename ID + if "transactionId" in transaction: + transaction["transaction_id"] = transaction["transactionId"] + del transaction["transactionId"] + elif "internalTransactionId" in transaction: + transaction["transaction_id"] = transaction["internalTransactionId"] + del transaction["internalTransactionId"] + else: + # No transaction ID. This is a problem for our implementation + + tx_hash = sha256( + dumps(transaction, sort_keys=True).encode("utf8") + ).hexdigest() + transaction["transaction_id"] = tx_hash + + # Rename timestamp + + if "bookingDateTime" in transaction: + transaction["ts"] = transaction["bookingDateTime"] + del transaction["bookingDateTime"] + elif "bookingDate" in transaction: + transaction["ts"] = transaction["bookingDate"] + del transaction["bookingDate"] + + transaction["amount"] = float(transaction["transactionAmount"]["amount"]) + transaction["currency"] = transaction["transactionAmount"]["currency"] + del transaction["transactionAmount"] + + if transaction["remittanceInformationUnstructuredArray"]: + ref_list = transaction["remittanceInformationUnstructuredArray"] + reference = "|".join(ref_list) + transaction["reference"] = reference + del transaction["remittanceInformationUnstructuredArray"] + elif transaction["remittanceInformationUnstructured"]: + reference = transaction["remittanceInformationUnstructured"] + transaction["reference"] = reference + del transaction["remittanceInformationUnstructured"] + else: + raise Exception(f"No way to get reference: {transaction}") + + async def get_transactions(self, account_id, process=False): + """ + Get all transactions for an account. + :param account_id: account to fetch transactions for + :return: list of transactions + :rtype: dict + """ + path = f"accounts/{account_id}/transactions" + response = await self.call(path, schema="Transactions") + + parsed = response["booked"] + self.normalise_transactions(parsed) + if process: + await self.process_transactions(parsed) + return parsed diff --git a/core/lib/db.py b/core/lib/db.py new file mode 100644 index 0000000..70e5c83 --- /dev/null +++ b/core/lib/db.py @@ -0,0 +1,150 @@ +from redis import asyncio as aioredis + +from core.util import logs + +log = logs.get_logger("scheduling") + +r = aioredis.from_url("redis://redis:6379", db=0) + + +def convert(data): + """ + Recursively convert a dictionary. + """ + if isinstance(data, bytes): + return data.decode("ascii") + if isinstance(data, dict): + return dict(map(convert, data.items())) + if isinstance(data, tuple): + return map(convert, data) + if isinstance(data, list): + return list(map(convert, data)) + return data + + +def get_refs(): + """ + Get all reference IDs for trades. + :return: list of trade IDs + :rtype: list + """ + references = [] + ref_keys = r.keys("trade.*.reference") + for key in ref_keys: + references.append(r.get(key)) + return convert(references) + + +def tx_to_ref(tx): + """ + Convert a trade ID to a reference. + :param tx: trade ID + :type tx: string + :return: reference + :rtype: string + """ + refs = get_refs() + for reference in refs: + ref_data = convert(r.hgetall(f"trade.{reference}")) + if not ref_data: + continue + if ref_data["id"] == tx: + return reference + + +def ref_to_tx(reference): + """ + Convert a reference to a trade ID. + :param reference: trade reference + :type reference: string + :return: trade ID + :rtype: string + """ + ref_data = convert(r.hgetall(f"trade.{reference}")) + if not ref_data: + return False + return ref_data["id"] + + +def get_ref_map(): + """ + Get all reference IDs for trades. + :return: dict of references keyed by TXID + :rtype: dict + """ + references = {} + ref_keys = r.keys("trade.*.reference") + for key in ref_keys: + tx = convert(key).split(".")[1] + references[tx] = r.get(key) + return convert(references) + + +def get_ref(reference): + """ + Get the trade information for a reference. + :param reference: trade reference + :type reference: string + :return: dict of trade information + :rtype: dict + """ + ref_data = r.hgetall(f"trade.{reference}") + ref_data = convert(ref_data) + if "subclass" not in ref_data: + ref_data["subclass"] = "agora" + if not ref_data: + return False + return ref_data + + +def get_tx(tx): + """ + Get the transaction information for a transaction ID. + :param reference: trade reference + :type reference: string + :return: dict of trade information + :rtype: dict + """ + tx_data = r.hgetall(f"tx.{tx}") + tx_data = convert(tx_data) + if not tx_data: + return False + return tx_data + + +def get_subclass(reference): + obj = r.hget(f"trade.{reference}", "subclass") + subclass = convert(obj) + return subclass + + +def del_ref(reference): + """ + Delete a given reference from the Redis database. + :param reference: trade reference to delete + :type reference: string + """ + tx = ref_to_tx(reference) + r.delete(f"trade.{reference}") + r.delete(f"trade.{tx}.reference") + + +def cleanup(subclass, references): + """ + Reconcile the internal reference database with a given list of references. + Delete all internal references not present in the list and clean up artifacts. + :param references: list of references to reconcile against + :type references: list + """ + messages = [] + for tx, reference in get_ref_map().items(): + if reference not in references: + if get_subclass(reference) == subclass: + logmessage = ( + f"[{reference}] ({subclass}): Archiving trade reference. TX: {tx}" + ) + messages.append(logmessage) + log.info(logmessage) + r.rename(f"trade.{tx}.reference", f"archive.trade.{tx}.reference") + r.rename(f"trade.{reference}", f"archive.trade.{reference}") + return messages diff --git a/core/lib/schemas/nordigen_s.py b/core/lib/schemas/nordigen_s.py index e5e7147..6aaaa69 100644 --- a/core/lib/schemas/nordigen_s.py +++ b/core/lib/schemas/nordigen_s.py @@ -146,3 +146,54 @@ AccountBalancesSchema = { "balances": "balances", "summary": "summary", } + + +class TXCurrencyAmount(BaseModel): + amount: str + currency: str + + +class TransactionsCurrencyExchange(BaseModel): + instructedAmount: TXCurrencyAmount + sourceCurrency: str + exchangeRate: str + unitCurrency: str + targetCurrency: str + + +class TXAccount(BaseModel): + iban: str + bban: str | None + + +class TransactionsNested(BaseModel): + transactionId: str | None + bookingDate: str | None + valueDate: str + bookingDateTime: str | None + valueDateTime: str | None + transactionAmount: TXCurrencyAmount + creditorName: str | None + creditorAccount: TXAccount | None + debtorName: str | None + debtorAccount: TXAccount | None + remittanceInformationUnstructuredArray: list[str] | None + remittanceInformationUnstructured: str | None + proprietaryBankTransactionCode: str | None + internalTransactionId: str | None + currencyExchange: TransactionsCurrencyExchange | None + + +class TransactionsBookedPending(BaseModel): + booked: list[TransactionsNested] + pending: list[TransactionsNested] + + +class Transactions(BaseModel): + transactions: TransactionsBookedPending + + +TransactionsSchema = { + "booked": "transactions.booked", + "pending": "transactions.pending", +} diff --git a/core/templates/partials/banks-currencies-list.html b/core/templates/partials/banks-currencies-list.html index 793fffc..6451220 100644 --- a/core/templates/partials/banks-currencies-list.html +++ b/core/templates/partials/banks-currencies-list.html @@ -21,6 +21,7 @@ details payment id + actions {% for account in accounts %} @@ -42,6 +43,17 @@ + + + + {% endfor %} diff --git a/core/templates/partials/banks-transactions-list.html b/core/templates/partials/banks-transactions-list.html new file mode 100644 index 0000000..26419bf --- /dev/null +++ b/core/templates/partials/banks-transactions-list.html @@ -0,0 +1,57 @@ +{% load cache %} +{% load cachalot cache %} +{% get_last_invalidation 'core.Aggregator' as last %} +{% include 'mixins/partials/notify.html' %} +{# cache 600 objects_banks_transactions request.user.id object_list type last #} + + + + + + + + + + + {% for item in object_list %} + + + + + + + + + + {% endfor %} + +
idtsrecipientsenderamountcurrencyreference
+ + + + + + {{ item.ts }} + {{ item.creditorName }} + {% for item in item.creditorAccount.values %} + {{ item|default_if_none:"—" }} + {% endfor %} + + {{ item.debtorName }} + {% for item in item.debtorAccount.values %} + {{ item|default_if_none:"—" }} + {% endfor %} + {{ item.amount }}{{ item.currency }}{{ item.reference }}
+{# endcache #} \ No newline at end of file diff --git a/core/views/banks.py b/core/views/banks.py index ed3ca86..17fe36d 100644 --- a/core/views/banks.py +++ b/core/views/banks.py @@ -92,3 +92,34 @@ class BanksBalances(LoginRequiredMixin, OTPRequiredMixin, ObjectList): account_balances[k].append(item) return account_balances + + +class BanksTransactions(LoginRequiredMixin, OTPRequiredMixin, ObjectList): + """ + Get bank transactions. + """ + + list_template = "partials/banks-transactions-list.html" + page_title = "Bank Transactions" + + context_object_name_singular = "transaction" + context_object_name = "transactions" + + list_url_name = "transactions" + list_url_args = ["type", "account_id", "aggregator_id"] + + def get_queryset(self, **kwargs): + aggregator_id = self.kwargs.get("aggregator_id") + account_id = self.kwargs.get("account_id") + try: + aggregator = Aggregator.get_by_id(aggregator_id, self.request.user) + except Aggregator.DoesNotExist: + context = { + "message": "Aggregator does not exist", + "class": "danger", + } + return self.render_to_response(context) + + run = synchronize_async_helper(NordigenClient(aggregator)) + transactions = synchronize_async_helper(run.get_transactions(account_id)) + return transactions diff --git a/requirements.txt b/requirements.txt index 1896360..16179b0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -34,6 +34,5 @@ pyOpenSSL Klein ConfigObject aiohttp[speedups] -aioredis[hiredis] elasticsearch[async] uvloop