Overhaul \play\
command
- Split custom sources into helper functions - Add proper logging and handling - Fix LoadError embed messsage
This commit is contained in:
parent
5eb665c99b
commit
382c422629
@ -4,11 +4,14 @@ from discord import app_commands
|
||||
from discord.ext import commands
|
||||
from lavalink import LoadType
|
||||
import re
|
||||
import requests
|
||||
|
||||
from cogs.music import Music, LavalinkVoiceClient
|
||||
from utils.config import BOT_COLOR, YOUTUBE_SUPPORT
|
||||
from utils.custom_sources import SpotifySource, AppleSource
|
||||
from utils.custom_sources import (
|
||||
LoadError,
|
||||
CustomAudioTrack,
|
||||
)
|
||||
from utils.source_helpers.parse import parse_custom_source
|
||||
|
||||
|
||||
url_rx = re.compile(r"https?://(?:www\.)?.+")
|
||||
@ -43,366 +46,137 @@ class Play(commands.Cog):
|
||||
embed=embed, ephemeral=True
|
||||
)
|
||||
|
||||
###
|
||||
### APPLE MUSIC links, perform API requests and load all tracks from the playlist/album/track
|
||||
###
|
||||
|
||||
if "music.apple.com" in query:
|
||||
if not self.bot.apple_headers:
|
||||
embed = discord.Embed(
|
||||
title="Apple Music Error",
|
||||
description=(
|
||||
"Apple Music support seems to be broken at the moment."
|
||||
" Please try again and fill out a bug report with"
|
||||
" </bug:1224840889906499626> if this continues to"
|
||||
" happen."
|
||||
),
|
||||
color=BOT_COLOR,
|
||||
)
|
||||
return await interaction.response.send_message(
|
||||
embed=embed, ephemeral=True
|
||||
)
|
||||
|
||||
embed = discord.Embed(color=BOT_COLOR)
|
||||
|
||||
if "/playlist/" in query and "?i=" not in query:
|
||||
playlist_id = query.split("/playlist/")[1].split("/")[1]
|
||||
# Get all of the tracks in the playlist (limit at 250)
|
||||
playlist_url = f"https://api.music.apple.com/v1/catalog/us/playlists/{playlist_id}/tracks?limit=100"
|
||||
response = requests.get(
|
||||
playlist_url, headers=self.bot.apple_headers
|
||||
)
|
||||
if response.status_code == 200:
|
||||
playlist = response.json()
|
||||
# Get the general playlist info (name, artwork)
|
||||
playlist_info_url = f"https://api.music.apple.com/v1/catalog/us/playlists/{playlist_id}"
|
||||
playlist_info = requests.get(
|
||||
playlist_info_url, headers=self.bot.apple_headers
|
||||
)
|
||||
playlist_info = playlist_info.json()
|
||||
try:
|
||||
artwork_url = playlist_info["data"][0]["attributes"][
|
||||
"artwork"
|
||||
]["url"].replace("{w}x{h}", "300x300")
|
||||
except KeyError:
|
||||
artwork_url = None
|
||||
|
||||
embed.title = "Playlist Queued"
|
||||
embed.description = (
|
||||
f"**{playlist_info['data'][0]['attributes']['name']}**\n`"
|
||||
f" {len(playlist['data'])} ` tracks\n\nQueued by:"
|
||||
f" {interaction.user.mention}"
|
||||
)
|
||||
embed.set_thumbnail(url=artwork_url)
|
||||
embed.set_footer(
|
||||
text=datetime.datetime.now(
|
||||
datetime.timezone.utc
|
||||
).strftime("%Y-%m-%d %H:%M:%S")
|
||||
+ " UTC"
|
||||
)
|
||||
# Add small alert if the playlist is the max size
|
||||
if len(playlist["data"]) == 100:
|
||||
embed.description += (
|
||||
"\n\n*This playlist is longer than the 100 song"
|
||||
" maximum. Only the first 100 songs will be"
|
||||
" queued.*"
|
||||
)
|
||||
|
||||
await interaction.response.send_message(embed=embed)
|
||||
|
||||
tracks = await AppleSource.load_playlist(
|
||||
self, interaction.user, playlist
|
||||
)
|
||||
for track in tracks["tracks"]:
|
||||
player.add(requester=interaction.user, track=track)
|
||||
|
||||
# If there is an album, not a specific song within the album
|
||||
if "/album/" in query and "?i=" not in query:
|
||||
album_id = query.split("/album/")[1].split("/")[1]
|
||||
album_url = f"https://api.music.apple.com/v1/catalog/us/albums/{album_id}"
|
||||
response = requests.get(
|
||||
album_url, headers=self.bot.apple_headers
|
||||
)
|
||||
if response.status_code == 200:
|
||||
album = response.json()
|
||||
|
||||
embed.title = "Album Queued"
|
||||
embed.description = (
|
||||
f"**{album['data'][0]['attributes']['name']}** by"
|
||||
f" **{album['data'][0]['attributes']['artistName']}**\n`"
|
||||
f" {len(album['data'][0]['relationships']['tracks']['data'])} `"
|
||||
f" tracks\n\nQueued by: {interaction.user.mention}"
|
||||
)
|
||||
embed.set_thumbnail(
|
||||
url=album["data"][0]["attributes"]["artwork"][
|
||||
"url"
|
||||
].replace("{w}x{h}", "300x300")
|
||||
)
|
||||
embed.set_footer(
|
||||
text=datetime.datetime.now(
|
||||
datetime.timezone.utc
|
||||
).strftime("%Y-%m-%d %H:%M:%S")
|
||||
+ " UTC"
|
||||
)
|
||||
await interaction.response.send_message(embed=embed)
|
||||
|
||||
tracks = await AppleSource.load_album(
|
||||
self, interaction.user, album
|
||||
)
|
||||
for track in tracks["tracks"]:
|
||||
player.add(requester=interaction.user, track=track)
|
||||
|
||||
# If there is a specific song
|
||||
if "/album/" in query and "?i=" in query:
|
||||
song_id = query.split("/album/")[1].split("?i=")[1]
|
||||
song_url = f"https://api.music.apple.com/v1/catalog/us/songs/{song_id}"
|
||||
response = requests.get(
|
||||
song_url, headers=self.bot.apple_headers
|
||||
)
|
||||
if response.status_code == 200:
|
||||
song = response.json()
|
||||
|
||||
embed.title = "Song Queued"
|
||||
embed.description = (
|
||||
f"**{song['data'][0]['attributes']['name']}** by"
|
||||
f" **{song['data'][0]['attributes']['artistName']}**\n\nQueued"
|
||||
f" by: {interaction.user.mention}"
|
||||
)
|
||||
embed.set_thumbnail(
|
||||
url=song["data"][0]["attributes"]["artwork"][
|
||||
"url"
|
||||
].replace("{w}x{h}", "300x300")
|
||||
)
|
||||
embed.set_footer(
|
||||
text=datetime.datetime.now(
|
||||
datetime.timezone.utc
|
||||
).strftime("%Y-%m-%d %H:%M:%S")
|
||||
+ " UTC"
|
||||
)
|
||||
await interaction.response.send_message(embed=embed)
|
||||
|
||||
results = await AppleSource.load_item(
|
||||
self, interaction.user, song
|
||||
)
|
||||
player.add(
|
||||
requester=interaction.user, track=results.tracks[0]
|
||||
)
|
||||
|
||||
###
|
||||
### SPOTIFY links, perform API requests and load all tracks from the playlist/album/track
|
||||
###
|
||||
results, embed = await parse_custom_source(
|
||||
self, "apple", query, interaction.user
|
||||
)
|
||||
|
||||
elif "open.spotify.com" in query:
|
||||
if not self.bot.spotify_headers:
|
||||
embed = discord.Embed(
|
||||
title="Spotify Error",
|
||||
description=(
|
||||
"Spotify support seems to be broken at the moment."
|
||||
" Please try again and fill out a bug report with"
|
||||
" </bug:1224840889906499626> if this continues to"
|
||||
" happen."
|
||||
),
|
||||
color=BOT_COLOR,
|
||||
)
|
||||
return await interaction.response.send_message(
|
||||
embed=embed, ephemeral=True
|
||||
)
|
||||
|
||||
embed = discord.Embed(color=BOT_COLOR)
|
||||
|
||||
if "open.spotify.com/playlist" in query:
|
||||
playlist_id = query.split("playlist/")[1].split("?si=")[0]
|
||||
playlist_url = (
|
||||
f"https://api.spotify.com/v1/playlists/{playlist_id}"
|
||||
)
|
||||
response = requests.get(
|
||||
playlist_url, headers=self.bot.spotify_headers
|
||||
)
|
||||
if response.status_code == 200:
|
||||
playlist = response.json()
|
||||
|
||||
embed.title = "Playlist Queued"
|
||||
embed.description = (
|
||||
f"**{playlist['name']}** from"
|
||||
f" **{playlist['owner']['display_name']}**\n`"
|
||||
f" {len(playlist['tracks']['items'])} `"
|
||||
f" tracks\n\nQueued by: {interaction.user.mention}"
|
||||
)
|
||||
embed.set_thumbnail(url=playlist["images"][0]["url"])
|
||||
embed.set_footer(
|
||||
text=datetime.datetime.now(
|
||||
datetime.timezone.utc
|
||||
).strftime("%Y-%m-%d %H:%M:%S")
|
||||
+ " UTC"
|
||||
)
|
||||
await interaction.response.send_message(embed=embed)
|
||||
|
||||
tracks = await SpotifySource.load_playlist(
|
||||
self, interaction.user, playlist
|
||||
)
|
||||
for track in tracks["tracks"]:
|
||||
player.add(requester=interaction.user, track=track)
|
||||
|
||||
if "open.spotify.com/album" in query:
|
||||
album_id = query.split("album/")[1]
|
||||
album_url = f"https://api.spotify.com/v1/albums/{album_id}"
|
||||
response = requests.get(
|
||||
album_url, headers=self.bot.spotify_headers
|
||||
)
|
||||
if response.status_code == 200:
|
||||
album = response.json()
|
||||
|
||||
embed.title = "Album Queued"
|
||||
embed.description = (
|
||||
f"**{album['name']}** by"
|
||||
f" **{album['artists'][0]['name']}**\n`"
|
||||
f" {len(album['tracks']['items'])} ` tracks\n\nQueued"
|
||||
f" by: {interaction.user.mention}"
|
||||
)
|
||||
embed.set_thumbnail(url=album["images"][0]["url"])
|
||||
embed.set_footer(
|
||||
text=datetime.datetime.now(
|
||||
datetime.timezone.utc
|
||||
).strftime("%Y-%m-%d %H:%M:%S")
|
||||
+ " UTC"
|
||||
)
|
||||
await interaction.response.send_message(embed=embed)
|
||||
|
||||
tracks = await SpotifySource.load_album(
|
||||
self, interaction.user, album
|
||||
)
|
||||
for track in tracks["tracks"]:
|
||||
player.add(requester=interaction.user, track=track)
|
||||
|
||||
if "open.spotify.com/track" in query:
|
||||
track_id = query.split("track/")[1]
|
||||
track_url = f"https://api.spotify.com/v1/tracks/{track_id}"
|
||||
response = requests.get(
|
||||
track_url, headers=self.bot.spotify_headers
|
||||
)
|
||||
if response.status_code == 200:
|
||||
track = response.json()
|
||||
|
||||
embed.title = "Track Queued"
|
||||
embed.description = (
|
||||
f"**{track['name']}** by"
|
||||
f" **{track['artists'][0]['name']}**\n\nQueued by:"
|
||||
f" {interaction.user.mention}"
|
||||
)
|
||||
embed.set_thumbnail(url=track["album"]["images"][0]["url"])
|
||||
embed.set_footer(
|
||||
text=datetime.datetime.now(
|
||||
datetime.timezone.utc
|
||||
).strftime("%Y-%m-%d %H:%M:%S")
|
||||
+ " UTC"
|
||||
)
|
||||
await interaction.response.send_message(embed=embed)
|
||||
|
||||
results = await SpotifySource.load_item(
|
||||
self, interaction.user, track
|
||||
)
|
||||
player.add(
|
||||
requester=interaction.user, track=results.tracks[0]
|
||||
)
|
||||
|
||||
if "open.spotify.com/artists" in query:
|
||||
embed.title = "Artists Cannot Be Played"
|
||||
embed.description = (
|
||||
"I cannot play just artists, you must provide a"
|
||||
" song/album/playlist. Please try again."
|
||||
)
|
||||
return await interaction.response.send_message(
|
||||
embed=embed, ephemeral=True
|
||||
)
|
||||
results, embed = await parse_custom_source(
|
||||
self, "spotify", query, interaction.user
|
||||
)
|
||||
|
||||
###
|
||||
### For anything else, use default Lavalink providers to search the query
|
||||
###
|
||||
|
||||
else:
|
||||
# If the query is not a URL, begin searching
|
||||
if not url_rx.match(query):
|
||||
dzsearch = f"dzsearch:{query}"
|
||||
results = await player.node.get_tracks(dzsearch)
|
||||
|
||||
# If Deezer returned nothing
|
||||
if not results.tracks or results.load_type in (
|
||||
LoadType.EMPTY,
|
||||
LoadType.ERROR,
|
||||
):
|
||||
ytmsearch = f"ytmsearch:{query}"
|
||||
results = await player.node.get_tracks(ytmsearch)
|
||||
|
||||
if not results.tracks or results.load_type in (
|
||||
LoadType.EMPTY,
|
||||
LoadType.ERROR,
|
||||
):
|
||||
ytsearch = f"ytsearch:{query}"
|
||||
results = await player.node.get_tracks(ytsearch)
|
||||
if YOUTUBE_SUPPORT:
|
||||
ytmsearch = f"ytmsearch:{query}"
|
||||
results = await player.node.get_tracks(ytmsearch)
|
||||
# If YouTube Music returned nothing
|
||||
if not results.tracks or results.load_type in (
|
||||
LoadType.EMPTY,
|
||||
LoadType.ERROR,
|
||||
):
|
||||
# Final search attempt with YouTube
|
||||
ytsearch = f"ytsearch:{query}"
|
||||
results = await player.node.get_tracks(ytsearch)
|
||||
else:
|
||||
results = await player.node.get_tracks(query)
|
||||
|
||||
embed = discord.Embed(color=BOT_COLOR)
|
||||
|
||||
# If there are no results found, set results/embed to None, handled further down
|
||||
if not results.tracks or results.load_type in (
|
||||
LoadType.EMPTY,
|
||||
LoadType.ERROR,
|
||||
):
|
||||
embed.title = "Nothing Found"
|
||||
embed.description = (
|
||||
"Nothing for that query could be found. If this continues"
|
||||
" happening for other songs, please run"
|
||||
" </bug:1224840889906499626> to let the developer know."
|
||||
)
|
||||
return await interaction.response.send_message(
|
||||
embed=embed, ephemeral=True
|
||||
)
|
||||
results, embed = None, None
|
||||
|
||||
elif results.load_type == LoadType.PLAYLIST:
|
||||
tracks = results.tracks
|
||||
|
||||
for track in tracks:
|
||||
player.add(requester=interaction.user, track=track)
|
||||
|
||||
embed.title = "Songs Queued!"
|
||||
embed.description = (
|
||||
f"**{results.playlist_info.name}**\n` {len(tracks)} `"
|
||||
f" tracks\n\nQueued by: {interaction.user.mention}"
|
||||
# Create the embed if the results are a playlist
|
||||
if results.load_type == LoadType.PLAYLIST:
|
||||
embed = discord.Embed(
|
||||
title="Songs Queued!",
|
||||
description=(
|
||||
f"**{results.playlist_info.name}**\n"
|
||||
f"` {len(results.tracks)} ` tracks\n\n"
|
||||
f"Queued by: {interaction.user.mention}"
|
||||
),
|
||||
color=BOT_COLOR,
|
||||
)
|
||||
embed.set_footer(
|
||||
text=datetime.datetime.now(datetime.timezone.utc).strftime(
|
||||
text=datetime.datetime.utcnow().strftime(
|
||||
"%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
+ " UTC"
|
||||
)
|
||||
await interaction.response.send_message(embed=embed)
|
||||
# Otherwise, the result is just a single track, create that embed
|
||||
else:
|
||||
# Remove all but first track (most relevant result)
|
||||
results.tracks = results.tracks[:1]
|
||||
embed = discord.Embed(
|
||||
title="Song Queued!",
|
||||
description=(
|
||||
f"**{results.tracks[0].title}** by"
|
||||
f" **{results.tracks[0].author}**\n\nQueued by:"
|
||||
f" {interaction.user.mention}"
|
||||
),
|
||||
color=BOT_COLOR,
|
||||
)
|
||||
embed.set_thumbnail(url=results.tracks[0].artwork_url)
|
||||
embed.set_footer(
|
||||
text=datetime.datetime.utcnow().strftime(
|
||||
"%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
+ " UTC"
|
||||
)
|
||||
await interaction.response.send_message(embed=embed)
|
||||
|
||||
else:
|
||||
track = results.tracks[0]
|
||||
# If there are no results, and no embed
|
||||
if not results and not embed:
|
||||
embed = discord.Embed(
|
||||
title="Nothing Found",
|
||||
description=(
|
||||
"I was not able to find or load any songs for that query."
|
||||
" Please try again and fill out a bug report with"
|
||||
" </bug:1224840889906499626> if this continues to happen."
|
||||
),
|
||||
color=BOT_COLOR,
|
||||
)
|
||||
return await interaction.response.send_message(
|
||||
embed=embed, ephemeral=True
|
||||
)
|
||||
# If there are no results, but there is an embed (error msg)
|
||||
elif embed and not results:
|
||||
return await interaction.response.send_message(
|
||||
embed=embed, ephemeral=True
|
||||
)
|
||||
# If there are results, add them to the player
|
||||
else:
|
||||
for track in results.tracks:
|
||||
player.add(requester=interaction.user, track=track)
|
||||
|
||||
embed.title = "Track Queued"
|
||||
embed.description = (
|
||||
f"**{track.title}** by **{track.author}**\n\nQueued by:"
|
||||
f" {interaction.user.mention}"
|
||||
)
|
||||
embed.set_thumbnail(url=track.artwork_url)
|
||||
embed.set_footer(
|
||||
text=datetime.datetime.now(datetime.timezone.utc).strftime(
|
||||
"%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
+ " UTC"
|
||||
)
|
||||
await interaction.response.send_message(embed=embed)
|
||||
# If the track is CustomAudioTrack (Apple Music/Spotify)
|
||||
if type(results.tracks[0]) == CustomAudioTrack:
|
||||
# Attempt to load an actual track from a provider
|
||||
try:
|
||||
await results.tracks[0].load(player.node)
|
||||
# If it fails, remove it from the queue and alert the user
|
||||
except LoadError:
|
||||
player.queue.remove(results.tracks[0])
|
||||
raise LoadError
|
||||
|
||||
# Only join the voice channel now, so that the bot doesn't join if nothing is found
|
||||
if not interaction.guild.voice_client:
|
||||
await interaction.user.voice.channel.connect(
|
||||
cls=LavalinkVoiceClient, self_deaf=True
|
||||
)
|
||||
# Join the voice channel if not already connected
|
||||
if not interaction.guild.voice_client:
|
||||
await interaction.user.voice.channel.connect(
|
||||
cls=LavalinkVoiceClient, self_deaf=True
|
||||
)
|
||||
|
||||
# We don't want to call .play() if the player is playing as that will
|
||||
# effectively skip the current track
|
||||
if not player.is_playing:
|
||||
await player.play()
|
||||
# Only call player.play if it is not already playing, otherwise it will
|
||||
# effectively skip the current track
|
||||
if not player.is_playing:
|
||||
await player.play()
|
||||
|
||||
await interaction.response.send_message(embed=embed)
|
||||
|
||||
|
||||
async def setup(bot):
|
||||
|
@ -84,31 +84,23 @@ class Tree(app_commands.CommandTree):
|
||||
except discord.errors.InteractionResponded:
|
||||
await interaction.followup.send(embed=embed, ephemeral=True)
|
||||
|
||||
# If a Spotify song is linked but cannot be found on a provider (e.g. YouTube)
|
||||
elif isinstance(error, LoadError):
|
||||
elif (error, LoadError):
|
||||
embed = discord.Embed(
|
||||
title="Nothing Found",
|
||||
title="Load Error",
|
||||
description=(
|
||||
"Spotify does not allow direct play, meaning songs have to"
|
||||
" be found on a supported provider. In this case, the song"
|
||||
" couldn't be found. Please try again with a different"
|
||||
" song, or try searching for just the name and artist"
|
||||
" manually rather than sending a link."
|
||||
"Apple Music and Spotify do not allow direct playing from"
|
||||
" their websites, and I was unable to load a track on a"
|
||||
" valid source. Please try again."
|
||||
),
|
||||
color=BOT_COLOR,
|
||||
)
|
||||
embed.set_footer(
|
||||
text=datetime.datetime.now(datetime.timezone.utc).strftime(
|
||||
"%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
+ " UTC"
|
||||
)
|
||||
# Only send the error if the interaction is still valid
|
||||
try:
|
||||
await interaction.response.send_message(
|
||||
embed=embed, ephemeral=True
|
||||
)
|
||||
except discord.errors.InteractionResponded:
|
||||
await interaction.followup.send(embed=embed, ephemeral=True)
|
||||
pass
|
||||
|
||||
else:
|
||||
raise error
|
||||
|
79
code/utils/source_helpers/apple/album.py
Normal file
79
code/utils/source_helpers/apple/album.py
Normal file
@ -0,0 +1,79 @@
|
||||
import datetime
|
||||
import discord
|
||||
import requests
|
||||
from typing import Tuple, Optional
|
||||
from requests.exceptions import JSONDecodeError
|
||||
|
||||
from utils.config import BOT_COLOR, LOG
|
||||
|
||||
|
||||
async def load(
|
||||
headers: dict,
|
||||
query: str,
|
||||
user: discord.User,
|
||||
) -> Tuple[Optional[dict], Optional[discord.Embed]]:
|
||||
"""
|
||||
Get the album info from the Apple Music API
|
||||
"""
|
||||
album_id = query.split("/album/")[1].split("/")[1]
|
||||
|
||||
try:
|
||||
# Get the album info
|
||||
response = requests.get(
|
||||
f"https://api.music.apple.com/v1/catalog/us/albums/{album_id}",
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
if response.status_code == 404:
|
||||
embed = discord.Embed(
|
||||
title="Album Not Found",
|
||||
description=(
|
||||
"The album could not be found as the provided link is"
|
||||
" invalid. Please try again."
|
||||
),
|
||||
color=BOT_COLOR,
|
||||
)
|
||||
return None, embed
|
||||
|
||||
if response.status_code == 401:
|
||||
LOG.error(
|
||||
"Could not authorize with Apple Music API. Likely need to"
|
||||
" restart the bot."
|
||||
)
|
||||
return None, None
|
||||
|
||||
response.raise_for_status()
|
||||
# Unpack the album info
|
||||
album = response.json()
|
||||
name = album["data"][0]["attributes"]["name"]
|
||||
artist = album["data"][0]["attributes"]["artistName"]
|
||||
num_tracks = len(album["data"][0]["relationships"]["tracks"]["data"])
|
||||
except IndexError:
|
||||
LOG.error("Failed unpacking Apple Music album info")
|
||||
return None, None
|
||||
except (JSONDecodeError, requests.HTTPError):
|
||||
LOG.error("Failed making request to Apple Music API")
|
||||
return None, None
|
||||
|
||||
# Extract artwork URL, if available
|
||||
artwork_url = (
|
||||
album["data"][0]["attributes"].get("artwork", {}).get("url", None)
|
||||
)
|
||||
if artwork_url:
|
||||
artwork_url = artwork_url.replace("{w}x{h}", "300x300")
|
||||
|
||||
embed = discord.Embed(
|
||||
title="Album Queued",
|
||||
description=(
|
||||
f"**{name}** by **{artist}**\n"
|
||||
f"` {num_tracks} ` tracks\n\n"
|
||||
f"Queued by: {user.mention}"
|
||||
),
|
||||
color=BOT_COLOR,
|
||||
)
|
||||
embed.set_thumbnail(url=artwork_url)
|
||||
embed.set_footer(
|
||||
text=datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + " UTC"
|
||||
)
|
||||
|
||||
return album, embed
|
90
code/utils/source_helpers/apple/playlist.py
Normal file
90
code/utils/source_helpers/apple/playlist.py
Normal file
@ -0,0 +1,90 @@
|
||||
import datetime
|
||||
import discord
|
||||
import requests
|
||||
from typing import Tuple, Optional
|
||||
from requests.exceptions import JSONDecodeError
|
||||
|
||||
from utils.config import BOT_COLOR, LOG
|
||||
|
||||
|
||||
async def load(
|
||||
headers: dict,
|
||||
query: str,
|
||||
user: discord.User,
|
||||
) -> Tuple[Optional[dict], Optional[discord.Embed]]:
|
||||
"""
|
||||
Get the playlist info from the Apple Music API
|
||||
"""
|
||||
playlist_id = query.split("/playlist/")[1].split("/")[1]
|
||||
try:
|
||||
# Get all of the tracks in the playlist (limit at 100)
|
||||
response = requests.get(
|
||||
f"https://api.music.apple.com/v1/catalog/us/playlists/{playlist_id}/tracks?limit=100",
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
if response.status_code == 404:
|
||||
embed = discord.Embed(
|
||||
title="Playlist Not Found",
|
||||
description=(
|
||||
"The playlist could not be found as the provided link is"
|
||||
" invalid. Please try again."
|
||||
),
|
||||
color=BOT_COLOR,
|
||||
)
|
||||
return None, embed
|
||||
|
||||
if response.status_code == 401:
|
||||
LOG.error(
|
||||
"Could not authorize with Apple Music API. Likely need to"
|
||||
" restart the bot."
|
||||
)
|
||||
return None, None
|
||||
|
||||
response.raise_for_status()
|
||||
playlist = response.json()
|
||||
|
||||
# Get the general playlist info (name, artwork)
|
||||
response = requests.get(
|
||||
f"https://api.music.apple.com/v1/catalog/us/playlists/{playlist_id}",
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
# Unpack the playlist info
|
||||
playlist_info = response.json()
|
||||
name = playlist_info["data"][0]["attributes"]["name"]
|
||||
num_tracks = len(playlist["data"])
|
||||
except IndexError:
|
||||
LOG.error("Failed unpacking Apple Music playlist info")
|
||||
return None, None
|
||||
except (JSONDecodeError, requests.HTTPError):
|
||||
LOG.error("Failed making request to Apple Music API")
|
||||
return None, None
|
||||
|
||||
# Extract artwork URL, if available
|
||||
artwork_url = (
|
||||
playlist_info["data"][0]["attributes"]
|
||||
.get("artwork", {})
|
||||
.get("url", None)
|
||||
)
|
||||
if artwork_url:
|
||||
artwork_url = artwork_url.replace("{w}x{h}", "300x300")
|
||||
|
||||
embed = discord.Embed(
|
||||
title="Playlist Queued",
|
||||
description=(
|
||||
f"**{name}**\n` {num_tracks} ` tracks\n\nQueued by: {user.mention}"
|
||||
),
|
||||
color=BOT_COLOR,
|
||||
)
|
||||
|
||||
# Add small alert if the playlist is the max size
|
||||
if len(playlist["data"]) == 100:
|
||||
embed.description += (
|
||||
"\n\n*This playlist is longer than the 100 song"
|
||||
" maximum. Only the first 100 songs will be"
|
||||
" queued.*"
|
||||
)
|
||||
|
||||
return playlist, embed
|
74
code/utils/source_helpers/apple/song.py
Normal file
74
code/utils/source_helpers/apple/song.py
Normal file
@ -0,0 +1,74 @@
|
||||
import datetime
|
||||
import discord
|
||||
import requests
|
||||
from typing import Tuple, Optional
|
||||
from requests.exceptions import JSONDecodeError
|
||||
|
||||
from utils.config import BOT_COLOR, LOG
|
||||
|
||||
|
||||
async def load(
|
||||
headers: dict,
|
||||
query: str,
|
||||
user: discord.User,
|
||||
) -> Tuple[Optional[dict], Optional[discord.Embed]]:
|
||||
"""
|
||||
Get the song info from the Apple Music API
|
||||
"""
|
||||
song_id = query.split("/album/")[1].split("?i=")[1]
|
||||
|
||||
try:
|
||||
# Get the song info
|
||||
response = requests.get(
|
||||
f"https://api.music.apple.com/v1/catalog/us/songs/{song_id}",
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
if response.status_code == 404:
|
||||
embed = discord.Embed(
|
||||
title="Song Not Found",
|
||||
description=(
|
||||
"The song could not be found as the provided link is"
|
||||
" invalid. Please try again."
|
||||
),
|
||||
color=BOT_COLOR,
|
||||
)
|
||||
return None, embed
|
||||
|
||||
if response.status_code == 401:
|
||||
LOG.error(
|
||||
"Could not authorize with Apple Music API. Likely need to"
|
||||
" restart the bot."
|
||||
)
|
||||
return None, None
|
||||
|
||||
response.raise_for_status()
|
||||
# Unpack the song info
|
||||
song = response.json()
|
||||
name = song["data"][0]["attributes"]["name"]
|
||||
artist = song["data"][0]["attributes"]["artistName"]
|
||||
except IndexError:
|
||||
LOG.error("Failed unpacking Apple Music song info")
|
||||
return None, None
|
||||
except (JSONDecodeError, requests.HTTPError):
|
||||
LOG.error("Failed making request to Apple Music API")
|
||||
return None, None
|
||||
|
||||
# Extract artwork URL, if available
|
||||
artwork_url = (
|
||||
song["data"][0]["attributes"].get("artwork", {}).get("url", None)
|
||||
)
|
||||
if artwork_url:
|
||||
artwork_url = artwork_url.replace("{w}x{h}", "300x300")
|
||||
|
||||
embed = discord.Embed(
|
||||
title="Song Queued",
|
||||
description=f"**{name}** by **{artist}**\n\nQueued by: {user.mention}",
|
||||
color=BOT_COLOR,
|
||||
)
|
||||
embed.set_thumbnail(url=artwork_url)
|
||||
embed.set_footer(
|
||||
text=datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + " UTC"
|
||||
)
|
||||
|
||||
return song, embed
|
80
code/utils/source_helpers/parse.py
Normal file
80
code/utils/source_helpers/parse.py
Normal file
@ -0,0 +1,80 @@
|
||||
import discord
|
||||
|
||||
from utils.source_helpers.apple import (
|
||||
album as apple_album,
|
||||
playlist as apple_playlist,
|
||||
song as apple_song,
|
||||
)
|
||||
from utils.source_helpers.spotify import (
|
||||
album as spotify_album,
|
||||
playlist as spotify_playlist,
|
||||
song as spotify_song,
|
||||
)
|
||||
from utils.custom_sources import AppleSource, SpotifySource
|
||||
|
||||
|
||||
async def parse_custom_source(
|
||||
self, provider: str, query: str, user: discord.User
|
||||
):
|
||||
"""
|
||||
Parse the query and run the appropriate functions to get the results/info
|
||||
|
||||
Return the results and an embed or None, None
|
||||
"""
|
||||
load_funcs = {
|
||||
"apple": {
|
||||
"album": apple_album.load,
|
||||
"playlist": apple_playlist.load,
|
||||
"song": apple_song.load,
|
||||
},
|
||||
"spotify": {
|
||||
"album": spotify_album.load,
|
||||
"playlist": spotify_playlist.load,
|
||||
"song": spotify_song.load,
|
||||
},
|
||||
}
|
||||
|
||||
headers = {
|
||||
"apple": self.bot.apple_headers,
|
||||
"spotify": self.bot.spotify_headers,
|
||||
}
|
||||
|
||||
sources = {
|
||||
"apple": AppleSource,
|
||||
"spotify": SpotifySource,
|
||||
}
|
||||
|
||||
# Catch all songs
|
||||
if "?i=" in query or "/track/" in query:
|
||||
song, embed = await load_funcs[provider]["song"](
|
||||
headers[provider], query, user
|
||||
)
|
||||
|
||||
if song:
|
||||
results = await sources[provider].load_item(self, user, song)
|
||||
else:
|
||||
return None, embed
|
||||
# Catch all playlists
|
||||
elif "/playlist/" in query:
|
||||
playlist, embed = await load_funcs[provider]["playlist"](
|
||||
headers[provider], query, user
|
||||
)
|
||||
|
||||
if playlist:
|
||||
results = await sources[provider].load_playlist(
|
||||
self, user, playlist
|
||||
)
|
||||
else:
|
||||
return None, embed
|
||||
# Catch all albums
|
||||
elif "/album/" in query:
|
||||
album, embed = await load_funcs[provider]["album"](
|
||||
headers[provider], query, user
|
||||
)
|
||||
|
||||
if album:
|
||||
results = await sources[provider].load_album(self, user, album)
|
||||
else:
|
||||
return None, embed
|
||||
|
||||
return results, embed
|
73
code/utils/source_helpers/spotify/album.py
Normal file
73
code/utils/source_helpers/spotify/album.py
Normal file
@ -0,0 +1,73 @@
|
||||
import datetime
|
||||
import discord
|
||||
import requests
|
||||
from typing import Tuple, Optional
|
||||
from requests.exceptions import JSONDecodeError
|
||||
|
||||
from utils.config import BOT_COLOR, LOG
|
||||
|
||||
|
||||
async def load(
|
||||
headers: dict,
|
||||
query: str,
|
||||
user: discord.User,
|
||||
) -> Tuple[Optional[dict], Optional[discord.Embed]]:
|
||||
"""
|
||||
Get the album info from the Spotify API
|
||||
"""
|
||||
album_id = query.split("/album/")[1]
|
||||
|
||||
try:
|
||||
# Get the album info
|
||||
response = requests.get(
|
||||
f"https://api.spotify.com/v1/albums/{album_id}",
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
if response.status_code == 404:
|
||||
embed = discord.Embed(
|
||||
title="Album Not Found",
|
||||
description=(
|
||||
"The album could not be found as the provided link is"
|
||||
" invalid. Please try again."
|
||||
),
|
||||
color=BOT_COLOR,
|
||||
)
|
||||
return None, embed
|
||||
|
||||
if response.status_code == 401:
|
||||
LOG.error(
|
||||
"Could not authorize with Spotify API. Likely need to"
|
||||
" restart the bot."
|
||||
)
|
||||
return None, None
|
||||
|
||||
response.raise_for_status()
|
||||
# Unpack the album info
|
||||
album = response.json()
|
||||
name = album["name"]
|
||||
artist = album["artists"][0]["name"]
|
||||
num_tracks = len(album["tracks"]["items"])
|
||||
artwork_url = album["images"][0]["url"]
|
||||
except IndexError:
|
||||
LOG.error("Failed unpacking Spotify album info")
|
||||
return None, None
|
||||
except (JSONDecodeError, requests.HTTPError):
|
||||
LOG.error("Failed making request to Spotify API")
|
||||
return None, None
|
||||
|
||||
embed = discord.Embed(
|
||||
title="Album Queued",
|
||||
description=(
|
||||
f"**{name}** by **{artist}**\n"
|
||||
f"` {num_tracks} ` tracks\n\n"
|
||||
f"Queued by: {user.mention}"
|
||||
),
|
||||
color=BOT_COLOR,
|
||||
)
|
||||
embed.set_thumbnail(url=artwork_url)
|
||||
embed.set_footer(
|
||||
text=datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + " UTC"
|
||||
)
|
||||
|
||||
return album, embed
|
73
code/utils/source_helpers/spotify/playlist.py
Normal file
73
code/utils/source_helpers/spotify/playlist.py
Normal file
@ -0,0 +1,73 @@
|
||||
import datetime
|
||||
import discord
|
||||
import requests
|
||||
from typing import Tuple, Optional
|
||||
from requests.exceptions import JSONDecodeError
|
||||
|
||||
from utils.config import BOT_COLOR, LOG
|
||||
|
||||
|
||||
async def load(
|
||||
headers: dict,
|
||||
query: str,
|
||||
user: discord.User,
|
||||
) -> Tuple[Optional[dict], Optional[discord.Embed]]:
|
||||
"""
|
||||
Get the playlist info from the Spotify API
|
||||
"""
|
||||
playlist_id = query.split("/playlist/")[1].split("?si=")[0]
|
||||
|
||||
try:
|
||||
# Get the playlist info
|
||||
response = requests.get(
|
||||
f"https://api.spotify.com/v1/playlists/{playlist_id}",
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
if response.status_code == 404:
|
||||
embed = discord.Embed(
|
||||
title="Playlist Not Found",
|
||||
description=(
|
||||
"The playlist could not be found as the provided link is"
|
||||
" invalid. Please try again."
|
||||
),
|
||||
color=BOT_COLOR,
|
||||
)
|
||||
return None, embed
|
||||
|
||||
if response.status_code == 401:
|
||||
LOG.error(
|
||||
"Could not authorize with Spotify API. Likely need to"
|
||||
" restart the bot."
|
||||
)
|
||||
return None, None
|
||||
|
||||
response.raise_for_status()
|
||||
# Unpack the playlist info
|
||||
playlist = response.json()
|
||||
name = playlist["name"]
|
||||
owner = playlist["owner"]["display_name"]
|
||||
num_tracks = len(playlist["tracks"]["items"])
|
||||
artwork_url = playlist["images"][0]["url"]
|
||||
except IndexError:
|
||||
LOG.error("Failed unpacking Spotify playlist info")
|
||||
return None, None
|
||||
except (JSONDecodeError, requests.HTTPError):
|
||||
LOG.error("Failed making request to Spotify API")
|
||||
return None, None
|
||||
|
||||
embed = discord.Embed(
|
||||
title="Playlist Queued",
|
||||
description=(
|
||||
f"**{name}** from **{owner}**\n"
|
||||
f"` {num_tracks} ` tracks\n\n"
|
||||
f"Queued by {user.mention}"
|
||||
),
|
||||
color=BOT_COLOR,
|
||||
)
|
||||
embed.set_thumbnail(url=artwork_url)
|
||||
embed.set_footer(
|
||||
text=datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + " UTC"
|
||||
)
|
||||
|
||||
return playlist, embed
|
68
code/utils/source_helpers/spotify/song.py
Normal file
68
code/utils/source_helpers/spotify/song.py
Normal file
@ -0,0 +1,68 @@
|
||||
import datetime
|
||||
import discord
|
||||
import requests
|
||||
from typing import Tuple, Optional
|
||||
from requests.exceptions import JSONDecodeError
|
||||
|
||||
from utils.config import BOT_COLOR, LOG
|
||||
|
||||
|
||||
async def load(
|
||||
headers: dict,
|
||||
query: str,
|
||||
user: discord.User,
|
||||
) -> Tuple[Optional[dict], Optional[discord.Embed]]:
|
||||
"""
|
||||
Get the song info from the Spotify API
|
||||
"""
|
||||
song_id = query.split("/track/")[1]
|
||||
|
||||
try:
|
||||
# Get the song info
|
||||
response = requests.get(
|
||||
f"https://api.spotify.com/v1/tracks/{song_id}",
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
if response.status_code == 404:
|
||||
embed = discord.Embed(
|
||||
title="Song Not Found",
|
||||
description=(
|
||||
"The song could not be found as the provided link is"
|
||||
" invalid. Please try again."
|
||||
),
|
||||
color=BOT_COLOR,
|
||||
)
|
||||
return None, embed
|
||||
|
||||
if response.status_code == 401:
|
||||
LOG.error(
|
||||
"Could not authorize with Spotify API. Likely need to"
|
||||
" restart the bot."
|
||||
)
|
||||
return None, None
|
||||
|
||||
response.raise_for_status()
|
||||
# Unpack the song info
|
||||
song = response.json()
|
||||
name = song["name"]
|
||||
artist = song["artists"][0]["name"]
|
||||
artwork_url = song["album"]["images"][0]["url"]
|
||||
except IndexError:
|
||||
LOG.error("Failed unpacking Spotify song info")
|
||||
return None, None
|
||||
except (JSONDecodeError, requests.HTTPError):
|
||||
LOG.error("Failed making request to Spotify API")
|
||||
return None, None
|
||||
|
||||
embed = discord.Embed(
|
||||
title="Song Queued",
|
||||
description=f"**{name}** by **{artist}**\n\nQueued by {user.mention}",
|
||||
color=BOT_COLOR,
|
||||
)
|
||||
embed.set_thumbnail(url=artwork_url)
|
||||
embed.set_footer(
|
||||
text=datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + " UTC"
|
||||
)
|
||||
|
||||
return song, embed
|
Loading…
x
Reference in New Issue
Block a user