Revoking Tokens
This will allow you to revoke a specific token so that it can no longer access your endpoints. You will have to choose which tokens you want to check against the denylist. The denylist works by providing a callback function to this extension, using token_in_denylist_loader(). The callback will be called whenever one of the specified tokens (access and/or refresh) is used to access a protected endpoint. If the callback function says that the token is revoked, we will not allow the requester to continue; otherwise we will allow the requester to access the endpoint as normal.
Here is a basic example of revoking tokens:
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.responses import JSONResponse
from fastapi_jwt_auth import AuthJWT
from fastapi_jwt_auth.exceptions import AuthJWTException
from pydantic import BaseModel
# Application instance; the route and exception-handler decorators in this
# example register their functions on it.
app = FastAPI()
# Request body accepted by the login endpoint: a username/password pair.
class User(BaseModel):
    username: str
    password: str
# Enable the denylist and choose which token types are checked against it:
# "access", "refresh", or both of them.
class Settings(BaseModel):
    authjwt_secret_key: str = "secret"
    authjwt_denylist_enabled: bool = True
    authjwt_denylist_token_checks: set = {"access", "refresh"}
@AuthJWT.load_config
def get_config():
    """Return a fresh Settings instance as the configuration for AuthJWT."""
    config = Settings()
    return config
@app.exception_handler(AuthJWTException)
def authjwt_exception_handler(request: Request, exc: AuthJWTException):
    """Render an AuthJWTException as JSON, reusing its status code and message."""
    body = {"detail": exc.message}
    return JSONResponse(status_code=exc.status_code, content=body)
# Storage for revoked tokens: the revoke endpoints add each token's jti to
# this set. It lives only in this process's memory; in production you can
# use Redis as the storage system instead.
denylist = set()
# For this example, we only check whether the token's jti (unique
# identifier) is in the denylist set. This could be made more complex, for
# example by storing the token in Redis with the value true if revoked and
# false if not revoked.
@AuthJWT.token_in_denylist_loader
def check_if_token_in_denylist(decrypted_token):
    """Return True when the decoded token's jti is in the denylist set."""
    return decrypted_token['jti'] in denylist
@app.post('/login')
def login(user: User, Authorize: AuthJWT = Depends()):
    # Demo credential check: only the pair "test" / "test" is accepted.
    credentials_ok = user.username == "test" and user.password == "test"
    if not credentials_ok:
        raise HTTPException(status_code=401, detail="Bad username or password")

    # Both tokens carry the username as their subject.
    return {
        "access_token": Authorize.create_access_token(subject=user.username),
        "refresh_token": Authorize.create_refresh_token(subject=user.username),
    }
# Standard refresh endpoint. A token that is in the denylist will not
# be able to access this endpoint.
@app.post('/refresh')
def refresh(Authorize: AuthJWT = Depends()):
    Authorize.jwt_refresh_token_required()

    # Issue a new access token for the subject of the presented refresh token.
    subject = Authorize.get_jwt_subject()
    return {"access_token": Authorize.create_access_token(subject=subject)}
# Endpoint for revoking the current user's access token.
@app.delete('/access-revoke')
def access_revoke(Authorize: AuthJWT = Depends()):
    Authorize.jwt_required()

    # Add the presented token's jti to the denylist set.
    denylist.add(Authorize.get_raw_jwt()['jti'])
    return {"detail":"Access token has been revoke"}
# Endpoint for revoking the current user's refresh token.
@app.delete('/refresh-revoke')
def refresh_revoke(Authorize: AuthJWT = Depends()):
    Authorize.jwt_refresh_token_required()

    # Add the presented refresh token's jti to the denylist set.
    denylist.add(Authorize.get_raw_jwt()['jti'])
    return {"detail":"Refresh token has been revoke"}
# A token that is in the denylist will no longer be able to access this.
@app.get('/protected')
def protected(Authorize: AuthJWT = Depends()):
    Authorize.jwt_required()
    return {"user": Authorize.get_jwt_subject()}
In production, you will likely want to use either a database or an in-memory store (such as Redis) to store your tokens. In-memory stores are great if you want to revoke a token when the user logs out, and you can define a timeout for your tokens in Redis; after the timeout has expired, the tokens will automatically be deleted.
Note
Before that, make sure Redis is already installed on your local machine;
alternatively, you can run it with Docker using this command: docker run -d -p 6379:6379 redis
Here is an example that uses Redis for revoking tokens:
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.responses import JSONResponse
from fastapi_jwt_auth import AuthJWT
from fastapi_jwt_auth.exceptions import AuthJWTException
from pydantic import BaseModel
from datetime import timedelta
from redis import Redis
# Application instance; the route and exception-handler decorators in this
# example register their functions on it.
app = FastAPI()
# Request body accepted by the login endpoint: a username/password pair.
class User(BaseModel):
    username: str
    password: str
