From fda03dff62cc7967bd279777a012a22ddaa6ad34 Mon Sep 17 00:00:00 2001 From: Parker Date: Sat, 20 Jul 2024 18:15:15 -0500 Subject: Move files to `utils` folder --- code/utils/ai_recommendations.py | 81 ++++++++++++ code/utils/command_tree.py | 4 +- code/utils/config.py | 274 +++++++++++++++++++++++++++++++++++++++ code/utils/custom_sources.py | 203 +++++++++++++++++++++++++++++ 4 files changed, 560 insertions(+), 2 deletions(-) create mode 100644 code/utils/ai_recommendations.py create mode 100644 code/utils/config.py create mode 100644 code/utils/custom_sources.py (limited to 'code/utils') diff --git a/code/utils/ai_recommendations.py b/code/utils/ai_recommendations.py new file mode 100644 index 0000000..68764bc --- /dev/null +++ b/code/utils/ai_recommendations.py @@ -0,0 +1,81 @@ +from lavalink import LoadType +import re + + +async def add_song_recommendations( + openai_client, bot_user, player, number, inputs, retries: int = 1 +): + input_list = [f'"{song} by {artist}"' for song, artist in inputs.items()] + + completion = ( + openai_client.chat.completions.create( + messages=[ + { + "role": "user", + "content": f""" + BACKGROUND: You're an AI music recommendation system with a knack for understanding + user preferences based on provided input. Your task is to generate a list + of {number} songs that the user might enjoy, derived from a given list of {number} songs. + The input will be in the format of + ["Song-1-Name by Song-1-Artist", "Song-2-Name by Song-2-Artist", ...] + and you need to return a list formatted in the same way. + + When recommending songs, consider the genre, tempo, and mood of the input + songs to suggest similar ones that align with the user's tastes. Also, it + is important to mix up the artists, don't only give the same artists that + are already in the queue. If you cannot find {number} songs that match the + criteria or encounter any issues, return the list ["NOTHING FOUND"]. + + Please be sure to also only use characters A-Z, a-z, 0-9, and spaces in the + song and artist names. Do not include escape/special characters, emojis, or + quotes in the output. + + INPUT: {input_list} + """, + } + ], + model="gpt-4o-mini", + ) + .choices[0] + .message.content.strip() + .strip('"') + ) + + # Sometimes ChatGPT will return `["NOTHING FOUND"]` even if it should + # have found something, so we check each prompt up to 3 times before + # giving up. + if completion == '["NOTHING FOUND"]': + if retries <= 3: + await add_song_recommendations( + openai_client, bot_user, player, number, inputs, retries + 1 + ) + else: + return False + + else: + # Clean up the completion string to remove any potential issues + # with the eval function (e.g. OUTPUT: prefix, escaped quotes, etc.) + completion = re.sub(r"[\\\'\[\]\n]+|OUTPUT: ", "", completion) + + for entry in eval(completion): + song, artist = entry.split(" by ") + ytsearch = f"ytsearch:{song} by {artist} audio" + results = await player.node.get_tracks(ytsearch) + + if not results.tracks or results.load_type in ( + LoadType.EMPTY, + LoadType.ERROR, + ): + dzsearch = f"dzsearch:{song}" + results = await player.node.get_tracks(dzsearch) + + if not results.tracks or results.load_type in ( + LoadType.EMPTY, + LoadType.ERROR, + ): + continue + + track = results.tracks[0] + player.add(requester=bot_user, track=track) + + return True diff --git a/code/utils/command_tree.py b/code/utils/command_tree.py index 504d3a9..947a58c 100644 --- a/code/utils/command_tree.py +++ b/code/utils/command_tree.py @@ -3,8 +3,8 @@ from discord import app_commands from discord.ext.commands.errors import * import datetime -from config import BOT_COLOR -from custom_sources import LoadError +from utils.config import BOT_COLOR +from utils.custom_sources import LoadError # Create a custom AppCommandError for the create_player function diff --git a/code/utils/config.py b/code/utils/config.py new file mode 100644 index 0000000..d411653 --- /dev/null +++ b/code/utils/config.py @@ -0,0 +1,274 @@ +import configparser +import re +import os +import validators +import sys +import discord +import openai +import logging +from colorlog import ColoredFormatter + +log_level = logging.DEBUG +log_format = ( + " %(log_color)s%(levelname)-8s%(reset)s | %(log_color)s%(message)s%(reset)s" +) + +logging.root.setLevel(log_level) +formatter = ColoredFormatter(log_format) + +stream = logging.StreamHandler() +stream.setLevel(log_level) +stream.setFormatter(formatter) + +LOG = logging.getLogger("pythonConfig") +LOG.setLevel(log_level) +LOG.addHandler(stream) + +TOKEN = None +BOT_COLOR = None +BOT_INVITE_LINK = None +FEEDBACK_CHANNEL_ID = None +BUG_CHANNEL_ID = None +SPOTIFY_CLIENT_ID = None +SPOTIFY_CLIENT_SECRET = None +APPLE_MUSIC_KEY = None +OPENAI_API_KEY = None +LAVALINK_HOST = None +LAVALINK_PORT = None +LAVALINK_PASSWORD = None + +""" +Load the config.ini file and return the contents for validation or +create a new templated config.ini file if it doesn't exist. +""" + + +def load_config(): + # Look for variables in the environment + if "TOKEN" in os.environ or "BOT_COLOR" in os.environ or "BOT_INVITE_LINK" in os.environ: + LOG.info("Detected environment variables. Checking for configuration options.") + return validate_env_vars() + else: + LOG.info("Detected local environment. Checking for config.ini file.") + + try: + with open("config.ini", "r") as f: + file_contents = f.read() + validate_config(file_contents) + + except FileNotFoundError: + config = configparser.ConfigParser() + config["BOT_INFO"] = { + "TOKEN": "", + "BOT_COLOR": "", + "BOT_INVITE_LINK": "", + "FEEDBACK_CHANNEL_ID": "", + "BUG_CHANNEL_ID": "", + } + + config["SPOTIFY"] = { + "SPOTIFY_CLIENT_ID": "", + "SPOTIFY_CLIENT_SECRET": "", + } + + config["APPLE_MUSIC"] = { + "APPLE_MUSIC_KEY": "", + } + + config["OPENAI"] = { + "OPENAI_API_KEY": "", + } + + config["LAVALINK"] = { + "HOST": "", + "PORT": "", + "PASSWORD": "", + } + + with open("config.ini", "w") as configfile: + config.write(configfile) + + sys.exit( + LOG.critical( + "Configuration file `config.ini` has been generated. Please fill out all of the necessary information. Refer to the docs for information on what a specific configuration option is." + ) + ) + + +""" +Validate all of the options in the config.ini file. +""" + + +def validate_config(file_contents): + global TOKEN, BOT_COLOR, BOT_INVITE_LINK, FEEDBACK_CHANNEL_ID, BUG_CHANNEL_ID, SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET, APPLE_MUSIC_KEY, OPENAI_API_KEY, LAVALINK_HOST, LAVALINK_PORT, LAVALINK_PASSWORD + config = configparser.ConfigParser() + config.read_string(file_contents) + + hex_pattern_one = "^#([A-Fa-f0-9]{6}|[A-Fa-f0-9]{3})$" + hex_pattern_two = "^([A-Fa-f0-9]{6}|[A-Fa-f0-9]{3})$" + + errors = 0 + + # Make sure all sections are present + if ["BOT_INFO", "SPOTIFY", "APPLE_MUSIC", "OPENAI", "LAVALINK"] != config.sections(): + sys.exit( + LOG.critical( + "Missing sections in config.ini file. Delete the file and re-run the bot to generate a blank config.ini file." + ) + ) + + if ["token","bot_color","bot_invite_link", "feedback_channel_id","bug_channel_id",] != config.options("BOT_INFO"): + sys.exit( + LOG.critical( + "Missing options in BOT_INFO section of config.ini file. Delete the file and re-run the bot to generate a blank config.ini file." + ) + ) + + if ["spotify_client_id", "spotify_client_secret"] != config.options("SPOTIFY"): + sys.exit( + LOG.critical( + "Missing options in SPOTIFY section of config.ini file. Delete the file and re-run the bot to generate a blank config.ini file." + ) + ) + + if ["apple_music_key"] != config.options("APPLE_MUSIC"): + sys.exit( + LOG.critical( + "Missing options in APPLE_MUSIC section of config.ini file. Delete the file and re-run the bot to generate a blank config.ini file." + ) + ) + + if ["openai_api_key"] != config.options("OPENAI"): + sys.exit( + LOG.critical( + "Missing options in OPENAI section of config.ini file. Delete the file and re-run the bot to generate a blank config.ini file." + ) + ) + + if ["host", "port", "password"] != config.options("LAVALINK"): + sys.exit( + LOG.critical( + "Missing options in LAVALINK section of config.ini file. Delete the file and re-run the bot to generate a blank config.ini file." + ) + ) + + # Make sure BOT_COLOR is a valid hex color + if not bool(re.match(hex_pattern_one, config["BOT_INFO"]["BOT_COLOR"])) and not bool(re.match(hex_pattern_two, config["BOT_INFO"]["BOT_COLOR"])): + LOG.error("BOT_COLOR is not a valid hex color.") + errors += 1 + else: + BOT_COLOR = discord.Color(int((config["BOT_INFO"]["BOT_COLOR"]).replace("#", ""), 16)) + + # Make sure BOT_INVITE_LINK is a valid URL + if not validators.url(config["BOT_INFO"]["BOT_INVITE_LINK"]): + LOG.error("BOT_INVITE_LINK is not a valid URL.") + errors += 1 + else: + BOT_INVITE_LINK = config["BOT_INFO"]["BOT_INVITE_LINK"] + + # Make sure FEEDBACK_CHANNEL_ID is either exactly 0 or 19 characters long + if len(config["BOT_INFO"]["FEEDBACK_CHANNEL_ID"]) != 0: + if len(config["BOT_INFO"]["FEEDBACK_CHANNEL_ID"]) != 19: + LOG.error("FEEDBACK_CHANNEL_ID is not a valid Discord channel ID.") + errors += 1 + else: + FEEDBACK_CHANNEL_ID = int(config["BOT_INFO"]["FEEDBACK_CHANNEL_ID"]) + + # Make sure BUG_CHANNEL_ID is either exactly 0 or 19 characters long + if len(config["BOT_INFO"]["BUG_CHANNEL_ID"]) != 0: + if len(config["BOT_INFO"]["BUG_CHANNEL_ID"]) != 19: + LOG.error("BUG_CHANNEL_ID is not a valid Discord channel ID.") + errors += 1 + else: + BUG_CHANNEL_ID = int(config["BOT_INFO"]["BUG_CHANNEL_ID"]) + + # Assign the rest of the variables + TOKEN = config["BOT_INFO"]["TOKEN"] + SPOTIFY_CLIENT_ID = config["SPOTIFY"]["SPOTIFY_CLIENT_ID"] + SPOTIFY_CLIENT_SECRET = config["SPOTIFY"]["SPOTIFY_CLIENT_SECRET"] + APPLE_MUSIC_KEY = config["APPLE_MUSIC"]["APPLE_MUSIC_KEY"] + OPENAI_API_KEY = config["OPENAI"]["OPENAI_API_KEY"] + LAVALINK_HOST = config["LAVALINK"]["HOST"] + LAVALINK_PORT = config["LAVALINK"]["PORT"] + LAVALINK_PASSWORD = config["LAVALINK"]["PASSWORD"] + + if errors > 0: + sys.exit( + LOG.critical( + f"Found {errors} error(s) in the config.ini file. Please fix them and try again." + ) + ) + + +""" +Validate all of the environment variables. +""" + + +def validate_env_vars(): + global TOKEN, BOT_COLOR, BOT_INVITE_LINK, FEEDBACK_CHANNEL_ID, BUG_CHANNEL_ID, SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET, APPLE_MUSIC_KEY, OPENAI_API_KEY, LAVALINK_HOST, LAVALINK_PORT, LAVALINK_PASSWORD + + hex_pattern_one = "^#([A-Fa-f0-9]{6}|[A-Fa-f0-9]{3})$" + hex_pattern_two = "^([A-Fa-f0-9]{6}|[A-Fa-f0-9]{3})$" + + errors = 0 + + # Make sure all required variables are present in the environment + required_vars = ["TOKEN", "BOT_COLOR", "BOT_INVITE_LINK", "SPOTIFY_CLIENT_ID", "SPOTIFY_CLIENT_SECRET", "APPLE_MUSIC_KEY", "OPENAI_API_KEY", "LAVALINK_HOST", "LAVALINK_PORT", "LAVALINK_PASSWORD"] + + for var in required_vars: + if var not in os.environ: + LOG.error(f"Missing environment variable: {var}") + errors += 1 + + # Make sure BOT_COLOR is a valid hex color + if not bool(re.match(hex_pattern_one, os.environ["BOT_COLOR"])) and not bool(re.match(hex_pattern_two, os.environ["BOT_COLOR"])): + LOG.error("BOT_COLOR is not a valid hex color.") + errors += 1 + else: + BOT_COLOR = discord.Color(int((os.environ["BOT_COLOR"]).replace("#", ""), 16)) + + # Make sure BOT_INVITE_LINK is a valid URL + if not validators.url(os.environ["BOT_INVITE_LINK"]): + LOG.error("BOT_INVITE_LINK is not a valid URL.") + errors += 1 + else: + BOT_INVITE_LINK = os.environ["BOT_INVITE_LINK"] + + # Make sure FEEDBACK_CHANNEL_ID is either None or 19 characters long + try: + if len(os.environ["FEEDBACK_CHANNEL_ID"]) != 19: + LOG.error("FEEDBACK_CHANNEL_ID is not a valid Discord channel ID.") + errors += 1 + else: + FEEDBACK_CHANNEL_ID = int(os.environ["FEEDBACK_CHANNEL_ID"]) + except KeyError: + FEEDBACK_CHANNEL_ID = None + + # Make sure BUG_CHANNEL_ID is either None or 19 characters long + try: + if len(os.environ["BUG_CHANNEL_ID"]) != 19: + LOG.error("BUG_CHANNEL_ID is not a valid Discord channel ID.") + errors += 1 + else: + BUG_CHANNEL_ID = int(os.environ["BUG_CHANNEL_ID"]) + except KeyError: + BUG_CHANNEL_ID = None + + if errors > 0: + sys.exit( + LOG.critical( + f"Found {errors} error(s) with environment variables. Please fix them and try again." + ) + ) + + # Assign the rest of the variables + TOKEN = os.environ["TOKEN"] + SPOTIFY_CLIENT_ID = os.environ["SPOTIFY_CLIENT_ID"] + SPOTIFY_CLIENT_SECRET = os.environ["SPOTIFY_CLIENT_SECRET"] + APPLE_MUSIC_KEY = os.environ["APPLE_MUSIC_KEY"] + OPENAI_API_KEY = os.environ["OPENAI_API_KEY"] + LAVALINK_HOST = os.environ["LAVALINK_HOST"] + LAVALINK_PORT = os.environ["LAVALINK_PORT"] + LAVALINK_PASSWORD = os.environ["LAVALINK_PASSWORD"] \ No newline at end of file diff --git a/code/utils/custom_sources.py b/code/utils/custom_sources.py new file mode 100644 index 0000000..96ae3a9 --- /dev/null +++ b/code/utils/custom_sources.py @@ -0,0 +1,203 @@ +from lavalink import LoadResult, LoadType, Source, DeferredAudioTrack, PlaylistInfo + + +class LoadError(Exception): # We'll raise this if we have trouble loading our track. + pass + + +""" +Retrieve the playback URL for a custom track +""" + + +class CustomAudioTrack(DeferredAudioTrack): + # A DeferredAudioTrack allows us to load metadata now, and a playback URL later. + # This makes the DeferredAudioTrack highly efficient, particularly in cases + # where large playlists are loaded. + + async def load( + self, client + ): # Load our 'actual' playback track using the metadata from this one. + ytsearch = f"ytsearch:{self.title} {self.author} audio" + results = await client.get_tracks(ytsearch) + if not results.tracks or results.load_type in ( + LoadType.EMPTY, + LoadType.ERROR, + ): + dzsearch = f"dzsearch:{self.title} {self.author}" + results = await client.get_tracks(dzsearch) + + if not results.tracks or results.load_type in ( + LoadType.EMPTY, + LoadType.ERROR, + ): + raise LoadError + + first_track = results.tracks[0] # Grab the first track from the results. + base64 = first_track.track # Extract the base64 string from the track. + self.track = base64 # We'll store this for later, as it allows us to save making network requests + # if this track is re-used (e.g. repeat). + + return base64 + + +""" +Custom Source for Spotify links +""" + + +class SpotifySource(Source): + def __init__(self): + super().__init__( + name="custom" + ) # Initialising our custom source with the name 'custom'. + + async def load_item(self, user, metadata): + track = CustomAudioTrack( + { # Create an instance of our CustomAudioTrack. + "identifier": metadata[ + "id" + ], # Fill it with metadata that we've obtained from our source's provider. + "isSeekable": True, + "author": metadata["artists"][0]["name"], + "length": metadata["duration_ms"], + "isStream": False, + "title": metadata["name"], + "uri": metadata["external_urls"]["spotify"], + "duration": metadata["duration_ms"], + "artworkUrl": metadata["album"]["images"][0]["url"], + }, + requester=user, + ) + return LoadResult(LoadType.TRACK, [track], playlist_info=PlaylistInfo.none()) + + async def load_album(self, user, metadata): + tracks = [] + for track in metadata["tracks"][ + "items" + ]: # Loop through each track in the album. + tracks.append( + CustomAudioTrack( + { # Create an instance of our CustomAudioTrack. + "identifier": track[ + "id" + ], # Fill it with metadata that we've obtained from our source's provider. + "isSeekable": True, + "author": track["artists"][0]["name"], + "length": track["duration_ms"], + "isStream": False, + "title": track["name"], + "uri": track["external_urls"]["spotify"], + "duration": track["duration_ms"], + "artworkUrl": metadata["images"][0]["url"], + }, + requester=user, + ) + ) + + return LoadResult(LoadType.PLAYLIST, tracks, playlist_info=PlaylistInfo.none()) + + async def load_playlist(self, user, metadata): + tracks = [] + for track in metadata["tracks"][ + "items" + ]: # Loop through each track in the playlist. + tracks.append( + CustomAudioTrack( + { # Create an instance of our CustomAudioTrack. + "identifier": track["track"][ + "id" + ], # Fill it with metadata that we've obtained from our source's provider. + "isSeekable": True, + "author": track["track"]["artists"][0]["name"], + "length": track["track"]["duration_ms"], + "isStream": False, + "title": track["track"]["name"], + "uri": track["track"]["external_urls"]["spotify"], + "duration": track["track"]["duration_ms"], + "artworkUrl": track["track"]["album"]["images"][0]["url"], + }, + requster=user, + ) + ) + + return LoadResult(LoadType.PLAYLIST, tracks, playlist_info=PlaylistInfo.none()) + + +""" +Custom Source for Apple Music links +""" + + +class AppleSource(Source): + def __init__(self): + super().__init__(name="custom") + + async def load_item(self, user, metadata): + track = CustomAudioTrack( + { # Create an instance of our CustomAudioTrack. + "identifier": metadata["data"][0]["id"], + "isSeekable": True, + "author": metadata["data"][0]["attributes"]["artistName"], + "length": metadata["data"][0]["attributes"]["durationInMillis"], + "isStream": False, + "title": metadata["data"][0]["attributes"]["name"], + "uri": metadata["data"][0]["attributes"]["url"], + "duration": metadata["data"][0]["attributes"]["durationInMillis"], + "artworkUrl": metadata["data"][0]["attributes"]["artwork"]["url"].replace( + "{w}x{h}", "300x300" + ), + }, + requester=user, + ) + return LoadResult(LoadType.TRACK, [track], playlist_info=PlaylistInfo.none()) + + async def load_album(self, user, metadata): + tracks = [] + for track in metadata["data"][0]["relationships"]["tracks"][ + "data" + ]: # Loop through each track in the album. + tracks.append( + CustomAudioTrack( + { # Create an instance of our CustomAudioTrack. + "identifier": track["id"], + "isSeekable": True, + "author": track["attributes"]["artistName"], + "length": track["attributes"]["durationInMillis"], + "isStream": False, + "title": track["attributes"]["name"], + "uri": track["attributes"]["url"], + "duration": track["attributes"]["durationInMillis"], + "artworkUrl": track["attributes"]["artwork"]["url"].replace( + "{w}x{h}", "300x300" + ), + }, + requster=user, + ) + ) + + return LoadResult(LoadType.PLAYLIST, tracks, playlist_info=PlaylistInfo.none()) + + async def load_playlist(self, user, metadata): + tracks = [] + for track in metadata["data"]: # Loop through each track in the playlist. + tracks.append( + CustomAudioTrack( + { # Create an instance of our CustomAudioTrack. + "identifier": track["id"], + "isSeekable": True, + "author": track["attributes"]["artistName"], + "length": track["attributes"]["durationInMillis"], + "isStream": False, + "title": track["attributes"]["name"], + "uri": track["attributes"]["url"], + "duration": track["attributes"]["durationInMillis"], + "artworkUrl": track["attributes"]["artwork"]["url"].replace( + "{w}x{h}", "300x300" + ), + }, + requster=user, + ) + ) + + return LoadResult(LoadType.PLAYLIST, tracks, playlist_info=PlaylistInfo.none()) -- cgit v1.2.3-70-g09d2