diff options
Diffstat (limited to 'app/util/authentication.py')
-rw-r--r-- | app/util/authentication.py | 55 |
1 files changed, 16 insertions, 39 deletions
diff --git a/app/util/authentication.py b/app/util/authentication.py index b270c6d..0bc7e09 100644 --- a/app/util/authentication.py +++ b/app/util/authentication.py @@ -1,15 +1,15 @@ import random import bcrypt -from fastapi import Depends, HTTPException, status, Cookie +from fastapi import Depends, HTTPException, status, Request, Cookie from fastapi.security import OAuth2PasswordBearer from fastapi.responses import RedirectResponse from jwt.exceptions import InvalidTokenError from datetime import datetime, timedelta -from typing import Annotated +from typing import Annotated, Optional import jwt from app.util.db_dependency import get_db -from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm import Session from app.schemas.auth_schemas import * from models import User as UserModel @@ -62,30 +62,6 @@ def create_access_token(data: dict, expires_delta: timedelta): return encoded_jwt -async def get_current_user_from_cookie( - access_token: str = Cookie(None), db=Depends(get_db) -): - """ - Return the user based on the access token in the cookie - - Used for authentication into UI pages - so if no cookie - exists, redirect to login page rather than returning a 401 - - Also pass is_ui=True to alert get_current_user that we need - to use RedirectResponse rather than raising an HTTPException - """ - if access_token: - return await get_current_user(access_token, is_ui=True, db=db) - return RedirectResponse(url="/login") - - -async def get_current_user_from_token( - token: Annotated[str, Depends(oauth2_scheme)], - db=Depends(get_db), -): - return await get_current_user(token, db=db) - - # Backwards kind of way to get refresh token support # `refresh_get_current_user` is only called from /refresh # and alerts `get_current_user` that it should expect a refresh token @@ -97,10 +73,8 @@ async def refresh_get_current_user( async def get_current_user( - token: str, - is_refresh: bool = False, - is_ui: bool = False, - db: sessionmaker = None, + request: Request, + db=Depends(get_db), ): """ Return the current user based on the token @@ -110,9 +84,16 @@ async def get_current_user( Otherwise, the request is from an API and we should return a 401 """ + # If the request is from /api/auth/refresh, it is a request to get + # a new access token using a refresh token + if request.url.path == "/api/auth/refresh": + token = request.cookies.get("refresh_token") + is_refresh = True + else: + token = request.cookies.get("access_token") + is_refresh = False + def raise_unauthorized(): - if is_ui: - return RedirectResponse(url="/login") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", @@ -126,12 +107,8 @@ async def get_current_user( refresh: bool = payload.get("refresh") if not id or not username: return raise_unauthorized() - # For some reason, an access token was passed when a refresh - # token was expected - some likely malicious activity - if not refresh and is_refresh: - return raise_unauthorized() - # If the token passed is a refresh token and the function - # is not expecting a refresh token, raise an error + + # Make sure that a refresh token was not passed to any other endpoint if refresh and not is_refresh: return raise_unauthorized() |