You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

331 lines
12 KiB

from datetime import timedelta
from hashlib import sha256
import orjson
from django.conf import settings
from django.utils import timezone
from core.clients.aggregator import AggregatorClient
from core.clients.base import BaseClient
from core.util import logs
log = logs.get_logger("nordigen")
class NordigenClient(BaseClient, AggregatorClient):
url = ""
async def connect(self):
now =
# Check if access token expires later than now
if self.instance.access_token_expires is not None:
if self.instance.access_token_expires > now:
self.token = self.instance.access_token
await self.get_access_token()
def method_filter(self, method):
new_method = method.replace("/", "_")
return new_method
async def get_access_token(self):
Get the access token for the Nordigen API.
log.debug(f"Getting new access token for {self.instance}")
data = {
"secret_id": self.instance.secret_id,
"secret_key": self.instance.secret_key,
response = await"token/new", http_method="post", data=data)
access = response["access"]
access_expires = response["access_expires"]
now =
# Offset now by access_expires seconds
access_expires = now + timedelta(seconds=access_expires)
self.instance.access_token = access
self.instance.access_token_expires = access_expires
self.token = access
async def get_requisitions(self):
Get a list of active accounts.
response = await"requisitions")
return response["results"]
async def get_countries(self):
Get a list of countries.
# This function is a stub.
return ["GB", "SE", "BG"]
async def get_banks(self, country):
Get a list of supported banks for a country.
:param country: country to query
:return: list of institutions
:rtype: list
if not len(country) == 2:
return False
path = f"institutions/?country={country}"
response = await, schema="Institutions", append_slash=False)
return response
async def build_link(self, institution_id, redirect=None):
"""Create a link to access an institution.
:param institution_id: ID of the institution
data = {
"institution_id": institution_id,
"redirect": settings.URL,
if redirect:
data["redirect"] = redirect
response = await
"requisitions", schema="RequisitionsPost", http_method="post", data=data
if "link" in response:
return response["link"]
return False
async def delete_requisition(self, requisition_id):
Delete a requisision ID.
path = f"requisitions/{requisition_id}"
response = await
path, schema="RequisitionDelete", http_method="delete"
return response
async def get_requisition(self, requisition):
Get a list of accounts for a requisition.
:param requisition: requisition ID"""
path = f"requisitions/{requisition}"
response = await, schema="Requisition")
return response
# def get_ownernames(self):
# """
# Get list of supplementary owner names.
# """
# ownernames = loads(settings.Nordigen.OwnerNames)
# return ownernames
async def get_account(self, req_id, account_id):
Get details of an account.
:param account_id: account ID"""
path = f"accounts/{account_id}/details"
response = await, schema="AccountDetails")
if "account" not in response:
return False
parsed = response["account"]
if "iban" in parsed and parsed["currency"] == "GBP":
if parsed["iban"]:
sort_code = parsed["iban"][-14:-8]
account_number = parsed["iban"][-8:]
del parsed["iban"]
# if "iban" in parsed:
# del parsed["iban"]
sort_code = "-".join(list(map("".join, zip(*[iter(sort_code)] * 2))))
parsed["sort_code"] = sort_code
parsed["number"] = account_number
# Let's add the account ID so we can reference it later
parsed["account_id"] = account_id
parsed["aggregator_id"] = str(
parsed["requisition_id"] = str(req_id)
return parsed
async def get_all_account_info(self, requisition=None, store=False):
to_return = {}
if not requisition:
requisitions = await self.get_requisitions()
requisitions = [await self.get_requisition(requisition)]
for req in requisitions:
accounts = req["accounts"]
for account_id in accounts:
account_info = await self.get_account(req["id"], account_id)
if not account_info:
if req["institution_id"] in to_return:
to_return[req["institution_id"]] = [account_info]
if store:
if requisition is not None:
raise Exception("Cannot store partial data")
return to_return
async def get_balance(self, account_id):
Get the balance and currency of an account.
:param account_id: the account ID
:return: tuple of (currency, amount)
:rtype: tuple
path = f"accounts/{account_id}/balances"
response = await, schema="AccountBalances")
total = 0
currency = None
if "balances" not in response:
return (False, False)
for entry in response["balances"]:
if currency:
if not currency == entry["balanceAmount"]["currency"]:
return (False, False)
if not entry["balanceType"] == "interimAvailable":
total += float(entry["balanceAmount"]["amount"])
currency = entry["balanceAmount"]["currency"]
return (currency, total)
async def get_all_balances(self):
Get all balances.
Keyed by bank.
if self.instance.account_info is None:
await self.get_all_account_info(store=True)
account_balances = {}
for bank, accounts in self.instance.account_info.items():
if bank not in account_balances:
account_balances[bank] = []
for account in accounts:
account_id = account["account_id"]
currency, amount = await self.get_balance(account_id)
"currency": currency,
"balance": amount,
"account_id": account_id,
return account_balances
async def get_total_map(self):
Return a dictionary keyed by currencies with the amounts as values.
:return: dict keyed by currency, values are amounts
:rtype: dict
if self.instance.account_info is None:
await self.get_all_account_info(store=True)
totals = {}
for bank, accounts in self.instance.account_info.items():
for account in accounts:
account_id = account["account_id"]
currency, amount = await self.get_balance(account_id)
if not amount:
if not currency:
if currency in totals:
totals[currency] += amount
totals[currency] = amount
return totals
def normalise_transactions(self, transactions, state=None):
for transaction in transactions:
# Rename ID
if transaction["transactionId"]:
transaction["transaction_id"] = transaction["transactionId"]
del transaction["transactionId"]
elif transaction["internalTransactionId"]:
transaction["transaction_id"] = transaction["internalTransactionId"]
del transaction["internalTransactionId"]
# No transaction ID. This is a problem for our implementation
tx_hash = sha256(
orjson.dumps(transaction, option=orjson.OPT_SORT_KEYS)
transaction["transaction_id"] = tx_hash
# Rename timestamp
if transaction["bookingDateTime"]:
transaction["ts"] = transaction["bookingDateTime"]
del transaction["bookingDateTime"]
elif transaction["bookingDate"]:
transaction["ts"] = transaction["bookingDate"]
del transaction["bookingDate"]
elif transaction["valueDate"]:
transaction["ts"] = transaction["valueDate"]
del transaction["valueDate"]
transaction["amount"] = float(transaction["transactionAmount"]["amount"])
transaction["currency"] = transaction["transactionAmount"]["currency"]
if state:
transaction["state"] = state
del transaction["transactionAmount"]
if transaction["remittanceInformationUnstructuredArray"]:
ref_list = transaction["remittanceInformationUnstructuredArray"]
reference = "|".join(ref_list)
transaction["reference"] = reference
del transaction["remittanceInformationUnstructuredArray"]
elif transaction["remittanceInformationUnstructured"]:
reference = transaction["remittanceInformationUnstructured"]
transaction["reference"] = reference
del transaction["remittanceInformationUnstructured"]
raise Exception(f"No way to get reference: {transaction}")
async def get_transactions(
self, account_id, req=None, process=False, pending=False
Get all transactions for an account.
:param account_id: account to fetch transactions for
:return: list of transactions
:rtype: dict
path = f"accounts/{account_id}/transactions"
response = await, schema="Transactions")
source = "booked"
# If requisition is specified, try to get the object
# If present, take the transaction source from there,
# pending or booked.
if req:
requisition = self.instance.get_requisition(req)
if requisition:
source = requisition.transaction_source
parsed = response[source]
self.normalise_transactions(parsed, state=source)
if process:
await self.process_transactions(account_id, parsed, req=req)
if pending:
if process:
raise Exception("Cannot process and get pending")
parsed_pending = response["pending"]
self.normalise_transactions(parsed_pending, state="pending")
parsed = parsed_pending
return parsed