Lucas Simon

Web Developer. lucassrod@gmail.com

Série API em Flask - Parte 12 - Autênticação por JWT

O JWT é um dos meios mais comuns de autorização entre aplicações web e SPA’s desenvolvidos com ReactJS ou outros frameworks JavaScript. Por ser um meio fácil de conter informações o JWT pode ter dados do usuário e um tempo de expiração.

Para entendimento, segue os capítulos que serão abordados.

O repositório com todo o código fonte esta aqui. Os capítulos estão em branches.

Adição e configurações do JWT

Nesse projeto existem dois pacotes que irei utilizar. O primeiro é o PyJWT e a extensão do flask Flask-JWT-Extended.

Vamos editar os nossos requeriments/base.txt e adicionar esses dois pacotes.

Flask-JWT-Extended==3.13.1
PyJWT==1.6.4

Em seguida pip install -r requirements/base.txt

Feito isso vamos colocar algumas configurações iniciais em nosso .env.

JWT_ACCESS_TOKEN_EXPIRES=20
JWT_REFRESH_TOKEN_EXPIRES=30

E em nossa classe de configurações do flask, Config, em config.py.

# ...

from datetime import timedelta


class Config:
    SECRET_KEY = getenv('SECRET_KEY') or 'uma string randômica e gigante'
    APP_PORT = int(getenv('APP_PORT'))
    DEBUG = eval(getenv('DEBUG').title())
    MONGODB_HOST = getenv('MONGODB_URI')
    JWT_ACCESS_TOKEN_EXPIRES = timedelta(
        minutes=int(getenv('JWT_ACCESS_TOKEN_EXPIRES'))
    )
    JWT_REFRESH_TOKEN_EXPIRES = timedelta(
        days=int(getenv('JWT_REFRESH_TOKEN_EXPIRES'))
    )

# ...

Para fins de segurança é importante setar um tempo maximo para expirar um Token de 20 minutos que é o tempo médio que um usuário permanece logado. Acima disso qualquer um que conseguir um token válido poderá entrar na conta do usuário e fazer algumas bagunças no perfil.

Crie novas mensagens em nosso apps/messages.py

MSG_TOKEN_CREATED = 'Token criado.'
MSG_INVALID_CREDENTIALS = 'As credenciais estão inválidas para log in.'
MSG_TOKEN_EXPIRED = 'Token expirou.'
MSG_PERMISSION_DENIED = 'Permissão negada.'

Crie uma nova resposta para nossas requisições no apps/response.py

# ...
from .messages import MSG_ALREADY_EXISTS, MSG_PERMISSION_DENIED
# ...

def resp_notallowed_user(resource: str, msg: str = MSG_PERMISSION_DENIED):

    if not isinstance(resource, str):
        raise ValueError('Recurso precisa ser uma string')

    resp = jsonify({
        'status': 401,
        'resource': resource,
        'message': msg
    })

    resp.status_code = 401

    return resp

Em seguida vamos criar uma função para buscar o usuaŕio por email. Abra o arquivo apps/users/utils.py e adicione o código abaixo:

# ...

def get_user_by_email(email: str):
    try:
        # buscamos todos os usuários da base utilizando o paginate
        return User.objects.get(email=email)

    except DoesNotExist as e:
        return resp_does_not_exist('Users', 'Usuário')

    except FieldDoesNotExist as e:
        return resp_exception('Users', description=e.__str__())

    except Exception as e:
        return resp_exception('Users', description=e.__str__())

# ...

Depois das configurações vamos criar um função factory adicionando o JwtManager extensão do plugin que instalamos anteriormente e aplicar em nossa api.

Crie o módulo apps/jwt.py com o seguinte conteúdo.

# -*- coding: utf-8 -*-

# Flask

from flask import jsonify

# Third
from flask_jwt_extended import JWTManager

# Apps
from apps.users.models import User

# Local
from .messages import MSG_INVALID_CREDENTIALS, MSG_TOKEN_EXPIRED


