aboutsummaryrefslogtreecommitdiff
path: root/code/utils
diff options
context:
space:
mode:
authorParker <contact@pkrm.dev>2024-07-20 18:15:15 -0500
committerParker <contact@pkrm.dev>2024-07-20 18:15:15 -0500
commitfda03dff62cc7967bd279777a012a22ddaa6ad34 (patch)
treeea6fb63db30bca8959d0e25e6333af8d7c9bbb7f /code/utils
parentd69807b2e4302febbac55050ee3efbdee08b8e50 (diff)
Move files to `utils` folder
Diffstat (limited to 'code/utils')
-rw-r--r--code/utils/ai_recommendations.py81
-rw-r--r--code/utils/command_tree.py4
-rw-r--r--code/utils/config.py274
-rw-r--r--code/utils/custom_sources.py203
4 files changed, 560 insertions, 2 deletions
diff --git a/code/utils/ai_recommendations.py b/code/utils/ai_recommendations.py
new file mode 100644
index 0000000..68764bc
--- /dev/null
+++ b/code/utils/ai_recommendations.py
@@ -0,0 +1,81 @@
+from lavalink import LoadType
+import re
+
+
+async def add_song_recommendations(
+ openai_client, bot_user, player, number, inputs, retries: int = 1
+):
+ input_list = [f'"{song} by {artist}"' for song, artist in inputs.items()]
+
+ completion = (
+ openai_client.chat.completions.create(
+ messages=[
+ {
+ "role": "user",
+ "content": f"""
+ BACKGROUND: You're an AI music recommendation system with a knack for understanding
+ user preferences based on provided input. Your task is to generate a list
+ of {number} songs that the user might enjoy, derived from a given list of {number} songs.
+ The input will be in the format of
+ ["Song-1-Name by Song-1-Artist", "Song-2-Name by Song-2-Artist", ...]
+ and you need to return a list formatted in the same way.
+
+ When recommending songs, consider the genre, tempo, and mood of the input
+ songs to suggest similar ones that align with the user's tastes. Also, it
+ is important to mix up the artists, don't only give the same artists that
+ are already in the queue. If you cannot find {number} songs that match the
+ criteria or encounter any issues, return the list ["NOTHING FOUND"].
+
+ Please be sure to also only use characters A-Z, a-z, 0-9, and spaces in the
+ song and artist names. Do not include escape/special characters, emojis, or
+ quotes in the output.
+
+ INPUT: {input_list}
+ """,
+ }
+ ],
+ model="gpt-4o-mini",
+ )
+ .choices[0]
+ .message.content.strip()
+ .strip('"')
+ )
+
+ # Sometimes ChatGPT will return `["NOTHING FOUND"]` even if it should
+ # have found something, so we check each prompt up to 3 times before
+ # giving up.
+ if completion == '["NOTHING FOUND"]':
+ if retries <= 3:
+ await add_song_recommendations(
+ openai_client, bot_user, player, number, inputs, retries + 1
+ )
+ else:
+ return False
+
+ else:
+ # Clean up the completion string to remove any potential issues
+ # with the eval function (e.g. OUTPUT: prefix, escaped quotes, etc.)
+ completion = re.sub(r"[\\\'\[\]\n]+|OUTPUT: ", "", completion)
+
+ for entry in eval(completion):
+ song, artist = entry.split(" by ")
+ ytsearch = f"ytsearch:{song} by {artist} audio"
+ results = await player.node.get_tracks(ytsearch)
+
+ if not results.tracks or results.load_type in (
+ LoadType.EMPTY,
+ LoadType.ERROR,
+ ):
+ dzsearch = f"dzsearch:{song}"
+ results = await player.node.get_tracks(dzsearch)
+
+ if not results.tracks or results.load_type in (
+ LoadType.EMPTY,
+ LoadType.ERROR,
+ ):
+ continue
+
+ track = results.tracks[0]
+ player.add(requester=bot_user, track=track)
+
+ return True
diff --git a/code/utils/command_tree.py b/code/utils/command_tree.py
index 504d3a9..947a58c 100644
--- a/code/utils/command_tree.py
+++ b/code/utils/command_tree.py
@@ -3,8 +3,8 @@ from discord import app_commands
from discord.ext.commands.errors import *
import datetime
-from config import BOT_COLOR
-from custom_sources import LoadError
+from utils.config import BOT_COLOR
+from utils.custom_sources import LoadError
# Create a custom AppCommandError for the create_player function
diff --git a/code/utils/config.py b/code/utils/config.py
new file mode 100644
index 0000000..d411653
--- /dev/null
+++ b/code/utils/config.py
@@ -0,0 +1,274 @@
+import configparser
+import re
+import os
+import validators
+import sys
+import discord
+import openai
+import logging
+from colorlog import ColoredFormatter
+
+log_level = logging.DEBUG
+log_format = (
+ " %(log_color)s%(levelname)-8s%(reset)s | %(log_color)s%(message)s%(reset)s"
+)
+
+logging.root.setLevel(log_level)
+formatter = ColoredFormatter(log_format)
+
+stream = logging.StreamHandler()
+stream.setLevel(log_level)
+stream.setFormatter(formatter)
+
+LOG = logging.getLogger("pythonConfig")
+LOG.setLevel(log_level)
+LOG.addHandler(stream)
+
+TOKEN = None
+BOT_COLOR = None
+BOT_INVITE_LINK = None
+FEEDBACK_CHANNEL_ID = None
+BUG_CHANNEL_ID = None
+SPOTIFY_CLIENT_ID = None
+SPOTIFY_CLIENT_SECRET = None
+APPLE_MUSIC_KEY = None
+OPENAI_API_KEY = None
+LAVALINK_HOST = None
+LAVALINK_PORT = None
+LAVALINK_PASSWORD = None
+
+"""
+Load the config.ini file and return the contents for validation or
+create a new templated config.ini file if it doesn't exist.
+"""
+
+
+def load_config():
+ # Look for variables in the environment
+ if "TOKEN" in os.environ or "BOT_COLOR" in os.environ or "BOT_INVITE_LINK" in os.environ:
+ LOG.info("Detected environment variables. Checking for configuration options.")
+ return validate_env_vars()
+ else:
+ LOG.info("Detected local environment. Checking for config.ini file.")
+
+ try:
+ with open("config.ini", "r") as f:
+ file_contents = f.read()
+ validate_config(file_contents)
+
+ except FileNotFoundError:
+ config = configparser.ConfigParser()
+ config["BOT_INFO"] = {
+ "TOKEN": "",
+ "BOT_COLOR": "",
+ "BOT_INVITE_LINK": "",
+ "FEEDBACK_CHANNEL_ID": "",
+ "BUG_CHANNEL_ID": "",
+ }
+
+ config["SPOTIFY"] = {
+ "SPOTIFY_CLIENT_ID": "",
+ "SPOTIFY_CLIENT_SECRET": "",
+ }
+
+ config["APPLE_MUSIC"] = {
+ "APPLE_MUSIC_KEY": "",
+ }
+
+ config["OPENAI"] = {
+ "OPENAI_API_KEY": "",
+ }
+
+ config["LAVALINK"] = {
+ "HOST": "",
+ "PORT": "",
+ "PASSWORD": "",
+ }
+
+ with open("config.ini", "w") as configfile:
+ config.write(configfile)
+
+ sys.exit(
+ LOG.critical(
+ "Configuration file `config.ini` has been generated. Please fill out all of the necessary information. Refer to the docs for information on what a specific configuration option is."
+ )
+ )
+
+
+"""
+Validate all of the options in the config.ini file.
+"""
+
+
+def validate_config(file_contents):
+ global TOKEN, BOT_COLOR, BOT_INVITE_LINK, FEEDBACK_CHANNEL_ID, BUG_CHANNEL_ID, SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET, APPLE_MUSIC_KEY, OPENAI_API_KEY, LAVALINK_HOST, LAVALINK_PORT, LAVALINK_PASSWORD
+ config = configparser.ConfigParser()
+ config.read_string(file_contents)
+
+ hex_pattern_one = "^#([A-Fa-f0-9]{6}|[A-Fa-f0-9]{3})$"
+ hex_pattern_two = "^([A-Fa-f0-9]{6}|[A-Fa-f0-9]{3})$"
+
+ errors = 0
+
+ # Make sure all sections are present
+ if ["BOT_INFO", "SPOTIFY", "APPLE_MUSIC", "OPENAI", "LAVALINK"] != config.sections():
+ sys.exit(
+ LOG.critical(
+ "Missing sections in config.ini file. Delete the file and re-run the bot to generate a blank config.ini file."
+ )
+ )
+
+ if ["token","bot_color","bot_invite_link", "feedback_channel_id","bug_channel_id",] != config.options("BOT_INFO"):
+ sys.exit(
+ LOG.critical(
+ "Missing options in BOT_INFO section of config.ini file. Delete the file and re-run the bot to generate a blank config.ini file."
+ )
+ )
+
+ if ["spotify_client_id", "spotify_client_secret"] != config.options("SPOTIFY"):
+ sys.exit(
+ LOG.critical(
+ "Missing options in SPOTIFY section of config.ini file. Delete the file and re-run the bot to generate a blank config.ini file."
+ )
+ )
+
+ if ["apple_music_key"] != config.options("APPLE_MUSIC"):
+ sys.exit(
+ LOG.critical(
+ "Missing options in APPLE_MUSIC section of config.ini file. Delete the file and re-run the bot to generate a blank config.ini file."
+ )
+ )
+
+ if ["openai_api_key"] != config.options("OPENAI"):
+ sys.exit(
+ LOG.critical(
+ "Missing options in OPENAI section of config.ini file. Delete the file and re-run the bot to generate a blank config.ini file."
+ )
+ )
+
+ if ["host", "port", "password"] != config.options("LAVALINK"):
+ sys.exit(
+ LOG.critical(
+ "Missing options in LAVALINK section of config.ini file. Delete the file and re-run the bot to generate a blank config.ini file."
+ )
+ )
+
+ # Make sure BOT_COLOR is a valid hex color
+ if not bool(re.match(hex_pattern_one, config["BOT_INFO"]["BOT_COLOR"])) and not bool(re.match(hex_pattern_two, config["BOT_INFO"]["BOT_COLOR"])):
+ LOG.error("BOT_COLOR is not a valid hex color.")
+ errors += 1
+ else:
+ BOT_COLOR = discord.Color(int((config["BOT_INFO"]["BOT_COLOR"]).replace("#", ""), 16))
+
+ # Make sure BOT_INVITE_LINK is a valid URL
+ if not validators.url(config["BOT_INFO"]["BOT_INVITE_LINK"]):
+ LOG.error("BOT_INVITE_LINK is not a valid URL.")
+ errors += 1
+ else:
+ BOT_INVITE_LINK = config["BOT_INFO"]["BOT_INVITE_LINK"]
+
+ # Make sure FEEDBACK_CHANNEL_ID is either exactly 0 or 19 characters long
+ if len(config["BOT_INFO"]["FEEDBACK_CHANNEL_ID"]) != 0:
+ if len(config["BOT_INFO"]["FEEDBACK_CHANNEL_ID"]) != 19:
+ LOG.error("FEEDBACK_CHANNEL_ID is not a valid Discord channel ID.")
+ errors += 1
+ else:
+ FEEDBACK_CHANNEL_ID = int(config["BOT_INFO"]["FEEDBACK_CHANNEL_ID"])
+
+ # Make sure BUG_CHANNEL_ID is either exactly 0 or 19 characters long
+ if len(config["BOT_INFO"]["BUG_CHANNEL_ID"]) != 0:
+ if len(config["BOT_INFO"]["BUG_CHANNEL_ID"]) != 19:
+ LOG.error("BUG_CHANNEL_ID is not a valid Discord channel ID.")
+ errors += 1
+ else:
+ BUG_CHANNEL_ID = int(config["BOT_INFO"]["BUG_CHANNEL_ID"])
+
+ # Assign the rest of the variables
+ TOKEN = config["BOT_INFO"]["TOKEN"]
+ SPOTIFY_CLIENT_ID = config["SPOTIFY"]["SPOTIFY_CLIENT_ID"]
+ SPOTIFY_CLIENT_SECRET = config["SPOTIFY"]["SPOTIFY_CLIENT_SECRET"]
+ APPLE_MUSIC_KEY = config["APPLE_MUSIC"]["APPLE_MUSIC_KEY"]
+ OPENAI_API_KEY = config["OPENAI"]["OPENAI_API_KEY"]
+ LAVALINK_HOST = config["LAVALINK"]["HOST"]
+ LAVALINK_PORT = config["LAVALINK"]["PORT"]
+ LAVALINK_PASSWORD = config["LAVALINK"]["PASSWORD"]
+
+ if errors > 0:
+ sys.exit(
+ LOG.critical(
+ f"Found {errors} error(s) in the config.ini file. Please fix them and try again."
+ )
+ )
+
+
+"""
+Validate all of the environment variables.
+"""
+
+
+def validate_env_vars():
+ global TOKEN, BOT_COLOR, BOT_INVITE_LINK, FEEDBACK_CHANNEL_ID, BUG_CHANNEL_ID, SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET, APPLE_MUSIC_KEY, OPENAI_API_KEY, LAVALINK_HOST, LAVALINK_PORT, LAVALINK_PASSWORD
+
+ hex_pattern_one = "^#([A-Fa-f0-9]{6}|[A-Fa-f0-9]{3})$"
+ hex_pattern_two = "^([A-Fa-f0-9]{6}|[A-Fa-f0-9]{3})$"
+
+ errors = 0
+
+ # Make sure all required variables are present in the environment
+ required_vars = ["TOKEN", "BOT_COLOR", "BOT_INVITE_LINK", "SPOTIFY_CLIENT_ID", "SPOTIFY_CLIENT_SECRET", "APPLE_MUSIC_KEY", "OPENAI_API_KEY", "LAVALINK_HOST", "LAVALINK_PORT", "LAVALINK_PASSWORD"]
+
+ for var in required_vars:
+ if var not in os.environ:
+ LOG.error(f"Missing environment variable: {var}")
+ errors += 1
+
+ # Make sure BOT_COLOR is a valid hex color
+ if not bool(re.match(hex_pattern_one, os.environ["BOT_COLOR"])) and not bool(re.match(hex_pattern_two, os.environ["BOT_COLOR"])):
+ LOG.error("BOT_COLOR is not a valid hex color.")
+ errors += 1
+ else:
+ BOT_COLOR = discord.Color(int((os.environ["BOT_COLOR"]).replace("#", ""), 16))
+
+ # Make sure BOT_INVITE_LINK is a valid URL
+ if not validators.url(os.environ["BOT_INVITE_LINK"]):
+ LOG.error("BOT_INVITE_LINK is not a valid URL.")
+ errors += 1
+ else:
+ BOT_INVITE_LINK = os.environ["BOT_INVITE_LINK"]
+
+ # Make sure FEEDBACK_CHANNEL_ID is either None or 19 characters long
+ try:
+ if len(os.environ["FEEDBACK_CHANNEL_ID"]) != 19:
+ LOG.error("FEEDBACK_CHANNEL_ID is not a valid Discord channel ID.")
+ errors += 1
+ else:
+ FEEDBACK_CHANNEL_ID = int(os.environ["FEEDBACK_CHANNEL_ID"])
+ except KeyError:
+ FEEDBACK_CHANNEL_ID = None
+
+ # Make sure BUG_CHANNEL_ID is either None or 19 characters long
+ try:
+ if len(os.environ["BUG_CHANNEL_ID"]) != 19:
+ LOG.error("BUG_CHANNEL_ID is not a valid Discord channel ID.")
+ errors += 1
+ else:
+ BUG_CHANNEL_ID = int(os.environ["BUG_CHANNEL_ID"])
+ except KeyError:
+ BUG_CHANNEL_ID = None
+
+ if errors > 0:
+ sys.exit(
+ LOG.critical(
+ f"Found {errors} error(s) with environment variables. Please fix them and try again."
+ )
+ )
+
+ # Assign the rest of the variables
+ TOKEN = os.environ["TOKEN"]
+ SPOTIFY_CLIENT_ID = os.environ["SPOTIFY_CLIENT_ID"]
+ SPOTIFY_CLIENT_SECRET = os.environ["SPOTIFY_CLIENT_SECRET"]
+ APPLE_MUSIC_KEY = os.environ["APPLE_MUSIC_KEY"]
+ OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
+ LAVALINK_HOST = os.environ["LAVALINK_HOST"]
+ LAVALINK_PORT = os.environ["LAVALINK_PORT"]
+ LAVALINK_PASSWORD = os.environ["LAVALINK_PASSWORD"] \ No newline at end of file
diff --git a/code/utils/custom_sources.py b/code/utils/custom_sources.py
new file mode 100644
index 0000000..96ae3a9
--- /dev/null
+++ b/code/utils/custom_sources.py
@@ -0,0 +1,203 @@
+from lavalink import LoadResult, LoadType, Source, DeferredAudioTrack, PlaylistInfo
+
+
+class LoadError(Exception): # We'll raise this if we have trouble loading our track.
+ pass
+
+
+"""
+Retrieve the playback URL for a custom track
+"""
+
+
+class CustomAudioTrack(DeferredAudioTrack):
+ # A DeferredAudioTrack allows us to load metadata now, and a playback URL later.
+ # This makes the DeferredAudioTrack highly efficient, particularly in cases
+ # where large playlists are loaded.
+
+ async def load(
+ self, client
+ ): # Load our 'actual' playback track using the metadata from this one.
+ ytsearch = f"ytsearch:{self.title} {self.author} audio"
+ results = await client.get_tracks(ytsearch)
+ if not results.tracks or results.load_type in (
+ LoadType.EMPTY,
+ LoadType.ERROR,
+ ):
+ dzsearch = f"dzsearch:{self.title} {self.author}"
+ results = await client.get_tracks(dzsearch)
+
+ if not results.tracks or results.load_type in (
+ LoadType.EMPTY,
+ LoadType.ERROR,
+ ):
+ raise LoadError
+
+ first_track = results.tracks[0] # Grab the first track from the results.
+ base64 = first_track.track # Extract the base64 string from the track.
+ self.track = base64 # We'll store this for later, as it allows us to save making network requests
+ # if this track is re-used (e.g. repeat).
+
+ return base64
+
+
+"""
+Custom Source for Spotify links
+"""
+
+
+class SpotifySource(Source):
+ def __init__(self):
+ super().__init__(
+ name="custom"
+ ) # Initialising our custom source with the name 'custom'.
+
+ async def load_item(self, user, metadata):
+ track = CustomAudioTrack(
+ { # Create an instance of our CustomAudioTrack.
+ "identifier": metadata[
+ "id"
+ ], # Fill it with metadata that we've obtained from our source's provider.
+ "isSeekable": True,
+ "author": metadata["artists"][0]["name"],
+ "length": metadata["duration_ms"],
+ "isStream": False,
+ "title": metadata["name"],
+ "uri": metadata["external_urls"]["spotify"],
+ "duration": metadata["duration_ms"],
+ "artworkUrl": metadata["album"]["images"][0]["url"],
+ },
+ requester=user,
+ )
+ return LoadResult(LoadType.TRACK, [track], playlist_info=PlaylistInfo.none())
+
+ async def load_album(self, user, metadata):
+ tracks = []
+ for track in metadata["tracks"][
+ "items"
+ ]: # Loop through each track in the album.
+ tracks.append(
+ CustomAudioTrack(
+ { # Create an instance of our CustomAudioTrack.
+ "identifier": track[
+ "id"
+ ], # Fill it with metadata that we've obtained from our source's provider.
+ "isSeekable": True,
+ "author": track["artists"][0]["name"],
+ "length": track["duration_ms"],
+ "isStream": False,
+ "title": track["name"],
+ "uri": track["external_urls"]["spotify"],
+ "duration": track["duration_ms"],
+ "artworkUrl": metadata["images"][0]["url"],
+ },
+ requester=user,
+ )
+ )
+
+ return LoadResult(LoadType.PLAYLIST, tracks, playlist_info=PlaylistInfo.none())
+
+ async def load_playlist(self, user, metadata):
+ tracks = []
+ for track in metadata["tracks"][
+ "items"
+ ]: # Loop through each track in the playlist.
+ tracks.append(
+ CustomAudioTrack(
+ { # Create an instance of our CustomAudioTrack.
+ "identifier": track["track"][
+ "id"
+ ], # Fill it with metadata that we've obtained from our source's provider.
+ "isSeekable": True,
+ "author": track["track"]["artists"][0]["name"],
+ "length": track["track"]["duration_ms"],
+ "isStream": False,
+ "title": track["track"]["name"],
+ "uri": track["track"]["external_urls"]["spotify"],
+ "duration": track["track"]["duration_ms"],
+ "artworkUrl": track["track"]["album"]["images"][0]["url"],
+ },
+ requster=user,
+ )
+ )
+
+ return LoadResult(LoadType.PLAYLIST, tracks, playlist_info=PlaylistInfo.none())
+
+
+"""
+Custom Source for Apple Music links
+"""
+
+
+class AppleSource(Source):
+ def __init__(self):
+ super().__init__(name="custom")
+
+ async def load_item(self, user, metadata):
+ track = CustomAudioTrack(
+ { # Create an instance of our CustomAudioTrack.
+ "identifier": metadata["data"][0]["id"],
+ "isSeekable": True,
+ "author": metadata["data"][0]["attributes"]["artistName"],
+ "length": metadata["data"][0]["attributes"]["durationInMillis"],
+ "isStream": False,
+ "title": metadata["data"][0]["attributes"]["name"],
+ "uri": metadata["data"][0]["attributes"]["url"],
+ "duration": metadata["data"][0]["attributes"]["durationInMillis"],
+ "artworkUrl": metadata["data"][0]["attributes"]["artwork"]["url"].replace(
+ "{w}x{h}", "300x300"
+ ),
+ },
+ requester=user,
+ )
+ return LoadResult(LoadType.TRACK, [track], playlist_info=PlaylistInfo.none())
+
+ async def load_album(self, user, metadata):
+ tracks = []
+ for track in metadata["data"][0]["relationships"]["tracks"][
+ "data"
+ ]: # Loop through each track in the album.
+ tracks.append(
+ CustomAudioTrack(
+ { # Create an instance of our CustomAudioTrack.
+ "identifier": track["id"],
+ "isSeekable": True,
+ "author": track["attributes"]["artistName"],
+ "length": track["attributes"]["durationInMillis"],
+ "isStream": False,
+ "title": track["attributes"]["name"],
+ "uri": track["attributes"]["url"],
+ "duration": track["attributes"]["durationInMillis"],
+ "artworkUrl": track["attributes"]["artwork"]["url"].replace(
+ "{w}x{h}", "300x300"
+ ),
+ },
+ requster=user,
+ )
+ )
+
+ return LoadResult(LoadType.PLAYLIST, tracks, playlist_info=PlaylistInfo.none())
+
+ async def load_playlist(self, user, metadata):
+ tracks = []
+ for track in metadata["data"]: # Loop through each track in the playlist.
+ tracks.append(
+ CustomAudioTrack(
+ { # Create an instance of our CustomAudioTrack.
+ "identifier": track["id"],
+ "isSeekable": True,
+ "author": track["attributes"]["artistName"],
+ "length": track["attributes"]["durationInMillis"],
+ "isStream": False,
+ "title": track["attributes"]["name"],
+ "uri": track["attributes"]["url"],
+ "duration": track["attributes"]["durationInMillis"],
+ "artworkUrl": track["attributes"]["artwork"]["url"].replace(
+ "{w}x{h}", "300x300"
+ ),
+ },
+ requster=user,
+ )
+ )
+
+ return LoadResult(LoadType.PLAYLIST, tracks, playlist_info=PlaylistInfo.none())