Skip to content

Commit 1243a85

Browse files
committed
Merge remote-tracking branch 'origin/unstable' into cargo-sort
2 parents 14d785d + 2662dc7 commit 1243a85

File tree

6 files changed

+68
-36
lines changed

6 files changed

+68
-36
lines changed

Cargo.lock

Lines changed: 28 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

beacon_node/beacon_chain/tests/store_tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2796,6 +2796,7 @@ async fn finalizes_after_resuming_from_db() {
27962796
);
27972797
}
27982798

2799+
#[allow(clippy::large_stack_frames)]
27992800
#[tokio::test]
28002801
async fn revert_minority_fork_on_resume() {
28012802
let validator_count = 16;

beacon_node/lighthouse_network/src/rpc/handler.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -964,6 +964,9 @@ where
964964
request_info: (Id, RequestType<E>),
965965
error: StreamUpgradeError<RPCError>,
966966
) {
967+
// This dialing is now considered failed
968+
self.dial_negotiated -= 1;
969+
967970
let (id, req) = request_info;
968971

969972
// map the error
@@ -989,9 +992,6 @@ where
989992
StreamUpgradeError::Apply(other) => other,
990993
};
991994

992-
// This dialing is now considered failed
993-
self.dial_negotiated -= 1;
994-
995995
self.outbound_io_error_retries = 0;
996996
self.events_out
997997
.push(HandlerEvent::Err(HandlerErr::Outbound {

common/eth2/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,12 @@ ethereum_ssz = { workspace = true }
1313
ethereum_ssz_derive = { workspace = true }
1414
futures = { workspace = true }
1515
futures-util = "0.3.8"
16-
libsecp256k1 = { workspace = true }
1716
lighthouse_network = { workspace = true }
1817
mediatype = "0.19.13"
1918
pretty_reqwest_error = { workspace = true }
2019
proto_array = { workspace = true }
2120
reqwest = { workspace = true }
22-
ring = { workspace = true }
21+
reqwest-eventsource = "0.5.0"
2322
sensitive_url = { workspace = true }
2423
serde = { workspace = true }
2524
serde_json = { workspace = true }

common/eth2/src/lib.rs

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use reqwest::{
2727
Body, IntoUrl, RequestBuilder, Response,
2828
};
2929
pub use reqwest::{StatusCode, Url};
30+
use reqwest_eventsource::{Event, EventSource};
3031
pub use sensitive_url::{SensitiveError, SensitiveUrl};
3132
use serde::{de::DeserializeOwned, Serialize};
3233
use ssz::Encode;
@@ -52,6 +53,8 @@ pub const SSZ_CONTENT_TYPE_HEADER: &str = "application/octet-stream";
5253
pub enum Error {
5354
/// The `reqwest` client raised an error.
5455
HttpClient(PrettyReqwestError),
56+
/// The `reqwest_eventsource` client raised an error.
57+
SseClient(reqwest_eventsource::Error),
5558
/// The server returned an error message where the body was able to be parsed.
5659
ServerMessage(ErrorMessage),
5760
/// The server returned an error message with an array of errors.
@@ -93,6 +96,13 @@ impl Error {
9396
pub fn status(&self) -> Option<StatusCode> {
9497
match self {
9598
Error::HttpClient(error) => error.inner().status(),
99+
Error::SseClient(error) => {
100+
if let reqwest_eventsource::Error::InvalidStatusCode(status, _) = error {
101+
Some(*status)
102+
} else {
103+
None
104+
}
105+
}
96106
Error::ServerMessage(msg) => StatusCode::try_from(msg.code).ok(),
97107
Error::ServerIndexedMessage(msg) => StatusCode::try_from(msg.code).ok(),
98108
Error::StatusCode(status) => Some(*status),
@@ -2592,16 +2602,29 @@ impl BeaconNodeHttpClient {
25922602
.join(",");
25932603
path.query_pairs_mut().append_pair("topics", &topic_string);
25942604

2595-
Ok(self
2596-
.client
2597-
.get(path)
2598-
.send()
2599-
.await?
2600-
.bytes_stream()
2601-
.map(|next| match next {
2602-
Ok(bytes) => EventKind::from_sse_bytes(bytes.as_ref()),
2603-
Err(e) => Err(Error::HttpClient(e.into())),
2604-
}))
2605+
let mut es = EventSource::get(path);
2606+
// If we don't await `Event::Open` here, then the consumer
2607+
// will not get any Message events until they start awaiting the stream.
2608+
// This is a way to register the stream with the sse server before
2609+
// message events start getting emitted.
2610+
while let Some(event) = es.next().await {
2611+
match event {
2612+
Ok(Event::Open) => break,
2613+
Err(err) => return Err(Error::SseClient(err)),
2614+
// This should never happen as we are guaranteed to get the
2615+
// Open event before any message starts coming through.
2616+
Ok(Event::Message(_)) => continue,
2617+
}
2618+
}
2619+
Ok(Box::pin(es.filter_map(|event| async move {
2620+
match event {
2621+
Ok(Event::Open) => None,
2622+
Ok(Event::Message(message)) => {
2623+
Some(EventKind::from_sse_bytes(&message.event, &message.data))
2624+
}
2625+
Err(err) => Some(Err(Error::SseClient(err))),
2626+
}
2627+
})))
26052628
}
26062629

26072630
/// `POST validator/duties/sync/{epoch}`

common/eth2/src/types.rs

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use serde_json::Value;
1313
use ssz::{Decode, DecodeError};
1414
use ssz_derive::{Decode, Encode};
1515
use std::fmt::{self, Display};
16-
use std::str::{from_utf8, FromStr};
16+
use std::str::FromStr;
1717
use std::sync::Arc;
1818
use std::time::Duration;
1919
use types::beacon_block_body::KzgCommitments;
@@ -1153,24 +1153,7 @@ impl<E: EthSpec> EventKind<E> {
11531153
}
11541154
}
11551155

1156-
pub fn from_sse_bytes(message: &[u8]) -> Result<Self, ServerError> {
1157-
let s = from_utf8(message)
1158-
.map_err(|e| ServerError::InvalidServerSentEvent(format!("{:?}", e)))?;
1159-
1160-
let mut split = s.split('\n');
1161-
let event = split
1162-
.next()
1163-
.ok_or_else(|| {
1164-
ServerError::InvalidServerSentEvent("Could not parse event tag".to_string())
1165-
})?
1166-
.trim_start_matches("event:");
1167-
let data = split
1168-
.next()
1169-
.ok_or_else(|| {
1170-
ServerError::InvalidServerSentEvent("Could not parse data tag".to_string())
1171-
})?
1172-
.trim_start_matches("data:");
1173-
1156+
pub fn from_sse_bytes(event: &str, data: &str) -> Result<Self, ServerError> {
11741157
match event {
11751158
"attestation" => Ok(EventKind::Attestation(serde_json::from_str(data).map_err(
11761159
|e| ServerError::InvalidServerSentEvent(format!("Attestation: {:?}", e)),

0 commit comments

Comments
 (0)