Major overhaul + Bare bones web UI

This commit is contained in:
Parker M. 2024-06-24 16:24:09 -05:00
parent 80a39d38bf
commit 5b92454760
No known key found for this signature in database
GPG Key ID: 95CD2E0C7E329F2A
34 changed files with 1097 additions and 541 deletions

View File

@ -1,4 +0,0 @@
BASE_URL="" # Redirect link for when people visit old/dead/non-existant link
IP_TO_LOCATION="" # "True" or "False" - whether or not you need the IP to location feature,
# If not needed, you can leave the API_KEY blank
API_KEY="" # API Key for IP2Location.io

View File

@ -10,4 +10,4 @@ COPY . .
RUN pip install -r requirements.txt
ENTRYPOINT [ "python" ]
CMD [ "-u", "app/linklogger.py" ]
CMD [ "-u", "linklogger.py" ]

View File

@ -1,38 +1,56 @@
<h1 align="center">
LinkLogger
</h1>
# LinkLogger API
<h3 align="center">
Link shortener and IP logger
</h3>
A simple API for you to create redirect links on my domain (link.pkrm.dev) and log all IPs that click on the link. Essentially just grabify with no GUI.
<p align="center">
<a href="https://github.com/psf/black">
<img src="https://img.shields.io/badge/code%20style-black-000000.svg" alt="Code Style: Black">
</a>
<a href="https://makeapullrequest.com">
<img src="https://img.shields.io/badge/PRs-welcome-brightgreen.svg">
</a>
</p>
Feel free to submit an issue for any problems you experience or if you have an idea for a new feature. If you have a fix for anything, please submit a pull request for review.
# Overview
### Create an account now at [link.pkrm.dev](https://link.pkrm.dev/signup)
## Want to self-host?
<br>
#### Bare metal
Feel free to fork this code and run it yourself, simply install the dependencies, create your `.env` file and run the `linklogger.py` file.
LinkLogger is simple and public API to create redirect links and log IPs. Every visit to a registered short link will log the users IP address, location, user agent, browser, and OS before redirecting them to a specific URL.
Just like Grabify, but unrestricted and with no real web UI.
Feel free to submit an issue for any problems you experience or if you have an idea for a new feature. If you have a fix for anything, feel free to submit a pull request for review.
# API Reference
View the API reference and try out the endpoints at the [docs page](https://link.pkrm.dev/api/docs)
# Want to self-host?
## Bare metal
To run LinkLogger on bare metal, follow the steps below.
*NOTE: For information on each configuration variable, look at the `Configuration` section of this page.
1. Install Python and Pip
2. Clone this repository
3. Install the requirements with `pip install -r requirements.txt`
4. Run the `linklogger.py` file
5. Input information into the newly created `config.ini` file.
6. Re-run the `linklogger.py` file.
## Docker
To run LinkLogger in Docker, use the [docker-compose.yaml](/docker-compose.yaml) as a template for the contianer.
## Config
Below are all of the configuration variables that are used within LinkLogger.
#### Docker
Use the docker-compose below as an example of running LinkLogger in docker.
```yaml
version: '3.3'
services:
linklogger:
container_name: linklogger
image: ghcr.io/packetparker/linklogger:latest
network_mode: host
environment:
- BASE_URL=https://your.domain
- IP_TO_LOCATION=True
- API_KEY=Your Key
volumes:
- /local/file/path:/data
restart: unless-stopped
```
Variable | Description | Requirement
---|---|---
BASE_URL | Redirect link for when people visit old/dead/non-existant link | **Required**
IP_TO_LOCATION | "True"/"False" Whether or not you want to IP to Location feature (requires IP2Location.io account) | **Required**
API_KEY | IP2Location.io API Key | **Required** *unless IP_TO_LOCATION is "False"*
## API Reference
#### View the API reference and try out the endpoints at the [docs page](https://link.pkrm.dev/docs)
BASE_URL | `URL`: Redirect URL for when people visit old, dead, or non-existant links | **Required**
IP_TO_LOCATION | `BOOLEAN`: Whether or not you want toe IP to Location feature <br> *(requires IP2Location.io account)* | **Required**
API_KEY | `KEY`: IP2Location.io API Key | **Required** *only if IP_TO_LOCATION is set to True*

59
api/main.py Normal file
View File

@ -0,0 +1,59 @@
from fastapi import FastAPI, Depends, HTTPException, Security
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
import string
import random
from api.routes.links_route import router as links_router
from api.util.db_dependency import get_db
from api.util.check_api_key import check_api_key
from models import User
metadata_tags = [
{"name": "links", "description": "Operations for managing links"},
]
app = FastAPI(
title="LinkLogger API",
version="1.0",
summary="Public API for a combined link shortener and IP logger",
license_info={
"name": "The Unlicense",
"identifier": "Unlicense",
"url": "https://unlicense.org",
},
openapi_tags=metadata_tags,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
allow_credentials=True,
)
# Import routes
app.include_router(links_router)
# Regenerate the API key for the user
@app.post("/regenerate")
async def login(api_key: str = Security(check_api_key), db = Depends(get_db)):
print(api_key['value'])
user = db.query(User).filter(User.api_key == api_key['value']).first()
if not user:
raise HTTPException(status_code=401, detail="Invalid API key")
# Generate a new API key
new_api_key = ''.join(random.choices(string.ascii_letters + string.digits, k=20))
user.api_key = new_api_key
db.commit()
return {"status": "success", "new_api_key": new_api_key}
# Redirect /api -> /api/docs
@app.get("/")
async def redirect_to_docs():
return RedirectResponse(url="/api/docs")

143
api/routes/links_route.py Normal file
View File

@ -0,0 +1,143 @@
from fastapi import APIRouter, status, Path, Depends, Security, Request
from fastapi.exception_handlers import HTTPException
from typing import Annotated
import string
import random
import datetime
import validators
from api.util.db_dependency import get_db
from api.util.check_api_key import check_api_key
from models import Link, Record
from api.schemas.links_schemas import URLSchema
router = APIRouter(prefix="/links", tags=["links"])
@router.get("/", summary="Get all of the links associated with your account")
async def get_links(
db=Depends(get_db),
api_key: str = Security(check_api_key),
):
links = db.query(Link).filter(Link.owner == api_key["owner"]).all()
if not links:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="No links found"
)
return links
@router.post("/", summary="Create a new link")
async def create_link(
url: URLSchema,
db=Depends(get_db),
api_key: str = Security(check_api_key),
):
# Check if the URL is valid
if not validators.url(url.url):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid URL"
)
# Create the new link and add it to the database
while True:
try:
link_path = "".join(
random.choices(string.ascii_uppercase + "1234567890", k=5)
)
new_link = Link(
link=link_path,
owner=api_key["owner"],
redirect_link=url.url,
expire_date=datetime.datetime.now() + datetime.timedelta(days=30),
)
db.add(new_link)
db.commit()
break
except:
continue
return {
"response": "Link successfully created",
"expire_date": new_link.expire_date,
"link": new_link.link,
}
@router.delete("/{link}", summary="Delete a link")
async def delete_link(
link: Annotated[str, Path(title="Link to delete")],
db=Depends(get_db),
api_key: str = Security(check_api_key),
):
# Get the link and check the owner
link = db.query(Link).filter(Link.link == link).first()
if not link:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Link not found"
)
if link.owner != api_key["owner"]:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Link not associated with your account",
)
# Get and delete all records associated with the link
records = db.query(Record).filter(Record.link == link.link).all()
for record in records:
db.delete(record)
# Delete the link
db.delete(link)
db.commit()
return {"response": "Link successfully deleted", "link": link.link}
@router.get("/{link}/records", summary="Get all of the IP log records associated with a link")
async def get_link_records(
link: Annotated[str, Path(title="Link to get records for")],
db=Depends(get_db),
api_key: str = Security(check_api_key),
):
# Get the link and check the owner
link = db.query(Link).filter(Link.link == link).first()
if not link:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Link not found"
)
if link.owner != api_key["owner"]:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Link not associated with your account",
)
# Get and return all of the records associated with the link
records = db.query(Record).filter(Record.link == link.link).all()
return records
@router.delete("/{link}/records", summary="Delete all of the IP log records associated with a link")
async def delete_link_records(
link: Annotated[str, Path(title="Link to delete records for")],
db=Depends(get_db),
api_key: str = Security(check_api_key),
):
# Get the link and check the owner
link = db.query(Link).filter(Link.link == link).first()
if not link:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Link not found"
)
if link.owner != api_key["owner"]:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Link not associated with your account",
)
# Get all of the records associated with the link and delete them
records = db.query(Record).filter(Record.link == link.link).all()
for record in records:
db.delete(record)
db.commit()
return {"response": "Records successfully deleted", "link": link.link}

