Skip to content

Commit e0684ab

Browse files
authored
Merge pull request #19 from elenaf9/quic/multiple-endpoints-2
transports/quic: Refactor listener handling (updated)
2 parents 8aff243 + bf06d96 commit e0684ab

File tree

118 files changed

+3239
-2531
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

118 files changed

+3239
-2531
lines changed

.github/workflows/ci.yml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020
steps:
2121

2222
- name: Cancel Previous Runs
23-
uses: styfle/cancel-workflow-action@a40b8845c0683271d9f53dfcb887a7e181d3918b # 0.9.1
23+
uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0
2424
with:
2525
access_token: ${{ github.token }}
2626

@@ -52,7 +52,7 @@ jobs:
5252
steps:
5353

5454
- name: Cancel Previous Runs
55-
uses: styfle/cancel-workflow-action@a40b8845c0683271d9f53dfcb887a7e181d3918b # 0.9.1
55+
uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0
5656
with:
5757
access_token: ${{ github.token }}
5858

@@ -87,7 +87,7 @@ jobs:
8787
steps:
8888

8989
- name: Cancel Previous Runs
90-
uses: styfle/cancel-workflow-action@a40b8845c0683271d9f53dfcb887a7e181d3918b # 0.9.1
90+
uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0
9191
with:
9292
access_token: ${{ github.token }}
9393

@@ -109,7 +109,7 @@ jobs:
109109
steps:
110110

111111
- name: Cancel Previous Runs
112-
uses: styfle/cancel-workflow-action@a40b8845c0683271d9f53dfcb887a7e181d3918b # 0.9.1
112+
uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0
113113
with:
114114
access_token: ${{ github.token }}
115115

@@ -135,7 +135,7 @@ jobs:
135135
steps:
136136

137137
- name: Cancel Previous Runs
138-
uses: styfle/cancel-workflow-action@a40b8845c0683271d9f53dfcb887a7e181d3918b # 0.9.1
138+
uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0
139139
with:
140140
access_token: ${{ github.token }}
141141

@@ -157,7 +157,7 @@ jobs:
157157
steps:
158158

159159
- name: Cancel Previous Runs
160-
uses: styfle/cancel-workflow-action@a40b8845c0683271d9f53dfcb887a7e181d3918b # 0.9.1
160+
uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0
161161
with:
162162
access_token: ${{ github.token }}
163163

CHANGELOG.md

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,35 @@
4343

4444
# `libp2p` facade crate
4545

