Initial commit

This commit is contained in:
sharath3589
2025-10-18 17:23:35 +05:30
commit 7d3bae1f69
13 changed files with 1121 additions and 0 deletions

32
.dockerignore Normal file
View File

@@ -0,0 +1,32 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
.venv/
venv/
ENV/
# Database
*.db
*.db-journal
# Environment variables
.env
# IDEs
.vscode/
.idea/
*.swp
*.swo
# OS
.DS_Store
Thumbs.db
# Logs
*.log
# Docker
data/

6
.env.example Normal file
View File

@@ -0,0 +1,6 @@
# Environment variables for the bot
# Copy this file to .env and fill in your actual values
TELEGRAM_BOT_TOKEN=your_bot_token_here
OWNER_ID=your_telegram_user_id_here
CHANNEL_ID=@your_channel_or_id_here

5
.gitignore vendored Normal file
View File

@@ -0,0 +1,5 @@
.venv
__pycache__/
*.pyc
memes.db
.env

292
DOCKER_DEPLOY.md Normal file
View File

@@ -0,0 +1,292 @@
# Memebot Docker Deployment Guide
This guide covers how to run your Telegram memebot using Docker.
## Prerequisites
- Docker installed on your system
- Docker Compose installed (usually comes with Docker Desktop)
- Your bot token, owner ID, and channel ID
### Installing Docker
**On macOS:**
```bash
brew install --cask docker
# Or download Docker Desktop from https://www.docker.com/products/docker-desktop
```
**On Linux (Ubuntu/Debian):**
```bash
# Install Docker
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
# Install Docker Compose
sudo apt install docker-compose-plugin -y
# Add your user to docker group (to run without sudo)
sudo usermod -aG docker $USER
# Log out and back in for this to take effect
```
## Quick Start
### 1. Set Up Environment Variables
Copy the example file and edit it:
```bash
cp .env.example .env
nano .env # or use any text editor
```
Fill in your actual values:
```
TELEGRAM_BOT_TOKEN=123456789:ABCdefGHIjklMNOpqrsTUVwxyz
OWNER_ID=987654321
CHANNEL_ID=@yourchannel
```
### 2. Build and Run with Docker Compose (Easiest)
```bash
# Build and start in background
docker-compose up -d
# View logs
docker-compose logs -f
# Stop the bot
docker-compose down
# Restart the bot
docker-compose restart
```
That's it! Your bot is now running in Docker! 🎉
## Alternative: Using Docker Commands Directly
### Build the Image
```bash
docker build -t memebot .
```
### Run the Container
```bash
# Create data directory for database persistence
mkdir -p ./data
# Run the bot (replace with your actual values)
docker run -d \
--name memebot \
--restart unless-stopped \
-e TELEGRAM_BOT_TOKEN="your_token_here" \
-e OWNER_ID="your_id_here" \
-e CHANNEL_ID="@your_channel" \
-v $(pwd)/data:/app/data \
memebot
```
### Manage the Container
```bash
# View logs
docker logs -f memebot
# Stop the bot
docker stop memebot
# Start the bot
docker start memebot
# Restart the bot
docker restart memebot
# Remove the container
docker rm -f memebot
# View container status
docker ps -a
```
## Deploying to a Remote Server
### Option 1: Copy Files and Build on Server
```bash
# From your Mac, copy files to server
scp -i /path/to/ssh_key -r \
/Users/hyperterminal/myspace/memebot \
username@server_ip:~/
# SSH into server
ssh -i /path/to/ssh_key username@server_ip
# Navigate to bot directory
cd ~/memebot
# Create .env file
cp .env.example .env
nano .env # Fill in your credentials
# Build and run
docker-compose up -d
# Check logs
docker-compose logs -f
```
### Option 2: Build Locally and Push to Registry
```bash
# Tag the image
docker tag memebot yourusername/memebot:latest
# Push to Docker Hub (requires docker login)
docker push yourusername/memebot:latest
# On the server, pull and run
docker pull yourusername/memebot:latest
docker run -d \
--name memebot \
--restart unless-stopped \
-e TELEGRAM_BOT_TOKEN="your_token" \
-e OWNER_ID="your_id" \
-e CHANNEL_ID="@channel" \
-v ~/memebot-data:/app/data \
yourusername/memebot:latest
```
## Updating the Bot
### If using Docker Compose:
```bash
# Make your code changes, then:
docker-compose down
docker-compose build
docker-compose up -d
```
### If using Docker commands:
```bash
# Stop and remove old container
docker stop memebot
docker rm memebot
# Rebuild image
docker build -t memebot .
# Run new container
docker run -d \
--name memebot \
--restart unless-stopped \
-e TELEGRAM_BOT_TOKEN="your_token" \
-e OWNER_ID="your_id" \
-e CHANNEL_ID="@channel" \
-v $(pwd)/data:/app/data \
memebot
```
## Data Persistence
The database file (`memes.db`) is stored in the `./data` directory on your host machine, which is mounted as a volume in the container. This means:
- ✅ Your scheduled memes persist even if you stop/restart the container
- ✅ You can backup the database by copying the `./data` folder
- ✅ You can inspect the database from your host machine
## Troubleshooting
### Check if container is running:
```bash
docker ps
```
### View logs:
```bash
# Docker Compose
docker-compose logs -f
# Docker command
docker logs -f memebot
```
### Access container shell:
```bash
# Docker Compose
docker-compose exec memebot /bin/bash
# Docker command
docker exec -it memebot /bin/bash
```
### Container keeps restarting:
```bash
# Check logs for errors
docker logs memebot
# Common issues:
# 1. Missing environment variables
# 2. Invalid bot token
# 3. Database permission issues
```
### Remove everything and start fresh:
```bash
docker-compose down -v # Removes containers and volumes
docker system prune -a # Clean up Docker system (optional)
```
## Benefits of Docker
-**Consistent environment**: Works the same everywhere
-**Easy deployment**: Just copy files and run
-**Isolation**: Doesn't interfere with system Python
-**Easy updates**: Just rebuild and restart
-**Portability**: Move between servers easily
-**Auto-restart**: Container restarts automatically if it crashes
## Docker on Friend's Server
When your friend gives you SSH access:
```bash
# 1. Connect to server
ssh -i /path/to/key username@server_ip
# 2. Install Docker (if not installed)
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
sudo apt install docker-compose-plugin -y
# 3. Create bot directory
mkdir -p ~/memebot
cd ~/memebot
# 4. Exit SSH and upload files from your Mac
exit
scp -i /path/to/key -r \
/Users/hyperterminal/myspace/memebot/* \
username@server_ip:~/memebot/
# 5. SSH back and run
ssh -i /path/to/key username@server_ip
cd ~/memebot
# Create .env file
nano .env # Add your credentials
# Run with Docker Compose
docker-compose up -d
# Check it's working
docker-compose logs -f
```
Done! Your bot runs 24/7 automatically! 🚀