View File

@ -0,0 +1,5 @@
from pydantic import BaseModel
class URLSchema(BaseModel):
url: str

21
api/util/check_api_key.py Normal file
View File

@ -0,0 +1,21 @@
from fastapi import Security, HTTPException, Depends, status
from fastapi.security import APIKeyHeader
from models import User
from api.util.db_dependency import get_db
"""
Make sure the provided API key is valid, then return the user's ID
"""
api_key_header = APIKeyHeader(name="X-API-Key")
def check_api_key(
api_key_header: str = Security(api_key_header), db=Depends(get_db)
) -> str:
response = db.query(User).filter(User.api_key == api_key_header).first()
if not response:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key"
)
return {"value": api_key_header, "owner": response.id}

View File

@ -0,0 +1,9 @@
from database import SessionLocal
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

View File

@ -0,0 +1,20 @@
import bcrypt
from fastapi import Depends
from api.util.db_dependency import get_db
from models import User
"""
Validate the login information provided by the user
"""
def validate_login_information(
username: str, password: str, db=Depends(get_db)
) -> bool:
user = db.query(User).filter(User.username == username).first()
if not user:
return False
if bcrypt.checkpw(password.encode("utf-8"), user.password.encode("utf-8")):
return True
return False

View File

@ -1,22 +0,0 @@
import fastapi
from fastapi import Security, HTTPException
from fastapi.security import APIKeyHeader
import sqlalchemy
from db import engine
"""
Make sure the provided API key is valid
"""
api_key_header = APIKeyHeader(name="X-API-Key")
def check_api_key(api_key_header: str = Security(api_key_header)) -> str:
with engine.begin() as conn:
response = conn.execute(sqlalchemy.text("SELECT api_key FROM keys WHERE api_key = :api_key"), {'api_key': api_key_header}).fetchone()
if response:
return response[0]
else:
raise HTTPException(
status_code=fastapi.status.HTTP_401_UNAUTHORIZED,
detail="Invalid or missing API key"
)

View File

