Implement webhook callback handling

This commit is contained in:
Mark Veidemanis 2022-04-20 19:08:10 +01:00
parent 4a1c359bf9
commit 132e58735a
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
1 changed files with 57 additions and 0 deletions

View File

@ -15,6 +15,63 @@ class Verify(util.Base):
Class to handle user verification.
"""
def verification_successful(self, external_user_id):
"""
Called when verification has been successfully passed.
"""
self.tx.user_verification_successful(external_user_id)
def update_verification_status(self, external_user_id, review_status, review_answer=None):
"""
Update the authentication status of a external user ID.
"""
print("Updating verification status", external_user_id, review_status)
if review_status == "completed" and review_answer == "GREEN":
self.verification_successful(external_user_id)
def verify_webhook_signature(self, content, payload_digest):
if type(content) == str:
content = content.encode("utf-8")
# hmac needs bytes
signature = hmac.new(settings.Verify.WebHookSecret.encode("utf-8"), content, digestmod=hashlib.sha1).hexdigest()
return signature == payload_digest
def process_callback(self, content_json):
print("CONTENT JSON", content_json)
if "externalUserId" in content_json:
external_user_id = content_json["externalUserId"]
else:
self.log.warning("Useless callback received. No external user ID.")
return False
if "reviewStatus" in content_json:
review_status = content_json["reviewStatus"]
else:
self.log.warning("Useless callback received. No review status.")
return False
review_answer = None
if review_status == "completed":
if "reviewResult" in content_json:
if "reviewAnswer" in content_json["reviewResult"]:
review_answer = content_json["reviewResult"]["reviewAnswer"]
self.update_verification_status(external_user_id, review_status, review_answer=review_answer)
return True
def handle_callback(self, request):
"""
Handle a webhook callback.
"""
content = request.content.read()
payload_digest = request.getHeader("x-payload-digest")
if not self.verify_webhook_signature(content, payload_digest):
self.log.error("Webhook is not signed. Aborting.")
return False
content_json = json.loads(content)
rtrn = self.process_callback(content_json)
return rtrn
def create_applicant_and_get_link(self, external_user_id):
"""
Create the applicant and return the authentication link.