Implement async for expensive Nordigen calls

This commit is contained in:
Mark Veidemanis 2022-05-02 16:33:00 +01:00
parent 620adbe123
commit 9f8718a83b
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
2 changed files with 54 additions and 17 deletions

View File

@ -1,5 +1,6 @@
# Twisted/Klein imports
from twisted.internet.task import LoopingCall
from twisted.internet.defer import inlineCallbacks
# Other library imports
import requests
@ -17,6 +18,7 @@ from lib.serde.nordigen import (
)
from serde import ValidationError
from hashlib import sha256
import treq
# Project imports
from settings import settings
@ -34,20 +36,28 @@ class Nordigen(util.Base):
self.token = None
self.banks = {}
self.authed = False
self.requisitions = None
# Get the banks from the config and cache them
self.log.debug("Getting mapped accounts.")
self.get_mapped_accounts()
self.log.debug("Finished getting mapped accounts.")
self.log.debug("Creating loop to get access token.")
self.lc = LoopingCall(self.get_access_token)
self.lc.start(int(settings.Nordigen.TokenRefreshSec))
self.log.debug("Finished creating loops.")
def __authed__(self):
"""
Called when we have received the access token.
"""
self.log.info("Connection authenticated.")
account_infos = self.get_all_account_info()
# self.get_requisitions()
d = self.get_all_account_info()
d.addCallback(self.got_all_account_info)
def got_all_account_info(self, account_infos):
# Filter for added accounts since we only do that for TrueLayer
account_infos = {
bank: accounts
@ -64,8 +74,23 @@ class Nordigen(util.Base):
def transaction_loop(self):
for account_id in self.banks:
transactions = self.get_transactions(account_id)
self.sinks.got_transactions("nordigen", account_id, transactions)
transactions.addCallback(self.got_transactions, account_id)
def got_transactions(self, transactions, account_id):
self.sinks.got_transactions("nordigen", account_id, transactions)
def generic_deferred(self, response, dest_func):
"""
Generic function to take a treq response and fire a callback with
its content to dest_func.
:param response: a treq response
:param dest_func: function to call with the response data
"""
self.log.debug(f"Generic deferred received: {response}")
content = response.content()
content.addCallback(dest_func)
@inlineCallbacks
def get_access_token(self):
"""
Get an access token.
@ -81,9 +106,11 @@ class Nordigen(util.Base):
"secret_key": settings.Nordigen.Key,
}
path = f"{settings.Nordigen.Base}/token/new/"
r = requests.post(path, headers=headers, data=dumps(data))
self.log.debug("Getting new access token.")
d = yield treq.post(path, headers=headers, data=data)
content = yield d.content()
try:
obj = AccessToken.from_json(r.content)
obj = AccessToken.from_json(content)
except ValidationError as err:
self.log.error(f"Validation error: {err}")
return
@ -168,6 +195,7 @@ class Nordigen(util.Base):
return False
return link
@inlineCallbacks
def get_requisitions(self):
"""
Get a list of active accounts.
@ -177,9 +205,10 @@ class Nordigen(util.Base):
"Authorization": f"Bearer {self.token}",
}
path = f"{settings.Nordigen.Base}/requisitions"
r = requests.get(path, headers=headers)
d = yield treq.get(path, headers=headers)
content = yield d.content()
try:
obj = Requisitions.from_json(r.content)
obj = Requisitions.from_json(content)
except ValidationError as err:
self.log.error(f"Validation error: {err}")
return
@ -209,6 +238,7 @@ class Nordigen(util.Base):
parsed = obj.to_dict()
return parsed
@inlineCallbacks
def get_accounts(self, requisition):
"""
Get a list of accounts for a requisition.
@ -218,9 +248,10 @@ class Nordigen(util.Base):
"Authorization": f"Bearer {self.token}",
}
path = f"{settings.Nordigen.Base}/requisitions/{requisition}/"
r = requests.get(path, headers=headers)
d = yield treq.get(path, headers=headers)
content = yield d.content()
try:
obj = Agreement.from_json(r.content)
obj = Agreement.from_json(content)
except ValidationError as err:
self.log.error(f"Validation error: {err}")
return
@ -236,6 +267,7 @@ class Nordigen(util.Base):
ownernames = loads(settings.Nordigen.OwnerNames)
return ownernames
@inlineCallbacks
def get_account(self, account_id):
"""
Get details of an account.
@ -245,10 +277,10 @@ class Nordigen(util.Base):
"Authorization": f"Bearer {self.token}",
}
path = f"{settings.Nordigen.Base}/accounts/{account_id}/details/"
r = requests.get(path, headers=headers)
print("GET ACCOUNT", account_id, r.content)
d = yield treq.get(path, headers=headers)
content = yield d.content()
try:
obj = AccountDetails.from_json(r.content)
obj = AccountDetails.from_json(content)
except ValidationError as err:
self.log.error(f"Validation error: {err}")
return
@ -283,7 +315,7 @@ class Nordigen(util.Base):
existing_entry = loads(settings.Nordigen.Maps)
self.banks = existing_entry
def map_account(self, account_id):
def map_account(self, account_id): # TODO: inlineCallbacks?
"""
Map an account_id at a bank to an account_name.
This enables the account for fetching.
@ -323,18 +355,19 @@ class Nordigen(util.Base):
self.banks = existing_entry
settings.write()
@inlineCallbacks
def get_all_account_info(self):
to_return = {}
requisitions = self.get_requisitions()
requisitions = yield self.get_requisitions()
if not requisitions:
self.log.error("Could not get requisitions.")
return {}
for req in requisitions:
if not req["accounts"]:
continue
accounts = self.get_accounts(req["id"])
accounts = yield self.get_accounts(req["id"])
for account_id in accounts:
account_info = self.get_account(account_id)
account_info = yield self.get_account(account_id)
if not account_info:
continue
if req["institution_id"] in to_return:
@ -366,6 +399,7 @@ class Nordigen(util.Base):
transaction["reference"] = transaction["remittanceInformationUnstructured"]
del transaction["remittanceInformationUnstructured"]
@inlineCallbacks
def get_transactions(self, account_id):
"""
Get all transactions for an account.
@ -378,9 +412,10 @@ class Nordigen(util.Base):
"Authorization": f"Bearer {self.token}",
}
path = f"{settings.Nordigen.Base}/accounts/{account_id}/transactions/"
r = requests.get(path, headers=headers)
d = yield treq.get(path, headers=headers)
content = yield d.content()
try:
obj = TXRoot.from_json(r.content)
obj = TXRoot.from_json(content)
except ValidationError as err:
self.log.error(f"Validation error: {err}")
return

View File

@ -17,8 +17,10 @@ class Sources(util.Base):
self.lbtc = sources.localbitcoins.LBTC()
def __irc_started__(self):
self.log.debug("IRC hook called.")
self.agora.setup_loop()
self.lbtc.setup_loop()
self.log.debug("Finished setting up loops.")
def __xmerged__(self):
"""