# Twisted/Klein imports
from twisted.logger import Logger
from twisted.internet.task import LoopingCall

# Other library imports
import requests
from simplejson.errors import JSONDecodeError
from time import time
# Import the submodule explicitly: a bare ``import urllib`` only exposes
# ``urllib.parse`` if some other module happens to have imported it already.
import urllib.parse

# Project imports
from settings import settings


class TrueLayer(object):
    """
    Class to manage calls to Open Banking APIs through TrueLayer.

    Holds the current access token in ``self.token`` and refreshes it
    periodically through a Twisted ``LoopingCall``.
    """

    # Seconds to wait on any single HTTP request. The requests calls below
    # are blocking, so an unbounded wait would stall the caller indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        """
        Create the logger and start the periodic access token refresh.

        The refresh interval is read from ``settings.TrueLayer.RefreshSec``.
        """
        self.log = Logger("truelayer")
        self.token = None
        self.lc = LoopingCall(self.get_new_token)
        loop_done = self.lc.start(int(settings.TrueLayer.RefreshSec))
        # LoopingCall stops for good if its callable raises; make sure that
        # is reported instead of being left in an unobserved Deferred.
        loop_done.addErrback(self._refresh_loop_failed)

    def _refresh_loop_failed(self, failure):
        """Log the failure that caused the token refresh loop to stop."""
        self.log.failure("Access token refresh loop stopped", failure=failure)

    def _request_json(self, method, url, action, **kwargs):
        """
        Send an HTTP request and decode the JSON response body.

        :param method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
        :param url: Full URL to request.
        :param action: Short description of the operation, used in log output.
        :param kwargs: Extra keyword arguments passed to ``requests.request``.
        :return: ``(status_code, parsed_body)`` on success, or ``None`` if the
            request failed or the body was not valid JSON (both are logged).
        """
        try:
            response = requests.request(
                method, url, timeout=self.REQUEST_TIMEOUT, **kwargs
            )
        except requests.RequestException as err:
            self.log.error(
                "Request failed while {action}: {error}", action=action, error=err
            )
            return None
        try:
            parsed = response.json()
        except JSONDecodeError:
            self.log.error(
                "Error parsing response while {action}: {content}",
                action=action,
                content=response.content,
            )
            return None
        return response.status_code, parsed

    @staticmethod
    def _refresh_failed(fail):
        """
        Finish a failed token refresh.

        :param fail: When true, terminate the process by raising ``SystemExit``.
        :return: ``False`` when ``fail`` is false.
        """
        if fail:
            raise SystemExit
        return False

    def setup_auth(self):
        """Placeholder; currently does nothing."""
        pass

    def create_auth_url(self):
        """
        Build the TrueLayer authorisation URL.

        :return: The URL as a string, built from ``settings.TrueLayer.AuthBase``
            and a URL-encoded query string.
        """
        query = urllib.parse.urlencode(
            {
                "response_type": "code",
                "response_mode": "form_post",
                "client_id": settings.TrueLayer.ID,
                "scope": "info accounts balance transactions offline_access",
                "nonce": int(time()),
                "redirect_uri": settings.TrueLayer.CallbackURL,
                "enable_mock": "true",
            }
        )
        # NOTE(review): redirect_uri is already part of ``query`` (URL-encoded)
        # and is appended here a second time, unencoded. Kept as-is because the
        # auth flow may depend on it; confirm whether the duplicate is needed.
        auth_uri = f"{settings.TrueLayer.AuthBase}/?{query}&redirect_uri={settings.TrueLayer.CallbackURL}"
        return auth_uri

    def handle_authcode_received(self, authcode):
        """
        Exchange an authorisation code for access and refresh tokens.

        On success the refresh token and auth code are stored in ``settings``
        (and written out with ``settings.write()``), and the access token is
        stored in ``self.token``.

        :param authcode: The authorisation code to exchange.
        :return: ``False`` on any failure. On success the method returns
            ``None`` (there is no explicit success value).
        """
        data = {
            "client_id": settings.TrueLayer.ID,
            "client_secret": settings.TrueLayer.Key,
            "code": authcode,
            "grant_type": "authorization_code",
            "redirect_uri": settings.TrueLayer.CallbackURL,
        }
        result = self._request_json(
            "POST",
            f"{settings.TrueLayer.AuthBase}/connect/token",
            "requesting refresh token",
            data=data,
        )
        if result is None:
            return False
        _, parsed = result
        if not isinstance(parsed, dict):
            self.log.error("Unexpected token response type: {kind}", kind=type(parsed).__name__)
            return False
        if "error" in parsed:
            self.log.error("Error requesting refresh token: {error}", error=parsed["error"])
            return False
        # Log only the names of missing keys so a token that is present
        # never ends up in the log.
        missing = [key for key in ("refresh_token", "access_token") if key not in parsed]
        if missing:
            self.log.error("Token response is missing keys: {missing}", missing=missing)
            return False
        settings.TrueLayer.RefreshToken = parsed["refresh_token"]
        settings.TrueLayer.AuthCode = authcode
        settings.write()
        self.token = parsed["access_token"]
        self.log.info("Retrieved access/refresh tokens - TrueLayer")

    def get_new_token(self, fail=False):
        """
        Exchange our refresh token for an access token.

        :param fail: When true, any failure terminates the process by raising
            ``SystemExit`` instead of returning ``False``.
        :return: ``True`` when ``self.token`` was updated, ``False`` on
            failure, and ``None`` when no refresh token is configured.
        """
        if not settings.TrueLayer.RefreshToken:
            return
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = {
            "grant_type": "refresh_token",
            "refresh_token": settings.TrueLayer.RefreshToken,
            "client_id": settings.TrueLayer.ID,
            "client_secret": settings.TrueLayer.Key,
        }
        result = self._request_json(
            "POST",
            f"{settings.TrueLayer.AuthBase}/connect/token",
            "refreshing access token",
            data=data,
            headers=headers,
        )
        if result is None:
            return self._refresh_failed(fail)
        status_code, parsed = result
        # The messages below are plain format strings, not f-strings: the
        # logger fills in {parsed} itself. Pre-interpolating the dict would
        # put its braces into the format string and break log formatting.
        if status_code != 200:
            self.log.error("Cannot refresh token: {parsed}", parsed=parsed)
            return self._refresh_failed(fail)
        if not isinstance(parsed, dict) or "access_token" not in parsed:
            self.log.error("Token refresh didn't contain access token: {parsed}", parsed=parsed)
            return self._refresh_failed(fail)
        self.token = parsed["access_token"]
        self.log.info("Refreshed access token - TrueLayer")
        return True

    def get_accounts(self):
        """
        Get a list of accounts.

        :return: The decoded JSON response body, or ``False`` if the request
            failed or the response was not valid JSON.
        """
        headers = {"Authorization": f"Bearer {self.token}"}
        path = f"{settings.TrueLayer.DataBase}/accounts"
        result = self._request_json("GET", path, "fetching accounts", headers=headers)
        if result is None:
            return False
        return result[1]