cordarr/code/cogs/status.py
2025-01-22 16:03:57 -06:00

256 lines
8.8 KiB
Python

import discord
from discord import app_commands
from discord.ext import commands
import requests
from utils.models import Requests
from utils.database import Session
from utils.config import (
RADARR_HOST_URL,
RADARR_HEADERS,
SONARR_HOST_URL,
SONARR_HEADERS,
)
class Status(commands.Cog):
    """Cog that lets a user check on the content they have requested."""

    def __init__(self, bot):
        """Store the bot instance this cog is attached to."""
        self.bot = bot
@app_commands.command()
async def status(self, interaction: discord.Interaction) -> None:
    """Get the status of the movies you have requested"""
    # NOTE(review): discord.py appears to use the docstring above as the
    # slash command's description, so it is user-facing text and is left
    # unchanged here - confirm before rewording it.
    #
    # Flow: load the caller's request rows, report the ones currently in the
    # Radarr/Sonarr download queues, then report (and let the helper clean
    # up) the ones that are no longer queued.

    # Defer the response; only the caller will see the result
    await interaction.response.defer(ephemeral=True)

    with Session() as session:
        requested_content = (
            session.query(
                Requests.title,
                Requests.release_year,
                Requests.local_id,
                Requests.tmdbid,
                Requests.tvdbid,
            )
            .filter(Requests.user_id == interaction.user.id)
            .all()
        )

    # No content requested
    if not requested_content:
        embed = discord.Embed(
            title="No Content Requested",
            description=(
                "If you believe this is in error, the content you have"
                " requested is likely already downloaded."
            ),
            color=0xD01B86,
        )
        await interaction.followup.send(embed=embed)
        return

    # Split the requests per service (keyed by each service's local ID)
    radarr_content_info, sonarr_content_info = self.unpack_content(
        requested_content
    )

    # NOTE(review): the helpers below are plain synchronous methods that
    # issue HTTP requests, so they run on the event loop thread.

    # Get the descriptions and local IDs found in each queue
    radarr_desc, radarr_added_ids = self.process_queue(
        radarr_content_info, "radarr"
    )
    sonarr_desc, sonarr_added_ids = self.process_queue(
        sonarr_content_info, "sonarr"
    )

    # Local IDs are only unique within one service, so a queued movie must
    # not hide a series that happens to share the same ID (and vice versa).
    # Keep the rows and the queued IDs of each service together instead of
    # merging both ID lists. Same rule as unpack_content: a row with a
    # tmdbid is a movie, anything else is a series.
    movie_rows = []
    series_rows = []
    for row in requested_content:
        _, _, _, tmdbid, _ = row
        if tmdbid is not None:
            movie_rows.append(row)
        else:
            series_rows.append(row)

    # Get the description of content not in the queue, per service
    non_queue_desc = self.get_non_queue_content(
        movie_rows, radarr_added_ids, interaction.user.id
    ) + self.get_non_queue_content(
        series_rows, sonarr_added_ids, interaction.user.id
    )

    embed = discord.Embed(
        title="Requested Content",
        description=(
            "Below are the movies/shows you have requested that are"
            " currently being downloaded:\n"
        )
        + radarr_desc
        + sonarr_desc
        + non_queue_desc,
        color=0xD01B86,
    )

    # Send the follow-up message
    await interaction.followup.send(embed=embed)
def unpack_content(self, requested_content: list) -> tuple:
    """
    Split requested content rows into movie and series lookups.

    Each row is unpacked as (title, release_year, local_id, tmdbid, tvdbid).
    Rows with a tmdbid go to the first dictionary, all others to the second;
    both are keyed by local_id.

    Args:
        requested_content (list): Rows of requested content
    Returns:
        tuple: (radarr_content_info, sonarr_content_info)
    """
    movies, shows = {}, {}

    for row_title, row_year, row_local_id, row_tmdbid, row_tvdbid in requested_content:
        entry = {"title": row_title, "release_year": row_year}
        if row_tmdbid is None:
            # No TMDB ID: handled as a series
            entry["tvdbid"] = row_tvdbid
            shows[row_local_id] = entry
        else:
            entry["tmdbid"] = row_tmdbid
            movies[row_local_id] = entry

    return movies, shows
