# pluto/handler/revolut.py -- 60 lines, 2.2 KiB, Python

# Twisted/Klein imports
from twisted.logger import Logger
# Other library imports
from json import dumps
from json.decoder import JSONDecodeError
import requests
# Project imports
from settings import settings
class Revolut(object):
    """
    Class to handle Revolut API calls.

    ``self.token`` holds the current access token (``None`` until
    ``get_new_token`` stores one) and ``self.log`` is a Twisted ``Logger``.
    Endpoints and credentials are read from ``settings.Revolut``.
    """

    # Seconds to wait on the Revolut API. ``requests`` applies no timeout by
    # default, so without this a stalled connection would block the caller
    # indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self) -> None:
        self.log = Logger("revolut")
        self.token = None

    def get_new_token(self) -> bool:
        """
        Exchange the configured refresh token for a new access token.

        POSTs the refresh-token grant to ``{settings.Revolut.Base}/auth/token``
        and, when the reply carries an ``access_token``, stores it in
        ``self.token``.

        Returns ``True`` when the token was refreshed and passed the length
        check, ``False`` on any failure (non-JSON body, non-200 status,
        missing token, unexpected token length). Network errors raised by
        ``requests`` (including timeouts) propagate to the caller.
        """
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = {
            "grant_type": "refresh_token",
            "refresh_token": settings.Revolut.RefreshToken,
            "client_id": settings.Revolut.ClientID,
            "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            "client_assertion": settings.Revolut.JWT,
        }
        r = requests.post(
            f"{settings.Revolut.Base}/auth/token",
            data=data,
            headers=headers,
            timeout=self.REQUEST_TIMEOUT,
        )
        try:
            parsed = r.json()
        except ValueError:
            # ValueError is the common base of every JSON decode error that
            # ``Response.json()`` can raise (stdlib json, simplejson, and
            # requests' own JSONDecodeError).
            self.log.error(
                "Token refresh returned a non-JSON body (HTTP {status})",
                status=r.status_code,
            )
            return False

        # Twisted's Logger does its own ``{field}`` formatting, so the format
        # strings below must be plain strings: an f-string would paste the
        # dict repr (braces included) into the format and break rendering.
        if r.status_code != 200:
            self.log.error("Cannot refresh token: {parsed}", parsed=parsed)
            return False

        token = parsed.get("access_token") if isinstance(parsed, dict) else None
        if not isinstance(token, str):
            self.log.error(
                "Token refresh didn't contain access token: {parsed}",
                parsed=parsed,
            )
            return False

        # The token is stored before the length check, as the original code
        # did, so callers that ignore the return value still see it.
        self.token = token

        # NOTE(review): this treats "same length as the refresh token" as the
        # validity test for an access token -- confirm this heuristic is
        # actually guaranteed by the API.
        if len(token) != len(settings.Revolut.RefreshToken):
            self.log.error(
                "Refreshed access token has unexpected length: {length}",
                length=len(token),
            )
            return False

        # NOTE(review): this writes the bearer token to the log in clear text;
        # consider redacting it.
        self.log.info("Refreshed access token: {access_token}", access_token=self.token)
        return True

    def setup_webhook(self):
        """
        Register ``settings.Revolut.WebhookURL`` as the webhook endpoint.

        POSTs the URL as a JSON body to ``{settings.Revolut.Base}/webhook``
        using ``self.token`` as the bearer token, so ``get_new_token`` should
        have stored a token first.

        Returns the JSON string ``{"success": true}`` when the API answers
        204, otherwise logs the error payload and returns ``None``. Network
        errors raised by ``requests`` (including timeouts) propagate to the
        caller.
        """
        self.log.info("Setting up webhook: {url}", url=settings.Revolut.WebhookURL)
        headers = {"Authorization": f"Bearer {self.token}"}
        data = {"url": settings.Revolut.WebhookURL}
        r = requests.post(
            f"{settings.Revolut.Base}/webhook",
            data=dumps(data),
            headers=headers,
            timeout=self.REQUEST_TIMEOUT,
        )
        if r.status_code == 204:
            self.log.info("Set up webhook: {url}", url=settings.Revolut.WebhookURL)
            return dumps({"success": True})

        # Error bodies are not guaranteed to be JSON; fall back to the raw
        # text so the failure is logged instead of raising a decode error.
        try:
            parsed = r.json()
        except ValueError:
            parsed = r.text
        self.log.error("Failed setting up webhook: {parsed}", parsed=parsed)
        return None