-
Notifications
You must be signed in to change notification settings - Fork 65
Make Sink
and future-util
optional
#143
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
fb35b2a
dc29f8d
9208c7d
4481a97
814112d
9bb9c2b
18a274b
9ca7965
6fe059f
f010d7a
b0e700a
fb9dcc2
a626706
8b5762b
3202986
abe97c0
2f7c1d6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,17 +58,18 @@ | |
))] | ||
pub mod stream; | ||
|
||
use std::io::{Read, Write}; | ||
use std::{ | ||
io::{Read, Write}, | ||
pin::Pin, | ||
task::{ready, Context, Poll}, | ||
sdroege marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}; | ||
|
||
use compat::{cvt, AllowStd, ContextWaker}; | ||
use futures_core::stream::{FusedStream, Stream}; | ||
use futures_io::{AsyncRead, AsyncWrite}; | ||
use futures_util::{ | ||
sink::{Sink, SinkExt}, | ||
stream::{FusedStream, Stream}, | ||
}; | ||
#[cfg(feature = "sink")] | ||
use futures_util::SinkExt; | ||
use log::*; | ||
use std::pin::Pin; | ||
use std::task::{Context, Poll}; | ||
|
||
#[cfg(feature = "handshake")] | ||
use tungstenite::{ | ||
|
@@ -227,7 +228,7 @@ | |
#[derive(Debug)] | ||
pub struct WebSocketStream<S> { | ||
inner: WebSocket<AllowStd<S>>, | ||
closing: bool, | ||
Check warning on line 231 in src/lib.rs
|
||
ended: bool, | ||
/// Tungstenite is probably ready to receive more data. | ||
/// | ||
|
@@ -337,7 +338,7 @@ | |
return Poll::Ready(None); | ||
} | ||
|
||
match futures_util::ready!(self.with_context(Some((ContextWaker::Read, cx)), |s| { | ||
match ready!(self.with_context(Some((ContextWaker::Read, cx)), |s| { | ||
#[cfg(feature = "verbose-logging")] | ||
trace!( | ||
"{}:{} Stream.with_context poll_next -> read()", | ||
|
@@ -368,7 +369,8 @@ | |
} | ||
} | ||
|
||
impl<T> Sink<Message> for WebSocketStream<T> | ||
#[cfg(feature = "sink")] | ||
impl<T> futures_util::Sink<Message> for WebSocketStream<T> | ||
where | ||
T: AsyncRead + AsyncWrite + Unpin, | ||
{ | ||
|
@@ -446,6 +448,88 @@ | |
} | ||
} | ||
|
||
#[cfg(not(feature = "sink"))] | ||
impl<S> WebSocketStream<S> { | ||
/// Simple send method to replace `futures_sink::Sink` (till v0.3). | ||
pub async fn send(&mut self, msg: Message) -> Result<(), WsError> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't hurt to provide this always, and should make sure that both this and the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think apart from this one it's all ready There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The new function is already used by everything (e.g. the autobahn tests). Maybe you can change it so that the server uses |
||
where | ||
S: AsyncRead + AsyncWrite + Unpin, | ||
{ | ||
Send::new(self, msg).await | ||
} | ||
} | ||
|
||
#[cfg(not(feature = "sink"))] | ||
struct Send<'a, S> { | ||
ws: &'a mut WebSocketStream<S>, | ||
msg: Option<Message>, | ||
} | ||
|
||
#[cfg(not(feature = "sink"))] | ||
impl<'a, S> Send<'a, S> | ||
where | ||
S: AsyncRead + AsyncWrite + Unpin, | ||
{ | ||
fn new(ws: &'a mut WebSocketStream<S>, msg: Message) -> Self { | ||
Self { ws, msg: Some(msg) } | ||
} | ||
} | ||
|
||
#[cfg(not(feature = "sink"))] | ||
impl<S> std::future::Future for Send<'_, S> | ||
where | ||
S: AsyncRead + AsyncWrite + Unpin, | ||
{ | ||
type Output = Result<(), WsError>; | ||
|
||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
if self.msg.is_some() { | ||
if !self.ws.ready { | ||
// Currently blocked so try to flush the blockage away | ||
let polled = self | ||
.ws | ||
.with_context(Some((ContextWaker::Write, cx)), |s| cvt(s.flush())) | ||
.map(|r| { | ||
self.ws.ready = true; | ||
r | ||
}); | ||
ready!(polled)? | ||
} | ||
|
||
let msg = self.msg.take().expect("unreachable"); | ||
match self.ws.with_context(None, |s| s.write(msg)) { | ||
Ok(_) => Ok(()), | ||
Err(WsError::Io(err)) if err.kind() == std::io::ErrorKind::WouldBlock => { | ||
// the message was accepted and queued so not an error | ||
// | ||
// set to false here for cancel safe of *this* Future | ||
stackinspector marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.ws.ready = false; | ||
Ok(()) | ||
} | ||
Err(e) => { | ||
debug!("websocket start_send error: {}", e); | ||
Err(e) | ||
} | ||
}?; | ||
} | ||
|
||
let polled = self | ||
.ws | ||
.with_context(Some((ContextWaker::Write, cx)), |s| cvt(s.flush())) | ||
.map(|r| { | ||
self.ws.ready = true; | ||
match r { | ||
// WebSocket connection has just been closed. Flushing completed, not an error. | ||
Err(WsError::ConnectionClosed) => Ok(()), | ||
other => other, | ||
} | ||
}); | ||
ready!(polled)?; | ||
|
||
Poll::Ready(Ok(())) | ||
} | ||
} | ||
|
||
#[cfg(any( | ||
feature = "async-tls", | ||
feature = "async-std-runtime", | ||
|
Uh oh!
There was an error while loading. Please reload this page.