# Twisted/Klein imports
from twisted.logger import Logger

# Other library imports
from json import dumps
from json.decoder import JSONDecodeError
import requests
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import jwt

# Project imports
from settings import settings


class Revolut(object):
    """
    Class to handle Revolut API calls.

    Holds the current access token in ``self.token`` and reads/writes the
    longer-lived credentials (JWT, refresh token, auth code) through the
    project ``settings`` object.
    """

    def __init__(self, irc):
        """
        Initialise the client without performing any network calls.

        :param irc: object stored on the instance as ``self.irc``; it is not
            used by any method visible in this part of the file.
        """
        self.log = Logger("revolut")
        # Access token for the Authorization header; set by
        # get_access_token() or get_new_token().
        self.token = None
        self.irc = irc

    def setup_auth(self):
        """
        Run the initial authorisation: sign a new JWT, then exchange the
        configured auth code for a refresh token and an access token.
        """
        self.create_new_jwt()
        self.get_access_token()

    def create_new_jwt(self):
        """
        Build the client-assertion JWT, sign it with the configured private
        key (RS256) and persist it to ``settings.Revolut.JWT``.
        """
        payload = {
            "iss": settings.Revolut.Domain,
            "sub": settings.Revolut.ClientID,
            "aud": "https://revolut.com",
            "exp": int(settings.Revolut.Expiry),
        }
        with open(settings.Revolut.PrivateKey, "rb") as f:
            pem_bytes = f.read()
        # The key file is loaded as an unencrypted PEM (password=None).
        private_key = serialization.load_pem_private_key(
            pem_bytes, password=None, backend=default_backend()
        )
        encoded = jwt.encode(payload, private_key, algorithm="RS256")
        # PyJWT 1.x returns bytes while 2.x returns str; always store text so
        # the persisted setting has the same form on either version.
        if isinstance(encoded, bytes):
            encoded = encoded.decode("ascii")
        settings.Revolut.JWT = encoded
        settings.write()

    def _request_token(self, data):
        """
        POST a form-encoded request to the token endpoint.

        :param data: dict of form fields to send.
        :return: the decoded JSON body when the endpoint answers HTTP 200,
            otherwise ``None`` (the reason is logged).
        """
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        r = requests.post(f"{settings.Revolut.Base}/auth/token", data=data, headers=headers)
        try:
            parsed = r.json()
        except JSONDecodeError:
            self.log.error(
                "Token endpoint returned a non-JSON response (HTTP {status})",
                status=r.status_code,
            )
            return None
        if r.status_code != 200:
            # Plain (non-f) string: the Logger substitutes {parsed} itself.
            # Pre-formatting it would make the braces of the dict repr be
            # parsed as format fields.
            self.log.error("Cannot refresh token: {parsed}", parsed=parsed)
            return None
        return parsed

    def get_access_token(self):
        """
        Exchange the configured authorisation code for a refresh token and an
        access token.

        On success the refresh token is written to settings, ``SetupToken`` is
        reset to ``"0"`` and the access token is stored in ``self.token``.

        :return: ``True`` on success, ``False`` otherwise.
        """
        data = {
            "grant_type": "authorization_code",
            "code": settings.Revolut.AuthCode,
            "client_id": settings.Revolut.ClientID,
            "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            "client_assertion": settings.Revolut.JWT,
        }
        parsed = self._request_token(data)
        if parsed is None:
            return False
        try:
            settings.Revolut.RefreshToken = parsed["refresh_token"]
            settings.Revolut.SetupToken = "0"
            settings.write()
            # NOTE(review): the next two log calls write live credentials to
            # the log; confirm that is intended.
            self.log.info("Refreshed refresh token: {refresh_token}", refresh_token=settings.Revolut.RefreshToken)
            self.token = parsed["access_token"]
            self.log.info("Refreshed access token: {access_token}", access_token=self.token)
        except KeyError:
            self.log.error("Token authorization didn't contain refresh or access token: {parsed}", parsed=parsed)
            return False
        return True

    def get_new_token(self):
        """
        Use the stored refresh token to obtain a new access token and store
        it in ``self.token``.

        :return: ``True`` on success, ``False`` otherwise.
        """
        data = {
            "grant_type": "refresh_token",
            "refresh_token": settings.Revolut.RefreshToken,
            "client_id": settings.Revolut.ClientID,
            "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            "client_assertion": settings.Revolut.JWT,
        }
        parsed = self._request_token(data)
        if parsed is None:
            return False
        if "access_token" not in parsed:
            self.log.error("Token refresh didn't contain access token: {parsed}", parsed=parsed)
            return False
        self.token = parsed["access_token"]
        # NOTE(review): this writes a live credential to the log; confirm
        # that is intended.
        self.log.info("Refreshed access token: {access_token}", access_token=self.token)
        return True

    def setup_webhook(self):
        """
        Register ``settings.Revolut.WebhookURL`` as the webhook URL, using the
        current access token as a bearer token.

        :return: the JSON string ``{"success": true}`` when the API answers
            HTTP 204. For any other status the response is logged; no value
            is returned on that path in the code visible here.
        """
        self.log.info("Setting up webhook: {url}", url=settings.Revolut.WebhookURL)
        headers = {"Authorization": f"Bearer {self.token}"}
        data = {"url": settings.Revolut.WebhookURL}
        r = requests.post(f"{settings.Revolut.Base}/webhook", data=dumps(data), headers=headers)
        if r.status_code == 204:
            self.log.info("Set up webhook: {url}", url=settings.Revolut.WebhookURL)
            return dumps({"success": True})
        else:
            try:
                parsed = r.json()
            except JSONDecodeError:
                # An error response is not guaranteed to carry a JSON body;
                # fall back to the raw text so the failure is still logged.
                parsed = r.text
            self.log.info("Failed setting up webhook: {parsed}", parsed=parsed)