phoenix_pubsub

PubSub Module - A topic-based publish-subscribe system for asyncio applications.

For the usage examples check out test.py and README.md

1"""
2PubSub Module - A topic-based publish-subscribe system for asyncio applications.
3
4For the usage examples check out [test.py](https://github.com/0riginaln0/phoenix-pubsub/blob/main/src/phoenix_pubsub/test.py) and [README.md](https://github.com/0riginaln0/phoenix-pubsub/blob/main/README.md)
5"""
6
7from .phoenix_pubsub import PubSub, Topic, Message, Dispatcher, Subscribers, Peer, synchronous_dispatcher, concurrent_dispatcher
8
9__all__ = ["PubSub", "Topic", "Message", "Dispatcher", "Subscribers", "Peer", "synchronous_dispatcher", "concurrent_dispatcher"]
class PubSub:
 62class PubSub:
 63    """
 64    A topic-based publish-subscribe system for asynchronous message passing.
 65
 66    Supports attaching arbitrary metadata to subscriptions, allowing custom
 67    dispatcher logic for flexible message routing.
 68    """
 69
 70    def __init__(self):
 71        self._topics: Registry = defaultdict(dict)
 72        "Dictionary mapping Topics to Subscribers"
 73        self._lock = asyncio.Lock()
 74        "Lock for thread-safe operations on the registry"
 75
 76    async def subscribe(
 77        self, peer: Peer, *topics: str, metadata: Optional[dict] = None
 78    ):
 79        """
 80        Subscribe a queue to one or more topics.
 81
 82        Args:
 83            peer: The queue that will receive messages.
 84                This queue will receive messages as (topic, message) tuples.
 85            *topics: Variable number of topic strings to subscribe to.
 86                The subscriber will receive messages published to any of these topics.
 87            metadata: Arbitrary data attached to this subscription.
 88                The dispatcher can use it for custom delivery logic (e.g., filtering,
 89                prioritisation).
 90
 91        Example:
 92            ```python
 93            queue = asyncio.Queue()
 94
 95            # Subscribe to single topic
 96            await pubsub.subscribe(queue, "temperature")
 97
 98            # Subscribe to multiple topics
 99            await pubsub.subscribe(queue, "news", "weather", "sports")
100
101            # Provide metadata
102            await pubsub.subscribe(queue, "stock", metadata={"priority": 10})
103            ```
104        """
105        metadata = metadata or {}
106        async with self._lock:
107            for topic in topics:
108                self._topics[topic][peer] = metadata
109
110    async def unsubscribe(self, peer: Peer, *topics: str):
111        """
112        Unsubscribe a queue from one or more topics.
113
114        Args:
115            peer: The queue to unsubscribe.
116            *topics: Topics to unsubscribe from.
117
118        Example:
119            ```python
120            # Unsubscribe from a single topic
121            await pubsub.unsubscribe(queue, "weather")
122
123            # Unsubscribe from multiple topics
124            await pubsub.unsubscribe(queue, "news", "sports")
125            ```
126        """
127        async with self._lock:
128            for topic in topics:
129                if subscribers := self._topics.get(topic):
130                    subscribers.pop(peer, None)
131                    if not subscribers:
132                        del self._topics[topic]
133
134    async def broadcast(
135        self,
136        message: Message,
137        *topics: str,
138        dispatcher: Dispatcher = synchronous_dispatcher,
139    ):
140        """
141        Broadcast a message to all subscribers of the specified topics.
142
143        Args:
144            message: The message to broadcast.
145            *topics: Topics to broadcast to.
146            dispatcher: A callable that handles the actual message delivery.
147                Custom dispatchers can implement filtering, transformation,
148                or prioritisation using the metadata stored with each subscriber.
149
150        Example:
151            ```python
152            await pubsub.broadcast("Hello world!", "greetings")
153            # Subscribers receive: ("greetings", "Hello world!")
154
155            data = {"sensor": "temperature", "value": 23.5, "unit": "celsius"}
156            await pubsub.broadcast(data, "telemetry", "monitoring")
157            # Subscribers receive: ("telemetry", data)
158            #                      ("monitoring", data)
159            ```
160
161        Note:
162            - Slow consumers may miss messages if their queue is full
163            - The broadcast is asynchronous - it doesn't wait for subscribers to process messages
164        """
165        pending: list[tuple[Topic, Subscribers]] = []
166        async with self._lock:
167            for topic in topics:
168                if subscribers := self._topics.get(topic):
169                    pending.append((topic, subscribers.copy()))
170
171        for topic, subscribers in pending:
172            dispatcher(topic, message, subscribers, None)
173
174    async def broadcast_from(
175        self,
176        publisher: Peer,
177        message: Message,
178        *topics: str,
179        dispatcher: Dispatcher = synchronous_dispatcher,
180    ):
181        """
182        Broadcast a message to all subscribers except the publisher itself.
183
184        Args:
185            publisher: The queue of the publisher to exclude.
186                This peer (the publisher) will not receive the message.
187            message: The message to broadcast.
188            *topics: Topics to broadcast to.
189            dispatcher: A callable that handles the actual message delivery.
190                Custom dispatchers can implement filtering, transformation,
191                or prioritisation using the metadata stored with each subscriber.
192
193        Example:
194            ```python
195            publisher_queue = asyncio.Queue()
196            chat_queue = asyncio.Queue()
197            await pubsub.subscribe(publisher_queue, "chat")
198            await pubsub.subscribe(chat_queue, "chat")
199
200            await pubsub.broadcast_from(
201                publisher_queue,
202                "User joined the channel",
203                "chat"
204            )
205            # chat_queue receives ("chat", "User joined the channel")
206            # publisher_queue doesn't receive this message
207            ```
208        """
209        pending: list[tuple[Topic, Subscribers]] = []
210        async with self._lock:
211            for topic in topics:
212                subscribers = self._topics.get(topic)
213                if subscribers:
214                    pending.append((topic, subscribers.copy()))
215
216        for topic, subscribers in pending:
217            dispatcher(topic, message, subscribers, publisher)

