Change Spotify support to use custom source

This commit is contained in:
Parker M. 2024-04-09 01:28:01 -05:00
parent 2040a2f2fb
commit d7fa64ffa2
No known key found for this signature in database
GPG Key ID: 95CD2E0C7E329F2A
3 changed files with 263 additions and 41 deletions

View File

@ -1,9 +1,10 @@
import discord
from discord.ext import commands
from discord.ext import commands, tasks
import os
import requests
from validate_config import create_config
from global_variables import LOG, BOT_TOKEN
from global_variables import LOG, BOT_TOKEN, SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET
class MyBot(commands.Bot):
@ -16,6 +17,7 @@ class MyBot(commands.Bot):
async def setup_hook(self):
create_config()
get_access_token.start()
for ext in os.listdir("./code/cogs"):
if ext.endswith(".py"):
await self.load_extension(f"cogs.{ext[:-3]}")
@ -33,5 +35,18 @@ async def on_ready():
LOG.info(f"{bot.user} has connected to Discord.")
@tasks.loop(minutes=45)
async def get_access_token():
auth_url = "https://accounts.spotify.com/api/token"
data = {
"grant_type": "client_credentials",
"client_id": SPOTIFY_CLIENT_ID,
"client_secret": SPOTIFY_CLIENT_SECRET,
}
response = requests.post(auth_url, data=data)
access_token = response.json()["access_token"]
bot.spotify_headers = {"Authorization": f"Bearer {access_token}"}
if __name__ == "__main__":
bot.run(BOT_TOKEN)

View File

@ -6,8 +6,10 @@ from cogs.music import Music
from lavalink import LoadType
import re
from cogs.music import LavalinkVoiceClient
import requests
from global_variables import BOT_COLOR
from custom_source import CustomSource
url_rx = re.compile(r"https?://(?:www\.)?.+")
@ -42,57 +44,148 @@ class Play(commands.Cog):
)
return await interaction.response.send_message(embed=embed)
if not url_rx.match(query):
ytsearch = f"scsearch:{query}"
results = await player.node.get_tracks(ytsearch)
# If a Spotify link is found, act accordingly
# We use a custom source for this (I tried the LavaSrc plugin, but Spotify
# links would just result in random shit being played whenever ytsearch was removed)
if "open.spotify.com" in query:
embed = discord.Embed(color=BOT_COLOR)
if "open.spotify.com/playlist" in query:
playlist_id = query.split("playlist/")[1].split("?si=")[0]
playlist_url = f"https://api.spotify.com/v1/playlists/{playlist_id}"
response = requests.get(playlist_url, headers=self.bot.spotify_headers)
if response.status_code == 200:
playlist = response.json()
embed.title = "Playlist Queued"
embed.description = f"**{playlist['name']}** from **{playlist['owner']['display_name']}**\n` {len(playlist['tracks']['items'])} ` tracks\n\nQueued by: {interaction.user.mention}"
embed.set_thumbnail(url=playlist["images"][0]["url"])
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
)
await interaction.response.send_message(embed=embed)
tracks = await CustomSource.load_playlist(
self, interaction.user, playlist
)
for track in tracks["tracks"]:
player.add(requester=interaction.user, track=track)
if "open.spotify.com/album" in query:
album_id = query.split("album/")[1]
album_url = f"https://api.spotify.com/v1/albums/{album_id}"
response = requests.get(album_url, headers=self.bot.spotify_headers)
if response.status_code == 200:
album = response.json()
embed.title = "Album Queued"
embed.description = f"**{album['name']}** by **{album['artists'][0]['name']}**\n` {len(album['tracks']['items'])} ` tracks\n\nQueued by: {interaction.user.mention}"
embed.set_thumbnail(url=album["images"][0]["url"])
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
)
await interaction.response.send_message(embed=embed)
tracks = await CustomSource.load_album(
self, interaction.user, album
)
for track in tracks["tracks"]:
player.add(requester=interaction.user, track=track)
if "open.spotify.com/track" in query:
track_id = query.split("track/")[1]
track_url = f"https://api.spotify.com/v1/tracks/{track_id}"
response = requests.get(track_url, headers=self.bot.spotify_headers)
if response.status_code == 200:
track = response.json()
embed.title = "Track Queued"
embed.description = f"**{track['name']}** by **{track['artists'][0]['name']}**\n\nQueued by: {interaction.user.mention}"
embed.set_thumbnail(url=track["album"]["images"][0]["url"])
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
)
await interaction.response.send_message(embed=embed)
results = await CustomSource.load_item(
self, interaction.user, track
)
player.add(requester=interaction.user, track=results.tracks[0])
if "open.spotify.com/artists" in query:
embed.title = "Artists Cannot Be Played"
embed.description = "I cannot play just artists, you must provide a song/album/playlist. Please try again."
return await interaction.response.send_message(
embed=embed, ephemeral=True
)
# For anything else, do default searches and attempt to find track and play
else:
if not url_rx.match(query):
ytsearch = f"scsearch:{query}"
results = await player.node.get_tracks(ytsearch)
if not results.tracks or results.load_type in (
LoadType.EMPTY,
LoadType.ERROR,
):
scsearch = f"dzsearch:{query}"
results = await player.node.get_tracks(scsearch)
else:
results = await player.node.get_tracks(query)
embed = discord.Embed(color=BOT_COLOR)
if not results.tracks or results.load_type in (
LoadType.EMPTY,
LoadType.ERROR,
):
scsearch = f"dzsearch:{query}"
results = await player.node.get_tracks(scsearch)
else:
results = await player.node.get_tracks(query)
embed.title = "Nothing Found"
embed.description = "Nothing for that query could be found. If this continues happening for other songs, please run `/bug` to let the developer know."
return await interaction.response.send_message(
embed=embed, ephemeral=True
)
embed = discord.Embed(color=BOT_COLOR)
elif results.load_type == LoadType.PLAYLIST:
tracks = results.tracks
if not results.tracks or results.load_type in (LoadType.EMPTY, LoadType.ERROR):
embed.title = "Nothing Found"
embed.description = "Nothing for that query could be found. If this continues happening for other songs, please run `/bug` to let the developer know."
return await interaction.response.send_message(embed=embed, ephemeral=True)
for track in tracks:
player.add(requester=interaction.user, track=track)
elif results.load_type == LoadType.PLAYLIST:
tracks = results.tracks
embed.title = "Songs Queued!"
embed.description = f"**{results.playlist_info.name}**\n` {len(tracks)} ` tracks\n\nQueued by: {interaction.user.mention}"
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
)
await interaction.response.send_message(embed=embed)
for track in tracks:
else:
track = results.tracks[0]
player.add(requester=interaction.user, track=track)
embed.title = "Songs Queued!"
embed.description = f"**{results.playlist_info.name}**\n` {len(tracks)} ` tracks\n\nQueued by: {interaction.user.mention}"
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
embed.title = "Track Queued"
embed.description = f"**{track.title}** by **{track.author}**\n\nQueued by: {interaction.user.mention}"
embed.set_thumbnail(url=track.artwork_url)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
)
+ " UTC"
)
await interaction.response.send_message(embed=embed)
else:
track = results.tracks[0]
embed.title = "Track Queued"
embed.description = f"**{track.title}** by **{track.author}**\n\nQueued by: {interaction.user.mention}"
embed.set_thumbnail(url=track.artwork_url)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
)
player.add(requester=interaction.user, track=track)
await interaction.response.send_message(embed=embed)
await interaction.response.send_message(embed=embed)
# Only join the voice channel now, so that the bot doesn't join if nothing is found
if not interaction.guild.voice_client:

