From 382c42262954767acc3f5ee3e008e03acbfc4f26 Mon Sep 17 00:00:00 2001 From: Parker Date: Thu, 28 Nov 2024 00:04:48 -0600 Subject: [PATCH] Overhaul \`play\` command - Split custom sources into helper functions - Add proper logging and handling - Fix LoadError embed messsage --- code/cogs/play.py | 426 ++++-------------- code/utils/command_tree.py | 22 +- code/utils/source_helpers/apple/album.py | 79 ++++ code/utils/source_helpers/apple/playlist.py | 90 ++++ code/utils/source_helpers/apple/song.py | 74 +++ code/utils/source_helpers/parse.py | 80 ++++ code/utils/source_helpers/spotify/album.py | 73 +++ code/utils/source_helpers/spotify/playlist.py | 73 +++ code/utils/source_helpers/spotify/song.py | 68 +++ test.py | 11 + 10 files changed, 655 insertions(+), 341 deletions(-) create mode 100644 code/utils/source_helpers/apple/album.py create mode 100644 code/utils/source_helpers/apple/playlist.py create mode 100644 code/utils/source_helpers/apple/song.py create mode 100644 code/utils/source_helpers/parse.py create mode 100644 code/utils/source_helpers/spotify/album.py create mode 100644 code/utils/source_helpers/spotify/playlist.py create mode 100644 code/utils/source_helpers/spotify/song.py create mode 100644 test.py diff --git a/code/cogs/play.py b/code/cogs/play.py index 82ac214..38a91b8 100644 --- a/code/cogs/play.py +++ b/code/cogs/play.py @@ -4,11 +4,14 @@ from discord import app_commands from discord.ext import commands from lavalink import LoadType import re -import requests from cogs.music import Music, LavalinkVoiceClient from utils.config import BOT_COLOR, YOUTUBE_SUPPORT -from utils.custom_sources import SpotifySource, AppleSource +from utils.custom_sources import ( + LoadError, + CustomAudioTrack, +) +from utils.source_helpers.parse import parse_custom_source url_rx = re.compile(r"https?://(?:www\.)?.+") @@ -43,366 +46,137 @@ class Play(commands.Cog): embed=embed, ephemeral=True ) - ### - ### APPLE MUSIC links, perform API requests and load all tracks from the playlist/album/track - ### - if "music.apple.com" in query: - if not self.bot.apple_headers: - embed = discord.Embed( - title="Apple Music Error", - description=( - "Apple Music support seems to be broken at the moment." - " Please try again and fill out a bug report with" - " if this continues to" - " happen." - ), - color=BOT_COLOR, - ) - return await interaction.response.send_message( - embed=embed, ephemeral=True - ) - - embed = discord.Embed(color=BOT_COLOR) - - if "/playlist/" in query and "?i=" not in query: - playlist_id = query.split("/playlist/")[1].split("/")[1] - # Get all of the tracks in the playlist (limit at 250) - playlist_url = f"https://api.music.apple.com/v1/catalog/us/playlists/{playlist_id}/tracks?limit=100" - response = requests.get( - playlist_url, headers=self.bot.apple_headers - ) - if response.status_code == 200: - playlist = response.json() - # Get the general playlist info (name, artwork) - playlist_info_url = f"https://api.music.apple.com/v1/catalog/us/playlists/{playlist_id}" - playlist_info = requests.get( - playlist_info_url, headers=self.bot.apple_headers - ) - playlist_info = playlist_info.json() - try: - artwork_url = playlist_info["data"][0]["attributes"][ - "artwork" - ]["url"].replace("{w}x{h}", "300x300") - except KeyError: - artwork_url = None - - embed.title = "Playlist Queued" - embed.description = ( - f"**{playlist_info['data'][0]['attributes']['name']}**\n`" - f" {len(playlist['data'])} ` tracks\n\nQueued by:" - f" {interaction.user.mention}" - ) - embed.set_thumbnail(url=artwork_url) - embed.set_footer( - text=datetime.datetime.now( - datetime.timezone.utc - ).strftime("%Y-%m-%d %H:%M:%S") - + " UTC" - ) - # Add small alert if the playlist is the max size - if len(playlist["data"]) == 100: - embed.description += ( - "\n\n*This playlist is longer than the 100 song" - " maximum. Only the first 100 songs will be" - " queued.*" - ) - - await interaction.response.send_message(embed=embed) - - tracks = await AppleSource.load_playlist( - self, interaction.user, playlist - ) - for track in tracks["tracks"]: - player.add(requester=interaction.user, track=track) - - # If there is an album, not a specific song within the album - if "/album/" in query and "?i=" not in query: - album_id = query.split("/album/")[1].split("/")[1] - album_url = f"https://api.music.apple.com/v1/catalog/us/albums/{album_id}" - response = requests.get( - album_url, headers=self.bot.apple_headers - ) - if response.status_code == 200: - album = response.json() - - embed.title = "Album Queued" - embed.description = ( - f"**{album['data'][0]['attributes']['name']}** by" - f" **{album['data'][0]['attributes']['artistName']}**\n`" - f" {len(album['data'][0]['relationships']['tracks']['data'])} `" - f" tracks\n\nQueued by: {interaction.user.mention}" - ) - embed.set_thumbnail( - url=album["data"][0]["attributes"]["artwork"][ - "url" - ].replace("{w}x{h}", "300x300") - ) - embed.set_footer( - text=datetime.datetime.now( - datetime.timezone.utc - ).strftime("%Y-%m-%d %H:%M:%S") - + " UTC" - ) - await interaction.response.send_message(embed=embed) - - tracks = await AppleSource.load_album( - self, interaction.user, album - ) - for track in tracks["tracks"]: - player.add(requester=interaction.user, track=track) - - # If there is a specific song - if "/album/" in query and "?i=" in query: - song_id = query.split("/album/")[1].split("?i=")[1] - song_url = f"https://api.music.apple.com/v1/catalog/us/songs/{song_id}" - response = requests.get( - song_url, headers=self.bot.apple_headers - ) - if response.status_code == 200: - song = response.json() - - embed.title = "Song Queued" - embed.description = ( - f"**{song['data'][0]['attributes']['name']}** by" - f" **{song['data'][0]['attributes']['artistName']}**\n\nQueued" - f" by: {interaction.user.mention}" - ) - embed.set_thumbnail( - url=song["data"][0]["attributes"]["artwork"][ - "url" - ].replace("{w}x{h}", "300x300") - ) - embed.set_footer( - text=datetime.datetime.now( - datetime.timezone.utc - ).strftime("%Y-%m-%d %H:%M:%S") - + " UTC" - ) - await interaction.response.send_message(embed=embed) - - results = await AppleSource.load_item( - self, interaction.user, song - ) - player.add( - requester=interaction.user, track=results.tracks[0] - ) - - ### - ### SPOTIFY links, perform API requests and load all tracks from the playlist/album/track - ### + results, embed = await parse_custom_source( + self, "apple", query, interaction.user + ) elif "open.spotify.com" in query: - if not self.bot.spotify_headers: - embed = discord.Embed( - title="Spotify Error", - description=( - "Spotify support seems to be broken at the moment." - " Please try again and fill out a bug report with" - " if this continues to" - " happen." - ), - color=BOT_COLOR, - ) - return await interaction.response.send_message( - embed=embed, ephemeral=True - ) - - embed = discord.Embed(color=BOT_COLOR) - - if "open.spotify.com/playlist" in query: - playlist_id = query.split("playlist/")[1].split("?si=")[0] - playlist_url = ( - f"https://api.spotify.com/v1/playlists/{playlist_id}" - ) - response = requests.get( - playlist_url, headers=self.bot.spotify_headers - ) - if response.status_code == 200: - playlist = response.json() - - embed.title = "Playlist Queued" - embed.description = ( - f"**{playlist['name']}** from" - f" **{playlist['owner']['display_name']}**\n`" - f" {len(playlist['tracks']['items'])} `" - f" tracks\n\nQueued by: {interaction.user.mention}" - ) - embed.set_thumbnail(url=playlist["images"][0]["url"]) - embed.set_footer( - text=datetime.datetime.now( - datetime.timezone.utc - ).strftime("%Y-%m-%d %H:%M:%S") - + " UTC" - ) - await interaction.response.send_message(embed=embed) - - tracks = await SpotifySource.load_playlist( - self, interaction.user, playlist - ) - for track in tracks["tracks"]: - player.add(requester=interaction.user, track=track) - - if "open.spotify.com/album" in query: - album_id = query.split("album/")[1] - album_url = f"https://api.spotify.com/v1/albums/{album_id}" - response = requests.get( - album_url, headers=self.bot.spotify_headers - ) - if response.status_code == 200: - album = response.json() - - embed.title = "Album Queued" - embed.description = ( - f"**{album['name']}** by" - f" **{album['artists'][0]['name']}**\n`" - f" {len(album['tracks']['items'])} ` tracks\n\nQueued" - f" by: {interaction.user.mention}" - ) - embed.set_thumbnail(url=album["images"][0]["url"]) - embed.set_footer( - text=datetime.datetime.now( - datetime.timezone.utc - ).strftime("%Y-%m-%d %H:%M:%S") - + " UTC" - ) - await interaction.response.send_message(embed=embed) - - tracks = await SpotifySource.load_album( - self, interaction.user, album - ) - for track in tracks["tracks"]: - player.add(requester=interaction.user, track=track) - - if "open.spotify.com/track" in query: - track_id = query.split("track/")[1] - track_url = f"https://api.spotify.com/v1/tracks/{track_id}" - response = requests.get( - track_url, headers=self.bot.spotify_headers - ) - if response.status_code == 200: - track = response.json() - - embed.title = "Track Queued" - embed.description = ( - f"**{track['name']}** by" - f" **{track['artists'][0]['name']}**\n\nQueued by:" - f" {interaction.user.mention}" - ) - embed.set_thumbnail(url=track["album"]["images"][0]["url"]) - embed.set_footer( - text=datetime.datetime.now( - datetime.timezone.utc - ).strftime("%Y-%m-%d %H:%M:%S") - + " UTC" - ) - await interaction.response.send_message(embed=embed) - - results = await SpotifySource.load_item( - self, interaction.user, track - ) - player.add( - requester=interaction.user, track=results.tracks[0] - ) - - if "open.spotify.com/artists" in query: - embed.title = "Artists Cannot Be Played" - embed.description = ( - "I cannot play just artists, you must provide a" - " song/album/playlist. Please try again." - ) - return await interaction.response.send_message( - embed=embed, ephemeral=True - ) + results, embed = await parse_custom_source( + self, "spotify", query, interaction.user + ) ### ### For anything else, use default Lavalink providers to search the query ### else: + # If the query is not a URL, begin searching if not url_rx.match(query): dzsearch = f"dzsearch:{query}" results = await player.node.get_tracks(dzsearch) - + # If Deezer returned nothing if not results.tracks or results.load_type in ( LoadType.EMPTY, LoadType.ERROR, ): - ytmsearch = f"ytmsearch:{query}" - results = await player.node.get_tracks(ytmsearch) - - if not results.tracks or results.load_type in ( - LoadType.EMPTY, - LoadType.ERROR, - ): - ytsearch = f"ytsearch:{query}" - results = await player.node.get_tracks(ytsearch) + if YOUTUBE_SUPPORT: + ytmsearch = f"ytmsearch:{query}" + results = await player.node.get_tracks(ytmsearch) + # If YouTube Music returned nothing + if not results.tracks or results.load_type in ( + LoadType.EMPTY, + LoadType.ERROR, + ): + # Final search attempt with YouTube + ytsearch = f"ytsearch:{query}" + results = await player.node.get_tracks(ytsearch) else: results = await player.node.get_tracks(query) - embed = discord.Embed(color=BOT_COLOR) - + # If there are no results found, set results/embed to None, handled further down if not results.tracks or results.load_type in ( LoadType.EMPTY, LoadType.ERROR, ): - embed.title = "Nothing Found" - embed.description = ( - "Nothing for that query could be found. If this continues" - " happening for other songs, please run" - " to let the developer know." - ) - return await interaction.response.send_message( - embed=embed, ephemeral=True - ) + results, embed = None, None - elif results.load_type == LoadType.PLAYLIST: - tracks = results.tracks - - for track in tracks: - player.add(requester=interaction.user, track=track) - - embed.title = "Songs Queued!" - embed.description = ( - f"**{results.playlist_info.name}**\n` {len(tracks)} `" - f" tracks\n\nQueued by: {interaction.user.mention}" + # Create the embed if the results are a playlist + if results.load_type == LoadType.PLAYLIST: + embed = discord.Embed( + title="Songs Queued!", + description=( + f"**{results.playlist_info.name}**\n" + f"` {len(results.tracks)} ` tracks\n\n" + f"Queued by: {interaction.user.mention}" + ), + color=BOT_COLOR, ) embed.set_footer( - text=datetime.datetime.now(datetime.timezone.utc).strftime( + text=datetime.datetime.utcnow().strftime( + "%Y-%m-%d %H:%M:%S" + ) + + " UTC" + ) + await interaction.response.send_message(embed=embed) + # Otherwise, the result is just a single track, create that embed + else: + # Remove all but first track (most relevant result) + results.tracks = results.tracks[:1] + embed = discord.Embed( + title="Song Queued!", + description=( + f"**{results.tracks[0].title}** by" + f" **{results.tracks[0].author}**\n\nQueued by:" + f" {interaction.user.mention}" + ), + color=BOT_COLOR, + ) + embed.set_thumbnail(url=results.tracks[0].artwork_url) + embed.set_footer( + text=datetime.datetime.utcnow().strftime( "%Y-%m-%d %H:%M:%S" ) + " UTC" ) await interaction.response.send_message(embed=embed) - else: - track = results.tracks[0] + # If there are no results, and no embed + if not results and not embed: + embed = discord.Embed( + title="Nothing Found", + description=( + "I was not able to find or load any songs for that query." + " Please try again and fill out a bug report with" + " if this continues to happen." + ), + color=BOT_COLOR, + ) + return await interaction.response.send_message( + embed=embed, ephemeral=True + ) + # If there are no results, but there is an embed (error msg) + elif embed and not results: + return await interaction.response.send_message( + embed=embed, ephemeral=True + ) + # If there are results, add them to the player + else: + for track in results.tracks: player.add(requester=interaction.user, track=track) - embed.title = "Track Queued" - embed.description = ( - f"**{track.title}** by **{track.author}**\n\nQueued by:" - f" {interaction.user.mention}" - ) - embed.set_thumbnail(url=track.artwork_url) - embed.set_footer( - text=datetime.datetime.now(datetime.timezone.utc).strftime( - "%Y-%m-%d %H:%M:%S" - ) - + " UTC" - ) - await interaction.response.send_message(embed=embed) + # If the track is CustomAudioTrack (Apple Music/Spotify) + if type(results.tracks[0]) == CustomAudioTrack: + # Attempt to load an actual track from a provider + try: + await results.tracks[0].load(player.node) + # If it fails, remove it from the queue and alert the user + except LoadError: + player.queue.remove(results.tracks[0]) + raise LoadError - # Only join the voice channel now, so that the bot doesn't join if nothing is found - if not interaction.guild.voice_client: - await interaction.user.voice.channel.connect( - cls=LavalinkVoiceClient, self_deaf=True - ) + # Join the voice channel if not already connected + if not interaction.guild.voice_client: + await interaction.user.voice.channel.connect( + cls=LavalinkVoiceClient, self_deaf=True + ) - # We don't want to call .play() if the player is playing as that will - # effectively skip the current track - if not player.is_playing: - await player.play() + # Only call player.play if it is not already playing, otherwise it will + # effectively skip the current track + if not player.is_playing: + await player.play() + + await interaction.response.send_message(embed=embed) async def setup(bot): diff --git a/code/utils/command_tree.py b/code/utils/command_tree.py index 3d9214d..7ccb8b1 100644 --- a/code/utils/command_tree.py +++ b/code/utils/command_tree.py @@ -84,31 +84,23 @@ class Tree(app_commands.CommandTree): except discord.errors.InteractionResponded: await interaction.followup.send(embed=embed, ephemeral=True) - # If a Spotify song is linked but cannot be found on a provider (e.g. YouTube) - elif isinstance(error, LoadError): + elif (error, LoadError): embed = discord.Embed( - title="Nothing Found", + title="Load Error", description=( - "Spotify does not allow direct play, meaning songs have to" - " be found on a supported provider. In this case, the song" - " couldn't be found. Please try again with a different" - " song, or try searching for just the name and artist" - " manually rather than sending a link." + "Apple Music and Spotify do not allow direct playing from" + " their websites, and I was unable to load a track on a" + " valid source. Please try again." ), color=BOT_COLOR, ) - embed.set_footer( - text=datetime.datetime.now(datetime.timezone.utc).strftime( - "%Y-%m-%d %H:%M:%S" - ) - + " UTC" - ) + # Only send the error if the interaction is still valid try: await interaction.response.send_message( embed=embed, ephemeral=True ) except discord.errors.InteractionResponded: - await interaction.followup.send(embed=embed, ephemeral=True) + pass else: raise error diff --git a/code/utils/source_helpers/apple/album.py b/code/utils/source_helpers/apple/album.py new file mode 100644 index 0000000..3195e2d --- /dev/null +++ b/code/utils/source_helpers/apple/album.py @@ -0,0 +1,79 @@ +import datetime +import discord +import requests +from typing import Tuple, Optional +from requests.exceptions import JSONDecodeError + +from utils.config import BOT_COLOR, LOG + + +async def load( + headers: dict, + query: str, + user: discord.User, +) -> Tuple[Optional[dict], Optional[discord.Embed]]: + """ + Get the album info from the Apple Music API + """ + album_id = query.split("/album/")[1].split("/")[1] + + try: + # Get the album info + response = requests.get( + f"https://api.music.apple.com/v1/catalog/us/albums/{album_id}", + headers=headers, + ) + + if response.status_code == 404: + embed = discord.Embed( + title="Album Not Found", + description=( + "The album could not be found as the provided link is" + " invalid. Please try again." + ), + color=BOT_COLOR, + ) + return None, embed + + if response.status_code == 401: + LOG.error( + "Could not authorize with Apple Music API. Likely need to" + " restart the bot." + ) + return None, None + + response.raise_for_status() + # Unpack the album info + album = response.json() + name = album["data"][0]["attributes"]["name"] + artist = album["data"][0]["attributes"]["artistName"] + num_tracks = len(album["data"][0]["relationships"]["tracks"]["data"]) + except IndexError: + LOG.error("Failed unpacking Apple Music album info") + return None, None + except (JSONDecodeError, requests.HTTPError): + LOG.error("Failed making request to Apple Music API") + return None, None + + # Extract artwork URL, if available + artwork_url = ( + album["data"][0]["attributes"].get("artwork", {}).get("url", None) + ) + if artwork_url: + artwork_url = artwork_url.replace("{w}x{h}", "300x300") + + embed = discord.Embed( + title="Album Queued", + description=( + f"**{name}** by **{artist}**\n" + f"` {num_tracks} ` tracks\n\n" + f"Queued by: {user.mention}" + ), + color=BOT_COLOR, + ) + embed.set_thumbnail(url=artwork_url) + embed.set_footer( + text=datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + " UTC" + ) + + return album, embed diff --git a/code/utils/source_helpers/apple/playlist.py b/code/utils/source_helpers/apple/playlist.py new file mode 100644 index 0000000..8a27315 --- /dev/null +++ b/code/utils/source_helpers/apple/playlist.py @@ -0,0 +1,90 @@ +import datetime +import discord +import requests +from typing import Tuple, Optional +from requests.exceptions import JSONDecodeError + +from utils.config import BOT_COLOR, LOG + + +async def load( + headers: dict, + query: str, + user: discord.User, +) -> Tuple[Optional[dict], Optional[discord.Embed]]: + """ + Get the playlist info from the Apple Music API + """ + playlist_id = query.split("/playlist/")[1].split("/")[1] + try: + # Get all of the tracks in the playlist (limit at 100) + response = requests.get( + f"https://api.music.apple.com/v1/catalog/us/playlists/{playlist_id}/tracks?limit=100", + headers=headers, + ) + + if response.status_code == 404: + embed = discord.Embed( + title="Playlist Not Found", + description=( + "The playlist could not be found as the provided link is" + " invalid. Please try again." + ), + color=BOT_COLOR, + ) + return None, embed + + if response.status_code == 401: + LOG.error( + "Could not authorize with Apple Music API. Likely need to" + " restart the bot." + ) + return None, None + + response.raise_for_status() + playlist = response.json() + + # Get the general playlist info (name, artwork) + response = requests.get( + f"https://api.music.apple.com/v1/catalog/us/playlists/{playlist_id}", + headers=headers, + ) + + response.raise_for_status() + # Unpack the playlist info + playlist_info = response.json() + name = playlist_info["data"][0]["attributes"]["name"] + num_tracks = len(playlist["data"]) + except IndexError: + LOG.error("Failed unpacking Apple Music playlist info") + return None, None + except (JSONDecodeError, requests.HTTPError): + LOG.error("Failed making request to Apple Music API") + return None, None + + # Extract artwork URL, if available + artwork_url = ( + playlist_info["data"][0]["attributes"] + .get("artwork", {}) + .get("url", None) + ) + if artwork_url: + artwork_url = artwork_url.replace("{w}x{h}", "300x300") + + embed = discord.Embed( + title="Playlist Queued", + description=( + f"**{name}**\n` {num_tracks} ` tracks\n\nQueued by: {user.mention}" + ), + color=BOT_COLOR, + ) + + # Add small alert if the playlist is the max size + if len(playlist["data"]) == 100: + embed.description += ( + "\n\n*This playlist is longer than the 100 song" + " maximum. Only the first 100 songs will be" + " queued.*" + ) + + return playlist, embed diff --git a/code/utils/source_helpers/apple/song.py b/code/utils/source_helpers/apple/song.py new file mode 100644 index 0000000..55db003 --- /dev/null +++ b/code/utils/source_helpers/apple/song.py @@ -0,0 +1,74 @@ +import datetime +import discord +import requests +from typing import Tuple, Optional +from requests.exceptions import JSONDecodeError + +from utils.config import BOT_COLOR, LOG + + +async def load( + headers: dict, + query: str, + user: discord.User, +) -> Tuple[Optional[dict], Optional[discord.Embed]]: + """ + Get the song info from the Apple Music API + """ + song_id = query.split("/album/")[1].split("?i=")[1] + + try: + # Get the song info + response = requests.get( + f"https://api.music.apple.com/v1/catalog/us/songs/{song_id}", + headers=headers, + ) + + if response.status_code == 404: + embed = discord.Embed( + title="Song Not Found", + description=( + "The song could not be found as the provided link is" + " invalid. Please try again." + ), + color=BOT_COLOR, + ) + return None, embed + + if response.status_code == 401: + LOG.error( + "Could not authorize with Apple Music API. Likely need to" + " restart the bot." + ) + return None, None + + response.raise_for_status() + # Unpack the song info + song = response.json() + name = song["data"][0]["attributes"]["name"] + artist = song["data"][0]["attributes"]["artistName"] + except IndexError: + LOG.error("Failed unpacking Apple Music song info") + return None, None + except (JSONDecodeError, requests.HTTPError): + LOG.error("Failed making request to Apple Music API") + return None, None + + # Extract artwork URL, if available + artwork_url = ( + song["data"][0]["attributes"].get("artwork", {}).get("url", None) + ) + if artwork_url: + artwork_url = artwork_url.replace("{w}x{h}", "300x300") + + embed = discord.Embed( + title="Song Queued", + description=f"**{name}** by **{artist}**\n\nQueued by: {user.mention}", + color=BOT_COLOR, + ) + embed.set_thumbnail(url=artwork_url) + embed.set_footer( + text=datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + " UTC" + ) + + return song, embed diff --git a/code/utils/source_helpers/parse.py b/code/utils/source_helpers/parse.py new file mode 100644 index 0000000..8489515 --- /dev/null +++ b/code/utils/source_helpers/parse.py @@ -0,0 +1,80 @@ +import discord + +from utils.source_helpers.apple import ( + album as apple_album, + playlist as apple_playlist, + song as apple_song, +) +from utils.source_helpers.spotify import ( + album as spotify_album, + playlist as spotify_playlist, + song as spotify_song, +) +from utils.custom_sources import AppleSource, SpotifySource + + +async def parse_custom_source( + self, provider: str, query: str, user: discord.User +): + """ + Parse the query and run the appropriate functions to get the results/info + + Return the results and an embed or None, None + """ + load_funcs = { + "apple": { + "album": apple_album.load, + "playlist": apple_playlist.load, + "song": apple_song.load, + }, + "spotify": { + "album": spotify_album.load, + "playlist": spotify_playlist.load, + "song": spotify_song.load, + }, + } + + headers = { + "apple": self.bot.apple_headers, + "spotify": self.bot.spotify_headers, + } + + sources = { + "apple": AppleSource, + "spotify": SpotifySource, + } + + # Catch all songs + if "?i=" in query or "/track/" in query: + song, embed = await load_funcs[provider]["song"]( + headers[provider], query, user + ) + + if song: + results = await sources[provider].load_item(self, user, song) + else: + return None, embed + # Catch all playlists + elif "/playlist/" in query: + playlist, embed = await load_funcs[provider]["playlist"]( + headers[provider], query, user + ) + + if playlist: + results = await sources[provider].load_playlist( + self, user, playlist + ) + else: + return None, embed + # Catch all albums + elif "/album/" in query: + album, embed = await load_funcs[provider]["album"]( + headers[provider], query, user + ) + + if album: + results = await sources[provider].load_album(self, user, album) + else: + return None, embed + + return results, embed diff --git a/code/utils/source_helpers/spotify/album.py b/code/utils/source_helpers/spotify/album.py new file mode 100644 index 0000000..cf527ed --- /dev/null +++ b/code/utils/source_helpers/spotify/album.py @@ -0,0 +1,73 @@ +import datetime +import discord +import requests +from typing import Tuple, Optional +from requests.exceptions import JSONDecodeError + +from utils.config import BOT_COLOR, LOG + + +async def load( + headers: dict, + query: str, + user: discord.User, +) -> Tuple[Optional[dict], Optional[discord.Embed]]: + """ + Get the album info from the Spotify API + """ + album_id = query.split("/album/")[1] + + try: + # Get the album info + response = requests.get( + f"https://api.spotify.com/v1/albums/{album_id}", + headers=headers, + ) + + if response.status_code == 404: + embed = discord.Embed( + title="Album Not Found", + description=( + "The album could not be found as the provided link is" + " invalid. Please try again." + ), + color=BOT_COLOR, + ) + return None, embed + + if response.status_code == 401: + LOG.error( + "Could not authorize with Spotify API. Likely need to" + " restart the bot." + ) + return None, None + + response.raise_for_status() + # Unpack the album info + album = response.json() + name = album["name"] + artist = album["artists"][0]["name"] + num_tracks = len(album["tracks"]["items"]) + artwork_url = album["images"][0]["url"] + except IndexError: + LOG.error("Failed unpacking Spotify album info") + return None, None + except (JSONDecodeError, requests.HTTPError): + LOG.error("Failed making request to Spotify API") + return None, None + + embed = discord.Embed( + title="Album Queued", + description=( + f"**{name}** by **{artist}**\n" + f"` {num_tracks} ` tracks\n\n" + f"Queued by: {user.mention}" + ), + color=BOT_COLOR, + ) + embed.set_thumbnail(url=artwork_url) + embed.set_footer( + text=datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + " UTC" + ) + + return album, embed diff --git a/code/utils/source_helpers/spotify/playlist.py b/code/utils/source_helpers/spotify/playlist.py new file mode 100644 index 0000000..c806e39 --- /dev/null +++ b/code/utils/source_helpers/spotify/playlist.py @@ -0,0 +1,73 @@ +import datetime +import discord +import requests +from typing import Tuple, Optional +from requests.exceptions import JSONDecodeError + +from utils.config import BOT_COLOR, LOG + + +async def load( + headers: dict, + query: str, + user: discord.User, +) -> Tuple[Optional[dict], Optional[discord.Embed]]: + """ + Get the playlist info from the Spotify API + """ + playlist_id = query.split("/playlist/")[1].split("?si=")[0] + + try: + # Get the playlist info + response = requests.get( + f"https://api.spotify.com/v1/playlists/{playlist_id}", + headers=headers, + ) + + if response.status_code == 404: + embed = discord.Embed( + title="Playlist Not Found", + description=( + "The playlist could not be found as the provided link is" + " invalid. Please try again." + ), + color=BOT_COLOR, + ) + return None, embed + + if response.status_code == 401: + LOG.error( + "Could not authorize with Spotify API. Likely need to" + " restart the bot." + ) + return None, None + + response.raise_for_status() + # Unpack the playlist info + playlist = response.json() + name = playlist["name"] + owner = playlist["owner"]["display_name"] + num_tracks = len(playlist["tracks"]["items"]) + artwork_url = playlist["images"][0]["url"] + except IndexError: + LOG.error("Failed unpacking Spotify playlist info") + return None, None + except (JSONDecodeError, requests.HTTPError): + LOG.error("Failed making request to Spotify API") + return None, None + + embed = discord.Embed( + title="Playlist Queued", + description=( + f"**{name}** from **{owner}**\n" + f"` {num_tracks} ` tracks\n\n" + f"Queued by {user.mention}" + ), + color=BOT_COLOR, + ) + embed.set_thumbnail(url=artwork_url) + embed.set_footer( + text=datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + " UTC" + ) + + return playlist, embed diff --git a/code/utils/source_helpers/spotify/song.py b/code/utils/source_helpers/spotify/song.py new file mode 100644 index 0000000..258b652 --- /dev/null +++ b/code/utils/source_helpers/spotify/song.py @@ -0,0 +1,68 @@ +import datetime +import discord +import requests +from typing import Tuple, Optional +from requests.exceptions import JSONDecodeError + +from utils.config import BOT_COLOR, LOG + + +async def load( + headers: dict, + query: str, + user: discord.User, +) -> Tuple[Optional[dict], Optional[discord.Embed]]: + """ + Get the song info from the Spotify API + """ + song_id = query.split("/track/")[1] + + try: + # Get the song info + response = requests.get( + f"https://api.spotify.com/v1/tracks/{song_id}", + headers=headers, + ) + + if response.status_code == 404: + embed = discord.Embed( + title="Song Not Found", + description=( + "The song could not be found as the provided link is" + " invalid. Please try again." + ), + color=BOT_COLOR, + ) + return None, embed + + if response.status_code == 401: + LOG.error( + "Could not authorize with Spotify API. Likely need to" + " restart the bot." + ) + return None, None + + response.raise_for_status() + # Unpack the song info + song = response.json() + name = song["name"] + artist = song["artists"][0]["name"] + artwork_url = song["album"]["images"][0]["url"] + except IndexError: + LOG.error("Failed unpacking Spotify song info") + return None, None + except (JSONDecodeError, requests.HTTPError): + LOG.error("Failed making request to Spotify API") + return None, None + + embed = discord.Embed( + title="Song Queued", + description=f"**{name}** by **{artist}**\n\nQueued by {user.mention}", + color=BOT_COLOR, + ) + embed.set_thumbnail(url=artwork_url) + embed.set_footer( + text=datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + " UTC" + ) + + return song, embed diff --git a/test.py b/test.py new file mode 100644 index 0000000..8b4d325 --- /dev/null +++ b/test.py @@ -0,0 +1,11 @@ +import datetime + +now = datetime.datetime.now(datetime.timezone.utc).strftime( + "%Y-%m-%d %H:%M:%S" +) + +import time + +print(now) +time.sleep(2) +print(now)