pluto/handler/sinks/nordigen.py

261 lines
9.3 KiB
Python

# Twisted/Klein imports
from twisted.internet.task import LoopingCall
# Other library imports
import requests
from simplejson.errors import JSONDecodeError
from json import dumps, loads
from lib.serde.nordigen import TXRoot, AccessToken, Institutions, Agreement, Requisitions, AccountDetails
from serde import ValidationError
# Project imports
from settings import settings
import util
class Nordigen(util.Base):
    """
    Class to manage calls to Open Banking APIs through Nordigen.

    On construction the mapped account IDs are loaded from the settings and a
    LoopingCall is started that requests a new access token every
    ``settings.Nordigen.TokenRefreshSec`` seconds. The first successful token
    request triggers ``__authed__``, which passes the account information to
    the sinks.

    Failure convention used by the request helpers: ``None`` is returned when
    the response body cannot be parsed/validated, ``False`` when the request
    could not be attempted or the expected key is missing from the response.
    Callers should therefore test results for truthiness.
    """

    def __init__(self, sinks):
        """
        :param sinks: object notified through ``got_account_info`` once the
                      connection is authenticated
        """
        super().__init__()
        self.sinks = sinks
        self.token = None
        # Account IDs enabled for fetching; replaced by get_mapped_accounts()
        self.banks = []
        self.authed = False

        # Get the banks from the config and cache them
        self.get_mapped_accounts()

        self.lc = LoopingCall(self.get_access_token)
        self.lc.start(int(settings.Nordigen.TokenRefreshSec))

    def _auth_headers(self):
        """
        Build the headers for an authenticated JSON request.
        :return: headers carrying the current bearer token
        :rtype: dict
        """
        return {"accept": "application/json", "Authorization": f"Bearer {self.token}"}

    def _parse_response(self, model, response):
        """
        Deserialise the body of a response with a serde model.
        :param model: serde model class providing ``from_json``
        :param response: ``requests`` response object
        :return: the validated model as a dict, or None if the body is invalid
        :rtype: dict or None
        """
        try:
            obj = model.from_json(response.content)
        except ValidationError as err:
            self.log.error(f"Validation error: {err}")
            return None
        except ValueError as err:
            # JSON decoders signal a non-JSON body (e.g. a gateway error page)
            # with a ValueError subclass; treat it like any other bad response.
            self.log.error(f"Error parsing response: {err}")
            return None
        return obj.to_dict()

    def __authed__(self):
        """
        Called when we have received the access token.

        Fetches the details of all accounts, keeps only the accounts that have
        been mapped and hands the result to the sinks.
        """
        self.log.info("Connection authenticated.")
        account_infos = self.get_all_account_info()

        # Filter for added accounts since we only do that for TrueLayer.
        # The filter is applied per account: a bank with one mapped account
        # must not drag its unmapped accounts along. Banks left without any
        # mapped account are dropped.
        filtered = {}
        for bank, accounts in account_infos.items():
            mapped = [account for account in accounts if account["account_id"] in self.banks]
            if mapped:
                filtered[bank] = mapped

        self.sinks.got_account_info("nordigen", filtered)

    def get_access_token(self):
        """
        Get an access token and store it in ``self.token``.

        This runs inside a LoopingCall, which stops for good if the called
        function raises. Network failures are therefore logged and reported
        through the return value so that the next refresh can retry.
        :return: True if the token was refreshed, False otherwise
        :rtype: bool
        """
        headers = {"accept": "application/json", "Content-Type": "application/json"}
        data = {
            "secret_id": settings.Nordigen.ID,
            "secret_key": settings.Nordigen.Key,
        }
        path = f"{settings.Nordigen.Base}/token/new/"
        try:
            r = requests.post(path, headers=headers, data=dumps(data))
        except requests.RequestException as err:
            self.log.error(f"Error requesting access token: {err}")
            return False

        parsed = self._parse_response(AccessToken, r)
        if parsed is None:
            return False
        self.token = parsed["access"]
        self.log.info("Refreshed access token")

        if not self.authed:
            try:
                self.__authed__()
            except requests.RequestException as err:
                # Leave self.authed unset so the next refresh tries again.
                self.log.error(f"Error fetching account info: {err}")
                return True
            self.authed = True
        return True

    def get_institutions(self, country, filter_name=None):
        """
        Get a list of supported institutions.
        :param country: two-letter country code to query
        :param filter_name: return only results with this in the name
        :return: list of institutions; False if the country code is not two
                 characters long or the response is not JSON; None if the
                 response fails validation
        :rtype: list
        """
        if len(country) != 2:
            return False
        path = f"{settings.Nordigen.Base}/institutions/?country={country}"
        r = requests.get(path, headers=self._auth_headers())
        try:
            parsed_pre = r.json()
        except JSONDecodeError:
            self.log.error(f"Error parsing institutions response: {r.content}")
            return False

        # The response is wrapped under an "institutions" key before it is
        # validated against the Institutions model.
        try:
            obj = Institutions.from_dict({"institutions": parsed_pre})
        except ValidationError as err:
            self.log.error(f"Validation error: {err}")
            return None

        institutions = obj.to_dict()["institutions"]
        if filter_name:
            return [inst for inst in institutions if filter_name in inst["name"]]
        return institutions

    def build_link(self, institution_id):
        """
        Create a link to access an institution.
        :param institution_id: ID of the institution
        :return: the link; False if the response has none; None if the
                 response fails validation
        """
        path = f"{settings.Nordigen.Base}/requisitions/"
        data = {"institution_id": institution_id, "redirect": settings.Nordigen.CallbackURL}
        r = requests.post(path, headers=self._auth_headers(), data=data)
        parsed = self._parse_response(Agreement, r)
        if parsed is None:
            return None
        return parsed.get("link", False)

    def create_auth_url(self, country, bank_name):
        """
        Helper to look up a bank and create a link.
        :param country: country
        :param bank_name: bank name string to search
        :return: the link, or False if the lookup failed, did not match
                 exactly one institution, or no link could be built
        """
        institutions = self.get_institutions(country, filter_name=bank_name)
        # Either the lookup failed (False/None) or we were not precise enough
        # to have exactly one result
        if not institutions or len(institutions) != 1:
            return False
        link = self.build_link(institutions[0]["id"])
        return link or False

    def get_requisitions(self):
        """
        Get a list of active accounts.
        :return: the "results" of the requisitions response; False if that
                 key is missing; None if the response fails validation
        """
        path = f"{settings.Nordigen.Base}/requisitions"
        r = requests.get(path, headers=self._auth_headers())
        parsed = self._parse_response(Requisitions, r)
        if parsed is None:
            return None
        if "results" in parsed:
            return parsed["results"]
        self.log.error(f"Results not in requisitions response: {parsed}")
        return False

    def get_accounts(self, requisition):
        """
        Get a list of accounts for a requisition.
        :param requisition: requisition ID
        :return: the "accounts" of the requisition; False if that key is
                 missing; None if the response fails validation
        """
        path = f"{settings.Nordigen.Base}/requisitions/{requisition}/"
        r = requests.get(path, headers=self._auth_headers())
        parsed = self._parse_response(Agreement, r)
        if parsed is None:
            return None
        return parsed.get("accounts", False)

    def get_account(self, account_id):
        """
        Get details of an account.

        For GBP accounts that carry a BBAN, the ``bban`` and ``iban`` fields
        are replaced by ``sort_code`` (first six characters, dash-separated
        in pairs) and ``number`` (the remainder). The ``account_id`` is always
        added to the result.
        :param account_id: account ID
        :return: account details, or None if the response fails validation
        :rtype: dict
        """
        path = f"{settings.Nordigen.Base}/accounts/{account_id}/details/"
        r = requests.get(path, headers=self._auth_headers())
        parsed = self._parse_response(AccountDetails, r)
        if parsed is None:
            return None

        account = parsed["account"]
        if "bban" in account and account.get("currency") == "GBP":
            bban = account.pop("bban")
            # The IBAN may be absent; there is nothing to remove in that case
            account.pop("iban", None)
            sort_code = bban[0:6]
            # Group into pairs, ignoring a trailing unpaired character
            pairs = [sort_code[i:i + 2] for i in range(0, len(sort_code) - 1, 2)]
            account["sort_code"] = "-".join(pairs)
            account["number"] = bban[6:]

        # Let's add the account ID so we can reference it later
        account["account_id"] = account_id
        return account

    def get_mapped_accounts(self):
        """
        Load the mapped account IDs from the settings into ``self.banks``.
        """
        self.banks = loads(settings.Nordigen.Maps)

    def map_account(self, account_id):
        """
        Map an account_id so that the account is enabled for fetching.

        The mapping is stored in ``settings.Nordigen.Maps`` as a JSON list of
        account IDs: ["account", "ids", "here"]
        :param account_id: account ID to enable
        :return: currency of the account, or None if the account is already
                 mapped or its details could not be fetched
        """
        existing_entry = loads(settings.Nordigen.Maps)
        # Check first so an already mapped account costs no API call
        if account_id in existing_entry:
            return None

        account_data = self.get_account(account_id)
        if not account_data:
            self.log.error(f"Could not get account details for {account_id}")
            return None
        currency = account_data["currency"]

        existing_entry.append(account_id)
        settings.Nordigen.Maps = dumps(existing_entry)
        self.banks = existing_entry
        settings.write()
        return currency

    def get_all_account_info(self):
        """
        Get the details of every account of every requisition.
        :return: account details grouped by institution ID; empty if the
                 requisitions could not be fetched
        :rtype: dict
        """
        requisitions = self.get_requisitions()
        if not requisitions:
            self.log.error("Could not get requisitions.")
            return {}

        to_return = {}
        for req in requisitions:
            if not req["accounts"]:
                continue
            accounts = self.get_accounts(req["id"])
            if not accounts:
                # The lookup failed or returned nothing; skip this requisition
                continue
            for account_id in accounts:
                account_info = self.get_account(account_id)
                if not account_info:
                    continue
                to_return.setdefault(req["institution_id"], []).append(account_info)
        return to_return

    def get_transactions(self, account_id):
        """
        Get all booked transactions for an account.
        :param account_id: account to fetch transactions for
        :return: list of transactions, or None if the response fails
                 validation
        :rtype: list
        """
        path = f"{settings.Nordigen.Base}/accounts/{account_id}/transactions/"
        r = requests.get(path, headers=self._auth_headers())
        parsed = self._parse_response(TXRoot, r)
        if parsed is None:
            return None
        return parsed["transactions"]["booked"]