2022-04-20 12:01:38 +00:00
|
|
|
# Other library imports
|
2022-04-20 12:18:56 +00:00
|
|
|
import requests
|
|
|
|
import hashlib
|
|
|
|
import hmac
|
|
|
|
import time
|
|
|
|
import json
|
2022-04-20 12:01:38 +00:00
|
|
|
|
|
|
|
# Project imports
|
2022-04-20 12:18:56 +00:00
|
|
|
from settings import settings
|
2022-04-20 12:01:38 +00:00
|
|
|
import util
|
|
|
|
|
|
|
|
|
|
|
|
class Verify(util.Base):
|
|
|
|
"""
|
|
|
|
Class to handle user verification.
|
|
|
|
"""
|
2022-04-20 12:18:56 +00:00
|
|
|
|
|
|
|
def get_authentication_link(self, external_user_id):
|
|
|
|
"""
|
|
|
|
Get an external authentication link for a user.
|
|
|
|
"""
|
|
|
|
|
|
|
|
def get_applicant_status(self, applicant_id):
|
|
|
|
"""
|
|
|
|
Get the status of an applicant.
|
|
|
|
"""
|
|
|
|
# url = settings.Verify.Base + '/resources/applicants/' + applicant_id + '/requiredIdDocsStatus'
|
|
|
|
url = f"{settings.Verify.Base}/resources/applicants/'{applicant_id}/requiredIdDocsStatus"
|
|
|
|
resp = self.sign_request(requests.Request("GET", url))
|
|
|
|
s = requests.Session()
|
|
|
|
response = s.send(resp)
|
|
|
|
return response
|
|
|
|
|
|
|
|
def get_external_user_id_status(self, external_user_id):
|
|
|
|
"""
|
|
|
|
Get the status of an applicant by the external user ID.
|
|
|
|
"""
|
|
|
|
url = settings.Verify.Base + f"/resources/applicants/-;externalUserId={external_user_id}/one"
|
|
|
|
resp = self.sign_request(requests.Request("GET", url))
|
|
|
|
s = requests.Session()
|
|
|
|
response = s.send(resp)
|
|
|
|
return response
|
|
|
|
|
|
|
|
def create_applicant(self, external_user_id, level_name):
|
|
|
|
"""
|
|
|
|
Create an applicant.
|
|
|
|
"""
|
|
|
|
body = {"externalUserId": external_user_id}
|
|
|
|
params = {"levelName": level_name}
|
|
|
|
headers = {"Content-Type": "application/json", "Content-Encoding": "utf-8"}
|
|
|
|
resp = self.sign_request(
|
|
|
|
requests.Request(
|
|
|
|
"POST",
|
|
|
|
f"{settings.Verify.Base}/resources/applicants?levelName={level_name}",
|
|
|
|
params=params,
|
|
|
|
data=json.dumps(body),
|
|
|
|
headers=headers,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
s = requests.Session()
|
|
|
|
response = s.send(resp)
|
|
|
|
applicant_id = response.json()["id"]
|
|
|
|
return applicant_id
|
|
|
|
|
|
|
|
def get_access_token(self, external_user_id, level_name):
|
|
|
|
"""
|
|
|
|
Get an access token for an external user ID.
|
|
|
|
"""
|
|
|
|
params = {"userId": external_user_id, "ttlInSecs": "600", "levelName": level_name}
|
|
|
|
headers = {"Content-Type": "application/json", "Content-Encoding": "utf-8"}
|
|
|
|
resp = self.sign_request(
|
|
|
|
requests.Request("POST", f"{settings.Verify.Base}/resources/accessTokens", params=params, headers=headers)
|
|
|
|
)
|
|
|
|
s = requests.Session()
|
|
|
|
response = s.send(resp)
|
|
|
|
token = response.json()["token"]
|
|
|
|
|
|
|
|
return token
|
|
|
|
|
|
|
|
def sign_request(self, request: requests.Request) -> requests.PreparedRequest:
|
|
|
|
"""
|
|
|
|
Sign a request.
|
|
|
|
"""
|
|
|
|
prepared_request = request.prepare()
|
|
|
|
now = int(time.time())
|
|
|
|
method = request.method.upper()
|
|
|
|
path_url = prepared_request.path_url # includes encoded query params
|
|
|
|
# could be None so we use an empty **byte** string here
|
|
|
|
body = b"" if prepared_request.body is None else prepared_request.body
|
|
|
|
if type(body) == str:
|
|
|
|
body = body.encode("utf-8")
|
|
|
|
data_to_sign = str(now).encode("utf-8") + method.encode("utf-8") + path_url.encode("utf-8") + body
|
|
|
|
# hmac needs bytes
|
|
|
|
signature = hmac.new(settings.Verify.Key.encode("utf-8"), data_to_sign, digestmod=hashlib.sha256)
|
|
|
|
prepared_request.headers["X-App-Token"] = settings.Verify.Token
|
|
|
|
prepared_request.headers["X-App-Access-Ts"] = str(now)
|
|
|
|
prepared_request.headers["X-App-Access-Sig"] = signature.hexdigest()
|
|
|
|
return prepared_request
|