47 lines
1.3 KiB
Python
47 lines
1.3 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Any, Awaitable, Callable, Dict, List, Optional
|
|
|
|
from core.error_handler import capture_errors
|
|
|
|
|
|
Subscriber = Callable[[Any], Awaitable[None]]
|
|
Requester = Callable[[Any], Awaitable[Any]]
|
|
|
|
|
|
@dataclass
|
|
class BusMessage:
|
|
topic: str
|
|
data: Any
|
|
|
|
|
|
class MessageBus:
|
|
def __init__(self) -> None:
|
|
self._topics: Dict[str, List[Subscriber]] = {}
|
|
self._services: Dict[str, Any] = {}
|
|
self._requests: Dict[str, Requester] = {}
|
|
|
|
def subscribe(self, topic: str, handler: Subscriber) -> None:
|
|
handlers = self._topics.setdefault(topic, [])
|
|
handlers.append(capture_errors(handler))
|
|
|
|
async def publish(self, topic: str, data: Any) -> None:
|
|
for handler in list(self._topics.get(topic, [])):
|
|
await handler(data)
|
|
|
|
def register_request_handler(self, topic: str, handler: Requester) -> None:
|
|
self._requests[topic] = handler
|
|
|
|
async def request(self, topic: str, data: Any) -> Optional[Any]:
|
|
handler = self._requests.get(topic)
|
|
if not handler:
|
|
return None
|
|
return await handler(data)
|
|
|
|
def register_service(self, name: str, service: Any) -> None:
|
|
self._services[name] = service
|
|
|
|
def get_service(self, name: str) -> Any:
|
|
return self._services.get(name)
|