From 9ba5721872ab439a9068ac1d21ec7b8790df80a9 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Fri, 24 Dec 2021 17:26:54 +0000 Subject: [PATCH] Implement Revolut refresh token retrieval --- handler/revolut.py | 60 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 56 insertions(+), 4 deletions(-) diff --git a/handler/revolut.py b/handler/revolut.py index d931ddd..240f900 100644 --- a/handler/revolut.py +++ b/handler/revolut.py @@ -5,6 +5,9 @@ from twisted.logger import Logger from json import dumps from json.decoder import JSONDecodeError import requests +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.backends import default_backend +import jwt # Project imports from settings import settings @@ -15,9 +18,59 @@ class Revolut(object): Class to handle Revolut API calls. """ - def __init__(self): + def __init__(self, irc): self.log = Logger("revolut") self.token = None + self.irc = irc + + def setup_auth(self): + self.create_new_jwt() + self.get_access_token() + + def create_new_jwt(self): + payload = { + "iss": settings.Revolut.Domain, + "sub": settings.Revolut.ClientID, + "aud": "https://revolut.com", + "exp": int(settings.Revolut.Expiry), + } + with open(settings.Revolut.PrivateKey, "rb") as f: + pem_bytes = f.read() + + # payload = {jwt_header, jwt_body} + private_key = serialization.load_pem_private_key(pem_bytes, password=None, backend=default_backend()) + encoded = jwt.encode(payload, private_key, algorithm="RS256") + settings.Revolut.JWT = encoded + settings.write() + + def get_access_token(self): + headers = {"Content-Type": "application/x-www-form-urlencoded"} + data = { + "grant_type": "authorization_code", + "code": settings.Revolut.AuthCode, + "client_id": settings.Revolut.ClientID, + "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", + "client_assertion": settings.Revolut.JWT, + } + r = requests.post(f"{settings.Revolut.Base}/auth/token", data=data, headers=headers) + try: + parsed = r.json() + except JSONDecodeError: + return False + if r.status_code == 200: + try: + settings.Revolut.RefreshToken = parsed["refresh_token"] + settings.Revolut.SetupToken = "0" + settings.write() + self.log.info("Refreshed refresh token: {refresh_token}", refresh_token=settings.Revolut.RefreshToken) + self.token = parsed["access_token"] + self.log.info("Refreshed access token: {access_token}", access_token=self.token) + except KeyError: + self.log.error(f"Token authorization didn't contain refresh or access token: {parsed}", parsed=parsed) + return False + else: + self.log.error(f"Cannot refresh token: {parsed}", parsed=parsed) + return False def get_new_token(self): headers = {"Content-Type": "application/x-www-form-urlencoded"} @@ -36,9 +89,8 @@ class Revolut(object): if r.status_code == 200: if "access_token" in parsed.keys(): self.token = parsed["access_token"] - if len(self.token) == len(settings.Revolut.RefreshToken): - self.log.info("Refreshed access token: {access_token}", access_token=self.token) - return True + self.log.info("Refreshed access token: {access_token}", access_token=self.token) + return True else: self.log.error(f"Token refresh didn't contain access token: {parsed}", parsed=parsed) return False