Last week, I covered the Basics of the OAuth 2.0 Authorization Flow. Today, I will walk through how we used pyoauth2 to set up a minimal Authorization Provider for SHIFT. This post covers setting up endpoints for steps 2 and 5 from the overview.
The role of the Authorization Provider is to securely generate, validate, and store authorization codes, access tokens, and refresh tokens.
Routes
Before getting started, it’s helpful to set up the routes used for OAuth for both a test CustomApp and your application, and to have a staging environment to test in. Because OAuth requires SSL, testing locally is complicated, but not impossible.
The routes for OAuth 2 using pyoauth2 are extremely simple; just forward enough information for pyoauth2 to do its job and convert the response to a type appropriate for your application framework. We’ll create an authentication route and a token exchange route, showing code for Flask:
import flask
from flask import request

from app import app
# This class will be defined later in this post
from shift.oauth.provider import ShiftAuthorizationProvider


# Authorization Code
# Returns a redirect header on success
@app.route("/v1/oauth2/auth", methods=["GET"])
def authorization_code():
    # You can cache this instance for efficiency
    provider = ShiftAuthorizationProvider()

    # This is the important line
    response = provider.get_authorization_code_from_uri(request.url)

    # For maximum compatibility, a standard Response object is provided
    # Response has the following properties:
    #
    # response.status_code int
    # response.text response body
    # response.headers iterable dict-like object with keys and values
    #
    # This response must be converted to a type that your application
    # framework can use and returned.
    flask_res = flask.make_response(response.text, response.status_code)
    for k, v in response.headers.items():
        flask_res.headers[k] = v
    return flask_res


# Token exchange
# Returns JSON token information on success
@app.route("/v1/oauth2/token", methods=["POST"])
def token():
    # You can cache this instance for efficiency
    provider = ShiftAuthorizationProvider()

    # Get a plain dict of POSTed form data (first value per key)
    data = request.form.to_dict()

    # This is the important line
    response = provider.get_token_from_post_data(data)

    # The same Response object is provided, and must be converted
    # to a type that your application framework can use and returned.
    flask_res = flask.make_response(response.text, response.status_code)
    for k, v in response.headers.items():
        flask_res.headers[k] = v
    return flask_res
Ensure that your basic routing is set up properly and returning dummy results before continuing. A good REST client like Postman for Google Chrome will be extremely useful for testing.
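When testing the two routes by hand, it helps to generate the requests rather than type them. Here is a minimal sketch, in Python 3 and using only the standard library, that builds the authorization URL and the token exchange form. The parameter names follow the OAuth 2.0 authorization code grant (RFC 6749); the host, client ID, client secret, and redirect URI are made-up placeholders, and the exact set of fields pyoauth2 requires in the POST body is an assumption you should check against the library.

```python
from urllib.parse import urlencode

# Hypothetical staging host; the paths match the routes defined above.
BASE_URL = "https://staging.example.com"


def build_authorization_url(client_id, redirect_uri, scope="", state=None):
    """Build the GET URL for the authorization code route."""
    params = {
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
    }
    # Only send the optional parameters when they carry a value
    if scope:
        params["scope"] = scope
    if state is not None:
        params["state"] = state
    return BASE_URL + "/v1/oauth2/auth?" + urlencode(params)


def build_token_form(client_id, client_secret, code, redirect_uri):
    """Build the form fields to POST to the token exchange route."""
    return {
        "grant_type": "authorization_code",
        "client_id": client_id,
        "client_secret": client_secret,
        "code": code,
        "redirect_uri": redirect_uri,
    }


# Placeholder values for a test CustomApp
auth_url = build_authorization_url(
    client_id="test-client-id",
    redirect_uri="https://customapp.example.com/callback",
    state="xyz",
)
token_form = build_token_form(
    client_id="test-client-id",
    client_secret="test-client-secret",
    code="code-from-redirect",
    redirect_uri="https://customapp.example.com/callback",
)
print(auth_url)
print(urlencode(token_form))
```

Paste the first line into a browser (or a GET request in your REST client) and send the second as an `application/x-www-form-urlencoded` POST body.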
Authorization Provider
The provider needs to perform many small, independent tasks, mostly validation of important data points, but also storing and retrieving session and token information.
> Note: All of the major functionality of the authorization provider is implemented in a single class, which I will break out throughout this post. I’ll call this class ShiftAuthorizationProvider. This class acts as a bridge between pyoauth2 and your application logic.
To start, let’s validate the passed client (CustomApp) information, including the client ID, secret, and redirect_uri. Assume the Application class is a simple ORM model. In our case, it’s a MongoEngine model, but you should use whatever database layer your application already has. Every validation method must return True or False, and a False value causes the authorization to fail immediately.
import json
import re

from flask import request, session

from models import Application
from pyoauth2.provider import AuthorizationProvider


class ShiftAuthorizationProvider(AuthorizationProvider):

    def validate_client_id(self, client_id):
        """Check that the client_id represents a valid application.

        :param client_id: Client id.
        :type client_id: str
        """
        return Application.find(client_id) is not None

    def validate_client_secret(self, client_id, client_secret):
        """Check that the client secret matches the application secret.

        :param client_id: Client Id.
        :type client_id: str
        :param client_secret: Client secret.
        :type client_secret: str
        """
        app = Application.find(client_id)
        return app is not None and app.secret == client_secret

    def validate_redirect_uri(self, client_id, redirect_uri):
        """Validate that the redirect_uri requested is available for the app.

        :param client_id: Client Id.
        :type client_id: str
        :param redirect_uri: Redirect URI.
        :type redirect_uri: str
        """
        app = Application.find(client_id)
        # When matching against a redirect_uri, it is very important to
        # ignore the query parameters, or else this step will fail as the
        # parameters change with every request
        return (app is not None and
                app.redirect_uri == redirect_uri.split('?')[0])
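The redirect URI comparison is the easiest of these checks to get subtly wrong, so here is a small standalone sketch of the same idea in Python 3. It uses `urllib.parse` instead of `split('?')`, which also drops any fragment. The helper names and example URLs are hypothetical and are not part of pyoauth2.

```python
from urllib.parse import urlsplit, urlunsplit


def strip_query(uri):
    """Return the URI without its query string or fragment."""
    parts = urlsplit(uri)
    return urlunsplit((parts.scheme, parts.netloc, parts.path, "", ""))


def redirect_uri_matches(registered_uri, requested_uri):
    """Compare the requested URI to the registered one, ignoring the query."""
    return strip_query(requested_uri) == registered_uri


registered = "https://customapp.example.com/callback"
same_path = redirect_uri_matches(
    registered, "https://customapp.example.com/callback?state=abc")
other_host = redirect_uri_matches(
    registered, "https://evil.example.com/callback?state=abc")
```

The comparison stays an exact match on scheme, host, and path, so a request that points at a different host or path is still rejected.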
Authorization request validation
The validate_access method lets pyoauth2 know whether or not a user is logged into your application, and can thus have an access token generated. If someone tries to authenticate to CustomApp without being logged in, the authentication will fail.
Scope in the context of OAuth represents the categories of access you are granting to the remote app. You can use this to control which third-party applications are whitelisted for different sections of your application. I won’t cover implementing custom scopes in this post, so let’s just return True when no scope is requested and reject any other value.
    def validate_access(self):
        """Validate that an OAuth token can be generated from the
        current session."""
        return session.user is not None

    def validate_scope(self, client_id, scope):
        """Validate that the scope requested is available for the app.

        :param client_id: Client id.
        :type client_id: str
        :param scope: Requested scope.
        :type scope: str
        """
        # Only the empty scope is accepted; any named scope is rejected
        return scope == ""
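If you do add custom scopes later, the check could take roughly the following shape. This is only a sketch under assumptions: the `ALLOWED_SCOPES` table, the client ID, and the scope names are invented for illustration, and in practice the allowed scopes would live on your Application model. The one detail taken from the OAuth 2.0 spec is that a scope string is a space-delimited list.

```python
# Hypothetical per-application whitelist; not part of pyoauth2 or SHIFT.
ALLOWED_SCOPES = {
    "test-client-id": {"profile", "messages"},
}


def scope_is_allowed(client_id, scope):
    """Return True if every requested scope is whitelisted for the client."""
    if scope == "":
        # No scope requested, same behavior as validate_scope above
        return True
    requested = set(scope.split())
    allowed = ALLOWED_SCOPES.get(client_id, set())
    # Every requested scope must be in the allowed set
    return requested <= allowed


no_scope = scope_is_allowed("test-client-id", "")
known_scope = scope_is_allowed("test-client-id", "profile messages")
extra_scope = scope_is_allowed("test-client-id", "profile admin")
```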
Token and Code Persistence
When an authorization code is requested, the request is persisted, and should be valid for a short amount of time. Access tokens are saved for a longer period, and refresh tokens are stored permanently. I’ll assume a StrictRedis client resides in the variable self.redis.
The data parameter is constructed according to your needs in persist_authorization_code, and only passed through to persist_token_information when the token request succeeds. You should not need to modify data in persist_token_information.
    def persist_authorization_code(self, client_id, code, scope):
        """Store important session information (user_id) along with the
        authorization code to later allow an access token to be created.

        :param client_id: Client Id.
        :type client_id: str
        :param code: Authorization code.
        :type code: str
        :param scope: Scope.
        :type scope: str
        """
        key = 'oauth2.authorization_code.%s:%s' % (client_id, code)

        # Store any information about the current session that is needed
        # to later authenticate the user.
        data = {'client_id': client_id,
                'scope': scope,
                'user_id': session.user.id}

        # Authorization codes expire in 1 minute
        self.redis.setex(key, 60, json.dumps(data))

    def persist_token_information(self, client_id, scope, access_token,
                                  token_type, expires_in, refresh_token,
                                  data):
        """Save OAuth access and refresh token information.

        :param client_id: Client Id.
        :type client_id: str
        :param scope: Scope.
        :type scope: str
        :param access_token: Access token.
        :type access_token: str
        :param token_type: Token type (currently only Bearer)
        :type token_type: str
        :param expires_in: Access token expiration seconds.
        :type expires_in: int
        :param refresh_token: Refresh token.
        :type refresh_token: str
        :param data: Data from authorization code grant.
        :type data: mixed
        """
        # Set access token with proper expiration
        access_key = 'oauth2.access_token:%s' % access_token
        self.redis.setex(access_key, expires_in, json.dumps(data))

        # Set refresh token with no expiration
        refresh_key = 'oauth2.refresh_token.%s:%s' % (client_id, refresh_token)
        self.redis.set(refresh_key, json.dumps(data))

        # Associate tokens to user for easy token revocation per app user
        key = 'oauth2.client_user.%s:%s' % (client_id, data.get('user_id'))
        self.redis.sadd(key, access_key, refresh_key)
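To exercise these persistence methods without a Redis server, one option is a small in-memory stand-in that supports only the commands used in this post. The class below is a sketch written for Python 3, not a replacement for redis-py: `InMemoryStore` and its injectable clock are hypothetical, and it returns `str` values where a real client typically returns bytes.

```python
import json
import time


class InMemoryStore:
    """Tiny stand-in for the few Redis commands used in this post."""

    def __init__(self, clock=time.time):
        self._clock = clock
        self._values = {}  # key -> (value, expires_at or None)
        self._sets = {}    # key -> set of members

    def set(self, key, value):
        self._values[key] = (value, None)

    def setex(self, key, seconds, value):
        self._values[key] = (value, self._clock() + seconds)

    def get(self, key):
        entry = self._values.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if expires_at is not None and self._clock() >= expires_at:
            # Expired entries behave as if they were never stored
            del self._values[key]
            return None
        return value

    def delete(self, *keys):
        for key in keys:
            self._values.pop(key, None)
            self._sets.pop(key, None)

    def sadd(self, key, *members):
        self._sets.setdefault(key, set()).update(members)

    def smembers(self, key):
        return set(self._sets.get(key, set()))


# A fake clock makes the one-minute expiry testable without sleeping
now = [1000.0]
store = InMemoryStore(clock=lambda: now[0])

code_key = 'oauth2.authorization_code.%s:%s' % ('test-client-id', 'abc123')
payload = {'client_id': 'test-client-id', 'scope': '', 'user_id': 42}
store.setex(code_key, 60, json.dumps(payload))

fresh = store.get(code_key)    # still within the 60 second window
now[0] += 61
expired = store.get(code_key)  # the code has timed out
```

Assigning an instance to `self.redis` in a test should let the methods above run unchanged, assuming they only use these six commands.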
Token and Code Loading
Just as we persisted OAuth information above, we need to implement the loading of the data dictionary from an authorization code (initial OAuth flow) and refresh token (application automatic refresh of access token).
    def from_authorization_code(self, client_id, code, scope):
        """Get session data from authorization code.

        :param client_id: Client ID.
        :type client_id: str
        :param code: Authorization code.
        :type code: str
        :param scope: Scope to validate.
        :type scope: str
        :rtype: dict if valid else None
        """
        key = 'oauth2.authorization_code.%s:%s' % (client_id, code)
        data = self.redis.get(key)
        if data is not None:
            data = json.loads(data)

            # Validate scope and client_id
            if (scope == '' or scope == data.get('scope')) and \
                    data.get('client_id') == client_id:
                return data

        return None  # The OAuth authorization will fail at this point

    def from_refresh_token(self, client_id, refresh_token, scope):
        """Get session data from refresh token.

        :param client_id: Client Id.
        :type client_id: str
        :param refresh_token: Refresh token.
        :type refresh_token: str
        :param scope: Scope to validate.
        :type scope: str
        :rtype: dict if valid else None
        """
        key = 'oauth2.refresh_token.%s:%s' % (client_id, refresh_token)
        data = self.redis.get(key)
        if data is not None:
            data = json.loads(data)

            # Validate scope and client_id
            if (scope == '' or scope == data.get('scope')) and \
                    data.get('client_id') == client_id:
                return data

        return None  # The OAuth token refresh will fail at this point
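Both loaders apply the same rule after fetching the stored JSON: the client ID must match, and the scope must either be empty or equal the stored scope. As a sketch, that rule can be pulled into one pure function that is easy to test on its own. `load_grant_data` is a hypothetical helper name, not something pyoauth2 provides.

```python
import json


def load_grant_data(raw, client_id, scope):
    """Decode stored grant data and return it only if it is valid.

    raw is the stored JSON string (or None if the key was missing).
    """
    if raw is None:
        return None
    data = json.loads(raw)
    scope_ok = scope == '' or scope == data.get('scope')
    if scope_ok and data.get('client_id') == client_id:
        return data
    return None


stored = json.dumps(
    {'client_id': 'test-client-id', 'scope': 'profile', 'user_id': 42})

valid = load_grant_data(stored, 'test-client-id', '')
wrong_client = load_grant_data(stored, 'other-client-id', '')
wrong_scope = load_grant_data(stored, 'test-client-id', 'admin')
missing = load_grant_data(None, 'test-client-id', '')
```

With a helper like this, each loader would reduce to building its key, calling `self.redis.get(key)`, and returning the helper's result.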
Authorization Code and Token Cleanup
The last essential part of the OAuth flow is discarding unneeded or invalid OAuth data. This keeps your application secure, and eliminates expired or used tokens. If this step is not properly implemented, old access or refresh tokens could be used to compromise your application. These methods will be automatically called by pyoauth2 as appropriate.
    def discard_authorization_code(self, client_id, code):
        """Delete authorization code from the store.

        :param client_id: Client Id.
        :type client_id: str
        :param code: Authorization code.
        :type code: str
        """
        key = 'oauth2.authorization_code.%s:%s' % (client_id, code)
        self.redis.delete(key)

    def discard_refresh_token(self, client_id, refresh_token):
        """Delete refresh token from the store.

        :param client_id: Client Id.
        :type client_id: str
        :param refresh_token: Refresh token.
        :type refresh_token: str
        """
        key = 'oauth2.refresh_token.%s:%s' % (client_id, refresh_token)
        self.redis.delete(key)

    def discard_client_user_tokens(self, client_id, user_id):
        """Delete access and refresh tokens from the store.

        :param client_id: Client Id.
        :type client_id: str
        :param user_id: User Id.
        :type user_id: str
        """
        # The set stored at this key lists every token key for the user
        set_key = 'oauth2.client_user.%s:%s' % (client_id, user_id)
        pipe = self.redis.pipeline()
        for key in self.redis.smembers(set_key):
            pipe.delete(key)
        # Remove the now-stale set as well
        pipe.delete(set_key)
        pipe.execute()
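The per-user revocation relies on the index set written in persist_token_information. The following sketch models that bookkeeping with plain dictionaries so the effect is easy to see; the function names, client ID, and token values are all made up for illustration, and a real implementation would go through Redis as shown above.

```python
def key_for_client_user(client_id, user_id):
    """Build the index key that groups a user's tokens for one app."""
    return 'oauth2.client_user.%s:%s' % (client_id, user_id)


def revoke_client_user(tokens, token_index, client_id, user_id):
    """Delete every token listed in the user's index, then the index."""
    index_key = key_for_client_user(client_id, user_id)
    revoked = token_index.pop(index_key, set())
    for key in revoked:
        tokens.pop(key, None)
    return len(revoked)


# Two users of the same app, each with their own tokens
tokens = {
    'oauth2.access_token:a1': '{"user_id": 1}',
    'oauth2.refresh_token.app:r1': '{"user_id": 1}',
    'oauth2.access_token:a2': '{"user_id": 2}',
}
token_index = {
    key_for_client_user('app', 1): {'oauth2.access_token:a1',
                                    'oauth2.refresh_token.app:r1'},
    key_for_client_user('app', 2): {'oauth2.access_token:a2'},
}

revoked_count = revoke_client_user(tokens, token_index, 'app', 1)
```

Only the first user's access and refresh tokens are removed; the second user's token is untouched.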
You should now have a working OAuth application! Test the routes manually or by configuring a test application. So far, we have only an Authorization Provider, but no Resource Provider. The Resource Provider will handle a request for a protected resource and respond if the access token checks out.
The next and final post in this series will cover the remaining steps needed to complete the OAuth protected resource lifecycle.