
Broadcast channels #987

Open
@njsmith


[This is a speculative discussion. I don't know if we should do anything here. But I was thinking about it so I figured I'd write it down so it doesn't get lost.]

There are two kinds of multi-consumer channels: the kind where each item is distributed to one receiver (e.g., by round-robin), and the kind where each item is distributed to all receivers ("broadcast"). We already have good support for the former, via memory channels. Implementing broadcast is trickier, though, because you need to pick some policy for handling the case where consumers run at different speeds. I guess the basic options are:

  • Apply backpressure based on the slowest consumer: this is appropriate if all the receivers are working together in some sense, and there's no point in letting one consumer get too far ahead of the others. It's straightforward to implement by using a collection of memory channels with bounded buffers, like:

    async def broadcast_send(self, value):
        # Each send blocks while that channel's bounded buffer is full,
        # so the slowest receiver sets the pace for everyone.
        for channel in self._channels:
            await channel.send(value)

  • Apply backpressure based on the fastest consumer: this is intuitively tempting in cases where the receivers are unrelated, but it requires unbounded buffer space, so... not necessarily a good idea. It can be approximated by re-using our broadcast_send from the previous bullet point, but with infinite buffers. However, this disables backpressure entirely, rather than applying backpressure based on the fastest receiver. Doing this precisely would require some more work. I'm not sure if it's ever useful.

  • Instead of applying backpressure, drop messages: The intuition here is that one common case is where you have a value that updates occasionally, and subscribers want to keep track of the latest version. However, if multiple updates happen in quick succession while a subscriber isn't looking, then they only care about the latest value. One possible API:

    class BroadcastValue(Generic[T]):
        def set(self, value: T) -> None:
            ...
    
        def subscribe(self) -> ReceiveChannel[T]:
            ...

    The semantics of the ReceiveChannel would be: the first time you call receive (or __anext__), it immediately returns the current value, and the channel object records which value this particular channel returned last. After that, each call to receive waits until set has been called at least once since this channel last returned a value, and then returns the latest value. So in practice, you'd write:

    async for current_value in broadcast_value.subscribe():
        print("Latest update:", current_value)

    and if there's a fast update rate, then the loop might skip over some intermediate values, but if the updates stop then the loop will always catch up with the latest value.
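To make the drop-messages semantics concrete, here is a minimal sketch of how BroadcastValue might behave. It is written against asyncio purely so that it runs on the standard library alone; it is not a Trio implementation. Several things here are assumptions that do not appear in the proposal above: the constructor taking an initial value, the internal version counter, and subscribe() being an async generator standing in for a real ReceiveChannel.

```python
import asyncio
from typing import AsyncIterator, Generic, List, TypeVar

T = TypeVar("T")


class BroadcastValue(Generic[T]):
    def __init__(self, initial: T) -> None:
        # Assumption: the value always exists, so a new subscriber
        # can be handed something immediately.
        self._value = initial
        self._version = 0  # bumped on every set()
        self._changed = asyncio.Event()

    def set(self, value: T) -> None:
        self._value = value
        self._version += 1
        # Wake everyone waiting on the current event, then install a
        # fresh one for the next round of waiters.
        self._changed.set()
        self._changed = asyncio.Event()

    async def subscribe(self) -> AsyncIterator[T]:
        # Hypothetical stand-in for ReceiveChannel[T]. `seen` is the
        # per-subscriber state: the version this subscriber returned last.
        seen = -1
        while True:
            while seen == self._version:
                await self._changed.wait()
            seen = self._version
            yield self._value


async def demo() -> List[int]:
    value = BroadcastValue(0)
    received: List[int] = []

    async def reader() -> None:
        async for current in value.subscribe():
            received.append(current)
            if current == 3:
                break

    task = asyncio.create_task(reader())
    await asyncio.sleep(0)  # let the reader take the initial value and block
    # Three updates arrive before the reader gets to run again.
    value.set(1)
    value.set(2)
    value.set(3)
    await task
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the demo, the reader receives the initial value, misses the intermediate updates 1 and 2 because they happen while it isn't looking, and then catches up with 3. Memory use per subscriber is a single integer, which is the main attraction compared with buffering every update.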

Are there any other interesting policies that I'm missing?

There's some discussion of related ideas in #637.