114
code/custom_source.py Normal file
View File

@ -0,0 +1,114 @@
from lavalink import LoadResult, LoadType, Source, DeferredAudioTrack, PlaylistInfo
class LoadError(Exception): # We'll raise this if we have trouble loading our track.
pass
class CustomAudioTrack(DeferredAudioTrack):
# A DeferredAudioTrack allows us to load metadata now, and a playback URL later.
# This makes the DeferredAudioTrack highly efficient, particularly in cases
# where large playlists are loaded.
async def load(
self, client
): # Load our 'actual' playback track using the metadata from this one.
scsearch = f"scsearch:{self.title} {self.author}"
results = await client.get_tracks(scsearch)
if not results.tracks or results.load_type in (
LoadType.EMPTY,
LoadType.ERROR,
):
dzsearch = f"dzsearch:{self.title} {self.author}"
results = await client.get_tracks(dzsearch)
if not results.tracks or results.load_type in (
LoadType.EMPTY,
LoadType.ERROR,
):
raise LoadError
first_track = results.tracks[0] # Grab the first track from the results.
base64 = first_track.track # Extract the base64 string from the track.
self.track = base64 # We'll store this for later, as it allows us to save making network requests
# if this track is re-used (e.g. repeat).
return base64
class CustomSource(Source):
def __init__(self):
super().__init__(
name="custom"
) # Initialising our custom source with the name 'custom'.
async def load_item(self, user, metadata):
track = CustomAudioTrack(
{ # Create an instance of our CustomAudioTrack.
"identifier": metadata[
"id"
], # Fill it with metadata that we've obtained from our source's provider.
"isSeekable": True,
"author": metadata["artists"][0]["name"],
"length": metadata["duration_ms"],
"isStream": False,
"title": metadata["name"],
"uri": metadata["external_urls"]["spotify"],
"duration": metadata["duration_ms"],
"artworkUrl": metadata["album"]["images"][0]["url"],
},
requester=user,
)
return LoadResult(LoadType.TRACK, [track], playlist_info=PlaylistInfo.none())
async def load_album(self, user, metadata):
tracks = []
for track in metadata["tracks"][
"items"
]: # Loop through each track in the album.
tracks.append(
CustomAudioTrack(
{ # Create an instance of our CustomAudioTrack.
"identifier": track[
"id"
], # Fill it with metadata that we've obtained from our source's provider.
"isSeekable": True,
"author": track["artists"][0]["name"],
"length": track["duration_ms"],
"isStream": False,
"title": track["name"],
"uri": track["external_urls"]["spotify"],
"duration": track["duration_ms"],
"artworkUrl": metadata["images"][0]["url"],
},
requester=user,
)
)
return LoadResult(LoadType.PLAYLIST, tracks, playlist_info=PlaylistInfo.none())
async def load_playlist(self, user, metadata):
tracks = []
for track in metadata["tracks"][
"items"
]: # Loop through each track in the playlist.
tracks.append(
CustomAudioTrack(
{ # Create an instance of our CustomAudioTrack.
"identifier": track["track"][
"id"
], # Fill it with metadata that we've obtained from our source's provider.
"isSeekable": True,
"author": track["track"]["artists"][0]["name"],
"length": track["track"]["duration_ms"],
"isStream": False,
"title": track["track"]["name"],
"uri": track["track"]["external_urls"]["spotify"],
"duration": track["track"]["duration_ms"],
"artworkUrl": track["track"]["album"]["images"][0]["url"],
},
requster=user,
)
)
return LoadResult(LoadType.PLAYLIST, tracks, playlist_info=PlaylistInfo.none())