pluto/handler/agora.py

510 lines
19 KiB
Python

# Twisted/Klein imports
from twisted.logger import Logger
from twisted.internet.task import LoopingCall
# Other library imports
from json import loads
from forex_python.converter import CurrencyRates
from agoradesk_py import AgoraDesk
from httpx import ReadTimeout
from pycoingecko import CoinGeckoAPI
# Project imports
from settings import settings
class Agora(object):
    """
    AgoraDesk API handler.

    Polls the seller dashboard and recent messages, relays new trades and
    messages to IRC, and manages (creates, reprices, deletes) our adverts.
    """

    def __init__(self):
        """
        Initialise the AgoraDesk, CurrencyRates and CoinGecko APIs.
        Initialise the caches used for detecting new trades and messages.
        """
        self.log = Logger("agora")
        self.agora = AgoraDesk(settings.Agora.Token)
        self.cr = CurrencyRates()
        self.cg = CoinGeckoAPI()

        # Cache for detecting new trades
        self.last_dash = set()

        # Cache for detecting new messages: reference -> list of [user, message]
        self.last_messages = {}

    def set_irc(self, irc):
        """
        Store the IRC handler used to announce trades and messages.
        """
        self.irc = irc

    def set_tx(self, tx):
        """
        Store the transaction handler used to map contacts to references.
        """
        self.tx = tx

    def setup_loop(self):
        """
        Set up the LoopingCall to get all active trades and messages.
        """
        self.lc_dash = LoopingCall(self.loop_check)
        self.lc_dash.start(int(settings.Agora.RefreshSec))

    def loop_check(self):
        """
        Calls hooks to parse dashboard info and get all contact messages.
        :return: dict of contact_id -> contact, or False on failure
        :rtype: dict or bool
        """
        try:
            dash = self.agora.dashboard_seller()
        except ReadTimeout:
            return False
        response = dash["response"]
        # A missing/empty response must not raise here: an exception escaping
        # this method would stop the LoopingCall for good.
        if not response or "data" not in response:
            self.log.error("Data not in dashboard response: {content}", content=dash)
            return False
        dash_tmp = {}
        if response["data"]["contact_count"] > 0:
            for contact in response["data"]["contact_list"]:
                dash_tmp[contact["data"]["contact_id"]] = contact

        # Call dashboard hooks
        self.dashboard_hook(dash_tmp)

        # Get recent messages (best effort, retried on the next tick)
        try:
            self.get_recent_messages()
        except ReadTimeout:
            self.log.warn("Timed out getting recent messages")
        return dash_tmp

    def get_dashboard(self):
        """
        Get dashboard helper for IRC only.
        """
        # TODO: not implemented; intended line format was
        # f"{reference}: {buyer} {amount}{currency} {amount_xmr}XMR"
        pass

    def dashboard_hook(self, dash):
        """
        Get information about our open trades.
        Post new trades to IRC and cache trades for the future.
        :param dash: dict of contact_id -> contact from the dashboard
        :type dash: dict
        """
        # NOTE: with an empty dashboard nothing is purged or cleaned up;
        # this early exit is kept from the original implementation.
        if not dash:
            return
        current_trades = []
        for contact_id, contact in dash.items():
            reference = self.tx.tx_to_ref(contact_id)
            if reference:
                current_trades.append(reference)
            data = contact["data"]
            if not data["is_selling"]:
                continue
            if reference in self.last_dash:
                # Already announced
                continue
            buyer = data["buyer"]["username"]
            amount = data["amount"]
            amount_xmr = data["amount_xmr"]
            currency = data["currency"]
            reference = self.tx.new_trade(contact_id, buyer, currency, amount, amount_xmr)
            if not reference:
                continue
            if reference not in current_trades:
                current_trades.append(reference)
            # Let us know there is a new trade
            self.irc.sendmsg(f"AUTO {reference}: {buyer} {amount}{currency} {amount_xmr}XMR")
            # Note that we have seen this reference
            self.last_dash.add(reference)

        # Purge old trades from cache
        self.last_dash.intersection_update(current_trades)
        self.tx.cleanup(current_trades)

    def dashboard_release_urls(self):
        """
        Get information about our open selling trades, including release URLs.
        :return: human readable list of strings about our trades or False
        :rtype: list or bool
        """
        dash = self.agora.dashboard_seller()
        response = dash["response"]
        if not response or "data" not in response:
            self.log.error("Data not in dashboard response: {content}", content=dash)
            return False
        dash_tmp = []
        if response["data"]["contact_count"] > 0:
            for contact in response["data"]["contact_list"]:
                data = contact["data"]
                # Check this first: only selling contacts are reported, so
                # don't touch the release URL of anything else.
                if not data["is_selling"]:
                    continue
                contact_id = data["contact_id"]
                buyer = data["buyer"]["username"]
                amount = data["amount"]
                amount_xmr = data["amount_xmr"]
                currency = data["currency"]
                release_url = contact["actions"]["release_url"]
                reference = self.tx.tx_to_ref(contact_id)
                if not reference:
                    reference = "not_set"
                dash_tmp.append(f"{reference}: {buyer} {amount}{currency} {amount_xmr}XMR {release_url}")
        return dash_tmp

    def get_recent_messages(self, send_irc=True):
        """
        Get recent messages for our open trades.
        Messages not seen before are announced on IRC: "NEW" for the first
        messages of a conversation, "AUTO" for later additions.
        :param send_irc: whether to announce unseen messages on IRC
        :type send_irc: bool
        :return: dict of reference -> list of [username, message], or False
        :rtype: dict or bool
        """
        messages = self.agora.recent_messages()
        if not messages["success"]:
            return False
        open_tx = self.tx.get_ref_map().keys()

        # Group the messages of open trades by reference
        messages_tmp = {}
        for message in messages["response"]["data"]["message_list"]:
            contact_id = message["contact_id"]
            if contact_id not in open_tx:
                continue
            reference = self.tx.tx_to_ref(contact_id)
            entry = [message["sender"]["username"], message["msg"]]
            messages_tmp.setdefault(reference, []).append(entry)

        # Send new messages on IRC
        if send_irc:
            for reference, conversation in messages_tmp.items():
                if reference in self.last_messages:
                    prefix = "AUTO"
                    seen = self.last_messages[reference]
                else:
                    prefix = "NEW"
                    seen = self.last_messages[reference] = []
                for user, text in conversation:
                    if [user, text] in seen:
                        continue
                    self.irc.sendmsg(f"{prefix} {reference}: ({user}) {text}")
                    # Remember sent messages so we don't send them again
                    seen.append([user, text])

        # Purge old trades from cache
        for ref in list(self.last_messages):  # We're removing on the fly
            if ref not in messages_tmp:
                del self.last_messages[ref]
        return messages_tmp

    def _fetch_own_ads(self, page=0):
        """
        Collect the "data" dict of each of our ads, following pagination.
        :param page: page to start from
        :type page: int
        :return: list of ad data dicts, or False if any request failed
        :rtype: list or bool
        """
        collected = []
        while True:
            ads = self.agora._api_call(api_method="ads", query_values={"page": page})
            if not ads["success"]:
                return False
            response = ads["response"]
            ad_list = response["data"]["ad_list"]
            collected.extend(ad["data"] for ad in ad_list)
            pagination = response.get("pagination") or {}
            # Also stop on an empty page so a stray "next" can't loop forever
            if not ad_list or "next" not in pagination:
                return collected
            page += 1

    def enum_ad_ids(self, page=0):
        """
        Enumerate the IDs of all of our ads.
        :param page: page to start from
        :type page: int
        :return: list of ad IDs, or False on API failure
        :rtype: list or bool
        """
        ads = self._fetch_own_ads(page)
        if ads is False:
            return False
        return [ad["ad_id"] for ad in ads]

    def enum_ads(self, page=0):
        """
        Enumerate all of our ads.
        :param page: page to start from
        :type page: int
        :return: list of [ad_id, countrycode, currency], or False on API failure
        :rtype: list or bool
        """
        ads = self._fetch_own_ads(page)
        if ads is False:
            return False
        return [[ad["ad_id"], ad["countrycode"], ad["currency"]] for ad in ads]

    def enum_public_ads(self, currency, page=0):
        """
        Iterate over the public REVOLUT ads for a currency, following pagination.
        Stops silently if a request fails.
        :param currency: currency code
        :param page: page to start from
        :type currency: string
        :type page: int
        :return: generator of [ad_id, username, temp_price]
        :rtype: generator
        """
        while True:
            ads = self.agora._api_call(
                api_method=f"buy-monero-online/{currency}/REVOLUT",
                query_values={"page": page},
            )
            if not ads["success"]:
                return
            response = ads["response"]
            ad_list = response["data"]["ad_list"]
            for ad in ad_list:
                data = ad["data"]
                if data["online_provider"] == "REVOLUT":
                    yield [data["ad_id"], data["profile"]["username"], data["temp_price"]]
            pagination = response.get("pagination") or {}
            if not ad_list or "next" not in pagination:
                return
            page += 1

    def wrap_public_ads(self, currency, rates=None):
        """
        Wrapper to annotate public ads with their margin and sort them by price.
        :param currency: currency code
        :param rates: XMR price in this currency; fetched from CoinGecko if unset
        :type currency: string
        :return: list of [ad_id, username, temp_price, margin], cheapest first
        :rtype: list
        """
        ads = list(self.enum_public_ads(currency.upper()))
        if not rates:
            base_monero_price = self.cg.get_price(ids="monero", vs_currencies=currency)["monero"][currency.lower()]
        else:
            base_monero_price = rates
        for ad in ads:
            # Margin of the advertised price over the market price
            ad.append(round(float(ad[2]) / base_monero_price, 2))
        ads.sort(key=lambda ad: float(ad[2]))
        return ads

    def sort_ads_annotate_place(self, ads, key_index=2):
        """
        Sort ads in place and find the position of our first ad.
        :param ads: list of [ad_id, username, temp_price, margin]
        :param key_index: column to sort on; 2 is the price, 3 the margin
        :type ads: list
        :type key_index: int
        :return: (sorted ads, index of our first ad or None)
        :rtype: tuple
        """
        ads.sort(key=lambda ad: float(ad[key_index]))
        place = next(
            (index for index, ad in enumerate(ads) if ad[1] == settings.Agora.Username),
            None,
        )
        if place is None:
            self.log.warn("Place is none for ads: {ads}", ads=ads)
        return (ads, place)

    def update_prices(self):
        """
        Recalculate the margin of each of our ads from the public ads in the
        same currency and push the new price equation to AgoraDesk.
        """
        our_ads = self.enum_ads()
        if not our_ads:
            self.log.error("No ads to update prices for: {ads}", ads=our_ads)
            return
        # De-duplicate while keeping order: one public lookup per currency
        currencies = list(dict.fromkeys(ad[2].lower() for ad in our_ads))
        rates_xmr = self.cg.get_price(ids="monero", vs_currencies=currencies)
        public_ad_dict = {}
        for currency in currencies:
            try:
                rates = rates_xmr["monero"][currency]
                public_ad_dict[currency] = self.wrap_public_ads(currency, rates=rates)
            except KeyError:
                self.log.error("Error getting public ads for currency {currency}", currency=currency)
                continue

        for ad_id, country, currency in our_ads:
            # Get the ads for this country/currency pair
            try:
                public_ads = public_ad_dict[currency.lower()]
            except KeyError:
                continue
            new_margin = self.autoprice(public_ads, currency)
            if new_margin is None:
                # Our ad is not publicly listed, nothing to compare against
                continue
            new_formula = f"coingeckoxmrusd*usd{currency.lower()}*{new_margin}"
            rtrn = self.agora.ad_equation(ad_id, new_formula)
            if not rtrn["success"]:
                self.log.error("Error updating ad {ad_id}: {response}", ad_id=ad_id, response=rtrn["response"])
            self.log.info("Rate for {currency}: {margin}", currency=currency, margin=new_margin)

    def autoprice(self, ads, currency):
        """
        Helper function to automatically adjust the price up/down in certain markets
        in order to gain the most profits and sales.
        Note that the ads list is sorted and modified in place.
        :param ads: public ads as returned by wrap_public_ads
        :param currency: currency code, used for logging
        :type ads: list
        :type currency: string
        :return: the margin to use, or None if our ad is not in the list
        :rtype: float or None
        """
        ads, place = self.sort_ads_annotate_place(ads)
        if place is None:
            self.log.error("Cannot autoprice {currency}: our ad is not listed", currency=currency)
            return None

        max_margin = float(settings.Agora.MaxMargin)
        if place == 0:
            if len(ads) <= 1:
                # Set the max rates as people have no choice
                return max_margin
            competitor = next((ad for ad in ads if ad[1] != settings.Agora.Username), None)
            if competitor is None:
                self.log.debug("Couldn't find a competitor from ads: {ads}", ads=ads)
                return float(ads[0][3])
            self.log.debug("Found a competitor: {ad}", ad=competitor)
            # Lowball next competitor, but never go above our maximum
            our_max_adjusted = min(float(competitor[3]) - 0.01, max_margin)
            self.log.info(
                "Lowballing next competitor for {currency}: {margin}",
                currency=currency,
                margin=our_max_adjusted,
            )
            return our_max_adjusted

        # Undercut step by step until we are in the top two or hit our floor
        min_margin = float(settings.Agora.MinMargin)
        for _ in range(101):
            if place <= 1:
                break
            recommended_margin = float(ads[place][3]) - 0.01
            if recommended_margin <= min_margin:
                break
            ads[place][3] = recommended_margin
            self.log.debug(
                "Set recommended margin for {currency}: {margin}",
                currency=currency,
                margin=recommended_margin,
            )
            # Only the margin column changes, so re-rank on it; ranking on
            # the untouched price column would never move our place.
            ads, place = self.sort_ads_annotate_place(ads, key_index=3)
        # Return our adjusted (or left) margin
        return float(ads[place][3])

    def nuke_ads(self):
        """
        Delete all of our adverts.
        :return: True or False
        :rtype: bool
        """
        ads = self.enum_ad_ids()
        if not ads:
            return False
        return_ids = [self.agora.ad_delete(ad_id)["success"] for ad_id in ads]
        return all(return_ids)

    def get_rates_all(self):
        """
        Get all rates that pair with USD.
        :return: dictionary of USD/XXX rates
        :rtype: dict
        """
        return self.cr.get_rates("USD")

    def get_acceptable_margins(self, currency, amount):
        """
        Get the minimum and maximum amounts we would accept a trade for.
        :param currency: currency code
        :param amount: amount
        :return: (min, max)
        :rtype: tuple
        """
        margin_usd = float(settings.Agora.AcceptableUSDMargin)
        if currency == "USD":
            # No conversion needed, so don't fetch the rates at all
            return (amount - margin_usd, amount + margin_usd)
        rate = self.get_rates_all()[currency]
        amount_usd = amount / rate
        min_local = (amount_usd - margin_usd) * rate
        max_local = (amount_usd + margin_usd) * rate
        return (min_local, max_local)

    def create_ad(self, countrycode, currency):
        """
        Post an ad in a country with a given currency.
        Convert the min and max amounts from settings to the given currency with CurrencyRates.
        :param countrycode: country code
        :param currency: currency code
        :type countrycode: string
        :type currency: string
        :return: data about created object or error
        :rtype: dict
        """
        # Turn the literal "\t" escapes of the configured template into real
        # tabs before the placeholders are filled in.
        ad = settings.Agora.Ad.replace("\\t", "\t")
        ad = ad.replace("$CURRENCY$", currency)
        paymentdetails = settings.Agora.PaymentDetails
        if countrycode == "GB" and currency == "GBP":
            adtext = ad.replace("$PAYMENT$", settings.Agora.GBPDetailsAd)
            paymentdetailstext = paymentdetails.replace("$PAYMENT$", settings.Agora.GBPDetailsPayment)
        else:
            adtext = ad.replace("$PAYMENT$", settings.Agora.DefaultDetailsAd)
            paymentdetailstext = paymentdetails.replace("$PAYMENT$", settings.Agora.DefaultDetailsPayment)

        min_amount = float(settings.Agora.MinUSD)
        max_amount = float(settings.Agora.MaxUSD)
        if currency != "USD":
            # Limits are configured in USD; convert them to the ad currency
            rate = self.get_rates_all()[currency]
            min_amount = rate * min_amount
            max_amount = rate * max_amount

        price_formula = f"coingeckoxmrusd*usd{currency.lower()}*{settings.Agora.Margin}"
        return self.agora.ad_create(
            country_code=countrycode,
            currency=currency,
            trade_type="ONLINE_SELL",
            asset="XMR",
            price_equation=price_formula,
            track_max_amount=False,
            require_trusted_by_advertiser=False,
            # verified_email_required = False,
            online_provider="REVOLUT",
            msg=adtext,
            min_amount=min_amount,
            max_amount=max_amount,
            payment_method_details=settings.Agora.PaymentMethodDetails,
            # require_feedback_score = 0,
            account_info=paymentdetailstext,
        )

    def dist_countries(self):
        """
        Distribute our advert into all countries listed in the config.
        Exits on errors.
        :return: generator of response dicts; stops at the first falsy response
        :rtype: generator
        """
        for currency, countrycode in loads(settings.Agora.DistList):
            rtrn = self.create_ad(countrycode, currency)
            if not rtrn:
                return False
            yield rtrn

    def get_combinations(self):
        """
        Get all combinations of currencies and countries from the configuration.
        :return: list of [country, currency]
        :rtype: list
        """
        currencies = loads(settings.Agora.BruteCurrencies)
        countries = loads(settings.Agora.BruteCountries)
        return [[country, currency] for country in countries for currency in currencies]

    def dist_bruteforce(self):
        """
        Bruteforce all possible ads from the currencies and countries in the config.
        Does not exit on errors.
        :return: generator yielding False or a dict with the response, once per ad
        :rtype: generator
        """
        for country, currency in self.get_combinations():
            rtrn = self.create_ad(country, currency)
            # Exactly one item per combination
            yield rtrn if rtrn else False

    def bruteforce_fill_blanks(self):
        """
        Get the ads that we want to configure but have not, and fill in the blanks.
        :return: generator yielding False or a dict with the response, once per created ad
        :rtype: generator
        """
        existing_ads = self.enum_ads()
        if existing_ads is False:
            # Without the list of existing ads we would post duplicates
            yield False
            return
        existing_pairs = {(country, currency) for _ad_id, country, currency in existing_ads}
        for country, currency in self.get_combinations():
            if (country, currency) in existing_pairs:
                continue
            rtrn = self.create_ad(country, currency)
            yield rtrn if rtrn else False

    def strip_duplicate_ads(self):
        """
        Remove duplicate ads: for every country/currency pair the first ad is
        kept and any further ones are deleted.
        :return: True if every deletion succeeded (or nothing was duplicated),
                 False if a deletion failed or the ads could not be listed
        :rtype: bool
        """
        existing_ads = self.enum_ads()
        if existing_ads is False:
            return False
        seen_pairs = set()
        actioned = []
        for ad_id, country, currency in existing_ads:
            pair = (country, currency)
            if pair not in seen_pairs:
                seen_pairs.add(pair)
                continue
            rtrn = self.agora.ad_delete(ad_id)
            actioned.append(rtrn["success"])
        return all(actioned)

    def release_funds(self, contact_id):
        """
        Release funds for a contact_id.
        :param contact_id: trade/contact ID
        :type contact_id: string
        :return: response dict
        :rtype: dict
        """
        payload = {"tradeId": contact_id, "password": settings.Agora.Pass}
        rtrn = self.agora._api_call(
            api_method=f"contact_release/{contact_id}",
            http_method="POST",
            query_values=payload,
        )
        return rtrn