A topic-based publish-subscribe system for asynchronous message passing.

Supports attaching arbitrary metadata to subscriptions, allowing custom dispatcher logic for flexible message routing.

async def subscribe(self, peer: Peer, *topics: str, metadata: dict | None = None):
 76    async def subscribe(
 77        self, peer: Peer, *topics: str, metadata: Optional[dict] = None
 78    ):
 79        """
 80        Subscribe a queue to one or more topics.
 81
 82        Args:
 83            peer: The queue that will receive messages.
 84                This queue will receive messages as (topic, message) tuples.
 85            *topics: Variable number of topic strings to subscribe to.
 86                The subscriber will receive messages published to any of these topics.
 87            metadata: Arbitrary data attached to this subscription.
 88                The dispatcher can use it for custom delivery logic (e.g., filtering,
 89                prioritisation).
 90
 91        Example:
 92            ```python
 93            queue = asyncio.Queue()
 94
 95            # Subscribe to single topic
 96            await pubsub.subscribe(queue, "temperature")
 97
 98            # Subscribe to multiple topics
 99            await pubsub.subscribe(queue, "news", "weather", "sports")
100
101            # Provide metadata
102            await pubsub.subscribe(queue, "stock", metadata={"priority": 10})
103            ```
104        """
105        metadata = metadata or {}
106        async with self._lock:
107            for topic in topics:
108                self._topics[topic][peer] = metadata

Subscribe a queue to one or more topics.

Arguments:
  • peer: The queue that will receive messages. This queue will receive messages as (topic, message) tuples.
  • *topics: Variable number of topic strings to subscribe to. The subscriber will receive messages published to any of these topics.
  • metadata: Arbitrary data attached to this subscription. The dispatcher can use it for custom delivery logic (e.g., filtering, prioritisation).
Example:
queue = asyncio.Queue()

# Subscribe to single topic
await pubsub.subscribe(queue, "temperature")