def process_queue(self, content_info: dict, service: str) -> tuple:
    """
    Describe the requested content that is currently in a service's queue.

    Args:
        content_info (dict): Requested content keyed by the service's local
            ID; each value must provide "title" and "release_year"
        service (str): "radarr" queries Radarr, any other value queries Sonarr
    Returns:
        tuple: (description, added_ids) - the embed text (one line per
            matching download, empty string if none) and the list of local
            IDs that were found in the queue
    Raises:
        requests.RequestException: If the request fails, times out or the
            service answers with an HTTP error status
    """
    # Everything that depends only on the service is resolved once up front
    is_radarr = service == "radarr"
    host_url = RADARR_HOST_URL if is_radarr else SONARR_HOST_URL
    headers = RADARR_HEADERS if is_radarr else SONARR_HEADERS
    id_key = "movieId" if is_radarr else "seriesId"

    # NOTE(review): the v3 queue endpoint is paginated; a large pageSize is
    # requested so downloads beyond the first page are not reported as
    # missing - confirm the default page size against the service API docs.
    response = requests.get(
        f"{host_url}/api/v3/queue",
        headers=headers,
        params={"pageSize": 1000},
        timeout=30,  # never hang the command on an unresponsive service
    )
    # Fail loudly instead of hitting a KeyError on an error payload
    response.raise_for_status()
    queue = response.json()

    lines = []
    added_ids = []
    seen_ids = set()  # O(1) duplicate check; added_ids keeps the order

    for download in queue["records"]:
        raw_id = download.get(id_key)
        # Skip records that are not linked to a movie/series
        if raw_id is None:
            continue

        local_id = int(raw_id)
        # Only report content requested by the user, and only once even if
        # several downloads (e.g. multiple episodes) share the same ID
        if local_id not in content_info or local_id in seen_ids:
            continue

        seen_ids.add(local_id)
        added_ids.append(local_id)

        try:
            time_left = self.process_time(download["timeleft"])
        except (KeyError, ValueError, AttributeError):
            # Missing or unparseable time estimate
            time_left = "Unknown"

        info = content_info[local_id]
        lines.append(
            f"\n**{info['title']} ({info['release_year']})**"
            f" - Time Left: `{time_left}`"
        )

    return "".join(lines), added_ids
