|
16 | 16 | import io.nats.client.*;
|
17 | 17 | import io.nats.client.api.Error;
|
18 | 18 | import io.nats.client.api.*;
|
19 |
| -import io.nats.client.support.Status; |
20 | 19 |
|
21 | 20 | import java.io.IOException;
|
22 | 21 | import java.nio.charset.StandardCharsets;
|
23 | 22 | import java.time.ZonedDateTime;
|
24 |
| -import java.util.ArrayList; |
25 | 23 | import java.util.List;
|
26 |
| -import java.util.concurrent.LinkedBlockingQueue; |
27 | 24 |
|
28 |
| -import static io.nats.client.support.NatsJetStreamClientError.JsAllowDirectRequired; |
29 |
| -import static io.nats.client.support.NatsJetStreamClientError.JsDirectBatchGet211NotAvailable; |
30 | 25 | import static io.nats.client.support.Validator.*;
|
31 | 26 |
|
32 | 27 | public class NatsJetStreamManagement extends NatsJetStreamImpl implements JetStreamManagement {
|
@@ -345,109 +340,6 @@ private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetR
|
345 | 340 | }
|
346 | 341 | }
|
347 | 342 |
|
348 |
| - /** |
349 |
| - * {@inheritDoc} |
350 |
| - */ |
351 |
| - @Override |
352 |
| - public List<MessageInfo> fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { |
353 |
| - validateMessageBatchGetRequest(streamName, messageBatchGetRequest); |
354 |
| - List<MessageInfo> results = new ArrayList<>(); |
355 |
| - _requestMessageBatch(streamName, messageBatchGetRequest, msg -> { |
356 |
| - if (msg != MessageInfo.EOD) { |
357 |
| - results.add(msg); |
358 |
| - } |
359 |
| - }); |
360 |
| - return results; |
361 |
| - } |
362 |
| - |
363 |
| - /** |
364 |
| - * {@inheritDoc} |
365 |
| - */ |
366 |
| - @Override |
367 |
| - public LinkedBlockingQueue<MessageInfo> queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { |
368 |
| - validateMessageBatchGetRequest(streamName, messageBatchGetRequest); |
369 |
| - final LinkedBlockingQueue<MessageInfo> q = new LinkedBlockingQueue<>(); |
370 |
| - conn.getOptions().getExecutor().submit(() -> _requestMessageBatch(streamName, messageBatchGetRequest, q::add)); |
371 |
| - return q; |
372 |
| - } |
373 |
| - |
374 |
| - /** |
375 |
| - * {@inheritDoc} |
376 |
| - */ |
377 |
| - @Override |
378 |
| - public void requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException { |
379 |
| - validateMessageBatchGetRequest(streamName, messageBatchGetRequest); |
380 |
| - _requestMessageBatch(streamName, messageBatchGetRequest, handler); |
381 |
| - } |
382 |
| - |
383 |
| - public void _requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) { |
384 |
| - Subscription sub = null; |
385 |
| - try { |
386 |
| - String replyTo = conn.createInbox(); |
387 |
| - sub = conn.subscribe(replyTo); |
388 |
| - |
389 |
| - String requestSubject = prependPrefix(String.format(JSAPI_DIRECT_GET, streamName)); |
390 |
| - conn.publish(requestSubject, replyTo, messageBatchGetRequest.serialize()); |
391 |
| - |
392 |
| - long maxTimeMillis = getTimeout().toMillis(); |
393 |
| - long timeLeft = maxTimeMillis; |
394 |
| - long start = System.currentTimeMillis(); |
395 |
| - while (true) { |
396 |
| - Message msg = sub.nextMessage(timeLeft); |
397 |
| - if (msg == null) { |
398 |
| - break; |
399 |
| - } |
400 |
| - if (msg.isStatusMessage()) { |
401 |
| - Status status = msg.getStatus(); |
402 |
| - // Report error, otherwise successful status. |
403 |
| - if (status.getCode() < 200 || status.getCode() > 299) { |
404 |
| - MessageInfo messageInfo = new MessageInfo(Error.convert(status), true); |
405 |
| - handler.onMessageInfo(messageInfo); |
406 |
| - } |
407 |
| - break; |
408 |
| - } |
409 |
| - |
410 |
| - Headers headers = msg.getHeaders(); |
411 |
| - if (headers == null || headers.getLast(NATS_NUM_PENDING) == null) { |
412 |
| - throw JsDirectBatchGet211NotAvailable.instance(); |
413 |
| - } |
414 |
| - |
415 |
| - MessageInfo messageInfo = new MessageInfo(msg, streamName, true); |
416 |
| - handler.onMessageInfo(messageInfo); |
417 |
| - timeLeft = maxTimeMillis - (System.currentTimeMillis() - start); |
418 |
| - } |
419 |
| - } catch (InterruptedException e) { |
420 |
| - // sub.nextMessage was fetching one message |
421 |
| - // and data is not completely read |
422 |
| - // so it seems like this is an error condition |
423 |
| - Thread.currentThread().interrupt(); |
424 |
| - throw new RuntimeException(e); |
425 |
| - } finally { |
426 |
| - try { |
427 |
| - handler.onMessageInfo(MessageInfo.EOD); |
428 |
| - } catch (Exception ignore) { |
429 |
| - } |
430 |
| - try { |
431 |
| - //noinspection DataFlowIssue |
432 |
| - sub.unsubscribe(); |
433 |
| - } catch (Exception ignore) { |
434 |
| - } |
435 |
| - } |
436 |
| - } |
437 |
| - |
438 |
| - private void validateMessageBatchGetRequest(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException { |
439 |
| - validateNotNull(messageBatchGetRequest, "Message Batch Get Request"); |
440 |
| - |
441 |
| - if (!directBatchGet211Available) { |
442 |
| - throw JsDirectBatchGet211NotAvailable.instance(); |
443 |
| - } |
444 |
| - |
445 |
| - CachedStreamInfo csi = getCachedStreamInfo(streamName); |
446 |
| - if (!csi.allowDirect) { |
447 |
| - throw JsAllowDirectRequired.instance(); |
448 |
| - } |
449 |
| - } |
450 |
| - |
451 | 343 | /**
|
452 | 344 | * {@inheritDoc}
|
453 | 345 | */
|
|
0 commit comments