diff --git a/handler/lib/localbitcoins.py b/handler/lib/localbitcoins_py.py similarity index 100% rename from handler/lib/localbitcoins.py rename to handler/lib/localbitcoins_py.py diff --git a/handler/sources/localbitcoins.py b/handler/sources/localbitcoins.py index e69de29..6beef59 100644 --- a/handler/sources/localbitcoins.py +++ b/handler/sources/localbitcoins.py @@ -0,0 +1,708 @@ +# Twisted/Klein imports +from twisted.internet.task import LoopingCall +from twisted.internet.threads import deferToThread + +# Other library imports +from json import loads +from forex_python.converter import CurrencyRates +from lib.localbitcoins_py import LocalBitcoins +from pycoingecko import CoinGeckoAPI # TODO: remove this import and defer to money +from time import sleep +from pyotp import TOTP +from datetime import datetime + +# Project imports +from settings import settings +import util + + +class LBTC(util.Base): + """ + LocalBitcoins API handler. + """ + + def __init__(self): + """ + Initialise the LocalBitcoins and CurrencyRates APIs. + Initialise the last_dash storage for detecting new trades. + """ + super().__init__() + self.lbtc = LocalBitcoins(settings.LocalBitcoins.Token) + self.cr = CurrencyRates() # TODO: remove this and defer to money + self.cg = CoinGeckoAPI() # TODO: remove this and defer to money + + # Cache for detecting new trades + self.last_dash = set() + + # Cache for detecting new messages + self.last_messages = {} + + # Assets that cheat has been run on + self.cheat_run_on = [] + + def setup_loop(self): # TODO:: move to main sources + """ + Set up the LoopingCall to get all active trades and messages. + """ + self.lc_dash = LoopingCall(self.loop_check) + self.lc_dash.start(int(settings.LocalBitcoins.RefreshSec)) + if settings.LocalBitcoins.Cheat == "1": + self.lc_cheat = LoopingCall(self.run_cheat_in_thread) + self.lc_cheat.start(int(settings.LocalBitcoins.CheatSec)) + + @util.handle_exceptions + def wrap_dashboard(self): + dash = self.lbtc.dashboard() # no dashboard_seller for lbtc + if dash is None: + return False + if dash is False: + return False + if dash["response"] is None: + return False + dash_tmp = {} + if not dash.items(): + return False + if "data" not in dash["response"].keys(): + self.log.error(f"Data not in dashboard response: {dash}") + return dash_tmp + if dash["response"]["data"]["contact_count"] > 0: + for contact in dash["response"]["data"]["contact_list"]: + contact_id = contact["data"]["contact_id"] + dash_tmp[contact_id] = contact + return dash_tmp + + def loop_check(self): + """ + Calls hooks to parse dashboard info and get all contact messages. + """ + dash_tmp = self.wrap_dashboard() + + # Call dashboard hooks + self.dashboard_hook(dash_tmp) + + # Get recent messages + self.get_recent_messages() + return dash_tmp + + def get_dashboard(self): + """ + Get dashboard helper for IRC only. + """ + dash = self.wrap_dashboard() + rtrn = [] + if dash is False: + return False + for contact_id, contact in dash.items(): + reference = self.tx.tx_to_ref(contact_id) + buyer = contact["data"]["buyer"]["username"] + amount = contact["data"]["amount"] + asset = contact["data"]["advertisement"]["asset"] + amount_crypto = contact["data"]["amount_btc"] + currency = contact["data"]["currency"] + provider = contact["data"]["advertisement"]["payment_method"] + if not contact["data"]["is_selling"]: + continue + rtrn.append(f"{reference}: {buyer} {amount}{currency} {provider} {amount_crypto}{asset}") + return rtrn + + def dashboard_hook(self, dash): + """ + Get information about our open trades. + Post new trades to IRC and cache trades for the future. + """ + current_trades = [] + if not dash: + return + if not dash.items(): + return + for contact_id, contact in dash.items(): + reference = self.tx.tx_to_ref(contact_id) + if reference: + current_trades.append(reference) + buyer = contact["data"]["buyer"]["username"] + amount = contact["data"]["amount"] + asset = contact["data"]["advertisement"]["asset"] + provider = contact["data"]["advertisement"]["payment_method"] + amount_crypto = contact["data"]["amount_btc"] + currency = contact["data"]["currency"] + if not contact["data"]["is_selling"]: + continue + if reference not in self.last_dash: + reference = self.tx.new_trade(asset, contact_id, buyer, currency, amount, amount_crypto, provider) + if reference: + if reference not in current_trades: + current_trades.append(reference) + # Let us know there is a new trade + self.irc.sendmsg(f"AUTO {reference}: {buyer} {amount}{currency} {provider} {amount_crypto}{asset}") + # Note that we have seen this reference + self.last_dash.add(reference) + + # Purge old trades from cache + for ref in list(self.last_dash): # We're removing from the list on the fly + if ref not in current_trades: + self.last_dash.remove(ref) + if reference and reference not in current_trades: + current_trades.append(reference) + self.tx.cleanup(current_trades) + + @util.handle_exceptions + def get_recent_messages(self, send_irc=True): + """ + Get recent messages. + """ + messages_tmp = {} + messages = self.lbtc.recent_messages() + if messages is False: + return False + if not messages["success"]: + return False + if "data" not in messages["response"]: + self.log.error(f"Data not in messages response: {messages['response']}") + return False + open_tx = self.tx.get_ref_map().keys() + for message in messages["response"]["data"]["message_list"]: + contact_id = message["contact_id"] + username = message["sender"]["username"] + msg = message["msg"] + if contact_id not in open_tx: + continue + reference = self.tx.tx_to_ref(contact_id) + if reference in messages_tmp: + messages_tmp[reference].append([username, msg]) + else: + messages_tmp[reference] = [[username, msg]] + + # Send new messages on IRC + if send_irc: + for user, message in messages_tmp[reference]: + if reference in self.last_messages: + if not [user, message] in self.last_messages[reference]: + self.irc.sendmsg(f"AUTO {reference}: ({user}) {message}") + # Append sent messages to last_messages so we don't send them again + self.last_messages[reference].append([user, message]) + else: + self.last_messages[reference] = [[user, message]] + for x in messages_tmp[reference]: + self.irc.sendmsg(f"NEW {reference}: ({user}) {message}") + + # Purge old trades from cache + for ref in list(self.last_messages): # We're removing from the list on the fly + if ref not in messages_tmp: + del self.last_messages[ref] + + return messages_tmp + + @util.handle_exceptions + def enum_ad_ids(self, page=0): + ads = self.lbtc._api_call(api_method="ads", query_values={"page": page}) + if ads is False: + return False + ads_total = [] + if not ads["success"]: + return False + for ad in ads["response"]["data"]["ad_list"]: + ads_total.append(ad["data"]["ad_id"]) + if "pagination" in ads["response"]: + if "next" in ads["response"]["pagination"]: + page += 1 + ads_iter = self.enum_ad_ids(page) + if ads_iter is None: + return False + if ads_iter is False: + return False + for ad in ads_iter: + ads_total.append(ad) + return ads_total + + @util.handle_exceptions + def enum_ads(self, requested_asset=None, page=0): + query_values = {"page": page} + if requested_asset: + query_values["asset"] = requested_asset + ads = self.lbtc._api_call(api_method="ads", query_values=query_values) + if ads is False: + return False + ads_total = [] + if not ads["success"]: + return False + for ad in ads["response"]["data"]["ad_list"]: + asset = ad["data"]["asset"] + ad_id = ad["data"]["ad_id"] + country = ad["data"]["countrycode"] + currency = ad["data"]["currency"] + provider = ad["data"]["online_provider"] + ads_total.append([asset, ad_id, country, currency, provider]) + if "pagination" in ads["response"]: + if "next" in ads["response"]["pagination"]: + page += 1 + ads_iter = self.enum_ads(requested_asset, page) + if ads_iter is None: + return False + if ads_iter is False: + return False + for ad in ads_iter: + ads_total.append([ad[0], ad[1], ad[2], ad[3], ad[4]]) + return ads_total + + @util.handle_exceptions + def enum_public_ads(self, asset, currency, providers=None, page=0): + to_return = [] + if not providers: + providers = ["REVOLUT"] + # buy-monero-online, buy-bitcoin-online + # Work around Agora weirdness calling it bitcoins + # if len(providers) == 1: + # ads = self.agora._api_call(api_method=f"buy-{coin}-online/{currency}/{providers[0]}", query_values={"page": page}) + # elif len(providers) > 1: + ads = self.lbtc._api_call(api_method=f"buy-bitcoins-online/{currency}", query_values={"page": page}) + # with open("pub.json", "a") as f: + # import json + # f.write(json.dumps([page, currency, asset, ads])+"\n") + # f.close() + if ads is None: + return False + if ads is False: + return False + if ads["response"] is None: + return False + if "data" not in ads["response"]: + return False + for ad in ads["response"]["data"]["ad_list"]: + if ad["data"]["online_provider"] not in providers: + continue + date_last_seen = ad["data"]["profile"]["last_online"] + # Check if this person was seen recently + if not util.last_online_recent(date_last_seen): + continue + ad_id = ad["data"]["ad_id"] + username = ad["data"]["profile"]["username"] + temp_price = ad["data"]["temp_price"] + provider = ad["data"]["online_provider"] + if ad["data"]["currency"] != currency: + continue + to_append = [ad_id, username, temp_price, provider, asset, currency] + if to_append not in to_return: + to_return.append(to_append) + # yield [ad_id, username, temp_price, provider, asset, currency] + if "pagination" in ads["response"]: + if "next" in ads["response"]["pagination"]: + page += 1 + ads_iter = self.enum_public_ads(asset, currency, providers, page) + if ads_iter is None: + return False + if ads_iter is False: + return False + for ad in ads_iter: + to_append = [ad[0], ad[1], ad[2], ad[3], ad[4], ad[5]] + if to_append not in to_return: + to_return.append(to_append) + return to_return + + def run_cheat_in_thread(self, assets=None): + """ + Update prices in another thread. + """ + if not assets: + all_assets = loads(settings.LocalBitcoins.AssetList) + assets_not_run = set(all_assets) ^ set(self.cheat_run_on) + if not assets_not_run: + self.cheat_run_on = [] + asset = list(all_assets).pop() + self.cheat_run_on.append(asset) + else: + asset = assets_not_run.pop() + self.cheat_run_on.append(asset) + deferToThread(self.update_prices, [asset]) + return asset + else: + deferToThread(self.update_prices, assets) + + @util.handle_exceptions + def update_prices(self, assets=None): + # Get all public ads for the given assets + public_ads = self.get_all_public_ads(assets) + if not public_ads: + return False + + # Get the ads to update + to_update = self.markets.get_new_ad_equations(public_ads, assets) + self.slow_ad_update(to_update) + + # TODO: make generic and move to markets + @util.handle_exceptions + def get_all_public_ads(self, assets=None, currencies=None, providers=None): + """ + Get all public ads for our listed currencies. + :return: dict of public ads keyed by currency + :rtype: dict + """ + public_ads = {} + crypto_map = { + "BTC": "bitcoin", + } + + if not assets: + assets = self.markets.get_all_assets() + # Get all currencies we have ads for, deduplicated + if not currencies: + currencies = self.markets.get_all_currencies() + if not providers: + providers = self.markets.get_all_providers() + # We want to get the ads for each of these currencies and return the result + rates = self.cg.get_price(ids=["bitcoin"], vs_currencies=currencies) + for asset in assets: + for currency in currencies: + cg_asset_name = crypto_map[asset] + try: + rates[cg_asset_name][currency.lower()] + except KeyError: + # self.log.error("Error getting public ads for currency {currency}", currency=currency) + continue + ads_list = self.enum_public_ads(asset, currency, providers) + if not ads_list: + continue + ads = self.money.lookup_rates(ads_list, rates=rates) + if not ads: + continue + self.write_to_es_ads("ads", ads) + if currency in public_ads: + for ad in list(ads): + if ad not in public_ads[currency]: + public_ads[currency].append(ad) + else: + public_ads[currency] = ads + + return public_ads + + def write_to_es_ads(self, msgtype, ads): + if settings.ES.Enabled == "1": + for ad in ads: + cast = { + "id": ad[0], + "username": ad[1], + "price": ad[2], + "provider": ad[3], + "asset": ad[4], + "currency": ad[5], + "margin": ad[6], + } + cast["type"] = msgtype + cast["ts"] = str(datetime.now().isoformat()) + cast["xtype"] = "platorm" + cast["market"] = "localbitcoins" + self.es.index(index=settings.ES.MetaIndex, document=cast) + + def slow_ad_update(self, ads): + """ + Slow ad equation update utilising exponential backoff in order to guarantee all ads are updated. + :param ads: our list of ads + """ + iterations = 0 + throttled = 0 + assets = set() + currencies = set() + while not all([x[4] for x in ads]) or iterations == 1000: + for ad_index in range(len(ads)): + ad_id, new_formula, asset, currency, actioned = ads[ad_index] + assets.add(asset) + currencies.add(currency) + if not actioned: + rtrn = self.agora.ad_equation(ad_id, new_formula) + if rtrn["success"]: + ads[ad_index][4] = True + throttled = 0 + continue + else: + if "error_code" not in rtrn["response"]["error"]: + self.log.error(f"Error code not in return for ad {ad_id}: {rtrn['response']}") + return + if rtrn["response"]["error"]["error_code"] == 429: + throttled += 1 + sleep_time = pow(throttled, float(settings.LocalBitcoins.SleepExponent)) + self.log.info(f"Throttled {throttled} times while updating {ad_id}, sleeping for {sleep_time} seconds") + # We're running in a thread, so this is fine + sleep(sleep_time) + self.log.error(f"Error updating ad {ad_id}: {rtrn['response']}") + continue + iterations += 1 + + @util.handle_exceptions + def nuke_ads(self): + """ + Delete all of our adverts. + :return: True or False + :rtype: bool + """ + ads = self.enum_ad_ids() + return_ids = [] + if ads is False: + return False + for ad_id in ads: + rtrn = self.lbtc.ad_delete(ad_id) + return_ids.append(rtrn["success"]) + return all(return_ids) + + def format_ad(self, asset, currency, payment_details_text): + """ + Format the ad. + """ + ad = settings.Platform.Ad + + # Substitute the currency + ad = ad.replace("$CURRENCY$", currency) + + # Substitute the asset + ad = ad.replace("$ASSET$", asset) + + # Substitute the payment details + ad = ad.replace("$PAYMENT$", payment_details_text) + + # Strip extra tabs + ad = ad.replace("\\t", "\t") + return ad + + def format_payment_details(self, currency, payment_details): + """ + Format the payment details. + """ + payment = settings.Platform.PaymentDetails + + payment_text = "" + for field, value in payment_details.items(): + formatted_name = field.replace("_", " ") + formatted_name = formatted_name.capitalize() + payment_text += f"* {formatted_name}: **{value}**" + if field != list(payment_details.keys())[-1]: # No trailing newline + payment_text += "\n" + + payment = payment.replace("$PAYMENT$", payment_text) + payment = payment.replace("$CURRENCY$", currency) + + return payment + + def get_minmax(self, asset, currency): + rates = self.money.get_rates_all() + if currency not in rates and not currency == "USD": + self.log.error(f"Can't create ad without rates: {currency}") + return + if asset == "XMR": + min_usd = float(settings.Agora.MinUSDXMR) + max_usd = float(settings.Agora.MaxUSDXMR) + elif asset == "BTC": + min_usd = float(settings.Agora.MinUSDBTC) + max_usd = float(settings.Agora.MaxUSDBTC) + if currency == "USD": + min_amount = min_usd + max_amount = max_usd + else: + min_amount = rates[currency] * min_usd + max_amount = rates[currency] * max_usd + + return (min_amount, max_amount) + + @util.handle_exceptions + def create_ad(self, asset, countrycode, currency, provider, payment_details, visible=None, edit=False, ad_id=None): + """ + Post an ad with the given asset in a country with a given currency. + Convert the min and max amounts from settings to the given currency with CurrencyRates. + :param asset: the crypto asset to list (XMR or BTC) + :type asset: string + :param countrycode: country code + :param currency: currency code + :param payment_details: the payment details + :type countrycode: string + :type currency: string + :type payment_details: dict + :return: data about created object or error + :rtype: dict + """ + + if payment_details: + payment_details_text = self.format_payment_details(currency, payment_details) + ad_text = self.format_ad(asset, currency, payment_details_text) + min_amount, max_amount = self.get_minmax(asset, currency) + + price_formula = f"coingecko{asset.lower()}usd*usd{currency.lower()}*{settings.LocalBitcoins.Margin}" + + form = { + "country_code": countrycode, + "currency": currency, + "trade_type": "ONLINE_SELL", + "asset": asset, + "price_equation": price_formula, + "track_max_amount": False, + "require_trusted_by_advertiser": False, + "online_provider": provider, + "payment_method_details": settings.Platform.PaymentMethodDetails, + } + if visible is False: + form["visible"] = False + elif visible is True: + form["visible"] = False + if payment_details: + form["account_info"] = payment_details_text + form["msg"] = ad_text + form["min_amount"] = min_amount + form["max_amount"] = max_amount + + if edit: + ad = self.agora.ad(ad_id=ad_id, **form) + else: + ad = self.agora.ad_create(**form) + return ad + + def dist_countries(self, filter_asset=None): + """ + Distribute our advert into all countries and providers listed in the config. + Exits on errors. + :return: False or dict with response + :rtype: bool or dict + """ + dist_list = list(self.markets.create_distribution_list(filter_asset)) + our_ads = self.enum_ads() + supported_currencies, account_info = self.markets.get_valid_account_details() + # Let's get rid of the ad IDs and make it a tuple like dist_list + our_ads = [(x[0], x[2], x[3], x[4]) for x in our_ads] + for asset, countrycode, currency, provider in dist_list: + if (asset, countrycode, currency, provider) not in our_ads: + if currency in supported_currencies: + # Create the actual ad and pass in all the stuff + rtrn = self.create_ad(asset, countrycode, currency, provider, payment_details=account_info[currency]) + # Bail on first error, let's not continue + if rtrn is False: + return False + yield rtrn + + def redist_countries(self): + """ + Redistribute our advert details into all our listed adverts. + This will edit all ads and update the details. Only works if we have already run dist. + This will not post any new ads. + Exits on errors. + :return: False or dict with response + :rtype: bool or dict + """ + our_ads = self.enum_ads() + supported_currencies, account_info = self.markets.get_valid_account_details() + for asset, ad_id, countrycode, currency, provider in our_ads: + if currency in supported_currencies: + rtrn = self.create_ad( + asset, countrycode, currency, provider, payment_details=account_info[currency], edit=True, ad_id=ad_id + ) + # Bail on first error, let's not continue + if rtrn is False: + return False + yield (rtrn, ad_id) + + @util.handle_exceptions + def strip_duplicate_ads(self): + """ + Remove duplicate ads. + :return: list of duplicate ads + :rtype: list + """ + existing_ads = self.enum_ads() + _size = len(existing_ads) + repeated = [] + for i in range(_size): + k = i + 1 + for j in range(k, _size): + if existing_ads[i] == existing_ads[j] and existing_ads[i] not in repeated: + repeated.append(existing_ads[i]) + + actioned = [] + for ad_id, country, currency in repeated: + return all(actioned) + + @util.handle_exceptions + def release_funds(self, contact_id): + """ + Release funds for a contact_id. + :param contact_id: trade/contact ID + :type contact_id: string + :return: response dict + :rtype: dict + """ + if settings.LocalBitcoins.Dummy == "1": + self.log.error(f"Running in dummy mode, not releasing funds for {contact_id}") + return + payload = {"tradeId": contact_id, "password": settings.LocalBitcoins.Pass} + rtrn = self.agora._api_call(api_method=f"contact_release/{contact_id}", http_method="POST", query_values=payload) + + # Check if we can withdraw funds + self.withdraw_funds() + + return rtrn + + # TODO: rewrite to handle BTC + @util.handle_exceptions + def withdraw_funds(self): + """ + Withdraw excess funds to our XMR wallets. + """ + totals_all = self.tx.get_total() + if totals_all is False: + return False + + wallet_xmr, _ = totals_all[2] + + # Get the wallet balances in USD + total_usd = totals_all[0][1] + + # total_trades_usd = self.tx.get_open_trades_usd() + if not total_usd: + return False + # total_usd += total_trades_usd + # print("total_usd after trades add", total_usd) + + profit_usd = total_usd - float(settings.Money.BaseUSD) + # Get the XMR -> USD exchange rate + xmr_usd = self.cg.get_price(ids="bitcoin", vs_currencies=["USD"]) + + # Convert the USD total to XMR + profit_usd_in_xmr = float(profit_usd) / xmr_usd["bitcoin"]["usd"] + + # Check profit is above zero + if not profit_usd >= 0: + return + + if not float(wallet_xmr) > profit_usd_in_xmr: + # Not enough funds to withdraw + # TODO: Send a message to notify + self.log.error(f"Not enough funds to withdraw {profit_usd_in_xmr}, as wallet only contains {wallet_xmr}") + self.irc.sendmsg(f"Not enough funds to withdraw {profit_usd_in_xmr}, as wallet only contains {wallet_xmr}") + return + + if not profit_usd >= float(settings.Money.WithdrawLimit): + # Not enough profit to withdraw + return + + half = profit_usd_in_xmr / 2 + + half_rounded = round(half, 8) + + # Read OTP secret + with open("otp.key", "r") as f: + otp_key = f.read() + f.close() + otp_key = otp_key.replace("\n", "") + + # Get OTP code + otp_code = TOTP(otp_key) + + # Set up the format for calling wallet_send_xmr + send_cast = { + "address": None, + "amount": half_rounded, + "password": settings.Agora.Pass, + "otp": otp_code.now(), + } + + send_cast["address"] = settings.XMR.Wallet1 + rtrn1 = self.agora.wallet_send_xmr(**send_cast) + + send_cast["address"] = settings.XMR.Wallet2 + rtrn2 = self.agora.wallet_send_xmr(**send_cast) + + self.irc.sendmsg(f"Withdrawal: {rtrn1['success']} | {rtrn2['success']}") + self.ux.notify.notify_withdrawal(half_rounded)