import os
import random
import string

import aiohttp

from config import NAMING_SCHEME, LOG

# Directory (relative to the current working directory) that receives the
# downloaded attachments.
_IMAGE_DIR = "images"


async def save_attachments(message) -> list:
    """Download the attachments of *message* and return their local paths.

    Each attachment URL is fetched over HTTP. Attachments whose response
    status is not 200, or whose response has no ``Content-Type`` header, are
    logged and skipped. The file extension is derived from the subtype of the
    ``Content-Type`` header and the name from :func:`get_filename`.

    Args:
        message: Object exposing an ``attachments`` iterable; each attachment
            must provide ``url`` plus whatever :func:`get_filename` reads
            for the configured naming scheme.

    Returns:
        A list of ``"images/<filename>"`` path strings, one per attachment
        that was saved, in the order the attachments were processed.
    """
    paths = []
    if not message.attachments:
        return paths

    # open() does not create missing directories, so make sure the target
    # directory exists before the first write.
    os.makedirs(_IMAGE_DIR, exist_ok=True)

    # One session is shared by all downloads so connections can be reused
    # instead of building and tearing down a session per attachment.
    async with aiohttp.ClientSession() as session:
        for attachment in message.attachments:
            async with session.get(attachment.url) as response:
                # Check if the request was successful
                if response.status != 200:
                    LOG.warn(
                        f"Failed to download attachment: {attachment.url}"
                    )
                    continue

                # Check for content type
                content_type = response.headers.get("Content-Type")
                if not content_type:
                    LOG.warn(
                        f"Failed to get content type for: {attachment.url}"
                    )
                    continue

                payload = await response.read()

            filename = get_filename(
                attachment, message, _extension_from_content_type(content_type)
            )
            path = f"{_IMAGE_DIR}/{filename}"

            # Save the attachment. Files are written one at a time so that
            # the duplicate check in get_filename sees earlier attachments.
            with open(path, "wb") as file:
                file.write(payload)

            paths.append(path)

    return paths


def _extension_from_content_type(content_type) -> str:
    """Return the subtype of a ``Content-Type`` value, without parameters.

    ``"image/png; charset=binary"`` yields ``"png"``; a value without
    parameters such as ``"image/jpeg"`` yields ``"jpeg"``.
    """
    media_type = content_type.split(";", 1)[0].strip()
    return media_type.split("/")[-1]


def _first_free_stem(stem, file_extension) -> str:
    """Return *stem*, or ``<stem>_<n>`` for the first *n* that is unused.

    A name counts as used when ``images/<name>.<file_extension>`` exists.
    """
    candidate = stem
    counter = 1
    while os.path.exists(f"{_IMAGE_DIR}/{candidate}.{file_extension}"):
        candidate = f"{stem}_{counter}"
        counter += 1
    return candidate


def get_filename(attachment, message, file_extension) -> str:
    """Generate a filename based on the naming scheme.

    The scheme is selected by ``NAMING_SCHEME``:

    * ``"original"``: the attachment's own filename with its extension
      removed, so the result is ``cat.png`` rather than ``cat.png.png``.
      A numeric suffix is added if the file already exists.
    * ``"timestamp"``: ``message.created_at.isoformat()``, with a numeric
      suffix if the file already exists (several attachments can share one
      message).
    * ``"id"``: ``str(attachment.id)``.
    * anything else: 15 random ASCII letters.

    Args:
        attachment: Object providing ``filename`` and ``id``.
        message: Object providing ``created_at``.
        file_extension: Extension to append, without the leading dot.

    Returns:
        The filename, ``"<name>.<file_extension>"``, without any directory.
    """
    if NAMING_SCHEME == "original":
        # Keep only the final path component so a filename containing
        # directory separators cannot point outside the images directory.
        base_name = os.path.basename(attachment.filename)
        stem = os.path.splitext(base_name)[0]
        filename = _first_free_stem(stem, file_extension)
    elif NAMING_SCHEME == "timestamp":
        # NOTE(review): if created_at is a datetime, isoformat() contains
        # ':' characters, which Windows does not allow in filenames; confirm
        # the target platform.
        filename = _first_free_stem(
            message.created_at.isoformat(), file_extension
        )
    elif NAMING_SCHEME == "id":
        filename = str(attachment.id)
    else:
        # random
        filename = "".join(
            random.choice(string.ascii_letters) for _ in range(15)
        )

    return f"{filename}.{file_extension}"