Skip to content

Commit 354900f

Browse files
committed
protocols/dcutr: Implement Encoder and Decoder for DCUtR prost messages
1 parent ce93ec2 commit 354900f

File tree

4 files changed

+126
-70
lines changed

4 files changed

+126
-70
lines changed

protocols/dcutr/src/protocol.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@
1818
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
1919
// DEALINGS IN THE SOFTWARE.
2020

21+
mod codec;
2122
pub mod inbound;
2223
pub mod outbound;
2324

2425
const PROTOCOL_NAME: &[u8; 13] = b"/libp2p/dcutr";
25-
26-
const MAX_MESSAGE_SIZE_BYTES: usize = 4096;

protocols/dcutr/src/protocol/codec.rs

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Copyright 2022 Protocol Labs.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
use crate::message_proto;
22+
use bytes::BytesMut;
23+
use prost::Message;
24+
use std::io::Cursor;
25+
use thiserror::Error;
26+
use unsigned_varint::codec::UviBytes;
27+
28+
const MAX_MESSAGE_SIZE_BYTES: usize = 4096;
29+
30+
pub struct Codec(UviBytes);
31+
32+
impl Codec {
33+
pub fn new() -> Self {
34+
let mut codec = UviBytes::default();
35+
codec.set_max_len(MAX_MESSAGE_SIZE_BYTES);
36+
Self(codec)
37+
}
38+
}
39+
40+
impl asynchronous_codec::Encoder for Codec {
41+
type Item = message_proto::HolePunch;
42+
type Error = Error;
43+
44+
fn encode(
45+
&mut self,
46+
item: Self::Item,
47+
dst: &mut asynchronous_codec::BytesMut,
48+
) -> Result<(), Self::Error> {
49+
let mut encoded_msg = BytesMut::new();
50+
item.encode(&mut encoded_msg)
51+
.expect("BytesMut to have sufficient capacity.");
52+
self.0
53+
.encode(encoded_msg.freeze(), dst)
54+
.map_err(|e| e.into())
55+
}
56+
}
57+
58+
impl asynchronous_codec::Decoder for Codec {
59+
type Item = message_proto::HolePunch;
60+
type Error = Error;
61+
62+
fn decode(
63+
&mut self,
64+
src: &mut asynchronous_codec::BytesMut,
65+
) -> Result<Option<Self::Item>, Self::Error> {
66+
Ok(self
67+
.0
68+
.decode(src)?
69+
.map(|msg| message_proto::HolePunch::decode(Cursor::new(msg)))
70+
.transpose()?)
71+
}
72+
}
73+
74+
#[derive(Debug, Error)]
75+
pub enum Error {
76+
#[error("Failed to decode response: {0}.")]
77+
Decode(
78+
#[from]
79+
#[source]
80+
prost::DecodeError,
81+
),
82+
#[error("Io error {0}")]
83+
Io(
84+
#[from]
85+
#[source]
86+
std::io::Error,
87+
),
88+
}

protocols/dcutr/src/protocol/inbound.rs

+22-35
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,12 @@
2020

2121
use crate::message_proto::{hole_punch, HolePunch};
2222
use asynchronous_codec::Framed;
23-
use bytes::BytesMut;
2423
use futures::{future::BoxFuture, prelude::*};
2524
use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr};
2625
use libp2p_swarm::NegotiatedSubstream;
27-
use prost::Message;
2826
use std::convert::TryFrom;
29-
use std::io::Cursor;
3027
use std::iter;
3128
use thiserror::Error;
32-
use unsigned_varint::codec::UviBytes;
3329

3430
pub struct Upgrade {}
3531

