Merge pull request #10 from PacketParker/dev

Update
This commit is contained in:
Parker M. 2024-12-03 06:05:14 +00:00 committed by GitHub
commit 15e3383163
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 1181 additions and 832 deletions

3
.gitignore vendored
View File

@ -2,4 +2,5 @@ config.ini
__pycache__ __pycache__
count.db count.db
notes.txt notes.txt
config.yaml config.yaml
track_events.log

View File

@ -29,7 +29,7 @@ class MyBot(commands.Bot):
config.LOG.info("Loading cogs...") config.LOG.info("Loading cogs...")
config.LOG.info( config.LOG.info(
"YouTube support is enabled" "YouTube support is enabled, make sure to set a poToken"
if config.YOUTUBE_SUPPORT if config.YOUTUBE_SUPPORT
else "YouTube support is disabled" else "YouTube support is disabled"
) )

View File

@ -6,7 +6,7 @@ from cogs.music import Music
from typing import Literal from typing import Literal
from utils.ai_recommendations import add_song_recommendations from utils.ai_recommendations import add_song_recommendations
from utils.config import BOT_COLOR from utils.config import create_embed
class Autoplay(commands.Cog): class Autoplay(commands.Cog):
@ -19,32 +19,29 @@ class Autoplay(commands.Cog):
async def autoplay( async def autoplay(
self, interaction: discord.Interaction, toggle: Literal["ON", "OFF"] self, interaction: discord.Interaction, toggle: Literal["ON", "OFF"]
): ):
"Keep the music playing forever with music suggestions from OpenAI" "Keep music playing 24/7 with AI-generated song recommendations"
if toggle == "OFF": if toggle == "OFF":
self.bot.autoplay.remove(interaction.guild.id) self.bot.autoplay.remove(interaction.guild.id)
embed = discord.Embed( embed = create_embed(
title="Autoplay Off", title="Autoplay Off",
description=( description=(
"Autoplay has been turned off. I will no longer" "Autoplay has been turned off. Song recommendations will"
" automatically add new songs to the queue based on AI" " no longer be added to the queue."
" recommendations."
), ),
color=BOT_COLOR,
) )
return await interaction.response.send_message(embed=embed) return await interaction.response.send_message(embed=embed)
# Otherwise, toggle must be "ON", so enable autoplaying # Otherwise, toggle must be "ON", so enable autoplaying
if interaction.guild.id in self.bot.autoplay: if interaction.guild.id in self.bot.autoplay:
embed = discord.Embed( embed = create_embed(
title="Autoplay Already Enabled", title="Autoplay Already Enabled",
description=( description=(
"Autoplay is already enabled. If you would like to turn it" "Autoplay is already enabled. If you would like to turn it"
" off, choose the `OFF` option in the" " off, run </autoplay:1228216490386391052> and choose the"
" </autoplay:1228216490386391052> command." " `OFF` option."
), ),
color=BOT_COLOR,
) )
return await interaction.response.send_message( return await interaction.response.send_message(
embed=embed, ephemeral=True embed=embed, ephemeral=True
@ -53,21 +50,13 @@ class Autoplay(commands.Cog):
player = self.bot.lavalink.player_manager.get(interaction.guild.id) player = self.bot.lavalink.player_manager.get(interaction.guild.id)
if len(player.queue) < 5: if len(player.queue) < 5:
embed = discord.Embed( embed = create_embed(
title="Not Enough Context", title="Not Enough Context",
description=( description=(
"You must have at least 5 songs in the queue so that I can" "Autoplay requires at least 5 songs in the queue in order"
" get a good understanding of what music I should continue" " to generate recommendations. Please add more and try"
" to play. Add some more music to the queue, then try"
" again." " again."
), ),
color=BOT_COLOR,
)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
) )
return await interaction.response.send_message( return await interaction.response.send_message(
embed=embed, ephemeral=True embed=embed, ephemeral=True
@ -77,13 +66,12 @@ class Autoplay(commands.Cog):
for song in player.queue[:10]: for song in player.queue[:10]:
inputs[song.title] = song.author inputs[song.title] = song.author
embed = discord.Embed( embed = create_embed(
title="Getting AI Recommendations", title="Getting AI Recommendations",
description=( description=(
"Attempting to generate recommendations based on the current" "Attempting to generate recommendations based on the current"
" songs in your queue. Just a moment..." " songs in your queue. Just a moment..."
), ),
color=BOT_COLOR,
) )
await interaction.response.send_message(embed=embed) await interaction.response.send_message(embed=embed)
@ -91,35 +79,25 @@ class Autoplay(commands.Cog):
self.bot.openai, self.bot.user, player, 5, inputs self.bot.openai, self.bot.user, player, 5, inputs
): ):
self.bot.autoplay.append(interaction.guild.id) self.bot.autoplay.append(interaction.guild.id)
embed = discord.Embed( embed = create_embed(
title=":infinity: Autoplay Enabled :infinity:", title=":infinity: Autoplay Enabled :infinity:",
description=( description=(
"I have added a few similar songs to the queue and will" "Recommendations have been generated and added to the"
" continue to do so once the queue gets low again. Now" " queue. Autoplay will automatically search for more"
" just sit back and enjoy the music!\n\nEnabled by:" " songs whenever the queue gets low.\n\nEnabled by:"
f" {interaction.user.mention}" f" {interaction.user.mention}"
), ),
color=BOT_COLOR,
) )
await interaction.edit_original_response(embed=embed) await interaction.edit_original_response(embed=embed)
else: else:
embed = discord.Embed( embed = create_embed(
title="Autoplay Error", title="Autoplay Error",
description=( description=(
"Autoplay is an experimental feature, meaning sometimes it" "Unable to get AI recommendations at this time. Please try"
" doesn't work as expected. I had an error when attempting" " again. If issues continue, please fill out a bug report"
" to get similar songs for you, please try running the" " with </bug:1224840889906499626>."
" command again. If the issue persists, fill out a bug"
" report with the </bug:1224840889906499626> command."
), ),
color=BOT_COLOR,
)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
) )
await interaction.edit_original_response(embed=embed) await interaction.edit_original_response(embed=embed)

View File

@ -15,7 +15,9 @@ class BugReport(discord.ui.Modal, title="Report a bug"):
placeholder="EX: itsmefreddy01...", placeholder="EX: itsmefreddy01...",
) )
command = discord.ui.TextInput( command = discord.ui.TextInput(
label="Command with error", placeholder="EX: skip...", required=True label="Command with error",
placeholder="EX: autoplay, skip...",
required=True,
) )
report = discord.ui.TextInput( report = discord.ui.TextInput(
label="A detailed report of the bug", label="A detailed report of the bug",
@ -27,8 +29,8 @@ class BugReport(discord.ui.Modal, title="Report a bug"):
async def on_submit(self, interaction: discord.Interaction): async def on_submit(self, interaction: discord.Interaction):
await interaction.response.send_message( await interaction.response.send_message(
f"Thanks for your bug report. We will get back to you as soon as" f"Thanks for your bug report. We will work on resolving the"
f" possible", f" issue as soon as possible.",
ephemeral=True, ephemeral=True,
) )
channel = self.bot.get_channel(BUG_CHANNEL_ID) channel = self.bot.get_channel(BUG_CHANNEL_ID)

View File

@ -4,7 +4,7 @@ from discord import app_commands
from discord.ext import commands from discord.ext import commands
from cogs.music import Music from cogs.music import Music
from utils.config import BOT_COLOR from utils.config import create_embed
class Clear(commands.Cog): class Clear(commands.Cog):
@ -18,19 +18,13 @@ class Clear(commands.Cog):
player = self.bot.lavalink.player_manager.get(interaction.guild.id) player = self.bot.lavalink.player_manager.get(interaction.guild.id)
player.queue.clear() player.queue.clear()
embed = discord.Embed(
embed = create_embed(
title="Queue Cleared", title="Queue Cleared",
description=( description=(
"The queue has been cleared of all songs!\n\nIssued by:" "The queue has been cleared of all songs!\n\nIssued by:"
f" {interaction.user.mention}" f" {interaction.user.mention}"
), ),
color=BOT_COLOR,
)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
) )
await interaction.response.send_message(embed=embed) await interaction.response.send_message(embed=embed)

View File

@ -169,18 +169,18 @@ class Help(commands.Cog):
embed = discord.Embed( embed = discord.Embed(
title=f":musical_note: Help :musical_note:", title=f":musical_note: Help :musical_note:",
description=( description=(
"**Check out recent news and updates about the bot with" "**Check out recent updates with the"
" the </news:1260842465666007040> command!\n\u200b**" " </news:1260842465666007040> command!\n\u200b**"
), ),
color=BOT_COLOR, color=BOT_COLOR,
) )
embed.add_field( embed.add_field(
name="**Use Me**", name="**Get Started**",
value=( value=(
"> To get started, use the </play:1224840890368000172>" "> Start playing music with the"
" command and enter the name or link to the song of your" " </play:1224840890368000172> command. Enter the name or"
" choice." " link of the song you want to play."
), ),
inline=False, inline=False,
) )
@ -195,9 +195,9 @@ class Help(commands.Cog):
embed.add_field( embed.add_field(
name="**Help for Specific Commands**", name="**Help for Specific Commands**",
value=( value=(
"> If you want more information on how to use a specific" "> To get information on a specific command, use"
" command, use the </help:1224854217597124610> command and" " </help:1224854217597124610> and include the command"
" include the specific command." " name."
), ),
inline=False, inline=False,
) )

View File

@ -4,7 +4,7 @@ from discord import app_commands
from discord.ext import commands from discord.ext import commands
from cogs.music import Music from cogs.music import Music
from utils.config import BOT_COLOR from utils.config import create_embed
class Lyrics(commands.Cog): class Lyrics(commands.Cog):
@ -19,19 +19,12 @@ class Lyrics(commands.Cog):
# If the Genius API client is not setup, send an error message # If the Genius API client is not setup, send an error message
if not self.bot.genius: if not self.bot.genius:
embed = discord.Embed( embed = create_embed(
title="Lyrics Feature Error", title="Lyrics Feature Error",
description=( description=(
"The lyrics feature is currently disabled due to errors" "The lyrics feature is currently disabled due to errors"
" with the Genius API." " with the Genius API."
), ),
color=BOT_COLOR,
)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
) )
return await interaction.response.send_message( return await interaction.response.send_message(
embed=embed, ephemeral=True embed=embed, ephemeral=True
@ -48,20 +41,13 @@ class Lyrics(commands.Cog):
# If no lyrics are found, send an error message # If no lyrics are found, send an error message
if song is None: if song is None:
embed = discord.Embed( embed = create_embed(
title="Lyrics Not Found", title="Lyrics Not Found",
description=( description=(
"Unfortunately, I wasn't able to find any lyrics for the" "Unfortunately, I wasn't able to find any lyrics for the"
" song that is currently playing." " song that is currently playing."
), ),
color=BOT_COLOR, thumbnail=player.current.artwork_url,
)
embed.set_thumbnail(url=player.current.artwork_url)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
) )
return await interaction.edit_original_response(embed=embed) return await interaction.edit_original_response(embed=embed)
@ -73,40 +59,27 @@ class Lyrics(commands.Cog):
# If the lyrics are too long, send just a link to the lyrics # If the lyrics are too long, send just a link to the lyrics
if len(lyrics) > 2048: if len(lyrics) > 2048:
embed = discord.Embed( embed = create_embed(
title=( title=(
f"Lyrics for {player.current.title} by" f"Lyrics for {player.current.title} by"
f" {player.current.author}" f" {player.current.author}"
), ),
description=( description=(
"Song lyrics are too long to display on Discord. [Click" "The lyrics for this song are too long to display on"
f" here to view the lyrics on Genius]({song.url})." " Discord. [Click here to view the lyrics on"
f" Genius]({song.url})."
), ),
color=BOT_COLOR, thumbnail=player.current.artwork_url,
)
embed.set_thumbnail(url=player.current.artwork_url)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
) )
return await interaction.edit_original_response(embed=embed) return await interaction.edit_original_response(embed=embed)
# If everything is successful, send the lyrics # If everything is successful, send the lyrics
embed = discord.Embed( embed = create_embed(
title=( title=(
f"Lyrics for {player.current.title} by {player.current.author}" f"Lyrics for {player.current.title} by {player.current.author}"
), ),
description=f"Provided from [Genius]({song.url})\n\n" + lyrics, description=f"Provided from [Genius]({song.url})\n\n" + lyrics,
color=BOT_COLOR, thumbnail=player.current.artwork_url,
)
embed.set_thumbnail(url=player.current.artwork_url)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
) )
await interaction.edit_original_response(embed=embed) await interaction.edit_original_response(embed=embed)

