aboutsummaryrefslogtreecommitdiff
path: root/app/routes/user_routes.py
blob: 12b282869f9919b082e6333d6a1ab3d77bee200f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import random
import secrets
import string
from typing import Annotated

import bcrypt
from fastapi import APIRouter, status, Path, Depends
from fastapi.exception_handlers import HTTPException
from fastapi.security import OAuth2PasswordRequestForm

from app.util.db_dependency import get_db
from app.util.check_password_reqs import check_password_reqs
from app.schemas.auth_schemas import User
from app.schemas.user_schemas import *
from models import User as UserModel
from app.util.authentication import (
    verify_password,
    get_current_user,
)


router = APIRouter(prefix="/users", tags=["users"])


@router.delete("/{user_id}", summary="Delete your account")
async def delete_user(
    user_id: Annotated[int, Path(title="Link to delete")],
    current_user: Annotated[User, Depends(get_current_user)],
    db=Depends(get_db),
):
    """
    Permanently remove the account belonging to the authenticated caller.

    Responds with 403 when `user_id` is not the caller's own id and with 404
    when no matching user row exists.
    """
    # Callers are only ever allowed to act on their own account
    if user_id != current_user.id:
        raise HTTPException(
            detail="You can only delete your own account",
            status_code=status.HTTP_403_FORBIDDEN,
        )

    # Look the row up by the authenticated id (not the path parameter)
    lookup = db.query(UserModel).filter(UserModel.id == current_user.id)
    account = lookup.first()
    if not account:
        raise HTTPException(
            detail="User not found",
            status_code=status.HTTP_404_NOT_FOUND,
        )

    # Remove the row and persist the change
    db.delete(account)
    db.commit()

    # The integer 204 is handed back as the handler's return value; the
    # route decorator itself does not set a status_code
    return status.HTTP_204_NO_CONTENT


@router.post("/{user_id}/password", summary="Update your account password")
async def update_pass(
    user_id: Annotated[int, Path(title="Link to update")],
    update_data: UpdatePasswordSchema,
    current_user: Annotated[User, Depends(get_current_user)],
    db=Depends(get_db),
):
    """
    Change the password of the authenticated caller's own account.

    Responds with 403 when `user_id` is not the caller's own id, with 401
    when the supplied current password does not verify, and with 404 when no
    matching user row exists.
    """
    # Callers are only ever allowed to act on their own account
    if user_id != current_user.id:
        raise HTTPException(
            detail="You can only update your own account",
            status_code=status.HTTP_403_FORBIDDEN,
        )

    # The caller has to prove they know the existing password first
    password_matches = verify_password(
        update_data.current_password, current_user.hashed_password
    )
    if not password_matches:
        raise HTTPException(
            detail="Incorrect current password",
            status_code=status.HTTP_401_UNAUTHORIZED,
        )

    # Validate the replacement password; the return value is unused, so the
    # helper presumably raises when a requirement is not met
    check_password_reqs(update_data.new_password)

    # Look the row up by the authenticated id (not the path parameter)
    lookup = db.query(UserModel).filter(UserModel.id == current_user.id)
    account = lookup.first()
    if not account:
        raise HTTPException(
            detail="User not found",
            status_code=status.HTTP_404_NOT_FOUND,
        )

    # Store a freshly salted bcrypt hash as text, never the plain password
    new_password_bytes = update_data.new_password.encode("utf-8")
    digest = bcrypt.hashpw(new_password_bytes, bcrypt.gensalt())
    account.hashed_password = digest.decode("utf-8")
    db.commit()

    # The integer 204 is handed back as the handler's return value; the
    # route decorator itself does not set a status_code
    return status.HTTP_204_NO_CONTENT


@router.post("/register", summary="Register a new user")
async def get_links(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    db=Depends(get_db),
):
    """
    Register a new user account from the submitted login form data
    (username, password).

    On success the new user is stored with a bcrypt-hashed password and a
    freshly generated 20-character alphanumeric API key, and the integer 201
    is returned. Responds with 409 when the username is already in use.
    """
    username = form_data.username
    password = form_data.password

    # Make sure the password meets all of the requirements; the return value
    # is unused, so the helper presumably raises when a requirement is not met
    check_password_reqs(password)

    # Make sure the username isn't taken
    existing_user = (
        db.query(UserModel).filter(UserModel.username == username).first()
    )
    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Username not available",
        )

    # Store a freshly salted bcrypt hash as text, never the plain password
    hashed_password = bcrypt.hashpw(
        password.encode("utf-8"), bcrypt.gensalt()
    ).decode("utf-8")

    # The API key is a credential, so draw it from the OS-backed CSPRNG via
    # `secrets` rather than the predictable `random` generator. Format is
    # unchanged: 20 characters from [A-Za-z0-9].
    key_alphabet = string.ascii_letters + string.digits
    api_key = "".join(secrets.choice(key_alphabet) for _ in range(20))

    new_user = UserModel(
        username=username, hashed_password=hashed_password, api_key=api_key
    )
    db.add(new_user)
    db.commit()

    # The integer 201 is handed back as the handler's return value; the
    # route decorator itself does not set a status_code
    return status.HTTP_201_CREATED