diff options
Diffstat (limited to 'app/routes')
-rw-r--r-- | app/routes/links_route.py | 155 | ||||
-rw-r--r-- | app/routes/refresh_route.py | 33 | ||||
-rw-r--r-- | app/routes/token_route.py | 54 |
3 files changed, 242 insertions, 0 deletions
diff --git a/app/routes/links_route.py b/app/routes/links_route.py new file mode 100644 index 0000000..054508a --- /dev/null +++ b/app/routes/links_route.py @@ -0,0 +1,155 @@ +from fastapi import APIRouter, status, Path, Depends +from fastapi.exception_handlers import HTTPException +from typing import Annotated +import string +import random +import datetime +import validators + +from app.util.db_dependency import get_db +from models import Link, Record +from app.schemas.links_schemas import URLSchema +from app.schemas.auth_schemas import User +from app.util.authentication import get_current_user_from_token + + +router = APIRouter(prefix="/links", tags=["links"]) + + +@router.get("/", summary="Get all of the links associated with your account") +async def get_links( + current_user: Annotated[User, Depends(get_current_user_from_token)], + db=Depends(get_db), +): + links = db.query(Link).filter(Link.owner == current_user.id).all() + if not links: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="No links found" + ) + return links + + +@router.post("/", summary="Create a new link") +async def create_link( + url: URLSchema, + current_user: Annotated[User, Depends(get_current_user_from_token)], + db=Depends(get_db), +): + # Check if the URL is valid + if not validators.url(url.url): + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Invalid URL", + ) + # Create the new link and add it to the database + while True: + try: + link_path = "".join( + random.choices(string.ascii_uppercase + "1234567890", k=5) + ).upper() + new_link = Link( + link=link_path, + owner=current_user.id, + redirect_link=url.url, + expire_date=datetime.datetime.now() + + datetime.timedelta(days=30), + ) + db.add(new_link) + db.commit() + break + except: + continue + + return { + "response": "Link successfully created", + "expire_date": new_link.expire_date, + "link": new_link.link, + } + + +@router.delete("/{link}", summary="Delete a link") +async def delete_link( + link: Annotated[str, Path(title="Link to delete")], + current_user: Annotated[User, Depends(get_current_user_from_token)], + db=Depends(get_db), +): + link = link.upper() + # Get the link and check the owner + link = db.query(Link).filter(Link.link == link).first() + if not link: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Link not found" + ) + if link.owner != current_user.id: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Link not associated with your account", + ) + + # Get and delete all records associated with the link + records = db.query(Record).filter(Record.link == link.link).all() + for record in records: + db.delete(record) + # Delete the link + db.delete(link) + db.commit() + + return {"response": "Link successfully deleted", "link": link.link} + + +@router.get( + "/{link}/records", + summary="Get all of the IP log records associated with a link", +) +async def get_link_records( + link: Annotated[str, Path(title="Link to get records for")], + current_user: Annotated[User, Depends(get_current_user_from_token)], + db=Depends(get_db), +): + link = link.upper() + # Get the link and check the owner + link = db.query(Link).filter(Link.link == link).first() + if not link: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Link not found" + ) + if link.owner != current_user.id: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Link not associated with your account", + ) + + # Get and return all of the records associated with the link + records = db.query(Record).filter(Record.link == link.link).all() + return records + + +@router.delete( + "/{link}/records", + summary="Delete all of the IP log records associated with a link", +) +async def delete_link_records( + link: Annotated[str, Path(title="Link to delete records for")], + current_user: Annotated[User, Depends(get_current_user_from_token)], + db=Depends(get_db), +): + link = link.upper() + # Get the link and check the owner + link = db.query(Link).filter(Link.link == link).first() + if not link: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Link not found" + ) + if link.owner != current_user.id: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Link not associated with your account", + ) + + # Get all of the records associated with the link and delete them + records = db.query(Record).filter(Record.link == link.link).all() + for record in records: + db.delete(record) + db.commit() + + return {"response": "Records successfully deleted", "link": link.link} diff --git a/app/routes/refresh_route.py b/app/routes/refresh_route.py new file mode 100644 index 0000000..6bc8797 --- /dev/null +++ b/app/routes/refresh_route.py @@ -0,0 +1,33 @@ +from fastapi import Depends, APIRouter +from fastapi.responses import RedirectResponse +from datetime import timedelta +from typing import Annotated + +from app.util.authentication import ( + create_access_token, + refresh_get_current_user, +) +from app.schemas.auth_schemas import Token, User + + +router = APIRouter(prefix="/refresh", tags=["refresh"]) + + +# Full native JWT support is not complete in FastAPI yet :( +# Part of that is token refresh, so we must implement it ourselves +@router.post("/") +async def refresh_access_token( + current_user: Annotated[User, Depends(refresh_get_current_user)], +) -> Token: + """ + Return a new access token if the refresh token is valid + """ + access_token_expires = timedelta(minutes=30) + access_token = create_access_token( + data={"sub": current_user.username, "refresh": False}, + expires_delta=access_token_expires, + ) + return Token( + access_token=access_token, + token_type="bearer", + ) diff --git a/app/routes/token_route.py b/app/routes/token_route.py new file mode 100644 index 0000000..8000616 --- /dev/null +++ b/app/routes/token_route.py @@ -0,0 +1,54 @@ +from fastapi import APIRouter, status, Depends, HTTPException +from fastapi.responses import JSONResponse, Response +from typing import Annotated +from datetime import timedelta +from typing import Annotated +from fastapi.security import OAuth2PasswordRequestForm + +from app.util.db_dependency import get_db +from app.util.authentication import ( + authenticate_user, + create_access_token, +) +from app.schemas.auth_schemas import Token + + +router = APIRouter(prefix="/token", tags=["token"]) + + +@router.post("/") +async def login_for_access_token( + form_data: Annotated[OAuth2PasswordRequestForm, Depends()], + response: Response, + db=Depends(get_db), +) -> Token: + """ + Return an access token for the user, if the given authentication details are correct + """ + user = authenticate_user(db, form_data.username, form_data.password) + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + access_token_expires = timedelta(minutes=15) + access_token = create_access_token( + data={"sub": user.username, "refresh": False}, + expires_delta=access_token_expires, + ) + # Create a refresh token - just an access token with a longer expiry + # and more restrictions ("refresh" is True) + refresh_token_expires = timedelta(days=1) + refresh_token = create_access_token( + data={"sub": user.username, "refresh": True}, + expires_delta=refresh_token_expires, + ) + response = JSONResponse(content={"success": True}) + response.set_cookie( + key="access_token", value=access_token, httponly=True, samesite="lax" + ) + response.set_cookie( + key="refresh_token", value=refresh_token, httponly=True, samesite="lax" + ) + return response |