From 4a9e6116511cc1c7359d9300da343929d86dd645 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Thu, 27 Jan 2022 13:45:08 +0000 Subject: [PATCH] Implement updating ads with exponential backoff --- handler/agora.py | 54 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 51 insertions(+), 3 deletions(-) diff --git a/handler/agora.py b/handler/agora.py index 9a4218d..844cef5 100644 --- a/handler/agora.py +++ b/handler/agora.py @@ -10,6 +10,7 @@ from agoradesk_py import AgoraDesk from httpx import ReadTimeout, ReadError from pycoingecko import CoinGeckoAPI from datetime import datetime +from time import sleep # Project imports from settings import settings @@ -316,6 +317,9 @@ class Agora(object): if coin == "bitcoin": coin = "bitcoins" ads = self.agora._api_call(api_method=f"buy-{coin}-online/{currency}/REVOLUT", query_values={"page": page}) + if ads is None: + print("public ads is none for", coin, currency) + return False if ads is False: print("public ads is false for", coin, currency) return False @@ -352,6 +356,9 @@ class Agora(object): print("wrap public ads is false for", asset, currency) return False ads = list(ads_obj) + if ads_obj is False: + print("wrap public ads is false2 for", asset, currency) + return False if not rates: # Set the price based on the asset base_currency_price = self.cg.get_price(ids=coin, vs_currencies=currency)[coin][currency.lower()] @@ -372,6 +379,7 @@ class Agora(object): @handle_exceptions def update_prices(self, xmr=True, btc=True): our_ads = self.enum_ads() + to_update = [] if our_ads is None: print("our ads is none for", xmr, btc) return False @@ -434,12 +442,50 @@ class Agora(object): # Don't waste API rate limits on setting the same margin as before if new_margin != our_margin: - rtrn = self.agora.ad_equation(ad_id, new_formula) - if not rtrn["success"]: - self.log.error("Error updating ad {ad_id}: {response}", ad_id=ad_id, response=rtrn["response"]) + # rtrn = self.agora.ad_equation(ad_id, new_formula) + to_update.append([ad_id, new_formula, False]) + # if not rtrn["success"]: + # self.log.error("Error updating ad {ad_id}: {response}", ad_id=ad_id, response=rtrn["response"]) self.log.info("Rate for {currency}: {margin}", currency=currency, margin=new_margin) else: self.log.info("Not changed rate for {currency}, keeping old margin of {margin}", currency=currency, margin=our_margin) + self.slow_ad_update(to_update) + + def slow_ad_update(self, ads): + """ + Slow ad equation update utilising exponential backoff in order to guarantee all ads are updated. + :param ads: our list of ads + """ + self.log.info("Beginning slow ad update for {num} ads", num=len(ads)) + iterations = 0 + throttled = 0 + while not all([x[2] for x in ads]) or iterations == 1000: + for ad_index in range(len(ads)): + ad_id, new_formula, actioned = ads[ad_index] + if not actioned: + rtrn = self.agora.ad_equation(ad_id, new_formula) + if rtrn["success"]: + ads[ad_index][2] = True + throttled = 0 + self.log.info("Successfully updated ad: {id}", id=ad_id) + continue + else: + if rtrn["response"]["error"]["error_code"] == 429: + throttled += 1 + sleep_time = pow(throttled, float(settings.Agora.SleepExponent)) + self.log.info( + "Throttled {x} times while updating {id}, sleeping for {sleep} seconds", + x=throttled, + id=ad_id, + sleep=sleep_time, + ) + # We're running in a thread, so this is fine + sleep(sleep_time) + self.log.error("Error updating ad {ad_id}: {response}", ad_id=ad_id, response=rtrn["response"]) + continue + iterations += 1 + self.log.info("Slow ad update completed with {iterations} iterations", iterations=iterations) + self.irc.sendmsg(f"Slow ad update completed with {iterations} iterations") def autoprice(self, ads, currency): """ @@ -721,6 +767,8 @@ class Agora(object): Withdraw excess funds to our XMR/BTC wallets. """ totals_all = self.tx.get_total() + if totals_all is False: + return False wallet_xmr, _ = totals_all[2]