# pluto/handler/sinks/truelayer.py
#
# 331 lines
# 12 KiB
# Python

# Twisted/Klein imports
from twisted.internet.task import LoopingCall
# Other library imports
import requests
from simplejson.errors import JSONDecodeError
from time import time
from json import dumps, loads
from lib.serde.truelayer import AccountBalancesRoot
import urllib
from serde import ValidationError
# Project imports
from settings import settings
import util
class TrueLayer(util.Base):
"""
Class to manage calls to Open Banking APIs through TrueLayer.
"""
def __init__(self, sinks):
    """
    Prepare the token caches, load the mapped accounts and start the
    periodic access-token refresh.
    :param sinks: object whose got_account_info / got_transactions
        callbacks receive the data fetched from TrueLayer
    """
    super().__init__()
    self.sinks = sinks
    # bank -> access token and bank -> refresh token caches
    self.tokens, self.refresh_tokens = {}, {}
    # Becomes True once every bank with a refresh token has an access token
    self.authed = False
    # bank -> list of account IDs; filled from the config right below
    self.banks = {}
    self.get_mapped_accounts()
    # Bank currently going through the authcode flow, i.e. where the next
    # refresh token we receive gets stored
    self.current_authcode_bank = None
    # Refresh all access tokens on a timer; get_new_token() fills
    # self.tokens[bank] (LoopingCall.start fires the first call right away
    # by default)
    self.lc = LoopingCall(self.get_new_tokens_all)
    refresh_interval = int(settings.TrueLayer.TokenRefreshSec)
    self.lc.start(refresh_interval)
def __authed__(self):
    """
    Hook run once access tokens for all banks have been received.
    Publishes the account details to the sinks and starts the periodic
    transaction polling loop.
    """
    authed_banks = ", ".join(self.tokens)
    self.log.info("All accounts authenticated: " + authed_banks)
    # Hand the account information over to the main handler
    self.sinks.got_account_info("truelayer", self.get_all_account_info())
    # Poll transactions on a timer from now on
    self.lc_tx = LoopingCall(self.transaction_loop)
    poll_interval = int(settings.TrueLayer.RefreshSec)
    self.lc_tx.start(poll_interval)
def transaction_loop(self):
    """
    Fetch the transactions of every mapped account and pass each batch
    to the sinks, one call per account.
    """
    for bank, account_ids in self.banks.items():
        for account_id in account_ids:
            fetched = self.get_transactions(bank, account_id)
            self.sinks.got_transactions("truelayer", account_id, fetched)
def add_refresh_token(self, refresh_token):
    """
    Store a refresh token for the bank that is currently being
    authenticated and persist it to the configuration.
    Stored data type: {"monzo": refresh_token,
                       "revolut": refresh_token}
    :param refresh_token: refresh token to store
    :return: False when no bank is awaiting an authcode, otherwise None
    """
    bank = self.current_authcode_bank
    if not bank:
        return False
    # Merge the new token into whatever is already configured
    merged = {**loads(settings.TrueLayer.RefreshKeys), bank: refresh_token}
    settings.TrueLayer.RefreshKeys = dumps(merged)
    # Keep the in-memory cache in step with the configuration
    self.refresh_tokens = merged
    settings.write()
# def get_refresh_tokens(self):
# existing_entry = loads(settings.TrueLayer.RefreshKeys)
# return existing_entry
def get_key(self, bank):
    """
    Look up the cached access token for a bank.
    :param bank: bank name
    :return: the access token, or False when none is cached
    """
    return self.tokens.get(bank, False)
def create_auth_url(self, bank):
    """
    Build the TrueLayer authorisation URL and remember which bank the
    resulting authcode belongs to.
    :param bank: name under which the tokens from this flow are stored
    :return: URL to send the user to
    :rtype: str
    """
    # Imported here so the submodule is guaranteed to be loaded; a bare
    # `import urllib` does not import urllib.parse by itself.
    from urllib.parse import urlencode

    query = urlencode(
        {
            "response_type": "code",
            "response_mode": "form_post",
            "client_id": settings.TrueLayer.ID,
            "scope": "info accounts balance transactions offline_access",
            "nonce": int(time()),
            "redirect_uri": settings.TrueLayer.CallbackURL,
            # NOTE(review): hard-coded; presumably meant for sandbox
            # testing - confirm before using against live providers.
            "enable_mock": "true",
        }
    )
    # urlencode() already emits a percent-encoded redirect_uri, so it must
    # not be appended a second time (raw) to the query string.
    auth_uri = f"{settings.TrueLayer.AuthBase}/?{query}"
    self.current_authcode_bank = bank
    return auth_uri