# Subscribe to multiple topics
await pubsub.subscribe(queue, "news", "weather", "sports")

# Provide metadata
await pubsub.subscribe(queue, "stock", metadata={"priority": 10})
async def unsubscribe(self, peer: Peer, *topics: str):
110    async def unsubscribe(self, peer: Peer, *topics: str):
111        """
112        Unsubscribe a queue from one or more topics.
113
114        Args:
115            peer: The queue to unsubscribe.
116            *topics: Topics to unsubscribe from.
117
118        Example:
119            ```python
120            # Unsubscribe from a single topic
121            await pubsub.unsubscribe(queue, "weather")
122
123            # Unsubscribe from multiple topics
124            await pubsub.unsubscribe(queue, "news", "sports")
125            ```
126        """
127        async with self._lock:
128            for topic in topics:
129                if subscribers := self._topics.get(topic):
130                    subscribers.pop(peer, None)
131                    if not subscribers:
132                        del self._topics[topic]

Unsubscribe a queue from one or more topics.

Arguments:
  • peer: The queue to unsubscribe.
  • *topics: Topics to unsubscribe from.
Example:
# Unsubscribe from a single topic
await pubsub.unsubscribe(queue, "weather")

# Unsubscribe from multiple topics
await pubsub.unsubscribe(queue, "news", "sports")
async def broadcast( self, message: Message, *topics: str, dispatcher: Dispatcher = <function synchronous_dispatcher>):
134    async def broadcast(
135        self,
136        message: Message,
137        *topics: str,
138        dispatcher: Dispatcher = synchronous_dispatcher,
139    ):
140        """
141        Broadcast a message to all subscribers of the specified topics.
142
143        Args:
144            message: The message to broadcast.
145            *topics: Topics to broadcast to.
146            dispatcher: A callable that handles the actual message delivery.
147                Custom dispatchers can implement filtering, transformation,
148                or prioritisation using the metadata stored with each subscriber.
149
150        Example:
151            ```python
152            await pubsub.broadcast("Hello world!", "greetings")
153            # Subscribers receive: ("greetings", "Hello world!")
154
155            data = {"sensor": "temperature", "value": 23.5, "unit": "celsius"}
156            await pubsub.broadcast(data, "telemetry", "monitoring")
157            # Subscribers receive: ("telemetry", data)
158            #                      ("monitoring", data)
159            ```
160
161        Note:
162            - Slow consumers may miss messages if their queue is full
163            - The broadcast is asynchronous - it doesn't wait for subscribers to process messages
164        """
165        pending: list[tuple[Topic, Subscribers]] = []
166        async with self._lock:
167            for topic in topics:
168                if subscribers := self._topics.get(topic):
169                    pending.append((topic, subscribers.copy()))
170
171        for topic, subscribers in pending:
172            dispatcher(topic, message, subscribers, None)

Broadcast a message to all subscribers of the specified topics.

Arguments:
  • message: The message to broadcast.
  • *topics: Topics to broadcast to.
  • dispatcher: A callable that handles the actual message delivery. Custom dispatchers can implement filtering, transformation, or prioritisation using the metadata stored with each subscriber.
Example:
await pubsub.broadcast("Hello world!", "greetings")
# Subscribers receive: ("greetings", "Hello world!")

data = {"sensor": "temperature", "value": 23.5, "unit": "celsius"}
await pubsub.broadcast(data, "telemetry", "monitoring")
# Subscribers receive: ("telemetry", data)
#                      ("monitoring", data)
Note:
  • Slow consumers may miss messages if their queue is full
  • The broadcast is asynchronous - it doesn't wait for subscribers to process messages
