Handle multiple accounts with TrueLayer

This commit is contained in:
Mark Veidemanis 2022-03-06 18:51:16 +00:00
parent 28d4db5694
commit 95e9a55f19
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
3 changed files with 128 additions and 63 deletions

View File

@ -4,8 +4,7 @@ from twisted.internet import reactor
from klein import Klein from klein import Klein
# Other library imports # Other library imports
from json import dumps, loads from json import dumps
from json.decoder import JSONDecodeError
from signal import signal, SIGINT from signal import signal, SIGINT
# Project imports # Project imports
@ -47,23 +46,23 @@ class WebApp(util.Base):
app = Klein() app = Klein()
@app.route("/callback", methods=["POST"]) # @app.route("/callback", methods=["POST"])
def callback(self, request): # def callback(self, request):
content = request.content.read() # content = request.content.read()
try: # try:
parsed = loads(content) # parsed = loads(content)
except JSONDecodeError: # except JSONDecodeError:
self.log.error(f"Failed to parse JSON callback: {content}") # self.log.error(f"Failed to parse JSON callback: {content}")
return dumps(False) # return dumps(False)
self.log.info("Callback received: {parsed}", parsed=parsed["data"]["id"]) # self.log.info("Callback received: {parsed}", parsed=parsed["data"]["id"])
# self.tx.transaction(parsed) # # self.tx.transaction(parsed)
return dumps(True) # return dumps(True)
# set up another connection to a bank # set up another connection to a bank
@app.route("/signin", methods=["GET"]) @app.route("/signin/<string:account>", methods=["GET"])
def signin(self, request): def signin(self, request, account):
auth_url = self.sinks.truelayer.create_auth_url() auth_url = self.sinks.truelayer.create_auth_url(account)
return f'Please sign in <a href="{auth_url}" target="_blank">here.</a>' return f'Please sign in to {account} <a href="{auth_url}" target="_blank">here.</a>'
# endpoint called after we finish setting up a connection above # endpoint called after we finish setting up a connection above
@app.route("/callback-truelayer", methods=["POST"]) @app.route("/callback-truelayer", methods=["POST"])
@ -72,9 +71,9 @@ class WebApp(util.Base):
self.sinks.truelayer.handle_authcode_received(code) self.sinks.truelayer.handle_authcode_received(code)
return dumps(True) return dumps(True)
@app.route("/accounts", methods=["GET"]) @app.route("/accounts/<string:account>", methods=["GET"])
def balance(self, request): def balance(self, request, account):
accounts = self.sinks.truelayer.get_accounts() accounts = self.sinks.truelayer.get_accounts(account)
return dumps(accounts, indent=2) return dumps(accounts, indent=2)

View File

