pluto/handler/sinks/truelayer.py

125 lines
4.2 KiB
Python

# Twisted/Klein imports
from twisted.internet.task import LoopingCall
# Other library imports
import requests
from simplejson.errors import JSONDecodeError
from time import time
import urllib
# Project imports
from settings import settings
import util
class TrueLayer(util.Base):
"""
Class to manage calls to Open Banking APIs through TrueLayer.
"""
def __init__(self):
super().__init__()
self.token = None
self.lc = LoopingCall(self.get_new_token)
self.lc.start(int(settings.TrueLayer.RefreshSec))
def create_auth_url(self):
query = urllib.parse.urlencode(
{
"response_type": "code",
"response_mode": "form_post",
"client_id": settings.TrueLayer.ID,
"scope": "info accounts balance transactions offline_access",
"nonce": int(time()),
"redirect_uri": settings.TrueLayer.CallbackURL,
"enable_mock": "true",
}
)
auth_uri = f"{settings.TrueLayer.AuthBase}/?{query}&redirect_uri={settings.TrueLayer.CallbackURL}"
return auth_uri
def handle_authcode_received(self, authcode):
data = {
"client_id": settings.TrueLayer.ID,
"client_secret": settings.TrueLayer.Key,
"code": authcode,
"grant_type": "authorization_code",
"redirect_uri": settings.TrueLayer.CallbackURL,
}
r = requests.post(f"{settings.TrueLayer.AuthBase}/connect/token", data=data)
try:
parsed = r.json()
except JSONDecodeError:
return False
if "error" in parsed:
self.log.error("Error requesting refresh token: {error}", error=parsed["error"])
return False
settings.TrueLayer.RefreshToken = parsed["refresh_token"]
settings.TrueLayer.AuthCode = authcode
settings.write()
self.token = parsed["access_token"]
self.log.info("Retrieved access/refresh tokens")
def get_new_token(self):
"""
Exchange our refresh token for an access token.
"""
if not settings.TrueLayer.RefreshToken:
return
headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = {
"grant_type": "refresh_token",
"refresh_token": settings.TrueLayer.RefreshToken,
"client_id": settings.TrueLayer.ID,
"client_secret": settings.TrueLayer.Key,
}
r = requests.post(f"{settings.TrueLayer.AuthBase}/connect/token", data=data, headers=headers)
try:
parsed = r.json()
except JSONDecodeError:
return False
if r.status_code == 200:
if "access_token" in parsed.keys():
self.token = parsed["access_token"]
self.log.info("Refreshed access token")
return True
else:
self.log.error(f"Token refresh didn't contain access token: {parsed}", parsed=parsed)
return False
else:
self.log.error(f"Cannot refresh token: {parsed}", parsed=parsed)
return False
def get_accounts(self):
"""
Get a list of accounts.
"""
headers = {"Authorization": f"Bearer {self.token}"}
path = f"{settings.TrueLayer.DataBase}/accounts"
r = requests.get(path, headers=headers)
try:
parsed = r.json()
except JSONDecodeError:
self.log.error("Error parsing accounts response: {content}", content=r.content)
return False
return parsed
def get_transactions(self, account_id):
"""
Get a list of transactions from an account.
:param account_id: account to fetch transactions for
:return: list of transactions
:rtype: dict
"""
headers = {"Authorization": f"Bearer {self.token}"}
path = f"{settings.TrueLayer.DataBase}/accounts/{account_id}/transactions"
r = requests.get(path, headers=headers)
try:
parsed = r.json()
except JSONDecodeError:
self.log.error("Error parsing transactions response: {content}", content=r.content)
return False
return parsed["results"]