Amend asset filter matching to be more explicit

This commit is contained in:
Mark Veidemanis 2023-02-20 07:20:01 +00:00
parent 9e22abe057
commit 89ef8408e6
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
2 changed files with 201 additions and 48 deletions

View File

@ -22,13 +22,12 @@ class AssetfilterTestCase(TestCase):
Test that the asset filter works on negative aggregations. Test that the asset filter works on negative aggregations.
""" """
# We have negative news about EUR # We have negative news about EUR
self.asset_rule = AssetRule.objects.create( AssetRule.objects.create(
user=self.user, user=self.user,
group=self.group, group=self.group,
asset="EUR", asset="EUR",
status=3, status=3,
) )
self.asset_rule.save()
# This means that: # This means that:
# * base == EUR: long is not allowed, short is allowed # * base == EUR: long is not allowed, short is allowed
@ -47,18 +46,156 @@ class AssetfilterTestCase(TestCase):
# Test that short on quote of EUR is not allowed # Test that short on quote of EUR is not allowed
self.assertFalse(assetfilter.get_allowed(self.group, "USD", "EUR", "short")) self.assertFalse(assetfilter.get_allowed(self.group, "USD", "EUR", "short"))
def test_get_allowed_base_not_listed(self):
self.group.when_no_data = 7 # Always deny
self.group.save()
AssetRule.objects.create(
user=self.user,
group=self.group,
asset="JPY",
status=6, # Always allow
)
self.assertFalse(assetfilter.get_allowed(self.group, "XAG", "JPY", "long"))
def test_get_allowed_quote_not_listed(self):
self.group.when_no_data = 7
self.group.save()
AssetRule.objects.create(
user=self.user,
group=self.group,
asset="JPY",
status=6,
)
self.assertFalse(assetfilter.get_allowed(self.group, "JPY", "XAG", "long"))
def test_get_allowed_paranoid_success(self):
"""
Test that when we request explicit checking of both base and quote,
it works.
"""
self.group.when_no_data = 7
self.group.save()
AssetRule.objects.create(
user=self.user,
group=self.group,
asset="JPY",
status=6,
)
AssetRule.objects.create(
user=self.user,
group=self.group,
asset="XAG",
status=6,
)
self.assertTrue(assetfilter.get_allowed(self.group, "JPY", "XAG", "long"))
def test_get_allowed_paranoid_success_direction(self):
"""
Test that when we request explicit checking of both base and quote,
and specify a direction, it works.
"""
self.group.when_no_data = 7
self.group.save()
AssetRule.objects.create(
user=self.user,
group=self.group,
asset="JPY",
status=2, # Positive
)
AssetRule.objects.create(
user=self.user,
group=self.group,
asset="XAG",
status=3, # Negative
)
self.assertTrue(assetfilter.get_allowed(self.group, "JPY", "XAG", "long"))
def test_get_allowed_paranoid_fail_direction(self):
"""
Test that when we request explicit checking of both base and quote,
and specify a direction, it works.
"""
self.group.when_no_data = 7
self.group.save()
AssetRule.objects.create(
user=self.user,
group=self.group,
asset="JPY",
status=3, # Negative
)
AssetRule.objects.create(
user=self.user,
group=self.group,
asset="XAG",
status=2, # Positive
)
self.assertFalse(assetfilter.get_allowed(self.group, "JPY", "XAG", "long"))
def test_get_allowed_paranoid_fail_direction_short(self):
"""
Test that when we request explicit checking of both base and quote,
and specify a direction, it works.
"""
self.group.when_no_data = 7
self.group.save()
AssetRule.objects.create(
user=self.user,
group=self.group,
asset="JPY",
status=2, # Positive
)
AssetRule.objects.create(
user=self.user,
group=self.group,
asset="XAG",
status=3, # Negative
)
self.assertFalse(assetfilter.get_allowed(self.group, "JPY", "XAG", "short"))
def test_get_allowed_mixed_sentiment(self):
"""
Test that deny is hit before allow.
"""
AssetRule.objects.create(
user=self.user,
group=self.group,
asset="JPY",
status=6, # Always allow
)
AssetRule.objects.create(
user=self.user,
group=self.group,
asset="XAG",
status=7, # Always deny
)
self.assertFalse(assetfilter.get_allowed(self.group, "JPY", "XAG", "short"))
self.assertFalse(assetfilter.get_allowed(self.group, "XAG", "JPY", "short"))
self.assertFalse(assetfilter.get_allowed(self.group, "JPY", "XAG", "long"))
self.assertFalse(assetfilter.get_allowed(self.group, "XAG", "JPY", "long"))
def test_get_allowed_permitted(self): def test_get_allowed_permitted(self):
""" """
Test that the asset filter works on positive aggregations. Test that the asset filter works on positive aggregations.
""" """
# We have positive news about EUR # We have positive news about EUR
self.asset_rule = AssetRule.objects.create( AssetRule.objects.create(
user=self.user, user=self.user,
group=self.group, group=self.group,
asset="EUR", asset="EUR",
status=2, status=2,
) )
self.asset_rule.save()
# This means that: # This means that:
# * base == EUR: long is allowed, short is not allowed # * base == EUR: long is allowed, short is not allowed
@ -78,80 +215,74 @@ class AssetfilterTestCase(TestCase):
self.assertFalse(assetfilter.get_allowed(self.group, "USD", "EUR", "long")) self.assertFalse(assetfilter.get_allowed(self.group, "USD", "EUR", "long"))
def test_get_allowed_no_data(self): def test_get_allowed_no_data(self):
self.asset_rule = AssetRule.objects.create( AssetRule.objects.create(
user=self.user, user=self.user,
group=self.group, group=self.group,
asset="EUR", asset="EUR",
status=0, status=0,
) )
self.asset_rule.save()
self.assertTrue(assetfilter.get_allowed(self.group, "EUR", "USD", "long")) self.assertTrue(assetfilter.get_allowed(self.group, "EUR", "USD", "long"))
def test_get_allowed_no_data_prohibited(self): def test_get_allowed_no_data_prohibited(self):
self.group.when_no_data = 7 self.group.when_no_data = 7
self.group.save() self.group.save()
self.asset_rule = AssetRule.objects.create( AssetRule.objects.create(
user=self.user, user=self.user,
group=self.group, group=self.group,
asset="EUR", asset="EUR",
status=0, status=0,
) )
self.asset_rule.save()
self.assertFalse(assetfilter.get_allowed(self.group, "EUR", "USD", "long")) self.assertFalse(assetfilter.get_allowed(self.group, "EUR", "USD", "long"))
def test_get_allowed_no_match(self): def test_get_allowed_no_match(self):
self.asset_rule = AssetRule.objects.create( AssetRule.objects.create(
user=self.user, user=self.user,
group=self.group, group=self.group,
asset="EUR", asset="EUR",
status=1, status=1,
) )
self.asset_rule.save()
self.assertTrue(assetfilter.get_allowed(self.group, "EUR", "USD", "long")) self.assertTrue(assetfilter.get_allowed(self.group, "EUR", "USD", "long"))
def test_get_allowed_no_aggregation(self): def test_get_allowed_no_aggregation(self):
self.asset_rule = AssetRule.objects.create( AssetRule.objects.create(
user=self.user, user=self.user,
group=self.group, group=self.group,
asset="EUR", asset="EUR",
status=4, status=4,
) )
self.asset_rule.save()
self.assertTrue(assetfilter.get_allowed(self.group, "EUR", "USD", "long")) self.assertTrue(assetfilter.get_allowed(self.group, "EUR", "USD", "long"))
def test_get_allowed_not_in_bounds(self): def test_get_allowed_not_in_bounds(self):
self.asset_rule = AssetRule.objects.create( AssetRule.objects.create(
user=self.user, user=self.user,
group=self.group, group=self.group,
asset="EUR", asset="EUR",
status=5, status=5,
) )
self.asset_rule.save()
self.assertTrue(assetfilter.get_allowed(self.group, "EUR", "USD", "long")) self.assertTrue(assetfilter.get_allowed(self.group, "EUR", "USD", "long"))
def test_get_allowed_always_allow(self): def test_get_allowed_always_allow(self):
self.asset_rule = AssetRule.objects.create( AssetRule.objects.create(
user=self.user, user=self.user,
group=self.group, group=self.group,
asset="EUR", asset="EUR",
status=6, status=6,
) )
self.asset_rule.save()
self.assertTrue(assetfilter.get_allowed(self.group, "EUR", "USD", "long")) self.assertTrue(assetfilter.get_allowed(self.group, "EUR", "USD", "long"))
def test_get_allowed_always_deny(self): def test_get_allowed_always_deny(self):
self.asset_rule = AssetRule.objects.create( AssetRule.objects.create(
user=self.user, user=self.user,
group=self.group, group=self.group,
asset="EUR", asset="EUR",
status=7, status=7,
) )
self.asset_rule.save()
self.assertFalse(assetfilter.get_allowed(self.group, "EUR", "USD", "long")) self.assertFalse(assetfilter.get_allowed(self.group, "EUR", "USD", "long"))

