diff options
author | Parker <contact@pkrm.dev> | 2024-11-10 16:36:16 -0600 |
---|---|---|
committer | Parker <contact@pkrm.dev> | 2024-11-10 16:36:16 -0600 |
commit | 691aa744a0398f185b3ca98a36fbd83806c7786c (patch) | |
tree | 7840f31c30bb6eda903abd6bbf4dbfb2ac590966 /api/util | |
parent | 8941213c8d94f3ad84e07e467e78105dc7fed734 (diff) |
TOO MUCH STUFF
Diffstat (limited to 'api/util')
-rw-r--r-- | api/util/authentication.py | 97 | ||||
-rw-r--r-- | api/util/check_password_reqs.py | 26 | ||||
-rw-r--r-- | api/util/db_dependency.py | 9 | ||||
-rw-r--r-- | api/util/log.py | 84 |
4 files changed, 216 insertions, 0 deletions
diff --git a/api/util/authentication.py b/api/util/authentication.py new file mode 100644 index 0000000..43b147a --- /dev/null +++ b/api/util/authentication.py @@ -0,0 +1,97 @@ +import random +import bcrypt +from fastapi import Depends, HTTPException, status, Request, Cookie +from fastapi.security import OAuth2PasswordBearer +from fastapi.responses import Response +from jwt.exceptions import InvalidTokenError, ExpiredSignatureError +from datetime import datetime, timedelta +import jwt + +from api.util.db_dependency import get_db +from sqlalchemy.orm import Session +from api.schemas.auth_schemas import * +from models import User as UserModel + +secret_key = random.randbytes(32) +algorithm = "HS256" +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/auth/token") + +""" +Helper functions for authentication +""" + + +def verify_password(plain_password, hashed_password): + return bcrypt.checkpw( + plain_password.encode("utf-8"), hashed_password.encode("utf-8") + ) + + +def get_user(db, username: str): + """ + Get the user object from the database + """ + user = db.query(UserModel).filter(UserModel.username == username).first() + if user: + return UserInDB(**user.__dict__) + + +def authenticate_user(db, username: str, password: str): + """ + Determine if the correct username and password were provided + If so, return the user object + """ + user = get_user(db, username) + if not user: + return False + if not verify_password(password, user.hashed_password): + print("WHY") + return False + return user + + +def create_access_token(data: dict, expires_delta: timedelta): + """ + Return an encoded JWT token with the given data + """ + to_encode = data.copy() + expire = datetime.utcnow() + expires_delta + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=algorithm) + return encoded_jwt + + +def raise_unauthorized(): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + + +async def get_current_user( + request: Request, + db=Depends(get_db), +): + """ + Return the current user object if the access token is valid + + All failed attempts will return a 401 + """ + token = request.cookies.get("access_token") + + try: + payload = jwt.decode(token, secret_key, algorithms=[algorithm]) + id: int = payload.get("sub") + username: str = payload.get("username") + if not id or not username: + return raise_unauthorized() + + except InvalidTokenError: + return raise_unauthorized() + + user = get_user(db, username) + if user is None: + return raise_unauthorized() + + return user diff --git a/api/util/check_password_reqs.py b/api/util/check_password_reqs.py new file mode 100644 index 0000000..dcb9bf8 --- /dev/null +++ b/api/util/check_password_reqs.py @@ -0,0 +1,26 @@ +from fastapi import HTTPException, status + + +def check_password_reqs(password: str): + """ + Make sure the entered password meets the security requirements: + 1. At least 8 characters + 2. At least one digit + 3. At least one uppercase letter + """ + if len(password) < 8: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Password must be at least 8 characters", + ) + if not any(char.isdigit() for char in password): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Password must contain at least one digit", + ) + if not any(char.isupper() for char in password): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Password must contain at least one uppercase letter", + ) + return diff --git a/api/util/db_dependency.py b/api/util/db_dependency.py new file mode 100644 index 0000000..a6734ea --- /dev/null +++ b/api/util/db_dependency.py @@ -0,0 +1,9 @@ +from database import SessionLocal + + +def get_db(): + db = SessionLocal() + try: + yield db + finally: + db.close() diff --git a/api/util/log.py b/api/util/log.py new file mode 100644 index 0000000..58a56f9 --- /dev/null +++ b/api/util/log.py @@ -0,0 +1,84 @@ +import requests +import datetime +from ua_parser import user_agent_parser + +from database import SessionLocal +import config +from models import Link, Log + +""" +Create a new log whenever a link is visited +""" + + +def ip_to_location(ip): + if not config.IP_TO_LOCATION: + return "-, -", "-" + + url = f"https://api.ip2location.io/?key={config.API_KEY}&ip={ip}" + response = requests.get(url) + data = response.json() + + if response.status_code != 200: + config.LOG.error( + "Error with IP2Location API. Perhaps the API is down." + ) + return "-, -", "-" + + if "error" in data: + config.LOG.error( + "Error with IP2Location API. Likely wrong API key or insufficient" + " funds." + ) + return "-, -", "-" + + location = "" + # Sometimes a certain name may not be present, so always check + if "city_name" in data: + location += data["city_name"] + + if "region_name" in data: + location += f', {data["region_name"]}' + + if "country_name" in data: + location += f', {data["country_name"]}' + + isp = data["as"] + return location, isp + + +def log(link, ip, user_agent): + db = SessionLocal() + + # Get the redirect link and owner of the link + redirect_link, owner = ( + db.query(Link.redirect_link, Link.owner) + .filter(Link.link == link) + .first() + ) + + # Get the location and ISP of the user + location, isp = ip_to_location(ip) + + ua_string = user_agent_parser.Parse(user_agent) + browser = ua_string["user_agent"]["family"] + os = f'{ua_string["os"]["family"]} {ua_string["os"]["major"]}' + + # Create the log and commit it to the database + new_log = Log( + owner=owner, + link=link, + timestamp=datetime.datetime.utcnow(), + ip=ip, + location=location, + browser=browser, + os=os, + user_agent=user_agent, + isp=isp, + ) + db.add(new_log) + db.commit() + db.close() + + # Return the redirect link in order to properly redirect the user + return redirect_link |