# Moon-Userbot - telegram userbot # Copyright (C) 2020-present Dragon Userbot Organization # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program. If not, see . # TODO: Add ability to kill session by hash import time from datetime import datetime from html import escape from pyrogram import Client, filters, enums from pyrogram import ContinuePropagation from pyrogram.errors import RPCError from pyrogram.raw.functions.account import GetAuthorizations, ResetAuthorization from pyrogram.raw.types import UpdateServiceNotification from pyrogram.types import Message from utils.db import db from utils.misc import modules_help, prefix from textwrap import dedent from datetime import datetime auth_hashes = db.get("core.sessionkiller", "auths_hashes", []) @Client.on_message(filters.command(["sessions"], prefix) & filters.me) async def sessions_list(client: Client, message: Message): formatted_sessions = [] sessions = (await client.invoke(GetAuthorizations())).authorizations for num, session in enumerate(sessions, 1): formatted_sessions.append( ( "{num}. {model} on {platform}\n" "Hash: {hash}\n" "App name: {app_name} v.{version}\n" "Created (last activity): {created} ({last_activity})\n" "IP and location: : {ip} ({location})\n" "Official status: {official}\n" "2FA accepted: {password_pending}\n" "Can accept calls / secret chats: {calls} / {secret_chats}" ).format( num=num, model=escape(session.device_model), platform=escape( session.platform if session.platform != "" else "unknown platform" ), hash=session.hash, app_name=escape(session.app_name), version=escape( session.app_version if session.app_version != "" else "unknown" ), created=datetime.fromtimestamp( session.date_created ).isoformat(), last_activity=datetime.fromtimestamp( session.date_active ).isoformat(), ip=session.ip, location=session.country, official="✅" if session.official_app else "❌️", password_pending="❌️️" if session.password_pending else "✅", calls="❌️️" if session.call_requests_disabled else "✅", secret_chats="❌️️" if session.encrypted_requests_disabled else "✅", ) ) answer = "Active sessions at your account:\n\n" chunk = [] for s in formatted_sessions: chunk.append(s) if len(chunk) == 5: answer += "\n\n".join(chunk) await message.reply(answer) answer = "" chunk.clear() if len(chunk): await message.reply("\n\n".join(chunk), parse_mode=enums.ParseMode.HTML) await message.delete() @Client.on_message( filters.command(["sessionkiller", "sk"], prefix) & filters.me ) async def sessionkiller(client: Client, message: Message): if len(message.command) == 1: if db.get("core.sessionkiller", "enabled", False): await message.edit( "Sessionkiller status: enabled\n" f"You can disable it with {prefix}sessionkiller disable", parse_mode=enums.ParseMode.HTML ) else: await message.edit( "Sessionkiller status: disabled\n" f"You can enable it with {prefix}sessionkiller enable", parse_mode=enums.ParseMode.HTML ) elif message.command[1] in ["enable", "on", "1", "yes", "true"]: db.set("core.sessionkiller", "enabled", True) await message.edit("Sessionkiller enabled!", parse_mode=enums.ParseMode.HTML) db.set( "core.sessionkiller", "auths_hashes", [ auth.hash for auth in ( await client.invoke(GetAuthorizations()) ).authorizations ], ) elif message.command[1] in ["disable", "off", "0", "no", "false"]: db.set("core.sessionkiller", "enabled", False) await message.edit("Sessionkiller disabled!", parse_mode=enums.ParseMode.HTML) else: await message.edit( f"Usage: {prefix}sessionkiller [enable|disable]", parse_mode=enums.ParseMode.HTML ) @Client.on_raw_update() async def check_new_login( client: Client, update: UpdateServiceNotification, _, __ ): if not isinstance( update, UpdateServiceNotification ) or not update.type.startswith("auth"): raise ContinuePropagation if not db.get("core.sessionkiller", "enabled", False): raise ContinuePropagation authorizations = (await client.invoke(GetAuthorizations()))[ "authorizations" ] for auth in authorizations: if auth.current: continue if auth["hash"] not in auth_hashes: # found new unexpected login try: await client.invoke(ResetAuthorization(hash=auth.hash)) except RPCError: info_text = ( "Someone tried to log in to your account. You can see this report because you" "turned on this feature. But I couldn't terminate attacker's session and " "⚠ you must reset it manually. You should change your 2FA password " "(if enabled), or set it.\n" ) else: info_text = ( "Someone tried to log in to your account. Since you have enabled " "this feature, I deleted the attacker's session from your account. " "You should change your 2FA password (if enabled), or set it.\n" ) logined_time = datetime.utcfromtimestamp( auth.date_created ).strftime("%d-%m-%Y %H-%M-%S UTC") full_report = ( "!!! ACTION REQUIRED !!!\n" + info_text + "Below is the information about the attacker that I got.\n\n" f"Unique authorization hash: {auth.hash} (not valid anymore)\n" f"Device model: {escape(auth.device_model)}\n" f"Platform: {escape(auth.platform)}\n" f"API ID: {auth.api_id}\n" f"App name: {escape(auth.app_name)}\n" f"App version: {auth.app_version}\n" f"Logined at: {logined_time}\n" f"IP: {auth.ip}\n" f"Country: {auth.country}\n" f'Official app: {"yes" if auth.official_app else "no"}\n\n' f"It is you? Type {prefix}sk off and try logging " f"in again." ) # schedule sending report message so user will get notification schedule_date = int(time.time() + 15) await client.send_message( "me", full_report, schedule_date=schedule_date, parse_mode=enums.ParseMode.HTML ) return modules_help["sessions"] = { "sessionkiller [enable|disable]": "When enabled, every new session will be terminated.\n" "Useful for additional protection for your account", "sessions": "List all sessions on your account", }