Implement account handling for Nordigen

This commit is contained in:
Mark Veidemanis 2022-03-22 21:56:35 +00:00
parent 2ca518962c
commit 7b38e487cf
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
5 changed files with 259 additions and 15 deletions

View File

@ -66,11 +66,17 @@ class WebApp(util.Base):
# endpoint called after we finish setting up a connection above
@app.route("/callback-truelayer", methods=["POST"])
def signin_callback(self, request):
def signin_callback_truelayer(self, request):
code = request.args[b"code"]
self.sinks.truelayer.handle_authcode_received(code)
return dumps(True)
@app.route("/callback-nordigen", methods=["GET"])
def signin_callback_nordigen(self, request):
# code = request.args[b"code"]
# self.sinks.truelayer.handle_authcode_received(code)
return dumps(True)
@app.route("/accounts/<string:account>", methods=["GET"])
def balance(self, request, account):
accounts = self.sinks.truelayer.get_accounts(account)

View File

@ -29,7 +29,7 @@ class Sinks(util.Base):
any race conditions by relying on something that might not be there.
"""
self.fidor = sinks.fidor.Fidor()
self.nordigen = sinks.nordigen.Nordigen()
self.nordigen = sinks.nordigen.Nordigen(self)
self.truelayer = sinks.truelayer.TrueLayer(self)
# setattr(self.truelayer, "sinks", self)
@ -62,15 +62,24 @@ class Sinks(util.Base):
:param account_infos: dict of dicts of account information
:param account_infos: dict
"""
for bank, accounts in account_infos.items():
for account in list(accounts):
for index, account in enumerate(list(accounts)):
if "account_number" not in account:
account_infos[bank][index]["account_number"] = {}
fields = ["sort_code", "number", "iban"]
for field in fields:
if field in account:
account_infos[bank][index]["account_number"][field] = account[field]
del account_infos[bank][index][field]
if len(account["account_number"]) == 1:
account_infos[bank].remove(account)
self.log.warning(f"Potentially useless bank account: {account}")
currencies = [account["currency"] for bank, accounts in account_infos.items() for account in accounts]
self.account_info = account_infos
for bank, accounts in account_infos.items():
self.account_info[bank] = []
for account in accounts:
self.account_info[bank].append(account)
# self.account_info = account_infos
self.currencies = currencies
# parsed_details =

View File

