From 55b2a3d2adb66f1584f98a691221c8a29552155f Mon Sep 17 00:00:00 2001 From: Wiktor <42137698+overspend1@users.noreply.github.com> Date: Fri, 21 Mar 2025 00:09:32 +0100 Subject: [PATCH] Create cloud_bot.py Create cloud_bot.py in the main branch --- cloud_bot.py | 501 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 501 insertions(+) create mode 100644 cloud_bot.py diff --git a/cloud_bot.py b/cloud_bot.py new file mode 100644 index 0000000..a9beac9 --- /dev/null +++ b/cloud_bot.py @@ -0,0 +1,501 @@ +import logging +import os +import shutil +import datetime +import time +import asyncio +from enum import Enum, auto +from typing import Dict, Union +from datetime import datetime, timezone +from dotenv import load_dotenv + +from telegram import Update +from telegram.ext import ( + Application, + CommandHandler, + MessageHandler, + filters, + ContextTypes, + ConversationHandler, +) + +# Load environment variables +load_dotenv() + +# Get bot token from environment variable +BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN') + +# Enable logging +logging.basicConfig( + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + level=logging.INFO +) +logger = logging.getLogger(__name__) + +# Constants +STORAGE_DIR = 'local_storage' +PASSWORD = 'cloudstorage1' +ADMIN_USERNAME = 'overspend1' +ANIMATION_FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] +UPLOAD_PROGRESS = ["▱", "▰"] + +# Create storage directory if it doesn't exist +os.makedirs(STORAGE_DIR, exist_ok=True) + +# State definitions for conversation +class States(Enum): + WAITING_FOR_PASSWORD = auto() + WAITING_FOR_FILENAME = auto() + WAITING_FOR_NEWNAME = auto() + +# Store user sessions and activity +user_sessions: Dict[int, Dict] = {} + +def get_current_time() -> str: + """Get current UTC time in YYYY-MM-DD HH:MM:SS format.""" + return datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S') + +def format_progress_bar(progress: float, width: int = 20) -> str: + """Create a progress bar string.""" + filled = int(width * progress / 100) + bar = UPLOAD_PROGRESS[1] * filled + UPLOAD_PROGRESS[0] * (width - filled) + return f"[{bar}] {progress:.1f}%" + +async def animate_progress(message, text: str, total_time: int = 3): + """Animate a progress message.""" + start_time = time.time() + while time.time() - start_time < total_time: + for frame in ANIMATION_FRAMES: + progress = min(100, ((time.time() - start_time) / total_time) * 100) + progress_bar = format_progress_bar(progress) + try: + await message.edit_text(f"{text}\n{frame} {progress_bar}") + await asyncio.sleep(0.1) + except: + pass + +async def log_activity(user_id: int, action: str) -> None: + """Log user activity with timestamp.""" + timestamp = get_current_time() + if user_id in user_sessions: + user_sessions[user_id]['last_activity'] = timestamp + user_sessions[user_id]['activities'].append(f"{timestamp} - {action}")async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> States.WAITING_FOR_PASSWORD: + """Start the conversation and ask for password.""" + user_id = update.effective_user.id + username = update.effective_user.username + + msg = await update.message.reply_text( + f"🌟 Welcome to Cloud Storage Bot!\n" + f"Current time (UTC): {get_current_time()}\n" + f"Your username: {username}" + ) + + await animate_progress(msg, "Initializing bot...") + + await msg.edit_text( + f"👋 Welcome to Cloud Storage Bot!\n" + f"Current time (UTC): {get_current_time()}\n" + f"Your username: {username}\n" + "Please enter the password to continue." + ) + + user_sessions[user_id] = { + 'username': username, + 'authenticated': False, + 'last_activity': get_current_time(), + 'activities': [] + } + + return States.WAITING_FOR_PASSWORD + +async def check_password(update: Update, context: ContextTypes.DEFAULT_TYPE) -> Union[int, States]: + """Check the provided password.""" + user_id = update.effective_user.id + if update.message.text == PASSWORD: + user_sessions[user_id]['authenticated'] = True + await log_activity(user_id, "Authentication successful") + msg = await update.message.reply_text("Verifying password...") + await animate_progress(msg, "Authenticating...") + await msg.edit_text( + f"✅ Password accepted! Use /help to see available commands.\n" + f"Current time (UTC): {get_current_time()}" + ) + return ConversationHandler.END + else: + await log_activity(user_id, "Authentication failed") + await update.message.reply_text("❌ Incorrect password. Please try again.") + return States.WAITING_FOR_PASSWORD + +async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Show help message.""" + user_id = update.effective_user.id + if not user_sessions.get(user_id, {}).get('authenticated', False): + await update.message.reply_text("❌ Please use /start to enter password first.") + return + + msg = await update.message.reply_text("Loading help...") + await animate_progress(msg, "Loading commands...") + + help_text = f""" +📚 Available commands: +/upload - Upload a file 📤 +/download - Download a file 📥 +/list - List files 📂 +/delete - Delete a file 🗑️ +/rename - Rename a file ✏️ +/move - Move a file 🚚 +/copy - Copy a file 📄 +/metadata - Show file info 📝 +/cleanup - Remove old files 🧹 +/stats - Show statistics 📊 +/help - Show this message ℹ️ + +Current time (UTC): {get_current_time()} +Your username: {update.effective_user.username} +""" + await log_activity(user_id, "Viewed help") + await msg.edit_text(help_text) + +async def upload(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Handle the /upload command.""" + user_id = update.effective_user.id + if not user_sessions.get(user_id, {}).get('authenticated', False): + await update.message.reply_text("❌ Please use /start to enter password first.") + return + + await log_activity(user_id, "Initiated file upload") + msg = await update.message.reply_text("Preparing upload...") + await animate_progress(msg, "Initializing upload system...") + await msg.edit_text( + f"📤 Please send me any file to upload to cloud storage.\n" + f"Current time (UTC): {get_current_time()}\n" + f"User: {update.effective_user.username}" + ) + +async def handle_file_upload(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Handle uploaded files.""" + user_id = update.effective_user.id + if not user_sessions.get(user_id, {}).get('authenticated', False): + await update.message.reply_text("❌ Please use /start to enter password first.") + return + + try: + file = update.message.document + if not file: + await update.message.reply_text("❌ Please send a file to upload.") + return + + file_name = file.file_name + file_path = os.path.join(STORAGE_DIR, file_name) + + # Check if file already exists + if os.path.exists(file_path): + await update.message.reply_text( + f"⚠️ File {file_name} already exists.\n" + f"Please rename the file or delete the existing one.\n" + f"Current time (UTC): {get_current_time()}" + ) + return + + msg = await update.message.reply_text("Starting upload...") + + # Simulate upload progress + progress_steps = [0, 20, 40, 60, 80, 90, 95, 100] + for progress in progress_steps: + progress_bar = format_progress_bar(progress) + await msg.edit_text( + f"📤 Uploading {file_name}...\n" + f"{progress_bar}\n" + f"⏳ Please wait..." + ) + await asyncio.sleep(0.5) + + new_file = await context.bot.get_file(file.file_id) + await new_file.download_to_drive(custom_path=file_path) + + file_size = os.path.getsize(file_path) + await log_activity(user_id, f"Uploaded file: {file_name} ({file_size / 1024:.1f} KB)") + + # Final success message + await msg.edit_text( + f"✅ File uploaded successfully!\n" + f"📄 Name: {file_name}\n" + f"📦 Size: {file_size / 1024:.1f} KB\n" + f"⏰ Time: {get_current_time()}\n" + f"👤 Uploaded by: {update.effective_user.username}\n" + f"{format_progress_bar(100)}" + ) + except Exception as e: + logger.error(f"Error uploading file: {e}") + await update.message.reply_text( + "❌ Error uploading file. Please try again.\n" + f"Time: {get_current_time()}" + )async def download(update: Update, context: ContextTypes.DEFAULT_TYPE) -> States.WAITING_FOR_FILENAME: + """Ask for file name to download.""" + user_id = update.effective_user.id + if not user_sessions.get(user_id, {}).get('authenticated', False): + await update.message.reply_text("❌ Please use /start to enter password first.") + return ConversationHandler.END + + msg = await update.message.reply_text("Loading file list...") + await animate_progress(msg, "Scanning storage...") + + # List available files + files = os.listdir(STORAGE_DIR) + if not files: + await msg.edit_text( + f"📂 Storage is empty. No files to download.\n" + f"Current time (UTC): {get_current_time()}" + ) + return ConversationHandler.END + + file_list = "Available files:\n" + for file in files: + size = os.path.getsize(os.path.join(STORAGE_DIR, file)) + file_list += f"📄 {file} ({size / 1024:.1f} KB)\n" + + await msg.edit_text( + f"📥 Enter the name of the file you want to download:\n\n{file_list}\n" + f"Current time (UTC): {get_current_time()}" + ) + return States.WAITING_FOR_FILENAME + +async def handle_download_request(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Handle file download.""" + user_id = update.effective_user.id + file_name = update.message.text + file_path = os.path.join(STORAGE_DIR, file_name) + + try: + if os.path.exists(file_path): + msg = await update.message.reply_text("Preparing download...") + await animate_progress(msg, "Preparing file...") + + file_size = os.path.getsize(file_path) + await msg.edit_text( + f"📤 Sending file: {file_name}\n" + f"📦 Size: {file_size / 1024:.1f} KB\n" + f"⏰ Time: {get_current_time()}\n" + f"{format_progress_bar(50)}" + ) + + # Send the actual file + with open(file_path, 'rb') as file: + await update.message.reply_document( + document=file, + filename=file_name, + caption=f"✅ File downloaded at {get_current_time()}" + ) + + await msg.edit_text( + f"✅ Download complete!\n" + f"📄 {file_name}\n" + f"⏰ {get_current_time()}\n" + f"{format_progress_bar(100)}" + ) + + await log_activity(user_id, f"Downloaded file: {file_name}") + else: + await update.message.reply_text( + f"❌ File {file_name} not found.\n" + f"Time: {get_current_time()}" + ) + except Exception as e: + logger.error(f"Error downloading file: {e}") + await update.message.reply_text( + "❌ Error downloading file. Please try again.\n" + f"Time: {get_current_time()}" + ) + + return ConversationHandler.END + +async def delete_file_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> States.WAITING_FOR_FILENAME: + """Ask for file name to delete.""" + user_id = update.effective_user.id + if not user_sessions.get(user_id, {}).get('authenticated', False): + await update.message.reply_text("❌ Please use /start to enter password first.") + return ConversationHandler.END + + msg = await update.message.reply_text("Loading file list...") + await animate_progress(msg, "Scanning storage...") + + files = os.listdir(STORAGE_DIR) + if not files: + await msg.edit_text( + f"📂 Storage is empty. No files to delete.\n" + f"Current time (UTC): {get_current_time()}" + ) + return ConversationHandler.END + + file_list = "Available files:\n" + for file in files: + size = os.path.getsize(os.path.join(STORAGE_DIR, file)) + file_list += f"📄 {file} ({size / 1024:.1f} KB)\n" + + await msg.edit_text( + f"🗑️ Enter the name of the file to delete:\n\n{file_list}\n" + f"Current time (UTC): {get_current_time()}" + ) + return States.WAITING_FOR_FILENAME + +async def handle_delete(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Handle file deletion.""" + user_id = update.effective_user.id + file_name = update.message.text + file_path = os.path.join(STORAGE_DIR, file_name) + + msg = await update.message.reply_text("Processing deletion request...") + await animate_progress(msg, "Deleting file...") + + try: + if os.path.exists(file_path): + os.remove(file_path) + await log_activity(user_id, f"Deleted file: {file_name}") + await msg.edit_text( + f"✅ File {file_name} deleted successfully.\n" + f"⏰ Time: {get_current_time()}" + ) + else: + await msg.edit_text( + f"❌ File {file_name} not found.\n" + f"Time: {get_current_time()}" + ) + except Exception as e: + logger.error(f"Error deleting file: {e}") + await msg.edit_text( + "❌ Error deleting file.\n" + f"Time: {get_current_time()}" + ) + + return ConversationHandler.END + +async def stats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Show storage statistics.""" + user_id = update.effective_user.id + if not user_sessions.get(user_id, {}).get('authenticated', False): + await update.message.reply_text("❌ Please use /start to enter password first.") + return + + msg = await update.message.reply_text("Loading statistics...") + await animate_progress(msg, "Calculating storage usage...") + + try: + files = os.listdir(STORAGE_DIR) + total_size = sum(os.path.getsize(os.path.join(STORAGE_DIR, f)) for f in files) + + stats_text = ( + f"📊 Storage Statistics (as of {get_current_time()}):\n" + f"📁 Total files: {len(files)}\n" + f"💾 Total size: {total_size / 1024 / 1024:.1f} MB\n" + f"👥 Active users: {len(user_sessions)}\n" + f"👤 Current user: {update.effective_user.username}\n" + f"🕒 Last activity: {user_sessions[user_id]['last_activity']}" + ) + + await log_activity(user_id, "Viewed statistics") + await msg.edit_text(stats_text) + except Exception as e: + logger.error(f"Error getting stats: {e}") + await msg.edit_text( + "❌ Error getting statistics.\n" + f"Time: {get_current_time()}" + ) + +async def cleanup(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Clean up old files (older than 30 days).""" + user_id = update.effective_user.id + if not user_sessions.get(user_id, {}).get('authenticated', False): + await update.message.reply_text("❌ Please use /start to enter password first.") + return + + msg = await update.message.reply_text("Starting cleanup...") + await animate_progress(msg, "Scanning for old files...") + + try: + now = datetime.now() + deleted = 0 + deleted_size = 0 + + for file in os.listdir(STORAGE_DIR): + file_path = os.path.join(STORAGE_DIR, file) + if os.path.isfile(file_path): + mtime = datetime.fromtimestamp(os.path.getmtime(file_path)) + if (now - mtime).days > 30: + size = os.path.getsize(file_path) + deleted_size += size + os.remove(file_path) + deleted += 1 + await msg.edit_text( + f"🗑️ Removing old file: {file}\n" + f"{format_progress_bar(deleted * 100 / len(os.listdir(STORAGE_DIR)))}" + ) + + await log_activity(user_id, f"Cleanup: removed {deleted} files ({deleted_size / 1024 / 1024:.1f} MB)") + await msg.edit_text( + f"✅ Cleanup complete!\n" + f"🗑️ Removed {deleted} old files\n" + f"💾 Freed up: {deleted_size / 1024 / 1024:.1f} MB\n" + f"⏰ Time: {get_current_time()}" + ) + except Exception as e: + logger.error(f"Error during cleanup: {e}") + await msg.edit_text( + "❌ Error during cleanup.\n" + f"Time: {get_current_time()}" + ) + +def main() -> None: + """Start the bot.""" + if not BOT_TOKEN: + print("Error: TELEGRAM_BOT_TOKEN environment variable is not set") + return + + # Create application + application = Application.builder().token(BOT_TOKEN).build() + + # Add conversation handler for password protection + conv_handler = ConversationHandler( + entry_points=[CommandHandler("start", start)], + states={ + States.WAITING_FOR_PASSWORD: [MessageHandler(filters.TEXT & ~filters.COMMAND, check_password)], + }, + fallbacks=[CommandHandler("start", start)], + ) + + # Add delete conversation handler + delete_conv_handler = ConversationHandler( + entry_points=[CommandHandler("delete", delete_file_command)], + states={ + States.WAITING_FOR_FILENAME: [MessageHandler(filters.TEXT & ~filters.COMMAND, handle_delete)], + }, + fallbacks=[CommandHandler("cancel", lambda u, c: ConversationHandler.END)], + ) + + # Add download conversation handler + download_conv_handler = ConversationHandler( + entry_points=[CommandHandler("download", download)], + states={ + States.WAITING_FOR_FILENAME: [MessageHandler(filters.TEXT & ~filters.COMMAND, handle_download_request)], + }, + fallbacks=[CommandHandler("cancel", lambda u, c: ConversationHandler.END)], + ) + + # Add handlers + application.add_handler(conv_handler) + application.add_handler(delete_conv_handler) + application.add_handler(download_conv_handler) + application.add_handler(CommandHandler("help", help_command)) + application.add_handler(CommandHandler("list", list_files)) + application.add_handler(CommandHandler("stats", stats)) + application.add_handler(CommandHandler("cleanup", cleanup)) + application.add_handler(CommandHandler("upload", upload)) + application.add_handler(MessageHandler(filters.Document.ALL, handle_file_upload)) + + # Start the bot + print(f"🤖 Bot started at {get_current_time()}") + print(f"👤 Admin username: {ADMIN_USERNAME}") + print(f"📁 Storage directory: {os.path.abspath(STORAGE_DIR)}") + application.run_polling() + +if __name__ == "__main__": + main()