45
45
import org .elasticsearch .transport .Transport ;
46
46
import org .elasticsearch .transport .TransportException ;
47
47
import org .elasticsearch .transport .TransportRequestOptions ;
48
+ import org .elasticsearch .transport .TransportResponseHandler ;
48
49
import org .elasticsearch .transport .TransportService ;
49
50
50
51
import java .io .IOException ;
@@ -106,13 +107,36 @@ public PublicationTransportHandler(
106
107
this .namedWriteableRegistry = namedWriteableRegistry ;
107
108
this .handlePublishRequest = handlePublishRequest ;
108
109
110
+ var generic = transportService .getThreadPool ().generic ();
109
111
transportService .registerRequestHandler (
110
112
PUBLISH_STATE_ACTION_NAME ,
111
- transportService . getThreadPool (). generic () ,
113
+ TransportResponseHandler . TRANSPORT_WORKER ,
112
114
false ,
113
115
false ,
114
116
BytesTransportRequest ::new ,
115
- (request , channel , task ) -> this .handleIncomingPublishRequest (request , new ChannelActionListener <>(channel ))
117
+ (req , channel , task ) -> {
118
+ req .mustIncRef ();
119
+ generic .execute (new ActionRunnable <>(new ChannelActionListener <PublishWithJoinResponse >(channel )) {
120
+
121
+ private BytesTransportRequest request = req ;
122
+
123
+ @ Override
124
+ protected void doRun () throws IOException {
125
+ var request = this .request ;
126
+ this .request = null ;
127
+ handleIncomingPublishRequest (request , listener );
128
+ }
129
+
130
+ @ Override
131
+ public void onFailure (Exception e ) {
132
+ if (request != null ) {
133
+ request .decRef ();
134
+ this .request = null ;
135
+ }
136
+ super .onFailure (e );
137
+ }
138
+ });
139
+ }
116
140
);
117
141
}
118
142
@@ -129,10 +153,12 @@ private void handleIncomingPublishRequest(
129
153
BytesTransportRequest request ,
130
154
ActionListener <PublishWithJoinResponse > publishResponseListener
131
155
) throws IOException {
132
- assert ThreadPool .assertCurrentThreadPool (GENERIC );
133
- final Compressor compressor = CompressorFactory .compressor (request .bytes ());
134
- StreamInput in = request .bytes ().streamInput ();
156
+ var bytes = request .bytes ();
157
+ StreamInput in = bytes .streamInput ();
135
158
try {
159
+ assert ThreadPool .assertCurrentThreadPool (GENERIC );
160
+ final Compressor compressor = CompressorFactory .compressor (bytes );
161
+ final int requestLength = bytes .length ();
136
162
if (compressor != null ) {
137
163
in = compressor .threadLocalStreamInput (in );
138
164
}
@@ -150,8 +176,10 @@ private void handleIncomingPublishRequest(
150
176
assert false : e ;
151
177
throw e ;
152
178
}
179
+ request .decRef ();
180
+ request = null ;
153
181
fullClusterStateReceivedCount .incrementAndGet ();
154
- logger .debug ("received full cluster state version [{}] with size [{}]" , incomingState .version (), request . bytes (). length () );
182
+ logger .debug ("received full cluster state version [{}] with size [{}]" , incomingState .version (), requestLength );
155
183
acceptState (incomingState , publishResponseListener .map (response -> {
156
184
lastSeenClusterState .set (incomingState );
157
185
return response ;
@@ -164,12 +192,14 @@ private void handleIncomingPublishRequest(
164
192
throw new IncompatibleClusterStateVersionException ("have no local cluster state" );
165
193
} else {
166
194
final ClusterState incomingState = deserializeAndApplyDiff (request , in , lastSeen );
195
+ request .decRef ();
196
+ request = null ;
167
197
compatibleClusterStateDiffReceivedCount .incrementAndGet ();
168
198
logger .debug (
169
199
"received diff cluster state version [{}] with uuid [{}], diff size [{}]" ,
170
200
incomingState .version (),
171
201
incomingState .stateUUID (),
172
- request . bytes (). length ()
202
+ requestLength
173
203
);
174
204
acceptState (incomingState , publishResponseListener .map (response -> {
175
205
lastSeenClusterState .compareAndSet (lastSeen , incomingState );
@@ -178,6 +208,9 @@ private void handleIncomingPublishRequest(
178
208
}
179
209
}
180
210
} finally {
211
+ if (request != null ) {
212
+ request .decRef ();
213
+ }
181
214
IOUtils .close (in );
182
215
}
183
216
}
0 commit comments