# -*- coding: utf-8 -*-
"""
xled.auth
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Custom authentication handler and authenticated session to be used with
:class:`requests.Session`
"""
from __future__ import absolute_import
import base64
import logging
import time
import requests
from requests import Request
from requests.auth import AuthBase
from requests.compat import urlparse
from requests_toolbelt.sessions import BaseUrlSession
import xled.security
from xled.exceptions import (
ApplicationError,
AuthenticationError,
TokenExpiredError,
ValidationError,
)
from xled.response import ApplicationResponse
from xled.security import generate_challenge
log = logging.getLogger(__name__)
AUTH_HEADER_NAME = "X-Auth-Token"
class ChallengeResponseAuth(AuthBase):
    """Challenge-response authentication handler for :mod:`requests`.

    When attached to a request it registers a response hook. If the server
    answers with HTTP 401, the hook sends a challenge to the login endpoint,
    optionally validates the returned challenge-response, confirms it on the
    verify endpoint and finally re-sends the original request with the
    obtained token in the ``X-Auth-Token`` header.

    :param str login_url: Path of the login endpoint. It is appended to the
        scheme and network location of the URL that answered with 401.
    :param str verify_url: Path of the verify endpoint, resolved the same way.
    :param hw_address: (optional) Hardware address of the server. It is
        passed to :func:`xled.security.make_challenge_response`; without it
        the challenge-response cannot be checked.
    """

    def __init__(self, login_url, verify_url, hw_address=None):
        self.login_url = login_url
        self.verify_url = verify_url
        self.hw_address = hw_address
        # populated after first request
        self.challenge = None
        self.challenge_response = None
        self.authentication_token = None

    @staticmethod
    def _endpoint_url(response, path):
        """Return URL of *path* on the server that produced *response*."""
        parsed = urlparse(response.url)
        # netloc (not hostname) keeps an explicit port and IPv6 brackets, so
        # the follow-up request reaches the same server as the original one.
        return "{schema}://{host}{path}".format(
            schema=parsed.scheme, host=parsed.netloc, path=path
        )

    def validate_challenge_response(self):
        """Compare the stored challenge-response with the expected one.

        :return: True if it matches, False if it does not, None if it cannot
            be checked because no hardware address is known.
        :rtype: bool or None
        """
        if not self.hw_address:
            msg = (
                "validate_challenge_response(): Cannot verify "
                "challenge-response without HW address."
            )
            log.warning(msg)
            return None
        expected = xled.security.make_challenge_response(
            self.challenge, self.hw_address
        )
        if expected != self.challenge_response:
            msg = (
                "validate_challenge_response(): login sent "
                "challenge-response: %r. But %r was expected."
            )
            log.error(msg, self.challenge_response, expected)
            return False
        msg = "validate_challenge_response(): challenge-response is correct."
        log.debug(msg)
        return True

    def send_challenge(self, response, challenge):
        """POST base64 encoded *challenge* to the login endpoint.

        On success stores ``challenge-response`` and ``authentication_token``
        from the JSON reply on the instance.

        :param requests.Response response: Response whose URL and connection
            are reused for the login request.
        :param bytes challenge: Challenge to send.
        :return: True if login succeeded, False otherwise.
        :rtype: bool
        """
        url = self._endpoint_url(response, self.login_url)
        b64_challenge = base64.b64encode(challenge).decode("utf-8")
        body = {"challenge": b64_challenge}
        r2 = requests.Request(method="POST", url=url, json=body)
        prep = r2.prepare()
        _r = response.connection.send(prep)
        if _r.status_code != 200:
            # Consume the body so the connection can be reused.
            _r.content
            msg = "send_challenge(): login status code: %s"
            log.error(msg, _r.status_code)
            return False
        content = _r.json()
        # Any application code other than 1000 is treated as a failed login.
        if content[u"code"] != 1000:
            msg = "send_challenge(): login return code: %s"
            log.error(msg, content[u"code"])
            return False
        self.challenge_response = content["challenge-response"]
        self.authentication_token = content["authentication_token"]
        return True

    def send_challenge_response(self, response):
        """POST the stored challenge-response to the verify endpoint.

        The not yet verified token is sent in the ``X-Auth-Token`` header.

        :param requests.Response response: Response whose URL and connection
            are reused for the verify request.
        :return: True if the endpoint answered with HTTP 200, else False.
        :rtype: bool
        """
        url = self._endpoint_url(response, self.verify_url)
        headers = {"X-Auth-Token": self.authentication_token}
        body = {u"challenge-response": self.challenge_response}
        r2 = requests.Request(method="POST", url=url, headers=headers, json=body)
        prep = r2.prepare()
        _r = response.connection.send(prep)
        # Consume the body so the connection can be reused for the next
        # request; only the status code is of interest here.
        _r.content
        return _r.status_code == 200

    def authenticate(self, response, **kwargs):
        """Handles user authentication with challenge-response

        :param requests.Response response: The 401 response to recover from.
        :param kwargs: Passed to the connection when the original request is
            sent again.
        :return: Response to the re-sent request, or *response* itself if
            any step of the authentication failed.
        :rtype: requests.Response
        """
        # Consume the content so we can reuse the connection for the next
        # request.
        response.content
        response.raw.release_conn()
        challenge = xled.security.generate_challenge()
        log.debug("authenticate(): Challenge: %s", repr(challenge))
        if not self.send_challenge(response, challenge):
            return response
        # None means "could not be checked" and is deliberately not treated
        # as a failure; only an explicit mismatch aborts.
        if self.validate_challenge_response() is False:
            return response
        if not self.send_challenge_response(response):
            return response
        response.request.headers["X-Auth-Token"] = self.authentication_token
        _r = response.connection.send(response.request, **kwargs)
        _r.history.append(response)
        log.debug("authenticate(): returning %s", _r)
        return _r

    def handle_401(self, response, **kwargs):
        """
        Handles 401's, attempts to use challenge-response authentication
        """
        log.debug("handle_401(): Handling: 401")
        _r = self.authenticate(response, **kwargs)
        log.debug("handle_401(): returning %s", _r)
        return _r

    def handle_response(self, response, **kwargs):
        """
        Takes the given response and tries challenge-auth, as needed.

        :param requests.Response response: Response to inspect.
        :param kwargs: Hook keyword arguments; ``num_401s`` is used
            internally to count how many 401 responses were already handled.
        :return: The response that should be handed to the caller.
        :rtype: requests.Response
        """
        num_401s = kwargs.pop("num_401s", 0)
        # If response is not 4xx, do not auth
        if not 400 <= response.status_code < 500:
            log.debug(
                "handle_response(): Not authenticating request because status is %s",
                response.status_code,
            )
            return response
        if response.status_code == 401 and num_401s < 2:
            # 401 Unauthorized. Handle it, and if it still comes back as 401,
            # that means authentication failed.
            _r = self.handle_401(response, **kwargs)
            log.debug("handle_response(): returning %s", _r)
            log.debug("handle_response() has seen %d 401 responses", num_401s)
            num_401s += 1
            return self.handle_response(_r, num_401s=num_401s, **kwargs)
        if response.status_code == 401:
            # Still receiving 401 responses after attempting to handle them.
            # Authentication has failed. Return the 401 response.
            log.debug("handle_response(): returning 401 %s", response)
            return response
        # Any other 4xx. Return it explicitly: a hook that returns None makes
        # requests keep the response it already had, which after a re-sent
        # request would be the stale 401 instead of this one.
        return response

    def deregister(self, response):
        """
        Deregisters the response handler
        """
        response.request.deregister_hook("response", self.handle_response)

    def __call__(self, request):
        """Attach token (if already known) and the response hook to *request*.

        :param requests.PreparedRequest request: Request being prepared.
        :return: The same request object.
        """
        if self.authentication_token:
            log.debug(
                "Adding authentication token %s to request", self.authentication_token
            )
            request.headers["X-Auth-Token"] = self.authentication_token
        request.register_hook("response", self.handle_response)
        try:
            self.pos = request.body.tell()
        except AttributeError:
            # In the case of ChallengeResponseAuth being reused and the body
            # of the previous request was a file-like object, pos has
            # the file position of the previous body. Ensure it's set to
            # None.
            self.pos = None
        return request
