@@ -86,6 +86,11 @@ void setSlowLogThreshold(TimeValue slowLogThreshold) {
86
86
this .slowLogThresholdMs = slowLogThreshold .getMillis ();
87
87
}
88
88
89
+ /**
90
+ * @param message the transport message received, guaranteed to be closed by this method if it returns without exception.
91
+ * Callers must ensure that {@code message} is closed if this method throws an exception but must not release
92
+ * the message themselves otherwise
93
+ */
89
94
void inboundMessage (TcpChannel channel , InboundMessage message ) throws Exception {
90
95
final long startTime = threadPool .rawRelativeTimeInMillis ();
91
96
channel .getChannelStats ().markAccessed (startTime );
@@ -101,6 +106,11 @@ void inboundMessage(TcpChannel channel, InboundMessage message) throws Exception
101
106
// Empty stream constant to avoid instantiating a new stream for empty messages.
102
107
private static final StreamInput EMPTY_STREAM_INPUT = new ByteBufferStreamInput (ByteBuffer .wrap (BytesRef .EMPTY_BYTES ));
103
108
109
+ /**
110
+ * @param message the transport message received, guaranteed to be closed by this method if it returns without exception.
111
+ * Callers must ensure that {@code message} is closed if this method throws an exception but must not release
112
+ * the message themselves otherwise
113
+ */
104
114
private void messageReceived (TcpChannel channel , InboundMessage message , long startTime ) throws IOException {
105
115
final InetSocketAddress remoteAddress = channel .getRemoteAddress ();
106
116
final Header header = message .getHeader ();
@@ -136,6 +146,11 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st
136
146
}
137
147
}
138
148
149
+ /**
150
+ * @param message the transport message received, guaranteed to be closed by this method if it returns without exception.
151
+ * Callers must ensure that {@code message} is closed if this method throws an exception but must not release
152
+ * the message themselves otherwise
153
+ */
139
154
private void executeResponseHandler (
140
155
InboundMessage message ,
141
156
TransportResponseHandler <?> responseHandler ,
@@ -221,6 +236,11 @@ private void verifyResponseReadFully(Header header, TransportResponseHandler<?>
221
236
}
222
237
}
223
238
239
+ /**
240
+ * @param message the transport message received, guaranteed to be closed by this method if it returns without exception.
241
+ * Callers must ensure that {@code message} is closed if this method throws an exception but must not release
242
+ * the message themselves otherwise
243
+ */
224
244
private <T extends TransportRequest > void handleRequest (TcpChannel channel , InboundMessage message ) throws IOException {
225
245
final Header header = message .getHeader ();
226
246
if (header .isHandshake ()) {
@@ -333,6 +353,9 @@ public void onAfter() {
333
353
}
334
354
}
335
355
356
+ /**
357
+ * @param message guaranteed to get closed by this method
358
+ */
336
359
private void handleHandshakeRequest (TcpChannel channel , InboundMessage message ) throws IOException {
337
360
var header = message .getHeader ();
338
361
assert header .actionName .equals (TransportHandshaker .HANDSHAKE_ACTION_NAME );
@@ -373,27 +396,30 @@ private static void sendErrorResponse(String actionName, TransportChannel transp
373
396
}
374
397
}
375
398
399
+ /**
400
+ * @param message guaranteed to get closed by this method
401
+ */
376
402
private <T extends TransportResponse > void handleResponse (
377
403
InetSocketAddress remoteAddress ,
378
404
final StreamInput stream ,
379
405
final TransportResponseHandler <T > handler ,
380
- final InboundMessage inboundMessage
406
+ final InboundMessage message
381
407
) {
382
408
final var executor = handler .executor ();
383
409
if (executor == EsExecutors .DIRECT_EXECUTOR_SERVICE ) {
384
410
// no need to provide a buffer release here, we never escape the buffer when handling directly
385
- doHandleResponse (handler , remoteAddress , stream , inboundMessage );
411
+ doHandleResponse (handler , remoteAddress , stream , message );
386
412
} else {
387
413
// release buffer once we deserialize the message, but have a fail-safe in #onAfter below in case that didn't work out
388
414
executor .execute (new ForkingResponseHandlerRunnable (handler , null ) {
389
415
@ Override
390
416
protected void doRun () {
391
- doHandleResponse (handler , remoteAddress , stream , inboundMessage );
417
+ doHandleResponse (handler , remoteAddress , stream , message );
392
418
}
393
419
394
420
@ Override
395
421
public void onAfter () {
396
- inboundMessage .close ();
422
+ message .close ();
397
423
}
398
424
});
399
425
}
@@ -404,7 +430,7 @@ public void onAfter() {
404
430
* @param handler response handler
405
431
* @param remoteAddress remote address that the message was sent from
406
432
* @param stream bytes stream for reading the message
407
- * @param inboundMessage inbound message
433
+ * @param inboundMessage inbound message, guaranteed to get closed by this method
408
434
* @param <T> response message type
409
435
*/
410
436
private <T extends TransportResponse > void doHandleResponse (
@@ -436,6 +462,9 @@ private <T extends TransportResponse> void doHandleResponse(
436
462
}
437
463
}
438
464
465
+ /**
466
+ * @param message guaranteed to get closed by this method
467
+ */
439
468
private void handlerResponseError (StreamInput stream , InboundMessage message , final TransportResponseHandler <?> handler ) {
440
469
Exception error ;
441
470
try (message ) {
0 commit comments