Fix sending references and bank details

This commit is contained in:
Mark Veidemanis 2023-03-10 14:51:34 +00:00
parent 0148525c8b
commit c95d9d7557
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
8 changed files with 225 additions and 74 deletions

View File

@ -80,17 +80,18 @@ class LocalPlatformClient(ABC):
if not dash:
return False
print("DASH", dash)
if dash["contact_count"] > 0:
for contact in dash["contact_list"]:
contact_id = contact["data"]["contact_id"]
dash_tmp[contact_id] = contact
return dash_tmp
async def loop_check(self):
async def poll(self):
"""
Calls hooks to parse dashboard info and get all contact messages.
"""
dashboard_response = await self.api.dashboard()
dashboard_response = await self.call("dashboard")
await self.got_dashboard(dashboard_response)
# Get recent messages
@ -150,6 +151,7 @@ class LocalPlatformClient(ABC):
elif self.name == "lbtc":
asset = "BTC"
provider = contact["data"]["advertisement"]["payment_method"]
ad_id = provider = contact["data"]["advertisement"]["id"]
if asset == "XMR":
amount_crypto = contact["data"]["amount_xmr"]
elif asset == "BTC":
@ -157,34 +159,22 @@ class LocalPlatformClient(ABC):
currency = contact["data"]["currency"]
if not contact["data"]["is_selling"]:
continue
if reference not in self.last_dash:
reference = await self.new_trade(
self.name,
asset,
contact_id,
buyer,
currency,
amount,
amount_crypto,
provider,
)
if reference:
if reference not in current_trades:
current_trades.append(reference)
# Let us know there is a new trade
title = "New trade"
message = (
f"[#] [{reference}] ({self.name}) <{buyer}>"
f" {amount}{currency} {provider} {amount_crypto}{asset}"
)
await notify.sendmsg(self.instance.user, message, title=title)
# Note that we have seen this reference
self.last_dash.add(reference)
reference = await self.new_trade(
asset,
contact_id,
buyer,
currency,
amount,
amount_crypto,
provider,
ad_id,
)
if reference:
if reference not in current_trades:
current_trades.append(reference)
# Purge old trades from cache
for ref in list(self.last_dash): # We're removing from the list on the fly
if ref not in current_trades:
self.last_dash.remove(ref)
if reference and reference not in current_trades:
current_trades.append(reference)
messages = await db.cleanup(self.name, current_trades)
@ -205,14 +195,15 @@ class LocalPlatformClient(ABC):
if "data" not in messages["response"]:
log.error(f"Data not in messages response: {messages['response']}")
return False
open_tx = db.get_ref_map().keys()
ref_map = await db.get_ref_map()
open_tx = ref_map.keys()
for message in messages["response"]["data"]["message_list"]:
contact_id = str(message["contact_id"])
username = message["sender"]["username"]
msg = message["msg"]
if contact_id not in open_tx:
continue
reference = db.tx_to_ref(contact_id)
reference = await db.tx_to_ref(contact_id)
if reference in messages_tmp:
messages_tmp[reference].append([username, msg])
else:
@ -221,25 +212,38 @@ class LocalPlatformClient(ABC):
# Send new messages on IRC
if send_irc:
for user, message in messages_tmp[reference][::-1]:
if reference in self.last_messages:
if not [user, message] in self.last_messages[reference]:
self.irc.sendmsg(
f"[{reference}] ({self.name}) <{user}> {message}"
if reference in self.instance.last_messages:
if (
not [user, message]
in self.instance.last_messages[reference]
):
await notify.sendmsg(
self.instance.user,
f"[{reference}] ({self.name}) <{user}> {message}",
title="New message",
)
# Append sent messages to last_messages so we don't send
# them again
self.last_messages[reference].append([user, message])
self.instance.last_messages[reference].append(
[user, message]
)
else:
self.last_messages[reference] = [[user, message]]
self.instance.last_messages[reference] = [[user, message]]
for x in messages_tmp[reference]:
self.irc.sendmsg(
f"[{reference}] ({self.name}) <{user}> {message}"
await notify.sendmsg(
self.instance.user,
f"[{reference}] ({self.name}) <{user}> {message}",
title="New message",
)
# Purge old trades from cache
for ref in list(self.last_messages): # We're removing from the list on the fly
for ref in list(
self.instance.last_messages
): # We're removing from the list on the fly
if ref not in messages_tmp:
del self.last_messages[ref]
del self.instance.last_messages[ref]
self.instance.save()
return messages_tmp
@ -635,10 +639,19 @@ class LocalPlatformClient(ABC):
form["bank_name"] = bank_name
if edit:
ad = await self.api.ad(ad_id=ad_id, **form)
ad_response = await self.api.ad(ad_id=ad_id, **form)
if ad_response["success"]:
self.instance.platform_ad_ids[ad_id] = str(ad.id)
self.instance.save()
else:
ad = await self.api.ad_create(**form)
return ad
ad_response = await self.api.ad_create(**form)
if ad_response["success"]:
ad_id = ad_response["response"]["data"]["ad_id"]
self.instance.platform_ad_ids[ad_id] = str(ad.id)
self.instance.save()
print("AD", ad_response)
return ad_response
async def dist_countries(self, ad, filter_asset=None):
"""
@ -647,12 +660,12 @@ class LocalPlatformClient(ABC):
:return: False or dict with response
:rtype: bool or dict
"""
dist_list = list(self.create_distribution_list(self.name, ad, filter_asset))
dist_list = list(self.create_distribution_list(ad, filter_asset))
our_ads = await self.enum_ads()
(
supported_currencies,
account_info,
) = self.get_valid_account_details(self.name)
) = self.get_valid_account_details()
# Let's get rid of the ad IDs and make it a tuple like dist_list
our_ads = [(x[0], x[2], x[3], x[4]) for x in our_ads]
if not our_ads:
@ -746,7 +759,6 @@ class LocalPlatformClient(ABC):
async def release_funds(self, trade_id, reference):
stored_trade = await db.get_ref(reference)
platform = stored_trade["subclass"]
logmessage = f"All checks passed, releasing funds for {trade_id} {reference}"
log.info(logmessage)
title = "Releasing escrow"
@ -797,7 +809,6 @@ class LocalPlatformClient(ABC):
if not tx_obj:
log.error(f"Could not get TX for {tx}.")
return None
platform = stored_trade["subclass"]
platform_buyer = stored_trade["buyer"]
bank_sender = tx_obj["sender"]
trade_id = stored_trade["id"]
@ -807,7 +818,7 @@ class LocalPlatformClient(ABC):
elif is_updated is True:
# We mapped the trade successfully
self.release_funds(trade_id, reference)
antifraud.add_bank_sender(platform, platform_buyer, bank_sender)
antifraud.add_bank_sender(platform_buyer, bank_sender)
return True
elif is_updated is False:
# Already mapped
@ -816,7 +827,6 @@ class LocalPlatformClient(ABC):
async def new_trade(
self,
subclass,
asset,
trade_id,
buyer,
@ -824,6 +834,7 @@ class LocalPlatformClient(ABC):
amount,
amount_crypto,
provider,
ad_id,
):
"""
Called when we have a new trade in Agora.
@ -832,7 +843,7 @@ class LocalPlatformClient(ABC):
"""
reference = "".join(choices(ascii_uppercase, k=5))
reference = f"AGR-{reference}"
existing_ref = db.r.get(f"trade.{trade_id}.reference")
existing_ref = await db.r.get(f"trade.{trade_id}.reference")
if not existing_ref:
to_store = {
"id": trade_id,
@ -844,7 +855,6 @@ class LocalPlatformClient(ABC):
"amount_crypto": amount_crypto,
"reference": reference,
"provider": provider,
"subclass": subclass,
}
log.info(f"Storing trade information: {str(to_store)}")
await db.r.hmset(f"trade.{reference}", to_store)
@ -859,8 +869,13 @@ class LocalPlatformClient(ABC):
# self.antifraud.send_verification_url(subclass, uid, trade_id)
# else: # User is verified
# log.info(f"UID {uid} is verified.")
self.send_bank_details(subclass, currency, trade_id)
self.send_reference(subclass, trade_id, reference)
ad_obj = self.instance.get_ad(ad_id)
if not ad_obj:
log.error(f"Could not get ad object for {ad_id}.")
return
await self.send_bank_details(currency, trade_id, ad_obj)
await self.send_reference(trade_id, reference)
if existing_ref:
return db.convert(existing_ref)
else:
@ -887,11 +902,10 @@ class LocalPlatformClient(ABC):
matching_trades = []
for reference in refs:
ref_data = await db.get_ref(reference)
tx_platform = ref_data["subclass"]
tx_username = ref_data["buyer"]
trade_id = ref_data["id"]
currency = ref_data["currency"]
if tx_platform == platform and tx_username == username:
if tx_username == username:
to_append = (platform, trade_id, reference, currency)
matching_trades.append(to_append)
return matching_trades
@ -903,32 +917,34 @@ class LocalPlatformClient(ABC):
return (send_setting, post_message)
async def send_reference(self, platform, trade_id, reference):
async def send_reference(self, trade_id, reference):
"""
Send the reference to a customer.
"""
send_setting, post_message = self.get_send_settings(platform)
if send_setting is True:
await post_message(
if self.instance.send is True:
print("SEND IS TRUE REF")
await self.api.contact_message_post(
trade_id,
f"When sending the payment please use reference code: {reference}",
)
async def send_bank_details(self, platform, currency, trade_id, ad):
async def send_bank_details(self, currency, trade_id, ad):
"""
Send the bank details to a trade.
"""
send_setting, post_message = self.get_send_settings(platform)
log.info(f"Sending bank details/reference for {platform}/{trade_id}")
if send_setting == "1":
account_info = self.get_matching_account_details(platform, currency)
log.info(f"Sending bank details/reference for {trade_id}")
if self.instance.send is True:
print("SEND IS TRUE")
account_info = self.get_matching_account_details(currency)
print("ACCOUNT INFO", account_info)
formatted_account_info = self.format_payment_details(
currency, account_info, ad, real=True
)
print("formatted_account_info", formatted_account_info)
if not formatted_account_info:
log.error(f"Payment info invalid: {formatted_account_info}")
return
post_message(
await self.api.contact_message_post(
trade_id,
f"Payment details: \n{formatted_account_info}",
)
@ -1098,7 +1114,7 @@ class LocalPlatformClient(ABC):
# log.debug("Cheapest ad above our min that is not us: {x}", x=cheapest_ad)
return cheapest_ad_margin
def create_distribution_list(self, platform, ad, filter_asset=None):
def create_distribution_list(self, ad, filter_asset=None):
"""
Create a list for distribution of ads.
:return: generator of asset, countrycode, currency, provider
@ -1125,7 +1141,7 @@ class LocalPlatformClient(ABC):
else:
yield (asset, countrycode, currency, provider)
def get_valid_account_details(self, platform):
def get_valid_account_details(self):
currencies = self.instance.currencies
account_info = self.instance.account_info
currency_account_info_map = {}
@ -1140,11 +1156,11 @@ class LocalPlatformClient(ABC):
]
return (currencies, currency_account_info_map)
def get_matching_account_details(self, platform, currency):
def get_matching_account_details(self, currency):
(
supported_currencies,
currency_account_info_map,
) = self.get_valid_account_details(platform)
) = self.get_valid_account_details()
if currency not in supported_currencies:
return False
return currency_account_info_map[currency]

View File

@ -31,7 +31,8 @@ async def get_refs():
references = []
ref_keys = await r.keys("trade.*.reference")
for key in ref_keys:
references.append(r.get(key))
key_data = await r.get(key)
references.append(key_data)
return convert(references)
@ -45,7 +46,8 @@ async def tx_to_ref(tx):
"""
refs = await get_refs()
for reference in refs:
ref_data = convert(await r.hgetall(f"trade.{reference}"))
ref_data = await r.hgetall(f"trade.{reference}")
ref_data = convert(ref_data)
if not ref_data:
continue
if ref_data["id"] == tx:
@ -137,7 +139,8 @@ async def cleanup(subclass, references):
:type references: list
"""
messages = []
for tx, reference in await get_ref_map().items():
ref_map = await get_ref_map()
for tx, reference in ref_map.items():
if reference not in references:
if await get_subclass(reference) == subclass:
logmessage = (

View File

@ -23,8 +23,7 @@ async def raw_sendmsg(msg, title=None, priority=None, tags=None, url=None, topic
"data": msg,
}
async with aiohttp.ClientSession() as session:
async with session.post(f"{url}/{topic}", **cast) as response:
response = await response.content()
await session.post(f"{url}/{topic}", **cast)
# Sendmsg helper to send a message to a user's notification settings

View File

@ -0,0 +1,54 @@
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from django.core.management.base import BaseCommand
from core.clients.aggregators.nordigen import NordigenClient
from core.clients.platforms.agora import AgoraClient
from core.models import Aggregator, Platform
from core.util import logs
log = logs.get_logger("polling")
INTERVAL = 5
async def poll_aggregator(aggregator):
print("Polling aggregator", aggregator)
async def poll_platform(platform):
print("Polling platform", platform)
client = await AgoraClient(platform)
await client.poll()
async def job():
platforms = Platform.objects.filter(enabled=True)
aggregators = Aggregator.objects.filter(enabled=True)
tasks = []
for platform in platforms:
tasks.append(poll_platform(platform))
for aggregator in aggregators:
tasks.append(poll_aggregator(aggregator))
# Run it all at once
await asyncio.gather(*tasks)
class Command(BaseCommand):
def handle(self, *args, **options):
"""
Start the polling process.
"""
scheduler = AsyncIOScheduler()
log.debug(f"Scheduling polling process job every {INTERVAL} seconds")
scheduler.add_job(job, "interval", seconds=INTERVAL)
scheduler.start()
loop = asyncio.get_event_loop()
try:
loop.run_forever()
except (KeyboardInterrupt, SystemExit):
log.info("Process terminating")
finally:
loop.close()

View File

@ -0,0 +1,18 @@
# Generated by Django 4.1.7 on 2023-03-10 14:02
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('core', '0011_ad_visible'),
]
operations = [
migrations.AddField(
model_name='platform',
name='last_messages',
field=models.JSONField(default=dict),
),
]

View File

@ -0,0 +1,18 @@
# Generated by Django 4.1.7 on 2023-03-10 14:24
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('core', '0012_platform_last_messages'),
]
operations = [
migrations.AddField(
model_name='platform',
name='platform_ad_ids',
field=models.JSONField(default=dict),
),
]

View File

@ -120,11 +120,21 @@ class Platform(models.Model):
accept_within_usd = models.FloatField(default=1)
no_reference_amount_check_max_usd = models.FloatField(default=400)
last_messages = models.JSONField(default=dict)
platform_ad_ids = models.JSONField(default=dict)
enabled = models.BooleanField(default=True)
def __str__(self):
return self.name
def get_ad(self, platform_ad_id):
ad_id = self.platform_ad_ids.get(platform_ad_id, None)
if not ad_id:
return None
ad_object = Ad.objects.filter(id=ad_id, user=self.user, enabled=True).first()
return ad_object
@classmethod
def get_for_user(cls, user):
return cls.objects.filter(user=user, enabled=True)
@ -219,6 +229,10 @@ class Ad(models.Model):
visible = models.BooleanField(default=True)
enabled = models.BooleanField(default=True)
@classmethod
def get_by_id(cls, ad_id, user):
return cls.objects.filter(id=ad_id, user=user, enabled=True).first()
assets = {
"XMR": "Monero",

View File

@ -60,6 +60,35 @@ services:
- xf
- elastic
polling:
image: xf/pluto:prod
container_name: polling_pluto
build:
context: .
args:
OPERATION: ${OPERATION}
command: sh -c '. /venv/bin/activate && python manage.py polling'
volumes:
- ${REPO_DIR}:/code
- ${REPO_DIR}/docker/uwsgi.ini:/conf/uwsgi.ini
- ${APP_DATABASE_FILE}:/conf/db.sqlite3
- pluto_static:${STATIC_ROOT}
env_file:
- stack.env
volumes_from:
- tmp
depends_on:
redis:
condition: service_healthy
migration:
condition: service_started
collectstatic:
condition: service_started
networks:
- default
- xf
- elastic
migration:
image: xf/pluto:prod
container_name: migration_pluto