# File: overub/core/client.py (Python, 210 lines, 7.5 KiB)

from __future__ import annotations
import asyncio
import subprocess
import sys
import threading
import webbrowser
from pathlib import Path
from typing import Any, Optional
from core.logger import get_logger
# Module-level logger shared by everything in this file; obtained from the
# project's logging helper under the name "core.client".
logger = get_logger("core.client")
class ClientWrapper:
    """Own a Telethon ``TelegramClient`` and handle login, events and shutdown.

    Telethon (and the optional ``qrcode`` / ``tkinter`` helpers) are imported
    lazily inside the methods that need them, so this module can be imported
    even when those packages are missing.

    Attributes:
        api_id: Telegram API id passed to ``TelegramClient``.
        api_hash: Telegram API hash passed to ``TelegramClient``.
        session_name: Session name passed to ``TelegramClient``.
        client: The ``TelegramClient`` instance, or ``None`` until
            :meth:`connect` has been called.
    """

    # Number of QR codes shown (the first one plus refreshes) before the
    # timeout from ``qr.wait()`` is allowed to propagate to the caller.
    _QR_MAX_ATTEMPTS = 5
    # Seconds to wait for the Tk preview thread to report success or failure.
    _TK_STARTUP_TIMEOUT = 2.0

    def __init__(self, api_id: int, api_hash: str, session_name: str) -> None:
        """Store the credentials; no network activity happens here."""
        self.api_id = api_id
        self.api_hash = api_hash
        self.session_name = session_name
        self.client: Any = None

    async def connect(
        self,
        login_mode: str = "phone",
        phone: Optional[str] = None,
        qr_path: Optional[str] = None,
        qr_open: bool = False,
    ) -> None:
        """Create the Telethon client and log in.

        Args:
            login_mode: ``"qr"`` selects QR-code login; any other value uses
                ``client.start(phone=phone)``.
            phone: Phone number forwarded to ``client.start``.
            qr_path: Where to save the QR code image (QR mode only).
            qr_open: Whether to try to display the saved QR image.

        Raises:
            RuntimeError: If Telethon is not installed.
            asyncio.TimeoutError: If the QR code was not scanned after
                ``_QR_MAX_ATTEMPTS`` codes had been shown.
        """
        try:
            from telethon import TelegramClient
            from telethon.errors import SessionPasswordNeededError
        except ImportError as exc:
            raise RuntimeError("Telethon not installed") from exc

        self.client = TelegramClient(self.session_name, self.api_id, self.api_hash)

        if login_mode != "qr":
            await self.client.start(phone=phone)
            logger.info("Telethon client connected")
            return

        await self.client.connect()
        if await self.client.is_user_authorized():
            logger.info("Telethon client already authorized")
            return

        try:
            qr = await self.client.qr_login()
        except AttributeError:
            logger.warning("QR login not supported by this Telethon version, falling back to phone login")
            await self.client.start(phone=phone)
            logger.info("Telethon client connected")
            return

        try:
            await self._wait_for_qr_scan(qr, qr_path=qr_path, qr_open=qr_open)
        except SessionPasswordNeededError:
            # The QR scan succeeded but the account has a 2FA password.
            password = await self._prompt(
                "Two-step verification enabled. Enter password: ", secret=True
            )
            await self.client.sign_in(password=password)
        logger.info("Telethon client connected")

    async def _wait_for_qr_scan(
        self,
        qr: Any,
        qr_path: Optional[str] = None,
        qr_open: bool = False,
    ) -> None:
        """Show the QR code and wait for the scan, refreshing expired codes.

        ``qr.wait()`` raises ``asyncio.TimeoutError`` once the login token has
        expired; instead of aborting the login on the first expiry, a new
        token is requested with ``qr.recreate()`` and shown again. After
        ``_QR_MAX_ATTEMPTS`` codes the timeout is re-raised.
        """
        attempts = max(1, self._QR_MAX_ATTEMPTS)
        for attempt in range(1, attempts + 1):
            self._print_qr(qr.url, qr_path=qr_path, qr_open=qr_open)
            try:
                await qr.wait()
                return
            except asyncio.TimeoutError:
                if attempt == attempts:
                    raise
                logger.warning("QR code expired before it was scanned; generating a new one")
                await qr.recreate()

    async def attach_handlers(self, app: Any) -> None:
        """Register Telethon event handlers that forward events to ``app``.

        Does nothing when :meth:`connect` has not created a client yet.
        Event classes that may be missing from the installed Telethon are
        only registered when ``telethon.events`` exposes them.

        Args:
            app: Object providing ``handle_message_event``,
                ``handle_edit_event``, ``handle_delete_event`` and an
                ``events.emit(name, event=...)`` coroutine.

        Raises:
            RuntimeError: If Telethon is not installed.
        """
        if self.client is None:
            return
        try:
            from telethon import events
        except ImportError as exc:
            raise RuntimeError("Telethon not installed") from exc

        @self.client.on(events.NewMessage)
        async def on_new_message(event):
            await app.handle_message_event(event)

        @self.client.on(events.MessageEdited)
        async def on_message_edit(event):
            await app.handle_edit_event(event)

        @self.client.on(events.MessageDeleted)
        async def on_message_delete(event):
            await app.handle_delete_event(event)

        if hasattr(events, "MessageRead"):
            @self.client.on(events.MessageRead)
            async def on_message_read(event):
                await app.events.emit("on_message_read", event=event)

        @self.client.on(events.ChatAction)
        async def on_chat_action(event):
            await app.events.emit("on_chat_action", event=event)
            await app.events.emit("on_chat_update", event=event)
            # NOTE(review): these attributes are read defensively with
            # getattr; confirm that ChatAction events actually carry them in
            # the Telethon version in use, otherwise these never fire.
            if getattr(event, "user_typing", False):
                await app.events.emit("on_typing", event=event)
            if getattr(event, "user_recording", False):
                await app.events.emit("on_recording", event=event)

        if hasattr(events, "CallbackQuery"):
            @self.client.on(events.CallbackQuery)
            async def on_callback_query(event):
                await app.events.emit("on_callback_query", event=event)

        if hasattr(events, "InlineQuery"):
            @self.client.on(events.InlineQuery)
            async def on_inline_query(event):
                await app.events.emit("on_inline_query", event=event)

        if hasattr(events, "UserUpdate"):
            @self.client.on(events.UserUpdate)
            async def on_user_update(event):
                await app.events.emit("on_user_update", event=event)
                if getattr(event, "status", None) is not None:
                    await app.events.emit("on_status_update", event=event)
                if getattr(event, "contact", None) is not None:
                    await app.events.emit("on_contact_update", event=event)

        if hasattr(events, "Disconnected"):
            @self.client.on(events.Disconnected)
            async def on_disconnect(event):
                await app.events.emit("on_disconnect", event=event)

        if hasattr(events, "Connected"):
            @self.client.on(events.Connected)
            async def on_reconnect(event):
                await app.events.emit("on_reconnect", event=event)

        logger.info("Telethon event handlers attached")

    async def disconnect(self) -> None:
        """Disconnect the client if one has been created."""
        if self.client is not None:
            await self.client.disconnect()
            logger.info("Telethon client disconnected")

    async def wait_until_disconnected(self) -> None:
        """Block until the client's ``disconnected`` awaitable completes."""
        if self.client is not None:
            await self.client.disconnected

    def _print_qr(self, url: str, qr_path: Optional[str] = None, qr_open: bool = False) -> None:
        """Present the QR login URL: log it, print it, and optionally save/open it.

        Rendering, saving and opening are all best-effort; failures are
        logged and never interrupt the login flow.

        Args:
            url: The QR login URL to present.
            qr_path: If given, the QR image is also saved to this path.
            qr_open: If true and the image was saved, try to display it.
        """
        logger.info("QR login URL: %s", url)
        print("Telegram app -> Settings -> Devices -> Scan QR")
        print(f"If you cannot scan, try opening this URL on your phone: {url}")

        saved_path = self._save_qr_image(url, qr_path) if qr_path else None

        try:
            import qrcode

            qr = qrcode.QRCode(border=1)
            qr.add_data(url)
            qr.make(fit=True)
            qr.print_ascii(invert=True)
        except ImportError:
            logger.debug("qrcode package not installed; skipping terminal QR rendering")
        except Exception:
            # Terminal rendering is a convenience only; the URL was printed above.
            logger.debug("Could not render QR code in the terminal", exc_info=True)

        if qr_open and saved_path:
            self._open_qr_window(saved_path)

    def _save_qr_image(self, url: str, qr_path: str) -> Optional[Path]:
        """Write the QR code for ``url`` to ``qr_path``.

        Returns:
            The path written, or ``None`` when the ``qrcode`` package is
            unavailable or the image could not be written.
        """
        try:
            import qrcode
        except Exception:
            logger.warning("qrcode package not installed; cannot save QR image")
            return None

        path = Path(qr_path)
        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            img = qrcode.make(url)
            img.save(path)
        except Exception as exc:
            # Saving is optional: an unwritable path or a missing imaging
            # backend must not abort the login itself.
            logger.warning("Could not save QR image to %s: %s", path, exc)
            return None
        logger.info("QR image saved to %s", path)
        return path

    def _open_qr_window(self, path: Path) -> None:
        """Display the saved QR image, preferring Tk and falling back to the OS opener."""
        if self._try_tk_window(path):
            return
        self._open_file(path)

    def _try_tk_window(self, path: Path) -> bool:
        """Show the QR image in a Tk window running on a daemon thread.

        The calling thread waits up to ``_TK_STARTUP_TIMEOUT`` seconds for
        the Tk thread to report whether the window could be built, so that a
        failure (for example no display, or an image Tk cannot load) lets
        the caller fall back to another viewer.

        Returns:
            ``False`` if tkinter is unavailable or the window could not be
            created; ``True`` if it was created, or if the Tk thread has not
            reported back within the timeout.
        """
        try:
            import tkinter as tk
        except Exception:
            return False

        settled = threading.Event()
        outcome = {"ok": False}

        def run_window() -> None:
            root = None
            try:
                root = tk.Tk()
                root.title("OverUB QR Login")
                root.attributes("-topmost", True)
                image = tk.PhotoImage(file=str(path))
                label = tk.Label(root, image=image)
                # Keep a reference so the image is not garbage-collected.
                label.image = image
                label.pack()
            except Exception:
                logger.debug("Tk QR window could not be created", exc_info=True)
                if root is not None:
                    try:
                        root.destroy()
                    except Exception:
                        pass
                settled.set()
                return
            outcome["ok"] = True
            settled.set()
            try:
                root.mainloop()
            except Exception:
                logger.debug("Tk QR window terminated abnormally", exc_info=True)

        thread = threading.Thread(target=run_window, daemon=True)
        thread.start()
        if not settled.wait(self._TK_STARTUP_TIMEOUT):
            # Still starting up; assume it will appear rather than opening a
            # second viewer on top of it.
            return True
        return outcome["ok"]

    def _open_file(self, path: Path) -> None:
        """Open ``path`` with the platform's default viewer, else a web browser.

        The browser fallback is used both when the opener command cannot be
        run and when it exits with a non-zero status. A warning is logged
        when nothing could open the file.
        """
        if sys.platform.startswith("darwin"):
            command = ["open", str(path)]
        elif sys.platform.startswith("win"):
            command = ["cmd", "/c", "start", "", str(path)]
        else:
            command = ["xdg-open", str(path)]

        try:
            launched = subprocess.run(command, check=False).returncode == 0
        except Exception:
            launched = False
        if launched:
            return

        try:
            # as_uri() rejects relative paths, so resolve to an absolute one first.
            if webbrowser.open(path.resolve().as_uri()):
                return
        except Exception:
            logger.debug("Browser fallback for QR image failed", exc_info=True)
        logger.warning("Failed to open QR image automatically")

    async def _prompt(self, prompt: str, secret: bool = False) -> str:
        """Read a line from the user without blocking the event loop.

        Args:
            prompt: Text shown to the user.
            secret: When true, read with ``getpass`` so the typed text is
                not echoed to the terminal (used for the 2FA password).

        Returns:
            The text entered by the user.
        """
        if secret:
            import getpass

            reader = getpass.getpass
        else:
            reader = input
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, reader, prompt)