View File

@ -2,13 +2,14 @@ import discord
from discord.ext import commands from discord.ext import commands
import lavalink import lavalink
from lavalink import errors from lavalink import errors
from discord.ext import tasks import os
from utils.config import ( from utils.config import (
LAVALINK_HOST, LAVALINK_HOST,
LAVALINK_PASSWORD, LAVALINK_PASSWORD,
LAVALINK_PORT, LAVALINK_PORT,
LOG, LOG,
LOG_SONGS,
) )
from utils.command_tree import CheckPlayerError from utils.command_tree import CheckPlayerError
from utils.ai_recommendations import add_song_recommendations from utils.ai_recommendations import add_song_recommendations
@ -95,6 +96,7 @@ class LavalinkVoiceClient(discord.VoiceProtocol):
class Music(commands.Cog): class Music(commands.Cog):
def __init__(self, bot): def __init__(self, bot):
self.bot = bot self.bot = bot
self.log_file = "track_events.log"
async def cog_load(self): async def cog_load(self):
if not hasattr( if not hasattr(
@ -119,11 +121,16 @@ class Music(commands.Cog):
return return
else: else:
await node.connect() await node.connect()
LOG.info(f"Connected to lavalink node {node.name}")
self.lavalink: lavalink.Client = self.bot.lavalink self.lavalink: lavalink.Client = self.bot.lavalink
self.lavalink.add_event_hooks(self) self.lavalink.add_event_hooks(self)
if os.path.exists("/.dockerenv"):
self.log_file = "/config/track_events.log"
if LOG_SONGS:
LOG.info(f"Logging track events to {self.log_file}")
def cog_unload(self): def cog_unload(self):
"""Cog unload handler. This removes any event hooks that were registered.""" """Cog unload handler. This removes any event hooks that were registered."""
self.lavalink._event_hooks.clear() self.lavalink._event_hooks.clear()
@ -241,6 +248,54 @@ class Music(commands.Cog):
self.bot.openai, self.bot.user, event.player, 5, inputs self.bot.openai, self.bot.user, event.player, 5, inputs
) )
@lavalink.listener(lavalink.events.NodeConnectedEvent)
async def node_connected(self, event: lavalink.events.NodeConnectedEvent):
LOG.info(f"Lavalink node {event.node.name} has connected")
@lavalink.listener(lavalink.events.NodeReadyEvent)
async def node_ready(self, event: lavalink.events.NodeReadyEvent):
LOG.info(f"Lavalink node {event.node.name} is ready")
@lavalink.listener(lavalink.events.NodeDisconnectedEvent)
async def node_disconnected(
self, event: lavalink.events.NodeDisconnectedEvent
):
LOG.error(f"Lavalink node {event.node.name} has disconnected")
# If we get a track load failed event (like LoadError, but for some reason that
# wasn't the eception raised), skip the track
@lavalink.listener(lavalink.events.TrackLoadFailedEvent)
async def track_load_failed(self, event: lavalink.events.TrackEndEvent):
await event.player.skip()
# Only log track events if enabled
if LOG_SONGS:
@lavalink.listener(lavalink.events.TrackStartEvent)
async def track_start(self, event: lavalink.events.TrackStartEvent):
with open(self.log_file, "a") as f:
f.write(
f"STARTED: {event.track.title} by {event.track.author}\n"
)
@lavalink.listener(lavalink.events.TrackStuckEvent)
async def track_stuck(self, event: lavalink.events.TrackStuckEvent):
with open(self.log_file, "a") as f:
f.write(
f"STUCK: {event.track.title} by {event.track.author} -"
f" {event.track.uri}\n"
)
@lavalink.listener(lavalink.events.TrackExceptionEvent)
async def track_exception(
self, event: lavalink.events.TrackExceptionEvent
):
with open(self.log_file, "a") as f:
f.write(
f"EXCEPTION{event.track.title} by {event.track.author} -"
f" {event.track.uri}\n"
)
async def setup(bot): async def setup(bot):
await bot.add_cog(Music(bot)) await bot.add_cog(Music(bot))

View File

@ -13,7 +13,7 @@ class News(commands.Cog):
async def news(self, interaction: discord.Interaction): async def news(self, interaction: discord.Interaction):
"Get recent news and updates about the bot" "Get recent news and updates about the bot"
embed = discord.Embed( embed = discord.Embed(
title="Recent News :newspaper2:", title="Recent News and Updates",
description=( description=(
"View recent code commits" "View recent code commits"
" [here](https://github.com/packetparker/guava/commits)\n\u200b" " [here](https://github.com/packetparker/guava/commits)\n\u200b"
@ -22,30 +22,22 @@ class News(commands.Cog):
) )
embed.add_field( embed.add_field(
name="**Lyrics!**", name="**Limited YouTube Support**",
value=( value=(
"> You can now get lyrics for the song that is currently" "Support for YouTube links and searches has been added. This"
" playing. Just use the `/lyrics` command! Some songs may not" " is currently in a testing phase and is not guaranteed to"
" have lyrics available, but the bot will do its best to find" " work. If you encounter any issues, please submit a but"
" them." " report."
), ),
inline=False,
) )
embed.add_field( embed.add_field(
name="**Apple Music Support!**", name="**General Improvements**",
value=( value=(
"> After some trial and error, you can now play music through" "Quality of life updates and general improvements have been"
" Apple Music links. Just paste the link and the bot will do" " made. Hopefully there are no new bugs, but please report any"
" the rest!" " with </bug:1224840889906499626>."
),
)
embed.add_field(
name="**Autoplay Update**",
value=(
"> Autoplay is now much more stable after a revamp of the"
" previous system. If you experienced short outages recently,"
" this was due to the update. Thank you for your patience!"
), ),
inline=False, inline=False,
) )

View File

@ -5,7 +5,7 @@ from discord.ext import commands
from cogs.music import Music from cogs.music import Music
import lavalink import lavalink
from utils.config import BOT_COLOR from utils.config import create_embed
class NowPlaying(commands.Cog): class NowPlaying(commands.Cog):
@ -15,7 +15,7 @@ class NowPlaying(commands.Cog):
@app_commands.command() @app_commands.command()
@app_commands.check(Music.create_player) @app_commands.check(Music.create_player)
async def np(self, interaction: discord.Interaction): async def np(self, interaction: discord.Interaction):
"Show what song is currently playing" "See what song is currently playing"
player = self.bot.lavalink.player_manager.get(interaction.guild.id) player = self.bot.lavalink.player_manager.get(interaction.guild.id)
time_in = str(datetime.timedelta(milliseconds=player.position))[:-7] time_in = str(datetime.timedelta(milliseconds=player.position))[:-7]
@ -25,21 +25,14 @@ class NowPlaying(commands.Cog):
time_in = time_in[2:] time_in = time_in[2:]
total_duration = total_duration[3:] total_duration = total_duration[3:]
embed = discord.Embed( embed = create_embed(
title="Now Playing 🎶", title="Now Playing 🎶",
description=( description=(
f"**[{player.current.title}]({player.current.uri})** by" f"**[{player.current.title}]({player.current.uri})** by"
f" {player.current.author}\n{f'` {time_in}/{total_duration} `'}\n\nQueued" f" {player.current.author}\n{f'` {time_in}/{total_duration} `'}\n\nQueued"
f" by: {player.current.requester.mention}" f" by: {player.current.requester.mention}"
), ),
color=BOT_COLOR, thumbnail=player.current.artwork_url,
)
embed.set_thumbnail(url=player.current.artwork_url)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
) )
await interaction.response.send_message(embed=embed) await interaction.response.send_message(embed=embed)

View File

@ -0,0 +1,46 @@
from discord.ext import commands
import requests
from utils.config import LAVALINK_HOST, LAVALINK_PORT, LAVALINK_PASSWORD, LOG
class POToken(commands.Cog):
def __init__(self, bot):
self.bot = bot
@commands.command()
@commands.dm_only()
@commands.is_owner()
async def potoken(self, ctx, token: str = None, visitor_data: str = None):
"""Update the poToken for lavalink youtube support."""
if not token or not visitor_data:
return await ctx.send(
"Missing token and/or visitor data. Format as"
f" `{self.bot.command_prefix}potoken <token> <visitor"
" data>`\n\nTo generate a poToken, see"
" [this](https://github.com/iv-org/youtube-trusted-session-generator)"
)
url = f"http://{LAVALINK_HOST}:{LAVALINK_PORT}/youtube"
request = requests.post(
url,
json={"poToken": token, "visitorData": visitor_data},
headers={"Authorization": LAVALINK_PASSWORD},
)
if request.status_code != 204:
LOG.error("Error updating poToken")
return await ctx.send(
"Error setting poToken, YouTube source plugin is likely not"
" enabled. Read the Guava docs and look"
" [here](https://github.com/lavalink-devs/youtube-source)."
)
LOG.info("poToken successfully updated")
await ctx.send(
"Successfully posted the token and visitor data to lavalink."
)
async def setup(bot):
await bot.add_cog(POToken(bot))

View File

@ -1,10 +1,11 @@
import discord import discord
import datetime import datetime
from typing import Literal
from discord import app_commands from discord import app_commands
from discord.ext import commands from discord.ext import commands
from cogs.music import Music from cogs.music import Music
from utils.config import BOT_COLOR from utils.config import create_embed
class Pause(commands.Cog): class Pause(commands.Cog):
@ -12,28 +13,39 @@ class Pause(commands.Cog):
self.bot = bot self.bot = bot
@app_commands.command() @app_commands.command()
@app_commands.describe(pause="TRUE to pause, FALSE to unpause")
@app_commands.check(Music.create_player) @app_commands.check(Music.create_player)
async def pause(self, interaction: discord.Interaction): async def pause(
"Pauses the song that is currently playing" self, interaction: discord.Interaction, pause: Literal["TRUE", "FALSE"]
):
"Pause or unpause the current song"
player = self.bot.lavalink.player_manager.get(interaction.guild.id) player = self.bot.lavalink.player_manager.get(interaction.guild.id)
await player.set_pause(pause=True) if pause:
embed = discord.Embed( await player.set_pause(pause=True)
title=f"Music Now Paused ⏸️", embed = create_embed(
description=( title=f"Music Paused ⏸️",
f"**[{player.current.title}]({player.current.uri})**\n\nQueued" description=(
f" by: {player.current.requester.mention}" f"**[{player.current.title}]({player.current.uri})** by"
), f" {player.current.author}\n\nQueued by:"
color=BOT_COLOR, f" {player.current.requester.mention}"
) ),
embed.set_thumbnail(url=player.current.artwork_url) thumbnail=player.current.artwork_url,
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
) )
+ " UTC" return await interaction.response.send_message(embed=embed)
)
await interaction.response.send_message(embed=embed) else:
await player.set_pause(pause=False)
embed = create_embed(
title=f"Music Unpaused ▶️",
description=(
f"**[{player.current.title}]({player.current.uri})** by"
f" {player.current.author}\n\nQueued by:"
f" {player.current.requester.mention}"
),
thumbnail=player.current.artwork_url,
)
return await interaction.response.send_message(embed=embed)
async def setup(bot): async def setup(bot):

