Files
overub/core/bus.py
2025-12-21 17:12:32 +01:00

47 lines
1.3 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, Optional
from core.error_handler import capture_errors
Subscriber = Callable[[Any], Awaitable[None]]
Requester = Callable[[Any], Awaitable[Any]]
@dataclass
class BusMessage:
topic: str
data: Any
class MessageBus:
def __init__(self) -> None:
self._topics: Dict[str, List[Subscriber]] = {}
self._services: Dict[str, Any] = {}
self._requests: Dict[str, Requester] = {}
def subscribe(self, topic: str, handler: Subscriber) -> None:
handlers = self._topics.setdefault(topic, [])
handlers.append(capture_errors(handler))
async def publish(self, topic: str, data: Any) -> None:
for handler in list(self._topics.get(topic, [])):
await handler(data)
def register_request_handler(self, topic: str, handler: Requester) -> None:
self._requests[topic] = handler
async def request(self, topic: str, data: Any) -> Optional[Any]:
handler = self._requests.get(topic)
if not handler:
return None
return await handler(data)
def register_service(self, name: str, service: Any) -> None:
self._services[name] = service
def get_service(self, name: str) -> Any:
return self._services.get(name)