def handle_authcode_received(self, authcode):
    """
    Exchange an authorisation code for access/refresh tokens and store
    them for the bank recorded by create_auth_url().
    :param authcode: authorisation code delivered to the callback URL
    :return: False when the token response is unusable or reports an
        error, otherwise None
    """
    data = {
        "client_id": settings.TrueLayer.ID,
        "client_secret": settings.TrueLayer.Key,
        "code": authcode,
        "grant_type": "authorization_code",
        "redirect_uri": settings.TrueLayer.CallbackURL,
    }
    r = requests.post(f"{settings.TrueLayer.AuthBase}/connect/token", data=data)
    try:
        parsed = r.json()
    except JSONDecodeError:
        self.log.error(f"Failed to decode JSON: {r.content}")
        return False
    if not isinstance(parsed, dict):
        self.log.error("Unexpected token response format")
        return False
    if "error" in parsed:
        self.log.error(f"Error requesting refresh token: {parsed['error']}")
        return False
    # Extract the tokens; a response lacking either one is unusable
    refresh_token = parsed.get("refresh_token")
    access_token = parsed.get("access_token")
    if not refresh_token or not access_token:
        # Log only the key names so no secrets end up in the log
        self.log.error(f"Token response is missing tokens, got keys: {sorted(parsed)}")
        return False
    bank = self.current_authcode_bank
    if not bank:
        self.log.error("Received an authcode we didn't ask for")
        return
    # Persist the refresh token, then cache the access token
    self.add_refresh_token(refresh_token)
    self.tokens[bank] = access_token
    self.log.info(f"Retrieved access/refresh tokens for {bank}")
def get_new_tokens_all(self):
    """
    Refresh the access token of every bank that has a stored refresh
    token.
    A failure for one bank is logged and the remaining banks are still
    refreshed, so a single bad refresh token cannot starve the others.
    """
    refresh_tokens = loads(settings.TrueLayer.RefreshKeys)
    # Set the cached entry
    self.refresh_tokens = refresh_tokens
    for bank in refresh_tokens:
        if not self.get_new_token(bank):
            self.log.error(f"Error getting token for {bank}")
def get_new_token(self, bank):
    """
    Exchange the stored refresh token of a bank for an access token.
    When this gives every bank with a refresh token an access token for
    the first time, __authed__() is run.
    :param bank: bank to refresh the token for
    :return: True on success, False when the exchange failed, None when
        the bank has no stored refresh token
    """
    if bank not in self.refresh_tokens:
        self.log.error(f"Bank {bank} not in refresh tokens")
        return
    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    payload = {
        "grant_type": "refresh_token",
        "refresh_token": self.refresh_tokens[bank],
        "client_id": settings.TrueLayer.ID,
        "client_secret": settings.TrueLayer.Key,
    }
    response = requests.post(
        f"{settings.TrueLayer.AuthBase}/connect/token",
        data=payload,
        headers=form_headers,
    )
    try:
        parsed = response.json()
    except JSONDecodeError:
        self.log.error(f"Failed to decode JSON: {response.content}")
        return False
    if response.status_code != 200:
        self.log.error(f"Cannot refresh token: {parsed}")
        return False
    if "access_token" not in parsed.keys():
        self.log.error(f"Token refresh didn't contain access token: {parsed}")
        return False
    self.tokens[bank] = parsed["access_token"]
    if not self.authed and len(self.refresh_tokens) == len(self.tokens):
        # We are now fully authenticated and ready to start loops!
        self.__authed__()
        self.authed = True
    return True
def get_accounts(self, bank):
    """
    Get a list of accounts.
    :param bank: bank whose access token is used for the request
    :return: the decoded JSON response, or False when the response could
        not be decoded
    """
    token = self.get_key(bank)
    headers = {"Authorization": f"Bearer {token}"}
    path = f"{settings.TrueLayer.DataBase}/accounts"
    r = requests.get(path, headers=headers)
    try:
        parsed = r.json()
    except JSONDecodeError:
        # Same f-string logging style as the other request helpers; the
        # keyword-argument form is not accepted by stdlib-style loggers.
        self.log.error(f"Error parsing accounts response: {r.content}")
        return False
    return parsed
def _get_account(self, bank, account_id):
    """
    Request the raw record of a single account.
    :param bank: bank whose access token is used for the request
    :param account_id: account to look up
    :return: the decoded JSON response, or False when the response could
        not be decoded
    """
    auth = {"Authorization": f"Bearer {self.get_key(bank)}"}
    url = f"{settings.TrueLayer.DataBase}/accounts/{account_id}"
    response = requests.get(url, headers=auth)
    try:
        return response.json()
    except JSONDecodeError:
        self.log.error(f"Error parsing accounts response: {response.content}")
        return False