async def broadcast_from( self, publisher: Peer, message: Message, *topics: str, dispatcher: Dispatcher = <function synchronous_dispatcher>):
174    async def broadcast_from(
175        self,
176        publisher: Peer,
177        message: Message,
178        *topics: str,
179        dispatcher: Dispatcher = synchronous_dispatcher,
180    ):
181        """
182        Broadcast a message to all subscribers except the publisher itself.
183
184        Args:
185            publisher: The queue of the publisher to exclude.
186                This peer (the publisher) will not receive the message.
187            message: The message to broadcast.
188            *topics: Topics to broadcast to.
189            dispatcher: A callable that handles the actual message delivery.
190                Custom dispatchers can implement filtering, transformation,
191                or prioritisation using the metadata stored with each subscriber.
192
193        Example:
194            ```python
195            publisher_queue = asyncio.Queue()
196            chat_queue = asyncio.Queue()
197            await pubsub.subscribe(publisher_queue, "chat")
198            await pubsub.subscribe(chat_queue, "chat")
199
200            await pubsub.broadcast_from(
201                publisher_queue,
202                "User joined the channel",
203                "chat"
204            )
205            # chat_queue receives ("chat", "User joined the channel")
206            # publisher_queue doesn't receive this message
207            ```
208        """
209        pending: list[tuple[Topic, Subscribers]] = []
210        async with self._lock:
211            for topic in topics:
212                subscribers = self._topics.get(topic)
213                if subscribers:
214                    pending.append((topic, subscribers.copy()))
215
216        for topic, subscribers in pending:
217            dispatcher(topic, message, subscribers, publisher)

Broadcast a message to all subscribers except the publisher itself.

Arguments:
  • publisher: The queue of the publisher to exclude. This peer (the publisher) will not receive the message.
  • message: The message to broadcast.
  • *topics: Topics to broadcast to.
  • dispatcher: A callable that handles the actual message delivery. Custom dispatchers can implement filtering, transformation, or prioritisation using the metadata stored with each subscriber.
Example:
publisher_queue = asyncio.Queue()
chat_queue = asyncio.Queue()
await pubsub.subscribe(publisher_queue, "chat")
await pubsub.subscribe(chat_queue, "chat")

await pubsub.broadcast_from(
    publisher_queue,
    "User joined the channel",
    "chat"
)
# chat_queue receives ("chat", "User joined the channel")
# publisher_queue doesn't receive this message
type Topic = str
type Message = Any
type Dispatcher = Callable[[Topic, Message, Subscribers, Peer | None], NoneType]
type Subscribers = dict[Peer, dict]
type Peer = asyncio.queues.Queue
def synchronous_dispatcher( topic: Topic, message: Message, subscribers: Subscribers, publisher: Peer | None = None):
14def synchronous_dispatcher(
15    topic: Topic,
16    message: Message,
17    subscribers: Subscribers,
18    publisher: Optional[Peer] = None,
19):
20    """
21    Blocks the caller until all put attempts are made.
22    """
23
24    def try_put_message(peer: asyncio.Queue, topic: str, message: Message):
25        try:
26            peer.put_nowait((topic, message))
27        except (asyncio.QueueFull, asyncio.QueueShutDown):
28            # Handle slow consumers – you might want to drop or log
29            pass
30
31    if publisher:  # broadcast_from
32        for peer, _metadata in subscribers.items():
33            if peer is not publisher:
34                try_put_message(peer, topic, message)
35    else:  # broadcast
36        for peer in subscribers.keys():
37            try_put_message(peer, topic, message)

Blocks the caller until all put attempts are made.

def concurrent_dispatcher( topic: Topic, message: Message, subscribers: Subscribers, publisher: Peer | None = None):
40def concurrent_dispatcher(
41    topic: Topic,
42    message: Message,
43    subscribers: Subscribers,
44    publisher: Optional[Peer] = None,
45):
46    """
47    Spawns one background put attempt task per subscriber.
48    """
49
50    async def try_put_message(peer: asyncio.Queue, topic: str, message: Message):
51        try:
52            peer.put_nowait((topic, message))
53        except (asyncio.QueueFull, asyncio.QueueShutDown):
54            pass
55
56    for peer in subscribers.keys():
57        if publisher and peer is publisher:
58            continue
59        asyncio.create_task(try_put_message(peer, topic, message))

Spawns one background put attempt task per subscriber.