aboutsummaryrefslogtreecommitdiff
path: root/app/routes/auth_routes.py
blob: ceb68b134b127b3c6b98c78cdca79baf309b0433 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from fastapi import Depends, APIRouter, status, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.responses import Response, JSONResponse
from datetime import timedelta
from typing import Annotated

from app.util.authentication import (
    create_access_token,
    authenticate_user,
    refresh_get_current_user,
)
from app.schemas.auth_schemas import Token, User
from app.util.db_dependency import get_db


router = APIRouter(prefix="/auth", tags=["auth"])


@router.post("/token", summary="Authenticate and get an access token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    response: Response,
    db=Depends(get_db),
):
    """
    Return an access token for the user, if the given authentication details are correct
    """
    user = authenticate_user(db, form_data.username, form_data.password)

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=1)
    access_token = create_access_token(
        data={"sub": user.id, "username": user.username, "refresh": False},
        expires_delta=access_token_expires,
    )
    # Create a refresh token - just an access token with a longer expiry
    # and more restrictions ("refresh" is True)
    refresh_token_expires = timedelta(days=1)
    refresh_token = create_access_token(
        data={"sub": user.id, "username": user.username, "refresh": True},
        expires_delta=refresh_token_expires,
    )
    response = JSONResponse(content={"success": True})
    response.set_cookie(key="access_token", value=access_token, httponly=True)
    response.set_cookie(
        key="refresh_token", value=refresh_token, httponly=True
    )
    return response


# Full native JWT support is not complete in FastAPI yet :(
# Part of that is token refresh, so we must implement it ourselves
@router.post("/refresh")
async def refresh_access_token(
    current_user: Annotated[User, Depends(refresh_get_current_user)],
    response: Response,
) -> Token:
    """
    Return a new access token if the refresh token is valid
    """
    access_token_expires = timedelta(minutes=1)
    access_token = create_access_token(
        data={"sub": current_user.id, "refresh": False},
        expires_delta=access_token_expires,
    )
    response = JSONResponse(content={"success": True})
    response.set_cookie(key="access_token", value=access_token, httponly=True)
    return response