from datetime import timedelta
from hashlib import sha256

import orjson
from django.conf import settings
from django.utils import timezone

from core.clients.aggregator import AggregatorClient
from core.clients.base import BaseClient
from core.util import logs

log = logs.get_logger("nordigen")


class NordigenClient(BaseClient, AggregatorClient):
    """Client for the Nordigen bank-account aggregation API."""

    url = "https://ob.nordigen.com/api/v2"

    async def connect(self):
        """
        Make sure ``self.token`` holds a usable access token.

        The token stored on ``self.instance`` is reused while its expiry is
        still in the future; otherwise a new one is requested.
        """
        expires = self.instance.access_token_expires
        if expires is not None and expires > timezone.now():
            self.token = self.instance.access_token
            return
        await self.get_access_token()

    def method_filter(self, method):
        """
        Return ``method`` with every ``/`` replaced by ``_``.

        :param method: API path, e.g. ``token/new``
        :return: the path with slashes replaced by underscores
        :rtype: str
        """
        return method.replace("/", "_")

    async def get_access_token(self):
        """
        Get the access token for the Nordigen API.

        The token and its computed expiry time are written to
        ``self.instance`` and saved; the token is also set on ``self.token``.
        """
        log.debug(f"Getting new access token for {self.instance}")
        data = {
            "secret_id": self.instance.secret_id,
            "secret_key": self.instance.secret_key,
        }
        response = await self.call("token/new", http_method="post", data=data)
        access = response["access"]
        # "access_expires" is a lifetime in seconds; turn it into an absolute
        # point in time by offsetting from now.
        expires_at = timezone.now() + timedelta(seconds=response["access_expires"])

        self.instance.access_token = access
        self.instance.access_token_expires = expires_at
        # NOTE(review): save() is called synchronously from a coroutine -
        # confirm this is safe for the instance type used here.
        self.instance.save()

        self.token = access

    async def get_requisitions(self):
        """
        Get the list of requisitions.

        :return: the ``results`` entry of the ``requisitions`` response
        """
        response = await self.call("requisitions")
        return response["results"]

    async def get_countries(self):
        """
        Get a list of countries.
        """
        # This function is a stub.
        return ["GB", "SE", "BG"]

    async def get_banks(self, country):
        """
        Get a list of supported banks for a country.

        :param country: country to query (two-character code)
        :return: list of institutions, or False if ``country`` is not
            exactly two characters long
        :rtype: list
        """
        if len(country) != 2:
            return False
        path = f"institutions/?country={country}"
        return await self.call(path, schema="Institutions", append_slash=False)

    async def build_link(self, institution_id, redirect=None):
        """Create a link to access an institution.

        :param institution_id: ID of the institution
        :param redirect: URL to redirect to; defaults to ``settings.URL``
        :return: the ``link`` value of the response, or False if absent
        """
        data = {
            "institution_id": institution_id,
            "redirect": redirect if redirect else settings.URL,
        }
        response = await self.call(
            "requisitions", schema="RequisitionsPost", http_method="post", data=data
        )
        if "link" in response:
            return response["link"]
        return False

    async def delete_requisition(self, requisition_id):
        """
        Delete a requisition by ID.

        :param requisition_id: ID of the requisition to delete
        :return: the API response
        """
        path = f"requisitions/{requisition_id}"
        return await self.call(path, schema="RequisitionDelete", http_method="delete")

    async def get_requisition(self, requisition):
        """
        Get a single requisition (which lists its accounts).

        :param requisition: requisition ID
        :return: the API response
        """
        path = f"requisitions/{requisition}"
        return await self.call(path, schema="Requisition")

    # def get_ownernames(self):
    #     """
    #     Get list of supplementary owner names.
    #     """
    #     ownernames = loads(settings.Nordigen.OwnerNames)
    #     return ownernames

    async def get_account(self, req_id, account_id):
        """
        Get details of an account.

        For GBP accounts with a non-empty IBAN, the IBAN is replaced by
        ``sort_code`` and ``number`` entries derived from it.

        :param req_id: ID of the requisition the account belongs to
        :param account_id: account ID
        :return: dict of account details, or False if the response has no
            ``account`` entry
        """
        path = f"accounts/{account_id}/details"
        response = await self.call(path, schema="AccountDetails")
        if "account" not in response:
            return False
        parsed = response["account"]

        iban = parsed.get("iban")
        if iban and parsed.get("currency") == "GBP":
            # The last 14 characters are taken as a 6-character sort code
            # followed by an 8-character account number.
            raw_sort_code = iban[-14:-8]
            account_number = iban[-8:]
            del parsed["iban"]
            # Group the sort code into dash-separated pairs ("123456" ->
            # "12-34-56"); a trailing unpaired character is dropped.
            parsed["sort_code"] = "-".join(
                raw_sort_code[i : i + 2] for i in range(0, len(raw_sort_code) - 1, 2)
            )
            parsed["number"] = account_number

        # Let's add the account ID so we can reference it later
        parsed["account_id"] = account_id
        parsed["aggregator_id"] = str(self.instance.id)
        parsed["requisition_id"] = str(req_id)
        return parsed

    async def get_all_account_info(self, requisition=None, store=False):
        """
        Get account details for every account, keyed by institution ID.

        :param requisition: optional requisition ID; if given, only that
            requisition's accounts are fetched
        :param store: if true, pass the result to ``store_account_info``
        :return: dict mapping institution ID to a list of account dicts
        :raises ValueError: if ``store`` is set together with ``requisition``
        """
        # Validate before doing any network work: storing the result of a
        # single requisition would store incomplete data.
        if store and requisition is not None:
            raise ValueError("Cannot store partial data")

        if requisition:
            requisitions = [await self.get_requisition(requisition)]
        else:
            requisitions = await self.get_requisitions()

        to_return = {}
        for req in requisitions:
            for account_id in req["accounts"]:
                account_info = await self.get_account(req["id"], account_id)
                if not account_info:
                    continue
                to_return.setdefault(req["institution_id"], []).append(account_info)

        if store:
            self.store_account_info(to_return)
        return to_return

    async def get_balance(self, account_id):
        """
        Get the balance and currency of an account.

        Only entries whose ``balanceType`` is ``interimAvailable`` are summed.

        :param account_id: the account ID
        :return: tuple of (currency, amount); ``(False, False)`` if the
            response has no ``balances`` entry or an entry's currency differs
            from the one already seen; ``(None, 0)`` if no entry matched
        :rtype: tuple
        """
        path = f"accounts/{account_id}/balances"
        response = await self.call(path, schema="AccountBalances")
        if "balances" not in response:
            return (False, False)

        total = 0
        currency = None
        for entry in response["balances"]:
            entry_currency = entry["balanceAmount"]["currency"]
            # Amounts in different currencies cannot be summed.
            if currency and currency != entry_currency:
                return (False, False)
            if entry["balanceType"] != "interimAvailable":
                continue
            total += float(entry["balanceAmount"]["amount"])
            currency = entry_currency
        return (currency, total)

    async def get_all_balances(self):
        """
        Get all balances. Keyed by bank.

        Account info is fetched and stored first if the instance has none.

        :return: dict mapping bank to a list of dicts with ``currency``,
            ``balance`` and ``account_id`` keys
        :rtype: dict
        """
        if self.instance.account_info is None:
            await self.get_all_account_info(store=True)

        account_balances = {}
        for bank, accounts in self.instance.account_info.items():
            bank_balances = account_balances.setdefault(bank, [])
            for account in accounts:
                account_id = account["account_id"]
                currency, amount = await self.get_balance(account_id)
                bank_balances.append(
                    {
                        "currency": currency,
                        "balance": amount,
                        "account_id": account_id,
                    }
                )
        return account_balances

    async def get_total_map(self):
        """
        Return a dictionary keyed by currencies with the amounts as values.

        Accounts whose balance lookup yields a falsy amount or currency are
        skipped.

        :return: dict keyed by currency, values are amounts
        :rtype: dict
        """
        if self.instance.account_info is None:
            await self.get_all_account_info(store=True)

        totals = {}
        for accounts in self.instance.account_info.values():
            for account in accounts:
                currency, amount = await self.get_balance(account["account_id"])
                if not amount or not currency:
                    continue
                totals[currency] = totals.get(currency, 0) + amount
        return totals

    def normalise_transactions(self, transactions, state=None):
        """
        Rename and flatten transaction fields in place.

        Each transaction gains ``transaction_id``, ``amount``, ``currency``
        and ``reference`` keys, ``ts`` when a date field is available, and
        ``state`` when one is given. The source keys that were used are
        removed.

        :param transactions: list of transaction dicts, modified in place
        :param state: optional state label to set on every transaction
        :raises ValueError: if a transaction has no remittance information
        """
        for transaction in transactions:
            # Rename ID. Lookups use .get() so a payload that omits an
            # optional key falls through to the next candidate instead of
            # raising KeyError.
            if transaction.get("transactionId"):
                transaction["transaction_id"] = transaction["transactionId"]
                del transaction["transactionId"]
            elif transaction.get("internalTransactionId"):
                transaction["transaction_id"] = transaction["internalTransactionId"]
                del transaction["internalTransactionId"]
            else:
                # No transaction ID. This is a problem for our implementation,
                # so derive one from a hash of the transaction contents.
                tx_hash = sha256(
                    orjson.dumps(transaction, option=orjson.OPT_SORT_KEYS)
                ).hexdigest()
                transaction["transaction_id"] = tx_hash

            # Rename timestamp: take the first populated date field.
            for ts_key in ("bookingDateTime", "bookingDate", "valueDate"):
                if transaction.get(ts_key):
                    transaction["ts"] = transaction[ts_key]
                    del transaction[ts_key]
                    break

            # Flatten the nested amount structure.
            amount_info = transaction["transactionAmount"]
            transaction["amount"] = float(amount_info["amount"])
            transaction["currency"] = amount_info["currency"]
            if state:
                transaction["state"] = state
            del transaction["transactionAmount"]

            # Build a single reference string.
            if transaction.get("remittanceInformationUnstructuredArray"):
                ref_list = transaction["remittanceInformationUnstructuredArray"]
                transaction["reference"] = "|".join(ref_list)
                del transaction["remittanceInformationUnstructuredArray"]
            elif transaction.get("remittanceInformationUnstructured"):
                reference = transaction["remittanceInformationUnstructured"]
                transaction["reference"] = reference
                del transaction["remittanceInformationUnstructured"]
            else:
                raise ValueError(f"No way to get reference: {transaction}")

    async def get_transactions(
        self, account_id, req=None, process=False, pending=False
    ):
        """
        Get all transactions for an account.

        :param account_id: account to fetch transactions for
        :param req: optional requisition ID; if the instance knows it, its
            ``transaction_source`` selects which transaction list is used
            (default ``booked``)
        :param process: if true, pass the transactions to
            ``process_transactions``
        :param pending: if true, prepend the pending transactions to the
            result; cannot be combined with ``process``
        :return: list of normalised transactions
        :rtype: list
        :raises ValueError: if both ``process`` and ``pending`` are set
        """
        # Reject the invalid combination before calling the API or processing
        # anything, so a bad call has no side effects.
        if process and pending:
            raise ValueError("Cannot process and get pending")

        path = f"accounts/{account_id}/transactions"
        response = await self.call(path, schema="Transactions")

        source = "booked"
        # If requisition is specified, try to get the object
        # If present, take the transaction source from there,
        # pending or booked.
        if req:
            requisition = self.instance.get_requisition(req)
            if requisition:
                source = requisition.transaction_source

        parsed = response[source]
        self.normalise_transactions(parsed, state=source)

        if process:
            await self.process_transactions(account_id, parsed, req=req)

        # When the source already is "pending", the pending list has just
        # been normalised and returned; normalising it a second time would
        # fail on the renamed keys and duplicate every entry.
        if pending and source != "pending":
            parsed_pending = response["pending"]
            self.normalise_transactions(parsed_pending, state="pending")
            parsed_pending.extend(parsed)
            parsed = parsed_pending
        return parsed