pluto/core/clients/aggregators/nordigen.py

244 lines
8.0 KiB
Python
Raw Normal View History

2023-03-08 12:48:05 +00:00
from datetime import timedelta
from django.conf import settings
from django.utils import timezone
2023-03-09 16:44:16 +00:00
from core.clients.aggregator import AggregatorClient
2023-03-08 12:48:05 +00:00
from core.clients.base import BaseClient
from core.util import logs
log = logs.get_logger("nordigen")
2023-03-09 16:44:16 +00:00
class NordigenClient(BaseClient, AggregatorClient):
2023-03-08 12:48:05 +00:00
url = "https://ob.nordigen.com/api/v2"
async def connect(self):
    """
    Make sure the client holds an access token.

    Reuses the token stored on the instance while its recorded expiry lies
    in the future; otherwise requests a new one via ``get_access_token``.
    """
    now = timezone.now()
    expiry = self.instance.access_token_expires
    token_still_valid = expiry is not None and expiry > now
    if not token_still_valid:
        await self.get_access_token()
        return
    self.token = self.instance.access_token
def method_filter(self, method):
    """
    Turn a method path into a name without slashes.

    :param method: method path, e.g. ``token/new``
    :return: the path with every ``/`` replaced by ``_``
    """
    return "_".join(method.split("/"))
async def get_access_token(self):
    """
    Request a new access token and persist it on the instance.

    Posts the instance's secret ID and secret key to ``token/new``, stores
    the returned token and its computed expiry on the instance, saves the
    instance and sets ``self.token``.
    """
    log.debug(f"Getting new access token for {self.instance}")
    credentials = dict(
        secret_id=self.instance.secret_id,
        secret_key=self.instance.secret_key,
    )
    response = await self.call("token/new", http_method="post", data=credentials)
    token = response["access"]
    lifetime_seconds = response["access_expires"]
    # "access_expires" is used as a lifetime in seconds, counted from now
    expires_at = timezone.now() + timedelta(seconds=lifetime_seconds)

    instance = self.instance
    instance.access_token = token
    instance.access_token_expires = expires_at
    # NOTE(review): save() is called synchronously inside a coroutine -
    # confirm this is safe for the model/ORM setup in use.
    instance.save()
    self.token = token
async def get_requisitions(self):
    """
    Get the list of requisitions.

    :return: the ``results`` entry of the ``requisitions`` response
    """
    payload = await self.call("requisitions")
    requisitions = payload["results"]
    return requisitions
async def get_countries(self):
    """
    Get the list of supported country codes.

    This is currently a stub that returns a fixed list.

    :return: list of two-letter country codes
    :rtype: list
    """
    supported = ["GB", "SE", "BG"]
    return supported
2023-03-08 12:48:05 +00:00
async def get_banks(self, country):
    """
    Look up the institutions available in a country.

    :param country: two-character country code to query
    :return: the ``institutions`` response, or False if the country code
        is not exactly two characters long
    """
    if len(country) != 2:
        return False
    institutions = await self.call(
        f"institutions/?country={country}",
        schema="Institutions",
        append_slash=False,
    )
    return institutions
async def build_link(self, institution_id, redirect=None):
    """
    Create a requisition and return the link to access an institution.

    :param institution_id: ID of the institution
    :param redirect: redirect URL to send instead of ``settings.URL``
    :return: the ``link`` value of the response, or False if there is none
    """
    redirect_url = settings.URL
    if redirect:
        redirect_url = redirect
    payload = {"institution_id": institution_id, "redirect": redirect_url}
    response = await self.call(
        "requisitions",
        schema="RequisitionsPost",
        http_method="post",
        data=payload,
    )
    return response["link"] if "link" in response else False
2023-03-08 15:44:21 +00:00
async def delete_requisition(self, requisition_id):
    """
    Delete a requisition by its ID.

    :param requisition_id: ID of the requisition to delete
    :return: the response of the delete call
    """
    return await self.call(
        f"requisitions/{requisition_id}",
        schema="RequisitionDelete",
        http_method="delete",
    )
2023-03-08 17:04:47 +00:00
async def get_requisition(self, requisition):
    """
    Fetch a single requisition.

    :param requisition: requisition ID
    :return: the response for that requisition
    """
    return await self.call(f"requisitions/{requisition}", schema="Requisition")
