Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Allow bigger responses to /federation/v1/state #12877

Merged
merged 3 commits into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12877.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.54 which could sometimes cause exceptions when handling federated traffic.
15 changes: 8 additions & 7 deletions synapse/federation/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@

logger = logging.getLogger(__name__)

# Send join responses can be huge, so we set a separate limit here. The response
# is parsed in a streaming manner, which helps alleviate the issue of memory
# usage a bit.
MAX_RESPONSE_SIZE_SEND_JOIN = 500 * 1024 * 1024


class TransportLayerClient:
"""Sends federation HTTP requests to other servers"""
Expand Down Expand Up @@ -349,7 +344,6 @@ async def send_join_v1(
path=path,
data=content,
parser=SendJoinParser(room_version, v1_api=True),
max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
)

async def send_join_v2(
Expand All @@ -372,7 +366,6 @@ async def send_join_v2(
args=query_params,
data=content,
parser=SendJoinParser(room_version, v1_api=False),
max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
)

async def send_leave_v1(
Expand Down Expand Up @@ -1360,6 +1353,11 @@ class SendJoinParser(ByteParser[SendJoinResponse]):

CONTENT_TYPE = "application/json"

# /send_join responses can be huge, so we override the size limit here. The response
# is parsed in a streaming manner, which helps alleviate the issue of memory
# usage a bit.
MAX_RESPONSE_SIZE = 500 * 1024 * 1024

def __init__(self, room_version: RoomVersion, v1_api: bool):
self._response = SendJoinResponse([], [], event_dict={})
self._room_version = room_version
Expand Down Expand Up @@ -1427,6 +1425,9 @@ class _StateParser(ByteParser[StateRequestResponse]):

CONTENT_TYPE = "application/json"

# As with /send_join, /state responses can be huge.
MAX_RESPONSE_SIZE = 500 * 1024 * 1024

def __init__(self, room_version: RoomVersion):
self._response = StateRequestResponse([], [])
self._room_version = room_version
Expand Down
29 changes: 7 additions & 22 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,6 @@
"synapse_http_matrixfederationclient_responses", "", ["method", "code"]
)

# a federation response can be rather large (eg a big state_ids is 50M or so), so we
# need a generous limit here.
MAX_RESPONSE_SIZE = 100 * 1024 * 1024

MAX_LONG_RETRIES = 10
MAX_SHORT_RETRIES = 3
Expand All @@ -116,6 +113,11 @@ class ByteParser(ByteWriteable, Generic[T], abc.ABC):
the content type doesn't match we fail the request.
"""

# a federation response can be rather large (eg a big state_ids is 50M or so), so we
# need a generous limit here.
MAX_RESPONSE_SIZE: int = 100 * 1024 * 1024
"""The largest response this parser will accept."""

@abc.abstractmethod
def finish(self) -> T:
"""Called when response has finished streaming and the parser should
Expand Down Expand Up @@ -203,7 +205,6 @@ async def _handle_response(
response: IResponse,
start_ms: int,
parser: ByteParser[T],
max_response_size: Optional[int] = None,
) -> T:
"""
Reads the body of a response with a timeout and sends it to a parser
Expand All @@ -215,15 +216,12 @@ async def _handle_response(
response: response to the request
start_ms: Timestamp when request was made
parser: The parser for the response
max_response_size: The maximum size to read from the response, if None
uses the default.

Returns:
The parsed response
"""

if max_response_size is None:
max_response_size = MAX_RESPONSE_SIZE
max_response_size = parser.MAX_RESPONSE_SIZE

try:
check_content_type_is(response.headers, parser.CONTENT_TYPE)
Expand All @@ -240,7 +238,7 @@ async def _handle_response(
"{%s} [%s] JSON response exceeded max size %i - %s %s",
request.txn_id,
request.destination,
MAX_RESPONSE_SIZE,
max_response_size,
request.method,
request.uri.decode("ascii"),
)
Expand Down Expand Up @@ -772,7 +770,6 @@ async def put_json(
backoff_on_404: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Literal[None] = None,
max_response_size: Optional[int] = None,
) -> Union[JsonDict, list]:
...

Expand All @@ -790,7 +787,6 @@ async def put_json(
backoff_on_404: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Optional[ByteParser[T]] = None,
max_response_size: Optional[int] = None,
) -> T:
...

Expand All @@ -807,7 +803,6 @@ async def put_json(
backoff_on_404: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Optional[ByteParser] = None,
max_response_size: Optional[int] = None,
):
"""Sends the specified json data using PUT

Expand Down Expand Up @@ -843,8 +838,6 @@ async def put_json(
enabled.
parser: The parser to use to decode the response. Defaults to
parsing as JSON.
max_response_size: The maximum size to read from the response, if None
uses the default.

Returns:
Succeeds when we get a 2xx HTTP response. The
Expand Down Expand Up @@ -895,7 +888,6 @@ async def put_json(
response,
start_ms,
parser=parser,
max_response_size=max_response_size,
)

return body
Expand Down Expand Up @@ -984,7 +976,6 @@ async def get_json(
ignore_backoff: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Literal[None] = None,
max_response_size: Optional[int] = None,
) -> Union[JsonDict, list]:
...

Expand All @@ -999,7 +990,6 @@ async def get_json(
ignore_backoff: bool = ...,
try_trailing_slash_on_400: bool = ...,
parser: ByteParser[T] = ...,
max_response_size: Optional[int] = ...,
) -> T:
...

Expand All @@ -1013,7 +1003,6 @@ async def get_json(
ignore_backoff: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Optional[ByteParser] = None,
max_response_size: Optional[int] = None,
):
"""GETs some json from the given host homeserver and path

Expand Down Expand Up @@ -1043,9 +1032,6 @@ async def get_json(
parser: The parser to use to decode the response. Defaults to
parsing as JSON.

max_response_size: The maximum size to read from the response. If None,
uses the default.

Returns:
Succeeds when we get a 2xx HTTP response. The
result will be the decoded JSON body.
Expand Down Expand Up @@ -1090,7 +1076,6 @@ async def get_json(
response,
start_ms,
parser=parser,
max_response_size=max_response_size,
)

return body
Expand Down
6 changes: 3 additions & 3 deletions tests/http/test_fedclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from synapse.api.errors import RequestSendFailed
from synapse.http.matrixfederationclient import (
MAX_RESPONSE_SIZE,
JsonParser,
MatrixFederationHttpClient,
MatrixFederationRequest,
)
Expand Down Expand Up @@ -609,9 +609,9 @@ def test_too_big(self):
while not test_d.called:
protocol.dataReceived(b"a" * chunk_size)
sent += chunk_size
self.assertLessEqual(sent, MAX_RESPONSE_SIZE)
self.assertLessEqual(sent, JsonParser.MAX_RESPONSE_SIZE)

self.assertEqual(sent, MAX_RESPONSE_SIZE)
self.assertEqual(sent, JsonParser.MAX_RESPONSE_SIZE)

f = self.failureResultOf(test_d)
self.assertIsInstance(f.value, RequestSendFailed)
Expand Down