diff options
Diffstat (limited to 'api')
-rw-r--r-- | api/main.py | 112 | ||||
-rw-r--r-- | api/routes/links_route.py | 155 | ||||
-rw-r--r-- | api/schemas/auth_schemas.py | 20 | ||||
-rw-r--r-- | api/schemas/links_schemas.py | 5 | ||||
-rw-r--r-- | api/util/authentication.py | 111 | ||||
-rw-r--r-- | api/util/db_dependency.py | 9 |
6 files changed, 0 insertions, 412 deletions
diff --git a/api/main.py b/api/main.py deleted file mode 100644 index fbe8805..0000000 --- a/api/main.py +++ /dev/null @@ -1,112 +0,0 @@ -import random -from fastapi import FastAPI, Depends, HTTPException, status -from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import RedirectResponse -from datetime import timedelta -from typing import Annotated -from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer - -from api.util.authentication import ( - authenticate_user, - create_access_token, - refresh_get_current_user, -) -from api.routes.links_route import router as links_router -from api.util.db_dependency import get_db -from api.schemas.auth_schemas import Token, User - - -metadata_tags = [ - {"name": "links", "description": "Operations for managing links"}, -] - -app = FastAPI( - title="LinkLogger API", - version="1.0", - summary="Public API for a combined link shortener and IP logger", - license_info={ - "name": "The Unlicense", - "identifier": "Unlicense", - "url": "https://unlicense.org", - }, - openapi_tags=metadata_tags, -) - -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_methods=["*"], - allow_headers=["*"], - allow_credentials=True, -) - -secret_key = random.randbytes(32) -algorithm = "HS256" -oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") - -# Import routes -app.include_router(links_router) - - -""" -Authentication -""" - - -@app.post("/token") -async def login_for_access_token( - form_data: Annotated[OAuth2PasswordRequestForm, Depends()], - db=Depends(get_db), -) -> Token: - """ - Return an access token for the user, if the given authentication details are correct - """ - user = authenticate_user(db, form_data.username, form_data.password) - if not user: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Incorrect username or password", - headers={"WWW-Authenticate": "Bearer"}, - ) - access_token_expires = timedelta(minutes=15) - access_token = create_access_token( - data={"sub": user.username, "refresh": False}, - expires_delta=access_token_expires, - ) - # Create a refresh token - just an access token with a longer expiry - # and more restrictions ("refresh" is True) - refresh_token_expires = timedelta(days=1) - refresh_token = create_access_token( - data={"sub": user.username, "refresh": True}, - expires_delta=refresh_token_expires, - ) - return Token( - access_token=access_token, - refresh_token=refresh_token, - token_type="bearer", - ) - - -# Full native JWT support is not complete in FastAPI yet :( -# Part of that is token refresh, so we must implement it ourselves -@app.post("/refresh") -async def refresh_access_token( - current_user: Annotated[User, Depends(refresh_get_current_user)], -) -> Token: - """ - Return a new access token if the refresh token is valid - """ - access_token_expires = timedelta(minutes=30) - access_token = create_access_token( - data={"sub": current_user.username}, expires_delta=access_token_expires - ) - return Token( - access_token=access_token, - token_type="bearer", - ) - - -# Redirect /api -> /api/docs -@app.get("/") -async def redirect_to_docs(): - return RedirectResponse(url="/api/docs") diff --git a/api/routes/links_route.py b/api/routes/links_route.py deleted file mode 100644 index 08e7690..0000000 --- a/api/routes/links_route.py +++ /dev/null @@ -1,155 +0,0 @@ -from fastapi import APIRouter, status, Path, Depends, Security, Request -from fastapi.exception_handlers import HTTPException -from typing import Annotated -import string -import random -import datetime -import validators - -from api.util.db_dependency import get_db -from models import Link, Record -from api.schemas.links_schemas import URLSchema -from api.schemas.auth_schemas import User -from api.util.authentication import get_current_user - - -router = APIRouter(prefix="/links", tags=["links"]) - - -@router.get("/", summary="Get all of the links associated with your account") -async def get_links( - current_user: Annotated[User, Depends(get_current_user)], - db=Depends(get_db), -): - links = db.query(Link).filter(Link.owner == current_user.id).all() - if not links: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="No links found" - ) - return links - - -@router.post("/", summary="Create a new link") -async def create_link( - url: URLSchema, - current_user: Annotated[User, Depends(get_current_user)], - db=Depends(get_db), -): - # Check if the URL is valid - if not validators.url(url.url): - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Invalid URL", - ) - # Create the new link and add it to the database - while True: - try: - link_path = "".join( - random.choices(string.ascii_uppercase + "1234567890", k=5) - ).upper() - new_link = Link( - link=link_path, - owner=current_user.id, - redirect_link=url.url, - expire_date=datetime.datetime.now() - + datetime.timedelta(days=30), - ) - db.add(new_link) - db.commit() - break - except: - continue - - return { - "response": "Link successfully created", - "expire_date": new_link.expire_date, - "link": new_link.link, - } - - -@router.delete("/{link}", summary="Delete a link") -async def delete_link( - link: Annotated[str, Path(title="Link to delete")], - current_user: Annotated[User, Depends(get_current_user)], - db=Depends(get_db), -): - link = link.upper() - # Get the link and check the owner - link = db.query(Link).filter(Link.link == link).first() - if not link: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Link not found" - ) - if link.owner != current_user.id: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Link not associated with your account", - ) - - # Get and delete all records associated with the link - records = db.query(Record).filter(Record.link == link.link).all() - for record in records: - db.delete(record) - # Delete the link - db.delete(link) - db.commit() - - return {"response": "Link successfully deleted", "link": link.link} - - -@router.get( - "/{link}/records", - summary="Get all of the IP log records associated with a link", -) -async def get_link_records( - link: Annotated[str, Path(title="Link to get records for")], - current_user: Annotated[User, Depends(get_current_user)], - db=Depends(get_db), -): - link = link.upper() - # Get the link and check the owner - link = db.query(Link).filter(Link.link == link).first() - if not link: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Link not found" - ) - if link.owner != current_user.id: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Link not associated with your account", - ) - - # Get and return all of the records associated with the link - records = db.query(Record).filter(Record.link == link.link).all() - return records - - -@router.delete( - "/{link}/records", - summary="Delete all of the IP log records associated with a link", -) -async def delete_link_records( - link: Annotated[str, Path(title="Link to delete records for")], - current_user: Annotated[User, Depends(get_current_user)], - db=Depends(get_db), -): - link = link.upper() - # Get the link and check the owner - link = db.query(Link).filter(Link.link == link).first() - if not link: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Link not found" - ) - if link.owner != current_user.id: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Link not associated with your account", - ) - - # Get all of the records associated with the link and delete them - records = db.query(Record).filter(Record.link == link.link).all() - for record in records: - db.delete(record) - db.commit() - - return {"response": "Records successfully deleted", "link": link.link} diff --git a/api/schemas/auth_schemas.py b/api/schemas/auth_schemas.py deleted file mode 100644 index 006a7c8..0000000 --- a/api/schemas/auth_schemas.py +++ /dev/null @@ -1,20 +0,0 @@ -from pydantic import BaseModel - - -class Token(BaseModel): - access_token: str - refresh_token: str | None = None - token_type: str - - -class TokenData(BaseModel): - username: str | None = None - - -class User(BaseModel): - username: str - id: int - - -class UserInDB(User): - hashed_password: str diff --git a/api/schemas/links_schemas.py b/api/schemas/links_schemas.py deleted file mode 100644 index e2812fb..0000000 --- a/api/schemas/links_schemas.py +++ /dev/null @@ -1,5 +0,0 @@ -from pydantic import BaseModel - - -class URLSchema(BaseModel): - url: str diff --git a/api/util/authentication.py b/api/util/authentication.py deleted file mode 100644 index b8ac6a6..0000000 --- a/api/util/authentication.py +++ /dev/null @@ -1,111 +0,0 @@ -import random -import bcrypt -from fastapi import Depends, HTTPException, status -from fastapi.security import OAuth2PasswordBearer -from jwt.exceptions import InvalidTokenError -from datetime import datetime, timedelta -from typing import Annotated, Optional -import jwt - -from api.util.db_dependency import get_db -from api.schemas.auth_schemas import * -from models import User as UserDB - -secret_key = random.randbytes(32) -algorithm = "HS256" -oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") - -""" -Helper functions for authentication -""" - - -def verify_password(plain_password, hashed_password): - return bcrypt.checkpw( - plain_password.encode("utf-8"), hashed_password.encode("utf-8") - ) - - -def get_user(db, username: str): - """ - Get the user object from the database - """ - user = db.query(UserDB).filter(UserDB.username == username).first() - if user: - return UserInDB(**user.__dict__) - - -def authenticate_user(db, username: str, password: str): - """ - Determine if the correct username and password were provided - If so, return the user object - """ - user = get_user(db, username) - if not user: - return False - if not verify_password(password, user.hashed_password): - return False - return user - - -def create_access_token(data: dict, expires_delta: timedelta): - """ - Return an encoded JWT token with the given data - """ - to_encode = data.copy() - expire = datetime.utcnow() + expires_delta - to_encode.update({"exp": expire}) - encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=algorithm) - return encoded_jwt - - -# Backwards kinda of way to get refresh token support -# 'refresh_get_current_user' is only called from /refresh -# and alerts 'current_user' that it should expect a refresh token -async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]): - user = await current_user(token) - return user - - -async def refresh_get_current_user( - token: Annotated[str, Depends(oauth2_scheme)], -): - user = await current_user(token, is_refresh=True) - return user - - -async def current_user( - token: str, - is_refresh: bool = False, - db=Depends(get_db), -): - """ - Return the current user based on the token, or raise a 401 error - """ - credentials_exception = HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Could not validate credentials", - headers={"WWW-Authenticate": "Bearer"}, - ) - try: - payload = jwt.decode(token, secret_key, algorithms=[algorithm]) - username: str = payload.get("sub") - refresh: bool = payload.get("refresh") - if username is None: - raise credentials_exception - # For some reason, an access token was passed when a refresh - # token was expected - some likely malicious activity - if not refresh and is_refresh: - raise credentials_exception - # If the token passed is a refresh token and the function - # is not expecting a refresh token, raise an error - if refresh and not is_refresh: - raise credentials_exception - - token_data = TokenData(username=username) - except InvalidTokenError: - raise credentials_exception - user = get_user(db, username=token_data.username) - if user is None: - raise credentials_exception - return user diff --git a/api/util/db_dependency.py b/api/util/db_dependency.py deleted file mode 100644 index a6734ea..0000000 --- a/api/util/db_dependency.py +++ /dev/null @@ -1,9 +0,0 @@ -from database import SessionLocal - - -def get_db(): - db = SessionLocal() - try: - yield db - finally: - db.close() |