diff options
Diffstat (limited to 'api/util')
-rw-r--r-- | api/util/authentication.py | 111 | ||||
-rw-r--r-- | api/util/db_dependency.py | 9 |
2 files changed, 0 insertions, 120 deletions
diff --git a/api/util/authentication.py b/api/util/authentication.py deleted file mode 100644 index b8ac6a6..0000000 --- a/api/util/authentication.py +++ /dev/null @@ -1,111 +0,0 @@ -import random -import bcrypt -from fastapi import Depends, HTTPException, status -from fastapi.security import OAuth2PasswordBearer -from jwt.exceptions import InvalidTokenError -from datetime import datetime, timedelta -from typing import Annotated, Optional -import jwt - -from api.util.db_dependency import get_db -from api.schemas.auth_schemas import * -from models import User as UserDB - -secret_key = random.randbytes(32) -algorithm = "HS256" -oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") - -""" -Helper functions for authentication -""" - - -def verify_password(plain_password, hashed_password): - return bcrypt.checkpw( - plain_password.encode("utf-8"), hashed_password.encode("utf-8") - ) - - -def get_user(db, username: str): - """ - Get the user object from the database - """ - user = db.query(UserDB).filter(UserDB.username == username).first() - if user: - return UserInDB(**user.__dict__) - - -def authenticate_user(db, username: str, password: str): - """ - Determine if the correct username and password were provided - If so, return the user object - """ - user = get_user(db, username) - if not user: - return False - if not verify_password(password, user.hashed_password): - return False - return user - - -def create_access_token(data: dict, expires_delta: timedelta): - """ - Return an encoded JWT token with the given data - """ - to_encode = data.copy() - expire = datetime.utcnow() + expires_delta - to_encode.update({"exp": expire}) - encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=algorithm) - return encoded_jwt - - -# Backwards kinda of way to get refresh token support -# 'refresh_get_current_user' is only called from /refresh -# and alerts 'current_user' that it should expect a refresh token -async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]): - user = await current_user(token) - return user - - -async def refresh_get_current_user( - token: Annotated[str, Depends(oauth2_scheme)], -): - user = await current_user(token, is_refresh=True) - return user - - -async def current_user( - token: str, - is_refresh: bool = False, - db=Depends(get_db), -): - """ - Return the current user based on the token, or raise a 401 error - """ - credentials_exception = HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Could not validate credentials", - headers={"WWW-Authenticate": "Bearer"}, - ) - try: - payload = jwt.decode(token, secret_key, algorithms=[algorithm]) - username: str = payload.get("sub") - refresh: bool = payload.get("refresh") - if username is None: - raise credentials_exception - # For some reason, an access token was passed when a refresh - # token was expected - some likely malicious activity - if not refresh and is_refresh: - raise credentials_exception - # If the token passed is a refresh token and the function - # is not expecting a refresh token, raise an error - if refresh and not is_refresh: - raise credentials_exception - - token_data = TokenData(username=username) - except InvalidTokenError: - raise credentials_exception - user = get_user(db, username=token_data.username) - if user is None: - raise credentials_exception - return user diff --git a/api/util/db_dependency.py b/api/util/db_dependency.py deleted file mode 100644 index a6734ea..0000000 --- a/api/util/db_dependency.py +++ /dev/null @@ -1,9 +0,0 @@ -from database import SessionLocal - - -def get_db(): - db = SessionLocal() - try: - yield db - finally: - db.close() |