# Twisted/Klein imports
from twisted.logger import Logger

# Other library imports
from json import dumps
from json.decoder import JSONDecodeError  # retained: existing import, may be used elsewhere
import requests

# Project imports
from settings import settings


class Revolut(object):
    """
    Class to handle Revolut API calls.

    Holds the current OAuth access token in ``self.token`` (``None`` until
    ``get_new_token`` succeeds) and a Twisted ``Logger`` in ``self.log``.
    """

    def __init__(self) -> None:
        """Create the logger and start without an access token."""
        self.log = Logger("revolut")
        self.token = None

    def get_new_token(self) -> bool:
        """
        Exchange the configured refresh token for a new access token.

        POSTs a ``refresh_token`` grant to ``{settings.Revolut.Base}/auth/token``
        and, on success, stores the returned ``access_token`` in ``self.token``.

        Returns:
            True when a new access token was stored, False when the response
            was not JSON, was not HTTP 200, or carried no ``access_token``.
        """
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = {
            "grant_type": "refresh_token",
            "refresh_token": settings.Revolut.RefreshToken,
            "client_id": settings.Revolut.ClientID,
            "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            "client_assertion": settings.Revolut.JWT,
        }
        r = requests.post(f"{settings.Revolut.Base}/auth/token", data=data, headers=headers)

        try:
            parsed = r.json()
        except ValueError:
            # ValueError is the common base of the stdlib and simplejson
            # JSONDecodeError classes, either of which requests may raise.
            self.log.error(
                "Token refresh returned a non-JSON response (HTTP {status})",
                status=r.status_code,
            )
            return False

        if r.status_code != 200:
            # Plain (non-f) string: Logger performs the {parsed} substitution
            # itself; pre-formatting would inject the dict's braces into the
            # format string.
            self.log.error("Cannot refresh token: {parsed}", parsed=parsed)
            return False

        if not isinstance(parsed, dict) or "access_token" not in parsed:
            self.log.error("Token refresh didn't contain access token: {parsed}", parsed=parsed)
            return False

        self.token = parsed["access_token"]
        # NOTE(review): this writes the bearer token to the log, as the
        # original code did — confirm that is acceptable for this deployment.
        self.log.info("Refreshed access token: {access_token}", access_token=self.token)
        return True

    def setup_webhook(self):
        """
        Register ``settings.Revolut.WebhookURL`` as the webhook endpoint.

        POSTs the URL as a JSON body to ``{settings.Revolut.Base}/webhook``
        using ``self.token`` as the bearer token.

        Returns:
            The JSON string ``{"success": true}`` when the API answers
            HTTP 204. For any other status the parsed response is logged.
        """
        self.log.info("Setting up webhook: {url}", url=settings.Revolut.WebhookURL)
        headers = {"Authorization": f"Bearer {self.token}"}
        data = {"url": settings.Revolut.WebhookURL}
        r = requests.post(f"{settings.Revolut.Base}/webhook", data=dumps(data), headers=headers)
        if r.status_code == 204:
            self.log.info("Set up webhook: {url}", url=settings.Revolut.WebhookURL)
            return dumps({"success": True})
        else:
            parsed = r.json()
            # NOTE(review): no explicit return is visible after this log call,
            # so this branch appears to return None — confirm against callers.
            self.log.info("Failed setting up webhook: {parsed}", parsed=parsed)