diff --git a/code/bot.py b/code/bot.py index 9ed2df6..21a0942 100644 --- a/code/bot.py +++ b/code/bot.py @@ -1,9 +1,10 @@ import discord -from discord.ext import commands +from discord.ext import commands, tasks import os +import requests from validate_config import create_config -from global_variables import LOG, BOT_TOKEN +from global_variables import LOG, BOT_TOKEN, SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET class MyBot(commands.Bot): @@ -16,6 +17,7 @@ class MyBot(commands.Bot): async def setup_hook(self): create_config() + get_access_token.start() for ext in os.listdir("./code/cogs"): if ext.endswith(".py"): await self.load_extension(f"cogs.{ext[:-3]}") @@ -33,5 +35,18 @@ async def on_ready(): LOG.info(f"{bot.user} has connected to Discord.") +@tasks.loop(minutes=45) +async def get_access_token(): + auth_url = "https://accounts.spotify.com/api/token" + data = { + "grant_type": "client_credentials", + "client_id": SPOTIFY_CLIENT_ID, + "client_secret": SPOTIFY_CLIENT_SECRET, + } + response = requests.post(auth_url, data=data) + access_token = response.json()["access_token"] + bot.spotify_headers = {"Authorization": f"Bearer {access_token}"} + + if __name__ == "__main__": bot.run(BOT_TOKEN) diff --git a/code/cogs/play.py b/code/cogs/play.py index 895f5ab..f25b3d8 100644 --- a/code/cogs/play.py +++ b/code/cogs/play.py @@ -6,8 +6,10 @@ from cogs.music import Music from lavalink import LoadType import re from cogs.music import LavalinkVoiceClient +import requests from global_variables import BOT_COLOR +from custom_source import CustomSource url_rx = re.compile(r"https?://(?:www\.)?.+") @@ -42,57 +44,148 @@ class Play(commands.Cog): ) return await interaction.response.send_message(embed=embed) - if not url_rx.match(query): - ytsearch = f"scsearch:{query}" - results = await player.node.get_tracks(ytsearch) + # If a Spotify link is found, act accordingly + # We use a custom source for this (I tried the LavaSrc plugin, but Spotify + # links would just result in random shit being played whenever ytsearch was removed) + if "open.spotify.com" in query: + embed = discord.Embed(color=BOT_COLOR) + + if "open.spotify.com/playlist" in query: + playlist_id = query.split("playlist/")[1].split("?si=")[0] + playlist_url = f"https://api.spotify.com/v1/playlists/{playlist_id}" + response = requests.get(playlist_url, headers=self.bot.spotify_headers) + if response.status_code == 200: + playlist = response.json() + + embed.title = "Playlist Queued" + embed.description = f"**{playlist['name']}** from **{playlist['owner']['display_name']}**\n` {len(playlist['tracks']['items'])} ` tracks\n\nQueued by: {interaction.user.mention}" + embed.set_thumbnail(url=playlist["images"][0]["url"]) + embed.set_footer( + text=datetime.datetime.now(datetime.timezone.utc).strftime( + "%Y-%m-%d %H:%M:%S" + ) + + " UTC" + ) + await interaction.response.send_message(embed=embed) + + tracks = await CustomSource.load_playlist( + self, interaction.user, playlist + ) + for track in tracks["tracks"]: + player.add(requester=interaction.user, track=track) + + if "open.spotify.com/album" in query: + album_id = query.split("album/")[1] + album_url = f"https://api.spotify.com/v1/albums/{album_id}" + response = requests.get(album_url, headers=self.bot.spotify_headers) + if response.status_code == 200: + album = response.json() + + embed.title = "Album Queued" + embed.description = f"**{album['name']}** by **{album['artists'][0]['name']}**\n` {len(album['tracks']['items'])} ` tracks\n\nQueued by: {interaction.user.mention}" + embed.set_thumbnail(url=album["images"][0]["url"]) + embed.set_footer( + text=datetime.datetime.now(datetime.timezone.utc).strftime( + "%Y-%m-%d %H:%M:%S" + ) + + " UTC" + ) + await interaction.response.send_message(embed=embed) + + tracks = await CustomSource.load_album( + self, interaction.user, album + ) + for track in tracks["tracks"]: + player.add(requester=interaction.user, track=track) + + if "open.spotify.com/track" in query: + track_id = query.split("track/")[1] + track_url = f"https://api.spotify.com/v1/tracks/{track_id}" + response = requests.get(track_url, headers=self.bot.spotify_headers) + if response.status_code == 200: + track = response.json() + + embed.title = "Track Queued" + embed.description = f"**{track['name']}** by **{track['artists'][0]['name']}**\n\nQueued by: {interaction.user.mention}" + embed.set_thumbnail(url=track["album"]["images"][0]["url"]) + embed.set_footer( + text=datetime.datetime.now(datetime.timezone.utc).strftime( + "%Y-%m-%d %H:%M:%S" + ) + + " UTC" + ) + await interaction.response.send_message(embed=embed) + + results = await CustomSource.load_item( + self, interaction.user, track + ) + player.add(requester=interaction.user, track=results.tracks[0]) + + if "open.spotify.com/artists" in query: + embed.title = "Artists Cannot Be Played" + embed.description = "I cannot play just artists, you must provide a song/album/playlist. Please try again." + return await interaction.response.send_message( + embed=embed, ephemeral=True + ) + + # For anything else, do default searches and attempt to find track and play + + else: + if not url_rx.match(query): + ytsearch = f"scsearch:{query}" + results = await player.node.get_tracks(ytsearch) + + if not results.tracks or results.load_type in ( + LoadType.EMPTY, + LoadType.ERROR, + ): + scsearch = f"dzsearch:{query}" + results = await player.node.get_tracks(scsearch) + else: + results = await player.node.get_tracks(query) + + embed = discord.Embed(color=BOT_COLOR) if not results.tracks or results.load_type in ( LoadType.EMPTY, LoadType.ERROR, ): - scsearch = f"dzsearch:{query}" - results = await player.node.get_tracks(scsearch) - else: - results = await player.node.get_tracks(query) + embed.title = "Nothing Found" + embed.description = "Nothing for that query could be found. If this continues happening for other songs, please run `/bug` to let the developer know." + return await interaction.response.send_message( + embed=embed, ephemeral=True + ) - embed = discord.Embed(color=BOT_COLOR) + elif results.load_type == LoadType.PLAYLIST: + tracks = results.tracks - if not results.tracks or results.load_type in (LoadType.EMPTY, LoadType.ERROR): - embed.title = "Nothing Found" - embed.description = "Nothing for that query could be found. If this continues happening for other songs, please run `/bug` to let the developer know." - return await interaction.response.send_message(embed=embed, ephemeral=True) + for track in tracks: + player.add(requester=interaction.user, track=track) - elif results.load_type == LoadType.PLAYLIST: - tracks = results.tracks + embed.title = "Songs Queued!" + embed.description = f"**{results.playlist_info.name}**\n` {len(tracks)} ` tracks\n\nQueued by: {interaction.user.mention}" + embed.set_footer( + text=datetime.datetime.now(datetime.timezone.utc).strftime( + "%Y-%m-%d %H:%M:%S" + ) + + " UTC" + ) + await interaction.response.send_message(embed=embed) - for track in tracks: + else: + track = results.tracks[0] player.add(requester=interaction.user, track=track) - embed.title = "Songs Queued!" - embed.description = f"**{results.playlist_info.name}**\n` {len(tracks)} ` tracks\n\nQueued by: {interaction.user.mention}" - embed.set_footer( - text=datetime.datetime.now(datetime.timezone.utc).strftime( - "%Y-%m-%d %H:%M:%S" + embed.title = "Track Queued" + embed.description = f"**{track.title}** by **{track.author}**\n\nQueued by: {interaction.user.mention}" + embed.set_thumbnail(url=track.artwork_url) + embed.set_footer( + text=datetime.datetime.now(datetime.timezone.utc).strftime( + "%Y-%m-%d %H:%M:%S" + ) + + " UTC" ) - + " UTC" - ) - await interaction.response.send_message(embed=embed) - - else: - track = results.tracks[0] - - embed.title = "Track Queued" - embed.description = f"**{track.title}** by **{track.author}**\n\nQueued by: {interaction.user.mention}" - embed.set_thumbnail(url=track.artwork_url) - embed.set_footer( - text=datetime.datetime.now(datetime.timezone.utc).strftime( - "%Y-%m-%d %H:%M:%S" - ) - + " UTC" - ) - - player.add(requester=interaction.user, track=track) - await interaction.response.send_message(embed=embed) + await interaction.response.send_message(embed=embed) # Only join the voice channel now, so that the bot doesn't join if nothing is found if not interaction.guild.voice_client: diff --git a/code/custom_source.py b/code/custom_source.py new file mode 100644 index 0000000..d22dea5 --- /dev/null +++ b/code/custom_source.py @@ -0,0 +1,114 @@ +from lavalink import LoadResult, LoadType, Source, DeferredAudioTrack, PlaylistInfo + + +class LoadError(Exception): # We'll raise this if we have trouble loading our track. + pass + + +class CustomAudioTrack(DeferredAudioTrack): + # A DeferredAudioTrack allows us to load metadata now, and a playback URL later. + # This makes the DeferredAudioTrack highly efficient, particularly in cases + # where large playlists are loaded. + + async def load( + self, client + ): # Load our 'actual' playback track using the metadata from this one. + scsearch = f"scsearch:{self.title} {self.author}" + results = await client.get_tracks(scsearch) + if not results.tracks or results.load_type in ( + LoadType.EMPTY, + LoadType.ERROR, + ): + dzsearch = f"dzsearch:{self.title} {self.author}" + results = await client.get_tracks(dzsearch) + + if not results.tracks or results.load_type in ( + LoadType.EMPTY, + LoadType.ERROR, + ): + raise LoadError + + first_track = results.tracks[0] # Grab the first track from the results. + base64 = first_track.track # Extract the base64 string from the track. + self.track = base64 # We'll store this for later, as it allows us to save making network requests + # if this track is re-used (e.g. repeat). + + return base64 + + +class CustomSource(Source): + def __init__(self): + super().__init__( + name="custom" + ) # Initialising our custom source with the name 'custom'. + + async def load_item(self, user, metadata): + track = CustomAudioTrack( + { # Create an instance of our CustomAudioTrack. + "identifier": metadata[ + "id" + ], # Fill it with metadata that we've obtained from our source's provider. + "isSeekable": True, + "author": metadata["artists"][0]["name"], + "length": metadata["duration_ms"], + "isStream": False, + "title": metadata["name"], + "uri": metadata["external_urls"]["spotify"], + "duration": metadata["duration_ms"], + "artworkUrl": metadata["album"]["images"][0]["url"], + }, + requester=user, + ) + return LoadResult(LoadType.TRACK, [track], playlist_info=PlaylistInfo.none()) + + async def load_album(self, user, metadata): + tracks = [] + for track in metadata["tracks"][ + "items" + ]: # Loop through each track in the album. + tracks.append( + CustomAudioTrack( + { # Create an instance of our CustomAudioTrack. + "identifier": track[ + "id" + ], # Fill it with metadata that we've obtained from our source's provider. + "isSeekable": True, + "author": track["artists"][0]["name"], + "length": track["duration_ms"], + "isStream": False, + "title": track["name"], + "uri": track["external_urls"]["spotify"], + "duration": track["duration_ms"], + "artworkUrl": metadata["images"][0]["url"], + }, + requester=user, + ) + ) + + return LoadResult(LoadType.PLAYLIST, tracks, playlist_info=PlaylistInfo.none()) + + async def load_playlist(self, user, metadata): + tracks = [] + for track in metadata["tracks"][ + "items" + ]: # Loop through each track in the playlist. + tracks.append( + CustomAudioTrack( + { # Create an instance of our CustomAudioTrack. + "identifier": track["track"][ + "id" + ], # Fill it with metadata that we've obtained from our source's provider. + "isSeekable": True, + "author": track["track"]["artists"][0]["name"], + "length": track["track"]["duration_ms"], + "isStream": False, + "title": track["track"]["name"], + "uri": track["track"]["external_urls"]["spotify"], + "duration": track["track"]["duration_ms"], + "artworkUrl": track["track"]["album"]["images"][0]["url"], + }, + requster=user, + ) + ) + + return LoadResult(LoadType.PLAYLIST, tracks, playlist_info=PlaylistInfo.none())