diff --git a/core/src/server/helpers.rs b/core/src/server/helpers.rs index 55358a3a47..e4511f1515 100644 --- a/core/src/server/helpers.rs +++ b/core/src/server/helpers.rs @@ -27,7 +27,7 @@ use std::io; use std::sync::Arc; -use crate::{to_json_raw_value, Error}; +use crate::Error; use futures_channel::mpsc; use futures_util::StreamExt; use jsonrpsee_types::error::{ErrorCode, ErrorObject, ErrorResponse, OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG}; @@ -119,8 +119,8 @@ impl MethodSink { tracing::error!("Error serializing response: {:?}", err); if err.is_io() { - let data = to_json_raw_value(&format!("Exceeded max limit {}", self.max_response_size)).ok(); - let err = ErrorObject::borrowed(OVERSIZED_RESPONSE_CODE, &OVERSIZED_RESPONSE_MSG, data.as_deref()); + let data = format!("Exceeded max limit of {}", self.max_response_size); + let err = ErrorObject::owned(OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG, Some(data)); return self.send_error(id, err); } else { return self.send_error(id, ErrorCode::InternalError.into()); @@ -217,12 +217,17 @@ impl SubscriptionPermit { pub struct BoundedSubscriptions { resource: Arc, guard: Arc, + max: u32, } impl BoundedSubscriptions { /// Create a new bounded subscription. pub fn new(max_subscriptions: u32) -> Self { - Self { resource: Arc::new(Notify::new()), guard: Arc::new(Semaphore::new(max_subscriptions as usize)) } + Self { + resource: Arc::new(Notify::new()), + guard: Arc::new(Semaphore::new(max_subscriptions as usize)), + max: max_subscriptions, + } } /// Attempts to acquire a subscription slot. @@ -235,6 +240,11 @@ impl BoundedSubscriptions { .map(|p| SubscriptionPermit { _permit: p, resource: self.resource.clone() }) } + /// Get the maximum number of permitted subscriptions. + pub const fn max(&self) -> u32 { + self.max + } + /// Close all subscriptions. pub fn close(&self) { self.resource.notify_waiters(); diff --git a/http-server/src/response.rs b/http-server/src/response.rs index c216bafa65..6a35556304 100644 --- a/http-server/src/response.rs +++ b/http-server/src/response.rs @@ -26,6 +26,8 @@ //! Contains common builders for hyper responses. +use jsonrpsee_types::error::reject_too_big_request; + use crate::types::error::{ErrorCode, ErrorResponse}; use crate::types::Id; @@ -73,8 +75,8 @@ pub fn invalid_allow_headers() -> hyper::Response { } /// Create a json response for oversized requests (413) -pub fn too_large() -> hyper::Response { - let error = serde_json::to_string(&ErrorResponse::borrowed(ErrorCode::OversizedRequest.into(), Id::Null)) +pub fn too_large(limit: u32) -> hyper::Response { + let error = serde_json::to_string(&ErrorResponse::borrowed(reject_too_big_request(limit), Id::Null)) .expect("built from known-good data; qed"); from_template(hyper::StatusCode::PAYLOAD_TOO_LARGE, error, JSON) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 7f96b301b3..4d2195278b 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -556,7 +556,7 @@ async fn process_validated_request( let (body, mut is_single) = match read_body(&parts.headers, body, max_request_body_size).await { Ok(r) => r, - Err(GenericTransportError::TooLarge) => return Ok(response::too_large()), + Err(GenericTransportError::TooLarge) => return Ok(response::too_large(max_request_body_size)), Err(GenericTransportError::Malformed) => return Ok(response::malformed()), Err(GenericTransportError::Inner(e)) => { tracing::error!("Internal error reading request body: {}", e); diff --git a/http-server/src/tests.rs b/http-server/src/tests.rs index aa4a394b06..45da37afba 100644 --- a/http-server/src/tests.rs +++ b/http-server/src/tests.rs @@ -430,7 +430,7 @@ async fn can_set_the_max_request_body_size() { // Invalid: too long let req = format!(r#"{{"jsonrpc":"2.0", "method":{}, "id":1}}"#, "a".repeat(100)); let response = http_request(req.into(), uri.clone()).with_default_timeout().await.unwrap().unwrap(); - assert_eq!(response.body, oversized_request()); + assert_eq!(response.body, oversized_request(100)); // Max request body size should not override the max response size let req = r#"{"jsonrpc":"2.0", "method":"anything", "id":1}"#; diff --git a/test-utils/src/helpers.rs b/test-utils/src/helpers.rs index 093e68a3dd..dae9bae8c5 100644 --- a/test-utils/src/helpers.rs +++ b/test-utils/src/helpers.rs @@ -71,8 +71,11 @@ pub fn parse_error(id: Id) -> String { ) } -pub fn oversized_request() -> String { - r#"{"jsonrpc":"2.0","error":{"code":-32701,"message":"Request is too big"},"id":null}"#.into() +pub fn oversized_request(max_limit: u32) -> String { + format!( + r#"{{"jsonrpc":"2.0","error":{{"code":-32701,"message":"Request is too big","data":"Exceeded max limit of {}"}},"id":null}}"#, + max_limit + ) } pub fn batches_not_supported() -> String { @@ -81,7 +84,7 @@ pub fn batches_not_supported() -> String { pub fn oversized_response(id: Id, max_limit: u32) -> String { format!( - r#"{{"jsonrpc":"2.0","error":{{"code":-32702,"message":"Response is too big","data":"Exceeded max limit {}"}},"id":{}}}"#, + r#"{{"jsonrpc":"2.0","error":{{"code":-32702,"message":"Response is too big","data":"Exceeded max limit of {}"}},"id":{}}}"#, max_limit, serde_json::to_string(&id).unwrap(), ) diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index f62faddcb4..a1326964e8 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -493,7 +493,7 @@ async fn ws_batch_works() { #[tokio::test] async fn ws_server_limit_subs_per_conn_works() { use futures::StreamExt; - use jsonrpsee::types::error::{CallError, SERVER_IS_BUSY_CODE, SERVER_IS_BUSY_MSG}; + use jsonrpsee::types::error::{CallError, TOO_MANY_SUBSCRIPTIONS_CODE, TOO_MANY_SUBSCRIPTIONS_MSG}; use jsonrpsee::{ws_server::WsServerBuilder, RpcModule}; let server = WsServerBuilder::default().max_subscriptions_per_connection(10).build("127.0.0.1:0").await.unwrap(); @@ -537,11 +537,13 @@ async fn ws_server_limit_subs_per_conn_works() { let err1 = c1.subscribe::("subscribe_forever", None, "unsubscribe_forever").await; let err2 = c1.subscribe::("subscribe_forever", None, "unsubscribe_forever").await; + let data = "\"Exceeded max limit of 10\""; + assert!( - matches!(err1, Err(Error::Call(CallError::Custom(err))) if err.code() == SERVER_IS_BUSY_CODE && err.message() == SERVER_IS_BUSY_MSG) + matches!(err1, Err(Error::Call(CallError::Custom(err))) if err.code() == TOO_MANY_SUBSCRIPTIONS_CODE && err.message() == TOO_MANY_SUBSCRIPTIONS_MSG && err.data().unwrap().get() == data) ); assert!( - matches!(err2, Err(Error::Call(CallError::Custom(err))) if err.code() == SERVER_IS_BUSY_CODE && err.message() == SERVER_IS_BUSY_MSG) + matches!(err2, Err(Error::Call(CallError::Custom(err))) if err.code() == TOO_MANY_SUBSCRIPTIONS_CODE && err.message() == TOO_MANY_SUBSCRIPTIONS_MSG && err.data().unwrap().get() == data) ); } diff --git a/types/src/error.rs b/types/src/error.rs index a46b0371ad..4db09994a9 100644 --- a/types/src/error.rs +++ b/types/src/error.rs @@ -182,6 +182,8 @@ pub const SUBSCRIPTION_CLOSED: i32 = -32003; pub const SUBSCRIPTION_CLOSED_WITH_ERROR: i32 = -32004; /// Batched requests are not supported by the server. pub const BATCHES_NOT_SUPPORTED_CODE: i32 = -32005; +/// Subscription limit per connection was exceeded. +pub const TOO_MANY_SUBSCRIPTIONS_CODE: i32 = -32006; /// Parse error message pub const PARSE_ERROR_MSG: &str = "Parse error"; @@ -203,6 +205,8 @@ pub const SERVER_IS_BUSY_MSG: &str = "Server is busy, try again later"; pub const SERVER_ERROR_MSG: &str = "Server error"; /// Batched requests not supported error message. pub const BATCHES_NOT_SUPPORTED_MSG: &str = "Batched requests are not supported by this server"; +/// Subscription limit per connection was exceeded. +pub const TOO_MANY_SUBSCRIPTIONS_MSG: &str = "Too many subscriptions on the connection"; /// JSONRPC error code #[derive(Error, Debug, PartialEq, Copy, Clone)] @@ -322,6 +326,24 @@ impl CallError { } } +/// Helper to get a `JSON-RPC` error object when the maximum number of subscriptions have been exceeded. +pub fn reject_too_many_subscriptions(limit: u32) -> ErrorObject<'static> { + ErrorObjectOwned::owned( + TOO_MANY_SUBSCRIPTIONS_CODE, + TOO_MANY_SUBSCRIPTIONS_MSG, + Some(format!("Exceeded max limit of {}", limit)), + ) +} + +/// Helper to get a `JSON-RPC` error object when the maximum request size limit have been exceeded. +pub fn reject_too_big_request(limit: u32) -> ErrorObject<'static> { + ErrorObjectOwned::owned( + OVERSIZED_REQUEST_CODE, + OVERSIZED_REQUEST_MSG, + Some(format!("Exceeded max limit of {}", limit)), + ) +} + #[cfg(test)] mod tests { use super::{ErrorCode, ErrorObject, ErrorResponse, Id, TwoPointZero}; diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index ba935b6735..aa80612124 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -45,6 +45,7 @@ use jsonrpsee_core::server::resource_limiting::Resources; use jsonrpsee_core::server::rpc_module::{ConnState, ConnectionId, MethodKind, Methods}; use jsonrpsee_core::traits::IdProvider; use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES}; +use jsonrpsee_types::error::{reject_too_big_request, reject_too_many_subscriptions}; use jsonrpsee_types::Params; use soketto::connection::Error as SokettoError; use soketto::data::ByteSlice125; @@ -397,7 +398,7 @@ async fn background_task( current, maximum ); - sink.send_error(Id::Null, ErrorCode::OversizedRequest.into()); + sink.send_error(Id::Null, reject_too_big_request(max_request_body_size)); continue; } // These errors can not be gracefully handled, so just log them and terminate the connection. @@ -483,7 +484,10 @@ async fn background_task( ConnState { conn_id, close_notify: cn, id_provider: &*id_provider }; callback(id, params, &sink, conn_state) } else { - sink.send_error(req.id, ErrorCode::ServerIsBusy.into()); + sink.send_error( + req.id, + reject_too_many_subscriptions(bounded_subscriptions.max()), + ); false }; middleware.on_result(name, result, request_start); @@ -602,7 +606,10 @@ async fn background_task( }; callback(id, params, &sink_batch, conn_state) } else { - sink_batch.send_error(req.id, ErrorCode::ServerIsBusy.into()); + sink_batch.send_error( + req.id, + reject_too_many_subscriptions(bounded_subscriptions2.max()), + ); false }; middleware.on_result(&req.method, result, request_start); diff --git a/ws-server/src/tests.rs b/ws-server/src/tests.rs index f478401adc..b0f6f2bf29 100644 --- a/ws-server/src/tests.rs +++ b/ws-server/src/tests.rs @@ -198,7 +198,7 @@ async fn can_set_the_max_request_body_size() { // Invalid: too long let req = format!(r#"{{"jsonrpc":"2.0", "method":{}, "id":1}}"#, "a".repeat(100)); let response = client.send_request_text(req).await.unwrap(); - assert_eq!(response, oversized_request()); + assert_eq!(response, oversized_request(100)); // Max request body size should not override the max response body size let req = r#"{"jsonrpc":"2.0", "method":"anything", "id":1}"#;