@ -1,37 +0,0 @@
import sqlalchemy
import os
try:
os.mkdir('data')
except FileExistsError:
pass
engine = sqlalchemy.create_engine('sqlite:///data/data.db')
def init_db():
with engine.begin() as conn:
conn.execute(sqlalchemy.text(
'''
CREATE TABLE IF NOT EXISTS keys (
api_key, PRIMARY KEY (api_key)
)
'''
))
conn.execute(sqlalchemy.text(
'''
CREATE TABLE IF NOT EXISTS links (
owner, link, redirect_link, expire_date,
FOREIGN KEY (owner) REFERENCES keys(api_key), PRIMARY KEY (link)
)
'''
))
conn.execute(sqlalchemy.text(
'''
CREATE TABLE IF NOT EXISTS records (
owner, link, timestamp, ip, location, browser, os, user_agent, isp,
FOREIGN KEY (owner) REFERENCES links(owner),
FOREIGN KEY (link) REFERENCES links(link))
'''
))
conn.commit()

View File

@ -1,23 +0,0 @@
import sqlalchemy
from sqlalchemy import exc
import random
import string
from db import engine
"""
Generate and return a randomized API key string for the user
Keys are composed of 20 uppercase ASCII characters
"""
def generate_api_key():
with engine.begin() as conn:
while True:
try:
api_key_string = ''.join(random.choices(string.ascii_uppercase, k=20))
conn.execute(sqlalchemy.text('INSERT INTO keys(api_key) VALUES(:api_key)'), [{'api_key': api_key_string}])
conn.commit()
break
except exc.IntegrityError:
continue
return api_key_string

View File

@ -1,20 +0,0 @@
import sqlalchemy
from db import engine
"""
Delete the specified link from the users associated links
"""
def delete_link(link, owner):
with engine.begin() as conn:
try:
link_owner = conn.execute(sqlalchemy.text('SELECT owner FROM links WHERE link = :link'), [{'link': link}]).fetchone()[0]
except TypeError:
return 404
if owner == link_owner:
with engine.begin() as conn:
conn.execute(sqlalchemy.text('DELETE FROM links WHERE link = :link'), [{'link': link}])
return link
else:
return 401

View File

@ -1,20 +0,0 @@
import sqlalchemy
from db import engine
"""
Delete all of the IP log records that are associated with a specific link
"""
def delete_link_records(link, owner):
with engine.begin() as conn:
try:
link_owner = conn.execute(sqlalchemy.text('SELECT owner FROM links WHERE link = :link'), [{'link': link}]).fetchone()[0]
except TypeError:
return 404
if owner == link_owner:
with engine.begin() as conn:
conn.execute(sqlalchemy.text('DELETE FROM records WHERE link = :link'), [{'link': link}])
return link
else:
return 401

View File

@ -1,23 +0,0 @@
import sqlalchemy
from db import engine
"""
Retrieve all records associated with a specific link
"""
def get_link_records(link, owner):
with engine.begin() as conn:
try:
link_owner = conn.execute(sqlalchemy.text('SELECT owner FROM links WHERE link = :link'), [{'link': link}]).fetchone()[0]
except TypeError:
return 404
if owner == link_owner:
with engine.begin() as conn:
records = conn.execute(sqlalchemy.text('SELECT timestamp, ip, location, browser, os, user_agent, isp FROM records WHERE owner = :owner and link = :link'), [{'owner': owner, 'link': link}]).fetchall()
if not records:
return 204
else:
return 401
return records

View File

@ -1,23 +0,0 @@
import sqlalchemy
import datetime
from db import engine
"""
Renew a specified link so that the user can continue logging through that URL
Adds 7 days from the current date
"""
def renew_link(link, owner):
with engine.begin() as conn:
try:
link_owner = conn.execute(sqlalchemy.text('SELECT owner FROM links WHERE link = :link'), [{'link': link}]).fetchone()[0]
except TypeError:
return 404
if owner == link_owner:
with engine.begin() as conn:
expire_date = datetime.datetime.date(datetime.datetime.now()) + datetime.timedelta(days=7)
conn.execute(sqlalchemy.text('UPDATE links SET expire_date = :expire_date WHERE link = :link'), [{'expire_date': expire_date, 'link': link}])
return link, expire_date
else:
return 401

View File