def get_mapped_accounts(self):
    """
    Load the bank -> account IDs mapping from the configuration into
    self.banks. Returns nothing.
    """
    self.banks = loads(settings.TrueLayer.Maps)
def get_all_account_info(self):
    """
    Collect the account record of every mapped account.
    :return: dict keyed by bank, each value a list with one get_account()
        result per mapped account of that bank
    :rtype: dict
    """
    grouped = {}
    for bank, account_ids in self.banks.items():
        for account_id in account_ids:
            info = self.get_account(bank, account_id)
            grouped.setdefault(bank, []).append(info)
    return grouped
def get_account(self, bank, account_id):
    """
    Fetch a single account and validate the response envelope.
    The response is accepted only when it is a two-key object whose
    "status" is "Succeeded" and whose "results" list holds exactly one
    entry.
    :param bank: bank the account belongs to
    :param account_id: account to look up
    :return: the single account record, or False when the response is
        missing, malformed or unsuccessful
    """
    account_data = self._get_account(bank, account_id)
    # _get_account() returns False when the response was not valid JSON;
    # membership tests on that would raise TypeError.
    if not isinstance(account_data, dict):
        return False
    results = account_data.get("results")
    if not isinstance(results, list) or len(results) != 1:
        return False
    if len(account_data) != 2:
        return False
    # .get() so a missing "status" key is a failure, not a KeyError
    if account_data.get("status") != "Succeeded":
        return False
    return results[0]
def map_account(self, bank, account_id):
    """
    Map an account_id at a bank to an account_name.
    This enables the account for fetching.
    Data type: {"monzo": [account, ids, here],
                "revolut": [account, ids, here]}
    :param bank: bank the account belongs to
    :param account_id: account to enable
    :return: the account's currency, or False when the account could not
        be looked up (nothing is written to the configuration then)
    """
    account_data = self.get_account(bank, account_id)
    # get_account() returns False on any lookup failure; indexing that
    # would raise TypeError.
    if not account_data:
        self.log.error(f"Cannot map account {account_id} at {bank}: account lookup failed")
        return False
    currency = account_data["currency"]
    existing_entry = loads(settings.TrueLayer.Maps)
    account_ids = existing_entry.setdefault(bank, [])
    if account_id not in account_ids:
        account_ids.append(account_id)
    settings.TrueLayer.Maps = dumps(existing_entry)
    # Keep the in-memory mapping in step with the configuration
    self.banks = existing_entry
    settings.write()
    return currency
def get_transactions(self, bank, account_id):
    """
    Get the transactions of an account.
    :param bank: bank whose access token is used for the request
    :param account_id: account to fetch transactions for
    :return: the "results" entry of the response; the tuple
        (False, False) when the response cannot be decoded or has no
        "results"
    """
    auth = {"Authorization": f"Bearer {self.get_key(bank)}"}
    url = f"{settings.TrueLayer.DataBase}/accounts/{account_id}/transactions"
    response = requests.get(url, headers=auth)
    try:
        parsed = response.json()
    except JSONDecodeError:
        self.log.error(f"Error parsing transactions response: {response.content}")
        return (False, False)
    return parsed["results"] if "results" in parsed else (False, False)
def get_balance(self, bank, account_id):
    """
    Get the balance of an account by summing the "available" amount of
    every balance entry returned for it.
    :param bank: the bank to check
    :param account_id: the account ID
    :return: tuple of (currency, amount); None when the response fails
        validation or the entries use different currencies
    :rtype: tuple
    """
    auth = {"Authorization": f"Bearer {self.get_key(bank)}"}
    url = f"{settings.TrueLayer.DataBase}/accounts/{account_id}/balance"
    response = requests.get(url, headers=auth)
    try:
        balances = AccountBalancesRoot.from_json(response.content)
    except ValidationError as err:
        self.log.error(f"Validation error: {err}")
        return
    currency = None
    total = 0
    for entry in balances.to_dict()["results"]:
        # All entries have to share one currency to be summable
        if currency and currency != entry["currency"]:
            self.log.error("Different currencies in balance query.")
            return
        total += entry["available"]
        currency = entry["currency"]
    return (currency, total)
def get_total_map(self):
    """
    Return a dictionary keyed by currencies with the amounts as values.
    Accounts whose balance could not be fetched, or is zero, are skipped.
    :return: dict keyed by currency, values are amounts
    :rtype: dict
    """
    totals = {}
    for bank, account_ids in self.banks.items():
        for account_id in account_ids:
            balance = self.get_balance(bank, account_id)
            # get_balance() returns None on failure; unpacking that
            # directly would raise TypeError.
            if not balance:
                continue
            currency, amount = balance
            if not amount:
                continue
            totals[currency] = totals.get(currency, 0) + amount
    return totals