From 132e58735a563fd0f13e2d598e95369c08df4d66 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Wed, 20 Apr 2022 19:08:10 +0100 Subject: [PATCH] Implement webhook callback handling --- handler/ux/verify.py | 57 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/handler/ux/verify.py b/handler/ux/verify.py index 9ce6405..4f0fcdb 100644 --- a/handler/ux/verify.py +++ b/handler/ux/verify.py @@ -15,6 +15,63 @@ class Verify(util.Base): Class to handle user verification. """ + def verification_successful(self, external_user_id): + """ + Called when verification has been successfully passed. + """ + self.tx.user_verification_successful(external_user_id) + + def update_verification_status(self, external_user_id, review_status, review_answer=None): + """ + Update the authentication status of a external user ID. + """ + print("Updating verification status", external_user_id, review_status) + if review_status == "completed" and review_answer == "GREEN": + self.verification_successful(external_user_id) + + def verify_webhook_signature(self, content, payload_digest): + if type(content) == str: + content = content.encode("utf-8") + # hmac needs bytes + signature = hmac.new(settings.Verify.WebHookSecret.encode("utf-8"), content, digestmod=hashlib.sha1).hexdigest() + + return signature == payload_digest + + def process_callback(self, content_json): + print("CONTENT JSON", content_json) + if "externalUserId" in content_json: + external_user_id = content_json["externalUserId"] + else: + self.log.warning("Useless callback received. No external user ID.") + return False + if "reviewStatus" in content_json: + review_status = content_json["reviewStatus"] + else: + self.log.warning("Useless callback received. No review status.") + return False + + review_answer = None + if review_status == "completed": + if "reviewResult" in content_json: + if "reviewAnswer" in content_json["reviewResult"]: + review_answer = content_json["reviewResult"]["reviewAnswer"] + + self.update_verification_status(external_user_id, review_status, review_answer=review_answer) + return True + + def handle_callback(self, request): + """ + Handle a webhook callback. + """ + content = request.content.read() + payload_digest = request.getHeader("x-payload-digest") + if not self.verify_webhook_signature(content, payload_digest): + self.log.error("Webhook is not signed. Aborting.") + return False + content_json = json.loads(content) + rtrn = self.process_callback(content_json) + return rtrn + def create_applicant_and_get_link(self, external_user_id): """ Create the applicant and return the authentication link.