30
Dockerfile Normal file
View File

@@ -0,0 +1,30 @@
# Use Python 3.12 slim image
FROM python:3.12-slim
# Set working directory
WORKDIR /app
# Install system dependencies (if needed)
RUN apt-get update && apt-get install -y \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements first (for better caching)
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy the bot code
COPY bot.py .
# Create directory for database
RUN mkdir -p /app/data
# Set environment variables (will be overridden by docker-compose or run command)
ENV TELEGRAM_BOT_TOKEN=""
ENV OWNER_ID=""
ENV CHANNEL_ID=""
ENV MEMEBOT_DB="/app/data/memes.db"
# Run the bot
CMD ["python", "bot.py"]

38
README.md Normal file
View File

@@ -0,0 +1,38 @@
# Memebot
Telegram bot that accepts memes (photos, GIF animations, videos) from the owner's private messages and schedules them into a channel at the next available slot among 11:00, 16:00, 21:00.
Setup
1. Create a virtualenv and install dependencies:
```bash
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
2. Set environment variables:
```
export TELEGRAM_BOT_TOKEN=123:ABC
export OWNER_ID=123456789
export CHANNEL_ID=@yourchannel or -1001234567890
```
3. Run the bot:
```bash
python bot.py
```
How it works
- Owner sends a photo/video/animation in the bot's DM.
- Bot stores the Telegram file_id and schedules it for the next available slot: 11:00, 16:00, 21:00. If there's an existing scheduled meme, new ones are scheduled after the last one using the same cycle.
- A background task posts due memes into the configured channel.
Notes
- Times are computed using server local time. Stored timestamps are Unix timestamps.
- Make sure the bot is admin in the channel to post messages.

552
bot.py Normal file
View File

@@ -0,0 +1,552 @@
posting_log = [] # in-memory log
import asyncio
import os
import logging
from datetime import datetime, time, timedelta
import aiosqlite
from typing import Optional
import io
from telegram import Update, Message, InputFile
from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, MessageHandler, filters
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
DB_PATH = os.environ.get("MEMEBOT_DB", "memes.db")
BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")
OWNER_ID = int(os.environ.get("OWNER_ID", "0"))
CHANNEL_ID = os.environ.get("CHANNEL_ID") # @channelusername or -100<id>
SLOTS = [time(11, 0), time(16, 0), time(21, 0)]
async def init_db():
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"""
CREATE TABLE IF NOT EXISTS memes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
owner_file_id TEXT NOT NULL,
mime_type TEXT,
scheduled_ts INTEGER NOT NULL,
posted INTEGER DEFAULT 0,
created_ts INTEGER NOT NULL
)
"""
)
# Ensure preview_file_id column exists (migration)
async with db.execute("PRAGMA table_info(memes)") as cur:
cols = await cur.fetchall()
col_names = [c[1] for c in cols]
if 'preview_file_id' not in col_names:
await db.execute("ALTER TABLE memes ADD COLUMN preview_file_id TEXT")
await db.commit()
async def compute_next_slot(after_dt: Optional[datetime] = None) -> datetime:
"""Return the next slot datetime from after_dt (exclusive). If after_dt is None, use now()."""
if after_dt is None:
after_dt = datetime.now()
# check same-day slots
today = after_dt.date()
for slot in SLOTS:
candidate = datetime.combine(today, slot)
if candidate > after_dt:
return candidate
# otherwise next day's first slot
next_day = today + timedelta(days=1)
return datetime.combine(next_day, SLOTS[0])
async def get_last_scheduled_ts(db) -> Optional[int]:
async with db.execute("SELECT scheduled_ts FROM memes WHERE posted=0 ORDER BY scheduled_ts DESC LIMIT 1") as cur:
row = await cur.fetchone()
return row[0] if row else None
async def schedule_meme(owner_file_id: str, mime_type: str) -> datetime:
async with aiosqlite.connect(DB_PATH) as db:
await init_db()
# Always schedule after the latest scheduled meme, even if it's far in the future
last_ts = await get_last_scheduled_ts(db)
if last_ts is None:
# no pending memes, schedule relative to now
ref_dt = datetime.now()
else:
ref_dt = datetime.fromtimestamp(last_ts)
next_dt = await compute_next_slot(ref_dt)
# Try to get a preview file_id (for photo: smallest size, for video: thumbnail, for animation: itself)
preview_file_id = None
# context is not available here, so preview is best-effort: use owner_file_id for now
preview_file_id = owner_file_id
await db.execute(
"INSERT INTO memes (owner_file_id, mime_type, scheduled_ts, created_ts, preview_file_id) VALUES (?, ?, ?, ?, ?)",
(owner_file_id, mime_type, int(next_dt.timestamp()), int(datetime.now().timestamp()), preview_file_id),
)
await db.commit()
return next_dt
async def pop_due_memes_and_post(context: ContextTypes.DEFAULT_TYPE):
now_ts = int(datetime.now().timestamp())
async with aiosqlite.connect(DB_PATH) as db:
await init_db()
async with db.execute("SELECT id, owner_file_id, mime_type FROM memes WHERE posted=0 AND scheduled_ts<=? ORDER BY scheduled_ts ASC", (now_ts,)) as cur:
rows = await cur.fetchall()
for row in rows:
mid, file_id, mime = row
try:
sent = False
# Try video first when appropriate
if mime and mime.startswith("video"):
try:
await context.bot.send_video(CHANNEL_ID, file_id)
sent = True
except Exception as e_video:
logger.warning("send_video failed for id=%s: %s", mid, e_video)
if not sent:
# try as photo/animation
try:
await context.bot.send_photo(CHANNEL_ID, file_id)
sent = True
except Exception as e_photo:
logger.warning("send_photo failed for id=%s: %s", mid, e_photo)
# fallback to sending as document
try:
await context.bot.send_document(CHANNEL_ID, file_id)
sent = True
except Exception as e_doc:
logger.warning("send_document failed for id=%s: %s", mid, e_doc)
# raise the last exception to be caught below
raise e_doc
if sent:
await db.execute("UPDATE memes SET posted=1 WHERE id=?", (mid,))
await db.commit()
logger.info("Posted meme id=%s", mid)
posting_log.append(f"[SUCCESS] Posted meme id={mid} at {datetime.now().isoformat(sep=' ')}")
if len(posting_log) > 100:
posting_log.pop(0)
except Exception as e:
logger.exception("Failed to post meme id=%s: %s", mid, e)
posting_log.append(f"[FAIL] Meme id={mid} at {datetime.now().isoformat(sep=' ')}: {type(e).__name__}: {e}")
if len(posting_log) > 100:
posting_log.pop(0)
async def scheduled(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
if user_id != OWNER_ID:
await update.message.reply_text("Only the owner can use this command.")
return
async with aiosqlite.connect(DB_PATH) as db:
await init_db()
# Check if preview_file_id column exists to avoid sqlite errors on older DBs
async with db.execute("PRAGMA table_info(memes)") as pcur:
cols = await pcur.fetchall()
col_names = [c[1] for c in cols]
if 'preview_file_id' in col_names:
query = "SELECT id, scheduled_ts, mime_type, preview_file_id FROM memes WHERE posted=0 ORDER BY scheduled_ts ASC"
has_preview = True
else:
query = "SELECT id, scheduled_ts, mime_type FROM memes WHERE posted=0 ORDER BY scheduled_ts ASC"
has_preview = False
async with db.execute(query) as cur:
rows = await cur.fetchall()
if not rows:
await update.message.reply_text("No scheduled memes.")
return
# For each scheduled item, try to send a preview robustly (direct send, then download+reupload)
for row in rows:
if has_preview:
mid, ts, mtype, preview_id = row
else:
mid, ts, mtype = row
preview_id = None
# Fallback: if preview_id is missing/null, use owner_file_id
async with aiosqlite.connect(DB_PATH) as db:
await init_db()
async with db.execute("SELECT owner_file_id FROM memes WHERE id=?", (mid,)) as cur:
owner_row = await cur.fetchone()
owner_file_id = owner_row[0] if owner_row else None
file_id = preview_id if preview_id else owner_file_id
caption = f"ID: {mid}, Time: {datetime.fromtimestamp(ts).isoformat(sep=' ')}, Type: {mtype}"
sent = False
# Try direct sends with fallbacks
try:
if mtype and mtype.startswith('video'):
try:
if file_id:
await context.bot.send_video(update.effective_chat.id, file_id, caption=caption)
sent = True
except Exception as e: # direct video failed
logger.debug("scheduled: direct send_video failed for id=%s: %s", mid, e)
if not sent and file_id:
try:
await context.bot.send_photo(update.effective_chat.id, file_id, caption=caption)
sent = True
except Exception as e:
logger.debug("scheduled: direct send_photo failed for id=%s: %s", mid, e)
try:
await context.bot.send_document(update.effective_chat.id, file_id, caption=caption)
sent = True
except Exception as e2:
logger.debug("scheduled: direct send_document failed for id=%s: %s", mid, e2)
if not sent and file_id:
# Attempt download + reupload
try:
file = await context.bot.get_file(file_id)
bio = io.BytesIO()
await file.download(out=bio)
bio.seek(0)
if mtype and mtype.startswith('video'):
await context.bot.send_video(update.effective_chat.id, InputFile(bio, filename=f"meme_{mid}.mp4"), caption=caption)
else:
try:
await context.bot.send_photo(update.effective_chat.id, InputFile(bio, filename=f"meme_{mid}.jpg"), caption=caption)
except Exception:
bio.seek(0)
await context.bot.send_document(update.effective_chat.id, InputFile(bio, filename=f"meme_{mid}"), caption=caption)
sent = True
except Exception as e:
logger.debug("scheduled: download+reupload failed for id=%s: %s", mid, e)
except Exception as e:
logger.exception("Unexpected error while previewing scheduled id=%s: %s", mid, e)
if not sent:
# If all attempts fail, send a text placeholder
await update.message.reply_text(caption)
async def unschedule(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
if user_id != OWNER_ID:
await update.message.reply_text("Only the owner can use this command.")
return
if not context.args or not all(arg.isdigit() for arg in context.args):
await update.message.reply_text("Usage: /unschedule <id1> <id2> ...")
return
meme_ids = [int(arg) for arg in context.args]
async with aiosqlite.connect(DB_PATH) as db:
await init_db()
for meme_id in meme_ids:
await db.execute("DELETE FROM memes WHERE id=? AND posted=0", (meme_id,))
await db.commit()
await update.message.reply_text(f"Unscheduled memes with IDs: {', '.join(str(mid) for mid in meme_ids)} (if they existed and were not posted yet).")
async def preview(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Preview a scheduled meme by id. Tries direct send, then downloads and reuploads as a document if needed."""
user_id = update.effective_user.id
if user_id != OWNER_ID:
await update.message.reply_text("Only the owner can use this command.")
return
if not context.args or not context.args[0].isdigit():
await update.message.reply_text("Usage: /preview <id>")
return
meme_id = int(context.args[0])
# immediate ack so owner knows the command was received
try:
await update.message.reply_text(f"Previewing meme {meme_id}...")
except Exception:
logger.debug("Could not send ack reply for preview %s", meme_id)
async with aiosqlite.connect(DB_PATH) as db:
await init_db()
async with db.execute("SELECT owner_file_id, mime_type FROM memes WHERE id=?", (meme_id,)) as cur:
row = await cur.fetchone()
if not row:
await update.message.reply_text(f"No meme found with ID {meme_id}.")
return
file_id, mime = row
chat_id = update.effective_chat.id
# Try direct sends with fallbacks
try:
if mime and mime.startswith("video"):
await context.bot.send_video(chat_id, file_id, caption=f"Preview ID {meme_id}")
return
try:
await context.bot.send_photo(chat_id, file_id, caption=f"Preview ID {meme_id}")
return
except Exception as e_photo:
logger.debug("Direct send_photo failed for preview id=%s: %s", meme_id, e_photo)
# try send_document quick fallback
try:
await context.bot.send_document(chat_id, file_id, caption=f"Preview ID {meme_id}")
return
except Exception as e_doc:
logger.debug("Direct send_document failed for preview id=%s: %s", meme_id, e_doc)
# If direct fails, download and reupload
file = await context.bot.get_file(file_id)
bio = io.BytesIO()
await file.download(out=bio)
bio.seek(0)
# pick send method based on mime
if mime and mime.startswith("video"):
await context.bot.send_video(chat_id, InputFile(bio, filename=f"meme_{meme_id}.mp4"), caption=f"Preview ID {meme_id}")
else:
# try as photo first, then document
try:
await context.bot.send_photo(chat_id, InputFile(bio, filename=f"meme_{meme_id}.jpg"), caption=f"Preview ID {meme_id}")
except Exception:
bio.seek(0)
await context.bot.send_document(chat_id, InputFile(bio, filename=f"meme_{meme_id}"), caption=f"Preview ID {meme_id}")
except Exception as e:
logger.exception("Preview failed for id=%s: %s", meme_id, e)
await update.message.reply_text(f"Failed to preview meme {meme_id}: {type(e).__name__}: {e}")
async def logcmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
if user_id != OWNER_ID:
await update.message.reply_text("Only the owner can use this command.")
return
if not posting_log:
await update.message.reply_text("No posting events yet.")
return
await update.message.reply_text("Last posting events:\n" + "\n".join(posting_log[-10:]))
async def periodic_poster(application):
while True:
try:
await pop_due_memes_and_post(application)
except Exception:
logger.exception("Error in poster loop")
await asyncio.sleep(30)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text("Hi! I schedule memes to the configured channel.")
async def helpcmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Send a detailed help message with all commands."""
help_text = (
"""
<b>🤖 <u>Memebot Command Reference</u> 🤖</b>
<b>General:</b>
<b>/start</b> — Show a welcome message.
<b>/help</b> — Show this help message.
<b>Scheduling Memes:</b>
<b>Send a photo/video/animation</b> (as a DM to the bot):
Schedules it for the next available slot (11:00, 16:00, 21:00 IST).
<i>Example:</i> Send a meme to the bot in DM.
<b>/scheduled</b> — List all scheduled memes with previews and their IDs, times, and types.
<b>/unschedule &lt;id1&gt; [&lt;id2&gt; ...]</b> — Remove one or more memes from the schedule (by ID).
<i>Example:</i> <code>/unschedule 3 5 7</code>
<b>/postnow [id]</b> — Immediately post the next scheduled meme, or a specific meme by ID.
<i>Example:</i> <code>/postnow</code> or <code>/postnow 6</code>
<b>/preview &lt;id&gt;</b> — Preview a scheduled meme by its ID.
<i>Example:</i> <code>/preview 4</code>
<b>/log</b> — Show the last 10 posting events (success/failure log).
<b>Advanced Scheduling:</b>
<b>/scheduleat id: &lt;id&gt; &lt;HH:MM&gt;</b> — Reschedule a single meme to a specific time (24h, IST).
<i>Example:</i> <code>/scheduleat id: 6 16:20</code>
<b>/scheduleat ids: &lt;start&gt;-&lt;end&gt; &lt;YYYY-MM-DD&gt;</b> — Reschedule a range of memes to a date, assigning slots (11:00, 16:00, 21:00 IST) in order.
<i>Example:</i> <code>/scheduleat ids: 5-10 2025-10-19</code>
<b>Notes:</b>
• <b>Only the owner</b> (set by OWNER_ID) can use admin commands.
• All times are in <b>IST (Asia/Kolkata)</b>.
• Meme IDs are shown in <b>/scheduled</b> previews.
• Use <b>/preview</b> to check a meme before posting.
<b>✨ Enjoy effortless meme scheduling! ✨</b>
"""
)
await update.message.reply_text(help_text, parse_mode="HTML", disable_web_page_preview=True)
async def handle_media(update: Update, context: ContextTypes.DEFAULT_TYPE):
msg: Message = update.message
user_id = msg.from_user.id
if user_id != OWNER_ID:
await msg.reply_text("Sorry, only the owner can send memes to schedule.")
return
# Determine the best file id and mime
file_id = None
mime = None
if msg.photo:
# highest resolution
file = msg.photo[-1]
file_id = file.file_id
mime = 'image'
elif msg.video:
file_id = msg.video.file_id
mime = 'video'
elif msg.animation:
file_id = msg.animation.file_id
mime = 'image' # gifs treated as image
else:
await msg.reply_text("Please send a photo, animation (GIF) or video.")
return
scheduled_dt = await schedule_meme(file_id, mime)
# Convert to IST (Asia/Kolkata)
try:
import pytz
ist = pytz.timezone('Asia/Kolkata')
scheduled_dt_ist = scheduled_dt.astimezone(ist)
except Exception:
# fallback: add 5:30 manually if pytz not available
scheduled_dt_ist = scheduled_dt + timedelta(hours=5, minutes=30)
await msg.reply_text(f"Scheduled for: {scheduled_dt_ist.strftime('%Y-%m-%d %H:%M:%S')} IST")
async def postnow(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
if user_id != OWNER_ID:
await update.message.reply_text("Only the owner can use this command.")
return
# If an ID is provided, post that meme; else, post the next scheduled meme
meme_id = None
if context.args and context.args[0].isdigit():
meme_id = int(context.args[0])
async with aiosqlite.connect(DB_PATH) as db:
await init_db()
if meme_id is not None:
async with db.execute("SELECT id, owner_file_id, mime_type FROM memes WHERE posted=0 AND id=?", (meme_id,)) as cur:
row = await cur.fetchone()
if not row:
await update.message.reply_text(f"No scheduled meme with ID {meme_id} to post.")
return
else:
async with db.execute("SELECT id, owner_file_id, mime_type FROM memes WHERE posted=0 ORDER BY scheduled_ts ASC LIMIT 1") as cur:
row = await cur.fetchone()
if not row:
await update.message.reply_text("No scheduled memes to post.")
return
mid, file_id, mime = row
try:
if mime and mime.startswith("video"):
await context.bot.send_video(CHANNEL_ID, file_id)
else:
await context.bot.send_photo(CHANNEL_ID, file_id)
await db.execute("UPDATE memes SET posted=1 WHERE id=?", (mid,))
await db.commit()
await update.message.reply_text(f"Posted meme with ID {mid} to channel.")
except Exception as e:
await update.message.reply_text(f"Failed to post meme: {e}")
import re
async def scheduleat(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
if user_id != OWNER_ID:
await update.message.reply_text("Only the owner can use this command.")
return
if not context.args or len(context.args) < 2:
await update.message.reply_text("Usage: /scheduleat id: <id> <HH:MM> or /scheduleat ids: <start>-<end> <YYYY-MM-DD>")
return
argstr = ' '.join(context.args)
# Single ID mode: /scheduleat id: 6 16:20
m_single = re.match(r'id:\s*(\d+)\s+(\d{2}):(\d{2})$', argstr)
# Range mode: /scheduleat ids: 5-10 2025-10-19
m_range = re.match(r'ids:\s*(\d+)-(\d+)\s+(\d{4}-\d{2}-\d{2})$', argstr)
if m_single:
meme_id = int(m_single.group(1))
hour = int(m_single.group(2))
minute = int(m_single.group(3))
# Validate time
if not (0 <= hour < 24 and 0 <= minute < 60):
await update.message.reply_text("Invalid time format. Use 24h HH:MM.")
return
# Schedule meme at specified time today (IST)
from datetime import datetime, timedelta
try:
import pytz
ist = pytz.timezone('Asia/Kolkata')
now_ist = datetime.now(ist)
sched_dt = ist.localize(datetime(now_ist.year, now_ist.month, now_ist.day, hour, minute))
except Exception:
now = datetime.now()
sched_dt = now.replace(hour=hour, minute=minute, second=0, microsecond=0) + timedelta(hours=5, minutes=30)
sched_ts = int(sched_dt.timestamp())
async with aiosqlite.connect(DB_PATH) as db:
await init_db()
await db.execute("UPDATE memes SET scheduled_ts=? WHERE id=? AND posted=0", (sched_ts, meme_id))
await db.commit()
await update.message.reply_text(f"Rescheduled meme ID {meme_id} for {sched_dt.strftime('%Y-%m-%d %H:%M')} IST.")
return
elif m_range:
start_id = int(m_range.group(1))
end_id = int(m_range.group(2))
date_str = m_range.group(3)
from datetime import datetime, timedelta, time as dtime
try:
import pytz
ist = pytz.timezone('Asia/Kolkata')
base_date = ist.localize(datetime.strptime(date_str, '%Y-%m-%d'))
except Exception:
base_date = datetime.strptime(date_str, '%Y-%m-%d') + timedelta(hours=5, minutes=30)
# Assign slots in order: 11:00, 16:00, 21:00, repeat
slot_times = [dtime(11,0), dtime(16,0), dtime(21,0)]
ids = list(range(start_id, end_id+1))
updates = []
for idx, meme_id in enumerate(ids):
slot = slot_times[idx % len(slot_times)]
sched_dt = base_date.replace(hour=slot.hour, minute=slot.minute, second=0, microsecond=0)
sched_ts = int(sched_dt.timestamp())
updates.append((sched_ts, meme_id))
async with aiosqlite.connect(DB_PATH) as db:
await init_db()
for sched_ts, meme_id in updates:
await db.execute("UPDATE memes SET scheduled_ts=? WHERE id=? AND posted=0", (sched_ts, meme_id))
await db.commit()
await update.message.reply_text(f"Rescheduled memes IDs {start_id}-{end_id} for {date_str} in slots 11:00, 16:00, 21:00 IST (cycled).")
return
else:
await update.message.reply_text("Invalid format. Use /scheduleat id: <id> <HH:MM> or /scheduleat ids: <start>-<end> <YYYY-MM-DD>")
def main():
if not BOT_TOKEN:
raise SystemExit("Please set TELEGRAM_BOT_TOKEN environment variable")
if not OWNER_ID or OWNER_ID == 0:
raise SystemExit("Please set OWNER_ID environment variable to your Telegram user id")
if not CHANNEL_ID:
raise SystemExit("Please set CHANNEL_ID to target channel (username or id)")
# Initialize DB first
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(init_db())
app = ApplicationBuilder().token(BOT_TOKEN).build()
app.add_handler(CommandHandler('start', start))
app.add_handler(CommandHandler('help', helpcmd))
app.add_handler(CommandHandler('postnow', postnow))
app.add_handler(CommandHandler('scheduled', scheduled))
app.add_handler(CommandHandler('unschedule', unschedule))
app.add_handler(CommandHandler('preview', preview))
app.add_handler(CommandHandler('log', logcmd))
app.add_handler(CommandHandler('scheduleat', scheduleat))
media_filter = filters.ChatType.PRIVATE & (filters.PHOTO | filters.VIDEO | filters.ANIMATION)
app.add_handler(MessageHandler(media_filter, handle_media))
# run background poster using post_init hook
async def post_init(application):
asyncio.create_task(periodic_poster(application))
app.post_init = post_init
logger.info("Starting bot...")
app.run_polling()
if __name__ == '__main__':
main()

104
deploy.sh Executable file
View File

@@ -0,0 +1,104 @@
#!/bin/bash
# Memebot Docker Deployment Script
# This script helps you deploy the bot to a remote server via SSH
set -e # Exit on error
echo "======================================"
echo " Memebot Docker Deployment Script"
echo "======================================"
echo ""
# Check if SSH key path is provided
if [ -z "$1" ]; then
echo "Usage: ./deploy.sh <ssh_key_path> <username@server_ip>"
echo "Example: ./deploy.sh ~/Downloads/server_key.pem ubuntu@123.45.67.89"
exit 1
fi
SSH_KEY="$1"
SERVER="$2"
if [ -z "$SERVER" ]; then
echo "Error: Please provide username@server_ip"
echo "Example: ./deploy.sh ~/Downloads/server_key.pem ubuntu@123.45.67.89"
exit 1
fi
echo "Step 1: Checking if .env file exists..."
if [ ! -f ".env" ]; then
echo "Error: .env file not found!"
echo "Please create .env file with your credentials:"
echo " cp .env.example .env"
echo " nano .env"
exit 1
fi
echo "✓ .env file found"
echo ""
echo "Step 2: Uploading files to server..."
ssh -i "$SSH_KEY" "$SERVER" "mkdir -p ~/memebot"
scp -i "$SSH_KEY" \
Dockerfile \
docker-compose.yml \
bot.py \
requirements.txt \
.env \
"$SERVER":~/memebot/
echo "✓ Files uploaded"
echo ""
echo "Step 3: Installing Docker on server (if needed)..."
ssh -i "$SSH_KEY" "$SERVER" << 'ENDSSH'
# Check if Docker is installed
if ! command -v docker &> /dev/null; then
echo "Docker not found. Installing..."
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
sudo apt install docker-compose-plugin -y
sudo usermod -aG docker $USER
echo "✓ Docker installed"
else
echo "✓ Docker already installed"
fi
ENDSSH
echo ""
echo "Step 4: Building and starting the bot..."
ssh -i "$SSH_KEY" "$SERVER" << 'ENDSSH'
cd ~/memebot
# Stop existing container if running
if docker ps -a | grep -q memebot; then
echo "Stopping existing container..."
docker-compose down
fi
# Build and start
echo "Building Docker image..."
docker-compose build
echo "Starting bot..."
docker-compose up -d
echo ""
echo "======================================"
echo " Deployment Complete! 🎉"
echo "======================================"
echo ""
echo "Your bot is now running on the server!"
echo ""
echo "Useful commands:"
echo " View logs: ssh -i $SSH_KEY $SERVER 'cd ~/memebot && docker-compose logs -f'"
echo " Stop bot: ssh -i $SSH_KEY $SERVER 'cd ~/memebot && docker-compose down'"
echo " Restart bot: ssh -i $SSH_KEY $SERVER 'cd ~/memebot && docker-compose restart'"
echo ""
ENDSSH
echo ""
echo "Showing bot logs (press Ctrl+C to exit)..."
ssh -i "$SSH_KEY" "$SERVER" "cd ~/memebot && docker-compose logs -f"

18
docker-compose.yml Normal file
View File

@@ -0,0 +1,18 @@
services:
memebot:
build: .
container_name: memebot
restart: unless-stopped
environment:
- TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN}
- OWNER_ID=${OWNER_ID}
- CHANNEL_ID=${CHANNEL_ID}
volumes:
# Persist database between container restarts
- ./data:/app/data
# Optional: Add logging configuration
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"

1
requirements-dev.txt Normal file
View File

@@ -0,0 +1 @@
pytest

2
requirements.txt Normal file
View File

@@ -0,0 +1,2 @@
python-telegram-bot==21.0.1
aiosqlite==0.18.0

16
start_bot.sh Executable file
View File

@@ -0,0 +1,16 @@
#!/bin/bash
# Memebot Startup Script
# Double-click this file to start your bot
cd /Users/hyperterminal/myspace/memebot
source .venv/bin/activate
# Set your credentials here (replace with your actual values)
export TELEGRAM_BOT_TOKEN="8478225179:AAH_38KkQHPEnIcacUzCs8M-VDkBGfxR3UA"
export OWNER_ID="324460662"
export CHANNEL_ID="@meme_galore"
# Start the bot
echo "Starting memebot..."
python3 bot.py

25
tests/test_schedule.py Normal file
View File

@@ -0,0 +1,25 @@
import asyncio
from datetime import datetime, time
import pytest
from bot import compute_next_slot, SLOTS
def test_compute_next_slot_before_first():
dt = datetime(2025, 10, 18, 9, 0)
next_dt = asyncio.get_event_loop().run_until_complete(compute_next_slot(dt))
assert next_dt.time() == SLOTS[0]
def test_compute_next_slot_between_slots():
dt = datetime(2025, 10, 18, 12, 0)
next_dt = asyncio.get_event_loop().run_until_complete(compute_next_slot(dt))
assert next_dt.time() == SLOTS[1]
def test_compute_next_slot_after_last():
dt = datetime(2025, 10, 18, 22, 0)
next_dt = asyncio.get_event_loop().run_until_complete(compute_next_slot(dt))
assert next_dt.time() == SLOTS[0]
assert next_dt.date().day == 19