Files
cloud_storage/cloud_bot.py
Wiktor 55b2a3d2ad Create cloud_bot.py
Create cloud_bot.py in the main branch
2025-03-21 00:09:32 +01:00

502 lines
18 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import os
import shutil
import datetime
import time
import asyncio
from enum import Enum, auto
from typing import Dict, Union
from datetime import datetime, timezone
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import (
Application,
CommandHandler,
MessageHandler,
filters,
ContextTypes,
ConversationHandler,
)
# Load environment variables
load_dotenv()
# Get bot token from environment variable
BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
# Enable logging
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Constants
STORAGE_DIR = 'local_storage'
PASSWORD = 'cloudstorage1'
ADMIN_USERNAME = 'overspend1'
ANIMATION_FRAMES = ["", "", "", "", "", "", "", "", "", ""]
UPLOAD_PROGRESS = ["", ""]
# Create storage directory if it doesn't exist
os.makedirs(STORAGE_DIR, exist_ok=True)
# State definitions for conversation
class States(Enum):
WAITING_FOR_PASSWORD = auto()
WAITING_FOR_FILENAME = auto()
WAITING_FOR_NEWNAME = auto()
# Store user sessions and activity
user_sessions: Dict[int, Dict] = {}
def get_current_time() -> str:
"""Get current UTC time in YYYY-MM-DD HH:MM:SS format."""
return datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
def format_progress_bar(progress: float, width: int = 20) -> str:
"""Create a progress bar string."""
filled = int(width * progress / 100)
bar = UPLOAD_PROGRESS[1] * filled + UPLOAD_PROGRESS[0] * (width - filled)
return f"[{bar}] {progress:.1f}%"
async def animate_progress(message, text: str, total_time: int = 3):
"""Animate a progress message."""
start_time = time.time()
while time.time() - start_time < total_time:
for frame in ANIMATION_FRAMES:
progress = min(100, ((time.time() - start_time) / total_time) * 100)
progress_bar = format_progress_bar(progress)
try:
await message.edit_text(f"{text}\n{frame} {progress_bar}")
await asyncio.sleep(0.1)
except:
pass
async def log_activity(user_id: int, action: str) -> None:
"""Log user activity with timestamp."""
timestamp = get_current_time()
if user_id in user_sessions:
user_sessions[user_id]['last_activity'] = timestamp
user_sessions[user_id]['activities'].append(f"{timestamp} - {action}")async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> States.WAITING_FOR_PASSWORD:
"""Start the conversation and ask for password."""
user_id = update.effective_user.id
username = update.effective_user.username
msg = await update.message.reply_text(
f"🌟 Welcome to Cloud Storage Bot!\n"
f"Current time (UTC): {get_current_time()}\n"
f"Your username: {username}"
)
await animate_progress(msg, "Initializing bot...")
await msg.edit_text(
f"👋 Welcome to Cloud Storage Bot!\n"
f"Current time (UTC): {get_current_time()}\n"
f"Your username: {username}\n"
"Please enter the password to continue."
)
user_sessions[user_id] = {
'username': username,
'authenticated': False,
'last_activity': get_current_time(),
'activities': []
}
return States.WAITING_FOR_PASSWORD
async def check_password(update: Update, context: ContextTypes.DEFAULT_TYPE) -> Union[int, States]:
"""Check the provided password."""
user_id = update.effective_user.id
if update.message.text == PASSWORD:
user_sessions[user_id]['authenticated'] = True
await log_activity(user_id, "Authentication successful")
msg = await update.message.reply_text("Verifying password...")
await animate_progress(msg, "Authenticating...")
await msg.edit_text(
f"✅ Password accepted! Use /help to see available commands.\n"
f"Current time (UTC): {get_current_time()}"
)
return ConversationHandler.END
else:
await log_activity(user_id, "Authentication failed")
await update.message.reply_text("❌ Incorrect password. Please try again.")
return States.WAITING_FOR_PASSWORD
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Show help message."""
user_id = update.effective_user.id
if not user_sessions.get(user_id, {}).get('authenticated', False):
await update.message.reply_text("❌ Please use /start to enter password first.")
return
msg = await update.message.reply_text("Loading help...")
await animate_progress(msg, "Loading commands...")
help_text = f"""
📚 Available commands:
/upload - Upload a file 📤
/download - Download a file 📥
/list - List files 📂
/delete - Delete a file 🗑️
/rename - Rename a file ✏️
/move - Move a file 🚚
/copy - Copy a file 📄
/metadata - Show file info 📝
/cleanup - Remove old files 🧹
/stats - Show statistics 📊
/help - Show this message
Current time (UTC): {get_current_time()}
Your username: {update.effective_user.username}
"""
await log_activity(user_id, "Viewed help")
await msg.edit_text(help_text)
async def upload(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /upload command."""
user_id = update.effective_user.id
if not user_sessions.get(user_id, {}).get('authenticated', False):
await update.message.reply_text("❌ Please use /start to enter password first.")
return
await log_activity(user_id, "Initiated file upload")
msg = await update.message.reply_text("Preparing upload...")
await animate_progress(msg, "Initializing upload system...")
await msg.edit_text(
f"📤 Please send me any file to upload to cloud storage.\n"
f"Current time (UTC): {get_current_time()}\n"
f"User: {update.effective_user.username}"
)
async def handle_file_upload(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle uploaded files."""
user_id = update.effective_user.id
if not user_sessions.get(user_id, {}).get('authenticated', False):
await update.message.reply_text("❌ Please use /start to enter password first.")
return
try:
file = update.message.document
if not file:
await update.message.reply_text("❌ Please send a file to upload.")
return
file_name = file.file_name
file_path = os.path.join(STORAGE_DIR, file_name)
# Check if file already exists
if os.path.exists(file_path):
await update.message.reply_text(
f"⚠️ File {file_name} already exists.\n"
f"Please rename the file or delete the existing one.\n"
f"Current time (UTC): {get_current_time()}"
)
return
msg = await update.message.reply_text("Starting upload...")
# Simulate upload progress
progress_steps = [0, 20, 40, 60, 80, 90, 95, 100]
for progress in progress_steps:
progress_bar = format_progress_bar(progress)
await msg.edit_text(
f"📤 Uploading {file_name}...\n"
f"{progress_bar}\n"
f"⏳ Please wait..."
)
await asyncio.sleep(0.5)
new_file = await context.bot.get_file(file.file_id)
await new_file.download_to_drive(custom_path=file_path)
file_size = os.path.getsize(file_path)
await log_activity(user_id, f"Uploaded file: {file_name} ({file_size / 1024:.1f} KB)")
# Final success message
await msg.edit_text(
f"✅ File uploaded successfully!\n"
f"📄 Name: {file_name}\n"
f"📦 Size: {file_size / 1024:.1f} KB\n"
f"⏰ Time: {get_current_time()}\n"
f"👤 Uploaded by: {update.effective_user.username}\n"
f"{format_progress_bar(100)}"
)
except Exception as e:
logger.error(f"Error uploading file: {e}")
await update.message.reply_text(
"❌ Error uploading file. Please try again.\n"
f"Time: {get_current_time()}"
)async def download(update: Update, context: ContextTypes.DEFAULT_TYPE) -> States.WAITING_FOR_FILENAME:
"""Ask for file name to download."""
user_id = update.effective_user.id
if not user_sessions.get(user_id, {}).get('authenticated', False):
await update.message.reply_text("❌ Please use /start to enter password first.")
return ConversationHandler.END
msg = await update.message.reply_text("Loading file list...")
await animate_progress(msg, "Scanning storage...")
# List available files
files = os.listdir(STORAGE_DIR)
if not files:
await msg.edit_text(
f"📂 Storage is empty. No files to download.\n"
f"Current time (UTC): {get_current_time()}"
)
return ConversationHandler.END
file_list = "Available files:\n"
for file in files:
size = os.path.getsize(os.path.join(STORAGE_DIR, file))
file_list += f"📄 {file} ({size / 1024:.1f} KB)\n"
await msg.edit_text(
f"📥 Enter the name of the file you want to download:\n\n{file_list}\n"
f"Current time (UTC): {get_current_time()}"
)
return States.WAITING_FOR_FILENAME
async def handle_download_request(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Handle file download."""
user_id = update.effective_user.id
file_name = update.message.text
file_path = os.path.join(STORAGE_DIR, file_name)
try:
if os.path.exists(file_path):
msg = await update.message.reply_text("Preparing download...")
await animate_progress(msg, "Preparing file...")
file_size = os.path.getsize(file_path)
await msg.edit_text(
f"📤 Sending file: {file_name}\n"
f"📦 Size: {file_size / 1024:.1f} KB\n"
f"⏰ Time: {get_current_time()}\n"
f"{format_progress_bar(50)}"
)
# Send the actual file
with open(file_path, 'rb') as file:
await update.message.reply_document(
document=file,
filename=file_name,
caption=f"✅ File downloaded at {get_current_time()}"
)
await msg.edit_text(
f"✅ Download complete!\n"
f"📄 {file_name}\n"
f"{get_current_time()}\n"
f"{format_progress_bar(100)}"
)
await log_activity(user_id, f"Downloaded file: {file_name}")
else:
await update.message.reply_text(
f"❌ File {file_name} not found.\n"
f"Time: {get_current_time()}"
)
except Exception as e:
logger.error(f"Error downloading file: {e}")
await update.message.reply_text(
"❌ Error downloading file. Please try again.\n"
f"Time: {get_current_time()}"
)
return ConversationHandler.END
async def delete_file_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> States.WAITING_FOR_FILENAME:
"""Ask for file name to delete."""
user_id = update.effective_user.id
if not user_sessions.get(user_id, {}).get('authenticated', False):
await update.message.reply_text("❌ Please use /start to enter password first.")
return ConversationHandler.END
msg = await update.message.reply_text("Loading file list...")
await animate_progress(msg, "Scanning storage...")
files = os.listdir(STORAGE_DIR)
if not files:
await msg.edit_text(
f"📂 Storage is empty. No files to delete.\n"
f"Current time (UTC): {get_current_time()}"
)
return ConversationHandler.END
file_list = "Available files:\n"
for file in files:
size = os.path.getsize(os.path.join(STORAGE_DIR, file))
file_list += f"📄 {file} ({size / 1024:.1f} KB)\n"
await msg.edit_text(
f"🗑️ Enter the name of the file to delete:\n\n{file_list}\n"
f"Current time (UTC): {get_current_time()}"
)
return States.WAITING_FOR_FILENAME
async def handle_delete(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Handle file deletion."""
user_id = update.effective_user.id
file_name = update.message.text
file_path = os.path.join(STORAGE_DIR, file_name)
msg = await update.message.reply_text("Processing deletion request...")
await animate_progress(msg, "Deleting file...")
try:
if os.path.exists(file_path):
os.remove(file_path)
await log_activity(user_id, f"Deleted file: {file_name}")
await msg.edit_text(
f"✅ File {file_name} deleted successfully.\n"
f"⏰ Time: {get_current_time()}"
)
else:
await msg.edit_text(
f"❌ File {file_name} not found.\n"
f"Time: {get_current_time()}"
)
except Exception as e:
logger.error(f"Error deleting file: {e}")
await msg.edit_text(
"❌ Error deleting file.\n"
f"Time: {get_current_time()}"
)
return ConversationHandler.END
async def stats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Show storage statistics."""
user_id = update.effective_user.id
if not user_sessions.get(user_id, {}).get('authenticated', False):
await update.message.reply_text("❌ Please use /start to enter password first.")
return
msg = await update.message.reply_text("Loading statistics...")
await animate_progress(msg, "Calculating storage usage...")
try:
files = os.listdir(STORAGE_DIR)
total_size = sum(os.path.getsize(os.path.join(STORAGE_DIR, f)) for f in files)
stats_text = (
f"📊 Storage Statistics (as of {get_current_time()}):\n"
f"📁 Total files: {len(files)}\n"
f"💾 Total size: {total_size / 1024 / 1024:.1f} MB\n"
f"👥 Active users: {len(user_sessions)}\n"
f"👤 Current user: {update.effective_user.username}\n"
f"🕒 Last activity: {user_sessions[user_id]['last_activity']}"
)
await log_activity(user_id, "Viewed statistics")
await msg.edit_text(stats_text)
except Exception as e:
logger.error(f"Error getting stats: {e}")
await msg.edit_text(
"❌ Error getting statistics.\n"
f"Time: {get_current_time()}"
)
async def cleanup(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Clean up old files (older than 30 days)."""
user_id = update.effective_user.id
if not user_sessions.get(user_id, {}).get('authenticated', False):
await update.message.reply_text("❌ Please use /start to enter password first.")
return
msg = await update.message.reply_text("Starting cleanup...")
await animate_progress(msg, "Scanning for old files...")
try:
now = datetime.now()
deleted = 0
deleted_size = 0
for file in os.listdir(STORAGE_DIR):
file_path = os.path.join(STORAGE_DIR, file)
if os.path.isfile(file_path):
mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
if (now - mtime).days > 30:
size = os.path.getsize(file_path)
deleted_size += size
os.remove(file_path)
deleted += 1
await msg.edit_text(
f"🗑️ Removing old file: {file}\n"
f"{format_progress_bar(deleted * 100 / len(os.listdir(STORAGE_DIR)))}"
)
await log_activity(user_id, f"Cleanup: removed {deleted} files ({deleted_size / 1024 / 1024:.1f} MB)")
await msg.edit_text(
f"✅ Cleanup complete!\n"
f"🗑️ Removed {deleted} old files\n"
f"💾 Freed up: {deleted_size / 1024 / 1024:.1f} MB\n"
f"⏰ Time: {get_current_time()}"
)
except Exception as e:
logger.error(f"Error during cleanup: {e}")
await msg.edit_text(
"❌ Error during cleanup.\n"
f"Time: {get_current_time()}"
)
def main() -> None:
"""Start the bot."""
if not BOT_TOKEN:
print("Error: TELEGRAM_BOT_TOKEN environment variable is not set")
return
# Create application
application = Application.builder().token(BOT_TOKEN).build()
# Add conversation handler for password protection
conv_handler = ConversationHandler(
entry_points=[CommandHandler("start", start)],
states={
States.WAITING_FOR_PASSWORD: [MessageHandler(filters.TEXT & ~filters.COMMAND, check_password)],
},
fallbacks=[CommandHandler("start", start)],
)
# Add delete conversation handler
delete_conv_handler = ConversationHandler(
entry_points=[CommandHandler("delete", delete_file_command)],
states={
States.WAITING_FOR_FILENAME: [MessageHandler(filters.TEXT & ~filters.COMMAND, handle_delete)],
},
fallbacks=[CommandHandler("cancel", lambda u, c: ConversationHandler.END)],
)
# Add download conversation handler
download_conv_handler = ConversationHandler(
entry_points=[CommandHandler("download", download)],
states={
States.WAITING_FOR_FILENAME: [MessageHandler(filters.TEXT & ~filters.COMMAND, handle_download_request)],
},
fallbacks=[CommandHandler("cancel", lambda u, c: ConversationHandler.END)],
)
# Add handlers
application.add_handler(conv_handler)
application.add_handler(delete_conv_handler)
application.add_handler(download_conv_handler)
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("list", list_files))
application.add_handler(CommandHandler("stats", stats))
application.add_handler(CommandHandler("cleanup", cleanup))
application.add_handler(CommandHandler("upload", upload))
application.add_handler(MessageHandler(filters.Document.ALL, handle_file_upload))
# Start the bot
print(f"🤖 Bot started at {get_current_time()}")
print(f"👤 Admin username: {ADMIN_USERNAME}")
print(f"📁 Storage directory: {os.path.abspath(STORAGE_DIR)}")
application.run_polling()
if __name__ == "__main__":
main()