46-
# 0.46.0 [unreleased]
46+
# 0.47.0 [unreleased]
47+
48+
- Update to [`libp2p-dcutr` `v0.5.0`](protocols/dcutr/CHANGELOG.md#050).
49+
50+
- Update to [`libp2p-rendezvous` `v0.8.0`](protocols/rendezvous/CHANGELOG.md#080).
51+
52+
- Update to [`libp2p-ping` `v0.38.0`](protocols/ping/CHANGELOG.md#0380).
53+
54+
- Update to [`libp2p-identify` `v0.38.0`](protocols/identify/CHANGELOG.md#0380).
55+
56+
- Update to [`libp2p-floodsub` `v0.38.0`](protocols/floodsub/CHANGELOG.md#0380).
57+
58+
- Update to [`libp2p-relay` `v0.11.0`](protocols/relay/CHANGELOG.md#0110).
59+
60+
- Update to [`libp2p-metrics` `v0.8.0`](misc/metrics/CHANGELOG.md#080).
61+
62+
- Update to [`libp2p-kad` `v0.39.0`](protocols/kad/CHANGELOG.md#0390).
63+
64+
- Update to [`libp2p-autonat` `v0.6.0`](protocols/autonat/CHANGELOG.md#060).
65+
66+
- Update to [`libp2p-request-response` `v0.20.0`](protocols/request-response/CHANGELOG.md#0200).
67+
68+
- Update to [`libp2p-swarm` `v0.38.0`](swarm/CHANGELOG.md#0380).
69+
70+
# 0.46.1
71+
72+
- Update to `libp2p-derive` [`v0.28.0`](swarm-derive/CHANGELOG.md#0280).
73+
74+
# 0.46.0
4775

4876
- Semver bump Rust from `1.56.1` to `1.60.0` . See [PR 2646].
4977
- Added weak dependencies for features. See [PR 2646].

Cargo.toml

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ default = [
3636
"websocket",
3737
"yamux",
3838
]
39+
3940
autonat = ["dep:libp2p-autonat"]
4041
dcutr = ["dep:libp2p-dcutr", "libp2p-metrics?/dcutr"]
4142
deflate = ["dep:libp2p-deflate"]
@@ -78,23 +79,23 @@ getrandom = "0.2.3" # Explicit dependency to be used in `wasm-bindgen` feature
7879
instant = "0.1.11" # Explicit dependency to be used in `wasm-bindgen` feature
7980
lazy_static = "1.2"
8081

81-
libp2p-autonat = { version = "0.5.0", path = "protocols/autonat", optional = true }
82+
libp2p-autonat = { version = "0.6.0", path = "protocols/autonat", optional = true }
8283
libp2p-core = { version = "0.34.0", path = "core", default-features = false }
83-
libp2p-dcutr = { version = "0.4.0", path = "protocols/dcutr", optional = true }
84-
libp2p-floodsub = { version = "0.37.0", path = "protocols/floodsub", optional = true }
85-
libp2p-identify = { version = "0.37.0", path = "protocols/identify", optional = true }
86-
libp2p-kad = { version = "0.38.0", path = "protocols/kad", optional = true }
87-
libp2p-metrics = { version = "0.7.0", path = "misc/metrics", optional = true }
84+
libp2p-dcutr = { version = "0.5.0", path = "protocols/dcutr", optional = true }
85+
libp2p-floodsub = { version = "0.38.0", path = "protocols/floodsub", optional = true }
86+
libp2p-identify = { version = "0.38.0", path = "protocols/identify", optional = true }
87+
libp2p-kad = { version = "0.39.0", path = "protocols/kad", optional = true }
88+
libp2p-metrics = { version = "0.8.0", path = "misc/metrics", optional = true }
8889
libp2p-mplex = { version = "0.34.0", path = "muxers/mplex", optional = true }
8990
libp2p-noise = { version = "0.37.0", path = "transports/noise", optional = true }
90-
libp2p-ping = { version = "0.37.0", path = "protocols/ping", optional = true }
91+
libp2p-ping = { version = "0.38.0", path = "protocols/ping", optional = true }
9192
libp2p-plaintext = { version = "0.34.0", path = "transports/plaintext", optional = true }
9293
libp2p-pnet = { version = "0.22.0", path = "transports/pnet", optional = true }
93-
libp2p-relay = { version = "0.10.0", path = "protocols/relay", optional = true }
94-
libp2p-rendezvous = { version = "0.7.0", path = "protocols/rendezvous", optional = true }
95-
libp2p-request-response = { version = "0.19.0", path = "protocols/request-response", optional = true }
96-
libp2p-swarm = { version = "0.37.0", path = "swarm" }
97-
libp2p-swarm-derive = { version = "0.27.0", path = "swarm-derive" }
94+
libp2p-relay = { version = "0.11.0", path = "protocols/relay", optional = true }
95+
libp2p-rendezvous = { version = "0.8.0", path = "protocols/rendezvous", optional = true }
96+
libp2p-request-response = { version = "0.20.0", path = "protocols/request-response", optional = true }
97+
libp2p-swarm = { version = "0.38.0", path = "swarm" }
98+
libp2p-swarm-derive = { version = "0.28.0", path = "swarm-derive" }
9899
libp2p-uds = { version = "0.33.0", path = "transports/uds", optional = true }
99100
libp2p-wasm-ext = { version = "0.34.0", path = "transports/wasm-ext", default-features = false, optional = true }
100101
libp2p-yamux = { version = "0.38.0", path = "muxers/yamux", optional = true }
@@ -107,13 +108,13 @@ smallvec = "1.6.1"
107108
[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
108109
libp2p-deflate = { version = "0.34.0", path = "transports/deflate", optional = true }
109110
libp2p-dns = { version = "0.34.0", path = "transports/dns", optional = true, default-features = false }
110-
libp2p-mdns = { version = "0.38.0", path = "protocols/mdns", optional = true }
111+
libp2p-mdns = { version = "0.39.0", path = "protocols/mdns", optional = true }
111112
libp2p-quic = { version = "0.7.0", path = "transports/quic", optional = true }
112113
libp2p-tcp = { version = "0.34.0", path = "transports/tcp", default-features = false, optional = true }
113114
libp2p-websocket = { version = "0.36.0", path = "transports/websocket", optional = true }
114115

115116
[target.'cfg(not(target_os = "unknown"))'.dependencies]
116-
libp2p-gossipsub = { version = "0.39.0", path = "protocols/gossipsub", optional = true }
117+
libp2p-gossipsub = { version = "0.40.0", path = "protocols/gossipsub", optional = true }
117118

118119
[dev-dependencies]
119120
async-std = { version = "1.6.2", features = ["attributes"] }

core/CHANGELOG.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
1-
# 0.34.0 - unreleased
1+
# 0.34.0
22

33
- Introduce `StreamMuxerEvent::map_inbound_stream`. See [PR 2691].
44
- Remove `{read,write,flush,shutdown,destroy}_substream` functions from `StreamMuxer` trait
55
in favor of forcing `StreamMuxer::Substream` to implement `AsyncRead + AsyncWrite`. See [PR 2707].
6+
- Replace `Into<std::io::Error>` bound on `StreamMuxer::Error` with `std::error::Error`. See [PR 2710].
7+
8+
- Remove the concept of individual `Transport::Listener` streams from `Transport`.
9+
Instead the `Transport` is polled directly via `Transport::poll`. The
10+
`Transport` is now responsible for driving its listeners. See [PR 2652].
611

712
[PR 2691]: https://github.com/libp2p/rust-libp2p/pull/2691
813
[PR 2707]: https://github.com/libp2p/rust-libp2p/pull/2707
14+
[PR 2710]: https://github.com/libp2p/rust-libp2p/pull/2710
15+
[PR 2652]: https://github.com/libp2p/rust-libp2p/pull/2652
916

1017
# 0.33.0
1118

core/src/connection.rs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -43,25 +43,6 @@ impl std::ops::Add<usize> for ConnectionId {
4343
}
4444
}
4545

46-
/// The ID of a single listener.
47-
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
48-
pub struct ListenerId(u64);
49-
50-
impl ListenerId {
51-
/// Creates a `ListenerId` from a non-negative integer.
52-
pub fn new(id: u64) -> Self {
53-
Self(id)
54-
}
55-
}
56-
57-
impl std::ops::Add<u64> for ListenerId {
58-
type Output = Self;
59-
60-
fn add(self, other: u64) -> Self {
61-
Self(self.0 + other)
62-
}
63-
}
64-
6546
/// The endpoint roles associated with a peer-to-peer communication channel.
6647
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
6748
pub enum Endpoint {

core/src/either.rs

Lines changed: 53 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
use crate::{
2222
muxing::{StreamMuxer, StreamMuxerEvent},
23-
transport::{ListenerEvent, Transport, TransportError},
23+
transport::{ListenerId, Transport, TransportError, TransportEvent},
2424
Multiaddr, ProtocolName,
2525
};
2626
use futures::{
@@ -203,7 +203,7 @@ where
203203
{
204204
type Substream = EitherOutput<A::Substream, B::Substream>;
205205
type OutboundSubstream = EitherOutbound<A, B>;
206-
type Error = io::Error;
206+
type Error = EitherError<A::Error, B::Error>;
207207

208208
fn poll_event(
209209
&self,
@@ -212,11 +212,11 @@ where
212212
match self {
213213
EitherOutput::First(inner) => inner
214214
.poll_event(cx)
215-
.map_err(|e| e.into())
215+
.map_err(EitherError::A)
216216
.map_ok(|event| event.map_inbound_stream(EitherOutput::First)),
217217
EitherOutput::Second(inner) => inner
218218
.poll_event(cx)
219-
.map_err(|e| e.into())
219+
.map_err(EitherError::B)
220220
.map_ok(|event| event.map_inbound_stream(EitherOutput::Second)),
221221
}
222222
}
@@ -237,11 +237,11 @@ where
237237
(EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner
238238
.poll_outbound(cx, substream)
239239
.map(|p| p.map(EitherOutput::First))
240-
.map_err(|e| e.into()),
240+
.map_err(EitherError::A),
241241
(EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner
242242
.poll_outbound(cx, substream)
243243
.map(|p| p.map(EitherOutput::Second))
244-
.map_err(|e| e.into()),
244+
.map_err(EitherError::B),
245245
_ => panic!("Wrong API usage"),
246246
}
247247
}
@@ -261,8 +261,8 @@ where
261261

262262
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
263263
match self {
264-
EitherOutput::First(inner) => inner.poll_close(cx).map_err(|e| e.into()),
265-
EitherOutput::Second(inner) => inner.poll_close(cx).map_err(|e| e.into()),
264+
EitherOutput::First(inner) => inner.poll_close(cx).map_err(EitherError::A),
265+
EitherOutput::Second(inner) => inner.poll_close(cx).map_err(EitherError::B),
266266
}
267267
}
268268
}
@@ -274,48 +274,6 @@ pub enum EitherOutbound<A: StreamMuxer, B: StreamMuxer> {
274274
B(B::OutboundSubstream),
275275
}
276276

277-
/// Implements `Stream` and dispatches all method calls to either `First` or `Second`.
278-
#[pin_project(project = EitherListenStreamProj)]
279-
#[derive(Debug, Copy, Clone)]
280-
#[must_use = "futures do nothing unless polled"]
281-
pub enum EitherListenStream<A, B> {
282-
First(#[pin] A),
283-
Second(#[pin] B),
284-
}
285-
286-
impl<AStream, BStream, AInner, BInner, AError, BError> Stream
287-
for EitherListenStream<AStream, BStream>
288-
where
289-
AStream: TryStream<Ok = ListenerEvent<AInner, AError>, Error = AError>,
290-
BStream: TryStream<Ok = ListenerEvent<BInner, BError>, Error = BError>,
291-
{
292-
type Item = Result<
293-
ListenerEvent<EitherFuture<AInner, BInner>, EitherError<AError, BError>>,
294-
EitherError<AError, BError>,
295-
>;
296-
297-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
298-
match self.project() {
299-
EitherListenStreamProj::First(a) => match TryStream::try_poll_next(a, cx) {
300-
Poll::Pending => Poll::Pending,
301-
Poll::Ready(None) => Poll::Ready(None),
302-
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le
303-
.map(EitherFuture::First)
304-
.map_err(EitherError::A)))),
305-
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::A(err)))),
306-
},
307-
EitherListenStreamProj::Second(a) => match TryStream::try_poll_next(a, cx) {
308-
Poll::Pending => Poll::Pending,
309-
Poll::Ready(None) => Poll::Ready(None),
310-
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le
311-
.map(EitherFuture::Second)
312-
.map_err(EitherError::B)))),
313-
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::B(err)))),
314-
},
315-
}
316-
}
317-
}
318-
319277
/// Implements `Future` and dispatches all method calls to either `First` or `Second`.
320278
#[pin_project(project = EitherFutureProj)]
321279
#[derive(Debug, Copy, Clone)]
@@ -385,11 +343,12 @@ impl<A: ProtocolName, B: ProtocolName> ProtocolName for EitherName<A, B> {
385343
}
386344
}
387345
}
388-
389-
#[derive(Debug, Copy, Clone)]
346+
#[pin_project(project = EitherTransportProj)]
347+
#[derive(Debug)]
348+
#[must_use = "transports do nothing unless polled"]
390349
pub enum EitherTransport<A, B> {
391-
Left(A),
392-
Right(B),
350+
Left(#[pin] A),
351+
Right(#[pin] B),
393352
}
394353

395354
impl<A, B> Transport for EitherTransport<A, B>
@@ -399,29 +358,54 @@ where
399358
{
400359
type Output = EitherOutput<A::Output, B::Output>;
401360
type Error = EitherError<A::Error, B::Error>;
402-
type Listener = EitherListenStream<A::Listener, B::Listener>;
403361
type ListenerUpgrade = EitherFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
404362
type Dial = EitherFuture<A::Dial, B::Dial>;
405363

406-
fn listen_on(
407-
&mut self,
408-
addr: Multiaddr,
409-
) -> Result<Self::Listener, TransportError<Self::Error>> {
410-
use TransportError::*;
411-
match self {
412-
EitherTransport::Left(a) => match a.listen_on(addr) {
413-
Ok(listener) => Ok(EitherListenStream::First(listener)),
414-
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
415-
Err(Other(err)) => Err(Other(EitherError::A(err))),
364+
fn poll(
365+
self: Pin<&mut Self>,
366+
cx: &mut Context<'_>,
367+
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
368+
match self.project() {
369+
EitherTransportProj::Left(a) => match a.poll(cx) {
370+
Poll::Pending => Poll::Pending,
371+
Poll::Ready(event) => Poll::Ready(
372+
event
373+
.map_upgrade(EitherFuture::First)
374+
.map_err(EitherError::A),
375+
),
416376
},
417-
EitherTransport::Right(b) => match b.listen_on(addr) {
418-
Ok(listener) => Ok(EitherListenStream::Second(listener)),
419-
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
420-
Err(Other(err)) => Err(Other(EitherError::B(err))),
377+
EitherTransportProj::Right(b) => match b.poll(cx) {
378+
Poll::Pending => Poll::Pending,
379+
Poll::Ready(event) => Poll::Ready(
380+
event
381+
.map_upgrade(EitherFuture::Second)
382+
.map_err(EitherError::B),
383+
),
421384
},
422385
}
423386
}
424387

388+
fn remove_listener(&mut self, id: ListenerId) -> bool {
389+
match self {
390+
EitherTransport::Left(t) => t.remove_listener(id),
391+
EitherTransport::Right(t) => t.remove_listener(id),
392+
}
393+
}
394+
395+
fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
396+
use TransportError::*;
397+
match self {
398+
EitherTransport::Left(a) => a.listen_on(addr).map_err(|e| match e {
399+
MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr),
400+
Other(err) => Other(EitherError::A(err)),
401+
}),
402+
EitherTransport::Right(b) => b.listen_on(addr).map_err(|e| match e {
403+
MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr),
404+
Other(err) => Other(EitherError::B(err)),
405+
}),
406+
}
407+
}
408+
425409
fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
426410
use TransportError::*;
427411
match self {

0 commit comments

Comments
 (0)