182 lines
5.8 KiB
Python
182 lines
5.8 KiB
Python
from datetime import timedelta
|
|
|
|
from django.conf import settings
|
|
from django.utils import timezone
|
|
|
|
from core.clients.base import BaseClient
|
|
from core.util import logs
|
|
|
|
log = logs.get_logger("nordigen")
|
|
|
|
|
|
class NordigenClient(BaseClient):
|
|
url = "https://ob.nordigen.com/api/v2"
|
|
|
|
async def connect(self):
|
|
now = timezone.now()
|
|
# Check if access token expires later than now
|
|
if self.instance.access_token_expires is not None:
|
|
if self.instance.access_token_expires > now:
|
|
self.token = self.instance.access_token
|
|
return
|
|
await self.get_access_token()
|
|
|
|
def method_filter(self, method):
|
|
new_method = method.replace("/", "_")
|
|
return new_method
|
|
|
|
async def get_access_token(self):
|
|
"""
|
|
Get the access token for the Nordigen API.
|
|
"""
|
|
log.debug(f"Getting new access token for {self.instance}")
|
|
data = {
|
|
"secret_id": self.instance.secret_id,
|
|
"secret_key": self.instance.secret_key,
|
|
}
|
|
|
|
response = await self.call("token/new", http_method="post", data=data)
|
|
access = response["access"]
|
|
access_expires = response["access_expires"]
|
|
now = timezone.now()
|
|
# Offset now by access_expires seconds
|
|
access_expires = now + timedelta(seconds=access_expires)
|
|
self.instance.access_token = access
|
|
self.instance.access_token_expires = access_expires
|
|
self.instance.save()
|
|
|
|
self.token = access
|
|
|
|
async def get_requisitions(self):
|
|
"""
|
|
Get a list of active accounts.
|
|
"""
|
|
response = await self.call("requisitions")
|
|
return response["results"]
|
|
|
|
async def get_countries(self):
|
|
"""
|
|
Get a list of countries.
|
|
"""
|
|
# This function is a stub.
|
|
|
|
return ["GB", "SE"]
|
|
|
|
async def get_banks(self, country):
|
|
"""
|
|
Get a list of supported banks for a country.
|
|
:param country: country to query
|
|
:return: list of institutions
|
|
:rtype: list
|
|
"""
|
|
if not len(country) == 2:
|
|
return False
|
|
path = f"institutions/?country={country}"
|
|
response = await self.call(path, schema="Institutions", append_slash=False)
|
|
|
|
return response
|
|
|
|
async def build_link(self, institution_id, redirect=None):
|
|
"""Create a link to access an institution.
|
|
:param institution_id: ID of the institution
|
|
"""
|
|
|
|
data = {
|
|
"institution_id": institution_id,
|
|
"redirect": settings.URL,
|
|
}
|
|
if redirect:
|
|
data["redirect"] = redirect
|
|
response = await self.call(
|
|
"requisitions", schema="RequisitionsPost", http_method="post", data=data
|
|
)
|
|
if "link" in response:
|
|
return response["link"]
|
|
return False
|
|
|
|
async def delete_requisition(self, requisition_id):
|
|
"""
|
|
Delete a requisision ID.
|
|
"""
|
|
path = f"requisitions/{requisition_id}"
|
|
|
|
response = await self.call(
|
|
path, schema="RequisitionDelete", http_method="delete"
|
|
)
|
|
return response
|
|
|
|
async def get_requisition(self, requisition):
|
|
"""
|
|
Get a list of accounts for a requisition.
|
|
:param requisition: requisition ID"""
|
|
path = f"requisitions/{requisition}"
|
|
response = await self.call(path, schema="Requisition")
|
|
|
|
return response
|
|
|
|
# def get_ownernames(self):
|
|
# """
|
|
# Get list of supplementary owner names.
|
|
# """
|
|
# ownernames = loads(settings.Nordigen.OwnerNames)
|
|
# return ownernames
|
|
|
|
async def get_account(self, account_id):
|
|
"""
|
|
Get details of an account.
|
|
:param requisition: requisition ID"""
|
|
|
|
path = f"accounts/{account_id}/details"
|
|
response = await self.call(path, schema="AccountDetails")
|
|
if "account" not in response:
|
|
return False
|
|
parsed = response["account"]
|
|
if "bban" in parsed and parsed["currency"] == "GBP":
|
|
sort_code = parsed["bban"][0:6]
|
|
account_number = parsed["bban"][6:]
|
|
# if "ownerName" not in parsed:
|
|
# ownernames = self.get_ownernames()
|
|
# if account_id in ownernames:
|
|
# parsed["ownerName"] = ownernames[account_id]
|
|
|
|
# else:
|
|
# return False
|
|
# recipient = parsed["ownerName"]
|
|
del parsed["bban"]
|
|
if "iban" in parsed:
|
|
del parsed["iban"]
|
|
sort_code = "-".join(list(map("".join, zip(*[iter(sort_code)] * 2))))
|
|
parsed["sort_code"] = sort_code
|
|
parsed["number"] = account_number
|
|
parsed["recipient"] = "TODO"
|
|
# Let's add the account ID so we can reference it later
|
|
parsed["account_id"] = account_id
|
|
print("PARSED", parsed)
|
|
return parsed
|
|
|
|
async def get_all_account_info(self, requisition=None):
|
|
print("GET ALL ACCOU T INFO", requisition)
|
|
to_return = {}
|
|
if not requisition:
|
|
raise NotImplementedError
|
|
# requisitions = await self.get_requisitions()
|
|
else:
|
|
requisitions = [await self.get_requisition(requisition)]
|
|
print("REQUISITIONS", requisitions)
|
|
|
|
print("REQUISITIONS", requisitions)
|
|
|
|
for req in requisitions:
|
|
print("REQ ITER", req)
|
|
accounts = req["accounts"]
|
|
print("ACCOUNTS", accounts)
|
|
for account_id in accounts:
|
|
account_info = await self.get_account(account_id)
|
|
if not account_info:
|
|
continue
|
|
if req["institution_id"] in to_return:
|
|
to_return[req["institution_id"]].append(account_info)
|
|
else:
|
|
to_return[req["institution_id"]] = [account_info]
|
|
return to_return
|