Série API em Flask - Parte 12 - Autênticação por JWT
O JWT é um dos meios mais comuns de autorização entre aplicações web e SPA’s desenvolvidos com ReactJS ou outros frameworks JavaScript. Por ser um meio fácil de conter informações o JWT pode ter dados do usuário e um tempo de expiração.
Para entendimento, segue os capítulos que serão abordados.
Capítulo 1: Introdução, configuração e Hello World
Capítulo 2: Organizando as dependências e requerimentos
Capítulo 3: Configurando o pytest e nosso primeiro teste
Capítulo 4: Configurando o Makefile
Capítulo 5: Adicionando o MongoDB
Capítulo 6: Criando e testando o modelo de usuários
Capítulo 7: Criando usuários
Capítulo 8: Listando usuários
Capítulo 9: Buscando usuários
Capítulo 10: Editando um usuário
Capítulo 11: Deletando um usuário
Capítulo 12: Autênticação por JWT Estamos aqui
Capítulo 13: Criando um container Docker
Capítulo 14: Arquivos de configuração para Deploy na Digital Ocean
Capítulo 15: Automatizando o processo de deploy com Fabric
Capítulo 16: CI e CD com Jenkins, Python, Flask e Fabric
Capítulo 17: Utilizando o RabbitMQ com Flask e Sendgrid para enviar e-mails de boas vindas e ativar a conta do usuário
O repositório com todo o código fonte esta aqui. Os capítulos estão em branches
.
Adição e configurações do JWT
Nesse projeto existem dois pacotes que irei utilizar. O primeiro é o PyJWT e a extensão do flask Flask-JWT-Extended.
Vamos editar os nossos requeriments/base.txt
e adicionar esses dois pacotes.
Flask-JWT-Extended==3.13.1
PyJWT==1.6.4
Em seguida pip install -r requirements/base.txt
Feito isso vamos colocar algumas configurações iniciais em nosso .env
.
JWT_ACCESS_TOKEN_EXPIRES=20
JWT_REFRESH_TOKEN_EXPIRES=30
E em nossa classe de configurações do flask, Config
, em config.py
.
# ...
from datetime import timedelta
class Config:
SECRET_KEY = getenv('SECRET_KEY') or 'uma string randômica e gigante'
APP_PORT = int(getenv('APP_PORT'))
DEBUG = eval(getenv('DEBUG').title())
MONGODB_HOST = getenv('MONGODB_URI')
JWT_ACCESS_TOKEN_EXPIRES = timedelta(
minutes=int(getenv('JWT_ACCESS_TOKEN_EXPIRES'))
)
JWT_REFRESH_TOKEN_EXPIRES = timedelta(
days=int(getenv('JWT_REFRESH_TOKEN_EXPIRES'))
)
# ...
Para fins de segurança é importante setar um tempo maximo para expirar um Token de 20 minutos que é o tempo médio que um usuário permanece logado. Acima disso qualquer um que conseguir um token válido poderá entrar na conta do usuário e fazer algumas bagunças no perfil.
Crie novas mensagens em nosso apps/messages.py
MSG_TOKEN_CREATED = 'Token criado.'
MSG_INVALID_CREDENTIALS = 'As credenciais estão inválidas para log in.'
MSG_TOKEN_EXPIRED = 'Token expirou.'
MSG_PERMISSION_DENIED = 'Permissão negada.'
Crie uma nova resposta para nossas requisições no apps/response.py
# ...
from .messages import MSG_ALREADY_EXISTS, MSG_PERMISSION_DENIED
# ...
def resp_notallowed_user(resource: str, msg: str = MSG_PERMISSION_DENIED):
if not isinstance(resource, str):
raise ValueError('Recurso precisa ser uma string')
resp = jsonify({
'status': 401,
'resource': resource,
'message': msg
})
resp.status_code = 401
return resp
Em seguida vamos criar uma função para buscar o usuaŕio por email. Abra o arquivo apps/users/utils.py
e adicione o código abaixo:
# ...
def get_user_by_email(email: str):
try:
# buscamos todos os usuários da base utilizando o paginate
return User.objects.get(email=email)
except DoesNotExist as e:
return resp_does_not_exist('Users', 'Usuário')
except FieldDoesNotExist as e:
return resp_exception('Users', description=e.__str__())
except Exception as e:
return resp_exception('Users', description=e.__str__())
# ...
Depois das configurações vamos criar um função factory adicionando o JwtManager
extensão do plugin que instalamos anteriormente e aplicar em nossa api.
Crie o módulo apps/jwt.py
com o seguinte conteúdo.
# -*- coding: utf-8 -*-
# Flask
from flask import jsonify
# Third
from flask_jwt_extended import JWTManager
# Apps
from apps.users.models import User
# Local
from .messages import MSG_INVALID_CREDENTIALS, MSG_TOKEN_EXPIRED
def configure_jwt(app):
# Add jwt handler
jwt = JWTManager(app)
@jwt.user_claims_loader
def add_claims_to_access_token(identity):
user = User.objects.get(email=identity)
# Podemos extender as informações do usuaŕio adicionando
# novos campos: active, roles, full_name e etc...
if user:
return {
'active': user.active
}
@jwt.expired_token_loader
def my_expired_token_callback():
resp = jsonify({
'status': 401,
'sub_status': 42,
'message': MSG_TOKEN_EXPIRED
})
resp.status_code = 401
return resp
@jwt.unauthorized_loader
def my_unauthorized_callback(e):
resp = jsonify({
'status': 401,
'sub_status': 1,
'description': e,
'message': MSG_INVALID_CREDENTIALS
})
resp.status_code = 401
return resp
@jwt.claims_verification_loader
def my_claims_verification_callback(e):
resp = jsonify({
'status': 401,
'sub_status': 2,
'description': e,
'message': MSG_INVALID_CREDENTIALS
})
resp.status_code = 401
return resp
@jwt.invalid_token_loader
def my_invalid_token_loader_callback(e):
resp = jsonify({
'status': 401,
'sub_status': 3,
'description': e,
'message': MSG_INVALID_CREDENTIALS
})
resp.status_code = 401
return resp
@jwt.needs_fresh_token_loader
def my_needs_fresh_token_callback(e):
resp = jsonify({
'status': 401,
'sub_status': 4,
'description': e,
'message': MSG_INVALID_CREDENTIALS
})
resp.status_code = 401
return resp
@jwt.revoked_token_loader
def my_revoked_token_callback(e):
resp = jsonify({
'status': 401,
'sub_status': 5,
'description': e,
'message': MSG_INVALID_CREDENTIALS
})
resp.status_code = 401
return resp
@jwt.user_loader_callback_loader
def my_user_loader_callback(e):
resp = jsonify({
'status': 401,
'sub_status': 6,
'description': e,
'message': MSG_INVALID_CREDENTIALS
})
resp.status_code = 401
return resp
@jwt.user_loader_error_loader
def my_user_loader_error_callback(e):
resp = jsonify({
'status': 401,
'sub_status': 7,
'description': e,
'message': MSG_INVALID_CREDENTIALS
})
resp.status_code = 401
return resp
@jwt.token_in_blacklist_loader
def my_token_in_blacklist_callback(e):
resp = jsonify({
'status': 401,
'sub_status': 8,
'description': e,
'message': MSG_INVALID_CREDENTIALS
})
resp.status_code = 401
return resp
@jwt.claims_verification_failed_loader
def my_claims_verification_failed_callback(e):
resp = jsonify({
'status': 401,
'sub_status': 9,
'description': e,
'message': MSG_INVALID_CREDENTIALS
})
resp.status_code = 401
return resp
Todos os decorators como @jwt.claims_verification_failed_loader
foram extendidos para retornar uma resposta JSON ao invés de retornar exceções.
Em seguida vamos chamar no nosso arquivo apps/__init__.py
dentro da função create_app
a função que acabamos de criar configure_jwt
. Ficando assim:
# -*- coding: utf-8 -*-
from flask import Flask
from config import config
# Realize a importação da função que configura a api
from .api import configure_api
from .db import db
from .jwt import configure_jwt
def create_app(config_name):
app = Flask('api-users')
app.config.from_object(config[config_name])
# Configure MongoEngine
db.init_app(app)
# Configure JWT
configure_jwt(app)
# executa a chamada da função de configuração
configure_api(app)
return app
Criando nosso app e o recurso de Login
Depois das configurações iniciais vamos criar nossos endpoints e recursos para autenticação. Adicione no api.py
o endpoint /auth
from apps.auth.resources import AuthResource
# ...
api = Api()
def configure_api(app):
# ...
# rotas para autenticacao
api.add_resource(AuthResource, '/auth')
# ...
Crie um novo diretório chamado apps/auth
e ele terá dois módulos:
$ tree .
.
├── api.py
├── auth
│ ├── __init__.py
│ ├── resources.py
│ └── schemas.py
...
Nosso apps/auth/schema.py
não tem nada de novo e bem simples por sinal.
# -*- coding: utf-8 -*-
from marshmallow import Schema
from marshmallow.fields import Email, Str
from apps.messages import MSG_FIELD_REQUIRED
class LoginSchema(Schema):
email = Email(
required=True, error_messages={'required': MSG_FIELD_REQUIRED}
)
password = Str(
required=True, error_messages={'required': MSG_FIELD_REQUIRED}
)
E dentro do apps/auth/resources.py
vamos criar nossa rota de autenticação.
# -*- coding:utf-8 -*-
# Python
# Flask
from flask import request
# Third
from flask_restful import Resource
from flask_jwt_extended import create_access_token, create_refresh_token
from bcrypt import checkpw
# Apps
from apps.users.models import User
from apps.users.schemas import UserSchema
from apps.users.utils import get_user_by_email
from apps.messages import MSG_NO_DATA, MSG_TOKEN_CREATED
from apps.responses import resp_ok, resp_data_invalid, resp_notallowed_user
# Local
from .schemas import LoginSchema
class AuthResource(Resource):
def post(self, *args, **kwargs):
'''
Route to do login in API
'''
req_data = request.get_json() or None
user = None
login_schema = LoginSchema()
schema = UserSchema()
if req_data is None:
return resp_data_invalid('Users', [], msg=MSG_NO_DATA)
data, errors = login_schema.load(req_data)
if errors:
return resp_data_invalid('Users', errors)
# Buscamos nosso usuário pelo email
user = get_user_by_email(data.get('email'))
# Em caso de exceção ou não é uma instancia do Modelo de User
# retornamos a resposta
if not isinstance(user, User):
return user
# Verificamos se o usuário está ativo na plataforma. Se não ele
# não podera autenticar e não ter acesso a nada
if not user.is_active():
return resp_notallowed_user('Auth')
# Conferimos a senha informada no payload de dados com a senha cadastrada
# em nosso banco.
if checkpw(data.get('password').encode('utf-8'), user.password.encode('utf-8')):
# Chamamos os metodos para criar os tokens passando como identidade
# o email do nosso usuario
extras = {
'token': create_access_token(identity=user.email),
'refresh': create_refresh_token(identity=user.email)
}
result = schema.dump(user)
return resp_ok(
'Auth', MSG_TOKEN_CREATED, data=result.data, **extras
)
return resp_notallowed_user('Auth')
Vamos testar a autenticaçaõ?
$ http -v POST 0.0.0.0:5000/auth email=teste@teste.com password=123456
POST /auth HTTP/1.1
Accept: application/json, */*
Accept-Encoding: gzip, deflate
Connection: keep-alive
Content-Length: 50
Content-Type: application/json
Host: 0.0.0.0:5000
User-Agent: HTTPie/0.9.8
{
"email": "teste@teste.com",
"password": "123456"
}
HTTP/1.0 200 OK
Content-Length: 887
Content-Type: application/json
Date: Wed, 17 Oct 2018 22:11:04 GMT
Server: Werkzeug/0.14.1 Python/3.6.5
{
"data": {
"active": true,
"cpf_cnpj": "123456",
"email": "teste@teste.com",
"full_name": "teste sobrenome",
"id": "5bbeaf52fb5d1b0a32466c93"
},
"message": "Token criado.",
"refresh": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1Mzk4MTQyNjQsIm5iZiI6MTUzOTgxNDI2NCwianRpIjoiYTU3YmMwMTctMTFhOC00Yjc3LThkNmUtOWQxMDI5MTJkYTY0IiwiZXhwIjoxNTM5ODE1NDY0LCJpZGVudGl0eSI6InRlc3RlQHRlc3RlLmNvbSIsInR5cGUiOiJyZWZyZXNoIn0.0WiSlQ7_wJRUy0p7qfVyl0rdsPYTJxfGO2UMfXWryPI",
"resource": "Auth",
"status": 200,
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1Mzk4MTQyNjQsIm5iZiI6MTUzOTgxNDI2NCwianRpIjoiZjdiZmI4NDYtMjk4ZS00MmY1LWIxYzItMWJiOTM2NzkwZjU1IiwiZXhwIjoxNTM5ODE1NDY0LCJpZGVudGl0eSI6InRlc3RlQHRlc3RlLmNvbSIsImZyZXNoIjpmYWxzZSwidHlwZSI6ImFjY2VzcyIsInVzZXJfY2xhaW1zIjp7ImFjdGl2ZSI6dHJ1ZX19.BGEBy_XsAKXsu4_O_s8DBsbmWad16uNYWjZMevkYBGk"
}
Revalidando nosso token expirado
Para criar um novo token caso tenha expirado vamos criar um novo endpoint em nosso api.py
.
from apps.auth.resources import AuthResource, RefreshTokenResource,
# ...
api = Api()
def configure_api(app):
# ...
# rotas para autenticacao
api.add_resource(AuthResource, '/auth')
api.add_resource(RefreshTokenResource, '/auth/refresh')
# ...
Agora vamos criar nosso recurso RefreshTokenResource
.
# ...
from flask_jwt_extended import jwt_refresh_token_required, get_jwt_identity
#...
class RefreshTokenResource(Resource):
@jwt_refresh_token_required
def post(self, *args, **kwargs):
'''
Refresh a token that expired.
http://flask-jwt-extended.readthedocs.io/en/latest/refresh_tokens.html
'''
extras = {
'token': create_access_token(identity=get_jwt_identity()),
}
return resp_ok(
'Auth', MSG_TOKEN_CREATED, **extras
)
E para testar:
$ http -v POST localhost:5000/auth/refresh "Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1Mzk4MTg5MDYsIm5iZiI6MTUzOTgxODkwNiwianRpIjoiZGQwMzdhMTAtNjFjNS00ZjgwLTgxYWUtZmJmZjRlM2M4NDljIiwiZXhwIjoxNTQyNDEwOTA2LCJpZGVudGl0eSI6InRlc3RlQHRlc3RlLmNvbSIsInR5cGUiOiJyZWZyZXNoIn0.F26hCslA-iNvPI2r4yLAJuzM6m9n8F41ghDfThwfmtY"
POST /auth/refresh HTTP/1.1
Accept: */*
Accept-Encoding: gzip, deflate
Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1Mzk4MTg5MDYsIm5iZiI6MTUzOTgxODkwNiwianRpIjoiZGQwMzdhMTAtNjFjNS00ZjgwLTgxYWUtZmJmZjRlM2M4NDljIiwiZXhwIjoxNTQyNDEwOTA2LCJpZGVudGl0eSI6InRlc3RlQHRlc3RlLmNvbSIsInR5cGUiOiJyZWZyZXNoIn0.F26hCslA-iNvPI2r4yLAJuzM6m9n8F41ghDfThwfmtY
Connection: keep-alive
Content-Length: 0
Host: localhost:5000
User-Agent: HTTPie/0.9.8
HTTP/1.0 200 OK
Content-Length: 419
Content-Type: application/json
Date: Wed, 17 Oct 2018 23:29:31 GMT
Server: Werkzeug/0.14.1 Python/3.6.5
{
"message": "Token criado.",
"resource": "Auth",
"status": 200,
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1Mzk4MTg5NzEsIm5iZiI6MTUzOTgxODk3MSwianRpIjoiYTY5MjlmODktN2EyMC00MjdhLTkxYjYtZjdjZjFhMzc4MjEwIiwiZXhwIjoxNTM5ODIwMTcxLCJpZGVudGl0eSI6InRlc3RlQHRlc3RlLmNvbSIsImZyZXNoIjpmYWxzZSwidHlwZSI6ImFjY2VzcyIsInVzZXJfY2xhaW1zIjp7ImFjdGl2ZSI6dHJ1ZX19.zN2WCq_Ou0dhQYvbOdn_IyD3iy-8ZneXgZKeaVI_tMA"
}
Rotas com autenticação obrigatórias
Hoje todas as nossas rotas administrativas para controle de usuários estão abertas e sem autenticação. Logo precisamos fazer com que elas so possam ser acessadas através de um access token
válido e claro vamos verificar se o usuário autenticado é admin=True
. Caso contrário ele não poderá receber o resultado correto e sim uma mensagem de erro.
Vamos voltar em nosso apps/resources_admin.py
e refatora-lo.
Primeiro vamos importar as seguintes funções do flask_jwt_extend
.
# ...
# Third
# ...
from flask_jwt_extended import get_jwt_identity, jwt_required
#...
O jwt_required
é um decorator do pacote a qual verifica se estamos recebendo um token em nossa requisição HTTP através do cabeçalho Authorization
. Com isso vamos alterar nossos métodos para:
class AdminUserPageList(Resource):
@jwt_required
def get(self, page_id=1):
#...
class AdminUserResource(Resource):
@jwt_required
def get(self, user_id):
#...
@jwt_required
def put(self, user_id):
#...
@jwt_required
def delete(self, user_id):
#...
Se tentarmos acessar qualquer uma desses endpoints teremos o seguinte erro:
$ http -v GET 0.0.0.0:5000/admin/users/5bbeaf52fb5d1b0a32466c93
GET /admin/users/5bbeaf52fb5d1b0a32466c93 HTTP/1.1
Accept: */*
Accept-Encoding: gzip, deflate
Connection: keep-alive
Host: 0.0.0.0:5000
User-Agent: HTTPie/0.9.8
HTTP/1.0 401 UNAUTHORIZED
Content-Length: 161
Content-Type: application/json
Date: Wed, 17 Oct 2018 23:43:03 GMT
Server: Werkzeug/0.14.1 Python/3.6.5
{
"description": "Missing Authorization Header",
"message": "As credenciais estão inválidas para log in.",
"status": 401,
"sub_status": 1
}
Certo, mas e agora? Agora teremos de criar dois novos metodos para verificar se nosso usuário esta ativo e possui permissões de admin.
Em nosso apps/users/resources_admin.py
vamos alterar TODOS os métodos com as seguintes linhas.
# ...
from flask_jwt_extended import get_jwt_identity, jwt_required
# ...
from .utils import get_user_by_id, exists_email_in_users, get_user_by_email
#...
class AdminUserResource(Resource):
@jwt_required
def get(self, user_id):
result = None
schema = UserSchema()
current_user = get_user_by_email(get_jwt_identity())
if not isinstance(current_user, User):
return current_user
if not (current_user.is_active()) and current_user.is_admin():
return resp_notallowed_user('Users')
# ...
Testando:
$ http -v GET 0.0.0.0:5000/admin/users/5bbeaf52fb5d1b0a32466c93 "Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1Mzk4MjAzOTEsIm5iZiI6MTUzOTgyMDM5MSwianRpIjoiN2YwN2I5YTAtMmY3YS00ZDlmLWFmMzQtMTRmNzlmYmNlZjM4IiwiZXhwIjoxNTM5ODIxNTkxLCJpZGVudGl0eSI6InRlc3RlQHRlc3RlLmNvbSIsImZyZXNoIjpmYWxzZSwidHlwZSI6ImFjY2VzcyIsInVzZXJfY2xhaW1zIjp7ImFjdGl2ZSI6dHJ1ZX19.p_fjeR_Kc-5kJOLQ9F2EE6qeXe1iNL6amd-QkFEskJ8"
GET /admin/users/5bbeaf52fb5d1b0a32466c93 HTTP/1.1
Accept: */*
Accept-Encoding: gzip, deflate
Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1Mzk4MjAzOTEsIm5iZiI6MTUzOTgyMDM5MSwianRpIjoiN2YwN2I5YTAtMmY3YS00ZDlmLWFmMzQtMTRmNzlmYmNlZjM4IiwiZXhwIjoxNTM5ODIxNTkxLCJpZGVudGl0eSI6InRlc3RlQHRlc3RlLmNvbSIsImZyZXNoIjpmYWxzZSwidHlwZSI6ImFjY2VzcyIsInVzZXJfY2xhaW1zIjp7ImFjdGl2ZSI6dHJ1ZX19.p_fjeR_Kc-5kJOLQ9F2EE6qeXe1iNL6amd-QkFEskJ8
Connection: keep-alive
Host: 0.0.0.0:5000
User-Agent: HTTPie/0.9.8
HTTP/1.0 200 OK
Content-Length: 267
Content-Type: application/json
Date: Wed, 17 Oct 2018 23:53:29 GMT
Server: Werkzeug/0.14.1 Python/3.6.5
{
"data": {
"active": true,
"cpf_cnpj": "123456",
"email": "teste@teste.com",
"full_name": "teste sobrenome",
"id": "5bbeaf52fb5d1b0a32466c93"
},
"message": "Usuários retornado(a).",
"resource": "Users",
"status": 200
}
Próximos passos
Chegamos a um nível bem satisfatório dessa série e ainda ha muito o que evoluir apenas com este pequeno projeto. Dentre minhas expectativas de estudos inclui:
JWT por cookies e não por json como foi feito
Revogar tokens quando fazer um logout
JWT por assinatura RSA (id_rsa e id_rsa.pub)
Deploy no Heroku
Deploy na Digital Ocean
Deploy no EC2 da AWS
Criar um container via Docker
Deploy na ECS da AWS
Fazer todo o processo de CI e CD, talvez com Jenkins
Criar um docker-compose contendo a api e o servidor mongo
Evoluir para uma arquitetura de micro serviços baseado no protocolo HTTP com outro serviço mais simples.
Por enquanto exitem uma boa gama de estudos principalmente na parte de cloud computing que devem ser estudados e colocados em prática. Espero que tenham gostado dessa série e que ela tenha ajudado os leitores.
Em breve vou lançando novos capítulos para essa série. Agradeço a todos. Abraços e fiquem com Deus.
Próximo artigo: Criando um container Docker