Implement Revolut refresh token retrieval

This commit is contained in:
Mark Veidemanis 2021-12-24 17:26:54 +00:00
parent ff91b0ac81
commit 9ba5721872
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
1 changed files with 56 additions and 4 deletions

View File

@ -5,6 +5,9 @@ from twisted.logger import Logger
from json import dumps from json import dumps
from json.decoder import JSONDecodeError from json.decoder import JSONDecodeError
import requests import requests
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import jwt
# Project imports # Project imports
from settings import settings from settings import settings
@ -15,9 +18,59 @@ class Revolut(object):
Class to handle Revolut API calls. Class to handle Revolut API calls.
""" """
def __init__(self): def __init__(self, irc):
self.log = Logger("revolut") self.log = Logger("revolut")
self.token = None self.token = None
self.irc = irc
def setup_auth(self):
self.create_new_jwt()
self.get_access_token()
def create_new_jwt(self):
payload = {
"iss": settings.Revolut.Domain,
"sub": settings.Revolut.ClientID,
"aud": "https://revolut.com",
"exp": int(settings.Revolut.Expiry),
}
with open(settings.Revolut.PrivateKey, "rb") as f:
pem_bytes = f.read()
# payload = {jwt_header, jwt_body}
private_key = serialization.load_pem_private_key(pem_bytes, password=None, backend=default_backend())
encoded = jwt.encode(payload, private_key, algorithm="RS256")
settings.Revolut.JWT = encoded
settings.write()
def get_access_token(self):
headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = {
"grant_type": "authorization_code",
"code": settings.Revolut.AuthCode,
"client_id": settings.Revolut.ClientID,
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": settings.Revolut.JWT,
}
r = requests.post(f"{settings.Revolut.Base}/auth/token", data=data, headers=headers)
try:
parsed = r.json()
except JSONDecodeError:
return False
if r.status_code == 200:
try:
settings.Revolut.RefreshToken = parsed["refresh_token"]
settings.Revolut.SetupToken = "0"
settings.write()
self.log.info("Refreshed refresh token: {refresh_token}", refresh_token=settings.Revolut.RefreshToken)
self.token = parsed["access_token"]
self.log.info("Refreshed access token: {access_token}", access_token=self.token)
except KeyError:
self.log.error(f"Token authorization didn't contain refresh or access token: {parsed}", parsed=parsed)
return False
else:
self.log.error(f"Cannot refresh token: {parsed}", parsed=parsed)
return False
def get_new_token(self): def get_new_token(self):
headers = {"Content-Type": "application/x-www-form-urlencoded"} headers = {"Content-Type": "application/x-www-form-urlencoded"}
@ -36,9 +89,8 @@ class Revolut(object):
if r.status_code == 200: if r.status_code == 200:
if "access_token" in parsed.keys(): if "access_token" in parsed.keys():
self.token = parsed["access_token"] self.token = parsed["access_token"]
if len(self.token) == len(settings.Revolut.RefreshToken): self.log.info("Refreshed access token: {access_token}", access_token=self.token)
self.log.info("Refreshed access token: {access_token}", access_token=self.token) return True
return True
else: else:
self.log.error(f"Token refresh didn't contain access token: {parsed}", parsed=parsed) self.log.error(f"Token refresh didn't contain access token: {parsed}", parsed=parsed)
return False return False