Skip to content

Commit 76911a2

Browse files
committed
attempt brotli decompression
1 parent cb55e69 commit 76911a2

File tree

3 files changed

+23
-8
lines changed

3 files changed

+23
-8
lines changed

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/flashblocks-rpc/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,5 @@ itertools.workspace = true
7878
eyre.workspace = true
7979
uuid.workspace = true
8080
time.workspace = true
81-
chrono.workspace = true
81+
chrono.workspace = true
82+
brotli = "8.0.1"

crates/flashblocks-rpc/src/flashblocks.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use futures_util::StreamExt;
55
use reth_optimism_primitives::{OpBlock, OpReceipt, OpTransactionSigned};
66
use rollup_boost::primitives::{ExecutionPayloadBaseV1, FlashblocksPayloadV1};
77
use serde::{Deserialize, Serialize};
8-
use std::{str::FromStr, sync::Arc};
8+
use std::{io::Read, str::FromStr, sync::Arc};
99
use tokio::sync::mpsc;
1010
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
1111
use tracing::error;
@@ -82,16 +82,14 @@ impl FlashblocksClient {
8282

8383
match msg {
8484
Ok(Message::Binary(bytes)) => {
85-
// Decode binary message to string first
86-
let text = match String::from_utf8(bytes.to_vec()) {
85+
let text = match try_parse_message(&bytes) {
8786
Ok(text) => text,
8887
Err(e) => {
89-
error!("Failed to decode binary message: {}", e);
88+
error!("Failed to decode message: {}", e);
9089
continue;
9190
}
9291
};
9392

94-
// Then parse JSON
9593
let payload: FlashblocksPayloadV1 =
9694
match serde_json::from_str(&text) {
9795
Ok(m) => m,
@@ -146,6 +144,21 @@ impl FlashblocksClient {
146144
}
147145
}
148146

147+
fn try_parse_message(bytes: &[u8]) -> Result<String, Box<dyn std::error::Error>> {
148+
if let Ok(text) = String::from_utf8(bytes.to_vec()) {
149+
if text.trim_start().starts_with("{") {
150+
return Ok(text);
151+
}
152+
}
153+
154+
let mut decompressor = brotli::Decompressor::new(bytes, 4096);
155+
let mut decompressed = Vec::new();
156+
decompressor.read_to_end(&mut decompressed)?;
157+
158+
let text = String::from_utf8(decompressed)?;
159+
Ok(text)
160+
}
161+
149162
fn process_payload(payload: FlashblocksPayloadV1, cache: Arc<Cache>) {
150163
let metrics = Metrics::default();
151164
let msg_processing_start_time = Instant::now();

0 commit comments

Comments
 (0)