from __future__ import annotations from dataclasses import dataclass from typing import Any, Awaitable, Callable, Dict, List, Optional from core.error_handler import capture_errors Subscriber = Callable[[Any], Awaitable[None]] Requester = Callable[[Any], Awaitable[Any]] @dataclass class BusMessage: topic: str data: Any class MessageBus: def __init__(self) -> None: self._topics: Dict[str, List[Subscriber]] = {} self._services: Dict[str, Any] = {} self._requests: Dict[str, Requester] = {} def subscribe(self, topic: str, handler: Subscriber) -> None: handlers = self._topics.setdefault(topic, []) handlers.append(capture_errors(handler)) async def publish(self, topic: str, data: Any) -> None: for handler in list(self._topics.get(topic, [])): await handler(data) def register_request_handler(self, topic: str, handler: Requester) -> None: self._requests[topic] = handler async def request(self, topic: str, data: Any) -> Optional[Any]: handler = self._requests.get(topic) if not handler: return None return await handler(data) def register_service(self, name: str, service: Any) -> None: self._services[name] = service def get_service(self, name: str) -> Any: return self._services.get(name)