# Configuration for this example: the denylist is enabled and both access
# and refresh tokens are checked against it.
class Settings(BaseModel):
    authjwt_secret_key: str = "secret"
    authjwt_denylist_enabled: bool = True
    authjwt_denylist_token_checks: set = {"access", "refresh"}
    # Expiry applied to denylist entries written to Redis by the revoke
    # endpoints. Annotated as timedelta so the declared type matches the
    # default values.
    access_expires: timedelta = timedelta(minutes=15)
    refresh_expires: timedelta = timedelta(days=30)
# Single shared Settings instance: returned by get_config() and read by the
# revoke endpoints for the Redis expiry values.
settings = Settings()
@AuthJWT.load_config
def get_config():
    """Return the module-level settings object as the configuration for AuthJWT."""
    config = settings
    return config
@app.exception_handler(AuthJWTException)
def authjwt_exception_handler(request: Request, exc: AuthJWTException):
    """Turn an AuthJWTException into a JSON response with the same status code."""
    payload = {"detail": exc.message}
    return JSONResponse(content=payload, status_code=exc.status_code)
# Set up the Redis connection used to store the denylisted tokens.
# decode_responses=True makes replies come back as str rather than bytes,
# which is what the comparison against 'true' in the denylist check expects.
redis_conn = Redis(host='localhost', port=6379, db=0, decode_responses=True)
# Check whether a token has been revoked. In this simple case, only the
# token's jti (unique identifier) is stored in Redis; a token counts as
# revoked when its jti exists in Redis with the value 'true'.
@AuthJWT.token_in_denylist_loader
def check_if_token_in_denylist(decrypted_token):
    """Return True if the token's jti is stored in Redis as 'true', else False.

    The result is always a bool: a jti with no Redis entry yields False
    rather than None.
    """
    jti = decrypted_token['jti']
    # get() yields None for a missing key, and None == 'true' is simply False,
    # so no separate existence check is needed.
    return redis_conn.get(jti) == 'true'
@app.post('/login')
def login(user: User, Authorize: AuthJWT = Depends()):
    # Demo credential check: anything other than "test" / "test" is rejected.
    if not (user.username == "test" and user.password == "test"):
        raise HTTPException(status_code=401, detail="Bad username or password")

    # Issue an access/refresh token pair with the username as subject.
    tokens = {}
    tokens["access_token"] = Authorize.create_access_token(subject=user.username)
    tokens["refresh_token"] = Authorize.create_refresh_token(subject=user.username)
    return tokens
# Standard refresh endpoint. A token that is in the denylist will not
# be able to access this endpoint.
@app.post('/refresh')
def refresh(Authorize: AuthJWT = Depends()):
    Authorize.jwt_refresh_token_required()

    # Mint a new access token for the subject of the presented refresh token.
    subject = Authorize.get_jwt_subject()
    return {"access_token": Authorize.create_access_token(subject=subject)}
# Endpoint for revoking the current user's access token.
@app.delete('/access-revoke')
def access_revoke(Authorize: AuthJWT = Depends()):
    Authorize.jwt_required()

    # Store the token's jti in Redis with the value 'true' (revoked). The
    # entry is written with an expiry of settings.access_expires, so Redis
    # removes it automatically once that time has passed.
    token_id = Authorize.get_raw_jwt()['jti']
    redis_conn.setex(name=token_id, time=settings.access_expires, value='true')
    return {"detail":"Access token has been revoke"}
# Endpoint for revoking the current user's refresh token.
@app.delete('/refresh-revoke')
def refresh_revoke(Authorize: AuthJWT = Depends()):
    Authorize.jwt_refresh_token_required()

    # Store the refresh token's jti in Redis as 'true' (revoked), with an
    # expiry of settings.refresh_expires.
    token_id = Authorize.get_raw_jwt()['jti']
    redis_conn.setex(name=token_id, time=settings.refresh_expires, value='true')
    return {"detail":"Refresh token has been revoke"}
# A token that is in the denylist will no longer be able to access this.
@app.get('/protected')
def protected(Authorize: AuthJWT = Depends()):
    Authorize.jwt_required()
    return {"user": Authorize.get_jwt_subject()}