Implement getting TrueLayer access tokens

This commit is contained in:
Mark Veidemanis 2022-02-28 20:09:19 +00:00
parent a40423af8c
commit cdf99641ba
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
2 changed files with 35 additions and 8 deletions

View File

@ -79,9 +79,9 @@ class Revolut(object):
settings.Revolut.RefreshToken = parsed["refresh_token"] settings.Revolut.RefreshToken = parsed["refresh_token"]
settings.Revolut.SetupToken = "0" settings.Revolut.SetupToken = "0"
settings.write() settings.write()
self.log.info("Refreshed refresh token") self.log.info("Refreshed refresh token - Revolut")
self.token = parsed["access_token"] self.token = parsed["access_token"]
self.log.info("Refreshed access token") self.log.info("Refreshed access token - Revolut")
except KeyError: except KeyError:
self.log.error(f"Token authorization didn't contain refresh or access token: {parsed}", parsed=parsed) self.log.error(f"Token authorization didn't contain refresh or access token: {parsed}", parsed=parsed)
return False return False
@ -115,7 +115,7 @@ class Revolut(object):
if r.status_code == 200: if r.status_code == 200:
if "access_token" in parsed.keys(): if "access_token" in parsed.keys():
self.token = parsed["access_token"] self.token = parsed["access_token"]
self.log.info("Refreshed access token") self.log.info("Refreshed access token - Revolut")
return True return True
else: else:
self.log.error(f"Token refresh didn't contain access token: {parsed}", parsed=parsed) self.log.error(f"Token refresh didn't contain access token: {parsed}", parsed=parsed)

View File

@ -47,9 +47,9 @@ class TrueLayer(object):
settings.TrueLayer.Monzo_RefreshToken = parsed["refresh_token"] settings.TrueLayer.Monzo_RefreshToken = parsed["refresh_token"]
settings.TrueLayer.Monzo_SetupToken = "0" settings.TrueLayer.Monzo_SetupToken = "0"
settings.write() settings.write()
self.log.info("Refreshed refresh token") self.log.info("Refreshed refresh token - TrueLayer")
self.token = parsed["access_token"] self.token = parsed["access_token"]
self.log.info("Refreshed access token") self.log.info("Refreshed access token - TrueLayer")
except KeyError: except KeyError:
self.log.error(f"Token authorization didn't contain refresh or access token: {parsed}", parsed=parsed) self.log.error(f"Token authorization didn't contain refresh or access token: {parsed}", parsed=parsed)
return False return False
@ -57,12 +57,39 @@ class TrueLayer(object):
self.log.error(f"Cannot refresh token: {parsed}", parsed=parsed) self.log.error(f"Cannot refresh token: {parsed}", parsed=parsed)
return False return False
def get_new_token(self, _=None): def get_new_token(self, fail=False):
""" """
Exchange our refresh token for an access token. Exchange our refresh token for an access token.
""" """
print("calling get new token") headers = {"Content-Type": "application/x-www-form-urlencoded"}
pass data = {
"grant_type": "refresh_token",
"refresh_token": settings.TrueLayer.Monzo_RefreshToken,
"client_id": settings.TrueLayer.ID,
"client_secret": settings.TrueLayer.Key,
}
r = requests.post(f"{settings.TrueLayer.Base}/connect/token", data=data, headers=headers)
try:
parsed = r.json()
except JSONDecodeError:
if fail:
exit()
return False
if r.status_code == 200:
if "access_token" in parsed.keys():
self.token = parsed["access_token"]
self.log.info("Refreshed access token - TrueLayer")
return True
else:
self.log.error(f"Token refresh didn't contain access token: {parsed}", parsed=parsed)
if fail:
exit()
return False
else:
self.log.error(f"Cannot refresh token: {parsed}", parsed=parsed)
if fail:
exit()
return False
def get_institutions(self, filter_name=None): def get_institutions(self, filter_name=None):
""" """