pluto/handler/sinks/nordigen.py

479 lines
16 KiB
Python

# Twisted/Klein imports
from twisted.internet.task import LoopingCall
from twisted.internet.defer import inlineCallbacks
# Other library imports
import requests
from simplejson.errors import JSONDecodeError
from json import dumps, loads
from lib.serde.nordigen import (
TXRoot,
AccessToken,
Institutions,
Agreement,
Requisitions,
AccountDetails,
AccountBalancesRoot,
RequisitionResponse,
)
from serde import ValidationError
from hashlib import sha256
import treq
# Project imports
from settings import settings
import util
class Nordigen(util.Base):
"""
Class to manage calls to Open Banking APIs through Nordigen.
"""
def __init__(self, sinks):
super().__init__()
self.sinks = sinks
self.token = None
self.banks = {}
self.authed = False
self.requisitions = None
# Get the banks from the config and cache them
self.log.debug("Getting mapped accounts.")
self.get_mapped_accounts()
self.log.debug("Finished getting mapped accounts.")
self.log.debug("Creating loop to get access token.")
self.lc = LoopingCall(self.get_access_token)
self.lc.start(int(settings.Nordigen.TokenRefreshSec))
self.log.debug("Finished creating loops.")
def __authed__(self):
"""
Called when we have received the access token.
"""
self.log.info("Connection authenticated.")
# self.get_requisitions()
d = self.get_all_account_info()
d.addCallback(self.got_all_account_info)
self.sinks.all_sinks_authenticated()
def got_all_account_info(self, account_infos):
# Filter for added accounts since we only do that for TrueLayer
account_infos = {
bank: accounts
for bank, accounts in account_infos.items()
for account in accounts
if account["account_id"] in self.banks
}
self.sinks.got_account_info("nordigen", account_infos)
self.lc_tx = LoopingCall(self.transaction_loop)
self.lc_tx.start(int(settings.Nordigen.RefreshSec))
def transaction_loop(self):
for account_id in self.banks:
transactions = self.get_transactions(account_id)
transactions.addCallback(self.got_transactions, account_id)
def got_transactions(self, transactions, account_id):
self.sinks.got_transactions("nordigen", account_id, transactions)
def generic_deferred(self, response, dest_func):
"""
Generic function to take a treq response and fire a callback with
its content to dest_func.
:param response: a treq response
:param dest_func: function to call with the response data
"""
self.log.debug(f"Generic deferred received: {response}")
content = response.content()
content.addCallback(dest_func)
@inlineCallbacks
def get_access_token(self):
"""
Get an access token.
:return: True or False
:rtype: bool
"""
headers = {
"accept": "application/json",
"Content-Type": "application/json",
}
data = {
"secret_id": settings.Nordigen.ID,
"secret_key": settings.Nordigen.Key,
}
path = f"{settings.Nordigen.Base}/token/new/"
self.log.debug("Getting new access token.")
d = yield treq.post(path, headers=headers, data=data)
content = yield d.content()
try:
obj = AccessToken.from_json(content)
except ValidationError as err:
self.log.error(f"Validation error: {err}")
return
parsed = obj.to_dict()
self.token = parsed["access"]
self.log.info("Refreshed access token")
if not self.authed:
self.__authed__()
self.authed = True
def get_institutions(self, country, filter_name=None):
"""
Get a list of supported institutions.
:param country: country to query
:param filter_name: return only results with this in the name
:return: list of institutions
:rtype: list
"""
if not len(country) == 2:
return False
headers = {
"accept": "application/json",
"Authorization": f"Bearer {self.token}",
}
path = f"{settings.Nordigen.Base}/institutions/?country={country}"
r = requests.get(path, headers=headers)
try:
parsed_pre = r.json()
except JSONDecodeError:
self.log.error(f"Error parsing institutions response: {r.content}")
return False
parsed = {"institutions": parsed_pre}
try:
obj = Institutions.from_dict(parsed)
except ValidationError as err:
self.log.error(f"Validation error: {err}")
return
parsed = obj.to_dict()["institutions"]
new_list = []
if filter_name:
for i in parsed:
if filter_name in i["name"]:
new_list.append(i)
return new_list
return parsed
def build_link(self, institution_id):
"""Create a link to access an institution.
:param institution_id: ID of the institution
"""
headers = {
"accept": "application/json",
"Authorization": f"Bearer {self.token}",
}
path = f"{settings.Nordigen.Base}/requisitions/"
data = {
"institution_id": institution_id,
"redirect": settings.Nordigen.CallbackURL,
}
r = requests.post(path, headers=headers, data=data)
try:
obj = Agreement.from_json(r.content)
except ValidationError as err:
self.log.error(f"Validation error: {err}")
return
parsed = obj.to_dict()
if "link" in parsed:
return parsed["link"]
return False
def create_auth_url(self, country, bank_name):
"""Helper to look up a bank and create a link.
:param country: country
:param bank_name: bank name string to search"""
institutions = self.get_institutions(country, filter_name=bank_name)
# We were not precise enough to have one result
if not len(institutions) == 1:
return False
institution = institutions[0]
link = self.build_link(institution["id"])
if not link:
return False
return link
@inlineCallbacks
def get_requisitions(self):
"""
Get a list of active accounts.
"""
headers = {
"accept": "application/json",
"Authorization": f"Bearer {self.token}",
}
path = f"{settings.Nordigen.Base}/requisitions"
d = yield treq.get(path, headers=headers)
content = yield d.content()
try:
obj = Requisitions.from_json(content)
except ValidationError as err:
self.log.error(f"Validation error: {err}")
return
parsed = obj.to_dict()
if "results" in parsed:
return parsed["results"]
else:
self.log.error(f"Results not in requisitions response: {parsed}")
return False
def delete_requisition(self, requisition_id):
"""
Delete a requisision ID.
"""
headers = {
"accept": "application/json",
"Authorization": f"Bearer {self.token}",
}
path = f"{settings.Nordigen.Base}/requisitions/{requisition_id}/"
r = requests.delete(path, headers=headers)
try:
obj = RequisitionResponse.from_json(r.content)
except ValidationError as err:
self.log.error(f"Validation error: {err}")
return
parsed = obj.to_dict()
return parsed
@inlineCallbacks
def get_accounts(self, requisition):
"""
Get a list of accounts for a requisition.
:param requisition: requisition ID"""
headers = {
"accept": "application/json",
"Authorization": f"Bearer {self.token}",
}
path = f"{settings.Nordigen.Base}/requisitions/{requisition}/"
d = yield treq.get(path, headers=headers)
content = yield d.content()
try:
obj = Agreement.from_json(content)
except ValidationError as err:
self.log.error(f"Validation error: {err}")
return
parsed = obj.to_dict()
if "accounts" in parsed:
return parsed["accounts"]
return False
def get_ownernames(self):
"""
Get list of supplementary owner names.
"""
ownernames = loads(settings.Nordigen.OwnerNames)
return ownernames
@inlineCallbacks
def get_account(self, account_id):
"""
Get details of an account.
:param requisition: requisition ID"""
headers = {
"accept": "application/json",
"Authorization": f"Bearer {self.token}",
}
path = f"{settings.Nordigen.Base}/accounts/{account_id}/details/"
d = yield treq.get(path, headers=headers)
content = yield d.content()
try:
obj = AccountDetails.from_json(content)
except ValidationError as err:
self.log.error(f"Validation error: {err}")
return
parsed_obj = obj.to_dict()
if "account" not in parsed_obj:
return False
parsed = parsed_obj["account"]
if "bban" in parsed and parsed["currency"] == "GBP":
sort_code = parsed["bban"][0:6]
account_number = parsed["bban"][6:]
if "ownerName" not in parsed:
self.log.warning(f"No owner name in parsed, cannot use: {account_id}")
ownernames = self.get_ownernames()
if account_id in ownernames:
parsed["ownerName"] = ownernames[account_id]
self.log.info(f"Found supplementary owner name for {account_id}: {ownernames[account_id]}")
else:
return False
recipient = parsed["ownerName"]
del parsed["bban"]
if "iban" in parsed:
del parsed["iban"]
sort_code = "-".join(list(map("".join, zip(*[iter(sort_code)] * 2))))
parsed["sort_code"] = sort_code
parsed["number"] = account_number
parsed["recipient"] = recipient
# Let's add the account ID so we can reference it later
parsed["account_id"] = account_id
return parsed
def get_mapped_accounts(self):
existing_entry = loads(settings.Nordigen.Maps)
self.banks = existing_entry
def map_account(self, account_id): # TODO: inlineCallbacks?
"""
Map an account_id at a bank to an account_name.
This enables the account for fetching.
Data type: {"monzo": [account, ids, here],
"revolut": [account, ids, here]}
"""
account_data = self.get_account(account_id)
currency = account_data["currency"]
existing_entry = loads(settings.Nordigen.Maps)
if account_id in existing_entry:
return
else:
existing_entry.append(account_id)
settings.Nordigen.Maps = dumps(existing_entry)
self.banks = existing_entry
settings.write()
return currency
def unmap_account(self, account_id):
"""
Unmap an account_id at a bank to an account_name.
This disables the account for fetching.
Data type: {"monzo": [account, ids, here],
"revolut": [account, ids, here]}
"""
existing_entry = loads(settings.Nordigen.Maps)
if account_id not in existing_entry:
return
else:
existing_entry.remove(account_id)
settings.Nordigen.Maps = dumps(existing_entry)
self.banks = existing_entry
settings.write()
@inlineCallbacks
def get_all_account_info(self):
to_return = {}
requisitions = yield self.get_requisitions()
if not requisitions:
self.log.error("Could not get requisitions.")
return {}
for req in requisitions:
if not req["accounts"]:
continue
accounts = yield self.get_accounts(req["id"])
for account_id in accounts:
account_info = yield self.get_account(account_id)
if not account_info:
continue
if req["institution_id"] in to_return:
to_return[req["institution_id"]].append(account_info)
else:
to_return[req["institution_id"]] = [account_info]
return to_return
def normalise_transactions(self, transactions):
for transaction in transactions:
# Rename ID
if "transactionId" in transaction:
transaction["transaction_id"] = transaction["transactionId"]
del transaction["transactionId"]
else:
# No transaction ID. This is a problem for our implementation
tx_hash = sha256(dumps(transaction, sort_keys=True).encode("utf8")).hexdigest()
transaction["transaction_id"] = tx_hash
# Rename timestamp
transaction["timestamp"] = transaction["bookingDate"]
del transaction["bookingDate"]
transaction["amount"] = float(transaction["transactionAmount"]["amount"])
transaction["currency"] = transaction["transactionAmount"]["currency"]
del transaction["transactionAmount"]
transaction["reference"] = transaction["remittanceInformationUnstructured"]
del transaction["remittanceInformationUnstructured"]
@inlineCallbacks
def get_transactions(self, account_id):
"""
Get all transactions for an account.
:param account_id: account to fetch transactions for
:return: list of transactions
:rtype: dict
"""
headers = {
"accept": "application/json",
"Authorization": f"Bearer {self.token}",
}
path = f"{settings.Nordigen.Base}/accounts/{account_id}/transactions/"
d = yield treq.get(path, headers=headers)
content = yield d.content()
try:
obj = TXRoot.from_json(content)
except ValidationError as err:
self.log.error(f"Validation error: {err}")
return
parsed_obj = obj.to_dict()
if "transactions" not in parsed_obj:
self.log.warning(f"No transactions for account: {account_id}")
return {}
parsed = parsed_obj["transactions"]["booked"]
self.normalise_transactions(parsed)
return parsed
def get_balance(self, account_id):
"""
Get the balance and currency of an account.
:param account_id: the account ID
:return: tuple of (currency, amount)
:rtype: tuple
"""
headers = {
"accept": "application/json",
"Authorization": f"Bearer {self.token}",
}
path = f"{settings.Nordigen.Base}/accounts/{account_id}/balances/"
r = requests.get(path, headers=headers)
try:
obj = AccountBalancesRoot.from_json(r.content)
except ValidationError as err:
self.log.error(f"Validation error: {err}")
return
parsed = obj.to_dict()["balances"]
total = 0
currency = None
for entry in parsed:
if currency:
if not currency == entry["balanceAmount"]["currency"]:
self.log.error("Different currencies in balance query.")
return
if not entry["balanceType"] == "interimBooked":
continue
total += float(entry["balanceAmount"]["amount"])
currency = entry["balanceAmount"]["currency"]
return (currency, total)
def get_total_map(self):
"""
Return a dictionary keyed by currencies with the amounts as values.
:return: dict keyed by currency, values are amounts
:rtype: dict
"""
totals = {}
for account_id in self.banks:
currency, amount = self.get_balance(account_id)
if not amount:
continue
if currency in totals:
totals[currency] += amount
else:
totals[currency] = amount
return totals