526 lines
20 KiB
Python
526 lines
20 KiB
Python
# Twisted/Klein imports
|
|
from twisted.logger import Logger
|
|
from twisted.internet.task import LoopingCall
|
|
|
|
# Other library imports
|
|
from json import loads
|
|
from forex_python.converter import CurrencyRates
|
|
from agoradesk_py import AgoraDesk
|
|
from httpx import ReadTimeout
|
|
from pycoingecko import CoinGeckoAPI
|
|
|
|
# Project imports
|
|
from settings import settings
|
|
|
|
|
|
class Agora(object):
|
|
"""
|
|
AgoraDesk API handler.
|
|
"""
|
|
|
|
def __init__(self):
|
|
"""
|
|
Initialise the AgoraDesk and CurrencyRates APIs.
|
|
Initialise the last_dash storage for detecting new trades.
|
|
"""
|
|
self.log = Logger("agora")
|
|
self.agora = AgoraDesk(settings.Agora.Token)
|
|
self.cr = CurrencyRates()
|
|
self.cg = CoinGeckoAPI()
|
|
|
|
# Cache for detecting new trades
|
|
self.last_dash = set()
|
|
|
|
# Cache for detecting new messages
|
|
self.last_messages = {}
|
|
|
|
def set_irc(self, irc):
|
|
self.irc = irc
|
|
|
|
def set_tx(self, tx):
|
|
self.tx = tx
|
|
|
|
def setup_loop(self):
|
|
"""
|
|
Set up the LoopingCall to get all active trades and messages.
|
|
"""
|
|
self.lc_dash = LoopingCall(self.loop_check)
|
|
self.lc_dash.start(int(settings.Agora.RefreshSec))
|
|
|
|
def wrap_dashboard(self):
|
|
try:
|
|
dash = self.agora.dashboard_seller()
|
|
except ReadTimeout:
|
|
return False
|
|
dash_tmp = {}
|
|
if "data" not in dash["response"].keys():
|
|
self.log.error("Data not in dashboard response: {content}", content=dash)
|
|
return dash_tmp
|
|
if dash["response"]["data"]["contact_count"] > 0:
|
|
for contact in dash["response"]["data"]["contact_list"]:
|
|
contact_id = contact["data"]["contact_id"]
|
|
dash_tmp[contact_id] = contact
|
|
return dash_tmp
|
|
|
|
def loop_check(self):
|
|
"""
|
|
Calls hooks to parse dashboard info and get all contact messages.
|
|
"""
|
|
dash_tmp = self.wrap_dashboard()
|
|
|
|
# Call dashboard hooks
|
|
self.dashboard_hook(dash_tmp)
|
|
|
|
# Get recent messages
|
|
try:
|
|
self.get_recent_messages()
|
|
except ReadTimeout:
|
|
pass
|
|
return dash_tmp
|
|
|
|
def get_dashboard(self):
|
|
"""
|
|
Get dashboard helper for IRC only.
|
|
"""
|
|
dash = self.wrap_dashboard()
|
|
rtrn = []
|
|
if not dash.items():
|
|
return
|
|
for contact_id, contact in dash.items():
|
|
reference = self.tx.tx_to_ref(contact_id)
|
|
buyer = contact["data"]["buyer"]["username"]
|
|
amount = contact["data"]["amount"]
|
|
amount_xmr = contact["data"]["amount_xmr"]
|
|
currency = contact["data"]["currency"]
|
|
if not contact["data"]["is_selling"]:
|
|
continue
|
|
rtrn.append(f"{reference}: {buyer} {amount}{currency} {amount_xmr}XMR")
|
|
return rtrn
|
|
|
|
def dashboard_hook(self, dash):
|
|
"""
|
|
Get information about our open trades.
|
|
Post new trades to IRC and cache trades for the future.
|
|
"""
|
|
current_trades = []
|
|
if not dash.items():
|
|
return
|
|
for contact_id, contact in dash.items():
|
|
reference = self.tx.tx_to_ref(contact_id)
|
|
if reference:
|
|
current_trades.append(reference)
|
|
buyer = contact["data"]["buyer"]["username"]
|
|
amount = contact["data"]["amount"]
|
|
amount_xmr = contact["data"]["amount_xmr"]
|
|
currency = contact["data"]["currency"]
|
|
if not contact["data"]["is_selling"]:
|
|
continue
|
|
if reference not in self.last_dash:
|
|
reference = self.tx.new_trade(contact_id, buyer, currency, amount, amount_xmr)
|
|
if reference:
|
|
if reference not in current_trades:
|
|
current_trades.append(reference)
|
|
# Let us know there is a new trade
|
|
self.irc.sendmsg(f"AUTO {reference}: {buyer} {amount}{currency} {amount_xmr}XMR")
|
|
# Note that we have seen this reference
|
|
self.last_dash.add(reference)
|
|
|
|
# Purge old trades from cache
|
|
for ref in list(self.last_dash): # We're removing from the list on the fly
|
|
if ref not in current_trades:
|
|
self.last_dash.remove(ref)
|
|
if reference and reference not in current_trades:
|
|
current_trades.append(reference)
|
|
self.tx.cleanup(current_trades)
|
|
|
|
def dashboard_release_urls(self):
|
|
"""
|
|
Get information about our open trades.
|
|
Post new trades to IRC and cache trades for the future.
|
|
:return: human readable list of strings about our trades or False
|
|
:rtype: list or bool
|
|
"""
|
|
dash = self.agora.dashboard_seller()
|
|
dash_tmp = []
|
|
if "data" not in dash["response"]:
|
|
self.log.error("Data not in dashboard response: {content}", content=dash)
|
|
return False
|
|
if dash["response"]["data"]["contact_count"] > 0:
|
|
for contact in dash["response"]["data"]["contact_list"]:
|
|
contact_id = contact["data"]["contact_id"]
|
|
buyer = contact["data"]["buyer"]["username"]
|
|
amount = contact["data"]["amount"]
|
|
amount_xmr = contact["data"]["amount_xmr"]
|
|
currency = contact["data"]["currency"]
|
|
release_url = contact["actions"]["release_url"]
|
|
if not contact["data"]["is_selling"]:
|
|
continue
|
|
reference = self.tx.tx_to_ref(contact_id)
|
|
if not reference:
|
|
reference = "not_set"
|
|
dash_tmp.append(f"{reference}: {buyer} {amount}{currency} {amount_xmr}XMR {release_url}")
|
|
|
|
return dash_tmp
|
|
|
|
def get_recent_messages(self, send_irc=True):
|
|
"""
|
|
Get recent messages.
|
|
"""
|
|
messages_tmp = {}
|
|
messages = self.agora.recent_messages()
|
|
if not messages["success"]:
|
|
return False
|
|
open_tx = self.tx.get_ref_map().keys()
|
|
for message in messages["response"]["data"]["message_list"]:
|
|
contact_id = message["contact_id"]
|
|
username = message["sender"]["username"]
|
|
msg = message["msg"]
|
|
if contact_id not in open_tx:
|
|
continue
|
|
reference = self.tx.tx_to_ref(contact_id)
|
|
if reference in messages_tmp:
|
|
messages_tmp[reference].append([username, msg])
|
|
else:
|
|
messages_tmp[reference] = [[username, msg]]
|
|
|
|
# Send new messages on IRC
|
|
if send_irc:
|
|
for user, message in messages_tmp[reference]:
|
|
if reference in self.last_messages:
|
|
if not [user, message] in self.last_messages[reference]:
|
|
self.irc.sendmsg(f"AUTO {reference}: ({user}) {message}")
|
|
# Append sent messages to last_messages so we don't send them again
|
|
self.last_messages[reference].append([user, message])
|
|
else:
|
|
self.last_messages[reference] = [[user, message]]
|
|
for x in messages_tmp[reference]:
|
|
self.irc.sendmsg(f"NEW {reference}: ({user}) {message}")
|
|
|
|
# Purge old trades from cache
|
|
for ref in list(self.last_messages): # We're removing from the list on the fly
|
|
if ref not in messages_tmp:
|
|
del self.last_messages[ref]
|
|
|
|
return messages_tmp
|
|
|
|
def enum_ad_ids(self, page=0):
|
|
ads = self.agora._api_call(api_method="ads", query_values={"page": page})
|
|
ads_total = []
|
|
if not ads["success"]:
|
|
return False
|
|
for ad in ads["response"]["data"]["ad_list"]:
|
|
ads_total.append(ad["data"]["ad_id"])
|
|
if "pagination" in ads["response"]:
|
|
if "next" in ads["response"]["pagination"]:
|
|
page += 1
|
|
for ad in self.enum_ad_ids(page):
|
|
ads_total.append(ad)
|
|
return ads_total
|
|
|
|
def enum_ads(self, page=0):
|
|
ads = self.agora._api_call(api_method="ads", query_values={"page": page})
|
|
ads_total = []
|
|
if not ads["success"]:
|
|
return False
|
|
for ad in ads["response"]["data"]["ad_list"]:
|
|
ads_total.append([ad["data"]["ad_id"], ad["data"]["countrycode"], ad["data"]["currency"]])
|
|
if "pagination" in ads["response"]:
|
|
if "next" in ads["response"]["pagination"]:
|
|
page += 1
|
|
for ad in self.enum_ads(page):
|
|
ads_total.append([ad[0], ad[1], ad[2]])
|
|
return ads_total
|
|
|
|
def enum_public_ads(self, currency, page=0):
|
|
ads = self.agora._api_call(api_method=f"buy-monero-online/{currency}/REVOLUT", query_values={"page": page})
|
|
if not ads["success"]:
|
|
return False
|
|
for ad in ads["response"]["data"]["ad_list"]:
|
|
if ad["data"]["online_provider"] == "REVOLUT":
|
|
yield [ad["data"]["ad_id"], ad["data"]["profile"]["username"], ad["data"]["temp_price"]]
|
|
if "pagination" in ads["response"]:
|
|
if "next" in ads["response"]["pagination"]:
|
|
page += 1
|
|
for ad in self.enum_public_ads(currency, page):
|
|
yield [ad[0], ad[1], ad[2]]
|
|
|
|
def wrap_public_ads(self, currency, rates=None):
|
|
"""
|
|
Wrapper to sort public ads.
|
|
"""
|
|
ads = list(self.enum_public_ads(currency.upper()))
|
|
if not rates:
|
|
base_monero_price = self.cg.get_price(ids="monero", vs_currencies=currency)["monero"][currency.lower()]
|
|
else:
|
|
base_monero_price = rates
|
|
for ad in ads:
|
|
price = float(ad[2])
|
|
rate = round(price / base_monero_price, 2)
|
|
ad.append(rate)
|
|
ads.sort(key=lambda x: float(x[2]))
|
|
return ads
|
|
|
|
def sort_ads_annotate_place(self, ads):
|
|
ads.sort(key=lambda x: float(x[2]))
|
|
place = None
|
|
for index, ad in enumerate(ads):
|
|
if ad[1] == settings.Agora.Username:
|
|
place = index
|
|
break
|
|
if place is None:
|
|
print("Place is none for ads:", ads)
|
|
return (ads, place)
|
|
|
|
def update_prices(self):
|
|
our_ads = self.enum_ads()
|
|
currencies = [x[2].lower() for x in our_ads]
|
|
public_ad_dict = {}
|
|
rates_xmr = self.cg.get_price(ids="monero", vs_currencies=currencies)
|
|
for currency in currencies:
|
|
try:
|
|
rates = rates_xmr["monero"][currency]
|
|
public_ad_dict[currency] = self.wrap_public_ads(currency, rates=rates)
|
|
except KeyError:
|
|
self.log.error("Error getting public ads for currency {currency}", currency=currency)
|
|
continue
|
|
for ad_id, country, currency in our_ads:
|
|
# Get the ads for this country/currency pair
|
|
try:
|
|
public_ads = public_ad_dict[currency.lower()]
|
|
except KeyError:
|
|
continue
|
|
new_margin = self.autoprice(public_ads, currency)
|
|
new_formula = f"coingeckoxmrusd*usd{currency.lower()}*{new_margin}"
|
|
rtrn = self.agora.ad_equation(ad_id, new_formula)
|
|
if not rtrn["success"]:
|
|
self.log.error("Error updating ad {ad_id}: {response}", ad_id=ad_id, response=rtrn["response"])
|
|
self.log.info("Rate for {currency}: {margin}", currency=currency, margin=new_margin)
|
|
|
|
def autoprice(self, ads, currency):
|
|
"""
|
|
Helper function to automatically adjust the price up/down in certain markets
|
|
in order to gain the most profits and sales.
|
|
"""
|
|
ads, place = self.sort_ads_annotate_place(ads)
|
|
if place == 0:
|
|
if len(ads) > 1:
|
|
competitor_index = None
|
|
for index, ad in enumerate(ads):
|
|
if ad[1] != settings.Agora.Username:
|
|
competitor_index = index
|
|
print("Found a competitor at index", competitor_index, ad)
|
|
break
|
|
else:
|
|
print("Skipping ad", ad)
|
|
if not competitor_index:
|
|
print("Couldn't find a competitor from ads:", ads)
|
|
return float(ads[0][3])
|
|
|
|
next_max = float(ads[competitor_index][3])
|
|
our_max_adjusted = next_max - 0.01
|
|
if our_max_adjusted > float(settings.Agora.MaxMargin):
|
|
our_max_adjusted = float(settings.Agora.MaxMargin)
|
|
# Lowball next competitor
|
|
self.log.info(
|
|
"Lowballing next competitor for {currency}: {margin}",
|
|
currency=currency,
|
|
margin=our_max_adjusted,
|
|
)
|
|
return our_max_adjusted
|
|
else:
|
|
our_max_adjusted = float(settings.Agora.MaxMargin)
|
|
# Set the max rates as people have no choice
|
|
return our_max_adjusted
|
|
iter_count = 0
|
|
while place > 1 and not iter_count > 100:
|
|
iter_count += 1
|
|
recommended_margin = float(ads[place][3]) - 0.01
|
|
if recommended_margin <= float(settings.Agora.MinMargin):
|
|
break
|
|
ads[place][3] = recommended_margin
|
|
print("Set recommended margin for", currency, recommended_margin)
|
|
|
|
ads, place = self.sort_ads_annotate_place(ads)
|
|
# Return our adjusted (or left) margin
|
|
return float(ads[place][3])
|
|
|
|
def nuke_ads(self):
|
|
"""
|
|
Delete all of our adverts.
|
|
:return: True or False
|
|
:rtype: bool
|
|
"""
|
|
ads = self.enum_ad_ids()
|
|
return_ids = []
|
|
if not ads:
|
|
return False
|
|
for ad_id in ads:
|
|
rtrn = self.agora.ad_delete(ad_id)
|
|
return_ids.append(rtrn["success"])
|
|
return all(return_ids)
|
|
|
|
def get_rates_all(self):
|
|
"""
|
|
Get all rates that pair with USD.
|
|
:return: dictionary of USD/XXX rates
|
|
:rtype: dict
|
|
"""
|
|
rates = self.cr.get_rates("USD")
|
|
return rates
|
|
|
|
def get_acceptable_margins(self, currency, amount):
|
|
"""
|
|
Get the minimum and maximum amounts we would accept a trade for.
|
|
:param currency: currency code
|
|
:param amount: amount
|
|
:return: (min, max)
|
|
:rtype: tuple
|
|
"""
|
|
rates = self.get_rates_all()
|
|
if currency == "USD":
|
|
min_amount = amount - float(settings.Agora.AcceptableUSDMargin)
|
|
max_amount = amount + float(settings.Agora.AcceptableUSDMargin)
|
|
return (min_amount, max_amount)
|
|
amount_usd = amount / rates[currency]
|
|
min_usd = amount_usd - float(settings.Agora.AcceptableUSDMargin)
|
|
max_usd = amount_usd + float(settings.Agora.AcceptableUSDMargin)
|
|
min_local = min_usd * rates[currency]
|
|
max_local = max_usd * rates[currency]
|
|
return (min_local, max_local)
|
|
|
|
def create_ad(self, countrycode, currency):
|
|
"""
|
|
Post an ad in a country with a given currency.
|
|
Convert the min and max amounts from settings to the given currency with CurrencyRates.
|
|
:param countrycode: country code
|
|
:param currency: currency code
|
|
:type countrycode: string
|
|
:type currency: string
|
|
:return: data about created object or error
|
|
:rtype: dict
|
|
"""
|
|
ad = settings.Agora.Ad
|
|
paymentdetails = settings.Agora.PaymentDetails
|
|
ad = ad.replace("$CURRENCY$", currency)
|
|
if countrycode == "GB" and currency == "GBP":
|
|
adtext = ad.replace("$PAYMENT$", settings.Agora.GBPDetailsAd)
|
|
paymentdetailstext = paymentdetails.replace("$PAYMENT$", settings.Agora.GBPDetailsPayment)
|
|
else:
|
|
adtext = ad.replace("$PAYMENT$", settings.Agora.DefaultDetailsAd)
|
|
paymentdetailstext = paymentdetails.replace("$PAYMENT$", settings.Agora.DefaultDetailsPayment)
|
|
rates = self.get_rates_all()
|
|
if currency == "USD":
|
|
min_amount = float(settings.Agora.MinUSD)
|
|
max_amount = float(settings.Agora.MaxUSD)
|
|
else:
|
|
min_amount = rates[currency] * float(settings.Agora.MinUSD)
|
|
max_amount = rates[currency] * float(settings.Agora.MaxUSD)
|
|
price_formula = f"coingeckoxmrusd*usd{currency.lower()}*{settings.Agora.Margin}"
|
|
# price_formula = f"coingeckoxmrusd*{settings.Agora.Margin}"
|
|
ad = settings.Agora.Ad
|
|
ad = ad.replace("\\t", "\t")
|
|
ad = self.agora.ad_create(
|
|
country_code=countrycode,
|
|
currency=currency,
|
|
trade_type="ONLINE_SELL",
|
|
asset="XMR",
|
|
price_equation=price_formula,
|
|
track_max_amount=False,
|
|
require_trusted_by_advertiser=False,
|
|
# verified_email_required = False,
|
|
online_provider="REVOLUT",
|
|
msg=adtext,
|
|
min_amount=min_amount,
|
|
max_amount=max_amount,
|
|
payment_method_details=settings.Agora.PaymentMethodDetails,
|
|
# require_feedback_score = 0,
|
|
account_info=paymentdetailstext,
|
|
)
|
|
return ad
|
|
|
|
def dist_countries(self):
|
|
"""
|
|
Distribute our advert into all countries listed in the config.
|
|
Exits on errors.
|
|
:return: False or dict with response
|
|
:rtype: bool or dict
|
|
"""
|
|
for currency, countrycode in loads(settings.Agora.DistList):
|
|
rtrn = self.create_ad(countrycode, currency)
|
|
if not rtrn:
|
|
return False
|
|
yield rtrn
|
|
|
|
def get_combinations(self):
|
|
"""
|
|
Get all combinations of currencies and countries from the configuration.
|
|
:return: list of [country, currency]
|
|
:rtype: list
|
|
"""
|
|
currencies = loads(settings.Agora.BruteCurrencies)
|
|
countries = loads(settings.Agora.BruteCountries)
|
|
combinations = [[country, currency] for country in countries for currency in currencies]
|
|
return combinations
|
|
|
|
def dist_bruteforce(self):
|
|
"""
|
|
Bruteforce all possible ads from the currencies and countries in the config.
|
|
Does not exit on errors.
|
|
:return: False or dict with response
|
|
:rtype: bool or dict
|
|
"""
|
|
combinations = self.get_combinations()
|
|
for country, currency in combinations:
|
|
rtrn = self.create_ad(country, currency)
|
|
if not rtrn:
|
|
yield False
|
|
yield rtrn
|
|
|
|
def bruteforce_fill_blanks(self):
|
|
"""
|
|
Get the ads that we want to configure but have not, and fill in the blanks.
|
|
:return: False or dict with response
|
|
:rtype: bool or dict
|
|
"""
|
|
existing_ads = self.enum_ads()
|
|
combinations = self.get_combinations()
|
|
for country, currency in combinations:
|
|
if not [country, currency] in existing_ads:
|
|
rtrn = self.create_ad(country, currency)
|
|
if not rtrn:
|
|
yield False
|
|
yield rtrn
|
|
|
|
def strip_duplicate_ads(self):
|
|
"""
|
|
Remove duplicate ads.
|
|
:return: list of duplicate ads
|
|
:rtype: list
|
|
"""
|
|
existing_ads = self.enum_ads()
|
|
_size = len(existing_ads)
|
|
repeated = []
|
|
for i in range(_size):
|
|
k = i + 1
|
|
for j in range(k, _size):
|
|
if existing_ads[i] == existing_ads[j] and existing_ads[i] not in repeated:
|
|
repeated.append(existing_ads[i])
|
|
|
|
actioned = []
|
|
for ad_id, country, currency in repeated:
|
|
rtrn = self.agora.ad_delete(ad_id)
|
|
actioned.append(rtrn["success"])
|
|
return all(actioned)
|
|
|
|
def release_funds(self, contact_id):
|
|
"""
|
|
Release funds for a contact_id.
|
|
:param contact_id: trade/contact ID
|
|
:type contact_id: string
|
|
:return: response dict
|
|
:rtype: dict
|
|
"""
|
|
payload = {"tradeId": contact_id, "password": settings.Agora.Pass}
|
|
rtrn = self.agora._api_call(api_method=f"contact_release/{contact_id}", http_method="POST", query_values=payload)
|
|
return rtrn
|