View File

@ -4,11 +4,14 @@ from discord import app_commands
from discord.ext import commands from discord.ext import commands
from lavalink import LoadType from lavalink import LoadType
import re import re
import requests
from cogs.music import Music, LavalinkVoiceClient from cogs.music import Music, LavalinkVoiceClient
from utils.config import BOT_COLOR, YOUTUBE_SUPPORT from utils.config import YOUTUBE_SUPPORT, create_embed
from utils.custom_sources import SpotifySource, AppleSource from utils.custom_sources import (
LoadError,
CustomAudioTrack,
)
from utils.source_helpers.parse import parse_custom_source
url_rx = re.compile(r"https?://(?:www\.)?.+") url_rx = re.compile(r"https?://(?:www\.)?.+")
@ -28,7 +31,7 @@ class Play(commands.Cog):
# Notify users that YouTube links are not allowed if YouTube support is disabled # Notify users that YouTube links are not allowed if YouTube support is disabled
if "youtube.com" in query or "youtu.be" in query: if "youtube.com" in query or "youtu.be" in query:
if not YOUTUBE_SUPPORT: if not YOUTUBE_SUPPORT:
embed = discord.Embed( embed = create_embed(
title="YouTube Not Supported", title="YouTube Not Supported",
description=( description=(
"Unfortunately, YouTube does not allow bots to stream" "Unfortunately, YouTube does not allow bots to stream"
@ -37,372 +40,134 @@ class Play(commands.Cog):
" song and I will automatically find it on a supported" " song and I will automatically find it on a supported"
" platform." " platform."
), ),
color=BOT_COLOR,
) )
return await interaction.response.send_message( return await interaction.response.send_message(
embed=embed, ephemeral=True embed=embed, ephemeral=True
) )
### # Check for custom sources (Apple Music/Spotify)
### APPLE MUSIC links, perform API requests and load all tracks from the playlist/album/track
###
if "music.apple.com" in query: if "music.apple.com" in query:
if not self.bot.apple_headers: results, embed = await parse_custom_source(
embed = discord.Embed( self, "apple", query, interaction.user
title="Apple Music Error", )
description=(
"Apple Music support seems to be broken at the moment."
" Please try again and fill out a bug report with"
" </bug:1224840889906499626> if this continues to"
" happen."
),
color=BOT_COLOR,
)
return await interaction.response.send_message(
embed=embed, ephemeral=True
)
embed = discord.Embed(color=BOT_COLOR)
if "/playlist/" in query and "?i=" not in query:
playlist_id = query.split("/playlist/")[1].split("/")[1]
# Get all of the tracks in the playlist (limit at 250)
playlist_url = f"https://api.music.apple.com/v1/catalog/us/playlists/{playlist_id}/tracks?limit=100"
response = requests.get(
playlist_url, headers=self.bot.apple_headers
)
if response.status_code == 200:
playlist = response.json()
# Get the general playlist info (name, artwork)
playlist_info_url = f"https://api.music.apple.com/v1/catalog/us/playlists/{playlist_id}"
playlist_info = requests.get(
playlist_info_url, headers=self.bot.apple_headers
)
playlist_info = playlist_info.json()
try:
artwork_url = playlist_info["data"][0]["attributes"][
"artwork"
]["url"].replace("{w}x{h}", "300x300")
except KeyError:
artwork_url = None
embed.title = "Playlist Queued"
embed.description = (
f"**{playlist_info['data'][0]['attributes']['name']}**\n`"
f" {len(playlist['data'])} ` tracks\n\nQueued by:"
f" {interaction.user.mention}"
)
embed.set_thumbnail(url=artwork_url)
embed.set_footer(
text=datetime.datetime.now(
datetime.timezone.utc
).strftime("%Y-%m-%d %H:%M:%S")
+ " UTC"
)
# Add small alert if the playlist is the max size
if len(playlist["data"]) == 100:
embed.description += (
"\n\n*This playlist is longer than the 100 song"
" maximum. Only the first 100 songs will be"
" queued.*"
)
await interaction.response.send_message(embed=embed)
tracks = await AppleSource.load_playlist(
self, interaction.user, playlist
)
for track in tracks["tracks"]:
player.add(requester=interaction.user, track=track)
# If there is an album, not a specific song within the album
if "/album/" in query and "?i=" not in query:
album_id = query.split("/album/")[1].split("/")[1]
album_url = f"https://api.music.apple.com/v1/catalog/us/albums/{album_id}"
response = requests.get(
album_url, headers=self.bot.apple_headers
)
if response.status_code == 200:
album = response.json()
embed.title = "Album Queued"
embed.description = (
f"**{album['data'][0]['attributes']['name']}** by"
f" **{album['data'][0]['attributes']['artistName']}**\n`"
f" {len(album['data'][0]['relationships']['tracks']['data'])} `"
f" tracks\n\nQueued by: {interaction.user.mention}"
)
embed.set_thumbnail(
url=album["data"][0]["attributes"]["artwork"][
"url"
].replace("{w}x{h}", "300x300")
)
embed.set_footer(
text=datetime.datetime.now(
datetime.timezone.utc
).strftime("%Y-%m-%d %H:%M:%S")
+ " UTC"
)
await interaction.response.send_message(embed=embed)
tracks = await AppleSource.load_album(
self, interaction.user, album
)
for track in tracks["tracks"]:
player.add(requester=interaction.user, track=track)
# If there is a specific song
if "/album/" in query and "?i=" in query:
song_id = query.split("/album/")[1].split("?i=")[1]
song_url = f"https://api.music.apple.com/v1/catalog/us/songs/{song_id}"
response = requests.get(
song_url, headers=self.bot.apple_headers
)
if response.status_code == 200:
song = response.json()
embed.title = "Song Queued"
embed.description = (
f"**{song['data'][0]['attributes']['name']}** by"
f" **{song['data'][0]['attributes']['artistName']}**\n\nQueued"
f" by: {interaction.user.mention}"
)
embed.set_thumbnail(
url=song["data"][0]["attributes"]["artwork"][
"url"
].replace("{w}x{h}", "300x300")
)
embed.set_footer(
text=datetime.datetime.now(
datetime.timezone.utc
).strftime("%Y-%m-%d %H:%M:%S")
+ " UTC"
)
await interaction.response.send_message(embed=embed)
results = await AppleSource.load_item(
self, interaction.user, song
)
player.add(
requester=interaction.user, track=results.tracks[0]
)
###
### SPOTIFY links, perform API requests and load all tracks from the playlist/album/track
###
elif "open.spotify.com" in query: elif "open.spotify.com" in query:
if not self.bot.spotify_headers: results, embed = await parse_custom_source(
embed = discord.Embed( self, "spotify", query, interaction.user
title="Spotify Error", )
description=(
"Spotify support seems to be broken at the moment."
" Please try again and fill out a bug report with"
" </bug:1224840889906499626> if this continues to"
" happen."
),
color=BOT_COLOR,
)
return await interaction.response.send_message(
embed=embed, ephemeral=True
)
embed = discord.Embed(color=BOT_COLOR)
if "open.spotify.com/playlist" in query:
playlist_id = query.split("playlist/")[1].split("?si=")[0]
playlist_url = (
f"https://api.spotify.com/v1/playlists/{playlist_id}"
)
response = requests.get(
playlist_url, headers=self.bot.spotify_headers
)
if response.status_code == 200:
playlist = response.json()
embed.title = "Playlist Queued"
embed.description = (
f"**{playlist['name']}** from"
f" **{playlist['owner']['display_name']}**\n`"
f" {len(playlist['tracks']['items'])} `"
f" tracks\n\nQueued by: {interaction.user.mention}"
)
embed.set_thumbnail(url=playlist["images"][0]["url"])
embed.set_footer(
text=datetime.datetime.now(
datetime.timezone.utc
).strftime("%Y-%m-%d %H:%M:%S")
+ " UTC"
)
await interaction.response.send_message(embed=embed)
tracks = await SpotifySource.load_playlist(
self, interaction.user, playlist
)
for track in tracks["tracks"]:
player.add(requester=interaction.user, track=track)
if "open.spotify.com/album" in query:
album_id = query.split("album/")[1]
album_url = f"https://api.spotify.com/v1/albums/{album_id}"
response = requests.get(
album_url, headers=self.bot.spotify_headers
)
if response.status_code == 200:
album = response.json()
embed.title = "Album Queued"
embed.description = (
f"**{album['name']}** by"
f" **{album['artists'][0]['name']}**\n`"
f" {len(album['tracks']['items'])} ` tracks\n\nQueued"
f" by: {interaction.user.mention}"
)
embed.set_thumbnail(url=album["images"][0]["url"])
embed.set_footer(
text=datetime.datetime.now(
datetime.timezone.utc
).strftime("%Y-%m-%d %H:%M:%S")
+ " UTC"
)
await interaction.response.send_message(embed=embed)
tracks = await SpotifySource.load_album(
self, interaction.user, album
)
for track in tracks["tracks"]:
player.add(requester=interaction.user, track=track)
if "open.spotify.com/track" in query:
track_id = query.split("track/")[1]
track_url = f"https://api.spotify.com/v1/tracks/{track_id}"
response = requests.get(
track_url, headers=self.bot.spotify_headers
)
if response.status_code == 200:
track = response.json()
embed.title = "Track Queued"
embed.description = (
f"**{track['name']}** by"
f" **{track['artists'][0]['name']}**\n\nQueued by:"
f" {interaction.user.mention}"
)
embed.set_thumbnail(url=track["album"]["images"][0]["url"])
embed.set_footer(
text=datetime.datetime.now(
datetime.timezone.utc
).strftime("%Y-%m-%d %H:%M:%S")
+ " UTC"
)
await interaction.response.send_message(embed=embed)
results = await SpotifySource.load_item(
self, interaction.user, track
)
player.add(
requester=interaction.user, track=results.tracks[0]
)
if "open.spotify.com/artists" in query:
embed.title = "Artists Cannot Be Played"
embed.description = (
"I cannot play just artists, you must provide a"
" song/album/playlist. Please try again."
)
return await interaction.response.send_message(
embed=embed, ephemeral=True
)
###
### For anything else, use default Lavalink providers to search the query
###
# For anything else, use default Lavalink providers to search the query
else: else:
# If the query is not a URL, begin searching
if not url_rx.match(query): if not url_rx.match(query):
dzsearch = f"dzsearch:{query}" dzsearch = f"dzsearch:{query}"
results = await player.node.get_tracks(dzsearch) results = await player.node.get_tracks(dzsearch)
# If Deezer returned nothing
if not results.tracks or results.load_type in ( if not results.tracks or results.load_type in (
LoadType.EMPTY, LoadType.EMPTY,
LoadType.ERROR, LoadType.ERROR,
): ):
ytmsearch = f"ytmsearch:{query}" if YOUTUBE_SUPPORT:
results = await player.node.get_tracks(ytmsearch) ytmsearch = f"ytmsearch:{query}"
results = await player.node.get_tracks(ytmsearch)
if not results.tracks or results.load_type in ( # If YouTube Music returned nothing
LoadType.EMPTY, if not results.tracks or results.load_type in (
LoadType.ERROR, LoadType.EMPTY,
): LoadType.ERROR,
ytsearch = f"ytsearch:{query}" ):
results = await player.node.get_tracks(ytsearch) # Final search attempt with YouTube
ytsearch = f"ytsearch:{query} audio"
results = await player.node.get_tracks(ytsearch)
else: else:
results = await player.node.get_tracks(query) results = await player.node.get_tracks(query)
embed = discord.Embed(color=BOT_COLOR) # If there are no results found, set results/embed to None, handled further down
if not results.tracks or results.load_type in ( if not results.tracks or results.load_type in (
LoadType.EMPTY, LoadType.EMPTY,
LoadType.ERROR, LoadType.ERROR,
): ):
embed.title = "Nothing Found" results, embed = None, None
embed.description = (
"Nothing for that query could be found. If this continues"
" happening for other songs, please run"
" </bug:1224840889906499626> to let the developer know."
)
return await interaction.response.send_message(
embed=embed, ephemeral=True
)
elif results.load_type == LoadType.PLAYLIST: # Create the embed if the results are a playlist
tracks = results.tracks if results.load_type == LoadType.PLAYLIST:
embed = create_embed(
for track in tracks: title="Songs Queued",
player.add(requester=interaction.user, track=track) description=(
f"**{results.playlist_info.name}**\n"
embed.title = "Songs Queued!" f"` {len(results.tracks)} ` tracks\n\n"
embed.description = ( f"Queued by: {interaction.user.mention}"
f"**{results.playlist_info.name}**\n` {len(tracks)} `" ),
f" tracks\n\nQueued by: {interaction.user.mention}"
) )
embed.set_footer( # Otherwise, the result is just a single track, create that embed
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
)
await interaction.response.send_message(embed=embed)
else: else:
track = results.tracks[0] # Remove all but first track (most relevant result)
results.tracks = results.tracks[:1]
embed = create_embed(
title="Song Queued",
description=(
f"**{results.tracks[0].title}** by"
f" **{results.tracks[0].author}**\n\nQueued by:"
f" {interaction.user.mention}"
),
thumbnail=results.tracks[0].artwork_url,
)
# If there are no results, and no embed
if not results and not embed:
embed = create_embed(
title="Nothing Found",
description=(
"No songs were found for that query. Please try again and"
" fill out a bug report with </bug:1224840889906499626> if"
" this continues to happen."
),
)
return await interaction.response.send_message(
embed=embed, ephemeral=True
)
# If there are no results, but there is an embed (error msg)
elif embed and not results:
return await interaction.response.send_message(
embed=embed, ephemeral=True
)
# If there are results, add them to the player
else:
for track in results.tracks:
player.add(requester=interaction.user, track=track) player.add(requester=interaction.user, track=track)
embed.title = "Track Queued" # If the track is CustomAudioTrack (Apple Music/Spotify)
embed.description = ( if type(results.tracks[0]) == CustomAudioTrack:
f"**{track.title}** by **{track.author}**\n\nQueued by:" # Attempt to load an actual track from a provider
f" {interaction.user.mention}" try:
) await results.tracks[0].load(player.node)
embed.set_thumbnail(url=track.artwork_url) # If it fails, remove it from the queue and alert the user
embed.set_footer( except LoadError as e:
text=datetime.datetime.now(datetime.timezone.utc).strftime( player.queue.remove(results.tracks[0])
"%Y-%m-%d %H:%M:%S" embed = create_embed(
title="Load Error",
description=(
"Apple Music and Spotify do not allow direct"
" playing from their websites, and I was unable to"
" load a track on a supported platform. Please try"
" again."
),
) )
+ " UTC" return await interaction.response.send_message(
embed=embed, ephemeral=True
)
# Join the voice channel if not already connected
if not interaction.guild.voice_client:
await interaction.user.voice.channel.connect(
cls=LavalinkVoiceClient, self_deaf=True
) )
await interaction.response.send_message(embed=embed)
# Only join the voice channel now, so that the bot doesn't join if nothing is found # Only call player.play if it is not already playing, otherwise it will
if not interaction.guild.voice_client: # effectively skip the current track
await interaction.user.voice.channel.connect( if not player.is_playing:
cls=LavalinkVoiceClient, self_deaf=True await player.play()
)
# We don't want to call .play() if the player is playing as that will await interaction.response.send_message(embed=embed)
# effectively skip the current track
if not player.is_playing:
await player.play()
async def setup(bot): async def setup(bot):

