diff options
Diffstat (limited to 'api/routes')
-rw-r--r-- | api/routes/auth_routes.py | 46 | ||||
-rw-r--r-- | api/routes/links_routes.py | 172 | ||||
-rw-r--r-- | api/routes/user_routes.py | 129 |
3 files changed, 347 insertions, 0 deletions
diff --git a/api/routes/auth_routes.py b/api/routes/auth_routes.py new file mode 100644 index 0000000..c51557f --- /dev/null +++ b/api/routes/auth_routes.py @@ -0,0 +1,46 @@ +from fastapi import Depends, APIRouter, status, HTTPException +from fastapi.security import OAuth2PasswordRequestForm +from fastapi.responses import Response, JSONResponse +from datetime import timedelta +from typing import Annotated + +from api.util.authentication import ( + create_access_token, + authenticate_user, +) +from api.util.db_dependency import get_db + + +router = APIRouter(prefix="/auth", tags=["auth"]) + + +@router.post("/token", summary="Authenticate and get an access token") +async def login_for_access_token( + form_data: Annotated[OAuth2PasswordRequestForm, Depends()], + response: Response, + db=Depends(get_db), +): + """ + Return an access token for the user, if the given authentication details are correct + """ + user = authenticate_user(db, form_data.username, form_data.password) + + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + access_token_expires = timedelta(minutes=1) + access_token = create_access_token( + data={"sub": user.id, "username": user.username}, + expires_delta=access_token_expires, + ) + response = JSONResponse(content={"success": True}) + response.set_cookie( + key="access_token", + value=access_token, + httponly=True, # Prevents client-side access + # secure=True, # Cookies are only sent over HTTPS + ) + return response diff --git a/api/routes/links_routes.py b/api/routes/links_routes.py new file mode 100644 index 0000000..5ed565b --- /dev/null +++ b/api/routes/links_routes.py @@ -0,0 +1,172 @@ +from fastapi import APIRouter, status, Path, Depends +from fastapi.exception_handlers import HTTPException +from typing import Annotated +import string +import random +import datetime +import validators + +from api.util.db_dependency import get_db +from models import Link, Log +from api.schemas.links_schemas import URLSchema +from api.schemas.auth_schemas import User +from api.util.authentication import get_current_user + + +router = APIRouter(prefix="/links", tags=["links"]) + + +@router.get("", summary="Get all of the links associated with your account") +async def get_links( + current_user: Annotated[User, Depends(get_current_user)], + db=Depends(get_db), +): + # Get and sort links by expiration date descending + links = ( + db.query(Link) + .filter(Link.owner == current_user.id) + .order_by(Link.expire_date.desc()) + .all() + ) + if not links: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="No links found" + ) + return links + + +@router.post("", summary="Create a new link") +async def create_link( + url: URLSchema, + current_user: Annotated[User, Depends(get_current_user)], + db=Depends(get_db), +): + # Check if the URL is valid + if not validators.url(url.url): + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Invalid URL", + ) + # Create the new link and add it to the database + while True: + try: + link_path = "".join( + random.choices(string.ascii_uppercase + "1234567890", k=5) + ).upper() + new_link = Link( + link=link_path, + owner=current_user.id, + redirect_link=url.url, + expire_date=datetime.datetime.utcnow() + + datetime.timedelta(days=30), + ) + db.add(new_link) + db.commit() + break + except: + continue + + return {"link": link_path, "expire_date": new_link.expire_date} + + +@router.delete("/{link}", summary="Delete a link and all associated logs") +async def delete_link( + link: Annotated[str, Path(title="Link to delete")], + current_user: Annotated[User, Depends(get_current_user)], + db=Depends(get_db), +): + """ + Delete a link and all of the logs associated with it + """ + link = link.upper() + # Get the link and check the owner + link = db.query(Link).filter(Link.link == link).first() + if not link: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Link not found" + ) + if link.owner != current_user.id: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Link not associated with your account", + ) + + # Get and delete all logsk + logs = db.query(Log).filter(Log.link == link.link).all() + for log in logs: + db.delete(log) + # Delete the link + db.delete(link) + db.commit() + + return status.HTTP_204_NO_CONTENT + + +@router.get("/{link}/logs", summary="Get all logs associated with a link") +async def get_link_logs( + link: Annotated[str, Path(title="Link to get logs for")], + current_user: Annotated[User, Depends(get_current_user)], + db=Depends(get_db), +): + """ + Get all of the IP logs associated with a link + """ + link = link.upper() + # Get the link and check the owner + link = db.query(Link).filter(Link.link == link).first() + if not link: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Link not found" + ) + if link.owner != current_user.id: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Link not associated with your account", + ) + + # Get and return all of the logs - ordered by timestamp + logs = ( + db.query(Log) + .filter(Log.link == link.link) + .order_by(Log.timestamp.desc()) + .all() + ) + return logs + + +@router.delete( + "/{link}/logs/{log_id}", + summary="Delete a specific log associated with a link", +) +async def delete_single_log( + link: Annotated[str, Path(title="Link associated with the log to delete")], + log_id: Annotated[int, Path(title="Log ID to delete")], + current_user: Annotated[User, Depends(get_current_user)], + db=Depends(get_db), +): + """ + Delete the specified log associated with a link + """ + link = link.upper() + # Get the link and check the owner + link = db.query(Link).filter(Link.link == link).first() + if not link: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Link not found" + ) + if link.owner != current_user.id: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Link not associated with your account", + ) + + # Get the log and delete it + log = db.query(Log).filter(Log.id == log_id).first() + if not log: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Log not found" + ) + db.delete(log) + db.commit() + + return status.HTTP_204_NO_CONTENT diff --git a/api/routes/user_routes.py b/api/routes/user_routes.py new file mode 100644 index 0000000..cf9be52 --- /dev/null +++ b/api/routes/user_routes.py @@ -0,0 +1,129 @@ +from fastapi import APIRouter, status, Path, Depends +from fastapi.exception_handlers import HTTPException +from fastapi.security import OAuth2PasswordRequestForm +from typing import Annotated +import string +import bcrypt +import random + +from api.util.db_dependency import get_db +from api.util.check_password_reqs import check_password_reqs +from api.schemas.auth_schemas import User +from api.schemas.user_schemas import * +from models import User as UserModel +from api.util.authentication import ( + verify_password, + get_current_user, +) + + +router = APIRouter(prefix="/users", tags=["users"]) + + +@router.delete("/{user_id}", summary="Delete your account") +async def delete_user( + user_id: Annotated[int, Path(title="Link to delete")], + current_user: Annotated[User, Depends(get_current_user)], + db=Depends(get_db), +): + """ + Delete the user account associated with the current user + """ + # No editing others accounts + if user_id != current_user.id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You can only delete your own account", + ) + + # Get the user and delete them + user = db.query(UserModel).filter(UserModel.id == current_user.id).first() + if not user: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found", + ) + + db.delete(user) + db.commit() + return status.HTTP_204_NO_CONTENT + + +@router.post("/{user_id}/password", summary="Update your account password") +async def update_pass( + user_id: Annotated[int, Path(title="Link to update")], + update_data: UpdatePasswordSchema, + current_user: Annotated[User, Depends(get_current_user)], + db=Depends(get_db), +): + """ + Update the pass of the current user account + """ + if user_id != current_user.id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You can only update your own account", + ) + + # Make sure that they entered the correct current password + if not verify_password( + update_data.current_password, current_user.hashed_password + ): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect current password", + ) + + # Make sure the password meets all of the requirements + check_password_reqs(update_data.new_password) + + # Get the user and update the password + user = db.query(UserModel).filter(UserModel.id == current_user.id).first() + if not user: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found", + ) + user.hashed_password = bcrypt.hashpw( + update_data.new_password.encode("utf-8"), bcrypt.gensalt() + ).decode("utf-8") + db.commit() + return status.HTTP_204_NO_CONTENT + + +@router.post("/register", summary="Register a new user") +async def get_links( + form_data: Annotated[OAuth2PasswordRequestForm, Depends()], + db=Depends(get_db), +): + """ + Given the login data (username, password) process the registration of a new + user account and return either the user or an error message + """ + username = form_data.username + password = form_data.password + + # Make sure the password meets all of the requirements + check_password_reqs(password) + + # Make sure the username isn't taken + user = db.query(UserModel).filter(UserModel.username == username).first() + if user: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Username not available", + ) + # Otherwise, hash the password, create the api key, and add the new user + hashed_password = bcrypt.hashpw( + password.encode("utf-8"), bcrypt.gensalt() + ).decode("utf-8") + api_key = "".join( + random.choices(string.ascii_letters + string.digits, k=20) + ) + new_user = UserModel( + username=username, hashed_password=hashed_password, api_key=api_key + ) + db.add(new_user) + db.commit() + + return status.HTTP_201_CREATED |