From 5b92454760a8af14bd1031e72024946f868d1de6 Mon Sep 17 00:00:00 2001 From: Parker Date: Mon, 24 Jun 2024 16:24:09 -0500 Subject: Major overhaul + Bare bones web UI --- api/main.py | 59 ++++++++++++++ api/routes/links_route.py | 143 +++++++++++++++++++++++++++++++++ api/schemas/links_schemas.py | 5 ++ api/util/check_api_key.py | 21 +++++ api/util/db_dependency.py | 9 +++ api/util/validate_login_information.py | 20 +++++ 6 files changed, 257 insertions(+) create mode 100644 api/main.py create mode 100644 api/routes/links_route.py create mode 100644 api/schemas/links_schemas.py create mode 100644 api/util/check_api_key.py create mode 100644 api/util/db_dependency.py create mode 100644 api/util/validate_login_information.py (limited to 'api') diff --git a/api/main.py b/api/main.py new file mode 100644 index 0000000..c869bc0 --- /dev/null +++ b/api/main.py @@ -0,0 +1,59 @@ +from fastapi import FastAPI, Depends, HTTPException, Security +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import RedirectResponse +import string +import random + +from api.routes.links_route import router as links_router +from api.util.db_dependency import get_db +from api.util.check_api_key import check_api_key +from models import User + + +metadata_tags = [ + {"name": "links", "description": "Operations for managing links"}, +] + +app = FastAPI( + title="LinkLogger API", + version="1.0", + summary="Public API for a combined link shortener and IP logger", + license_info={ + "name": "The Unlicense", + "identifier": "Unlicense", + "url": "https://unlicense.org", + }, + openapi_tags=metadata_tags, +) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["*"], + allow_headers=["*"], + allow_credentials=True, +) + +# Import routes +app.include_router(links_router) + +# Regenerate the API key for the user +@app.post("/regenerate") +async def login(api_key: str = Security(check_api_key), db = Depends(get_db)): + print(api_key['value']) + + user = db.query(User).filter(User.api_key == api_key['value']).first() + if not user: + raise HTTPException(status_code=401, detail="Invalid API key") + + # Generate a new API key + new_api_key = ''.join(random.choices(string.ascii_letters + string.digits, k=20)) + user.api_key = new_api_key + db.commit() + + return {"status": "success", "new_api_key": new_api_key} + +# Redirect /api -> /api/docs +@app.get("/") +async def redirect_to_docs(): + return RedirectResponse(url="/api/docs") \ No newline at end of file diff --git a/api/routes/links_route.py b/api/routes/links_route.py new file mode 100644 index 0000000..fd0a77a --- /dev/null +++ b/api/routes/links_route.py @@ -0,0 +1,143 @@ +from fastapi import APIRouter, status, Path, Depends, Security, Request +from fastapi.exception_handlers import HTTPException +from typing import Annotated +import string +import random +import datetime +import validators + +from api.util.db_dependency import get_db +from api.util.check_api_key import check_api_key +from models import Link, Record +from api.schemas.links_schemas import URLSchema + + +router = APIRouter(prefix="/links", tags=["links"]) + + +@router.get("/", summary="Get all of the links associated with your account") +async def get_links( + db=Depends(get_db), + api_key: str = Security(check_api_key), +): + links = db.query(Link).filter(Link.owner == api_key["owner"]).all() + if not links: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="No links found" + ) + return links + + +@router.post("/", summary="Create a new link") +async def create_link( + url: URLSchema, + db=Depends(get_db), + api_key: str = Security(check_api_key), +): + # Check if the URL is valid + if not validators.url(url.url): + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid URL" + ) + # Create the new link and add it to the database + while True: + try: + link_path = "".join( + random.choices(string.ascii_uppercase + "1234567890", k=5) + ) + new_link = Link( + link=link_path, + owner=api_key["owner"], + redirect_link=url.url, + expire_date=datetime.datetime.now() + datetime.timedelta(days=30), + ) + db.add(new_link) + db.commit() + break + except: + continue + + return { + "response": "Link successfully created", + "expire_date": new_link.expire_date, + "link": new_link.link, + } + + +@router.delete("/{link}", summary="Delete a link") +async def delete_link( + link: Annotated[str, Path(title="Link to delete")], + db=Depends(get_db), + api_key: str = Security(check_api_key), +): + # Get the link and check the owner + link = db.query(Link).filter(Link.link == link).first() + if not link: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Link not found" + ) + if link.owner != api_key["owner"]: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Link not associated with your account", + ) + + # Get and delete all records associated with the link + records = db.query(Record).filter(Record.link == link.link).all() + for record in records: + db.delete(record) + # Delete the link + db.delete(link) + db.commit() + + return {"response": "Link successfully deleted", "link": link.link} + + +@router.get("/{link}/records", summary="Get all of the IP log records associated with a link") +async def get_link_records( + link: Annotated[str, Path(title="Link to get records for")], + db=Depends(get_db), + api_key: str = Security(check_api_key), +): + # Get the link and check the owner + link = db.query(Link).filter(Link.link == link).first() + if not link: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Link not found" + ) + if link.owner != api_key["owner"]: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Link not associated with your account", + ) + + # Get and return all of the records associated with the link + records = db.query(Record).filter(Record.link == link.link).all() + return records + + +@router.delete("/{link}/records", summary="Delete all of the IP log records associated with a link") +async def delete_link_records( + link: Annotated[str, Path(title="Link to delete records for")], + db=Depends(get_db), + api_key: str = Security(check_api_key), +): + # Get the link and check the owner + link = db.query(Link).filter(Link.link == link).first() + if not link: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Link not found" + ) + if link.owner != api_key["owner"]: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Link not associated with your account", + ) + + # Get all of the records associated with the link and delete them + records = db.query(Record).filter(Record.link == link.link).all() + for record in records: + db.delete(record) + db.commit() + + return {"response": "Records successfully deleted", "link": link.link} diff --git a/api/schemas/links_schemas.py b/api/schemas/links_schemas.py new file mode 100644 index 0000000..e2812fb --- /dev/null +++ b/api/schemas/links_schemas.py @@ -0,0 +1,5 @@ +from pydantic import BaseModel + + +class URLSchema(BaseModel): + url: str diff --git a/api/util/check_api_key.py b/api/util/check_api_key.py new file mode 100644 index 0000000..9c4c22e --- /dev/null +++ b/api/util/check_api_key.py @@ -0,0 +1,21 @@ +from fastapi import Security, HTTPException, Depends, status +from fastapi.security import APIKeyHeader + +from models import User +from api.util.db_dependency import get_db + +""" +Make sure the provided API key is valid, then return the user's ID +""" +api_key_header = APIKeyHeader(name="X-API-Key") + + +def check_api_key( + api_key_header: str = Security(api_key_header), db=Depends(get_db) +) -> str: + response = db.query(User).filter(User.api_key == api_key_header).first() + if not response: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key" + ) + return {"value": api_key_header, "owner": response.id} diff --git a/api/util/db_dependency.py b/api/util/db_dependency.py new file mode 100644 index 0000000..a6734ea --- /dev/null +++ b/api/util/db_dependency.py @@ -0,0 +1,9 @@ +from database import SessionLocal + + +def get_db(): + db = SessionLocal() + try: + yield db + finally: + db.close() diff --git a/api/util/validate_login_information.py b/api/util/validate_login_information.py new file mode 100644 index 0000000..55bbb2e --- /dev/null +++ b/api/util/validate_login_information.py @@ -0,0 +1,20 @@ +import bcrypt +from fastapi import Depends + +from api.util.db_dependency import get_db +from models import User + +""" +Validate the login information provided by the user +""" + + +def validate_login_information( + username: str, password: str, db=Depends(get_db) +) -> bool: + user = db.query(User).filter(User.username == username).first() + if not user: + return False + if bcrypt.checkpw(password.encode("utf-8"), user.password.encode("utf-8")): + return True + return False -- cgit v1.2.3-70-g09d2