aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorParker <contact@pkrm.dev>2024-04-09 01:28:01 -0500
committerParker <contact@pkrm.dev>2024-04-09 01:28:01 -0500
commitd7fa64ffa2dd960a462633eb008faac80d76185b (patch)
tree5e24c0c88c31ac40949ce3b6005ab60ac53784d4
parent2040a2f2fb9ef156ad0052c7ad194a8dd234553c (diff)
Change Spotify support to use custom source
-rw-r--r--code/bot.py19
-rw-r--r--code/cogs/play.py171
-rw-r--r--code/custom_source.py114
3 files changed, 263 insertions, 41 deletions
diff --git a/code/bot.py b/code/bot.py
index 9ed2df6..21a0942 100644
--- a/code/bot.py
+++ b/code/bot.py
@@ -1,9 +1,10 @@
import discord
-from discord.ext import commands
+from discord.ext import commands, tasks
import os
+import requests
from validate_config import create_config
-from global_variables import LOG, BOT_TOKEN
+from global_variables import LOG, BOT_TOKEN, SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET
class MyBot(commands.Bot):
@@ -16,6 +17,7 @@ class MyBot(commands.Bot):
async def setup_hook(self):
create_config()
+ get_access_token.start()
for ext in os.listdir("./code/cogs"):
if ext.endswith(".py"):
await self.load_extension(f"cogs.{ext[:-3]}")
@@ -33,5 +35,18 @@ async def on_ready():
LOG.info(f"{bot.user} has connected to Discord.")
+@tasks.loop(minutes=45)
+async def get_access_token():
+ auth_url = "https://accounts.spotify.com/api/token"
+ data = {
+ "grant_type": "client_credentials",
+ "client_id": SPOTIFY_CLIENT_ID,
+ "client_secret": SPOTIFY_CLIENT_SECRET,
+ }
+ response = requests.post(auth_url, data=data)
+ access_token = response.json()["access_token"]
+ bot.spotify_headers = {"Authorization": f"Bearer {access_token}"}
+
+
if __name__ == "__main__":
bot.run(BOT_TOKEN)
diff --git a/code/cogs/play.py b/code/cogs/play.py
index 895f5ab..f25b3d8 100644
--- a/code/cogs/play.py
+++ b/code/cogs/play.py
@@ -6,8 +6,10 @@ from cogs.music import Music
from lavalink import LoadType
import re
from cogs.music import LavalinkVoiceClient
+import requests
from global_variables import BOT_COLOR
+from custom_source import CustomSource
url_rx = re.compile(r"https?://(?:www\.)?.+")
@@ -42,57 +44,148 @@ class Play(commands.Cog):
)
return await interaction.response.send_message(embed=embed)
- if not url_rx.match(query):
- ytsearch = f"scsearch:{query}"
- results = await player.node.get_tracks(ytsearch)
+ # If a Spotify link is found, act accordingly
+ # We use a custom source for this (I tried the LavaSrc plugin, but Spotify
+ # links would just result in random shit being played whenever ytsearch was removed)
+ if "open.spotify.com" in query:
+ embed = discord.Embed(color=BOT_COLOR)
+
+ if "open.spotify.com/playlist" in query:
+ playlist_id = query.split("playlist/")[1].split("?si=")[0]
+ playlist_url = f"https://api.spotify.com/v1/playlists/{playlist_id}"
+ response = requests.get(playlist_url, headers=self.bot.spotify_headers)
+ if response.status_code == 200:
+ playlist = response.json()
+
+ embed.title = "Playlist Queued"
+ embed.description = f"**{playlist['name']}** from **{playlist['owner']['display_name']}**\n` {len(playlist['tracks']['items'])} ` tracks\n\nQueued by: {interaction.user.mention}"
+ embed.set_thumbnail(url=playlist["images"][0]["url"])
+ embed.set_footer(
+ text=datetime.datetime.now(datetime.timezone.utc).strftime(
+ "%Y-%m-%d %H:%M:%S"
+ )
+ + " UTC"
+ )
+ await interaction.response.send_message(embed=embed)
+
+ tracks = await CustomSource.load_playlist(
+ self, interaction.user, playlist
+ )
+ for track in tracks["tracks"]:
+ player.add(requester=interaction.user, track=track)
+
+ if "open.spotify.com/album" in query:
+ album_id = query.split("album/")[1]
+ album_url = f"https://api.spotify.com/v1/albums/{album_id}"
+ response = requests.get(album_url, headers=self.bot.spotify_headers)
+ if response.status_code == 200:
+ album = response.json()
+
+ embed.title = "Album Queued"
+ embed.description = f"**{album['name']}** by **{album['artists'][0]['name']}**\n` {len(album['tracks']['items'])} ` tracks\n\nQueued by: {interaction.user.mention}"
+ embed.set_thumbnail(url=album["images"][0]["url"])
+ embed.set_footer(
+ text=datetime.datetime.now(datetime.timezone.utc).strftime(
+ "%Y-%m-%d %H:%M:%S"
+ )
+ + " UTC"
+ )
+ await interaction.response.send_message(embed=embed)
+
+ tracks = await CustomSource.load_album(
+ self, interaction.user, album
+ )
+ for track in tracks["tracks"]:
+ player.add(requester=interaction.user, track=track)
+
+ if "open.spotify.com/track" in query:
+ track_id = query.split("track/")[1]
+ track_url = f"https://api.spotify.com/v1/tracks/{track_id}"
+ response = requests.get(track_url, headers=self.bot.spotify_headers)
+ if response.status_code == 200:
+ track = response.json()
+
+ embed.title = "Track Queued"
+ embed.description = f"**{track['name']}** by **{track['artists'][0]['name']}**\n\nQueued by: {interaction.user.mention}"
+ embed.set_thumbnail(url=track["album"]["images"][0]["url"])
+ embed.set_footer(
+ text=datetime.datetime.now(datetime.timezone.utc).strftime(
+ "%Y-%m-%d %H:%M:%S"
+ )
+ + " UTC"
+ )
+ await interaction.response.send_message(embed=embed)
+
+ results = await CustomSource.load_item(
+ self, interaction.user, track
+ )
+ player.add(requester=interaction.user, track=results.tracks[0])
+
+ if "open.spotify.com/artists" in query:
+ embed.title = "Artists Cannot Be Played"
+ embed.description = "I cannot play just artists, you must provide a song/album/playlist. Please try again."
+ return await interaction.response.send_message(
+ embed=embed, ephemeral=True
+ )
+
+ # For anything else, do default searches and attempt to find track and play
+
+ else:
+ if not url_rx.match(query):
+ ytsearch = f"scsearch:{query}"
+ results = await player.node.get_tracks(ytsearch)
+
+ if not results.tracks or results.load_type in (
+ LoadType.EMPTY,
+ LoadType.ERROR,
+ ):
+ scsearch = f"dzsearch:{query}"
+ results = await player.node.get_tracks(scsearch)
+ else:
+ results = await player.node.get_tracks(query)
+
+ embed = discord.Embed(color=BOT_COLOR)
if not results.tracks or results.load_type in (
LoadType.EMPTY,
LoadType.ERROR,
):
- scsearch = f"dzsearch:{query}"
- results = await player.node.get_tracks(scsearch)
- else:
- results = await player.node.get_tracks(query)
+ embed.title = "Nothing Found"
+ embed.description = "Nothing for that query could be found. If this continues happening for other songs, please run `/bug` to let the developer know."
+ return await interaction.response.send_message(
+ embed=embed, ephemeral=True
+ )
- embed = discord.Embed(color=BOT_COLOR)
+ elif results.load_type == LoadType.PLAYLIST:
+ tracks = results.tracks
- if not results.tracks or results.load_type in (LoadType.EMPTY, LoadType.ERROR):
- embed.title = "Nothing Found"
- embed.description = "Nothing for that query could be found. If this continues happening for other songs, please run `/bug` to let the developer know."
- return await interaction.response.send_message(embed=embed, ephemeral=True)
+ for track in tracks:
+ player.add(requester=interaction.user, track=track)
- elif results.load_type == LoadType.PLAYLIST:
- tracks = results.tracks
+ embed.title = "Songs Queued!"
+ embed.description = f"**{results.playlist_info.name}**\n` {len(tracks)} ` tracks\n\nQueued by: {interaction.user.mention}"
+ embed.set_footer(
+ text=datetime.datetime.now(datetime.timezone.utc).strftime(
+ "%Y-%m-%d %H:%M:%S"
+ )
+ + " UTC"
+ )
+ await interaction.response.send_message(embed=embed)
- for track in tracks:
+ else:
+ track = results.tracks[0]
player.add(requester=interaction.user, track=track)
- embed.title = "Songs Queued!"
- embed.description = f"**{results.playlist_info.name}**\n` {len(tracks)} ` tracks\n\nQueued by: {interaction.user.mention}"
- embed.set_footer(
- text=datetime.datetime.now(datetime.timezone.utc).strftime(
- "%Y-%m-%d %H:%M:%S"
+ embed.title = "Track Queued"
+ embed.description = f"**{track.title}** by **{track.author}**\n\nQueued by: {interaction.user.mention}"
+ embed.set_thumbnail(url=track.artwork_url)
+ embed.set_footer(
+ text=datetime.datetime.now(datetime.timezone.utc).strftime(
+ "%Y-%m-%d %H:%M:%S"
+ )
+ + " UTC"
)
- + " UTC"
- )
- await interaction.response.send_message(embed=embed)
-
- else:
- track = results.tracks[0]
-
- embed.title = "Track Queued"
- embed.description = f"**{track.title}** by **{track.author}**\n\nQueued by: {interaction.user.mention}"
- embed.set_thumbnail(url=track.artwork_url)
- embed.set_footer(
- text=datetime.datetime.now(datetime.timezone.utc).strftime(
- "%Y-%m-%d %H:%M:%S"
- )
- + " UTC"
- )
-
- player.add(requester=interaction.user, track=track)
- await interaction.response.send_message(embed=embed)
+ await interaction.response.send_message(embed=embed)
# Only join the voice channel now, so that the bot doesn't join if nothing is found
if not interaction.guild.voice_client:
diff --git a/code/custom_source.py b/code/custom_source.py
new file mode 100644
index 0000000..d22dea5
--- /dev/null
+++ b/code/custom_source.py
@@ -0,0 +1,114 @@
+from lavalink import LoadResult, LoadType, Source, DeferredAudioTrack, PlaylistInfo
+
+
+class LoadError(Exception): # We'll raise this if we have trouble loading our track.
+ pass
+
+
+class CustomAudioTrack(DeferredAudioTrack):
+ # A DeferredAudioTrack allows us to load metadata now, and a playback URL later.
+ # This makes the DeferredAudioTrack highly efficient, particularly in cases
+ # where large playlists are loaded.
+
+ async def load(
+ self, client
+ ): # Load our 'actual' playback track using the metadata from this one.
+ scsearch = f"scsearch:{self.title} {self.author}"
+ results = await client.get_tracks(scsearch)
+ if not results.tracks or results.load_type in (
+ LoadType.EMPTY,
+ LoadType.ERROR,
+ ):
+ dzsearch = f"dzsearch:{self.title} {self.author}"
+ results = await client.get_tracks(dzsearch)
+
+ if not results.tracks or results.load_type in (
+ LoadType.EMPTY,
+ LoadType.ERROR,
+ ):
+ raise LoadError
+
+ first_track = results.tracks[0] # Grab the first track from the results.
+ base64 = first_track.track # Extract the base64 string from the track.
+ self.track = base64 # We'll store this for later, as it allows us to save making network requests
+ # if this track is re-used (e.g. repeat).
+
+ return base64
+
+
+class CustomSource(Source):
+ def __init__(self):
+ super().__init__(
+ name="custom"
+ ) # Initialising our custom source with the name 'custom'.
+
+ async def load_item(self, user, metadata):
+ track = CustomAudioTrack(
+ { # Create an instance of our CustomAudioTrack.
+ "identifier": metadata[
+ "id"
+ ], # Fill it with metadata that we've obtained from our source's provider.
+ "isSeekable": True,
+ "author": metadata["artists"][0]["name"],
+ "length": metadata["duration_ms"],
+ "isStream": False,
+ "title": metadata["name"],
+ "uri": metadata["external_urls"]["spotify"],
+ "duration": metadata["duration_ms"],
+ "artworkUrl": metadata["album"]["images"][0]["url"],
+ },
+ requester=user,
+ )
+ return LoadResult(LoadType.TRACK, [track], playlist_info=PlaylistInfo.none())
+
+ async def load_album(self, user, metadata):
+ tracks = []
+ for track in metadata["tracks"][
+ "items"
+ ]: # Loop through each track in the album.
+ tracks.append(
+ CustomAudioTrack(
+ { # Create an instance of our CustomAudioTrack.
+ "identifier": track[
+ "id"
+ ], # Fill it with metadata that we've obtained from our source's provider.
+ "isSeekable": True,
+ "author": track["artists"][0]["name"],
+ "length": track["duration_ms"],
+ "isStream": False,
+ "title": track["name"],
+ "uri": track["external_urls"]["spotify"],
+ "duration": track["duration_ms"],
+ "artworkUrl": metadata["images"][0]["url"],
+ },
+ requester=user,
+ )
+ )
+
+ return LoadResult(LoadType.PLAYLIST, tracks, playlist_info=PlaylistInfo.none())
+
+ async def load_playlist(self, user, metadata):
+ tracks = []
+ for track in metadata["tracks"][
+ "items"
+ ]: # Loop through each track in the playlist.
+ tracks.append(
+ CustomAudioTrack(
+ { # Create an instance of our CustomAudioTrack.
+ "identifier": track["track"][
+ "id"
+ ], # Fill it with metadata that we've obtained from our source's provider.
+ "isSeekable": True,
+ "author": track["track"]["artists"][0]["name"],
+ "length": track["track"]["duration_ms"],
+ "isStream": False,
+ "title": track["track"]["name"],
+ "uri": track["track"]["external_urls"]["spotify"],
+ "duration": track["track"]["duration_ms"],
+ "artworkUrl": track["track"]["album"]["images"][0]["url"],
+ },
+ requster=user,
+ )
+ )
+
+ return LoadResult(LoadType.PLAYLIST, tracks, playlist_info=PlaylistInfo.none())