View File

@ -6,7 +6,37 @@ from cogs.music import Music
import math import math
import lavalink import lavalink
from utils.config import BOT_COLOR from utils.config import create_embed
"""
Create an embed for the queue given the current queue and desired page number/pages
"""
def create_queue_embed(queue: list, page: int, pages: int):
items_per_page = 10
start = (page - 1) * items_per_page
end = start + items_per_page
queue_list = ""
for index, track in enumerate(queue[start:end], start=start):
# Change ms duration to hour, min, sec in the format of 00:00:00
track_duration = lavalink.utils.format_time(track.duration)
# If the track is less than an hour, remove the hour from the duration
if track_duration.split(":")[0] == "00":
track_duration = track_duration[3:]
queue_list += (
f"`{index+1}. ` [{track.title}]({track.uri}) -"
f" {track.author} `({track_duration})`\n"
)
embed = create_embed(
title=f"Current Song Queue",
description=f"**{len(queue)} total tracks**\n\n{queue_list}",
footer=f"Page {page}/{pages}",
)
return embed
class Queue(commands.Cog): class Queue(commands.Cog):
@ -22,54 +52,68 @@ class Queue(commands.Cog):
"See the current queue of songs" "See the current queue of songs"
player = self.bot.lavalink.player_manager.get(interaction.guild.id) player = self.bot.lavalink.player_manager.get(interaction.guild.id)
pages = math.ceil(len(player.queue) / 10)
# Force page to 1 if an invalid page is provided
if page < 1 or page > pages:
page = 1
if not player.queue: if not player.queue:
embed = discord.Embed( embed = create_embed(
title="Nothing Queued", title="Nothing Queued",
description=( description=(
"Nothing is currently in the queue, add a song with the" "Nothing is currently in the queue, add a song with the"
" </play:1224840890368000172> command." " </play:1224840890368000172> command."
), ),
color=BOT_COLOR,
)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
) )
return await interaction.response.send_message( return await interaction.response.send_message(
embed=embed, ephemeral=True embed=embed, ephemeral=True
) )
items_per_page = 10 embed = create_queue_embed(player.queue, page, pages)
pages = math.ceil(len(player.queue) / items_per_page) view = QueueView(page, pages, player.queue)
await interaction.response.send_message(embed=embed, view=view)
start = (page - 1) * items_per_page
end = start + items_per_page
queue_list = ""
for index, track in enumerate(player.queue[start:end], start=start):
# Change ms duration to hour, min, sec in the format of 00:00:00
track_duration = lavalink.utils.format_time(track.duration)
# If the track is less than an hour, remove the hour from the duration
if track_duration.split(":")[0] == "00":
track_duration = track_duration[3:]
queue_list += (
f"`{index+1}. ` [{track.title}]({track.uri}) -"
f" {track.author} `({track_duration})`\n"
)
embed = discord.Embed(
title=f"Queue for {interaction.guild.name}",
description=(
f"**{len(player.queue)} tracks total**\n\n{queue_list}"
),
color=BOT_COLOR,
)
embed.set_footer(text=f"Viewing page {page}/{pages}")
await interaction.response.send_message(embed=embed)
async def setup(bot): async def setup(bot):
await bot.add_cog(Queue(bot)) await bot.add_cog(Queue(bot))
class QueueView(discord.ui.View):
def __init__(self, page: int, pages: int, queue: list):
super().__init__()
self.page = page
self.pages = pages
self.queue = queue
# Create the previous and next buttons
self.previous_button = discord.ui.Button(
label="Previous", style=discord.ButtonStyle.gray
)
# Determine if the button should be disabled, add callback, add to view
self.previous_button.disabled = self.page <= 1
self.previous_button.callback = self.previous_page
self.add_item(self.previous_button)
self.next_button = discord.ui.Button(
label="Next", style=discord.ButtonStyle.gray
)
self.next_button.disabled = self.page >= self.pages
self.next_button.callback = self.next_page
self.add_item(self.next_button)
async def previous_page(self, interaction: discord.Interaction):
# Decrement the page number, recreate the embed, determine if the
# button should be disabled, and update the message
self.page -= 1
embed = create_queue_embed(self.queue, self.page, self.pages)
self.previous_button.disabled = self.page <= 1
self.next_button.disabled = self.page >= self.pages
await interaction.response.edit_message(embed=embed, view=self)
async def next_page(self, interaction: discord.Interaction):
# Increment the page number, recreate the embed, determine if the
# button should be disabled, and update the message
self.page += 1
embed = create_queue_embed(self.queue, self.page, self.pages)
self.previous_button.disabled = self.page <= 1
self.next_button.disabled = self.page >= self.pages
await interaction.response.edit_message(embed=embed, view=self)

