from __future__ import annotations

from typing import Any

from core.logger import get_logger

logger = get_logger("core.client")


class ClientWrapper:
    """Own a Telethon ``TelegramClient`` and relay its updates to an app.

    The wrapper stores the credentials it is given, builds the client on
    :meth:`connect`, and wires Telethon event builders to callbacks on an
    application object in :meth:`attach_handlers`.
    """

    def __init__(self, api_id: int, api_hash: str, session_name: str) -> None:
        """Store the credentials; no client is created until ``connect()``."""
        self.api_id = api_id
        self.api_hash = api_hash
        self.session_name = session_name
        # Populated by connect(); stays None until then.
        self.client: Any = None

    async def connect(self) -> None:
        """Create the Telethon client from the stored credentials and start it.

        Raises:
            RuntimeError: if the ``telethon`` package cannot be imported.
        """
        # Imported lazily so this module can be imported without Telethon.
        try:
            from telethon import TelegramClient
        except ImportError as exc:
            raise RuntimeError("Telethon not installed") from exc

        # NOTE(review): calling connect() a second time replaces self.client
        # without disconnecting the previous instance, and handlers attached
        # to the old client are not carried over. Confirm callers never
        # reconnect through this method, or disconnect() first.
        self.client = TelegramClient(self.session_name, self.api_id, self.api_hash)
        await self.client.start()
        logger.info("Telethon client connected")

    async def attach_handlers(self, app: Any) -> None:
        """Register Telethon event handlers that forward updates to ``app``.

        ``app`` is expected to provide the coroutines ``handle_message_event``,
        ``handle_edit_event`` and ``handle_delete_event`` (each called with the
        event) and an ``events`` object whose ``emit(name, event=...)``
        coroutine receives every other update.

        Event builders that the installed Telethon does not provide are
        skipped. Does nothing except log a warning when called before
        ``connect()``.

        Raises:
            RuntimeError: if the ``telethon`` package cannot be imported.
        """
        if self.client is None:
            logger.warning("attach_handlers called before connect(); nothing attached")
            return
        try:
            from telethon import events
        except ImportError as exc:
            raise RuntimeError("Telethon not installed") from exc

        client = self.client

        def forward(builder_name: str, signal: str) -> None:
            """Relay ``events.<builder_name>`` to ``signal`` if the builder exists."""
            builder = getattr(events, builder_name, None)
            if builder is None:
                return

            async def relay(event):
                # app.events is looked up per event, not captured at attach time.
                await app.events.emit(signal, event=event)

            # Keep a descriptive callback name for Telethon's own log output.
            relay.__name__ = signal
            client.on(builder)(relay)

        @client.on(events.NewMessage)
        async def on_new_message(event):
            await app.handle_message_event(event)

        @client.on(events.MessageEdited)
        async def on_message_edit(event):
            await app.handle_edit_event(event)

        @client.on(events.MessageDeleted)
        async def on_message_delete(event):
            await app.handle_delete_event(event)

        forward("MessageRead", "on_message_read")

        @client.on(events.ChatAction)
        async def on_chat_action(event):
            await app.events.emit("on_chat_action", event=event)
            await app.events.emit("on_chat_update", event=event)
            # NOTE(review): Telethon's documented ChatAction.Event exposes no
            # user_typing / user_recording attributes, so these two branches
            # are expected never to fire; typing state arrives via UserUpdate
            # (handled below). Kept so behaviour is unchanged for any event
            # object that does carry them.
            if getattr(event, "user_typing", False):
                await app.events.emit("on_typing", event=event)
            if getattr(event, "user_recording", False):
                await app.events.emit("on_recording", event=event)

        forward("CallbackQuery", "on_callback_query")
        forward("InlineQuery", "on_inline_query")

        if hasattr(events, "UserUpdate"):

            @client.on(events.UserUpdate)
            async def on_user_update(event):
                await app.events.emit("on_user_update", event=event)
                if getattr(event, "status", None) is not None:
                    await app.events.emit("on_status_update", event=event)
                # UserUpdate.Event.contact is a bool, so an "is not None" test
                # would be true for every update; test its truthiness instead.
                if getattr(event, "contact", False):
                    await app.events.emit("on_contact_update", event=event)
                # Typing / recording state is exposed on UserUpdate events.
                if getattr(event, "typing", False):
                    await app.events.emit("on_typing", event=event)
                if getattr(event, "recording", False):
                    await app.events.emit("on_recording", event=event)

        forward("Disconnected", "on_disconnect")
        forward("Connected", "on_reconnect")

        logger.info("Telethon event handlers attached")

    async def disconnect(self) -> None:
        """Disconnect the client, if one has been created."""
        if self.client is not None:
            await self.client.disconnect()
            logger.info("Telethon client disconnected")

    async def wait_until_disconnected(self) -> None:
        """Wait on the client's ``disconnected`` awaitable, if a client exists."""
        if self.client is not None:
            await self.client.disconnected