Skip to content

Commit 2662dc7

Browse files
authored
Fix Sse client api (#6685)
* Use reqwest eventsource for get_events api * await for Event::Open before returning stream * fmt * Merge branch 'unstable' into sse-client-fix * Ignore lint
1 parent 1315c94 commit 2662dc7

File tree

5 files changed

+65
-29
lines changed

5 files changed

+65
-29
lines changed

Cargo.lock

Lines changed: 28 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

beacon_node/beacon_chain/tests/store_tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2796,6 +2796,7 @@ async fn finalizes_after_resuming_from_db() {
27962796
);
27972797
}
27982798

2799+
#[allow(clippy::large_stack_frames)]
27992800
#[tokio::test]
28002801
async fn revert_minority_fork_on_resume() {
28012802
let validator_count = 16;

common/eth2/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ slashing_protection = { workspace = true }
2727
mediatype = "0.19.13"
2828
pretty_reqwest_error = { workspace = true }
2929
derivative = { workspace = true }
30+
reqwest-eventsource = "0.5.0"
3031

3132
[dev-dependencies]
3233
tokio = { workspace = true }

common/eth2/src/lib.rs

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use reqwest::{
2727
Body, IntoUrl, RequestBuilder, Response,
2828
};
2929
pub use reqwest::{StatusCode, Url};
30+
use reqwest_eventsource::{Event, EventSource};
3031
pub use sensitive_url::{SensitiveError, SensitiveUrl};
3132
use serde::{de::DeserializeOwned, Serialize};
3233
use ssz::Encode;
@@ -52,6 +53,8 @@ pub const SSZ_CONTENT_TYPE_HEADER: &str = "application/octet-stream";
5253
pub enum Error {
5354
/// The `reqwest` client raised an error.
5455
HttpClient(PrettyReqwestError),
56+
/// The `reqwest_eventsource` client raised an error.
57+
SseClient(reqwest_eventsource::Error),
5558
/// The server returned an error message where the body was able to be parsed.
5659
ServerMessage(ErrorMessage),
5760
/// The server returned an error message with an array of errors.
@@ -93,6 +96,13 @@ impl Error {
9396
pub fn status(&self) -> Option<StatusCode> {
9497
match self {
9598
Error::HttpClient(error) => error.inner().status(),
99+
Error::SseClient(error) => {
100+
if let reqwest_eventsource::Error::InvalidStatusCode(status, _) = error {
101+
Some(*status)
102+
} else {
103+
None
104+
}
105+
}
96106
Error::ServerMessage(msg) => StatusCode::try_from(msg.code).ok(),
97107
Error::ServerIndexedMessage(msg) => StatusCode::try_from(msg.code).ok(),
98108
Error::StatusCode(status) => Some(*status),
@@ -2592,16 +2602,29 @@ impl BeaconNodeHttpClient {
25922602
.join(",");
25932603
path.query_pairs_mut().append_pair("topics", &topic_string);
25942604

2595-
Ok(self
2596-
.client
2597-
.get(path)
2598-
.send()
2599-
.await?
2600-
.bytes_stream()
2601-
.map(|next| match next {
2602-
Ok(bytes) => EventKind::from_sse_bytes(bytes.as_ref()),
2603-
Err(e) => Err(Error::HttpClient(e.into())),
2604-
}))
2605+
let mut es = EventSource::get(path);
2606+
// If we don't await `Event::Open` here, then the consumer
2607+
// will not get any Message events until they start awaiting the stream.
2608+
// This is a way to register the stream with the sse server before
2609+
// message events start getting emitted.
2610+
while let Some(event) = es.next().await {
2611+
match event {
2612+
Ok(Event::Open) => break,
2613+
Err(err) => return Err(Error::SseClient(err)),
2614+
// This should never happen as we are guaranteed to get the
2615+
// Open event before any message starts coming through.
2616+
Ok(Event::Message(_)) => continue,
2617+
}
2618+
}
2619+
Ok(Box::pin(es.filter_map(|event| async move {
2620+
match event {
2621+
Ok(Event::Open) => None,
2622+
Ok(Event::Message(message)) => {
2623+
Some(EventKind::from_sse_bytes(&message.event, &message.data))
2624+
}
2625+
Err(err) => Some(Err(Error::SseClient(err))),
2626+
}
2627+
})))
26052628
}
26062629

26072630
/// `POST validator/duties/sync/{epoch}`

common/eth2/src/types.rs

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use serde_json::Value;
1313
use ssz::{Decode, DecodeError};
1414
use ssz_derive::{Decode, Encode};
1515
use std::fmt::{self, Display};
16-
use std::str::{from_utf8, FromStr};
16+
use std::str::FromStr;
1717
use std::sync::Arc;
1818
use std::time::Duration;
1919
use types::beacon_block_body::KzgCommitments;
@@ -1153,24 +1153,7 @@ impl<E: EthSpec> EventKind<E> {
11531153
}
11541154
}
11551155

1156-
pub fn from_sse_bytes(message: &[u8]) -> Result<Self, ServerError> {
1157-
let s = from_utf8(message)
1158-
.map_err(|e| ServerError::InvalidServerSentEvent(format!("{:?}", e)))?;
1159-
1160-
let mut split = s.split('\n');
1161-
let event = split
1162-
.next()
1163-
.ok_or_else(|| {
1164-
ServerError::InvalidServerSentEvent("Could not parse event tag".to_string())
1165-
})?
1166-
.trim_start_matches("event:");
1167-
let data = split
1168-
.next()
1169-
.ok_or_else(|| {
1170-
ServerError::InvalidServerSentEvent("Could not parse data tag".to_string())
1171-
})?
1172-
.trim_start_matches("data:");
1173-
1156+
pub fn from_sse_bytes(event: &str, data: &str) -> Result<Self, ServerError> {
11741157
match event {
11751158
"attestation" => Ok(EventKind::Attestation(serde_json::from_str(data).map_err(
11761159
|e| ServerError::InvalidServerSentEvent(format!("Attestation: {:?}", e)),

0 commit comments

Comments
 (0)