View File

@ -4,7 +4,7 @@ from discord import app_commands
from discord.ext import commands from discord.ext import commands
from cogs.music import Music from cogs.music import Music
from utils.config import BOT_COLOR from utils.config import create_embed
class Remove(commands.Cog): class Remove(commands.Cog):
@ -19,27 +19,24 @@ class Remove(commands.Cog):
player = self.bot.lavalink.player_manager.get(interaction.guild.id) player = self.bot.lavalink.player_manager.get(interaction.guild.id)
if not player.queue: if not player.queue:
embed = discord.Embed( embed = create_embed(
title="Nothing Queued", title="Nothing Queued",
description=( description="There are no songs in the queue to remove.",
"Nothing is currently in the queue, so there is nothing"
" for me to remove."
),
color=BOT_COLOR,
) )
embed.set_footer( return await interaction.response.send_message(
text=datetime.datetime.now(datetime.timezone.utc).strftime( embed=embed, ephemeral=True
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
) )
return await interaction.response.send_message(embed=embed)
if number > len(player.queue) or number < 1: if number > len(player.queue) or number < 1:
embed = create_embed(
title="Number Out of Range",
description=(
"The number you entered is outside of the allowed range."
" Please try again with a valid song number."
),
)
return await interaction.response.send_message( return await interaction.response.send_message(
"The number entered is not a number within the queue - please" embed=embed, ephemeral=True
" try again!",
ephemeral=True,
) )
index = number - 1 index = number - 1
@ -48,21 +45,13 @@ class Remove(commands.Cog):
removed_artwork = player.queue[index].artwork_url removed_artwork = player.queue[index].artwork_url
player.queue.pop(index) player.queue.pop(index)
embed = discord.Embed( embed = create_embed(
title="Song Removed from Queue", title="Song Removed from Queue",
description=( description=(
"**Song Removed -" f"**[{removed_title}]({removed_url})** has been unqueued.\n\n"
f" [{removed_title}]({removed_url})**\n\nIssued by:" f"Issued by: {interaction.user.mention}"
f" {interaction.user.mention}"
), ),
color=BOT_COLOR, thumbnail=removed_artwork,
)
embed.set_thumbnail(url=removed_artwork)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
) )
await interaction.response.send_message(embed=embed) await interaction.response.send_message(embed=embed)

View File

@ -4,7 +4,7 @@ from discord import app_commands
from discord.ext import commands from discord.ext import commands
from cogs.music import Music from cogs.music import Music
from utils.config import BOT_COLOR from utils.config import create_embed
class Repeat(commands.GroupCog, name="repeat"): class Repeat(commands.GroupCog, name="repeat"):
@ -17,34 +17,11 @@ class Repeat(commands.GroupCog, name="repeat"):
"Turn song/queue repetition off" "Turn song/queue repetition off"
player = self.bot.lavalink.player_manager.get(interaction.guild.id) player = self.bot.lavalink.player_manager.get(interaction.guild.id)
if player.loop == 0:
embed = discord.Embed(
title=f"Repeating Already Off",
description=f"Music repetition is already turned off.",
color=BOT_COLOR,
)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
)
return await interaction.response.send_message(
embed=embed, ephemeral=True
)
player.loop = 0 player.loop = 0
embed = discord.Embed( embed = create_embed(
title=f"Repeating Off", title="Repeating Off",
description=f"Music will no longer be repeated.", description="Music will not be repeated.",
color=BOT_COLOR,
)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
) )
await interaction.response.send_message(embed=embed) await interaction.response.send_message(embed=embed)
@ -54,37 +31,14 @@ class Repeat(commands.GroupCog, name="repeat"):
"Forever repeat that song that is currently playing" "Forever repeat that song that is currently playing"
player = self.bot.lavalink.player_manager.get(interaction.guild.id) player = self.bot.lavalink.player_manager.get(interaction.guild.id)
if player.loop == 1:
embed = discord.Embed(
title=f"Repeating Already On",
description=f"The current song is already being repeated.",
color=BOT_COLOR,
)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
)
return await interaction.response.send_message(
embed=embed, ephemeral=True
)
player.loop = 1 player.loop = 1
embed = discord.Embed( embed = create_embed(
title=f"Repeating Current Song 🔁", title="Repeating Current Song 🔁",
description=( description=(
f"The song that is currently playing will be repeated until" "The song that is currently playing will be repeated until"
f" the </repeat off:1224840891395608737> command is run" " the </repeat off:1224840891395608737> command is run"
), ),
color=BOT_COLOR,
)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
) )
await interaction.response.send_message(embed=embed) await interaction.response.send_message(embed=embed)
@ -94,37 +48,14 @@ class Repeat(commands.GroupCog, name="repeat"):
"Continuously repeat the queue once it reaches the end" "Continuously repeat the queue once it reaches the end"
player = self.bot.lavalink.player_manager.get(interaction.guild.id) player = self.bot.lavalink.player_manager.get(interaction.guild.id)
if player.loop == 2:
embed = discord.Embed(
title=f"Repeating Already On",
description=f"The queue is already being repeated.",
color=BOT_COLOR,
)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
)
return await interaction.response.send_message(
embed=embed, ephemeral=True
)
player.loop = 2 player.loop = 2
embed = discord.Embed( embed = create_embed(
title=f"Repeating Current Song 🔂", title="Repeating Queue 🔂",
description=( description=(
f"All songs in the queue will continue to repeat until the" "The queue will continuously repeat until the"
f" </repeat off:1224840891395608737> command is run." " </repeat off:1224840891395608737> command is run."
), ),
color=BOT_COLOR,
)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
) )
await interaction.response.send_message(embed=embed) await interaction.response.send_message(embed=embed)

View File

@ -1,40 +0,0 @@
import discord
import datetime
from discord import app_commands
from discord.ext import commands
from cogs.music import Music
from utils.config import BOT_COLOR
class Resume(commands.Cog):
def __init__(self, bot):
self.bot = bot
@app_commands.command()
@app_commands.check(Music.create_player)
async def resume(self, interaction: discord.Interaction):
"Resumes the paused song"
player = self.bot.lavalink.player_manager.get(interaction.guild.id)
await player.set_pause(pause=False)
embed = discord.Embed(
title=f"Music Now Resumed ⏯️",
description=(
f"**[{player.current.title}]({player.current.uri})**\n\nQueued"
f" by: {player.current.requester.mention}"
),
color=BOT_COLOR,
)
embed.set_thumbnail(url=player.current.artwork_url)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
)
await interaction.response.send_message(embed=embed)
async def setup(bot):
await bot.add_cog(Resume(bot))

View File

@ -4,7 +4,7 @@ from discord import app_commands
from discord.ext import commands from discord.ext import commands
from cogs.music import Music from cogs.music import Music
from utils.config import BOT_COLOR from utils.config import create_embed
class Shuffle(commands.GroupCog, name="shuffle"): class Shuffle(commands.GroupCog, name="shuffle"):
@ -19,16 +19,9 @@ class Shuffle(commands.GroupCog, name="shuffle"):
player.shuffle = True player.shuffle = True
embed = discord.Embed( embed = create_embed(
title=f"Shuffle Enabled 🔀", title="Shuffle Enabled 🔀",
description=f"All music will now be shuffled.", description="All music will now be shuffled.",
color=BOT_COLOR,
)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
) )
await interaction.response.send_message(embed=embed) await interaction.response.send_message(embed=embed)
@ -40,16 +33,9 @@ class Shuffle(commands.GroupCog, name="shuffle"):
player.shuffle = False player.shuffle = False
embed = discord.Embed( embed = create_embed(
title=f"Disabled 🔀", title="Shuffle Disabled 🔀",
description=f"Music will no longer be shuffled.", description="Music will no longer be shuffled.",
color=BOT_COLOR,
)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
) )
await interaction.response.send_message(embed=embed) await interaction.response.send_message(embed=embed)

View File

