-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Iteratively encode JSON responses #8013
Changes from 12 commits
7418fa9
f141196
133fe03
17798e6
69c5475
4579dbb
5675445
23d3cca
cf426ee
21e1bfe
6e25056
5a7399b
d43d309
9d86790
26cd7be
6df06f3
7469b07
3a3d288
b9d09c0
c00905c
65b9e90
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Iteratively encode JSON to avoid blocking the reactor. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,12 +22,17 @@ | |
import urllib | ||
from http import HTTPStatus | ||
from io import BytesIO | ||
from typing import Any, Callable, Dict, Tuple, Union | ||
from typing import Any, Callable, Dict, Iterator, List, Tuple, Union | ||
|
||
import jinja2 | ||
from canonicaljson import encode_canonical_json, encode_pretty_printed_json, json | ||
from canonicaljson import ( | ||
iterencode_canonical_json, | ||
iterencode_pretty_printed_json, | ||
json, | ||
) | ||
from zope.interface import implementer | ||
|
||
from twisted.internet import defer | ||
from twisted.internet import defer, interfaces | ||
from twisted.python import failure | ||
from twisted.web import resource | ||
from twisted.web.server import NOT_DONE_YET, Request | ||
|
@@ -498,6 +503,77 @@ class RootOptionsRedirectResource(OptionsResource, RootRedirect): | |
pass | ||
|
||
|
||
@implementer(interfaces.IPullProducer) | ||
class _JsonProducer: | ||
""" | ||
Iteratively write JSON to the request. | ||
""" | ||
|
||
# The minimum number of bytes for each chunk. Note that the last chunk will | ||
# usually be smaller than this. | ||
min_chunk_size = 1024 | ||
|
||
def __init__( | ||
self, | ||
request: Request, | ||
json_encoder: Callable[[Any], Iterator[bytes]], | ||
json_object: Any, | ||
): | ||
self._request = request | ||
self._generator = json_encoder(json_object) | ||
|
||
def start(self): | ||
self._request.registerProducer(self, False) | ||
|
||
def _send_data(self, data: List[bytes]) -> None: | ||
""" | ||
Send a list of strings as a response to the request. | ||
""" | ||
if not data: | ||
return | ||
self._request.write(b"".join(data)) | ||
|
||
def resumeProducing(self): | ||
clokep marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# We've stopped producing in the meantime. | ||
if not self._request: | ||
return | ||
|
||
# Get the next chunk and write it to the request. Calling write will | ||
# spin the reactor (and might be re-entrant). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this true? We're' not awaiting here so the reactor should not be able to spin? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's cut-and-pasted from elsewhere; but I seem to remember that it is true that this can be re-entrant. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was originally cut-and-pasted from the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added some logging and I actually wasn't able to cause this to be re-entrant? Anyway the question of "how does the reactor spin?" is convoluted, but I think I found the answer:
The result is that in one reactor "tick" this should generate <~1 KB of data. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, right interesting. Since this function is not a coroutine I don't think this can yield back to the reactor half way through the function There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right, it "yields" when the function runs off, NOT in the middle of the function. Do you think any changes are needed here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess I still don't really see what this comment is trying to convey. If we want to highlight that this function may be re-entrant, it'd be good to mention what the code is doing to make that safe (I guess something to do with the fact that the write is the last thing we do in the function, and its only during There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe it is important that you check if Maybe it isn't useful to clarify -- I can remove the part of the comment if you'd like (or maybe move it above the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Saying that at by the check I think would be best There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @erikjohnston I clarified a bunch of comments! Let me know if it is better now! |
||
buffer = [] | ||
buffered_bytes = 0 | ||
while buffered_bytes < self.min_chunk_size: | ||
try: | ||
data = next(self._generator) | ||
buffer.append(data) | ||
buffered_bytes += len(data) | ||
clokep marked this conversation as resolved.
Show resolved
Hide resolved
|
||
except StopIteration: | ||
# Everything is serialized, write any data, then finalize the | ||
# producer. | ||
self._send_data(buffer) | ||
self._request.unregisterProducer() | ||
self._request.finish() | ||
self.stopProducing() | ||
return | ||
|
||
self._send_data(buffer) | ||
|
||
def stopProducing(self): | ||
clokep marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self._generator = None # type: ignore | ||
clokep marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self._request = None | ||
|
||
|
||
_json_encoder = json.JSONEncoder(separators=(",", ":")) | ||
clokep marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
def _encode_json_bytes(json_object: Any) -> Iterator[bytes]: | ||
""" | ||
Encode an object into JSON. Returns an iterator of bytes. | ||
""" | ||
for chunk in _json_encoder.iterencode(json_object): | ||
yield chunk.encode("utf-8") | ||
|
||
|
||
def respond_with_json( | ||
request: Request, | ||
code: int, | ||
|
@@ -532,15 +608,23 @@ def respond_with_json( | |
return None | ||
|
||
if pretty_print: | ||
json_bytes = encode_pretty_printed_json(json_object) + b"\n" | ||
encoder = iterencode_pretty_printed_json | ||
else: | ||
if canonical_json or synapse.events.USE_FROZEN_DICTS: | ||
# canonicaljson already encodes to bytes | ||
json_bytes = encode_canonical_json(json_object) | ||
encoder = iterencode_canonical_json | ||
else: | ||
json_bytes = json.dumps(json_object).encode("utf-8") | ||
encoder = _encode_json_bytes | ||
|
||
return respond_with_json_bytes(request, code, json_bytes, send_cors=send_cors) | ||
request.setResponseCode(code) | ||
request.setHeader(b"Content-Type", b"application/json") | ||
request.setHeader(b"Cache-Control", b"no-cache, no-store, must-revalidate") | ||
|
||
if send_cors: | ||
set_cors_headers(request) | ||
|
||
producer = _JsonProducer(request, encoder, json_object) | ||
clokep marked this conversation as resolved.
Show resolved
Hide resolved
|
||
producer.start() | ||
return NOT_DONE_YET | ||
|
||
|
||
def respond_with_json_bytes( | ||
|
Uh oh!
There was an error while loading. Please reload this page.