# def get_ownernames(self):
# """
# Get list of supplementary owner names.
# """
# ownernames = loads(settings.Nordigen.OwnerNames)
# return ownernames
async def get_account(self, account_id):
    """
    Get details of an account.

    For GBP accounts with a non-empty IBAN, the IBAN is removed and replaced
    by a dash-separated ``sort_code``, an account ``number`` and a
    placeholder ``recipient``. All other accounts are passed through as
    received.

    :param account_id: ID of the account to look up
    :return: the account details with ``account_id`` added, or False if the
        response has no ``account`` entry
    """
    path = f"accounts/{account_id}/details"
    response = await self.call(path, schema="AccountDetails")
    if "account" not in response:
        return False
    parsed = response["account"]

    # Use .get() so accounts lacking a currency or carrying an empty IBAN are
    # passed through instead of raising KeyError / UnboundLocalError.
    iban = parsed.get("iban")
    if iban and parsed.get("currency") == "GBP":
        # Sort code: the six characters before the final eight;
        # account number: the final eight characters.
        raw_sort_code = iban[-14:-8]
        account_number = iban[-8:]
        del parsed["iban"]
        # Join the sort code in pairs, e.g. "601613" -> "60-16-13"
        pairs = zip(raw_sort_code[::2], raw_sort_code[1::2])
        parsed["sort_code"] = "-".join(first + second for first, second in pairs)
        parsed["number"] = account_number
        parsed["recipient"] = "TODO"

    # Let's add the account ID so we can reference it later
    parsed["account_id"] = account_id
    return parsed
2023-03-09 16:44:16 +00:00
async def get_all_account_info(self, requisition=None, store=False):
    """
    Collect account details, grouped by institution.

    :param requisition: requisition ID to restrict the lookup to; when not
        given, every requisition is queried
    :param store: pass the collected data to ``store_account_info``; only
        allowed when no requisition is given
    :return: dict keyed by institution ID, values are lists of account details
    :rtype: dict
    :raises Exception: if ``store`` is set together with a requisition
    """
    # Reject the invalid combination before making any API call: data for a
    # single requisition is partial and must not be stored.
    if store and requisition is not None:
        raise Exception("Cannot store partial data")

    if not requisition:
        requisitions = await self.get_requisitions()
    else:
        requisitions = [await self.get_requisition(requisition)]

    to_return = {}
    for req in requisitions:
        for account_id in req["accounts"]:
            account_info = await self.get_account(account_id)
            # Accounts whose details could not be fetched are skipped
            if not account_info:
                continue
            to_return.setdefault(req["institution_id"], []).append(account_info)

    if store:
        self.store_account_info(to_return)
    return to_return
2023-03-09 18:09:29 +00:00
async def get_balance(self, account_id):
    """
    Get the balance and currency of an account.

    Adds up the amounts of all balance entries returned for the account.

    :param account_id: the account ID
    :return: tuple of (currency, amount); (False, False) if the response has
        no ``balances`` entry or the entries use different currencies
    :rtype: tuple
    """
    response = await self.call(
        f"accounts/{account_id}/balances", schema="AccountBalance"
    )
    if "balances" not in response:
        return (False, False)

    # NOTE(review): every entry in "balances" is summed; if the API reports
    # several balance types for one account this may double count - confirm.
    currency = None
    total = 0
    for entry in response["balances"]:
        balance = entry["balanceAmount"]
        if currency and currency != balance["currency"]:
            # Amounts in different currencies cannot be added together
            return (False, False)
        total += float(balance["amount"])
        currency = balance["currency"]
    return (currency, total)
async def get_all_balances(self):
    """
    Get the balance of every known account, keyed by bank.

    Account info is fetched and stored first if the instance has none.

    :return: dict keyed by bank, values are lists of dicts with the keys
        ``currency``, ``balance`` and ``account_id``
    :rtype: dict
    """
    if self.instance.account_info is None:
        await self.get_all_account_info(store=True)

    balances_by_bank = {}
    for bank, accounts in self.instance.account_info.items():
        entries = balances_by_bank.setdefault(bank, [])
        for account in accounts:
            account_id = account["account_id"]
            currency, amount = await self.get_balance(account_id)
            entries.append(
                dict(currency=currency, balance=amount, account_id=account_id)
            )
    return balances_by_bank
async def get_total_map(self):
    """
    Sum the balances of all known accounts per currency.

    Account info is fetched and stored first if the instance has none.
    Accounts without a usable amount or currency are left out.

    :return: dict keyed by currency, values are amounts
    :rtype: dict
    """
    if self.instance.account_info is None:
        await self.get_all_account_info(store=True)

    totals = {}
    for accounts in self.instance.account_info.values():
        for account in accounts:
            currency, amount = await self.get_balance(account["account_id"])
            if not amount or not currency:
                continue
            totals[currency] = totals.get(currency, 0) + amount
    return totals