|
1 |
| -use crate::protocol::{ |
2 |
| - ConnAck, Filter, LastWill, Packet, Publish, QoS, RetainForwardRule, Subscribe, |
3 |
| -}; |
| 1 | +use crate::protocol::{Filter, LastWill, Packet, Publish, QoS, RetainForwardRule, Subscribe}; |
4 | 2 | use crate::router::Ack;
|
5 | 3 | use crate::router::{
|
6 | 4 | iobufs::{Incoming, Outgoing},
|
@@ -127,121 +125,6 @@ impl<'a> LinkBuilder<'a> {
|
127 | 125 | }
|
128 | 126 | }
|
129 | 127 |
|
130 |
| -pub struct Link; |
131 |
| - |
132 |
| -#[deprecated = "Link will be removed soon, please consider using LinkBuilder"] |
133 |
| -impl Link { |
134 |
| - #[allow(clippy::type_complexity)] |
135 |
| - fn prepare( |
136 |
| - tenant_id: Option<String>, |
137 |
| - client_id: &str, |
138 |
| - clean: bool, |
139 |
| - last_will: Option<LastWill>, |
140 |
| - dynamic_filters: bool, |
141 |
| - topic_alias_max: u16, |
142 |
| - ) -> ( |
143 |
| - Event, |
144 |
| - Arc<Mutex<VecDeque<Packet>>>, |
145 |
| - Arc<Mutex<VecDeque<Notification>>>, |
146 |
| - Receiver<()>, |
147 |
| - ) { |
148 |
| - let connection = Connection::new( |
149 |
| - tenant_id, |
150 |
| - client_id.to_owned(), |
151 |
| - clean, |
152 |
| - last_will, |
153 |
| - dynamic_filters, |
154 |
| - topic_alias_max, |
155 |
| - ); |
156 |
| - let incoming = Incoming::new(connection.client_id.to_owned()); |
157 |
| - let (outgoing, link_rx) = Outgoing::new(connection.client_id.to_owned()); |
158 |
| - let outgoing_data_buffer = outgoing.buffer(); |
159 |
| - let incoming_data_buffer = incoming.buffer(); |
160 |
| - |
161 |
| - let event = Event::Connect { |
162 |
| - connection, |
163 |
| - incoming, |
164 |
| - outgoing, |
165 |
| - }; |
166 |
| - |
167 |
| - (event, incoming_data_buffer, outgoing_data_buffer, link_rx) |
168 |
| - } |
169 |
| - |
170 |
| - #[allow(clippy::new_ret_no_self)] |
171 |
| - pub fn new( |
172 |
| - tenant_id: Option<String>, |
173 |
| - client_id: &str, |
174 |
| - router_tx: Sender<(ConnectionId, Event)>, |
175 |
| - clean: bool, |
176 |
| - last_will: Option<LastWill>, |
177 |
| - dynamic_filters: bool, |
178 |
| - topic_alias_max: Option<u16>, |
179 |
| - ) -> Result<(LinkTx, LinkRx, Notification), LinkError> { |
180 |
| - // Connect to router |
181 |
| - // Local connections to the router shall have access to all subscriptions |
182 |
| - |
183 |
| - let (message, i, o, link_rx) = Link::prepare( |
184 |
| - tenant_id, |
185 |
| - client_id, |
186 |
| - clean, |
187 |
| - last_will, |
188 |
| - dynamic_filters, |
189 |
| - topic_alias_max.unwrap_or(0), |
190 |
| - ); |
191 |
| - router_tx.send((0, message))?; |
192 |
| - |
193 |
| - link_rx.recv()?; |
194 |
| - let notification = o.lock().pop_front().unwrap(); |
195 |
| - |
196 |
| - // Right now link identifies failure with dropped rx in router, |
197 |
| - // which is probably ok. We need this here to get id assigned by router |
198 |
| - let id = match notification { |
199 |
| - Notification::DeviceAck(Ack::ConnAck(id, ..)) => id, |
200 |
| - _message => return Err(LinkError::NotConnectionAck), |
201 |
| - }; |
202 |
| - |
203 |
| - let tx = LinkTx::new(id, router_tx.clone(), i); |
204 |
| - let rx = LinkRx::new(id, router_tx, link_rx, o); |
205 |
| - Ok((tx, rx, notification)) |
206 |
| - } |
207 |
| - |
208 |
| - pub async fn init( |
209 |
| - tenant_id: Option<String>, |
210 |
| - client_id: &str, |
211 |
| - router_tx: Sender<(ConnectionId, Event)>, |
212 |
| - clean: bool, |
213 |
| - last_will: Option<LastWill>, |
214 |
| - dynamic_filters: bool, |
215 |
| - topic_alias_max: Option<u16>, |
216 |
| - ) -> Result<(LinkTx, LinkRx, ConnAck), LinkError> { |
217 |
| - // Connect to router |
218 |
| - // Local connections to the router shall have access to all subscriptions |
219 |
| - |
220 |
| - let (message, i, o, link_rx) = Link::prepare( |
221 |
| - tenant_id, |
222 |
| - client_id, |
223 |
| - clean, |
224 |
| - last_will, |
225 |
| - dynamic_filters, |
226 |
| - topic_alias_max.unwrap_or(0), |
227 |
| - ); |
228 |
| - router_tx.send_async((0, message)).await?; |
229 |
| - |
230 |
| - link_rx.recv_async().await?; |
231 |
| - let notification = o.lock().pop_front().unwrap(); |
232 |
| - // Right now link identifies failure with dropped rx in router, |
233 |
| - // which is probably ok. We need this here to get id assigned by router |
234 |
| - let (id, ack) = match notification { |
235 |
| - Notification::DeviceAck(Ack::ConnAck(id, ack, _)) => (id, ack), |
236 |
| - _message => return Err(LinkError::NotConnectionAck), |
237 |
| - }; |
238 |
| - |
239 |
| - let tx = LinkTx::new(id, router_tx.clone(), i); |
240 |
| - let rx = LinkRx::new(id, router_tx, link_rx, o); |
241 |
| - Ok((tx, rx, ack)) |
242 |
| - } |
243 |
| -} |
244 |
| - |
245 | 128 | pub struct LinkTx {
|
246 | 129 | pub(crate) connection_id: ConnectionId,
|
247 | 130 | router_tx: Sender<(ConnectionId, Event)>,
|
|
0 commit comments