Files
overub/core/client.py
2025-12-21 18:35:20 +01:00

254 lines
9.9 KiB
Python

from __future__ import annotations
import asyncio
import subprocess
import sys
import threading
import webbrowser
from pathlib import Path
from typing import Any, Optional
from core.logger import get_logger
logger = get_logger("core.client")
class ClientWrapper:
def __init__(self, api_id: int, api_hash: str, session_name: str) -> None:
self.api_id = api_id
self.api_hash = api_hash
self.session_name = session_name
self.client: Any = None
async def connect(
self,
login_mode: str = "phone",
phone: Optional[str] = None,
qr_path: Optional[str] = None,
qr_open: bool = False,
session_string: Optional[str] = None,
) -> None:
try:
from telethon import TelegramClient
from telethon.errors import SessionPasswordNeededError
from telethon.sessions import StringSession
from telethon.tl import functions, types
except ImportError as exc:
raise RuntimeError("Telethon not installed") from exc
if session_string:
self.client = TelegramClient(StringSession(session_string), self.api_id, self.api_hash)
else:
self.client = TelegramClient(self.session_name, self.api_id, self.api_hash)
if login_mode == "qr":
await self.client.connect()
if await self.client.is_user_authorized():
logger.info("Telethon client already authorized")
return
try:
qr = await self.client.qr_login()
except AttributeError:
logger.warning("QR login not supported by this Telethon version, falling back to phone login")
await self.client.start(phone=phone)
return
self._print_qr(qr.url, qr_path=qr_path, qr_open=qr_open)
try:
await qr.wait()
except SessionPasswordNeededError:
password = await self._prompt("Two-step verification enabled. Enter password: ")
await self.client.sign_in(password=password)
elif login_mode in {"sms", "code", "phone"}:
if not phone:
raise RuntimeError("Phone number required for login")
await self.client.connect()
if await self.client.is_user_authorized():
logger.info("Telethon client already authorized")
return
await self._login_by_code(phone, force_sms=login_mode == "sms", functions=functions, types=types)
else:
await self.client.start(phone=phone)
logger.info("Telethon client connected")
async def attach_handlers(self, app: Any) -> None:
if self.client is None:
return
try:
from telethon import events
except ImportError as exc:
raise RuntimeError("Telethon not installed") from exc
@self.client.on(events.NewMessage)
async def on_new_message(event):
await app.handle_message_event(event)
@self.client.on(events.MessageEdited)
async def on_message_edit(event):
await app.handle_edit_event(event)
@self.client.on(events.MessageDeleted)
async def on_message_delete(event):
await app.handle_delete_event(event)
if hasattr(events, "MessageRead"):
@self.client.on(events.MessageRead)
async def on_message_read(event):
await app.events.emit("on_message_read", event=event)
@self.client.on(events.ChatAction)
async def on_chat_action(event):
await app.events.emit("on_chat_action", event=event)
await app.events.emit("on_chat_update", event=event)
if getattr(event, "user_typing", False):
await app.events.emit("on_typing", event=event)
if getattr(event, "user_recording", False):
await app.events.emit("on_recording", event=event)
if hasattr(events, "CallbackQuery"):
@self.client.on(events.CallbackQuery)
async def on_callback_query(event):
await app.events.emit("on_callback_query", event=event)
if hasattr(events, "InlineQuery"):
@self.client.on(events.InlineQuery)
async def on_inline_query(event):
await app.events.emit("on_inline_query", event=event)
if hasattr(events, "UserUpdate"):
@self.client.on(events.UserUpdate)
async def on_user_update(event):
await app.events.emit("on_user_update", event=event)
if getattr(event, "status", None) is not None:
await app.events.emit("on_status_update", event=event)
if getattr(event, "contact", None) is not None:
await app.events.emit("on_contact_update", event=event)
if hasattr(events, "Disconnected"):
@self.client.on(events.Disconnected)
async def on_disconnect(event):
await app.events.emit("on_disconnect", event=event)
if hasattr(events, "Connected"):
@self.client.on(events.Connected)
async def on_reconnect(event):
await app.events.emit("on_reconnect", event=event)
logger.info("Telethon event handlers attached")
async def disconnect(self) -> None:
if self.client:
await self.client.disconnect()
logger.info("Telethon client disconnected")
async def wait_until_disconnected(self) -> None:
if self.client:
await self.client.disconnected
async def _login_by_code(self, phone: str, force_sms: bool, functions, types) -> None:
sent = await self.client.send_code_request(phone, force_sms=force_sms)
code_type = getattr(sent, "type", None)
logger.info("Login code type: %s", type(code_type).__name__ if code_type else "unknown")
if isinstance(code_type, types.auth.SentCodeTypeSetUpEmailRequired):
logger.warning("Telegram requires login email verification")
try:
sent = await self.client(functions.auth.ResetLoginEmailRequest(phone, sent.phone_code_hash))
except Exception as exc:
logger.error("Email reset failed: %s", exc)
raise RuntimeError(
"Set a recovery email in Telegram (Settings -> Privacy and Security -> Two-Step Verification) "
"then retry login."
) from exc
code_type = getattr(sent, "type", None)
logger.info("Login code type: %s", type(code_type).__name__ if code_type else "unknown")
if isinstance(code_type, types.auth.SentCodeTypeEmailCode):
logger.warning("Check your email for the login code")
code = await self._prompt("Please enter the code you received: ")
try:
await self.client.sign_in(phone=phone, code=code, phone_code_hash=sent.phone_code_hash)
except Exception as exc:
from telethon.errors import SessionPasswordNeededError
if isinstance(exc, SessionPasswordNeededError):
password = await self._prompt("Two-step verification enabled. Enter password: ")
await self.client.sign_in(password=password)
return
raise
def _print_qr(self, url: str, qr_path: Optional[str] = None, qr_open: bool = False) -> None:
logger.info("QR login URL: %s", url)
print("Telegram app -> Settings -> Devices -> Scan QR")
print(f"If you cannot scan, try opening this URL on your phone: {url}")
saved_path = None
if qr_path:
saved_path = self._save_qr_image(url, qr_path)
try:
import qrcode
qr = qrcode.QRCode(border=1)
qr.add_data(url)
qr.make(fit=True)
qr.print_ascii(invert=True)
except Exception:
pass
if qr_open and saved_path:
self._open_qr_window(saved_path)
def _save_qr_image(self, url: str, qr_path: str) -> Optional[Path]:
try:
import qrcode
except Exception:
logger.warning("qrcode package not installed; cannot save QR image")
return None
path = Path(qr_path)
path.parent.mkdir(parents=True, exist_ok=True)
img = qrcode.make(url)
img.save(path)
logger.info("QR image saved to %s", path)
return path
def _open_qr_window(self, path: Path) -> None:
if self._try_tk_window(path):
return
self._open_file(path)
def _try_tk_window(self, path: Path) -> bool:
try:
import tkinter as tk
except Exception:
return False
def run_window() -> None:
try:
root = tk.Tk()
root.title("OverUB QR Login")
root.attributes("-topmost", True)
image = tk.PhotoImage(file=str(path))
label = tk.Label(root, image=image)
label.image = image
label.pack()
root.mainloop()
except Exception:
return
thread = threading.Thread(target=run_window, daemon=True)
thread.start()
return True
def _open_file(self, path: Path) -> None:
try:
if sys.platform.startswith("darwin"):
subprocess.run(["open", str(path)], check=False)
elif sys.platform.startswith("win"):
subprocess.run(["cmd", "/c", "start", "", str(path)], check=False)
else:
subprocess.run(["xdg-open", str(path)], check=False)
except Exception:
try:
webbrowser.open(path.as_uri())
except Exception:
logger.warning("Failed to open QR image automatically")
async def _prompt(self, prompt: str) -> str:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, input, prompt)