class BaseUrlChallengeResponseAuthSession(BaseUrlSession):
    """Extension to :class:`requests_toolbelt.BaseUrlSession` to provide authentication.

    Any request used with this session gets authentication token added.
    Authentication token can be fetched even separately.
    """

    def __init__(self, hw_address=None, client=None, auto_refresh_token=True, **kwargs):
        """Construct a new client session.

        :param str hw_address: Hardware address of server. Used for validation
            during login phase.
        :param client: Object with :class:`ClientApplication` interface.
            A new :class:`ClientApplication` is created if none is given.
        :param bool auto_refresh_token: (optional) if token is found expired
            automatically request new one.
        :param kwargs: Arguments to pass to the BaseUrlSession initializer.
            Most useful is `base_url`.
        """
        self.hw_address = hw_address
        self.client = client or ClientApplication()
        self.auto_refresh_token = auto_refresh_token
        super(BaseUrlChallengeResponseAuthSession, self).__init__(**kwargs)

    def prepare_request_challenge(self):
        """Creates prepared request to send challenge

        :return: prepared request
        :rtype: requests.PreparedRequest
        """
        request = Request("POST", self.challenge_url)
        prepped = self.prepare_request(request)
        # The client fills in the request body with a new challenge.
        return self.client.prepare_request_challenge(prepped)

    def prepare_request_verify(self):
        """Creates prepared request to send verification

        :return: prepared request
        :rtype: requests.PreparedRequest
        """
        request = Request("POST", self.verify_url)
        prepped = self.prepare_request(request)
        # The client adds the unverified token and the challenge-response.
        return self.client.prepare_request_verify(prepped)

    @property
    def challenge_url(self):
        """Full URL of login endpoint

        :return: String with full url
        :rtype: str
        """
        return self.create_url("login")

    @property
    def verify_url(self):
        """Full URL of verify endpoint

        :return: Full URL.
        :rtype: str
        """
        return self.create_url("verify")

    def fetch_token(self):
        """Main authentication method that fetches new token

        Sends the challenge, lets the client check the challenge-response
        against :attr:`hw_address` and then sends the verification.

        :return: Token as string.
        :rtype: str
        """
        prepared = self.prepare_request_challenge()
        response = self.send(prepared)
        self.client.parse_response_challenge(response)
        self.client.challenge_response_valid(self.hw_address)
        prepared = self.prepare_request_verify()
        response = self.send(prepared)
        self.client.parse_response_verify(response)
        return self.client.authentication_token

    def add_token(self, headers=None):
        """Adds token header to dictionary with headers

        :param dict headers: Optional initial dictionary with headers.
        :return: Dict with added authentication header.
        :rtype: dict
        :raises TokenExpiredError: If token is expected to be expired.
        """
        assert self.client.authentication_token
        if self.client.token_expired:
            raise TokenExpiredError()
        headers = headers or {}
        headers[AUTH_HEADER_NAME] = self.access_token
        return headers

    @property
    def authorized(self):
        """Boolean that indicates whether this session has an ChallengeResponse
        token or not. If `self.authorized` is True, you can reasonably expect
        ChallengeResponse-protected requests to the resource to succeed. If
        `self.authorized` is False, you need the user to go through the
        ChallengeResponse authentication dance before
        ChallengeResponse-protected requests to the resource will succeed.

        :rtype: bool
        """
        return bool(self.access_token)

    @property
    def access_token(self):
        """Current authentication token if exists. None if it wasn't fetched yet."""
        return getattr(self.client, "authentication_token", None)

    @access_token.setter
    def access_token(self, value):
        self.client.authentication_token = value

    def request(self, method, url, headers=None, withhold_token=False, **kwargs):
        """Main request method of the session

        Adds authentication to method from
        :class:`requests_toolbelt.BaseUrlSession`. Takes auto_refresh_token in
        mind. A request answered with HTTP 401 is tried once more; if a token
        was sent with it, the token is invalidated first so that a new one is
        fetched for the second attempt.

        :param str method: HTTP method.
        :param str url: URL of the request; passed to the parent session.
        :param dict headers: (optional) user supplied request headers.
        :param bool withhold_token: If True authentication token isn't added
            to the request.
        :param kwargs: Passed through to the parent session's ``request``.
        :rtype: requests.Response
        """
        for _ in range(2):
            if not withhold_token:
                headers = self.add_authorization(headers)
            log.debug("Requesting url %s using method %s.", url, method)
            log.debug("Supplying headers %s", headers)
            log.debug("Passing through key word arguments %s.", kwargs)
            response = super(BaseUrlChallengeResponseAuthSession, self).request(
                method, url, headers=headers, **kwargs
            )
            if response.status_code != 401:
                break
            if withhold_token:
                log.warning(
                    "Unexpected HTTP status code 401 to request without added "
                    "token. Maybe a token is needed for this endpoint?"
                )
                # Try again, if this was transient issue
                continue
            log.warning(
                "Unexpected HTTP status code 401 to request with added token."
            )
            # None is the "no token" value used everywhere else in this file.
            self.access_token = None
            log.debug("Token invalidated.")
        return response

    def add_authorization(self, headers):
        """Returns headers with added authorization

        Fetches a token first if the session has none. An expired token is
        refreshed once when ``auto_refresh_token`` is set.

        :param dict headers: user supplied request headers
        :rtype: dict
        :raises TokenExpiredError: if token expired and automatic refresh is
            disabled.
        :raises AuthenticationError: if no token could be obtained.
        """
        if not self.authorized:
            self.fetch_token()
        if not self.access_token:
            log.error("Failed to add token.")
            raise AuthenticationError()
        for _ in range(2):
            try:
                headers = self.add_token(headers)
            except TokenExpiredError:
                if not self.auto_refresh_token:
                    raise
                log.debug("Auto refresh token is set, attempting to refresh.")
                self.fetch_token()
                if not self.access_token:
                    log.error("Failed to refresh token.")
                    raise AuthenticationError()
            else:
                break
        return headers
