Implement aiohttp
This commit is contained in:
45
db.py
45
db.py
@@ -19,8 +19,7 @@ def store_message(msg):
|
||||
Store a message into Manticore
|
||||
:param msg: dict
|
||||
"""
|
||||
log.info(f"store_message() {msg}")
|
||||
|
||||
print("DISCORD MSGLEN", len(msg["msg"]))
|
||||
# normalise fields
|
||||
for key, value in list(msg.items()):
|
||||
if value is None:
|
||||
@@ -46,8 +45,46 @@ def store_message(msg):
|
||||
#print(body_post)
|
||||
try:
|
||||
# Bulk index operations
|
||||
api_response = api_instance.bulk(body_post)
|
||||
pprint(api_response)
|
||||
api_response = api_instance.bulk(body_post, async_req=True)
|
||||
#print(api_response)
|
||||
except ApiException as e:
|
||||
print("Exception when calling IndexApi->bulk: %s\n" % e)
|
||||
|
||||
async def store_message_bulk(messages):
|
||||
"""
|
||||
Store a message into Manticore
|
||||
:param msg: dict
|
||||
"""
|
||||
print("BULK", len(messages))
|
||||
total = []
|
||||
for msg in messages:
|
||||
# normalise fields
|
||||
for key, value in list(msg.items()):
|
||||
if value is None:
|
||||
del msg[key]
|
||||
if key in schema:
|
||||
if isinstance(value, int):
|
||||
if schema[key].startswith("string"):
|
||||
msg[key] = str(value)
|
||||
|
||||
body = {
|
||||
"insert": {
|
||||
"index": "main",
|
||||
"doc": msg
|
||||
}
|
||||
}
|
||||
total.append(body)
|
||||
|
||||
body_post = ""
|
||||
for item in total:
|
||||
body_post += json.dumps(item)
|
||||
body_post += "\n"
|
||||
|
||||
#print(body_post)
|
||||
try:
|
||||
# Bulk index operations
|
||||
api_response = api_instance.bulk(body_post, async_req=True)
|
||||
#print(api_response)
|
||||
except ApiException as e:
|
||||
print("Exception when calling IndexApi->bulk: %s\n" % e)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user