pluto/core/clients/aggregators/nordigen.py

175 lines
5.6 KiB
Python

from datetime import timedelta
from django.conf import settings
from django.utils import timezone
from core.clients.aggregator import AggregatorClient
from core.clients.base import BaseClient
from core.util import logs
log = logs.get_logger("nordigen")
class NordigenClient(BaseClient, AggregatorClient):
url = "https://ob.nordigen.com/api/v2"
async def connect(self):
now = timezone.now()
# Check if access token expires later than now
if self.instance.access_token_expires is not None:
if self.instance.access_token_expires > now:
self.token = self.instance.access_token
return
await self.get_access_token()
def method_filter(self, method):
new_method = method.replace("/", "_")
return new_method
async def get_access_token(self):
"""
Get the access token for the Nordigen API.
"""
log.debug(f"Getting new access token for {self.instance}")
data = {
"secret_id": self.instance.secret_id,
"secret_key": self.instance.secret_key,
}
response = await self.call("token/new", http_method="post", data=data)
access = response["access"]
access_expires = response["access_expires"]
now = timezone.now()
# Offset now by access_expires seconds
access_expires = now + timedelta(seconds=access_expires)
self.instance.access_token = access
self.instance.access_token_expires = access_expires
self.instance.save()
self.token = access
async def get_requisitions(self):
"""
Get a list of active accounts.
"""
response = await self.call("requisitions")
return response["results"]
async def get_countries(self):
"""
Get a list of countries.
"""
# This function is a stub.
return ["GB", "SE", "BG", "UA"]
async def get_banks(self, country):
"""
Get a list of supported banks for a country.
:param country: country to query
:return: list of institutions
:rtype: list
"""
if not len(country) == 2:
return False
path = f"institutions/?country={country}"
response = await self.call(path, schema="Institutions", append_slash=False)
return response
async def build_link(self, institution_id, redirect=None):
"""Create a link to access an institution.
:param institution_id: ID of the institution
"""
data = {
"institution_id": institution_id,
"redirect": settings.URL,
}
if redirect:
data["redirect"] = redirect
response = await self.call(
"requisitions", schema="RequisitionsPost", http_method="post", data=data
)
if "link" in response:
return response["link"]
return False
async def delete_requisition(self, requisition_id):
"""
Delete a requisision ID.
"""
path = f"requisitions/{requisition_id}"
response = await self.call(
path, schema="RequisitionDelete", http_method="delete"
)
return response
async def get_requisition(self, requisition):
"""
Get a list of accounts for a requisition.
:param requisition: requisition ID"""
path = f"requisitions/{requisition}"
response = await self.call(path, schema="Requisition")
return response
# def get_ownernames(self):
# """
# Get list of supplementary owner names.
# """
# ownernames = loads(settings.Nordigen.OwnerNames)
# return ownernames
async def get_account(self, account_id):
"""
Get details of an account.
:param requisition: requisition ID"""
path = f"accounts/{account_id}/details"
response = await self.call(path, schema="AccountDetails")
print("RESPONSE", response)
if "account" not in response:
return False
parsed = response["account"]
if "iban" in parsed and parsed["currency"] == "GBP":
if parsed["iban"]:
sort_code = parsed["iban"][-14:-8]
account_number = parsed["iban"][-8:]
del parsed["iban"]
# if "iban" in parsed:
# del parsed["iban"]
sort_code = "-".join(list(map("".join, zip(*[iter(sort_code)] * 2))))
parsed["sort_code"] = sort_code
parsed["number"] = account_number
parsed["recipient"] = "TODO"
# Let's add the account ID so we can reference it later
parsed["account_id"] = account_id
return parsed
async def get_all_account_info(self, requisition=None, store=False):
to_return = {}
if not requisition:
print("NOT REQUISITION")
requisitions = await self.get_requisitions()
print("GOT REQS", requisitions)
else:
requisitions = [await self.get_requisition(requisition)]
for req in requisitions:
accounts = req["accounts"]
for account_id in accounts:
account_info = await self.get_account(account_id)
if not account_info:
continue
if req["institution_id"] in to_return:
to_return[req["institution_id"]].append(account_info)
else:
to_return[req["institution_id"]] = [account_info]
if store:
print("TO STORE IS TRUE")
self.store_account_info(to_return)
return to_return