pluto/core/clients/aggregators/nordigen.py

314 lines
11 KiB
Python

from datetime import timedelta
from hashlib import sha256
from django.conf import settings
from django.utils import timezone
from orjson import dumps
from core.clients.aggregator import AggregatorClient
from core.clients.base import BaseClient
from core.util import logs
log = logs.get_logger("nordigen")
class NordigenClient(BaseClient, AggregatorClient):
url = "https://ob.nordigen.com/api/v2"
async def connect(self):
now = timezone.now()
# Check if access token expires later than now
if self.instance.access_token_expires is not None:
if self.instance.access_token_expires > now:
self.token = self.instance.access_token
return
await self.get_access_token()
def method_filter(self, method):
new_method = method.replace("/", "_")
return new_method
async def get_access_token(self):
"""
Get the access token for the Nordigen API.
"""
log.debug(f"Getting new access token for {self.instance}")
data = {
"secret_id": self.instance.secret_id,
"secret_key": self.instance.secret_key,
}
response = await self.call("token/new", http_method="post", data=data)
access = response["access"]
access_expires = response["access_expires"]
now = timezone.now()
# Offset now by access_expires seconds
access_expires = now + timedelta(seconds=access_expires)
self.instance.access_token = access
self.instance.access_token_expires = access_expires
self.instance.save()
self.token = access
async def get_requisitions(self):
"""
Get a list of active accounts.
"""
response = await self.call("requisitions")
return response["results"]
async def get_countries(self):
"""
Get a list of countries.
"""
# This function is a stub.
return ["GB", "SE", "BG"]
async def get_banks(self, country):
"""
Get a list of supported banks for a country.
:param country: country to query
:return: list of institutions
:rtype: list
"""
if not len(country) == 2:
return False
path = f"institutions/?country={country}"
response = await self.call(path, schema="Institutions", append_slash=False)
return response
async def build_link(self, institution_id, redirect=None):
"""Create a link to access an institution.
:param institution_id: ID of the institution
"""
data = {
"institution_id": institution_id,
"redirect": settings.URL,
}
if redirect:
data["redirect"] = redirect
response = await self.call(
"requisitions", schema="RequisitionsPost", http_method="post", data=data
)
if "link" in response:
return response["link"]
return False
async def delete_requisition(self, requisition_id):
"""
Delete a requisision ID.
"""
path = f"requisitions/{requisition_id}"
response = await self.call(
path, schema="RequisitionDelete", http_method="delete"
)
return response
async def get_requisition(self, requisition):
"""
Get a list of accounts for a requisition.
:param requisition: requisition ID"""
path = f"requisitions/{requisition}"
response = await self.call(path, schema="Requisition")
return response
# def get_ownernames(self):
# """
# Get list of supplementary owner names.
# """
# ownernames = loads(settings.Nordigen.OwnerNames)
# return ownernames
async def get_account(self, account_id):
"""
Get details of an account.
:param account_id: account ID"""
path = f"accounts/{account_id}/details"
response = await self.call(path, schema="AccountDetails")
if "account" not in response:
return False
parsed = response["account"]
if "iban" in parsed and parsed["currency"] == "GBP":
if parsed["iban"]:
sort_code = parsed["iban"][-14:-8]
account_number = parsed["iban"][-8:]
del parsed["iban"]
# if "iban" in parsed:
# del parsed["iban"]
sort_code = "-".join(list(map("".join, zip(*[iter(sort_code)] * 2))))
parsed["sort_code"] = sort_code
parsed["number"] = account_number
parsed["recipient"] = "TODO"
# Let's add the account ID so we can reference it later
parsed["account_id"] = account_id
parsed["aggregator_id"] = str(self.instance.id)
return parsed
async def get_all_account_info(self, requisition=None, store=False):
to_return = {}
if not requisition:
requisitions = await self.get_requisitions()
else:
requisitions = [await self.get_requisition(requisition)]
for req in requisitions:
accounts = req["accounts"]
for account_id in accounts:
account_info = await self.get_account(account_id)
if not account_info:
continue
if req["institution_id"] in to_return:
to_return[req["institution_id"]].append(account_info)
else:
to_return[req["institution_id"]] = [account_info]
if store:
if requisition is not None:
raise Exception("Cannot store partial data")
self.store_account_info(to_return)
return to_return
async def get_balance(self, account_id):
"""
Get the balance and currency of an account.
:param account_id: the account ID
:return: tuple of (currency, amount)
:rtype: tuple
"""
path = f"accounts/{account_id}/balances"
response = await self.call(path, schema="AccountBalances")
total = 0
currency = None
if "balances" not in response:
return (False, False)
for entry in response["balances"]:
if currency:
if not currency == entry["balanceAmount"]["currency"]:
return (False, False)
if not entry["balanceType"] == "interimAvailable":
continue
total += float(entry["balanceAmount"]["amount"])
currency = entry["balanceAmount"]["currency"]
return (currency, total)
async def get_all_balances(self):
"""
Get all balances.
Keyed by bank.
"""
if self.instance.account_info is None:
await self.get_all_account_info(store=True)
account_balances = {}
for bank, accounts in self.instance.account_info.items():
if bank not in account_balances:
account_balances[bank] = []
for account in accounts:
account_id = account["account_id"]
currency, amount = await self.get_balance(account_id)
account_balances[bank].append(
{
"currency": currency,
"balance": amount,
"account_id": account_id,
}
)
return account_balances
async def get_total_map(self):
"""
Return a dictionary keyed by currencies with the amounts as values.
:return: dict keyed by currency, values are amounts
:rtype: dict
"""
if self.instance.account_info is None:
await self.get_all_account_info(store=True)
totals = {}
for bank, accounts in self.instance.account_info.items():
for account in accounts:
account_id = account["account_id"]
currency, amount = await self.get_balance(account_id)
if not amount:
continue
if not currency:
continue
if currency in totals:
totals[currency] += amount
else:
totals[currency] = amount
return totals
def normalise_transactions(self, transactions, state=None):
for transaction in transactions:
# Rename ID
if "transactionId" in transaction:
transaction["transaction_id"] = transaction["transactionId"]
del transaction["transactionId"]
elif "internalTransactionId" in transaction:
transaction["transaction_id"] = transaction["internalTransactionId"]
del transaction["internalTransactionId"]
else:
# No transaction ID. This is a problem for our implementation
tx_hash = sha256(
dumps(transaction, sort_keys=True).encode("utf8")
).hexdigest()
transaction["transaction_id"] = tx_hash
# Rename timestamp
if "bookingDateTime" in transaction:
transaction["ts"] = transaction["bookingDateTime"]
del transaction["bookingDateTime"]
elif "bookingDate" in transaction:
transaction["ts"] = transaction["bookingDate"]
del transaction["bookingDate"]
transaction["amount"] = float(transaction["transactionAmount"]["amount"])
transaction["currency"] = transaction["transactionAmount"]["currency"]
if state:
transaction["state"] = state
del transaction["transactionAmount"]
if transaction["remittanceInformationUnstructuredArray"]:
ref_list = transaction["remittanceInformationUnstructuredArray"]
reference = "|".join(ref_list)
transaction["reference"] = reference
del transaction["remittanceInformationUnstructuredArray"]
elif transaction["remittanceInformationUnstructured"]:
reference = transaction["remittanceInformationUnstructured"]
transaction["reference"] = reference
del transaction["remittanceInformationUnstructured"]
else:
raise Exception(f"No way to get reference: {transaction}")
async def get_transactions(self, account_id, process=False, pending=False):
"""
Get all transactions for an account.
:param account_id: account to fetch transactions for
:return: list of transactions
:rtype: dict
"""
path = f"accounts/{account_id}/transactions"
response = await self.call(path, schema="Transactions")
parsed = response["booked"]
self.normalise_transactions(parsed, state="booked")
if process:
await self.process_transactions(parsed)
if pending:
parsed_pending = response["pending"]
self.normalise_transactions(parsed_pending, state="pending")
parsed_pending.extend(parsed)
parsed = parsed_pending
return parsed