def configure_jwt(app):

    # Add jwt handler
    jwt = JWTManager(app)

    @jwt.user_claims_loader
    def add_claims_to_access_token(identity):
        user = User.objects.get(email=identity)

        # Podemos extender as informações do usuaŕio adicionando
        # novos campos: active, roles, full_name e etc...

        if user:
            return {
                'active': user.active
            }

    @jwt.expired_token_loader
    def my_expired_token_callback():
        resp = jsonify({
            'status': 401,
            'sub_status': 42,
            'message': MSG_TOKEN_EXPIRED
        })

        resp.status_code = 401

        return resp

    @jwt.unauthorized_loader
    def my_unauthorized_callback(e):
        resp = jsonify({
            'status': 401,
            'sub_status': 1,
            'description': e,
            'message': MSG_INVALID_CREDENTIALS
        })

        resp.status_code = 401

        return resp

    @jwt.claims_verification_loader
    def my_claims_verification_callback(e):
        resp = jsonify({
            'status': 401,
            'sub_status': 2,
            'description': e,
            'message': MSG_INVALID_CREDENTIALS
        })

        resp.status_code = 401

        return resp

    @jwt.invalid_token_loader
    def my_invalid_token_loader_callback(e):
        resp = jsonify({
            'status': 401,
            'sub_status': 3,
            'description': e,
            'message': MSG_INVALID_CREDENTIALS
        })

        resp.status_code = 401

        return resp

    @jwt.needs_fresh_token_loader
    def my_needs_fresh_token_callback(e):
        resp = jsonify({
            'status': 401,
            'sub_status': 4,
            'description': e,
            'message': MSG_INVALID_CREDENTIALS
        })

        resp.status_code = 401

        return resp

    @jwt.revoked_token_loader
    def my_revoked_token_callback(e):
        resp = jsonify({
            'status': 401,
            'sub_status': 5,
            'description': e,
            'message': MSG_INVALID_CREDENTIALS
        })

        resp.status_code = 401

        return resp

    @jwt.user_loader_callback_loader
    def my_user_loader_callback(e):
        resp = jsonify({
            'status': 401,
            'sub_status': 6,
            'description': e,
            'message': MSG_INVALID_CREDENTIALS
        })

        resp.status_code = 401

        return resp

    @jwt.user_loader_error_loader
    def my_user_loader_error_callback(e):
        resp = jsonify({
            'status': 401,
            'sub_status': 7,
            'description': e,
            'message': MSG_INVALID_CREDENTIALS
        })

        resp.status_code = 401

        return resp

    @jwt.token_in_blacklist_loader
    def my_token_in_blacklist_callback(e):
        resp = jsonify({
            'status': 401,
            'sub_status': 8,
            'description': e,
            'message': MSG_INVALID_CREDENTIALS
        })

        resp.status_code = 401

        return resp

    @jwt.claims_verification_failed_loader
    def my_claims_verification_failed_callback(e):
        resp = jsonify({
            'status': 401,
            'sub_status': 9,
            'description': e,
            'message': MSG_INVALID_CREDENTIALS
        })

        resp.status_code = 401

        return resp

Todos os decorators como @jwt.claims_verification_failed_loader foram extendidos para retornar uma resposta JSON ao invés de retornar exceções.

Em seguida vamos chamar no nosso arquivo apps/__init__.py dentro da função create_app a função que acabamos de criar configure_jwt. Ficando assim:

# -*- coding: utf-8 -*-

from flask import Flask
from config import config

# Realize a importação da função que configura a api
from .api import configure_api
from .db import db
from .jwt import configure_jwt


def create_app(config_name):
    app = Flask('api-users')

    app.config.from_object(config[config_name])

    # Configure MongoEngine
    db.init_app(app)

    # Configure JWT
    configure_jwt(app)

    # executa a chamada da função de configuração
    configure_api(app)

    return app

Criando nosso app e o recurso de Login

Depois das configurações iniciais vamos criar nossos endpoints e recursos para autenticação. Adicione no api.py o endpoint /auth

from apps.auth.resources import AuthResource

# ...

api = Api()


def configure_api(app):
    # ...

    # rotas para autenticacao
    api.add_resource(AuthResource, '/auth')

    # ...

Crie um novo diretório chamado apps/auth e ele terá dois módulos:

$ tree .
.
├── api.py
├── auth
│   ├── __init__.py
│   ├── resources.py
│   └── schemas.py
...

Nosso apps/auth/schema.py não tem nada de novo e bem simples por sinal.

# -*- coding: utf-8 -*-


from marshmallow import Schema
from marshmallow.fields import Email, Str

from apps.messages import MSG_FIELD_REQUIRED


class LoginSchema(Schema):
    email = Email(
        required=True, error_messages={'required': MSG_FIELD_REQUIRED}
    )
    password = Str(
        required=True, error_messages={'required': MSG_FIELD_REQUIRED}
    )

E dentro do apps/auth/resources.py vamos criar nossa rota de autenticação.

# -*- coding:utf-8 -*-

# Python

# Flask
from flask import request

# Third
from flask_restful import Resource
from flask_jwt_extended import create_access_token, create_refresh_token
from bcrypt import checkpw