class ValidatingClientMixin(object):
    """Mixin adds functionality to :class:`ClientApplication` to authenticate server"""

    def challenge_response_valid(self, hw_address=None):
        """Check that the server answered the challenge correctly.

        The expected challenge-response is computed with
        :func:`xled.security.make_challenge_response` from the stored
        challenge and *hw_address* and compared with the challenge-response
        stored on the instance.

        :param str hw_address: Hardware address of a server.
        :return: True if the challenge-response matches. None if it cannot
            be verified because no hardware address was given.
        :rtype: bool or None
        :raises ValidationError: if the challenge-response does not match.
        """
        if hw_address:
            wanted = xled.security.make_challenge_response(
                self._challenge, hw_address
            )
            if wanted == self._challenge_response:
                log.debug("challenge-response is correct.")
                return True
            log.error(
                "challenge-response invalid. "
                "Received challenge-response: %r but %r was expected.",
                self._challenge_response,
                wanted,
            )
            raise ValidationError()
        # Nothing to compare against without the hardware address.
        log.debug("Can not validate challenge-response without hw_address.")
        return None
class ClientApplication(ValidatingClientMixin):
    """Client side state of the challenge-response login.

    Keeps the challenge, the not yet verified token and challenge-response
    received from login, and - once verify succeeded - the usable
    ``authentication_token`` together with its expiration time.

    :param challenge: (optional) Either a fixed challenge or a callable
        returning a new challenge. Defaults to
        :func:`xled.security.generate_challenge`.
    """

    def __init__(self, challenge=None):
        # Verified token and the time (compared with time.time()) it expires.
        self.authentication_token = None
        self.expires_at = None
        self.challenge = challenge or generate_challenge
        # State of a login that is in progress.
        self._challenge = None
        self._authentication_token = None
        self._challenge_response = None
        self._expires_in = None

    def new_challenge(self):
        """Generates a challenge string to be used in authorizations.

        :return: Newly generated challenge, or the fixed challenge supplied
            to the constructor.
        """
        # Explicit callable() check so that a TypeError raised inside the
        # generator is not mistaken for "challenge is a fixed value".
        if callable(self.challenge):
            self._challenge = self.challenge()
            log.debug("Generated new challenge %r.", self._challenge)
        else:
            self._challenge = self.challenge
            log.debug("Re-using previously supplied challenge %s.", self._challenge)
        return self._challenge

    @property
    def token_expired(self):
        """True if an expiration time is known and it already passed."""
        if self.expires_at and self.expires_at < time.time():
            log.info("Token has expired.")
            return True
        return False

    @property
    def token_valid(self):
        """Truthy if a verified token exists and it is not expired."""
        return self.authentication_token and not self.token_expired

    def _add_token(self, headers=None):
        """Adds authentication token that haven't been verified yet to headers

        :param dict headers: Optional initial dictionary with headers.
        :return: Dict with added authentication header.
        :rtype: dict
        """
        assert self._authentication_token
        headers = headers or {}
        headers[AUTH_HEADER_NAME] = self._authentication_token
        return headers

    def prepare_request_challenge(self, request):
        """Modifies prepared request so it can be sent to login

        Generates a new challenge, forgets results of any previous login
        and sets the base64 encoded challenge as JSON body of the request.

        :param requests.PreparedRequest request: prepared request to modify
        :return: Modified prepared request
        :rtype: requests.PreparedRequest
        """
        challenge = self.new_challenge()
        self._authentication_token = None
        self._challenge_response = None
        self._expires_in = None
        b64_challenge = base64.b64encode(challenge).decode("utf-8")
        request.prepare_body(None, None, json={"challenge": b64_challenge})
        return request

    def populate_token_attributes(self, response):
        """Fetches token attributes from application response

        Only keys present in the response are stored. The expiration time is
        computed as current time plus ``authentication_token_expires_in``.

        :param response: Response from login endpoint.
        :type response: :class:`~xled.response.ApplicationResponse`
        """
        if "authentication_token" in response:
            self._authentication_token = response.get("authentication_token")
        if "challenge-response" in response:
            self._challenge_response = response.get("challenge-response")
        if "authentication_token_expires_in" in response:
            self._expires_in = response.get("authentication_token_expires_in")
            self.expires_at = time.time() + int(self._expires_in)

    def parse_response_challenge(self, response, **kwargs):
        """Process response from login call

        Stores the unverified token, challenge-response and expiration
        found in the response.

        :param requests.Response response: Response to process.
        :return: Same response that was used as parameter
        :rtype: requests.Response
        :raises AuthenticationError: if application response isn't valid
        """
        app_response = ApplicationResponse(response)
        try:
            app_response.raise_for_status()
        except ApplicationError:
            log.error("Login failed: %r", app_response.data)
            raise AuthenticationError()
        self.populate_token_attributes(app_response)
        log.debug("Got token: %s", self._authentication_token)
        return response

    def prepare_request_verify(self, request):
        """Modifies prepared request so it can be sent to verify challenge

        :param requests.PreparedRequest request: prepared request to modify
        :return: Modified prepared request
        :rtype: requests.PreparedRequest
        """
        assert self._authentication_token
        assert self._challenge_response
        token_headers = self._add_token()
        request.prepare_headers(token_headers)
        # NOTE(review): ChallengeResponseAuth.send_challenge_response() posts
        # this value under the key "challenge-response" (with a hyphen).
        # Confirm which spelling the device expects.
        request.prepare_body(
            None, None, json={"challenge_response": self._challenge_response}
        )
        return request

    def parse_response_verify(self, response, **kwargs):
        """Process response from verify call

        This is last step to be able to use token to authenticate. On
        success the so far unverified token becomes
        ``authentication_token``.

        :param requests.Response response: Response to process.
        :return: Same response that was used as parameter
        :rtype: requests.Response
        :raises AuthenticationError: if application response isn't valid
        """
        app_response = ApplicationResponse(response)
        try:
            app_response.raise_for_status()
        except ApplicationError:
            log.error("Verify failed")
            # Must be raised, not returned: callers ignore the return value
            # and would otherwise continue as if verify succeeded.
            raise AuthenticationError()
        self._challenge_response = None
        self.authentication_token = self._authentication_token
        self._authentication_token = None
        return response