33
33
import com .google .api .gax .httpjson .ApiMethodDescriptor .MethodType ;
34
34
import com .google .api .gax .httpjson .HttpRequestRunnable .ResultListener ;
35
35
import com .google .api .gax .httpjson .HttpRequestRunnable .RunnableResult ;
36
+ import com .google .api .gax .rpc .StatusCode ;
36
37
import com .google .common .base .Preconditions ;
37
38
import java .io .IOException ;
38
39
import java .io .InputStreamReader ;
39
40
import java .io .Reader ;
40
41
import java .nio .charset .StandardCharsets ;
42
+ import java .time .Duration ;
41
43
import java .util .ArrayDeque ;
42
44
import java .util .Queue ;
43
45
import java .util .concurrent .CancellationException ;
44
46
import java .util .concurrent .Executor ;
47
+ import java .util .concurrent .ScheduledExecutorService ;
48
+ import java .util .concurrent .TimeUnit ;
45
49
import javax .annotation .Nullable ;
46
50
import javax .annotation .concurrent .GuardedBy ;
47
51
@@ -88,6 +92,7 @@ final class HttpJsonClientCallImpl<RequestT, ResponseT>
88
92
private final ApiMethodDescriptor <RequestT , ResponseT > methodDescriptor ;
89
93
private final HttpTransport httpTransport ;
90
94
private final Executor executor ;
95
+ private final ScheduledExecutorService deadlineCancellationExecutor ;
91
96
92
97
//
93
98
// Request-specific data (provided by client code) before we get a response.
@@ -114,19 +119,21 @@ final class HttpJsonClientCallImpl<RequestT, ResponseT>
114
119
private ProtoMessageJsonStreamIterator responseStreamIterator ;
115
120
116
121
@ GuardedBy ("lock" )
117
- private boolean closed ;
122
+ private volatile boolean closed ;
118
123
119
124
HttpJsonClientCallImpl (
120
125
ApiMethodDescriptor <RequestT , ResponseT > methodDescriptor ,
121
126
String endpoint ,
122
127
HttpJsonCallOptions callOptions ,
123
128
HttpTransport httpTransport ,
124
- Executor executor ) {
129
+ Executor executor ,
130
+ ScheduledExecutorService deadlineCancellationExecutor ) {
125
131
this .methodDescriptor = methodDescriptor ;
126
132
this .endpoint = endpoint ;
127
133
this .callOptions = callOptions ;
128
134
this .httpTransport = httpTransport ;
129
135
this .executor = executor ;
136
+ this .deadlineCancellationExecutor = deadlineCancellationExecutor ;
130
137
this .closed = false ;
131
138
}
132
139
@@ -161,6 +168,38 @@ public void start(Listener<ResponseT> responseListener, HttpJsonMetadata request
161
168
this .listener = responseListener ;
162
169
this .requestHeaders = requestHeaders ;
163
170
}
171
+
172
+ // Use the timeout duration value instead of calculating the future Instant
173
+ // Only schedule the deadline if the RPC timeout has been set in the RetrySettings
174
+ Duration timeout = callOptions .getTimeout ();
175
+ if (timeout != null ) {
176
+ // The future timeout value is guaranteed to not be a negative value as the
177
+ // RetryAlgorithm will not retry
178
+ long timeoutMs = timeout .toMillis ();
179
+ this .deadlineCancellationExecutor .schedule (this ::timeout , timeoutMs , TimeUnit .MILLISECONDS );
180
+ }
181
+ }
182
+
183
+ // Notify the FutureListener that the there is a timeout exception from this RPC
184
+ // call (DEADLINE_EXCEEDED). For retrying RPCs, this code is returned for every attempt
185
+ // that exceeds the timeout. The RetryAlgorithm will check both the timing and code to
186
+ // ensure another attempt is made.
187
+ private void timeout () {
188
+ // There is a race between the deadline scheduler and response being returned from
189
+ // the server. The deadline scheduler has priority as it will clear out the pending
190
+ // notifications queue and add the DEADLINE_EXCEEDED event once it is able to obtain
191
+ // the lock.
192
+ synchronized (lock ) {
193
+ close (
194
+ StatusCode .Code .DEADLINE_EXCEEDED .getHttpStatusCode (),
195
+ "Deadline exceeded" ,
196
+ new HttpJsonStatusRuntimeException (
197
+ StatusCode .Code .DEADLINE_EXCEEDED .getHttpStatusCode (), "Deadline exceeded" , null ),
198
+ true );
199
+ }
200
+
201
+ // trigger delivery loop if not already running
202
+ deliver ();
164
203
}
165
204
166
205
@ Override
@@ -260,9 +299,10 @@ private void deliver() {
260
299
throw new InterruptedException ("Message delivery has been interrupted" );
261
300
}
262
301
263
- // All listeners must be called under delivery loop (but outside the lock) to ensure that no
264
- // two notifications come simultaneously from two different threads and that we do not go
265
- // indefinitely deep in the stack if delivery logic is called recursively via listeners.
302
+ // All listeners must be called under delivery loop (but outside the lock) to ensure that
303
+ // no two notifications come simultaneously from two different threads and that we do not
304
+ // go indefinitely deep in the stack if delivery logic is called recursively via
305
+ // listeners.
266
306
notifyListeners ();
267
307
268
308
// The synchronized block around message reading and cancellation notification processing
@@ -302,7 +342,7 @@ private void deliver() {
302
342
inDelivery = false ;
303
343
break ;
304
344
} else {
305
- // We still have some stuff in notiticationTasksQueue so continue the loop, most
345
+ // We still have some stuff in notificationTasksQueue so continue the loop, most
306
346
// likely we will finally terminate on the next cycle.
307
347
continue ;
308
348
}
@@ -319,8 +359,8 @@ private void deliver() {
319
359
// can do in such an unlikely situation (otherwise we would stay forever in the delivery
320
360
// loop).
321
361
synchronized (lock ) {
322
- // Close the call immediately marking it cancelled. If already closed close() will have no
323
- // effect.
362
+ // Close the call immediately marking it cancelled. If already closed, close() will have
363
+ // no effect.
324
364
close (ex .getStatusCode (), ex .getMessage (), ex , true );
325
365
}
326
366
}
@@ -352,7 +392,7 @@ private boolean consumeMessageFromStream() throws IOException {
352
392
boolean allMessagesConsumed ;
353
393
Reader responseReader ;
354
394
if (methodDescriptor .getType () == MethodType .SERVER_STREAMING ) {
355
- // Lazily initialize responseStreamIterator in case if it is a server steraming response
395
+ // Lazily initialize responseStreamIterator in case if it is a server streaming response
356
396
if (responseStreamIterator == null ) {
357
397
responseStreamIterator =
358
398
new ProtoMessageJsonStreamIterator (
@@ -384,7 +424,7 @@ private boolean consumeMessageFromStream() throws IOException {
384
424
385
425
@ GuardedBy ("lock" )
386
426
private void close (
387
- int statusCode , String message , Throwable cause , boolean terminateImmediatelly ) {
427
+ int statusCode , String message , Throwable cause , boolean terminateImmediately ) {
388
428
try {
389
429
if (closed ) {
390
430
return ;
@@ -399,12 +439,12 @@ private void close(
399
439
requestRunnable = null ;
400
440
}
401
441
402
- HttpJsonMetadata .Builder meatadaBuilder = HttpJsonMetadata .newBuilder ();
442
+ HttpJsonMetadata .Builder metadataBuilder = HttpJsonMetadata .newBuilder ();
403
443
if (runnableResult != null && runnableResult .getTrailers () != null ) {
404
- meatadaBuilder = runnableResult .getTrailers ().toBuilder ();
444
+ metadataBuilder = runnableResult .getTrailers ().toBuilder ();
405
445
}
406
- meatadaBuilder .setException (cause );
407
- meatadaBuilder .setStatusMessage (message );
446
+ metadataBuilder .setException (cause );
447
+ metadataBuilder .setStatusMessage (message );
408
448
if (responseStreamIterator != null ) {
409
449
responseStreamIterator .close ();
410
450
}
@@ -415,19 +455,19 @@ private void close(
415
455
// onClose() suppresses all other pending notifications.
416
456
// there should be no place in the code which inserts something in this queue before checking
417
457
// the `closed` flag under the lock and refusing to insert anything if `closed == true`.
418
- if (terminateImmediatelly ) {
458
+ if (terminateImmediately ) {
419
459
// This usually means we are cancelling the call before processing the response in full.
420
460
// It may happen if a user explicitly cancels the call or in response to an unexpected
421
461
// exception either from server or a call listener execution.
422
462
pendingNotifications .clear ();
423
463
}
424
464
425
465
pendingNotifications .offer (
426
- new OnCloseNotificationTask <>(listener , statusCode , meatadaBuilder .build ()));
466
+ new OnCloseNotificationTask <>(listener , statusCode , metadataBuilder .build ()));
427
467
428
468
} catch (Throwable e ) {
429
469
// suppress stream closing exceptions in favor of the actual call closing cause. This method
430
- // should not throw, otherwise we may stuck in an infinite loop of exception processing.
470
+ // should not throw, otherwise we may be stuck in an infinite loop of exception processing.
431
471
}
432
472
}
433
473
0 commit comments