Skip to content

Commit 2737c70

Browse files
Release InboundMessage and request instances earlier
Follow-up to elastic#126138. We can now release requst bytes directly after deserialization. Also, request instances need not go through a ref-counting cycle when forking, removing some contention from transport threads.
1 parent 149ff93 commit 2737c70

File tree

1 file changed

+13
-5
lines changed

1 file changed

+13
-5
lines changed

server/src/main/java/org/elasticsearch/transport/InboundHandler.java

+13-5
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Inbo
264264
Releasables.assertOnce(message.takeBreakerReleaseControl())
265265
);
266266

267-
try (message) {
267+
try {
268268
messageListener.onRequestReceived(requestId, action);
269269
if (reg != null) {
270270
reg.addRequestStats(header.getNetworkMessageSize() + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE);
@@ -278,9 +278,11 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Inbo
278278
assert reg != null;
279279
final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput());
280280
assert assertRemoteVersion(stream, header.getVersion());
281-
final T request;
281+
T request;
282282
try {
283283
request = reg.newRequest(stream);
284+
message.close();
285+
message = null;
284286
} catch (Exception e) {
285287
assert ignoreDeserializationErrors : e;
286288
throw e;
@@ -295,13 +297,20 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Inbo
295297
doHandleRequest(reg, request, transportChannel);
296298
}
297299
} else {
298-
handleRequestForking(request, reg, transportChannel);
300+
handleRequestForking(/* autocloses */ request, reg, transportChannel);
301+
request = null; // now owned by the thread we forked to
299302
}
300303
} finally {
301-
request.decRef();
304+
if (request != null) {
305+
request.decRef();
306+
}
302307
}
303308
} catch (Exception e) {
304309
sendErrorResponse(action, transportChannel, e);
310+
} finally {
311+
if (message != null) {
312+
message.close();
313+
}
305314
}
306315
}
307316

@@ -315,7 +324,6 @@ private static <T extends TransportRequest> void doHandleRequest(RequestHandlerR
315324

316325
private <T extends TransportRequest> void handleRequestForking(T request, RequestHandlerRegistry<T> reg, TransportChannel channel) {
317326
boolean success = false;
318-
request.mustIncRef();
319327
try {
320328
reg.getExecutor().execute(threadPool.getThreadContext().preserveContextWithTracing(new AbstractRunnable() {
321329
@Override

0 commit comments

Comments
 (0)