View File

@ -13,44 +13,66 @@ def get_allowed(group, base, quote, side):
# If our base has allowed == False, we can only short it, or long the quote # If our base has allowed == False, we can only short it, or long the quote
base_rule = AssetRule.objects.filter(group=group, asset=base).first() base_rule = AssetRule.objects.filter(group=group, asset=base).first()
if base_rule:
mapped_status = update_status_from_mappings(base_rule.status, group)
if mapped_status == 6:
# Always allow
return True
elif mapped_status == 7:
# Always deny
return False
elif mapped_status == 3:
if side == "long":
return False
elif mapped_status == 2:
if side == "short":
return False
# If our quote has allowed == False, we can only long it, or short the base
quote_rule = AssetRule.objects.filter(group=group, asset=quote).first() quote_rule = AssetRule.objects.filter(group=group, asset=quote).first()
if quote_rule:
mapped_status = update_status_from_mappings(quote_rule.status, group)
if mapped_status == 6:
# Always allow
return True
elif mapped_status == 7:
# Always deny
return False
elif mapped_status == 3:
if side == "short":
return False
elif mapped_status == 2:
if side == "long":
return False
if not base_rule and not quote_rule: if all([x is None for x in [base_rule, quote_rule]]):
# Neither side has data, we can't check any group statuses
if group.when_no_data == 7: if group.when_no_data == 7:
# Always deny # Always deny
return False return False
elif group.when_no_data == 6: elif group.when_no_data == 6:
# Always allow # Always allow
return True return True
# Translate statuses, depending on the group's definitions
if base_rule:
base_mapped_status = update_status_from_mappings(base_rule.status, group)
if quote_rule:
quote_mapped_status = update_status_from_mappings(quote_rule.status, group)
# Check for deny first
if base_rule:
if base_mapped_status == 7:
# Always deny
return False
elif base_mapped_status == 3:
if side == "long":
return False
elif base_mapped_status == 2:
if side == "short":
return False
if quote_rule:
if quote_mapped_status == 7:
# Always deny
return False
elif quote_mapped_status == 3:
if side == "short":
return False
elif quote_mapped_status == 2:
if side == "long":
return False
# Only one side does not have data
if any([x is None for x in [base_rule, quote_rule]]):
if group.when_no_data == 7:
# Always deny
return False
elif group.when_no_data == 6:
# Always allow
return True
# Check for explicit allow
if base_rule:
if base_mapped_status == 6:
# Always allow
return True
if quote_rule:
if quote_mapped_status == 6:
# Always allow
return True
return True return True