# Apps
from apps.users.models import User
from apps.users.schemas import UserSchema
from apps.users.utils import get_user_by_email
from apps.messages import MSG_NO_DATA, MSG_TOKEN_CREATED
from apps.responses import resp_ok, resp_data_invalid, resp_notallowed_user

# Local
from .schemas import LoginSchema


class AuthResource(Resource):
    def post(self, *args, **kwargs):
        '''
        Route to do login in API
        '''
        req_data = request.get_json() or None
        user = None
        login_schema = LoginSchema()
        schema = UserSchema()

        if req_data is None:
            return resp_data_invalid('Users', [], msg=MSG_NO_DATA)

        data, errors = login_schema.load(req_data)

        if errors:
            return resp_data_invalid('Users', errors)

        # Buscamos nosso usuário pelo email
        user = get_user_by_email(data.get('email'))

        # Em caso de exceção ou não é uma instancia do Modelo de User
        # retornamos a resposta
        if not isinstance(user, User):
            return user

        # Verificamos se o usuário está ativo na plataforma. Se não ele
        # não podera autenticar e não ter acesso a nada
        if not user.is_active():
            return resp_notallowed_user('Auth')

        # Conferimos a senha informada no payload de dados com a senha cadastrada
        # em nosso banco.
        if checkpw(data.get('password').encode('utf-8'), user.password.encode('utf-8')):

            # Chamamos os metodos para criar os tokens passando como identidade
            # o email do nosso usuario
            extras = {
                'token': create_access_token(identity=user.email),
                'refresh': create_refresh_token(identity=user.email)
            }

            result = schema.dump(user)

            return resp_ok(
                'Auth', MSG_TOKEN_CREATED, data=result.data, **extras
            )

        return resp_notallowed_user('Auth')

Vamos testar a autenticaçaõ?

$ http -v POST 0.0.0.0:5000/auth email=teste@teste.com password=123456
POST /auth HTTP/1.1
Accept: application/json, */*
Accept-Encoding: gzip, deflate
Connection: keep-alive
Content-Length: 50
Content-Type: application/json
Host: 0.0.0.0:5000
User-Agent: HTTPie/0.9.8

{
    "email": "teste@teste.com",
    "password": "123456"
}

HTTP/1.0 200 OK
Content-Length: 887
Content-Type: application/json
Date: Wed, 17 Oct 2018 22:11:04 GMT
Server: Werkzeug/0.14.1 Python/3.6.5

{
    "data": {
        "active": true,
        "cpf_cnpj": "123456",
        "email": "teste@teste.com",
        "full_name": "teste sobrenome",
        "id": "5bbeaf52fb5d1b0a32466c93"
    },
    "message": "Token criado.",
    "refresh": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1Mzk4MTQyNjQsIm5iZiI6MTUzOTgxNDI2NCwianRpIjoiYTU3YmMwMTctMTFhOC00Yjc3LThkNmUtOWQxMDI5MTJkYTY0IiwiZXhwIjoxNTM5ODE1NDY0LCJpZGVudGl0eSI6InRlc3RlQHRlc3RlLmNvbSIsInR5cGUiOiJyZWZyZXNoIn0.0WiSlQ7_wJRUy0p7qfVyl0rdsPYTJxfGO2UMfXWryPI",
    "resource": "Auth",
    "status": 200,
    "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1Mzk4MTQyNjQsIm5iZiI6MTUzOTgxNDI2NCwianRpIjoiZjdiZmI4NDYtMjk4ZS00MmY1LWIxYzItMWJiOTM2NzkwZjU1IiwiZXhwIjoxNTM5ODE1NDY0LCJpZGVudGl0eSI6InRlc3RlQHRlc3RlLmNvbSIsImZyZXNoIjpmYWxzZSwidHlwZSI6ImFjY2VzcyIsInVzZXJfY2xhaW1zIjp7ImFjdGl2ZSI6dHJ1ZX19.BGEBy_XsAKXsu4_O_s8DBsbmWad16uNYWjZMevkYBGk"
}

Revalidando nosso token expirado

Para criar um novo token caso tenha expirado vamos criar um novo endpoint em nosso api.py.

from apps.auth.resources import AuthResource, RefreshTokenResource,

# ...

api = Api()


def configure_api(app):
    # ...

    # rotas para autenticacao
    api.add_resource(AuthResource, '/auth')
    api.add_resource(RefreshTokenResource, '/auth/refresh')
    # ...

Agora vamos criar nosso recurso RefreshTokenResource.

# ...
from flask_jwt_extended import jwt_refresh_token_required, get_jwt_identity

#...

class RefreshTokenResource(Resource):

    @jwt_refresh_token_required
    def post(self, *args, **kwargs):
        '''
        Refresh a token that expired.

        http://flask-jwt-extended.readthedocs.io/en/latest/refresh_tokens.html
        '''
        extras = {
            'token': create_access_token(identity=get_jwt_identity()),
        }

        return resp_ok(
            'Auth', MSG_TOKEN_CREATED, **extras
        )

E para testar:

$ http -v POST localhost:5000/auth/refresh "Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1Mzk4MTg5MDYsIm5iZiI6MTUzOTgxODkwNiwianRpIjoiZGQwMzdhMTAtNjFjNS00ZjgwLTgxYWUtZmJmZjRlM2M4NDljIiwiZXhwIjoxNTQyNDEwOTA2LCJpZGVudGl0eSI6InRlc3RlQHRlc3RlLmNvbSIsInR5cGUiOiJyZWZyZXNoIn0.F26hCslA-iNvPI2r4yLAJuzM6m9n8F41ghDfThwfmtY"
POST /auth/refresh HTTP/1.1
Accept: */*
Accept-Encoding: gzip, deflate
Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1Mzk4MTg5MDYsIm5iZiI6MTUzOTgxODkwNiwianRpIjoiZGQwMzdhMTAtNjFjNS00ZjgwLTgxYWUtZmJmZjRlM2M4NDljIiwiZXhwIjoxNTQyNDEwOTA2LCJpZGVudGl0eSI6InRlc3RlQHRlc3RlLmNvbSIsInR5cGUiOiJyZWZyZXNoIn0.F26hCslA-iNvPI2r4yLAJuzM6m9n8F41ghDfThwfmtY
Connection: keep-alive
Content-Length: 0
Host: localhost:5000
User-Agent: HTTPie/0.9.8



