Implement LBTC authentication

This commit is contained in:
Mark Veidemanis 2022-04-12 22:05:46 +01:00
parent d6408d5200
commit 6dbed22e49
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
2 changed files with 86 additions and 104 deletions

View File

@ -20,7 +20,10 @@ __copyright__ = "(C) 2021 https://codeberg.org/MarvinsCryptoTools/agoradesk_py"
__version__ = "0.1.0" __version__ = "0.1.0"
# set logging # set logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logging.getLogger("requests.packages.urllib3").setLevel(logging.INFO) logging.getLogger("requests.packages.urllib3").setLevel(logging.INFO)
logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO) logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO)
logger = util.get_logger(__name__) logger = util.get_logger(__name__)
@ -618,7 +621,10 @@ class AgoraDesk:
return self._api_call( return self._api_call(
api_method="equation", api_method="equation",
http_method="POST", http_method="POST",
query_values={"price_equation": price_equation, "currency": currency}, query_values={
"price_equation": price_equation,
"currency": currency,
},
) )
# Public ad search related API Methods # Public ad search related API Methods
@ -1019,4 +1025,8 @@ class AgoraDesk:
if otp: if otp:
params["otp"] = otp params["otp"] = otp
return self._api_call(api_method="wallet-send/XMR", http_method="POST", query_values=params) return self._api_call(
api_method="wallet-send/XMR",
http_method="POST",
query_values=params,
)

View File

