diff --git a/core/clients/platform.py b/core/clients/platform.py index f90be16..9420aac 100644 --- a/core/clients/platform.py +++ b/core/clients/platform.py @@ -80,17 +80,18 @@ class LocalPlatformClient(ABC): if not dash: return False + print("DASH", dash) if dash["contact_count"] > 0: for contact in dash["contact_list"]: contact_id = contact["data"]["contact_id"] dash_tmp[contact_id] = contact return dash_tmp - async def loop_check(self): + async def poll(self): """ Calls hooks to parse dashboard info and get all contact messages. """ - dashboard_response = await self.api.dashboard() + dashboard_response = await self.call("dashboard") await self.got_dashboard(dashboard_response) # Get recent messages @@ -150,6 +151,7 @@ class LocalPlatformClient(ABC): elif self.name == "lbtc": asset = "BTC" provider = contact["data"]["advertisement"]["payment_method"] + ad_id = provider = contact["data"]["advertisement"]["id"] if asset == "XMR": amount_crypto = contact["data"]["amount_xmr"] elif asset == "BTC": @@ -157,34 +159,22 @@ class LocalPlatformClient(ABC): currency = contact["data"]["currency"] if not contact["data"]["is_selling"]: continue - if reference not in self.last_dash: - reference = await self.new_trade( - self.name, - asset, - contact_id, - buyer, - currency, - amount, - amount_crypto, - provider, - ) - if reference: - if reference not in current_trades: - current_trades.append(reference) - # Let us know there is a new trade - title = "New trade" - message = ( - f"[#] [{reference}] ({self.name}) <{buyer}>" - f" {amount}{currency} {provider} {amount_crypto}{asset}" - ) - await notify.sendmsg(self.instance.user, message, title=title) - # Note that we have seen this reference - self.last_dash.add(reference) + reference = await self.new_trade( + asset, + contact_id, + buyer, + currency, + amount, + amount_crypto, + provider, + ad_id, + ) + if reference: + if reference not in current_trades: + current_trades.append(reference) # Purge old trades from cache - for ref in list(self.last_dash): # We're removing from the list on the fly - if ref not in current_trades: - self.last_dash.remove(ref) + if reference and reference not in current_trades: current_trades.append(reference) messages = await db.cleanup(self.name, current_trades) @@ -205,14 +195,15 @@ class LocalPlatformClient(ABC): if "data" not in messages["response"]: log.error(f"Data not in messages response: {messages['response']}") return False - open_tx = db.get_ref_map().keys() + ref_map = await db.get_ref_map() + open_tx = ref_map.keys() for message in messages["response"]["data"]["message_list"]: contact_id = str(message["contact_id"]) username = message["sender"]["username"] msg = message["msg"] if contact_id not in open_tx: continue - reference = db.tx_to_ref(contact_id) + reference = await db.tx_to_ref(contact_id) if reference in messages_tmp: messages_tmp[reference].append([username, msg]) else: @@ -221,25 +212,38 @@ class LocalPlatformClient(ABC): # Send new messages on IRC if send_irc: for user, message in messages_tmp[reference][::-1]: - if reference in self.last_messages: - if not [user, message] in self.last_messages[reference]: - self.irc.sendmsg( - f"[{reference}] ({self.name}) <{user}> {message}" + if reference in self.instance.last_messages: + if ( + not [user, message] + in self.instance.last_messages[reference] + ): + await notify.sendmsg( + self.instance.user, + f"[{reference}] ({self.name}) <{user}> {message}", + title="New message", ) # Append sent messages to last_messages so we don't send # them again - self.last_messages[reference].append([user, message]) + self.instance.last_messages[reference].append( + [user, message] + ) else: - self.last_messages[reference] = [[user, message]] + self.instance.last_messages[reference] = [[user, message]] for x in messages_tmp[reference]: - self.irc.sendmsg( - f"[{reference}] ({self.name}) <{user}> {message}" + await notify.sendmsg( + self.instance.user, + f"[{reference}] ({self.name}) <{user}> {message}", + title="New message", ) # Purge old trades from cache - for ref in list(self.last_messages): # We're removing from the list on the fly + for ref in list( + self.instance.last_messages + ): # We're removing from the list on the fly if ref not in messages_tmp: - del self.last_messages[ref] + del self.instance.last_messages[ref] + + self.instance.save() return messages_tmp @@ -635,10 +639,19 @@ class LocalPlatformClient(ABC): form["bank_name"] = bank_name if edit: - ad = await self.api.ad(ad_id=ad_id, **form) + ad_response = await self.api.ad(ad_id=ad_id, **form) + if ad_response["success"]: + self.instance.platform_ad_ids[ad_id] = str(ad.id) + self.instance.save() else: - ad = await self.api.ad_create(**form) - return ad + ad_response = await self.api.ad_create(**form) + if ad_response["success"]: + ad_id = ad_response["response"]["data"]["ad_id"] + self.instance.platform_ad_ids[ad_id] = str(ad.id) + self.instance.save() + + print("AD", ad_response) + return ad_response async def dist_countries(self, ad, filter_asset=None): """ @@ -647,12 +660,12 @@ class LocalPlatformClient(ABC): :return: False or dict with response :rtype: bool or dict """ - dist_list = list(self.create_distribution_list(self.name, ad, filter_asset)) + dist_list = list(self.create_distribution_list(ad, filter_asset)) our_ads = await self.enum_ads() ( supported_currencies, account_info, - ) = self.get_valid_account_details(self.name) + ) = self.get_valid_account_details() # Let's get rid of the ad IDs and make it a tuple like dist_list our_ads = [(x[0], x[2], x[3], x[4]) for x in our_ads] if not our_ads: @@ -746,7 +759,6 @@ class LocalPlatformClient(ABC): async def release_funds(self, trade_id, reference): stored_trade = await db.get_ref(reference) - platform = stored_trade["subclass"] logmessage = f"All checks passed, releasing funds for {trade_id} {reference}" log.info(logmessage) title = "Releasing escrow" @@ -797,7 +809,6 @@ class LocalPlatformClient(ABC): if not tx_obj: log.error(f"Could not get TX for {tx}.") return None - platform = stored_trade["subclass"] platform_buyer = stored_trade["buyer"] bank_sender = tx_obj["sender"] trade_id = stored_trade["id"] @@ -807,7 +818,7 @@ class LocalPlatformClient(ABC): elif is_updated is True: # We mapped the trade successfully self.release_funds(trade_id, reference) - antifraud.add_bank_sender(platform, platform_buyer, bank_sender) + antifraud.add_bank_sender(platform_buyer, bank_sender) return True elif is_updated is False: # Already mapped @@ -816,7 +827,6 @@ class LocalPlatformClient(ABC): async def new_trade( self, - subclass, asset, trade_id, buyer, @@ -824,6 +834,7 @@ class LocalPlatformClient(ABC): amount, amount_crypto, provider, + ad_id, ): """ Called when we have a new trade in Agora. @@ -832,7 +843,7 @@ class LocalPlatformClient(ABC): """ reference = "".join(choices(ascii_uppercase, k=5)) reference = f"AGR-{reference}" - existing_ref = db.r.get(f"trade.{trade_id}.reference") + existing_ref = await db.r.get(f"trade.{trade_id}.reference") if not existing_ref: to_store = { "id": trade_id, @@ -844,7 +855,6 @@ class LocalPlatformClient(ABC): "amount_crypto": amount_crypto, "reference": reference, "provider": provider, - "subclass": subclass, } log.info(f"Storing trade information: {str(to_store)}") await db.r.hmset(f"trade.{reference}", to_store) @@ -859,8 +869,13 @@ class LocalPlatformClient(ABC): # self.antifraud.send_verification_url(subclass, uid, trade_id) # else: # User is verified # log.info(f"UID {uid} is verified.") - self.send_bank_details(subclass, currency, trade_id) - self.send_reference(subclass, trade_id, reference) + ad_obj = self.instance.get_ad(ad_id) + if not ad_obj: + log.error(f"Could not get ad object for {ad_id}.") + return + + await self.send_bank_details(currency, trade_id, ad_obj) + await self.send_reference(trade_id, reference) if existing_ref: return db.convert(existing_ref) else: @@ -887,11 +902,10 @@ class LocalPlatformClient(ABC): matching_trades = [] for reference in refs: ref_data = await db.get_ref(reference) - tx_platform = ref_data["subclass"] tx_username = ref_data["buyer"] trade_id = ref_data["id"] currency = ref_data["currency"] - if tx_platform == platform and tx_username == username: + if tx_username == username: to_append = (platform, trade_id, reference, currency) matching_trades.append(to_append) return matching_trades @@ -903,32 +917,34 @@ class LocalPlatformClient(ABC): return (send_setting, post_message) - async def send_reference(self, platform, trade_id, reference): + async def send_reference(self, trade_id, reference): """ Send the reference to a customer. """ - send_setting, post_message = self.get_send_settings(platform) - if send_setting is True: - await post_message( + if self.instance.send is True: + print("SEND IS TRUE REF") + await self.api.contact_message_post( trade_id, f"When sending the payment please use reference code: {reference}", ) - async def send_bank_details(self, platform, currency, trade_id, ad): + async def send_bank_details(self, currency, trade_id, ad): """ Send the bank details to a trade. """ - send_setting, post_message = self.get_send_settings(platform) - log.info(f"Sending bank details/reference for {platform}/{trade_id}") - if send_setting == "1": - account_info = self.get_matching_account_details(platform, currency) + log.info(f"Sending bank details/reference for {trade_id}") + if self.instance.send is True: + print("SEND IS TRUE") + account_info = self.get_matching_account_details(currency) + print("ACCOUNT INFO", account_info) formatted_account_info = self.format_payment_details( currency, account_info, ad, real=True ) + print("formatted_account_info", formatted_account_info) if not formatted_account_info: log.error(f"Payment info invalid: {formatted_account_info}") return - post_message( + await self.api.contact_message_post( trade_id, f"Payment details: \n{formatted_account_info}", ) @@ -1098,7 +1114,7 @@ class LocalPlatformClient(ABC): # log.debug("Cheapest ad above our min that is not us: {x}", x=cheapest_ad) return cheapest_ad_margin - def create_distribution_list(self, platform, ad, filter_asset=None): + def create_distribution_list(self, ad, filter_asset=None): """ Create a list for distribution of ads. :return: generator of asset, countrycode, currency, provider @@ -1125,7 +1141,7 @@ class LocalPlatformClient(ABC): else: yield (asset, countrycode, currency, provider) - def get_valid_account_details(self, platform): + def get_valid_account_details(self): currencies = self.instance.currencies account_info = self.instance.account_info currency_account_info_map = {} @@ -1140,11 +1156,11 @@ class LocalPlatformClient(ABC): ] return (currencies, currency_account_info_map) - def get_matching_account_details(self, platform, currency): + def get_matching_account_details(self, currency): ( supported_currencies, currency_account_info_map, - ) = self.get_valid_account_details(platform) + ) = self.get_valid_account_details() if currency not in supported_currencies: return False return currency_account_info_map[currency] diff --git a/core/lib/db.py b/core/lib/db.py index 80fffbc..a26ac71 100644 --- a/core/lib/db.py +++ b/core/lib/db.py @@ -31,7 +31,8 @@ async def get_refs(): references = [] ref_keys = await r.keys("trade.*.reference") for key in ref_keys: - references.append(r.get(key)) + key_data = await r.get(key) + references.append(key_data) return convert(references) @@ -45,7 +46,8 @@ async def tx_to_ref(tx): """ refs = await get_refs() for reference in refs: - ref_data = convert(await r.hgetall(f"trade.{reference}")) + ref_data = await r.hgetall(f"trade.{reference}") + ref_data = convert(ref_data) if not ref_data: continue if ref_data["id"] == tx: @@ -137,7 +139,8 @@ async def cleanup(subclass, references): :type references: list """ messages = [] - for tx, reference in await get_ref_map().items(): + ref_map = await get_ref_map() + for tx, reference in ref_map.items(): if reference not in references: if await get_subclass(reference) == subclass: logmessage = ( diff --git a/core/lib/notify.py b/core/lib/notify.py index 9326cb9..cebf10f 100644 --- a/core/lib/notify.py +++ b/core/lib/notify.py @@ -23,8 +23,7 @@ async def raw_sendmsg(msg, title=None, priority=None, tags=None, url=None, topic "data": msg, } async with aiohttp.ClientSession() as session: - async with session.post(f"{url}/{topic}", **cast) as response: - response = await response.content() + await session.post(f"{url}/{topic}", **cast) # Sendmsg helper to send a message to a user's notification settings diff --git a/core/management/commands/polling.py b/core/management/commands/polling.py new file mode 100644 index 0000000..6e46956 --- /dev/null +++ b/core/management/commands/polling.py @@ -0,0 +1,54 @@ +import asyncio + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from django.core.management.base import BaseCommand + +from core.clients.aggregators.nordigen import NordigenClient +from core.clients.platforms.agora import AgoraClient +from core.models import Aggregator, Platform +from core.util import logs + +log = logs.get_logger("polling") + +INTERVAL = 5 + +async def poll_aggregator(aggregator): + print("Polling aggregator", aggregator) + +async def poll_platform(platform): + print("Polling platform", platform) + client = await AgoraClient(platform) + await client.poll() + +async def job(): + platforms = Platform.objects.filter(enabled=True) + aggregators = Aggregator.objects.filter(enabled=True) + + tasks = [] + for platform in platforms: + tasks.append(poll_platform(platform)) + + for aggregator in aggregators: + tasks.append(poll_aggregator(aggregator)) + + # Run it all at once + await asyncio.gather(*tasks) + + +class Command(BaseCommand): + def handle(self, *args, **options): + """ + Start the polling process. + """ + scheduler = AsyncIOScheduler() + + log.debug(f"Scheduling polling process job every {INTERVAL} seconds") + scheduler.add_job(job, "interval", seconds=INTERVAL) + scheduler.start() + loop = asyncio.get_event_loop() + try: + loop.run_forever() + except (KeyboardInterrupt, SystemExit): + log.info("Process terminating") + finally: + loop.close() diff --git a/core/migrations/0012_platform_last_messages.py b/core/migrations/0012_platform_last_messages.py new file mode 100644 index 0000000..6a86020 --- /dev/null +++ b/core/migrations/0012_platform_last_messages.py @@ -0,0 +1,18 @@ +# Generated by Django 4.1.7 on 2023-03-10 14:02 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0011_ad_visible'), + ] + + operations = [ + migrations.AddField( + model_name='platform', + name='last_messages', + field=models.JSONField(default=dict), + ), + ] diff --git a/core/migrations/0013_platform_platform_ad_ids.py b/core/migrations/0013_platform_platform_ad_ids.py new file mode 100644 index 0000000..02e22fa --- /dev/null +++ b/core/migrations/0013_platform_platform_ad_ids.py @@ -0,0 +1,18 @@ +# Generated by Django 4.1.7 on 2023-03-10 14:24 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0012_platform_last_messages'), + ] + + operations = [ + migrations.AddField( + model_name='platform', + name='platform_ad_ids', + field=models.JSONField(default=dict), + ), + ] diff --git a/core/models.py b/core/models.py index 08ec346..af253af 100644 --- a/core/models.py +++ b/core/models.py @@ -120,11 +120,21 @@ class Platform(models.Model): accept_within_usd = models.FloatField(default=1) no_reference_amount_check_max_usd = models.FloatField(default=400) + last_messages = models.JSONField(default=dict) + platform_ad_ids = models.JSONField(default=dict) + enabled = models.BooleanField(default=True) def __str__(self): return self.name + def get_ad(self, platform_ad_id): + ad_id = self.platform_ad_ids.get(platform_ad_id, None) + if not ad_id: + return None + ad_object = Ad.objects.filter(id=ad_id, user=self.user, enabled=True).first() + return ad_object + @classmethod def get_for_user(cls, user): return cls.objects.filter(user=user, enabled=True) @@ -219,6 +229,10 @@ class Ad(models.Model): visible = models.BooleanField(default=True) enabled = models.BooleanField(default=True) + @classmethod + def get_by_id(cls, ad_id, user): + return cls.objects.filter(id=ad_id, user=user, enabled=True).first() + assets = { "XMR": "Monero", diff --git a/docker-compose.yml b/docker-compose.yml index 69863c5..42e6cef 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -60,6 +60,35 @@ services: - xf - elastic + polling: + image: xf/pluto:prod + container_name: polling_pluto + build: + context: . + args: + OPERATION: ${OPERATION} + command: sh -c '. /venv/bin/activate && python manage.py polling' + volumes: + - ${REPO_DIR}:/code + - ${REPO_DIR}/docker/uwsgi.ini:/conf/uwsgi.ini + - ${APP_DATABASE_FILE}:/conf/db.sqlite3 + - pluto_static:${STATIC_ROOT} + env_file: + - stack.env + volumes_from: + - tmp + depends_on: + redis: + condition: service_healthy + migration: + condition: service_started + collectstatic: + condition: service_started + networks: + - default + - xf + - elastic + migration: image: xf/pluto:prod container_name: migration_pluto