234 lines
8.3 KiB
Python
234 lines
8.3 KiB
Python
# Other library imports
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import time
|
|
|
|
import requests
|
|
import util
|
|
|
|
# Project imports
|
|
from settings import settings
|
|
|
|
|
|
class Verify(util.Base):
|
|
"""
|
|
Class to handle user verification.
|
|
"""
|
|
|
|
def create_uid(self, platform, username):
|
|
return f"{platform}|{username}"
|
|
|
|
def get_uid(self, external_user_id):
|
|
"""
|
|
Get the platform and username from the external user ID.
|
|
"""
|
|
spl = external_user_id.split("|")
|
|
if not len(spl) == 2:
|
|
self.log.error(f"Split invalid, cannot get customer: {spl}")
|
|
return False
|
|
platform, username = spl
|
|
return (platform, username)
|
|
|
|
def verification_successful(self, external_user_id):
|
|
"""
|
|
Called when verification has been successfully passed.
|
|
"""
|
|
self.antifraud.user_verification_successful(external_user_id)
|
|
|
|
def update_verification_status(
|
|
self, external_user_id, review_status, review_answer=None
|
|
):
|
|
"""
|
|
Update the authentication status of a external user ID.
|
|
"""
|
|
if review_status == "completed" and review_answer == "GREEN":
|
|
self.verification_successful(external_user_id)
|
|
|
|
def verify_webhook_signature(self, content, payload_digest):
|
|
if type(content) == str:
|
|
content = content.encode("utf-8")
|
|
# hmac needs bytes
|
|
signature = hmac.new(
|
|
settings.Verify.WebHookSecret.encode("utf-8"),
|
|
content,
|
|
digestmod=hashlib.sha1,
|
|
).hexdigest()
|
|
|
|
return signature == payload_digest
|
|
|
|
def process_callback(self, content_json):
|
|
if "externalUserId" in content_json:
|
|
external_user_id = content_json["externalUserId"]
|
|
else:
|
|
self.log.warning("Useless callback received. No external user ID.")
|
|
return False
|
|
if "reviewStatus" in content_json:
|
|
review_status = content_json["reviewStatus"]
|
|
else:
|
|
self.log.warning("Useless callback received. No review status.")
|
|
return False
|
|
|
|
review_answer = None
|
|
if review_status == "completed":
|
|
if "reviewResult" in content_json:
|
|
if "reviewAnswer" in content_json["reviewResult"]:
|
|
review_answer = content_json["reviewResult"]["reviewAnswer"]
|
|
|
|
self.update_verification_status(
|
|
external_user_id, review_status, review_answer=review_answer
|
|
)
|
|
return True
|
|
|
|
def handle_callback(self, request):
|
|
"""
|
|
Handle a webhook callback.
|
|
"""
|
|
content = request.content.read()
|
|
payload_digest = request.getHeader("x-payload-digest")
|
|
if not self.verify_webhook_signature(content, payload_digest):
|
|
self.log.error("Webhook is not signed. Aborting.")
|
|
return False
|
|
content_json = json.loads(content)
|
|
rtrn = self.process_callback(content_json)
|
|
return rtrn
|
|
|
|
def get_external_user_id_details(self, external_user_id):
|
|
# /resources/applicants/-;externalUserId={externalUserId}/one
|
|
url = f"{settings.Verify.Base}/resources/applicants/-;externalUserId={external_user_id}/one"
|
|
resp = self.sign_request(requests.Request("GET", url))
|
|
s = requests.Session()
|
|
response = s.send(resp)
|
|
info = response.json()
|
|
if "info" in info:
|
|
if {"firstName", "lastName"}.issubset(set(info["info"].keys())):
|
|
first_name = info["info"]["firstName"]
|
|
last_name = info["info"]["lastName"]
|
|
if first_name.startswith("MR "):
|
|
first_name = first_name[3:]
|
|
return (first_name, last_name)
|
|
|
|
def create_applicant_and_get_link(self, external_user_id):
|
|
"""
|
|
Create the applicant and return the authentication link.
|
|
"""
|
|
# applicant_id = self.create_applicant(external_user_id)
|
|
auth_url = self.get_authentication_link(external_user_id)
|
|
return auth_url
|
|
|
|
def get_authentication_link(self, external_user_id):
|
|
"""
|
|
Get an external authentication link for a user.
|
|
"""
|
|
# /resources/sdkIntegrations/levels/{levelName}/websdkLink?ttlInSecs={lifetime}&externalUserId={externalUserId}&lang={locale}
|
|
url = (
|
|
f"{settings.Verify.Base}/resources/sdkIntegrations/levels/{settings.Verify.LevelName}"
|
|
f"/websdkLink?ttlInSecs=36000&externalUserId={external_user_id}"
|
|
)
|
|
resp = self.sign_request(requests.Request("POST", url))
|
|
s = requests.Session()
|
|
response = s.send(resp)
|
|
verification_url = response.json()["url"]
|
|
return verification_url
|
|
|
|
def get_applicant_status(self, applicant_id):
|
|
"""
|
|
Get the status of an applicant.
|
|
"""
|
|
# url = settings.Verify.Base + '/resources/applicants/' + applicant_id + '/requiredIdDocsStatus'
|
|
url = f"{settings.Verify.Base}/resources/applicants/'{applicant_id}/requiredIdDocsStatus"
|
|
resp = self.sign_request(requests.Request("GET", url))
|
|
s = requests.Session()
|
|
response = s.send(resp)
|
|
return response
|
|
|
|
def get_external_user_id_status(self, external_user_id):
|
|
"""
|
|
Get the status of an applicant by the external user ID.
|
|
"""
|
|
url = (
|
|
settings.Verify.Base
|
|
+ f"/resources/applicants/-;externalUserId={external_user_id}/one"
|
|
)
|
|
resp = self.sign_request(requests.Request("GET", url))
|
|
s = requests.Session()
|
|
response = s.send(resp)
|
|
response_json = response.json()
|
|
if "review" in response_json:
|
|
if "reviewResult" in response_json["review"]:
|
|
if "reviewAnswer" in response_json["review"]["reviewResult"]:
|
|
return response_json["review"]["reviewResult"]["reviewAnswer"]
|
|
return
|
|
|
|
def create_applicant(self, external_user_id):
|
|
"""
|
|
Create an applicant.
|
|
"""
|
|
body = {"externalUserId": external_user_id}
|
|
params = {"levelName": settings.Verify.LevelName}
|
|
headers = {"Content-Type": "application/json", "Content-Encoding": "utf-8"}
|
|
resp = self.sign_request(
|
|
requests.Request(
|
|
"POST",
|
|
f"{settings.Verify.Base}/resources/applicants?levelName={settings.Verify.LevelName}",
|
|
params=params,
|
|
data=json.dumps(body),
|
|
headers=headers,
|
|
)
|
|
)
|
|
s = requests.Session()
|
|
response = s.send(resp)
|
|
applicant_id = response.json()["id"]
|
|
return applicant_id
|
|
|
|
def get_access_token(self, external_user_id, level_name):
|
|
"""
|
|
Get an access token for an external user ID.
|
|
"""
|
|
params = {
|
|
"userId": external_user_id,
|
|
"ttlInSecs": "600",
|
|
"levelName": level_name,
|
|
}
|
|
headers = {"Content-Type": "application/json", "Content-Encoding": "utf-8"}
|
|
resp = self.sign_request(
|
|
requests.Request(
|
|
"POST",
|
|
f"{settings.Verify.Base}/resources/accessTokens",
|
|
params=params,
|
|
headers=headers,
|
|
)
|
|
)
|
|
s = requests.Session()
|
|
response = s.send(resp)
|
|
token = response.json()["token"]
|
|
|
|
return token
|
|
|
|
def sign_request(self, request: requests.Request) -> requests.PreparedRequest:
|
|
"""
|
|
Sign a request.
|
|
"""
|
|
prepared_request = request.prepare()
|
|
now = int(time.time())
|
|
method = request.method.upper()
|
|
path_url = prepared_request.path_url # includes encoded query params
|
|
# could be None so we use an empty **byte** string here
|
|
body = b"" if prepared_request.body is None else prepared_request.body
|
|
if type(body) == str:
|
|
body = body.encode("utf-8")
|
|
data_to_sign = (
|
|
str(now).encode("utf-8")
|
|
+ method.encode("utf-8")
|
|
+ path_url.encode("utf-8")
|
|
+ body
|
|
)
|
|
# hmac needs bytes
|
|
signature = hmac.new(
|
|
settings.Verify.Key.encode("utf-8"), data_to_sign, digestmod=hashlib.sha256
|
|
)
|
|
prepared_request.headers["X-App-Token"] = settings.Verify.Token
|
|
prepared_request.headers["X-App-Access-Ts"] = str(now)
|
|
prepared_request.headers["X-App-Access-Sig"] = signature.hexdigest()
|
|
return prepared_request
|