@ -5,7 +5,7 @@ from discord.ext import commands
from cogs.music import Music from cogs.music import Music
import asyncio import asyncio
from utils.config import BOT_COLOR from utils.config import create_embed
from utils.custom_sources import LoadError from utils.custom_sources import LoadError
@ -22,22 +22,25 @@ class Skip(commands.Cog):
"Skips the song that is currently playing" "Skips the song that is currently playing"
player = self.bot.lavalink.player_manager.get(interaction.guild.id) player = self.bot.lavalink.player_manager.get(interaction.guild.id)
embed = discord.Embed(color=BOT_COLOR)
if number != 1: if number != 1:
if number < 1: if number < 1:
embed.title = "Invalid Number" embed = create_embed(
embed.description = "The number option cannot be less than 1" title="Invalid Number",
description="The number option cannot be less than 1",
)
return await interaction.response.send_message( return await interaction.response.send_message(
embed=embed, ephemeral=True embed=embed, ephemeral=True
) )
elif number > len(player.queue): elif number > len(player.queue):
embed.title = "Number too Large" embed = create_embed(
embed.description = ( title="Number too Large",
"The number you entered is larger than the number of songs" description=(
" in queue. If you want to stop playing music entirely," "The number you entered is larger than the number of"
" try the </stop:1224840890866991305> command." " songs in queue. If you want to stop playing music"
" entirely, try the </stop:1224840890866991305>"
" command."
),
) )
return await interaction.response.send_message( return await interaction.response.send_message(
embed=embed, ephemeral=True embed=embed, ephemeral=True
@ -46,65 +49,54 @@ class Skip(commands.Cog):
for i in range(number - 2, -1, -1): for i in range(number - 2, -1, -1):
player.queue.pop(i) player.queue.pop(i)
# If there is a next song, get it # If the queue is empty, but the current song is on repeat
try: if player.loop == 1 and not player.queue:
next_song = player.queue[0] embed = create_embed(
except IndexError: title="Song on Repeat",
# If the song is on repeat, catch the IndexError and get the current song description=(
# Otherwise, pass "There is nothing in queue, but the current song is on"
if player.loop == 1: " repeat. Use </stop:1224840890866991305> to stop"
embed = discord.Embed( " playing music."
title="Song on Repeat", ),
description=( )
"There is nothing in queue, but the current song is on" return await interaction.response.send_message(
" repeat. Use </stop:1224840890866991305> to stop" embed=embed, ephemeral=True
" playing music." )
), else:
color=BOT_COLOR,
)
return await interaction.response.send_message(
embed=embed, ephemeral=True
)
else:
pass
# Sometimes when a playlist/album of custom source tracks are loaded, one is not able to be found
# so, when a user attempts to skip to that track, we get a LoadError. In this case, just pass it.
try:
await player.skip()
except LoadError:
pass pass
await player.skip()
# Skip current track, continue skipping on LoadError
while True:
try:
await player.skip()
break
except LoadError as e:
continue
if not player.current: if not player.current:
embed = discord.Embed( embed = create_embed(
title="End of Queue", title="End of Queue",
description=( description=(
"All songs in queue have been played. Thank you for using" "I have left the voice channel as all songs in the queue"
f" me :wave:\n\nIssued by: {interaction.user.mention}" " have been played.\n\n"
f"Issued by: {interaction.user.mention}"
), ),
color=BOT_COLOR,
) )
return await interaction.response.send_message(embed=embed) return await interaction.response.send_message(embed=embed)
# It takes a sec for the new track to be grabbed and played # It takes a sec for the new track to be grabbed and played
# So just wait a sec before sending the message # So just wait a sec before sending the message
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
embed = discord.Embed( embed = create_embed(
title="Track Skipped", title=(
description=( f"{'Track Skipped' if number == 1 else f'{number} Tracks Skipped'}"
f"**Now Playing: [{next_song.title}]({next_song.uri})** by"
f" {next_song.author}\n\nQueued by:"
f" {next_song.requester.mention}"
), ),
color=BOT_COLOR, description=(
) f"**[{player.current.title}]({player.current.uri})**"
embed.set_thumbnail(url=next_song.artwork_url) f" by **{player.current.author}** is now playing\n\n"
embed.set_footer( f"Issued by: {interaction.user.mention}"
text=datetime.datetime.now(datetime.timezone.utc).strftime( ),
"%Y-%m-%d %H:%M:%S" thumbnail=player.current.artwork_url,
)
+ " UTC"
) )
await interaction.response.send_message(embed=embed) await interaction.response.send_message(embed=embed)

View File

@ -4,7 +4,7 @@ from discord import app_commands
from discord.ext import commands from discord.ext import commands
from cogs.music import Music from cogs.music import Music
from utils.config import BOT_COLOR from utils.config import create_embed
class Stop(commands.Cog): class Stop(commands.Cog):
@ -25,19 +25,12 @@ class Stop(commands.Cog):
await player.stop() await player.stop()
await interaction.guild.voice_client.disconnect(force=True) await interaction.guild.voice_client.disconnect(force=True)
embed = discord.Embed( embed = create_embed(
title="Queue Cleared and Music Stopped", title="Queue Cleared and Music Stopped",
description=( description=(
"Thank you for using me :wave:\n\nIssued by:" f"Thank you for using {self.bot.user.mention}\n\n"
f" {interaction.user.mention}" f"Issued by: {interaction.user.mention}"
), ),
color=BOT_COLOR,
)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
) )
await interaction.response.send_message(embed=embed) await interaction.response.send_message(embed=embed)

View File