@ -1,72 +0,0 @@
import ip2locationio
import sqlalchemy
import datetime
import validators
from ua_parser import user_agent_parser
from dotenv import load_dotenv
import os
from ip2locationio.ipgeolocation import IP2LocationIOAPIError
from db import engine
load_dotenv()
try:
ip_to_location = os.getenv('IP_TO_LOCATION').upper().replace('"', '')
if ip_to_location == 'TRUE':
api_key = os.getenv('API_KEY').replace('"', '')
else:
api_key = "NO_API_KEY"
base_url = os.getenv('BASE_URL').replace('"', '')
# .env File does not exist - likely a docker run
except AttributeError:
ip_to_location = str(os.environ['IP_TO_LOCATION']).upper().replace('"', '')
if ip_to_location == 'TRUE':
api_key = str(os.environ('API_KEY')).replace('"', '')
else:
api_key = "NO_API_KEY"
base_url = str(os.environ('BASE_URL')).replace('"', '')
if not validators.url(base_url):
print(base_url)
print('BASE_URL varaible is malformed.')
exit()
configuration = ip2locationio.Configuration(api_key)
ipgeolocation = ip2locationio.IPGeolocation(configuration)
"""
Create a new log record whenever a link is visited
"""
def log(link, ip, user_agent):
with engine.begin() as conn:
try:
redirect_link, owner = conn.execute(sqlalchemy.text('SELECT redirect_link, owner FROM links WHERE link = :link'), [{'link': link}]).fetchone()
except TypeError:
return base_url
with engine.begin() as conn:
if ip_to_location == 'TRUE':
# Get IP to GEO via IP2Location.io
try:
data = ipgeolocation.lookup(ip)
location = f'{data["country_name"]}, {data["city_name"]}'
isp = data['as']
# Fatal error, API key is invalid or out of requests, quit
except IP2LocationIOAPIError:
print('Invalid API key or insifficient credit. Change .env file if you do not need IP to location feature.')
location = '-, -'
isp = '-'
else:
location = '-, -'
isp = '-'
timestamp = datetime.datetime.now()
ua_string = user_agent_parser.Parse(user_agent)
browser = ua_string['user_agent']['family']
os = f'{ua_string["os"]["family"]} {ua_string["os"]["major"]}'
conn.execute(sqlalchemy.text('INSERT INTO records (owner, link, timestamp, ip, location, browser, os, user_agent, isp) VALUES (:owner, :link, :timestamp, :ip, :location, :browser, :os, :user_agent, :isp)'), [{'owner': owner, 'link': link, 'timestamp': timestamp, 'ip': ip, 'location': location, 'browser': browser, 'os': os, 'user_agent': user_agent, 'isp': isp}])
return redirect_link

View File

@ -1,30 +0,0 @@
import validators
import random
import string
import datetime
import sqlalchemy
from sqlalchemy import exc
from db import engine
"""
Generate and return a new randomized link that is connected to the user
Links are composed of 5 uppercase ASCII characters + numbers
"""
def generate_link(redirect_link, owner):
if not validators.url(redirect_link):
return 422
with engine.begin() as conn:
choices = string.ascii_uppercase + '1234567890'
while True:
try:
link = ''.join(random.choices(choices, k=5))
expire_date = datetime.datetime.date(datetime.datetime.now()) + datetime.timedelta(days=7)
conn.execute(sqlalchemy.text('INSERT INTO links(owner, link, redirect_link, expire_date) VALUES (:owner, :link, :redirect_link, :expire_date)'), [{'owner': owner, 'link': link, 'redirect_link': redirect_link, 'expire_date': expire_date}])
conn.commit()
break
except exc.IntegrityError:
continue
return link, expire_date

View File

@ -1,24 +0,0 @@
import sqlalchemy
import datetime
from db import engine
"""
Remove all links and associated records when the expire date has passed
"""
def remove_old_data():
with engine.begin() as conn:
today = datetime.datetime.date(datetime.datetime.now())
old_links = conn.execute(sqlalchemy.text('SELECT link FROM links WHERE expire_date < :today'), [{'today': today}])
delete_links = []
for row in old_links:
link = row.link
delete_links.append({'link': link})
if delete_links:
with engine.begin() as conn:
conn.execute(sqlalchemy.text('DELETE FROM links WHERE link = :link'), delete_links)
conn.execute(sqlalchemy.text('DELETE FROM records WHERE link = :link'), delete_links)
conn.commit()

View File

@ -1,9 +0,0 @@
from db import init_db
import uvicorn
from routes import app
if __name__ == '__main__':
init_db()
server = uvicorn.run(app=app, host="0.0.0.0", port=5252)

162
app/main.py Normal file
View File

@ -0,0 +1,162 @@
from flask_login import (
current_user,
login_user,
login_required,
logout_user,
LoginManager,
UserMixin,
)
from flask import Flask, redirect, render_template, request, url_for, Request
import bcrypt
import os
import string
import random
from models import User
from database import *
from app.util.log import log
class FlaskUser(UserMixin):
pass
app = Flask(__name__)
app.config["SECRET_KEY"] = os.urandom(24)
login_manager = LoginManager()
login_manager.init_app(app)
@login_manager.user_loader
def user_loader(username):
user = FlaskUser()
user.id = username
return user
"""
Handle login requests from the web UI
"""
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
username = request.form["username"]
password = request.form["password"]
# Get database session
db = SessionLocal()
user = db.query(User).filter(User.username == username).first()
db.close()
if not user:
return {"status": "Invalid username or password"}
if bcrypt.checkpw(password.encode("utf-8"), user.password.encode("utf-8")):
flask_user = FlaskUser()
flask_user.id = username
login_user(flask_user)
return {"status": "success"}
return {"status": "Invalid username or password"}
return render_template("login.html")
"""
Handle signup requests from the web UI
"""
@app.route("/signup", methods=["GET", "POST"])
def signup():
if request.method == "POST":
username = request.form["username"]
password = request.form["password"]
# Get database session
db = SessionLocal()
user = db.query(User).filter(User.username == username).first()
if user:
db.close()
return {"status": "User already exists"}
# Add information to the database
hashed_password = bcrypt.hashpw(
password.encode("utf-8"), bcrypt.gensalt()
).decode("utf-8")
api_key = "".join(random.choices(string.ascii_letters + string.digits, k=20))
new_user = User(username=username, password=hashed_password, api_key=api_key)
db.add(new_user)
db.commit()
db.close()
# Log in the newly created user
flask_user = FlaskUser()
flask_user.id = username
login_user(flask_user)
return {"status": "success"}
return render_template("signup.html")
"""
Load the 'dashboard' page for logged in users
"""
@app.route("/dashboard", methods=["GET"])
@login_required
def dashboard():
# Get database session
db = SessionLocal()
# Get the API key for the current user
user = db.query(User).filter(User.username == current_user.id).first()
db.close()
api_key = user.api_key
return render_template("dashboard.html", api_key=api_key)
"""
Log users out of their account
"""
@app.route("/logout", methods=["GET"])
@login_required
def logout():
logout_user()
return redirect(url_for("login"))
"""
Log all records for visits to shortened links
"""
@app.route("/<link>", methods=["GET"])
def log_redirect(link):
# If the `link` is more than 5 characters, ignore
if len(link) > 5:
return
# If the `link` is one of the registered routes, ignore
if link in ["login", "signup", "dashboard", "logout", "api"]:
return
ip = request.remote_addr
user_agent = request.headers.get("user-agent")
redirect_link = log(link, ip, user_agent)
return redirect(redirect_link)
@app.errorhandler(401)
def unauthorized(e):
return redirect(url_for("login"))
@app.errorhandler(404)
def not_found(e):
return redirect(url_for("login"))

