aboutsummaryrefslogtreecommitdiff
path: root/code
diff options
context:
space:
mode:
Diffstat (limited to 'code')
-rw-r--r--code/cogs/play.py414
-rw-r--r--code/utils/command_tree.py22
-rw-r--r--code/utils/source_helpers/apple/album.py79
-rw-r--r--code/utils/source_helpers/apple/playlist.py90
-rw-r--r--code/utils/source_helpers/apple/song.py74
-rw-r--r--code/utils/source_helpers/parse.py80
-rw-r--r--code/utils/source_helpers/spotify/album.py73
-rw-r--r--code/utils/source_helpers/spotify/playlist.py73
-rw-r--r--code/utils/source_helpers/spotify/song.py68
9 files changed, 638 insertions, 335 deletions
diff --git a/code/cogs/play.py b/code/cogs/play.py
index 82ac214..38a91b8 100644
--- a/code/cogs/play.py
+++ b/code/cogs/play.py
@@ -4,11 +4,14 @@ from discord import app_commands
from discord.ext import commands
from lavalink import LoadType
import re
-import requests
from cogs.music import Music, LavalinkVoiceClient
from utils.config import BOT_COLOR, YOUTUBE_SUPPORT
-from utils.custom_sources import SpotifySource, AppleSource
+from utils.custom_sources import (
+ LoadError,
+ CustomAudioTrack,
+)
+from utils.source_helpers.parse import parse_custom_source
url_rx = re.compile(r"https?://(?:www\.)?.+")
@@ -43,366 +46,137 @@ class Play(commands.Cog):
embed=embed, ephemeral=True
)
- ###
- ### APPLE MUSIC links, perform API requests and load all tracks from the playlist/album/track
- ###
-
if "music.apple.com" in query:
- if not self.bot.apple_headers:
- embed = discord.Embed(
- title="Apple Music Error",
- description=(
- "Apple Music support seems to be broken at the moment."
- " Please try again and fill out a bug report with"
- " </bug:1224840889906499626> if this continues to"
- " happen."
- ),
- color=BOT_COLOR,
- )
- return await interaction.response.send_message(
- embed=embed, ephemeral=True
- )
-
- embed = discord.Embed(color=BOT_COLOR)
-
- if "/playlist/" in query and "?i=" not in query:
- playlist_id = query.split("/playlist/")[1].split("/")[1]
- # Get all of the tracks in the playlist (limit at 250)
- playlist_url = f"https://api.music.apple.com/v1/catalog/us/playlists/{playlist_id}/tracks?limit=100"
- response = requests.get(
- playlist_url, headers=self.bot.apple_headers
- )
- if response.status_code == 200:
- playlist = response.json()
- # Get the general playlist info (name, artwork)
- playlist_info_url = f"https://api.music.apple.com/v1/catalog/us/playlists/{playlist_id}"
- playlist_info = requests.get(
- playlist_info_url, headers=self.bot.apple_headers
- )
- playlist_info = playlist_info.json()
- try:
- artwork_url = playlist_info["data"][0]["attributes"][
- "artwork"
- ]["url"].replace("{w}x{h}", "300x300")
- except KeyError:
- artwork_url = None
-
- embed.title = "Playlist Queued"
- embed.description = (
- f"**{playlist_info['data'][0]['attributes']['name']}**\n`"
- f" {len(playlist['data'])} ` tracks\n\nQueued by:"
- f" {interaction.user.mention}"
- )
- embed.set_thumbnail(url=artwork_url)
- embed.set_footer(
- text=datetime.datetime.now(
- datetime.timezone.utc
- ).strftime("%Y-%m-%d %H:%M:%S")
- + " UTC"
- )
- # Add small alert if the playlist is the max size
- if len(playlist["data"]) == 100:
- embed.description += (
- "\n\n*This playlist is longer than the 100 song"
- " maximum. Only the first 100 songs will be"
- " queued.*"
- )
-
- await interaction.response.send_message(embed=embed)
-
- tracks = await AppleSource.load_playlist(
- self, interaction.user, playlist
- )
- for track in tracks["tracks"]:
- player.add(requester=interaction.user, track=track)
-
- # If there is an album, not a specific song within the album
- if "/album/" in query and "?i=" not in query:
- album_id = query.split("/album/")[1].split("/")[1]
- album_url = f"https://api.music.apple.com/v1/catalog/us/albums/{album_id}"
- response = requests.get(
- album_url, headers=self.bot.apple_headers
- )
- if response.status_code == 200:
- album = response.json()
-
- embed.title = "Album Queued"
- embed.description = (
- f"**{album['data'][0]['attributes']['name']}** by"
- f" **{album['data'][0]['attributes']['artistName']}**\n`"
- f" {len(album['data'][0]['relationships']['tracks']['data'])} `"
- f" tracks\n\nQueued by: {interaction.user.mention}"
- )
- embed.set_thumbnail(
- url=album["data"][0]["attributes"]["artwork"][
- "url"
- ].replace("{w}x{h}", "300x300")
- )
- embed.set_footer(
- text=datetime.datetime.now(
- datetime.timezone.utc
- ).strftime("%Y-%m-%d %H:%M:%S")
- + " UTC"
- )
- await interaction.response.send_message(embed=embed)
-
- tracks = await AppleSource.load_album(
- self, interaction.user, album
- )
- for track in tracks["tracks"]:
- player.add(requester=interaction.user, track=track)
-
- # If there is a specific song
- if "/album/" in query and "?i=" in query:
- song_id = query.split("/album/")[1].split("?i=")[1]
- song_url = f"https://api.music.apple.com/v1/catalog/us/songs/{song_id}"
- response = requests.get(
- song_url, headers=self.bot.apple_headers
- )
- if response.status_code == 200:
- song = response.json()
-
- embed.title = "Song Queued"
- embed.description = (
- f"**{song['data'][0]['attributes']['name']}** by"
- f" **{song['data'][0]['attributes']['artistName']}**\n\nQueued"
- f" by: {interaction.user.mention}"
- )
- embed.set_thumbnail(
- url=song["data"][0]["attributes"]["artwork"][
- "url"
- ].replace("{w}x{h}", "300x300")
- )
- embed.set_footer(
- text=datetime.datetime.now(
- datetime.timezone.utc
- ).strftime("%Y-%m-%d %H:%M:%S")
- + " UTC"
- )
- await interaction.response.send_message(embed=embed)
-
- results = await AppleSource.load_item(
- self, interaction.user, song
- )
- player.add(
- requester=interaction.user, track=results.tracks[0]
- )
-
- ###
- ### SPOTIFY links, perform API requests and load all tracks from the playlist/album/track
- ###
+ results, embed = await parse_custom_source(
+ self, "apple", query, interaction.user
+ )
elif "open.spotify.com" in query:
- if not self.bot.spotify_headers:
- embed = discord.Embed(
- title="Spotify Error",
- description=(
- "Spotify support seems to be broken at the moment."
- " Please try again and fill out a bug report with"
- " </bug:1224840889906499626> if this continues to"
- " happen."
- ),
- color=BOT_COLOR,
- )
- return await interaction.response.send_message(
- embed=embed, ephemeral=True
- )
-
- embed = discord.Embed(color=BOT_COLOR)
-
- if "open.spotify.com/playlist" in query:
- playlist_id = query.split("playlist/")[1].split("?si=")[0]
- playlist_url = (
- f"https://api.spotify.com/v1/playlists/{playlist_id}"
- )
- response = requests.get(
- playlist_url, headers=self.bot.spotify_headers
- )
- if response.status_code == 200:
- playlist = response.json()
-
- embed.title = "Playlist Queued"
- embed.description = (
- f"**{playlist['name']}** from"
- f" **{playlist['owner']['display_name']}**\n`"
- f" {len(playlist['tracks']['items'])} `"
- f" tracks\n\nQueued by: {interaction.user.mention}"
- )
- embed.set_thumbnail(url=playlist["images"][0]["url"])
- embed.set_footer(
- text=datetime.datetime.now(
- datetime.timezone.utc
- ).strftime("%Y-%m-%d %H:%M:%S")
- + " UTC"
- )
- await interaction.response.send_message(embed=embed)
-
- tracks = await SpotifySource.load_playlist(
- self, interaction.user, playlist
- )
- for track in tracks["tracks"]:
- player.add(requester=interaction.user, track=track)
-
- if "open.spotify.com/album" in query:
- album_id = query.split("album/")[1]
- album_url = f"https://api.spotify.com/v1/albums/{album_id}"
- response = requests.get(
- album_url, headers=self.bot.spotify_headers
- )
- if response.status_code == 200:
- album = response.json()
-
- embed.title = "Album Queued"
- embed.description = (
- f"**{album['name']}** by"
- f" **{album['artists'][0]['name']}**\n`"
- f" {len(album['tracks']['items'])} ` tracks\n\nQueued"
- f" by: {interaction.user.mention}"
- )
- embed.set_thumbnail(url=album["images"][0]["url"])
- embed.set_footer(
- text=datetime.datetime.now(
- datetime.timezone.utc
- ).strftime("%Y-%m-%d %H:%M:%S")
- + " UTC"
- )
- await interaction.response.send_message(embed=embed)
-
- tracks = await SpotifySource.load_album(
- self, interaction.user, album
- )
- for track in tracks["tracks"]:
- player.add(requester=interaction.user, track=track)
-
- if "open.spotify.com/track" in query:
- track_id = query.split("track/")[1]
- track_url = f"https://api.spotify.com/v1/tracks/{track_id}"
- response = requests.get(
- track_url, headers=self.bot.spotify_headers
- )
- if response.status_code == 200:
- track = response.json()
-
- embed.title = "Track Queued"
- embed.description = (
- f"**{track['name']}** by"
- f" **{track['artists'][0]['name']}**\n\nQueued by:"
- f" {interaction.user.mention}"
- )
- embed.set_thumbnail(url=track["album"]["images"][0]["url"])
- embed.set_footer(
- text=datetime.datetime.now(
- datetime.timezone.utc
- ).strftime("%Y-%m-%d %H:%M:%S")
- + " UTC"
- )
- await interaction.response.send_message(embed=embed)
-
- results = await SpotifySource.load_item(
- self, interaction.user, track
- )
- player.add(
- requester=interaction.user, track=results.tracks[0]
- )
-
- if "open.spotify.com/artists" in query:
- embed.title = "Artists Cannot Be Played"
- embed.description = (
- "I cannot play just artists, you must provide a"
- " song/album/playlist. Please try again."
- )
- return await interaction.response.send_message(
- embed=embed, ephemeral=True
- )
+ results, embed = await parse_custom_source(
+ self, "spotify", query, interaction.user
+ )
###
### For anything else, use default Lavalink providers to search the query
###
else:
+ # If the query is not a URL, begin searching
if not url_rx.match(query):
dzsearch = f"dzsearch:{query}"
results = await player.node.get_tracks(dzsearch)
-
+ # If Deezer returned nothing
if not results.tracks or results.load_type in (
LoadType.EMPTY,
LoadType.ERROR,
):
- ytmsearch = f"ytmsearch:{query}"
- results = await player.node.get_tracks(ytmsearch)
-
- if not results.tracks or results.load_type in (
- LoadType.EMPTY,
- LoadType.ERROR,
- ):
- ytsearch = f"ytsearch:{query}"
- results = await player.node.get_tracks(ytsearch)
+ if YOUTUBE_SUPPORT:
+ ytmsearch = f"ytmsearch:{query}"
+ results = await player.node.get_tracks(ytmsearch)
+ # If YouTube Music returned nothing
+ if not results.tracks or results.load_type in (
+ LoadType.EMPTY,
+ LoadType.ERROR,
+ ):
+ # Final search attempt with YouTube
+ ytsearch = f"ytsearch:{query}"
+ results = await player.node.get_tracks(ytsearch)
else:
results = await player.node.get_tracks(query)
- embed = discord.Embed(color=BOT_COLOR)
-
+ # If there are no results found, set results/embed to None, handled further down
if not results.tracks or results.load_type in (
LoadType.EMPTY,
LoadType.ERROR,
):
- embed.title = "Nothing Found"
- embed.description = (
- "Nothing for that query could be found. If this continues"
- " happening for other songs, please run"
- " </bug:1224840889906499626> to let the developer know."
- )
- return await interaction.response.send_message(
- embed=embed, ephemeral=True
- )
-
- elif results.load_type == LoadType.PLAYLIST:
- tracks = results.tracks
-
- for track in tracks:
- player.add(requester=interaction.user, track=track)
+ results, embed = None, None
- embed.title = "Songs Queued!"
- embed.description = (
- f"**{results.playlist_info.name}**\n` {len(tracks)} `"
- f" tracks\n\nQueued by: {interaction.user.mention}"
+ # Create the embed if the results are a playlist
+ if results.load_type == LoadType.PLAYLIST:
+ embed = discord.Embed(
+ title="Songs Queued!",
+ description=(
+ f"**{results.playlist_info.name}**\n"
+ f"` {len(results.tracks)} ` tracks\n\n"
+ f"Queued by: {interaction.user.mention}"
+ ),
+ color=BOT_COLOR,
)
embed.set_footer(
- text=datetime.datetime.now(datetime.timezone.utc).strftime(
+ text=datetime.datetime.utcnow().strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
)
await interaction.response.send_message(embed=embed)
-
+ # Otherwise, the result is just a single track, create that embed
else:
- track = results.tracks[0]
- player.add(requester=interaction.user, track=track)
-
- embed.title = "Track Queued"
- embed.description = (
- f"**{track.title}** by **{track.author}**\n\nQueued by:"
- f" {interaction.user.mention}"
+ # Remove all but first track (most relevant result)
+ results.tracks = results.tracks[:1]
+ embed = discord.Embed(
+ title="Song Queued!",
+ description=(
+ f"**{results.tracks[0].title}** by"
+ f" **{results.tracks[0].author}**\n\nQueued by:"
+ f" {interaction.user.mention}"
+ ),
+ color=BOT_COLOR,
)
- embed.set_thumbnail(url=track.artwork_url)
+ embed.set_thumbnail(url=results.tracks[0].artwork_url)
embed.set_footer(
- text=datetime.datetime.now(datetime.timezone.utc).strftime(
+ text=datetime.datetime.utcnow().strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
)
await interaction.response.send_message(embed=embed)
- # Only join the voice channel now, so that the bot doesn't join if nothing is found
- if not interaction.guild.voice_client:
- await interaction.user.voice.channel.connect(
- cls=LavalinkVoiceClient, self_deaf=True
+ # If there are no results, and no embed
+ if not results and not embed:
+ embed = discord.Embed(
+ title="Nothing Found",
+ description=(
+ "I was not able to find or load any songs for that query."
+ " Please try again and fill out a bug report with"
+ " </bug:1224840889906499626> if this continues to happen."
+ ),
+ color=BOT_COLOR,
+ )
+ return await interaction.response.send_message(
+ embed=embed, ephemeral=True
+ )
+ # If there are no results, but there is an embed (error msg)
+ elif embed and not results:
+ return await interaction.response.send_message(
+ embed=embed, ephemeral=True
)
+ # If there are results, add them to the player
+ else:
+ for track in results.tracks:
+ player.add(requester=interaction.user, track=track)
+
+ # If the track is CustomAudioTrack (Apple Music/Spotify)
+ if type(results.tracks[0]) == CustomAudioTrack:
+ # Attempt to load an actual track from a provider
+ try:
+ await results.tracks[0].load(player.node)
+ # If it fails, remove it from the queue and alert the user
+ except LoadError:
+ player.queue.remove(results.tracks[0])
+ raise LoadError
+
+ # Join the voice channel if not already connected
+ if not interaction.guild.voice_client:
+ await interaction.user.voice.channel.connect(
+ cls=LavalinkVoiceClient, self_deaf=True
+ )
+
+ # Only call player.play if it is not already playing, otherwise it will
+ # effectively skip the current track
+ if not player.is_playing:
+ await player.play()
- # We don't want to call .play() if the player is playing as that will
- # effectively skip the current track
- if not player.is_playing:
- await player.play()
+ await interaction.response.send_message(embed=embed)
async def setup(bot):
diff --git a/code/utils/command_tree.py b/code/utils/command_tree.py
index 3d9214d..7ccb8b1 100644
--- a/code/utils/command_tree.py
+++ b/code/utils/command_tree.py
@@ -84,31 +84,23 @@ class Tree(app_commands.CommandTree):
except discord.errors.InteractionResponded:
await interaction.followup.send(embed=embed, ephemeral=True)
- # If a Spotify song is linked but cannot be found on a provider (e.g. YouTube)
- elif isinstance(error, LoadError):
+ elif (error, LoadError):
embed = discord.Embed(
- title="Nothing Found",
+ title="Load Error",
description=(
- "Spotify does not allow direct play, meaning songs have to"
- " be found on a supported provider. In this case, the song"
- " couldn't be found. Please try again with a different"
- " song, or try searching for just the name and artist"
- " manually rather than sending a link."
+ "Apple Music and Spotify do not allow direct playing from"
+ " their websites, and I was unable to load a track on a"
+ " valid source. Please try again."
),
color=BOT_COLOR,
)
- embed.set_footer(
- text=datetime.datetime.now(datetime.timezone.utc).strftime(
- "%Y-%m-%d %H:%M:%S"
- )
- + " UTC"
- )
+ # Only send the error if the interaction is still valid
try:
await interaction.response.send_message(
embed=embed, ephemeral=True
)
except discord.errors.InteractionResponded:
- await interaction.followup.send(embed=embed, ephemeral=True)
+ pass
else:
raise error
diff --git a/code/utils/source_helpers/apple/album.py b/code/utils/source_helpers/apple/album.py
new file mode 100644
index 0000000..3195e2d
--- /dev/null
+++ b/code/utils/source_helpers/apple/album.py
@@ -0,0 +1,79 @@
+import datetime
+import discord
+import requests
+from typing import Tuple, Optional
+from requests.exceptions import JSONDecodeError
+
+from utils.config import BOT_COLOR, LOG
+
+
+async def load(
+ headers: dict,
+ query: str,
+ user: discord.User,
+) -> Tuple[Optional[dict], Optional[discord.Embed]]:
+ """
+ Get the album info from the Apple Music API
+ """
+ album_id = query.split("/album/")[1].split("/")[1]
+
+ try:
+ # Get the album info
+ response = requests.get(
+ f"https://api.music.apple.com/v1/catalog/us/albums/{album_id}",
+ headers=headers,
+ )
+
+ if response.status_code == 404:
+ embed = discord.Embed(
+ title="Album Not Found",
+ description=(
+ "The album could not be found as the provided link is"
+ " invalid. Please try again."
+ ),
+ color=BOT_COLOR,
+ )
+ return None, embed
+
+ if response.status_code == 401:
+ LOG.error(
+ "Could not authorize with Apple Music API. Likely need to"
+ " restart the bot."
+ )
+ return None, None
+
+ response.raise_for_status()
+ # Unpack the album info
+ album = response.json()
+ name = album["data"][0]["attributes"]["name"]
+ artist = album["data"][0]["attributes"]["artistName"]
+ num_tracks = len(album["data"][0]["relationships"]["tracks"]["data"])
+ except IndexError:
+ LOG.error("Failed unpacking Apple Music album info")
+ return None, None
+ except (JSONDecodeError, requests.HTTPError):
+ LOG.error("Failed making request to Apple Music API")
+ return None, None
+
+ # Extract artwork URL, if available
+ artwork_url = (
+ album["data"][0]["attributes"].get("artwork", {}).get("url", None)
+ )
+ if artwork_url:
+ artwork_url = artwork_url.replace("{w}x{h}", "300x300")
+
+ embed = discord.Embed(
+ title="Album Queued",
+ description=(
+ f"**{name}** by **{artist}**\n"
+ f"` {num_tracks} ` tracks\n\n"
+ f"Queued by: {user.mention}"
+ ),
+ color=BOT_COLOR,
+ )
+ embed.set_thumbnail(url=artwork_url)
+ embed.set_footer(
+ text=datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + " UTC"
+ )
+
+ return album, embed
diff --git a/code/utils/source_helpers/apple/playlist.py b/code/utils/source_helpers/apple/playlist.py
new file mode 100644
index 0000000..8a27315
--- /dev/null
+++ b/code/utils/source_helpers/apple/playlist.py
@@ -0,0 +1,90 @@
+import datetime
+import discord
+import requests
+from typing import Tuple, Optional
+from requests.exceptions import JSONDecodeError
+
+from utils.config import BOT_COLOR, LOG
+
+
+async def load(
+ headers: dict,
+ query: str,
+ user: discord.User,
+) -> Tuple[Optional[dict], Optional[discord.Embed]]:
+ """
+ Get the playlist info from the Apple Music API
+ """
+ playlist_id = query.split("/playlist/")[1].split("/")[1]
+ try:
+ # Get all of the tracks in the playlist (limit at 100)
+ response = requests.get(
+ f"https://api.music.apple.com/v1/catalog/us/playlists/{playlist_id}/tracks?limit=100",
+ headers=headers,
+ )
+
+ if response.status_code == 404:
+ embed = discord.Embed(
+ title="Playlist Not Found",
+ description=(
+ "The playlist could not be found as the provided link is"
+ " invalid. Please try again."
+ ),
+ color=BOT_COLOR,
+ )
+ return None, embed
+
+ if response.status_code == 401:
+ LOG.error(
+ "Could not authorize with Apple Music API. Likely need to"
+ " restart the bot."
+ )
+ return None, None
+
+ response.raise_for_status()
+ playlist = response.json()
+
+ # Get the general playlist info (name, artwork)
+ response = requests.get(
+ f"https://api.music.apple.com/v1/catalog/us/playlists/{playlist_id}",
+ headers=headers,
+ )
+
+ response.raise_for_status()
+ # Unpack the playlist info
+ playlist_info = response.json()
+ name = playlist_info["data"][0]["attributes"]["name"]
+ num_tracks = len(playlist["data"])
+ except IndexError:
+ LOG.error("Failed unpacking Apple Music playlist info")
+ return None, None
+ except (JSONDecodeError, requests.HTTPError):
+ LOG.error("Failed making request to Apple Music API")
+ return None, None
+
+ # Extract artwork URL, if available
+ artwork_url = (
+ playlist_info["data"][0]["attributes"]
+ .get("artwork", {})
+ .get("url", None)
+ )
+ if artwork_url:
+ artwork_url = artwork_url.replace("{w}x{h}", "300x300")
+
+ embed = discord.Embed(
+ title="Playlist Queued",
+ description=(
+ f"**{name}**\n` {num_tracks} ` tracks\n\nQueued by: {user.mention}"
+ ),
+ color=BOT_COLOR,
+ )
+
+ # Add small alert if the playlist is the max size
+ if len(playlist["data"]) == 100:
+ embed.description += (
+ "\n\n*This playlist is longer than the 100 song"
+ " maximum. Only the first 100 songs will be"
+ " queued.*"
+ )
+
+ return playlist, embed
diff --git a/code/utils/source_helpers/apple/song.py b/code/utils/source_helpers/apple/song.py
new file mode 100644
index 0000000..55db003
--- /dev/null
+++ b/code/utils/source_helpers/apple/song.py
@@ -0,0 +1,74 @@
+import datetime
+import discord
+import requests
+from typing import Tuple, Optional
+from requests.exceptions import JSONDecodeError
+
+from utils.config import BOT_COLOR, LOG
+
+
+async def load(
+ headers: dict,
+ query: str,
+ user: discord.User,
+) -> Tuple[Optional[dict], Optional[discord.Embed]]:
+ """
+ Get the song info from the Apple Music API
+ """
+ song_id = query.split("/album/")[1].split("?i=")[1]
+
+ try:
+ # Get the song info
+ response = requests.get(
+ f"https://api.music.apple.com/v1/catalog/us/songs/{song_id}",
+ headers=headers,
+ )
+
+ if response.status_code == 404:
+ embed = discord.Embed(
+ title="Song Not Found",
+ description=(
+ "The song could not be found as the provided link is"
+ " invalid. Please try again."
+ ),
+ color=BOT_COLOR,
+ )
+ return None, embed
+
+ if response.status_code == 401:
+ LOG.error(
+ "Could not authorize with Apple Music API. Likely need to"
+ " restart the bot."
+ )
+ return None, None
+
+ response.raise_for_status()
+ # Unpack the song info
+ song = response.json()
+ name = song["data"][0]["attributes"]["name"]
+ artist = song["data"][0]["attributes"]["artistName"]
+ except IndexError:
+ LOG.error("Failed unpacking Apple Music song info")
+ return None, None
+ except (JSONDecodeError, requests.HTTPError):
+ LOG.error("Failed making request to Apple Music API")
+ return None, None
+
+ # Extract artwork URL, if available
+ artwork_url = (
+ song["data"][0]["attributes"].get("artwork", {}).get("url", None)
+ )
+ if artwork_url:
+ artwork_url = artwork_url.replace("{w}x{h}", "300x300")
+
+ embed = discord.Embed(
+ title="Song Queued",
+ description=f"**{name}** by **{artist}**\n\nQueued by: {user.mention}",
+ color=BOT_COLOR,
+ )
+ embed.set_thumbnail(url=artwork_url)
+ embed.set_footer(
+ text=datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + " UTC"
+ )
+
+ return song, embed
diff --git a/code/utils/source_helpers/parse.py b/code/utils/source_helpers/parse.py
new file mode 100644
index 0000000..8489515
--- /dev/null
+++ b/code/utils/source_helpers/parse.py
@@ -0,0 +1,80 @@
+import discord
+
+from utils.source_helpers.apple import (
+ album as apple_album,
+ playlist as apple_playlist,
+ song as apple_song,
+)
+from utils.source_helpers.spotify import (
+ album as spotify_album,
+ playlist as spotify_playlist,
+ song as spotify_song,
+)
+from utils.custom_sources import AppleSource, SpotifySource
+
+
+async def parse_custom_source(
+ self, provider: str, query: str, user: discord.User
+):
+ """
+ Parse the query and run the appropriate functions to get the results/info
+
+ Return the results and an embed or None, None
+ """
+ load_funcs = {
+ "apple": {
+ "album": apple_album.load,
+ "playlist": apple_playlist.load,
+ "song": apple_song.load,
+ },
+ "spotify": {
+ "album": spotify_album.load,
+ "playlist": spotify_playlist.load,
+ "song": spotify_song.load,
+ },
+ }
+
+ headers = {
+ "apple": self.bot.apple_headers,
+ "spotify": self.bot.spotify_headers,
+ }
+
+ sources = {
+ "apple": AppleSource,
+ "spotify": SpotifySource,
+ }
+
+ # Catch all songs
+ if "?i=" in query or "/track/" in query:
+ song, embed = await load_funcs[provider]["song"](
+ headers[provider], query, user
+ )
+
+ if song:
+ results = await sources[provider].load_item(self, user, song)
+ else:
+ return None, embed
+ # Catch all playlists
+ elif "/playlist/" in query:
+ playlist, embed = await load_funcs[provider]["playlist"](
+ headers[provider], query, user
+ )
+
+ if playlist:
+ results = await sources[provider].load_playlist(
+ self, user, playlist
+ )
+ else:
+ return None, embed
+ # Catch all albums
+ elif "/album/" in query:
+ album, embed = await load_funcs[provider]["album"](
+ headers[provider], query, user
+ )
+
+ if album:
+ results = await sources[provider].load_album(self, user, album)
+ else:
+ return None, embed
+
+ return results, embed
diff --git a/code/utils/source_helpers/spotify/album.py b/code/utils/source_helpers/spotify/album.py
new file mode 100644
index 0000000..cf527ed
--- /dev/null
+++ b/code/utils/source_helpers/spotify/album.py
@@ -0,0 +1,73 @@
+import datetime
+import discord
+import requests
+from typing import Tuple, Optional
+from requests.exceptions import JSONDecodeError
+
+from utils.config import BOT_COLOR, LOG
+
+
+async def load(
+ headers: dict,
+ query: str,
+ user: discord.User,
+) -> Tuple[Optional[dict], Optional[discord.Embed]]:
+ """
+ Get the album info from the Spotify API
+ """
+ album_id = query.split("/album/")[1]
+
+ try:
+ # Get the album info
+ response = requests.get(
+ f"https://api.spotify.com/v1/albums/{album_id}",
+ headers=headers,
+ )
+
+ if response.status_code == 404:
+ embed = discord.Embed(
+ title="Album Not Found",
+ description=(
+ "The album could not be found as the provided link is"
+ " invalid. Please try again."
+ ),
+ color=BOT_COLOR,
+ )
+ return None, embed
+
+ if response.status_code == 401:
+ LOG.error(
+ "Could not authorize with Spotify API. Likely need to"
+ " restart the bot."
+ )
+ return None, None
+
+ response.raise_for_status()
+ # Unpack the album info
+ album = response.json()
+ name = album["name"]
+ artist = album["artists"][0]["name"]
+ num_tracks = len(album["tracks"]["items"])
+ artwork_url = album["images"][0]["url"]
+ except IndexError:
+ LOG.error("Failed unpacking Spotify album info")
+ return None, None
+ except (JSONDecodeError, requests.HTTPError):
+ LOG.error("Failed making request to Spotify API")
+ return None, None
+
+ embed = discord.Embed(
+ title="Album Queued",
+ description=(
+ f"**{name}** by **{artist}**\n"
+ f"` {num_tracks} ` tracks\n\n"
+ f"Queued by: {user.mention}"
+ ),
+ color=BOT_COLOR,
+ )
+ embed.set_thumbnail(url=artwork_url)
+ embed.set_footer(
+ text=datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + " UTC"
+ )
+
+ return album, embed
diff --git a/code/utils/source_helpers/spotify/playlist.py b/code/utils/source_helpers/spotify/playlist.py
new file mode 100644
index 0000000..c806e39
--- /dev/null
+++ b/code/utils/source_helpers/spotify/playlist.py
@@ -0,0 +1,73 @@
+import datetime
+import discord
+import requests
+from typing import Tuple, Optional
+from requests.exceptions import JSONDecodeError
+
+from utils.config import BOT_COLOR, LOG
+
+
+async def load(
+ headers: dict,
+ query: str,
+ user: discord.User,
+) -> Tuple[Optional[dict], Optional[discord.Embed]]:
+ """
+ Get the playlist info from the Spotify API
+ """
+ playlist_id = query.split("/playlist/")[1].split("?si=")[0]
+
+ try:
+ # Get the playlist info
+ response = requests.get(
+ f"https://api.spotify.com/v1/playlists/{playlist_id}",
+ headers=headers,
+ )
+
+ if response.status_code == 404:
+ embed = discord.Embed(
+ title="Playlist Not Found",
+ description=(
+ "The playlist could not be found as the provided link is"
+ " invalid. Please try again."
+ ),
+ color=BOT_COLOR,
+ )
+ return None, embed
+
+ if response.status_code == 401:
+ LOG.error(
+ "Could not authorize with Spotify API. Likely need to"
+ " restart the bot."
+ )
+ return None, None
+
+ response.raise_for_status()
+ # Unpack the playlist info
+ playlist = response.json()
+ name = playlist["name"]
+ owner = playlist["owner"]["display_name"]
+ num_tracks = len(playlist["tracks"]["items"])
+ artwork_url = playlist["images"][0]["url"]
+ except IndexError:
+ LOG.error("Failed unpacking Spotify playlist info")
+ return None, None
+ except (JSONDecodeError, requests.HTTPError):
+ LOG.error("Failed making request to Spotify API")
+ return None, None
+
+ embed = discord.Embed(
+ title="Playlist Queued",
+ description=(
+ f"**{name}** from **{owner}**\n"
+ f"` {num_tracks} ` tracks\n\n"
+ f"Queued by {user.mention}"
+ ),
+ color=BOT_COLOR,
+ )
+ embed.set_thumbnail(url=artwork_url)
+ embed.set_footer(
+ text=datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + " UTC"
+ )
+
+ return playlist, embed
diff --git a/code/utils/source_helpers/spotify/song.py b/code/utils/source_helpers/spotify/song.py
new file mode 100644
index 0000000..258b652
--- /dev/null
+++ b/code/utils/source_helpers/spotify/song.py
@@ -0,0 +1,68 @@
+import datetime
+import discord
+import requests
+from typing import Tuple, Optional
+from requests.exceptions import JSONDecodeError
+
+from utils.config import BOT_COLOR, LOG
+
+
+async def load(
+ headers: dict,
+ query: str,
+ user: discord.User,
+) -> Tuple[Optional[dict], Optional[discord.Embed]]:
+ """
+ Get the song info from the Spotify API
+ """
+ song_id = query.split("/track/")[1]
+
+ try:
+ # Get the song info
+ response = requests.get(
+ f"https://api.spotify.com/v1/tracks/{song_id}",
+ headers=headers,
+ )
+
+ if response.status_code == 404:
+ embed = discord.Embed(
+ title="Song Not Found",
+ description=(
+ "The song could not be found as the provided link is"
+ " invalid. Please try again."
+ ),
+ color=BOT_COLOR,
+ )
+ return None, embed
+
+ if response.status_code == 401:
+ LOG.error(
+ "Could not authorize with Spotify API. Likely need to"
+ " restart the bot."
+ )
+ return None, None
+
+ response.raise_for_status()
+ # Unpack the song info
+ song = response.json()
+ name = song["name"]
+ artist = song["artists"][0]["name"]
+ artwork_url = song["album"]["images"][0]["url"]
+ except IndexError:
+ LOG.error("Failed unpacking Spotify song info")
+ return None, None
+ except (JSONDecodeError, requests.HTTPError):
+ LOG.error("Failed making request to Spotify API")
+ return None, None
+
+ embed = discord.Embed(
+ title="Song Queued",
+ description=f"**{name}** by **{artist}**\n\nQueued by {user.mention}",
+ color=BOT_COLOR,
+ )
+ embed.set_thumbnail(url=artwork_url)
+ embed.set_footer(
+ text=datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + " UTC"
+ )
+
+ return song, embed