12
12
import org .apache .logging .log4j .Level ;
13
13
import org .apache .logging .log4j .LogManager ;
14
14
import org .apache .logging .log4j .Logger ;
15
- import org .apache .lucene .store .AlreadyClosedException ;
16
15
import org .apache .lucene .util .BytesRef ;
17
16
import org .elasticsearch .TransportVersion ;
18
17
import org .elasticsearch .TransportVersions ;
19
18
import org .elasticsearch .action .ActionListener ;
20
19
import org .elasticsearch .cluster .node .DiscoveryNode ;
20
+ import org .elasticsearch .common .Strings ;
21
21
import org .elasticsearch .common .bytes .BytesReference ;
22
+ import org .elasticsearch .common .bytes .CompositeBytesReference ;
23
+ import org .elasticsearch .common .bytes .ReleasableBytesReference ;
24
+ import org .elasticsearch .common .compress .CompressorFactory ;
25
+ import org .elasticsearch .common .io .stream .OutputStreamStreamOutput ;
22
26
import org .elasticsearch .common .io .stream .RecyclerBytesStreamOutput ;
27
+ import org .elasticsearch .common .io .stream .StreamOutput ;
28
+ import org .elasticsearch .common .io .stream .Writeable ;
23
29
import org .elasticsearch .common .network .CloseableChannel ;
24
30
import org .elasticsearch .common .network .HandlingTimeTracker ;
25
31
import org .elasticsearch .common .recycler .Recycler ;
26
32
import org .elasticsearch .common .transport .NetworkExceptionHelper ;
33
+ import org .elasticsearch .common .util .concurrent .ThreadContext ;
27
34
import org .elasticsearch .core .Nullable ;
35
+ import org .elasticsearch .core .RefCounted ;
28
36
import org .elasticsearch .core .Releasable ;
29
37
import org .elasticsearch .core .Releasables ;
38
+ import org .elasticsearch .core .Streams ;
30
39
import org .elasticsearch .core .TimeValue ;
31
40
import org .elasticsearch .threadpool .ThreadPool ;
32
41
33
42
import java .io .IOException ;
43
+ import java .util .function .Supplier ;
34
44
35
45
import static org .elasticsearch .core .Strings .format ;
36
46
37
- final class OutboundHandler {
47
+ public final class OutboundHandler {
38
48
39
49
private static final Logger logger = LogManager .getLogger (OutboundHandler .class );
40
50
@@ -83,7 +93,7 @@ void setSlowLogThreshold(TimeValue slowLogThreshold) {
83
93
* thread.
84
94
*/
85
95
void sendBytes (TcpChannel channel , BytesReference bytes , ActionListener <Void > listener ) {
86
- internalSend (channel , bytes , null , listener );
96
+ internalSend (channel , bytes , () -> "raw bytes" , listener );
87
97
}
88
98
89
99
/**
@@ -102,26 +112,17 @@ void sendRequest(
102
112
final boolean isHandshake
103
113
) throws IOException , TransportException {
104
114
assert assertValidTransportVersion (transportVersion );
105
- final OutboundMessage .Request message = new OutboundMessage .Request (
106
- threadPool .getThreadContext (),
107
- request ,
108
- transportVersion ,
115
+ sendMessage (
116
+ channel ,
109
117
action ,
118
+ request ,
110
119
requestId ,
111
120
isHandshake ,
112
- compressionScheme
121
+ compressionScheme ,
122
+ transportVersion ,
123
+ ResponseStatsConsumer .NONE ,
124
+ () -> messageListener .onRequestSent (node , requestId , action , request , options )
113
125
);
114
- if (request .tryIncRef () == false ) {
115
- assert false : "request [" + request + "] has been released already" ;
116
- throw new AlreadyClosedException ("request [" + request + "] has been released already" );
117
- }
118
- sendMessage (channel , message , ResponseStatsConsumer .NONE , () -> {
119
- try {
120
- messageListener .onRequestSent (node , requestId , action , request , options );
121
- } finally {
122
- request .decRef ();
123
- }
124
- });
125
126
}
126
127
127
128
/**
@@ -141,23 +142,19 @@ void sendResponse(
141
142
final ResponseStatsConsumer responseStatsConsumer
142
143
) {
143
144
assert assertValidTransportVersion (transportVersion );
144
- OutboundMessage .Response message = new OutboundMessage .Response (
145
- threadPool .getThreadContext (),
146
- response ,
147
- transportVersion ,
148
- requestId ,
149
- isHandshake ,
150
- compressionScheme
151
- );
152
- response .mustIncRef ();
145
+ assert response .hasReferences ();
153
146
try {
154
- sendMessage (channel , message , responseStatsConsumer , () -> {
155
- try {
156
- messageListener .onResponseSent (requestId , action );
157
- } finally {
158
- response .decRef ();
159
- }
160
- });
147
+ sendMessage (
148
+ channel ,
149
+ null ,
150
+ response ,
151
+ requestId ,
152
+ isHandshake ,
153
+ compressionScheme ,
154
+ transportVersion ,
155
+ responseStatsConsumer ,
156
+ () -> messageListener .onResponseSent (requestId , action )
157
+ );
161
158
} catch (Exception ex ) {
162
159
if (isHandshake ) {
163
160
logger .error (
@@ -187,16 +184,19 @@ void sendErrorResponse(
187
184
final Exception error
188
185
) {
189
186
assert assertValidTransportVersion (transportVersion );
190
- OutboundMessage .Response message = new OutboundMessage .Response (
191
- threadPool .getThreadContext (),
192
- new RemoteTransportException (nodeName , channel .getLocalAddress (), action , error ),
193
- transportVersion ,
194
- requestId ,
195
- false ,
196
- null
197
- );
187
+ var msg = new RemoteTransportException (nodeName , channel .getLocalAddress (), action , error );
198
188
try {
199
- sendMessage (channel , message , responseStatsConsumer , () -> messageListener .onResponseSent (requestId , action , error ));
189
+ sendMessage (
190
+ channel ,
191
+ null ,
192
+ msg ,
193
+ requestId ,
194
+ false ,
195
+ null ,
196
+ transportVersion ,
197
+ responseStatsConsumer ,
198
+ () -> messageListener .onResponseSent (requestId , action , error )
199
+ );
200
200
} catch (Exception sendException ) {
201
201
sendException .addSuppressed (error );
202
202
logger .error (() -> format ("Failed to send error response on channel [%s], closing channel" , channel ), sendException );
@@ -206,42 +206,157 @@ void sendErrorResponse(
206
206
207
207
private void sendMessage (
208
208
TcpChannel channel ,
209
- OutboundMessage networkMessage ,
209
+ @ Nullable String requestAction ,
210
+ Writeable writeable ,
211
+ long requestId ,
212
+ boolean isHandshake ,
213
+ Compression .Scheme compressionScheme ,
214
+ TransportVersion version ,
210
215
ResponseStatsConsumer responseStatsConsumer ,
211
216
Releasable onAfter
212
217
) throws IOException {
213
- final RecyclerBytesStreamOutput byteStreamOutput ;
214
- boolean bufferSuccess = false ;
215
- try {
216
- byteStreamOutput = new RecyclerBytesStreamOutput (recycler );
217
- bufferSuccess = true ;
218
- } finally {
219
- if (bufferSuccess == false ) {
220
- Releasables .closeExpectNoException (onAfter );
221
- }
222
- }
223
- final Releasable release = Releasables .wrap (byteStreamOutput , onAfter );
218
+ compressionScheme = writeable instanceof BytesTransportRequest ? null : compressionScheme ;
224
219
final BytesReference message ;
225
220
boolean serializeSuccess = false ;
221
+ final boolean isError = writeable instanceof RemoteTransportException ;
222
+ final RecyclerBytesStreamOutput byteStreamOutput = new RecyclerBytesStreamOutput (recycler );
226
223
try {
227
- message = networkMessage .serialize (byteStreamOutput );
224
+ message = serialize (
225
+ requestAction ,
226
+ requestId ,
227
+ isHandshake ,
228
+ version ,
229
+ isError ,
230
+ compressionScheme ,
231
+ writeable ,
232
+ threadPool .getThreadContext (),
233
+ byteStreamOutput
234
+ );
228
235
serializeSuccess = true ;
229
236
} catch (Exception e ) {
230
- logger .warn (() -> "failed to serialize outbound message [" + networkMessage + "]" , e );
237
+ logger .warn (() -> "failed to serialize outbound message [" + writeable + "]" , e );
231
238
throw e ;
232
239
} finally {
233
240
if (serializeSuccess == false ) {
234
- release .close ();
241
+ Releasables .close (byteStreamOutput , onAfter );
235
242
}
236
243
}
237
244
responseStatsConsumer .addResponseStats (message .length ());
238
- internalSend (channel , message , networkMessage , ActionListener .running (release ::close ));
245
+ final var responseType = writeable .getClass ();
246
+ final boolean compress = compressionScheme != null ;
247
+ internalSend (
248
+ channel ,
249
+ message ,
250
+ requestAction == null
251
+ ? () -> "Response{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}{" + responseType + "}"
252
+ : () -> "Request{" + requestAction + "}{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}" ,
253
+ ActionListener .releasing (
254
+ message instanceof ReleasableBytesReference r
255
+ ? Releasables .wrap (byteStreamOutput , onAfter , r )
256
+ : Releasables .wrap (byteStreamOutput , onAfter )
257
+ )
258
+ );
259
+ }
260
+
261
+ // public for tests
262
+ public static BytesReference serialize (
263
+ @ Nullable String requestAction ,
264
+ long requestId ,
265
+ boolean isHandshake ,
266
+ TransportVersion version ,
267
+ boolean isError ,
268
+ Compression .Scheme compressionScheme ,
269
+ Writeable writeable ,
270
+ ThreadContext threadContext ,
271
+ RecyclerBytesStreamOutput byteStreamOutput
272
+ ) throws IOException {
273
+ assert byteStreamOutput .position () == 0 ;
274
+ byteStreamOutput .setTransportVersion (version );
275
+ byteStreamOutput .skip (TcpHeader .HEADER_SIZE );
276
+ threadContext .writeTo (byteStreamOutput );
277
+ if (requestAction != null ) {
278
+ if (version .before (TransportVersions .V_8_0_0 )) {
279
+ // empty features array
280
+ byteStreamOutput .writeStringArray (Strings .EMPTY_ARRAY );
281
+ }
282
+ byteStreamOutput .writeString (requestAction );
283
+ }
284
+
285
+ final int variableHeaderLength = Math .toIntExact (byteStreamOutput .position () - TcpHeader .HEADER_SIZE );
286
+ BytesReference message = serializeMessageBody (writeable , compressionScheme , version , byteStreamOutput );
287
+ byte status = 0 ;
288
+ if (requestAction == null ) {
289
+ status = TransportStatus .setResponse (status );
290
+ }
291
+ if (isHandshake ) {
292
+ status = TransportStatus .setHandshake (status );
293
+ }
294
+ if (isError ) {
295
+ status = TransportStatus .setError (status );
296
+ }
297
+ if (compressionScheme != null ) {
298
+ status = TransportStatus .setCompress (status );
299
+ }
300
+ byteStreamOutput .seek (0 );
301
+ TcpHeader .writeHeader (byteStreamOutput , requestId , status , version , message .length () - TcpHeader .HEADER_SIZE , variableHeaderLength );
302
+ return message ;
303
+ }
304
+
305
+ private static BytesReference serializeMessageBody (
306
+ Writeable writeable ,
307
+ Compression .Scheme compressionScheme ,
308
+ TransportVersion version ,
309
+ RecyclerBytesStreamOutput byteStreamOutput
310
+ ) throws IOException {
311
+ // The compressible bytes stream will not close the underlying bytes stream
312
+ final StreamOutput stream = compressionScheme != null ? wrapCompressed (compressionScheme , byteStreamOutput ) : byteStreamOutput ;
313
+ final ReleasableBytesReference zeroCopyBuffer ;
314
+ try {
315
+ stream .setTransportVersion (version );
316
+ if (writeable instanceof BytesTransportRequest bRequest ) {
317
+ bRequest .writeThin (stream );
318
+ zeroCopyBuffer = bRequest .bytes ;
319
+ } else if (writeable instanceof RemoteTransportException remoteTransportException ) {
320
+ stream .writeException (remoteTransportException );
321
+ zeroCopyBuffer = ReleasableBytesReference .empty ();
322
+ } else {
323
+ writeable .writeTo (stream );
324
+ zeroCopyBuffer = ReleasableBytesReference .empty ();
325
+ }
326
+ } finally {
327
+ // We have to close here before accessing the bytes when using compression to ensure that some marker bytes (EOS marker)
328
+ // are written.
329
+ if (compressionScheme != null ) {
330
+ stream .close ();
331
+ }
332
+ }
333
+ final BytesReference msg = byteStreamOutput .bytes ();
334
+ if (zeroCopyBuffer .length () == 0 ) {
335
+ return msg ;
336
+ }
337
+ zeroCopyBuffer .mustIncRef ();
338
+ return new ReleasableBytesReference (CompositeBytesReference .of (msg , zeroCopyBuffer ), (RefCounted ) zeroCopyBuffer );
339
+ }
340
+
341
+ // compressed stream wrapped bytes must be no-close wrapped since we need to close the compressed wrapper below to release
342
+ // resources and write EOS marker bytes but must not yet release the bytes themselves
343
+ private static StreamOutput wrapCompressed (Compression .Scheme compressionScheme , RecyclerBytesStreamOutput bytesStream )
344
+ throws IOException {
345
+ if (compressionScheme == Compression .Scheme .DEFLATE ) {
346
+ return new OutputStreamStreamOutput (
347
+ CompressorFactory .COMPRESSOR .threadLocalOutputStream (org .elasticsearch .core .Streams .noCloseStream (bytesStream ))
348
+ );
349
+ } else if (compressionScheme == Compression .Scheme .LZ4 ) {
350
+ return new OutputStreamStreamOutput (Compression .Scheme .lz4OutputStream (Streams .noCloseStream (bytesStream )));
351
+ } else {
352
+ throw new IllegalArgumentException ("Invalid compression scheme: " + compressionScheme );
353
+ }
239
354
}
240
355
241
356
private void internalSend (
242
357
TcpChannel channel ,
243
358
BytesReference reference ,
244
- @ Nullable OutboundMessage message ,
359
+ Supplier < String > messageDescription ,
245
360
ActionListener <Void > listener
246
361
) {
247
362
final long startTime = threadPool .rawRelativeTimeInMillis ();
@@ -281,7 +396,7 @@ private void maybeLogSlowMessage(boolean success) {
281
396
logger .warn (
282
397
"sending transport message [{}] of size [{}] on [{}] took [{}ms] which is above the warn "
283
398
+ "threshold of [{}ms] with success [{}]" ,
284
- message ,
399
+ messageDescription . get () ,
285
400
messageSize ,
286
401
channel ,
287
402
took ,
0 commit comments