View File

@ -1,201 +0,0 @@
import fastapi
from fastapi import Security, HTTPException, Request
from starlette.responses import RedirectResponse
import pydantic
import sqlalchemy
from db import engine
from check_api_key import check_api_key
from func.generate_api_key import generate_api_key
from func.newlink import generate_link
from func.log import log
from func.link.delete import delete_link
from func.link.renew import renew_link
from func.link.records import get_link_records
from func.link.delrecords import delete_link_records
from func.remove_old_data import remove_old_data
from apscheduler.schedulers.background import BackgroundScheduler
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: fastapi.FastAPI):
# Create the scheduler
scheduler = BackgroundScheduler()
scheduler.add_job(remove_old_data, "cron", hour="0", minute="01")
scheduler.start()
yield
class Newlink(pydantic.BaseModel):
redirect_link: str
app = fastapi.FastAPI(lifespan=lifespan)
@app.post("/api/getapikey")
async def get_api_key():
"""
Create a new API key
"""
api_key = generate_api_key()
return {"api_key": api_key}
@app.post("/api/genlink")
async def newlink(newlink: Newlink, api_key: str = Security(check_api_key)):
"""
Generate a new link that will redirect to the specified URL and log IPs in the middle
"""
data = generate_link(newlink.redirect_link, api_key)
if data == 422:
raise HTTPException(
status_code=fastapi.status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Malformed redirect link provided"
)
return {"link": data[0], "expire_date": data[1]}
"""
Return all records associated with an API key, no matter the link
"""
@app.get("/api/records")
async def records(api_key: str = Security(check_api_key)):
"""
Get ALL IP logs records for every link tied to your API key
"""
with engine.begin() as conn:
records = conn.execute(sqlalchemy.text("SELECT timestamp, ip, location, browser, os, user_agent, isp FROM records WHERE owner = :owner"), [{"owner": api_key}]).fetchall()
if not records:
return {"records": "No records are associated with this API key"}
response = []
for timestamp, ip, location, browser, os, user_agent, isp in records:
response.append({"timestamp": timestamp, "ip": ip, "location": location, "browser": browser, "os": os, "user_agent": user_agent, "isp": isp})
return response
@app.get("/{link}")
def link(link, request: Request):
ip = request.client.host
user_agent = request.headers.get("user-agent")
redirect_link = log(link, ip, user_agent)
return fastapi.responses.RedirectResponse(url=redirect_link)
"""
Return all links associated with an API key
"""
@app.get("/api/links")
async def links(api_key: str = Security(check_api_key)):
"""
Retrieve all links that are currently tied to your API key
"""
with engine.begin() as conn:
links = conn.execute(sqlalchemy.text("SELECT link, expire_date FROM links WHERE owner = :owner"), [{"owner": api_key}]).fetchall()
if not links:
return {"links": "No links are associated with this API key"}
response = []
for link, expire_date in links:
response.append({"link": link, "expire_date": expire_date})
return response
@app.post("/api/{link}/delete")
async def link_delete(link: str, api_key: str = Security(check_api_key)):
"""
Delete the specified link and all records associated with it
"""
data = delete_link(link, api_key)
if data == 404:
raise HTTPException(
status_code=fastapi.status.HTTP_404_NOT_FOUND,
detail="Link does not exist"
)
if data == 401:
raise HTTPException(
status_code=fastapi.status.HTTP_401_UNAUTHORIZED,
detail="Link not associated with given API key"
)
else:
return {"link": f"The link {data} has been deleted"}
@app.post("/api/{link}/renew")
async def link_renew(link: str, api_key: str = Security(check_api_key)):
"""
Renew a specifiec link (adds 7 more days from the current date)
"""
data = renew_link(link, api_key)
if data == 404:
raise HTTPException(
status_code=fastapi.status.HTTP_404_NOT_FOUND,
detail="Link does not exist"
)
if data == 401:
raise HTTPException(
status_code=fastapi.status.HTTP_401_UNAUTHORIZED,
detail="Link not associated with given API key"
)
else:
return {"link": f"The link {data[0]} has been renewed and will expire on {data[1]}"}
@app.get("/api/{link}/records")
async def link_records(link: str, api_key: str = Security(check_api_key)):
"""
Retrieve all IP log records for the specified link
"""
data = get_link_records(link, api_key)
if data == 404:
raise HTTPException(
status_code=fastapi.status.HTTP_404_NOT_FOUND,
detail="Link does not exist"
)
if data == 401:
raise HTTPException(
status_code=fastapi.status.HTTP_401_UNAUTHORIZED,
detail="Link not associated with given API key"
)
if data == 204:
raise HTTPException(
status_code=fastapi.status.HTTP_204_NO_CONTENT,
detail="No records found"
)
else:
response = []
for timestamp, ip, location, browser, os, user_agent, isp in data:
response.append({"timestamp": timestamp, "ip": ip, "location": location, "browser": browser, "os": os, "user_agent": user_agent, "isp": isp})
return response
@app.post("/api/{link}/delrecords")
async def link_delrecords(link: str, api_key: str = Security(check_api_key)):
"""
Delete all IP log records for the specified link
"""
data = delete_link_records(link, api_key)
if data == 404:
raise HTTPException(
status_code=fastapi.status.HTTP_404_NOT_FOUND,
detail="Link does not exist"
)
if data == 401:
raise HTTPException(
status_code=fastapi.status.HTTP_401_UNAUTHORIZED,
detail="Link not associated with given API key"
)
else:
return {"link": f"The records for link {data} have been deleted"}
# Redirect / -> /docs
@app.get("/", summary="Redirect to the Swagger UI documentation")
async def redirect_to_docs():
return RedirectResponse(url="/docs")

