aboutsummaryrefslogtreecommitdiff
path: root/code/cogs/music.py
diff options
context:
space:
mode:
authorParker <contact@pkrm.dev>2025-01-25 22:58:41 -0600
committerParker <contact@pkrm.dev>2025-01-25 22:58:41 -0600
commit3c18d552c2c9eb15834b33687915a7b16c98883b (patch)
treecf502397da128e3c31dc8f45397d1824dd7c2df9 /code/cogs/music.py
reupload
Diffstat (limited to 'code/cogs/music.py')
-rw-r--r--code/cogs/music.py584
1 files changed, 584 insertions, 0 deletions
diff --git a/code/cogs/music.py b/code/cogs/music.py
new file mode 100644
index 0000000..e6f9ef0
--- /dev/null
+++ b/code/cogs/music.py
@@ -0,0 +1,584 @@
+import re
+import discord
+import lavalink
+from discord.ext import commands
+import math
+import requests
+import datetime
+from discord import app_commands
+from custom_source import CustomSource
+from global_variables import BOT_COLOR
+
+url_rx = re.compile(r'https?://(?:www\.)?.+')
+
+class LavalinkVoiceClient(discord.VoiceClient):
+ """
+ This is the preferred way to handle external voice sending
+ This client will be created via a cls in the connect method of the channel
+ see the following documentation:
+ https://discordpy.readthedocs.io/en/latest/api.html#voiceprotocol
+ """
+
+ def __init__(self, client: commands.Bot, channel: discord.abc.Connectable):
+ self.client = client
+ self.channel = channel
+ if hasattr(self.client, 'lavalink'):
+ self.lavalink = self.client.lavalink
+ else:
+ self.client.lavalink = lavalink.Client(client.user.id)
+ self.client.lavalink.add_node(
+ '127.0.0.1',
+ 2333,
+ 'youshallnotpass',
+ 'us-central',
+ 'default-node'
+ )
+ self.lavalink = self.client.lavalink
+
+ async def on_voice_server_update(self, data):
+ lavalink_data = {
+ 't': 'VOICE_SERVER_UPDATE',
+ 'd': data
+ }
+ await self.lavalink.voice_update_handler(lavalink_data)
+
+ async def on_voice_state_update(self, data):
+ lavalink_data = {
+ 't': 'VOICE_STATE_UPDATE',
+ 'd': data
+ }
+ await self.lavalink.voice_update_handler(lavalink_data)
+
+ async def connect(self, *, timeout: float, reconnect: bool, self_deaf: bool = True, self_mute: bool = False) -> None:
+ self.lavalink.player_manager.create(guild_id=self.channel.guild.id)
+ await self.channel.guild.change_voice_state(channel=self.channel, self_mute=self_mute, self_deaf=self_deaf)
+
+ async def disconnect(self, *, force: bool) -> None:
+ player = self.lavalink.player_manager.get(self.channel.guild.id)
+ if not force and not player.is_connected:
+ return
+
+ await self.channel.guild.change_voice_state(channel=None)
+ player.channel_id = None
+ self.cleanup()
+
+
+class Music(commands.Cog):
+ def __init__(self, bot):
+ self.bot = bot
+
+ if not hasattr(bot, 'lavalink'): # This ensures the client isn't overwritten during cog reloads.
+ bot.lavalink = lavalink.Client(self.bot.user.id)
+ bot.lavalink.add_node(
+ '127.0.0.1',
+ 2333,
+ 'youshallnotpass',
+ 'us-central',
+ 'default-node'
+ ) # Host, Port, Password, Region, Name
+
+ self.lavalink: lavalink.Client = bot.lavalink
+ self.lavalink.add_event_hooks(self.track_hook)
+
+ def cog_unload(self):
+ """ Cog unload handler. This removes any event hooks that were registered. """
+ self.lavalink._event_hooks.clear()
+
+ async def ensure_before(self, interaction):
+ """ Command before-invoke handler. """
+ guild_check = interaction.guild is not None
+
+ if guild_check:
+ await self.ensure_voice(interaction)
+
+ return guild_check
+
+ async def ensure_voice(self, interaction: discord.Interaction):
+ """ This check ensures that the bot and command author are in the same voicechannel. """
+ player = self.lavalink.player_manager.create(interaction.guild.id)
+ should_connect = interaction.command.name in ('play',)
+
+ if not interaction.user.voice or not interaction.user.voice.channel:
+ await interaction.response.send_message('Join a voicechannel first.', ephemeral=True)
+ raise ZeroDivisionError
+
+ if not player.is_connected:
+ if not should_connect:
+ await interaction.response.send_message('Not connected.', ephemeral=True)
+ raise ZeroDivisionError
+
+ permissions = interaction.user.voice.channel.permissions_for(interaction.user)
+
+ if not permissions.connect or not permissions.speak:
+ await interaction.response.send_message('I need the `CONNECT` and `SPEAK` permissions.', ephemeral=True)
+ raise ZeroDivisionError
+
+ player.store('channel', interaction.channel.id)
+ await interaction.user.voice.channel.connect(cls=LavalinkVoiceClient)
+ else:
+ if int(player.channel_id) != interaction.user.voice.channel.id:
+ await interaction.response.send_message('You need to be in my voicechannel.', ephemeral=True)
+ raise ZeroDivisionError
+
+ async def track_hook(self, event):
+ if isinstance(event, lavalink.events.QueueEndEvent):
+ guild_id = event.player.guild_id
+ guild = self.bot.get_guild(guild_id)
+ await guild.voice_client.disconnect(force=True)
+
+ @commands.Cog.listener()
+ async def on_voice_state_update(self, member, before, after):
+ if before.channel and member == self.bot.user:
+ if after.channel is None:
+ player = self.lavalink.player_manager.get(member.guild.id)
+ player.queue.clear()
+ await player.stop()
+ guild = member.guild
+ try:
+ await guild.voice_client.disconnect(force=True)
+ player.shuffle = not player.shuffle if player.shuffle else player.shuffle
+ except AttributeError:
+ pass
+
+
+ @app_commands.command()
+ @app_commands.describe(name="Name or link of song")
+ async def play(
+ self,
+ interaction: discord.Interaction,
+ name: str
+ ):
+ "Play a song from your favorite music provider"
+ await self.ensure_before(interaction)
+ player = self.lavalink.player_manager.get(interaction.guild.id)
+ query = name
+
+ # Below begins the start of the search for Spotify links - we must check for playlist, albums, and tracks
+ # We use a custom source in order to provide us with the correct information and streams
+ if "open.spotify.com/playlist" in query:
+ playlist_id = query.split("playlist/")[1].split("?si=")[0]
+ playlist_url = f"https://api.spotify.com/v1/playlists/{playlist_id}"
+ headers = {"Authorization": f"Bearer {self.bot.access_token}"}
+ response = requests.get(playlist_url, headers=headers)
+ if response.status_code == 200:
+ playlist = response.json()
+
+ embed = discord.Embed(
+ title = "Playlist Queued",
+ description = f"**{playlist['name']}** from **{playlist['owner']['display_name']}**\n` {len(playlist['tracks']['items'])} ` tracks\n\nQueued by: {interaction.user.mention}",
+ color=BOT_COLOR
+ )
+ embed.set_thumbnail(url=playlist['images'][0]['url'])
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ await interaction.response.send_message(embed=embed)
+
+ tracks = await CustomSource.load_playlist(self, interaction.user, playlist)
+ for track in tracks['tracks']:
+ player.add(requester=interaction.user, track=track)
+
+ if "open.spotify.com/album" in query:
+ album_id = query.split("album/")[1]
+ album_url = f"https://api.spotify.com/v1/albums/{album_id}"
+ headers = {"Authorization": f"Bearer {self.bot.access_token}"}
+ response = requests.get(album_url, headers=headers)
+ if response.status_code == 200:
+ album = response.json()
+
+ embed = discord.Embed(
+ title = "Album Queued",
+ description = f"**{album['name']}** by **{album['artists'][0]['name']}**\n` {len(album['tracks']['items'])} ` tracks\n\nQueued by: {interaction.user.mention}",
+ color=BOT_COLOR
+ )
+ embed.set_thumbnail(url=album['images'][0]['url'])
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ await interaction.response.send_message(embed=embed)
+
+ tracks = await CustomSource.load_album(self, interaction.user, album)
+ for track in tracks['tracks']:
+ player.add(requester=interaction.user, track=track)
+
+ if "open.spotify.com/track" in query:
+ track_id = query.split("track/")[1]
+ track_url = f"https://api.spotify.com/v1/tracks/{track_id}"
+ headers = {"Authorization": f"Bearer {self.bot.access_token}"}
+ response = requests.get(track_url, headers=headers)
+ if response.status_code == 200:
+ track = response.json()
+
+ embed = discord.Embed(
+ title = "Track Queued",
+ description = f"**{track['name']}** by **{track['artists'][0]['name']}**\n\nQueued by: {interaction.user.mention}",
+ color=BOT_COLOR
+ )
+ embed.set_thumbnail(url=track['album']['images'][0]['url'])
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ await interaction.response.send_message(embed=embed)
+
+ track_ = await CustomSource.load_item(self, interaction.user, track)
+ player.add(requester=interaction.user, track=track_.tracks[0])
+
+ if 'open.spotify.com/artists' in query:
+ return await interaction.response.send_message("Sorry, I can't play artists. You must provide a song/album/playlist.", ephemeral=True)
+
+ if 'open.spotify.com' in query:
+ if not player.is_playing:
+ await player.play()
+ return
+
+ # Now begins the soundcloud section, this can be just like the youtube section
+ if not url_rx.match(query):
+ query = f'ytsearch:{query}'
+ results = await player.node.get_tracks(query)
+
+ # Below is for YouTube search, which is the default and is used when no link is provided
+ if not url_rx.match(query) and not results or not results['tracks']:
+ query = f'scsearch:{query}'
+ results = await player.node.get_tracks(query)
+
+ if not results or not results['tracks']:
+ return await interaction.response.send_message('Nothing found!', ephemeral=True)
+
+ embed = discord.Embed(color=BOT_COLOR)
+
+ if results['loadType'] == 'PLAYLIST_LOADED':
+ tracks = results['tracks']
+
+ for track in tracks:
+ track_ = lavalink.AudioTrack(track, interaction.user.id, extra=f"https://img.youtube.com/vi/{track['info']['identifier']}/hqdefault.jpg")
+ player.add(requester=interaction.user, track=track_)
+
+ embed.title = 'Playlist Queued!'
+ embed.description = f"**{results['playlistInfo']['name']}**\n` {len(tracks)} ` tracks\n\nQueued by: {interaction.user.mention}"
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ else:
+ track = results['tracks'][0]
+ embed.title = 'Track Queued'
+ embed.description = f"**{track['info']['title']}** by **{track['info']['author']}**\n\nQueued by: {interaction.user.mention}"
+ embed.set_thumbnail(url=f"https://img.youtube.com/vi/{track['info']['identifier']}/hqdefault.jpg")
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+
+ track_ = lavalink.AudioTrack(track, interaction.user.id, recommended=True, extra=f"https://img.youtube.com/vi/{track['info']['identifier']}/hqdefault.jpg")
+ player.add(requester=interaction.user, track=track_)
+
+ await interaction.response.send_message(embed=embed)
+
+ # We don't want to call .play() if the player is playing as that will
+ # effectively skip the current track
+ if not player.is_playing:
+ await player.play()
+
+
+ @app_commands.command()
+ async def stop(
+ self,
+ interaction: discord.Interaction
+ ):
+ "Disconnects the bot from the voice channel and clears the queue"
+ await self.ensure_before(interaction)
+
+ player = self.lavalink.player_manager.get(interaction.guild.id)
+
+ if not player.is_connected:
+ embed = discord.Embed(
+ title="No Channel",
+ description="I am not currently connected to any voice channel.",
+ color=BOT_COLOR
+ )
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ return await interaction.response.send_message(embed=embed, ephemeral=True)
+
+ player.queue.clear()
+ await player.stop()
+ player.shuffle = not player.shuffle if player.shuffle else player.shuffle
+ guild = interaction.guild
+ await guild.voice_client.disconnect(force=True)
+ embed = discord.Embed(
+ title="Queue Cleared and Music Stopped",
+ description=f"Thank you for using Aqua Bot :wave:\n\nIssued by: {interaction.user.mention}",
+ color=BOT_COLOR
+ )
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ await interaction.response.send_message(embed=embed)
+
+
+ @app_commands.command()
+ async def clear(
+ self,
+ interaction: discord.Interaction
+ ):
+ "Clear the current queue of songs"
+ await self.ensure_before(interaction)
+
+ player = self.lavalink.player_manager.get(interaction.guild.id)
+
+ if not player.is_connected:
+ embed = discord.Embed(
+ title="No Channel",
+ description="I am not currently connected to any voice channel.",
+ color=BOT_COLOR
+ )
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ return await interaction.response.send_message(embed=embed, ephemeral=True)
+
+ player.queue.clear()
+ embed = discord.Embed(
+ title="Queue Cleared",
+ description = f"The queue has been cleared of all songs!\n\nIssued by: {interaction.user.mention}",
+ color=0x0088a9
+ )
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ await interaction.response.send_message(embed=embed)
+
+
+ @app_commands.command()
+ async def skip(
+ self,
+ interaction: discord.Interaction
+ ):
+ "Skips the song that is currently playing"
+ await self.ensure_before(interaction)
+
+ player = self.lavalink.player_manager.get(interaction.guild.id)
+
+ if not player.is_playing:
+ embed = discord.Embed(
+ title="Nothing Playing",
+ description="Nothing is currently playing, so I can't skip anything.",
+ color=BOT_COLOR
+ )
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ return await interaction.response.send_message(embed=embed, ephemeral=True)
+
+ await player.skip()
+ if not player.is_playing:
+ embed = discord.Embed(
+ title="Track Skipped",
+ description=f"The queue is now empty, so I have left the voice channel. Thank you for using Aqua Bot.\n\nIssued by: {interaction.user.mention}",
+ color=BOT_COLOR
+ )
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ return await interaction.response.send_message(embed=embed)
+
+ embed = discord.Embed(
+ title="Track Skipped",
+ description = f"**Now Playing: [{player.current.title}]({player.current.uri})**\n\nQueued by: {player.current.requester.mention}",
+ color=BOT_COLOR
+ )
+ embed.set_thumbnail(url=player.current.extra['extra'])
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ await interaction.response.send_message(embed=embed)
+
+
+ @app_commands.command()
+ async def pause(
+ self,
+ interaction: discord.Interaction
+ ):
+ "Pauses the song that is currently playing"
+ await self.ensure_before(interaction)
+
+ player = self.lavalink.player_manager.get(interaction.guild.id)
+
+ if not player.is_playing:
+ embed = discord.Embed(
+ title="Nothing Playing",
+ description="Nothing is currently playing, so I can't pause anything.",
+ color=BOT_COLOR
+ )
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ return await interaction.response.send_message(embed=embed, ephemeral=True)
+
+ await player.set_pause(pause=True)
+ embed = discord.Embed(
+ title = f"Music Now Paused ⏸️",
+ description = f"**[{player.current.title}]({player.current.uri})**\n\nQueued by: {player.current.requester.mention}",
+ color=BOT_COLOR
+ )
+ embed.set_thumbnail(url=player.current.extra['extra'])
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ await interaction.response.send_message(embed=embed)
+
+
+ @app_commands.command()
+ async def resume(
+ self,
+ interaction: discord.Interaction
+ ):
+ "Resumes the paused song"
+ await self.ensure_before(interaction)
+
+ player = self.lavalink.player_manager.get(interaction.guild.id)
+
+ if not player.is_playing:
+ embed = discord.Embed(
+ title="Nothing Paused",
+ description="Nothing is currently paused, so I can't resume anything.",
+ color=BOT_COLOR
+ )
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ return await interaction.response.send_message(embed=embed, ephemeral=True)
+
+ await player.set_pause(pause=False)
+ embed = discord.Embed(
+ title=f"Music Now Resumed ⏯️",
+ description=f"**[{player.current.title}]({player.current.uri})**\n\nQueued by: {player.current.requester.mention}",
+ color=BOT_COLOR
+ )
+ embed.set_thumbnail(url=player.current.extra['extra'])
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ await interaction.response.send_message(embed=embed)
+
+
+ @app_commands.command()
+ @app_commands.describe(page="Queue page number - leave blank if you are unsure")
+ async def queue(
+ self,
+ interaction: discord.Interaction,
+ page: int = 1
+ ):
+ "See the current queue of songs"
+ await self.ensure_before(interaction)
+
+ player = self.lavalink.player_manager.get(interaction.guild.id)
+
+ if not player.queue:
+ embed = discord.Embed(
+ title="Nothing Queued",
+ description="Nothing is currently in the queue, add a song with the `/play` command.",
+ color=BOT_COLOR
+ )
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ return await interaction.response.send_message(embed=embed, ephemeral=True)
+
+ items_per_page = 10
+ pages = math.ceil(len(player.queue) / items_per_page)
+
+ start = (page - 1) * items_per_page
+ end = start + items_per_page
+
+ queue_list = ''
+ for index, track in enumerate(player.queue[start:end], start=start):
+ # Change ms duration to hour, min, sec in the format of 00:00:00
+ track_duration = lavalink.utils.format_time(track.duration)
+ # If the track is less than an hour, remove the hour from the duration
+ if track_duration.split(':')[0] == '00':
+ track_duration = track_duration[3:]
+
+ queue_list += f"`{index+1}. ` [{track.title}]({track.uri}) - {track.author} `({track_duration})`\n"
+
+ embed = discord.Embed(
+ title=f"Queue for {interaction.guild.name}",
+ description=f'**{len(player.queue)} tracks total**\n\n{queue_list}',
+ color=BOT_COLOR
+ )
+ embed.set_footer(text=f'Viewing page {page}/{pages}')
+ await interaction.response.send_message(embed=embed)
+
+
+ @app_commands.command()
+ async def np(
+ self,
+ interaction: discord.Interaction
+ ):
+ "Show what song is currently playing"
+ await self.ensure_before(interaction)
+
+ player = self.lavalink.player_manager.get(interaction.guild.id)
+
+ if not player.is_playing:
+ embed = discord.Embed(
+ title="Nothing Playing",
+ description="Nothing is currently playing, play a song with the `/play` command.",
+ color=BOT_COLOR
+ )
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ return await interaction.response.send_message(embed=embed, ephemeral=True)
+
+ time_in = str(datetime.timedelta(milliseconds=player.position))[:-7]
+ total_duration = lavalink.utils.format_time(player.current.duration)
+ # If total_duration has no hours, then remove the hour part from both times
+ if total_duration.split(":")[0] == "00":
+ time_in = time_in[2:]
+ total_duration = total_duration[3:]
+
+ embed= discord.Embed(
+ title="Now Playing 🎢",
+ description=f"**[{player.current.title}]({player.current.uri})**\n{f'` {time_in}/{total_duration} `'}\n\nQueued by: {player.current.requester.mention}",
+ color=BOT_COLOR
+ )
+ embed.set_thumbnail(url=player.current.extra['extra'])
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ await interaction.response.send_message(embed=embed)
+
+
+ @app_commands.command()
+ @app_commands.describe(number='Song number to have removed')
+ async def remove(
+ self,
+ interaction: discord.Interaction,
+ number: int
+ ):
+ "Removes the specified song from the queue"
+ await self.ensure_before(interaction)
+
+ player = self.lavalink.player_manager.get(interaction.guild.id)
+
+ if not player.queue:
+ embed = discord.Embed(
+ title="Nothing Queued",
+ description="Nothing is currently in the queue, so there is nothing for me to remove.",
+ color=BOT_COLOR
+ )
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ return await interaction.response.send_message(embed=embed)
+
+ if number > len(player.queue) or number < 1:
+ return await interaction.response.send_message('The number entered is not a number within the queue - please try again!', ephemeral=True)
+
+ index = number - 1
+ removed_title = player.queue[index].title
+ removed_url = player.queue[index].uri
+ player.queue.pop(index)
+
+ embed = discord.Embed(
+ title="Song Removed from Queue",
+ description=f"**Song Removed - [{removed_title}]({removed_url})**\n\nIssued by: {interaction.user.mention}",
+ color=BOT_COLOR
+ )
+ embed.set_thumbnail(url=player.current.extra['extra'])
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ await interaction.response.send_message(embed=embed)
+
+
+ @app_commands.command()
+ async def shuffle(
+ self,
+ interaction: discord.Interaction
+ ):
+ "Plays the songs in the queue in a randomized order, until turned off"
+ await self.ensure_before(interaction)
+
+ player = self.lavalink.player_manager.get(interaction.guild.id)
+
+ if not player.is_playing:
+ embed = discord.Embed(
+ title="Nothing Playing",
+ description="Nothing is currently playing, therefore I cannot shuffle the music.",
+ color=BOT_COLOR
+ )
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ return await interaction.response.send_message(embed=embed, ephemeral=True)
+
+ player.shuffle = not player.shuffle
+
+ embed = discord.Embed(
+ title=f"{'Shuffle Enabled πŸ”€' if player.shuffle else 'Shuffle Disabled πŸ”€'}",
+ description=f"{'All music will now be shuffled.' if player.shuffle else 'Music will no longer be shuffled.'}",
+ color=BOT_COLOR
+ )
+ embed.set_footer(text=datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')+" UTC")
+ await interaction.response.send_message(embed=embed)
+
+
+async def setup(bot):
+ await bot.add_cog(Music(bot)) \ No newline at end of file