From 7418fa9384a51454e19e6d4ed76d9a75693277d5 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 31 Jul 2020 13:49:18 -0400 Subject: [PATCH 01/18] Add a JSON producer which iteratively creates JSON. --- synapse/http/server.py | 53 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 6 deletions(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index d4f9ad6e6732..719c8293267e 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -25,7 +25,7 @@ from typing import Any, Callable, Dict, Tuple, Union import jinja2 -from canonicaljson import encode_canonical_json, encode_pretty_printed_json, json +from canonicaljson import _canonical_encoder, _pretty_encoder, json from twisted.internet import defer from twisted.python import failure @@ -492,6 +492,38 @@ class RootOptionsRedirectResource(OptionsResource, RootRedirect): pass +class _JsonProducer: + """ + Iteratively write JSON to the request. + """ + + def __init__(self, request, json_encoder, json_object): + self.request = request + self._generator = json_encoder.iterencode(json_object) + + def start(self): + self.request.registerProducer(self, False) + + def resumeProducing(self): + # We've stopped producing in the meantime. + if not self.request: + return + + # Get the next chunk and write it to the request. Calling write will + # spin the reactor (and might be re-entrant). + try: + data = next(self._generator) + self.request.write(data.encode("utf-8")) + except StopIteration: + self.request.unregisterProducer() + self.request.finish() + self.stopProducing() + + def stopProducing(self): + self._generator = None + self.request = None + + def respond_with_json( request: Request, code: int, @@ -526,15 +558,24 @@ def respond_with_json( return None if pretty_print: - json_bytes = encode_pretty_printed_json(json_object) + b"\n" + encoder = _pretty_encoder else: if canonical_json or synapse.events.USE_FROZEN_DICTS: - # canonicaljson already encodes to bytes - json_bytes = encode_canonical_json(json_object) + encoder = _canonical_encoder else: - json_bytes = json.dumps(json_object).encode("utf-8") + # TODO Re-use this. + encoder = json.JSONEncoder(separators=(",", ":")) - return respond_with_json_bytes(request, code, json_bytes, send_cors=send_cors) + request.setResponseCode(code) + request.setHeader(b"Content-Type", b"application/json") + request.setHeader(b"Cache-Control", b"no-cache, no-store, must-revalidate") + + if send_cors: + set_cors_headers(request) + + producer = _JsonProducer(request, encoder, json_object) + producer.start() + return NOT_DONE_YET def respond_with_json_bytes( From f141196b289925d4ca8d0d09a954c1910dd9b23d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 31 Jul 2020 13:54:08 -0400 Subject: [PATCH 02/18] Do not manually call respond_with_json_bytes when unnecessary. --- synapse/rest/key/v2/remote_key_resource.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py index 9b3f85b306d1..e266204f95af 100644 --- a/synapse/rest/key/v2/remote_key_resource.py +++ b/synapse/rest/key/v2/remote_key_resource.py @@ -15,12 +15,12 @@ import logging from typing import Dict, Set -from canonicaljson import encode_canonical_json, json +from canonicaljson import json from signedjson.sign import sign_json from synapse.api.errors import Codes, SynapseError from synapse.crypto.keyring import ServerKeyFetcher -from synapse.http.server import DirectServeJsonResource, respond_with_json_bytes +from synapse.http.server import DirectServeJsonResource, respond_with_json from synapse.http.servlet import parse_integer, parse_json_object_from_request logger = logging.getLogger(__name__) @@ -223,4 +223,4 @@ async def query_keys(self, request, query, query_remote_on_cache_miss=False): results = {"server_keys": signed_keys} - respond_with_json_bytes(request, 200, encode_canonical_json(results)) + respond_with_json(request, 200, results, canonical_json=True) From 133fe03031928e5f5a5689015096cd924ed9e886 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 31 Jul 2020 13:57:57 -0400 Subject: [PATCH 03/18] Add a changelog. --- changelog.d/8013.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8013.feature diff --git a/changelog.d/8013.feature b/changelog.d/8013.feature new file mode 100644 index 000000000000..b1eaf1e78a71 --- /dev/null +++ b/changelog.d/8013.feature @@ -0,0 +1 @@ +Iteratively encode JSON to avoid blocking the reactor. From 17798e6ea75291c28590f38273d79812ee8c6059 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 3 Aug 2020 09:39:48 -0400 Subject: [PATCH 04/18] Add a minimum chunk size (which seems to also fix the tests). --- synapse/http/server.py | 37 +++++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index 719c8293267e..ce0ea6d64917 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -22,7 +22,7 @@ import urllib from http import HTTPStatus from io import BytesIO -from typing import Any, Callable, Dict, Tuple, Union +from typing import Any, Callable, Dict, List, Tuple, Union import jinja2 from canonicaljson import _canonical_encoder, _pretty_encoder, json @@ -496,6 +496,9 @@ class _JsonProducer: """ Iteratively write JSON to the request. """ + # The minimum number of bytes for each chunk. Note that the last chunk will + # usually be smaller than this. + min_chunk_size = 1024 def __init__(self, request, json_encoder, json_object): self.request = request @@ -504,6 +507,14 @@ def __init__(self, request, json_encoder, json_object): def start(self): self.request.registerProducer(self, False) + def _send_data(self, data: List[str]): + """ + Send a list of strings as a response to the request. + """ + if not data: + return + self.request.write("".join(data).encode("utf-8")) + def resumeProducing(self): # We've stopped producing in the meantime. if not self.request: @@ -511,13 +522,23 @@ def resumeProducing(self): # Get the next chunk and write it to the request. Calling write will # spin the reactor (and might be re-entrant). - try: - data = next(self._generator) - self.request.write(data.encode("utf-8")) - except StopIteration: - self.request.unregisterProducer() - self.request.finish() - self.stopProducing() + buffer = [] + buffered_bytes = 0 + while buffered_bytes < self.min_chunk_size: + try: + data = next(self._generator) + buffer.append(data) + buffered_bytes += len(data) + except StopIteration: + # Everything is serialized, write any data, then finalize the + # producer. + self._send_data(buffer) + self.request.unregisterProducer() + self.request.finish() + self.stopProducing() + return + + self._send_data(data) def stopProducing(self): self._generator = None From 69c5475301f7c5fb964d43a4c16732931da42926 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 3 Aug 2020 09:44:20 -0400 Subject: [PATCH 05/18] Lint. --- synapse/http/server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/http/server.py b/synapse/http/server.py index ce0ea6d64917..dfb299cb7eb4 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -496,6 +496,7 @@ class _JsonProducer: """ Iteratively write JSON to the request. """ + # The minimum number of bytes for each chunk. Note that the last chunk will # usually be smaller than this. min_chunk_size = 1024 From 4579dbb403397bfbd1c788c307ed4ac7e80fd834 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 3 Aug 2020 11:02:46 -0400 Subject: [PATCH 06/18] Fix bug from re-factoring. --- synapse/http/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index dfb299cb7eb4..d22fbbc1e0f7 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -539,7 +539,7 @@ def resumeProducing(self): self.stopProducing() return - self._send_data(data) + self._send_data(buffer) def stopProducing(self): self._generator = None From 5675445d23d586ec1634837526332af3561e500a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 6 Aug 2020 13:38:10 -0400 Subject: [PATCH 07/18] Updates to use a public API from canonicaljson. --- synapse/http/server.py | 50 ++++++++++++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index d22fbbc1e0f7..ce9ca585685d 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -22,10 +22,14 @@ import urllib from http import HTTPStatus from io import BytesIO -from typing import Any, Callable, Dict, List, Tuple, Union +from typing import Any, Callable, Dict, Iterator, List, Tuple, Union import jinja2 -from canonicaljson import _canonical_encoder, _pretty_encoder, json +from canonicaljson import ( + iterencode_canonical_json, + iterencode_pretty_printed_json, + json, +) from twisted.internet import defer from twisted.python import failure @@ -501,24 +505,29 @@ class _JsonProducer: # usually be smaller than this. min_chunk_size = 1024 - def __init__(self, request, json_encoder, json_object): - self.request = request - self._generator = json_encoder.iterencode(json_object) + def __init__( + self, + request: Request, + json_encoder: Callable[[Any], Iterator[bytes]], + json_object: Any, + ): + self._request = request + self._generator = json_encoder(json_object) def start(self): - self.request.registerProducer(self, False) + self._request.registerProducer(self, False) - def _send_data(self, data: List[str]): + def _send_data(self, data: List[bytes]) -> None: """ Send a list of strings as a response to the request. """ if not data: return - self.request.write("".join(data).encode("utf-8")) + self._request.write(b"".join(data)) def resumeProducing(self): # We've stopped producing in the meantime. - if not self.request: + if not self._request: return # Get the next chunk and write it to the request. Calling write will @@ -534,16 +543,25 @@ def resumeProducing(self): # Everything is serialized, write any data, then finalize the # producer. self._send_data(buffer) - self.request.unregisterProducer() - self.request.finish() + self._request.unregisterProducer() + self._request.finish() self.stopProducing() return self._send_data(buffer) def stopProducing(self): - self._generator = None - self.request = None + self._generator = None # type: ignore + self._request = None + + +_json_encoder = json.JSONEncoder(separators=(",", ":")) + + +def _encode_json_bytes(json_object: Any) -> Iterator[bytes]: + """""" + for chunk in _json_encoder.iterencode(json_object): + yield chunk.encode("utf-8") def respond_with_json( @@ -580,13 +598,13 @@ def respond_with_json( return None if pretty_print: - encoder = _pretty_encoder + encoder = iterencode_pretty_printed_json else: if canonical_json or synapse.events.USE_FROZEN_DICTS: - encoder = _canonical_encoder + encoder = iterencode_canonical_json else: # TODO Re-use this. - encoder = json.JSONEncoder(separators=(",", ":")) + encoder = _encode_json_bytes request.setResponseCode(code) request.setHeader(b"Content-Type", b"application/json") From 23d3cca143d0f0a752f05fa9f5bf2e6c8f976972 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 6 Aug 2020 13:39:46 -0400 Subject: [PATCH 08/18] Temporarily use a branch of canonicaljson. --- synapse/python_dependencies.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index abea2be4ef5f..1ec44ca75ff1 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -43,7 +43,7 @@ "jsonschema>=2.5.1", "frozendict>=1", "unpaddedbase64>=1.1.0", - "canonicaljson>=1.2.0", + "canonicaljson@https://github.com/matrix-org/python-canonicaljson/archive/clokep/iter-methods.zip", # we use the type definitions added in signedjson 1.1. "signedjson>=1.1.0", "pynacl>=1.2.1", From cf426ee84467cb4439622006355b5da57d8fb3a4 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 6 Aug 2020 13:42:58 -0400 Subject: [PATCH 09/18] Fill in a missing comment. --- synapse/http/server.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index ce9ca585685d..f0738e761125 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -559,7 +559,9 @@ def stopProducing(self): def _encode_json_bytes(json_object: Any) -> Iterator[bytes]: - """""" + """ + Encode an object into JSON. Returns an iterator of bytes. + """ for chunk in _json_encoder.iterencode(json_object): yield chunk.encode("utf-8") From 21e1bfe812c63d51d785f2dfe094da18bb6b6696 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 6 Aug 2020 13:44:25 -0400 Subject: [PATCH 10/18] Remove obsolete comment. --- synapse/http/server.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index f0738e761125..01a20c326890 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -605,7 +605,6 @@ def respond_with_json( if canonical_json or synapse.events.USE_FROZEN_DICTS: encoder = iterencode_canonical_json else: - # TODO Re-use this. encoder = _encode_json_bytes request.setResponseCode(code) From 6e250568a9be5cc638ff0ffd06b24acf94748bf7 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 6 Aug 2020 14:21:44 -0400 Subject: [PATCH 11/18] Add some interface information. --- synapse/http/server.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index 01a20c326890..1b38a0a80767 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -30,8 +30,9 @@ iterencode_pretty_printed_json, json, ) +from zope.interface import implementer -from twisted.internet import defer +from twisted.internet import defer, interfaces from twisted.python import failure from twisted.web import resource from twisted.web.server import NOT_DONE_YET, Request @@ -496,6 +497,7 @@ class RootOptionsRedirectResource(OptionsResource, RootRedirect): pass +@implementer(interfaces.IPullProducer) class _JsonProducer: """ Iteratively write JSON to the request. From d43d309c6ec3e117913c86dd1b48cf8a4391d34b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 7 Aug 2020 07:29:37 -0400 Subject: [PATCH 12/18] Convert code to byte iterator instead of JSON iterator. --- synapse/http/server.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index 84fa73f3b9c4..b95a7c3fafdc 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -504,9 +504,9 @@ class RootOptionsRedirectResource(OptionsResource, RootRedirect): @implementer(interfaces.IPullProducer) -class _JsonProducer: +class _ByteProducer: """ - Iteratively write JSON to the request. + Iteratively write bytes to the request. """ # The minimum number of bytes for each chunk. Note that the last chunk will @@ -514,13 +514,10 @@ class _JsonProducer: min_chunk_size = 1024 def __init__( - self, - request: Request, - json_encoder: Callable[[Any], Iterator[bytes]], - json_object: Any, + self, request: Request, iterator: Iterator[bytes], ): self._request = request - self._generator = json_encoder(json_object) + self._iterator = iterator def start(self): self._request.registerProducer(self, False) @@ -544,7 +541,7 @@ def resumeProducing(self): buffered_bytes = 0 while buffered_bytes < self.min_chunk_size: try: - data = next(self._generator) + data = next(self._iterator) buffer.append(data) buffered_bytes += len(data) except StopIteration: @@ -622,7 +619,7 @@ def respond_with_json( if send_cors: set_cors_headers(request) - producer = _JsonProducer(request, encoder, json_object) + producer = _ByteProducer(request, encoder(json_object)) producer.start() return NOT_DONE_YET From 26cd7be87188ade32918d533f7372249206511ff Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 14 Aug 2020 07:47:23 -0400 Subject: [PATCH 13/18] Bump to released canonicaljson version. --- synapse/python_dependencies.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index e1873423accb..d500b76d787e 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -43,7 +43,7 @@ "jsonschema>=2.5.1", "frozendict>=1", "unpaddedbase64>=1.1.0", - "canonicaljson@https://github.com/matrix-org/python-canonicaljson/archive/clokep/iter-methods.zip", + "canonicaljson>=1.3.0", # we use the type definitions added in signedjson 1.1. "signedjson>=1.1.0", "pynacl>=1.2.1", From 7469b0771c4f1ebfee0eba6fc1dbd98673a05642 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 14 Aug 2020 07:49:15 -0400 Subject: [PATCH 14/18] Fix imports. --- synapse/http/server.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index eef8aa61042a..f330664e05eb 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -25,10 +25,7 @@ from typing import Any, Callable, Dict, Iterator, List, Tuple, Union import jinja2 -from canonicaljson import ( - iterencode_canonical_json, - iterencode_pretty_printed_json, -) +from canonicaljson import iterencode_canonical_json, iterencode_pretty_printed_json from zope.interface import implementer from twisted.internet import defer, interfaces From 3a3d2886fabf5fcdcbbb0639bbf8c54e31c6dc0d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 14 Aug 2020 08:23:49 -0400 Subject: [PATCH 15/18] Fix JSON test. --- tests/test_server.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_server.py b/tests/test_server.py index d628070e48d6..655c918a15fe 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -178,7 +178,6 @@ def _callback(request, **kwargs): self.assertEqual(channel.result["code"], b"200") self.assertNotIn("body", channel.result) - self.assertEqual(channel.headers.getRawHeaders(b"Content-Length"), [b"15"]) class OptionsResourceTests(unittest.TestCase): From b9d09c0b318919c5361d43c6329b51ad2d6196c5 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 18 Aug 2020 07:49:03 -0400 Subject: [PATCH 16/18] Add return type hints. Co-authored-by: Erik Johnston --- synapse/http/server.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index f330664e05eb..8d59e9ce9147 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -516,7 +516,7 @@ def __init__( self._request = request self._iterator = iterator - def start(self): + def start(self) -> None: self._request.registerProducer(self, False) def _send_data(self, data: List[bytes]) -> None: @@ -527,7 +527,7 @@ def _send_data(self, data: List[bytes]) -> None: return self._request.write(b"".join(data)) - def resumeProducing(self): + def resumeProducing(self) -> None: # We've stopped producing in the meantime. if not self._request: return @@ -552,7 +552,7 @@ def resumeProducing(self): self._send_data(buffer) - def stopProducing(self): + def stopProducing(self) -> None: self._generator = None # type: ignore self._request = None From c00905c5099914942806781763cfcf1af9db0917 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 18 Aug 2020 08:12:20 -0400 Subject: [PATCH 17/18] Review comments. --- synapse/http/server.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index 8d59e9ce9147..f2daf247bbf7 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -534,6 +534,9 @@ def resumeProducing(self) -> None: # Get the next chunk and write it to the request. Calling write will # spin the reactor (and might be re-entrant). + # + # Note that buffer stores a list of bytes (instead of appending to + # bytes) to hopefully avoid many allocations. buffer = [] buffered_bytes = 0 while buffered_bytes < self.min_chunk_size: @@ -553,7 +556,6 @@ def resumeProducing(self) -> None: self._send_data(buffer) def stopProducing(self) -> None: - self._generator = None # type: ignore self._request = None From 65b9e90d43fcb81fa9da605d4b2359c6ba2afa2e Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 18 Aug 2020 08:28:23 -0400 Subject: [PATCH 18/18] Clarify some comments. --- synapse/http/server.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index f2daf247bbf7..37fdf14405ec 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -528,12 +528,16 @@ def _send_data(self, data: List[bytes]) -> None: self._request.write(b"".join(data)) def resumeProducing(self) -> None: - # We've stopped producing in the meantime. + # We've stopped producing in the meantime (note that this might be + # re-entrant after calling write). if not self._request: return - # Get the next chunk and write it to the request. Calling write will - # spin the reactor (and might be re-entrant). + # Get the next chunk and write it to the request. + # + # The output of the JSON encoder is coalesced until min_chunk_size is + # reached. (This is because JSON encoders produce a very small output + # per iteration.) # # Note that buffer stores a list of bytes (instead of appending to # bytes) to hopefully avoid many allocations. @@ -545,8 +549,9 @@ def resumeProducing(self) -> None: buffer.append(data) buffered_bytes += len(data) except StopIteration: - # Everything is serialized, write any data, then finalize the - # producer. + # The entire JSON object has been serialized, write any + # remaining data, finalize the producer and the request, and + # clean-up any references. self._send_data(buffer) self._request.unregisterProducer() self._request.finish()