View File

@ -0,0 +1,47 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>LinkLogger | Login</title>
</head>
<body>
<div>
<!-- Create a small box that will hold the text for the users api key, next to the box should be a regenerate button -->
<p>Your API Key: <span id="api-key">{{ api_key }}</span></p>
<button onclick="window.location.href='logout'">Logout</button>
</div>
</body>
</html>
<style>
body {
margin: 0;
padding: 0;
font-family: Arial, sans-serif;
background-color: #2c3338;
}
div {
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
text-align: center;
font-size: 25px;
color: #ccc;
}
button {
display: block;
margin: 10px auto;
width: 200px;
border-radius: 5px;
padding: 15px;
color: #ccc;
background-color: #415eac;
border: none;
font-size: 17px;
cursor: pointer;
}
</style>

111
app/templates/login.html Normal file
View File

@ -0,0 +1,111 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>LinkLogger | Login</title>
</head>
<body>
<div>
<p id="error">Incorrect username/password. Please try again.</p>
<form action="/login" method="POST">
<input type="text" name="username" placeholder="Username" required>
<input type="password" name="password" placeholder="Password" required>
<button type="submit">Login</button>
</form>
<hr>
<p>Don't have an account? <a href="/signup">Create one now</a></p>
</div>
</body>
</html>
<style>
body {
margin: 0;
padding: 0;
font-family: Arial, sans-serif;
background-color: #2c3338;
}
div {
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
text-align: center;
}
input {
display: block;
margin: 10px auto;
width: 300px;
border-radius: 5px;
padding: 15px;
color: #ccc;
background-color: #3b4148;
border: none;
font-size: 17px;
}
button {
display: block;
margin: 10px auto;
width: 100%;
border-radius: 5px;
padding: 15px;
color: #ccc;
background-color: #415eac;
border: none;
font-size: 17px;
cursor: pointer;
}
hr {
color: #606468;
}
p {
color: #606468;
}
#error {
font-size: 15px;
color: #f55757;
display: none;
}
a {
color: #ccc;
text-decoration: none;
}
a:hover {
text-decoration: underline;
}
</style>
<script>
document.querySelector('form').addEventListener('submit', async function(event) {
// Prevent default form submission
event.preventDefault();
// Get form data
const formData = new FormData(this);
console.log(formData)
// Send POST request to /login containing form data
const response = await fetch('/login', {
method: 'POST',
body: formData
});
data = await response.json()
if (data.status != "success") {
document.getElementById('error').style.display = 'block';
} else {
window.location.href = '/dashboard';
}
});
</script>

111
app/templates/signup.html Normal file
View File

@ -0,0 +1,111 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>LinkLogger | Signup</title>
</head>
<body>
<div>
<p id="error">User already exists. Please try again</p>
<form action="/signup" method="POST">
<input type="text" name="username" placeholder="Username" required>
<input type="password" name="password" placeholder="Password" required>
<button type="submit">Signup</button>
</form>
<hr>
<p>Already have an account? <a href="/login">Log in now</a></p>
</div>
</body>
</html>
<style>
body {
margin: 0;
padding: 0;
font-family: Arial, sans-serif;
background-color: #2c3338;
}
div {
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
text-align: center;
}
input {
display: block;
margin: 10px auto;
width: 300px;
border-radius: 5px;
padding: 15px;
color: #ccc;
background-color: #3b4148;
border: none;
font-size: 17px;
}
button {
display: block;
margin: 10px auto;
width: 100%;
border-radius: 5px;
padding: 15px;
color: #ccc;
background-color: #415eac;
border: none;
font-size: 17px;
cursor: pointer;
}
hr {
color: #606468;
}
p {
color: #606468;
}
#error {
font-size: 15px;
color: #f55757;
display: none;
}
a {
color: #ccc;
text-decoration: none;
}
a:hover {
text-decoration: underline;
}
</style>
<script>
document.querySelector('form').addEventListener('submit', async function(event) {
// Prevent default form submission
event.preventDefault();
// Get form data
const formData = new FormData(this);
console.log(formData)
// Send POST request to /signup containing form data
const response = await fetch('/signup', {
method: 'POST',
body: formData
});
data = await response.json()
if (data.status != "success") {
document.getElementById('error').style.display = 'block';
} else {
window.location.href = '/dashboard';
}
});
</script>

