aboutsummaryrefslogtreecommitdiff
path: root/app/routes
diff options
context:
space:
mode:
Diffstat (limited to 'app/routes')
-rw-r--r--app/routes/links_route.py155
-rw-r--r--app/routes/refresh_route.py33
-rw-r--r--app/routes/token_route.py54
3 files changed, 242 insertions, 0 deletions
diff --git a/app/routes/links_route.py b/app/routes/links_route.py
new file mode 100644
index 0000000..054508a
--- /dev/null
+++ b/app/routes/links_route.py
@@ -0,0 +1,155 @@
+from fastapi import APIRouter, status, Path, Depends
+from fastapi.exception_handlers import HTTPException
+from typing import Annotated
+import string
+import random
+import datetime
+import validators
+
+from app.util.db_dependency import get_db
+from models import Link, Record
+from app.schemas.links_schemas import URLSchema
+from app.schemas.auth_schemas import User
+from app.util.authentication import get_current_user_from_token
+
+
+router = APIRouter(prefix="/links", tags=["links"])
+
+
+@router.get("/", summary="Get all of the links associated with your account")
+async def get_links(
+ current_user: Annotated[User, Depends(get_current_user_from_token)],
+ db=Depends(get_db),
+):
+ links = db.query(Link).filter(Link.owner == current_user.id).all()
+ if not links:
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND, detail="No links found"
+ )
+ return links
+
+
+@router.post("/", summary="Create a new link")
+async def create_link(
+ url: URLSchema,
+ current_user: Annotated[User, Depends(get_current_user_from_token)],
+ db=Depends(get_db),
+):
+ # Check if the URL is valid
+ if not validators.url(url.url):
+ raise HTTPException(
+ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+ detail="Invalid URL",
+ )
+ # Create the new link and add it to the database
+ while True:
+ try:
+ link_path = "".join(
+ random.choices(string.ascii_uppercase + "1234567890", k=5)
+ ).upper()
+ new_link = Link(
+ link=link_path,
+ owner=current_user.id,
+ redirect_link=url.url,
+ expire_date=datetime.datetime.now()
+ + datetime.timedelta(days=30),
+ )
+ db.add(new_link)
+ db.commit()
+ break
+ except:
+ continue
+
+ return {
+ "response": "Link successfully created",
+ "expire_date": new_link.expire_date,
+ "link": new_link.link,
+ }
+
+
+@router.delete("/{link}", summary="Delete a link")
+async def delete_link(
+ link: Annotated[str, Path(title="Link to delete")],
+ current_user: Annotated[User, Depends(get_current_user_from_token)],
+ db=Depends(get_db),
+):
+ link = link.upper()
+ # Get the link and check the owner
+ link = db.query(Link).filter(Link.link == link).first()
+ if not link:
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND, detail="Link not found"
+ )
+ if link.owner != current_user.id:
+ raise HTTPException(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ detail="Link not associated with your account",
+ )
+
+ # Get and delete all records associated with the link
+ records = db.query(Record).filter(Record.link == link.link).all()
+ for record in records:
+ db.delete(record)
+ # Delete the link
+ db.delete(link)
+ db.commit()
+
+ return {"response": "Link successfully deleted", "link": link.link}
+
+
+@router.get(
+ "/{link}/records",
+ summary="Get all of the IP log records associated with a link",
+)
+async def get_link_records(
+ link: Annotated[str, Path(title="Link to get records for")],
+ current_user: Annotated[User, Depends(get_current_user_from_token)],
+ db=Depends(get_db),
+):
+ link = link.upper()
+ # Get the link and check the owner
+ link = db.query(Link).filter(Link.link == link).first()
+ if not link:
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND, detail="Link not found"
+ )
+ if link.owner != current_user.id:
+ raise HTTPException(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ detail="Link not associated with your account",
+ )
+
+ # Get and return all of the records associated with the link
+ records = db.query(Record).filter(Record.link == link.link).all()
+ return records
+
+
+@router.delete(
+ "/{link}/records",
+ summary="Delete all of the IP log records associated with a link",
+)
+async def delete_link_records(
+ link: Annotated[str, Path(title="Link to delete records for")],
+ current_user: Annotated[User, Depends(get_current_user_from_token)],
+ db=Depends(get_db),
+):
+ link = link.upper()
+ # Get the link and check the owner
+ link = db.query(Link).filter(Link.link == link).first()
+ if not link:
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND, detail="Link not found"
+ )
+ if link.owner != current_user.id:
+ raise HTTPException(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ detail="Link not associated with your account",
+ )
+
+ # Get all of the records associated with the link and delete them
+ records = db.query(Record).filter(Record.link == link.link).all()
+ for record in records:
+ db.delete(record)
+ db.commit()
+
+ return {"response": "Records successfully deleted", "link": link.link}
diff --git a/app/routes/refresh_route.py b/app/routes/refresh_route.py
new file mode 100644
index 0000000..6bc8797
--- /dev/null
+++ b/app/routes/refresh_route.py
@@ -0,0 +1,33 @@
+from fastapi import Depends, APIRouter
+from fastapi.responses import RedirectResponse
+from datetime import timedelta
+from typing import Annotated
+
+from app.util.authentication import (
+ create_access_token,
+ refresh_get_current_user,
+)
+from app.schemas.auth_schemas import Token, User
+
+
+router = APIRouter(prefix="/refresh", tags=["refresh"])
+
+
+# Full native JWT support is not complete in FastAPI yet :(
+# Part of that is token refresh, so we must implement it ourselves
+@router.post("/")
+async def refresh_access_token(
+ current_user: Annotated[User, Depends(refresh_get_current_user)],
+) -> Token:
+ """
+ Return a new access token if the refresh token is valid
+ """
+ access_token_expires = timedelta(minutes=30)
+ access_token = create_access_token(
+ data={"sub": current_user.username, "refresh": False},
+ expires_delta=access_token_expires,
+ )
+ return Token(
+ access_token=access_token,
+ token_type="bearer",
+ )
diff --git a/app/routes/token_route.py b/app/routes/token_route.py
new file mode 100644
index 0000000..8000616
--- /dev/null
+++ b/app/routes/token_route.py
@@ -0,0 +1,54 @@
+from fastapi import APIRouter, status, Depends, HTTPException
+from fastapi.responses import JSONResponse, Response
+from typing import Annotated
+from datetime import timedelta
+from typing import Annotated
+from fastapi.security import OAuth2PasswordRequestForm
+
+from app.util.db_dependency import get_db
+from app.util.authentication import (
+ authenticate_user,
+ create_access_token,
+)
+from app.schemas.auth_schemas import Token
+
+
+router = APIRouter(prefix="/token", tags=["token"])
+
+
+@router.post("/")
+async def login_for_access_token(
+ form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
+ response: Response,
+ db=Depends(get_db),
+) -> Token:
+ """
+ Return an access token for the user, if the given authentication details are correct
+ """
+ user = authenticate_user(db, form_data.username, form_data.password)
+ if not user:
+ raise HTTPException(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ detail="Incorrect username or password",
+ headers={"WWW-Authenticate": "Bearer"},
+ )
+ access_token_expires = timedelta(minutes=15)
+ access_token = create_access_token(
+ data={"sub": user.username, "refresh": False},
+ expires_delta=access_token_expires,
+ )
+ # Create a refresh token - just an access token with a longer expiry
+ # and more restrictions ("refresh" is True)
+ refresh_token_expires = timedelta(days=1)
+ refresh_token = create_access_token(
+ data={"sub": user.username, "refresh": True},
+ expires_delta=refresh_token_expires,
+ )
+ response = JSONResponse(content={"success": True})
+ response.set_cookie(
+ key="access_token", value=access_token, httponly=True, samesite="lax"
+ )
+ response.set_cookie(
+ key="refresh_token", value=refresh_token, httponly=True, samesite="lax"
+ )
+ return response