Skip to content

Commit 25093ca

Browse files
authored
Builder pattern for constructing Link (bytebeamio#659)
1 parent aeeec87 commit 25093ca

File tree

6 files changed

+120
-28
lines changed

6 files changed

+120
-28
lines changed

rumqttd/CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111
- Support for Websocket connections (#633)
12+
- LinkBuilder for constructing LinkRx/LinkTx (#659)
13+
1214
### Changed
1315

1416
### Deprecated
17+
- Link and its implementation, use LinkBuilder instead
1518

1619
### Removed
1720

rumqttd/src/link/bridge.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,8 @@ use tokio_rustls::{
2626
use tracing::*;
2727

2828
use crate::{
29-
link::{
30-
local::{Link, LinkError},
31-
network::Network,
32-
},
29+
link::{local::LinkError, network::Network},
30+
local::LinkBuilder,
3331
protocol::{self, Connect, Packet, PingReq, Protocol, QoS, RetainForwardRule, Subscribe},
3432
router::Event,
3533
BridgeConfig, ConnectionId, Notification, Transport,
@@ -59,7 +57,9 @@ where
5957
"Starting bridge with subscription on filter \"{}\"",
6058
&config.sub_path,
6159
);
62-
let (mut tx, mut rx, _ack) = Link::new(None, &config.name, router_tx, true, None, true, None)?;
60+
let (mut tx, mut rx, _ack) = LinkBuilder::new(&config.name, router_tx)
61+
.dynamic_filters(true)
62+
.build()?;
6363

6464
'outer: loop {
6565
let mut network = match network_connect(&config, &config.addr, protocol.clone()).await {

rumqttd/src/link/console.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use crate::link::local::{Link, LinkRx};
1+
use crate::link::local::LinkRx;
2+
use crate::local::LinkBuilder;
23
use crate::router::{Event, Print};
34
use crate::{ConnectionId, ConsoleSettings};
45
use axum::extract::{Path, State};
@@ -23,8 +24,10 @@ impl ConsoleLink {
2324
/// Requires the corresponding Router to be running to complete
2425
pub fn new(config: ConsoleSettings, router_tx: Sender<(ConnectionId, Event)>) -> ConsoleLink {
2526
let tx = router_tx.clone();
26-
let (link_tx, link_rx, _ack) =
27-
Link::new(None, "console", tx, true, None, true, None).unwrap();
27+
let (link_tx, link_rx, _ack) = LinkBuilder::new("console", tx)
28+
.dynamic_filters(true)
29+
.build()
30+
.unwrap();
2831
let connection_id = link_tx.connection_id;
2932
ConsoleLink {
3033
config,

rumqttd/src/link/local.rs

+93
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,101 @@ pub enum LinkError {
3535
Elapsed(#[from] tokio::time::error::Elapsed),
3636
}
3737

38+
// used to build LinkTx and LinkRx
39+
pub struct LinkBuilder<'a> {
40+
tenant_id: Option<String>,
41+
client_id: &'a str,
42+
router_tx: Sender<(ConnectionId, Event)>,
43+
// true by default
44+
clean_session: bool,
45+
last_will: Option<LastWill>,
46+
// false by default
47+
dynamic_filters: bool,
48+
// default to 0, indicating to not use topic alias
49+
topic_alias_max: u16,
50+
}
51+
52+
impl<'a> LinkBuilder<'a> {
53+
pub fn new(client_id: &'a str, router_tx: Sender<(ConnectionId, Event)>) -> Self {
54+
LinkBuilder {
55+
client_id,
56+
router_tx,
57+
tenant_id: None,
58+
clean_session: true,
59+
last_will: None,
60+
dynamic_filters: false,
61+
topic_alias_max: 0,
62+
}
63+
}
64+
65+
pub fn tenant_id(mut self, tenant_id: Option<String>) -> Self {
66+
self.tenant_id = tenant_id;
67+
self
68+
}
69+
70+
pub fn last_will(mut self, last_will: Option<LastWill>) -> Self {
71+
self.last_will = last_will;
72+
self
73+
}
74+
75+
pub fn topic_alias_max(mut self, max: u16) -> Self {
76+
self.topic_alias_max = max;
77+
self
78+
}
79+
80+
pub fn clean_session(mut self, clean: bool) -> Self {
81+
self.clean_session = clean;
82+
self
83+
}
84+
85+
pub fn dynamic_filters(mut self, dynamic_filters: bool) -> Self {
86+
self.dynamic_filters = dynamic_filters;
87+
self
88+
}
89+
90+
pub fn build(self) -> Result<(LinkTx, LinkRx, Notification), LinkError> {
91+
// Connect to router
92+
// Local connections to the router shall have access to all subscriptions
93+
let connection = Connection::new(
94+
self.tenant_id,
95+
self.client_id.to_owned(),
96+
self.clean_session,
97+
self.last_will,
98+
self.dynamic_filters,
99+
self.topic_alias_max,
100+
);
101+
let incoming = Incoming::new(connection.client_id.to_owned());
102+
let (outgoing, link_rx) = Outgoing::new(connection.client_id.to_owned());
103+
let outgoing_data_buffer = outgoing.buffer();
104+
let incoming_data_buffer = incoming.buffer();
105+
106+
let event = Event::Connect {
107+
connection,
108+
incoming,
109+
outgoing,
110+
};
111+
112+
self.router_tx.send((0, event))?;
113+
114+
link_rx.recv()?;
115+
let notification = outgoing_data_buffer.lock().pop_front().unwrap();
116+
117+
// Right now link identifies failure with dropped rx in router,
118+
// which is probably ok. We need this here to get id assigned by router
119+
let id = match notification {
120+
Notification::DeviceAck(Ack::ConnAck(id, ..)) => id,
121+
_message => return Err(LinkError::NotConnectionAck),
122+
};
123+
124+
let tx = LinkTx::new(id, self.router_tx.clone(), incoming_data_buffer);
125+
let rx = LinkRx::new(id, self.router_tx, link_rx, outgoing_data_buffer);
126+
Ok((tx, rx, notification))
127+
}
128+
}
129+
38130
pub struct Link;
39131

132+
#[deprecated = "Link will be removed soon, please consider using LinkBuilder"]
40133
impl Link {
41134
#[allow(clippy::type_complexity)]
42135
fn prepare(

rumqttd/src/link/remote.rs

+9-10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
use crate::link::local::{Link, LinkError, LinkRx, LinkTx};
1+
use crate::link::local::{LinkError, LinkRx, LinkTx};
22
use crate::link::network;
33
use crate::link::network::Network;
4+
use crate::local::LinkBuilder;
45
use crate::protocol::{Connect, Packet, Protocol};
56
use crate::router::{Event, Notification};
67
use crate::{ConnectionId, ConnectionSettings};
@@ -121,15 +122,13 @@ impl<P: Protocol> RemoteLink<P> {
121122

122123
let topic_alias_max = props.and_then(|p| p.topic_alias_max);
123124

124-
let (link_tx, link_rx, notification) = Link::new(
125-
tenant_id,
126-
&client_id,
127-
router_tx,
128-
clean_session,
129-
lastwill,
130-
dynamic_filters,
131-
topic_alias_max,
132-
)?;
125+
let (link_tx, link_rx, notification) = LinkBuilder::new(&client_id, router_tx)
126+
.tenant_id(tenant_id)
127+
.clean_session(clean_session)
128+
.last_will(lastwill)
129+
.dynamic_filters(dynamic_filters)
130+
.topic_alias_max(topic_alias_max.unwrap_or(0))
131+
.build()?;
133132

134133
let id = link_rx.id();
135134
Span::current().record("connection_id", id);

rumqttd/src/server/broker.rs

+4-10
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::link::console::ConsoleLink;
33
use crate::link::network::{Network, N};
44
use crate::link::remote::{self, RemoteLink};
55
use crate::link::{bridge, timer};
6+
use crate::local::LinkBuilder;
67
use crate::protocol::v4::V4;
78
use crate::protocol::v5::V5;
89
use crate::protocol::Protocol;
@@ -31,7 +32,7 @@ use std::time::Duration;
3132
use std::{io, thread};
3233

3334
use crate::link::console;
34-
use crate::link::local::{self, Link, LinkRx, LinkTx};
35+
use crate::link::local::{self, LinkRx, LinkTx};
3536
use crate::router::{Disconnection, Event, Router};
3637
use crate::{Config, ConnectionId, ServerSettings};
3738
use tokio::net::{TcpListener, TcpStream};
@@ -144,15 +145,8 @@ impl Broker {
144145
pub fn link(&self, client_id: &str) -> Result<(LinkTx, LinkRx), local::LinkError> {
145146
// Register this connection with the router. Router replies with ack which if ok will
146147
// start the link. Router can sometimes reject the connection (ex. max connection limit).
147-
let (link_tx, link_rx, _ack) = Link::new(
148-
None,
149-
client_id,
150-
self.router_tx.clone(),
151-
true,
152-
None,
153-
false,
154-
None,
155-
)?;
148+
let (link_tx, link_rx, _ack) =
149+
LinkBuilder::new(client_id, self.router_tx.clone()).build()?;
156150
Ok((link_tx, link_rx))
157151
}
158152

0 commit comments

Comments
 (0)