Skip to content

Commit ce25de5

Browse files
huntcranile
andauthored
Provides an EventSource implementation (#246)
* Provides an EventSource implementation Modelled on the WebSocket API. * PR feedback * Removes an unnecessary dependency * Adds links to MDN * Remove the interior mutability * Doc improvement * Renamed test function to describe what it actually tests * We can be sure of the type - no need to check * Allow the lack of copy * Streams per subscription * Corrected error handling * Permit clone and explain closing behaviour * See that subscribed streams are properly closed I had previously assumed that the event source would fire an error event on close, but it only does it on open according to the docs. We therefore fire the error event explicitly so that the subscribed streams know when to shut down. * Address review comments * Formatting Co-authored-by: Muhammad Hamza <[email protected]>
1 parent 1926e4e commit ce25de5

File tree

8 files changed

+357
-11
lines changed

8 files changed

+357
-11
lines changed

.github/workflows/tests.yml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ jobs:
130130
ports:
131131
- 8080:80
132132
echo_server:
133-
image: jmalloc/echo-server@sha256:c461e7e54d947a8777413aaf9c624b4ad1f1bac5d8272475da859ae82c1abd7d
133+
image: jmalloc/echo-server@sha256:e43a10c9ecbd025df7ed6dac1e45551ce7bd676142600b0734fe7dcd10a47abe
134134
ports:
135135
- 8081:8080
136136

@@ -160,7 +160,8 @@ jobs:
160160
- name: Run browser tests
161161
env:
162162
HTTPBIN_URL: "http://localhost:8080"
163-
ECHO_SERVER_URL: "ws://localhost:8081"
163+
WS_ECHO_SERVER_URL: "ws://localhost:8081"
164+
SSE_ECHO_SERVER_URL: "http://localhost:8081/.sse"
164165
run: |
165166
cd crates/net
166167
wasm-pack test --chrome --firefox --headless --all-features
@@ -175,7 +176,8 @@ jobs:
175176
- name: Run native tests
176177
env:
177178
HTTPBIN_URL: "http://localhost:8080"
178-
ECHO_SERVER_URL: "ws://localhost:8081"
179+
WS_ECHO_SERVER_URL: "ws://localhost:8081"
180+
SSE_ECHO_SERVER_URL: "http://localhost:8081/.sse"
179181
uses: actions-rs/cargo@v1
180182
with:
181183
command: test

crates/net/Cargo.toml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ wasm-bindgen-test = "0.3"
3737
futures = "0.3"
3838

3939
[features]
40-
default = ["json", "websocket", "http"]
40+
default = ["json", "websocket", "http", "eventsource"]
4141

4242
# Enables `.json()` on `Response`
4343
json = ["serde", "serde_json"]
@@ -79,3 +79,13 @@ http = [
7979
'web-sys/Blob',
8080
'web-sys/FormData',
8181
]
82+
# Enables the EventSource API
83+
eventsource = [
84+
"futures-channel",
85+
"futures-core",
86+
"pin-project",
87+
'web-sys/Event',
88+
'web-sys/EventTarget',
89+
'web-sys/EventSource',
90+
'web-sys/MessageEvent',
91+
]

crates/net/README.md

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
<sub>Built with 🦀🕸 by <a href="https://rustwasm.github.io/">The Rust and WebAssembly Working Group</a></sub>
2020
</div>
2121

22-
HTTP requests library for WASM Apps. It provides idiomatic Rust bindings for the `web_sys` `fetch` and `WebSocket` API
22+
HTTP requests library for WASM Apps. It provides idiomatic Rust bindings for the `web_sys` [`fetch`](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API), [`WebSocket`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) and [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) APIs.
2323

2424
## Examples
2525

@@ -55,3 +55,23 @@ spawn_local(async move {
5555
console_log!("WebSocket Closed")
5656
})
5757
```
58+
59+
### EventSource
60+
61+
```rust
62+
use gloo_net::eventsource::futures::EventSource;
63+
use wasm_bindgen_futures::spawn_local;
64+
use futures::{stream, StreamExt};
65+
66+
let mut es = EventSource::new("http://api.example.com/ssedemo.php").unwrap();
67+
let stream_1 = es.subscribe("some-event-type").unwrap();
68+
let stream_2 = es.subscribe("another-event-type").unwrap();
69+
70+
spawn_local(async move {
71+
let mut all_streams = stream::select(stream_1, stream_2);
72+
while let Some(Ok((event_type, msg))) = all_streams.next().await {
73+
console_log!(format!("1. {}: {:?}", event_type, msg))
74+
}
75+
console_log!("EventSource Closed");
76+
})
77+
```

crates/net/src/error.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ pub enum Error {
1818
),
1919
}
2020

21-
#[cfg(any(feature = "http", feature = "websocket"))]
21+
#[cfg(any(feature = "http", feature = "websocket", feature = "eventsource"))]
2222
pub(crate) use conversion::*;
23-
#[cfg(any(feature = "http", feature = "websocket"))]
23+
#[cfg(any(feature = "http", feature = "websocket", feature = "eventsource"))]
2424
mod conversion {
2525
use gloo_utils::errors::JsError;
2626
use std::convert::TryFrom;

crates/net/src/eventsource/futures.rs

Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
//! A wrapper around the `EventSource` API using the Futures API to be used with async rust.
2+
//!
3+
//! EventSource is similar to WebSocket with the major differences being:
4+
//!
5+
//! * they are a one-way stream of server generated events
6+
//! * their connection is managed entirely by the browser
7+
//! * their data is slightly more structured including an id, type and data
8+
//!
9+
//! EventSource is therefore suitable for simpler scenarios than WebSocket.
10+
//!
11+
//! See the [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) to learn more.
12+
//!
13+
//! # Example
14+
//!
15+
//! ```rust
16+
//! use gloo_net::eventsource::futures::EventSource;
17+
//! use wasm_bindgen_futures::spawn_local;
18+
//! use futures::{stream, StreamExt};
19+
//!
20+
//! # macro_rules! console_log {
21+
//! # ($($expr:expr),*) => {{}};
22+
//! # }
23+
//! # fn no_run() {
24+
//! let mut es = EventSource::new("http://api.example.com/ssedemo.php").unwrap();
25+
//! let stream_1 = es.subscribe("some-event-type").unwrap();
26+
//! let stream_2 = es.subscribe("another-event-type").unwrap();
27+
//!
28+
//! spawn_local(async move {
29+
//! let mut all_streams = stream::select(stream_1, stream_2);
30+
//! while let Some(Ok((event_type, msg))) = all_streams.next().await {
31+
//! console_log!(format!("1. {}: {:?}", event_type, msg))
32+
//! }
33+
//! console_log!("EventSource Closed");
34+
//! })
35+
//! # }
36+
//! ```
37+
use crate::eventsource::{EventSourceError, State};
38+
use crate::js_to_js_error;
39+
use futures_channel::mpsc;
40+
use futures_core::{ready, Stream};
41+
use gloo_utils::errors::JsError;
42+
use pin_project::{pin_project, pinned_drop};
43+
use std::fmt;
44+
use std::fmt::Formatter;
45+
use std::pin::Pin;
46+
use std::task::{Context, Poll};
47+
use wasm_bindgen::prelude::*;
48+
use wasm_bindgen::JsCast;
49+
use web_sys::MessageEvent;
50+
51+
/// Wrapper around browser's EventSource API. Dropping
52+
/// this will close the underlying event source.
53+
#[derive(Clone)]
54+
pub struct EventSource {
55+
es: web_sys::EventSource,
56+
}
57+
58+
impl fmt::Debug for EventSource {
59+
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
60+
f.debug_struct("EventSource")
61+
.field("url", &self.es.url())
62+
.field("with_credentials", &self.es.with_credentials())
63+
.field("ready_state", &self.state())
64+
.finish_non_exhaustive()
65+
}
66+
}
67+
68+
/// Wrapper around browser's EventSource API.
69+
#[pin_project(PinnedDrop)]
70+
pub struct EventSourceSubscription {
71+
#[allow(clippy::type_complexity)]
72+
error_callback: Closure<dyn FnMut(web_sys::Event)>,
73+
es: web_sys::EventSource,
74+
event_type: String,
75+
message_callback: Closure<dyn FnMut(MessageEvent)>,
76+
#[pin]
77+
message_receiver: mpsc::UnboundedReceiver<StreamMessage>,
78+
}
79+
80+
impl fmt::Debug for EventSourceSubscription {
81+
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
82+
f.debug_struct("EventSourceSubscription")
83+
.field("event_source", &self.es)
84+
.field("event_type", &self.event_type)
85+
.finish_non_exhaustive()
86+
}
87+
}
88+
89+
impl EventSource {
90+
/// Establish an EventSource.
91+
///
92+
/// This function may error in the following cases:
93+
/// - The connection url is invalid
94+
///
95+
/// The error returned is [`JsError`]. See the
96+
/// [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/EventSource/EventSource#exceptions_thrown)
97+
/// to learn more.
98+
pub fn new(url: &str) -> Result<Self, JsError> {
99+
let es = web_sys::EventSource::new(url).map_err(js_to_js_error)?;
100+
101+
Ok(Self { es })
102+
}
103+
104+
/// Subscribes to listening for a specific type of event.
105+
///
106+
/// All events for this type are streamed back given the subscription
107+
/// returned.
108+
///
109+
/// The event type of "message" is a special case, as it will capture
110+
/// events without an event field as well as events that have the
111+
/// specific type `event: message`. It will not trigger on any
112+
/// other event type.
113+
pub fn subscribe(
114+
&mut self,
115+
event_type: impl Into<String>,
116+
) -> Result<EventSourceSubscription, JsError> {
117+
let event_type = event_type.into();
118+
let (message_sender, message_receiver) = mpsc::unbounded();
119+
120+
let message_callback: Closure<dyn FnMut(MessageEvent)> = {
121+
let event_type = event_type.clone();
122+
let sender = message_sender.clone();
123+
Closure::wrap(Box::new(move |e: MessageEvent| {
124+
let event_type = event_type.clone();
125+
let _ = sender.unbounded_send(StreamMessage::Message(event_type, e));
126+
}) as Box<dyn FnMut(MessageEvent)>)
127+
};
128+
129+
self.es
130+
.add_event_listener_with_callback(
131+
&event_type,
132+
message_callback.as_ref().unchecked_ref(),
133+
)
134+
.map_err(js_to_js_error)?;
135+
136+
let error_callback: Closure<dyn FnMut(web_sys::Event)> = {
137+
Closure::wrap(Box::new(move |e: web_sys::Event| {
138+
let is_connecting = e
139+
.current_target()
140+
.map(|target| target.unchecked_into::<web_sys::EventSource>())
141+
.map(|es| es.ready_state() == web_sys::EventSource::CONNECTING)
142+
.unwrap_or(false);
143+
if !is_connecting {
144+
let _ = message_sender.unbounded_send(StreamMessage::ErrorEvent);
145+
};
146+
}) as Box<dyn FnMut(web_sys::Event)>)
147+
};
148+
149+
self.es
150+
.add_event_listener_with_callback("error", error_callback.as_ref().unchecked_ref())
151+
.map_err(js_to_js_error)?;
152+
153+
Ok(EventSourceSubscription {
154+
error_callback,
155+
es: self.es.clone(),
156+
event_type,
157+
message_callback,
158+
message_receiver,
159+
})
160+
}
161+
162+
/// Closes the EventSource.
163+
///
164+
/// See the [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/EventSource/close#parameters)
165+
/// to learn about this function
166+
pub fn close(mut self) {
167+
self.close_and_notify();
168+
}
169+
170+
fn close_and_notify(&mut self) {
171+
self.es.close();
172+
// Fire an error event to cause all subscriber
173+
// streams to close down.
174+
if let Ok(event) = web_sys::Event::new("error") {
175+
let _ = self.es.dispatch_event(&event);
176+
}
177+
}
178+
179+
/// The current state of the EventSource.
180+
pub fn state(&self) -> State {
181+
let ready_state = self.es.ready_state();
182+
match ready_state {
183+
0 => State::Connecting,
184+
1 => State::Open,
185+
2 => State::Closed,
186+
_ => unreachable!(),
187+
}
188+
}
189+
}
190+
191+
impl Drop for EventSource {
192+
fn drop(&mut self) {
193+
self.close_and_notify();
194+
}
195+
}
196+
197+
#[derive(Clone)]
198+
enum StreamMessage {
199+
ErrorEvent,
200+
Message(String, MessageEvent),
201+
}
202+
203+
impl Stream for EventSourceSubscription {
204+
type Item = Result<(String, MessageEvent), EventSourceError>;
205+
206+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
207+
let msg = ready!(self.project().message_receiver.poll_next(cx));
208+
match msg {
209+
Some(StreamMessage::Message(event_type, msg)) => {
210+
Poll::Ready(Some(Ok((event_type, msg))))
211+
}
212+
Some(StreamMessage::ErrorEvent) => {
213+
Poll::Ready(Some(Err(EventSourceError::ConnectionError)))
214+
}
215+
None => Poll::Ready(None),
216+
}
217+
}
218+
}
219+
220+
#[pinned_drop]
221+
impl PinnedDrop for EventSourceSubscription {
222+
fn drop(self: Pin<&mut Self>) {
223+
let _ = self.es.remove_event_listener_with_callback(
224+
"error",
225+
self.error_callback.as_ref().unchecked_ref(),
226+
);
227+
228+
let _ = self.es.remove_event_listener_with_callback(
229+
&self.event_type,
230+
self.message_callback.as_ref().unchecked_ref(),
231+
);
232+
}
233+
}
234+
235+
#[cfg(test)]
236+
mod tests {
237+
use super::*;
238+
use futures::StreamExt;
239+
use wasm_bindgen_futures::spawn_local;
240+
use wasm_bindgen_test::*;
241+
242+
wasm_bindgen_test_configure!(run_in_browser);
243+
244+
const SSE_ECHO_SERVER_URL: &str = env!("SSE_ECHO_SERVER_URL");
245+
246+
#[wasm_bindgen_test]
247+
fn eventsource_works() {
248+
let mut es = EventSource::new(SSE_ECHO_SERVER_URL).unwrap();
249+
let mut servers = es.subscribe("server").unwrap();
250+
let mut requests = es.subscribe("request").unwrap();
251+
252+
spawn_local(async move {
253+
assert_eq!(servers.next().await.unwrap().unwrap().0, "server");
254+
assert_eq!(requests.next().await.unwrap().unwrap().0, "request");
255+
});
256+
}
257+
258+
#[wasm_bindgen_test]
259+
fn eventsource_connect_failure_works() {
260+
let mut es = EventSource::new("rubbish").unwrap();
261+
let mut servers = es.subscribe("server").unwrap();
262+
263+
spawn_local(async move {
264+
// we should expect an immediate failure
265+
266+
assert_eq!(
267+
servers.next().await,
268+
Some(Err(EventSourceError::ConnectionError))
269+
);
270+
})
271+
}
272+
}

0 commit comments

Comments
 (0)