131 lines
4.5 KiB
Python
131 lines
4.5 KiB
Python
from fastapi import APIRouter, status, Path, Depends
|
|
from fastapi.exception_handlers import HTTPException
|
|
from typing import Annotated
|
|
import string
|
|
import bcrypt
|
|
import random
|
|
import datetime
|
|
import validators
|
|
|
|
from app.util.db_dependency import get_db
|
|
from app.schemas.auth_schemas import User
|
|
from app.schemas.user_schemas import *
|
|
from models import User as UserModel
|
|
from app.util.authentication import get_current_user_from_token
|
|
|
|
|
|
router = APIRouter(prefix="/user", tags=["user"])
|
|
|
|
# In order to help protect some anonymity/privacy, user routes
|
|
# do not use path parameters, as then people could potentially
|
|
# see if a specific username exists or not. Instead, the user
|
|
# routes will use query parameters to specify the user to act
|
|
|
|
|
|
@router.post("/register", summary="Register a new user")
|
|
async def get_links(
|
|
login_data: LoginDataSchema,
|
|
db=Depends(get_db),
|
|
):
|
|
"""
|
|
Given the login data (username, password) process the registration of a new
|
|
user account and return either the user or an error message
|
|
"""
|
|
username = login_data.username
|
|
password = login_data.password
|
|
# Make sure the password meets all of the requirements
|
|
# if len(password) < 8:
|
|
# raise HTTPException(
|
|
# status_code=status.HTTP_400_BAD_REQUEST,
|
|
# detail="Password must be at least 8 characters",
|
|
# )
|
|
# if not any(char.isdigit() for char in password):
|
|
# raise HTTPException(
|
|
# status_code=status.HTTP_400_BAD_REQUEST,
|
|
# detail="Password must contain at least one digit",
|
|
# )
|
|
# if not any(char.isupper() for char in password):
|
|
# raise HTTPException(
|
|
# status_code=status.HTTP_400_BAD_REQUEST,
|
|
# detail="Password must contain at least one uppercase letter",
|
|
# )
|
|
# Make sure the username isn't taken
|
|
user = db.query(UserModel).filter(UserModel.username == username).first()
|
|
if user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_409_CONFLICT,
|
|
detail="Username not available",
|
|
)
|
|
# Otherwise, hash the password, create the api key, and add the new user
|
|
hashed_password = bcrypt.hashpw(
|
|
password.encode("utf-8"), bcrypt.gensalt()
|
|
).decode("utf-8")
|
|
api_key = "".join(
|
|
random.choices(string.ascii_letters + string.digits, k=20)
|
|
)
|
|
new_user = UserModel(
|
|
username=username, hashed_password=hashed_password, api_key=api_key
|
|
)
|
|
db.add(new_user)
|
|
db.commit()
|
|
|
|
return status.HTTP_201_CREATED
|
|
|
|
|
|
@router.get("/delete", summary="Delete a user - provided it's your own")
|
|
async def delete_user(
|
|
current_user: Annotated[User, Depends(get_current_user_from_token)],
|
|
db=Depends(get_db),
|
|
):
|
|
"""
|
|
Delete the user account associated with the current user
|
|
"""
|
|
user = db.query(UserModel).filter(UserModel.id == current_user.id).first()
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found",
|
|
)
|
|
db.delete(user)
|
|
db.commit()
|
|
return status.HTTP_204_NO_CONTENT
|
|
|
|
|
|
@router.put("/updatepass", summary="Update your account's password")
|
|
async def update_pass(
|
|
current_user: Annotated[User, Depends(get_current_user_from_token)],
|
|
update_data: UpdatePasswordSchema,
|
|
db=Depends(get_db),
|
|
):
|
|
"""
|
|
Update the pass of the current user account
|
|
"""
|
|
# Make sure the password meets all of the requirements
|
|
# if len(update_data.new_password) < 8:
|
|
# raise HTTPException(
|
|
# status_code=status.HTTP_400_BAD_REQUEST,
|
|
# detail="Password must be at least 8 characters",
|
|
# )
|
|
# if not any(char.isdigit() for char in update_data.new_password):
|
|
# raise HTTPException(
|
|
# status_code=status.HTTP_400_BAD_REQUEST,
|
|
# detail="Password must contain at least one digit",
|
|
# )
|
|
# if not any(char.isupper() for char in update_data.new_password):
|
|
# raise HTTPException(
|
|
# status_code=status.HTTP_400_BAD_REQUEST,
|
|
# detail="Password must contain at least one uppercase letter",
|
|
# )
|
|
# Get the user and update the password
|
|
user = db.query(UserModel).filter(UserModel.id == current_user.id).first()
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found",
|
|
)
|
|
user.hashed_password = bcrypt.hashpw(
|
|
update_data.new_password.encode("utf-8"), bcrypt.gensalt()
|
|
).decode("utf-8")
|
|
db.commit()
|
|
return status.HTTP_204_NO_CONTENT
|