Description
I'm currently working on bidirectional remote procedure calls over trio sockets. The abstraction I've been using works like this:
A task permanently awaits incoming bytes and sends them through a memory channel.
This memory channel is the "publisher": it is created with math.inf as its buffer size, so send never blocks.
Anywhere in the code I can then create a "subscriber" with a matcher function. The matcher is run against every message sent through the publisher, and each message that matches is forwarded to the subscriber's own channel.
I created a small package, triopatterns, that calls this abstraction AsyncQueue. I think the implementation is neat: it ends up as a tree-like structure of AsyncQueues.
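The dispatch idea described above can be sketched without trio. This is a minimal model under stated assumptions, not the triopatterns implementation: ToyPublisher and its methods are hypothetical names, and each deque stands in for a subscriber's own memory channel (in trio, an unbounded channel would come from trio.open_memory_channel(math.inf)).

```python
from collections import deque


class ToyPublisher:
    """Hypothetical model: route each message to every subscriber whose matcher accepts it."""

    def __init__(self):
        self.subscribers = []  # list of (matcher, buffer) pairs

    def subscribe(self, matcher):
        buffer = deque()  # unbounded, so appending never blocks
        self.subscribers.append((matcher, buffer))
        return buffer

    def publish(self, msg):
        # Every subscriber sees every message; only matches are forwarded.
        for matcher, buffer in self.subscribers:
            if matcher(msg):
                buffer.append(msg)


hub = ToyPublisher()
hello_box = hub.subscribe(lambda *args: b"hello" in args[0])
ping_box = hub.subscribe(lambda *args: args[0].startswith(b"ping"))

for packet in (b"hello world!", b"ping 1", b"unrelated", b"ping hello"):
    hub.publish(packet)
```

A message can land in more than one subscriber buffer (b"ping hello" matches both here), and a message nobody matches is simply dropped in this model.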
For a real-world example, see boxer, a UDP hole punching client/server.
Quick API reference:
from triopatterns import AsyncQueue
inbound = AsyncQueue()
Anywhere in your program, create a subscriber:
def matcher(*args):
    return b"hello" in args[0]

# msg_queue will receive all messages that contain the byte sequence "hello"
async with inbound.subscribe(matcher) as msg_queue:
    msg = await msg_queue.receive()
Send a message through:
await inbound.send(b"hello world!")
You can also use lambdas for subscriber matcher callbacks:
async with inbound.subscribe(
    lambda *args: b"hello" in args[0]
) as msg_queue:
    msg = await msg_queue.receive()
To match a specific message, you can pass extra arguments to the matcher through args:
def matcher(*args):
    return args[0] == args[1]

async with inbound.subscribe(
    matcher,
    args=[b"7"]
) as msg_queue:
    msg = await msg_queue.receive()
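To make the argument order concrete, here is a small sketch of how the extra arguments could reach the matcher. It assumes the message is passed first and the values from args follow it; call_matcher is a hypothetical helper for illustration, not part of triopatterns.

```python
def matcher(*args):
    # args[0] is the incoming message, args[1] the extra value to compare against
    return args[0] == args[1]


def call_matcher(matcher, msg, extra_args=()):
    """Hypothetical helper: call the matcher with the message first, then the extras."""
    return bool(matcher(msg, *extra_args))


# Only the message that is exactly b"7" matches.
results = [call_matcher(matcher, msg, [b"7"]) for msg in (b"7", b"8", b"77")]
```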
If you need to transform every matched message before it is delivered, use modifiers. A modifier returns a (matched, value) tuple:
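def str_matcher(*args):
    try:
        # str(args[0]) alone would return the repr "b'...'" and never raise,
        # so decode the bytes explicitly
        return (True, str(args[0], "utf-8"))
    except UnicodeDecodeError:
        return (False, None)

async with inbound.modify(str_matcher) as msg_queue:
    # msg will be of type str
    msg = await msg_queue.receive()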
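The modifier contract can be checked in isolation. The sketch below assumes messages are bytes and that a modifier returning (False, None) means the message is not delivered; deliver_modified is a hypothetical helper that stands in for what the queue would do with each result.

```python
def str_modifier(*args):
    try:
        return (True, args[0].decode("utf-8"))
    except UnicodeDecodeError:
        return (False, None)


def deliver_modified(modifier, messages):
    """Hypothetical helper: keep the transformed value of every accepted message."""
    delivered = []
    for msg in messages:
        matched, value = modifier(msg)
        if matched:
            delivered.append(value)
    return delivered


# b"\xff\xfe" is not valid UTF-8, so it is skipped rather than delivered.
out = deliver_modified(str_modifier, [b"hello", b"\xff\xfe", b"caf\xc3\xa9"])
```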