Skip to content

Commit 468743f

Browse files
frankudoagsmerklefruitjenpaffvarun-doshimattsse
authored
feat: define subscription type (#2203)
* feat: define subscription type * feat: `FallbackLayer` transport (#2135) * feat: initial impl of fallback layer and service * chore: released trait bounds * chore: use random ping id * chore: update fn name * chore: update fn name * chore: rm health check task and sample count * fix: lock scope * chore: adjust logs * chore: small cleanup * chore: cargo fmt * chore: cargo fmt again * chore: addressed review, dont take transports from the list on each request * chore: rm private doc * chore: update mod doc comment ordering * chore: addressed review --------- Co-authored-by: Jennifer <[email protected]> * feat: add BlobAndProofV2 (#2202) * feat: add BlobAndProofV2 * Update crates/eips/src/eip4844/engine.rs --------- Co-authored-by: Matthias Seitz <[email protected]> * fix: buffer size go brrr * refactor: rename EthSuscribe to GetSubscription, naming is hard sigh * feat: - fix description - add `method` member - make params optional - handle optional channel size and also handle params case and no params case separately * chore: fmt, rename buffer to channel_size --------- Co-authored-by: nicolas <[email protected]> Co-authored-by: Jennifer <[email protected]> Co-authored-by: Varun Doshi <[email protected]> Co-authored-by: Matthias Seitz <[email protected]>
1 parent 80abfd4 commit 468743f

File tree

2 files changed

+102
-0
lines changed

2 files changed

+102
-0
lines changed

crates/provider/src/provider/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,6 @@ pub use multicall::*;
2727

2828
mod erased;
2929
pub use erased::DynProvider;
30+
31+
mod subscription;
32+
pub use subscription::GetSubscription;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
use crate::{Provider, RootProvider};
2+
use alloy_json_rpc::{RpcRecv, RpcSend};
3+
use alloy_network::{Ethereum, Network};
4+
use alloy_transport::TransportResult;
5+
use std::{borrow::Cow, future::Future, pin::Pin};
6+
7+
/// A general-purpose subscription request builder
8+
///
9+
/// This struct allows configuring subscription parameters and channel size
10+
/// before initiating a request to subscribe to Ethereum events.
11+
pub struct GetSubscription<P, R, N = Ethereum>
12+
where
13+
P: RpcSend,
14+
R: RpcRecv,
15+
N: Network,
16+
{
17+
root: RootProvider<N>,
18+
method: Cow<'static, str>,
19+
params: Option<P>,
20+
channel_size: Option<usize>,
21+
_marker: std::marker::PhantomData<fn() -> R>,
22+
}
23+
24+
impl<P, R, N> GetSubscription<P, R, N>
25+
where
26+
N: Network,
27+
P: RpcSend,
28+
R: RpcRecv,
29+
{
30+
/// Creates a new [`GetSubscription`] instance
31+
pub fn new(
32+
root: RootProvider<N>,
33+
method: impl Into<Cow<'static, str>>,
34+
params: Option<P>,
35+
) -> Self {
36+
Self {
37+
root,
38+
method: method.into(),
39+
channel_size: None,
40+
params,
41+
_marker: std::marker::PhantomData,
42+
}
43+
}
44+
45+
/// Set the channel_size for the subscription stream.
46+
pub fn channel_size(mut self, size: usize) -> Self {
47+
self.channel_size = Some(size);
48+
self
49+
}
50+
}
51+
52+
impl<P, R, N> core::fmt::Debug for GetSubscription<P, R, N>
53+
where
54+
N: Network,
55+
P: RpcSend,
56+
R: RpcRecv,
57+
{
58+
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
59+
f.debug_struct("GetSubscription")
60+
.field("channel_size", &self.channel_size)
61+
.field("method", &self.method)
62+
.finish()
63+
}
64+
}
65+
66+
#[cfg(feature = "pubsub")]
67+
impl<P, R, N> std::future::IntoFuture for GetSubscription<P, R, N>
68+
where
69+
N: Network,
70+
P: RpcSend + 'static,
71+
R: RpcRecv,
72+
{
73+
type Output = TransportResult<alloy_pubsub::Subscription<R>>;
74+
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'static>>;
75+
76+
fn into_future(self) -> Self::IntoFuture {
77+
Box::pin(async move {
78+
let pubsub = self.root.pubsub_frontend()?;
79+
80+
// Set config channel size if any
81+
if let Some(size) = self.channel_size {
82+
pubsub.set_channel_size(size);
83+
}
84+
85+
// Handle params and no-params case separately
86+
let id = if let Some(params) = self.params {
87+
let mut call = self.root.client().request(self.method, params);
88+
call.set_is_subscription();
89+
call.await?
90+
} else {
91+
let mut call = self.root.client().request_noparams(self.method);
92+
call.set_is_subscription();
93+
call.await?
94+
};
95+
96+
self.root.get_subscription(id).await
97+
})
98+
}
99+
}

0 commit comments

Comments
 (0)