65
app/util/log.py Normal file
View File

@ -0,0 +1,65 @@
import ip2locationio
import datetime
from ua_parser import user_agent_parser
from ip2locationio.ipgeolocation import IP2LocationIOAPIError
from database import SessionLocal
from var import LOG, API_KEY, IP_TO_LOCATION
from models import Link, Record
configuration = ip2locationio.Configuration(API_KEY)
ipgeolocation = ip2locationio.IPGeolocation(configuration)
"""
Create a new log record whenever a link is visited
"""
def log(link, ip, user_agent):
db = SessionLocal()
# Get the redirect link and owner of the link
redirect_link, owner = (
db.query(Link.redirect_link, Link.owner).filter(Link.link == link).first()
)
if IP_TO_LOCATION == "TRUE":
# Get IP to GEO via IP2Location.io
try:
data = ipgeolocation.lookup(ip)
location = f'{data["country_name"]}, {data["city_name"]}'
isp = data["as"]
# Fatal error, API key is invalid or out of requests, quit
except IP2LocationIOAPIError:
LOG.error(
"Invalid API key or insufficient credits. Change the `config.ini` file if you no longer need the IP2Location API feature."
)
location = "-, -"
isp = "-"
else:
location = "-, -"
isp = "-"
timestamp = datetime.datetime.now()
ua_string = user_agent_parser.Parse(user_agent)
browser = ua_string["user_agent"]["family"]
os = f'{ua_string["os"]["family"]} {ua_string["os"]["major"]}'
# Create the log record and commit it to the database
link_record = Record(
owner=owner,
link=link,
timestamp=timestamp,
ip=ip,
location=location,
browser=browser,
os=os,
user_agent=user_agent,
isp=isp,
)
db.add(link_record)
db.commit()
db.close()
# Return the redirect link in order to properly redirect the user
return redirect_link

5
config.ini Normal file
View File

@ -0,0 +1,5 @@
[CONFIG]
base_url = https://pkrm.dev
ip_to_location = true
api_key = F3EB511E9B9B990AB6389CB0F98FF2A8

13
database.py Normal file
View File

@ -0,0 +1,13 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import os
# Create 'data' directory at root if it doesn't exist
if not os.path.exists("data"):
os.makedirs("data")
engine = create_engine("sqlite:///data/data.db")
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

12
docker-compose.yaml Normal file
View File

@ -0,0 +1,12 @@
services:
linklogger:
container_name: linklogger
image: ghcr.io/packetparker/linklogger:latest
network_mode: host
environment:
- BASE_URL=https://your.domain
- IP_TO_LOCATION=True
- API_KEY=awd
volumes:
- /path/on/system:/data
restart: on-failure

21
linklogger.py Normal file
View File

@ -0,0 +1,21 @@
from werkzeug.middleware.dispatcher import DispatcherMiddleware
from a2wsgi import ASGIMiddleware
from validate_config import validate_config
from app.main import app as flask_app
from api.main import app as fastapi_app
from database import Base, engine
Base.metadata.create_all(bind=engine)
flask_app.wsgi_app = DispatcherMiddleware(
flask_app.wsgi_app,
{
"/": flask_app,
"/api": ASGIMiddleware(fastapi_app),
},
)
if __name__ == "__main__":
validate_config()
flask_app.run(port=5252)

40
models.py Normal file
View File

@ -0,0 +1,40 @@
from sqlalchemy import (
Column,
ForeignKey,
Integer,
String,
Text,
DateTime,
)
from database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String, unique=True, nullable=False)
password = Column(Text, nullable=False)
api_key = Column(String(20), unique=True, nullable=False)
class Link(Base):
__tablename__ = "links"
link = Column(String, primary_key=True)
owner = Column(Integer, ForeignKey("users.id"), nullable=False)
redirect_link = Column(String, nullable=False)
expire_date = Column(DateTime, nullable=False)
class Record(Base):
__tablename__ = "records"
id = Column(Integer, primary_key=True)
owner = Column(Integer, ForeignKey("users.id"), nullable=False)
link = Column(String, ForeignKey("links.link"), nullable=False)
timestamp = Column(DateTime, nullable=False)
ip = Column(String, nullable=False)
location = Column(String, nullable=False)
browser = Column(String, nullable=False)
os = Column(String, nullable=False)
user_agent = Column(String, nullable=False)
isp = Column(String, nullable=False)

View File

@ -1,9 +1,13 @@
pydantic==2.6.2
ip2location-io==1.0.0
python-dotenv==1.0.0
SQLAlchemy==2.0.27
ua-parser==0.18.0
validators==0.22.0
uvicorn==0.27.1
fastapi==0.110.0
APScheduler==3.10.4
APScheduler==3.10.4
Flask==3.0.3
Flask-Login==0.6.3
a2wsgi==1.10.4
colorlog==6.8.2
bcrypt==4.1.3

139
validate_config.py Normal file
View File