@ -1,7 +1,10 @@
# Twisted/Klein imports
from twisted.internet.task import LoopingCall
# Other library imports
import requests
from json import dumps
from simplejson.errors import JSONDecodeError
from json import dumps, loads
# Project imports
from settings import settings
@ -13,10 +16,32 @@ class Nordigen(util.Base):
Class to manage calls to Open Banking APIs through Nordigen.
"""
def __init__(self):
def __init__(self, sinks):
super().__init__()
self.sinks = sinks
self.token = None
self.get_access_token()
self.banks = {}
self.authed = False
# Get the banks from the config and cache them
self.get_mapped_accounts()
self.lc = LoopingCall(self.get_access_token)
self.lc.start(int(settings.Nordigen.TokenRefreshSec))
def __authed__(self):
"""
Called when we have received the access token.
"""
self.log.info("Connection authenticated.")
account_infos = self.get_all_account_info()
# Filter for added accounts since we only do that for TrueLayer
account_infos = {
bank: accounts for bank, accounts in account_infos.items() for account in accounts if account["account_id"] in self.banks
}
self.sinks.got_account_info("nordigen", account_infos)
def get_access_token(self):
"""
@ -39,6 +64,11 @@ class Nordigen(util.Base):
if "access" in parsed:
self.token = parsed["access"]
self.log.info("Refreshed access token")
else:
self.log.error(f"Access token not in response: {parsed}")
if not self.authed:
self.__authed__()
self.authed = True
def get_institutions(self, country, filter_name=None):
"""
@ -65,3 +95,160 @@ class Nordigen(util.Base):
new_list.append(i)
return new_list
return parsed
def create_agreement(self, institution_id):
"""Create an agreement to access an institution.
:param institution_id: ID of the institution
"""
headers = {"accept": "application/json", "Authorization": f"Bearer {self.token}"}
path = f"{settings.Nordigen.Base}/agreements/enduser"
data = {"institution_id": institution_id}
r = requests.post(path, headers=headers, data=dumps(data))
try:
parsed = r.json()
except JSONDecodeError:
self.log.error(f"Error parsing agreement response: {r.content}")
return False
return parsed
def build_link(self, institution_id):
"""Create a link to access an institution.
:param institution_id: ID of the institution
"""
headers = {"accept": "application/json", "Authorization": f"Bearer {self.token}"}
path = f"{settings.Nordigen.Base}/requisitions/"
data = {"institution_id": institution_id, "redirect": settings.Nordigen.CallbackURL}
r = requests.post(path, headers=headers, data=data)
try:
parsed = r.json()
except JSONDecodeError:
self.log.error(f"Error parsing link response: {r.content}")
return False
if "link" in parsed:
return parsed["link"]
return False
def create_auth_url(self, country, bank_name):
"""Helper to look up a bank and create a link.
:param country: country
:param bank_name: bank name string to search"""
institutions = self.get_institutions(country, filter_name=bank_name)
# We were not precise enough to have one result
if not len(institutions) == 1:
return False
institution = institutions[0]
link = self.build_link(institution["id"])
if not link:
return False
return link
def get_requisitions(self):
"""
Get a list of active accounts.
"""
headers = {"accept": "application/json", "Authorization": f"Bearer {self.token}"}
path = f"{settings.Nordigen.Base}/requisitions"
r = requests.get(path, headers=headers)
try:
parsed = r.json()
except JSONDecodeError:
self.log.error(f"Error parsing requisitions response: {r.content}")
return False
if "results" in parsed:
return parsed["results"]
else:
self.log.error(f"Results not in requisitions response: {parsed}")
return False
def get_accounts(self, requisition):
"""
Get a list of accounts for a requisition.
:param requisition: requisition ID"""
headers = {"accept": "application/json", "Authorization": f"Bearer {self.token}"}
path = f"{settings.Nordigen.Base}/requisitions/{requisition}/"
r = requests.get(path, headers=headers)
try:
parsed = r.json()
except JSONDecodeError:
self.log.error(f"Error parsing accounts response: {r.content}")
return False
if "accounts" in parsed:
return parsed["accounts"]
return False
def get_account(self, account_id):
"""
Get details of an account.
:param requisition: requisition ID"""
headers = {"accept": "application/json", "Authorization": f"Bearer {self.token}"}
path = f"{settings.Nordigen.Base}/accounts/{account_id}/details/"
r = requests.get(path, headers=headers)
try:
parsed = r.json()
except JSONDecodeError:
self.log.error(f"Error parsing account response: {r.content}")
return False
# if "accounts" in parsed:
# return parsed["accounts"]
# return False
parsed = parsed["account"]
if "bban" in parsed and parsed["currency"] == "GBP":
sort_code = parsed["bban"][0:6]
account_number = parsed["bban"][6:]
del parsed["bban"]
del parsed["iban"]
sort_code = "-".join(list(map("".join, zip(*[iter(sort_code)] * 2))))
parsed["sort_code"] = sort_code
parsed["number"] = account_number
# Let's add the account ID so we can reference it later
parsed["account_id"] = account_id
return parsed
def get_mapped_accounts(self):
existing_entry = loads(settings.Nordigen.Maps)
self.banks = existing_entry
def map_account(self, account_id):
"""
Map an account_id at a bank to an account_name.
This enables the account for fetching.
Data type: {"monzo": [account, ids, here],
"revolut": [account, ids, here]}
"""
account_data = self.get_account(account_id)
currency = account_data["currency"]
existing_entry = loads(settings.Nordigen.Maps)
if account_id in existing_entry:
return
else:
existing_entry.append(account_id)
settings.Nordigen.Maps = dumps(existing_entry)
self.banks = existing_entry
settings.write()
return currency
def get_all_account_info(self):
to_return = {}
requisitions = self.get_requisitions()
if not requisitions:
self.log.error("Could not get requisitions.")
return {}
for req in requisitions:
if not req["accounts"]:
continue
accounts = self.get_accounts(req["id"])
for account_id in accounts:
# if not account_id in self.banks:
# print("account_id", account_id, "not in self.banks!")
# continue
account_info = self.get_account(account_id)
if not account_info:
continue
if req["institution_id"] in to_return:
to_return[req["institution_id"]].append(account_info)
else:
to_return[req["institution_id"]] = [account_info]
return to_return

View File

@ -53,7 +53,6 @@ $PAYMENT$
* If you are asked for address information, please use **24 Holborn Viaduct, London, England, EC1A 2BN**
* The post code is **EC1A 2BN**
* Set **Company name** to **"PATHOGEN LIMITED"**"""
print("EXPECT", ad_text)
self.assertEqual(ad_text, expected)
def test_format_payment_details(self):

View File

@ -496,10 +496,26 @@ class IRCCommands(object):
auth_url = tx.truelayer.create_auth_url(account)
msg(f"Auth URL for {account}: {auth_url}")
class nsignin(object):
name = "nsignin"
authed = True
helptext = "Generate a Nordigen signin URL. Usage: nsignin <country> <bank>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 3:
country = spl[1]
bank_name = spl[2]
auth_url = tx.sinks.nordigen.create_auth_url(country, bank_name)
if not auth_url:
msg("Could not find bank.")
return
msg(f"Auth URL for {bank_name}: {auth_url}")
class accounts(object):
name = "accounts"
authed = True
helptext = "Get a list of acccounts. Usage: accounts <account>"
helptext = "Get a list of acccounts from TrueLayer. Usage: accounts <account>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
@ -509,6 +525,18 @@ class IRCCommands(object):
for account in accounts["results"]:
msg(f"{account['account_id']} {account['display_name']} {account['currency']}")
class naccounts(object):
name = "naccounts"
authed = True
helptext = "Get a list of acccounts from Nordigen. Usage: naccounts"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 1:
accounts = tx.sinks.nordigen.get_all_account_info()
for name, account in accounts.items():
msg(f"{name} {account['account_id']} {account['details']} {account['currency']}")
class transactions(object):
name = "transactions"
authed = True
@ -533,7 +561,7 @@ class IRCCommands(object):
class mapaccount(object):
name = "mapaccount"
authed = True
helptext = "Enable an account_id at a bank for use. Usage: mapaccount <bank> <account_id>"
helptext = "Enable an account_id at a bank for use in TrueLayer. Usage: mapaccount <bank> <account_id>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
@ -546,6 +574,21 @@ class IRCCommands(object):
return
msg(f"Mapped account ID {account_id} at bank {bank} to {account_name}")
class nmapaccount(object):
name = "nmapaccount"
authed = True
helptext = "Enable an account_id at a bank for use in Nordigen. Usage: nmapaccount <bank> <account_id>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 2:
account_id = spl[1]
account_name = tx.sinks.nordigen.map_account(account_id)
if not account_name:
msg(f"Failed to map the account")
return
msg(f"Mapped account ID {account_id} to {account_name}")
class unmapped(object):
name = "unmapped"
authed = True