HTTP/1.0 200 OK
Content-Length: 419
Content-Type: application/json
Date: Wed, 17 Oct 2018 23:29:31 GMT
Server: Werkzeug/0.14.1 Python/3.6.5

{
    "message": "Token criado.",
    "resource": "Auth",
    "status": 200,
    "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1Mzk4MTg5NzEsIm5iZiI6MTUzOTgxODk3MSwianRpIjoiYTY5MjlmODktN2EyMC00MjdhLTkxYjYtZjdjZjFhMzc4MjEwIiwiZXhwIjoxNTM5ODIwMTcxLCJpZGVudGl0eSI6InRlc3RlQHRlc3RlLmNvbSIsImZyZXNoIjpmYWxzZSwidHlwZSI6ImFjY2VzcyIsInVzZXJfY2xhaW1zIjp7ImFjdGl2ZSI6dHJ1ZX19.zN2WCq_Ou0dhQYvbOdn_IyD3iy-8ZneXgZKeaVI_tMA"
}

Rotas com autenticação obrigatórias

Hoje todas as nossas rotas administrativas para controle de usuários estão abertas e sem autenticação. Logo precisamos fazer com que elas so possam ser acessadas através de um access token válido e claro vamos verificar se o usuário autenticado é admin=True. Caso contrário ele não poderá receber o resultado correto e sim uma mensagem de erro.

Vamos voltar em nosso apps/resources_admin.py e refatora-lo.

Primeiro vamos importar as seguintes funções do flask_jwt_extend.

# ...

# Third
# ...
from flask_jwt_extended import get_jwt_identity, jwt_required

#...

O jwt_required é um decorator do pacote a qual verifica se estamos recebendo um token em nossa requisição HTTP através do cabeçalho Authorization. Com isso vamos alterar nossos métodos para:


class AdminUserPageList(Resource):
    @jwt_required
    def get(self, page_id=1):
        #...

class AdminUserResource(Resource):
    @jwt_required
    def get(self, user_id):
        #...

    @jwt_required
    def put(self, user_id):
        #...

    @jwt_required
    def delete(self, user_id):
        #...

Se tentarmos acessar qualquer uma desses endpoints teremos o seguinte erro:

