Move code into classes
This commit is contained in:
parent
ffd035dfbf
commit
57e5616531
|
@ -1,96 +1,58 @@
|
|||
#!/usr/sbin/env python3
|
||||
# Twisted/Klein imports
|
||||
from twisted.logger import Logger
|
||||
from twisted.internet import reactor
|
||||
from twisted.internet.task import LoopingCall, deferLater
|
||||
from klein import Klein
|
||||
|
||||
# Other library imports
|
||||
from json import dumps, loads
|
||||
from json.decoder import JSONDecodeError
|
||||
|
||||
import pprint
|
||||
import requests
|
||||
|
||||
from settings import refresh_token, client_id, jwt
|
||||
|
||||
token_refresh_sec = 30
|
||||
|
||||
api_base = "https://sandbox-b2b.revolut.com/api/1.0"
|
||||
webhook_url = "https://callback-sandbox.pathogen.is/callback"
|
||||
|
||||
# SYSTEM VARIABLES BELOW #
|
||||
app = Klein()
|
||||
|
||||
access_token = ""
|
||||
|
||||
log = Logger()
|
||||
|
||||
pp = pprint.PrettyPrinter(indent=2)
|
||||
# Project imports
|
||||
from settings import settings
|
||||
from revolut import Revolut
|
||||
|
||||
|
||||
def get_new_token():
|
||||
headers = {"Content-Type": "application/x-www-form-urlencoded"}
|
||||
data = {
|
||||
"grant_type": "refresh_token",
|
||||
"refresh_token": refresh_token,
|
||||
"client_id": client_id,
|
||||
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
|
||||
"client_assertion": jwt,
|
||||
}
|
||||
r = requests.post(f"{api_base}/auth/token", data=data, headers=headers)
|
||||
parsed = r.json()
|
||||
if r.status_code == 200:
|
||||
if "access_token" in parsed.keys():
|
||||
access_token = parsed["access_token"]
|
||||
if len(access_token) == len(refresh_token):
|
||||
log.info("Refreshed access token")
|
||||
return True
|
||||
else:
|
||||
log.error(f"Token refresh didn't contain access token: {parsed}", parsed=parsed)
|
||||
return False
|
||||
else:
|
||||
log.error(f"Cannot refresh token: {parsed}", parsed=parsed)
|
||||
return False
|
||||
class WebApp(object):
|
||||
"""
|
||||
Our Klein webapp.
|
||||
"""
|
||||
|
||||
app = Klein()
|
||||
|
||||
def transaction(sender, source_currency, dest_currency, source_amount, dest_amount):
|
||||
pass
|
||||
def __init__(self):
|
||||
self.revolut = Revolut()
|
||||
self.log = Logger("webapp")
|
||||
|
||||
def setup_webhook():
|
||||
log.info("Setting up webhook")
|
||||
headers = {"Authorization": f"Bearer {access_token}"}
|
||||
data = {
|
||||
"url": webhook_url
|
||||
}
|
||||
r = requests.post (f"{api_base}/webhook", data=data, headers=headers)
|
||||
parsed = r.json()
|
||||
if r.status_code == 200:
|
||||
log.info("Set up webhook")
|
||||
@app.route("/refresh", methods=["GET"])
|
||||
def refresh(self, request):
|
||||
rtrn = self.revolut.get_new_token()
|
||||
return dumps({"success": rtrn})
|
||||
|
||||
@app.route("/callback", methods=["POST"])
|
||||
def callback(self, request):
|
||||
content = request.content.read()
|
||||
try:
|
||||
parsed = loads(content)
|
||||
except JSONDecodeError:
|
||||
self.log.error("Failed to parse JSON callback: {content}", content=content)
|
||||
return dumps({"success": False})
|
||||
self.log.info("Callback received: {parsed}", parsed=parsed)
|
||||
return dumps({"success": True})
|
||||
else:
|
||||
log.info("Failed setting up webhook")
|
||||
|
||||
|
||||
@app.route("/refresh", methods=["GET"])
|
||||
def refresh(request):
|
||||
rtrn = get_new_token()
|
||||
return dumps({"success": rtrn})
|
||||
def start(webapp):
|
||||
"""
|
||||
Schedule to refresh the API token once the reactor starts, and create LoopingCapp to refresh it periodically.
|
||||
"""
|
||||
deferLater(reactor, 1, webapp.revolut.get_new_token)
|
||||
deferLater(reactor, 4, webapp.revolut.setup_webhook)
|
||||
lc = LoopingCall(webapp.revolut.get_new_token)
|
||||
lc.start(settings.token_refresh_sec)
|
||||
|
||||
|
||||
@app.route("/callback", methods=["POST"])
|
||||
def callback(request):
|
||||
content = request.content.read()
|
||||
try:
|
||||
parsed = loads(content)
|
||||
except JSONDecodeError:
|
||||
log.error("Failed to parse JSON callback: {content}", content=content)
|
||||
return dumps({"success": False})
|
||||
log.info("Callback received: {parsed}", parsed=parsed)
|
||||
return dumps({"success": True})
|
||||
|
||||
|
||||
# Set up loop to refresh token, but get one first
|
||||
deferLater(reactor, 1, get_new_token)
|
||||
deferLater(reactor, 2, setup_webhook)
|
||||
lc = LoopingCall(get_new_token)
|
||||
lc.start(token_refresh_sec)
|
||||
resource = app.resource
|
||||
app.run("127.0.0.1", 8080)
|
||||
if __name__ == "__main__":
|
||||
webapp = WebApp()
|
||||
start(webapp)
|
||||
webapp.app.run("127.0.0.1", 8080)
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
# Twisted/Klein imports
|
||||
from twisted.logger import Logger
|
||||
|
||||
# Other library imports
|
||||
from json import dumps
|
||||
import requests
|
||||
|
||||
# Project imports
|
||||
from settings import settings
|
||||
|
||||
|
||||
class Revolut(object):
|
||||
"""
|
||||
Class to handle Revolut API calls.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.log = Logger("revolut")
|
||||
|
||||
def get_new_token(self):
|
||||
headers = {"Content-Type": "application/x-www-form-urlencoded"}
|
||||
data = {
|
||||
"grant_type": "refresh_token",
|
||||
"refresh_token": settings.refresh_token,
|
||||
"client_id": settings.client_id,
|
||||
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
|
||||
"client_assertion": settings.jwt,
|
||||
}
|
||||
r = requests.post(f"{settings.api_base}/auth/token", data=data, headers=headers)
|
||||
parsed = r.json()
|
||||
if r.status_code == 200:
|
||||
if "access_token" in parsed.keys():
|
||||
settings.access_token = parsed["access_token"]
|
||||
if len(settings.access_token) == len(settings.refresh_token):
|
||||
self.log.info("Refreshed access token: {access_token}", access_token=settings.access_token)
|
||||
return True
|
||||
else:
|
||||
self.log.error(f"Token refresh didn't contain access token: {parsed}", parsed=parsed)
|
||||
return False
|
||||
else:
|
||||
self.log.error(f"Cannot refresh token: {parsed}", parsed=parsed)
|
||||
return False
|
||||
|
||||
def setup_webhook(self):
|
||||
self.log.info("Setting up webhook: {url}", url=settings.webhook_url)
|
||||
headers = {"Authorization": f"Bearer {settings.access_token}"}
|
||||
data = {"url": settings.webhook_url}
|
||||
r = requests.post(f"{settings.api_base}/webhook", data=dumps(data), headers=headers)
|
||||
if r.status_code == 204:
|
||||
self.log.info("Set up webhook: {url}", url=settings.webhook_url)
|
||||
return dumps({"success": True})
|
||||
else:
|
||||
parsed = r.json()
|
||||
self.log.info("Failed setting up webhook: {parsed}", parsed=parsed)
|
|
@ -0,0 +1,13 @@
|
|||
from types import SimpleNamespace
|
||||
|
||||
pre_settings = {
|
||||
"token_refresh_sec": 100,
|
||||
"api_base": "https://sandbox-b2b.revolut.com/api/1.0",
|
||||
"webhook_url": "https://callback-sandbox.pathogen.is/callback",
|
||||
"refresh_token": "",
|
||||
"access_token": "",
|
||||
"client_id": "",
|
||||
"jwt": "",
|
||||
}
|
||||
|
||||
settings = SimpleNamespace(**pre_settings)
|
Loading…
Reference in New Issue