@ -5,6 +5,7 @@ from twisted.internet.task import LoopingCall
import requests import requests
from simplejson.errors import JSONDecodeError from simplejson.errors import JSONDecodeError
from time import time from time import time
from json import dumps, loads
import urllib import urllib
# Project imports # Project imports
@ -19,11 +20,41 @@ class TrueLayer(util.Base):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.token = None self.tokens = {}
self.lc = LoopingCall(self.get_new_token)
# account we are authenticating - where to store the refresh keys
self.current_authcode_account = None
self.lc = LoopingCall(self.get_new_tokens_all)
self.lc.start(int(settings.TrueLayer.RefreshSec)) self.lc.start(int(settings.TrueLayer.RefreshSec))
def create_auth_url(self): def add_refresh_token(self, refresh_token):
"""
Add an API key to the configuration.
Data type: {"monzo": refresh_token,
"revolut": refresh_token}
"""
print("ADD REFRESH TOKEN", refresh_token)
account = self.current_authcode_account
if not account:
print("CURRENT AUTHCODE ACCOUNT", account)
return False
existing_entry = loads(settings.TrueLayer.RefreshKeys)
print("existing entry", existing_entry)
existing_entry[account] = refresh_token
settings.TrueLayer.RefreshKeys = dumps(existing_entry)
settings.write()
def get_refresh_tokens(self):
existing_entry = loads(settings.TrueLayer.RefreshKeys)
return existing_entry
def get_key(self, account):
if account in self.tokens:
return self.tokens[account]
else:
return False
def create_auth_url(self, account):
query = urllib.parse.urlencode( query = urllib.parse.urlencode(
{ {
"response_type": "code", "response_type": "code",
@ -36,6 +67,8 @@ class TrueLayer(util.Base):
} }
) )
auth_uri = f"{settings.TrueLayer.AuthBase}/?{query}&redirect_uri={settings.TrueLayer.CallbackURL}" auth_uri = f"{settings.TrueLayer.AuthBase}/?{query}&redirect_uri={settings.TrueLayer.CallbackURL}"
print("SETTING AUTHCODE ACCOUNT TO", account)
self.current_authcode_account = account
return auth_uri return auth_uri
def handle_authcode_received(self, authcode): def handle_authcode_received(self, authcode):
@ -52,24 +85,46 @@ class TrueLayer(util.Base):
except JSONDecodeError: except JSONDecodeError:
return False return False
if "error" in parsed: if "error" in parsed:
self.log.error("Error requesting refresh token: {error}", error=parsed["error"]) self.log.error("Error requesting refresh token: {parsed['error']}")
return False return False
settings.TrueLayer.RefreshToken = parsed["refresh_token"]
settings.TrueLayer.AuthCode = authcode
settings.write()
self.token = parsed["access_token"]
self.log.info("Retrieved access/refresh tokens")
def get_new_token(self): # Extract the access tokens
refresh_token = parsed["refresh_token"]
access_token = parsed["access_token"]
# Add the refresh token
self.add_refresh_token(refresh_token)
# Add the access
if self.current_authcode_account:
self.tokens[self.current_authcode_account] = access_token
else:
self.log.error(f"Received an authcode we didn't ask for")
return
self.log.info(f"Retrieved access/refresh tokens for {self.current_authcode_account}")
def get_new_tokens_all(self):
print("get new_tokens_all running")
refresh_list = loads(settings.TrueLayer.RefreshKeys)
for account in refresh_list:
print("new_new_tokens_all running on", account)
self.get_new_token(account)
def get_new_token(self, account):
""" """
Exchange our refresh token for an access token. Exchange our refresh token for an access token.
:param account: account to refresh the token for
:type account:
""" """
if not settings.TrueLayer.RefreshToken: refresh_tokens = self.get_refresh_tokens()
if account not in refresh_tokens:
return return
headers = {"Content-Type": "application/x-www-form-urlencoded"} headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = { data = {
"grant_type": "refresh_token", "grant_type": "refresh_token",
"refresh_token": settings.TrueLayer.RefreshToken, "refresh_token": refresh_tokens[account],
"client_id": settings.TrueLayer.ID, "client_id": settings.TrueLayer.ID,
"client_secret": settings.TrueLayer.Key, "client_secret": settings.TrueLayer.Key,
} }
@ -80,8 +135,8 @@ class TrueLayer(util.Base):
return False return False
if r.status_code == 200: if r.status_code == 200:
if "access_token" in parsed.keys(): if "access_token" in parsed.keys():
self.token = parsed["access_token"] self.tokens[account] = parsed["access_token"]
self.log.info("Refreshed access token") self.log.info(f"Refreshed access token for {account}")
return True return True
else: else:
self.log.error(f"Token refresh didn't contain access token: {parsed}", parsed=parsed) self.log.error(f"Token refresh didn't contain access token: {parsed}", parsed=parsed)
@ -90,11 +145,12 @@ class TrueLayer(util.Base):
self.log.error(f"Cannot refresh token: {parsed}", parsed=parsed) self.log.error(f"Cannot refresh token: {parsed}", parsed=parsed)
return False return False
def get_accounts(self): def get_accounts(self, account):
""" """
Get a list of accounts. Get a list of accounts.
""" """
headers = {"Authorization": f"Bearer {self.token}"} token = self.get_key(account)
headers = {"Authorization": f"Bearer {token}"}
path = f"{settings.TrueLayer.DataBase}/accounts" path = f"{settings.TrueLayer.DataBase}/accounts"
r = requests.get(path, headers=headers) r = requests.get(path, headers=headers)
try: try:
@ -105,14 +161,15 @@ class TrueLayer(util.Base):
return parsed return parsed
def get_transactions(self, account_id): def get_transactions(self, account, account_id):
""" """
Get a list of transactions from an account. Get a list of transactions from an account.
:param account_id: account to fetch transactions for :param account_id: account to fetch transactions for
:return: list of transactions :return: list of transactions
:rtype: dict :rtype: dict
""" """
headers = {"Authorization": f"Bearer {self.token}"} token = self.get_key(account)
headers = {"Authorization": f"Bearer {token}"}
path = f"{settings.TrueLayer.DataBase}/accounts/{account_id}/transactions" path = f"{settings.TrueLayer.DataBase}/accounts/{account_id}/transactions"
r = requests.get(path, headers=headers) r = requests.get(path, headers=headers)
try: try:

View File

@ -443,36 +443,45 @@ class IRCCommands(object):
class signin(object): class signin(object):
name = "signin" name = "signin"
authed = True authed = True
helptext = "Generate a TrueLayer signin URL." helptext = "Generate a TrueLayer signin URL. Usage: signin <account>"
@staticmethod @staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux): def run(cmd, spl, length, authed, msg, agora, tx, ux):
auth_url = tx.truelayer.create_auth_url() if length == 2:
msg(f"Auth URL: {auth_url}") account = spl[1]
auth_url = tx.truelayer.create_auth_url(account)
msg(f"Auth URL for {account}: {auth_url}")
class accounts(object): class accounts(object):
name = "accounts" name = "accounts"
authed = True authed = True
helptext = "Get a list of acccounts." helptext = "Get a list of acccounts. Usage: accounts <account>"
@staticmethod @staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux): def run(cmd, spl, length, authed, msg, agora, tx, ux):
accounts = tx.sinks.truelayer.get_accounts() if length == 2:
account = spl[1]
accounts = tx.sinks.truelayer.get_accounts(account)
msg(dumps(accounts)) msg(dumps(accounts))
class transactions(object): class transactions(object):
name = "transactions" name = "transactions"
authed = True authed = True
helptext = "Get a list of transactions. Usage: transactions <account_id>" helptext = "Get a list of transactions. Usage: transactions <account> <account_id>"
@staticmethod @staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux): def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 2: if length == 3:
account_id = spl[1] account = spl[1]
transactions = tx.sinks.truelayer.get_transactions(account_id) account_id = spl[2]
transactions = tx.sinks.truelayer.get_transactions(account, account_id)
for transaction in transactions: for transaction in transactions:
print(transaction)
txid = transaction["transaction_id"]
ptxid = transaction["provider_transaction_id"]
txtype = transaction["transaction_type"]
timestamp = transaction["timestamp"] timestamp = transaction["timestamp"]
amount = transaction["amount"] amount = transaction["amount"]
currency = transaction["currency"] currency = transaction["currency"]
recipient = transaction["counter_party_preferred_name"] description = transaction["description"]
msg(f"{timestamp} {amount}{currency} {recipient}") msg(f"{timestamp} {txid} {ptxid} {txtype} {amount}{currency} {description}")