# Twisted/Klein imports from twisted.internet.task import LoopingCall # Other library imports import requests from simplejson.errors import JSONDecodeError from time import time from json import dumps, loads import urllib # Project imports from settings import settings import util class TrueLayer(util.Base): """ Class to manage calls to Open Banking APIs through TrueLayer. """ def __init__(self): super().__init__() self.tokens = {} # account we are authenticating - where to store the refresh keys self.current_authcode_account = None self.lc = LoopingCall(self.get_new_tokens_all) self.lc.start(int(settings.TrueLayer.RefreshSec)) def add_refresh_token(self, refresh_token): """ Add an API key to the configuration. Data type: {"monzo": refresh_token, "revolut": refresh_token} """ account = self.current_authcode_account if not account: return False existing_entry = loads(settings.TrueLayer.RefreshKeys) existing_entry[account] = refresh_token settings.TrueLayer.RefreshKeys = dumps(existing_entry) settings.write() def get_refresh_tokens(self): existing_entry = loads(settings.TrueLayer.RefreshKeys) return existing_entry def get_key(self, account): if account in self.tokens: return self.tokens[account] else: return False def create_auth_url(self, account): query = urllib.parse.urlencode( { "response_type": "code", "response_mode": "form_post", "client_id": settings.TrueLayer.ID, "scope": "info accounts balance transactions offline_access", "nonce": int(time()), "redirect_uri": settings.TrueLayer.CallbackURL, "enable_mock": "true", } ) auth_uri = f"{settings.TrueLayer.AuthBase}/?{query}&redirect_uri={settings.TrueLayer.CallbackURL}" self.current_authcode_account = account return auth_uri def handle_authcode_received(self, authcode): data = { "client_id": settings.TrueLayer.ID, "client_secret": settings.TrueLayer.Key, "code": authcode, "grant_type": "authorization_code", "redirect_uri": settings.TrueLayer.CallbackURL, } r = requests.post(f"{settings.TrueLayer.AuthBase}/connect/token", data=data) try: parsed = r.json() except JSONDecodeError: return False if "error" in parsed: self.log.error("Error requesting refresh token: {parsed['error']}") return False # Extract the access tokens refresh_token = parsed["refresh_token"] access_token = parsed["access_token"] # Add the refresh token self.add_refresh_token(refresh_token) # Add the access if self.current_authcode_account: self.tokens[self.current_authcode_account] = access_token else: self.log.error(f"Received an authcode we didn't ask for") return self.log.info(f"Retrieved access/refresh tokens for {self.current_authcode_account}") def get_new_tokens_all(self): refresh_list = loads(settings.TrueLayer.RefreshKeys) for account in refresh_list: self.get_new_token(account) def get_new_token(self, account): """ Exchange our refresh token for an access token. :param account: account to refresh the token for :type account: """ refresh_tokens = self.get_refresh_tokens() if account not in refresh_tokens: return headers = {"Content-Type": "application/x-www-form-urlencoded"} data = { "grant_type": "refresh_token", "refresh_token": refresh_tokens[account], "client_id": settings.TrueLayer.ID, "client_secret": settings.TrueLayer.Key, } r = requests.post(f"{settings.TrueLayer.AuthBase}/connect/token", data=data, headers=headers) try: parsed = r.json() except JSONDecodeError: return False if r.status_code == 200: if "access_token" in parsed.keys(): self.tokens[account] = parsed["access_token"] self.log.info(f"Refreshed access token for {account}") return True else: self.log.error(f"Token refresh didn't contain access token: {parsed}", parsed=parsed) return False else: self.log.error(f"Cannot refresh token: {parsed}", parsed=parsed) return False def get_accounts(self, account): """ Get a list of accounts. """ token = self.get_key(account) headers = {"Authorization": f"Bearer {token}"} path = f"{settings.TrueLayer.DataBase}/accounts" r = requests.get(path, headers=headers) try: parsed = r.json() except JSONDecodeError: self.log.error("Error parsing accounts response: {content}", content=r.content) return False return parsed def get_transactions(self, account, account_id): """ Get a list of transactions from an account. :param account_id: account to fetch transactions for :return: list of transactions :rtype: dict """ token = self.get_key(account) headers = {"Authorization": f"Bearer {token}"} path = f"{settings.TrueLayer.DataBase}/accounts/{account_id}/transactions" r = requests.get(path, headers=headers) try: parsed = r.json() except JSONDecodeError: self.log.error("Error parsing transactions response: {content}", content=r.content) return False return parsed["results"]