You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

118 lines
3.1 KiB

#!/usr/bin/env python3
# Twisted/Klein imports
from twisted.logger import Logger
from twisted.internet import reactor
from klein import Klein
# Other library imports
from json import dumps, loads
from json.decoder import JSONDecodeError
from signal import signal, SIGINT
# Project imports
from settings import settings
import util
# Old style classes
from agora import Agora
from transactions import Transactions
from markets import Markets
from money import Money
# New style classes
import sinks
import ux
# Module-level registry of component instances. It is None at import time;
# the __main__ block of this file rebinds it to a name -> instance dict.
init_map = None
# TODO: extend this with more
# SIGINT handler, registered via signal() right after the definition.
# Takes the (signal number, stack frame) pair that signal handlers receive.
# The visible code only checks whether init_map has been populated.
# NOTE(review): this copy of the function is incomplete and unindented -- the
# `if` has no body and the `except` has no matching `try:`, so the actual
# cleanup steps are not visible here. Restore the missing lines from version
# control before relying on or modifying this handler.
def cleanup(sig, frame):
if init_map:
# Bare except that swallows every error; presumably deliberate best-effort
# shutdown (note the noqa markers) -- confirm once the try body is restored.
except: # noqa
pass # noqa
signal(SIGINT, cleanup) # Handle Ctrl-C and run the cleanup routine
def convert(data):
    """Recursively decode ``bytes`` found in *data* into ``str``.

    Only three container/value kinds are rewritten:

    * ``bytes``  -> decoded as ASCII text
    * ``dict``   -> new ``dict`` with both keys and values converted
    * ``tuple``  -> new ``tuple`` with every element converted

    Anything else (including ``list``) is returned unchanged.

    Args:
        data: Arbitrary value, typically a structure holding ``bytes``.

    Returns:
        The converted value. Tuples come back as real tuples, not as lazy
        ``map`` iterators, so the result can be indexed, compared and
        iterated more than once.

    Raises:
        UnicodeDecodeError: If a ``bytes`` value contains non-ASCII bytes.
    """
    if isinstance(data, bytes):
        return data.decode("ascii")
    if isinstance(data, dict):
        return {convert(key): convert(value) for key, value in data.items()}
    if isinstance(data, tuple):
        # Under Python 3 ``map`` is a one-shot iterator; materialise it so the
        # caller gets back the same kind of container that was passed in.
        return tuple(convert(item) for item in data)
    return data
# HTTP front end built on Klein: each method decorated with @app.route is
# registered as a handler on the class-level Klein instance `app`.
# NOTE(review): this copy of the class has lost its indentation and several
# lines (docstring quotes, a `try:`, the right-hand side of an assignment, the
# start of a logging call), so it does not parse as shown. Restore it from
# version control before editing; the comments below describe only what is
# visible here.
class WebApp(object):
Our Klein webapp.
# Klein application object that the route decorators below attach to.
app = Klein()
def __init__(self):
# Logger created with the name "webapp"; used by callback() for errors.
self.log = Logger("webapp")
# POST /callback: parses `content` as JSON. On JSONDecodeError it logs the
# raw content and returns JSON `false`; the last visible statement returns
# JSON `true`.
@app.route("/callback", methods=["POST"])
def callback(self, request):
# NOTE(review): right-hand side is missing in this copy; presumably the
# request body is read here -- confirm against the original file.
content =
parsed = loads(content)
except JSONDecodeError:
self.log.error("Failed to parse JSON callback: {content}", content=content)
# NOTE(review): two statements appear fused on the next line -- the
# `return dumps(False)` of the error path and the tail of what looks like a
# log call reporting parsed["data"]["id"]. The call's opening is missing.
return dumps(False)"Callback received: {parsed}", parsed=parsed["data"]["id"])
# self.tx.transaction(parsed)
return dumps(True)
# set up another connection to a bank
# GET /signin: asks self.truelayer for an auth URL and returns an HTML snippet
# linking to it (target="_blank").
# NOTE(review): self.truelayer is never assigned in this class as shown;
# presumably it is attached by the merge step in __main__ -- confirm.
# NOTE(review): auth_url is interpolated into the href without HTML escaping.
@app.route("/signin", methods=["GET"])
def signin(self, request):
auth_url = self.truelayer.create_auth_url()
return f'Please sign in <a href="{auth_url}" target="_blank">here.</a>'
# endpoint called after we finish setting up a connection above
# POST /callback-truelayer: looks up the b"code" entry of request.args and
# returns JSON `true`. A request without that entry will raise at the lookup.
# NOTE(review): `code` is not used by any visible line -- either the line
# that consumes it is missing from this copy or the value is never acted on.
@app.route("/callback-truelayer", methods=["POST"])
def signin_callback(self, request):
code = request.args[b"code"]
return dumps(True)
# GET /accounts: returns whatever self.truelayer.get_accounts() yields,
# serialised as JSON with two-space indentation. The method is named `balance`
# although the route is /accounts.
@app.route("/accounts", methods=["GET"])
def balance(self, request):
accounts = self.truelayer.get_accounts()
return dumps(accounts, indent=2)
# Script entry point: builds one instance of each component, keyed by a short
# name, and binds the mapping to the module-level init_map that cleanup()
# checks.
# NOTE(review): this copy is unindented and missing lines -- the dict literal
# is never closed, the merge step and the loop/if bodies are absent, and the
# final line is only the tail of a call. Restore from version control before
# editing; the comments below describe only what is visible.
if __name__ == "__main__":
init_map = {
"ux": ux.UX(),
"agora": Agora(),
"markets": Markets(),
"sinks": sinks.Sinks(),
"tx": Transactions(),
"webapp": WebApp(),
"money": Money(),
# Merge all classes into each other
# NOTE(review): the code performing the merge is not present in this copy.
# Let the classes know they have been merged
for class_name, class_instance in init_map.items():
# Only instances exposing an `__xmerged__` attribute are handled; what is
# done with it is not visible here (body missing).
if hasattr(class_instance, "__xmerged__"):
# Set up the loops to put data in ES
# Run the WebApp
# NOTE(review): truncated call -- only its last two arguments remain. 8080 is
# presumably the listen port; confirm against the original file.
init_map["webapp"], 8080)