@ -29,101 +29,16 @@ __copyright__ = "(C) 2021 https://codeberg.org/MarvinsCryptoTools/agoradesk_py"
__version__ = "0.1.0" __version__ = "0.1.0"
# set logging # set logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logging.getLogger("requests.packages.urllib3").setLevel(logging.INFO) logging.getLogger("requests.packages.urllib3").setLevel(logging.INFO)
logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO) logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO)
logger = util.get_logger(__name__) logger = util.get_logger(__name__)
URI_API = "https://localbitcoins.com/api/" URI_API = "https://localbitcoins.com/api/"
SERVER = "https://localbitcoins.com"
def hmac(hmac_key, hmac_secret, server="https://localbitcoins.com"):
conn = Connection()
conn._set_hmac(server, hmac_key, hmac_secret)
return conn
class Connection:
def __init__(self):
self.server = None
# HMAC stuff
self.hmac_key = None
self.hmac_secret = None
def call(self, method, url, params=None, stream=False, files=None):
method = method.upper()
if method not in ["GET", "POST"]:
raise Exception("Invalid method {}!".format(method))
if method == "GET" and files:
raise Exception("You cannot send files with GET method!")
if files and not isinstance(files, dict):
raise Exception('"files" must be a dict of file objects or file contents!')
# If URL is absolute, then convert it
if url.startswith(self.server):
url = url[len(self.server) :] # noqa
if self.hmac_key:
# If nonce fails, retry several times, then give up
for retry in range(10):
nonce = str(int(time.time() * 1000)).encode("ascii")
# Prepare request based on method.
if method == "POST":
api_request = requests.Request("POST", self.server + url, data=params, files=files).prepare()
params_encoded = api_request.body
# GET method
else:
api_request = requests.Request("GET", self.server + url, params=params).prepare()
params_encoded = urlparse(api_request.url).query
# Calculate signature
message = nonce + self.hmac_key + url.encode("ascii")
if params_encoded:
if sys.version_info >= (3, 0) and isinstance(params_encoded, str):
message += params_encoded.encode("ascii")
else:
message += params_encoded
signature = hmac_lib.new(self.hmac_secret, msg=message, digestmod=hashlib.sha256).hexdigest().upper()
# Store signature and other stuff to headers
api_request.headers["Apiauth-Key"] = self.hmac_key
api_request.headers["Apiauth-Nonce"] = nonce
api_request.headers["Apiauth-Signature"] = signature
# Send request
session = requests.Session()
response = session.send(api_request, stream=stream)
# If HMAC Nonce is already used, then wait a little and try again
try:
response_json = response.json()
if int(response_json.get("error", {}).get("error_code")) == 42:
time.sleep(0.1)
continue
except: # noqa
# No JSONic response, or interrupt, better just give up
pass
return response
raise Exception("Nonce is too small!")
raise Exception("No HMAC connection initialized!")
def get_access_token(self):
return self.access_token
def _set_hmac(self, server, hmac_key, hmac_secret):
self.server = server
self.hmac_key = hmac_key.encode("ascii")
self.hmac_secret = hmac_secret.encode("ascii")
class LocalBitcoins: class LocalBitcoins:
@ -135,10 +50,18 @@ class LocalBitcoins:
# pylint: disable=too-many-public-methods # pylint: disable=too-many-public-methods
# API provides this many methods, I can't change that # API provides this many methods, I can't change that
def __init__(self, api_key: Optional[str], debug: Optional[bool] = False) -> None: def __init__(
self.api_key = "" self,
if api_key: hmac_key: Optional[str],
self.api_key = api_key hmac_secret: Optional[str],
debug: Optional[bool] = False,
) -> None:
self.hmac_key = ""
self.hmac_secret = ""
if hmac_key:
self.hmac_key = hmac_key.encode("ascii")
if hmac_secret:
self.hmac_secret = hmac_secret.encode("ascii")
self.debug = debug self.debug = debug
if self.debug: if self.debug:
@ -147,20 +70,62 @@ class LocalBitcoins:
else: else:
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
logger.debug("creating instance of LocalBitcoins API with api_key %s", self.api_key) logger.debug(
"creating instance of LocalBitcoins API with api_key %s",
self.hmac_key,
)
def sign_payload(self, nonce, url, params_encoded):
"""
Sign the payload with our HMAC keys.
"""
# Calculate signature
message = nonce + self.hmac_key + url.encode("ascii")
print("message", message)
if params_encoded:
if sys.version_info >= (3, 0) and isinstance(params_encoded, str):
message += params_encoded.encode("ascii")
print("after message appended", message)
else:
message += params_encoded
signature = hmac_lib.new(self.hmac_secret, msg=message, digestmod=hashlib.sha256).hexdigest().upper()
print("signature", signature)
return signature
def encode_params(self, api_method, api_call_url, query_values):
if api_method == "POST":
api_request = requests.Request("POST", api_call_url, data=query_values).prepare()
params_encoded = api_request.body
# GET method
else:
api_request = requests.Request("GET", api_call_url, params=query_values).prepare()
params_encoded = urlparse(api_request.url).query
return params_encoded
def _api_call( def _api_call(
self, self,
api_method: str, api_method: str,
http_method: Optional[str] = "GET", http_method: Optional[str] = "GET",
query_values: Optional[Dict[str, Any]] = None, query_values: Optional[Dict[str, Any]] = None,
files: Optional[Any] = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
api_method += "/"
api_call_url = URI_API + api_method api_call_url = URI_API + api_method
url = api_call_url
if url.startswith(SERVER):
url = url[len(SERVER) :] # noqa
# HMAC crypto stuff
params_encoded = self.encode_params(api_method, api_call_url, query_values)
nonce = str(int(time.time() * 1000)).encode("ascii")
signature = self.sign_payload(nonce, url, params_encoded)
headers = { headers = {
"Content-Type": "application/json", "Apiauth-Key": self.hmac_key,
"User-Agent": f"localbitcoins_py/{__version__} " f"https://git.zm.is/Pathogen/pluto", "Apiauth-Nonce": nonce,
"Authorization": self.api_key, "Apiauth-Signature": signature,
} }
logger.debug("API Call URL: %s", api_call_url) logger.debug("API Call URL: %s", api_call_url)
@ -202,7 +167,7 @@ class LocalBitcoins:
result["message"] = "OK" result["message"] = "OK"
else: else:
result["message"] = "API ERROR" result["message"] = "API ERROR"
print("RESP", result)
return result return result
except httpx.ConnectError as error: except httpx.ConnectError as error:
result["message"] = str(error) result["message"] = str(error)
@ -716,7 +681,10 @@ class LocalBitcoins:
return self._api_call( return self._api_call(
api_method="equation", api_method="equation",
http_method="POST", http_method="POST",
query_values={"price_equation": price_equation, "currency": currency}, query_values={
"price_equation": price_equation,
"currency": currency,
},
) )
# Public ad search related API Methods # Public ad search related API Methods
@ -883,4 +851,8 @@ class LocalBitcoins:
if pincode: if pincode:
params["pincode"] = pincode params["pincode"] = pincode
return self._api_call(api_method="wallet-send-pin", http_method="POST", query_values=params) return self._api_call(
api_method="wallet-send-pin",
http_method="POST",
query_values=params,
)