Skip to content

Extending memory channel API, adding lightweight pub-sub #1503

Open
@guilledk

Description

@guilledk

I'm currently working on bidirectional remote procedure call using trio sockets. The abstraction I've been using works like this:

A task is permanently awaiting incoming bytes and sending them through a memory channel.
This memory channel is a special channel that never blocks on send (using math.inf on the constructor) this is the "publisher".
Then anywhere in the code I can create a "subscriber" that will match every message sent through the channel, and if matched will send it through its own channel.

I created a small package calling this abstraction AsyncQueue. The implementation is cool I think, it ends up like a tree-like structure of AsyncQueues.
Also for a real world example boxer, is a UDP Hole punching client/server.

Quick API reference:

from triopatterns import AsyncQueue

inbound = AsyncQueue()

Anywere in your program create a subscriber:

def matcher(*args):
    return b"hello" in args[0]

# msg_queue will receive all messages that contain the byte sequence "hello"
async with inbound.subscribe(matcher) as msg_queue:
    msg = await msg_queue.receive()

Send a message through:

await inbound.send(b"hello world!")

You can also use lambdas for subscriber matcher callbacks:

async with inbound.subscribe(
    lambda *args: b"hello" in args[0]
        ) as msg_queue:
    msg = await msg_queue.receive()

To match a specific message you can pass more information to the matcher

def matcher(*args):
    return args[0] == args[1]

async with inbound.subscribe(
    matcher,
    args=[b"7"]
        ) as msg_queue:
    msg = await msg_queue.receive()

In the case you need to apply a modification to every message sent when matched, use modifiers:

def str_matcher(*args):
    try:
        return (True, str(args[0]))
    except UnicodeDecodeError:
        return (False, None)

async with inbound.modify(str_matcher) as msg_queue:
    # msg will be of type str
    msg = await msg_queue.receive()

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions