45
45
import java .util .Set ;
46
46
import java .util .concurrent .CountDownLatch ;
47
47
import java .util .concurrent .TimeUnit ;
48
+ import java .util .function .Function ;
48
49
import java .util .stream .Collectors ;
49
50
50
51
import static org .opensearch .index .translog .transfer .FileSnapshot .TransferFileSnapshot ;
51
52
import static org .opensearch .index .translog .transfer .FileSnapshot .TranslogFileSnapshot ;
53
+ import static org .opensearch .index .translog .transfer .TranslogTransferMetadata .METADATA_SEPARATOR ;
52
54
53
55
/**
54
56
* The class responsible for orchestrating the transfer of a {@link TransferSnapshot} via a {@link TransferService}
@@ -337,35 +339,54 @@ private void deleteFileIfExists(Path filePath) throws IOException {
337
339
}
338
340
}
339
341
342
+ public TranslogTransferMetadata readMetadata (long pinnedTimestamp ) throws IOException {
343
+ if (pinnedTimestamp <= 0 ) {
344
+ return readMetadata ();
345
+ }
346
+ return readMetadata ((blobMetadataList ) -> {
347
+ List <String > metadataFiles = blobMetadataList .stream ().map (BlobMetadata ::name ).collect (Collectors .toList ());
348
+ Set <String > metadataFilesMatchingTimestamp = RemoteStoreUtils .getPinnedTimestampLockedFiles (
349
+ metadataFiles ,
350
+ Set .of (pinnedTimestamp ),
351
+ file -> RemoteStoreUtils .invertLong (file .split (METADATA_SEPARATOR )[3 ]),
352
+ TranslogTransferMetadata ::getNodeIdByPrimaryTermAndGen ,
353
+ true
354
+ );
355
+ if (metadataFilesMatchingTimestamp .isEmpty ()) {
356
+ return null ;
357
+ }
358
+ assert metadataFilesMatchingTimestamp .size () == 1 : "There should be only 1 metadata file matching given timestamp" ;
359
+ return metadataFilesMatchingTimestamp .stream ().findFirst ().get ();
360
+ }, Integer .MAX_VALUE );
361
+ }
362
+
340
363
public TranslogTransferMetadata readMetadata () throws IOException {
364
+ return readMetadata ((blobMetadataList ) -> {
365
+ RemoteStoreUtils .verifyNoMultipleWriters (
366
+ blobMetadataList .stream ().map (BlobMetadata ::name ).collect (Collectors .toList ()),
367
+ TranslogTransferMetadata ::getNodeIdByPrimaryTermAndGen
368
+ );
369
+ return blobMetadataList .get (0 ).name ();
370
+ }, METADATA_FILES_TO_FETCH );
371
+ }
372
+
373
+ private TranslogTransferMetadata readMetadata (Function <List <BlobMetadata >, String > getMetadataFileToRead , int numberOfFilesToFetch )
374
+ throws IOException {
341
375
SetOnce <TranslogTransferMetadata > metadataSetOnce = new SetOnce <>();
342
376
SetOnce <IOException > exceptionSetOnce = new SetOnce <>();
343
377
final CountDownLatch latch = new CountDownLatch (1 );
344
378
LatchedActionListener <List <BlobMetadata >> latchedActionListener = new LatchedActionListener <>(
345
379
ActionListener .wrap (blobMetadataList -> {
346
380
if (blobMetadataList .isEmpty ()) return ;
347
- RemoteStoreUtils .verifyNoMultipleWriters (
348
- blobMetadataList .stream ().map (BlobMetadata ::name ).collect (Collectors .toList ()),
349
- TranslogTransferMetadata ::getNodeIdByPrimaryTermAndGen
350
- );
351
- String filename = blobMetadataList .get (0 ).name ();
352
- boolean downloadStatus = false ;
353
- long downloadStartTime = System .nanoTime (), bytesToRead = 0 ;
354
- try (InputStream inputStream = transferService .downloadBlob (remoteMetadataTransferPath , filename )) {
355
- // Capture number of bytes for stats before reading
356
- bytesToRead = inputStream .available ();
357
- IndexInput indexInput = new ByteArrayIndexInput ("metadata file" , inputStream .readAllBytes ());
358
- metadataSetOnce .set (metadataStreamWrapper .readStream (indexInput ));
359
- downloadStatus = true ;
381
+ String filename = getMetadataFileToRead .apply (blobMetadataList );
382
+ if (filename == null ) {
383
+ return ;
384
+ }
385
+ try {
386
+ metadataSetOnce .set (readMetadata (filename ));
360
387
} catch (IOException e ) {
361
388
logger .error (() -> new ParameterizedMessage ("Exception while reading metadata file: {}" , filename ), e );
362
389
exceptionSetOnce .set (e );
363
- } finally {
364
- remoteTranslogTransferTracker .addDownloadTimeInMillis ((System .nanoTime () - downloadStartTime ) / 1_000_000L );
365
- logger .debug ("translogMetadataDownloadStatus={}" , downloadStatus );
366
- if (downloadStatus ) {
367
- remoteTranslogTransferTracker .addDownloadBytesSucceeded (bytesToRead );
368
- }
369
390
}
370
391
}, e -> {
371
392
if (e instanceof RuntimeException ) {
@@ -381,12 +402,14 @@ public TranslogTransferMetadata readMetadata() throws IOException {
381
402
transferService .listAllInSortedOrder (
382
403
remoteMetadataTransferPath ,
383
404
TranslogTransferMetadata .METADATA_PREFIX ,
384
- METADATA_FILES_TO_FETCH ,
405
+ numberOfFilesToFetch ,
385
406
latchedActionListener
386
407
);
387
- latch .await ();
408
+ if (latch .await (remoteStoreSettings .getClusterRemoteTranslogTransferTimeout ().millis (), TimeUnit .MILLISECONDS ) == false ) {
409
+ throw new RuntimeException ("Timed out reading metadata file" );
410
+ }
388
411
} catch (InterruptedException e ) {
389
- throw new IOException ("Exception while reading/downloading metadafile " , e );
412
+ throw new IOException ("Exception while reading/downloading metadata file " , e );
390
413
}
391
414
392
415
if (exceptionSetOnce .get () != null ) {
@@ -396,6 +419,26 @@ public TranslogTransferMetadata readMetadata() throws IOException {
396
419
return metadataSetOnce .get ();
397
420
}
398
421
422
+ public TranslogTransferMetadata readMetadata (String metadataFilename ) throws IOException {
423
+ boolean downloadStatus = false ;
424
+ TranslogTransferMetadata translogTransferMetadata = null ;
425
+ long downloadStartTime = System .nanoTime (), bytesToRead = 0 ;
426
+ try (InputStream inputStream = transferService .downloadBlob (remoteMetadataTransferPath , metadataFilename )) {
427
+ // Capture number of bytes for stats before reading
428
+ bytesToRead = inputStream .available ();
429
+ IndexInput indexInput = new ByteArrayIndexInput ("metadata file" , inputStream .readAllBytes ());
430
+ translogTransferMetadata = metadataStreamWrapper .readStream (indexInput );
431
+ downloadStatus = true ;
432
+ } finally {
433
+ remoteTranslogTransferTracker .addDownloadTimeInMillis ((System .nanoTime () - downloadStartTime ) / 1_000_000L );
434
+ logger .debug ("translogMetadataDownloadStatus={}" , downloadStatus );
435
+ if (downloadStatus ) {
436
+ remoteTranslogTransferTracker .addDownloadBytesSucceeded (bytesToRead );
437
+ }
438
+ }
439
+ return translogTransferMetadata ;
440
+ }
441
+
399
442
private TransferFileSnapshot prepareMetadata (TransferSnapshot transferSnapshot ) throws IOException {
400
443
Map <String , String > generationPrimaryTermMap = transferSnapshot .getTranslogFileSnapshots ().stream ().map (s -> {
401
444
assert s instanceof TranslogFileSnapshot ;
@@ -549,6 +592,16 @@ public void onFailure(Exception e) {
549
592
});
550
593
}
551
594
595
+ public void listTranslogMetadataFilesAsync (ActionListener <List <BlobMetadata >> listener ) {
596
+ transferService .listAllInSortedOrderAsync (
597
+ ThreadPool .Names .REMOTE_PURGE ,
598
+ remoteMetadataTransferPath ,
599
+ TranslogTransferMetadata .METADATA_PREFIX ,
600
+ Integer .MAX_VALUE ,
601
+ listener
602
+ );
603
+ }
604
+
552
605
public void deleteStaleTranslogMetadataFilesAsync (Runnable onCompletion ) {
553
606
try {
554
607
transferService .listAllInSortedOrderAsync (
@@ -635,7 +688,7 @@ public void onFailure(Exception e) {
635
688
* @param files list of metadata files to be deleted.
636
689
* @param onCompletion runnable to run on completion of deletion regardless of success/failure.
637
690
*/
638
- private void deleteMetadataFilesAsync (List <String > files , Runnable onCompletion ) {
691
+ public void deleteMetadataFilesAsync (List <String > files , Runnable onCompletion ) {
639
692
try {
640
693
transferService .deleteBlobsAsync (ThreadPool .Names .REMOTE_PURGE , remoteMetadataTransferPath , files , new ActionListener <>() {
641
694
@ Override
0 commit comments