# Source: pluto/handler/revolut.py
# (file-viewer metadata from the original listing: 55 lines, 2.0 KiB, Python)

# Twisted/Klein imports
from twisted.logger import Logger
# Other library imports
from json import dumps
import requests
# Project imports
from settings import settings
class Revolut(object):
    """
    Class to handle Revolut API calls.

    Configuration is read from the project-wide ``settings`` object
    (``api_base``, ``client_id``, ``jwt``, ``refresh_token``,
    ``webhook_url``); a refreshed ``access_token`` is written back to it.
    """

    # Seconds to wait for the API before ``requests`` gives up. Without a
    # timeout a stalled connection would block the caller indefinitely.
    request_timeout = 30

    def __init__(self):
        self.log = Logger("revolut")

    @staticmethod
    def _decode_body(response):
        """Return the JSON payload of *response*, or its raw text if the body is not JSON."""
        try:
            return response.json()
        except ValueError:
            # requests raises a ValueError subclass when the body is not
            # valid JSON; fall back to the text so it can still be logged.
            return response.text

    def get_new_token(self):
        """
        Exchange the stored refresh token for a new access token.

        Posts a ``refresh_token`` grant to ``{api_base}/auth/token`` and, on
        success, stores the returned token in ``settings.access_token``.

        Returns:
            True if the response had status 200 and contained an
            ``access_token``; False otherwise (the failure is logged).

        Raises:
            requests.RequestException: if the HTTP request itself fails
                (connection error, timeout, ...).
        """
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = {
            "grant_type": "refresh_token",
            "refresh_token": settings.refresh_token,
            "client_id": settings.client_id,
            "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            "client_assertion": settings.jwt,
        }
        r = requests.post(
            f"{settings.api_base}/auth/token",
            data=data,
            headers=headers,
            timeout=self.request_timeout,
        )
        parsed = self._decode_body(r)

        if r.status_code != 200:
            # Plain (non-f) string: the Twisted logger fills in {parsed}
            # itself. Pre-formatting would leave the payload's own braces in
            # the format string and break the logger's formatting.
            self.log.error("Cannot refresh token: {parsed}", parsed=parsed)
            return False

        if not isinstance(parsed, dict) or "access_token" not in parsed:
            self.log.error("Token refresh didn't contain access token: {parsed}", parsed=parsed)
            return False

        settings.access_token = parsed["access_token"]
        # NOTE(review): this writes the bearer token to the log in clear
        # text; consider dropping or masking it.
        self.log.info("Refreshed access token: {access_token}", access_token=settings.access_token)
        return True

    def setup_webhook(self):
        """
        Register ``settings.webhook_url`` as the webhook endpoint.

        Posts the URL as a JSON document to ``{api_base}/webhook`` using the
        current ``settings.access_token`` as bearer token. A 204 response is
        treated as success.

        Returns:
            The JSON string ``{"success": true}`` on success; None on
            failure (the response body is logged).

        Raises:
            requests.RequestException: if the HTTP request itself fails
                (connection error, timeout, ...).
        """
        self.log.info("Setting up webhook: {url}", url=settings.webhook_url)
        headers = {"Authorization": f"Bearer {settings.access_token}"}
        data = {"url": settings.webhook_url}
        r = requests.post(
            f"{settings.api_base}/webhook",
            data=dumps(data),
            headers=headers,
            timeout=self.request_timeout,
        )
        if r.status_code == 204:
            self.log.info("Set up webhook: {url}", url=settings.webhook_url)
            return dumps({"success": True})

        # Error bodies are not guaranteed to be JSON, so decode defensively.
        parsed = self._decode_body(r)
        self.log.error("Failed setting up webhook: {parsed}", parsed=parsed)
        return None