Begin implementing TrueLayer API
This commit is contained in:
parent
dcfc4510ad
commit
d566b4da16
|
@ -0,0 +1,94 @@
|
||||||
|
# Twisted/Klein imports
|
||||||
|
from twisted.logger import Logger
|
||||||
|
|
||||||
|
# Other library imports
|
||||||
|
import requests
|
||||||
|
from json import dumps
|
||||||
|
from simplejson.errors import JSONDecodeError
|
||||||
|
|
||||||
|
# Project imports
|
||||||
|
from settings import settings
|
||||||
|
|
||||||
|
|
||||||
|
class TrueLayer(object):
|
||||||
|
"""
|
||||||
|
Class to manage calls to Open Banking APIs through TrueLayer.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.log = Logger("truelayer")
|
||||||
|
self.token = None
|
||||||
|
# self.auth = HTTPBasicAuth(settings.Yapily.ID, settings.Yapily.Key)
|
||||||
|
# print(self.get_institutions())
|
||||||
|
# authorisation = self.get_authorisation("monzo_ob", settings.Yapily.CallbackURL)
|
||||||
|
# print("authirisation", authorisation)
|
||||||
|
|
||||||
|
def setup_auth(self):
|
||||||
|
"""
|
||||||
|
Exchange an authentication code for an access and refresh token.
|
||||||
|
"""
|
||||||
|
headers = {"Content-Type": "application/x-www-form-urlencoded"}
|
||||||
|
data = {
|
||||||
|
"grant_type": "authorization_code",
|
||||||
|
"code": settings.TrueLayer.Monzo_AuthCode,
|
||||||
|
"client_id": settings.TrueLayer.ID,
|
||||||
|
"client_secret": settings.TrueLayer.Key,
|
||||||
|
"redirect_uri": "https://console.truelayer.com/redirect-page",
|
||||||
|
}
|
||||||
|
r = requests.post(f"{settings.TrueLayer.Base}/connect/token", data=data, headers=headers)
|
||||||
|
try:
|
||||||
|
parsed = r.json()
|
||||||
|
except JSONDecodeError:
|
||||||
|
self.log.error("Error parsing access token response: {content}", content=r.content)
|
||||||
|
return False
|
||||||
|
print("PARSED", parsed)
|
||||||
|
if r.status_code == 200:
|
||||||
|
try:
|
||||||
|
settings.TrueLayer.Monzo_RefreshToken = parsed["refresh_token"]
|
||||||
|
settings.TrueLayer.Monzo_SetupToken = "0"
|
||||||
|
settings.write()
|
||||||
|
self.log.info("Refreshed refresh token")
|
||||||
|
self.token = parsed["access_token"]
|
||||||
|
self.log.info("Refreshed access token")
|
||||||
|
except KeyError:
|
||||||
|
self.log.error(f"Token authorization didn't contain refresh or access token: {parsed}", parsed=parsed)
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
self.log.error(f"Cannot refresh token: {parsed}", parsed=parsed)
|
||||||
|
return False
|
||||||
|
|
||||||
|
def get_new_token(self, _=None):
|
||||||
|
"""
|
||||||
|
Exchange our refresh token for an access token.
|
||||||
|
"""
|
||||||
|
print("calling get new token")
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_institutions(self, filter_name=None):
|
||||||
|
"""
|
||||||
|
Get a list of supported institutions.
|
||||||
|
"""
|
||||||
|
|
||||||
|
path = f"{settings.Yapily.Base}/institutions"
|
||||||
|
r = requests.get(path, auth=self.auth)
|
||||||
|
try:
|
||||||
|
parsed = r.json()
|
||||||
|
except JSONDecodeError:
|
||||||
|
self.log.error("Error parsing institutions response: {content}", content=r.content)
|
||||||
|
return False
|
||||||
|
return parsed
|
||||||
|
|
||||||
|
def get_authorisation(self, institution_id, callback_url):
|
||||||
|
"""
|
||||||
|
Get an authorisation URL for linking a bank account of an institution.
|
||||||
|
"""
|
||||||
|
headers = {"Content-Type": "application/json"}
|
||||||
|
data = {"applicationUserId": "account-data-and-transactions-tutorial", "institutionId": institution_id, "callback": callback_url}
|
||||||
|
path = f"{settings.Yapily.Base}/account-auth-requests"
|
||||||
|
r = requests.post(path, headers=headers, data=dumps(data), auth=self.auth)
|
||||||
|
try:
|
||||||
|
parsed = r.json()
|
||||||
|
except JSONDecodeError:
|
||||||
|
self.log.error("Error parsing institutions response: {content}", content=r.content)
|
||||||
|
return False
|
||||||
|
return parsed
|
Loading…
Reference in New Issue