# Twisted/Klein imports
from twisted.internet.task import LoopingCall

# Other library imports
import requests
from simplejson.errors import JSONDecodeError
from json import dumps, loads
from lib.serde.nordigen import (
    TXRoot,
    AccessToken,
    Institutions,
    Agreement,
    Requisitions,
    AccountDetails,
    AccountBalancesRoot,
    RequisitionResponse,
)
from serde import ValidationError

# Project imports
from settings import settings
import util


class Nordigen(util.Base):
    """
    Class to manage calls to Open Banking APIs through Nordigen.

    All HTTP traffic goes through the synchronous ``requests`` library.
    Two ``LoopingCall`` timers are used: one refreshes the access token,
    the other (started after the first successful token fetch) polls
    transactions for every mapped account and pushes them to ``sinks``.
    """

    def __init__(self, sinks):
        """
        Set up state, load the mapped accounts and start the token loop.

        :param sinks: object receiving ``got_account_info`` and
                      ``got_transactions`` callbacks
        """
        super().__init__()
        self.sinks = sinks
        self.token = None
        self.banks = {}
        self.authed = False

        # Get the banks from the config and cache them
        self.get_mapped_accounts()

        self.lc = LoopingCall(self.get_access_token)
        self.lc.start(int(settings.Nordigen.TokenRefreshSec))

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _auth_headers(self):
        """Return the request headers carrying the current bearer token."""
        return {
            "accept": "application/json",
            "Authorization": f"Bearer {self.token}",
        }

    def _load_validated(self, loader, payload):
        """
        Run a serde loader and convert the resulting model to a dict.

        :param loader: bound ``Model.from_json`` / ``Model.from_dict``
        :param payload: raw data handed to ``loader``
        :return: the model as a dict, or None when validation failed
                 (the failure is logged)
        """
        try:
            obj = loader(payload)
        except ValidationError as err:
            self.log.error(f"Validation error: {err}")
            return None
        return obj.to_dict()

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def __authed__(self):
        """
        Called when we have received the access token.

        Sends the account details of all mapped accounts to the sinks and
        starts the periodic transaction loop.
        """
        self.log.info("Connection authenticated.")
        account_infos = self.get_all_account_info()

        # Only report accounts that have been mapped. The filter is applied
        # per account, and banks left without any mapped account are dropped.
        mapped_infos = {}
        for bank, accounts in account_infos.items():
            kept = [
                account
                for account in accounts
                if account["account_id"] in self.banks
            ]
            if kept:
                mapped_infos[bank] = kept

        self.sinks.got_account_info("nordigen", mapped_infos)

        self.lc_tx = LoopingCall(self.transaction_loop)
        self.lc_tx.start(int(settings.Nordigen.RefreshSec))

    def transaction_loop(self):
        """
        Fetch the transactions of every mapped account and forward them
        to the sinks.
        """
        for account_id in self.banks:
            transactions = self.get_transactions(account_id)
            if transactions is None:
                # get_transactions already logged the validation failure;
                # do not hand None to the sinks.
                continue
            self.sinks.got_transactions("nordigen", account_id, transactions)

    def get_access_token(self):
        """
        Get an access token and store it in ``self.token``.

        On the first successful fetch ``__authed__`` is invoked.

        :return: None (failures are logged, nothing is returned)
        """
        headers = {"accept": "application/json", "Content-Type": "application/json"}
        data = {
            "secret_id": settings.Nordigen.ID,
            "secret_key": settings.Nordigen.Key,
        }
        path = f"{settings.Nordigen.Base}/token/new/"
        r = requests.post(path, headers=headers, data=dumps(data))
        parsed = self._load_validated(AccessToken.from_json, r.content)
        if parsed is None:
            return
        self.token = parsed["access"]
        self.log.info("Refreshed access token")
        if not self.authed:
            self.__authed__()
            self.authed = True

    # ------------------------------------------------------------------
    # Institutions and requisitions
    # ------------------------------------------------------------------

    def get_institutions(self, country, filter_name=None):
        """
        Get a list of supported institutions.

        :param country: country to query (two-character code)
        :param filter_name: return only results with this in the name
        :return: list of institutions; False when ``country`` is not two
                 characters long or the response is not JSON; None when
                 the response fails validation
        :rtype: list
        """
        if not len(country) == 2:
            return False
        path = f"{settings.Nordigen.Base}/institutions/?country={country}"
        r = requests.get(path, headers=self._auth_headers())
        try:
            parsed_pre = r.json()
        except (JSONDecodeError, ValueError):
            # ValueError is the common base class of the simplejson and
            # stdlib JSON decode errors, whichever backend requests uses.
            self.log.error(f"Error parsing institutions response: {r.content}")
            return False
        wrapped = {"institutions": parsed_pre}
        parsed = self._load_validated(Institutions.from_dict, wrapped)
        if parsed is None:
            return
        institutions = parsed["institutions"]
        if filter_name:
            return [i for i in institutions if filter_name in i["name"]]
        return institutions

    def build_link(self, institution_id):
        """Create a link to access an institution.

        :param institution_id: ID of the institution
        :return: the link from the response, False when the response has
                 no link, None when the response fails validation
        """
        path = f"{settings.Nordigen.Base}/requisitions/"
        data = {"institution_id": institution_id, "redirect": settings.Nordigen.CallbackURL}
        r = requests.post(path, headers=self._auth_headers(), data=data)
        parsed = self._load_validated(Agreement.from_json, r.content)
        if parsed is None:
            return
        if "link" in parsed:
            return parsed["link"]
        return False

    def create_auth_url(self, country, bank_name):
        """Helper to look up a bank and create a link.

        :param country: country
        :param bank_name: bank name string to search
        :return: the link, or False when the lookup failed, did not match
                 exactly one institution, or no link could be built
        """
        institutions = self.get_institutions(country, filter_name=bank_name)
        # get_institutions returns False/None on failure; and we need to be
        # precise enough to have exactly one result.
        if not institutions or len(institutions) != 1:
            return False
        institution = institutions[0]
        link = self.build_link(institution["id"])
        if not link:
            return False
        return link

    def get_requisitions(self):
        """
        Get a list of active accounts.

        :return: the ``results`` list of the response, False when it is
                 missing, None when the response fails validation
        """
        path = f"{settings.Nordigen.Base}/requisitions"
        r = requests.get(path, headers=self._auth_headers())
        parsed = self._load_validated(Requisitions.from_json, r.content)
        if parsed is None:
            return
        if "results" in parsed:
            return parsed["results"]
        self.log.error(f"Results not in requisitions response: {parsed}")
        return False

    def delete_requisition(self, requisition_id):
        """
        Delete a requisition ID.

        :param requisition_id: ID of the requisition to delete
        :return: the parsed response as a dict, None when it fails validation
        """
        path = f"{settings.Nordigen.Base}/requisitions/{requisition_id}/"
        r = requests.delete(path, headers=self._auth_headers())
        return self._load_validated(RequisitionResponse.from_json, r.content)

    # ------------------------------------------------------------------
    # Accounts
    # ------------------------------------------------------------------

    def get_accounts(self, requisition):
        """
        Get a list of accounts for a requisition.

        :param requisition: requisition ID
        :return: the ``accounts`` entry of the response, False when it is
                 missing, None when the response fails validation
        """
        path = f"{settings.Nordigen.Base}/requisitions/{requisition}/"
        r = requests.get(path, headers=self._auth_headers())
        parsed = self._load_validated(Agreement.from_json, r.content)
        if parsed is None:
            return
        if "accounts" in parsed:
            return parsed["accounts"]
        return False

    def get_account(self, account_id):
        """
        Get details of an account.

        For GBP accounts that carry a ``bban``, the first six characters
        become a dash-separated ``sort_code``, the remainder becomes
        ``number``, and ``bban``/``iban`` are removed.

        :param account_id: account ID
        :return: account details with ``account_id`` added, None when the
                 response fails validation
        :rtype: dict
        """
        path = f"{settings.Nordigen.Base}/accounts/{account_id}/details/"
        r = requests.get(path, headers=self._auth_headers())
        details = self._load_validated(AccountDetails.from_json, r.content)
        if details is None:
            return
        parsed = details["account"]
        if "bban" in parsed and parsed["currency"] == "GBP":
            bban = parsed.pop("bban")
            # The IBAN may be absent when only a BBAN was supplied.
            parsed.pop("iban", None)
            raw_sort_code = bban[0:6]
            # Group into pairs: "123456" -> "12-34-56" (a trailing odd
            # character is dropped).
            parsed["sort_code"] = "-".join(
                raw_sort_code[i : i + 2] for i in range(0, len(raw_sort_code) - 1, 2)
            )
            parsed["number"] = bban[6:]
        # Let's add the account ID so we can reference it later
        parsed["account_id"] = account_id
        return parsed

    def get_mapped_accounts(self):
        """Load the mapped account IDs from the settings into ``self.banks``."""
        self.banks = loads(settings.Nordigen.Maps)

    def map_account(self, account_id):
        """
        Map an account_id so that it is enabled for fetching.

        The stored map (``settings.Nordigen.Maps``) is JSON; this method
        appends to it, i.e. treats it as a list of account IDs.

        :param account_id: account ID to enable
        :return: the currency of the account when it was newly mapped,
                 None when it was already mapped or its details could not
                 be fetched
        """
        account_data = self.get_account(account_id)
        if not account_data:
            self.log.error(f"Could not get account details for {account_id}")
            return
        currency = account_data["currency"]
        existing_entry = loads(settings.Nordigen.Maps)
        if account_id in existing_entry:
            return
        existing_entry.append(account_id)

        settings.Nordigen.Maps = dumps(existing_entry)
        self.banks = existing_entry
        settings.write()

        return currency

    def unmap_account(self, account_id):
        """
        Unmap an account_id so that it is no longer fetched.

        The stored map (``settings.Nordigen.Maps``) is JSON; this method
        removes from it, i.e. treats it as a list of account IDs.

        :param account_id: account ID to disable
        """
        existing_entry = loads(settings.Nordigen.Maps)
        if account_id not in existing_entry:
            return
        existing_entry.remove(account_id)

        settings.Nordigen.Maps = dumps(existing_entry)
        self.banks = existing_entry
        settings.write()

    def get_all_account_info(self):
        """
        Collect the account details of every requisition.

        :return: dict keyed by institution ID, values are lists of account
                 detail dicts; empty when the requisitions are unavailable
        :rtype: dict
        """
        to_return = {}
        requisitions = self.get_requisitions()
        if not requisitions:
            self.log.error("Could not get requisitions.")
            return {}
        for req in requisitions:
            if not req["accounts"]:
                continue
            accounts = self.get_accounts(req["id"])
            if not accounts:
                # None/False signals a failed lookup; nothing to iterate.
                continue
            for account_id in accounts:
                account_info = self.get_account(account_id)
                if not account_info:
                    continue
                to_return.setdefault(req["institution_id"], []).append(account_info)
        return to_return

    # ------------------------------------------------------------------
    # Transactions and balances
    # ------------------------------------------------------------------

    def normalise_transactions(self, transactions):
        """
        Rename the API's transaction fields in place.

        ``transactionId`` -> ``transaction_id``, ``bookingDate`` ->
        ``timestamp``, ``transactionAmount`` -> ``amount`` (float) and
        ``currency``, ``remittanceInformationUnstructured`` -> ``reference``.

        :param transactions: list of transaction dicts, modified in place
        """
        for transaction in transactions:
            # Rename ID
            transaction["transaction_id"] = transaction.pop("transactionId")

            # Rename timestamp
            transaction["timestamp"] = transaction.pop("bookingDate")

            money = transaction["transactionAmount"]
            transaction["amount"] = float(money["amount"])
            transaction["currency"] = money["currency"]
            del transaction["transactionAmount"]

            transaction["reference"] = transaction.pop("remittanceInformationUnstructured")

    def get_transactions(self, account_id):
        """
        Get all booked transactions for an account, normalised.

        :param account_id: account to fetch transactions for
        :return: list of transactions, None when the response fails validation
        :rtype: list
        """
        path = f"{settings.Nordigen.Base}/accounts/{account_id}/transactions/"
        r = requests.get(path, headers=self._auth_headers())
        root = self._load_validated(TXRoot.from_json, r.content)
        if root is None:
            return
        parsed = root["transactions"]["booked"]
        self.normalise_transactions(parsed)
        return parsed

    def get_balance(self, account_id):
        """
        Get the balance and currency of an account.

        The amounts of all balance entries are summed.

        :param account_id: the account ID
        :return: tuple of (currency, amount); None when the response fails
                 validation or the entries use different currencies
        :rtype: tuple
        """
        path = f"{settings.Nordigen.Base}/accounts/{account_id}/balances/"
        r = requests.get(path, headers=self._auth_headers())
        root = self._load_validated(AccountBalancesRoot.from_json, r.content)
        if root is None:
            return
        total = 0
        currency = None
        for entry in root["balances"]:
            entry_currency = entry["balanceAmount"]["currency"]
            if currency and currency != entry_currency:
                self.log.error("Different currencies in balance query.")
                return
            total += float(entry["balanceAmount"]["amount"])
            currency = entry_currency
        return (currency, total)

    def get_total_map(self):
        """
        Return a dictionary keyed by currencies with the amounts as values.

        Accounts whose balance could not be determined, or is zero, are
        skipped.

        :return: dict keyed by currency, values are amounts
        :rtype: dict
        """
        totals = {}
        for account_id in self.banks:
            balance = self.get_balance(account_id)
            if not balance:
                # get_balance returns None on failure; it cannot be unpacked.
                continue
            currency, amount = balance
            if not amount:
                continue
            if currency in totals:
                totals[currency] += amount
            else:
                totals[currency] = amount
        return totals