@ -0,0 +1,139 @@
import configparser
import validators
import os
import sys
from var import LOG
"""
Validate the config of a Docker run (environment variables)
"""
def validate_docker_config():
errors = 0
# Validate BASE_URL
try:
if not os.environ["BASE_URL"]:
LOG.error("BASE_URL is not set")
errors += 1
elif not validators.url(os.environ["BASE_URL"]):
LOG.error("BASE_URL is not a valid URL")
errors += 1
except KeyError:
LOG.critical("BASE_URL does not exist!")
errors += 1
# Validate IP_TO_LOCATION
try:
if not os.environ["IP_TO_LOCATION"]:
LOG.error("IP_TO_LOCATION is not set")
errors += 1
elif os.environ["IP_TO_LOCATION"].upper() not in ["TRUE", "FALSE", "T", "F"]:
LOG.error("IP_TO_LOCATION is not set to TRUE or FALSE")
errors += 1
else:
iptolocation = (
True if os.environ["IP_TO_LOCATION"].upper() in ["TRUE", "T"] else False
)
# Validate API_KEY if IP_TO_LOCATION is set to TRUE
if iptolocation:
try:
if not os.environ["API_KEY"]:
LOG.error("API_KEY is not set")
errors += 1
except KeyError:
LOG.critical("API_KEY does not exist!")
errors += 1
except KeyError:
LOG.critical("IP_TO_LOCATION does not exist!")
errors += 1
if errors > 0:
LOG.critical(f"{errors} error(s) found in environment variables")
sys.exit()
"""
Validate the config of a bare metal run (config.ini file)
"""
def validate_bare_metal_config(file_contents):
config = configparser.ConfigParser()
config.read_string(file_contents)
errors = 0
# Validate BASE_URL
try:
if not config["CONFIG"]["BASE_URL"]:
LOG.error("BASE_URL is not set")
errors += 1
elif not validators.url(config["CONFIG"]["BASE_URL"]):
LOG.error("BASE_URL is not a valid URL")
errors += 1
except ValueError:
LOG.critical("BASE_URL does not exist!")
errors += 1
# Validate IP_TO_LOCATION
try:
if not config["CONFIG"]["IP_TO_LOCATION"]:
LOG.error("IP_TO_LOCATION is not set")
errors += 1
elif config["CONFIG"]["IP_TO_LOCATION"].upper() not in [
"TRUE",
"FALSE",
"T",
"F",
]:
LOG.error("IP_TO_LOCATION is not set to TRUE or FALSE")
errors += 1
else:
iptolocation = (
True
if config["CONFIG"]["IP_TO_LOCATION"].upper() in ["TRUE", "T"]
else False
)
# Validate API_KEY if IP_TO_LOCATION is set to TRUE
if iptolocation:
try:
if not config["CONFIG"]["API_KEY"]:
LOG.error("API_KEY is not set")
errors += 1
except ValueError:
LOG.critical("API_KEY does not exist!")
errors += 1
except ValueError:
LOG.critical("IP_TO_LOCATION does not exist!")
errors += 1
if errors > 0:
LOG.critical(f"{errors} error(s) found in `config.ini`")
sys.exit()
def validate_config():
# If the app is running in Docker
if "BASE_URL" in os.environ or "IP_TO_LOCATION" in os.environ:
return validate_docker_config()
# Otherwise, the app is running on bare metal
try:
with open("config.ini", "r") as f:
file_contents = f.read()
return validate_bare_metal_config(file_contents)
except FileNotFoundError:
config = configparser.ConfigParser()
config["CONFIG"] = {"BASE_URL": "", "IP_TO_LOCATION": "", "API_KEY": ""}
with open("config.ini", "w") as configfile:
config.write(configfile)
LOG.error(
"`config.ini` has been created. Fill out the necessary information then re-run."
)
sys.exit()

59
var.py Normal file
View File

@ -0,0 +1,59 @@
import configparser
import logging
import os
from colorlog import ColoredFormatter
log_level = logging.DEBUG
log_format = "%(log_color)s%(levelname)-8s%(reset)s %(log_color)s%(message)s%(reset)s"
logging.root.setLevel(log_level)
formatter = ColoredFormatter(log_format)
stream = logging.StreamHandler()
stream.setLevel(log_level)
stream.setFormatter(formatter)
LOG = logging.getLogger("pythonConfig")
LOG.setLevel(log_level)
LOG.addHandler(stream)
# If the app is running in Docker
if "BASE_URL" in os.environ or "IP_TO_LOCATION" in os.environ:
BASE_URL = os.environ["BASE_URL"]
IP_TO_LOCATION = (
True if os.environ["IP_TO_LOCATION"].upper() in ["TRUE", "T"] else False
)
if IP_TO_LOCATION:
API_KEY = os.environ["API_KEY"]
else:
API_KEY = None
# Otherwise, the app is running on bare metal
try:
with open("config.ini", "r") as f:
config = configparser.ConfigParser()
config.read_string(f.read())
BASE_URL = config["CONFIG"]["BASE_URL"]
IP_TO_LOCATION = (
True
if config["CONFIG"]["IP_TO_LOCATION"].upper() in ["TRUE", "T"]
else False
)
if IP_TO_LOCATION:
API_KEY = config["CONFIG"]["API_KEY"]
else:
API_KEY = None
except FileNotFoundError:
config = configparser.ConfigParser()
config["CONFIG"] = {"BASE_URL": "", "IP_TO_LOCATION": "", "API_KEY": ""}
with open("config.ini", "w") as configfile:
config.write(configfile)
LOG.error(
"`config.ini` has been created. Fill out the necessary information then re-run."
)
exit()