1. 1
  1. Implementing a Python OAuth 2.0 Provider - Part 2 - Authorization Provider

    Last week, I covered the Basics of the OAuth 2.0 Authorization Flow. Today, I will walk through how we used pyoauth2 to set up a minimal Authorization Provider for SHIFT. This post covers setting up endpoints for steps 2 and 5 from the overview.

    The role of the Authorization Provider is to securely generate, validate, and store authorization codes, access tokens, and refresh tokens.

    Routes

    Before getting started, it’s helpful to set up the routes used for OAuth for both a test CustomApp and your application, and have a staging environment to test in. Due to the need for SSL, it is complicated, but not impossible, to test locally.

    The routes for OAuth 2 using pyoauth2 are extremely simple; just forward enough information for pyoauth2 to do it’s job and convert the response to a type appropriate for your application framework. We’ll create an authentication route and a token exchange route, showing code for Flask:

    import flask
    from flask import request
    from app import app
    
    # This class will be defined later in this post
    from shift.oauth.provider import ShiftAuthorizationProvider
    
    # Authorization Code
    # Returns a redirect header on success
    @app.route("/v1/oauth2/auth", methods=["GET"])
    def authorization_code():
    
        # You can cache this instance for efficiency
        provider = ShiftAuthorizationProvider()
    
        # This is the important line
        response = provider.get_authorization_code_from_uri(request.url)
    
        # For maximum compatibility, a standard Response object is provided
        # Response has the following properties:
        # 
        #     response.status_code        int
        #     response.text               response body
        #     response.headers            iterable dict-like object with keys and values
        #
        # This response must be converted to a type that your application
        # framework can use and returned.
        flask_res = flask.make_response(response.text, response.status_code)
        for k, v in response.headers.iteritems():
            flask_res.headers[k] = v
        return flask_res
    
    # Token exchange
    # Returns JSON token information on success
    @app.route("/v1/oauth2/token", methods=["POST"])
    def token():
    
        # You can cache this instance for efficiency
        provider = ShiftAuthorizationProvider()
    
        # Get a dict of POSTed form data
        data = {k: request.form[k] for k in request.form.iterkeys()}
    
        # This is the important line
        response = provider.get_token_from_post_data(data)
    
        # The same Response object is provided, and must be converted
        # to a type that your application framework can use and returned.
        flask_res = flask.make_response(response.text, response.status_code)
        for k, v in response.headers.iteritems():
            flask_res.headers[k] = v
        return flask_res
    

    Ensure that your basic routing is set up properly and returning dummy results before continuing. A good REST client like Postman for Google Chrome will be extremely useful for testing.

    Authorization Provider

    The provider needs to perform many small, independent tasks, mostly validation of important data points, but also storing and retrieving session and token information.

    > Note: All of the major functionality of the authorization provider is implemented in a single class, which I will break out throughout this post. I’ll call this class ShiftAuthorizationProvider. This class acts as a bridge between pyoauth2 and your application logic.

    To start, let’s validate the passed client (CustomApp) information, including the client ID, secret, and redirect_uri. Assume the Application class is a simple ORM. In our case, it’s a MongoEngine model, but should be whatever database you are currently using. All of the validation methods need to return True or False, and a False value will cause the authorization to fail immediately.

    import json
    import re
    from flask import request, session
    from models import Application
    from pyoauth2.provider import AuthorizationProvider
    
    class ShiftAuthorizationProvider(AuthorizationProvider):
    
        def validate_client_id(self, client_id):
            """Check that the client_id represents a valid application.
    
            :param client_id: Client id.
            :type client_id: str
            """
            return Application.find(client_id) is not None
    
        def validate_client_secret(self, client_id, client_secret):
            """Check that the client secret matches the application secret.
    
            :param client_id: Client Id.
            :type client_id: str
            :param client_secret: Client secret.
            :type client_secret: str
            """
            app = Application.find(client_id)
            if app is not None and app.secret == client_secret:
                return True
            return False
    
        def validate_redirect_uri(self, client_id, redirect_uri):
            """Validate that the redirect_uri requested is available for the app.
    
            :param redirect_uri: Redirect URI.
            :type redirect_uri: str
            """
            app = Application.find(client_id)
    
            # When matching against a redirect_uri, it is very important to 
            # ignore the query parameters, or else this step will fail as the 
            # parameters change with every request
            if app is not None and app.redirect_uri == redirect_uri.split('?')[0]:
                return True
            return False
    

    Authorization request validation

    The validate_access method lets pyoauth2 know whether or not a user is logged into your application, and can thus have an access token generated. If someone tries to authenticate to CustomApp without being logged in, the authentication will fail.

    Scope in the context of OAuth represents the categories of access you are granting to the remote app. You can use this to control which third-party applications are whitelisted for different sections of your application. I won’t cover implementing custom scopes in this post, so let’s just return True when no scope is requested.

        def validate_access(self):
            """Validate that an OAuth token can be generated from the
            current session."""
            return session.user is not None
    
        def validate_scope(self, client_id, scope):
            """Validate that the scope requested is available for the app.
    
            :param client_id: Client id.
            :type client_id: str
            :param scope: Requested scope.
            :type scope: str
            """
            return True if scope == "" else False
    

    Token and Code Persistence

    When an authorization code is requested, the request is persisted, and should be valid for a short amount of time. Access tokens are saved for a longer period, and refresh tokens are stored permanently. I’ll assume a StrictRedis client resides in the variable self.redis.

    The data parameter is constructed according to your needs in persist_authorization_code, and only passed through to persist_token_information when the token request succeeds. You should not need to modify data in persist_token_information.

    def persist_authorization_code(self, client_id, code, scope):
            """Store important session information (user_id) along with the
            authorization code to later allow an access token to be created.
    
            :param client_id: Client Id.
            :type client_id: str
            :param code: Authorization code.
            :type code: str
            :param scope: Scope.
            :type scope: str
            """
            key = 'oauth2.authorization_code.%s:%s' % (client_id, code)
    
            # Store any information about the current session that is needed
            # to later authenticate the user.
            data = {'client_id': client_id,
                    'scope': scope,
                    'user_id': session.user.id}
    
            # Authorization codes expire in 1 minute
            self.redis.setex(key, 60, json.dumps(data))
    
        def persist_token_information(self, client_id, scope, access_token,
                                      token_type, expires_in, refresh_token,
                                      data):
            """Save OAuth access and refresh token information.
    
            :param client_id: Client Id.
            :type client_id: str
            :param scope: Scope.
            :type scope: str
            :param access_token: Access token.
            :type access_token: str
            :param token_type: Token type (currently only Bearer)
            :type token_type: str
            :param expires_in: Access token expiration seconds.
            :type expires_in: int
            :param refresh_token: Refresh token.
            :type refresh_token: str
            :param data: Data from authorization code grant.
            :type data: mixed
            """
    
            # Set access token with proper expiration
            access_key = 'oauth2.access_token:%s' % access_token
            self.redis.setex(access_key, expires_in, json.dumps(data))
    
            # Set refresh token with no expiration
            refresh_key = 'oauth2.refresh_token.%s:%s' % (client_id, refresh_token)
            self.redis.set(refresh_key, json.dumps(data))
    
            # Associate tokens to user for easy token revocation per app user
            key = 'oauth2.client_user.%s:%s' % (client_id, data.get('user_id'))
            self.redis.sadd(key, access_key, refresh_key)
    

    Token and Code Loading

    Just as we persisted OAuth information above, we need to implement the loading of the data dictionary from an authorization code (initial OAuth flow) and refresh token (application automatic refresh of access token).

        def from_authorization_code(self, client_id, code, scope):
            """Get session data from authorization code.
    
            :param client_id: Client ID.
            :type client_id: str
            :param code: Authorization code.
            :type code: str
            :param scope: Scope to validate.
            :type scope: str
            :rtype: dict if valid else None
            """
            key = 'oauth2.authorization_code.%s:%s' % (client_id, code)
            data = self.redis.get(key)
            if data is not None:
                data = json.loads(data)
    
                # Validate scope and client_id
                if (scope == '' or scope == data.get('scope')) and \
                    data.get('client_id') == client_id:
                    return data
    
            return None  # The OAuth authorization will fail at this point
    
        def from_refresh_token(self, client_id, refresh_token, scope):
            """Get session data from refresh token.
    
            :param client_id: Client Id.
            :type client_id: str
            :param refresh_token: Refresh token.
            :type refresh_token: str
            :param scope: Scope to validate.
            :type scope: str
            :rtype: dict if valid else None
            """
            key = 'oauth2.refresh_token.%s:%s' % (client_id, refresh_token)
            data = self.redis.get(key)
            if data is not None:
                data = json.loads(data)
    
                # Validate scope and client_id
                if (scope == '' or scope == data.get('scope')) and \
                    data.get('client_id') == client_id:
                    return data
    
            return None  # The OAuth token refresh will fail at this point
    

    Authorization Code and Token Cleanup

    The last essential part of the OAuth flow is discarding unneeded or invalid OAuth data. This keeps your application secure, and eliminates expired or used tokens. If this step is not properly implemented, old access or refresh tokens could be used to compromise your application. These methods will be automatically called by pyoauth2 as appropriate.

        def discard_authorization_code(self, client_id, code):
            """Delete authorization code from the store.
    
            :param client_id: Client Id.
            :type client_id: str
            :param code: Authorization code.
            :type code: str
            """
            key = 'oauth2.authorization_code.%s:%s' % (client_id, code)
            self.redis.delete(key)
    
        def discard_refresh_token(self, client_id, refresh_token):
            """Delete refresh token from the store.
    
            :param client_id: Client Id.
            :type client_id: str
            :param refresh_token: Refresh token.
            :type refresh_token: str
    
            """
            key = 'oauth2.refresh_token.%s:%s' % (client_id, refresh_token)
            self.redis.delete(key)
    
        def discard_client_user_tokens(self, client_id, user_id):
            """Delete access and refresh tokens from the store.
    
            :param client_id: Client Id.
            :type client_id: str
            :param user_id: User Id.
            :type user_id: str
    
            """
            keys = 'oauth2.client_user.%s:%s' % (client_id, user_id)
            pipe = self.redis.pipeline()
            for key in self.redis.smembers(keys):
                pipe.delete(key)
            pipe.execute()
    

    You should now have a working OAuth application! Test the routes manually or by configuring a test application. So far, we have only an Authorization Provider, but no Resource Provider. The Resource Provider will handle a request for a protected resource and respond if the access token checks out.

    The next and final post in this series will cover the remaining steps needed to complete the OAuth protected resource lifecycle.

    comments powered by Disqus
    1. olizax reblogged this from shiftdevs
    2. vadimii reblogged this from shiftdevs
    3. shiftdevs posted this