phoenix_pubsub
PubSub Module - A topic-based publish-subscribe system for asyncio applications.
1""" 2PubSub Module - A topic-based publish-subscribe system for asyncio applications. 3 4For the usage examples check out [test.py](https://github.com/0riginaln0/phoenix-pubsub/blob/main/src/phoenix_pubsub/test.py) and [README.md](https://github.com/0riginaln0/phoenix-pubsub/blob/main/README.md) 5""" 6 7from .phoenix_pubsub import PubSub, Topic, Message, Dispatcher, Subscribers, Peer, synchronous_dispatcher, concurrent_dispatcher 8 9__all__ = ["PubSub", "Topic", "Message", "Dispatcher", "Subscribers", "Peer", "synchronous_dispatcher", "concurrent_dispatcher"]
class
PubSub:
62class PubSub: 63 """ 64 A topic-based publish-subscribe system for asynchronous message passing. 65 66 Supports attaching arbitrary metadata to subscriptions, allowing custom 67 dispatcher logic for flexible message routing. 68 """ 69 70 def __init__(self): 71 self._topics: Registry = defaultdict(dict) 72 "Dictionary mapping Topics to Subscribers" 73 self._lock = asyncio.Lock() 74 "Lock for thread-safe operations on the registry" 75 76 async def subscribe( 77 self, peer: Peer, *topics: str, metadata: Optional[dict] = None 78 ): 79 """ 80 Subscribe a queue to one or more topics. 81 82 Args: 83 peer: The queue that will receive messages. 84 This queue will receive messages as (topic, message) tuples. 85 *topics: Variable number of topic strings to subscribe to. 86 The subscriber will receive messages published to any of these topics. 87 metadata: Arbitrary data attached to this subscription. 88 The dispatcher can use it for custom delivery logic (e.g., filtering, 89 prioritisation). 90 91 Example: 92 ```python 93 queue = asyncio.Queue() 94 95 # Subscribe to single topic 96 await pubsub.subscribe(queue, "temperature") 97 98 # Subscribe to multiple topics 99 await pubsub.subscribe(queue, "news", "weather", "sports") 100 101 # Provide metadata 102 await pubsub.subscribe(queue, "stock", metadata={"priority": 10}) 103 ``` 104 """ 105 metadata = metadata or {} 106 async with self._lock: 107 for topic in topics: 108 self._topics[topic][peer] = metadata 109 110 async def unsubscribe(self, peer: Peer, *topics: str): 111 """ 112 Unsubscribe a queue from one or more topics. 113 114 Args: 115 peer: The queue to unsubscribe. 116 *topics: Topics to unsubscribe from. 117 118 Example: 119 ```python 120 # Unsubscribe from a single topic 121 await pubsub.unsubscribe(queue, "weather") 122 123 # Unsubscribe from multiple topics 124 await pubsub.unsubscribe(queue, "news", "sports") 125 ``` 126 """ 127 async with self._lock: 128 for topic in topics: 129 if subscribers := self._topics.get(topic): 130 subscribers.pop(peer, None) 131 if not subscribers: 132 del self._topics[topic] 133 134 async def broadcast( 135 self, 136 message: Message, 137 *topics: str, 138 dispatcher: Dispatcher = synchronous_dispatcher, 139 ): 140 """ 141 Broadcast a message to all subscribers of the specified topics. 142 143 Args: 144 message: The message to broadcast. 145 *topics: Topics to broadcast to. 146 dispatcher: A callable that handles the actual message delivery. 147 Custom dispatchers can implement filtering, transformation, 148 or prioritisation using the metadata stored with each subscriber. 149 150 Example: 151 ```python 152 await pubsub.broadcast("Hello world!", "greetings") 153 # Subscribers receive: ("greetings", "Hello world!") 154 155 data = {"sensor": "temperature", "value": 23.5, "unit": "celsius"} 156 await pubsub.broadcast(data, "telemetry", "monitoring") 157 # Subscribers receive: ("telemetry", data) 158 # ("monitoring", data) 159 ``` 160 161 Note: 162 - Slow consumers may miss messages if their queue is full 163 - The broadcast is asynchronous - it doesn't wait for subscribers to process messages 164 """ 165 pending: list[tuple[Topic, Subscribers]] = [] 166 async with self._lock: 167 for topic in topics: 168 if subscribers := self._topics.get(topic): 169 pending.append((topic, subscribers.copy())) 170 171 for topic, subscribers in pending: 172 dispatcher(topic, message, subscribers, None) 173 174 async def broadcast_from( 175 self, 176 publisher: Peer, 177 message: Message, 178 *topics: str, 179 dispatcher: Dispatcher = synchronous_dispatcher, 180 ): 181 """ 182 Broadcast a message to all subscribers except the publisher itself. 183 184 Args: 185 publisher: The queue of the publisher to exclude. 186 This peer (the publisher) will not receive the message. 187 message: The message to broadcast. 188 *topics: Topics to broadcast to. 189 dispatcher: A callable that handles the actual message delivery. 190 Custom dispatchers can implement filtering, transformation, 191 or prioritisation using the metadata stored with each subscriber. 192 193 Example: 194 ```python 195 publisher_queue = asyncio.Queue() 196 chat_queue = asyncio.Queue() 197 await pubsub.subscribe(publisher_queue, "chat") 198 await pubsub.subscribe(chat_queue, "chat") 199 200 await pubsub.broadcast_from( 201 publisher_queue, 202 "User joined the channel", 203 "chat" 204 ) 205 # chat_queue receives ("chat", "User joined the channel") 206 # publisher_queue doesn't receive this message 207 ``` 208 """ 209 pending: list[tuple[Topic, Subscribers]] = [] 210 async with self._lock: 211 for topic in topics: 212 subscribers = self._topics.get(topic) 213 if subscribers: 214 pending.append((topic, subscribers.copy())) 215 216 for topic, subscribers in pending: 217 dispatcher(topic, message, subscribers, publisher)
A topic-based publish-subscribe system for asynchronous message passing.
Supports attaching arbitrary metadata to subscriptions, allowing custom dispatcher logic for flexible message routing.
async def
subscribe(self, peer: Peer, *topics: str, metadata: dict | None = None):
76 async def subscribe( 77 self, peer: Peer, *topics: str, metadata: Optional[dict] = None 78 ): 79 """ 80 Subscribe a queue to one or more topics. 81 82 Args: 83 peer: The queue that will receive messages. 84 This queue will receive messages as (topic, message) tuples. 85 *topics: Variable number of topic strings to subscribe to. 86 The subscriber will receive messages published to any of these topics. 87 metadata: Arbitrary data attached to this subscription. 88 The dispatcher can use it for custom delivery logic (e.g., filtering, 89 prioritisation). 90 91 Example: 92 ```python 93 queue = asyncio.Queue() 94 95 # Subscribe to single topic 96 await pubsub.subscribe(queue, "temperature") 97 98 # Subscribe to multiple topics 99 await pubsub.subscribe(queue, "news", "weather", "sports") 100 101 # Provide metadata 102 await pubsub.subscribe(queue, "stock", metadata={"priority": 10}) 103 ``` 104 """ 105 metadata = metadata or {} 106 async with self._lock: 107 for topic in topics: 108 self._topics[topic][peer] = metadata
Subscribe a queue to one or more topics.
Arguments:
- peer: The queue that will receive messages. This queue will receive messages as (topic, message) tuples.
- *topics: Variable number of topic strings to subscribe to. The subscriber will receive messages published to any of these topics.
- metadata: Arbitrary data attached to this subscription. The dispatcher can use it for custom delivery logic (e.g., filtering, prioritisation).
Example:
queue = asyncio.Queue() # Subscribe to single topic await pubsub.subscribe(queue, "temperature") # Subscribe to multiple topics await pubsub.subscribe(queue, "news", "weather", "sports") # Provide metadata await pubsub.subscribe(queue, "stock", metadata={"priority": 10})
async def
unsubscribe(self, peer: Peer, *topics: str):
110 async def unsubscribe(self, peer: Peer, *topics: str): 111 """ 112 Unsubscribe a queue from one or more topics. 113 114 Args: 115 peer: The queue to unsubscribe. 116 *topics: Topics to unsubscribe from. 117 118 Example: 119 ```python 120 # Unsubscribe from a single topic 121 await pubsub.unsubscribe(queue, "weather") 122 123 # Unsubscribe from multiple topics 124 await pubsub.unsubscribe(queue, "news", "sports") 125 ``` 126 """ 127 async with self._lock: 128 for topic in topics: 129 if subscribers := self._topics.get(topic): 130 subscribers.pop(peer, None) 131 if not subscribers: 132 del self._topics[topic]
Unsubscribe a queue from one or more topics.
Arguments:
- peer: The queue to unsubscribe.
- *topics: Topics to unsubscribe from.
Example:
# Unsubscribe from a single topic await pubsub.unsubscribe(queue, "weather") # Unsubscribe from multiple topics await pubsub.unsubscribe(queue, "news", "sports")
async def
broadcast( self, message: Message, *topics: str, dispatcher: Dispatcher = <function synchronous_dispatcher>):
134 async def broadcast( 135 self, 136 message: Message, 137 *topics: str, 138 dispatcher: Dispatcher = synchronous_dispatcher, 139 ): 140 """ 141 Broadcast a message to all subscribers of the specified topics. 142 143 Args: 144 message: The message to broadcast. 145 *topics: Topics to broadcast to. 146 dispatcher: A callable that handles the actual message delivery. 147 Custom dispatchers can implement filtering, transformation, 148 or prioritisation using the metadata stored with each subscriber. 149 150 Example: 151 ```python 152 await pubsub.broadcast("Hello world!", "greetings") 153 # Subscribers receive: ("greetings", "Hello world!") 154 155 data = {"sensor": "temperature", "value": 23.5, "unit": "celsius"} 156 await pubsub.broadcast(data, "telemetry", "monitoring") 157 # Subscribers receive: ("telemetry", data) 158 # ("monitoring", data) 159 ``` 160 161 Note: 162 - Slow consumers may miss messages if their queue is full 163 - The broadcast is asynchronous - it doesn't wait for subscribers to process messages 164 """ 165 pending: list[tuple[Topic, Subscribers]] = [] 166 async with self._lock: 167 for topic in topics: 168 if subscribers := self._topics.get(topic): 169 pending.append((topic, subscribers.copy())) 170 171 for topic, subscribers in pending: 172 dispatcher(topic, message, subscribers, None)
Broadcast a message to all subscribers of the specified topics.
Arguments:
- message: The message to broadcast.
- *topics: Topics to broadcast to.
- dispatcher: A callable that handles the actual message delivery. Custom dispatchers can implement filtering, transformation, or prioritisation using the metadata stored with each subscriber.
Example:
await pubsub.broadcast("Hello world!", "greetings") # Subscribers receive: ("greetings", "Hello world!") data = {"sensor": "temperature", "value": 23.5, "unit": "celsius"} await pubsub.broadcast(data, "telemetry", "monitoring") # Subscribers receive: ("telemetry", data) # ("monitoring", data)
Note:
- Slow consumers may miss messages if their queue is full
- The broadcast is asynchronous - it doesn't wait for subscribers to process messages
async def
broadcast_from( self, publisher: Peer, message: Message, *topics: str, dispatcher: Dispatcher = <function synchronous_dispatcher>):
174 async def broadcast_from( 175 self, 176 publisher: Peer, 177 message: Message, 178 *topics: str, 179 dispatcher: Dispatcher = synchronous_dispatcher, 180 ): 181 """ 182 Broadcast a message to all subscribers except the publisher itself. 183 184 Args: 185 publisher: The queue of the publisher to exclude. 186 This peer (the publisher) will not receive the message. 187 message: The message to broadcast. 188 *topics: Topics to broadcast to. 189 dispatcher: A callable that handles the actual message delivery. 190 Custom dispatchers can implement filtering, transformation, 191 or prioritisation using the metadata stored with each subscriber. 192 193 Example: 194 ```python 195 publisher_queue = asyncio.Queue() 196 chat_queue = asyncio.Queue() 197 await pubsub.subscribe(publisher_queue, "chat") 198 await pubsub.subscribe(chat_queue, "chat") 199 200 await pubsub.broadcast_from( 201 publisher_queue, 202 "User joined the channel", 203 "chat" 204 ) 205 # chat_queue receives ("chat", "User joined the channel") 206 # publisher_queue doesn't receive this message 207 ``` 208 """ 209 pending: list[tuple[Topic, Subscribers]] = [] 210 async with self._lock: 211 for topic in topics: 212 subscribers = self._topics.get(topic) 213 if subscribers: 214 pending.append((topic, subscribers.copy())) 215 216 for topic, subscribers in pending: 217 dispatcher(topic, message, subscribers, publisher)
Broadcast a message to all subscribers except the publisher itself.
Arguments:
- publisher: The queue of the publisher to exclude. This peer (the publisher) will not receive the message.
- message: The message to broadcast.
- *topics: Topics to broadcast to.
- dispatcher: A callable that handles the actual message delivery. Custom dispatchers can implement filtering, transformation, or prioritisation using the metadata stored with each subscriber.
Example:
publisher_queue = asyncio.Queue() chat_queue = asyncio.Queue() await pubsub.subscribe(publisher_queue, "chat") await pubsub.subscribe(chat_queue, "chat") await pubsub.broadcast_from( publisher_queue, "User joined the channel", "chat" ) # chat_queue receives ("chat", "User joined the channel") # publisher_queue doesn't receive this message
type Topic =
str
type Message =
Any
type Dispatcher =
Callable[[Topic, Message, Subscribers, Peer | None], NoneType]
type Subscribers =
dict[Peer, dict]
type Peer =
asyncio.queues.Queue
def
synchronous_dispatcher( topic: Topic, message: Message, subscribers: Subscribers, publisher: Peer | None = None):
14def synchronous_dispatcher( 15 topic: Topic, 16 message: Message, 17 subscribers: Subscribers, 18 publisher: Optional[Peer] = None, 19): 20 """ 21 Blocks the caller until all put attempts are made. 22 """ 23 24 def try_put_message(peer: asyncio.Queue, topic: str, message: Message): 25 try: 26 peer.put_nowait((topic, message)) 27 except (asyncio.QueueFull, asyncio.QueueShutDown): 28 # Handle slow consumers – you might want to drop or log 29 pass 30 31 if publisher: # broadcast_from 32 for peer, _metadata in subscribers.items(): 33 if peer is not publisher: 34 try_put_message(peer, topic, message) 35 else: # broadcast 36 for peer in subscribers.keys(): 37 try_put_message(peer, topic, message)
Blocks the caller until all put attempts are made.
def
concurrent_dispatcher( topic: Topic, message: Message, subscribers: Subscribers, publisher: Peer | None = None):
40def concurrent_dispatcher( 41 topic: Topic, 42 message: Message, 43 subscribers: Subscribers, 44 publisher: Optional[Peer] = None, 45): 46 """ 47 Spawns one background put attempt task per subscriber. 48 """ 49 50 async def try_put_message(peer: asyncio.Queue, topic: str, message: Message): 51 try: 52 peer.put_nowait((topic, message)) 53 except (asyncio.QueueFull, asyncio.QueueShutDown): 54 pass 55 56 for peer in subscribers.keys(): 57 if publisher and peer is publisher: 58 continue 59 asyncio.create_task(try_put_message(peer, topic, message))
Spawns one background put attempt task per subscriber.