Skip to content

Commit 5d98a5c

Browse files
fix(iroh-gossip): clarify docs and semantics of gossip joined event (#2597)
## Description Improves documentation around the `GossipEvent::Joined` event: It is only emitted once at the beginning of the stream, and the event will not be emitted when awaiting `GossipReceiver::joined`. Also makes sure that the event is actually only emitted once per intent (it potentially could have been emitted multiple times before if the neighbor count first got down to 0 and then up again for `GossipTopics` subscribing inbetween). ## Breaking Changes <!-- Optional, if there are any breaking changes document them, including how to migrate older code. --> ## Notes & open questions Inspired by the discussion in chatmail/core#5860 ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [x] All breaking changes documented. Co-authored-by: Friedel Ziegelmayer <[email protected]>
1 parent a2d2ec6 commit 5d98a5c

File tree

2 files changed

+39
-14
lines changed

2 files changed

+39
-14
lines changed

iroh-gossip/src/net.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -501,8 +501,9 @@ impl Actor {
501501
neighbors,
502502
event_senders,
503503
command_rx_keys,
504+
joined,
504505
} = state;
505-
if !neighbors.is_empty() {
506+
if *joined {
506507
let neighbors = neighbors.iter().copied().collect();
507508
channels
508509
.event_tx
@@ -588,14 +589,15 @@ impl Actor {
588589
continue;
589590
};
590591
let TopicState {
592+
joined,
591593
neighbors,
592594
event_senders,
593595
command_rx_keys,
594596
} = state;
595597
let event = if let ProtoEvent::NeighborUp(neighbor) = event {
596-
let was_empty = neighbors.is_empty();
597598
neighbors.insert(neighbor);
598-
if was_empty {
599+
if !*joined {
600+
*joined = true;
599601
GossipEvent::Joined(vec![neighbor])
600602
} else {
601603
GossipEvent::NeighborUp(neighbor)
@@ -662,6 +664,7 @@ impl Default for PeerState {
662664

663665
#[derive(Debug, Default)]
664666
struct TopicState {
667+
joined: bool,
665668
neighbors: BTreeSet<NodeId>,
666669
event_senders: EventSenders,
667670
command_rx_keys: HashSet<stream_group::Key>,

iroh-gossip/src/net/handles.rs

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::{
88
task::{Context, Poll},
99
};
1010

11-
use anyhow::{anyhow, Result};
11+
use anyhow::{anyhow, Context as _, Result};
1212
use bytes::Bytes;
1313
use futures_lite::{Stream, StreamExt};
1414
use iroh_net::NodeId;
@@ -112,13 +112,15 @@ pub struct GossipReceiver {
112112
#[debug("EventStream")]
113113
stream: EventStream,
114114
neighbors: HashSet<NodeId>,
115+
joined: bool,
115116
}
116117

117118
impl GossipReceiver {
118119
pub(crate) fn new(events_rx: EventStream) -> Self {
119120
Self {
120121
stream: events_rx,
121122
neighbors: Default::default(),
123+
joined: false,
122124
}
123125
}
124126

@@ -128,9 +130,22 @@ impl GossipReceiver {
128130
}
129131

130132
/// Waits until we are connected to at least one node.
133+
///
134+
/// This progresses the stream until we received [`GossipEvent::Joined`], which is the first
135+
/// item emitted on the stream.
136+
///
137+
/// Note that this consumes the [`GossipEvent::Joined`] event. If you want to act on these
138+
/// initial neighbors, use [`Self::neighbors`] after awaiting [`Self::joined`].
131139
pub async fn joined(&mut self) -> Result<()> {
132-
while self.neighbors.is_empty() {
133-
let _ = self.try_next().await?;
140+
if !self.joined {
141+
match self
142+
.try_next()
143+
.await?
144+
.context("Gossip receiver closed before Joined event was received.")?
145+
{
146+
Event::Gossip(GossipEvent::Joined(_)) => {}
147+
_ => anyhow::bail!("Expected Joined event to be the first event received."),
148+
}
134149
}
135150
Ok(())
136151
}
@@ -148,6 +163,7 @@ impl Stream for GossipReceiver {
148163
if let Some(Ok(item)) = &item {
149164
match item {
150165
Event::Gossip(GossipEvent::Joined(neighbors)) => {
166+
self.joined = true;
151167
self.neighbors.extend(neighbors.iter().copied());
152168
}
153169
Event::Gossip(GossipEvent::NeighborUp(node_id)) => {
@@ -163,26 +179,32 @@ impl Stream for GossipReceiver {
163179
}
164180
}
165181

166-
/// Update from a subscribed gossip topic.
182+
/// Events emitted from a gossip topic with a lagging notification.
183+
///
184+
/// This is the item of the [`GossipReceiver`] stream. It wraps the actual gossip events to also
185+
/// provide a notification if we missed gossip events for the topic.
167186
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
168187
pub enum Event {
169-
/// A message was received.
188+
/// We received an event.
170189
Gossip(GossipEvent),
171-
/// We missed some messages.
190+
/// We missed some messages because our [`GossipReceiver`] was not progressing fast enough.
172191
Lagged,
173192
}
174193

175-
/// Gossip event
176-
/// An event to be emitted to the application for a particular topic.
194+
/// Events emitted from a gossip topic.
195+
///
196+
/// These are the events emitted from a [`GossipReceiver`], wrapped in [`Event::Gossip`].
177197
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
178198
pub enum GossipEvent {
179199
/// We joined the topic with at least one peer.
200+
///
201+
/// This is the first event on a [`GossipReceiver`] and will only be emitted once.
180202
Joined(Vec<NodeId>),
181-
/// We have a new, direct neighbor in the swarm membership layer for this topic
203+
/// We have a new, direct neighbor in the swarm membership layer for this topic.
182204
NeighborUp(NodeId),
183-
/// We dropped direct neighbor in the swarm membership layer for this topic
205+
/// We dropped direct neighbor in the swarm membership layer for this topic.
184206
NeighborDown(NodeId),
185-
/// A gossip message was received for this topic
207+
/// We received a gossip message for this topic.
186208
Received(Message),
187209
}
188210

0 commit comments

Comments
 (0)