def get_non_queue_content(
    self, requested_content: list, added_ids: list, user_id: int
) -> str:
    """
    Describe requested content that is not in a download queue.

    Each row whose local ID is not in added_ids is looked up in Radarr (rows
    with a tmdbid) or Sonarr (all other rows). If the payload reports
    "hasFile" as false, the item is listed as NOT FOUND. Otherwise - which
    includes payloads without a "hasFile" key - the matching request row is
    deleted from the database, and the item is only listed when the payload
    carries a "percentOfEpisodes" statistic.

    NOTE(review): series payloads and 404 bodies presumably lack "hasFile",
    which is why they fall into the clean-up path - confirm this is intended.
    NOTE(review): local IDs are compared without regard to the service, so
    added_ids should only contain IDs from the service(s) of the given rows.

    Args:
        requested_content (list): Rows of
            (title, release_year, local_id, tmdbid, tvdbid)
        added_ids (list): Local IDs that are in the queue
        user_id (int): The ID of the user
    Returns:
        str: A description of content not in the queue
    Raises:
        requests.RequestException: If a lookup fails, times out or returns
            an HTTP error status other than 404
    """
    queued_ids = set(added_ids)
    lines = []

    # For every piece of content not in the queue, check if it has a file
    for content in requested_content:
        title, release_year, local_id, tmdbid, _ = content

        if local_id in queued_ids:
            continue

        # Pull the data from the service that owns this item
        is_movie = tmdbid is not None
        if is_movie:
            url = f"{RADARR_HOST_URL}/api/v3/movie/{local_id}"
            headers = RADARR_HEADERS
        else:
            url = f"{SONARR_HOST_URL}/api/v3/series/{local_id}"
            headers = SONARR_HEADERS

        response = requests.get(url, headers=headers, timeout=30)
        # A 404 is processed below exactly like before. Any other error
        # (bad API key, server fault, ...) must not be mistaken for a
        # finished download, because that would delete the user's request.
        if response.status_code != 404:
            response.raise_for_status()
        data = response.json()

        # Explicitly reported as having no file: download not found
        if not data.get("hasFile", True):
            lines.append(
                f"\n**{title} ({release_year})** - Status: `NOT FOUND`"
            )
            continue

        # Finished (or no longer tracked): remove from database
        with Session() as session:
            query = (
                session.query(Requests)
                .filter(Requests.user_id == user_id)
                .filter(Requests.local_id == local_id)
            )
            # A movie and a series can share a local ID; restrict the match
            # to the row of the right kind so the other one is not deleted
            if is_movie:
                query = query.filter(Requests.tmdbid == tmdbid)
            else:
                query = query.filter(Requests.tmdbid.is_(None))

            request = query.first()
            # The row may already be gone; deleting None would raise
            if request is not None:
                session.delete(request)
                session.commit()

        # If series and only a portion of episodes have been downloaded
        statistics = data.get("statistics")
        if statistics is not None and "percentOfEpisodes" in statistics:
            lines.append(
                f"\n**{title} ({release_year})** - Status: `NOT FOUND"
                f" ({int(statistics['percentOfEpisodes'])}% of eps.)`"
            )

    return "".join(lines)
def process_time(self, time) -> str:
    """
    Given a time string, process it into a human readable format

    Accepted layouts are MM:SS, HH:MM:SS and D:HH:MM:SS, where the day field
    may be separated by ":", "." or a space. A fractional part after the
    seconds (e.g. "00:12:34.567") is ignored. Only the two most significant
    non-zero-leading units are shown.

    Args:
        time (str): A string representing time
    Returns:
        str: A human readable time, or "Unknown" if the value is not a
            string or cannot be parsed
    """
    if not isinstance(time, str):
        return "Unknown"

    text = time.strip()

    # A "." after the last ":" is a fraction of a second, not a field
    # separator; drop it so it is not mistaken for an extra field
    head, colon, last = text.rpartition(":")
    if colon and "." in last:
        text = f"{head}:{last.split('.', 1)[0]}"

    # Split the input by ':', '.' (day separator) or spaces
    try:
        parts = [
            int(part)
            for part in text.replace(" ", ":").replace(".", ":").split(":")
        ]
    except ValueError:
        # Empty or non-numeric field
        return "Unknown"

    # Handle different input lengths
    if len(parts) == 2:  # Format: MM:SS
        minutes, seconds = parts
        return f"{minutes} min. {seconds} sec."

    if len(parts) == 3:  # Format: HH:MM:SS
        hours, minutes, seconds = parts
        if hours == 0:
            return f"{minutes} min. {seconds} sec."
        return f"{hours} hr. {minutes} min."

    if len(parts) == 4:  # Format: D:HH:MM:SS
        days, hours, minutes, seconds = parts
        if days == 0:
            return f"{hours} hr. {minutes} min."
        return f"{days} days {hours} hr."

    return "Unknown"
async def setup(bot):
    """Create the Status cog for the given bot and add it to that bot."""
    # NOTE(review): presumably invoked by the bot's extension loader - confirm
    status_cog = Status(bot)
    await bot.add_cog(status_cog)