157 lines
7.2 KiB
Python
157 lines
7.2 KiB
Python
# Moon-Userbot - telegram userbot
|
|
# Copyright (C) 2020-present Moon Userbot Organization
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
# TODO: Add ability to kill session by hash
|
|
|
|
import time
|
|
from datetime import datetime
|
|
from html import escape
|
|
|
|
from pyrogram import Client, filters
|
|
from pyrogram import ContinuePropagation
|
|
from pyrogram.errors import RPCError
|
|
from pyrogram.raw.functions.account import GetAuthorizations, ResetAuthorization
|
|
from pyrogram.raw.types import UpdateServiceNotification
|
|
from pyrogram.types import Message
|
|
|
|
from utils.db import db
|
|
from utils.misc import modules_help, prefix
|
|
|
|
auth_hashes = db.get("core.sessionkiller", "auths_hashes", [])
|
|
|
|
|
|
@Client.on_message(filters.command(["sessions"], prefix) & filters.me)
|
|
async def sessions_list(client: Client, message: Message):
|
|
formatted_sessions = []
|
|
sessions = (await client.invoke(GetAuthorizations())).authorizations
|
|
for num, session in enumerate(sessions, 1):
|
|
formatted_sessions.append(
|
|
f"<b>{num}</b>. <b>{escape(session.device_model)}</b> on <code>{escape(session.platform if session.platform!= '' else 'unknown platform')}</code>\n"
|
|
f"<b>Hash:</b> <code>{session.hash}</code>\n"
|
|
f"<b>App name:</b> <code>{escape(session.app_name)}</code> <code>v.{escape(session.app_version if session.app_version!= '' else 'unknown')}</code>\n"
|
|
f"<b>Created (last activity):</b> {datetime.fromtimestamp(session.date_created).isoformat()} ({datetime.fromtimestamp(session.date_active).isoformat()})\n"
|
|
f"<b>IP and location: </b>: <code>{session.ip}</code> (<i>{session.country}</i>)\n"
|
|
f"<b>Official status:</b> <code>{'✅' if session.official_app else '❌️'}</code>\n"
|
|
f"<b>2FA accepted:</b> <code>{'❌️️' if session.password_pending else '✅'}</code>\n"
|
|
f"<b>Can accept calls / secret chats:</b> {'❌️️' if session.call_requests_disabled else '✅'} / {'❌️️' if session.encrypted_requests_disabled else '✅'}"
|
|
)
|
|
answer = "<b>Active sessions at your account:</b>\n\n"
|
|
chunk = []
|
|
for s in formatted_sessions:
|
|
chunk.append(s)
|
|
if len(chunk) == 5:
|
|
answer += "\n\n".join(chunk)
|
|
await message.reply(answer)
|
|
answer = ""
|
|
chunk.clear()
|
|
if chunk:
|
|
await message.reply("\n\n".join(chunk))
|
|
await message.delete()
|
|
|
|
|
|
@Client.on_message(filters.command(["sessionkiller", "sk"], prefix) & filters.me)
|
|
async def sessionkiller(client: Client, message: Message):
|
|
if len(message.command) == 1:
|
|
if db.get("core.sessionkiller", "enabled", False):
|
|
await message.edit(
|
|
"<b>Sessionkiller status: enabled\n"
|
|
f"You can disable it with <code>{prefix}sessionkiller disable</code></b>"
|
|
)
|
|
else:
|
|
await message.edit(
|
|
"<b>Sessionkiller status: disabled\n"
|
|
f"You can enable it with <code>{prefix}sessionkiller enable</code></b>"
|
|
)
|
|
elif message.command[1] in ["enable", "on", "1", "yes", "true"]:
|
|
db.set("core.sessionkiller", "enabled", True)
|
|
await message.edit("<b>Sessionkiller enabled!</b>")
|
|
db.set(
|
|
"core.sessionkiller",
|
|
"auths_hashes",
|
|
[
|
|
auth.hash
|
|
for auth in (await client.invoke(GetAuthorizations())).authorizations
|
|
],
|
|
)
|
|
|
|
elif message.command[1] in ["disable", "off", "0", "no", "false"]:
|
|
db.set("core.sessionkiller", "enabled", False)
|
|
await message.edit("<b>Sessionkiller disabled!</b>")
|
|
else:
|
|
await message.edit(f"<b>Usage: {prefix}sessionkiller [enable|disable]</b>")
|
|
|
|
|
|
@Client.on_raw_update()
|
|
async def check_new_login(client: Client, update: UpdateServiceNotification, _, __):
|
|
if not isinstance(update, UpdateServiceNotification) or not update.type.startswith(
|
|
"auth"
|
|
):
|
|
raise ContinuePropagation
|
|
if not db.get("core.sessionkiller", "enabled", False):
|
|
raise ContinuePropagation
|
|
authorizations = (await client.invoke(GetAuthorizations()))["authorizations"]
|
|
for auth in authorizations:
|
|
if auth.current:
|
|
continue
|
|
if auth["hash"] not in auth_hashes:
|
|
# found new unexpected login
|
|
try:
|
|
await client.invoke(ResetAuthorization(hash=auth.hash))
|
|
except RPCError:
|
|
info_text = (
|
|
"Someone tried to log in to your account. You can see this report because you"
|
|
"turned on this feature. But I couldn't terminate attacker's session and "
|
|
"⚠ <b>you must reset it manually</b>. You should change your 2FA password "
|
|
"(if enabled), or set it.\n"
|
|
)
|
|
else:
|
|
info_text = (
|
|
"Someone tried to log in to your account. Since you have enabled "
|
|
"this feature, I deleted the attacker's session from your account. "
|
|
"You should change your 2FA password (if enabled), or set it.\n"
|
|
)
|
|
logined_time = datetime.utcfromtimestamp(auth.date_created).strftime(
|
|
"%d-%m-%Y %H-%M-%S UTC"
|
|
)
|
|
full_report = (
|
|
"<b>!!! ACTION REQUIRED !!!</b>\n"
|
|
+ info_text
|
|
+ "Below is the information about the attacker that I got.\n\n"
|
|
f"Unique authorization hash: <code>{auth.hash}</code> (not valid anymore)\n"
|
|
f"Device model: <code>{escape(auth.device_model)}</code>\n"
|
|
f"Platform: <code>{escape(auth.platform)}</code>\n"
|
|
f"API ID: <code>{auth.api_id}</code>\n"
|
|
f"App name: <code>{escape(auth.app_name)}</code>\n"
|
|
f"App version: <code>{auth.app_version}</code>\n"
|
|
f"Logined at: <code>{logined_time}</code>\n"
|
|
f"IP: <code>{auth.ip}</code>\n"
|
|
f"Country: <code>{auth.country}</code>\n"
|
|
f'Official app: <b>{"yes" if auth.official_app else "no"}</b>\n\n'
|
|
f"<b>It is you? Type <code>{prefix}sk off</code> and try logging "
|
|
f"in again.</b>"
|
|
)
|
|
# schedule sending report message so user will get notification
|
|
schedule_date = int(time.time() + 15)
|
|
await client.send_message("me", full_report, schedule_date=schedule_date)
|
|
return
|
|
|
|
|
|
modules_help["sessions"] = {
|
|
"sessionkiller [enable|disable]": "When enabled, every new session will be terminated.\n"
|
|
"Useful for additional protection for your account",
|
|
"sessions": "List all sessions on your account",
|
|
}
|