Add Elasticsearch support

This commit is contained in:
Mark Veidemanis 2022-12-13 07:20:49 +00:00
parent 50820172b1
commit 8afe638f0d
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
7 changed files with 78 additions and 7 deletions

View File

@ -34,6 +34,11 @@ HOOK_PATH = "hook"
NOTIFY_TOPIC = getenv("NOTIFY_TOPIC", "great-fisk")
ELASTICSEARCH_USERNAME = getenv("ELASTICSEARCH_USERNAME", "elastic")
ELASTICSEARCH_PASSWORD = getenv("ELASTICSEARCH_PASSWORD", "changeme")
ELASTICSEARCH_HOST = getenv("ELASTICSEARCH_HOST", "localhost")
ELASTICSEARCH_TLS = getenv("ELASTICSEARCH_TLS", "false") in trues
DEBUG = getenv("DEBUG", "false").lower() in trues
PROFILER = getenv("PROFILER", "false").lower() in trues

View File

@ -7,7 +7,7 @@ from alpaca.trading.requests import (
MarketOrderRequest,
)
from core.exchanges import BaseExchange, ExchangeError, GenericAPIError
from core.exchanges import BaseExchange, ExchangeError, GenericAPIError, common
class AlpacaExchange(BaseExchange):
@ -38,6 +38,13 @@ class AlpacaExchange(BaseExchange):
except ValueError:
raise GenericAPIError(f"Balance is not a float: {equity}")
common.get_balance_hook(
self.account.user.id,
self.account.user.username,
self.account.id,
self.account.name,
balance,
)
return balance
def get_market_value(self, symbol): # TODO: pydantic

18
core/exchanges/common.py Normal file
View File

@ -0,0 +1,18 @@
from core.lib.elastic import store_msg
def get_balance_hook(user_id, user_name, account_id, account_name, balance):
"""
Called every time the balance is fetched on an account.
Store this into Elasticsearch.
"""
store_msg(
"balances",
{
"user_id": user_id,
"user_name": user_name,
"account_id": account_id,
"account_name": account_name,
"balance": balance,
},
)

View File

@ -1,7 +1,7 @@
from oandapyV20 import API
from oandapyV20.endpoints import accounts, orders, positions, pricing, trades
from core.exchanges import BaseExchange
from core.exchanges import BaseExchange, common
class OANDAExchange(BaseExchange):
@ -39,7 +39,16 @@ class OANDAExchange(BaseExchange):
def get_balance(self):
r = accounts.AccountSummary(self.account_id)
response = self.call(r)
return float(response["balance"])
balance = float(response["balance"])
common.get_balance_hook(
self.account.user.id,
self.account.user.username,
self.account.id,
self.account.name,
balance,
)
return balance
def get_market_value(self, symbol):
raise NotImplementedError

32
core/lib/elastic.py Normal file
View File

@ -0,0 +1,32 @@
from datetime import datetime
from django.conf import settings
from elasticsearch import Elasticsearch
from core.util import logs
log = logs.get_logger(__name__)
client = None
def initialise_elasticsearch():
"""
Initialise the Elasticsearch client.
"""
auth = (settings.ELASTICSEARCH_USERNAME, settings.ELASTICSEARCH_PASSWORD)
client = Elasticsearch(
settings.ELASTICSEARCH_HOST, http_auth=auth, verify_certs=False
)
return client
def store_msg(index, msg):
global client
if not client:
client = initialise_elasticsearch()
if "ts" not in msg:
msg["ts"] = datetime.utcnow().isoformat()
result = client.index(index=index, body=msg)
if not result["result"] == "created":
log.error(f"Indexing of '{msg}' failed: {result}")

View File

@ -29,6 +29,7 @@ services:
networks:
- default
- xf
- elastic
migration:
image: xf/fisk:prod
@ -121,6 +122,8 @@ networks:
driver: bridge
xf:
external: true
elastic:
external: true
volumes:
fisk_static: {}

View File

@ -6,9 +6,6 @@ django-crispy-forms==1.14.0
crispy-bulma
stripe
django-rest-framework
uvloop #
uvicorn[standard] #
gunicorn #
django-htmx
cryptography
django-debug-toolbar
@ -23,4 +20,4 @@ pydantic
alpaca-py
oandapyV20
glom
watchfiles #
elasticsearch