pluto/core/clients/aggregators/nordigen.py

315 lines
11 KiB
Python
Raw Normal View History

2023-03-08 12:48:05 +00:00
from datetime import timedelta
from hashlib import sha256
2023-03-08 12:48:05 +00:00
from django.conf import settings
from django.utils import timezone
from orjson import dumps
2023-03-08 12:48:05 +00:00
2023-03-09 16:44:16 +00:00
from core.clients.aggregator import AggregatorClient
2023-03-08 12:48:05 +00:00
from core.clients.base import BaseClient
from core.util import logs
log = logs.get_logger("nordigen")
2023-03-09 16:44:16 +00:00
class NordigenClient(BaseClient, AggregatorClient):
2023-03-08 12:48:05 +00:00
url = "https://ob.nordigen.com/api/v2"
async def connect(self):
now = timezone.now()
# Check if access token expires later than now
if self.instance.access_token_expires is not None:
if self.instance.access_token_expires > now:
self.token = self.instance.access_token
return
await self.get_access_token()
def method_filter(self, method):
new_method = method.replace("/", "_")
return new_method
async def get_access_token(self):
"""
Get the access token for the Nordigen API.
"""
log.debug(f"Getting new access token for {self.instance}")
data = {
"secret_id": self.instance.secret_id,
"secret_key": self.instance.secret_key,
}
response = await self.call("token/new", http_method="post", data=data)
access = response["access"]
access_expires = response["access_expires"]
now = timezone.now()
# Offset now by access_expires seconds
access_expires = now + timedelta(seconds=access_expires)
self.instance.access_token = access
self.instance.access_token_expires = access_expires
self.instance.save()
self.token = access
async def get_requisitions(self):
"""
Get a list of active accounts.
"""
response = await self.call("requisitions")
return response["results"]
async def get_countries(self):
"""
Get a list of countries.
"""
# This function is a stub.
2023-03-09 18:09:29 +00:00
return ["GB", "SE", "BG"]
2023-03-08 12:48:05 +00:00
async def get_banks(self, country):
"""
Get a list of supported banks for a country.
:param country: country to query
:return: list of institutions
:rtype: list
"""
if not len(country) == 2:
return False
path = f"institutions/?country={country}"
response = await self.call(path, schema="Institutions", append_slash=False)
return response
async def build_link(self, institution_id, redirect=None):
"""Create a link to access an institution.
:param institution_id: ID of the institution
"""
data = {
"institution_id": institution_id,
"redirect": settings.URL,
}
if redirect:
data["redirect"] = redirect
response = await self.call(
"requisitions", schema="RequisitionsPost", http_method="post", data=data
)
if "link" in response:
return response["link"]
return False
2023-03-08 15:44:21 +00:00
async def delete_requisition(self, requisition_id):
"""
Delete a requisision ID.
"""
path = f"requisitions/{requisition_id}"
response = await self.call(
path, schema="RequisitionDelete", http_method="delete"
)
return response
2023-03-08 17:04:47 +00:00
async def get_requisition(self, requisition):
"""
Get a list of accounts for a requisition.
:param requisition: requisition ID"""
path = f"requisitions/{requisition}"
response = await self.call(path, schema="Requisition")
return response
# def get_ownernames(self):
# """
# Get list of supplementary owner names.
# """
# ownernames = loads(settings.Nordigen.OwnerNames)
# return ownernames
async def get_account(self, account_id):
"""
Get details of an account.
:param account_id: account ID"""
2023-03-08 17:04:47 +00:00
path = f"accounts/{account_id}/details"
response = await self.call(path, schema="AccountDetails")
if "account" not in response:
return False
parsed = response["account"]
2023-03-09 16:44:16 +00:00
if "iban" in parsed and parsed["currency"] == "GBP":
if parsed["iban"]:
sort_code = parsed["iban"][-14:-8]
account_number = parsed["iban"][-8:]
2023-03-08 17:04:47 +00:00
del parsed["iban"]
2023-03-09 16:44:16 +00:00
# if "iban" in parsed:
# del parsed["iban"]
sort_code = "-".join(list(map("".join, zip(*[iter(sort_code)] * 2))))
parsed["sort_code"] = sort_code
parsed["number"] = account_number
parsed["recipient"] = "TODO"
2023-03-08 17:04:47 +00:00
# Let's add the account ID so we can reference it later
parsed["account_id"] = account_id
parsed["aggregator_id"] = str(self.instance.id)
2023-03-08 17:04:47 +00:00
return parsed
2023-03-09 16:44:16 +00:00
async def get_all_account_info(self, requisition=None, store=False):
2023-03-08 17:04:47 +00:00
to_return = {}
if not requisition:
2023-03-09 16:44:16 +00:00
requisitions = await self.get_requisitions()
2023-03-08 17:04:47 +00:00
else:
requisitions = [await self.get_requisition(requisition)]
for req in requisitions:
accounts = req["accounts"]
for account_id in accounts:
account_info = await self.get_account(account_id)
if not account_info:
continue
if req["institution_id"] in to_return:
to_return[req["institution_id"]].append(account_info)
else:
to_return[req["institution_id"]] = [account_info]
2023-03-09 16:44:16 +00:00
if store:
2023-03-09 18:09:29 +00:00
if requisition is not None:
raise Exception("Cannot store partial data")
2023-03-09 16:44:16 +00:00
self.store_account_info(to_return)
2023-03-08 17:04:47 +00:00
return to_return
2023-03-09 18:09:29 +00:00
async def get_balance(self, account_id):
"""
Get the balance and currency of an account.
:param account_id: the account ID
:return: tuple of (currency, amount)
:rtype: tuple
"""
path = f"accounts/{account_id}/balances"
2023-03-09 18:28:50 +00:00
response = await self.call(path, schema="AccountBalances")
2023-03-09 18:09:29 +00:00
total = 0
currency = None
if "balances" not in response:
return (False, False)
for entry in response["balances"]:
if currency:
if not currency == entry["balanceAmount"]["currency"]:
return (False, False)
2023-03-09 19:02:02 +00:00
if not entry["balanceType"] == "interimAvailable":
continue
2023-03-09 18:09:29 +00:00
total += float(entry["balanceAmount"]["amount"])
currency = entry["balanceAmount"]["currency"]
return (currency, total)
async def get_all_balances(self):
"""
Get all balances.
Keyed by bank.
"""
if self.instance.account_info is None:
await self.get_all_account_info(store=True)
account_balances = {}
for bank, accounts in self.instance.account_info.items():
if bank not in account_balances:
account_balances[bank] = []
for account in accounts:
account_id = account["account_id"]
currency, amount = await self.get_balance(account_id)
account_balances[bank].append(
{
"currency": currency,
"balance": amount,
"account_id": account_id,
}
)
return account_balances
async def get_total_map(self):
"""
Return a dictionary keyed by currencies with the amounts as values.
:return: dict keyed by currency, values are amounts
:rtype: dict
"""
if self.instance.account_info is None:
await self.get_all_account_info(store=True)
totals = {}
for bank, accounts in self.instance.account_info.items():
for account in accounts:
account_id = account["account_id"]
currency, amount = await self.get_balance(account_id)
if not amount:
continue
if not currency:
continue
if currency in totals:
totals[currency] += amount
else:
totals[currency] = amount
return totals
2023-03-11 11:45:09 +00:00
def normalise_transactions(self, transactions, state=None):
for transaction in transactions:
# Rename ID
if "transactionId" in transaction:
transaction["transaction_id"] = transaction["transactionId"]
del transaction["transactionId"]
elif "internalTransactionId" in transaction:
transaction["transaction_id"] = transaction["internalTransactionId"]
del transaction["internalTransactionId"]
else:
# No transaction ID. This is a problem for our implementation
tx_hash = sha256(
dumps(transaction, sort_keys=True).encode("utf8")
).hexdigest()
transaction["transaction_id"] = tx_hash
# Rename timestamp
if "bookingDateTime" in transaction:
transaction["ts"] = transaction["bookingDateTime"]
del transaction["bookingDateTime"]
elif "bookingDate" in transaction:
transaction["ts"] = transaction["bookingDate"]
del transaction["bookingDate"]
transaction["amount"] = float(transaction["transactionAmount"]["amount"])
transaction["currency"] = transaction["transactionAmount"]["currency"]
2023-03-11 11:45:09 +00:00
if state:
transaction["state"] = state
del transaction["transactionAmount"]
if transaction["remittanceInformationUnstructuredArray"]:
ref_list = transaction["remittanceInformationUnstructuredArray"]
reference = "|".join(ref_list)
transaction["reference"] = reference
del transaction["remittanceInformationUnstructuredArray"]
elif transaction["remittanceInformationUnstructured"]:
reference = transaction["remittanceInformationUnstructured"]
transaction["reference"] = reference
del transaction["remittanceInformationUnstructured"]
else:
raise Exception(f"No way to get reference: {transaction}")
2023-03-11 11:45:09 +00:00
async def get_transactions(self, account_id, process=False, pending=False):
"""
Get all transactions for an account.
:param account_id: account to fetch transactions for
:return: list of transactions
:rtype: dict
"""
path = f"accounts/{account_id}/transactions"
response = await self.call(path, schema="Transactions")
2023-03-11 11:45:09 +00:00
print("RESP", response["pending"])
parsed = response["booked"]
2023-03-11 11:45:09 +00:00
self.normalise_transactions(parsed, state="booked")
if process:
await self.process_transactions(parsed)
2023-03-11 11:45:09 +00:00
if pending:
parsed_pending = response["pending"]
self.normalise_transactions(parsed_pending, state="pending")
parsed_pending.extend(parsed)
parsed = parsed_pending
return parsed