$ http -v GET 0.0.0.0:5000/admin/users/5bbeaf52fb5d1b0a32466c93
GET /admin/users/5bbeaf52fb5d1b0a32466c93 HTTP/1.1
Accept: */*
Accept-Encoding: gzip, deflate
Connection: keep-alive
Host: 0.0.0.0:5000
User-Agent: HTTPie/0.9.8



HTTP/1.0 401 UNAUTHORIZED
Content-Length: 161
Content-Type: application/json
Date: Wed, 17 Oct 2018 23:43:03 GMT
Server: Werkzeug/0.14.1 Python/3.6.5

{
    "description": "Missing Authorization Header",
    "message": "As credenciais estão inválidas para log in.",
    "status": 401,
    "sub_status": 1
}

Certo, mas e agora? Agora teremos de criar dois novos metodos para verificar se nosso usuário esta ativo e possui permissões de admin.

Em nosso apps/users/resources_admin.py vamos alterar TODOS os métodos com as seguintes linhas.

# ...
from flask_jwt_extended import get_jwt_identity, jwt_required

# ...
from .utils import get_user_by_id, exists_email_in_users, get_user_by_email


#...

class AdminUserResource(Resource):
    @jwt_required
    def get(self, user_id):
        result = None
        schema = UserSchema()
        current_user = get_user_by_email(get_jwt_identity())

        if not isinstance(current_user, User):
            return current_user

        if not (current_user.is_active()) and current_user.is_admin():
            return resp_notallowed_user('Users')

        # ...

Testando:

$ http -v GET 0.0.0.0:5000/admin/users/5bbeaf52fb5d1b0a32466c93 "Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1Mzk4MjAzOTEsIm5iZiI6MTUzOTgyMDM5MSwianRpIjoiN2YwN2I5YTAtMmY3YS00ZDlmLWFmMzQtMTRmNzlmYmNlZjM4IiwiZXhwIjoxNTM5ODIxNTkxLCJpZGVudGl0eSI6InRlc3RlQHRlc3RlLmNvbSIsImZyZXNoIjpmYWxzZSwidHlwZSI6ImFjY2VzcyIsInVzZXJfY2xhaW1zIjp7ImFjdGl2ZSI6dHJ1ZX19.p_fjeR_Kc-5kJOLQ9F2EE6qeXe1iNL6amd-QkFEskJ8"
GET /admin/users/5bbeaf52fb5d1b0a32466c93 HTTP/1.1
Accept: */*
Accept-Encoding: gzip, deflate
Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1Mzk4MjAzOTEsIm5iZiI6MTUzOTgyMDM5MSwianRpIjoiN2YwN2I5YTAtMmY3YS00ZDlmLWFmMzQtMTRmNzlmYmNlZjM4IiwiZXhwIjoxNTM5ODIxNTkxLCJpZGVudGl0eSI6InRlc3RlQHRlc3RlLmNvbSIsImZyZXNoIjpmYWxzZSwidHlwZSI6ImFjY2VzcyIsInVzZXJfY2xhaW1zIjp7ImFjdGl2ZSI6dHJ1ZX19.p_fjeR_Kc-5kJOLQ9F2EE6qeXe1iNL6amd-QkFEskJ8
Connection: keep-alive
Host: 0.0.0.0:5000
User-Agent: HTTPie/0.9.8



HTTP/1.0 200 OK
Content-Length: 267
Content-Type: application/json
Date: Wed, 17 Oct 2018 23:53:29 GMT
Server: Werkzeug/0.14.1 Python/3.6.5

{
    "data": {
        "active": true,
        "cpf_cnpj": "123456",
        "email": "teste@teste.com",
        "full_name": "teste sobrenome",
        "id": "5bbeaf52fb5d1b0a32466c93"
    },
    "message": "Usuários retornado(a).",
    "resource": "Users",
    "status": 200
}

Próximos passos

Chegamos a um nível bem satisfatório dessa série e ainda ha muito o que evoluir apenas com este pequeno projeto. Dentre minhas expectativas de estudos inclui:

  • JWT por cookies e não por json como foi feito

  • Revogar tokens quando fazer um logout

  • JWT por assinatura RSA (id_rsa e id_rsa.pub)

  • Deploy no Heroku

  • Deploy na Digital Ocean

  • Deploy no EC2 da AWS

  • Criar um container via Docker

  • Deploy na ECS da AWS

  • Fazer todo o processo de CI e CD, talvez com Jenkins

  • Criar um docker-compose contendo a api e o servidor mongo

  • Evoluir para uma arquitetura de micro serviços baseado no protocolo HTTP com outro serviço mais simples.

Por enquanto exitem uma boa gama de estudos principalmente na parte de cloud computing que devem ser estudados e colocados em prática. Espero que tenham gostado dessa série e que ela tenha ajudado os leitores.

Em breve vou lançando novos capítulos para essa série. Agradeço a todos. Abraços e fiquem com Deus.

Próximo artigo: Criando um container Docker