pluto/handler/agora.py

533 lines
20 KiB
Python
Raw Normal View History

2021-12-24 02:23:38 +00:00
# Twisted/Klein imports
from twisted.logger import Logger
2021-12-27 19:30:03 +00:00
from twisted.internet.task import LoopingCall
2021-12-24 02:23:38 +00:00
# Other library imports
from json import loads
2021-12-24 02:23:38 +00:00
from forex_python.converter import CurrencyRates
2022-01-08 17:49:04 +00:00
from agoradesk_py import AgoraDesk
from httpx import ReadTimeout
2022-01-09 14:34:37 +00:00
from pycoingecko import CoinGeckoAPI
2021-12-24 02:23:38 +00:00
# Project imports
from settings import settings
class Agora(object):
"""
AgoraDesk API handler.
"""
2021-12-27 20:59:24 +00:00
def __init__(self):
2021-12-27 21:33:49 +00:00
"""
Initialise the AgoraDesk and CurrencyRates APIs.
Initialise the last_dash storage for detecting new trades.
"""
2021-12-24 02:23:38 +00:00
self.log = Logger("agora")
2021-12-24 17:26:31 +00:00
self.agora = AgoraDesk(settings.Agora.Token)
2021-12-24 02:23:38 +00:00
self.cr = CurrencyRates()
2022-01-09 14:34:37 +00:00
self.cg = CoinGeckoAPI()
# Cache for detecting new trades
2021-12-27 19:30:03 +00:00
self.last_dash = set()
2021-12-24 02:23:38 +00:00
# Cache for detecting new messages
self.last_messages = {}
2021-12-27 20:59:24 +00:00
def set_irc(self, irc):
self.irc = irc
def set_tx(self, tx):
self.tx = tx
2021-12-27 19:30:03 +00:00
def setup_loop(self):
2021-12-27 21:33:49 +00:00
"""
Set up the LoopingCall to get all active trades and messages.
2021-12-27 21:33:49 +00:00
"""
2022-01-01 16:34:32 +00:00
self.lc_dash = LoopingCall(self.loop_check)
self.lc_dash.start(int(settings.Agora.RefreshSec))
2022-01-11 21:40:55 +00:00
if settings.Agora.Cheat == "1":
self.lc_cheat = LoopingCall(self.update_prices)
self.lc_cheat.start(int(settings.Agora.CheatSec))
2022-01-11 21:40:05 +00:00
2022-01-09 22:11:39 +00:00
def wrap_dashboard(self):
try:
dash = self.agora.dashboard_seller()
except ReadTimeout:
return False
dash_tmp = {}
2021-12-28 14:09:33 +00:00
if "data" not in dash["response"].keys():
self.log.error("Data not in dashboard response: {content}", content=dash)
2022-01-09 22:11:39 +00:00
return dash_tmp
2021-12-27 19:30:03 +00:00
if dash["response"]["data"]["contact_count"] > 0:
for contact in dash["response"]["data"]["contact_list"]:
contact_id = contact["data"]["contact_id"]
dash_tmp[contact_id] = contact
2022-01-09 22:11:39 +00:00
return dash_tmp
def loop_check(self):
"""
Calls hooks to parse dashboard info and get all contact messages.
"""
dash_tmp = self.wrap_dashboard()
2021-12-31 18:21:56 +00:00
2022-01-01 16:34:32 +00:00
# Call dashboard hooks
self.dashboard_hook(dash_tmp)
# Get recent messages
try:
self.get_recent_messages()
except ReadTimeout:
pass
2021-12-27 19:30:03 +00:00
return dash_tmp
2021-12-24 02:23:38 +00:00
2022-01-01 16:34:32 +00:00
def get_dashboard(self):
"""
Get dashboard helper for IRC only.
"""
2022-01-09 22:11:39 +00:00
dash = self.wrap_dashboard()
rtrn = []
if not dash.items():
return
for contact_id, contact in dash.items():
reference = self.tx.tx_to_ref(contact_id)
buyer = contact["data"]["buyer"]["username"]
amount = contact["data"]["amount"]
amount_xmr = contact["data"]["amount_xmr"]
currency = contact["data"]["currency"]
if not contact["data"]["is_selling"]:
continue
rtrn.append(f"{reference}: {buyer} {amount}{currency} {amount_xmr}XMR")
return rtrn
2022-01-01 16:34:32 +00:00
def dashboard_hook(self, dash):
2021-12-27 21:33:49 +00:00
"""
Get information about our open trades.
Post new trades to IRC and cache trades for the future.
2021-12-27 21:33:49 +00:00
"""
current_trades = []
if not dash.items():
return
for contact_id, contact in dash.items():
reference = self.tx.tx_to_ref(contact_id)
2021-12-29 18:40:44 +00:00
if reference:
current_trades.append(reference)
buyer = contact["data"]["buyer"]["username"]
amount = contact["data"]["amount"]
amount_xmr = contact["data"]["amount_xmr"]
currency = contact["data"]["currency"]
if not contact["data"]["is_selling"]:
continue
if reference not in self.last_dash:
2021-12-29 18:40:44 +00:00
reference = self.tx.new_trade(contact_id, buyer, currency, amount, amount_xmr)
if reference:
if reference not in current_trades:
current_trades.append(reference)
2022-01-01 16:34:32 +00:00
# Let us know there is a new trade
self.irc.sendmsg(f"AUTO {reference}: {buyer} {amount}{currency} {amount_xmr}XMR")
2022-01-01 16:34:32 +00:00
# Note that we have seen this reference
self.last_dash.add(reference)
# Purge old trades from cache
for ref in list(self.last_dash): # We're removing from the list on the fly
if ref not in current_trades:
self.last_dash.remove(ref)
2021-12-29 18:40:44 +00:00
if reference and reference not in current_trades:
current_trades.append(reference)
2021-12-29 15:03:47 +00:00
self.tx.cleanup(current_trades)
2021-12-24 02:23:38 +00:00
2021-12-28 13:42:31 +00:00
def dashboard_release_urls(self):
"""
Get information about our open trades.
Post new trades to IRC and cache trades for the future.
:return: human readable list of strings about our trades or False
:rtype: list or bool
"""
dash = self.agora.dashboard_seller()
dash_tmp = []
if "data" not in dash["response"]:
2021-12-28 14:09:33 +00:00
self.log.error("Data not in dashboard response: {content}", content=dash)
2021-12-28 13:42:31 +00:00
return False
if dash["response"]["data"]["contact_count"] > 0:
for contact in dash["response"]["data"]["contact_list"]:
contact_id = contact["data"]["contact_id"]
buyer = contact["data"]["buyer"]["username"]
amount = contact["data"]["amount"]
amount_xmr = contact["data"]["amount_xmr"]
currency = contact["data"]["currency"]
release_url = contact["actions"]["release_url"]
if not contact["data"]["is_selling"]:
continue
reference = self.tx.tx_to_ref(contact_id)
if not reference:
reference = "not_set"
dash_tmp.append(f"{reference}: {buyer} {amount}{currency} {amount_xmr}XMR {release_url}")
2021-12-28 13:42:31 +00:00
return dash_tmp
def get_recent_messages(self, send_irc=True):
2021-12-27 21:33:49 +00:00
"""
Get recent messages.
2021-12-27 21:33:49 +00:00
"""
messages_tmp = {}
messages = self.agora.recent_messages()
if not messages["success"]:
2021-12-30 14:15:24 +00:00
return False
2022-01-11 21:16:49 +00:00
if not "data" in messages["response"]:
self.log.error("Data not in messages response: {content}", content=messages["response"])
return False
open_tx = self.tx.get_ref_map().keys()
for message in messages["response"]["data"]["message_list"]:
contact_id = message["contact_id"]
username = message["sender"]["username"]
msg = message["msg"]
if contact_id not in open_tx:
continue
reference = self.tx.tx_to_ref(contact_id)
if reference in messages_tmp:
messages_tmp[reference].append([username, msg])
else:
messages_tmp[reference] = [[username, msg]]
# Send new messages on IRC
if send_irc:
for user, message in messages_tmp[reference]:
if reference in self.last_messages:
if not [user, message] in self.last_messages[reference]:
self.irc.sendmsg(f"AUTO {reference}: ({user}) {message}")
# Append sent messages to last_messages so we don't send them again
self.last_messages[reference].append([user, message])
else:
self.last_messages[reference] = [[user, message]]
for x in messages_tmp[reference]:
self.irc.sendmsg(f"NEW {reference}: ({user}) {message}")
# Purge old trades from cache
for ref in list(self.last_messages): # We're removing from the list on the fly
if ref not in messages_tmp:
del self.last_messages[ref]
return messages_tmp
2021-12-24 02:23:38 +00:00
2021-12-30 14:15:24 +00:00
def enum_ad_ids(self, page=0):
ads = self.agora._api_call(api_method="ads", query_values={"page": page})
ads_total = []
if not ads["success"]:
return False
for ad in ads["response"]["data"]["ad_list"]:
ads_total.append(ad["data"]["ad_id"])
2021-12-31 00:39:52 +00:00
if "pagination" in ads["response"]:
if "next" in ads["response"]["pagination"]:
page += 1
for ad in self.enum_ad_ids(page):
ads_total.append(ad)
return ads_total
def enum_ads(self, page=0):
ads = self.agora._api_call(api_method="ads", query_values={"page": page})
ads_total = []
if not ads["success"]:
return False
for ad in ads["response"]["data"]["ad_list"]:
ads_total.append([ad["data"]["ad_id"], ad["data"]["countrycode"], ad["data"]["currency"]])
if "pagination" in ads["response"]:
if "next" in ads["response"]["pagination"]:
page += 1
for ad in self.enum_ads(page):
ads_total.append([ad[0], ad[1], ad[2]])
2021-12-30 14:15:24 +00:00
return ads_total
2022-01-09 17:52:26 +00:00
def enum_public_ads(self, currency, page=0):
ads = self.agora._api_call(api_method=f"buy-monero-online/{currency}/REVOLUT", query_values={"page": page})
2022-01-09 14:34:37 +00:00
if not ads["success"]:
return False
for ad in ads["response"]["data"]["ad_list"]:
if ad["data"]["online_provider"] == "REVOLUT":
yield [ad["data"]["ad_id"], ad["data"]["profile"]["username"], ad["data"]["temp_price"]]
if "pagination" in ads["response"]:
if "next" in ads["response"]["pagination"]:
page += 1
2022-01-09 17:52:26 +00:00
for ad in self.enum_public_ads(currency, page):
2022-01-09 14:34:37 +00:00
yield [ad[0], ad[1], ad[2]]
2022-01-09 17:52:26 +00:00
def wrap_public_ads(self, currency, rates=None):
2022-01-09 14:34:37 +00:00
"""
Wrapper to sort public ads.
"""
2022-01-09 17:52:26 +00:00
ads = list(self.enum_public_ads(currency.upper()))
2022-01-09 14:34:37 +00:00
if not rates:
base_monero_price = self.cg.get_price(ids="monero", vs_currencies=currency)["monero"][currency.lower()]
else:
base_monero_price = rates
for ad in ads:
price = float(ad[2])
rate = round(price / base_monero_price, 2)
ad.append(rate)
ads.sort(key=lambda x: float(x[2]))
return ads
def sort_ads_annotate_place(self, ads):
ads.sort(key=lambda x: float(x[2]))
2022-01-09 17:52:26 +00:00
place = None
2022-01-09 14:34:37 +00:00
for index, ad in enumerate(ads):
if ad[1] == settings.Agora.Username:
place = index
break
2022-01-09 17:52:26 +00:00
if place is None:
print("Place is none for ads:", ads)
2022-01-09 14:34:37 +00:00
return (ads, place)
def update_prices(self):
our_ads = self.enum_ads()
currencies = [x[2].lower() for x in our_ads]
2022-01-09 17:52:26 +00:00
public_ad_dict = {}
2022-01-09 14:34:37 +00:00
rates_xmr = self.cg.get_price(ids="monero", vs_currencies=currencies)
2022-01-09 17:52:26 +00:00
for currency in currencies:
try:
rates = rates_xmr["monero"][currency]
public_ad_dict[currency] = self.wrap_public_ads(currency, rates=rates)
except KeyError:
self.log.error("Error getting public ads for currency {currency}", currency=currency)
continue
2022-01-09 14:34:37 +00:00
for ad_id, country, currency in our_ads:
# Get the ads for this country/currency pair
try:
2022-01-09 17:52:26 +00:00
public_ads = public_ad_dict[currency.lower()]
2022-01-09 14:34:37 +00:00
except KeyError:
continue
2022-01-09 17:52:26 +00:00
new_margin = self.autoprice(public_ads, currency)
2022-01-09 14:34:37 +00:00
new_formula = f"coingeckoxmrusd*usd{currency.lower()}*{new_margin}"
rtrn = self.agora.ad_equation(ad_id, new_formula)
if not rtrn["success"]:
self.log.error("Error updating ad {ad_id}: {response}", ad_id=ad_id, response=rtrn["response"])
2022-01-09 17:52:26 +00:00
self.log.info("Rate for {currency}: {margin}", currency=currency, margin=new_margin)
2022-01-09 14:34:37 +00:00
2022-01-09 17:52:26 +00:00
def autoprice(self, ads, currency):
2022-01-09 14:34:37 +00:00
"""
Helper function to automatically adjust the price up/down in certain markets
in order to gain the most profits and sales.
"""
ads, place = self.sort_ads_annotate_place(ads)
2022-01-11 21:22:16 +00:00
all_results_us = all([ad[1] == settings.Agora.Username for ad in ads])
2022-01-09 14:34:37 +00:00
if place == 0:
2022-01-11 21:22:16 +00:00
if len(ads) > 1 and not all_results_us:
2022-01-09 17:52:26 +00:00
competitor_index = None
for index, ad in enumerate(ads):
if ad[1] != settings.Agora.Username:
competitor_index = index
print("Found a competitor at index", competitor_index, ad)
break
else:
print("Skipping ad", ad)
if not competitor_index:
print("Couldn't find a competitor from ads:", ads)
return float(ads[0][3])
next_max = float(ads[competitor_index][3])
2022-01-09 14:34:37 +00:00
our_max_adjusted = next_max - 0.01
if our_max_adjusted > float(settings.Agora.MaxMargin):
our_max_adjusted = float(settings.Agora.MaxMargin)
# Lowball next competitor
self.log.info(
2022-01-09 17:52:26 +00:00
"Lowballing next competitor for {currency}: {margin}",
2022-01-09 14:34:37 +00:00
currency=currency,
margin=our_max_adjusted,
)
return our_max_adjusted
else:
our_max_adjusted = float(settings.Agora.MaxMargin)
# Set the max rates as people have no choice
return our_max_adjusted
iter_count = 0
while place > 1 and not iter_count > 100:
iter_count += 1
2022-01-09 17:52:26 +00:00
recommended_margin = float(ads[place][3]) - 0.01
2022-01-09 14:34:37 +00:00
if recommended_margin <= float(settings.Agora.MinMargin):
break
ads[place][3] = recommended_margin
2022-01-09 17:52:26 +00:00
print("Set recommended margin for", currency, recommended_margin)
2022-01-09 14:34:37 +00:00
ads, place = self.sort_ads_annotate_place(ads)
# Return our adjusted (or left) margin
2022-01-09 17:52:26 +00:00
return float(ads[place][3])
2022-01-09 14:34:37 +00:00
2021-12-30 14:15:24 +00:00
def nuke_ads(self):
"""
Delete all of our adverts.
:return: True or False
:rtype: bool
"""
2021-12-31 00:39:52 +00:00
ads = self.enum_ad_ids()
return_ids = []
2021-12-30 14:15:24 +00:00
if not ads:
return False
for ad_id in ads:
2021-12-31 00:39:52 +00:00
rtrn = self.agora.ad_delete(ad_id)
return_ids.append(rtrn["success"])
return all(return_ids)
2021-12-24 02:23:38 +00:00
2021-12-24 17:26:31 +00:00
def get_rates_all(self):
2021-12-27 21:33:49 +00:00
"""
Get all rates that pair with USD.
:return: dictionary of USD/XXX rates
:rtype: dict
"""
2021-12-24 17:26:31 +00:00
rates = self.cr.get_rates("USD")
return rates
2021-12-24 02:23:38 +00:00
2021-12-28 23:56:32 +00:00
def get_acceptable_margins(self, currency, amount):
"""
Get the minimum and maximum amounts we would accept a trade for.
:param currency: currency code
:param amount: amount
:return: (min, max)
:rtype: tuple
"""
rates = self.get_rates_all()
if currency == "USD":
min_amount = amount - float(settings.Agora.AcceptableUSDMargin)
max_amount = amount + float(settings.Agora.AcceptableUSDMargin)
return (min_amount, max_amount)
amount_usd = amount / rates[currency]
min_usd = amount_usd - float(settings.Agora.AcceptableUSDMargin)
max_usd = amount_usd + float(settings.Agora.AcceptableUSDMargin)
min_local = min_usd * rates[currency]
max_local = max_usd * rates[currency]
return (min_local, max_local)
2021-12-24 17:26:31 +00:00
def create_ad(self, countrycode, currency):
2021-12-27 21:33:49 +00:00
"""
Post an ad in a country with a given currency.
Convert the min and max amounts from settings to the given currency with CurrencyRates.
:param countrycode: country code
:param currency: currency code
:type countrycode: string
:type currency: string
:return: data about created object or error
:rtype: dict
"""
ad = settings.Agora.Ad
paymentdetails = settings.Agora.PaymentDetails
ad = ad.replace("$CURRENCY$", currency)
if countrycode == "GB" and currency == "GBP":
adtext = ad.replace("$PAYMENT$", settings.Agora.GBPDetailsAd)
paymentdetailstext = paymentdetails.replace("$PAYMENT$", settings.Agora.GBPDetailsPayment)
else:
adtext = ad.replace("$PAYMENT$", settings.Agora.DefaultDetailsAd)
paymentdetailstext = paymentdetails.replace("$PAYMENT$", settings.Agora.DefaultDetailsPayment)
2021-12-24 17:26:31 +00:00
rates = self.get_rates_all()
if currency == "USD":
min_amount = float(settings.Agora.MinUSD)
max_amount = float(settings.Agora.MaxUSD)
else:
min_amount = rates[currency] * float(settings.Agora.MinUSD)
max_amount = rates[currency] * float(settings.Agora.MaxUSD)
2021-12-26 23:12:02 +00:00
price_formula = f"coingeckoxmrusd*usd{currency.lower()}*{settings.Agora.Margin}"
# price_formula = f"coingeckoxmrusd*{settings.Agora.Margin}"
2021-12-27 13:50:32 +00:00
ad = settings.Agora.Ad
ad = ad.replace("\\t", "\t")
ad = self.agora.ad_create(
country_code=countrycode,
currency=currency,
trade_type="ONLINE_SELL",
asset="XMR",
price_equation=price_formula,
track_max_amount=False,
require_trusted_by_advertiser=False,
# verified_email_required = False,
online_provider="REVOLUT",
msg=adtext,
min_amount=min_amount,
max_amount=max_amount,
payment_method_details=settings.Agora.PaymentMethodDetails,
# require_feedback_score = 0,
account_info=paymentdetailstext,
)
return ad
2021-12-24 02:23:38 +00:00
def dist_countries(self):
2021-12-27 21:33:49 +00:00
"""
Distribute our advert into all countries listed in the config.
Exits on errors.
:return: False or dict with response
:rtype: bool or dict
2021-12-27 21:33:49 +00:00
"""
2021-12-24 02:23:38 +00:00
for currency, countrycode in loads(settings.Agora.DistList):
2021-12-24 17:26:31 +00:00
rtrn = self.create_ad(countrycode, currency)
2021-12-24 02:23:38 +00:00
if not rtrn:
return False
2021-12-29 19:21:05 +00:00
yield rtrn
2021-12-31 00:39:52 +00:00
def get_combinations(self):
"""
Get all combinations of currencies and countries from the configuration.
:return: list of [country, currency]
:rtype: list
"""
currencies = loads(settings.Agora.BruteCurrencies)
countries = loads(settings.Agora.BruteCountries)
combinations = [[country, currency] for country in countries for currency in currencies]
return combinations
def dist_bruteforce(self):
"""
Bruteforce all possible ads from the currencies and countries in the config.
Does not exit on errors.
:return: False or dict with response
:rtype: bool or dict
"""
2021-12-31 00:39:52 +00:00
combinations = self.get_combinations()
for country, currency in combinations:
rtrn = self.create_ad(country, currency)
if not rtrn:
yield False
yield rtrn
2021-12-31 00:39:52 +00:00
def bruteforce_fill_blanks(self):
"""
Get the ads that we want to configure but have not, and fill in the blanks.
:return: False or dict with response
:rtype: bool or dict
"""
existing_ads = self.enum_ads()
combinations = self.get_combinations()
for country, currency in combinations:
if not [country, currency] in existing_ads:
rtrn = self.create_ad(country, currency)
if not rtrn:
yield False
yield rtrn
def strip_duplicate_ads(self):
"""
Remove duplicate ads.
:return: list of duplicate ads
:rtype: list
"""
existing_ads = self.enum_ads()
_size = len(existing_ads)
repeated = []
for i in range(_size):
k = i + 1
for j in range(k, _size):
if existing_ads[i] == existing_ads[j] and existing_ads[i] not in repeated:
repeated.append(existing_ads[i])
actioned = []
for ad_id, country, currency in repeated:
rtrn = self.agora.ad_delete(ad_id)
actioned.append(rtrn["success"])
return all(actioned)
2021-12-29 19:21:05 +00:00
def release_funds(self, contact_id):
"""
Release funds for a contact_id.
:param contact_id: trade/contact ID
:type contact_id: string
:return: response dict
:rtype: dict
"""
payload = {"tradeId": contact_id, "password": settings.Agora.Pass}
rtrn = self.agora._api_call(api_method=f"contact_release/{contact_id}", http_method="POST", query_values=payload)
return rtrn