From 9f8718a83b80b31c5b409f7f3d84f76e43badec5 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Mon, 2 May 2022 16:33:00 +0100 Subject: [PATCH] Implement async for expensive Nordigen calls --- handler/sinks/nordigen.py | 69 ++++++++++++++++++++++++++++--------- handler/sources/__init__.py | 2 ++ 2 files changed, 54 insertions(+), 17 deletions(-) diff --git a/handler/sinks/nordigen.py b/handler/sinks/nordigen.py index a9e7a1c..749efff 100644 --- a/handler/sinks/nordigen.py +++ b/handler/sinks/nordigen.py @@ -1,5 +1,6 @@ # Twisted/Klein imports from twisted.internet.task import LoopingCall +from twisted.internet.defer import inlineCallbacks # Other library imports import requests @@ -17,6 +18,7 @@ from lib.serde.nordigen import ( ) from serde import ValidationError from hashlib import sha256 +import treq # Project imports from settings import settings @@ -34,20 +36,28 @@ class Nordigen(util.Base): self.token = None self.banks = {} self.authed = False + self.requisitions = None # Get the banks from the config and cache them + self.log.debug("Getting mapped accounts.") self.get_mapped_accounts() + self.log.debug("Finished getting mapped accounts.") + self.log.debug("Creating loop to get access token.") self.lc = LoopingCall(self.get_access_token) self.lc.start(int(settings.Nordigen.TokenRefreshSec)) + self.log.debug("Finished creating loops.") def __authed__(self): """ Called when we have received the access token. """ self.log.info("Connection authenticated.") - account_infos = self.get_all_account_info() + # self.get_requisitions() + d = self.get_all_account_info() + d.addCallback(self.got_all_account_info) + def got_all_account_info(self, account_infos): # Filter for added accounts since we only do that for TrueLayer account_infos = { bank: accounts @@ -64,8 +74,23 @@ class Nordigen(util.Base): def transaction_loop(self): for account_id in self.banks: transactions = self.get_transactions(account_id) - self.sinks.got_transactions("nordigen", account_id, transactions) + transactions.addCallback(self.got_transactions, account_id) + def got_transactions(self, transactions, account_id): + self.sinks.got_transactions("nordigen", account_id, transactions) + + def generic_deferred(self, response, dest_func): + """ + Generic function to take a treq response and fire a callback with + its content to dest_func. + :param response: a treq response + :param dest_func: function to call with the response data + """ + self.log.debug(f"Generic deferred received: {response}") + content = response.content() + content.addCallback(dest_func) + + @inlineCallbacks def get_access_token(self): """ Get an access token. @@ -81,9 +106,11 @@ class Nordigen(util.Base): "secret_key": settings.Nordigen.Key, } path = f"{settings.Nordigen.Base}/token/new/" - r = requests.post(path, headers=headers, data=dumps(data)) + self.log.debug("Getting new access token.") + d = yield treq.post(path, headers=headers, data=data) + content = yield d.content() try: - obj = AccessToken.from_json(r.content) + obj = AccessToken.from_json(content) except ValidationError as err: self.log.error(f"Validation error: {err}") return @@ -168,6 +195,7 @@ class Nordigen(util.Base): return False return link + @inlineCallbacks def get_requisitions(self): """ Get a list of active accounts. @@ -177,9 +205,10 @@ class Nordigen(util.Base): "Authorization": f"Bearer {self.token}", } path = f"{settings.Nordigen.Base}/requisitions" - r = requests.get(path, headers=headers) + d = yield treq.get(path, headers=headers) + content = yield d.content() try: - obj = Requisitions.from_json(r.content) + obj = Requisitions.from_json(content) except ValidationError as err: self.log.error(f"Validation error: {err}") return @@ -209,6 +238,7 @@ class Nordigen(util.Base): parsed = obj.to_dict() return parsed + @inlineCallbacks def get_accounts(self, requisition): """ Get a list of accounts for a requisition. @@ -218,9 +248,10 @@ class Nordigen(util.Base): "Authorization": f"Bearer {self.token}", } path = f"{settings.Nordigen.Base}/requisitions/{requisition}/" - r = requests.get(path, headers=headers) + d = yield treq.get(path, headers=headers) + content = yield d.content() try: - obj = Agreement.from_json(r.content) + obj = Agreement.from_json(content) except ValidationError as err: self.log.error(f"Validation error: {err}") return @@ -236,6 +267,7 @@ class Nordigen(util.Base): ownernames = loads(settings.Nordigen.OwnerNames) return ownernames + @inlineCallbacks def get_account(self, account_id): """ Get details of an account. @@ -245,10 +277,10 @@ class Nordigen(util.Base): "Authorization": f"Bearer {self.token}", } path = f"{settings.Nordigen.Base}/accounts/{account_id}/details/" - r = requests.get(path, headers=headers) - print("GET ACCOUNT", account_id, r.content) + d = yield treq.get(path, headers=headers) + content = yield d.content() try: - obj = AccountDetails.from_json(r.content) + obj = AccountDetails.from_json(content) except ValidationError as err: self.log.error(f"Validation error: {err}") return @@ -283,7 +315,7 @@ class Nordigen(util.Base): existing_entry = loads(settings.Nordigen.Maps) self.banks = existing_entry - def map_account(self, account_id): + def map_account(self, account_id): # TODO: inlineCallbacks? """ Map an account_id at a bank to an account_name. This enables the account for fetching. @@ -323,18 +355,19 @@ class Nordigen(util.Base): self.banks = existing_entry settings.write() + @inlineCallbacks def get_all_account_info(self): to_return = {} - requisitions = self.get_requisitions() + requisitions = yield self.get_requisitions() if not requisitions: self.log.error("Could not get requisitions.") return {} for req in requisitions: if not req["accounts"]: continue - accounts = self.get_accounts(req["id"]) + accounts = yield self.get_accounts(req["id"]) for account_id in accounts: - account_info = self.get_account(account_id) + account_info = yield self.get_account(account_id) if not account_info: continue if req["institution_id"] in to_return: @@ -366,6 +399,7 @@ class Nordigen(util.Base): transaction["reference"] = transaction["remittanceInformationUnstructured"] del transaction["remittanceInformationUnstructured"] + @inlineCallbacks def get_transactions(self, account_id): """ Get all transactions for an account. @@ -378,9 +412,10 @@ class Nordigen(util.Base): "Authorization": f"Bearer {self.token}", } path = f"{settings.Nordigen.Base}/accounts/{account_id}/transactions/" - r = requests.get(path, headers=headers) + d = yield treq.get(path, headers=headers) + content = yield d.content() try: - obj = TXRoot.from_json(r.content) + obj = TXRoot.from_json(content) except ValidationError as err: self.log.error(f"Validation error: {err}") return diff --git a/handler/sources/__init__.py b/handler/sources/__init__.py index 47e1de6..70920d7 100644 --- a/handler/sources/__init__.py +++ b/handler/sources/__init__.py @@ -17,8 +17,10 @@ class Sources(util.Base): self.lbtc = sources.localbitcoins.LBTC() def __irc_started__(self): + self.log.debug("IRC hook called.") self.agora.setup_loop() self.lbtc.setup_loop() + self.log.debug("Finished setting up loops.") def __xmerged__(self): """