# Other library imports import requests import hashlib import hmac import time import json # Project imports from settings import settings import util class Verify(util.Base): """ Class to handle user verification. """ def verification_successful(self, external_user_id): """ Called when verification has been successfully passed. """ self.tx.user_verification_successful(external_user_id) def update_verification_status(self, external_user_id, review_status, review_answer=None): """ Update the authentication status of a external user ID. """ print("Updating verification status", external_user_id, review_status) if review_status == "completed" and review_answer == "GREEN": self.verification_successful(external_user_id) def verify_webhook_signature(self, content, payload_digest): if type(content) == str: content = content.encode("utf-8") # hmac needs bytes signature = hmac.new(settings.Verify.WebHookSecret.encode("utf-8"), content, digestmod=hashlib.sha1).hexdigest() return signature == payload_digest def process_callback(self, content_json): print("CONTENT JSON", content_json) if "externalUserId" in content_json: external_user_id = content_json["externalUserId"] else: self.log.warning("Useless callback received. No external user ID.") return False if "reviewStatus" in content_json: review_status = content_json["reviewStatus"] else: self.log.warning("Useless callback received. No review status.") return False review_answer = None if review_status == "completed": if "reviewResult" in content_json: if "reviewAnswer" in content_json["reviewResult"]: review_answer = content_json["reviewResult"]["reviewAnswer"] self.update_verification_status(external_user_id, review_status, review_answer=review_answer) return True def handle_callback(self, request): """ Handle a webhook callback. """ content = request.content.read() payload_digest = request.getHeader("x-payload-digest") if not self.verify_webhook_signature(content, payload_digest): self.log.error("Webhook is not signed. Aborting.") return False content_json = json.loads(content) rtrn = self.process_callback(content_json) return rtrn def get_external_user_id_details(self, external_user_id): # /resources/applicants/-;externalUserId={externalUserId}/one url = f"{settings.Verify.Base}/resources/applicants/-;externalUserId={external_user_id}/one" resp = self.sign_request(requests.Request("GET", url)) s = requests.Session() response = s.send(resp) info = response.json() if "info" in info: if {"firstName", "lastName"}.issubset(set(info["info"].keys())): first_name = info["info"]["firstName"] last_name = info["info"]["lastName"] if first_name.startswith("MR "): first_name = first_name[3:] print("info", info) return (first_name, last_name) def create_applicant_and_get_link(self, external_user_id): """ Create the applicant and return the authentication link. """ # applicant_id = self.create_applicant(external_user_id) auth_url = self.get_authentication_link(external_user_id) return auth_url def get_authentication_link(self, external_user_id): """ Get an external authentication link for a user. """ # /resources/sdkIntegrations/levels/{levelName}/websdkLink?ttlInSecs={lifetime}&externalUserId={externalUserId}&lang={locale} url = ( f"{settings.Verify.Base}/resources/sdkIntegrations/levels/{settings.Verify.LevelName}" f"/websdkLink?ttlInSecs=36000&externalUserId={external_user_id}" ) resp = self.sign_request(requests.Request("POST", url)) s = requests.Session() response = s.send(resp) verification_url = response.json()["url"] return verification_url def get_applicant_status(self, applicant_id): """ Get the status of an applicant. """ # url = settings.Verify.Base + '/resources/applicants/' + applicant_id + '/requiredIdDocsStatus' url = f"{settings.Verify.Base}/resources/applicants/'{applicant_id}/requiredIdDocsStatus" resp = self.sign_request(requests.Request("GET", url)) s = requests.Session() response = s.send(resp) return response def get_external_user_id_status(self, external_user_id): """ Get the status of an applicant by the external user ID. """ url = settings.Verify.Base + f"/resources/applicants/-;externalUserId={external_user_id}/one" resp = self.sign_request(requests.Request("GET", url)) s = requests.Session() response = s.send(resp) response_json = response.json() if "review" in response_json: if "reviewResult" in response_json["review"]: if "reviewAnswer" in response_json["review"]["reviewResult"]: return response_json["review"]["reviewResult"]["reviewAnswer"] return def create_applicant(self, external_user_id): """ Create an applicant. """ body = {"externalUserId": external_user_id} params = {"levelName": settings.Verify.LevelName} headers = {"Content-Type": "application/json", "Content-Encoding": "utf-8"} resp = self.sign_request( requests.Request( "POST", f"{settings.Verify.Base}/resources/applicants?levelName={settings.Verify.LevelName}", params=params, data=json.dumps(body), headers=headers, ) ) s = requests.Session() response = s.send(resp) applicant_id = response.json()["id"] return applicant_id def get_access_token(self, external_user_id, level_name): """ Get an access token for an external user ID. """ params = {"userId": external_user_id, "ttlInSecs": "600", "levelName": level_name} headers = {"Content-Type": "application/json", "Content-Encoding": "utf-8"} resp = self.sign_request( requests.Request("POST", f"{settings.Verify.Base}/resources/accessTokens", params=params, headers=headers) ) s = requests.Session() response = s.send(resp) token = response.json()["token"] return token def sign_request(self, request: requests.Request) -> requests.PreparedRequest: """ Sign a request. """ prepared_request = request.prepare() now = int(time.time()) method = request.method.upper() path_url = prepared_request.path_url # includes encoded query params # could be None so we use an empty **byte** string here body = b"" if prepared_request.body is None else prepared_request.body if type(body) == str: body = body.encode("utf-8") data_to_sign = str(now).encode("utf-8") + method.encode("utf-8") + path_url.encode("utf-8") + body # hmac needs bytes signature = hmac.new(settings.Verify.Key.encode("utf-8"), data_to_sign, digestmod=hashlib.sha256) prepared_request.headers["X-App-Token"] = settings.Verify.Token prepared_request.headers["X-App-Access-Ts"] = str(now) prepared_request.headers["X-App-Access-Sig"] = signature.hexdigest() return prepared_request