@ -1,9 +1,8 @@
import discord import discord
from discord import app_commands from discord import app_commands
from discord.ext.commands.errors import * from discord.ext.commands.errors import *
import datetime
from utils.config import BOT_COLOR from utils.config import create_embed
from utils.custom_sources import LoadError from utils.custom_sources import LoadError
@ -38,16 +37,9 @@ class Tree(app_commands.CommandTree):
# Custom Error class for the `create_player` function # Custom Error class for the `create_player` function
# Issues that arise may be user not in vc, user not in correct vc, missing perms, etc. # Issues that arise may be user not in vc, user not in correct vc, missing perms, etc.
elif isinstance(error, CheckPlayerError): elif isinstance(error, CheckPlayerError):
embed = discord.Embed( embed = create_embed(
title=error.info["title"], title=error.info["title"],
description=error.info["description"], description=error.info["description"],
color=BOT_COLOR,
)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
) )
try: try:
await interaction.response.send_message( await interaction.response.send_message(
@ -62,46 +54,13 @@ class Tree(app_commands.CommandTree):
isinstance(error, app_commands.CheckFailure) isinstance(error, app_commands.CheckFailure)
and interaction.command.name in music_commands and interaction.command.name in music_commands
): ):
embed = discord.Embed( embed = create_embed(
title="Player Creation Error", title="Player Creation Error",
description=( description=(
"An error occured when trying to create a player. Please" "An error occured when trying to create a player. Please"
" submit a bug report with </bug:1224840889906499626> if" " submit a bug report with </bug:1224840889906499626> if"
" this issue persists." " this issue persists."
), ),
color=BOT_COLOR,
)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
)
try:
await interaction.response.send_message(
embed=embed, ephemeral=True
)
except discord.errors.InteractionResponded:
await interaction.followup.send(embed=embed, ephemeral=True)
# If a Spotify song is linked but cannot be found on a provider (e.g. YouTube)
elif isinstance(error, LoadError):
embed = discord.Embed(
title="Nothing Found",
description=(
"Spotify does not allow direct play, meaning songs have to"
" be found on a supported provider. In this case, the song"
" couldn't be found. Please try again with a different"
" song, or try searching for just the name and artist"
" manually rather than sending a link."
),
color=BOT_COLOR,
)
embed.set_footer(
text=datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ " UTC"
) )
try: try:
await interaction.response.send_message( await interaction.response.send_message(

View File

@ -8,6 +8,7 @@ import sys
import discord import discord
import logging import logging
import requests import requests
from datetime import datetime
from colorlog import ColoredFormatter from colorlog import ColoredFormatter
log_level = logging.DEBUG log_level = logging.DEBUG
@ -32,7 +33,8 @@ BOT_COLOR = None
BOT_INVITE_LINK = None BOT_INVITE_LINK = None
FEEDBACK_CHANNEL_ID = None FEEDBACK_CHANNEL_ID = None
BUG_CHANNEL_ID = None BUG_CHANNEL_ID = None
YOUTUBE_SUPPORT = None LOG_SONGS = False
YOUTUBE_SUPPORT = False
SPOTIFY_CLIENT_ID = None SPOTIFY_CLIENT_ID = None
SPOTIFY_CLIENT_SECRET = None SPOTIFY_CLIENT_SECRET = None
GENIUS_CLIENT_ID = None GENIUS_CLIENT_ID = None
@ -53,10 +55,17 @@ schema = {
"bot_invite_link": {"type": "string"}, "bot_invite_link": {"type": "string"},
"feedback_channel_id": {"type": "integer"}, "feedback_channel_id": {"type": "integer"},
"bug_channel_id": {"type": "integer"}, "bug_channel_id": {"type": "integer"},
"youtube_support": {"type": "boolean"}, "log_songs": {"type": "boolean"},
}, },
"required": ["token"], "required": ["token"],
}, },
"youtube": {
"type": "object",
"properties": {
"enabled": {"type": "boolean"},
},
"required": ["enabled"],
},
"spotify": { "spotify": {
"type": "object", "type": "object",
"properties": { "properties": {
@ -117,7 +126,10 @@ bot_info:
bot_invite_link: "" bot_invite_link: ""
feedback_channel_id: "" feedback_channel_id: ""
bug_channel_id: "" bug_channel_id: ""
youtube_support: false log_songs: true
youtube:
enabled: false
spotify: spotify:
spotify_client_id: "" spotify_client_id: ""
@ -148,7 +160,7 @@ lavalink:
# Thouroughly validate all of the options in the config.yaml file # Thouroughly validate all of the options in the config.yaml file
def validate_config(file_contents): def validate_config(file_contents):
global TOKEN, BOT_COLOR, BOT_INVITE_LINK, FEEDBACK_CHANNEL_ID, BUG_CHANNEL_ID, YOUTUBE_SUPPORT, SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET, GENIUS_CLIENT_ID, GENIUS_CLIENT_SECRET, OPENAI_API_KEY, LAVALINK_HOST, LAVALINK_PORT, LAVALINK_PASSWORD global TOKEN, BOT_COLOR, BOT_INVITE_LINK, FEEDBACK_CHANNEL_ID, BUG_CHANNEL_ID, LOG_SONGS, YOUTUBE_SUPPORT, SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET, GENIUS_CLIENT_ID, GENIUS_CLIENT_SECRET, OPENAI_API_KEY, LAVALINK_HOST, LAVALINK_PORT, LAVALINK_PASSWORD
config = yaml.safe_load(file_contents) config = yaml.safe_load(file_contents)
try: try:
@ -208,10 +220,12 @@ def validate_config(file_contents):
else: else:
BUG_CHANNEL_ID = config["bot_info"]["bug_channel_id"] BUG_CHANNEL_ID = config["bot_info"]["bug_channel_id"]
if "youtube_support" in config["bot_info"]: if "log_songs" in config["bot_info"]:
YOUTUBE_SUPPORT = bool(config["bot_info"]["youtube_support"]) LOG_SONGS = bool(config["bot_info"]["log_songs"])
else:
YOUTUBE_SUPPORT = False # Check for YouTube support
if "youtube" in config:
YOUTUBE_SUPPORT = bool(config["youtube"]["enabled"])
# #
# If the SPOTIFY section is present, make sure the client ID and secret are valid # If the SPOTIFY section is present, make sure the client ID and secret are valid
@ -274,3 +288,34 @@ def validate_config(file_contents):
LAVALINK_HOST = config["lavalink"]["host"] LAVALINK_HOST = config["lavalink"]["host"]
LAVALINK_PORT = config["lavalink"]["port"] LAVALINK_PORT = config["lavalink"]["port"]
LAVALINK_PASSWORD = config["lavalink"]["password"] LAVALINK_PASSWORD = config["lavalink"]["password"]
"""
Template for embeds
"""
def create_embed(
title: str = None,
description: str = None,
color=None,
footer=None,
thumbnail=None,
):
embed = discord.Embed(
title=title,
description=description,
color=color if color else BOT_COLOR,
)
if footer:
embed.set_footer(text=footer)
# else:
# embed.set_footer(
# text=datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + " UTC"
# )
if thumbnail:
embed.set_thumbnail(url=thumbnail)
return embed

View File

@ -6,6 +6,8 @@ from lavalink import (
PlaylistInfo, PlaylistInfo,
) )
from utils.config import YOUTUBE_SUPPORT
class LoadError( class LoadError(
Exception Exception
@ -32,21 +34,22 @@ class CustomAudioTrack(DeferredAudioTrack):
LoadType.EMPTY, LoadType.EMPTY,
LoadType.ERROR, LoadType.ERROR,
): ):
ytmsearch = f"ytmsearch:{self.title} {self.author}" if YOUTUBE_SUPPORT:
results = await client.get_tracks(ytmsearch) ytmsearch = f"ytmsearch:{self.title} {self.author}"
results = await client.get_tracks(ytmsearch)
if not results.tracks or results.load_type in (
LoadType.EMPTY,
LoadType.ERROR,
):
ytsearch = f"ytsearch:{self.title} {self.author} audio"
results = await client.get_tracks(ytsearch)
if not results.tracks or results.load_type in ( if not results.tracks or results.load_type in (
LoadType.EMPTY, LoadType.EMPTY,
LoadType.ERROR, LoadType.ERROR,
): ):
raise LoadError ytsearch = f"ytsearch:{self.title} {self.author} audio"
results = await client.get_tracks(ytsearch)
if not results.tracks or results.load_type in (
LoadType.EMPTY,
LoadType.ERROR,
):
raise LoadError
first_track = results.tracks[ first_track = results.tracks[
0 0
@ -70,6 +73,10 @@ class SpotifySource(Source):
) # Initialising our custom source with the name 'custom'. ) # Initialising our custom source with the name 'custom'.
async def load_item(self, user, metadata): async def load_item(self, user, metadata):
try:
artwork_url = metadata["album"]["images"][0]["url"]
except IndexError:
artwork_url = None
track = CustomAudioTrack( track = CustomAudioTrack(
{ # Create an instance of our CustomAudioTrack. { # Create an instance of our CustomAudioTrack.
"identifier": metadata[ "identifier": metadata[
@ -82,7 +89,7 @@ class SpotifySource(Source):
"title": metadata["name"], "title": metadata["name"],
"uri": metadata["external_urls"]["spotify"], "uri": metadata["external_urls"]["spotify"],
"duration": metadata["duration_ms"], "duration": metadata["duration_ms"],
"artworkUrl": metadata["album"]["images"][0]["url"], "artworkUrl": artwork_url,
}, },
requester=user, requester=user,
) )
@ -91,6 +98,11 @@ class SpotifySource(Source):
) )
async def load_album(self, user, metadata): async def load_album(self, user, metadata):
try:
artwork_url = metadata["images"][0]["url"]
except IndexError:
artwork_url = None
tracks = [] tracks = []
for track in metadata["tracks"][ for track in metadata["tracks"][
"items" "items"
@ -108,7 +120,7 @@ class SpotifySource(Source):
"title": track["name"], "title": track["name"],
"uri": track["external_urls"]["spotify"], "uri": track["external_urls"]["spotify"],
"duration": track["duration_ms"], "duration": track["duration_ms"],
"artworkUrl": metadata["images"][0]["url"], "artworkUrl": artwork_url,
}, },
requester=user, requester=user,
) )
@ -123,6 +135,10 @@ class SpotifySource(Source):
for track in metadata["tracks"][ for track in metadata["tracks"][
"items" "items"
]: # Loop through each track in the playlist. ]: # Loop through each track in the playlist.
try:
artwork_url = track["track"]["album"]["images"][0]["url"]
except IndexError:
artwork_url = None
tracks.append( tracks.append(
CustomAudioTrack( CustomAudioTrack(
{ # Create an instance of our CustomAudioTrack. { # Create an instance of our CustomAudioTrack.
@ -136,9 +152,7 @@ class SpotifySource(Source):
"title": track["track"]["name"], "title": track["track"]["name"],
"uri": track["track"]["external_urls"]["spotify"], "uri": track["track"]["external_urls"]["spotify"],
"duration": track["track"]["duration_ms"], "duration": track["track"]["duration_ms"],
"artworkUrl": track["track"]["album"]["images"][0][ "artworkUrl": artwork_url,
"url"
],
}, },
requster=user, requster=user,
) )
@ -148,6 +162,34 @@ class SpotifySource(Source):
LoadType.PLAYLIST, tracks, playlist_info=PlaylistInfo.none() LoadType.PLAYLIST, tracks, playlist_info=PlaylistInfo.none()
) )
async def load_artist(self, user, metadata):
tracks = []
for track in metadata["tracks"]:
try:
artwork_url = track["album"]["images"][0]["url"]
except IndexError:
artwork_url = None
tracks.append(
CustomAudioTrack(
{
"identifier": track["id"],
"isSeekable": True,
"author": track["artists"][0]["name"],
"length": track["duration_ms"],
"isStream": False,
"title": track["name"],
"uri": track["external_urls"]["spotify"],
"duration": track["duration_ms"],
"artworkUrl": artwork_url,
},
requester=user,
)
)
return LoadResult(
LoadType.PLAYLIST, tracks, playlist_info=PlaylistInfo.none()
)
""" """
Custom Source for Apple Music links Custom Source for Apple Music links

View File

@ -0,0 +1,74 @@
import datetime
import discord
import requests
from typing import Tuple, Optional
from requests.exceptions import JSONDecodeError
from utils.config import create_embed, LOG
async def load(
headers: dict,
query: str,
user: discord.User,
) -> Tuple[Optional[dict], Optional[discord.Embed]]:
"""
Get the album info from the Apple Music API
"""
album_id = query.split("/album/")[1].split("/")[1]
try:
# Get the album info
response = requests.get(
f"https://api.music.apple.com/v1/catalog/us/albums/{album_id}",
headers=headers,
)
if response.status_code == 404:
embed = create_embed(
title="Album Not Found",
description=(
"The album could not be found as the provided link is"
" invalid. Please try again."
),
)
return None, embed
if response.status_code == 401:
LOG.error(
"Could not authorize with Apple Music API. Likely need to"
" restart the bot."
)
return None, None
response.raise_for_status()
# Unpack the album info
album = response.json()
name = album["data"][0]["attributes"]["name"]
artist = album["data"][0]["attributes"]["artistName"]
num_tracks = len(album["data"][0]["relationships"]["tracks"]["data"])
except IndexError:
LOG.error("Failed unpacking Apple Music album info")
return None, None
except (JSONDecodeError, requests.HTTPError):
LOG.error("Failed making request to Apple Music API")
return None, None
# Extract artwork URL, if available
artwork_url = (
album["data"][0]["attributes"].get("artwork", {}).get("url", None)
)
if artwork_url:
artwork_url = artwork_url.replace("{w}x{h}", "300x300")
embed = create_embed(
title="Album Queued",
description=(
f"**{name}** by **{artist}**\n"
f"` {num_tracks} ` tracks\n\n"
f"Queued by: {user.mention}"
),
thumbnail=artwork_url,
)
return album, embed

View File

@ -0,0 +1,88 @@
import discord
import requests
from typing import Tuple, Optional
from requests.exceptions import JSONDecodeError
from utils.config import create_embed, LOG
async def load(
headers: dict,
query: str,
user: discord.User,
) -> Tuple[Optional[dict], Optional[discord.Embed]]:
"""
Get the playlist info from the Apple Music API
"""
playlist_id = query.split("/playlist/")[1].split("/")[1]
try:
# Get all of the tracks in the playlist (limit at 100)
response = requests.get(
f"https://api.music.apple.com/v1/catalog/us/playlists/{playlist_id}/tracks?limit=100",
headers=headers,
)
if response.status_code == 404:
embed = create_embed(
title="Playlist Not Found",
description=(
"The playlist could not be found as the provided link is"
" invalid. Please try again."
),
)
return None, embed
if response.status_code == 401:
LOG.error(
"Could not authorize with Apple Music API. Likely need to"
" restart the bot."
)
return None, None
response.raise_for_status()
playlist = response.json()
# Get the general playlist info (name, artwork)
response = requests.get(
f"https://api.music.apple.com/v1/catalog/us/playlists/{playlist_id}",
headers=headers,
)
response.raise_for_status()
# Unpack the playlist info
playlist_info = response.json()
name = playlist_info["data"][0]["attributes"]["name"]
num_tracks = len(playlist["data"])
except IndexError:
LOG.error("Failed unpacking Apple Music playlist info")
return None, None
except (JSONDecodeError, requests.HTTPError):
LOG.error("Failed making request to Apple Music API")
return None, None
# Extract artwork URL, if available
artwork_url = (
playlist_info["data"][0]["attributes"]
.get("artwork", {})
.get("url", None)
)
if artwork_url:
artwork_url = artwork_url.replace("{w}x{h}", "300x300")
embed = create_embed(
title="Playlist Queued",
description=(
f"**{name}**\n` {num_tracks} ` tracks\n\nQueued by: {user.mention}"
),
thumbnail=artwork_url,
)
# Add small alert if the playlist is the max size
if len(playlist["data"]) == 100:
embed.description += (
"\n\n*This playlist is longer than the 100 song"
" maximum. Only the first 100 songs will be"
" queued.*"
)
return playlist, embed

View File

@ -0,0 +1,68 @@
import discord
import requests
from typing import Tuple, Optional
from requests.exceptions import JSONDecodeError
from utils.config import create_embed, LOG
async def load(
headers: dict,
query: str,
user: discord.User,
) -> Tuple[Optional[dict], Optional[discord.Embed]]:
"""
Get the song info from the Apple Music API
"""
song_id = query.split("/album/")[1].split("?i=")[1]
try:
# Get the song info
response = requests.get(
f"https://api.music.apple.com/v1/catalog/us/songs/{song_id}",
headers=headers,
)
if response.status_code == 404:
embed = create_embed(
title="Song Not Found",
description=(
"The song could not be found as the provided link is"
" invalid. Please try again."
),
)
return None, embed
if response.status_code == 401:
LOG.error(
"Could not authorize with Apple Music API. Likely need to"
" restart the bot."
)
return None, None
response.raise_for_status()
# Unpack the song info
song = response.json()
name = song["data"][0]["attributes"]["name"]
artist = song["data"][0]["attributes"]["artistName"]
except IndexError:
LOG.error("Failed unpacking Apple Music song info")
return None, None
except (JSONDecodeError, requests.HTTPError):
LOG.error("Failed making request to Apple Music API")
return None, None
# Extract artwork URL, if available
artwork_url = (
song["data"][0]["attributes"].get("artwork", {}).get("url", None)
)
if artwork_url:
artwork_url = artwork_url.replace("{w}x{h}", "300x300")
embed = create_embed(
title="Song Queued",
description=f"**{name}** by **{artist}**\n\nQueued by {user.mention}",
thumbnail=artwork_url,
)
return song, embed

View File

@ -0,0 +1,91 @@
import discord
from utils.source_helpers.apple import (
album as apple_album,
playlist as apple_playlist,
song as apple_song,
)
from utils.source_helpers.spotify import (
album as spotify_album,
artist as spotify_artist,
playlist as spotify_playlist,
song as spotify_song,
)
from utils.custom_sources import AppleSource, SpotifySource
async def parse_custom_source(
self, provider: str, query: str, user: discord.User
):
"""
Parse the query and run the appropriate functions to get the results/info
Return the results and an embed or None, None
"""
load_funcs = {
"apple": {
"album": apple_album.load,
"playlist": apple_playlist.load,
"song": apple_song.load,
},
"spotify": {
"album": spotify_album.load,
"artist": spotify_artist.load,
"playlist": spotify_playlist.load,
"song": spotify_song.load,
},
}
headers = {
"apple": self.bot.apple_headers,
"spotify": self.bot.spotify_headers,
}
sources = {
"apple": AppleSource,
"spotify": SpotifySource,
}
# Catch all songs
if "?i=" in query or "/track/" in query:
song, embed = await load_funcs[provider]["song"](
headers[provider], query, user
)
if song:
results = await sources[provider].load_item(self, user, song)
else:
return None, embed
# Catch all playlists
elif "/playlist/" in query:
playlist, embed = await load_funcs[provider]["playlist"](
headers[provider], query, user
)
if playlist:
results = await sources[provider].load_playlist(
self, user, playlist
)
else:
return None, embed
# Catch all albums
elif "/album/" in query:
album, embed = await load_funcs[provider]["album"](
headers[provider], query, user
)
if album:
results = await sources[provider].load_album(self, user, album)
else:
return None, embed
# Catch Spotify artists
elif "/artist/" in query:
artist, embed = await load_funcs[provider]["artist"](
headers[provider], query, user
)
if artist:
results = await sources[provider].load_artist(self, user, artist)
else:
return None, embed
return results, embed

View File

@ -0,0 +1,68 @@
import datetime
import discord
import requests
from typing import Tuple, Optional
from requests.exceptions import JSONDecodeError
from utils.config import create_embed, LOG
async def load(
headers: dict,
query: str,
user: discord.User,
) -> Tuple[Optional[dict], Optional[discord.Embed]]:
"""
Get the album info from the Spotify API
"""
album_id = query.split("/album/")[1]
try:
# Get the album info
response = requests.get(
f"https://api.spotify.com/v1/albums/{album_id}",
headers=headers,
)
if response.status_code == 404:
embed = create_embed(
title="Album Not Found",
description=(
"The album could not be found as the provided link is"
" invalid. Please try again."
),
)
return None, embed
if response.status_code == 401:
LOG.error(
"Could not authorize with Spotify API. Likely need to"
" restart the bot."
)
return None, None
response.raise_for_status()
# Unpack the album info
album = response.json()
name = album["name"]
artist = album["artists"][0]["name"]
num_tracks = len(album["tracks"]["items"])
artwork_url = album["images"][0]["url"]
except IndexError:
LOG.error("Failed unpacking Spotify album info")
return None, None
except (JSONDecodeError, requests.HTTPError):
LOG.error("Failed making request to Spotify API")
return None, None
embed = create_embed(
title="Album Queued",
description=(
f"**{name}** by **{artist}**\n"
f"` {num_tracks} ` tracks\n\n"
f"Queued by: {user.mention}"
),
thumbnail=artwork_url,
)
return album, embed

View File

@ -0,0 +1,77 @@
import datetime
import discord
import requests
from typing import Tuple, Optional
from requests.exceptions import JSONDecodeError
from utils.config import create_embed, LOG
async def load(
headers: dict,
query: str,
user: discord.User,
) -> Tuple[Optional[dict], Optional[discord.Embed]]:
"""
Get the artists top tracks from the Spotify API
"""
artist_id = query.split("/artist/")[1]
try:
# Get the artists songs
response = requests.get(
f"https://api.spotify.com/v1/artists/{artist_id}/top-tracks",
headers=headers,
)
if response.status_code == 404:
embed = create_embed(
title="Artist Not Found",
description=(
"Either the provided link is malformed, the artist does"
" not exist, or the artist does not have any songs."
),
)
return None, embed
if response.status_code == 401:
LOG.error(
"Could not authorize with Spotify API. Likely need to"
" restart the bot."
)
return None, None
response.raise_for_status()
# Unpack the artists songs
artist = response.json()
name = artist["tracks"][0]["artists"][0]["name"]
num_tracks = len(artist["tracks"])
# Get the artist info (for the thumbnail)
response = requests.get(
f"https://api.spotify.com/v1/artists/{artist_id}",
headers=headers,
)
response.raise_for_status()
try:
artwork_url = response.json()["images"][0]["url"]
except IndexError:
artwork_url = None
except IndexError:
LOG.error("Failed unpacking Spotify artist info")
return None, None
except (JSONDecodeError, requests.HTTPError):
LOG.error("Failed making request to Spotify API")
return None, None
embed = create_embed(
title="Artist Queued",
description=(
f"Top `{num_tracks}` track by **{name}**\n\n"
f"Queued by {user.mention}"
),
thumbnail=artwork_url,
)
return artist, embed

View File

@ -0,0 +1,68 @@
import datetime
import discord
import requests
from typing import Tuple, Optional
from requests.exceptions import JSONDecodeError
from utils.config import create_embed, LOG
async def load(
headers: dict,
query: str,
user: discord.User,
) -> Tuple[Optional[dict], Optional[discord.Embed]]:
"""
Get the playlist info from the Spotify API
"""
playlist_id = query.split("/playlist/")[1]
try:
# Get the playlist info
response = requests.get(
f"https://api.spotify.com/v1/playlists/{playlist_id}",
headers=headers,
)
if response.status_code == 404:
embed = create_embed(
title="Playlist Not Found",
description=(
"The playlist could not be found as the provided link is"
" invalid. Please try again."
),
)
return None, embed
if response.status_code == 401:
LOG.error(
"Could not authorize with Spotify API. Likely need to"
" restart the bot."
)
return None, None
response.raise_for_status()
# Unpack the playlist info
playlist = response.json()
name = playlist["name"]
owner = playlist["owner"]["display_name"]
num_tracks = len(playlist["tracks"]["items"])
artwork_url = playlist["images"][0]["url"]
except IndexError:
LOG.error("Failed unpacking Spotify playlist info")
return None, None
except (JSONDecodeError, requests.HTTPError):
LOG.error("Failed making request to Spotify API")
return None, None
embed = create_embed(
title="Playlist Queued",
description=(
f"**{name}** from **{owner}**\n"
f"` {num_tracks} ` tracks\n\n"
f"Queued by {user.mention}"
),
thumbnail=artwork_url,
)
return playlist, embed

View File

@ -0,0 +1,63 @@
import datetime
import discord
import requests
from typing import Tuple, Optional
from requests.exceptions import JSONDecodeError
from utils.config import create_embed, LOG
async def load(
headers: dict,
query: str,
user: discord.User,
) -> Tuple[Optional[dict], Optional[discord.Embed]]:
"""
Get the song info from the Spotify API
"""
song_id = query.split("/track/")[1]
try:
# Get the song info
response = requests.get(
f"https://api.spotify.com/v1/tracks/{song_id}",
headers=headers,
)
if response.status_code == 404:
embed = create_embed(
title="Song Not Found",
description=(
"The song could not be found as the provided link is"
" invalid. Please try again."
),
)
return None, embed
if response.status_code == 401:
LOG.error(
"Could not authorize with Spotify API. Likely need to"
" restart the bot."
)
return None, None
response.raise_for_status()
# Unpack the song info
song = response.json()
name = song["name"]
artist = song["artists"][0]["name"]
artwork_url = song["album"]["images"][0]["url"]
except IndexError:
LOG.error("Failed unpacking Spotify song info")
return None, None
except (JSONDecodeError, requests.HTTPError):
LOG.error("Failed making request to Spotify API")
return None, None
embed = create_embed(
title="Song Queued",
description=f"**{name}** by **{artist}**\n\nQueued by {user.mention}",
thumbnail=artwork_url,
)
return song, embed