@@ -48,17 +44,17 @@ impl upgrade::InboundUpgrade<NegotiatedSubstream> for Upgrade {
4844
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
4945

5046
fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future {
51-
let mut codec = UviBytes::default();
52-
codec.set_max_len(super::MAX_MESSAGE_SIZE_BYTES);
53-
let mut substream = Framed::new(substream, codec);
47+
let mut substream = Framed::new(substream, super::codec::Codec::new());
5448

5549
async move {
56-
let msg: bytes::BytesMut = substream
57-
.next()
58-
.await
59-
.ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??;
60-
61-
let HolePunch { r#type, obs_addrs } = HolePunch::decode(Cursor::new(msg))?;
50+
let HolePunch { r#type, obs_addrs } =
51+
substream
52+
.next()
53+
.await
54+
.ok_or(super::codec::Error::Io(std::io::Error::new(
55+
std::io::ErrorKind::UnexpectedEof,
56+
"",
57+
)))??;
6258

6359
let obs_addrs = if obs_addrs.is_empty() {
6460
return Err(UpgradeError::NoAddresses);
@@ -92,7 +88,7 @@ impl upgrade::InboundUpgrade<NegotiatedSubstream> for Upgrade {
9288
}
9389

9490
pub struct PendingConnect {
95-
substream: Framed<NegotiatedSubstream, UviBytes>,
91+
substream: Framed<NegotiatedSubstream, super::codec::Codec>,
9692
remote_obs_addrs: Vec<Multiaddr>,
9793
}
9894

@@ -106,18 +102,15 @@ impl PendingConnect {
106102
obs_addrs: local_obs_addrs.into_iter().map(|a| a.to_vec()).collect(),
107103
};
108104

109-
let mut encoded_msg = BytesMut::new();
110-
msg.encode(&mut encoded_msg)
111-
.expect("BytesMut to have sufficient capacity.");
112-
113-
self.substream.send(encoded_msg.freeze()).await?;
114-
let msg: bytes::BytesMut = self
115-
.substream
116-
.next()
117-
.await
118-
.ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??;
119-
120-
let HolePunch { r#type, .. } = HolePunch::decode(Cursor::new(msg))?;
105+
self.substream.send(msg).await?;
106+
let HolePunch { r#type, .. } =
107+
self.substream
108+
.next()
109+
.await
110+
.ok_or(super::codec::Error::Io(std::io::Error::new(
111+
std::io::ErrorKind::UnexpectedEof,
112+
"",
113+
)))??;
121114

122115
let r#type = hole_punch::Type::from_i32(r#type).ok_or(UpgradeError::ParseTypeField)?;
123116
match r#type {
@@ -131,17 +124,11 @@ impl PendingConnect {
131124

132125
#[derive(Debug, Error)]
133126
pub enum UpgradeError {
134-
#[error("Failed to decode response: {0}.")]
135-
Decode(
136-
#[from]
137-
#[source]
138-
prost::DecodeError,
139-
),
140-
#[error("Io error {0}")]
141-
Io(
127+
#[error("Failed to encode or decode: {0}")]
128+
Codec(
142129
#[from]
143130
#[source]
144-
std::io::Error,
131+
super::codec::Error,
145132
),
146133
#[error("Expected at least one address in reservation.")]
147134
NoAddresses,

protocols/dcutr/src/protocol/outbound.rs

+15-33
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,14 @@
2020

2121
use crate::message_proto::{hole_punch, HolePunch};
2222
use asynchronous_codec::Framed;
23-
use bytes::BytesMut;
2423
use futures::{future::BoxFuture, prelude::*};
2524
use futures_timer::Delay;
2625
use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr};
2726
use libp2p_swarm::NegotiatedSubstream;
28-
use prost::Message;
2927
use std::convert::TryFrom;
30-
use std::io::Cursor;
3128
use std::iter;
3229
use std::time::Instant;
3330
use thiserror::Error;
34-
use unsigned_varint::codec::UviBytes;
3531

3632
pub struct Upgrade {
3733
obs_addrs: Vec<Multiaddr>,
@@ -58,33 +54,29 @@ impl upgrade::OutboundUpgrade<NegotiatedSubstream> for Upgrade {
5854
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
5955

6056
fn upgrade_outbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future {
57+
let mut substream = Framed::new(substream, super::codec::Codec::new());
58+
6159
let msg = HolePunch {
6260
r#type: hole_punch::Type::Connect.into(),
6361
obs_addrs: self.obs_addrs.into_iter().map(|a| a.to_vec()).collect(),
6462
};
6563

66-
let mut encoded_msg = BytesMut::new();
67-
msg.encode(&mut encoded_msg)
68-
.expect("BytesMut to have sufficient capacity.");
69-
70-
let mut codec = UviBytes::default();
71-
codec.set_max_len(super::MAX_MESSAGE_SIZE_BYTES);
72-
let mut substream = Framed::new(substream, codec);
73-
7464
async move {
75-
substream.send(encoded_msg.freeze()).await?;
65+
substream.send(msg).await?;
7666

7767
let sent_time = Instant::now();
7868

79-
let msg: bytes::BytesMut = substream
80-
.next()
81-
.await
82-
.ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??;
69+
let HolePunch { r#type, obs_addrs } =
70+
substream
71+
.next()
72+
.await
73+
.ok_or(super::codec::Error::Io(std::io::Error::new(
74+
std::io::ErrorKind::UnexpectedEof,
75+
"",
76+
)))??;
8377

8478
let rtt = sent_time.elapsed();
8579

86-
let HolePunch { r#type, obs_addrs } = HolePunch::decode(Cursor::new(msg))?;
87-
8880
let r#type = hole_punch::Type::from_i32(r#type).ok_or(UpgradeError::ParseTypeField)?;
8981
match r#type {
9082
hole_punch::Type::Connect => {}
@@ -111,11 +103,7 @@ impl upgrade::OutboundUpgrade<NegotiatedSubstream> for Upgrade {
111103
obs_addrs: vec![],
112104
};
113105

114-
let mut encoded_msg = BytesMut::new();
115-
msg.encode(&mut encoded_msg)
116-
.expect("BytesMut to have sufficient capacity.");
117-
118-
substream.send(encoded_msg.freeze()).await?;
106+
substream.send(msg).await?;
119107

120108
Delay::new(rtt / 2).await;
121109

@@ -131,17 +119,11 @@ pub struct Connect {
131119

132120
#[derive(Debug, Error)]
133121
pub enum UpgradeError {
134-
#[error("Failed to decode response: {0}.")]
135-
Decode(
136-
#[from]
137-
#[source]
138-
prost::DecodeError,
139-
),
140-
#[error("Io error {0}")]
141-
Io(
122+
#[error("Failed to encode or decode: {0}")]
123+
Codec(
142124
#[from]
143125
#[source]
144-
std::io::Error,
126+
super::codec::Error,
145127
),
146128
#[error("Expected 'status' field to be set.")]
147129
MissingStatusField,

0 commit comments

Comments
 (0)